@@ -631,6 +631,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
631631 }
632632 }
633633 }
634+ VPRINT (this , 1 , " %zu inputs\n " , inputs_.size ());
635+ live_input_count_.store (static_cast <int >(inputs_.size ()), std::memory_order_release);
634636 return set_initial_schedule (workload2inputs);
635637}
636638
@@ -1313,7 +1315,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::advance_region_of_interest(
13131315 input.cur_region );
13141316 if (input.cur_region >= static_cast <int >(input.regions_of_interest .size ())) {
13151317 if (input.at_eof )
1316- return sched_type_t ::STATUS_EOF ;
1318+ return eof_or_idle (output) ;
13171319 else {
13181320 // We let the user know we're done.
13191321 if (options_.schedule_record_ostream != nullptr ) {
@@ -1329,7 +1331,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::advance_region_of_interest(
13291331 return status;
13301332 }
13311333 input.queue .push_back (create_thread_exit (input.tid ));
1332- input. at_eof = true ;
1334+ mark_input_eof ( input) ;
13331335 return sched_type_t ::STATUS_SKIPPED;
13341336 }
13351337 }
@@ -1408,7 +1410,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::skip_instructions(output_ordinal_t out
14081410 if (*input.reader == *input.reader_end ) {
14091411 // Raise error because the input region is out of bounds.
14101412 VPRINT (this , 2 , " skip_instructions: input=%d skip out of bounds\n " , input.index );
1411- input. at_eof = true ;
1413+ mark_input_eof ( input) ;
14121414 return sched_type_t ::STATUS_REGION_INVALID;
14131415 }
14141416 input.in_cur_region = true ;
@@ -1645,7 +1647,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
16451647{
16461648 if (outputs_[output].record_index + 1 >=
16471649 static_cast <int >(outputs_[output].record .size ()))
1648- return sched_type_t ::STATUS_EOF ;
1650+ return eof_or_idle (output) ;
16491651 const schedule_record_t &segment =
16501652 outputs_[output].record [outputs_[output].record_index + 1 ];
16511653 index = segment.key .input ;
@@ -1681,6 +1683,11 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
16811683 // XXX i#5843: We may want to provide a kernel-mediated wait
16821684 // feature so a multi-threaded simulator doesn't have to do a
16831685 // spinning poll loop.
1686+ // XXX i#5843: For replaying a schedule as it was traced with
1687+ // MAP_TO_RECORDED_OUTPUT there may have been true idle periods during
1688+ // tracing where some other process than the traced workload was
1689+ // scheduled on a core. If we could identify those, we should return
1690+ // STATUS_IDLE rather than STATUS_WAIT.
16841691 VPRINT (this , 3 , " next_record[%d]: waiting for input %d instr #%" PRId64 " \n " ,
16851692 output, index, segment.start_instruction );
16861693 // Give up this input and go into a wait state.
@@ -1719,7 +1726,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
17191726 // queued candidate record, if any.
17201727 clear_input_queue (inputs_[index]);
17211728 inputs_[index].queue .push_back (create_thread_exit (inputs_[index].tid ));
1722- inputs_[index]. at_eof = true ;
1729+ mark_input_eof ( inputs_[index]) ;
17231730 VPRINT (this , 2 , " early end for input %d\n " , index);
17241731 // We're done with this entry but we need the queued record to be read,
17251732 // so we do not move past the entry.
@@ -1773,7 +1780,11 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
17731780 const schedule_record_t &segment =
17741781 outputs_[output].record [outputs_[output].record_index ];
17751782 int input = segment.key .input ;
1776- VPRINT (this , res == sched_type_t ::STATUS_WAIT ? 3 : 2 ,
1783+ VPRINT (this ,
1784+ (res == sched_type_t ::STATUS_IDLE ||
1785+ res == sched_type_t ::STATUS_WAIT)
1786+ ? 3
1787+ : 2 ,
17771788 " next_record[%d]: replay segment in=%d (@%" PRId64
17781789 " ) type=%d start=%" PRId64 " end=%" PRId64 " \n " ,
17791790 output, input,
@@ -1819,10 +1830,10 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
18191830 // We found a direct switch target above.
18201831 } else if (ready_queue_empty ()) {
18211832 if (prev_index == INVALID_INPUT_ORDINAL)
1822- return sched_type_t ::STATUS_EOF ;
1833+ return eof_or_idle (output) ;
18231834 std::lock_guard<std::mutex> lock (*inputs_[prev_index].lock );
18241835 if (inputs_[prev_index].at_eof )
1825- return sched_type_t ::STATUS_EOF ;
1836+ return eof_or_idle (output) ;
18261837 else
18271838 index = prev_index; // Go back to prior.
18281839 } else {
@@ -1836,7 +1847,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
18361847 }
18371848 input_info_t *queue_next = pop_from_ready_queue (output);
18381849 if (queue_next == nullptr )
1839- return sched_type_t ::STATUS_EOF ;
1850+ return eof_or_idle (output) ;
18401851 index = queue_next->index ;
18411852 }
18421853 } else if (options_.deps == DEPENDENCY_TIMESTAMPS) {
@@ -1850,7 +1861,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
18501861 }
18511862 }
18521863 if (index < 0 )
1853- return sched_type_t ::STATUS_EOF ;
1864+ return eof_or_idle (output) ;
18541865 VPRINT (this , 2 ,
18551866 " next_record[%d]: advancing to timestamp %" PRIu64
18561867 " == input #%d\n " ,
@@ -1883,14 +1894,15 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
18831894 std::lock_guard<std::mutex> lock (*inputs_[index].lock );
18841895 if (inputs_[index].at_eof ||
18851896 *inputs_[index].reader == *inputs_[index].reader_end ) {
1886- VPRINT (this , 2 , " next_record[%d]: local index %d == input #%d at eof\n " ,
1887- output, outputs_[output].input_indices_index , index);
1897+ VPRINT (this , 2 , " next_record[%d]: input #%d at eof\n " , output, index);
18881898 if (options_.schedule_record_ostream != nullptr &&
18891899 prev_index != INVALID_INPUT_ORDINAL)
18901900 close_schedule_segment (output, inputs_[prev_index]);
1891- inputs_[index].at_eof = true ;
1901+ if (!inputs_[index].at_eof )
1902+ mark_input_eof (inputs_[index]);
18921903 index = INVALID_INPUT_ORDINAL;
18931904 // Loop and pick next thread.
1905+ prev_index = INVALID_INPUT_ORDINAL;
18941906 continue ;
18951907 }
18961908 break ;
@@ -1911,7 +1923,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
19111923 // check for quantum end.
19121924 outputs_[output].cur_time = cur_time; // Invalid values are checked below.
19131925 if (!outputs_[output].active )
1914- return sched_type_t ::STATUS_WAIT ;
1926+ return sched_type_t ::STATUS_IDLE ;
19151927 if (outputs_[output].waiting ) {
19161928 VPRINT (this , 5 , " next_record[%d]: need new input (cur=waiting)\n " , output);
19171929 sched_type_t ::stream_status_t res = pick_next_input (output, true );
@@ -1922,7 +1934,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
19221934 if (outputs_[output].cur_input < 0 ) {
19231935 // This happens with more outputs than inputs. For non-empty outputs we
19241936 // require cur_input to be set to >=0 during init().
1925- return sched_type_t ::STATUS_EOF ;
1937+ return eof_or_idle (output) ;
19261938 }
19271939 input = &inputs_[outputs_[output].cur_input ];
19281940 auto lock = std::unique_lock<std::mutex>(*input->lock );
@@ -1970,6 +1982,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
19701982 input->needs_advance = true ;
19711983 }
19721984 if (input->at_eof || *input->reader == *input->reader_end ) {
1985+ if (!input->at_eof )
1986+ mark_input_eof (*input);
19731987 lock.unlock ();
19741988 VPRINT (this , 5 , " next_record[%d]: need new input (cur=%d eof)\n " , output,
19751989 input->index );
@@ -1998,6 +2012,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
19982012 if (outputs_[output].record_index >=
19992013 static_cast <int >(outputs_[output].record .size ())) {
20002014 // We're on the last record.
2015+ VPRINT (this , 4 , " next_record[%d]: on last record\n " , output);
20012016 } else if (outputs_[output].record [outputs_[output].record_index ].type ==
20022017 schedule_record_t ::SKIP) {
20032018 VPRINT (this , 5 , " next_record[%d]: need new input after skip\n " , output);
@@ -2257,6 +2272,28 @@ scheduler_tmpl_t<RecordType, ReaderType>::stop_speculation(output_ordinal_t outp
22572272 return sched_type_t ::STATUS_OK;
22582273}
22592274
2275+ template <typename RecordType, typename ReaderType>
2276+ void
2277+ scheduler_tmpl_t <RecordType, ReaderType>::mark_input_eof(input_info_t &input)
2278+ {
2279+ input.at_eof = true ;
2280+ assert (live_input_count_.load (std::memory_order_acquire) > 0 );
2281+ live_input_count_.fetch_add (-1 , std::memory_order_release);
2282+ }
2283+
2284+ template <typename RecordType, typename ReaderType>
2285+ typename scheduler_tmpl_t <RecordType, ReaderType>::stream_status_t
2286+ scheduler_tmpl_t <RecordType, ReaderType>::eof_or_idle(output_ordinal_t output)
2287+ {
2288+ if (options_.mapping == MAP_TO_CONSISTENT_OUTPUT ||
2289+ live_input_count_.load (std::memory_order_acquire) == 0 ) {
2290+ return sched_type_t ::STATUS_EOF;
2291+ } else {
2292+ outputs_[output].waiting = true ;
2293+ return sched_type_t ::STATUS_IDLE;
2294+ }
2295+ }
2296+
22602297template <typename RecordType, typename ReaderType>
22612298typename scheduler_tmpl_t <RecordType, ReaderType>::stream_status_t
22622299scheduler_tmpl_t <RecordType, ReaderType>::set_output_active(output_ordinal_t output,
0 commit comments