Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
05e0666
Added base utilities and implementation for sender schedule (untested)
Jun 16, 2025
700eb7f
Merged-in dev-rework-multiplexer, post-fixed
Jun 25, 2025
371606f
Added SocketKeeper tracking. Added speed stats recording
Jul 9, 2025
85daec8
Merged und fixed
Jul 10, 2025
6a94985
Added stop handling and muxer config entry. STILL NOT TESTED.
Jul 11, 2025
5b4cb49
Fixed bug: twice locking in wait. STILL TESTS FAIL
Jul 11, 2025
ddc6ab2
Merged latest changes from dev and dev-rework-multiplexer
Aug 12, 2025
c5b1e2c
Added HeapSet::find_next function
Aug 13, 2025
c7e85ad
Some log fixes
Aug 14, 2025
25a6027
Updated from dev and post-fixed
Nov 6, 2025
b4943f1
Cleared a compile warning
Nov 6, 2025
34c46ea
Updated to latest dev
Nov 6, 2025
12c4285
Removed another warning error
Nov 6, 2025
4f60162
Extracted CiBuffer from CRcvBuffer. Removed offset-dep from CSndBuffe…
Nov 12, 2025
1fbe6b4
Added alternative sender buffer
Nov 21, 2025
86bc2c9
Implemented per-packet busy flags and locker object for sending packets
Nov 25, 2025
1213952
Fixed build break
Nov 25, 2025
97c005f
Fixed bug: missing setting srctime when extracting. Fixed bug: missin…
Nov 26, 2025
16d3f20
Rewritten insert_loss. Reshaped the code to collect all SndPktArray m…
Dec 4, 2025
e1162dd
Updated with latest cutoff-dev
Dec 4, 2025
c9d1ec6
Fixed some build breaks
Dec 4, 2025
5fc7180
Fixed some build breaks (test)
Dec 4, 2025
05a4f03
Restored working code with the old CSndBuffer
Dec 4, 2025
ee5ef58
Fixed preallocation and maximum storage capacity parameters
Dec 4, 2025
a76e5b1
Tracking a CI failure on Ubuntu
Dec 4, 2025
5d3c5be
[BUG] Fixed std::shared_mutex usage for C++17. Removed external clone…
Dec 8, 2025
eb255c1
Merge branch 'dev-fix-shared-mutex-selection' into dev-new-sender-buffer
Dec 8, 2025
0144e17
Logging: no passing string by value. Removed verbose entries for RcvB…
Dec 8, 2025
3455f5c
Unified rexmit extraction. Removed old implementation
Dec 10, 2025
7444735
Merge branch 'dev' into dev-new-sender-buffer
Dec 11, 2025
f17d9b9
Fixed bug in CRcvBuf: m_iEndOff incorrectly updated after removal. Re…
Dec 12, 2025
bad3676
Fixed build break on pedantic configurations
Dec 12, 2025
8396cf4
Merge branch 'dev' into dev-sender-schedule
Dec 12, 2025
a6e9cfd
Merged the changes with the new sender buffer
Dec 15, 2025
f1e5b89
Literal in comment
Dec 15, 2025
db168ac
Updated to latest dev
Apr 9, 2026
9468f92
Fixed: deleted outdated doc file
Apr 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apps/socketoptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ const SocketOption srt_options [] {
{ "bindtodevice", 0, SRTO_BINDTODEVICE, SocketOption::PRE, SocketOption::STRING, nullptr},
{ "retransmitalgo", 0, SRTO_RETRANSMITALGO, SocketOption::PRE, SocketOption::INT, nullptr },
{ "cryptomode", 0, SRTO_CRYPTOMODE, SocketOption::PRE, SocketOption::INT, nullptr },
{ "maxrexmitbw", 0, SRTO_MAXREXMITBW, SocketOption::POST, SocketOption::INT64, nullptr }
{ "maxrexmitbw", 0, SRTO_MAXREXMITBW, SocketOption::POST, SocketOption::INT64, nullptr },
{ "sendmode", 0, SRTO_SENDMODE, SocketOption::PRE, SocketOption::INT, nullptr }
};
}

Expand Down
43 changes: 22 additions & 21 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1377,7 +1377,7 @@ SRTSOCKET CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int* pw_
}

CUDTSocket* ls;
SocketKeeper keep_ls;
SocketKeeper keep_ls = CUDT::keep();

// We keep the mutex locked for the whole time of instant checks.
// Once they pass, extend the life for the scope by SocketKeeper.
Expand Down Expand Up @@ -1462,7 +1462,7 @@ SRTSOCKET CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int* pw_

// NOTE: release() locks m_GlobControlLock.
// Once we extracted the accepted socket, we don't need to keep ls busy.
keep_ls.release(*this);
keep_ls.release();
ls = NULL; // NOT USABLE ANYMORE!

if (!accepted) // The loop was interrupted
Expand Down Expand Up @@ -2387,7 +2387,8 @@ SRTSTATUS CUDTUnited::close(const SRTSOCKET u, int reason)
};
#endif

SocketKeeper k(*this, u, ERH_THROW);
SocketKeeper k = SOCKET_KEEP(u, ERH_THROW);

IF_HEAVY_LOGGING(ScopedExitLog slog(k.socket));
HLOGC(smlog.Debug, log << "CUDTUnited::close/begin: @" << u << " busy=" << k.socket->isStillBusy());

Expand All @@ -2397,7 +2398,7 @@ SRTSTATUS CUDTUnited::close(const SRTSOCKET u, int reason)
// Releasing under the global lock to avoid even theoretical
// data race.

k.release(*this);
k.release();
return cstatus;
}

Expand Down Expand Up @@ -2525,7 +2526,7 @@ void CUDTSocket::breakNonAcceptedSockets()
HLOGC(smlog.Debug, log << "breakNonAcceptedSockets: found " << accepted.size() << " leaky accepted sockets");
for (vector<SRTSOCKET>::iterator i = accepted.begin(); i != accepted.end(); ++i)
{
CUDTUnited::SocketKeeper sk(m_UDT.uglobal(), *i);
SocketKeeper sk = SOCKET_KEEP(*i, ERH_RETURN);
if (sk.socket)
{
sk.socket->m_UDT.m_bBroken = true;
Expand Down Expand Up @@ -4443,7 +4444,7 @@ SRTSTATUS CUDT::getGroupData(SRTSOCKET groupid, SRT_SOCKGROUPDATA* pdata, size_t
return APIError(MJ_NOTSUP, MN_INVAL, 0);
}

CUDTUnited::GroupKeeper k(uglobal(), groupid, CUDTUnited::ERH_RETURN);
CUDTUnited::GroupKeeper k(uglobal(), groupid, ERH_RETURN);
if (!k.group)
{
return APIError(MJ_NOTSUP, MN_INVAL, 0);
Expand Down Expand Up @@ -4617,7 +4618,7 @@ SRTSOCKET CUDT::connectLinks(SRTSOCKET grp, SRT_SOCKGROUPCONFIG targets[], int a

try
{
CUDTUnited::GroupKeeper k(uglobal(), grp, CUDTUnited::ERH_THROW);
CUDTUnited::GroupKeeper k(uglobal(), grp, ERH_THROW);
return uglobal().groupConnect(k.group, targets, arraysize);
}
catch (CUDTException& e)
Expand Down Expand Up @@ -4740,13 +4741,13 @@ SRTSTATUS CUDT::getsockopt(SRTSOCKET u, int, SRT_SOCKOPT optname, void* pw_optva
#if SRT_ENABLE_BONDING
if (CUDT::isgroup(u))
{
CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW);
CUDTUnited::GroupKeeper k(uglobal(), u, ERH_THROW);
k.group->getOpt(optname, (pw_optval), (*pw_optlen));
return SRT_STATUS_OK;
}
#endif

CUDT& udt = uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core();
CUDT& udt = uglobal().locateSocket(u, ERH_THROW)->core();
udt.getOpt(optname, (pw_optval), (*pw_optlen));
return SRT_STATUS_OK;
}
Expand All @@ -4771,13 +4772,13 @@ SRTSTATUS CUDT::setsockopt(SRTSOCKET u, int, SRT_SOCKOPT optname, const void* op
#if SRT_ENABLE_BONDING
if (CUDT::isgroup(u))
{
CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW);
CUDTUnited::GroupKeeper k(uglobal(), u, ERH_THROW);
k.group->setOpt(optname, optval, optlen);
return SRT_STATUS_OK;
}
#endif

CUDT& udt = uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core();
CUDT& udt = uglobal().locateSocket(u, ERH_THROW)->core();
udt.setOpt(optname, optval, optlen);
return SRT_STATUS_OK;
}
Expand Down Expand Up @@ -4816,12 +4817,12 @@ int CUDT::sendmsg2(SRTSOCKET u, const char* buf, int len, SRT_MSGCTRL& w_m)
#if SRT_ENABLE_BONDING
if (CUDT::isgroup(u))
{
CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW);
CUDTUnited::GroupKeeper k(uglobal(), u, ERH_THROW);
return k.group->send(buf, len, (w_m));
}
#endif

return uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core().sendmsg2(buf, len, (w_m));
return uglobal().locateSocket(u, ERH_THROW)->core().sendmsg2(buf, len, (w_m));
}
catch (const CUDTException& e)
{
Expand Down Expand Up @@ -4860,12 +4861,12 @@ int CUDT::recvmsg2(SRTSOCKET u, char* buf, int len, SRT_MSGCTRL& w_m)
#if SRT_ENABLE_BONDING
if (CUDT::isgroup(u))
{
CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW);
CUDTUnited::GroupKeeper k(uglobal(), u, ERH_THROW);
return k.group->recv(buf, len, (w_m));
}
#endif

return uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core().recvmsg2(buf, len, (w_m));
return uglobal().locateSocket(u, ERH_THROW)->core().recvmsg2(buf, len, (w_m));
}
catch (const CUDTException& e)
{
Expand All @@ -4882,7 +4883,7 @@ int64_t CUDT::sendfile(SRTSOCKET u, fstream& ifs, int64_t& offset, int64_t size,
{
try
{
CUDT& udt = uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core();
CUDT& udt = uglobal().locateSocket(u, ERH_THROW)->core();
return udt.sendfile(ifs, offset, size, block);
}
catch (const CUDTException& e)
Expand All @@ -4904,7 +4905,7 @@ int64_t CUDT::recvfile(SRTSOCKET u, fstream& ofs, int64_t& offset, int64_t size,
{
try
{
return uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core().recvfile(ofs, offset, size, block);
return uglobal().locateSocket(u, ERH_THROW)->core().recvfile(ofs, offset, size, block);
}
catch (const CUDTException& e)
{
Expand Down Expand Up @@ -5209,7 +5210,7 @@ SRTSTATUS CUDT::bstats(SRTSOCKET u, CBytePerfMon* perf, bool clear, bool instant

try
{
CUDT& udt = uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core();
CUDT& udt = uglobal().locateSocket(u, ERH_THROW)->core();
udt.bstats(perf, clear, instantaneous);
return SRT_STATUS_OK;
}
Expand All @@ -5229,7 +5230,7 @@ SRTSTATUS CUDT::groupsockbstats(SRTSOCKET u, CBytePerfMon* perf, bool clear)
{
try
{
CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW);
CUDTUnited::GroupKeeper k(uglobal(), u, ERH_THROW);
k.group->bstatsSocket(perf, clear);
return SRT_STATUS_OK;
}
Expand All @@ -5251,7 +5252,7 @@ CUDT* CUDT::getUDTHandle(SRTSOCKET u)
{
try
{
return &uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core();
return &uglobal().locateSocket(u, ERH_THROW)->core();
}
catch (const CUDTException& e)
{
Expand All @@ -5273,7 +5274,7 @@ SRT_SOCKSTATUS CUDT::getsockstate(SRTSOCKET u)
#if SRT_ENABLE_BONDING
if (CUDT::isgroup(u))
{
CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW);
CUDTUnited::GroupKeeper k(uglobal(), u, ERH_THROW);
return k.group->getStatus();
}
#endif
Expand Down
68 changes: 4 additions & 64 deletions srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,12 +276,6 @@ class CUDTUnited
static const size_t MAX_CLOSE_RECORD_SIZE = 10;

public:
enum ErrorHandling
{
ERH_RETURN,
ERH_THROW,
ERH_ABORT
};
static std::string CONID(SRTSOCKET sock);

/// initialize the UDT library.
Expand Down Expand Up @@ -568,10 +562,14 @@ class CUDTUnited
};
#endif

public:

CUDTSocket* locateAcquireSocket(SRTSOCKET u, ErrorHandling erh = ERH_RETURN);
bool acquireSocket(CUDTSocket* s);
void releaseSocket(CUDTSocket* s);

private:

SRT_TSA_NEEDS_LOCKED(m_InitLock)
bool startGarbageCollector();

Expand All @@ -584,64 +582,6 @@ class CUDTUnited
void cleanupAllSockets();
void closeAllSockets();

public:
struct SocketKeeper
{
CUDTSocket* socket;

SocketKeeper(): socket(NULL) {}

// This is intended for API functions to lock the socket's existence
// for the lifetime of their call.
SocketKeeper(CUDTUnited& glob, SRTSOCKET id, ErrorHandling erh = ERH_RETURN) { socket = glob.locateAcquireSocket(id, erh); }

// This is intended for TSBPD thread that should lock the socket's
// existence until it exits.
SocketKeeper(CUDTUnited& glob, CUDTSocket* s)
{
acquire(glob, s);
}

void acquire_LOCKED(CUDTSocket* s)
{
socket = s;
s->apiAcquire();
}

// Note: acquire doesn't check if the keeper already keeps anything.
// This is only for a use together with an empty constructor.
bool acquire(CUDTUnited& glob, CUDTSocket* s)
{
if (s == NULL)
{
socket = NULL;
return false;
}

const bool caught = glob.acquireSocket(s);
socket = caught ? s : NULL;
return caught;
}

bool release(CUDTUnited& glob)
{
if (!socket)
return false;

glob.releaseSocket(socket);
socket = NULL;
return true;
}

~SocketKeeper()
{
if (socket)
{
SRT_ASSERT(socket->isStillBusy() > 0);
socket->apiRelease();
}
}
};

private:

Expand Down
52 changes: 52 additions & 0 deletions srtcore/buffer_snd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,58 @@ int32_t CSndBuffer::getMsgNoAtSeq(const int32_t seqno)
return m_Packets[offset].getMsgSeq();
}

bool CSndBuffer::getPacketRangeSize(int32_t seqlo, int32_t seqhi, int& w_packets, int& w_bytes)
{
ScopedLock bufferguard(m_BufLock);
int offset_lo = CSeqNo::seqoff(m_iSndLastDataAck, seqlo);
int offset_hi = CSeqNo::seqoff(m_iSndLastDataAck, seqhi);

w_packets = 0;
w_bytes = 0;

if (offset_hi < 0 || offset_hi < offset_lo)
{
HLOGC(bslog.Debug, log << "getPacketRangeSize: invalid seq range %(" << seqlo << "-" << seqhi
<< ") map to off=(" << offset_lo << ", " << offset_hi << ")");
return false;
}

// Rule out empty packets case, not sure if possible, but still
if (m_Packets.empty())
{
// Treat this as false because if the buffer is empty,
// the sequence numbers definitely don't refer to any existing
// packets in the buffer.
return false;
}

bool full_range = true;
if (offset_lo < 0)
{
offset_lo = 0;
full_range = false;
}

if (offset_hi >= int(m_Packets.size()))
{
offset_hi = m_Packets.size() - 1;
full_range = false;
}

int npackets = offset_hi - offset_lo + 1,
nbytes = 0;

for (int i = offset_lo; i <= offset_hi; ++i)
{
nbytes += m_Packets[i].m_iLength;
}

w_packets = npackets;
w_bytes = nbytes;

return full_range;
}

// XXX Likely unused (left for the use by tests)
int CSndBuffer::readOldPacket(int32_t seqno, CSndPacket& w_sndpkt, steady_clock::time_point& w_srctime, DropRange& w_drop)
{
Expand Down
1 change: 1 addition & 0 deletions srtcore/buffer_snd.h
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ class CSndBuffer

int getAvgBufSize(int& bytes, int& timespan);

bool getPacketRangeSize(int32_t seqlo, int32_t seqhi, int& w_packets, int& w_bytes);
int getCurrBufSize(int& bytes, int& timespan) const
{
sync::ScopedLock lk (m_BufLock);
Expand Down
2 changes: 2 additions & 0 deletions srtcore/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ modified by
#include <ifaddrs.h>
#endif

#include "api.h"
#include "md5.h"
#include "common.h"
#include "netinet_any.h"
Expand Down Expand Up @@ -554,6 +555,7 @@ vector<LocalInterface> GetLocalInterfaces()
return locals;
}

SRTSOCKET SocketKeeper::id() const { return socket ? socket->id() : SRT_INVALID_SOCK; }


// Value display utilities
Expand Down
Loading
Loading