Support multi-layer algorithms that receive a data product from an unfold #571
Merged
Codecov Report
@@ Coverage Diff @@
## main #571 +/- ##
==========================================
+ Coverage 82.23% 82.59% +0.35%
==========================================
Files 157 161 +4
Lines 5760 5895 +135
Branches 649 682 +33
==========================================
+ Hits 4737 4869 +132
+ Misses 807 804 -3
- Partials 216 222 +6
knoepfel
commented
May 15, 2026
marcpaterno
reviewed
May 18, 2026
marcpaterno
previously requested changes
May 20, 2026
…uct from an unfold
Also: - Removes now stale code - Renames test
sabasehrish
approved these changes
May 20, 2026
Summary
This PR fixes a bug where transforms operating on a hierarchy layer downstream of an unfold could not receive data products produced by that unfold. The root cause was that the `index_router` had no mechanism to learn, from the unfold itself, how many children it had generated, so it could not emit the correct flush token at the right time.

Solution
The fix introduces several new model components, reworks the flush-message types, and refactors the flush/routing pipeline. The previous `data_cell_counter`/`flush_counts` abstraction is replaced by smaller, more focused pieces.

New: `data_cell_counts` (`phlex/model/`)

A concurrent map from `layer_hash` to count, replacing the old `flush_counts` type. It supports `emplace`, `increment`, `add_to`, and `count` operations and is used both as a mutable accumulator (`data_cell_counts_ptr`) and as an immutable payload on flush messages (`data_cell_counts_const_ptr`).

New: `flush_messages.hpp` (`phlex/model/`)

Defines the message structs that flow through the new flush pipeline:

- `index_flush`: an `(index, counts)` pair carrying the child-count map for an index.
- `unfold_flush`: a simpler `(index, layer_hash, count)` message emitted by an unfold, since each unfold produces children in exactly one child layer.
- `ready_flushes_then_emit`: bundles the closeout flushes that must be emitted before a new index.

New: `data_cell_tracker` (`phlex/model/`)

Tracks the sequence of incoming `data_cell_index` values from the driver and determines which flush tokens are ready to be emitted when a new index arrives or the job ends. This logic was previously implicit in the source/index-router interaction.

New: `child_tracker` (`phlex/model/`)

Per-index tracker that records how many children have been processed in each child layer. Once the expected count (received from the unfold's flush result) is satisfied, `child_tracker` fires a callback to emit the flush token for that index.

`declared_unfold`

Now uses a third TBB output port (`index_flush`) carrying the child counts alongside the existing message and index-message ports.

`index_router`

Gains a `flush_receiver` input and an `establish_layers()` initializer that records which layers are produced by unfolds vs. consumed as inputs. `route()` now accepts the closeout flushes from `data_cell_tracker` rather than computing them internally, and `drain()` likewise receives the remaining flushes at job end.

`framework_graph`

- Adds an `index_receiver_` node that decouples closeout flush emission from the source node.
- Adds a `data_cell_tracker` (`cell_tracker_`) and feeds its output into `index_router_.route()`.
- Calls `index_router_.establish_layers()` before `finalize()`, using layer metadata gathered from the declared unfolds.
- Connects `flush_sender()` to `index_router_.flush_receiver()`.

`edge_maker`

Refactored to return `(provider_input_ports, multilayer_join_index_ports)` instead of directly calling `index_router_.finalize()`, allowing `framework_graph` to call `establish_layers()` first.

`store_counters` / `message`

Updated to use `data_cell_counts_const_ptr` in place of the removed `flush_counts_ptr`, and the internal counts map is renamed accordingly.

`fixed_hierarchy`

Exposes a `layer_paths()` accessor so the framework graph can read the declared layer paths when establishing layers on the `index_router`.

Tests
- `test/data_cell_tracker_test.cpp`: unit tests for `data_cell_tracker` in isolation.
- `test/child_tracker_test.cpp`: unit-level tests for `child_tracker` verifying that committed child counts accumulate correctly through nested unfolds without involving the full `framework_graph` machinery.
- `test/data_layer_hierarchy_test.cpp`: new unit tests for `data_layer_hierarchy`, covering ambiguous-layer-name lookups and the unnamed-layer fallback path.
- `test/fold_duplicate_layer_name_test.cpp`: renamed from `test/different_hierarchies.cpp`; re-enables the previously disabled `different_hierarchies` test, now covering the case where two layers share the same name in a fold scenario.
- `test/unfold.cpp`: additional unfold scenarios exercising multi-layer product consumption.
- `test/data_cell_counting.cpp`: removed alongside the old `data_cell_counter` implementation.

Files changed
- New: `phlex/model/child_tracker.{hpp,cpp}`, `phlex/model/data_cell_tracker.{hpp,cpp}`, `phlex/model/data_cell_counts.{hpp,cpp}`, `phlex/model/flush_messages.hpp`
- Removed: `phlex/model/data_cell_counter.{hpp,cpp}`
- Modified (model): `phlex/model/fwd.hpp`, `phlex/model/fixed_hierarchy.{hpp,cpp}`, `phlex/model/CMakeLists.txt`
- Modified (core): `phlex/core/index_router.{hpp,cpp}`, `phlex/core/framework_graph.{hpp,cpp}`, `phlex/core/declared_unfold.{hpp,cpp}`, `phlex/core/edge_maker.hpp`, `phlex/core/message.hpp`, `phlex/core/store_counters.{hpp,cpp}`
- New tests: `test/data_cell_tracker_test.cpp`, `test/child_tracker_test.cpp`, `test/data_layer_hierarchy_test.cpp`
- Renamed test: `test/different_hierarchies.cpp` → `test/fold_duplicate_layer_name_test.cpp`
- Removed test: `test/data_cell_counting.cpp`
- Modified tests: `test/unfold.cpp`, `test/CMakeLists.txt`

Summary largely courtesy of Claude Sonnet 4.6
Resolves #550
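
Illustrative sketch (not the code in this PR): the per-index counting idea behind `child_tracker` can be pictured roughly as below. The flush for an index is held back until the expected child count for every child layer is known and the same number of children has been processed. Everything here is an assumption made for illustration: the class name `child_tracker_sketch`, its member functions, the use of `std::string` in place of `layer_hash`, and the layer name `"spill"` are hypothetical. The sketch is single-threaded, whereas the real component presumably has to cope with concurrent updates.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for the PR's child_tracker. None of these names or
// signatures are taken from phlex; they only illustrate the counting logic.
class child_tracker_sketch {
public:
  using layer_id = std::string; // assumption: stands in for layer_hash
  using flush_callback = std::function<void()>;

  // expected_layers: how many child layers must report an expected count
  // before the flush may fire (assumed to be known up front).
  child_tracker_sketch(std::size_t expected_layers, flush_callback on_complete)
    : expected_layers_{expected_layers}, on_complete_{std::move(on_complete)}
  {
    assert(static_cast<bool>(on_complete_)); // a callback is required
  }

  // Record the number of children an unfold created in one child layer.
  void set_expected(layer_id const& layer, std::size_t count)
  {
    expected_[layer] = count;
    maybe_flush();
  }

  // Record that one child in the given layer has been fully processed.
  void child_processed(layer_id const& layer)
  {
    ++processed_[layer];
    maybe_flush();
  }

  bool flushed() const { return flushed_; }

private:
  // Fire the callback once, when every layer has reported its expected
  // count and at least that many children have been processed.
  void maybe_flush()
  {
    if (flushed_ || expected_.size() < expected_layers_) return;
    for (auto const& [layer, count] : expected_) {
      auto const it = processed_.find(layer);
      std::size_t const done = (it == processed_.end()) ? 0 : it->second;
      if (done < count) return;
    }
    flushed_ = true;
    on_complete_();
  }

  std::size_t expected_layers_;
  flush_callback on_complete_;
  std::unordered_map<layer_id, std::size_t> expected_;
  std::unordered_map<layer_id, std::size_t> processed_;
  bool flushed_{false};
};

// Usage: children may finish before the expected count arrives; the
// callback still fires exactly once, when the third child completes.
int demo_flush_count()
{
  int flushes = 0;
  child_tracker_sketch tracker{1, [&flushes] { ++flushes; }};
  tracker.child_processed("spill");
  tracker.child_processed("spill");
  tracker.set_expected("spill", 3); // 2 of 3 processed: no flush yet
  tracker.child_processed("spill"); // 3 of 3 processed: flush fires
  tracker.child_processed("spill"); // later calls do not fire it again
  return flushes;
}
```

The point the sketch tries to capture is ordering independence: the expected count and the processed-child notifications can arrive in either order, which is why the count is checked on both paths.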