Implement asynchronous publisher #1050

Merged 13 commits on Jan 24, 2024
Changes from all commits
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
@@ -21,6 +21,7 @@ jobs:
TEST_API_URL: http://localhost:${{ job.services.httpbin.ports[80] }}/anything/
# For requests.post() in KingfisherProcessAPI2._post_synchronous().
run: pytest -W error -W ignore::ResourceWarning -rs --cov kingfisher_scrapy
- run: python test_delayed_request_middleware.py
- env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: coveralls --service=github
1 change: 1 addition & 0 deletions .github/workflows/nonlinux.yml
@@ -20,3 +20,4 @@ jobs:
- env:
CI_SKIP: true
run: pytest -W error -rs --cov kingfisher_scrapy
- run: python test_delayed_request_middleware.py
1 change: 1 addition & 0 deletions kingfisher_scrapy/downloadermiddlewares.py
@@ -108,6 +108,7 @@ def process_request(self, request, spider):
if delay:
from twisted.internet import reactor

# Simulate a sleep.
d = Deferred()
reactor.callLater(delay, d.callback, None)
return d
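
The hunk above is the core of what the new `test_delayed_request_middleware.py` step added to both CI workflows exercises: `process_request` returns a Deferred whose callback is scheduled with `reactor.callLater`, so the request is delayed without blocking the reactor. The sketch below shows the same pattern as a self-contained middleware; the class name and the `wait_time` meta key are illustrative, not taken from this repository.

```python
# A minimal sketch of the delay pattern, not the project's DelayedRequestMiddleware:
# the class name and the 'wait_time' meta key are illustrative.
from twisted.internet.defer import Deferred


class DelayMiddleware:
    def process_request(self, request, spider):
        # Import lazily, so that importing this module doesn't install a reactor.
        from twisted.internet import reactor

        delay = request.meta.get('wait_time')
        if delay:
            # Scrapy waits on the returned Deferred before downloading the request,
            # so the delay is applied without blocking the reactor.
            d = Deferred()
            reactor.callLater(delay, d.callback, None)
            return d
        return None
```

Running the test as a standalone script, rather than inside the pytest session, presumably sidesteps the fact that a Twisted reactor can't be restarted once stopped, as a comment later in this diff notes.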
121 changes: 61 additions & 60 deletions kingfisher_scrapy/extensions/kingfisher_process_api2.py
@@ -1,54 +1,45 @@
import functools
import json
import logging
from urllib.parse import parse_qs, urlencode, urljoin, urlsplit
import threading
from urllib.parse import urljoin

import pika
import requests
from scrapy import signals
from scrapy.exceptions import NotConfigured
from yapw import methods
from yapw.clients import Async

from kingfisher_scrapy.items import FileError, PluckedItem


class Client(Async):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.ready = False

def exchange_ready(self):
self.ready = True

def reset(self):
super().reset()
self.ready = False


class KingfisherProcessAPI2:
"""
If the ``KINGFISHER_API2_URL`` environment variable or configuration setting is set,
then messages are sent to a Kingfisher Process API for the ``item_scraped`` and ``spider_closed`` signals.
"""

ITEMS_SENT_POST = 'kingfisher_process_items_sent_post'
ITEMS_FAILED_POST = 'kingfisher_process_items_failed_post'

ITEMS_SENT_RABBIT = 'kingfisher_process_items_sent_rabbit'
ITEMS_FAILED_RABBIT = 'kingfisher_process_items_failed_rabbit'

def __init__(self, url, stats, rabbit_url, rabbit_exchange_name, rabbit_routing_key):
self.url = url
self.stats = stats
self.exchange = rabbit_exchange_name
self.routing_key = rabbit_routing_key
self.client = Client(url=rabbit_url, exchange=rabbit_exchange_name, routing_key_template="{routing_key}")

# The collection ID is set by the spider_opened handler.
self.collection_id = None

# To avoid DEBUG-level messages from pika
logging.getLogger('pika').setLevel(logging.WARNING)

# Add query string parameters to the RabbitMQ URL.
parsed = urlsplit(rabbit_url)
query = parse_qs(parsed.query)
# NOTE: Heartbeat should not be disabled.
# https://github.com/open-contracting/data-registry/issues/140
query.update({'blocked_connection_timeout': 1800, 'heartbeat': 0})
self.rabbit_url = parsed._replace(query=urlencode(query, doseq=True)).geturl()

self.open_connection_and_channel()
self.channel.exchange_declare(exchange=self.exchange, durable=True, exchange_type='direct')

def open_connection_and_channel(self):
self.connection = pika.BlockingConnection(pika.URLParameters(self.rabbit_url))
self.channel = self.connection.channel()

@classmethod
def from_crawler(cls, crawler):
url = crawler.settings['KINGFISHER_API2_URL']
@@ -95,21 +86,48 @@ def spider_opened(self, spider):
response = self._post_synchronous(spider, 'api/v1/create_collection', data)

if response.ok:
from twisted.internet import reactor

self.collection_id = response.json()['collection_id']
# WARNING! If this log message is changed, then a regular expression in data_registry/cbom/task/scrape.py
# in the open-contracting/data-registry repository must be updated to match.

# WARNING! If this log message is changed, update the regular expression in the data_registry/
# process_manager/task/collect.py file in the open-contracting/data-registry repository to match.
spider.logger.info('Created collection %d in Kingfisher Process', self.collection_id)

# Connect to RabbitMQ only if a collection_id is set, since the other signal handlers otherwise don't use RabbitMQ.
self.client.connect()

# threading.Thread(target=cb) is used instead of reactor.callInThread(cb), because the latter is hard to
# test: the reactor must be running for the callback passed to callInThread() to run, and it must stop for
# a test to return, but the reactor isn't restartable, so testing seems impossible.
self.thread = threading.Thread(target=self.client.connection.ioloop.run_forever)
self.thread.start()

# Ensure the RabbitMQ connection is closed, if an unclean shutdown is forced.
reactor.addSystemEventTrigger('before', 'shutdown', self.disconnect_and_join)
else:
self._response_error(spider, 'Failed to create collection', response)

def disconnect_and_join(self):
"""
Closes the RabbitMQ connection and joins the client's thread.
"""
cb = functools.partial(self._when_ready, self.client.interrupt)
methods.add_callback_threadsafe(self.client.connection, cb)

# Join last, to avoid blocking before scheduling interrupt.
self.thread.join()

def spider_closed(self, spider, reason):
"""
Sends an API request to close the collection in Kingfisher Process.
"""
if spider.pluck or spider.kingfisher_process_keep_collection_open:
if not self.collection_id:
return

if not self.collection_id:
self.disconnect_and_join()

if spider.pluck or spider.kingfisher_process_keep_collection_open:
return

response = self._post_synchronous(spider, 'api/v1/close_collection', {
@@ -123,16 +141,14 @@ def spider_closed(self, spider, reason):
else:
self._response_error(spider, 'Failed to close collection', response)

self.connection.close()

def item_scraped(self, item, spider):
"""
Sends either a RabbitMQ or API request to store the file, file item or file error in Kingfisher Process.
Publishes a RabbitMQ message to store the file, file item or file error in Kingfisher Process.
"""
if isinstance(item, PluckedItem):
if not self.collection_id:
return

if not self.collection_id:
if isinstance(item, PluckedItem):
return

data = {
@@ -145,24 +161,8 @@ def item_scraped(self, item, spider):
else:
data['path'] = item['path']

for attempt in range(1, 4):
try:
self._publish_to_rabbit(data)
# This error is caused by another error, which might have been caught and logged by pika in another
# thread: for example, if RabbitMQ crashes due to insufficient memory, the connection is reset.
except pika.exceptions.ChannelWrongStateError as e:
spider.logger.warning('Retrying to publish message to RabbitMQ (failed %d times): %s', attempt, e)
self.open_connection_and_channel()
except Exception:
spider.logger.exception('Failed to publish message to RabbitMQ')
self.stats.inc_value(self.ITEMS_FAILED_RABBIT)
break
else:
self.stats.inc_value(self.ITEMS_SENT_RABBIT)
break
else:
spider.logger.error('Failed to publish message to RabbitMQ (failed 3 times)')
self.stats.inc_value(self.ITEMS_FAILED_RABBIT)
cb = functools.partial(self._when_ready, self.client.publish, data, self.routing_key)
methods.add_callback_threadsafe(self.client.connection, cb)

def _post_synchronous(self, spider, path, data):
"""
@@ -172,11 +172,12 @@ def _post_synchronous(self, spider, path, data):
spider.logger.debug('Sending synchronous request to Kingfisher Process at %s with %s', url, data)
return requests.post(url, json=data)

# This method is extracted so that it can be mocked in tests.
def _publish_to_rabbit(self, message):
# https://www.rabbitmq.com/publishers.html#message-properties
self.channel.basic_publish(exchange=self.exchange, routing_key=self.routing_key, body=json.dumps(message),
properties=pika.BasicProperties(delivery_mode=2, content_type='application/json'))
def _when_ready(self, function, *args):
# Scrapy can sometimes fire signals before yapw has reached exchange_ready.
if self.client.ready:
function(*args)
else:
self.client.connection.ioloop.call_soon(self._when_ready, function, *args)

def _response_error(self, spider, message, response):
spider.logger.error('%s: HTTP %d (%s) (%s)', message, response.status_code, response.text, response.headers)
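
Taken together, the changes to this extension replace the blocking pika connection with yapw's asynchronous client: the AMQP ioloop runs in its own thread, started in `spider_opened`, and both publishing and shutdown are handed to that thread with `methods.add_callback_threadsafe`, with `_when_ready` rescheduling itself on the ioloop until `exchange_ready` has fired. Below is a condensed, standalone sketch of that pattern, assuming yapw 0.1.4 as pinned below; the URL, exchange name, routing key and message body are placeholders.

```python
# A condensed sketch of the publishing pattern used by KingfisherProcessAPI2 above,
# assuming yapw 0.1.4; the URL, exchange, routing key and message are placeholders.
import functools
import threading

from yapw import methods
from yapw.clients import Async


class Client(Async):
    """Track whether the exchange has been declared, as in the extension above."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.ready = False

    def exchange_ready(self):
        self.ready = True


def when_ready(client, function, *args):
    # Reschedule on the ioloop until exchange_ready() has run, then do the work there.
    if client.ready:
        function(*args)
    else:
        client.connection.ioloop.call_soon(when_ready, client, function, *args)


client = Client(
    url='amqp://localhost', exchange='kingfisher_process', routing_key_template='{routing_key}'
)
client.connect()

# Run the AMQP ioloop in its own thread, so publishing doesn't block the caller.
thread = threading.Thread(target=client.connection.ioloop.run_forever)
thread.start()

# Publish from the main thread by handing the work to the ioloop thread.
message = {'collection_id': 1, 'url': 'http://example.com'}
cb = functools.partial(when_ready, client, client.publish, message, 'api')
methods.add_callback_threadsafe(client.connection, cb)

# Shut down: interrupt the client on its own thread, then join.
methods.add_callback_threadsafe(client.connection, functools.partial(when_ready, client, client.interrupt))
thread.join()
```

Handing callbacks to the ioloop thread instead of touching the connection directly keeps all channel operations on one thread, which pika's asynchronous connection adapters require.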
12 changes: 5 additions & 7 deletions kingfisher_scrapy/settings.py
@@ -84,7 +84,7 @@
# `FilesStore` must run before `KingfisherProcessAPI2`, because the file needs to be written before the
# request is sent to Kingfisher Process.
'kingfisher_scrapy.extensions.FilesStore': 100,
'kingfisher_scrapy.extensions.KingfisherProcessAPI2': 501,
'kingfisher_scrapy.extensions.KingfisherProcessAPI2': 500,
'kingfisher_scrapy.extensions.ItemCount': 600,
'kingfisher_scrapy.extensions.DatabaseStore': 700,
}
@@ -105,9 +105,9 @@
# To send items to Kingfisher Process (version 2). If the API has basic authentication, add the username and password
# to the URL, like http://user:pass@localhost:8000
KINGFISHER_API2_URL = os.getenv('KINGFISHER_API2_URL')
RABBIT_URL = os.getenv("RABBIT_URL")
RABBIT_EXCHANGE_NAME = os.getenv("RABBIT_EXCHANGE_NAME")
RABBIT_ROUTING_KEY = os.getenv("RABBIT_ROUTING_KEY")
RABBIT_URL = os.getenv('RABBIT_URL')
RABBIT_EXCHANGE_NAME = os.getenv('RABBIT_EXCHANGE_NAME')
RABBIT_ROUTING_KEY = os.getenv('RABBIT_ROUTING_KEY')

LOG_FORMATTER = 'kingfisher_scrapy.log_formatter.LogFormatter'

@@ -148,7 +148,7 @@
#HTTPCACHE_STORAGE = 'scrapy.extensions.httpcache.FilesystemCacheStorage'

# Set settings whose default value is deprecated to a future-proof value
REQUEST_FINGERPRINTER_IMPLEMENTATION = "2.7"
REQUEST_FINGERPRINTER_IMPLEMENTATION = '2.7'

# https://docs.scrapy.org/en/latest/topics/media-pipeline.html#std:setting-FILES_STORE
FILES_STORE = os.getenv('FILES_STORE', 'data')
@@ -163,5 +163,3 @@
if os.getenv('SCRAPY_PROJECT') is None:
# https://docs.scrapy.org/en/latest/topics/commands.html#commands-module
COMMANDS_MODULE = 'kingfisher_scrapy.commands'

TWISTED_REACTOR = 'twisted.internet.asyncioreactor.AsyncioSelectorReactor'
2 changes: 1 addition & 1 deletion requirements.in
@@ -4,7 +4,6 @@ jsonpointer
jsonschema
ocdskit[perf]
ocdsmerge
pika
psycopg2
rarfile
referencing
@@ -15,3 +14,4 @@ scrapyd
scrapyd-client
sentry-sdk
twisted
yapw[perf]
10 changes: 7 additions & 3 deletions requirements.txt
@@ -106,7 +106,9 @@ odfpy==1.4.1
openpyxl==3.0.5
# via flattentool
orjson==3.9.5
# via ocdskit
# via
# ocdskit
# yapw
packaging==23.1
# via
# scrapy
@@ -120,7 +122,7 @@ persistent==4.9.3
# btrees
# zodb
pika==1.2.0
# via -r requirements.in
# via yapw
protego==0.2.1
# via scrapy
psycopg2==2.9.2
@@ -174,7 +176,7 @@ scrapy==2.11.0
# -r requirements.in
# scrapyd
# scrapyd-client
scrapyd==1.4.2
scrapyd==1.4.3
# via -r requirements.in
scrapyd-client==1.2.3
# via -r requirements.in
@@ -219,6 +221,8 @@ w3lib==2.1.1
# scrapyd-client
xmltodict==0.12.0
# via flattentool
yapw[perf]==0.1.4
# via -r requirements.in
zc-lockfile==3.0.post1
# via zodb
zc-zlibstorage==1.2.0
2 changes: 1 addition & 1 deletion requirements.txt.sha256
@@ -1 +1 @@
c1ba709e103721ef6d76ed17afb4e5072951192cdcefd650e6eb5303f14e2bb1 requirements.txt
c2b0cb6e87bf4abb9c3efaedee274a646f11a99f91d55cb132a68e7d0aa56ba6 requirements.txt
2 changes: 2 additions & 0 deletions requirements_dev.in
@@ -5,8 +5,10 @@ flake8
isort
openpyxl
pip-tools
pika
psycopg2-binary
pytest
pytest-asyncio
pytest-cov
pytest-order
pytest-twisted
10 changes: 9 additions & 1 deletion requirements_dev.txt
@@ -68,6 +68,8 @@ cssselect==1.1.0
# -r requirements.txt
# parsel
# scrapy
decorator==5.1.1
# via pytest-twisted
defusedxml==0.6.0
# via
# -r requirements.txt
@@ -90,6 +92,8 @@ flake8==3.7.9
# via -r requirements_dev.in
flattentool==0.24.0
# via -r requirements.txt
greenlet==3.0.3
# via pytest-twisted
hyperlink==21.0.0
# via
# -r requirements.txt
@@ -205,6 +209,7 @@ persistent==4.9.3
pika==1.2.0
# via
# -r requirements.txt
# -r requirements_dev.in
# yapw
pip-tools==6.13.0
# via -r requirements_dev.in
@@ -251,12 +256,15 @@ pytest==7.4.4
# pytest-asyncio
# pytest-cov
# pytest-order
# pytest-twisted
pytest-asyncio==0.23.3
# via -r requirements_dev.in
pytest-cov==4.1.0
# via -r requirements_dev.in
pytest-order==1.2.0
# via -r requirements_dev.in
pytest-twisted==1.14.0
# via -r requirements_dev.in
pytz==2020.1
# via
# -r requirements.txt
@@ -377,7 +385,7 @@ xmltodict==0.12.0
# via
# -r requirements.txt
# flattentool
yapw[perf]==0.1.3
yapw[perf]==0.1.4
# via -r requirements.txt
zc-lockfile==3.0.post1
# via