Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions phlex/core/declared_unfold.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include "phlex/core/declared_unfold.hpp"
#include "phlex/model/data_cell_counter.hpp"
#include "phlex/model/handle.hpp"
#include "phlex/utilities/hashing.hpp"

#include "fmt/std.h"
#include "spdlog/spdlog.h"
Expand All @@ -12,25 +12,18 @@ namespace phlex::experimental {
std::string const& child_layer_name) :
parent_{std::const_pointer_cast<product_store>(parent)},
node_name_{std::move(node_name)},
child_layer_name_{child_layer_name}
child_layer_name_{child_layer_name},
child_layer_hash_{hash(parent->index()->layer_hash(), identifier{child_layer_name_}.hash())}
{
}

product_store_const_ptr generator::make_child(std::size_t const i, products new_products)
{
auto child_index = parent_->index()->make_child(child_layer_name_, i);
++child_counts_[child_index->layer_hash()];
++child_counts_;
return std::make_shared<product_store>(child_index, node_name_, std::move(new_products));
}

flush_counts_ptr generator::flush_result() const
{
if (not child_counts_.empty()) {
return std::make_shared<flush_counts const>(child_counts_);
}
return nullptr;
}

declared_unfold::declared_unfold(algorithm_name name,
std::vector<std::string> predicates,
product_queries input_products,
Expand Down
55 changes: 27 additions & 28 deletions phlex/core/declared_unfold.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "phlex/core/products_consumer.hpp"
#include "phlex/model/algorithm_name.hpp"
#include "phlex/model/data_cell_index.hpp"
#include "phlex/model/flush_messages.hpp"
#include "phlex/model/handle.hpp"
#include "phlex/model/identifier.hpp"
#include "phlex/model/product_specification.hpp"
Expand Down Expand Up @@ -39,22 +40,19 @@ namespace phlex::experimental {
explicit generator(product_store_const_ptr const& parent,
algorithm_name node_name,
std::string const& child_layer_name);
flush_counts_ptr flush_result() const;

product_store_const_ptr make_child_for(std::size_t const data_cell_number,
products new_products)
{
return make_child(data_cell_number, std::move(new_products));
}
std::size_t child_layer_hash() const { return child_layer_hash_; }
std::size_t child_count() const { return child_counts_; }
product_store_const_ptr make_child(std::size_t i, products new_products);

private:
product_store_const_ptr make_child(std::size_t i, products new_products);
product_store_ptr parent_;
algorithm_name node_name_;
// References declared_unfold::child_layer_, which outlives this short-lived object.
// NOLINTNEXTLINE(cppcoreguidelines-avoid-const-or-ref-data-members)
std::string const& child_layer_name_;
std::map<data_cell_index::hash_type, std::size_t> child_counts_;
std::size_t child_layer_hash_;
std::size_t child_counts_ = 0;
};

class PHLEX_CORE_EXPORT declared_unfold : public products_consumer {
Expand All @@ -66,10 +64,10 @@ namespace phlex::experimental {
~declared_unfold() override;

virtual tbb::flow::sender<message>& output_port() = 0;
virtual tbb::flow::sender<data_cell_index_ptr>& output_index_port() = 0;
virtual tbb::flow::sender<index_message>& output_index_port() = 0;
virtual tbb::flow::sender<unfold_flush>& flush_sender() = 0;
virtual product_specifications const& output() const = 0;
virtual std::size_t product_count() const = 0;
virtual flusher_t& flusher() = 0;

std::string const& child_layer() const noexcept { return child_layer_; }

Expand Down Expand Up @@ -109,19 +107,17 @@ namespace phlex::experimental {
unfold_{g,
concurrency,
[this, p = std::move(predicate), ufold = std::move(unfold)](
messages_t<num_inputs> const& messages, auto&) {
messages_t<num_inputs> const& messages, auto& outputs) {
auto const& msg = most_derived(messages);
auto const& store = msg.store;

std::size_t const original_message_id{msg_counter_};
generator g{store, this->full_name(), child_layer()};
call(p, ufold, store->index(), g, messages, std::make_index_sequence<num_inputs>{});

flusher_.try_put({.index = store->index(),
.counts = g.flush_result(),
.original_id = original_message_id});
}},
flusher_{g}
generator gen{store, this->full_name(), child_layer()};
call(
p, ufold, store->index(), gen, messages, std::make_index_sequence<num_inputs>{});
std::get<2>(outputs).try_put({.index = store->index(),
.layer_hash = gen.child_layer_hash(),
.count = gen.child_count()});
}}
{
if constexpr (num_inputs > 1ull) {
make_edge(join_, unfold_);
Expand All @@ -142,12 +138,15 @@ namespace phlex::experimental {
{
return tbb::flow::output_port<0>(unfold_);
}
tbb::flow::sender<data_cell_index_ptr>& output_index_port() override
tbb::flow::sender<index_message>& output_index_port() override
{
return tbb::flow::output_port<1>(unfold_);
}
tbb::flow::sender<unfold_flush>& flush_sender() override
{
return tbb::flow::output_port<2>(unfold_);
}
product_specifications const& output() const override { return output_; }
flusher_t& flusher() override { return flusher_; }

template <std::size_t... Is>
void call(Predicate const& predicate,
Expand Down Expand Up @@ -181,10 +180,10 @@ namespace phlex::experimental {
}
++product_count_;

auto child = g.make_child_for(counter++, std::move(new_products));
tbb::flow::output_port<0>(unfold_).try_put(
{.store = child, .id = msg_counter_.fetch_add(1)});
tbb::flow::output_port<1>(unfold_).try_put(child->index());
auto child = g.make_child(counter++, std::move(new_products));
auto const msg_id = msg_counter_.fetch_add(1);
tbb::flow::output_port<0>(unfold_).try_put({.store = child, .id = msg_id});
tbb::flow::output_port<1>(unfold_).try_put({.index = child->index(), .msg_id = msg_id});
}
}

Expand All @@ -195,9 +194,9 @@ namespace phlex::experimental {
input_retriever_types<input_args> input_{input_arguments<input_args>()};
product_specifications output_;
join_or_none_t<num_inputs> join_;
tbb::flow::multifunction_node<messages_t<num_inputs>, std::tuple<message, data_cell_index_ptr>>
tbb::flow::multifunction_node<messages_t<num_inputs>,
std::tuple<message, index_message, unfold_flush>>
unfold_;
flusher_t flusher_;
std::atomic<std::size_t> msg_counter_{}; // Is this sufficient? Probably not.
std::atomic<std::size_t> calls_{};
std::atomic<std::size_t> product_count_{};
Expand Down
26 changes: 12 additions & 14 deletions phlex/core/edge_maker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@ namespace phlex::experimental {
explicit edge_maker(Args&... args);

template <typename... Args>
void operator()(tbb::flow::graph& g,
index_router& multi,
std::map<std::string, filter>& filters,
declared_outputs& outputs,
provider_nodes& providers,
Args&... consumers);
std::tuple<index_router::provider_input_ports_t, std::map<std::string, named_index_ports>>
operator()(std::map<std::string, filter>& filters,
declared_outputs& outputs,
provider_nodes& providers,
Args&... consumers);

private:
template <typename T>
Expand Down Expand Up @@ -100,12 +99,11 @@ namespace phlex::experimental {
}

template <typename... Args>
void edge_maker::operator()(tbb::flow::graph& g,
index_router& multi,
std::map<std::string, filter>& filters,
declared_outputs& outputs,
provider_nodes& providers,
Args&... consumers)
std::tuple<index_router::provider_input_ports_t, std::map<std::string, named_index_ports>>
edge_maker::operator()(std::map<std::string, filter>& filters,
declared_outputs& outputs,
provider_nodes& providers,
Args&... consumers)
{
// Create edges to outputs
for (auto const& [output_name, output_node] : outputs) {
Expand All @@ -126,15 +124,15 @@ namespace phlex::experimental {

if (head_ports.empty()) {
// This can happen for jobs that only execute the driver, which is helpful for debugging
return;
return {};
}

auto provider_input_ports = make_provider_edges(std::move(head_ports), providers);

std::map<std::string, named_index_ports> multilayer_join_index_ports;
(multilayer_join_index_ports.merge(multilayer_ports(consumers)), ...);

multi.finalize(g, std::move(provider_input_ports), std::move(multilayer_join_index_ports));
return std::make_tuple(std::move(provider_input_ports), std::move(multilayer_join_index_ports));
}
}

Expand Down
105 changes: 65 additions & 40 deletions phlex/core/framework_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#include "phlex/concurrency.hpp"
#include "phlex/core/edge_maker.hpp"
#include "phlex/model/data_cell_counter.hpp"
#include "phlex/model/product_store.hpp"

#include "fmt/std.h"
Expand All @@ -29,15 +28,21 @@ namespace phlex::experimental {
fixed_hierarchy_{std::move(bundle.hierarchy)},
driver_{std::move(bundle.driver)},
src_{graph_,
[this](tbb::flow_control& fc) mutable -> data_cell_index_ptr {
[this](tbb::flow_control& fc) mutable -> ready_flushes_then_emit {
if (auto item = driver_()) {
return index_router_.route(*item);
return {.ready_flushes = cell_tracker_.report_and_evict_ready_flushes(*item),
.index_to_emit = *item};
}
index_router_.drain();
fc.stop();
return {};
}},
index_router_{graph_},
index_receiver_{graph_,
tbb::flow::unlimited,
[this](ready_flushes_then_emit const& input) -> data_cell_index_ptr {
auto&& [ready_flushes, index_to_emit] = input;
return index_router_.route(index_to_emit, ready_flushes);
}},
hierarchy_node_{graph_,
tbb::flow::unlimited,
[this](data_cell_index_ptr const& index) -> tbb::flow::continue_msg {
Expand All @@ -53,7 +58,8 @@ namespace phlex::experimental {
{
if (shutdown_on_error_) {
// When in an error state, we need to sanely pop the layer stack and wait for any tasks to finish.
index_router_.drain();
auto remaining_flushes = cell_tracker_.report_and_evict_ready_flushes(nullptr);
index_router_.drain(std::move(remaining_flushes));
graph_.wait_for_all();
}
}
Expand Down Expand Up @@ -89,6 +95,10 @@ namespace phlex::experimental {
{
src_.activate();
graph_.wait_for_all();

// Now back out of all remaining layers
index_router_.drain(cell_tracker_.report_and_evict_ready_flushes(nullptr));
graph_.wait_for_all();
}

namespace {
Expand Down Expand Up @@ -135,49 +145,64 @@ namespace phlex::experimental {
filters_.merge(internal_edges_for_predicates(graph_, nodes_.predicates, nodes_.unfolds));
filters_.merge(internal_edges_for_predicates(graph_, nodes_.predicates, nodes_.transforms));

edge_maker make_edges{nodes_.transforms, nodes_.folds, nodes_.unfolds};
make_edges(graph_,
index_router_,
filters_,
nodes_.outputs,
nodes_.providers,
nodes_.predicates,
nodes_.observers,
nodes_.folds,
nodes_.unfolds,
nodes_.transforms);

std::map<identifier, flusher_t*> flushers_from_unfolds;
std::set<identifier> unfold_input_layer_names;
// Count how many distinct unfold nodes consume each input layer. When that count is
// greater than one, the flush_gate for an index in that layer must collect a flush
// message from every unfold before it knows the total number of children it will see.
std::map<identifier, std::size_t> unfold_count_per_input_layer;
for (auto const& n : nodes_.unfolds | std::views::values) {
flushers_from_unfolds.try_emplace(identifier{n->child_layer()}, &n->flusher());
for (auto const& input : n->input()) {
if (!static_cast<identifier const&>(input.layer).empty()) {
unfold_input_layer_names.insert(input.layer);
++unfold_count_per_input_layer[identifier{input.layer}];
}
}
}

// Connect edges between all nodes, the graph-wide flusher, and the unfolds' flushers
auto connect_with_flusher =
[this, unfold_flushers = std::move(flushers_from_unfolds)](auto& consumers) {
for (auto& n : consumers | std::views::values) {
std::set<flusher_t*> flushers;
// For providers
for (product_query const& pq : n->input()) {
if (auto it = unfold_flushers.find(pq.layer); it != unfold_flushers.end()) {
flushers.insert(it->second);
} else {
flushers.insert(&index_router_.flusher());
}
}
for (flusher_t* flusher : flushers) {
make_edge(*flusher, n->flush_port());
}
}
};
connect_with_flusher(nodes_.folds);
std::vector<identifier> unfold_output_layer_names;
for (auto const& n : nodes_.unfolds | std::views::values) {
unfold_output_layer_names.emplace_back(n->child_layer());
}

index_router_.establish_layers(
fixed_hierarchy_.layer_paths(),
std::vector<identifier>(unfold_input_layer_names.begin(), unfold_input_layer_names.end()),
unfold_output_layer_names);
index_router_.register_unfold_count_per_input_layer(std::move(unfold_count_per_input_layer));

edge_maker make_edges{nodes_.transforms, nodes_.folds, nodes_.unfolds};
auto [provider_input_ports, multilayer_join_index_ports] =
make_edges(filters_,
nodes_.outputs,
nodes_.providers,
// Consumers of data products below
nodes_.predicates,
nodes_.observers,
nodes_.folds,
nodes_.unfolds,
nodes_.transforms);
if (not std::empty(provider_input_ports)) {
index_router_.finalize(
graph_, std::move(provider_input_ports), std::move(multilayer_join_index_ports));
}

// The hierarchy node is used to report which data layers have been seen by the
// framework. To assemble the report, data-cell indices emitted by the input node are
// recorded as well as any data-cell indices emitted by an unfold.
make_edge(src_, hierarchy_node_);

// FIXME: Eventually the separate index_receiver_ and index_router_.unfold_index_receiver()
// may be combined.
make_edge(src_, index_receiver_);
make_edge(index_receiver_, hierarchy_node_);
make_edge(index_router_.unfold_index_receiver(), hierarchy_node_);

for (auto& [_, node] : nodes_.folds) {
make_edge(index_router_.flusher(), node->flush_port());
}

for (auto& [_, node] : nodes_.unfolds) {
make_edge(node->output_index_port(), hierarchy_node_);
make_edge(node->output_index_port(), index_router_.unfold_index_receiver());
make_edge(node->flush_sender(), index_router_.unfold_flush_receiver());
}
}
}
10 changes: 8 additions & 2 deletions phlex/core/framework_graph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
#include "phlex/core/message.hpp"
#include "phlex/core/node_catalog.hpp"
#include "phlex/driver.hpp"
#include "phlex/model/data_cell_tracker.hpp"
#include "phlex/model/data_layer_hierarchy.hpp"
#include "phlex/model/flush_messages.hpp"
#include "phlex/model/product_store.hpp"
#include "phlex/module.hpp"
#include "phlex/source.hpp"
Expand Down Expand Up @@ -166,9 +168,13 @@ namespace phlex::experimental {
tbb::flow::graph graph_{};
framework_driver driver_;
std::vector<std::string> registration_errors_{};
tbb::flow::input_node<data_cell_index_ptr> src_;
data_cell_tracker cell_tracker_{};
tbb::flow::input_node<ready_flushes_then_emit> src_;
index_router index_router_;
tbb::flow::function_node<data_cell_index_ptr> hierarchy_node_;
tbb::flow::function_node<ready_flushes_then_emit, data_cell_index_ptr, tbb::flow::lightweight>
index_receiver_;
tbb::flow::function_node<data_cell_index_ptr, tbb::flow::continue_msg, tbb::flow::lightweight>
hierarchy_node_;
bool shutdown_on_error_{false};
};
}
Expand Down
Loading
Loading