From f24c4e3401607a446055c43d1aaa00756aefad56 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Mon, 7 Oct 2024 18:52:05 +0200 Subject: [PATCH] client: fix computing absolute timestamp with multiple renditions --- client_stream_processor_fmp4.go | 36 +++++++++++++----------------- client_stream_processor_mpegts.go | 20 +++-------------- client_test.go | 9 ++++---- client_time_conv_fmp4.go | 37 +++++++++++++++++++++++++++++++ client_time_conv_mpegts.go | 30 +++++++++++++++++++++++-- client_track.go | 8 +++---- client_track_processor_fmp4.go | 26 +++++++++------------- client_track_processor_mpegts.go | 2 +- 8 files changed, 104 insertions(+), 64 deletions(-) diff --git a/client_stream_processor_fmp4.go b/client_stream_processor_fmp4.go index cb9af2c..bd7cfe6 100644 --- a/client_stream_processor_fmp4.go +++ b/client_stream_processor_fmp4.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "fmt" - "time" "github.com/bluenviron/mediacommon/pkg/formats/fmp4" @@ -121,30 +120,24 @@ func (p *clientStreamProcessorFMP4) processSegment(ctx context.Context, seg *seg return err } - ntpAvailable := false - var ntpAbsolute time.Time - var ntpRelative int64 - var leadingClockRate int - - partTrack := findFirstPartTrackOfLeadingTrack(parts, p.leadingTrackID) - if partTrack == nil { + leadingPartTrack := findFirstPartTrackOfLeadingTrack(parts, p.leadingTrackID) + if leadingPartTrack == nil { return fmt.Errorf("could not find data of leading track") } if p.trackProcessors == nil { - err := p.initializeTrackProcessors(ctx, partTrack) + err := p.initializeTrackProcessors(ctx, leadingPartTrack) if err != nil { return err } } - leadingTrackProc := p.trackProcessors[partTrack.ID] - leadingClockRate = leadingTrackProc.track.track.ClockRate - - if seg.dateTime != nil { - ntpAvailable = true - ntpAbsolute = *seg.dateTime - ntpRelative = p.timeConv.convert(int64(partTrack.BaseTime), leadingClockRate) + if p.isLeading { + if seg.dateTime != nil { + leadingPartTrackProc := p.trackProcessors[leadingPartTrack.ID] + dts := p.timeConv.convert(int64(leadingPartTrack.BaseTime), leadingPartTrackProc.track.track.ClockRate) + p.timeConv.setNTP(*seg.dateTime, dts, leadingPartTrackProc.track.track.ClockRate) + } } partTrackCount := 0 @@ -156,11 +149,13 @@ func (p *clientStreamProcessorFMP4) processSegment(ctx context.Context, seg *seg continue } + dts := p.timeConv.convert(int64(partTrack.BaseTime), trackProc.track.track.ClockRate) + ntp := p.timeConv.getNTP(dts, trackProc.track.track.ClockRate) + err := trackProc.push(ctx, &procEntryFMP4{ - ntpAvailable: ntpAvailable, - ntpAbsolute: ntpAbsolute, - ntpRelative: multiplyAndDivide(ntpRelative, int64(trackProc.track.track.ClockRate), int64(leadingClockRate)), - partTrack: partTrack, + partTrack: partTrack, + dts: dts, + ntp: ntp, }) if err != nil { return err @@ -223,7 +218,6 @@ func (p *clientStreamProcessorFMP4) initializeTrackProcessors( for i, track := range p.clientStreamTracks { trackProc := &clientTrackProcessorFMP4{ track: track, - timeConv: p.timeConv, onPartTrackProcessed: p.onPartTrackProcessed, } err := trackProc.initialize() diff --git a/client_stream_processor_mpegts.go b/client_stream_processor_mpegts.go index 67be81e..7c69fdf 100644 --- a/client_stream_processor_mpegts.go +++ b/client_stream_processor_mpegts.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "io" - "time" "github.com/asticode/go-astits" @@ -180,10 +179,6 @@ func (p *clientStreamProcessorMPEGTS) initializeReader(ctx context.Context, firs return fmt.Errorf("terminated") } - ntpAvailable := false - var ntpAbsolute time.Time - var ntpRelative int64 - for i, mpegtsTrack := range p.reader.Tracks() { track := p.clientStreamTracks[i] isLeadingTrack := (i == leadingTrackID) @@ -213,24 +208,15 @@ func (p *clientStreamProcessorMPEGTS) initializeReader(ctx context.Context, firs pts := p.timeConv.convert(rawPTS) dts := p.timeConv.convert(rawDTS) - if isLeadingTrack && !p.dateTimeProcessed { + if !p.dateTimeProcessed && p.isLeading && isLeadingTrack { p.dateTimeProcessed = true if p.curSegment.dateTime != nil { - ntpAvailable = true - ntpAbsolute = *p.curSegment.dateTime - ntpRelative = dts - } else { - ntpAvailable = false + p.timeConv.setNTP(*p.curSegment.dateTime, dts) } } - ntp := time.Time{} - if ntpAvailable { - diff := dts - ntpRelative - diffDur := timestampToDuration(diff, 90000) - ntp = ntpAbsolute.Add(diffDur) - } + ntp := p.timeConv.getNTP(dts) return trackProc.push(ctx, &procEntryMPEGTS{ pts: pts, diff --git a/client_test.go b/client_test.go index c03c818..083c018 100644 --- a/client_test.go +++ b/client_test.go @@ -461,8 +461,9 @@ func TestClient(t *testing.T) { require.Equal(t, int64(6000), dts) require.Equal(t, int64(6000), pts) require.Equal(t, [][]byte{{4}}, au) - _, ok := c.AbsoluteTime(tracks[0]) - require.Equal(t, false, ok) + ntp, ok := c.AbsoluteTime(tracks[0]) + require.Equal(t, true, ok) + require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 2, 66666666, time.UTC), ntp) close(videoRecv) } videoCount++ @@ -539,7 +540,7 @@ func TestClientFMP4MultiRenditions(t *testing.T) { "#EXT-X-INDEPENDENT-SEGMENTS\n" + "#EXT-X-TARGETDURATION:2\n" + "#EXT-X-MAP:URI=\"init_audio.mp4\"\n" + - "#EXT-X-PROGRAM-DATE-TIME:2015-02-05T01:02:02Z\n" + + "#EXT-X-PROGRAM-DATE-TIME:2014-02-05T01:02:02Z\n" + "#EXTINF:2,\n" + "segment_audio.mp4\n" + "#EXT-X-ENDLIST")) @@ -670,7 +671,7 @@ func TestClientFMP4MultiRenditions(t *testing.T) { }, aus) ntp, ok := c.AbsoluteTime(tracks[1]) require.Equal(t, true, ok) - require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 2, 34693877, time.UTC), ntp) + require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 2, 68027210, time.UTC), ntp) packetRecv <- struct{}{} }) diff --git a/client_time_conv_fmp4.go b/client_time_conv_fmp4.go index dbf547b..11f406d 100644 --- a/client_time_conv_fmp4.go +++ b/client_time_conv_fmp4.go @@ -1,8 +1,19 @@ package gohlslib +import ( + "sync" + "time" +) + type clientTimeConvFMP4 struct { leadingTimeScale int64 leadingBaseTime int64 + + mutex sync.Mutex + ntpAvailable bool + ntpValue time.Time + ntpTimestamp int64 + ntpClockRate int } func (ts *clientTimeConvFMP4) initialize() { @@ -11,3 +22,29 @@ func (ts *clientTimeConvFMP4) initialize() { func (ts *clientTimeConvFMP4) convert(v int64, clockRate int) int64 { return v - multiplyAndDivide(ts.leadingBaseTime, int64(clockRate), ts.leadingTimeScale) } + +func (ts *clientTimeConvFMP4) setNTP(value time.Time, timestamp int64, clockRate int) { + ts.mutex.Lock() + defer ts.mutex.Unlock() + + ts.ntpAvailable = true + ts.ntpValue = value + ts.ntpTimestamp = timestamp + ts.ntpClockRate = clockRate +} + +func (ts *clientTimeConvFMP4) getNTP(timestamp int64, clockRate int) *time.Time { + ts.mutex.Lock() + defer ts.mutex.Unlock() + + if !ts.ntpAvailable { + return nil + } + + v := ts.ntpValue.Add( + timestampToDuration( + timestamp-multiplyAndDivide(ts.ntpTimestamp, int64(clockRate), int64(ts.ntpClockRate)), + clockRate)) + + return &v +} diff --git a/client_time_conv_mpegts.go b/client_time_conv_mpegts.go index f089be6..1f173f8 100644 --- a/client_time_conv_mpegts.go +++ b/client_time_conv_mpegts.go @@ -2,6 +2,7 @@ package gohlslib import ( "sync" + "time" "github.com/bluenviron/mediacommon/pkg/formats/mpegts" ) @@ -9,8 +10,11 @@ import ( type clientTimeConvMPEGTS struct { startDTS int64 - td *mpegts.TimeDecoder2 - mutex sync.Mutex + mutex sync.Mutex + td *mpegts.TimeDecoder2 + ntpAvailable bool + ntpValue time.Time + ntpTimestamp int64 } func (ts *clientTimeConvMPEGTS) initialize() { @@ -21,5 +25,27 @@ func (ts *clientTimeConvMPEGTS) initialize() { func (ts *clientTimeConvMPEGTS) convert(v int64) int64 { ts.mutex.Lock() defer ts.mutex.Unlock() + return ts.td.Decode(v) } + +func (ts *clientTimeConvMPEGTS) setNTP(value time.Time, timestamp int64) { + ts.mutex.Lock() + defer ts.mutex.Unlock() + + ts.ntpAvailable = true + ts.ntpValue = value + ts.ntpTimestamp = timestamp +} + +func (ts *clientTimeConvMPEGTS) getNTP(timestamp int64) *time.Time { + ts.mutex.Lock() + defer ts.mutex.Unlock() + + if !ts.ntpAvailable { + return nil + } + + v := ts.ntpValue.Add(timestampToDuration(timestamp-ts.ntpTimestamp, 90000)) + return &v +} diff --git a/client_track.go b/client_track.go index d59a13b..43ed896 100644 --- a/client_track.go +++ b/client_track.go @@ -9,22 +9,22 @@ import ( type clientTrack struct { track *Track onData clientOnDataFunc - lastAbsoluteTime time.Time + lastAbsoluteTime *time.Time startRTC time.Time } func (t *clientTrack) absoluteTime() (time.Time, bool) { - if t.lastAbsoluteTime == zero { + if t.lastAbsoluteTime == nil { return zero, false } - return t.lastAbsoluteTime, true + return *t.lastAbsoluteTime, true } func (t *clientTrack) handleData( ctx context.Context, pts int64, dts int64, - ntp time.Time, + ntp *time.Time, data [][]byte, ) error { // silently discard packets prior to the first packet of the leading track diff --git a/client_track_processor_fmp4.go b/client_track_processor_fmp4.go index cf1aa4b..72375ee 100644 --- a/client_track_processor_fmp4.go +++ b/client_track_processor_fmp4.go @@ -10,15 +10,13 @@ import ( ) type procEntryFMP4 struct { - ntpAvailable bool - ntpAbsolute time.Time - ntpRelative int64 - partTrack *fmp4.PartTrack + partTrack *fmp4.PartTrack + dts int64 + ntp *time.Time } type clientTrackProcessorFMP4 struct { track *clientTrack - timeConv *clientTimeConvFMP4 onPartTrackProcessed func(ctx context.Context) decodePayload func(sample *fmp4.PartSample) ([][]byte, error) @@ -76,7 +74,7 @@ func (t *clientTrackProcessorFMP4) run(ctx context.Context) error { } func (t *clientTrackProcessorFMP4) process(ctx context.Context, entry *procEntryFMP4) error { - rawDTS := int64(entry.partTrack.BaseTime) + dts := entry.dts for _, sample := range entry.partTrack.Samples { data, err := t.decodePayload(sample) @@ -84,22 +82,20 @@ func (t *clientTrackProcessorFMP4) process(ctx context.Context, entry *procEntry return err } - dts := t.timeConv.convert(rawDTS, t.track.track.ClockRate) pts := dts + int64(sample.PTSOffset) - rawDTS += int64(sample.Duration) - - ntp := time.Time{} - if entry.ntpAvailable { - trackNTPRelative := multiplyAndDivide(entry.ntpRelative, int64(t.track.track.ClockRate), t.timeConv.leadingTimeScale) - diff := dts - trackNTPRelative - diffDur := timestampToDuration(diff, t.track.track.ClockRate) - ntp = entry.ntpAbsolute.Add(diffDur) + + var ntp *time.Time + if entry.ntp != nil { + v := entry.ntp.Add(timestampToDuration(dts-entry.dts, t.track.track.ClockRate)) + ntp = &v } err = t.track.handleData(ctx, pts, dts, ntp, data) if err != nil { return err } + + dts += int64(sample.Duration) } t.onPartTrackProcessed(ctx) diff --git a/client_track_processor_mpegts.go b/client_track_processor_mpegts.go index dce6ac4..ce8df74 100644 --- a/client_track_processor_mpegts.go +++ b/client_track_processor_mpegts.go @@ -9,7 +9,7 @@ import ( type procEntryMPEGTS struct { pts int64 dts int64 - ntp time.Time + ntp *time.Time data [][]byte }