From 98c6988bf3295b1b9b5e3b26057798aff46a997f Mon Sep 17 00:00:00 2001 From: winlin Date: Sat, 3 Sep 2022 22:04:13 +0800 Subject: [PATCH] WebRTC: Support reuse HTTP port for WebRTC over TCP. --- trunk/src/app/srs_app_caster_flv.cpp | 4 - trunk/src/app/srs_app_conn.cpp | 139 ++++++++++++++++-- trunk/src/app/srs_app_conn.hpp | 37 ++++- trunk/src/app/srs_app_http_conn.cpp | 26 +--- trunk/src/app/srs_app_http_conn.hpp | 10 +- trunk/src/app/srs_app_http_stream.cpp | 21 +-- trunk/src/app/srs_app_rtc_network.cpp | 4 +- trunk/src/app/srs_app_rtc_network.hpp | 4 +- trunk/src/app/srs_app_rtc_server.cpp | 19 ++- trunk/src/app/srs_app_rtmp_conn.cpp | 4 - trunk/src/app/srs_app_server.cpp | 93 ++++++++---- trunk/src/app/srs_app_server.hpp | 6 +- trunk/src/protocol/srs_protocol_http_conn.cpp | 7 +- trunk/src/protocol/srs_protocol_st.cpp | 71 +++++---- trunk/src/protocol/srs_protocol_st.hpp | 12 +- trunk/src/utest/srs_utest_service.cpp | 38 ++--- 16 files changed, 311 insertions(+), 184 deletions(-) diff --git a/trunk/src/app/srs_app_caster_flv.cpp b/trunk/src/app/srs_app_caster_flv.cpp index 7c5db1d3b8f..8c0762136e2 100644 --- a/trunk/src/app/srs_app_caster_flv.cpp +++ b/trunk/src/app/srs_app_caster_flv.cpp @@ -290,10 +290,6 @@ srs_error_t SrsDynamicHttpConn::start() return srs_error_wrap(err, "set cors=%d", v); } - if ((err = skt->initialize()) != srs_success) { - return srs_error_wrap(err, "init socket"); - } - return conn->start(); } diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index 394baa05f55..1f30ca86ccc 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -18,7 +18,7 @@ using namespace std; #include #include #include - +#include #include SrsPps* _srs_pps_ids = NULL; @@ -417,7 +417,7 @@ ISrsExpire::~ISrsExpire() SrsTcpConnection::SrsTcpConnection(srs_netfd_t c) { stfd = c; - skt = new SrsStSocket(); + skt = new SrsStSocket(c); } SrsTcpConnection::~SrsTcpConnection() @@ -426,17 +426,6 @@ SrsTcpConnection::~SrsTcpConnection() srs_close_stfd(stfd); } -srs_error_t SrsTcpConnection::initialize() -{ - srs_error_t err = srs_success; - - if ((err = skt->initialize(stfd)) != srs_success) { - return srs_error_wrap(err, "init socket"); - } - - return err; -} - srs_error_t SrsTcpConnection::set_tcp_nodelay(bool v) { srs_error_t err = srs_success; @@ -570,6 +559,130 @@ srs_error_t SrsTcpConnection::writev(const iovec *iov, int iov_size, ssize_t* nw return skt->writev(iov, iov_size, nwrite); } +SrsBufferedReader::SrsBufferedReader(ISrsProtocolReadWriter* io) +{ + io_ = io; + buf_ = NULL; +} + +SrsBufferedReader::~SrsBufferedReader() +{ + srs_freep(buf_); +} + +srs_error_t SrsBufferedReader::peek(char* buf, int* size) +{ + srs_error_t err = srs_success; + + if ((err = reload_buffer()) != srs_success) { + return srs_error_wrap(err, "reload buffer"); + } + + int nn = srs_min(buf_->left(), *size); + *size = nn; + + if (nn) { + memcpy(buf, buf_->head(), nn); + } + + return err; +} + +srs_error_t SrsBufferedReader::reload_buffer() +{ + srs_error_t err = srs_success; + + if (buf_ && !buf_->empty()) { + return err; + } + + // We use read_fully to always full fill the cache, to avoid peeking failed. + ssize_t nread = 0; + if ((err = io_->read_fully(cache_, sizeof(cache_), &nread)) != srs_success) { + return srs_error_wrap(err, "read"); + } + + srs_freep(buf_); + buf_ = new SrsBuffer(cache_, nread); + + return err; +} + +srs_error_t SrsBufferedReader::read(void* buf, size_t size, ssize_t* nread) +{ + if (!buf_ || buf_->empty()) { + return io_->read(buf, size, nread); + } + + int nn = srs_min(buf_->left(), size); + *nread = nn; + + if (nn) { + buf_->read_bytes((char*)buf, nn); + } + return srs_success; +} + +srs_error_t SrsBufferedReader::read_fully(void* buf, size_t size, ssize_t* nread) +{ + if (!buf_ || buf_->empty()) { + return io_->read_fully(buf, size, nread); + } + + int nn = srs_min(buf_->left(), size); + if (nn) { + buf_->read_bytes((char*)buf, nn); + } + + int left = size - nn; + *nread = size; + + if (left) { + return io_->read_fully((char*)buf + nn, left, NULL); + } + return srs_success; +} + +void SrsBufferedReader::set_recv_timeout(srs_utime_t tm) +{ + return io_->set_recv_timeout(tm); +} + +srs_utime_t SrsBufferedReader::get_recv_timeout() +{ + return io_->get_recv_timeout(); +} + +int64_t SrsBufferedReader::get_recv_bytes() +{ + return io_->get_recv_bytes(); +} + +int64_t SrsBufferedReader::get_send_bytes() +{ + return io_->get_send_bytes(); +} + +void SrsBufferedReader::set_send_timeout(srs_utime_t tm) +{ + return io_->set_send_timeout(tm); +} + +srs_utime_t SrsBufferedReader::get_send_timeout() +{ + return io_->get_send_timeout(); +} + +srs_error_t SrsBufferedReader::write(void* buf, size_t size, ssize_t* nwrite) +{ + return io_->write(buf, size, nwrite); +} + +srs_error_t SrsBufferedReader::writev(const iovec *iov, int iov_size, ssize_t* nwrite) +{ + return io_->writev(iov, iov_size, nwrite); +} + SrsSslConnection::SrsSslConnection(ISrsProtocolReadWriter* c) { transport = c; diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index 6cba505496b..99255cc98b4 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -21,6 +21,7 @@ #include class SrsWallClock; +class SrsBuffer; // Hooks for connection manager, to handle the event when disposing connections. class ISrsDisposingHandler @@ -148,8 +149,6 @@ class SrsTcpConnection : public ISrsProtocolReadWriter public: SrsTcpConnection(srs_netfd_t c); virtual ~SrsTcpConnection(); -public: - virtual srs_error_t initialize(); public: // Set socket option TCP_NODELAY. virtual srs_error_t set_tcp_nodelay(bool v); @@ -169,6 +168,40 @@ class SrsTcpConnection : public ISrsProtocolReadWriter virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite); }; +// With a small fast read buffer, to support peek for protocol detecting. Note that directly write to io without any +// cache or buffer. +class SrsBufferedReader : public ISrsProtocolReadWriter +{ +private: + // The under-layer transport. + ISrsProtocolReadWriter* io_; + // Fixed, small and fast buffer. Note that it must be very small piece of cache, make sure matches all protocols, + // because we will full fill it when peeking. + char cache_[16]; + // Current reading position. + SrsBuffer* buf_; +public: + SrsBufferedReader(ISrsProtocolReadWriter* io); + virtual ~SrsBufferedReader(); +public: + // Peek the head of cache to buf in size of bytes. + srs_error_t peek(char* buf, int* size); +private: + srs_error_t reload_buffer(); +// Interface ISrsProtocolReadWriter +public: + virtual srs_error_t read(void* buf, size_t size, ssize_t* nread); + virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread); + virtual void set_recv_timeout(srs_utime_t tm); + virtual srs_utime_t get_recv_timeout(); + virtual int64_t get_recv_bytes(); + virtual int64_t get_send_bytes(); + virtual void set_send_timeout(srs_utime_t tm); + virtual srs_utime_t get_send_timeout(); + virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite); + virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite); +}; + // The SSL connection over TCP transport, in server mode. class SrsSslConnection : public ISrsProtocolReadWriter { diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index 6860a6f0ce8..65268944612 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -285,21 +285,21 @@ void SrsHttpConn::expire() trd->interrupt(); } -SrsHttpxConn::SrsHttpxConn(bool https, ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port) +SrsHttpxConn::SrsHttpxConn(bool https, ISrsResourceManager* cm, ISrsProtocolReadWriter* io, ISrsHttpServeMux* m, string cip, int port) { // Create a identify for this client. _srs_context->set_id(_srs_context->generate_id()); + io_ = io; manager = cm; - skt = new SrsTcpConnection(fd); enable_stat_ = false; if (https) { - ssl = new SrsSslConnection(skt); + ssl = new SrsSslConnection(io_); conn = new SrsHttpConn(this, ssl, m, cip, port); } else { ssl = NULL; - conn = new SrsHttpConn(this, skt, m, cip, port); + conn = new SrsHttpConn(this, io_, m, cip, port); } _srs_config->subscribe(this); @@ -311,7 +311,7 @@ SrsHttpxConn::~SrsHttpxConn() srs_freep(conn); srs_freep(ssl); - srs_freep(skt); + srs_freep(io_); } void SrsHttpxConn::set_enable_stat(bool v) @@ -323,7 +323,7 @@ srs_error_t SrsHttpxConn::pop_message(ISrsHttpMessage** preq) { srs_error_t err = srs_success; - ISrsProtocolReadWriter* io = skt; + ISrsProtocolReadWriter* io = io_; if (ssl) { io = ssl; } @@ -424,16 +424,6 @@ srs_error_t SrsHttpxConn::on_conn_done(srs_error_t r0) return r0; } -srs_error_t SrsHttpxConn::set_tcp_nodelay(bool v) -{ - return skt->set_tcp_nodelay(v); -} - -srs_error_t SrsHttpxConn::set_socket_buffer(srs_utime_t buffer_v) -{ - return skt->set_socket_buffer(buffer_v); -} - std::string SrsHttpxConn::desc() { if (ssl) { @@ -461,10 +451,6 @@ srs_error_t SrsHttpxConn::start() return srs_error_wrap(err, "set cors=%d", v); } - if ((err = skt->initialize()) != srs_success) { - return srs_error_wrap(err, "init socket"); - } - return conn->start(); } diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index 5970cd9ee1a..aebd484e933 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -128,13 +128,13 @@ class SrsHttpxConn : public ISrsConnection, public ISrsStartable, public ISrsHtt private: // The manager object to manage the connection. ISrsResourceManager* manager; - SrsTcpConnection* skt; + ISrsProtocolReadWriter* io_; SrsSslConnection* ssl; SrsHttpConn* conn; // We should never enable the stat, unless HTTP stream connection requires. bool enable_stat_; public: - SrsHttpxConn(bool https, ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port); + SrsHttpxConn(bool https, ISrsResourceManager* cm, ISrsProtocolReadWriter* io, ISrsHttpServeMux* m, std::string cip, int port); virtual ~SrsHttpxConn(); public: // Require statistic about HTTP connection, for HTTP streaming clients only. @@ -151,12 +151,6 @@ class SrsHttpxConn : public ISrsConnection, public ISrsStartable, public ISrsHtt virtual srs_error_t on_http_message(ISrsHttpMessage* r, SrsHttpResponseWriter* w); virtual srs_error_t on_message_done(ISrsHttpMessage* r, SrsHttpResponseWriter* w); virtual srs_error_t on_conn_done(srs_error_t r0); -// Extract APIs from SrsTcpConnection. -public: - // Set socket option TCP_NODELAY. - virtual srs_error_t set_tcp_nodelay(bool v); - // Set socket option SO_SNDBUF in srs_utime_t. - virtual srs_error_t set_socket_buffer(srs_utime_t buffer_v); // Interface ISrsResource. public: virtual std::string desc(); diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index b143124caad..bb56420cdd5 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -628,19 +628,6 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess // Note that the handler of hc now is hxc. SrsHttpxConn* hxc = dynamic_cast(hc->handler()); srs_assert(hxc); - - // Set the socket options for transport. - bool tcp_nodelay = _srs_config->get_tcp_nodelay(req->vhost); - if (tcp_nodelay) { - if ((err = hxc->set_tcp_nodelay(tcp_nodelay)) != srs_success) { - return srs_error_wrap(err, "set tcp nodelay"); - } - } - - srs_utime_t mw_sleep = _srs_config->get_mw_sleep(req->vhost); - if ((err = hxc->set_socket_buffer(mw_sleep)) != srs_success) { - return srs_error_wrap(err, "set mw_sleep %" PRId64, mw_sleep); - } // Start a thread to receive all messages from client, then drop them. SrsHttpRecvThread* trd = new SrsHttpRecvThread(hxc); @@ -649,10 +636,10 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess if ((err = trd->start()) != srs_success) { return srs_error_wrap(err, "start recv thread"); } - - srs_trace("FLV %s, encoder=%s, nodelay=%d, mw_sleep=%dms, cache=%d, msgs=%d", - entry->pattern.c_str(), enc_desc.c_str(), tcp_nodelay, srsu2msi(mw_sleep), - enc->has_cache(), msgs.max); + + srs_utime_t mw_sleep = _srs_config->get_mw_sleep(req->vhost); + srs_trace("FLV %s, encoder=%s, mw_sleep=%dms, cache=%d, msgs=%d", entry->pattern.c_str(), enc_desc.c_str(), + srsu2msi(mw_sleep), enc->has_cache(), msgs.max); // TODO: free and erase the disabled entry after all related connections is closed. // TODO: FXIME: Support timeout for player, quit infinite-loop. diff --git a/trunk/src/app/srs_app_rtc_network.cpp b/trunk/src/app/srs_app_rtc_network.cpp index 30577c0a9b4..d154673d904 100644 --- a/trunk/src/app/srs_app_rtc_network.cpp +++ b/trunk/src/app/srs_app_rtc_network.cpp @@ -371,12 +371,12 @@ srs_error_t SrsRtcUdpNetwork::write(void* buf, size_t size, ssize_t* nwrite) return sendonly_skt_->sendto(buf, size, SRS_UTIME_NO_TIMEOUT); } -SrsRtcTcpConn::SrsRtcTcpConn(srs_netfd_t fd, std::string cip, int port, ISrsResourceManager* cm) +SrsRtcTcpConn::SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port, ISrsResourceManager* cm) { manager_ = cm; ip_ = cip; port_ = port; - skt_ = new SrsTcpConnection(fd); + skt_ = skt; delta_ = new SrsNetworkDelta(); delta_->set_io(skt_, skt_); trd_ = new SrsSTCoroutine("tcp", this, _srs_context->get_id()); diff --git a/trunk/src/app/srs_app_rtc_network.hpp b/trunk/src/app/srs_app_rtc_network.hpp index d8b03d60be3..b9c935dfe1a 100644 --- a/trunk/src/app/srs_app_rtc_network.hpp +++ b/trunk/src/app/srs_app_rtc_network.hpp @@ -155,9 +155,9 @@ class SrsRtcTcpConn : public ISrsConnection, public ISrsStartable, public ISrsCo // The delta for statistic. SrsNetworkDelta* delta_; // TCP Transport object. - SrsTcpConnection* skt_; + ISrsProtocolReadWriter* skt_; public: - SrsRtcTcpConn(srs_netfd_t fd, std::string cip, int port, ISrsResourceManager* cm); + SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port, ISrsResourceManager* cm); virtual ~SrsRtcTcpConn(); public: ISrsKbpsDelta* delta(); diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index 500bddec9bf..3e38771b31f 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -546,17 +546,22 @@ srs_error_t SrsRtcServer::do_create_session(SrsRtcUserConfig* ruc, SrsSdp& local // We allows to mock the eip of server. if (true) { - int listen_port = _srs_config->get_rtc_server_listen(); + int udp_port = _srs_config->get_rtc_server_listen(); + int tcp_port = _srs_config->get_rtc_server_tcp_listen(); string protocol = _srs_config->get_rtc_server_protocol(); set candidates = discover_candidates(ruc); for (set::iterator it = candidates.begin(); it != candidates.end(); ++it) { - string hostname; int port = listen_port; - srs_parse_hostport(*it, hostname, port); - if (protocol == "udp" || protocol == "tcp") { - local_sdp.add_candidate(protocol, hostname, port, "host"); + string hostname; + int uport = udp_port; srs_parse_hostport(*it, hostname, uport); + int tport = tcp_port; srs_parse_hostport(*it, hostname, tport); + + if (protocol == "udp") { + local_sdp.add_candidate("udp", hostname, uport, "host"); + } else if (protocol == "tcp") { + local_sdp.add_candidate("tcp", hostname, tport, "host"); } else { - local_sdp.add_candidate("udp", hostname, port, "host"); - local_sdp.add_candidate("tcp", hostname, port, "host"); + local_sdp.add_candidate("udp", hostname, uport, "host"); + local_sdp.add_candidate("tcp", hostname, tport, "host"); } } diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 023a90fd1ef..dc49fcb1af3 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -1444,10 +1444,6 @@ srs_error_t SrsRtmpConn::start() { srs_error_t err = srs_success; - if ((err = skt->initialize()) != srs_success) { - return srs_error_wrap(err, "init socket"); - } - if ((err = trd->start()) != srs_success) { return srs_error_wrap(err, "coroutine"); } diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 77aee1642e0..20ed47094e6 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -36,6 +36,7 @@ using namespace std; #include #include #include +#include #ifdef SRS_RTC #include #endif @@ -535,6 +536,7 @@ SrsServer::SrsServer() http_api_mux = new SrsHttpServeMux(); http_server = new SrsHttpServer(this); reuse_api_over_server_ = false; + reuse_rtc_over_server_ = false; http_heartbeat = new SrsHttpHeartbeat(); ingester = new SrsIngester(); @@ -657,16 +659,26 @@ srs_error_t SrsServer::initialize(ISrsServerCycle* ch) return srs_error_wrap(err, "handler initialize"); } +#ifdef SRS_RTC + // If enabled and listen is the same value, resue port for WebRTC over TCP. + if (_srs_config->get_http_stream_enabled() && _srs_config->get_rtc_server_enabled() && _srs_config->get_rtc_server_tcp_enabled() + && _srs_config->get_http_stream_listen() == srs_int2str(_srs_config->get_rtc_server_tcp_listen()) + ) { + srs_trace("WebRTC reuse http=%s server", _srs_config->get_http_stream_listen().c_str()); + reuse_rtc_over_server_ = true; + } +#endif + // If enabled and the listen is the same value, reuse port. if (_srs_config->get_http_stream_enabled() && _srs_config->get_http_api_enabled() && _srs_config->get_http_api_listen() == _srs_config->get_http_stream_listen() && _srs_config->get_https_api_listen() == _srs_config->get_https_stream_listen() ) { - srs_trace("API reuse listen to https server at %s", _srs_config->get_https_stream_listen().c_str()); + srs_trace("API reuse http=%s and https=%s server", _srs_config->get_http_stream_listen().c_str(), _srs_config->get_https_stream_listen().c_str()); reuse_api_over_server_ = true; } - // If reuse port, use the same object as server. + // Only init HTTP API when not reusing HTTP server. if (!reuse_api_over_server_) { SrsHttpServeMux *api = dynamic_cast(http_api_mux); srs_assert(api); @@ -747,20 +759,22 @@ srs_error_t SrsServer::listen() } #ifdef SRS_RTC - // TODO: FIXME: Refine the listeners. - close_listeners(SrsListenerTcp); - if (_srs_config->get_rtc_server_tcp_enabled()) { - SrsListener* listener = new SrsBufferListener(this, SrsListenerTcp); - listeners.push_back(listener); + if (!reuse_rtc_over_server_) { + // TODO: FIXME: Refine the listeners. + close_listeners(SrsListenerTcp); + if (_srs_config->get_rtc_server_tcp_enabled()) { + SrsListener* listener = new SrsBufferListener(this, SrsListenerTcp); + listeners.push_back(listener); - std::string ep = srs_int2str(_srs_config->get_rtc_server_tcp_listen()); + std::string ep = srs_int2str(_srs_config->get_rtc_server_tcp_listen()); - std::string ip; - int port; - srs_parse_endpoint(ep, ip, port); + std::string ip; + int port; + srs_parse_endpoint(ep, ip, port); - if ((err = listener->listen(ip, port)) != srs_success) { - return srs_error_wrap(err, "tcp listen %s:%d", ip.c_str(), port); + if ((err = listener->listen(ip, port)) != srs_success) { + return srs_error_wrap(err, "tcp listen %s:%d", ip.c_str(), port); + } } } #endif @@ -1403,7 +1417,6 @@ srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd) ISrsResource* resource = NULL; if ((err = fd_to_resource(type, stfd, &resource)) != srs_success) { - //close fd on conn error, otherwise will lead to fd leak -gs srs_close_stfd(stfd); if (srs_error_code(err) == ERROR_SOCKET_GET_PEER_IP && _srs_config->empty_ip_ok()) { srs_error_reset(err); @@ -1411,7 +1424,11 @@ srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd) } return srs_error_wrap(err, "fd to resource"); } - srs_assert(resource); + + // Ignore if no resource found. + if (!resource) { + return err; + } // directly enqueue, the cycle thread will remove the client. conn_manager->add(resource); @@ -1429,7 +1446,7 @@ ISrsHttpServeMux* SrsServer::api_server() return http_api_mux; } -srs_error_t SrsServer::fd_to_resource(SrsListenerType type, srs_netfd_t stfd, ISrsResource** pr) +srs_error_t SrsServer::fd_to_resource(SrsListenerType type, srs_netfd_t& stfd, ISrsResource** pr) { srs_error_t err = srs_success; @@ -1468,26 +1485,48 @@ srs_error_t SrsServer::fd_to_resource(SrsListenerType type, srs_netfd_t stfd, IS } } + // We will free the stfd from now on. + srs_netfd_t fd2 = stfd; + stfd = NULL; + // The context id may change during creating the bellow objects. SrsContextRestore(_srs_context->get_id()); + +#ifdef SRS_RTC + // If reuse HTTP server with WebRTC TCP, peek to detect the client. + if (reuse_rtc_over_server_ && (type == SrsListenerHttpStream || type == SrsListenerHttpsStream)) { + SrsTcpConnection* skt = new SrsTcpConnection(fd2); + SrsBufferedReader* io = new SrsBufferedReader(skt); + + char buf[3]; int nn = sizeof(buf); + if ((err = io->peek(buf, &nn)) != srs_success) { + srs_freep(io); srs_freep(skt); + return srs_error_wrap(err, "peek"); + } + + // If first peeking bytes is text, serve as HTTP client; otherwise, WebRTC client over TCP. + if (nn == 3 && (buf[0] >= 'A' && buf[0] <= 'Z') && (buf[1] >= 'A' && buf[1] <= 'Z') && (buf[2] >= 'A' && buf[2] <= 'Z')) { + *pr = new SrsHttpxConn(type == SrsListenerHttpsStream, this, io, http_server, ip, port); + } else { + *pr = new SrsRtcTcpConn(io, ip, port, this); + } + return err; + } +#endif if (type == SrsListenerRtmpStream) { - *pr = new SrsRtmpConn(this, stfd, ip, port); - } else if (type == SrsListenerHttpApi) { - *pr = new SrsHttpxConn(false, this, stfd, http_api_mux, ip, port); - } else if (type == SrsListenerHttpsApi) { - *pr = new SrsHttpxConn(true, this, stfd, http_api_mux, ip, port); - } else if (type == SrsListenerHttpStream) { - *pr = new SrsHttpxConn(false, this, stfd, http_server, ip, port); - } else if (type == SrsListenerHttpsStream) { - *pr = new SrsHttpxConn(true, this, stfd, http_server, ip, port); + *pr = new SrsRtmpConn(this, fd2, ip, port); + } else if (type == SrsListenerHttpApi || type == SrsListenerHttpsApi) { + *pr = new SrsHttpxConn(type == SrsListenerHttpsApi, this, new SrsTcpConnection(fd2), http_api_mux, ip, port); + } else if (type == SrsListenerHttpStream || type == SrsListenerHttpsStream) { + *pr = new SrsHttpxConn(type == SrsListenerHttpsStream, this, new SrsTcpConnection(fd2), http_server, ip, port); #ifdef SRS_RTC } else if (type == SrsListenerTcp) { - *pr = new SrsRtcTcpConn(stfd, ip, port, this); + *pr = new SrsRtcTcpConn(new SrsTcpConnection(fd2), ip, port, this); #endif } else { srs_warn("close for no service handler. fd=%d, ip=%s:%d", fd, ip.c_str(), port); - srs_close_stfd(stfd); + srs_close_stfd(fd2); return err; } diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 723f84bd392..e3df09a6760 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -203,8 +203,10 @@ class SrsServer : public ISrsReloadHandler, public ISrsLiveSourceHandler // TODO: FIXME: Extract an HttpApiServer. ISrsHttpServeMux* http_api_mux; SrsHttpServer* http_server; - // If reuse, HTTP API use the same port of HTTP server. + // If reusing, HTTP API use the same port of HTTP server. bool reuse_api_over_server_; + // If reusing, WebRTC TCP use the same port of HTTP server. + bool reuse_rtc_over_server_; private: SrsHttpHeartbeat* http_heartbeat; SrsIngester* ingester; @@ -313,7 +315,7 @@ class SrsServer : public ISrsReloadHandler, public ISrsLiveSourceHandler // TODO: FIXME: Fetch from hybrid server manager. virtual ISrsHttpServeMux* api_server(); private: - virtual srs_error_t fd_to_resource(SrsListenerType type, srs_netfd_t stfd, ISrsResource** pr); + virtual srs_error_t fd_to_resource(SrsListenerType type, srs_netfd_t& stfd, ISrsResource** pr); // Interface ISrsResourceManager public: // A callback for connection to remove itself. diff --git a/trunk/src/protocol/srs_protocol_http_conn.cpp b/trunk/src/protocol/srs_protocol_http_conn.cpp index 7b151857a1d..ad832233c72 100644 --- a/trunk/src/protocol/srs_protocol_http_conn.cpp +++ b/trunk/src/protocol/srs_protocol_http_conn.cpp @@ -76,6 +76,8 @@ srs_error_t SrsHttpParser::parse_message(ISrsReader* reader, ISrsHttpMessage** p p_body_start = p_header_tail = NULL; // We must reset the field name and value, because we may get a partial value in on_header_value. field_name = field_value = ""; + // Reset the url. + url = ""; // The header of the request. srs_freep(header); header = new SrsHttpHeader(); @@ -218,9 +220,10 @@ int SrsHttpParser::on_url(http_parser* parser, const char* at, size_t length) { SrsHttpParser* obj = (SrsHttpParser*)parser->data; srs_assert(obj); - + if (length > 0) { - obj->url = string(at, (int)length); + // Note that this function might be called for multiple times, and we got pieces of content. + obj->url.append(at, (int)length); } // When header parsed, we must save the position of start for body, diff --git a/trunk/src/protocol/srs_protocol_st.cpp b/trunk/src/protocol/srs_protocol_st.cpp index 73d2ed10ada..f18d511a8b2 100644 --- a/trunk/src/protocol/srs_protocol_st.cpp +++ b/trunk/src/protocol/srs_protocol_st.cpp @@ -467,21 +467,19 @@ bool srs_is_never_timeout(srs_utime_t tm) return tm == SRS_UTIME_NO_TIMEOUT; } -SrsStSocket::SrsStSocket() +SrsStSocket::SrsStSocket() : SrsStSocket(NULL) { - stfd = NULL; - stm = rtm = SRS_UTIME_NO_TIMEOUT; - rbytes = sbytes = 0; } -SrsStSocket::~SrsStSocket() +SrsStSocket::SrsStSocket(srs_netfd_t fd) { + stfd_ = fd; + stm = rtm = SRS_UTIME_NO_TIMEOUT; + rbytes = sbytes = 0; } -srs_error_t SrsStSocket::initialize(srs_netfd_t fd) +SrsStSocket::~SrsStSocket() { - stfd = fd; - return srs_success; } void SrsStSocket::set_recv_timeout(srs_utime_t tm) @@ -517,12 +515,14 @@ int64_t SrsStSocket::get_send_bytes() srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread) { srs_error_t err = srs_success; - + + srs_assert(stfd_); + ssize_t nb_read; if (rtm == SRS_UTIME_NO_TIMEOUT) { - nb_read = st_read((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT); + nb_read = st_read((st_netfd_t)stfd_, buf, size, ST_UTIME_NO_TIMEOUT); } else { - nb_read = st_read((st_netfd_t)stfd, buf, size, rtm); + nb_read = st_read((st_netfd_t)stfd_, buf, size, rtm); } if (nread) { @@ -552,12 +552,14 @@ srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread) srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread) { srs_error_t err = srs_success; + + srs_assert(stfd_); ssize_t nb_read; if (rtm == SRS_UTIME_NO_TIMEOUT) { - nb_read = st_read_fully((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT); + nb_read = st_read_fully((st_netfd_t)stfd_, buf, size, ST_UTIME_NO_TIMEOUT); } else { - nb_read = st_read_fully((st_netfd_t)stfd, buf, size, rtm); + nb_read = st_read_fully((st_netfd_t)stfd_, buf, size, rtm); } if (nread) { @@ -576,7 +578,7 @@ srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread) errno = ECONNRESET; } - return srs_error_new(ERROR_SOCKET_READ_FULLY, "read fully"); + return srs_error_new(ERROR_SOCKET_READ_FULLY, "read fully, size=%d, nn=%d", size, nb_read); } rbytes += nb_read; @@ -587,12 +589,14 @@ srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread) srs_error_t SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite) { srs_error_t err = srs_success; + + srs_assert(stfd_); ssize_t nb_write; if (stm == SRS_UTIME_NO_TIMEOUT) { - nb_write = st_write((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT); + nb_write = st_write((st_netfd_t)stfd_, buf, size, ST_UTIME_NO_TIMEOUT); } else { - nb_write = st_write((st_netfd_t)stfd, buf, size, stm); + nb_write = st_write((st_netfd_t)stfd_, buf, size, stm); } if (nwrite) { @@ -617,12 +621,14 @@ srs_error_t SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite) srs_error_t SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite) { srs_error_t err = srs_success; + + srs_assert(stfd_); ssize_t nb_write; if (stm == SRS_UTIME_NO_TIMEOUT) { - nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT); + nb_write = st_writev((st_netfd_t)stfd_, iov, iov_size, ST_UTIME_NO_TIMEOUT); } else { - nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, stm); + nb_write = st_writev((st_netfd_t)stfd_, iov, iov_size, stm); } if (nwrite) { @@ -646,7 +652,7 @@ srs_error_t SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite) SrsTcpClient::SrsTcpClient(string h, int p, srs_utime_t tm) { - stfd = NULL; + stfd_ = NULL; io = new SrsStSocket(); host = h; @@ -656,37 +662,26 @@ SrsTcpClient::SrsTcpClient(string h, int p, srs_utime_t tm) SrsTcpClient::~SrsTcpClient() { - close(); - srs_freep(io); + srs_close_stfd(stfd_); } srs_error_t SrsTcpClient::connect() { srs_error_t err = srs_success; - close(); - - srs_assert(stfd == NULL); + srs_netfd_t stfd = NULL; if ((err = srs_tcp_connect(host, port, timeout, &stfd)) != srs_success) { return srs_error_wrap(err, "tcp: connect %s:%d to=%dms", host.c_str(), port, srsu2msi(timeout)); } - - if ((err = io->initialize(stfd)) != srs_success) { - return srs_error_wrap(err, "tcp: init socket object"); - } - - return err; -} -void SrsTcpClient::close() -{ - // Ignore when already closed. - if (!io) { - return; - } + srs_freep(io); + io = new SrsStSocket(stfd); + + srs_close_stfd(stfd_); + stfd_ = stfd; - srs_close_stfd(stfd); + return err; } void SrsTcpClient::set_recv_timeout(srs_utime_t tm) diff --git a/trunk/src/protocol/srs_protocol_st.hpp b/trunk/src/protocol/srs_protocol_st.hpp index 5603f02e4ea..b94c622b949 100644 --- a/trunk/src/protocol/srs_protocol_st.hpp +++ b/trunk/src/protocol/srs_protocol_st.hpp @@ -127,13 +127,11 @@ class SrsStSocket : public ISrsProtocolReadWriter int64_t rbytes; int64_t sbytes; // The underlayer st fd. - srs_netfd_t stfd; + srs_netfd_t stfd_; public: SrsStSocket(); + SrsStSocket(srs_netfd_t fd); virtual ~SrsStSocket(); -public: - // Initialize the socket with stfd, user must manage it. - virtual srs_error_t initialize(srs_netfd_t fd); public: virtual void set_recv_timeout(srs_utime_t tm); virtual srs_utime_t get_recv_timeout(); @@ -161,7 +159,7 @@ class SrsStSocket : public ISrsProtocolReadWriter class SrsTcpClient : public ISrsProtocolReadWriter { private: - srs_netfd_t stfd; + srs_netfd_t stfd_; SrsStSocket* io; private: std::string host; @@ -179,10 +177,6 @@ class SrsTcpClient : public ISrsProtocolReadWriter // Connect to server over TCP. // @remark We will close the exists connection before do connect. virtual srs_error_t connect(); -private: - // Close the connection to server. - // @remark User should never use the client when close it. - virtual void close(); // Interface ISrsProtocolReadWriter public: virtual void set_recv_timeout(srs_utime_t tm); diff --git a/trunk/src/utest/srs_utest_service.cpp b/trunk/src/utest/srs_utest_service.cpp index dbd09742c92..3b564dbe143 100644 --- a/trunk/src/utest/srs_utest_service.cpp +++ b/trunk/src/utest/srs_utest_service.cpp @@ -113,12 +113,11 @@ VOID TEST(TCPServerTest, PingPong) SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); HELPER_EXPECT_SUCCESS(c.connect()); - SrsStSocket skt; srs_usleep(30 * SRS_UTIME_MILLISECONDS); #ifdef SRS_OSX ASSERT_TRUE(h.fd != NULL); #endif - HELPER_EXPECT_SUCCESS(skt.initialize(h.fd)); + SrsStSocket skt(h.fd); HELPER_EXPECT_SUCCESS(c.write((void*)"Hello", 5, NULL)); @@ -135,12 +134,11 @@ VOID TEST(TCPServerTest, PingPong) SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); HELPER_EXPECT_SUCCESS(c.connect()); - SrsStSocket skt; srs_usleep(30 * SRS_UTIME_MILLISECONDS); #ifdef SRS_OSX ASSERT_TRUE(h.fd != NULL); #endif - HELPER_EXPECT_SUCCESS(skt.initialize(h.fd)); + SrsStSocket skt(h.fd); HELPER_EXPECT_SUCCESS(c.write((void*)"Hello", 5, NULL)); HELPER_EXPECT_SUCCESS(c.write((void*)" ", 1, NULL)); @@ -159,12 +157,11 @@ VOID TEST(TCPServerTest, PingPong) SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); HELPER_EXPECT_SUCCESS(c.connect()); - SrsStSocket skt; srs_usleep(30 * SRS_UTIME_MILLISECONDS); #ifdef SRS_OSX ASSERT_TRUE(h.fd != NULL); #endif - HELPER_EXPECT_SUCCESS(skt.initialize(h.fd)); + SrsStSocket skt(h.fd); HELPER_EXPECT_SUCCESS(c.write((void*)"Hello SRS", 9, NULL)); EXPECT_EQ(9, c.get_send_bytes()); @@ -194,12 +191,11 @@ VOID TEST(TCPServerTest, PingPongWithTimeout) SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); HELPER_EXPECT_SUCCESS(c.connect()); - SrsStSocket skt; srs_usleep(30 * SRS_UTIME_MILLISECONDS); #ifdef SRS_OSX ASSERT_TRUE(h.fd != NULL); #endif - HELPER_EXPECT_SUCCESS(skt.initialize(h.fd)); + SrsStSocket skt(h.fd); skt.set_recv_timeout(1 * SRS_UTIME_MILLISECONDS); char buf[16] = {0}; @@ -216,12 +212,11 @@ VOID TEST(TCPServerTest, PingPongWithTimeout) SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); HELPER_EXPECT_SUCCESS(c.connect()); - SrsStSocket skt; srs_usleep(30 * SRS_UTIME_MILLISECONDS); #ifdef SRS_OSX ASSERT_TRUE(h.fd != NULL); #endif - HELPER_EXPECT_SUCCESS(skt.initialize(h.fd)); + SrsStSocket skt(h.fd); skt.set_recv_timeout(1 * SRS_UTIME_MILLISECONDS); char buf[16] = {0}; @@ -238,12 +233,11 @@ VOID TEST(TCPServerTest, PingPongWithTimeout) SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); HELPER_EXPECT_SUCCESS(c.connect()); - SrsStSocket skt; srs_usleep(30 * SRS_UTIME_MILLISECONDS); #ifdef SRS_OSX ASSERT_TRUE(h.fd != NULL); #endif - HELPER_EXPECT_SUCCESS(skt.initialize(h.fd)); + SrsStSocket skt(h.fd); skt.set_recv_timeout(1 * SRS_UTIME_MILLISECONDS); HELPER_EXPECT_SUCCESS(c.write((void*)"Hello", 5, NULL)); @@ -418,12 +412,11 @@ VOID TEST(TCPServerTest, WritevIOVC) SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); HELPER_EXPECT_SUCCESS(c.connect()); - SrsStSocket skt; srs_usleep(30 * SRS_UTIME_MILLISECONDS); #ifdef SRS_OSX ASSERT_TRUE(h.fd != NULL); #endif - HELPER_EXPECT_SUCCESS(skt.initialize(h.fd)); + SrsStSocket skt(h.fd); iovec iovs[3]; iovs[0].iov_base = (void*)"H"; @@ -448,12 +441,11 @@ VOID TEST(TCPServerTest, WritevIOVC) SrsTcpClient c(_srs_tmp_host, _srs_tmp_port, _srs_tmp_timeout); HELPER_EXPECT_SUCCESS(c.connect()); - SrsStSocket skt; srs_usleep(30 * SRS_UTIME_MILLISECONDS); #ifdef SRS_OSX ASSERT_TRUE(h.fd != NULL); #endif - HELPER_EXPECT_SUCCESS(skt.initialize(h.fd)); + SrsStSocket skt(h.fd); iovec iovs[3]; iovs[0].iov_base = (void*)"H"; @@ -1012,11 +1004,7 @@ class MockOnCycleThread3 : public ISrsCoroutineHandler virtual srs_error_t do_cycle(srs_netfd_t cfd) { srs_error_t err = srs_success; - SrsStSocket skt; - if ((err = skt.initialize(cfd)) != srs_success) { - return err; - } - + SrsStSocket skt(cfd); skt.set_recv_timeout(1 * SRS_UTIME_SECONDS); skt.set_send_timeout(1 * SRS_UTIME_SECONDS); @@ -1085,7 +1073,7 @@ VOID TEST(TCPServerTest, TCPClientServer) HELPER_ASSERT_SUCCESS(c.write((void*)"Hello", 5, NULL)); char buf[6]; HELPER_ARRAY_INIT(buf, 6, 0); - ASSERT_EQ(5, srs_read(c.stfd, buf, 5, 1*SRS_UTIME_SECONDS)); + ASSERT_EQ(5, srs_read(c.stfd_, buf, 5, 1*SRS_UTIME_SECONDS)); EXPECT_STREQ("Hello", buf); } } @@ -1263,11 +1251,7 @@ class MockOnCycleThread4 : public ISrsCoroutineHandler virtual srs_error_t do_cycle(srs_netfd_t cfd) { srs_error_t err = srs_success; - SrsStSocket skt; - if ((err = skt.initialize(cfd)) != srs_success) { - return err; - } - + SrsStSocket skt(cfd); skt.set_recv_timeout(1 * SRS_UTIME_SECONDS); skt.set_send_timeout(1 * SRS_UTIME_SECONDS);