From ce301242e0193cbea99e0e5ee29c4eb121cc9337 Mon Sep 17 00:00:00 2001
From: brkolla
Date: Fri, 28 Apr 2017 10:00:32 -0400
Subject: [PATCH] Stop search index when it is deleted.

Allow dreyfus to listen for ddoc_updated events and check whether the
index was deleted or invalidated by the update to its design document.
If so, the index process is stopped after a configurable delay (30
seconds by default). Any pids still waiting on the index when the delay
expires are sent a 'ddoc_updated' reply before the index stops.

BugzId: 85718
---
 src/dreyfus_index.erl                   |  72 ++++-
 src/dreyfus_index_manager.erl           |  40 ++-
 test/search_index_ddoc_update_tests.erl | 342 ++++++++++++++++++++++++
 3 files changed, 445 insertions(+), 9 deletions(-)
 create mode 100644 test/search_index_ddoc_update_tests.erl

diff --git a/src/dreyfus_index.erl b/src/dreyfus_index.erl
index 56bc499..787c9c9 100644
--- a/src/dreyfus_index.erl
+++ b/src/dreyfus_index.erl
@@ -157,9 +157,26 @@ handle_call(info, _From, State) -> % obsolete
     Reply = info_int(State#state.index_pid),
     {reply, Reply, State}.
 
+handle_cast({ddoc_updated, DDocResult}, #state{} = State) ->
+    #index{sig = Sig, dbname = DbName} = State#state.index,
+    case check_if_index_is_deleted(DbName, DDocResult, Sig) of
+        true ->
+            % Delay the shutdown to avoid failing requests that are
+            % already querying the index when it is deleted. There is
+            % currently no easy way to tell pids waiting on indexing
+            % apart from pids actively querying, so we simply stop the
+            % index after the delay.
+            erlang:send_after(index_shutdown_delay(), self(), index_deleted),
+            {noreply, State};
+        false ->
+            {noreply, State}
+    end;
+
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
+handle_info(index_deleted, State) ->
+    #state{index = Index} = State,
+    couch_log:notice("Shutting down index for ~s. It was either deleted or"
+        " invalidated by an update to its design document", [index_name(Index)]),
+    {stop, {shutdown, ddoc_updated}, State};
+
 handle_info({'EXIT', FromPid, {updated, NewSeq}},
             #state{
              index=Index0,
@@ -214,13 +231,61 @@ handle_info({'DOWN',_,_,Pid,Reason}, #state{
     [gen_server:reply(P, {error, Reason}) || {P, _} <- WaitList],
     {stop, normal, State}.
 
-terminate(_Reason, _State) ->
-    ok.
+terminate(Reason, State) ->
+    case Reason of
+        {shutdown, ddoc_updated} ->
+            send_all(State#state.waiting_list, ddoc_updated);
+        _ ->
+            ok
+    end.
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 % private functions.
 
+send_all(Waiters, Reply) ->
+    [gen_server:reply(From, Reply) || {From, _} <- Waiters].
+
+% Returns true if no design doc in the database defines an index with
+% CurrentSig any more, i.e. the running index was deleted or invalidated.
+check_if_index_is_deleted(DbName, DDocResult, CurrentSig) ->
+    IndicesWithSig = case DDocResult of
+        {not_found, deleted} ->
+            indices_with_sig_from_other_ddocs(DbName, CurrentSig);
+        {ok, DDoc} ->
+            case indices_matching_sig({DDoc, CurrentSig}) of
+                [] ->
+                    % the updated ddoc no longer defines this index;
+                    % check if another ddoc still refers to the same sig
+                    indices_with_sig_from_other_ddocs(DbName, CurrentSig);
+                Matching ->
+                    Matching
+            end
+    end,
+    IndicesWithSig =:= [].
+
+indices_with_sig_from_other_ddocs(DbName, CurrentSig) ->
+    {ok, DesignDocs} = fabric:design_docs(mem3:dbname(DbName)),
+    lists:usort(lists:flatmap(
+        fun indices_matching_sig/1,
+        [{couch_doc:from_json_obj(DD), CurrentSig} || DD <- DesignDocs])).
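+
+% Given {DDoc, CurrentSig}, return the {Doc, IndexName, Sig} triples of
+% the indexes defined in DDoc whose computed signature matches CurrentSig.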
+indices_matching_sig({#doc{body={Fields}}=Doc, CurrentSig}) ->
+    {RawIndexes} = couch_util:get_value(<<"indexes">>, Fields, {[]}),
+    {IndexNames, _} = lists:unzip(RawIndexes),
+    lists:filter(fun check_if_index_matches_sig/1,
+        [{Doc, IndexName, CurrentSig} || IndexName <- IndexNames]).
+
+check_if_index_matches_sig({Doc, IndexName, Sig}) ->
+    case design_doc_to_index(Doc, IndexName) of
+        {ok, #index{sig=Sig}} ->
+            true;
+        _ ->
+            false
+    end.
 
 open_index(DbName, #index{analyzer=Analyzer, sig=Sig}) ->
     Path = <<DbName/binary, "/", Sig/binary>>,
@@ -367,5 +432,8 @@ group2_int(Pid, QueryArgs0) ->
         clouseau_rpc:group2(Pid, Props)
     end.
 
+% Delay, in milliseconds, before an index that was deleted or
+% invalidated is shut down; tunable via [dreyfus] index_shutdown_delay.
+index_shutdown_delay() ->
+    config:get_integer("dreyfus", "index_shutdown_delay", 30000). % 30 seconds
+
 info_int(Pid) ->
     clouseau_rpc:info(Pid).
diff --git a/src/dreyfus_index_manager.erl b/src/dreyfus_index_manager.erl
index 2575294..99b496f 100644
--- a/src/dreyfus_index_manager.erl
+++ b/src/dreyfus_index_manager.erl
@@ -21,6 +21,7 @@
 
 -define(BY_SIG, dreyfus_by_sig).
 -define(BY_PID, dreyfus_by_pid).
+-define(BY_DB, dreyfus_by_db).
 
 % public api.
 -export([start_link/0, get_index/2, get_disk_size/2]).
@@ -44,8 +45,9 @@ get_disk_size(DbName, Index) ->
 % gen_server functions.
 
 init([]) ->
-    ets:new(?BY_SIG, [set, private, named_table]),
+    ets:new(?BY_SIG, [set, protected, named_table]),
     ets:new(?BY_PID, [set, private, named_table]),
+    ets:new(?BY_DB, [bag, protected, named_table]),
     couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]),
     process_flag(trap_exit, true),
     {ok, nil}.
@@ -69,12 +71,12 @@ handle_call({get_disk_size, DbName, #index{sig=Sig}=Index}, From, State) ->
     Path = <<DbName/binary, "/", Sig/binary>>,
     Reply = clouseau_rpc:disk_size(Path),
     {reply, Reply, State};
 
-handle_call({open_ok, DbName, Sig, NewPid}, {OpenerPid, _}, State) ->
+handle_call({open_ok, DbName, DDocId, Sig, NewPid}, {OpenerPid, _}, State) ->
     link(NewPid),
     [{_, WaitList}] = ets:lookup(?BY_SIG, {DbName, Sig}),
     [gen_server:reply(From, {ok, NewPid}) || From <- WaitList],
     ets:delete(?BY_PID, OpenerPid),
-    add_to_ets(NewPid, DbName, Sig),
+    add_to_ets(NewPid, DbName, DDocId, Sig),
     {reply, ok, State};
 
 handle_call({open_error, DbName, Sig, Error}, {OpenerPid, _}, State) ->
@@ -121,13 +123,30 @@ handle_db_event(DbName, created, _St) ->
 handle_db_event(DbName, deleted, _St) ->
     gen_server:cast(?MODULE, {cleanup, DbName}),
     {ok, nil};
+handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, _St) ->
+    couch_log:info("Received ddoc_updated event for ~s", [DDocId]),
+    DDocResult = couch_util:with_db(DbName, fun(Db) ->
+        couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX])
+    end),
+    DbShards = [mem3:name(Sh) || Sh <- mem3:local_shards(mem3:dbname(DbName))],
+    lists:foreach(fun(DbShard) ->
+        lists:foreach(fun({_DbShard, {_DDocId, Sig}}) ->
+            case ets:lookup(?BY_SIG, {DbShard, Sig}) of
+                [{_, IndexPid}] ->
+                    gen_server:cast(IndexPid, {ddoc_updated, DDocResult});
+                [] ->
+                    ok
+            end
+        end, ets:match_object(?BY_DB, {DbShard, {DDocId, '_'}}))
+    end, DbShards),
+    {ok, nil};
 handle_db_event(_DbName, _Event, _St) ->
     {ok, nil}.
 
-new_index(DbName, #index{sig=Sig}=Index) ->
+new_index(DbName, #index{ddoc_id=DDocId, sig=Sig}=Index) ->
     case (catch dreyfus_index:start_link(DbName, Index)) of
     {ok, NewPid} ->
-        Msg = {open_ok, DbName, Sig, NewPid},
+        Msg = {open_ok, DbName, DDocId, Sig, NewPid},
         ok = gen_server:call(?MODULE, Msg, infinity),
         unlink(NewPid);
     Error ->
         Msg = {open_error, DbName, Sig, Error},
         ok = gen_server:call(?MODULE, Msg, infinity)
     end.
-add_to_ets(Pid, DbName, Sig) ->
+% Also track each index in ?BY_DB as {DbName, {DDocId, Sig}} so that
+% ddoc_updated events can find the index pids belonging to a ddoc.
+add_to_ets(Pid, DbName, DDocId, Sig) ->
     true = ets:insert(?BY_PID, {Pid, {DbName, Sig}}),
-    true = ets:insert(?BY_SIG, {{DbName, Sig}, Pid}).
+    true = ets:insert(?BY_SIG, {{DbName, Sig}, Pid}),
+    true = ets:insert(?BY_DB, {DbName, {DDocId, Sig}}).
 
 delete_from_ets(Pid, DbName, Sig) ->
+    case ets:match_object(?BY_DB, {DbName, {'_', Sig}}) of
+        [{DbName, {DDocId, Sig}}] ->
+            true = ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}});
+        _Else ->
+            true
+    end,
     true = ets:delete(?BY_PID, Pid),
     true = ets:delete(?BY_SIG, {DbName, Sig}).
diff --git a/test/search_index_ddoc_update_tests.erl b/test/search_index_ddoc_update_tests.erl
new file mode 100644
--- /dev/null
+++ b/test/search_index_ddoc_update_tests.erl
@@ -0,0 +1,342 @@
+-module(search_index_ddoc_update_tests).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("dreyfus/include/dreyfus.hrl").
+
+-define(DDOC, {[
+    {<<"_id">>, <<"_design/searchddoc">>},
+    {<<"indexes">>, {[
+        {<<"other">>, {[
+            {<<"index">>, <<"function(doc){if(doc.blah){index(\"blah\", doc.blah);}}">>}
+        ]}},
+        {<<"default">>, {[
+            {<<"index">>, <<"function(doc){if(doc.blah){index(\"default\", doc.blah);}}">>}
+        ]}}
+    ]}}
+]}).
+
+-define(USER, "admin").
+-define(PASS, "pass").
+-define(AUTH, {basic_auth, {?USER, ?PASS}}).
+
+start() ->
+    Ctx = test_util:start_couch([chttpd, dreyfus]),
+    ok = config:set("admins", ?USER, ?PASS, _Persist=false),
+    Ctx.
+
+setup() ->
+    DbName = ?tempdb(),
+    ok = create_db(DbName),
+
+    ok = meck:new(mochiweb_socket, [passthrough]),
+    fake_clouseau(),
+    fake_dreyfus_index(),
+
+    upload_ddoc(?b2l(DbName), ?DDOC),
+    ?b2l(DbName).
+
+teardown(DbName) ->
+    (catch meck:unload(mochiweb_socket)),
+    (catch meck:unload(clouseau_rpc)),
+    (catch meck:unload(dreyfus_index)),
+    delete_db(?l2b(DbName)),
+    ok.
+
+search_test_() ->
+    {
+        "Check search index closes after delete",
+        {
+            setup,
+            fun start/0, fun test_util:stop_couch/1,
+            {
+                foreach,
+                fun setup/0, fun teardown/1,
+                [
+                    fun should_stop_index_after_deleting_index/1,
+                    fun should_stop_index_after_deleting_ddoc/1,
+                    fun should_not_stop_index_after_adding_new_index/1,
+                    fun should_not_stop_index_if_index_referred_from_other_ddoc/1
+                ]
+            }
+        }
+    }.
+
+fake_clouseau() ->
+    ok = meck:new([clouseau_rpc], [non_strict]),
+    ok = meck:expect(clouseau_rpc, open_index, ['_', '_', '_'], {ok, self()}),
+    ok = meck:expect(clouseau_rpc, await, ['_', '_'], ok),
+    ok = meck:expect(clouseau_rpc, get_update_seq, ['_'], {ok, 10}),
+    ok = meck:expect(clouseau_rpc, info, ['_'], {ok, [{disk_size, 10},
+        {doc_count, 10},
+        {doc_del_count, 0},
+        {pending_seq, 11},
+        {committed_seq, 10}]}),
+    ok = meck:expect(clouseau_rpc, search, ['_', '_'], {ok, [
+        {update_seq, 10},
+        {total_hits, 0},
+        {hits, []}
+    ]}),
+    ok = meck:expect(clouseau_rpc, update, ['_', '_', '_'], ok),
+    ok = meck:expect(clouseau_rpc, commit, ['_', '_'], ok),
+    ok = meck:expect(clouseau_rpc, search, ['_', '_', '_', '_', '_', '_'], {ok, [
+        {update_seq, 10},
+        {total_hits, 0},
+        {hits, []}
+    ]}).
+
+fake_dreyfus_index() ->
+    ok = meck:new([dreyfus_index], [passthrough]),
+    ok = meck:expect(dreyfus_index, await, ['_', '_'], ok),
+    ok = meck:expect(dreyfus_index, search, ['_', '_'], {ok, #top_docs{
+        update_seq = 10,
+        total_hits = 0,
+        hits = []
+    }}).
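+
+% Each test follows the same pattern: run one search so that a
+% dreyfus_index process is started, capture its pid(s) via the
+% manager's ETS tables, update or delete the design document over
+% HTTP, and assert whether the index process survives.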
+should_stop_index_after_deleting_index(DbName) ->
+    {timeout, 60,
+        ?_test(begin
+            ok = meck:reset(clouseau_rpc),
+            ok = meck:reset(dreyfus_index),
+            ok = create_doc(DbName, <<"doc_id">>, {[]}),
+            ReqUrl = host_url() ++ "/" ++ DbName ++
+                "/_design/searchddoc/_search/other?q=*:*",
+            {ok, Status, _Headers, _Body} =
+                test_request:get(ReqUrl, [?AUTH]),
+            ?assertEqual(200, Status),
+            DDocId = "_design/searchddoc",
+            IndexPids = ensure_index_is_opened(DbName, DDocId, "other"),
+
+            % Upload a new version of the ddoc that removes the search
+            % index named "other" and verify that the index is closed
+            UpdatedDDocJsonObj = ddoc_delete_index_json_obj(DbName, ?l2b(DDocId)),
+            upload_ddoc(DbName, UpdatedDDocJsonObj),
+            ensure_index_is_closed(DbName, DDocId, "other", IndexPids)
+        end)}.
+
+should_stop_index_after_deleting_ddoc(DbName) ->
+    {timeout, 60,
+        ?_test(begin
+            ok = meck:reset(clouseau_rpc),
+            ok = meck:reset(dreyfus_index),
+            ok = create_doc(DbName, <<"doc_id">>, {[]}),
+            ReqUrl = host_url() ++ "/" ++ DbName ++
+                "/_design/searchddoc/_search/other?q=*:*",
+            {ok, Status, _Headers, _Body} =
+                test_request:get(ReqUrl, [?AUTH]),
+            ?assertEqual(200, Status),
+            DDocId = "_design/searchddoc",
+            IndexPids = ensure_index_is_opened(DbName, DDocId, "other"),
+
+            % Delete the design doc and verify that the index is closed
+            delete_ddoc(DbName, ?l2b(DDocId)),
+            ensure_index_is_closed(DbName, DDocId, "other", IndexPids)
+        end)}.
+
+should_not_stop_index_after_adding_new_index(DbName) ->
+    {timeout, 60,
+        ?_test(begin
+            ok = meck:reset(clouseau_rpc),
+            ok = meck:reset(dreyfus_index),
+            ok = create_doc(DbName, <<"doc_id">>, {[]}),
+            ReqUrl = host_url() ++ "/" ++ DbName ++
+                "/_design/searchddoc/_search/other?q=*:*",
+            {ok, Status, _Headers, _Body} =
+                test_request:get(ReqUrl, [?AUTH]),
+            ?assertEqual(200, Status),
+            DDocId = "_design/searchddoc",
+            ensure_index_is_opened(DbName, DDocId, "other"),
+
+            % Upload a new version of the ddoc that adds another index
+            % while keeping "other" unchanged, and verify that the
+            % index stays open
+            UpdatedDDocJsonObj = ddoc_add_index_json_obj(DbName, ?l2b(DDocId)),
+            upload_ddoc(DbName, UpdatedDDocJsonObj),
+            ensure_index_is_opened(DbName, DDocId, "other")
+        end)}.
+
+should_not_stop_index_if_index_referred_from_other_ddoc(DbName) ->
+    {timeout, 60,
+        ?_test(begin
+            ok = meck:reset(clouseau_rpc),
+            ok = meck:reset(dreyfus_index),
+            ok = create_doc(DbName, <<"doc_id">>, {[]}),
+
+            % Create another design doc defining the same indexes
+            % (and therefore the same sigs)
+            OtherDDocJsonObj = ddoc_second_design_doc_with_same_index_json_obj(DbName),
+            upload_ddoc2(DbName, OtherDDocJsonObj),
+
+            ReqUrl = host_url() ++ "/" ++ DbName ++
+                "/_design/searchddoc/_search/other?q=*:*",
+            {ok, Status, _Headers, _Body} =
+                test_request:get(ReqUrl, [?AUTH]),
+            ?assertEqual(200, Status),
+            DDocId = "_design/searchddoc",
+            ensure_index_is_opened(DbName, DDocId, "other"),
+
+            DeleteIndexDDocJsonObj = ddoc_delete_index_json_obj(DbName, ?l2b(DDocId)),
+            upload_ddoc(DbName, DeleteIndexDDocJsonObj),
+
+            % The index is still referenced from the second design doc,
+            % so it must stay open even though the first ddoc no longer
+            % defines it
+            ensure_index_is_opened(DbName, "_design/searchddoc2", "other")
+        end)}.
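+
+% The helpers below build updated design-doc bodies; the current _rev
+% is fetched through fabric so the update applies to the latest revision.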
+ddoc_delete_index_json_obj(DbName, DDocId) ->
+    {ok, DDoc} = fabric:open_doc(DbName, DDocId, [ejson_body, ?ADMIN_CTX]),
+    {Pos, [RevId | _]} = DDoc#doc.revs,
+    {[
+        {<<"_id">>, <<"_design/searchddoc">>},
+        {<<"_rev">>, couch_doc:rev_to_str({Pos, RevId})},
+        {<<"indexes">>, {[
+            {<<"default">>, {[
+                {<<"index">>, <<"function(doc){if(doc.blah){index(\"default\", doc.blah);}}">>}
+            ]}}
+        ]}}
+    ]}.
+
+ddoc_add_index_json_obj(DbName, DDocId) ->
+    {ok, DDoc} = fabric:open_doc(DbName, DDocId, [ejson_body, ?ADMIN_CTX]),
+    {Pos, [RevId | _]} = DDoc#doc.revs,
+    {[
+        {<<"_id">>, <<"_design/searchddoc">>},
+        {<<"_rev">>, couch_doc:rev_to_str({Pos, RevId})},
+        {<<"indexes">>, {[
+            {<<"other">>, {[
+                {<<"index">>, <<"function(doc){if(doc.blah){index(\"blah\", doc.blah);}}">>}
+            ]}},
+            {<<"other1">>, {[
+                {<<"index">>, <<"function(doc){if(doc.blah){index(\"newfield\", doc.blah);}}">>}
+            ]}},
+            {<<"default">>, {[
+                {<<"index">>, <<"function(doc){if(doc.blah){index(\"default\", doc.blah);}}">>}
+            ]}}
+        ]}}
+    ]}.
+
+ddoc_second_design_doc_with_same_index_json_obj(_DbName) ->
+    {[
+        {<<"_id">>, <<"_design/searchddoc2">>},
+        {<<"indexes">>, {[
+            {<<"other">>, {[
+                {<<"index">>, <<"function(doc){if(doc.blah){index(\"blah\", doc.blah);}}">>}
+            ]}},
+            {<<"default">>, {[
+                {<<"index">>, <<"function(doc){if(doc.blah){index(\"default\", doc.blah);}}">>}
+            ]}}
+        ]}}
+    ]}.
+
+ensure_index_is_opened(DbName, DDocId, IndexName) ->
+    Index = get_index(DbName, ?l2b(DDocId), ?l2b(IndexName)),
+    ?assertMatch(#index{}, Index),
+    #index{sig = Sig} = Index,
+    IndexPids = get_index_by_sig_from_ets(Sig),
+    ?assert(length(IndexPids) > 0),
+    AliveBefore = lists:filter(fun erlang:is_process_alive/1, IndexPids),
+    ?assertEqual(length(IndexPids), length(AliveBefore)),
+    IndexPids.
+
+ensure_index_is_closed(DbName, DDocId, IndexName, IndexPids) ->
+    % get_index returns ok (not an #index{}) once the index definition
+    % is gone from the ddoc
+    NotAnIndex = get_index(DbName, ?l2b(DDocId), ?l2b(IndexName)),
+    ?assertEqual(ok, NotAnIndex),
+    (catch wait_for_indices_to_stop(?l2b(DbName))),
+    AliveAfter = lists:filter(fun erlang:is_process_alive/1, IndexPids),
+    ?assertEqual(0, length(AliveAfter)),
+    ok.
+
+get_index(DbName, DDocId, IndexName) ->
+    case fabric:open_doc(DbName, DDocId, [ejson_body, ?ADMIN_CTX]) of
+        {ok, DDoc} ->
+            case dreyfus_index:design_doc_to_index(DDoc, IndexName) of
+                {ok, Index} ->
+                    Index;
+                _ ->
+                    ok
+            end;
+        _ ->
+            ok
+    end.
+
+get_index_by_sig_from_ets(Sig) ->
+    Indexes = ets:match_object(dreyfus_by_db, {'_', {'_', Sig}}),
+    lists:foldl(fun({DbName, {_DDocId, _Sig}}, Acc) ->
+        case ets:lookup(dreyfus_by_sig, {DbName, Sig}) of
+            [{_, Pid}] -> [Pid | Acc];
+            _ -> Acc
+        end
+    end, [], Indexes).
+
+wait_for_indices_to_stop(DbName) ->
+    DbDir = config:get("couchdb", "database_dir", "."),
+    WaitFun = fun() ->
+        filelib:fold_files(DbDir, <<".*", DbName/binary, "\\.[0-9]+.*">>,
+            true, fun(_F, _A) -> wait end, ok)
+    end,
+    ok = test_util:wait(WaitFun).
+
+create_doc(DbName, Id, Body) ->
+    JsonDoc = couch_util:json_apply_field({<<"_id">>, Id}, Body),
+    Doc = couch_doc:from_json_obj(JsonDoc),
+    {ok, _} = fabric:update_docs(DbName, [Doc], [?ADMIN_CTX]),
+    ok.
+
+create_db(DbName) ->
+    {ok, Status, _, _} = test_request:put(db_url(DbName), [?AUTH], ""),
+    assert_success(create_db, Status),
+    ok.
+
+delete_db(DbName) ->
+    {ok, Status, _, _} = test_request:delete(db_url(DbName), [?AUTH]),
+    assert_success(delete_db, Status),
+    ok.
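+
+% HTTP helpers. All requests go through the clustered (chttpd) port
+% using the admin basic-auth credentials set up in start/0.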
+assert_success(create_db, Status) ->
+    ?assert(lists:member(Status, [201, 202]));
+assert_success(delete_db, Status) ->
+    ?assert(lists:member(Status, [200, 202])).
+
+host_url() ->
+    "http://" ++ bind_address() ++ ":" ++ port().
+
+bind_address() ->
+    config:get(section(), "bind_address", "127.0.0.1").
+
+section() -> "chttpd".
+
+db_url(DbName) when is_binary(DbName) ->
+    db_url(binary_to_list(DbName));
+db_url(DbName) when is_list(DbName) ->
+    host_url() ++ "/" ++ DbName.
+
+port() ->
+    integer_to_list(mochiweb_socket_server:get(chttpd, port)).
+
+upload_ddoc(DbName, DDocJson) ->
+    Url = host_url() ++ "/" ++ DbName ++ "/_design/searchddoc",
+    Body = couch_util:json_encode(DDocJson),
+    {ok, 201, _Resp, _Body} = test_request:put(Url, [?AUTH], Body),
+    ok.
+
+upload_ddoc2(DbName, DDocJson) ->
+    Url = host_url() ++ "/" ++ DbName ++ "/_design/searchddoc2",
+    Body = couch_util:json_encode(DDocJson),
+    {ok, 201, _Resp, _Body} = test_request:put(Url, [?AUTH], Body),
+    ok.
+
+delete_ddoc(DbName, DDocId) ->
+    {ok, DDoc} = fabric:open_doc(DbName, DDocId, [ejson_body, ?ADMIN_CTX]),
+    {Pos, [RevId | _]} = DDoc#doc.revs,
+    Rev = ?b2l(couch_doc:rev_to_str({Pos, RevId})),
+    Url = host_url() ++ "/" ++ DbName ++ "/_design/searchddoc?rev=" ++ Rev,
+    {ok, 200, _, _} = test_request:delete(Url, [?AUTH]),
+    ok.