Skip to content

Commit

Permalink
chore: clean code and enable error on warning (#149)
Browse files Browse the repository at this point in the history
* chore: clean code and enable error on warning

Signed-off-by: usamoi <[email protected]>

* fix: remove tests on macos

Signed-off-by: usamoi <[email protected]>

---------

Signed-off-by: usamoi <[email protected]>
  • Loading branch information
usamoi authored Nov 20, 2023
1 parent ddc699b commit 47583cc
Show file tree
Hide file tree
Showing 30 changed files with 163 additions and 206 deletions.
10 changes: 3 additions & 7 deletions .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ env:
RUST_BACKTRACE: 1
SCCACHE_GHA_ENABLED: true
RUSTC_WRAPPER: sccache
RUSTFLAGS: "-Dwarnings"

jobs:
matrix:
Expand All @@ -49,22 +50,17 @@ jobs:
with:
script: |
let matrix;
if("${{ github.event_name }}" == "push" || "${{ github.event_name }}" == "pull_request"){
if ("${{ github.event_name }}" == "pull_request") {
matrix = [
{ version: 15, os: "ubuntu-latest" },
];
}
if("${{ github.event_name }}" == "merge_group" || "${{ github.event_name }}" == "workflow_dispatch"){
} else {
matrix = [
{ version: 12, os: "ubuntu-latest" },
{ version: 13, os: "ubuntu-latest" },
{ version: 14, os: "ubuntu-latest" },
{ version: 15, os: "ubuntu-latest" },
{ version: 16, os: "ubuntu-latest" },
{ version: 12, os: "macos-latest" },
{ version: 13, os: "macos-latest" },
{ version: 14, os: "macos-latest" },
{ version: 15, os: "macos-latest" },
];
}
core.setOutput('matrix', JSON.stringify(matrix));
Expand Down
11 changes: 9 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ validator = { version = "0.16.1", features = ["derive"] }
toml = "0.7.6"
rayon = "1.6.1"
uuid = { version = "1.4.1", features = ["serde"] }
rustix = { version = "0.38.20", features = ["net", "mm", "shm"] }
rustix = { version = "0.38.20", features = ["net", "mm"] }
arc-swap = "1.6.0"
bytemuck = "1.14.0"
bytemuck = { version = "1.14.0", features = ["extern_crate_alloc"] }
serde_with = "3.4.0"
chrono = { version = "0.4.31", features = ["clock"] }
chrono-tz = "0.8.4"
Expand All @@ -65,3 +65,10 @@ opt-level = 3
lto = "fat"
codegen-units = 1
debug = true

[lints.clippy]
needless_range_loop = "allow"
derivable_impls = "allow"
unnecessary_literal_unwrap = "allow"
too_many_arguments = "allow"
unnecessary_unwrap = "allow"
2 changes: 1 addition & 1 deletion src/algorithms/clustering/elkan_k_means.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ impl Square {
Self {
x,
y,
v: unsafe { Box::new_uninit_slice(x * y).assume_init() },
v: bytemuck::zeroed_slice_box(x * y),
}
}
}
Expand Down
15 changes: 5 additions & 10 deletions src/algorithms/diskann/vamana.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ pub struct SearchState {
l: usize,
/// Number of results to return.
//TODO: used during search.
#[allow(dead_code)]
k: usize,
}

Expand Down Expand Up @@ -94,7 +93,7 @@ impl SearchState {
self.heap
.push(Reverse(VertexWithDistance::new(vertex_id, distance)));
self.candidates.insert(distance, vertex_id);
if self.candidates.len() > self.l as usize {
if self.candidates.len() > self.l {
self.candidates.pop_last();
}
}
Expand All @@ -110,7 +109,6 @@ impl SearchState {
}
}

#[allow(unused)]
pub struct VamanaImpl {
raw: Arc<Raw>,

Expand Down Expand Up @@ -197,7 +195,7 @@ impl VamanaImpl {
break;
}

results.push((Scalar::from(distance), row));
results.push((distance, row));
}
let mut res_vec: Vec<(Scalar, u64)> =
results.iter().map(|x| (x.0, self.raw.data(x.1))).collect();
Expand Down Expand Up @@ -255,7 +253,7 @@ impl VamanaImpl {
}
}
} else {
neighbor_ids = (0..n).into_iter().collect();
neighbor_ids = (0..n).collect();
}

{
Expand All @@ -272,10 +270,8 @@ impl VamanaImpl {
guard: &mut RwLockWriteGuard<u32>,
) {
assert!(neighbor_ids.len() <= self.r as usize);
let mut i = 0;
for item in neighbor_ids {
for (i, item) in neighbor_ids.iter().enumerate() {
self.neighbors[vertex_index as usize * self.r as usize + i].store(*item);
i += 1;
}
**guard = neighbor_ids.len() as u32;
}
Expand Down Expand Up @@ -343,7 +339,6 @@ impl VamanaImpl {
.for_each(|id| self.search_and_prune_for_one_vertex(id, alpha, r, l));
}

#[allow(unused_assignments)]
fn search_and_prune_for_one_vertex(&self, id: u32, alpha: f32, r: u32, l: usize) {
let query = self.raw.vector(id);
let mut state = self._greedy_search(self.medoid, query, 1, l);
Expand All @@ -364,7 +359,7 @@ impl VamanaImpl {
let mut guard = self.neighbor_size[neighbor_id as usize].write();
let old_neighbors = self._get_neighbors_with_write_guard(neighbor_id, &guard);
let mut old_neighbors: HashSet<u32> =
old_neighbors.into_iter().map(|x| x.load()).collect();
old_neighbors.iter().map(|x| x.load()).collect();
old_neighbors.insert(id);
if old_neighbors.len() > r as usize {
// need robust prune
Expand Down
2 changes: 1 addition & 1 deletion src/algorithms/flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl Flat {
}

pub fn vector(&self, i: u32) -> &[Scalar] {
&self.mmap.raw.vector(i)
self.mmap.raw.vector(i)
}

pub fn data(&self, i: u32) -> u64 {
Expand Down
7 changes: 3 additions & 4 deletions src/algorithms/hnsw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl Hnsw {
}

pub fn vector(&self, i: u32) -> &[Scalar] {
&self.mmap.raw.vector(i)
self.mmap.raw.vector(i)
}

pub fn data(&self, i: u32) -> u64 {
Expand Down Expand Up @@ -430,8 +430,7 @@ pub fn search<F: FnMut(u64) -> bool>(
let s = mmap.entry;
let levels = count_layers_of_a_vertex(mmap.m, s) - 1;
let u = fast_search(mmap, 1..=levels, s, vector);
let result = local_search(mmap, k, u, &vector, filter);
result
local_search(mmap, k, u, vector, filter)
}

pub fn fast_search(mmap: &HnswMmap, levels: RangeInclusive<u8>, u: u32, vector: &[Scalar]) -> u32 {
Expand Down Expand Up @@ -554,7 +553,7 @@ impl Visited {
fn new(capacity: usize) -> Self {
Self {
version: 0,
data: unsafe { Box::new_zeroed_slice(capacity).assume_init() },
data: bytemuck::zeroed_slice_box(capacity),
}
}
fn new_version(&mut self) -> VisitedVersion<'_> {
Expand Down
12 changes: 5 additions & 7 deletions src/algorithms/ivf/ivf_naive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ impl IvfNaive {
}

pub fn len(&self) -> u32 {
self.mmap.raw.len() as u32
self.mmap.raw.len()
}

pub fn vector(&self, i: u32) -> &[Scalar] {
&self.mmap.raw.vector(i)
self.mmap.raw.vector(i)
}

pub fn data(&self, i: u32) -> u64 {
Expand Down Expand Up @@ -272,12 +272,10 @@ pub fn search<F: FnMut(u64) -> bool>(
for i in lists.iter().map(|e| e.data as usize) {
let mut j = mmap.heads[i];
while u32::MAX != j {
let distance = mmap.quantization.distance(mmap.d, &vector, j);
let distance = mmap.quantization.distance(mmap.d, vector, j);
let data = mmap.raw.data(j);
if result.check(distance) {
if filter(data) {
result.push(HeapElement { distance, data });
}
if result.check(distance) && filter(data) {
result.push(HeapElement { distance, data });
}
j = mmap.nexts[j as usize];
}
Expand Down
12 changes: 5 additions & 7 deletions src/algorithms/ivf/ivf_pq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ impl IvfPq {
}

pub fn len(&self) -> u32 {
self.mmap.raw.len() as u32
self.mmap.raw.len()
}

pub fn vector(&self, i: u32) -> &[Scalar] {
&self.mmap.raw.vector(i)
self.mmap.raw.vector(i)
}

pub fn data(&self, i: u32) -> u64 {
Expand Down Expand Up @@ -277,12 +277,10 @@ pub fn search<F: FnMut(u64) -> bool>(
while u32::MAX != j {
let distance =
mmap.quantization
.distance_with_delta(mmap.d, &vector, j, mmap.centroids(i));
.distance_with_delta(mmap.d, vector, j, mmap.centroids(i));
let data = mmap.raw.data(j);
if result.check(distance) {
if filter(data) {
result.push(HeapElement { distance, data });
}
if result.check(distance) && filter(data) {
result.push(HeapElement { distance, data });
}
j = mmap.nexts[j as usize];
}
Expand Down
5 changes: 1 addition & 4 deletions src/algorithms/quantization/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,7 @@ impl QuantizationOptions {
}
}
pub fn is_product_quantization(&self) -> bool {
match self {
Self::Product(_) => true,
_ => false,
}
matches!(self, Self::Product(_))
}
}

Expand Down
44 changes: 21 additions & 23 deletions src/bgworker/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub mod bgworker;
pub mod worker;

use self::bgworker::Bgworker;
use self::worker::Worker;
use crate::ipc::server::RpcHandler;
use crate::ipc::IpcError;
use std::fs::OpenOptions;
Expand Down Expand Up @@ -38,19 +38,18 @@ pub fn main() {
}
log::error!("Panickied. Info: {:?}. Backtrace: {}.", info, backtrace);
}));
let bgworker;
if Path::new("pg_vectors").try_exists().unwrap() {
bgworker = Bgworker::open(PathBuf::from("pg_vectors"));
let worker = if Path::new("pg_vectors").try_exists().unwrap() {
Worker::open(PathBuf::from("pg_vectors"))
} else {
bgworker = Bgworker::create(PathBuf::from("pg_vectors"));
}
Worker::create(PathBuf::from("pg_vectors"))
};
std::thread::spawn({
let bgworker = bgworker.clone();
move || thread_main_2(crate::ipc::listen_unix(), bgworker)
let worker = worker.clone();
move || thread_main_2(crate::ipc::listen_unix(), worker)
});
std::thread::spawn({
let bgworker = bgworker.clone();
move || thread_main_2(crate::ipc::listen_mmap(), bgworker)
let worker = worker.clone();
move || thread_main_2(crate::ipc::listen_mmap(), worker)
});
loop {
let mut sig: i32 = 0;
Expand All @@ -70,40 +69,39 @@ pub fn main() {
}
_ => (),
}
std::thread::yield_now();
}
}

fn thread_main_2<T>(listen: T, bgworker: Arc<Bgworker>)
fn thread_main_2<T>(listen: T, worker: Arc<Worker>)
where
T: Iterator<Item = RpcHandler>,
{
for rpc_handler in listen {
std::thread::spawn({
let bgworker = bgworker.clone();
let worker = worker.clone();
move || {
if let Err(e) = thread_session(bgworker, rpc_handler) {
if let Err(e) = thread_session(worker, rpc_handler) {
log::error!("Session exited. {}.", e);
}
}
});
}
}

fn thread_session(bgworker: Arc<Bgworker>, mut handler: RpcHandler) -> Result<(), IpcError> {
fn thread_session(worker: Arc<Worker>, mut handler: RpcHandler) -> Result<(), IpcError> {
use crate::ipc::server::RpcHandle;
loop {
match handler.handle()? {
RpcHandle::Create { id, options, x } => {
bgworker.call_create(id, options);
worker.call_create(id, options);
handler = x.leave()?;
}
RpcHandle::Insert { id, insert, x } => {
let res = bgworker.call_insert(id, insert);
let res = worker.call_insert(id, insert);
handler = x.leave(res)?;
}
RpcHandle::Delete { id, mut x } => {
let res = bgworker.call_delete(id, |p| x.next(p).unwrap());
let res = worker.call_delete(id, |p| x.next(p).unwrap());
handler = x.leave(res)?;
}
RpcHandle::Search {
Expand All @@ -113,19 +111,19 @@ fn thread_session(bgworker: Arc<Bgworker>, mut handler: RpcHandler) -> Result<()
mut x,
} => {
if prefilter {
let res = bgworker.call_search(id, search, |p| x.check(p).unwrap());
let res = worker.call_search(id, search, |p| x.check(p).unwrap());
handler = x.leave(res)?;
} else {
let res = bgworker.call_search(id, search, |_| true);
let res = worker.call_search(id, search, |_| true);
handler = x.leave(res)?;
}
}
RpcHandle::Flush { id, x } => {
let result = bgworker.call_flush(id);
let result = worker.call_flush(id);
handler = x.leave(result)?;
}
RpcHandle::Destory { id, x } => {
bgworker.call_destory(id);
worker.call_destory(id);
handler = x.leave()?;
}
RpcHandle::Leave {} => {
Expand Down
Loading

0 comments on commit 47583cc

Please sign in to comment.