Skip to content

Commit

Permalink
replace new() with initialize()
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Dec 25, 2023
1 parent b3d1cef commit b27a737
Show file tree
Hide file tree
Showing 20 changed files with 427 additions and 603 deletions.
28 changes: 15 additions & 13 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,19 +185,21 @@ func (c *Client) run() {
}

func (c *Client) runInner() error {
rp := newClientRoutinePool()

dl := newClientDownloaderPrimary(
c.playlistURL,
c.HTTPClient,
c.OnDownloadPrimaryPlaylist,
c.OnDownloadStreamPlaylist,
c.OnDownloadSegment,
c.OnDecodeError,
rp,
c.OnTracks,
c.onData,
)
rp := &clientRoutinePool{}
rp.initialize()

dl := &clientDownloaderPrimary{
primaryPlaylistURL: c.playlistURL,
httpClient: c.HTTPClient,
onDownloadPrimaryPlaylist: c.OnDownloadPrimaryPlaylist,
onDownloadStreamPlaylist: c.OnDownloadStreamPlaylist,
onDownloadSegment: c.OnDownloadSegment,
onDecodeError: c.OnDecodeError,
rp: rp,
onTracks: c.OnTracks,
onData: c.onData,
}
dl.initialize()
rp.add(dl)

select {
Expand Down
115 changes: 47 additions & 68 deletions client_downloader_primary.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ type clientDownloaderPrimary struct {
onDownloadStreamPlaylist ClientOnDownloadStreamPlaylistFunc
onDownloadSegment ClientOnDownloadSegmentFunc
onDecodeError ClientOnDecodeErrorFunc
rp *clientRoutinePool
onTracks ClientOnTracksFunc
onData map[*Track]interface{}
rp *clientRoutinePool

leadingTimeSync clientTimeSync

Expand All @@ -119,31 +119,10 @@ type clientDownloaderPrimary struct {
leadingTimeSyncReady chan struct{}
}

func newClientDownloaderPrimary(
primaryPlaylistURL *url.URL,
httpClient *http.Client,
onDownloadPrimaryPlaylist ClientOnDownloadPrimaryPlaylistFunc,
onDownloadStreamPlaylist ClientOnDownloadStreamPlaylistFunc,
onDownloadSegment ClientOnDownloadSegmentFunc,
onDecodeError ClientOnDecodeErrorFunc,
rp *clientRoutinePool,
onTracks ClientOnTracksFunc,
onData map[*Track]interface{},
) *clientDownloaderPrimary {
return &clientDownloaderPrimary{
primaryPlaylistURL: primaryPlaylistURL,
httpClient: httpClient,
onDownloadPrimaryPlaylist: onDownloadPrimaryPlaylist,
onDownloadStreamPlaylist: onDownloadStreamPlaylist,
onDownloadSegment: onDownloadSegment,
onDecodeError: onDecodeError,
onTracks: onTracks,
onData: onData,
rp: rp,
streamTracks: make(chan []*Track),
startStreaming: make(chan struct{}),
leadingTimeSyncReady: make(chan struct{}),
}
func (d *clientDownloaderPrimary) initialize() {
d.streamTracks = make(chan []*Track)
d.startStreaming = make(chan struct{})
d.leadingTimeSyncReady = make(chan struct{})
}

func (d *clientDownloaderPrimary) run(ctx context.Context) error {
Expand All @@ -158,20 +137,20 @@ func (d *clientDownloaderPrimary) run(ctx context.Context) error {

switch plt := pl.(type) {
case *playlist.Media:
ds := newClientDownloaderStream(
true,
d.httpClient,
d.onDownloadStreamPlaylist,
d.onDownloadSegment,
d.onDecodeError,
d.primaryPlaylistURL,
plt,
d.rp,
d.onStreamTracks,
d.onSetLeadingTimeSync,
d.onGetLeadingTimeSync,
d.onData,
)
ds := &clientDownloaderStream{
isLeading: true,
httpClient: d.httpClient,
onDownloadStreamPlaylist: d.onDownloadStreamPlaylist,
onDownloadSegment: d.onDownloadSegment,
onDecodeError: d.onDecodeError,
playlistURL: d.primaryPlaylistURL,
initialPlaylist: plt,
rp: d.rp,
onStreamTracks: d.onStreamTracks,
onSetLeadingTimeSync: d.onSetLeadingTimeSync,
onGetLeadingTimeSync: d.onGetLeadingTimeSync,
onData: d.onData,
}
d.rp.add(ds)
streamCount++

Expand All @@ -186,20 +165,20 @@ func (d *clientDownloaderPrimary) run(ctx context.Context) error {
return err
}

ds := newClientDownloaderStream(
true,
d.httpClient,
d.onDownloadStreamPlaylist,
d.onDownloadSegment,
d.onDecodeError,
u,
nil,
d.rp,
d.onStreamTracks,
d.onSetLeadingTimeSync,
d.onGetLeadingTimeSync,
d.onData,
)
ds := &clientDownloaderStream{
isLeading: true,
httpClient: d.httpClient,
onDownloadStreamPlaylist: d.onDownloadStreamPlaylist,
onDownloadSegment: d.onDownloadSegment,
onDecodeError: d.onDecodeError,
playlistURL: u,
initialPlaylist: nil,
rp: d.rp,
onStreamTracks: d.onStreamTracks,
onSetLeadingTimeSync: d.onSetLeadingTimeSync,
onGetLeadingTimeSync: d.onGetLeadingTimeSync,
onData: d.onData,
}

Check warning on line 181 in client_downloader_primary.go

View check run for this annotation

Codecov / codecov/patch

client_downloader_primary.go#L168-L181

Added lines #L168 - L181 were not covered by tests
d.rp.add(ds)
streamCount++

Expand All @@ -215,20 +194,20 @@ func (d *clientDownloaderPrimary) run(ctx context.Context) error {
return err
}

ds := newClientDownloaderStream(
false,
d.httpClient,
d.onDownloadStreamPlaylist,
d.onDownloadSegment,
d.onDecodeError,
u,
nil,
d.rp,
d.onStreamTracks,
d.onSetLeadingTimeSync,
d.onGetLeadingTimeSync,
d.onData,
)
ds := &clientDownloaderStream{
isLeading: false,
httpClient: d.httpClient,
onDownloadStreamPlaylist: d.onDownloadStreamPlaylist,
onDownloadSegment: d.onDownloadSegment,
onDecodeError: d.onDecodeError,
playlistURL: u,
initialPlaylist: nil,
rp: d.rp,
onStreamTracks: d.onStreamTracks,
onSetLeadingTimeSync: d.onSetLeadingTimeSync,
onGetLeadingTimeSync: d.onGetLeadingTimeSync,
onData: d.onData,
}

Check warning on line 210 in client_downloader_primary.go

View check run for this annotation

Codecov / codecov/patch

client_downloader_primary.go#L197-L210

Added lines #L197 - L210 were not covered by tests
d.rp.add(ds)
streamCount++
}
Expand Down
76 changes: 24 additions & 52 deletions client_downloader_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,36 +46,6 @@ type clientDownloaderStream struct {
curSegmentID *int
}

func newClientDownloaderStream(
isLeading bool,
httpClient *http.Client,
onDownloadStreamPlaylist ClientOnDownloadStreamPlaylistFunc,
onDownloadSegment ClientOnDownloadSegmentFunc,
onDecodeError ClientOnDecodeErrorFunc,
playlistURL *url.URL,
initialPlaylist *playlist.Media,
rp *clientRoutinePool,
onStreamTracks func(context.Context, []*Track) bool,
onSetLeadingTimeSync func(clientTimeSync),
onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool),
onData map[*Track]interface{},
) *clientDownloaderStream {
return &clientDownloaderStream{
isLeading: isLeading,
httpClient: httpClient,
onDownloadStreamPlaylist: onDownloadStreamPlaylist,
onDownloadSegment: onDownloadSegment,
onDecodeError: onDecodeError,
playlistURL: playlistURL,
initialPlaylist: initialPlaylist,
rp: rp,
onStreamTracks: onStreamTracks,
onSetLeadingTimeSync: onSetLeadingTimeSync,
onGetLeadingTimeSync: onGetLeadingTimeSync,
onData: onData,
}
}

func (d *clientDownloaderStream) run(ctx context.Context) error {
initialPlaylist := d.initialPlaylist
d.initialPlaylist = nil
Expand All @@ -87,7 +57,8 @@ func (d *clientDownloaderStream) run(ctx context.Context) error {
}
}

segmentQueue := newClientSegmentQueue()
segmentQueue := &clientSegmentQueue{}
segmentQueue.initialize()

if initialPlaylist.Map != nil && initialPlaylist.Map.URI != "" {
byts, err := d.downloadSegment(
Expand All @@ -99,33 +70,34 @@ func (d *clientDownloaderStream) run(ctx context.Context) error {
return err
}

proc, err := newClientProcessorFMP4(
ctx,
d.isLeading,
byts,
segmentQueue,
d.rp,
d.onStreamTracks,
d.onSetLeadingTimeSync,
d.onGetLeadingTimeSync,
d.onData,
)
proc := &clientProcessorFMP4{
ctx: ctx,
isLeading: d.isLeading,
initFile: byts,
segmentQueue: segmentQueue,
rp: d.rp,
onStreamTracks: d.onStreamTracks,
onSetLeadingTimeSync: d.onSetLeadingTimeSync,
onGetLeadingTimeSync: d.onGetLeadingTimeSync,
onData: d.onData,
}
err = proc.initialize()
if err != nil {
return err
}

d.rp.add(proc)
} else {
proc := newClientProcessorMPEGTS(
d.onDecodeError,
d.isLeading,
segmentQueue,
d.rp,
d.onStreamTracks,
d.onSetLeadingTimeSync,
d.onGetLeadingTimeSync,
d.onData,
)
proc := &clientProcessorMPEGTS{
onDecodeError: d.onDecodeError,
isLeading: d.isLeading,
segmentQueue: segmentQueue,
rp: d.rp,
onStreamTracks: d.onStreamTracks,
onSetLeadingTimeSync: d.onSetLeadingTimeSync,
onGetLeadingTimeSync: d.onGetLeadingTimeSync,
onData: d.onData,
}
d.rp.add(proc)
}

Expand Down
46 changes: 19 additions & 27 deletions client_processor_fmp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ func fmp4PickLeadingTrack(init *fmp4.Init) int {
}

type clientProcessorFMP4 struct {
ctx context.Context
isLeading bool
initFile []byte
segmentQueue *clientSegmentQueue
rp *clientRoutinePool
onStreamTracks func(context.Context, []*Track) bool
onSetLeadingTimeSync func(clientTimeSync)
onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool)
onData map[*Track]interface{}
Expand All @@ -39,30 +42,12 @@ type clientProcessorFMP4 struct {
subpartProcessed chan struct{}
}

func newClientProcessorFMP4(
ctx context.Context,
isLeading bool,
initFile []byte,
segmentQueue *clientSegmentQueue,
rp *clientRoutinePool,
onStreamTracks func(context.Context, []*Track) bool,
onSetLeadingTimeSync func(clientTimeSync),
onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool),
onData map[*Track]interface{},
) (*clientProcessorFMP4, error) {
p := &clientProcessorFMP4{
isLeading: isLeading,
segmentQueue: segmentQueue,
rp: rp,
onSetLeadingTimeSync: onSetLeadingTimeSync,
onGetLeadingTimeSync: onGetLeadingTimeSync,
onData: onData,
subpartProcessed: make(chan struct{}, clientFMP4MaxPartTracksPerSegment),
}
func (p *clientProcessorFMP4) initialize() error {
p.subpartProcessed = make(chan struct{}, clientFMP4MaxPartTracksPerSegment)

err := p.init.Unmarshal(initFile)
err := p.init.Unmarshal(p.initFile)
if err != nil {
return nil, err
return err

Check warning on line 50 in client_processor_fmp4.go

View check run for this annotation

Codecov / codecov/patch

client_processor_fmp4.go#L50

Added line #L50 was not covered by tests
}

p.leadingTrackID = fmp4PickLeadingTrack(&p.init)
Expand All @@ -74,12 +59,12 @@ func newClientProcessorFMP4(
}
}

ok := onStreamTracks(ctx, p.tracks)
ok := p.onStreamTracks(p.ctx, p.tracks)
if !ok {
return nil, fmt.Errorf("terminated")
return fmt.Errorf("terminated")

Check warning on line 64 in client_processor_fmp4.go

View check run for this annotation

Codecov / codecov/patch

client_processor_fmp4.go#L64

Added line #L64 was not covered by tests
}

return p, nil
return nil
}

func (p *clientProcessorFMP4) run(ctx context.Context) error {
Expand Down Expand Up @@ -172,7 +157,13 @@ func (p *clientProcessorFMP4) initializeTrackProcs(ctx context.Context, track *f
}
return 0
}()
timeSync = newClientTimeSyncFMP4(timeScale, track.BaseTime)

timeSync = &clientTimeSyncFMP4{
timeScale: timeScale,
baseTime: track.BaseTime,
}
timeSync.initialize()

p.onSetLeadingTimeSync(timeSync)
} else {
rawTS, ok := p.onGetLeadingTimeSync(ctx)
Expand Down Expand Up @@ -288,7 +279,8 @@ func (p *clientProcessorFMP4) initializeTrackProcs(ctx context.Context, track *f
return nil
}

trackProc := newClientTrackProcessor()
trackProc := &clientTrackProcessor{}
trackProc.initialize()
p.rp.add(trackProc)

prePreProcess := func(ctx context.Context, partTrack *fmp4.PartTrack) error {
Expand Down
Loading

0 comments on commit b27a737

Please sign in to comment.