Skip to content

Commit 5f0a209

Browse files
authored
fix jitter buffer (#325)
1 parent 270b769 commit 5f0a209

File tree

9 files changed

+324
-2
lines changed

9 files changed

+324
-2
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@ bin/
44
cmd/server/server
55

66
test/config.yaml
7-
test/client/*.mkv
7+
test/*/*.mkv

pkg/media/rtp/jitter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ type jitterHandler struct {
3939
}
4040

4141
func (h *jitterHandler) HandleRTP(p *rtp.Packet) error {
42-
h.buf.Push(p)
42+
h.buf.Push(p.Clone())
4343
var last error
4444
for _, p := range h.buf.Pop(false) {
4545
if err := h.h.HandleRTP(p); err != nil {

pkg/siptest/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,7 @@ func (c *Client) createOffer() ([]byte, error) {
538538
return offer.Marshal()
539539
}
540540

541+
// Sends PCM audio from a webm file
541542
func (c *Client) SendAudio(path string) error {
542543
f, err := os.Open(path)
543544
if err != nil {

test/cloud/cloud.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package cloud
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/livekit/protocol/livekit"
8+
"github.com/livekit/protocol/logger"
9+
"github.com/livekit/protocol/rpc"
10+
"github.com/livekit/protocol/sip"
11+
"github.com/livekit/psrpc"
12+
)
13+
14+
type CloudTestService struct {
15+
conf *IntegrationConfig
16+
17+
psrpcClient rpc.SIPInternalClient
18+
}
19+
20+
func NewCloudTestService(conf *IntegrationConfig, bus psrpc.MessageBus) (*CloudTestService, error) {
21+
c, err := rpc.NewSIPInternalClient(bus)
22+
if err != nil {
23+
return nil, err
24+
}
25+
26+
return &CloudTestService{
27+
conf: conf,
28+
psrpcClient: c,
29+
}, nil
30+
}
31+
32+
func (s *CloudTestService) CreateSIPParticipant(ctx context.Context, req *livekit.CreateSIPParticipantRequest) (*livekit.SIPParticipantInfo, error) {
33+
token, err := sip.BuildSIPToken(sip.SIPTokenParams{
34+
APIKey: s.conf.ApiKey,
35+
APISecret: s.conf.ApiSecret,
36+
RoomName: req.RoomName,
37+
ParticipantIdentity: req.ParticipantIdentity,
38+
ParticipantName: req.ParticipantName,
39+
ParticipantMetadata: req.ParticipantMetadata,
40+
ParticipantAttributes: req.ParticipantAttributes,
41+
})
42+
if err != nil {
43+
logger.Errorw("failed to create SIP token", err)
44+
return nil, err
45+
}
46+
47+
callID := sip.NewCallID()
48+
trunk := &livekit.SIPOutboundTrunkInfo{}
49+
r, err := rpc.NewCreateSIPParticipantRequest(ProjectID, callID, Host, s.conf.WsUrl, token, req, trunk)
50+
if err != nil {
51+
logger.Errorw("failed to build CreateSIPParticipantRequest", err)
52+
return nil, err
53+
}
54+
55+
resp, err := s.psrpcClient.CreateSIPParticipant(ctx, s.conf.ClusterID, r, psrpc.WithRequestTimeout(time.Second*30))
56+
if err != nil {
57+
logger.Errorw("failed to create SIP participant", err)
58+
return nil, err
59+
}
60+
61+
return &livekit.SIPParticipantInfo{
62+
ParticipantId: resp.ParticipantId,
63+
ParticipantIdentity: resp.ParticipantIdentity,
64+
RoomName: req.RoomName,
65+
SipCallId: r.SipCallId,
66+
}, nil
67+
}

test/cloud/config.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package cloud
2+
3+
import (
4+
"os"
5+
6+
"github.com/livekit/sip/pkg/config"
7+
)
8+
9+
const (
10+
Host = "integration.sip.livekit.cloud"
11+
ProjectID = "sip_integration"
12+
Uri = "integration.pstn.twilio.com"
13+
)
14+
15+
type IntegrationConfig struct {
16+
*config.Config
17+
18+
RoomName string
19+
}
20+
21+
func NewIntegrationConfig() (*IntegrationConfig, error) {
22+
c := &IntegrationConfig{
23+
Config: &config.Config{
24+
ApiKey: os.Getenv("LIVEKIT_API_KEY"),
25+
ApiSecret: os.Getenv("LIVEKIT_API_SECRET"),
26+
WsUrl: os.Getenv("LIVEKIT_WS_URL"),
27+
ServiceName: "sip",
28+
EnableJitterBuffer: true,
29+
},
30+
RoomName: "test",
31+
}
32+
if err := c.Config.Init(); err != nil {
33+
return nil, err
34+
}
35+
return c, nil
36+
}

test/cloud/integration_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package cloud
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/require"
9+
10+
"github.com/livekit/protocol/logger"
11+
"github.com/livekit/psrpc"
12+
)
13+
14+
func TestSIP(t *testing.T) {
15+
logger.InitFromConfig(&logger.Config{
16+
JSON: false,
17+
Level: "debug",
18+
}, "sip")
19+
20+
conf, err := NewIntegrationConfig()
21+
require.NoError(t, err)
22+
23+
if conf.ApiKey == "" || conf.ApiSecret == "" || conf.WsUrl == "" {
24+
t.Skip("missing env vars")
25+
}
26+
27+
bus := psrpc.NewLocalMessageBus()
28+
svc, err := NewService(conf, bus)
29+
require.NoError(t, err)
30+
defer svc.Stop(true)
31+
32+
go func() {
33+
_ = svc.Run()
34+
}()
35+
36+
a, err := NewPhoneClient(false)
37+
require.NoError(t, err)
38+
defer a.Close()
39+
40+
b, err := NewPhoneClient(true)
41+
require.NoError(t, err)
42+
defer b.Close()
43+
44+
ctx, cancel := context.WithCancel(context.Background())
45+
defer cancel()
46+
go b.SendSilence(ctx)
47+
48+
go a.SendAudio("audio-pcm.mkv")
49+
50+
time.Sleep(time.Second * 5)
51+
_ = a.SendDTMF("2345")
52+
time.Sleep(time.Second * 5)
53+
}

test/cloud/io.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package cloud
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"google.golang.org/protobuf/types/known/emptypb"
8+
9+
"github.com/livekit/protocol/logger"
10+
"github.com/livekit/protocol/rpc"
11+
"github.com/livekit/protocol/sip"
12+
"github.com/livekit/psrpc"
13+
)
14+
15+
const (
16+
TrunkID = "ST_INTEGRATION"
17+
DispatchRuleID = "SDR_INTEGRATION"
18+
)
19+
20+
type IOTestClient struct {
21+
rpc.IOInfoClient
22+
23+
conf *IntegrationConfig
24+
}
25+
26+
func NewIOTestClient(conf *IntegrationConfig) rpc.IOInfoClient {
27+
return &IOTestClient{conf: conf}
28+
}
29+
30+
func (c *IOTestClient) GetSIPTrunkAuthentication(_ context.Context, _ *rpc.GetSIPTrunkAuthenticationRequest, _ ...psrpc.RequestOption) (*rpc.GetSIPTrunkAuthenticationResponse, error) {
31+
return &rpc.GetSIPTrunkAuthenticationResponse{
32+
SipTrunkId: TrunkID,
33+
ProjectId: ProjectID,
34+
}, nil
35+
}
36+
37+
func (c *IOTestClient) EvaluateSIPDispatchRules(_ context.Context, _ *rpc.EvaluateSIPDispatchRulesRequest, _ ...psrpc.RequestOption) (*rpc.EvaluateSIPDispatchRulesResponse, error) {
38+
identity := fmt.Sprintf("phone-%d", num.Load())
39+
token, err := sip.BuildSIPToken(sip.SIPTokenParams{
40+
APIKey: c.conf.ApiKey,
41+
APISecret: c.conf.ApiSecret,
42+
RoomName: c.conf.RoomName,
43+
ParticipantIdentity: identity,
44+
ParticipantName: identity,
45+
RoomConfig: nil,
46+
})
47+
if err != nil {
48+
logger.Errorw("failed to build SIP token", err)
49+
return nil, err
50+
}
51+
52+
return &rpc.EvaluateSIPDispatchRulesResponse{
53+
RoomName: c.conf.RoomName,
54+
ParticipantIdentity: identity,
55+
ParticipantName: identity,
56+
Token: token,
57+
WsUrl: c.conf.WsUrl,
58+
Result: rpc.SIPDispatchResult_ACCEPT,
59+
SipTrunkId: TrunkID,
60+
SipDispatchRuleId: DispatchRuleID,
61+
ProjectId: ProjectID,
62+
RoomConfig: nil,
63+
}, nil
64+
}
65+
66+
func (c *IOTestClient) UpdateSIPCallState(_ context.Context, _ *rpc.UpdateSIPCallStateRequest, _ ...psrpc.RequestOption) (*emptypb.Empty, error) {
67+
return &emptypb.Empty{}, nil
68+
}

test/cloud/phone.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package cloud
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"sync/atomic"
7+
8+
"github.com/livekit/sip/pkg/media/dtmf"
9+
"github.com/livekit/sip/pkg/media/g711"
10+
"github.com/livekit/sip/pkg/siptest"
11+
)
12+
13+
const (
14+
to = 15550100000
15+
codec = g711.ULawSDPName
16+
)
17+
18+
var num atomic.Int64
19+
20+
type PhoneClient struct {
21+
*siptest.Client
22+
23+
rec *os.File
24+
}
25+
26+
func NewPhoneClient(record bool) (*PhoneClient, error) {
27+
n := num.Add(1)
28+
id := fmt.Sprintf("phone-%d", n)
29+
30+
c, err := siptest.NewClient(id, siptest.ClientConfig{
31+
Number: fmt.Sprintf("+%d", to+n),
32+
Codec: codec,
33+
OnBye: func() {},
34+
OnDTMF: func(ev dtmf.Event) {
35+
fmt.Println("DTMF C:", ev.Code, " D:", string(ev.Digit))
36+
},
37+
})
38+
if err != nil {
39+
return nil, err
40+
}
41+
42+
p := &PhoneClient{
43+
Client: c,
44+
}
45+
if record {
46+
p.rec, err = os.Create(fmt.Sprintf("%s.mkv", id))
47+
if err != nil {
48+
return nil, err
49+
}
50+
c.Record(p.rec)
51+
}
52+
53+
if err = p.Client.Dial(p.LocalIP()+":5060", Uri, fmt.Sprintf("+%d", to), nil); err != nil {
54+
return nil, err
55+
}
56+
57+
return p, nil
58+
}
59+
60+
func (p *PhoneClient) Close() {
61+
p.Client.Close()
62+
if rec := p.rec; rec != nil {
63+
_ = p.rec.Close()
64+
}
65+
}

test/cloud/service.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package cloud
2+
3+
import (
4+
"github.com/livekit/protocol/logger"
5+
"github.com/livekit/protocol/rpc"
6+
"github.com/livekit/psrpc"
7+
"github.com/livekit/sip/pkg/service"
8+
"github.com/livekit/sip/pkg/sip"
9+
"github.com/livekit/sip/pkg/stats"
10+
)
11+
12+
func NewService(conf *IntegrationConfig, bus psrpc.MessageBus) (*service.Service, error) {
13+
psrpcClient := NewIOTestClient(conf)
14+
15+
mon, err := stats.NewMonitor(conf.Config)
16+
if err != nil {
17+
return nil, err
18+
}
19+
20+
sipsrv, err := sip.NewService("", conf.Config, mon, logger.GetLogger(), func(projectID string) rpc.IOInfoClient { return psrpcClient })
21+
if err != nil {
22+
return nil, err
23+
}
24+
svc := service.NewService(conf.Config, logger.GetLogger(), sipsrv, sipsrv.Stop, sipsrv.ActiveCalls, psrpcClient, bus, mon)
25+
sipsrv.SetHandler(svc)
26+
27+
if err = sipsrv.Start(); err != nil {
28+
return nil, err
29+
}
30+
31+
return svc, nil
32+
}

0 commit comments

Comments
 (0)