Commit

WIP
mendess committed Jan 10, 2025
1 parent a6da2f5 commit 97dc9b4
Showing 45 changed files with 1,969 additions and 241 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Generated file; diff not rendered.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -88,8 +88,9 @@ tracing = "0.1.40"
tracing-core = "0.1.32"
tracing-subscriber = "0.3.18"
url = { version = "2.5.4", features = ["serde"] }
wasm-bindgen = "0.2.99"
webpki = "0.22.4"
worker = { version = "0.5", features = ["http"] }
worker = "0.5"
x509-parser = "0.15.1"

[workspace.dependencies.sentry]
1 change: 1 addition & 0 deletions crates/daphne-server/src/roles/aggregator.rs
@@ -79,6 +79,7 @@ impl DapAggregator for crate::App {
    #[tracing::instrument(skip(self))]
    async fn get_agg_share(
        &self,
        _version: DapVersion,
        task_id: &TaskId,
        batch_sel: &BatchSelector,
    ) -> Result<DapAggregateShare, DapError> {
12 changes: 11 additions & 1 deletion crates/daphne-server/src/roles/helper.rs
@@ -3,7 +3,8 @@

use axum::async_trait;
use daphne::{
    messages::{request::AggregationJobRequestHash, AggregationJobId, TaskId},
    fatal_error,
    messages::{request::AggregationJobRequestHash, AggregationJobId, AggregationJobResp, TaskId},
    roles::DapHelper,
    DapError, DapVersion,
};
@@ -20,4 +21,13 @@ impl DapHelper for crate::App {
        // the server implementation can't check for this
        Ok(())
    }

    async fn poll_aggregated(
        &self,
        _version: DapVersion,
        _task_id: &TaskId,
        _agg_job_id: &AggregationJobId,
    ) -> Result<AggregationJobResp, DapError> {
        Err(fatal_error!(err = "polling not implemented"))
    }
}
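
The new `poll_aggregated` hook is stubbed out to always fail in this backend. As a rough, self-contained sketch of the polling semantics such a hook implies (a job is either still processing or has a ready result), using hypothetical `JobStore`/`JobResult` names rather than daphne's real types:

// Hypothetical sketch only: `JobStore`, `JobResult`, and the stores below are
// stand-ins, not types from daphne or this commit.
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
enum JobResult {
    Ready(Vec<u8>), // the finished aggregation job response, already encoded
    Processing,     // the job exists but has not finished yet
}

trait JobStore {
    fn poll(&self, job_id: u64) -> Result<JobResult, String>;
}

// Mirrors the stub above: every poll is a hard error until a real backend exists.
struct UnimplementedStore;

impl JobStore for UnimplementedStore {
    fn poll(&self, _job_id: u64) -> Result<JobResult, String> {
        Err("polling not implemented".into())
    }
}

// What a storage-backed poll could look like: ready if a result was recorded,
// otherwise report the job as still in progress.
struct InMemoryStore {
    finished: HashMap<u64, Vec<u8>>,
}

impl JobStore for InMemoryStore {
    fn poll(&self, job_id: u64) -> Result<JobResult, String> {
        Ok(match self.finished.get(&job_id) {
            Some(bytes) => JobResult::Ready(bytes.clone()),
            None => JobResult::Processing,
        })
    }
}

fn main() {
    assert!(UnimplementedStore.poll(7).is_err());
    let store = InMemoryStore {
        finished: HashMap::from([(7, vec![1, 2, 3])]),
    };
    assert_eq!(store.poll(7).unwrap(), JobResult::Ready(vec![1, 2, 3]));
}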
5 changes: 4 additions & 1 deletion crates/daphne-service-utils/build.rs
@@ -11,7 +11,10 @@ fn main() {
    #[cfg(feature = "durable_requests")]
    compiler
        .file("./src/durable_requests/durable_request.capnp")
        .file("./src/durable_requests/bindings/aggregation_job_store.capnp");
        .file("./src/durable_requests/bindings/aggregation_job_store.capnp")
        .file("./src/durable_requests/bindings/new_aggregate_store.capnp")
        .file("./src/durable_requests/bindings/agg_job_response_store.capnp")
        .file("./src/durable_requests/bindings/replay_checker.capnp");

    #[cfg(feature = "cpu_offload")]
    compiler.file("./src/cpu_offload/cpu_offload.capnp");
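
For context, these .capnp schemas are compiled into Rust at build time; `compiler` above is this crate's wrapper around that step. A minimal stand-alone equivalent using the capnpc crate would look roughly like this (the path and the omitted feature gates are illustrative, not the repo's exact setup):

// build.rs sketch: compile one of the new schemas with capnpc and write the
// generated Rust module into OUT_DIR.
fn main() {
    capnpc::CompilerCommand::new()
        .src_prefix("src")
        .file("src/durable_requests/bindings/agg_job_response_store.capnp")
        .run()
        .expect("failed to compile capnp schema");
}

The generated code is then pulled into the crate with an include! of the file capnpc writes to OUT_DIR, which is how modules like agg_job_response_store_capnp (used further down) become available.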
2 changes: 0 additions & 2 deletions crates/daphne-service-utils/src/cpu_offload/cpu_offload.capnp
@@ -3,8 +3,6 @@

@0xd932f3d934afce3b;

# Utilities

using Base = import "../capnproto/base.capnp";

using VdafConfig = Text; # json encoded
38 changes: 38 additions & 0 deletions crates/daphne-service-utils/src/durable_requests/bindings/agg_job_response_store.capnp
@@ -0,0 +1,38 @@
# Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause

@0xd30da336463f3205;

using Base = import "../../capnproto/base.capnp";

struct AggregationJobResponse {
    enum ReportError {
        reserved @0;
        batchCollected @1;
        reportReplayed @2;
        reportDropped @3;
        hpkeUnknownConfigId @4;
        hpkeDecryptError @5;
        vdafPrepError @6;
        batchSaturated @7;
        taskExpired @8;
        invalidMessage @9;
        reportTooEarly @10;
        taskNotStarted @11;
    }

    struct TransitionVar {
        union {
            continued @0 :Data;
            failed @1 :ReportError;
        }
    }

    struct Transition {
        reportId @0 :Base.ReportId;
        var @1 :TransitionVar;
    }

    transitions @0 :List(Transition);
}

@@ -0,0 +1,159 @@
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

use daphne::{
    messages::{AggregationJobId, ReadyAggregationJobResp, TaskId, Transition, TransitionVar},
    DapVersion,
};

use crate::{
    agg_job_response_store_capnp::aggregation_job_response,
    capnproto::{
        decode_list, encode_list, usize_to_capnp_len, CapnprotoPayloadDecode,
        CapnprotoPayloadEncode,
    },
    durable_requests::ObjectIdFrom,
};

super::define_do_binding! {
    const BINDING = "AGGREGATE_JOB_RESULT_STORE";
    enum Command {
        Get = "/get",
        Put = "/put",
    }

    fn name(
        (version, task_id, agg_job_id):
        (DapVersion, &'n TaskId, &'n AggregationJobId)
    ) -> ObjectIdFrom {
        ObjectIdFrom::Name(format!("{version}/task/{task_id}/agg_job/{agg_job_id}"))
    }
}

impl CapnprotoPayloadEncode for ReadyAggregationJobResp {
    type Builder<'a> = aggregation_job_response::Builder<'a>;

    fn encode_to_builder(&self, builder: Self::Builder<'_>) {
        let Self { transitions } = self;
        encode_list(
            transitions,
            builder.init_transitions(usize_to_capnp_len(transitions.len())),
        );
    }
}

impl CapnprotoPayloadEncode for Transition {
    type Builder<'a> = aggregation_job_response::transition::Builder<'a>;

    fn encode_to_builder(&self, mut builder: Self::Builder<'_>) {
        let Self { report_id, var } = self;
        report_id.encode_to_builder(builder.reborrow().init_report_id());
        let mut builder = builder.init_var();
        match var {
            TransitionVar::Continued(vec) => builder.set_continued(vec),
            TransitionVar::Failed(report_error) => builder.set_failed((*report_error).into()),
        }
    }
}

impl CapnprotoPayloadDecode for Transition {
    type Reader<'a> = aggregation_job_response::transition::Reader<'a>;

    fn decode_from_reader(reader: Self::Reader<'_>) -> capnp::Result<Self>
    where
        Self: Sized,
    {
        Ok(Self {
            report_id: <_>::decode_from_reader(reader.get_report_id()?)?,
            var: match reader.get_var()?.which()? {
                aggregation_job_response::transition_var::Which::Continued(data) => {
                    TransitionVar::Continued(data?.to_vec())
                }
                aggregation_job_response::transition_var::Which::Failed(report_error) => {
                    TransitionVar::Failed(report_error?.into())
                }
            },
        })
    }
}

impl CapnprotoPayloadDecode for ReadyAggregationJobResp {
    type Reader<'a> = aggregation_job_response::Reader<'a>;

    fn decode_from_reader(reader: Self::Reader<'_>) -> capnp::Result<Self>
    where
        Self: Sized,
    {
        Ok(Self {
            transitions: decode_list::<Transition, _, _>(reader.get_transitions()?)?,
        })
    }
}

impl From<daphne::messages::ReportError> for aggregation_job_response::ReportError {
    fn from(error: daphne::messages::ReportError) -> Self {
        match error {
            daphne::messages::ReportError::Reserved => Self::Reserved,
            daphne::messages::ReportError::BatchCollected => Self::BatchCollected,
            daphne::messages::ReportError::ReportReplayed => Self::ReportReplayed,
            daphne::messages::ReportError::ReportDropped => Self::ReportDropped,
            daphne::messages::ReportError::HpkeUnknownConfigId => Self::HpkeUnknownConfigId,
            daphne::messages::ReportError::HpkeDecryptError => Self::HpkeDecryptError,
            daphne::messages::ReportError::VdafPrepError => Self::VdafPrepError,
            daphne::messages::ReportError::BatchSaturated => Self::BatchSaturated,
            daphne::messages::ReportError::TaskExpired => Self::TaskExpired,
            daphne::messages::ReportError::InvalidMessage => Self::InvalidMessage,
            daphne::messages::ReportError::ReportTooEarly => Self::ReportTooEarly,
            daphne::messages::ReportError::TaskNotStarted => Self::TaskNotStarted,
        }
    }
}

impl From<aggregation_job_response::ReportError> for daphne::messages::ReportError {
    fn from(error: aggregation_job_response::ReportError) -> Self {
        match error {
            aggregation_job_response::ReportError::Reserved => Self::Reserved,
            aggregation_job_response::ReportError::BatchCollected => Self::BatchCollected,
            aggregation_job_response::ReportError::ReportReplayed => Self::ReportReplayed,
            aggregation_job_response::ReportError::ReportDropped => Self::ReportDropped,
            aggregation_job_response::ReportError::HpkeUnknownConfigId => Self::HpkeUnknownConfigId,
            aggregation_job_response::ReportError::HpkeDecryptError => Self::HpkeDecryptError,
            aggregation_job_response::ReportError::VdafPrepError => Self::VdafPrepError,
            aggregation_job_response::ReportError::BatchSaturated => Self::BatchSaturated,
            aggregation_job_response::ReportError::TaskExpired => Self::TaskExpired,
            aggregation_job_response::ReportError::InvalidMessage => Self::InvalidMessage,
            aggregation_job_response::ReportError::ReportTooEarly => Self::ReportTooEarly,
            aggregation_job_response::ReportError::TaskNotStarted => Self::TaskNotStarted,
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::capnproto::{CapnprotoPayloadDecodeExt as _, CapnprotoPayloadEncodeExt as _};
    use daphne::messages::ReportId;
    use rand::{thread_rng, Rng};

    fn gen_agg_job_resp() -> ReadyAggregationJobResp {
        ReadyAggregationJobResp {
            transitions: vec![
                Transition {
                    report_id: ReportId(thread_rng().gen()),
                    var: TransitionVar::Continued(vec![1, 2, 3]),
                },
                Transition {
                    report_id: ReportId(thread_rng().gen()),
                    var: TransitionVar::Failed(daphne::messages::ReportError::InvalidMessage),
                },
            ],
        }
    }

    #[test]
    fn serialization_deserialization_round_trip() {
        let this = gen_agg_job_resp();
        let other = ReadyAggregationJobResp::decode_from_bytes(&this.encode_to_bytes()).unwrap();
        assert_eq!(this, other);
    }
}
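
A natural companion to the round-trip test above would be an exhaustive check that the two `ReportError` conversions are mutual inverses. A sketch that could slot into the same test module, assuming `ReportError` is `Copy`, `PartialEq`, and `Debug` (which the `(*report_error).into()` call and the existing `assert_eq!` already rely on):

    #[test]
    fn report_error_conversion_round_trip() {
        use daphne::messages::ReportError as E;
        // Every variant handled by the `From` impls above; converting into the
        // capnp enum and back must return the original value.
        for error in [
            E::Reserved,
            E::BatchCollected,
            E::ReportReplayed,
            E::ReportDropped,
            E::HpkeUnknownConfigId,
            E::HpkeDecryptError,
            E::VdafPrepError,
            E::BatchSaturated,
            E::TaskExpired,
            E::InvalidMessage,
            E::ReportTooEarly,
            E::TaskNotStarted,
        ] {
            let capnp: aggregation_job_response::ReportError = error.into();
            assert_eq!(E::from(capnp), error);
        }
    }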