fix: implement processMap function to MAP structured data #99

Open: peacecwz wants to merge 3 commits into master

peacecwz

Hi @jcustenborder,

You may have seen that if you use HeaderToField as a transform and your data is a Map, the plugin throws an error like "MAP is unsupported ...". I implemented a processMap function, tried it out, and it works well. Perhaps you would like to merge it as a contribution.

@jcustenborder
Owner

@peacecwz Happy to merge this! Would you mind adding some unit tests?

@peacecwz
Author

@jcustenborder No, I don't mind. I can add some unit tests for the function. I'll update the PR shortly.

@peacecwz
Author

@jcustenborder I added a test for processMap. Is that okay?

@peacecwz
Author

@jcustenborder By the way, if this PR is merged, could you also make a release? I'm currently using my own private artifact. I couldn't run the Jenkins pipeline, so I uploaded the artifact to S3 manually and deployed it from there, but I would like to deploy it officially with the confluent-kafka CLI.

@peacecwz
Author

@jcustenborder Hey, could you check the PR?

});
}

input.put("_headers", headers);

@harpaj Sep 13, 2023

I just tested this code because we are running into a similar problem.
It appears to add the extracted values to the new struct field _headers which you are setting here, creating a nested structure.
So

".header.mappings": "time:INT64:d,time:INT64:h,time:INT64:m"

becomes

"value": {
    "_headers" : {
        "d" : 1669852804800000000,
        "h" : 1669852804800000000,
        "m" : 1669852804800000000
    }
}

I would have expected a flat structure here (and interestingly also the test you added shows a flat structure).

The test actually doesn't test this correctly.
To have the same behaviour as for Structs (i.e., adding the new fields to the root), this should be:

Suggested change
- input.put("_headers", headers);
+ input.putAll(headers);
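
The difference between the two lines in the suggestion can be reproduced with plain `java.util.Map` objects, independent of Kafka Connect. The sketch below is illustrative only: the class and method names (`HeaderMergeSketch`, `nested`, `flat`) are hypothetical and not part of the plugin, and the header value is taken from the example above. Unlike the code under review, it copies the input map rather than mutating it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper class; it only illustrates put vs. putAll on a Map value.
class HeaderMergeSketch {
    // Mirrors input.put("_headers", headers): the headers end up one level down.
    static Map<String, Object> nested(Map<String, Object> input, Map<String, Object> headers) {
        Map<String, Object> result = new LinkedHashMap<>(input);
        result.put("_headers", headers);
        return result;
    }

    // Mirrors input.putAll(headers): each header becomes a root-level entry.
    static Map<String, Object> flat(Map<String, Object> input, Map<String, Object> headers) {
        Map<String, Object> result = new LinkedHashMap<>(input);
        result.putAll(headers);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> input = new LinkedHashMap<>();
        input.put("firstName", "example");

        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("d", 1669852804800000000L);

        System.out.println(nested(input, headers)); // {firstName=example, _headers={d=1669852804800000000}}
        System.out.println(flat(input, headers));   // {firstName=example, d=1669852804800000000}
    }
}
```

Note that `putAll` overwrites any existing entry whose key matches a header field name, so a flat merge can silently replace a field that was already present in the value.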

@okayhooni
Contributor

okayhooni commented Oct 20, 2023

@peacecwz

Could you fix the same issue in the ChangeCase SMT?

(If not, I will try to implement the processMap method in ChangeCase, using your commit as a reference.)

Caused by: java.lang.UnsupportedOperationException: MAP is not a supported type.
	at com.github.jcustenborder.kafka.connect.transform.common.BaseTransformation.processMap(BaseTransformation.java:39)
	at com.github.jcustenborder.kafka.connect.transform.common.BaseTransformation.process(BaseTransformation.java:120)
	at com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Value.apply(ChangeCase.java:128)
	at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
	... 15 more
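
The trace suggests a pattern in which a base class dispatches on the runtime type of the record value and rejects any type a subclass has not overridden. A minimal sketch of that pattern follows. It is a simplified assumption, not the actual `BaseTransformation` source, and every name in it (`SketchBase`, `UpperCaseKeys`, `DispatchDemo`, and the method signatures) is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for a transformation base class; not the real BaseTransformation.
abstract class SketchBase {
    // Default behaviour: schemaless Map values are rejected unless a subclass overrides this.
    protected Object processMap(Map<String, Object> input) {
        throw new UnsupportedOperationException("MAP is not a supported type.");
    }

    // Dispatches on the runtime type of the value; only Map is modelled in this sketch.
    @SuppressWarnings("unchecked")
    public Object process(Object value) {
        if (value instanceof Map) {
            return processMap((Map<String, Object>) value);
        }
        throw new UnsupportedOperationException("Unsupported value in this sketch: " + value);
    }
}

// Hypothetical subclass that opts in to Map support by overriding processMap.
class UpperCaseKeys extends SketchBase {
    @Override
    protected Object processMap(Map<String, Object> input) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : input.entrySet()) {
            result.put(entry.getKey().toUpperCase(Locale.ROOT), entry.getValue());
        }
        return result;
    }
}

class DispatchDemo {
    public static void main(String[] args) {
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("firstName", "example");
        System.out.println(new UpperCaseKeys().process(value)); // {FIRSTNAME=example}
    }
}
```

Under this assumption, a transformation that never overrides the map handler fails exactly as in the trace, which is why each SMT needs its own override.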


@greyfairer left a comment

See my suggestions to fix the test, because it doesn't test anything now.

I'd also prefer the transformation to behave the same way with Maps (JSON) as with Structs (Avro), so the fields should be added to the root of the object.

@@ -71,4 +68,44 @@ public void apply() throws IOException {
assertStruct(expectedStruct, (Struct) actualRecord.value());
}

@Test
public void applyWithMap() throws IOException {
this.transformation = new HeaderToField.Key<>();

Suggested change
- this.transformation = new HeaderToField.Key<>();
+ this.transformation = new HeaderToField.Value<>();

ConnectHeaders inputHeaders = new ConnectHeaders();
inputHeaders.addString("applicationId", "testing");

Schema inputSchema = SchemaBuilder.map(SchemaBuilder.STRING_SCHEMA, SchemaBuilder.OPTIONAL_STRING_SCHEMA)

Suggested change
- Schema inputSchema = SchemaBuilder.map(SchemaBuilder.STRING_SCHEMA, SchemaBuilder.OPTIONAL_STRING_SCHEMA)
+ Map<String, Object> inputSchema = new HashMap<>();
value.put("firstName", "example");
value.put("lastName", "user");


SinkRecord actualRecord = this.transformation.apply(inputRecord);
assertNotNull(actualRecord, "record should not be null.");
assertEquals(expectedSchema.parameters().size(), 3);

Suggested change
- assertEquals(expectedSchema.parameters().size(), 3);
+ assertEquals("testing", ((Map<String, String>)actualRecord.value()).get("applicationId"));

@greyfairer

@peacecwz see peacecwz#1


5 participants