Skip to content

Commit

Permalink
Add storage proxy worker
Browse files Browse the repository at this point in the history
  • Loading branch information
mendess committed Dec 20, 2023
1 parent 8e78e17 commit fdd9d15
Show file tree
Hide file tree
Showing 10 changed files with 353 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ hex = { version = "0.4.3", features = ["serde"] }
hpke-rs = "0.2.0"
hpke-rs-crypto = "0.2.0"
hpke-rs-rust-crypto = "0.2.0"
http = "1"
matchit = "0.7.3"
paste = "1.0.14"
prio = { git = "https://github.com/divviup/libprio-rs", rev = "d0168336bbad1805231a69181b329e37ee962203" }
Expand Down
2 changes: 1 addition & 1 deletion daphne_service_utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ readme = "../README.md"

[dependencies]
daphne = { path = "../daphne", default-features = false }
http = "1"
http.workspace = true
prometheus.workspace = true
ring.workspace = true
serde.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion daphne_worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,12 @@
//! | `DAP_REPORT_SHARD_COUNT` | `u64` | no | Number of report shards per storage epoch. |
//! | `DAP_REPORT_SHARD_KEY` | `String` | yes | Hex-encoded key used to hash a report into one of the report shards. |
mod config;
pub mod config;
mod durable;
mod error_reporting;
mod roles;
mod router;
pub mod storage_proxy;
mod tracing_utils;

use crate::config::{DaphneWorkerIsolateState, DaphneWorkerRequestState};
Expand Down
213 changes: 213 additions & 0 deletions daphne_worker/src/storage_proxy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

//! This is a worker implementation that serves as a proxy to the storage that Workers has access
//! to, i.e., KV and Durable Objects.
//!
//! Communication with this worker is done via HTTP.
//!
//! # KV
//!
//! The base of all KV request URIs is [`KV_PATH_BASE`].
//!
//! ## Getting a key
//!
//! Make a `GET` request with uri `{KV_PATH_BASE}/path/to/key`.
//!
//! ## Putting a key
//!
//! Make a `POST` request with uri `{KV_PATH_BASE}/path/to/key`. The body of the request will be
//! stored in kv as is, without any processing.
//!
//! ## Putting a key if it doesn't exist
//!
//! Make a `PUT` request with uri `{KV_PATH_BASE}/path/to/key`. The body of the request will be
//! stored in kv as is, without any processing, if this key is not already present in KV.
//!
//! ## Deleting a key
//!
//! Make a `DELETE` request with uri `{KV_PATH_BASE}/path/to/key`.
//!
//!
//! # Durable Objects
//!
//! The base of all durable object requests is [`DO_PATH_BASE`].
//!
//! To interact with a durable object create an instance of [`DurableRequest`] which will be the
//! body of a `POST` request to `{DO_PATH_BASE}/{DURABLE_OBJECT_METHOD}` where
//! `DURABLE_OBJECT_METHOD` is defined by the [`DurableMethod::to_uri`][to_uri] trait method of the
//! binding used to create the [`DurableRequest`].
//!
//! ```
//! # // hack so we don't need to depend on reqwest just for this example.
//! # use reqwest_wasm as reqwest;
//! use url::Url;
//! use daphne_service_utils::durable_requests::{
//! DurableRequest,
//! bindings::{self, DurableMethod}
//! };
//!
//! let (durable_request, uri) = DurableRequest::new(
//! bindings::AggregateStore::Merge,
//! // some mock data here
//! (
//! daphne::DapVersion::DraftLatest,
//! "some-task-id-in-hex",
//! &daphne::DapBatchBucket::TimeInterval { batch_window: 50 }
//! ),
//! );
//!
//! let worker_url = Url::parse("https://example-worker.com")
//! .unwrap()
//! .join(uri)
//! .unwrap();
//!
//!
//! let _send_future = reqwest::Client::new()
//! .post(worker_url)
//! .body(durable_request.into_bytes())
//! .send();
//! ```
//!
//! [to_uri]: daphne_service_utils::durable_requests::bindings::DurableMethod::to_uri
//!
use daphne_service_utils::durable_requests::{
DurableRequest, ObjectIdFrom, DO_PATH_BASE, KV_PATH_BASE,
};
use url::Url;
use worker::{js_sys::Uint8Array, Env, Request, RequestInit, Response};

const KV_BINDING_DAP_CONFIG: &str = "DAP_CONFIG";

/// Handle a proxy request. This is the entry point of the worker.
///
/// Routes on the request path: KV operations under [`KV_PATH_BASE`], durable
/// object operations under [`DO_PATH_BASE`], and (tests only) the storage
/// purge endpoint. Anything else is rejected with a 400.
pub async fn handle_request(
    req: Request,
    env: Env,
    _ctx: worker::Context,
) -> worker::Result<Response> {
    let path = req.path();

    if let Some(uri) = path.strip_prefix(KV_PATH_BASE) {
        return handle_kv_request(req, env, uri).await;
    }
    if let Some(uri) = path.strip_prefix(DO_PATH_BASE) {
        return handle_do_request(req, env, uri).await;
    }
    #[cfg(feature = "test-utils")]
    if path.strip_prefix(daphne_service_utils::durable_requests::PURGE_STORAGE) == Some("") {
        return storage_purge(env).await;
    }
    tracing::error!("path {path:?} was invalid");
    Response::error("invalid base path", 400)
}

#[cfg(feature = "test-utils")]
/// Clear all storage. Only available to tests.
///
/// Deletes every key in the `DAP_CONFIG` KV namespace and asks the
/// garbage-collector durable object to wipe all durable object state;
/// both deletions run concurrently.
async fn storage_purge(env: Env) -> worker::Result<Response> {
    use daphne_service_utils::durable_requests::bindings::{DurableMethod, GarbageCollector};

    // Enumerate and delete every key in the KV namespace.
    // NOTE(review): `list()` returns a single page of keys; if the namespace
    // holds more keys than one page, a single purge pass may miss some — confirm.
    let kv_delete = async {
        let kv = env.kv(KV_BINDING_DAP_CONFIG)?;
        for key in kv.list().execute().await?.keys {
            kv.delete(&key.name).await?;
            tracing::trace!("deleted KV item {}", key.name);
        }
        Ok(())
    };

    // Send a POST to the garbage-collector durable object's delete-all URI.
    // The host part of the URL is irrelevant; only the path is routed.
    let do_delete = async {
        let req = Request::new_with_init(
            &format!("https://fake-host{}", GarbageCollector::DeleteAll.to_uri(),),
            RequestInit::new().with_method(worker::Method::Post),
        )?;

        env.durable_object(GarbageCollector::BINDING)?
            .id_from_name(GarbageCollector::NAME_STR)?
            .get_stub()?
            .fetch_with_request(req)
            .await
    };

    // Run both deletions concurrently; fail fast on the first error.
    futures::try_join!(kv_delete, do_delete)?;
    Response::empty()
}

/// Handle a KV request.
///
/// Routes on the HTTP method:
/// - `GET`: return the value stored at `key`, or 404 if absent.
/// - `POST`: unconditionally store the request body at `key`, as-is.
/// - `PUT`: store the request body at `key` only if the key is absent;
///   respond 409 (Conflict) if it already exists.
/// - `DELETE`: delete `key`.
///
/// Any other method yields 405.
async fn handle_kv_request(mut req: Request, env: Env, key: &str) -> worker::Result<Response> {
    match req.method() {
        worker::Method::Get => {
            let bytes = env.kv(KV_BINDING_DAP_CONFIG)?.get(key).bytes().await?;

            match bytes {
                Some(bytes) => Response::from_bytes(bytes),
                None => Response::error("value not found", 404),
            }
        }
        worker::Method::Post => {
            env.kv(KV_BINDING_DAP_CONFIG)?
                .put_bytes(key, &req.bytes().await?)?
                .execute()
                .await?;

            Response::empty()
        }
        worker::Method::Put => {
            let kv = env.kv(KV_BINDING_DAP_CONFIG)?;
            // Look up the exact key directly (same call the GET branch uses).
            // The previous prefix-list scan only inspected the first page of
            // `list()` results, so when many keys share this prefix the exact
            // key could be missed and an existing value silently overwritten.
            if kv.get(key).bytes().await?.is_some() {
                Response::error(String::new(), 409 /* Conflict */)
            } else {
                kv.put_bytes(key, &req.bytes().await?)?.execute().await?;

                Response::empty()
            }
        }
        worker::Method::Delete => {
            env.kv(KV_BINDING_DAP_CONFIG)?.delete(key).await?;

            Response::empty()
        }
        _ => Response::error(String::new(), 405 /* Method not allowed */),
    }
}

/// Handle a durable object request.
///
/// The request body is a serialized [`DurableRequest`] naming the binding,
/// the object id, the method, and an optional payload; the request is
/// reconstructed and forwarded to the resolved durable object stub.
async fn handle_do_request(mut req: Request, env: Env, uri: &str) -> worker::Result<Response> {
    let buf = req.bytes().await.map_err(|e| {
        tracing::error!(error = ?e, "failed to get bytes");
        e
    })?;
    tracing::debug!(len = buf.len(), "deserializing do request");
    let parsed_req = DurableRequest::from_bytes(&buf)
        .map_err(|e| worker::Error::RustError(format!("invalid format: {e:?}")))?;

    // Resolve the durable object id from the binding and id carried in the request.
    let binding = env.durable_object(&parsed_req.binding)?;
    let obj = match &parsed_req.id {
        ObjectIdFrom::Name(name) => binding.id_from_name(name)?,
        ObjectIdFrom::Hex(hex) => binding.id_from_string(hex)?,
    };

    // Rebuild the inner request: same method and headers as the outer one,
    // with the payload (if any) copied into a JS byte buffer.
    let mut do_req = RequestInit::new();
    do_req.with_method(worker::Method::from(parsed_req.method.as_str().to_string()));
    do_req.with_headers(req.headers().clone());
    let payload = parsed_req.body();
    if !payload.is_empty() {
        // JS typed arrays are sized by u32, so an over-long payload is an error.
        let len = u32::try_from(payload.len()).map_err(|_| {
            worker::Error::RustError(format!("buffer is too long {}", payload.len()))
        })?;
        let buffer = Uint8Array::new_with_length(len);
        buffer.copy_from(payload);
        do_req.with_body(Some(buffer.into()));
    }
    // Only the path matters for routing; the host is a placeholder.
    let url = Url::parse("https://fake-host/").unwrap().join(uri).unwrap();
    let do_req = Request::new_with_init(url.as_str(), &do_req)?;

    obj.get_stub()?.fetch_with_request(do_req).await
}
2 changes: 1 addition & 1 deletion daphne_worker_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ cfg-if = "1.0.0"
# all the `std::fmt` and `std::panicking` infrastructure, so isn't great for
# code size when deploying.
console_error_panic_hook = { version = "0.1.7", optional = true }
daphne_worker = { path = "../daphne_worker" }
daphne_worker = { path = "../daphne_worker", features = ["test-utils"] }
getrandom = { workspace = true, features = ["js", "std"] }
tracing.workspace = true
worker.workspace = true
Expand Down
23 changes: 23 additions & 0 deletions daphne_worker_test/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Copyright (c) 2022 Cloudflare, Inc. All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause
---
version: "3.3"
services:
  # Storage proxy worker backing the leader's state, exposed on port 4000.
  leader_storage:
    ports:
      - 4000:4000
    build:
      context: ..
      dockerfile: daphne_worker_test/docker/storage_proxy.Dockerfile
    command:
      - "--port=4000"
      - "--global-random"
  # Storage proxy worker backing the helper's state, exposed on port 4001.
  helper_storage:
    ports:
      - 4001:4001
    build:
      context: ..
      dockerfile: daphne_worker_test/docker/storage_proxy.Dockerfile
    command:
      - "--port=4001"
      - "--global-random"
33 changes: 33 additions & 0 deletions daphne_worker_test/docker/storage_proxy.Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Build stage: compile the storage-proxy worker bundle with wrangler.
FROM rust:1.73-alpine AS builder
WORKDIR /tmp/dap_test
# Toolchain for the wasm build (g++/make for native build deps, npm for wrangler).
RUN apk add --update \
    bash \
    g++ \
    make \
    npm \
    openssl-dev \
    wasm-pack
RUN npm install -g [email protected]

# Pre-install worker-build and Rust's wasm32 target to speed up our custom build command
RUN cargo install --git https://github.com/cloudflare/workers-rs
RUN rustup target add wasm32-unknown-unknown

# Copy the workspace members needed to build the worker.
COPY Cargo.toml Cargo.lock ./
COPY daphne_worker_test ./daphne_worker_test
COPY daphne_worker ./daphne_worker
COPY daphne_service_utils ./daphne_service_utils
COPY daphne ./daphne
# Stub crate — presumably so the cargo workspace resolves without copying
# the real daphne_server sources. NOTE(review): confirm against workspace members.
RUN cargo new --lib daphne_server
WORKDIR /tmp/dap_test/daphne_worker_test
COPY daphne_worker_test/wrangler.storage_proxy.toml ./wrangler.toml
# `--dry-run` builds the worker bundle without deploying it.
RUN wrangler publish --dry-run

# Runtime stage: serve the built worker bundle locally with miniflare.
FROM alpine:3.16 AS test
RUN apk add --update npm bash
RUN npm install -g [email protected]
COPY --from=builder /tmp/dap_test/daphne_worker_test/wrangler.toml /wrangler.toml
COPY --from=builder /tmp/dap_test/daphne_worker_test/build/worker/* /build/worker/
EXPOSE 8080
# `-B ""` to skip build command.
ENTRYPOINT ["miniflare", "--modules", "--modules-rule=CompiledWasm=**/*.wasm", "/build/worker/shim.mjs", "-B", ""]
20 changes: 13 additions & 7 deletions daphne_worker_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ fn log_request(req: &Request) {
static CAP: cap::Cap<std::alloc::System> = cap::Cap::new(std::alloc::System, 65_000_000);

#[event(fetch, respond_with_errors)]
pub async fn main(req: Request, env: Env, _ctx: worker::Context) -> Result<Response> {
pub async fn main(req: Request, env: Env, ctx: worker::Context) -> Result<Response> {
// Optionally, get more helpful error messages written to the console in the case of a panic.
utils::set_panic_hook();

Expand All @@ -47,10 +47,16 @@ pub async fn main(req: Request, env: Env, _ctx: worker::Context) -> Result<Respo

log_request(&req);

let router = DaphneWorkerRouter {
enable_internal_test: true,
enable_default_response: false,
..Default::default()
};
router.handle_request(req, env).await
if matches!(env.var("DAP_PROXY").map(|v| v.to_string()), Ok(v) if v == "true") {
info!("starting storage proxy");
daphne_worker::storage_proxy::handle_request(req, env, ctx).await
} else {
info!("starting normal worker");
let router = DaphneWorkerRouter {
enable_internal_test: true,
enable_default_response: false,
..Default::default()
};
router.handle_request(req, env).await
}
}
Loading

0 comments on commit fdd9d15

Please sign in to comment.