From d74a3bd613b2f2f36a4fac96e6829cb834452007 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Mon, 1 Jan 2024 19:07:31 +0100 Subject: [PATCH] client: support extracting absolute timestamp of incoming data --- README.md | 6 +- client.go | 51 +- client_primary_downloader.go | 14 +- client_segment_queue.go | 14 +- client_stream_downloader.go | 6 +- client_stream_processor.go | 3 + client_stream_processor_fmp4.go | 86 +-- client_stream_processor_mpegts.go | 148 +++-- client_test.go | 760 ++++++++++++-------------- client_timesync_fmp4.go | 17 +- client_timesync_mpegts.go | 14 +- client_track_processor_fmp4.go | 2 +- client_track_processor_mpegts.go | 12 +- examples/client-ntp-timestamp/main.go | 68 +++ examples/client/main.go | 8 +- examples/muxer/main.go | 2 +- examples/playlist-parser/main.go | 2 +- 17 files changed, 675 insertions(+), 538 deletions(-) create mode 100644 examples/client-ntp-timestamp/main.go diff --git a/README.md b/README.md index aeb6a8e..322c407 100644 --- a/README.md +++ b/README.md @@ -14,12 +14,13 @@ Features: * Client - * Read MPEG-TS or fMP4 streams + * Read streams in MPEG-TS or fMP4 format * Read tracks encoded with AV1, VP9, H265, H264, Opus, MPEG-4 Audio (AAC) + * Get NTP timestamp (absolute timestamp) of incoming data * Muxer - * Generate MPEG-TS, fMP4, Low-latency streams + * Generate streams in MPEG-TS, fMP4 or Low-latency format * Write tracks encoded with AV1, VP9, H265, H264, Opus, MPEG-4 audio (AAC) * Save generated segments on disk @@ -39,6 +40,7 @@ Features: * [playlist-parser](examples/playlist-parser/main.go) * [client](examples/client/main.go) +* [client-ntp-timestamp](examples/client-ntp-timestamp/main.go) * [muxer](examples/muxer/main.go) ## API Documentation diff --git a/client.go b/client.go index 832387c..5d98d67 100644 --- a/client.go +++ b/client.go @@ -15,12 +15,11 @@ import ( ) const ( - clientMPEGTSSampleQueueSize = 100 - clientFMP4PartTrackQueueSize = 10 - clientFMP4MaxPartTracksPerSegment = 200 - clientLiveInitialDistance = 3 - clientLiveMaxDistanceFromEnd = 5 - clientMaxDTSRTCDiff = 10 * time.Second + clientMaxTracksPerStream = 10 + clientMPEGTSSampleQueueSize = 100 + clientLiveInitialDistance = 3 + clientLiveMaxDistanceFromEnd = 5 + clientMaxDTSRTCDiff = 10 * time.Second ) // ClientOnDownloadPrimaryPlaylistFunc is the prototype of Client.OnDownloadPrimaryPlaylist. @@ -92,10 +91,11 @@ type Client struct { // private // - ctx context.Context - ctxCancel func() - onData map[*Track]interface{} - playlistURL *url.URL + ctx context.Context + ctxCancel func() + onData map[*Track]interface{} + playlistURL *url.URL + primaryDownloader *clientPrimaryDownloader // out outErr chan error @@ -159,28 +159,33 @@ func (c *Client) Wait() chan error { } // OnDataAV1 sets a callback that is called when data from an AV1 track is received. -func (c *Client) OnDataAV1(forma *Track, cb ClientOnDataAV1Func) { - c.onData[forma] = cb +func (c *Client) OnDataAV1(track *Track, cb ClientOnDataAV1Func) { + c.onData[track] = cb } // OnDataVP9 sets a callback that is called when data from a VP9 track is received. -func (c *Client) OnDataVP9(forma *Track, cb ClientOnDataVP9Func) { - c.onData[forma] = cb +func (c *Client) OnDataVP9(track *Track, cb ClientOnDataVP9Func) { + c.onData[track] = cb } // OnDataH26x sets a callback that is called when data from an H26x track is received. -func (c *Client) OnDataH26x(forma *Track, cb ClientOnDataH26xFunc) { - c.onData[forma] = cb +func (c *Client) OnDataH26x(track *Track, cb ClientOnDataH26xFunc) { + c.onData[track] = cb } // OnDataMPEG4Audio sets a callback that is called when data from a MPEG-4 Audio track is received. -func (c *Client) OnDataMPEG4Audio(forma *Track, cb ClientOnDataMPEG4AudioFunc) { - c.onData[forma] = cb +func (c *Client) OnDataMPEG4Audio(track *Track, cb ClientOnDataMPEG4AudioFunc) { + c.onData[track] = cb } // OnDataOpus sets a callback that is called when data from an Opus track is received. -func (c *Client) OnDataOpus(forma *Track, cb ClientOnDataOpusFunc) { - c.onData[forma] = cb +func (c *Client) OnDataOpus(track *Track, cb ClientOnDataOpusFunc) { + c.onData[track] = cb +} + +// NTP returns the NTP timestamp (absolute timestamp) of a packet with given track and DTS. +func (c *Client) NTP(track *Track, dts time.Duration) (time.Time, bool) { + return c.primaryDownloader.ntp(track, dts) } func (c *Client) run() { @@ -191,7 +196,7 @@ func (c *Client) runInner() error { rp := &clientRoutinePool{} rp.initialize() - dl := &clientPrimaryDownloader{ + c.primaryDownloader = &clientPrimaryDownloader{ primaryPlaylistURL: c.playlistURL, httpClient: c.HTTPClient, onDownloadPrimaryPlaylist: c.OnDownloadPrimaryPlaylist, @@ -202,8 +207,8 @@ func (c *Client) runInner() error { onTracks: c.OnTracks, onData: c.onData, } - dl.initialize() - rp.add(dl) + c.primaryDownloader.initialize() + rp.add(c.primaryDownloader) select { case err := <-rp.errorChan(): diff --git a/client_primary_downloader.go b/client_primary_downloader.go index 59b89f2..08467f5 100644 --- a/client_primary_downloader.go +++ b/client_primary_downloader.go @@ -7,6 +7,7 @@ import ( "net/http" "net/url" "strings" + "time" "github.com/bluenviron/gohlslib/pkg/playlist" ) @@ -107,7 +108,8 @@ type clientPrimaryDownloader struct { onTracks ClientOnTracksFunc onData map[*Track]interface{} - leadingTimeSync clientTimeSync + streamProcByTrack map[*Track]clientStreamProcessor + leadingTimeSync clientTimeSync // in chStreamTracks chan clientStreamProcessor @@ -118,6 +120,7 @@ type clientPrimaryDownloader struct { } func (d *clientPrimaryDownloader) initialize() { + d.streamProcByTrack = make(map[*Track]clientStreamProcessor) d.chStreamTracks = make(chan clientStreamProcessor) d.startStreaming = make(chan struct{}) d.leadingTimeSyncReady = make(chan struct{}) @@ -227,6 +230,11 @@ func (d *clientPrimaryDownloader) run(ctx context.Context) error { } else { tracks = append(tracks, streamProc.getTracks()...) } + + for _, track := range streamProc.getTracks() { + d.streamProcByTrack[track] = streamProc + } + case <-ctx.Done(): return fmt.Errorf("terminated") } @@ -275,3 +283,7 @@ func (d *clientPrimaryDownloader) onGetLeadingTimeSync(ctx context.Context) (cli } return d.leadingTimeSync, true } + +func (d *clientPrimaryDownloader) ntp(track *Track, dts time.Duration) (time.Time, bool) { + return d.streamProcByTrack[track].ntp(dts) +} diff --git a/client_segment_queue.go b/client_segment_queue.go index 47a6d4d..28d1481 100644 --- a/client_segment_queue.go +++ b/client_segment_queue.go @@ -3,11 +3,17 @@ package gohlslib import ( "context" "sync" + "time" ) +type segmentData struct { + dateTime *time.Time + payload []byte +} + type clientSegmentQueue struct { mutex sync.Mutex - queue [][]byte + queue []*segmentData didPush chan struct{} didPull chan struct{} } @@ -17,7 +23,7 @@ func (q *clientSegmentQueue) initialize() { q.didPull = make(chan struct{}) } -func (q *clientSegmentQueue) push(seg []byte) { +func (q *clientSegmentQueue) push(seg *segmentData) { q.mutex.Lock() queueWasEmpty := (len(q.queue) == 0) @@ -50,7 +56,7 @@ func (q *clientSegmentQueue) waitUntilSizeIsBelow(ctx context.Context, n int) bo return true } -func (q *clientSegmentQueue) pull(ctx context.Context) ([]byte, bool) { +func (q *clientSegmentQueue) pull(ctx context.Context) (*segmentData, bool) { q.mutex.Lock() for len(q.queue) == 0 { @@ -66,7 +72,7 @@ func (q *clientSegmentQueue) pull(ctx context.Context) ([]byte, bool) { q.mutex.Lock() } - var seg []byte + var seg *segmentData seg, q.queue = q.queue[0], q.queue[1:] close(q.didPull) diff --git a/client_stream_downloader.go b/client_stream_downloader.go index 4e98aea..e26604f 100644 --- a/client_stream_downloader.go +++ b/client_stream_downloader.go @@ -98,6 +98,7 @@ func (d *clientStreamDownloader) run(ctx context.Context) error { onGetLeadingTimeSync: d.onGetLeadingTimeSync, onData: d.onData, } + proc.initialize() d.rp.add(proc) } @@ -224,7 +225,10 @@ func (d *clientStreamDownloader) fillSegmentQueue( return err } - segmentQueue.push(byts) + segmentQueue.push(&segmentData{ + dateTime: seg.DateTime, + payload: byts, + }) if pl.Endlist && pl.Segments[len(pl.Segments)-1] == seg { <-ctx.Done() diff --git a/client_stream_processor.go b/client_stream_processor.go index 37235b5..1114235 100644 --- a/client_stream_processor.go +++ b/client_stream_processor.go @@ -1,6 +1,9 @@ package gohlslib +import "time" + type clientStreamProcessor interface { getIsLeading() bool getTracks() []*Track + ntp(dts time.Duration) (time.Time, bool) } diff --git a/client_stream_processor_fmp4.go b/client_stream_processor_fmp4.go index a088bdb..a93c19f 100644 --- a/client_stream_processor_fmp4.go +++ b/client_stream_processor_fmp4.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "time" "github.com/bluenviron/mediacommon/pkg/formats/fmp4" @@ -22,7 +23,7 @@ func fmp4PickLeadingTrack(init *fmp4.Init) int { return init.Tracks[0].ID } -func findPartTrackOfLeadingTrack(parts []*fmp4.Part, leadingTrackID int) *fmp4.PartTrack { +func findFirstPartTrackOfLeadingTrack(parts []*fmp4.Part, leadingTrackID int) *fmp4.PartTrack { for _, part := range parts { for _, partTrack := range part.Tracks { if partTrack.ID == leadingTrackID { @@ -57,13 +58,17 @@ type clientStreamProcessorFMP4 struct { init fmp4.Init leadingTrackID int trackProcessors map[int]*clientTrackProcessorFMP4 + timeSync *clientTimeSyncFMP4 + ntpAvailable bool + ntpAbsolute time.Time + ntpRelative time.Duration // in chPartTrackProcessed chan struct{} } func (p *clientStreamProcessorFMP4) initialize() error { - p.chPartTrackProcessed = make(chan struct{}, clientFMP4MaxPartTracksPerSegment) + p.chPartTrackProcessed = make(chan struct{}, clientMaxTracksPerStream) err := p.init.Unmarshal(bytes.NewReader(p.initFile)) if err != nil { @@ -79,6 +84,10 @@ func (p *clientStreamProcessorFMP4) initialize() error { } } + if len(p.tracks) > clientMaxTracksPerStream { + return fmt.Errorf("too many tracks per stream") + } + ok := p.onStreamTracks(p.ctx, p) if !ok { return fmt.Errorf("terminated") @@ -109,28 +118,39 @@ func (p *clientStreamProcessorFMP4) run(ctx context.Context) error { } } -func (p *clientStreamProcessorFMP4) processSegment(ctx context.Context, byts []byte) error { +func (p *clientStreamProcessorFMP4) processSegment(ctx context.Context, seg *segmentData) error { var parts fmp4.Parts - err := parts.Unmarshal(byts) + err := parts.Unmarshal(seg.payload) if err != nil { return err } - if p.trackProcessors == nil { - err := p.initializeTrackProcessors(ctx, parts) - if err != nil { - return err + if p.trackProcessors == nil || seg.dateTime != nil { + partTrack := findFirstPartTrackOfLeadingTrack(parts, p.leadingTrackID) + if partTrack == nil { + return fmt.Errorf("could not find data of leading track") + } + + if p.trackProcessors == nil { + err := p.initializeTrackProcessors(ctx, partTrack) + if err != nil { + return err + } + } + + if seg.dateTime != nil { + p.ntpAvailable = true + p.ntpAbsolute = *seg.dateTime + p.ntpRelative = p.timeSync.convert(partTrack.BaseTime, p.timeSync.leadingTimeScale) + } else { + p.ntpAvailable = false } } - processingCount := 0 + partTrackCount := 0 for _, part := range parts { for _, partTrack := range part.Tracks { - if processingCount >= (clientFMP4MaxPartTracksPerSegment - 1) { - return fmt.Errorf("too many part tracks at once") - } - trackProc, ok := p.trackProcessors[partTrack.ID] if !ok { continue @@ -141,15 +161,19 @@ func (p *clientStreamProcessorFMP4) processSegment(ctx context.Context, byts []b return err } - processingCount++ + partTrackCount++ } } - for i := 0; i < processingCount; i++ { + return p.joinTrackProcessors(ctx, partTrackCount) +} + +func (p *clientStreamProcessorFMP4) joinTrackProcessors(ctx context.Context, partTrackCount int) error { + for i := 0; i < partTrackCount; i++ { select { case <-p.chPartTrackProcessed: case <-ctx.Done(): - return fmt.Errorf("terminated") + return nil } } @@ -165,32 +189,25 @@ func (p *clientStreamProcessorFMP4) onPartTrackProcessed(ctx context.Context) { func (p *clientStreamProcessorFMP4) initializeTrackProcessors( ctx context.Context, - parts []*fmp4.Part, + partTrack *fmp4.PartTrack, ) error { - var timeSync *clientTimeSyncFMP4 - if p.isLeading { - trackPart := findPartTrackOfLeadingTrack(parts, p.leadingTrackID) - if trackPart == nil { - return nil - } - timeScale := findTimeScaleOfLeadingTrack(p.init.Tracks, p.leadingTrackID) - timeSync = &clientTimeSyncFMP4{ - timeScale: timeScale, - baseTime: trackPart.BaseTime, + p.timeSync = &clientTimeSyncFMP4{ + leadingTimeScale: timeScale, + initialBaseTime: partTrack.BaseTime, } - timeSync.initialize() + p.timeSync.initialize() - p.onSetLeadingTimeSync(timeSync) + p.onSetLeadingTimeSync(p.timeSync) } else { rawTS, ok := p.onGetLeadingTimeSync(ctx) if !ok { return fmt.Errorf("terminated") } - timeSync, ok = rawTS.(*clientTimeSyncFMP4) + p.timeSync, ok = rawTS.(*clientTimeSyncFMP4) if !ok { return fmt.Errorf("stream playlists are mixed MPEG-TS/fMP4") } @@ -203,7 +220,7 @@ func (p *clientStreamProcessorFMP4) initializeTrackProcessors( track: track, onData: p.onData[track], timeScale: p.init.Tracks[i].TimeScale, - timeSync: timeSync, + timeSync: p.timeSync, onPartTrackProcessed: p.onPartTrackProcessed, } err := trackProc.initialize() @@ -217,3 +234,10 @@ func (p *clientStreamProcessorFMP4) initializeTrackProcessors( return nil } + +func (p *clientStreamProcessorFMP4) ntp(dts time.Duration) (time.Time, bool) { + if !p.ntpAvailable { + return time.Time{}, false + } + return p.ntpAbsolute.Add(dts - p.ntpRelative), true +} diff --git a/client_stream_processor_mpegts.go b/client_stream_processor_mpegts.go index ea52690..890d685 100644 --- a/client_stream_processor_mpegts.go +++ b/client_stream_processor_mpegts.go @@ -3,9 +3,9 @@ package gohlslib import ( "bytes" "context" - "errors" "fmt" "io" + "time" "github.com/asticode/go-astits" @@ -14,18 +14,16 @@ import ( "github.com/bluenviron/gohlslib/pkg/codecs" ) -var errSkipSilently = errors.New("skip silently") - -func mpegtsPickLeadingTrack(mpegtsTracks []*mpegts.Track) *mpegts.Track { +func mpegtsPickLeadingTrack(mpegtsTracks []*mpegts.Track) int { // pick first video track - for _, track := range mpegtsTracks { + for i, track := range mpegtsTracks { if _, ok := track.Codec.(*mpegts.CodecH264); ok { - return track + return i } } // otherwise, pick first track - return mpegtsTracks[0] + return 0 } type switchableReader struct { @@ -46,10 +44,22 @@ type clientStreamProcessorMPEGTS struct { onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool) onData map[*Track]interface{} - switchableReader *switchableReader - reader *mpegts.Reader - tracks []*Track - trackProcessors map[*Track]*clientTrackProcessorMPEGTS + switchableReader *switchableReader + reader *mpegts.Reader + tracks []*Track + trackProcessors map[*Track]*clientTrackProcessorMPEGTS + timeSync *clientTimeSyncMPEGTS + curSegment *segmentData + leadingTrackFound bool + ntpAvailable bool + ntpAbsolute time.Time + ntpRelative time.Duration + + chTrackProcessorDone chan struct{} +} + +func (p *clientStreamProcessorMPEGTS) initialize() { + p.chTrackProcessorDone = make(chan struct{}, clientMaxTracksPerStream) } func (p *clientStreamProcessorMPEGTS) getIsLeading() bool { @@ -74,25 +84,60 @@ func (p *clientStreamProcessorMPEGTS) run(ctx context.Context) error { } } -func (p *clientStreamProcessorMPEGTS) processSegment(ctx context.Context, byts []byte) error { +func (p *clientStreamProcessorMPEGTS) processSegment(ctx context.Context, seg *segmentData) error { if p.switchableReader == nil { - err := p.initializeReader(ctx, byts) + err := p.initializeReader(ctx, seg.payload) if err != nil { return err } } else { - p.switchableReader.r = bytes.NewReader(byts) + p.switchableReader.r = bytes.NewReader(seg.payload) } + p.curSegment = seg + p.leadingTrackFound = false + for { err := p.reader.Read() if err != nil { if err == astits.ErrNoMorePackets { - return nil + break } return err } } + + if !p.leadingTrackFound { + return fmt.Errorf("could not find data of leading track") + } + + return p.joinTrackProcessors(ctx) +} + +func (p *clientStreamProcessorMPEGTS) joinTrackProcessors(ctx context.Context) error { + for _, track := range p.tracks { + err := p.trackProcessors[track].push(ctx, nil) + if err != nil { + return err + } + } + + for range p.tracks { + select { + case <-p.chTrackProcessorDone: + case <-ctx.Done(): + return nil + } + } + + return nil +} + +func (p *clientStreamProcessorMPEGTS) onPartProcessorDone(ctx context.Context) { + select { + case p.chTrackProcessorDone <- struct{}{}: + case <-ctx.Done(): + } } func (p *clientStreamProcessorMPEGTS) initializeReader(ctx context.Context, byts []byte) error { @@ -116,7 +161,7 @@ func (p *clientStreamProcessorMPEGTS) initializeReader(ctx context.Context, byts } } - leadingTrack := mpegtsPickLeadingTrack(p.reader.Tracks()) + leadingTrackID := mpegtsPickLeadingTrack(p.reader.Tracks()) p.tracks = make([]*Track, len(p.reader.Tracks())) for i, mpegtsTrack := range p.reader.Tracks() { @@ -125,6 +170,10 @@ func (p *clientStreamProcessorMPEGTS) initializeReader(ctx context.Context, byts } } + if len(p.tracks) > clientMaxTracksPerStream { + return fmt.Errorf("too many tracks per stream") + } + ok := p.onStreamTracks(ctx, p) if !ok { return fmt.Errorf("terminated") @@ -132,22 +181,35 @@ func (p *clientStreamProcessorMPEGTS) initializeReader(ctx context.Context, byts for i, mpegtsTrack := range p.reader.Tracks() { track := p.tracks[i] - isLeadingTrack := (leadingTrack == mpegtsTrack) + isLeadingTrack := (i == leadingTrackID) var trackProc *clientTrackProcessorMPEGTS processSample := func(pts int64, dts int64, sample *mpegtsSample) error { - if p.trackProcessors == nil { - err := p.initializeTrackProcessors(ctx, isLeadingTrack, dts) - if err != nil { - if err == errSkipSilently { - return nil + if !p.leadingTrackFound && isLeadingTrack { + p.leadingTrackFound = true + + if p.trackProcessors == nil { + err := p.initializeTrackProcessors(ctx, dts) + if err != nil { + return err } - return err + } + + if p.curSegment.dateTime != nil { + p.ntpAvailable = true + p.ntpAbsolute = *p.curSegment.dateTime + p.ntpRelative = p.timeSync.convert(dts) + } else { + p.ntpAvailable = false } } if trackProc == nil { trackProc = p.trackProcessors[track] + + if trackProc == nil { + return nil + } } return trackProc.push(ctx, sample) @@ -156,23 +218,20 @@ func (p *clientStreamProcessorMPEGTS) initializeReader(ctx context.Context, byts switch track.Codec.(type) { case *codecs.H264: p.reader.OnDataH26x(mpegtsTrack, func(pts int64, dts int64, au [][]byte) error { - sample := &mpegtsSample{ + return processSample(pts, dts, &mpegtsSample{ pts: pts, dts: dts, data: au, - } - return processSample(pts, dts, sample) + }) }) case *codecs.MPEG4Audio: p.reader.OnDataMPEG4Audio(mpegtsTrack, func(pts int64, aus [][]byte) error { - sample := &mpegtsSample{ + return processSample(pts, pts, &mpegtsSample{ pts: pts, dts: pts, data: aus, - } - - return processSample(pts, pts, sample) + }) }) } } @@ -182,29 +241,22 @@ func (p *clientStreamProcessorMPEGTS) initializeReader(ctx context.Context, byts func (p *clientStreamProcessorMPEGTS) initializeTrackProcessors( ctx context.Context, - isLeadingTrack bool, dts int64, ) error { - var timeSync *clientTimeSyncMPEGTS - if p.isLeading { - if !isLeadingTrack { - return errSkipSilently - } - - timeSync = &clientTimeSyncMPEGTS{ + p.timeSync = &clientTimeSyncMPEGTS{ startDTS: dts, } - timeSync.initialize() + p.timeSync.initialize() - p.onSetLeadingTimeSync(timeSync) + p.onSetLeadingTimeSync(p.timeSync) } else { rawTS, ok := p.onGetLeadingTimeSync(ctx) if !ok { return fmt.Errorf("terminated") } - timeSync, ok = rawTS.(*clientTimeSyncMPEGTS) + p.timeSync, ok = rawTS.(*clientTimeSyncMPEGTS) if !ok { return fmt.Errorf("stream playlists are mixed MPEGTS/FMP4") } @@ -214,9 +266,10 @@ func (p *clientStreamProcessorMPEGTS) initializeTrackProcessors( for _, track := range p.tracks { proc := &clientTrackProcessorMPEGTS{ - track: track, - onData: p.onData[track], - timeSync: timeSync, + track: track, + onData: p.onData[track], + timeSync: p.timeSync, + onPartProcessorDone: p.onPartProcessorDone, } proc.initialize() p.rp.add(proc) @@ -225,3 +278,10 @@ func (p *clientStreamProcessorMPEGTS) initializeTrackProcessors( return nil } + +func (p *clientStreamProcessorMPEGTS) ntp(dts time.Duration) (time.Time, bool) { + if !p.ntpAvailable { + return time.Time{}, false + } + return p.ntpAbsolute.Add(dts - p.ntpRelative), true +} diff --git a/client_test.go b/client_test.go index db181f4..3aff615 100644 --- a/client_test.go +++ b/client_test.go @@ -1,7 +1,6 @@ package gohlslib import ( - "bytes" "context" "crypto/tls" "io" @@ -11,8 +10,6 @@ import ( "testing" "time" - "github.com/asticode/go-astits" - "github.com/aler9/writerseeker" "github.com/bluenviron/gohlslib/pkg/codecs" "github.com/bluenviron/mediacommon/pkg/codecs/h264" @@ -21,6 +18,7 @@ import ( "github.com/stretchr/testify/require" "github.com/bluenviron/mediacommon/pkg/formats/fmp4" + "github.com/bluenviron/mediacommon/pkg/formats/mpegts" ) var serverCert = []byte(`-----BEGIN CERTIFICATE----- @@ -90,42 +88,12 @@ func writeTempFile(byts []byte) (string, error) { return tmpf.Name(), nil } -func mpegtsSegment(t *testing.T, w io.Writer) { - mux := astits.NewMuxer(context.Background(), w) - - err := mux.AddElementaryStream(astits.PMTElementaryStream{ - ElementaryPID: 256, - StreamType: astits.StreamTypeH264Video, - }) - require.NoError(t, err) - - mux.SetPCRPID(256) - - _, err = mux.WriteTables() - require.NoError(t, err) - - enc, _ := h264.AnnexBMarshal([][]byte{ - {7, 1, 2, 3}, // SPS - {8}, // PPS - {5}, // IDR - }) - - _, err = mux.WriteData(&astits.MuxerData{ - PID: 256, - PES: &astits.PESData{ - Header: &astits.PESHeader{ - OptionalHeader: &astits.PESOptionalHeader{ - MarkerBits: 2, - PTSDTSIndicator: astits.PTSDTSIndicatorBothPresent, - PTS: &astits.ClockReference{Base: 90000}, // +1 sec - DTS: &astits.ClockReference{Base: 0x1FFFFFFFF - 90000 + 1}, // -1 sec - }, - StreamID: 224, // = video - }, - Data: enc, - }, - }) - require.NoError(t, err) +func mustMarshalAVCC(au [][]byte) []byte { + enc, err := h264.AVCCMarshal(au) + if err != nil { + panic(err) + } + return enc } type marshaler interface { @@ -143,340 +111,303 @@ func mp4ToWriter(i marshaler, w io.Writer) error { return err } -func TestClientMPEGTS(t *testing.T) { - for _, ca := range []string{ - "plain", - "tls", - } { - t.Run(ca, func(t *testing.T) { - gin.SetMode(gin.ReleaseMode) - router := gin.New() - sent := false - - router.GET("/stream.m3u8", func(ctx *gin.Context) { - if sent { - return - } - sent = true - - ctx.Writer.Header().Set("Content-Type", `application/vnd.apple.mpegurl`) - io.Copy(ctx.Writer, bytes.NewReader([]byte(`#EXTM3U -#EXT-X-VERSION:3 -#EXT-X-ALLOW-CACHE:NO -#EXT-X-TARGETDURATION:2 -#EXT-X-MEDIA-SEQUENCE:0 -#EXTINF:2, -segment.ts?key=val -#EXT-X-ENDLIST -`))) - }) - - router.GET("/segment.ts", func(ctx *gin.Context) { - require.Equal(t, "val", ctx.Query("key")) - ctx.Writer.Header().Set("Content-Type", `video/MP2T`) - - mux := astits.NewMuxer(context.Background(), ctx.Writer) - - err := mux.AddElementaryStream(astits.PMTElementaryStream{ - ElementaryPID: 256, - StreamType: astits.StreamTypeH264Video, - }) - require.NoError(t, err) - - err = mux.AddElementaryStream(astits.PMTElementaryStream{ - ElementaryPID: 257, - StreamType: astits.StreamTypeAACAudio, - }) - require.NoError(t, err) - - mux.SetPCRPID(256) - - _, err = mux.WriteTables() - require.NoError(t, err) - - enc, _ := h264.AnnexBMarshal([][]byte{ - {7, 1, 2, 3}, // SPS - {8}, // PPS - {5}, // IDR - }) - - _, err = mux.WriteData(&astits.MuxerData{ - PID: 256, - PES: &astits.PESData{ - Header: &astits.PESHeader{ - OptionalHeader: &astits.PESOptionalHeader{ - MarkerBits: 2, - PTSDTSIndicator: astits.PTSDTSIndicatorBothPresent, - PTS: &astits.ClockReference{Base: 90000}, // +1 sec - DTS: &astits.ClockReference{Base: 0x1FFFFFFFF - 90000 + 1}, // -1 sec - }, - StreamID: 224, // = video - }, - Data: enc, - }, - }) - require.NoError(t, err) +func TestClient(t *testing.T) { + for _, mode := range []string{"plain", "tls"} { + for _, format := range []string{"mpegts", "fmp4"} { + t.Run(mode+"_"+format, func(t *testing.T) { + gin.SetMode(gin.ReleaseMode) + router := gin.New() + + if format == "mpegts" { + router.GET("/stream.m3u8", func(ctx *gin.Context) { + ctx.Writer.Header().Set("Content-Type", `application/vnd.apple.mpegurl`) + ctx.Writer.Write([]byte("#EXTM3U\n" + + "#EXT-X-VERSION:3\n" + + "#EXT-X-ALLOW-CACHE:NO\n" + + "#EXT-X-TARGETDURATION:2\n" + + "#EXT-X-MEDIA-SEQUENCE:0\n" + + "#EXT-X-PROGRAM-DATE-TIME:2015-02-05T01:02:02Z\n" + + "#EXTINF:2,\n" + + "segment1.ts?key=val\n" + + "#EXT-X-ENDLIST\n")) + }) - pkts := mpeg4audio.ADTSPackets{ - { - Type: 2, - SampleRate: 44100, - ChannelCount: 2, - AU: []byte{1, 2, 3, 4}, - }, - } - enc, err = pkts.Marshal() - require.NoError(t, err) + router.GET("/segment1.ts", func(ctx *gin.Context) { + require.Equal(t, "val", ctx.Query("key")) + ctx.Writer.Header().Set("Content-Type", `video/MP2T`) - _, err = mux.WriteData(&astits.MuxerData{ - PID: 257, - PES: &astits.PESData{ - Header: &astits.PESHeader{ - OptionalHeader: &astits.PESOptionalHeader{ - MarkerBits: 2, - PTSDTSIndicator: astits.PTSDTSIndicatorOnlyPTS, - PTS: &astits.ClockReference{Base: 0x1FFFFFFFF - 90000 + 1}, - }, - StreamID: 192, // = audio - }, - Data: enc, - }, - }) - require.NoError(t, err) - }) - - ln, err := net.Listen("tcp", "localhost:5780") - require.NoError(t, err) - - s := &http.Server{Handler: router} - - if ca == "tls" { - go func() { - serverCertFpath, err := writeTempFile(serverCert) - if err != nil { - panic(err) - } - defer os.Remove(serverCertFpath) - - serverKeyFpath, err := writeTempFile(serverKey) - if err != nil { - panic(err) - } - defer os.Remove(serverKeyFpath) - - s.ServeTLS(ln, serverCertFpath, serverKeyFpath) - }() - } else { - go s.Serve(ln) - } - - defer s.Shutdown(context.Background()) - - packetRecv := make(chan struct{}, 2) - - prefix := "http" - if ca == "tls" { - prefix = "https" - } - - var c *Client - c = &Client{ - URI: prefix + "://localhost:5780/stream.m3u8", - HTTPClient: &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, - }, - }, - }, - OnTracks: func(tracks []*Track) error { - require.Equal(t, []*Track{ - { - Codec: &codecs.H264{}, - }, - { - Codec: &codecs.MPEG4Audio{ + h264Track := &mpegts.Track{ + Codec: &mpegts.CodecH264{}, + } + mpeg4audioTrack := &mpegts.Track{ + Codec: &mpegts.CodecMPEG4Audio{ Config: mpeg4audio.AudioSpecificConfig{ Type: 2, SampleRate: 44100, ChannelCount: 2, }, }, - }, - }, tracks) - - c.OnDataH26x(tracks[0], func(pts time.Duration, dts time.Duration, au [][]byte) { - require.Equal(t, 2*time.Second, pts) - require.Equal(t, time.Duration(0), dts) - require.Equal(t, [][]byte{ - {7, 1, 2, 3}, - {8}, - {5}, - }, au) - packetRecv <- struct{}{} + } + mw := mpegts.NewWriter(ctx.Writer, []*mpegts.Track{h264Track, mpeg4audioTrack}) + + err := mw.WriteH26x( + h264Track, + 90000, // +1 sec + 8589844592, // -1 sec + true, + [][]byte{ + {7, 1, 2, 3}, // SPS + {8}, // PPS + {5}, // IDR + }, + ) + require.NoError(t, err) + + err = mw.WriteH26x( + h264Track, + 90000+90000/30, + 8589844592+90000/30, + false, + [][]byte{ + {1, 4, 5, 6}, + }, + ) + require.NoError(t, err) + + err = mw.WriteMPEG4Audio( + mpeg4audioTrack, + 8589844592, + [][]byte{{1, 2, 3, 4}}, + ) + require.NoError(t, err) + + err = mw.WriteMPEG4Audio( + mpeg4audioTrack, + 8589844592+90000/30, + [][]byte{{5, 6, 7, 8}}, + ) + require.NoError(t, err) + }) + } else { + router.GET("/stream.m3u8", func(ctx *gin.Context) { + ctx.Writer.Header().Set("Content-Type", `application/vnd.apple.mpegurl`) + ctx.Writer.Write([]byte("#EXTM3U\n" + + "#EXT-X-VERSION:7\n" + + "#EXT-X-MEDIA-SEQUENCE:20\n" + + "#EXT-X-INDEPENDENT-SEGMENTS\n" + + "#EXT-X-TARGETDURATION:2\n" + + "#EXT-X-MAP:URI=\"init.mp4?key=val\"\n" + + "#EXT-X-PROGRAM-DATE-TIME:2015-02-05T01:02:02Z\n" + + "#EXTINF:2,\n" + + "segment1.mp4?key=val\n" + + "#EXT-X-ENDLIST\n")) }) - c.OnDataMPEG4Audio(tracks[1], func(pts time.Duration, aus [][]byte) { - require.Equal(t, 0*time.Second, pts) - require.Equal(t, [][]byte{ - {1, 2, 3, 4}, - }, aus) - packetRecv <- struct{}{} + router.GET("/init.mp4", func(ctx *gin.Context) { + require.Equal(t, "val", ctx.Query("key")) + ctx.Writer.Header().Set("Content-Type", `video/mp4`) + err := mp4ToWriter(&fmp4.Init{ + Tracks: []*fmp4.InitTrack{ + { + ID: 99, + TimeScale: 90000, + Codec: &fmp4.CodecH264{ + SPS: testSPS, + PPS: testPPS, + }, + }, + { + ID: 98, + TimeScale: 44100, + Codec: &fmp4.CodecMPEG4Audio{ + Config: testConfig, + }, + }, + }, + }, ctx.Writer) + require.NoError(t, err) }) - return nil - }, - } + router.GET("/segment1.mp4", func(ctx *gin.Context) { + require.Equal(t, "val", ctx.Query("key")) + ctx.Writer.Header().Set("Content-Type", `video/mp4`) + + err := mp4ToWriter(&fmp4.Part{ + Tracks: []*fmp4.PartTrack{ + { + ID: 98, + BaseTime: 44100 * 6, + Samples: []*fmp4.PartSample{ + { + Duration: 44100 / 30, + Payload: []byte{1, 2, 3, 4}, + }, + { + Duration: 44100 / 30, + Payload: []byte{5, 6, 7, 8}, + }, + }, + }, + { + ID: 99, + BaseTime: 90000 * 6, + Samples: []*fmp4.PartSample{ + { + Duration: 90000 / 30, + PTSOffset: 90000 * 2, + Payload: mustMarshalAVCC([][]byte{ + {7, 1, 2, 3}, // SPS + {8}, // PPS + {5}, // IDR + }), + }, + { + Duration: 90000 / 30, + PTSOffset: 90000 * 2, + Payload: mustMarshalAVCC([][]byte{ + {1, 4, 5, 6}, + }), + }, + }, + }, + }, + }, ctx.Writer) + require.NoError(t, err) + }) + } - err = c.Start() - require.NoError(t, err) + ln, err := net.Listen("tcp", "localhost:5780") + require.NoError(t, err) - for i := 0; i < 2; i++ { - <-packetRecv - } + s := &http.Server{Handler: router} + + if mode == "tls" { + go func() { + serverCertFpath, err := writeTempFile(serverCert) + if err != nil { + panic(err) + } + defer os.Remove(serverCertFpath) + + serverKeyFpath, err := writeTempFile(serverKey) + if err != nil { + panic(err) + } + defer os.Remove(serverKeyFpath) + + s.ServeTLS(ln, serverCertFpath, serverKeyFpath) + }() + } else { + go s.Serve(ln) + } - c.Close() - <-c.Wait() - }) - } -} + defer s.Shutdown(context.Background()) -func TestClientFMP4(t *testing.T) { - gin.SetMode(gin.ReleaseMode) - router := gin.New() + videoRecv := make(chan struct{}) + audioRecv := make(chan struct{}) + videoCount := 0 + audioCount := 0 - router.GET("/stream.m3u8", func(ctx *gin.Context) { - ctx.Writer.Header().Set("Content-Type", `application/vnd.apple.mpegurl`) - io.Copy(ctx.Writer, bytes.NewReader([]byte(`#EXTM3U -#EXT-X-VERSION:7 -#EXT-X-MEDIA-SEQUENCE:20 -#EXT-X-INDEPENDENT-SEGMENTS -#EXT-X-TARGETDURATION:2 -#EXT-X-MAP:URI="init.mp4?key=val" -#EXTINF:2, -segment.mp4?key=val -#EXT-X-ENDLIST -`))) - }) + prefix := "http" + if mode == "tls" { + prefix = "https" + } - router.GET("/init.mp4", func(ctx *gin.Context) { - require.Equal(t, "val", ctx.Query("key")) - ctx.Writer.Header().Set("Content-Type", `video/mp4`) - err := mp4ToWriter(&fmp4.Init{ - Tracks: []*fmp4.InitTrack{ - { - ID: 99, - TimeScale: 90000, - Codec: &fmp4.CodecH264{ - SPS: testSPS, - PPS: testPPS, + var c *Client + c = &Client{ + URI: prefix + "://localhost:5780/stream.m3u8", + HTTPClient: &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + }, }, - }, - { - ID: 98, - TimeScale: 44100, - Codec: &fmp4.CodecMPEG4Audio{ - Config: testConfig, + OnTracks: func(tracks []*Track) error { + var sps []byte + var pps []byte + if format == "fmp4" { + sps = testSPS + pps = testPPS + } + + require.Equal(t, []*Track{ + { + Codec: &codecs.H264{ + SPS: sps, + PPS: pps, + }, + }, + { + Codec: &codecs.MPEG4Audio{ + Config: mpeg4audio.AudioSpecificConfig{ + Type: 2, + SampleRate: 44100, + ChannelCount: 2, + }, + }, + }, + }, tracks) + + c.OnDataH26x(tracks[0], func(pts time.Duration, dts time.Duration, au [][]byte) { + if videoCount == 0 { + require.Equal(t, time.Duration(0), dts) + require.Equal(t, 2*time.Second, pts) + require.Equal(t, [][]byte{ + {7, 1, 2, 3}, + {8}, + {5}, + }, au) + ntp, ok := c.NTP(tracks[0], dts) + require.Equal(t, true, ok) + require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 2, 0, time.UTC), ntp) + } else { + require.Equal(t, 33333333*time.Nanosecond, dts) + require.Equal(t, 2*time.Second+33333333*time.Nanosecond, pts) + require.Equal(t, [][]byte{{1, 4, 5, 6}}, au) + ntp, ok := c.NTP(tracks[0], dts) + require.Equal(t, true, ok) + require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 2, 33333333, time.UTC), ntp) + } + videoRecv <- struct{}{} + videoCount++ + }) + + c.OnDataMPEG4Audio(tracks[1], func(pts time.Duration, aus [][]byte) { + if audioCount == 0 { + require.Equal(t, 0*time.Second, pts) + require.Equal(t, [][]byte{{1, 2, 3, 4}}, aus) + ntp, ok := c.NTP(tracks[1], pts) + require.Equal(t, true, ok) + require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 2, 0, time.UTC), ntp) + } else { + require.Equal(t, 33333333*time.Nanosecond, pts) + require.Equal(t, [][]byte{{5, 6, 7, 8}}, aus) + ntp, ok := c.NTP(tracks[1], pts) + require.Equal(t, true, ok) + require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 2, 33333333, time.UTC), ntp) + } + audioRecv <- struct{}{} + audioCount++ + }) + + return nil }, - }, - }, - }, ctx.Writer) - require.NoError(t, err) - }) - - router.GET("/segment.mp4", func(ctx *gin.Context) { - require.Equal(t, "val", ctx.Query("key")) - ctx.Writer.Header().Set("Content-Type", `video/mp4`) - - payload, _ := h264.AVCCMarshal([][]byte{ - {7, 1, 2, 3}, // SPS - {8}, // PPS - {5}, // IDR - }) - - err := mp4ToWriter(&fmp4.Part{ - Tracks: []*fmp4.PartTrack{ - { - ID: 98, - BaseTime: 44100 * 6, - Samples: []*fmp4.PartSample{{ - Duration: 44100 / 30, - Payload: []byte{1, 2, 3, 4}, - }}, - }, - { - ID: 99, - BaseTime: 90000 * 6, - Samples: []*fmp4.PartSample{{ - Duration: 90000 / 30, - PTSOffset: 90000 * 2, - Payload: payload, - }}, - }, - }, - }, ctx.Writer) - require.NoError(t, err) - }) - - ln, err := net.Listen("tcp", "localhost:5780") - require.NoError(t, err) - - s := &http.Server{Handler: router} - go s.Serve(ln) - defer s.Shutdown(context.Background()) - - packetRecv := make(chan struct{}, 2) + } - var c *Client - c = &Client{ - URI: "http://localhost:5780/stream.m3u8", - OnTracks: func(tracks []*Track) error { - require.Equal(t, []*Track{ - { - Codec: &codecs.H264{ - SPS: testSPS, - PPS: testPPS, - }, - }, - { - Codec: &codecs.MPEG4Audio{ - Config: testConfig, - }, - }, - }, tracks) + err = c.Start() + require.NoError(t, err) - c.OnDataH26x(tracks[0], func(pts time.Duration, dts time.Duration, au [][]byte) { - require.Equal(t, 2*time.Second, pts) - require.Equal(t, time.Duration(0), dts) - require.Equal(t, [][]byte{ - {7, 1, 2, 3}, - {8}, - {5}, - }, au) - packetRecv <- struct{}{} - }) + <-videoRecv + <-videoRecv + <-audioRecv + <-audioRecv - c.OnDataMPEG4Audio(tracks[1], func(pts time.Duration, aus [][]byte) { - require.Equal(t, 0*time.Second, pts) - require.Equal(t, [][]byte{ - {1, 2, 3, 4}, - }, aus) - packetRecv <- struct{}{} + c.Close() + <-c.Wait() }) - - return nil - }, - } - - err = c.Start() - require.NoError(t, err) - - for i := 0; i < 2; i++ { - <-packetRecv + } } - - c.Close() - <-c.Wait() } func TestClientFMP4MultiRenditions(t *testing.T) { @@ -485,39 +416,37 @@ func TestClientFMP4MultiRenditions(t *testing.T) { router.GET("/index.m3u8", func(ctx *gin.Context) { ctx.Writer.Header().Set("Content-Type", `application/vnd.apple.mpegurl`) - io.Copy(ctx.Writer, bytes.NewReader([]byte(`#EXTM3U -#EXT-X-MEDIA:TYPE=AUDIO,GROUP-ID="aac",NAME="English",DEFAULT=YES,AUTOSELECT=YES,LANGUAGE="en",URI="audio.m3u8" -#EXT-X-STREAM-INF:BANDWIDTH=7680000,CODECS="avc1.640015,mp4a.40.5",AUDIO="aac" -video.m3u8 -`))) + ctx.Writer.Write([]byte("#EXTM3U\n" + + "#EXT-X-MEDIA:TYPE=AUDIO,GROUP-ID=\"aac\",NAME=\"English\"," + + "DEFAULT=YES,AUTOSELECT=YES,LANGUAGE=\"en\",URI=\"audio.m3u8\"\n" + + "#EXT-X-STREAM-INF:BANDWIDTH=7680000,CODECS=\"avc1.640015,mp4a.40.5\",AUDIO=\"aac\"\n" + + "video.m3u8\n")) }) router.GET("/video.m3u8", func(ctx *gin.Context) { ctx.Writer.Header().Set("Content-Type", `application/vnd.apple.mpegurl`) - io.Copy(ctx.Writer, bytes.NewReader([]byte(`#EXTM3U -#EXT-X-VERSION:7 -#EXT-X-MEDIA-SEQUENCE:20 -#EXT-X-INDEPENDENT-SEGMENTS -#EXT-X-TARGETDURATION:2 -#EXT-X-MAP:URI="init_video.mp4" -#EXTINF:2, -segment_video.mp4 -#EXT-X-ENDLIST -`))) + ctx.Writer.Write([]byte("#EXTM3U\n" + + "#EXT-X-VERSION:7\n" + + "#EXT-X-MEDIA-SEQUENCE:20\n" + + "#EXT-X-INDEPENDENT-SEGMENTS\n" + + "#EXT-X-TARGETDURATION:2\n" + + "#EXT-X-MAP:URI=\"init_video.mp4\"\n" + + "#EXTINF:2,\n" + + "segment_video.mp4\n" + + "#EXT-X-ENDLIST\n")) }) router.GET("/audio.m3u8", func(ctx *gin.Context) { ctx.Writer.Header().Set("Content-Type", `application/vnd.apple.mpegurl`) - io.Copy(ctx.Writer, bytes.NewReader([]byte(`#EXTM3U -#EXT-X-VERSION:7 -#EXT-X-MEDIA-SEQUENCE:20 -#EXT-X-INDEPENDENT-SEGMENTS -#EXT-X-TARGETDURATION:2 -#EXT-X-MAP:URI="init_audio.mp4" -#EXTINF:2, -segment_audio.mp4 -#EXT-X-ENDLIST -`))) + ctx.Writer.Write([]byte("#EXTM3U\n" + + "#EXT-X-VERSION:7\n" + + "#EXT-X-MEDIA-SEQUENCE:20\n" + + "#EXT-X-INDEPENDENT-SEGMENTS\n" + + "#EXT-X-TARGETDURATION:2\n" + + "#EXT-X-MAP:URI=\"init_audio.mp4\"\n" + + "#EXTINF:2,\n" + + "segment_audio.mp4\n" + + "#EXT-X-ENDLIST")) }) router.GET("/init_video.mp4", func(ctx *gin.Context) { @@ -558,12 +487,6 @@ segment_audio.mp4 router.GET("/segment_video.mp4", func(ctx *gin.Context) { ctx.Writer.Header().Set("Content-Type", `video/mp4`) - payload, _ := h264.AVCCMarshal([][]byte{ - {7, 1, 2, 3}, // SPS - {8}, // PPS - {5}, // IDR - }) - err := mp4ToWriter(&fmp4.Part{ Tracks: []*fmp4.PartTrack{ { @@ -571,7 +494,11 @@ segment_audio.mp4 Samples: []*fmp4.PartSample{{ Duration: 90000, PTSOffset: 90000 * 3, - Payload: payload, + Payload: mustMarshalAVCC([][]byte{ + {7, 1, 2, 3}, // SPS + {8}, // PPS + {5}, // IDR + }), }}, }, }, @@ -662,46 +589,59 @@ segment_audio.mp4 func TestClientErrorInvalidSequenceID(t *testing.T) { router := gin.New() - firstPlaylist := true + first := true router.GET("/stream.m3u8", func(ctx *gin.Context) { ctx.Writer.Header().Set("Content-Type", `application/vnd.apple.mpegurl`) - if firstPlaylist { - firstPlaylist = false - io.Copy(ctx.Writer, bytes.NewReader([]byte( - `#EXTM3U -#EXT-X-VERSION:3 -#EXT-X-ALLOW-CACHE:NO -#EXT-X-TARGETDURATION:2 -#EXT-X-MEDIA-SEQUENCE:2 -#EXTINF:2, -segment1.ts -#EXTINF:2, -segment1.ts -#EXTINF:2, -segment1.ts -`))) + if first { + first = false + ctx.Writer.Write([]byte("#EXTM3U\n" + + "#EXT-X-VERSION:3\n" + + "#EXT-X-ALLOW-CACHE:NO\n" + + "#EXT-X-TARGETDURATION:2\n" + + "#EXT-X-MEDIA-SEQUENCE:2\n" + + "#EXTINF:2,\n" + + "segment1.ts\n" + + "#EXTINF:2,\n" + + "segment1.ts\n" + + "#EXTINF:2,\n" + + "segment1.ts\n")) } else { - io.Copy(ctx.Writer, bytes.NewReader([]byte( - `#EXTM3U -#EXT-X-VERSION:3 -#EXT-X-ALLOW-CACHE:NO -#EXT-X-TARGETDURATION:2 -#EXT-X-MEDIA-SEQUENCE:4 -#EXTINF:2, -segment1.ts -#EXTINF:2, -segment1.ts -#EXTINF:2, -segment1.ts -`))) + ctx.Writer.Write([]byte("#EXTM3U\n" + + "#EXT-X-VERSION:3\n" + + "#EXT-X-ALLOW-CACHE:NO\n" + + "#EXT-X-TARGETDURATION:2\n" + + "#EXT-X-MEDIA-SEQUENCE:4\n" + + "#EXTINF:2,\n" + + "segment1.ts\n" + + "#EXTINF:2,\n" + + "segment1.ts\n" + + "#EXTINF:2,\n" + + "segment1.ts\n")) } }) router.GET("/segment1.ts", func(ctx *gin.Context) { ctx.Writer.Header().Set("Content-Type", `video/MP2T`) - mpegtsSegment(t, ctx.Writer) + + h264Track := &mpegts.Track{ + Codec: &mpegts.CodecH264{}, + } + mw := mpegts.NewWriter(ctx.Writer, []*mpegts.Track{h264Track}) + + err := mw.WriteH26x( + h264Track, + 90000, // +1 sec + 0x1FFFFFFFF-90000+1, // -1 sec + true, + [][]byte{ + {7, 1, 2, 3}, // SPS + {8}, // PPS + {5}, // IDR + }, + ) + require.NoError(t, err) }) ln, err := net.Listen("tcp", "localhost:5780") diff --git a/client_timesync_fmp4.go b/client_timesync_fmp4.go index 670ab33..ac86866 100644 --- a/client_timesync_fmp4.go +++ b/client_timesync_fmp4.go @@ -21,8 +21,8 @@ func durationMp4ToGo(v uint64, timeScale uint32) time.Duration { } type clientTimeSyncFMP4 struct { - timeScale uint32 - baseTime uint64 + leadingTimeScale uint32 + initialBaseTime uint64 startRTC time.Time startDTS time.Duration @@ -30,7 +30,11 @@ type clientTimeSyncFMP4 struct { func (ts *clientTimeSyncFMP4) initialize() { ts.startRTC = time.Now() - ts.startDTS = durationMp4ToGo(ts.baseTime, ts.timeScale) + ts.startDTS = durationMp4ToGo(ts.initialBaseTime, ts.leadingTimeScale) +} + +func (ts *clientTimeSyncFMP4) convert(v uint64, timeScale uint32) time.Duration { + return durationMp4ToGo(v, timeScale) - ts.startDTS } func (ts *clientTimeSyncFMP4) convertAndSync( @@ -39,11 +43,8 @@ func (ts *clientTimeSyncFMP4) convertAndSync( rawDTS uint64, ptsOffset int32, ) (time.Duration, time.Duration, error) { - pts := durationMp4ToGo(rawDTS+uint64(ptsOffset), timeScale) - dts := durationMp4ToGo(rawDTS, timeScale) - - pts -= ts.startDTS - dts -= ts.startDTS + pts := ts.convert(rawDTS+uint64(ptsOffset), timeScale) + dts := ts.convert(rawDTS, timeScale) elapsed := time.Since(ts.startRTC) if dts > elapsed { diff --git a/client_timesync_mpegts.go b/client_timesync_mpegts.go index f0f6b27..ab0841e 100644 --- a/client_timesync_mpegts.go +++ b/client_timesync_mpegts.go @@ -22,12 +22,18 @@ func (ts *clientTimeSyncMPEGTS) initialize() { ts.td = mpegts.NewTimeDecoder(ts.startDTS) } -func (ts *clientTimeSyncMPEGTS) convertAndSync(ctx context.Context, - rawPTS int64, rawDTS int64, +func (ts *clientTimeSyncMPEGTS) convert(v int64) time.Duration { + return ts.td.Decode(v) +} + +func (ts *clientTimeSyncMPEGTS) convertAndSync( + ctx context.Context, + rawPTS int64, + rawDTS int64, ) (time.Duration, time.Duration, error) { ts.mutex.Lock() - pts := ts.td.Decode(rawPTS) - dts := ts.td.Decode(rawDTS) + pts := ts.convert(rawPTS) + dts := ts.convert(rawDTS) ts.mutex.Unlock() elapsed := time.Since(ts.startRTC) diff --git a/client_track_processor_fmp4.go b/client_track_processor_fmp4.go index 1cc6e85..2596556 100644 --- a/client_track_processor_fmp4.go +++ b/client_track_processor_fmp4.go @@ -90,7 +90,7 @@ func (t *clientTrackProcessorFMP4) initialize() error { } } - t.queue = make(chan *fmp4.PartTrack, clientFMP4PartTrackQueueSize) + t.queue = make(chan *fmp4.PartTrack) return nil } diff --git a/client_track_processor_mpegts.go b/client_track_processor_mpegts.go index 102fd61..166fa17 100644 --- a/client_track_processor_mpegts.go +++ b/client_track_processor_mpegts.go @@ -15,9 +15,10 @@ type mpegtsSample struct { } type clientTrackProcessorMPEGTS struct { - track *Track - onData interface{} - timeSync *clientTimeSyncMPEGTS + track *Track + onData interface{} + timeSync *clientTimeSyncMPEGTS + onPartProcessorDone func(ctx context.Context) postProcess func(pts time.Duration, dts time.Duration, data [][]byte) @@ -66,6 +67,11 @@ func (t *clientTrackProcessorMPEGTS) run(ctx context.Context) error { } func (t *clientTrackProcessorMPEGTS) process(ctx context.Context, sample *mpegtsSample) error { + if sample == nil { + t.onPartProcessorDone(ctx) + return nil + } + pts, dts, err := t.timeSync.convertAndSync(ctx, sample.pts, sample.dts) if err != nil { return err diff --git a/examples/client-ntp-timestamp/main.go b/examples/client-ntp-timestamp/main.go new file mode 100644 index 0000000..9153a47 --- /dev/null +++ b/examples/client-ntp-timestamp/main.go @@ -0,0 +1,68 @@ +package main + +import ( + "log" + "time" + + "github.com/bluenviron/gohlslib" + "github.com/bluenviron/gohlslib/pkg/codecs" +) + +// This example shows how to +// 1. read a HLS stream +// 2. get NTP timestamp (absolute timestamp) of incoming data + +func main() { + // setup client + var c *gohlslib.Client + c = &gohlslib.Client{ + URI: "https://myserver/mystream/index.m3u8", + + // set a callback that is called when tracks are parsed + OnTracks: func(tracks []*gohlslib.Track) error { + for _, track := range tracks { + ttrack := track + + log.Printf("detected track with codec %T\n", track.Codec) + + // set a callback that is called when data is received + switch track.Codec.(type) { + case *codecs.AV1: + c.OnDataAV1(track, func(pts time.Duration, tu [][]byte) { + ntp, ntpAvailable := c.NTP(ttrack, pts) + log.Printf("received data from track %T, pts = %v, ntp available = %v, ntp = %v\n", ttrack, pts, ntpAvailable, ntp) + }) + + case *codecs.H264, *codecs.H265: + c.OnDataH26x(track, func(pts time.Duration, dts time.Duration, au [][]byte) { + ntp, ntpAvailable := c.NTP(ttrack, pts) + log.Printf("received data from track %T, pts = %v, ntp available = %v, ntp = %v\n", ttrack, pts, ntpAvailable, ntp) + }) + + case *codecs.MPEG4Audio: + c.OnDataMPEG4Audio(track, func(pts time.Duration, aus [][]byte) { + ntp, ntpAvailable := c.NTP(ttrack, pts) + log.Printf("received data from track %T, pts = %v, ntp available = %v, ntp = %v\n", ttrack, pts, ntpAvailable, ntp) + }) + + case *codecs.Opus: + c.OnDataOpus(track, func(pts time.Duration, packets [][]byte) { + ntp, ntpAvailable := c.NTP(ttrack, pts) + log.Printf("received data from track %T, pts = %v, ntp available = %v, ntp = %v\n", ttrack, pts, ntpAvailable, ntp) + }) + } + } + return nil + }, + } + + // start reading + err := c.Start() + if err != nil { + panic(err) + } + defer c.Close() + + // wait for a fatal error + panic(<-c.Wait()) +} diff --git a/examples/client/main.go b/examples/client/main.go index 0dca577..cf5600b 100644 --- a/examples/client/main.go +++ b/examples/client/main.go @@ -27,22 +27,22 @@ func main() { switch track.Codec.(type) { case *codecs.AV1: c.OnDataAV1(track, func(pts time.Duration, tu [][]byte) { - log.Printf("received data from track %T, pts = %v", ttrack, pts) + log.Printf("received data from track %T, pts = %v\n", ttrack, pts) }) case *codecs.H264, *codecs.H265: c.OnDataH26x(track, func(pts time.Duration, dts time.Duration, au [][]byte) { - log.Printf("received data from track %T, pts = %v", ttrack, pts) + log.Printf("received data from track %T, pts = %v\n", ttrack, pts) }) case *codecs.MPEG4Audio: c.OnDataMPEG4Audio(track, func(pts time.Duration, aus [][]byte) { - log.Printf("received data from track %T, pts = %v", ttrack, pts) + log.Printf("received data from track %T, pts = %v\n", ttrack, pts) }) case *codecs.Opus: c.OnDataOpus(track, func(pts time.Duration, packets [][]byte) { - log.Printf("received data from track %T, pts = %v", ttrack, pts) + log.Printf("received data from track %T, pts = %v\n", ttrack, pts) }) } } diff --git a/examples/muxer/main.go b/examples/muxer/main.go index 4697f5e..d6c5da1 100644 --- a/examples/muxer/main.go +++ b/examples/muxer/main.go @@ -15,7 +15,7 @@ import ( // This example shows how to: // 1. generate a MPEG-TS/H264 stream with GStreamer -// 2. re-encode the stream into HLS and serve it with an HTTP server. +// 2. re-encode the stream into HLS and serve it with an HTTP server //go:embed index.html var index []byte diff --git a/examples/playlist-parser/main.go b/examples/playlist-parser/main.go index 78e0b99..b2f67ac 100644 --- a/examples/playlist-parser/main.go +++ b/examples/playlist-parser/main.go @@ -11,7 +11,7 @@ import ( // This example shows how to download and parse a HLS playlist. func main() { - // connect to the HTTP server of the playlist + // connect to the HTTP server that provides the playlist req, err := http.Get("http://amssamples.streaming.mediaservices.windows.net/91492735-c523-432b-ba01-faba6c2206a2/AzureMediaServicesPromo.ism/manifest(format=m3u8-aapl)") if err != nil { panic(err)