@@ -14,15 +14,20 @@ import (
14
14
"github.com/prometheus/client_golang/prometheus"
15
15
"github.com/rs/zerolog"
16
16
"go.uber.org/atomic"
17
+ "google.golang.org/genproto/googleapis/rpc/errdetails"
17
18
"google.golang.org/grpc"
19
+ "google.golang.org/grpc/codes"
18
20
"google.golang.org/grpc/connectivity"
21
+ "google.golang.org/grpc/status"
19
22
"google.golang.org/protobuf/proto"
20
23
21
24
"github.com/authzed/spicedb/internal/dispatch"
22
25
"github.com/authzed/spicedb/internal/dispatch/keys"
23
26
log "github.com/authzed/spicedb/internal/logging"
27
+ corev1 "github.com/authzed/spicedb/pkg/proto/core/v1"
24
28
v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1"
25
29
"github.com/authzed/spicedb/pkg/spiceerrors"
30
+ "github.com/authzed/spicedb/pkg/tuple"
26
31
)
27
32
28
33
var dispatchCounter = prometheus .NewCounterVec (prometheus.CounterOpts {
@@ -130,6 +135,7 @@ func NewClusterDispatcher(client ClusterClient, conn *grpc.ClientConn, config Cl
130
135
secondaryDispatch : secondaryDispatch ,
131
136
secondaryDispatchExprs : secondaryDispatchExprs ,
132
137
secondaryInitialResponseDigests : secondaryInitialResponseDigests ,
138
+ supportedResourceSubjectTracker : newSupportedResourceSubjectTracker (),
133
139
}, nil
134
140
}
135
141
@@ -142,6 +148,7 @@ type clusterDispatcher struct {
142
148
secondaryDispatch map [string ]SecondaryDispatch
143
149
secondaryDispatchExprs map [string ]* DispatchExpr
144
150
secondaryInitialResponseDigests map [string ]* digestAndLock
151
+ supportedResourceSubjectTracker * supportedResourceSubjectTracker
145
152
}
146
153
147
154
// digestAndLock is a struct that holds a TDigest and a lock to protect it.
@@ -188,15 +195,22 @@ func (cr *clusterDispatcher) DispatchCheck(ctx context.Context, req *v1.Dispatch
188
195
189
196
ctx = context .WithValue (ctx , consistent .CtxKey , requestKey )
190
197
191
- resp , err := dispatchSyncRequest (ctx , cr , "check" , req , func (ctx context.Context , client ClusterClient ) (* v1.DispatchCheckResponse , error ) {
192
- resp , err := client .DispatchCheck (ctx , req )
193
- if err != nil {
194
- return resp , err
195
- }
198
+ resp , err := dispatchSyncRequest (
199
+ ctx ,
200
+ cr ,
201
+ "check" ,
202
+ req ,
203
+ tuple .FromCoreRelationReference (req .ResourceRelation ),
204
+ tuple .RR (req .Subject .Namespace , req .Subject .Relation ),
205
+ func (ctx context.Context , client ClusterClient ) (* v1.DispatchCheckResponse , error ) {
206
+ resp , err := client .DispatchCheck (ctx , req )
207
+ if err != nil {
208
+ return resp , err
209
+ }
196
210
197
- err = adjustMetadataForDispatch (resp .Metadata )
198
- return resp , err
199
- })
211
+ err = adjustMetadataForDispatch (resp .Metadata )
212
+ return resp , err
213
+ })
200
214
if err != nil {
201
215
return & v1.DispatchCheckResponse {Metadata : requestFailureMetadata }, err
202
216
}
@@ -216,6 +230,13 @@ type responseMessage interface {
216
230
GetMetadata () * v1.ResponseMeta
217
231
}
218
232
233
+ type streamingRequestMessage interface {
234
+ requestMessage
235
+
236
+ GetResourceRelation () * corev1.RelationReference
237
+ GetSubjectRelation () * corev1.RelationReference
238
+ }
239
+
219
240
type respTuple [S responseMessage ] struct {
220
241
resp S
221
242
err error
@@ -226,7 +247,15 @@ type secondaryRespTuple[S responseMessage] struct {
226
247
resp S
227
248
}
228
249
229
- func dispatchSyncRequest [Q requestMessage , S responseMessage ](ctx context.Context , cr * clusterDispatcher , reqKey string , req Q , handler func (context.Context , ClusterClient ) (S , error )) (S , error ) {
250
+ func dispatchSyncRequest [Q requestMessage , S responseMessage ](
251
+ ctx context.Context ,
252
+ cr * clusterDispatcher ,
253
+ reqKey string ,
254
+ req Q ,
255
+ resourceTypeAndRelation tuple.RelationReference ,
256
+ subjectTypeAndRelation tuple.RelationReference ,
257
+ handler func (context.Context , ClusterClient ) (S , error ),
258
+ ) (S , error ) {
230
259
withTimeout , cancelFn := context .WithTimeout (ctx , cr .dispatchOverallTimeout )
231
260
defer cancelFn ()
232
261
@@ -248,9 +277,11 @@ func dispatchSyncRequest[Q requestMessage, S responseMessage](ctx context.Contex
248
277
go func () {
249
278
// Have the main dispatch wait some time before returning, to allow the secondaries to
250
279
// potentially return first.
251
- computedWait := cr .secondaryInitialResponseDigests [ reqKey ]. getWaitTime ( )
280
+ computedWait := cr .getPrimaryWaitTime ( reqKey , resourceTypeAndRelation , subjectTypeAndRelation )
252
281
log .Trace ().Stringer ("computed-wait" , computedWait ).Msg ("primary dispatch started; sleeping for computed wait time" )
253
- time .Sleep (computedWait )
282
+ if computedWait > 0 {
283
+ time .Sleep (computedWait )
284
+ }
254
285
255
286
log .Trace ().Msg ("running primary dispatch after wait" )
256
287
select {
@@ -296,10 +327,12 @@ func dispatchSyncRequest[Q requestMessage, S responseMessage](ctx context.Contex
296
327
// For secondary dispatches, ignore any errors, as only the primary will be handled in
297
328
// that scenario.
298
329
log .Trace ().Stringer ("duration" , handlerDuration ).Str ("secondary" , secondary .Name ).Err (err ).Msg ("got ignored secondary dispatch error" )
330
+ cr .supportedResourceSubjectTracker .updateForError (err )
299
331
return
300
332
}
301
333
302
334
log .Trace ().Stringer ("duration" , handlerDuration ).Str ("secondary" , secondary .Name ).Msg ("secondary dispatch completed" )
335
+ go cr .supportedResourceSubjectTracker .updateForSuccess (resourceTypeAndRelation , subjectTypeAndRelation )
303
336
cr .secondaryInitialResponseDigests [reqKey ].addResultTime (handlerDuration )
304
337
secondaryResultChan <- secondaryRespTuple [S ]{resp : resp , handlerName : secondary .Name }
305
338
}
@@ -397,7 +430,7 @@ type ctxAndCancel struct {
397
430
// secondary dispatchers. Unlike the non-streaming version, this will first attempt to dispatch
398
431
// from the allowed secondary dispatchers before falling back to the primary, rather than running
399
432
// them in parallel.
400
- func dispatchStreamingRequest [Q requestMessage , R responseMessage ](
433
+ func dispatchStreamingRequest [Q streamingRequestMessage , R responseMessage ](
401
434
ctx context.Context ,
402
435
cr * clusterDispatcher ,
403
436
reqKey string ,
@@ -495,9 +528,14 @@ func dispatchStreamingRequest[Q requestMessage, R responseMessage](
495
528
isPrimary := name == primaryDispatcher
496
529
if isPrimary {
497
530
// Have the primary wait a bit to ensure the secondaries have a chance to return first.
498
- computedWait := cr .secondaryInitialResponseDigests [reqKey ].getWaitTime ()
499
- time .Sleep (computedWait )
500
- hedgeWaitHistogram .WithLabelValues (reqKey ).Observe (computedWait .Seconds ())
531
+ computedWait := cr .getPrimaryWaitTime (reqKey ,
532
+ tuple .FromCoreRelationReference (req .GetResourceRelation ()),
533
+ tuple .FromCoreRelationReference (req .GetSubjectRelation ()),
534
+ )
535
+ if computedWait > 0 {
536
+ time .Sleep (computedWait )
537
+ hedgeWaitHistogram .WithLabelValues (reqKey ).Observe (computedWait .Seconds ())
538
+ }
501
539
} else {
502
540
startTime = time .Now ()
503
541
}
@@ -521,6 +559,10 @@ func dispatchStreamingRequest[Q requestMessage, R responseMessage](
521
559
primaryDispatch .WithLabelValues ("false" , reqKey ).Inc ()
522
560
}
523
561
if err != nil {
562
+ if ! isPrimary {
563
+ cr .supportedResourceSubjectTracker .updateForError (err )
564
+ }
565
+
524
566
log .Warn ().Err (err ).Str ("dispatcher" , name ).Msg ("error when trying to run secondary dispatcher" )
525
567
errorsLock .Lock ()
526
568
errorsByDispatcherName [name ] = err
@@ -549,6 +591,10 @@ func dispatchStreamingRequest[Q requestMessage, R responseMessage](
549
591
finishTime := time .Now ()
550
592
duration := finishTime .Sub (startTime )
551
593
cr .secondaryInitialResponseDigests [reqKey ].addResultTime (duration )
594
+ go cr .supportedResourceSubjectTracker .updateForSuccess (
595
+ tuple .FromCoreRelationReference (req .GetResourceRelation ()),
596
+ tuple .FromCoreRelationReference (req .GetSubjectRelation ()),
597
+ )
552
598
}
553
599
554
600
// If a valid result, and we have not yet returned any results, try take a "lock" on
@@ -576,6 +622,10 @@ func dispatchStreamingRequest[Q requestMessage, R responseMessage](
576
622
}
577
623
578
624
if err != nil {
625
+ if ! isPrimary {
626
+ cr .supportedResourceSubjectTracker .updateForError (err )
627
+ }
628
+
579
629
errorsLock .Lock ()
580
630
errorsByDispatcherName [name ] = err
581
631
errorsLock .Unlock ()
@@ -728,6 +778,108 @@ func (cr *clusterDispatcher) ReadyState() dispatch.ReadyState {
728
778
}
729
779
}
730
780
781
+ // getPrimaryWaitTime returns the wait time for the primary dispatch, based on the request key and the
782
+ // resource and subject types and relations. If the resource or subject is unsupported, the wait time
783
+ // will be zero.
784
+ func (cr * clusterDispatcher ) getPrimaryWaitTime (
785
+ reqKey string ,
786
+ resourceTypeAndRelation tuple.RelationReference ,
787
+ subjectTypeAndRelation tuple.RelationReference ,
788
+ ) time.Duration {
789
+ // If the resource or subject is unsupported, return zero time.
790
+ if cr .supportedResourceSubjectTracker .isUnsupported (
791
+ resourceTypeAndRelation ,
792
+ subjectTypeAndRelation ,
793
+ ) {
794
+ return 0
795
+ }
796
+
797
+ return cr .secondaryInitialResponseDigests [reqKey ].getWaitTime ()
798
+ }
799
+
800
// supportedResourceSubjectTracker remembers which resource and subject type-and-relation
// pairs the secondary dispatcher(s) have reported as unsupported. The primary dispatcher
// consults it to skip its hedging wait for such requests, since waiting on the secondaries
// is extremely unlikely to pay off for them.
//
// Entries are boolean flags rather than set membership: a key mapped to true is currently
// unsupported, while a key mapped to false was unsupported at some point but has since been
// seen to succeed.
type supportedResourceSubjectTracker struct {
	// unsupportedResources maps a resource tuple.RelationReference to its unsupported flag.
	unsupportedResources sync.Map

	// unsupportedSubjects maps a subject tuple.RelationReference to its unsupported flag.
	unsupportedSubjects sync.Map
}
808
+
809
+ func newSupportedResourceSubjectTracker () * supportedResourceSubjectTracker {
810
+ return & supportedResourceSubjectTracker {
811
+ unsupportedResources : sync.Map {},
812
+ unsupportedSubjects : sync.Map {},
813
+ }
814
+ }
815
+
816
+ // isUnsupported returns whether the resource or subject is unsupported by the secondary dispatcher(s).
817
+ func (srst * supportedResourceSubjectTracker ) isUnsupported (
818
+ resourceTypeAndRelation tuple.RelationReference ,
819
+ subjectTypeAndRelation tuple.RelationReference ,
820
+ ) bool {
821
+ isUnsupportedResource , found := srst .unsupportedResources .Load (resourceTypeAndRelation )
822
+ if found && isUnsupportedResource .(bool ) {
823
+ return true
824
+ }
825
+
826
+ isUnsupportedSubject , found := srst .unsupportedSubjects .Load (subjectTypeAndRelation )
827
+ return found && isUnsupportedSubject .(bool )
828
+ }
829
+
830
+ // updateForSuccess updates the tracker for a successful dispatch, removing the resource and subject
831
+ // from the unsupported set.
832
+ func (srst * supportedResourceSubjectTracker ) updateForSuccess (
833
+ resourceTypeAndRelation tuple.RelationReference ,
834
+ subjectTypeAndRelation tuple.RelationReference ,
835
+ ) {
836
+ srst .unsupportedResources .CompareAndSwap (resourceTypeAndRelation , true , false )
837
+ srst .unsupportedSubjects .CompareAndSwap (subjectTypeAndRelation , true , false )
838
+ }
839
+
840
+ // updateForError updates the tracker for an error, adding the resource or subject to the unsupported set
841
+ // if the error indicates that the resource or subject is unsupported.
842
+ func (srst * supportedResourceSubjectTracker ) updateForError (err error ) {
843
+ // Check for a FAILED_PRECONDITION with error details that indicate an unsupported
844
+ // resource or subject.
845
+ st , ok := status .FromError (err )
846
+ if ! ok {
847
+ return
848
+ }
849
+
850
+ if st .Code () != codes .FailedPrecondition {
851
+ return
852
+ }
853
+
854
+ for _ , detail := range st .Details () {
855
+ errDetail , ok := detail .(* errdetails.ErrorInfo )
856
+ if ! ok {
857
+ continue
858
+ }
859
+
860
+ // If the error is an unsupported resource or subject, add it to the tracker.
861
+ if errDetail .Reason == "UNSUPPORTED_RESOURCE_RELATION" {
862
+ definitionName := errDetail .Metadata ["definition_name" ]
863
+ relationName := errDetail .Metadata ["relation_name" ]
864
+ rr := tuple .RR (definitionName , relationName )
865
+ existing , loaded := srst .unsupportedResources .LoadOrStore (rr , true )
866
+ if ! loaded || ! existing .(bool ) {
867
+ srst .unsupportedResources .Store (rr , true )
868
+ }
869
+ }
870
+
871
+ if errDetail .Reason == "UNSUPPORTED_SUBJECT_RELATION" {
872
+ definitionName := errDetail .Metadata ["definition_name" ]
873
+ relationName := errDetail .Metadata ["relation_name" ]
874
+ rr := tuple .RR (definitionName , relationName )
875
+ existing , loaded := srst .unsupportedSubjects .LoadOrStore (rr , true )
876
+ if ! loaded || ! existing .(bool ) {
877
+ srst .unsupportedSubjects .Store (rr , true )
878
+ }
879
+ }
880
+ }
881
+ }
882
+
731
883
// Always verify that we implement the interface
732
884
var _ dispatch.Dispatcher = & clusterDispatcher {}
733
885
0 commit comments