Skip to content

Commit

Permalink
client: fix race condition when getting NTP (#207)
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 authored Dec 25, 2024
1 parent 4362616 commit 5ae5072
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 4 deletions.
3 changes: 2 additions & 1 deletion client_stream_processor_fmp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func (p *clientStreamProcessorFMP4) processSegment(ctx context.Context, seg *seg
dts := p.timeConv.convert(int64(leadingPartTrack.BaseTime), leadingPartTrackProc.track.track.ClockRate)
p.timeConv.setNTP(*seg.dateTime, dts, leadingPartTrackProc.track.track.ClockRate)
}
p.timeConv.setLeadingNTPReceived()
}

partTrackCount := 0
Expand All @@ -174,7 +175,7 @@ func (p *clientStreamProcessorFMP4) processSegment(ctx context.Context, seg *seg
}

dts := p.timeConv.convert(int64(partTrack.BaseTime), trackProc.track.track.ClockRate)
ntp := p.timeConv.getNTP(dts, trackProc.track.track.ClockRate)
ntp := p.timeConv.getNTP(ctx, dts, trackProc.track.track.ClockRate)

err := trackProc.push(ctx, &procEntryFMP4{
partTrack: partTrack,
Expand Down
3 changes: 2 additions & 1 deletion client_stream_processor_mpegts.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,10 @@ func (p *clientStreamProcessorMPEGTS) initializeReader(ctx context.Context, firs
if p.curSegment.dateTime != nil {
p.timeConv.setNTP(*p.curSegment.dateTime, dts)
}
p.timeConv.setLeadingNTPReceived()
}

ntp := p.timeConv.getNTP(dts)
ntp := p.timeConv.getNTP(ctx, dts)

return trackProc.push(ctx, &procEntryMPEGTS{
pts: pts,
Expand Down
21 changes: 20 additions & 1 deletion client_time_conv_fmp4.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gohlslib

import (
"context"
"sync"
"time"
)
Expand All @@ -14,9 +15,12 @@ type clientTimeConvFMP4 struct {
ntpValue time.Time
ntpTimestamp int64
ntpClockRate int

chLeadingNTPReceived chan struct{}
}

func (ts *clientTimeConvFMP4) initialize() {
ts.chLeadingNTPReceived = make(chan struct{})
}

func (ts *clientTimeConvFMP4) convert(v int64, clockRate int) int64 {
Expand All @@ -33,7 +37,22 @@ func (ts *clientTimeConvFMP4) setNTP(value time.Time, timestamp int64, clockRate
ts.ntpClockRate = clockRate
}

func (ts *clientTimeConvFMP4) getNTP(timestamp int64, clockRate int) *time.Time {
func (ts *clientTimeConvFMP4) setLeadingNTPReceived() {
select {
case <-ts.chLeadingNTPReceived:
return
default:
}
close(ts.chLeadingNTPReceived)
}

func (ts *clientTimeConvFMP4) getNTP(ctx context.Context, timestamp int64, clockRate int) *time.Time {
select {
case <-ts.chLeadingNTPReceived:
case <-ctx.Done():
return nil
}

ts.mutex.Lock()
defer ts.mutex.Unlock()

Expand Down
21 changes: 20 additions & 1 deletion client_time_conv_mpegts.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gohlslib

import (
"context"
"sync"
"time"

Expand All @@ -15,11 +16,14 @@ type clientTimeConvMPEGTS struct {
ntpAvailable bool
ntpValue time.Time
ntpTimestamp int64

chLeadingNTPReceived chan struct{}
}

func (ts *clientTimeConvMPEGTS) initialize() {
ts.td = mpegts.NewTimeDecoder2()
ts.td.Decode(ts.startDTS)
ts.chLeadingNTPReceived = make(chan struct{})
}

func (ts *clientTimeConvMPEGTS) convert(v int64) int64 {
Expand All @@ -38,7 +42,22 @@ func (ts *clientTimeConvMPEGTS) setNTP(value time.Time, timestamp int64) {
ts.ntpTimestamp = timestamp
}

func (ts *clientTimeConvMPEGTS) getNTP(timestamp int64) *time.Time {
func (ts *clientTimeConvMPEGTS) setLeadingNTPReceived() {
select {
case <-ts.chLeadingNTPReceived:
return
default:
}
close(ts.chLeadingNTPReceived)
}

func (ts *clientTimeConvMPEGTS) getNTP(ctx context.Context, timestamp int64) *time.Time {
select {
case <-ts.chLeadingNTPReceived:
case <-ctx.Done():
return nil
}

ts.mutex.Lock()
defer ts.mutex.Unlock()

Expand Down

0 comments on commit 5ae5072

Please sign in to comment.