Skip to content

Commit

Permalink
WebRTC: Support reuse HTTP port for WebRTC over TCP.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Sep 3, 2022
1 parent 3493fa1 commit 80bd486
Show file tree
Hide file tree
Showing 16 changed files with 317 additions and 182 deletions.
4 changes: 0 additions & 4 deletions trunk/src/app/srs_app_caster_flv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,6 @@ srs_error_t SrsDynamicHttpConn::start()
return srs_error_wrap(err, "set cors=%d", v);
}

if ((err = skt->initialize()) != srs_success) {
return srs_error_wrap(err, "init socket");
}

return conn->start();
}

Expand Down
139 changes: 126 additions & 13 deletions trunk/src/app/srs_app_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ using namespace std;
#include <srs_app_log.hpp>
#include <srs_app_config.hpp>
#include <srs_core_autofree.hpp>

#include <srs_kernel_buffer.hpp>
#include <srs_protocol_kbps.hpp>

SrsPps* _srs_pps_ids = NULL;
Expand Down Expand Up @@ -417,7 +417,7 @@ ISrsExpire::~ISrsExpire()
SrsTcpConnection::SrsTcpConnection(srs_netfd_t c)
{
stfd = c;
skt = new SrsStSocket();
skt = new SrsStSocket(c);
}

SrsTcpConnection::~SrsTcpConnection()
Expand All @@ -426,17 +426,6 @@ SrsTcpConnection::~SrsTcpConnection()
srs_close_stfd(stfd);
}

srs_error_t SrsTcpConnection::initialize()
{
srs_error_t err = srs_success;

if ((err = skt->initialize(stfd)) != srs_success) {
return srs_error_wrap(err, "init socket");
}

return err;
}

srs_error_t SrsTcpConnection::set_tcp_nodelay(bool v)
{
srs_error_t err = srs_success;
Expand Down Expand Up @@ -570,6 +559,130 @@ srs_error_t SrsTcpConnection::writev(const iovec *iov, int iov_size, ssize_t* nw
return skt->writev(iov, iov_size, nwrite);
}

SrsBufferedReader::SrsBufferedReader(ISrsProtocolReadWriter* io)
{
io_ = io;
buf_ = NULL;
}

SrsBufferedReader::~SrsBufferedReader()
{
srs_freep(buf_);
}

srs_error_t SrsBufferedReader::peek(char* buf, int* size)
{
srs_error_t err = srs_success;

if ((err = reload_buffer()) != srs_success) {
return srs_error_wrap(err, "reload buffer");
}

int nn = srs_min(buf_->left(), *size);
*size = nn;

if (nn) {
memcpy(buf, buf_->head(), nn);
}

return err;
}

srs_error_t SrsBufferedReader::reload_buffer()
{
srs_error_t err = srs_success;

if (buf_ && !buf_->empty()) {
return err;
}

// We use read_fully to always full fill the cache, to avoid peeking failed.
ssize_t nread = 0;
if ((err = io_->read_fully(cache_, sizeof(cache_), &nread)) != srs_success) {
return srs_error_wrap(err, "read");
}

srs_freep(buf_);
buf_ = new SrsBuffer(cache_, nread);

return err;
}

srs_error_t SrsBufferedReader::read(void* buf, size_t size, ssize_t* nread)
{
if (!buf_ || buf_->empty()) {
return io_->read(buf, size, nread);
}

int nn = srs_min(buf_->left(), size);
*nread = nn;

if (nn) {
buf_->read_bytes((char*)buf, nn);
}
return srs_success;
}

srs_error_t SrsBufferedReader::read_fully(void* buf, size_t size, ssize_t* nread)
{
if (!buf_ || buf_->empty()) {
return io_->read_fully(buf, size, nread);
}

int nn = srs_min(buf_->left(), size);
if (nn) {
buf_->read_bytes((char*)buf, nn);
}

int left = size - nn;
*nread = size;

if (left) {
return io_->read_fully((char*)buf + nn, left, NULL);
}
return srs_success;
}

void SrsBufferedReader::set_recv_timeout(srs_utime_t tm)
{
return io_->set_recv_timeout(tm);
}

srs_utime_t SrsBufferedReader::get_recv_timeout()
{
return io_->get_recv_timeout();
}

int64_t SrsBufferedReader::get_recv_bytes()
{
return io_->get_recv_bytes();
}

int64_t SrsBufferedReader::get_send_bytes()
{
return io_->get_send_bytes();
}

void SrsBufferedReader::set_send_timeout(srs_utime_t tm)
{
return io_->set_send_timeout(tm);
}

srs_utime_t SrsBufferedReader::get_send_timeout()
{
return io_->get_send_timeout();
}

srs_error_t SrsBufferedReader::write(void* buf, size_t size, ssize_t* nwrite)
{
return io_->write(buf, size, nwrite);
}

srs_error_t SrsBufferedReader::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
{
return io_->writev(iov, iov_size, nwrite);
}

SrsSslConnection::SrsSslConnection(ISrsProtocolReadWriter* c)
{
transport = c;
Expand Down
37 changes: 35 additions & 2 deletions trunk/src/app/srs_app_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <srs_protocol_conn.hpp>

class SrsWallClock;
class SrsBuffer;

// Hooks for connection manager, to handle the event when disposing connections.
class ISrsDisposingHandler
Expand Down Expand Up @@ -148,8 +149,6 @@ class SrsTcpConnection : public ISrsProtocolReadWriter
public:
SrsTcpConnection(srs_netfd_t c);
virtual ~SrsTcpConnection();
public:
virtual srs_error_t initialize();
public:
// Set socket option TCP_NODELAY.
virtual srs_error_t set_tcp_nodelay(bool v);
Expand All @@ -169,6 +168,40 @@ class SrsTcpConnection : public ISrsProtocolReadWriter
virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
};

// With a small fast read buffer, to support peek for protocol detecting. Note that directly write to io without any
// cache or buffer.
class SrsBufferedReader : public ISrsProtocolReadWriter
{
private:
// The under-layer transport.
ISrsProtocolReadWriter* io_;
// Fixed, small and fast buffer. Note that it must be very small piece of cache, make sure matches all protocols,
// because we will full fill it when peeking.
char cache_[16];
// Current reading position.
SrsBuffer* buf_;
public:
SrsBufferedReader(ISrsProtocolReadWriter* io);
virtual ~SrsBufferedReader();
public:
// Peek the head of cache to buf in size of bytes.
srs_error_t peek(char* buf, int* size);
private:
srs_error_t reload_buffer();
// Interface ISrsProtocolReadWriter
public:
virtual srs_error_t read(void* buf, size_t size, ssize_t* nread);
virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread);
virtual void set_recv_timeout(srs_utime_t tm);
virtual srs_utime_t get_recv_timeout();
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
virtual void set_send_timeout(srs_utime_t tm);
virtual srs_utime_t get_send_timeout();
virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite);
virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
};

// The SSL connection over TCP transport, in server mode.
class SrsSslConnection : public ISrsProtocolReadWriter
{
Expand Down
26 changes: 6 additions & 20 deletions trunk/src/app/srs_app_http_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,21 +285,21 @@ void SrsHttpConn::expire()
trd->interrupt();
}

SrsHttpxConn::SrsHttpxConn(bool https, ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port)
SrsHttpxConn::SrsHttpxConn(bool https, ISrsResourceManager* cm, ISrsProtocolReadWriter* io, ISrsHttpServeMux* m, string cip, int port)
{
// Create a identify for this client.
_srs_context->set_id(_srs_context->generate_id());

io_ = io;
manager = cm;
skt = new SrsTcpConnection(fd);
enable_stat_ = false;

if (https) {
ssl = new SrsSslConnection(skt);
ssl = new SrsSslConnection(io_);
conn = new SrsHttpConn(this, ssl, m, cip, port);
} else {
ssl = NULL;
conn = new SrsHttpConn(this, skt, m, cip, port);
conn = new SrsHttpConn(this, io_, m, cip, port);
}

_srs_config->subscribe(this);
Expand All @@ -311,7 +311,7 @@ SrsHttpxConn::~SrsHttpxConn()

srs_freep(conn);
srs_freep(ssl);
srs_freep(skt);
srs_freep(io_);
}

void SrsHttpxConn::set_enable_stat(bool v)
Expand All @@ -323,7 +323,7 @@ srs_error_t SrsHttpxConn::pop_message(ISrsHttpMessage** preq)
{
srs_error_t err = srs_success;

ISrsProtocolReadWriter* io = skt;
ISrsProtocolReadWriter* io = io_;
if (ssl) {
io = ssl;
}
Expand Down Expand Up @@ -424,16 +424,6 @@ srs_error_t SrsHttpxConn::on_conn_done(srs_error_t r0)
return r0;
}

srs_error_t SrsHttpxConn::set_tcp_nodelay(bool v)
{
return skt->set_tcp_nodelay(v);
}

srs_error_t SrsHttpxConn::set_socket_buffer(srs_utime_t buffer_v)
{
return skt->set_socket_buffer(buffer_v);
}

std::string SrsHttpxConn::desc()
{
if (ssl) {
Expand Down Expand Up @@ -461,10 +451,6 @@ srs_error_t SrsHttpxConn::start()
return srs_error_wrap(err, "set cors=%d", v);
}

if ((err = skt->initialize()) != srs_success) {
return srs_error_wrap(err, "init socket");
}

return conn->start();
}

Expand Down
10 changes: 2 additions & 8 deletions trunk/src/app/srs_app_http_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,13 @@ class SrsHttpxConn : public ISrsConnection, public ISrsStartable, public ISrsHtt
private:
// The manager object to manage the connection.
ISrsResourceManager* manager;
SrsTcpConnection* skt;
ISrsProtocolReadWriter* io_;
SrsSslConnection* ssl;
SrsHttpConn* conn;
// We should never enable the stat, unless HTTP stream connection requires.
bool enable_stat_;
public:
SrsHttpxConn(bool https, ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port);
SrsHttpxConn(bool https, ISrsResourceManager* cm, ISrsProtocolReadWriter* io, ISrsHttpServeMux* m, std::string cip, int port);
virtual ~SrsHttpxConn();
public:
// Require statistic about HTTP connection, for HTTP streaming clients only.
Expand All @@ -151,12 +151,6 @@ class SrsHttpxConn : public ISrsConnection, public ISrsStartable, public ISrsHtt
virtual srs_error_t on_http_message(ISrsHttpMessage* r, SrsHttpResponseWriter* w);
virtual srs_error_t on_message_done(ISrsHttpMessage* r, SrsHttpResponseWriter* w);
virtual srs_error_t on_conn_done(srs_error_t r0);
// Extract APIs from SrsTcpConnection.
public:
// Set socket option TCP_NODELAY.
virtual srs_error_t set_tcp_nodelay(bool v);
// Set socket option SO_SNDBUF in srs_utime_t.
virtual srs_error_t set_socket_buffer(srs_utime_t buffer_v);
// Interface ISrsResource.
public:
virtual std::string desc();
Expand Down
21 changes: 4 additions & 17 deletions trunk/src/app/srs_app_http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -628,19 +628,6 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
// Note that the handler of hc now is hxc.
SrsHttpxConn* hxc = dynamic_cast<SrsHttpxConn*>(hc->handler());
srs_assert(hxc);

// Set the socket options for transport.
bool tcp_nodelay = _srs_config->get_tcp_nodelay(req->vhost);
if (tcp_nodelay) {
if ((err = hxc->set_tcp_nodelay(tcp_nodelay)) != srs_success) {
return srs_error_wrap(err, "set tcp nodelay");
}
}

srs_utime_t mw_sleep = _srs_config->get_mw_sleep(req->vhost);
if ((err = hxc->set_socket_buffer(mw_sleep)) != srs_success) {
return srs_error_wrap(err, "set mw_sleep %" PRId64, mw_sleep);
}

// Start a thread to receive all messages from client, then drop them.
SrsHttpRecvThread* trd = new SrsHttpRecvThread(hxc);
Expand All @@ -649,10 +636,10 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "start recv thread");
}
srs_trace("FLV %s, encoder=%s, nodelay=%d, mw_sleep=%dms, cache=%d, msgs=%d",
entry->pattern.c_str(), enc_desc.c_str(), tcp_nodelay, srsu2msi(mw_sleep),
enc->has_cache(), msgs.max);

srs_utime_t mw_sleep = _srs_config->get_mw_sleep(req->vhost);
srs_trace("FLV %s, encoder=%s, mw_sleep=%dms, cache=%d, msgs=%d", entry->pattern.c_str(), enc_desc.c_str(),
srsu2msi(mw_sleep), enc->has_cache(), msgs.max);

// TODO: free and erase the disabled entry after all related connections is closed.
// TODO: FXIME: Support timeout for player, quit infinite-loop.
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_rtc_network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,12 +371,12 @@ srs_error_t SrsRtcUdpNetwork::write(void* buf, size_t size, ssize_t* nwrite)
return sendonly_skt_->sendto(buf, size, SRS_UTIME_NO_TIMEOUT);
}

SrsRtcTcpConn::SrsRtcTcpConn(srs_netfd_t fd, std::string cip, int port, ISrsResourceManager* cm)
SrsRtcTcpConn::SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port, ISrsResourceManager* cm)
{
manager_ = cm;
ip_ = cip;
port_ = port;
skt_ = new SrsTcpConnection(fd);
skt_ = skt;
delta_ = new SrsNetworkDelta();
delta_->set_io(skt_, skt_);
trd_ = new SrsSTCoroutine("tcp", this, _srs_context->get_id());
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_rtc_network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ class SrsRtcTcpConn : public ISrsConnection, public ISrsStartable, public ISrsCo
// The delta for statistic.
SrsNetworkDelta* delta_;
// TCP Transport object.
SrsTcpConnection* skt_;
ISrsProtocolReadWriter* skt_;
public:
SrsRtcTcpConn(srs_netfd_t fd, std::string cip, int port, ISrsResourceManager* cm);
SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port, ISrsResourceManager* cm);
virtual ~SrsRtcTcpConn();
public:
ISrsKbpsDelta* delta();
Expand Down
Loading

0 comments on commit 80bd486

Please # to comment.