Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: support extracting absolute timestamp of incoming data #122

Merged
merged 1 commit into from
Jan 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ Features:

* Client

* Read MPEG-TS or fMP4 streams
* Read streams in MPEG-TS or fMP4 format
* Read tracks encoded with AV1, VP9, H265, H264, Opus, MPEG-4 Audio (AAC)
* Get NTP timestamp (absolute timestamp) of incoming data

* Muxer

* Generate MPEG-TS, fMP4, Low-latency streams
* Generate streams in MPEG-TS, fMP4 or Low-latency format
* Write tracks encoded with AV1, VP9, H265, H264, Opus, MPEG-4 audio (AAC)
* Save generated segments on disk

Expand All @@ -39,6 +40,7 @@ Features:

* [playlist-parser](examples/playlist-parser/main.go)
* [client](examples/client/main.go)
* [client-ntp-timestamp](examples/client-ntp-timestamp/main.go)
* [muxer](examples/muxer/main.go)

## API Documentation
Expand Down
51 changes: 28 additions & 23 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
)

const (
clientMPEGTSSampleQueueSize = 100
clientFMP4PartTrackQueueSize = 10
clientFMP4MaxPartTracksPerSegment = 200
clientLiveInitialDistance = 3
clientLiveMaxDistanceFromEnd = 5
clientMaxDTSRTCDiff = 10 * time.Second
clientMaxTracksPerStream = 10
clientMPEGTSSampleQueueSize = 100
clientLiveInitialDistance = 3
clientLiveMaxDistanceFromEnd = 5
clientMaxDTSRTCDiff = 10 * time.Second
)

// ClientOnDownloadPrimaryPlaylistFunc is the prototype of Client.OnDownloadPrimaryPlaylist.
Expand Down Expand Up @@ -92,10 +91,11 @@
// private
//

ctx context.Context
ctxCancel func()
onData map[*Track]interface{}
playlistURL *url.URL
ctx context.Context
ctxCancel func()
onData map[*Track]interface{}
playlistURL *url.URL
primaryDownloader *clientPrimaryDownloader

// out
outErr chan error
Expand Down Expand Up @@ -159,28 +159,33 @@
}

// OnDataAV1 sets a callback that is called when data from an AV1 track is received.
func (c *Client) OnDataAV1(forma *Track, cb ClientOnDataAV1Func) {
c.onData[forma] = cb
func (c *Client) OnDataAV1(track *Track, cb ClientOnDataAV1Func) {
c.onData[track] = cb

Check warning on line 163 in client.go

View check run for this annotation

Codecov / codecov/patch

client.go#L162-L163

Added lines #L162 - L163 were not covered by tests
}

// OnDataVP9 sets a callback that is called when data from a VP9 track is received.
func (c *Client) OnDataVP9(forma *Track, cb ClientOnDataVP9Func) {
c.onData[forma] = cb
func (c *Client) OnDataVP9(track *Track, cb ClientOnDataVP9Func) {
c.onData[track] = cb

Check warning on line 168 in client.go

View check run for this annotation

Codecov / codecov/patch

client.go#L167-L168

Added lines #L167 - L168 were not covered by tests
}

// OnDataH26x sets a callback that is called when data from an H26x track is received.
func (c *Client) OnDataH26x(forma *Track, cb ClientOnDataH26xFunc) {
c.onData[forma] = cb
func (c *Client) OnDataH26x(track *Track, cb ClientOnDataH26xFunc) {
c.onData[track] = cb
}

// OnDataMPEG4Audio sets a callback that is called when data from a MPEG-4 Audio track is received.
func (c *Client) OnDataMPEG4Audio(forma *Track, cb ClientOnDataMPEG4AudioFunc) {
c.onData[forma] = cb
func (c *Client) OnDataMPEG4Audio(track *Track, cb ClientOnDataMPEG4AudioFunc) {
c.onData[track] = cb
}

// OnDataOpus sets a callback that is called when data from an Opus track is received.
func (c *Client) OnDataOpus(forma *Track, cb ClientOnDataOpusFunc) {
c.onData[forma] = cb
func (c *Client) OnDataOpus(track *Track, cb ClientOnDataOpusFunc) {
c.onData[track] = cb

Check warning on line 183 in client.go

View check run for this annotation

Codecov / codecov/patch

client.go#L182-L183

Added lines #L182 - L183 were not covered by tests
}

// NTP returns the NTP timestamp (absolute timestamp) of a packet with given track and DTS.
func (c *Client) NTP(track *Track, dts time.Duration) (time.Time, bool) {
return c.primaryDownloader.ntp(track, dts)
}

func (c *Client) run() {
Expand All @@ -191,7 +196,7 @@
rp := &clientRoutinePool{}
rp.initialize()

dl := &clientPrimaryDownloader{
c.primaryDownloader = &clientPrimaryDownloader{
primaryPlaylistURL: c.playlistURL,
httpClient: c.HTTPClient,
onDownloadPrimaryPlaylist: c.OnDownloadPrimaryPlaylist,
Expand All @@ -202,8 +207,8 @@
onTracks: c.OnTracks,
onData: c.onData,
}
dl.initialize()
rp.add(dl)
c.primaryDownloader.initialize()
rp.add(c.primaryDownloader)

select {
case err := <-rp.errorChan():
Expand Down
14 changes: 13 additions & 1 deletion client_primary_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"net/url"
"strings"
"time"

"github.com/bluenviron/gohlslib/pkg/playlist"
)
Expand Down Expand Up @@ -107,7 +108,8 @@ type clientPrimaryDownloader struct {
onTracks ClientOnTracksFunc
onData map[*Track]interface{}

leadingTimeSync clientTimeSync
streamProcByTrack map[*Track]clientStreamProcessor
leadingTimeSync clientTimeSync

// in
chStreamTracks chan clientStreamProcessor
Expand All @@ -118,6 +120,7 @@ type clientPrimaryDownloader struct {
}

func (d *clientPrimaryDownloader) initialize() {
d.streamProcByTrack = make(map[*Track]clientStreamProcessor)
d.chStreamTracks = make(chan clientStreamProcessor)
d.startStreaming = make(chan struct{})
d.leadingTimeSyncReady = make(chan struct{})
Expand Down Expand Up @@ -227,6 +230,11 @@ func (d *clientPrimaryDownloader) run(ctx context.Context) error {
} else {
tracks = append(tracks, streamProc.getTracks()...)
}

for _, track := range streamProc.getTracks() {
d.streamProcByTrack[track] = streamProc
}

case <-ctx.Done():
return fmt.Errorf("terminated")
}
Expand Down Expand Up @@ -275,3 +283,7 @@ func (d *clientPrimaryDownloader) onGetLeadingTimeSync(ctx context.Context) (cli
}
return d.leadingTimeSync, true
}

func (d *clientPrimaryDownloader) ntp(track *Track, dts time.Duration) (time.Time, bool) {
return d.streamProcByTrack[track].ntp(dts)
}
14 changes: 10 additions & 4 deletions client_segment_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@ package gohlslib
import (
"context"
"sync"
"time"
)

type segmentData struct {
dateTime *time.Time
payload []byte
}

type clientSegmentQueue struct {
mutex sync.Mutex
queue [][]byte
queue []*segmentData
didPush chan struct{}
didPull chan struct{}
}
Expand All @@ -17,7 +23,7 @@ func (q *clientSegmentQueue) initialize() {
q.didPull = make(chan struct{})
}

func (q *clientSegmentQueue) push(seg []byte) {
func (q *clientSegmentQueue) push(seg *segmentData) {
q.mutex.Lock()

queueWasEmpty := (len(q.queue) == 0)
Expand Down Expand Up @@ -50,7 +56,7 @@ func (q *clientSegmentQueue) waitUntilSizeIsBelow(ctx context.Context, n int) bo
return true
}

func (q *clientSegmentQueue) pull(ctx context.Context) ([]byte, bool) {
func (q *clientSegmentQueue) pull(ctx context.Context) (*segmentData, bool) {
q.mutex.Lock()

for len(q.queue) == 0 {
Expand All @@ -66,7 +72,7 @@ func (q *clientSegmentQueue) pull(ctx context.Context) ([]byte, bool) {
q.mutex.Lock()
}

var seg []byte
var seg *segmentData
seg, q.queue = q.queue[0], q.queue[1:]

close(q.didPull)
Expand Down
6 changes: 5 additions & 1 deletion client_stream_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (d *clientStreamDownloader) run(ctx context.Context) error {
onGetLeadingTimeSync: d.onGetLeadingTimeSync,
onData: d.onData,
}
proc.initialize()
d.rp.add(proc)
}

Expand Down Expand Up @@ -224,7 +225,10 @@ func (d *clientStreamDownloader) fillSegmentQueue(
return err
}

segmentQueue.push(byts)
segmentQueue.push(&segmentData{
dateTime: seg.DateTime,
payload: byts,
})

if pl.Endlist && pl.Segments[len(pl.Segments)-1] == seg {
<-ctx.Done()
Expand Down
3 changes: 3 additions & 0 deletions client_stream_processor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package gohlslib

import "time"

type clientStreamProcessor interface {
getIsLeading() bool
getTracks() []*Track
ntp(dts time.Duration) (time.Time, bool)
}
Loading
Loading