Skip to content

Commit

Permalink
client: move track-related code into track processor (#121)
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 authored Dec 29, 2023
1 parent 6a554dd commit 9eb4ef3
Show file tree
Hide file tree
Showing 9 changed files with 461 additions and 290 deletions.
5 changes: 3 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import (
)

const (
clientMPEGTSEntryQueueSize = 100
clientMPEGTSSampleQueueSize = 100
clientFMP4PartTrackQueueSize = 10
clientFMP4MaxPartTracksPerSegment = 200
clientLiveInitialDistance = 3
clientLiveMaxDistanceFromEnd = 5
Expand Down Expand Up @@ -190,7 +191,7 @@ func (c *Client) runInner() error {
rp := &clientRoutinePool{}
rp.initialize()

dl := &clientDownloaderPrimary{
dl := &clientPrimaryDownloader{
primaryPlaylistURL: c.playlistURL,
httpClient: c.HTTPClient,
onDownloadPrimaryPlaylist: c.OnDownloadPrimaryPlaylist,
Expand Down
18 changes: 9 additions & 9 deletions client_downloader_primary.go → client_primary_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func pickAudioPlaylist(alternatives []*playlist.MultivariantRendition, groupID s
return candidates[0]
}

type clientDownloaderPrimary struct {
type clientPrimaryDownloader struct {
primaryPlaylistURL *url.URL
httpClient *http.Client
onDownloadPrimaryPlaylist ClientOnDownloadPrimaryPlaylistFunc
Expand All @@ -117,13 +117,13 @@ type clientDownloaderPrimary struct {
leadingTimeSyncReady chan struct{}
}

func (d *clientDownloaderPrimary) initialize() {
func (d *clientPrimaryDownloader) initialize() {
d.chStreamTracks = make(chan clientStreamProcessor)
d.startStreaming = make(chan struct{})
d.leadingTimeSyncReady = make(chan struct{})
}

func (d *clientDownloaderPrimary) run(ctx context.Context) error {
func (d *clientPrimaryDownloader) run(ctx context.Context) error {
d.onDownloadPrimaryPlaylist(d.primaryPlaylistURL.String())

pl, err := clientDownloadPlaylist(ctx, d.httpClient, d.primaryPlaylistURL)
Expand All @@ -135,7 +135,7 @@ func (d *clientDownloaderPrimary) run(ctx context.Context) error {

switch plt := pl.(type) {
case *playlist.Media:
ds := &clientDownloaderStream{
ds := &clientStreamDownloader{
isLeading: true,
httpClient: d.httpClient,
onDownloadStreamPlaylist: d.onDownloadStreamPlaylist,
Expand Down Expand Up @@ -163,7 +163,7 @@ func (d *clientDownloaderPrimary) run(ctx context.Context) error {
return err
}

ds := &clientDownloaderStream{
ds := &clientStreamDownloader{
isLeading: true,
httpClient: d.httpClient,
onDownloadStreamPlaylist: d.onDownloadStreamPlaylist,
Expand Down Expand Up @@ -192,7 +192,7 @@ func (d *clientDownloaderPrimary) run(ctx context.Context) error {
return err
}

ds := &clientDownloaderStream{
ds := &clientStreamDownloader{
isLeading: false,
httpClient: d.httpClient,
onDownloadStreamPlaylist: d.onDownloadStreamPlaylist,
Expand Down Expand Up @@ -246,7 +246,7 @@ func (d *clientDownloaderPrimary) run(ctx context.Context) error {
return nil
}

func (d *clientDownloaderPrimary) onStreamTracks(ctx context.Context, streamProc clientStreamProcessor) bool {
func (d *clientPrimaryDownloader) onStreamTracks(ctx context.Context, streamProc clientStreamProcessor) bool {
select {
case d.chStreamTracks <- streamProc:
case <-ctx.Done():
Expand All @@ -262,12 +262,12 @@ func (d *clientDownloaderPrimary) onStreamTracks(ctx context.Context, streamProc
return true
}

func (d *clientDownloaderPrimary) onSetLeadingTimeSync(ts clientTimeSync) {
func (d *clientPrimaryDownloader) onSetLeadingTimeSync(ts clientTimeSync) {
d.leadingTimeSync = ts
close(d.leadingTimeSyncReady)
}

func (d *clientDownloaderPrimary) onGetLeadingTimeSync(ctx context.Context) (clientTimeSync, bool) {
func (d *clientPrimaryDownloader) onGetLeadingTimeSync(ctx context.Context) (clientTimeSync, bool) {
select {
case <-d.leadingTimeSyncReady:
case <-ctx.Done():
Expand Down
10 changes: 5 additions & 5 deletions client_downloader_stream.go → client_stream_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func findSegmentWithID(seqNo int, segments []*playlist.MediaSegment, id int) (*p
return segments[index], index, len(segments) - index
}

type clientDownloaderStream struct {
type clientStreamDownloader struct {
isLeading bool
httpClient *http.Client
onDownloadStreamPlaylist ClientOnDownloadStreamPlaylistFunc
Expand All @@ -46,7 +46,7 @@ type clientDownloaderStream struct {
curSegmentID *int
}

func (d *clientDownloaderStream) run(ctx context.Context) error {
func (d *clientStreamDownloader) run(ctx context.Context) error {
initialPlaylist := d.initialPlaylist
d.initialPlaylist = nil
if initialPlaylist == nil {
Expand Down Expand Up @@ -124,7 +124,7 @@ func (d *clientDownloaderStream) run(ctx context.Context) error {
}
}

func (d *clientDownloaderStream) downloadPlaylist(ctx context.Context) (*playlist.Media, error) {
func (d *clientStreamDownloader) downloadPlaylist(ctx context.Context) (*playlist.Media, error) {
d.onDownloadStreamPlaylist(d.playlistURL.String())

pl, err := clientDownloadPlaylist(ctx, d.httpClient, d.playlistURL)
Expand All @@ -140,7 +140,7 @@ func (d *clientDownloaderStream) downloadPlaylist(ctx context.Context) (*playlis
return plt, nil
}

func (d *clientDownloaderStream) downloadSegment(
func (d *clientStreamDownloader) downloadSegment(
ctx context.Context,
uri string,
start *uint64,
Expand Down Expand Up @@ -184,7 +184,7 @@ func (d *clientDownloaderStream) downloadSegment(
return byts, nil
}

func (d *clientDownloaderStream) fillSegmentQueue(
func (d *clientStreamDownloader) fillSegmentQueue(
ctx context.Context,
pl *playlist.Media,
segmentQueue *clientSegmentQueue,
Expand Down
135 changes: 19 additions & 116 deletions client_stream_processor_fmp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"fmt"
"time"

"github.com/bluenviron/mediacommon/pkg/formats/fmp4"

Expand Down Expand Up @@ -54,10 +53,10 @@ type clientStreamProcessorFMP4 struct {
onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool)
onData map[*Track]interface{}

tracks []*Track
init fmp4.Init
leadingTrackID int
prePreProcessFuncs map[int]func(context.Context, *fmp4.PartTrack) error
tracks []*Track
init fmp4.Init
leadingTrackID int
trackProcessors map[int]*clientTrackProcessorFMP4

// in
chPartTrackProcessed chan struct{}
Expand Down Expand Up @@ -117,7 +116,7 @@ func (p *clientStreamProcessorFMP4) processSegment(ctx context.Context, byts []b
return err
}

if p.prePreProcessFuncs == nil {
if p.trackProcessors == nil {
err := p.initializeTrackProcessors(ctx, parts)
if err != nil {
return err
Expand All @@ -132,12 +131,12 @@ func (p *clientStreamProcessorFMP4) processSegment(ctx context.Context, byts []b
return fmt.Errorf("too many part tracks at once")
}

prePreProcess, ok := p.prePreProcessFuncs[partTrack.ID]
trackProc, ok := p.trackProcessors[partTrack.ID]
if !ok {
continue
}

err = prePreProcess(ctx, partTrack)
err := trackProc.push(ctx, partTrack)
if err != nil {
return err
}
Expand Down Expand Up @@ -197,119 +196,23 @@ func (p *clientStreamProcessorFMP4) initializeTrackProcessors(
}
}

p.prePreProcessFuncs = make(map[int]func(context.Context, *fmp4.PartTrack) error)
p.trackProcessors = make(map[int]*clientTrackProcessorFMP4)

for i, track := range p.tracks {
onData := p.onData[track]

var postProcess func(pts time.Duration, dts time.Duration, sample *fmp4.PartSample) error

switch track.Codec.(type) {
case *codecs.AV1:
var onDataCasted ClientOnDataAV1Func = func(pts time.Duration, tu [][]byte) {}
if onData != nil {
onDataCasted = onData.(ClientOnDataAV1Func)
}

postProcess = func(pts time.Duration, dts time.Duration, sample *fmp4.PartSample) error {
tu, err := sample.GetAV1()
if err != nil {
return err
}

onDataCasted(pts, tu)
return nil
}

case *codecs.VP9:
var onDataCasted ClientOnDataVP9Func = func(pts time.Duration, frame []byte) {}
if onData != nil {
onDataCasted = onData.(ClientOnDataVP9Func)
}

postProcess = func(pts time.Duration, dts time.Duration, sample *fmp4.PartSample) error {
onDataCasted(pts, sample.Payload)
return nil
}

case *codecs.H265, *codecs.H264:
var onDataCasted ClientOnDataH26xFunc = func(pts time.Duration, dts time.Duration, au [][]byte) {}
if onData != nil {
onDataCasted = onData.(ClientOnDataH26xFunc)
}

postProcess = func(pts time.Duration, dts time.Duration, sample *fmp4.PartSample) error {
au, err := sample.GetH26x()
if err != nil {
return err
}

onDataCasted(pts, dts, au)
return nil
}

case *codecs.Opus:
var onDataCasted ClientOnDataOpusFunc = func(pts time.Duration, packets [][]byte) {}
if onData != nil {
onDataCasted = onData.(ClientOnDataOpusFunc)
}

postProcess = func(pts time.Duration, dts time.Duration, sample *fmp4.PartSample) error {
onDataCasted(pts, [][]byte{sample.Payload})
return nil
}

case *codecs.MPEG4Audio:
var onDataCasted ClientOnDataMPEG4AudioFunc = func(pts time.Duration, aus [][]byte) {}
if onData != nil {
onDataCasted = onData.(ClientOnDataMPEG4AudioFunc)
}

postProcess = func(pts time.Duration, dts time.Duration, sample *fmp4.PartSample) error {
onDataCasted(pts, [][]byte{sample.Payload})
return nil
}
}

timeScale := p.init.Tracks[i].TimeScale

preProcess := func(ctx context.Context, partTrack *fmp4.PartTrack) error {
rawDTS := partTrack.BaseTime

for _, sample := range partTrack.Samples {
pts, dts, err := timeSync.convertAndSync(ctx, timeScale, rawDTS, sample.PTSOffset)
if err != nil {
return err
}

rawDTS += uint64(sample.Duration)

// silently discard packets prior to the first packet of the leading track
if pts < 0 {
continue
}

err = postProcess(pts, dts, sample)
if err != nil {
return err
}
}

p.onPartTrackProcessed(ctx)
return nil
trackProc := &clientTrackProcessorFMP4{
track: track,
onData: p.onData[track],
timeScale: p.init.Tracks[i].TimeScale,
timeSync: timeSync,
onPartTrackProcessed: p.onPartTrackProcessed,
}
err := trackProc.initialize()
if err != nil {
return err
}

trackProc := &clientTrackProcessor{}
trackProc.initialize()
p.rp.add(trackProc)

prePreProcess := func(ctx context.Context, partTrack *fmp4.PartTrack) error {
return trackProc.push(ctx, func() error {
return preProcess(ctx, partTrack)
})
}

p.prePreProcessFuncs[p.init.Tracks[i].ID] = prePreProcess
p.trackProcessors[p.init.Tracks[i].ID] = trackProc
}

return nil
Expand Down
Loading

0 comments on commit 9eb4ef3

Please sign in to comment.