Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements Hive Input/Output format to use vineyard as the storage backend #1441

Closed
wants to merge 40 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
5fbb493
Test connect to vineyard.
vegetableysm Jun 28, 2023
cd065ff
Seal dataframe metadata on vineyard.
vegetableysm Jun 29, 2023
87820e4
Minor.
vegetableysm Jun 29, 2023
599536c
Seal tensor buffer.
vegetableysm Jun 30, 2023
86370e1
All changes
vegetableysm Jun 30, 2023
2976c26
Remove the dependency of vineyard_client to make the hive artifacts w…
sighingnow Jun 30, 2023
63d4318
Add seal API for BufferBuilder.
vegetableysm Jun 30, 2023
8495975
Clean code.
vegetableysm Jun 30, 2023
b6189ec
Support float data.(Write)
vegetableysm Jul 3, 2023
ae0ccde
Store hive data as a table in vineyard.
vegetableysm Jul 4, 2023
51721c6
Clean code.
vegetableysm Jul 4, 2023
cceb15d
Support VineyardInputFormat(with bugs)
vegetableysm Jul 4, 2023
d440521
Fixes the value count
sighingnow Jul 5, 2023
dfe2e84
Minor
vegetableysm Jul 5, 2023
4ce10d2
Minor
vegetableysm Jul 5, 2023
3aa620a
Enable input formats from vineyard table to hive
sighingnow Jul 5, 2023
abaff32
Fix bug that sealing string object in vineyard will trigle exceptions.
vegetableysm Jul 7, 2023
511523f
Resolve the conflict
sighingnow Jul 10, 2023
b011083
Add VineyardBatchInputFormat and other testing codes
sighingnow Jul 10, 2023
22b1912
Support add recordBatch muti times.
vegetableysm Jul 10, 2023
24c17c5
Fill data to VectorizedRowBatch.(Mem copy)
vegetableysm Jul 11, 2023
de126aa
Read data from vineyard.
vegetableysm Jul 11, 2023
125b4de
Insert table to vineayrd/Read table from vineyard.
vegetableysm Jul 11, 2023
ddda4e2
Fix bug: VineyatdSplit returns a null pointer of path.
vegetableysm Jul 12, 2023
0c2f9fd
Fix bug: index out of bound when copy table.
vegetableysm Jul 12, 2023
dd3de0f
Update hive readme.
vegetableysm Jul 13, 2023
b44edc7
Clean code
vegetableysm Jul 13, 2023
b5d0c6b
Update readme and support insert multi times.
vegetableysm Jul 14, 2023
1e0dea9
Fix bug that hive will throw "out of bound exception" when use large …
vegetableysm Jul 17, 2023
fc33fbe
Clean code.
vegetableysm Jul 17, 2023
f1e5f6a
Split large vineyard object into multi mapers.
vegetableysm Jul 18, 2023
9837e74
Fix bug : When loading data from a file with recordBatch of 1, output…
vegetableysm Jul 19, 2023
b19f9c2
Support static partition.
vegetableysm Jul 19, 2023
c4f9c05
Clean code and update readme.
vegetableysm Jul 20, 2023
a855c37
Clean code.
vegetableysm Jul 20, 2023
f3c4254
Support dynamic partition.
vegetableysm Jul 24, 2023
79ab8a8
Implement VineyardSerDe to support dynamic partition.
vegetableysm Jul 26, 2023
a0a17c6
Adapt to hive 3.1.3 and connect hive from spark
sighingnow Aug 1, 2023
b0d0aa3
Cleanup
sighingnow Aug 1, 2023
1f17f00
Shard code.
vegetableysm Aug 3, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 13 additions & 16 deletions java/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,18 @@ else()
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,-rpath,$ORIGIN:$ORIGIN/../lib")
endif()

if(CMAKE_VERSION VERSION_LESS "3.1")
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
check_cxx_compiler_flag(-std=c++14 HAVE_FLAG_STD_CXX14)
if(BUILD_VINEYARD_PYPI_PACKAGES AND NOT HAVE_FLAG_STD_CXX14)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14")
endif()
else()
set(HAVE_FLAG_STD_CXX14 TRUE)
endif()
else()
check_cxx_compiler_flag(-std=c++17 HAVE_FLAG_STD_CXX17)
check_cxx_compiler_flag(-std=c++14 HAVE_FLAG_STD_CXX14)
if(HAVE_FLAG_STD_CXX17)
set(CMAKE_CXX_STANDARD 17)
elseif(HAVE_FLAG_STD_CXX14)
set(CMAKE_CXX_STANDARD 14)
set(HAVE_FLAG_STD_CXX14 TRUE)
else()
set(CMAKE_CXX_STANDARD 11)
endif()

if(CMAKE_VERSION VERSION_LESS "3.1")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++${CMAKE_CXX_STANDARD}")
endif()

# find vineyard
Expand All @@ -128,7 +126,7 @@ macro(target_add_link_options target scope)
endmacro()

# JNI
find_package(JNI REQUIRED)
find_package(JNI COMPONENTS JVM REQUIRED)
include_directories(SYSTEM ${JAVA_INCLUDE_PATH})
include_directories(SYSTEM ${JAVA_INCLUDE_PATH2})

Expand All @@ -141,8 +139,7 @@ include_directories(${CMAKE_CURRENT_LIST_DIR}/src/main/cpp)

add_library(vineyard-core_jni ${CLIENT_SRC_FILES})
set_target_properties(vineyard-core_jni PROPERTIES OUTPUT_NAME vineyard-core_jni-${PLATFORM_NATIVE_LIBRARY_VERSION})
target_link_libraries(vineyard-core_jni PUBLIC vineyard_client
${JAVA_JVM_LIBRARY}
target_link_libraries(vineyard-core_jni PUBLIC ${JAVA_JVM_LIBRARY}
${JNI_LIBRARIES}
)
target_include_directories(vineyard-core_jni PRIVATE ${VINEYARD_INCLUDE_DIRS}
Expand Down
74 changes: 71 additions & 3 deletions java/core/src/main/cpp/ffi/ffi.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,89 @@ limitations under the License.
#include <stdio.h>
#include <string.h>
#include <sys/mman.h>

#include "vineyard/common/memory/fling.h"
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>

#ifdef __cplusplus
extern "C" {
#endif

static void init_msg(struct msghdr* msg, struct iovec* iov, char* buf,
size_t buf_len) {
iov->iov_base = buf;
iov->iov_len = 1;

msg->msg_iov = iov;
msg->msg_iovlen = 1;
msg->msg_control = buf;
msg->msg_controllen = static_cast<socklen_t>(buf_len);
msg->msg_name = NULL;
msg->msg_namelen = 0;
}

static int recv_fd(int conn) {
struct msghdr msg;
struct iovec iov;
char buf[CMSG_SPACE(sizeof(int))];
init_msg(&msg, &iov, buf, sizeof(buf));

while (true) {
ssize_t r = recvmsg(conn, &msg, 0);
if (r == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
continue;
} else {
fprintf(stderr, "[error] Error in recv_fd (errno = %d: %s)\n", errno, strerror(errno));
return -1;
}
} else {
break;
}
}

int found_fd = -1;
int oh_noes = 0;
for (struct cmsghdr* header = CMSG_FIRSTHDR(&msg); header != NULL;
header = CMSG_NXTHDR(&msg, header))
if (header->cmsg_level == SOL_SOCKET && header->cmsg_type == SCM_RIGHTS) {
ssize_t count =
(header->cmsg_len -
(CMSG_DATA(header) - reinterpret_cast<unsigned char*>(header))) /
sizeof(int);
for (int i = 0; i < count; ++i) {
int fd = (reinterpret_cast<int*>(CMSG_DATA(header)))[i];
if (found_fd == -1) {
found_fd = fd;
} else {
close(fd);
oh_noes = 1;
}
}
}

// The sender sent us more than one file descriptor. We've closed
// them all to prevent fd leaks but notify the caller that we got
// a bad message.
if (oh_noes) {
close(found_fd);
errno = EBADMSG;
fprintf(stderr, "[error] Error in recv_fd: more than one fd received in message\n");
return -1;
}

return found_fd;
}

/*
* Class: io_v6d_core_common_memory_ffi_Fling
* Method: sendFD
* Signature: (II)I
*/
JNIEXPORT jint JNICALL Java_io_v6d_core_common_memory_ffi_Fling_sendFD(
JNIEnv*, jclass, jint conn, jint fd) {
return send_fd(conn, fd);
// return send_fd(conn, fd);
return -1;
}

/*
Expand Down
16 changes: 15 additions & 1 deletion java/core/src/main/java/io/v6d/core/client/IPCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ public ObjectMeta createMetaData(ObjectMeta meta) throws VineyardException {
}
}

@Override
public boolean connected() {
return channel != null && channel.isConnected();
}

@Override
public ObjectMeta getMetaData(ObjectID id, boolean sync_remote, boolean wait)
throws VineyardException {
Expand Down Expand Up @@ -299,14 +304,22 @@ public Buffer createBuffer(long size) throws VineyardException {
reply.get(this.doReadJson());

val payload = reply.getPayload();
long pointer = this.mmap(payload.getStoreFD(), payload.getMapSize(), true, true);
long pointer = this.mmap(payload.getStoreFD(), payload.getMapSize(), false, true);
val buffer = new Buffer();
buffer.setObjectId(reply.getId());
buffer.setPointer(pointer + payload.getDataOffset());
buffer.setSize(reply.getPayload().getDataSize());
return buffer;
}

public void sealBlob(ObjectID id) throws VineyardException {
val root = mapper.createObjectNode();
SealRequest.put(root, id);
this.doWrite(root);
val reply = new SealReply();
reply.get(this.doReadJson());
}
Copy link
Member

@sighingnow sighingnow Jun 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BufferBuilder itself already has a seal(), method.


private void connectIPCSocket(UnixSocketAddress address) throws VineyardException.IOError {
try {
channel = UnixSocketChannel.open(address);
Expand All @@ -331,6 +344,7 @@ private void connectIPCSocketWithRetry(String pathname)
}
num_retries -= 1;
}

if (reader == null || writer == null) {
throw new VineyardException.ConnectionFailed();
}
Expand Down
15 changes: 15 additions & 0 deletions java/core/src/main/java/io/v6d/core/common/util/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.v6d.core.common.memory.Payload;
import java.util.*;
import java.io.*;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.val;
Expand Down Expand Up @@ -126,6 +127,13 @@ public static void put(ObjectNode root, long size) {
}
}

public static class SealRequest extends Request {
public static void put(ObjectNode root, ObjectID id) {
root.put("type", "seal_request");
root.put("object_id", id.value());
}
}

@Data
@EqualsAndHashCode(callSuper = false)
public static class CreateBufferReply extends Reply {
Expand All @@ -140,6 +148,13 @@ public void get(JsonNode root) throws VineyardException {
}
}

public static class SealReply extends Reply {
@Override
public void get(JsonNode root) throws VineyardException {
check(root, "seal_reply");
}
}

public static class GetBuffersRequest extends Request {
public static void put(ObjectNode root, List<ObjectID> ids) {
root.put("type", "get_buffers_request");
Expand Down
Loading