-
Notifications
You must be signed in to change notification settings - Fork 26
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
7 changed files
with
351 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,213 @@ | ||
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved. | ||
// SPDX-License-Identifier: BSD-3-Clause | ||
|
||
//! This is a Worker that proxies requests to the storage that Workers has access to, i.e., KV and | ||
//! Durable Objects. | ||
//! | ||
//! Comunication with this Worker is done via HTTP. | ||
//! | ||
//! # KV | ||
//! | ||
//! The prefix of all KV request URIs is [`KV_PATH_BASE`]. | ||
//! | ||
//! ## Getting a key | ||
//! | ||
//! Make a `GET` request with uri `{KV_PATH_BASE}/path/to/key`. | ||
//! | ||
//! ## Putting a key | ||
//! | ||
//! Make a `POST` request with uri `{KV_PATH_BASE}/path/to/key`. The body of the request will be | ||
//! stored in kv as is, without any processing. | ||
//! | ||
//! ## Putting a key if it doesn't exist | ||
//! | ||
//! Make a `PUT` request with uri `{KV_PATH_BASE}/path/to/key`. The body of the request will be | ||
//! stored in kv as is, without any processing, if this key is not already present in KV. | ||
//! | ||
//! ## Deleting a key | ||
//! | ||
//! Make a `DELETE` request with uri `{KV_PATH_BASE}/path/to/key`. | ||
//! | ||
//! | ||
//! # Durable Objects | ||
//! | ||
//! The prefix of all durable object request URIs is [`DO_PATH_BASE`]. | ||
//! | ||
//! To interact with a durable object, create an instance of [`DurableRequest`], which will be the | ||
//! body of a `POST` request to `{DO_PATH_BASE}/{DURABLE_OBJECT_METHOD}` where | ||
//! `DURABLE_OBJECT_METHOD` is defined by the [`DurableMethod::to_uri`][to_uri] trait method of the | ||
//! binding used to create the [`DurableRequest`]. | ||
//! | ||
//! ``` | ||
//! # // hack so we don't need to depend on reqwest just for this example. | ||
//! # use reqwest_wasm as reqwest; | ||
//! use url::Url; | ||
//! use daphne_service_utils::durable_requests::{ | ||
//! DurableRequest, | ||
//! bindings::{self, DurableMethod} | ||
//! }; | ||
//! | ||
//! let (durable_request, uri) = DurableRequest::new( | ||
//! bindings::AggregateStore::Merge, | ||
//! // some mock data here | ||
//! ( | ||
//! daphne::DapVersion::DraftLatest, | ||
//! "some-task-id-in-hex", | ||
//! &daphne::DapBatchBucket::TimeInterval { batch_window: 50 } | ||
//! ), | ||
//! ); | ||
//! | ||
//! let worker_url = Url::parse("https://example-worker.com") | ||
//! .unwrap() | ||
//! .join(uri) | ||
//! .unwrap(); | ||
//! | ||
//! | ||
//! let _send_future = reqwest::Client::new() | ||
//! .post(worker_url) | ||
//! .body(durable_request.into_bytes()) | ||
//! .send(); | ||
//! ``` | ||
//! | ||
//! [to_uri]: daphne_service_utils::durable_requests::bindings::DurableMethod::to_uri | ||
use daphne_service_utils::durable_requests::{ | ||
DurableRequest, ObjectIdFrom, DO_PATH_PREFIX, KV_PATH_PREFIX, | ||
}; | ||
use url::Url; | ||
use worker::{js_sys::Uint8Array, Env, Request, RequestInit, Response}; | ||
|
||
const KV_BINDING_DAP_CONFIG: &str = "DAP_CONFIG"; | ||
|
||
/// Handle a proxy request. This is the entry point of the Worker. | ||
pub async fn handle_request( | ||
req: Request, | ||
env: Env, | ||
_ctx: worker::Context, | ||
) -> worker::Result<Response> { | ||
let path = req.path(); | ||
if let Some(uri) = path.strip_prefix(KV_PATH_PREFIX) { | ||
handle_kv_request(req, env, uri).await | ||
} else if let Some(uri) = path.strip_prefix(DO_PATH_PREFIX) { | ||
handle_do_request(req, env, uri).await | ||
} else { | ||
#[cfg(feature = "test-utils")] | ||
if let Some("") = path.strip_prefix(daphne_service_utils::durable_requests::PURGE_STORAGE) { | ||
return storage_purge(env).await; | ||
} | ||
tracing::error!("path {path:?} was invalid"); | ||
Response::error("invalid base path", 400) | ||
} | ||
} | ||
|
||
#[cfg(feature = "test-utils")] | ||
/// Clear all storage. Only available to tests | ||
async fn storage_purge(env: Env) -> worker::Result<Response> { | ||
use daphne_service_utils::durable_requests::bindings::{DurableMethod, GarbageCollector}; | ||
|
||
let kv_delete = async { | ||
let kv = env.kv(KV_BINDING_DAP_CONFIG)?; | ||
for key in kv.list().execute().await?.keys { | ||
kv.delete(&key.name).await?; | ||
tracing::trace!("deleted KV item {}", key.name); | ||
} | ||
Ok(()) | ||
}; | ||
|
||
let do_delete = async { | ||
let req = Request::new_with_init( | ||
&format!("https://fake-host{}", GarbageCollector::DeleteAll.to_uri(),), | ||
RequestInit::new().with_method(worker::Method::Post), | ||
)?; | ||
|
||
env.durable_object(GarbageCollector::BINDING)? | ||
.id_from_name(GarbageCollector::NAME_STR)? | ||
.get_stub()? | ||
.fetch_with_request(req) | ||
.await | ||
}; | ||
|
||
futures::try_join!(kv_delete, do_delete)?; | ||
Response::empty() | ||
} | ||
|
||
/// Handle a kv request. | ||
async fn handle_kv_request(mut req: Request, env: Env, key: &str) -> worker::Result<Response> { | ||
match req.method() { | ||
worker::Method::Get => { | ||
let bytes = env.kv(KV_BINDING_DAP_CONFIG)?.get(key).bytes().await?; | ||
|
||
match bytes { | ||
Some(bytes) => Response::from_bytes(bytes), | ||
None => Response::error("value not found", 404), | ||
} | ||
} | ||
worker::Method::Post => { | ||
env.kv(KV_BINDING_DAP_CONFIG)? | ||
.put_bytes(key, &req.bytes().await?)? | ||
.execute() | ||
.await?; | ||
|
||
Response::empty() | ||
} | ||
worker::Method::Put => { | ||
let kv = env.kv(KV_BINDING_DAP_CONFIG)?; | ||
if kv | ||
.list() | ||
.prefix(key.into()) | ||
.execute() | ||
.await? | ||
.keys | ||
.into_iter() | ||
.any(|k| k.name == key) | ||
{ | ||
Response::error(String::new(), 409 /* Conflict */) | ||
} else { | ||
kv.put_bytes(key, &req.bytes().await?)?.execute().await?; | ||
|
||
Response::empty() | ||
} | ||
} | ||
worker::Method::Delete => { | ||
env.kv(KV_BINDING_DAP_CONFIG)?.delete(key).await?; | ||
|
||
Response::empty() | ||
} | ||
_ => Response::error(String::new(), 405 /* Method not allowed */), | ||
} | ||
} | ||
|
||
/// Handle a durable object request | ||
async fn handle_do_request(mut req: Request, env: Env, uri: &str) -> worker::Result<Response> { | ||
let buf = req.bytes().await.map_err(|e| { | ||
tracing::error!(error = ?e, "failed to get bytes"); | ||
e | ||
})?; | ||
tracing::debug!(len = buf.len(), "deserializing do request"); | ||
let parsed_req = DurableRequest::try_from(&buf) | ||
.map_err(|e| worker::Error::RustError(format!("invalid format: {e:?}")))?; | ||
|
||
let binding = env.durable_object(&parsed_req.binding)?; | ||
let obj = match &parsed_req.id { | ||
ObjectIdFrom::Name(name) => binding.id_from_name(name)?, | ||
ObjectIdFrom::Hex(hex) => binding.id_from_string(hex)?, | ||
}; | ||
|
||
let mut do_req = RequestInit::new(); | ||
do_req.with_method(worker::Method::Post); | ||
do_req.with_headers(req.headers().clone()); | ||
if let body @ [_a, ..] = parsed_req.body() { | ||
let buffer = | ||
Uint8Array::new_with_length(body.len().try_into().map_err(|_| { | ||
worker::Error::RustError(format!("buffer is too long {}", body.len())) | ||
})?); | ||
buffer.copy_from(body); | ||
do_req.with_body(Some(buffer.into())); | ||
} | ||
let url = Url::parse("https://fake-host/").unwrap().join(uri).unwrap(); | ||
let do_req = Request::new_with_init(url.as_str(), &do_req)?; | ||
|
||
let stub = obj.get_stub()?; | ||
|
||
stub.fetch_with_request(do_req).await | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
# Copyright (c) 2022 Cloudflare, Inc. All rights reserved. | ||
# SPDX-License-Identifier: BSD-3-Clause | ||
--- | ||
version: "3.3" | ||
services: | ||
leader_storage: | ||
ports: | ||
- 4000:4000 | ||
build: | ||
context: .. | ||
dockerfile: daphne_worker_test/docker/storage_proxy.Dockerfile | ||
command: | ||
- "--port=4000" | ||
- "--global-random" | ||
helper_storage: | ||
ports: | ||
- 4001:4001 | ||
build: | ||
context: .. | ||
dockerfile: daphne_worker_test/docker/storage_proxy.Dockerfile | ||
command: | ||
- "--port=4001" | ||
- "--global-random" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
FROM rust:1.73-alpine AS builder | ||
WORKDIR /tmp/dap_test | ||
RUN apk add --update \ | ||
bash \ | ||
g++ \ | ||
make \ | ||
npm \ | ||
openssl-dev \ | ||
wasm-pack \ | ||
capnproto | ||
RUN npm install -g [email protected] | ||
|
||
# Pre-install worker-build and Rust's wasm32 target to speed up our custom build command | ||
RUN cargo install --git https://github.com/cloudflare/workers-rs | ||
RUN rustup target add wasm32-unknown-unknown | ||
|
||
COPY Cargo.toml Cargo.lock ./ | ||
COPY daphne_worker_test ./daphne_worker_test | ||
COPY daphne_worker ./daphne_worker | ||
COPY daphne_service_utils ./daphne_service_utils | ||
COPY daphne ./daphne | ||
RUN cargo new --lib daphne_server | ||
WORKDIR /tmp/dap_test/daphne_worker_test | ||
COPY daphne_worker_test/wrangler.storage_proxy.toml ./wrangler.toml | ||
RUN wrangler publish --dry-run | ||
|
||
FROM alpine:3.16 AS test | ||
RUN apk add --update npm bash | ||
RUN npm install -g [email protected] | ||
COPY --from=builder /tmp/dap_test/daphne_worker_test/wrangler.toml /wrangler.toml | ||
COPY --from=builder /tmp/dap_test/daphne_worker_test/build/worker/* /build/worker/ | ||
EXPOSE 8080 | ||
# `-B ""` to skip build command. | ||
ENTRYPOINT ["miniflare", "--modules", "--modules-rule=CompiledWasm=**/*.wasm", "/build/worker/shim.mjs", "-B", ""] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
name = "daphne_storage_proxy" | ||
main = "build/worker/shim.mjs" | ||
compatibility_date = "2023-12-21" | ||
|
||
[build] | ||
command = "cargo install --git https://github.com/cloudflare/workers-rs && worker-build --dev" | ||
|
||
[[rules]] | ||
globs = ["**/*.wasm"] | ||
type = "CompiledWasm" | ||
fallthrough = false | ||
|
||
# NOTE: Variables marked as SECRET need to be provisioned securely in | ||
# production. In particular, they will not be passed as environment variables | ||
# as they are here. See | ||
# https://developers.cloudflare.com/workers/wrangler/commands/#secret. | ||
[vars] | ||
DAP_PROXY = "true" | ||
DAP_REPORT_STORAGE_MAX_FUTURE_TIME_SKEW = "300" | ||
DAP_DEPLOYMENT = "dev" | ||
DAP_HELPER_STATE_STORE_GARBAGE_COLLECT_AFTER_SECS = "10" | ||
DAP_COLLECTION_JOB_ID_KEY = "12da249b0e3b1a9b5936d6ff6cdd2fa09964e2e11f5450e04eef11dc5e64daf1" # SECRET | ||
|
||
[durable_objects] | ||
bindings = [ | ||
{ name = "DAP_AGGREGATE_STORE", class_name = "AggregateStore" }, | ||
{ name = "DAP_GARBAGE_COLLECTOR", class_name = "GarbageCollector" }, | ||
{ name = "DAP_HELPER_STATE_STORE", class_name = "HelperStateStore" }, | ||
{ name = "DAP_LEADER_AGG_JOB_QUEUE", class_name = "LeaderAggregationJobQueue" }, | ||
{ name = "DAP_LEADER_BATCH_QUEUE", class_name = "LeaderBatchQueue" }, | ||
{ name = "DAP_LEADER_COL_JOB_QUEUE", class_name = "LeaderCollectionJobQueue" }, | ||
{ name = "DAP_REPORTS_PENDING", class_name = "ReportsPending" }, | ||
] | ||
|
||
|
||
[[kv_namespaces]] | ||
binding = "DAP_CONFIG" | ||
# KV bindings are in a looked up in a namespace identified by a 16-byte id number. | ||
# This number is assigned by calling | ||
# | ||
# wrangler kv:namespace create <NAME> | ||
# | ||
# for some unique name you specify, and it returns a unique id number to use. | ||
# Here we should use something like "leader" for the <NAME>. | ||
id = "<assign-one-for-the-leader>" | ||
# A "preview id" is an id used when running in "wrangler dev" mode locally, and | ||
# can just be made up. We generated the number below with the following python | ||
# code: | ||
# | ||
# import secrets | ||
# print(secrets.token_hex(16)) | ||
# | ||
preview_id = "24c4dc92d5cf4680e508fe18eb8f0281" | ||
|
||
[[migrations]] | ||
tag = "v1" | ||
new_classes = [ | ||
"AggregateStore", | ||
"HelperStateStore", | ||
"LeaderAggregationJobQueue", | ||
"LeaderBatchQueue", | ||
"LeaderCollectionJobQueue", | ||
"GarbageCollector", | ||
"ReportsPending", | ||
] |