Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements Hive Input/Output format to use vineyard as the storage backend #1441

Closed
wants to merge 40 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
5fbb493
Test connect to vineyard.
vegetableysm Jun 28, 2023
cd065ff
Seal dataframe metadata on vineyard.
vegetableysm Jun 29, 2023
87820e4
Minor.
vegetableysm Jun 29, 2023
599536c
Seal tensor buffer.
vegetableysm Jun 30, 2023
86370e1
All changes
vegetableysm Jun 30, 2023
2976c26
Remove the dependency of vineyard_client to make the hive artifacts w…
sighingnow Jun 30, 2023
63d4318
Add seal API for BufferBuilder.
vegetableysm Jun 30, 2023
8495975
Clean code.
vegetableysm Jun 30, 2023
b6189ec
Support float data.(Write)
vegetableysm Jul 3, 2023
ae0ccde
Store hive data as a table in vineyard.
vegetableysm Jul 4, 2023
51721c6
Clean code.
vegetableysm Jul 4, 2023
cceb15d
Support VineyardInputFormat(with bugs)
vegetableysm Jul 4, 2023
d440521
Fixes the value count
sighingnow Jul 5, 2023
dfe2e84
Minor
vegetableysm Jul 5, 2023
4ce10d2
Minor
vegetableysm Jul 5, 2023
3aa620a
Enable input formats from vineyard table to hive
sighingnow Jul 5, 2023
abaff32
Fix bug that sealing string object in vineyard will trigle exceptions.
vegetableysm Jul 7, 2023
511523f
Resolve the conflict
sighingnow Jul 10, 2023
b011083
Add VineyardBatchInputFormat and other testing codes
sighingnow Jul 10, 2023
22b1912
Support add recordBatch muti times.
vegetableysm Jul 10, 2023
24c17c5
Fill data to VectorizedRowBatch.(Mem copy)
vegetableysm Jul 11, 2023
de126aa
Read data from vineyard.
vegetableysm Jul 11, 2023
125b4de
Insert table to vineayrd/Read table from vineyard.
vegetableysm Jul 11, 2023
ddda4e2
Fix bug: VineyatdSplit returns a null pointer of path.
vegetableysm Jul 12, 2023
0c2f9fd
Fix bug: index out of bound when copy table.
vegetableysm Jul 12, 2023
dd3de0f
Update hive readme.
vegetableysm Jul 13, 2023
b44edc7
Clean code
vegetableysm Jul 13, 2023
b5d0c6b
Update readme and support insert multi times.
vegetableysm Jul 14, 2023
1e0dea9
Fix bug that hive will throw "out of bound exception" when use large …
vegetableysm Jul 17, 2023
fc33fbe
Clean code.
vegetableysm Jul 17, 2023
f1e5f6a
Split large vineyard object into multi mapers.
vegetableysm Jul 18, 2023
9837e74
Fix bug : When loading data from a file with recordBatch of 1, output…
vegetableysm Jul 19, 2023
b19f9c2
Support static partition.
vegetableysm Jul 19, 2023
c4f9c05
Clean code and update readme.
vegetableysm Jul 20, 2023
a855c37
Clean code.
vegetableysm Jul 20, 2023
f3c4254
Support dynamic partition.
vegetableysm Jul 24, 2023
79ab8a8
Implement VineyardSerDe to support dynamic partition.
vegetableysm Jul 26, 2023
a0a17c6
Adapt to hive 3.1.3 and connect hive from spark
sighingnow Aug 1, 2023
b0d0aa3
Cleanup
sighingnow Aug 1, 2023
1f17f00
Shard code.
vegetableysm Aug 3, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/vineyard/pkg/common/memory/fling.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ int send_fd(int conn, int fd) {
}
} else if (r == 0) {
std::clog << "[error] Encountered unexpected EOF" << std::endl;
return 0;
return -1;
} else {
return static_cast<int>(r);
}
Expand Down
29 changes: 13 additions & 16 deletions java/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,18 @@ else()
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,-rpath,$ORIGIN:$ORIGIN/../lib")
endif()

if(CMAKE_VERSION VERSION_LESS "3.1")
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
check_cxx_compiler_flag(-std=c++14 HAVE_FLAG_STD_CXX14)
if(BUILD_VINEYARD_PYPI_PACKAGES AND NOT HAVE_FLAG_STD_CXX14)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14")
endif()
else()
set(HAVE_FLAG_STD_CXX14 TRUE)
endif()
else()
check_cxx_compiler_flag(-std=c++17 HAVE_FLAG_STD_CXX17)
check_cxx_compiler_flag(-std=c++14 HAVE_FLAG_STD_CXX14)
if(HAVE_FLAG_STD_CXX17)
set(CMAKE_CXX_STANDARD 17)
elseif(HAVE_FLAG_STD_CXX14)
set(CMAKE_CXX_STANDARD 14)
set(HAVE_FLAG_STD_CXX14 TRUE)
else()
set(CMAKE_CXX_STANDARD 11)
endif()

if(CMAKE_VERSION VERSION_LESS "3.1")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++${CMAKE_CXX_STANDARD}")
endif()

# find vineyard
Expand All @@ -128,7 +126,7 @@ macro(target_add_link_options target scope)
endmacro()

# JNI
find_package(JNI REQUIRED)
find_package(JNI COMPONENTS JVM REQUIRED)
include_directories(SYSTEM ${JAVA_INCLUDE_PATH})
include_directories(SYSTEM ${JAVA_INCLUDE_PATH2})

Expand All @@ -141,8 +139,7 @@ include_directories(${CMAKE_CURRENT_LIST_DIR}/src/main/cpp)

add_library(vineyard-core_jni ${CLIENT_SRC_FILES})
set_target_properties(vineyard-core_jni PROPERTIES OUTPUT_NAME vineyard-core_jni-${PLATFORM_NATIVE_LIBRARY_VERSION})
target_link_libraries(vineyard-core_jni PUBLIC vineyard_client
${JAVA_JVM_LIBRARY}
target_link_libraries(vineyard-core_jni PUBLIC ${JAVA_JVM_LIBRARY}
${JNI_LIBRARIES}
)
target_include_directories(vineyard-core_jni PRIVATE ${VINEYARD_INCLUDE_DIRS}
Expand Down
74 changes: 71 additions & 3 deletions java/core/src/main/cpp/ffi/ffi.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,89 @@ limitations under the License.
#include <stdio.h>
#include <string.h>
#include <sys/mman.h>

#include "vineyard/common/memory/fling.h"
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>

#ifdef __cplusplus
extern "C" {
#endif

static void init_msg(struct msghdr* msg, struct iovec* iov, char* buf,
size_t buf_len) {
iov->iov_base = buf;
iov->iov_len = 1;

msg->msg_iov = iov;
msg->msg_iovlen = 1;
msg->msg_control = buf;
msg->msg_controllen = static_cast<socklen_t>(buf_len);
msg->msg_name = NULL;
msg->msg_namelen = 0;
}

static int recv_fd(int conn) {
struct msghdr msg;
struct iovec iov;
char buf[CMSG_SPACE(sizeof(int))];
init_msg(&msg, &iov, buf, sizeof(buf));

while (true) {
ssize_t r = recvmsg(conn, &msg, 0);
if (r == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
continue;
} else {
fprintf(stderr, "[error] Error in recv_fd (errno = %d: %s)\n", errno, strerror(errno));
return -1;
}
} else {
break;
}
}

int found_fd = -1;
int oh_noes = 0;
for (struct cmsghdr* header = CMSG_FIRSTHDR(&msg); header != NULL;
header = CMSG_NXTHDR(&msg, header))
if (header->cmsg_level == SOL_SOCKET && header->cmsg_type == SCM_RIGHTS) {
ssize_t count =
(header->cmsg_len -
(CMSG_DATA(header) - reinterpret_cast<unsigned char*>(header))) /
sizeof(int);
for (int i = 0; i < count; ++i) {
int fd = (reinterpret_cast<int*>(CMSG_DATA(header)))[i];
if (found_fd == -1) {
found_fd = fd;
} else {
close(fd);
oh_noes = 1;
}
}
}

// The sender sent us more than one file descriptor. We've closed
// them all to prevent fd leaks but notify the caller that we got
// a bad message.
if (oh_noes) {
close(found_fd);
errno = EBADMSG;
fprintf(stderr, "[error] Error in recv_fd: more than one fd received in message\n");
return -1;
}

return found_fd;
}

/*
* Class: io_v6d_core_common_memory_ffi_Fling
* Method: sendFD
* Signature: (II)I
*/
JNIEXPORT jint JNICALL Java_io_v6d_core_common_memory_ffi_Fling_sendFD(
JNIEnv*, jclass, jint conn, jint fd) {
return send_fd(conn, fd);
// return send_fd(conn, fd);
return -1;
}

/*
Expand Down
56 changes: 55 additions & 1 deletion java/core/src/main/java/io/v6d/core/client/IPCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@
}
}

@Override
public boolean connected() {
return channel != null && channel.isConnected();
}

@Override
public ObjectMeta getMetaData(ObjectID id, boolean sync_remote, boolean wait)
throws VineyardException {
Expand Down Expand Up @@ -289,30 +294,58 @@
}

public Buffer createBuffer(long size) throws VineyardException {
System.out.println("createBuffer:" + size);
if (size == 0) {
return Buffer.empty();
}
System.out.println("stage: 1");
val root = mapper.createObjectNode();
System.out.println("stage: 2");
CreateBufferRequest.put(root, size);
System.out.println("stage: 3");
this.doWrite(root);
System.out.println("stage: 4");
val reply = new CreateBufferReply();
System.out.println("stage: 5");
reply.get(this.doReadJson());

System.out.println("stage: 6");
val payload = reply.getPayload();
long pointer = this.mmap(payload.getStoreFD(), payload.getMapSize(), true, true);
System.out.println("stage: 7");
System.out.println("size: " + payload.getMapSize());
System.out.println("fd: " + payload.getStoreFD());
long pointer = this.mmap(payload.getStoreFD(), payload.getMapSize(), false, true);
System.out.println("stage: 8");
val buffer = new Buffer();
System.out.println("stage: 9");
buffer.setObjectId(reply.getId());
System.out.println("stage: 10");
buffer.setPointer(pointer + payload.getDataOffset());
System.out.println("stage: 11");
buffer.setSize(reply.getPayload().getDataSize());
System.out.println("stage: 12");
return buffer;
}

// TODO: exception

Check warning on line 330 in java/core/src/main/java/io/v6d/core/client/IPCClient.java

View check run for this annotation

codefactor.io / CodeFactor

java/core/src/main/java/io/v6d/core/client/IPCClient.java#L330

Resolve unexpected comment. (com.puppycrawl.tools.checkstyle.checks.TodoCommentCheck)
public void sealBlob(ObjectID id) throws VineyardException {
val root = mapper.createObjectNode();
SealRequest.put(root, id);
this.doWrite(root);
val reply = new SealReply();
reply.get(this.doReadJson());
}
Copy link
Member

@sighingnow sighingnow Jun 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BufferBuilder itself already has a seal(), method.


private void connectIPCSocket(UnixSocketAddress address) throws VineyardException.IOError {
try {
System.out.println("open:" + address.path());
channel = UnixSocketChannel.open(address);
System.out.println("open success.");
} catch (IOException e) {
System.out.println("Failed to connect to IPC socket: " + e.getMessage());
throw new VineyardException.IOError(e.getMessage());
}
System.out.println("Connected to IPC socket successfully.");
writer = new LittleEndianDataOutputStream(Channels.newOutputStream(channel));
reader = new LittleEndianDataInputStream(Channels.newInputStream(channel));
}
Expand All @@ -322,6 +355,8 @@
throws VineyardException.ConnectionFailed {
val address = new UnixSocketAddress(new File(pathname).getAbsolutePath());
int num_retries = NUM_CONNECT_ATTEMPTS;
System.out.println("Connecting to " + pathname);
System.out.println("address:" + address.path());
while (num_retries > 0) {
try {
connectIPCSocket(address);
Expand All @@ -331,7 +366,13 @@
}
num_retries -= 1;
}
System.out.println("Connected to IPC socket successfully2.");
if (reader == null || writer == null) {
if (reader == null) {
System.out.println("reader is null");
} else {
System.out.println("writer is null");
}
throw new VineyardException.ConnectionFailed();
}
}
Expand All @@ -358,15 +399,22 @@

private long mmap(int fd, long mapSize, boolean readonly, boolean realign)
throws VineyardException {
System.out.println("mmap start");
if (mmapTable.containsKey(fd)) {
System.out.println("mmap exists");
return mmapTable.get(fd);
}
System.out.println("tans fd");
System.out.println("channel fd:" + this.channel.getFD());
int client_fd = Fling.recvFD(this.channel.getFD());
System.out.println("client fd:" + client_fd);
long pointer = Fling.mapSharedMem(client_fd, mapSize, readonly, realign);
System.out.println("map pointer:" + pointer);
if (pointer == -1) {
throw new VineyardException.UnknownError("mmap failed for fd " + fd);
}
mmapTable.put(fd, pointer);
System.out.println("set in map");
return pointer;
}

Expand All @@ -389,19 +437,25 @@

@SneakyThrows(IOException.class)
private byte[] doRead() {
System.out.println("doRead");
int length = (int) reader.readLong(); // n.b.: the server writes a size_t (long)
val content = new byte[length];
int done = 0, remaining = length;
System.out.println("length:" + length);
while (done < length) {
int batch = reader.read(content, done, remaining);
done += batch;
remaining -= batch;
System.out.println("done:" + done);
System.out.println("remaining:" + remaining);
}
System.out.println("doRead end");
return content;
}

@SneakyThrows(IOException.class)
private JsonNode doReadJson() {
System.out.println("doReadJson");
return mapper.readTree(doRead());
}
}
24 changes: 24 additions & 0 deletions java/core/src/main/java/io/v6d/core/common/util/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.v6d.core.common.memory.Payload;
import java.util.*;
import java.io.*;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.val;
Expand Down Expand Up @@ -126,6 +127,13 @@ public static void put(ObjectNode root, long size) {
}
}

public static class SealRequest extends Request {
public static void put(ObjectNode root, ObjectID id) {
root.put("type", "seal_request");
root.put("object_id", id.value());
}
}

@Data
@EqualsAndHashCode(callSuper = false)
public static class CreateBufferReply extends Reply {
Expand All @@ -135,8 +143,24 @@ public static class CreateBufferReply extends Reply {
@Override
public void get(JsonNode root) throws VineyardException {
check(root, "create_buffer_reply");

System.out.println("get id");

this.id = new ObjectID(JSON.getLong(root, "id"));
System.out.println(root.toString());

System.out.println("id:" + this.id.value());
System.out.println("get payload");

this.payload = Payload.fromJson(root.get("created"));
System.out.println("end");
}
}

public static class SealReply extends Reply {
@Override
public void get(JsonNode root) throws VineyardException {
check(root, "seal_reply");
}
}

Expand Down
1 change: 1 addition & 0 deletions java/hive/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Using docker to launch the hive server:
-v `pwd`/java/hive/target:/opt/hive/auxlib \
--env HIVE_AUX_JARS_PATH=/opt/hive/auxlib/ \
--env SERVICE_NAME=hiveserver2 \
--env SERVICE_OPTS="-Djnr.ffi.asm.enabled=false" \
--name hive \
apache/hive:${HIVE_VERSION}

Expand Down
Loading