Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FW: Implement cpu jump during test execution #501

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
205 changes: 204 additions & 1 deletion framework/sandstone.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@

#include "cpu_features.h"
#include "forkfd.h"
#include "futex.h"

#include "sandstone.h"
#include "sandstone_p.h"
Expand All @@ -67,6 +68,7 @@
#include "sandstone_tests.h"
#include "sandstone_utils.h"
#include "topology.h"
#include "gettid.h"

#if SANDSTONE_SSL_BUILD
# include "sandstone_ssl.h"
Expand Down Expand Up @@ -155,6 +157,7 @@ enum {
service_option,
shortened_runtime_option,
strict_runtime_option,
synchronize_threads_option,
syslog_runtime_option,
temperature_threshold_option,
test_delay_option,
Expand Down Expand Up @@ -650,6 +653,174 @@ inline void test_the_test_data<true>::test_tests_finish(const struct test *the_t
#undef maybe_log_error
}

namespace {
class BarrierThreadSynchronization : public SandstoneThreadSynchronizationBase
{
public:
struct alignas(64) AlignedAtomicInt : std::atomic_int {
using std::atomic_int::atomic;
};

void init() noexcept override;
void synchronize() noexcept override { do_sync(); }
virtual void do_wait(std::atomic_int &, int) noexcept;
virtual void do_wake(std::atomic_int &) noexcept;
void do_sync() noexcept;
void do_cpu_jmp() noexcept;

std::array<AlignedAtomicInt, 2> phases;
AlignedAtomicInt phase_idx = 0;
std::array<AlignedAtomicInt, 32> threads_id; // Fix, don't use hardcoded number
};

void BarrierThreadSynchronization::init() noexcept
{
phases[0].store(num_cpus(), std::memory_order_relaxed);
phases[1].store(INT_MAX, std::memory_order_relaxed);
phase_idx.store(0, std::memory_order_relaxed);
}

void BarrierThreadSynchronization::do_sync() noexcept
{
int idx = phase_idx.load(std::memory_order_relaxed);
auto &phase = phases[idx];

// are we the last to arrive?
int remaining = phase.fetch_add(-1, std::memory_order_relaxed) - 1;
assert(remaining >= 0);
assert(remaining < num_cpus());
// store id
threads_id[thread_num].store(gettid());
if (remaining > 0) {
// not the last, so wait
return do_wait(phase, remaining);
}

// yes, set up the next phase
idx ^= 1;
phases[idx].store(num_cpus(), std::memory_order_relaxed);
phase_idx.store(idx, std::memory_order_relaxed);

// change logical processor execution
do_cpu_jmp();

do_wake(phase);
}

void BarrierThreadSynchronization::do_wait(std::atomic_int &phase, int) noexcept
{
while (phase.load(std::memory_order_relaxed) >= 0) {
// no pause
}
}

void BarrierThreadSynchronization::do_wake(std::atomic_int &phase) noexcept
{
// release the other threads
phase.store(-1, std::memory_order_relaxed);
}

void BarrierThreadSynchronization::do_cpu_jmp() noexcept
{

int n_cpus = num_cpus();
int cpu_0 = cpu_info[0].cpu_number;
for (int i=0; i<n_cpus; i++) {
int next_index = (i+1) % n_cpus;
int thread_id = threads_id[i].load();
int next_cpu = (next_index == 0) ? cpu_0 : cpu_info[next_index].cpu_number;
//int next_cpu = i;
if (pin_to_logical_processor(LogicalProcessor(next_cpu), current_test->id, thread_id)) {
// update cpu_info?
log_warning("thread %i running on cpu %i", thread_id, next_cpu); // remove, only for debug
}
}
}

class FutexThreadSynchronization : public BarrierThreadSynchronization
{
public:
void do_wait(std::atomic_int &phase, int remaining) noexcept override;
void do_wake(std::atomic_int &phase) noexcept override;
};

void FutexThreadSynchronization::do_wait(std::atomic_int &phase, int remaining) noexcept
{
while (futex_wait(&phase, remaining) != 0) {
remaining = phase.load(std::memory_order_relaxed);
if (remaining < 0)
return;
}
}

void FutexThreadSynchronization::do_wake(std::atomic_int &phase) noexcept
{
BarrierThreadSynchronization::do_wake(phase);
futex_wake_all(&phase);
}

#ifdef __x86_64__
#pragma GCC push_options
#pragma GCC target("waitpkg")

static constexpr int WaitpkgWaitState = 1; // C0.1
void check_waitpkg()
{
if (cpu_has_feature(cpu_feature_waitpkg)) {
// test that the OS has enabled this
uint64_t current_tsc = __rdtsc();
if (_tpause(WaitpkgWaitState, current_tsc + 1) == 0)
return;
}

fprintf(stderr, "%s: waitpkg is not supported on this CPU or this OS.\n",
program_invocation_name);
exit(EX_OSERR);
}

class TPauseThreadSynchronization : public BarrierThreadSynchronization
{
public:
TPauseThreadSynchronization() { check_waitpkg(); }
void do_wait(std::atomic_int &phase, int remaining) noexcept override;
void do_wake(std::atomic_int &phase) noexcept override;

std::atomic<uint64_t> tsc;
};

void TPauseThreadSynchronization::do_wait(std::atomic_int &phase, int remaining) noexcept
{
BarrierThreadSynchronization::do_wait(phase, remaining);
_tpause(WaitpkgWaitState, tsc.load(std::memory_order_relaxed));
}

void TPauseThreadSynchronization::do_wake(std::atomic_int &phase) noexcept
{
uint64_t target_tsc = __rdtsc() + 16;
tsc.store(target_tsc, std::memory_order_relaxed);
BarrierThreadSynchronization::do_wake(phase);
_tpause(WaitpkgWaitState, target_tsc);
}

class UMWaitThreadSynchronization : public BarrierThreadSynchronization
{
public:
UMWaitThreadSynchronization() { check_waitpkg(); }
void do_wait(std::atomic_int &phase, int remaining) noexcept override;
};

void UMWaitThreadSynchronization::do_wait(std::atomic_int &phase, int remaining) noexcept
{
_umonitor(&phase); // arm the monitor first
if (phase.load(std::memory_order_relaxed) < 0)
return;
_umwait(WaitpkgWaitState, __rdtsc() + 128);
}

#pragma GCC pop_options
#endif
} // unnamed namespace

static ShortDuration test_duration()
{
/* global (-t) option overrides this all */
Expand Down Expand Up @@ -746,7 +917,12 @@ int test_time_condition(const struct test *the_test) noexcept
if (max_loop_count_exceeded(the_test))
return 0; // end the test if max loop count exceeded

return !wallclock_deadline_has_expired(sApp->shmem->current_test_endtime);
if (wallclock_deadline_has_expired(sApp->shmem->current_test_endtime))
return 0;

if (sApp->thread_synchronization)
sApp->thread_synchronization->synchronize();
return 1;
}

// Creates a string containing all socket temperatures like: "P0:30oC P2:45oC"
Expand Down Expand Up @@ -899,6 +1075,10 @@ int test_run_wrapper_function(const struct test *test, int thread_number)

void test_loop_start() noexcept
{
#ifdef __x86_64__
sApp->test_thread_data(thread_num)->last_tsc = __rdtsc();
#endif

using namespace AssemblyMarker;
assembly_marker<TestLoop, Start>();
}
Expand Down Expand Up @@ -1764,6 +1944,8 @@ static TestResult child_run(/*nonconst*/ struct test *test, int child_number)
signals_init_child();
debug_init_child();
}
if (sApp->thread_synchronization)
sApp->thread_synchronization->init();

TestResult state = TestResult::Passed;

Expand Down Expand Up @@ -3132,6 +3314,7 @@ int main(int argc, char **argv)
{ "service", no_argument, nullptr, service_option },
{ "shorten-runtime", required_argument, nullptr, shortened_runtime_option },
{ "strict-runtime", no_argument, nullptr, strict_runtime_option },
{ "synchronize-threads", required_argument, nullptr, synchronize_threads_option },
{ "syslog", no_argument, nullptr, syslog_runtime_option },
{ "temperature-threshold", required_argument, nullptr, temperature_threshold_option },
{ "test-delay", required_argument, nullptr, test_delay_option },
Expand Down Expand Up @@ -3398,6 +3581,26 @@ int main(int argc, char **argv)
case strict_runtime_option:
sApp->shmem->use_strict_runtime = true;
break;
case synchronize_threads_option:
if (std::string_view(optarg) == "barrier") {
sApp->thread_synchronization = new BarrierThreadSynchronization;
} else if (std::string_view(optarg) == "futex") {
sApp->thread_synchronization = new FutexThreadSynchronization;
#ifdef __x86_64__
} else if (std::string_view(optarg) == "tpause") {
sApp->thread_synchronization = new TPauseThreadSynchronization;
} else if (std::string_view(optarg) == "umwait") {
sApp->thread_synchronization = new UMWaitThreadSynchronization;
#endif
} else {
fprintf(stderr, "%s: invalid option --synchronize-threads=%s. Valid options are: barrier, futex"
#ifdef __x86_64__
", tpause, umwait"
#endif
"\n", argv[0], optarg);
return EX_USAGE;
}
break;
case syslog_runtime_option:
sApp->syslog_ident = program_invocation_name;
break;
Expand Down
12 changes: 12 additions & 0 deletions framework/sandstone_p.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,15 @@ struct alignas(64) Test : Common
/* Thread's effective CPU frequency during execution */
double effective_freq_mhz;

/* Last TSC reading */
uint64_t last_tsc;

void init()
{
Common::init();
inner_loop_count = inner_loop_count_at_fail = 0;
effective_freq_mhz = 0.0;
last_tsc = 0;
}
};
} // namespace PerThreadData
Expand Down Expand Up @@ -317,6 +321,13 @@ class LoggingStream
friend LoggingStream logging_user_messages_stream(int thread_num, int level);
};

struct SandstoneThreadSynchronizationBase
{
virtual ~SandstoneThreadSynchronizationBase() = default;
virtual void init() noexcept = 0;
virtual void synchronize() noexcept = 0;
};

struct SandstoneApplication : public InterruptMonitor, public test_the_test_data<SandstoneConfig::Debug>
{
enum class OutputFormat : int8_t {
Expand Down Expand Up @@ -435,6 +446,7 @@ struct SandstoneApplication : public InterruptMonitor, public test_the_test_data
void select_main_thread(int slice);

SandstoneBackgroundScan background_scan;
SandstoneThreadSynchronizationBase *thread_synchronization = nullptr;

private:
SandstoneApplication() = default;
Expand Down
5 changes: 3 additions & 2 deletions framework/sysdeps/linux/cpu_affinity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ LogicalProcessorSet ambient_logical_processor_set()
return result;
}

bool pin_to_logical_processor(LogicalProcessor n, const char *thread_name)
// TODO: Do the same for the others
bool pin_to_logical_processor(LogicalProcessor n, const char *thread_name, pid_t thread_id)
{
set_thread_name(thread_name);
if (n == LogicalProcessor(-1))
Expand All @@ -54,7 +55,7 @@ bool pin_to_logical_processor(LogicalProcessor n, const char *thread_name)
memset(cpu_set, 0, sizeof(cpu_set));
LogicalProcessorSetOps::setInArray({ cpu_set, size }, n);

if (sched_setaffinity(0, sizeof(cpu_set), reinterpret_cast<cpu_set_t *>(cpu_set))) {
if (sched_setaffinity(thread_id, sizeof(cpu_set), reinterpret_cast<cpu_set_t *>(cpu_set))) {
perror("sched_setaffinity");
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion framework/topology.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class LogicalProcessorSet : private LogicalProcessorSetOps
};

LogicalProcessorSet ambient_logical_processor_set();
bool pin_to_logical_processor(LogicalProcessor, const char *thread_name = nullptr);
bool pin_to_logical_processor(LogicalProcessor, const char *thread_name = nullptr, pid_t thread_id=0);
bool pin_to_logical_processors(CpuRange, const char *thread_name);

void apply_cpuset_param(char *param);
Expand Down
1 change: 1 addition & 0 deletions meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ add_project_link_arguments([
'-lm',
'-lssp',
'-Wl,-Bdynamic',
'-lsynchronization',
'-lshlwapi',
'-lntdll',
], language : [ 'c', 'cpp' ])
Expand Down