Skip to content

Commit

Permalink
Stop search indexer when it is deleted.
Browse files Browse the repository at this point in the history
Allow dreyfus to listen to ddoc_updated events and check if the index
is deleted as a result of update to it's design document. If the index
is deleted then it will stop the index if waitlist is empty, if the
waitlist is not empty then it will wait for a minute before shutting
down.

BugzId:85718
  • Loading branch information
brkolla committed Jun 15, 2017
1 parent d838881 commit 7e0d982
Show file tree
Hide file tree
Showing 4 changed files with 361 additions and 7 deletions.
51 changes: 51 additions & 0 deletions src/dreyfus_index.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
waiting_list=[]
}).

-define(INDEX_SHUTDOWN_DELAY,
config:get("dreyfus", "index_shutdown_delay", 60000)). % 1 minute

% exported for callback.
-export([search_int/2, group1_int/2, group2_int/2, info_int/1]).

Expand Down Expand Up @@ -157,9 +160,28 @@ handle_call(info, _From, State) -> % obsolete
Reply = info_int(State#state.index_pid),
{reply, Reply, State}.

handle_cast({ddoc_updated, DDocResult}, State) ->
#state{index=#index{sig=CurrentSig}, waiting_list = WaitList} = State,
case check_if_index_is_deleted(DDocResult, CurrentSig) of
true ->
case WaitList of
[] ->
self() ! index_deleted;
[_Index] ->
erlang:send_after(?INDEX_SHUTDOWN_DELAY, self(), index_deleted)
end
end,
{noreply, State};

handle_cast(_Msg, State) ->
{noreply, State}.

handle_info(index_deleted, State) ->
#state{index=Index} = State,
couch_log:notice("Shutting down index for ~s. It is either deleted or invalidated with the update to it's design document",
[index_name(Index)]),
{stop, normal, State};

handle_info({'EXIT', FromPid, {updated, NewSeq}},
#state{
index=Index0,
Expand Down Expand Up @@ -222,6 +244,35 @@ code_change(_OldVsn, State, _Extra) ->

% private functions.

get_index_with_sig(#doc{body={Fields}}=Doc, Sig) ->
RawIndexes = couch_util:get_value(<<"indexes">>, Fields, {[]}),
case RawIndexes of
{IndexList} when is_list(IndexList) ->
{IndexNames, _} = lists:unzip(IndexList),
lists:flatmap(
fun(IndexName) ->
case design_doc_to_index(Doc, IndexName) of
{ok, #index{sig=Sig_New}} when Sig_New =/= Sig ->
[];
{ok, #index{sig=Sig_New}=Index} when Sig_New =:= Sig ->
[Index]
end
end, IndexNames)
end.

check_if_index_is_deleted(DDocResult, CurrentSig) ->
case DDocResult of
{not_found, deleted} ->
true;
{ok, DDoc} ->
case get_index_with_sig(DDoc, CurrentSig) of
[] ->
true;
[_Index] ->
false
end
end.

open_index(DbName, #index{analyzer=Analyzer, sig=Sig}) ->
Path = <<DbName/binary,"/",Sig/binary>>,
case clouseau_rpc:open_index(self(), Path, Analyzer) of
Expand Down
40 changes: 33 additions & 7 deletions src/dreyfus_index_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

-define(BY_SIG, dreyfus_by_sig).
-define(BY_PID, dreyfus_by_pid).
-define(BY_DB, dreyfus_by_db).

% public api.
-export([start_link/0, get_index/2, get_disk_size/2]).
Expand All @@ -44,8 +45,9 @@ get_disk_size(DbName, Index) ->
% gen_server functions.

init([]) ->
ets:new(?BY_SIG, [set, private, named_table]),
ets:new(?BY_SIG, [set, protected, named_table]),
ets:new(?BY_PID, [set, private, named_table]),
ets:new(?BY_DB, [bag, protected, named_table]),
couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]),
process_flag(trap_exit, true),
{ok, nil}.
Expand All @@ -69,12 +71,12 @@ handle_call({get_disk_size, DbName, #index{sig=Sig}=Index}, From, State) ->
Reply = clouseau_rpc:disk_size(Path),
{reply, Reply, State};

handle_call({open_ok, DbName, Sig, NewPid}, {OpenerPid, _}, State) ->
handle_call({open_ok, DbName, DDocId, Sig, NewPid}, {OpenerPid, _}, State) ->
link(NewPid),
[{_, WaitList}] = ets:lookup(?BY_SIG, {DbName, Sig}),
[gen_server:reply(From, {ok, NewPid}) || From <- WaitList],
ets:delete(?BY_PID, OpenerPid),
add_to_ets(NewPid, DbName, Sig),
add_to_ets(NewPid, DbName, DDocId, Sig),
{reply, ok, State};

handle_call({open_error, DbName, Sig, Error}, {OpenerPid, _}, State) ->
Expand Down Expand Up @@ -121,25 +123,49 @@ handle_db_event(DbName, created, _St) ->
handle_db_event(DbName, deleted, _St) ->
gen_server:cast(?MODULE, {cleanup, DbName}),
{ok, nil};
handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, _St) ->
DDocResult = couch_util:with_db(DbName, fun(Db) ->
couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX])
end),
couch_log:info("Received ddoc_updated event for ~s",[DDocId]),
DbShards = [mem3:name(Sh) || Sh <- mem3:local_shards(mem3:dbname(DbName))],
lists:foreach(fun(DbShard) ->
lists:foreach(fun({_DbShard, {_DDocId, Sig}}) ->
case ets:lookup(?BY_SIG, {DbShard, Sig}) of
[{_, IndexPid}] ->
gen_server:cast(IndexPid, {ddoc_updated, DDocResult});
[] ->
ok
end
end, ets:match_object(?BY_DB, {DbShard, {DDocId, '$1'}}))
end, DbShards),
{ok, nil};
handle_db_event(_DbName, _Event, _St) ->
{ok, nil}.

new_index(DbName, #index{sig=Sig}=Index) ->
new_index(DbName, #index{ddoc_id=DDocId, sig=Sig}=Index) ->
case (catch dreyfus_index:start_link(DbName, Index)) of
{ok, NewPid} ->
Msg = {open_ok, DbName, Sig, NewPid},
Msg = {open_ok, DbName, DDocId, Sig, NewPid},
ok = gen_server:call(?MODULE, Msg, infinity),
unlink(NewPid);
Error ->
Msg = {open_error, DbName, Sig, Error},
ok = gen_server:call(?MODULE, Msg, infinity)
end.

add_to_ets(Pid, DbName, Sig) ->
add_to_ets(Pid, DbName, DDocId, Sig) ->
true = ets:insert(?BY_PID, {Pid, {DbName, Sig}}),
true = ets:insert(?BY_SIG, {{DbName, Sig}, Pid}).
true = ets:insert(?BY_SIG, {{DbName, Sig}, Pid}),
true = ets:insert(?BY_DB, {DbName, {DDocId, Sig}}).

delete_from_ets(Pid, DbName, Sig) ->
case ets:match_object(?BY_DB, {DbName, {'$1', Sig}}) of
[{DbName, {DDocId, Sig}}] ->
true = ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}});
_Else ->
true
end,
true = ets:delete(?BY_PID, Pid),
true = ets:delete(?BY_SIG, {DbName, Sig}).

Binary file added test/.DS_Store
Binary file not shown.
Loading

0 comments on commit 7e0d982

Please sign in to comment.