Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…der-failing' into 'master'

chore(sync-call): [NET-1688] Handle messages completing execution,  state reader failing.

Closes NET-1688

- Adds a concrete return type for the call v3 handler, `CallV3Response` that follows the IC-spec.
- Add two test cases to assert that `202` is returned if the state reader fails or the certified state snapshot fails.
- Bug fix: Future implementation for `CertificationSubscriber` failed to wake up the Waker. Removed the Future implementation in favor of an async `wait_for_certification()`. 

Closes NET-1688

See merge request dfinity-lab/public/ic!19760
  • Loading branch information
DSharifi committed Jun 12, 2024
2 parents d3fe128 + de55c8e commit 1e898a7
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 79 deletions.
160 changes: 98 additions & 62 deletions rs/http_endpoints/public/src/call/call_v3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use axum::{
use http::Request;
use hyper::StatusCode;
use ic_crypto_tree_hash::{sparse_labeled_tree_from_paths, Label, Path};
use ic_error_types::UserError;
use ic_interfaces_state_manager::StateReader;
use ic_logger::{error, warn};
use ic_replicated_state::ReplicatedState;
Expand All @@ -31,8 +32,71 @@ use std::{
sync::{Arc, RwLock},
time::Duration,
};
use tokio_util::time::FutureExt;
use tower::{util::BoxCloneService, ServiceBuilder};

enum CallV3Response {
Certificate(Certificate),
UserError(UserError),
Accepted(&'static str),
HttpError(HttpError),
}

impl IntoResponse for CallV3Response {
fn into_response(self) -> Response {
match self {
CallV3Response::Certificate(cert) => Cbor(CBOR::Map(BTreeMap::from([
(
CBOR::Text("status".to_string()),
CBOR::Text("replied".to_string()),
),
(
CBOR::Text("certificate".to_string()),
CBOR::Bytes(into_cbor(&cert)),
),
])))
.into_response(),

CallV3Response::UserError(user_err) => Cbor(CBOR::Map(BTreeMap::from([
(
CBOR::Text("status".to_string()),
CBOR::Text("non_replicated_rejection".to_string()),
),
(
CBOR::Text("error_code".to_string()),
CBOR::Text(user_err.code().to_string()),
),
(
CBOR::Text("reject_message".to_string()),
CBOR::Text(user_err.description().to_string()),
),
(
CBOR::Text("reject_code".to_string()),
CBOR::Integer(user_err.reject_code() as i128),
),
])))
.into_response(),

CallV3Response::Accepted(reason) => {
(StatusCode::ACCEPTED, reason.to_string()).into_response()
}

CallV3Response::HttpError(HttpError { status, message }) => {
(status, message).into_response()
}
}
}
}

impl From<IngressError> for CallV3Response {
fn from(err: IngressError) -> Self {
match err {
IngressError::UserError(user_err) => CallV3Response::UserError(user_err),
IngressError::HttpError(http_err) => CallV3Response::HttpError(http_err),
}
}
}

#[derive(Clone)]
pub struct CallServiceV3 {
ingress_watcher_handle: IngressWatcherHandle,
Expand Down Expand Up @@ -100,44 +164,25 @@ async fn call_sync_v3(
delegation_from_nns,
}): State<CallServiceV3>,
request: Cbor<HttpRequestEnvelope<HttpCallContent>>,
) -> Result<impl IntoResponse, Response> {
) -> CallV3Response {
let log = call_handler.log.clone();

let ingress_submitter = call_handler
let ingress_submitter = match call_handler
.validate_ingress_message(request, effective_canister_id)
.await
.map_err(|err| match err {
IngressError::UserError(user_err) => Cbor(CBOR::Map(BTreeMap::from([
(
CBOR::Text("status".to_string()),
CBOR::Text("non_replicated_rejection".to_string()),
),
(
CBOR::Text("error_code".to_string()),
CBOR::Text(user_err.code().to_string()),
),
(
CBOR::Text("reject_message".to_string()),
CBOR::Text(user_err.description().to_string()),
),
(
CBOR::Text("reject_code".to_string()),
CBOR::Integer(user_err.reject_code() as i128),
),
])))
.into_response(),
IngressError::HttpError(HttpError { status, message }) => {
(status, message).into_response()
}
})?;
{
Ok(ingress_submitter) => ingress_submitter,
Err(err) => return CallV3Response::from(err),
};

let message_id = ingress_submitter.message_id();

let certification_subscriber = match tokio::time::timeout(
Duration::from_secs(ingress_message_certificate_timeout_seconds),
ingress_watcher_handle.subscribe_for_certification(message_id.clone()),
)
.await
let timeout = Duration::from_secs(ingress_message_certificate_timeout_seconds);

let certification_subscriber = match ingress_watcher_handle
.subscribe_for_certification(message_id.clone())
.timeout(timeout)
.await
{
Ok(Ok(message_subscriber)) => Ok(message_subscriber),
Ok(Err(SubscriptionError::DuplicateSubscriptionError)) => {
Expand All @@ -162,29 +207,28 @@ async fn call_sync_v3(
}
};

ingress_submitter
.try_submit()
.map_err(|HttpError { status, message }| (status, message).into_response())?;

let make_accepted_response =
|reason: &str| Ok((StatusCode::ACCEPTED, reason.to_string()).into_response());
let ingres_submission = ingress_submitter.try_submit();

if let Err(ingress_submission) = ingres_submission {
return CallV3Response::HttpError(ingress_submission);
}
// The ingress message was submitted successfully.
// From this point on we only return a certificate or `Accepted 202``.
let certification_subscriber = match certification_subscriber {
Ok(certification_subscriber) => certification_subscriber,
Err(reason) => {
return make_accepted_response(reason);
return CallV3Response::Accepted(reason);
}
};

match tokio::time::timeout(
Duration::from_secs(ingress_message_certificate_timeout_seconds),
certification_subscriber,
)
.await
match certification_subscriber
.wait_for_certification()
.timeout(timeout)
.await
{
Ok(()) => (),
Err(_) => {
return make_accepted_response(
return CallV3Response::Accepted(
"Message did not complete execution and certification within the replica defined timeout.",
);
}
Expand All @@ -197,7 +241,7 @@ async fn call_sync_v3(
{
Ok(Some(certified_state_reader)) => certified_state_reader,
Ok(None) | Err(_) => {
return make_accepted_response(
return CallV3Response::Accepted(
"Certified state is not available. Please try /read_state.",
);
}
Expand All @@ -213,25 +257,17 @@ async fn call_sync_v3(
.expect("Path is within length bound.");

let Some((tree, certification)) = certified_state_reader.read_certified_state(&tree) else {
return make_accepted_response("Certified state is not available. Please try /read_state.");
return CallV3Response::Accepted(
"Certified state is not available. Please try /read_state.",
);
};

let delegation_from_nns = delegation_from_nns.read().unwrap().clone();
let signature = certification.signed.signature.signature.get().0;
let certified_response = Cbor(CBOR::Map(BTreeMap::from([
(
CBOR::Text("status".to_string()),
CBOR::Text("replied".to_string()),
),
(
CBOR::Text("certificate".to_string()),
CBOR::Bytes(into_cbor(&Certificate {
tree,
signature: Blob(signature),
delegation: delegation_from_nns,
})),
),
])));

Ok(certified_response.into_response())

CallV3Response::Certificate(Certificate {
tree,
signature: Blob(signature),
delegation: delegation_from_nns,
})
}
15 changes: 3 additions & 12 deletions rs/http_endpoints/public/src/call/ingress_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::metrics::HttpHandlerMetrics;
use futures::{Future, FutureExt};
use ic_async_utils::JoinMap;
use ic_logger::{error, ReplicaLogger};
use ic_types::{messages::MessageId, Height};
Expand Down Expand Up @@ -89,17 +88,9 @@ pub(crate) struct IngressCertificationSubscriber {
_drop_guard: DropGuard,
}

impl Future for IngressCertificationSubscriber {
type Output = ();

fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<()> {
self.certification_notifier
.notified()
.boxed()
.poll_unpin(cx)
impl IngressCertificationSubscriber {
pub(crate) async fn wait_for_certification(self) {
self.certification_notifier.notified().await;
}
}

Expand Down
4 changes: 0 additions & 4 deletions rs/http_endpoints/public/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,6 @@ fn basic_state_manager_mock() -> MockStateManager {
.expect_read_certified_state()
.returning(default_read_certified_state);

mock_state_manager
.expect_read_certified_state()
.returning(default_read_certified_state);

mock_state_manager
.expect_latest_certified_height()
.returning(default_latest_certified_height);
Expand Down
106 changes: 105 additions & 1 deletion rs/http_endpoints/public/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub mod common;

use crate::common::{
create_conn_and_send_request, default_get_latest_state, default_latest_certified_height,
get_free_localhost_socket_addr,
default_read_certified_state, get_free_localhost_socket_addr,
test_agent::{self, wait_for_status_healthy, IngressMessage},
HttpEndpointBuilder,
};
Expand Down Expand Up @@ -1287,3 +1287,107 @@ fn test_synchronous_call_endpoint_no_certification(
);
});
}

struct FakeCertifiedStateSnapshot;

impl CertifiedStateSnapshot for FakeCertifiedStateSnapshot {
type State = ReplicatedState;

fn get_state(&self) -> &ReplicatedState {
unimplemented!()
}

fn get_height(&self) -> Height {
unimplemented!()
}

fn read_certified_state(
&self,
_paths: &LabeledTree<()>,
) -> Option<(MixedHashTree, Certification)> {
None
}
}

/// Tests that the /v3/.../call endpoint responds with `202 ACCEPTED` for
/// ingress messages that complete execution, but the state reader fails to
/// read the certified state.
#[rstest]
#[case::certified_state_snapshot_unavailable(None)]
#[case::reading_certified_state_fails(Some(Box::new(FakeCertifiedStateSnapshot) as _))]
fn test_call_v3_response_when_state_reader_fails(
#[case] certified_state_snapshot: Option<
Box<dyn CertifiedStateSnapshot<State = ReplicatedState>>,
>,
) {
let rt = Runtime::new().unwrap();
let addr = get_free_localhost_socket_addr();
let config = Config {
listen_addr: addr,
max_request_size_bytes: 2048,
..Default::default()
};

let mut mock_state_manager = MockStateManager::new();
mock_state_manager
.expect_get_latest_state()
.returning(default_get_latest_state);

mock_state_manager
.expect_read_certified_state()
.returning(default_read_certified_state);

mock_state_manager
.expect_latest_certified_height()
.returning(default_latest_certified_height);

// Inject the mock certified state snapshot
mock_state_manager
.expect_get_certified_state_snapshot()
.return_once(move || certified_state_snapshot);

let mut handlers = HttpEndpointBuilder::new(rt.handle().clone(), config)
.with_state_manager(mock_state_manager)
.run();

let message = IngressMessage::default();

// Mock ingress filter to always accept the message.
rt.spawn(async move {
loop {
let (_, resp) = handlers.ingress_filter.next_request().await.unwrap();
resp.send_response(Ok(()))
}
});

rt.spawn(async move {
let new_ingress = handlers.ingress_rx.recv().await.unwrap();
let UnvalidatedArtifactMutation::Insert((message, _)) = new_ingress else {
panic!("Expected Insert");
};
let message_id = message.id();

// Execute the ingress and certify it.
handlers
.terminal_state_ingress_messages
.send((message_id, Height::from(0)))
.unwrap();
handlers
.certified_height_watcher
.send(Height::from(1))
.unwrap();
});

rt.block_on(async {
wait_for_status_healthy(&addr).await.unwrap();
let response = test_agent::Call::V3.call(addr, message).await;
let status = response.status();
let text = response.text().await;
assert_eq!(StatusCode::ACCEPTED, status, "{:?}", text.unwrap());

assert_eq!(
"Certified state is not available. Please try /read_state.",
text.unwrap()
)
});
}

0 comments on commit 1e898a7

Please sign in to comment.