diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index f72f9e87a3..2e006efc01 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -143,14 +143,16 @@ int SrsVodStream::serve_flv_stream(ISrsGoHttpResponseWriter* w, SrsHttpMessage* SrsStreamCache::SrsStreamCache(SrsSource* s) { source = s; - pthread = new SrsThread("http-stream", - this, SRS_CONSTS_RTMP_PULSE_TIMEOUT_US, false); + queue = new SrsMessageQueue(true); + pthread = new SrsThread("http-stream", this, 0, false); } SrsStreamCache::~SrsStreamCache() { pthread->stop(); srs_freep(pthread); + + srs_freep(queue); } int SrsStreamCache::start() @@ -161,12 +163,60 @@ int SrsStreamCache::start() int SrsStreamCache::dump_cache(SrsConsumer* consumer) { int ret = ERROR_SUCCESS; + + if ((ret = queue->dump_packets(consumer, false, 0, 0, SrsRtmpJitterAlgorithmOFF)) != ERROR_SUCCESS) { + return ret; + } + + srs_trace("http: dump cache %d msgs, duration=%dms", queue->size(), queue->duration()); + return ret; } int SrsStreamCache::cycle() { int ret = ERROR_SUCCESS; + + SrsConsumer* consumer = NULL; + if ((ret = source->create_consumer(consumer, false, false, true)) != ERROR_SUCCESS) { + srs_error("http: create consumer failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsConsumer, consumer); + + SrsMessageArray msgs(SRS_PERF_MW_MSGS); + // TODO: FIMXE: add pithy print. + + // TODO: FIXME: config it. + queue->set_queue_size(60); + + while (true) { + // get messages from consumer. + // each msg in msgs.msgs must be free, for the SrsMessageArray never free them. + int count = 0; + if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) { + srs_error("http: get messages from consumer failed. ret=%d", ret); + return ret; + } + + if (count <= 0) { + srs_info("http: mw sleep %dms for no msg", mw_sleep); + // directly use sleep, donot use consumer wait. + st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); + + // ignore when nothing got. + continue; + } + srs_info("http: got %d msgs, min=%d, mw=%d", count, + SRS_PERF_MW_MIN_MSGS, SRS_CONSTS_RTMP_PULSE_TIMEOUT_US / 1000); + + // free the messages. + for (int i = 0; i < count; i++) { + SrsSharedPtrMessage* msg = msgs.msgs[i]; + queue->enqueue(msg); + } + } + return ret; } @@ -412,7 +462,7 @@ int SrsLiveStream::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r) // create consumer of souce, ignore gop cache, use the audio gop cache. SrsConsumer* consumer = NULL; - if ((ret = source->create_consumer(consumer, !enc->has_cache())) != ERROR_SUCCESS) { + if ((ret = source->create_consumer(consumer, true, true, !enc->has_cache())) != ERROR_SUCCESS) { srs_error("http: create consumer failed. ret=%d", ret); return ret; } diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index e60f04b4c4..b5c9e4777a 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -49,6 +49,7 @@ class SrsFlvEncoder; class SrsHttpParser; class SrsHttpMessage; class SrsHttpHandler; +class SrsMessageQueue; class SrsSharedPtrMessage; /** @@ -74,6 +75,7 @@ class SrsVodStream : public SrsGoHttpFileServer class SrsStreamCache : public ISrsThreadHandler { private: + SrsMessageQueue* queue; SrsSource* source; SrsThread* pthread; public: diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 89286c923f..fb02137096 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -244,8 +244,9 @@ void SrsFastVector::free() } #endif -SrsMessageQueue::SrsMessageQueue() +SrsMessageQueue::SrsMessageQueue(bool ignore_shrink) { + _ignore_shrink = ignore_shrink; queue_size_ms = 0; av_start_time = av_end_time = -1; } @@ -330,6 +331,26 @@ int SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, in return ret; } +int SrsMessageQueue::dump_packets(SrsConsumer* consumer, bool atc, int tba, int tbv, SrsRtmpJitterAlgorithm ag) +{ + int ret = ERROR_SUCCESS; + + int nb_msgs = (int)msgs.size(); + if (nb_msgs <= 0) { + return ret; + } + + SrsSharedPtrMessage** omsgs = msgs.data(); + for (int i = 0; i < nb_msgs; i++) { + SrsSharedPtrMessage* msg = omsgs[i]; + if ((ret = consumer->enqueue(msg, atc, tba, tbv, ag)) != ERROR_SUCCESS) { + return ret; + } + } + + return ret; +} + void SrsMessageQueue::shrink() { int iframe_index = -1; @@ -364,8 +385,13 @@ void SrsMessageQueue::shrink() return; } - srs_trace("shrink the cache queue, size=%d, removed=%d, max=%.2f", - (int)msgs.size(), iframe_index, queue_size_ms / 1000.0); + if (_ignore_shrink) { + srs_info("shrink the cache queue, size=%d, removed=%d, max=%.2f", + (int)msgs.size(), iframe_index, queue_size_ms / 1000.0); + } else { + srs_trace("shrink the cache queue, size=%d, removed=%d, max=%.2f", + (int)msgs.size(), iframe_index, queue_size_ms / 1000.0); + } // remove the first gop from the front for (int i = 0; i < iframe_index; i++) { @@ -1702,7 +1728,7 @@ void SrsSource::on_unpublish() handler->on_unpublish(this, _req); } -int SrsSource::create_consumer(SrsConsumer*& consumer, bool dump_gop_cache) +int SrsSource::create_consumer(SrsConsumer*& consumer, bool ds, bool dm, bool dg) { int ret = ERROR_SUCCESS; @@ -1730,14 +1756,14 @@ int SrsSource::create_consumer(SrsConsumer*& consumer, bool dump_gop_cache) SrsRtmpJitterAlgorithm ag = jitter_algorithm; // copy metadata. - if (cache_metadata && (ret = consumer->enqueue(cache_metadata, atc, tba, tbv, ag)) != ERROR_SUCCESS) { + if (dm && cache_metadata && (ret = consumer->enqueue(cache_metadata, atc, tba, tbv, ag)) != ERROR_SUCCESS) { srs_error("dispatch metadata failed. ret=%d", ret); return ret; } srs_info("dispatch metadata success"); // copy sequence header - if (cache_sh_video && (ret = consumer->enqueue(cache_sh_video, atc, tba, tbv, ag)) != ERROR_SUCCESS) { + if (ds && cache_sh_video && (ret = consumer->enqueue(cache_sh_video, atc, tba, tbv, ag)) != ERROR_SUCCESS) { srs_error("dispatch video sequence header failed. ret=%d", ret); return ret; } @@ -1750,10 +1776,12 @@ int SrsSource::create_consumer(SrsConsumer*& consumer, bool dump_gop_cache) srs_info("dispatch audio sequence header success"); // copy gop cache to client. - if (dump_gop_cache) { - if ((ret = gop_cache->dump(consumer, atc, tba, tbv, ag)) != ERROR_SUCCESS) { - return ret; - } + if (dg && (ret = gop_cache->dump(consumer, atc, tba, tbv, ag)) != ERROR_SUCCESS) { + return ret; + } + + // print status. + if (dg) { srs_trace("create consumer, queue_size=%.2f, tba=%d, tbv=%d", queue_size, sample_rate, frame_rate); } else { srs_trace("create consumer, ignore gop cache, tba=%d, tbv=%d", sample_rate, frame_rate); diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 84ddf2ea73..859f395ca4 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -38,6 +38,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +class SrsConsumer; class SrsPlayEdge; class SrsPublishEdge; class SrsSource; @@ -137,6 +138,7 @@ class SrsFastVector class SrsMessageQueue { private: + bool _ignore_shrink; int64_t av_start_time; int64_t av_end_time; int queue_size_ms; @@ -146,7 +148,7 @@ class SrsMessageQueue std::vector msgs; #endif public: - SrsMessageQueue(); + SrsMessageQueue(bool ignore_shrink = false); virtual ~SrsMessageQueue(); public: /** @@ -176,6 +178,11 @@ class SrsMessageQueue * @max_count the max count to dequeue, must be positive. */ virtual int dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count); + /** + * dumps packets to consumer, use specified args. + * @remark the atc/tba/tbv/ag are same to SrsConsumer.enqueue(). + */ + virtual int dump_packets(SrsConsumer* consumer, bool atc, int tba, int tbv, SrsRtmpJitterAlgorithm ag); private: /** * remove a gop from the front. @@ -494,7 +501,17 @@ class SrsSource : public ISrsReloadHandler virtual void on_unpublish(); // consumer methods public: - virtual int create_consumer(SrsConsumer*& consumer, bool dump_gop_cache = true); + /** + * create consumer and dumps packets in cache. + * @param consumer, output the create consumer. + * @param ds, whether dumps the sequence header. + * @param dm, whether dumps the metadata. + * @param dg, whether dumps the gop cache. + */ + virtual int create_consumer( + SrsConsumer*& consumer, + bool ds = true, bool dm = true, bool dg = true + ); virtual void on_consumer_destroy(SrsConsumer* consumer); virtual void set_cache(bool enabled); // internal