-
Notifications
You must be signed in to change notification settings - Fork 2
/
xlang_example.py
108 lines (96 loc) · 3.6 KB
/
xlang_example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.external.generate_sequence import GenerateSequence
from apache_beam.io.external.snowflake import ReadFromSnowflake, WriteToSnowflake
# ---------------------------------------------------------------------------
# Connection / pipeline configuration.
#
# The angle-bracket values are placeholders: replace each with your own
# Snowflake settings before running.  They are quoted here so the module at
# least parses -- the original unquoted form was a Python syntax error.
# ---------------------------------------------------------------------------
SERVER_NAME = '<SNOWFLAKE SERVER NAME>'
USERNAME = '<SNOWFLAKE USERNAME>'
PASSWORD = '<SNOWFLAKE PASSWORD>'
SCHEMA = '<SNOWFLAKE SCHEMA>'
DATABASE = '<SNOWFLAKE DATABASE>'
# NOTE(review): the original placeholder here duplicated the storage
# integration one; the staging bucket is a cloud-storage bucket name, a
# distinct setting from the storage integration -- confirm against your setup.
STAGING_BUCKET_NAME = '<SNOWFLAKE STAGING BUCKET NAME>'
STORAGE_INTEGRATION = '<SNOWFLAKE STORAGE INTEGRATION NAME>'
TABLE = '<SNOWFLAKE TABLE NAME>'

# Address of the running cross-language expansion service hosting the
# Java GenerateSequence / Snowflake transforms.
EXPANSION_SERVICE = 'localhost:8097'

# Snowflake table schema in the JSON string format expected by the
# cross-language Snowflake transforms (three columns: text, number, boolean).
SCHEMA_STRING = """
{"schema":[
{"dataType":{"type":"text","length":null},"name":"text_column","nullable":true},
{"dataType":{"type":"integer","precision":38,"scale":0},"name":"number_column","nullable":false},
{"dataType":{"type":"boolean"},"name":"boolean_column","nullable":false}
]}
"""

# Portable-runner options: local Flink cluster, with the Python worker
# harness running in LOOPBACK mode (inside this process).
OPTIONS = [
    "--runner=FlinkRunner",
    "--flink_version=1.10",
    "--flink_master=localhost:8081",
    "--environment_type=LOOPBACK",
]
class Row(object):
    """Value object mirroring one row of the example Snowflake table.

    Attributes:
        text_column: value for the TEXT column.
        number_column: value for the NUMBER column.
        boolean_column: value for the BOOLEAN column.
    """

    def __init__(self, text_column, number_column, boolean_column):
        self.text_column = text_column
        self.number_column = number_column
        self.boolean_column = boolean_column

    def __eq__(self, other):
        # Bug fix: the original assumed `other` was a Row and raised
        # AttributeError when compared against any other type; returning
        # NotImplemented lets Python fall back to its default handling.
        if not isinstance(other, Row):
            return NotImplemented
        return (self.text_column == other.text_column
                and self.number_column == other.number_column
                and self.boolean_column == other.boolean_column)

    def __str__(self):
        # Space-separated rendering used when rows are printed by run_read().
        return self.text_column + " " + str(self.number_column) + " " + str(self.boolean_column)

    def __repr__(self):
        # Added for debuggability; shows the constructor arguments.
        return "Row(%r, %r, %r)" % (
            self.text_column, self.number_column, self.boolean_column)
def run_write():
    """Generate a small sequence of Row objects and write them to Snowflake.

    Uses the cross-language GenerateSequence transform to produce numbers
    1..2, maps each to a Row, and writes them via WriteToSnowflake,
    truncating and (re)creating the target table as needed.
    """

    def encode_row(row):
        # WriteToSnowflake expects each field serialized as UTF-8 bytes,
        # in table-column order.
        fields = (row.text_column, row.number_column, row.boolean_column)
        return [str(field).encode('utf-8') for field in fields]

    pipeline = beam.Pipeline(options=PipelineOptions(OPTIONS))
    numbers = pipeline | GenerateSequence(
        start=1, stop=3, expansion_service=EXPANSION_SERVICE)
    rows = numbers | beam.Map(lambda num: Row("test" + str(num), num, True))
    rows | "Writing into Snowflake" >> WriteToSnowflake(
        server_name=SERVER_NAME,
        username=USERNAME,
        password=PASSWORD,
        schema=SCHEMA,
        database=DATABASE,
        staging_bucket_name=STAGING_BUCKET_NAME,
        storage_integration=STORAGE_INTEGRATION,
        create_disposition="CREATE_IF_NEEDED",
        write_disposition="TRUNCATE",
        table_schema=SCHEMA_STRING,
        user_data_mapper=encode_row,
        table=TABLE,
        query=None,
        expansion_service=EXPANSION_SERVICE)
    pipeline.run().wait_until_finish()
def run_read():
    """Read every row of the Snowflake table and print it.

    ReadFromSnowflake stages the table as CSV; each record arrives as a
    list of strings in column order and is converted back into a Row.
    """

    def parse_record(strings_array):
        # Column order matches SCHEMA_STRING: text, number, boolean.
        text, number, boolean = strings_array[0], strings_array[1], strings_array[2]
        return Row(text, int(number), bool(boolean))

    pipeline = beam.Pipeline(options=PipelineOptions(OPTIONS))
    rows = pipeline | "Reading from Snowflake" >> ReadFromSnowflake(
        server_name=SERVER_NAME,
        username=USERNAME,
        password=PASSWORD,
        schema=SCHEMA,
        database=DATABASE,
        staging_bucket_name=STAGING_BUCKET_NAME,
        storage_integration=STORAGE_INTEGRATION,
        csv_mapper=parse_record,
        table=TABLE,
        expansion_service=EXPANSION_SERVICE)
    rows | "Print" >> beam.Map(print)
    pipeline.run().wait_until_finish()
def run():
    """Execute the write pipeline, then read back and print the rows."""
    for stage in (run_write, run_read):
        stage()
# Script entry point: run the write pipeline followed by the read pipeline.
if __name__ == '__main__':
    run()