Skip to content

Commit

Permalink
Merge pull request #228 from scrapinghub/flush-states-task
Browse files Browse the repository at this point in the history
moving flushing of states to a separate periodical task in SW
  • Loading branch information
sibiryakov authored Nov 29, 2016
2 parents 76ab1f8 + a0d2aba commit 9952f0f
Showing 1 changed file with 19 additions and 12 deletions.
31 changes: 19 additions & 12 deletions frontera/worker/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ def __init__(self, states):
self._requests = []
self._states = states
self._fingerprints = set()
self._cache_flush_counter = 0

def to_fetch(self, requests):
if isinstance(requests, Sequence):
Expand All @@ -73,14 +72,10 @@ def release(self):
self._states.update_cache(self._requests)
self._requests = []

# Flushing states cache if needed
if self._cache_flush_counter == 30:
logger.info("Flushing states")
self._states.flush(force_clear=False)
logger.info("Flushing states finished")
self._cache_flush_counter = 0

self._cache_flush_counter += 1
def flush(self):
logger.info("Flushing states")
self._states.flush(force_clear=False)
logger.info("Flushing of states finished")


class StrategyWorker(object):
Expand Down Expand Up @@ -115,6 +110,7 @@ def __init__(self, settings, strategy_class):
self.job_id = 0
self.task = LoopingCall(self.work)
self._logging_task = LoopingCall(self.log_status)
self._flush_states_task = LoopingCall(self.flush_states)
logger.info("Strategy worker is initialized and consuming partition %d", partition_id)

def collect_unknown_message(self, msg):
Expand Down Expand Up @@ -218,18 +214,26 @@ def work(self):
self.stats['consumed_since_start'] += consumed

def run(self):
def errback(failure):
def log_failure(failure):
logger.exception(failure.value)
if failure.frames:
logger.critical(str("").join(format_tb(failure.getTracebackObject())))
self.task.start(interval=0).addErrback(errback)

def errback_main(failure):
log_failure(failure)
self.task.start(interval=0).addErrback(errback_main)

def errback_flush_states(failure):
log_failure(failure)
self._flush_states_task.start(interval=300).addErrback(errback_flush_states)

def debug(sig, frame):
logger.critical("Signal received: printing stack trace")
logger.critical(str("").join(format_stack(frame)))

self.task.start(interval=0).addErrback(errback)
self.task.start(interval=0).addErrback(errback_main)
self._logging_task.start(interval=30)
self._flush_states_task.start(interval=300).addErrback(errback_flush_states)
signal(SIGUSR1, debug)
reactor.addSystemEventTrigger('before', 'shutdown', self.stop)
reactor.run()
Expand All @@ -238,6 +242,9 @@ def log_status(self):
for k, v in six.iteritems(self.stats):
logger.info("%s=%s", k, v)

def flush_states(self):
self.states_context.flush()

def stop(self):
logger.info("Closing crawling strategy.")
self.strategy.close()
Expand Down

0 comments on commit 9952f0f

Please sign in to comment.