-
Notifications
You must be signed in to change notification settings - Fork 543
Feat/kinesis binding vmware go kcl v2 latest #4082
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Feat/kinesis binding vmware go kcl v2 latest #4082
Conversation
sicoyle
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thank you for your contribution to the Dapr project 🙌 - few comments :)
bindings/aws/kinesis/kinesis.go
Outdated
| closed atomic.Bool | ||
| closeCh chan struct{} | ||
| wg sync.WaitGroup | ||
| applicationName string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you please share some insights on why the additional field here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you please share some insights on why the additional field here?
applicationName is required for KCL (Kinesis Client Library) worker configuration in shared throughput mode. It identifies the consumer application and is used for DynamoDB table naming and checkpointing.
Without applicationName we were facing an error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd move this field next to consumerMode. And the comment is not necessary if you ask me, I'd remove it for consistency, but feel free to keep if you think it's useful :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd move this field next to
consumerMode. And the comment is not necessary if you ask me, I'd remove it for consistency, but feel free to keep if you think it's useful :)
Done
common/authentication/aws/client.go
Outdated
| v1Creds, err := c.Credentials.Get() | ||
| if err != nil { | ||
| return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we ignoring the err here? If we are updating to use v2 then can you make the v2 creds provider be the default and if err then maybe try the v1 and then err from there? but pls reference the other aws components to see the auth flow so we can ensure that we standardize on this for when using v2 sdks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we ignoring the err here? If we are updating to use v2 then can you make the v2 creds provider be the default and if err then maybe try the v1 and then err from there? but pls reference the other aws components to see the auth flow so we can ensure that we standardize on this for when using v2 sdks.
Done please verify this.
|
The git history of this PR seems odd, lots of those commits are unrelated. Do you mind cleaning it up so it only contains commits related to this change? |
Signed-off-by: swatimodi-scout <[email protected]>
Signed-off-by: devendrapohekar-scout <[email protected]>
e6bc920 to
cabcec5
Compare
@acroca Thanks for the feedback! I’ve cleaned up the branch history — the PR now contains only the relevant commit for this change. Please let me know if anything else needs adjustment. |
Signed-off-by: swatimodi-scout <[email protected]>
bindings/aws/kinesis/kinesis.go
Outdated
| closed atomic.Bool | ||
| closeCh chan struct{} | ||
| wg sync.WaitGroup | ||
| applicationName string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd move this field next to consumerMode. And the comment is not necessary if you ask me, I'd remove it for consistency, but feel free to keep if you think it's useful :)
common/authentication/aws/client.go
Outdated
| /** | ||
| * If the error is not nil, do not proceed to the next step | ||
| * as it may cause a nil pointer error on stream.StreamDescription.StreamARN. | ||
| */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the comment is not necessary
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the comment is not necessary
Done
common/authentication/aws/client.go
Outdated
| kclConfig := config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", v2Config.Credentials) | ||
| return kclConfig | ||
| } | ||
| // Fallback to v1 credentials if v2 fails |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In which cases would v2 fail, but v1 work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In which cases would v2 fail, but v1 work?
Good question! You're right to point that out. Looking at this more carefully, the v2 fallback to v1 scenario is actually quite rare and adds unnecessary complexity.
In practice, both v1 and v2 SDKs use the same underlying credential sources (env vars, credential files, IAM roles, etc.), so if v2 fails to load credentials, v1 would likely fail for the same reason.
Since we already have validated v1 credentials from the established session, I've simplified this to directly use those credentials and convert them to v2 format for KCL compatibility. This is more reliable and removes the redundant credential resolution logic.
Updated the code to use the existing session credentials directly - much cleaner approach! 👍
Co-authored-by: Albert Callarisa <[email protected]> Signed-off-by: swatimodi-scout <[email protected]>
Co-authored-by: Albert Callarisa <[email protected]> Signed-off-by: swatimodi-scout <[email protected]>
Signed-off-by: swatimodi-scout <[email protected]>
common/authentication/aws/client.go
Outdated
| v1Creds, err := c.Credentials.Get() | ||
| if err != nil { | ||
| return nil | ||
| } | ||
| v2Creds := v2creds.NewStaticCredentialsProvider(v1Creds.AccessKeyID, v1Creds.SecretAccessKey, v1Creds.SessionToken) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It feels wrong to have both v1 and v2 living together. Can't we migrate completely to v2?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It feels wrong to have both v1 and v2 living together. Can't we migrate completely to v2?
Done please review it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this works, it's not wired up to the refresh at all.
I think it was better before, but the conversion from v1 credentials to v2 should be done in the KinesisClients.New, which is called in the refresh operations.
This way, you can convert all kinesis to v2 and only do the v1->v2 in the New right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thansks @swatimodi-scout! What if you fully migrate the aws client to v2? you do not need to migrate all other components, it would be something similar to this PR that @mikeee is working on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thansks @swatimodi-scout! What if you fully migrate the aws client to v2? you do not need to migrate all other components, it would be something similar to this PR that @mikeee is working on.
@javier-aliaga Yes, that makes sense. However, we’re not migrating the entire AWS SDK version at this stage — my current change focuses only on updating vmware-go-kcl to vmware-go-kcl-v2.
Would you prefer that I continue implementing the AWS SDK v2 changes for Kinesis within this same PR, or should I move that work to a separate PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this works, it's not wired up to the refresh at all. I think it was better before, but the conversion from v1 credentials to v2 should be done in the
KinesisClients.New, which is called in the refresh operations. This way, you can convert all kinesis to v2 and only do the v1->v2 in theNewright?
@acroca I updated in KinesisClients.New. It is working i checked in local. Let me know if i need to revert back to previous version where we were supporting both v1 and v2.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can do it in the same PR as changing to aws-sdk-v2 is related to update the vmware-go-kcl-v2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can do it in the same PR as changing to aws-sdk-v2 is related to update the vmware-go-kcl-v2
@javier-aliaga Please review the code.
Signed-off-by: swatimodi-scout <[email protected]>
01bf923 to
45d9390
Compare
Signed-off-by: swatimodi-scout <[email protected]> Signed-off-by: rideshnath-scout <[email protected]>
Signed-off-by: rideshnath-scout <[email protected]>
2c85b67 to
9dafc6d
Compare
common/authentication/aws/client.go
Outdated
| Kinesis *kinesisv2.Client | ||
| Region string | ||
| Credentials *credentials.Credentials | ||
| V2Credentials aws.CredentialsProvider |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To migrate to aws sdk V2 move this out to the kinesis.go file and create and store the client there. No references to sdk v2 should be here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To migrate to aws sdk V2 move this out to the kinesis.go file and create and store the client there. No references to sdk v2 should be here
@javier-aliaga All AWS resource clients are currently defined in client.go. Is there any specific reason we should move only the Kinesis client out of this file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To migrate to aws sdk V2 move this out to the kinesis.go file and create and store the client there. No references to sdk v2 should be here
Done please review it
…d tests Signed-off-by: rideshnath-scout <[email protected]>
…ts (resolve linting issues) Signed-off-by: rideshnath-scout <[email protected]>
|
@mikeee @javier-aliaga Requested changes are done. Please review it. |
|
@swatimodi-scout linting is failing, can you fix it? |
Signed-off-by: rideshnath-scout <[email protected]>
@javier-aliaga I have fixed the lint issue, please check |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Linter issue - complaining of the need to run goimports at the least (or golangci-lint run --fix)
The changes to /common/authentication/aws should also be reverted if the package is not being referenced in this PR.
|
/ok-to-test |
Complete Build MatrixThe build status is currently not updated here. Please visit the action run below directly. Commit ref: d8f5103 |
Components certification testCommit ref: d8f5103 ❌ Some certification tests failedThese tests failed:
Additionally, some tests did not report a status:
|
Components conformance testCommit ref: d8f5103 ❌ Some conformance tests failedThese tests failed:
Additionally, some tests did not report a status:
|
mikeee
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My last review comment still stands
Are you talking about this change? - The changes to /common/authentication/aws should also be reverted if the package is not being referenced in this PR. |
Signed-off-by: rideshnath-scout <[email protected]>
@mikeee I have reverted the changes. |
Done please review the changes |
bindings/aws/kinesis/kinesis.go
Outdated
| // Iterate 18 times | ||
| for range 18 { | ||
| consumer, err := a.kinesisClient.DescribeStreamConsumer(ctx, input) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if consumer.ConsumerDescription.ConsumerStatus == types.ConsumerStatusActive { | ||
| return nil | ||
| } | ||
| time.Sleep(10 * time.Second) | ||
| } | ||
| w.ApplyOptions(opts...) | ||
|
|
||
| return w.WaitWithContext(ctx) | ||
| return errors.New("consumer did not become active within timeout") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q:
I'm not sure on this but I believe all the aws service clients use the a default number of retries built-in which I'm not aware of us customising.
What is the significance of the magic number (18) used and does this mean for these requests the Nopretryer should be used here, and are we iterating more than the intended number of times?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mikeee You're absolutely right to question this! The magic number 18 appears to be implementing a custom polling mechanism.
Let me fix this by implementing a proper polling mechanism with exponential backoff instead of the this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mikeee Could you please check and let me know if any changes are needed ?
Signed-off-by: rideshnath-scout <[email protected]>
mikeee
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lgtm, just noting that I haven't got an environment set up to test this and we don't have certification/conformance test coverage for this binding. A bonus definitely would be if you could write some e2e tests I can set up the resources on our aws account for them.
https://github.com/dapr/components-contrib/tree/main/tests/certification/bindings/aws/s3
…solve conflicts) Signed-off-by: rideshnath-scout <[email protected]>
@mikeee Thanks for the review. We’ve tested the changes locally, and everything is working as expected. Would it be possible to merge this PR at the earliest? We can follow up by adding the e2e test cases separately afterward. |
Description
This PR includes two key improvements to the AWS Kinesis input binding:
These changes improve stability and maintainability of the Kinesis input binding, ensuring compatibility with LocalStack and future AWS SDK updates.
Issue reference
We strive to have all PR being opened based on an issue, where the problem or feature have been discussed prior to implementation.
Please reference the issue this PR will close: #[3980, 3985]
Checklist
Please make sure you've completed the relevant tasks for this PR, out of the following list:
Note: We expect contributors to open a corresponding documentation PR in the dapr/docs repository. As the implementer, you are the best person to document your work! Implementation PRs will not be merged until the documentation PR is opened and ready for review.