Skip to content

Commit 61b6cb3

Browse files
[Tidy first] move microbatch compilation to .compile method (#11063) (#11065)
1 parent 6a36444 commit 61b6cb3

File tree

1 file changed

+27
-44
lines changed

1 file changed

+27
-44
lines changed

core/dbt/task/run.py

+27-44
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,6 @@ def _execute_model(
283283
hook_ctx: Any,
284284
context_config: Any,
285285
model: ModelNode,
286-
manifest: Manifest,
287286
context: Dict[str, Any],
288287
materialization_macro: MacroProtocol,
289288
) -> RunResult:
@@ -328,9 +327,7 @@ def execute(self, model, manifest):
328327

329328
hook_ctx = self.adapter.pre_model_hook(context_config)
330329

331-
return self._execute_model(
332-
hook_ctx, context_config, model, manifest, context, materialization_macro
333-
)
330+
return self._execute_model(hook_ctx, context_config, model, context, materialization_macro)
334331

335332

336333
class MicrobatchModelRunner(ModelRunner):
@@ -342,10 +339,30 @@ def __init__(self, config, adapter, node, node_index: int, num_nodes: int):
342339
self.relation_exists: bool = False
343340

344341
def compile(self, manifest: Manifest):
345-
# The default compile function is _always_ called. However, we do our
346-
# compilation _later_ in `_execute_microbatch_materialization`. This
347-
# meant the node was being compiled _twice_ for each batch. To get around
348-
# this, we've overriden the default compile method to do nothing
342+
if self.batch_idx is not None:
343+
batch = self.batches[self.batch_idx]
344+
345+
# LEGACY: Set start/end in context prior to re-compiling (Will be removed for 1.10+)
346+
# TODO: REMOVE before 1.10 GA
347+
self.node.config["__dbt_internal_microbatch_event_time_start"] = batch[0]
348+
self.node.config["__dbt_internal_microbatch_event_time_end"] = batch[1]
349+
# Create batch context on model node prior to re-compiling
350+
self.node.batch = BatchContext(
351+
id=MicrobatchBuilder.batch_id(batch[0], self.node.config.batch_size),
352+
event_time_start=batch[0],
353+
event_time_end=batch[1],
354+
)
355+
# Recompile node to re-resolve refs with event time filters rendered, update context
356+
self.compiler.compile_node(
357+
self.node,
358+
manifest,
359+
{},
360+
split_suffix=MicrobatchBuilder.format_batch_start(
361+
batch[0], self.node.config.batch_size
362+
),
363+
)
364+
365+
# Skips compilation for non-batch runs
349366
return self.node
350367

351368
def set_batch_idx(self, batch_idx: int) -> None:
@@ -502,7 +519,6 @@ def _build_run_microbatch_model_result(self, model: ModelNode) -> RunResult:
502519
def _execute_microbatch_materialization(
503520
self,
504521
model: ModelNode,
505-
manifest: Manifest,
506522
context: Dict[str, Any],
507523
materialization_macro: MacroProtocol,
508524
) -> RunResult:
@@ -537,25 +553,6 @@ def _execute_microbatch_materialization(
537553
# call materialization_macro to get a batch-level run result
538554
start_time = time.perf_counter()
539555
try:
540-
# LEGACY: Set start/end in context prior to re-compiling (Will be removed for 1.10+)
541-
# TODO: REMOVE before 1.10 GA
542-
model.config["__dbt_internal_microbatch_event_time_start"] = batch[0]
543-
model.config["__dbt_internal_microbatch_event_time_end"] = batch[1]
544-
# Create batch context on model node prior to re-compiling
545-
model.batch = BatchContext(
546-
id=MicrobatchBuilder.batch_id(batch[0], model.config.batch_size),
547-
event_time_start=batch[0],
548-
event_time_end=batch[1],
549-
)
550-
# Recompile node to re-resolve refs with event time filters rendered, update context
551-
self.compiler.compile_node(
552-
model,
553-
manifest,
554-
{},
555-
split_suffix=MicrobatchBuilder.format_batch_start(
556-
batch[0], model.config.batch_size
557-
),
558-
)
559556
# Update jinja context with batch context members
560557
jinja_context = microbatch_builder.build_jinja_context_for_batch(
561558
incremental_batch=self.relation_exists
@@ -643,37 +640,23 @@ def _is_incremental(self, model) -> bool:
643640
else:
644641
return False
645642

646-
def _execute_microbatch_model(
643+
def _execute_model(
647644
self,
648645
hook_ctx: Any,
649646
context_config: Any,
650647
model: ModelNode,
651-
manifest: Manifest,
652648
context: Dict[str, Any],
653649
materialization_macro: MacroProtocol,
654650
) -> RunResult:
655651
try:
656652
batch_result = self._execute_microbatch_materialization(
657-
model, manifest, context, materialization_macro
653+
model, context, materialization_macro
658654
)
659655
finally:
660656
self.adapter.post_model_hook(context_config, hook_ctx)
661657

662658
return batch_result
663659

664-
def _execute_model(
665-
self,
666-
hook_ctx: Any,
667-
context_config: Any,
668-
model: ModelNode,
669-
manifest: Manifest,
670-
context: Dict[str, Any],
671-
materialization_macro: MacroProtocol,
672-
) -> RunResult:
673-
return self._execute_microbatch_model(
674-
hook_ctx, context_config, model, manifest, context, materialization_macro
675-
)
676-
677660

678661
class RunTask(CompileTask):
679662
def __init__(

0 commit comments

Comments
 (0)