Support dynamic partition.
Signed-off-by: vegetableysm <[email protected]>
vegetableysm committed Jul 24, 2023
1 parent a855c37 commit f3c4254
Showing 2 changed files with 93 additions and 12 deletions.
16 changes: 11 additions & 5 deletions java/hive/README.rst
@@ -215,24 +215,29 @@ Hive and Vineyard
stored as
INPUTFORMAT 'io.v6d.hive.ql.io.VineyardInputFormat'
OUTPUTFORMAT 'io.v6d.hive.ql.io.VineyardOutputFormat';
-insert into table hive_static_partition partition(value=666) values (1, 2);
+insert into table hive_static_partition partition(value=666) values (999, 2), (999, 2), (999, 2);
insert into table hive_static_partition partition(value=666) values (3, 4);
insert into table hive_static_partition partition(value=114514) values (1, 2);
select * from hive_static_partition;
select * from hive_static_partition where value=666;
select * from hive_static_partition where value=114514;
-- Test static partition(BUG):
+- Test dynamic partition:

We must set the batch size to 1, because ArrowColumnarBatchSerDe does not handle dynamic partitioning: with a larger
batch size it groups rows from different partitions into a single batch, which is then written to one partition,
producing incorrect data. (Partition-pruning queries for verification are shown after the example below.)

.. code:: sql
set hive.fetch.task.conversion=none;
set hive.vectorized.use.vectorized.input.format=true;
set hive.vectorized.use.row.serde.deserialize=false;
set hive.vectorized.use.vector.serde.deserialize=true;
set hive.vectorized.execution.enabled=true;
set hive.vectorized.execution.reduce.enabled=true;
set hive.vectorized.row.serde.inputformat.excludes=io.v6d.hive.ql.io.VineyardInputFormat;
-set hive.arrow.batch.size=500;
+set hive.arrow.batch.size=1;
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
create table hive_dynamic_partition_data
@@ -245,9 +250,10 @@ Hive and Vineyard
(
src_id int,
dst_id int
-)partitioned by(year int)
+)partitioned by(month int, year int)
row format serde "org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe"
stored as
INPUTFORMAT 'io.v6d.hive.ql.io.VineyardInputFormat'
OUTPUTFORMAT 'io.v6d.hive.ql.io.VineyardOutputFormat';
-insert into table hive_dynamic_partition_test partition(year) select src_id,dst_id,year from hive_dynamic_partition_data;
+insert into table hive_dynamic_partition_test partition(month=1, year) select src_id,dst_id,year from hive_dynamic_partition_data;
select * from hive_dynamic_partition_test;
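
The queries below are illustrative; the partition values are assumptions based on this example's sample data, so adjust them to whatever hive_dynamic_partition_data actually contains:

.. code:: sql
select * from hive_dynamic_partition_test where month=1;
select * from hive_dynamic_partition_test where month=1 and year=2017;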
89 changes: 82 additions & 7 deletions java/hive/src/main/java/io/v6d/hive/ql/io/VineyardOutputFormat.java
@@ -25,17 +25,23 @@
import io.v6d.modules.basic.arrow.Arrow;
import io.v6d.modules.basic.arrow.RecordBatchBuilder;

import java.io.File; // Codacy notice: unused import (java.io.File)
import java.io.IOException;
import java.util.Properties;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; // Codacy notice: unused import (java.util.Map)

import lombok.val;

import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.*;
import org.apache.arrow.vector.*;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
@@ -45,6 +51,7 @@
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.hive.common.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -81,6 +88,7 @@ class SinkRecordWriter implements FileSinkOperator.RecordWriter {
private Path finalOutPath;
private Properties tableProperties;
private Progressable progress;
private FileSystem fs;

// vineyard
private IPCClient client;
@@ -92,10 +100,21 @@ class SinkRecordWriter implements FileSinkOperator.RecordWriter {
private List<String> partitions;
private boolean hasData = false;

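// Accepts task-temporary path components of the form "_task_tmp.<x>"
// ("_task_tmp." is exactly 10 characters long, hence the substring(10, ...)
// below), but rejects staging directories such as "_task_tmp.-ext-10000",
// whose remainder begins with "-".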
public static final PathFilter VINEYARD_FILES_PATH_FILTER = new PathFilter() {
@Override
public boolean accept(Path p) {
String name = p.getName();
return name.startsWith("_task_tmp.") && (!name.substring(10, name.length()).startsWith("-"));
}
};

private void getTableName(Properties tableProperties) {
String location = tableProperties.getProperty("location");
// int index = location.lastIndexOf("/");
// tableName = location.substring(index + 1);
System.out.println("finalOutPath : "+ finalOutPath.toString());

// Get partition count
String partition = tableProperties.getProperty("partition_columns.types");
int index = -1;
int partitionCount = 0;
@@ -106,14 +125,57 @@ private void getTableName(Properties tableProperties) {
} while(index != -1);
}
System.out.println("Partition count:" + partitionCount);
-index = location.length() + 1;
-for (int i = 0; i < partitionCount; i++) {
-    index = finalOutPath.toString().indexOf("/", index + 1);
-}

// Construct table name
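// The name is built from the warehouse location plus the path components
// that survive two filters: hidden components (names starting with '.' or
// '_', e.g. ".hive-staging_hive_*" and "_tmp.*") are dropped, except that
// "_task_tmp.<x>" is kept as "<x>" when <x> does not begin with "-";
// finally every '/' is replaced by '#'. Illustrative example (hypothetical
// values, cf. the staging paths noted at the end of this file):
//   location     = file:/opt/hive/data/warehouse/hive_dynamic_partition_test6
//   finalOutPath = <location>/.hive-staging_hive_.../_task_tmp.-ext-10000/year=2017/_tmp.000000_0
//   tableName    = file:#opt#hive#data#warehouse#hive_dynamic_partition_test6#year=2017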
String path = finalOutPath.toString();
path = path.substring(location.length(), path.length());
String[] pathSplits = path.split("/");
PathFilter vineyardPathFilter = VINEYARD_FILES_PATH_FILTER;
PathFilter hiddenPathFilter = FileUtils.HIDDEN_FILES_PATH_FILTER;
if (pathSplits.length == 0) {
return;
}
tableName = location;
for (int i = 1; i < pathSplits.length; i++) {
if (pathSplits[i].length() > 0 && hiddenPathFilter.accept(new Path(pathSplits[i]))) {
System.out.println("path split:" + pathSplits[i]);
tableName += "/" + pathSplits[i];
} else if (pathSplits[i].length() > 0 && vineyardPathFilter.accept(new Path(pathSplits[i]))) {
System.out.println("path split:" + pathSplits[i].substring(10, pathSplits[i].length()));
tableName += "/" + pathSplits[i].substring(10, pathSplits[i].length());
}
}
-tableName = finalOutPath.toString().substring(0, index);
-tableName = tableName.replace('/', '#');
+tableName = tableName.replaceAll("/", "#");
System.out.println("Table name:" + tableName);
System.out.println("fina path:" + finalOutPath.toString());

// Create temp file
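// (Presumably the placeholder gives Hive's commit/move logic a real file to
// operate on under the staging directory, since the rows themselves are
// stored in vineyard rather than on the filesystem.)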
String tmpFilePath = finalOutPath.toString();
tmpFilePath = tmpFilePath.substring(0, tmpFilePath.lastIndexOf("/"));
System.out.println("out path:" + tmpFilePath);
// File file = FileUtils.createTempFile(outPath, "vineyard", ".tmp");
tmpFilePath = tmpFilePath.replaceAll("_task", "");
Path tmpPath = new Path(tmpFilePath, "vineyard.tmp");
try {
fs = finalOutPath.getFileSystem(jc);
System.out.println("tmp path:" + tmpPath.toString());
FSDataOutputStream output = FileSystem.create(fs, tmpPath, new FsPermission("777"));
if (output != null) {
System.out.println("Create succeed!");
output.write("test".getBytes(), 0, 4);
output.close();
}
// System.in.read();
} catch (Exception e) {
System.out.println("Create failed!");
}
// index = location.length() + 1;
// for (int i = 0; i < partitionCount; i++) {
// index = finalOutPath.toString().indexOf("/", index + 1);
// }
// tableName = finalOutPath.toString().substring(0, index);
// tableName = tableName.replace('/', '#');
// System.out.println("Table name:" + tableName);
// System.out.println("fina path:" + finalOutPath.toString());

}

@@ -161,6 +223,7 @@ public SinkRecordWriter(
public void write(Writable w) throws IOException {
System.out.println("write");
if (w == null) {
System.out.println("w is null");
return;
}
ArrowWrapperWritable arrowWrapperWritable = (ArrowWrapperWritable) w;
@@ -181,7 +244,9 @@ public void close(boolean abort) throws IOException {
System.out.println("close");
System.out.println("table name:" + tableName);
Table oldTable = null;
-if (!hasData) {
+System.out.println("has data:" + hasData);
+if (schemaBuilder == null) {
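// schemaBuilder stays null until the first batch is written, so this null check replaces the old hasData flag.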
System.out.println("No data to write.");
client.disconnect();
System.out.println("Bye, vineyard!");
return;
@@ -217,6 +282,7 @@ public void close(boolean abort) throws IOException {
throw new IOException("Seal TableBuilder failed");
}
client.disconnect();

System.out.println("Bye, vineyard!");
}

@@ -305,3 +371,12 @@ public void close(Reporter reporter) throws IOException {
System.out.printf("closing\n");
}
}

// file:/opt/hive/data/warehouse/hive_dynamic_partition_test6/.hive-staging_hive_2023-07-19_10-49-12_537_8312530081206971822-1/_task_tmp.-ext-10000/year=2017/_tmp.000000_0
// file:/opt/hive/data/warehouse/hive_dynamic_partition_test6/.hive-staging_hive_2023-07-19_10-49-12_537_8312530081206971822-1/-ext-10000

// file:/opt/hive/data/warehouse/hive_dynamic_partition_test6/year=2018/.hive-staging_hive_2023-07-19_10-52-50_144_3570501016920325767-1/_task_tmp.-ext-10000/_tmp.000000_0
// file:/opt/hive/data/warehouse/hive_dynamic_partition_test6/year=2018/.hive-staging_hive_2023-07-19_10-52-50_144_3570501016920325767-1/-ext-10000

// file:/opt/hive/data/warehouse/hive_dynamic_partition_test6/.hive-staging_hive_2023-07-19_11-15-58_433_128099267613906011-1/-ext-10000
// file:/opt/hive/data/warehouse/hive_dynamic_partition_test7/.hive-staging_hive_2023-07-19_11-14-48_835_1857436151368976840-1/-ext-10000
