Skip to content

Commit 9b1f061

Browse files
authored
Do not block when ACK is missing. (#461)
* Do not block when ACK is missing. * Force retry 200 OK for UDP. * Move late ACK timer right after the Accept.
1 parent b5a1c15 commit 9b1f061

File tree

4 files changed

+178
-64
lines changed

4 files changed

+178
-64
lines changed

pkg/config/config.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,8 @@ type Config struct {
106106

107107
// Experimental, these option might go away without notice.
108108
Experimental struct {
109-
IgnoreMissingACK bool `yaml:"ignore_missing_ack"`
109+
// InboundWaitACK forces SIP to wait for an ACK to 200 OK before proceeding with the call.
110+
InboundWaitACK bool `yaml:"inbound_wait_ack"`
110111
} `yaml:"experimental"`
111112
}
112113

pkg/sip/inbound.go

Lines changed: 98 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,17 @@ import (
5252
)
5353

5454
const (
55+
stateUpdateTick = 10 * time.Minute
56+
5557
// audioBridgeMaxDelay delays sending audio for certain time, unless RTP packet is received.
5658
// This is done because of audio cutoff at the beginning of calls observed in the wild.
5759
audioBridgeMaxDelay = 1 * time.Second
5860

59-
inviteOkAckTimeout = 5 * time.Second
61+
inviteOkRetryInterval = 250 * time.Millisecond // 1/2 of T1 for faster recovery
62+
inviteOkRetryIntervalMax = 3 * time.Second
63+
inviteOKRetryAttempts = 5
64+
inviteOKRetryAttemptsNoACK = 2
65+
inviteOkAckLateTimeout = inviteOkRetryIntervalMax
6066
)
6167

6268
var errNoACK = errors.New("no ACK received for 200 OK")
@@ -609,6 +615,13 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
609615
return answerData, nil
610616
}
611617

618+
// If we do not wait for ACK during Accept, we could wait for it later.
619+
// Otherwise, leave channels nil, so that they never trigger.
620+
var (
621+
ackReceived <-chan struct{}
622+
ackTimeout <-chan time.Time
623+
)
624+
612625
// We need to start media first, otherwise we won't be able to send audio prompts to the caller, or receive DTMF.
613626
acceptCall := func(answerData []byte) (bool, error) {
614627
headers := disp.Headers
@@ -619,19 +632,19 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
619632
c.log.Infow("Accepting the call", "headers", headers)
620633
err := c.cc.Accept(ctx, answerData, headers)
621634
if errors.Is(err, errNoACK) {
622-
if !c.s.conf.Experimental.IgnoreMissingACK {
623-
c.log.Errorw("Call accepted, but no ACK received", err)
624-
c.close(true, callNoACK, "no-ack")
625-
return false, err
626-
}
627-
c.log.Warnw("Call accepted, but no ACK received", err)
628-
err = nil // ignore
629-
}
630-
if err != nil {
635+
c.log.Errorw("Call accepted, but no ACK received", err)
636+
c.closeWithNoACK()
637+
return false, err
638+
} else if err != nil {
631639
c.log.Errorw("Cannot accept the call", err)
632640
c.close(true, callAcceptFailed, "accept-failed")
633641
return false, err
634642
}
643+
if !c.s.conf.Experimental.InboundWaitACK {
644+
ackReceived = c.cc.InviteACK()
645+
// Start this timer right after the Accept.
646+
ackTimeout = time.After(inviteOkAckLateTimeout)
647+
}
635648
c.media.EnableTimeout(true)
636649
c.media.EnableOut()
637650
if ok, err := c.waitMedia(ctx); !ok {
@@ -716,9 +729,10 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
716729

717730
c.started.Break()
718731

719-
ticker := time.NewTicker(10 * time.Minute)
720-
defer ticker.Stop()
732+
var noAck = false
721733
// Wait for the caller to terminate the call. Send regular keep alives
734+
ticker := time.NewTicker(stateUpdateTick)
735+
defer ticker.Stop()
722736
for {
723737
select {
724738
case <-ticker.C:
@@ -734,8 +748,21 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
734748
c.close(false, callDropped, "removed")
735749
return nil
736750
case <-c.media.Timeout():
751+
if noAck {
752+
c.log.Errorw("Media timeout after missing ACK", errNoACK)
753+
c.closeWithNoACK()
754+
return psrpc.NewError(psrpc.DeadlineExceeded, errNoACK)
755+
}
737756
c.closeWithTimeout()
738757
return psrpc.NewErrorf(psrpc.DeadlineExceeded, "media timeout")
758+
case <-ackReceived:
759+
ackTimeout = nil // all good, disable timeout
760+
case <-ackTimeout:
761+
// Only warn, the other side still thinks the call is active, media may be flowing.
762+
c.log.Warnw("Call accepted, but no ACK received", errNoACK)
763+
// We don't need to wait for a full media timeout initially, we already know something is not quite right.
764+
c.media.SetTimeout(min(inviteOkAckLateTimeout, c.s.conf.MediaTimeoutInitial), c.s.conf.MediaTimeout)
765+
noAck = true
739766
}
740767
}
741768
}
@@ -971,6 +998,10 @@ func (c *inboundCall) closeWithTimeout() {
971998
c.close(true, callDropped, "media-timeout")
972999
}
9731000

1001+
func (c *inboundCall) closeWithNoACK() {
1002+
c.close(true, callNoACK, "no-ack")
1003+
}
1004+
9741005
func (c *inboundCall) closeWithCancelled() {
9751006
c.state.DeferUpdate(func(info *livekit.SIPCallInfo) {
9761007
info.DisconnectReason = livekit.DisconnectReason_CLIENT_INITIATED
@@ -1162,6 +1193,7 @@ func (s *Server) newInbound(log logger.Logger, id LocalTag, contact URI, invite
11621193
id: id,
11631194
invite: invite,
11641195
inviteTx: inviteTx,
1196+
legTr: legTransportFromReq(invite),
11651197
contact: &sip.ContactHeader{
11661198
Address: *contact.GetContactURI(),
11671199
},
@@ -1195,6 +1227,7 @@ type sipInbound struct {
11951227
cancelled chan struct{}
11961228
from *sip.FromHeader
11971229
to *sip.ToHeader
1230+
legTr Transport
11981231
referDone chan error
11991232

12001233
mu sync.RWMutex
@@ -1352,6 +1385,10 @@ func (c *sipInbound) stopRinging() {
13521385
}
13531386
}
13541387

1388+
func (c *sipInbound) InviteACK() <-chan struct{} {
1389+
return c.acked.Watch()
1390+
}
1391+
13551392
func (c *sipInbound) Cancelled() <-chan struct{} {
13561393
return c.cancelled
13571394
}
@@ -1370,6 +1407,11 @@ func (c *sipInbound) addExtraHeaders(r *sip.Response) {
13701407
}
13711408
}
13721409

1410+
func (c *sipInbound) accepted(inviteOK *sip.Response) {
1411+
c.inviteOk = inviteOK
1412+
c.inviteTx = nil
1413+
}
1414+
13731415
func (c *sipInbound) Accept(ctx context.Context, sdpData []byte, headers map[string]string) error {
13741416
ctx, span := tracer.Start(ctx, "sipInbound.Accept")
13751417
defer span.End()
@@ -1390,23 +1432,51 @@ func (c *sipInbound) Accept(ctx context.Context, sdpData []byte, headers map[str
13901432
r.AppendHeader(sip.NewHeader(k, v))
13911433
}
13921434
c.stopRinging()
1393-
if err := c.inviteTx.Respond(r); err != nil {
1394-
return err
1435+
retryAfter := inviteOkRetryInterval
1436+
maxRetries := inviteOKRetryAttempts
1437+
if !c.s.conf.Experimental.InboundWaitACK {
1438+
// Still retry, but limit it to ~750ms.
1439+
maxRetries = inviteOKRetryAttemptsNoACK
1440+
}
1441+
if c.legTr != TransportUDP {
1442+
maxRetries = 1
1443+
// That actually becomes an ACK timeout here.
1444+
retryAfter = inviteOkRetryIntervalMax
1445+
}
1446+
var acceptErr error
1447+
retries:
1448+
for try := 1; ; try++ {
1449+
if err := c.inviteTx.Respond(r); err != nil {
1450+
return err
1451+
}
1452+
if c.legTr != TransportUDP && !c.s.conf.Experimental.InboundWaitACK {
1453+
// Reliable transport and we are not waiting for ACK - return immediately.
1454+
break retries
1455+
}
1456+
t := time.NewTimer(retryAfter)
1457+
select {
1458+
case <-c.inviteTx.Acks():
1459+
t.Stop()
1460+
break retries
1461+
case <-c.acked.Watch():
1462+
t.Stop()
1463+
break retries
1464+
case <-t.C:
1465+
}
1466+
if try > maxRetries {
1467+
// Only set error if an option is enabled.
1468+
// Otherwise, ignore missing ACK for now.
1469+
if c.s.conf.Experimental.InboundWaitACK {
1470+
acceptErr = errNoACK
1471+
}
1472+
break retries
1473+
}
1474+
retryAfter *= 2
1475+
retryAfter = min(retryAfter, inviteOkRetryIntervalMax)
13951476
}
1396-
ackCtx, ackCancel := context.WithTimeout(ctx, inviteOkAckTimeout)
1397-
defer ackCancel()
1398-
select {
1399-
case <-ackCtx.Done():
1400-
// Other side may think it's accepted, so update our state accordingly.
1401-
c.inviteOk = r
1402-
c.inviteTx = nil
1403-
return errNoACK
1404-
case <-c.inviteTx.Acks():
1405-
case <-c.acked.Watch():
1406-
}
1407-
c.inviteOk = r
1408-
c.inviteTx = nil // accepted
1409-
return nil
1477+
// Other side likely thinks it's accepted, so update our state accordingly, even if no ACK follows.
1478+
c.accepted(r)
1479+
return acceptErr
14101480
}
14111481

14121482
func (c *sipInbound) AcceptAck(req *sip.Request, tx sip.ServerTransaction) {

0 commit comments

Comments
 (0)