Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: fix computing absolute timestamp with multiple renditions #192

Merged
merged 1 commit into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 15 additions & 21 deletions client_stream_processor_fmp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"fmt"
"time"

"github.com/bluenviron/mediacommon/pkg/formats/fmp4"

Expand Down Expand Up @@ -121,30 +120,24 @@ func (p *clientStreamProcessorFMP4) processSegment(ctx context.Context, seg *seg
return err
}

ntpAvailable := false
var ntpAbsolute time.Time
var ntpRelative int64
var leadingClockRate int

partTrack := findFirstPartTrackOfLeadingTrack(parts, p.leadingTrackID)
if partTrack == nil {
leadingPartTrack := findFirstPartTrackOfLeadingTrack(parts, p.leadingTrackID)
if leadingPartTrack == nil {
return fmt.Errorf("could not find data of leading track")
}

if p.trackProcessors == nil {
err := p.initializeTrackProcessors(ctx, partTrack)
err := p.initializeTrackProcessors(ctx, leadingPartTrack)
if err != nil {
return err
}
}

leadingTrackProc := p.trackProcessors[partTrack.ID]
leadingClockRate = leadingTrackProc.track.track.ClockRate

if seg.dateTime != nil {
ntpAvailable = true
ntpAbsolute = *seg.dateTime
ntpRelative = p.timeConv.convert(int64(partTrack.BaseTime), leadingClockRate)
if p.isLeading {
if seg.dateTime != nil {
leadingPartTrackProc := p.trackProcessors[leadingPartTrack.ID]
dts := p.timeConv.convert(int64(leadingPartTrack.BaseTime), leadingPartTrackProc.track.track.ClockRate)
p.timeConv.setNTP(*seg.dateTime, dts, leadingPartTrackProc.track.track.ClockRate)
}
}

partTrackCount := 0
Expand All @@ -156,11 +149,13 @@ func (p *clientStreamProcessorFMP4) processSegment(ctx context.Context, seg *seg
continue
}

dts := p.timeConv.convert(int64(partTrack.BaseTime), trackProc.track.track.ClockRate)
ntp := p.timeConv.getNTP(dts, trackProc.track.track.ClockRate)

err := trackProc.push(ctx, &procEntryFMP4{
ntpAvailable: ntpAvailable,
ntpAbsolute: ntpAbsolute,
ntpRelative: multiplyAndDivide(ntpRelative, int64(trackProc.track.track.ClockRate), int64(leadingClockRate)),
partTrack: partTrack,
partTrack: partTrack,
dts: dts,
ntp: ntp,
})
if err != nil {
return err
Expand Down Expand Up @@ -223,7 +218,6 @@ func (p *clientStreamProcessorFMP4) initializeTrackProcessors(
for i, track := range p.clientStreamTracks {
trackProc := &clientTrackProcessorFMP4{
track: track,
timeConv: p.timeConv,
onPartTrackProcessed: p.onPartTrackProcessed,
}
err := trackProc.initialize()
Expand Down
20 changes: 3 additions & 17 deletions client_stream_processor_mpegts.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"io"
"time"

"github.com/asticode/go-astits"

Expand Down Expand Up @@ -180,10 +179,6 @@ func (p *clientStreamProcessorMPEGTS) initializeReader(ctx context.Context, firs
return fmt.Errorf("terminated")
}

ntpAvailable := false
var ntpAbsolute time.Time
var ntpRelative int64

for i, mpegtsTrack := range p.reader.Tracks() {
track := p.clientStreamTracks[i]
isLeadingTrack := (i == leadingTrackID)
Expand Down Expand Up @@ -213,24 +208,15 @@ func (p *clientStreamProcessorMPEGTS) initializeReader(ctx context.Context, firs
pts := p.timeConv.convert(rawPTS)
dts := p.timeConv.convert(rawDTS)

if isLeadingTrack && !p.dateTimeProcessed {
if !p.dateTimeProcessed && p.isLeading && isLeadingTrack {
p.dateTimeProcessed = true

if p.curSegment.dateTime != nil {
ntpAvailable = true
ntpAbsolute = *p.curSegment.dateTime
ntpRelative = dts
} else {
ntpAvailable = false
p.timeConv.setNTP(*p.curSegment.dateTime, dts)
}
}

ntp := time.Time{}
if ntpAvailable {
diff := dts - ntpRelative
diffDur := timestampToDuration(diff, 90000)
ntp = ntpAbsolute.Add(diffDur)
}
ntp := p.timeConv.getNTP(dts)

return trackProc.push(ctx, &procEntryMPEGTS{
pts: pts,
Expand Down
13 changes: 7 additions & 6 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,9 @@ func TestClient(t *testing.T) {
require.Equal(t, int64(6000), dts)
require.Equal(t, int64(6000), pts)
require.Equal(t, [][]byte{{4}}, au)
_, ok := c.AbsoluteTime(tracks[0])
require.Equal(t, false, ok)
ntp, ok := c.AbsoluteTime(tracks[0])
require.Equal(t, true, ok)
require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 2, 66666666, time.UTC), ntp)
close(videoRecv)
}
videoCount++
Expand Down Expand Up @@ -539,7 +540,7 @@ func TestClientFMP4MultiRenditions(t *testing.T) {
"#EXT-X-INDEPENDENT-SEGMENTS\n" +
"#EXT-X-TARGETDURATION:2\n" +
"#EXT-X-MAP:URI=\"init_audio.mp4\"\n" +
"#EXT-X-PROGRAM-DATE-TIME:2015-02-05T01:02:02Z\n" +
"#EXT-X-PROGRAM-DATE-TIME:2014-02-05T01:02:02Z\n" +
"#EXTINF:2,\n" +
"segment_audio.mp4\n" +
"#EXT-X-ENDLIST"))
Expand Down Expand Up @@ -601,7 +602,7 @@ func TestClientFMP4MultiRenditions(t *testing.T) {
Tracks: []*fmp4.PartTrack{
{
ID: 1,
BaseTime: 3000,
BaseTime: 44100 / 2, // +0.5 sec
Samples: []*fmp4.PartSample{{
Duration: 44100,
Payload: []byte{1, 2, 3, 4},
Expand Down Expand Up @@ -664,13 +665,13 @@ func TestClientFMP4MultiRenditions(t *testing.T) {
})

c.OnDataMPEG4Audio(tracks[1], func(pts int64, aus [][]byte) {
require.Equal(t, int64(3000), pts)
require.Equal(t, int64(22050), pts)
require.Equal(t, [][]byte{
{1, 2, 3, 4},
}, aus)
ntp, ok := c.AbsoluteTime(tracks[1])
require.Equal(t, true, ok)
require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 2, 34693877, time.UTC), ntp)
require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 2, 500000000, time.UTC), ntp)
packetRecv <- struct{}{}
})

Expand Down
37 changes: 37 additions & 0 deletions client_time_conv_fmp4.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
package gohlslib

import (
"sync"
"time"
)

type clientTimeConvFMP4 struct {
leadingTimeScale int64
leadingBaseTime int64

mutex sync.Mutex
ntpAvailable bool
ntpValue time.Time
ntpTimestamp int64
ntpClockRate int
}

func (ts *clientTimeConvFMP4) initialize() {
Expand All @@ -11,3 +22,29 @@ func (ts *clientTimeConvFMP4) initialize() {
func (ts *clientTimeConvFMP4) convert(v int64, clockRate int) int64 {
return v - multiplyAndDivide(ts.leadingBaseTime, int64(clockRate), ts.leadingTimeScale)
}

func (ts *clientTimeConvFMP4) setNTP(value time.Time, timestamp int64, clockRate int) {
ts.mutex.Lock()
defer ts.mutex.Unlock()

ts.ntpAvailable = true
ts.ntpValue = value
ts.ntpTimestamp = timestamp
ts.ntpClockRate = clockRate
}

func (ts *clientTimeConvFMP4) getNTP(timestamp int64, clockRate int) *time.Time {
ts.mutex.Lock()
defer ts.mutex.Unlock()

if !ts.ntpAvailable {
return nil
}

v := ts.ntpValue.Add(
timestampToDuration(
timestamp-multiplyAndDivide(ts.ntpTimestamp, int64(clockRate), int64(ts.ntpClockRate)),
clockRate))

return &v
}
30 changes: 28 additions & 2 deletions client_time_conv_mpegts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@ package gohlslib

import (
"sync"
"time"

"github.com/bluenviron/mediacommon/pkg/formats/mpegts"
)

type clientTimeConvMPEGTS struct {
startDTS int64

td *mpegts.TimeDecoder2
mutex sync.Mutex
mutex sync.Mutex
td *mpegts.TimeDecoder2
ntpAvailable bool
ntpValue time.Time
ntpTimestamp int64
}

func (ts *clientTimeConvMPEGTS) initialize() {
Expand All @@ -21,5 +25,27 @@ func (ts *clientTimeConvMPEGTS) initialize() {
func (ts *clientTimeConvMPEGTS) convert(v int64) int64 {
ts.mutex.Lock()
defer ts.mutex.Unlock()

return ts.td.Decode(v)
}

func (ts *clientTimeConvMPEGTS) setNTP(value time.Time, timestamp int64) {
ts.mutex.Lock()
defer ts.mutex.Unlock()

ts.ntpAvailable = true
ts.ntpValue = value
ts.ntpTimestamp = timestamp
}

func (ts *clientTimeConvMPEGTS) getNTP(timestamp int64) *time.Time {
ts.mutex.Lock()
defer ts.mutex.Unlock()

if !ts.ntpAvailable {
return nil
}

v := ts.ntpValue.Add(timestampToDuration(timestamp-ts.ntpTimestamp, 90000))
return &v
}
8 changes: 4 additions & 4 deletions client_track.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@ import (
type clientTrack struct {
track *Track
onData clientOnDataFunc
lastAbsoluteTime time.Time
lastAbsoluteTime *time.Time
startRTC time.Time
}

func (t *clientTrack) absoluteTime() (time.Time, bool) {
if t.lastAbsoluteTime == zero {
if t.lastAbsoluteTime == nil {
return zero, false
}
return t.lastAbsoluteTime, true
return *t.lastAbsoluteTime, true
}

func (t *clientTrack) handleData(
ctx context.Context,
pts int64,
dts int64,
ntp time.Time,
ntp *time.Time,
data [][]byte,
) error {
// silently discard packets prior to the first packet of the leading track
Expand Down
26 changes: 11 additions & 15 deletions client_track_processor_fmp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@ import (
)

type procEntryFMP4 struct {
ntpAvailable bool
ntpAbsolute time.Time
ntpRelative int64
partTrack *fmp4.PartTrack
partTrack *fmp4.PartTrack
dts int64
ntp *time.Time
}

type clientTrackProcessorFMP4 struct {
track *clientTrack
timeConv *clientTimeConvFMP4
onPartTrackProcessed func(ctx context.Context)

decodePayload func(sample *fmp4.PartSample) ([][]byte, error)
Expand Down Expand Up @@ -76,30 +74,28 @@ func (t *clientTrackProcessorFMP4) run(ctx context.Context) error {
}

func (t *clientTrackProcessorFMP4) process(ctx context.Context, entry *procEntryFMP4) error {
rawDTS := int64(entry.partTrack.BaseTime)
dts := entry.dts

for _, sample := range entry.partTrack.Samples {
data, err := t.decodePayload(sample)
if err != nil {
return err
}

dts := t.timeConv.convert(rawDTS, t.track.track.ClockRate)
pts := dts + int64(sample.PTSOffset)
rawDTS += int64(sample.Duration)

ntp := time.Time{}
if entry.ntpAvailable {
trackNTPRelative := multiplyAndDivide(entry.ntpRelative, int64(t.track.track.ClockRate), t.timeConv.leadingTimeScale)
diff := dts - trackNTPRelative
diffDur := timestampToDuration(diff, t.track.track.ClockRate)
ntp = entry.ntpAbsolute.Add(diffDur)

var ntp *time.Time
if entry.ntp != nil {
v := entry.ntp.Add(timestampToDuration(dts-entry.dts, t.track.track.ClockRate))
ntp = &v
}

err = t.track.handleData(ctx, pts, dts, ntp, data)
if err != nil {
return err
}

dts += int64(sample.Duration)
}

t.onPartTrackProcessed(ctx)
Expand Down
2 changes: 1 addition & 1 deletion client_track_processor_mpegts.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
type procEntryMPEGTS struct {
pts int64
dts int64
ntp time.Time
ntp *time.Time
data [][]byte
}

Expand Down
Loading