From 2698e6dbae307781960add20c07613593e8a0b29 Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 18 Jan 2015 18:39:53 +0800 Subject: [PATCH] for bug #293, http live streaming framework. --- trunk/src/app/srs_app_http_conn.cpp | 34 ++++++++++++++++++++ trunk/src/app/srs_app_http_conn.hpp | 5 +++ trunk/src/app/srs_app_rtmp_conn.cpp | 2 +- trunk/src/app/srs_app_server.cpp | 20 ++++++++++++ trunk/src/app/srs_app_server.hpp | 8 ++++- trunk/src/app/srs_app_source.cpp | 48 +++++++++++++++++++++-------- trunk/src/app/srs_app_source.hpp | 36 ++++++++++++++++++---- 7 files changed, 133 insertions(+), 20 deletions(-) diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index 8ef0cbb4ff..75ae1eb1d9 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -41,6 +41,7 @@ using namespace std; #include #include #include +#include SrsVodStream::SrsVodStream(string root_dir) : SrsGoHttpFileServer(root_dir) @@ -177,6 +178,39 @@ int SrsHttpServer::initialize() return ret; } +int SrsHttpServer::mount(SrsSource* s, SrsRequest* r) +{ + int ret = ERROR_SUCCESS; + + if (flvs.empty()) { + srs_info("ignore mount, no flv stream configed."); + return ret; + } + + if (flvs.find(r->vhost) == flvs.end()) { + srs_info("ignore mount flv stream for disabled"); + return ret; + } + + // TODO: FIXME: implements it. + return ret; +} + +void SrsHttpServer::unmount(SrsSource* s, SrsRequest* r) +{ + if (flvs.empty()) { + srs_info("ignore unmount, no flv stream configed."); + return; + } + + if (flvs.find(r->vhost) == flvs.end()) { + srs_info("ignore unmount flv stream for disabled"); + return; + } + + // TODO: FIXME: implements it. +} + int SrsHttpServer::on_reload_vhost_http_updated() { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index 7fbeae3f48..9f66638cd4 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -37,6 +37,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +class SrsSource; +class SrsRequest; class SrsStSocket; class SrsHttpParser; class SrsHttpMessage; @@ -85,6 +87,9 @@ class SrsHttpServer : public ISrsReloadHandler virtual ~SrsHttpServer(); public: virtual int initialize(); +public: + virtual int mount(SrsSource* s, SrsRequest* r); + virtual void unmount(SrsSource* s, SrsRequest* r); // interface ISrsThreadHandler. public: virtual int on_reload_vhost_http_updated(); diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 647876af40..d9e96de4c9 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -393,7 +393,7 @@ int SrsRtmpConn::stream_service_cycle() // find a source to serve. SrsSource* source = NULL; - if ((ret = SrsSource::find(req, &source)) != ERROR_SUCCESS) { + if ((ret = SrsSource::find(req, server, &source)) != ERROR_SUCCESS) { return ret; } srs_assert(source != NULL); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 87ad4d596a..193d613dcd 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -1074,3 +1074,23 @@ int SrsServer::on_reload_http_stream_updated() return ret; } +int SrsServer::on_publish(SrsSource* s, SrsRequest* r) +{ + int ret = ERROR_SUCCESS; + +#ifdef SRS_AUTO_HTTP_SERVER + if ((ret = http_stream_mux->mount(s, r)) != ERROR_SUCCESS) { + return ret; + } +#endif + + return ret; +} + +void SrsServer::on_unpublish(SrsSource* s, SrsRequest* r) +{ +#ifdef SRS_AUTO_HTTP_SERVER + http_stream_mux->unmount(s, r); +#endif +} + diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index d88b1f5585..d99426de6a 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -35,6 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include class SrsServer; class SrsConnection; @@ -113,7 +114,8 @@ class SrsSignalManager : public ISrsThreadHandler * SRS RTMP server, initialize and listen, * start connection service thread, destroy client. */ -class SrsServer : public ISrsReloadHandler +class SrsServer : virtual public ISrsReloadHandler + , virtual public ISrsSourceHandler { private: #ifdef SRS_AUTO_HTTP_API @@ -241,6 +243,10 @@ class SrsServer : public ISrsReloadHandler virtual int on_reload_http_stream_enabled(); virtual int on_reload_http_stream_disabled(); virtual int on_reload_http_stream_updated(); +// interface ISrsSourceHandler +public: + virtual int on_publish(SrsSource* s, SrsRequest* r); + virtual void on_unpublish(SrsSource* s, SrsRequest* r); }; #endif diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index af2c7162a4..d6f5d893ec 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -672,24 +672,33 @@ bool SrsGopCache::pure_audio() return cached_video_count == 0; } +ISrsSourceHandler::ISrsSourceHandler() +{ +} + +ISrsSourceHandler::~ISrsSourceHandler() +{ +} + std::map SrsSource::pool; -int SrsSource::find(SrsRequest* req, SrsSource** ppsource) +int SrsSource::find(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps) { int ret = ERROR_SUCCESS; - string stream_url = req->get_stream_url(); - string vhost = req->vhost; + string stream_url = r->get_stream_url(); + string vhost = r->vhost; if (pool.find(stream_url) == pool.end()) { - SrsSource* source = new SrsSource(req); - if ((ret = source->initialize()) != ERROR_SUCCESS) { + SrsSource* source = new SrsSource(); + if ((ret = source->initialize(r, h)) != ERROR_SUCCESS) { srs_freep(source); return ret; } pool[stream_url] = source; - srs_info("create new source for url=%s, vhost=%s", stream_url.c_str(), vhost.c_str()); + srs_info("create new source for url=%s, vhost=%s", + stream_url.c_str(), vhost.c_str()); } // we always update the request of resource, @@ -697,8 +706,8 @@ int SrsSource::find(SrsRequest* req, SrsSource** ppsource) // and we only need to update the token of request, it's simple. if (true) { SrsSource* source = pool[stream_url]; - source->_req->update_auth(req); - *ppsource = source; + source->_req->update_auth(r); + *pps = source; } return ret; @@ -714,9 +723,9 @@ void SrsSource::destroy() pool.clear(); } -SrsSource::SrsSource(SrsRequest* req) +SrsSource::SrsSource() { - _req = req->copy(); + _req = NULL; jitter_algorithm = SrsRtmpJitterAlgorithmOFF; #ifdef SRS_AUTO_HLS @@ -741,7 +750,7 @@ SrsSource::SrsSource(SrsRequest* req) aggregate_stream = new SrsStream(); _srs_config->subscribe(this); - atc = _srs_config->get_atc(_req->vhost); + atc = false; } SrsSource::~SrsSource() @@ -783,10 +792,14 @@ SrsSource::~SrsSource() srs_freep(_req); } -int SrsSource::initialize() +int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h) { int ret = ERROR_SUCCESS; + handler = h; + _req = r->copy(); + atc = _srs_config->get_atc(_req->vhost); + #ifdef SRS_AUTO_DVR if ((ret = dvr->initialize(_req)) != ERROR_SUCCESS) { return ret; @@ -1643,6 +1656,13 @@ int SrsSource::on_publish() } #endif + // notify the handler. + srs_assert(handler); + if ((ret = handler->on_publish(this, _req)) != ERROR_SUCCESS) { + srs_error("handle on publish failed. ret=%d", ret); + return ret; + } + return ret; } @@ -1676,6 +1696,10 @@ void SrsSource::on_unpublish() _can_publish = true; _source_id = -1; + + // notify the handler. + srs_assert(handler); + handler->on_unpublish(this, _req); } int SrsSource::create_consumer(SrsConsumer*& consumer) diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 145adb8ebc..a482a5e136 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -336,6 +336,27 @@ class SrsGopCache virtual bool pure_audio(); }; +/** +* the handler to handle the event of srs source. +* for example, the http flv streaming module handle the event and +* mount http when rtmp start publishing. +*/ +class ISrsSourceHandler +{ +public: + ISrsSourceHandler(); + virtual ~ISrsSourceHandler(); +public: + /** + * when stream start publish, mount stream. + */ + virtual int on_publish(SrsSource* s, SrsRequest* r) = 0; + /** + * when stream stop publish, unmount stream. + */ + virtual void on_unpublish(SrsSource* s, SrsRequest* r) = 0; +}; + /** * live streaming source. */ @@ -346,11 +367,11 @@ class SrsSource : public ISrsReloadHandler public: /** * find stream by vhost/app/stream. - * @param req the client request. - * @param ppsource the matched source, if success never be NULL. - * @remark stream_url should without port and schema. + * @param r the client request. + * @param h the event handler for source. + * @param pps the matched source, if success never be NULL. */ - static int find(SrsRequest* req, SrsSource** ppsource); + static int find(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps); /** * when system exit, destroy the sources, * for gmc to analysis mem leaks. @@ -390,6 +411,8 @@ class SrsSource : public ISrsReloadHandler std::vector forwarders; // for aggregate message SrsStream* aggregate_stream; + // the event handler. + ISrsSourceHandler* handler; private: /** * the sample rate of audio in metadata. @@ -421,10 +444,11 @@ class SrsSource : public ISrsReloadHandler * @param _req the client request object, * this object will deep copy it for reload. */ - SrsSource(SrsRequest* req); + SrsSource(); virtual ~SrsSource(); +// initialize, get and setter. public: - virtual int initialize(); + virtual int initialize(SrsRequest* r, ISrsSourceHandler* h); // interface ISrsReloadHandler public: virtual int on_reload_vhost_atc(std::string vhost);