
fix checker db paths #444

Merged: 12 commits, Sep 25, 2024
4 changes: 2 additions & 2 deletions Dockerfile.nocuda
@@ -28,13 +28,13 @@ FROM --platform=linux/amd64 build-image as build-app
WORKDIR /src/gpu-iris-mpc
COPY . .

RUN cargo build --release --target x86_64-unknown-linux-gnu --bin upgrade-server --bin upgrade-client --bin upgrade-checker
RUN cargo build --release --target x86_64-unknown-linux-gnu --bin seed-v1-dbs --bin upgrade-server --bin upgrade-client --bin upgrade-checker


FROM --platform=linux/amd64 ubuntu:22.04
ENV DEBIAN_FRONTEND=noninteractive

COPY --from=build-app /src/gpu-iris-mpc/target/x86_64-unknown-linux-gnu/release/upgrade-server /bin/upgrade-server
COPY --from=build-app /src/gpu-iris-mpc/target/x86_64-unknown-linux-gnu/release/seed-v1-dbs /bin/seed-v1-dbs
COPY --from=build-app /src/gpu-iris-mpc/target/x86_64-unknown-linux-gnu/release/upgrade-server /bin/upgrade-server
COPY --from=build-app /src/gpu-iris-mpc/target/x86_64-unknown-linux-gnu/release/upgrade-client /bin/upgrade-client
COPY --from=build-app /src/gpu-iris-mpc/target/x86_64-unknown-linux-gnu/release/upgrade-checker /bin/upgrade-checker
17 changes: 17 additions & 0 deletions iris-mpc-store/src/lib.rs
@@ -95,6 +95,7 @@ impl Store {
}

pub async fn new(url: &str, schema_name: &str) -> Result<Self> {
tracing::info!("Connecting to V2 database with, schema: {}", schema_name);
let connect_sql = sql_switch_schema(schema_name)?;

let pool = PgPoolOptions::new()
@@ -135,6 +136,22 @@ impl Store {
.fetch(&self.pool)
}

pub fn stream_irises_in_range(
&self,
id_range: std::ops::Range<u64>,
) -> impl Stream<Item = sqlx::Result<StoredIris>> + '_ {
sqlx::query_as(
r#"
SELECT *
FROM irises
WHERE id >= $1 AND id < $2
ORDER BY id ASC
"#,
)
.bind(i64::try_from(id_range.start).expect("id fits into i64"))
.bind(i64::try_from(id_range.end).expect("id fits into i64"))
.fetch(&self.pool)
}
/// Stream irises in parallel, without a particular order.
pub async fn stream_irises_par(
&self,
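The new stream_irises_in_range helper treats its argument as a half-open range ([start, end)) and binds both bounds as signed 64-bit ids. A minimal caller sketch, assuming a reachable Postgres URL and schema name (both placeholders, not part of this change):

use futures::StreamExt; // the returned impl Stream is collected, as the checker below does

let store = Store::new("postgres://localhost/iris", "SMPC_dev_0").await?;
let rows: Vec<sqlx::Result<StoredIris>> = store
    .stream_irises_in_range(1..1001) // half-open: ids 1 through 1000
    .collect()
    .await;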
106 changes: 82 additions & 24 deletions iris-mpc-upgrade/src/bin/checker.rs
@@ -12,23 +12,40 @@ use itertools::izip;
use mpc_uniqueness_check::{bits::Bits, distance::EncodedBits};
use std::collections::HashMap;

// quick checking script that recombines the Shamir shares for a local server
// setup and prints the iris code share
fn install_tracing() {
use tracing_subscriber::{fmt, prelude::*, EnvFilter};

let fmt_layer = fmt::layer().with_target(true).with_line_number(true);
let filter_layer = EnvFilter::try_from_default_env()
.or_else(|_| EnvFilter::try_new("info"))
.unwrap();

tracing_subscriber::registry()
.with(filter_layer)
.with(fmt_layer)
.init();
}
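For reference, the filter built here honours RUST_LOG when it is set and otherwise falls back to "info"; a minimal sketch of the resulting behaviour (assuming RUST_LOG is unset):

install_tracing();
tracing::info!("emitted: at or above the default \"info\" level");
tracing::debug!("suppressed unless RUST_LOG raises the verbosity");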

const APP_NAME: &str = "SMPC";

#[derive(Debug, Clone, Parser)]
struct Args {
#[clap(long)]
db_urls: Vec<String>,
db_urls: Vec<String>,

#[clap(long)]
num_elements: u64,
from: u64,

#[clap(long)]
to: u64,

#[clap(long)]
environment: String,
environment: String,
}

#[tokio::main]
async fn main() -> eyre::Result<()> {
install_tracing();
let args = Args::parse();

if args.db_urls.len() != 6 {
@@ -39,20 +56,21 @@ async fn main() -> eyre::Result<()> {
}

let old_left_shares_db0 =
V1Db::new(format!("{}/{}", args.db_urls[0], "/participant1_left").as_str()).await?;
V1Db::new(format!("{}/{}", args.db_urls[0], "participant1_left").as_str()).await?;
let old_left_shares_db1 =
V1Db::new(format!("{}/{}", args.db_urls[1], "/participant2_left").as_str()).await?;
V1Db::new(format!("{}/{}", args.db_urls[1], "participant2_left").as_str()).await?;
let old_left_masks_db =
V1Db::new(format!("{}/{}", args.db_urls[2], "/coordinator_left").as_str()).await?;
V1Db::new(format!("{}/{}", args.db_urls[2], "coordinator_left").as_str()).await?;

let old_right_shares_db0 =
V1Db::new(format!("{}/{}", args.db_urls[0], "/participant1_right").as_str()).await?;
V1Db::new(format!("{}/{}", args.db_urls[0], "participant1_right").as_str()).await?;
let old_right_shares_db1 =
V1Db::new(format!("{}/{}", args.db_urls[1], "/participant2_right").as_str()).await?;
V1Db::new(format!("{}/{}", args.db_urls[1], "participant2_right").as_str()).await?;
let old_right_masks_db1 =
V1Db::new(format!("{}/{}", args.db_urls[2], "/coordinator_right").as_str()).await?;
V1Db::new(format!("{}/{}", args.db_urls[2], "coordinator_right").as_str()).await?;

let base_schema_name = format!("{}_{}", APP_NAME, args.environment);

let new_db0 = Store::new(
&args.db_urls[3],
format!("{}_{}", base_schema_name, "0").as_str(),
@@ -71,15 +89,15 @@

// grab the old shares from the db and reconstruct them
let old_left_shares0 = old_left_shares_db0
.stream_shares(1..args.num_elements + 1)
.stream_shares(args.from..args.to)
.collect::<Vec<_>>()
.await;
let old_left_shares1 = old_left_shares_db1
.stream_shares(1..args.num_elements + 1)
.stream_shares(args.from..args.to)
.collect::<Vec<_>>()
.await;
let old_left_masks = old_left_masks_db
.stream_masks(1..args.num_elements + 1)
.stream_masks(args.from..args.to)
.collect::<Vec<_>>()
.await;
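The checker now takes an explicit --from/--to window instead of the old --num-elements count; because the streams use a half-open range, checking ids 1 through N as before corresponds to --from 1 --to N+1. A small sketch of that correspondence (N = 1000 is an arbitrary example):

let (from, to) = (1u64, 1001u64); // replaces the removed `--num-elements 1000`
let ids: Vec<u64> = (from..to).collect();
assert_eq!(ids.len(), 1000);
assert_eq!((*ids.first().unwrap(), *ids.last().unwrap()), (1, 1000));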

@@ -102,15 +120,15 @@
.collect();

let old_right_shares0 = old_right_shares_db0
.stream_shares(1..args.num_elements + 1)
.stream_shares(args.from..args.to)
.collect::<Vec<_>>()
.await;
let old_right_shares1 = old_right_shares_db1
.stream_shares(1..args.num_elements + 1)
.stream_shares(args.from..args.to)
.collect::<Vec<_>>()
.await;
let old_right_masks = old_right_masks_db1
.stream_masks(1..args.num_elements + 1)
.stream_masks(args.from..args.to)
.collect::<Vec<_>>()
.await;

@@ -133,9 +151,19 @@
.collect();

// grab shares from new DB and recombine
let new_shares0 = new_db0.stream_irises().await.collect::<Vec<_>>().await;
let new_shares1 = new_db1.stream_irises().await.collect::<Vec<_>>().await;
let new_shares2 = new_db2.stream_irises().await.collect::<Vec<_>>().await;
let new_shares0 = new_db0
.stream_irises_in_range(args.from..args.to)
.collect::<Vec<_>>()
.await;

let new_shares1 = new_db1
.stream_irises_in_range(args.from..args.to)
.collect::<Vec<_>>()
.await;
let new_shares2 = new_db2
.stream_irises_in_range(args.from..args.to)
.collect::<Vec<_>>()
.await;

let mut new_left_db = HashMap::new();
let mut new_right_db = HashMap::new();
@@ -233,21 +261,51 @@ async fn main() -> eyre::Result<()> {
assert_eq!(enc_right_masks, enc_right_masks2);

new_right_db.insert(iris0.id(), (enc_right_bits, enc_right_masks));
if iris0.id() % 1000 == 0 {
tracing::info!("Processed {} shares", iris0.id());
}
}
tracing::info!("Processed {} shares", args.to - args.from);

assert_eq!(old_left_db.len(), new_left_db.len());
assert_eq!(old_right_db.len(), new_right_db.len());
assert_eq!(old_left_db.len(), old_right_db.len());

tracing::info!("Left / right db lengths match");

for (idx, (code, mask)) in old_left_db {
let (new_code, new_mask) = new_left_db.get(&idx).expect("old id is present in new db");
assert_eq!(code, *new_code);
assert_eq!(mask, *new_mask);
if code != *new_code {
tracing::error!("Code for id {} left does not match", idx);
tracing::error!("Old: {:?}", code);
tracing::error!("New: {:?}", new_code);
} else if idx % 250 == 0 {
tracing::info!("Code for id {} left matches", idx);
}
if mask != *new_mask {
tracing::error!("Mask for id {} left does not match", idx);
tracing::error!("Old: {:?}", mask);
tracing::error!("New: {:?}", new_mask);
} else if idx % 250 == 0 {
tracing::info!("Mask for id {} left matches", idx);
}
}
for (idx, (code, mask)) in old_right_db {
let (new_code, new_mask) = new_right_db.get(&idx).expect("old id is present in new db");
assert_eq!(code, *new_code);
assert_eq!(mask, *new_mask);
if code != *new_code {
tracing::error!("Code for id {} right does not match", idx);
tracing::error!("Old: {:?}", code);
tracing::error!("New: {:?}", new_code);
} else if idx % 250 == 0 {
tracing::info!("Code for id {} right matches", idx);
}
if mask != *new_mask {
tracing::error!("Mask for id {} right does not match", idx);
tracing::error!("Old: {:?}", mask);
tracing::error!("New: {:?}", new_mask);
} else if idx % 250 == 0 {
tracing::info!("Mask for id {} right matches", idx);
}
}

Ok(())
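Taken together, a checker run now names the three V1 URLs, the three V2 URLs, the id window, and the environment. A hedged sketch of how clap parses such an invocation (all URLs and values are placeholders):

let args = Args::try_parse_from([
    "upgrade-checker",
    "--db-urls", "postgres://v1-host/participant0",
    "--db-urls", "postgres://v1-host/participant1",
    "--db-urls", "postgres://v1-host/coordinator",
    "--db-urls", "postgres://v2-host/party0",
    "--db-urls", "postgres://v2-host/party1",
    "--db-urls", "postgres://v2-host/party2",
    "--from", "1",
    "--to", "1001",
    "--environment", "dev",
])?;
assert_eq!(args.db_urls.len(), 6);     // main() rejects any other count
assert_eq!(args.to - args.from, 1000); // number of ids this run verifies

With --environment dev, the V2 schema for party 0 resolves to SMPC_dev_0; the hidden party-1 and party-2 setup presumably follows the same pattern with suffixes _1 and _2.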