@@ -416,6 +416,7 @@ def run_merged_intervals(
416416        start : t .Optional [TimeLike ] =  None ,
417417        end : t .Optional [TimeLike ] =  None ,
418418        allow_destructive_snapshots : t .Optional [t .Set [str ]] =  None ,
419+         selected_models : t .Optional [t .Set [str ]] =  None ,
419420        allow_additive_snapshots : t .Optional [t .Set [str ]] =  None ,
420421        selected_snapshot_ids : t .Optional [t .Set [SnapshotId ]] =  None ,
421422        run_environment_statements : bool  =  False ,
@@ -445,7 +446,13 @@ def run_merged_intervals(
445446        if  not  selected_snapshots :
446447            selected_snapshots  =  list (merged_intervals )
447448
448-         snapshot_dag  =  snapshots_to_dag (selected_snapshots )
449+         # Build the full DAG from all snapshots to preserve transitive dependencies 
450+         full_dag  =  snapshots_to_dag (self .snapshots .values ())
451+ 
452+         # Create a subdag that includes the selected snapshots and all their upstream dependencies 
453+         # This ensures that transitive dependencies are preserved even when intermediate nodes are not selected 
454+         selected_snapshot_ids_set  =  {s .snapshot_id  for  s  in  selected_snapshots }
455+         snapshot_dag  =  full_dag .subdag (* selected_snapshot_ids_set )
449456
450457        batched_intervals  =  self .batch_intervals (
451458            merged_intervals , deployability_index , environment_naming_info , dag = snapshot_dag 
@@ -472,6 +479,7 @@ def run_merged_intervals(
472479                start = start ,
473480                end = end ,
474481                execution_time = execution_time ,
482+                 selected_models = selected_models ,
475483            )
476484
477485        # We only need to create physical tables if the snapshot is not representative or if it 
@@ -533,6 +541,7 @@ def run_node(node: SchedulingUnit) -> None:
533541                            allow_destructive_snapshots = allow_destructive_snapshots ,
534542                            allow_additive_snapshots = allow_additive_snapshots ,
535543                            target_table_exists = snapshot .snapshot_id  not  in snapshots_to_create ,
544+                             selected_models = selected_models ,
536545                        )
537546
538547                    evaluation_duration_ms  =  now_timestamp () -  execution_start_ts 
@@ -602,6 +611,7 @@ def run_node(node: SchedulingUnit) -> None:
602611                    start = start ,
603612                    end = end ,
604613                    execution_time = execution_time ,
614+                     selected_models = selected_models ,
605615                )
606616
607617            self .state_sync .recycle ()
@@ -642,20 +652,11 @@ def _dag(
642652            upstream_dependencies : t .List [SchedulingUnit ] =  []
643653
644654            for  p_sid  in  snapshot .parents :
645-                 if  p_sid  in  self .snapshots :
646-                     p_intervals  =  intervals_per_snapshot .get (p_sid .name , [])
647- 
648-                     if  not  p_intervals  and  p_sid  in  original_snapshots_to_create :
649-                         upstream_dependencies .append (CreateNode (snapshot_name = p_sid .name ))
650-                     elif  len (p_intervals ) >  1 :
651-                         upstream_dependencies .append (DummyNode (snapshot_name = p_sid .name ))
652-                     else :
653-                         for  i , interval  in  enumerate (p_intervals ):
654-                             upstream_dependencies .append (
655-                                 EvaluateNode (
656-                                     snapshot_name = p_sid .name , interval = interval , batch_index = i 
657-                                 )
658-                             )
655+                 upstream_dependencies .extend (
656+                     self ._find_upstream_dependencies (
657+                         p_sid , intervals_per_snapshot , original_snapshots_to_create 
658+                     )
659+                 )
659660
660661            batch_concurrency  =  snapshot .node .batch_concurrency 
661662            batch_size  =  snapshot .node .batch_size 
@@ -699,6 +700,36 @@ def _dag(
699700                    )
700701        return  dag 
701702
703+     def  _find_upstream_dependencies (
704+         self ,
705+         parent_sid : SnapshotId ,
706+         intervals_per_snapshot : t .Dict [str , Intervals ],
707+         snapshots_to_create : t .Set [SnapshotId ],
708+     ) ->  t .List [SchedulingUnit ]:
709+         if  parent_sid  not  in self .snapshots :
710+             return  []
711+ 
712+         p_intervals  =  intervals_per_snapshot .get (parent_sid .name , [])
713+ 
714+         if  p_intervals :
715+             if  len (p_intervals ) >  1 :
716+                 return  [DummyNode (snapshot_name = parent_sid .name )]
717+             interval  =  p_intervals [0 ]
718+             return  [EvaluateNode (snapshot_name = parent_sid .name , interval = interval , batch_index = 0 )]
719+         if  parent_sid  in  snapshots_to_create :
720+             return  [CreateNode (snapshot_name = parent_sid .name )]
721+         # This snapshot has no intervals and doesn't need creation which means 
722+         # that it can be a transitive dependency 
723+         transitive_deps : t .List [SchedulingUnit ] =  []
724+         parent_snapshot  =  self .snapshots [parent_sid ]
725+         for  grandparent_sid  in  parent_snapshot .parents :
726+             transitive_deps .extend (
727+                 self ._find_upstream_dependencies (
728+                     grandparent_sid , intervals_per_snapshot , snapshots_to_create 
729+                 )
730+             )
731+         return  transitive_deps 
732+ 
702733    def  _run_or_audit (
703734        self ,
704735        environment : str  |  EnvironmentNamingInfo ,
@@ -808,6 +839,7 @@ def _run_or_audit(
808839            run_environment_statements = run_environment_statements ,
809840            audit_only = audit_only ,
810841            auto_restatement_triggers = auto_restatement_triggers ,
842+             selected_models = {s .node .dbt_name  for  s  in  merged_intervals  if  s .node .dbt_name },
811843        )
812844
813845        return  CompletionStatus .FAILURE  if  errors  else  CompletionStatus .SUCCESS 
0 commit comments