Commit 98037a6: use asyncio-pool

Lupino committed Sep 23, 2021
1 parent bda3745 commit 98037a6
Showing 5 changed files with 15 additions and 82 deletions.

docs/index.rst (2 changes: 1 addition & 1 deletion)

@@ -172,7 +172,7 @@ To put our spider to work, go to the project's top level directory and edit ``main.py``
     engine.set_sched(sched)
     engine.set_spiders([DmozSpider()])
 
-    engine.start()
+    await engine.start()
 
 then::
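
``engine.start()`` becoming ``await engine.start()`` means the caller now owns the event loop. A minimal sketch of the updated ``main.py`` flow, assuming the ``Engine``, ``Scheduler``, and ``DmozSpider`` names from the surrounding docs; the import paths, the ``main()`` wrapper, and the ``asyncio.run()`` call are illustrative, not part of this diff:

import asyncio

from grapy import Engine
from grapy.sched import Scheduler


async def main():
    engine = Engine()
    sched = Scheduler()
    engine.set_sched(sched)
    engine.set_spiders([DmozSpider()])  # DmozSpider as defined earlier in the docs

    await engine.start()  # push the start requests to the scheduler
    await sched.join()    # wait for the scheduler's pool to drain


asyncio.run(main())
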
grapy/core/base_sched.py (18 changes: 0 additions & 18 deletions)

@@ -23,21 +23,3 @@ async def submit_req(self, req):
 
     async def submit_item(self, item):
         await self.engine.process_item(item)
-
-    async def run(self):
-        '''
-        run the scheduler
-        '''
-        raise NotImplementedError('you must rewrite at sub class')
-
-    async def async_start(self):
-        async with self.locker:
-            if self.is_running:
-                return
-
-            self.is_running = True
-
-        await self.run()
-
-    def start(self):
-        return self.engine.loop.create_task(self.async_start())
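
With ``run``, ``async_start``, and ``start`` removed, ``BaseScheduler`` shrinks to the submission hooks shown above, and subclasses no longer drive their own task loop. A rough sketch of the remaining contract; the class name and the trivial ``push_req`` body are hypothetical:

from grapy.core import BaseScheduler


class NaiveScheduler(BaseScheduler):
    '''Hypothetical subclass: nothing to start or run any more.'''

    async def push_req(self, req):
        # hand every request straight to the engine, no queueing or dedup
        await self.submit_req(req)
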
grapy/core/engine.py (22 changes: 4 additions & 18 deletions)

@@ -10,16 +10,13 @@
 
 class Engine(object):
 
-    __slots__ = ['pipelines', 'spiders', 'middlewares', 'sched', 'loop']
+    __slots__ = ['pipelines', 'spiders', 'middlewares', 'sched']
 
-    def __init__(self, loop=None):
+    def __init__(self):
         self.pipelines = []
         self.spiders = {}
         self.middlewares = []
         self.sched = None
-        self.loop = loop
-        if not self.loop:
-            self.loop = asyncio.get_event_loop()
 
     def set_spiders(self, spiders):
         self.spiders = {}
@@ -168,16 +165,5 @@ async def push_req(req, spider):
         for req in spider.start_request():
             await push_req(req, spider)
 
-    async def run(self):
-        await self.start_request()
-
-    def start(self, forever=True):
-        self.loop.create_task(self.run())
-        if forever:
-            self.loop.run_forever()
-
-    def shutdown(self):
-        if self.loop.is_running():
-            self.loop.stop()
-        else:
-            self.loop.close()
+    async def start(self):
+        await self.start_request()
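
Dropping ``loop``, ``run_forever``, and ``shutdown`` turns the engine into plain async code: ``start`` only pushes the start requests, and whoever calls it chooses the loop entry point. A hedged sketch of the new calling convention, assuming ``engine`` and ``sched`` are wired up as in the docs example:

import asyncio


async def crawl(engine, sched):
    await engine.start()  # pushes every start request to the scheduler
    await sched.join()    # waiting for completion is now the scheduler's job

# Old style (removed): engine.start(forever=True) blocked in loop.run_forever().
# New style: asyncio.run(crawl(engine, sched))
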
grapy/sched.py (53 changes: 9 additions & 44 deletions)

@@ -4,27 +4,24 @@
 import re
 from .utils import logger
 from .core.exceptions import IgnoreRequest, RetryRequest
+from asyncio_pool import AioPool
 
 re_url = re.compile('^https?://[^/]+')
 
 __all__ = ['Scheduler']
 
 
 def hash_url(url):
-    h = hashlib.sha1()
+    h = hashlib.sha256()
     h.update(bytes(url, 'utf-8'))
     return h.hexdigest()
 
 
 class Scheduler(BaseScheduler):
-    def __init__(self, max_tasks=5, auto_shutdown=True):
+    def __init__(self, size=10):
         BaseScheduler.__init__(self)
         self._storage = {}
         self._queue = []
-        self._sem = asyncio.Semaphore(max_tasks)
-        self.tasks = []
-        self.auto_shutdown = auto_shutdown
-
-        self.shutdown_task = None
+        self._pool = AioPool(size = size)
 
     async def push_req(self, req):
         if not re_url.match(req.url):
@@ -33,51 +30,19 @@ async def push_req(self, req):
         if req.unique and key in self._storage:
             return
 
-        self._queue.insert(0, req)
+        await self._pool.spawn(self.submit_req(req))
         self._storage[key] = True
-
-        self.start()
-
-    async def run(self):
-        while True:
-            if len(self._queue) == 0:
-                break
-
-            req = self._queue.pop()
-            await self._sem.acquire()
-            task = self.engine.loop.create_task(self.submit_req(req))
-            task.add_done_callback(lambda t: self._sem.release())
-            task.add_done_callback(lambda t: self.tasks.remove(t))
-            self.tasks.append(task)
-
-        async with self.locker:
-            self.is_running = False
-
-        if self.auto_shutdown:
-            async with self.locker:
-                if self.shutdown_task is None:
-                    self.shutdown_task = self.engine.loop.create_task(
-                        self.run_auto_shutdown())
-
-    async def run_auto_shutdown(self):
-        while True:
-            await asyncio.gather(*self.tasks)
-            if len(self.tasks) == 0:
-                break
-
-        if not self.is_running:
-            self.engine.shutdown()
-
-        self.shutdown_task = None
 
     async def submit_req(self, req):
         try:
             await BaseScheduler.submit_req(self, req)
-            key = hash_url(req.url)
         except IgnoreRequest:
             pass
         except RetryRequest:
+            req.unique = False
             await self.push_req(req)
         except Exception as e:
             logger.exception(e)
+
+    async def join(self):
+        await self._pool.join()
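
The hand-rolled semaphore, task list, and auto-shutdown machinery are replaced by ``AioPool`` from the third-party ``asyncio-pool`` package. A small standalone sketch of the two calls the new ``Scheduler`` relies on; ``fetch`` is a stand-in coroutine, not grapy code:

import asyncio

from asyncio_pool import AioPool


async def fetch(i):
    await asyncio.sleep(0.1)  # stand-in for downloading one request
    return i


async def main():
    pool = AioPool(size=10)         # mirrors Scheduler(size=10)
    for i in range(25):
        await pool.spawn(fetch(i))  # waits for a free slot, then schedules
    await pool.join()               # what the new Scheduler.join() awaits


asyncio.run(main())
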
setup.py (2 changes: 1 addition & 1 deletion)

@@ -8,7 +8,7 @@
     'grapy.core',
 ]
 
-requires = ['asyncio', 'aiohttp', 'beautifulsoup4', 'requests']
+requires = ['asyncio', 'aiohttp', 'beautifulsoup4', 'requests', 'asyncio-pool']
 
 setup(
     name='grapy',
