-
Notifications
You must be signed in to change notification settings - Fork 26
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
22 changed files
with
1,376 additions
and
366 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
34 changes: 34 additions & 0 deletions
34
crates/daphne-server/examples/configuration-cpu-offload.toml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
# Copyright (c) 2024 Cloudflare, Inc. All rights reserved. | ||
# SPDX-License-Identifier: BSD-3-Clause | ||
|
||
port = 5000 | ||
|
||
# None of these settings are relevant and can be deleted later when | ||
# daphne-server stops being an aggregator | ||
[storage_proxy] | ||
url = "http://localhost:4001" | ||
# SECRET: This is a test secret. In production, we'll generate and securely provision the token. | ||
auth_token = 'this-is-the-storage-proxy-auth-token' | ||
|
||
[service] | ||
env = "oxy" | ||
role = "helper" | ||
max_batch_duration = 360000 | ||
min_batch_interval_start = 259200 | ||
max_batch_interval_end = 259200 | ||
supported_hpke_kems = ["x25519_hkdf_sha256"] | ||
default_version = "v09" | ||
report_storage_epoch_duration = 300000 | ||
base_url = "http://127.0.0.1:8788" | ||
default_num_agg_span_shards = 4 | ||
|
||
[service.taskprov] | ||
peer_auth.leader.expected_token = "I-am-the-leader" # SECRET | ||
vdaf_verify_key_init = "b029a72fa327931a5cb643dcadcaafa098fcbfac07d990cb9e7c9a8675fafb18" # SECRET | ||
hpke_collector_config = """{ | ||
"id": 23, | ||
"kem_id": "p256_hkdf_sha256", | ||
"kdf_id": "hkdf_sha256", | ||
"aead_id": "aes128_gcm", | ||
"public_key": "047dab625e0d269abcc28c611bebf5a60987ddf7e23df0e0aa343e5774ad81a1d0160d9252b82b4b5c52354205f5ec945645cb79facff8d85c9c31b490cdf35466" | ||
}""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved. | ||
// SPDX-License-Identifier: BSD-3-Clause | ||
|
||
use crate::App; | ||
use axum::{async_trait, extract::FromRequest, response::IntoResponse, routing::post}; | ||
use daphne::{error::DapAbort, InitializedReport}; | ||
use daphne_service_utils::{ | ||
capnproto::{CapnprotoPayloadDecode, CapnprotoPayloadDecodeExt, CapnprotoPayloadEncodeExt}, | ||
cpu_offload::{InitializeReports, InitializedReports}, | ||
}; | ||
use http::StatusCode; | ||
use prio::codec::ParameterizedDecode; | ||
use rayon::iter::{IntoParallelIterator as _, ParallelIterator}; | ||
|
||
pub(super) fn add_routes(router: super::Router<App>) -> super::Router<App> { | ||
router.route("/cpu_offload/initialize_reports", post(initialize_reports)) | ||
} | ||
|
||
struct CapnprotoExtractor<T>(T); | ||
|
||
#[async_trait] | ||
impl<S, T> FromRequest<S> for CapnprotoExtractor<T> | ||
where | ||
T: CapnprotoPayloadDecode, | ||
{ | ||
type Rejection = StatusCode; | ||
|
||
async fn from_request( | ||
req: http::Request<axum::body::Body>, | ||
_state: &S, | ||
) -> Result<Self, Self::Rejection> { | ||
let bytes = axum::body::to_bytes(req.into_body(), usize::MAX) | ||
.await | ||
.map_err(|_| StatusCode::BAD_REQUEST)?; | ||
let t = T::decode_from_bytes(&bytes).map_err(|_| StatusCode::BAD_REQUEST)?; | ||
|
||
Ok(CapnprotoExtractor(t)) | ||
} | ||
} | ||
|
||
#[tracing::instrument(skip_all, fields(%task_id, report_count = prep_inits.len()))] | ||
async fn initialize_reports( | ||
CapnprotoExtractor(InitializeReports { | ||
hpke_keys, | ||
valid_report_range, | ||
task_id, | ||
task_config, | ||
agg_param, | ||
prep_inits, | ||
}): CapnprotoExtractor<InitializeReports<'static>>, | ||
) -> impl IntoResponse { | ||
tracing::info!("initializing reports"); | ||
let initialized_reports = prep_inits | ||
.into_par_iter() | ||
.map(|prep_init| { | ||
InitializedReport::from_leader( | ||
&hpke_keys.as_ref(), | ||
valid_report_range.clone(), | ||
&task_id, | ||
&task_config, | ||
prep_init.report_share, | ||
prep_init.payload, | ||
&daphne::DapAggregationParam::get_decoded_with_param(&task_config.vdaf, &agg_param) | ||
.map_err(|e| DapAbort::from_codec_error(e, task_id))?, | ||
) | ||
}) | ||
.collect::<Result<Vec<_>, _>>(); | ||
|
||
match initialized_reports { | ||
Ok(reports) => { | ||
let body = InitializedReports { | ||
vdaf: task_config.vdaf.into_owned(), | ||
reports, | ||
} | ||
.encode_to_bytes(); | ||
|
||
(StatusCode::OK, body).into_response() | ||
} | ||
Err(error) => (StatusCode::BAD_REQUEST, axum::Json(error)).into_response(), | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
126 changes: 126 additions & 0 deletions
126
crates/daphne-service-utils/src/cpu_offload/cpu_offload.capnp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
# Copyright (c) 2025 Cloudflare, Inc. All rights reserved. | ||
# SPDX-License-Identifier: BSD-3-Clause | ||
|
||
@0xd932f3d934afce3b; | ||
|
||
# Utilities | ||
|
||
using Base = import "../capnproto/base.capnp"; | ||
|
||
using VdafConfig = Text; # json encoded | ||
|
||
struct TimeRange @0xf0d27aaa9b1959f7 { | ||
start @0 :UInt64; | ||
end @1 :UInt64; | ||
} | ||
|
||
# Top level message | ||
struct InitializeReports @0x90aadb2f44c9fb78 { | ||
hpkeKeys @0 :List(HpkeReceiverConfig); | ||
validReportRange @1 :TimeRange; | ||
taskId @2 :Base.TaskId; | ||
taskConfig @3 :PartialDapTaskConfig; | ||
aggParam @4 :Data; # encoded | ||
prepInits @5 :List(PrepareInit); | ||
} | ||
|
||
struct HpkeReceiverConfig @0xeec9b4a50458edb7 { | ||
struct HpkeConfig @0xa546066418a5cdc7 { | ||
enum HpkeKemId @0xf4bbeaed8d1fd18a { | ||
p256HkdfSha256 @0; x25519HkdfSha256 @1; | ||
} | ||
enum HpkeKdfId @0x9336afc63df27ba3 { hkdfSha256 @0; } | ||
enum HpkeAeadId @0xd68d403e118c806c { aes128Gcm @0; } | ||
|
||
id @0 :UInt8; | ||
kemId @1 :HpkeKemId; | ||
kdfId @2 :HpkeKdfId; | ||
aeadId @3 :HpkeAeadId; | ||
publicKey @4 :Data; | ||
} | ||
|
||
config @0 :HpkeConfig; | ||
privateKey @1 :Data; | ||
} | ||
|
||
struct PartialDapTaskConfig @0xdcc9bf18fc62d406 { | ||
|
||
version @0 :Base.DapVersion; | ||
methodIsTaskprov @1 :Bool; | ||
notAfter @2 :Base.Time; | ||
vdaf @3 :VdafConfig; | ||
vdafVerifyKey @4 :VdafVerifyKey; | ||
} | ||
|
||
struct VdafVerifyKey @0xf890ee7dfa2e36b8 { | ||
union { | ||
l16 @0 :Base.U8L16; | ||
l32 @1 :Base.U8L32; | ||
} | ||
} | ||
|
||
struct ReportMetadata @0xefba178ad4584bc4 { | ||
|
||
id @0 :Base.ReportId; | ||
time @1 :Base.Time; | ||
} | ||
|
||
struct PrepareInit @0x8192568cb3d03f59 { | ||
|
||
struct HpkeCiphertext @0xf0813319decf7eaf { | ||
configId @0 :UInt8; | ||
enc @1 :Data; | ||
payload @2 :Data; | ||
} | ||
|
||
struct ReportShare @0xb4134aa2db41ef60 { | ||
reportMetadata @0 :ReportMetadata; | ||
publicShare @1 :Data; | ||
encryptedInputShare @2 :HpkeCiphertext; | ||
} | ||
|
||
reportShare @0 :ReportShare; | ||
payload @1 :Data; | ||
} | ||
|
||
|
||
|
||
struct InitializedReports { | ||
struct InitializedReport { | ||
using VdafPrepShare = Data; | ||
using VdafPrepState = Data; | ||
|
||
enum ReportError { | ||
reserved @0; | ||
batchCollected @1; | ||
reportReplayed @2; | ||
reportDropped @3; | ||
hpkeUnknownConfigId @4; | ||
hpkeDecryptError @5; | ||
vdafPrepError @6; | ||
batchSaturated @7; | ||
taskExpired @8; | ||
invalidMessage @9; | ||
reportTooEarly @10; | ||
taskNotStarted @11; | ||
} | ||
|
||
|
||
union { | ||
ready :group { | ||
metadata @0 :ReportMetadata; | ||
publicShare @1 :Data; | ||
prepShare @2 :VdafPrepShare; | ||
prepState @3 :VdafPrepState; | ||
peerPrepShare @4 :Data; | ||
} | ||
rejected :group { | ||
metadata @5 :ReportMetadata; | ||
failure @6 :ReportError; | ||
} | ||
} | ||
} | ||
|
||
vdafConfig @0 :VdafConfig; | ||
reports @1 :List(InitializedReport); | ||
} |
Oops, something went wrong.