From 67674ab4711af132147a78dcd79fc18d490672be Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Fri, 24 Apr 2026 08:02:17 -0500 Subject: [PATCH 1/5] Demonstrate failure for multi-layer transforms to receive a data product from an unfold --- test/unfold.cpp | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/test/unfold.cpp b/test/unfold.cpp index b2f00699..1caf0320 100644 --- a/test/unfold.cpp +++ b/test/unfold.cpp @@ -137,3 +137,51 @@ TEST_CASE("Splitting the processing", "[graph]") CHECK(g.execution_count("add_numbers") == 20); CHECK(g.execution_count("check_sum_same") == index_limit); } + +// ======================================================================================= +// This test exercises a multi-layer transform whose two inputs come from different data +// layers: one from the unfolded (child) layer and one from the parent (event) layer. +// +/* Index Router */ +/* | */ +/* provide_max_number (event layer) */ +/* | \ */ +/* unfold/iota (creates "subevent" children) */ +/* | \ */ +/* | \ */ +/* (subevent) (event, repeated) */ +/* new_number max_number */ +/* \ / */ +/* multi-layer transform: max_number + new_number */ +// ======================================================================================= +TEST_CASE("Multi-layer transform with one input from an unfold", "[graph]") +{ + constexpr auto index_limit = 2u; + + experimental::layer_generator gen; + gen.add_layer("event", {"job", index_limit}); + + experimental::framework_graph g{driver_for_test(gen)}; + + g.provide("provide_max_number", provide_max_number, concurrency::unlimited) + .output_product("input", "max_number", "event"); + + g.unfold("iota", &iota::predicate, &iota::unfold, concurrency::unlimited, "subevent") + .input_family(product_query{.creator = "input", .layer = "event", .suffix = "max_number"}) + .output_product_suffixes("new_number"); + + g.transform( + "add_max_and_new", + [](unsigned int i, unsigned int j) { return i + 
j; }, + concurrency::unlimited) + .input_family(product_query{.creator = "iota", .layer = "subevent", .suffix = "new_number"}, + product_query{.creator = "input", .layer = "event", .suffix = "max_number"}) + .output_product_suffixes("result"); + + g.execute(); + + // event 0: max_number=10, new_number in [0,9] -> 10 executions + // event 1: max_number=20, new_number in [0,19] -> 20 executions + CHECK(g.execution_count("iota") == index_limit); + CHECK(g.execution_count("add_max_and_new") == 30u); +} From 2a1e706b3cfdd551efd4f4fcb25569f485b33a52 Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Mon, 11 May 2026 15:31:33 -0500 Subject: [PATCH 2/5] Introduce data_cell_tracker and child_tracker constructs --- phlex/model/CMakeLists.txt | 4 + phlex/model/child_tracker.cpp | 106 ++++++++++++ phlex/model/child_tracker.hpp | 68 ++++++++ phlex/model/data_cell_tracker.cpp | 150 ++++++++++++++++ phlex/model/data_cell_tracker.hpp | 92 ++++++++++ test/CMakeLists.txt | 8 + test/child_tracker_test.cpp | 273 ++++++++++++++++++++++++++++++ test/data_cell_tracker_test.cpp | 114 +++++++++++++ 8 files changed, 815 insertions(+) create mode 100644 phlex/model/child_tracker.cpp create mode 100644 phlex/model/child_tracker.hpp create mode 100644 phlex/model/data_cell_tracker.cpp create mode 100644 phlex/model/data_cell_tracker.hpp create mode 100644 test/child_tracker_test.cpp create mode 100644 test/data_cell_tracker_test.cpp diff --git a/phlex/model/CMakeLists.txt b/phlex/model/CMakeLists.txt index 340ac8ea..5e9bb87c 100644 --- a/phlex/model/CMakeLists.txt +++ b/phlex/model/CMakeLists.txt @@ -4,7 +4,9 @@ cet_make_library( SHARED SOURCE algorithm_name.cpp + child_tracker.cpp data_cell_counter.cpp + data_cell_tracker.cpp fixed_hierarchy.cpp data_layer_hierarchy.cpp data_cell_index.cpp @@ -29,7 +31,9 @@ install( fixed_hierarchy.hpp fwd.hpp handle.hpp + child_tracker.hpp data_cell_counter.hpp + data_cell_tracker.hpp data_layer_hierarchy.hpp data_cell_index.hpp identifier.hpp diff --git 
a/phlex/model/child_tracker.cpp b/phlex/model/child_tracker.cpp new file mode 100644 index 00000000..57b6225f --- /dev/null +++ b/phlex/model/child_tracker.cpp @@ -0,0 +1,106 @@ +#include "phlex/model/child_tracker.hpp" + +#include "spdlog/spdlog.h" + +#include +#include +#include +#include + +namespace phlex::experimental { + + child_tracker::child_tracker(data_cell_index_ptr index, std::size_t expected_flush_count) : + index_{std::move(index)}, expected_flush_count_{expected_flush_count} + { + } + + std::size_t child_tracker::expected_total_count() const + { + return std::ranges::fold_left(expected_counts_ | std::views::values, 0uz, std::plus{}); + } + + std::size_t child_tracker::processed_total_count() const + { + return std::ranges::fold_left(processed_counts_ | std::views::values, 0uz, std::plus{}); + } + + std::size_t child_tracker::committed_total_count() const + { + return std::ranges::fold_left(committed_counts_ | std::views::values, 0uz, std::plus{}); + } + + std::size_t child_tracker::committed_count_for_layer( + data_cell_index::hash_type const layer_hash) const + { + return committed_counts_.count(layer_hash); + } + + void child_tracker::update_committed_counts(data_cell_counts const& committed_counts) + { + for (auto const& [layer_hash, count] : committed_counts) { + committed_counts_.add_to(layer_hash, count); + } + } + + void child_tracker::update_expected_counts(data_cell_counts const& expected_counts) + { + for (auto const& [layer_hash, count] : expected_counts) { + expected_counts_.add_to(layer_hash, count); + } + ++received_flush_count_; + } + + void child_tracker::update_expected_count(data_cell_index::hash_type const layer_hash, + std::size_t const count) + { + expected_counts_.add_to(layer_hash, count); + ++received_flush_count_; + } + + void child_tracker::send_flush() + { + if (flush_callback_) { + flush_callback_(*this); + } else { + spdlog::warn("No flush callback set for index: {}", index_->to_string()); + } + } + + bool 
child_tracker::all_children_accounted() + { + auto const received = received_flush_count_.load(); + if (received == 0) { + return false; + } + + // Block until all flush counts expected from unfolds have arrived so that expected_counts_ + // reflects the union of all child layers. + if (expected_flush_count_ > 0 and received < expected_flush_count_) { + return false; + } + + // All expected flush messages have arrived; check that processed child counts match. + bool const result = std::ranges::all_of(expected_counts_, [this](auto const& entry) { + auto const& [layer_hash, expected] = entry; + return processed_counts_.count(layer_hash) == expected.load(); + }); + + if (result) { + std::call_once(commit_once_, [this] { commit(); }); + } + + return result; + } + + void child_tracker::commit() + { + for (auto const& [layer_hash, count] : processed_counts_) { + committed_counts_.add_to(layer_hash, count.load()); + } + + // At some point, we might consider clearing the processed_counts_ and expected_counts_ maps + // to free memory, but for now we can just leave them as-is since the child_tracker will + // likely be destroyed soon after commit() is called. + } + +} // namespace phlex::experimental diff --git a/phlex/model/child_tracker.hpp b/phlex/model/child_tracker.hpp new file mode 100644 index 00000000..9afa267f --- /dev/null +++ b/phlex/model/child_tracker.hpp @@ -0,0 +1,68 @@ +#ifndef PHLEX_MODEL_CHILD_TRACKER_HPP +#define PHLEX_MODEL_CHILD_TRACKER_HPP + +#include "phlex/model/data_cell_index.hpp" +#include "phlex/model/data_cell_tracker.hpp" +#include "phlex/phlex_model_export.hpp" + +#include +#include +#include +#include +#include + +namespace phlex::experimental { + + class PHLEX_MODEL_EXPORT child_tracker { + using flush_callback_t = std::function; + + public: + // expected_flush_count controls how many update_expected_count[s]() calls must arrive before + // all_children_accounted() can return true. 
A value of 0 means a single call is sufficient + // (the common case when only one unfold consumes this index's layer). A value greater than 1 + // means that multiple unfolds produce children from the same parent layer. + explicit child_tracker(data_cell_index_ptr index, std::size_t expected_flush_count); + + data_cell_index_ptr const index() const { return index_; } + std::size_t expected_total_count() const; + std::size_t processed_total_count() const; + std::size_t committed_total_count() const; + std::size_t committed_count_for_layer(data_cell_index::hash_type layer_hash) const; + data_cell_counts const& committed_counts() const { return committed_counts_; } + + // Merges expected_counts into the accumulated expected counts. Each call represents one + // flush message arriving (e.g. one unfold completing for this index). + void update_expected_counts(data_cell_counts const& expected_counts); + // Single-entry variant used when an unfold reports its child count directly (no map needed). + void update_expected_count(data_cell_index::hash_type layer_hash, std::size_t count); + void update_committed_counts(data_cell_counts const& committed_counts); + void increment(data_cell_index::hash_type const layer_hash) + { + processed_counts_.increment(layer_hash); + } + + void set_flush_callback(flush_callback_t callback) { flush_callback_ = std::move(callback); } + void send_flush(); + bool all_children_accounted(); + + private: + void commit(); + + data_cell_index_ptr const index_; + std::once_flag commit_once_; + data_cell_counts committed_counts_; + data_cell_counts processed_counts_; + // Accumulated expected child counts from all unfolds. + data_cell_counts expected_counts_; + std::atomic received_flush_count_{0}; + // Number of flush messages expected from unfolds. Zero means any single + // update_expected_count[s]() call is sufficient to unblock all_children_accounted(). 
+ std::size_t expected_flush_count_{0}; + flush_callback_t flush_callback_; + }; + + using child_tracker_ptr = std::shared_ptr; + +} // namespace phlex::experimental + +#endif // PHLEX_MODEL_CHILD_TRACKER_HPP diff --git a/phlex/model/data_cell_tracker.cpp b/phlex/model/data_cell_tracker.cpp new file mode 100644 index 00000000..a82b7bfb --- /dev/null +++ b/phlex/model/data_cell_tracker.cpp @@ -0,0 +1,150 @@ +#include "phlex/model/data_cell_tracker.hpp" + +#include "spdlog/spdlog.h" + +#include +#include +#include + +namespace { + auto make_data_cell_counts(phlex::data_cell_index_ptr const& index) + { + auto result = std::make_shared(); + result->emplace(index->layer_hash(), 1); + return result; + } +} + +namespace phlex::experimental { + + // ========================================================================================= + // data_cell_counts implementation + void data_cell_counts::emplace(std::size_t layer_hash, std::size_t value) + { + map_.emplace(layer_hash, value); + } + + // ========================================================================================= + // data_cell_tracker implementation + data_cell_tracker::~data_cell_tracker() + { + if (pending_flushes_.empty()) { + return; + } + spdlog::warn("Cached pending flushes at destruction:"); + for (auto const& [index, flush_counts] : pending_flushes_ | std::views::values) { + spdlog::warn(" Index: {}", index->to_string()); + for (auto const& [layer_hash, count] : *flush_counts) { + spdlog::warn(" {} = {}", layer_hash, count.load()); + } + } + } + + index_flushes data_cell_tracker::closeout(data_cell_index_ptr const& received_index) + { + // Always update the cached index. The logic below uses the previous cached index to + // determine what flushes to emit. + auto cached_index = std::exchange(cached_index_, received_index); + + // Just beginning job (or ending a job that immediately threw an exception) + if (cached_index == nullptr) { + return {}; + } + + // Ending job. 
Back out to the job layer and emit flush tokens for all closed-out indices. + // + // Example: + // Current index: [run: 4, spill: 7] + // Received index: nullptr (end of job) + // Actions: + // a. Emit flush token for [run: 4] + // b. Emit flush token for [] (the job) + // c. Leave the cache empty + if (received_index == nullptr) { + // The "std::move" empties the cache, so we don't need to manually clear it after. + return std::move(pending_flushes_) | std::views::values | std::ranges::to(); + } + + assert(received_index); + assert(cached_index); + + // A parent must exist at this point + auto received_parent = received_index->parent(); + assert(received_parent); + + // Received index is immediate child of current index. + // + // Example: + // Current index: [run: 4] + // Received index: [run: 4, spill: 6] + // Actions: + // a. Initialize count for [run: 4] + if (received_parent->hash() == cached_index->hash()) { + create_parent_count(received_parent, received_index); + return {}; + } + + auto cached_parent = cached_index->parent(); + + // Received index is a sibling of the current index. Increment parent count and move + // current index to the new sibling. + // + // Example: + // Current index: [run: 4, spill: 6] + // Received index: [run: 4, spill: 7] + // Actions: + // a. Increment count for [run: 4] + if (cached_parent and received_parent->hash() == cached_parent->hash()) { + increment_parent_count(received_parent, received_index); + return {}; + } + + // Received index is a sibling of an ancestor of the cached index. This means we've closed out + // the cached index and all of its siblings, and are moving back up the hierarchy. We need + // to emit flush tokens for all closed-out indices. + // + // Example: + // Cached index: [run: 4, spill: 6, subspill: 2, subsubspill: 3] + // Received index: [run: 4, spill: 7] + // Actions: + // a. Emit flush token for [run: 4, spill: 6, subspill: 2] + // b. Emit flush token for [run: 4, spill: 6] + // c. 
Remove relevant flush tokens from cache + // d. Increment count for [run: 4] + index_flushes result; + auto recursive_parent = cached_parent; + while (recursive_parent != received_parent) { + if (recursive_parent == nullptr) { + // We get here when the walk up from the cached parent reaches the root without meeting the + // received parent, i.e. the received parent is not an ancestor of the cached index. This + // happens, for example, when the received index skips an intermediate layer below the + // cached index. Such a received index is invalid. + throw std::runtime_error( + fmt::format("Received index {}, which is not an immediate child of {}", + received_index->to_string(), + cached_index->to_string())); + } + auto fh = pending_flushes_.extract(recursive_parent->hash()); + result.push_back(fh.mapped()); + recursive_parent = recursive_parent->parent(); + } + increment_parent_count(received_parent, received_index); + return result; + } + + void data_cell_tracker::create_parent_count(data_cell_index_ptr const& parent, + data_cell_index_ptr const& child) + { + pending_flushes_.emplace(parent->hash(), + index_flush{.index = parent, .counts = make_data_cell_counts(child)}); + } + + void data_cell_tracker::increment_parent_count(data_cell_index_ptr const& parent, + data_cell_index_ptr const& child) + { + auto it = pending_flushes_.find(parent->hash()); + // This is only called once the parent has already received a child, so its count must already exist in the cache. 
+ assert(it != pending_flushes_.end()); + it->second.counts->increment(child->layer_hash()); + } +} diff --git a/phlex/model/data_cell_tracker.hpp b/phlex/model/data_cell_tracker.hpp new file mode 100644 index 00000000..ba37574f --- /dev/null +++ b/phlex/model/data_cell_tracker.hpp @@ -0,0 +1,92 @@ +#ifndef PHLEX_MODEL_DATA_CELL_TRACKER_HPP +#define PHLEX_MODEL_DATA_CELL_TRACKER_HPP + +#include "phlex/phlex_model_export.hpp" + +#include "phlex/model/data_cell_index.hpp" + +#include "oneapi/tbb/concurrent_unordered_map.h" + +#include +#include +#include +#include +#include + +namespace phlex::experimental { + class PHLEX_MODEL_EXPORT data_cell_counts { + public: + void emplace(std::size_t layer_hash, std::size_t value); + + void increment(data_cell_index::hash_type layer_hash) { ++map_[layer_hash]; } + void add_to(std::size_t layer_hash, std::size_t value) { map_[layer_hash] += value; } + + auto begin() const { return map_.begin(); } + auto end() const { return map_.end(); } + + auto size() const { return map_.size(); } + + std::size_t count(data_cell_index::hash_type layer_hash) const + { + auto it = map_.find(layer_hash); + return it != map_.end() ? it->second.load() : 0; + } + + private: + tbb::concurrent_unordered_map> map_; + }; + + using data_cell_counts_ptr = std::shared_ptr; + using data_cell_counts_const_ptr = std::shared_ptr; + + struct PHLEX_MODEL_EXPORT index_flush { + data_cell_index_ptr index; + // Ideally, the counts field should be a `data_cell_counts_const_ptr` to ensure immutability. + // However, this type is also used for incrementing counters, so it must be mutable. + data_cell_counts_ptr counts; + }; + + using index_flushes = std::vector; + + // A simpler flush message sent by an unfold to the index_router. Unlike index_flush, which + // carries a map of child counts, unfold_flush carries a single (layer_hash, count) pair + // because each unfold produces children in exactly one child layer. 
+ struct PHLEX_MODEL_EXPORT unfold_flush { + data_cell_index_ptr index; + data_cell_index::hash_type layer_hash{}; + std::size_t count{}; + }; + + // The `closeout_then_emit` struct carries flushes that must be emitted + // (to close out already-emitted indices) before emitting `index_to_emit`. + struct PHLEX_MODEL_EXPORT closeout_then_emit { + index_flushes closeout_flushes{}; + data_cell_index_ptr index_to_emit{nullptr}; + }; + + class PHLEX_MODEL_EXPORT data_cell_tracker { + public: + data_cell_tracker() = default; + ~data_cell_tracker(); + + data_cell_tracker(data_cell_tracker const&) = delete; + data_cell_tracker(data_cell_tracker&&) = delete; + data_cell_tracker& operator=(data_cell_tracker const&) = delete; + data_cell_tracker& operator=(data_cell_tracker&&) = delete; + + // Computes and returns the set of indices whose processing is now complete, given that + // the next index to be processed is `index`. A null `index` signals end-of-job and + // returns all remaining pending flushes. 
+ index_flushes closeout(data_cell_index_ptr const& index); + + private: + void create_parent_count(data_cell_index_ptr const& parent, data_cell_index_ptr const& child); + void increment_parent_count(data_cell_index_ptr const& parent, + data_cell_index_ptr const& child); + + data_cell_index_ptr cached_index_{nullptr}; + std::map pending_flushes_; + }; +} + +#endif // PHLEX_MODEL_DATA_CELL_TRACKER_HPP diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 79b60c0f..b2d126c8 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -176,6 +176,14 @@ cet_test(data_cell_index USE_CATCH2_MAIN SOURCE data_cell_index.cpp LIBRARIES cet_test(fixed_hierarchy_test USE_CATCH2_MAIN SOURCE fixed_hierarchy_test.cpp LIBRARIES phlex::model ) +cet_test(data_cell_tracker USE_CATCH2_MAIN SOURCE data_cell_tracker_test.cpp LIBRARIES + phlex::model + TBB::tbb +) +cet_test(child_tracker USE_CATCH2_MAIN SOURCE child_tracker_test.cpp LIBRARIES + phlex::model_internal + TBB::tbb +) cet_test(product_handle USE_CATCH2_MAIN SOURCE product_handle.cpp LIBRARIES phlex::core_internal ) diff --git a/test/child_tracker_test.cpp b/test/child_tracker_test.cpp new file mode 100644 index 00000000..36cca22e --- /dev/null +++ b/test/child_tracker_test.cpp @@ -0,0 +1,273 @@ +// ======================================================================================= +// Unit tests for child_tracker covering: +// - single- and two-layer index hierarchies (job → runs → spills) +// - grandchild count propagation via update_committed_counts +// - blocking on expected_flush_count > 1 (multiple unfolds into the same parent layer) +// - all_children_accounted() returning false before any flush message arrives +// - concurrent execution via tbb::parallel_for with concurrent_hash_map/concurrent_vector +// +// The local flush_if_done() helper mirrors the core propagation logic from index_router, +// allowing child_tracker to be tested without a TBB flow graph. 
+// ======================================================================================= + +#include "phlex/model/child_tracker.hpp" +#include "phlex/model/data_cell_index.hpp" +#include "phlex/model/data_cell_tracker.hpp" +#include "phlex/model/identifier.hpp" + +#include "catch2/catch_test_macros.hpp" +#include "catch2/matchers/catch_matchers_string.hpp" +#include "oneapi/tbb/concurrent_hash_map.h" +#include "oneapi/tbb/concurrent_vector.h" +#include "oneapi/tbb/parallel_for.h" +#include "spdlog/sinks/ostream_sink.h" +#include "spdlog/spdlog.h" + +#include +#include + +using namespace phlex; +using namespace phlex::experimental; +using namespace phlex::experimental::literals; + +namespace { + + void use_ostream_logger(std::ostringstream& oss) + { + auto ostream_sink = std::make_shared(oss); + auto ostream_logger = std::make_shared("my_logger", ostream_sink); + spdlog::set_default_logger(ostream_logger); + } + + // trackers_t maps index->hash() -> child_tracker_ptr. + // 'flushed' accumulates all trackers whose send_flush() was invoked. 
+ using trackers_t = tbb::concurrent_hash_map; + using flushed_t = tbb::concurrent_vector; + + child_tracker_ptr make_tracker(data_cell_index_ptr index, std::size_t expected_flush_count) + { + auto tracker = std::make_shared(std::move(index), expected_flush_count); + tracker->set_flush_callback([](child_tracker const&) {}); + return tracker; + } + + void flush_if_done(data_cell_index_ptr index, trackers_t& trackers, flushed_t& flushed) + { + while (index) { + trackers_t::accessor a; + if (not trackers.find(a, index->hash())) { + return; + } + auto& tracker = a->second; + if (not tracker->all_children_accounted()) { + return; + } + + flushed.push_back(tracker); + tracker->send_flush(); + + auto parent = index->parent(); + if (parent) { + if (trackers_t::accessor pa; trackers.find(pa, parent->hash())) { + pa->second->update_committed_counts(tracker->committed_counts()); + pa->second->increment(index->layer_hash()); + } + } + trackers.erase(a); + index = parent; + } + } +} + +TEST_CASE("child_tracker: no registered callback", "[child_tracker]") +{ + std::ostringstream oss; + use_ostream_logger(oss); + auto tracker = std::make_shared(data_cell_index::job(), 0); + tracker->send_flush(); + CHECK_THAT(oss.str(), Catch::Matchers::ContainsSubstring("No flush callback set for index: []")); +} + +TEST_CASE("child_tracker: single-layer hierarchy (job -> runs)", "[child_tracker]") +{ + auto job = data_cell_index::job(); + auto run0 = job->make_child("run", 0); + auto run1 = job->make_child("run", 1); + auto run_layer_hash = run0->layer_hash(); + + // The unfold into run fires once, reporting 2 children. + auto job_tracker = make_tracker(job, 0); + job_tracker->update_expected_count(run_layer_hash, 2); + + trackers_t trackers; + trackers.emplace(job->hash(), job_tracker); + + flushed_t flushed; + + // Run 0 completes — job tracker not yet done (run 1 still outstanding). 
+ job_tracker->increment(run_layer_hash); + flush_if_done(job, trackers, flushed); + CHECK(flushed.empty()); + + // Run 1 completes — job tracker is now done. + job_tracker->increment(run_layer_hash); + flush_if_done(job, trackers, flushed); + + REQUIRE(flushed.size() == 1); + auto const& jt = flushed[0]; + CHECK(jt->index() == job); + CHECK(jt->expected_total_count() == 2); + CHECK(jt->processed_total_count() == 2); + CHECK(jt->committed_count_for_layer(run_layer_hash) == 2); + CHECK(trackers.empty()); +} + +TEST_CASE("child_tracker: two-layer hierarchy (job -> runs -> spills)", "[child_tracker]") +{ + constexpr std::size_t n_runs = 3; + constexpr std::size_t n_spills = 2; + + auto job = data_cell_index::job(); + auto run0 = + job->make_child("run", 0); // representative child; layer_hash is the same for all runs + auto run_layer_hash = run0->layer_hash(); + auto spill0 = run0->make_child("spill", 0); + auto spill_layer_hash = spill0->layer_hash(); + + trackers_t trackers; + flushed_t flushed; + + // Create the job-layer tracker. + auto job_tracker = make_tracker(job, 0); + job_tracker->update_expected_count(run_layer_hash, n_runs); + trackers.emplace(job->hash(), job_tracker); + + // Pre-populate all run trackers before running in parallel, so that flush_if_done + // can always find the job tracker when a run completes. + std::vector runs; + runs.reserve(n_runs); + for (std::size_t const r : std::views::iota(0uz, n_runs)) { + auto& run_index = runs.emplace_back(job->make_child("run", r)); + auto run_tracker = make_tracker(run_index, 0); + run_tracker->update_expected_count(spill_layer_hash, n_spills); + trackers.emplace(run_index->hash(), run_tracker); + } + + // Process each run's spills in parallel. 
+ tbb::parallel_for(0uz, n_runs, [&](std::size_t r) { + trackers_t::const_accessor a; + trackers.find(a, runs[r]->hash()); + auto const& run_tracker = a->second; + a.release(); + for (std::size_t s = 0uz; s < n_spills; ++s) { + run_tracker->increment(spill_layer_hash); + flush_if_done(runs[r], trackers, flushed); + } + }); + + REQUIRE(flushed.size() == n_runs + 1); // one per run + the job + + // Insertion order into flushed is non-deterministic under parallel execution, + // so partition by layer name rather than relying on position. + child_tracker_ptr job_flushed; + for (auto const& ft : flushed) { + if (ft->index()->layer_name() == "run"_id) { + CHECK(ft->expected_total_count() == n_spills); + CHECK(ft->processed_total_count() == n_spills); + CHECK(ft->committed_count_for_layer(spill_layer_hash) == n_spills); + } else { + REQUIRE(ft->index() == job); + job_flushed = ft; + } + } + + REQUIRE(job_flushed); + CHECK(job_flushed->expected_total_count() == n_runs); + CHECK(job_flushed->processed_total_count() == n_runs); + // Immediate children (runs) counted directly. + CHECK(job_flushed->committed_count_for_layer(run_layer_hash) == n_runs); + // Grandchildren (spills) propagated up from the run trackers. + CHECK(job_flushed->committed_count_for_layer(spill_layer_hash) == n_runs * n_spills); + + CHECK(trackers.empty()); +} + +TEST_CASE("child_tracker: multiple unfolds into the same parent layer", "[child_tracker]") +{ + auto job = data_cell_index::job(); + auto run0 = job->make_child("run", 0); + auto run_layer_hash = run0->layer_hash(); + + // Two separate unfolds each produce 2 runs — expected_flush_count = 2. + auto job_tracker = make_tracker(job, 2); + + // Both expected-count messages arrive, each reporting 2 children. 
+ job_tracker->update_expected_count(run_layer_hash, 2); + CHECK_FALSE(job_tracker->all_children_accounted()); // second flush not yet received + + job_tracker->update_expected_count(run_layer_hash, 2); + // expected total is now 4, but no children have been processed yet. + CHECK_FALSE(job_tracker->all_children_accounted()); + + // Process all 4 children one by one; only the last call should commit. + for (int child = 0; child < 3; ++child) { + job_tracker->increment(run_layer_hash); + CHECK_FALSE(job_tracker->all_children_accounted()); + } + job_tracker->increment(run_layer_hash); + CHECK(job_tracker->all_children_accounted()); + + CHECK(job_tracker->expected_total_count() == 4); + CHECK(job_tracker->processed_total_count() == 4); + CHECK(job_tracker->committed_count_for_layer(run_layer_hash) == 4); +} + +TEST_CASE("child_tracker: not done before any flush message arrives", "[child_tracker]") +{ + auto job = data_cell_index::job(); + auto run0 = job->make_child("run", 0); + auto run_layer_hash = run0->layer_hash(); + + auto tracker = make_tracker(job, 0); + + // No update_expected_count call yet — must return false. + CHECK_FALSE(tracker->all_children_accounted()); + + tracker->update_expected_count(run_layer_hash, 1); + CHECK_FALSE(tracker->all_children_accounted()); + CHECK(tracker->expected_total_count() == 1); + CHECK(tracker->processed_total_count() == 0); + + tracker->increment(run_layer_hash); + CHECK(tracker->all_children_accounted()); +} + +TEST_CASE("child_tracker: update_committed_counts accumulates across multiple children", + "[child_tracker]") +{ + auto job = data_cell_index::job(); + auto run0 = job->make_child("run", 0); + auto run1 = job->make_child("run", 1); + auto spill0 = run0->make_child("spill", 0); + auto spill_layer_hash = spill0->layer_hash(); + auto run_layer_hash = run0->layer_hash(); + + auto job_tracker = make_tracker(job, 0); + job_tracker->update_expected_count(run_layer_hash, 2); + + // Simulate run 0 committing with 3 spills. 
+ data_cell_counts run0_committed; + run0_committed.add_to(spill_layer_hash, 3); + job_tracker->update_committed_counts(run0_committed); + job_tracker->increment(run_layer_hash); + + // Simulate run 1 committing with 5 spills. + data_cell_counts run1_committed; + run1_committed.add_to(spill_layer_hash, 5); + job_tracker->update_committed_counts(run1_committed); + job_tracker->increment(run_layer_hash); + + REQUIRE(job_tracker->all_children_accounted()); + CHECK(job_tracker->committed_count_for_layer(spill_layer_hash) == 8); + CHECK(job_tracker->committed_count_for_layer(run_layer_hash) == 2); +} diff --git a/test/data_cell_tracker_test.cpp b/test/data_cell_tracker_test.cpp new file mode 100644 index 00000000..5afc4005 --- /dev/null +++ b/test/data_cell_tracker_test.cpp @@ -0,0 +1,114 @@ +#include "phlex/model/data_cell_index.hpp" +#include "phlex/model/data_cell_tracker.hpp" + +#include "catch2/catch_test_macros.hpp" +#include "catch2/matchers/catch_matchers_string.hpp" +#include "spdlog/sinks/ostream_sink.h" +#include "spdlog/spdlog.h" + +using namespace phlex; +using namespace phlex::experimental; + +namespace { + void use_ostream_logger(std::ostringstream& oss) + { + auto ostream_sink = std::make_shared(oss); + auto ostream_logger = std::make_shared("my_logger", ostream_sink); + spdlog::set_default_logger(ostream_logger); + } +} + +TEST_CASE("Test data-cell tracker", "[graph]") +{ + data_cell_tracker tracker; + + auto job_index = data_cell_index::job(); + auto run4 = job_index->make_child("run", 4); + auto spill5 = run4->make_child("spill", 5); + auto spill6 = run4->make_child("spill", 6); + auto subspill2 = spill6->make_child("subspill", 2); + auto run5 = job_index->make_child("run", 5); + + CHECK(tracker.closeout(job_index).empty()); + CHECK(tracker.closeout(run4).empty()); + CHECK(tracker.closeout(spill5).empty()); + CHECK(tracker.closeout(spill6).empty()); + CHECK(tracker.closeout(subspill2).empty()); + + auto flushes = tracker.closeout(run5); + 
REQUIRE(flushes.size() == 2); + + auto spill6_flush = flushes[0]; + CHECK(spill6_flush.index == spill6); + REQUIRE(spill6_flush.counts->size() == 1); // Should only be "subspill" layer + CHECK(spill6_flush.counts->count(subspill2->layer_hash()) == 1); // subspill 2 + + auto run4_flush = flushes[1]; + CHECK(run4_flush.index == run4); + REQUIRE(run4_flush.counts->size() == 1); // Should only be "spill" layer + CHECK(run4_flush.counts->count(spill5->layer_hash()) == 2); // spills 5 and 6 + + flushes = tracker.closeout(nullptr); + REQUIRE(flushes.size() == 1); // only job should have a flush count + + auto job_flush = flushes[0]; + CHECK(job_flush.index == job_index); + REQUIRE(job_flush.counts->size() == 1); // Should only be "run" layer + CHECK(job_flush.counts->count(run4->layer_hash()) == 2); // runs 4 and 5 +} + +TEST_CASE("Test data-cell tracker with multiple hierarchy branches", "[graph]") +{ + data_cell_tracker tracker; + + auto job_index = data_cell_index::job(); + auto run4 = job_index->make_child("run", 4); + auto calib1 = job_index->make_child("calib", 1); + auto run5 = job_index->make_child("run", 5); + + CHECK(tracker.closeout(job_index).empty()); + CHECK(tracker.closeout(run4).empty()); + CHECK(tracker.closeout(calib1).empty()); + CHECK(tracker.closeout(run5).empty()); + + auto flushes = tracker.closeout(nullptr); + REQUIRE(flushes.size() == 1); // only job should have a flush count + auto job_flush = flushes[0]; + CHECK(job_flush.index == job_index); + REQUIRE(job_flush.counts->size() == 2); // Should have "run" and "calib" layers + CHECK(job_flush.counts->count(run4->layer_hash()) == 2); // run 4 and 5 + CHECK(job_flush.counts->count(calib1->layer_hash()) == 1); // calib 1 +} + +TEST_CASE("Test data-cell tracker with missing intermediate layers", "[graph]") +{ + data_cell_tracker tracker; + + auto job_index = data_cell_index::job(); + auto run4 = job_index->make_child("run", 4); + auto spill2 = run4->make_child("spill", 2); + + 
CHECK(tracker.closeout(job_index).empty()); + + CHECK_THROWS_WITH(tracker.closeout(spill2), + "Received index [run:4, spill:2], which is not an immediate child of []"); +} + +TEST_CASE("Cached flush counts at destruction generate warning message", "[graph]") +{ + std::ostringstream oss; + use_ostream_logger(oss); + auto tracker = std::make_unique(); + + auto job_index = data_cell_index::job(); + auto run4 = job_index->make_child("run", 4); + + CHECK(tracker->closeout(job_index).empty()); + CHECK(tracker->closeout(run4).empty()); + + tracker.reset(); // Invoke destructor to trigger warning message + auto const warning = oss.str(); + CHECK_THAT(warning, Catch::Matchers::ContainsSubstring("Cached pending flushes at destruction:")); + CHECK_THAT(warning, Catch::Matchers::ContainsSubstring("Index: []")); + CHECK_THAT(warning, Catch::Matchers::ContainsSubstring("7457871974376244100 = 1")); +} From e9702f3594f49767e581f359490320fd8d7ca0d6 Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Mon, 27 Apr 2026 17:54:59 -0500 Subject: [PATCH 3/5] Support multi-layer transforms with products from unfolds Also: - Removes now stale code - Renames test --- phlex/core/declared_unfold.cpp | 15 +- phlex/core/declared_unfold.hpp | 50 ++- phlex/core/edge_maker.hpp | 26 +- phlex/core/framework_graph.cpp | 105 ++++--- phlex/core/framework_graph.hpp | 10 +- phlex/core/index_router.cpp | 485 +++++++++++++++++++---------- phlex/core/index_router.hpp | 159 +++++----- phlex/core/message.hpp | 2 +- phlex/core/store_counters.cpp | 9 +- phlex/core/store_counters.hpp | 11 +- phlex/model/CMakeLists.txt | 9 +- phlex/model/child_tracker.cpp | 106 ------- phlex/model/child_tracker.hpp | 68 ---- phlex/model/data_cell_counter.cpp | 65 ---- phlex/model/data_cell_counter.hpp | 76 ----- phlex/model/data_cell_counts.cpp | 8 + phlex/model/data_cell_counts.hpp | 36 +++ phlex/model/data_cell_tracker.cpp | 13 +- phlex/model/data_cell_tracker.hpp | 59 +--- phlex/model/fixed_hierarchy.cpp | 7 +- 
phlex/model/fixed_hierarchy.hpp | 4 + phlex/model/flush_gate.cpp | 100 ++++++ phlex/model/flush_gate.hpp | 108 +++++++ phlex/model/flush_messages.hpp | 39 +++ phlex/model/fwd.hpp | 6 +- test/CMakeLists.txt | 6 +- test/child_tracker_test.cpp | 273 ---------------- test/data_cell_counting.cpp | 144 --------- test/data_cell_tracker_test.cpp | 35 ++- test/data_layer_hierarchy_test.cpp | 46 +++ test/flush_gate_test.cpp | 237 ++++++++++++++ 31 files changed, 1144 insertions(+), 1173 deletions(-) delete mode 100644 phlex/model/child_tracker.cpp delete mode 100644 phlex/model/child_tracker.hpp delete mode 100644 phlex/model/data_cell_counter.cpp delete mode 100644 phlex/model/data_cell_counter.hpp create mode 100644 phlex/model/data_cell_counts.cpp create mode 100644 phlex/model/data_cell_counts.hpp create mode 100644 phlex/model/flush_gate.cpp create mode 100644 phlex/model/flush_gate.hpp create mode 100644 phlex/model/flush_messages.hpp delete mode 100644 test/child_tracker_test.cpp delete mode 100644 test/data_cell_counting.cpp create mode 100644 test/data_layer_hierarchy_test.cpp create mode 100644 test/flush_gate_test.cpp diff --git a/phlex/core/declared_unfold.cpp b/phlex/core/declared_unfold.cpp index d56cfde9..f3849baf 100644 --- a/phlex/core/declared_unfold.cpp +++ b/phlex/core/declared_unfold.cpp @@ -1,6 +1,6 @@ #include "phlex/core/declared_unfold.hpp" -#include "phlex/model/data_cell_counter.hpp" #include "phlex/model/handle.hpp" +#include "phlex/utilities/hashing.hpp" #include "fmt/std.h" #include "spdlog/spdlog.h" @@ -12,25 +12,18 @@ namespace phlex::experimental { std::string const& child_layer_name) : parent_{std::const_pointer_cast(parent)}, node_name_{std::move(node_name)}, - child_layer_name_{child_layer_name} + child_layer_name_{child_layer_name}, + child_layer_hash_{hash(parent->index()->layer_hash(), identifier{child_layer_name_}.hash())} { } product_store_const_ptr generator::make_child(std::size_t const i, products new_products) { auto child_index = 
parent_->index()->make_child(child_layer_name_, i); - ++child_counts_[child_index->layer_hash()]; + ++child_counts_; return std::make_shared(child_index, node_name_, std::move(new_products)); } - flush_counts_ptr generator::flush_result() const - { - if (not child_counts_.empty()) { - return std::make_shared(child_counts_); - } - return nullptr; - } - declared_unfold::declared_unfold(algorithm_name name, std::vector predicates, product_queries input_products, diff --git a/phlex/core/declared_unfold.hpp b/phlex/core/declared_unfold.hpp index 76855927..d773f26a 100644 --- a/phlex/core/declared_unfold.hpp +++ b/phlex/core/declared_unfold.hpp @@ -11,6 +11,7 @@ #include "phlex/core/products_consumer.hpp" #include "phlex/model/algorithm_name.hpp" #include "phlex/model/data_cell_index.hpp" +#include "phlex/model/flush_messages.hpp" #include "phlex/model/handle.hpp" #include "phlex/model/identifier.hpp" #include "phlex/model/product_specification.hpp" @@ -39,22 +40,19 @@ namespace phlex::experimental { explicit generator(product_store_const_ptr const& parent, algorithm_name node_name, std::string const& child_layer_name); - flush_counts_ptr flush_result() const; - product_store_const_ptr make_child_for(std::size_t const data_cell_number, - products new_products) - { - return make_child(data_cell_number, std::move(new_products)); - } + std::size_t child_layer_hash() const { return child_layer_hash_; } + std::size_t child_count() const { return child_counts_; } + product_store_const_ptr make_child(std::size_t i, products new_products); private: - product_store_const_ptr make_child(std::size_t i, products new_products); product_store_ptr parent_; algorithm_name node_name_; // References declared_unfold::child_layer_, which outlives this short-lived object. 
// NOLINTNEXTLINE(cppcoreguidelines-avoid-const-or-ref-data-members) std::string const& child_layer_name_; - std::map child_counts_; + std::size_t child_layer_hash_; + std::size_t child_counts_ = 0; }; class PHLEX_CORE_EXPORT declared_unfold : public products_consumer { @@ -66,10 +64,10 @@ namespace phlex::experimental { ~declared_unfold() override; virtual tbb::flow::sender& output_port() = 0; - virtual tbb::flow::sender& output_index_port() = 0; + virtual tbb::flow::sender& output_index_port() = 0; + virtual tbb::flow::sender& flush_sender() = 0; virtual product_specifications const& output() const = 0; virtual std::size_t product_count() const = 0; - virtual flusher_t& flusher() = 0; std::string const& child_layer() const noexcept { return child_layer_; } @@ -109,19 +107,16 @@ namespace phlex::experimental { unfold_{g, concurrency, [this, p = std::move(predicate), ufold = std::move(unfold)]( - messages_t const& messages, auto&) { + messages_t const& messages, auto& outputs) { auto const& msg = most_derived(messages); auto const& store = msg.store; - std::size_t const original_message_id{msg_counter_}; generator g{store, this->full_name(), child_layer()}; call(p, ufold, store->index(), g, messages, std::make_index_sequence{}); - - flusher_.try_put({.index = store->index(), - .counts = g.flush_result(), - .original_id = original_message_id}); - }}, - flusher_{g} + std::get<2>(outputs).try_put({.index = store->index(), + .layer_hash = g.child_layer_hash(), + .count = g.child_count()}); + }} { if constexpr (num_inputs > 1ull) { make_edge(join_, unfold_); @@ -142,12 +137,15 @@ namespace phlex::experimental { { return tbb::flow::output_port<0>(unfold_); } - tbb::flow::sender& output_index_port() override + tbb::flow::sender& output_index_port() override { return tbb::flow::output_port<1>(unfold_); } + tbb::flow::sender& flush_sender() override + { + return tbb::flow::output_port<2>(unfold_); + } product_specifications const& output() const override { return output_; } 
- flusher_t& flusher() override { return flusher_; } template void call(Predicate const& predicate, @@ -181,10 +179,10 @@ namespace phlex::experimental { } ++product_count_; - auto child = g.make_child_for(counter++, std::move(new_products)); - tbb::flow::output_port<0>(unfold_).try_put( - {.store = child, .id = msg_counter_.fetch_add(1)}); - tbb::flow::output_port<1>(unfold_).try_put(child->index()); + auto child = g.make_child(counter++, std::move(new_products)); + auto const msg_id = msg_counter_.fetch_add(1); + tbb::flow::output_port<0>(unfold_).try_put({.store = child, .id = msg_id}); + tbb::flow::output_port<1>(unfold_).try_put({.index = child->index(), .msg_id = msg_id}); } } @@ -195,9 +193,9 @@ namespace phlex::experimental { input_retriever_types input_{input_arguments()}; product_specifications output_; join_or_none_t join_; - tbb::flow::multifunction_node, std::tuple> + tbb::flow::multifunction_node, + std::tuple> unfold_; - flusher_t flusher_; std::atomic msg_counter_{}; // Is this sufficient? Probably not. std::atomic calls_{}; std::atomic product_count_{}; diff --git a/phlex/core/edge_maker.hpp b/phlex/core/edge_maker.hpp index 334c31e7..d8ee8dd2 100644 --- a/phlex/core/edge_maker.hpp +++ b/phlex/core/edge_maker.hpp @@ -33,12 +33,11 @@ namespace phlex::experimental { explicit edge_maker(Args&... args); template - void operator()(tbb::flow::graph& g, - index_router& multi, - std::map& filters, - declared_outputs& outputs, - provider_nodes& providers, - Args&... consumers); + std::tuple> + operator()(std::map& filters, + declared_outputs& outputs, + provider_nodes& providers, + Args&... consumers); private: template @@ -100,12 +99,11 @@ namespace phlex::experimental { } template - void edge_maker::operator()(tbb::flow::graph& g, - index_router& multi, - std::map& filters, - declared_outputs& outputs, - provider_nodes& providers, - Args&... 
consumers) + std::tuple> + edge_maker::operator()(std::map& filters, + declared_outputs& outputs, + provider_nodes& providers, + Args&... consumers) { // Create edges to outputs for (auto const& [output_name, output_node] : outputs) { @@ -126,7 +124,7 @@ namespace phlex::experimental { if (head_ports.empty()) { // This can happen for jobs that only execute the driver, which is helpful for debugging - return; + return {}; } auto provider_input_ports = make_provider_edges(std::move(head_ports), providers); @@ -134,7 +132,7 @@ namespace phlex::experimental { std::map multilayer_join_index_ports; (multilayer_join_index_ports.merge(multilayer_ports(consumers)), ...); - multi.finalize(g, std::move(provider_input_ports), std::move(multilayer_join_index_ports)); + return std::make_tuple(std::move(provider_input_ports), std::move(multilayer_join_index_ports)); } } diff --git a/phlex/core/framework_graph.cpp b/phlex/core/framework_graph.cpp index 3e32feb6..052f7d0e 100644 --- a/phlex/core/framework_graph.cpp +++ b/phlex/core/framework_graph.cpp @@ -2,7 +2,6 @@ #include "phlex/concurrency.hpp" #include "phlex/core/edge_maker.hpp" -#include "phlex/model/data_cell_counter.hpp" #include "phlex/model/product_store.hpp" #include "fmt/std.h" @@ -29,15 +28,21 @@ namespace phlex::experimental { fixed_hierarchy_{std::move(bundle.hierarchy)}, driver_{std::move(bundle.driver)}, src_{graph_, - [this](tbb::flow_control& fc) mutable -> data_cell_index_ptr { + [this](tbb::flow_control& fc) mutable -> ready_flushes_then_emit { if (auto item = driver_()) { - return index_router_.route(*item); + return {.ready_flushes = cell_tracker_.report_and_evict_ready_flushes(*item), + .index_to_emit = *item}; } - index_router_.drain(); fc.stop(); return {}; }}, index_router_{graph_}, + index_receiver_{graph_, + tbb::flow::unlimited, + [this](ready_flushes_then_emit const& input) -> data_cell_index_ptr { + auto&& [ready_flushes, index_to_emit] = input; + return index_router_.route(index_to_emit, 
ready_flushes); + }}, hierarchy_node_{graph_, tbb::flow::unlimited, [this](data_cell_index_ptr const& index) -> tbb::flow::continue_msg { @@ -53,7 +58,8 @@ namespace phlex::experimental { { if (shutdown_on_error_) { // When in an error state, we need to sanely pop the layer stack and wait for any tasks to finish. - index_router_.drain(); + auto remaining_flushes = cell_tracker_.report_and_evict_ready_flushes(nullptr); + index_router_.drain(std::move(remaining_flushes)); graph_.wait_for_all(); } } @@ -89,6 +95,10 @@ namespace phlex::experimental { { src_.activate(); graph_.wait_for_all(); + + // Now back out of all remaining layers + index_router_.drain(cell_tracker_.report_and_evict_ready_flushes(nullptr)); + graph_.wait_for_all(); } namespace { @@ -135,49 +145,64 @@ namespace phlex::experimental { filters_.merge(internal_edges_for_predicates(graph_, nodes_.predicates, nodes_.unfolds)); filters_.merge(internal_edges_for_predicates(graph_, nodes_.predicates, nodes_.transforms)); - edge_maker make_edges{nodes_.transforms, nodes_.folds, nodes_.unfolds}; - make_edges(graph_, - index_router_, - filters_, - nodes_.outputs, - nodes_.providers, - nodes_.predicates, - nodes_.observers, - nodes_.folds, - nodes_.unfolds, - nodes_.transforms); - - std::map flushers_from_unfolds; + std::set unfold_input_layer_names; + // Count how many distinct unfold nodes consume each input layer. When that count is + // greater than one, the flush_gate for an index in that layer must collect a flush + // message from every unfold before it knows the total number of children it will see. 
+ std::map unfold_count_per_input_layer; for (auto const& n : nodes_.unfolds | std::views::values) { - flushers_from_unfolds.try_emplace(identifier{n->child_layer()}, &n->flusher()); + for (auto const& input : n->input()) { + if (!static_cast(input.layer).empty()) { + unfold_input_layer_names.insert(input.layer); + ++unfold_count_per_input_layer[identifier{input.layer}]; + } + } } - // Connect edges between all nodes, the graph-wide flusher, and the unfolds' flushers - auto connect_with_flusher = - [this, unfold_flushers = std::move(flushers_from_unfolds)](auto& consumers) { - for (auto& n : consumers | std::views::values) { - std::set flushers; - // For providers - for (product_query const& pq : n->input()) { - if (auto it = unfold_flushers.find(pq.layer); it != unfold_flushers.end()) { - flushers.insert(it->second); - } else { - flushers.insert(&index_router_.flusher()); - } - } - for (flusher_t* flusher : flushers) { - make_edge(*flusher, n->flush_port()); - } - } - }; - connect_with_flusher(nodes_.folds); + std::vector unfold_output_layer_names; + for (auto const& n : nodes_.unfolds | std::views::values) { + unfold_output_layer_names.emplace_back(n->child_layer()); + } + + index_router_.establish_layers( + fixed_hierarchy_.layer_paths(), + std::vector(unfold_input_layer_names.begin(), unfold_input_layer_names.end()), + unfold_output_layer_names); + index_router_.register_unfold_count_per_input_layer(std::move(unfold_count_per_input_layer)); + + edge_maker make_edges{nodes_.transforms, nodes_.folds, nodes_.unfolds}; + auto [provider_input_ports, multilayer_join_index_ports] = + make_edges(filters_, + nodes_.outputs, + nodes_.providers, + // Consumers of data products below + nodes_.predicates, + nodes_.observers, + nodes_.folds, + nodes_.unfolds, + nodes_.transforms); + if (not std::empty(provider_input_ports)) { + index_router_.finalize( + graph_, std::move(provider_input_ports), std::move(multilayer_join_index_ports)); + } // The hierarchy node is used to 
report which data layers have been seen by the // framework. To assemble the report, data-cell indices emitted by the input node are // recorded as well as any data-cell indices emitted by an unfold. - make_edge(src_, hierarchy_node_); + + // FIXME: Eventually the separate index_receiver_ and index_router_.index_receiver() may be combined. + // Should also consider whether inline tasks can be used. + make_edge(src_, index_receiver_); + make_edge(index_receiver_, hierarchy_node_); + make_edge(index_router_.index_receiver(), hierarchy_node_); + + for (auto& [_, node] : nodes_.folds) { + make_edge(index_router_.flusher(), node->flush_port()); + } + for (auto& [_, node] : nodes_.unfolds) { - make_edge(node->output_index_port(), hierarchy_node_); + make_edge(node->output_index_port(), index_router_.index_receiver()); + make_edge(node->flush_sender(), index_router_.flush_receiver()); } } } diff --git a/phlex/core/framework_graph.hpp b/phlex/core/framework_graph.hpp index 37391751..efbeb675 100644 --- a/phlex/core/framework_graph.hpp +++ b/phlex/core/framework_graph.hpp @@ -11,7 +11,9 @@ #include "phlex/core/message.hpp" #include "phlex/core/node_catalog.hpp" #include "phlex/driver.hpp" +#include "phlex/model/data_cell_tracker.hpp" #include "phlex/model/data_layer_hierarchy.hpp" +#include "phlex/model/flush_messages.hpp" #include "phlex/model/product_store.hpp" #include "phlex/module.hpp" #include "phlex/source.hpp" @@ -166,9 +168,13 @@ namespace phlex::experimental { tbb::flow::graph graph_{}; framework_driver driver_; std::vector registration_errors_{}; - tbb::flow::input_node src_; + data_cell_tracker cell_tracker_{}; + tbb::flow::input_node src_; index_router index_router_; - tbb::flow::function_node hierarchy_node_; + tbb::flow::function_node + index_receiver_; + tbb::flow::function_node + hierarchy_node_; bool shutdown_on_error_{false}; }; } diff --git a/phlex/core/index_router.cpp b/phlex/core/index_router.cpp index 82173149..3912cd0b 100644 --- 
a/phlex/core/index_router.cpp +++ b/phlex/core/index_router.cpp @@ -1,232 +1,321 @@ #include "phlex/core/index_router.hpp" -#include "phlex/model/product_store.hpp" + +#include "phlex/model/flush_gate.hpp" +#include "phlex/utilities/hashing.hpp" #include "fmt/std.h" #include "oneapi/tbb/flow_graph.h" #include "spdlog/spdlog.h" -#include #include +#include #include #include using namespace phlex::experimental; namespace { - std::string delimited_layer_path(std::string_view const layer_path) - { - if (not layer_path.starts_with("/")) { - return fmt::format("/{}", layer_path); - } - return std::string{layer_path}; - } + using layer_path_t = std::vector; - void send_messages(phlex::data_cell_index_ptr const& index, - std::size_t message_id, - phlex::experimental::detail::multilayer_slots const& slots) + std::size_t layer_hash_for_path(layer_path_t const& layer_path) { - for (auto& slot : slots) { - slot->put_message(index, message_id); + std::size_t result = "job"_id.hash(); + for (auto const& layer_name : layer_path | std::views::drop(1)) { + result = hash(result, identifier{layer_name}.hash()); } + return result; } -} - -namespace phlex::experimental { - //======================================================================================== - // layer_scope implementation - - detail::layer_scope::layer_scope(flush_counters& counters, - flusher_t& flusher, - detail::multilayer_slots const& slots_for_layer, - data_cell_index_ptr index, - std::size_t const message_id) : - counters_{counters}, - flusher_{flusher}, - slots_{slots_for_layer}, - index_{index}, - message_id_{message_id} + bool is_strict_prefix(layer_path_t const& candidate, layer_path_t const& other) { - // FIXME: Only for folds right now - counters_.update(index_); + // FIXME: Use std::ranges::starts_with(other, candidate) once the compilers support it (C++23) + return candidate.size() < other.size() and + std::ranges::mismatch(other, candidate).in2 == std::ranges::end(candidate); } - 
detail::layer_scope::~layer_scope() + std::string delimited_layer_path(std::string_view const layer_path) { - // To consider: We may want to skip the following logic if the framework prematurely - // needs to shut down. Keeping it enabled allows in-flight folds to - // complete. However, in some cases it may not be desirable to do this. - - for (auto& slot : slots_) { - slot->put_end_token(index_); - } - - // The following is for fold nodes only (temporary until the release of fold results are incorporated - // into the above paradigm). - auto flush_result = counters_.extract(index_); - flush_counts_ptr result; - if (not flush_result.empty()) { - result = std::make_shared(std::move(flush_result)); + if (not layer_path.starts_with("/")) { + return fmt::format("/{}", layer_path); } - flusher_.try_put({index_, std::move(result), message_id_}); + return std::string{layer_path}; } +} - std::size_t detail::layer_scope::depth() const { return index_->depth(); } +namespace phlex::experimental { //======================================================================================== // multilayer_slot implementation + namespace detail { + class multilayer_slot { + public: + multilayer_slot(tbb::flow::graph& g, + identifier layer, + tbb::flow::receiver* flush_port, + tbb::flow::receiver* input_port); + + void put_message(data_cell_index_ptr const& index, std::size_t message_id); + void put_end_token(data_cell_index_ptr const& index, flush_gate const& fc); + + bool matches_exactly(std::string const& layer_path) const; + bool is_parent_of(data_cell_index_ptr const& index) const; + + private: + identifier layer_; + index_set_node broadcaster_; + flush_node flusher_; + }; + + multilayer_slot::multilayer_slot(tbb::flow::graph& g, + identifier layer, + tbb::flow::receiver* flush_port, + tbb::flow::receiver* input_port) : + layer_{std::move(layer)}, broadcaster_{g}, flusher_{g} + { + make_edge(broadcaster_, *input_port); + make_edge(flusher_, *flush_port); + } - 
detail::multilayer_slot::multilayer_slot(tbb::flow::graph& g, - identifier layer, - tbb::flow::receiver* flush_port, - tbb::flow::receiver* input_port) : - layer_{std::move(layer)}, broadcaster_{g}, flusher_{g} - { - make_edge(broadcaster_, *input_port); - make_edge(flusher_, *flush_port); - } + void multilayer_slot::put_message(data_cell_index_ptr const& index, std::size_t message_id) + { + if (layer_ == index->layer_name()) { + broadcaster_.try_put({.index = index, .msg_id = message_id, .cache = false}); + return; + } - void detail::multilayer_slot::put_message(data_cell_index_ptr const& index, - std::size_t message_id) - { - if (layer_ == index->layer_name()) { - broadcaster_.try_put({.index = index, .msg_id = message_id, .cache = false}); - return; + broadcaster_.try_put({.index = index->parent(layer_), .msg_id = message_id}); } - // Flush values are only used for indices that are *not* the "lowest" in the branch - // of the hierarchy. - ++counter_; - broadcaster_.try_put({.index = index->parent(layer_), .msg_id = message_id}); - } + void multilayer_slot::put_end_token(data_cell_index_ptr const& index, flush_gate const& fc) + { + // We're going to have to be a little more careful about this. The committed total count may + // not be enough granularity for some downstream nodes. 
+ flusher_.try_put({.index = index, .count = static_cast(fc.committed_total_count())}); + } - void detail::multilayer_slot::put_end_token(data_cell_index_ptr const& index) - { - auto count = std::exchange(counter_, 0); - if (count == 0) { - // See comment above about flush values - return; + bool multilayer_slot::matches_exactly(std::string const& layer_path) const + { + return layer_path.ends_with(delimited_layer_path(static_cast(layer_))); } - flusher_.try_put({.index = index, .count = count}); + bool multilayer_slot::is_parent_of(data_cell_index_ptr const& index) const + { + return index->parent(layer_) != nullptr; + } } - bool detail::multilayer_slot::matches_exactly(std::string const& layer_path) const + //======================================================================================== + // index_router implementation + index_router::index_router(tbb::flow::graph& g) : + index_receiver_{g, + tbb::flow::unlimited, + [this](index_message const& msg) -> data_cell_index_ptr { + auto const& [index, message_id, _] = msg; + assert(index); + return route(index, index_is_lowest_layer(index), message_id); + }}, + flush_receiver_{g, + tbb::flow::unlimited, + [this](unfold_flush input) -> tbb::flow::continue_msg { + auto&& [index, layer_hash, count] = input; + apply_expected_count(*gate_for(index), layer_hash, count); + // Because the flush receiver receives flush values, the index cannot + // correspond to a lowest layer. 
+ flush_if_done(index); + return {}; + }}, + flusher_{g} { - return layer_path.ends_with(delimited_layer_path(static_cast(layer_))); } - bool detail::multilayer_slot::is_parent_of(data_cell_index_ptr const& index) const + void index_router::establish_layers( + std::vector> const& layer_paths_from_driver, + std::vector unfold_input_layer_names, + std::vector unfold_output_layer_names) { - return index->parent(layer_) != nullptr; - } - - //======================================================================================== - // index_router implementation + auto sorted_layer_paths = layer_paths_from_driver; + std::ranges::sort(sorted_layer_paths); + + // In sorted order, a path can only be a prefix of paths that follow it. + for (std::size_t i = 0; i < sorted_layer_paths.size(); ++i) { + bool const is_not_lowest_layer = + i + 1 < sorted_layer_paths.size() and + is_strict_prefix(sorted_layer_paths[i], sorted_layer_paths[i + 1]); + if (is_not_lowest_layer) { + auto const layer_hash = layer_hash_for_path(sorted_layer_paths[i]); + is_lowest_layer_hashes_.emplace(layer_hash, false); + } + } - index_router::index_router(tbb::flow::graph& g) : flusher_{g} {} + unfold_input_layer_names_ = unfold_input_layer_names; + unfold_output_layer_names_ = unfold_output_layer_names; + } void index_router::finalize(tbb::flow::graph& g, provider_input_ports_t provider_input_ports, - std::map multilayers) + std::map multilayer_join_ports) { // We must have at least one provider port, or there can be no data to process. 
assert(!provider_input_ports.empty()); - provider_input_ports_ = std::move(provider_input_ports); // Create the index-set broadcast nodes for providers - for (auto& [pq, provider_port] : provider_input_ports_ | std::views::values) { - auto [it, _] = broadcasters_.try_emplace(static_cast(pq.layer), - std::make_shared(g)); + for (auto& [input_product, provider_port] : provider_input_ports | std::views::values) { + auto [it, _] = + index_set_nodes_.emplace(input_product.layer, std::make_shared(g)); make_edge(*it->second, *provider_port); } - for (auto const& [node_name, multilayer] : multilayers) { - spdlog::trace("Making multilayer caster for {}", node_name); - detail::multilayer_slots casters; - casters.reserve(multilayer.size()); - // FIXME: Consider whether the construction of casters can be simplied - for (auto const& [layer, flush_port, input_port] : multilayer) { - auto entry = std::make_shared(g, layer, flush_port, input_port); - casters.push_back(entry); + for (auto const& [node_name, join_ports] : multilayer_join_ports) { + spdlog::trace("Making multilayer slots for {}", node_name); + detail::multilayer_slots slots; + slots.reserve(join_ports.size()); + for (auto const& [layer, flush_port, input_port] : join_ports) { + auto slot = std::make_shared(g, layer, flush_port, input_port); + slots.push_back(slot); } - multibroadcasters_.try_emplace(identifier{node_name}, std::move(casters)); + multilayer_join_slots_.emplace(identifier{node_name}, std::move(slots)); } } - data_cell_index_ptr index_router::route(data_cell_index_ptr const index) + data_cell_index_ptr index_router::route(data_cell_index_ptr const index, index_flushes flushes) { - backout_to(index); + update_flush_counts(std::move(flushes)); + return route(index, index_is_lowest_layer(index), received_indices_.fetch_add(1)); + } - auto message_id = received_indices_.fetch_add(1); + data_cell_index_ptr index_router::route(data_cell_index_ptr index, + bool const is_lowest_layer, + std::size_t const 
message_id) + { + if (auto index_set_node = index_set_node_for(index)) { + index_set_node->try_put({.index = index, .msg_id = message_id}); + } - send_to_provider_index_nodes(index, message_id); - auto const& slots_for_layer = send_to_multilayer_join_nodes(index, message_id); + auto [message_slots, end_token_slots] = multilayer_slots_for(index); + for (auto const& slot : *message_slots) { + slot->put_message(index, message_id); + } + + // Lowest-layer indices have no flush gate and contribute to their parent's readiness + // solely through the expected-count message that announced them — there is nothing + // to do here for them. + if (is_lowest_layer) { + return index; + } - layers_.emplace(counters_, flusher_, slots_for_layer, index, message_id); + gate_for(index)->set_flush_callback( + [this, end_token_slots = std::move(end_token_slots), index, message_id]( + flush_gate const& fc) { + for (auto const& slot : *end_token_slots) { + slot->put_end_token(index, fc); + } + + // Used only for folds, until folds use the slot infrastructure above. + flusher_.try_put({index, fc.committed_counts(), message_id}); + }); + + flush_if_done(index); return index; } - void index_router::backout_to(data_cell_index_ptr const index) + void index_router::drain(index_flushes flushes) { update_flush_counts(std::move(flushes)); } + + void index_router::register_unfold_count_per_input_layer(std::map counts) + { + // Called once during finalize(), before any indices are routed, so no concurrent access. 
+ unfold_count_per_input_layer_ = std::move(counts); + } + + bool index_router::index_is_lowest_layer(data_cell_index_ptr const& index) { - assert(index); - auto const new_depth = index->depth(); - while (not empty(layers_) and new_depth <= layers_.top().depth()) { - layers_.pop(); + auto it = is_lowest_layer_hashes_.find(index->layer_hash()); + if (it != is_lowest_layer_hashes_.end()) { + return it->second; } + + if (std::ranges::contains(unfold_input_layer_names_, index->layer_name())) { + // FIXME: Need to make sure that the index is a child of existing layers + return is_lowest_layer_hashes_.emplace(index->layer_hash(), false).first->second; + } + + if (std::ranges::contains(unfold_output_layer_names_, index->layer_name())) { + return is_lowest_layer_hashes_.emplace(index->layer_hash(), true).first->second; + } + + // If the index is neither an input nor an output to an unfold, it is assumed to be a lowest layer. + return is_lowest_layer_hashes_.emplace(index->layer_hash(), true).first->second; } - void index_router::drain() + detail::index_set_node_ptr index_router::index_set_node_for(data_cell_index_ptr const& index) { - while (not empty(layers_)) { - layers_.pop(); + auto const layer_hash = index->layer_hash(); + if (auto it = index_set_node_cache_.find(layer_hash); it != index_set_node_cache_.end()) { + return it->second; } + + std::string const layerish_path{static_cast(index->layer_name())}; + auto broadcaster = index_set_node_for(layerish_path); + index_set_node_cache_.insert({layer_hash, broadcaster}); + return broadcaster; } - void index_router::send_to_provider_index_nodes(data_cell_index_ptr const& index, - std::size_t const message_id) + auto index_router::index_set_node_for(std::string const& layer_path) -> detail::index_set_node_ptr { - if (auto it = matched_broadcasters_.find(index->layer_hash()); - it != matched_broadcasters_.end()) { - // Not all layers will have a corresponding broadcaster - if (it->second) { - it->second->try_put({.index = 
index, .msg_id = message_id}); + std::string const search_token = delimited_layer_path(layer_path); + + std::vector candidates; + for (auto it = index_set_nodes_.begin(), e = index_set_nodes_.end(); it != e; ++it) { + if (search_token.ends_with(delimited_layer_path(static_cast(it->first)))) { + candidates.push_back(it); } - return; } - std::string const layerish_path{static_cast(index->layer_name())}; - auto broadcaster = index_node_for(layerish_path); - if (broadcaster) { - broadcaster->try_put({.index = index, .msg_id = message_id}); + if (candidates.size() == 1ull) { + return candidates[0]->second; } - // We cache the result of the lookup even if there is no broadcaster for this layer, - // to avoid repeated lookups for layers that don't have broadcasters. - matched_broadcasters_.try_emplace(index->layer_hash(), broadcaster); + + if (candidates.empty()) { + return nullptr; + } + + std::string msg = fmt::format("Multiple layers match specification {}:\n", layer_path); + for (auto const& it : candidates) { + msg += fmt::format("\n- {}", it->first); + } + throw std::runtime_error(msg); } - detail::multilayer_slots const& index_router::send_to_multilayer_join_nodes( - data_cell_index_ptr const& index, std::size_t const message_id) + std::pair + index_router::multilayer_slots_for(data_cell_index_ptr const& index) { auto const layer_hash = index->layer_hash(); - if (auto it = matched_routing_entries_.find(layer_hash); it != matched_routing_entries_.end()) { - send_messages(index, message_id, it->second); - return matched_flushing_entries_.find(layer_hash)->second; + // Fast path: shared lock allows concurrent reads of cached entries. 
+ { + multilayer_slot_cache_const_accessor acc; + if (multilayer_slot_cache_.find(acc, layer_hash)) { + return {acc->second.message_slots, acc->second.end_token_slots}; + } } - auto routing_it = matched_routing_entries_.try_emplace(layer_hash).first; - auto flushing_it = matched_flushing_entries_.try_emplace(layer_hash).first; + // Slow path: exclusive lock serializes concurrent cache misses for the same layer. + multilayer_slot_cache_accessor acc; + auto const inserted = multilayer_slot_cache_.insert(acc, layer_hash); + if (not inserted) { + return {acc->second.message_slots, acc->second.end_token_slots}; + } auto const layer_path = index->layer_path(); + detail::multilayer_slots message_slots; + detail::multilayer_slots end_token_slots; // For each multi-layer join node, determine which slots are relevant to this index. - // Routing entries: All slots from a node are added if (1) at least one slot exactly + // Message entries: All slots from a node are added if (1) at least one slot exactly // matches the current layer, and (2) all slots either exactly match // or are parent layers of the current index. - // Flushing entries: Only slots that exactly match the current layer are added. - for (auto& [node_name, slots] : multibroadcasters_) { + // End-token entries: Only slots that exactly match the current layer are added. 
+ for (auto& [node_name, slots] : multilayer_join_slots_) { detail::multilayer_slots matching_slots; matching_slots.reserve(slots.size()); @@ -236,7 +325,7 @@ namespace phlex::experimental { for (auto& slot : slots) { if (slot->matches_exactly(layer_path)) { has_exact_match = true; - flushing_it->second.push_back(slot); + end_token_slots.push_back(slot); matching_slots.push_back(slot); ++matched_count; } else if (slot->is_parent_of(index)) { @@ -245,41 +334,117 @@ namespace phlex::experimental { } } - // Add all matching slots to routing entries only if we have an exact match and + // Add all matching slots to message entries only if we have an exact match and // all slots from this node matched something (either exactly or as a parent). if (has_exact_match and matched_count == slots.size()) { - routing_it->second.insert(routing_it->second.end(), - std::make_move_iterator(matching_slots.begin()), - std::make_move_iterator(matching_slots.end())); + message_slots.insert(message_slots.end(), + std::make_move_iterator(matching_slots.begin()), + std::make_move_iterator(matching_slots.end())); } } - send_messages(index, message_id, routing_it->second); - return flushing_it->second; + + acc->second.message_slots = + std::make_shared(std::move(message_slots)); + acc->second.end_token_slots = + std::make_shared(std::move(end_token_slots)); + return {acc->second.message_slots, acc->second.end_token_slots}; } - auto index_router::index_node_for(std::string const& layer_path) -> detail::index_set_node_ptr + void index_router::update_flush_counts(index_flushes flushes) { - std::string const search_token = delimited_layer_path(layer_path); - - std::vector candidates; - for (auto it = broadcasters_.begin(), e = broadcasters_.end(); it != e; ++it) { - if (search_token.ends_with(delimited_layer_path(static_cast(it->first)))) { - candidates.push_back(it); + for (auto const& [index, flush_counts] : flushes) { + auto gate = gate_for(index); + for (auto const& [child_layer_hash, count] 
+ : *flush_counts) { + apply_expected_count(*gate, child_layer_hash, count.load()); } + flush_if_done(index); } + } - if (candidates.size() == 1ull) { - return candidates[0]->second; + void index_router::apply_expected_count(flush_gate& gate, + data_cell_index::hash_type const child_layer_hash, + std::size_t const count) + { + // Non-lowest children contribute to the parent's readiness via rollup + // (roll_up_child() called from flush_if_done()). Lowest-layer children need no + // further accounting: their full count is already reflected in the expected-count + // message and will be merged into committed_counts_ at commit time. + // + // The pending counter must be bumped BEFORE update_expected_count() increments + // received_flush_count_; otherwise a concurrent all_children_accounted() call could + // observe a positive received count while the pending counter is still at its + // pre-bump value (which may be at or below zero from earlier rollup notifications) + // and erroneously declare the gate ready. + if (not is_lowest_layer_hash(child_layer_hash)) { + gate.expect_child_rollups(static_cast(count)); } + gate.update_expected_count(child_layer_hash, count); + } - if (candidates.empty()) { - return nullptr; + bool index_router::is_lowest_layer_hash(std::size_t const layer_hash) const + { + auto it = is_lowest_layer_hashes_.find(layer_hash); + return it != is_lowest_layer_hashes_.end() ? it->second : true; + } + + flush_gate_ptr index_router::gate_for(data_cell_index_ptr const& index) + { + // Fast path: entry already exists — read under shared lock to avoid serializing threads. + const_accessor ca; + if (flush_gates_.find(ca, index->hash())) { + return ca->second; } + ca.release(); + + // Slow path: insert a new entry under exclusive lock. + accessor a; + if (flush_gates_.insert(a, index->hash())) { + // Newly inserted — initialize the value. 
+ // If multiple unfolds consume this layer, the gate must wait for a flush message + // from each of them before it can evaluate done(). Without this, the first unfold to + // finish could cause the gate to fire before the others have reported their counts. + std::size_t const expected_flush_count = [&]() -> std::size_t { + auto it = unfold_count_per_input_layer_.find(index->layer_name()); + return it != unfold_count_per_input_layer_.end() ? it->second : 0; + }(); + a->second = std::make_shared(index, expected_flush_count); + } + return a->second; + } - std::string msg = fmt::format("Multiple layers match specification {}:\n", layer_path); - for (auto const& it : candidates) { - msg += fmt::format("\n- {}", it->first); + void index_router::flush_if_done(data_cell_index_ptr index) + { + assert(index); + + while (index) { + // Erase the entry while holding the exclusive accessor, then release the lock before + // calling send_flush(). The erase claims exclusive ownership of this gate — + // any concurrent flush_if_done call for the same index will fail to find the entry + // and return immediately, preventing double-flush. + flush_gate_ptr gate; + { + accessor a; + if (not flush_gates_.find(a, index->hash())) { + // This can happen when two threads process the same parent index, + // and one of them releases it before the other completes. 
+ return; + } + + if (not a->second->all_children_accounted()) { + return; + } + + gate = a->second; + flush_gates_.erase(a); + } + + gate->send_flush(); + + auto next = index->parent(); + if (next) { + gate_for(next)->roll_up_child(gate->committed_counts()); + } + index = next; } - throw std::runtime_error(msg); } } diff --git a/phlex/core/index_router.hpp b/phlex/core/index_router.hpp index 4dc7f08f..6a31420d 100644 --- a/phlex/core/index_router.hpp +++ b/phlex/core/index_router.hpp @@ -5,19 +5,18 @@ #include "phlex/core/fwd.hpp" #include "phlex/core/message.hpp" -#include "phlex/model/data_cell_counter.hpp" #include "phlex/model/data_cell_index.hpp" +#include "phlex/model/flush_gate.hpp" +#include "phlex/model/flush_messages.hpp" #include "phlex/model/identifier.hpp" +#include "oneapi/tbb/concurrent_hash_map.h" +#include "oneapi/tbb/concurrent_unordered_map.h" #include "oneapi/tbb/flow_graph.h" -#include #include #include -#include -#include #include -#include #include namespace phlex::experimental { @@ -32,55 +31,9 @@ namespace phlex::experimental { // join operation. It: // (a) routes index messages to either the matching layer or its data-layer parent, and // (b) emits flush tokens to the repeater to evict a cached data product from memory. 
- class PHLEX_CORE_EXPORT multilayer_slot { - public: - multilayer_slot(tbb::flow::graph& g, - identifier layer, - tbb::flow::receiver* flush_port, - tbb::flow::receiver* input_port); - - void put_message(data_cell_index_ptr const& index, std::size_t message_id); - void put_end_token(data_cell_index_ptr const& index); - - bool matches_exactly(std::string const& layer_path) const; - bool is_parent_of(data_cell_index_ptr const& index) const; - - private: - identifier layer_; - detail::index_set_node broadcaster_; - detail::flush_node flusher_; - int counter_ = 0; - }; - + class multilayer_slot; using multilayer_slots = std::vector>; - - // A layer_scope object is an RAII object that manages layer-scoped operations during - // data-cell-index routing. It updates flush counters on construction and ensures cleanup - // (flushing end tokens and releasing fold results) on destruction. - class PHLEX_CORE_EXPORT layer_scope { - public: - layer_scope(flush_counters& counters, - flusher_t& flusher, - multilayer_slots const& slots_for_layer, - data_cell_index_ptr index, - std::size_t message_id); - ~layer_scope(); - layer_scope(layer_scope const&) = delete; - layer_scope& operator=(layer_scope const&) = delete; - layer_scope(layer_scope&&) = delete; - layer_scope& operator=(layer_scope&&) = delete; - std::size_t depth() const; - - private: - // Non-owning references to externally-owned state; layer_scope is an RAII guard. 
- // NOLINTBEGIN(cppcoreguidelines-avoid-const-or-ref-data-members) - flush_counters& counters_; - flusher_t& flusher_; - multilayer_slots const& slots_; - // NOLINTEND(cppcoreguidelines-avoid-const-or-ref-data-members) - data_cell_index_ptr index_; - std::size_t message_id_; - }; + using multilayer_slots_ptr = std::shared_ptr; } class PHLEX_CORE_EXPORT index_router { @@ -101,49 +54,97 @@ namespace phlex::experimental { using provider_input_ports_t = std::map; explicit index_router(tbb::flow::graph& g); - data_cell_index_ptr route(data_cell_index_ptr index); + data_cell_index_ptr route(data_cell_index_ptr index, index_flushes flushes); + + void establish_layers(std::vector> const& layer_paths_from_driver, + std::vector unfold_input_layer_names, + std::vector unfold_output_layer_names); + + // Registers how many unfolds produce children from each input layer. Must be called + // before execution so that flush_gates are initialized with the correct expected + // flush count when they are first created. 
+ void register_unfold_count_per_input_layer(std::map counts); void finalize(tbb::flow::graph& g, provider_input_ports_t provider_input_ports, - std::map multilayers); - void drain(); + std::map multilayer_join_ports); + void drain(index_flushes flushes); flusher_t& flusher() { return flusher_; } - private: - void backout_to(data_cell_index_ptr store); - void send_to_provider_index_nodes(data_cell_index_ptr const& index, std::size_t message_id); - detail::multilayer_slots const& send_to_multilayer_join_nodes(data_cell_index_ptr const& index, - std::size_t message_id); - detail::index_set_node_ptr index_node_for(std::string const& layer); + tbb::flow::function_node& index_receiver() + { + return index_receiver_; + } + tbb::flow::function_node& flush_receiver() { return flush_receiver_; } - provider_input_ports_t provider_input_ports_; + private: + data_cell_index_ptr route(data_cell_index_ptr index, + bool is_lowest_layer, + std::size_t message_id); + bool index_is_lowest_layer(data_cell_index_ptr const& index); + // Hash-only lookup, intended for classifying child layer hashes that arrive in flush + // messages (where only the hash is available, not a data_cell_index). Returns the + // cached classification when known; defaults to lowest for unknown hashes, which is + // correct for unfold outputs (the only source of unknown hashes) and consistent with + // index_is_lowest_layer()'s fall-through default. 
+ bool is_lowest_layer_hash(std::size_t layer_hash) const; + detail::index_set_node_ptr index_set_node_for(std::string const& layer); + detail::index_set_node_ptr index_set_node_for(data_cell_index_ptr const& index); + std::pair multilayer_slots_for( + data_cell_index_ptr const& index); + void update_flush_counts(index_flushes flushes); + void apply_expected_count(flush_gate& gate, + data_cell_index::hash_type child_layer_hash, + std::size_t count); + flush_gate_ptr gate_for(data_cell_index_ptr const& index); + void flush_if_done(data_cell_index_ptr index); + + tbb::flow::function_node index_receiver_; + tbb::flow::function_node flush_receiver_; std::atomic received_indices_{}; flusher_t flusher_; - flush_counters counters_; - std::stack layers_; + tbb::concurrent_unordered_map is_lowest_layer_hashes_; + std::vector unfold_input_layer_names_; + std::vector unfold_output_layer_names_; // ========================================================================================== // Routing to provider nodes // The following maps are used to route data-cell indices to provider nodes. - // The first map is from layer name to the corresponding broadcaster node. - std::unordered_map broadcasters_; - // The second map is a cache from a layer hash matched to a broadcaster node, to avoid + // The first map is from layer name to the corresponding index-set node. + tbb::concurrent_unordered_map index_set_nodes_; + // The second map is a cache from a layer hash to an index-set node, to avoid // repeated lookups for the same layer. - std::unordered_map matched_broadcasters_; + tbb::concurrent_unordered_map index_set_node_cache_; // ========================================================================================== // Routing to multi-layer join nodes - // The first map is from the node name to the corresponding broadcaster nodes and flush - // nodes. 
- std::unordered_map multibroadcasters_; - // The second map is a cache from a layer hash matched to a set of multilayer slots, to - // avoid repeated lookups for the same layer. - std::unordered_map matched_routing_entries_; - // The third map is a cache from a layer hash matched to a set of multilayer slots for the - // purposes of flushing, to avoid repeated lookups for the same layer during flushing. - std::unordered_map matched_flushing_entries_; - }; + // Maps from join-node name to the multilayer slots for that node. + tbb::concurrent_unordered_map multilayer_join_slots_; + + // This struct lets multilayer_slots_for return message and end-token slots together, + // instead of passing concurrent_hash_map accessors as output parameters. + struct multilayer_slot_cache_entry { + detail::multilayer_slots_ptr message_slots; + detail::multilayer_slots_ptr end_token_slots; + }; + // Cache from layer hash to matched message/end-token slots for that layer. + using multilayer_slot_cache_t = + tbb::concurrent_hash_map; + using multilayer_slot_cache_accessor = multilayer_slot_cache_t::accessor; + using multilayer_slot_cache_const_accessor = multilayer_slot_cache_t::const_accessor; + multilayer_slot_cache_t multilayer_slot_cache_; + // ========================================================================================== + // Flush gates (data-cell index hash is the key) + using gates_t = tbb::concurrent_hash_map; + using accessor = gates_t::accessor; + using const_accessor = gates_t::const_accessor; + gates_t flush_gates_; + + // Number of unfolds that will send flush messages for each input layer. Used to + // initialize flush_gates with the correct expected flush count. 
+ std::map unfold_count_per_input_layer_; + }; } #endif // PHLEX_CORE_INDEX_ROUTER_HPP diff --git a/phlex/core/message.hpp b/phlex/core/message.hpp index 30d0c5f5..e2414f05 100644 --- a/phlex/core/message.hpp +++ b/phlex/core/message.hpp @@ -36,7 +36,7 @@ namespace phlex::experimental { struct flush_message { data_cell_index_ptr index; - flush_counts_ptr counts; + data_cell_counts_const_ptr counts; std::size_t original_id{}; // FIXME: Used only by folds }; diff --git a/phlex/core/store_counters.cpp b/phlex/core/store_counters.cpp index 9938a20c..eade0e86 100644 --- a/phlex/core/store_counters.cpp +++ b/phlex/core/store_counters.cpp @@ -1,6 +1,6 @@ #include "phlex/core/store_counters.hpp" #include "phlex/core/message.hpp" -#include "phlex/model/data_cell_counter.hpp" +#include "phlex/model/data_cell_counts.hpp" #include "fmt/std.h" #include "spdlog/spdlog.h" @@ -9,7 +9,7 @@ namespace phlex::experimental { - void store_counter::set_flush_value(flush_counts_ptr counts, + void store_counter::set_flush_value(data_cell_counts_const_ptr counts, std::size_t const original_message_id) { if (not counts) { @@ -46,13 +46,12 @@ namespace phlex::experimental { // The 'counts_' data member can be empty if the flush_counts member has been filled // but none of the children stores have been processed. 
- if (counts_.empty() and !flush_counts->empty()) { + if (counts_.empty() and flush_counts->size() > 0uz) { return false; } for (auto const& [layer_hash, count] : counts_) { - auto maybe_count = flush_counts->count_for(layer_hash); - if (!maybe_count or count != *maybe_count) { + if (count != flush_counts->count(layer_hash)) { return false; } } diff --git a/phlex/core/store_counters.hpp b/phlex/core/store_counters.hpp index beba19a0..81a2b1d1 100644 --- a/phlex/core/store_counters.hpp +++ b/phlex/core/store_counters.hpp @@ -4,7 +4,6 @@ #include "phlex/phlex_core_export.hpp" #include "phlex/core/fwd.hpp" -#include "phlex/model/data_cell_counter.hpp" #include "phlex/model/data_cell_index.hpp" #include "phlex/model/product_store.hpp" @@ -18,20 +17,20 @@ namespace phlex::experimental { class PHLEX_CORE_EXPORT store_counter { public: - void set_flush_value(flush_counts_ptr counts, std::size_t original_message_id); + void set_flush_value(data_cell_counts_const_ptr counts, std::size_t original_message_id); void increment(data_cell_index::hash_type layer_hash); bool is_complete(); unsigned int original_message_id() const noexcept; private: - using counts_t2 = + using counts_t = tbb::concurrent_unordered_map>; - counts_t2 counts_{}; + counts_t counts_{}; #ifdef __cpp_lib_atomic_shared_ptr - std::atomic flush_counts_{nullptr}; + std::atomic flush_counts_{nullptr}; #else - flush_counts_ptr flush_counts_{nullptr}; + data_cell_counts_const_ptr flush_counts_{nullptr}; #endif unsigned int original_message_id_{}; // Necessary for matching inputs to downstream join nodes. 
std::atomic ready_to_flush_{true}; diff --git a/phlex/model/CMakeLists.txt b/phlex/model/CMakeLists.txt index 5e9bb87c..c01a8e98 100644 --- a/phlex/model/CMakeLists.txt +++ b/phlex/model/CMakeLists.txt @@ -4,9 +4,9 @@ cet_make_library( SHARED SOURCE algorithm_name.cpp - child_tracker.cpp - data_cell_counter.cpp + data_cell_counts.cpp data_cell_tracker.cpp + flush_gate.cpp fixed_hierarchy.cpp data_layer_hierarchy.cpp data_cell_index.cpp @@ -29,11 +29,12 @@ install( FILES algorithm_name.hpp fixed_hierarchy.hpp + flush_messages.hpp fwd.hpp handle.hpp - child_tracker.hpp - data_cell_counter.hpp + data_cell_counts.hpp data_cell_tracker.hpp + flush_gate.hpp data_layer_hierarchy.hpp data_cell_index.hpp identifier.hpp diff --git a/phlex/model/child_tracker.cpp b/phlex/model/child_tracker.cpp deleted file mode 100644 index 57b6225f..00000000 --- a/phlex/model/child_tracker.cpp +++ /dev/null @@ -1,106 +0,0 @@ -#include "phlex/model/child_tracker.hpp" - -#include "spdlog/spdlog.h" - -#include -#include -#include -#include - -namespace phlex::experimental { - - child_tracker::child_tracker(data_cell_index_ptr index, std::size_t expected_flush_count) : - index_{std::move(index)}, expected_flush_count_{expected_flush_count} - { - } - - std::size_t child_tracker::expected_total_count() const - { - return std::ranges::fold_left(expected_counts_ | std::views::values, 0uz, std::plus{}); - } - - std::size_t child_tracker::processed_total_count() const - { - return std::ranges::fold_left(processed_counts_ | std::views::values, 0uz, std::plus{}); - } - - std::size_t child_tracker::committed_total_count() const - { - return std::ranges::fold_left(committed_counts_ | std::views::values, 0uz, std::plus{}); - } - - std::size_t child_tracker::committed_count_for_layer( - data_cell_index::hash_type const layer_hash) const - { - return committed_counts_.count(layer_hash); - } - - void child_tracker::update_committed_counts(data_cell_counts const& committed_counts) - { - for (auto const& 
[layer_hash, count] : committed_counts) { - committed_counts_.add_to(layer_hash, count); - } - } - - void child_tracker::update_expected_counts(data_cell_counts const& expected_counts) - { - for (auto const& [layer_hash, count] : expected_counts) { - expected_counts_.add_to(layer_hash, count); - } - ++received_flush_count_; - } - - void child_tracker::update_expected_count(data_cell_index::hash_type const layer_hash, - std::size_t const count) - { - expected_counts_.add_to(layer_hash, count); - ++received_flush_count_; - } - - void child_tracker::send_flush() - { - if (flush_callback_) { - flush_callback_(*this); - } else { - spdlog::warn("No flush callback set for index: {}", index_->to_string()); - } - } - - bool child_tracker::all_children_accounted() - { - auto const received = received_flush_count_.load(); - if (received == 0) { - return false; - } - - // Block until all flush counts expected from unfolds have arrived so that expected_counts_ - // reflects the union of all child layers. - if (expected_flush_count_ > 0 and received < expected_flush_count_) { - return false; - } - - // All expected flush messages have arrived; check that processed child counts match. - bool const result = std::ranges::all_of(expected_counts_, [this](auto const& entry) { - auto const& [layer_hash, expected] = entry; - return processed_counts_.count(layer_hash) == expected.load(); - }); - - if (result) { - std::call_once(commit_once_, [this] { commit(); }); - } - - return result; - } - - void child_tracker::commit() - { - for (auto const& [layer_hash, count] : processed_counts_) { - committed_counts_.add_to(layer_hash, count.load()); - } - - // At some point, we might consider clearing the processed_counts_ and expected_counts_ maps - // to free memory, but for now we can just leave them as-is since the child_tracker will - // likely be destroyed soon after commit() is called. 
- } - -} // namespace phlex::experimental diff --git a/phlex/model/child_tracker.hpp b/phlex/model/child_tracker.hpp deleted file mode 100644 index 9afa267f..00000000 --- a/phlex/model/child_tracker.hpp +++ /dev/null @@ -1,68 +0,0 @@ -#ifndef PHLEX_MODEL_CHILD_TRACKER_HPP -#define PHLEX_MODEL_CHILD_TRACKER_HPP - -#include "phlex/model/data_cell_index.hpp" -#include "phlex/model/data_cell_tracker.hpp" -#include "phlex/phlex_model_export.hpp" - -#include -#include -#include -#include -#include - -namespace phlex::experimental { - - class PHLEX_MODEL_EXPORT child_tracker { - using flush_callback_t = std::function; - - public: - // expected_flush_count controls how many update_expected_count[s]() calls must arrive before - // all_children_accounted() can return true. A value of 0 means a single call is sufficient - // (the common case when only one unfold consumes this index's layer). A value greater than 1 - // means that multiple unfolds produce children from the same parent layer. - explicit child_tracker(data_cell_index_ptr index, std::size_t expected_flush_count); - - data_cell_index_ptr const index() const { return index_; } - std::size_t expected_total_count() const; - std::size_t processed_total_count() const; - std::size_t committed_total_count() const; - std::size_t committed_count_for_layer(data_cell_index::hash_type layer_hash) const; - data_cell_counts const& committed_counts() const { return committed_counts_; } - - // Merges expected_counts into the accumulated expected counts. Each call represents one - // flush message arriving (e.g. one unfold completing for this index). - void update_expected_counts(data_cell_counts const& expected_counts); - // Single-entry variant used when an unfold reports its child count directly (no map needed). 
- void update_expected_count(data_cell_index::hash_type layer_hash, std::size_t count); - void update_committed_counts(data_cell_counts const& committed_counts); - void increment(data_cell_index::hash_type const layer_hash) - { - processed_counts_.increment(layer_hash); - } - - void set_flush_callback(flush_callback_t callback) { flush_callback_ = std::move(callback); } - void send_flush(); - bool all_children_accounted(); - - private: - void commit(); - - data_cell_index_ptr const index_; - std::once_flag commit_once_; - data_cell_counts committed_counts_; - data_cell_counts processed_counts_; - // Accumulated expected child counts from all unfolds. - data_cell_counts expected_counts_; - std::atomic received_flush_count_{0}; - // Number of flush messages expected from unfolds. Zero means any single - // update_expected_count[s]() call is sufficient to unblock all_children_accounted(). - std::size_t expected_flush_count_{0}; - flush_callback_t flush_callback_; - }; - - using child_tracker_ptr = std::shared_ptr; - -} // namespace phlex::experimental - -#endif // PHLEX_MODEL_CHILD_TRACKER_HPP diff --git a/phlex/model/data_cell_counter.cpp b/phlex/model/data_cell_counter.cpp deleted file mode 100644 index a7e51b52..00000000 --- a/phlex/model/data_cell_counter.cpp +++ /dev/null @@ -1,65 +0,0 @@ -#include "phlex/model/data_cell_counter.hpp" -#include "phlex/model/identifier.hpp" -#include "phlex/utilities/hashing.hpp" - -#include - -namespace phlex::experimental { - using namespace phlex::experimental::literals; - - flush_counts::flush_counts() = default; - - flush_counts::flush_counts(std::map child_counts) : - child_counts_{std::move(child_counts)} - { - } - - data_cell_counter::data_cell_counter() : data_cell_counter{nullptr, "job"_id} {} - - data_cell_counter::data_cell_counter(data_cell_counter* parent, identifier const& layer_name) : - parent_{parent}, - layer_hash_{parent_ ? 
hash(parent->layer_hash_, layer_name.hash()) : layer_name.hash()} - { - } - - data_cell_counter::~data_cell_counter() - { - if (parent_) { - parent_->adjust(*this); - } - } - - data_cell_counter data_cell_counter::make_child(identifier const& layer_name) - { - return {this, layer_name}; - } - - void data_cell_counter::adjust(data_cell_counter& child) - { - auto it2 = child_counts_.find(child.layer_hash_); - if (it2 == cend(child_counts_)) { - it2 = child_counts_.try_emplace(child.layer_hash_, 0).first; - } - ++it2->second; - for (auto const& [nested_layer_hash, count] : child.child_counts_) { - child_counts_[nested_layer_hash] += count; - } - } - - void flush_counters::update(data_cell_index_ptr const id) - { - data_cell_counter* parent_counter = nullptr; - if (auto parent = id->parent()) { - auto it = counters_.find(parent->hash()); - assert(it != counters_.cend()); - parent_counter = it->second.get(); - } - counters_[id->hash()] = std::make_shared(parent_counter, id->layer_name()); - } - - flush_counts flush_counters::extract(data_cell_index_ptr const id) - { - auto counter = counters_.extract(id->hash()); - return counter.mapped()->result(); - } -} diff --git a/phlex/model/data_cell_counter.hpp b/phlex/model/data_cell_counter.hpp deleted file mode 100644 index f93389dd..00000000 --- a/phlex/model/data_cell_counter.hpp +++ /dev/null @@ -1,76 +0,0 @@ -#ifndef PHLEX_MODEL_DATA_CELL_COUNTER_HPP -#define PHLEX_MODEL_DATA_CELL_COUNTER_HPP - -#include "phlex/phlex_model_export.hpp" - -#include "phlex/model/data_cell_index.hpp" -#include "phlex/model/fwd.hpp" -#include "phlex/model/identifier.hpp" - -#include "oneapi/tbb/concurrent_hash_map.h" - -#include -#include -#include - -namespace phlex::experimental { - class PHLEX_MODEL_EXPORT flush_counts { - public: - flush_counts(); - explicit flush_counts(std::map child_counts); - - auto begin() const { return child_counts_.begin(); } - auto end() const { return child_counts_.end(); } - bool empty() const { return 
child_counts_.empty(); } - auto size() const { return child_counts_.size(); } - - std::optional count_for(data_cell_index::hash_type const layer_hash) const - { - if (auto it = child_counts_.find(layer_hash); it != child_counts_.end()) { - return it->second; - } - return std::nullopt; - } - - private: - std::map child_counts_{}; - }; - - class PHLEX_MODEL_EXPORT data_cell_counter { - public: - data_cell_counter(); - data_cell_counter(data_cell_counter* parent, identifier const& layer_name); - ~data_cell_counter(); - data_cell_counter(data_cell_counter const&) = delete; - data_cell_counter& operator=(data_cell_counter const&) = delete; - data_cell_counter(data_cell_counter&&) = delete; - data_cell_counter& operator=(data_cell_counter&&) = delete; - - data_cell_counter make_child(identifier const& layer_name); - flush_counts result() const - { - if (empty(child_counts_)) { - return flush_counts{}; - } - return flush_counts{child_counts_}; - } - - private: - void adjust(data_cell_counter& child); - - data_cell_counter* parent_; - data_cell_index::hash_type layer_hash_; - std::map child_counts_{}; - }; - - class PHLEX_MODEL_EXPORT flush_counters { - public: - void update(data_cell_index_ptr const id); - flush_counts extract(data_cell_index_ptr const id); - - private: - std::map> counters_; - }; -} - -#endif // PHLEX_MODEL_DATA_CELL_COUNTER_HPP diff --git a/phlex/model/data_cell_counts.cpp b/phlex/model/data_cell_counts.cpp new file mode 100644 index 00000000..f84086a8 --- /dev/null +++ b/phlex/model/data_cell_counts.cpp @@ -0,0 +1,8 @@ +#include "phlex/model/data_cell_counts.hpp" + +namespace phlex::experimental { + void data_cell_counts::emplace(std::size_t layer_hash, std::size_t value) + { + map_.emplace(layer_hash, value); + } +} diff --git a/phlex/model/data_cell_counts.hpp b/phlex/model/data_cell_counts.hpp new file mode 100644 index 00000000..4216340e --- /dev/null +++ b/phlex/model/data_cell_counts.hpp @@ -0,0 +1,36 @@ +#ifndef PHLEX_MODEL_DATA_CELL_COUNTS_HPP 
+#define PHLEX_MODEL_DATA_CELL_COUNTS_HPP + +#include "phlex/phlex_model_export.hpp" + +#include "oneapi/tbb/concurrent_unordered_map.h" + +#include +#include +#include + +namespace phlex::experimental { + class PHLEX_MODEL_EXPORT data_cell_counts { + public: + void emplace(std::size_t layer_hash, std::size_t value); + + void increment(std::size_t layer_hash) { ++map_[layer_hash]; } + void add_to(std::size_t layer_hash, std::size_t value) { map_[layer_hash] += value; } + + auto begin() const { return map_.begin(); } + auto end() const { return map_.end(); } + + auto size() const { return map_.size(); } + + std::size_t count(std::size_t layer_hash) const + { + auto it = map_.find(layer_hash); + return it != map_.end() ? it->second.load() : 0; + } + + private: + tbb::concurrent_unordered_map> map_; + }; +} + +#endif // PHLEX_MODEL_DATA_CELL_COUNTS_HPP diff --git a/phlex/model/data_cell_tracker.cpp b/phlex/model/data_cell_tracker.cpp index a82b7bfb..70a05f1f 100644 --- a/phlex/model/data_cell_tracker.cpp +++ b/phlex/model/data_cell_tracker.cpp @@ -16,16 +16,6 @@ namespace { } namespace phlex::experimental { - - // ========================================================================================= - // data_cell_counts implementation - void data_cell_counts::emplace(std::size_t layer_hash, std::size_t value) - { - map_.emplace(layer_hash, value); - } - - // ========================================================================================= - // data_cell_tracker implementation data_cell_tracker::~data_cell_tracker() { if (pending_flushes_.empty()) { @@ -40,7 +30,8 @@ namespace phlex::experimental { } } - index_flushes data_cell_tracker::closeout(data_cell_index_ptr const& received_index) + index_flushes data_cell_tracker::report_and_evict_ready_flushes( + data_cell_index_ptr const& received_index) { // Always update the cached index. The logic below uses the previous cached index to // determine what flushes to emit. 
diff --git a/phlex/model/data_cell_tracker.hpp b/phlex/model/data_cell_tracker.hpp index ba37574f..ac2ca614 100644 --- a/phlex/model/data_cell_tracker.hpp +++ b/phlex/model/data_cell_tracker.hpp @@ -4,66 +4,11 @@ #include "phlex/phlex_model_export.hpp" #include "phlex/model/data_cell_index.hpp" +#include "phlex/model/flush_messages.hpp" -#include "oneapi/tbb/concurrent_unordered_map.h" - -#include -#include #include -#include -#include namespace phlex::experimental { - class PHLEX_MODEL_EXPORT data_cell_counts { - public: - void emplace(std::size_t layer_hash, std::size_t value); - - void increment(data_cell_index::hash_type layer_hash) { ++map_[layer_hash]; } - void add_to(std::size_t layer_hash, std::size_t value) { map_[layer_hash] += value; } - - auto begin() const { return map_.begin(); } - auto end() const { return map_.end(); } - - auto size() const { return map_.size(); } - - std::size_t count(data_cell_index::hash_type layer_hash) const - { - auto it = map_.find(layer_hash); - return it != map_.end() ? it->second.load() : 0; - } - - private: - tbb::concurrent_unordered_map> map_; - }; - - using data_cell_counts_ptr = std::shared_ptr; - using data_cell_counts_const_ptr = std::shared_ptr; - - struct PHLEX_MODEL_EXPORT index_flush { - data_cell_index_ptr index; - // Ideally, the counts field should be a `data_cell_counts_const_ptr` to ensure immutability. - // However, this type is also used for incrementing counters, so it must be mutable. - data_cell_counts_ptr counts; - }; - - using index_flushes = std::vector; - - // A simpler flush message sent by an unfold to the index_router. Unlike index_flush, which - // carries a map of child counts, unfold_flush carries a single (layer_hash, count) pair - // because each unfold produces children in exactly one child layer. 
- struct PHLEX_MODEL_EXPORT unfold_flush { - data_cell_index_ptr index; - data_cell_index::hash_type layer_hash{}; - std::size_t count{}; - }; - - // The `closeout_then_emit` struct carries flushes that must be emitted - // (to close out already-emitted indices) before emitting `index_to_emit`. - struct PHLEX_MODEL_EXPORT closeout_then_emit { - index_flushes closeout_flushes{}; - data_cell_index_ptr index_to_emit{nullptr}; - }; - class PHLEX_MODEL_EXPORT data_cell_tracker { public: data_cell_tracker() = default; @@ -77,7 +22,7 @@ namespace phlex::experimental { // Computes and returns the set of indices whose processing is now complete, given that // the next index to be processed is `index`. A null `index` signals end-of-job and // returns all remaining pending flushes. - index_flushes closeout(data_cell_index_ptr const& index); + index_flushes report_and_evict_ready_flushes(data_cell_index_ptr const& index); private: void create_parent_count(data_cell_index_ptr const& parent, data_cell_index_ptr const& child); diff --git a/phlex/model/fixed_hierarchy.cpp b/phlex/model/fixed_hierarchy.cpp index 0508eeaa..737474dc 100644 --- a/phlex/model/fixed_hierarchy.cpp +++ b/phlex/model/fixed_hierarchy.cpp @@ -57,7 +57,8 @@ namespace { } namespace phlex { - + // ================================================================================ + // data_cell_cursor implementation data_cell_cursor::data_cell_cursor(data_cell_index_ptr index, fixed_hierarchy const& h, experimental::async_driver& d) : @@ -76,13 +77,15 @@ namespace phlex { std::string data_cell_cursor::layer_path() const { return index_->layer_path(); } + // ================================================================================ + // fixed_hierarchy implementation fixed_hierarchy::fixed_hierarchy(std::initializer_list> layer_paths) : fixed_hierarchy(std::vector>(layer_paths)) { } fixed_hierarchy::fixed_hierarchy(std::vector> layer_paths) : - layer_hashes_(std::from_range, build_hashes(layer_paths)) + 
layer_paths_(std::move(layer_paths)), layer_hashes_(std::from_range, build_hashes(layer_paths_)) { } diff --git a/phlex/model/fixed_hierarchy.hpp b/phlex/model/fixed_hierarchy.hpp index 627fd602..576e483d 100644 --- a/phlex/model/fixed_hierarchy.hpp +++ b/phlex/model/fixed_hierarchy.hpp @@ -52,6 +52,9 @@ namespace phlex { explicit fixed_hierarchy(std::initializer_list> layer_paths); explicit fixed_hierarchy(std::vector> layer_paths); + // Returns the layer paths for this fixed hierarchy. + auto const& layer_paths() const { return layer_paths_; } + void validate(data_cell_index_ptr const& index) const; // Yields the job-level data-cell index to the provided driver and returns a @@ -60,6 +63,7 @@ namespace phlex { data_cell_cursor yield_job(experimental::async_driver& d) const; private: + std::vector> layer_paths_; std::vector layer_hashes_; }; diff --git a/phlex/model/flush_gate.cpp b/phlex/model/flush_gate.cpp new file mode 100644 index 00000000..828d804d --- /dev/null +++ b/phlex/model/flush_gate.cpp @@ -0,0 +1,100 @@ +#include "phlex/model/flush_gate.hpp" + +#include "spdlog/spdlog.h" + +#include +#include +#include +#include +#include + +namespace phlex::experimental { + + flush_gate::flush_gate(data_cell_index_ptr index, std::size_t expected_flush_count) : + index_{std::move(index)}, + committed_counts_{std::make_shared()}, + expected_flush_count_{expected_flush_count} + { + } + + std::size_t flush_gate::expected_total_count() const + { + return std::ranges::fold_left(expected_counts_ | std::views::values, 0uz, std::plus{}); + } + + std::size_t flush_gate::committed_total_count() const + { + return std::ranges::fold_left(*committed_counts_ | std::views::values, 0uz, std::plus{}); + } + + std::size_t flush_gate::committed_count_for_layer( + data_cell_index::hash_type const layer_hash) const + { + return committed_counts_->count(layer_hash); + } + + void flush_gate::update_expected_count(data_cell_index::hash_type const layer_hash, + std::size_t const count) + { 
+ expected_counts_.add_to(layer_hash, count); + ++received_flush_count_; + } + + void flush_gate::roll_up_child(data_cell_counts_const_ptr child_committed_counts) + { + assert(child_committed_counts); + for (auto const& [layer_hash, count] : *child_committed_counts) { + committed_counts_->add_to(layer_hash, count); + } + --pending_child_rollups_; + } + + void flush_gate::expect_child_rollups(std::ptrdiff_t const n) { pending_child_rollups_ += n; } + + void flush_gate::send_flush() + { + assert(flush_callback_); + flush_callback_(*this); + } + + bool flush_gate::all_children_accounted() + { + // Guard against firing before the router has had a chance to set the callback. + if (not flush_callback_) { + return false; + } + + auto const received = received_flush_count_.load(); + if (received == 0) { + return false; + } + + // Not ready until all flush counts expected from unfolds have arrived so that expected_counts_ + // (and the pending_* counters) reflect the union of all child layers. + if (expected_flush_count_ > 0 and received < expected_flush_count_) { + return false; + } + + // Every non-lowest direct child must have rolled up its committed_counts_ into this + // gate. Lowest-layer children require no per-arrival accounting: their full count + // is supplied by the same expected-count message that announced them. + if (pending_child_rollups_ != 0) { + return false; + } + + std::call_once(commit_once_, [this] { commit(); }); + return true; + } + + void flush_gate::commit() + { + for (auto const& [layer_hash, count] : expected_counts_) { + committed_counts_->add_to(layer_hash, count.load()); + } + + // At some point, we might consider clearing the expected_counts_ map to free memory, + // but for now we can just leave it as-is since the flush_gate will likely be + // destroyed soon after commit() is called. 
+ } + +} // namespace phlex::experimental diff --git a/phlex/model/flush_gate.hpp b/phlex/model/flush_gate.hpp new file mode 100644 index 00000000..7cea5adc --- /dev/null +++ b/phlex/model/flush_gate.hpp @@ -0,0 +1,108 @@ +#ifndef PHLEX_MODEL_FLUSH_GATE_HPP +#define PHLEX_MODEL_FLUSH_GATE_HPP + +// ========================================================================================= +// flush_gate +// +// A flush_gate is a per-parent-index completion detector: for a given data_cell_index, it +// decides when the parent's entire subtree of descendant data cells has been accounted for +// and the parent is therefore ready to emit a flush (end-of-subtree token). +// +// One flush_gate is created per data_cell_index by index_router, keyed by the index's hash. +// It accumulates three independent streams of information about that index's subtree: +// +// 1. Expected child counts per child layer, supplied by unfold operations via +// update_expected_count(). The gate waits for `expected_flush_count` such messages +// (one per unfold consuming the parent's layer) before treating the expectation as +// complete. +// +// 2. Rollups of committed counts from non-lowest-layer direct children, recorded via +// roll_up_child() and balanced against an announced expectation set by +// expect_child_rollups(). This is a signed counter that must reach zero before the +// gate opens. +// +// 3. Lowest-layer children, which require no per-arrival accounting: their announced +// count is the final count and is merged into committed_counts_ at commit time. +// +// Readiness (all_children_accounted()) is a quiescence test: all expected flush messages +// must have arrived AND all expected non-lowest rollups must have been received. When +// that holds, the gate commits (merges expected_counts_ into committed_counts_) and the +// router walks up to the parent, rolling this gate's counts into the grandparent via +// roll_up_child(). 
+// ========================================================================================= + +#include "phlex/model/data_cell_counts.hpp" +#include "phlex/model/data_cell_index.hpp" +#include "phlex/phlex_model_export.hpp" + +#include +#include +#include +#include +#include + +namespace phlex::experimental { + + class PHLEX_MODEL_EXPORT flush_gate { + using flush_callback_t = std::function; + + public: + // expected_flush_count controls how many update_expected_count() calls must arrive before + // all_children_accounted() can return true. A value of 0 or 1 means a single call suffices + // (the common case when only one unfold consumes this index's layer). A value greater than 1 + // means that multiple unfolds produce children from the same parent layer. + explicit flush_gate(data_cell_index_ptr index, std::size_t expected_flush_count); + + data_cell_index_ptr const index() const { return index_; } + std::size_t expected_total_count() const; + std::size_t committed_total_count() const; + std::size_t committed_count_for_layer(data_cell_index::hash_type layer_hash) const; + data_cell_counts_const_ptr committed_counts() const { return committed_counts_; } + + // Merges an expected child count into the accumulated expected counts. Each call + // represents one flush message arriving (e.g. one unfold completing for this index). + void update_expected_count(data_cell_index::hash_type layer_hash, std::size_t count); + + // Records that a non-lowest direct child has rolled up: merges its committed_counts + // into this gate's and decrements the pending-rollups balance. The two steps are + // bundled because every rollup must do both, in the same call. + void roll_up_child(data_cell_counts_const_ptr child_committed_counts); + + // Announces that n additional non-lowest direct children are expected to roll up. 
+ // Lowest-layer children require no such bookkeeping: their counts are fully accounted + // for by the expected-count message that produced them (from the input_node or an + // unfold). The pending counter is signed because rollups can be recorded before the + // corresponding expected-count message has been processed. + void expect_child_rollups(std::ptrdiff_t n); + + void set_flush_callback(flush_callback_t callback) { flush_callback_ = std::move(callback); } + void send_flush(); + bool all_children_accounted(); + + private: + void commit(); + + data_cell_index_ptr const index_; + std::once_flag commit_once_; + // FIXME: We express committed_counts_ as a shared pointer so that we can copy the committed + // counts (this is done for determining the flush values for folds). Once the fold + // flushes are incorporated as part of the multi-layer join node infrastructure, it + // should be possible for committed_counts_ to no longer be a pointer, but a value. + std::shared_ptr committed_counts_; + // Accumulated expected child counts from all unfolds. + data_cell_counts expected_counts_; + std::atomic received_flush_count_{0}; + // Number of flush messages expected from unfolds. Zero means any single + // update_expected_count() call is sufficient to unblock all_children_accounted(). + std::size_t expected_flush_count_{0}; + // Signed running balance: (expected non-lowest direct-child rollups) - (rollups received). + // Commit-ready when this reaches zero (and the expected-count message has arrived). 
+ std::atomic pending_child_rollups_{0}; + flush_callback_t flush_callback_; + }; + + using flush_gate_ptr = std::shared_ptr; + +} // namespace phlex::experimental + +#endif // PHLEX_MODEL_FLUSH_GATE_HPP diff --git a/phlex/model/flush_messages.hpp b/phlex/model/flush_messages.hpp new file mode 100644 index 00000000..aae866ce --- /dev/null +++ b/phlex/model/flush_messages.hpp @@ -0,0 +1,39 @@ +#ifndef PHLEX_MODEL_FLUSH_MESSAGES_HPP +#define PHLEX_MODEL_FLUSH_MESSAGES_HPP + +#include "phlex/phlex_model_export.hpp" + +#include "phlex/model/data_cell_counts.hpp" +#include "phlex/model/data_cell_index.hpp" + +#include +#include + +namespace phlex::experimental { + struct PHLEX_MODEL_EXPORT index_flush { + data_cell_index_ptr index; + // Ideally, the counts field should be a `data_cell_counts_const_ptr` to ensure immutability. + // However, this type is also used for incrementing counters, so it must be mutable. + data_cell_counts_ptr counts; + }; + + using index_flushes = std::vector; + + // A simpler flush message sent by an unfold to the index_router. Unlike index_flush, which + // carries a map of child counts, unfold_flush carries a single (layer_hash, count) pair + // because each unfold produces children in exactly one child layer. + struct PHLEX_MODEL_EXPORT unfold_flush { + data_cell_index_ptr index; + data_cell_index::hash_type layer_hash{}; + std::size_t count{}; + }; + + // The `ready_flushes_then_emit` struct carries flushes that must be emitted + // (to close out already-emitted indices) before emitting `index_to_emit`. 
+ struct PHLEX_MODEL_EXPORT ready_flushes_then_emit { + index_flushes ready_flushes{}; + data_cell_index_ptr index_to_emit{nullptr}; + }; +} + +#endif // PHLEX_MODEL_FLUSH_MESSAGES_HPP diff --git a/phlex/model/fwd.hpp b/phlex/model/fwd.hpp index 0c782a53..7b282c52 100644 --- a/phlex/model/fwd.hpp +++ b/phlex/model/fwd.hpp @@ -4,12 +4,12 @@ #include namespace phlex::experimental { - class data_cell_counter; class data_layer_hierarchy; - class flush_counts; + class data_cell_counts; class product_store; - using flush_counts_ptr = std::shared_ptr; + using data_cell_counts_const_ptr = std::shared_ptr; + using data_cell_counts_ptr = std::shared_ptr; using product_store_const_ptr = std::shared_ptr; using product_store_ptr = std::shared_ptr; } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index b2d126c8..7a319342 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -162,10 +162,10 @@ cet_test( LIBRARIES layer_generator_internal phlex::core_internal spdlog::spdlog ) cet_test( - data_cell_counting + data_layer_hierarchy USE_CATCH2_MAIN SOURCE - data_cell_counting.cpp + data_layer_hierarchy_test.cpp LIBRARIES phlex::model_internal phlex::utilities_internal @@ -180,7 +180,7 @@ cet_test(data_cell_tracker USE_CATCH2_MAIN SOURCE data_cell_tracker_test.cpp LIB phlex::model TBB::tbb ) -cet_test(child_tracker USE_CATCH2_MAIN SOURCE child_tracker_test.cpp LIBRARIES +cet_test(flush_gate USE_CATCH2_MAIN SOURCE flush_gate_test.cpp LIBRARIES phlex::model_internal TBB::tbb ) diff --git a/test/child_tracker_test.cpp b/test/child_tracker_test.cpp deleted file mode 100644 index 36cca22e..00000000 --- a/test/child_tracker_test.cpp +++ /dev/null @@ -1,273 +0,0 @@ -// ======================================================================================= -// Unit tests for child_tracker covering: -// - single- and two-layer index hierarchies (job → runs → spills) -// - grandchild count propagation via update_committed_counts -// - blocking on expected_flush_count > 1 
(multiple unfolds into the same parent layer) -// - all_children_accounted() returning false before any flush message arrives -// - concurrent execution via tbb::parallel_for with concurrent_hash_map/concurrent_vector -// -// The local flush_if_done() helper mirrors the core propagation logic from index_router, -// allowing child_tracker to be tested without a TBB flow graph. -// ======================================================================================= - -#include "phlex/model/child_tracker.hpp" -#include "phlex/model/data_cell_index.hpp" -#include "phlex/model/data_cell_tracker.hpp" -#include "phlex/model/identifier.hpp" - -#include "catch2/catch_test_macros.hpp" -#include "catch2/matchers/catch_matchers_string.hpp" -#include "oneapi/tbb/concurrent_hash_map.h" -#include "oneapi/tbb/concurrent_vector.h" -#include "oneapi/tbb/parallel_for.h" -#include "spdlog/sinks/ostream_sink.h" -#include "spdlog/spdlog.h" - -#include -#include - -using namespace phlex; -using namespace phlex::experimental; -using namespace phlex::experimental::literals; - -namespace { - - void use_ostream_logger(std::ostringstream& oss) - { - auto ostream_sink = std::make_shared(oss); - auto ostream_logger = std::make_shared("my_logger", ostream_sink); - spdlog::set_default_logger(ostream_logger); - } - - // trackers_t maps index->hash() -> child_tracker_ptr. - // 'flushed' accumulates all trackers whose send_flush() was invoked. 
- using trackers_t = tbb::concurrent_hash_map; - using flushed_t = tbb::concurrent_vector; - - child_tracker_ptr make_tracker(data_cell_index_ptr index, std::size_t expected_flush_count) - { - auto tracker = std::make_shared(std::move(index), expected_flush_count); - tracker->set_flush_callback([](child_tracker const&) {}); - return tracker; - } - - void flush_if_done(data_cell_index_ptr index, trackers_t& trackers, flushed_t& flushed) - { - while (index) { - trackers_t::accessor a; - if (not trackers.find(a, index->hash())) { - return; - } - auto& tracker = a->second; - if (not tracker->all_children_accounted()) { - return; - } - - flushed.push_back(tracker); - tracker->send_flush(); - - auto parent = index->parent(); - if (parent) { - if (trackers_t::accessor pa; trackers.find(pa, parent->hash())) { - pa->second->update_committed_counts(tracker->committed_counts()); - pa->second->increment(index->layer_hash()); - } - } - trackers.erase(a); - index = parent; - } - } -} - -TEST_CASE("child_tracker: no registered callback", "[child_tracker]") -{ - std::ostringstream oss; - use_ostream_logger(oss); - auto tracker = std::make_shared(data_cell_index::job(), 0); - tracker->send_flush(); - CHECK_THAT(oss.str(), Catch::Matchers::ContainsSubstring("No flush callback set for index: []")); -} - -TEST_CASE("child_tracker: single-layer hierarchy (job -> runs)", "[child_tracker]") -{ - auto job = data_cell_index::job(); - auto run0 = job->make_child("run", 0); - auto run1 = job->make_child("run", 1); - auto run_layer_hash = run0->layer_hash(); - - // The unfold into run fires once, reporting 2 children. - auto job_tracker = make_tracker(job, 0); - job_tracker->update_expected_count(run_layer_hash, 2); - - trackers_t trackers; - trackers.emplace(job->hash(), job_tracker); - - flushed_t flushed; - - // Run 0 completes — job tracker not yet done (run 1 still outstanding). 
- job_tracker->increment(run_layer_hash); - flush_if_done(job, trackers, flushed); - CHECK(flushed.empty()); - - // Run 1 completes — job tracker is now done. - job_tracker->increment(run_layer_hash); - flush_if_done(job, trackers, flushed); - - REQUIRE(flushed.size() == 1); - auto const& jt = flushed[0]; - CHECK(jt->index() == job); - CHECK(jt->expected_total_count() == 2); - CHECK(jt->processed_total_count() == 2); - CHECK(jt->committed_count_for_layer(run_layer_hash) == 2); - CHECK(trackers.empty()); -} - -TEST_CASE("child_tracker: two-layer hierarchy (job -> runs -> spills)", "[child_tracker]") -{ - constexpr std::size_t n_runs = 3; - constexpr std::size_t n_spills = 2; - - auto job = data_cell_index::job(); - auto run0 = - job->make_child("run", 0); // representative child; layer_hash is the same for all runs - auto run_layer_hash = run0->layer_hash(); - auto spill0 = run0->make_child("spill", 0); - auto spill_layer_hash = spill0->layer_hash(); - - trackers_t trackers; - flushed_t flushed; - - // Create the job-layer tracker. - auto job_tracker = make_tracker(job, 0); - job_tracker->update_expected_count(run_layer_hash, n_runs); - trackers.emplace(job->hash(), job_tracker); - - // Pre-populate all run trackers before running in parallel, so that flush_if_done - // can always find the job tracker when a run completes. - std::vector runs; - runs.reserve(n_runs); - for (std::size_t const r : std::views::iota(0uz, n_runs)) { - auto& run_index = runs.emplace_back(job->make_child("run", r)); - auto run_tracker = make_tracker(run_index, 0); - run_tracker->update_expected_count(spill_layer_hash, n_spills); - trackers.emplace(run_index->hash(), run_tracker); - } - - // Process each run's spills in parallel. 
- tbb::parallel_for(0uz, n_runs, [&](std::size_t r) { - trackers_t::const_accessor a; - trackers.find(a, runs[r]->hash()); - auto const& run_tracker = a->second; - a.release(); - for (std::size_t s = 0uz; s < n_spills; ++s) { - run_tracker->increment(spill_layer_hash); - flush_if_done(runs[r], trackers, flushed); - } - }); - - REQUIRE(flushed.size() == n_runs + 1); // one per run + the job - - // Insertion order into flushed is non-deterministic under parallel execution, - // so partition by layer name rather than relying on position. - child_tracker_ptr job_flushed; - for (auto const& ft : flushed) { - if (ft->index()->layer_name() == "run"_id) { - CHECK(ft->expected_total_count() == n_spills); - CHECK(ft->processed_total_count() == n_spills); - CHECK(ft->committed_count_for_layer(spill_layer_hash) == n_spills); - } else { - REQUIRE(ft->index() == job); - job_flushed = ft; - } - } - - REQUIRE(job_flushed); - CHECK(job_flushed->expected_total_count() == n_runs); - CHECK(job_flushed->processed_total_count() == n_runs); - // Immediate children (runs) counted directly. - CHECK(job_flushed->committed_count_for_layer(run_layer_hash) == n_runs); - // Grandchildren (spills) propagated up from the run trackers. - CHECK(job_flushed->committed_count_for_layer(spill_layer_hash) == n_runs * n_spills); - - CHECK(trackers.empty()); -} - -TEST_CASE("child_tracker: multiple unfolds into the same parent layer", "[child_tracker]") -{ - auto job = data_cell_index::job(); - auto run0 = job->make_child("run", 0); - auto run_layer_hash = run0->layer_hash(); - - // Two separate unfolds each produce 2 runs — expected_flush_count = 2. - auto job_tracker = make_tracker(job, 2); - - // Both expected-count messages arrive, each reporting 2 children. 
- job_tracker->update_expected_count(run_layer_hash, 2); - CHECK_FALSE(job_tracker->all_children_accounted()); // second flush not yet received - - job_tracker->update_expected_count(run_layer_hash, 2); - // expected total is now 4, but no children have been processed yet. - CHECK_FALSE(job_tracker->all_children_accounted()); - - // Process all 4 children one by one; only the last call should commit. - for (int child = 0; child < 3; ++child) { - job_tracker->increment(run_layer_hash); - CHECK_FALSE(job_tracker->all_children_accounted()); - } - job_tracker->increment(run_layer_hash); - CHECK(job_tracker->all_children_accounted()); - - CHECK(job_tracker->expected_total_count() == 4); - CHECK(job_tracker->processed_total_count() == 4); - CHECK(job_tracker->committed_count_for_layer(run_layer_hash) == 4); -} - -TEST_CASE("child_tracker: not done before any flush message arrives", "[child_tracker]") -{ - auto job = data_cell_index::job(); - auto run0 = job->make_child("run", 0); - auto run_layer_hash = run0->layer_hash(); - - auto tracker = make_tracker(job, 0); - - // No update_expected_count call yet — must return false. - CHECK_FALSE(tracker->all_children_accounted()); - - tracker->update_expected_count(run_layer_hash, 1); - CHECK_FALSE(tracker->all_children_accounted()); - CHECK(tracker->expected_total_count() == 1); - CHECK(tracker->processed_total_count() == 0); - - tracker->increment(run_layer_hash); - CHECK(tracker->all_children_accounted()); -} - -TEST_CASE("child_tracker: update_committed_counts accumulates across multiple children", - "[child_tracker]") -{ - auto job = data_cell_index::job(); - auto run0 = job->make_child("run", 0); - auto run1 = job->make_child("run", 1); - auto spill0 = run0->make_child("spill", 0); - auto spill_layer_hash = spill0->layer_hash(); - auto run_layer_hash = run0->layer_hash(); - - auto job_tracker = make_tracker(job, 0); - job_tracker->update_expected_count(run_layer_hash, 2); - - // Simulate run 0 committing with 3 spills. 
- data_cell_counts run0_committed; - run0_committed.add_to(spill_layer_hash, 3); - job_tracker->update_committed_counts(run0_committed); - job_tracker->increment(run_layer_hash); - - // Simulate run 1 committing with 5 spills. - data_cell_counts run1_committed; - run1_committed.add_to(spill_layer_hash, 5); - job_tracker->update_committed_counts(run1_committed); - job_tracker->increment(run_layer_hash); - - REQUIRE(job_tracker->all_children_accounted()); - CHECK(job_tracker->committed_count_for_layer(spill_layer_hash) == 8); - CHECK(job_tracker->committed_count_for_layer(run_layer_hash) == 2); -} diff --git a/test/data_cell_counting.cpp b/test/data_cell_counting.cpp deleted file mode 100644 index 2548557e..00000000 --- a/test/data_cell_counting.cpp +++ /dev/null @@ -1,144 +0,0 @@ -#include "phlex/model/data_cell_counter.hpp" -#include "phlex/model/data_cell_index.hpp" -#include "phlex/model/data_layer_hierarchy.hpp" -#include "phlex/utilities/hashing.hpp" - -#include "catch2/catch_test_macros.hpp" - -using namespace phlex::experimental; -using namespace phlex::experimental::literals; -using phlex::data_cell_index; - -namespace { - struct job_hash_fixture { - std::size_t job_hash_value() const - { - static std::size_t const cached_hash = "job"_idq.hash; - return cached_hash; - } - }; -} - -TEST_CASE("Counter with nothing processed", "[data model]") -{ - data_cell_counter job_counter{}; - CHECK(job_counter.result().empty()); -} - -TEST_CASE_METHOD(job_hash_fixture, "Counter one layer deep", "[data model]") -{ - data_cell_counter job_counter{}; - for (std::size_t i = 0; i != 10; ++i) { - job_counter.make_child("event"_id); - } - auto const event_hash_value = hash(job_hash_value(), "event"_idq.hash); - CHECK(job_counter.result().count_for(event_hash_value) == 10); -} - -TEST_CASE("Data layer hierarchy with ambiguous layer names", "[data model]") -{ - data_layer_hierarchy h; - CHECK_THROWS(h.count_for("/job")); - CHECK(h.count_for("/job", true) == 0); - - auto job_index 
= data_cell_index::job(); - h.increment_count(job_index); - CHECK(h.count_for("/job") == 1); - - auto spill_index = job_index->make_child("spill", 0); - h.increment_count(spill_index); - - auto run_index = job_index->make_child("run", 0); - h.increment_count(run_index); - CHECK(h.count_for("/job/run") == 1); - CHECK(h.count_for("run") == 1); - - // Nested spill indices - h.increment_count(run_index->make_child("spill", 0)); - h.increment_count(run_index->make_child("spill", 1)); - - CHECK_THROWS(h.count_for("spill")); - CHECK(h.count_for("/job/spill") == 1); - CHECK(h.count_for("/job/run/spill") == 2); -} - -TEST_CASE("Data layer hierarchy with unnamed layer", "[data model]") -{ - // Exercises the maybe_name() fallback to "(unnamed)" for layers with empty names. - // Invoke print() explicitly so the test body, rather than teardown side effects, - // traverses the hierarchy and reaches maybe_name("") for the unnamed child. - data_layer_hierarchy h; - auto job_index = data_cell_index::job(); - CHECK_NOTHROW(h.increment_count(job_index)); - CHECK_NOTHROW(h.increment_count(job_index->make_child("", 0))); - CHECK_NOTHROW(h.print()); -} - -TEST_CASE_METHOD(job_hash_fixture, "Counter multiple layers deep", "[data model]") -{ - constexpr std::size_t nruns{2ull}; - constexpr std::size_t nsubruns_per_run{3ull}; - constexpr std::size_t nevents_per_subrun{5ull}; - - std::size_t processed_jobs{}; - std::size_t processed_runs{}; - std::size_t processed_subruns{}; - std::size_t processed_events{}; - - data_layer_hierarchy h; - flush_counters counters; - - // Notice the wholesale capture by reference--generally a lazy way of doing things. 
- auto check_all_processed = [&] { - CHECK(h.count_for("job", true) == processed_jobs); - CHECK(h.count_for("run", true) == processed_runs); - CHECK(h.count_for("subrun", true) == processed_subruns); - CHECK(h.count_for("event", true) == processed_events); - }; - - auto const run_hash_value = hash(job_hash_value(), "run"_idq.hash); - auto const subrun_hash_value = hash(run_hash_value, "subrun"_idq.hash); - auto const event_hash_value = hash(subrun_hash_value, "event"_idq.hash); - - auto job_index = data_cell_index::job(); - counters.update(job_index); - for (std::size_t i = 0; i != nruns; ++i) { - auto run_index = job_index->make_child("run", i); - counters.update(run_index); - for (std::size_t j = 0; j != nsubruns_per_run; ++j) { - auto subrun_index = run_index->make_child("subrun", j); - counters.update(subrun_index); - for (std::size_t k = 0; k != nevents_per_subrun; ++k) { - auto event_index = subrun_index->make_child("event", k); - counters.update(event_index); - ++processed_events; - - h.increment_count(event_index); - auto results = counters.extract(event_index); - CHECK(results.empty()); - check_all_processed(); - } - h.increment_count(subrun_index); - auto results = counters.extract(subrun_index); - ++processed_subruns; - - CHECK(results.count_for(event_hash_value)); - check_all_processed(); - } - h.increment_count(run_index); - auto results = counters.extract(run_index); - ++processed_runs; - - CHECK(results.count_for(event_hash_value) == nevents_per_subrun * nsubruns_per_run); - CHECK(results.count_for(subrun_hash_value) == nsubruns_per_run); - check_all_processed(); - } - h.increment_count(job_index); - auto results = counters.extract(job_index); - ++processed_jobs; - - CHECK(results.count_for(event_hash_value) == nevents_per_subrun * nsubruns_per_run * nruns); - CHECK(results.count_for(subrun_hash_value) == nsubruns_per_run * nruns); - CHECK(results.count_for(run_hash_value) == nruns); - check_all_processed(); -} diff --git 
a/test/data_cell_tracker_test.cpp b/test/data_cell_tracker_test.cpp index 5afc4005..838549cc 100644 --- a/test/data_cell_tracker_test.cpp +++ b/test/data_cell_tracker_test.cpp @@ -29,13 +29,14 @@ TEST_CASE("Test data-cell tracker", "[graph]") auto subspill2 = spill6->make_child("subspill", 2); auto run5 = job_index->make_child("run", 5); - CHECK(tracker.closeout(job_index).empty()); - CHECK(tracker.closeout(run4).empty()); - CHECK(tracker.closeout(spill5).empty()); - CHECK(tracker.closeout(spill6).empty()); - CHECK(tracker.closeout(subspill2).empty()); - - auto flushes = tracker.closeout(run5); + // Present data-cell indices in an order that reflects what a normal framework program would do. + CHECK(tracker.report_and_evict_ready_flushes(job_index).empty()); + CHECK(tracker.report_and_evict_ready_flushes(run4).empty()); + CHECK(tracker.report_and_evict_ready_flushes(spill5).empty()); + CHECK(tracker.report_and_evict_ready_flushes(spill6).empty()); + CHECK(tracker.report_and_evict_ready_flushes(subspill2).empty()); + + auto flushes = tracker.report_and_evict_ready_flushes(run5); REQUIRE(flushes.size() == 2); auto spill6_flush = flushes[0]; @@ -48,7 +49,7 @@ TEST_CASE("Test data-cell tracker", "[graph]") REQUIRE(run4_flush.counts->size() == 1); // Should only be "spill" layer CHECK(run4_flush.counts->count(spill5->layer_hash()) == 2); // spills 5 and 6 - flushes = tracker.closeout(nullptr); + flushes = tracker.report_and_evict_ready_flushes(nullptr); REQUIRE(flushes.size() == 1); // only job should have a flush count auto job_flush = flushes[0]; @@ -66,12 +67,12 @@ TEST_CASE("Test data-cell tracker with multiple hierarchy branches", "[graph]") auto calib1 = job_index->make_child("calib", 1); auto run5 = job_index->make_child("run", 5); - CHECK(tracker.closeout(job_index).empty()); - CHECK(tracker.closeout(run4).empty()); - CHECK(tracker.closeout(calib1).empty()); - CHECK(tracker.closeout(run5).empty()); + 
CHECK(tracker.report_and_evict_ready_flushes(job_index).empty()); + CHECK(tracker.report_and_evict_ready_flushes(run4).empty()); + CHECK(tracker.report_and_evict_ready_flushes(calib1).empty()); + CHECK(tracker.report_and_evict_ready_flushes(run5).empty()); - auto flushes = tracker.closeout(nullptr); + auto flushes = tracker.report_and_evict_ready_flushes(nullptr); REQUIRE(flushes.size() == 1); // only job should have a flush count auto job_flush = flushes[0]; CHECK(job_flush.index == job_index); @@ -88,9 +89,9 @@ TEST_CASE("Test data-cell tracker with missing intermediate layers", "[graph]") auto run4 = job_index->make_child("run", 4); auto spill2 = run4->make_child("spill", 2); - CHECK(tracker.closeout(job_index).empty()); + CHECK(tracker.report_and_evict_ready_flushes(job_index).empty()); - CHECK_THROWS_WITH(tracker.closeout(spill2), + CHECK_THROWS_WITH(tracker.report_and_evict_ready_flushes(spill2), "Received index [run:4, spill:2], which is not an immediate child of []"); } @@ -103,8 +104,8 @@ TEST_CASE("Cached flush counts at destruction generate warning message", "[graph auto job_index = data_cell_index::job(); auto run4 = job_index->make_child("run", 4); - CHECK(tracker->closeout(job_index).empty()); - CHECK(tracker->closeout(run4).empty()); + CHECK(tracker->report_and_evict_ready_flushes(job_index).empty()); + CHECK(tracker->report_and_evict_ready_flushes(run4).empty()); tracker.reset(); // Invoke destructor to trigger warning message auto const warning = oss.str(); diff --git a/test/data_layer_hierarchy_test.cpp b/test/data_layer_hierarchy_test.cpp new file mode 100644 index 00000000..9b4a6d0d --- /dev/null +++ b/test/data_layer_hierarchy_test.cpp @@ -0,0 +1,46 @@ +#include "phlex/model/data_cell_index.hpp" +#include "phlex/model/data_layer_hierarchy.hpp" + +#include "catch2/catch_test_macros.hpp" + +using namespace phlex::experimental; +using phlex::data_cell_index; + +TEST_CASE("Data layer hierarchy with ambiguous layer names", "[data model]") +{ + 
data_layer_hierarchy h; + CHECK_THROWS(h.count_for("/job")); + CHECK(h.count_for("/job", true) == 0); + + auto job_index = data_cell_index::job(); + h.increment_count(job_index); + CHECK(h.count_for("/job") == 1); + + auto spill_index = job_index->make_child("spill", 0); + h.increment_count(spill_index); + + auto run_index = job_index->make_child("run", 0); + h.increment_count(run_index); + CHECK(h.count_for("/job/run") == 1); + CHECK(h.count_for("run") == 1); + + // Nested spill indices + h.increment_count(run_index->make_child("spill", 0)); + h.increment_count(run_index->make_child("spill", 1)); + + CHECK_THROWS(h.count_for("spill")); + CHECK(h.count_for("/job/spill") == 1); + CHECK(h.count_for("/job/run/spill") == 2); +} + +TEST_CASE("Data layer hierarchy with unnamed layer", "[data model]") +{ + // Exercises the maybe_name() fallback to "(unnamed)" for layers with empty names. + // Invoke print() explicitly so the test body, rather than teardown side effects, + // traverses the hierarchy and reaches maybe_name("") for the unnamed child. 
+ data_layer_hierarchy h; + auto job_index = data_cell_index::job(); + CHECK_NOTHROW(h.increment_count(job_index)); + CHECK_NOTHROW(h.increment_count(job_index->make_child("", 0))); + CHECK_NOTHROW(h.print()); +} diff --git a/test/flush_gate_test.cpp b/test/flush_gate_test.cpp new file mode 100644 index 00000000..3412306c --- /dev/null +++ b/test/flush_gate_test.cpp @@ -0,0 +1,237 @@ +// ======================================================================================= +// Unit tests for flush_gate covering: +// - single- and two-layer index hierarchies (job → runs → spills) +// - grandchild count propagation via roll_up_child +// - blocking on expected_flush_count > 1 (multiple unfolds into the same parent layer) +// - all_children_accounted() returning false before any flush message arrives +// - concurrent execution via tbb::parallel_for with concurrent_hash_map/concurrent_vector +// +// The local flush_if_done() helper mirrors the core propagation logic from index_router, +// allowing flush_gate to be tested without a TBB flow graph. In the router, the +// expectation bookkeeping (expect_child_rollups) is performed inside apply_expected_count() +// for non-lowest child layers only; lowest-layer children are fully accounted for by the +// expected-count message itself. These tests perform that bookkeeping directly so that +// flush_gate's readiness logic can be exercised in isolation. 
+// ======================================================================================= + +#include "phlex/model/data_cell_counts.hpp" +#include "phlex/model/data_cell_index.hpp" +#include "phlex/model/flush_gate.hpp" +#include "phlex/model/identifier.hpp" + +#include "catch2/catch_test_macros.hpp" +#include "oneapi/tbb/concurrent_hash_map.h" +#include "oneapi/tbb/concurrent_vector.h" +#include "oneapi/tbb/parallel_for.h" +#include "spdlog/spdlog.h" + +#include +#include + +using namespace phlex; +using namespace phlex::experimental; +using namespace phlex::experimental::literals; + +namespace { + + // gates_t maps index->hash() -> flush_gate_ptr. + // 'flushed' accumulates all gates whose send_flush() was invoked. + using gates_t = tbb::concurrent_hash_map; + using flushed_t = tbb::concurrent_vector; + + flush_gate_ptr make_gate(data_cell_index_ptr index, std::size_t expected_flush_count) + { + auto gate = std::make_shared(std::move(index), expected_flush_count); + gate->set_flush_callback([](flush_gate const&) {}); + return gate; + } + + // Mirrors index_router::flush_if_done: walk up from `index`, flushing gates whose + // children are all accounted for and rolling each gate's committed_counts into its + // parent via roll_up_child(). 
+ void flush_if_done(data_cell_index_ptr index, gates_t& gates, flushed_t& flushed) + { + while (index) { + gates_t::accessor a; + if (not gates.find(a, index->hash())) { + return; + } + auto& gate = a->second; + if (not gate->all_children_accounted()) { + return; + } + + flushed.push_back(gate); + gate->send_flush(); + + auto parent = index->parent(); + if (parent) { + if (gates_t::accessor pa; gates.find(pa, parent->hash())) { + pa->second->roll_up_child(gate->committed_counts()); + } + } + gates.erase(a); + index = parent; + } + } +} + +TEST_CASE("flush_gate: single-layer hierarchy (job -> runs)", "[flush_gate]") +{ + auto job = data_cell_index::job(); + auto run0 = job->make_child("run", 0); + auto run_layer_hash = run0->layer_hash(); + + // The unfold into run fires once, reporting 2 children. Runs are lowest in this test + // (no descendants), so the expected-count message alone is sufficient to mark the + // job gate ready — no per-child accounting is performed. + auto job_gate = make_gate(job, 0); + job_gate->update_expected_count(run_layer_hash, 2); + + gates_t gates; + gates.emplace(job->hash(), job_gate); + + flushed_t flushed; + flush_if_done(job, gates, flushed); + + REQUIRE(flushed.size() == 1); + auto const& jt = flushed[0]; + CHECK(jt->index() == job); + CHECK(jt->expected_total_count() == 2); + CHECK(jt->committed_count_for_layer(run_layer_hash) == 2); + CHECK(gates.empty()); +} + +TEST_CASE("flush_gate: two-layer hierarchy (job -> runs -> spills)", "[flush_gate]") +{ + constexpr std::size_t n_runs = 3; + constexpr std::size_t n_spills = 2; + + auto job = data_cell_index::job(); + auto run0 = + job->make_child("run", 0); // representative child; layer_hash is the same for all runs + auto run_layer_hash = run0->layer_hash(); + auto spill0 = run0->make_child("spill", 0); + auto spill_layer_hash = spill0->layer_hash(); + + gates_t gates; + flushed_t flushed; + + // Create the job-layer gate. 
Runs are non-lowest (they have their own gates), + // so the job gate awaits n_runs rollups. + auto job_gate = make_gate(job, 0); + job_gate->expect_child_rollups(n_runs); + job_gate->update_expected_count(run_layer_hash, n_runs); + gates.emplace(job->hash(), job_gate); + + // Pre-populate all run gates before running in parallel, so that flush_if_done + // can always find the job gate when a run completes. Spills are lowest under each + // run, so the run gates commit as soon as their expected-count message arrives. + std::vector runs; + runs.reserve(n_runs); + for (std::size_t const r : std::views::iota(0uz, n_runs)) { + auto& run_index = runs.emplace_back(job->make_child("run", r)); + auto run_gate = make_gate(run_index, 0); + run_gate->update_expected_count(spill_layer_hash, n_spills); + gates.emplace(run_index->hash(), run_gate); + } + + // Flush each run in parallel. Each run's flush_if_done will roll its committed_counts_ + // up into the job gate and, on the last one, flush the job as well. + tbb::parallel_for(0uz, n_runs, [&](std::size_t r) { flush_if_done(runs[r], gates, flushed); }); + + REQUIRE(flushed.size() == n_runs + 1); // one per run + the job + + // Insertion order into flushed is non-deterministic under parallel execution, + // so partition by layer name rather than relying on position. + flush_gate_ptr job_flushed; + for (auto const& ft : flushed) { + if (ft->index()->layer_name() == "run"_id) { + CHECK(ft->expected_total_count() == n_spills); + CHECK(ft->committed_count_for_layer(spill_layer_hash) == n_spills); + } else { + REQUIRE(ft->index() == job); + job_flushed = ft; + } + } + + REQUIRE(job_flushed); + CHECK(job_flushed->expected_total_count() == n_runs); + // Immediate children (runs) counted directly. + CHECK(job_flushed->committed_count_for_layer(run_layer_hash) == n_runs); + // Grandchildren (spills) propagated up from the run gates. 
+ CHECK(job_flushed->committed_count_for_layer(spill_layer_hash) == n_runs * n_spills); + + CHECK(gates.empty()); +} + +TEST_CASE("flush_gate: multiple unfolds into the same parent layer", "[flush_gate]") +{ + auto job = data_cell_index::job(); + auto run0 = job->make_child("run", 0); + auto run_layer_hash = run0->layer_hash(); + + // Two separate unfolds each produce 2 runs — expected_flush_count = 2. Runs are + // lowest in this test, so once both expected-count messages have arrived the gate + // is ready (no per-child accounting). Until then, the higher expected_flush_count + // gates readiness. + auto job_gate = make_gate(job, 2); + + // First flush message: 2 children expected, but the second flush hasn't arrived yet. + job_gate->update_expected_count(run_layer_hash, 2); + CHECK_FALSE(job_gate->all_children_accounted()); + + // Second flush message arrives — both unfolds have now reported. + job_gate->update_expected_count(run_layer_hash, 2); + CHECK(job_gate->all_children_accounted()); + + CHECK(job_gate->expected_total_count() == 4); + CHECK(job_gate->committed_count_for_layer(run_layer_hash) == 4); +} + +TEST_CASE("flush_gate: not done before any flush message arrives", "[flush_gate]") +{ + auto job = data_cell_index::job(); + auto run0 = job->make_child("run", 0); + auto run_layer_hash = run0->layer_hash(); + + auto gate = make_gate(job, 0); + + // No update_expected_count call yet — must return false. + CHECK_FALSE(gate->all_children_accounted()); + + // After the expected-count message arrives for a lowest child layer, the gate + // is immediately ready (no per-child accounting needed). 
+ gate->update_expected_count(run_layer_hash, 1); + CHECK(gate->all_children_accounted()); + CHECK(gate->expected_total_count() == 1); +} + +TEST_CASE("flush_gate: roll_up_child accumulates across multiple children", "[flush_gate]") +{ + auto job = data_cell_index::job(); + auto run0 = job->make_child("run", 0); + auto run1 = job->make_child("run", 1); + auto spill0 = run0->make_child("spill", 0); + auto spill_layer_hash = spill0->layer_hash(); + auto run_layer_hash = run0->layer_hash(); + + // Runs are non-lowest in this scenario (they have spills as descendants). + auto job_gate = make_gate(job, 0); + job_gate->expect_child_rollups(2); + job_gate->update_expected_count(run_layer_hash, 2); + + // Simulate run 0 rolling up with 3 spills. + auto run0_committed = std::make_shared(); + run0_committed->add_to(spill_layer_hash, 3); + job_gate->roll_up_child(run0_committed); + + // Simulate run 1 rolling up with 5 spills. + auto run1_committed = std::make_shared(); + run1_committed->add_to(spill_layer_hash, 5); + job_gate->roll_up_child(run1_committed); + + REQUIRE(job_gate->all_children_accounted()); + CHECK(job_gate->committed_count_for_layer(spill_layer_hash) == 8); + CHECK(job_gate->committed_count_for_layer(run_layer_hash) == 2); +} From 5da5756809608563c1a41aaf4d8a8387376faaad Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Tue, 19 May 2026 16:41:15 -0500 Subject: [PATCH 4/5] Some renaming --- phlex/core/declared_unfold.hpp | 9 +++++---- phlex/core/framework_graph.cpp | 10 +++++----- phlex/core/index_router.cpp | 32 +++++++++++++++----------------- phlex/core/index_router.hpp | 13 ++++++++----- 4 files changed, 33 insertions(+), 31 deletions(-) diff --git a/phlex/core/declared_unfold.hpp b/phlex/core/declared_unfold.hpp index d773f26a..d1f904db 100644 --- a/phlex/core/declared_unfold.hpp +++ b/phlex/core/declared_unfold.hpp @@ -111,11 +111,12 @@ namespace phlex::experimental { auto const& msg = most_derived(messages); auto const& store = msg.store; - generator 
g{store, this->full_name(), child_layer()}; - call(p, ufold, store->index(), g, messages, std::make_index_sequence{}); + generator gen{store, this->full_name(), child_layer()}; + call( + p, ufold, store->index(), gen, messages, std::make_index_sequence{}); std::get<2>(outputs).try_put({.index = store->index(), - .layer_hash = g.child_layer_hash(), - .count = g.child_count()}); + .layer_hash = gen.child_layer_hash(), + .count = gen.child_count()}); }} { if constexpr (num_inputs > 1ull) { diff --git a/phlex/core/framework_graph.cpp b/phlex/core/framework_graph.cpp index 052f7d0e..f79f7cfc 100644 --- a/phlex/core/framework_graph.cpp +++ b/phlex/core/framework_graph.cpp @@ -190,19 +190,19 @@ namespace phlex::experimental { // framework. To assemble the report, data-cell indices emitted by the input node are // recorded as well as any data-cell indices emitted by an unfold. - // FIXME: Eventually the separate index_receiver_ and index_router_.index_receiver() may be combined. - // Should also consider whether inline tasks can be used. + // FIXME: Eventually the separate index_receiver_ and index_router_.unfold_index_receiver() + // may be combined. 
make_edge(src_, index_receiver_); make_edge(index_receiver_, hierarchy_node_); - make_edge(index_router_.index_receiver(), hierarchy_node_); + make_edge(index_router_.unfold_index_receiver(), hierarchy_node_); for (auto& [_, node] : nodes_.folds) { make_edge(index_router_.flusher(), node->flush_port()); } for (auto& [_, node] : nodes_.unfolds) { - make_edge(node->output_index_port(), index_router_.index_receiver()); - make_edge(node->flush_sender(), index_router_.flush_receiver()); + make_edge(node->output_index_port(), index_router_.unfold_index_receiver()); + make_edge(node->flush_sender(), index_router_.unfold_flush_receiver()); } } } diff --git a/phlex/core/index_router.cpp b/phlex/core/index_router.cpp index 3912cd0b..14a25e7a 100644 --- a/phlex/core/index_router.cpp +++ b/phlex/core/index_router.cpp @@ -107,23 +107,21 @@ namespace phlex::experimental { //======================================================================================== // index_router implementation index_router::index_router(tbb::flow::graph& g) : - index_receiver_{g, - tbb::flow::unlimited, - [this](index_message const& msg) -> data_cell_index_ptr { - auto const& [index, message_id, _] = msg; - assert(index); - return route(index, index_is_lowest_layer(index), message_id); - }}, - flush_receiver_{g, - tbb::flow::unlimited, - [this](unfold_flush input) -> tbb::flow::continue_msg { - auto&& [index, layer_hash, count] = input; - apply_expected_count(*gate_for(index), layer_hash, count); - // Because the flush receiver receives flush values, the index cannot - // correspond to a lowest layer. 
- flush_if_done(index); - return {}; - }}, + unfold_index_receiver_{g, + tbb::flow::unlimited, + [this](index_message const& msg) -> data_cell_index_ptr { + auto const& [index, message_id, _] = msg; + assert(index); + return route(index, index_is_lowest_layer(index), message_id); + }}, + unfold_flush_receiver_{g, + tbb::flow::unlimited, + [this](unfold_flush input) -> tbb::flow::continue_msg { + auto&& [index, layer_hash, count] = input; + apply_expected_count(*gate_for(index), layer_hash, count); + flush_if_done(index); + return {}; + }}, flusher_{g} { } diff --git a/phlex/core/index_router.hpp b/phlex/core/index_router.hpp index 6a31420d..f6fac58c 100644 --- a/phlex/core/index_router.hpp +++ b/phlex/core/index_router.hpp @@ -71,11 +71,14 @@ namespace phlex::experimental { void drain(index_flushes flushes); flusher_t& flusher() { return flusher_; } - tbb::flow::function_node& index_receiver() + tbb::flow::function_node& unfold_index_receiver() { - return index_receiver_; + return unfold_index_receiver_; + } + tbb::flow::function_node& unfold_flush_receiver() + { + return unfold_flush_receiver_; } - tbb::flow::function_node& flush_receiver() { return flush_receiver_; } private: data_cell_index_ptr route(data_cell_index_ptr index, @@ -99,8 +102,8 @@ namespace phlex::experimental { flush_gate_ptr gate_for(data_cell_index_ptr const& index); void flush_if_done(data_cell_index_ptr index); - tbb::flow::function_node index_receiver_; - tbb::flow::function_node flush_receiver_; + tbb::flow::function_node unfold_index_receiver_; + tbb::flow::function_node unfold_flush_receiver_; std::atomic received_indices_{}; flusher_t flusher_; tbb::concurrent_unordered_map is_lowest_layer_hashes_; From 0dafda7bd0af15577b3a20fb87cfaf79d05dd605 Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Fri, 8 May 2026 09:00:27 -0500 Subject: [PATCH 5/5] Re-enable different_hierarchies test (now fold_duplicate_layer_name) --- test/CMakeLists.txt | 7 ++----- ..._hierarchies.cpp => 
fold_duplicate_layer_name_test.cpp} | 2 +- 2 files changed, 3 insertions(+), 6 deletions(-) rename test/{different_hierarchies.cpp => fold_duplicate_layer_name_test.cpp} (97%) diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 7a319342..56c5e6b8 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -80,18 +80,15 @@ cet_test( ) cet_test( - different_hierarchies + fold_duplicate_layer_name USE_CATCH2_MAIN SOURCE - different_hierarchies.cpp + fold_duplicate_layer_name_test.cpp LIBRARIES phlex::core_internal spdlog::spdlog layer_generator_internal ) -# Disable pending resolution of [Change fold-result caching to use -# `multilayer_join_node`](https://github.com/Framework-R-D/phlex/issues/359) -set_tests_properties(different_hierarchies PROPERTIES DISABLED TRUE) cet_test(filter_impl USE_CATCH2_MAIN SOURCE filter_impl.cpp LIBRARIES phlex::core_internal diff --git a/test/different_hierarchies.cpp b/test/fold_duplicate_layer_name_test.cpp similarity index 97% rename from test/different_hierarchies.cpp rename to test/fold_duplicate_layer_name_test.cpp index 726675c2..38a24bd4 100644 --- a/test/different_hierarchies.cpp +++ b/test/fold_duplicate_layer_name_test.cpp @@ -48,7 +48,7 @@ namespace { void add(std::atomic& counter, unsigned int number) { counter += number; } } -TEST_CASE("Different hierarchies used with fold", "[graph]") +TEST_CASE("Fold different layer paths with same trailing name", "[graph]") { // job -> run -> event layers constexpr auto index_limit = 2u;