spark.jars /opt/hudi/hudi-spark-bundle.jar,/opt/hadoop/share/hadoop/hdfs/lib/hadoop-aws-3.2.0.jar,/usr/local/hive/lib/hive-exec.jar
spark.serializer org.apache.spark.serializer.KryoSerializer
CREATE EXTERNAL TABLE hudi_table (
id STRING,
name STRING,
ts TIMESTAMP
)
STORED AS PARQUET
LOCATION 'hdfs://localhost:9000/user/hudi_table';
// Hudi library to work with Hudi tables
libraryDependencies += "org.apache.hudi" %% "hudi-spark-bundle" % "0.9.0"
// Spark core and SQL libraries to work with Spark
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.2.3"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.2.3"
import org.apache.spark.sql.SparkSession
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.SaveMode
val spark = SparkSession.builder()
.appName("Hudi Test")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
val data = Seq((1, "Alice", "2023-09-17T12:00:00Z"),
(2, "Bob", "2023-09-17T12:05:00Z"))
val df = spark.createDataFrame(data).toDF("id", "name", "ts")
df.write.format("hudi")
.option(TABLE_NAME, "hudi_table")
.option(RECORDKEY_FIELD_OPT_KEY, "id")
.option(PRECOMBINE_FIELD_OPT_KEY, "ts")
.mode(SaveMode.Append)
.save("hdfs://localhost:9000/user/hudi_table")
spark-shell --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:/path/to/log4j.properties"
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Hudi Test") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.jars", "/opt/hudi/hudi-spark-bundle.jar") \
.getOrCreate()
data = [(1, "Alice", "2023-09-17T12:00:00Z"),
(2, "Bob", "2023-09-17T12:05:00Z")]
df = spark.createDataFrame(data, ["id", "name", "ts"])
df.write.format("hudi") \
.option("hoodie.table.name", "hudi_table") \
.option("hoodie.datasource.write.recordkey.field", "id") \
.option("hoodie.datasource.write.precombine.field", "ts") \
.mode("append") \
.save("hdfs://localhost:9000/user/hudi_table")
USE hudi_test;
SELECT * FROM hudi_table;
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode
import org.apache.hudi.DataSourceWriteOptions // Import only DataSourceWriteOptions
object HudiTest {
def main(args: Array[String]): Unit = {
// Create Spark session
val spark = SparkSession.builder()
.appName("Hudi Test")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.jars", "/opt/hudi/hudi-spark-bundle.jar") // Specify Hudi jar
.getOrCreate()
// Sample data
val data = Seq(
(1, "Alice", "2023-09-17T12:00:00Z"),
(2, "Bob", "2023-09-17T12:05:00Z")
)
// Create DataFrame
val df = spark.createDataFrame(data).toDF("id", "name", "ts")
// Write Data to Hudi table with qualified references to avoid ambiguity
df.write.format("hudi")
.option(DataSourceWriteOptions.TABLE_NAME, "hudi_table_test") // Qualified reference
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") // Qualified reference
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // Qualified reference
.mode(SaveMode.Append)
.save("hdfs://localhost:9000/user/hudi_table_test")
// Stop the Spark session
spark.stop()
}
}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode
import org.apache.hudi.DataSourceWriteOptions
object HudiTest {
def main(args: Array[String]): Unit = {
// Create Spark session
val spark = SparkSession.builder()
.appName("Hudi Test")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.jars", "/opt/hudi/hudi-spark-bundle.jar") // Specify Hudi jar
.getOrCreate()
// Sample data
val data = Seq(
(1, "Alice", "2023-09-17T12:00:00Z"),
(2, "Bob", "2023-09-17T12:05:00Z")
)
// Create DataFrame
val df = spark.createDataFrame(data).toDF("id", "name", "ts")
// Write Data to Hudi table using string keys instead of ConfigProperty
df.write.format("hudi")
.option("hoodie.table.name", "hudi_table_test") // Use string key instead of ConfigProperty
.option("hoodie.datas
ource.write.recordkey.field", "id") // Use string key
.option("hoodie.datasource.write.precombine.field", "ts") // Use string key
.mode(SaveMode.Append)
.save("hdfs://localhost:9000/user/hudi_table_test")
// Stop the Spark session
spark.stop()
}
}
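Once either HudiTest variant compiles, it can be submitted roughly like this (a sketch; the assembled jar name is an assumption, the class name comes from the object above):

$SPARK_HOME/bin/spark-submit \
  --class HudiTest \
  --jars /opt/hudi/hudi-spark-bundle.jar \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  target/scala-2.12/hudi-test_2.12-0.1.jar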
Step 5: Integrating Apache Hudi with Spark
5.1 Add Hudi Dependencies to Spark:
Use the following command to start Spark Shell with Hudi support:
$SPARK_HOME/bin/spark-shell \
--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0 \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog"
5.2 Running a Sample Hudi Job in Spark:
Write Data to Hudi:
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.SaveMode._
val dataGen = spark.range(0, 1000).toDF("id")
dataGen.write.format("hudi")
.option(TABLE_NAME, "hudi_table")
.option(PRECOMBINE_FIELD.key(), "id")
.option(RECORDKEY_FIELD.key(), "id")
.option(PARTITIONPATH_FIELD.key(), "")
.option(OPERATION.key(), "insert")
.option(TABLE_TYPE.key(), "COPY_ON_WRITE")
.mode(Overwrite)
.save("/path/to/hudi-table")
Read Hudi Data:
val hudi_df = spark.read.format("hudi").load("/path/to/hudi-table/*")
hudi_df.show()
Step 6: Integrating Apache Hudi with Hive
6.1 Enable Hive Sync for Hudi Tables:
When writing data into a Hudi table using Spark, you can enable Hive sync:
dataGen.write.format("hudi")
.option(TABLE_NAME, "hudi_table")
.option(PRECOMBINE_FIELD.key(), "id")
.option(RECORDKEY_FIELD.key(), "id")
.option(PARTITIONPATH_FIELD.key(), "")
.option(OPERATION.key(), "insert")
.option(TABLE_TYPE.key(), "COPY_ON_WRITE")
.option("hoodie.datasource.hive_sync.enable", "true")
.option("hoodie.datasource.hive_sync.database", "default")
.option("hoodie.datasource.hive_sync.table", "hudi_table")
.option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://localhost:10000")
.option("hoodie.datasource.hive_sync.partition_fields", "date")
.mode(Overwrite)
.save("/path/to/hudi-table")
6.2 Query Hudi Table from Hive:
Start the Hive shell and run a query:
hive
SELECT * FROM hudi_table;
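Equivalently, the query can be run through beeline against the same HiveServer2 endpoint that the hive_sync options above point at (a sketch):

beeline -u jdbc:hive2://localhost:10000 -e "SELECT * FROM default.hudi_table;"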
import org.apache.hudi.DataSourceWriteOptions // Import only DataSourceWriteOptions
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
object HudiTest {
def main(args: Array[String]): Unit = {
// Create Spark session
val spark = SparkSession.builder()
.appName("Hudi Test")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.jars", "/opt/hudi/hudi-spark-bundle.jar") // Specify Hudi jar
.getOrCreate()
// Sample data
val dataGen = spark.range(0, 1000).toDF("id")
// Write data to Hudi table using qualified references to avoid ambiguity
dataGen.write.format("hudi")
.option(DataSourceWriteOptions.TABLE_NAME.key(), "hudi_table_test") // Fully qualified reference
.option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "id") // Fully qualified reference
.option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "id") // Fully qualified reference
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "") // Fully qualified reference
.option(DataSourceWriteOptions.OPERATION.key(), "insert") // Fully qualified reference
.option(DataSourceWriteOptions.TABLE_TYPE.key(), "COPY_ON_WRITE") // Fully qualified reference
.mode(SaveMode.Overwrite) // Overwrite existing data
.save("hdfs://localhost:9000/user/hudi/hudi_table_test") // Save to HDFS
// Stop the Spark session
spark.stop()
}
}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.SaveMode
val tableName = "hudi_test_table"
val basePath = "file:///tmp/hudi_test_table"
val data = Seq(
(1, "Alice", 24),
(2, "Bob", 30),
(3, "Cathy", 27)
)
val df = spark.createDataFrame(data).toDF("id", "name", "age")
df.write.format("hudi")
.option(TABLE_NAME, tableName)
.option(RECORDKEY_FIELD_OPT_KEY, "id")
.option(PRECOMBINE_FIELD_OPT_KEY, "age")
.mode(SaveMode.Overwrite)
.save(basePath)
./spark-shell --jars /opt/hudi/hudi-spark3.1.2-bundle_2.12-0.10.1.jar \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.hive.convertMetastoreParquet=false
CREATE EXTERNAL TABLE hudi_hive_table (
id INT,
name STRING,
age INT
)
STORED BY 'org.apache.hudi.hadoop.HoodieHiveStorageHandler'
LOCATION 'file:///tmp/hudi_test_table'
TBLPROPERTIES (
'hoodie.datasource.write.recordkey.field'='id',
'hoodie.datasource.write.precombine.field'='age',
'hoodie.table.name'='hudi_test_table',
'hoodie.datasource.write.hive_style_partitioning'='true'
);
export HADOOP_CLASSPATH=/opt/hudi/hudi-spark3.1.2-bundle_2.12-0.10.1.jar:$HADOOP_CLASSPATH
SELECT * FROM hudi_hive_table;
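An alternative to exporting HADOOP_CLASSPATH is to add the bundle jar inside the Hive session itself (a sketch; it assumes the same bundle path used above is readable by the Hive host):

hive> ADD JAR /opt/hudi/hudi-spark3.1.2-bundle_2.12-0.10.1.jar;
hive> SELECT * FROM hudi_hive_table;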
hive> CREATE EXTERNAL TABLE hudi_hive_table(
> id INT,
> name STRING,
> age INT
> )
> STORED BY 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
> LOCATION 'file:///tmp/hudi_test_table';
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. org.apache.hudi.hadoop.HoodieParquetInputFormat cannot be cast to org.apache.hadoop.hive.ql.metadata.HiveStorageHandler
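The error occurs because STORED BY expects a class implementing HiveStorageHandler, while HoodieParquetInputFormat is an InputFormat. A sketch of a DDL that registers the same copy-on-write table through INPUTFORMAT/OUTPUTFORMAT instead, which is roughly the layout Hudi's Hive sync generates (it assumes the Hudi bundle jar is already on Hive's classpath, e.g. via the export or ADD JAR above):

CREATE EXTERNAL TABLE hudi_hive_table (
  id INT,
  name STRING,
  age INT
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
  INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'file:///tmp/hudi_test_table';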