Skip to content

Commit

Permalink
client: support extracting absolute timestamp of incoming data
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Jan 1, 2024
1 parent 9eb4ef3 commit d74a3bd
Show file tree
Hide file tree
Showing 17 changed files with 675 additions and 538 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ Features:

* Client

* Read MPEG-TS or fMP4 streams
* Read streams in MPEG-TS or fMP4 format
* Read tracks encoded with AV1, VP9, H265, H264, Opus, MPEG-4 Audio (AAC)
* Get NTP timestamp (absolute timestamp) of incoming data

* Muxer

* Generate MPEG-TS, fMP4, Low-latency streams
* Generate streams in MPEG-TS, fMP4 or Low-latency format
* Write tracks encoded with AV1, VP9, H265, H264, Opus, MPEG-4 audio (AAC)
* Save generated segments on disk

Expand All @@ -39,6 +40,7 @@ Features:

* [playlist-parser](examples/playlist-parser/main.go)
* [client](examples/client/main.go)
* [client-ntp-timestamp](examples/client-ntp-timestamp/main.go)
* [muxer](examples/muxer/main.go)

## API Documentation
Expand Down
51 changes: 28 additions & 23 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@ import (
)

const (
clientMPEGTSSampleQueueSize = 100
clientFMP4PartTrackQueueSize = 10
clientFMP4MaxPartTracksPerSegment = 200
clientLiveInitialDistance = 3
clientLiveMaxDistanceFromEnd = 5
clientMaxDTSRTCDiff = 10 * time.Second
clientMaxTracksPerStream = 10
clientMPEGTSSampleQueueSize = 100
clientLiveInitialDistance = 3
clientLiveMaxDistanceFromEnd = 5
clientMaxDTSRTCDiff = 10 * time.Second
)

// ClientOnDownloadPrimaryPlaylistFunc is the prototype of Client.OnDownloadPrimaryPlaylist.
Expand Down Expand Up @@ -92,10 +91,11 @@ type Client struct {
// private
//

ctx context.Context
ctxCancel func()
onData map[*Track]interface{}
playlistURL *url.URL
ctx context.Context
ctxCancel func()
onData map[*Track]interface{}
playlistURL *url.URL
primaryDownloader *clientPrimaryDownloader

// out
outErr chan error
Expand Down Expand Up @@ -159,28 +159,33 @@ func (c *Client) Wait() chan error {
}

// OnDataAV1 sets a callback that is called when data from an AV1 track is received.
func (c *Client) OnDataAV1(forma *Track, cb ClientOnDataAV1Func) {
c.onData[forma] = cb
func (c *Client) OnDataAV1(track *Track, cb ClientOnDataAV1Func) {
c.onData[track] = cb

Check warning on line 163 in client.go

View check run for this annotation

Codecov / codecov/patch

client.go#L162-L163

Added lines #L162 - L163 were not covered by tests
}

// OnDataVP9 sets a callback that is called when data from a VP9 track is received.
func (c *Client) OnDataVP9(forma *Track, cb ClientOnDataVP9Func) {
c.onData[forma] = cb
func (c *Client) OnDataVP9(track *Track, cb ClientOnDataVP9Func) {
c.onData[track] = cb

Check warning on line 168 in client.go

View check run for this annotation

Codecov / codecov/patch

client.go#L167-L168

Added lines #L167 - L168 were not covered by tests
}

// OnDataH26x sets a callback that is called when data from an H26x track is received.
func (c *Client) OnDataH26x(forma *Track, cb ClientOnDataH26xFunc) {
c.onData[forma] = cb
func (c *Client) OnDataH26x(track *Track, cb ClientOnDataH26xFunc) {
c.onData[track] = cb
}

// OnDataMPEG4Audio sets a callback that is called when data from a MPEG-4 Audio track is received.
func (c *Client) OnDataMPEG4Audio(forma *Track, cb ClientOnDataMPEG4AudioFunc) {
c.onData[forma] = cb
func (c *Client) OnDataMPEG4Audio(track *Track, cb ClientOnDataMPEG4AudioFunc) {
c.onData[track] = cb
}

// OnDataOpus sets a callback that is called when data from an Opus track is received.
func (c *Client) OnDataOpus(forma *Track, cb ClientOnDataOpusFunc) {
c.onData[forma] = cb
func (c *Client) OnDataOpus(track *Track, cb ClientOnDataOpusFunc) {
c.onData[track] = cb

Check warning on line 183 in client.go

View check run for this annotation

Codecov / codecov/patch

client.go#L182-L183

Added lines #L182 - L183 were not covered by tests
}

// NTP returns the NTP timestamp (absolute timestamp) of a packet with given track and DTS.
func (c *Client) NTP(track *Track, dts time.Duration) (time.Time, bool) {
return c.primaryDownloader.ntp(track, dts)
}

func (c *Client) run() {
Expand All @@ -191,7 +196,7 @@ func (c *Client) runInner() error {
rp := &clientRoutinePool{}
rp.initialize()

dl := &clientPrimaryDownloader{
c.primaryDownloader = &clientPrimaryDownloader{
primaryPlaylistURL: c.playlistURL,
httpClient: c.HTTPClient,
onDownloadPrimaryPlaylist: c.OnDownloadPrimaryPlaylist,
Expand All @@ -202,8 +207,8 @@ func (c *Client) runInner() error {
onTracks: c.OnTracks,
onData: c.onData,
}
dl.initialize()
rp.add(dl)
c.primaryDownloader.initialize()
rp.add(c.primaryDownloader)

select {
case err := <-rp.errorChan():
Expand Down
14 changes: 13 additions & 1 deletion client_primary_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"net/url"
"strings"
"time"

"github.com/bluenviron/gohlslib/pkg/playlist"
)
Expand Down Expand Up @@ -107,7 +108,8 @@ type clientPrimaryDownloader struct {
onTracks ClientOnTracksFunc
onData map[*Track]interface{}

leadingTimeSync clientTimeSync
streamProcByTrack map[*Track]clientStreamProcessor
leadingTimeSync clientTimeSync

// in
chStreamTracks chan clientStreamProcessor
Expand All @@ -118,6 +120,7 @@ type clientPrimaryDownloader struct {
}

func (d *clientPrimaryDownloader) initialize() {
d.streamProcByTrack = make(map[*Track]clientStreamProcessor)
d.chStreamTracks = make(chan clientStreamProcessor)
d.startStreaming = make(chan struct{})
d.leadingTimeSyncReady = make(chan struct{})
Expand Down Expand Up @@ -227,6 +230,11 @@ func (d *clientPrimaryDownloader) run(ctx context.Context) error {
} else {
tracks = append(tracks, streamProc.getTracks()...)
}

for _, track := range streamProc.getTracks() {
d.streamProcByTrack[track] = streamProc
}

case <-ctx.Done():
return fmt.Errorf("terminated")
}
Expand Down Expand Up @@ -275,3 +283,7 @@ func (d *clientPrimaryDownloader) onGetLeadingTimeSync(ctx context.Context) (cli
}
return d.leadingTimeSync, true
}

func (d *clientPrimaryDownloader) ntp(track *Track, dts time.Duration) (time.Time, bool) {
return d.streamProcByTrack[track].ntp(dts)
}
14 changes: 10 additions & 4 deletions client_segment_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@ package gohlslib
import (
"context"
"sync"
"time"
)

type segmentData struct {
dateTime *time.Time
payload []byte
}

type clientSegmentQueue struct {
mutex sync.Mutex
queue [][]byte
queue []*segmentData
didPush chan struct{}
didPull chan struct{}
}
Expand All @@ -17,7 +23,7 @@ func (q *clientSegmentQueue) initialize() {
q.didPull = make(chan struct{})
}

func (q *clientSegmentQueue) push(seg []byte) {
func (q *clientSegmentQueue) push(seg *segmentData) {
q.mutex.Lock()

queueWasEmpty := (len(q.queue) == 0)
Expand Down Expand Up @@ -50,7 +56,7 @@ func (q *clientSegmentQueue) waitUntilSizeIsBelow(ctx context.Context, n int) bo
return true
}

func (q *clientSegmentQueue) pull(ctx context.Context) ([]byte, bool) {
func (q *clientSegmentQueue) pull(ctx context.Context) (*segmentData, bool) {
q.mutex.Lock()

for len(q.queue) == 0 {
Expand All @@ -66,7 +72,7 @@ func (q *clientSegmentQueue) pull(ctx context.Context) ([]byte, bool) {
q.mutex.Lock()
}

var seg []byte
var seg *segmentData
seg, q.queue = q.queue[0], q.queue[1:]

close(q.didPull)
Expand Down
6 changes: 5 additions & 1 deletion client_stream_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (d *clientStreamDownloader) run(ctx context.Context) error {
onGetLeadingTimeSync: d.onGetLeadingTimeSync,
onData: d.onData,
}
proc.initialize()
d.rp.add(proc)
}

Expand Down Expand Up @@ -224,7 +225,10 @@ func (d *clientStreamDownloader) fillSegmentQueue(
return err
}

segmentQueue.push(byts)
segmentQueue.push(&segmentData{
dateTime: seg.DateTime,
payload: byts,
})

if pl.Endlist && pl.Segments[len(pl.Segments)-1] == seg {
<-ctx.Done()
Expand Down
3 changes: 3 additions & 0 deletions client_stream_processor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package gohlslib

import "time"

type clientStreamProcessor interface {
getIsLeading() bool
getTracks() []*Track
ntp(dts time.Duration) (time.Time, bool)
}
Loading

0 comments on commit d74a3bd

Please sign in to comment.