From cb8a25ff1a1876bc42cc4b21a6abcaec94124cb0 Mon Sep 17 00:00:00 2001
From: Stefan Zabka
Date: Mon, 15 Jun 2020 11:41:24 +0200
Subject: [PATCH] Added mode parameter to PySparkS3Dataset

This parameter allows filtering out visit_ids that are part of
`incomplete_visits` or that had a command with a command_status other
than "ok", since users probably shouldn't consider them for analysis.

This filtering functionality is extracted into the TableFilter class
so it can be reused by other Datasets.
---
 openwpm_utils/crawlhistory.py | 28 +++++++++++++
 openwpm_utils/dataquality.py  | 26 +++++++++++-
 openwpm_utils/s3.py           | 77 ++++++++++++++++++++++++-----------
 setup.py                      |  2 +-
 4 files changed, 108 insertions(+), 25 deletions(-)
 create mode 100644 openwpm_utils/crawlhistory.py

diff --git a/openwpm_utils/crawlhistory.py b/openwpm_utils/crawlhistory.py
new file mode 100644
index 0000000..3799f47
--- /dev/null
+++ b/openwpm_utils/crawlhistory.py
@@ -0,0 +1,28 @@
+import pyspark.sql.functions as F
+from pyspark.sql.types import StringType
+
+reduce_to_worst_command_status = (
+    F.when(F.array_contains("command_status", "critical"), "critical")
+    .when(F.array_contains("command_status", "error"), "error")
+    .when(F.array_contains("command_status", "neterror"), "neterror")
+    .when(F.array_contains("command_status", "timeout"), "timeout")
+    .otherwise("ok")
+    .alias("worst_status")
+)
+
+
+reduce_to_best_command_status = (
+    F.when(F.array_contains("command_status", "ok"), "ok")
+    .when(F.array_contains("command_status", "timeout"), "timeout")
+    .when(F.array_contains("command_status", "neterror"), "neterror")
+    .when(F.array_contains("command_status", "error"), "error")
+    .otherwise("critical")
+    .alias("best_status")
+)
+
+
+def get_worst_status_per_visit_id(crawl_history):
+    """Adds column `worst_status`"""
+    return (crawl_history.groupBy("visit_id")
+            .agg(F.collect_list("command_status").alias("command_status"))
+            .withColumn("worst_status", reduce_to_worst_command_status))
diff --git a/openwpm_utils/dataquality.py b/openwpm_utils/dataquality.py
index d4a36f9..e7a2ce8 100644
--- a/openwpm_utils/dataquality.py
+++ b/openwpm_utils/dataquality.py
@@ -1,5 +1,9 @@
-from pyspark.sql.functions import countDistinct, col, isnan, lit, sum, count, when
+import pyspark.sql.functions as F
 from pyspark.mllib.stat import Statistics
+from pyspark.sql.dataframe import DataFrame
+from pyspark.sql.functions import col, count, countDistinct, isnan, lit, sum, when
+
+from openwpm_utils.crawlhistory import get_worst_status_per_visit_id
 
 
 def count_not_null(c, nan_as_null=False):
@@ -53,3 +57,23 @@ def check_df(df, skip_null_check=True):
         "\nNumber of records with visit_id == -1: %d"
         % df.where(df.visit_id == -1).count()
     )
+
+
+class TableFilter:
+    def __init__(self, incomplete_visits: DataFrame, crawl_history: DataFrame) -> None:
+        self._incomplete_visit_ids = incomplete_visits.select("visit_id")
+        self._failed_visit_ids = (
+            get_worst_status_per_visit_id(crawl_history)
+            .where(F.col("worst_status") != "ok")
+            .select("visit_id")
+        )
+
+    def clean_table(self, table: DataFrame) -> DataFrame:
+        return table.join(self._failed_visit_ids, "visit_id", how="leftanti").join(
+            self._incomplete_visit_ids, "visit_id", how="leftanti"
+        )
+
+    def dirty_table(self, table: DataFrame) -> DataFrame:
+        return table.join(self._failed_visit_ids, "visit_id", how="inner").union(
+            table.join(self._incomplete_visit_ids, "visit_id", how="inner")
+        )
diff --git a/openwpm_utils/s3.py b/openwpm_utils/s3.py
index 5654ba8..3e8581c 100644
--- a/openwpm_utils/s3.py
+++ b/openwpm_utils/s3.py
@@ -1,15 +1,22 @@
 import gzip
+from typing import List
+
 import boto3
 import jsbeautifier
 import pyarrow.parquet as pq
+import pyspark.sql.functions as F
 import s3fs
 from botocore.exceptions import ClientError
 from pyarrow.filesystem import S3FSWrapper  # noqa
-from pyspark.sql import SQLContext
+from pyspark import SparkContext
+from pyspark.sql import DataFrame, SQLContext
 
-
-class S3Dataset:
-    def __init__(self, s3_directory, s3_bucket='openwpm-crawls'):
+from openwpm_utils.crawlhistory import get_worst_status_per_visit_id
+from openwpm_utils.dataquality import TableFilter
+
+
+class S3Dataset(object):
+    def __init__(self, s3_directory, s3_bucket="openwpm-crawls"):
         """Helper class to load OpenWPM datasets from S3 using pandas
 
         This dataset wrapper is safe to use by spark worker processes, as it
@@ -38,30 +45,33 @@ def read_table(self, table_name, columns=None):
         columns : list of strings
             The set of columns to filter the parquet dataset by
         """
-        return pq.ParquetDataset(
-            self._s3_table_loc % table_name,
-            filesystem=self._s3fs,
-            metadata_nthreads=4
-        ).read(use_pandas_metadata=True, columns=columns).to_pandas()
+        return (
+            pq.ParquetDataset(
+                self._s3_table_loc % table_name,
+                filesystem=self._s3fs,
+                metadata_nthreads=4,
+            )
+            .read(use_pandas_metadata=True, columns=columns)
+            .to_pandas()
+        )
 
     def collect_content(self, content_hash, beautify=False):
         """Collect content by directly connecting to S3 via boto3"""
-        s3 = boto3.client('s3')
+        s3 = boto3.client("s3")
         try:
             obj = s3.get_object(
-                Bucket=self._s3_bucket,
-                Key=self._content_key % content_hash
+                Bucket=self._s3_bucket, Key=self._content_key % content_hash
             )
             body = obj["Body"]
             compressed_content = body.read()
             body.close()
         except ClientError as e:
-            if e.response['Error']['Code'] != 'NoSuchKey':
+            if e.response["Error"]["Code"] != "NoSuchKey":
                 raise
             else:
                 return None
 
-        with gzip.GzipFile(fileobj=compressed_content, mode='r') as f:
+        with gzip.GzipFile(fileobj=compressed_content, mode="r") as f:
             content = f.read()
 
         if content is None or content == "":
@@ -74,9 +84,11 @@ def collect_content(self, content_hash, beautify=False):
             pass
         return content
 
+
 class PySparkS3Dataset(S3Dataset):
-    def __init__(self, spark_context, s3_directory,
-                 s3_bucket='openwpm-crawls'):
+    def __init__(
+        self, spark_context, s3_directory: str, s3_bucket: str = "openwpm-crawls"
+    ):
         """Helper class to load OpenWPM datasets from S3 using PySpark
 
         Parameters
@@ -89,16 +101,17 @@ def __init__(self, spark_context, s3_directory,
         s3_bucket : string, optional
             The bucket name on S3. Defaults to `openwpm-crawls`.
         """
-        self._s3_bucket = s3_bucket
-        self._s3_directory = s3_directory
+        super().__init__(s3_directory, s3_bucket)
         self._spark_context = spark_context
         self._sql_context = SQLContext(spark_context)
-        self._s3_table_loc = "s3a://%s/%s/visits/%%s/" % (
-            s3_bucket, s3_directory)
-        self._s3_content_loc = "s3a://%s/%s/content/%%s.gz" % (
-            s3_bucket, s3_directory)
+        self._s3_table_loc = f"s3a://{self._s3_table_loc}"
+        incomplete_visits = self.read_table("incomplete_visits", mode="all")
+        crawl_history = self.read_table("crawl_history", mode="all")
+        self._filter = TableFilter(incomplete_visits, crawl_history)
 
-    def read_table(self, table_name, columns=None):
+    def read_table(
+        self, table_name: str, columns: List[str] = None, mode: str = "successful"
+    ):
         """Read `table_name` from OpenWPM dataset into a pyspark dataframe.
 
         Parameters
@@ -107,8 +120,26 @@
             OpenWPM table to read
         columns : list of strings
             The set of columns to filter the parquet dataset by
+        mode : string
+            One of "successful", "failed" or "all"; defaults to "successful".
+            Success is determined per visit_id: a visit_id is failed if one
+            of its commands failed or if it is listed in incomplete_visits.
         """
         table = self._sql_context.read.parquet(self._s3_table_loc % table_name)
+
+        if mode == "all":
+            table = table
+        elif mode == "failed":
+            table = self._filter.dirty_table(table)
+        elif mode == "successful":
+            table = self._filter.clean_table(table)
+        else:
+            raise AssertionError(
+                f"Mode was {mode}, "
+                "allowed modes are 'all', 'failed' and 'successful'"
+            )
+
         if columns is not None:
-            return table.select(columns)
+            table = table.select(columns)
+        return table
diff --git a/setup.py b/setup.py
index 8e7bbee..9ba38d6 100644
--- a/setup.py
+++ b/setup.py
@@ -11,7 +11,7 @@
     name='openwpm-utils',
     license='MPL 2.0',
     url='https://github.com/mozilla/openwpm-utils',
-    version='0.2.0',
+    version='0.3.0',
     packages=['openwpm_utils'],
     # Dependencies
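
Example usage of the new `mode` parameter (an illustrative sketch, not part of the
patch itself; the SparkContext setup and the crawl directory name are hypothetical
placeholders):

    from pyspark import SparkContext

    from openwpm_utils.s3 import PySparkS3Dataset

    sc = SparkContext.getOrCreate()
    # Hypothetical crawl location; substitute your own directory and bucket.
    dataset = PySparkS3Dataset(sc, "2020-06-15_demo_crawl", s3_bucket="openwpm-crawls")

    # Default mode="successful": rows from failed or interrupted visits are dropped.
    ok_requests = dataset.read_table("http_requests", columns=["visit_id", "url"])

    # mode="failed" keeps only the filtered-out visits; mode="all" skips filtering.
    failed_requests = dataset.read_table("http_requests", mode="failed")
    all_requests = dataset.read_table("http_requests", mode="all")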