-
Notifications
You must be signed in to change notification settings - Fork 4.4k
Open
Description
What happened?
import apache_beam as beam
from apache_beam.testing.util import assert_that, equal_to
with beam.Pipeline() as p:
output_titles = (
p
| "Create input" >> beam.Create([(0,0)])
| "Batch in groups" >> beam.GroupIntoBatches(5)
)
assert_that(output_titles, equal_to([(0, [0])]))
The above pipeline throws:
File "/Users/jtran/repo/hjtran/beam/sdks/python/apache_beam/runners/direct/executor.py", line 418, in await_completion
self._executor.await_completion()
File "/Users/jtran/repo/hjtran/beam/sdks/python/apache_beam/runners/direct/executor.py", line 465, in await_completion
raise update.exception
File "/Users/jtran/repo/hjtran/beam/sdks/python/apache_beam/runners/direct/executor.py", line 356, in call
self.attempt_call(
File "/Users/jtran/repo/hjtran/beam/sdks/python/apache_beam/runners/direct/executor.py", line 399, in attempt_call
evaluator.process_element(value)
File "/Users/jtran/repo/hjtran/beam/sdks/python/apache_beam/runners/direct/transform_evaluator.py", line 931, in process_element
assert not self.global_state.get_state(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError
This is with beam 2.69. I think current versions of beam you'll need to specify the direct bundle runner
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Infrastructure
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner
kvudata