Skip to content

Commit b1c1489

Browse files
fix: improve consumer existence check with exponential backoff
Signed-off-by: rideshnath-scout <[email protected]>
1 parent 2a205cb commit b1c1489

File tree

1 file changed

+13
-8
lines changed

1 file changed

+13
-8
lines changed

bindings/aws/kinesis/kinesis.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -339,18 +339,23 @@ func (a *AWSKinesis) deregisterConsumer(_ context.Context, streamARN *string, co
339339
}
340340

341341
func (a *AWSKinesis) waitUntilConsumerExists(ctx context.Context, input *kinesis.DescribeStreamConsumerInput) error {
342-
// Iterate 18 times
343-
for range 18 {
342+
ctx, cancel := context.WithTimeout(ctx, 3*time.Minute)
343+
defer cancel()
344+
345+
bo := backoff.NewExponentialBackOff()
346+
bo.InitialInterval = 2 * time.Second
347+
bo.MaxInterval = 30 * time.Second
348+
349+
return backoff.Retry(func() error {
344350
consumer, err := a.kinesisClient.DescribeStreamConsumer(ctx, input)
345351
if err != nil {
346-
return err
352+
return backoff.Permanent(err)
347353
}
348-
if consumer.ConsumerDescription.ConsumerStatus == types.ConsumerStatusActive {
349-
return nil
354+
if consumer.ConsumerDescription.ConsumerStatus != types.ConsumerStatusActive {
355+
return errors.New("consumer not active yet")
350356
}
351-
time.Sleep(10 * time.Second)
352-
}
353-
return errors.New("consumer did not become active within timeout")
357+
return nil
358+
}, backoff.WithContext(bo, ctx))
354359
}
355360

356361
func (a *AWSKinesis) parseMetadata(meta bindings.Metadata) (*kinesisMetadata, error) {

0 commit comments

Comments
 (0)