Skip to content

Commit

Permalink
erts: Add scheduler polling support to nifs
Browse files Browse the repository at this point in the history
  • Loading branch information
garazdawi committed Jan 9, 2025
1 parent 02c242a commit c440365
Show file tree
Hide file tree
Showing 12 changed files with 510 additions and 66 deletions.
23 changes: 15 additions & 8 deletions erts/emulator/beam/erl_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -810,20 +810,27 @@ int erts_flush_trace_messages(Process *c_p, ErtsProcLocks c_p_locks)
/** @brief Create a message with the content of process independent \c msg_env.
* Invalidates \c msg_env.
*/
ErtsMessage* erts_create_message_from_nif_env(ErlNifEnv* msg_env)
ErtsMessage* erts_create_message_from_nif_env(ErlNifEnv* msg_env, Uint extra)
{
struct enif_msg_environment_t* menv = (struct enif_msg_environment_t*)msg_env;
ErtsMessage* mp;
ErlHeapFragment *heap_frag;

flush_env(msg_env);
mp = erts_alloc_message(0, NULL);
mp->data.heap_frag = menv->env.heap_frag;
ASSERT(mp->data.heap_frag == MBUF(&menv->phony_proc));
if (mp->data.heap_frag != NULL) {
mp = erts_alloc_message(extra, NULL);
if (extra) {
mp->hfrag.next = menv->env.heap_frag;
heap_frag = mp->hfrag.next;
} else {
mp->data.heap_frag = menv->env.heap_frag;
heap_frag = mp->data.heap_frag;
}
ASSERT(heap_frag == MBUF(&menv->phony_proc));
if (heap_frag != NULL) {
/* Move all offheap's from phony proc to the first fragment.
Quick and dirty... */
ASSERT(!is_offheap(&mp->data.heap_frag->off_heap));
mp->data.heap_frag->off_heap = MSO(&menv->phony_proc);
ASSERT(!is_offheap(&heap_frag->off_heap));
heap_frag->off_heap = MSO(&menv->phony_proc);
clear_offheap(&MSO(&menv->phony_proc));
menv->env.heap_frag = NULL;
MBUF(&menv->phony_proc) = NULL;
Expand Down Expand Up @@ -944,7 +951,7 @@ int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid,
}
#endif
}
mp = erts_create_message_from_nif_env(msg_env);
mp = erts_create_message_from_nif_env(msg_env, 0);
ERL_MESSAGE_TOKEN(mp) = token;
} else {
erts_literal_area_t litarea;
Expand Down
15 changes: 15 additions & 0 deletions erts/emulator/beam/erl_port_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ typedef erts_atomic_t ErtsPortTaskHandle;
#if (defined(ERL_PROCESS_C__) \
|| defined(ERL_PORT_TASK_C__) \
|| defined(ERL_IO_C__) \
|| defined(ERL_CHECK_IO_C__) \
|| (ERTS_GLB_INLINE_INCL_FUNC_DEF \
&& defined(ERTS_DO_INCL_GLB_INLINE_FUNC_DEF)))
#define ERTS_INCLUDE_SCHEDULER_INTERNALS
Expand Down Expand Up @@ -138,6 +139,8 @@ ERTS_GLB_INLINE void erts_port_task_sched_enter_exiting_state(ErtsPortTaskSched

#if defined(ERTS_INCLUDE_SCHEDULER_INTERNALS) && ERTS_POLL_USE_SCHEDULER_POLLING
ERTS_GLB_INLINE int erts_port_task_have_outstanding_io_tasks(void);
ERTS_GLB_INLINE void erts_port_task_inc_outstanding_io_tasks(void);
ERTS_GLB_INLINE void erts_port_task_dec_outstanding_io_tasks(void);
/* NOTE: Do not access any of the exported variables directly */
extern erts_atomic_t erts_port_task_outstanding_io_tasks;
#endif
Expand Down Expand Up @@ -226,6 +229,18 @@ erts_port_task_have_outstanding_io_tasks(void)
return (erts_atomic_read_acqb(&erts_port_task_outstanding_io_tasks)
!= 0);
}

ERTS_GLB_INLINE void
erts_port_task_inc_outstanding_io_tasks(void)
{
erts_atomic_inc_nob(&erts_port_task_outstanding_io_tasks);
}

ERTS_GLB_INLINE void
erts_port_task_dec_outstanding_io_tasks(void)
{
erts_atomic_dec_nob(&erts_port_task_outstanding_io_tasks);
}
#endif

#endif
Expand Down
32 changes: 31 additions & 1 deletion erts/emulator/beam/erl_proc_sig_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "bif.h"
#include "erl_bif_unique.h"
#include "erl_proc_sig_queue.h"
#include "erl_check_io.h"
#include "dtrace-wrapper.h"

#define ERTS_SIG_REDS_CNT_FACTOR 4
Expand Down Expand Up @@ -2472,6 +2473,16 @@ erts_proc_sig_send_link(ErtsPTabElementCommon *sender, Eterm from,
return proc_queue_signal(sender, from, to, sig, 0, ERTS_SIG_Q_OP_LINK);
}

int
erts_proc_sig_send_nif_select(Eterm to, ErtsMessage *msg) {
ErtsSignal *sig = (ErtsSignal*)msg;

sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_NIF_SELECT,
ERTS_SIG_Q_TYPE_UNDEFINED,
0);
return proc_queue_signal(NULL, am_system, to, sig, 0, ERTS_SIG_Q_OP_NIF_SELECT);
}

ErtsSigUnlinkOp *
erts_proc_sig_make_unlink_op(ErtsPTabElementCommon *sender, Eterm from)
{
Expand Down Expand Up @@ -6355,6 +6366,20 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,

break;
}
#if ERTS_POLL_USE_SCHEDULER_POLLING
case ERTS_SIG_Q_OP_NIF_SELECT: {

Eterm msg = erts_io_handle_nif_select(sig);

convert_prepared_sig_to_msg(c_p, &tracing, sig, msg, am_undefined, next_nm_sig);

cnt += 4;

erts_proc_notify_new_message(c_p, ERTS_PROC_LOCK_MAIN);

break;
}
#endif

default:
ERTS_INTERNAL_ERROR("Unknown signal");
Expand Down Expand Up @@ -6726,6 +6751,7 @@ erts_proc_sig_handle_exit(Process *c_p, Sint *redsp,
}
break;

case ERTS_SIG_Q_OP_NIF_SELECT:
case ERTS_SIG_Q_OP_PERSISTENT_MON_MSG:
case ERTS_SIG_Q_OP_ALIAS_MSG:
sig->next = NULL;
Expand Down Expand Up @@ -6918,6 +6944,7 @@ clear_seq_trace_token(ErtsMessage *sig)
case ERTS_SIG_Q_OP_RECV_MARK:
case ERTS_SIG_Q_OP_ADJ_MSGQ:
case ERTS_SIG_Q_OP_FLUSH:
case ERTS_SIG_Q_OP_NIF_SELECT:
break;

default:
Expand Down Expand Up @@ -7007,7 +7034,9 @@ erts_proc_sig_signal_size(ErtsSignal *sig)
case ERTS_SIG_Q_OP_SYNC_SUSPEND:
case ERTS_SIG_Q_OP_PERSISTENT_MON_MSG:
case ERTS_SIG_Q_OP_IS_ALIVE:
case ERTS_SIG_Q_OP_DIST_SPAWN_REPLY: {
case ERTS_SIG_Q_OP_DIST_SPAWN_REPLY:
case ERTS_SIG_Q_OP_NIF_SELECT:
{
ErlHeapFragment *hf;
size = sizeof(ErtsMessageRef);
size += ERTS_HEAP_FRAG_SIZE(((ErtsMessage *) sig)->hfrag.alloc_size);
Expand Down Expand Up @@ -8614,6 +8643,7 @@ erts_proc_sig_debug_foreach_sig(Process *c_p,
case ERTS_SIG_Q_OP_RECV_MARK:
case ERTS_SIG_Q_OP_FLUSH:
case ERTS_SIG_Q_OP_RPC:
case ERTS_SIG_Q_OP_NIF_SELECT:
break;

default:
Expand Down
8 changes: 6 additions & 2 deletions erts/emulator/beam/erl_proc_sig_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ typedef struct {
* Note that not all signal are handled using this functionality!
*/

#define ERTS_SIG_Q_OP_MAX 19
#define ERTS_SIG_Q_OP_MAX 20

#define ERTS_SIG_Q_OP_EXIT 0 /* Exit signal due to bif call */
#define ERTS_SIG_Q_OP_EXIT_LINKED 1 /* Exit signal due to link break*/
Expand All @@ -190,7 +190,8 @@ typedef struct {
#define ERTS_SIG_Q_OP_RECV_MARK 16
#define ERTS_SIG_Q_OP_UNLINK_ACK 17
#define ERTS_SIG_Q_OP_ADJ_MSGQ 18
#define ERTS_SIG_Q_OP_FLUSH ERTS_SIG_Q_OP_MAX
#define ERTS_SIG_Q_OP_FLUSH 19
#define ERTS_SIG_Q_OP_NIF_SELECT ERTS_SIG_Q_OP_MAX

#define ERTS_SIG_Q_TYPE_MAX (ERTS_MON_LNK_TYPE_MAX + 10)

Expand Down Expand Up @@ -1178,6 +1179,9 @@ erts_proc_sig_send_cla_request(Process *c_p, Eterm to, Eterm req_id);
void
erts_proc_sig_send_move_msgq_off_heap(Eterm to);

int
erts_proc_sig_send_nif_select(Eterm to, ErtsMessage *msg);

/*
* End of send operations of currently supported process signals.
*/
Expand Down
19 changes: 15 additions & 4 deletions erts/emulator/beam/erl_process.c
Original file line number Diff line number Diff line change
Expand Up @@ -3392,7 +3392,7 @@ try_set_sys_scheduling(void)


static ERTS_INLINE int
prepare_for_sys_schedule(void)
prepare_for_sys_schedule(ErtsSchedulerData *esdp)
{
if (erts_sched_poll_enabled()) {
while (!erts_port_task_have_outstanding_io_tasks()
Expand All @@ -3407,7 +3407,7 @@ prepare_for_sys_schedule(void)

#else
#define clear_sys_scheduling()
#define prepare_for_sys_schedule() 0
#define prepare_for_sys_schedule(esdp) 0
#endif

#ifdef HARDDEBUG
Expand Down Expand Up @@ -3542,13 +3542,18 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
current_time = 0;
timeout_time = ERTS_MONOTONIC_TIME_MAX;
}
#if ERTS_POLL_USE_SCHEDULER_POLLING
if (!ERTS_SCHEDULER_IS_DIRTY(esdp) && erts_sched_poll_enabled()) {
erts_io_clear_nif_select_handles(esdp);
}
#endif
if (do_timeout) {
if (!thr_prgr_active) {
erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 1);
sched_wall_time_change(esdp, 1);
}
}
else if (!ERTS_SCHEDULER_IS_DIRTY(esdp) && prepare_for_sys_schedule()) {
else if (!ERTS_SCHEDULER_IS_DIRTY(esdp) && prepare_for_sys_schedule(esdp)) {
/* We sleep in check_io, only for normal schedulers */
if (thr_prgr_active) {
erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 0);
Expand Down Expand Up @@ -5985,6 +5990,12 @@ init_scheduler_data(ErtsSchedulerData* esdp, int num,
esdp->io.out = (Uint64) 0;
esdp->io.in = (Uint64) 0;

#if ERTS_POLL_USE_SCHEDULER_POLLING
for (int i = 0; i < sizeof(esdp->nif_select_fds) / sizeof(ErtsSysFdType); i++) {
esdp->nif_select_fds[i] = ERTS_SYS_FD_INVALID;
}
#endif

esdp->pending_signal.sig = NULL;
esdp->pending_signal.to = THE_NON_VALUE;
#ifdef DEBUG
Expand Down Expand Up @@ -9834,7 +9845,7 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
goto check_activities_to_run;
} else if (is_normal_sched &&
fcalls > (2 * context_reds) &&
prepare_for_sys_schedule()) {
prepare_for_sys_schedule(esdp)) {
ErtsMonotonicTime current_time;
/*
* Schedule system-level activities.
Expand Down
6 changes: 6 additions & 0 deletions erts/emulator/beam/erl_process.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ typedef struct process Process;
#include "erl_thr_progress.h"
#undef ERL_THR_PROGRESS_TSD_TYPE_ONLY

// Included for ERTS_POLL_USE_SCHEDULER_POLLING
#include "erl_poll.h"

#define ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT_OPT 0
#define ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT 0

Expand Down Expand Up @@ -721,6 +724,9 @@ struct ErtsSchedulerData_ {
Uint64 out;
Uint64 in;
} io;
#if ERTS_POLL_USE_SCHEDULER_POLLING
ErtsSysFdType nif_select_fds[5]; /* Used by check io */
#endif
struct {
ErtsSignal* sig;
Eterm to;
Expand Down
2 changes: 1 addition & 1 deletion erts/emulator/beam/global.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ extern Eterm erts_nif_call_function(Process *p, Process *tracee,

int erts_call_dirty_nif(ErtsSchedulerData *esdp, Process *c_p,
ErtsCodePtr I, Eterm *reg);
ErtsMessage* erts_create_message_from_nif_env(ErlNifEnv* msg_env);
ErtsMessage* erts_create_message_from_nif_env(ErlNifEnv* msg_env, Uint extra);


/* Driver handle (wrapper for old plain handle) */
Expand Down
Loading

0 comments on commit c440365

Please sign in to comment.