diff --git a/muxer.go b/muxer.go index a646410..592ee1d 100644 --- a/muxer.go +++ b/muxer.go @@ -7,25 +7,81 @@ import ( "io" "log" "net/http" + "net/url" "strconv" "sync" "time" "github.com/bluenviron/gohlslib/v2/pkg/codecs" + "github.com/bluenviron/gohlslib/v2/pkg/playlist" "github.com/bluenviron/gohlslib/v2/pkg/storage" ) const ( - fmp4StartDTS = 10 * time.Second - mpegtsSegmentMinAUCount = 100 + fmp4StartDTS = 10 * time.Second + mpegtsSegmentMinAUCount = 100 + multivariantPlaylistMaxAge = "30" + initMaxAge = "30" + segmentMaxAge = "3600" ) -type switchableWriter struct { - w io.Writer +func boolPtr(v bool) *bool { + return &v } -func (w *switchableWriter) Write(p []byte) (int, error) { - return w.w.Write(p) +func parseMSNPart(msn string, part string) (uint64, uint64, error) { + var msnint uint64 + if msn != "" { + var err error + msnint, err = strconv.ParseUint(msn, 10, 64) + if err != nil { + return 0, 0, err + } + } + + var partint uint64 + if part != "" { + var err error + partint, err = strconv.ParseUint(part, 10, 64) + if err != nil { + return 0, 0, err + } + } + + return msnint, partint, nil +} + +func bandwidth(segments []muxerSegment) (int, int) { + if len(segments) == 0 { + return 0, 0 + } + + var maxBandwidth uint64 + var sizes uint64 + var durations time.Duration + + for _, seg := range segments { + if _, ok := seg.(*muxerGap); !ok { + bandwidth := 8 * seg.getSize() * uint64(time.Second) / uint64(seg.getDuration()) + if bandwidth > maxBandwidth { + maxBandwidth = bandwidth + } + sizes += seg.getSize() + durations += seg.getDuration() + } + } + + averageBandwidth := 8 * sizes * uint64(time.Second) / uint64(durations) + + return int(maxBandwidth), int(averageBandwidth) +} + +func queryVal(q url.Values, key string) string { + vals, ok := q[key] + if ok && len(vals) >= 1 { + return vals[0] + } + return "" } func isVideo(codec codecs.Codec) bool { @@ -79,6 +135,14 @@ func fmp4TimeScale(c codecs.Codec) uint32 { return 90000 } +type switchableWriter struct { + w io.Writer +} + +func (w *switchableWriter) Write(p []byte) (int, error) { + return w.w.Write(p) +} + // MuxerOnEncodeErrorFunc is the prototype of Muxer.OnEncodeError. type MuxerOnEncodeErrorFunc func(err error) @@ -127,21 +191,19 @@ type Muxer struct { // private // - mutex sync.Mutex - cond *sync.Cond - mtracks []*muxerTrack - mtracksByTrack map[*Track]*muxerTrack - streams []*muxerStream - prefix string - storageFactory storage.Factory - segmenter *muxerSegmenter - server *muxerServer - closed bool - nextSegmentID uint64 - nextPartID uint64 // low-latency only - segmentDeleteCount int - targetDuration int - partTargetDuration time.Duration + mutex sync.Mutex + cond *sync.Cond + mtracks []*muxerTrack + mtracksByTrack map[*Track]*muxerTrack + streams []*muxerStream + leadingStream *muxerStream + prefix string + storageFactory storage.Factory + segmenter *muxerSegmenter + server *muxerServer + closed bool + nextSegmentID uint64 + nextPartID uint64 // low-latency only } // Start initializes the muxer. @@ -236,15 +298,18 @@ func (m *Muxer) Start() error { m.mtracksByTrack = make(map[*Track]*muxerTrack) m.segmenter = &muxerSegmenter{ - muxer: m, + variant: m.Variant, + segmentMinDuration: m.SegmentMinDuration, + partMinDuration: m.PartMinDuration, + parent: m, } m.segmenter.initialize() - m.server = &muxerServer{ - muxer: m, - } + m.server = &muxerServer{} m.server.initialize() + m.server.registerPath("index.m3u8", m.handleMultivariantPlaylist) + for i, track := range m.Tracks { mtrack := &muxerTrack{ Track: track, @@ -261,13 +326,35 @@ func (m *Muxer) Start() error { m.nextSegmentID = 7 } + var err error + m.prefix, err = generatePrefix() + if err != nil { + return err + } + + if m.Directory != "" { + m.storageFactory = storage.NewFactoryDisk(m.Directory) + } else { + m.storageFactory = storage.NewFactoryRAM() + } + switch { case m.Variant == MuxerVariantMPEGTS: stream := &muxerStream{ - muxer: m, - tracks: m.mtracks, - id: "main", - isLeading: true, + isLeading: true, + variant: m.Variant, + segmentMaxSize: m.SegmentMaxSize, + segmentCount: m.SegmentCount, + onEncodeError: m.OnEncodeError, + mutex: &m.mutex, + cond: m.cond, + prefix: m.prefix, + storageFactory: m.storageFactory, + server: m.server, + tracks: m.mtracks, + id: "main", + nextSegmentID: m.nextSegmentID, + nextPartID: m.nextPartID, } stream.initialize() m.streams = append(m.streams, stream) @@ -305,31 +392,38 @@ func (m *Muxer) Start() error { } stream := &muxerStream{ - muxer: m, - tracks: []*muxerTrack{track}, - id: id, - isLeading: track.isLeading, - isRendition: isRendition, - name: name, - language: track.Language, - isDefault: isDefault, + variant: m.Variant, + segmentMaxSize: m.SegmentMaxSize, + segmentCount: m.SegmentCount, + onEncodeError: m.OnEncodeError, + mutex: &m.mutex, + cond: m.cond, + prefix: m.prefix, + storageFactory: m.storageFactory, + server: m.server, + tracks: []*muxerTrack{track}, + id: id, + isLeading: track.isLeading, + isRendition: isRendition, + name: name, + language: track.Language, + isDefault: isDefault, + nextSegmentID: m.nextSegmentID, + nextPartID: m.nextPartID, } stream.initialize() m.streams = append(m.streams, stream) } } - var err error - m.prefix, err = generatePrefix() - if err != nil { - return err - } - - if m.Directory != "" { - m.storageFactory = storage.NewFactoryDisk(m.Directory) - } else { - m.storageFactory = storage.NewFactoryRAM() - } + m.leadingStream = func() *muxerStream { + for _, stream := range m.streams { + if stream.isLeading { + return stream + } + } + return nil + }() return nil } @@ -414,7 +508,7 @@ func (m *Muxer) Handle(w http.ResponseWriter, r *http.Request) { func (m *Muxer) createFirstSegment(nextDTS time.Duration, nextNTP time.Time) error { for _, stream := range m.streams { - err := stream.createFirstSegment(nextDTS, nextNTP) + err := stream.createFirstSegment(nextDTS, nextNTP, m.nextSegmentID, m.nextPartID) if err != nil { return err } @@ -425,7 +519,7 @@ func (m *Muxer) createFirstSegment(nextDTS time.Duration, nextNTP time.Time) err func (m *Muxer) rotateParts(nextDTS time.Duration) error { m.mutex.Lock() - err := m.rotatePartsInner(nextDTS, true) + err := m.rotatePartsInner(nextDTS) m.mutex.Unlock() if err != nil { @@ -437,13 +531,21 @@ func (m *Muxer) rotateParts(nextDTS time.Duration) error { return nil } -func (m *Muxer) rotatePartsInner(nextDTS time.Duration, createNew bool) error { +func (m *Muxer) rotatePartsInner(nextDTS time.Duration) error { m.nextPartID++ + err := m.leadingStream.rotateParts(nextDTS, true, m.nextPartID) + if err != nil { + return err + } + for _, stream := range m.streams { - err := stream.rotateParts(nextDTS, createNew) - if err != nil { - return err + if !stream.isLeading { + err := stream.rotateParts(nextDTS, true, m.nextPartID) + if err != nil { + return err + } + stream.partTargetDuration = m.leadingStream.partTargetDuration } } @@ -474,20 +576,91 @@ func (m *Muxer) rotateSegmentsInner( force bool, ) error { if m.Variant != MuxerVariantMPEGTS { - err := m.rotatePartsInner(nextDTS, false) + m.nextPartID++ + } + m.nextSegmentID++ + + err := m.leadingStream.rotateSegments(nextDTS, nextNTP, force, m.nextSegmentID, m.nextPartID) + if err != nil { + return err + } + + for _, stream := range m.streams { + if !stream.isLeading { + err := stream.rotateSegments(nextDTS, nextNTP, force, m.nextSegmentID, m.nextPartID) + if err != nil { + return err + } + stream.targetDuration = m.leadingStream.targetDuration + stream.partTargetDuration = m.leadingStream.partTargetDuration + } + } + + return nil +} + +func (m *Muxer) handleMultivariantPlaylist(w http.ResponseWriter, r *http.Request) { + buf := func() []byte { + m.mutex.Lock() + defer m.mutex.Unlock() + + for { + if m.closed { + return nil + } + + if m.streams[0].hasContent() { + break + } + + m.cond.Wait() + } + + buf, err := m.generateMultivariantPlaylist(r.URL.RawQuery) if err != nil { - return err + return nil } + + return buf + }() + + if buf == nil { + w.WriteHeader(http.StatusInternalServerError) + return } - m.nextSegmentID++ + // allow caching but use a small period in order to + // allow a stream to change tracks or bitrate + w.Header().Set("Cache-Control", "max-age="+multivariantPlaylistMaxAge) + w.Header().Set("Content-Type", `application/vnd.apple.mpegurl`) + w.WriteHeader(http.StatusOK) + w.Write(buf) +} + +func (m *Muxer) generateMultivariantPlaylist(rawQuery string) ([]byte, error) { + // TODO: consider segments in all streams + maxBandwidth, averageBandwidth := bandwidth(m.streams[0].segments) + + pl := &playlist.Multivariant{ + Version: func() int { + if m.Variant == MuxerVariantMPEGTS { + return 3 + } + return 9 + }(), + IndependentSegments: true, + Variants: []*playlist.MultivariantVariant{{ + Bandwidth: maxBandwidth, + AverageBandwidth: &averageBandwidth, + }}, + } for _, stream := range m.streams { - err := stream.rotateSegments(nextDTS, nextNTP, force) + err := stream.populateMultivariantPlaylist(pl, rawQuery) if err != nil { - return err + return nil, err } } - return nil + return pl.Marshal() } diff --git a/muxer_part.go b/muxer_part.go index 7f9d84c..28feef7 100644 --- a/muxer_part.go +++ b/muxer_part.go @@ -1,6 +1,7 @@ package gohlslib import ( + "fmt" "io" "time" @@ -9,12 +10,14 @@ import ( ) type muxerPart struct { - stream *muxerStream - segment *muxerSegmentFMP4 - startDTS time.Duration - prefix string - id uint64 - storage storage.Part + segmentMaxSize uint64 + streamID string + streamTracks []*muxerTrack + segment *muxerSegmentFMP4 + startDTS time.Duration + prefix string + id uint64 + storage storage.Part path string isIndependent bool @@ -22,7 +25,7 @@ type muxerPart struct { } func (p *muxerPart) initialize() { - p.path = partPath(p.prefix, p.stream.id, p.id) + p.path = partPath(p.prefix, p.streamID, p.id) } func (p *muxerPart) reader() (io.ReadCloser, error) { @@ -38,7 +41,7 @@ func (p *muxerPart) finalize(endDTS time.Duration) error { SequenceNumber: uint32(p.id), } - for i, track := range p.stream.tracks { + for i, track := range p.streamTracks { if track.fmp4Samples != nil { part.Tracks = append(part.Tracks, &fmp4.PartTrack{ ID: 1 + i, @@ -60,7 +63,13 @@ func (p *muxerPart) finalize(endDTS time.Duration) error { return nil } -func (p *muxerPart) writeSample(track *muxerTrack, sample *fmp4AugmentedSample) { +func (p *muxerPart) writeSample(track *muxerTrack, sample *fmp4AugmentedSample) error { + size := uint64(len(sample.Payload)) + if (p.segment.size + size) > p.segmentMaxSize { + return fmt.Errorf("reached maximum segment size") + } + p.segment.size += size + if track.fmp4Samples == nil { track.fmp4StartDTS = sample.dts } @@ -70,4 +79,6 @@ func (p *muxerPart) writeSample(track *muxerTrack, sample *fmp4AugmentedSample) } track.fmp4Samples = append(track.fmp4Samples, &sample.PartSample) + + return nil } diff --git a/muxer_segment.go b/muxer_segment.go index 26d05b8..eb26d16 100644 --- a/muxer_segment.go +++ b/muxer_segment.go @@ -7,7 +7,6 @@ import ( ) type muxerSegment interface { - initialize() error close() finalize(time.Duration) error getPath() string @@ -21,10 +20,6 @@ type muxerGap struct { duration time.Duration } -func (muxerGap) initialize() error { - return nil -} - func (muxerGap) close() { } diff --git a/muxer_segment_fmp4.go b/muxer_segment_fmp4.go index cde5ca9..9b5ddf5 100644 --- a/muxer_segment_fmp4.go +++ b/muxer_segment_fmp4.go @@ -1,7 +1,6 @@ package gohlslib import ( - "fmt" "io" "time" @@ -9,12 +8,9 @@ import ( ) type muxerSegmentFMP4 struct { - variant MuxerVariant - segmentMaxSize uint64 prefix string - nextPartID uint64 storageFactory storage.Factory - stream *muxerStream + streamID string id uint64 startNTP time.Time startDTS time.Duration @@ -28,7 +24,7 @@ type muxerSegmentFMP4 struct { } func (s *muxerSegmentFMP4) initialize() error { - s.path = segmentPath(s.prefix, s.stream.id, s.id, true) + s.path = segmentPath(s.prefix, s.streamID, s.id, true) var err error s.storage, err = s.storageFactory.NewFile(s.path) @@ -36,16 +32,6 @@ func (s *muxerSegmentFMP4) initialize() error { return err } - s.stream.nextPart = &muxerPart{ - stream: s.stream, - segment: s, - startDTS: s.startDTS, - prefix: s.prefix, - id: s.nextPartID, - storage: s.storage.NewPart(), - } - s.stream.nextPart.initialize() - return nil } @@ -80,18 +66,3 @@ func (s *muxerSegmentFMP4) finalize(nextDTS time.Duration) error { return nil } - -func (s *muxerSegmentFMP4) writeSample( - track *muxerTrack, - sample *fmp4AugmentedSample, -) error { - size := uint64(len(sample.Payload)) - if (s.size + size) > s.segmentMaxSize { - return fmt.Errorf("reached maximum segment size") - } - s.size += size - - s.stream.nextPart.writeSample(track, sample) - - return nil -} diff --git a/muxer_segment_mpegts.go b/muxer_segment_mpegts.go index 0a3cfa3..fa34365 100644 --- a/muxer_segment_mpegts.go +++ b/muxer_segment_mpegts.go @@ -7,13 +7,15 @@ import ( "time" "github.com/bluenviron/gohlslib/v2/pkg/storage" + "github.com/bluenviron/mediacommon/pkg/formats/mpegts" ) type muxerSegmentMPEGTS struct { segmentMaxSize uint64 prefix string storageFactory storage.Factory - stream *muxerStream + streamID string + mpegtsWriter *mpegts.Writer id uint64 startNTP time.Time startDTS time.Duration @@ -28,7 +30,7 @@ type muxerSegmentMPEGTS struct { } func (s *muxerSegmentMPEGTS) initialize() error { - s.path = segmentPath(s.prefix, s.stream.id, s.id, false) + s.path = segmentPath(s.prefix, s.streamID, s.id, false) var err error s.storage, err = s.storageFactory.NewFile(s.path) @@ -39,8 +41,6 @@ func (s *muxerSegmentMPEGTS) initialize() error { s.storagePart = s.storage.NewPart() s.bw = bufio.NewWriter(s.storagePart.Writer()) - s.stream.mpegtsSwitchableWriter.w = s.bw - return nil } @@ -96,7 +96,7 @@ func (s *muxerSegmentMPEGTS) writeH264( } s.size += size - err := s.stream.mpegtsWriter.WriteH264( + err := s.mpegtsWriter.WriteH264( track.mpegtsTrack, multiplyAndDivide(pts, 90000, int64(track.ClockRate)), multiplyAndDivide(dts, 90000, int64(track.ClockRate)), @@ -127,7 +127,7 @@ func (s *muxerSegmentMPEGTS) writeMPEG4Audio( } s.size += size - err := s.stream.mpegtsWriter.WriteMPEG4Audio( + err := s.mpegtsWriter.WriteMPEG4Audio( track.mpegtsTrack, multiplyAndDivide(pts, 90000, int64(track.ClockRate)), aus, diff --git a/muxer_segmenter.go b/muxer_segmenter.go index fc54aaf..8591020 100644 --- a/muxer_segmenter.go +++ b/muxer_segmenter.go @@ -77,8 +77,17 @@ type fmp4AugmentedSample struct { ntp time.Time } +type muxerSegmenterParent interface { + createFirstSegment(nextDTS time.Duration, nextNTP time.Time) error + rotateSegments(nextDTS time.Duration, nextNTP time.Time, force bool) error + rotateParts(nextDTS time.Duration) error +} + type muxerSegmenter struct { - muxer *Muxer // TODO: remove + variant MuxerVariant + segmentMinDuration time.Duration + partMinDuration time.Duration + parent muxerSegmenterParent pendingParamsChange bool fmp4SampleDurations map[time.Duration]struct{} // low-latency only @@ -87,7 +96,7 @@ type muxerSegmenter struct { } func (s *muxerSegmenter) initialize() { - if s.muxer.Variant != MuxerVariantMPEGTS { + if s.variant != MuxerVariantMPEGTS { s.fmp4SampleDurations = make(map[time.Duration]struct{}) } } @@ -349,17 +358,17 @@ func (s *muxerSegmenter) writeH264( return fmt.Errorf("unable to extract DTS: %w", err) } - if s.muxer.Variant == MuxerVariantMPEGTS { + if s.variant == MuxerVariantMPEGTS { if track.stream.nextSegment == nil { - err = s.muxer.createFirstSegment(timestampToDuration(dts, track.ClockRate), ntp) + err = s.parent.createFirstSegment(timestampToDuration(dts, track.ClockRate), ntp) if err != nil { return err } } else if randomAccess && // switch segment ((timestampToDuration(dts, track.ClockRate)- - track.stream.nextSegment.(*muxerSegmentMPEGTS).startDTS) >= s.muxer.SegmentMinDuration || + track.stream.nextSegment.(*muxerSegmentMPEGTS).startDTS) >= s.segmentMinDuration || paramsChanged) { - err = s.muxer.rotateSegments(timestampToDuration(dts, track.ClockRate), ntp, false) + err = s.parent.rotateSegments(timestampToDuration(dts, track.ClockRate), ntp, false) if err != nil { return err } @@ -435,17 +444,17 @@ func (s *muxerSegmenter) writeMPEG4Audio( pts int64, aus [][]byte, ) error { - if s.muxer.Variant == MuxerVariantMPEGTS { + if s.variant == MuxerVariantMPEGTS { if track.isLeading { if track.stream.nextSegment == nil { - err := s.muxer.createFirstSegment(timestampToDuration(pts, track.ClockRate), ntp) + err := s.parent.createFirstSegment(timestampToDuration(pts, track.ClockRate), ntp) if err != nil { return err } } else if track.stream.nextSegment.(*muxerSegmentMPEGTS).audioAUCount >= mpegtsSegmentMinAUCount && // switch segment (timestampToDuration(pts, track.ClockRate)- - track.stream.nextSegment.(*muxerSegmentMPEGTS).startDTS) >= s.muxer.SegmentMinDuration { - err := s.muxer.rotateSegments(timestampToDuration(pts, track.ClockRate), ntp, false) + track.stream.nextSegment.(*muxerSegmentMPEGTS).startDTS) >= s.segmentMinDuration { + err := s.parent.rotateSegments(timestampToDuration(pts, track.ClockRate), ntp, false) if err != nil { return err } @@ -496,7 +505,7 @@ func (s *muxerSegmenter) writeMPEG4Audio( // iPhone iOS fails if part durations are less than 85% of maximum part duration. // find a part duration that is compatible with all sample durations func (s *muxerSegmenter) fmp4AdjustPartDuration(sampleDuration time.Duration) { - if s.muxer.Variant != MuxerVariantLowLatency || s.fmp4FreezeAdjustedPartDuration { + if s.variant != MuxerVariantLowLatency || s.fmp4FreezeAdjustedPartDuration { return } @@ -508,7 +517,7 @@ func (s *muxerSegmenter) fmp4AdjustPartDuration(sampleDuration time.Duration) { if _, ok := s.fmp4SampleDurations[sampleDuration]; !ok { s.fmp4SampleDurations[sampleDuration] = struct{}{} s.fmp4AdjustedPartDuration = findCompatiblePartDuration( - s.muxer.PartMinDuration, + s.partMinDuration, s.fmp4SampleDurations, ) } @@ -539,7 +548,7 @@ func (s *muxerSegmenter) fmp4WriteSample( if track.isLeading { // create first segment if track.stream.nextSegment == nil { - err := s.muxer.createFirstSegment(timestampToDuration(sample.dts, track.ClockRate), sample.ntp) + err := s.parent.createFirstSegment(timestampToDuration(sample.dts, track.ClockRate), sample.ntp) if err != nil { return err } @@ -555,7 +564,7 @@ func (s *muxerSegmenter) fmp4WriteSample( s.fmp4AdjustPartDuration(timestampToDuration(duration, track.ClockRate)) } - err := track.stream.nextSegment.(*muxerSegmentFMP4).writeSample( + err := track.stream.nextPart.writeSample( track, sample, ) @@ -567,8 +576,8 @@ func (s *muxerSegmenter) fmp4WriteSample( // switch segment if randomAccess && (paramsChanged || (timestampToDuration(track.fmp4NextSample.dts, track.ClockRate)- - track.stream.nextSegment.(*muxerSegmentFMP4).startDTS) >= s.muxer.SegmentMinDuration) { - err = s.muxer.rotateSegments(timestampToDuration(track.fmp4NextSample.dts, track.ClockRate), + track.stream.nextSegment.(*muxerSegmentFMP4).startDTS) >= s.segmentMinDuration) { + err = s.parent.rotateSegments(timestampToDuration(track.fmp4NextSample.dts, track.ClockRate), track.fmp4NextSample.ntp, paramsChanged) if err != nil { return err @@ -583,10 +592,10 @@ func (s *muxerSegmenter) fmp4WriteSample( } // switch part - } else if (s.muxer.Variant == MuxerVariantLowLatency) && + } else if (s.variant == MuxerVariantLowLatency) && (timestampToDuration(track.fmp4NextSample.dts, track.ClockRate)- track.stream.nextPart.startDTS) >= s.fmp4AdjustedPartDuration { - err := s.muxer.rotateParts(timestampToDuration(track.fmp4NextSample.dts, track.ClockRate)) + err := s.parent.rotateParts(timestampToDuration(track.fmp4NextSample.dts, track.ClockRate)) if err != nil { return err } diff --git a/muxer_server.go b/muxer_server.go index d115b76..285df2c 100644 --- a/muxer_server.go +++ b/muxer_server.go @@ -2,165 +2,45 @@ package gohlslib import ( "net/http" - "net/url" "path/filepath" - "strconv" - "time" - - "github.com/bluenviron/gohlslib/v2/pkg/playlist" -) - -const ( - multivariantPlaylistMaxAge = "30" - initMaxAge = "30" - segmentMaxAge = "3600" + "sync" ) -func boolPtr(v bool) *bool { - return &v -} - -func parseMSNPart(msn string, part string) (uint64, uint64, error) { - var msnint uint64 - if msn != "" { - var err error - msnint, err = strconv.ParseUint(msn, 10, 64) - if err != nil { - return 0, 0, err - } - } - - var partint uint64 - if part != "" { - var err error - partint, err = strconv.ParseUint(part, 10, 64) - if err != nil { - return 0, 0, err - } - } - - return msnint, partint, nil +type muxerServer struct { + mutex sync.RWMutex + pathHandlers map[string]http.HandlerFunc } -func bandwidth(segments []muxerSegment) (int, int) { - if len(segments) == 0 { - return 0, 0 - } - - var maxBandwidth uint64 - var sizes uint64 - var durations time.Duration - - for _, seg := range segments { - if _, ok := seg.(*muxerGap); !ok { - bandwidth := 8 * seg.getSize() * uint64(time.Second) / uint64(seg.getDuration()) - if bandwidth > maxBandwidth { - maxBandwidth = bandwidth - } - sizes += seg.getSize() - durations += seg.getDuration() - } - } - - averageBandwidth := 8 * sizes * uint64(time.Second) / uint64(durations) - - return int(maxBandwidth), int(averageBandwidth) +func (s *muxerServer) initialize() { + s.pathHandlers = make(map[string]http.HandlerFunc) } -func queryVal(q url.Values, key string) string { - vals, ok := q[key] - if ok && len(vals) >= 1 { - return vals[0] - } - return "" +func (s *muxerServer) registerPath(path string, cb http.HandlerFunc) { + s.mutex.Lock() + defer s.mutex.Unlock() + s.pathHandlers[path] = cb } -type muxerServer struct { - muxer *Muxer // TODO: remove - - pathHandlers map[string]http.HandlerFunc +func (s *muxerServer) unregisterPath(path string) { + s.mutex.Lock() + defer s.mutex.Unlock() + delete(s.pathHandlers, path) } -func (s *muxerServer) initialize() { - s.pathHandlers = make(map[string]http.HandlerFunc) - - s.pathHandlers["index.m3u8"] = s.handleMultivariantPlaylist +func (s *muxerServer) getPathHandler(path string) http.HandlerFunc { + s.mutex.RLock() + defer s.mutex.RUnlock() + return s.pathHandlers[path] } func (s *muxerServer) handle(w http.ResponseWriter, r *http.Request) { path := filepath.Base(r.URL.Path) - s.muxer.mutex.Lock() + s.mutex.RLock() handler, ok := s.pathHandlers[path] - s.muxer.mutex.Unlock() + s.mutex.RUnlock() + if ok { handler(w, r) - return - } -} - -func (s *muxerServer) handleMultivariantPlaylist(w http.ResponseWriter, r *http.Request) { - buf := func() []byte { - s.muxer.mutex.Lock() - defer s.muxer.mutex.Unlock() - - for { - if s.muxer.closed { - return nil - } - - if s.muxer.streams[0].hasContent() { - break - } - - s.muxer.cond.Wait() - } - - buf, err := s.generateMultivariantPlaylist(r.URL.RawQuery) - if err != nil { - return nil - } - - return buf - }() - - if buf == nil { - w.WriteHeader(http.StatusInternalServerError) - return } - - // allow caching but use a small period in order to - // allow a stream to change tracks or bitrate - w.Header().Set("Cache-Control", "max-age="+multivariantPlaylistMaxAge) - w.Header().Set("Content-Type", `application/vnd.apple.mpegurl`) - w.WriteHeader(http.StatusOK) - w.Write(buf) -} - -func (s *muxerServer) generateMultivariantPlaylist(rawQuery string) ([]byte, error) { - // TODO: consider segments in all streams - maxBandwidth, averageBandwidth := bandwidth(s.muxer.streams[0].segments) - - pl := &playlist.Multivariant{ - Version: func() int { - if s.muxer.Variant == MuxerVariantMPEGTS { - return 3 - } - return 9 - }(), - IndependentSegments: true, - Variants: []*playlist.MultivariantVariant{{ - Bandwidth: maxBandwidth, - AverageBandwidth: &averageBandwidth, - }}, - } - - for _, stream := range s.muxer.streams { - err := stream.populateMultivariantPlaylist(pl, rawQuery) - if err != nil { - return nil, err - } - } - - return pl.Marshal() } diff --git a/muxer_stream.go b/muxer_stream.go index 75966df..b9cfd28 100644 --- a/muxer_stream.go +++ b/muxer_stream.go @@ -8,11 +8,13 @@ import ( "net/url" "strconv" "strings" + "sync" "time" "github.com/bluenviron/gohlslib/v2/pkg/codecparams" "github.com/bluenviron/gohlslib/v2/pkg/codecs" "github.com/bluenviron/gohlslib/v2/pkg/playlist" + "github.com/bluenviron/gohlslib/v2/pkg/storage" "github.com/bluenviron/mediacommon/pkg/codecs/av1" "github.com/bluenviron/mediacommon/pkg/codecs/h264" "github.com/bluenviron/mediacommon/pkg/codecs/h265" @@ -93,23 +95,36 @@ type generateMediaPlaylistFunc func( ) ([]byte, error) type muxerStream struct { - muxer *Muxer // TODO: remove - tracks []*muxerTrack - id string - isLeading bool - isRendition bool - name string - language string - isDefault bool - - generateMediaPlaylist generateMediaPlaylistFunc - + variant MuxerVariant + segmentMaxSize uint64 + segmentCount int + onEncodeError MuxerOnEncodeErrorFunc + mutex *sync.Mutex + cond *sync.Cond + prefix string + storageFactory storage.Factory + server *muxerServer + tracks []*muxerTrack + id string + isLeading bool + isRendition bool + name string + language string + isDefault bool + nextSegmentID uint64 + nextPartID uint64 + + generateMediaPlaylist generateMediaPlaylistFunc mpegtsSwitchableWriter *switchableWriter // mpegts only mpegtsWriter *mpegts.Writer // mpegts only segments []muxerSegment nextSegment muxerSegment nextPart *muxerPart // low-latency only initFilePresent bool // fmp4 only + segmentDeleteCount int + closed bool + targetDuration int + partTargetDuration time.Duration } func (s *muxerStream) initialize() { @@ -117,7 +132,7 @@ func (s *muxerStream) initialize() { track.stream = s } - if s.muxer.Variant == MuxerVariantMPEGTS { + if s.variant == MuxerVariantMPEGTS { s.generateMediaPlaylist = s.generateMediaPlaylistMPEGTS tracks := make([]*mpegts.Track, len(s.tracks)) @@ -130,10 +145,12 @@ func (s *muxerStream) initialize() { s.generateMediaPlaylist = s.generateMediaPlaylistFMP4 } - s.muxer.server.pathHandlers[mediaPlaylistPath(s.id)] = s.handleMediaPlaylist + s.server.registerPath(mediaPlaylistPath(s.id), s.handleMediaPlaylist) } func (s *muxerStream) close() { + s.closed = true + for _, segment := range s.segments { segment.close() } @@ -244,14 +261,14 @@ func (s *muxerStream) populateMultivariantPlaylist( } func (s *muxerStream) hasContent() bool { - if s.muxer.Variant == MuxerVariantFMP4 { + if s.variant == MuxerVariantFMP4 { return len(s.segments) >= 2 } return len(s.segments) >= 1 } func (s *muxerStream) hasPart(segmentID uint64, partID uint64) bool { - if segmentID == s.muxer.nextSegmentID { + if segmentID == s.nextSegmentID { if partID < uint64(len(s.nextSegment.(*muxerSegmentFMP4).parts)) { return true } @@ -283,7 +300,7 @@ func (s *muxerStream) handleMediaPlaylist(w http.ResponseWriter, r *http.Request isDeltaUpdate := false - if s.muxer.Variant == MuxerVariantLowLatency { + if s.variant == MuxerVariantLowLatency { isDeltaUpdate = skip == "YES" || skip == "v2" msnint, partint, err := parseMSNPart(msn, part) @@ -295,11 +312,11 @@ func (s *muxerStream) handleMediaPlaylist(w http.ResponseWriter, r *http.Request switch { case msn != "": byts := func() []byte { - s.muxer.mutex.Lock() - defer s.muxer.mutex.Unlock() + s.mutex.Lock() + defer s.mutex.Unlock() for { - if s.muxer.closed { + if s.closed { w.WriteHeader(http.StatusInternalServerError) return nil } @@ -309,7 +326,7 @@ func (s *muxerStream) handleMediaPlaylist(w http.ResponseWriter, r *http.Request // exceeds the last Partial Segment in the current Playlist by the // Advance Part Limit, then the server SHOULD immediately return Bad // Request, such as HTTP 400. - if msnint > (s.muxer.nextSegmentID+1) || msnint < (s.muxer.nextSegmentID-uint64(len(s.segments)-1)) { + if msnint > (s.nextSegmentID+1) || msnint < (s.nextSegmentID-uint64(len(s.segments)-1)) { w.WriteHeader(http.StatusBadRequest) return nil } @@ -318,7 +335,7 @@ func (s *muxerStream) handleMediaPlaylist(w http.ResponseWriter, r *http.Request break } - s.muxer.cond.Wait() + s.cond.Wait() } byts, err := s.generateMediaPlaylist( @@ -348,11 +365,11 @@ func (s *muxerStream) handleMediaPlaylist(w http.ResponseWriter, r *http.Request } byts := func() []byte { - s.muxer.mutex.Lock() - defer s.muxer.mutex.Unlock() + s.mutex.Lock() + defer s.mutex.Unlock() for { - if s.muxer.closed { + if s.closed { w.WriteHeader(http.StatusInternalServerError) return nil } @@ -361,7 +378,7 @@ func (s *muxerStream) handleMediaPlaylist(w http.ResponseWriter, r *http.Request break } - s.muxer.cond.Wait() + s.cond.Wait() } byts, err := s.generateMediaPlaylist( @@ -391,8 +408,8 @@ func (s *muxerStream) generateMediaPlaylistMPEGTS( pl := &playlist.Media{ Version: 3, AllowCache: boolPtr(false), - TargetDuration: s.muxer.targetDuration, - MediaSequence: s.muxer.segmentDeleteCount, + TargetDuration: s.targetDuration, + MediaSequence: s.segmentDeleteCount, } for _, s := range s.segments { @@ -417,17 +434,17 @@ func (s *muxerStream) generateMediaPlaylistFMP4( isDeltaUpdate bool, rawQuery string, ) ([]byte, error) { - skipBoundary := time.Duration(s.muxer.targetDuration) * 6 * time.Second + skipBoundary := time.Duration(s.targetDuration) * 6 * time.Second rawQuery = filterOutHLSParams(rawQuery) pl := &playlist.Media{ Version: 10, - TargetDuration: s.muxer.targetDuration, - MediaSequence: s.muxer.segmentDeleteCount, + TargetDuration: s.targetDuration, + MediaSequence: s.segmentDeleteCount, } - if s.muxer.Variant == MuxerVariantLowLatency { - partHoldBack := (s.muxer.partTargetDuration * 25) / 10 + if s.variant == MuxerVariantLowLatency { + partHoldBack := (s.partTargetDuration * 25) / 10 pl.ServerControl = &playlist.MediaServerControl{ CanBlockReload: true, @@ -436,14 +453,14 @@ func (s *muxerStream) generateMediaPlaylistFMP4( } pl.PartInf = &playlist.MediaPartInf{ - PartTarget: s.muxer.partTargetDuration, + PartTarget: s.partTargetDuration, } } skipped := 0 if !isDeltaUpdate { - uri := initFilePath(s.muxer.prefix, s.id) + uri := initFilePath(s.prefix, s.id) if rawQuery != "" { uri += "?" + rawQuery } @@ -489,7 +506,7 @@ func (s *muxerStream) generateMediaPlaylistFMP4( plse.DateTime = &seg.startNTP } - if s.muxer.Variant == MuxerVariantLowLatency && (len(s.segments)-i) <= 2 { + if s.variant == MuxerVariantLowLatency && (len(s.segments)-i) <= 2 { for _, part := range seg.parts { u = part.path if rawQuery != "" { @@ -515,7 +532,7 @@ func (s *muxerStream) generateMediaPlaylistFMP4( } } - if s.muxer.Variant == MuxerVariantLowLatency { + if s.variant == MuxerVariantLowLatency { for _, part := range s.nextSegment.(*muxerSegmentFMP4).parts { u := part.path if rawQuery != "" { @@ -531,7 +548,7 @@ func (s *muxerStream) generateMediaPlaylistFMP4( // preload hint must always be present // otherwise hls.js goes into a loop - uri := partPath(s.muxer.prefix, s.id, s.muxer.nextPartID) + uri := partPath(s.prefix, s.id, s.nextPartID) if rawQuery != "" { uri += "?" + rawQuery } @@ -565,14 +582,16 @@ func (s *muxerStream) generateAndCacheInitFile() error { s.initFilePresent = true initFile := w.Bytes() - s.muxer.server.pathHandlers[initFilePath(s.muxer.prefix, s.id)] = func(w http.ResponseWriter, _ *http.Request) { - // allow caching but use a small period in order to - // allow a stream to change track parameters - w.Header().Set("Cache-Control", "max-age="+initMaxAge) - w.Header().Set("Content-Type", "video/mp4") - w.WriteHeader(http.StatusOK) - w.Write(initFile) - } + s.server.registerPath( + initFilePath(s.prefix, s.id), + func(w http.ResponseWriter, _ *http.Request) { + // allow caching but use a small period in order to + // allow a stream to change track parameters + w.Header().Set("Cache-Control", "max-age="+initMaxAge) + w.Header().Set("Content-Type", "video/mp4") + w.WriteHeader(http.StatusOK) + w.Write(initFile) + }) return nil } @@ -580,14 +599,17 @@ func (s *muxerStream) generateAndCacheInitFile() error { func (s *muxerStream) createFirstSegment( nextDTS time.Duration, nextNTP time.Time, + nextSegmentID uint64, + nextPartID uint64, ) error { - if s.muxer.Variant == MuxerVariantMPEGTS { + if s.variant == MuxerVariantMPEGTS { //nolint:dupl seg := &muxerSegmentMPEGTS{ - segmentMaxSize: s.muxer.SegmentMaxSize, - prefix: s.muxer.prefix, - storageFactory: s.muxer.storageFactory, - stream: s, - id: s.muxer.nextSegmentID, + segmentMaxSize: s.segmentMaxSize, + prefix: s.prefix, + storageFactory: s.storageFactory, + streamID: s.id, + mpegtsWriter: s.mpegtsWriter, + id: nextSegmentID, startNTP: nextNTP, startDTS: nextDTS, } @@ -596,15 +618,14 @@ func (s *muxerStream) createFirstSegment( return err } s.nextSegment = seg + + s.mpegtsSwitchableWriter.w = seg.bw } else { seg := &muxerSegmentFMP4{ - variant: s.muxer.Variant, - segmentMaxSize: s.muxer.SegmentMaxSize, - prefix: s.muxer.prefix, - nextPartID: s.muxer.nextPartID, - storageFactory: s.muxer.storageFactory, - stream: s, - id: s.muxer.nextSegmentID, + prefix: s.prefix, + storageFactory: s.storageFactory, + streamID: s.id, + id: nextSegmentID, startNTP: nextNTP, startDTS: nextDTS, fromForcedRotation: false, @@ -614,12 +635,30 @@ func (s *muxerStream) createFirstSegment( return err } s.nextSegment = seg + + s.nextPart = &muxerPart{ + segmentMaxSize: s.segmentMaxSize, + streamID: s.id, + streamTracks: s.tracks, + segment: seg, + startDTS: seg.startDTS, + prefix: seg.prefix, + id: nextPartID, + storage: seg.storage.NewPart(), + } + s.nextPart.initialize() } return nil } -func (s *muxerStream) rotateParts(nextDTS time.Duration, createNew bool) error { +func (s *muxerStream) rotateParts( + nextDTS time.Duration, + createNew bool, + nextPartID uint64, +) error { + s.nextPartID = nextPartID + part := s.nextPart s.nextPart = nil @@ -628,78 +667,84 @@ func (s *muxerStream) rotateParts(nextDTS time.Duration, createNew bool) error { return err } - if s.muxer.Variant == MuxerVariantLowLatency { + if s.variant == MuxerVariantLowLatency { part.segment.parts = append(part.segment.parts, part) - s.muxer.server.pathHandlers[part.path] = func(w http.ResponseWriter, _ *http.Request) { - r, err := part.reader() - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - defer r.Close() + s.server.registerPath( + part.path, + func(w http.ResponseWriter, _ *http.Request) { + r, err := part.reader() + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + defer r.Close() - w.Header().Set("Cache-Control", "max-age="+segmentMaxAge) - w.Header().Set("Content-Type", "video/mp4") - w.WriteHeader(http.StatusOK) - io.Copy(w, r) - } + w.Header().Set("Cache-Control", "max-age="+segmentMaxAge) + w.Header().Set("Content-Type", "video/mp4") + w.WriteHeader(http.StatusOK) + io.Copy(w, r) + }) // EXT-X-PRELOAD-HINT - partID := s.muxer.nextPartID - partPath := partPath(s.muxer.prefix, s.id, partID) - s.muxer.server.pathHandlers[partPath] = func(w http.ResponseWriter, r *http.Request) { - s.muxer.mutex.Lock() + partID := nextPartID + partPath := partPath(s.prefix, s.id, partID) + s.server.registerPath( + partPath, + func(w http.ResponseWriter, r *http.Request) { + s.mutex.Lock() - for { - if s.muxer.closed { - w.WriteHeader(http.StatusInternalServerError) - return - } + for { + if s.closed { + w.WriteHeader(http.StatusInternalServerError) + return + } + + if s.nextPartID > partID { + break + } - if s.muxer.nextPartID > partID { - break + s.cond.Wait() } - s.muxer.cond.Wait() - } + h := s.server.getPathHandler(partPath) - h := s.muxer.server.pathHandlers[partPath] + s.mutex.Unlock() - s.muxer.mutex.Unlock() + if h != nil { + h(w, r) + } + }) + } - if h != nil { - h(w, r) - } + if createNew { + nextPart := &muxerPart{ + segmentMaxSize: s.segmentMaxSize, + streamID: s.id, + streamTracks: s.tracks, + segment: part.segment, + startDTS: nextDTS, + prefix: s.prefix, + id: nextPartID, + storage: part.segment.storage.NewPart(), } + nextPart.initialize() + s.nextPart = nextPart } // while segment target duration can be increased indefinitely, // part target duration cannot, since // "The duration of a Partial Segment MUST be at least 85% of the Part Target Duration" // so it's better to reset it every time. - if s == s.muxer.streams[0] { - partTargetDuration := partTargetDuration(s.segments, part.segment.parts) - if s.muxer.partTargetDuration == 0 { - s.muxer.partTargetDuration = partTargetDuration - } else if partTargetDuration != s.muxer.partTargetDuration { - s.muxer.OnEncodeError(fmt.Errorf("part duration changed from %v to %v - this will cause an error in iOS clients", - s.muxer.partTargetDuration, partTargetDuration)) - s.muxer.partTargetDuration = partTargetDuration - } - } - - if createNew { - nextPart := &muxerPart{ - stream: s, - segment: part.segment, - startDTS: nextDTS, - prefix: s.muxer.prefix, - id: s.muxer.nextPartID, - storage: part.segment.storage.NewPart(), + if s.isLeading { + partTargetDuration := partTargetDuration(s.segments, s.nextSegment.(*muxerSegmentFMP4).parts) + if s.partTargetDuration == 0 { + s.partTargetDuration = partTargetDuration + } else if partTargetDuration != s.partTargetDuration { + s.onEncodeError(fmt.Errorf("part duration changed from %v to %v - this will cause an error in iOS clients", + s.partTargetDuration, partTargetDuration)) + s.partTargetDuration = partTargetDuration } - nextPart.initialize() - s.nextPart = nextPart } return nil @@ -709,7 +754,18 @@ func (s *muxerStream) rotateSegments( nextDTS time.Duration, nextNTP time.Time, force bool, + nextSegmentID uint64, + nextPartID uint64, ) error { + if s.variant != MuxerVariantMPEGTS { + err := s.rotateParts(nextDTS, false, nextPartID) + if err != nil { + return err + } + } + + s.nextSegmentID = nextSegmentID + segment := s.nextSegment s.nextSegment = nil @@ -720,7 +776,7 @@ func (s *muxerStream) rotateSegments( } // add initial gaps, required by iOS LL-HLS - if s.muxer.Variant == MuxerVariantLowLatency && len(s.segments) == 0 { + if s.variant == MuxerVariantLowLatency && len(s.segments) == 0 { for i := 0; i < 7; i++ { s.segments = append(s.segments, &muxerGap{ duration: segment.getDuration(), @@ -730,99 +786,115 @@ func (s *muxerStream) rotateSegments( s.segments = append(s.segments, segment) - s.muxer.server.pathHandlers[segment.getPath()] = func(w http.ResponseWriter, _ *http.Request) { - r, err2 := segment.reader() - if err2 != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - defer r.Close() + s.server.registerPath( + segment.getPath(), + func(w http.ResponseWriter, _ *http.Request) { + r, err2 := segment.reader() + if err2 != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + defer r.Close() - w.Header().Set("Cache-Control", "max-age="+segmentMaxAge) - w.Header().Set( - "Content-Type", - func() string { - if s.muxer.Variant == MuxerVariantMPEGTS { - return "video/MP2T" - } - return "video/mp4" - }(), - ) - w.WriteHeader(http.StatusOK) - io.Copy(w, r) - } + w.Header().Set("Cache-Control", "max-age="+segmentMaxAge) + w.Header().Set( + "Content-Type", + func() string { + if s.variant == MuxerVariantMPEGTS { + return "video/MP2T" + } + return "video/mp4" + }(), + ) + w.WriteHeader(http.StatusOK) + io.Copy(w, r) + }) // delete old segments and parts - if len(s.segments) > s.muxer.SegmentCount { + if len(s.segments) > s.segmentCount { toDelete := s.segments[0] if toDeleteSeg, ok := toDelete.(*muxerSegmentFMP4); ok { for _, part := range toDeleteSeg.parts { - delete(s.muxer.server.pathHandlers, part.path) + s.server.unregisterPath(part.path) } } toDelete.close() - delete(s.muxer.server.pathHandlers, toDelete.getPath()) + + s.server.unregisterPath(toDelete.getPath()) s.segments = s.segments[1:] - if s == s.muxer.streams[0] { - s.muxer.segmentDeleteCount++ - } + s.segmentDeleteCount++ } // regenerate init files only if missing or codec parameters have changed - if s.muxer.Variant != MuxerVariantMPEGTS && (!s.initFilePresent || segment.isFromForcedRotation()) { + if s.variant != MuxerVariantMPEGTS && (!s.initFilePresent || segment.isFromForcedRotation()) { err = s.generateAndCacheInitFile() if err != nil { return err } } - if s == s.muxer.streams[0] { - targetDuration := targetDuration(s.segments) - if s.muxer.targetDuration == 0 { - s.muxer.targetDuration = targetDuration - } else if targetDuration > s.muxer.targetDuration { - s.muxer.OnEncodeError(fmt.Errorf( - "segment duration changed from %ds to %ds - this will cause an error in iOS clients", - s.muxer.targetDuration, targetDuration)) - s.muxer.targetDuration = targetDuration - } - } - - // create next segment - var nextSegment muxerSegment - if s.muxer.Variant == MuxerVariantMPEGTS { - nextSegment = &muxerSegmentMPEGTS{ - segmentMaxSize: s.muxer.SegmentMaxSize, - prefix: s.muxer.prefix, - storageFactory: s.muxer.storageFactory, - stream: s, - id: s.muxer.nextSegmentID, + if s.variant == MuxerVariantMPEGTS { //nolint:dupl + seg := &muxerSegmentMPEGTS{ + segmentMaxSize: s.segmentMaxSize, + prefix: s.prefix, + storageFactory: s.storageFactory, + streamID: s.id, + mpegtsWriter: s.mpegtsWriter, + id: nextSegmentID, startNTP: nextNTP, startDTS: nextDTS, } + err = seg.initialize() + if err != nil { + return err + } + s.nextSegment = seg + + s.mpegtsSwitchableWriter.w = seg.bw } else { - nextSegment = &muxerSegmentFMP4{ - variant: s.muxer.Variant, - segmentMaxSize: s.muxer.SegmentMaxSize, - prefix: s.muxer.prefix, - nextPartID: s.muxer.nextPartID, - storageFactory: s.muxer.storageFactory, - stream: s, - id: s.muxer.nextSegmentID, + seg := &muxerSegmentFMP4{ + prefix: s.prefix, + storageFactory: s.storageFactory, + streamID: s.id, + id: nextSegmentID, startNTP: nextNTP, startDTS: nextDTS, fromForcedRotation: force, } + err = seg.initialize() + if err != nil { + return err + } + s.nextSegment = seg + + s.nextPart = &muxerPart{ + segmentMaxSize: s.segmentMaxSize, + streamID: s.id, + streamTracks: s.tracks, + segment: seg, + startDTS: seg.startDTS, + prefix: seg.prefix, + id: nextPartID, + storage: seg.storage.NewPart(), + } + s.nextPart.initialize() } - err = nextSegment.initialize() - if err != nil { - return err + + if s.isLeading { + targetDuration := targetDuration(s.segments) + if s.targetDuration == 0 { + s.targetDuration = targetDuration + } else if targetDuration > s.targetDuration { + s.onEncodeError(fmt.Errorf( + "segment duration changed from %ds to %ds - this will cause an error in iOS clients", + s.targetDuration, targetDuration)) + s.targetDuration = targetDuration + } } - s.nextSegment = nextSegment return nil }