feat(Dataflow): Update Kafka snippet to use Managed I/O #13304

Merged

32 changes: 20 additions & 12 deletions dataflow/snippets/Dockerfile
@@ -18,24 +18,32 @@
 # on the host machine. This Dockerfile is derived from the
 # dataflow/custom-containers/ubuntu sample.

-FROM ubuntu:focal
+FROM python:3.12-slim

+# Install JRE
+COPY --from=openjdk:8-jre-slim /usr/local/openjdk-8 /usr/local/openjdk-8
+ENV JAVA_HOME /usr/local/openjdk-8
+RUN update-alternatives --install /usr/bin/java java /usr/local/openjdk-8/bin/java 10
+
 WORKDIR /pipeline

-COPY --from=apache/beam_python3.11_sdk:2.62.0 /opt/apache/beam /opt/apache/beam
+# Copy files from official SDK image.
+COPY --from=apache/beam_python3.11_sdk:2.63.0 /opt/apache/beam /opt/apache/beam
+# Set the entrypoint to Apache Beam SDK launcher.
 ENTRYPOINT [ "/opt/apache/beam/boot" ]

-COPY requirements.txt .
-RUN apt-get update \
-    && apt-get install -y --no-install-recommends \
-        curl python3-distutils default-jre docker.io \
-    && rm -rf /var/lib/apt/lists/* \
-    && update-alternatives --install /usr/bin/python python /usr/bin/python3 10 \
-    && curl https://bootstrap.pypa.io/get-pip.py | python \
-    # Install the requirements.
-    && pip install --no-cache-dir -r requirements.txt \
-    && pip check
+# Install Docker.
+RUN apt-get update
+RUN apt-get install -y --no-install-recommends docker.io
+
+# Install dependencies.
+RUN pip3 install --no-cache-dir apache-beam[gcp]==2.63.0
+RUN pip install --no-cache-dir kafka-python==2.0.6
+
+# Verify that the image does not have conflicting dependencies.
+RUN pip check

+# Copy the snippets to test.
 COPY read_kafka.py ./
 COPY read_kafka_multi_topic.py ./
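
The Dockerfile change pins the Apache Beam version in two places: the SDK image tag and the `pip3 install` line. The following is a minimal sketch of how one might check that the two pins agree. The helper names (`beam_versions`, `pins_agree`) and the `MIXED_LINES` input are hypothetical and are not part of this PR; only the two lines in `NEW_LINES` come from the diff.

```python
import re

# Patterns for the two places where the Dockerfile pins a Beam version.
SDK_IMAGE_RE = re.compile(r"apache/beam_python[\d.]+_sdk:([\d.]+)")
PIP_PIN_RE = re.compile(r"apache-beam(?:\[[^\]]*\])?==([\d.]+)")


def beam_versions(lines):
    """Collect every Beam version pinned in the given Dockerfile lines."""
    versions = set()
    for line in lines:
        for pattern in (SDK_IMAGE_RE, PIP_PIN_RE):
            match = pattern.search(line)
            if match:
                versions.add(match.group(1))
    return versions


def pins_agree(lines):
    """Return True when exactly one Beam version is pinned across all lines."""
    return len(beam_versions(lines)) == 1


# Lines taken from the updated Dockerfile in this diff.
NEW_LINES = [
    "COPY --from=apache/beam_python3.11_sdk:2.63.0 /opt/apache/beam /opt/apache/beam",
    "RUN pip3 install --no-cache-dir apache-beam[gcp]==2.63.0",
]

# Hypothetical input: an image tag that was not bumped along with the pip pin.
MIXED_LINES = [
    "COPY --from=apache/beam_python3.11_sdk:2.62.0 /opt/apache/beam /opt/apache/beam",
    "RUN pip3 install --no-cache-dir apache-beam[gcp]==2.63.0",
]

print(pins_agree(NEW_LINES))    # True
print(pins_agree(MIXED_LINES))  # False
```

A check like this only compares version strings; it does not verify that the base image's Python version matches the SDK image's.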

2 changes: 1 addition & 1 deletion dataflow/snippets/noxfile_config.py
@@ -22,7 +22,7 @@

 TEST_CONFIG_OVERRIDE = {
     # You can opt out from the test for specific Python versions.
-    "ignored_versions": ["2.7", "3.7", "3.9", "3.10", "3.12", "3.13"],
+    "ignored_versions": ["2.7", "3.7", "3.8", "3.9", "3.10", "3.13"],
     # Old samples are opted out of enforcing Python type hints
     # All new samples should feature them
     "enforce_type_hints": True,
21 changes: 11 additions & 10 deletions dataflow/snippets/read_kafka.py
@@ -19,7 +19,6 @@
 import apache_beam as beam

 from apache_beam import window
-from apache_beam.io.kafka import ReadFromKafka
 from apache_beam.io.textio import WriteToText
 from apache_beam.options.pipeline_options import PipelineOptions

@@ -42,16 +41,18 @@ def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
         (
             pipeline
             # Read messages from an Apache Kafka topic.
-            | ReadFromKafka(
-                consumer_config={"bootstrap.servers": options.bootstrap_server},
-                topics=[options.topic],
-                with_metadata=False,
-                max_num_records=5,
-                start_read_time=0,
+            | beam.managed.Read(
+                beam.managed.KAFKA,
+                config={
+                    "bootstrap_servers": options.bootstrap_server,
+                    "topic": options.topic,
+                    "data_format": "RAW",
+                    "auto_offset_reset_config": "earliest",
+                    # The max_read_time_seconds parameter is intended for testing.
+                    # Avoid using this parameter in production.
+                    "max_read_time_seconds": 5
+                }
             )
-            # The previous step creates a key-value collection, keyed by message ID.
-            # The values are the message payloads.
-            | beam.Values()
             # Subdivide the output into fixed 5-second windows.
             | beam.WindowInto(window.FixedWindows(5))
             | WriteToText(
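
The new snippet hard-codes `max_read_time_seconds` next to a comment saying it is meant for testing only. One way to keep that separation explicit is to build the Managed I/O config in a helper that adds the parameter only on request. This is a sketch, not part of the PR: `build_kafka_read_config` is a hypothetical name, and the config keys are the ones used in the diff above.

```python
from typing import Any, Dict, Optional


def build_kafka_read_config(
    bootstrap_server: str,
    topic: str,
    max_read_time_seconds: Optional[int] = None,
) -> Dict[str, Any]:
    """Build the config dict passed to beam.managed.Read(beam.managed.KAFKA, ...)."""
    config: Dict[str, Any] = {
        "bootstrap_servers": bootstrap_server,
        "topic": topic,
        "data_format": "RAW",
        "auto_offset_reset_config": "earliest",
    }
    # Only bound the read when the caller asks for it (for example, in tests).
    if max_read_time_seconds is not None:
        config["max_read_time_seconds"] = max_read_time_seconds
    return config


# Test-style config, bounded to 5 seconds as in the snippet.
test_config = build_kafka_read_config("localhost:9092", "my-topic", 5)

# Production-style config, with no read-time bound.
prod_config = build_kafka_read_config("localhost:9092", "my-topic")

# In a pipeline this would be used as (requires apache-beam to be installed):
#   pipeline | beam.managed.Read(beam.managed.KAFKA, config=test_config)
print(sorted(test_config))
print("max_read_time_seconds" in prod_config)  # False
```

The broker address and topic name here are placeholders; the snippet itself takes them from `options.bootstrap_server` and `options.topic`.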
4 changes: 2 additions & 2 deletions dataflow/snippets/requirements.txt
@@ -1,2 +1,2 @@
-apache-beam[gcp]==2.58.0
-kafka-python==2.0.2
+apache-beam[gcp]==2.63.0
+kafka-python==2.0.6