Skip to content

Commit

Permalink
for ossrs#293, support rtmp remux to http flv live stream.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Jan 18, 2015
1 parent faaa918 commit eea31ef
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 3 deletions.
134 changes: 131 additions & 3 deletions trunk/src/app/srs_app_http_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ using namespace std;
#include <srs_kernel_file.hpp>
#include <srs_kernel_flv.hpp>
#include <srs_protocol_rtmp.hpp>
#include <srs_app_source.hpp>
#include <srs_protocol_msg_array.hpp>

SrsVodStream::SrsVodStream(string root_dir)
: SrsGoHttpFileServer(root_dir)
Expand Down Expand Up @@ -136,6 +138,42 @@ int SrsVodStream::serve_flv_stream(ISrsGoHttpResponseWriter* w, SrsHttpMessage*
return ret;
}

SrsFlvStreamWriter::SrsFlvStreamWriter(ISrsGoHttpResponseWriter* w)
{
writer = w;
}

SrsFlvStreamWriter::~SrsFlvStreamWriter()
{
}

int SrsFlvStreamWriter::open(std::string /*file*/)
{
return ERROR_SUCCESS;
}

void SrsFlvStreamWriter::close()
{
}

bool SrsFlvStreamWriter::is_open()
{
return true;
}

int64_t SrsFlvStreamWriter::tellg()
{
return 0;
}

int SrsFlvStreamWriter::write(void* buf, size_t count, ssize_t* pnwrite)
{
if (pnwrite) {
*pnwrite = count;
}
return writer->write((char*)buf, (int)count);
}

SrsLiveStream::SrsLiveStream(SrsSource* s, SrsRequest* r)
{
source = s;
Expand All @@ -150,7 +188,97 @@ SrsLiveStream::~SrsLiveStream()
int SrsLiveStream::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r)
{
int ret = ERROR_SUCCESS;
// TODO: FIMXE: implements it.

// create consumer of souce.
SrsConsumer* consumer = NULL;
if ((ret = source->create_consumer(consumer)) != ERROR_SUCCESS) {
srs_error("http: create consumer failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsConsumer, consumer);
srs_verbose("http: consumer created success.");

SrsMessageArray msgs(SRS_PERF_MW_MSGS);
// TODO: FIMXE: add pithy print.

// write http header for ts.
w->header()->set_content_length((int64_t)2 * 1024 * 1024 * 1024);
w->header()->set_content_type("video/x-flv");

// the memory writer.
SrsFlvStreamWriter writer(w);

SrsFlvEncoder enc;
if ((ret = enc.initialize(&writer)) != ERROR_SUCCESS) {
return ret;
}

// write flv header.
if ((ret = enc.write_header()) != ERROR_SUCCESS) {
return ret;
}

while (true) {
// get messages from consumer.
// each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
int count = 0;
if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) {
srs_error("get messages from consumer failed. ret=%d", ret);
return ret;
}

if (count <= 0) {
srs_info("mw sleep %dms for no msg", mw_sleep);
// directly use sleep, donot use consumer wait.
st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);

// ignore when nothing got.
continue;
}
srs_info("got %d msgs, min=%d, mw=%d", count,
SRS_PERF_MW_MIN_MSGS, SRS_CONSTS_RTMP_PULSE_TIMEOUT_US / 1000);

// sendout all messages.
ret = send_messages(&enc, msgs.msgs, count);

// free the messages.
for (int i = 0; i < count; i++) {
SrsSharedPtrMessage* msg = msgs.msgs[i];
srs_freep(msg);
}

// check send error code.
if (ret != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("send messages to client failed. ret=%d", ret);
}
return ret;
}
}

return ret;
}

int SrsLiveStream::send_messages(SrsFlvEncoder* enc, SrsSharedPtrMessage** msgs, int nb_msgs)
{
int ret = ERROR_SUCCESS;

for (int i = 0; i < nb_msgs; i++) {
SrsSharedPtrMessage* msg = msgs[i];

if (msg->is_audio()) {
ret = enc->write_audio(msg->timestamp, msg->payload, msg->size);
} else if (msg->is_video()) {
ret = enc->write_video(msg->timestamp, msg->payload, msg->size);
} else {
ret = enc->write_metadata(msg->timestamp, msg->payload, msg->size);
}

if (ret != ERROR_SUCCESS) {
return ret;
}
}

return ret;
}

Expand Down Expand Up @@ -198,7 +326,7 @@ int SrsHttpServer::mount(SrsSource* s, SrsRequest* r)
mount = srs_string_replace(mount, "[stream]", r->stream);

// remove the default vhost mount
mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "");
mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/");

// mount the http flv stream.
if ((ret = mux.handle(mount, new SrsLiveStream(s, r))) != ERROR_SUCCESS) {
Expand Down Expand Up @@ -261,7 +389,7 @@ int SrsHttpServer::mount_static_file()
mount = srs_string_replace(mount, "[vhost]", vhost);

// remove the default vhost mount
mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "");
mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/");

// the dir mount must always ends with "/"
if (mount != "/" && mount.rfind("/") != mount.length() - 1) {
Expand Down
25 changes: 25 additions & 0 deletions trunk/src/app/srs_app_http_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,16 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_conn.hpp>
#include <srs_app_http.hpp>
#include <srs_app_reload.hpp>
#include <srs_kernel_file.hpp>

class SrsSource;
class SrsRequest;
class SrsStSocket;
class SrsFlvEncoder;
class SrsHttpParser;
class SrsHttpMessage;
class SrsHttpHandler;
class SrsSharedPtrMessage;

/**
* the flv vod stream supports flv?start=offset-bytes.
Expand All @@ -59,6 +62,26 @@ class SrsVodStream : public SrsGoHttpFileServer
virtual int serve_flv_stream(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r, std::string fullpath, int offset);
};

/**
* write stream to http response direclty.
*/
class SrsFlvStreamWriter : public SrsFileWriter
{
private:
ISrsGoHttpResponseWriter* writer;
public:
SrsFlvStreamWriter(ISrsGoHttpResponseWriter* w);
virtual ~SrsFlvStreamWriter();
public:
virtual int open(std::string file);
virtual void close();
public:
virtual bool is_open();
virtual int64_t tellg();
public:
virtual int write(void* buf, size_t count, ssize_t* pnwrite);
};

/**
* the flv live stream supports access rtmp in flv over http.
* srs will remux rtmp to flv streaming.
Expand All @@ -73,6 +96,8 @@ class SrsLiveStream : public ISrsGoHttpHandler
virtual ~SrsLiveStream();
public:
virtual int serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r);
private:
virtual int send_messages(SrsFlvEncoder* enc, SrsSharedPtrMessage** msgs, int nb_msgs);
};

/**
Expand Down

0 comments on commit eea31ef

Please # to comment.