Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MAP support for HeaderToField #71

Open
prestonmcgowan opened this issue Feb 11, 2021 · 4 comments
Open

MAP support for HeaderToField #71

prestonmcgowan opened this issue Feb 11, 2021 · 4 comments

Comments

@prestonmcgowan
Copy link

I am making use of the HeaderToField to attempt to filter messages from a message's header.
The message header I am making use of is added by replicator, the provenance header for replicator (__replicator_id).

I attempted to add the HeaderToField transform, but the connect failed with the following error.
Configuration:

    "transforms" : "headerToField",
    "transforms.headerToField.type" : "com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Value",
    "transforms.headerToField.header.mappings" : "__replicator_id:MAP"

I had also tried STRING and BYTES, for grins, still complained about MAP is not a supported type.

Error:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:196)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:122)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:339)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
    used by: java.lang.UnsupportedOperationException: MAP is not a supported type.
    at com.github.jcustenborder.kafka.connect.utils.transformation.BaseKeyValueTransformation.processMap(BaseKeyValueTransformation.java:48)
    at com.github.jcustenborder.kafka.connect.utils.transformation.BaseKeyValueTransformation.process(BaseKeyValueTransformation.java:130)
    at com.github.jcustenborder.kafka.connect.utils.transformation.BaseKeyValueTransformation.apply(BaseKeyValueTransformation.java:192)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180)
    ... 11 more
@andriihrachov
Copy link

Same for me when using mongodb sink, but even without setting MAP

    transforms=headerToField
    transforms.headerToField.type=com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Value
    transforms.headerToField.header.mappings=x-event-type:BYTES:x_event_type
    ```

@shberlin
Copy link

Same for me. I assume, this happens when the Header is given as MAP instead of LIST. A proper MAP support would be awesome.

@jx2lee
Copy link

jx2lee commented Oct 12, 2023

related PR opend (#99)

@setrawicana
Copy link

Bumping this up. Facing the same issue

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants