Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions clients/drcachesim/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::set_initial_schedule(
std::unordered_map<int, std::vector<int>> &workload2inputs)
{
if (options_.mapping == MAP_AS_PREVIOUSLY) {
live_replay_output_count_.store(static_cast<int>(outputs_.size()),
std::memory_order_release);
if (options_.schedule_replay_istream == nullptr ||
options_.schedule_record_ostream != nullptr)
return STATUS_ERROR_INVALID_PARAMETER;
Expand Down Expand Up @@ -1646,8 +1648,13 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
output_ordinal_t output, input_ordinal_t &index)
{
if (outputs_[output].record_index + 1 >=
static_cast<int>(outputs_[output].record.size()))
static_cast<int>(outputs_[output].record.size())) {
if (!outputs_[output].at_eof) {
outputs_[output].at_eof = true;
live_replay_output_count_.fetch_add(-1, std::memory_order_release);
}
return eof_or_idle(output);
}
const schedule_record_t &segment =
outputs_[output].record[outputs_[output].record_index + 1];
index = segment.key.input;
Expand Down Expand Up @@ -1895,14 +1902,10 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
if (inputs_[index].at_eof ||
*inputs_[index].reader == *inputs_[index].reader_end) {
VPRINT(this, 2, "next_record[%d]: input #%d at eof\n", output, index);
if (options_.schedule_record_ostream != nullptr &&
prev_index != INVALID_INPUT_ORDINAL)
close_schedule_segment(output, inputs_[prev_index]);
if (!inputs_[index].at_eof)
mark_input_eof(inputs_[index]);
index = INVALID_INPUT_ORDINAL;
// Loop and pick next thread.
prev_index = INVALID_INPUT_ORDINAL;
continue;
}
break;
Expand Down Expand Up @@ -2276,17 +2279,27 @@ template <typename RecordType, typename ReaderType>
void
scheduler_tmpl_t<RecordType, ReaderType>::mark_input_eof(input_info_t &input)
{
if (input.at_eof)
return;
input.at_eof = true;
assert(live_input_count_.load(std::memory_order_acquire) > 0);
live_input_count_.fetch_add(-1, std::memory_order_release);
VPRINT(this, 2, "input %d at eof; %d live inputs left\n", input.index,
live_input_count_.load(std::memory_order_acquire));
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::eof_or_idle(output_ordinal_t output)
{
if (options_.mapping == MAP_TO_CONSISTENT_OUTPUT ||
live_input_count_.load(std::memory_order_acquire) == 0) {
live_input_count_.load(std::memory_order_acquire) == 0 ||
// While a full schedule recorded should have each input hit either its
// EOF or ROI end, we have a fallback to avoid hangs for possible recorded
// schedules that end an input early deliberately without an ROI.
(options_.mapping == MAP_AS_PREVIOUSLY &&
live_replay_output_count_.load(std::memory_order_acquire) == 0)) {
assert(options_.mapping != MAP_AS_PREVIOUSLY || outputs_[output].at_eof);
return sched_type_t::STATUS_EOF;
} else {
outputs_[output].waiting = true;
Expand Down
4 changes: 4 additions & 0 deletions clients/drcachesim/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -1094,6 +1094,8 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
uint64_t cur_time = 0;
// Used for MAP_TO_RECORDED_OUTPUT get_output_cpuid().
int64_t as_traced_cpuid = -1;
// Used for MAP_AS_PREVIOUSLY with live_replay_output_count_.
bool at_eof = false;
};

// Called just once at initialization time to set the initial input-to-output
Expand Down Expand Up @@ -1346,6 +1348,8 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
uint64_t ready_counter_ = 0;
// Count of inputs not yet at eof.
std::atomic<int> live_input_count_;
// In replay mode, count of outputs not yet at the end of the replay sequence.
std::atomic<int> live_replay_output_count_;
// Map from workload,tid pair to input.
struct workload_tid_t {
workload_tid_t(int wl, memref_tid_t tid)
Expand Down
113 changes: 113 additions & 0 deletions clients/drcachesim/tests/scheduler_unit_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2328,6 +2328,118 @@ test_replay_timestamps()
#endif // HAS_ZIP
}

#ifdef HAS_ZIP
// We subclass scheduler_t to access its record struct and functions.
class test_noeof_scheduler_t : public scheduler_t {
public:
void
write_test_schedule(std::string record_fname)
{
// We duplicate test_scheduler_t but we have one input ending early before
// eof.
scheduler_t scheduler;
std::vector<schedule_record_t> sched0;
sched0.emplace_back(scheduler_t::schedule_record_t::VERSION, 0, 0, 0, 0);
sched0.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 0, 0, 4, 11);
// There is a huge time gap here.
// Max numeric value means continue until EOF.
sched0.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 2, 7,
0xffffffffffffffffUL, 91);
sched0.emplace_back(scheduler_t::schedule_record_t::FOOTER, 0, 0, 0, 0);
std::vector<schedule_record_t> sched1;
sched1.emplace_back(scheduler_t::schedule_record_t::VERSION, 0, 0, 0, 0);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 1, 0, 4, 10);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 2, 0, 4, 20);
// Input 2 advances early so core 0 is no longer waiting on it but only
// the timestamp.
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 2, 4, 7, 60);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 3, 0, 4, 30);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 0, 4, 7, 40);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 1, 4, 7, 50);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 3, 4, 7, 70);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 0, 7,
0xffffffffffffffffUL, 80);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 1, 7,
0xffffffffffffffffUL, 90);
// Input 3 never reaches EOF (end is exclusive: so it stops at 8 with the
// real end at 9).
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 3, 7, 9, 110);
sched1.emplace_back(scheduler_t::schedule_record_t::FOOTER, 0, 0, 0, 0);
zipfile_ostream_t outfile(record_fname);
std::string err = outfile.open_new_component(recorded_schedule_component_name(0));
assert(err.empty());
if (!outfile.write(reinterpret_cast<char *>(sched0.data()),
sched0.size() * sizeof(sched0[0])))
assert(false);
err = outfile.open_new_component(recorded_schedule_component_name(1));
assert(err.empty());
if (!outfile.write(reinterpret_cast<char *>(sched1.data()),
sched1.size() * sizeof(sched1[0])))
assert(false);
}
};
#endif

static void
test_replay_noeof()
{
#ifdef HAS_ZIP
std::cerr << "\n----------------\nTesting replay with no eof\n";
static constexpr int NUM_INPUTS = 4;
static constexpr int NUM_OUTPUTS = 2;
static constexpr int NUM_INSTRS = 9;
static constexpr memref_tid_t TID_BASE = 100;
std::vector<trace_entry_t> inputs[NUM_INPUTS];
for (int i = 0; i < NUM_INPUTS; i++) {
memref_tid_t tid = TID_BASE + i;
inputs[i].push_back(make_thread(tid));
inputs[i].push_back(make_pid(1));
// We need a timestamp so the scheduler will find one for initial
// input processing. We do not try to duplicate the timestamp
// sequences in the stored file and just use a dummy timestamp here.
inputs[i].push_back(make_timestamp(10 + i));
for (int j = 0; j < NUM_INSTRS; j++)
inputs[i].push_back(make_instr(42 + j * 4));
inputs[i].push_back(make_exit(tid));
}

// Create a record file with timestamps requiring waiting.
// We cooperate with the test_noeof_scheduler_t class which constructs this schedule:
static const char *const CORE0_SCHED_STRING = ".AAA-------------------------CCC.__";
static const char *const CORE1_SCHED_STRING = ".BBB.CCCCCC.DDDAAABBBDDDAAA.BBB.DD";
std::string record_fname = "tmp_test_replay_noeof_timestamp.zip";
test_noeof_scheduler_t test_scheduler;
test_scheduler.write_test_schedule(record_fname);

// Replay the recorded schedule.
std::vector<scheduler_t::input_workload_t> sched_inputs;
for (int i = 0; i < NUM_INPUTS; i++) {
memref_tid_t tid = TID_BASE + i;
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs[i])),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
sched_inputs.emplace_back(std::move(readers));
}
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_AS_PREVIOUSLY,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
/*verbosity=*/4);
zipfile_istream_t infile(record_fname);
sched_ops.schedule_replay_istream = &infile;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::string> sched_as_string =
run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE);
for (int i = 0; i < NUM_OUTPUTS; i++) {
std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
}
assert(sched_as_string[0] == CORE0_SCHED_STRING);
assert(sched_as_string[1] == CORE1_SCHED_STRING);
#endif // HAS_ZIP
}

static void
test_replay_skip()
{
Expand Down Expand Up @@ -3200,6 +3312,7 @@ test_main(int argc, const char *argv[])
test_replay();
test_replay_multi_threaded(argv[1]);
test_replay_timestamps();
test_replay_noeof();
test_replay_skip();
test_replay_limit();
test_replay_as_traced_from_file(argv[1]);
Expand Down