SRS源码分析-读写类SrsStSocket
关键类SrsStSocket封装了socket的读写操作,负责将数据发送给对端(send)以及读取对端发送过来的数据(read)。
继承关系为:SrsStSocket 继承自 ISrsProtocolReaderWriter。
相关源码如下:
// TCP socket that performs its I/O through the coroutine (state-threads) library.
// It wraps an st netfd and offers timed read/write operations, while keeping
// running totals of the bytes received and sent through this object.
class SrsStSocket : public ISrsProtocolReaderWriter
{
private:
// The recv/send timeout in ms.
// @remark Use SRS_CONSTS_NO_TMMS for never timeout in ms.
int64_t rtm; // recv timeout, in ms
int64_t stm; // send timeout, in ms
// The recv/send data in bytes
int64_t rbytes; // total bytes received so far
int64_t sbytes; // total bytes sent so far
// The underlayer st fd.
srs_netfd_t stfd; // st fd; assigned by initialize(), not closed by this class
public:
// Starts with no fd, both timeouts set to "never", and zeroed byte counters.
SrsStSocket();
virtual ~SrsStSocket();
public:
// Initialize the socket with stfd, user must manage it.
virtual srs_error_t initialize(srs_netfd_t fd); // store the fd to operate on
public:
// Whether tm is the "never timeout" sentinel SRS_CONSTS_NO_TMMS.
virtual bool is_never_timeout(int64_t tm);
// Set/get the receive timeout, in ms.
virtual void set_recv_timeout(int64_t tm);
virtual int64_t get_recv_timeout();
// Set/get the send timeout, in ms.
virtual void set_send_timeout(int64_t tm);
virtual int64_t get_send_timeout();
// Total bytes successfully received/sent through this object.
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
public:
/**
* Read up to size bytes into buf (read), or exactly size bytes (read_fully).
* @param nread, the actual read bytes, ignore if NULL.
*/
virtual srs_error_t read(void* buf, size_t size, ssize_t* nread);
virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread);
/**
* Write one buffer (write) or an array of buffers (writev) to the peer.
* @param nwrite, the actual write bytes, ignore if NULL.
*/
virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite);
virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
};
// Constructor: no fd attached yet, both timeouts disabled ("never timeout"),
// and both byte counters start at zero.
SrsStSocket::SrsStSocket()
    : rtm(SRS_CONSTS_NO_TMMS),
      stm(SRS_CONSTS_NO_TMMS),
      rbytes(0),
      sbytes(0),
      stfd(NULL)
{
}
// Destructor: intentionally empty. The fd handed to initialize() is not
// released here; per the class declaration the user must manage it.
SrsStSocket::~SrsStSocket() {}
// Attach the st fd that all subsequent read/write calls operate on.
// @param fd the fd to use; it is only stored, never closed by this object.
// @return always srs_success.
srs_error_t SrsStSocket::initialize(srs_netfd_t fd)
{
    this->stfd = fd;

    return srs_success;
}
// Whether the given timeout value means "never time out".
// @param tm the timeout to test.
// @return true when tm equals the SRS_CONSTS_NO_TMMS sentinel.
bool SrsStSocket::is_never_timeout(int64_t tm)
{
    const bool never = (tm == SRS_CONSTS_NO_TMMS);
    return never;
}
//设置接收消息的超时时间
void SrsStSocket::set_recv_timeout(int64_t tm)
{
rtm = tm;
}
//获取接收消息的超时时间
int64_t SrsStSocket::get_recv_timeout()
{
return rtm;
}
//设置发送消息的超时时间
void SrsStSocket::set_send_timeout(int64_t tm)
{
stm = tm;
}
//获取发送消息的超时时间
int64_t SrsStSocket::get_send_timeout()
{
return stm;
}
//获取接收到的消息的字节数
int64_t SrsStSocket::get_recv_bytes()
{
return rbytes;
}
//获取发送的消息的字节数
int64_t SrsStSocket::get_send_bytes()
{
return sbytes;
}
// Read up to size bytes from stfd into buf.
// @param buf   destination buffer.
// @param size  maximum number of bytes to read.
// @param nread receives the raw st_read() result (may be <= 0); ignored if NULL.
// @return srs_success when at least one byte was read; ERROR_SOCKET_TIMEOUT
//         when st_read() failed with ETIME; ERROR_SOCKET_READ otherwise.
srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread)
{
    // The timeout is kept in ms and multiplied by 1000 for the st call.
    ssize_t got;
    if (rtm != SRS_CONSTS_NO_TMMS) {
        got = st_read((st_netfd_t)stfd, buf, size, rtm * 1000);
    } else {
        got = st_read((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
    }

    // Report the raw result to the caller before classifying it.
    if (nread) {
        *nread = got;
    }

    // On success a non-negative integer indicating the number of bytes actually read is returned
    // (a value of 0 means the network connection is closed or end of file is reached).
    // Otherwise, a value of -1 is returned and errno is set to indicate the error.
    if (got > 0) {
        rbytes += got; // only successful reads are counted
        return srs_success;
    }

    // got < 0 with ETIME: the read timed out.
    // @see https://github.com/ossrs/srs/issues/200
    if (got < 0 && errno == ETIME) {
        return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", (int)rtm);
    }

    // got == 0: the peer closed the connection; surface it as a reset.
    if (got == 0) {
        errno = ECONNRESET;
    }

    return srs_error_new(ERROR_SOCKET_READ, "read");
}
// Read exactly size bytes from stfd into buf; anything less is an error.
// @param buf   destination buffer.
// @param size  number of bytes that must be read.
// @param nread receives the raw st_read_fully() result; ignored if NULL.
// @return srs_success when exactly size bytes were read; ERROR_SOCKET_TIMEOUT
//         when the call failed with ETIME; ERROR_SOCKET_READ_FULLY otherwise.
srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread)
{
    // The timeout is kept in ms and multiplied by 1000 for the st call.
    ssize_t got;
    if (rtm != SRS_CONSTS_NO_TMMS) {
        got = st_read_fully((st_netfd_t)stfd, buf, size, rtm * 1000);
    } else {
        got = st_read_fully((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
    }

    // Report the raw result to the caller before classifying it.
    if (nread) {
        *nread = got;
    }

    // On success a non-negative integer indicating the number of bytes actually read is returned
    // (a value less than nbyte means the network connection is closed or end of file is reached)
    // Otherwise, a value of -1 is returned and errno is set to indicate the error.
    if (got == (ssize_t)size) {
        rbytes += got; // only complete reads are counted
        return srs_success;
    }

    // @see https://github.com/ossrs/srs/issues/200
    if (got < 0 && errno == ETIME) {
        return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", (int)rtm);
    }

    // A short (non-negative) read means the peer closed before sending everything.
    if (got >= 0) {
        errno = ECONNRESET;
    }

    return srs_error_new(ERROR_SOCKET_READ_FULLY, "read fully");
}
// Write size bytes from buf to stfd.
// @param buf    the data to send.
// @param size   number of bytes to write.
// @param nwrite receives the raw st_write() result (may be <= 0); ignored if NULL.
// @return srs_success when the write succeeded; ERROR_SOCKET_TIMEOUT when
//         st_write() failed with ETIME; ERROR_SOCKET_WRITE otherwise.
srs_error_t SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite)
{
    srs_error_t err = srs_success;

    // The timeout is kept in ms and multiplied by 1000 for the st call.
    ssize_t nb_write;
    if (stm == SRS_CONSTS_NO_TMMS) {
        nb_write = st_write((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
    } else {
        nb_write = st_write((st_netfd_t)stfd, buf, size, stm * 1000);
    }

    // Report the raw result to the caller before classifying it.
    if (nwrite) {
        *nwrite = nb_write;
    }

    // On success a non-negative integer equal to nbyte is returned.
    // Otherwise, a value of -1 is returned and errno is set to indicate the error.
    if (nb_write <= 0) {
        // @see https://github.com/ossrs/srs/issues/200
        if (nb_write < 0 && errno == ETIME) {
            // stm is int64_t; cast so the argument matches the %d conversion
            // (passing a 64-bit value to %d through varargs is undefined).
            return srs_error_new(ERROR_SOCKET_TIMEOUT, "write timeout %d ms", (int)stm);
        }

        return srs_error_new(ERROR_SOCKET_WRITE, "write");
    }

    sbytes += nb_write; // only successful writes are counted

    return err;
}
// Write several buffers to stfd in a single call.
// @param iov      array of buffers to send.
// @param iov_size number of entries in iov.
// @param nwrite   receives the raw st_writev() result (may be <= 0); ignored if NULL.
// @return srs_success when the write succeeded; ERROR_SOCKET_TIMEOUT when
//         st_writev() failed with ETIME; ERROR_SOCKET_WRITE otherwise.
srs_error_t SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
{
    srs_error_t err = srs_success;

    // The timeout is kept in ms and multiplied by 1000 for the st call.
    ssize_t nb_write;
    if (stm == SRS_CONSTS_NO_TMMS) {
        nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT);
    } else {
        nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, stm * 1000);
    }

    // Report the raw result to the caller before classifying it.
    if (nwrite) {
        *nwrite = nb_write;
    }

    // On success a non-negative integer equal to nbyte is returned.
    // Otherwise, a value of -1 is returned and errno is set to indicate the error.
    if (nb_write <= 0) {
        // @see https://github.com/ossrs/srs/issues/200
        if (nb_write < 0 && errno == ETIME) {
            // stm is int64_t; cast so the argument matches the %d conversion
            // (passing a 64-bit value to %d through varargs is undefined).
            return srs_error_new(ERROR_SOCKET_TIMEOUT, "writev timeout %d ms", (int)stm);
        }

        return srs_error_new(ERROR_SOCKET_WRITE, "writev");
    }

    sbytes += nb_write; // only successful writes are counted

    return err;
}