Skip to content

Commit

Permalink
Merge pull request #169 from scrapinghub/finishing-and-options
Browse files Browse the repository at this point in the history
Proper finishing by Crawling Strategy + traceback on USR1 + Batch size opts
  • Loading branch information
sibiryakov authored Jun 29, 2016
2 parents 2491ffa + cd31bdf commit 62d9e49
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 11 deletions.
19 changes: 15 additions & 4 deletions docs/source/topics/frontera-settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,29 @@ Default: ``frontera.contrib.canonicalsolvers.Basic``
The :class:`CanonicalSolver <frontera.core.components.CanonicalSolver>` to be used by the frontier for resolving
canonical URLs. For more info see :ref:`Canonical URL Solver <canonical-url-solver>`.

.. setting:: CONSUMER_BATCH_SIZE
.. setting:: SPIDER_LOG_CONSUMER_BATCH_SIZE

CONSUMER_BATCH_SIZE
-------------------
SPIDER_LOG_CONSUMER_BATCH_SIZE
------------------------------

Default: ``512``

This is a batch size used by strategy and db workers for consuming of spider log and scoring log streams. Increasing it
This is a batch size used by strategy and db workers for consuming the spider log stream. Increasing it
will cause the worker to spend more time on every task, but processing more items per task, therefore leaving less time for
other tasks during some fixed time interval. Reducing it will result in running several tasks within the same time
interval, but with less overall efficiency. Use it when your consumers are too slow, or too fast.

.. setting:: SCORING_LOG_CONSUMER_BATCH_SIZE

SCORING_LOG_CONSUMER_BATCH_SIZE
-------------------------------

Default: ``512``

This is a batch size used by the db worker for consuming the scoring log stream. Use it when you need to adjust the scoring
log consumption speed.


.. setting:: CRAWLING_STRATEGY

CRAWLING_STRATEGY
Expand Down
3 changes: 2 additions & 1 deletion frontera/settings/default_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
AUTO_START = True
BACKEND = 'frontera.contrib.backends.memory.FIFO'
CANONICAL_SOLVER = 'frontera.contrib.canonicalsolvers.Basic'
CONSUMER_BATCH_SIZE = 512
DELAY_ON_EMPTY = 5.0
DOMAIN_FINGERPRINT_FUNCTION = 'frontera.utils.fingerprint.sha1'

Expand Down Expand Up @@ -34,6 +33,8 @@
RESPONSE_MODEL = 'frontera.core.models.Response'

SCORING_PARTITION_ID = 0
SCORING_LOG_CONSUMER_BATCH_SIZE = 512
SPIDER_LOG_CONSUMER_BATCH_SIZE = 512
SPIDER_LOG_PARTITIONS = 1
SPIDER_FEED_PARTITIONS = 1
SPIDER_PARTITION_ID = 0
Expand Down
14 changes: 11 additions & 3 deletions frontera/worker/db.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# -*- coding: utf-8 -*-
import logging
from traceback import format_stack
from signal import signal, SIGUSR1
from logging.config import fileConfig
from argparse import ArgumentParser
from time import asctime
Expand Down Expand Up @@ -81,7 +83,8 @@ def __init__(self, settings, no_batches, no_incoming):
else:
self.strategy_enabled = False

self.consumer_batch_size = settings.get('CONSUMER_BATCH_SIZE')
self.spider_log_consumer_batch_size = settings.get('SPIDER_LOG_CONSUMER_BATCH_SIZE')
self.scoring_log_consumer_batch_size = settings.get('SCORING_LOG_CONSUMER_BATCH_SIZE')
self.spider_feed_partitioning = 'fingerprint' if not settings.get('QUEUE_HOSTNAME_PARTITIONING') else 'hostname'
self.max_next_requests = settings.MAX_NEXT_REQUESTS
self.slot = Slot(self.new_batch, self.consume_incoming, self.consume_scoring, no_batches,
Expand All @@ -98,8 +101,13 @@ def set_process_info(self, process_info):
self.process_info = process_info

def run(self):
def debug(sig, frame):
logger.critical("Signal received: printing stack trace")
logger.critical(str("").join(format_stack(frame)))

self.slot.schedule(on_start=True)
self._logging_task.start(30)
signal(SIGUSR1, debug)
reactor.addSystemEventTrigger('before', 'shutdown', self.stop)
reactor.run()

Expand All @@ -119,7 +127,7 @@ def enable_new_batches(self):

def consume_incoming(self, *args, **kwargs):
consumed = 0
for m in self.spider_log_consumer.get_messages(timeout=1.0, count=self.consumer_batch_size):
for m in self.spider_log_consumer.get_messages(timeout=1.0, count=self.spider_log_consumer_batch_size):
try:
msg = self._decoder.decode(m)
except (KeyError, TypeError), e:
Expand Down Expand Up @@ -178,7 +186,7 @@ def consume_scoring(self, *args, **kwargs):
consumed = 0
seen = set()
batch = []
for m in self.scoring_log_consumer.get_messages(count=self.consumer_batch_size):
for m in self.scoring_log_consumer.get_messages(count=self.scoring_log_consumer_batch_size):
try:
msg = self._decoder.decode(m)
except (KeyError, TypeError), e:
Expand Down
14 changes: 11 additions & 3 deletions frontera/worker/strategy.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
from time import asctime
import logging
from traceback import format_stack
from signal import signal, SIGUSR1
from logging.config import fileConfig
from argparse import ArgumentParser
from os.path import exists
Expand Down Expand Up @@ -100,7 +102,7 @@ def __init__(self, settings, strategy_class):
self.update_score = UpdateScoreStream(self._encoder, self.scoring_log_producer, 1024)
self.states_context = StatesContext(self._manager.backend.states)

self.consumer_batch_size = settings.get('CONSUMER_BATCH_SIZE')
self.consumer_batch_size = settings.get('SPIDER_LOG_CONSUMER_BATCH_SIZE')
self.strategy = strategy_class.from_worker(self._manager, self.update_score, self.states_context)
self.states = self._manager.backend.states
self.stats = {
Expand Down Expand Up @@ -191,8 +193,8 @@ def work(self):
logger.info("Successfully reached the crawling goal.")
logger.info("Closing crawling strategy.")
self.strategy.close()
logger.info("Exiting.")
exit(0)
logger.info("Finishing.")
reactor.callFromThread(reactor.stop)

self.stats['last_consumed'] = consumed
self.stats['last_consumption_run'] = asctime()
Expand All @@ -202,8 +204,14 @@ def run(self):
def errback(failure):
logger.exception(failure.value)
self.task.start(interval=0).addErrback(errback)

def debug(sig, frame):
logger.critical("Signal received: printing stack trace")
logger.critical(str("").join(format_stack(frame)))

self.task.start(interval=0).addErrback(errback)
self._logging_task.start(interval=30)
signal(SIGUSR1, debug)
reactor.addSystemEventTrigger('before', 'shutdown', self.stop)
reactor.run()

Expand Down

0 comments on commit 62d9e49

Please sign in to comment.