Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce scheduler pollset for nif select #9275

Open
wants to merge 13 commits into
base: maint
Choose a base branch
from
Open
4 changes: 3 additions & 1 deletion erts/doc/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ assets/erl_uds_dist.erl: ../../lib/kernel/examples/erl_uds_dist/src/erl_uds_dist
assets/time_compat.erl: ../example/time_compat.erl
$(V_at)cp $< $@

$(HTMLDIR)/index.html: $(patsubst ../emulator/internal_doc/assets/%,assets/%,$(wildcard ../emulator/internal_doc/assets/*.png) $(wildcard ../emulator/internal_doc/assets/*.svg)) assets/gen_tcp_dist.erl assets/erl_uds_dist.erl assets/time_compat.erl
$(HTMLDIR)/index.html: $(wildcard ../emulator/internal_doc/*.md) \
$(patsubst ../emulator/internal_doc/assets/%,assets/%,$(wildcard ../emulator/internal_doc/assets/*.png) $(wildcard ../emulator/internal_doc/assets/*.svg)) \
assets/gen_tcp_dist.erl assets/erl_uds_dist.erl assets/time_compat.erl

# ----------------------------------------------------
# Release Target
Expand Down
23 changes: 15 additions & 8 deletions erts/emulator/beam/erl_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -810,20 +810,27 @@ int erts_flush_trace_messages(Process *c_p, ErtsProcLocks c_p_locks)
/** @brief Create a message with the content of process independent \c msg_env.
* Invalidates \c msg_env.
*/
ErtsMessage* erts_create_message_from_nif_env(ErlNifEnv* msg_env)
ErtsMessage* erts_create_message_from_nif_env(ErlNifEnv* msg_env, Uint extra)
{
struct enif_msg_environment_t* menv = (struct enif_msg_environment_t*)msg_env;
ErtsMessage* mp;
ErlHeapFragment *heap_frag;

flush_env(msg_env);
mp = erts_alloc_message(0, NULL);
mp->data.heap_frag = menv->env.heap_frag;
ASSERT(mp->data.heap_frag == MBUF(&menv->phony_proc));
if (mp->data.heap_frag != NULL) {
mp = erts_alloc_message(extra, NULL);
if (extra) {
mp->hfrag.next = menv->env.heap_frag;
heap_frag = mp->hfrag.next;
} else {
mp->data.heap_frag = menv->env.heap_frag;
heap_frag = mp->data.heap_frag;
}
ASSERT(heap_frag == MBUF(&menv->phony_proc));
if (heap_frag != NULL) {
/* Move all offheap's from phony proc to the first fragment.
Quick and dirty... */
ASSERT(!is_offheap(&mp->data.heap_frag->off_heap));
mp->data.heap_frag->off_heap = MSO(&menv->phony_proc);
ASSERT(!is_offheap(&heap_frag->off_heap));
heap_frag->off_heap = MSO(&menv->phony_proc);
clear_offheap(&MSO(&menv->phony_proc));
menv->env.heap_frag = NULL;
MBUF(&menv->phony_proc) = NULL;
Expand Down Expand Up @@ -944,7 +951,7 @@ int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid,
}
#endif
}
mp = erts_create_message_from_nif_env(msg_env);
mp = erts_create_message_from_nif_env(msg_env, 0);
ERL_MESSAGE_TOKEN(mp) = token;
} else {
erts_literal_area_t litarea;
Expand Down
2 changes: 1 addition & 1 deletion erts/emulator/beam/erl_port.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ Eterm erts_request_io_bytes(Process *c_p);
#define ERTS_PORT_SFLG_INVALID ((Uint32) (1 << 11))
/* Last port to terminate halts the emulator */
#define ERTS_PORT_SFLG_HALT ((Uint32) (1 << 12))
/* Check if the event in ready_input should be cleaned */
/* Check if the event in ready_input should be cleaned. */
#define ERTS_PORT_SFLG_CHECK_FD_CLEANUP ((Uint32) (1 << 13))
#ifdef DEBUG
/* Only debug: make sure all flags aren't cleared unintentionally */
Expand Down
5 changes: 5 additions & 0 deletions erts/emulator/beam/erl_port_task.c
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,11 @@ reset_executed_io_task_handle(Port *prt, ErtsPortTask *ptp)
reset_port_task_handle);
erts_atomic32_read_band_nob(&prt->state, ~ERTS_PORT_SFLG_CHECK_FD_CLEANUP);
} else {
/* We don't have to call erts_io_notify_port_task_executed for scheduler events
as we will keep the fd in the set. However, if driver_deselect was called in
the ready_input callback, then we do need to call it in order to free the
select structures from the fd state.
*/
reset_port_task_handle(ptp->u.alive.handle);
}
} else
Expand Down
15 changes: 15 additions & 0 deletions erts/emulator/beam/erl_port_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ typedef erts_atomic_t ErtsPortTaskHandle;
#if (defined(ERL_PROCESS_C__) \
|| defined(ERL_PORT_TASK_C__) \
|| defined(ERL_IO_C__) \
|| defined(ERL_CHECK_IO_C__) \
|| (ERTS_GLB_INLINE_INCL_FUNC_DEF \
&& defined(ERTS_DO_INCL_GLB_INLINE_FUNC_DEF)))
#define ERTS_INCLUDE_SCHEDULER_INTERNALS
Expand Down Expand Up @@ -138,6 +139,8 @@ ERTS_GLB_INLINE void erts_port_task_sched_enter_exiting_state(ErtsPortTaskSched

#if defined(ERTS_INCLUDE_SCHEDULER_INTERNALS) && ERTS_POLL_USE_SCHEDULER_POLLING
ERTS_GLB_INLINE int erts_port_task_have_outstanding_io_tasks(void);
ERTS_GLB_INLINE void erts_port_task_inc_outstanding_io_tasks(void);
ERTS_GLB_INLINE void erts_port_task_dec_outstanding_io_tasks(void);
/* NOTE: Do not access any of the exported variables directly */
extern erts_atomic_t erts_port_task_outstanding_io_tasks;
#endif
Expand Down Expand Up @@ -226,6 +229,18 @@ erts_port_task_have_outstanding_io_tasks(void)
return (erts_atomic_read_acqb(&erts_port_task_outstanding_io_tasks)
!= 0);
}

ERTS_GLB_INLINE void
erts_port_task_inc_outstanding_io_tasks(void)
{
erts_atomic_inc_nob(&erts_port_task_outstanding_io_tasks);
}

ERTS_GLB_INLINE void
erts_port_task_dec_outstanding_io_tasks(void)
{
erts_atomic_dec_nob(&erts_port_task_outstanding_io_tasks);
}
#endif

#endif
Expand Down
32 changes: 31 additions & 1 deletion erts/emulator/beam/erl_proc_sig_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "bif.h"
#include "erl_bif_unique.h"
#include "erl_proc_sig_queue.h"
#include "erl_check_io.h"
#include "dtrace-wrapper.h"

#define ERTS_SIG_REDS_CNT_FACTOR 4
Expand Down Expand Up @@ -2472,6 +2473,16 @@ erts_proc_sig_send_link(ErtsPTabElementCommon *sender, Eterm from,
return proc_queue_signal(sender, from, to, sig, 0, ERTS_SIG_Q_OP_LINK);
}

int
erts_proc_sig_send_nif_select(Eterm to, ErtsMessage *msg) {
ErtsSignal *sig = (ErtsSignal*)msg;

sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_NIF_SELECT,
ERTS_SIG_Q_TYPE_UNDEFINED,
0);
return proc_queue_signal(NULL, am_system, to, sig, 0, ERTS_SIG_Q_OP_NIF_SELECT);
}

ErtsSigUnlinkOp *
erts_proc_sig_make_unlink_op(ErtsPTabElementCommon *sender, Eterm from)
{
Expand Down Expand Up @@ -6355,6 +6366,20 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,

break;
}
#if ERTS_POLL_USE_SCHEDULER_POLLING
case ERTS_SIG_Q_OP_NIF_SELECT: {

Eterm msg = erts_io_handle_nif_select(sig);

convert_prepared_sig_to_msg(c_p, &tracing, sig, msg, am_undefined, next_nm_sig);

cnt += 4;

erts_proc_notify_new_message(c_p, ERTS_PROC_LOCK_MAIN);

break;
}
#endif

default:
ERTS_INTERNAL_ERROR("Unknown signal");
Expand Down Expand Up @@ -6726,6 +6751,7 @@ erts_proc_sig_handle_exit(Process *c_p, Sint *redsp,
}
break;

case ERTS_SIG_Q_OP_NIF_SELECT:
case ERTS_SIG_Q_OP_PERSISTENT_MON_MSG:
case ERTS_SIG_Q_OP_ALIAS_MSG:
sig->next = NULL;
Expand Down Expand Up @@ -6918,6 +6944,7 @@ clear_seq_trace_token(ErtsMessage *sig)
case ERTS_SIG_Q_OP_RECV_MARK:
case ERTS_SIG_Q_OP_ADJ_MSGQ:
case ERTS_SIG_Q_OP_FLUSH:
case ERTS_SIG_Q_OP_NIF_SELECT:
break;

default:
Expand Down Expand Up @@ -7007,7 +7034,9 @@ erts_proc_sig_signal_size(ErtsSignal *sig)
case ERTS_SIG_Q_OP_SYNC_SUSPEND:
case ERTS_SIG_Q_OP_PERSISTENT_MON_MSG:
case ERTS_SIG_Q_OP_IS_ALIVE:
case ERTS_SIG_Q_OP_DIST_SPAWN_REPLY: {
case ERTS_SIG_Q_OP_DIST_SPAWN_REPLY:
case ERTS_SIG_Q_OP_NIF_SELECT:
{
ErlHeapFragment *hf;
size = sizeof(ErtsMessageRef);
size += ERTS_HEAP_FRAG_SIZE(((ErtsMessage *) sig)->hfrag.alloc_size);
Expand Down Expand Up @@ -8614,6 +8643,7 @@ erts_proc_sig_debug_foreach_sig(Process *c_p,
case ERTS_SIG_Q_OP_RECV_MARK:
case ERTS_SIG_Q_OP_FLUSH:
case ERTS_SIG_Q_OP_RPC:
case ERTS_SIG_Q_OP_NIF_SELECT:
break;

default:
Expand Down
8 changes: 6 additions & 2 deletions erts/emulator/beam/erl_proc_sig_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ typedef struct {
* Note that not all signal are handled using this functionality!
*/

#define ERTS_SIG_Q_OP_MAX 19
#define ERTS_SIG_Q_OP_MAX 20

#define ERTS_SIG_Q_OP_EXIT 0 /* Exit signal due to bif call */
#define ERTS_SIG_Q_OP_EXIT_LINKED 1 /* Exit signal due to link break*/
Expand All @@ -190,7 +190,8 @@ typedef struct {
#define ERTS_SIG_Q_OP_RECV_MARK 16
#define ERTS_SIG_Q_OP_UNLINK_ACK 17
#define ERTS_SIG_Q_OP_ADJ_MSGQ 18
#define ERTS_SIG_Q_OP_FLUSH ERTS_SIG_Q_OP_MAX
#define ERTS_SIG_Q_OP_FLUSH 19
#define ERTS_SIG_Q_OP_NIF_SELECT ERTS_SIG_Q_OP_MAX

#define ERTS_SIG_Q_TYPE_MAX (ERTS_MON_LNK_TYPE_MAX + 10)

Expand Down Expand Up @@ -1178,6 +1179,9 @@ erts_proc_sig_send_cla_request(Process *c_p, Eterm to, Eterm req_id);
void
erts_proc_sig_send_move_msgq_off_heap(Eterm to);

int
erts_proc_sig_send_nif_select(Eterm to, ErtsMessage *msg);

/*
* End of send operations of currently supported process signals.
*/
Expand Down
19 changes: 15 additions & 4 deletions erts/emulator/beam/erl_process.c
Original file line number Diff line number Diff line change
Expand Up @@ -3392,7 +3392,7 @@ try_set_sys_scheduling(void)


static ERTS_INLINE int
prepare_for_sys_schedule(void)
prepare_for_sys_schedule(ErtsSchedulerData *esdp)
{
if (erts_sched_poll_enabled()) {
while (!erts_port_task_have_outstanding_io_tasks()
Expand All @@ -3407,7 +3407,7 @@ prepare_for_sys_schedule(void)

#else
#define clear_sys_scheduling()
#define prepare_for_sys_schedule() 0
#define prepare_for_sys_schedule(esdp) 0
#endif

#ifdef HARDDEBUG
Expand Down Expand Up @@ -3542,13 +3542,18 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
current_time = 0;
timeout_time = ERTS_MONOTONIC_TIME_MAX;
}
#if ERTS_POLL_USE_SCHEDULER_POLLING
if (!ERTS_SCHEDULER_IS_DIRTY(esdp) && erts_sched_poll_enabled()) {
erts_io_clear_nif_select_handles(esdp);
}
#endif
if (do_timeout) {
if (!thr_prgr_active) {
erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 1);
sched_wall_time_change(esdp, 1);
}
}
else if (!ERTS_SCHEDULER_IS_DIRTY(esdp) && prepare_for_sys_schedule()) {
else if (!ERTS_SCHEDULER_IS_DIRTY(esdp) && prepare_for_sys_schedule(esdp)) {
/* We sleep in check_io, only for normal schedulers */
if (thr_prgr_active) {
erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 0);
Expand Down Expand Up @@ -5985,6 +5990,12 @@ init_scheduler_data(ErtsSchedulerData* esdp, int num,
esdp->io.out = (Uint64) 0;
esdp->io.in = (Uint64) 0;

#if ERTS_POLL_USE_SCHEDULER_POLLING
for (int i = 0; i < sizeof(esdp->nif_select_fds) / sizeof(ErtsSysFdType); i++) {
esdp->nif_select_fds[i] = ERTS_SYS_FD_INVALID;
}
#endif

esdp->pending_signal.sig = NULL;
esdp->pending_signal.to = THE_NON_VALUE;
#ifdef DEBUG
Expand Down Expand Up @@ -9834,7 +9845,7 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
goto check_activities_to_run;
} else if (is_normal_sched &&
fcalls > (2 * context_reds) &&
prepare_for_sys_schedule()) {
prepare_for_sys_schedule(esdp)) {
ErtsMonotonicTime current_time;
/*
* Schedule system-level activities.
Expand Down
6 changes: 6 additions & 0 deletions erts/emulator/beam/erl_process.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ typedef struct process Process;
#include "erl_thr_progress.h"
#undef ERL_THR_PROGRESS_TSD_TYPE_ONLY

// Included for ERTS_POLL_USE_SCHEDULER_POLLING
#include "erl_poll.h"

#define ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT_OPT 0
#define ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT 0

Expand Down Expand Up @@ -721,6 +724,9 @@ struct ErtsSchedulerData_ {
Uint64 out;
Uint64 in;
} io;
#if ERTS_POLL_USE_SCHEDULER_POLLING
ErtsSysFdType nif_select_fds[5]; /* Used by check io */
#endif
struct {
ErtsSignal* sig;
Eterm to;
Expand Down
2 changes: 1 addition & 1 deletion erts/emulator/beam/global.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ extern Eterm erts_nif_call_function(Process *p, Process *tracee,

int erts_call_dirty_nif(ErtsSchedulerData *esdp, Process *c_p,
ErtsCodePtr I, Eterm *reg);
ErtsMessage* erts_create_message_from_nif_env(ErlNifEnv* msg_env);
ErtsMessage* erts_create_message_from_nif_env(ErlNifEnv* msg_env, Uint extra);


/* Driver handle (wrapper for old plain handle) */
Expand Down
Loading
Loading