Graph Managers

Martin Olveyra edited this page Oct 28, 2022 · 10 revisions

Previous Chapter: Managing Hubstorage Crawl Frontiers


Let's suppose we have the following spider:

class MySiteArticlesSpider(Spider):

    name = 'mysite.com'

    def parse(self, response):
        (... link discovery logic ...)
        yield Request(..., callback=self.parse_article)

    def parse_article(self, response):
        (... actual implementation of the article parsing method ...)

We can run this spider as is until completion. We can also split it into a frontier consumer and a frontier producer, as we did in the previous chapter. There we ended up with a workflow in which a producer spider runs first, and a crawl manager that schedules the consumers runs after it. With that setup we need to schedule both components either manually or by programming their scheduling separately at specific times. We can also build more sophisticated scheduling with another external script, a graph manager. The graph manager automates the building of Zyte Scrapy Cloud workflows by defining a graph of tasks.

With this idea in mind, we could create a graph that schedules the producer spider and then the hcf crawl manager that schedules the consumers. In this chapter we will explain this approach, but without separate producer and consumer spiders. Instead, we will use a single spider that can either run standalone or be plugged into a workflow to run in two different modes, producer and consumer, without any change in its code. If you still prefer the split spiders version, you can easily adapt the example used here to your purpose. Notice, however, that starting from a single spider will be the most common case, as most spiders are implemented that way. This approach has the advantage that you don't need to write spiders with their scheduling in mind. You decouple the spider implementation from its scheduling strategy, which may vary along the stages of a project.

So, let's start by creating a manager script (e.g. scripts/manager_articles.py) by subclassing the GraphManager class (the code is explained below):

import json
from typing import Tuple

from shub_workflow.graph import GraphManager
from shub_workflow.graph.task import Task, SpiderTask

class MyArticlesGraphManager(GraphManager):

    loop_mode = 120
    default_max_jobs = 8

    frontier = 'mysite-articles-frontier'

    def add_argparser_options(self):
        super().add_argparser_options()
        self.argparser.add_argument("--slot-prefix", default="test")

    def configure_workflow(self) -> Tuple[Task, ...]:
        producer_frontera_settings = {
            "BACKEND": "hcf_backend.HCFBackend",
            "HCF_PRODUCER_FRONTIER": self.frontier,
            "HCF_PRODUCER_SLOT_PREFIX": self.args.slot_prefix,
            "HCF_PRODUCER_NUMBER_OF_SLOTS": self.args.max_running_jobs,
        }
        producer_settings = {
            "FRONTERA_SCHEDULER_REQUEST_CALLBACKS_TO_FRONTIER": "parse_article",
        }

        discover = SpiderTask(
            "discover",
            "mysite.com",
            job_settings=producer_settings,
            frontera_settings_json=json.dumps(producer_frontera_settings)
        )

        consumer_frontera_settings = {
            "BACKEND": "hcf_backend.HCFBackend",
            "HCF_CONSUMER_FRONTIER": self.frontier,
            "HCF_CONSUMER_MAX_BATCHES": 50,
        }

        consumer_settings = {
            "FRONTERA_SCHEDULER_SKIP_START_REQUESTS": True,
        }

        scrapers = Task(
            "scrapers",
            "hcf_crawlmanager.py",
            init_args=[
                "mysite.com",
                self.frontier,
                self.args.slot_prefix,
                "--loop-mode=60",
                f"--frontera-settings-json={json.dumps(consumer_frontera_settings)}",
                f"--max-running-jobs={self.args.max_running_jobs}",
                f"--job-settings={json.dumps(consumer_settings)}"
            ]
        )

        discover.add_next_task(scrapers)
        return (discover,)

Let's dig into the details of this code. The workflow is defined in the configure_workflow() method, which defines two tasks: discover and scrapers. The discover task is a spider task. It schedules the producer using the same frontera and scrapy settings as before, so they need no further explanation. The SpiderTask class is a wrapper that defines the scheduling of a spider. Its first parameter is the name of the task and the second one is the name of the spider. We then pass the job_settings and frontera_settings_json parameters.

The scrapers task is a script task. The first parameter is the task name, the second is the script name, followed by a list of arguments. These are the same arguments we passed manually to hcf_crawlmanager.py in the previous chapter, except for one additional scrapy setting: FRONTERA_SCHEDULER_SKIP_START_REQUESTS. This setting instructs the consumer not to execute start requests. It was not required in the previous chapter because the consumer and the producer were separate spiders. Here we need it to prevent the consumer from performing the discovery stage.
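Since the settings dictionaries travel to the script as JSON strings embedded in command line arguments, it can help to see that round trip in isolation. The following is only a sketch: the parser below is a hypothetical stand-in built with argparse that mimics the two JSON-valued options used in this chapter, not the actual parser of hcf_crawlmanager.py, and the literal frontier and prefix values are the tutorial defaults.

```python
import argparse
import json

# Hypothetical stand-in parser: it only mimics the positional arguments and
# the two JSON-valued options used in this chapter. It is NOT the real
# hcf_crawlmanager.py argument parser.
parser = argparse.ArgumentParser()
parser.add_argument("spider")
parser.add_argument("frontier")
parser.add_argument("prefix")
parser.add_argument("--frontera-settings-json", type=json.loads, default={})
parser.add_argument("--job-settings", type=json.loads, default={})

consumer_frontera_settings = {
    "BACKEND": "hcf_backend.HCFBackend",
    "HCF_CONSUMER_FRONTIER": "mysite-articles-frontier",
    "HCF_CONSUMER_MAX_BATCHES": 50,
}
consumer_settings = {"FRONTERA_SCHEDULER_SKIP_START_REQUESTS": True}

# Same shape as the init_args list built in configure_workflow().
init_args = [
    "mysite.com",
    "mysite-articles-frontier",
    "test",
    f"--frontera-settings-json={json.dumps(consumer_frontera_settings)}",
    f"--job-settings={json.dumps(consumer_settings)}",
]

# The receiving side gets back the original dictionaries.
args = parser.parse_args(init_args)
print(args.job_settings)
print(args.frontera_settings_json["HCF_CONSUMER_MAX_BATCHES"])
```

Serializing with json.dumps() keeps booleans and integers typed, so the receiving script does not have to guess whether "True" or "50" were meant as strings.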

The workflow declaration is completed by adding the scrapers task as the next job of the discover task, so when discover finishes, scrapers is scheduled. The return value of the configure_workflow() method is a tuple with the root tasks. In this case, discover is the only root. In order to execute this workflow, the script must be invoked in this way:

$ python manager_articles.py --root-jobs

The same rules described before for hcf_crawlmanager.py apply regarding the target SC project where the jobs are scheduled.
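To make the relation between root tasks and next tasks more concrete, here is a minimal, self-contained model of it. MiniTask and run_order() are hypothetical names invented for this illustration. They are not part of shub_workflow, and the real graph manager schedules Scrapy Cloud jobs and polls their state rather than walking a list in memory.

```python
from typing import List, Tuple


class MiniTask:
    """Hypothetical stand-in for a workflow task (not shub_workflow's Task)."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.next_tasks: List["MiniTask"] = []

    def add_next_task(self, task: "MiniTask") -> None:
        self.next_tasks.append(task)


def run_order(roots: Tuple[MiniTask, ...]) -> List[str]:
    """Return task ids in the order a simple sequential walk would start them."""
    order: List[str] = []
    pending = list(roots)
    while pending:
        task = pending.pop(0)
        if task.task_id in order:
            continue
        order.append(task.task_id)
        # Successors become eligible only after the current task is done.
        pending.extend(task.next_tasks)
    return order


discover = MiniTask("discover")
scrapers = MiniTask("scrapers")
discover.add_next_task(scrapers)

# Only discover is returned as root, so scrapers starts after it.
order = run_order((discover,))
print(order)
```

This simplified walk ignores tasks with several parents and parallel execution. It only shows why returning (discover,) is enough for scrapers to be reached.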

Long running producers

In some use cases the producer can take a long time to complete. However, requests are already being sent to the frontier while it runs. If you don't want the consumers to wait for the producer to finish, and prefer to start them as soon as possible, you can slightly modify the workflow by adding the wait_time parameter to the scrapers task and returning both the discover and scrapers tasks as root jobs, instead of adding one as the next job of the other:

        (...)

        scrapers = Task(
            "scrapers",
            "hcf_crawlmanager.py",
            init_args=[
                "mysite.com",
                self.frontier,
                self.args.slot_prefix,
                "--loop-mode=60",
                f"--frontera-settings-json={json.dumps(consumer_frontera_settings)}",
                f"--max-running-jobs={self.args.max_running_jobs}",
                f"--job-settings={json.dumps(consumer_settings)}"
            ],
            wait_time=600,
        )

        return (discover, scrapers)

The addition of wait_time=600 instructs the scrapers task to wait 10 minutes before actually starting, thus giving the discover task a margin to generate requests for the consumers to start with (if hcf_crawlmanager.py does not find any request in the target slots, it will consider the crawl completed and will terminate).

Even longer running producers: broad crawler use case

What if the producer spider needs a very long time to complete and a failure in the middle stops it prematurely? This is the typical case of a broad crawler:

class MySiteArticlesSpider(Spider):

    name = 'mysite.com'

    def parse(self, response):
        (... link discovery logic ...)
        if <some condition>:
            # articles links
            yield Request(..., callback=self.parse_article)
        else:
            # other links
            yield Request(..., callback=self.parse)

    def parse_article(self, response):
        (... actual implementation of the article parsing method ...)

It is very expensive to start again from zero if the producer stops. So here we can also save the state of the discovery spider in the frontier. We need to write the exploration links to a different set of slots than the one we use for article links, so we need a way to map the producer's own requests to a different set of slots. That feature is provided by the scrapy-frontera setting FRONTERA_SCHEDULER_CALLBACK_SLOT_PREFIX_MAP. You will modify the previous configure_workflow() method by changing the producer section in this way:

        (...)
        producer_frontera_settings = {
            "BACKEND": "hcf_backend.HCFBackend",
            "HCF_PRODUCER_FRONTIER": self.frontier,
            "HCF_PRODUCER_SLOT_PREFIX": "links",
        }
        producer_settings = {
            "FRONTERA_SCHEDULER_REQUEST_CALLBACKS_TO_FRONTIER": "parse,parse_article",
            'FRONTERA_SCHEDULER_CALLBACK_SLOT_PREFIX_MAP': {
                'parse_article': f'{self.args.slot_prefix}/{self.args.max_running_jobs}',
            },
        }
        discover = Task(
            "discover",
            "hcf_crawlmanager.py",
            init_args=[
                "mysite.com",
                self.frontier,
                "links",
                "--loop-mode=60",
                f"--frontera-settings-json={json.dumps(producer_frontera_settings)}",
                f"--job-settings={json.dumps(producer_settings)}"
            ]
        )

Let's review the changes performed:

  • Notice the addition of 'parse' into the FRONTERA_SCHEDULER_REQUEST_CALLBACKS_TO_FRONTIER list. Now we will send to the frontier not only the requests with parse_article() callback as previously, but also the ones with the parse() callback.
  • We changed the value of HCF_PRODUCER_SLOT_PREFIX to links and removed the HCF_PRODUCER_NUMBER_OF_SLOTS from producer_frontera_settings, so the number of slots will take the default value 1. So by default the producer will write all requests to a single slot links0.
  • We now configure the target slots for the consumer via FRONTERA_SCHEDULER_CALLBACK_SLOT_PREFIX_MAP.
  • The discover task is now an hcf crawlmanager script Task instead of a SpiderTask. This hcf crawlmanager will configure the producer also as a consumer, and take care of its periodic scheduling.
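The routing described in the list above can be pictured with a small sketch. It assumes, as the links0 example suggests, that a slot name is the prefix followed by a slot index, and that the map values have the form "<slot prefix>/<number of slots>". The pick_slot() helper and its SHA-1 based index are hypothetical and only illustrate the idea. The actual slot assignment in scrapy-frontera and the HCF backend may differ.

```python
import hashlib
from typing import Dict

# Defaults taken from the producer settings above: prefix "links", one slot.
DEFAULT_PREFIX = "links"
DEFAULT_SLOTS = 1

# Same shape as FRONTERA_SCHEDULER_CALLBACK_SLOT_PREFIX_MAP in this chapter:
# callback name -> "<slot prefix>/<number of slots>". The values "test" and 8
# are the tutorial defaults for --slot-prefix and default_max_jobs.
CALLBACK_SLOT_PREFIX_MAP: Dict[str, str] = {"parse_article": "test/8"}


def pick_slot(callback: str, url: str) -> str:
    """Illustrative routing only; the real backend may hash differently."""
    spec = CALLBACK_SLOT_PREFIX_MAP.get(callback)
    if spec is None:
        prefix, num_slots = DEFAULT_PREFIX, DEFAULT_SLOTS
    else:
        prefix, _, count = spec.partition("/")
        num_slots = int(count) if count else 1
    # Spread URLs over the available slots in a deterministic way.
    digest = hashlib.sha1(url.encode("utf8")).hexdigest()
    return f"{prefix}{int(digest, 16) % num_slots}"


# Exploration links stay in the single producer slot.
print(pick_slot("parse", "https://mysite.com/section/news"))
# Article links are spread over the consumer slots test0..test7.
print(pick_slot("parse_article", "https://mysite.com/articles/1"))
```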

This results in a producer that, on its first job, scrapes the start URL, extracts links both for itself and for the consumer, and finishes. The hcf crawl manager will then detect the presence of requests in the links0 slot and schedule the producer again, which will consume all the links saved in the frontier, generate more links and stop. Once the volume of requests generated for itself is big enough, they will be flushed to the frontier before the spider finishes, so the producer reaches a continuous regime of reading and writing without stopping, except in an abnormal situation. Such a premature stop will not, however, significantly affect the producer crawl process. Once the producer is rescheduled by the hcf crawl manager, it will continue by reading the batch that follows the last one read in the failed job. If that last batch was not fully processed, some requests will be lost.

If you want to avoid this collateral loss, you can pass an additional HCF frontera setting: HCF_CONSUMER_DELETE_BATCHES_ON_STOP=True. By default, batches are deleted immediately after being read, and that is the cause of the loss when the job finishes abnormally. With this setting you ensure that batches are deleted when the spider stops. However, if you provide this setting with no extra configuration, as the batches are deleted on finish and the producer never reaches that stage, it will loop reading the same batches over and over. So when using this setting you need to provide additional settings to ensure that the producer finishes each time it has read some number of batches.

You must play with HCF_CONSUMER_MAX_BATCHES and MAX_NEXT_REQUESTS. MAX_NEXT_REQUESTS governs how many requests will be read on each read request. This is a frontera setting (not an HCF frontera backend one), and its default value is 64. The number of batches read before stopping will depend on this value and on the batch sizes generated by the producer (see HCF_PRODUCER_BATCH_SIZE in the hcf backend documentation). Roughly, the number of batches read on each read cycle will be int(MAX_NEXT_REQUESTS / <batch size>). Take special care to set HCF_CONSUMER_MAX_BATCHES to a number equal to or smaller than this one. If it is bigger, the spider will read some batches more than once, thus losing performance. Also, HCF_CONSUMER_MAX_BATCHES must be bigger than 0, which is the default value for this setting. 0 means no limit on the number of batches to read, and this in combination with HCF_CONSUMER_DELETE_BATCHES_ON_STOP=True will result in a job that reads the same batches forever and never stops.

Here you face a compromise: the smaller the number of batches read per job, the bigger the proportion of tail requests in the total job time and the higher the cost of setting up and shutting down SC jobs, so performance will decay. But the bigger the number of batches read per job, the more memory you will use, and the more links will be reprocessed on a new job if the current one finishes abnormally. If the spider is stable enough this second effect will be insignificant. The memory factor, however, will still be very important. A typical starting point is to read around 100 batches per job, so an appropriate set of settings for this case would be:

        producer_frontera_settings = {
            "BACKEND": "hcf_backend.HCFBackend",
            "HCF_PRODUCER_FRONTIER": self.frontier,
            "HCF_PRODUCER_SLOT_PREFIX": "links",

            "HCF_CONSUMER_MAX_BATCHES": 100,
            "MAX_NEXT_REQUESTS": 10000,
        }

With the combination of settings above you will never read fewer than 100 batches, and the number of requests read will be between 10000 and 10099, depending on the exact batch sizes, which are not constant. Starting from these values you can tune your project further, as ultimately the best performance will depend on the exact case. As the maximal size of an HCF batch is 100, this will also ensure that HCF_CONSUMER_MAX_BATCHES is equal to or smaller than MAX_NEXT_REQUESTS / <batch size>.
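The constraints between these two settings can be written down as a small sanity check. This is only a sketch of the rule of thumb stated in this section: batches_per_read_cycle() and check_consumer_limits() are hypothetical helpers, not part of frontera, hcf-backend or shub-workflow, and the batch size of 100 is the maximal HCF batch size mentioned above.

```python
def batches_per_read_cycle(max_next_requests: int, batch_size: int) -> int:
    """Rough number of batches read on each read cycle."""
    return int(max_next_requests / batch_size)


def check_consumer_limits(max_batches: int, max_next_requests: int, batch_size: int) -> None:
    """Raise ValueError if the settings break the rules described in the text."""
    if max_batches <= 0:
        # 0 means "no limit", which never stops when batches are deleted on stop.
        raise ValueError("HCF_CONSUMER_MAX_BATCHES must be bigger than 0")
    per_cycle = batches_per_read_cycle(max_next_requests, batch_size)
    if max_batches > per_cycle:
        # A bigger value would make the spider read some batches more than once.
        raise ValueError(
            f"HCF_CONSUMER_MAX_BATCHES={max_batches} exceeds the {per_cycle} "
            "batches read per cycle"
        )


# Values from the example settings, with the maximal HCF batch size of 100.
print(batches_per_read_cycle(10000, 100))
check_consumer_limits(max_batches=100, max_next_requests=10000, batch_size=100)
```

Running the same check with max_batches=0, or with a value above the per-cycle figure, raises ValueError, which is a cheap way to catch a misconfiguration before scheduling a long crawl.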

Making deliveries and cleaning the frontier

When you have a large crawl split into many jobs, the delivery of data into a single file can become a pain. Shub-workflow provides a deliver script that facilitates this task by reading items from all the jobs of the given target spiders that carry the same FLOW_ID tag as the deliver script itself. As all the scripts scheduled by a graph manager and an hcf crawl manager inherit the same flow id, it is ensured that all the spider jobs belonging to the same crawl, and only those, will be read and delivered by the deliver script.
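The selection by flow id can be pictured as a plain filter over job metadata. The sketch below is illustrative only: the job dictionaries, the job keys and the helper names are hypothetical, and the "FLOW_ID=<value>" tag format is an assumption made for the example rather than something this chapter specifies.

```python
from typing import Dict, Iterable, List, Optional


def flow_id_of(job: Dict) -> Optional[str]:
    """Extract the flow id from a job's tags (tag format is an assumption)."""
    for tag in job.get("tags", []):
        if tag.startswith("FLOW_ID="):
            return tag.split("=", 1)[1]
    return None


def jobs_to_deliver(jobs: Iterable[Dict], spider: str, flow_id: str) -> List[str]:
    """Keep only the jobs of the target spider that share the given flow id."""
    return [
        job["key"]
        for job in jobs
        if job["spider"] == spider and flow_id_of(job) == flow_id
    ]


# Hypothetical job metadata, for illustration only.
jobs = [
    {"key": "1/2/10", "spider": "mysite.com", "tags": ["FLOW_ID=abc"]},
    {"key": "1/2/11", "spider": "mysite.com", "tags": ["FLOW_ID=abc"]},
    {"key": "1/2/12", "spider": "mysite.com", "tags": ["FLOW_ID=xyz"]},  # another crawl
    {"key": "1/3/4", "spider": "othersite.com", "tags": ["FLOW_ID=abc"]},  # another spider
]

selected = jobs_to_deliver(jobs, "mysite.com", "abc")
print(selected)
```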

Let's suppose we want to deliver our job items into S3 (the script also supports GCS and local file storage). We would add to our project the script scripts/deliver.py:

import json
from tempfile import mktemp

from shub_workflow.deliver import BaseDeliverScript
from shub_workflow.deliver.futils import mv_file

class MyDeliverScript(BaseDeliverScript):

    def add_argparser_options(self):
        super().add_argparser_options()
        self.argparser.add_argument("--output-file")

    def on_item(self, item: dict, scrapername: str):
        print(json.dumps(item), file=self.tempfile)

    def run(self):
        if self.args.output_file is not None:
            self.tempfile = open(mktemp(), "w", encoding="utf8")
            super().run()

    def on_close(self):
        if self.args.output_file and self.total_items_count > 0:
            self.tempfile.close()
            mv_file(self.tempfile.name, self.args.output_file)
        super().on_close()


if __name__ == '__main__':
    deliver = MyDeliverScript()
    deliver.run()

The above subclassing is very simple and it is easy to understand what it does. The base class, however, is designed for easy overriding of multiple attributes and methods, in order to allow very flexible customization for the specific needs of a project (see DeliverScript docstring). So, in our example, once the crawl is finished, which happens once the consumer task is completed, you want a delivery:

    def configure_workflow(self):

        (...)

        (... compute destination parameters ...)

        deliverTask = Task("deliver", "deliver.py", init_args=["mysite.com", f"--output-file=s3://{mybucket}/{myfolder}/{myfilename}"])
        scrapers.add_next_task(deliverTask)

        (...)
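The deliver script shown earlier in this section buffers items as JSON lines in a temporary file and only moves the file to its destination when closing, and only if there was something to deliver. A local-only sketch of that pattern, using just the standard library, could look as follows. Here deliver_locally() is a hypothetical helper, shutil.move() stands in for mv_file(), which in the real script also handles remote destinations such as S3, and NamedTemporaryFile is used in place of mktemp() as a design choice of this sketch.

```python
import json
import os
import shutil
import tempfile
from typing import Iterable


def deliver_locally(items: Iterable[dict], output_file: str) -> int:
    """Write items as JSON lines to a temp file, then move it into place."""
    count = 0
    # delete=False so the file survives the close and can be moved afterwards.
    with tempfile.NamedTemporaryFile("w", encoding="utf8", delete=False) as tmp:
        for item in items:
            print(json.dumps(item), file=tmp)
            count += 1
    if count > 0:
        # Only publish the file when there is something to deliver.
        shutil.move(tmp.name, output_file)
    else:
        os.remove(tmp.name)
    return count


destination = os.path.join(tempfile.mkdtemp(), "articles.jl")
delivered = deliver_locally([{"title": "first"}, {"title": "second"}], destination)
print(delivered)
```

Writing to a temporary file first means a partially written file never appears at the destination if the script fails midway.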

Finally, a good workflow must include the cleaning of the frontier slots for a future crawl. For our tutorial example:

    def configure_workflow(self):
        (...)

        clean = Task('clean', 'hcfpal.py', init_args=["delete", self.frontier, self.args.slot_prefix])
        scrapers.add_next_task(clean)

        (...)

And additionally, for the broad crawler example:

    def configure_workflow(self):
        (...)

        cleanLinks = Task('cleanLinks', 'hcfpal.py', init_args=["delete", self.frontier, 'links'])
        discover.add_next_task(cleanLinks)

        (...)

Next Chapter: Workflow Resuming