-
Notifications
You must be signed in to change notification settings - Fork 16
/
Exp #4538-P - UDFs.py
136 lines (98 loc) · 3.77 KB
/
Exp #4538-P - UDFs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# Databricks notebook source
# Step A: tag the Spark UI so the jobs that follow are easy to identify.
sc.setJobDescription("Step A: Basic initialization")

# Turn the Delta IO Cache off so cached reads cannot skew the benchmarks below.
IO_CACHE_KEY = "spark.databricks.io.cache.enabled"
spark.conf.set(IO_CACHE_KEY, False)
# COMMAND ----------
sc.setJobDescription("Step B: Create Table")

TRANSACTIONS_PATH = "dbfs:/mnt/training/global-sales/transactions/2011-to-2018-100gb-par_year.delta"

# Register the Delta dataset as a temp view for the later steps.
initDF = spark.read.format("delta").load(TRANSACTIONS_PATH)
initDF.createOrReplaceTempView("transactions")

# Printing the schema here forces spark to read the schema
# avoiding side effects in future benchmarks
initDF.printSchema()
# COMMAND ----------
sc.setJobDescription("Step C: Establish a baseline")

# Read only the column the later steps parse; the "noop" format executes
# the full read without persisting anything, giving a timing baseline.
baseTrxDF = spark.read.table("transactions").select("description")
baseTrxDF.write.mode("overwrite").format("noop").save()
# COMMAND ----------
sc.setJobDescription("Step D: Higher-order functions")

from pyspark.sql.functions import *

# Extract the first "<tag> id: <digits>" token for each known tag type.
tagged = baseTrxDF
for tag in ("ccd", "ppd", "arc"):
    tagged = tagged.withColumn(tag + "_id",
                               regexp_extract("description", tag + " id: \\d+", 0))

# Keep the first non-empty tag column, in ccd > ppd > arc priority order
# (regexp_extract yields "" when the pattern is absent).
first_hit = (when(col("ccd_id") != "", col("ccd_id"))
             .when(col("ppd_id") != "", col("ppd_id"))
             .when(col("arc_id") != "", col("arc_id"))
             .otherwise(None))

# Split "<tag> id: <digits>" into a type column and an id column,
# then drop the intermediate columns and force execution via noop.
trxDF = (tagged
         .withColumn("temp_id", first_hit)
         .withColumn("trxType", regexp_replace(split("temp_id", ": ")[0], " id", ""))
         .withColumn("id", split("temp_id", ": ")[1])
         .drop("ccd_id", "ppd_id", "arc_id", "temp_id"))

trxDF.write.mode("overwrite").format("noop").save()
# COMMAND ----------
sc.setJobDescription("Step E: UDFs")

import re
from pyspark.sql.functions import *

@udf('string')
def parserId(description):
    """Return the digits of the first 'ccd/ppd/arc id: NNN' token in
    *description*, trying the prefixes in ccd > ppd > arc priority order;
    None when no token is present.

    Row-at-a-time UDF: runs once per row on the Python worker.
    """
    for prefix in ("ccd", "ppd", "arc"):
        # re.search stops at the first hit, unlike findall which scans the
        # whole string; the capture group replaces the magic [8:] slice.
        match = re.search(prefix + r" id: (\d+)", description)
        if match:
            return match.group(1)
    return None
@udf('string')
def parseType(description):
    """Return the transaction type ('ccd', 'ppd' or 'arc') of the first
    matching 'id: NNN' token in *description*, checked in that priority
    order; None when no token is present.

    Row-at-a-time UDF: runs once per row on the Python worker.
    """
    for prefix in ("ccd", "ppd", "arc"):
        # Only existence matters: the original returned match[0:3], which
        # is exactly the prefix itself, so return it directly. re.search
        # also stops at the first hit instead of findall's full scan.
        if re.search(prefix + r" id: \d+", description):
            return prefix
    return None
# Apply the two row-at-a-time UDFs and force execution with a noop write.
withType = baseTrxDF.withColumn("trxType", parseType("description"))
trxDF = withType.withColumn("id", parserId("description"))
trxDF.write.mode("overwrite").format("noop").save()
# COMMAND ----------
sc.setJobDescription("Step F: Python Vectorized UDFs")

import re
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql.types import StringType

@pandas_udf('string')
def parserId(descriptions: pd.Series) -> pd.Series:
    """Vectorized extraction of the digits from the first
    'ccd/ppd/arc id: NNN' token per description, in ccd > ppd > arc
    priority order; null where no token is present.

    Uses pandas' C-level Series.str.extract on the whole batch instead of
    a per-row Python map, which would defeat the point of a vectorized
    UDF (it also tolerates null descriptions, which a map would not).
    """
    # str.extract returns the first match's capture group, NaN on miss.
    ccd = descriptions.str.extract(r"ccd id: (\d+)", expand=False)
    ppd = descriptions.str.extract(r"ppd id: (\d+)", expand=False)
    arc = descriptions.str.extract(r"arc id: (\d+)", expand=False)
    # fillna preserves the ccd > ppd > arc priority of the original.
    return ccd.fillna(ppd).fillna(arc)
@pandas_udf('string')
def parseType(descriptions: pd.Series) -> pd.Series:
    """Vectorized extraction of the transaction type ('ccd'/'ppd'/'arc')
    of the first matching 'id: NNN' token per description, in that
    priority order; null where no token is present.

    Uses pandas' C-level Series.str.extract on the whole batch instead of
    a per-row Python map, which would defeat the point of a vectorized
    UDF. Capturing the literal prefix replaces the original's [0:3] slice.
    """
    ccd = descriptions.str.extract(r"(ccd) id: \d+", expand=False)
    ppd = descriptions.str.extract(r"(ppd) id: \d+", expand=False)
    arc = descriptions.str.extract(r"(arc) id: \d+", expand=False)
    # fillna preserves the ccd > ppd > arc priority of the original.
    return ccd.fillna(ppd).fillna(arc)
# Apply the two vectorized UDFs and force execution with a noop write.
withType = baseTrxDF.withColumn("trxType", parseType("description"))
trxDF = withType.withColumn("id", parserId("description"))
trxDF.write.mode("overwrite").format("noop").save()