
[Bug]: NullPointerException in KafkaSourceDescriptor during Dataflow job updates in beam-python #36356

@HurSungYun

Description

What happened?

When updating a Dataflow streaming pipeline, a NullPointerException: Null topic occurs in KafkaSourceDescriptor.

Environment

  • Apache Beam Python SDK 2.67.0 (Python 3.12)
  • Dataflow Runner (V2)

Pipeline logic (Python)

import logging
import random
import string
from typing import List

import numpy as np

from apache_beam.io.kafka import ReadFromKafka

logger = logging.getLogger(__name__)


# wrapper class
class ReadFromPlaintextKafkaSource(ReadFromKafka):
    def __init__(
        self,
        job_name: str,
        broker_addresses: str,
        topics: List[str],
        redistribute: bool = False,
        redistribute_num_keys: int = 0,
    ):
        if not job_name:
            raise ValueError("Kafka job name is required")
        job_name = job_name.replace("-", "_")

        if not broker_addresses:
            raise ValueError("Kafka broker addresses are required")

        if not topics:
            raise ValueError("Kafka topics are required")

        consumer_group_id = "some_group"

        logger.info(f"Consumer group id: {consumer_group_id}")
        consumer_client_id = f"{consumer_group_id}.{''.join(random.choices(string.ascii_letters + string.digits, k=6))}"
        consumer_config = {
            "allow.auto.create.topics": "false",
            "auto.commit.interval.ms": "100",
            "bootstrap.servers": broker_addresses,
            "group.id": consumer_group_id,
            "enable.auto.commit": "false",
            "auto.offset.reset": "latest",
            "client.id": consumer_client_id,
            "fetch.max.bytes": "104857600",  # 100MB
            "max.poll.records": "3000",
            "max.partition.fetch.bytes": "104857600",  # 100MB
            "fetch.min.bytes": "1048576",  # 1MB
            "request.timeout.ms": "25000",  # 25 seconds timeout
            "session.timeout.ms": "25000",  # 25 seconds timeout
            "default.api.timeout.ms": "25000",  # 25 seconds timeout
            "connections.max.idle.ms": "30000",  # 30 seconds
            "max.poll.interval.ms": "300000",  # 5 minutes
            "send.buffer.bytes": "52428800",  # 50MB
            "receive.buffer.bytes": "52428800",  # 50MB
        }

        super().__init__(
            consumer_config=consumer_config,
            topics=topics,
            with_metadata=False,
            commit_offset_in_finalize=True,
            redistribute=redistribute,
            redistribute_num_keys=np.int32(redistribute_num_keys),
        )

# pipeline logic
pipeline = BasePipeline(pipeline_options)

input_collection = pipeline | "Read from Kafka" >> ReadFromPlaintextKafkaSource(
    broker_addresses=pipeline_options.broker_address,
    topics=["some_topic"],
    job_name=pipeline_options.job_name,
)

When the job is relaunched with the Dataflow update option (--update), rather than drained, the following error occurs.

Stack trace

Error message from worker: generic::unknown: java.lang.NullPointerException: Null topic
	org.apache.beam.sdk.io.kafka.AutoValue_KafkaSourceDescriptor.<init>(AutoValue_KafkaSourceDescriptor.java:36)
	org.apache.beam.sdk.io.kafka.KafkaSourceDescriptor.create(KafkaSourceDescriptor.java:122)
	org.apache.beam.sdk.io.kafka.SchemaUserTypeCreator$SchemaCodeGen$PDZ5LUq0.create(Unknown Source)
	org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:102)
	org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:46)
	org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:126)
	org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
	org.apache.beam.sdk.coders.LengthPrefixCoder.decode(LengthPrefixCoder.java:64)
	org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:83)
	org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:78)
	org.apache.beam.sdk.values.WindowedValues$FullWindowedValueCoder.decode(WindowedValues.java:602)
	org.apache.beam.sdk.values.WindowedValues$FullWindowedValueCoder.decode(WindowedValues.java:593)
	org.apache.beam.sdk.values.WindowedValues$FullWindowedValueCoder.decode(WindowedValues.java:539)
	org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:231)
	org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:527)
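
For context, the "Null topic" message comes from the null check that AutoValue generates for every non-@Nullable field (the AutoValue_KafkaSourceDescriptor.<init> frame above). A minimal self-contained sketch of that constructor shape, illustrative only, not the actual generated source:

public final class NullTopicDemo {
    // Simplified analogue of an AutoValue-generated constructor:
    // each non-@Nullable field is guarded, and a missing value throws
    // NullPointerException("Null <field>").
    static final class Descriptor {
        final String topic;

        Descriptor(String topic) {
            if (topic == null) {
                throw new NullPointerException("Null topic");
            }
            this.topic = topic;
        }
    }

    public static void main(String[] args) {
        new Descriptor(null); // java.lang.NullPointerException: Null topic
    }
}

So the decoded Row handed a null into the non-nullable topic slot of the creator, which is consistent with the field-order mismatch analyzed below.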

Reproduction Steps

  1. Create a streaming pipeline using the Beam Python SDK with Kafka IO
  2. Run the pipeline on Dataflow
  3. Update the running job (Update, not Drain)
  4. Observe the NullPointerException in the worker logs

Analysis

  1. AutoValueSchema field order inconsistency
    • ReflectUtils.getMethods() may return methods in different orders depending on the JVM implementation (see the sketch after this list)
    • As a result, the parameter order of KafkaSourceDescriptor.create() can disagree with the field order that AutoValueSchema infers
  2. Occurs only during a Dataflow job update
    • Restoring state after an update requires decoding rows back through the inferred schema
    • Drain does not restore state, so the issue does not occur there
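
A minimal sketch of the nondeterminism behind point 1, with an illustrative class and getter names rather than Beam's actual code: the Javadoc for Class.getDeclaredMethods() states the returned methods are "not sorted and are not in any particular order", so schema inference that iterates reflected getters without a stable sort can infer different field orders on different workers.

import java.lang.reflect.Method;
import java.util.Arrays;

public class ReflectionOrderDemo {
    // Stand-in for an AutoValue descriptor's abstract getters.
    abstract static class Descriptor {
        abstract String getTopic();
        abstract Integer getPartition();
        abstract Long getStartReadOffset();
    }

    public static void main(String[] args) {
        // The printed order is unspecified by the JVM; inference that
        // relies on it can differ between the old and updated workers.
        Method[] getters = Descriptor.class.getDeclaredMethods();
        Arrays.stream(getters).map(Method::getName).forEach(System.out::println);
    }
}

If a Row encoded under one inferred order is decoded under another, for example across a job update, the values land in the wrong creator parameters, and a non-nullable field such as topic can receive null.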

Suspected cause

I suspect this error occurs because of this temporary code in KafkaSourceDescriptor.

@SchemaCreate
@SuppressWarnings("all")
// TODO(BEAM-10677): Remove this function after AutoValueSchema is fixed.
static KafkaSourceDescriptor create(
    String topic,
    Integer partition,
    Long start_read_offset,
    Instant start_read_time,
    Long stop_read_offset,
    Instant stop_read_time,
    List<String> bootstrap_servers) {
  checkArguments(start_read_offset, start_read_time, stop_read_offset, stop_read_time);
  return new AutoValue_KafkaSourceDescriptor(
      topic,
      partition,
      start_read_offset,
      start_read_time,
      stop_read_offset,
      stop_read_time,
      bootstrap_servers);
}
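
If reflected getter order is indeed the root cause, one possible direction is to pin the field order explicitly instead of relying on reflection. This is only a hypothetical sketch: it assumes AutoValueSchema honors the @SchemaFieldNumber annotation (org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber) the way other getter-based schema providers do, and the class name and nullability shown are illustrative.

import java.io.Serializable;
import java.util.List;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;

// Hypothetical variant of KafkaSourceDescriptor with the schema field
// order fixed by annotation instead of by reflection order.
@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract class PinnedKafkaSourceDescriptor implements Serializable {
    @SchemaFieldNumber("0") abstract String getTopic();
    @SchemaFieldNumber("1") abstract Integer getPartition();
    @SchemaFieldNumber("2") abstract @Nullable Long getStartReadOffset();
    @SchemaFieldNumber("3") abstract @Nullable Instant getStartReadTime();
    @SchemaFieldNumber("4") abstract @Nullable Long getStopReadOffset();
    @SchemaFieldNumber("5") abstract @Nullable Instant getStopReadTime();
    @SchemaFieldNumber("6") abstract @Nullable List<String> getBootstrapServers();
}

With an explicit field order, the encoded Row layout would no longer depend on ReflectUtils.getMethods(), so the pre-update and post-update workers should agree on the schema.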

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
