diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index 78e1d5e058..f72f9e87a3 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -140,6 +140,36 @@ int SrsVodStream::serve_flv_stream(ISrsGoHttpResponseWriter* w, SrsHttpMessage* return ret; } +SrsStreamCache::SrsStreamCache(SrsSource* s) +{ + source = s; + pthread = new SrsThread("http-stream", + this, SRS_CONSTS_RTMP_PULSE_TIMEOUT_US, false); +} + +SrsStreamCache::~SrsStreamCache() +{ + pthread->stop(); + srs_freep(pthread); +} + +int SrsStreamCache::start() +{ + return pthread->start(); +} + +int SrsStreamCache::dump_cache(SrsConsumer* consumer) +{ + int ret = ERROR_SUCCESS; + return ret; +} + +int SrsStreamCache::cycle() +{ + int ret = ERROR_SUCCESS; + return ret; +} + ISrsStreamEncoder::ISrsStreamEncoder() { } @@ -158,7 +188,7 @@ SrsFlvStreamEncoder::~SrsFlvStreamEncoder() srs_freep(enc); } -int SrsFlvStreamEncoder::initialize(SrsFileWriter* w) +int SrsFlvStreamEncoder::initialize(SrsFileWriter* w, SrsStreamCache* /*c*/) { int ret = ERROR_SUCCESS; @@ -189,9 +219,22 @@ int SrsFlvStreamEncoder::write_metadata(int64_t timestamp, char* data, int size) return enc->write_metadata(timestamp, data, size); } +bool SrsFlvStreamEncoder::has_cache() +{ + // for flv stream, use gop cache of SrsSource is ok. + return false; +} + +int SrsFlvStreamEncoder::dump_cache(SrsConsumer* /*consumer*/) +{ + // for flv stream, ignore cache. + return ERROR_SUCCESS; +} + SrsAacStreamEncoder::SrsAacStreamEncoder() { enc = new SrsAacEncoder(); + cache = NULL; } SrsAacStreamEncoder::~SrsAacStreamEncoder() @@ -199,10 +242,12 @@ SrsAacStreamEncoder::~SrsAacStreamEncoder() srs_freep(enc); } -int SrsAacStreamEncoder::initialize(SrsFileWriter* w) +int SrsAacStreamEncoder::initialize(SrsFileWriter* w, SrsStreamCache* c) { int ret = ERROR_SUCCESS; + cache = c; + if ((ret = enc->initialize(w)) != ERROR_SUCCESS) { return ret; } @@ -227,9 +272,21 @@ int SrsAacStreamEncoder::write_metadata(int64_t /*timestamp*/, char* /*data*/, i return ERROR_SUCCESS; } +bool SrsAacStreamEncoder::has_cache() +{ + return true; +} + +int SrsAacStreamEncoder::dump_cache(SrsConsumer* consumer) +{ + srs_assert(cache); + return cache->dump_cache(consumer); +} + SrsMp3StreamEncoder::SrsMp3StreamEncoder() { enc = new SrsMp3Encoder(); + cache = NULL; } SrsMp3StreamEncoder::~SrsMp3StreamEncoder() @@ -237,10 +294,12 @@ SrsMp3StreamEncoder::~SrsMp3StreamEncoder() srs_freep(enc); } -int SrsMp3StreamEncoder::initialize(SrsFileWriter* w) +int SrsMp3StreamEncoder::initialize(SrsFileWriter* w, SrsStreamCache* c) { int ret = ERROR_SUCCESS; + cache = c; + if ((ret = enc->initialize(w)) != ERROR_SUCCESS) { return ret; } @@ -269,6 +328,17 @@ int SrsMp3StreamEncoder::write_metadata(int64_t /*timestamp*/, char* /*data*/, i return ERROR_SUCCESS; } +bool SrsMp3StreamEncoder::has_cache() +{ + return true; +} + +int SrsMp3StreamEncoder::dump_cache(SrsConsumer* consumer) +{ + srs_assert(cache); + return cache->dump_cache(consumer); +} + SrsStreamWriter::SrsStreamWriter(ISrsGoHttpResponseWriter* w) { writer = w; @@ -305,9 +375,10 @@ int SrsStreamWriter::write(void* buf, size_t count, ssize_t* pnwrite) return writer->write((char*)buf, (int)count); } -SrsLiveStream::SrsLiveStream(SrsSource* s, SrsRequest* r) +SrsLiveStream::SrsLiveStream(SrsSource* s, SrsRequest* r, SrsStreamCache* c) { source = s; + cache = c; req = r->copy(); } @@ -339,9 +410,9 @@ int SrsLiveStream::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r) } SrsAutoFree(ISrsStreamEncoder, enc); - // create consumer of souce. + // create consumer of souce, ignore gop cache, use the audio gop cache. SrsConsumer* consumer = NULL; - if ((ret = source->create_consumer(consumer)) != ERROR_SUCCESS) { + if ((ret = source->create_consumer(consumer, !enc->has_cache())) != ERROR_SUCCESS) { srs_error("http: create consumer failed. ret=%d", ret); return ret; } @@ -353,10 +424,19 @@ int SrsLiveStream::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r) // the memory writer. SrsStreamWriter writer(w); - if ((ret = enc->initialize(&writer)) != ERROR_SUCCESS) { + if ((ret = enc->initialize(&writer, cache)) != ERROR_SUCCESS) { + srs_error("http: initialize stream encoder failed. ret=%d", ret); return ret; } + // if gop cache enabled for encoder, dump to consumer. + if (enc->has_cache()) { + if ((ret = enc->dump_cache(consumer)) != ERROR_SUCCESS) { + srs_error("http: dump cache to consumer failed. ret=%d", ret); + return ret; + } + } + while (true) { // get messages from consumer. // each msg in msgs.msgs must be free, for the SrsMessageArray never free them. @@ -424,6 +504,7 @@ int SrsLiveStream::streaming_send_messages(ISrsStreamEncoder* enc, SrsSharedPtrM SrsLiveEntry::SrsLiveEntry() { stream = NULL; + cache = NULL; } SrsHttpServer::SrsHttpServer() @@ -485,7 +566,14 @@ int SrsHttpServer::mount(SrsSource* s, SrsRequest* r) // remove the default vhost mount mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/"); - entry->stream = new SrsLiveStream(s, r); + entry->cache = new SrsStreamCache(s); + entry->stream = new SrsLiveStream(s, r, entry->cache); + + // start http stream cache thread + if ((ret = entry->cache->start()) != ERROR_SUCCESS) { + srs_error("http: start stream cache failed. ret=%d", ret); + return ret; + } // mount the http flv stream. if ((ret = mux.handle(mount, entry->stream)) != ERROR_SUCCESS) { diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index dac0e48228..e60f04b4c4 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -37,9 +37,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include class SrsSource; class SrsRequest; +class SrsConsumer; class SrsStSocket; class SrsAacEncoder; class SrsMp3Encoder; @@ -64,6 +66,27 @@ class SrsVodStream : public SrsGoHttpFileServer virtual int serve_flv_stream(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r, std::string fullpath, int offset); }; +/** +* for the srs http stream cache, +* for example, the audio stream cache to make android(weixin) happy. +* we start a thread to shrink the queue. +*/ +class SrsStreamCache : public ISrsThreadHandler +{ +private: + SrsSource* source; + SrsThread* pthread; +public: + SrsStreamCache(SrsSource* s); + virtual ~SrsStreamCache(); +public: + virtual int start(); + virtual int dump_cache(SrsConsumer* consumer); +// interface ISrsThreadHandler. +public: + virtual int cycle(); +}; + /** * the stream encoder in some codec, for example, flv or aac. */ @@ -73,10 +96,29 @@ class ISrsStreamEncoder ISrsStreamEncoder(); virtual ~ISrsStreamEncoder(); public: - virtual int initialize(SrsFileWriter* w) = 0; + /** + * initialize the encoder with file writer(to http response) and stream cache. + * @param w the writer to write to http response. + * @param c the stream cache for audio stream fast startup. + */ + virtual int initialize(SrsFileWriter* w, SrsStreamCache* c) = 0; + /** + * write rtmp video/audio/metadata. + */ virtual int write_audio(int64_t timestamp, char* data, int size) = 0; virtual int write_video(int64_t timestamp, char* data, int size) = 0; virtual int write_metadata(int64_t timestamp, char* data, int size) = 0; +public: + /** + * for some stream, for example, mp3 and aac, the audio stream, + * we use large gop cache in encoder, for the gop cache of SrsSource is ignore audio. + * @return true to use gop cache of encoder; otherwise, use SrsSource. + */ + virtual bool has_cache() = 0; + /** + * dumps the cache of encoder to consumer. + */ + virtual int dump_cache(SrsConsumer* consumer) = 0; }; /** @@ -90,10 +132,13 @@ class SrsFlvStreamEncoder : public ISrsStreamEncoder SrsFlvStreamEncoder(); virtual ~SrsFlvStreamEncoder(); public: - virtual int initialize(SrsFileWriter* w); + virtual int initialize(SrsFileWriter* w, SrsStreamCache* c); virtual int write_audio(int64_t timestamp, char* data, int size); virtual int write_video(int64_t timestamp, char* data, int size); virtual int write_metadata(int64_t timestamp, char* data, int size); +public: + virtual bool has_cache(); + virtual int dump_cache(SrsConsumer* consumer); }; /** @@ -103,14 +148,18 @@ class SrsAacStreamEncoder : public ISrsStreamEncoder { private: SrsAacEncoder* enc; + SrsStreamCache* cache; public: SrsAacStreamEncoder(); virtual ~SrsAacStreamEncoder(); public: - virtual int initialize(SrsFileWriter* w); + virtual int initialize(SrsFileWriter* w, SrsStreamCache* c); virtual int write_audio(int64_t timestamp, char* data, int size); virtual int write_video(int64_t timestamp, char* data, int size); virtual int write_metadata(int64_t timestamp, char* data, int size); +public: + virtual bool has_cache(); + virtual int dump_cache(SrsConsumer* consumer); }; /** @@ -120,14 +169,18 @@ class SrsMp3StreamEncoder : public ISrsStreamEncoder { private: SrsMp3Encoder* enc; + SrsStreamCache* cache; public: SrsMp3StreamEncoder(); virtual ~SrsMp3StreamEncoder(); public: - virtual int initialize(SrsFileWriter* w); + virtual int initialize(SrsFileWriter* w, SrsStreamCache* c); virtual int write_audio(int64_t timestamp, char* data, int size); virtual int write_video(int64_t timestamp, char* data, int size); virtual int write_metadata(int64_t timestamp, char* data, int size); +public: + virtual bool has_cache(); + virtual int dump_cache(SrsConsumer* consumer); }; /** @@ -159,8 +212,9 @@ class SrsLiveStream : public ISrsGoHttpHandler private: SrsRequest* req; SrsSource* source; + SrsStreamCache* cache; public: - SrsLiveStream(SrsSource* s, SrsRequest* r); + SrsLiveStream(SrsSource* s, SrsRequest* r, SrsStreamCache* c); virtual ~SrsLiveStream(); public: virtual int serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r); @@ -176,6 +230,7 @@ struct SrsLiveEntry std::string vhost; std::string mount; SrsLiveStream* stream; + SrsStreamCache* cache; SrsLiveEntry(); }; diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index d6f5d893ec..89286c923f 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1702,7 +1702,7 @@ void SrsSource::on_unpublish() handler->on_unpublish(this, _req); } -int SrsSource::create_consumer(SrsConsumer*& consumer) +int SrsSource::create_consumer(SrsConsumer*& consumer, bool dump_gop_cache) { int ret = ERROR_SUCCESS; @@ -1750,12 +1750,15 @@ int SrsSource::create_consumer(SrsConsumer*& consumer) srs_info("dispatch audio sequence header success"); // copy gop cache to client. - if ((ret = gop_cache->dump(consumer, atc, tba, tbv, ag)) != ERROR_SUCCESS) { - return ret; + if (dump_gop_cache) { + if ((ret = gop_cache->dump(consumer, atc, tba, tbv, ag)) != ERROR_SUCCESS) { + return ret; + } + srs_trace("create consumer, queue_size=%.2f, tba=%d, tbv=%d", queue_size, sample_rate, frame_rate); + } else { + srs_trace("create consumer, ignore gop cache, tba=%d, tbv=%d", sample_rate, frame_rate); } - srs_trace("create consumer, queue_size=%.2f, tba=%d, tbv=%d", queue_size, sample_rate, frame_rate); - return ret; } diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index a482a5e136..84ddf2ea73 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -494,7 +494,7 @@ class SrsSource : public ISrsReloadHandler virtual void on_unpublish(); // consumer methods public: - virtual int create_consumer(SrsConsumer*& consumer); + virtual int create_consumer(SrsConsumer*& consumer, bool dump_gop_cache = true); virtual void on_consumer_destroy(SrsConsumer* consumer); virtual void set_cache(bool enabled); // internal