Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 26 additions & 26 deletions hlsvod/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ const readyTimeout = 80 * time.Second
// how long can it take for transcode to return first data
const transcodeTimeout = 10 * time.Second

// transcodeSegmentsFn is a package-level hook so tests can stub out the actual
// FFmpeg-based implementation. In production it points to TranscodeSegments.
var transcodeSegmentsFn = TranscodeSegments

type ManagerCtx struct {
mu sync.Mutex
logger zerolog.Logger
Expand Down Expand Up @@ -382,7 +386,7 @@ func (m *ManagerCtx) transcodeSegments(offset, limit int) error {
segmentTimes := m.breakpoints[offset : offset+limit+1]
logger.Info().Interface("segments-times", segmentTimes).Msg("transcoding segments")

segments, err := TranscodeSegments(m.ctx, m.config.FFmpegBinary, TranscodeConfig{
segments, err := transcodeSegmentsFn(m.ctx, m.config.FFmpegBinary, TranscodeConfig{
InputFilePath: m.config.MediaPath,
OutputDirPath: m.config.TranscodeDir,
SegmentPrefix: m.config.SegmentPrefix, // This does not need to match.
Expand Down Expand Up @@ -458,44 +462,40 @@ func (m *ManagerCtx) transcodeFromSegment(index int) error {
m.mu.Lock()
defer m.mu.Unlock()

segmentsTotal := len(m.segments)
if segmentsTotal <= m.segmentBufferMax {
// if all our segments can fit in the buffer
// then we should transcode all of them
// regardless of the index
// Determine the upper bound (exclusive) of the segment range we will inspect
upperBound := len(m.segments)
if upperBound <= m.segmentBufferMax {
// All segments fit into the buffer – transcode everything
index = 0
} else if index+m.segmentBufferMax < segmentsTotal {
// cap transocded segments to the buffer size
segmentsTotal = index + m.segmentBufferMax
} else if index+m.segmentBufferMax < upperBound {
// Restrict the window to the current index + buffer size
upperBound = index + m.segmentBufferMax
}

offset, limit := 0, 0
for i := index; i < segmentsTotal-1; i++ {
processedCount, pendingCount := 0, 0 // processedCount is the number of segments already transcoded or enqueued
for i := index; i < upperBound; i++ {
_, isEnqueued := m.waitForSegment(i)
isTranscoded := m.isSegmentTranscoded(i)

// increase offset if transcoded without limit
if (isTranscoded || isEnqueued) && limit == 0 {
offset++
} else
// increase limit if is not transcoded
if !(isTranscoded || isEnqueued) {
limit++
} else
// break otherwise
{
// Skip already-handled segments until we find the first pending one
if (isTranscoded || isEnqueued) && pendingCount == 0 {
processedCount++
} else if !(isTranscoded || isEnqueued) {
// Count segments that still need to be transcoded
pendingCount++
} else {
// Once we have a mix of handled and pending segments, stop the scan
break
}
}

// if offset is greater than our minimal offset,
// or limit is 0, we have enough segments available
if offset > m.segmentBufferMin || limit == 0 {
// If we already have enough handled segments in the buffer, or no work is pending, exit early
if processedCount > m.segmentBufferMin || pendingCount == 0 {
return nil
}

// otherwise transcode chosen segment range
return m.transcodeSegments(offset+index, limit)
// Otherwise, transcode the pending segment window
return m.transcodeSegments(index+processedCount, pendingCount)
}

func (m *ManagerCtx) Start() (err error) {
Expand Down
47 changes: 47 additions & 0 deletions hlsvod/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package hlsvod

import (
"context"
"testing"
)

// TestTranscodeFromSegmentBufferSize ensures that transcodeFromSegment()
// queues exactly segmentBufferMax segments for transcoding. (This is
// a regression test, as the implementation previously only queued
// segmentBufferMax-1 segments.)
func TestTranscodeFromSegmentBufferSize(t *testing.T) {
const bufferMax = 5

// Prepare a ManagerCtx with exactly bufferMax segments.
m := &ManagerCtx{
breakpoints: make([]float64, bufferMax+1),
segmentBufferMax: bufferMax,
segmentBufferMin: 3, // default value from constructor
segments: make(map[int]string),
segmentQueue: make(map[int]chan struct{}),
}

// Populate the dummy segments map so that len(m.segments) == bufferMax.
for i := 0; i < bufferMax; i++ {
m.segments[i] = ""
}

// Stub out the transcode function so the test doesn't invoke FFmpeg.
origFn := transcodeSegmentsFn
transcodeSegmentsFn = func(_ context.Context, _ string, _ TranscodeConfig) (chan string, error) {
ch := make(chan string)
close(ch)
return ch, nil
}
defer func() { transcodeSegmentsFn = origFn }()

// Execute the code under test.
if err := m.transcodeFromSegment(0); err != nil {
t.Fatalf("transcodeFromSegment returned error: %v", err)
}

// transcodeFromSegment should enqueue `bufferMax` segments
if got := len(m.segmentQueue); got != bufferMax {
t.Fatalf("expected %d queued segments, got %d", bufferMax, got)
}
}