Skip to content

Commit

Permalink
muxer: isolate components (#206)
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 authored Dec 25, 2024
1 parent d2288ae commit 4362616
Show file tree
Hide file tree
Showing 8 changed files with 540 additions and 444 deletions.
287 changes: 225 additions & 62 deletions muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,81 @@ import (
"io"
"log"
"net/http"
"net/url"
"strconv"
"sync"
"time"

"github.com/bluenviron/gohlslib/v2/pkg/codecs"
"github.com/bluenviron/gohlslib/v2/pkg/playlist"
"github.com/bluenviron/gohlslib/v2/pkg/storage"
)

const (
fmp4StartDTS = 10 * time.Second
mpegtsSegmentMinAUCount = 100
fmp4StartDTS = 10 * time.Second
mpegtsSegmentMinAUCount = 100
multivariantPlaylistMaxAge = "30"
initMaxAge = "30"
segmentMaxAge = "3600"
)

type switchableWriter struct {
w io.Writer
func boolPtr(v bool) *bool {
return &v
}

func (w *switchableWriter) Write(p []byte) (int, error) {
return w.w.Write(p)
func parseMSNPart(msn string, part string) (uint64, uint64, error) {
var msnint uint64
if msn != "" {
var err error
msnint, err = strconv.ParseUint(msn, 10, 64)
if err != nil {
return 0, 0, err
}
}

var partint uint64
if part != "" {
var err error
partint, err = strconv.ParseUint(part, 10, 64)
if err != nil {
return 0, 0, err
}
}

return msnint, partint, nil
}

func bandwidth(segments []muxerSegment) (int, int) {
if len(segments) == 0 {
return 0, 0
}

var maxBandwidth uint64
var sizes uint64
var durations time.Duration

for _, seg := range segments {
if _, ok := seg.(*muxerGap); !ok {
bandwidth := 8 * seg.getSize() * uint64(time.Second) / uint64(seg.getDuration())
if bandwidth > maxBandwidth {
maxBandwidth = bandwidth
}
sizes += seg.getSize()
durations += seg.getDuration()
}
}

averageBandwidth := 8 * sizes * uint64(time.Second) / uint64(durations)

return int(maxBandwidth), int(averageBandwidth)
}

func queryVal(q url.Values, key string) string {
vals, ok := q[key]
if ok && len(vals) >= 1 {
return vals[0]
}
return ""
}

func isVideo(codec codecs.Codec) bool {
Expand Down Expand Up @@ -79,6 +135,14 @@ func fmp4TimeScale(c codecs.Codec) uint32 {
return 90000
}

type switchableWriter struct {
w io.Writer
}

func (w *switchableWriter) Write(p []byte) (int, error) {
return w.w.Write(p)
}

// MuxerOnEncodeErrorFunc is the prototype of Muxer.OnEncodeError.
type MuxerOnEncodeErrorFunc func(err error)

Expand Down Expand Up @@ -127,21 +191,17 @@ type Muxer struct {
// private
//

mutex sync.Mutex
cond *sync.Cond
mtracks []*muxerTrack
mtracksByTrack map[*Track]*muxerTrack
streams []*muxerStream
prefix string
storageFactory storage.Factory
segmenter *muxerSegmenter
server *muxerServer
closed bool
nextSegmentID uint64
nextPartID uint64 // low-latency only
segmentDeleteCount int
targetDuration int
partTargetDuration time.Duration
mutex sync.Mutex
cond *sync.Cond
mtracks []*muxerTrack
mtracksByTrack map[*Track]*muxerTrack
streams []*muxerStream
leadingStream *muxerStream
prefix string
storageFactory storage.Factory
segmenter *muxerSegmenter
server *muxerServer
closed bool
}

// Start initializes the muxer.
Expand Down Expand Up @@ -236,15 +296,18 @@ func (m *Muxer) Start() error {
m.mtracksByTrack = make(map[*Track]*muxerTrack)

m.segmenter = &muxerSegmenter{
muxer: m,
variant: m.Variant,
segmentMinDuration: m.SegmentMinDuration,
partMinDuration: m.PartMinDuration,
parent: m,
}
m.segmenter.initialize()

m.server = &muxerServer{
muxer: m,
}
m.server = &muxerServer{}
m.server.initialize()

m.server.registerPath("index.m3u8", m.handleMultivariantPlaylist)

for i, track := range m.Tracks {
mtrack := &muxerTrack{
Track: track,
Expand All @@ -256,18 +319,40 @@ func (m *Muxer) Start() error {
m.mtracksByTrack[track] = mtrack
}

var err error
m.prefix, err = generatePrefix()
if err != nil {
return err
}

if m.Directory != "" {
m.storageFactory = storage.NewFactoryDisk(m.Directory)
} else {
m.storageFactory = storage.NewFactoryRAM()
}

// add initial gaps, required by iOS LL-HLS
nextSegmentID := uint64(0)
if m.Variant == MuxerVariantLowLatency {
m.nextSegmentID = 7
nextSegmentID = 7
}

switch {
case m.Variant == MuxerVariantMPEGTS:
stream := &muxerStream{
muxer: m,
tracks: m.mtracks,
id: "main",
isLeading: true,
isLeading: true,
variant: m.Variant,
segmentMaxSize: m.SegmentMaxSize,
segmentCount: m.SegmentCount,
onEncodeError: m.OnEncodeError,
mutex: &m.mutex,
cond: m.cond,
prefix: m.prefix,
storageFactory: m.storageFactory,
server: m.server,
tracks: m.mtracks,
id: "main",
nextSegmentID: nextSegmentID,
}
stream.initialize()
m.streams = append(m.streams, stream)
Expand Down Expand Up @@ -305,31 +390,37 @@ func (m *Muxer) Start() error {
}

stream := &muxerStream{
muxer: m,
tracks: []*muxerTrack{track},
id: id,
isLeading: track.isLeading,
isRendition: isRendition,
name: name,
language: track.Language,
isDefault: isDefault,
variant: m.Variant,
segmentMaxSize: m.SegmentMaxSize,
segmentCount: m.SegmentCount,
onEncodeError: m.OnEncodeError,
mutex: &m.mutex,
cond: m.cond,
prefix: m.prefix,
storageFactory: m.storageFactory,
server: m.server,
tracks: []*muxerTrack{track},
id: id,
isLeading: track.isLeading,
isRendition: isRendition,
name: name,
language: track.Language,
isDefault: isDefault,
nextSegmentID: nextSegmentID,
}
stream.initialize()
m.streams = append(m.streams, stream)
}
}

var err error
m.prefix, err = generatePrefix()
if err != nil {
return err
}

if m.Directory != "" {
m.storageFactory = storage.NewFactoryDisk(m.Directory)
} else {
m.storageFactory = storage.NewFactoryRAM()
}
m.leadingStream = func() *muxerStream {
for _, stream := range m.streams {
if stream.isLeading {
return stream
}
}
return nil
}()

return nil
}
Expand Down Expand Up @@ -425,7 +516,7 @@ func (m *Muxer) createFirstSegment(nextDTS time.Duration, nextNTP time.Time) err

func (m *Muxer) rotateParts(nextDTS time.Duration) error {
m.mutex.Lock()
err := m.rotatePartsInner(nextDTS, true)
err := m.rotatePartsInner(nextDTS)
m.mutex.Unlock()

if err != nil {
Expand All @@ -437,13 +528,19 @@ func (m *Muxer) rotateParts(nextDTS time.Duration) error {
return nil
}

func (m *Muxer) rotatePartsInner(nextDTS time.Duration, createNew bool) error {
m.nextPartID++
func (m *Muxer) rotatePartsInner(nextDTS time.Duration) error {
err := m.leadingStream.rotateParts(nextDTS, true)
if err != nil {
return err
}

for _, stream := range m.streams {
err := stream.rotateParts(nextDTS, createNew)
if err != nil {
return err
if !stream.isLeading {
err := stream.rotateParts(nextDTS, true)
if err != nil {
return err
}
stream.partTargetDuration = m.leadingStream.partTargetDuration
}
}

Expand Down Expand Up @@ -473,21 +570,87 @@ func (m *Muxer) rotateSegmentsInner(
nextNTP time.Time,
force bool,
) error {
if m.Variant != MuxerVariantMPEGTS {
err := m.rotatePartsInner(nextDTS, false)
err := m.leadingStream.rotateSegments(nextDTS, nextNTP, force)
if err != nil {
return err
}

for _, stream := range m.streams {
if !stream.isLeading {
err := stream.rotateSegments(nextDTS, nextNTP, force)
if err != nil {
return err
}
stream.targetDuration = m.leadingStream.targetDuration
stream.partTargetDuration = m.leadingStream.partTargetDuration
}
}

return nil
}

func (m *Muxer) handleMultivariantPlaylist(w http.ResponseWriter, r *http.Request) {
buf := func() []byte {
m.mutex.Lock()
defer m.mutex.Unlock()

for {
if m.closed {
return nil
}

if m.streams[0].hasContent() {
break
}

m.cond.Wait()
}

buf, err := m.generateMultivariantPlaylist(r.URL.RawQuery)
if err != nil {
return err
return nil
}

return buf
}()

if buf == nil {
w.WriteHeader(http.StatusInternalServerError)
return
}

m.nextSegmentID++
// allow caching but use a small period in order to
// allow a stream to change tracks or bitrate
w.Header().Set("Cache-Control", "max-age="+multivariantPlaylistMaxAge)
w.Header().Set("Content-Type", `application/vnd.apple.mpegurl`)
w.WriteHeader(http.StatusOK)
w.Write(buf)
}

func (m *Muxer) generateMultivariantPlaylist(rawQuery string) ([]byte, error) {
// TODO: consider segments in all streams
maxBandwidth, averageBandwidth := bandwidth(m.streams[0].segments)

pl := &playlist.Multivariant{
Version: func() int {
if m.Variant == MuxerVariantMPEGTS {
return 3
}
return 9
}(),
IndependentSegments: true,
Variants: []*playlist.MultivariantVariant{{
Bandwidth: maxBandwidth,
AverageBandwidth: &averageBandwidth,
}},
}

for _, stream := range m.streams {
err := stream.rotateSegments(nextDTS, nextNTP, force)
err := stream.populateMultivariantPlaylist(pl, rawQuery)
if err != nil {
return err
return nil, err
}
}

return nil
return pl.Marshal()
}
Loading

0 comments on commit 4362616

Please sign in to comment.