Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce erpc:call_opt/3,5 #8642

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 92 additions & 82 deletions lib/kernel/src/erpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ is available on the involved nodes.
reqids_add/3,
reqids_to_list/1]).

-export_type([request_id/0, request_id_collection/0, timeout_time/0]).
-export_type([call_options/0,
request_id/0,
request_id_collection/0,
timeout_time/0]).

%% Internal exports (also used by the 'rpc' module)

Expand Down Expand Up @@ -121,45 +124,60 @@ The value can be:
a time further into the future than `4294967295` milliseconds. Identifying the
timeout using an absolute timeout value is especially handy when you have a
deadline for responses corresponding to a complete collection of requests
(`t:request_id_collection/0`) , since you do not have to recalculate the
(`t:request_id_collection/0`), since you do not have to recalculate the
relative time until the deadline over and over again.
""".
-type timeout_time() :: 0..?MAX_INT_TIMEOUT | 'infinity' | {abs, integer()}.

-doc """
Options to be used in [`call/3,5`](`call/5`) and
[`multicall/3,5`](`multicall/5`) functions.

- **`timeout`** - Upper time limit for call operations to complete, see
`t:timeout_time/0`. Default: `infinity`.

- **`always_spawn`** - If `true`, the `apply()` will _always_ be performed
in a freshly spawned process. If `false`, the calling process _may_ be
used instead, if possible. Default: `false`.
""".
-doc(#{since => <<"OTP 28.0">>}).
-type call_options() :: #{'timeout' => Timeout :: timeout_time(),
'always_spawn' => AlwaysSpawn :: boolean()}.

%%------------------------------------------------------------------------
%% Exported API
%%------------------------------------------------------------------------

-doc(#{equiv => call(Node, Fun, infinity)}).
-doc(#{equiv => call(Node, Fun, #{timeout => infinity})}).
-doc(#{since => <<"OTP 23.0">>}).
-spec call(Node, Fun) -> Result when
Node :: node(),
Fun :: function(),
Result :: term().

call(N, Fun) ->
call(N, Fun, infinity).
call(N, Fun, #{timeout => infinity}).

-doc """
Equivalent to
[`erpc:call(Node, erlang, apply, [Fun,[]], Timeout)`](`call/5`).
[`erpc:call(Node, erlang, apply, [Fun,[]], #{timeout => Timeout})`](`call/5`).

May raise all the same exceptions as [`call/5`](`call/5`) plus an `{erpc, badarg}`
`error` exception if `Fun` is not a fun of zero arity.
""".
-doc(#{since => <<"OTP 23.0">>}).
-spec call(Node, Fun, Timeout) -> Result when
-spec call(Node, Fun, TimeoutOrOptions) -> Result when
Node :: node(),
Fun :: function(),
Timeout :: timeout_time(),
TimeoutOrOptions :: timeout_time() | call_options(),
Result :: term().

call(N, Fun, Timeout) when is_function(Fun, 0) ->
call(N, erlang, apply, [Fun, []], Timeout);
call(N, Fun, TimeoutOrOptions) when is_function(Fun, 0) ->
call(N, erlang, apply, [Fun, []], TimeoutOrOptions);
call(_N, _Fun, _Timeout) ->
error({?MODULE, badarg}).

-doc(#{equiv => call(Node, Module, Function, Args, infinity)}).
-doc(#{equiv => call(Node, Module, Function, Args, #{timeout => infinity})}).
-doc(#{since => <<"OTP 23.0">>}).
-spec call(Node, Module, Function, Args) -> Result when
Node :: node(),
Expand All @@ -169,14 +187,16 @@ call(_N, _Fun, _Timeout) ->
Result :: term().

call(N, M, F, A) ->
call(N, M, F, A, infinity).
call(N, M, F, A, #{timeout => infinity}).

-dialyzer([{nowarn_function, call/5}, no_return]).

-doc """
Evaluates [`apply(Module, Function, Args)`](`apply/3`) on node `Node` and
returns the corresponding value `Result`. `Timeout` sets an upper time limit for
the `call` operation to complete.
returns the corresponding value `Result`.

`TimeoutOrOptions` can be either a [`timeout time`](`t:timeout_time/0`) or a
[`call options`](`t:call_options/0`) map (since OTP 28.0).

The `call()` function only returns if the applied function successfully returned
without raising any uncaught exceptions, the operation did not time out, and no
Expand Down Expand Up @@ -239,23 +259,24 @@ communication may, of course, reach the calling process.

> #### Note {: .info }
>
> You cannot make _any_ assumptions about the process that will perform the
> `apply()`. It may be the calling process itself, a server, or a freshly
> spawned process.
> If the `always_spawn` option is `false` (which is the default), you cannot make
> _any_ assumptions about the process that will perform the `apply()`. It may be
> the calling process itself, or a freshly spawned process.
""".
-doc(#{since => <<"OTP 23.0">>}).
-spec call(Node, Module, Function, Args, Timeout) -> Result when
-spec call(Node, Module, Function, Args, TimeoutOrOptions) -> Result when
Node :: node(),
Module :: atom(),
Function :: atom(),
Args :: [term()],
Timeout :: timeout_time(),
TimeoutOrOptions :: timeout_time() | call_options(),
Result :: term().

call(N, M, F, A, infinity) when node() =:= N, %% Optimize local call
is_atom(M),
is_atom(F),
is_list(A) ->
call(N, M, F, A, #{timeout := infinity,
always_spawn := false}) when node() =:= N, %% Optimize local call
is_atom(M),
is_atom(F),
is_list(A) ->
try
{return, Return} = execute_call(M,F,A),
Return
Expand All @@ -271,10 +292,12 @@ call(N, M, F, A, infinity) when node() =:= N, %% Optimize local call
error({exception, Reason, ErpcStack})
end
end;
call(N, M, F, A, T) when is_atom(N),
is_atom(M),
is_atom(F),
is_list(A) ->
call(N, M, F, A, #{timeout := T,
always_spawn := AlwaysSpawn}) when is_atom(N),
is_atom(M),
is_atom(F),
is_list(A),
is_boolean(AlwaysSpawn) ->
Timeout = timeout_value(T),
Res = make_ref(),
ReqId = spawn_request(N, ?MODULE, execute_call, [Res, M, F, A],
Expand All @@ -287,8 +310,15 @@ call(N, M, F, A, T) when is_atom(N),
after Timeout ->
result(timeout, ReqId, Res, undefined)
end;
call(_N, _M, _F, _A, _T) ->
error({?MODULE, badarg}).
call(_N, _M, _F, _A, #{timeout := _T,
always_spawn := _AlwaysSpawn} = _Opts) ->
error({?MODULE, badarg});
call(N, M, F, A, #{} = Opts) ->
call(N, M, F, A, maps:merge(#{timeout => infinity,
always_spawn => false}, Opts));
call(N, M, F, A, T) ->
call(N, M, F, A, #{timeout => T,
always_spawn => false}).

%% Asynchronous call

Expand All @@ -309,11 +339,6 @@ Fails with an `{erpc, badarg}` `error` exception if:

- `Node` is not an atom.
- `Fun` is not a fun of zero arity.

> #### Note {: .info }
>
> You cannot make _any_ assumptions about the process that will perform the
> `apply()`. It may be a server, or a freshly spawned process.
""".
-doc(#{since => <<"OTP 23.0">>}).
-spec send_request(Node, Fun) -> RequestId when
Expand Down Expand Up @@ -364,11 +389,6 @@ Fails with an `{erpc, badarg}` `error` exception if:
- `Args` is not a list. Note that the list is not verified to be a proper list
at the client side.

> #### Note {: .info }
>
> You cannot make _any_ assumptions about the process that will perform the
> `apply()`. It may be a server, or a freshly spawned process.

Equivalent to
[`erpc:send_request(Node, erlang, apply, [Fun,[]]), Label, RequestIdCollection)`](`send_request/6`).

Expand All @@ -377,11 +397,6 @@ Fails with an `{erpc, badarg}` `error` exception if:
- `Node` is not an atom.
- `Fun` is not a fun of zero arity.
- `RequestIdCollection` is detected not to be request identifier collection.

> #### Note {: .info }
>
> You cannot make _any_ assumptions about the process that will perform the
> `apply()`. It may be a server, or a freshly spawned process.
""".
-doc(#{since => <<"OTP 23.0">>}).
-spec send_request(Node, Module, Function, Args) -> RequestId when
Expand Down Expand Up @@ -433,11 +448,6 @@ Fails with an `{erpc, badarg}` `error` exception if:
- `Args` is not a list. Note that the list is not verified to be a proper list
at the client side.
- `RequestIdCollection` is detected not to be request identifier collection.

> #### Note {: .info }
>
> You cannot make _any_ assumptions about the process that will perform the
> `apply()`. It may be a server, or a freshly spawned process.
""".
-doc(#{since => <<"OTP 25.0">>}).
-spec send_request(Node, Module, Function, Args,
Expand Down Expand Up @@ -992,36 +1002,36 @@ reqids_to_list(_) ->
| {error, {?MODULE, Reason :: term()}}.


-doc(#{equiv => multicall(Nodes, Fun, infinity)}).
-doc(#{equiv => multicall(Nodes, Fun, #{timeout => infinity})}).
-doc(#{since => <<"OTP 23.0">>}).
-spec multicall(Nodes, Fun) -> Result when
Nodes :: [atom()],
Fun :: function(),
Result :: term().

multicall(Ns, Fun) ->
multicall(Ns, Fun, infinity).
multicall(Ns, Fun, #{timeout => infinity}).

-doc """
Equivalent to
[`erpc:multicall(Nodes, erlang, apply, [Fun,[]], Timeout)`](`multicall/5`).
[`erpc:multicall(Nodes, erlang, apply, [Fun,[]], #{timeout => Timeout})`](`multicall/5`).

May raise all the same exceptions as [`multicall/5`](`multicall/5`) plus an
`{erpc, badarg}` `error` exception if `Fun` is not a fun of zero arity.
""".
-doc(#{since => <<"OTP 23.0">>}).
-spec multicall(Nodes, Fun, Timeout) -> Result when
-spec multicall(Nodes, Fun, TimeoutOrOptions) -> Result when
Nodes :: [atom()],
Fun :: function(),
Timeout :: timeout_time(),
TimeoutOrOptions :: timeout_time() | call_options(),
Result :: term().

multicall(Ns, Fun, Timeout) when is_function(Fun, 0) ->
multicall(Ns, erlang, apply, [Fun, []], Timeout);
multicall(_Ns, _Fun, _Timeout) ->
multicall(Ns, Fun, TimeoutOrOptions) when is_function(Fun, 0) ->
multicall(Ns, erlang, apply, [Fun, []], TimeoutOrOptions);
multicall(_Ns, _Fun, _TimeoutOrOptions) ->
error({?MODULE, badarg}).

-doc(#{equiv => multicall(Nodes, Module, Function, Args, infinity)}).
-doc(#{equiv => multicall(Nodes, Module, Function, Args, #{timeout => infinity})}).
-doc(#{since => <<"OTP 23.0">>}).
-spec multicall(Nodes, Module, Function, Args) -> Result when
Nodes :: [atom()],
Expand All @@ -1031,14 +1041,18 @@ multicall(_Ns, _Fun, _Timeout) ->
Result :: [{ok, ReturnValue :: term()} | caught_call_exception()].

multicall(Ns, M, F, A) ->
multicall(Ns, M, F, A, infinity).
multicall(Ns, M, F, A, #{timeout => infinity}).

-doc """
Performs multiple `call` operations in parallel on multiple nodes.

That is, evaluates [`apply(Module, Function, Args)`](`apply/3`) on the nodes `Nodes` in
parallel. `Timeout` sets an upper time limit for all `call` operations to
complete. The result is returned as a list where the result from each node is
parallel.

`TimeoutOrOptions` can be either a [`timeout time`](`t:timeout_time/0`) or a
[`call options`](`t:call_options/0`) map (since OTP 28.0).

The result is returned as a list where the result from each node is
placed at the same position as the node name is placed in `Nodes`. Each item in
the resulting list is formatted as either:

Expand Down Expand Up @@ -1094,32 +1108,38 @@ calling process, such communication may, of course, reach the calling process.

> #### Note {: .info }
>
> You cannot make _any_ assumptions about the process that will perform the
> `apply()`. It may be the calling process itself, a server, or a freshly
> spawned process.
> If the `always_spawn` option is `false` (which is the default), you cannot make
> _any_ assumptions about the processes that will perform the `apply()`s. It may be
> the calling process itself, or freshly spawned processes, or a mix of both.
""".
-doc(#{since => <<"OTP 23.0">>}).
-spec multicall(Nodes, Module, Function, Args, Timeout) -> Result when
-spec multicall(Nodes, Module, Function, Args, TimeoutOrOptions) -> Result when
Nodes :: [atom()],
Module :: atom(),
Function :: atom(),
Args :: [term()],
Timeout :: timeout_time(),
TimeoutOrOptions :: timeout_time() | call_options(),
Result :: [{ok, ReturnValue :: term()} | caught_call_exception()].

multicall(Ns, M, F, A, T) ->
multicall(Ns, M, F, A, #{} = Opts) ->
try
true = is_atom(M),
true = is_atom(F),
true = is_list(A),
Tag = make_ref(),
Timeout = timeout_value(T),
SendState = mcall_send_requests(Tag, Ns, M, F, A, Timeout),
Timeout = timeout_value(maps:get(timeout, Opts, infinity)),
LocalCall = case maps:get(always_spawn, Opts, false) of
true -> always_spawn;
false -> allow_local_call
end,
SendState = mcall_send_requests(Tag, Ns, M, F, A, LocalCall, Timeout),
mcall_receive_replies(Tag, SendState)
catch
error:NotIErr when NotIErr /= internal_error ->
error({?MODULE, badarg})
end.
end;
multicall(Ns, M, F, A, T) ->
multicall(Ns, M, F, A, #{timeout => T}).

-doc """
Equivalent to
Expand Down Expand Up @@ -1155,11 +1175,6 @@ if:
- `Function` is not an atom.
- `Args` is not a list. Note that the list is not verified to be a proper list
at the client side.

> #### Note {: .info }
>
> You cannot make _any_ assumptions about the process that will perform the
> `apply()`. It may be a server, or a freshly spawned process.
""".
-doc(#{since => <<"OTP 23.0">>}).
-spec multicast(Nodes, Module, Function, Args) -> 'ok' when
Expand Down Expand Up @@ -1215,11 +1230,6 @@ ignored.
- `Function` is not an atom.
- `Args` is not a list. Note that the list is not verified to be a proper list
at the client side.

> #### Note {: .info }
>
> You cannot make _any_ assumptions about the process that will perform the
> `apply()`. It may be a server, or a freshly spawned process.
""".
-doc(#{since => <<"OTP 23.0">>}).
-spec cast(Node, Module, Function, Args) -> 'ok' when
Expand Down Expand Up @@ -1515,9 +1525,9 @@ mcall_send_request(T, N, M, F, A) when is_reference(T),
{reply_tag, T},
{monitor, [{tag, T}]}]).

mcall_send_requests(Tag, Ns, M, F, A, Tmo) ->
mcall_send_requests(Tag, Ns, M, F, A, LC, Tmo) ->
DL = deadline(Tmo),
mcall_send_requests(Tag, Ns, M, F, A, [], DL, undefined, 0).
mcall_send_requests(Tag, Ns, M, F, A, [], DL, LC, 0).

mcall_send_requests(_Tag, [], M, F, A, RIDs, DL, local_call, NRs) ->
%% Timeout infinity and call on local node wanted;
Expand All @@ -1527,7 +1537,7 @@ mcall_send_requests(_Tag, [], M, F, A, RIDs, DL, local_call, NRs) ->
mcall_send_requests(_Tag, [], _M, _F, _A, RIDs, DL, _LC, NRs) ->
{ok, RIDs, #{}, NRs, DL};
mcall_send_requests(Tag, [N|Ns], M, F, A, RIDs,
infinity, undefined, NRs) when N == node() ->
infinity, allow_local_call, NRs) when N == node() ->
mcall_send_requests(Tag, Ns, M, F, A, [local_call|RIDs],
infinity, local_call, NRs);
mcall_send_requests(Tag, [N|Ns], M, F, A, RIDs, DL, LC, NRs) ->
Expand Down
Loading
Loading