Skip to content
This repository has been archived by the owner on Aug 9, 2024. It is now read-only.

Commit

Permalink
Merge pull request #57 from wiki-ai/celery
Browse files Browse the repository at this point in the history
Implement Celery workers for processing
  • Loading branch information
yuvipanda committed Jul 15, 2015
2 parents 82660ec + 8762c6e commit 126fcd5
Show file tree
Hide file tree
Showing 21 changed files with 629 additions and 246 deletions.
39 changes: 27 additions & 12 deletions config/ores-localdev.yaml
Original file line number Diff line number Diff line change
@@ -1,25 +1,40 @@
# Top level configuration
ores:
scorers:
- enwiki
score_processor: local_celery
wsgi:
application_root: ""
url_prefix: ""

# Available scorers
scorers:
enwiki:
extractor: enwiki_api
score_cache: local_redis
scorer_models:
reverted: enwiki_revert

# Available caches
# Caching options
score_caches:
local_redis:
class: ores.score_caches.Redis
host: localhost
prefix: "ores"
prefix: ores
lru:
class: ores.score_caches.LRU

# Score processing options
score_processors:
defaults:
score_cache: local_redis
scoring_contexts:
- enwiki
local_celery:
class: ores.score_processors.Celery
BROKER_URL: redis://localhost
CELERY_RESULT_BACKEND: redis://localhost
timeout: 5 # seconds
timeout:
class: ores.score_processors.Timeout
timeout: 5 # seconds

# Scoring contexts
scoring_contexts:
enwiki:
extractor: enwiki_api
scorer_models:
reverted: enwiki_revert

# Available feature extractors
extractors:
Expand Down
1 change: 1 addition & 0 deletions ores/ores.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* label_reverted -- Labels reverted revisions
* dev_server -- Starts a development WSGI server
* celery_worker -- Starts a "ScoreProcessor" celery worker
Usage:
ores (-h | --help)
Expand Down
4 changes: 4 additions & 0 deletions ores/score_processors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .score_processor import (ScoreProcessor, ScoreResult, SimpleScoreProcessor,
SimpleScoreResult)
from .timeout import Timeout
from .celery import Celery
125 changes: 125 additions & 0 deletions ores/score_processors/celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import logging

import celery

from ..score_caches import ScoreCache
from ..scoring_contexts import ScoringContext
from .score_processor import ScoreResult
from .timeout import timeout as timeout_func
from .timeout import Timeout

logger = logging.getLogger("ores.score_processors.celery")


class CeleryTimeoutResult(ScoreResult):
    """Pairs an asynchronous result with the timeout used when fetching it."""

    def __init__(self, async_result, timeout):
        # Both values are stored so that `get()` can be called argument-free.
        self.timeout = timeout
        self.async_result = async_result

    def get(self):
        """Return `async_result.get()` called with the stored timeout."""
        pending = self.async_result
        return pending.get(timeout=self.timeout)


class Celery(Timeout):
    """
    Score processor that dispatches scoring work to a celery application.

    Every (context, model, rev_id, model version) combination maps to a
    deterministic task id (see `_generate_id`).  `score()` uses that id to
    attach to a task that is already started or finished before falling back
    to the score cache and, finally, to submitting new work.
    """

    def __init__(self, *args, application, **kwargs):
        """
        Register the scoring task on `application`.

        `application` is keyword-only; all other arguments are forwarded
        unchanged to the parent constructor.
        """
        super().__init__(*args, **kwargs)
        self.application = application

        # The task is defined inside __init__ so that it closes over `self`
        # and can look up the scoring context by name.
        @self.application.task
        def _process(context, model, cache):
            scoring_context = self[context]
            score = scoring_context.score(model, cache)
            return score

        self._process = _process

    def _generate_id(self, context, model, rev_id):
        """Build the task id "context:model:rev_id:version" for a revision."""
        version = self[context][model].version

        return ":".join(str(v) for v in [context, model, rev_id, version])

    def process(self, context, model, rev_id, cache):
        """
        Submit one scoring task and return a result that honours the timeout.

        The task id is the deterministic id from `_generate_id`, which is
        what allows `_get_result` to find this task later.
        """
        id_string = self._generate_id(context, model, rev_id)

        result = self._process.apply_async(args=(context, model, cache),
                                           task_id=id_string)
        # NOTE(review): `self.timeout` is not assigned in this class;
        # presumably the parent constructor sets it -- confirm.
        return CeleryTimeoutResult(result, self.timeout)

    def score(self, context, model, rev_ids):
        """
        Return a dict mapping each rev_id to its score or an error document.

        Lookup order: tasks already started/finished, then the score cache,
        then freshly generated scores.  A failure while fetching an
        in-progress result is reported as
        ``{'error': {'type': ..., 'message': ...}}`` for that rev_id.
        """
        rev_ids = set(rev_ids)

        # Lookup scoring results that are currently in progress
        results = self._lookup_inprogress_results(context, model, rev_ids)
        missing_rev_ids = rev_ids - results.keys()

        # Lookup scoring results that are in the cache
        scores = self._lookup_cached_scores(context, model, missing_rev_ids)
        missing_rev_ids = missing_rev_ids - scores.keys()

        # Generate scores for missing rev_ids
        scores.update(self._score(context, model, missing_rev_ids))

        # Gather results
        # NOTE(review): these are the raw results from `_get_result`, so
        # `.get()` is called without a timeout here -- confirm that an
        # unbounded wait is intended.
        for rev_id in results:
            try:
                scores[rev_id] = results[rev_id].get()
            except Exception as error:
                # The exception must be bound to the name used below; the
                # earlier `as e` binding made this handler raise NameError.
                scores[rev_id] = {
                    'error': {
                        'type': str(type(error)),
                        'message': str(error)
                    }
                }

        # Return scores
        return scores

    def _lookup_inprogress_results(self, context, model, rev_ids):
        """
        Return {rev_id: async result} for revisions whose task is known.

        Revisions for which `_get_result` raises KeyError are left out.
        The model lookup (and its version) happens inside `_generate_id`.
        """
        results = {}
        for rev_id in rev_ids:
            id_string = self._generate_id(context, model, rev_id)
            try:
                results[rev_id] = self._get_result(id_string)
            except KeyError:
                pass

        return results

    def _get_result(self, id_string):
        """
        Return the async result for `id_string` if its state is "STARTED"
        or "SUCCESS"; raise KeyError(id_string) for any other state.
        """
        # Try to get an async_result for an in_progress task
        logger.debug("Checking if {0} is already being processed"
                     .format(repr(id_string)))
        result = self._process.AsyncResult(task_id=id_string)
        if result.state not in ("STARTED", "SUCCESS"):
            raise KeyError(id_string)
        else:
            logger.debug("Found AsyncResult for {0}".format(repr(id_string)))
            return result

    @classmethod
    def from_config(cls, config, name, section_key="score_processors"):
        """
        Build a processor from the `config[section_key][name]` section.

        The section's 'scoring_contexts' names are loaded through
        `ScoringContext.from_config`, the optional 'score_cache' through
        `ScoreCache.from_config`, and every key other than 'class' and
        'timeout' is copied into the celery application's configuration.
        """
        section = config[section_key][name]

        # A distinct loop name avoids reusing the `name` parameter.
        scoring_contexts = {
            context_name: ScoringContext.from_config(config, context_name)
            for context_name in section['scoring_contexts']
        }

        if 'score_cache' in section:
            score_cache = ScoreCache.from_config(config, section['score_cache'])
        else:
            score_cache = None

        timeout = section.get('timeout')
        application = celery.Celery('ores.score_processors.celery')
        application.conf.update(**{k: v for k, v in section.items()
                                   if k not in ('class', 'timeout')})

        return cls(scoring_contexts, score_cache=score_cache,
                   application=application, timeout=timeout)
130 changes: 130 additions & 0 deletions ores/score_processors/score_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import logging
import time

import yamlconf

from ..score_caches import Empty

logger = logging.getLogger("ores.score_processors.score_processor")


class ScoreResult:
    """Base type for objects that hand back the outcome of a scoring job."""

    def get(self, *args, **kwargs):
        """Always raises NotImplementedError; subclasses supply the lookup."""
        raise NotImplementedError()

class ScoreProcessor(dict):
    """
    Dict of scoring contexts (keyed by context name) that produces scores
    for revisions, consulting a score cache before doing any scoring work.

    Subclasses implement `process()` to decide how one revision is scored.
    """

    def __init__(self, scoring_contexts, score_cache=None):
        super().__init__()
        self.update(scoring_contexts)
        # A falsy `score_cache` is replaced by an `Empty` cache instance.
        self.score_cache = score_cache if score_cache else Empty()

    @staticmethod
    def _error_doc(error):
        """Render an exception as the error document placed in score maps."""
        return {
            'error': {
                'type': str(type(error)),
                'message': str(error)
            }
        }

    def score(self, context, model, rev_ids, caches=None):
        """
        Return {rev_id: score-or-error-document} for `rev_ids`.

        Duplicate rev_ids are collapsed; cached scores are used where
        available and only the remainder is passed on to `_score()`.
        """
        unique_ids = set(rev_ids)

        scores = self._lookup_cached_scores(context, model, unique_ids)
        uncached_ids = unique_ids - scores.keys()

        fresh_scores = self._score(context, model, uncached_ids,
                                   caches=caches)
        scores.update(fresh_scores)

        return scores

    def _lookup_cached_scores(self, context, model, rev_ids):
        """Return the subset of `rev_ids` found in the score cache."""
        model_version = self[context][model].version

        found = {}
        for rev_id in rev_ids:
            try:
                cached = self.score_cache.lookup(context, model, rev_id,
                                                 version=model_version)
            except KeyError:
                # Cache miss: this rev_id is simply left out of the result.
                continue

            logger.debug("Found cached score for {0}:{1}"
                         .format(model, rev_id))
            found[rev_id] = cached

        return found

    def _score(self, context, model, rev_ids, caches=None):
        """
        Score `rev_ids` without consulting the score cache.

        Root datasources are extracted in one batch; revisions whose
        extraction failed get an error document right away, the others go
        through `process()` and have their results collected afterwards.
        """
        root_ds_caches = self[context].extract_roots(model, rev_ids,
                                                     caches=caches)

        scores = {}
        pending = {}
        for rev_id in rev_ids:
            error, cache = root_ds_caches[rev_id]

            if error is not None:
                scores[rev_id] = self._error_doc(error)
                continue

            pending[rev_id] = self.process(context, model, rev_id, cache)

        # Results are gathered only after every revision has been submitted.
        for rev_id, result in pending.items():
            try:
                scores[rev_id] = result.get()
            except Exception as error:
                scores[rev_id] = self._error_doc(error)

        return scores

    def process(self, context, model, rev_id, cache):
        """Score one revision; must be provided by subclasses."""
        raise NotImplementedError()

    @classmethod
    def from_config(cls, config, name, section_key="score_processors"):
        """
        Load the processor described by `config[section_key][name]`.

        A 'module' entry is imported and returned as-is; otherwise a
        'class' entry is imported and its own `from_config` is called.
        When neither key is present the result is None.
        """
        logger.info("Loading ScoreProcessor '{0}' from config.".format(name))
        section = config[section_key][name]

        if 'module' in section:
            return yamlconf.import_module(section['module'])

        if 'class' in section:
            # NOTE(review): `section_key` is not forwarded to the delegated
            # `from_config` -- confirm that is intended.
            processor_class = yamlconf.import_module(section['class'])
            return processor_class.from_config(config, name)

        return None


class SimpleScoreProcessor(ScoreProcessor):
    """
    Processor whose `process()` scores the revision directly and wraps the
    outcome (score or exception) in a `SimpleScoreResult`.
    """

    def _process(self, context, model, rev_id, cache):
        """Score `model` against `cache` using the named scoring context."""
        return self[context].score(model, cache)

    def process(self, context, model, rev_id, cache):
        """Return a `SimpleScoreResult` holding the score or the error."""
        try:
            score = self._process(context, model, rev_id, cache)
        except Exception as e:
            # The exception is carried in the result and re-raised by get().
            return SimpleScoreResult(error=e)

        return SimpleScoreResult(score=score)

class SimpleScoreResult(ScoreResult):
    """Result that already holds either a score or the error that occurred."""

    def __init__(self, *, score=None, error=None):
        self.error = error
        self.score = score

    def get(self):
        """Return the stored score, or raise the stored error if one is set."""
        if self.error is None:
            return self.score

        raise self.error
File renamed without changes.
14 changes: 14 additions & 0 deletions ores/score_processors/tests/test_score_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from nose.tools import eq_, raises

from ..score_processor import SimpleScoreResult


def test_simple_score_result():
    """A result built from a score hands that score back from get()."""
    result = SimpleScoreResult(score=5)
    fetched = result.get()
    eq_(fetched, 5)


@raises(RuntimeError)
def test_simple_score_error():
    """A result built from an error re-raises that error from get()."""
    failing = SimpleScoreResult(error=RuntimeError())
    # get() raises before any comparison could run, so none is written here.
    failing.get()
54 changes: 54 additions & 0 deletions ores/score_processors/tests/test_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import time
from collections import namedtuple

from nose.tools import eq_, raises
from revscoring.dependencies import Context
from revscoring.features import Feature
from revscoring.scorer_models import ScorerModel

from ...scoring_contexts import ScoringContext
from ..timeout import Timeout, TimeoutError

wait_time = Feature("wait_time", returns=float)


class FakeSM(ScorerModel):
    """Stand-in scorer model whose only feature is `wait_time`."""

    def __init__(self):
        # Only these three attributes are set; no parent constructor is run.
        self.version = None
        self.language = None
        self.features = [wait_time]

    def score(self, feature_values):
        """Not implemented on this fake; always raises."""
        raise NotImplementedError()

class FakeSC(ScoringContext):
    """Stand-in scoring context that sleeps for `wait_time` before scoring."""

    def solve(self, model, cache):
        """Return the cache untouched."""
        return cache

    def score(self, model, cache):
        """Sleep for the cached `wait_time` value, then return a fixed score."""
        delay = cache[wait_time]
        time.sleep(delay)
        return {'score': True}

    def extract_roots(self, model, rev_ids, caches=None):
        """Map every rev_id to (None, its entry in `caches`), i.e. no error."""
        roots = {}
        for rev_id in rev_ids:
            roots[rev_id] = (None, caches[rev_id])
        return roots


def test_score():
    """A 0.05s job finishes within a 0.10s timeout and yields its score."""
    context = FakeSC("fakewiki", {'fake': FakeSM()}, None)
    processor = Timeout({'fakewiki': context}, timeout=0.10)

    injected_caches = {1: {wait_time: 0.05}}
    scores = processor.score("fakewiki", "fake", [1], caches=injected_caches)

    eq_(scores, {1: {'score': True}})

def test_timeout():
    """A 0.10s job exceeds a 0.05s timeout and is reported as an error."""
    context = FakeSC("fakewiki", {'fake': FakeSM()}, None)
    processor = Timeout({'fakewiki': context}, timeout=0.05)

    injected_caches = {1: {wait_time: 0.10}}
    scores = processor.score("fakewiki", "fake", [1], caches=injected_caches)

    outcome = scores[1]
    assert 'error' in outcome
    assert 'Timed out after' in outcome['error']['message']
Loading

0 comments on commit 126fcd5

Please sign in to comment.