diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 85e836136c0..45305bfb680 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -2236,6 +2236,11 @@ SrsRtcUdpNetwork* SrsRtcConnection::udp() return networks_->udp(); } +SrsRtcTcpNetwork* SrsRtcConnection::tcp() +{ + return networks_->tcp(); +} + srs_error_t SrsRtcConnection::send_rtcp(char *data, int nb_data) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 8e49eb854eb..18127f24f1c 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -55,6 +55,7 @@ class SrsEphemeralDelta; class SrsRtcNetworks; class SrsRtcUdpNetwork; class ISrsRtcNetwork; +class SrsRtcTcpNetwork; const uint8_t kSR = 200; const uint8_t kRR = 201; @@ -518,6 +519,7 @@ class SrsRtcConnection : public ISrsResource, public ISrsDisposingHandler, publi void alive(); public: SrsRtcUdpNetwork* udp(); + SrsRtcTcpNetwork* tcp(); public: // send rtcp srs_error_t send_rtcp(char *data, int nb_data); diff --git a/trunk/src/app/srs_app_rtc_network.cpp b/trunk/src/app/srs_app_rtc_network.cpp index d154673d904..2f362bff397 100644 --- a/trunk/src/app/srs_app_rtc_network.cpp +++ b/trunk/src/app/srs_app_rtc_network.cpp @@ -22,6 +22,7 @@ using namespace std; #include #include #include +#include #ifdef SRS_OSX // These functions are similar to the older byteorder(3) family of functions. @@ -30,16 +31,25 @@ using namespace std; #define be32toh ntohl #endif +extern bool srs_is_stun(const uint8_t* data, size_t size); +extern bool srs_is_dtls(const uint8_t* data, size_t len); +extern bool srs_is_rtp_or_rtcp(const uint8_t* data, size_t len); +extern bool srs_is_rtcp(const uint8_t* data, size_t len); + SrsRtcNetworks::SrsRtcNetworks(SrsRtcConnection* conn) { conn_ = conn; delta_ = new SrsEphemeralDelta(); udp_ = new SrsRtcUdpNetwork(conn_, delta_); + tcp_ = new SrsRtcTcpNetwork(conn_, delta_); + dummy_ = new SrsRtcDummyNetwork(); } SrsRtcNetworks::~SrsRtcNetworks() { srs_freep(udp_); + srs_freep(tcp_); + srs_freep(dummy_); srs_freep(delta_); } @@ -48,7 +58,11 @@ srs_error_t SrsRtcNetworks::initialize(SrsSessionConfig* cfg, bool dtls, bool sr srs_error_t err = srs_success; if ((err = udp_->initialize(cfg, dtls, srtp)) != srs_success) { - return srs_error_wrap(err, "init"); + return srs_error_wrap(err, "udp init"); + } + + if ((err = tcp_->initialize(cfg, dtls, srtp)) != srs_success) { + return srs_error_wrap(err, "tcp init"); } return err; @@ -57,6 +71,7 @@ srs_error_t SrsRtcNetworks::initialize(SrsSessionConfig* cfg, bool dtls, bool sr void SrsRtcNetworks::set_state(SrsRtcNetworkState state) { udp_->set_state(state); + tcp_->set_state(state); } SrsRtcUdpNetwork* SrsRtcNetworks::udp() @@ -64,9 +79,21 @@ SrsRtcUdpNetwork* SrsRtcNetworks::udp() return udp_; } +SrsRtcTcpNetwork* SrsRtcNetworks::tcp() +{ + return tcp_; +} + ISrsRtcNetwork* SrsRtcNetworks::available() { - return udp_; + if(udp_->is_establelished()) { + return udp_; + } + + if(tcp_->is_establelished()) { + return tcp_; + } + return dummy_; } ISrsKbpsDelta* SrsRtcNetworks::delta() @@ -76,12 +103,52 @@ ISrsKbpsDelta* SrsRtcNetworks::delta() ISrsRtcNetwork::ISrsRtcNetwork() { + establelished_ = false; } ISrsRtcNetwork::~ISrsRtcNetwork() { } +bool ISrsRtcNetwork::is_establelished() +{ + return establelished_; +} + +SrsRtcDummyNetwork::SrsRtcDummyNetwork() +{ + establelished_ = true; +} + +SrsRtcDummyNetwork::~SrsRtcDummyNetwork() +{ +} + +srs_error_t SrsRtcDummyNetwork::on_connection_established() +{ + return srs_success; +} + +srs_error_t SrsRtcDummyNetwork::on_dtls_alert(std::string type, std::string desc) +{ + return srs_success; +} + +srs_error_t SrsRtcDummyNetwork::protect_rtp(void* packet, int* nb_cipher) +{ + return srs_success; +} + +srs_error_t SrsRtcDummyNetwork::protect_rtcp(void* packet, int* nb_cipher) +{ + return srs_success; +} + +srs_error_t SrsRtcDummyNetwork::write(void* buf, size_t size, ssize_t* nwrite) +{ + return srs_success; +} + SrsRtcUdpNetwork::SrsRtcUdpNetwork(SrsRtcConnection* conn, SrsEphemeralDelta* delta) { state_ = SrsRtcNetworkStateInit; @@ -135,6 +202,7 @@ srs_error_t SrsRtcUdpNetwork::start_active_handshake() srs_error_t SrsRtcUdpNetwork::on_dtls(char* data, int nb_data) { + establelished_ = true; // Update stat when we received data. delta_->add_delta(nb_data, 0); @@ -371,6 +439,268 @@ srs_error_t SrsRtcUdpNetwork::write(void* buf, size_t size, ssize_t* nwrite) return sendonly_skt_->sendto(buf, size, SRS_UTIME_NO_TIMEOUT); } +SrsRtcTcpNetwork::SrsRtcTcpNetwork(SrsRtcConnection* conn, SrsEphemeralDelta* delta) +{ + conn_ = conn; + delta_ = delta; + sendonly_skt_ = NULL; + transport_ = new SrsSecurityTransport(this); + peer_port_ = 0; + state_ = SrsRtcNetworkStateInit; +} +SrsRtcTcpNetwork::~SrsRtcTcpNetwork() +{ + srs_freep(transport_); +} + +void SrsRtcTcpNetwork::update_sendonly_socket(ISrsProtocolReadWriter* skt) +{ + sendonly_skt_ = skt; +} + +srs_error_t SrsRtcTcpNetwork::on_connection_established() +{ + srs_error_t err = srs_success; + + // If DTLS done packet received many times, such as ARQ, ignore. + if(SrsRtcNetworkStateClosed == state_) { + return err; + } + + if ((err = conn_->on_connection_established()) != srs_success) { + return srs_error_wrap(err, "udp"); + } + + state_ = SrsRtcNetworkStateClosed; + return err; +} + +srs_error_t SrsRtcTcpNetwork::on_dtls_alert(std::string type, std::string desc) +{ + return conn_->on_dtls_alert(type, desc); +} + +srs_error_t SrsRtcTcpNetwork::protect_rtp(void* packet, int* nb_cipher) +{ + return transport_->protect_rtp(packet, nb_cipher); +} + +srs_error_t SrsRtcTcpNetwork::protect_rtcp(void* packet, int* nb_cipher) +{ + return transport_->protect_rtcp(packet, nb_cipher); +} + +srs_error_t SrsRtcTcpNetwork::on_stun(SrsStunPacket* r, char* data, int nb_data) +{ + srs_error_t err = srs_success; + + // Write STUN messages to blackhole. + if (_srs_blackhole->blackhole) { + _srs_blackhole->sendto(data, nb_data); + } + + if (!r->is_binding_request()) { + return err; + } + + string ice_pwd; + if ((err = conn_->on_binding_request(r, ice_pwd)) != srs_success) { + return srs_error_wrap(err, "udp"); + } + + if ((err = on_binding_request(r, ice_pwd)) != srs_success) { + return srs_error_wrap(err, "stun binding request failed"); + } + + return err; +} + +srs_error_t SrsRtcTcpNetwork::on_binding_request(SrsStunPacket* r, std::string ice_pwd) +{ + srs_error_t err = srs_success; + + SrsStunPacket stun_binding_response; + char buf[kRtpPacketSize]; + SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf)); + SrsAutoFree(SrsBuffer, stream); + + stun_binding_response.set_message_type(BindingResponse); + stun_binding_response.set_local_ufrag(r->get_remote_ufrag()); + stun_binding_response.set_remote_ufrag(r->get_local_ufrag()); + stun_binding_response.set_transcation_id(r->get_transcation_id()); + // FIXME: inet_addr is deprecated, IPV6 support + stun_binding_response.set_mapped_address(be32toh(inet_addr(get_peer_ip().c_str()))); + stun_binding_response.set_mapped_port(get_peer_port()); + + if ((err = stun_binding_response.encode(ice_pwd, stream)) != srs_success) { + return srs_error_wrap(err, "stun binding response encode failed"); + } + + if ((err = write(stream->data(), stream->pos(), NULL)) != srs_success) { + return srs_error_wrap(err, "stun binding response send failed"); + } + + if (state_ == SrsRtcNetworkStateWaitingStun) { + state_ = SrsRtcNetworkStateDtls; + // TODO: FIXME: Add cost. + srs_trace("RTC: session STUN done, waiting DTLS handshake."); + + if((err = start_active_handshake()) != srs_success) { + return srs_error_wrap(err, "fail to dtls handshake"); + } + } + + if (_srs_blackhole->blackhole) { + _srs_blackhole->sendto(stream->data(), stream->pos()); + } + + return err; +} + +srs_error_t SrsRtcTcpNetwork::initialize(SrsSessionConfig* cfg, bool dtls, bool srtp) +{ + srs_error_t err = srs_success; + + if (!srtp) { + srs_freep(transport_); + if (dtls) { + transport_ = new SrsSemiSecurityTransport(this); + } else { + transport_ = new SrsPlaintextTransport(this); + } + } + + if ((err = transport_->initialize(cfg)) != srs_success) { + return srs_error_wrap(err, "init"); + } + + return err; +} + +srs_error_t SrsRtcTcpNetwork::start_active_handshake() +{ + return transport_->start_active_handshake(); +} + +srs_error_t SrsRtcTcpNetwork::on_dtls(char* data, int nb_data) +{ + establelished_ = true; + // Update stat when we received data. + delta_->add_delta(nb_data, 0); + + return transport_->on_dtls(data, nb_data); +} + +srs_error_t SrsRtcTcpNetwork::on_rtcp(char* data, int nb_data) +{ + srs_error_t err = srs_success; + + // Update stat when we received data. + delta_->add_delta(nb_data, 0); + + int nb_unprotected_buf = nb_data; + if ((err = transport_->unprotect_rtcp(data, &nb_unprotected_buf)) != srs_success) { + return srs_error_wrap(err, "rtcp unprotect"); + } + + char* unprotected_buf = data; + if (_srs_blackhole->blackhole) { + _srs_blackhole->sendto(unprotected_buf, nb_unprotected_buf); + } + + if ((err = conn_->on_rtcp(unprotected_buf, nb_unprotected_buf)) != srs_success) { + return srs_error_wrap(err, "cipher=%d", nb_data); + } + + return err; +} + +srs_error_t SrsRtcTcpNetwork::on_rtp(char* data, int nb_data) +{ + srs_error_t err = srs_success; + + // Update stat when we received data. + delta_->add_delta(nb_data, 0); + + if ((err = conn_->on_rtp_cipher(data, nb_data)) != srs_success) { + return srs_error_wrap(err, "cipher=%d", nb_data); + } + + int nb_unprotected_buf = nb_data; + if ((err = transport_->unprotect_rtp(data, &nb_unprotected_buf)) != srs_success) { + return srs_error_wrap(err, "rtp unprotect"); + } + + char* unprotected_buf = data; + if (_srs_blackhole->blackhole) { + _srs_blackhole->sendto(unprotected_buf, nb_unprotected_buf); + } + + if ((err = conn_->on_rtp_plaintext(unprotected_buf, nb_unprotected_buf)) != srs_success) { + return srs_error_wrap(err, "cipher=%d", nb_data); + } + + return err; +} + +void SrsRtcTcpNetwork::set_state(SrsRtcNetworkState state) +{ + if (state_ > state) { + srs_warn("RTC: Ignore setting state=%d, now=%d", state, state_); + return; + } + + state_ = state; +} + +std::string SrsRtcTcpNetwork::get_peer_ip() +{ + return peer_ip_; +} + +int SrsRtcTcpNetwork::get_peer_port() +{ + return peer_port_; +} + +srs_error_t SrsRtcTcpNetwork::write(void* buf, size_t size, ssize_t* nwrite) +{ + srs_assert(size <= 65535); + srs_error_t err = srs_success; + + char len_str[2]; + SrsBuffer buf_len(len_str, sizeof(len_str)); + buf_len.write_2bytes(size); + + ssize_t n = 0; + + if((err = sendonly_skt_->write(buf_len.data(), sizeof(len_str), &n)) != srs_success) { + return srs_error_wrap(err, "rtc tcp write len(%d)", size); + } + + if(nwrite) { + *nwrite = n; + } + + // TODO: FIXME: maybe need to send by a few times + if((err = sendonly_skt_->write(buf, size, &n)) != srs_success) { + return srs_error_wrap(err, "rtc tcp write body"); + } + + if(nwrite) { + *nwrite += n; + } + + + return err; +} + +void SrsRtcTcpNetwork::set_peer_id(const std::string& ip, int port) +{ + peer_ip_ = ip; + peer_port_ = port; +} + SrsRtcTcpConn::SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port, ISrsResourceManager* cm) { manager_ = cm; @@ -380,10 +710,13 @@ SrsRtcTcpConn::SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int p delta_ = new SrsNetworkDelta(); delta_->set_io(skt_, skt_); trd_ = new SrsSTCoroutine("tcp", this, _srs_context->get_id()); + session_ = NULL; + disposing_ = false; } SrsRtcTcpConn::~SrsRtcTcpConn() { + _srs_rtc_manager->unsubscribe(this); trd_->interrupt(); srs_freep(trd_); @@ -413,12 +746,16 @@ std::string SrsRtcTcpConn::remote_ip() srs_error_t SrsRtcTcpConn::start() { + _srs_rtc_manager->subscribe(this); return trd_->start(); } srs_error_t SrsRtcTcpConn::cycle() { - srs_error_t err = do_cycle(); + + srs_error_t err = srs_success; + + err = do_cycle(); // Only stat the HTTP streaming clients, ignore all API clients. SrsStatistic::instance()->on_disconnect(get_id().c_str()); @@ -441,9 +778,131 @@ srs_error_t SrsRtcTcpConn::cycle() srs_error_t SrsRtcTcpConn::do_cycle() { srs_error_t err = srs_success; + char len_str[2]; + ssize_t nread = 0; + char* pkt = new char[1500]; + SrsAutoFreeA(char, pkt); // TODO: FIXME: Handle all bytes of TCP Connection. + while(!disposing_) { + if((err = trd_->pull()) != srs_success) { + return srs_error_wrap(err, "rtc tcp conn"); + } + + // @doc: https://www.rfc-editor.org/rfc/rfc4571#section-2 + // read length + if((err = skt_->read(len_str, sizeof(len_str), &nread)) != srs_success) { + return srs_error_wrap(err, "rtc tcp conn read len"); + } + + SrsBuffer buf(len_str, sizeof(len_str)); + int16_t npkt = buf.read_2bytes(); + srs_assert(npkt <= 1500); + + // read rtc pkt such as STUN, DTLS or RTP/RTCP + if((err = skt_->read_fully(pkt, npkt, &nread)) != srs_success) { + return srs_error_wrap(err, "rtc tcp conn read body"); + } + if(disposing_) { + // ready to be destroyed, not need to process new packet + return err; + } + + if((err = on_tcp_pkt(pkt, npkt)) != srs_success) { + return srs_error_wrap(err, "process rtc tcp pkt"); + } + } return err; } +srs_error_t SrsRtcTcpConn::on_tcp_pkt(char* pkt, int nb_pkt) +{ + srs_error_t err = srs_success; + + bool is_stun = srs_is_stun((uint8_t*)pkt, nb_pkt); + bool is_rtp_or_rtcp = srs_is_rtp_or_rtcp((uint8_t*)pkt, nb_pkt); + bool is_rtcp = srs_is_rtcp((uint8_t*)pkt, nb_pkt); + + if(!is_stun && !session_) { + srs_warn("rtc tcp received a mess pkt. %d[%s]", nb_pkt, srs_string_dumps_hex(pkt, nb_pkt, 8).c_str()); + return err; + } + + if (session_) { + // When got any packet, the session is alive now. + session_->alive(); + } + + if(is_stun) { + SrsStunPacket ping; + if ((err = ping.decode(pkt, nb_pkt)) != srs_success) { + return srs_error_wrap(err, "decode stun packet failed"); + } + if (!session_) { + session_ = dynamic_cast(_srs_rtc_manager->find_by_name(ping.get_username())); + } + if (session_) { + session_->switch_to_context(); + } + + srs_trace("recv stun packet from %s:%d, use-candidate=%d, ice-controlled=%d, ice-controlling=%d", + ip_.c_str(), port_, ping.get_use_candidate(), ping.get_ice_controlled(), ping.get_ice_controlling()); + + // TODO: FIXME: For ICE trickle, we may get STUN packets before SDP answer, so maybe should response it. + if (!session_) { + return srs_error_new(ERROR_RTC_STUN, "no session, stun username=%s", + ping.get_username().c_str()); + } + + // For each binding request, update the TCP socket. + if (ping.is_binding_request()) { + session_->tcp()->update_sendonly_socket(skt_); + session_->tcp()->set_peer_id(ip_, port_); + } + return session_->tcp()->on_stun(&ping, pkt, nb_pkt); + } + + // For DTLS, RTCP or RTP, which does not support peer address changing. + if (!session_) { + return srs_error_new(ERROR_RTC_STUN, "no session peer=%s:%d", ip_.c_str(), port_); + } + + // Note that we don't(except error) switch to the context of session, for performance issue. + if (is_rtp_or_rtcp && !is_rtcp) { + err = session_->tcp()->on_rtp(pkt, nb_pkt); + if (err != srs_success) { + return srs_error_wrap(err, "rtc tcp rtp"); + } + return err; + } + + if (is_rtp_or_rtcp && is_rtcp) { + return session_->tcp()->on_rtcp(pkt, nb_pkt); + } + if (srs_is_dtls((uint8_t*)pkt, nb_pkt)) { + srs_trace("receive a dtls pkt"); + return session_->tcp()->on_dtls(pkt, nb_pkt); + } + return srs_error_new(ERROR_RTC_UDP, "unknown packet"); +} + +void SrsRtcTcpConn::on_before_dispose(ISrsResource* c) +{ + if(!session_ || disposing_) { + return; + } + SrsRtcConnection *conn = dynamic_cast(c); + if(conn == session_) { + // the related rtc connection will be disposed + srs_trace("RTC: tcp conn diposing, because of rtc connection"); + session_ = NULL; + disposing_ = true; + } +} + +void SrsRtcTcpConn::on_disposing(ISrsResource* c) +{ + return; +} + diff --git a/trunk/src/app/srs_app_rtc_network.hpp b/trunk/src/app/srs_app_rtc_network.hpp index b9c935dfe1a..7987edcf216 100644 --- a/trunk/src/app/srs_app_rtc_network.hpp +++ b/trunk/src/app/srs_app_rtc_network.hpp @@ -29,6 +29,8 @@ class SrsEphemeralDelta; class ISrsKbpsDelta; class SrsRtcUdpNetwork; class ISrsRtcNetwork; +class SrsRtcTcpNetwork; +class SrsRtcDummyNetwork; // The network stat. enum SrsRtcNetworkState @@ -47,6 +49,10 @@ class SrsRtcNetworks private: // Network over UDP. SrsRtcUdpNetwork* udp_; + // Network over TCP + SrsRtcTcpNetwork* tcp_; + // Network over dummy + SrsRtcDummyNetwork* dummy_; private: // WebRTC session object. SrsRtcConnection* conn_; @@ -63,6 +69,7 @@ class SrsRtcNetworks void set_state(SrsRtcNetworkState state); // Get the UDP network object. SrsRtcUdpNetwork* udp(); + SrsRtcTcpNetwork* tcp(); // Get an available network. ISrsRtcNetwork* available(); public: @@ -73,6 +80,9 @@ class SrsRtcNetworks // For DTLS or Session to call network service. class ISrsRtcNetwork : public ISrsStreamWriter { +protected: + bool establelished_; + public: ISrsRtcNetwork(); virtual ~ISrsRtcNetwork(); @@ -86,6 +96,27 @@ class ISrsRtcNetwork : public ISrsStreamWriter virtual srs_error_t protect_rtp(void* packet, int* nb_cipher) = 0; // Protect RTCP packet by SRTP context. virtual srs_error_t protect_rtcp(void* packet, int* nb_cipher) = 0; +public: + bool is_establelished(); +}; + +// Dummy networks +class SrsRtcDummyNetwork : public ISrsRtcNetwork +{ +public: + SrsRtcDummyNetwork(); + virtual ~SrsRtcDummyNetwork(); + +// The interface of ISrsRtcNetwork +public: + virtual srs_error_t on_connection_established(); + virtual srs_error_t on_dtls_alert(std::string type, std::string desc); +public: + virtual srs_error_t protect_rtp(void* packet, int* nb_cipher); + virtual srs_error_t protect_rtcp(void* packet, int* nb_cipher); +// Interface ISrsStreamWriter. +public: + virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite); }; // The WebRTC over UDP network. @@ -141,8 +172,64 @@ class SrsRtcUdpNetwork : public ISrsRtcNetwork virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite); }; +class SrsRtcTcpNetwork: public ISrsRtcNetwork +{ +private: + SrsRtcConnection* conn_; + SrsEphemeralDelta* delta_; + ISrsProtocolReadWriter* sendonly_skt_; + + // The DTLS transport over this network. + ISrsRtcTransport* transport_; + + std::string peer_ip_; + int peer_port_; + SrsRtcNetworkState state_; +public: + SrsRtcTcpNetwork(SrsRtcConnection* conn, SrsEphemeralDelta* delta); + virtual ~SrsRtcTcpNetwork(); + + void update_sendonly_socket(ISrsProtocolReadWriter* skt); +//ISrsRtcNetwork +public: + // Callback when DTLS connected. + virtual srs_error_t on_connection_established(); + // Callback when DTLS disconnected. + virtual srs_error_t on_dtls_alert(std::string type, std::string desc); + // Protect RTP packet by SRTP context. + virtual srs_error_t protect_rtp(void* packet, int* nb_cipher); + // Protect RTCP packet by SRTP context. + virtual srs_error_t protect_rtcp(void* packet, int* nb_cipher); + +// When got STUN ping message. The peer address may change, we can identify that by STUN messages. + srs_error_t on_stun(SrsStunPacket* r, char* data, int nb_data); +private: + srs_error_t on_binding_request(SrsStunPacket* r, std::string ice_pwd); +// DTLS transport functions. +public: + srs_error_t initialize(SrsSessionConfig* cfg, bool dtls, bool srtp); + virtual srs_error_t start_active_handshake(); + virtual srs_error_t on_dtls(char* data, int nb_data); +// When got data from socket. +public: + srs_error_t on_rtcp(char* data, int nb_data); + srs_error_t on_rtp(char* data, int nb_data); +// Other functions. +public: + // Connection level state machine, for ARQ of UDP packets. + void set_state(SrsRtcNetworkState state); + // ICE reflexive address functions. + std::string get_peer_ip(); + int get_peer_port(); +// Interface ISrsStreamWriter. +public: + virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite); +public: + void set_peer_id(const std::string& ip, int port); +}; + // For WebRTC over TCP. -class SrsRtcTcpConn : public ISrsConnection, public ISrsStartable, public ISrsCoroutineHandler +class SrsRtcTcpConn : public ISrsConnection, public ISrsStartable, public ISrsCoroutineHandler, public ISrsDisposingHandler { private: // The manager object to manage the connection. @@ -154,7 +241,10 @@ class SrsRtcTcpConn : public ISrsConnection, public ISrsStartable, public ISrsCo int port_; // The delta for statistic. SrsNetworkDelta* delta_; - // TCP Transport object. + + // WebRTC session object. + SrsRtcConnection* session_; + bool disposing_; ISrsProtocolReadWriter* skt_; public: SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port, ISrsResourceManager* cm); @@ -176,6 +266,11 @@ class SrsRtcTcpConn : public ISrsConnection, public ISrsStartable, public ISrsCo virtual srs_error_t cycle(); private: virtual srs_error_t do_cycle(); + srs_error_t on_tcp_pkt(char* pkt, int nb_pkt); +// Interface of ISrsDisposingHandler +public: + virtual void on_before_dispose(ISrsResource* c); + virtual void on_disposing(ISrsResource* c); }; #endif