From c440365337b1e36f42d846ba6f69c52978ca4268 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lukas=20Backstr=C3=B6m?= Date: Fri, 6 Dec 2024 12:25:10 +0100 Subject: [PATCH] erts: Add scheduler polling support to nifs --- erts/emulator/beam/erl_nif.c | 23 +- erts/emulator/beam/erl_port_task.h | 15 ++ erts/emulator/beam/erl_proc_sig_queue.c | 32 ++- erts/emulator/beam/erl_proc_sig_queue.h | 8 +- erts/emulator/beam/erl_process.c | 19 +- erts/emulator/beam/erl_process.h | 6 + erts/emulator/beam/global.h | 2 +- erts/emulator/sys/common/erl_check_io.c | 254 ++++++++++++++---- erts/emulator/sys/common/erl_check_io.h | 17 ++ erts/emulator/test/driver_SUITE.erl | 1 + erts/emulator/test/nif_SUITE.erl | 165 +++++++++++- erts/emulator/test/nif_SUITE_data/nif_SUITE.c | 34 ++- 12 files changed, 510 insertions(+), 66 deletions(-) diff --git a/erts/emulator/beam/erl_nif.c b/erts/emulator/beam/erl_nif.c index c6d00f312d81..6e05b4cb8782 100644 --- a/erts/emulator/beam/erl_nif.c +++ b/erts/emulator/beam/erl_nif.c @@ -810,20 +810,27 @@ int erts_flush_trace_messages(Process *c_p, ErtsProcLocks c_p_locks) /** @brief Create a message with the content of process independent \c msg_env. * Invalidates \c msg_env. 
*/ -ErtsMessage* erts_create_message_from_nif_env(ErlNifEnv* msg_env) +ErtsMessage* erts_create_message_from_nif_env(ErlNifEnv* msg_env, Uint extra) { struct enif_msg_environment_t* menv = (struct enif_msg_environment_t*)msg_env; ErtsMessage* mp; + ErlHeapFragment *heap_frag; flush_env(msg_env); - mp = erts_alloc_message(0, NULL); - mp->data.heap_frag = menv->env.heap_frag; - ASSERT(mp->data.heap_frag == MBUF(&menv->phony_proc)); - if (mp->data.heap_frag != NULL) { + mp = erts_alloc_message(extra, NULL); + if (extra) { + mp->hfrag.next = menv->env.heap_frag; + heap_frag = mp->hfrag.next; + } else { + mp->data.heap_frag = menv->env.heap_frag; + heap_frag = mp->data.heap_frag; + } + ASSERT(heap_frag == MBUF(&menv->phony_proc)); + if (heap_frag != NULL) { /* Move all offheap's from phony proc to the first fragment. Quick and dirty... */ - ASSERT(!is_offheap(&mp->data.heap_frag->off_heap)); - mp->data.heap_frag->off_heap = MSO(&menv->phony_proc); + ASSERT(!is_offheap(&heap_frag->off_heap)); + heap_frag->off_heap = MSO(&menv->phony_proc); clear_offheap(&MSO(&menv->phony_proc)); menv->env.heap_frag = NULL; MBUF(&menv->phony_proc) = NULL; @@ -944,7 +951,7 @@ int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid, } #endif } - mp = erts_create_message_from_nif_env(msg_env); + mp = erts_create_message_from_nif_env(msg_env, 0); ERL_MESSAGE_TOKEN(mp) = token; } else { erts_literal_area_t litarea; diff --git a/erts/emulator/beam/erl_port_task.h b/erts/emulator/beam/erl_port_task.h index dfc9b4416ce8..7fbf615c4a9d 100644 --- a/erts/emulator/beam/erl_port_task.h +++ b/erts/emulator/beam/erl_port_task.h @@ -44,6 +44,7 @@ typedef erts_atomic_t ErtsPortTaskHandle; #if (defined(ERL_PROCESS_C__) \ || defined(ERL_PORT_TASK_C__) \ || defined(ERL_IO_C__) \ + || defined(ERL_CHECK_IO_C__) \ || (ERTS_GLB_INLINE_INCL_FUNC_DEF \ && defined(ERTS_DO_INCL_GLB_INLINE_FUNC_DEF))) #define ERTS_INCLUDE_SCHEDULER_INTERNALS @@ -138,6 +139,8 @@ ERTS_GLB_INLINE void 
erts_port_task_sched_enter_exiting_state(ErtsPortTaskSched #if defined(ERTS_INCLUDE_SCHEDULER_INTERNALS) && ERTS_POLL_USE_SCHEDULER_POLLING ERTS_GLB_INLINE int erts_port_task_have_outstanding_io_tasks(void); +ERTS_GLB_INLINE void erts_port_task_inc_outstanding_io_tasks(void); +ERTS_GLB_INLINE void erts_port_task_dec_outstanding_io_tasks(void); /* NOTE: Do not access any of the exported variables directly */ extern erts_atomic_t erts_port_task_outstanding_io_tasks; #endif @@ -226,6 +229,18 @@ erts_port_task_have_outstanding_io_tasks(void) return (erts_atomic_read_acqb(&erts_port_task_outstanding_io_tasks) != 0); } + +ERTS_GLB_INLINE void +erts_port_task_inc_outstanding_io_tasks(void) +{ + erts_atomic_inc_nob(&erts_port_task_outstanding_io_tasks); +} + +ERTS_GLB_INLINE void +erts_port_task_dec_outstanding_io_tasks(void) +{ + erts_atomic_dec_nob(&erts_port_task_outstanding_io_tasks); +} #endif #endif diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c index ea0fa38848bd..44dfcc5eb1af 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.c +++ b/erts/emulator/beam/erl_proc_sig_queue.c @@ -42,6 +42,7 @@ #include "bif.h" #include "erl_bif_unique.h" #include "erl_proc_sig_queue.h" +#include "erl_check_io.h" #include "dtrace-wrapper.h" #define ERTS_SIG_REDS_CNT_FACTOR 4 @@ -2472,6 +2473,16 @@ erts_proc_sig_send_link(ErtsPTabElementCommon *sender, Eterm from, return proc_queue_signal(sender, from, to, sig, 0, ERTS_SIG_Q_OP_LINK); } +int +erts_proc_sig_send_nif_select(Eterm to, ErtsMessage *msg) { + ErtsSignal *sig = (ErtsSignal*)msg; + + sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_NIF_SELECT, + ERTS_SIG_Q_TYPE_UNDEFINED, + 0); + return proc_queue_signal(NULL, am_system, to, sig, 0, ERTS_SIG_Q_OP_NIF_SELECT); +} + ErtsSigUnlinkOp * erts_proc_sig_make_unlink_op(ErtsPTabElementCommon *sender, Eterm from) { @@ -6355,6 +6366,20 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, break; } +#if 
ERTS_POLL_USE_SCHEDULER_POLLING + case ERTS_SIG_Q_OP_NIF_SELECT: { + + Eterm msg = erts_io_handle_nif_select(sig); + + convert_prepared_sig_to_msg(c_p, &tracing, sig, msg, am_undefined, next_nm_sig); + + cnt += 4; + + erts_proc_notify_new_message(c_p, ERTS_PROC_LOCK_MAIN); + + break; + } +#endif default: ERTS_INTERNAL_ERROR("Unknown signal"); @@ -6726,6 +6751,7 @@ erts_proc_sig_handle_exit(Process *c_p, Sint *redsp, } break; + case ERTS_SIG_Q_OP_NIF_SELECT: case ERTS_SIG_Q_OP_PERSISTENT_MON_MSG: case ERTS_SIG_Q_OP_ALIAS_MSG: sig->next = NULL; @@ -6918,6 +6944,7 @@ clear_seq_trace_token(ErtsMessage *sig) case ERTS_SIG_Q_OP_RECV_MARK: case ERTS_SIG_Q_OP_ADJ_MSGQ: case ERTS_SIG_Q_OP_FLUSH: + case ERTS_SIG_Q_OP_NIF_SELECT: break; default: @@ -7007,7 +7034,9 @@ erts_proc_sig_signal_size(ErtsSignal *sig) case ERTS_SIG_Q_OP_SYNC_SUSPEND: case ERTS_SIG_Q_OP_PERSISTENT_MON_MSG: case ERTS_SIG_Q_OP_IS_ALIVE: - case ERTS_SIG_Q_OP_DIST_SPAWN_REPLY: { + case ERTS_SIG_Q_OP_DIST_SPAWN_REPLY: + case ERTS_SIG_Q_OP_NIF_SELECT: + { ErlHeapFragment *hf; size = sizeof(ErtsMessageRef); size += ERTS_HEAP_FRAG_SIZE(((ErtsMessage *) sig)->hfrag.alloc_size); @@ -8614,6 +8643,7 @@ erts_proc_sig_debug_foreach_sig(Process *c_p, case ERTS_SIG_Q_OP_RECV_MARK: case ERTS_SIG_Q_OP_FLUSH: case ERTS_SIG_Q_OP_RPC: + case ERTS_SIG_Q_OP_NIF_SELECT: break; default: diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h index 6b4e22090b6e..cb0892cf54a5 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.h +++ b/erts/emulator/beam/erl_proc_sig_queue.h @@ -169,7 +169,7 @@ typedef struct { * Note that not all signal are handled using this functionality! 
*/ -#define ERTS_SIG_Q_OP_MAX 19 +#define ERTS_SIG_Q_OP_MAX 20 #define ERTS_SIG_Q_OP_EXIT 0 /* Exit signal due to bif call */ #define ERTS_SIG_Q_OP_EXIT_LINKED 1 /* Exit signal due to link break*/ @@ -190,7 +190,8 @@ typedef struct { #define ERTS_SIG_Q_OP_RECV_MARK 16 #define ERTS_SIG_Q_OP_UNLINK_ACK 17 #define ERTS_SIG_Q_OP_ADJ_MSGQ 18 -#define ERTS_SIG_Q_OP_FLUSH ERTS_SIG_Q_OP_MAX +#define ERTS_SIG_Q_OP_FLUSH 19 +#define ERTS_SIG_Q_OP_NIF_SELECT ERTS_SIG_Q_OP_MAX #define ERTS_SIG_Q_TYPE_MAX (ERTS_MON_LNK_TYPE_MAX + 10) @@ -1178,6 +1179,9 @@ erts_proc_sig_send_cla_request(Process *c_p, Eterm to, Eterm req_id); void erts_proc_sig_send_move_msgq_off_heap(Eterm to); +int +erts_proc_sig_send_nif_select(Eterm to, ErtsMessage *msg); + /* * End of send operations of currently supported process signals. */ diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 230e4437cfd6..4de61cd00b75 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -3392,7 +3392,7 @@ try_set_sys_scheduling(void) static ERTS_INLINE int -prepare_for_sys_schedule(void) +prepare_for_sys_schedule(ErtsSchedulerData *esdp) { if (erts_sched_poll_enabled()) { while (!erts_port_task_have_outstanding_io_tasks() @@ -3407,7 +3407,7 @@ prepare_for_sys_schedule(void) #else #define clear_sys_scheduling() -#define prepare_for_sys_schedule() 0 +#define prepare_for_sys_schedule(esdp) 0 #endif #ifdef HARDDEBUG @@ -3542,13 +3542,18 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) current_time = 0; timeout_time = ERTS_MONOTONIC_TIME_MAX; } +#if ERTS_POLL_USE_SCHEDULER_POLLING + if (!ERTS_SCHEDULER_IS_DIRTY(esdp) && erts_sched_poll_enabled()) { + erts_io_clear_nif_select_handles(esdp); + } +#endif if (do_timeout) { if (!thr_prgr_active) { erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 1); sched_wall_time_change(esdp, 1); } } - else if (!ERTS_SCHEDULER_IS_DIRTY(esdp) && prepare_for_sys_schedule()) { + else if 
(!ERTS_SCHEDULER_IS_DIRTY(esdp) && prepare_for_sys_schedule(esdp)) { /* We sleep in check_io, only for normal schedulers */ if (thr_prgr_active) { erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 0); @@ -5985,6 +5990,12 @@ init_scheduler_data(ErtsSchedulerData* esdp, int num, esdp->io.out = (Uint64) 0; esdp->io.in = (Uint64) 0; +#if ERTS_POLL_USE_SCHEDULER_POLLING + for (int i = 0; i < sizeof(esdp->nif_select_fds) / sizeof(ErtsSysFdType); i++) { + esdp->nif_select_fds[i] = ERTS_SYS_FD_INVALID; + } +#endif + esdp->pending_signal.sig = NULL; esdp->pending_signal.to = THE_NON_VALUE; #ifdef DEBUG @@ -9834,7 +9845,7 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) goto check_activities_to_run; } else if (is_normal_sched && fcalls > (2 * context_reds) && - prepare_for_sys_schedule()) { + prepare_for_sys_schedule(esdp)) { ErtsMonotonicTime current_time; /* * Schedule system-level activities. diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index a3fb22b84442..73c4b7dba0aa 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -71,6 +71,9 @@ typedef struct process Process; #include "erl_thr_progress.h" #undef ERL_THR_PROGRESS_TSD_TYPE_ONLY +// Included for ERTS_POLL_USE_SCHEDULER_POLLING +#include "erl_poll.h" + #define ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT_OPT 0 #define ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT 0 @@ -721,6 +724,9 @@ struct ErtsSchedulerData_ { Uint64 out; Uint64 in; } io; +#if ERTS_POLL_USE_SCHEDULER_POLLING + ErtsSysFdType nif_select_fds[5]; /* Used by check io */ +#endif struct { ErtsSignal* sig; Eterm to; diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h index c172ac0df50a..82ee425a82d5 100644 --- a/erts/emulator/beam/global.h +++ b/erts/emulator/beam/global.h @@ -142,7 +142,7 @@ extern Eterm erts_nif_call_function(Process *p, Process *tracee, int erts_call_dirty_nif(ErtsSchedulerData *esdp, Process *c_p, ErtsCodePtr I, Eterm 
*reg); -ErtsMessage* erts_create_message_from_nif_env(ErlNifEnv* msg_env); +ErtsMessage* erts_create_message_from_nif_env(ErlNifEnv* msg_env, Uint extra); /* Driver handle (wrapper for old plain handle) */ diff --git a/erts/emulator/sys/common/erl_check_io.c b/erts/emulator/sys/common/erl_check_io.c index b66e7a727d50..16d006577eb5 100644 --- a/erts/emulator/sys/common/erl_check_io.c +++ b/erts/emulator/sys/common/erl_check_io.c @@ -35,9 +35,11 @@ #include "sys.h" #include "global.h" #include "erl_port.h" +#include "erl_port_task.h" #include "erl_check_io.h" #include "erl_thr_progress.h" #include "erl_bif_unique.h" +#include "erl_proc_sig_queue.h" #include "dtrace-wrapper.h" #include "lttng-wrapper.h" #define ERTS_WANT_TIMER_WHEEL_API @@ -85,17 +87,19 @@ typedef enum { to scheduler pollset */ ERTS_EV_FLAG_IN_SCHEDULER = 0x4, /* Set when the fd is currently in scheduler pollset */ + ERTS_EV_FLAG_NIF_SELECT = 0x8, /* Set if a nif select message is in-flight */ #else ERTS_EV_FLAG_SCHEDULER = ERTS_EV_FLAG_CLEAR, ERTS_EV_FLAG_IN_SCHEDULER = ERTS_EV_FLAG_CLEAR, + ERTS_EV_FLAG_NIF_SELECT = ERTS_EV_FLAG_CLEAR, #endif #ifdef ERTS_POLL_USE_FALLBACK - ERTS_EV_FLAG_FALLBACK = 0x8, /* Set when kernel poll rejected fd + ERTS_EV_FLAG_FALLBACK = 0x10, /* Set when kernel poll rejected fd and it was put in the nkp version */ #else ERTS_EV_FLAG_FALLBACK = ERTS_EV_FLAG_CLEAR, #endif - ERTS_EV_FLAG_WANT_ERROR = 0x10, /* ERL_NIF_SELECT_ERROR turned on */ + ERTS_EV_FLAG_WANT_ERROR = 0x20, /* ERL_NIF_SELECT_ERROR turned on */ /* Combinations, defined only to be displayed by debugger (gdb) */ ERTS_EV_FLAG_USED_FALLBACK = ERTS_EV_FLAG_USED | ERTS_EV_FLAG_FALLBACK, @@ -115,9 +119,14 @@ static const char* event_state_flag_to_str(EventStateFlags f, const char *buff, case ERTS_EV_FLAG_FALLBACK | ERTS_EV_FLAG_USED: return "USED|FLBK"; #if ERTS_POLL_USE_SCHEDULER_POLLING + case ERTS_EV_FLAG_CLEAR | ERTS_EV_FLAG_NIF_SELECT: return "CLEAR|NIF_SELECT"; + case ERTS_EV_FLAG_USED | 
ERTS_EV_FLAG_NIF_SELECT: return "USED|NIF_SELECT"; case ERTS_EV_FLAG_SCHEDULER: return "SCHD"; case ERTS_EV_FLAG_SCHEDULER | ERTS_EV_FLAG_USED: return "USED|SCHD"; + case ERTS_EV_FLAG_SCHEDULER | ERTS_EV_FLAG_NIF_SELECT: return "SCHD|NIF_SELECT"; case ERTS_EV_FLAG_SCHEDULER | ERTS_EV_FLAG_IN_SCHEDULER: return "IN_SCHD"; + case ERTS_EV_FLAG_SCHEDULER | ERTS_EV_FLAG_IN_SCHEDULER | + ERTS_EV_FLAG_NIF_SELECT: return "IN_SCHD|NIF_SELECT"; case ERTS_EV_FLAG_SCHEDULER | ERTS_EV_FLAG_IN_SCHEDULER | ERTS_EV_FLAG_USED: return "USED|IN_SCHD"; #endif @@ -178,6 +187,10 @@ typedef struct { * - When a driver_select has triggered, but ready_input has not yet been called. * In this scenario `events` will have the event, but it will not be active * until erts_io_notify_port_task_executed has been called. + * - When an FD has been migrated to the sched_pollset, but has then been + * deselected. In this scenario the FD will remain in the scheduler pollset + * until it is triggered. In this scenario the event will be active, but not + * part of `events`. 
*/ ErtsPollEvents active_events; EventStateType type; @@ -351,6 +364,7 @@ static ERTS_INLINE void check_fd_cleanup(ErtsDrvEventState *state, ErtsDrvSelectDataState **free_select, ErtsNifSelectDataState **free_nif); +static void clear_select_event(struct erts_nif_select_event* e); static ERTS_INLINE void iready(Eterm id, ErtsDrvEventState *state); static ERTS_INLINE void oready(Eterm id, ErtsDrvEventState *state); #ifdef DEBUG_PRINT_MODE @@ -653,7 +667,13 @@ abort_tasks(ErtsDrvEventState *state, int mode) } } +typedef struct { + Eterm message; + ErlNifEvent evt; +} ErtsNifSelectSignalData; + static void prepare_select_msg(struct erts_nif_select_event* e, + ErlNifEvent evt, enum ErlNifSelectFlags mode, Eterm recipient, ErtsResource* resource, @@ -662,32 +682,34 @@ static void prepare_select_msg(struct erts_nif_select_event* e, Eterm event_atom) { ErtsMessage* mp; - Eterm* hp; - Uint hsz; + ErtsNifSelectSignalData* xsig; + Eterm* hp, *hp_start; + Uint hsz = sizeof(ErtsNifSelectSignalData) / sizeof(Eterm); /* size of msg ptr */ - if (is_not_nil(e->pid)) { - ASSERT(e->mp); - erts_cleanup_messages(e->mp); - } + + clear_select_event(e); if (mode & ERL_NIF_SELECT_CUSTOM_MSG) { if (msg_env) { - mp = erts_create_message_from_nif_env(msg_env); + mp = erts_create_message_from_nif_env(msg_env, hsz); ERL_MESSAGE_TERM(mp) = msg; + hp = &mp->hfrag.mem[0]; + hp_start = hp; } else { - hsz = size_object(msg); + Eterm msgsz = size_object(msg); + hsz += msgsz; mp = erts_alloc_message(hsz, &hp); - ERL_MESSAGE_TERM(mp) = copy_struct(msg, hsz, &hp, &mp->hfrag.off_heap); + hp_start = hp; + ERL_MESSAGE_TERM(mp) = copy_struct(msg, msgsz, &hp, &mp->hfrag.off_heap); } } else { ErtsBinary* bin; Eterm resource_term, ref_term, tuple; - Eterm* hp_start; /* {select, Resource, Ref, EventAtom} */ - hsz = 5 + ERTS_MAGIC_REF_THING_SIZE; + hsz += 5 + ERTS_MAGIC_REF_THING_SIZE; if (is_internal_ref(msg)) hsz += ERTS_REF_THING_SIZE; else @@ -711,7 +733,19 @@ static void prepare_select_msg(struct 
erts_nif_select_event* e, tuple = TUPLE4(hp, am_select, resource_term, ref_term, event_atom); hp += 5; ERL_MESSAGE_TERM(mp) = tuple; - ASSERT(hp == hp_start + hsz); (void)hp_start; + ASSERT(hp == hp_start + hsz - sizeof(ErtsNifSelectSignalData)/sizeof(Eterm)); + } + + mp->hfrag.used_size = hp - hp_start; + + if (mode & ERL_NIF_SELECT_READ && ERTS_POLL_USE_SCHEDULER_POLLING) { + /* Save original msg ptr */ + xsig = (ErtsNifSelectSignalData*)hp; + xsig->message = ERL_MESSAGE_TERM(mp); + xsig->evt = evt; + /* Setting to THE_NON_VALUE so that + erts_proc_sig_send_nif_select puts proper signal tag here */ + ERL_MESSAGE_TERM(mp) = THE_NON_VALUE; } ASSERT(is_not_nil(recipient)); @@ -719,17 +753,81 @@ static void prepare_select_msg(struct erts_nif_select_event* e, e->mp = mp; } -static ERTS_INLINE void send_select_msg(struct erts_nif_select_event* e) -{ - Process* rp = erts_proc_lookup(e->pid); +#if ERTS_POLL_USE_SCHEDULER_POLLING +static void +erts_io_clear_nif_select(ErtsSysFdType fd) { + erts_mtx_t *mtx = fd_mtx(fd); + ErtsDrvEventState *state; + erts_mtx_lock(mtx); + state = get_drv_ev_state(fd); + if (state->flags & ERTS_EV_FLAG_NIF_SELECT) { + DEBUG_PRINT_FD("Clear ERTS_EV_FLAG_NIF_SELECT (%d) on sched %d", state, + erts_atomic_read_nob(&erts_port_task_outstanding_io_tasks), + erts_get_scheduler_id()); + state->flags &= ~ERTS_EV_FLAG_NIF_SELECT; + erts_port_task_dec_outstanding_io_tasks(); + } + erts_mtx_unlock(mtx); +} - ASSERT(is_internal_pid(e->pid)); - if (!rp) { - erts_cleanup_messages(e->mp); - return; +void +erts_io_clear_nif_select_handles(ErtsSchedulerData *esdp) { + for (int i = 0; i < sizeof(esdp->nif_select_fds) / sizeof(ErtsSysFdType); i++) { + if (esdp->nif_select_fds[i] != ERTS_SYS_FD_INVALID) { + DEBUG_PRINT("%d: Clear in sched %d's state", esdp->nif_select_fds[i], esdp->no); + erts_io_clear_nif_select(esdp->nif_select_fds[i]); + esdp->nif_select_fds[i] = ERTS_SYS_FD_INVALID; + } } +} +#endif + +#if ERTS_POLL_USE_SCHEDULER_POLLING +Eterm 
+erts_io_handle_nif_select(ErtsMessage *sig) { + ErtsNifSelectSignalData* xsig = (ErtsNifSelectSignalData*) (char *) (&sig->hfrag.mem[0] + + sig->hfrag.used_size); + ASSERT(sig->hfrag.used_size < sig->hfrag.alloc_size); + + if (erts_sched_poll_enabled()) { + ErtsSchedulerData *esdp = erts_get_scheduler_data(); + for (int i = 0; i < sizeof(esdp->nif_select_fds) / sizeof(ErtsSysFdType); i++) { + if (esdp->nif_select_fds[i] == ERTS_SYS_FD_INVALID) { + DEBUG_PRINT("%d: Put in sched %d's state", xsig->evt, esdp->no); + esdp->nif_select_fds[i] = xsig->evt; + goto done; + } + } + /* There were no slots left, clear the flag and reset the poll counter */ + erts_io_clear_nif_select(xsig->evt); + } + done: + return xsig->message; +} +#endif - erts_queue_message(rp, 0, e->mp, ERL_MESSAGE_TERM(e->mp), am_system); +static ERTS_INLINE void send_select_msg(struct erts_nif_select_event* e) +{ +#if ERTS_POLL_USE_SCHEDULER_POLLING + if (ERL_MESSAGE_TERM(e->mp) == THE_NON_VALUE) { + /* If message term is THE_NON_VALUE this should be sent as a proc signal */ + if (!erts_proc_sig_send_nif_select(e->pid, e->mp)) + clear_select_event(e); + else { + erts_port_task_inc_outstanding_io_tasks(); + } + } else +#endif + { + /* Otherwise send as normal message */ + Process* rp = erts_proc_lookup(e->pid); + ASSERT(is_value(ERL_MESSAGE_TERM(e->mp))); + if (!rp) { + clear_select_event(e); + return; + } + erts_queue_message(rp, 0, e->mp, ERL_MESSAGE_TERM(e->mp), am_system); + } } static void clear_select_event(struct erts_nif_select_event* e) @@ -737,6 +835,11 @@ static void clear_select_event(struct erts_nif_select_event* e) if (is_not_nil(e->pid)) { /* Discard unsent message */ ASSERT(e->mp); + if (ERL_MESSAGE_TERM(e->mp) == THE_NON_VALUE) { + ErtsNifSelectSignalData* xsig = (ErtsNifSelectSignalData*) (char *) (&e->mp->hfrag.mem[0] + + e->mp->hfrag.used_size); + ERL_MESSAGE_TERM(e->mp) = xsig->message; + } erts_cleanup_messages(e->mp); e->mp = NULL; e->pid = NIL; @@ -958,7 +1061,6 @@ 
driver_select(ErlDrvPort ix, ErlDrvEvent e, int mode, int on) ctl_events |= ERTS_POLL_EV_OUT; } - ASSERT((state->type == ERTS_EV_TYPE_DRV_SEL) || (state->type == ERTS_EV_TYPE_NONE && !state->events)); @@ -1119,6 +1221,7 @@ enif_select_x(ErlNifEnv* env, enum { NO_STOP=0, CALL_STOP, CALL_STOP_AND_RELEASE } call_stop = NO_STOP; ErtsDrvSelectDataState *free_select = NULL; ErtsNifSelectDataState *free_nif = NULL; + ErtsPollEvents new_events = 0; ASSERT(!erts_dbg_is_resource_dying(resource)); @@ -1194,15 +1297,13 @@ enif_select_x(ErlNifEnv* env, erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); print_nif_select_op(dsbufp, fd, mode, resource, msg); steal_pending_stop_nif(dsbufp, resource, state, mode, on); - if (state->type == ERTS_EV_TYPE_STOP_NIF) { - ret = ERL_NIF_SELECT_STOP_SCHEDULED; /* ?? */ - goto done; - } + ret = ERL_NIF_SELECT_STOP_SCHEDULED; + goto done; + } + default: ASSERT(state->type == ERTS_EV_TYPE_NONE); break; } - default: break; - } ASSERT((state->type == ERTS_EV_TYPE_NIF) || (state->type == ERTS_EV_TYPE_NONE && !state->events)); @@ -1210,11 +1311,42 @@ enif_select_x(ErlNifEnv* env, old_events = state->events; if (on) { + if (state->type == ERTS_EV_TYPE_NONE) + ctl_op = ERTS_POLL_OP_ADD; +#if ERTS_POLL_USE_SCHEDULER_POLLING + else { + if (!(state->flags & ERTS_EV_FLAG_SCHEDULER) && + !(state->flags & ERTS_EV_FLAG_FALLBACK) && + (ctl_events & ERTS_POLL_EV_IN)) { + if (erts_sched_poll_enabled() && (state->flags & ERTS_EV_FLAG_NIF_SELECT) && + state->count++ > 10) { + int wake_poller = 0; + DEBUG_PRINT_FD("moving to scheduler ps", state); + new_events = erts_poll_control(get_scheduler_pollset(), fd, ERTS_POLL_OP_ADD, + ERTS_POLL_EV_IN, &wake_poller); + state->flags |= ERTS_EV_FLAG_SCHEDULER|ERTS_EV_FLAG_IN_SCHEDULER; + state->events |= ERTS_POLL_EV_IN; + state->active_events |= ERTS_POLL_EV_IN; + old_events = state->events; + } + } + if ((ctl_events & ERTS_POLL_EV_IN) && (state->flags & ERTS_EV_FLAG_NIF_SELECT)) { + ErtsSchedulerData *esdp = 
erts_get_scheduler_data(); + DEBUG_PRINT_FD("Clear ERTS_EV_FLAG_NIF_SELECT (%d) on sched %d", state, + erts_atomic_read_nob(&erts_port_task_outstanding_io_tasks), + erts_get_scheduler_id()); + state->flags &= ~ERTS_EV_FLAG_NIF_SELECT; + erts_port_task_dec_outstanding_io_tasks(); + for (int i = 0; i < sizeof(esdp->nif_select_fds) / sizeof(ErtsSysFdType); i++) { + if (esdp->nif_select_fds[i] == state->fd) + esdp->nif_select_fds[i] = ERTS_SYS_FD_INVALID; + } + } + } +#endif ctl_events &= ~old_events; state->events |= ctl_events; state->active_events |= ctl_events; - if (state->type == ERTS_EV_TYPE_NONE) - ctl_op = ERTS_POLL_OP_ADD; if (ctl_events & ERTS_POLL_EV_ERR) state->flags |= ERTS_EV_FLAG_WANT_ERROR; } @@ -1225,26 +1357,26 @@ enif_select_x(ErlNifEnv* env, } if (ctl_events || ctl_op == ERTS_POLL_OP_DEL) { - ErtsPollEvents new_events; new_events = erts_io_control_wakeup(state, ctl_op, state->active_events, &wake_poller); - if (new_events & ERTS_POLL_EV_NVAL) { - if (state->type == ERTS_EV_TYPE_NIF && !old_events) { - state->type = ERTS_EV_TYPE_NONE; - state->flags = 0; - state->driver.nif->in.pid = NIL; - state->driver.nif->out.pid = NIL; - state->driver.nif->err.pid = NIL; - state->driver.stop.resource = NULL; - } - ret = INT_MIN | ERL_NIF_SELECT_FAILED; - goto done; + ASSERT(new_events == state->active_events || new_events & ERTS_POLL_EV_NVAL); + } + + if (new_events & ERTS_POLL_EV_NVAL) { + if (state->type == ERTS_EV_TYPE_NIF && !old_events) { + state->type = ERTS_EV_TYPE_NONE; + state->flags = 0; + state->driver.nif->in.pid = NIL; + state->driver.nif->out.pid = NIL; + state->driver.nif->err.pid = NIL; + state->driver.stop.resource = NULL; } - ASSERT(new_events == state->events); + ret = INT_MIN | ERL_NIF_SELECT_FAILED; + goto done; } ASSERT(state->type == ERTS_EV_TYPE_NIF @@ -1263,17 +1395,17 @@ enif_select_x(ErlNifEnv* env, ASSERT(state->type == ERTS_EV_TYPE_NIF); ASSERT(state->driver.stop.resource == resource); if (mode & ERL_DRV_READ) { - 
prepare_select_msg(&state->driver.nif->in, mode, recipient, + prepare_select_msg(&state->driver.nif->in, e, mode, recipient, resource, msg, msg_env, am_ready_input); msg_env = NULL; } if (mode & ERL_DRV_WRITE) { - prepare_select_msg(&state->driver.nif->out, mode, recipient, + prepare_select_msg(&state->driver.nif->out, e, mode, recipient, resource, msg, msg_env, am_ready_output); msg_env = NULL; } if (mode & ERL_NIF_SELECT_ERROR) { - prepare_select_msg(&state->driver.nif->err, mode, recipient, + prepare_select_msg(&state->driver.nif->err, e, mode, recipient, resource, msg, msg_env, am_ready_error); } ret = 0; @@ -1326,6 +1458,7 @@ enif_select_x(ErlNifEnv* env, state->type = ERTS_EV_TYPE_STOP_NIF; ret |= ERL_NIF_SELECT_STOP_SCHEDULED; } + state->count = 0; state->flags &= ~ERTS_EV_FLAG_WANT_ERROR; } else @@ -1791,6 +1924,9 @@ erts_check_io(ErtsPollThread *psi, ErtsMonotonicTime timeout_time, int poll_only erl_errno_id(poll_ret), poll_ret); erts_send_error_to_logger_nogl(dsbufp); } + // if (is_normal_sched) { + // erts_fprintf(stderr, "%d: woke up\r\n", esdp->no); + // } ERTS_MSACC_POP_STATE(); return; } @@ -1952,6 +2088,14 @@ erts_check_io(ErtsPollThread *psi, ErtsMonotonicTime timeout_time, int poll_only check_fd_cleanup(state, &free_select, &free_nif); } +#if ERTS_POLL_USE_SCHEDULER_POLLING + if (erts_sched_poll_enabled() && is_not_nil(in_ev.pid)) { + DEBUG_PRINT_FD("Set ERTS_EV_FLAG_NIF_SELECT on sched %d", + state, erts_get_scheduler_id()); + state->flags |= ERTS_EV_FLAG_NIF_SELECT; + } +#endif + erts_mtx_unlock(fd_mtx(fd)); if (is_not_nil(in_ev.pid)) { @@ -1967,6 +2111,13 @@ erts_check_io(ErtsPollThread *psi, ErtsMonotonicTime timeout_time, int poll_only } case ERTS_EV_TYPE_STOP_NIF: { +#if ERTS_POLL_USE_SCHEDULER_POLLING + if (psi->ps == get_scheduler_pollset()) + break; +#endif +#if ERTS_POLL_USE_FALLBACK + ASSERT(psi->ps == get_fallback_pollset()); +#endif resource = state->driver.stop.resource; state->type = ERTS_EV_TYPE_NONE; goto case_ERTS_EV_TYPE_NONE; 
@@ -1986,6 +2137,15 @@ erts_check_io(ErtsPollThread *psi, ErtsMonotonicTime timeout_time, int poll_only case ERTS_EV_TYPE_NONE: /* Deselected ... */ case_ERTS_EV_TYPE_NONE: state->flags &= ~ERTS_EV_FLAG_FALLBACK; +#if ERTS_POLL_USE_SCHEDULER_POLLING + if (state->flags & ERTS_EV_FLAG_NIF_SELECT) { + DEBUG_PRINT_FD("Clear ERTS_EV_FLAG_NIF_SELECT (%d) on sched %d", state, + erts_atomic_read_nob(&erts_port_task_outstanding_io_tasks), + erts_get_scheduler_id()); + state->flags &= ~ERTS_EV_FLAG_NIF_SELECT; + erts_port_task_dec_outstanding_io_tasks(); + } +#endif ASSERT(!state->events && !state->active_events && !state->flags); check_fd_cleanup(state, &free_select, &free_nif); break; diff --git a/erts/emulator/sys/common/erl_check_io.h b/erts/emulator/sys/common/erl_check_io.h index d44b494223f3..96cd2732d417 100644 --- a/erts/emulator/sys/common/erl_check_io.h +++ b/erts/emulator/sys/common/erl_check_io.h @@ -58,6 +58,23 @@ Eterm erts_check_io_info(void *proc); void erts_io_notify_port_task_executed(ErtsPortTaskType type, ErtsPortTaskHandle *handle, void (*reset)(ErtsPortTaskHandle *)); + +#if ERTS_POLL_USE_SCHEDULER_POLLING +/** + * Handles a nif select non-message signal. Returns the message Eterm. + * + * @param sig The signal that has arrived + */ +Eterm erts_io_handle_nif_select(ErtsMessage *sig); + +/** + * Clears all nif select handles from scheduler queue. + * + * @param esdp The scheduler data whose nif select handles are cleared + */ +void erts_io_clear_nif_select_handles(ErtsSchedulerData *esdp); +#endif + /** * Returns the maximum number of fds that the check io framework can handle. */ diff --git a/erts/emulator/test/driver_SUITE.erl b/erts/emulator/test/driver_SUITE.erl index 1a512e054490..06206d0a2588 100644 --- a/erts/emulator/test/driver_SUITE.erl +++ b/erts/emulator/test/driver_SUITE.erl @@ -89,6 +89,7 @@ -export([bin_prefix/2]). -export([get_check_io_total/1]). % for z_SUITE.erl +-export([check_io_debug/0]). %% For nif_SUITE.erl -include_lib("common_test/include/ct.hrl"). 
diff --git a/erts/emulator/test/nif_SUITE.erl b/erts/emulator/test/nif_SUITE.erl index ea5ffbff25d0..c5b9087a6529 100644 --- a/erts/emulator/test/nif_SUITE.erl +++ b/erts/emulator/test/nif_SUITE.erl @@ -37,8 +37,8 @@ t_load_race/1, t_call_nif_early/1, load_traced_nif/1, - select/1, select_steal/1, - select_error/1, + select/1, select_scheduler/1, + select_steal/1, select_error/1, monitor_process_a/1, monitor_process_b/1, monitor_process_c/1, @@ -154,6 +154,7 @@ select_nif/6, dupe_resource_nif/1, pipe_nif/0, + socketpair_nif/0, write_nif/2, read_nif/2, close_nif/1, @@ -268,6 +269,7 @@ groups() -> monitor_process_purge, demonitor_process]}, {select, [], [select, + select_scheduler, select_error, select_steal]}]. @@ -318,6 +320,7 @@ end_per_testcase(_Func, _Config) -> testcase_cleanup(). testcase_cleanup() -> + driver_SUITE:check_io_debug(), P1 = code:purge(nif_mod), Del = code:delete(nif_mod), P2 = code:purge(nif_mod), @@ -1003,6 +1006,7 @@ select_2(Flag, Ref1, Ref2, MSG_ENV) -> select_3() -> erlang:garbage_collect(), {_,_,2} = last_resource_dtor_call(), + ok. receive_ready(R, Ref, IOatom) when is_reference(Ref) -> @@ -1011,6 +1015,162 @@ receive_ready(_, Msg, _) -> [Got] = flush(), {true,_,_} = {Got=:=Msg, Got, Msg}. +select_scheduler(Config) -> + ensure_lib_loaded(Config), + + RefBin = list_to_binary(lists:duplicate(100, $x)), + + select_scheduler_do(0, make_ref(), null), + select_scheduler_do(?ERL_NIF_SELECT_CUSTOM_MSG, [a, "list", RefBin], null), + select_scheduler_do(?ERL_NIF_SELECT_CUSTOM_MSG, [a, "list", RefBin], alloc_env), + + case has_scheduler_pollset() of + true -> + {ok, Peer, Node} = ?CT_PEER(#{ args => ["+IOs","false"]}), + + erpc:call(Node, fun() -> + ensure_lib_loaded(Config), + select_scheduler_do(0, make_ref(), null), + select_scheduler_do(?ERL_NIF_SELECT_CUSTOM_MSG, [a, "list", RefBin], null), + select_scheduler_do(?ERL_NIF_SELECT_CUSTOM_MSG, [a, "list", RefBin], alloc_env) + end), + + peer:stop(Peer); + _ -> + ok + end. 
+ +%% This testcase tests that scheduler polling works as it should for NIFs +select_scheduler_do(Flag, Ref, MSG_ENV) -> + + OriginalSchedPollFds = get_scheduler_pollset_size(), + SchedulerFDs = case has_scheduler_pollset() of + true -> 1; + false -> 0 + end, + + {{R, _R_ptr}, {W, W_ptr}} = socketpair_nif(), + ok = write_nif(W, <<"hej">>), + <<"hej">> = read_nif(R, 3), + + %% Fill the output buffers and set up a select + FullData = write_full(R, $a), + select_nif(R, ?ERL_NIF_SELECT_WRITE bor Flag, R, self(), Ref, MSG_ENV), + + eagain = read_nif(R, 3), + + %% Move FD to scheduler pollset + move_fd_to_scheduler_pollset(W, R, Flag, Ref, MSG_ENV), + ?assertEqual(OriginalSchedPollFds + SchedulerFDs, get_scheduler_pollset_size()), + + %% Write without select, means migrate back to poll thread + ok = write_nif(W, <<"hej">>), + %% Make sure the scheduler sleeps, triggering migration back to poll thread + timer:sleep(10), + <<"hej">> = read_nif(R, 3), + + ?assertEqual(OriginalSchedPollFds, get_scheduler_pollset_size()), + + %% Move FD to scheduler pollset again + move_fd_to_scheduler_pollset(W, R, Flag, Ref, MSG_ENV), + ?assertEqual(OriginalSchedPollFds + SchedulerFDs, get_scheduler_pollset_size()), + + %% Check that we get WRITE select event if we read FullData + FullData = read_nif(W, byte_size(FullData)), + receive_ready(R, Ref, ready_output), + ?assertEqual(OriginalSchedPollFds + SchedulerFDs, get_scheduler_pollset_size()), + + %% Check that we can do a WRITE select when READ is on scheduler pollset + FullDataAgain = write_full(R, $b), + select_nif(R, ?ERL_NIF_SELECT_WRITE bor Flag, R, self(), Ref, MSG_ENV), + FullDataAgain = read_nif(W, byte_size(FullDataAgain)), + receive_ready(R, Ref, ready_output), + ?assertEqual(OriginalSchedPollFds + SchedulerFDs, get_scheduler_pollset_size()), + + %% Check that we can de-select on READ when in scheduler pollset + 0 = select_nif(R, ?ERL_NIF_SELECT_READ bor Flag, R, self(), Ref, MSG_ENV), + ?assertEqual(OriginalSchedPollFds + 
SchedulerFDs, get_scheduler_pollset_size()), + ?ERL_NIF_SELECT_READ_CANCELLED = + select_nif(R, ?ERL_NIF_SELECT_READ bor ?ERL_NIF_SELECT_CANCEL, R, self(), Ref, MSG_ENV), + ?assertEqual(OriginalSchedPollFds + SchedulerFDs, get_scheduler_pollset_size()), + ok = write_nif(W, <<"hej">>), + <<"hej">> = read_nif(R, 3), + [] = flush(0), + + %% Close write side, while read side is in scheduler pollset + check_stop_ret(select_nif(W, ?ERL_NIF_SELECT_STOP, W, null, Ref, null)), + [{fd_resource_stop, W_ptr, _}] = flush(), + {1, {W_ptr,_}} = last_fd_stop_call(), + true = is_closed_nif(W), + [] = flush(0), + 0 = select_nif(R, ?ERL_NIF_SELECT_READ bor Flag, R, self(), Ref, MSG_ENV), + receive_ready(R, Ref, ready_input), + eof = read_nif(R,1), + + check_stop_ret(select_nif(R, ?ERL_NIF_SELECT_STOP, R, null, Ref, null)), + [{fd_resource_stop, R_ptr, _}] = flush(), + {1, {R_ptr,_}} = last_fd_stop_call(), + true = is_closed_nif(R), + [] = flush(0), + + ?assertEqual(OriginalSchedPollFds, get_scheduler_pollset_size()), + + %% We set up NumberOfFds fds in parallel to make sure all end up in + %% scheduler pollset + NumberOfFds = 10, + Parent = self(), + Pids = [spawn_monitor(fun() -> + link(Parent), + {{R1, _R1_ptr}, {W1, _W1_ptr}} = socketpair_nif(), + move_fd_to_scheduler_pollset(W1, R1, Flag, Ref, MSG_ENV), + Parent ! self(), + receive stop -> ok end, + check_stop_ret(select_nif(W1, ?ERL_NIF_SELECT_STOP, W1, null, Ref, null)), + check_stop_ret(select_nif(R1, ?ERL_NIF_SELECT_STOP, R1, null, Ref, null)) + end) || _ <- lists:seq(1,NumberOfFds)], + [receive P -> ok end || {P, _} <- Pids], + ?assertEqual(OriginalSchedPollFds + NumberOfFds*SchedulerFDs, get_scheduler_pollset_size()), + [begin P ! stop, receive {'DOWN', Ref1, _, _, _} -> ok end end || {P, Ref1} <- Pids], + + NumberOfClosedFds = NumberOfFds * 2, + {NumberOfClosedFds, {_,_}} = last_fd_stop_call(), + + timer:sleep(1000), + + ?assertEqual(OriginalSchedPollFds, get_scheduler_pollset_size()), + + ok. 
+
+%% Run 30 select(READ)/write/notify/read rounds on R. The callers rely on this
+%% repeated activity to get R moved into the scheduler pollset.
+move_fd_to_scheduler_pollset(W, R, Flag, Ref, MSG_ENV) ->
+    [begin
+         0 = select_nif(R,?ERL_NIF_SELECT_READ bor Flag, R,null,Ref,MSG_ENV),
+         Payload = integer_to_binary(Round),
+         ok = write_nif(W, Payload),
+         %% Accept the {select, ...} message or a message that is exactly Ref
+         receive
+             {select, R, Ref, ready_input} -> ok;
+             Msg when Ref =:= Msg -> ok
+         end,
+         Payload = read_nif(R, byte_size(Payload))
+     end || Round <- lists:seq(1,30)].
+
+has_scheduler_pollset() ->
+    find_scheduler_pollset() =/= false.
+
+get_scheduler_pollset_size() ->
+    case find_scheduler_pollset() of
+        {value, PollSet} -> proplists:get_value(total_poll_set_size, PollSet);
+        false -> 0
+    end.
+
+%% Find the first check_io entry treated as the scheduler pollset: not a
+%% fallback pollset and with zero poll threads. Returns {value, Props} | false.
+find_scheduler_pollset() ->
+    lists:search(fun(PollSet) ->
+                         proplists:get_value(fallback, PollSet) =:= false andalso
+                             proplists:get_value(poll_threads, PollSet) =:= 0
+                 end, erlang:system_info(check_io)).
 
 select_error(Config) when is_list(Config) ->
     ensure_lib_loaded(Config),
@@ -4431,6 +4591,7 @@ format_term_nif(_,_) -> ?nif_stub.
 select_nif(_,_,_,_,_,_) -> ?nif_stub.
 dupe_resource_nif(_) -> ?nif_stub.
 pipe_nif() -> ?nif_stub.
+socketpair_nif() -> ?nif_stub.
 write_nif(_,_) -> ?nif_stub.
 read_nif(_,_) -> ?nif_stub.
 close_nif(_) -> ?nif_stub.
diff --git a/erts/emulator/test/nif_SUITE_data/nif_SUITE.c b/erts/emulator/test/nif_SUITE_data/nif_SUITE.c
index 9e77ff94ee46..845fd0640f23 100644
--- a/erts/emulator/test/nif_SUITE_data/nif_SUITE.c
+++ b/erts/emulator/test/nif_SUITE_data/nif_SUITE.c
@@ -28,6 +28,8 @@
 #include
 #include
 #include
+#include
+#include
 #endif
 
 #include "nif_mod.h"
@@ -2708,6 +2710,45 @@ static ERL_NIF_TERM pipe_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]
                             enif_make_tuple2(env, write_fd, make_pointer(env, write_rsrc)));
 }
 
+/*
+ * Create a connected pair of non-blocking AF_UNIX stream sockets, each end
+ * wrapped in its own fd_resource.
+ *
+ * Returns {{ReadRsrc, ReadPtr}, {WriteRsrc, WritePtr}} on success, or an
+ * error string if socketpair() fails. Both ends of a socketpair can be read
+ * from and written to; "read"/"write" only label the two ends.
+ *
+ * NOTE(review): SOCK_NONBLOCK is not available on every Unix (macOS lacks
+ * it, for example) -- confirm the platforms this suite must build on, or
+ * set O_NONBLOCK with fcntl() instead.
+ */
+static ERL_NIF_TERM socketpair_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
+{
+    struct fd_resource* read_rsrc;
+    struct fd_resource* write_rsrc;
+    ERL_NIF_TERM read_fd, write_fd;
+    int fds[2];
+
+    if (socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0, fds) < 0)
+        return enif_make_string(env, "socketpair failed", ERL_NIF_LATIN1);
+
+    read_rsrc  = enif_alloc_resource(fd_resource_type, sizeof(struct fd_resource));
+    write_rsrc = enif_alloc_resource(fd_resource_type, sizeof(struct fd_resource));
+    read_rsrc->fd  = fds[0];
+    read_rsrc->was_selected = 0;
+    write_rsrc->fd = fds[1];
+    write_rsrc->was_selected = 0;
+    read_fd  = enif_make_resource(env, read_rsrc);
+    write_fd = enif_make_resource(env, write_rsrc);
+    /* The terms now hold the resources; drop our own references */
+    enif_release_resource(read_rsrc);
+    enif_release_resource(write_rsrc);
+
+    return enif_make_tuple2(env,
+                            enif_make_tuple2(env, read_fd, make_pointer(env, read_rsrc)),
+                            enif_make_tuple2(env, write_fd, make_pointer(env, write_rsrc)));
+}
+
 /*
  * Create (dupe) of a resource with the same fd, to test stealing
  */
@@ -2783,7 +2824,7 @@ static ERL_NIF_TERM read_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]
         }
         return res;
     }
-    else if (n == 0) {
+    else if (n == 0 || errno == ECONNRESET) {
         return atom_eof;
     }
     else if (errno == EAGAIN) {
@@ -3891,6 +3932,7 @@ static ErlNifFunc nif_funcs[] =
     {"select_nif", 6, select_nif},
 #ifndef __WIN32__
     {"pipe_nif", 0, pipe_nif},
+    {"socketpair_nif", 0, socketpair_nif},
     {"write_nif", 2, write_nif},
     {"dupe_resource_nif", 1, dupe_resource_nif},
     {"read_nif", 2, read_nif},