Skip to content

Commit

Permalink
Merge pull request #11703 from ibzib/BEAM-9001
Browse files Browse the repository at this point in the history
[BEAM-9001, BEAM-6327] Ensure that all transforms (except for require…
  • Loading branch information
ibzib authored May 14, 2020
2 parents 8f387f0 + d87c0ab commit e859735
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 70 deletions.
18 changes: 13 additions & 5 deletions model/pipeline/src/main/proto/beam_runner_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,9 @@ message PTransform {
// details.
FunctionSpec spec = 1;

// (Optional) if this node is a composite, a list of the ids of
// transforms that it contains.
// (Optional) A list of the ids of transforms that it contains.
//
// Primitive transforms are not allowed to specify this.
repeated string subtransforms = 2;

// (Required) A map from local names of inputs (unique only with this map, and
Expand Down Expand Up @@ -184,9 +185,10 @@ message PTransform {
// there is none, it may be omitted.
repeated DisplayData display_data = 6;

// (Optional) Environment where the current PTransform should be executed in.
// Runner that executes the pipeline may choose to override this if needed. If
// not specified, environment will be decided by the runner.
// Environment where the current PTransform should be executed in.
//
// Transforms that are required to be implemented by a runner must omit this.
// All other transforms are required to specify this.
string environment_id = 7;
}

Expand Down Expand Up @@ -227,12 +229,18 @@ message StandardPTransforms {
// See https://beam.apache.org/documentation/programming-guide/#groupbykey
// for additional details.
//
// Never defines an environment as the runner is required to implement this
// transform.
//
// Payload: None
GROUP_BY_KEY = 2 [(beam_urn) = "beam:transform:group_by_key:v1"];

// A transform which produces a single empty byte array at the minimum
// timestamp in the GlobalWindow.
//
// Never defines an environment as the runner is required to implement this
// transform.
//
// Payload: None
IMPULSE = 3 [(beam_urn) = "beam:transform:impulse:v1"];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSortedSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
Expand All @@ -74,6 +74,10 @@ public class PTransformTranslation {
public static final String MAP_WINDOWS_TRANSFORM_URN = "beam:transform:map_windows:v1";
public static final String MERGE_WINDOWS_TRANSFORM_URN = "beam:transform:merge_windows:v1";

// Required runner implemented transforms. These transforms should never specify an environment.
public static final ImmutableSet<String> RUNNER_IMPLEMENTED_TRANSFORMS =
ImmutableSet.of(GROUP_BY_KEY_TRANSFORM_URN, IMPULSE_TRANSFORM_URN);

// DeprecatedPrimitives
/**
* @deprecated SDKs should move away from creating `Read` transforms and migrate to using Impulse
Expand Down Expand Up @@ -350,10 +354,15 @@ public RunnerApi.PTransform translate(

// A composite transform is permitted to have a null spec. There are also some pseudo-
// primitives not yet supported by the portability framework that have null specs
String urn = "";
if (spec != null) {
urn = spec.getUrn();
transformBuilder.setSpec(spec);
}
transformBuilder.setEnvironmentId(components.getOnlyEnvironmentId());

if (!RUNNER_IMPLEMENTED_TRANSFORMS.contains(urn)) {
transformBuilder.setEnvironmentId(components.getOnlyEnvironmentId());
}
return transformBuilder.build();
}
}
Expand All @@ -367,11 +376,6 @@ private static class KnownTransformPayloadTranslator<T extends PTransform<?, ?>>
private static final Map<Class<? extends PTransform>, TransformPayloadTranslator>
KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();

// TODO: BEAM-9001 - set environment ID in all transforms and allow runners to override.
private static List<String> sdkTransformsWithEnvironment =
ImmutableList.of(
PAR_DO_TRANSFORM_URN, COMBINE_PER_KEY_TRANSFORM_URN, ASSIGN_WINDOWS_TRANSFORM_URN);

private static Map<Class<? extends PTransform>, TransformPayloadTranslator>
loadTransformPayloadTranslators() {
HashMap<Class<? extends PTransform>, TransformPayloadTranslator> translators =
Expand Down Expand Up @@ -423,14 +427,20 @@ public RunnerApi.PTransform translate(
if (spec != null) {
transformBuilder.setSpec(spec);

if (sdkTransformsWithEnvironment.contains(spec.getUrn())) {
transformBuilder.setEnvironmentId(components.getOnlyEnvironmentId());
} else if (spec.getUrn().equals(READ_TRANSFORM_URN)
&& (appliedPTransform.getTransform().getClass() == Read.Bounded.class)) {
// Only assigning environment to Bounded reads. Not assigning an environment to Unbounded
// reads since they are a Runner translated transform, unless, in the future, we have an
// adapter available for splittable DoFn.
transformBuilder.setEnvironmentId(components.getOnlyEnvironmentId());
// Required runner implemented transforms should not have an environment id.
if (!RUNNER_IMPLEMENTED_TRANSFORMS.contains(spec.getUrn())) {
// TODO(BEAM-9309): Remove existing hacks around deprecated READ transform.
if (spec.getUrn().equals(READ_TRANSFORM_URN)) {
// Only assigning environment to Bounded reads. Not assigning an environment to
// Unbounded
// reads since they are a Runner translated transform, unless, in the future, we have an
// adapter available for splittable DoFn.
if (appliedPTransform.getTransform().getClass() == Read.Bounded.class) {
transformBuilder.setEnvironmentId(components.getOnlyEnvironmentId());
}
} else {
transformBuilder.setEnvironmentId(components.getOnlyEnvironmentId());
}
}
}
return transformBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,15 @@ private static void validateTransform(
}

String urn = transform.getSpec().getUrn();
if (PTransformTranslation.RUNNER_IMPLEMENTED_TRANSFORMS.contains(urn)) {
checkArgument(
transform.getEnvironmentId().isEmpty(),
"Transform %s references environment %s when no environment should be specified since it is a required runner implemented transform %s.",
id,
transform.getEnvironmentId(),
urn);
}

if (VALIDATORS.containsKey(urn)) {
try {
VALIDATORS.get(urn).validate(id, transform, components, requirements);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,47 +23,52 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// TODO(BEAM-6327): Remove the need for this.

/** PipelineTrimmer removes subcomponents of native transforms that shouldn't be fused. */
public class PipelineTrimmer {
private static final Logger LOG = LoggerFactory.getLogger(PipelineTrimmer.class);
/**
* TrivialNativeTransformExpander is used to replace transforms with known URNs with their native
* equivalent.
*/
public class TrivialNativeTransformExpander {
private static final Logger LOG = LoggerFactory.getLogger(TrivialNativeTransformExpander.class);

/**
* Remove subcomponents of native transforms that shouldn't be fused.
* Replaces transforms with the known URN with a native equivalent stripping the environment and
* removing any sub-transforms from the returned pipeline.
*
* @param pipeline the pipeline to be trimmed
* @param knownUrns set of URNs for the runner's native transforms
* @return the trimmed pipeline
*/
public static Pipeline trim(Pipeline pipeline, Set<String> knownUrns) {
public static Pipeline forKnownUrns(Pipeline pipeline, Set<String> knownUrns) {
return makeKnownUrnsPrimitives(pipeline, knownUrns);
}

private static RunnerApi.Pipeline makeKnownUrnsPrimitives(
RunnerApi.Pipeline pipeline, Set<String> knownUrns) {
RunnerApi.Pipeline.Builder trimmedPipeline = pipeline.toBuilder();
for (String ptransformId : pipeline.getComponents().getTransformsMap().keySet()) {
if (knownUrns.contains(
pipeline.getComponents().getTransformsOrThrow(ptransformId).getSpec().getUrn())) {
LOG.debug("Removing descendants of known PTransform {}" + ptransformId);
// Skip over previously removed transforms from the original pipeline since we iterate
// over all transforms from the original pipeline and not the trimmed down version.
RunnerApi.PTransform currentTransform =
trimmedPipeline.getComponents().getTransformsOrDefault(ptransformId, null);
if (currentTransform != null && knownUrns.contains(currentTransform.getSpec().getUrn())) {
LOG.debug(
"Removing descendants and environment of known native PTransform {}" + ptransformId);
removeDescendants(trimmedPipeline, ptransformId);
trimmedPipeline
.getComponentsBuilder()
.putTransforms(
ptransformId,
currentTransform.toBuilder().clearSubtransforms().clearEnvironmentId().build());
}
}
return trimmedPipeline.build();
}

private static void removeDescendants(RunnerApi.Pipeline.Builder pipeline, String parentId) {
RunnerApi.PTransform parentProto =
pipeline.getComponents().getTransformsOrDefault(parentId, null);
if (parentProto != null) {
for (String childId : parentProto.getSubtransformsList()) {
removeDescendants(pipeline, childId);
pipeline.getComponentsBuilder().removeTransforms(childId);
}
pipeline
.getComponentsBuilder()
.putTransforms(parentId, parentProto.toBuilder().clearSubtransforms().build());
RunnerApi.PTransform parentProto = pipeline.getComponents().getTransformsOrThrow(parentId);
for (String childId : parentProto.getSubtransformsList()) {
removeDescendants(pipeline, childId);
pipeline.getComponentsBuilder().removeTransforms(childId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PBegin;
Expand Down Expand Up @@ -342,15 +344,15 @@ public void getProducer() {
public void getEnvironmentWithEnvironment() {
Pipeline p = Pipeline.create();
PCollection<Long> longs = p.apply("BoundedRead", Read.from(CountingSource.upTo(100L)));
PCollectionList.of(longs).and(longs).and(longs).apply("flatten", Flatten.pCollections());
longs.apply(WithKeys.of("a")).apply("groupByKey", GroupByKey.create());

Components components = PipelineTranslation.toProto(p).getComponents();
QueryablePipeline qp = QueryablePipeline.forPrimitivesIn(components);

PTransformNode environmentalRead =
PipelineNode.pTransform("BoundedRead", components.getTransformsOrThrow("BoundedRead"));
PTransformNode nonEnvironmentalTransform =
PipelineNode.pTransform("flatten", components.getTransformsOrThrow("flatten"));
PipelineNode.pTransform("groupByKey", components.getTransformsOrThrow("groupByKey"));

assertThat(qp.getEnvironment(environmentalRead).isPresent(), is(true));
assertThat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
import org.apache.beam.runners.core.construction.graph.PipelineTrimmer;
import org.apache.beam.runners.core.construction.graph.ProtoOverrides;
import org.apache.beam.runners.core.construction.graph.SplittableParDoExpander;
import org.apache.beam.runners.core.construction.graph.TrivialNativeTransformExpander;
import org.apache.beam.runners.core.metrics.MetricsPusher;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineJarUtils;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineResult;
Expand Down Expand Up @@ -98,7 +98,8 @@ PortablePipelineResult runPipelineWithTranslator(

// Don't let the fuser fuse any subcomponents of native transforms.
Pipeline trimmedPipeline =
PipelineTrimmer.trim(pipelineWithSdfExpanded, translator.knownUrns());
TrivialNativeTransformExpander.forKnownUrns(
pipelineWithSdfExpanded, translator.knownUrns());

// Fused pipeline proto.
// TODO: Consider supporting partially-fused graphs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ interface PTransformTranslator<T> {

@Override
public Set<String> knownUrns() {
// Do not expose Read as a known URN because PipelineTrimmer otherwise removes
// Do not expose Read as a known URN because TrivialNativeTransformExpander otherwise removes
// the subtransforms which are added in case of bounded reads. We only have a
// translator here for unbounded Reads which are native transforms which do not
// have subtransforms. Unbounded Reads are used by cross-language transforms, e.g.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
import org.apache.beam.runners.core.construction.graph.PipelineTrimmer;
import org.apache.beam.runners.core.construction.graph.ProtoOverrides;
import org.apache.beam.runners.core.construction.graph.SplittableParDoExpander;
import org.apache.beam.runners.core.construction.graph.TrivialNativeTransformExpander;
import org.apache.beam.runners.core.metrics.MetricsPusher;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineJarUtils;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineResult;
Expand Down Expand Up @@ -82,7 +82,8 @@ public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo)

// Don't let the fuser fuse any subcomponents of native transforms.
Pipeline trimmedPipeline =
PipelineTrimmer.trim(pipelineWithSdfExpanded, translator.knownUrns());
TrivialNativeTransformExpander.forKnownUrns(
pipelineWithSdfExpanded, translator.knownUrns());

// Fused pipeline proto.
// TODO: Consider supporting partially-fused graphs.
Expand Down
22 changes: 13 additions & 9 deletions sdks/go/pkg/beam/core/runtime/graphx/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func (m *marshaller) addScopeTree(s *ScopeTree) string {
transform := &pipepb.PTransform{
UniqueName: s.Scope.Name,
Subtransforms: subtransforms,
EnvironmentId: m.addDefaultEnv(),
}

m.updateIfCombineComposite(s, transform)
Expand Down Expand Up @@ -208,7 +209,6 @@ func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pipepb.PT
AccumulatorCoderId: acID,
}
transform.Spec = &pipepb.FunctionSpec{Urn: URNCombinePerKey, Payload: protox.MustEncode(payload)}
transform.EnvironmentId = m.addDefaultEnv()
}

func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
Expand Down Expand Up @@ -238,10 +238,8 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
// allPIds tracks additional PTransformIDs generated for the pipeline
var allPIds []string
var spec *pipepb.FunctionSpec
var transformEnvID = ""
switch edge.Edge.Op {
case graph.Impulse:
// TODO(herohde) 7/18/2018: Encode data?
spec = &pipepb.FunctionSpec{Urn: URNImpulse}

case graph.ParDo:
Expand Down Expand Up @@ -315,7 +313,6 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
if edge.Edge.DoFn.IsSplittable() {
payload.RestrictionCoderId = m.coders.Add(edge.Edge.RestrictionCoder)
}
transformEnvID = m.addDefaultEnv()
spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: protox.MustEncode(payload)}

case graph.Combine:
Expand All @@ -325,7 +322,6 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
Payload: []byte(mustEncodeMultiEdgeBase64(edge.Edge)),
},
}
transformEnvID = m.addDefaultEnv()
spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: protox.MustEncode(payload)}

case graph.Flatten:
Expand All @@ -347,6 +343,11 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
panic(fmt.Sprintf("Unexpected opcode: %v", edge.Edge.Op))
}

var transformEnvID = ""
if !(spec.Urn == URNGBK || spec.Urn == URNImpulse) {
transformEnvID = m.addDefaultEnv()
}

transform := &pipepb.PTransform{
UniqueName: edge.Name,
Spec: spec,
Expand Down Expand Up @@ -413,10 +414,11 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {

flattenID := fmt.Sprintf("%v_flatten", id)
flatten := &pipepb.PTransform{
UniqueName: flattenID,
Spec: &pipepb.FunctionSpec{Urn: URNFlatten},
Inputs: inputs,
Outputs: map[string]string{"i0": out},
UniqueName: flattenID,
Spec: &pipepb.FunctionSpec{Urn: URNFlatten},
Inputs: inputs,
Outputs: map[string]string{"i0": out},
EnvironmentId: m.addDefaultEnv(),
}
m.transforms[flattenID] = flatten
subtransforms = append(subtransforms, flattenID)
Expand Down Expand Up @@ -468,6 +470,7 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
m.transforms[cogbkID] = &pipepb.PTransform{
UniqueName: edge.Name,
Subtransforms: subtransforms,
EnvironmentId: m.addDefaultEnv(),
}
return cogbkID
}
Expand Down Expand Up @@ -632,6 +635,7 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) string {
Spec: &pipepb.FunctionSpec{
Urn: URNReshuffle,
},
EnvironmentId: m.addDefaultEnv(),
}
return reshuffleID
}
Expand Down
Loading

0 comments on commit e859735

Please sign in to comment.