Skip to content

Commit

Permalink
Arrow array resolvers and builders.
Browse files Browse the repository at this point in the history
Signed-off-by: Tao He <[email protected]>
  • Loading branch information
sighingnow committed Aug 22, 2023
1 parent 1a674d6 commit b2ac8b9
Show file tree
Hide file tree
Showing 28 changed files with 1,254 additions and 452 deletions.
11 changes: 8 additions & 3 deletions .github/workflows/rust-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,15 @@ jobs:
override: true
components: rustfmt, clippy

- name: Test
- name: Check
run: |
cd rust
cargo fmt --all -- --check
# cargo clippy -- -D warnings
cargo clippy -- -D warnings
cargo check
# cargo test --lib
- name: Unittest
if: false
run: |
cd rust
cargo test --lib
5 changes: 4 additions & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
edition = "2021"
resolver = "2"

[workspace]
members = ["vineyard"]
members = [
"vineyard",
]
8 changes: 6 additions & 2 deletions rust/vineyard/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,24 @@ name = "vineyard"
path = "src/lib.rs"

[dependencies]
arrow = "44"
const_format = "0.2"
arrow = "45"
arrow-array = "45"
ctor = "0.2"
downcast-rs = "1.2"
env_logger = "0.9"
gensym = "0.1"
lazy_static = "1"
log = "0.4"
memmap2 = "0.7"
num-traits = "0.2"
num-derive = "0.4"
parking_lot = "0.12"
rand = "0.8"
sendfd = "0.4"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
static_str_ops = "0.1.2"
thiserror = "1.0"

[dev-dependencies]
Expand Down
85 changes: 40 additions & 45 deletions rust/vineyard/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

use std::collections::HashMap;

use parking_lot::ReentrantMutexGuard;

use crate::common::util::json::*;
use crate::common::util::protocol::*;
use crate::common::util::status::*;
Expand Down Expand Up @@ -55,7 +57,7 @@ impl InstanceStatus {

pub trait Client {
/// Disconnect this client.
fn disconnect(&mut self) -> ();
fn disconnect(&mut self);

fn connected(&mut self) -> bool;

Expand All @@ -69,16 +71,14 @@ pub trait Client {

fn get_metadata(&mut self, id: ObjectID) -> Result<ObjectMeta>;

fn get_metadata_batch(&mut self, ids: &Vec<ObjectID>) -> Result<Vec<ObjectMeta>>;
fn get_metadata_batch(&mut self, ids: &[ObjectID]) -> Result<Vec<ObjectMeta>>;

fn fetch_and_get_metadata(&mut self, id: ObjectID) -> Result<ObjectMeta> {
self.ensure_connect()?;
let local_id = self.migrate(id)?;
return self.get_metadata(local_id);
}

fn fetch_and_get_metadata_batch(&mut self, ids: &Vec<ObjectID>) -> Result<Vec<ObjectMeta>> {
self.ensure_connect()?;
fn fetch_and_get_metadata_batch(&mut self, ids: &[ObjectID]) -> Result<Vec<ObjectMeta>> {
let mut local_ids = Vec::new();
for id in ids {
local_ids.push(self.migrate(*id)?);
Expand All @@ -87,28 +87,28 @@ pub trait Client {
}

fn drop_buffer(&mut self, id: ObjectID) -> Result<()> {
self.ensure_connect()?;
let _ = self.ensure_connect()?;
let message_out = write_drop_buffer_request(id)?;
self.do_write(&message_out)?;
return read_drop_buffer_reply(&self.do_read()?);
}

fn seal_buffer(&mut self, id: ObjectID) -> Result<()> {
self.ensure_connect()?;
let _ = self.ensure_connect()?;
let message_out = write_seal_request(id)?;
self.do_write(&message_out)?;
return read_seal_reply(&self.do_read()?);
}

fn get_data(&mut self, id: ObjectID, sync_remote: bool, wait: bool) -> Result<JSON> {
self.ensure_connect()?;
let _ = self.ensure_connect()?;
let message_out = write_get_data_request(id, sync_remote, wait)?;
self.do_write(&message_out)?;
return read_get_data_reply(&self.do_read()?);
}

fn get_data_batch(&mut self, ids: &Vec<ObjectID>) -> Result<Vec<JSON>> {
self.ensure_connect()?;
fn get_data_batch(&mut self, ids: &[ObjectID]) -> Result<Vec<JSON>> {
let _ = self.ensure_connect()?;
let message_out = write_get_data_batch_request(&ids, false, false)?;
self.do_write(&message_out)?;
let reply = read_get_data_batch_reply(&self.do_read()?)?;
Expand All @@ -128,7 +128,7 @@ pub trait Client {
}

fn create_data(&mut self, data: &JSON) -> Result<(ObjectID, Signature, InstanceID)> {
self.ensure_connect()?;
let _ = self.ensure_connect()?;
let message_out = write_create_data_request(data)?;
self.do_write(&message_out)?;
let reply = read_create_data_reply(&self.do_read()?)?;
Expand All @@ -145,22 +145,22 @@ pub trait Client {
}

fn delete(&mut self, id: ObjectID, force: bool, deep: bool) -> Result<()> {
self.ensure_connect()?;
let _ = self.ensure_connect()?;
let message_out = write_delete_data_request(id, force, deep, false)?;
self.do_write(&message_out)?;
return read_delete_data_reply(&self.do_read()?);
}

fn delete_batch(&mut self, ids: &Vec<ObjectID>, force: bool, deep: bool) -> Result<()> {
self.ensure_connect()?;
fn delete_batch(&mut self, ids: &[ObjectID], force: bool, deep: bool) -> Result<()> {
let _ = self.ensure_connect()?;
let message_out = write_delete_data_batch_request(ids, force, deep, false)?;
self.do_write(&message_out)?;
return read_delete_data_reply(&self.do_read()?);
}

/// @param pattern: The pattern of typename.
fn list_data(&mut self, pattern: &str, regex: bool, limit: usize) -> Result<Vec<JSON>> {
self.ensure_connect()?;
let _ = self.ensure_connect()?;
let message_out = write_list_data_request(pattern, regex, limit)?;
self.do_write(&message_out)?;
return read_list_data_reply(&self.do_read()?);
Expand All @@ -173,70 +173,70 @@ pub trait Client {
regex: bool,
limit: usize,
) -> Result<HashMap<String, ObjectID>> {
self.ensure_connect()?;
let _ = self.ensure_connect()?;
let message_out = write_list_name_request(pattern, regex, limit)?;
self.do_write(&message_out)?;
return read_list_name_reply(&self.do_read()?);
}

fn persist(&mut self, id: ObjectID) -> Result<()> {
self.ensure_connect()?;
let _ = self.ensure_connect()?;
let message_out = write_persist_request(id)?;
self.do_write(&message_out)?;
return read_persist_reply(&self.do_read()?);
}

fn if_persist(&mut self, id: ObjectID) -> Result<bool> {
self.ensure_connect()?;
let _ = self.ensure_connect()?;
let message_out = write_if_persist_request(id)?;
self.do_write(&message_out)?;
return read_if_persist_reply(&self.do_read()?);
}

fn exists(&mut self, id: ObjectID) -> Result<bool> {
self.ensure_connect()?;
let _ = self.ensure_connect()?;
let message_out = write_exists_request(id)?;
self.do_write(&message_out)?;
return read_exists_reply(&self.do_read()?);
}

fn put_name(&mut self, id: ObjectID, name: &str) -> Result<()> {
self.ensure_connect()?;
let _ = self.ensure_connect()?;
let message_out = write_put_name_request(id, name)?;
self.do_write(&message_out)?;
return read_put_name_reply(&self.do_read()?);
}

fn get_name(&mut self, name: &String, wait: bool) -> Result<ObjectID> {
self.ensure_connect()?;
fn get_name(&mut self, name: &str, wait: bool) -> Result<ObjectID> {
let _ = self.ensure_connect()?;
let message_out = write_get_name_request(name, wait)?;
self.do_write(&message_out)?;
return read_get_name_reply(&self.do_read()?);
}

fn drop_name(&mut self, name: &String) -> Result<()> {
self.ensure_connect()?;
fn drop_name(&mut self, name: &str) -> Result<()> {
let _ = self.ensure_connect()?;
let message_out = write_drop_name_request(name)?;
self.do_write(&message_out)?;
return read_drop_name_reply(&self.do_read()?);
}

fn migrate(&mut self, id: ObjectID) -> Result<ObjectID> {
self.ensure_connect()?;
let _ = self.ensure_connect()?;
let message_out = write_migrate_object_request(id)?;
self.do_write(&message_out)?;
return read_migrate_object_reply(&self.do_read()?);
}

fn clear(&mut self) -> Result<()> {
self.ensure_connect()?;
let _ = self.ensure_connect()?;
let message_out = write_clear_request()?;
self.do_write(&message_out)?;
return read_clear_reply(&self.do_read()?);
}

fn label(&mut self, id: ObjectID, key: &str, value: &str) -> Result<()> {
self.ensure_connect()?;
let _ = self.ensure_connect()?;
let keys: Vec<String> = vec![key.into()];
let values: Vec<String> = vec![value.into()];
let message_out = write_label_request(id, &keys, &values)?;
Expand All @@ -245,51 +245,46 @@ pub trait Client {
}

fn evict(&mut self, id: ObjectID) -> Result<()> {
self.ensure_connect()?;
let message_out = write_evict_request(&vec![id])?;
let _ = self.ensure_connect()?;
let message_out = write_evict_request(&[id])?;
self.do_write(&message_out)?;
return read_evict_reply(&self.do_read()?);
}

fn evict_batch(&mut self, ids: &Vec<ObjectID>) -> Result<()> {
self.ensure_connect()?;
fn evict_batch(&mut self, ids: &[ObjectID]) -> Result<()> {
let _ = self.ensure_connect()?;
let message_out = write_evict_request(ids)?;
self.do_write(&message_out)?;
return read_evict_reply(&self.do_read()?);
}

fn load(&mut self, id: ObjectID, pin: bool) -> Result<()> {
self.ensure_connect()?;
let message_out = write_load_request(&vec![id], pin)?;
let _ = self.ensure_connect()?;
let message_out = write_load_request(&[id], pin)?;
self.do_write(&message_out)?;
return read_load_reply(&self.do_read()?);
}

fn load_batch(&mut self, ids: &Vec<ObjectID>, pin: bool) -> Result<()> {
self.ensure_connect()?;
fn load_batch(&mut self, ids: &[ObjectID], pin: bool) -> Result<()> {
let _ = self.ensure_connect()?;
let message_out = write_load_request(ids, pin)?;
self.do_write(&message_out)?;
return read_load_reply(&self.do_read()?);
}

fn unpin(&mut self, id: ObjectID) -> Result<()> {
self.ensure_connect()?;
let message_out = write_unpin_request(&vec![id])?;
let _ = self.ensure_connect()?;
let message_out = write_unpin_request(&[id])?;
self.do_write(&message_out)?;
return read_unpin_reply(&self.do_read()?);
}

fn unpin_batch(&mut self, ids: &Vec<ObjectID>) -> Result<()> {
self.ensure_connect()?;
fn unpin_batch(&mut self, ids: &[ObjectID]) -> Result<()> {
let _ = self.ensure_connect()?;
let message_out = write_unpin_request(ids)?;
self.do_write(&message_out)?;
return read_unpin_reply(&self.do_read()?);
}

fn ensure_connect(&mut self) -> Result<()> {
if !self.connected() {
return Err(VineyardError::io_error("client not connected".into()));
}
return Ok(());
}
fn ensure_connect(&mut self) -> Result<ReentrantMutexGuard<'_, ()>>;
}
Loading

0 comments on commit b2ac8b9

Please sign in to comment.