Skip to content

Commit

Permalink
Merge pull request #1 from mtokioka/redix-0.6.0
Browse files Browse the repository at this point in the history
Redix 0.6.0 etc
  • Loading branch information
masahiro tokioka authored Jun 16, 2017
2 parents bf063be + 0a89e4b commit a4b99b7
Show file tree
Hide file tree
Showing 17 changed files with 251 additions and 150 deletions.
6 changes: 3 additions & 3 deletions config/test.exs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use Mix.Config

config :redix_cluster,
cluster_nodes: [%{host: '10.1.2.7', port: 7000},
%{host: '10.1.2.6', port: 7000},
%{host: '10.1.2.5', port: 7000}
cluster_nodes: [%{host: "127.0.0.1", port: 7000},
%{host: "127.0.0.1", port: 7001},
%{host: "127.0.0.1", port: 7002}
],
pool_size: 5,
pool_max_overflow: 0,
Expand Down
2 changes: 2 additions & 0 deletions lib/redix_cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ defmodule RedixCluster do
{:ok, [Redix.Protocol.redis_value]} | {:error, term}
def transaction(commands, opts\\ []), do: transaction(commands, opts, 0)

def flushdb(), do: RedixCluster.Run.flushdb()

@doc """
`Make sure` CROSSSLOT Keys in request hash to the same slot
Expand Down
4 changes: 2 additions & 2 deletions lib/redix_cluster/exceptions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ defmodule RedixCluster.Error do

def exception(:no_support_transaction), do: %__MODULE__{message: "cluster pipeline don't support MULTI, using transation"}

def exception(other)when is_atom(other), do: %Redix.ConnectionError{message: :inet.format_error(other)}
def exception(other)when is_atom(other), do: %Redix.ConnectionError{reason: :inet.format_error(other)}

@type t :: %__MODULE__{message: binary} | %Redix.ConnectionError{message: binary}
@type t :: %__MODULE__{message: binary} | %Redix.ConnectionError{reason: binary}

end
54 changes: 7 additions & 47 deletions lib/redix_cluster/hash.ex
Original file line number Diff line number Diff line change
@@ -1,55 +1,15 @@
defmodule RedixCluster.Hash do
@moduledoc false

use Bitwise

@redis_cluster_hash_slots 16384
@crcdef <<0x00,0x00,0x10,0x21,0x20,0x42,0x30,0x63,0x40,0x84,0x50,0xa5,0x60,0xc6,0x70,0xe7,
0x81,0x08,0x91,0x29,0xa1,0x4a,0xb1,0x6b,0xc1,0x8c,0xd1,0xad,0xe1,0xce,0xf1,0xef,
0x12,0x31,0x02,0x10,0x32,0x73,0x22,0x52,0x52,0xb5,0x42,0x94,0x72,0xf7,0x62,0xd6,
0x93,0x39,0x83,0x18,0xb3,0x7b,0xa3,0x5a,0xd3,0xbd,0xc3,0x9c,0xf3,0xff,0xe3,0xde,
0x24,0x62,0x34,0x43,0x04,0x20,0x14,0x01,0x64,0xe6,0x74,0xc7,0x44,0xa4,0x54,0x85,
0xa5,0x6a,0xb5,0x4b,0x85,0x28,0x95,0x09,0xe5,0xee,0xf5,0xcf,0xc5,0xac,0xd5,0x8d,
0x36,0x53,0x26,0x72,0x16,0x11,0x06,0x30,0x76,0xd7,0x66,0xf6,0x56,0x95,0x46,0xb4,
0xb7,0x5b,0xa7,0x7a,0x97,0x19,0x87,0x38,0xf7,0xdf,0xe7,0xfe,0xd7,0x9d,0xc7,0xbc,
0x48,0xc4,0x58,0xe5,0x68,0x86,0x78,0xa7,0x08,0x40,0x18,0x61,0x28,0x02,0x38,0x23,
0xc9,0xcc,0xd9,0xed,0xe9,0x8e,0xf9,0xaf,0x89,0x48,0x99,0x69,0xa9,0x0a,0xb9,0x2b,
0x5a,0xf5,0x4a,0xd4,0x7a,0xb7,0x6a,0x96,0x1a,0x71,0x0a,0x50,0x3a,0x33,0x2a,0x12,
0xdb,0xfd,0xcb,0xdc,0xfb,0xbf,0xeb,0x9e,0x9b,0x79,0x8b,0x58,0xbb,0x3b,0xab,0x1a,
0x6c,0xa6,0x7c,0x87,0x4c,0xe4,0x5c,0xc5,0x2c,0x22,0x3c,0x03,0x0c,0x60,0x1c,0x41,
0xed,0xae,0xfd,0x8f,0xcd,0xec,0xdd,0xcd,0xad,0x2a,0xbd,0x0b,0x8d,0x68,0x9d,0x49,
0x7e,0x97,0x6e,0xb6,0x5e,0xd5,0x4e,0xf4,0x3e,0x13,0x2e,0x32,0x1e,0x51,0x0e,0x70,
0xff,0x9f,0xef,0xbe,0xdf,0xdd,0xcf,0xfc,0xbf,0x1b,0xaf,0x3a,0x9f,0x59,0x8f,0x78,
0x91,0x88,0x81,0xa9,0xb1,0xca,0xa1,0xeb,0xd1,0x0c,0xc1,0x2d,0xf1,0x4e,0xe1,0x6f,
0x10,0x80,0x00,0xa1,0x30,0xc2,0x20,0xe3,0x50,0x04,0x40,0x25,0x70,0x46,0x60,0x67,
0x83,0xb9,0x93,0x98,0xa3,0xfb,0xb3,0xda,0xc3,0x3d,0xd3,0x1c,0xe3,0x7f,0xf3,0x5e,
0x02,0xb1,0x12,0x90,0x22,0xf3,0x32,0xd2,0x42,0x35,0x52,0x14,0x62,0x77,0x72,0x56,
0xb5,0xea,0xa5,0xcb,0x95,0xa8,0x85,0x89,0xf5,0x6e,0xe5,0x4f,0xd5,0x2c,0xc5,0x0d,
0x34,0xe2,0x24,0xc3,0x14,0xa0,0x04,0x81,0x74,0x66,0x64,0x47,0x54,0x24,0x44,0x05,
0xa7,0xdb,0xb7,0xfa,0x87,0x99,0x97,0xb8,0xe7,0x5f,0xf7,0x7e,0xc7,0x1d,0xd7,0x3c,
0x26,0xd3,0x36,0xf2,0x06,0x91,0x16,0xb0,0x66,0x57,0x76,0x76,0x46,0x15,0x56,0x34,
0xd9,0x4c,0xc9,0x6d,0xf9,0x0e,0xe9,0x2f,0x99,0xc8,0x89,0xe9,0xb9,0x8a,0xa9,0xab,
0x58,0x44,0x48,0x65,0x78,0x06,0x68,0x27,0x18,0xc0,0x08,0xe1,0x38,0x82,0x28,0xa3,
0xcb,0x7d,0xdb,0x5c,0xeb,0x3f,0xfb,0x1e,0x8b,0xf9,0x9b,0xd8,0xab,0xbb,0xbb,0x9a,
0x4a,0x75,0x5a,0x54,0x6a,0x37,0x7a,0x16,0x0a,0xf1,0x1a,0xd0,0x2a,0xb3,0x3a,0x92,
0xfd,0x2e,0xed,0x0f,0xdd,0x6c,0xcd,0x4d,0xbd,0xaa,0xad,0x8b,0x9d,0xe8,0x8d,0xc9,
0x7c,0x26,0x6c,0x07,0x5c,0x64,0x4c,0x45,0x3c,0xa2,0x2c,0x83,0x1c,0xe0,0x0c,0xc1,
0xef,0x1f,0xff,0x3e,0xcf,0x5d,0xdf,0x7c,0xaf,0x9b,0xbf,0xba,0x8f,0xd9,0x9f,0xf8,
0x6e,0x17,0x7e,0x36,0x4e,0x55,0x5e,0x74,0x2e,0x93,0x3e,0xb2,0x0e,0xd1,0x1e,0xf0>>

@spec hash(binary) :: integer
def hash(key) when is_binary(key), do: key |> to_char_list |> hash
def hash(key), do: crc16(0, key) |>rem @redis_cluster_hash_slots

defp crc16(crc, []), do: crc
defp crc16(crc, [b | rest]) do
index = bsr(crc, 8)|> bxor(b) |> band(0xff)
bsl(crc, 8)|> band(0xffff) |> bxor(crc_index(index)) |> crc16(rest)
end
## CRCBench
# benchmark name iterations average time
# CRC 100 20819.58 µs/op
# RedixClusterCRC 10 126367.30 µs/op

defp crc_index(n) do
<<crc::16>> = :binary.part(@crcdef, n*2, 2)
crc
end
@spec hash(binary) :: integer
def hash(key) when is_list(key), do: key |> to_string |> hash
def hash(key), do: CRC.ccitt_16_xmodem(key) |>rem(@redis_cluster_hash_slots)

end
61 changes: 35 additions & 26 deletions lib/redix_cluster/monitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,30 @@ defmodule RedixCluster.Monitor do
def connect(cluster_nodes), do: GenServer.call(__MODULE__, {:connect, cluster_nodes})

@spec refresh_mapping(integer) :: :ok | {:ignore, String.t}
def refresh_mapping(version), do: GenServer.call(__MODULE__, {:reload_slots_map, version})
def refresh_mapping(version) do
result = GenServer.call(__MODULE__, {:reload_slots_map, version})
RedixCluster.SlotCache.refresh_mapping(version)
result
end

@spec get_slot_cache() :: {:cluster, [binary], [integer], integer} | {:not_cluster, integer, atom}
def get_slot_cache() do
[{:cluster_state, state}] = :ets.lookup(__MODULE__, :cluster_state)
case state.is_cluster do
true -> {:cluster, state.slots_maps, state.slots, state.version}
false ->
[slots_map] = state.slots_maps
{:not_cluster, state.version, slots_map.node.pool}
end
GenServer.call(__MODULE__, {:get_slot})
end

@spec start_link(Keyword.t) :: GenServer.on_start
def start_link(options), do: GenServer.start_link(__MODULE__, nil, options)

def init(nil) do
:ets.new(__MODULE__, [:protected, :set, :named_table, {:read_concurrency, true}])
case get_env(:cluster_nodes, []) do
[] -> {:ok, %State{}}
cluster_nodes -> {:ok, do_connect(cluster_nodes)}
end
end

def handle_call({:get_slot}, _from, state) do
{:reply, state, state}
end

def handle_call({:reload_slots_map, version}, _from, state = %State{version: version}) do
{:reply, :ok, reload_slots_map(state)}
end
Expand All @@ -56,13 +56,21 @@ defmodule RedixCluster.Monitor do
end

defp reload_slots_map(state) do
for slots_map <- state.slots_maps, do: close_connection(slots_map)
old_slots_maps = state.slots_maps
{is_cluster, cluster_info} = get_cluster_info(state.cluster_nodes)
slots_maps = cluster_info |> parse_slots_maps |> connect_all_slots
slots = create_slots_cache(slots_maps)
new_state = %State{state | slots: slots, slots_maps: slots_maps, version: state.version + 1, is_cluster: is_cluster}
true = :ets.insert(__MODULE__, [{:cluster_state, new_state}])
new_state

# close only removed pool
removed = removed_nodes(old_slots_maps, slots_maps)
for slots_map <- removed, do: close_connection(slots_map)

%State{state | slots: slots, slots_maps: slots_maps, version: state.version + 1, is_cluster: is_cluster}
end

defp removed_nodes(old_slots_maps, slots_maps) do
new_pools = slots_maps |> Enum.map(fn(slot) -> slot.node.pool end)
old_slots_maps |> Enum.reject(fn(slot) -> slot.node.pool in new_pools end)
end

defp close_connection(slots_map) do
Expand All @@ -77,13 +85,15 @@ defmodule RedixCluster.Monitor do
defp get_cluster_info([node|restnodes]) do
case start_link_redix(node.host, node.port) do
{:ok, conn} ->
case Redix.command(conn, ~w(CLUSTER SLOTS), []) do
{:ok, cluster_info} ->
Redix.stop(conn)
{true, cluster_info}
{:error, %Redix.Error{message: "ERR unknown command 'CLUSTER'"}} ->
cluster_info_from_single_node(node)
{:error, %Redix.Error{message: "ERR This instance has cluster support disabled"}} ->
try do
case Redix.command(conn, ~w(CLUSTER SLOTS), []) do
{:ok, cluster_info} ->
Redix.stop(conn)
{true, cluster_info}
{:error, _} -> get_cluster_info(restnodes)
end
rescue
Redix.Error ->
cluster_info_from_single_node(node)
end
_ -> get_cluster_info(restnodes)
Expand All @@ -93,7 +103,7 @@ defmodule RedixCluster.Monitor do
#[[10923, 16383, ["Host1", 7000], ["SlaveHost1", 7001]],
#[5461, 10922, ["Host2", 7000], ["SlaveHost2", 7001]],
#[0, 5460, ["Host3", 7000], ["SlaveHost3", 7001]]]
defp parse_slots_maps(cluster_info) do
def parse_slots_maps(cluster_info) do
cluster_info
|> Stream.with_index
|> Stream.map(&parse_cluster_slot/1)
Expand Down Expand Up @@ -128,7 +138,7 @@ defmodule RedixCluster.Monitor do
{false,
[[0,
@redis_cluster_hash_slots - 1,
[List.to_string(node.host), node.port]
[node.host, node.port]
]]
}
end
Expand Down Expand Up @@ -156,11 +166,10 @@ defmodule RedixCluster.Monitor do
|> String.to_atom
end

defp parse_master_node([[master_host, master_port]|_]) do
%{host: to_char_list(master_host),
defp parse_master_node([[master_host, master_port|_]|_]) do
%{host: master_host,
port: master_port,
pool: nil
}
end

end
88 changes: 52 additions & 36 deletions lib/redix_cluster/run.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,47 +5,44 @@ defmodule RedixCluster.Run do

@spec command(command, Keyword.t) :: {:ok, term} |{:error, term}
def command(command, opts) do
case RedixCluster.Monitor.get_slot_cache do
{:cluster, slots_maps, slots, version} ->
command
|> parse_key_from_command
|> key_to_slot_hash
|> get_pool_by_slot(slots_maps, slots, version)
|> query_redis_pool(command, :command, opts)
{:not_cluster, version, pool_name} ->
query_redis_pool({version, pool_name}, command, :command, opts)
end
command
|> parse_key_from_command()
|> key_to_slot_hash()
|> RedixCluster.SlotCache.get_pool()
|> query_redis_pool(command, :command, opts)
end

@spec pipeline([command], Keyword.t) :: {:ok, term} |{:error, term}
def pipeline(pipeline, opts) do
case RedixCluster.Monitor.get_slot_cache do
{:cluster, slots_maps, slots, version} ->
pipeline
|> parse_keys_from_pipeline
|> keys_to_slot_hashs
|> is_same_slot_hashs
|> get_pool_by_slot(slots_maps, slots, version)
|> query_redis_pool(pipeline, :pipeline, opts)
{:not_cluster, version, pool_name} ->
query_redis_pool({version, pool_name}, pipeline, :pipeline, opts)
end
pipeline
|> parse_keys_from_pipeline()
|> keys_to_slot_hashs()
|> is_same_slot_hashs()
|> RedixCluster.SlotCache.get_pool()
|> query_redis_pool(pipeline, :pipeline, opts)
end

@spec transaction([command], Keyword.t) :: {:ok, term} |{:error, term}
def transaction(pipeline, opts) do
transaction = [["MULTI"]] ++ pipeline ++ [["EXEC"]]
case RedixCluster.Monitor.get_slot_cache do
{:cluster, slots_maps, slots, version} ->
pipeline
|> parse_keys_from_pipeline
|> keys_to_slot_hashs
|> is_same_slot_hashs
|> get_pool_by_slot(slots_maps, slots, version)
|> query_redis_pool(transaction, :pipeline, opts)
{:not_cluster, version, pool_name} ->
query_redis_pool({version, pool_name}, transaction, :pipeline, opts)
end

pipeline
|> parse_keys_from_pipeline()
|> keys_to_slot_hashs()
|> is_same_slot_hashs()
|> RedixCluster.SlotCache.get_pool()
|> query_redis_pool(transaction, :pipeline, opts)
end

def flushdb() do
{version, slots_maps} = RedixCluster.SlotCache.get_slot_maps
Enum.each(slots_maps, fn(cluster) ->
case cluster == nil or cluster.node == nil do
true -> nil
false -> query_redis_pool({version, cluster.node.pool}, ~w(flushdb), :command, [])
end
end)
{:ok, "OK"}
end

defp parse_key_from_command([term1, term2|_]), do: verify_command_key(term1, term2)
Expand Down Expand Up @@ -103,23 +100,42 @@ defmodule RedixCluster.Run do
try do
pool_name
|> :poolboy.transaction(fn(worker) -> GenServer.call(worker, {type, command, opts}) end)
|> parse_trans_result(version)
|> parse_trans_result({version, pool_name}, command, type, opts)
catch
:exit, _ ->
RedixCluster.Monitor.refresh_mapping(version)
{:error, :retry}
end
end

defp parse_trans_result({:error, %Redix.Error{message: <<"MOVED", _redirectioninfo::binary>>}}, version) do
defp parse_trans_result({:error, %Redix.Error{message: <<"ASK", redirectioninfo::binary>>}}, {version, _pool_name}, command, type, opts) do
[_, _slot, host_info] = Regex.split(~r/\s+/, redirectioninfo)
[host, port] = Regex.split(~r/:/, host_info)
RedixCluster.Pools.Supervisor.new_pool(host, port)
pool_name = ["Pool", host, ":", port] |> Enum.join |> String.to_atom
query_redis_pool({version, pool_name}, command, type, opts)
end
defp parse_trans_result({:error, %Redix.Error{message: <<"MOVED", _redirectioninfo::binary>>}}, {version, _pool_name}, _command, _type, _opts) do
RedixCluster.Monitor.refresh_mapping(version)
{:error, :retry}
end
defp parse_trans_result({:error, :no_connection}, {version, _pool_name}, _command, _type, _opts) do
RedixCluster.Monitor.refresh_mapping(version)
{:error, :retry}
end
defp parse_trans_result({:error, :closed}, {version, _pool_name}, _command, _type, _opts) do
RedixCluster.Monitor.refresh_mapping(version)
{:error, :retry}
end
defp parse_trans_result({:error, %Redix.ConnectionError{}}, {version, _pool_name}, _command, _type, _opts) do
RedixCluster.Monitor.refresh_mapping(version)
{:error, :retry}
end
defp parse_trans_result({:error, :no_connection}, version) do
defp parse_trans_result({:error, %Redix.Error{message: <<"CLUSTERDOWN", _::binary>>}}, {version, _pool_name}, _command, _type, _opts) do
RedixCluster.Monitor.refresh_mapping(version)
{:error, :retry}
end
defp parse_trans_result(payload, _), do: payload
defp parse_trans_result(payload, _, _, _, _), do: payload

defp verify_command_key(term1, term2) do
term1
Expand Down
Loading

0 comments on commit a4b99b7

Please sign in to comment.