Skip to content

Commit

Permalink
for ossrs#293, add http stream cache for audio mp3/aac stream.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Jan 19, 2015
1 parent e6549b2 commit f9f2fcb
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 19 deletions.
104 changes: 96 additions & 8 deletions trunk/src/app/srs_app_http_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,36 @@ int SrsVodStream::serve_flv_stream(ISrsGoHttpResponseWriter* w, SrsHttpMessage*
return ret;
}

SrsStreamCache::SrsStreamCache(SrsSource* s)
{
source = s;
pthread = new SrsThread("http-stream",
this, SRS_CONSTS_RTMP_PULSE_TIMEOUT_US, false);
}

SrsStreamCache::~SrsStreamCache()
{
pthread->stop();
srs_freep(pthread);
}

int SrsStreamCache::start()
{
return pthread->start();
}

int SrsStreamCache::dump_cache(SrsConsumer* consumer)
{
int ret = ERROR_SUCCESS;
return ret;
}

int SrsStreamCache::cycle()
{
int ret = ERROR_SUCCESS;
return ret;
}

ISrsStreamEncoder::ISrsStreamEncoder()
{
}
Expand All @@ -158,7 +188,7 @@ SrsFlvStreamEncoder::~SrsFlvStreamEncoder()
srs_freep(enc);
}

int SrsFlvStreamEncoder::initialize(SrsFileWriter* w)
int SrsFlvStreamEncoder::initialize(SrsFileWriter* w, SrsStreamCache* /*c*/)
{
int ret = ERROR_SUCCESS;

Expand Down Expand Up @@ -189,20 +219,35 @@ int SrsFlvStreamEncoder::write_metadata(int64_t timestamp, char* data, int size)
return enc->write_metadata(timestamp, data, size);
}

bool SrsFlvStreamEncoder::has_cache()
{
// for flv stream, use gop cache of SrsSource is ok.
return false;
}

int SrsFlvStreamEncoder::dump_cache(SrsConsumer* /*consumer*/)
{
// for flv stream, ignore cache.
return ERROR_SUCCESS;
}

SrsAacStreamEncoder::SrsAacStreamEncoder()
{
enc = new SrsAacEncoder();
cache = NULL;
}

SrsAacStreamEncoder::~SrsAacStreamEncoder()
{
srs_freep(enc);
}

int SrsAacStreamEncoder::initialize(SrsFileWriter* w)
int SrsAacStreamEncoder::initialize(SrsFileWriter* w, SrsStreamCache* c)
{
int ret = ERROR_SUCCESS;

cache = c;

if ((ret = enc->initialize(w)) != ERROR_SUCCESS) {
return ret;
}
Expand All @@ -227,20 +272,34 @@ int SrsAacStreamEncoder::write_metadata(int64_t /*timestamp*/, char* /*data*/, i
return ERROR_SUCCESS;
}

bool SrsAacStreamEncoder::has_cache()
{
return true;
}

int SrsAacStreamEncoder::dump_cache(SrsConsumer* consumer)
{
srs_assert(cache);
return cache->dump_cache(consumer);
}

SrsMp3StreamEncoder::SrsMp3StreamEncoder()
{
enc = new SrsMp3Encoder();
cache = NULL;
}

SrsMp3StreamEncoder::~SrsMp3StreamEncoder()
{
srs_freep(enc);
}

int SrsMp3StreamEncoder::initialize(SrsFileWriter* w)
int SrsMp3StreamEncoder::initialize(SrsFileWriter* w, SrsStreamCache* c)
{
int ret = ERROR_SUCCESS;

cache = c;

if ((ret = enc->initialize(w)) != ERROR_SUCCESS) {
return ret;
}
Expand Down Expand Up @@ -269,6 +328,17 @@ int SrsMp3StreamEncoder::write_metadata(int64_t /*timestamp*/, char* /*data*/, i
return ERROR_SUCCESS;
}

bool SrsMp3StreamEncoder::has_cache()
{
return true;
}

int SrsMp3StreamEncoder::dump_cache(SrsConsumer* consumer)
{
srs_assert(cache);
return cache->dump_cache(consumer);
}

SrsStreamWriter::SrsStreamWriter(ISrsGoHttpResponseWriter* w)
{
writer = w;
Expand Down Expand Up @@ -305,9 +375,10 @@ int SrsStreamWriter::write(void* buf, size_t count, ssize_t* pnwrite)
return writer->write((char*)buf, (int)count);
}

SrsLiveStream::SrsLiveStream(SrsSource* s, SrsRequest* r)
SrsLiveStream::SrsLiveStream(SrsSource* s, SrsRequest* r, SrsStreamCache* c)
{
source = s;
cache = c;
req = r->copy();
}

Expand Down Expand Up @@ -339,9 +410,9 @@ int SrsLiveStream::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r)
}
SrsAutoFree(ISrsStreamEncoder, enc);

// create consumer of souce.
// create consumer of souce, ignore gop cache, use the audio gop cache.
SrsConsumer* consumer = NULL;
if ((ret = source->create_consumer(consumer)) != ERROR_SUCCESS) {
if ((ret = source->create_consumer(consumer, !enc->has_cache())) != ERROR_SUCCESS) {
srs_error("http: create consumer failed. ret=%d", ret);
return ret;
}
Expand All @@ -353,10 +424,19 @@ int SrsLiveStream::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r)

// the memory writer.
SrsStreamWriter writer(w);
if ((ret = enc->initialize(&writer)) != ERROR_SUCCESS) {
if ((ret = enc->initialize(&writer, cache)) != ERROR_SUCCESS) {
srs_error("http: initialize stream encoder failed. ret=%d", ret);
return ret;
}

// if gop cache enabled for encoder, dump to consumer.
if (enc->has_cache()) {
if ((ret = enc->dump_cache(consumer)) != ERROR_SUCCESS) {
srs_error("http: dump cache to consumer failed. ret=%d", ret);
return ret;
}
}

while (true) {
// get messages from consumer.
// each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
Expand Down Expand Up @@ -424,6 +504,7 @@ int SrsLiveStream::streaming_send_messages(ISrsStreamEncoder* enc, SrsSharedPtrM
SrsLiveEntry::SrsLiveEntry()
{
stream = NULL;
cache = NULL;
}

SrsHttpServer::SrsHttpServer()
Expand Down Expand Up @@ -485,7 +566,14 @@ int SrsHttpServer::mount(SrsSource* s, SrsRequest* r)
// remove the default vhost mount
mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/");

entry->stream = new SrsLiveStream(s, r);
entry->cache = new SrsStreamCache(s);
entry->stream = new SrsLiveStream(s, r, entry->cache);

// start http stream cache thread
if ((ret = entry->cache->start()) != ERROR_SUCCESS) {
srs_error("http: start stream cache failed. ret=%d", ret);
return ret;
}

// mount the http flv stream.
if ((ret = mux.handle(mount, entry->stream)) != ERROR_SUCCESS) {
Expand Down
65 changes: 60 additions & 5 deletions trunk/src/app/srs_app_http_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_http.hpp>
#include <srs_app_reload.hpp>
#include <srs_kernel_file.hpp>
#include <srs_app_thread.hpp>

class SrsSource;
class SrsRequest;
class SrsConsumer;
class SrsStSocket;
class SrsAacEncoder;
class SrsMp3Encoder;
Expand All @@ -64,6 +66,27 @@ class SrsVodStream : public SrsGoHttpFileServer
virtual int serve_flv_stream(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r, std::string fullpath, int offset);
};

/**
* for the srs http stream cache,
* for example, the audio stream cache to make android(weixin) happy.
* we start a thread to shrink the queue.
*/
class SrsStreamCache : public ISrsThreadHandler
{
private:
SrsSource* source;
SrsThread* pthread;
public:
SrsStreamCache(SrsSource* s);
virtual ~SrsStreamCache();
public:
virtual int start();
virtual int dump_cache(SrsConsumer* consumer);
// interface ISrsThreadHandler.
public:
virtual int cycle();
};

/**
* the stream encoder in some codec, for example, flv or aac.
*/
Expand All @@ -73,10 +96,29 @@ class ISrsStreamEncoder
ISrsStreamEncoder();
virtual ~ISrsStreamEncoder();
public:
virtual int initialize(SrsFileWriter* w) = 0;
/**
* initialize the encoder with file writer(to http response) and stream cache.
* @param w the writer to write to http response.
* @param c the stream cache for audio stream fast startup.
*/
virtual int initialize(SrsFileWriter* w, SrsStreamCache* c) = 0;
/**
* write rtmp video/audio/metadata.
*/
virtual int write_audio(int64_t timestamp, char* data, int size) = 0;
virtual int write_video(int64_t timestamp, char* data, int size) = 0;
virtual int write_metadata(int64_t timestamp, char* data, int size) = 0;
public:
/**
* for some stream, for example, mp3 and aac, the audio stream,
* we use large gop cache in encoder, for the gop cache of SrsSource is ignore audio.
* @return true to use gop cache of encoder; otherwise, use SrsSource.
*/
virtual bool has_cache() = 0;
/**
* dumps the cache of encoder to consumer.
*/
virtual int dump_cache(SrsConsumer* consumer) = 0;
};

/**
Expand All @@ -90,10 +132,13 @@ class SrsFlvStreamEncoder : public ISrsStreamEncoder
SrsFlvStreamEncoder();
virtual ~SrsFlvStreamEncoder();
public:
virtual int initialize(SrsFileWriter* w);
virtual int initialize(SrsFileWriter* w, SrsStreamCache* c);
virtual int write_audio(int64_t timestamp, char* data, int size);
virtual int write_video(int64_t timestamp, char* data, int size);
virtual int write_metadata(int64_t timestamp, char* data, int size);
public:
virtual bool has_cache();
virtual int dump_cache(SrsConsumer* consumer);
};

/**
Expand All @@ -103,14 +148,18 @@ class SrsAacStreamEncoder : public ISrsStreamEncoder
{
private:
SrsAacEncoder* enc;
SrsStreamCache* cache;
public:
SrsAacStreamEncoder();
virtual ~SrsAacStreamEncoder();
public:
virtual int initialize(SrsFileWriter* w);
virtual int initialize(SrsFileWriter* w, SrsStreamCache* c);
virtual int write_audio(int64_t timestamp, char* data, int size);
virtual int write_video(int64_t timestamp, char* data, int size);
virtual int write_metadata(int64_t timestamp, char* data, int size);
public:
virtual bool has_cache();
virtual int dump_cache(SrsConsumer* consumer);
};

/**
Expand All @@ -120,14 +169,18 @@ class SrsMp3StreamEncoder : public ISrsStreamEncoder
{
private:
SrsMp3Encoder* enc;
SrsStreamCache* cache;
public:
SrsMp3StreamEncoder();
virtual ~SrsMp3StreamEncoder();
public:
virtual int initialize(SrsFileWriter* w);
virtual int initialize(SrsFileWriter* w, SrsStreamCache* c);
virtual int write_audio(int64_t timestamp, char* data, int size);
virtual int write_video(int64_t timestamp, char* data, int size);
virtual int write_metadata(int64_t timestamp, char* data, int size);
public:
virtual bool has_cache();
virtual int dump_cache(SrsConsumer* consumer);
};

/**
Expand Down Expand Up @@ -159,8 +212,9 @@ class SrsLiveStream : public ISrsGoHttpHandler
private:
SrsRequest* req;
SrsSource* source;
SrsStreamCache* cache;
public:
SrsLiveStream(SrsSource* s, SrsRequest* r);
SrsLiveStream(SrsSource* s, SrsRequest* r, SrsStreamCache* c);
virtual ~SrsLiveStream();
public:
virtual int serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r);
Expand All @@ -176,6 +230,7 @@ struct SrsLiveEntry
std::string vhost;
std::string mount;
SrsLiveStream* stream;
SrsStreamCache* cache;

SrsLiveEntry();
};
Expand Down
13 changes: 8 additions & 5 deletions trunk/src/app/srs_app_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1702,7 +1702,7 @@ void SrsSource::on_unpublish()
handler->on_unpublish(this, _req);
}

int SrsSource::create_consumer(SrsConsumer*& consumer)
int SrsSource::create_consumer(SrsConsumer*& consumer, bool dump_gop_cache)
{
int ret = ERROR_SUCCESS;

Expand Down Expand Up @@ -1750,12 +1750,15 @@ int SrsSource::create_consumer(SrsConsumer*& consumer)
srs_info("dispatch audio sequence header success");

// copy gop cache to client.
if ((ret = gop_cache->dump(consumer, atc, tba, tbv, ag)) != ERROR_SUCCESS) {
return ret;
if (dump_gop_cache) {
if ((ret = gop_cache->dump(consumer, atc, tba, tbv, ag)) != ERROR_SUCCESS) {
return ret;
}
srs_trace("create consumer, queue_size=%.2f, tba=%d, tbv=%d", queue_size, sample_rate, frame_rate);
} else {
srs_trace("create consumer, ignore gop cache, tba=%d, tbv=%d", sample_rate, frame_rate);
}

srs_trace("create consumer, queue_size=%.2f, tba=%d, tbv=%d", queue_size, sample_rate, frame_rate);

return ret;
}

Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ class SrsSource : public ISrsReloadHandler
virtual void on_unpublish();
// consumer methods
public:
virtual int create_consumer(SrsConsumer*& consumer);
virtual int create_consumer(SrsConsumer*& consumer, bool dump_gop_cache = true);
virtual void on_consumer_destroy(SrsConsumer* consumer);
virtual void set_cache(bool enabled);
// internal
Expand Down

0 comments on commit f9f2fcb

Please # to comment.