Skip to content

Commit

Permalink
Stop search indexer when it is deleted.
Browse files Browse the repository at this point in the history
Allow dreyfus to listen to ddoc_updated events and check if the index
is deleted as a result of update to it's design document. If the index
is deleted then it will stop the index. If there are any pids waiting
after 30 seconds, it will send a message 'ddoc_updates' before it stops
the index.

BugzId:85718
  • Loading branch information
brkolla committed Jul 26, 2017
1 parent d838881 commit ce30124
Show file tree
Hide file tree
Showing 4 changed files with 445 additions and 9 deletions.
72 changes: 70 additions & 2 deletions src/dreyfus_index.erl
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,26 @@ handle_call(info, _From, State) -> % obsolete
Reply = info_int(State#state.index_pid),
{reply, Reply, State}.

handle_cast({ddoc_updated, DDocResult}, #state{} = State) ->
#index{sig = Sig, dbname = DbName} = State#state.index,
case check_if_index_is_deleted(DbName, DDocResult, Sig) of
true ->
% The intention of this arbitrary delay is to avoid the errors caused by deleting an index while it's being queried. Currently there is no easy way to identify the pids that are waiting for indexing vs the pids that are actively querying, hence we chose this approach to shutdown the index after the delay.
erlang:send_after(index_shutdown_delay(), self(), index_deleted),
{noreply, State};
false ->
{noreply, State}
end;

handle_cast(_Msg, State) ->
{noreply, State}.

handle_info(index_deleted, State) ->
#state{index=Index} = State,
couch_log:notice("Shutting down index for ~s. It is either deleted or invalidated with the update to it's design document",
[index_name(Index)]),
{stop, {shutdown, ddoc_updated}, State};

handle_info({'EXIT', FromPid, {updated, NewSeq}},
#state{
index=Index0,
Expand Down Expand Up @@ -214,13 +231,61 @@ handle_info({'DOWN',_,_,Pid,Reason}, #state{
[gen_server:reply(P, {error, Reason}) || {P, _} <- WaitList],
{stop, normal, State}.

terminate(_Reason, _State) ->
ok.
terminate(Reason, State) ->
case Reason of
{shutdown, ddoc_updated} ->
send_all(State#state.waiting_list, ddoc_updated);
_ ->
ok
end.

code_change(_OldVsn, State, _Extra) ->
{ok, State}.

% private functions.
send_all(Waiters, Reply) ->
[gen_server:reply(From, Reply) || {From, _} <- Waiters].

check_if_index_is_deleted(DbName, DDocResult, CurrentSig) ->
case DDocResult of
{not_found, deleted} ->
IndicesWithSig = indices_with_sig_from_other_ddocs(DbName, CurrentSig);
{ok, DDoc} ->
case indices_matching_sig({DDoc, CurrentSig}) of
[] ->
% check if index is referred from other ddocs
IndicesWithSig = indices_with_sig_from_other_ddocs(DbName, CurrentSig);
IndicesWithSig ->
IndicesWithSig
end
end,

case IndicesWithSig of
[] ->
true;
[_H | _T] ->
false
end.

indices_with_sig_from_other_ddocs(DbName, CurrentSig) ->
{ok, DesignDocs} = fabric:design_docs(mem3:dbname(DbName)),
ActiveSigs = lists:usort(lists:flatmap(
fun indices_matching_sig/1,
[{couch_doc:from_json_obj(DD), CurrentSig} || DD <- DesignDocs])),
ActiveSigs.

indices_matching_sig({#doc{body={Fields}}=Doc, CurrentSig}) ->
{RawIndexes} = couch_util:get_value(<<"indexes">>, Fields, {[]}),
{IndexNames, _} = lists:unzip(RawIndexes),
lists:filter(fun check_if_index_matches_sig/1, [{Doc, IndexName, CurrentSig} || IndexName <- IndexNames]).

check_if_index_matches_sig({Doc, IndexName, Sig}) ->
case design_doc_to_index(Doc, IndexName) of
{ok, #index{sig=Sig_New}} when Sig_New =/= Sig ->
false;
{ok, #index{sig=Sig_New}} when Sig_New =:= Sig ->
true
end.

open_index(DbName, #index{analyzer=Analyzer, sig=Sig}) ->
Path = <<DbName/binary,"/",Sig/binary>>,
Expand Down Expand Up @@ -367,5 +432,8 @@ group2_int(Pid, QueryArgs0) ->
clouseau_rpc:group2(Pid, Props)
end.

index_shutdown_delay() ->
config:get_integer("dreyfus", "index_shutdown_delay", 30000). % 30 seconds

info_int(Pid) ->
clouseau_rpc:info(Pid).
40 changes: 33 additions & 7 deletions src/dreyfus_index_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

-define(BY_SIG, dreyfus_by_sig).
-define(BY_PID, dreyfus_by_pid).
-define(BY_DB, dreyfus_by_db).

% public api.
-export([start_link/0, get_index/2, get_disk_size/2]).
Expand All @@ -44,8 +45,9 @@ get_disk_size(DbName, Index) ->
% gen_server functions.

init([]) ->
ets:new(?BY_SIG, [set, private, named_table]),
ets:new(?BY_SIG, [set, protected, named_table]),
ets:new(?BY_PID, [set, private, named_table]),
ets:new(?BY_DB, [bag, protected, named_table]),
couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]),
process_flag(trap_exit, true),
{ok, nil}.
Expand All @@ -69,12 +71,12 @@ handle_call({get_disk_size, DbName, #index{sig=Sig}=Index}, From, State) ->
Reply = clouseau_rpc:disk_size(Path),
{reply, Reply, State};

handle_call({open_ok, DbName, Sig, NewPid}, {OpenerPid, _}, State) ->
handle_call({open_ok, DbName, DDocId, Sig, NewPid}, {OpenerPid, _}, State) ->
link(NewPid),
[{_, WaitList}] = ets:lookup(?BY_SIG, {DbName, Sig}),
[gen_server:reply(From, {ok, NewPid}) || From <- WaitList],
ets:delete(?BY_PID, OpenerPid),
add_to_ets(NewPid, DbName, Sig),
add_to_ets(NewPid, DbName, DDocId, Sig),
{reply, ok, State};

handle_call({open_error, DbName, Sig, Error}, {OpenerPid, _}, State) ->
Expand Down Expand Up @@ -121,25 +123,49 @@ handle_db_event(DbName, created, _St) ->
handle_db_event(DbName, deleted, _St) ->
gen_server:cast(?MODULE, {cleanup, DbName}),
{ok, nil};
handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, _St) ->
DDocResult = couch_util:with_db(DbName, fun(Db) ->
couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX])
end),
couch_log:info("Received ddoc_updated event for ~s",[DDocId]),
DbShards = [mem3:name(Sh) || Sh <- mem3:local_shards(mem3:dbname(DbName))],
lists:foreach(fun(DbShard) ->
lists:foreach(fun({_DbShard, {_DDocId, Sig}}) ->
case ets:lookup(?BY_SIG, {DbShard, Sig}) of
[{_, IndexPid}] ->
gen_server:cast(IndexPid, {ddoc_updated, DDocResult});
[] ->
ok
end
end, ets:match_object(?BY_DB, {DbShard, {DDocId, '_'}}))
end, DbShards),
{ok, nil};
handle_db_event(_DbName, _Event, _St) ->
{ok, nil}.

new_index(DbName, #index{sig=Sig}=Index) ->
new_index(DbName, #index{ddoc_id=DDocId, sig=Sig}=Index) ->
case (catch dreyfus_index:start_link(DbName, Index)) of
{ok, NewPid} ->
Msg = {open_ok, DbName, Sig, NewPid},
Msg = {open_ok, DbName, DDocId, Sig, NewPid},
ok = gen_server:call(?MODULE, Msg, infinity),
unlink(NewPid);
Error ->
Msg = {open_error, DbName, Sig, Error},
ok = gen_server:call(?MODULE, Msg, infinity)
end.

add_to_ets(Pid, DbName, Sig) ->
add_to_ets(Pid, DbName, DDocId, Sig) ->
true = ets:insert(?BY_PID, {Pid, {DbName, Sig}}),
true = ets:insert(?BY_SIG, {{DbName, Sig}, Pid}).
true = ets:insert(?BY_SIG, {{DbName, Sig}, Pid}),
true = ets:insert(?BY_DB, {DbName, {DDocId, Sig}}).

delete_from_ets(Pid, DbName, Sig) ->
case ets:match_object(?BY_DB, {DbName, {'_', Sig}}) of
[{DbName, {DDocId, Sig}}] ->
true = ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}});
_Else ->
true
end,
true = ets:delete(?BY_PID, Pid),
true = ets:delete(?BY_SIG, {DbName, Sig}).

Binary file added test/.DS_Store
Binary file not shown.
Loading

0 comments on commit ce30124

Please sign in to comment.