Skip to content

Commit

Permalink
Move HashedAggregationJobReq to roles/helper
Browse files Browse the repository at this point in the history
  • Loading branch information
mendess committed Jan 14, 2025
1 parent bdd6738 commit 040b1ba
Show file tree
Hide file tree
Showing 11 changed files with 101 additions and 91 deletions.
4 changes: 2 additions & 2 deletions crates/daphne-server/src/roles/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

use axum::async_trait;
use daphne::{
messages::{request::AggregationJobRequestHash, AggregationJobId, TaskId},
roles::DapHelper,
messages::{AggregationJobId, TaskId},
roles::{helper::AggregationJobRequestHash, DapHelper},
DapError, DapVersion,
};

Expand Down
3 changes: 2 additions & 1 deletion crates/daphne-server/src/router/extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ use daphne::{
error::DapAbort,
fatal_error,
messages::{
request::{CollectionPollReq, HashedAggregationJobReq, RequestBody},
request::{CollectionPollReq, RequestBody},
taskprov::TaskprovAdvertisement,
AggregateShareReq, AggregationJobInitReq, CollectionReq, Report, TaskId,
},
roles::helper::HashedAggregationJobReq,
DapError, DapRequest, DapRequestMeta, DapVersion,
};
use daphne_service_utils::{bearer_token::BearerToken, http_headers};
Expand Down
7 changes: 5 additions & 2 deletions crates/daphne-server/src/router/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ use axum::{
routing::{post, put},
};
use daphne::{
messages::{request::HashedAggregationJobReq, AggregateShareReq},
roles::{helper, DapHelper},
messages::AggregateShareReq,
roles::{
helper::{self, HashedAggregationJobReq},
DapHelper,
},
};
use http::StatusCode;

Expand Down
4 changes: 2 additions & 2 deletions crates/daphne-worker/src/aggregator/roles/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use crate::aggregator::App;
use daphne::{
error::DapAbort,
fatal_error,
messages::{request::AggregationJobRequestHash, AggregationJobId, TaskId},
roles::DapHelper,
messages::{AggregationJobId, TaskId},
roles::{helper::AggregationJobRequestHash, DapHelper},
DapError, DapVersion,
};
use daphne_service_utils::durable_requests::bindings::aggregation_job_store;
Expand Down
3 changes: 2 additions & 1 deletion crates/daphne-worker/src/aggregator/router/extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ use daphne::{
error::DapAbort,
fatal_error,
messages::{
request::{CollectionPollReq, HashedAggregationJobReq, RequestBody},
request::{CollectionPollReq, RequestBody},
taskprov::TaskprovAdvertisement,
AggregateShareReq, AggregationJobInitReq, CollectionReq, Report, TaskId,
},
roles::helper::HashedAggregationJobReq,
DapError, DapRequest, DapRequestMeta, DapVersion,
};
use daphne_service_utils::{bearer_token::BearerToken, http_headers};
Expand Down
7 changes: 5 additions & 2 deletions crates/daphne-worker/src/aggregator/router/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ use axum::{
use daphne::{
fatal_error,
hpke::HpkeProvider,
messages::{request::HashedAggregationJobReq, AggregateShareReq},
roles::{helper, DapAggregator, DapHelper},
messages::AggregateShareReq,
roles::{
helper::{self, HashedAggregationJobReq},
DapAggregator, DapHelper,
},
DapError, DapResponse,
};
use daphne_service_utils::compute_offload;
Expand Down
69 changes: 4 additions & 65 deletions crates/daphne/src/messages/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ use super::{
taskprov::TaskprovAdvertisement, AggregateShareReq, AggregationJobId, AggregationJobInitReq,
CollectionJobId, CollectionReq, Report,
};
use crate::{constants::DapMediaType, error::DapAbort, messages::TaskId, DapVersion};
use prio::codec::{ParameterizedDecode, ParameterizedEncode};
use crate::{
constants::DapMediaType, error::DapAbort, messages::TaskId,
roles::helper::HashedAggregationJobReq, DapVersion,
};

pub trait RequestBody {
type ResourceId;
Expand All @@ -25,69 +27,6 @@ macro_rules! impl_req_body {
};
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(any(test, feature = "test-utils"), derive(deepsize::DeepSizeOf))]
pub struct AggregationJobRequestHash(Vec<u8>);

impl AggregationJobRequestHash {
pub fn get(&self) -> &[u8] {
&self.0
}

fn hash(bytes: &[u8]) -> Self {
Self(
ring::digest::digest(&ring::digest::SHA256, bytes)
.as_ref()
.to_vec(),
)
}
}

pub struct HashedAggregationJobReq {
pub request: AggregationJobInitReq,
pub hash: AggregationJobRequestHash,
}

impl HashedAggregationJobReq {
#[cfg(any(test, feature = "test-utils"))]
pub fn from_aggregation_req(version: DapVersion, request: AggregationJobInitReq) -> Self {
let mut buf = Vec::new();
request.encode_with_param(&version, &mut buf).unwrap();
Self {
request,
hash: AggregationJobRequestHash::hash(&buf),
}
}
}

impl ParameterizedEncode<DapVersion> for HashedAggregationJobReq {
fn encode_with_param(
&self,
encoding_parameter: &DapVersion,
bytes: &mut Vec<u8>,
) -> Result<(), prio::codec::CodecError> {
self.request.encode_with_param(encoding_parameter, bytes)
}
}

impl ParameterizedDecode<DapVersion> for HashedAggregationJobReq {
fn decode_with_param(
decoding_parameter: &DapVersion,
bytes: &mut std::io::Cursor<&[u8]>,
) -> Result<Self, prio::codec::CodecError> {
let start = usize::try_from(bytes.position())
.map_err(|e| prio::codec::CodecError::Other(Box::new(e)))?;
let request = AggregationJobInitReq::decode_with_param(decoding_parameter, bytes)?;
let end = usize::try_from(bytes.position())
.map_err(|e| prio::codec::CodecError::Other(Box::new(e)))?;

Ok(Self {
request,
hash: AggregationJobRequestHash::hash(&bytes.get_ref()[start..end]),
})
}
}

impl_req_body! {
// body type | id type
// --------------------| ----------------
Expand Down
6 changes: 3 additions & 3 deletions crates/daphne/src/roles/helper/handle_agg_job.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

use super::{check_part_batch, DapHelper};
use super::{check_part_batch, DapHelper, HashedAggregationJobReq};
use crate::{
error::DapAbort,
messages::{
request::HashedAggregationJobReq, AggregationJobInitReq, AggregationJobResp,
PartialBatchSelector, ReportError, TaskId, TransitionVar,
AggregationJobInitReq, AggregationJobResp, PartialBatchSelector, ReportError, TaskId,
TransitionVar,
},
metrics::ReportStatus,
protocol::aggregator::ReportProcessedStatus,
Expand Down
70 changes: 67 additions & 3 deletions crates/daphne/src/roles/helper/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,79 @@ use crate::{
constants::DapMediaType,
error::DapAbort,
messages::{
constant_time_eq,
request::{AggregationJobRequestHash, HashedAggregationJobReq},
AggregateShare, AggregateShareReq, AggregationJobId, PartialBatchSelector, TaskId,
constant_time_eq, AggregateShare, AggregateShareReq, AggregationJobId,
AggregationJobInitReq, PartialBatchSelector, TaskId,
},
metrics::{DaphneRequestType, ReportStatus},
protocol::aggregator::ReplayProtection,
DapAggregationParam, DapError, DapRequest, DapResponse, DapTaskConfig, DapVersion,
};

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(any(test, feature = "test-utils"), derive(deepsize::DeepSizeOf))]
pub struct AggregationJobRequestHash(Vec<u8>);

impl AggregationJobRequestHash {
pub fn get(&self) -> &[u8] {
&self.0
}

fn hash(bytes: &[u8]) -> Self {
Self(
ring::digest::digest(&ring::digest::SHA256, bytes)
.as_ref()
.to_vec(),
)
}
}

/// An [`AggregationJobInitReq`] and its hash. Used by the helper to prevent the parameters of an
/// aggregation job from changing.
pub struct HashedAggregationJobReq {
pub request: AggregationJobInitReq,
pub hash: AggregationJobRequestHash,
}

impl HashedAggregationJobReq {
#[cfg(any(test, feature = "test-utils"))]
pub fn from_aggregation_req(version: DapVersion, request: AggregationJobInitReq) -> Self {
let mut buf = Vec::new();
request.encode_with_param(&version, &mut buf).unwrap();
Self {
request,
hash: AggregationJobRequestHash::hash(&buf),
}
}
}

impl ParameterizedEncode<DapVersion> for HashedAggregationJobReq {
fn encode_with_param(
&self,
encoding_parameter: &DapVersion,
bytes: &mut Vec<u8>,
) -> Result<(), prio::codec::CodecError> {
self.request.encode_with_param(encoding_parameter, bytes)
}
}

impl ParameterizedDecode<DapVersion> for HashedAggregationJobReq {
fn decode_with_param(
decoding_parameter: &DapVersion,
bytes: &mut std::io::Cursor<&[u8]>,
) -> Result<Self, prio::codec::CodecError> {
let start = usize::try_from(bytes.position())
.map_err(|e| prio::codec::CodecError::Other(Box::new(e)))?;
let request = AggregationJobInitReq::decode_with_param(decoding_parameter, bytes)?;
let end = usize::try_from(bytes.position())
.map_err(|e| prio::codec::CodecError::Other(Box::new(e)))?;

Ok(Self {
request,
hash: AggregationJobRequestHash::hash(&bytes.get_ref()[start..end]),
})
}
}

/// DAP Helper functionality.
#[async_trait]
pub trait DapHelper: DapAggregator {
Expand Down
11 changes: 5 additions & 6 deletions crates/daphne/src/roles/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,12 @@ mod test {
constants::DapMediaType,
hpke::{HpkeKemId, HpkeProvider, HpkeReceiverConfig},
messages::{
request::{HashedAggregationJobReq, RequestBody},
AggregateShareReq, AggregationJobId, AggregationJobInitReq, AggregationJobResp,
BatchId, BatchSelector, Collection, CollectionJobId, CollectionReq, Extension,
HpkeCiphertext, Interval, PartialBatchSelector, Query, Report, ReportError, TaskId,
Time, TransitionVar,
request::RequestBody, AggregateShareReq, AggregationJobId, AggregationJobInitReq,
AggregationJobResp, BatchId, BatchSelector, Collection, CollectionJobId, CollectionReq,
Extension, HpkeCiphertext, Interval, PartialBatchSelector, Query, Report, ReportError,
TaskId, Time, TransitionVar,
},
roles::{leader::WorkItem, DapAggregator},
roles::{helper::HashedAggregationJobReq, leader::WorkItem, DapAggregator},
testing::InMemoryAggregator,
vdaf::{Prio3Config, VdafConfig},
DapAbort, DapAggregationJobState, DapAggregationParam, DapBatchBucket, DapBatchMode,
Expand Down
8 changes: 4 additions & 4 deletions crates/daphne/src/testing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ use crate::{
fatal_error,
hpke::{HpkeConfig, HpkeKemId, HpkeProvider, HpkeReceiverConfig},
messages::{
self, request::AggregationJobRequestHash, AggregationJobId, AggregationJobInitReq,
AggregationJobResp, Base64Encode, BatchId, BatchSelector, Collection, CollectionJobId,
HpkeCiphertext, Interval, PartialBatchSelector, Report, ReportId, TaskId, Time,
self, AggregationJobId, AggregationJobInitReq, AggregationJobResp, Base64Encode, BatchId,
BatchSelector, Collection, CollectionJobId, HpkeCiphertext, Interval, PartialBatchSelector,
Report, ReportId, TaskId, Time,
},
metrics::{prometheus::DaphnePromMetrics, DaphneMetrics},
roles::{
aggregator::{MergeAggShareError, TaskprovConfig},
helper,
helper::{self, AggregationJobRequestHash},
leader::{in_memory_leader::InMemoryLeaderState, WorkItem},
DapAggregator, DapHelper, DapLeader,
},
Expand Down

0 comments on commit 040b1ba

Please sign in to comment.