Skip to content

Commit 6572218

Browse files
committed
Jitter Buffer Improvements
Rework the jitterbuffer for SampleBuilder use Allow Skipping packets
1 parent 4663196 commit 6572218

File tree

8 files changed

+1417
-268
lines changed

8 files changed

+1417
-268
lines changed

internal/test/mock_stream.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,6 @@ func NewMockStream(info *interceptor.StreamInfo, i interceptor.Interceptor) *Moc
103103
if !ok {
104104
return 0, nil, io.EOF
105105
}
106-
107106
marshaled, err := p.Marshal()
108107
if err != nil {
109108
return 0, nil, io.EOF

pkg/jitterbuffer/jitter_buffer.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ type (
6666
// order, and allows removing in either sequence number order or via a
6767
// provided timestamp.
6868
type JitterBuffer struct {
69-
packets *PriorityQueue
69+
packets *RBTree
7070
minStartCount uint16
7171
overflowLen uint16
7272
lastSequence uint16
@@ -98,7 +98,7 @@ func New(opts ...Option) *JitterBuffer {
9898
stats: Stats{0, 0, 0},
9999
minStartCount: 50,
100100
overflowLen: 100,
101-
packets: NewQueue(),
101+
packets: NewTree(),
102102
listeners: make(map[Event][]EventListener),
103103
}
104104

@@ -132,7 +132,15 @@ func (jb *JitterBuffer) PlayoutHead() uint16 {
132132
return jb.playoutHead
133133
}
134134

135-
// SetPlayoutHead allows you to manually specify the packet you wish to pop next
135+
// Length returns the current number of packets in the buffer.
136+
func (jb *JitterBuffer) Length() uint16 {
137+
jb.mutex.Lock()
138+
defer jb.mutex.Unlock()
139+
140+
return jb.packets.Length()
141+
}
142+
143+
// SetPlayoutHead allows you to manually specify the packet you wish to pop next.
136144
// If you have encountered a packet that hasn't resolved you can skip it.
137145
func (jb *JitterBuffer) SetPlayoutHead(playoutHead uint16) {
138146
jb.mutex.Lock()
@@ -171,7 +179,7 @@ func (jb *JitterBuffer) Push(packet *rtp.Packet) {
171179
}
172180

173181
jb.updateStats(packet.SequenceNumber)
174-
jb.packets.Push(packet, packet.SequenceNumber)
182+
jb.packets.Push(packet)
175183
jb.updateState()
176184
}
177185

@@ -255,6 +263,7 @@ func (jb *JitterBuffer) PopAtSequence(sq uint16) (*rtp.Packet, error) {
255263
func (jb *JitterBuffer) PeekAtSequence(sq uint16) (*rtp.Packet, error) {
256264
jb.mutex.Lock()
257265
defer jb.mutex.Unlock()
266+
258267
packet, err := jb.packets.Find(sq)
259268
if err != nil {
260269
return nil, err
@@ -296,3 +305,11 @@ func (jb *JitterBuffer) Clear(resetState bool) {
296305
jb.minStartCount = 50
297306
}
298307
}
308+
309+
// State returns the current state of the jitter buffer.
310+
func (jb *JitterBuffer) State() State {
311+
jb.mutex.Lock()
312+
defer jb.mutex.Unlock()
313+
314+
return jb.state
315+
}

0 commit comments

Comments
 (0)