
I have done some work toward implementing ActiveMQ in the message bus and would like to contribute #350

Open

wants to merge 332 commits into base: exception-dbw

Changes from 1 commit (of 332 commits)
e57c763
adding zmq broker output
sibiryakov Nov 1, 2017
baed69e
more work
sibiryakov Nov 1, 2017
8c5f638
adding cityhash to test reqs
sibiryakov Nov 1, 2017
b050066
enabling all tests
sibiryakov Nov 1, 2017
e8dcd48
converting row prefix to bytes
sibiryakov Nov 2, 2017
90b3530
style
sibiryakov Nov 2, 2017
5e685c7
Merge pull request #299 from scrapinghub/headers-etc
sibiryakov Nov 2, 2017
679c8c0
add message type logging and time for batch
Preetwinder Mar 15, 2017
312dd58
smaller poll intervals
sibiryakov Mar 23, 2017
da3d0c7
more logging in offset-fetcher
sibiryakov Mar 27, 2017
87f9e53
logging contents of e
sibiryakov Mar 27, 2017
bec1877
change err message and raising kafka log level to INFO
sibiryakov Mar 28, 2017
c2e57a6
fix of 'Future' object is not iterable
sibiryakov Mar 30, 2017
1ccbd42
simplifying logging configuration
sibiryakov May 29, 2017
a271206
Update hbase.py
clarksun Dec 27, 2017
b69e318
maybe miss b?
clarksun Dec 27, 2017
fbc6fc2
update BCPerHostLimit in cluster example
clarksun Dec 27, 2017
4536702
Merge pull request #305 from scrapinghub/logging-improvement
sibiryakov Jan 12, 2018
6a7f061
Merge pull request #302 from scrapinghub/per-type-worker-stats
sibiryakov Jan 12, 2018
3c07c5d
Merge pull request #304 from scrapinghub/logging-async-fetcher
sibiryakov Jan 12, 2018
6a3e824
Merge pull request #303 from scrapinghub/smaller-kafka-poll-intervals
sibiryakov Jan 12, 2018
1fe7b4c
Merge pull request #306 from clarksun/patch-1
sibiryakov Jan 24, 2018
90f685a
Merge pull request #307 from clarksun/patch-2
sibiryakov Jan 24, 2018
3cbfd67
Merge pull request #309 from clarksun/patch-4
sibiryakov Jan 24, 2018
cbeee88
hbase states table option
sibiryakov Mar 15, 2017
9d90f6b
reducing logging verbosity
sibiryakov Jan 24, 2018
db17e76
Merge pull request #301 from scrapinghub/states-table
sibiryakov Jan 24, 2018
285e03b
Send crawl stats to Kafka message bus
vshlapakov Jun 27, 2017
3c16f22
Add Mesos task id as a tag for DB worker metrics
vshlapakov Jul 10, 2017
5aaff5f
Send backend stats to message bus
vshlapakov Jul 10, 2017
c296a86
No need to setup stats log partitions
vshlapakov Jul 26, 2017
ea8fb18
Decrease log-level for stats producer
vshlapakov Aug 2, 2017
7589208
lru cache for HBaseState
vshlapakov Jul 12, 2017
457b8ee
codecs tests fixed
sibiryakov Jan 30, 2018
5d4d98a
other tests fix
sibiryakov Jan 30, 2018
e5b946c
moving stats to worker submodule
sibiryakov Jan 30, 2018
852bef5
base stats log, zmq stats log
sibiryakov Jan 30, 2018
a0fe937
Merge pull request #313 from scrapinghub/lru-cache-states
sibiryakov Jan 30, 2018
e44f827
Merge pull request #312 from scrapinghub/stats-log
sibiryakov Jan 30, 2018
500055f
Split DB worker to components based on threads
vshlapakov Jul 14, 2017
ff6c7dd
Fix attribute error related with job_id
vshlapakov Jul 24, 2017
469c64a
Run only batchgen via thread
vshlapakov Jul 24, 2017
5d7b1e6
Polish changes and fix minor issues
vshlapakov Jul 24, 2017
198e3b9
Minor code simplification
vshlapakov Jul 24, 2017
5923e21
Initial job_id must be 0 for db worker
vshlapakov Jul 24, 2017
d14c4c8
Smarted backoff for batchgen component
vshlapakov Jul 26, 2017
f6ed24f
Do back-off if no requests to schedule
vshlapakov Jul 26, 2017
deb3617
max_request_size is set to 4Mb
sibiryakov Jul 24, 2017
d5b1cc7
Update kafkabus producers settings
vshlapakov Aug 3, 2017
1b7951a
Move values to constants
vshlapakov Aug 3, 2017
b0e039e
more precise scoring, encoding from scrapy, hash for request
vshlapakov Jul 26, 2017
66941c9
changing scrapy test website
sibiryakov Apr 6, 2018
257ee93
ov buffer test fix
sibiryakov Apr 6, 2018
eed086e
Merge pull request #316 from scrapinghub/misc
sibiryakov Apr 6, 2018
637012b
making Kafka mb to support latest kafka-python
sibiryakov Apr 9, 2018
ccd554e
Merge pull request #315 from scrapinghub/kafka-params-up
sibiryakov Apr 9, 2018
3a8e2d6
string formatting corrected in the connection information being logged
akshayphilar Apr 12, 2018
0908e03
db worker test fix
sibiryakov Apr 23, 2018
90d4616
Merge pull request #314 from scrapinghub/dbw-components-teardown
sibiryakov Apr 23, 2018
786c2cd
Setting for splitting partitions b/w batchgens
vshlapakov Aug 2, 2017
4093cb3
conflicts resolved
vshlapakov Aug 2, 2017
343fe31
Config option to pass partitions
vshlapakov Aug 2, 2017
b4547c7
No need new setting
vshlapakov Aug 2, 2017
c44fe05
Minor renaming
vshlapakov Aug 3, 2017
7a1dee9
Wrap sending message logic
vshlapakov Aug 3, 2017
eaee5a4
Catch and print shutdown errors
vshlapakov Sep 6, 2017
426c1bc
Non-exhausted generators must be closed manually
vshlapakov Aug 11, 2017
e902ad3
conflicts resolved
vshlapakov Aug 21, 2017
7cb12c0
Provide SW states cache hits ratio
vshlapakov Aug 24, 2017
40a2b78
Provide flushed batches count for SW states
vshlapakov Aug 24, 2017
a3762f2
Simplify state stats using with a variable
vshlapakov Aug 24, 2017
28139a2
Provide stats for SW state cache evictions
vshlapakov Sep 6, 2017
a4d65bb
Skip links-extracted when reached limit for domain
vshlapakov Sep 1, 2017
c70b0ad
no need to flush on every batch
sibiryakov Sep 13, 2017
c3ce7cc
create_request method added to manager, etc
sibiryakov Oct 5, 2017
01b3b73
removed domain info logic from hbase.queue, domain mw enabled by default
sibiryakov Oct 5, 2017
2060f0c
enabling domain fingerprint mw by default
sibiryakov Oct 5, 2017
bea71e8
strategy tests fixed
sibiryakov May 8, 2018
c6e074f
using unittest
sibiryakov May 8, 2018
0bd0255
tests fix
sibiryakov May 8, 2018
389396a
speeding up serialization
sibiryakov May 8, 2018
8e4ffc3
Merge pull request #327 from scrapinghub/create-req-manager
sibiryakov May 8, 2018
6773404
test case
sibiryakov May 8, 2018
95170ca
Merge pull request #324 from scrapinghub/hbase-state-cache
sibiryakov May 8, 2018
117f341
Merge pull request #326 from scrapinghub/no-flush-scoring-log
sibiryakov May 8, 2018
c0c6920
Merge pull request #323 from scrapinghub/generators-pypy
sibiryakov May 8, 2018
47004eb
Merge pull request #322 from scrapinghub/log-improve-batchgen-sw
sibiryakov May 8, 2018
d402216
Merge pull request #321 from scrapinghub/part-split-btw-batchgen
sibiryakov May 8, 2018
bbfcfba
Merge pull request #320 from scrapinghub/log-connected-host
sibiryakov May 8, 2018
184a719
Merge branch 'master' into pre-filter-of-links-ext
sibiryakov May 8, 2018
930a96f
Merge pull request #325 from scrapinghub/pre-filter-of-links-ext
sibiryakov May 8, 2018
9d6ad05
removed ambiguous redirect logic
sibiryakov Nov 24, 2017
a821652
retries budget added in offsets fetch
sibiryakov Dec 25, 2017
a6057e9
fix of partitioning in batch-gen
sibiryakov Jan 30, 2018
8d6e7b1
domain stats in batch gen
sibiryakov Feb 7, 2018
88cdd46
fixes
sibiryakov Feb 7, 2018
a868aeb
hbase refactor, first DomainCache with second gen draft
sibiryakov Feb 12, 2018
6590a1c
converting keys to native str
sibiryakov Feb 13, 2018
5e4f086
set serialization fix
sibiryakov Feb 13, 2018
0542494
flush fix and dosctring
sibiryakov Feb 13, 2018
c12f021
crash fix for py3
sibiryakov Feb 16, 2018
1ed0be6
improved error output
sibiryakov Feb 16, 2018
1a627d1
add seeds run mode
sibiryakov Mar 16, 2018
47a0012
overused buffer settings
sibiryakov Mar 22, 2018
48fffd4
passing args to run()
sibiryakov Mar 23, 2018
514e5d3
removed seeds addition through spider log
sibiryakov Mar 23, 2018
825fee0
codecs test fixed
sibiryakov Mar 23, 2018
92def37
renaming add_seeds to read_seeds(file)
sibiryakov Mar 23, 2018
5146876
fixed reading of seeds
sibiryakov Mar 23, 2018
033fcc5
s3 dependency
sibiryakov Mar 23, 2018
27985a0
docs
sibiryakov Apr 10, 2018
66347c2
slot for hbase queue partitioning
sibiryakov Apr 20, 2018
fc4dee3
support of non-iterable argument
sibiryakov Apr 26, 2018
b809725
documented None values
sibiryakov May 8, 2018
1e59bf5
DEFAULT_HBASE_THRIFT_FRAME_SIZE constant
sibiryakov May 10, 2018
2b1ee27
check for a key presence before deletion
sibiryakov May 10, 2018
2211ede
disabling scrapy test
sibiryakov May 10, 2018
7afb27e
codecs tests are fixed
sibiryakov May 10, 2018
f1b6e01
overusedbuffer test cases are fixed
sibiryakov May 10, 2018
b0be022
more work to codecs fix
sibiryakov May 11, 2018
017b70e
batch_size as argument and bug fix + new test case
sibiryakov May 11, 2018
4e81956
table creation
sibiryakov May 11, 2018
167ba3e
removed add_seeds
sibiryakov May 11, 2018
775dfbb
crawling strategy test fix
sibiryakov May 11, 2018
5f5d29f
table presence fix
sibiryakov May 11, 2018
662faad
add seeds run mode test, sw test fix
sibiryakov May 11, 2018
934b2f8
memory backends tests fix
sibiryakov May 11, 2018
95c46a3
don't use hbase namespace for testing
sibiryakov May 11, 2018
d9464c5
strategy test case fix
sibiryakov May 11, 2018
92f3f54
removed add_seeds
sibiryakov May 14, 2018
1a4f8c8
syntax
sibiryakov May 14, 2018
da87cbd
Merge pull request #330 from scrapinghub/test-fix2
sibiryakov May 14, 2018
0ffb2ab
first draft version
sibiryakov May 15, 2018
d701f55
batched workflow extracted
sibiryakov May 23, 2018
4a27212
more work
sibiryakov May 23, 2018
c6c159b
FrontierManager refactoring, more to come
sibiryakov May 23, 2018
fde25fc
proper init hierarchy
sibiryakov May 25, 2018
ec47475
import, frontier_manager and strategy fixes
sibiryakov May 28, 2018
d6c2e5d
SW test case fixes
sibiryakov May 28, 2018
3affb12
tests fixed
sibiryakov May 30, 2018
32069fe
tests fixed
sibiryakov May 30, 2018
214d345
backend tests fixes
sibiryakov May 30, 2018
38de8de
default crawling strategy
sibiryakov May 31, 2018
bcc3d75
scrapy mws test fix
sibiryakov May 31, 2018
ac0da26
dbw test cases fix
sibiryakov May 31, 2018
0ac5f9c
memory backends test fixes
sibiryakov May 31, 2018
d9b1d64
Redis test case fix
sibiryakov May 31, 2018
1de3c4b
syntax
sibiryakov May 31, 2018
90484e4
more work on style
sibiryakov May 31, 2018
986fad4
fix of add seeds utility
sibiryakov May 31, 2018
2e04254
fixing distributed run mode
sibiryakov Jun 1, 2018
b90275b
auto_start/test_mode attrs refactor
sibiryakov Jun 4, 2018
7b1d3fb
move of BaseCrawlingStrategy, BFS/DFS strategy
sibiryakov Jun 18, 2018
42722d1
seeds have to be injected using crawling strategy
sibiryakov Jun 18, 2018
cd8d95e
new CS guide, and single mode quick start
sibiryakov Jun 18, 2018
df214c6
removed obsolete test
sibiryakov Jul 12, 2018
ebd8532
new path for a crawling strategy
sibiryakov Jul 12, 2018
b37e7d2
style
sibiryakov Jul 12, 2018
bca36a8
DomainMetadata interface
sibiryakov Jul 13, 2018
524dd33
using DomainMetadata in hbase DomainCache
sibiryakov Jul 13, 2018
943231b
SQLA DomainMetadata storage
sibiryakov Jul 13, 2018
7c1272a
moved domain cache test
sibiryakov Jul 13, 2018
f7044ac
DomainCache is instantiated in HBaseBackend now
sibiryakov Jul 16, 2018
1f1f4a1
removing old memory backends
sibiryakov Jul 16, 2018
276da48
removing old sqlalchemy backends
sibiryakov Jul 16, 2018
f2ba99c
pointing to existing memory backend
sibiryakov Jul 16, 2018
8865daf
backends docs update
sibiryakov Jul 20, 2018
f5da2ec
local mode instantiation
sibiryakov Jul 20, 2018
4028a36
error fix
sibiryakov Jul 20, 2018
61efc5e
style
sibiryakov Jul 20, 2018
97fe2ab
fix of references in docs
sibiryakov Jul 23, 2018
cb999cc
fixes of single process sqlalchemy tutorial
sibiryakov Jul 23, 2018
b8c1cac
Discovery crawling strategy
sibiryakov Jul 23, 2018
e339e49
removed distributed spiders run mode
sibiryakov Jul 23, 2018
b3247fc
crawling strategy docs
sibiryakov Jul 23, 2018
ca53383
style mainly
sibiryakov Jul 23, 2018
d8c9c3a
some mistakes corrected
sibiryakov Jul 24, 2018
ae138ed
putting the code under name=main
sibiryakov Jul 24, 2018
2faf280
is -> ==
sibiryakov Jul 24, 2018
fc161f6
docstring
sibiryakov Jul 24, 2018
8e13eb1
page_error -> request_error
sibiryakov Jul 24, 2018
a6d1c37
states tests extraction
sibiryakov Jul 24, 2018
2c982d2
new hbase states test
sibiryakov Jul 24, 2018
2dffcab
fix
sibiryakov Jul 24, 2018
17c4489
moving states tester
sibiryakov Jul 24, 2018
26eff5e
style
sibiryakov Jul 24, 2018
ed5c4af
using fixtures
sibiryakov Jul 24, 2018
a94ee78
sqlalchemy states
sibiryakov Jul 24, 2018
5631001
test fix
sibiryakov Jul 24, 2018
4205ea2
table creation
sibiryakov Jul 24, 2018
b46600b
updated tests
sibiryakov Jul 25, 2018
dffa5e7
style
sibiryakov Jul 25, 2018
19025cd
test fix
sibiryakov Jul 25, 2018
b31cbf1
frontera-scrapy test is enabled
sibiryakov Jul 25, 2018
95c34f2
fix warning
sibiryakov Jul 25, 2018
dbb51d7
skipping scrapy spider for now
sibiryakov Jul 25, 2018
6ad1a4c
renaming modules to support py3.7
sibiryakov Jul 25, 2018
03a5f91
to the previous commit
sibiryakov Jul 25, 2018
8ac2316
Merge pull request #331 from scrapinghub/strategy-in-manager
sibiryakov Jul 25, 2018
f91dd7a
updated with latest changes
sibiryakov Jul 25, 2018
a26a0a9
version bump
sibiryakov Jul 25, 2018
dbf15b0
Update README.md
sibiryakov Jul 25, 2018
fb05073
some refactor + devnull producer for stats
sibiryakov Jul 30, 2018
4a8f0c1
docs + example config update
sibiryakov Jul 30, 2018
264ad76
Merge pull request #341 from scrapinghub/fix-general-spider
sibiryakov Jul 30, 2018
51526c1
local_mode option
sibiryakov Jul 30, 2018
8830ce2
Merge pull request #342 from scrapinghub/fix-docs
sibiryakov Jul 30, 2018
7b6717d
version bump
sibiryakov Jul 30, 2018
fff3617
Remove invalid keyword argument 'sleep' from kafka pull
icapurro Aug 21, 2018
4fb16c9
Removed offset unit test
icapurro Aug 28, 2018
bd3211b
Fix the redis backend
vfcosta Sep 11, 2018
bd781e5
Merge pull request #347 from vfcosta/fix-redis
sibiryakov Sep 11, 2018
0b245ef
Merge pull request #345 from icapurro/fix-kafka-pull
sibiryakov Sep 12, 2018
246575a
Merge branch 'master' of https://github.com/scrapinghub/crawl-frontier
sibiryakov Nov 2, 2018
0c8c58b
scrapy role, overview update
sibiryakov Nov 2, 2018
037c3b4
Update cluster-setup docs
guillermoap Nov 14, 2018
e1a4ca9
Merge pull request #351 from guillermoap/update_docs
sibiryakov Nov 15, 2018
f5ef109
Docs: BaseCrawlingStrategy._refresh_states() also takes a non iterab…
IAlwaysBeCoding Nov 29, 2018
0c81482
Update frontera/strategy/__init__.py
sibiryakov Nov 30, 2018
94886b3
Merge pull request #355 from IAlwaysBeCoding/IAlwaysBeCoding-_refresh…
sibiryakov Dec 3, 2018
dfdc396
removing call to absent API in happybase
sibiryakov Jan 14, 2019
424f569
Update after redis update
Jan 21, 2019
e418807
fixed DomainCache crash on init
sibiryakov Apr 3, 2019
2e23f27
Merge pull request #365 from scrapinghub/domain-cache-crash-fix
sibiryakov Apr 3, 2019
f20de81
Merge pull request #361 from khellan/redis_fix
sibiryakov Apr 5, 2019
45576af
fixing Cache.__init__ signature changes
sibiryakov Apr 5, 2019
22d3bc5
more work
sibiryakov Apr 5, 2019
fd294e0
Merge pull request #366 from scrapinghub/test-fix3
sibiryakov Apr 5, 2019
9bb47c9
explicit cluster metadata bootstrap in Kafka client
sibiryakov Apr 12, 2019
4e6f547
syntax fixes
sibiryakov Apr 12, 2019
1e951c6
line breaks
sibiryakov Apr 12, 2019
5762a26
notimplemented->error
sibiryakov Apr 12, 2019
befc866
update models.py
Jul 3, 2019
2457357
Update models.py
Jul 3, 2019
13efd27
Update models.py
Jul 3, 2019
02326d2
Merge pull request #375 from liho00/liho00-patch-1
sibiryakov Jul 5, 2019
59050bf
Bump to Python 3.6
Oct 23, 2019
f05104e
Use thriftpy2
Oct 23, 2019
e4885cb
Add Python 3.7
Oct 23, 2019
4a4d8b8
Make test_zmq_message_bus() flaky
Oct 23, 2019
4e49f76
Add .env to gitignore
Oct 23, 2019
5c5972b
Add badges
Oct 24, 2019
482b4d5
Add pyversions badge
Oct 24, 2019
e6bb8da
Rename to "python versions"
Oct 24, 2019
068f169
Merge pull request #381 from suzil/cleanup/add-badges
sibiryakov Oct 25, 2019
c5a7001
Move thriftpy2 to test_requires
Oct 25, 2019
ebb4d32
Also include thriftpy2 as an hbase requirement
Oct 25, 2019
b98accd
Merge pull request #379 from suzil/bump/py36
sibiryakov Oct 25, 2019
d5f5905
add assertion error for easier debugging
a-shkarupin Nov 29, 2019
84f9e10
Merge pull request #389 from a-shkarupin/assertion_message
sibiryakov Nov 29, 2019
FrontierManager refactoring, more to come
sibiryakov committed May 23, 2018

Verified: This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit c6c159b23e18631a4dc141d0f9b0a80e4b8d9a6c
256 changes: 193 additions & 63 deletions frontera/core/manager.py
@@ -13,9 +13,67 @@
from frontera.utils.misc import load_object


class ComponentsPipelineMixin(object):
def __init__(self, backend, strategy_class, strategy_args, middlewares=None, canonicalsolver=None, db_worker=False,
strategy_worker=False):
class BackendMixin(object):
def __init__(self, backend, db_worker=False, strategy_worker=False):
# Load backend
self._logger_components.debug("Loading backend '%s'", backend)
self._backend = self._load_backend(backend, db_worker, strategy_worker)
self._backend.frontier_start()

def _load_backend(self, backend, db_worker, strategy_worker):
# FIXME remove obsolete
cls = load_object(backend)
assert issubclass(cls, Backend), "backend '%s' must subclass Backend" % cls.__name__
if issubclass(cls, DistributedBackend):
if db_worker:
return cls.db_worker(self)
if strategy_worker:
return cls.strategy_worker(self)
return cls.local(self)
else:
assert not strategy_worker, "In order to distribute backend only DistributedBackend " \
"subclasses are allowed to use"
if hasattr(cls, 'from_manager'):
return cls.from_manager(self)
else:
return cls()

@property
def backend(self):
"""
The :class:`Backend <frontera.core.components.Backend>` object to be used by the frontier. \
Can be defined with :setting:`BACKEND` setting.
"""
return self._backend

def close(self):
self.backend.frontier_stop()


class StrategyMixin(object):
def __init__(self, strategy_class, strategy_args, scoring_stream):
self._scoring_stream = scoring_stream if scoring_stream else LocalUpdateScoreStream(self.backend.queue)
self._states_context = StatesContext(self.backend.states)
if isinstance(strategy_class, str):
strategy_class = load_object(strategy_class)
self._strategy = strategy_class.from_worker(self, strategy_args, self._scoring_stream, self._states_context)

@property
def strategy(self):
return self._strategy

@property
def states_context(self):
return self._states_context

def close(self):
self.strategy.close()
self.states_context.flush()


class ComponentsPipelineMixin(BackendMixin, StrategyMixin):
def __init__(self, backend, strategy_class, strategy_args, scoring_stream, middlewares=None, canonicalsolver=None,
db_worker=False, strategy_worker=False):
self._logger_components = logging.getLogger("manager.components")

# Load middlewares
@@ -26,18 +84,10 @@ def __init__(self, backend, strategy_class, strategy_args, middlewares=None, can
self._canonicalsolver = self._load_object(canonicalsolver)
assert isinstance(self.canonicalsolver, CanonicalSolver), \
"canonical solver '%s' must subclass CanonicalSolver" % self.canonicalsolver.__class__.__name__

# Load backend
self._logger_components.debug("Loading backend '%s'", backend)
self._backend = self._load_backend(backend, db_worker, strategy_worker)
self._backend.frontier_start()

# Instantiate strategy
self._scoring_stream = LocalUpdateScoreStream(self.backend.queue)
self._states_context = StatesContext(self.backend.states)
if isinstance(strategy_class, str):
strategy_class = load_object(strategy_class)
self._strategy = strategy_class.from_worker(self, strategy_args, self._scoring_stream, self._states_context)
BackendMixin.__init__(self, backend, db_worker, strategy_worker)
if not db_worker:
# TODO Distributed Scrapy case
StrategyMixin.__init__(self, strategy_class, strategy_args, scoring_stream)

@property
def canonicalsolver(self):
@@ -54,36 +104,6 @@ def middlewares(self):
"""
return self._middlewares

@property
def backend(self):
"""
The :class:`Backend <frontera.core.components.Backend>` object to be used by the frontier. \
Can be defined with :setting:`BACKEND` setting.
"""
return self._backend

@property
def strategy(self):
return self._strategy

def _load_backend(self, backend, db_worker, strategy_worker):
# FIXME remove obsolete
cls = load_object(backend)
assert issubclass(cls, Backend), "backend '%s' must subclass Backend" % cls.__name__
if issubclass(cls, DistributedBackend):
if db_worker:
return cls.db_worker(self)
if strategy_worker:
return cls.strategy_worker(self)
return cls.local(self)
else:
assert not strategy_worker, "In order to distribute backend only DistributedBackend " \
"subclasses are allowed to use"
if hasattr(cls, 'from_manager'):
return cls.from_manager(self)
else:
return cls()

def _load_middlewares(self, middleware_names):
# TO-DO: Use dict for middleware ordering
mws = []
@@ -131,9 +151,8 @@ def _process_component(self, component, method_name, component_category, obj, re
return return_obj

def close(self):
self.strategy.close()
self._states_context.flush()
self.backend.frontier_stop()
StrategyMixin.close(self)
BackendMixin.close(self)


class BaseManager(object):
@@ -204,7 +223,7 @@ def settings(self):
return self._settings


class FrontierManager(BaseManager, ComponentsPipelineMixin):
class LocalFrontierManager(BaseManager, ComponentsPipelineMixin):
"""
The :class:`FrontierManager <frontera.core.manager.FrontierManager>` object encapsulates the whole frontier,
providing an API to interact with. It's also responsible of loading and communicating all different frontier
@@ -297,7 +316,7 @@ def from_settings(cls, settings=None, db_worker=False, strategy_worker=False):
:ref:`frontier default settings <frontier-default-settings>` are used.
"""
manager_settings = Settings.object_from(settings)
return FrontierManager(request_model=manager_settings.REQUEST_MODEL,
return LocalFrontierManager(request_model=manager_settings.REQUEST_MODEL,
response_model=manager_settings.RESPONSE_MODEL,
backend=manager_settings.BACKEND,
strategy_class=manager_settings.STRATEGY,
@@ -473,11 +492,11 @@ def page_crawled(self, response):
(self.response_model.__name__, type(response).__name__)
self._states_context.to_fetch(response)
self._states_context.fetch()
self._states_context._states.set_states(response)
self._states_context.states.set_states(response)
self._process_components(method_name='page_crawled',
obj=response,
return_classes=self.response_model)
self._states_context._states.update_cache(response)
self._states_context.states.update_cache(response)

def links_extracted(self, request, links):
"""
@@ -507,13 +526,13 @@ def links_extracted(self, request, links):
self._states_context.to_fetch(request)
self._states_context.to_fetch(filtered)
self._states_context.fetch()
self._states_context._states.set_states(filtered)
self._states_context.states.set_states(filtered)
self._process_components(method_name='links_extracted',
obj=request,
return_classes=self.request_model,
components=(2,),
links=filtered)
self._states_context._states.update_cache(filtered)
self._states_context.states.update_cache(filtered)

def request_error(self, request, error):
"""
@@ -528,12 +547,12 @@ def request_error(self, request, error):
self._logger.debug('PAGE_REQUEST_ERROR url=%s error=%s', request.url, error)
self._states_context.to_fetch(request)
self._states_context.fetch()
self._states_context._states.set_states(request)
self._states_context.states.set_states(request)
processed_page = self._process_components(method_name='request_error',
obj=request,
return_classes=self.request_model,
error=error)
self._states_context._states.update_cache(request)
self._states_context.states.update_cache(request)
return processed_page

def create_request(self, url, method=b'GET', headers=None, cookies=None, meta=None, body=b''):
@@ -554,13 +573,124 @@ def create_request(self, url, method=b'GET', headers=None, cookies=None, meta=No
return_classes=self.request_model,
components=(0,1))



def _check_startstop(self):
assert self._started, "Frontier not started!"
assert not self._stopped, "Call to stopped frontier!"


class WorkerFrontierManager(BaseManager, ComponentsPipelineMixin):
"""
The :class:`WorkerFrontierManager <frontera.core.manager.WorkerFrontierManager>` class role is to
instantiate the core components and is used mainly by workers.
"""
def __init__(self, settings, request_model, response_model, backend, strategy_class, strategy_args,
max_next_requests, scoring_stream, middlewares=None, canonicalsolver=None, db_worker=False,
strategy_worker=False):
"""
:param object/string request_model: The :class:`Request <frontera.core.models.Request>` object to be \
used by the frontier.

:param object/string response_model: The :class:`Response <frontera.core.models.Response>` object to be \
used by the frontier.

:param object/string backend: The :class:`Backend <frontera.core.components.Backend>` object to be \
used by the frontier.

:param list middlewares: A list of :class:`Middleware <frontera.core.components.Middleware>` \
objects to be used by the frontier.

:param int max_next_requests: Maximum number of requests returned by \
:attr:`get_next_requests <frontera.core.manager.FrontierManager.get_next_requests>` method.

:param object/string settings: The :class:`Settings <frontera.settings.Settings>` object used by \
the frontier.

:param object/string canonicalsolver: The :class:`CanonicalSolver <frontera.core.components.CanonicalSolver>`
object to be used by frontier.
:param object scoring_stream: Instance of :class:`UpdateScoreStream <frontera.core.manager.UpdateScoreStream>`
for crawling strategy to send scheduled requests to.

:param bool db_worker: True if class is instantiated in DB worker environment

:param bool strategy_worker: True if class is instantiated in strategy worker environment
"""

BaseManager.__init__(self, request_model, response_model, settings=settings)

self._max_next_requests = max_next_requests

ComponentsPipelineMixin.__init__(self, backend=backend, strategy_class=strategy_class,
strategy_args=strategy_args, scoring_stream=scoring_stream,
middlewares=middlewares, canonicalsolver=canonicalsolver,
db_worker=db_worker,strategy_worker=strategy_worker)

# Init frontier components pipeline
# Some code relies on the order, modify carefully
self._components_pipeline = [
('Middleware', self.middlewares, True),
('CanonicalSolver', self.canonicalsolver, False),
]

# Log frontier manager start
self._logger.info('Frontier Manager Started!')
self._logger.info('-'*80)

@classmethod
def from_settings(cls, settings=None, db_worker=False, strategy_worker=False, scoring_stream=None):
manager_settings = Settings.object_from(settings)
return WorkerFrontierManager(request_model=manager_settings.REQUEST_MODEL,
response_model=manager_settings.RESPONSE_MODEL,
backend=manager_settings.BACKEND,
strategy_class=manager_settings.STRATEGY,
strategy_args=manager_settings.STRATEGY_ARGS,
middlewares=manager_settings.MIDDLEWARES,
max_next_requests=manager_settings.MAX_NEXT_REQUESTS,
settings=manager_settings,
canonicalsolver=manager_settings.CANONICAL_SOLVER,
db_worker=db_worker,
strategy_worker=strategy_worker,
scoring_stream=scoring_stream)

def create_request(self, url, method=b'GET', headers=None, cookies=None, meta=None, body=b''):
"""
Creates request and applies middleware and canonical solver pipelines.

:param url: str
:param method: bytes
:param headers: dict
:param cookies: dict
:param meta: dict
:param body: bytes
:return: :class:`Request <frontera.core.models.Request>` object
"""
r = self.request_model(url, method=method, headers=headers, cookies=cookies, meta=meta, body=body)
return self._process_components('create_request',
obj=r,
return_classes=self.request_model,
components=(0, 1))


class SpiderFrontierManager(LocalFrontierManager):
def __init__(self, *args, **kwargs):
super(SpiderFrontierManager, self).__init__(*args, **kwargs)

@classmethod
def from_settings(cls, settings=None, db_worker=False, strategy_worker=False, scoring_stream=None):
manager_settings = Settings.object_from(settings)
return SpiderFrontierManager(request_model=manager_settings.REQUEST_MODEL,
response_model=manager_settings.RESPONSE_MODEL,
backend=manager_settings.BACKEND,
strategy_class=manager_settings.STRATEGY,
strategy_args=manager_settings.STRATEGY_ARGS,
middlewares=manager_settings.MIDDLEWARES,
max_next_requests=manager_settings.MAX_NEXT_REQUESTS,
settings=manager_settings,
canonicalsolver=manager_settings.CANONICAL_SOLVER,
db_worker=db_worker,
strategy_worker=strategy_worker,
scoring_stream=scoring_stream)


@six.add_metaclass(ABCMeta)
class UpdateScoreStream(object):

@@ -598,7 +728,7 @@ class StatesContext(object):

def __init__(self, states):
self._requests = []
self._states = states
self.states = states
self._fingerprints = dict()
self.logger = logging.getLogger("states-context")

@@ -609,20 +739,20 @@ def to_fetch(self, requests):
self._fingerprints[fingerprint] = request

def fetch(self):
self._states.fetch(self._fingerprints)
self.states.fetch(self._fingerprints)
self._fingerprints.clear()

def refresh_and_keep(self, requests):
self.to_fetch(requests)
self.fetch()
self._states.set_states(requests)
self.states.set_states(requests)
self._requests.extend(requests if isinstance(requests, Iterable) else [requests])

def release(self):
self._states.update_cache(self._requests)
self.states.update_cache(self._requests)
self._requests = []

def flush(self):
self.logger.info("Flushing states")
self._states.flush()
self.states.flush()
self.logger.info("Flushing of states finished")
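The `manager.py` diff above replaces one monolithic mixin with `BackendMixin` and `StrategyMixin`, composed through explicit `__init__`/`close` calls rather than `super()`, so that the strategy half can be skipped when running as a DB worker. A minimal, self-contained sketch of that pattern (simplified names and stand-in resources, not the actual frontera classes):

```python
class BackendMixin:
    """Owns the backend resource: sets it up on init, stops it on close."""
    def __init__(self, backend):
        self._backend = backend
        self.backend_stopped = False

    @property
    def backend(self):
        return self._backend

    def close(self):
        self.backend_stopped = True  # stands in for backend.frontier_stop()


class StrategyMixin:
    """Owns the crawling strategy and its states context."""
    def __init__(self, strategy):
        self._strategy = strategy
        self.strategy_closed = False

    @property
    def strategy(self):
        return self._strategy

    def close(self):
        self.strategy_closed = True  # stands in for strategy.close() + flush


class ComponentsPipelineMixin(BackendMixin, StrategyMixin):
    """Combines both mixins with explicit, conditional initialization."""
    def __init__(self, backend, strategy, db_worker=False):
        BackendMixin.__init__(self, backend)
        if not db_worker:  # a DB worker runs without a crawling strategy
            StrategyMixin.__init__(self, strategy)

    def close(self):
        # Tear down in the order the diff uses: strategy first, then backend.
        StrategyMixin.close(self)
        BackendMixin.close(self)
```

Calling each mixin's `__init__` by name bypasses the MRO, which is exactly what allows the conditional `StrategyMixin` setup here; the trade-off is that the combined class must remember to delegate `close()` to every mixin itself.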
4 changes: 2 additions & 2 deletions frontera/utils/managers.py
@@ -1,11 +1,11 @@
from __future__ import absolute_import
from frontera.core.manager import FrontierManager
from frontera.core.manager import LocalFrontierManager
from .converters import BaseRequestConverter, BaseResponseConverter


class FrontierManagerWrapper(object):
def __init__(self, settings, manager=None):
manager = manager or FrontierManager
manager = manager or LocalFrontierManager
self.manager = manager.from_settings(settings)
self.request_converter = None
self.response_converter = None
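The `managers.py` change swaps the wrapper's default manager class for the renamed `LocalFrontierManager`. A toy sketch (hypothetical stand-ins, not the real frontera classes) of the `manager or DefaultClass` late-binding pattern the wrapper relies on:

```python
class LocalFrontierManager:
    """Stand-in for the real manager, built via a from_settings factory."""
    def __init__(self, settings):
        self.settings = settings

    @classmethod
    def from_settings(cls, settings=None):
        return cls(settings or {})


class FrontierManagerWrapper:
    """Instantiates a manager class, defaulting to LocalFrontierManager."""
    def __init__(self, settings, manager=None):
        # `manager` is a class (not an instance); falling back to the
        # default class here means the rename only touches one line.
        manager = manager or LocalFrontierManager
        self.manager = manager.from_settings(settings)


wrapper = FrontierManagerWrapper({"BACKEND": "memory"})
```

Because the wrapper holds the class rather than an instance, callers can still inject a custom manager class, while every default-configured caller transparently picks up the rename.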