Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements Hive Input/Output format to use vineyard as the storage backend #1441

Closed
wants to merge 40 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
5fbb493
Test connect to vineyard.
vegetableysm Jun 28, 2023
cd065ff
Seal dataframe metadata on vineyard.
vegetableysm Jun 29, 2023
87820e4
Minor.
vegetableysm Jun 29, 2023
599536c
Seal tensor buffer.
vegetableysm Jun 30, 2023
86370e1
All changes
vegetableysm Jun 30, 2023
2976c26
Remove the dependency of vineyard_client to make the hive artifacts w…
sighingnow Jun 30, 2023
63d4318
Add seal API for BufferBuilder.
vegetableysm Jun 30, 2023
8495975
Clean code.
vegetableysm Jun 30, 2023
b6189ec
Support float data.(Write)
vegetableysm Jul 3, 2023
ae0ccde
Store hive data as a table in vineyard.
vegetableysm Jul 4, 2023
51721c6
Clean code.
vegetableysm Jul 4, 2023
cceb15d
Support VineyardInputFormat(with bugs)
vegetableysm Jul 4, 2023
d440521
Fixes the value count
sighingnow Jul 5, 2023
dfe2e84
Minor
vegetableysm Jul 5, 2023
4ce10d2
Minor
vegetableysm Jul 5, 2023
3aa620a
Enable input formats from vineyard table to hive
sighingnow Jul 5, 2023
abaff32
Fix bug that sealing string object in vineyard will trigle exceptions.
vegetableysm Jul 7, 2023
511523f
Resolve the conflict
sighingnow Jul 10, 2023
b011083
Add VineyardBatchInputFormat and other testing codes
sighingnow Jul 10, 2023
22b1912
Support add recordBatch muti times.
vegetableysm Jul 10, 2023
24c17c5
Fill data to VectorizedRowBatch.(Mem copy)
vegetableysm Jul 11, 2023
de126aa
Read data from vineyard.
vegetableysm Jul 11, 2023
125b4de
Insert table to vineayrd/Read table from vineyard.
vegetableysm Jul 11, 2023
ddda4e2
Fix bug: VineyatdSplit returns a null pointer of path.
vegetableysm Jul 12, 2023
0c2f9fd
Fix bug: index out of bound when copy table.
vegetableysm Jul 12, 2023
dd3de0f
Update hive readme.
vegetableysm Jul 13, 2023
b44edc7
Clean code
vegetableysm Jul 13, 2023
b5d0c6b
Update readme and support insert multi times.
vegetableysm Jul 14, 2023
1e0dea9
Fix bug that hive will throw "out of bound exception" when use large …
vegetableysm Jul 17, 2023
fc33fbe
Clean code.
vegetableysm Jul 17, 2023
f1e5f6a
Split large vineyard object into multi mapers.
vegetableysm Jul 18, 2023
9837e74
Fix bug : When loading data from a file with recordBatch of 1, output…
vegetableysm Jul 19, 2023
b19f9c2
Support static partition.
vegetableysm Jul 19, 2023
c4f9c05
Clean code and update readme.
vegetableysm Jul 20, 2023
a855c37
Clean code.
vegetableysm Jul 20, 2023
f3c4254
Support dynamic partition.
vegetableysm Jul 24, 2023
79ab8a8
Implement VineyardSerDe to support dynamic partition.
vegetableysm Jul 26, 2023
a0a17c6
Adapt to hive 3.1.3 and connect hive from spark
sighingnow Aug 1, 2023
b0d0aa3
Cleanup
sighingnow Aug 1, 2023
1f17f00
Shard code.
vegetableysm Aug 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions java/hive/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -215,24 +215,29 @@ Hive and Vineyard
stored as
INPUTFORMAT 'io.v6d.hive.ql.io.VineyardInputFormat'
OUTPUTFORMAT 'io.v6d.hive.ql.io.VineyardOutputFormat';
insert into table hive_static_partition partition(value=666) values (1, 2);
insert into table hive_static_partition partition(value=666) values (999, 2), (999, 2), (999, 2);
insert into table hive_static_partition partition(value=666) values (3, 4);
insert into table hive_static_partition partition(value=114514) values (1, 2);
select * from hive_static_partition;
select * from hive_static_partition where value=666;
select * from hive_static_partition where value=114514;

- Test static partition(BUG):
- Test dynamic partition:

We must set the batch size to 1. Because ArrowColumnarBatchSerDe do not process dynamic partitioning. If
the batch size is not set to 1, ArrowColumnarBatchSerDe will batch data from different partitions together,
and the data will be written to the same partition, which will cause the data to be written incorrectly.
vegetableysm marked this conversation as resolved.
Show resolved Hide resolved

.. code:: sql

set hive.fetch.task.conversion=none;
set hive.vectorized.use.vectorized.input.format=true;
set hive.vectorized.use.row.serde.deserialize=false;
set hive.vectorized.use.vector.serde.deserialize=true;
set hive.vectorized.execution.enabled=true;
set hive.vectorized.execution.reduce.enabled=true;
set hive.vectorized.row.serde.inputformat.excludes=io.v6d.hive.ql.io.VineyardInputFormat;
set hive.arrow.batch.size=500;
set hive.arrow.batch.size=1;
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
create table hive_dynamic_partition_data
Expand All @@ -245,9 +250,10 @@ Hive and Vineyard
(
src_id int,
dst_id int
)partitioned by(year int)
)partitioned by(mounth int, year int)
row format serde "org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe"
stored as
INPUTFORMAT 'io.v6d.hive.ql.io.VineyardInputFormat'
OUTPUTFORMAT 'io.v6d.hive.ql.io.VineyardOutputFormat';
insert into table hive_dynamic_partition_test partition(year) select src_id,dst_id,year from hive_dynamic_partition_data;
insert into table hive_dynamic_partition_test partition(mounth=1, year) select src_id,dst_id,year from hive_dynamic_partition_data;
select * from hive_dynamic_partition_test;
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,23 @@
import io.v6d.modules.basic.arrow.Arrow;
import io.v6d.modules.basic.arrow.RecordBatchBuilder;

import java.io.File;

Check notice on line 28 in java/hive/src/main/java/io/v6d/hive/ql/io/VineyardOutputFormat.java

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

java/hive/src/main/java/io/v6d/hive/ql/io/VineyardOutputFormat.java#L28

Unused import - java.io.File.
import java.io.IOException;
import java.util.Properties;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

Check notice on line 33 in java/hive/src/main/java/io/v6d/hive/ql/io/VineyardOutputFormat.java

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

java/hive/src/main/java/io/v6d/hive/ql/io/VineyardOutputFormat.java#L33

Unused import - java.util.Map.

import lombok.val;

import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.*;
import org.apache.arrow.vector.*;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
Expand All @@ -45,6 +51,7 @@
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.hive.common.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -81,6 +88,7 @@
private Path finalOutPath;
private Properties tableProperties;
private Progressable progress;
private FileSystem fs;

// vineyard
private IPCClient client;
Expand All @@ -92,10 +100,21 @@
private List<String> partitions;
private boolean hasData = false;

public static final PathFilter VINEYARD_FILES_PATH_FILTER = new PathFilter() {
@Override
public boolean accept(Path p) {
String name = p.getName();
return name.startsWith("_task_tmp.") && (!name.substring(10, name.length()).startsWith("-"));
}
};

private void getTableName(Properties tableProperties) {
String location = tableProperties.getProperty("location");
// int index = location.lastIndexOf("/");
// tableName = location.substring(index + 1);
System.out.println("finalOutPath : "+ finalOutPath.toString());

// Get partition count
String partition = tableProperties.getProperty("partition_columns.types");
int index = -1;
int partitionCount= 0;
Expand All @@ -106,14 +125,57 @@
} while(index != -1);
}
System.out.println("Partition count:" + partitionCount);
index = location.length() + 1;
for (int i = 0; i < partitionCount; i++) {
index = finalOutPath.toString().indexOf("/", index + 1);

// Construct table name
String path = finalOutPath.toString();
path = path.substring(location.length(), path.length());
String pathSplits[] = path.split("/");
PathFilter vineyardPathFilter = VINEYARD_FILES_PATH_FILTER;
PathFilter hiddernPathFilter = FileUtils.HIDDEN_FILES_PATH_FILTER;
if (pathSplits.length == 0) {
return;
}
tableName = location;
for (int i = 1; i < pathSplits.length; i++) {
if (pathSplits[i].length() > 0 && hiddernPathFilter.accept(new Path(pathSplits[i]))) {
System.out.println("path split:" + pathSplits[i]);
tableName += "/" + pathSplits[i];
} else if (pathSplits[i].length() > 0 && vineyardPathFilter.accept(new Path(pathSplits[i]))) {
System.out.println("path split:" + pathSplits[i].substring(10, pathSplits[i].length()));
tableName += "/" + pathSplits[i].substring(10, pathSplits[i].length());
}
}
tableName = finalOutPath.toString().substring(0, index);
tableName = tableName.replace('/', '#');
tableName = tableName.replaceAll("/", "#");
System.out.println("Table name:" + tableName);
System.out.println("fina path:" + finalOutPath.toString());

// Create temp file
String tmpFilePath = finalOutPath.toString();
tmpFilePath = tmpFilePath.substring(0, tmpFilePath.lastIndexOf("/"));
System.out.println("out path:" + tmpFilePath);
// File file = FileUtils.createTempFile(outPath, "vineyard", ".tmp");
tmpFilePath = tmpFilePath.replaceAll("_task", "");
Path tmpPath = new Path(tmpFilePath, "vineyard.tmp");
try {
fs = finalOutPath.getFileSystem(jc);
System.out.println("tmp path:" + tmpPath.toString());
FSDataOutputStream output = FileSystem.create(fs, tmpPath, new FsPermission("777"));
if (output != null) {
System.out.println("Create succeed!");
output.write("test".getBytes(), 0, 4);
output.close();
}
// System.in.read();
} catch (Exception e) {
System.out.println("Create failed!");
}
// index = location.length() + 1;
// for (int i = 0; i < partitionCount; i++) {
// index = finalOutPath.toString().indexOf("/", index + 1);
// }
// tableName = finalOutPath.toString().substring(0, index);
// tableName = tableName.replace('/', '#');
// System.out.println("Table name:" + tableName);
// System.out.println("fina path:" + finalOutPath.toString());

}

Expand Down Expand Up @@ -161,6 +223,7 @@
public void write(Writable w) throws IOException {
System.out.println("write");
if (w == null) {
System.out.println("w is null");
return;
}
ArrowWrapperWritable arrowWrapperWritable = (ArrowWrapperWritable) w;
Expand All @@ -181,7 +244,9 @@
System.out.println("close");
System.out.println("table name:" + tableName);
Table oldTable = null;
if (!hasData) {
System.out.println("has data:" + hasData);
if (schemaBuilder == null) {
System.out.println("No data to write.");
client.disconnect();
System.out.println("Bye, vineyard!");
return;
Expand Down Expand Up @@ -217,6 +282,7 @@
throw new IOException("Seal TableBuilder failed");
}
client.disconnect();

System.out.println("Bye, vineyard!");
}

Expand All @@ -234,62 +300,62 @@
recordBatchBuilders.add(recordBatchBuilder);
}

private void fillColumns(VectorSchemaRoot root, Schema schema)
throws VineyardException {
for (int i = 0; i < schema.getFields().size(); i++) {
val column = recordBatchBuilder.getColumnBuilder(i);
Field field = schema.getFields().get(i);
if (field.getType().equals(Arrow.Type.Boolean)) {
BitVector vector = (BitVector) root.getFieldVectors().get(i);
for (int j = 0; j < root.getRowCount(); j++) {
if (vector.get(j) != 0) {
column.setBoolean(j, true);
} else {
column.setBoolean(j, false);
}
}
} else if (field.getType().equals(Arrow.Type.Int)) {
IntVector vector= (IntVector) root.getFieldVectors().get(i);
// System.out.println("row count:" + root.getRowCount());
for (int j = 0; j < root.getRowCount(); j++) {
column.setInt(j, vector.get(j));
// System.out.println("int value:" + vector.get(j));
}
} else if (field.getType().equals(Arrow.Type.Int64)) {
BigIntVector vector = (BigIntVector) root.getFieldVectors().get(i);
for (int j = 0; j < root.getRowCount(); j++) {
column.setLong(j, vector.get(j));
}
} else if (field.getType().equals(Arrow.Type.Float)) {
Float4Vector vector = (Float4Vector) root.getFieldVectors().get(i);
for (int j = 0; j < root.getRowCount(); j++) {
column.setFloat(j, vector.get(j));
}
} else if (field.getType().equals(Arrow.Type.Double)) {
Float8Vector vector = (Float8Vector) root.getFieldVectors().get(i);
for (int j = 0; j < root.getRowCount(); j++) {
column.setDouble(j, vector.get(j));
}
} else if (field.getType().equals(Arrow.Type.LargeVarChar)) {
LargeVarCharVector vector = (LargeVarCharVector) root.getFieldVectors().get(i);
for (int j = 0; j < root.getRowCount(); j++) {
column.setUTF8String(j, vector.getObject(j));
}
} else if (field.getType().equals(Arrow.Type.VarChar)) {
VarCharVector vector = (VarCharVector) root.getFieldVectors().get(i);
for (int j = 0; j < root.getRowCount(); j++) {
column.setUTF8String(j, vector.getObject(j));
}
} else {
throw new VineyardException.NotImplemented(
"array builder for type " + field.getType() + " is not supported");
}
}
}
}

class MapredRecordWriter<K extends NullWritable, V extends ArrowWrapperWritable>
implements RecordWriter<K, V> {

Check notice on line 358 in java/hive/src/main/java/io/v6d/hive/ql/io/VineyardOutputFormat.java

View check run for this annotation

codefactor.io / CodeFactor

java/hive/src/main/java/io/v6d/hive/ql/io/VineyardOutputFormat.java#L303-L358

Complex Method
MapredRecordWriter() throws IOException {
System.out.printf("creating vineyard record writer\n");
throw new RuntimeException("mapred record writter: unimplemented");
Expand All @@ -305,3 +371,12 @@
System.out.printf("closing\n");
}
}

// file:/opt/hive/data/warehouse/hive_dynamic_partition_test6/.hive-staging_hive_2023-07-19_10-49-12_537_8312530081206971822-1/_task_tmp.-ext-10000/year=2017/_tmp.000000_0
// file:/opt/hive/data/warehouse/hive_dynamic_partition_test6/.hive-staging_hive_2023-07-19_10-49-12_537_8312530081206971822-1/-ext-10000

// file:/opt/hive/data/warehouse/hive_dynamic_partition_test6/year=2018/.hive-staging_hive_2023-07-19_10-52-50_144_3570501016920325767-1/_task_tmp.-ext-10000/_tmp.000000_0
// file:/opt/hive/data/warehouse/hive_dynamic_partition_test6/year=2018/.hive-staging_hive_2023-07-19_10-52-50_144_3570501016920325767-1/-ext-10000

//file:/opt/hive/data/warehouse/hive_dynamic_partition_test6/.hive-staging_hive_2023-07-19_11-15-58_433_128099267613906011-1/-ext-10000
//file:/opt/hive/data/warehouse/hive_dynamic_partition_test7/.hive-staging_hive_2023-07-19_11-14-48_835_1857436151368976840-1/-ext-10000
Loading