Skip to content

Commit

Permalink
for bug ossrs#293, http live streaming framework.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Jan 18, 2015
1 parent 9bf408a commit 2698e6d
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 20 deletions.
34 changes: 34 additions & 0 deletions trunk/src/app/srs_app_http_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ using namespace std;
#include <srs_kernel_utility.hpp>
#include <srs_kernel_file.hpp>
#include <srs_kernel_flv.hpp>
#include <srs_protocol_rtmp.hpp>

SrsVodStream::SrsVodStream(string root_dir)
: SrsGoHttpFileServer(root_dir)
Expand Down Expand Up @@ -177,6 +178,39 @@ int SrsHttpServer::initialize()
return ret;
}

int SrsHttpServer::mount(SrsSource* s, SrsRequest* r)
{
int ret = ERROR_SUCCESS;

if (flvs.empty()) {
srs_info("ignore mount, no flv stream configed.");
return ret;
}

if (flvs.find(r->vhost) == flvs.end()) {
srs_info("ignore mount flv stream for disabled");
return ret;
}

// TODO: FIXME: implements it.
return ret;
}

void SrsHttpServer::unmount(SrsSource* s, SrsRequest* r)
{
if (flvs.empty()) {
srs_info("ignore unmount, no flv stream configed.");
return;
}

if (flvs.find(r->vhost) == flvs.end()) {
srs_info("ignore unmount flv stream for disabled");
return;
}

// TODO: FIXME: implements it.
}

int SrsHttpServer::on_reload_vhost_http_updated()
{
int ret = ERROR_SUCCESS;
Expand Down
5 changes: 5 additions & 0 deletions trunk/src/app/srs_app_http_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_http.hpp>
#include <srs_app_reload.hpp>

class SrsSource;
class SrsRequest;
class SrsStSocket;
class SrsHttpParser;
class SrsHttpMessage;
Expand Down Expand Up @@ -85,6 +87,9 @@ class SrsHttpServer : public ISrsReloadHandler
virtual ~SrsHttpServer();
public:
virtual int initialize();
public:
virtual int mount(SrsSource* s, SrsRequest* r);
virtual void unmount(SrsSource* s, SrsRequest* r);
// interface ISrsThreadHandler.
public:
virtual int on_reload_vhost_http_updated();
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_rtmp_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ int SrsRtmpConn::stream_service_cycle()

// find a source to serve.
SrsSource* source = NULL;
if ((ret = SrsSource::find(req, &source)) != ERROR_SUCCESS) {
if ((ret = SrsSource::find(req, server, &source)) != ERROR_SUCCESS) {
return ret;
}
srs_assert(source != NULL);
Expand Down
20 changes: 20 additions & 0 deletions trunk/src/app/srs_app_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1074,3 +1074,23 @@ int SrsServer::on_reload_http_stream_updated()
return ret;
}

int SrsServer::on_publish(SrsSource* s, SrsRequest* r)
{
int ret = ERROR_SUCCESS;

#ifdef SRS_AUTO_HTTP_SERVER
if ((ret = http_stream_mux->mount(s, r)) != ERROR_SUCCESS) {
return ret;
}
#endif

return ret;
}

void SrsServer::on_unpublish(SrsSource* s, SrsRequest* r)
{
#ifdef SRS_AUTO_HTTP_SERVER
http_stream_mux->unmount(s, r);
#endif
}

8 changes: 7 additions & 1 deletion trunk/src/app/srs_app_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_st.hpp>
#include <srs_app_reload.hpp>
#include <srs_app_thread.hpp>
#include <srs_app_source.hpp>

class SrsServer;
class SrsConnection;
Expand Down Expand Up @@ -113,7 +114,8 @@ class SrsSignalManager : public ISrsThreadHandler
* SRS RTMP server, initialize and listen,
* start connection service thread, destroy client.
*/
class SrsServer : public ISrsReloadHandler
class SrsServer : virtual public ISrsReloadHandler
, virtual public ISrsSourceHandler
{
private:
#ifdef SRS_AUTO_HTTP_API
Expand Down Expand Up @@ -241,6 +243,10 @@ class SrsServer : public ISrsReloadHandler
virtual int on_reload_http_stream_enabled();
virtual int on_reload_http_stream_disabled();
virtual int on_reload_http_stream_updated();
// interface ISrsSourceHandler
public:
virtual int on_publish(SrsSource* s, SrsRequest* r);
virtual void on_unpublish(SrsSource* s, SrsRequest* r);
};

#endif
48 changes: 36 additions & 12 deletions trunk/src/app/srs_app_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -672,33 +672,42 @@ bool SrsGopCache::pure_audio()
return cached_video_count == 0;
}

ISrsSourceHandler::ISrsSourceHandler()
{
}

ISrsSourceHandler::~ISrsSourceHandler()
{
}

std::map<std::string, SrsSource*> SrsSource::pool;

int SrsSource::find(SrsRequest* req, SrsSource** ppsource)
int SrsSource::find(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps)
{
int ret = ERROR_SUCCESS;

string stream_url = req->get_stream_url();
string vhost = req->vhost;
string stream_url = r->get_stream_url();
string vhost = r->vhost;

if (pool.find(stream_url) == pool.end()) {
SrsSource* source = new SrsSource(req);
if ((ret = source->initialize()) != ERROR_SUCCESS) {
SrsSource* source = new SrsSource();
if ((ret = source->initialize(r, h)) != ERROR_SUCCESS) {
srs_freep(source);
return ret;
}

pool[stream_url] = source;
srs_info("create new source for url=%s, vhost=%s", stream_url.c_str(), vhost.c_str());
srs_info("create new source for url=%s, vhost=%s",
stream_url.c_str(), vhost.c_str());
}

// we always update the request of resource,
// for origin auth is on, the token in request maybe invalid,
// and we only need to update the token of request, it's simple.
if (true) {
SrsSource* source = pool[stream_url];
source->_req->update_auth(req);
*ppsource = source;
source->_req->update_auth(r);
*pps = source;
}

return ret;
Expand All @@ -714,9 +723,9 @@ void SrsSource::destroy()
pool.clear();
}

SrsSource::SrsSource(SrsRequest* req)
SrsSource::SrsSource()
{
_req = req->copy();
_req = NULL;
jitter_algorithm = SrsRtmpJitterAlgorithmOFF;

#ifdef SRS_AUTO_HLS
Expand All @@ -741,7 +750,7 @@ SrsSource::SrsSource(SrsRequest* req)
aggregate_stream = new SrsStream();

_srs_config->subscribe(this);
atc = _srs_config->get_atc(_req->vhost);
atc = false;
}

SrsSource::~SrsSource()
Expand Down Expand Up @@ -783,10 +792,14 @@ SrsSource::~SrsSource()
srs_freep(_req);
}

int SrsSource::initialize()
int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h)
{
int ret = ERROR_SUCCESS;

handler = h;
_req = r->copy();
atc = _srs_config->get_atc(_req->vhost);

#ifdef SRS_AUTO_DVR
if ((ret = dvr->initialize(_req)) != ERROR_SUCCESS) {
return ret;
Expand Down Expand Up @@ -1643,6 +1656,13 @@ int SrsSource::on_publish()
}
#endif

// notify the handler.
srs_assert(handler);
if ((ret = handler->on_publish(this, _req)) != ERROR_SUCCESS) {
srs_error("handle on publish failed. ret=%d", ret);
return ret;
}

return ret;
}

Expand Down Expand Up @@ -1676,6 +1696,10 @@ void SrsSource::on_unpublish()

_can_publish = true;
_source_id = -1;

// notify the handler.
srs_assert(handler);
handler->on_unpublish(this, _req);
}

int SrsSource::create_consumer(SrsConsumer*& consumer)
Expand Down
36 changes: 30 additions & 6 deletions trunk/src/app/srs_app_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,27 @@ class SrsGopCache
virtual bool pure_audio();
};

/**
* the handler to handle the event of srs source.
* for example, the http flv streaming module handle the event and
* mount http when rtmp start publishing.
*/
class ISrsSourceHandler
{
public:
ISrsSourceHandler();
virtual ~ISrsSourceHandler();
public:
/**
* when stream start publish, mount stream.
*/
virtual int on_publish(SrsSource* s, SrsRequest* r) = 0;
/**
* when stream stop publish, unmount stream.
*/
virtual void on_unpublish(SrsSource* s, SrsRequest* r) = 0;
};

/**
* live streaming source.
*/
Expand All @@ -346,11 +367,11 @@ class SrsSource : public ISrsReloadHandler
public:
/**
* find stream by vhost/app/stream.
* @param req the client request.
* @param ppsource the matched source, if success never be NULL.
* @remark stream_url should without port and schema.
* @param r the client request.
* @param h the event handler for source.
* @param pps the matched source, if success never be NULL.
*/
static int find(SrsRequest* req, SrsSource** ppsource);
static int find(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps);
/**
* when system exit, destroy the sources,
* for gmc to analysis mem leaks.
Expand Down Expand Up @@ -390,6 +411,8 @@ class SrsSource : public ISrsReloadHandler
std::vector<SrsForwarder*> forwarders;
// for aggregate message
SrsStream* aggregate_stream;
// the event handler.
ISrsSourceHandler* handler;
private:
/**
* the sample rate of audio in metadata.
Expand Down Expand Up @@ -421,10 +444,11 @@ class SrsSource : public ISrsReloadHandler
* @param _req the client request object,
* this object will deep copy it for reload.
*/
SrsSource(SrsRequest* req);
SrsSource();
virtual ~SrsSource();
// initialize, get and setter.
public:
virtual int initialize();
virtual int initialize(SrsRequest* r, ISrsSourceHandler* h);
// interface ISrsReloadHandler
public:
virtual int on_reload_vhost_atc(std::string vhost);
Expand Down

0 comments on commit 2698e6d

Please # to comment.