diff --git a/clients/drcachesim/scheduler/scheduler.cpp b/clients/drcachesim/scheduler/scheduler.cpp index e9494f44a..c757d8f9f 100644 --- a/clients/drcachesim/scheduler/scheduler.cpp +++ b/clients/drcachesim/scheduler/scheduler.cpp @@ -642,6 +642,8 @@ scheduler_tmpl_t::set_initial_schedule( std::unordered_map> &workload2inputs) { if (options_.mapping == MAP_AS_PREVIOUSLY) { + live_replay_output_count_.store(static_cast(outputs_.size()), + std::memory_order_release); if (options_.schedule_replay_istream == nullptr || options_.schedule_record_ostream != nullptr) return STATUS_ERROR_INVALID_PARAMETER; @@ -1646,8 +1648,13 @@ scheduler_tmpl_t::pick_next_input_as_previously( output_ordinal_t output, input_ordinal_t &index) { if (outputs_[output].record_index + 1 >= - static_cast(outputs_[output].record.size())) + static_cast(outputs_[output].record.size())) { + if (!outputs_[output].at_eof) { + outputs_[output].at_eof = true; + live_replay_output_count_.fetch_add(-1, std::memory_order_release); + } return eof_or_idle(output); + } const schedule_record_t &segment = outputs_[output].record[outputs_[output].record_index + 1]; index = segment.key.input; @@ -1895,14 +1902,10 @@ scheduler_tmpl_t::pick_next_input(output_ordinal_t outpu if (inputs_[index].at_eof || *inputs_[index].reader == *inputs_[index].reader_end) { VPRINT(this, 2, "next_record[%d]: input #%d at eof\n", output, index); - if (options_.schedule_record_ostream != nullptr && - prev_index != INVALID_INPUT_ORDINAL) - close_schedule_segment(output, inputs_[prev_index]); if (!inputs_[index].at_eof) mark_input_eof(inputs_[index]); index = INVALID_INPUT_ORDINAL; // Loop and pick next thread. - prev_index = INVALID_INPUT_ORDINAL; continue; } break; @@ -2276,9 +2279,13 @@ template void scheduler_tmpl_t::mark_input_eof(input_info_t &input) { + if (input.at_eof) + return; input.at_eof = true; assert(live_input_count_.load(std::memory_order_acquire) > 0); live_input_count_.fetch_add(-1, std::memory_order_release); + VPRINT(this, 2, "input %d at eof; %d live inputs left\n", input.index, + live_input_count_.load(std::memory_order_acquire)); } template @@ -2286,7 +2293,13 @@ typename scheduler_tmpl_t::stream_status_t scheduler_tmpl_t::eof_or_idle(output_ordinal_t output) { if (options_.mapping == MAP_TO_CONSISTENT_OUTPUT || - live_input_count_.load(std::memory_order_acquire) == 0) { + live_input_count_.load(std::memory_order_acquire) == 0 || + // While a full schedule recorded should have each input hit either its + // EOF or ROI end, we have a fallback to avoid hangs for possible recorded + // schedules that end an input early deliberately without an ROI. + (options_.mapping == MAP_AS_PREVIOUSLY && + live_replay_output_count_.load(std::memory_order_acquire) == 0)) { + assert(options_.mapping != MAP_AS_PREVIOUSLY || outputs_[output].at_eof); return sched_type_t::STATUS_EOF; } else { outputs_[output].waiting = true; diff --git a/clients/drcachesim/scheduler/scheduler.h b/clients/drcachesim/scheduler/scheduler.h index bf5d18089..edbb1ab99 100644 --- a/clients/drcachesim/scheduler/scheduler.h +++ b/clients/drcachesim/scheduler/scheduler.h @@ -1094,6 +1094,8 @@ template class scheduler_tmpl_t { uint64_t cur_time = 0; // Used for MAP_TO_RECORDED_OUTPUT get_output_cpuid(). int64_t as_traced_cpuid = -1; + // Used for MAP_AS_PREVIOUSLY with live_replay_output_count_. + bool at_eof = false; }; // Called just once at initialization time to set the initial input-to-output @@ -1346,6 +1348,8 @@ template class scheduler_tmpl_t { uint64_t ready_counter_ = 0; // Count of inputs not yet at eof. std::atomic live_input_count_; + // In replay mode, count of outputs not yet at the end of the replay sequence. + std::atomic live_replay_output_count_; // Map from workload,tid pair to input. struct workload_tid_t { workload_tid_t(int wl, memref_tid_t tid) diff --git a/clients/drcachesim/tests/scheduler_unit_tests.cpp b/clients/drcachesim/tests/scheduler_unit_tests.cpp index 2e111ccd2..fbf431a72 100644 --- a/clients/drcachesim/tests/scheduler_unit_tests.cpp +++ b/clients/drcachesim/tests/scheduler_unit_tests.cpp @@ -2328,6 +2328,118 @@ test_replay_timestamps() #endif // HAS_ZIP } +#ifdef HAS_ZIP +// We subclass scheduler_t to access its record struct and functions. +class test_noeof_scheduler_t : public scheduler_t { +public: + void + write_test_schedule(std::string record_fname) + { + // We duplicate test_scheduler_t but we have one input ending early before + // eof. + scheduler_t scheduler; + std::vector sched0; + sched0.emplace_back(scheduler_t::schedule_record_t::VERSION, 0, 0, 0, 0); + sched0.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 0, 0, 4, 11); + // There is a huge time gap here. + // Max numeric value means continue until EOF. + sched0.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 2, 7, + 0xffffffffffffffffUL, 91); + sched0.emplace_back(scheduler_t::schedule_record_t::FOOTER, 0, 0, 0, 0); + std::vector sched1; + sched1.emplace_back(scheduler_t::schedule_record_t::VERSION, 0, 0, 0, 0); + sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 1, 0, 4, 10); + sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 2, 0, 4, 20); + // Input 2 advances early so core 0 is no longer waiting on it but only + // the timestamp. + sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 2, 4, 7, 60); + sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 3, 0, 4, 30); + sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 0, 4, 7, 40); + sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 1, 4, 7, 50); + sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 3, 4, 7, 70); + sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 0, 7, + 0xffffffffffffffffUL, 80); + sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 1, 7, + 0xffffffffffffffffUL, 90); + // Input 3 never reaches EOF (end is exclusive: so it stops at 8 with the + // real end at 9). + sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 3, 7, 9, 110); + sched1.emplace_back(scheduler_t::schedule_record_t::FOOTER, 0, 0, 0, 0); + zipfile_ostream_t outfile(record_fname); + std::string err = outfile.open_new_component(recorded_schedule_component_name(0)); + assert(err.empty()); + if (!outfile.write(reinterpret_cast(sched0.data()), + sched0.size() * sizeof(sched0[0]))) + assert(false); + err = outfile.open_new_component(recorded_schedule_component_name(1)); + assert(err.empty()); + if (!outfile.write(reinterpret_cast(sched1.data()), + sched1.size() * sizeof(sched1[0]))) + assert(false); + } +}; +#endif + +static void +test_replay_noeof() +{ +#ifdef HAS_ZIP + std::cerr << "\n----------------\nTesting replay with no eof\n"; + static constexpr int NUM_INPUTS = 4; + static constexpr int NUM_OUTPUTS = 2; + static constexpr int NUM_INSTRS = 9; + static constexpr memref_tid_t TID_BASE = 100; + std::vector inputs[NUM_INPUTS]; + for (int i = 0; i < NUM_INPUTS; i++) { + memref_tid_t tid = TID_BASE + i; + inputs[i].push_back(make_thread(tid)); + inputs[i].push_back(make_pid(1)); + // We need a timestamp so the scheduler will find one for initial + // input processing. We do not try to duplicate the timestamp + // sequences in the stored file and just use a dummy timestamp here. + inputs[i].push_back(make_timestamp(10 + i)); + for (int j = 0; j < NUM_INSTRS; j++) + inputs[i].push_back(make_instr(42 + j * 4)); + inputs[i].push_back(make_exit(tid)); + } + + // Create a record file with timestamps requiring waiting. + // We cooperate with the test_noeof_scheduler_t class which constructs this schedule: + static const char *const CORE0_SCHED_STRING = ".AAA-------------------------CCC.__"; + static const char *const CORE1_SCHED_STRING = ".BBB.CCCCCC.DDDAAABBBDDDAAA.BBB.DD"; + std::string record_fname = "tmp_test_replay_noeof_timestamp.zip"; + test_noeof_scheduler_t test_scheduler; + test_scheduler.write_test_schedule(record_fname); + + // Replay the recorded schedule. + std::vector sched_inputs; + for (int i = 0; i < NUM_INPUTS; i++) { + memref_tid_t tid = TID_BASE + i; + std::vector readers; + readers.emplace_back(std::unique_ptr(new mock_reader_t(inputs[i])), + std::unique_ptr(new mock_reader_t()), tid); + sched_inputs.emplace_back(std::move(readers)); + } + scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_AS_PREVIOUSLY, + scheduler_t::DEPENDENCY_TIMESTAMPS, + scheduler_t::SCHEDULER_DEFAULTS, + /*verbosity=*/4); + zipfile_istream_t infile(record_fname); + sched_ops.schedule_replay_istream = &infile; + scheduler_t scheduler; + if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) != + scheduler_t::STATUS_SUCCESS) + assert(false); + std::vector sched_as_string = + run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE); + for (int i = 0; i < NUM_OUTPUTS; i++) { + std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n"; + } + assert(sched_as_string[0] == CORE0_SCHED_STRING); + assert(sched_as_string[1] == CORE1_SCHED_STRING); +#endif // HAS_ZIP +} + static void test_replay_skip() { @@ -3200,6 +3312,7 @@ test_main(int argc, const char *argv[]) test_replay(); test_replay_multi_threaded(argv[1]); test_replay_timestamps(); + test_replay_noeof(); test_replay_skip(); test_replay_limit(); test_replay_as_traced_from_file(argv[1]);