chore(ingest/s3) Bump Deequ and Pyspark version (#8638)
Co-authored-by: Andrew Sikowitz <[email protected]>
treff7es and asikowitz authored Aug 29, 2023
1 parent 7b0ebe6 commit d86b336
Showing 14 changed files with 363 additions and 292 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/metadata-ingestion.yml
@@ -25,7 +25,7 @@ jobs:
   metadata-ingestion:
     runs-on: ubuntu-latest
     env:
-      SPARK_VERSION: 3.0.3
+      SPARK_VERSION: 3.3.2
       DATAHUB_TELEMETRY_ENABLED: false
       # TODO: Enable this once the test is fixed.
       # DATAHUB_LOOKML_GIT_TEST_SSH_KEY: ${{ secrets.DATAHUB_LOOKML_GIT_TEST_SSH_KEY }}
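The `SPARK_VERSION` variable set here is read at runtime by the S3 profiler (see the `os.getenv` call in `source.py` below). To mirror the CI environment in a local session, something like this minimal sketch should work (values copied from the workflow; adjust them to your installed Spark, and note that `SPARK_HOME` must also point at that installation):

```python
import os

# Mirror the CI environment before the profiler initializes Spark.
os.environ["SPARK_VERSION"] = "3.3.2"
os.environ["DATAHUB_TELEMETRY_ENABLED"] = "false"
```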
6 changes: 6 additions & 0 deletions metadata-ingestion/docs/sources/s3/s3.md
@@ -196,3 +196,9 @@ If you are ingesting datasets from AWS S3, we recommend running the ingestion on
 Profiles are computed with PyDeequ, which relies on PySpark. Therefore, for computing profiles, we currently require Spark 3.0.3 with Hadoop 3.2 to be installed and the `SPARK_HOME` and `SPARK_VERSION` environment variables to be set. The Spark+Hadoop binary can be downloaded [here](https://www.apache.org/dyn/closer.lua/spark/spark-3.0.3/spark-3.0.3-bin-hadoop3.2.tgz).
 
 For an example guide on setting up PyDeequ on AWS, see [this guide](https://aws.amazon.com/blogs/big-data/testing-data-quality-at-scale-with-pydeequ/).
+
+:::caution
+
+From Spark 3.2.0 onward, the Avro reader fails on column names that do not start with a letter or that contain characters other than letters, numbers, and underscores ([see the check in AvroFileFormat.scala](https://github.com/apache/spark/blob/72c62b6596d21e975c5597f8fff84b1a9d070a02/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala#L158)).
+Avro files that contain such columns won't be profiled.
+:::
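To illustrate which column names survive this restriction, here is a rough approximation of the Avro name rule as a regex check; the authoritative logic is the Spark code linked in the caution above, so treat this sketch as an approximation only:

```python
import re

# Approximate Avro name rule enforced by Spark 3.2.0+: first character
# a letter or underscore, remaining characters letters, digits, or
# underscores. The real check lives in AvroFileFormat.scala.
AVRO_NAME_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

def is_profilable_column(name: str) -> bool:
    return AVRO_NAME_RE.fullmatch(name) is not None

print(is_profilable_column("FirstChord"))           # True
print(is_profilable_column("1st chord"))            # False: starts with a digit
print(is_profilable_column("Progression Quality"))  # False: contains a space
```

This restriction is why the test fixture below renames columns such as `1st chord` to `FirstChord`.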
4 changes: 2 additions & 2 deletions metadata-ingestion/setup.py
@@ -247,8 +247,8 @@ def get_long_description():
 }
 
 data_lake_profiling = {
-    "pydeequ>=1.0.1, <1.1",
-    "pyspark==3.0.3",
+    "pydeequ==1.1.0",
+    "pyspark~=3.3.0",
 }
 
 delta_lake = {
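A note on the new pins: `pydeequ==1.1.0` is exact, while `pyspark~=3.3.0` is a compatible-release specifier that admits any 3.3.x patch release but excludes 3.4.0. A quick way to verify that reading with the `packaging` library (the candidate versions are illustrative):

```python
from packaging.specifiers import SpecifierSet

spec = SpecifierSet("~=3.3.0")  # equivalent to: >=3.3.0, ==3.3.*
for candidate in ["3.3.0", "3.3.2", "3.4.0"]:
    print(candidate, candidate in spec)
# 3.3.0 True
# 3.3.2 True
# 3.4.0 False
```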
9 changes: 5 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/s3/source.py
@@ -261,13 +261,14 @@ def init_spark(self):
         import pydeequ
 
         conf = SparkConf()
-
+        spark_version = os.getenv("SPARK_VERSION", "3.3")
         conf.set(
             "spark.jars.packages",
             ",".join(
                 [
                     "org.apache.hadoop:hadoop-aws:3.0.3",
-                    "org.apache.spark:spark-avro_2.12:3.0.3",
+                    # Spark's avro version needs to be matched with the Spark version
+                    f"org.apache.spark:spark-avro_2.12:{spark_version}{'.0' if spark_version.count('.') == 1 else ''}",
                     pydeequ.deequ_maven_coord,
                 ]
             ),
@@ -374,10 +375,10 @@ def read_file_spark(self, file: str, ext: str) -> Optional[DataFrame]:
         elif ext.endswith(".avro"):
             try:
                 df = self.spark.read.format("avro").load(file)
-            except AnalysisException:
+            except AnalysisException as e:
                 self.report.report_warning(
                     file,
-                    "To ingest avro files, please install the spark-avro package: https://mvnrepository.com/artifact/org.apache.spark/spark-avro_2.12/3.0.3",
+                    f"Avro file reading failed with exception. The error was: {e}",
                 )
                 return None
 
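Two things worth noting in this file. First, binding the exception as `e` lets the warning surface the actual Spark error instead of the previous fixed install hint. Second, the new f-string normalizes whatever `SPARK_VERSION` holds into a full three-part version for the `spark-avro` Maven coordinate; a standalone sketch of that expression's behavior (the example inputs are assumptions):

```python
def spark_avro_package(spark_version: str) -> str:
    # A two-part version like "3.3" gets ".0" appended; a three-part
    # version like "3.3.2" passes through unchanged.
    suffix = ".0" if spark_version.count(".") == 1 else ""
    return f"org.apache.spark:spark-avro_2.12:{spark_version}{suffix}"

print(spark_avro_package("3.3"))    # org.apache.spark:spark-avro_2.12:3.3.0
print(spark_avro_package("3.3.2"))  # org.apache.spark:spark-avro_2.12:3.3.2
```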
@@ -2782,7 +2782,7 @@
       "customProperties": {
         "schema_inferred_from": "tests/integration/s3/test_data/local_system/folder_a/folder_aa/folder_aaa/chord_progressions_avro.avro",
         "number_of_files": "1",
-        "size_in_bytes": "1024"
+        "size_in_bytes": "619"
       },
       "name": "chord_progressions_avro.avro",
       "description": "",
@@ -2820,62 +2820,62 @@
       },
       "fields": [
         {
-          "fieldPath": "[version=2.0].[type=Root].[type=double].Progression Quality",
-          "nullable": true,
+          "fieldPath": "[version=2.0].[type=Record].[type=long].FirstChord",
+          "nullable": false,
           "type": {
             "type": {
               "com.linkedin.schema.NumberType": {}
             }
           },
-          "nativeDataType": "double",
+          "nativeDataType": "FirstChord",
           "recursive": false,
           "isPartOfKey": false
         },
         {
-          "fieldPath": "[version=2.0].[type=Root].[type=long].1st chord",
-          "nullable": true,
+          "fieldPath": "[version=2.0].[type=Record].[type=long].FourthChord",
+          "nullable": false,
           "type": {
             "type": {
               "com.linkedin.schema.NumberType": {}
             }
           },
-          "nativeDataType": "long",
+          "nativeDataType": "FourthChord",
           "recursive": false,
           "isPartOfKey": false
         },
         {
-          "fieldPath": "[version=2.0].[type=Root].[type=long].2nd chord",
-          "nullable": true,
+          "fieldPath": "[version=2.0].[type=Record].[type=long].SecondChord",
+          "nullable": false,
           "type": {
             "type": {
               "com.linkedin.schema.NumberType": {}
             }
           },
-          "nativeDataType": "long",
+          "nativeDataType": "SecondChord",
           "recursive": false,
           "isPartOfKey": false
         },
         {
-          "fieldPath": "[version=2.0].[type=Root].[type=long].3rd chord",
-          "nullable": true,
+          "fieldPath": "[version=2.0].[type=Record].[type=long].ThirdChord",
+          "nullable": false,
           "type": {
             "type": {
               "com.linkedin.schema.NumberType": {}
             }
           },
-          "nativeDataType": "long",
+          "nativeDataType": "ThirdChord",
           "recursive": false,
           "isPartOfKey": false
         },
         {
-          "fieldPath": "[version=2.0].[type=Root].[type=string].4th chord",
-          "nullable": true,
+          "fieldPath": "[version=2.0].[type=Record].[type=string].ProgressionQuality",
+          "nullable": false,
           "type": {
             "type": {
               "com.linkedin.schema.StringType": {}
             }
           },
-          "nativeDataType": "string",
+          "nativeDataType": "ProgressionQuality",
           "recursive": false,
           "isPartOfKey": false
         }
@@ -2939,7 +2939,58 @@
       "columnCount": 5,
       "fieldProfiles": [
         {
-          "fieldPath": "1st chord",
+          "fieldPath": "FirstChord",
+          "uniqueCount": 5,
+          "uniqueProportion": 0.17857142857142858,
+          "nullCount": 0,
+          "nullProportion": 0.0,
+          "distinctValueFrequencies": [
+            {
+              "value": "1",
+              "frequency": 19
+            },
+            {
+              "value": "2",
+              "frequency": 3
+            },
+            {
+              "value": "4",
+              "frequency": 2
+            },
+            {
+              "value": "5",
+              "frequency": 1
+            },
+            {
+              "value": "6",
+              "frequency": 3
+            }
+          ],
+          "sampleValues": [
+            "1",
+            "1",
+            "1",
+            "1",
+            "1",
+            "1",
+            "1",
+            "1",
+            "1",
+            "1",
+            "1",
+            "1",
+            "1",
+            "1",
+            "2",
+            "4",
+            "5",
+            "6",
+            "6",
+            "6"
+          ]
+        },
+        {
+          "fieldPath": "SecondChord",
           "uniqueCount": 5,
           "uniqueProportion": 0.17857142857142858,
           "nullCount": 0,
@@ -2990,7 +3041,7 @@
           ]
         },
         {
-          "fieldPath": "2nd chord",
+          "fieldPath": "ThirdChord",
           "uniqueCount": 7,
           "uniqueProportion": 0.25,
           "nullCount": 0,
@@ -3049,7 +3100,7 @@
           ]
         },
         {
-          "fieldPath": "3rd chord",
+          "fieldPath": "FourthChord",
           "uniqueCount": 6,
           "uniqueProportion": 0.21428571428571427,
           "nullCount": 0,
@@ -3104,7 +3155,7 @@
           ]
         },
         {
-          "fieldPath": "4th chord",
+          "fieldPath": "ProgressionQuality",
           "uniqueCount": 20,
           "uniqueProportion": 0.7142857142857143,
           "nullCount": 0,
@@ -3213,41 +3264,6 @@
             "Sweet",
             "Wistful"
           ]
-        },
-        {
-          "fieldPath": "Progression Quality",
-          "uniqueCount": 1,
-          "uniqueProportion": 0.03571428571428571,
-          "nullCount": 0,
-          "nullProportion": 0.0,
-          "distinctValueFrequencies": [
-            {
-              "value": "NaN",
-              "frequency": 28
-            }
-          ],
-          "sampleValues": [
-            "nan",
-            "nan",
-            "nan",
-            "nan",
-            "nan",
-            "nan",
-            "nan",
-            "nan",
-            "nan",
-            "nan",
-            "nan",
-            "nan",
-            "nan",
-            "nan",
-            "nan",
-            "nan",
-            "nan",
-            "nan",
-            "nan",
-            "nan"
-          ]
         }
       ]
     }
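As a sanity check on the regenerated profile above: `uniqueProportion` is `uniqueCount` divided by the row count, and the row count can be recovered from the distinct-value frequencies. For `FirstChord`, 19 + 3 + 2 + 1 + 3 = 28 rows, so 5 unique values give 5/28 ≈ 0.17857142857142858, matching the golden file. A minimal check with the numbers copied from the diff:

```python
# FirstChord distinct-value frequencies, copied from the golden file.
frequencies = {"1": 19, "2": 3, "4": 2, "5": 1, "6": 3}

row_count = sum(frequencies.values())  # 28
unique_count = len(frequencies)        # 5
print(unique_count / row_count)        # 0.17857142857142858
```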