Skip to content

Commit

Permalink
Clean code.
Browse files Browse the repository at this point in the history
Signed-off-by: vegetableysm <[email protected]>
  • Loading branch information
vegetableysm committed Jun 30, 2023
1 parent 63d4318 commit 8495975
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 216 deletions.
41 changes: 1 addition & 40 deletions java/core/src/main/java/io/v6d/core/client/IPCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -294,36 +294,21 @@ public ClusterStatus getClusterStatus() throws VineyardException {
}

public Buffer createBuffer(long size) throws VineyardException {
System.out.println("createBuffer:" + size);
if (size == 0) {
return Buffer.empty();
}
System.out.println("stage: 1");
val root = mapper.createObjectNode();
System.out.println("stage: 2");
CreateBufferRequest.put(root, size);
System.out.println("stage: 3");
this.doWrite(root);
System.out.println("stage: 4");
val reply = new CreateBufferReply();
System.out.println("stage: 5");
reply.get(this.doReadJson());

System.out.println("stage: 6");
val payload = reply.getPayload();
System.out.println("stage: 7");
System.out.println("size: " + payload.getMapSize());
System.out.println("fd: " + payload.getStoreFD());
long pointer = this.mmap(payload.getStoreFD(), payload.getMapSize(), false, true);
System.out.println("stage: 8");
val buffer = new Buffer();
System.out.println("stage: 9");
buffer.setObjectId(reply.getId());
System.out.println("stage: 10");
buffer.setPointer(pointer + payload.getDataOffset());
System.out.println("stage: 11");
buffer.setSize(reply.getPayload().getDataSize());
System.out.println("stage: 12");
return buffer;
}

Expand All @@ -338,14 +323,10 @@ public void sealBlob(ObjectID id) throws VineyardException {

private void connectIPCSocket(UnixSocketAddress address) throws VineyardException.IOError {
try {
System.out.println("open:" + address.path());
channel = UnixSocketChannel.open(address);
System.out.println("open success.");
} catch (IOException e) {
System.out.println("Failed to connect to IPC socket: " + e.getMessage());
throw new VineyardException.IOError(e.getMessage());
}
System.out.println("Connected to IPC socket successfully.");
writer = new LittleEndianDataOutputStream(Channels.newOutputStream(channel));

Check warning on line 330 in java/core/src/main/java/io/v6d/core/client/IPCClient.java

View check run for this annotation

codefactor.io / CodeFactor

java/core/src/main/java/io/v6d/core/client/IPCClient.java#L330

Resolve unexpected comment. (com.puppycrawl.tools.checkstyle.checks.TodoCommentCheck)
reader = new LittleEndianDataInputStream(Channels.newInputStream(channel));
}
Expand All @@ -355,8 +336,6 @@ private void connectIPCSocketWithRetry(String pathname)
throws VineyardException.ConnectionFailed {
val address = new UnixSocketAddress(new File(pathname).getAbsolutePath());
int num_retries = NUM_CONNECT_ATTEMPTS;
System.out.println("Connecting to " + pathname);
System.out.println("address:" + address.path());
while (num_retries > 0) {
try {
connectIPCSocket(address);
Expand All @@ -366,13 +345,8 @@ private void connectIPCSocketWithRetry(String pathname)
}
num_retries -= 1;
}
System.out.println("Connected to IPC socket successfully2.");

if (reader == null || writer == null) {
if (reader == null) {
System.out.println("reader is null");
} else {
System.out.println("writer is null");
}
throw new VineyardException.ConnectionFailed();
}
}
Expand All @@ -399,22 +373,15 @@ private Map<ObjectID, Buffer> getBuffers(Set<ObjectID> ids) throws VineyardExcep

private long mmap(int fd, long mapSize, boolean readonly, boolean realign)
throws VineyardException {
System.out.println("mmap start");
if (mmapTable.containsKey(fd)) {
System.out.println("mmap exists");
return mmapTable.get(fd);
}
System.out.println("tans fd");
System.out.println("channel fd:" + this.channel.getFD());
int client_fd = Fling.recvFD(this.channel.getFD());
System.out.println("client fd:" + client_fd);
long pointer = Fling.mapSharedMem(client_fd, mapSize, readonly, realign);
System.out.println("map pointer:" + pointer);
if (pointer == -1) {
throw new VineyardException.UnknownError("mmap failed for fd " + fd);
}
mmapTable.put(fd, pointer);
System.out.println("set in map");
return pointer;
}

Expand All @@ -437,25 +404,19 @@ private void doWrite(JsonNode node) {

@SneakyThrows(IOException.class)
private byte[] doRead() {
System.out.println("doRead");
int length = (int) reader.readLong(); // n.b.: the server writes a size_t (long)
val content = new byte[length];
int done = 0, remaining = length;
System.out.println("length:" + length);
while (done < length) {
int batch = reader.read(content, done, remaining);
done += batch;
remaining -= batch;
System.out.println("done:" + done);
System.out.println("remaining:" + remaining);
}
System.out.println("doRead end");
return content;
}

@SneakyThrows(IOException.class)
private JsonNode doReadJson() {
System.out.println("doReadJson");
return mapper.readTree(doRead());
}
}
9 changes: 0 additions & 9 deletions java/core/src/main/java/io/v6d/core/common/util/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,8 @@ public static class CreateBufferReply extends Reply {
@Override
public void get(JsonNode root) throws VineyardException {
check(root, "create_buffer_reply");

System.out.println("get id");

this.id = new ObjectID(JSON.getLong(root, "id"));
System.out.println(root.toString());

System.out.println("id:" + this.id.value());
System.out.println("get payload");

this.payload = Payload.fromJson(root.get("created"));
System.out.println("end");
}
}

Expand Down
84 changes: 19 additions & 65 deletions java/hive/src/main/java/io/v6d/hive/ql/io/VineyardOutputFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,16 @@

import io.v6d.core.common.util.VineyardException;
import io.v6d.core.client.IPCClient;
import io.v6d.modules.basic.arrow.*;
import io.v6d.modules.basic.dataframe.DataFrameBuilder;
import io.v6d.modules.basic.tensor.TensorBuilder;
import io.v6d.modules.basic.tensor.ITensor;
import io.v6d.modules.basic.arrow.BufferBuilder;
import io.v6d.core.client.ds.ObjectMeta;

import java.io.IOException;
import java.util.Properties;
import java.util.ArrayList;
import java.util.List;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.NonNullableStructVector;
import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
Expand Down Expand Up @@ -107,6 +101,7 @@ public SinkRecordWriter(
System.out.printf("final out path: %s\n", finalOutPath);
// connect to vineyard
if (client == null) {
// TBD: get vineyard socket path from table properties
client = new IPCClient("/tmp/vineyard.sock");
}
if (client == null || !client.connected()) {
Expand All @@ -123,74 +118,33 @@ public void write(Writable w) throws IOException {
VectorSchemaRoot root = ((ArrowWrapperWritable) w).getVectorSchemaRoot();
org.apache.arrow.vector.types.pojo.Schema schema = root.getSchema();

// why?
// System.out.printf("row count: %d\n", root.getRowCount());
// for (int i = 0; i < root.getRowCount(); ++i) {
// System.out.printf("row %d: ", i);
// for (int j = 0; j < structVector.getChildrenFromFields().size(); ++j) {
// System.out.printf("%s ", structVector.getChildrenFromFields().get(j).getObject(i));
// }
// System.out.printf("\n");
// }
// System.out.println("==============");
// for (int i = 0; i < structVector.getChildrenFromFields().size(); ++i) {
// System.out.printf("field %d: %s\n", i, structVector.getChildrenFromFields().get(i).getName());
// }
System.out.println("==============");

for (int i = 0; i < root.getRowCount(); i++) {
System.out.printf("row %d: ", i);
for (int j = 0; j < root.getFieldVectors().size(); ++j) {
System.out.printf("%s ", root.getFieldVectors().get(j).getObject(i));
}
System.out.printf("\n");
}

// try {
// BufferBuilder bufferBuilder = new BufferBuilder(client, 100);
// ObjectMeta meta = bufferBuilder.seal(client);
// System.out.println("buffer id: " + meta.getId().value());
// } catch (Exception e) {
// System.out.printf("failed to create buffer builder: %s\n", e);
// }

System.out.println("==============");
for (int j = 0; j < schema.getFields().size(); ++j) {
System.out.printf(schema.getFields().get(j).getName() + " ");
try {
dataFrameBuilder = new DataFrameBuilder(client);
} catch (Exception e) {
throw new IOException("Create DataFrameBuilder failed");
}
for (int j = 0; j < schema.getFields().size(); ++j) {
System.out.printf(schema.getFields().get(j).getName() + " ");
}
System.out.printf("\n");

dataFrameBuilder = new DataFrameBuilder(client);
// Create Tensors
for (int i = 0; i < schema.getFields().size(); i++) {
ArrowTypeID arrowTypeID = schema.getFields().get(i).getType().getTypeID();
switch(arrowTypeID) {
// TODO: other type
case Int:
System.out.printf("int\n");
List<Integer> shape = new ArrayList<Integer>(1);
shape.add(root.getRowCount());
tensorBuilder = new TensorBuilder(client, shape, root.getFieldVectors().get(i));
// tensorBuilder.setValues(root.getFieldVectors().get(i));
// tensorBuilder = new TensorBuilder(root.getFieldVectors().get(i));
// column
dataFrameBuilder.addColumn(schema.getFields().get(i).getName(), tensorBuilder);
break;
default:
System.out.printf("unsupported arrow type: %s\n", arrowTypeID);
break;
List<Integer> shape = new ArrayList<Integer>(1);
shape.add(root.getRowCount());
try {
tensorBuilder = new TensorBuilder(client, shape, root.getFieldVectors().get(i));
dataFrameBuilder.addColumn(schema.getFields().get(i).getName(), tensorBuilder);
} catch (Exception e) {
throw new IOException("Create TensorBuilder failed");
}
}
}

@Override
public void close(boolean abort) throws IOException {
System.out.println("vineyard filesink operator closing\n");
ObjectMeta meta = dataFrameBuilder.seal(client);
System.out.printf("DataFrame id:" + meta.getId().value());
System.out.println("vineyard filesink operator closing");
try {
ObjectMeta meta = dataFrameBuilder.seal(client);
System.out.println("DataFrame id:" + meta.getId().value());
} catch (Exception e) {
throw new IOException("Seal DataFrame failed");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,10 @@ public BufferBuilder(IPCClient client, long size) throws VineyardException {
if (size == 0) {
this.arrowBuf = null;
} else {
System.out.println("create arrow buffer");
this.arrowBuf =
new ArrowBuf(
ReferenceManager.NO_OP, null, buffer.getSize(), buffer.getPointer());
}
System.out.println("create buffer builder end");
}

public BufferBuilder(IPCClient client, final ArrowBuf buffer) throws VineyardException {
Expand Down
Loading

0 comments on commit 8495975

Please sign in to comment.