Skip to content

Commit

Permalink
[core] Improved mutex protection of the TSBPD (#3038).
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsharabayko authored Oct 8, 2024
1 parent da89cbd commit 3c7022e
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 16 deletions.
4 changes: 2 additions & 2 deletions srtcore/buffer_tools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ void CSndRateEstimator::addSample(const time_point& ts, int pkts, size_t bytes)
}
}

m_Samples[m_iCurSampleIdx].m_iBytesCount += bytes;
m_Samples[m_iCurSampleIdx].m_iPktsCount += pkts;
m_Samples[m_iCurSampleIdx].m_iBytesCount += (int) bytes;
m_Samples[m_iCurSampleIdx].m_iPktsCount += pkts;
}

int CSndRateEstimator::getCurrentRate() const
Expand Down
33 changes: 21 additions & 12 deletions srtcore/tsbpd_time.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ bool CTsbpdTime::addDriftSample(uint32_t usPktTimestamp, const time_point& tsPkt
if (!m_bTsbPdMode)
return false;

ScopedLock lck(m_mtxRW);
ExclusiveLock lck(m_mtxRW);

// Remember the first RTT sample measured. Ideally we need RTT0 - the one from the handshaking phase,
// because TSBPD base is initialized there. But HS-based RTT is not yet implemented.
Expand All @@ -122,7 +122,7 @@ bool CTsbpdTime::addDriftSample(uint32_t usPktTimestamp, const time_point& tsPkt
// is to estimate RTT change and assume that the change of the one way network delay is
// approximated by the half of the RTT change.
const duration tdRTTDelta = usRTTSample >= 0 ? microseconds_from((usRTTSample - m_iFirstRTT) / 2) : duration(0);
const time_point tsPktBaseTime = getPktTsbPdBaseTime(usPktTimestamp);
const time_point tsPktBaseTime = getPktTsbPdBaseTimeNoLock(usPktTimestamp);
const steady_clock::duration tdDrift = tsPktArrival - tsPktBaseTime - tdRTTDelta;

const bool updated = m_DriftTracer.update(count_microseconds(tdDrift));
Expand Down Expand Up @@ -158,6 +158,7 @@ bool CTsbpdTime::addDriftSample(uint32_t usPktTimestamp, const time_point& tsPkt

void CTsbpdTime::setTsbPdMode(const steady_clock::time_point& timebase, bool wrap, duration delay)
{
ExclusiveLock lck(m_mtxRW);
m_bTsbPdMode = true;
m_bTsbPdWrapCheck = wrap;

Expand All @@ -183,6 +184,7 @@ void CTsbpdTime::applyGroupTime(const steady_clock::time_point& timebase,
// newly added to the group must get EXACTLY the same internal timebase
// or otherwise the TsbPd time calculation will ship different results
// on different member sockets.
ExclusiveLock lck(m_mtxRW);

m_bTsbPdMode = true;

Expand All @@ -196,6 +198,7 @@ void CTsbpdTime::applyGroupDrift(const steady_clock::time_point& timebase,
bool wrp,
const steady_clock::duration& udrift)
{
ExclusiveLock lck(m_mtxRW);
// This is only when a drift was updated on one of the group members.
HLOGC(brlog.Debug,
log << "rcv-buffer: group synch uDRIFT: " << m_DriftTracer.drift() << " -> " << FormatDuration(udrift)
Expand All @@ -207,7 +210,7 @@ void CTsbpdTime::applyGroupDrift(const steady_clock::time_point& timebase,
m_DriftTracer.forceDrift(count_microseconds(udrift));
}

CTsbpdTime::time_point CTsbpdTime::getTsbPdTimeBase(uint32_t timestamp_us) const
CTsbpdTime::time_point CTsbpdTime::getTsbPdTimeBaseNoLock(uint32_t timestamp_us) const
{
// A data packet within [TSBPD_WRAP_PERIOD; 2 * TSBPD_WRAP_PERIOD] would end TSBPD wrap-aware state.
// Some incoming control packets may not update the TSBPD base (calling updateTsbPdTimeBase(..)),
Expand All @@ -218,27 +221,33 @@ CTsbpdTime::time_point CTsbpdTime::getTsbPdTimeBase(uint32_t timestamp_us) const
return (m_tsTsbPdTimeBase + microseconds_from(carryover_us));
}

CTsbpdTime::time_point CTsbpdTime::getPktTsbPdTime(uint32_t usPktTimestamp) const
CTsbpdTime::time_point CTsbpdTime::getTsbPdTimeBase(uint32_t timestamp_us) const
{
time_point value = getPktTsbPdBaseTime(usPktTimestamp) + m_tdTsbPdDelay + microseconds_from(m_DriftTracer.drift());
SharedLock lck(m_mtxRW);
return getTsbPdTimeBaseNoLock(timestamp_us);
}

/*
HLOGC(brlog.Debug, log << "getPktTsbPdTime:"
<< " BASE=" << FormatTime(m_tsTsbPdTimeBase)
<< " TS=" << usPktTimestamp << "us, lat=" << FormatDuration<DUNIT_US>(m_tdTsbPdDelay)
<< " DRF=" << m_DriftTracer.drift() << "us = " << FormatTime(value));
*/
CTsbpdTime::time_point CTsbpdTime::getPktTsbPdTime(uint32_t usPktTimestamp) const
{
SharedLock lck(m_mtxRW);
time_point value = getPktTsbPdBaseTimeNoLock(usPktTimestamp) + m_tdTsbPdDelay + microseconds_from(m_DriftTracer.drift());

return value;
}

CTsbpdTime::time_point CTsbpdTime::getPktTsbPdBaseTimeNoLock(uint32_t usPktTimestamp) const
{
return getTsbPdTimeBaseNoLock(usPktTimestamp) + microseconds_from(usPktTimestamp);
}

CTsbpdTime::time_point CTsbpdTime::getPktTsbPdBaseTime(uint32_t usPktTimestamp) const
{
return getTsbPdTimeBase(usPktTimestamp) + microseconds_from(usPktTimestamp);
}

void CTsbpdTime::updateTsbPdTimeBase(uint32_t usPktTimestamp)
{
ExclusiveLock lck(m_mtxRW);
if (m_bTsbPdWrapCheck)
{
// Wrap check period.
Expand Down Expand Up @@ -267,7 +276,7 @@ void CTsbpdTime::updateTsbPdTimeBase(uint32_t usPktTimestamp)

void CTsbpdTime::getInternalTimeBase(time_point& w_tb, bool& w_wrp, duration& w_udrift) const
{
ScopedLock lck(m_mtxRW);
ExclusiveLock lck(m_mtxRW);
w_tb = m_tsTsbPdTimeBase;
w_udrift = microseconds_from(m_DriftTracer.drift());
w_wrp = m_bTsbPdWrapCheck;
Expand Down
22 changes: 20 additions & 2 deletions srtcore/tsbpd_time.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class CTsbpdTime
typedef srt::sync::steady_clock steady_clock;
typedef steady_clock::time_point time_point;
typedef steady_clock::duration duration;
typedef srt::sync::Mutex Mutex;
typedef srt::sync::SharedMutex SharedMutex;

public:
CTsbpdTime()
Expand Down Expand Up @@ -117,6 +117,24 @@ class CTsbpdTime
void getInternalTimeBase(time_point& w_tb, bool& w_wrp, duration& w_udrift) const;

private:
/// @brief Get TSBPD base time adjusted for carryover, which occurs when
/// a packet's timestamp exceeds the UINT32_MAX and continues from zero.
/// Does not lock the internal state.
/// @param [in] usPktTimestamp 32-bit value of packet timestamp field (microseconds).
///
/// @return TSBPD base time for a provided packet timestamp.
time_point getTsbPdTimeBaseNoLock(uint32_t usPktTimestamp) const;

/// @brief Get packet TSBPD time without buffering delay and clock drift, which is
/// the target time for delivering the packet to an upstream application.
/// Essentially: getTsbPdTimeBase(usPktTimestamp) + usPktTimestamp
/// Does not lock the internal state.
/// @param [in] usPktTimestamp 32-bit value of packet timestamp field (microseconds).
///
/// @return Packet TSBPD base time without buffering delay.
time_point getPktTsbPdBaseTimeNoLock(uint32_t usPktTimestamp) const;


int m_iFirstRTT; // First measured RTT sample.
bool m_bTsbPdMode; // Receiver buffering and TSBPD is active when true.
duration m_tdTsbPdDelay; // Negotiated buffering delay.
Expand Down Expand Up @@ -155,7 +173,7 @@ class CTsbpdTime
DriftTracer<TSBPD_DRIFT_MAX_SAMPLES, TSBPD_DRIFT_MAX_VALUE> m_DriftTracer;

/// Protect simultaneous change of state (read/write).
mutable Mutex m_mtxRW;
mutable SharedMutex m_mtxRW;
};

} // namespace srt
Expand Down

0 comments on commit 3c7022e

Please sign in to comment.