From d46a827cd99da17a116c6a32de9691ff66567141 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dorofiejczyk?= Date: Thu, 26 Apr 2018 15:37:36 +0200 Subject: [PATCH] Route balancing using jumping consistent hashing algorithm --- include/ejabberd_router.hrl | 2 + rebar.config | 1 + src/ejabberd_router.erl | 31 +++++++++++--- src/ejabberd_router_mnesia.erl | 77 +++++++++++++++++++++------------- 4 files changed, 76 insertions(+), 35 deletions(-) diff --git a/include/ejabberd_router.hrl b/include/ejabberd_router.hrl index 04ea6e304a8..65de1536475 100644 --- a/include/ejabberd_router.hrl +++ b/include/ejabberd_router.hrl @@ -2,6 +2,8 @@ -type local_hint() :: integer() | {apply, atom(), atom()}. +-type balancing_algorithm() :: dynamic | fix_number | consistent_hashing. + -record(route, {domain :: binary() | '_', server_host :: binary() | '_', pid :: undefined | pid(), diff --git a/rebar.config b/rebar.config index 79e13bc2bd6..0c60a480cc7 100644 --- a/rebar.config +++ b/rebar.config @@ -31,6 +31,7 @@ {p1_oauth2, ".*", {git, "https://github.com/processone/p1_oauth2", {tag, "0.6.3"}}}, {jose, ".*", {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.8.4"}}}, {eimp, ".*", {git, "https://github.com/processone/eimp", {tag, "1.0.5"}}}, + {jch, ".*", {git, "https://github.com/darach/jch-erl", {tag, "0.2.3"}}}, {if_var_true, stun, {stun, ".*", {git, "https://github.com/processone/stun", {tag, "1.0.22"}}}}, {if_var_true, sip, {esip, ".*", {git, "https://github.com/processone/esip", {tag, "1.0.23"}}}}, {if_var_true, mysql, {p1_mysql, ".*", {git, "https://github.com/processone/p1_mysql", diff --git a/src/ejabberd_router.erl b/src/ejabberd_router.erl index feb79549d74..516ee2578b4 100644 --- a/src/ejabberd_router.erl +++ b/src/ejabberd_router.erl @@ -54,7 +54,8 @@ is_my_host/1, clean_cache/1, config_reloaded/0, - get_backend/0]). + get_backend/0, + get_domain_balancing_algorithm/1]). -export([start_link/0]). @@ -68,6 +69,8 @@ %% This value is used in SIP and Megaco for a transaction lifetime. -define(IQ_TIMEOUT, 32000). +-define(MAX_PHASH_RANGE, 4294967296). + -include("ejabberd.hrl"). -include("logger.hrl"). -include("ejabberd_router.hrl"). @@ -394,8 +397,9 @@ do_route(_Pkt, _Route) -> balancing_route(From, To, Packet, Rs) -> LDstDomain = To#jid.lserver, Value = get_domain_balancing(From, To, LDstDomain), - case get_component_number(LDstDomain) of - undefined -> + + case get_domain_balancing_algorithm(LDstDomain) of + dynamic -> case [R || R <- Rs, node(R#route.pid) == node()] of [] -> R = lists:nth(erlang:phash(Value, length(Rs)), Rs), @@ -404,10 +408,15 @@ balancing_route(From, To, Packet, Rs) -> R = lists:nth(erlang:phash(Value, length(LRs)), LRs), do_route(Packet, R) end; - _ -> + fix_number -> SRs = lists:ukeysort(#route.local_hint, Rs), R = lists:nth(erlang:phash(Value, length(SRs)), SRs), - do_route(Packet, R) + do_route(Packet, R); + consistent_hashing -> + SRs = lists:ukeysort(#route.local_hint, Rs), + Rn = jch:ch(erlang:phash(Value, ?MAX_PHASH_RANGE), length(SRs)), + R = lists:nth(Rn + 1, SRs), + do_route(Packet, R) end. -spec get_component_number(binary()) -> pos_integer() | undefined. @@ -425,6 +434,18 @@ get_domain_balancing(From, To, LDomain) -> bare_destination -> jid:remove_resource(jid:tolower(To)) end. +-spec get_domain_balancing_algorithm(binary()) -> balancing_algorithm(). +get_domain_balancing_algorithm(LDomain) -> + case ejabberd_config:get_option({domain_balancing_algorithm, LDomain}) of + AlgorithmName when AlgorithmName == undefined; AlgorithmName == fix_number -> + case get_component_number(LDomain) of + undefined -> dynamic; + _ -> fix_number + end; + AlgorithmName -> + AlgorithmName + end. + -spec get_backend() -> module(). get_backend() -> DBType = ejabberd_config:get_option( diff --git a/src/ejabberd_router_mnesia.erl b/src/ejabberd_router_mnesia.erl index acf21a160ab..e8c2c8f30ad 100644 --- a/src/ejabberd_router_mnesia.erl +++ b/src/ejabberd_router_mnesia.erl @@ -56,7 +56,30 @@ start_link() -> use_cache() -> false. -register_route(Domain, ServerHost, LocalHint, undefined, Pid) -> +register_route(Domain, ServerHost, LocalHint, N, Pid) -> + Algorithm = ejabberd_router:get_domain_balancing_algorithm(Domain), + register_route(Domain, ServerHost, LocalHint, N, Pid, Algorithm). + +register_route(Domain, ServerHost, _LocalHint, undefined, Pid, consistent_hashing) -> + F = fun () -> + case mnesia:wread({route, Domain}) of + [] -> + mnesia:write(#route{domain = Domain, + server_host = ServerHost, + pid = Pid, + local_hint = 1}); + Rs -> + SRs = lists:ukeysort(#route.local_hint, Rs), + R = lists:last(SRs), + I = R#route.local_hint, + mnesia:write(#route{domain = Domain, + server_host = ServerHost, + pid = Pid, + local_hint = I + 1}) + end + end, + transaction(F); +register_route(Domain, ServerHost, LocalHint, undefined, Pid, dynamic) -> F = fun () -> mnesia:write(#route{domain = Domain, pid = Pid, @@ -64,7 +87,7 @@ register_route(Domain, ServerHost, LocalHint, undefined, Pid) -> local_hint = LocalHint}) end, transaction(F); -register_route(Domain, ServerHost, _LocalHint, N, Pid) -> +register_route(Domain, ServerHost, _LocalHint, N, Pid, fix_number) -> F = fun () -> case mnesia:wread({route, Domain}) of [] -> @@ -99,27 +122,31 @@ register_route(Domain, ServerHost, _LocalHint, N, Pid) -> end, transaction(F). -unregister_route(Domain, undefined, Pid) -> - F = fun () -> +unregister_route(Domain, LocalHint, Pid) -> + Algorithm = ejabberd_router:get_domain_balancing_algorithm(Domain), + unregister_route(Domain, LocalHint, Pid, Algorithm). + +unregister_route(Domain, _, Pid, fix_number) -> + F = fun () -> case mnesia:match_object( - #route{domain = Domain, pid = Pid, _ = '_'}) of - [R] -> mnesia:delete_object(R); - _ -> ok + #route{domain = Domain, pid = Pid, _ = '_'}) of + [R] -> + I = R#route.local_hint, + ServerHost = R#route.server_host, + mnesia:write(#route{domain = Domain, + server_host = ServerHost, + pid = undefined, + local_hint = I}), + mnesia:delete_object(R); + _ -> ok end - end, - transaction(F); -unregister_route(Domain, _, Pid) -> + end, + transaction(F); +unregister_route(Domain, _, Pid, _) -> F = fun () -> case mnesia:match_object( #route{domain = Domain, pid = Pid, _ = '_'}) of - [R] -> - I = R#route.local_hint, - ServerHost = R#route.server_host, - mnesia:write(#route{domain = Domain, - server_host = ServerHost, - pid = undefined, - local_hint = I}), - mnesia:delete_object(R); + [R] -> mnesia:delete_object(R); _ -> ok end end, @@ -166,23 +193,13 @@ handle_info({mnesia_table_event, handle_info({mnesia_table_event, _}, State) -> {noreply, State}; handle_info({'DOWN', _Ref, _Type, Pid, _Info}, State) -> + ?DEBUG("Process down: ~p", [Pid]), F = fun () -> Es = mnesia:select(route, [{#route{pid = Pid, _ = '_'}, [], ['$_']}]), lists:foreach( fun(E) -> - if is_integer(E#route.local_hint) -> - LDomain = E#route.domain, - I = E#route.local_hint, - ServerHost = E#route.server_host, - mnesia:write(#route{domain = LDomain, - server_host = ServerHost, - pid = undefined, - local_hint = I}), - mnesia:delete_object(E); - true -> - mnesia:delete_object(E) - end + unregister_route(E#route.domain, E#route.local_hint, Pid) end, Es) end, transaction(F),