Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions form/form_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,15 @@ PHLEX_REGISTER_PROVIDERS(s, config)

// --- Register providers dynamically from config ---
// FIXME: Prototype 0.1 -- types hardcoded as int.
// FIXME: output_product should also be given the correct stage name
for (auto const& name : products) {
s.provide("provide_" + name,
[form_input, creator, name](phlex::data_cell_index const& id) -> int {
return form_input->read<int>(creator, name, id);
})
.output_product(phlex::product_query{.creator = phlex::experimental::identifier(creator),
.layer = phlex::experimental::identifier("event"),
.suffix = phlex::experimental::identifier(name)});
.output_product(phlex::experimental::algorithm_name::create(creator),
phlex::experimental::identifier(name),
phlex::experimental::identifier("event"));
}

std::cout << "FORM input source registered successfully\n";
Expand Down
3 changes: 2 additions & 1 deletion phlex/core/edge_maker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ namespace phlex::experimental {
bool found_match = false;
for (auto const& [_, p] : providers) {
auto& provider = *p;
if (port.input_product.match(provider.output_product())) {
if (port.input_product.match(
provider.output_product(), provider.layer(), provider.stage())) {
if (!result.contains(provider.full_name())) {
result.try_emplace(provider.full_name(), port.input_product, provider.input_port());
}
Expand Down
17 changes: 17 additions & 0 deletions phlex/core/product_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,23 @@ namespace phlex {
return true;
}

// Check if a product_specification, layer, and stage together satisfies this query
bool product_query::match(experimental::product_specification const& spec,
experimental::identifier const& layer,
experimental::identifier const& stage) const
{
if (!match(spec)) {
return false;
}
if (this->layer != layer) {
return false;
}
if (this->stage && this->stage != stage) {
return false;
}
return true;
}

std::string product_query::to_string() const
{
if (suffix) {
Expand Down
5 changes: 5 additions & 0 deletions phlex/core/product_query.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ namespace phlex {
// Check if a product_specification satisfies this query
bool match(experimental::product_specification const& spec) const;

// Check if a product_specification, layer, and stage together satisfies this query
bool match(experimental::product_specification const& spec,
experimental::identifier const& layer,
experimental::identifier const& stage) const;

std::string to_string() const;

bool operator==(product_query const& rhs) const;
Expand Down
19 changes: 11 additions & 8 deletions phlex/core/provider_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ namespace phlex::experimental {
std::size_t concurrency,
tbb::flow::graph& g,
provider_function provider_func,
product_query output) :
product_specification output_spec,
identifier output_layer,
identifier stage) :
name_{std::move(name)},
output_product_{output},
output_{algorithm_name::create(std::string_view(identifier(output.creator))),
output.suffix.value_or(identifier("")),
output.type},
output_{std::move(output_spec)},
layer_{std::move(output_layer)},
stage_{std::move(stage)},
provider_{g,
concurrency,
[this, ft = std::move(provider_func)](index_message const& index_msg) -> message {
Expand All @@ -35,13 +36,15 @@ namespace phlex::experimental {
}}
{
spdlog::debug(
"Created provider node {} making output {}", this->full_name(), output.to_string());
"Created provider node {} making output {} ϵ {}", this->full_name(), output_.full(), layer_);
}

std::string provider_node::full_name() const { return name_.full(); }

product_query const& provider_node::output_product() const noexcept { return output_product_; }
product_specification const& provider_node::output_product() const noexcept { return output_; }

identifier const& provider_node::layer() const noexcept { return output_product_.layer; }
identifier const& provider_node::layer() const noexcept { return layer_; }

identifier const& provider_node::stage() const noexcept { return stage_; }

}
10 changes: 7 additions & 3 deletions phlex/core/provider_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,24 @@ namespace phlex::experimental {
std::size_t concurrency,
tbb::flow::graph& g,
provider_function provider_func,
product_query output);
product_specification output_spec,
identifier output_layer,
identifier stage);

std::string full_name() const;
product_query const& output_product() const noexcept;
product_specification const& output_product() const noexcept;
identifier const& layer() const noexcept;
identifier const& stage() const noexcept;

tbb::flow::receiver<index_message>* input_port() { return &provider_; }
tbb::flow::sender<message>& output_port() { return provider_; }
std::size_t num_calls() const { return calls_.load(); }

private:
algorithm_name name_;
product_query output_product_;
product_specification output_;
identifier layer_;
identifier stage_;
tbb::flow::function_node<index_message, message> provider_;
std::atomic<std::size_t> calls_;
};
Expand Down
27 changes: 20 additions & 7 deletions phlex/core/registration_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,20 +134,33 @@ namespace phlex::experimental {
{
}

auto output_product(product_query output)
auto output_product(algorithm_name creator,
identifier suffix,
identifier output_layer,
identifier stage = "CURRENT"_id)
{
using return_type = return_type<typename AlgorithmBits::algorithm_type>;
output.type = make_type_id<return_type>();
product_specification output_spec(
std::move(creator), std::move(suffix), make_type_id<return_type>());

auto type_erased_alg = [alg = alg_.release_algorithm()](data_cell_index const& index) {
return product_for(std::invoke(alg, index));
};

registrar_.set_creator([this, alg = std::move(type_erased_alg), output = std::move(output)](
auto /* predicates */, auto /* output_product_suffixes */) {
return std::make_unique<provider_node>(
std::move(name_), concurrency_.value, graph_, std::move(alg), std::move(output));
});
registrar_.set_creator(
[this,
alg = std::move(type_erased_alg),
output_spec = std::move(output_spec),
output_layer = std::move(output_layer),
stage = std::move(stage)](auto /* predicates */, auto /* output_product_suffixes */) {
return std::make_unique<provider_node>(std::move(name_),
concurrency_.value,
graph_,
std::move(alg),
std::move(output_spec),
std::move(output_layer),
std::move(stage));
});
}

private:
Expand Down
42 changes: 28 additions & 14 deletions plugins/python/src/modulewrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1238,33 +1238,46 @@ static PyObject* sc_provide(py_phlex_source* src, PyObject* args, PyObject* kwds
PyErr_Clear();
}

// translate and validate the output query
// translate and validate the output "query"
// Since a query in Python is just a dictionary, it isn't called out in the user API as a query
auto opq = validate_query(output);
if (!opq.has_value()) {
// validate_query has set a python exception with details about the error
return nullptr;
}

algorithm_name creator =
algorithm_name::create(std::string_view(identifier(opq.value().creator)));
identifier layer = opq.value().layer;
identifier suffix = opq.value().suffix.value_or("");

// insert provider node (TODO: as in transform and observe, we'll leak the
// callable for now, until there's a proper shutdown procedure)
// Note: can't use a translator node here, b/c we need a module to add a
// transform, but we only have a source. However, the interface of a provider
// is fixed, so there is no combinatorics problem.
std::string const& out_type = output_types[0];
if (out_type == "bool") {
src->ph_source->provide(functor_name, provider_cb_bool{callable}).output_product(opq.value());
src->ph_source->provide(functor_name, provider_cb_bool{callable})
.output_product(creator, suffix, layer);
} else if (out_type == "int32_t") {
src->ph_source->provide(functor_name, provider_cb_int{callable}).output_product(opq.value());
src->ph_source->provide(functor_name, provider_cb_int{callable})
.output_product(creator, suffix, layer);
} else if (out_type == "uint32_t") {
src->ph_source->provide(functor_name, provider_cb_uint{callable}).output_product(opq.value());
src->ph_source->provide(functor_name, provider_cb_uint{callable})
.output_product(creator, suffix, layer);
} else if (out_type == "int64_t") {
src->ph_source->provide(functor_name, provider_cb_long{callable}).output_product(opq.value());
src->ph_source->provide(functor_name, provider_cb_long{callable})
.output_product(creator, suffix, layer);
} else if (out_type == "uint64_t") {
src->ph_source->provide(functor_name, provider_cb_ulong{callable}).output_product(opq.value());
src->ph_source->provide(functor_name, provider_cb_ulong{callable})
.output_product(creator, suffix, layer);
} else if (out_type == "float") {
src->ph_source->provide(functor_name, provider_cb_float{callable}).output_product(opq.value());
src->ph_source->provide(functor_name, provider_cb_float{callable})
.output_product(creator, suffix, layer);
} else if (out_type == "double") {
src->ph_source->provide(functor_name, provider_cb_double{callable}).output_product(opq.value());
src->ph_source->provide(functor_name, provider_cb_double{callable})
.output_product(creator, suffix, layer);
} else if (out_type.compare(0, 7, "ndarray") == 0 || out_type.compare(0, 4, "list") == 0) {
// TODO: just like for input types, these are hard-coded, but should be handled by
// an IDL instead.
Expand All @@ -1274,22 +1287,23 @@ static PyObject* sc_provide(py_phlex_source* src, PyObject* args, PyObject* kwds
return nullptr;
}
if (*dtype == "[int32_t]") {
src->ph_source->provide(functor_name, provider_cb_vint{callable}).output_product(opq.value());
src->ph_source->provide(functor_name, provider_cb_vint{callable})
.output_product(creator, suffix, layer);
} else if (*dtype == "[uint32_t]") {
src->ph_source->provide(functor_name, provider_cb_vuint{callable})
.output_product(opq.value());
.output_product(creator, suffix, layer);
} else if (*dtype == "[int64_t]") {
src->ph_source->provide(functor_name, provider_cb_vlong{callable})
.output_product(opq.value());
.output_product(creator, suffix, layer);
} else if (*dtype == "[uint64_t]") {
src->ph_source->provide(functor_name, provider_cb_vulong{callable})
.output_product(opq.value());
.output_product(creator, suffix, layer);
} else if (*dtype == "[float]") {
src->ph_source->provide(functor_name, provider_cb_vfloat{callable})
.output_product(opq.value());
.output_product(creator, suffix, layer);
} else if (*dtype == "[double]") {
src->ph_source->provide(functor_name, provider_cb_vdouble{callable})
.output_product(opq.value());
.output_product(creator, suffix, layer);
} else {
PyErr_Format(PyExc_TypeError, "unsupported collection output type \"%s\"", out_type.c_str());
return nullptr;
Expand Down
6 changes: 3 additions & 3 deletions test/allowed_families.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ TEST_CASE("Testing families", "[data model]")

// Wire up providers for each level
g.provide("run_id_provider", provide_index, concurrency::unlimited)
.output_product(product_query{.creator = "dummy", .layer = "run", .suffix = "id"});
.output_product("dummy", "id", "run");
g.provide("subrun_id_provider", provide_index, concurrency::unlimited)
.output_product(product_query{.creator = "dummy", .layer = "subrun", .suffix = "id"});
.output_product("dummy", "id", "subrun");
g.provide("event_id_provider", provide_index, concurrency::unlimited)
.output_product(product_query{.creator = "dummy", .layer = "event", .suffix = "id"});
.output_product("dummy", "id", "event");

g.observe("se", check_two_ids)
.input_family(product_query{.creator = "dummy", .layer = "subrun", .suffix = "id"},
Expand Down
2 changes: 1 addition & 1 deletion test/benchmarks/benchmarks_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ PHLEX_REGISTER_PROVIDERS(s)
{
using namespace phlex;
s.provide("provide_id", [](data_cell_index const& id) { return id; })
.output_product(product_query{.creator = "input", .layer = "event", .suffix = "id"});
.output_product("input", "id", "event");
}
6 changes: 3 additions & 3 deletions test/cached_execution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ TEST_CASE("Cached function calls", "[data model]")

// Register providers
g.provide("provide_number", provide_number, concurrency::unlimited)
.output_product(product_query{.creator = "input", .layer = "run", .suffix = "number"});
.output_product("input", "number", "run");
g.provide("provide_another", provide_another, concurrency::unlimited)
.output_product(product_query{.creator = "input", .layer = "subrun", .suffix = "another"});
.output_product("input", "another", "subrun");
g.provide("provide_still", provide_still, concurrency::unlimited)
.output_product(product_query{.creator = "input", .layer = "event", .suffix = "still"});
.output_product("input", "still", "event");

g.transform("A1", call_one, concurrency::unlimited)
.input_family(product_query{.creator = "input", .layer = "run", .suffix = "number"})
Expand Down
6 changes: 3 additions & 3 deletions test/class_registration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ TEST_CASE("Call non-framework functions", "[programming model]")

// Register providers for the input products
g.provide("provide_number", provide_number, concurrency::unlimited)
.output_product(product_query{.creator = "input", .layer = "job", .suffix = "number"});
.output_product("input", "number", "job");
g.provide("provide_temperature", provide_temperature, concurrency::unlimited)
.output_product(product_query{.creator = "input", .layer = "job", .suffix = "temperature"});
.output_product("input", "temperature", "job");
g.provide("provide_name", provide_name, concurrency::unlimited)
.output_product(product_query{.creator = "input", .layer = "job", .suffix = "name"});
.output_product("input", "name", "job");

auto glueball = g.make<A>();
SECTION("No framework")
Expand Down
2 changes: 1 addition & 1 deletion test/demo-giantdata/unfold_transform_fold.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ TEST_CASE("Unfold-transform-fold pipeline", "[concurrency][unfold][fold]")
spill_index.parent()->number(),
spill_index.number());
})
.output_product(product_query{.creator = "input", .layer = "spill", .suffix = "wgen"});
.output_product("input", "wgen", "spill");

g.unfold<demo::WaveformGenerator>(
"WaveformGenerator",
Expand Down
2 changes: 1 addition & 1 deletion test/different_hierarchies.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ TEST_CASE("Different hierarchies used with fold", "[graph]")

// Register provider
g.provide("provide_number", provide_number, concurrency::unlimited)
.output_product(product_query{.creator = "input", .layer = "event", .suffix = "number"});
.output_product("input", "number", "event");

g.fold("run_add", add, concurrency::unlimited, "run", 0u)
.input_family(product_query{.creator = "input", .layer = "event", .suffix = "number"})
Expand Down
12 changes: 6 additions & 6 deletions test/filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ TEST_CASE("Two predicates", "[filtering]")
gen.add_layer("event", {"job", 10, 1});
experimental::framework_graph g{driver_for_test(gen)};
g.provide("provide_num", give_me_nums, concurrency::unlimited)
.output_product(product_query{.creator = "input", .layer = "event", .suffix = "num"});
.output_product("input", "num", "event");
g.predicate("evens_only", evens_only, concurrency::unlimited)
.input_family(product_query{.creator = "input", .layer = "event", .suffix = "num"});
g.predicate("odds_only", odds_only, concurrency::unlimited)
Expand All @@ -132,7 +132,7 @@ TEST_CASE("Two predicates in series", "[filtering]")
gen.add_layer("event", {"job", 10, 1});
experimental::framework_graph g{driver_for_test(gen)};
g.provide("provide_num", give_me_nums, concurrency::unlimited)
.output_product(product_query{.creator = "input", .layer = "event", .suffix = "num"});
.output_product("input", "num", "event");
g.predicate("evens_only", evens_only, concurrency::unlimited)
.input_family(product_query{.creator = "input", .layer = "event", .suffix = "num"});
g.predicate("odds_only", odds_only, concurrency::unlimited)
Expand All @@ -154,7 +154,7 @@ TEST_CASE("Two predicates in parallel", "[filtering]")
gen.add_layer("event", {"job", 10, 1});
experimental::framework_graph g{driver_for_test(gen)};
g.provide("provide_num", give_me_nums, concurrency::unlimited)
.output_product(product_query{.creator = "input", .layer = "event", .suffix = "num"});
.output_product("input", "num", "event");
g.predicate("evens_only", evens_only, concurrency::unlimited)
.input_family(product_query{.creator = "input", .layer = "event", .suffix = "num"});
g.predicate("odds_only", odds_only, concurrency::unlimited)
Expand Down Expand Up @@ -184,7 +184,7 @@ TEST_CASE("Three predicates in parallel", "[filtering]")
gen.add_layer("event", {"job", 10, 1});
experimental::framework_graph g{driver_for_test(gen)};
g.provide("provide_num", give_me_nums, concurrency::unlimited)
.output_product(product_query{.creator = "input", .layer = "event", .suffix = "num"});
.output_product("input", "num", "event");
for (auto const& [name, b, e] : configs) {
g.make<not_in_range>(b, e)
.predicate(name, &not_in_range::eval, concurrency::unlimited)
Expand All @@ -210,9 +210,9 @@ TEST_CASE("Two predicates in parallel (each with multiple arguments)", "[filteri
gen.add_layer("event", {"job", 10, 1});
experimental::framework_graph g{driver_for_test(gen)};
g.provide("provide_num", give_me_nums, concurrency::unlimited)
.output_product(product_query{.creator = "input", .layer = "event", .suffix = "num"});
.output_product("input", "num", "event");
g.provide("provide_other_num", give_me_other_nums, concurrency::unlimited)
.output_product(product_query{.creator = "input", .layer = "event", .suffix = "other_num"});
.output_product("input", "other_num", "event");
g.predicate("evens_only", evens_only, concurrency::unlimited)
.input_family(product_query{.creator = "input", .layer = "event", .suffix = "num"});
g.predicate("odds_only", odds_only, concurrency::unlimited)
Expand Down
2 changes: 1 addition & 1 deletion test/fold.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ TEST_CASE("Different data layers of fold", "[graph]")
experimental::framework_graph g{driver_for_test(gen)};

g.provide("provide_number", provide_number, concurrency::unlimited)
.output_product(product_query{.creator = "input", .layer = "event", .suffix = "number"});
.output_product("input", "number", "event");

g.fold("run_add", add, concurrency::unlimited, "run")
.input_family(product_query{.creator = "input", .layer = "event", .suffix = "number"})
Expand Down
Loading
Loading