srs代码学习(4)-怎么转发流

publish的流和play的流怎么连接呢?这个恐怕是最绕的地方了。看了一上午的代码,淹没于各种数据结构与流程之中后,俺终于发现了连接publish和play的关键连个类是

SrsSource

SrsConsumer

负责连接着连个类实例的是

SrsRtmpConn 

下面我们详细讲解连接过程


上片我们说到。在底层客户端连接上来后,会经过一系列处理,最后绕到SrsRtmpConn类的循环函数中。就是下面的函数

[cpp] view plain copy
  1. int SrsConnection::cycle()  
  2. {  
  3.     int ret = ERROR_SUCCESS;  
  4.       
  5.     _srs_context->generate_id();  
  6.     id = _srs_context->get_id();  
  7.       
  8.     ip = srs_get_peer_ip(st_netfd_fileno(stfd));  
  9.       
  10.     ret = do_cycle();  
  11.       
  12.     // if socket io error, set to closed.  
  13.     if (srs_is_client_gracefully_close(ret)) {  
  14.         ret = ERROR_SOCKET_CLOSED;  
  15.     }  
  16.       
  17.     // success.  
  18.     if (ret == ERROR_SUCCESS) {  
  19.         srs_trace("client finished.");  
  20.     }  
  21.       
  22.     // client close peer.  
  23.     if (ret == ERROR_SOCKET_CLOSED) {  
  24.         srs_warn("client disconnect peer. ret=%d", ret);  
  25.     }  
  26.   
  27.     return ERROR_SUCCESS;  
  28. }  

这个是SrsRtmpConn的基类SrsConnection的函数。在基类里,do_cycle()是个纯虚函数。具体实现完全是靠这子类来的。

那么rtmp类型的这个子类,到底有多么的变态呢,先看看我画的一个流程图,都没有画完。一张放不下,的截好几张

srs代码学习(4)-怎么转发流

srs代码学习(4)-怎么转发流


srs代码学习(4)-怎么转发流


srs代码学习(4)-怎么转发流


够长的,这里我还只是画到了播放的时候,发布流程还没有画。因为太复杂了。


下面开始一步一步的分析

首先看do_cycle()函数这个函数主要负责握手和连命令。并在成功后。获取流的配置信息。关键代码如下

[cpp] view plain copy
  1. if ((ret = rtmp->handshake()) != ERROR_SUCCESS) {  
  2.         srs_error("rtmp handshake failed. ret=%d", ret);  
  3.         return ret;  
  4.     }  
  5.     srs_verbose("rtmp handshake success");  
  6.       
  7.     if ((ret = rtmp->connect_app(req)) != ERROR_SUCCESS) {  
  8.         srs_error("rtmp connect vhost/app failed. ret=%d", ret);  
  9.         return ret;  
  10.     }  
  11.     srs_verbose("rtmp connect app success");  
注意这里有一个比较重要的数据结构

[cpp] view plain copy
  1. SrsRequest* req  


这个主要是存储请求信息的,比如app turl streamid等等。

在各种分析后,进入下一个cycle,service_cycle()函数

service_cycly()函数在做了一些设置工作,设置比如chunk size。代码如下


[cpp] view plain copy
  1. if ((ret = rtmp->set_window_ack_size((int)(2.5 * 1000 * 1000))) != ERROR_SUCCESS) {  
  2.        srs_error("set window acknowledgement size failed. ret=%d", ret);  
  3.        return ret;  
  4.    }  
  5.    srs_verbose("set window acknowledgement size success");  
  6.          
  7.    if ((ret = rtmp->set_peer_bandwidth((int)(2.5 * 1000 * 1000), 2)) != ERROR_SUCCESS) {  
  8.        srs_error("set peer bandwidth failed. ret=%d", ret);  
  9.        return ret;  
  10.    }  

下面一段代码没有看明白。这个是一个补丁打上去的,说说为了做do token traverse。这个暂时先不研究了。

[cpp] view plain copy
  1. if (true) {  
  2.         bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);  
  3.         bool edge_traverse = _srs_config->get_vhost_edge_token_traverse(req->vhost);  
  4.         if (vhost_is_edge && edge_traverse) {  
  5.             if ((ret = check_edge_token_traverse_auth()) != ERROR_SUCCESS) {  
  6.                 srs_warn("token auth failed, ret=%d", ret);  
  7.                 return ret;  
  8.             }  
  9.         }  
  10.     }  

接着设置chunk 的大小

[cpp] view plain copy
  1. int chunk_size = _srs_config->get_chunk_size(req->vhost);  
  2.    if ((ret = rtmp->set_chunk_size(chunk_size)) != ERROR_SUCCESS) {  
  3.        srs_error("set chunk_size=%d failed. ret=%d", chunk_size, ret);  
  4.        return ret;  
  5.    }  

回应客户端。连接ok

[cpp] view plain copy
  1. if ((ret = rtmp->response_connect_app(req, local_ip.c_str())) != ERROR_SUCCESS) {  
  2.        srs_error("response connect app failed. ret=%d", ret);  
  3.        return ret;  
  4.    }  

然后连接就结束了,进入stream_service_cycle()函数,从名字上就可以看出。这个函数是开始就如流命令时代

[cpp] view plain copy
  1. while (!disposed) {  
  2.     ret = stream_service_cycle();  
  3.       
  4.     // stream service must terminated with error, never success.  
  5.     // when terminated with success, it's user required to stop.  
  6.     if (ret == ERROR_SUCCESS) {  
  7.         continue;  
  8.     }  
  9.       
  10.     // when not system control error, fatal error, return.  
  11.     if (!srs_is_system_control_error(ret)) {  
  12.         if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {  
  13.             srs_error("stream service cycle failed. ret=%d", ret);  
  14.         }  
  15.         return ret;  
  16.     }  
  17.       
  18.     // for republish, continue service  
  19.     if (ret == ERROR_CONTROL_REPUBLISH) {  
  20.         // set timeout to a larger value, wait for encoder to republish.  
  21.         rtmp->set_send_timeout(SRS_REPUBLISH_RECV_TIMEOUT_US);  
  22.         rtmp->set_recv_timeout(SRS_REPUBLISH_SEND_TIMEOUT_US);  
  23.           
  24.         srs_trace("control message(unpublish) accept, retry stream service.");  
  25.         continue;  
  26.     }  
  27.       
  28.     // for "some" system control error,   
  29.     // logical accept and retry stream service.  
  30.     if (ret == ERROR_CONTROL_RTMP_CLOSE) {  
  31.         // TODO: FIXME: use ping message to anti-death of socket.  
  32.         // @see: https://github.com/ossrs/srs/issues/39  
  33.         // set timeout to a larger value, for user paused.  
  34.         rtmp->set_recv_timeout(SRS_PAUSED_RECV_TIMEOUT_US);  
  35.         rtmp->set_send_timeout(SRS_PAUSED_SEND_TIMEOUT_US);  
  36.           
  37.         srs_trace("control message(close) accept, retry stream service.");  
  38.         continue;  
  39.     }  
  40.       
  41.     // for other system control message, fatal error.  
  42.     srs_error("control message(%d) reject as error. ret=%d", ret, ret);  
  43.     return ret;  
  44. }  

stream_service_cycle()函数闪亮登场

首先进行一些安全验证

[cpp] view plain copy
  1. f ((ret = rtmp->identify_client(res->stream_id, type, req->stream, req->duration)) != ERROR_SUCCESS) {  
  2.         if (!srs_is_client_gracefully_close(ret)) {  
  3.             srs_error("identify client failed. ret=%d", ret);  
  4.         }  
  5.         return ret;  
  6.     }  
  7.     req->strip();  
  8.     srs_trace("client identified, type=%s, stream_name=%s, duration=%.2f",   
  9.         srs_client_type_string(type).c_str(), req->stream.c_str(), req->duration);  
  10.       
  11.     // security check  
  12.     if ((ret = security->check(type, ip, req)) != ERROR_SUCCESS) {  
  13.         srs_error("security check failed. ret=%d", ret);  
  14.         return ret;  
  15.     }  
  16.     srs_info("security check ok");  

然后进入比较有意思的环节

[cpp] view plain copy
  1. SrsSource* source = SrsSource::fetch(req);  
  2.    if (!source) {  
  3.        if ((ret = SrsSource::create(req, server, server, &source)) != ERROR_SUCCESS) {  
  4.            return ret;  
  5.        }  
  6.    }  
  7.    srs_assert(source != NULL);  

根据req,寻找是否有这个源,如果没有,那么久创建一个。主要creat()是个静态函数。实现代码为

[cpp] view plain copy
  1. int SrsSource::create(SrsRequest* r, ISrsSourceHandler* h, ISrsHlsHandler* hh, SrsSource** pps)  
  2. {  
  3.     int ret = ERROR_SUCCESS;  
  4.       
  5.     string stream_url = r->get_stream_url();  
  6.     string vhost = r->vhost;  
  7.       
  8.     // should always not exists for create a source.  
  9.     srs_assert (pool.find(stream_url) == pool.end());  
  10.   
  11.     SrsSource* source = new SrsSource();  
  12.     if ((ret = source->initialize(r, h, hh)) != ERROR_SUCCESS) {  
  13.         srs_freep(source);  
  14.         return ret;  
  15.     }  
  16.           
  17.     pool[stream_url] = source;  
  18.     srs_info("create new source for url=%s, vhost=%s", stream_url.c_str(), vhost.c_str());  
  19.       
  20.     *pps = source;  
  21.       
  22.     return ret;  
  23. }  

创建一个新的source,并且放到poo中。pool是什么

[cpp] view plain copy
  1. static std::map<std::string, SrsSource*> pool;  

也是一个全局的静态变量,用了存储所欲的源。到此。谜底进一步解开了。

同意fetch()函数也是静态的。

[cpp] view plain copy
  1. static SrsSource* fetch(SrsRequest* r);  
  2. static SrsSource* fetch(std::string vhost, std::string app, std::string stream);  
ok!在绕道循环函数看看接下来该怎么办

[cpp] view plain copy
  1. SrsStatistic* stat = SrsStatistic::instance();  
  2.     if ((ret = stat->on_client(_srs_context->get_id(), req, this, type)) != ERROR_SUCCESS) {  
  3.         srs_error("stat client failed. ret=%d", ret);  
  4.         return ret;  
  5.     }  

这个是做统计用的,没啥。

[cpp] view plain copy
  1. bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);  
  2.     bool enabled_cache = _srs_config->get_gop_cache(req->vhost);  
  3.     srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]",  
  4.         req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge,   
  5.         source->source_id(), source->source_id());  
  6.     source->set_cache(enabled_cache);  
判断是否是边缘节点,是否需要gop缓冲。无他

[cpp] view plain copy
  1. switch (type) {  
  2.        case SrsRtmpConnPlay: {  
  3.            srs_verbose("start to play stream %s.", req->stream.c_str());  
  4.              
  5.            // response connection start play  
  6.            if ((ret = rtmp->start_play(res->stream_id)) != ERROR_SUCCESS) {  
  7.                srs_error("start to play stream failed. ret=%d", ret);  
  8.                return ret;  
  9.            }  
  10.            if ((ret = http_hooks_on_play()) != ERROR_SUCCESS) {  
  11.                srs_error("http hook on_play failed. ret=%d", ret);  
  12.                return ret;  
  13.            }  
  14.              
  15.            srs_info("start to play stream %s success", req->stream.c_str());  
  16.            ret = playing(source);  
  17.            http_hooks_on_stop();  
  18.              
  19.            return ret;  
  20.        }  
  21.        case SrsRtmpConnFMLEPublish: {  
  22.            srs_verbose("FMLE start to publish stream %s.", req->stream.c_str());  
  23.              
  24.            if ((ret = rtmp->start_fmle_publish(res->stream_id)) != ERROR_SUCCESS) {  
  25.                srs_error("start to publish stream failed. ret=%d", ret);  
  26.                return ret;  
  27.            }  
  28.              
  29.            return publishing(source);  
  30.        }  
  31.        case SrsRtmpConnFlashPublish: {  
  32.            srs_verbose("flash start to publish stream %s.", req->stream.c_str());  
  33.              
  34.            if ((ret = rtmp->start_flash_publish(res->stream_id)) != ERROR_SUCCESS) {  
  35.                srs_error("flash start to publish stream failed. ret=%d", ret);  
  36.                return ret;  
  37.            }  
  38.              
  39.            return publishing(source);  
  40.        }  
  41.        default: {  
  42.            ret = ERROR_SYSTEM_CLIENT_INVALID;  
  43.            srs_info("invalid client type=%d. ret=%d", type, ret);  
  44.            return ret;  
  45.        }  
  46.    }  

大流程看。好像是根据不同走到了发布或者播放流程里。但首先。这个type是从哪里来的。怎么没有发现呢?

[cpp] view plain copy
  1. int SrsRtmpServer::identify_client(int stream_id, SrsRtmpConnType& type, string& stream_name, double& duration)  
在这个函数里做确认类型的。rmptserver类不在我们这次分析。


我们分析下play的流程,函数名称为

[cpp] view plain copy
  1. int SrsRtmpConn::playing(SrsSource* source)  
关键代码

[cpp] view plain copy
  1. SrsConsumer* consumer = NULL;  
  2.    if ((ret = source->create_consumer(this, consumer)) != ERROR_SUCCESS) {  
  3.        srs_error("create consumer failed. ret=%d", ret);  
  4.        return ret;  
  5.    }  
  6.    SrsAutoFree(SrsConsumer, consumer);  
  7.    srs_verbose("consumer created success.");  

 利用source创建一个consumer.创建代码为

[cpp] view plain copy
  1. int SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg)  
  2. {  
  3.     int ret = ERROR_SUCCESS;  
  4.       
  5.     consumer = new SrsConsumer(this, conn);  
  6.     consumers.push_back(consumer);  
  7.       
  8.     double queue_size = _srs_config->get_queue_length(_req->vhost);  
  9.     consumer->set_queue_size(queue_size);  
  10.       
  11.     // if atc, update the sequence header to gop cache time.  
  12.     if (atc && !gop_cache->empty()) {  
  13.         if (cache_metadata) {  
  14.             cache_metadata->timestamp = gop_cache->start_time();  
  15.         }  
  16.         if (cache_sh_video) {  
  17.             cache_sh_video->timestamp = gop_cache->start_time();  
  18.         }  
  19.         if (cache_sh_audio) {  
  20.             cache_sh_audio->timestamp = gop_cache->start_time();  
  21.         }  
  22.     }  
  23.       
  24.     // copy metadata.  
  25.     if (dm && cache_metadata && (ret = consumer->enqueue(cache_metadata, atc, jitter_algorithm)) != ERROR_SUCCESS) {  
  26.         srs_error("dispatch metadata failed. ret=%d", ret);  
  27.         return ret;  
  28.     }  
  29.     srs_info("dispatch metadata success");  
  30.       
  31.     // copy sequence header  
  32.     // copy audio sequence first, for hls to fast parse the "right" audio codec.  
  33.     // @see https://github.com/ossrs/srs/issues/301  
  34.     if (ds && cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio, atc, jitter_algorithm)) != ERROR_SUCCESS) {  
  35.         srs_error("dispatch audio sequence header failed. ret=%d", ret);  
  36.         return ret;  
  37.     }  
  38.     srs_info("dispatch audio sequence header success");  
  39.   
  40.     if (ds && cache_sh_video && (ret = consumer->enqueue(cache_sh_video, atc, jitter_algorithm)) != ERROR_SUCCESS) {  
  41.         srs_error("dispatch video sequence header failed. ret=%d", ret);  
  42.         return ret;  
  43.     }  
  44.     srs_info("dispatch video sequence header success");  
  45.       
  46.     // copy gop cache to client.  
  47.     if (dg && (ret = gop_cache->dump(consumer, atc, jitter_algorithm)) != ERROR_SUCCESS) {  
  48.         return ret;  
  49.     }  
  50.       
  51.     // print status.  
  52.     if (dg) {  
  53.         srs_trace("create consumer, queue_size=%.2f, jitter=%d", queue_size, jitter_algorithm);  
  54.     } else {  
  55.         srs_trace("create consumer, ignore gop cache, jitter=%d", jitter_algorithm);  
  56.     }  
  57.   
  58.     // for edge, when play edge stream, check the state  
  59.     if (_srs_config->get_vhost_is_edge(_req->vhost)) {  
  60.         // notice edge to start for the first client.  
  61.         if ((ret = play_edge->on_client_play()) != ERROR_SUCCESS) {  
  62.             srs_error("notice edge start play stream failed. ret=%d", ret);  
  63.             return ret;  
  64.         }  
  65.     }  
  66.       
  67.     return ret;  
  68. }  

代码好长。主要是创建 放进数据结构中,并拷贝一些metadata进去,对于edge的处理,还没有看明白。

之后的动作

[cpp] view plain copy
  1. SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP);  
  2.       
  3.     // start isolate recv thread.  
  4.     if ((ret = trd.start()) != ERROR_SUCCESS) {  
  5.         srs_error("start isolate recv thread failed. ret=%d", ret);  
  6.         return ret;  
  7.     }  

什么?单独创建了一个接受线程,实现了recv never send,send never recv,据说这样效率提高了33%

在绕回去

[cpp] view plain copy
  1. // delivery messages for clients playing stream.  
  2. wakable = consumer;  
  3. ret = do_playing(source, consumer, &trd);  
  4. wakable = NULL;  
进入下一个循环体do_playing()

在分析下一个函数之前。让我总结下缩做的工作

1)创建或者获取了一个source

2)创建一个consumer

3) 创建一个接受线程


下面开始看函数的关键代码

[cpp] view plain copy
  1. // setup the realtime.  
  2.     realtime = _srs_config->get_realtime_enabled(req->vhost);  
  3.     // setup the mw config.  
  4.     // when mw_sleep changed, resize the socket send buffer.  
  5.     mw_enabled = true;  
  6.     change_mw_sleep(_srs_config->get_mw_sleep_ms(req->vhost));  
  7.     // initialize the send_min_interval  
  8.     send_min_interval = _srs_config->get_send_min_interval(req->vhost);  

做实时性 merge write 的设置

[cpp] view plain copy
  1. while (!trd->empty()) {  
  2.           SrsCommonMessage* msg = trd->pump();  
  3.           srs_verbose("pump client message to process.");  
  4.             
  5.           if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) {  
  6.               if (!srs_is_system_control_error(ret) && !srs_is_client_gracefully_close(ret)) {  
  7.                   srs_error("process play control message failed. ret=%d", ret);  
  8.               }  
  9.               return ret;  
  10.           }  
  11.       }  
首先处理接受消息。主要是暂停的消息。

下面进入核心代码

[cpp] view plain copy
  1. <strong><span style="color:#ff6666;"int count = (send_min_interval > 0)? 1 : 0;  
  2.         if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) {  
  3.             srs_error("get messages from consumer failed. ret=%d", ret);  
  4.             return ret;  
  5.         }</span></strong>  
这段代码的作用就是,把消息,从consumer里,拷贝到本地的msg队列里。当然。这个拷贝是浅拷贝,只是指针过来了。

首先看msgs的定义

[cpp] view plain copy
  1. SrsMessageArray msgs(SRS_PERF_MW_MSGS)  

这个类里有个核心变量
[cpp] view plain copy
  1. SrsSharedPtrMessage** msgs;  
可以看到它保存的是一个指向指针的指针。

那么dump_packets是怎么实现的呢?

[cpp] view plain copy
  1. int SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count)  
  2. {  
  3.     int ret =ERROR_SUCCESS;  
  4.       
  5.     srs_assert(count >= 0);  
  6.     srs_assert(msgs->max > 0);  
  7.       
  8.     // the count used as input to reset the max if positive.  
  9.     int max = count? srs_min(count, msgs->max) : msgs->max;  
  10.       
  11.     // the count specifies the max acceptable count,  
  12.     // here maybe 1+, and we must set to 0 when got nothing.  
  13.     count = 0;  
  14.       
  15.     if (should_update_source_id) {  
  16.         srs_trace("update source_id=%d[%d]", source->source_id(), source->source_id());  
  17.         should_update_source_id = false;  
  18.     }  
  19.       
  20.     // paused, return nothing.  
  21.     if (paused) {  
  22.         return ret;  
  23.     }  
  24.   
  25.     // pump msgs from queue.  
  26.     if ((ret = queue->dump_packets(max, msgs->msgs, count)) != ERROR_SUCCESS) {  
  27.         return ret;  
  28.     }  
  29.       
  30.     return ret;  
  31. }  


[cpp] view plain copy
  1. int SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count)  
  2. {  
  3.     int ret = ERROR_SUCCESS;  
  4.       
  5.     int nb_msgs = (int)msgs.size();  
  6.     if (nb_msgs <= 0) {  
  7.         return ret;  
  8.     }  
  9.       
  10.     srs_assert(max_count > 0);  
  11.     count = srs_min(max_count, nb_msgs);  
  12.   
  13.     SrsSharedPtrMessage** omsgs = msgs.data();  
  14.     for (int i = 0; i < count; i++) {  
  15.         pmsgs[i] = omsgs[i];  
  16.     }  
  17.       
  18.     SrsSharedPtrMessage* last = omsgs[count - 1];  
  19.     av_start_time = last->timestamp;  
  20.       
  21.     if (count >= nb_msgs) {  
  22.         // the pmsgs is big enough and clear msgs at most time.  
  23.         msgs.clear();  
  24.     } else {  
  25.         // erase some vector elements may cause memory copy,  
  26.         // maybe can use more efficient vector.swap to avoid copy.  
  27.         // @remark for the pmsgs is big enough, for instance, SRS_PERF_MW_MSGS 128,  
  28.         //      the rtmp play client will get 128msgs once, so this branch rarely execute.  
  29.         msgs.erase(msgs.begin(), msgs.begin() + count);  
  30.     }  
  31.       
  32.     return ret;  
  33. }  

ok代码我就不想分析了。只是个指针拷贝。

下一个问题。consumer的数据是怎么来的呢?

看source的代码,比如音频数据

[cpp] view plain copy
  1. int SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)  

代码里有这么一段

[cpp] view plain copy
  1. // copy to all consumer  
  2.    if (!drop_for_reduce) {  
  3.        for (int i = 0; i < (int)consumers.size(); i++) {  
  4.            SrsConsumer* consumer = consumers.at(i);  
  5.            if ((ret = consumer->enqueue(msg, atc, jitter_algorithm)) != ERROR_SUCCESS) {  
  6.                srs_error("dispatch the audio failed. ret=%d", ret);  
  7.                return ret;  
  8.            }  
  9.        }  
  10.        srs_info("dispatch audio success.");  
  11.    }  

到此,一个循环就结束了。