Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ type TLSConfig struct {
KeyLog string `yaml:"key_log"`
}

type SessionTimerConfig struct {
DefaultExpires int `yaml:"default_expires"` // Default session interval in seconds (default: 1800)
MinSE int `yaml:"min_se"` // Minimum acceptable session interval (default: 90)
PreferRefresher string `yaml:"prefer_refresher"` // Preferred refresher role: "uac" or "uas" (default: "uac")
UseUpdate bool `yaml:"use_update"` // Use UPDATE instead of re-INVITE for refresh (default: false)
}

type Config struct {
Redis *redis.RedisConfig `yaml:"redis"` // required
ApiKey string `yaml:"api_key"` // required (env LIVEKIT_API_KEY)
Expand Down Expand Up @@ -100,6 +107,9 @@ type Config struct {
EnableJitterBuffer bool `yaml:"enable_jitter_buffer"`
EnableJitterBufferProb float64 `yaml:"enable_jitter_buffer_prob"`

// SessionTimer configures RFC 4028 session timer support
SessionTimer SessionTimerConfig `yaml:"session_timer"`

// internal
ServiceName string `yaml:"-"`
NodeID string // Do not provide, will be overwritten
Expand Down Expand Up @@ -158,6 +168,17 @@ func (c *Config) Init() error {
c.MaxCpuUtilization = 0.9
}

// Initialize session timer defaults
if c.SessionTimer.DefaultExpires == 0 {
c.SessionTimer.DefaultExpires = 1800 // 30 minutes
}
if c.SessionTimer.MinSE == 0 {
c.SessionTimer.MinSE = 90 // RFC 4028 minimum
}
if c.SessionTimer.PreferRefresher == "" {
c.SessionTimer.PreferRefresher = "uac"
}

if err := c.InitLogger(); err != nil {
return err
}
Expand Down
205 changes: 183 additions & 22 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,28 +553,30 @@ func (s *Server) onNotify(log *slog.Logger, req *sip.Request, tx sip.ServerTrans
}

type inboundCall struct {
s *Server
log logger.Logger
cc *sipInbound
mon *stats.CallMonitor
state *CallState
extraAttrs map[string]string
attrsToHdr map[string]string
ctx context.Context
cancel func()
closeReason atomic.Pointer[ReasonHeader]
call *rpc.SIPCall
media *MediaPort
dtmf chan dtmf.Event // buffered
lkRoom *Room // LiveKit room; only active after correct pin is entered
callDur func() time.Duration
joinDur func() time.Duration
forwardDTMF atomic.Bool
done atomic.Bool
started core.Fuse
stats Stats
jitterBuf bool
projectID string
s *Server
log logger.Logger
cc *sipInbound
mon *stats.CallMonitor
state *CallState
extraAttrs map[string]string
attrsToHdr map[string]string
ctx context.Context
cancel func()
closeReason atomic.Pointer[ReasonHeader]
call *rpc.SIPCall
media *MediaPort
dtmf chan dtmf.Event // buffered
lkRoom *Room // LiveKit room; only active after correct pin is entered
callDur func() time.Duration
joinDur func() time.Duration
forwardDTMF atomic.Bool
done atomic.Bool
started core.Fuse
stats Stats
jitterBuf bool
projectID string
sessionTimer *SessionTimer // RFC 4028 session timer
lastSDP []byte // Last SDP answer sent (for session refresh)
}

func (s *Server) newInboundCall(
Expand Down Expand Up @@ -622,6 +624,9 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
c.call.SipCallId = h.Value()
}

// Initialize session timer (RFC 4028)
c.initSessionTimer(req, conf)

c.cc.StartRinging()
// Send initial request. In the best case scenario, we will immediately get a room name to join.
// Otherwise, we could even learn that this number is not allowed and reject the call, or ask for pin if required.
Expand Down Expand Up @@ -713,6 +718,10 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
headers = AttrsToHeaders(r.LocalParticipant.Attributes(), c.attrsToHdr, headers)
}
c.log.Infow("Accepting the call", "headers", headers)

// Store SDP for session refresh
c.lastSDP = answerData

err := c.cc.Accept(ctx, answerData, headers)
if errors.Is(err, errNoACK) {
c.log.Errorw("Call accepted, but no ACK received", err)
Expand Down Expand Up @@ -812,6 +821,11 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI

c.started.Break()

// Start session timer after call is established
if c.sessionTimer != nil {
c.sessionTimer.Start()
}

var noAck = false
// Wait for the caller to terminate the call. Send regular keep alives
ticker := time.NewTicker(stateUpdateTick)
Expand Down Expand Up @@ -1038,6 +1052,58 @@ func (c *inboundCall) printStats(log logger.Logger) {
log.Infow("call statistics", "stats", c.stats.Load())
}

// initSessionTimer initializes the session timer from the incoming INVITE
func (c *inboundCall) initSessionTimer(req *sip.Request, conf *config.Config) {
// Convert config format to session timer config
stConfig := SessionTimerConfig{
DefaultExpires: conf.SessionTimer.DefaultExpires,
MinSE: conf.SessionTimer.MinSE,
UseUpdate: conf.SessionTimer.UseUpdate,
}

// Parse prefer refresher string
switch conf.SessionTimer.PreferRefresher {
case "uac":
stConfig.PreferRefresher = RefresherUAC
case "uas":
stConfig.PreferRefresher = RefresherUAS
default:
stConfig.PreferRefresher = RefresherUAC
}

c.sessionTimer = NewSessionTimer(stConfig, false, c.log) // isUAC=false for inbound
c.sessionTimer.SetContext(c.ctx)

// Set up callbacks
c.sessionTimer.SetCallbacks(
func(ctx context.Context) error {
return c.sendSessionRefresh(ctx)
},
func(ctx context.Context) error {
c.log.Warnw("Session timer expired, terminating call", nil)
c.closeWithTimeout()
return nil
},
)

// Share timer with sipInbound for response generation
c.cc.sessionTimer = c.sessionTimer

// Negotiate session timer parameters from INVITE
_, _, _, err := c.sessionTimer.NegotiateInvite(req)
if err != nil {
c.log.Warnw("Session timer negotiation failed, timer disabled", err)
}
}

// sendSessionRefresh sends a session refresh (re-INVITE or UPDATE)
func (c *inboundCall) sendSessionRefresh(ctx context.Context) error {
c.log.Infow("Sending session refresh")

// Use the sipInbound layer to send the refresh with the same SDP
return c.cc.sendSessionRefresh(ctx, c.lastSDP)
}

// close should only be called from handleInvite.
func (c *inboundCall) close(error bool, status CallStatus, reason string) {
if !c.done.CompareAndSwap(false, true) {
Expand All @@ -1060,6 +1126,12 @@ func (c *inboundCall) close(error bool, status CallStatus, reason string) {

c.closeMedia()
c.cc.CloseWithStatus(sipCode, sipStatus)

// Stop session timer if active
if c.sessionTimer != nil {
c.sessionTimer.Stop()
}

if c.callDur != nil {
c.callDur()
}
Expand Down Expand Up @@ -1352,6 +1424,7 @@ type sipInbound struct {
ringing chan struct{}
acked core.Fuse
setHeaders setHeadersFunc
sessionTimer *SessionTimer // Session timer reference
}

func (c *sipInbound) ValidateInvite() error {
Expand Down Expand Up @@ -1550,6 +1623,15 @@ func (c *sipInbound) Accept(ctx context.Context, sdpData []byte, headers map[str

c.addExtraHeaders(r)

// Add session timer headers if negotiated
if c.sessionTimer != nil {
sessionExpires := c.sessionTimer.GetSessionExpires()
refresher := c.sessionTimer.GetRefresher()
if sessionExpires > 0 {
c.sessionTimer.AddHeadersToResponse(r, sessionExpires, refresher)
}
}

r.AppendHeader(&contentTypeHeaderSDP)
for k, v := range headers {
r.AppendHeader(sip.NewHeader(k, v))
Expand Down Expand Up @@ -1665,6 +1747,85 @@ func (c *sipInbound) setCSeq(req *sip.Request) {
c.nextRequestCSeq++
}

// sendSessionRefresh sends a mid-dialog re-INVITE to refresh the session
func (c *sipInbound) sendSessionRefresh(ctx context.Context, sdpOffer []byte) error {
c.mu.Lock()
defer c.mu.Unlock()

if c.inviteOk == nil || c.invite == nil {
return errors.New("call not established")
}

ctx, span := tracer.Start(ctx, "sipInbound.sendSessionRefresh")
defer span.End()

// Create re-INVITE request with the same dialog parameters
req := sip.NewRequest(sip.INVITE, c.invite.Recipient)

// Copy essential headers from original INVITE
req.RemoveHeader("Call-ID")
if callID := c.invite.CallID(); callID != nil {
req.AppendHeader(callID)
}

// From and To headers (maintaining tags)
req.AppendHeader(c.from)
req.AppendHeader(c.to)

// Contact
if c.contact != nil {
req.AppendHeader(c.contact)
}

// Set new CSeq
c.setCSeq(req)

// Add SDP body
if sdpOffer != nil && len(sdpOffer) > 0 {
req.SetBody(sdpOffer)
req.AppendHeader(sip.NewHeader("Content-Type", "application/sdp"))
}

// Add session timer headers if active
if c.sessionTimer != nil {
c.sessionTimer.AddHeadersToRequest(req)
}

// Add custom headers
if c.setHeaders != nil {
for k, v := range c.setHeaders(nil) {
req.AppendHeader(sip.NewHeader(k, v))
}
}

// Swap src/dst for client-like behavior
c.swapSrcDst(req)

// Send the request and wait for response
tx, err := c.s.sipSrv.TransactionLayer().Request(req)
if err != nil {
return fmt.Errorf("failed to send session refresh: %w", err)
}
defer tx.Terminate()

// Wait for response
resp, err := sipResponse(ctx, tx, nil, nil)
if err != nil {
return fmt.Errorf("session refresh failed: %w", err)
}

if resp.StatusCode != sip.StatusOK {
return fmt.Errorf("session refresh rejected: %d %s", resp.StatusCode, resp.Reason)
}

c.log.Infow("Session refresh successful")

// Send ACK
ack := sip.NewAckRequest(req, resp, nil)
c.swapSrcDst(ack)
return c.s.sipSrv.TransportLayer().WriteMsg(ack)
}

func (c *sipInbound) sendBye() {
if c.inviteOk == nil {
return // call wasn't established
Expand Down
Loading