From 4001054ad8693f6a30948bd7cec710d0d262e2bf Mon Sep 17 00:00:00 2001 From: Veronica Wasson <3992422+VeronicaWasson@users.noreply.github.com> Date: Tue, 15 Apr 2025 21:16:25 +0000 Subject: [PATCH 1/7] Update Kafka snippet to use Managed I/O --- dataflow/snippets/Dockerfile | 32 +++++++++++++++++++----------- dataflow/snippets/read_kafka.py | 18 ++++++++--------- dataflow/snippets/requirements.txt | 4 ++-- 3 files changed, 31 insertions(+), 23 deletions(-) diff --git a/dataflow/snippets/Dockerfile b/dataflow/snippets/Dockerfile index ebe0d2b6d90..bb230e64e4d 100644 --- a/dataflow/snippets/Dockerfile +++ b/dataflow/snippets/Dockerfile @@ -18,24 +18,32 @@ # on the host machine. This Dockerfile is derived from the # dataflow/custom-containers/ubuntu sample. -FROM ubuntu:focal +FROM python:3.12-slim + +# Install JRE +COPY --from=openjdk:8-jre-slim /usr/local/openjdk-8 /usr/local/openjdk-8 +ENV JAVA_HOME /usr/local/openjdk-8 +RUN update-alternatives --install /usr/bin/java java /usr/local/openjdk-8/bin/java 10 WORKDIR /pipeline -COPY --from=apache/beam_python3.11_sdk:2.62.0 /opt/apache/beam /opt/apache/beam +# Copy files from official SDK image. +COPY --from=apache/beam_python3.11_sdk:2.63.0 /opt/apache/beam /opt/apache/beam +# Set the entrypoint to Apache Beam SDK launcher. ENTRYPOINT [ "/opt/apache/beam/boot" ] -COPY requirements.txt . -RUN apt-get update \ - && apt-get install -y --no-install-recommends \ - curl python3-distutils default-jre docker.io \ - && rm -rf /var/lib/apt/lists/* \ - && update-alternatives --install /usr/bin/python python /usr/bin/python3 10 \ - && curl https://bootstrap.pypa.io/get-pip.py | python \ - # Install the requirements. - && pip install --no-cache-dir -r requirements.txt \ - && pip check +# Install Docker. +RUN apt-get update +RUN apt-get install -y --no-install-recommends docker.io + +# Install dependencies. +RUN pip3 install --no-cache-dir apache-beam[gcp]==2.63.0 +RUN pip install --no-cache-dir kafka-python==2.0.6 +# Verify that the image does not have conflicting dependencies. +RUN pip check +# Copy the snippets to test. COPY read_kafka.py ./ COPY read_kafka_multi_topic.py ./ + diff --git a/dataflow/snippets/read_kafka.py b/dataflow/snippets/read_kafka.py index e3c9c135926..9a52b60b89c 100644 --- a/dataflow/snippets/read_kafka.py +++ b/dataflow/snippets/read_kafka.py @@ -42,16 +42,16 @@ def _add_argparse_args(parser: argparse.ArgumentParser) -> None: ( pipeline # Read messages from an Apache Kafka topic. - | ReadFromKafka( - consumer_config={"bootstrap.servers": options.bootstrap_server}, - topics=[options.topic], - with_metadata=False, - max_num_records=5, - start_read_time=0, + | beam.managed.Read( + beam.managed.KAFKA, + config={ + "bootstrap_servers": options.bootstrap_server, + "topic": options.topic, + "data_format": "RAW", + "auto_offset_reset_config": "earliest", + "max_read_time_seconds": 5 # For testing, avoid in production. + } ) - # The previous step creates a key-value collection, keyed by message ID. - # The values are the message payloads. - | beam.Values() # Subdivide the output into fixed 5-second windows. | beam.WindowInto(window.FixedWindows(5)) | WriteToText( diff --git a/dataflow/snippets/requirements.txt b/dataflow/snippets/requirements.txt index b8391358711..0f0d8796fa2 100644 --- a/dataflow/snippets/requirements.txt +++ b/dataflow/snippets/requirements.txt @@ -1,2 +1,2 @@ -apache-beam[gcp]==2.58.0 -kafka-python==2.0.2 +apache-beam[gcp]==2.63.0 +kafka-python==2.0.6 From 17da1ef7258b6c5c28e8cc46d45f8fe5eea3560e Mon Sep 17 00:00:00 2001 From: Veronica Wasson <3992422+VeronicaWasson@users.noreply.github.com> Date: Tue, 15 Apr 2025 21:32:13 +0000 Subject: [PATCH 2/7] Remove Python 3.8 from tests (not supported) --- dataflow/snippets/noxfile_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dataflow/snippets/noxfile_config.py b/dataflow/snippets/noxfile_config.py index dd0def22c9e..1798bbee4f8 100644 --- a/dataflow/snippets/noxfile_config.py +++ b/dataflow/snippets/noxfile_config.py @@ -22,7 +22,7 @@ TEST_CONFIG_OVERRIDE = { # You can opt out from the test for specific Python versions. - "ignored_versions": ["2.7", "3.7", "3.9", "3.10", "3.12", "3.13"], + "ignored_versions": ["2.7", "3.7", "3.8", "3.9", "3.10", "3.12", "3.13"], # Old samples are opted out of enforcing Python type hints # All new samples should feature them "enforce_type_hints": True, From c251739cb0e17db7fc4f0915d0665e7029792f9c Mon Sep 17 00:00:00 2001 From: Veronica Wasson <3992422+VeronicaWasson@users.noreply.github.com> Date: Tue, 15 Apr 2025 21:32:59 +0000 Subject: [PATCH 3/7] Fix linter error --- dataflow/snippets/read_kafka.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dataflow/snippets/read_kafka.py b/dataflow/snippets/read_kafka.py index 9a52b60b89c..a0982d06a8e 100644 --- a/dataflow/snippets/read_kafka.py +++ b/dataflow/snippets/read_kafka.py @@ -49,7 +49,7 @@ def _add_argparse_args(parser: argparse.ArgumentParser) -> None: "topic": options.topic, "data_format": "RAW", "auto_offset_reset_config": "earliest", - "max_read_time_seconds": 5 # For testing, avoid in production. + "max_read_time_seconds": 5 # For testing, avoid in production. } ) # Subdivide the output into fixed 5-second windows. From bc75b539244b24d4ab4e257f7082121a115ff7d7 Mon Sep 17 00:00:00 2001 From: Veronica Wasson <3992422+VeronicaWasson@users.noreply.github.com> Date: Tue, 15 Apr 2025 22:08:15 +0000 Subject: [PATCH 4/7] Fix linter error --- dataflow/snippets/read_kafka.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dataflow/snippets/read_kafka.py b/dataflow/snippets/read_kafka.py index a0982d06a8e..fa2b314a55b 100644 --- a/dataflow/snippets/read_kafka.py +++ b/dataflow/snippets/read_kafka.py @@ -19,7 +19,6 @@ import apache_beam as beam from apache_beam import window -from apache_beam.io.kafka import ReadFromKafka from apache_beam.io.textio import WriteToText from apache_beam.options.pipeline_options import PipelineOptions From d4bf244418b3a70ec97ea43c84b7f661050a2a76 Mon Sep 17 00:00:00 2001 From: Veronica Wasson <3992422+VeronicaWasson@users.noreply.github.com> Date: Tue, 15 Apr 2025 22:13:22 +0000 Subject: [PATCH 5/7] Enable 3.12 --- dataflow/snippets/noxfile_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dataflow/snippets/noxfile_config.py b/dataflow/snippets/noxfile_config.py index 1798bbee4f8..900f58e0ddf 100644 --- a/dataflow/snippets/noxfile_config.py +++ b/dataflow/snippets/noxfile_config.py @@ -22,7 +22,7 @@ TEST_CONFIG_OVERRIDE = { # You can opt out from the test for specific Python versions. - "ignored_versions": ["2.7", "3.7", "3.8", "3.9", "3.10", "3.12", "3.13"], + "ignored_versions": ["2.7", "3.7", "3.8", "3.9", "3.10", "3.13"], # Old samples are opted out of enforcing Python type hints # All new samples should feature them "enforce_type_hints": True, From c909c18e7ac125855f5006355ac7984e668f1ad3 Mon Sep 17 00:00:00 2001 From: Veronica Wasson <3992422+VeronicaWasson@users.noreply.github.com> Date: Wed, 16 Apr 2025 16:08:47 +0000 Subject: [PATCH 6/7] Clarify code comment --- dataflow/snippets/read_kafka.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dataflow/snippets/read_kafka.py b/dataflow/snippets/read_kafka.py index fa2b314a55b..4cbe1eb485f 100644 --- a/dataflow/snippets/read_kafka.py +++ b/dataflow/snippets/read_kafka.py @@ -48,7 +48,9 @@ def _add_argparse_args(parser: argparse.ArgumentParser) -> None: "topic": options.topic, "data_format": "RAW", "auto_offset_reset_config": "earliest", - "max_read_time_seconds": 5 # For testing, avoid in production. + # The max_read_time_seconds parameter is intended for testing. + # Avoiding using this parameter in production. + "max_read_time_seconds": 5 } ) # Subdivide the output into fixed 5-second windows. From 0f7b837b7eda012c8edc6303546c37cd6469d68c Mon Sep 17 00:00:00 2001 From: Veronica Wasson <3992422+VeronicaWasson@users.noreply.github.com> Date: Thu, 17 Apr 2025 19:50:26 +0000 Subject: [PATCH 7/7] Fix typo in comment --- dataflow/snippets/read_kafka.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dataflow/snippets/read_kafka.py b/dataflow/snippets/read_kafka.py index 4cbe1eb485f..351e95d49fd 100644 --- a/dataflow/snippets/read_kafka.py +++ b/dataflow/snippets/read_kafka.py @@ -49,7 +49,7 @@ def _add_argparse_args(parser: argparse.ArgumentParser) -> None: "data_format": "RAW", "auto_offset_reset_config": "earliest", # The max_read_time_seconds parameter is intended for testing. - # Avoiding using this parameter in production. + # Avoid using this parameter in production. "max_read_time_seconds": 5 } )