Skip to content

Commit

Permalink
Merge pull request #836 from aaxelb/fix/indexer-fallback-failure
Browse files Browse the repository at this point in the history
[ENG-6654] fixfix: correct misunderstanding, handle conflicts
  • Loading branch information
aaxelb authored Dec 5, 2024
2 parents c85fb0d + 0880650 commit 317b7dc
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 67 deletions.
4 changes: 3 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ services:

worker:
image: quay.io/centerforopenscience/share:develop
command: /usr/local/bin/celery --app project worker --uid daemon -l INFO
command:
chown -R daemon:daemon /elastic8_certs/ &&
/usr/local/bin/celery --app project worker --uid daemon -l INFO
depends_on:
- postgres
- rabbitmq
Expand Down
1 change: 1 addition & 0 deletions project/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ def split(string, delim):
'TIMEOUT': int(os.environ.get('ELASTICSEARCH_TIMEOUT', '45')),
'CHUNK_SIZE': int(os.environ.get('ELASTICSEARCH_CHUNK_SIZE', 2000)),
'MAX_RETRIES': int(os.environ.get('ELASTICSEARCH_MAX_RETRIES', 7)),
'POST_INDEX_DELAY': int(os.environ.get('ELASTICSEARCH_POST_INDEX_DELAY', 3)),
}
ELASTICSEARCH5_URL = (
os.environ.get('ELASTICSEARCH5_URL')
Expand Down
31 changes: 3 additions & 28 deletions share/search/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import threading
import time

import amqp.exceptions
from django.conf import settings
import kombu
from kombu.mixins import ConsumerMixin
Expand Down Expand Up @@ -61,8 +60,6 @@ def start_daemonthreads_for_strategy(self, index_strategy):
index_strategy=index_strategy,
message_callback=_daemon.on_message,
)
# give the daemon a more robust callback for ack-ing
_daemon.ack_callback = _consumer.ensure_ack
# spin up daemonthreads, ready for messages
self._daemonthreads.extend(_daemon.start())
# start a thread to consume messages from this strategy's queues
Expand All @@ -82,7 +79,7 @@ def stop_daemonthreads(self, *, wait=False):


class KombuMessageConsumer(ConsumerMixin):
PREFETCH_COUNT = 7500
PREFETCH_COUNT = settings.ELASTICSEARCH['CHUNK_SIZE']

should_stop: bool # (from ConsumerMixin)

Expand Down Expand Up @@ -130,28 +127,9 @@ def consume(self, *args, **kwargs):
consume = self.connection.ensure(self.connection, super().consume)
return consume(*args, **kwargs)

def ensure_ack(self, daemon_message: messages.DaemonMessage):
# if the connection the message came thru is no longer usable,
# use `kombu.Connection.autoretry` to revive it for an ack
try:
daemon_message.ack()
except (ConnectionError, amqp.exceptions.ConnectionError):
@self.connection.autoretry
def _do_ack(*, channel):
try:
channel.basic_ack(daemon_message.kombu_message.delivery_tag)
finally:
channel.close()
_do_ack()


def _default_ack_callback(daemon_message: messages.DaemonMessage) -> None:
daemon_message.ack()


class IndexerDaemon:
MAX_LOCAL_QUEUE_SIZE = 5000
ack_callback: Callable[[messages.DaemonMessage], None]
MAX_LOCAL_QUEUE_SIZE = settings.ELASTICSEARCH['CHUNK_SIZE']

def __init__(self, index_strategy, *, stop_event=None, daemonthread_context=None):
self.stop_event = (
Expand All @@ -163,7 +141,6 @@ def __init__(self, index_strategy, *, stop_event=None, daemonthread_context=None
self.__daemonthread_context = daemonthread_context or contextlib.nullcontext
self.__local_message_queues = {}
self.__started = False
self.ack_callback = _default_ack_callback

def start(self) -> list[threading.Thread]:
if self.__started:
Expand Down Expand Up @@ -192,7 +169,6 @@ def start_typed_loop_and_queue(self, message_type) -> threading.Thread:
local_message_queue=_queue_from_rabbit_to_daemon,
log_prefix=f'{repr(self)} MessageHandlingLoop: ',
daemonthread_context=self.__daemonthread_context,
ack_callback=self.ack_callback,
)
return _handling_loop.start_thread()

Expand Down Expand Up @@ -226,7 +202,6 @@ class MessageHandlingLoop:
local_message_queue: queue.Queue
log_prefix: str
daemonthread_context: Callable[[], contextlib.AbstractContextManager]
ack_callback: Callable[[messages.DaemonMessage], None]
_leftover_daemon_messages_by_target_id = None

def __post_init__(self):
Expand Down Expand Up @@ -310,7 +285,7 @@ def _handle_some_messages(self):
sentry_sdk.capture_message('error handling message', extras={'message_response': message_response})
target_id = message_response.index_message.target_id
for daemon_message in daemon_messages_by_target_id.pop(target_id, ()):
self.ack_callback(daemon_message)
daemon_message.ack() # finally set it free
if daemon_messages_by_target_id: # should be empty by now
logger.error('%sUnhandled messages?? %s', self.log_prefix, len(daemon_messages_by_target_id))
sentry_sdk.capture_message(
Expand Down
69 changes: 47 additions & 22 deletions share/search/index_strategy/elastic8.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,11 @@ def index_mappings(self):
raise NotImplementedError

@abc.abstractmethod
def build_elastic_actions(self, messages_chunk: messages.MessagesChunk) -> typing.Iterable[tuple[int, dict]]:
# yield (message_target_id, elastic_action) pairs
def build_elastic_actions(
self,
messages_chunk: messages.MessagesChunk,
) -> typing.Iterable[tuple[int, dict | typing.Iterable[dict]]]:
# yield (message_target_id, [elastic_action, ...]) pairs
raise NotImplementedError

def before_chunk(
Expand Down Expand Up @@ -148,10 +151,17 @@ def pls_handle_messages_chunk(self, messages_chunk):
_indexname = _response_body['_index']
_is_done = _ok or (_op_type == 'delete' and _status == 404)
if _is_done:
_action_tracker.action_done(_indexname, _docid)
_finished_message_id = _action_tracker.action_done(_indexname, _docid)
if _finished_message_id is not None:
yield messages.IndexMessageResponse(
is_done=True,
index_message=messages.IndexMessage(messages_chunk.message_type, _finished_message_id),
status_code=HTTPStatus.OK.value,
error_text=None,
)
_action_tracker.forget_message(_finished_message_id)
else:
_action_tracker.action_errored(_indexname, _docid)
# yield error responses immediately
yield messages.IndexMessageResponse(
is_done=False,
index_message=messages.IndexMessage(
Expand All @@ -161,16 +171,14 @@ def pls_handle_messages_chunk(self, messages_chunk):
status_code=_status,
error_text=str(_response_body),
)
self.after_chunk(messages_chunk, _indexnames)
# yield successes after the whole chunk completes
# (since one message may involve several actions)
for _messageid in _action_tracker.all_done_messages():
for _message_id in _action_tracker.remaining_done_messages():
yield messages.IndexMessageResponse(
is_done=True,
index_message=messages.IndexMessage(messages_chunk.message_type, _messageid),
index_message=messages.IndexMessage(messages_chunk.message_type, _message_id),
status_code=HTTPStatus.OK.value,
error_text=None,
)
self.after_chunk(messages_chunk, _indexnames)

# abstract method from IndexStrategy
def pls_make_default_for_searching(self, specific_index: IndexStrategy.SpecificIndex):
Expand Down Expand Up @@ -202,14 +210,18 @@ def _alias_for_keeping_live(self):
def _elastic_actions_with_index(self, messages_chunk, indexnames, action_tracker: _ActionTracker):
if not indexnames:
raise ValueError('cannot index to no indexes')
for _message_target_id, _elastic_action in self.build_elastic_actions(messages_chunk):
_docid = _elastic_action['_id']
for _indexname in indexnames:
action_tracker.add_action(_message_target_id, _indexname, _docid)
yield {
**_elastic_action,
'_index': _indexname,
}
for _message_target_id, _elastic_actions in self.build_elastic_actions(messages_chunk):
if isinstance(_elastic_actions, dict): # allow a single action
_elastic_actions = [_elastic_actions]
for _elastic_action in _elastic_actions:
_docid = _elastic_action['_id']
for _indexname in indexnames:
action_tracker.add_action(_message_target_id, _indexname, _docid)
yield {
**_elastic_action,
'_index': _indexname,
}
action_tracker.done_scheduling(_message_target_id)

def _get_indexnames_for_alias(self, alias_name) -> set[str]:
try:
Expand Down Expand Up @@ -371,24 +383,37 @@ class _ActionTracker:
default_factory=lambda: collections.defaultdict(set),
)
errored_messageids: set[int] = dataclasses.field(default_factory=set)
fully_scheduled_messageids: set[int] = dataclasses.field(default_factory=set)

def add_action(self, message_id: int, index_name: str, doc_id: str):
self.messageid_by_docid[doc_id] = message_id
self.actions_by_messageid[message_id].add((index_name, doc_id))

def action_done(self, index_name: str, doc_id: str):
_messageid = self.messageid_by_docid[doc_id]
_message_actions = self.actions_by_messageid[_messageid]
_message_actions.discard((index_name, doc_id))
def action_done(self, index_name: str, doc_id: str) -> int | None:
_messageid = self.get_message_id(doc_id)
_remaining_message_actions = self.actions_by_messageid[_messageid]
_remaining_message_actions.discard((index_name, doc_id))
# return the message id only if this was the last action for that message
return (
None
if _remaining_message_actions or (_messageid not in self.fully_scheduled_messageids)
else _messageid
)

def action_errored(self, index_name: str, doc_id: str):
_messageid = self.messageid_by_docid[doc_id]
self.errored_messageids.add(_messageid)

def done_scheduling(self, message_id: int):
self.fully_scheduled_messageids.add(message_id)

def forget_message(self, message_id: int):
del self.actions_by_messageid[message_id]

def get_message_id(self, doc_id: str):
return self.messageid_by_docid[doc_id]

def all_done_messages(self):
def remaining_done_messages(self):
for _messageid, _actions in self.actions_by_messageid.items():
if _messageid not in self.errored_messageids:
assert not _actions
Expand Down
73 changes: 60 additions & 13 deletions share/search/index_strategy/trovesearch_denorm.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
Literal,
)

import celery
from django.conf import settings
import elasticsearch8
from primitive_metadata import primitive_rdf as rdf
Expand Down Expand Up @@ -154,15 +155,14 @@ def _paths_and_values_mappings(self):

# override method from Elastic8IndexStrategy
def after_chunk(self, messages_chunk: messages.MessagesChunk, indexnames: Iterable[str]):
# refresh to avoid delete-by-query conflicts
self.es8_client.indices.refresh(index=','.join(indexnames))
# delete any docs that belong to cards in this chunk but weren't touched by indexing
self.es8_client.delete_by_query(
index=list(indexnames),
query={'bool': {'must': [
{'terms': {'card.card_pk': messages_chunk.target_ids_chunk}},
{'range': {'chunk_timestamp': {'lt': messages_chunk.timestamp}}},
]}},
task__delete_iri_value_scraps.apply_async(
kwargs={
'index_strategy_name': self.name,
'indexnames': list(indexnames),
'card_pks': messages_chunk.target_ids_chunk,
'timestamp': messages_chunk.timestamp,
},
countdown=settings.ELASTICSEARCH['POST_INDEX_DELAY'],
)

# abstract method from Elastic8IndexStrategy
Expand All @@ -173,12 +173,13 @@ def build_elastic_actions(self, messages_chunk: messages.MessagesChunk):
_docbuilder = self._SourcedocBuilder(_indexcard_rdf, messages_chunk.timestamp)
if not _docbuilder.should_skip(): # if skipped, will be deleted
_indexcard_pk = _indexcard_rdf.indexcard_id
for _doc_id, _doc in _docbuilder.build_docs():
_index_action = self.build_index_action(
yield _indexcard_pk, (
self.build_index_action(
doc_id=_doc_id,
doc_source=_doc,
)
yield _indexcard_pk, _index_action
for _doc_id, _doc in _docbuilder.build_docs()
)
_remaining_indexcard_pks.discard(_indexcard_pk)
# delete any that were skipped for any reason
for _indexcard_pk in _remaining_indexcard_pks:
Expand Down Expand Up @@ -279,7 +280,10 @@ def should_skip(self) -> bool:

def build_docs(self) -> Iterator[tuple[str, dict]]:
# index once without `iri_value`
yield self._doc_id(), {'card': self._card_subdoc}
yield self._doc_id(), {
'card': self._card_subdoc,
'chunk_timestamp': self.chunk_timestamp,
}
for _iri in self._fullwalk.paths_by_iri:
yield self._doc_id(_iri), {
'card': self._card_subdoc,
Expand Down Expand Up @@ -888,3 +892,46 @@ def _any_query(queries: abc.Collection[dict]):
(_query,) = queries
return _query
return {'bool': {'should': list(queries), 'minimum_should_match': 1}}


@celery.shared_task(
    name='share.search.index_strategy.trovesearch_denorm.task__delete_iri_value_scraps',
    max_retries=None,  # retries only on delete_by_query conflicts -- should work eventually!
    retry_backoff=True,  # NOTE: celery honors this only for `autoretry_for` retries (see below)
    bind=True,  # for explicit retry
)
def task__delete_iri_value_scraps(
    task: celery.Task,
    index_strategy_name: str,
    card_pks: list[int],
    indexnames: list[str],
    timestamp: int,
):
    '''followup task to delete value-docs no longer present

    each time an index-card is updated, value-docs are created (or updated) for each
    iri value present in the card's contents -- if some values are absent from a later
    update, the corresponding docs will remain untouched

    this task deletes those untouched value-docs after the index has refreshed at its
    own pace (allowing a slightly longer delay for items to _stop_ matching queries
    for removed values)

    @param task: the bound celery task (used for explicit retry)
    @param index_strategy_name: name passed to `get_index_strategy`
    @param card_pks: matched against the `card.card_pk` field of indexed docs
    @param indexnames: indexes to delete from
    @param timestamp: docs with `chunk_timestamp` less than this are deleted
    @raises celery.exceptions.Retry: when the delete reported version conflicts
    '''
    # imported here rather than at module level (presumably to avoid a circular import)
    from share.search.index_strategy import get_index_strategy

    _index_strategy = get_index_strategy(index_strategy_name)
    assert isinstance(_index_strategy, Elastic8IndexStrategy)
    # delete any docs that belong to cards in this chunk but weren't touched by indexing
    _delete_resp = _index_strategy.es8_client.delete_by_query(
        index=indexnames,
        query={'bool': {'must': [
            {'terms': {'card.card_pk': card_pks}},
            {'range': {'chunk_timestamp': {'lt': timestamp}}},
        ]}},
        params={
            'slices': 'auto',
            'conflicts': 'proceed',  # count conflicts instead of halting
            'request_cache': False,
        },
    )
    _conflict_count = _delete_resp.get('version_conflicts', 0)
    if _conflict_count > 0:
        # `retry_backoff` applies only to automatic (`autoretry_for`) retries; an explicit
        # `task.retry()` without `countdown` waits the fixed `default_retry_delay` every
        # time -- so compute the exponential backoff here (1s, 2s, 4s, ... capped at 600s)
        _max_delay_seconds = 600
        _retries_so_far = task.request.retries
        # cap the exponent too, since `max_retries=None` lets the retry count grow unbounded
        _countdown = min(_max_delay_seconds, 2 ** min(_retries_so_far, 10))
        raise task.retry(countdown=_countdown)
2 changes: 1 addition & 1 deletion share/search/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def __init__(self, *, kombu_message=None):
def ack(self):
if self.kombu_message is None:
raise exceptions.DaemonMessageError('ack! called DaemonMessage.ack() but there is nothing to ack')
return self.kombu_message.ack()
self.kombu_message.ack()

def requeue(self):
if self.kombu_message is None:
Expand Down
10 changes: 10 additions & 0 deletions tests/share/search/index_strategy/_with_real_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ def setUp(self):
super().setUp()
self.enterContext(mock.patch('share.models.core._setup_user_token_and_groups'))
self.index_strategy = self.get_index_strategy()

def _fake_get_index_strategy(name):
if self.index_strategy.name == name:
return self.index_strategy
raise ValueError(f'unknown index strategy in test: {name}')

self.enterContext(mock.patch(
'share.search.index_strategy.get_index_strategy',
new=_fake_get_index_strategy,
))
self.index_messenger = IndexMessenger(
celery_app=celery_app,
index_strategys=[self.index_strategy],
Expand Down
24 changes: 22 additions & 2 deletions tests/share/search/index_strategy/test_trovesearch_denorm.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,29 @@
from share.search.index_strategy.trovesearch_denorm import TrovesearchDenormIndexStrategy
from unittest import mock

from share.search.index_strategy.trovesearch_denorm import (
TrovesearchDenormIndexStrategy,
task__delete_iri_value_scraps,
)

from . import _common_trovesearch_tests


class TestTroveIndexcardFlats(_common_trovesearch_tests.CommonTrovesearchTests):
class TestTrovesearchDenorm(_common_trovesearch_tests.CommonTrovesearchTests):
def setUp(self):
super().setUp()

# make the followup delete task eager
def _fake_apply_async(*args, **kwargs):
kwargs['countdown'] = 0 # don't wait
task__delete_iri_value_scraps.apply(*args, **kwargs)
self.enterContext(
mock.patch.object(
task__delete_iri_value_scraps,
'apply_async',
new=_fake_apply_async,
)
)

# for RealElasticTestCase
def get_index_strategy(self):
return TrovesearchDenormIndexStrategy('test_trovesearch_denorm')

0 comments on commit 317b7dc

Please sign in to comment.