Skip to content

Commit

Permalink
i#6822 unscheduled: Add start-unscheduled support (#6851)
Browse files Browse the repository at this point in the history
Adds support for threads starting out in an "unscheduled" state. This is
accomplished by always reading ahead in each input and looking for a
TRACE_MARKER_TYPE_SYSCALL_UNSCHEDULE marker *before* the first
instruction. Normally such a marker indicates the invocation of a system
call and is after the system call instruction; for start-unscheduled
threads it is present at the system call exit at the start of the trace.

Changes the scheduler's virtual method process_next_initial_record() to
make the booleans on finding certain markers input-and-output parameters
and moves filetype marker handling and timestamp recording into the
function. This also fixes a problem where an input's initial
next_timestamp was replaced with the 2nd timestamp if a subclass read
ahead.

The extra readahead causes complexities elsewhere which are addressed:
+ The reader caches the last cpuid to use for synthetic recores on
skipping.
+ Generalizes the existing scheduler handling of readahead (the
"recorded_in_schedule" field in input_info_t) to store a count of
pre-read instructions, which will generally be either 0 or 1. Adds a new
internal interface get_instr_ordinal() to get the input reader's
instruction ordinal minus the pre-read count.

Changes raw2trace's virtual function process_marker_additionally() to
process_marker() and moves all marker processing (including timestamps,
which are not markers in the raw format) there, to better support
subclasses inserting start-unscheduled markers and deciding whether to
insert new markers either before or after pre-existing markers.

Adds a scheduler test for the new feature.

Issue: #6822
  • Loading branch information
derekbruening authored Jun 22, 2024
1 parent 44d677f commit 304faf9
Show file tree
Hide file tree
Showing 7 changed files with 288 additions and 92 deletions.
9 changes: 8 additions & 1 deletion clients/drcachesim/reader/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,9 @@ reader_t::process_input_entry()
if (first_timestamp_ == 0)
first_timestamp_ = last_timestamp_;
}
} else if (cur_ref_.marker.marker_type == TRACE_MARKER_TYPE_VERSION)
} else if (cur_ref_.marker.marker_type == TRACE_MARKER_TYPE_CPU_ID)
last_cpuid_ = cur_ref_.marker.marker_value;
else if (cur_ref_.marker.marker_type == TRACE_MARKER_TYPE_VERSION)
version_ = cur_ref_.marker.marker_value;
else if (cur_ref_.marker.marker_type == TRACE_MARKER_TYPE_FILETYPE) {
filetype_ = cur_ref_.marker.marker_value;
Expand Down Expand Up @@ -464,6 +466,11 @@ reader_t::skip_instructions_with_timestamp(uint64_t stop_instruction_count)
timestamp.addr = static_cast<addr_t>(last_timestamp_);
}
trace_entry_t cpu = {};
if (last_cpuid_ != 0) {
cpu.type = TRACE_TYPE_MARKER;
cpu.size = TRACE_MARKER_TYPE_CPU_ID;
cpu.addr = static_cast<addr_t>(last_cpuid_);
}
trace_entry_t next_instr = {};
bool prev_was_record_ord = false;
bool found_real_timestamp = false;
Expand Down
1 change: 1 addition & 0 deletions clients/drcachesim/reader/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ class reader_t : public std::iterator<std::input_iterator_tag, memref_t>,
uint64_t last_timestamp_ = 0;
uint64_t first_timestamp_ = 0;
trace_entry_t *input_entry_ = nullptr;
uint64_t last_cpuid_ = 0;
// Remember top-level headers for the memtrace_stream_t interface.
uint64_t version_ = 0;
uint64_t filetype_ = 0;
Expand Down
172 changes: 111 additions & 61 deletions clients/drcachesim/scheduler/scheduler.cpp

Large diffs are not rendered by default.

14 changes: 12 additions & 2 deletions clients/drcachesim/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -1251,7 +1251,7 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
bool at_eof = false;
uintptr_t next_timestamp = 0;
uint64_t instrs_in_quantum = 0;
bool recorded_in_schedule = false;
int instrs_pre_read = 0;
// This is a per-workload value, stored in each input for convenience.
uint64_t base_timestamp = 0;
// This equals 'options_.deps == DEPENDENCY_TIMESTAMPS', stored here for
Expand Down Expand Up @@ -1464,7 +1464,7 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
// the two bool parameters are what the return value should be based on.
virtual bool
process_next_initial_record(input_info_t &input, RecordType record,
bool found_filetype, bool found_timestamp);
bool &found_filetype, bool &found_timestamp);

// Opens up all the readers for each file in 'path' which may be a directory.
// Returns a map of the thread id of each file to its index in inputs_.
Expand Down Expand Up @@ -1612,6 +1612,11 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
bool
record_type_is_marker(RecordType record, trace_marker_type_t &type, uintptr_t &value);

// Returns false for memref_t; for trace_entry_t returns true for the _HEADER,
// _THREAD, and _PID record types.
bool
record_type_is_non_marker_header(RecordType record);

// If the given record is a timestamp, returns true and its fields.
bool
record_type_is_timestamp(RecordType record, uintptr_t &value);
Expand Down Expand Up @@ -1691,6 +1696,11 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
uint64_t
get_input_record_ordinal(output_ordinal_t output);

// Returns the input instruction ordinal taking into account queued records.
// The caller must hold the input's lock.
uint64_t
get_instr_ordinal(input_info_t &input);

// Returns the first timestamp for the current input stream interface for the
// 'output_ordinal'-th output stream.
uint64_t
Expand Down
127 changes: 123 additions & 4 deletions clients/drcachesim/tests/scheduler_unit_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3512,10 +3512,11 @@ test_replay_as_traced_from_file(const char *testdir)
static const char *const SCHED_STRING =
"Core #0: 1257602 \nCore #1: 1257600 \n"
"Core #2: 1257599 => 1257604 @ <366987,87875,13331862029895453> "
// The ordinal is really 1 ("<1,0,0>") but with the scheduler's readahead
// it becomes 2; easier to just check for that as trying to avoid readahead
// causes other problems (i#xxxx).
"(<366986,87875,13331862029895453> => <2,0,0>) \n"
// The ordinal is really 1 ("<1,0,0>") but with the scheduler's readahead we have
// more records and a timestamp; easier to just check for that as trying to avoid
// readahead causes other problems. Maybe the scheduler should have mediator
// getters for the record and timestamp that hide the readahead.
"(<366986,87875,13331862029895453> => <6,0,13331862029900572>) \n"
"Core #3: 1257596 \nCore #4: 1257603 \nCore #5: 1257601 \nCore #6: 1257598 \n";
static constexpr int NUM_OUTPUTS = 7; // Matches the actual trace's core footprint.
scheduler_t scheduler;
Expand Down Expand Up @@ -4423,6 +4424,123 @@ test_unscheduled_fallback()
}
}

static void
test_unscheduled_initially()
{
std::cerr << "\n----------------\nTesting initially-unscheduled threads\n";
static constexpr int NUM_OUTPUTS = 1;
static constexpr int BLOCK_LATENCY = 100;
static constexpr double BLOCK_SCALE = 1. / (BLOCK_LATENCY);
static constexpr uint64_t BLOCK_TIME_MAX = 500;
static constexpr memref_tid_t TID_BASE = 100;
static constexpr memref_tid_t TID_A = TID_BASE + 0;
static constexpr memref_tid_t TID_B = TID_BASE + 1;
std::vector<trace_entry_t> refs_A = {
make_thread(TID_A),
make_pid(1),
make_version(TRACE_ENTRY_VERSION),
// A has the earliest timestamp and would start.
make_timestamp(1001),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 0),
// A starts out unscheduled though.
make_marker(TRACE_MARKER_TYPE_SYSCALL_UNSCHEDULE, 0),
make_timestamp(4202),
// B makes us scheduled again.
make_instr(/*pc=*/102),
make_exit(TID_A),
};
std::vector<trace_entry_t> refs_B = {
make_thread(TID_B),
make_pid(1),
make_version(TRACE_ENTRY_VERSION),
// B runs 2nd by timestamp.
make_timestamp(3001),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 0),
make_instr(/*pc=*/200),
make_timestamp(3002),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 0),
// B makes a long-latency blocking syscall, testing whether
// A is really unscheduled.
make_marker(TRACE_MARKER_TYPE_SYSCALL, 999),
make_marker(TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL, 0),
make_timestamp(7002),
make_instr(/*pc=*/201),
// B makes A no longer unscheduled.
make_marker(TRACE_MARKER_TYPE_SYSCALL, 999),
make_marker(TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL, 0),
make_marker(TRACE_MARKER_TYPE_SYSCALL_SCHEDULE, TID_A),
make_timestamp(7021),
make_instr(/*pc=*/202),
make_exit(TID_B),
};
{
// Test the defaults with direct switches enabled.
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_A)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_A);
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_B)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_B);
// We have an idle period while B is blocked and A unscheduled.
static const char *const CORE0_SCHED_STRING =
"...B.....________________________________________B....B......A.";

std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(std::move(readers));
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
/*verbosity=*/3);
sched_ops.quantum_unit = scheduler_t::QUANTUM_TIME;
sched_ops.blocking_switch_threshold = BLOCK_LATENCY;
sched_ops.block_time_scale = BLOCK_SCALE;
sched_ops.block_time_max = BLOCK_TIME_MAX;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, std::move(sched_ops)) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::string> sched_as_string =
run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE, /*send_time=*/true);
for (int i = 0; i < NUM_OUTPUTS; i++) {
std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
}
assert(sched_as_string[0] == CORE0_SCHED_STRING);
}
{
// Test disabling direct switches.
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_A)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_A);
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_B)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_B);
// A runs first as it being unscheduled is ignored.
static const char *const CORE0_SCHED_STRING =
".....A....B.....________________________________________B....B.";

std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(std::move(readers));
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
/*verbosity=*/3);
// We use our mock's time==instruction count for a deterministic result.
sched_ops.quantum_unit = scheduler_t::QUANTUM_TIME;
sched_ops.blocking_switch_threshold = BLOCK_LATENCY;
sched_ops.block_time_scale = BLOCK_SCALE;
sched_ops.block_time_max = BLOCK_TIME_MAX;
sched_ops.honor_direct_switches = false;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, std::move(sched_ops)) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::string> sched_as_string =
run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE, /*send_time=*/true);
for (int i = 0; i < NUM_OUTPUTS; i++) {
std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
}
assert(sched_as_string[0] == CORE0_SCHED_STRING);
}
}

static void
test_kernel_switch_sequences()
{
Expand Down Expand Up @@ -4935,6 +5053,7 @@ test_main(int argc, const char *argv[])
test_direct_switch();
test_unscheduled();
test_unscheduled_fallback();
test_unscheduled_initially();
test_kernel_switch_sequences();
test_random_schedule();
test_record_scheduler();
Expand Down
45 changes: 27 additions & 18 deletions clients/drcachesim/tracer/raw2trace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -748,9 +748,7 @@ raw2trace_t::process_offline_entry(raw2trace_thread_data_t *tdata,
return false;
trace_marker_type_t marker_type =
static_cast<trace_marker_type_t>(in_entry->extended.valueB);
buf += trace_metadata_writer_t::write_marker(buf, marker_type, marker_val);
if (!process_marker_additionally(tdata, marker_type, marker_val, buf,
flush_decode_cache))
if (!process_marker(tdata, marker_type, marker_val, buf, flush_decode_cache))
return false;
// If there is currently a delayed branch that has not been emitted yet,
// delay most markers since intra-block markers can cause issues with
Expand Down Expand Up @@ -836,11 +834,21 @@ raw2trace_t::process_offline_entry(raw2trace_thread_data_t *tdata,
}

bool
raw2trace_t::process_marker_additionally(raw2trace_thread_data_t *tdata,
trace_marker_type_t marker_type,
uintptr_t marker_val, byte *&buf,
DR_PARAM_OUT bool *flush_decode_cache)
{
raw2trace_t::process_marker(raw2trace_thread_data_t *tdata,
trace_marker_type_t marker_type, uintptr_t marker_val,
byte *&buf, DR_PARAM_OUT bool *flush_decode_cache)
{
if (marker_type == TRACE_MARKER_TYPE_TIMESTAMP) {
uint64 stamp = static_cast<uint64>(marker_val);
VPRINT(2, "Thread %u timestamp 0x" ZHEX64_FORMAT_STRING "\n",
static_cast<uint>(tdata->tid), stamp);
accumulate_to_statistic(tdata, RAW2TRACE_STAT_EARLIEST_TRACE_TIMESTAMP, stamp);
accumulate_to_statistic(tdata, RAW2TRACE_STAT_LATEST_TRACE_TIMESTAMP, stamp);
tdata->last_timestamp_ = stamp;
buf += trace_metadata_writer_t::write_timestamp(buf, marker_val);
return true;
}
buf += trace_metadata_writer_t::write_marker(buf, marker_type, marker_val);
if (marker_type == TRACE_MARKER_TYPE_KERNEL_EVENT) {
log(4, "Signal/exception between bbs\n");
// An rseq side exit may next hit a signal which is then the
Expand Down Expand Up @@ -1293,16 +1301,17 @@ raw2trace_t::process_next_thread_buffer(raw2trace_thread_data_t *tdata,
// when it calls get_next_entry() on its own.
offline_entry_t entry = *in_entry;
if (entry.timestamp.type == OFFLINE_TYPE_TIMESTAMP) {
VPRINT(2, "Thread %u timestamp 0x" ZHEX64_FORMAT_STRING "\n",
(uint)tdata->tid, (uint64)entry.timestamp.usec);
accumulate_to_statistic(tdata, RAW2TRACE_STAT_EARLIEST_TRACE_TIMESTAMP,
static_cast<uint64>(entry.timestamp.usec));
accumulate_to_statistic(tdata, RAW2TRACE_STAT_LATEST_TRACE_TIMESTAMP,
static_cast<uint64>(entry.timestamp.usec));
byte *buf = buf_base +
trace_metadata_writer_t::write_timestamp(buf_base,
(uintptr_t)entry.timestamp.usec);
tdata->last_timestamp_ = entry.timestamp.usec;
// Give subclasses a chance for further action on a timestamp by
// putting our processing as thought it were a marker at the raw level.
bool flush_decode_cache = false;
byte *buf = buf_base;
uintptr_t value = static_cast<uintptr_t>(entry.timestamp.usec);
if (!process_marker(tdata, TRACE_MARKER_TYPE_TIMESTAMP, value, buf,
&flush_decode_cache)) {
return false;
}
if (flush_decode_cache)
decode_cache_[tdata->worker].clear();
if ((uint)(buf - buf_base) >= WRITE_BUFFER_SIZE) {
tdata->error = "Too many entries";
return false;
Expand Down
12 changes: 6 additions & 6 deletions clients/drcachesim/tracer/raw2trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -1101,14 +1101,14 @@ class raw2trace_t {
DR_PARAM_OUT bool *flush_decode_cache);

/**
* Performs any additional actions for the marker "marker_type" with value
* "marker_val", beyond writing out a marker record. New records can be written to
* "buf". Returns whether successful.
* Performs processing actions for the marker "marker_type" with value
* "marker_val", including writing out a marker record. Further records can also
* be written to "buf". Returns whether successful.
*/
virtual bool
process_marker_additionally(raw2trace_thread_data_t *tdata,
trace_marker_type_t marker_type, uintptr_t marker_val,
byte *&buf, DR_PARAM_OUT bool *flush_decode_cache);
process_marker(raw2trace_thread_data_t *tdata, trace_marker_type_t marker_type,
uintptr_t marker_val, byte *&buf,
DR_PARAM_OUT bool *flush_decode_cache);
/**
* Read the header of a thread, by calling get_next_entry() successively to
* populate the header values. The timestamp field is populated only
Expand Down

0 comments on commit 304faf9

Please sign in to comment.