diff --git a/.github/workflows/erlang.yml b/.github/workflows/erlang.yml new file mode 100644 index 0000000..edd77bf --- /dev/null +++ b/.github/workflows/erlang.yml @@ -0,0 +1,23 @@ +name: Erlang CI + +on: + push: + branches: [ master ] + pull_request: + branches: [ master ] + +jobs: + + build: + + runs-on: ubuntu-latest + + container: + image: erlang:24 + + steps: + - uses: actions/checkout@v2 + - name: Compile + run: rebar3 compile + - name: Run tests + run: rebar3 do eunit, ct diff --git a/rebar.config b/rebar.config index aadbb18..dfbff8e 100644 --- a/rebar.config +++ b/rebar.config @@ -1,34 +1,53 @@ %% -*- mode: erlang;erlang-indent-level: 2;indent-tabs-mode: nil -*- %% ex: ts=4 sw=4 ft=erlang et - %% == Erlang Compiler == -{erl_opts, [ - warn_unused_vars, - warnings_as_errors, - ewarn_export_all, - warn_shadow_vars, - warn_unused_import, - warn_unused_function, - warn_bif_clash, - warn_unused_record, - warn_deprecated_function, - warn_obsolete_guard, - strict_validation, - warn_export_vars, - warn_exported_vars, - warn_missing_spec, - warn_untyped_record, - debug_info -]}. +{ + erl_opts, + [ + warn_unused_vars, + warnings_as_errors, + ewarn_export_all, + warn_shadow_vars, + warn_unused_import, + warn_unused_function, + warn_bif_clash, + warn_unused_record, + warn_deprecated_function, + warn_obsolete_guard, + strict_validation, + warn_export_vars, + warn_exported_vars, + %warn_missing_spec, + warn_untyped_record, + debug_info + ] +}. %% == Dependencies == -{deps, [ - {sumo_db, "0.7.2"}, - {riakc, "2.5.3"}, - {iso8601, "1.1.2", {pkg, inaka_iso8601}} -]}. +{ + deps, + [ + { + datetime, + {git, "https://github.com/jagguli/date_util.git", {branch, "master"}} + }, + { + sumo_db, + {git, "https://github.com/jagguli/sumo_db.git", {branch, "master"}} + }, + { + riakc, + { + git, + "git://github.com/basho/riak-erlang-client.git", + {branch, "develop-3.0"} + } + }, + {iso8601, "1.1.2", {pkg, inaka_iso8601}} + ] +}. 
%% == Overrides == @@ -36,38 +55,45 @@ %% == Profiles == -{profiles, [ - {test, [ - {deps, [ - {katana_test, "0.1.1"}, - {mixer, "0.1.5", {pkg, inaka_mixer}} - ]} - ]} -]}. +{ + profiles, + [ + { + test, + [ + { + deps, + [katana_test, {mixer, "1.2.0", {pkg, inaka_mixer}}, {elvis, "1.0.1"}] + } + ] + } + ] +}. %% == Common Test == -{ct_compile_opts, [ - warn_unused_vars, - warn_export_all, - warn_shadow_vars, - warn_unused_import, - warn_unused_function, - warn_bif_clash, - warn_unused_record, - warn_deprecated_function, - warn_obsolete_guard, - strict_validation, - warn_export_vars, - warn_exported_vars, - warn_missing_spec, - warn_untyped_record, - debug_info -]}. - -{ct_opts, [ - {sys_config, ["test/test.config"]} -]}. +{ + ct_compile_opts, + [ + warn_unused_vars, + warn_export_all, + warn_shadow_vars, + warn_unused_import, + warn_unused_function, + warn_bif_clash, + warn_unused_record, + warn_deprecated_function, + warn_obsolete_guard, + strict_validation, + warn_export_vars, + warn_exported_vars, + warn_missing_spec, + warn_untyped_record, + debug_info + ] +}. + +{ct_opts, [{sys_config, ["test/test.config"]}]}. %% == Cover == @@ -77,32 +103,46 @@ %% == EDoc == -{edoc_opts, [ - {report_missing_types, true}, - {source_path, ["src"]}, - {report_missing_types, true}, - {todo, true}, - {packages, false}, - {subpackages, false} -]}. +{ + edoc_opts, + [ + {report_missing_types, true}, + {source_path, ["src"]}, + {report_missing_types, true}, + {todo, true}, + {packages, false}, + {subpackages, false} + ] +}. %% == Dialyzer == -{dialyzer, [ - {warnings, [ - race_conditions, - no_return, - unmatched_returns, - error_handling - ]}, - {plt_apps, top_level_deps}, - {plt_extra_apps, []}, - {plt_location, local}, - {plt_prefix, "sumo_db_riak"}, - {base_plt_location, ".plt"}, - {base_plt_prefix, "sumo_db_riak"} -]}. 
+{ + dialyzer, + [ + {warnings, [race_conditions, no_return, unmatched_returns, error_handling]}, + {plt_apps, top_level_deps}, + {plt_extra_apps, []}, + {plt_location, local}, + {plt_prefix, "sumo_db_riak"}, + {base_plt_location, ".plt"}, + {base_plt_prefix, "sumo_db_riak"} + ] +}. %% == Shell == {shell, [{apps, [sumo_db_riak]}]}. + +%% == Plugins == + +{plugins, [steamroller]}. + +{ + steamroller, + [ + {line_length, 80}, + {indent, 2}, + {inputs, ["rebar.config", "{src,test,include}/*.{[he]rl,app.src}"]} + ] +}. diff --git a/rebar.lock b/rebar.lock index c63fc7c..ae094bc 100644 --- a/rebar.lock +++ b/rebar.lock @@ -1,24 +1,37 @@ -{"1.1.0", -[{<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},2}, - {<<"hamcrest">>,{pkg,<<"basho_hamcrest">>,<<"0.4.1">>},2}, - {<<"iso8601">>,{pkg,<<"inaka_iso8601">>,<<"1.1.2">>},0}, - {<<"lager">>,{pkg,<<"lager">>,<<"3.2.4">>},1}, +{"1.2.0", +[{<<"iso8601">>,{pkg,<<"inaka_iso8601">>,<<"1.1.2">>},0}, + {<<"meck">>, + {git,"https://github.com/eproxus/meck.git", + {ref,"4ecc1ae9089edc6977e8c8c4cd41081513cc5590"}}, + 3}, + {<<"protobuffs">>, + {git,"https://github.com/basho/erlang_protobuffs.git", + {ref,"098efad8f85dfe556d64e2cf6ce31f2075808f67"}}, + 2}, {<<"quickrand">>,{pkg,<<"quickrand">>,<<"1.5.4">>},2}, - {<<"riak_pb">>,{pkg,<<"riak_pb">>,<<"2.3.2">>},1}, - {<<"riakc">>,{pkg,<<"riakc">>,<<"2.5.3">>},0}, - {<<"sumo_db">>,{pkg,<<"sumo_db">>,<<"0.7.2">>},0}, - {<<"uuid">>,{pkg,<<"uuid_erl">>,<<"1.5.4">>},1}, - {<<"worker_pool">>,{pkg,<<"worker_pool">>,<<"2.0.1">>},1}]}. 
+ {<<"riak_pb">>, + {git,"https://github.com/basho/riak_pb", + {ref,"b7abca90e4c708073021d2c4e18c896f1a10b838"}}, + 1}, + {<<"riakc">>, + {git,"git://github.com/basho/riak-erlang-client.git", + {ref,"d9fa46335fda4704fda71cbc011ab6a11e50e3f3"}}, + 0}, + {<<"sumo_db">>, + {git,"https://github.com/jagguli/sumo_db.git", + {ref,"372fa1c25de4aabfd3db56944df8ec5157db3c03"}}, + 0}, + {<<"uuid">>,{pkg,<<"uuid_erl">>,<<"1.5.2-rc1">>},1}, + {<<"worker_pool">>,{pkg,<<"worker_pool">>,<<"3.1.0">>},1}]}. [ {pkg_hash,[ - {<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>}, - {<<"hamcrest">>, <<"FB7B2C92D252A1E9DB936750B86089ADDAEBEB8F87967FB4BBDDA61E8863338E">>}, {<<"iso8601">>, <<"6D84BBA9641FA39802E6B53C57E0B61B2F61BF8E81C112356B57BE8DA31DE771">>}, - {<<"lager">>, <<"A6DEB74DAE7927F46BD13255268308EF03EB206EC784A94EAF7C1C0F3B811615">>}, {<<"quickrand">>, <<"47ADD4755CC5F209CBEFFD6F47C84061196CD7FAD99FD8FD12418EB0D06B939D">>}, - {<<"riak_pb">>, <<"48FFBF66DBB3F136AB9A7134BAC4E496754BAA5EF58C4F50A61326736D996390">>}, - {<<"riakc">>, <<"6132D9E687A0DFD314B2B24C4594302CA8B55568A5D733C491D8FB6CD4004763">>}, - {<<"sumo_db">>, <<"B56544DB18826D1EE5296042B5D4461D0805B7F54C998ED8338F1BC59E284749">>}, - {<<"uuid">>, <<"06240EFF3EAF013CC22274689B0C1FF6345645EE5D47A9E4ED7545EEB7698B23">>}, - {<<"worker_pool">>, <<"B90273074898FA89434317991E00884DBBAFFAB5BFD964A7586317CD16FB18D4">>}]} + {<<"uuid">>, <<"D4022AB3F4F1A28E86EA15D4075CB0C57EC908D8AF1CA2E8AF28AA815EF93C3A">>}, + {<<"worker_pool">>, <<"C908627E04057CF29940AD0E79B89AB161DB520EEBC76942EFD08A187BABF93A">>}]}, +{pkg_hash_ext,[ + {<<"iso8601">>, <<"8964665B3EDBC8C4B390512D9DE429A550CC32FF3FA3583F2E6F14925127D480">>}, + {<<"quickrand">>, <<"2657DCA4544BF98EC44328F95F2C94A07ACCECE452195C599E43799A760B28B0">>}, + {<<"uuid">>, <<"4297348D9D52A38E2446185F979CB8786FE8784AC053ABE6724C2AF7D7930E38">>}, + {<<"worker_pool">>, <<"F809B89F46B2CB5DFD2B731A2C8B234DB2788490FAB88EB0823DA18B64851EE2">>}]} ]. 
diff --git a/src/sumo_backend_riak.erl b/src/sumo_backend_riak.erl index badc73a..0a7e847 100644 --- a/src/sumo_backend_riak.erl +++ b/src/sumo_backend_riak.erl @@ -17,50 +17,51 @@ %%% @end %%% @copyright Inaka %%% + -module(sumo_backend_riak). + -author("Carlos Andres Bolanos "). + -license("Apache License 2.0"). -behaviour(gen_server). -behaviour(sumo_backend). %%% API --export([ - start_link/2, - get_connection/1 -]). + +-export([start_link/2, get_connection/1]). %%% Exports for gen_server --export([ - init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3 -]). + +-export( + [ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 + ] +). %%%============================================================================= %%% Types %%%============================================================================= --record(state, { - host :: string(), - port :: non_neg_integer(), - opts :: [term()]}). +-record(state, {host :: string(), port :: non_neg_integer(), opts :: [term()]}). + -type state() :: #state{}. %%%============================================================================= %%% API %%%============================================================================= --spec start_link(atom(), proplists:proplist()) -> {ok, pid()}|term(). +-spec start_link(atom(), proplists:proplist()) -> {ok, pid()} | term(). start_link(Name, Options) -> gen_server:start_link({local, Name}, ?MODULE, Options, []). -spec get_connection(atom() | pid()) -> pid(). -get_connection(Name) -> - gen_server:call(Name, get_connection). +get_connection(Name) -> gen_server:call(Name, get_connection). %%%============================================================================= %%% gen_server callbacks @@ -76,10 +77,13 @@ init(Options) -> %% @todo: implement connection pool. %% In other cases is a built-in feature of the client. + -spec handle_call(term(), term(), state()) -> {reply, term(), state()}. 
-handle_call(get_connection, - _From, - State = #state{host = Host, port = Port, opts = Opts}) -> +handle_call( + get_connection, + _From, + State = #state{host = Host, port = Port, opts = Opts} +) -> {ok, Conn} = riakc_pb_socket:start_link(Host, Port, Opts), {reply, Conn, State}. @@ -107,11 +111,12 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. riak_opts(Options) -> User = proplists:get_value(username, Options), Pass = proplists:get_value(password, Options), - Opts0 = case User /= undefined andalso Pass /= undefined of - true -> [{credentials, User, Pass}]; - _ -> [] - end, + Opts0 = + case User /= undefined andalso Pass /= undefined of + true -> [{credentials, User, Pass}]; + _ -> [] + end, case lists:keyfind(connect_timeout, 1, Options) of {_, V1} -> [{connect_timeout, V1}, {auto_reconnect, true}] ++ Opts0; - _ -> [{auto_reconnect, true}] ++ Opts0 + _ -> [{auto_reconnect, true}] ++ Opts0 end. diff --git a/src/sumo_db_riak.app.src b/src/sumo_db_riak.app.src index 6b86fd0..d9570f4 100644 --- a/src/sumo_db_riak.app.src +++ b/src/sumo_db_riak.app.src @@ -1,21 +1,18 @@ -{application, sumo_db_riak, [ - {description, "SumoDB Riak Adapter."}, - {vsn, "0.1.0"}, - {id, "sumo_db_riak"}, - {registered, []}, - {applications, [ - kernel, - stdlib, - sumo_db, - riakc - ]}, - {modules, []}, - {mod, {sumo_db_riak, []}}, - {env, []}, - {maintainers, ["Inaka"]}, - {licenses, ["Apache 2.0"]}, - {links, [ - {"Github", "https://github.com/inaka/sumo_db_riak"} - ]}, - {build_tools, ["rebar3"]} -]}. +{ + application, + sumo_db_riak, + [ + {description, "SumoDB Riak Adapter."}, + {vsn, "0.1.0"}, + {id, "sumo_db_riak"}, + {registered, []}, + {applications, [kernel, stdlib, sumo_db, riakc]}, + {modules, []}, + {mod, {sumo_db_riak, []}}, + {env, []}, + {maintainers, ["Inaka"]}, + {licenses, ["Apache 2.0"]}, + {links, [{"Github", "https://github.com/inaka/sumo_db_riak"}]}, + {build_tools, ["rebar3"]} + ] +}. 
diff --git a/src/sumo_db_riak.erl b/src/sumo_db_riak.erl index 32545ce..d8276cf 100644 --- a/src/sumo_db_riak.erl +++ b/src/sumo_db_riak.erl @@ -3,16 +3,12 @@ -behaviour(application). %% API --export([ - start/0, - stop/0 -]). + +-export([start/0, stop/0]). %% Application callbacks --export([ - start/2, - stop/1 -]). + +-export([start/2, stop/1]). %%%============================================================================= %%% API @@ -28,15 +24,14 @@ stop() -> application:stop(sumo_db_riak). %%% Application callbacks %%%============================================================================= --spec start(StartType, StartArgs) -> Response when - StartType :: application:start_type(), - StartArgs :: term(), - State :: term(), - Reason :: term(), - Response :: {ok, pid()} | {ok, pid(), State} | {error, Reason}. -start(_StartType, _StartArgs) -> - {ok, self()}. - --spec(stop(State :: term()) -> term()). -stop(_State) -> - ok. +-spec start(StartType, StartArgs) -> + Response + when StartType :: application:start_type(), + StartArgs :: term(), + State :: term(), + Reason :: term(), + Response :: {ok, pid()} | {ok, pid(), State} | {error, Reason}. +start(_StartType, _StartArgs) -> {ok, self()}. + +-spec stop(State :: term()) -> term(). +stop(_State) -> ok. diff --git a/src/sumo_store_riak.erl b/src/sumo_store_riak.erl index f6d9760..0024cf3 100644 --- a/src/sumo_store_riak.erl +++ b/src/sumo_store_riak.erl @@ -29,9 +29,13 @@ %%% @end %%% @copyright Inaka %%% + -module(sumo_store_riak). + -author("Carlos Andres Bolanos "). + -github("https://github.com/inaka"). + -license("Apache License 2.0"). -behavior(sumo_store). @@ -40,43 +44,54 @@ %% @todo remove this when riakc releases a new version > 2.5.3 %% They already fixed on master so we should wait until they release a new version + -dialyzer([{nowarn_function, new_doc/2}]). %% API. 
--export([ - init/1, - create_schema/2, - persist/2, - fetch/3, - delete_by/3, - delete_all/2, - find_all/2, find_all/5, - find_by/3, find_by/5, find_by/6, - count/2 -]). + +-export( + [ + init/1, + create_schema/2, + persist/2, + fetch/3, + delete_by/3, + delete_all/2, + find_all/2, + find_all/5, + find_by/3, + find_by/5, + find_by/6, + count/2, + count_by/3 + ] +). %% Utilities --export([ - doc_to_rmap/1, - map_to_rmap/1, - rmap_to_doc/2, - rmap_to_map/2, - fetch_map/4, - fetch_docs/5, - delete_map/4, - update_map/5, - search/6, - build_query/2 -]). + +-export( + [ + doc_to_rmap/1, + map_to_rmap/1, + rmap_to_doc/2, + rmap_to_map/2, + fetch_map/4, + fetch_docs/5, + delete_map/4, + update_map/5, + search/6, + build_query/2 + ] +). %%%============================================================================= %%% Types %%%============================================================================= - %% Riak base parameters + -type connection() :: pid(). --type index() :: binary(). --type options() :: [proplists:property()]. +-type index() :: binary(). +-type options() :: [proplists:property()]. -export_type([connection/0, index/0, options/0]). @@ -89,14 +104,18 @@ %% del_opts: Riak delete options parameters. %% Reference. %% @end --record(state, { - conn :: connection(), - bucket :: {binary(), binary()}, - index :: index(), - get_opts :: get_options(), - put_opts :: put_options(), - del_opts :: delete_options() -}). + +-record( + state, + { + conn :: connection(), + bucket :: {binary(), binary()}, + index :: index(), + get_opts :: get_options(), + put_opts :: put_options(), + del_opts :: delete_options() + } +). -type state() :: #state{}. @@ -110,126 +129,135 @@ init(Opts) -> % which creates and initializes the storage backend. 
Backend = proplists:get_value(storage_backend, Opts), Conn = sumo_backend_riak:get_connection(Backend), - BucketType = sumo_utils:to_bin(sumo_utils:keyfind(bucket_type, Opts, <<"maps">>)), + BucketType = + sumo_utils:to_bin(sumo_utils:keyfind(bucket_type, Opts, <<"maps">>)), Bucket = sumo_utils:to_bin(sumo_utils:keyfind(bucket, Opts, <<"sumo">>)), Index = sumo_utils:to_bin(sumo_utils:keyfind(index, Opts, <<"sumo_index">>)), GetOpts = proplists:get_value(get_options, Opts, []), PutOpts = proplists:get_value(put_options, Opts, []), DelOpts = proplists:get_value(delete_options, Opts, []), - - State = #state{ - conn = Conn, - bucket = {BucketType, Bucket}, - index = Index, - get_opts = GetOpts, - put_opts = PutOpts, - del_opts = DelOpts - }, + State = + #state{ + conn = Conn, + bucket = {BucketType, Bucket}, + index = Index, + get_opts = GetOpts, + put_opts = PutOpts, + del_opts = DelOpts + }, {ok, State}. --spec persist(Doc, State) -> Response when - Doc :: sumo_internal:doc(), - State :: state(), - Response :: sumo_store:result(sumo_internal:doc(), state()). + +-spec persist(Doc, State) -> + Response + when Doc :: sumo_internal:doc(), + State :: state(), + Response :: sumo_store:result(sumo_internal:doc(), state()). persist(Doc, #state{conn = Conn, bucket = Bucket, put_opts = Opts} = State) -> {Id, NewDoc} = new_doc(sleep(Doc), State), case update_map(Conn, Bucket, Id, doc_to_rmap(NewDoc), Opts) of - {error, Error} -> - {error, Error, State}; - _ -> - {ok, wakeup(NewDoc), State} + {error, Error} -> {error, Error, State}; + _ -> {ok, wakeup(NewDoc), State} end. --spec fetch(DocName, Id, State) -> Response when - DocName :: sumo:schema_name(), - Id :: sumo:field_value(), - State :: state(), - Response :: sumo_store:result(sumo_internal:doc(), state()). + +-spec fetch(DocName, Id, State) -> + Response + when DocName :: sumo:schema_name(), + Id :: sumo:field_value(), + State :: state(), + Response :: sumo_store:result(sumo_internal:doc(), state()). 
fetch(DocName, Id, State) -> #state{conn = Conn, bucket = Bucket, get_opts = Opts} = State, case fetch_map(Conn, Bucket, sumo_utils:to_bin(Id), Opts) of - {ok, RMap} -> - {ok, rmap_to_doc(DocName, RMap), State}; - {error, {notfound, _Type = map}} -> - {error, notfound, State}; - {error, Error} -> - {error, Error, State} + {ok, RMap} -> {ok, rmap_to_doc(DocName, RMap), State}; + {error, {notfound, _Type = map}} -> {error, notfound, State}; + {error, Error} -> {error, Error, State} end. --spec delete_by(DocName, Conditions, State) -> Response when - DocName :: sumo:schema_name(), - Conditions :: sumo:conditions(), - State :: state(), - Response :: sumo_store:result(sumo_store:affected_rows(), state()). + +-spec delete_by(DocName, Conditions, State) -> + Response + when DocName :: sumo:schema_name(), + Conditions :: sumo:conditions(), + State :: state(), + Response :: sumo_store:result(sumo_store:affected_rows(), state()). delete_by(DocName, Conditions, State) when is_list(Conditions) -> #state{conn = Conn, bucket = Bucket, index = Index, del_opts = Opts} = State, IdField = sumo_internal:id_field_name(DocName), case lists:keyfind(IdField, 1, Conditions) of {_K, Key} -> case delete_map(Conn, Bucket, sumo_utils:to_bin(Key), Opts) of - ok -> - {ok, 1, State}; - {error, Error} -> - {error, Error, State} + ok -> {ok, 1, State}; + {error, Error} -> {error, Error, State} end; + _ -> Query = build_query(Conditions, Bucket), case search_keys_by(Conn, Index, Query, [], 0, 0) of - {ok, {Total, Res}} -> + {ok, {Total, Res}} -> delete_keys(Conn, Bucket, Res, Opts), {ok, Total, State}; - {error, Error} -> - {error, Error, State} + + {error, Error} -> {error, Error, State} end end; + delete_by(DocName, Conditions, State) -> #state{conn = Conn, bucket = Bucket, index = Index, del_opts = Opts} = State, TranslatedConditions = transform_conditions(DocName, Conditions), Query = build_query(TranslatedConditions, Bucket), case search_keys_by(Conn, Index, Query, [], 0, 0) of - {ok, {Total, 
Res}} -> + {ok, {Total, Res}} -> delete_keys(Conn, Bucket, Res, Opts), {ok, Total, State}; - {error, Error} -> - {error, Error, State} + + {error, Error} -> {error, Error, State} end. --spec delete_all(DocName, State) -> Response when - DocName :: sumo:schema_name(), - State :: state(), - Response :: sumo_store:result(sumo_store:affected_rows(), state()). + +-spec delete_all(DocName, State) -> + Response + when DocName :: sumo:schema_name(), + State :: state(), + Response :: sumo_store:result(sumo_store:affected_rows(), state()). delete_all(_DocName, State) -> #state{conn = Conn, bucket = Bucket, del_opts = Opts} = State, - Del = fun(Kst, Acc) -> - lists:foreach(fun(K) -> delete_map(Conn, Bucket, K, Opts) end, Kst), - Acc + length(Kst) - end, + Del = + fun + (Kst, Acc) -> + lists:foreach(fun (K) -> delete_map(Conn, Bucket, K, Opts) end, Kst), + Acc + length(Kst) + end, case stream_keys(Conn, Bucket, Del, 0) of - {ok, Count} -> {ok, Count, State}; + {ok, Count} -> {ok, Count, State}; {error, Reason, Count} -> {error, {stream_keys, Reason, Count}, State} end. --spec find_all(DocName, State) -> Response when - DocName :: sumo:schema_name(), - State :: state(), - Response :: sumo_store:result([sumo_internal:doc()], state()). + +-spec find_all(DocName, State) -> + Response + when DocName :: sumo:schema_name(), + State :: state(), + Response :: sumo_store:result([sumo_internal:doc()], state()). find_all(DocName, State) -> #state{conn = Conn, bucket = Bucket, get_opts = Opts} = State, - Get = fun(Kst, Acc) -> - fetch_docs(DocName, Conn, Bucket, Kst, Opts) ++ Acc - end, + Get = + fun (Kst, Acc) -> fetch_docs(DocName, Conn, Bucket, Kst, Opts) ++ Acc end, case stream_keys(Conn, Bucket, Get, []) of - {ok, Docs} -> {ok, Docs, State}; + {ok, Docs} -> {ok, Docs, State}; {error, Reason, Count} -> {error, {stream_keys, Reason, Count}, State} end. 
--spec find_all(DocName, Sort, Limit, Offset, State) -> Response when - DocName :: sumo:schema_name(), - Sort :: term(), - Limit :: non_neg_integer(), - Offset :: non_neg_integer(), - State :: state(), - Response :: sumo_store:result([sumo_internal:doc()], state()). + +-spec find_all(DocName, Sort, Limit, Offset, State) -> + Response + when DocName :: sumo:schema_name(), + Sort :: term(), + Limit :: non_neg_integer(), + Offset :: non_neg_integer(), + State :: state(), + Response :: sumo_store:result([sumo_internal:doc()], state()). find_all(DocName, Sort, Limit, Offset, State) -> find_by(DocName, [], Sort, Limit, Offset, State). @@ -240,21 +268,24 @@ find_all(DocName, Sort, Limit, Offset, State) -> %% query exist, and then obtain results for all of them. %% This is done to overcome Solr's default pagination value of 10. %% @end --spec find_by(DocName, Conditions, State) -> Response when - DocName :: sumo:schema_name(), - Conditions :: sumo:conditions(), - State :: state(), - Response :: sumo_store:result([sumo_internal:doc()], state()). + +-spec find_by(DocName, Conditions, State) -> + Response + when DocName :: sumo:schema_name(), + Conditions :: sumo:conditions(), + State :: state(), + Response :: sumo_store:result([sumo_internal:doc()], state()). find_by(DocName, Conditions, State) -> find_by(DocName, Conditions, undefined, undefined, State). --spec find_by(DocName, Conditions, Limit, Offset, State) -> Response when - DocName :: sumo:schema_name(), - Conditions :: sumo:conditions(), - Limit :: non_neg_integer() | undefined, - Offset :: non_neg_integer() | undefined, - State :: state(), - Response :: sumo_store:result([sumo_internal:doc()], state()). +-spec find_by(DocName, Conditions, Limit, Offset, State) -> + Response + when DocName :: sumo:schema_name(), + Conditions :: sumo:conditions(), + Limit :: non_neg_integer() | undefined, + Offset :: non_neg_integer() | undefined, + State :: state(), + Response :: sumo_store:result([sumo_internal:doc()], state()). 
find_by(DocName, Conditions, undefined, undefined, State) -> %% First get all keys matching the query, and then obtain documents for those %% keys. @@ -265,21 +296,24 @@ find_by(DocName, Conditions, undefined, undefined, State) -> {ok, Keys} -> Results = fetch_docs(DocName, Conn, Bucket, Keys, Opts), {ok, Results, State}; - {error, Error} -> - {error, Error, State} + + {error, Error} -> {error, Error, State} end; + find_by(DocName, Conditions, Limit, Offset, State) -> %% Limit and offset were specified so we return a possibly partial result set. find_by(DocName, Conditions, [], Limit, Offset, State). --spec find_by(DocName, Conditions, Sort, Limit, Offset, State) -> Response when - DocName :: sumo:schema_name(), - Conditions :: sumo:conditions(), - Sort :: term(), - Limit :: non_neg_integer(), - Offset :: non_neg_integer(), - State :: state(), - Response :: sumo_store:result([sumo_internal:doc()], state()). + +-spec find_by(DocName, Conditions, Sort, Limit, Offset, State) -> + Response + when DocName :: sumo:schema_name(), + Conditions :: sumo:conditions(), + Sort :: term(), + Limit :: non_neg_integer(), + Offset :: non_neg_integer(), + State :: state(), + Response :: sumo_store:result([sumo_internal:doc()], state()). find_by(DocName, Conditions, Sort, Limit, Offset, State) -> #state{conn = Conn, bucket = Bucket, index = Index, get_opts = Opts} = State, TranslatedConditions = transform_conditions(DocName, Conditions), @@ -289,8 +323,8 @@ find_by(DocName, Conditions, Sort, Limit, Offset, State) -> {ok, {_Total, Keys}} -> Results = fetch_docs(DocName, Conn, Bucket, Keys, Opts), {ok, Results, State}; - {error, Error} -> - {error, Error, State} + + {error, Error} -> {error, Error, State} end. %% @doc @@ -301,44 +335,55 @@ find_by(DocName, Conditions, Sort, Limit, Offset, State) -> %% set, and then it fetches the rest fo them. 
%% @end %% @private + find_by_query_get_keys(Conn, Index, Query) -> - InitialResults = case search_keys_by(Conn, Index, Query, [], 0, 0) of - {ok, {Total, Keys}} -> {ok, length(Keys), Total, Keys}; - Error -> Error - end, + InitialResults = + case search_keys_by(Conn, Index, Query, [], 0, 0) of + {ok, {Total, Keys}} -> {ok, length(Keys), Total, Keys}; + Error -> Error + end, case InitialResults of {ok, ResultCount, Total1, Keys1} when ResultCount < Total1 -> - Limit = Total1 - ResultCount, + Limit = Total1 - ResultCount, Offset = ResultCount, case search_keys_by(Conn, Index, Query, [], Limit, Offset) of - {ok, {Total1, Keys2}} -> - {ok, lists:append(Keys1, Keys2)}; - {error, Error1} -> - {error, Error1} + {ok, {Total1, Keys2}} -> {ok, lists:append(Keys1, Keys2)}; + {error, Error1} -> {error, Error1} end; - {ok, _ResultCount, _Total, Keys1} -> - {ok, Keys1}; - {error, Error2} -> - {error, Error2} + + {ok, _ResultCount, _Total, Keys1} -> {ok, Keys1}; + {error, Error2} -> {error, Error2} end. --spec count(DocName, State) -> Response when - DocName :: sumo:schema_name(), - State :: state(), - Response :: sumo_store:result(non_neg_integer(), state()). + +-spec count(DocName, State) -> + Response + when DocName :: sumo:schema_name(), + State :: state(), + Response :: sumo_store:result(non_neg_integer(), state()). count(_DocName, #state{conn = Conn, bucket = Bucket} = State) -> - Sum = fun(Kst, Acc) -> length(Kst) + Acc end, + Sum = fun (Kst, Acc) -> length(Kst) + Acc end, case stream_keys(Conn, Bucket, Sum, 0) of {ok, Count} -> {ok, Count, State}; - {_, _, _} -> {error, {error, count_failed}, State} + {_, _, _} -> {error, {error, count_failed}, State} end. --spec create_schema(Schema, State) -> Response when - Schema :: sumo_internal:schema(), - State :: state(), - Response :: sumo_store:result(state()). -create_schema(_Schema, State) -> - {ok, State}. 
+ +-spec create_schema(Schema, State) -> + Response + when Schema :: sumo_internal:schema(), + State :: state(), + Response :: sumo_store:result(state()). +create_schema(_Schema, State) -> {ok, State}. + +-spec count_by(DocName, Conditions, State) -> + Response + when DocName :: sumo:schema_name(), + Conditions :: sumo:conditions(), + State :: state(), + Response :: sumo_store:result(non_neg_integer(), state()). +count_by(DocName, [], State) -> count(DocName, State); +count_by(_DocName, _Conditions, #{conn := _Conn} = _State) -> 0. %%%============================================================================= %%% Utilities @@ -349,221 +394,290 @@ doc_to_rmap(Doc) -> Fields = sumo_internal:doc_fields(Doc), map_to_rmap(Fields). + -spec map_to_rmap(map()) -> riakc_map:crdt_map(). map_to_rmap(Map) -> lists:foldl(fun rmap_update/2, riakc_map:new(), maps:to_list(Map)). --spec rmap_to_doc(sumo:schema_name(), riakc_map:crdt_map()) -> sumo_internal:doc(). +-spec rmap_to_doc(sumo:schema_name(), riakc_map:crdt_map()) -> + sumo_internal:doc(). rmap_to_doc(DocName, RMap) -> wakeup(sumo_internal:new_doc(DocName, rmap_to_map(DocName, RMap))). -spec rmap_to_map(sumo:schema_name(), riakc_map:crdt_map()) -> map(). rmap_to_map(DocName, RMap) -> - lists:foldl(fun - ({{K, map}, V}, Acc) -> - NewV = rmap_to_map(DocName, {map, V, [], [], undefined}), - maps:put(sumo_utils:to_atom(K), NewV, Acc); - ({{K, _}, V}, Acc) -> - maps:put(sumo_utils:to_atom(K), V, Acc) - end, #{}, riakc_map:value(RMap)). - --spec fetch_map(Conn, Bucket, Key, Opts) -> Result when - Conn :: connection(), - Bucket :: bucket_and_type(), - Key :: key(), - Opts :: options(), - Result :: {ok, riakc_datatype:datatype()} | {error, term()}. + lists:foldl( + fun + ({{K, map}, V}, Acc) -> + NewV = rmap_to_map(DocName, {map, V, [], [], undefined}), + maps:put(sumo_utils:to_atom(K), NewV, Acc); + + ({{K, _}, V}, Acc) -> maps:put(sumo_utils:to_atom(K), V, Acc) + end, + #{}, + riakc_map:value(RMap) + ). 
+ + +-spec fetch_map(Conn, Bucket, Key, Opts) -> + Result + when Conn :: connection(), + Bucket :: bucket_and_type(), + Key :: key(), + Opts :: options(), + Result :: {ok, riakc_datatype:datatype()} | {error, term()}. fetch_map(Conn, Bucket, Key, Opts) -> riakc_pb_socket:fetch_type(Conn, Bucket, Key, Opts). --spec fetch_docs(DocName, Conn, Bucket, Keys, Opts) -> Result when - DocName :: sumo:schema_name(), - Conn :: connection(), - Bucket :: bucket_and_type(), - Keys :: [key()], - Opts :: options(), - Result :: [sumo_internal:doc()]. +-spec fetch_docs(DocName, Conn, Bucket, Keys, Opts) -> + Result + when DocName :: sumo:schema_name(), + Conn :: connection(), + Bucket :: bucket_and_type(), + Keys :: [key()], + Opts :: options(), + Result :: [sumo_internal:doc()]. fetch_docs(DocName, Conn, Bucket, Keys, Opts) -> - lists:foldl(fun(K, Acc) -> - case fetch_map(Conn, Bucket, K, Opts) of - {ok, M} -> [rmap_to_doc(DocName, M) | Acc]; - _ -> Acc - end - end, [], Keys). - --spec delete_map(connection(), bucket_and_type(), key(), options()) -> ok | {error, term()}. + lists:foldl( + fun + (K, Acc) -> + case fetch_map(Conn, Bucket, K, Opts) of + {ok, M} -> [rmap_to_doc(DocName, M) | Acc]; + _ -> Acc + end + end, + [], + Keys + ). + + +-spec delete_map(connection(), bucket_and_type(), key(), options()) -> + ok | {error, term()}. delete_map(Conn, Bucket, Key, Opts) -> riakc_pb_socket:delete(Conn, Bucket, Key, Opts). --spec update_map(Conn, Bucket, Key, Map, Opts) -> Result when - Conn :: connection(), - Bucket :: bucket_and_type(), - Key :: key() | undefined, - Map :: riakc_map:crdt_map(), - Opts :: options(), - Ok :: ok | {ok, Key | riakc_datatype:datatype()} | {ok, Key, riakc_datatype:datatype()}, - Error :: {error, term()}, - Result :: Ok | Error. 
+-spec update_map(Conn, Bucket, Key, Map, Opts) -> + Result + when Conn :: connection(), + Bucket :: bucket_and_type(), + Key :: key() | undefined, + Map :: riakc_map:crdt_map(), + Opts :: options(), + Ok + :: + ok + | {ok, Key | riakc_datatype:datatype()} + | {ok, Key, riakc_datatype:datatype()}, + Error :: {error, term()}, + Result :: Ok | Error. update_map(Conn, Bucket, Key, Map, Opts) -> riakc_pb_socket:update_type(Conn, Bucket, Key, riakc_map:to_op(Map), Opts). --spec search(Conn, Index, Query, Sort, Limit, Offset) -> Result when - Conn :: connection(), - Index :: index(), - Query :: binary(), - Sort :: [term()], - Limit :: non_neg_integer(), - Offset :: non_neg_integer(), - Result :: {ok, search_result()} | {error, term()}. +-spec search(Conn, Index, Query, Sort, Limit, Offset) -> + Result + when Conn :: connection(), + Index :: index(), + Query :: binary(), + Sort :: [term()], + Limit :: non_neg_integer(), + Offset :: non_neg_integer(), + Result :: {ok, search_result()} | {error, term()}. search(Conn, Index, Query, Sort, 0, 0) -> riakc_pb_socket:search(Conn, Index, Query, Sort); + search(Conn, Index, Query, Sort, Limit, Offset) -> - riakc_pb_socket:search(Conn, Index, Query, [{start, Offset}, {rows, Limit}] ++ Sort). + riakc_pb_socket:search( + Conn, + Index, + Query, + [{start, Offset}, {rows, Limit}] ++ Sort + ). -spec build_query(sumo:conditions(), {binary(), binary()}) -> binary(). build_query(Conditions, {Type, Bucket}) -> - Query = build_query1(Conditions, fun escape/1, fun quote/1), - <<"_yz_rt:\"", Type/binary, "\" AND " , "_yz_rb:\"", Bucket/binary, "\" AND ", Query/binary>>. + Query = build_query1(Conditions, fun escape/1, fun quote/1), + << + "_yz_rt:\"", + Type/binary, + "\" AND ", + "_yz_rb:\"", + Bucket/binary, + "\" AND ", + Query/binary + >>. 
%%%============================================================================= %%% Internal functions %%%============================================================================= - %% @private + transform_conditions(DocName, Conditions) -> sumo_utils:transform_conditions( - fun validate_date/1, DocName, Conditions, [date, datetime]). + fun validate_date/1, + DocName, + Conditions, + [date, datetime] + ). %% @private + validate_date({FieldType, _, FieldValue}) -> case {FieldType, sumo_utils:is_datetime(FieldValue)} of - {datetime, true} -> - iso8601:format(FieldValue); + {datetime, true} -> iso8601:format(FieldValue); + {date, true} -> DateTime = {FieldValue, {0, 0, 0}}, iso8601:format(DateTime) end. %% @private -sleep(Doc) -> - sumo_utils:doc_transform(fun sleep_fun/4, Doc). + +sleep(Doc) -> sumo_utils:doc_transform(fun sleep_fun/4, Doc). + %% @private -sleep_fun(_, FieldName, undefined, _) when FieldName /= id -> - <<"$nil">>; + +sleep_fun(_, FieldName, undefined, _) when FieldName /= id -> <<"$nil">>; + sleep_fun(FieldType, _, FieldValue, _) - when FieldType =:= datetime; FieldType =:= date -> +when FieldType =:= datetime; FieldType =:= date -> case {FieldType, sumo_utils:is_datetime(FieldValue)} of {datetime, true} -> iso8601:format(FieldValue); - {date, true} -> iso8601:format({FieldValue, {0, 0, 0}}); - _ -> FieldValue + {date, true} -> iso8601:format({FieldValue, {0, 0, 0}}); + _ -> FieldValue end; + sleep_fun(custom, _, FieldValue, FieldAttrs) -> Type = sumo_utils:keyfind(type, FieldAttrs, custom), sleep_custom(FieldValue, Type); -sleep_fun(_, _, FieldValue, _) -> - FieldValue. + +sleep_fun(_, _, FieldValue, _) -> FieldValue. %% @private + sleep_custom(FieldValue, FieldType) -> case lists:member(FieldType, [term, tuple, map, list]) of true -> base64:encode(term_to_binary(FieldValue)); - _ -> FieldValue + _ -> FieldValue end. %% @private -wakeup(Doc) -> - sumo_utils:doc_transform(fun wakeup_fun/4, Doc). 
-wakeup_fun(_, _, <<"$nil">>, _) -> - undefined; + +wakeup(Doc) -> sumo_utils:doc_transform(fun wakeup_fun/4, Doc). + +wakeup_fun(_, _, undefined, _) -> undefined; +wakeup_fun(_, _, <<"$nil">>, _) -> undefined; + wakeup_fun(FieldType, _, FieldValue, _) - when FieldType =:= datetime; FieldType =:= date -> - case {FieldType, iso8601:is_datetime(FieldValue)} of +when FieldType =:= datetime; FieldType =:= date -> + case {FieldType, sumo_utils:is_datetime(FieldValue)} of {datetime, true} -> iso8601:parse(FieldValue); - {date, true} -> {Date, _} = iso8601:parse(FieldValue), Date; - _ -> FieldValue + + {date, true} -> + {Date, _} = iso8601:parse(FieldValue), + Date; + + _ -> FieldValue end; + wakeup_fun(integer, _, FieldValue, _) when is_binary(FieldValue) -> binary_to_integer(FieldValue); + wakeup_fun(float, _, FieldValue, _) when is_binary(FieldValue) -> binary_to_float(FieldValue); + wakeup_fun(boolean, _, FieldValue, _) when is_binary(FieldValue) -> binary_to_atom(FieldValue, utf8); + wakeup_fun(custom, _, FieldValue, FieldAttrs) -> Type = sumo_utils:keyfind(type, FieldAttrs, custom), wakeup_custom(FieldValue, Type); -wakeup_fun(_, _, FieldValue, _) -> - FieldValue. + +wakeup_fun(_, _, FieldValue, _) -> FieldValue. %% @private + wakeup_custom(FieldValue, FieldType) -> case lists:member(FieldType, [term, tuple, map, list]) of true -> binary_to_term(base64:decode(FieldValue)); - _ -> FieldValue + _ -> FieldValue end. 
%% @private + new_doc(Doc, #state{conn = Conn, bucket = Bucket, put_opts = Opts}) -> DocName = sumo_internal:doc_name(Doc), IdField = sumo_internal:id_field_name(DocName), - Id = case sumo_internal:get_field(IdField, Doc) of - undefined -> - case update_map(Conn, Bucket, undefined, doc_to_rmap(Doc), Opts) of - {ok, RiakMapId} -> RiakMapId; - {error, Error} -> exit(Error); - Unexpected -> exit({unexpected, Unexpected}) - end; - Id0 -> - sumo_utils:to_bin(Id0) - end, + Id = + case sumo_internal:get_field(IdField, Doc) of + undefined -> + case update_map(Conn, Bucket, undefined, doc_to_rmap(Doc), Opts) of + {ok, RiakMapId} -> RiakMapId; + {error, Error} -> exit(Error); + Unexpected -> exit({unexpected, Unexpected}) + end; + + Id0 -> sumo_utils:to_bin(Id0) + end, {Id, sumo_internal:set_field(IdField, Id, Doc)}. %% @private -list_to_rset(_, [], Acc) -> - Acc; + +list_to_rset(_, [], Acc) -> Acc; + list_to_rset(K, [H | T], Acc) -> - M = riakc_map:update( - {sumo_utils:to_bin(K), set}, - fun(S) -> riakc_set:add_element(sumo_utils:to_bin(H), S) end, - Acc), + M = + riakc_map:update( + {sumo_utils:to_bin(K), set}, + fun (S) -> riakc_set:add_element(sumo_utils:to_bin(H), S) end, + Acc + ), list_to_rset(K, T, M). %% @private + rmap_update({K, V}, RMap) when is_map(V) -> NewV = map_to_rmap(V), - riakc_map:update({sumo_utils:to_bin(K), map}, fun(_M) -> NewV end, RMap); + riakc_map:update({sumo_utils:to_bin(K), map}, fun (_M) -> NewV end, RMap); + rmap_update({K, V}, RMap) when is_list(V) -> case io_lib:printable_list(V) of true -> riakc_map:update( {sumo_utils:to_bin(K), register}, - fun(R) -> riakc_register:set(sumo_utils:to_bin(V), R) end, - RMap); - false -> - list_to_rset(K, V, RMap) + fun (R) -> riakc_register:set(sumo_utils:to_bin(V), R) end, + RMap + ); + + false -> list_to_rset(K, V, RMap) end; + rmap_update({K, V}, RMap) -> riakc_map:update( {sumo_utils:to_bin(K), register}, - fun(R) -> riakc_register:set(sumo_utils:to_bin(V), R) end, - RMap). 
- - + fun (R) -> riakc_register:set(sumo_utils:to_bin(V), R) end, + RMap + ). %% @private + stream_keys(Conn, Bucket, F, Acc) -> - {ok, Ref} = riakc_pb_socket:get_index_eq( - Conn, Bucket, <<"$bucket">>, <<"">>, [{stream, true}]), + {ok, Ref} = + riakc_pb_socket:get_index_eq( + Conn, + Bucket, + <<"$bucket">>, + <<"">>, + [{stream, true}] + ), receive_stream(Ref, F, Acc). %% @private + receive_stream(Ref, F, Acc) -> receive - {Ref, {_, Keys, _}} -> - receive_stream(Ref, F, F(Keys, Acc)); - {Ref, {done, _Continuation = undefined}} -> - {ok, Acc}; - Unexpected -> - {error, {unexpected, Unexpected}, Acc} + {Ref, {_, Keys, _}} -> receive_stream(Ref, F, F(Keys, Acc)); + {Ref, {done, _Continuation = undefined}} -> {ok, Acc}; + Unexpected -> {error, {unexpected, Unexpected}, Acc} after 30000 -> {error, timeout, Acc} end. @@ -573,135 +687,180 @@ receive_stream(Ref, F, Acc) -> %% Search all docs that match with the given query, but only keys are returned. %% IMPORTANT: assumes that default schema 'yokozuna' is being used. %% @end + search_keys_by(Conn, Index, Query, SortOpts, Limit, Offset) -> case sumo_store_riak:search(Conn, Index, Query, SortOpts, Limit, Offset) of {ok, {search_results, Results, _, Total}} -> - Keys = lists:foldl(fun({_, KV}, Acc) -> - {_, K} = lists:keyfind(<<"_yz_rk">>, 1, KV), - [K | Acc] - end, [], Results), + Keys = + lists:foldl( + fun + ({_, KV}, Acc) -> + {_, K} = lists:keyfind(<<"_yz_rk">>, 1, KV), + [K | Acc] + end, + [], + Results + ), {ok, {Total, Keys}}; - {error, Error} -> - {error, Error} + + {error, Error} -> {error, Error} end. %% @private + delete_keys(Conn, Bucket, Keys, Opts) -> - lists:foreach(fun(K) -> - delete_map(Conn, Bucket, K, Opts) - end, Keys). + lists:foreach(fun (K) -> delete_map(Conn, Bucket, K, Opts) end, Keys). 
%%%============================================================================= %%% Query Builder %%%============================================================================= - %% @private -build_query1([], _EscapeFun, _QuoteFun) -> - <<"*:*">>; + +build_query1([], _EscapeFun, _QuoteFun) -> <<"*:*">>; + build_query1(Exprs, EscapeFun, QuoteFun) when is_list(Exprs) -> Clauses = [build_query1(Expr, EscapeFun, QuoteFun) || Expr <- Exprs], binary:list_to_bin(["(", interpose(" AND ", Clauses), ")"]); + build_query1({'and', Exprs}, EscapeFun, QuoteFun) -> build_query1(Exprs, EscapeFun, QuoteFun); + build_query1({'or', Exprs}, EscapeFun, QuoteFun) -> Clauses = [build_query1(Expr, EscapeFun, QuoteFun) || Expr <- Exprs], binary:list_to_bin(["(", interpose(" OR ", Clauses), ")"]); + build_query1({'not', Expr}, EscapeFun, QuoteFun) -> binary:list_to_bin(["(NOT ", build_query1(Expr, EscapeFun, QuoteFun), ")"]); + build_query1({Name, '<', Value}, EscapeFun, _QuoteFun) -> NewVal = binary:list_to_bin(["{* TO ", EscapeFun(Value), "}"]), query_eq(Name, NewVal); + build_query1({Name, '=<', Value}, EscapeFun, _QuoteFun) -> NewVal = binary:list_to_bin(["[* TO ", EscapeFun(Value), "]"]), query_eq(Name, NewVal); + build_query1({Name, '>', Value}, EscapeFun, _QuoteFun) -> NewVal = binary:list_to_bin(["{", EscapeFun(Value), " TO *}"]), query_eq(Name, NewVal); + build_query1({Name, '>=', Value}, EscapeFun, _QuoteFun) -> NewVal = binary:list_to_bin(["[", EscapeFun(Value), " TO *]"]), query_eq(Name, NewVal); + build_query1({Name, '==', Value}, EscapeFun, QuoteFun) -> build_query1({Name, Value}, EscapeFun, QuoteFun); + build_query1({Name, '/=', Value}, EscapeFun, QuoteFun) -> build_query1({negative_field(Name), Value}, EscapeFun, QuoteFun); -build_query1({Name, 'like', Value}, _EscapeFun, _QuoteFun) -> + +build_query1({Name, like, Value}, _EscapeFun, _QuoteFun) -> NewVal = like_to_wildcard_search(Value), - Bypass = fun(X) -> X end, + Bypass = fun (X) -> X end, build_query1({Name, NewVal}, 
Bypass, Bypass); -build_query1({Name, 'null'}, _EscapeFun, _QuoteFun) -> + +build_query1({Name, null}, _EscapeFun, _QuoteFun) -> %% null: (Field:<<"$nil">> OR (NOT Field:[* TO *])) Val = {'or', [{Name, <<"$nil">>}, {'not', {Name, <<"[* TO *]">>}}]}, - Bypass = fun(X) -> X end, + Bypass = fun (X) -> X end, build_query1(Val, Bypass, Bypass); -build_query1({Name, 'not_null'}, _EscapeFun, _QuoteFun) -> + +build_query1({Name, not_null}, _EscapeFun, _QuoteFun) -> %% not_null: (Field:[* TO *] AND -Field:<<"$nil">>) Val = {'and', [{Name, <<"[* TO *]">>}, {Name, '/=', <<"$nil">>}]}, - Bypass = fun(X) -> X end, -build_query1(Val, Bypass, Bypass); + Bypass = fun (X) -> X end, + build_query1(Val, Bypass, Bypass); + build_query1({Name, Value}, _EscapeFun, QuoteFun) -> query_eq(Name, QuoteFun(Value)). %% @private -query_eq(K, V) -> - binary:list_to_bin([build_key(K), V]). + +query_eq(K, V) -> binary:list_to_bin([build_key(K), V]). %% @private + build_key(K) -> build_key(binary:split(sumo_utils:to_bin(K), <<".">>, [global]), <<"">>). %% @private -build_key([K], <<"">>) -> - binary:list_to_bin([K, "_register:"]); -build_key([K], Acc) -> - binary:list_to_bin([Acc, ".", K, "_register:"]); -build_key([K | T], <<"">>) -> - build_key(T, binary:list_to_bin([K, "_map"])); + +build_key([K], <<"">>) -> binary:list_to_bin([K, "_register:"]); +build_key([K], Acc) -> binary:list_to_bin([Acc, ".", K, "_register:"]); +build_key([K | T], <<"">>) -> build_key(T, binary:list_to_bin([K, "_map"])); + build_key([K | T], Acc) -> build_key(T, binary:list_to_bin([Acc, ".", K, "_map"])). %% @private -interpose(Sep, List) -> - interpose(Sep, List, []). + +interpose(Sep, List) -> interpose(Sep, List, []). 
%% @private -interpose(_Sep, [], Result) -> - lists:reverse(Result); -interpose(Sep, [Item | []], Result) -> - interpose(Sep, [], [Item | Result]); + +interpose(_Sep, [], Result) -> lists:reverse(Result); +interpose(Sep, [Item | []], Result) -> interpose(Sep, [], [Item | Result]); + interpose(Sep, [Item | Rest], Result) -> interpose(Sep, Rest, [Sep, Item | Result]). %% @private -negative_field(Name) -> - binary:list_to_bin([<<"-">>, sumo_utils:to_bin(Name)]). + +negative_field(Name) -> binary:list_to_bin([<<"-">>, sumo_utils:to_bin(Name)]). %% @private + quote(Value) -> BinVal = sumo_utils:to_bin(Value), - [$\", re:replace(BinVal, "[\\\"\\\\]", "\\\\&", [global]), $\"]. + [$", re:replace(BinVal, "[\\\"\\\\]", "\\\\&", [global]), $"]. %% @private + escape(Value) -> Escape = "[\\+\\-\\&\\|\\!\\(\\)\\{\\}\\[\\]\\^\\\"\\~\\*\\?\\:\\\\]", re:replace( - sumo_utils:to_bin(Value), Escape, "\\\\&", [global, {return, binary}]). + sumo_utils:to_bin(Value), + Escape, + "\\\\&", + [global, {return, binary}] + ). %% @private + whitespace(Value) -> - re:replace(Value, "[\\\s\\\\]", "\\\\&", [global, {return, binary}]). + re:replace(Value, "[\\ \\\\]", "\\\\&", [global, {return, binary}]). %% @private + like_to_wildcard_search(Like) -> - whitespace(binary:replace( - sumo_utils:to_bin(Like), <<"%">>, <<"*">>, [global])). + whitespace( + binary:replace(sumo_utils:to_bin(Like), <<"%">>, <<"*">>, [global]) + ). 
%% @private -build_sort([]) -> - []; + +build_sort([]) -> []; + build_sort({Field, Dir}) -> - [{sort, <<(sumo_utils:to_bin(Field))/binary, "_register", (sumo_utils:to_bin(Dir))/binary>>}]; + [ + { + sort, + << + (sumo_utils:to_bin(Field))/binary, + "_register", + (sumo_utils:to_bin(Dir))/binary + >> + } + ]; + build_sort(Sorts) -> - Res = [begin - binary:list_to_bin([sumo_utils:to_bin(Field), "_register", " ", sumo_utils:to_bin(Dir)]) - end || {Field, Dir} <- Sorts], + Res = + [ + begin + binary:list_to_bin( + [sumo_utils:to_bin(Field), "_register", " ", sumo_utils:to_bin(Dir)] + ) + end + || {Field, Dir} <- Sorts + ], [{sort, binary:list_to_bin(interpose(", ", Res))}]. diff --git a/src/sumo_store_riak2i.erl b/src/sumo_store_riak2i.erl new file mode 100644 index 0000000..729db1f --- /dev/null +++ b/src/sumo_store_riak2i.erl @@ -0,0 +1,657 @@ +%%% @hidden +%%% @doc Riak store implementation. +%%% Implementation Notes: +%%%
+%%% <ul>
+%%% <li>Riak Data Types as main structures to push/pull data.</li>
+%%%
+%%% <li>Bulk operations (such as: delete_all and find_all) were
+%%% optimized using streaming. Records are streamed in portions
+%%% (using Riak 2i to stream keys first), and then the current
+%%% operation (e.g.: delete the record or accumulate the values
+%%% to return them later) is applied. This allows better memory
+%%% and cpu efficiency.</li>
+%%%
+%%% <li>Query functions were implemented using Riak Search on Data Types,
+%%% to get better performance and flexibility.</li>
+%%%
+%%% </ul>
+%%% +%%% Copyright 2012 Inaka <hello@inaka.net> +%%% +%%% Licensed under the Apache License, Version 2.0 (the "License"); +%%% you may not use this file except in compliance with the License. +%%% You may obtain a copy of the License at +%%% +%%% http://www.apache.org/licenses/LICENSE-2.0 +%%% +%%% Unless required by applicable law or agreed to in writing, software +%%% distributed under the License is distributed on an "AS IS" BASIS, +%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%%% See the License for the specific language governing permissions and +%%% limitations under the License. +%%% @end +%%% @copyright Inaka +%%% + +-module(sumo_store_riak2i). + +-author("Steven Joseph "). + +-github("https://github.com/jagguli"). + +-license("Apache License 2.0"). + +-behavior(sumo_store). + +-include_lib("riakc/include/riakc.hrl"). + +%% @todo remove this when riakc releases a new version > 2.5.3 +%% They already fixed on master so we should wait until they release a new version + +-dialyzer([{nowarn_function, new_doc/2}]). + +%% API. + +-export( + [ + init/1, + create_schema/2, + persist/2, + fetch/3, + delete_by/3, + delete_all/2, + find_all/2, + find_all/5, + find_by/3, + find_by/5, + find_by/6, + count/2, + count_by/3, + index_field/5 + ] +). + +%% Utilities + +-export( + [ + % doc_to_rmap/1, + % map_to_rmap/1, + % rmap_to_doc/2, + % rmap_to_map/2, + fetch_obj/4 + % fetch_docs/5, + % delete_map/4, + % update_map/5, + % search/6, + % build_query/2 + ] +). + +%%%============================================================================= +%%% Types +%%%============================================================================= +%% Riak base parameters + +-type connection() :: pid(). +-type index() :: binary(). +-type options() :: [proplists:property()]. + +-export_type([connection/0, index/0, options/0]). 
+ +%% @doc +%% conn: is the Pid of the gen_server that holds the connection with Riak +%% bucket: Riak bucket (per store) +%% index: Riak index to be used by Riak Search +%% get_opts: Riak read options parameters. +%% put_opts: Riak write options parameters. +%% del_opts: Riak delete options parameters. +%% Reference. +%% @end + +-record( + state, + { + conn :: connection(), + bucket :: {binary(), binary()}, + index :: index(), + get_opts :: get_options(), + put_opts :: put_options(), + del_opts :: delete_options() + } +). + +-type state() :: #state{}. + +%%%============================================================================= +%%% API +%%%============================================================================= + +-spec init(term()) -> {ok, term()}. +init(Opts) -> + % The storage backend key in the options specifies the name of the process + % which creates and initializes the storage backend. + Backend = proplists:get_value(storage_backend, Opts), + Conn = sumo_backend_riak:get_connection(Backend), + BucketType = sumo_utils:to_bin(sumo_utils:keyfind(bucket_type, Opts)), + Bucket = sumo_utils:to_bin(sumo_utils:keyfind(bucket, Opts)), + GetOpts = proplists:get_value(get_options, Opts, []), + PutOpts = proplists:get_value(put_options, Opts, []), + DelOpts = proplists:get_value(delete_options, Opts, []), + State = + #state{ + conn = Conn, + bucket = {BucketType, Bucket}, + index = null, + get_opts = GetOpts, + put_opts = PutOpts, + del_opts = DelOpts + }, + {ok, State}. + + +-spec persist(Doc, State) -> + Response + when Doc :: sumo_internal:doc(), + State :: state(), + Response :: sumo_store:result(sumo_internal:doc(), state()). +persist(Doc, #state{conn = Conn, bucket = Bucket, put_opts = Opts} = State) -> + {Id, NewDoc} = new_doc(sleep(Doc), State), + try update_obj(Conn, Bucket, Id, NewDoc, Opts) of + ok -> {ok, wakeup(NewDoc), State}; + Error -> {error, Error, State} + catch + Error -> {error, Error, State} + end. 
+ + +-spec robj_to_doc(sumo:schema_name(), riakc_obj:riakc_obj()) -> + sumo_internal:doc(). +robj_to_doc(DocName, RObj) -> wakeup(sumo_internal:new_doc(DocName, RObj)). + +-spec fetch_obj(Conn, Bucket, Key, Opts) -> + Result + when Conn :: connection(), + Bucket :: bucket_and_type(), + Key :: key(), + Opts :: options(), + Result :: {ok, riakc_obj:riakc_obj()} | {error, term()}. +fetch_obj(Conn, Bucket, Id, Opts) -> + riakc_pb_socket:get(Conn, Bucket, Id, Opts). + +check_resolve_siblings(RiakObject, DocName, Conn) -> + {riakc_obj, {_BucketType, _Bucket}, _Key, _Context, _, _, _} = RiakObject, + case riakc_obj:get_contents(RiakObject) of + [] -> throw(no_value); + [{_MD, V}] -> V; + + Siblings -> + Module = sumo_config:get_prop_value(DocName, module), + {MD, V} = Module:conflict_resolver(Siblings), + Obj = + riakc_obj:update_value(riakc_obj:update_metadata(RiakObject, MD), V), + ok = riakc_pb_socket:put(Conn, Obj), + V + end. + + +-spec fetch(DocName, Id, State) -> + Response + when DocName :: sumo:schema_name(), + Id :: sumo:field_value(), + State :: state(), + Response :: sumo_store:result(sumo_internal:doc(), state()). +fetch( + DocName, + Id, + #state{conn = Conn, bucket = Bucket, get_opts = Opts} = State +) -> + case fetch_obj(Conn, Bucket, sumo_utils:to_bin(Id), Opts) of + {ok, RiakObject} -> + { + ok, + robj_to_doc( + DocName, + jsx:decode( + check_resolve_siblings(RiakObject, DocName, Conn), + [{labels, atom}, return_maps] + ) + ), + State + }; + + {error, {notfound, _Type = map}} -> {error, notfound, State}; + {error, Error} -> {error, Error, State}; + undef -> {error, undef, State} + end. + + +-spec fetch_docs(DocName, Conn, Bucket, Keys, Opts) -> + Result + when DocName :: sumo:schema_name(), + Conn :: connection(), + Bucket :: bucket_and_type(), + Keys :: [key()], + Opts :: options(), + Result :: [sumo_internal:doc()]. 
+fetch_docs(DocName, Conn, Bucket, Keys, Opts) -> + lists:foldl( + fun + (K, Acc) -> + case fetch_obj(Conn, Bucket, K, Opts) of + {ok, M} -> [robj_to_doc(DocName, M) | Acc]; + _ -> Acc + end + end, + [], + Keys + ). + + +-spec delete_obj(connection(), bucket_and_type(), key(), options()) -> + ok | {error, term()}. +delete_obj(Conn, Bucket, Key, Opts) -> + riakc_pb_socket:delete(Conn, Bucket, Key, Opts). + +-spec delete_by(DocName, Conditions, State) -> + Response + when DocName :: sumo:schema_name(), + Conditions :: sumo:conditions(), + State :: state(), + Response :: sumo_store:result(sumo_store:affected_rows(), state()). +delete_by(DocName, Conditions, State) when is_list(Conditions) -> + #state{conn = Conn, bucket = Bucket, index = _Index, del_opts = Opts} = State, + IdField = sumo_internal:id_field_name(DocName), + case lists:keyfind(IdField, 1, Conditions) of + {_K, Key} -> + case delete_obj(Conn, Bucket, sumo_utils:to_bin(Key), Opts) of + ok -> {ok, 1, State}; + {error, Error} -> {error, Error, State} + end + end; + +delete_by(DocName, Conditions, State) -> + #state{conn = _Conn, bucket = _Bucket, index = _Index, del_opts = _Opts} = + State, + _TranslatedConditions = transform_conditions(DocName, Conditions), + logger:debug("Conditions ~p", [Conditions]), + {ok, 0, State}. + + +%Query = build_query(TranslatedConditions, Bucket), +%case search_keys_by(Conn, Index, Query, [], 0, 0) of +% {ok, {Total, Res}} -> +% delete_keys(Conn, Bucket, Res, Opts), +% {ok, Total, State}; +% {error, Error} -> +% {error, Error, State} +%end. +-spec delete_all(DocName, State) -> + Response + when DocName :: sumo:schema_name(), + State :: state(), + Response :: sumo_store:result(sumo_store:affected_rows(), state()). 
+delete_all(_DocName, State) -> + #state{conn = Conn, bucket = Bucket, del_opts = Opts} = State, + Del = + fun + (Kst, Acc) -> + lists:foreach(fun (K) -> delete_obj(Conn, Bucket, K, Opts) end, Kst), + Acc + length(Kst) + end, + case stream_keys(Conn, Bucket, Del, 0) of + {ok, Count} -> {ok, Count, State}; + {error, Reason, Count} -> {error, {stream_keys, Reason, Count}, State} + end. + + +-spec find_all(DocName, State) -> + Response + when DocName :: sumo:schema_name(), + State :: state(), + Response :: sumo_store:result([sumo_internal:doc()], state()). +find_all(DocName, State) -> + #state{conn = Conn, bucket = Bucket, get_opts = Opts} = State, + Get = + fun (Kst, Acc) -> fetch_docs(DocName, Conn, Bucket, Kst, Opts) ++ Acc end, + case stream_keys(Conn, Bucket, Get, []) of + {ok, Docs} -> {ok, Docs, State}; + {error, Reason, Count} -> {error, {stream_keys, Reason, Count}, State} + end. + + +-spec find_all(DocName, Sort, Limit, Offset, State) -> + Response + when DocName :: sumo:schema_name(), + Sort :: term(), + Limit :: non_neg_integer(), + Offset :: non_neg_integer(), + State :: state(), + Response :: sumo_store:result([sumo_internal:doc()], state()). +find_all(DocName, Sort, Limit, Offset, State) -> + find_by(DocName, [], Sort, Limit, Offset, State). + +%% @doc +%% find_by may be used in two ways: either with a given limit and offset or not +%% If a limit and offset is not given, then the atom 'undefined' is used as a +%% marker to indicate that the store should find out how many keys matching the +%% query exist, and then obtain results for all of them. +%% This is done to overcome Solr's default pagination value of 10. +%% @end + +-spec find_by(DocName, Conditions, State) -> + Response + when DocName :: sumo:schema_name(), + Conditions :: sumo:conditions(), + State :: state(), + Response :: sumo_store:result([sumo_internal:doc()], state()). +find_by(DocName, Conditions, State) -> + find_by(DocName, Conditions, undefined, undefined, State). 
+ +-spec find_by(DocName, Conditions, Limit, Offset, State) -> + Response + when DocName :: sumo:schema_name(), + Conditions :: sumo:conditions(), + Limit :: non_neg_integer() | undefined, + Offset :: non_neg_integer() | undefined, + State :: state(), + Response :: sumo_store:result([sumo_internal:doc()], state()). +find_by(_DocName, _Conditions, undefined, undefined, State) -> {ok, [], State}; +%% First get all keys matching the query, and then obtain documents for those +%% keys. +%%#state{conn = Conn, bucket = Bucket, index = Index, get_opts = Opts} = State, +%%TranslatedConditions = transform_conditions(DocName, Conditions), +%%Query = build_query(TranslatedConditions, Bucket), +%%case find_by_query_get_keys(Conn, Index, Query) of +%% {ok, Keys} -> +%% Results = fetch_docs(DocName, Conn, Bucket, Keys, Opts), +%% {ok, Results, State}; +%% {error, Error} -> +%% {error, Error, State} +%%end; +find_by(DocName, Conditions, Limit, Offset, State) -> + %% Limit and offset were specified so we return a possibly partial result set. + find_by(DocName, Conditions, [], Limit, Offset, State). + + +-spec find_by(DocName, Conditions, Sort, Limit, Offset, State) -> + Response + when DocName :: sumo:schema_name(), + Conditions :: sumo:conditions(), + Sort :: term(), + Limit :: non_neg_integer(), + Offset :: non_neg_integer(), + State :: state(), + Response :: sumo_store:result([sumo_internal:doc()], state()). +find_by(DocName, Conditions, _Sort, _Limit, _Offset, State) -> + #state{conn = _Conn, bucket = _Bucket, index = _Index, get_opts = _Opts} = + State, + TranslatedConditions = transform_conditions(DocName, Conditions), + logger:debug("Conditions ~p", [TranslatedConditions]), + {ok, [], State}. 
+
+%%SortOpts = build_sort(Sort),
+%%Query = <<(build_query(TranslatedConditions, Bucket))/binary>>,
+%%case search_keys_by(Conn, Index, Query, SortOpts, Limit, Offset) of
+%% {ok, {_Total, Keys}} ->
+%% Results = fetch_docs(DocName, Conn, Bucket, Keys, Opts),
+%% {ok, Results, State};
+%% {error, Error} ->
+%% {error, Error, State}
+%%end.
+%%%=============================================================================
+%%% Internal functions
+%%%=============================================================================
+%% @private
+%% Rewrites date/datetime values in Conditions so queries compare against the
+%% same representation that sleep_fun/4 stores (epoch timestamps).
+
+transform_conditions(DocName, Conditions) ->
+  sumo_utils:transform_conditions(
+    fun validate_date/1,
+    DocName,
+    Conditions,
+    [date, datetime]
+  ).
+
+%% @private
+%% Converts a calendar date/datetime condition value to an epoch timestamp.
+
+validate_date({FieldType, _, FieldValue}) ->
+  case {FieldType, sumo_utils:is_datetime(FieldValue)} of
+    {datetime, true} -> date_util:datetime_to_epoch(FieldValue);
+    {date, true} -> date_util:datetime_to_epoch({FieldValue, {0, 0, 0}})
+  end.
+
+%% @private
+%% Prepares a doc for storage: maps every field through sleep_fun/4.
+
+sleep(Doc) -> sumo_utils:doc_transform(fun sleep_fun/4, Doc).
+
+%% @private
+%% Serializes one field value: undefined -> null (except the id field);
+%% date/datetime -> epoch; custom types -> base64(term_to_binary).
+
+sleep_fun(_, FieldName, undefined, _) when FieldName /= id -> null;
+
+%% NOTE(review): the original guard used `and`, which binds tighter than
+%% `=:=` in Erlang, so it parsed as `FieldType =:= (datetime and ...)` and
+%% always failed; binaries fell through to the catch-all branch of the next
+%% clause. The comma (AND within an alternative) guard restores the intent,
+%% and the body converts to epoch exactly as that catch-all branch did.
+sleep_fun(FieldType, _, FieldValue, _)
+when FieldType =:= datetime, is_binary(FieldValue);
+     FieldType =:= date, is_binary(FieldValue) ->
+  date_util:datetime_to_epoch(iso8601:parse(FieldValue));
+
+sleep_fun(FieldType, _, FieldValue, _)
+when FieldType =:= datetime; FieldType =:= date ->
+  case {FieldType, sumo_utils:is_datetime(FieldValue)} of
+    {datetime, true} -> date_util:datetime_to_epoch(FieldValue);
+    {date, true} -> date_util:datetime_to_epoch({FieldValue, {0, 0, 0}});
+    _ -> date_util:datetime_to_epoch(iso8601:parse(FieldValue))
+  end;
+
+sleep_fun(custom, _, FieldValue, FieldAttrs) ->
+  Type = sumo_utils:keyfind(type, FieldAttrs, custom),
+  sleep_custom(FieldValue, Type);
+
+sleep_fun(_, _, FieldValue, _) -> FieldValue.
+
+%% @private
+%% base64-encodes complex custom values so they survive the JSON round trip.
+
+sleep_custom(FieldValue, FieldType) ->
+  case lists:member(FieldType, [term, tuple, map, list]) of
+    true -> base64:encode(term_to_binary(FieldValue));
+    _ -> FieldValue
+  end.
+
+%% @private
+%% Restores a stored doc: maps every field through wakeup_fun/4.
+
+wakeup(Doc) -> sumo_utils:doc_transform(fun wakeup_fun/4, Doc).
+
+wakeup_fun(_, _, undefined, _) -> undefined;
+wakeup_fun(_, _, <<"$nil">>, _) -> undefined;
+
+%% NOTE(review): the original guards used `;` (OR) between the type test and
+%% the value test, so ANY binary value of ANY field type was iso8601-parsed,
+%% and a numeric datetime value matched the first clause and crashed inside
+%% iso8601:parse/1. Each alternative now requires type AND value shape.
+wakeup_fun(FieldType, _, FieldValue, _)
+when FieldType =:= datetime, is_binary(FieldValue);
+     FieldType =:= date, is_binary(FieldValue) ->
+  iso8601:parse(FieldValue);
+
+wakeup_fun(FieldType, _, FieldValue, _)
+when FieldType =:= datetime, is_number(FieldValue);
+     FieldType =:= date, is_number(FieldValue) ->
+  date_util:timestamp_to_datetime(FieldValue);
+
+wakeup_fun(integer, _, FieldValue, _) when is_binary(FieldValue) ->
+  binary_to_integer(FieldValue);
+
+wakeup_fun(float, _, FieldValue, _) when is_binary(FieldValue) ->
+  binary_to_float(FieldValue);
+
+wakeup_fun(boolean, _, FieldValue, _) when is_binary(FieldValue) ->
+  binary_to_atom(FieldValue, utf8);
+
+wakeup_fun(custom, _, FieldValue, FieldAttrs) ->
+  Type = sumo_utils:keyfind(type, FieldAttrs, custom),
+  wakeup_custom(FieldValue, Type);
+
+wakeup_fun(_, _, FieldValue, _) -> FieldValue.
+
+%% @private
+%% Inverse of sleep_custom/2; null (stripped field) is passed through.
+
+wakeup_custom(null, _FieldType) -> null;
+
+wakeup_custom(FieldValue, FieldType) ->
+  case lists:member(FieldType, [term, tuple, map, list]) of
+    true -> binary_to_term(base64:decode(FieldValue));
+    _ -> FieldValue
+  end.
+
+
+-spec index_field(Doc, Key, Value, Type, Accumulator) ->
+  Accumulator
+  when Doc :: sumo_internal:doc(),
+       Key :: atom(),
+       Value :: any(),
+       Type :: atom(),
+       Accumulator :: list().
+index_field(_Doc, _Key, null, _, Acc) -> Acc; + +index_field(_Doc, Key, Value, integer, Acc) -> + lists:append(Acc, [{{integer_index, Key}, [Value]}]); + +index_field(_Doc, Key, Value, float, Acc) -> + lists:append(Acc, [{{binary_index, Key}, [float_to_binary(Value)]}]); + +index_field(_Doc, Key, Value, datetime, Acc) -> + lists:append(Acc, [{{binary_index, Key}, [float_to_binary(Value)]}]); + +index_field(_Doc, Key, Value, string, Acc) -> + lists:append(Acc, [{{binary_index, Key}, [Value]}]). + +build_index(Doc) -> + DocName = sumo_internal:doc_name(Doc), + Schema = sumo_internal:get_schema(DocName), + SchemaFields = sumo_internal:schema_fields(Schema), + Module = sumo_config:get_prop_value(DocName, module), + Index = + lists:foldl( + fun + (Field, Acc) -> + FieldType = sumo_internal:field_type(Field), + FieldName = sumo_internal:field_name(Field), + FieldValue = sumo_internal:get_field(FieldName, Doc), + %FieldAttrs = sumo_internal:field_attrs(Field), + Module:index_field( + maps:get(fields, Doc), + atom_to_list(FieldName), + FieldValue, + FieldType, + Acc + ) + end, + [], + SchemaFields + ), + logger:debug("Build index ~p", [Index]), + Index. 
+
+
+%% @private
+%% Writes Doc0 as a JSON riak object under Id, attaching secondary indexes
+%% built from the schema (see build_index/1). Fetches first so an existing
+%% object's causal context is reused on update.
+%% NOTE(review): put_opts are currently ignored here (_Opts) — confirm whether
+%% they should be forwarded; options such as return_body would change what
+%% persist/2 sees as a success value.
+
+update_obj(Conn, Bucket, Id, Doc0, _Opts) ->
+  %% strip null fields so they are not stored in the JSON document
+  Doc =
+    maps:filter(
+      fun (_K, null) -> false; (_K, _V) -> true end,
+      maps:get(fields, Doc0)
+    ),
+  case fetch_obj(Conn, Bucket, sumo_utils:to_bin(Id), []) of
+    {ok, RiakObject} ->
+      %% NOTE(review): the original discarded the result of update_value/2,
+      %% so the OLD value was written back; riakc_obj is immutable and the
+      %% updated object must be threaded through.
+      Obj0 = riakc_obj:update_value(RiakObject, jsx:encode(Doc)),
+      MD1 = riakc_obj:get_update_metadata(Obj0),
+      MD2 = riakc_obj:set_secondary_index(MD1, build_index(Doc0)),
+      Obj1 = riakc_obj:update_metadata(Obj0, MD2),
+      riakc_pb_socket:put(Conn, Obj1);
+
+    {error, notfound} ->
+      RiakObject = riakc_obj:new(Bucket, Id, jsx:encode(Doc)),
+      MD1 = riakc_obj:get_update_metadata(RiakObject),
+      MD2 = riakc_obj:set_secondary_index(MD1, build_index(Doc0)),
+      Obj1 = riakc_obj:update_metadata(RiakObject, MD2),
+      riakc_pb_socket:put(Conn, Obj1);
+
+    {error, Error} -> {error, Error};
+    undef -> {error, undef}
+  end.
+
+
+%Module = sumo_config:get_prop_value(DocName, module),
+
+%% @private
+%% Resolves the doc's id field to a binary id; persisting without an id is
+%% not supported by this store (no Riak-assigned keys), hence the throw.
+
+new_doc(Doc, _State) ->
+  DocName = sumo_internal:doc_name(Doc),
+  IdField = sumo_internal:id_field_name(DocName),
+  Id =
+    case sumo_internal:get_field(IdField, Doc) of
+      undefined -> throw({error, noid});
+      Id0 -> sumo_utils:to_bin(Id0)
+    end,
+  {Id, sumo_internal:set_field(IdField, Id, Doc)}.
+
+
+-spec create_schema(Schema, State) ->
+  Response
+  when Schema :: sumo_internal:schema(),
+       State :: state(),
+       Response :: sumo_store:result(state()).
+%% Nothing to create for this backend.
+create_schema(_Schema, State) -> {ok, State}.
+
+-spec count(DocName, State) ->
+  Response
+  when DocName :: sumo:schema_name(),
+       State :: state(),
+       Response :: sumo_store:result(non_neg_integer(), state()).
+%% Counts all keys in the bucket by streaming them (see stream_keys/4).
+count(_DocName, #state{conn = Conn, bucket = Bucket} = State) ->
+  Sum = fun (Kst, Acc) -> length(Kst) + Acc end,
+  case stream_keys(Conn, Bucket, Sum, 0) of
+    {ok, Count} -> {ok, Count, State};
+    %% NOTE(review): the original returned the nested reason
+    %% {error, count_failed}; surface the stream error reason instead.
+    {error, Reason, _Partial} -> {error, {count_failed, Reason}, State}
+  end.
+
+
+-spec count_by(DocName, Conditions, State) ->
+  Response
+  when DocName :: sumo:schema_name(),
+       Conditions :: sumo:conditions(),
+       State :: state(),
+       Response :: sumo_store:result(non_neg_integer(), state()).
+%% With no conditions this is a full bucket count; conditional counting is
+%% not implemented yet (mirrors the stubbed find_by/6 above).
+count_by(DocName, [], State) -> count(DocName, State);
+
+%% NOTE(review): the original clause matched State as a map (#{conn := _})
+%% although state() is the #state{} record, so any non-empty Conditions
+%% crashed with function_clause; it also returned a bare 0 instead of the
+%% sumo_store result tuple required by the spec.
+count_by(_DocName, _Conditions, State) -> {ok, 0, State}.
+
+%% @private
+%% Streams every key in Bucket (2i "$bucket" index) through F/2, threading
+%% an accumulator; batches arrive asynchronously via receive_stream/3.
+
+stream_keys(Conn, Bucket, F, Acc) ->
+  {ok, Ref} =
+    riakc_pb_socket:get_index_eq(
+      Conn,
+      Bucket,
+      <<"$bucket">>,
+      <<"">>,
+      [{stream, true}]
+    ),
+  receive_stream(Ref, F, Acc).
+
+
+%stream_index_eq(Conn, Bucket, F, Index, Key, Acc) ->
+% {ok, Ref} =
+% riakc_pb_socket:get_index_eq(Conn, Bucket, Index, Key, [{stream, true}]),
+% receive_stream(Ref, F, Acc).
+%
+%
+%stream_index_range(Conn, Bucket, F, Index, StartKey, EndKey, Acc) ->
+% {ok, Ref} =
+% riakc_pb_socket:get_index_range(
+% Conn,
+% Bucket,
+% Index,
+% StartKey,
+% EndKey,
+% [{stream, true}]
+% ),
+% receive_stream(Ref, F, Acc).
+
+%% @private
+%% Folds streamed key batches into Acc; the 5-minute timeout guards against
+%% a stalled stream and returns the partial accumulator with the error.
+
+receive_stream(Ref, F, Acc) ->
+  receive
+    {Ref, {_, Keys, _}} -> receive_stream(Ref, F, F(Keys, Acc));
+    {Ref, {done, _Continuation = undefined}} -> {ok, Acc};
+    Unexpected -> {error, {unexpected, Unexpected}, Acc}
+  after
+    300000 -> {error, timeout, Acc}
+  end.
diff --git a/test/nested_docs_SUITE.erl b/test/nested_docs_SUITE.erl
index 69ebe23..ef9b1e9 100644
--- a/test/nested_docs_SUITE.erl
+++ b/test/nested_docs_SUITE.erl
@@ -1,21 +1,12 @@
 -module(nested_docs_SUITE).
 
 %% CT
--export([
-  all/0,
-  init_per_suite/1,
-  end_per_suite/1,
-  init_per_testcase/2
-]).
+
+-export([all/0, init_per_suite/1, end_per_suite/1, init_per_testcase/2]).
 
 %% Test Cases
--export([
-  find_all/1,
-  find_by/1,
-  update/1,
-  delete_all/1,
-  delete/1
-]).
+
+-export([find_all/1, find_by/1, update/1, delete_all/1, delete/1]).
 
 -type config() :: term().
 
@@ -24,19 +15,20 @@
 %%%=============================================================================
 
 -spec all() -> [atom()].
-all() -> - [find_all, find_by, update, delete_all, delete]. +all() -> [find_all, find_by, update, delete_all, delete]. -spec init_per_suite(config()) -> config(). init_per_suite(Config) -> {ok, _} = sumo_db_riak:start(), Config. + -spec init_per_testcase(atom(), config()) -> config(). init_per_testcase(_, Config) -> _ = init_store(), Config. + -spec end_per_suite(config()) -> config(). end_per_suite(Config) -> _ = sumo:delete_all(purchases), @@ -56,98 +48,99 @@ find_all(_Config) -> 9 = length(All2), ok. + -spec find_by(config()) -> ok. find_by(_Config) -> Results1 = sumo:find_by(purchases, [{currency, <<"USD">>}]), 2 = length(Results1), - - [ #{id := <<"ID1">>, - currency := <<"USD">>, - items := [#{part_num := <<"123">>}, #{part_num := <<"456">>}], + [ + #{ + id := <<"ID1">>, + currency := <<"USD">>, + items := [#{part_num := <<"123">>}, #{part_num := <<"456">>}], order_num := <<"O1">>, - ship_to := #{city := <<"city1">>, country := <<"US">>}, - bill_to := #{city := <<"city1">>, country := <<"US">>}, - total := 300}, - #{id := <<"ID2">>, - currency := <<"USD">>, - items := [#{part_num := <<"123">>}, #{part_num := <<"456">>}], + ship_to := #{city := <<"city1">>, country := <<"US">>}, + bill_to := #{city := <<"city1">>, country := <<"US">>}, + total := 300 + }, + #{ + id := <<"ID2">>, + currency := <<"USD">>, + items := [#{part_num := <<"123">>}, #{part_num := <<"456">>}], order_num := <<"O2">>, - ship_to := #{city := <<"city1">>, country := <<"US">>}, - bill_to := #{city := <<"city1">>, country := <<"US">>}, - total := 300} - ] = lists:sort( - fun(A, B) -> maps:get(id, A) =< maps:get(id, B) end, - Results1), - + ship_to := #{city := <<"city1">>, country := <<"US">>}, + bill_to := #{city := <<"city1">>, country := <<"US">>}, + total := 300 + } + ] = + lists:sort(fun (A, B) -> maps:get(id, A) =< maps:get(id, B) end, Results1), Results2 = sumo:find_by(purchases, [{currency, <<"EUR">>}]), 1 = length(Results2), - - [ #{id := <<"ID3">>, - currency := <<"EUR">>, - items 
:= [#{part_num := <<"123">>}, #{part_num := <<"456">>}], + [ + #{ + id := <<"ID3">>, + currency := <<"EUR">>, + items := [#{part_num := <<"123">>}, #{part_num := <<"456">>}], order_num := <<"O3">>, - ship_to := #{city := <<"city2">>, country := <<"US">>}, - bill_to := #{city := <<"city1">>, country := <<"US">>}, - total := 300} + ship_to := #{city := <<"city2">>, country := <<"US">>}, + bill_to := #{city := <<"city1">>, country := <<"US">>}, + total := 300 + } ] = Results2, - PO1 = sumo:fetch(purchases, <<"ID1">>), - #{id := <<"ID1">>, - currency := <<"USD">>, - items := [#{part_num := <<"123">>}, #{part_num := <<"456">>}], + #{ + id := <<"ID1">>, + currency := <<"USD">>, + items := [#{part_num := <<"123">>}, #{part_num := <<"456">>}], order_num := <<"O1">>, - ship_to := #{city := <<"city1">>, country := <<"US">>}, - bill_to := #{city := <<"city1">>, country := <<"US">>}, - total := 300} = PO1, - + ship_to := #{city := <<"city1">>, country := <<"US">>}, + bill_to := #{city := <<"city1">>, country := <<"US">>}, + total := 300 + } = PO1, notfound = sumo:fetch(purchases, <<"ID123">>), - - Results3 = sumo:find_by( - purchases, [{'ship_to.city', <<"city2">>}]), + Results3 = sumo:find_by(purchases, [{'ship_to.city', <<"city2">>}]), 1 = length(Results3), Results2 = Results3, - - Results4 = sumo:find_by( - purchases, - [{'ship_to.city', <<"city2">>}, {currency, <<"USD">>}]), + Results4 = + sumo:find_by( + purchases, + [{'ship_to.city', <<"city2">>}, {currency, <<"USD">>}] + ), 0 = length(Results4), - - Results5 = sumo:find_by( - purchases, - [{'ship_to.city', <<"city1">>}, {currency, <<"USD">>}]), + Results5 = + sumo:find_by( + purchases, + [{'ship_to.city', <<"city1">>}, {currency, <<"USD">>}] + ), 2 = length(Results5), - ok. + -spec update(config()) -> ok. 
update(_Config) -> PO1 = sumo:fetch(purchases, <<"ID1">>), - PO1x = sumo_test_purchase_order:order_num(PO1, <<"0001">>), sumo:persist(purchases, PO1x), - PO1x = sumo:fetch(purchases, <<"ID1">>), - PO1y = sumo_test_purchase_order:order_num(PO1x, <<"00011">>), sumo:persist(purchases, PO1y), - PO1y = sumo:fetch(purchases, <<"ID1">>), - ok. + -spec delete_all(config()) -> ok. delete_all(_Config) -> sumo:delete_all(purchases), [] = sumo:find_all(purchases), ok. + -spec delete(config()) -> ok. delete(_Config) -> %% delete_by - 2 = sumo:delete_by(purchases, [{currency, <<"USD">>}]), + 2 = sumo:delete_by(purchases, [{currency, <<"USD">>}]), sync_timeout(9), [] = sumo:find_by(purchases, [{currency, <<"USD">>}]), - %% delete sumo:delete(purchases, <<"ID3">>), 8 = length(sumo:find_all(purchases)), @@ -161,50 +154,166 @@ init_store() -> sumo:create_schema(purchases), sumo:delete_all(purchases), sync_timeout(0), - - Addr1 = sumo_test_purchase_order:new_address( - <<"l1">>, <<"l2">>, <<"city1">>, <<"s">>, <<"zip">>, <<"US">>), - Addr2 = sumo_test_purchase_order:new_address( - <<"l1">>, <<"l2">>, <<"city2">>, <<"s">>, <<"zip">>, <<"US">>), - Addr3 = sumo_test_purchase_order:new_address( - <<"l1">>, <<"l2">>, <<"city3">>, <<"s">>, <<"zip">>, <<"US">>), - + Addr1 = + sumo_test_purchase_order:new_address( + <<"l1">>, + <<"l2">>, + <<"city1">>, + <<"s">>, + <<"zip">>, + <<"US">> + ), + Addr2 = + sumo_test_purchase_order:new_address( + <<"l1">>, + <<"l2">>, + <<"city2">>, + <<"s">>, + <<"zip">>, + <<"US">> + ), + Addr3 = + sumo_test_purchase_order:new_address( + <<"l1">>, + <<"l2">>, + <<"city3">>, + <<"s">>, + <<"zip">>, + <<"US">> + ), Item1 = sumo_test_purchase_order:new_item(<<"123">>, <<"p1">>, 1, 100, 100), Item2 = sumo_test_purchase_order:new_item(<<"456">>, <<"p2">>, 2, 100, 200), Items = [Item1, Item2], - Date = calendar:universal_time(), - - PO1 = sumo_test_purchase_order:new( - <<"ID1">>, <<"O1">>, Date, Addr1, Addr1, Items, <<"USD">>, 300), - PO2 = 
sumo_test_purchase_order:new( - <<"ID2">>, <<"O2">>, Date, Addr1, Addr1, Items, <<"USD">>, 300), - PO3 = sumo_test_purchase_order:new( - <<"ID3">>, <<"O3">>, Date, Addr2, Addr1, Items, <<"EUR">>, 300), - PO4 = sumo_test_purchase_order:new( - <<"ID4">>, <<"O4">>, Date, Addr3, Addr3, Items, <<"ARG">>, 400), - PO5 = sumo_test_purchase_order:new( - <<"ID5">>, <<"O5">>, Date, Addr3, Addr3, Items, <<"ARG">>, 400), - PO6 = sumo_test_purchase_order:new( - <<"ID6">>, <<"O6">>, Date, Addr3, Addr3, Items, <<"ARG">>, 400), - PO7 = sumo_test_purchase_order:new( - <<"ID7">>, <<"O7">>, Date, Addr3, Addr3, Items, <<"ARG">>, 400), - PO8 = sumo_test_purchase_order:new( - <<"ID8">>, <<"O8">>, Date, Addr3, Addr3, Items, <<"ARG">>, 400), - PO9 = sumo_test_purchase_order:new( - <<"ID9">>, <<"O9">>, Date, Addr3, Addr3, Items, <<"ARG">>, 400), - P10 = sumo_test_purchase_order:new( - <<"ID10">>, <<"10">>, Date, Addr3, Addr3, Items, <<"ARG">>, 400), - P11 = sumo_test_purchase_order:new( - <<"ID11">>, <<"11">>, Date, Addr3, Addr3, Items, <<"ARG">>, 400), - - lists:foreach(fun(Doc) -> - sumo:persist(purchases, Doc) - end, [PO1, PO2, PO3, PO4, PO5, PO6, PO7, PO8, PO9, P10, P11]), - + PO1 = + sumo_test_purchase_order:new( + <<"ID1">>, + <<"O1">>, + Date, + Addr1, + Addr1, + Items, + <<"USD">>, + 300 + ), + PO2 = + sumo_test_purchase_order:new( + <<"ID2">>, + <<"O2">>, + Date, + Addr1, + Addr1, + Items, + <<"USD">>, + 300 + ), + PO3 = + sumo_test_purchase_order:new( + <<"ID3">>, + <<"O3">>, + Date, + Addr2, + Addr1, + Items, + <<"EUR">>, + 300 + ), + PO4 = + sumo_test_purchase_order:new( + <<"ID4">>, + <<"O4">>, + Date, + Addr3, + Addr3, + Items, + <<"ARG">>, + 400 + ), + PO5 = + sumo_test_purchase_order:new( + <<"ID5">>, + <<"O5">>, + Date, + Addr3, + Addr3, + Items, + <<"ARG">>, + 400 + ), + PO6 = + sumo_test_purchase_order:new( + <<"ID6">>, + <<"O6">>, + Date, + Addr3, + Addr3, + Items, + <<"ARG">>, + 400 + ), + PO7 = + sumo_test_purchase_order:new( + <<"ID7">>, + <<"O7">>, + Date, + Addr3, + 
Addr3, + Items, + <<"ARG">>, + 400 + ), + PO8 = + sumo_test_purchase_order:new( + <<"ID8">>, + <<"O8">>, + Date, + Addr3, + Addr3, + Items, + <<"ARG">>, + 400 + ), + PO9 = + sumo_test_purchase_order:new( + <<"ID9">>, + <<"O9">>, + Date, + Addr3, + Addr3, + Items, + <<"ARG">>, + 400 + ), + P10 = + sumo_test_purchase_order:new( + <<"ID10">>, + <<"10">>, + Date, + Addr3, + Addr3, + Items, + <<"ARG">>, + 400 + ), + P11 = + sumo_test_purchase_order:new( + <<"ID11">>, + <<"11">>, + Date, + Addr3, + Addr3, + Items, + <<"ARG">>, + 400 + ), + lists:foreach( + fun (Doc) -> sumo:persist(purchases, Doc) end, + [PO1, PO2, PO3, PO4, PO5, PO6, PO7, PO8, PO9, P10, P11] + ), sync_timeout(11), ok. + sync_timeout(Len) -> timer:sleep(5000), Len = length(sumo:find_by(purchases, [])). diff --git a/test/sumo_test_user.erl b/test/sumo_test_user.erl index 76cd23f..227d8ab 100644 --- a/test/sumo_test_user.erl +++ b/test/sumo_test_user.erl @@ -2,17 +2,14 @@ -behaviour(sumo_doc). --type user() :: #{id => binary(), - attributes => list()}. +-type user() :: #{id => binary(), attributes => list()}. %% API --export([ - new/2, - id/1, - attributes/1 -]). + +-export([new/2, id/1, attributes/1]). %% sumo_doc callbacks + -export([sumo_schema/0, sumo_wakeup/1, sumo_sleep/1]). %%%============================================================================= @@ -21,32 +18,29 @@ -spec sumo_schema() -> sumo:schema(). sumo_schema() -> - sumo:new_schema(users, [ - sumo:new_field(id, string, [id, not_null]), - sumo:new_field(attributes, custom, [{type, list}]) - ]). + sumo:new_schema( + users, + [ + sumo:new_field(id, string, [id, not_null]), + sumo:new_field(attributes, custom, [{type, list}]) + ] + ). -spec sumo_sleep(user()) -> sumo:model(). -sumo_sleep(User) -> - User. +sumo_sleep(User) -> User. -spec sumo_wakeup(sumo:model()) -> user(). -sumo_wakeup(Doc) -> - Doc. +sumo_wakeup(Doc) -> Doc. 
%%%============================================================================= %%% API %%%============================================================================= -spec new(binary(), list()) -> user(). -new(Id, Attributes) -> - #{id => Id, - attributes => Attributes}. +new(Id, Attributes) -> #{id => Id, attributes => Attributes}. -spec id(user()) -> string(). -id(#{id := Val}) -> - Val. +id(#{id := Val}) -> Val. -spec attributes(user()) -> list(). -attributes(#{attributes := Val}) -> - Val. +attributes(#{attributes := Val}) -> Val. diff --git a/test/users_SUITE.erl b/test/users_SUITE.erl index 5935fa1..1f89b48 100644 --- a/test/users_SUITE.erl +++ b/test/users_SUITE.erl @@ -1,12 +1,8 @@ -module(users_SUITE). %% CT --export([ - all/0, - init_per_suite/1, - end_per_suite/1, - find/1 -]). + +-export([all/0, init_per_suite/1, end_per_suite/1, find/1]). -type config() :: [{atom(), term()}]. @@ -15,20 +11,21 @@ %%%============================================================================= -spec all() -> [atom()]. -all() -> - [find]. +all() -> [find]. -spec init_per_suite(config()) -> config(). init_per_suite(Config) -> {ok, _} = application:ensure_all_started(sumo_db_riak), Config. + -spec end_per_suite(config()) -> config(). end_per_suite(Config) -> sumo:delete_all(users), ok = sumo_db_riak:stop(), Config. + -spec find(config()) -> ok. find(_Config) -> Id1 = <<"first">>, @@ -37,7 +34,6 @@ find(_Config) -> User2 = sumo_test_user:new(Id2, ["A", "B"]), sumo:persist(users, User1), sumo:persist(users, User2), - User1 = sumo:fetch(users, Id1), User2 = sumo:fetch(users, Id2), ok.