diff --git a/apps/socketoptions.hpp b/apps/socketoptions.hpp index 461af4713..8f21b2b09 100644 --- a/apps/socketoptions.hpp +++ b/apps/socketoptions.hpp @@ -254,7 +254,8 @@ const SocketOption srt_options [] { { "bindtodevice", 0, SRTO_BINDTODEVICE, SocketOption::PRE, SocketOption::STRING, nullptr}, { "retransmitalgo", 0, SRTO_RETRANSMITALGO, SocketOption::PRE, SocketOption::INT, nullptr }, { "cryptomode", 0, SRTO_CRYPTOMODE, SocketOption::PRE, SocketOption::INT, nullptr }, - { "maxrexmitbw", 0, SRTO_MAXREXMITBW, SocketOption::POST, SocketOption::INT64, nullptr } + { "maxrexmitbw", 0, SRTO_MAXREXMITBW, SocketOption::POST, SocketOption::INT64, nullptr }, + { "sendmode", 0, SRTO_SENDMODE, SocketOption::PRE, SocketOption::INT, nullptr } }; } diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 85d8aac19..a945dc180 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -1377,7 +1377,7 @@ SRTSOCKET CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int* pw_ } CUDTSocket* ls; - SocketKeeper keep_ls; + SocketKeeper keep_ls = CUDT::keep(); // We keep the mutex locked for the whole time of instant checks. // Once they pass, extend the life for the scope by SocketKeeper. @@ -1462,7 +1462,7 @@ SRTSOCKET CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int* pw_ // NOTE: release() locks m_GlobControlLock. // Once we extracted the accepted socket, we don't need to keep ls busy. - keep_ls.release(*this); + keep_ls.release(); ls = NULL; // NOT USABLE ANYMORE! 
if (!accepted) // The loop was interrupted @@ -2387,7 +2387,8 @@ SRTSTATUS CUDTUnited::close(const SRTSOCKET u, int reason) }; #endif - SocketKeeper k(*this, u, ERH_THROW); + SocketKeeper k = SOCKET_KEEP(u, ERH_THROW); + IF_HEAVY_LOGGING(ScopedExitLog slog(k.socket)); HLOGC(smlog.Debug, log << "CUDTUnited::close/begin: @" << u << " busy=" << k.socket->isStillBusy()); @@ -2397,7 +2398,7 @@ SRTSTATUS CUDTUnited::close(const SRTSOCKET u, int reason) // Releasing under the global lock to avoid even theoretical // data race. - k.release(*this); + k.release(); return cstatus; } @@ -2525,7 +2526,7 @@ void CUDTSocket::breakNonAcceptedSockets() HLOGC(smlog.Debug, log << "breakNonAcceptedSockets: found " << accepted.size() << " leaky accepted sockets"); for (vector::iterator i = accepted.begin(); i != accepted.end(); ++i) { - CUDTUnited::SocketKeeper sk(m_UDT.uglobal(), *i); + SocketKeeper sk = SOCKET_KEEP(*i, ERH_RETURN); if (sk.socket) { sk.socket->m_UDT.m_bBroken = true; @@ -4443,7 +4444,7 @@ SRTSTATUS CUDT::getGroupData(SRTSOCKET groupid, SRT_SOCKGROUPDATA* pdata, size_t return APIError(MJ_NOTSUP, MN_INVAL, 0); } - CUDTUnited::GroupKeeper k(uglobal(), groupid, CUDTUnited::ERH_RETURN); + CUDTUnited::GroupKeeper k(uglobal(), groupid, ERH_RETURN); if (!k.group) { return APIError(MJ_NOTSUP, MN_INVAL, 0); @@ -4617,7 +4618,7 @@ SRTSOCKET CUDT::connectLinks(SRTSOCKET grp, SRT_SOCKGROUPCONFIG targets[], int a try { - CUDTUnited::GroupKeeper k(uglobal(), grp, CUDTUnited::ERH_THROW); + CUDTUnited::GroupKeeper k(uglobal(), grp, ERH_THROW); return uglobal().groupConnect(k.group, targets, arraysize); } catch (CUDTException& e) @@ -4740,13 +4741,13 @@ SRTSTATUS CUDT::getsockopt(SRTSOCKET u, int, SRT_SOCKOPT optname, void* pw_optva #if SRT_ENABLE_BONDING if (CUDT::isgroup(u)) { - CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW); + CUDTUnited::GroupKeeper k(uglobal(), u, ERH_THROW); k.group->getOpt(optname, (pw_optval), (*pw_optlen)); return SRT_STATUS_OK; } #endif - CUDT& 
udt = uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core(); + CUDT& udt = uglobal().locateSocket(u, ERH_THROW)->core(); udt.getOpt(optname, (pw_optval), (*pw_optlen)); return SRT_STATUS_OK; } @@ -4771,13 +4772,13 @@ SRTSTATUS CUDT::setsockopt(SRTSOCKET u, int, SRT_SOCKOPT optname, const void* op #if SRT_ENABLE_BONDING if (CUDT::isgroup(u)) { - CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW); + CUDTUnited::GroupKeeper k(uglobal(), u, ERH_THROW); k.group->setOpt(optname, optval, optlen); return SRT_STATUS_OK; } #endif - CUDT& udt = uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core(); + CUDT& udt = uglobal().locateSocket(u, ERH_THROW)->core(); udt.setOpt(optname, optval, optlen); return SRT_STATUS_OK; } @@ -4816,12 +4817,12 @@ int CUDT::sendmsg2(SRTSOCKET u, const char* buf, int len, SRT_MSGCTRL& w_m) #if SRT_ENABLE_BONDING if (CUDT::isgroup(u)) { - CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW); + CUDTUnited::GroupKeeper k(uglobal(), u, ERH_THROW); return k.group->send(buf, len, (w_m)); } #endif - return uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core().sendmsg2(buf, len, (w_m)); + return uglobal().locateSocket(u, ERH_THROW)->core().sendmsg2(buf, len, (w_m)); } catch (const CUDTException& e) { @@ -4860,12 +4861,12 @@ int CUDT::recvmsg2(SRTSOCKET u, char* buf, int len, SRT_MSGCTRL& w_m) #if SRT_ENABLE_BONDING if (CUDT::isgroup(u)) { - CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW); + CUDTUnited::GroupKeeper k(uglobal(), u, ERH_THROW); return k.group->recv(buf, len, (w_m)); } #endif - return uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core().recvmsg2(buf, len, (w_m)); + return uglobal().locateSocket(u, ERH_THROW)->core().recvmsg2(buf, len, (w_m)); } catch (const CUDTException& e) { @@ -4882,7 +4883,7 @@ int64_t CUDT::sendfile(SRTSOCKET u, fstream& ifs, int64_t& offset, int64_t size, { try { - CUDT& udt = uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core(); + CUDT& udt = uglobal().locateSocket(u, 
ERH_THROW)->core(); return udt.sendfile(ifs, offset, size, block); } catch (const CUDTException& e) @@ -4904,7 +4905,7 @@ int64_t CUDT::recvfile(SRTSOCKET u, fstream& ofs, int64_t& offset, int64_t size, { try { - return uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core().recvfile(ofs, offset, size, block); + return uglobal().locateSocket(u, ERH_THROW)->core().recvfile(ofs, offset, size, block); } catch (const CUDTException& e) { @@ -5209,7 +5210,7 @@ SRTSTATUS CUDT::bstats(SRTSOCKET u, CBytePerfMon* perf, bool clear, bool instant try { - CUDT& udt = uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core(); + CUDT& udt = uglobal().locateSocket(u, ERH_THROW)->core(); udt.bstats(perf, clear, instantaneous); return SRT_STATUS_OK; } @@ -5229,7 +5230,7 @@ SRTSTATUS CUDT::groupsockbstats(SRTSOCKET u, CBytePerfMon* perf, bool clear) { try { - CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW); + CUDTUnited::GroupKeeper k(uglobal(), u, ERH_THROW); k.group->bstatsSocket(perf, clear); return SRT_STATUS_OK; } @@ -5251,7 +5252,7 @@ CUDT* CUDT::getUDTHandle(SRTSOCKET u) { try { - return &uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core(); + return &uglobal().locateSocket(u, ERH_THROW)->core(); } catch (const CUDTException& e) { @@ -5273,7 +5274,7 @@ SRT_SOCKSTATUS CUDT::getsockstate(SRTSOCKET u) #if SRT_ENABLE_BONDING if (CUDT::isgroup(u)) { - CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW); + CUDTUnited::GroupKeeper k(uglobal(), u, ERH_THROW); return k.group->getStatus(); } #endif diff --git a/srtcore/api.h b/srtcore/api.h index fe08947c5..2e8434804 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -276,12 +276,6 @@ class CUDTUnited static const size_t MAX_CLOSE_RECORD_SIZE = 10; public: - enum ErrorHandling - { - ERH_RETURN, - ERH_THROW, - ERH_ABORT - }; static std::string CONID(SRTSOCKET sock); /// initialize the UDT library. 
@@ -568,10 +562,14 @@ class CUDTUnited }; #endif +public: + CUDTSocket* locateAcquireSocket(SRTSOCKET u, ErrorHandling erh = ERH_RETURN); bool acquireSocket(CUDTSocket* s); void releaseSocket(CUDTSocket* s); +private: + SRT_TSA_NEEDS_LOCKED(m_InitLock) bool startGarbageCollector(); @@ -584,64 +582,6 @@ class CUDTUnited void cleanupAllSockets(); void closeAllSockets(); -public: - struct SocketKeeper - { - CUDTSocket* socket; - - SocketKeeper(): socket(NULL) {} - - // This is intended for API functions to lock the socket's existence - // for the lifetime of their call. - SocketKeeper(CUDTUnited& glob, SRTSOCKET id, ErrorHandling erh = ERH_RETURN) { socket = glob.locateAcquireSocket(id, erh); } - - // This is intended for TSBPD thread that should lock the socket's - // existence until it exits. - SocketKeeper(CUDTUnited& glob, CUDTSocket* s) - { - acquire(glob, s); - } - - void acquire_LOCKED(CUDTSocket* s) - { - socket = s; - s->apiAcquire(); - } - - // Note: acquire doesn't check if the keeper already keeps anything. - // This is only for a use together with an empty constructor. - bool acquire(CUDTUnited& glob, CUDTSocket* s) - { - if (s == NULL) - { - socket = NULL; - return false; - } - - const bool caught = glob.acquireSocket(s); - socket = caught ? 
s : NULL; - return caught; - } - - bool release(CUDTUnited& glob) - { - if (!socket) - return false; - - glob.releaseSocket(socket); - socket = NULL; - return true; - } - - ~SocketKeeper() - { - if (socket) - { - SRT_ASSERT(socket->isStillBusy() > 0); - socket->apiRelease(); - } - } - }; private: diff --git a/srtcore/buffer_snd.cpp b/srtcore/buffer_snd.cpp index 1b7efdcf6..97eb9f7c9 100644 --- a/srtcore/buffer_snd.cpp +++ b/srtcore/buffer_snd.cpp @@ -349,6 +349,58 @@ int32_t CSndBuffer::getMsgNoAtSeq(const int32_t seqno) return m_Packets[offset].getMsgSeq(); } +bool CSndBuffer::getPacketRangeSize(int32_t seqlo, int32_t seqhi, int& w_packets, int& w_bytes) +{ + ScopedLock bufferguard(m_BufLock); + int offset_lo = CSeqNo::seqoff(m_iSndLastDataAck, seqlo); + int offset_hi = CSeqNo::seqoff(m_iSndLastDataAck, seqhi); + + w_packets = 0; + w_bytes = 0; + + if (offset_hi < 0 || offset_hi < offset_lo) + { + HLOGC(bslog.Debug, log << "getPacketRangeSize: invalid seq range %(" << seqlo << "-" << seqhi + << ") map to off=(" << offset_lo << ", " << offset_hi << ")"); + return false; + } + + // Rule out empty packets case, not sure if possible, but still + if (m_Packets.empty()) + { + // Treat this as false because if the buffer is empty, + // the sequence numbers definitely don't refer to any existing + // packets in the buffer. 
+ return false; + } + + bool full_range = true; + if (offset_lo < 0) + { + offset_lo = 0; + full_range = false; + } + + if (offset_hi >= int(m_Packets.size())) + { + offset_hi = m_Packets.size() - 1; + full_range = false; + } + + int npackets = offset_hi - offset_lo + 1, + nbytes = 0; + + for (int i = offset_lo; i <= offset_hi; ++i) + { + nbytes += m_Packets[i].m_iLength; + } + + w_packets = npackets; + w_bytes = nbytes; + + return full_range; +} + // XXX Likely unused (left for the use by tests) int CSndBuffer::readOldPacket(int32_t seqno, CSndPacket& w_sndpkt, steady_clock::time_point& w_srctime, DropRange& w_drop) { diff --git a/srtcore/buffer_snd.h b/srtcore/buffer_snd.h index 16b95aa2e..f60029bd5 100644 --- a/srtcore/buffer_snd.h +++ b/srtcore/buffer_snd.h @@ -453,6 +453,7 @@ class CSndBuffer int getAvgBufSize(int& bytes, int& timespan); + bool getPacketRangeSize(int32_t seqlo, int32_t seqhi, int& w_packets, int& w_bytes); int getCurrBufSize(int& bytes, int& timespan) const { sync::ScopedLock lk (m_BufLock); diff --git a/srtcore/common.cpp b/srtcore/common.cpp index b978549cb..f48260c3e 100644 --- a/srtcore/common.cpp +++ b/srtcore/common.cpp @@ -69,6 +69,7 @@ modified by #include #endif +#include "api.h" #include "md5.h" #include "common.h" #include "netinet_any.h" @@ -554,6 +555,7 @@ vector GetLocalInterfaces() return locals; } +SRTSOCKET SocketKeeper::id() const { return socket ? 
socket->id() : SRT_INVALID_SOCK; } // Value display utilities diff --git a/srtcore/common.h b/srtcore/common.h index abea7f108..7d55b79ce 100644 --- a/srtcore/common.h +++ b/srtcore/common.h @@ -95,6 +95,17 @@ modified by namespace srt { +// export import .api default; +class CUDTUnited; +class CUDTSocket; + +enum ErrorHandling +{ + ERH_RETURN, + ERH_THROW, + ERH_ABORT +}; + #if HAVE_FULL_CXX11 #define SRT_STATIC_ASSERT(cond, msg) static_assert(cond, msg) #else @@ -154,6 +165,60 @@ struct CNetworkInterface } }; +#if ENABLE_HEAVY_LOGGING +inline std::string RecordLocation(const char* file, int line) +{ + std::ostringstream out; + out << file << ":" << line; + return out.str(); +} +#else +inline std::string RecordLocation(const char*, int) { return std::string(); } +#endif + +struct SocketKeeper +{ + CUDTSocket* socket; + CUDTUnited& glob; + std::string location; + + SocketKeeper(CUDTUnited& go, CUDTSocket* p = NULL, bool acquire_after = true): socket(p), glob(go) + { + if (acquire_after && socket) + acquire_socket(socket); + } + + SocketKeeper(const SocketKeeper& r): socket(r.socket), glob(r.glob) + { + acquire_socket(socket); + } + + SocketKeeper& operator=(const SocketKeeper& r) + { + // Assume the object could not be created without glob. 
+ socket = r.socket; + acquire_socket(socket); + return *this; + } + + void acquire_LOCKED(CUDTSocket* s); + + bool release(); + + ~SocketKeeper() + { + if (socket) + release_socket(socket); + } + + SRTSOCKET id() const; + +private: + static void acquire_socket(CUDTSocket* s); + static void release_socket(CUDTSocket* s); +}; + + std::string SockStatusStr(SRT_SOCKSTATUS s); std::string MemberStatusStr(SRT_MEMBERSTATUS s); diff --git a/srtcore/core.cpp b/srtcore/core.cpp index efa869d76..44ec31163 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -241,6 +241,77 @@ CUDTUnited& CUDT::uglobal() return instance; } +SocketKeeper CUDT::keep(CUDTSocket* s, string loc) +{ + SocketKeeper k(uglobal()); + if (s == NULL || !uglobal().acquireSocket(s)) + { + HLOGC(gglog.Debug, log << "Socket " << s << " acquisition failed at " << loc); + return k; + } + + k.socket = s; + HLOGC(gglog.Debug, log << "Socket " << s << " @" << s->id() << " acquisition at " << loc); + k.location = loc; + return k; +} + +SocketKeeper CUDT::keep(SRTSOCKET id, ErrorHandling erh, string loc) +{ + HLOGC(gglog.Debug, log << "Socket @" << id << " acquisition at " << loc); + SocketKeeper kp (uglobal(), uglobal().locateAcquireSocket(id, erh), false /* do not acquire again*/); + kp.location = loc; + return kp; +} + +void SocketKeeper::acquire_socket(CUDTSocket* s) +{ + // This is an internal function called in the copy constructor. + // ASSUMES the socket exists and is already kept by another + // SocketKeeper, so no official acquisition is done, just + // increase the counter. + SRT_ASSERT(s); + if (s) + { + SRT_ASSERT(s->isStillBusy() > 0); + s->apiAcquire(); + } +} + +void SocketKeeper::acquire_LOCKED(CUDTSocket* s) +{ + socket = s; + s->apiAcquire(); +} + +bool SocketKeeper::release() +{ + if (!socket) + return false; + + glob.releaseSocket(socket); + socket = NULL; + return true; +} + +// NOTE: This is an object that is being kept alive in the central +// database. 
The release action may turn the counter to 0, but +// this object shall not do anything about this. This is only an +// information for the central database that it is now free to delete +// the socket when it sees it fit. Busy counter only prevents the +// central database from doing it. +void SocketKeeper::release_socket(CUDTSocket* s) +{ + SRT_ASSERT(s); + if (s) + { + SRT_ASSERT(s->isStillBusy() > 0); + s->apiRelease(); + } +} + + + #ifdef SRT_ENABLE_RATE_MEASUREMENT void RateMeasurement::pickup(const clock_time& time) { @@ -1113,7 +1184,7 @@ void CUDT::open() m_tsNextACKTime.store(currtime + m_tdACKInterval); m_tsNextNAKTime.store(currtime + m_tdNAKInterval); m_tsLastRspAckTime = currtime; - m_tsLastSndTime.store(currtime); + m_LastSend.reset(currtime); #if SRT_ENABLE_BONDING m_tsUnstableSince = steady_clock::time_point(); @@ -5646,7 +5717,7 @@ void * CUDT::tsbpd(void* param) rxready = true; if (info.seq_gap) { - // XXX TSA: Requires lock on m_RcvBufferLock (locked already by enterCS) + // XXX [TSA]: Requires lock on m_RcvBufferLock (locked already) const int iDropCnt SRT_ATR_UNUSED = self->rcvDropTooLateUpTo(info.seqno); #if HVU_ENABLE_LOGGING @@ -6281,7 +6352,7 @@ SRT_REJECT_REASON CUDT::setupCC() m_tsNextACKTime.store(currtime + m_tdACKInterval); m_tsNextNAKTime.store(currtime + m_tdNAKInterval); m_tsLastRspAckTime = currtime; - m_tsLastSndTime.store(currtime); + m_LastSend.reset(currtime); #ifdef SRT_ENABLE_RATE_MEASUREMENT HLOGC(bslog.Debug, log << CONID() << "RATE-MEASUREMENT: initializing time TS=" << FormatTime(currtime)); @@ -6453,7 +6524,10 @@ bool CUDT::closeEntity(int reason) ATR_NOEXCEPT // remove this socket from the snd queue if (m_bConnected) + { + HLOGC(smlog.Debug, log << CONID() << "CLOSING: Remove from sender queue"); m_pMuxer->removeSender(this); + } /* * update_events below useless @@ -6958,11 +7032,12 @@ int CUDT::sendmsg2(const char *data, int len, SRT_MSGCTRL& w_mctrl) size = min(len, sndBuffersLeft() * m_iMaxSRTPayloadSize); } + 
int32_t seqno; { ScopedLock recvAckLock(m_RecvAckLock); // insert the user buffer into the sending list - int32_t seqno = m_iSndNextSeqNo; + seqno = m_iSndNextSeqNo; IF_HEAVY_LOGGING(int32_t orig_seqno = seqno); IF_HEAVY_LOGGING(steady_clock::time_point ts_srctime = steady_clock::time_point() + microseconds_from(w_mctrl.srctime)); @@ -7056,9 +7131,18 @@ int CUDT::sendmsg2(const char *data, int len, SRT_MSGCTRL& w_mctrl) } } - // Insert this socket to the snd list if it is not on the list already. - // m_pMuxer->sndUList()->pop may lock CSndUList::m_ListLock and then m_RecvAckLock - m_pMuxer->updateSendNormal(m_parent); + if (m_config.uSenderMode == 0) + { + // Insert this socket to the snd list if it is not on the list already. + m_pMuxer->updateSendNormal(m_parent); + } + else + { + if (m_LastSched.kickSchedule(seqno, steady_clock::now(), m_tdSendInterval)) + { + m_pMuxer->scheduleSend(m_parent, seqno, sched::TP_REGULAR, m_LastSched.lastTime()); + } + } #ifdef SRT_ENABLE_ECN // IF there was a packet drop on the sender side, report congestion to the app. @@ -7096,6 +7180,21 @@ void CUDT::updateCryptoOnSending() } } +sync::steady_clock::time_point CUDT::calculateRegularSchedTime() +{ + time_point last = m_LastSched.lastTime(); + + time_point now = steady_clock::now(); + if (now - last > seconds_from(1)) + { + // If the sending event is 1 second old, return current time. + // XXX Notify in the log that there was 1 second of time forgotten. 
+ return now; + } + + return last + m_tdSendInterval.load(); +} + int CUDT::recv(char* data, int len) { SRT_MSGCTRL mctrl = srt_msgctrl_default; @@ -7460,6 +7559,12 @@ int64_t CUDT::sendfile(fstream &ifs, int64_t &offset, int64_t size, int block) throw CUDTException(MJ_SETUP, MN_SECURITY, 0); } + if (m_config.uSenderMode != 0) + { + LOGC(aslog.Error, log << CONID() << "In FILE mode the scheduled sender mode is not supported"); + throw CUDTException(MJ_NOTSUP, MN_INVALBUFFERAPI); + } + ScopedLock sendguard (m_SendLock); if (m_pSndBuffer->getCurrBufSize() == 0) @@ -8248,7 +8353,7 @@ void CUDT::sendCtrl(UDTMessageType pkttype, const int32_t* lparam, void* rparam, // Fix keepalive if (nbsent) - m_tsLastSndTime.store(steady_clock::now()); + m_LastSend.update(steady_clock::now(), nbsent); } bool CUDT::getFirstNoncontSequence(int32_t& w_seq, string& w_log_reason) @@ -8633,7 +8738,9 @@ bool CUDT::revokeACKedSequences(int32_t ackdata_seqno, int32_t& w_last_sent_seqn HLOGC(inlog.Debug, log << "ACK: kicking the send schedule/cond"); // insert this socket to snd list if it is not on the list yet - const steady_clock::time_point currtime = m_pMuxer->updateSendNormal(m_parent); + const steady_clock::time_point currtime = (m_config.uSenderMode == 0) + ? m_pMuxer->updateSendNormal(m_parent) + : steady_clock::now(); if (m_config.bSynSending) { @@ -8761,7 +8868,9 @@ void CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_point const int cwnd = std::min(m_iFlowWindowSize, m_iCongestionWindow); if (bWasStuck && cwnd > getFlightSpan()) { - m_pMuxer->updateSendNormal(m_parent); + if (m_config.uSenderMode == 0) + m_pMuxer->updateSendNormal(m_parent); + HLOGC(gglog.Debug, log << CONID() << "processCtrlAck: could reschedule SND. iFlowWindowSize " << m_iFlowWindowSize << " SPAN " << getFlightSpan() << " ackdataseqno %" << ackdata_seqno); @@ -9022,6 +9131,10 @@ void CUDT::processCtrlLossReport(const CPacket& ctrlpkt) // when logging is forcefully off. 
int32_t wrong_loss SRT_ATR_UNUSED = SRT_SEQNO_NONE; + // Will be used to determine rexmit packets to schedule. + // If remain with this value, there's nothing to schedule. + int32_t sched_lo = SRT_SEQNO_NONE, sched_hi = SRT_SEQNO_NONE; + { #if SRT_ENABLE_BONDING // Keep the group from disappearing in the meantime @@ -9082,6 +9195,8 @@ void CUDT::processCtrlLossReport(const CPacket& ctrlpkt) HLOGC(inlog.Debug, log << CONID() << "LOSSREPORT: adding " << losslist_lo << " - " << losslist_hi << " to loss list"); num = m_pSndBuffer->insertLoss(losslist_lo, losslist_hi, steady_clock::now()); + sched_lo = losslist_lo; + sched_hi = losslist_hi; } // ELSE losslist_lo %< m_iSndLastAck else @@ -9108,6 +9223,9 @@ void CUDT::processCtrlLossReport(const CPacket& ctrlpkt) num = m_pSndBuffer->insertLoss(m_iSndLastAck, losslist_hi, steady_clock::now()); dropreq_hi = CSeqNo::decseq(m_iSndLastAck); IF_HEAVY_LOGGING(drop_type = "partially"); + + sched_lo = m_iSndLastAck; + sched_hi = losslist_hi; } // In distinction to losslist, DROPREQ has always just one range, @@ -9169,6 +9287,37 @@ void CUDT::processCtrlLossReport(const CPacket& ctrlpkt) } } + if (m_config.uSenderMode == 1) + { + int npackets = 0, nbytes = 0; + if (!m_pSndBuffer->getPacketRangeSize(sched_lo, sched_hi, (npackets), (nbytes))) + { + // XXX LOGC + } + + time_point start; + duration step; + if (npackets && defineSchedTimes(sched_lo, sched_hi, (start), (step))) + { + int32_t seqno = sched_lo, endseq = CSeqNo::incseq(sched_hi); + + time_point when = start; + for (;;) + { + m_pMuxer->scheduleSend(m_parent, seqno, sched::TP_REXMIT, when); + seqno = CSeqNo::incseq(seqno); + if (seqno == endseq) + { + break; + } + when += step; + } + + // After scheduling update the send time stats. 
+ m_LastSend.update(when, nbytes, npackets); + } + } + updateCC(TEV_LOSSREPORT, EventVariant(losslist, losslist_len)); if (!secure) @@ -9197,7 +9346,8 @@ void CUDT::processCtrlLossReport(const CPacket& ctrlpkt) // blind rexmit mode is laterexmit (the sender will repeat sending // unacknowledged packets only when it has reached the limit with // nothing more to withdraw from the sender buffer). - m_pMuxer->updateSendNormal(m_parent); + if (m_config.uSenderMode == 0) + m_pMuxer->updateSendNormal(m_parent); m_StatsLock.lock(); m_stats.sndr.recvdNak.count(1); @@ -9307,7 +9457,7 @@ void CUDT::processCtrlHS(const CPacket& ctrlpkt) const int nbsent = channel()->sendto(m_PeerAddr, rsppkt, m_SourceAddr); if (nbsent) { - m_tsLastSndTime.store(steady_clock::now()); + m_LastSend.update(steady_clock::now(), nbsent); } } } @@ -10168,7 +10318,7 @@ bool CUDT::packData(CSndPacket& w_sndpkt, steady_clock::time_point& w_nexttime, #endif // Fix keepalive - m_tsLastSndTime.store(enter_time); + m_LastSend.update(enter_time, payload); considerLegacySrtHandshake(steady_clock::time_point()); @@ -10236,6 +10386,117 @@ bool CUDT::packData(CSndPacket& w_sndpkt, steady_clock::time_point& w_nexttime, return payload >= 0; // XXX shouldn't be > 0 ? == 0 is only when buffer range exceeded. } +// Second version of packData: pack the exact data as specified in the specification +// reported from the schedule. +bool CUDT::packData(const SchedPacket& spec, CSndPacket& w_sndpkt, CNetworkInterface& w_src_addr) +{ + int payload = 0; + bool new_packet_packed = false; + + ScopedLock connectguard(m_ConnectionLock); + // If a closing action is done simultaneously, then + // m_bOpened should already be false, and it's set + // just before releasing this lock. + // + // If this lock is caught BEFORE the closing could + // start the dissolving process, this process will + // not be started until this function is finished. 
+ if (!m_bOpened) + return false; + + time_point enter_time = steady_clock::now(); + w_src_addr = m_SourceAddr; + + CPacket& w_packet = w_sndpkt.pkt; + + IF_HEAVY_LOGGING(const char* reason = ""); // The source of the data packet (normal/rexmit/filter) + if (spec.type() == sched::TP_REXMIT) + { + payload = packLostData((w_sndpkt)); + IF_HEAVY_LOGGING(reason = "reXmit"); + } + else if (spec.type() == sched::TP_CONTROL) + { + if (m_PacketFilter.packControlPacket(spec.seqno(), m_CryptoControl.getSndCryptoFlags(), (w_sndpkt.pkt))) + { + HLOGC(qslog.Debug, log << CONID() << "filter: filter/CTL packet ready - packing instead of data."); + payload = (int) w_sndpkt.pkt.getLength(); + IF_HEAVY_LOGGING(reason = "filter"); + + // Stats + ScopedLock lg(m_StatsLock); + m_stats.sndr.sentFilterExtra.count(1); + } + else + { + LOGC(qslog.Error, log << CONID() << "filter: IPE: didn't provide control packet for %" << m_iSndCurrSeqNo + << " that has been scheduled"); + return false; + } + } + else // type() == TP_REGULAR + { + if (!packUniqueData(w_sndpkt)) + { + return false; + } +#if SRT_ENABLE_MAXREXMITBW + if (m_zSndAveragePacketSize > 0) + { + m_zSndAveragePacketSize = avg_iir<16>(m_zSndAveragePacketSize, w_packet.getLength()); + } + else + { + m_zSndAveragePacketSize = w_packet.getLength(); + } + m_zSndMaxPacketSize = std::max(m_zSndMaxPacketSize, w_packet.getLength()); +#endif + new_packet_packed = true; + + payload = (int) w_packet.getLength(); + IF_HEAVY_LOGGING(reason = "normal"); + } + + w_packet.set_id(m_PeerID); // Set the destination SRT socket ID. + + // XXX This should be done when scheduling the packet first time. + // The problem: m_PacketFilter has assigned affinity to the receiver worker thread, + // while scheduling a packet is happening in the application thread. 
+ if (new_packet_packed && m_PacketFilter) + { + HLOGC(qslog.Debug, log << CONID() << "filter: Feeding packet for source clip"); + m_PacketFilter.feedSource((w_packet)); + } + + HLOGC(qslog.Debug, + log << CONID() << "packData: " << reason << " packet seq=" << w_packet.seqno() << " (ACK=" << m_iSndLastAck + << " ACKDATA=" << m_pSndBuffer->firstSeqNo() << " MSG/FLAGS: " << w_packet.MessageFlagStr() << ")"); + + // This means that the packet will really be sent. + if (payload >= 0) + { + // Fix keepalive + m_LastSend.update(enter_time, payload); + + considerLegacySrtHandshake(steady_clock::time_point()); + + // WARNING: TEV_SEND is the only event that is reported from + // the CSndQueue::worker thread. All others are reported from + // CRcvQueue::worker. If you connect to this signal, make sure + // that you are aware of prospective simultaneous access. + updateCC(TEV_SEND, EventVariant(&w_packet)); + + m_StatsLock.lock(); + m_stats.sndr.sent.count(payload); + if (new_packet_packed) + m_stats.sndr.sentUnique.count(payload); + m_StatsLock.unlock(); + return true; + } + + return false; +} + bool CUDT::packUniqueData(CSndPacket& w_sndpkt) { int current_sequence_number; // reflexing variable @@ -12349,6 +12610,11 @@ void CUDT::checkBlindRexmitTimer(const steady_clock::time_point& currtime) log << CONID() << "ENFORCED " << (is_laterexmit ? "LATEREXMIT" : "FASTREXMIT") << " by ACK-TMOUT (scheduling): " << CSeqNo::incseq(m_iSndLastAck) << "-" << csn << " (" << CSeqNo::seqoff(m_iSndLastAck, csn) << " packets)"); + + if (m_config.uSenderMode != 0) + { + scheduleRexmitRange(m_iSndLastAck, csn); + } } } } @@ -12361,7 +12627,98 @@ void CUDT::checkBlindRexmitTimer(const steady_clock::time_point& currtime) const ECheckTimerStage stage = is_fastrexmit ? 
TEV_CHT_FASTREXMIT : TEV_CHT_REXMIT; updateCC(TEV_CHECKTIMER, EventVariant(stage)); - m_pMuxer->updateSendNormal(m_parent); + if (m_config.uSenderMode == 0) + { + m_pMuxer->updateSendNormal(m_parent); + } +} + +void CUDT::scheduleRexmitRange(int32_t lo, int32_t hi) +{ + // First, measure the distance and see if you can gracefully + // schedule these packets without breaking the current bandwidth limit. + + time_point start; + duration step; + if (defineSchedTimes(lo, hi, (start), (step))) + { + int32_t seqno = lo, endseq = CSeqNo::incseq(hi); + + int npackets = 0, nbytes = 0; + if (!m_pSndBuffer->getPacketRangeSize(lo, hi, (npackets), (nbytes))) + { + // XXX LOGC + } + time_point when = start; + for (;;) + { + m_pMuxer->scheduleSend(m_parent, seqno, sched::TP_REXMIT, when); + seqno = CSeqNo::incseq(seqno); + if (seqno == endseq) + { + break; + } + when += step; + } + + // After scheduling update the send time stats. + m_LastSend.update(when, nbytes, npackets); + } +} + +// This function should check if all packets in the range can be retransmitted +// and whether they can be scheduled for that easily. Remarks: +// - This function is used only for FASTREXMIT or LATEREXMIT; so we can be certain +// that have NAKREPORT off, might be that TLPKTDROP is on, if so, TSBPDMODE is also on. +// - if TLPKTDROP is off, then all packets must be retransmitted, so this procedure must +// define scheduling time for all packets in the range no matter what. +// - if TLPKTDROP is on, DO NOT schedule these packets at all, if the next regular +// packet to send has a critically small expected arrival time (less than avg RTT) +bool CUDT::defineSchedTimes(int32_t lo, int32_t hi, time_point& w_start, duration& w_step) +{ + // XXX THIS IS NOT READY YET. KINDA STUB. 
+ int distance SRT_ATR_UNUSED = CSeqNo::seqlen(lo, hi); + + time_point start = m_LastSend.time(); + + // XXX HERE do some estimation on how much time you have to send packets + // depending on the expected arrival time and whether there's measurement + // time or flush time. + + // The application shall send packets in groups, while within a group all packets + // should be declared the same delivery time (as expected for the last packet) + // and all should be sent one after another without waiting in between. + + // This procedure should now define sending time for all packets in the range + // depending on the current mode: + // - In measurement mode, all packets should be scheduled as fast as possible. + // The scheduler should be configured here to send packets with maximum + // currently allowed speed, which is defined in m_tdSendInterval. That + // value should be shaped according to the MAXBW settings, also "infinite", + // as well as the currently measured "bandwidth" speed on the link (that is + // for measurement the speed may also increase in time). Note that the LAST + // packet in the group must be sent at time not earlier than RTT + LATENCY + EAT. + // - In flush time, usually set when difference frames are to be scheduled, + // the step time should be defined as: + // - for regular packets, the measured "reasonable" speed, which is the + // speed that should ensure no packet loss, at minimum the speed required + // to send all packets from the group up to the declared time (the + // whole timeslice split into the number of packets in the group). + // - for lost packets, keep the "bandwidth overhead" rule, that is, use the + // "maximum reasonable" speed (or MAXBW if lower) + overhead percentage. + + // Summary of the data required for calculations: + // + // - BANDWIDTH: the currently measured maximum speed. May exceed the MAXBW. 
+ // - MAXBW, INPUTBW: declared or measured maximum bandwidth + // - OHEADBW: the overhead percentage, allowed for retransmissions + // - TOPBW: the maximum speed measured during measurement time + // + + w_start = start; + w_step = m_tdSendInterval; + return true; } void CUDT::checkTimers() @@ -12394,7 +12751,7 @@ void CUDT::checkTimers() checkBlindRexmitTimer(currtime); - if (currtime > m_tsLastSndTime.load() + microseconds_from(COMM_KEEPALIVE_PERIOD_US)) + if (currtime > m_LastSend.time() + microseconds_from(COMM_KEEPALIVE_PERIOD_US)) { sendCtrl(UMSG_KEEPALIVE); #if SRT_ENABLE_BONDING diff --git a/srtcore/core.h b/srtcore/core.h index 28ba0f4d4..bcb913594 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -69,6 +69,7 @@ modified by #include "handshake.h" #include "congctl.h" #include "packetfilter.h" +#include "schedule_snd.h" #include "socketconfig.h" #include "utilities.h" @@ -139,6 +140,234 @@ enum SeqPairItems // Extended SRT Congestion control class - only an incomplete definition required class CCryptoControl; + +class CLastSend +{ + friend class CUDT; + sync::AtomicClock m_tsSendTime; + + // Statistical data, reset when sending ACKACK + sync::AtomicClock m_tsBeginTime; + sync::atomic m_uNumberPackets; + sync::atomic m_uNumberBytes; + uint32_t m_uBytesPerSecond; + + CLastSend(): m_uNumberPackets(0), m_uNumberBytes(0) + { + } + + sync::steady_clock::time_point time() const { return m_tsSendTime; } + + void reset(const sync::steady_clock::time_point& tm) + { + using namespace sync; + + steady_clock::time_point old = m_tsBeginTime; + m_tsBeginTime.store(m_tsSendTime.load()); + + if (!is_zero(old)) + { + steady_clock::duration diff = m_tsBeginTime.load() - old; + uint64_t basesize = m_uNumberBytes * 1000 * 1000; + uint64_t basetime = count_microseconds(diff); + if (basetime) // prevent division by 0 + m_uBytesPerSecond = basesize / basetime; + // Otherwise keep unchanged; this branch is considered to + // be run only if there was some data collected b4 + } + 
else + { + m_uBytesPerSecond = 0; + } + m_tsSendTime = tm; + m_uNumberBytes = 4; + m_uNumberPackets = 1; + + steady_clock::duration diff = tm - m_tsBeginTime.load(); + uint64_t basesize = m_uNumberBytes * 1000 * 1000; + uint64_t basetime = count_microseconds(diff); + if (basetime) // prevent division by 0 + m_uBytesPerSecond = (5*m_uBytesPerSecond + (basesize / basetime))/6; + } + + void update(const sync::steady_clock::time_point& tm, uint32_t bytes, uint32_t npackets = 1) + { + m_tsSendTime = tm; + // XXX IMPLEMENT R-M-W mode += operator for atomics!!! + m_uNumberBytes = m_uNumberBytes + bytes; + m_uNumberPackets = m_uNumberPackets + npackets; + + // Do not calculate speed here. Do it on reset only. + } +}; + +class CLastSched +{ + friend class CUDT; + + sync::Mutex m_Lock; + sync::atomic m_iSchedSeqNo; // SEQNO up to which regular packets were scheduled + int32_t m_iBufferedSeqNo; // SEQNO up to which there are packets in the sender buffer + + sync::AtomicClock m_tsTime; + sync::AtomicDuration m_tdLastInterval; + + sync::atomic m_bChain; + +public: + + static const int SCHEDULE_FORFEIT_LIMIT_MS = 500; + + CLastSched() : m_iSchedSeqNo(SRT_SEQNO_NONE), m_iBufferedSeqNo(SRT_SEQNO_NONE), m_tdLastInterval(), m_bChain(false) {} + + sync::steady_clock::time_point lastTime() const { return m_tsTime.load(); } + bool shallChain() const { return m_bChain; } + void set_intervnal(const sync::steady_clock::duration& i) { m_tdLastInterval = i; } + int32_t lastSchedSeq() const { return m_iSchedSeqNo; } + + // This is to be called on adding a new packet to the sender buffer. 
+ // We need: + // - buffered_end: sequence number of the just added packet + // - currtime: the current time when calling + // - interval: the scheduling interval at the current maximum allowed speed + // Return: + // - true: the caller should then enqueue the packet in the schedule as regular + // - false: do nothing; the sender worker will do the job as needed + bool kickSchedule(int32_t buffered_end, const sync::steady_clock::time_point& currtime, const sync::steady_clock::duration& interval) + { + sync::ScopedLock lk (m_Lock); + + if (m_iSchedSeqNo == SRT_SEQNO_NONE) + { + // Nothing was scheduled so far and buffered_end is the + // very first packet in the buffer (we schedule one packet + // at a time). + m_iBufferedSeqNo = buffered_end; + m_iSchedSeqNo = buffered_end; + m_tsTime = currtime; + + // Still false because we have just one packet here, + // so once this is scheduled, there's nothing more. + m_bChain = false; + return true; + } + + // We had something scheduled earlier + bool empty = (m_iSchedSeqNo == m_iBufferedSeqNo); + m_iBufferedSeqNo = buffered_end; + + if (!empty && m_bChain) + { + // If it wasn't empty, rely on the next schedule + // from the worker thread (chain scheduling). + + // Also do not update the scheduling time - this + // will be in the hands of the worker thread. + return false; + } + + // Set the interval for the chain. The interval can be + // modified at any time later. + m_tdLastInterval = interval; + + // Here we know that the worker will not be chaining + // the schedule after the previous execution. We need + // to schedule. + + sync::steady_clock::time_point forfeiture = currtime - interval; + // Ok, note that the last schedule time can as well be in the future. + // The `interval` is the shortest possible time distance allowed + // to be kept between sent packets. That's why we need to have + // exactly 1s of positive distance so that the unused time forfeits. 
+ if (forfeiture - m_tsTime.load() > sync::milliseconds_from(SCHEDULE_FORFEIT_LIMIT_MS))
+ {
+ // In case when the last send time recorded here is older by
+ // more than the forfeit limit (SCHEDULE_FORFEIT_LIMIT_MS = 500 ms) from the current time carried back by 1
+ // interval, forfeit that time and update to that "earliest possible".
+ m_tsTime = forfeiture;
+ }
+ else
+ {
+ // If the last sending time was even in the past, but less than
+ // the forfeit limit (500 ms) behind the current time, just increase the sending
+ // time by interval.
+ m_tsTime = m_tsTime.load() + interval;
+ }
+
+ m_iSchedSeqNo = CSeqNo::incseq(m_iSchedSeqNo);
+
+ // If equal, it means that we DO WANT to schedule the earliest
+ // pending packet, but the worker should not try to chain
+ // the next one.
+ m_bChain = (m_iSchedSeqNo != m_iBufferedSeqNo);
+ return true;
+ }
+
+ // The function to be called from the sender worker.
+ // It has just executed schedule for a given regular packet
+ // and has to schedule sending the next one.
+ // Returns false if it turned to the last one and should not chain the next one.
+ bool chainSchedule(int32_t last_seqno, const sync::steady_clock::time_point& exec_time)
+ {
+ // Note: exec_time is the expected time when the next
+ // packet should be scheduled.
+ sync::ScopedLock lk (m_Lock);
+
+ // We state that m_iSchedSeqNo is set already, as it should have been
+ // set by the main thread scheduler (after adding buffer). We
+ // need to only check if there's anything new to schedule.
+ // The lock is applied to prevent the simultaneous regular packet
+ // checker from changing the state when trying to add a new packet.
+
+ // If reached the end (no new regular packets eligible for scheduling)
+ // do nothing and return false. JUST IN CASE (the call shall not happen
+ // in such case).
+ if (m_iSchedSeqNo == m_iBufferedSeqNo || !m_bChain) + return false; + + if (last_seqno != m_iSchedSeqNo) + { + // XXX ERROR jump-over schedule + if (CSeqNo::seqcmp(last_seqno, m_iBufferedSeqNo) > 0) + { + // XXX ERROR has scheduled packet not added to the buffer + // Just stop the schedule and wait for the API function + // to reinstate it + m_iSchedSeqNo = m_iBufferedSeqNo; + m_bChain = false; + return false; + } + + m_iSchedSeqNo = last_seqno; + } + + // Ok, we have one packet to schedule. + // The time passed as exec_time already involves interval and it should + // be calculated by the caller basing on lastTime() returned from here. + // Note that the API function modifies this time only when it schedules + // the packet, while worker doesn't. + + int32_t newseq = CSeqNo::incseq(m_iSchedSeqNo); + + if (newseq == m_iBufferedSeqNo) + { + // After this one, there are no more packets to schedule ATM. + // So turn off chaining. The API call will reinstate it on the + // next call. + m_bChain = false; + + // NOTE: if m_bChain == false, this function shall not have + // been called + } + + // We do want to schedule this one, so record the data + m_iSchedSeqNo = newseq; + m_tsTime = exec_time; + return true; + } + +}; + class CUDTUnited; class CUDTSocket; #if SRT_ENABLE_BONDING @@ -591,6 +820,13 @@ class CUDT static CUDTUnited& uglobal(); // UDT global management base + static SocketKeeper keep_none() { SocketKeeper k(uglobal()); return k; } + static SocketKeeper keep_noacquire(CUDTSocket* s) { SocketKeeper k(uglobal(), s, false); return k; } + static SocketKeeper keep(CUDTSocket* s = NULL, std::string loc = ""); + static SocketKeeper keep(SRTSOCKET, ErrorHandling erh = ERH_RETURN, std::string loc = ""); + +#define SOCKET_KEEP(...) 
CUDT::keep(__VA_ARGS__, RecordLocation(__FILE__, __LINE__)) + std::set& pollset() { return m_sPollID; } CSrtConfig m_config; @@ -916,7 +1152,10 @@ class CUDT /// and KMX message resent (when key change period passed and the packet was lost). SRT_TSA_NEEDS_NONLOCKED(m_ConnectionLock) void checkSndTimers(); - + + // For schedule mode sending + sync::steady_clock::time_point calculateRegularSchedTime(); + /// @brief Check and perform KM refresh if needed. bool checkSndKMRefresh(int* aw_keyindex); @@ -1091,7 +1330,7 @@ class CUDT #endif #endif - atomic_duration m_tdSendInterval; // Inter-packet time, in CPU clock cycles + atomic_duration m_tdSendInterval; // Inter-packet time according to the current bandwidth limit atomic_duration m_tdSendTimeDiff; // Aggregate difference in inter-packet sending time @@ -1111,7 +1350,7 @@ class CUDT SRT_TSA_GUARDED_BY(m_RecvAckLock) atomic_time_point m_tsLastRspTime; // Timestamp of last response from the peer time_point m_tsLastRspAckTime; // (SND) Timestamp of last ACK from the peer - atomic_time_point m_tsLastSndTime; // Timestamp of last data/ctrl sent (in system ticks) + CLastSend m_LastSend; // Time and stats for the last sending time_point m_tsLastWarningTime; // Last time that a warning message is sent atomic_time_point m_tsLastReqTime; // last time when a connection request is sent time_point m_tsRcvPeerStartTime; @@ -1123,7 +1362,8 @@ class CUDT int m_iPktCount; // Packet counter for ACK int m_iLightACKCount; // Light ACK counter - time_point m_tsNextSendTime; // Scheduled time of next packet sending + time_point m_tsNextSendTime; // Scheduled time of next packet sending (normal mode) + CLastSched m_LastSched; // Data for packet scheduling (scheduler mode) sync::atomic m_iSndLastFullAck; // Last full ACK received SRT_TSA_GUARDED_BY(m_RecvAckLock) @@ -1387,6 +1627,10 @@ class CUDT bool packData(CSndPacket& packet, time_point& nexttime, CNetworkInterface& src_addr); bool releaseSend(); void removeSndLossUpTo(int32_t seq); + 
bool packData(const SchedPacket& spec, CSndPacket& packet, CNetworkInterface& src_addr); + + bool defineSchedTimes(int32_t lo, int32_t hi, time_point& w_start, duration& w_step); + void scheduleRexmitRange(int32_t lo, int32_t hi); #if USE_RECEIVER_UNIT_POOL SRT_TSA_NEEDS_NONLOCKED(m_RcvTsbPdStartupLock, m_StatsLock, m_RecvLock, m_RcvLossLock, m_RcvBufferLock) diff --git a/srtcore/filelist.maf b/srtcore/filelist.maf index a5421bf38..d257cd773 100644 --- a/srtcore/filelist.maf +++ b/srtcore/filelist.maf @@ -21,6 +21,7 @@ packet.cpp packetfilter.cpp queue.cpp congctl.cpp +schedule_snd.cpp socketconfig.cpp srt_c_api.cpp ../logging/hvu_compat.c diff --git a/srtcore/group.cpp b/srtcore/group.cpp index fd8483bd8..0e9e6d186 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -953,7 +953,7 @@ void CUDTGroup::getOpt(SRT_SOCKOPT optname, void* pw_optval, int& w_optlen) // going to be deleted. Hence use the safest method by extracting through the id. if (firstsocket != SRT_INVALID_SOCK) { - CUDTUnited::SocketKeeper sk(CUDT::uglobal(), firstsocket); + SocketKeeper sk = CUDT::keep(firstsocket); if (sk.socket) { // Return the value from the first member socket, if any is present @@ -3093,7 +3093,7 @@ int CUDTGroup::recv_old(char* buf, int len, SRT_MSGCTRL& w_mc) << " time=" << FormatTime(infoToRead.tsbpd_time)); } - const int res = socketToRead->core().receiveMessage((buf), len, (w_mc), CUDTUnited::ERH_RETURN); + const int res = socketToRead->core().receiveMessage((buf), len, (w_mc), ERH_RETURN); HLOGC(grlog.Debug, log << "grp/recv: $" << id() << ": @" << socketToRead->core().m_SocketID << ": Extracted data with %" << w_mc.pktseq << " #" << w_mc.msgno << ": " << (res <= 0 ? "(NOTHING)" : BufferStamp(buf, res))); @@ -4399,7 +4399,7 @@ void CUDTGroup::sendBackup_RetryWaitBlocked(SendBackupCtx& w_sendBackupCtx if (i->second & SRT_EPOLL_ERR) { SRTSOCKET id = i->first; - CUDTSocket* s = m_Global.locateSocket(id, CUDTUnited::ERH_RETURN); // << LOCKS m_GlobControlLock! 
+ CUDTSocket* s = m_Global.locateSocket(id, ERH_RETURN); // << LOCKS m_GlobControlLock! if (s) { HLOGC(gslog.Debug, diff --git a/srtcore/packetfilter.cpp b/srtcore/packetfilter.cpp index c9635cbbc..e80c7b400 100644 --- a/srtcore/packetfilter.cpp +++ b/srtcore/packetfilter.cpp @@ -27,6 +27,11 @@ using namespace srt::logging; using namespace srt::sync; namespace srt { + +/////////////////////////////////////// +// CONFIGURATION section +////////////////////////////////////// + bool PacketFilter::Internal::ParseConfig(const string& s, SrtFilterConfig& w_config, PacketFilter::Factory** ppf) { if (!SrtParseConfig(s, (w_config))) @@ -112,6 +117,10 @@ bool PacketFilter::Internal::CheckFilterCompat(SrtFilterConfig& w_agent, const S return true; } +/////////////////////////////////////// +// RECEIVER section +////////////////////////////////////// + bool PacketFilter::provide(const CPacket& rpkt, CallbackHolder handler, loss_seqs_t& w_loss_seqs) { bool passthrough = false; @@ -176,20 +185,55 @@ bool PacketFilter::provide(const CPacket& rpkt, CallbackHolder handler) { - bool have = m_filter->packControlPacket(m_sndctlpkt, seq); - if (!have) - return false; + if (m_provided.empty()) + return; + + for (vector::iterator i = m_provided.begin(); i != m_provided.end(); ++i) + { + bool shall_continue = CALLBACK_CALL(handler, (const char*)i->hdr, i->buffer, i->length); + if (!shall_continue) + break; + } + + m_provided.clear(); +} + + +/////////////////////////////////////// +// SENDER section +////////////////////////////////////// + +size_t PacketFilter::cacheControlPackets(int32_t seq) +{ + size_t oldsize = m_control.size(); + for (int y = 0; y < 16; ++y) + { + m_control.push_back(SrtPacket()); + size_t lastx = m_control.size() - 1; + bool have = m_filter->packControlPacket((m_control[lastx]), seq); + if (!have) + { + // Remove the prepared object if that time it didn't work. 
+ m_control.pop_back(); + break; + } + } + + return m_control.size() - oldsize; +} +void PacketFilter::copyPacket(SrtPacket& src, int kflg, CPacket& w_packet) +{ // Now this should be repacked back to CPacket. // The header must be copied, it's always part of CPacket. uint32_t* hdr = w_packet.getHeader(); - memcpy((hdr), m_sndctlpkt.hdr, SRT_PH_E_SIZE * sizeof(*hdr)); + memcpy((hdr), src.hdr, SRT_PH_E_SIZE * sizeof(*hdr)); // The buffer can be assigned. - w_packet.m_pcData = m_sndctlpkt.buffer; - w_packet.setLength(m_sndctlpkt.length); + w_packet.m_pcData = src.buffer; + w_packet.setLength(src.length); // This sets only the Packet Boundary flags, while all other things: // - Order @@ -204,24 +248,66 @@ bool PacketFilter::packControlPacket(int32_t seq, int kflg, CPacket& w_packet) // Don't set the ID, it will be later set for any kind of packet. // Write the timestamp clip into the timestamp field. +} + +bool PacketFilter::packControlPacket(int32_t seq, int kflg, CPacket& w_packet) +{ + ScopedLock lk (m_SenderLock); + // First, check how many have been already once extracted. + // Note that extracted packages remain here until they are decommissioned. + + if (m_control_extracted == m_control.size()) // includes empty + { + // Nothing in cache, try to cache. + // Here we extract as much control packets as possible. + // All packets are being cached. One packet gets extracted here. 
+ if (cacheControlPackets(seq) == 0) + return false; // still nothing in cache + } + + copyPacket(m_control[m_control_extracted], kflg, (w_packet)); + ++m_control_extracted; return true; } -void PacketFilter::CopyRebuilt(CallbackHolder handler) +size_t PacketFilter::cachedPackets() const { - if (m_provided.empty()) - return; + ScopedLock lk (m_SenderLock); + return m_control.size() - m_control_extracted; +} - for (vector::iterator i = m_provided.begin(); i != m_provided.end(); ++i) +void PacketFilter::decommissionSender(int32_t seq) +{ + ScopedLock lk (m_SenderLock); + + vector::iterator i = m_control.begin(); + while (i != m_control.end()) { - bool shall_continue = CALLBACK_CALL(handler, (const char*)i->hdr, i->buffer, i->length); - if (!shall_continue) + if (CSeqNo::seqcmp(i->header(SRT_PH_SEQNO), seq) > 0) break; } - - m_provided.clear(); + // Now i points to a packet that should stay. + if (i == m_control.end()) + { + m_control.clear(); + m_control_extracted = 0; + } + else + { + int n = std::distance(m_control.begin(), i); + m_control.erase(m_control.begin(), i); + if (n >= int(m_control_extracted)) + m_control_extracted = 0; + else + m_control_extracted -= n; + } } + +/////////////////////////////////////// +// FACTORY section +////////////////////////////////////// + // Placement here is necessary in order to mark the location to // store the PacketFilter::Factory class characteristic object. PacketFilter::Factory::~Factory() diff --git a/srtcore/packetfilter.h b/srtcore/packetfilter.h index 87fd58ba1..821770549 100644 --- a/srtcore/packetfilter.h +++ b/srtcore/packetfilter.h @@ -179,13 +179,17 @@ class PacketFilter // In the beginning it's initialized as first, builtin default. // Still, it will be created only when requested. 
- PacketFilter(): m_filter(), m_parent(), m_sndctlpkt(0) /*, m_unitq()*/ {} + PacketFilter(): m_filter(), m_parent(), m_control_extracted(0) /*, m_unitq()*/ + { + } // Copy constructor - important when listener-spawning // Things being done: // 1. The filter is individual, so don't copy it. Set NULL. // 2. This will be configured anyway basing on possibly a new rule set. - PacketFilter(const PacketFilter& source SRT_ATR_UNUSED): m_filter(), m_parent(), m_sndctlpkt(0) /*, m_unitq()*/ {} + PacketFilter(const PacketFilter& source SRT_ATR_UNUSED): m_filter(), m_parent(), m_control_extracted(0) /*, m_unitq()*/ + { + } // This function will be called by the parent CUDT // in appropriate time. It should select appropriate @@ -203,6 +207,9 @@ class PacketFilter // Simple wrappers void feedSource(CPacket& w_packet) { SRT_ASSERT(m_filter); return m_filter->feedSource((w_packet)); } SRT_ARQLevel arqLevel() { SRT_ASSERT(m_filter); return m_filter->arqLevel(); } + + // Packs a single control packet. Prepares the cache, if needed. Returns false + // if no packet was cached and cache refreshing also failed. bool packControlPacket(int32_t seq, int kflg, CPacket& w_packet); // This handler will be called for every packet rebuilt (including 0 times). @@ -212,6 +219,14 @@ class PacketFilter // NOTE: it's up to the caller to sort all provided packets by sequence number! typedef bool copy_rebuilt_fn(void* opaq, const char* header, const char* data, size_t datasize); bool provide(const CPacket& packet, CallbackHolder handler, loss_seqs_t& w_loss_seqs); + // Caches any pending control packets and returns the number of + // packets that were not scheduled yet. + size_t cacheControlPackets(int32_t seq); + + // Removes from the control packet cache all packets older than + // the given sequence. 
+ void decommissionSender(int32_t seq); + size_t cachedPackets() const; protected: PacketFilter& operator=(const PacketFilter& p); @@ -220,10 +235,18 @@ class PacketFilter CUDT* m_parent; // Sender part - SrtPacket m_sndctlpkt; + // After a single packet getting scheduled, there can be generated + // maximum of 2 control packets, if it happened after the last packet + // in the column and a complete row simultaneously + + mutable sync::Mutex m_SenderLock; + std::vector m_control; + size_t m_control_extracted; // Receiver part std::vector m_provided; + + static void copyPacket(SrtPacket& src, int kflg, CPacket& w_packet); }; inline bool CheckFilterCompat(SrtFilterConfig& w_agent, const SrtFilterConfig& peer) diff --git a/srtcore/packetfilter_api.h b/srtcore/packetfilter_api.h index ef0d8867f..d5058f1a6 100644 --- a/srtcore/packetfilter_api.h +++ b/srtcore/packetfilter_api.h @@ -69,7 +69,7 @@ struct SrtPacket char buffer[SRT_MAX_PLSIZE_AF_INET]; // Using this as the bigger one (this for AF_INET6 is smaller) size_t length; - SrtPacket(size_t size): length(size) + SrtPacket(size_t size = 0): length(size) { memset(hdr, 0, sizeof(hdr)); } diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index 947bdc49c..65c9706cd 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -605,6 +605,7 @@ void CSendOrderList::signalInterrupt() // CSndQueue::CSndQueue(CMultiplexer* parent): m_parent(parent), + m_pWorkerFunction(NULL), m_SendOrderList(parent->m_SocketsLock), m_pChannel(NULL), m_bClosing(false) @@ -621,8 +622,14 @@ void CSndQueue::stop() { // We use the decent way, so we say to the thread "please exit". m_bClosing = true; - - m_SendOrderList.signalInterrupt(); + if (m_pWorkerFunction == &workerSendOrder_fwd) + { + m_SendOrderList.signalInterrupt(); + } + else if (m_pWorkerFunction == &workerPacketScheduler_fwd) + { + m_Scheduler.interrupt(); + } // Sanity check of the function's affinity. 
if (sync::this_thread_is(m_WorkerThread)) @@ -656,12 +663,22 @@ void CSndQueue::init(CChannel* c) #else const char* thname = "SRT:SndQ"; #endif - if (!StartThread((m_WorkerThread), CSndQueue::worker_fwd, this, thname)) + m_pWorkerFunction = SelectWorkerFunction(); + + if (!StartThread((m_WorkerThread), *m_pWorkerFunction, this, thname)) { throw CUDTException(MJ_SYSTEMRES, MN_THREAD); } } +CSndQueue::worker_fn* CSndQueue::SelectWorkerFunction() +{ + if (m_parent->cfg().uSenderMode == 0) + return workerSendOrder_fwd; + + return workerPacketScheduler_fwd; +} + #if defined(SRT_DEBUG_SNDQ_HIGHRATE) static void CSndQueueDebugHighratePrint(const CSndQueue* self, const steady_clock::time_point currtime) @@ -738,7 +755,7 @@ void CSndQueue::workerSendOrder() // We can use acquire_LOCKED because no one will delete a bound socket // until it's removed from the multiplexer, and against that we are protected // by m_SocketsLock mutex. - CUDTUnited::SocketKeeper keep; + SocketKeeper keep = CUDT::keep_noacquire(runner->m_pSocket); keep.acquire_LOCKED(runner->m_pSocket); // Get a socket with a send request if any. @@ -824,6 +841,88 @@ void CSndQueue::workerSendOrder() // to be returned from none(). Here it's returned as empty_list.end(). 
SocketHolder::socklist_t SocketHolder::empty_list; +SendTask::taskiter_t CMultiplexer::scheduleSend(CUDTSocket* src, int32_t seqno, sched::Type type, const sync::steady_clock::time_point& when) +{ + SendTask task (SchedPacket(src, seqno, type), when); + return m_SndQueue.m_Scheduler.enqueue_task(src->id(), task); +} + +void CSndQueue::workerPacketScheduler() +{ + std::string thname; + ThreadName::get(thname); + THREAD_STATE_INIT(thname.c_str()); + + int interrupt_credit = 5; + + for (;;) + { + if (m_bClosing) + { + HLOGC(qslog.Debug, log << "SndQ: closed, exiting"); + break; + } + + HLOGC(qslog.Debug, log << "SndQ: waiting to get next send candidate..."); + THREAD_PAUSED(); + SchedPacket p = m_Scheduler.wait_pop(); + THREAD_RESUMED(); + INCREMENT_THREAD_ITERATIONS(); + + if (p.empty()) + { + --interrupt_credit; + if (!m_bClosing) + { + LOGC(qslog.Error, log << "SndQ: IPE: scheduler interrupted while queue is running!"); + if (!interrupt_credit) + { + m_bClosing = true; + LOGC(qslog.Fatal, log << "SndQ: IPE: closing sender queue to prevent spamming"); + break; + } + } + continue; // If m_bClosing, the loop will exit at the next iteration + } + + CSndPacket sndpkt; + CNetworkInterface source_addr; + CUDTSocket* s = p.m_Socket.socket; + const bool res = s->core().packData(p, (sndpkt), (source_addr)); + if (!res) + { + LOGC(qslog.Error, log << "SndQ: IPE: @" << s->id() + << " didn't provide packet for scheduled specification"); + --interrupt_credit; + if (!interrupt_credit) + { + // XXX Signal broken socket by IPE + m_bClosing = true; + } + continue; + } + const sockaddr_any target_addr = s->core().m_PeerAddr; + m_pChannel->sendto(target_addr, sndpkt.pkt, source_addr); + + // Restore to maximum after a successful extraction. + interrupt_credit = 5; + + steady_clock::time_point when = s->core().calculateRegularSchedTime(); + + // Schedule the next packet, if possible. + // If not possible, the next packet will be chained directly + // by the API function. 
+ CLastSched& sc = s->core().m_LastSched; + if (sc.chainSchedule(sndpkt.pkt.getSeqNo(), when)) + { + SendTask task (SchedPacket(s, sc.lastSchedSeq() , sched::TP_REGULAR), when); + m_Scheduler.enqueue_task(s->id(), task); + } + } + THREAD_EXIT(); +} + + void CMultiplexer::removeRID(const SRTSOCKET& id) { ScopedLock lkv(m_SocketsLock); @@ -1491,7 +1590,7 @@ EReadStatus CRcvQueue::worker_DropIncomingPacket(sockaddr_any& w_addr) // - CONN_CONTINUE: the socket is acquired, you can continue passing the packet to it. // - any other: this is an error, socket NOT acquired // must be static due to dependencies that can't be exposed to the interface -static EConnectStatus rcv_AcquireTargetSocket(CMultiplexer* parent, SRTSOCKET id, const sockaddr_any& addr, CUDTUnited::SocketKeeper& w_sk, SocketHolder::State& w_hstate) +static EConnectStatus rcv_AcquireTargetSocket(CMultiplexer* parent, SRTSOCKET id, const sockaddr_any& addr, SocketKeeper& w_sk, SocketHolder::State& w_hstate) { w_hstate = SocketHolder::INIT; CUDTSocket* s = parent->findAgent(id, addr, (w_hstate), parent->ACQ_ACQUIRE); @@ -1521,8 +1620,7 @@ static EConnectStatus rcv_AcquireTargetSocket(CMultiplexer* parent, SRTSOCKET id if (!u.stillConnected()) { // Socket will be released in this block. 
- CUDTUnited::SocketKeeper sk; - sk.socket = s; // Acquired by findAgent() call + SocketKeeper sk = CUDT::keep_noacquire(s); // Acquired by findAgent() call u.updateRejectReason(SRT_REJ_CLOSE); HLOGC(cnlog.Debug, log << "worker_ProcessAddressedPacket: target @" @@ -1602,7 +1700,7 @@ EReadStatus CRcvQueue::worker_RetrieveAndProcessUnit(EConnectStatus& w_cst, cons // Otherwise ID is expected to be associated with: // - an enqueued rendezvous socket // - a socket connected to a peer - CUDTUnited::SocketKeeper sk; + SocketKeeper sk = CUDT::keep_none(); SocketHolder::State hstate; w_cst = rcv_AcquireTargetSocket(m_parent, w_id, sa, (sk), (hstate)); if (w_cst == CONN_CONTINUE) @@ -1766,8 +1864,7 @@ bool CRcvQueue::worker_TryAcceptedSocket(const CPacket& pkt, const sockaddr_any& } // Acquired in findPeer, so this can be now kept without acquiring m_GlobControlLock. - CUDTUnited::SocketKeeper keep; - keep.socket = s; + SocketKeeper skeep = CUDT::keep_noacquire(s); CUDT* u = &s->core(); if (u->m_bBroken || u->m_bClosing) diff --git a/srtcore/queue.h b/srtcore/queue.h index 6352e219c..5ca79b23a 100644 --- a/srtcore/queue.h +++ b/srtcore/queue.h @@ -60,6 +60,7 @@ modified by #include "common.h" #include "packet.h" #include "socketconfig.h" +#include "schedule_snd.h" #include "netinet_any.h" #include "utilities.h" @@ -622,17 +623,17 @@ class CSendOrderList m_bRunning = false; } - // Design-patching. - // This lock must be applied when a socket is removed from the - // multiplexer. This must be done so to prevent another thread from - // reaching out to the order container containing nodes begin now removed. - private: HeapSet m_Schedule; friend class CSndQueue; + // Design-patching. + // This lock must be applied when a socket is removed from the + // multiplexer. This must be done so to prevent another thread from + // reaching out to the order container containing nodes begin now removed. 
+ sync::Mutex& m_ExternLock; // pinned into CMultiplexer::m_SocketsLock mutable sync::Condition m_ListCond; sync::atomic m_bRunning; @@ -688,23 +689,31 @@ class CSndQueue void stop(); private: - static void* worker_fwd(void* param) - { - CSndQueue* self = (CSndQueue*)param; - self->workerSendOrder(); + typedef void* worker_fn(void*); +#define DEFINE_FWD(classname, name) \ + static void* name##_fwd(void* param) { classname* self = (classname*)param; self->name(); return NULL; } - return NULL; - } void workerSendOrder(); + DEFINE_FWD(CSndQueue, workerSendOrder); + void workerPacketScheduler(); + DEFINE_FWD(CSndQueue, workerPacketScheduler); + +#undef DEFINE_FWD + + worker_fn* m_pWorkerFunction; + sync::CThread m_WorkerThread; -private: CSendOrderList m_SendOrderList; // List of socket instances for data sending CChannel* m_pChannel; // The UDP channel for data sending sync::atomic m_bClosing; // closing the worker + SendScheduler m_Scheduler; + + worker_fn* SelectWorkerFunction(); + public: @@ -1086,6 +1095,10 @@ struct CMultiplexer void updateSendFast(CUDTSocket* s); void removeSender(CUDT* u); + // SCHEDULER API + + SendTask::taskiter_t scheduleSend(CUDTSocket* src, int32_t seqno, sched::Type type, const sync::steady_clock::time_point& when); + }; } // namespace srt diff --git a/srtcore/schedule_snd.cpp b/srtcore/schedule_snd.cpp new file mode 100644 index 000000000..7a2f5d7e3 --- /dev/null +++ b/srtcore/schedule_snd.cpp @@ -0,0 +1,279 @@ + +#include +#include + +#include "schedule_snd.h" +#include "core.h" +#include "logging.h" + +using namespace std; +using namespace srt; +using namespace srt::sync; + +using srt::logging::qslog; + +namespace srt +{ + +SchedPacket::SchedPacket(CUDTSocket* sock, int32_t seqno, sched::Type t): + m_Socket(CUDT::uglobal()) +{ + if (sock) + m_Socket = CUDT::keep(sock); + + m_iSeqNo = seqno; + m_Type = t; +} + +void SchedPacket::set_socket(CUDTSocket* sock) +{ + if (sock) + m_Socket = CUDT::keep(sock); +} + +static const char* const 
schedtype [] = {"regular", "rexmit", "pf-control"}; + +std::string SendTask::print(SendTask::taskiter_t v) +{ + std::ostringstream out; + // Complicated pre-C++20 time formatting... + time_t tval = count_seconds(v->m_tsSendTime.time_since_epoch()); + + int64_t total_usec = count_microseconds(v->m_tsSendTime.time_since_epoch()); + int64_t usec = total_usec - (tval * 1000000); + + struct tm tme; + localtime_r(&tval, &tme); + out << "<" << std::put_time(&tme, "%T") << "."; + out << setw(6) << setfill('0') << usec << "> @" << v->m_Packet.m_Socket.id(); + int32_t seq = v->m_Packet.seqno(); + if (seq == SRT_SEQNO_NONE) + { + out << " [empty]"; + } + else + { + out << " ["; + out << schedtype[v->m_Packet.type()]; + out << "] %" << seq; + } + return out.str(); +} + +std::list SendTask::free_list; + +SendTask::taskiter_t SendScheduler::enqueue_task(socket_t id, const SendTask& proto) +{ + if (m_bBroken) + { + HLOGC(qslog.Debug, log << "Schedule: ENQ: DENIED, schedule is broken"); + return SendTask::none(); + } + + sync::ScopedLock lk (m_Lock); + SendTask::tasklist_t& wlist = m_TaskMap[id]; + wlist.push_back(proto); + SendTask::taskiter_t itask = --wlist.end(); + itask->m_pBaseList = &wlist; + + bool was_ready = have_task_ready(); + + // Now enqueue it in m_TaskQueue + size_t pos = m_TaskQueue.insert(itask); + + IF_HEAVY_LOGGING(bool was_first = false); + if (pos == 0) // earliest task + { + m_tsAboutTime = m_TaskQueue.top()->m_tsSendTime; // INSERTED: will not be empty + IF_HEAVY_LOGGING(was_first = true); + } + + if (!was_ready && have_task_ready()) + { + HLOGC(qslog.Debug, log << "Schedule: ENQ: new READY task at T=" << FormatTime(itask->m_tsSendTime) + << (was_first ? " (NEW TOP)" : "") << " - NOTIFY"); + m_TaskReadyCond.notify_all(); + } + else + { + HLOGC(qslog.Debug, log << "Schedule: ENQ: new task at T=" << FormatTime(itask->m_tsSendTime) + << (was_first ? " (NEW TOP)" : "") << (!was_ready ? 
" (NOW READY)" : " (ONLY ADDED)")); + } + return itask; +} + +bool SendScheduler::have_task_ready() +{ + if (!m_TaskQueue.empty()) + { + SendTask::taskiter_t earliest = m_TaskQueue.top(); + if (earliest->is_ready(clock_type::now())) + { + return true; + } + } + return false; +} + +bool SendScheduler::wait() +{ + UniqueLock lk (m_Lock); + return wait_extlock((lk)); +} + +bool SendScheduler::wait_extlock(UniqueLock& lk) +{ + IF_HEAVY_LOGGING(typedef steady_clock::time_point ClockTime); + for (;;) + { + if (m_bBroken) + { + HLOGC(qslog.Debug, log << "Schedule: WAIT: not waiting, schedule is broken"); + return false; + } + + IF_HEAVY_LOGGING(ClockTime now = steady_clock::now()); + if (have_task_ready()) + { + IF_HEAVY_LOGGING(ClockTime next = m_TaskQueue.top()->m_tsSendTime); + HLOGC(qslog.Debug, log << "Schedule: WAIT: task ready since " << FormatDurationAuto(now - next)); + break; + } + +#if ENABLE_HEAVY_LOGGING + if (m_TaskQueue.empty()) + { + LOGC(qslog.Debug, log << "Schedule: WAIT: task NOT ready, NO NEW TASKS, WAIT FOR SIGNAL"); + } + else + { + ClockTime next = m_TaskQueue.top()->m_tsSendTime; + LOGC(qslog.Debug, log << "Schedule: WAIT: task NOT ready, next in " + << FormatDurationAuto(next - now) << " at T=" << FormatTime(next) << " - WAIT FOR READY"); + } +#endif + + m_TaskReadyCond.wait(lk); + } + + return true; +} + +void SendScheduler::withdraw(socket_t id) +{ + sync::ScopedLock lk (m_Lock); + // Delete all tasks for the given socket id. + // We have them collected in the list: m_TaskMap + + SendTask::tasklist_t& id_list = m_TaskMap[id]; + + // As we know that all these items were added to m_TaskQueue, + // we need to withdraw them all from m_TaskQueue. + + IF_HEAVY_LOGGING(int nerased = 0); + for (SendTask::taskiter_t idt = id_list.begin(); idt != id_list.end(); ++idt) + { + if (m_TaskQueue.erase(idt)) + { + IF_HEAVY_LOGGING(++nerased); + } + } + // The list should be empty, so delete the entry itself. 
+ int iderased SRT_ATR_UNUSED = m_TaskMap.erase(id); + // We don't know if the earliest in the queue was deleted, + // so just rewrite it anyway. + pop_update_time(); + +#if HVU_ENABLE_HEAVY_LOGGING + hvu::ofmtbufstream nextone; + if (m_TaskQueue.empty()) + nextone << "NO NEXT TASK"; + else + nextone << "next in " << FormatDurationAuto(m_tsAboutTime - steady_clock::now()) + << " from @" << m_TaskQueue.top()->m_Packet.id(); +#endif + HLOGC(qslog.Debug, log << "Schedule: withdrawn @" << int(id) + << (iderased ? "" : " (NOT FOUND!)") << " - erased " << nerased << " tasks -" << nextone); +} + +void SendScheduler::pop_update_time() +{ + if (!m_TaskQueue.empty()) + m_tsAboutTime = m_TaskQueue.top()->m_tsSendTime; // checked that ! empty + else + m_tsAboutTime = clock_time(); +} + +void SendScheduler::cancel(SendTask::taskiter_t itask) +{ + sync::ScopedLock lk (m_Lock); + cancel_nolock(itask); +} + +void SendScheduler::interrupt() +{ + m_bBroken = true; + HLOGC(qslog.Debug, log << "Schedule: INTERRUPT: locking..."); + sync::ScopedLock hold (m_Lock); + HLOGC(qslog.Debug, log << "Schedule: INTERRUPT: notifying waiters"); + + m_TaskReadyCond.notify_all(); // Force waiting functions to exit +} + + +void SendScheduler::cancel_nolock(SendTask::taskiter_t itask) +{ + HLOGC(qslog.Debug, log << "Schedule: CANCEL: @" << itask->m_Packet.id() << " T=" << FormatTime(itask->m_tsSendTime)); + m_TaskQueue.erase(itask); + itask->m_pBaseList->erase(itask); + pop_update_time(); +} + +SchedPacket SendScheduler::wait_pop() +{ + SchedPacket packet; + sync::UniqueLock lk (m_Lock); + // Wait until the time has come to execute + // the next task. Extract the task structure + // and remove the task from the list. 
+ for (;;) + { + if (m_bBroken) + { + HLOGC(qslog.Debug, log << "Schedule: wait_pop: broken"); + break; + } + bool have = wait_extlock((lk)); + if (have) + { + break; + } + else + { + HLOGC(qslog.Debug, log << "Schedule: wait_pop: SPURIOUS"); + } + } + // Here we are sure that the top() task is ready to execute + SendTask::taskiter_t itask = m_TaskQueue.pop(); + pop_update_time(); + + if (itask == SendTask::none()) + { + HLOGC(qslog.Debug, log << "Schedule: wait_pop: IPE: THE QUEUE IS EMPTY"); + return SchedPacket(); + } + // The node is already removed from the heapset. + + // Extract the required data + packet = itask->m_Packet; + + // Now remove it from the corresponding list. + itask->m_pBaseList->erase(itask); + + IF_HEAVY_LOGGING(static string typenames[3] = {"REGULAR", "REXMIT", "CONTROL"}); + HLOGC(qslog.Debug, log << "Schedule: wait_pop: PICKUP from @" << packet.id() << " %" << packet.seqno() << " type=" << typenames[packet.type()]); + + return packet; +} + +} diff --git a/srtcore/schedule_snd.h b/srtcore/schedule_snd.h new file mode 100644 index 000000000..7130d8bf3 --- /dev/null +++ b/srtcore/schedule_snd.h @@ -0,0 +1,193 @@ + +#ifndef INC_SRT_SCHEDULE_SND_H +#define INC_SRT_SCHEDULE_SND_H + +#include +#include +#include "sync.h" +#include "atomic.h" +#include "common.h" // SocketKeeper +#include "utilities.h" +#include "buffer_snd.h" + +namespace srt +{ + +namespace sched +{ + enum Type + { + TP_REGULAR = 0, + TP_REXMIT = 1, + TP_CONTROL = 2 + }; +} + +// This structure contains the information about the +// socket, packet contents and sequence number of +// the packet that is about to be sent. + +// Note: scheduling should happen at the exact place +// where the scheduling event should appear: +// - When calling srt_send: schedule regular packet +// - - If packetfilter control packet is ready AFTER that, schedule that, too. 
+// - When dispatching LOSSREPORT: schedule rexmit packet +// - When NAKREPORT timer expired: schedule rexmit packet +struct SchedPacket +{ + SocketKeeper m_Socket; + int32_t m_iSeqNo; + sched::Type m_Type; + + // NOTE: Both constructor and set_socket() call will need to + // perform the official acquisition of the socket, which requires + // locking CUDTUnited::m_GlobControlLock. Further copying of the + // SocketKeeper object doesn't require any locking. + SchedPacket(CUDTSocket* sock = NULL, int32_t seqno = SRT_SEQNO_NONE, sched::Type t = sched::TP_REGULAR); + void set_socket(CUDTSocket* sock); + + SRTSOCKET id() const { return m_Socket.id(); } + + bool empty() const { return m_iSeqNo == SRT_SEQNO_NONE; } + + int32_t seqno() const { return m_iSeqNo; } + sched::Type type() const { return m_Type; } +}; + +struct SendTask +{ + typedef std::list tasklist_t; + typedef typename tasklist_t::iterator taskiter_t; + typedef sync::steady_clock::time_point key_type; + key_type m_tsSendTime; + SchedPacket m_Packet; + sync::atomic m_zHeapPos; // Required by HeapSet + std::list* m_pBaseList; + + // Same definition as by HeapSet; here a shortcut. + // Can't use the definition from HeapSet because it's + // a template that has requirements for the type parameter. + static const size_t npos = std::string::npos; + + SendTask() + : m_tsSendTime(), m_Packet(), m_zHeapPos(npos), m_pBaseList(0) {} + + SendTask(const SchedPacket& sp, sync::steady_clock::time_point when) + : m_tsSendTime(when), m_Packet(sp), m_zHeapPos(npos), m_pBaseList(0) {} + + // Note: Copying a task is only allowed because of the need + // to move from one container to another. A single task that is + // pinned to a sender buffer may however exist only in one instance. 
+ SendTask(const SendTask& src): + m_tsSendTime(src.m_tsSendTime), + m_Packet(src.m_Packet), + m_zHeapPos(src.m_zHeapPos.load()), + m_pBaseList(src.m_pBaseList) + {} + + bool is_ready(key_type basetime) const + { + return m_tsSendTime < basetime; + } + + SendTask& operator=(const SendTask& src) + { + m_tsSendTime = src.m_tsSendTime; + m_Packet = src.m_Packet; + m_zHeapPos = src.m_zHeapPos.load(); + return *this; + } + + static sync::atomic& position(taskiter_t v) { return v->m_zHeapPos; } + static key_type& key(taskiter_t v) { return v->m_tsSendTime; } + static bool order(const key_type& left, const key_type& right) + { + return left < right; + } + + static std::list free_list; + static taskiter_t none() { return free_list.end(); } + + static std::string print(taskiter_t v); +}; + +struct SendScheduler +{ + typedef SRTSOCKET socket_t; + typedef sync::steady_clock clock_type; + typedef clock_type::time_point clock_time; + +protected: + std::map m_TaskMap; + HeapSet m_TaskQueue; + clock_time m_tsAboutTime; + void pop_update_time(); + + sync::Mutex m_Lock; + sync::Condition m_TaskReadyCond; + sync::atomic m_bBroken; + +public: + const HeapSet& queue() { return m_TaskQueue; } + + SendScheduler(): m_bBroken(false) + { + } + + void interrupt(); + + bool running() + { + return !m_bBroken; + } + + SendTask::taskiter_t enqueue_task(socket_t id, const SendTask& proto); + + void update_task(SendTask::taskiter_t ti); + +protected: + // This is NOLOCK; derived classes please use lock. + bool have_task_ready(); + +public: + // Wait until the time has come + bool wait(); + bool wait_extlock(srt::sync::UniqueLock&); + + void withdraw(socket_t id); + + template + void withdraw_if(socket_t id, Predicate match) + { + sync::ScopedLock lk (m_Lock); + // Delete all tasks for the given socket id. 
+ // We have them collected in the list: m_TaskMap + + SendTask::tasklist_t& id_list = m_TaskMap[id]; + + // As we know that all these items were added to m_TaskQueue, + // we need to withdraw them all from m_TaskQueue. + SendTask::taskiter_t idt_next = id_list.begin(); + for (SendTask::taskiter_t idt = idt_next; idt != id_list.end(); idt = idt_next) + { + ++idt_next; + if (match(idt)) + { + cancel_nolock(idt); + } + } + } + +protected: + void cancel_nolock(SendTask::taskiter_t itask); + +public: + void cancel(SendTask::taskiter_t itask); + + SchedPacket wait_pop(); + +}; + +} + +#endif diff --git a/srtcore/socketconfig.cpp b/srtcore/socketconfig.cpp index 32220a8c4..4b0361d3f 100644 --- a/srtcore/socketconfig.cpp +++ b/srtcore/socketconfig.cpp @@ -54,6 +54,7 @@ written by #include "ofmt.h" using namespace hvu; // fmt +using namespace srt::logging; namespace srt { @@ -96,7 +97,6 @@ struct CSrtConfigSetter { static void set(CSrtConfig& co, const void* optval, int optlen) { - using namespace srt::logging; const int ival = cast_optval(optval, optlen); const int handshake_size = CHandShake::m_iContentSize + (sizeof(uint32_t) * SRT_HS_E_SIZE); const int minval = int(CPacket::udpHeaderSize(AF_INET6) + CPacket::HDR_SIZE + handshake_size); @@ -121,7 +121,6 @@ struct CSrtConfigSetter { static void set(CSrtConfig& co, const void* optval, int optlen) { - using namespace srt::logging; const int fc = cast_optval(optval, optlen); if (fc < co.DEF_MIN_FLIGHT_PKT) { @@ -142,7 +141,14 @@ struct CSrtConfigSetter if (bs <= 0) throw CUDTException(MJ_NOTSUP, MN_INVAL, 0); + // THIS is when the option value is intended to limit the + // maximum size of the buffer (will use less than this if + // required for alignment) co.iSndBufSize = bs / co.bytesPerPkt(); + + // OR: it can use enough capacity to satisfy this value + // as a minimum (and use more for alignment if needed). 
+ // co.iSndBufSize = number_slices(bs, co.bytesPerPkt()); } }; @@ -309,7 +315,6 @@ struct CSrtConfigSetter { static void set(CSrtConfig& co, const void* optval, int optlen) { - using namespace srt::logging; #ifdef SRT_ENABLE_BINDTODEVICE using namespace std; @@ -383,7 +388,6 @@ struct CSrtConfigSetter #ifdef SRT_ENABLE_ENCRYPTION if (val == false && co.iCryptoMode == CSrtConfig::CIPHER_MODE_AES_GCM) { - using namespace srt::logging; LOGC(aclog.Error, log << "Can't disable TSBPD as long as AES GCM is enabled."); throw CUDTException(MJ_NOTSUP, MN_INVAL, 0); } @@ -454,7 +458,6 @@ struct CSrtConfigSetter { static void set(CSrtConfig& co, const void* optval, int optlen) { - using namespace srt::logging; #ifdef SRT_ENABLE_ENCRYPTION // Password must be 10-80 characters. // Or it can be empty to clear the password. @@ -481,7 +484,6 @@ struct CSrtConfigSetter { static void set(CSrtConfig& co, const void* optval, int optlen) { - using namespace srt::logging; #ifdef SRT_ENABLE_ENCRYPTION const int v = cast_optval(optval, optlen); int const allowed[4] = { @@ -635,7 +637,6 @@ struct CSrtConfigSetter { static void set(CSrtConfig& co, const void* optval, int optlen) { - using namespace srt::logging; const int val = cast_optval(optval, optlen); if (val < 0) { @@ -737,8 +738,6 @@ struct CSrtConfigSetter { static void set(CSrtConfig& co, const void* optval, int optlen) { - using namespace srt::logging; - const int val = cast_optval(optval, optlen); if (val < 0) { @@ -771,8 +770,6 @@ struct CSrtConfigSetter { static void set(CSrtConfig& co, const void* optval, int optlen) { - using namespace srt::logging; - const int val = cast_optval(optval, optlen); if (val < 0) { @@ -832,7 +829,6 @@ struct CSrtConfigSetter { static void set(CSrtConfig& co, const void* optval, int optlen) { - using namespace srt::logging; std::string arg((const char*)optval, optlen); // Parse the configuration string prematurely SrtFilterConfig fc; @@ -871,7 +867,6 @@ struct CSrtConfigSetter { static void 
set(CSrtConfig& co, const void* optval, int optlen) { - using namespace srt::logging; // This option is meaningless for the socket itself. // It's set here just for the sake of setting it on a listener // socket so that it is then applied on the group when a @@ -921,7 +916,6 @@ struct CSrtConfigSetter { static void set(CSrtConfig& co, const void* optval, int optlen) { - using namespace srt::logging; const int val = cast_optval(optval, optlen); if (val < CSrtConfig::CIPHER_MODE_AUTO || val > CSrtConfig::CIPHER_MODE_AES_GCM) throw CUDTException(MJ_NOTSUP, MN_INVAL, 0); @@ -948,7 +942,6 @@ struct CSrtConfigSetter { static void set(CSrtConfig& , const void* , int ) { - using namespace srt::logging; #ifdef SRT_ENABLE_ENCRYPTION LOGC(aclog.Error, log << "SRT was built without AEAD enabled."); #else @@ -959,6 +952,23 @@ struct CSrtConfigSetter }; #endif +template<> +struct CSrtConfigSetter +{ + static void set(CSrtConfig& co, const void* optval, int optlen) + { + const int val = cast_optval(optval, optlen); + + if (val < 0 || val > 1) + { + LOGC(aclog.Error, log << "OPTION: sendmode: only 0 and 1 allowed"); + throw CUDTException(MJ_NOTSUP, MN_INVAL, 0); + } + + co.uSenderMode = val; + } +}; + int dispatchSet(SRT_SOCKOPT optName, CSrtConfig& co, const void* optval, int optlen) { switch (optName) @@ -1019,6 +1029,7 @@ int dispatchSet(SRT_SOCKOPT optName, CSrtConfig& co, const void* optval, int opt #ifdef SRT_ENABLE_MAXREXMITBW DISPATCH(SRTO_MAXREXMITBW); #endif + DISPATCH(SRTO_SENDMODE); #undef DISPATCH default: diff --git a/srtcore/socketconfig.h b/srtcore/socketconfig.h index 5c0623652..ae95a1775 100644 --- a/srtcore/socketconfig.h +++ b/srtcore/socketconfig.h @@ -87,6 +87,7 @@ struct CSrtMuxerConfig int iIpToS; int iIpV6Only; // IPV6_V6ONLY option (-1 if not set) bool bReuseAddr; // reuse an exiting port or not, for UDP multiplexer + uint32_t uSenderMode; #ifdef SRT_ENABLE_BINDTODEVICE std::string sBindToDevice; @@ -102,6 +103,7 @@ struct CSrtMuxerConfig return 
CEQUAL(iIpTTL) && CEQUAL(iIpToS) && CEQUAL(bReuseAddr) + && CEQUAL(uSenderMode) #ifdef SRT_ENABLE_BINDTODEVICE && CEQUAL(sBindToDevice) #endif @@ -122,6 +124,7 @@ struct CSrtMuxerConfig , iIpToS(-1) /* IPv4 Type of Service or IPv6 Traffic Class [0x00..0xff] (-1:undefined) */ , iIpV6Only(-1) , bReuseAddr(true) // This is default in SRT + , uSenderMode(0) , iUDPSndBufSize(DEF_UDP_BUFFER_SIZE) , iUDPRcvBufSize(DEF_UDP_BUFFER_SIZE) { diff --git a/srtcore/srt.h b/srtcore/srt.h index c6a2c9297..799231a50 100644 --- a/srtcore/srt.h +++ b/srtcore/srt.h @@ -263,6 +263,7 @@ typedef enum SRT_SOCKOPT { SRTO_RETRANSMITALGO = 61, // An option to select packet retransmission algorithm SRTO_CRYPTOMODE = 62, // Encryption cipher mode (AES-CTR, AES-GCM, ...). SRTO_MAXREXMITBW = 63, // Maximum bandwidth limit for retransmision (Bytes/s) + SRTO_SENDMODE = 64, // Use of the sending mode SRTO_E_SIZE // Always last element, not a valid option. } SRT_SOCKOPT; diff --git a/test/test_common.cpp b/test/test_common.cpp index 8038cca23..6058d5c2f 100644 --- a/test/test_common.cpp +++ b/test/test_common.cpp @@ -205,4 +205,3 @@ TEST(Common, CookieContest) // NOTE: 0x80000001 is a negative number in hex testCookieContest(0x00000001, 0x80000001); } -