New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Logic refax: new rules for the receiver buffer and TSBPD #2527
base: master
Are you sure you want to change the base?
Logic refax: new rules for the receiver buffer and TSBPD #2527
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #2527 +/- ##
==========================================
+ Coverage 64.72% 64.89% +0.16%
==========================================
Files 101 101
Lines 17543 17673 +130
==========================================
+ Hits 11355 11469 +114
- Misses 6188 6204 +16 ☔ View full report in Codecov by Sentry. |
…for out-of-order concerns
(further impossible fixes pending) Co-authored-by: Maxim Sharabayko <maxlovic@gmail.com>
// XXX unsure as to whether this should change anything in the TSBPD conditions. | ||
// Might be that this trigger is not necessary. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This signal is indeed no longer necessary because the TSBPD thread now gets notified when an earlier packet has been received.
@@ -5529,6 +5544,8 @@ void * srt::CUDT::tsbpd(void* param) | |||
tsNextDelivery = steady_clock::time_point(); // Ready to read, nothing to wait for. | |||
} | |||
|
|||
SRT_ATR_UNUSED bool bWakeupOnSignal = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename to bWokeUpOnSignal
to describe an event, not the intent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was intended to be a substantive describing the performed activity: "a wakeup". But I agree this sounds ambiguous.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SRT_ATR_UNUSED bool bWakeupOnSignal = true; | |
SRT_ATR_UNUSED bool bWokenupOnSignal = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use either bWokeUpOnSignal
or bWokenUpOnSignal
.
// API reader thread sleeping until there is a "bigger portion" | ||
// of data to read. In TSBPD mode this isn't done because every | ||
// packet has its individual delivery time and its readiness is signed | ||
// off by the TSBPD thread. | ||
HLOGC(xtlog.Debug, | ||
log << CONID() << "ACK: clip %" << m_iRcvLastAck << "-%" << ack << ", REVOKED " | ||
<< CSeqNo::seqoff(ack, m_iRcvLastAck) << " from RCV buffer"); | ||
|
||
if (m_bTsbPd) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just remove this if
case. Keep the else
case.
if (m_bTsbPd && ((m_bWakeOnRecv && new_inserted) || next_tsbpd_avail != time_point())) | ||
{ | ||
HLOGC(qrlog.Debug, log << "processData: will SIGNAL TSBPD for socket. WakeOnRecv=" << m_bWakeOnRecv | ||
<< " new_inserted=" << new_inserted << " next_tsbpd_avail=" << FormatTime(next_tsbpd_avail)); | ||
CUniqueSync tsbpd_cc(m_RecvLock, m_RcvTsbPdCond); | ||
tsbpd_cc.notify_all(); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be that m_bWakeOnRecv
still has to be protected by m_RecvLock
to ensure the TSBPD thread has set it appropriately before it is checked here. Making it atomic does not resolve this transition state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about:
if (m_bTsbPd && ((m_bWakeOnRecv && new_inserted) || next_tsbpd_avail != time_point())) | |
{ | |
HLOGC(qrlog.Debug, log << "processData: will SIGNAL TSBPD for socket. WakeOnRecv=" << m_bWakeOnRecv | |
<< " new_inserted=" << new_inserted << " next_tsbpd_avail=" << FormatTime(next_tsbpd_avail)); | |
CUniqueSync tsbpd_cc(m_RecvLock, m_RcvTsbPdCond); | |
tsbpd_cc.notify_all(); | |
} | |
if (m_bTsbPd) | |
{ | |
CUniqueSync tsbpd_cc(m_RecvLock, m_RcvTsbPdCond); | |
if ((m_bWakeOnRecv && new_inserted) || next_tsbpd_avail != time_point()) | |
{ | |
HLOGC(qrlog.Debug, log << "processData: will SIGNAL TSBPD for socket. WakeOnRecv=" << m_bWakeOnRecv | |
<< " new_inserted=" << new_inserted << " next_tsbpd_avail=" << FormatTime(next_tsbpd_avail)); | |
tsbpd_cc.notify_all(); | |
} | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good.
// This function is to return the packet's play time (time when | ||
// it is submitted to the reading application) of the given packet. | ||
// This grp passed here by void* because in the current imp it's | ||
// unused and shall not be used in case when ENABLE_BONDING=0. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// This function is to return the packet's play time (time when | |
// it is submitted to the reading application) of the given packet. | |
// This grp passed here by void* because in the current imp it's | |
// unused and shall not be used in case when ENABLE_BONDING=0. | |
/// This function is to return the packet's play time (time when | |
/// it is submitted to the reading application) of the given packet. | |
/// This grp passed here by void* because in the current imp it's | |
/// unused and shall not be used in case when ENABLE_BONDING=0. |
THREAD_PAUSED(); | ||
tsbpd_cc.wait_until(tsNextDelivery); | ||
bWakeupOnSignal = tsbpd_cc.wait_until(tsNextDelivery); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bWakeupOnSignal = tsbpd_cc.wait_until(tsNextDelivery); | |
bWokenupOnSignal = tsbpd_cc.wait_until(tsNextDelivery); |
THREAD_PAUSED(); | ||
tsbpd_cc.wait(); | ||
THREAD_RESUMED(); | ||
} | ||
|
||
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP!!!"); | ||
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP [" << (bWakeupOnSignal? "signal" : "timeout") << "]!!! - " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP [" << (bWakeupOnSignal? "signal" : "timeout") << "]!!! - " | |
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP [" << (bWokenupOnSignal? "signal" : "timeout") << "]!!! - " |
DRAFT BECAUSE: the concept is going to be changed to use Offset instead of Position for End and Drop, but the quick change resulted in heavy bugs. The change was necessary due to reported performance problems.
This changes largely the logics of the buffer and TSBPD. This change was necessary for supporting the group common buffer reception, but it should as well improve the TSBPD performance and efficiency.
The previously working receiver buffer and cooperating with it the TSBPD thread were following the previous design of the per-ACK "eclipsing" packets and until then those packets were not available, even if their retrieval would be theoretically possible. This design changes the TSBPD to work directly on the buffer, therefore there have been consolidated less events to trigger the TSBPD wakeup and recheck. There are actually the following situations:
[0]
cell as ready to play and expects that the API call retrieve it: the API call should wake it up after the packet has been retrieved (this is the old way and remains the same here).[0]
cell. There should be no reason to wake it up whatsoever.Note that in the case of forever sleeping, there's no way to distinguish the reason of "still ready to read" and "nothing to wait up to" situations - might be that this can be improved, so - for example - if there is at least one packet "in the past" (not necessarily at cell
[0]
) then TSBPD should NOT be woken up in the event of incoming packet, regardless of its position.Anyway, the change for triggering TSBPD is that:
Beside the taking out of the TSBPD triggering in the ACK handler, there's also one more change added: the ACK number to be sent back to the sender is extracted exclusively from the receiver buffer, and no longer from the loss list and internal fields. This is because the receiver buffer was added a functionality to be able to check this thing quickly. This is also a more reliable source of information and not prone to any race conditions.
Changes in the receiver buffer:
Introduced are terms of "initial contiguous region" and "first gap". The first term existed in the old receiver buffer and it's the series of valid packets starting with the first cell up to the "first gap", that is, the series starting from at least one empty cell followed by a valid packet. None of them exist in an empty buffer. The "first gap" also may not exist if all packets from the first cell up to the one pointed by
m_iMaxPosOff
are valid. If the first gap starts with the first cell, this is also an "initial gap".Two new flags were added:
m_iEndPos
andm_iDropPos
:m_iEndPos
: marks the past-the-end of the initial contiguous region. If this region is empty,m_iEndPos
is equal tom_iStartPos
. This field always points to an empty cell.m_iDropPos
: marks the position of the first valid packet following the first gap. This position is always earlier than the one pointed bym_iMaxPosOff
, but later thanm_iEndPos
. If no such packet exists, this position is equal tom_iEndPos
.These positions are being updated in case of various events that may influence their positions, so for example:
updateGapInfo
).Note that there are many mutually-exclusive conditions here, for example:
m_iDropPos
, and if this happens, it was pointing to a valid packet. After thatm_iStartPos
is set to that position and searching for candidates to set tom_iEndPos
begins with the next packet.There are therefore changed the procedures of: