[improve][io] Support Protobuf schema for Kafka source connector#23954

Closed
jiangpengcheng wants to merge 13 commits into apache:master from jiangpengcheng:support_protobuf_for_kafka_source

Conversation

@jiangpengcheng
Contributor

Motivation

The Kafka source connector supports schemas registered in a schema registry, but it currently only supports Avro. Since the schema registry also supports Protobuf, it would be useful to support that as well.

Modifications

Update the Kafka source connector so that it supports the Protobuf schema.

Verifying this change

  • Make sure that the change passes the CI checks.

  • This change added tests and can be verified as follows:

    • Added integration tests that use KafkaProtobufDeserializer to create a Kafka source connector

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: jiangpengcheng#39

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Feb 10, 2025
@jiangpengcheng
Contributor Author

/pulsarbot rerun-failure-checks

@codecov-commenter

codecov-commenter commented Mar 13, 2025

Codecov Report

Attention: Patch coverage is 36.70886% with 50 lines in your changes missing coverage. Please review.

Project coverage is 74.20%. Comparing base (bbc6224) to head (ba65b6b).
Report is 959 commits behind head on master.

Files with missing lines Patch % Lines
...a/org/apache/pulsar/io/kafka/KafkaSchemaCache.java 16.32% 41 Missing ⚠️
...a/org/apache/pulsar/io/kafka/KafkaBytesSource.java 70.00% 6 Missing and 3 partials ⚠️
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #23954      +/-   ##
============================================
+ Coverage     73.57%   74.20%   +0.63%     
+ Complexity    32624    32438     -186     
============================================
  Files          1877     1863      -14     
  Lines        139502   144332    +4830     
  Branches      15299    16467    +1168     
============================================
+ Hits         102638   107104    +4466     
+ Misses        28908    28771     -137     
- Partials       7956     8457     +501     
Flag Coverage Δ
inttests 26.74% <0.00%> (+2.16%) ⬆️
systests 23.16% <0.00%> (-1.16%) ⬇️
unittests 73.72% <36.70%> (+0.87%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...a/org/apache/pulsar/io/kafka/KafkaBytesSource.java 71.96% <70.00%> (+1.50%) ⬆️
...a/org/apache/pulsar/io/kafka/KafkaSchemaCache.java 16.32% <16.32%> (ø)

... and 1064 files with indirect coverage changes


@jiangpengcheng
Contributor Author

Hi @lhotari, could you help review this PR? Thanks!

Member

@nlu90 nlu90 left a comment


@freeznet could you also help take a look?

@jiangpengcheng jiangpengcheng force-pushed the support_protobuf_for_kafka_source branch from ba65b6b to 07d4919 on April 23, 2025 at 06:47
Comment thread pulsar-io/kafka/pom.xml
@lhotari
Member

lhotari commented Apr 23, 2025

@jiangpengcheng I have resolved the merge conflict after the #24201 changes, which upgraded the Confluent Platform version to 7.8.2 and the Kafka client version to 3.8.1. The integration test seems to fail now; I'm not sure exactly what the problem is.

I guess you are already aware that the way to run the integration test locally is to first build the docker image with this command:

mvn -B -am -pl distribution/io,distribution/offloaders,distribution/server,distribution/shell,tests/docker-images/latest-version-image install \
          -Pmain,docker -Dmaven.test.skip=true \
          -Dspotbugs.skip=true -Dlicense.skip=true -Dcheckstyle.skip=true -Drat.skip=true

And then run the test

mvn -DredirectTestOutputToFile=false -DtestRetryCount=0 test -DintegrationTests -pl tests/integration -Dtest=KafkaSourceTest#testProtobuf

Member

@lhotari lhotari left a comment


There seems to be even more to resolving the complete protobuf schema as can be seen in the documentation at https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-protobuf.html .

For example:

  • support for multiple message types defined in the same schema
  • support for schema references

Comment on lines +274 to +277
// the kafka protobuf serializer encodes the MessageIndexes in the payload, we need to skip them
if (schemaType == SchemaType.PROTOBUF_NATIVE) {
    MessageIndexes.readFrom(buffer);
}
Member


There seems to be a reason why the MessageIndexes exists: each Protobuf schema can include multiple message definitions, and the MessageIndexes provides a way to reference a specific message within the schema.
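To make the role of the message indexes concrete, here is a hypothetical sketch (not Confluent's implementation, which is under the Confluent Community License): the indexes form a path into the message declarations of a .proto file, where each index selects a declaration at the current nesting level.

```java
import java.util.List;

// Hypothetical model of the message declarations in a .proto file.
// A real implementation would walk the protobuf Descriptor tree instead.
record MessageDecl(String name, List<MessageDecl> nested) {}

class MessageIndexResolver {
    // Follow a message-index path: each index selects a declaration at
    // the current nesting level, then descends into its nested messages.
    static String resolve(List<MessageDecl> topLevel, List<Integer> indexes) {
        List<MessageDecl> current = topLevel;
        StringBuilder name = new StringBuilder();
        for (int i : indexes) {
            MessageDecl decl = current.get(i);
            if (name.length() > 0) {
                name.append('.');
            }
            name.append(decl.name());
            current = decl.nested();
        }
        return name.toString();
    }
}
```

For a schema declaring `message A {}` followed by `message B { message C {} }`, the index path [1, 0] would select the nested message B.C, while [0] selects A.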


Member

Once the license issue is resolved, we could come back to this detail. If the code for KafkaProtobufDeserializer were Apache 2.0 licensed, we could rather safely look at the code and see how the proper schema resolution can be handled for Protobuf-encoded messages. As long as we have the license issue, we had better not copy-paste code due to the IPR violation risk.

@lhotari
Member

lhotari commented Apr 23, 2025

One potential problem is the Confluent schema registry licenses. It says

The project is licensed under the Confluent Community License, except for the client-* and avro-* libs, which are under the Apache 2.0 license. See LICENSE file in each subfolder for detailed license agreement.

for example, https://github.com/confluentinc/schema-registry/blob/master/protobuf-serializer/src/main/java/io/confluent/kafka/serializers/protobuf/KafkaProtobufDeserializer.java doesn't have Apache 2.0 license.

https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDeserializer.java is under Apache 2.0 license.

Comment thread pulsar-io/kafka/pom.xml

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-serializer</artifactId>
Member

According to the source repository, https://github.com/confluentinc/schema-registry, this library has a license of Confluent Community License. This didn't change between the previous
https://github.com/confluentinc/schema-registry/tree/v6.2.8 version and the currently used https://github.com/confluentinc/schema-registry/tree/v7.8.2 version.

The information provided by Confluent about the license is slightly conflicting, since it says that client libraries are under the Apache 2.0 license, but at the same time it says "See LICENSE file in each subfolder for detailed license agreement". However, the license headers for the Protobuf classes are very explicit about the license. For the Avro-related classes, there's an Apache 2.0 license header.

Member

In the pom file https://packages.confluent.io/maven/io/confluent/kafka-protobuf-serializer/7.8.2/kafka-protobuf-serializer-7.8.2.pom, there's Apache 2.0 license. This is also the case for 6.2.8 version, https://packages.confluent.io/maven/io/confluent/kafka-protobuf-serializer/6.2.8/kafka-protobuf-serializer-6.2.8.pom . I wonder if they just have forgotten to make things consistent in the repository.

Member

I found an existing issue about the license for protobuf libraries: confluentinc/schema-registry#1558

Member

@lhotari lhotari Apr 23, 2025


there's even a branch to fix the issue, dating back to 2021, but it was never merged: https://github.com/confluentinc/schema-registry/compare/protobuf-licensing

Member

Confluent employee replied in 2021: confluentinc/schema-registry#1558 (comment)

Contributor Author

oh, I didn't check the library license, will close this then

@lhotari
Member

lhotari commented Apr 24, 2025

We cannot add a dependency on the io.confluent:kafka-protobuf-serializer library since it is under the Confluent Community License, as briefly commented in an issue discussion.

This is how Claude AI explained the licensing issue:

The Confluent Community License (CCL) and Apache 2.0 license have important compatibility considerations:

The main challenge is that the Confluent Community License is not an open source license according to the Open Source Initiative's definition. It contains field-of-use restrictions that prevent using the software to build competing services.

Specifically:

  • Apache 2.0 is a permissive open source license that allows code to be used in proprietary software
  • The CCL restricts using the licensed software to create "Competitive Offerings" (competing SaaS products)

This creates asymmetric compatibility:

  • You can include Apache 2.0 code in a CCL-licensed project
  • You generally cannot include CCL-licensed code in an Apache 2.0 project without the resulting work becoming subject to CCL restrictions

If you're developing software that combines components with these licenses, you should:

  1. Carefully review the specific license texts
  2. Consider the direction of code sharing between components
  3. Consult with legal counsel for your specific use case

If you need to maintain Apache 2.0 compliance, you'll typically need to avoid incorporating CCL-licensed code in your Apache project.

Unless io.confluent:kafka-protobuf-serializer gets relicensed under the Apache 2.0 license, we cannot use it in Apache Pulsar. To avoid any IPR issues, we should also avoid looking at the code and instead create a "clean room" implementation from scratch using the documentation of the wire format, since one could assume that there's no IPR on the wire format itself.
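As an illustration of what such a clean-room parser might start from, here is a sketch written only from the wire format described in Confluent's public documentation: a magic byte 0x0, a 4-byte big-endian schema id, then the message indexes. The encoding details (zigzag varints for the indexes, and a single 0 byte as shorthand for the path [0]) are assumptions to verify against that documentation.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Clean-room sketch of the Confluent wire-format header, from the public
// docs: [magic byte 0x0][4-byte schema id][message indexes].
// Assumed: indexes are a zigzag-varint count followed by that many
// zigzag-varint values; a single 0 is shorthand for the path [0].
class ConfluentWireFormat {
    record Header(int schemaId, List<Integer> messageIndexes) {}

    static Header readHeader(ByteBuffer buffer) {
        byte magic = buffer.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        int schemaId = buffer.getInt(); // ByteBuffer is big-endian by default
        int count = readZigZagVarint(buffer);
        if (count == 0) {
            return new Header(schemaId, List.of(0)); // shorthand for first message
        }
        List<Integer> indexes = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            indexes.add(readZigZagVarint(buffer));
        }
        return new Header(schemaId, indexes);
    }

    static int readZigZagVarint(ByteBuffer buffer) {
        int value = 0;
        int shift = 0;
        int b;
        // varint: 7 data bits per byte, high bit set on continuation bytes
        while (((b = buffer.get()) & 0x80) != 0) {
            value |= (b & 0x7f) << shift;
            shift += 7;
        }
        value |= b << shift;
        return (value >>> 1) ^ -(value & 1); // undo zigzag encoding
    }
}
```

After the header is consumed, the buffer position is at the start of the Protobuf-encoded message body, and the schema id plus index path identify which message type to decode it with.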

We could also consider supporting protobuf encoded messages that aren't encoded using Confluent's schema registry client libraries and don't use the schema registry for dynamically retrieving the schema. In those cases, the schema would have to be provided in the source connector config. That would be a different feature than what this PR implements.
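To sketch what that alternative could look like in a source connector config (every key below is hypothetical and does not exist in the current KafkaSourceConfig; it only illustrates statically configuring the schema instead of fetching it from a registry):

```yaml
configs:
  bootstrapServers: "localhost:9092"
  topic: "my-topic"
  # Hypothetical options for a statically configured Protobuf schema:
  # a compiled descriptor set plus the fully qualified message name.
  valueSchemaType: "PROTOBUF_NATIVE"                   # hypothetical
  valueProtobufDescriptorFile: "/path/to/schema.desc"  # hypothetical
  valueProtobufMessageName: "com.example.MyMessage"    # hypothetical
```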
@jiangpengcheng WDYT?

Member

@lhotari lhotari left a comment

We cannot add a dependency on the io.confluent:kafka-protobuf-serializer library since it is under the Confluent Community License, as briefly commented in an issue discussion.

@lhotari
Member

lhotari commented Apr 25, 2025

It looks like AWS Glue Schema Registry Library contains the classes that we'd need for parsing the Confluent Schema Registry protobuf wire format.

Under https://github.com/awslabs/aws-glue-schema-registry/tree/master/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry there are both serializers and deserializers which we could use as a replacement for the Confluent Community Licensed libraries. We could continue to use the Apache 2.0 licensed Confluent Schema Registry libraries.

There are limitations in the AWS Glue libs, such as not supporting imported schemas like Confluent Schema Registry supports: https://github.com/awslabs/aws-glue-schema-registry/blob/72cbca0b05a758f0a775c39e580a15e7f19613fb/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/protobuf/MessageIndexFinder.java#L71-L72

@lhotari
Member

lhotari commented Apr 25, 2025

I also found Apicurio Schema registry libs which are Apache 2.0 licensed, https://github.com/Apicurio/apicurio-registry/tree/main/serdes/kafka/protobuf-serde/src/main/java/io/apicurio/registry/serde/protobuf . However, it looks like the wire format is Apicurio specific and not compatible with Confluent Schema Registry.

@lhotari
Member

lhotari commented Apr 25, 2025

More OSS schema registry clients with protobuf support:


Labels

doc-not-needed Your PR changes do not impact docs ready-to-test


5 participants