diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index 426f0cdd4b..94fcfd5051 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -42,6 +42,8 @@ using namespace std; #include #include #include +#include +#include SrsVodStream::SrsVodStream(string root_dir) : SrsGoHttpFileServer(root_dir) @@ -136,6 +138,42 @@ int SrsVodStream::serve_flv_stream(ISrsGoHttpResponseWriter* w, SrsHttpMessage* return ret; } +SrsFlvStreamWriter::SrsFlvStreamWriter(ISrsGoHttpResponseWriter* w) +{ + writer = w; +} + +SrsFlvStreamWriter::~SrsFlvStreamWriter() +{ +} + +int SrsFlvStreamWriter::open(std::string /*file*/) +{ + return ERROR_SUCCESS; +} + +void SrsFlvStreamWriter::close() +{ +} + +bool SrsFlvStreamWriter::is_open() +{ + return true; +} + +int64_t SrsFlvStreamWriter::tellg() +{ + return 0; +} + +int SrsFlvStreamWriter::write(void* buf, size_t count, ssize_t* pnwrite) +{ + if (pnwrite) { + *pnwrite = count; + } + return writer->write((char*)buf, (int)count); +} + SrsLiveStream::SrsLiveStream(SrsSource* s, SrsRequest* r) { source = s; @@ -150,7 +188,97 @@ SrsLiveStream::~SrsLiveStream() int SrsLiveStream::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r) { int ret = ERROR_SUCCESS; - // TODO: FIMXE: implements it. + + // create consumer of souce. + SrsConsumer* consumer = NULL; + if ((ret = source->create_consumer(consumer)) != ERROR_SUCCESS) { + srs_error("http: create consumer failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsConsumer, consumer); + srs_verbose("http: consumer created success."); + + SrsMessageArray msgs(SRS_PERF_MW_MSGS); + // TODO: FIMXE: add pithy print. + + // write http header for ts. + w->header()->set_content_length((int64_t)2 * 1024 * 1024 * 1024); + w->header()->set_content_type("video/x-flv"); + + // the memory writer. + SrsFlvStreamWriter writer(w); + + SrsFlvEncoder enc; + if ((ret = enc.initialize(&writer)) != ERROR_SUCCESS) { + return ret; + } + + // write flv header. + if ((ret = enc.write_header()) != ERROR_SUCCESS) { + return ret; + } + + while (true) { + // get messages from consumer. + // each msg in msgs.msgs must be free, for the SrsMessageArray never free them. + int count = 0; + if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) { + srs_error("get messages from consumer failed. ret=%d", ret); + return ret; + } + + if (count <= 0) { + srs_info("mw sleep %dms for no msg", mw_sleep); + // directly use sleep, donot use consumer wait. + st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); + + // ignore when nothing got. + continue; + } + srs_info("got %d msgs, min=%d, mw=%d", count, + SRS_PERF_MW_MIN_MSGS, SRS_CONSTS_RTMP_PULSE_TIMEOUT_US / 1000); + + // sendout all messages. + ret = send_messages(&enc, msgs.msgs, count); + + // free the messages. + for (int i = 0; i < count; i++) { + SrsSharedPtrMessage* msg = msgs.msgs[i]; + srs_freep(msg); + } + + // check send error code. + if (ret != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("send messages to client failed. ret=%d", ret); + } + return ret; + } + } + + return ret; +} + +int SrsLiveStream::send_messages(SrsFlvEncoder* enc, SrsSharedPtrMessage** msgs, int nb_msgs) +{ + int ret = ERROR_SUCCESS; + + for (int i = 0; i < nb_msgs; i++) { + SrsSharedPtrMessage* msg = msgs[i]; + + if (msg->is_audio()) { + ret = enc->write_audio(msg->timestamp, msg->payload, msg->size); + } else if (msg->is_video()) { + ret = enc->write_video(msg->timestamp, msg->payload, msg->size); + } else { + ret = enc->write_metadata(msg->timestamp, msg->payload, msg->size); + } + + if (ret != ERROR_SUCCESS) { + return ret; + } + } + return ret; } @@ -198,7 +326,7 @@ int SrsHttpServer::mount(SrsSource* s, SrsRequest* r) mount = srs_string_replace(mount, "[stream]", r->stream); // remove the default vhost mount - mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", ""); + mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/"); // mount the http flv stream. if ((ret = mux.handle(mount, new SrsLiveStream(s, r))) != ERROR_SUCCESS) { @@ -261,7 +389,7 @@ int SrsHttpServer::mount_static_file() mount = srs_string_replace(mount, "[vhost]", vhost); // remove the default vhost mount - mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", ""); + mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/"); // the dir mount must always ends with "/" if (mount != "/" && mount.rfind("/") != mount.length() - 1) { diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index 4c9010cfdb..d26f618989 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -36,13 +36,16 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include class SrsSource; class SrsRequest; class SrsStSocket; +class SrsFlvEncoder; class SrsHttpParser; class SrsHttpMessage; class SrsHttpHandler; +class SrsSharedPtrMessage; /** * the flv vod stream supports flv?start=offset-bytes. @@ -59,6 +62,26 @@ class SrsVodStream : public SrsGoHttpFileServer virtual int serve_flv_stream(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r, std::string fullpath, int offset); }; +/** +* write stream to http response direclty. +*/ +class SrsFlvStreamWriter : public SrsFileWriter +{ +private: + ISrsGoHttpResponseWriter* writer; +public: + SrsFlvStreamWriter(ISrsGoHttpResponseWriter* w); + virtual ~SrsFlvStreamWriter(); +public: + virtual int open(std::string file); + virtual void close(); +public: + virtual bool is_open(); + virtual int64_t tellg(); +public: + virtual int write(void* buf, size_t count, ssize_t* pnwrite); +}; + /** * the flv live stream supports access rtmp in flv over http. * srs will remux rtmp to flv streaming. @@ -73,6 +96,8 @@ class SrsLiveStream : public ISrsGoHttpHandler virtual ~SrsLiveStream(); public: virtual int serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r); +private: + virtual int send_messages(SrsFlvEncoder* enc, SrsSharedPtrMessage** msgs, int nb_msgs); }; /**