@@ -2,13 +2,15 @@ package geolocation
22
33import (
44 "context"
5+ "errors"
56 "fmt"
67 "math/rand"
78 "net/http"
89 "net/url"
910 "regexp"
1011 "strconv"
1112 "strings"
13+ "time"
1214
1315 "github.com/golang/glog"
1416 "github.com/julienschmidt/httprouter"
@@ -20,6 +22,14 @@ import (
2022 "github.com/livepeer/go-api-client"
2123)
2224
25+ const (
26+ streamSourceRetries = 20
27+ streamSourceRetryInterval = 3 * time .Second
28+ lockPullLeaseTimeout = 3 * time .Minute
29+ )
30+
31+ var errLockPull = errors .New ("failed to lock pull" )
32+
2333type GeolocationHandlersCollection struct {
2434 Balancer balancer.Balancer
2535 Cluster cluster.Cluster
@@ -150,31 +160,52 @@ func (c *GeolocationHandlersCollection) HandleStreamSource(ctx context.Context,
150160
151161 latStr := fmt .Sprintf ("%f" , lat )
152162 lonStr := fmt .Sprintf ("%f" , lon )
153- dtscURL , err := c .Balancer .MistUtilLoadSource (context .Background (), payload .StreamName , latStr , lonStr )
154- if err != nil {
155- glog .Errorf ("error querying mist for STREAM_SOURCE: %s" , err )
163+ for i := 0 ; i < streamSourceRetries ; i ++ {
164+ dtscURL , err := c .Balancer .MistUtilLoadSource (context .Background (), payload .StreamName , latStr , lonStr )
165+ if err == nil {
166+ return c .resolveReplicatedStream (dtscURL , payload .StreamName )
167+ }
156168
157- playbackID := payload .StreamName
158- parts := strings .Split (playbackID , "+" )
159- if len (parts ) == 2 {
160- playbackID = parts [1 ] // take the playbackID after the prefix e.g. 'video+'
169+ glog .Errorf ("error querying mist for STREAM_SOURCE: %s" , err )
170+ pullURL , err := c .getStreamPull (playbackIdFor (payload .StreamName ))
171+ if err == nil {
172+ if pullURL == "" {
173+ // not a stream pull
174+ return "push://" , nil
175+ } else {
176+ // start stream pull
177+ glog .Infof ("replying to Mist STREAM_SOURCE with stream pull request=%s response=%s" , payload .StreamName , pullURL )
178+ return pullURL , nil
179+ }
161180 }
162- pullURL , err := c . getStreamPull ( playbackID )
163- if err != nil {
181+ if ! errors . Is ( err , errLockPull ) {
182+ // stream pull failed for unknown reason
164183 glog .Errorf ("getStreamPull failed for %s: %s" , payload .StreamName , err )
165- } else if pullURL != "" {
166- glog .Infof ("replying to Mist STREAM_SOURCE with stream pull request=%s response=%s" , payload .StreamName , pullURL )
167- return pullURL , nil
184+ return "push://" , nil
168185 }
186+ // stream pull failed, because another node started to pull at the same time
187+ glog .Warningf ("another node is currently pulling the stream, waiting %v and retrying" , streamSourceRetryInterval )
188+ time .Sleep (streamSourceRetryInterval )
189+ }
190+ return "push://" , nil
191+ }
169192
170- return "push://" , nil
193+ func playbackIdFor (streamName string ) string {
194+ res := streamName
195+ parts := strings .Split (res , "+" )
196+ if len (parts ) == 2 {
197+ res = parts [1 ] // take the playbackID after the prefix e.g. 'video+'
171198 }
199+ return res
200+ }
201+
202+ func (c * GeolocationHandlersCollection ) resolveReplicatedStream (dtscURL string , streamName string ) (string , error ) {
172203 outURL , err := c .Cluster .ResolveNodeURL (dtscURL )
173204 if err != nil {
174205 glog .Errorf ("error finding STREAM_SOURCE: %s" , err )
175206 return "push://" , nil
176207 }
177- glog .V (7 ).Infof ("replying to Mist STREAM_SOURCE request=%s response=%s" , payload . StreamName , outURL )
208+ glog .V (7 ).Infof ("replying to Mist STREAM_SOURCE request=%s response=%s" , streamName , outURL )
178209 return outURL , nil
179210}
180211
@@ -200,6 +231,10 @@ func (c *GeolocationHandlersCollection) getStreamPull(playbackID string) (string
200231 return "" , nil
201232 }
202233
234+ if err := c .Lapi .LockPull (stream .ID , lockPullLeaseTimeout ); err != nil {
235+ return "" , errLockPull
236+ }
237+
203238 if len (stream .Pull .Headers ) == 0 {
204239 return stream .Pull .Source , nil
205240 }
0 commit comments