Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: support reading multiple audio tracks (#179) #200

Merged
merged 1 commit into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Features:
* Client

* Read streams in MPEG-TS, fMP4 or Low-latency format
* Read a single video track and/or a single audio track
* Read a single video track and/or multiple audio tracks
* Read tracks encoded with AV1, VP9, H265, H264, Opus, MPEG-4 Audio (AAC)
* Get absolute timestamp of incoming data

Expand Down
4 changes: 0 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ type ClientOnDataMPEG4AudioFunc func(pts int64, aus [][]byte)
// ClientOnDataOpusFunc is the prototype of the function passed to OnDataOpus().
type ClientOnDataOpusFunc func(pts int64, packets [][]byte)

type clientOnStreamTracksFunc func(ctx context.Context, isLeading bool, tracks []*Track) ([]*clientTrack, bool)

type clientOnDataFunc func(pts int64, dts int64, data [][]byte)

func clientAbsoluteURL(base *url.URL, relative string) (*url.URL, error) {
u, err := url.Parse(relative)
if err != nil {
Expand Down
178 changes: 62 additions & 116 deletions client_primary_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
}
}

func clientDownloadPlaylist(
func downloadPlaylist(
ctx context.Context,
httpClient *http.Client,
onRequest ClientOnRequestFunc,
Expand Down Expand Up @@ -94,34 +94,19 @@
return leadingPlaylist
}

func pickAudioPlaylist(alternatives []*playlist.MultivariantRendition, groupID string) *playlist.MultivariantRendition {
candidates := func() []*playlist.MultivariantRendition {
var ret []*playlist.MultivariantRendition
for _, alt := range alternatives {
if alt.GroupID == groupID {
ret = append(ret, alt)
}
}
return ret
}()
if candidates == nil {
return nil
}
func getRenditionsByGroup(
renditions []*playlist.MultivariantRendition,
groupID string,
) []*playlist.MultivariantRendition {
var ret []*playlist.MultivariantRendition

// pick the default audio playlist
for _, alt := range candidates {
if alt.Default {
return alt
for _, alt := range renditions {
if alt.GroupID == groupID {
ret = append(ret, alt)
}
}

// alternatively, pick the first one
return candidates[0]
}

type streamTracksEntry struct {
isLeading bool
tracks []*Track
return ret
}

type clientPrimaryDownloader struct {
Expand All @@ -139,34 +124,24 @@
getLeadingTimeConv func(ctx context.Context) (clientTimeConv, bool)

clientTracks map[*Track]*clientTrack

// in
chStreamTracks chan streamTracksEntry
chStreamEnded chan struct{}

// out
startStreaming chan struct{}
}

func (d *clientPrimaryDownloader) initialize() {
d.chStreamTracks = make(chan streamTracksEntry)
d.chStreamEnded = make(chan struct{})
d.startStreaming = make(chan struct{})
}

func (d *clientPrimaryDownloader) run(ctx context.Context) error {
d.onDownloadPrimaryPlaylist(d.primaryPlaylistURL.String())

pl, err := clientDownloadPlaylist(ctx, d.httpClient, d.onRequest, d.primaryPlaylistURL)
pl, err := downloadPlaylist(ctx, d.httpClient, d.onRequest, d.primaryPlaylistURL)
if err != nil {
return err
}

streamCount := 0
var streams []*clientStreamDownloader

switch plt := pl.(type) {
case *playlist.Media:
ds := &clientStreamDownloader{
stream := &clientStreamDownloader{
isLeading: true,
httpClient: d.httpClient,
onRequest: d.onRequest,
Expand All @@ -177,13 +152,12 @@
playlistURL: d.primaryPlaylistURL,
firstPlaylist: plt,
rp: d.rp,
setStreamTracks: d.setStreamTracks,
setStreamEnded: d.setStreamEnded,
setLeadingTimeConv: d.setLeadingTimeConv,
getLeadingTimeConv: d.getLeadingTimeConv,
}
d.rp.add(ds)
streamCount++
stream.initialize()
d.rp.add(stream)
streams = append(streams, stream)

case *playlist.Multivariant:
leadingPlaylist := pickLeadingPlaylist(plt.Variants)
Expand All @@ -197,7 +171,7 @@
return err
}

ds := &clientStreamDownloader{
stream := &clientStreamDownloader{
isLeading: true,
httpClient: d.httpClient,
onRequest: d.onRequest,
Expand All @@ -208,44 +182,49 @@
playlistURL: u,
firstPlaylist: nil,
rp: d.rp,
setStreamTracks: d.setStreamTracks,
setStreamEnded: d.setStreamEnded,
setLeadingTimeConv: d.setLeadingTimeConv,
getLeadingTimeConv: d.getLeadingTimeConv,
}
d.rp.add(ds)
streamCount++
stream.initialize()
d.rp.add(stream)
streams = append(streams, stream)

if leadingPlaylist.Audio != "" {
audioPlaylist := pickAudioPlaylist(plt.Renditions, leadingPlaylist.Audio)
if audioPlaylist == nil {
return fmt.Errorf("audio playlist with id \"%s\" not found", leadingPlaylist.Audio)
audioPlaylists := getRenditionsByGroup(plt.Renditions, leadingPlaylist.Audio)
if audioPlaylists == nil {
return fmt.Errorf("no playlist with Group ID \"%s\" found", leadingPlaylist.Audio)

Check warning on line 195 in client_primary_downloader.go

View check run for this annotation

Codecov / codecov/patch

client_primary_downloader.go#L195

Added line #L195 was not covered by tests
}

if audioPlaylist.URI != nil {
u, err = clientAbsoluteURL(d.primaryPlaylistURL, *audioPlaylist.URI)
if err != nil {
return err
for _, pl := range audioPlaylists {
// stream data already included in the leading playlist
if pl.URI == nil {
continue

Check warning on line 201 in client_primary_downloader.go

View check run for this annotation

Codecov / codecov/patch

client_primary_downloader.go#L201

Added line #L201 was not covered by tests
}

ds := &clientStreamDownloader{
isLeading: false,
onRequest: d.onRequest,
httpClient: d.httpClient,
onDownloadStreamPlaylist: d.onDownloadStreamPlaylist,
onDownloadSegment: d.onDownloadSegment,
onDownloadPart: d.onDownloadPart,
onDecodeError: d.onDecodeError,
playlistURL: u,
firstPlaylist: nil,
rp: d.rp,
setStreamTracks: d.setStreamTracks,
setLeadingTimeConv: d.setLeadingTimeConv,
getLeadingTimeConv: d.getLeadingTimeConv,
setStreamEnded: d.setStreamEnded,
if pl.URI != nil {
u, err = clientAbsoluteURL(d.primaryPlaylistURL, *pl.URI)
if err != nil {
return err
}

Check warning on line 208 in client_primary_downloader.go

View check run for this annotation

Codecov / codecov/patch

client_primary_downloader.go#L207-L208

Added lines #L207 - L208 were not covered by tests

stream := &clientStreamDownloader{
isLeading: false,
onRequest: d.onRequest,
httpClient: d.httpClient,
onDownloadStreamPlaylist: d.onDownloadStreamPlaylist,
onDownloadSegment: d.onDownloadSegment,
onDownloadPart: d.onDownloadPart,
onDecodeError: d.onDecodeError,
playlistURL: u,
rendition: pl,
rp: d.rp,
setLeadingTimeConv: d.setLeadingTimeConv,
getLeadingTimeConv: d.getLeadingTimeConv,
}
stream.initialize()
d.rp.add(stream)
streams = append(streams, stream)
}
d.rp.add(ds)
streamCount++
}
}

Expand All @@ -255,14 +234,10 @@

var tracks []*Track

for i := 0; i < streamCount; i++ {
for _, stream := range streams {
select {
case entry := <-d.chStreamTracks:
if entry.isLeading {
tracks = append(append([]*Track(nil), entry.tracks...), tracks...)
} else {
tracks = append(tracks, entry.tracks...)
}
case streamTracks := <-stream.chTracks:
tracks = append(tracks, streamTracks...)

case <-ctx.Done():
return fmt.Errorf("terminated")
Expand All @@ -278,50 +253,21 @@
return err
}

close(d.startStreaming)

for i := 0; i < streamCount; i++ {
for _, stream := range streams {
select {
case <-d.chStreamEnded:
case stream.chStartStreaming <- d.clientTracks:
case <-ctx.Done():
return fmt.Errorf("terminated")
}
}

return ErrClientEOS
}

func (d *clientPrimaryDownloader) setStreamTracks(
ctx context.Context,
isLeading bool,
tracks []*Track,
) ([]*clientTrack, bool) {
select {
case d.chStreamTracks <- streamTracksEntry{
isLeading: isLeading,
tracks: tracks,
}:
case <-ctx.Done():
return nil, false
}

select {
case <-d.startStreaming:
case <-ctx.Done():
return nil, false
}

streamClientTracks := make([]*clientTrack, len(tracks))
for i, track := range tracks {
streamClientTracks[i] = d.clientTracks[track]
for _, stream := range streams {
select {
case <-stream.chEnded:
case <-ctx.Done():
return fmt.Errorf("terminated")
}
}

return streamClientTracks, true
}

func (d *clientPrimaryDownloader) setStreamEnded(ctx context.Context) {
select {
case d.chStreamEnded <- struct{}{}:
case <-ctx.Done():
}
return ErrClientEOS
}
58 changes: 49 additions & 9 deletions client_stream_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,27 @@
onDownloadPart ClientOnDownloadPartFunc
onDecodeError ClientOnDecodeErrorFunc
playlistURL *url.URL
rendition *playlist.MultivariantRendition
firstPlaylist *playlist.Media
rp *clientRoutinePool
setStreamTracks clientOnStreamTracksFunc
setStreamEnded func(context.Context)
setLeadingTimeConv func(clientTimeConv)
getLeadingTimeConv func(context.Context) (clientTimeConv, bool)

segmentQueue *clientSegmentQueue
curSegmentID *int

// out
chTracks chan []*Track
chEnded chan struct{}

// in
chStartStreaming chan map[*Track]*clientTrack
}

func (d *clientStreamDownloader) initialize() {
d.chTracks = make(chan []*Track)
d.chEnded = make(chan struct{})
d.chStartStreaming = make(chan map[*Track]*clientTrack)
}

func (d *clientStreamDownloader) run(ctx context.Context) error {
Expand All @@ -82,7 +94,7 @@
d.segmentQueue.initialize()

if d.firstPlaylist.Map != nil && d.firstPlaylist.Map.URI != "" {
byts, err := d.downloadSegment(
initFile, err := d.downloadSegment(
ctx,
d.firstPlaylist.Map.URI,
d.firstPlaylist.Map.ByteRangeStart,
Expand All @@ -94,11 +106,12 @@
proc := &clientStreamProcessorFMP4{
ctx: ctx,
isLeading: d.isLeading,
initFile: byts,
rendition: d.rendition,
initFile: initFile,
segmentQueue: d.segmentQueue,
rp: d.rp,
setStreamTracks: d.setStreamTracks,
setStreamEnded: d.setStreamEnded,
setTracks: d.setTracks,
setEnded: d.setEnded,
setLeadingTimeConv: d.setLeadingTimeConv,
getLeadingTimeConv: d.getLeadingTimeConv,
}
Expand All @@ -110,8 +123,8 @@
isLeading: d.isLeading,
segmentQueue: d.segmentQueue,
rp: d.rp,
setStreamTracks: d.setStreamTracks,
setStreamEnded: d.setStreamEnded,
setTracks: d.setTracks,
setEnded: d.setEnded,
setLeadingTimeConv: d.setLeadingTimeConv,
getLeadingTimeConv: d.getLeadingTimeConv,
}
Expand Down Expand Up @@ -190,7 +203,7 @@

d.onDownloadStreamPlaylist(ur.String())

pl, err := clientDownloadPlaylist(ctx, d.httpClient, d.onRequest, ur)
pl, err := downloadPlaylist(ctx, d.httpClient, d.onRequest, ur)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -346,3 +359,30 @@

return nil
}

func (d *clientStreamDownloader) setTracks(ctx context.Context, tracks []*Track) ([]*clientTrack, bool) {
select {
case d.chTracks <- tracks:
case <-ctx.Done():
return nil, false

Check warning on line 367 in client_stream_downloader.go

View check run for this annotation

Codecov / codecov/patch

client_stream_downloader.go#L366-L367

Added lines #L366 - L367 were not covered by tests
}

var allTracks map[*Track]*clientTrack

select {
case allTracks = <-d.chStartStreaming:
case <-ctx.Done():
return nil, false

Check warning on line 375 in client_stream_downloader.go

View check run for this annotation

Codecov / codecov/patch

client_stream_downloader.go#L374-L375

Added lines #L374 - L375 were not covered by tests
}

streamTracks := make([]*clientTrack, len(tracks))
for i, track := range tracks {
streamTracks[i] = allTracks[track]
}

return streamTracks, true
}

func (d *clientStreamDownloader) setEnded() {
close(d.chEnded)
}
Loading