RTSPServer's Net Module


RTSPServer项目

这是一个使用类似于libevent、libuv服务器架构来实现服务器,符合事件驱动模型,同时搭配了音视频的解析功能,可以实现RTSP推流。

主要实现功能

  • 支持 H264、H265、G711A、AAC四种音频格式的转发
  • 支持同时传输音视频数据

分析其中的网络架构,以及音视频数据的解析与传输,可以很好的理解音视频的推流的实现过程,下面就是关于源码机构的理解。

源码地址:RtspServer

net 结构分析

这部分模块负责 RTSP 服务中的网络通信、事件调度与 I/O 管理,下面是这些文件的初步功能分析。


✅ 连接的处理模块

文件名 功能说明
Acceptor.cpp / .h 管理服务器端 socket 的监听,并与事件循环EventLoop()相结合,实现异步非阻塞的新连接接受处理(将newConnection事件注册到EventLoop中)。
TcpConnection.cpp / .h 表示一个客户端连接,负责数据的收发,管理状态和回调。
TcpServer.cpp / .h 封装一个 TCP 服务端,内部持有 Acceptor 和连接管理。
TcpSocket.cpp / .h 封装 TCP 套接字基本操作,包括连接、发送、接收、关闭等。
Socket.h 套接字基础类(底层抽象),用于 UDP/TCP 的通用封装。

TcpSocket:该类是对 Tcp 套接字的一个封装,它抽象出来了socket的基本操作(创建、绑定、监听、连接、接受连接、关闭),并且隐藏了平台差异(Linux + Windows)。值得注意的是,ShutdownWrite()该函数关闭写操作,常用于Tcp的半关闭。

Acceptor:该类是一个底层组件,持有TcpSocket指针(unique_ptr),只负责监听,并不关心连接之后如何处理,接受连接之后通过回调“通知上层”(TcpServer)。

TcpConnection:该类是管理客户端连接的核心组件,包括接受数据(HandleRead)、发送数据(Send、HandleWrite)、连接以及断开管理(DisConnect、HandleClose)、注册到事件循环(TaskScheduler)。为什么有了TcpConnection还需要Acceptor

TcpServer:该类是RTSP网络框架的控制中枢模块,用于管理整个TCP服务的生命周期,包括监听端口、接收连接、管理连接表(一个unordered_map用于管理所有的TcpConnection)。

网络通信模块调用流程

TcpServer::Start()—->新连接到来—->Acceptor::OnAccept()—->TcpSocket::Accept()—->调用NewConnectionCallback交给TcpServer处理—->后续sockfd有读、写、关闭等操作就交给TcpConnection的Read、Write、Close的回调函数来处理。


✅ I/O 缓冲模块

文件名 功能说明
BufferReader.cpp / .h 封装读取缓冲区逻辑,支持流式解析等操作。
BufferWriter.cpp / .h 封装写缓冲区逻辑,用于构建和发送数据。
RingBuffer.h 环形缓冲区,适合连续读写、高性能传输的情况。
Pipe.cpp / .h 提供用于 EventLoop 的“唤醒机制”(如 eventfd / 管道),实现线程安全任务调度。

BufferReader:该类实现了一个环形读缓冲区,用于管理socket读取到的数据,支持HTTP/RTSP 协议中常用的 \r\n\r\n\r\n 结尾识别。

BufferWriter:Packet 是一个内部结构体,用来表示待发送的数据块,实现了部分发送(writeIndex)。#if defined(__linux) 编译器在编译时,会自动定义特定平台的宏。

RingBuffer:该类通过put_pos_get_pos_num_datas_这些变量之间的搭配,实现了一个环形队列缓冲区,但是只适合与单线程的环境,或者将其与mutex结合来实现多线程环境下的数据缓冲区。

该模块可用于缓存服务器的解码帧数据,以及音视频的数据报,也可以减缓生产消费速度不匹配的问题。


✅ 事件驱动与调度模块

文件名 功能说明
EventLoop.cpp / .h 核心事件循环类,类似 libevent 中的 loop,对所有事件进行分发。
EpollTaskScheduler.cpp / .h 基于 Linux epoll 的事件调度器。
SelectTaskScheduler.cpp / .h 基于 select() 的跨平台事件调度器。
TaskScheduler.cpp / .h 抽象任务调度接口,Epoll/Select 是其实现类。
Channel.h 表示某个 fd 上的读/写事件,用于注册到 EventLoop 中处理。

该模块实现了Reactor的事件驱动方式,符合one loop per thread?(这里问号的原因是,因为在EventLoop::Loop()中并没有实现类似muduo的设计)

步骤:

在主线程或者子线程:

EventLoop 不断轮询——>TashScheduler分发任务(EpollTaskScheduler、SelectTaskScheduler)——>Channel管理的fd里面注册的读、写、异常任务回调

事件类型,使用 bitmask 实现,可以组合:

  • EPOLL_NONE 表示无事件发生
  • EPOLL_IN 表示可读事件
  • EPOLL_PRI 表示高优先级数据事件
  • EPOLL_OUT 表示可写事件
  • EPOLL_ERR 表示错误事件
  • EPOLL_HUP 表示对端关闭事件
  • EPOLL_RDHUP 表示读半关闭事件

有一个形象的比喻,如果将事件调度器比作“雷达”,则 Channel 就是一个监听器,关注的目标是 socket 的 fd上面的事件.每一个Channel都绑定了一个sockfd_TaskScheduler中的所有scheduler都可以管理Channel,所以下面的问题部分的问题四是需要修改的,以达到负载均衡的目的,实现高并发。

TaskScheduler作为一个事件分发器,可以添加触发器事件(由RingBuffer实现管理事件队列),也可以通过事件队列处理定时器的任务,以及处理通过其实现类(EpollTaskScheduler、SelectTaskScheduler)实现的HandleEvent,处理到来的IO事件。该类中唤醒事件处理的方式是使用Pipe + Channel实现。

EventLoop是事件循环处理核心。

在这个模块,有一个问题,关于事件处理机制。在EventLoop::EventLoop(uint32_t num_threads)函数,即构造函数中,调用了void EventLoop::Loop()函数,这并不符合事件循环机制库的常见设计,void EventLoop::Loop()函数应该由用户显示指定,用户设置好所有的资源之后再启动。


✅ 工具类模块

文件名 功能说明
SocketUtil.cpp / .h 提供与 socket 相关的工具函数,例如设置非阻塞、发送和接受缓冲区大小等。
Timer.cpp / .h 定时任务相关类,支持在指定时间执行任务,挂在 EventLoop 上,支持重复设置重复执行的任务(处理重复执行的事件不合理)。
Logger.cpp / .h 日志系统的实现类,支持日志输出、级别控制等。
log.h 提供简化的宏定义,如 LOG_INFO, LOG_ERROR 等。
NetInterface.cpp / .h 封装对上层的网络接口,可能用于统一 API、回调注册等。
MemoryManager.cpp / .h 内存池管理器,提高分配效率,避免频繁的 malloc/free

该模块实现了一些常用的工具类部分。


问题

在逐步调试以及分析各个模块职能的过程中,发现了一部分问题,需要逐渐修改验证,以及调整,合理分析各个功能分配职责。

1、TcpSocket::Bind(std::string ip, uint16_t port),SocketUtil::Bind(SOCKET sockfd, std::string ip, uint16_t port),但是我也不知道如何去分配职能,在写完Test之后可以去思考。

2、Logger::~Logger()析构函数添加调用Logger::Exit().

3、Timestamp::Localtime(),在Linux平台下使用std::LocalTime()是线程不安全的,使用POSIX提供的localtime_r.

4、EventLoop::UpdateChannelEventLoop::RemoveChannel这两个函数都是使用task_scheduler[0]来更新Channel信息,这个职能分配有问题,如果在多线程情况下将多个任务都给了index_==0的任务分配器,需改正。

5、Timer::SetNextTimeout(timeout)设计问题,TimerQueue是类似于事件驱动的方式来管理定时事件的结构,它本身不会自动运行,需要周期性的调用HandleEvent()来驱动着调用触发事件,所以在test/TimerTest.cpp中的TimerQueueHandleRepeatingTimer,重复调用了多次HandleEvent()。以上实现有一定缺点,比如重复 timer 不会“补偿遗漏”,间隔长了会丢失触发次数。

6、优化TcpSocket类,添加调试信息的输出(包括errno),添加各函数对sockfd的合法性检测。

7、修改SockUtil::Connect无法处理错误IP地址的问题,同时修改连接的部分处理逻辑(比如默认是未连接状态)。

8、修改BufferReader类,添加了Append()EnsureWritableBytes函数接口,以及修改了部分处理逻辑。

9、修改BufferWriter类,添加了SetMaxQueueLength()GetMaxQueueLength()Pop()函数接口,以及修改了部分处理逻辑(如添加static_cast)。

10、修改EventLoop类,添加SetThreadPriority方法,但是如何修改Loop逻辑,执行一次,一直在线监督,改正不了。

Test的编写

使用GTest框架来编写测试模块,测试模块不仅仅是测试工程师的活,也是每一个后端Coder要掌握的。

继续看Timer的压力测试,以及其中的xop模块,也就是真实的音视频模块,先看完一个整体的思路,再去改进,可能改进的都有问题,无法实现。


文章作者: AllenMirac
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 AllenMirac !
  目录