Skip to content

Commit

Permalink
muxer: isolate components
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Dec 25, 2024
1 parent d2288ae commit 5010219
Show file tree
Hide file tree
Showing 8 changed files with 553 additions and 442 deletions.
293 changes: 233 additions & 60 deletions muxer.go

Large diffs are not rendered by default.

29 changes: 20 additions & 9 deletions muxer_part.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gohlslib

import (
"fmt"
"io"
"time"

Expand All @@ -9,20 +10,22 @@ import (
)

type muxerPart struct {
stream *muxerStream
segment *muxerSegmentFMP4
startDTS time.Duration
prefix string
id uint64
storage storage.Part
segmentMaxSize uint64
streamID string
streamTracks []*muxerTrack
segment *muxerSegmentFMP4
startDTS time.Duration
prefix string
id uint64
storage storage.Part

path string
isIndependent bool
endDTS time.Duration
}

func (p *muxerPart) initialize() {
p.path = partPath(p.prefix, p.stream.id, p.id)
p.path = partPath(p.prefix, p.streamID, p.id)
}

func (p *muxerPart) reader() (io.ReadCloser, error) {
Expand All @@ -38,7 +41,7 @@ func (p *muxerPart) finalize(endDTS time.Duration) error {
SequenceNumber: uint32(p.id),
}

for i, track := range p.stream.tracks {
for i, track := range p.streamTracks {
if track.fmp4Samples != nil {
part.Tracks = append(part.Tracks, &fmp4.PartTrack{
ID: 1 + i,
Expand All @@ -60,7 +63,13 @@ func (p *muxerPart) finalize(endDTS time.Duration) error {
return nil
}

func (p *muxerPart) writeSample(track *muxerTrack, sample *fmp4AugmentedSample) {
func (p *muxerPart) writeSample(track *muxerTrack, sample *fmp4AugmentedSample) error {
size := uint64(len(sample.Payload))
if (p.segment.size + size) > p.segmentMaxSize {
return fmt.Errorf("reached maximum segment size")
}

Check warning on line 70 in muxer_part.go

View check run for this annotation

Codecov / codecov/patch

muxer_part.go#L69-L70

Added lines #L69 - L70 were not covered by tests
p.segment.size += size

if track.fmp4Samples == nil {
track.fmp4StartDTS = sample.dts
}
Expand All @@ -70,4 +79,6 @@ func (p *muxerPart) writeSample(track *muxerTrack, sample *fmp4AugmentedSample)
}

track.fmp4Samples = append(track.fmp4Samples, &sample.PartSample)

return nil
}
5 changes: 0 additions & 5 deletions muxer_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
)

type muxerSegment interface {
initialize() error
close()
finalize(time.Duration) error
getPath() string
Expand All @@ -21,10 +20,6 @@ type muxerGap struct {
duration time.Duration
}

func (muxerGap) initialize() error {
return nil
}

func (muxerGap) close() {
}

Expand Down
33 changes: 2 additions & 31 deletions muxer_segment_fmp4.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
package gohlslib

import (
"fmt"
"io"
"time"

"github.com/bluenviron/gohlslib/v2/pkg/storage"
)

type muxerSegmentFMP4 struct {
variant MuxerVariant
segmentMaxSize uint64
prefix string
nextPartID uint64
storageFactory storage.Factory
stream *muxerStream
streamID string
id uint64
startNTP time.Time
startDTS time.Duration
Expand All @@ -28,24 +24,14 @@ type muxerSegmentFMP4 struct {
}

func (s *muxerSegmentFMP4) initialize() error {
s.path = segmentPath(s.prefix, s.stream.id, s.id, true)
s.path = segmentPath(s.prefix, s.streamID, s.id, true)

var err error
s.storage, err = s.storageFactory.NewFile(s.path)
if err != nil {
return err
}

s.stream.nextPart = &muxerPart{
stream: s.stream,
segment: s,
startDTS: s.startDTS,
prefix: s.prefix,
id: s.nextPartID,
storage: s.storage.NewPart(),
}
s.stream.nextPart.initialize()

return nil
}

Expand Down Expand Up @@ -80,18 +66,3 @@ func (s *muxerSegmentFMP4) finalize(nextDTS time.Duration) error {

return nil
}

func (s *muxerSegmentFMP4) writeSample(
track *muxerTrack,
sample *fmp4AugmentedSample,
) error {
size := uint64(len(sample.Payload))
if (s.size + size) > s.segmentMaxSize {
return fmt.Errorf("reached maximum segment size")
}
s.size += size

s.stream.nextPart.writeSample(track, sample)

return nil
}
12 changes: 6 additions & 6 deletions muxer_segment_mpegts.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import (
"time"

"github.com/bluenviron/gohlslib/v2/pkg/storage"
"github.com/bluenviron/mediacommon/pkg/formats/mpegts"
)

type muxerSegmentMPEGTS struct {
segmentMaxSize uint64
prefix string
storageFactory storage.Factory
stream *muxerStream
streamID string
mpegtsWriter *mpegts.Writer
id uint64
startNTP time.Time
startDTS time.Duration
Expand All @@ -28,7 +30,7 @@ type muxerSegmentMPEGTS struct {
}

func (s *muxerSegmentMPEGTS) initialize() error {
s.path = segmentPath(s.prefix, s.stream.id, s.id, false)
s.path = segmentPath(s.prefix, s.streamID, s.id, false)

var err error
s.storage, err = s.storageFactory.NewFile(s.path)
Expand All @@ -39,8 +41,6 @@ func (s *muxerSegmentMPEGTS) initialize() error {
s.storagePart = s.storage.NewPart()
s.bw = bufio.NewWriter(s.storagePart.Writer())

s.stream.mpegtsSwitchableWriter.w = s.bw

return nil
}

Expand Down Expand Up @@ -96,7 +96,7 @@ func (s *muxerSegmentMPEGTS) writeH264(
}
s.size += size

err := s.stream.mpegtsWriter.WriteH264(
err := s.mpegtsWriter.WriteH264(
track.mpegtsTrack,
multiplyAndDivide(pts, 90000, int64(track.ClockRate)),
multiplyAndDivide(dts, 90000, int64(track.ClockRate)),
Expand Down Expand Up @@ -127,7 +127,7 @@ func (s *muxerSegmentMPEGTS) writeMPEG4Audio(
}
s.size += size

err := s.stream.mpegtsWriter.WriteMPEG4Audio(
err := s.mpegtsWriter.WriteMPEG4Audio(
track.mpegtsTrack,
multiplyAndDivide(pts, 90000, int64(track.ClockRate)),
aus,
Expand Down
45 changes: 27 additions & 18 deletions muxer_segmenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,17 @@ type fmp4AugmentedSample struct {
ntp time.Time
}

type muxerSegmenterParent interface {
createFirstSegment(nextDTS time.Duration, nextNTP time.Time) error
rotateSegments(nextDTS time.Duration, nextNTP time.Time, force bool) error
rotateParts(nextDTS time.Duration) error
}

type muxerSegmenter struct {
muxer *Muxer // TODO: remove
variant MuxerVariant
segmentMinDuration time.Duration
partMinDuration time.Duration
parent muxerSegmenterParent

pendingParamsChange bool
fmp4SampleDurations map[time.Duration]struct{} // low-latency only
Expand All @@ -87,7 +96,7 @@ type muxerSegmenter struct {
}

func (s *muxerSegmenter) initialize() {
if s.muxer.Variant != MuxerVariantMPEGTS {
if s.variant != MuxerVariantMPEGTS {
s.fmp4SampleDurations = make(map[time.Duration]struct{})
}
}
Expand Down Expand Up @@ -349,17 +358,17 @@ func (s *muxerSegmenter) writeH264(
return fmt.Errorf("unable to extract DTS: %w", err)
}

if s.muxer.Variant == MuxerVariantMPEGTS {
if s.variant == MuxerVariantMPEGTS {
if track.stream.nextSegment == nil {
err = s.muxer.createFirstSegment(timestampToDuration(dts, track.ClockRate), ntp)
err = s.parent.createFirstSegment(timestampToDuration(dts, track.ClockRate), ntp)
if err != nil {
return err
}
} else if randomAccess && // switch segment
((timestampToDuration(dts, track.ClockRate)-
track.stream.nextSegment.(*muxerSegmentMPEGTS).startDTS) >= s.muxer.SegmentMinDuration ||
track.stream.nextSegment.(*muxerSegmentMPEGTS).startDTS) >= s.segmentMinDuration ||
paramsChanged) {
err = s.muxer.rotateSegments(timestampToDuration(dts, track.ClockRate), ntp, false)
err = s.parent.rotateSegments(timestampToDuration(dts, track.ClockRate), ntp, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -435,17 +444,17 @@ func (s *muxerSegmenter) writeMPEG4Audio(
pts int64,
aus [][]byte,
) error {
if s.muxer.Variant == MuxerVariantMPEGTS {
if s.variant == MuxerVariantMPEGTS {
if track.isLeading {
if track.stream.nextSegment == nil {
err := s.muxer.createFirstSegment(timestampToDuration(pts, track.ClockRate), ntp)
err := s.parent.createFirstSegment(timestampToDuration(pts, track.ClockRate), ntp)
if err != nil {
return err
}
} else if track.stream.nextSegment.(*muxerSegmentMPEGTS).audioAUCount >= mpegtsSegmentMinAUCount && // switch segment
(timestampToDuration(pts, track.ClockRate)-
track.stream.nextSegment.(*muxerSegmentMPEGTS).startDTS) >= s.muxer.SegmentMinDuration {
err := s.muxer.rotateSegments(timestampToDuration(pts, track.ClockRate), ntp, false)
track.stream.nextSegment.(*muxerSegmentMPEGTS).startDTS) >= s.segmentMinDuration {
err := s.parent.rotateSegments(timestampToDuration(pts, track.ClockRate), ntp, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -496,7 +505,7 @@ func (s *muxerSegmenter) writeMPEG4Audio(
// iPhone iOS fails if part durations are less than 85% of maximum part duration.
// find a part duration that is compatible with all sample durations
func (s *muxerSegmenter) fmp4AdjustPartDuration(sampleDuration time.Duration) {
if s.muxer.Variant != MuxerVariantLowLatency || s.fmp4FreezeAdjustedPartDuration {
if s.variant != MuxerVariantLowLatency || s.fmp4FreezeAdjustedPartDuration {
return
}

Expand All @@ -508,7 +517,7 @@ func (s *muxerSegmenter) fmp4AdjustPartDuration(sampleDuration time.Duration) {
if _, ok := s.fmp4SampleDurations[sampleDuration]; !ok {
s.fmp4SampleDurations[sampleDuration] = struct{}{}
s.fmp4AdjustedPartDuration = findCompatiblePartDuration(
s.muxer.PartMinDuration,
s.partMinDuration,
s.fmp4SampleDurations,
)
}
Expand Down Expand Up @@ -539,7 +548,7 @@ func (s *muxerSegmenter) fmp4WriteSample(
if track.isLeading {
// create first segment
if track.stream.nextSegment == nil {
err := s.muxer.createFirstSegment(timestampToDuration(sample.dts, track.ClockRate), sample.ntp)
err := s.parent.createFirstSegment(timestampToDuration(sample.dts, track.ClockRate), sample.ntp)
if err != nil {
return err
}
Expand All @@ -555,7 +564,7 @@ func (s *muxerSegmenter) fmp4WriteSample(
s.fmp4AdjustPartDuration(timestampToDuration(duration, track.ClockRate))
}

err := track.stream.nextSegment.(*muxerSegmentFMP4).writeSample(
err := track.stream.nextPart.writeSample(
track,
sample,
)
Expand All @@ -567,8 +576,8 @@ func (s *muxerSegmenter) fmp4WriteSample(
// switch segment
if randomAccess && (paramsChanged ||
(timestampToDuration(track.fmp4NextSample.dts, track.ClockRate)-
track.stream.nextSegment.(*muxerSegmentFMP4).startDTS) >= s.muxer.SegmentMinDuration) {
err = s.muxer.rotateSegments(timestampToDuration(track.fmp4NextSample.dts, track.ClockRate),
track.stream.nextSegment.(*muxerSegmentFMP4).startDTS) >= s.segmentMinDuration) {
err = s.parent.rotateSegments(timestampToDuration(track.fmp4NextSample.dts, track.ClockRate),
track.fmp4NextSample.ntp, paramsChanged)
if err != nil {
return err
Expand All @@ -583,10 +592,10 @@ func (s *muxerSegmenter) fmp4WriteSample(
}

// switch part
} else if (s.muxer.Variant == MuxerVariantLowLatency) &&
} else if (s.variant == MuxerVariantLowLatency) &&
(timestampToDuration(track.fmp4NextSample.dts, track.ClockRate)-
track.stream.nextPart.startDTS) >= s.fmp4AdjustedPartDuration {
err := s.muxer.rotateParts(timestampToDuration(track.fmp4NextSample.dts, track.ClockRate))
err := s.parent.rotateParts(timestampToDuration(track.fmp4NextSample.dts, track.ClockRate))
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 5010219

Please sign in to comment.