DMSCDC_LoadIncremental.py (forked from aws-samples/aws-big-data-blog-dmscdc-walkthrough)
import sys
from awsglue.job import Job
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from awsglue.utils import getResolvedOptions
import boto3
from urllib.parse import urlparse, unquote
s3conn = boto3.client('s3')
sparkContext = SparkContext.getOrCreate()
glueContext = GlueContext(sparkContext)
spark = glueContext.spark_session
job = Job(glueContext)
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'bucket',
    'prefix',
    'folder',
    'out_path',
    'lastIncrementalFile',
    'newIncrementalFile',
    'primaryKey',
    'partitionKey'])
job.init(args['JOB_NAME'], args)
s3_inputpath = 's3://' + args['bucket'] + '/' + args['prefix'] + args['folder']
s3_outputpath = 's3://' + args['out_path'] + args['folder']
last_file = args['lastIncrementalFile']
curr_file = args['newIncrementalFile']
primary_keys = args['primaryKey']
partition_keys = args['partitionKey']
# Read the raw CDC files, then keep only the files that arrived after the last
# processed file and no later than the current file. DataFrames are immutable,
# so the filter results must be reassigned.
inputfile = spark.read.parquet(s3_inputpath + "/2*.parquet")
inputfile = inputfile.filter(input_file_name() > last_file)
inputfile = inputfile.filter(input_file_name() <= curr_file)
# No primary keys implies insert-only: keep just the insert (Op == 'I') rows
if primary_keys == "null":
    output = inputfile.filter(inputfile.Op == 'I')
    filelist = [["null"]]
else:
    primaryKeys = primary_keys.split(",")
    windowRow = Window.partitionBy(primaryKeys).orderBy("sortpath")

    # Load the existing target data, adding helper columns for processing
    target = spark.read.parquet(s3_outputpath).withColumn("sortpath", lit("0")).withColumn("filepath", input_file_name()).withColumn("rownum", lit(1))
    incoming = inputfile.withColumn("sortpath", input_file_name()).withColumn("filepath", input_file_name()).withColumn("rownum", row_number().over(windowRow))

    # Determine which existing target files contain rows impacted by the incoming changes
    files = target.join(inputfile, primaryKeys, 'inner').select(col("filepath").alias("filepath1")).distinct()

    # Union the incoming data with the existing data from the impacted files
    uniondata = incoming.select(target.columns).union(target.join(files, files.filepath1 == target.filepath).select(target.columns))

    # Keep only the latest version of each row (newest file, highest row number) and drop deletes
    window = Window.partitionBy(primaryKeys).orderBy(desc("sortpath"), desc("rownum"))
    output = uniondata.withColumn('rnk', rank().over(window)).where(col("rnk") == 1).where(col("Op") != "D").coalesce(1).select(inputfile.columns)
# Write the output data, partitioned if partition keys were supplied
if partition_keys != "null":
    partitionKeys = partition_keys.split(",")
    output.repartition(partitionKeys[0]).write.mode('append').partitionBy(partitionKeys).parquet(s3_outputpath)
else:
    output.write.mode('append').parquet(s3_outputpath)
# Delete the old versions of any target files that were rewritten above
if primary_keys != "null":
    filelist = files.collect()
for row in filelist:
    if row[0] != "null":
        o = urlparse(row[0])
        s3conn.delete_object(Bucket=o.netloc, Key=unquote(o.path)[1:])
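
For context, below is a minimal sketch of how this job might be started from the driving pipeline. The Glue job name and all argument values are illustrative assumptions; only the argument keys are taken from the getResolvedOptions call above (Glue passes job arguments with a leading "--").

# Illustrative only: the job name, bucket names, and file paths are assumptions.
import boto3

glue = boto3.client('glue')
response = glue.start_job_run(
    JobName='DMSCDC_LoadIncremental',  # hypothetical Glue job registered over this script
    Arguments={
        '--bucket': 'my-dms-raw-bucket',
        '--prefix': 'dms/raw/',
        '--folder': 'orders',
        '--out_path': 'my-datalake-bucket/curated/',
        '--lastIncrementalFile': 's3://my-dms-raw-bucket/dms/raw/orders/20230101-000000000.parquet',
        '--newIncrementalFile': 's3://my-dms-raw-bucket/dms/raw/orders/20230102-000000000.parquet',
        '--primaryKey': 'order_id',
        '--partitionKey': 'order_date'})
print(response['JobRunId'])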