Skip to content

Commit

Permalink
per-thread start-time; use a proper macro/API to increment metrics; s…
Browse files Browse the repository at this point in the history
…plit GC time into safepoint time and proper GC time; introduce internal spinlock time; introduce feature flag to report metrics in terms of CPU time
  • Loading branch information
d-netto committed Sep 24, 2024
1 parent a13242d commit 898fffb
Show file tree
Hide file tree
Showing 11 changed files with 86 additions and 45 deletions.
2 changes: 0 additions & 2 deletions base/locks-mt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,8 @@ end
Base.assert_havelock(l::SpinLock) = islocked(l) ? nothing : Base.concurrency_violation()

function lock(l::SpinLock)
# t0 = time_ns()
while true
if @inline trylock(l)
# ccall(:jl_record_lock_spin_time, Cvoid, (Culonglong,), time_ns() - t0)
return
end
ccall(:jl_cpu_suspend, Cvoid, ())
Expand Down
19 changes: 14 additions & 5 deletions src/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -3755,7 +3755,11 @@ JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection)
int8_t old_state = jl_atomic_load_relaxed(&ptls->gc_state);
jl_atomic_store_release(&ptls->gc_state, JL_GC_STATE_WAITING);
// `jl_safepoint_start_gc()` makes sure only one thread can run the GC.
uint64_t t0 = jl_hrtime(); // time we entered GC

// time we started the GC stop-the-world phase
uint64_t safepoint_start_wall_time = jl_hrtime();
uint64_t safepoint_start_for_tls_timing = jl_record_time_for_tls_metric();

if (!jl_safepoint_start_gc()) {
// either another thread is running GC, or the GC got disabled just now.
jl_gc_state_set(ptls, old_state, JL_GC_STATE_WAITING);
Expand Down Expand Up @@ -3785,8 +3789,11 @@ JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection)
jl_gc_wait_for_the_world(gc_all_tls_states, gc_n_threads);
JL_PROBE_GC_STOP_THE_WORLD();

uint64_t t1 = jl_hrtime(); // time user code stopped running
uint64_t duration = t1 - t0;
// time we ended the GC stop-the-world phase
uint64_t safepoint_end_wall_time = jl_hrtime();
uint64_t safepoint_end_for_tls_timing = jl_record_time_for_tls_metric();

uint64_t duration = safepoint_end_wall_time - safepoint_start_wall_time;
if (duration > gc_num.max_time_to_safepoint)
gc_num.max_time_to_safepoint = duration;
gc_num.time_to_safepoint = duration;
Expand Down Expand Up @@ -3814,8 +3821,10 @@ JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection)
jl_safepoint_end_gc();
jl_gc_state_set(ptls, old_state, JL_GC_STATE_WAITING);
JL_PROBE_GC_END();
// Time how long GC took.
ptls->timing_tls.gc_time += jl_hrtime() - t1;

// Measure how long safepoint (stop-the-world) and GC took
jl_increment_timing_tls_metric(ptls, safepoint_time, safepoint_end_for_tls_timing - safepoint_start_for_tls_timing);
jl_increment_timing_tls_metric(ptls, gc_time, jl_record_time_for_tls_metric() - safepoint_end_for_tls_timing);

// Only disable finalizers on current thread
// Doing this on all threads is racy (it's impossible to check
Expand Down
2 changes: 2 additions & 0 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,8 @@ static NOINLINE void _finish_julia_init(JL_IMAGE_SEARCH rel, jl_ptls_t ptls, jl_
jl_start_threads();
jl_start_gc_threads();
uv_barrier_wait(&thread_init_done);
// `start_time`is initialized to 0, so it's fine to use an increment call here
jl_increment_timing_tls_metric(ptls, start_time, jl_record_time_for_tls_metric());

uv_mutex_init(&array_to_string_print_lock);

Expand Down
1 change: 1 addition & 0 deletions src/julia_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ int jl_running_under_rr(int recheck) JL_NOTSAFEPOINT;
// timers
// Returns time in nanosec
JL_DLLEXPORT uint64_t jl_hrtime(void) JL_NOTSAFEPOINT;
JL_DLLEXPORT uint64_t jl_record_time_for_tls_metric(void) JL_NOTSAFEPOINT;

JL_DLLEXPORT void jl_set_peek_cond(uintptr_t);
JL_DLLEXPORT double jl_get_profile_peek_duration(void);
Expand Down
4 changes: 4 additions & 0 deletions src/julia_locks.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@ JL_DLLEXPORT int _jl_mutex_trylock_nogc(jl_task_t *self, jl_mutex_t *lock) JL_NO
JL_DLLEXPORT int _jl_mutex_trylock(jl_task_t *self, jl_mutex_t *lock);
JL_DLLEXPORT void _jl_mutex_unlock(jl_task_t *self, jl_mutex_t *lock);
JL_DLLEXPORT void _jl_mutex_unlock_nogc(jl_mutex_t *lock) JL_NOTSAFEPOINT;
// Unfortunately we can't include `julia_internal.h` here, so we need to forward declare this
JL_DLLEXPORT uint64_t jl_record_time_for_tls_metric(void) JL_NOTSAFEPOINT;

static inline void jl_mutex_wait(jl_mutex_t *lock, int safepoint)
{
uint64_t t0 = jl_record_time_for_tls_metric();
_jl_mutex_wait(jl_current_task, lock, safepoint);
jl_increment_timing_tls_metric(jl_current_task->ptls, internal_spinlock_time, jl_record_time_for_tls_metric() - t0);
}

static inline void jl_mutex_lock_nogc(jl_mutex_t *lock) JL_NOTSAFEPOINT JL_NOTSAFEPOINT_ENTER
Expand Down
10 changes: 8 additions & 2 deletions src/julia_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,18 @@ struct _jl_bt_element_t;

typedef struct {
uint64_t start_time;
uint64_t sleep_time;
uint64_t internal_spinlock_time;
uint64_t scheduler_time;
/* uint64_t lock_spin_time; */
uint64_t safepoint_time;
uint64_t gc_time;
} jl_timing_tls_states_t;

#define jl_increment_timing_tls_metric(ptls, metric, dt) do { \
if (ptls) { \
ptls->timing_tls.metric += dt; \
} \
} while (0)

// This includes all the thread local states we care about for a thread.
// Changes to TLS field types must be reflected in codegen.
#define JL_MAX_BT_SIZE 80000
Expand Down
29 changes: 19 additions & 10 deletions src/partr.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ void jl_parallel_gc_threadfun(void *arg)

// free the thread argument here
free(targ);
// `start_time`is initialized to 0, so it's fine to use an increment call here
jl_increment_timing_tls_metric(ptls, start_time, jl_record_time_for_tls_metric());

while (1) {
uv_mutex_lock(&gc_threads_lock);
Expand Down Expand Up @@ -171,6 +173,8 @@ void jl_concurrent_gc_threadfun(void *arg)

// free the thread argument here
free(targ);
// `start_time`is initialized to 0, so it's fine to use an increment call here
jl_increment_timing_tls_metric(ptls, start_time, jl_record_time_for_tls_metric());

while (1) {
assert(jl_atomic_load_relaxed(&ptls->gc_state) == JL_GC_CONCURRENT_COLLECTOR_THREAD);
Expand Down Expand Up @@ -198,6 +202,8 @@ void jl_threadfun(void *arg)

// free the thread argument here
free(targ);
// `start_time`is initialized to 0, so it's fine to use an increment call here
jl_increment_timing_tls_metric(ptls, start_time, jl_record_time_for_tls_metric());

(void)jl_gc_unsafe_enter(ptls);
jl_finish_task(ct); // noreturn
Expand Down Expand Up @@ -385,18 +391,16 @@ static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT

extern _Atomic(unsigned) _threadedregion;

JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, jl_value_t *checkempty)
static jl_task_t *_jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, jl_value_t *checkempty)
{
jl_task_t *ct = jl_current_task;
uint64_t start_cycles = 0;
uint64_t t0 = jl_hrtime();

while (1) {
jl_ptls_t ptls = ct->ptls;
jl_task_t *task = get_next_task(trypoptask, q);
if (task) {
ptls->timing_tls.scheduler_time += jl_hrtime() - t0;
if (task)
return task;
}

// quick, race-y check to see if there seems to be any stuff in there
jl_cpu_pause();
if (!check_empty(checkempty)) {
Expand All @@ -405,6 +409,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
}

jl_cpu_pause();
jl_ptls_t ptls = ct->ptls;
if (sleep_check_after_threshold(&start_cycles) || (ptls->tid == 0 && (!jl_atomic_load_relaxed(&_threadedregion) || wait_empty))) {
// acquire sleep-check lock
jl_atomic_store_relaxed(&ptls->sleep_check_state, sleeping);
Expand All @@ -426,7 +431,6 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
}
if (task)
ptls->timing_tls.scheduler_time += jl_hrtime() - t0;
return task;
continue;
}
Expand All @@ -435,7 +439,6 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
}
ptls->timing_tls.scheduler_time += jl_hrtime() - t0;
return task;
}

Expand Down Expand Up @@ -510,7 +513,6 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,

// the other threads will just wait for an individual wake signal to resume
JULIA_DEBUG_SLEEPWAKE( ptls->sleep_enter = cycleclock() );
uint64_t tsleep0 = jl_hrtime();
int8_t gc_state = jl_gc_safe_enter(ptls);
uv_mutex_lock(&ptls->sleep_lock);
while (may_sleep(ptls)) {
Expand All @@ -527,7 +529,6 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping);
uv_mutex_unlock(&ptls->sleep_lock);
JULIA_DEBUG_SLEEPWAKE( ptls->sleep_leave = cycleclock() );
ptls->timing_tls.sleep_time += jl_hrtime() - tsleep0;
jl_gc_safe_leave(ptls, gc_state); // contains jl_gc_safepoint
start_cycles = 0;
if (task) {
Expand All @@ -543,6 +544,14 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
}
}

JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, jl_value_t *checkempty)
{
uint64_t t0 = jl_record_time_for_tls_metric();
jl_task_t *task = _jl_task_get_next(trypoptask, q, checkempty);
jl_increment_timing_tls_metric(jl_current_task->ptls, scheduler_time, jl_record_time_for_tls_metric() - t0);
return task;
}

#ifdef __cplusplus
}
#endif
15 changes: 9 additions & 6 deletions src/safepoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,12 @@ void jl_safepoint_end_gc(void)
uv_cond_broadcast(&safepoint_cond);
}

void jl_safepoint_wait_gc(void)
void _jl_safepoint_wait_gc(void)
{
jl_task_t *ct = jl_current_task; (void)ct;
JL_TIMING_SUSPEND_TASK(GC_SAFEPOINT, ct);
// The thread should have set this is already
jl_ptls_t ptls = ct->ptls;
assert(jl_atomic_load_relaxed(&ptls->gc_state) != 0);
// Time how long this thread is stopped while GC is running.
uint64_t t0 = jl_hrtime();
assert(jl_atomic_load_relaxed(&ct->ptls->gc_state) != 0);
// Use normal volatile load in the loop for speed until GC finishes.
// Then use an acquire load to make sure the GC result is visible on this thread.
while (jl_atomic_load_relaxed(&jl_gc_running) || jl_atomic_load_acquire(&jl_gc_running)) {
Expand All @@ -176,7 +173,13 @@ void jl_safepoint_wait_gc(void)
uv_cond_wait(&safepoint_cond, &safepoint_lock);
uv_mutex_unlock(&safepoint_lock);
}
ptls->timing_tls.gc_time = jl_hrtime() - t0;
}

void jl_safepoint_wait_gc(void)
{
uint64_t t0 = jl_record_time_for_tls_metric();
_jl_safepoint_wait_gc();
jl_increment_timing_tls_metric(jl_current_task->ptls, gc_time, jl_record_time_for_tls_metric() - t0);
}

void jl_safepoint_enable_sigint(void)
Expand Down
22 changes: 22 additions & 0 deletions src/sys.c
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,28 @@ JL_DLLEXPORT uint64_t jl_hrtime(void) JL_NOTSAFEPOINT
return uv_hrtime();
}

JL_DLLEXPORT uint64_t jl_cpu_time(void) JL_NOTSAFEPOINT
{
#if defined(_OS_LINUX_)
struct rusage usage;
if (getrusage(RUSAGE_THREAD, &usage) == 0) {
return (uint64_t)usage.ru_utime.tv_sec * 1000000000 + (uint64_t)usage.ru_utime.tv_usec * 1000;
}
return 0;
#else
return 0;
#endif
}

JL_DLLEXPORT uint64_t jl_record_time_for_tls_metric(void) JL_NOTSAFEPOINT
{
#ifdef USE_CPU_TIMING_FOR_TLS_METRIC
return jl_cpu_time();
#else
return jl_hrtime();
#endif
}

// -- iterating the environment --

#ifdef __APPLE__
Expand Down
26 changes: 7 additions & 19 deletions src/threading.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,19 @@ JL_DLLEXPORT _Atomic(uint8_t) jl_measure_compile_time_enabled = 0;
JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_compile_time = 0;
JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_recompile_time = 0;

static uint64_t jl_thread_start_time;
void jl_set_thread_start_time(void)
{
jl_thread_start_time = jl_hrtime();
}

// TODO: not just current thread
JL_DLLEXPORT uint64_t jl_thread_up_time(void)
{
return jl_hrtime() - jl_thread_start_time;
jl_ptls_t ptls = jl_current_task->ptls;
return jl_record_time_for_tls_metric() - ptls->timing_tls.start_time;
}

JL_DLLEXPORT uint64_t jl_thread_user_time(uint8_t tid)
{
jl_ptls_t ptls = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];
jl_timing_tls_states_t *timing = &ptls->timing_tls;
return jl_thread_up_time() - timing->gc_time - timing->scheduler_time;
return jl_record_time_for_tls_metric() - timing->start_time -
timing->internal_spinlock_time - timing->scheduler_time -
timing->safepoint_time - timing->gc_time;
}

JL_DLLEXPORT void *jl_get_ptls_states(void)
Expand Down Expand Up @@ -783,7 +780,6 @@ void jl_start_threads(void)
}
uv_thread_detach(&uvtid);
}
jl_set_thread_start_time();
}

_Atomic(unsigned) _threadedregion; // HACK: keep track of whether to prioritize IO or threading
Expand Down Expand Up @@ -813,13 +809,6 @@ JL_DLLEXPORT void jl_exit_threaded_region(void)

// Profiling stubs

/* JL_DLLEXPORT void jl_record_lock_spin_time(uint64_t time) JL_NOTSAFEPOINT */
/* { */
/* jl_task_t *ct = jl_current_task; */
/* jl_ptls_t ptls = ct->ptls; */
/* ptls->timing_tls.lock_spin_time += time; */
/* } */

void _jl_mutex_init(jl_mutex_t *lock, const char *name) JL_NOTSAFEPOINT
{
jl_atomic_store_relaxed(&lock->owner, (jl_task_t*)NULL);
Expand All @@ -842,12 +831,10 @@ void _jl_mutex_wait(jl_task_t *self, jl_mutex_t *lock, int safepoint)
return;
}
JL_TIMING(LOCK_SPIN, LOCK_SPIN);
/* uint64_t t0 = jl_hrtime(); */
while (1) {
if (owner == NULL && jl_atomic_cmpswap(&lock->owner, &owner, self)) {
lock->count = 1;
jl_profile_lock_acquired(lock);
/* jl_record_lock_spin_time(jl_hrtime() - t0); */
return;
}
if (safepoint) {
Expand Down Expand Up @@ -947,6 +934,7 @@ void _jl_mutex_unlock(jl_task_t *self, jl_mutex_t *lock)
}
}


// Make gc alignment available for threading
// see threads.jl alignment
JL_DLLEXPORT int jl_alignment(size_t sz)
Expand Down
1 change: 0 additions & 1 deletion src/threading.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ void jl_init_threadinginfra(void);
void jl_parallel_gc_threadfun(void *arg);
void jl_concurrent_gc_threadfun(void *arg);
void jl_threadfun(void *arg);
void jl_set_thread_start_time(void);

#ifdef __cplusplus
}
Expand Down

0 comments on commit 898fffb

Please sign in to comment.