diff --git a/share/search/daemon.py b/share/search/daemon.py
index 017fede33..7d0024a5c 100644
--- a/share/search/daemon.py
+++ b/share/search/daemon.py
@@ -61,8 +61,6 @@ def start_daemonthreads_for_strategy(self, index_strategy):
             index_strategy=index_strategy,
             message_callback=_daemon.on_message,
         )
-        # give the daemon a more robust callback for ack-ing
-        _daemon.ack_callback = _consumer.ensure_ack
         # spin up daemonthreads, ready for messages
         self._daemonthreads.extend(_daemon.start())
         # start a thread to consume messages from this strategy's queues
@@ -130,28 +128,9 @@ def consume(self, *args, **kwargs):
         consume = self.connection.ensure(self.connection, super().consume)
         return consume(*args, **kwargs)
 
-    def ensure_ack(self, daemon_message: messages.DaemonMessage):
-        # if the connection the message came thru is no longer usable,
-        # use `kombu.Connection.autoretry` to revive it for an ack
-        try:
-            daemon_message.ack()
-        except (ConnectionError, amqp.exceptions.ConnectionError):
-            @self.connection.autoretry
-            def _do_ack(*, channel):
-                try:
-                    channel.basic_ack(daemon_message.kombu_message.delivery_tag)
-                finally:
-                    channel.close()
-            _do_ack()
-
-
-def _default_ack_callback(daemon_message: messages.DaemonMessage) -> None:
-    daemon_message.ack()
-
 
 class IndexerDaemon:
     MAX_LOCAL_QUEUE_SIZE = 5000
-    ack_callback: Callable[[messages.DaemonMessage], None]
 
     def __init__(self, index_strategy, *, stop_event=None, daemonthread_context=None):
         self.stop_event = (
@@ -163,7 +142,6 @@ def __init__(self, index_strategy, *, stop_event=None, daemonthread_context=None
         self.__daemonthread_context = daemonthread_context or contextlib.nullcontext
         self.__local_message_queues = {}
         self.__started = False
-        self.ack_callback = _default_ack_callback
 
     def start(self) -> list[threading.Thread]:
         if self.__started:
@@ -192,7 +170,6 @@ def start_typed_loop_and_queue(self, message_type) -> threading.Thread:
             local_message_queue=_queue_from_rabbit_to_daemon,
             log_prefix=f'{repr(self)} MessageHandlingLoop: ',
             daemonthread_context=self.__daemonthread_context,
-            ack_callback=self.ack_callback,
         )
         return _handling_loop.start_thread()
 
@@ -226,7 +203,6 @@ class MessageHandlingLoop:
     local_message_queue: queue.Queue
     log_prefix: str
    daemonthread_context: Callable[[], contextlib.AbstractContextManager]
-    ack_callback: Callable[[messages.DaemonMessage], None]
     _leftover_daemon_messages_by_target_id = None
 
     def __post_init__(self):
@@ -310,7 +286,7 @@ def _handle_some_messages(self):
                 sentry_sdk.capture_message('error handling message', extras={'message_response': message_response})
             target_id = message_response.index_message.target_id
             for daemon_message in daemon_messages_by_target_id.pop(target_id, ()):
-                self.ack_callback(daemon_message)
+                daemon_message.ack()  # finally set it free
         if daemon_messages_by_target_id:  # should be empty by now
             logger.error('%sUnhandled messages??\n%s', self.log_prefix, len(daemon_messages_by_target_id))
             sentry_sdk.capture_message(
diff --git a/share/search/index_strategy/trovesearch_denorm.py b/share/search/index_strategy/trovesearch_denorm.py
index 8bbbbc7c5..414362598 100644
--- a/share/search/index_strategy/trovesearch_denorm.py
+++ b/share/search/index_strategy/trovesearch_denorm.py
@@ -2,6 +2,7 @@
 from collections import abc, defaultdict
 import dataclasses
 import functools
+import itertools
 import json
 import logging
 import re
@@ -154,16 +155,25 @@ def _paths_and_values_mappings(self):
 
     # override method from Elastic8IndexStrategy
     def after_chunk(self, messages_chunk: messages.MessagesChunk, indexnames: Iterable[str]):
-        # refresh to avoid delete-by-query conflicts
-        self.es8_client.indices.refresh(index=','.join(indexnames))
-        # delete any docs that belong to cards in this chunk but weren't touched by indexing
-        self.es8_client.delete_by_query(
-            index=list(indexnames),
-            query={'bool': {'must': [
-                {'terms': {'card.card_pk': messages_chunk.target_ids_chunk}},
-                {'range': {'chunk_timestamp': {'lt': messages_chunk.timestamp}}},
-            ]}},
-        )
+        for _trycount in itertools.count(1):  # keep trying until it works
+            # delete any docs that belong to cards in this chunk but weren't touched by indexing
+            _delete_resp = self.es8_client.delete_by_query(
+                index=list(indexnames),
+                query={'bool': {'must': [
+                    {'terms': {'card.card_pk': messages_chunk.target_ids_chunk}},
+                    {'range': {'chunk_timestamp': {'lt': messages_chunk.timestamp}}},
+                ]}},
+                params={
+                    'slices': 'auto',
+                    'conflicts': 'proceed',  # count conflicts instead of halting
+                },
+            )
+            if _delete_resp.version_conflicts:
+                # refresh to avoid further conflicts and try again
+                self.es8_client.indices.refresh(index=','.join(indexnames))
+            else:  # success!
+                logger.debug('%s: after_chunk succeeded after %s tries', self, _trycount)
+                return
 
     # abstract method from Elastic8IndexStrategy
     def build_elastic_actions(self, messages_chunk: messages.MessagesChunk):
diff --git a/share/search/messages.py b/share/search/messages.py
index 5ba2e466a..9a816f67a 100644
--- a/share/search/messages.py
+++ b/share/search/messages.py
@@ -6,6 +6,8 @@
 import time
 import typing
 
+import amqp.exceptions
+
 from share.search import exceptions
 from share.util import chunked
 
@@ -142,7 +144,12 @@ def __init__(self, *, kombu_message=None):
     def ack(self):
         if self.kombu_message is None:
             raise exceptions.DaemonMessageError('ack! called DaemonMessage.ack() but there is nothing to ack')
-        return self.kombu_message.ack()
+        try:
+            self.kombu_message.ack()
+        except (ConnectionError, amqp.exceptions.ConnectionError):
+            # acks must be on the same channel the message was received on --
+            # if the channel failed, oh well, the message already got requeued
+            pass
 
     def requeue(self):
         if self.kombu_message is None: