From 3b72fbb1fb5d40eb60562189e723878d06bb0ff0 Mon Sep 17 00:00:00 2001 From: sevenhe Date: Thu, 23 Mar 2023 10:19:31 +0800 Subject: [PATCH 1/5] The grpcbox_client get_channel function adds support for hash and direct strategies --- src/grpcbox_channel.erl | 21 +++++++++++++++++---- src/grpcbox_client.erl | 3 ++- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/src/grpcbox_channel.erl b/src/grpcbox_channel.erl index 3530f4d..4023024 100644 --- a/src/grpcbox_channel.erl +++ b/src/grpcbox_channel.erl @@ -5,6 +5,7 @@ -export([start_link/3, is_ready/1, pick/2, + pick/3, stop/1, stop/2]). -export([init/1, @@ -58,11 +59,19 @@ is_ready(Name) -> gen_statem:call(?CHANNEL(Name), is_ready). %% @doc Picks a subchannel from a pool using the configured strategy. --spec pick(name(), unary | stream) -> {ok, {pid(), grpcbox_client:interceptor() | undefined}} | - {error, undefined_channel | no_endpoints}. +-spec pick(name(), unary | stream) -> + {ok, {pid(), grpcbox_client:interceptor() | undefined}} | + {error, undefined_channel | no_endpoints}. pick(Name, CallType) -> + pick(Name, CallType, undefined). + +%% @doc Picks a subchannel from a pool using the configured strategy. +-spec pick(name(), unary | stream, term() | undefined) -> + {ok, {pid(), grpcbox_client:interceptor() | undefined}} | + {error, undefined_channel | no_endpoints}. +pick(Name, CallType, Key) -> try - case gproc_pool:pick_worker(Name) of + case pick_worker(Name, Key) of false -> {error, no_endpoints}; Pid when is_pid(Pid) -> {ok, {Pid, interceptor(Name, CallType)}} @@ -72,6 +81,11 @@ pick(Name, CallType) -> {error, undefined_channel} end. +pick_worker(Name, undefined) -> + gproc_pool:pick_worker(Name); +pick_worker(Name, Key) -> + gproc_pool:pick_worker(Name, Key). + -spec interceptor(name(), unary | stream) -> grpcbox_client:interceptor() | undefined. interceptor(Name, CallType) -> case ets:lookup(?CHANNELS_TAB, {Name, CallType}) of @@ -177,4 +191,3 @@ start_workers(Pool, StatsHandler, Encoding, Endpoints) -> Encoding, StatsHandler), Pid end || Endpoint={Transport, Host, Port, EndpointOptions} <- Endpoints]. - diff --git a/src/grpcbox_client.erl b/src/grpcbox_client.erl index b00b487..42bb025 100644 --- a/src/grpcbox_client.erl +++ b/src/grpcbox_client.erl @@ -47,7 +47,8 @@ get_channel(Options, Type) -> Channel = maps:get(channel, Options, default_channel), - grpcbox_channel:pick(Channel, Type). + Key = maps:get(key, Options, undefined), + grpcbox_channel:pick(Channel, Type, Key). unary(Ctx, Service, Method, Input, Def, Options) -> unary(Ctx, filename:join([<<>>, Service, Method]), Input, Def, Options). From b9060a2e21c9382bf1de5ace581f3495e83c57be Mon Sep 17 00:00:00 2001 From: sevenhe Date: Thu, 23 Mar 2023 10:02:49 +0800 Subject: [PATCH 2/5] Fix the value passed into the Name parameter of the subchannel gproc_pool disconnect_worker and remove_worker functions --- src/grpcbox_subchannel.erl | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/src/grpcbox_subchannel.erl b/src/grpcbox_subchannel.erl index 8eb6bad..93e5a10 100644 --- a/src/grpcbox_subchannel.erl +++ b/src/grpcbox_subchannel.erl @@ -14,7 +14,8 @@ ready/3, disconnected/3]). --record(data, {endpoint :: grpcbox_channel:endpoint(), +-record(data, {name :: any(), + endpoint :: grpcbox_channel:endpoint(), channel :: grpcbox_channel:t(), info :: #{authority := binary(), scheme := binary(), @@ -41,8 +42,10 @@ stop(Pid, Reason) -> init([Name, Channel, Endpoint, Encoding, StatsHandler]) -> process_flag(trap_exit, true), + gproc_pool:connect_worker(Channel, Name), - {ok, disconnected, #data{conn=undefined, + {ok, disconnected, #data{name=Name, + conn=undefined, info=info_map(Endpoint, Encoding, StatsHandler), endpoint=Endpoint, channel=Channel}}. @@ -89,24 +92,24 @@ handle_event(_, _, _) -> keep_state_and_data. terminate(_Reason, _State, #data{conn=undefined, - endpoint=Endpoint, + name=Name, channel=Channel}) -> - gproc_pool:disconnect_worker(Channel, Endpoint), - gproc_pool:remove_worker(Channel, Endpoint), + gproc_pool:disconnect_worker(Channel, Name), + gproc_pool:remove_worker(Channel, Name), ok; terminate(normal, _State, #data{conn=Pid, - endpoint=Endpoint, + name=Name, channel=Channel}) -> h2_connection:stop(Pid), - gproc_pool:disconnect_worker(Channel, Endpoint), - gproc_pool:remove_worker(Channel, Endpoint), + gproc_pool:disconnect_worker(Channel, Name), + gproc_pool:remove_worker(Channel, Name), ok; terminate(Reason, _State, #data{conn=Pid, - endpoint=Endpoint, + name=Name, channel=Channel}) -> + gproc_pool:disconnect_worker(Channel, Name), + gproc_pool:remove_worker(Channel, Name), exit(Pid, Reason), - gproc_pool:disconnect_worker(Channel, Endpoint), - gproc_pool:remove_worker(Channel, Endpoint), ok. connect(Data=#data{conn=undefined, From 5c041a3c2940a8d139e41ab9686b08322dbf3c5b Mon Sep 17 00:00:00 2001 From: sevenhe Date: Thu, 23 Mar 2023 10:12:14 +0800 Subject: [PATCH 3/5] grpcbox_channel can add and remove dynamic endpoints --- src/grpcbox_channel.erl | 51 ++++++++++++++++++++++++++++------ test/grpcbox_channel_SUITE.erl | 39 ++++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 9 deletions(-) create mode 100644 test/grpcbox_channel_SUITE.erl diff --git a/src/grpcbox_channel.erl b/src/grpcbox_channel.erl index 4023024..cf7cce8 100644 --- a/src/grpcbox_channel.erl +++ b/src/grpcbox_channel.erl @@ -6,6 +6,8 @@ is_ready/1, pick/2, pick/3, + add_endpoints/2, + remove_endpoints/3, stop/1, stop/2]). -export([init/1, @@ -86,6 +88,12 @@ pick_worker(Name, undefined) -> pick_worker(Name, Key) -> gproc_pool:pick_worker(Name, Key). +add_endpoints(Name, Endpoints) -> + gen_statem:call(?CHANNEL(Name), {add_endpoints, Endpoints}). + +remove_endpoints(Name, Endpoints, Reason) -> + gen_statem:call(?CHANNEL(Name), {remove_endpoints, Endpoints, Reason}). + -spec interceptor(name(), unary | stream) -> grpcbox_client:interceptor() | undefined. interceptor(Name, CallType) -> case ets:lookup(?CHANNELS_TAB, {Name, CallType}) of @@ -114,14 +122,13 @@ init([Name, Endpoints, Options]) -> pool = Name, encoding = Encoding, stats_handler = StatsHandler, - endpoints = Endpoints + endpoints = lists:usort(Endpoints) }, - case maps:get(sync_start, Options, false) of false -> {ok, idle, Data, [{next_event, internal, connect}]}; true -> - _ = start_workers(Name, StatsHandler, Encoding, Endpoints), + start_workers(Name, StatsHandler, Encoding, Endpoints), {ok, connected, Data} end. @@ -130,6 +137,23 @@ callback_mode() -> connected({call, From}, is_ready, _Data) -> {keep_state_and_data, [{reply, From, true}]}; +connected({call, From}, {add_endpoints, Endpoints}, + Data=#data{pool=Pool, + stats_handler=StatsHandler, + encoding=Encoding, + endpoints=TotalEndpoints}) -> + NewEndpoints = lists:subtract(Endpoints, TotalEndpoints), + NewTotalEndpoints = lists:umerge(TotalEndpoints, Endpoints), + start_workers(Pool, StatsHandler, Encoding, NewEndpoints), + {keep_state, Data#data{endpoints=NewTotalEndpoints}, [{reply, From, ok}]}; +connected({call, From}, {remove_endpoints, Endpoints, Reason}, + Data=#data{pool=Pool, endpoints=TotalEndpoints}) -> + + NewEndpoints = sets:to_list(sets:intersection(sets:from_list(Endpoints), + sets:from_list(TotalEndpoints))), + NewTotalEndpoints = lists:subtract(TotalEndpoints, Endpoints), + stop_workers(Pool, NewEndpoints, Reason), + {keep_state, Data#data{endpoints = NewTotalEndpoints}, [{reply, From, ok}]}; connected(EventType, EventContent, Data) -> handle_event(EventType, EventContent, Data). @@ -137,7 +161,8 @@ idle(internal, connect, Data=#data{pool=Pool, stats_handler=StatsHandler, encoding=Encoding, endpoints=Endpoints}) -> - _ = start_workers(Pool, StatsHandler, Encoding, Endpoints), + + start_workers(Pool, StatsHandler, Encoding, Endpoints), {next_state, connected, Data}; idle({call, From}, is_ready, _Data) -> {keep_state_and_data, [{reply, From, false}]}; @@ -186,8 +211,16 @@ insert_stream_interceptor(Name, _Type, Interceptors) -> start_workers(Pool, StatsHandler, Encoding, Endpoints) -> [begin - gproc_pool:add_worker(Pool, Endpoint), - {ok, Pid} = grpcbox_subchannel:start_link(Endpoint, Pool, {Transport, Host, Port, EndpointOptions}, - Encoding, StatsHandler), - Pid - end || Endpoint={Transport, Host, Port, EndpointOptions} <- Endpoints]. + gproc_pool:add_worker(Pool, Endpoint), + {ok, Pid} = grpcbox_subchannel:start_link(Endpoint, + Pool, Endpoint, Encoding, StatsHandler), + Pid + end || Endpoint <- Endpoints]. + +stop_workers(Pool, Endpoints, Reason) -> + [begin + case gproc_pool:whereis_worker(Pool, Endpoint) of + undefined -> ok; + Pid -> grpcbox_subchannel:stop(Pid, Reason) + end + end || Endpoint <- Endpoints]. diff --git a/test/grpcbox_channel_SUITE.erl b/test/grpcbox_channel_SUITE.erl new file mode 100644 index 0000000..5a20a62 --- /dev/null +++ b/test/grpcbox_channel_SUITE.erl @@ -0,0 +1,39 @@ +-module(grpcbox_channel_SUITE). + +-export([all/0, + init_per_suite/1, + end_per_suite/1, + add_and_remove_endpoints/1]). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> + [ + add_and_remove_endpoints + ]. +init_per_suite(_Config) -> + application:set_env(grpcbox, servers, []), + application:ensure_all_started(grpcbox), + grpcbox_channel_sup:start_link(), + grpcbox_channel_sup:start_child(default_channel, [{http, "127.0.0.1", 18080, []}], #{}), + grpcbox_channel_sup:start_child(random_channel, + [{http, "127.0.0.1", 18080, []}, {http, "127.0.0.1", 18081, []}, {http, "127.0.0.1", 18082, []}, {http, "127.0.0.1", 18083, []}], + #{balancer => random}), + grpcbox_channel_sup:start_child(hash_channel, + [{http, "127.0.0.1", 18080, []}, {http, "127.0.0.1", 18081, []}, {http, "127.0.0.1", 18082, []}, {http, "127.0.0.1", 18083, []}], + #{balancer => hash}), + grpcbox_channel_sup:start_child(direct_channel, + [{http, "127.0.0.1", 18080, []}, {http, "127.0.0.1", 18081, []}, {http, "127.0.0.1", 18082, []}, {http, "127.0.0.4", 18084, []}], + #{ balancer => direct}), + + _Config. + +end_per_suite(_Config) -> + application:stop(grpcbox), + ok. + +add_and_remove_endpoints(_Config) -> + grpcbox_channel:add_endpoints(default_channel, [{http, "127.0.0.1", 18081, []}, {http, "127.0.0.1", 18082, []}, {http, "127.0.0.1", 18083, []}]), + ?assertEqual(4, length(gproc_pool:active_workers(default_channel))), + grpcbox_channel:add_endpoints(default_channel, [{https, "127.0.0.1", 18081, []}, {https, "127.0.0.1", 18082, []}, {https, "127.0.0.1", 18083, []}]), + ?assertEqual(7, length(gproc_pool:active_workers(default_channel))). From 22c8de1a18e80cea072bb807eac881c2846adc80 Mon Sep 17 00:00:00 2001 From: sevenhe Date: Thu, 23 Mar 2023 10:43:56 +0800 Subject: [PATCH 4/5] Pick a subchannel which connection is established --- src/grpcbox_channel.erl | 7 ++- src/grpcbox_client.erl | 6 ++- src/grpcbox_subchannel.erl | 75 ++++++++++++++++++++++---------- test/grpcbox_SUITE.erl | 1 + test/grpcbox_channel_SUITE.erl | 79 +++++++++++++++++++++++++++++++--- 5 files changed, 138 insertions(+), 30 deletions(-) diff --git a/src/grpcbox_channel.erl b/src/grpcbox_channel.erl index cf7cce8..ab98bd7 100644 --- a/src/grpcbox_channel.erl +++ b/src/grpcbox_channel.erl @@ -118,6 +118,8 @@ init([Name, Endpoints, Options]) -> gproc_pool:new(Name, BalancerType, [{size, length(Endpoints)}, {auto_size, true}]), + gproc_pool:new({Name, active}, BalancerType, [{size, length(Endpoints)}, + {auto_size, true}]), Data = #data{ pool = Name, encoding = Encoding, @@ -173,10 +175,12 @@ handle_event(_, _, Data) -> {keep_state, Data}. terminate({shutdown, force_delete}, _State, #data{pool=Name}) -> - gproc_pool:force_delete(Name); + gproc_pool:force_delete(Name), + gproc_pool:force_delete({Name, active}); terminate(Reason, _State, #data{pool=Name}) -> [grpcbox_subchannel:stop(Pid, Reason) || {_Channel, Pid} <- gproc_pool:active_workers(Name)], gproc_pool:delete(Name), + gproc_pool:delete({Name, active}), ok. insert_interceptors(Name, Interceptors) -> @@ -212,6 +216,7 @@ insert_stream_interceptor(Name, _Type, Interceptors) -> start_workers(Pool, StatsHandler, Encoding, Endpoints) -> [begin gproc_pool:add_worker(Pool, Endpoint), + gproc_pool:add_worker({Pool, active}, Endpoint), {ok, Pid} = grpcbox_subchannel:start_link(Endpoint, Pool, Endpoint, Encoding, StatsHandler), Pid diff --git a/src/grpcbox_client.erl b/src/grpcbox_client.erl index 42bb025..4dd4ffd 100644 --- a/src/grpcbox_client.erl +++ b/src/grpcbox_client.erl @@ -48,7 +48,11 @@ get_channel(Options, Type) -> Channel = maps:get(channel, Options, default_channel), Key = maps:get(key, Options, undefined), - grpcbox_channel:pick(Channel, Type, Key). + PickStrategy = maps:get(pick_strategy, Options, undefined), + case PickStrategy of + active_worker -> grpcbox_channel:pick({Channel, active}, Type, Key); + undefined -> grpcbox_channel:pick(Channel, Type, Key) + end. unary(Ctx, Service, Method, Input, Def, Options) -> unary(Ctx, filename:join([<<>>, Service, Method]), Input, Def, Options). diff --git a/src/grpcbox_subchannel.erl b/src/grpcbox_subchannel.erl index 93e5a10..60a50fb 100644 --- a/src/grpcbox_subchannel.erl +++ b/src/grpcbox_subchannel.erl @@ -14,6 +14,8 @@ ready/3, disconnected/3]). +-define(RECONNECT_INTERVAL, 5000). + -record(data, {name :: any(), endpoint :: grpcbox_channel:endpoint(), channel :: grpcbox_channel:t(), @@ -42,13 +44,13 @@ stop(Pid, Reason) -> init([Name, Channel, Endpoint, Encoding, StatsHandler]) -> process_flag(trap_exit, true), - gproc_pool:connect_worker(Channel, Name), - {ok, disconnected, #data{name=Name, - conn=undefined, - info=info_map(Endpoint, Encoding, StatsHandler), - endpoint=Endpoint, - channel=Channel}}. + Data = #data{name=Name, + conn=undefined, + info=info_map(Endpoint, Encoding, StatsHandler), + endpoint=Endpoint, + channel=Channel}, + {ok, disconnected, Data, [{next_event, internal, connect}]}. info_map({http, Host, 80, _}, Encoding, StatsHandler) -> #{authority => list_to_binary(Host), @@ -72,20 +74,28 @@ callback_mode() -> ready({call, From}, conn, #data{conn=Conn, info=Info}) -> {keep_state_and_data, [{reply, From, {ok, Conn, Info}}]}; +ready(info, {'EXIT', Pid, _}, Data=#data{conn=Pid, name=Name, channel=Channel}) -> + gproc_pool:disconnect_worker({Channel, active}, Name), + {next_state, disconnected, Data#data{conn=undefined}, [{next_event, internal, connect}]}; +ready(info, {timeout, connect}, _Data) -> + keep_state_and_data; ready(EventType, EventContent, Data) -> handle_event(EventType, EventContent, Data). +disconnected(internal, connect, Data) -> + do_connect(Data); +disconnected(info, {timeout, connect}, Data) -> + do_connect(Data); disconnected({call, From}, conn, Data) -> connect(Data, From, [postpone]); +disconnected(info, {'EXIT', _, _}, #data{conn=undefined}) -> + erlang:send_after(?RECONNECT_INTERVAL, self(), {timeout, connect}), + keep_state_and_data; disconnected(EventType, EventContent, Data) -> handle_event(EventType, EventContent, Data). handle_event({call, From}, info, #data{info=Info}) -> {keep_state_and_data, [{reply, From, Info}]}; -handle_event(info, {'EXIT', Pid, _}, Data=#data{conn=Pid}) -> - {next_state, disconnected, Data#data{conn=undefined}}; -handle_event(info, {'EXIT', _, econnrefused}, #data{conn=undefined}) -> - keep_state_and_data; handle_event({call, From}, shutdown, _) -> {stop_and_reply, normal, {reply, From, ok}}; handle_event(_, _, _) -> @@ -96,6 +106,7 @@ terminate(_Reason, _State, #data{conn=undefined, channel=Channel}) -> gproc_pool:disconnect_worker(Channel, Name), gproc_pool:remove_worker(Channel, Name), + gproc_pool:remove_worker({Channel, active}, Name), ok; terminate(normal, _State, #data{conn=Pid, name=Name, @@ -103,29 +114,37 @@ terminate(normal, _State, #data{conn=Pid, h2_connection:stop(Pid), gproc_pool:disconnect_worker(Channel, Name), gproc_pool:remove_worker(Channel, Name), + gproc_pool:disconnect_worker({Channel, active}, Name), + gproc_pool:remove_worker({Channel, active}, Name), ok; terminate(Reason, _State, #data{conn=Pid, name=Name, channel=Channel}) -> gproc_pool:disconnect_worker(Channel, Name), gproc_pool:remove_worker(Channel, Name), + gproc_pool:disconnect_worker({Channel, active}, Name), + gproc_pool:remove_worker({Channel, active}, Name), exit(Pid, Reason), ok. -connect(Data=#data{conn=undefined, - endpoint={Transport, Host, Port, EndpointOptions}}, From, Actions) -> - % Get and delete non-ssl options from endpoint options, these are passed as connection settings - ConnectTimeout = proplists:get_value(connect_timeout, EndpointOptions, 5000), - TcpUserTimeout = proplists:get_value(tcp_user_timeout, EndpointOptions, 0), - EndpointOptions2 = proplists:delete(connect_timeout, EndpointOptions), - EndpointOptions3 = proplists:delete(tcp_user_timeout, EndpointOptions2), +do_connect(Data=#data{name=Name, channel=Channel, + conn=undefined, endpoint=Endpoint}) -> + case start_h2_client(Endpoint) of + {ok, Pid} -> + gproc_pool:connect_worker({Channel, active}, Name), + {next_state, ready, Data#data{conn=Pid}}; + {error, _} -> + erlang:send_after(?RECONNECT_INTERVAL, self(), {timeout, connect}), + {next_state, disconnected, Data#data{conn=undefined}} + end. + +connect(Data=#data{name=Name, channel=Channel, + conn=undefined, endpoint=Endpoint}, + From, Actions) -> + case start_h2_client(Endpoint) of - case h2_client:start_link(Transport, Host, Port, options(Transport, EndpointOptions3), - #{garbage_on_end => true, - stream_callback_mod => grpcbox_client_stream, - connect_timeout => ConnectTimeout, - tcp_user_timeout => TcpUserTimeout}) of {ok, Pid} -> + gproc_pool:connect_worker({Channel, active}, Name), {next_state, ready, Data#data{conn=Pid}, Actions}; {error, _}=Error -> {next_state, disconnected, Data#data{conn=undefined}, [{reply, From, Error}]} @@ -138,3 +157,15 @@ options(https, Options) -> [{client_preferred_next_protocols, {client, [<<"h2">>]}} | Options]; options(http, Options) -> Options. + +start_h2_client({Transport, Host, Port, EndpointOptions}) -> + % Get and delete non-ssl options from endpoint options, these are passed as connection settings + ConnectTimeout = proplists:get_value(connect_timeout, EndpointOptions, 5000), + TcpUserTimeout = proplists:get_value(tcp_user_timeout, EndpointOptions, 0), + EndpointOptions2 = proplists:delete(connect_timeout, EndpointOptions), + EndpointOptions3 = proplists:delete(tcp_user_timeout, EndpointOptions2), + h2_client:start_link(Transport, Host, Port, options(Transport, EndpointOptions3), + #{garbage_on_end => true, + stream_callback_mod => grpcbox_client_stream, + connect_timeout => ConnectTimeout, + tcp_user_timeout => TcpUserTimeout}). diff --git a/test/grpcbox_SUITE.erl b/test/grpcbox_SUITE.erl index 81c1c1a..cab1be3 100644 --- a/test/grpcbox_SUITE.erl +++ b/test/grpcbox_SUITE.erl @@ -343,6 +343,7 @@ end_per_testcase(_, _Config) -> initially_down_service(_Config) -> Point = #{latitude => 409146138, longitude => -746188906}, Ctx = ctx:with_deadline_after(ctx:new(), 5, second), + ct:sleep(100), ?assertMatch({error, econnrefused}, routeguide_route_guide_client:get_feature(Ctx, Point)), grpcbox:start_server(#{grpc_opts => #{service_protos => [route_guide_pb], diff --git a/test/grpcbox_channel_SUITE.erl b/test/grpcbox_channel_SUITE.erl index 5a20a62..b6f8135 100644 --- a/test/grpcbox_channel_SUITE.erl +++ b/test/grpcbox_channel_SUITE.erl @@ -1,19 +1,37 @@ -module(grpcbox_channel_SUITE). -export([all/0, - init_per_suite/1, - end_per_suite/1, - add_and_remove_endpoints/1]). + init_per_suite/1, + end_per_suite/1, + add_and_remove_endpoints/1, + add_and_remove_endpoints_active_workers/1, + pick_worker_strategy/1, + pick_active_worker_strategy/1 + ]). -include_lib("eunit/include/eunit.hrl"). all() -> [ - add_and_remove_endpoints + add_and_remove_endpoints, + add_and_remove_endpoints_active_workers, + pick_worker_strategy, + pick_active_worker_strategy ]. init_per_suite(_Config) -> - application:set_env(grpcbox, servers, []), + application:set_env(grpcbox, client, #{channel => []}), + GrpcOptions = #{service_protos => [route_guide_pb], services => #{'routeguide.RouteGuide' => routeguide_route_guide}}, + Servers = [#{grpc_opts => GrpcOptions, + listen_opts => #{port => 18080, ip => {127,0,0,1}}}, + #{grpc_opts => GrpcOptions, + listen_opts => #{port => 18081, ip => {127,0,0,1}}}, + #{grpc_opts => GrpcOptions, + listen_opts => #{port => 18082, ip => {127,0,0,1}}}, + #{grpc_opts => GrpcOptions, + listen_opts => #{port => 18083, ip => {127,0,0,1}}}], + application:set_env(grpcbox, servers, Servers), application:ensure_all_started(grpcbox), + ct:sleep(1000), grpcbox_channel_sup:start_link(), grpcbox_channel_sup:start_child(default_channel, [{http, "127.0.0.1", 18080, []}], #{}), grpcbox_channel_sup:start_child(random_channel, @@ -32,8 +50,57 @@ end_per_suite(_Config) -> application:stop(grpcbox), ok. + add_and_remove_endpoints(_Config) -> grpcbox_channel:add_endpoints(default_channel, [{http, "127.0.0.1", 18081, []}, {http, "127.0.0.1", 18082, []}, {http, "127.0.0.1", 18083, []}]), ?assertEqual(4, length(gproc_pool:active_workers(default_channel))), grpcbox_channel:add_endpoints(default_channel, [{https, "127.0.0.1", 18081, []}, {https, "127.0.0.1", 18082, []}, {https, "127.0.0.1", 18083, []}]), - ?assertEqual(7, length(gproc_pool:active_workers(default_channel))). + ?assertEqual(7, length(gproc_pool:active_workers(default_channel))), + grpcbox_channel:remove_endpoints(default_channel, [{http, "127.0.0.1", 18081, []}, {http, "127.0.0.1", 18082, []}, {http, "127.0.0.1", 18083, []}], normal), + ?assertEqual(4, length(gproc_pool:active_workers(default_channel))), + grpcbox_channel:remove_endpoints(default_channel, [{https, "127.0.0.1", 18080, []}, {https, "127.0.0.1", 18081, []}, {https, "127.0.0.1", 18082, []}], normal), + ?assertEqual(2, length(gproc_pool:active_workers(default_channel))). + +add_and_remove_endpoints_active_workers(_Config) -> + grpcbox_channel:add_endpoints(default_channel, [{http, "127.0.0.1", 18081, []}, {http, "127.0.0.1", 18082, []}, {http, "127.0.0.1", 18083, []}]), + ct:sleep(1000), + ?assertEqual(4, length(gproc_pool:active_workers({default_channel, active}))), + grpcbox_channel:add_endpoints(default_channel, [{https, "127.0.0.1", 18081, []}, {https, "127.0.0.1", 18082, []}, {https, "127.0.0.1", 18083, []}]), + ct:sleep(1000), + ?assertEqual(4, length(gproc_pool:active_workers({default_channel, active}))), + grpcbox_channel:remove_endpoints(default_channel, [{http, "127.0.0.1", 18081, []}, {http, "127.0.0.1", 18082, []}, {http, "127.0.0.1", 18083, []}], normal), + ct:sleep(1000), + ?assertEqual(1, length(gproc_pool:active_workers({default_channel, active}))), + grpcbox_channel:remove_endpoints(default_channel, [{https, "127.0.0.1", 18081, []}, {https, "127.0.0.1", 18082, []}, {https, "127.0.0.1", 18083, []}], normal), + ct:sleep(1000), + ?assertEqual(1, length(gproc_pool:active_workers({default_channel, active}))). + +pick_worker_strategy(_Config) -> + ?assertEqual(ok, pick_worker(default_channel)), + ?assertEqual(ok, pick_worker(random_channel)), + ?assertEqual(ok, pick_worker(direct_channel, 1)), + ?assertEqual(ok, pick_worker(hash_channel, 1)), + ?assertEqual(error, pick_worker(default_channel, 1)), + ?assertEqual(error, pick_worker(random_channel, 1)), + ?assertEqual(error, pick_worker(direct_channel)), + ?assertEqual(error, pick_worker(hash_channel)), + ok. + +pick_active_worker_strategy(_Config) -> + ct:sleep(1000), + ?assertEqual(ok, pick_worker({default_channel, active})), + ?assertEqual(ok, pick_worker({random_channel, active})), + ?assertEqual(ok, pick_worker({direct_channel, active}, 1)), + ?assertEqual(ok, pick_worker({hash_channel, active}, 1)), + ?assertEqual(error, pick_worker({default_channel, active}, 1)), + ?assertEqual(error, pick_worker({random_channel, active}, 1)), + ?assertEqual(error, pick_worker({direct_channel, active})), + ?assertEqual(error, pick_worker({hash_channel, active})), + ok. + +pick_worker(Name, N) -> + {R, _} = grpcbox_channel:pick(Name, unary, N), + R. +pick_worker(Name) -> + {R, _} = grpcbox_channel:pick(Name, unary, undefined), + R. From 48f26092c76eadf8f1a16d2006657f82ddb4889e Mon Sep 17 00:00:00 2001 From: sevenhe Date: Thu, 13 Apr 2023 13:48:42 +0800 Subject: [PATCH 5/5] Pick the specify worker --- src/grpcbox_channel.erl | 11 +++++++++++ src/grpcbox_client.erl | 1 + test/grpcbox_channel_SUITE.erl | 12 ++++++++++-- 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/src/grpcbox_channel.erl b/src/grpcbox_channel.erl index ab98bd7..c3cbb99 100644 --- a/src/grpcbox_channel.erl +++ b/src/grpcbox_channel.erl @@ -4,6 +4,7 @@ -export([start_link/3, is_ready/1, + get/3, pick/2, pick/3, add_endpoints/2, @@ -60,6 +61,16 @@ start_link(Name, Endpoints, Options) -> is_ready(Name) -> gen_statem:call(?CHANNEL(Name), is_ready). +-spec get(name(), unary | stream, term()) -> + {ok, {pid(), grpcbox_client:interceptor() | undefined}} | + {error, undefined_channel | not_found_endpoint}. +get(Name, CallType, Key) -> + case lists:keyfind(Key, 1, gproc_pool:active_workers(Name)) of + {_, Pid} -> {ok, {Pid, interceptor(Name, CallType)}}; + false -> {error, not_found_endpoint} + end. + + %% @doc Picks a subchannel from a pool using the configured strategy. -spec pick(name(), unary | stream) -> {ok, {pid(), grpcbox_client:interceptor() | undefined}} | diff --git a/src/grpcbox_client.erl b/src/grpcbox_client.erl index 4dd4ffd..e04a78f 100644 --- a/src/grpcbox_client.erl +++ b/src/grpcbox_client.erl @@ -50,6 +50,7 @@ get_channel(Options, Type) -> Key = maps:get(key, Options, undefined), PickStrategy = maps:get(pick_strategy, Options, undefined), case PickStrategy of + specify_worker -> grpcbox_channel:get(Channel, Type, Key); active_worker -> grpcbox_channel:pick({Channel, active}, Type, Key); undefined -> grpcbox_channel:pick(Channel, Type, Key) end. diff --git a/test/grpcbox_channel_SUITE.erl b/test/grpcbox_channel_SUITE.erl index b6f8135..e259cf5 100644 --- a/test/grpcbox_channel_SUITE.erl +++ b/test/grpcbox_channel_SUITE.erl @@ -6,7 +6,8 @@ add_and_remove_endpoints/1, add_and_remove_endpoints_active_workers/1, pick_worker_strategy/1, - pick_active_worker_strategy/1 + pick_active_worker_strategy/1, + pick_specify_worker_strategy/1 ]). -include_lib("eunit/include/eunit.hrl"). @@ -16,7 +17,8 @@ all() -> add_and_remove_endpoints, add_and_remove_endpoints_active_workers, pick_worker_strategy, - pick_active_worker_strategy + pick_active_worker_strategy, + pick_specify_worker_strategy ]. init_per_suite(_Config) -> application:set_env(grpcbox, client, #{channel => []}), @@ -98,6 +100,12 @@ pick_active_worker_strategy(_Config) -> ?assertEqual(error, pick_worker({hash_channel, active})), ok. +pick_specify_worker_strategy(_Config) -> + ?assertMatch({ok, _} ,grpcbox_channel:get(default_channel, stream, {http, "127.0.0.1", 18080, []})), + ?assertEqual({error, not_found_endpoint} ,grpcbox_channel:get(default_channel, stream, {http, "127.0.0.1", 8080, []})), + ?assertEqual({error, not_found_endpoint} ,grpcbox_channel:get(channel_xxx, stream, {http, "127.0.0.1", 8080, []})), + ok. + pick_worker(Name, N) -> {R, _} = grpcbox_channel:pick(Name, unary, N), R.