diff --git a/src/dreyfus_index.erl b/src/dreyfus_index.erl
index 56bc499..be213e1 100644
--- a/src/dreyfus_index.erl
+++ b/src/dreyfus_index.erl
@@ -40,6 +40,8 @@
     waiting_list=[]
 }).
 
+-define(INDEX_SHUTDOWN_DELAY, index_shutdown_delay()).
+
 % exported for callback.
 -export([search_int/2, group1_int/2, group2_int/2, info_int/1]).
 
@@ -157,9 +159,31 @@ handle_call(info, _From, State) -> % obsolete
     Reply = info_int(State#state.index_pid),
     {reply, Reply, State}.
 
+% The design document behind this index was updated or deleted. When the
+% index signature is no longer defined, schedule a shutdown: at once if the
+% waiting list is empty, otherwise after ?INDEX_SHUTDOWN_DELAY milliseconds.
+handle_cast({ddoc_updated, DDocResult}, State) ->
+    #state{index=#index{sig=CurrentSig}, waiting_list=WaitList} = State,
+    case check_if_index_is_deleted(DDocResult, CurrentSig) of
+        true when WaitList =:= [] ->
+            self() ! index_deleted;
+        true ->
+            erlang:send_after(?INDEX_SHUTDOWN_DELAY, self(), index_deleted);
+        false ->
+            ok
+    end,
+    {noreply, State};
+
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
+% Sent by the ddoc_updated cast above; stops the index process normally.
+handle_info(index_deleted, State) ->
+    #state{index=Index} = State,
+    couch_log:notice("Shutting down index for ~s. It is either deleted or invalidated with the update to it's design document",
+        [index_name(Index)]),
+    {stop, normal, State};
+
 handle_info({'EXIT', FromPid, {updated, NewSeq}},
             #state{
               index=Index0,
@@ -222,6 +246,33 @@ code_change(_OldVsn, State, _Extra) ->
 
 % private functions.
 
+% Return the indexes of design document Doc whose signature equals Sig.
+% Index definitions that design_doc_to_index/2 does not accept, and an
+% "indexes" field that is not an object, yield no match instead of a crash.
+get_index_with_sig(#doc{body={Fields}}=Doc, Sig) ->
+    case couch_util:get_value(<<"indexes">>, Fields, {[]}) of
+        {IndexList} when is_list(IndexList) ->
+            lists:flatmap(
+                fun({IndexName, _Def}) ->
+                    case design_doc_to_index(Doc, IndexName) of
+                        {ok, #index{sig=Sig}=Index} ->
+                            [Index];
+                        _Else ->
+                            []
+                    end
+                end, IndexList);
+        _NotAnObject ->
+            []
+    end.
+
+% True when the index with signature CurrentSig no longer exists: either the
+% design document was not found or it defines no index with that signature.
+check_if_index_is_deleted({not_found, _Reason}, _CurrentSig) ->
+    true;
+check_if_index_is_deleted({ok, DDoc}, CurrentSig) ->
+    % Test for emptiness so that any number of matches counts as "present".
+    get_index_with_sig(DDoc, CurrentSig) =:= [].
+
 open_index(DbName, #index{analyzer=Analyzer, sig=Sig}) ->
     Path = <>,
     case clouseau_rpc:open_index(self(), Path, Analyzer) of
@@ -367,5 +418,10 @@ group2_int(Pid, QueryArgs0) ->
         clouseau_rpc:group2(Pid, Props)
     end.
 
+% Delay in milliseconds before a deleted index with a non-empty waiting list
+% is shut down; read from the [dreyfus] index_shutdown_delay config setting.
+index_shutdown_delay() ->
+    config:get_integer("dreyfus", "index_shutdown_delay", 60000). % 1 minute
+
 info_int(Pid) ->
     clouseau_rpc:info(Pid).
diff --git a/src/dreyfus_index_manager.erl b/src/dreyfus_index_manager.erl
index 2575294..ebc4e19 100644
--- a/src/dreyfus_index_manager.erl
+++ b/src/dreyfus_index_manager.erl
@@ -21,6 +21,7 @@
 
 -define(BY_SIG, dreyfus_by_sig).
 -define(BY_PID, dreyfus_by_pid).
+-define(BY_DB, dreyfus_by_db).
 
 % public api.
 -export([start_link/0, get_index/2, get_disk_size/2]).
@@ -44,8 +45,9 @@ get_disk_size(DbName, Index) ->
 % gen_server functions.
 
 init([]) ->
-    ets:new(?BY_SIG, [set, private, named_table]),
+    ets:new(?BY_SIG, [set, protected, named_table]),
     ets:new(?BY_PID, [set, private, named_table]),
+    ets:new(?BY_DB, [bag, protected, named_table]),
     couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]),
     process_flag(trap_exit, true),
     {ok, nil}.
@@ -69,12 +71,12 @@ handle_call({get_disk_size, DbName, #index{sig=Sig}=Index}, From, State) ->
     Reply = clouseau_rpc:disk_size(Path),
     {reply, Reply, State};
 
-handle_call({open_ok, DbName, Sig, NewPid}, {OpenerPid, _}, State) ->
+handle_call({open_ok, DbName, DDocId, Sig, NewPid}, {OpenerPid, _}, State) ->
     link(NewPid),
     [{_, WaitList}] = ets:lookup(?BY_SIG, {DbName, Sig}),
     [gen_server:reply(From, {ok, NewPid}) || From <- WaitList],
     ets:delete(?BY_PID, OpenerPid),
-    add_to_ets(NewPid, DbName, Sig),
+    add_to_ets(NewPid, DbName, DDocId, Sig),
     {reply, ok, State};
 
 handle_call({open_error, DbName, Sig, Error}, {OpenerPid, _}, State) ->
@@ -121,13 +123,32 @@ handle_db_event(DbName, created, _St) ->
 handle_db_event(DbName, deleted, _St) ->
     gen_server:cast(?MODULE, {cleanup, DbName}),
     {ok, nil};
+% A design document changed on a shard: re-read it and notify every open
+% index registered under that design document id on the local shards.
+handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, _St) ->
+    DDocResult = couch_util:with_db(DbName, fun(Db) ->
+        couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX])
+    end),
+    couch_log:info("Received ddoc_updated event for ~s",[DDocId]),
+    DbShards = [mem3:name(Sh) || Sh <- mem3:local_shards(mem3:dbname(DbName))],
+    lists:foreach(fun(DbShard) ->
+        lists:foreach(fun({_DbShard, {_DDocId, Sig}}) ->
+            case ets:lookup(?BY_SIG, {DbShard, Sig}) of
+                [{_, IndexPid}] ->
+                    gen_server:cast(IndexPid, {ddoc_updated, DDocResult});
+                [] ->
+                    ok
+            end
+        end, ets:match_object(?BY_DB, {DbShard, {DDocId, '_'}}))
+    end, DbShards),
+    {ok, nil};
 handle_db_event(_DbName, _Event, _St) ->
     {ok, nil}.
 
-new_index(DbName, #index{sig=Sig}=Index) ->
+new_index(DbName, #index{ddoc_id=DDocId, sig=Sig}=Index) ->
     case (catch dreyfus_index:start_link(DbName, Index)) of
         {ok, NewPid} ->
-            Msg = {open_ok, DbName, Sig, NewPid},
+            Msg = {open_ok, DbName, DDocId, Sig, NewPid},
             ok = gen_server:call(?MODULE, Msg, infinity),
             unlink(NewPid);
         Error ->
@@ -135,11 +156,17 @@ new_index(DbName, #index{sig=Sig}=Index) ->
             ok = gen_server:call(?MODULE, Msg, infinity)
     end.
 
-add_to_ets(Pid, DbName, Sig) ->
+% Register an opened index under its pid, under {DbName, Sig}, and under its
+% database name together with the design document id and signature.
+add_to_ets(Pid, DbName, DDocId, Sig) ->
     true = ets:insert(?BY_PID, {Pid, {DbName, Sig}}),
-    true = ets:insert(?BY_SIG, {{DbName, Sig}, Pid}).
+    true = ets:insert(?BY_SIG, {{DbName, Sig}, Pid}),
+    true = ets:insert(?BY_DB, {DbName, {DDocId, Sig}}).
 
 delete_from_ets(Pid, DbName, Sig) ->
+    % Drop every ?BY_DB entry for this signature, however many there are;
+    % deleting only on an exactly-one match would leave stale entries behind.
+    true = ets:match_delete(?BY_DB, {DbName, {'_', Sig}}),
     true = ets:delete(?BY_PID, Pid),
     true = ets:delete(?BY_SIG, {DbName, Sig}).
 
diff --git a/test/search_index_ddoc_update_tests.erl b/test/search_index_ddoc_update_tests.erl
new file mode 100644
index 0000000..ba338dc
--- /dev/null
+++ b/test/search_index_ddoc_update_tests.erl
@@ -0,0 +1,281 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(search_index_ddoc_update_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include("dreyfus.hrl").
+
+% Design document with two search indexes, "other" and "default".
+-define(DDOC, {[
+    {<<"_id">>, <<"_design/searchddoc">>},
+    {<<"indexes">>, {[
+        {<<"other">>, {[
+            {<<"index">>, <<"function(doc){if(doc.blah){index(\\\"blah\\\", doc.blah);}}">>}
+        ]}},
+        {<<"default">>, {[
+            {<<"index">>, <<"function(doc){if(doc.blah){index(\\\"default\\\", doc.blah);}}">>}
+        ]}}
+    ]}}
+]}).
+
+-define(USER, "admin").
+-define(PASS, "pass").
+-define(AUTH, {basic_auth, {?USER, ?PASS}}).
+
+
+start() ->
+    Ctx = test_util:start_couch([chttpd, dreyfus]),
+    ok = config:set("admins", ?USER, ?PASS, _Persist=false),
+    Ctx.
+
+setup(PortType) ->
+    ok = meck:new(mochiweb_socket, [passthrough]),
+    ok = meck:expect(mochiweb_socket, recv, fun mochiweb_socket_recv/3),
+
+    DbName = ?tempdb(),
+    ok = create_db(PortType, DbName),
+    fake_clouseau(),
+    fake_dreyfus_index(),
+
+    Host = host_url(PortType),
+    upload_ddoc(Host, ?b2l(DbName), ?DDOC),
+    {Host, ?b2l(DbName)}.
+
+teardown(Ctx) ->
+    ok = config:delete("admins", ?USER, _Persist=false),
+    (catch meck:unload(clouseau_rpc)),
+    (catch meck:unload(dreyfus_index)),
+    test_util:stop_couch(Ctx).
+
+teardown(PortType, {_Host, DbName}) ->
+    (catch meck:unload(mochiweb_socket)),
+    (catch meck:unload(clouseau_rpc)),
+    (catch meck:unload(dreyfus_index)),
+    delete_db(PortType, ?l2b(DbName)),
+    ok.
+
+search_test_() ->
+    {
+        "Check search index closes after delete",
+        {
+            setup,
+            fun start/0, fun teardown/1,
+            [
+                make_test_case(clustered, [fun should_stop_indexing_after_deleting_index/2]),
+                make_test_case(clustered, [fun should_stop_indexing_after_deleting_ddoc/2])
+            ]
+        }
+    }.
+
+% Stub every clouseau_rpc call used by these tests with a canned reply.
+fake_clouseau() ->
+    ok = meck:new([clouseau_rpc], [non_strict]),
+    ok = meck:expect(clouseau_rpc, open_index, ['_', '_', '_'], {ok, self()}),
+    ok = meck:expect(clouseau_rpc, await, ['_', '_'], ok),
+    ok = meck:expect(clouseau_rpc, get_update_seq, ['_'], {ok, 10}),
+    ok = meck:expect(clouseau_rpc, info, ['_'], {ok, [{disk_size, 10},
+                                                      {doc_count, 10},
+                                                      {doc_del_count, 0},
+                                                      {pending_seq, 11},
+                                                      {committed_seq, 10}]}),
+    ok = meck:expect(clouseau_rpc, search, ['_', '_'], {ok, [
+        {update_seq, 10},
+        {total_hits, 0},
+        {hits, []}
+    ]}),
+    ok = meck:expect(clouseau_rpc, update, ['_', '_', '_'], ok),
+    ok = meck:expect(clouseau_rpc, commit, ['_', '_'], ok),
+    ok = meck:expect(clouseau_rpc, search, ['_', '_', '_', '_', '_', '_'], {ok, [
+        {update_seq, 10},
+        {total_hits, 0},
+        {hits, []}
+    ]}).
+
+% Stub dreyfus_index:await/2 and search/2; everything else passes through.
+fake_dreyfus_index() ->
+    ok = meck:new([dreyfus_index], [passthrough]),
+    ok = meck:expect(dreyfus_index, await, ['_', '_'], ok),
+    ok = meck:expect(dreyfus_index, search, ['_', '_'], {ok, #top_docs{
+        update_seq = 10,
+        total_hits = 0,
+        hits = []
+    }}).
+
+make_test_case(Mod, Funs) ->
+    {
+        lists:flatten(io_lib:format("~s", [Mod])),
+        {foreachx, fun setup/1, fun teardown/2, [{Mod, Fun} || Fun <- Funs]}
+    }.
+
+should_stop_indexing_after_deleting_index(PortType, {Host, DbName}) ->
+    {timeout, 60,
+        ?_test(begin
+            ok = meck:reset(clouseau_rpc),
+            ok = meck:reset(dreyfus_index),
+            ok = create_doc(PortType, ?l2b(DbName), <<"doc_id">>, {[]}),
+            ReqUrl = Host ++ "/" ++ DbName ++ "/_design/searchddoc/_search/other?q=*:*",
+            {ok, Status, _Headers, _Body} =
+                test_request:get(ReqUrl, [?AUTH]),
+            ?assertEqual(200, Status),
+            DDocId = "_design/searchddoc",
+            Sig = ensure_index_is_opened(DbName, DDocId, "other"),
+
+            % Upload the new DDoc which will delete the search index named "other".
+            % And verify that the index will be closed
+            UpdatedDDocJsonObj = updated_ddoc_json_obj(DbName, ?l2b(DDocId)),
+            upload_ddoc(Host, DbName, UpdatedDDocJsonObj),
+            ensure_index_is_closed(DbName, DDocId, "other", Sig)
+        end)}.
+
+should_stop_indexing_after_deleting_ddoc(PortType, {Host, DbName}) ->
+    {timeout, 60,
+        ?_test(begin
+            ok = meck:reset(clouseau_rpc),
+            ok = meck:reset(dreyfus_index),
+            ok = create_doc(PortType, ?l2b(DbName), <<"doc_id">>, {[]}),
+            ReqUrl = Host ++ "/" ++ DbName ++ "/_design/searchddoc/_search/other?q=*:*",
+            {ok, Status, _Headers, _Body} =
+                test_request:get(ReqUrl, [?AUTH]),
+            ?assertEqual(200, Status),
+            DDocId = "_design/searchddoc",
+            Sig = ensure_index_is_opened(DbName, DDocId, "other"),
+
+            % Delete the design doc and verify that the index will be closed
+            delete_ddoc(Host, DbName, ?l2b(DDocId)),
+            ensure_index_is_closed(DbName, DDocId, "other", Sig)
+        end)}.
+
+% Build the next revision of the design doc, without the "other" index.
+updated_ddoc_json_obj(DbName, DDocId) ->
+    {ok, DDoc} = fabric:open_doc(DbName, <<DDocId/binary>>, [ejson_body, ?ADMIN_CTX]),
+    {[
+        {<<"_id">>, <<"_design/searchddoc">>},
+        {<<"_rev">>, ?b2l(couch_doc:rev_to_str(DDoc#doc.revs))},
+        {<<"indexes">>, {[
+            {<<"default">>, {[
+                {<<"index">>, <<"function(doc){if(doc.blah){index(\\\"default\\\", doc.blah);}}">>}
+            ]}}
+        ]}}
+    ]}.
+
+% Assert that the named index is defined and registered in the index
+% manager's ets tables, and return its signature.
+ensure_index_is_opened(DbName, DDocId, IndexName) ->
+    IndexWithName = get_index(DbName, ?l2b(DDocId), ?l2b(IndexName)),
+    ?assertMatch(#index{}, IndexWithName),
+    #index{sig=Sig} = IndexWithName,
+    IndexPidsFromEts = get_index_by_sig_from_ets(?l2b(DDocId), Sig),
+    ?assertMatch([_ | _], IndexPidsFromEts),
+    Sig.
+
+% Assert that the design doc no longer yields the named index and that the
+% index with signature Sig gets unregistered from the ets tables.
+ensure_index_is_closed(DbName, DDocId, IndexName, Sig) ->
+    NotAnIndex = get_index(DbName, ?l2b(DDocId), ?l2b(IndexName)),
+    ?assertEqual(ok, NotAnIndex),
+    % The shutdown is asynchronous, so poll until the entry disappears.
+    WaitFun = fun() ->
+        case get_index_by_sig_from_ets(?l2b(DDocId), Sig) of
+            [] -> ok;
+            _ -> wait
+        end
+    end,
+    ?assertEqual(ok, test_util:wait(WaitFun)),
+    ok.
+
+% Return the #index{} for IndexName in the design doc, or ok when the doc
+% cannot be opened or does not yield that index.
+get_index(DbName, DDocId, IndexName) ->
+    case fabric:open_doc(DbName, <<DDocId/binary>>, [ejson_body, ?ADMIN_CTX]) of
+        {ok, DDoc} ->
+            case dreyfus_index:design_doc_to_index(DDoc, IndexName) of
+                {ok, Index} ->
+                    Index;
+                _ ->
+                    ok
+            end;
+        _ ->
+            ok
+    end.
+
+% Pids registered in dreyfus_by_sig for DDocId with signature Sig.
+get_index_by_sig_from_ets(DDocId, Sig) ->
+    Indexes = ets:match_object(
+        dreyfus_by_db, {'_', {DDocId, Sig}}),
+    lists:foldl(fun({DbName, {_DDocId, _Sig}}, Acc) ->
+        case ets:lookup(dreyfus_by_sig, {DbName, Sig}) of
+            [{_, Pid}] -> [Pid| Acc];
+            _ -> Acc
+        end
+    end, [], Indexes).
+
+create_doc(clustered, DbName, Id, Body) ->
+    JsonDoc = couch_util:json_apply_field({<<"_id">>, Id}, Body),
+    Doc = couch_doc:from_json_obj(JsonDoc),
+    {ok, _} = fabric:update_docs(DbName, [Doc], [?ADMIN_CTX]),
+    ok.
+
+create_db(clustered, DbName) ->
+    {ok, Status, _, _} = test_request:put(db_url(DbName), [?AUTH], ""),
+    assert_success(create_db, Status),
+    ok.
+
+delete_db(clustered, DbName) ->
+    {ok, Status, _, _} = test_request:delete(db_url(DbName), [?AUTH]),
+    assert_success(delete_db, Status),
+    ok.
+
+assert_success(create_db, Status) ->
+    ?assert(lists:member(Status, [201, 202]));
+assert_success(delete_db, Status) ->
+    ?assert(lists:member(Status, [200, 202])).
+
+host_url(PortType) ->
+    "http://" ++ bind_address(PortType) ++ ":" ++ port(PortType).
+
+bind_address(PortType) ->
+    config:get(section(PortType), "bind_address", "127.0.0.1").
+
+section(clustered) -> "chttpd".
+
+db_url(DbName) when is_binary(DbName) ->
+    db_url(binary_to_list(DbName));
+db_url(DbName) when is_list(DbName) ->
+    host_url(clustered) ++ "/" ++ DbName.
+
+port(clustered) ->
+    integer_to_list(mochiweb_socket_server:get(chttpd, port)).
+
+upload_ddoc(Host, DbName, DDocJson) ->
+    Url = Host ++ "/" ++ DbName ++ "/_design/searchddoc",
+    Body = couch_util:json_encode(DDocJson),
+    {ok, 201, _Resp, _Body} = test_request:put(Url, [?AUTH], Body),
+    ok.
+
+delete_ddoc(Host, DbName, DDocId) ->
+    {ok, DDoc} = fabric:open_doc(DbName, <<DDocId/binary>>, [ejson_body, ?ADMIN_CTX]),
+    Url = Host ++ "/" ++ DbName ++ "/_design/searchddoc" ++ "?rev=" ++ ?b2l(couch_doc:rev_to_str(DDoc#doc.revs)),
+    {ok, 200, _, _} = test_request:delete(Url, [?AUTH]),
+    ok.
+
+% Report a closed socket when the received payload is exactly "{truncated}".
+mochiweb_socket_recv(Sock, Len, Timeout) ->
+    case meck:passthrough([Sock, Len, Timeout]) of
+        {ok, <<"{truncated}">>} ->
+            {error, closed};
+        {ok, Data} ->
+            {ok, Data};
+        Else ->
+            Else
+    end.