Skip to content

Commit

Permalink
Stop search indexer when it is deleted.
Browse files Browse the repository at this point in the history
Allow dreyfus to listen to ddoc_updated events and check if the index
is deleted as a result of update to it's design document. If the index
is deleted then it will stop the index. If there are any pids waiting
after 30 seconds, it will send a message 'ddoc_updates' before it stops
the index.

BugzId:85718
  • Loading branch information
brkolla committed Jul 19, 2017
1 parent d838881 commit 10c62ee
Show file tree
Hide file tree
Showing 4 changed files with 444 additions and 9 deletions.
71 changes: 69 additions & 2 deletions src/dreyfus_index.erl
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,25 @@ handle_call(info, _From, State) -> % obsolete
Reply = info_int(State#state.index_pid),
{reply, Reply, State}.

handle_cast({ddoc_updated, DDocResult}, #state{} = State) ->
#index{sig = Sig, dbname = DbName} = State#state.index,
case check_if_index_is_deleted(DbName, DDocResult, Sig) of
true ->
erlang:send_after(index_shutdown_delay(), self(), index_deleted),
{noreply, State};
false ->
{noreply, State}
end;

handle_cast(_Msg, State) ->
{noreply, State}.

handle_info(index_deleted, State) ->
#state{index=Index} = State,
couch_log:notice("Shutting down index for ~s. It is either deleted or invalidated with the update to it's design document",
[index_name(Index)]),
{stop, {shutdown, ddoc_updated}, State};

handle_info({'EXIT', FromPid, {updated, NewSeq}},
#state{
index=Index0,
Expand Down Expand Up @@ -214,13 +230,61 @@ handle_info({'DOWN',_,_,Pid,Reason}, #state{
[gen_server:reply(P, {error, Reason}) || {P, _} <- WaitList],
{stop, normal, State}.

terminate(_Reason, _State) ->
ok.
terminate(Reason, State) ->
case Reason of
{shutdown, ddoc_updated} ->
send_all(State#state.waiting_list, ddoc_updated);
_ ->
ok
end.

code_change(_OldVsn, State, _Extra) ->
{ok, State}.

% private functions.
send_all(Waiters, Reply) ->
[gen_server:reply(From, Reply) || {From, _} <- Waiters].

check_if_index_is_deleted(DbName, DDocResult, CurrentSig) ->
case DDocResult of
{not_found, deleted} ->
IndicesWithSig = indices_with_sig_from_other_ddocs(DbName, CurrentSig);
{ok, DDoc} ->
case indices_matching_sig({DDoc, CurrentSig}) of
[] ->
% check if index is referred from other ddocs
IndicesWithSig = indices_with_sig_from_other_ddocs(DbName, CurrentSig);
IndicesWithSig ->
IndicesWithSig
end
end,

case IndicesWithSig of
[] ->
true;
[_H | _T] ->
false
end.

indices_with_sig_from_other_ddocs(DbName, CurrentSig) ->
{ok, DesignDocs} = fabric:design_docs(mem3:dbname(DbName)),
ActiveSigs = lists:usort(lists:flatmap(
fun indices_matching_sig/1,
[{couch_doc:from_json_obj(DD), CurrentSig} || DD <- DesignDocs])),
ActiveSigs.

indices_matching_sig({#doc{body={Fields}}=Doc, CurrentSig}) ->
{RawIndexes} = couch_util:get_value(<<"indexes">>, Fields, {[]}),
{IndexNames, _} = lists:unzip(RawIndexes),
lists:filter(fun check_if_index_matches_sig/1, [{Doc, IndexName, CurrentSig} || IndexName <- IndexNames]).

check_if_index_matches_sig({Doc, IndexName, Sig}) ->
case design_doc_to_index(Doc, IndexName) of
{ok, #index{sig=Sig_New}} when Sig_New =/= Sig ->
false;
{ok, #index{sig=Sig_New}} when Sig_New =:= Sig ->
true
end.

open_index(DbName, #index{analyzer=Analyzer, sig=Sig}) ->
Path = <<DbName/binary,"/",Sig/binary>>,
Expand Down Expand Up @@ -367,5 +431,8 @@ group2_int(Pid, QueryArgs0) ->
clouseau_rpc:group2(Pid, Props)
end.

index_shutdown_delay() ->
config:get_integer("dreyfus", "index_shutdown_delay", 30000). % 30 seconds

info_int(Pid) ->
clouseau_rpc:info(Pid).
40 changes: 33 additions & 7 deletions src/dreyfus_index_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

-define(BY_SIG, dreyfus_by_sig).
-define(BY_PID, dreyfus_by_pid).
-define(BY_DB, dreyfus_by_db).

% public api.
-export([start_link/0, get_index/2, get_disk_size/2]).
Expand All @@ -44,8 +45,9 @@ get_disk_size(DbName, Index) ->
% gen_server functions.

init([]) ->
ets:new(?BY_SIG, [set, private, named_table]),
ets:new(?BY_SIG, [set, protected, named_table]),
ets:new(?BY_PID, [set, private, named_table]),
ets:new(?BY_DB, [bag, protected, named_table]),
couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]),
process_flag(trap_exit, true),
{ok, nil}.
Expand All @@ -69,12 +71,12 @@ handle_call({get_disk_size, DbName, #index{sig=Sig}=Index}, From, State) ->
Reply = clouseau_rpc:disk_size(Path),
{reply, Reply, State};

handle_call({open_ok, DbName, Sig, NewPid}, {OpenerPid, _}, State) ->
handle_call({open_ok, DbName, DDocId, Sig, NewPid}, {OpenerPid, _}, State) ->
link(NewPid),
[{_, WaitList}] = ets:lookup(?BY_SIG, {DbName, Sig}),
[gen_server:reply(From, {ok, NewPid}) || From <- WaitList],
ets:delete(?BY_PID, OpenerPid),
add_to_ets(NewPid, DbName, Sig),
add_to_ets(NewPid, DbName, DDocId, Sig),
{reply, ok, State};

handle_call({open_error, DbName, Sig, Error}, {OpenerPid, _}, State) ->
Expand Down Expand Up @@ -121,25 +123,49 @@ handle_db_event(DbName, created, _St) ->
handle_db_event(DbName, deleted, _St) ->
gen_server:cast(?MODULE, {cleanup, DbName}),
{ok, nil};
handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, _St) ->
DDocResult = couch_util:with_db(DbName, fun(Db) ->
couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX])
end),
couch_log:info("Received ddoc_updated event for ~s",[DDocId]),
DbShards = [mem3:name(Sh) || Sh <- mem3:local_shards(mem3:dbname(DbName))],
lists:foreach(fun(DbShard) ->
lists:foreach(fun({_DbShard, {_DDocId, Sig}}) ->
case ets:lookup(?BY_SIG, {DbShard, Sig}) of
[{_, IndexPid}] ->
gen_server:cast(IndexPid, {ddoc_updated, DDocResult});
[] ->
ok
end
end, ets:match_object(?BY_DB, {DbShard, {DDocId, '_'}}))
end, DbShards),
{ok, nil};
handle_db_event(_DbName, _Event, _St) ->
{ok, nil}.

new_index(DbName, #index{sig=Sig}=Index) ->
new_index(DbName, #index{ddoc_id=DDocId, sig=Sig}=Index) ->
case (catch dreyfus_index:start_link(DbName, Index)) of
{ok, NewPid} ->
Msg = {open_ok, DbName, Sig, NewPid},
Msg = {open_ok, DbName, DDocId, Sig, NewPid},
ok = gen_server:call(?MODULE, Msg, infinity),
unlink(NewPid);
Error ->
Msg = {open_error, DbName, Sig, Error},
ok = gen_server:call(?MODULE, Msg, infinity)
end.

add_to_ets(Pid, DbName, Sig) ->
add_to_ets(Pid, DbName, DDocId, Sig) ->
true = ets:insert(?BY_PID, {Pid, {DbName, Sig}}),
true = ets:insert(?BY_SIG, {{DbName, Sig}, Pid}).
true = ets:insert(?BY_SIG, {{DbName, Sig}, Pid}),
true = ets:insert(?BY_DB, {DbName, {DDocId, Sig}}).

delete_from_ets(Pid, DbName, Sig) ->
case ets:match_object(?BY_DB, {DbName, {'_', Sig}}) of
[{DbName, {DDocId, Sig}}] ->
true = ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}});
_Else ->
true
end,
true = ets:delete(?BY_PID, Pid),
true = ets:delete(?BY_SIG, {DbName, Sig}).

Binary file added test/.DS_Store
Binary file not shown.
Loading

0 comments on commit 10c62ee

Please sign in to comment.