From 4abddf919ca66f5b51e16b457c7b562bd20b3fdc Mon Sep 17 00:00:00 2001 From: brkolla Date: Fri, 28 Apr 2017 10:00:32 -0400 Subject: [PATCH] Stop search indexer when it is deleted. Allow dreyfus to listen to ddoc_updated events and check if the index is deleted as a result of update to it's design document. If the index is deleted then it will stop the index if waitlist is empty, if the waitlist is not empty then it will wait for a minute before shutting down. BugzId:85718 --- src/dreyfus_index.erl | 53 +++++ src/dreyfus_index_manager.erl | 40 +++- test/.DS_Store | Bin 0 -> 6148 bytes test/search_index_ddoc_update_tests.erl | 277 ++++++++++++++++++++++++ 4 files changed, 363 insertions(+), 7 deletions(-) create mode 100644 test/.DS_Store create mode 100644 test/search_index_ddoc_update_tests.erl diff --git a/src/dreyfus_index.erl b/src/dreyfus_index.erl index 56bc499..be213e1 100644 --- a/src/dreyfus_index.erl +++ b/src/dreyfus_index.erl @@ -40,6 +40,8 @@ waiting_list=[] }). +-define(INDEX_SHUTDOWN_DELAY, index_shutdown_delay()). + % exported for callback. -export([search_int/2, group1_int/2, group2_int/2, info_int/1]). @@ -157,9 +159,28 @@ handle_call(info, _From, State) -> % obsolete Reply = info_int(State#state.index_pid), {reply, Reply, State}. +handle_cast({ddoc_updated, DDocResult}, State) -> + #state{index=#index{sig=CurrentSig}, waiting_list = WaitList} = State, + case check_if_index_is_deleted(DDocResult, CurrentSig) of + true -> + case WaitList of + [] -> + self() ! index_deleted; + [_Index] -> + erlang:send_after(?INDEX_SHUTDOWN_DELAY, self(), index_deleted) + end + end, + {noreply, State}; + handle_cast(_Msg, State) -> {noreply, State}. +handle_info(index_deleted, State) -> + #state{index=Index} = State, + couch_log:notice("Shutting down index for ~s. It is either deleted or invalidated with the update to it's design document", + [index_name(Index)]), + {stop, normal, State}; + handle_info({'EXIT', FromPid, {updated, NewSeq}}, #state{ index=Index0, @@ -222,6 +243,35 @@ code_change(_OldVsn, State, _Extra) -> % private functions. +get_index_with_sig(#doc{body={Fields}}=Doc, Sig) -> + RawIndexes = couch_util:get_value(<<"indexes">>, Fields, {[]}), + case RawIndexes of + {IndexList} when is_list(IndexList) -> + {IndexNames, _} = lists:unzip(IndexList), + lists:flatmap( + fun(IndexName) -> + case design_doc_to_index(Doc, IndexName) of + {ok, #index{sig=Sig_New}} when Sig_New =/= Sig -> + []; + {ok, #index{sig=Sig_New}=Index} when Sig_New =:= Sig -> + [Index] + end + end, IndexNames) + end. + +check_if_index_is_deleted(DDocResult, CurrentSig) -> + case DDocResult of + {not_found, deleted} -> + true; + {ok, DDoc} -> + case get_index_with_sig(DDoc, CurrentSig) of + [] -> + true; + [_Index] -> + false + end + end. + open_index(DbName, #index{analyzer=Analyzer, sig=Sig}) -> Path = <>, case clouseau_rpc:open_index(self(), Path, Analyzer) of @@ -367,5 +417,8 @@ group2_int(Pid, QueryArgs0) -> clouseau_rpc:group2(Pid, Props) end. +index_shutdown_delay() -> + config:get_integer("dreyfus", "index_shutdown_delay", 60000). % 1 minute + info_int(Pid) -> clouseau_rpc:info(Pid). diff --git a/src/dreyfus_index_manager.erl b/src/dreyfus_index_manager.erl index 2575294..ebc4e19 100644 --- a/src/dreyfus_index_manager.erl +++ b/src/dreyfus_index_manager.erl @@ -21,6 +21,7 @@ -define(BY_SIG, dreyfus_by_sig). -define(BY_PID, dreyfus_by_pid). +-define(BY_DB, dreyfus_by_db). % public api. -export([start_link/0, get_index/2, get_disk_size/2]). @@ -44,8 +45,9 @@ get_disk_size(DbName, Index) -> % gen_server functions. init([]) -> - ets:new(?BY_SIG, [set, private, named_table]), + ets:new(?BY_SIG, [set, protected, named_table]), ets:new(?BY_PID, [set, private, named_table]), + ets:new(?BY_DB, [bag, protected, named_table]), couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]), process_flag(trap_exit, true), {ok, nil}. @@ -69,12 +71,12 @@ handle_call({get_disk_size, DbName, #index{sig=Sig}=Index}, From, State) -> Reply = clouseau_rpc:disk_size(Path), {reply, Reply, State}; -handle_call({open_ok, DbName, Sig, NewPid}, {OpenerPid, _}, State) -> +handle_call({open_ok, DbName, DDocId, Sig, NewPid}, {OpenerPid, _}, State) -> link(NewPid), [{_, WaitList}] = ets:lookup(?BY_SIG, {DbName, Sig}), [gen_server:reply(From, {ok, NewPid}) || From <- WaitList], ets:delete(?BY_PID, OpenerPid), - add_to_ets(NewPid, DbName, Sig), + add_to_ets(NewPid, DbName, DDocId, Sig), {reply, ok, State}; handle_call({open_error, DbName, Sig, Error}, {OpenerPid, _}, State) -> @@ -121,13 +123,30 @@ handle_db_event(DbName, created, _St) -> handle_db_event(DbName, deleted, _St) -> gen_server:cast(?MODULE, {cleanup, DbName}), {ok, nil}; +handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, _St) -> + DDocResult = couch_util:with_db(DbName, fun(Db) -> + couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX]) + end), + couch_log:info("Received ddoc_updated event for ~s",[DDocId]), + DbShards = [mem3:name(Sh) || Sh <- mem3:local_shards(mem3:dbname(DbName))], + lists:foreach(fun(DbShard) -> + lists:foreach(fun({_DbShard, {_DDocId, Sig}}) -> + case ets:lookup(?BY_SIG, {DbShard, Sig}) of + [{_, IndexPid}] -> + gen_server:cast(IndexPid, {ddoc_updated, DDocResult}); + [] -> + ok + end + end, ets:match_object(?BY_DB, {DbShard, {DDocId, '$1'}})) + end, DbShards), + {ok, nil}; handle_db_event(_DbName, _Event, _St) -> {ok, nil}. -new_index(DbName, #index{sig=Sig}=Index) -> +new_index(DbName, #index{ddoc_id=DDocId, sig=Sig}=Index) -> case (catch dreyfus_index:start_link(DbName, Index)) of {ok, NewPid} -> - Msg = {open_ok, DbName, Sig, NewPid}, + Msg = {open_ok, DbName, DDocId, Sig, NewPid}, ok = gen_server:call(?MODULE, Msg, infinity), unlink(NewPid); Error -> @@ -135,11 +154,18 @@ new_index(DbName, #index{sig=Sig}=Index) -> ok = gen_server:call(?MODULE, Msg, infinity) end. -add_to_ets(Pid, DbName, Sig) -> +add_to_ets(Pid, DbName, DDocId, Sig) -> true = ets:insert(?BY_PID, {Pid, {DbName, Sig}}), - true = ets:insert(?BY_SIG, {{DbName, Sig}, Pid}). + true = ets:insert(?BY_SIG, {{DbName, Sig}, Pid}), + true = ets:insert(?BY_DB, {DbName, {DDocId, Sig}}). delete_from_ets(Pid, DbName, Sig) -> + case ets:match_object(?BY_DB, {DbName, {'$1', Sig}}) of + [{DbName, {DDocId, Sig}}] -> + true = ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}}); + _Else -> + true + end, true = ets:delete(?BY_PID, Pid), true = ets:delete(?BY_SIG, {DbName, Sig}). diff --git a/test/.DS_Store b/test/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..5008ddfcf53c02e82d7eee2e57c38e5672ef89f6 GIT binary patch literal 6148 zcmeH~Jr2S!425mzP>H1@V-^m;4Wg<&0T*E43hX&L&p$$qDprKhvt+--jT7}7np#A3 zem<@ulZcFPQ@L2!n>{z**++&mCkOWA81W14cNZlEfg7;MkzE(HCqgga^y>{tEnwC%0;vJ&^%eQ zLs35+`xjp>T0>, <<"_design/searchddoc">>}, + {<<"indexes">>, {[ + {<<"other">>, {[ + {<<"index">>, <<"function(doc){if(doc.blah){index(\\\"blah\\\", doc.blah);}}">>} + ]}}, + {<<"default">>, {[ + {<<"index">>, <<"function(doc){if(doc.blah){index(\\\"default\\\", doc.blah);}}">>} + ]}} + ]}} +]}). + +-define(USER, "admin"). +-define(PASS, "pass"). +-define(AUTH, {basic_auth, {?USER, ?PASS}}). + + +start() -> + Ctx = test_util:start_couch([chttpd, dreyfus]), + ok = config:set("admins", ?USER, ?PASS, _Persist=false), + Ctx. + +setup(PortType) -> + ok = meck:new(mochiweb_socket, [passthrough]), + ok = meck:expect(mochiweb_socket, recv, fun mochiweb_socket_recv/3), + + DbName = ?tempdb(), + ok = create_db(PortType, DbName), + fake_clouseau(), + fake_dreyfus_index(), + + Host = host_url(PortType), + upload_ddoc(Host, ?b2l(DbName), ?DDOC), + {Host, ?b2l(DbName)}. + +teardown(Ctx) -> + ok = config:delete("admins", ?USER, _Persist=false), + (catch meck:unload(clouseau_rpc)), + (catch meck:unload(dreyfus_index)), + test_util:stop_couch(Ctx). + +teardown(PortType, {_Host, DbName}) -> + (catch meck:unload(mochiweb_socket)), + (catch meck:unload(clouseau_rpc)), + (catch meck:unload(dreyfus_index)), + delete_db(PortType, ?l2b(DbName)), + ok. + +search_test_() -> + { + "Check search index closes after delete", + { + setup, + fun start/0, fun teardown/1, + [ + make_test_case(clustered, [fun should_stop_indexing_after_deleting_index/2]), + make_test_case(clustered, [fun should_stop_indexing_after_deleting_ddoc/2]) + ] + } + }. + +fake_clouseau() -> + ok = meck:new([clouseau_rpc], [non_strict]), + ok = meck:expect(clouseau_rpc, open_index, ['_', '_', '_'], {ok, self()}), + ok = meck:expect(clouseau_rpc, await, ['_', '_'], ok), + ok = meck:expect(clouseau_rpc, get_update_seq, ['_'], {ok, 10}), + ok = meck:expect(clouseau_rpc, info, ['_'], {ok, [{disk_size, 10}, + {doc_count, 10}, + {doc_del_count, 0}, + {pending_seq, 11}, + {committed_seq, 10}]}), + ok = meck:expect(clouseau_rpc, search, ['_', '_'], {ok, [ + {update_seq, 10}, + {total_hits, 0}, + {hits, []} + ]}), + ok = meck:expect(clouseau_rpc, update, ['_', '_', '_'], ok), + ok = meck:expect(clouseau_rpc, commit, ['_', '_'], ok), + ok = meck:expect(clouseau_rpc, search, ['_', '_', '_', '_', '_', '_'], {ok, [ + {update_seq, 10}, + {total_hits, 0}, + {hits, []} + ]}). + +fake_dreyfus_index() -> + ok = meck:new([dreyfus_index], [passthrough]), + ok = meck:expect(dreyfus_index, await, ['_', '_'], ok), + ok = meck:expect(dreyfus_index, search, ['_', '_'], {ok, #top_docs{ + update_seq = 10, + total_hits = 0, + hits = [] + }}). + +make_test_case(Mod, Funs) -> + { + lists:flatten(io_lib:format("~s", [Mod])), + {foreachx, fun setup/1, fun teardown/2, [{Mod, Fun} || Fun <- Funs]} + }. + +should_stop_indexing_after_deleting_index(PortType, {Host, DbName}) -> + {timeout, 60, + ?_test(begin + ok = meck:reset(clouseau_rpc), + ok = meck:reset(dreyfus_index), + ok = create_doc(PortType, ?l2b(DbName), <<"doc_id">>, {[]}), + ReqUrl = Host ++ "/" ++ DbName ++ "/_design/searchddoc/_search/other?q=*:*", + {ok, Status, _Headers, _Body} = + test_request:get(ReqUrl, [?AUTH]), + ?assertEqual(200, Status), + DDocId = "_design/searchddoc", + Sig = ensure_index_is_opened(DbName, DDocId, "other"), + + % Upload the new DDoc which will delete the search index named "other". And verify that the index will be closed + UpdatedDDocJsonObj = updated_ddoc_json_obj(DbName, ?l2b(DDocId)), + upload_ddoc(Host, DbName, UpdatedDDocJsonObj), + ensure_index_is_closed(DbName, "_design/searchddoc", "other", Sig) + end)}. + +should_stop_indexing_after_deleting_ddoc(PortType, {Host, DbName}) -> + {timeout, 60, + ?_test(begin + ok = meck:reset(clouseau_rpc), + ok = meck:reset(dreyfus_index), + ok = create_doc(PortType, ?l2b(DbName), <<"doc_id">>, {[]}), + ReqUrl = Host ++ "/" ++ DbName ++ "/_design/searchddoc/_search/other?q=*:*", + {ok, Status, _Headers, _Body} = + test_request:get(ReqUrl, [?AUTH]), + ?assertEqual(200, Status), + DDocId = "_design/searchddoc", + Sig = ensure_index_is_opened(DbName, DDocId, "other"), + + % Delete the design doc and verify that the index will be closed + delete_ddoc(Host, DbName, ?l2b(DDocId)), + ensure_index_is_closed(DbName, "_design/searchddoc", "other", Sig) + end)}. + +updated_ddoc_json_obj(DbName, DDocId) -> + {ok, DDoc} = fabric:open_doc(DbName, <>, [ejson_body, ?ADMIN_CTX]), + % {DDocJsonObj} = ?DDOC_UPDATE, + % Rev = {<<"_rev">>, couch_doc:rev_to_str(DDoc#doc.revs)}, + % DDocUpdated = {[Rev] ++ DDocJsonObj}, + % DDocUpdated. + UpdatedDDocJsonObj = + {[ + {<<"_id">>, <<"_design/searchddoc">>}, + {<<"_rev">>, ?b2l(couch_doc:rev_to_str(DDoc#doc.revs))}, + {<<"indexes">>, {[ + {<<"default">>, {[ + {<<"index">>, <<"function(doc){if(doc.blah){index(\\\"default\\\", doc.blah);}}">>} + ]}} + ]}} + ]}, + UpdatedDDocJsonObj. + +ensure_index_is_opened(DbName, DDocId, IndexName) -> + IndexWithName = getIndex(DbName, ?l2b(DDocId), ?l2b(IndexName)), + ?assertMatch(#index{sig=_Sig}, IndexWithName), + [#index{sig=Sig}] = [IndexWithName], + IndexPidsFromEts = get_index_by_sig_from_ets(?l2b(DDocId), Sig), + ?assert(length(IndexPidsFromEts) > 0), + Sig. + +ensure_index_is_closed(DbName, DDocId, IndexName, Sig) -> + NotAnIndex = getIndex(DbName, ?l2b(DDocId), ?l2b(IndexName)), + ?assertEqual(ok, NotAnIndex), + (catch wait_for_indices_to_stop(?l2b(DbName))), + IndexPidsFromEts = get_index_by_sig_from_ets(?l2b(DDocId), Sig), + ?assertEqual(0, length(IndexPidsFromEts)), + ok. + +getIndex(DbName, DDocId, IndexName) -> + case fabric:open_doc(DbName, <>, [ejson_body, ?ADMIN_CTX]) of + {ok, DDoc} -> + case dreyfus_index:design_doc_to_index(DDoc, IndexName) of + {ok, Index} -> + Index; + _ -> + ok + end; + _ -> + ok + end. + +get_index_by_sig_from_ets(DDocId, Sig) -> + Indexes = ets:match_object( + dreyfus_by_db, {'$1', {DDocId, Sig}}), + lists:foldl(fun({DbName, {_DDocId, _Sig}}, Acc) -> + case ets:lookup(dreyfus_by_sig, {DbName, Sig}) of + [{_, Pid}] -> [Pid| Acc]; + _ -> Acc + end + end, [], Indexes). + +wait_for_indices_to_stop(DbName) -> + DbDir = config:get("couchdb", "database_dir", "."), + WaitFun = fun() -> + filelib:fold_files(DbDir, <<".*", DbName/binary, "\\.[0-9]+.*">>, + true, fun(_F, _A) -> wait end, ok) + end, + ok = test_util:wait(WaitFun). + +create_doc(clustered, DbName, Id, Body) -> + JsonDoc = couch_util:json_apply_field({<<"_id">>, Id}, Body), + Doc = couch_doc:from_json_obj(JsonDoc), + {ok, _} = fabric:update_docs(DbName, [Doc], [?ADMIN_CTX]), + ok. + +create_db(clustered, DbName) -> + {ok, Status, _, _} = test_request:put(db_url(DbName), [?AUTH], ""), + assert_success(create_db, Status), + ok. + +delete_db(clustered, DbName) -> + {ok, Status, _, _} = test_request:delete(db_url(DbName), [?AUTH]), + assert_success(delete_db, Status), + ok. + +assert_success(create_db, Status) -> + ?assert(lists:member(Status, [201, 202])); +assert_success(delete_db, Status) -> + ?assert(lists:member(Status, [200, 202])). + +host_url(PortType) -> + "http://" ++ bind_address(PortType) ++ ":" ++ port(PortType). + +bind_address(PortType) -> + config:get(section(PortType), "bind_address", "127.0.0.1"). + +section(clustered) -> "chttpd". + +db_url(DbName) when is_binary(DbName) -> + db_url(binary_to_list(DbName)); +db_url(DbName) when is_list(DbName) -> + host_url(clustered) ++ "/" ++ DbName. + +port(clustered) -> + integer_to_list(mochiweb_socket_server:get(chttpd, port)). + +upload_ddoc(Host, DbName, DDocJson) -> + Url = Host ++ "/" ++ DbName ++ "/_design/searchddoc", + Body = couch_util:json_encode(DDocJson), + {ok, 201, _Resp, _Body} = test_request:put(Url, [?AUTH], Body), + ok. + +delete_ddoc(Host, DbName, DDocId) -> + {ok, DDoc} = fabric:open_doc(DbName, <>, [ejson_body, ?ADMIN_CTX]), + Url = Host ++ "/" ++ DbName ++ "/_design/searchddoc" ++ "?rev=" ++ ?b2l(couch_doc:rev_to_str(DDoc#doc.revs)), + {ok, 200, _, _} = test_request:delete(Url, [?AUTH]), + ok. + +mochiweb_socket_recv(Sock, Len, Timeout) -> + case meck:passthrough([Sock, Len, Timeout]) of + {ok, <<"{truncated}">>} -> + {error, closed}; + {ok, Data} -> + {ok, Data}; + Else -> + Else + end.