diff --git a/tests/AMSlib/CMakeLists.txt b/tests/AMSlib/CMakeLists.txt index 7de043f..d9f47ab 100644 --- a/tests/AMSlib/CMakeLists.txt +++ b/tests/AMSlib/CMakeLists.txt @@ -198,6 +198,12 @@ endfunction() BUILD_TEST(ams_packing_test cpu_packing_test.cpp AMSPack) ADDTEST(ams_packing_test AMSPack) +# AMS Database benchmark (RMQ and/or HDF5 + MPI / No ML models used) +BUILD_TEST(ams_benchmark_db ams_bench_db.cpp) +# The AMS DB Benchmark requires mfem +# TODO: Remove mfem requirement from the benchmark +target_link_libraries(ams_benchmark_db PRIVATE AMS ${AMS_EXAMPLE_LIBRARIES}) + if(WITH_TORCH) BUILD_TEST(ams_inference_test torch_model.cpp) ADDTEST(ams_inference_test AMSInferDouble ${CMAKE_CURRENT_SOURCE_DIR}/debug_model.pt "double") diff --git a/tests/AMSlib/ams_bench_db.cpp b/tests/AMSlib/ams_bench_db.cpp new file mode 100644 index 0000000..20eb4a1 --- /dev/null +++ b/tests/AMSlib/ams_bench_db.cpp @@ -0,0 +1,333 @@ +#ifdef __AMS_ENABLE_MPI__ +#include +#endif + +#ifdef __AMS_ENABLE_CALIPER__ +#include +#endif + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "AMS.h" + +void createUmpirePool(const std::string &parent_name, + const std::string &pool_name) +{ + auto &rm = umpire::ResourceManager::getInstance(); + auto alloc_resource = rm.makeAllocator( + pool_name, rm.getAllocator(parent_name)); +} + +AMSDType getDataType(const char *d_type) +{ + AMSDType dType = AMSDType::AMS_DOUBLE; + if (std::strcmp(d_type, "float") == 0) { + dType = AMSDType::AMS_SINGLE; + } else if (d_type == "double") { + dType = AMSDType::AMS_DOUBLE; + } else { + assert(false && "Unknown data type (must be 'float' or 'double')"); + } + return dType; +} + +AMSDBType getDBType(const char *db_type) +{ + AMSDBType dbType = AMSDBType::AMS_NONE; + if (std::strcmp(db_type, "csv") == 0) { + dbType = AMSDBType::AMS_CSV; + } else if (std::strcmp(db_type, "hdf5") == 0) { + dbType = AMSDBType::AMS_HDF5; + } else if (std::strcmp(db_type, "rmq") == 0) { + dbType = AMSDBType::AMS_RMQ; + } + return dbType; +} + +template +struct Problem { + int num_inputs; + int num_outputs; + int sleep_msec; // in milliseconds + Problem(int ni, int no, int sleep_msec = 0) + : num_inputs(ni), num_outputs(no), sleep_msec(sleep_msec) + { + } + + void run(long num_elements, DType **inputs, DType **outputs) + { + for (int i = 0; i < num_elements; i++) { + DType sum = 0; + for (int j = 0; j < num_inputs; j++) { + sum += inputs[j][i]; + } + + for (int j = 0; j < num_outputs; j++) { + outputs[j][i] = sum; + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_msec)); + } + + const DType *initialize_inputs(DType *inputs, long length) + { + for (int i = 0; i < length; i++) { + inputs[i] = static_cast(i); + } + return inputs; + } + + void ams_run(AMSExecutor &wf, + AMSResourceType resource, + int iterations, + int num_elements) + { + CALIPER(CALI_CXX_MARK_FUNCTION;) + auto &rm = umpire::ResourceManager::getInstance(); + + CALIPER(CALI_CXX_MARK_LOOP_BEGIN(mainloop_id, "mainloop");) + + for (int i = 0; i < iterations; i++) { + std::cout << "Iteration [" << i << "]\n"; + CALIPER(CALI_CXX_MARK_LOOP_ITERATION(mainloop_id, i);) + int elements = num_elements; // * ((DType)(rand()) / RAND_MAX) + 1; + std::vector inputs; + std::vector outputs; + + // Allocate Input memory + for (int j = 0; j < num_inputs; j++) { + DType *data = new DType[elements]; + inputs.push_back(initialize_inputs(data, elements)); + } + + // Allocate Output memory + for (int j = 0; j < num_outputs; j++) { + outputs.push_back(new DType[elements]); + } + + AMSExecute(wf, + (void *)this, + elements, + reinterpret_cast(inputs.data()), + reinterpret_cast(outputs.data()), + inputs.size(), + outputs.size()); + + for (int i = 0; i < num_outputs; i++) { + delete[] outputs[i]; + outputs[i] = nullptr; + } + for (int i = 0; i < num_inputs; i++) { + delete[] inputs[i]; + inputs[i] = nullptr; + } + } + CALIPER(CALI_CXX_MARK_LOOP_END(mainloop_id);) + } +}; + +void callBackDouble(void *cls, long elements, void **inputs, void **outputs) +{ + std::cout << " > Called the double precision model\n"; + static_cast *>(cls)->run(elements, + (double **)(inputs), + (double **)(outputs)); +} + +void callBackSingle(void *cls, long elements, void **inputs, void **outputs) +{ + std::cout << " > Called the single precision model\n"; + static_cast *>(cls)->run(elements, + (float **)(inputs), + (float **)(outputs)); +} + +int main(int argc, char **argv) +{ + // Number of ranks in this run + int wS = 1; + // My Local Id + int rId = 0; + // Level of Threading provided by MPI + int provided = 0; + + MPI_CALL(MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &provided)); + MPI_CALL(MPI_Comm_size(MPI_COMM_WORLD, &wS)); + MPI_CALL(MPI_Comm_rank(MPI_COMM_WORLD, &rId)); + + // Deactivate output on cout for all ranks but 0 + if (rId != 0) { + std::cout.setstate(std::ios::failbit); + } + + const char *device_name = "cpu"; + const char *db_config = ""; + const char *db_type = ""; + const char *precision_opt = "double"; + + int seed = 0; + int num_elems = 1024; + int num_inputs = 8; + int num_outputs = 9; + int num_iterations = 1; + int sleep_msec = 0; + bool verbose = false; + + // ------------------------------------------------------------------------- + // setup command line parser + // ------------------------------------------------------------------------- + mfem::OptionsParser args(argc, argv); + args.AddOption(&device_name, + "-d", + "--device", + "Device config string (cpu or cuda)"); + + // set precision + args.AddOption(&precision_opt, + "-pr", + "--precision", + "Set precision (single or double)"); + + // Sleeping time + args.AddOption(&sleep_msec, + "-ms", + "--msleep", + "Sleep for x milliseconds for each iteration"); + + // data parameters + args.AddOption(&num_elems, + "-e", + "--num-elems", + "Number of elements per iteration"); + + args.AddOption(&num_inputs, "-di", "--dim-inputs", "Dimension of inputs"); + args.AddOption(&num_outputs, "-do", "--dim-outputs", "Dimension of outputs"); + args.AddOption(&num_iterations, "-i", "--num-iter", "Number of iterations"); + + // random speed and packing + args.AddOption(&seed, "-s", "--seed", "Seed for rand (default 0)"); + + args.AddOption(&db_type, + "-dt", + "--dbtype", + "Configuration option of the different DB types:\n" + "\t 'csv': use CSV as a back end\n" + "\t 'hdf5': use HDF5 as a back end\n" + "\t 'rmq': use RabbitMQ as a back end\n"); + + args.AddOption(&verbose, + "-v", + "--verbose", + "-qu", + "--quiet", + "Enable more verbose benchmark"); + + // ------------------------------------------------------------------------- + // parse arguments + // ------------------------------------------------------------------------- + args.Parse(); + if (!args.Good()) { + args.PrintUsage(std::cout); + return -1; + } + + if (rId == 0) { + args.PrintOptions(std::cout); + std::cout << std::endl; + } + + srand(seed + rId); + + AMSDType data_type = getDataType(precision_opt); + AMSDBType dbType = getDBType(db_type); + + if (dbType == AMSDBType::AMS_NONE) { + std::cerr << "Error: no DB backend specified with --dbtype\n"; + return -1; + } + + const char *object_descr = std::getenv("AMS_OBJECTS"); + if (dbType == AMSDBType::AMS_RMQ && !object_descr) { + std::cerr << "Error: RabbitMQ backend required to set env variable " + "AMS_OBJECTS\n"; + return -1; + } + + // ------------------------------------------------------------------------- + // AMS allocators setup + // ------------------------------------------------------------------------- + AMSResourceType resource = AMSResourceType::AMS_HOST; + const bool use_device = std::strcmp(device_name, "cpu") != 0; + if (use_device) { +#ifdef __ENABLE_CUDA__ + resource = AMSResourceType::AMS_DEVICE; +#else + std::cerr << "Error: Benchmark has not been compiled with CUDA support\n"; + return -1; +#endif + } + + AMSCAbstrModel ams_model = AMSRegisterAbstractModel("bench_db_no_model", + AMSUQPolicy::AMS_RANDOM, + 0.5, + "", + "", + "bench_db_no_model", + 1); + + std::cout << "Total elements across all " << wS + << " ranks: " << wS * num_elems << "\n"; + std::cout << "Total elements per rank: " << num_elems << "\n"; + + if (data_type == AMSDType::AMS_SINGLE) { + Problem prob(num_inputs, num_outputs, sleep_msec); +#ifdef __ENABLE_MPI__ + AMSExecutor wf = AMSCreateDistributedExecutor(ams_model, + AMSDType::AMS_SINGLE, + resource, + (AMSPhysicFn)callBackSingle, + MPI_COMM_WORLD, + rId, + wS); +#else + AMSExecutor wf = AMSCreateExecutor(ams_model, + AMSDType::AMS_SINGLE, + resource, + (AMSPhysicFn)callBackSingle, + rId, + wS); +#endif + prob.ams_run(wf, resource, num_iterations, num_elems); + } else { + Problem prob(num_inputs, num_outputs, sleep_msec); +#ifdef __ENABLE_MPI__ + AMSExecutor wf = AMSCreateDistributedExecutor(ams_model, + AMSDType::AMS_DOUBLE, + resource, + (AMSPhysicFn)callBackDouble, + MPI_COMM_WORLD, + rId, + wS); +#else + AMSExecutor wf = AMSCreateExecutor(ams_model, + AMSDType::AMS_DOUBLE, + resource, + (AMSPhysicFn)callBackDouble, + rId, + wS); +#endif + prob.ams_run(wf, resource, num_iterations, num_elems); + } + + MPI_CALL(MPI_Finalize()); + return 0; +}