Skip to content

Commit

Permalink
Merge commit 'd7c750c00ff90b7dd908babc562c77a31d97bf5c' into damir_ke…
Browse files Browse the repository at this point in the history
…ys_algo
  • Loading branch information
modship committed Jan 23, 2025
2 parents 15d57b9 + d7c750c commit fabd1ad
Show file tree
Hide file tree
Showing 32 changed files with 1,440 additions and 443 deletions.
30 changes: 30 additions & 0 deletions massa-api/src/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1222,6 +1222,16 @@ impl MassaRpcServer for API<Public> {
&self,
req: Vec<DeferredCallsQuoteRequest>,
) -> RpcResult<Vec<DeferredCallsQuoteResponse>> {
let current_network_version = self
.0
.keypair_factory
.mip_store
.get_network_version_current();

if current_network_version < 1 {
return Err(ApiError::NotFound.into());
}

if req.len() as u64 > self.0.api_settings.max_arguments {
return Err(ApiError::BadRequest("too many arguments".into()).into());
}
Expand Down Expand Up @@ -1267,6 +1277,16 @@ impl MassaRpcServer for API<Public> {
&self,
arg: Vec<String>,
) -> RpcResult<Vec<DeferredCallResponse>> {
let current_network_version = self
.0
.keypair_factory
.mip_store
.get_network_version_current();

if current_network_version < 1 {
return Err(ApiError::NotFound.into());
}

if arg.len() as u64 > self.0.api_settings.max_arguments {
return Err(ApiError::BadRequest("too many arguments".into()).into());
}
Expand Down Expand Up @@ -1307,6 +1327,16 @@ impl MassaRpcServer for API<Public> {
&self,
slots: Vec<Slot>,
) -> RpcResult<Vec<DeferredCallsSlotResponse>> {
let current_network_version = self
.0
.keypair_factory
.mip_store
.get_network_version_current();

if current_network_version < 1 {
return Err(ApiError::NotFound.into());
}

if slots.len() as u64 > self.0.api_settings.max_arguments {
return Err(ApiError::BadRequest("too many arguments".into()).into());
}
Expand Down
6 changes: 3 additions & 3 deletions massa-bootstrap/src/tests/universe_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ impl BootstrapServerTestUniverseBuilder {
let mut batch = DBBatch::default();
let versioning_batch = DBBatch::default();
self.final_ledger
.apply_changes_to_batch(ledger_changes, &mut batch);
.apply_changes_to_batch(ledger_changes, &mut batch, 1);
self.controllers
.database
.write()
Expand All @@ -216,7 +216,7 @@ impl BootstrapServerTestUniverseBuilder {
let mut batch = DBBatch::default();
let versioning_batch = DBBatch::default();
self.final_ledger
.apply_changes_to_batch(ledger_changes, &mut batch);
.apply_changes_to_batch(ledger_changes, &mut batch, 1);
self.controllers
.database
.write()
Expand All @@ -232,7 +232,7 @@ impl BootstrapServerTestUniverseBuilder {
let mut batch = DBBatch::default();
let versioning_batch = DBBatch::default();
self.final_ledger
.apply_changes_to_batch(ledger_changes, &mut batch);
.apply_changes_to_batch(ledger_changes, &mut batch, 1);
self.controllers
.database
.write()
Expand Down
69 changes: 62 additions & 7 deletions massa-event-cache/src/event_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,21 +677,28 @@ impl EventCache {
) -> Result<u64, ModelsError> {
match filter_item {
FilterItem::SlotStart(start) => {
let diff = self.last_slot.slots_since(start, self.thread_count)?;
// Note: Pessimistic estimation - should we keep an average count of events per slot
// and use that instead?
let diff = self
.last_slot
.slots_since(start, self.thread_count)
.unwrap_or(0); // If start is after self.last_slot, we have no events in the filter
// Note: Pessimistic estimation - should we keep an average count of events per slot
// and use that instead?
Ok(diff
.saturating_mul(self.max_events_per_operation)
.saturating_mul(self.max_operations_per_block))
}
FilterItem::SlotStartEnd(start, end) => {
let diff = end.slots_since(start, self.thread_count)?;
// If start is after end, we have no events in the filter as it is inconsistent
let diff = end.slots_since(start, self.thread_count).unwrap_or(0);
Ok(diff
.saturating_mul(self.max_events_per_operation)
.saturating_mul(self.max_operations_per_block))
}
FilterItem::SlotEnd(end) => {
let diff = end.slots_since(&self.first_slot, self.thread_count)?;
// If end is after self.first_slot, we have no events in the filter
let diff = end
.slots_since(&self.first_slot, self.thread_count)
.unwrap_or(0);
Ok(diff
.saturating_mul(self.max_events_per_operation)
.saturating_mul(self.max_operations_per_block))
Expand Down Expand Up @@ -944,7 +951,7 @@ mod tests {
use tempfile::TempDir;
// internal
use massa_models::config::{
MAX_EVENT_DATA_SIZE, MAX_EVENT_PER_OPERATION, MAX_OPERATIONS_PER_BLOCK,
MAX_EVENT_DATA_SIZE_V1, MAX_EVENT_PER_OPERATION, MAX_OPERATIONS_PER_BLOCK,
MAX_RECURSIVE_CALLS_DEPTH, THREAD_COUNT,
};
use massa_models::operation::OperationId;
Expand All @@ -959,7 +966,7 @@ mod tests {
300,
THREAD_COUNT,
MAX_RECURSIVE_CALLS_DEPTH,
MAX_EVENT_DATA_SIZE as u64,
MAX_EVENT_DATA_SIZE_V1 as u64,
MAX_EVENT_PER_OPERATION as u64,
MAX_OPERATIONS_PER_BLOCK as u64,
5000, // MAX_EVENTS_PER_QUERY,
Expand Down Expand Up @@ -1799,6 +1806,54 @@ mod tests {
assert_eq!(emit_addr_1_count, (threshold) as u64);
assert_eq!(emit_addr_2_count, (threshold + 1 + 2) as u64);

// Check if we correctly count the number of events in the DB with slot related filters
// First with filters that should give a count of 0 (too early, too late, or inconsistent start/end)
let slot_start_1 = Slot::new(3, 0); // start is after the last slot
let slot_end_1 = Slot::new(0, 0); // end is before the first slot
let slot_start_end_1 = Slot::new(2, 0)..Slot::new(1, 0); // start is after end

// Then with filters that should give a positive count
// num_thread * max_events_per_operation * max_operations_per_block
let slot_start_2 = Slot::new(1, 0);
let slot_end_2 = Slot::new(2, 0);
let slot_start_end_2 = Slot::new(1, 0)..Slot::new(2, 0);

let slot_start_count_1 = cache
.filter_item_estimate_count(&KeyIndent::Event, &FilterItem::SlotStart(slot_start_1))
.unwrap();
let slot_end_count_1 = cache
.filter_item_estimate_count(&KeyIndent::Event, &FilterItem::SlotEnd(slot_end_1))
.unwrap();
let slot_start_end_count_1 = cache
.filter_item_estimate_count(
&KeyIndent::Event,
&FilterItem::SlotStartEnd(slot_start_end_1.start, slot_start_end_1.end),
)
.unwrap();

let slot_start_count_2 = cache
.filter_item_estimate_count(&KeyIndent::Event, &FilterItem::SlotStart(slot_start_2))
.unwrap();
let slot_end_count_2 = cache
.filter_item_estimate_count(&KeyIndent::Event, &FilterItem::SlotEnd(slot_end_2))
.unwrap();
let slot_start_end_count_2 = cache
.filter_item_estimate_count(
&KeyIndent::Event,
&FilterItem::SlotStartEnd(slot_start_end_2.start, slot_start_end_2.end),
)
.unwrap();

assert_eq!(slot_start_count_1, 0);
assert_eq!(slot_end_count_1, 0);
assert_eq!(slot_start_end_count_1, 0);

let expected_count_per_period =
THREAD_COUNT as u64 * MAX_EVENT_PER_OPERATION as u64 * MAX_OPERATIONS_PER_BLOCK as u64;
assert_eq!(slot_start_count_2, expected_count_per_period);
assert_eq!(slot_end_count_2, 2 * expected_count_per_period); // 2 periods as first_slot is (0,0)
assert_eq!(slot_start_end_count_2, expected_count_per_period);

// Check if we query first by emitter address then is_error

let filter_1 = EventFilter {
Expand Down
19 changes: 19 additions & 0 deletions massa-execution-exports/src/mapping_grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use massa_proto_rs::massa::model::v1 as grpc_model;
/// Convert a `grpc_api::ScExecutionEventsRequest` to a `ScExecutionEventsRequest`
pub fn to_querystate_filter(
query: grpc_api::ExecutionQueryRequestItem,
network_version: u32,
max_datastore_query_config: Option<u32>,
max_datastore_key_length: u8,
) -> Result<ExecutionQueryRequestItem, ModelsError> {
Expand Down Expand Up @@ -188,6 +189,12 @@ pub fn to_querystate_filter(
Ok(ExecutionQueryRequestItem::Events(event_filter))
}
exec::RequestItem::DeferredCallQuote(value) => {
if network_version < 1 {
return Err(ModelsError::InvalidVersionError(
"deferred call quote is not supported in this network version".to_string(),
));
}

Ok(ExecutionQueryRequestItem::DeferredCallQuote {
target_slot: value
.target_slot
Expand All @@ -200,10 +207,22 @@ pub fn to_querystate_filter(
})
}
exec::RequestItem::DeferredCallInfo(info) => {
if network_version < 1 {
return Err(ModelsError::InvalidVersionError(
"deferred call quote is not supported in this network version".to_string(),
));
}

let id = DeferredCallId::from_str(&info.call_id)?;
Ok(ExecutionQueryRequestItem::DeferredCallInfo(id))
}
exec::RequestItem::DeferredCallsBySlot(value) => {
if network_version < 1 {
return Err(ModelsError::InvalidVersionError(
"deferred call quote is not supported in this network version".to_string(),
));
}

Ok(ExecutionQueryRequestItem::DeferredCallsBySlot(
value
.slot
Expand Down
4 changes: 3 additions & 1 deletion massa-execution-exports/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ pub struct ExecutionConfig {
/// slot execution outputs channel capacity
pub broadcast_slot_execution_output_channel_capacity: usize,
/// max size of event data, in bytes
pub max_event_size: usize,
pub max_event_size_v0: usize,
/// max size of event data, in bytes
pub max_event_size_v1: usize,
/// chain id
pub chain_id: u64,
/// whether slot execution traces broadcast is enabled
Expand Down
3 changes: 2 additions & 1 deletion massa-execution-exports/src/test_exports/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ impl Default for ExecutionConfig {
denunciation_expire_periods: DENUNCIATION_EXPIRE_PERIODS,
broadcast_enabled: true,
broadcast_slot_execution_output_channel_capacity: 5000,
max_event_size: 512,
max_event_size_v0: 50_000,
max_event_size_v1: 512,
max_event_per_operation: 25,
max_function_length: 1000,
max_parameter_length: 1000,
Expand Down
2 changes: 2 additions & 0 deletions massa-execution-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ test-exports = [
"massa_pos_worker",
"massa_metrics/test-exports",
"massa_metrics/test-exports",
"massa_versioning/test-exports",
"massa_db_worker",
"massa_event_cache/test-exports",
"tempfile",
Expand Down Expand Up @@ -109,6 +110,7 @@ massa_metrics = { workspace = true, features = ["test-exports"] }
massa_db_worker = { workspace = true }
tempfile = { workspace = true }
massa_test_framework = { workspace = true, "features" = ["test-exports"] }
massa_versioning = { workspace = true, "features" = ["test-exports"] }
tokio = { workspace = true, features = ["sync"] }
hex-literal = { workspace = true }
mockall = { workspace = true }
Expand Down
26 changes: 16 additions & 10 deletions massa-execution-worker/src/active_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ impl ActiveHistory {
&self,
message_id: &AsyncMessageId,
mut current_updates: AsyncMessageUpdate,
execution_compound_version: u32,
) -> HistorySearchResult<SetUpdateOrDelete<AsyncMessage, AsyncMessageUpdate>> {
for history_element in self.0.iter().rev() {
match history_element
Expand All @@ -81,11 +82,16 @@ impl ActiveHistory {
msg.apply(current_updates);
return HistorySearchResult::Present(SetUpdateOrDelete::Set(msg));
}
Some(SetUpdateOrDelete::Update(msg_update)) => {
let mut combined_message_update = msg_update.clone();
combined_message_update.apply(current_updates);
current_updates = combined_message_update;
}
Some(SetUpdateOrDelete::Update(msg_update)) => match execution_compound_version {
0 => {
current_updates.apply(msg_update.clone());
}
_ => {
let mut combined_message_update = msg_update.clone();
combined_message_update.apply(current_updates);
current_updates = combined_message_update;
}
},
Some(SetUpdateOrDelete::Delete) => return HistorySearchResult::Absent,
_ => (),
}
Expand Down Expand Up @@ -498,14 +504,14 @@ mod test {
// Test fetch_message with message_id (expect HistorySearchResult::Absent)
{
let current_updates = AsyncMessageUpdate::default();
let fetched = active_history.fetch_message(&message_id, current_updates);
let fetched = active_history.fetch_message(&message_id, current_updates, 1);
assert_matches!(fetched, HistorySearchResult::Absent);
}

// Test fetch_message with message_id_2 (expect HistorySearchResult::Set)
{
let current_updates = AsyncMessageUpdate::default();
let fetched = active_history.fetch_message(&message_id_2, current_updates);
let fetched = active_history.fetch_message(&message_id_2, current_updates, 1);

if let HistorySearchResult::Present(SetUpdateOrDelete::Set(msg)) = fetched {
assert_eq!(msg, msg_2);
Expand All @@ -526,7 +532,7 @@ mod test {
validity_end: SetOrKeep::Set(validity_end_new),
..Default::default()
};
let fetched = active_history.fetch_message(&message_id_2, current_updates);
let fetched = active_history.fetch_message(&message_id_2, current_updates, 1);

if let HistorySearchResult::Present(SetUpdateOrDelete::Set(msg)) = fetched {
assert_ne!(msg, msg_2);
Expand All @@ -542,7 +548,7 @@ mod test {
// Test fetch_message with message_id_3 (expect HistorySearchResult::Present)
{
let current_updates = AsyncMessageUpdate::default();
let fetched = active_history.fetch_message(&message_id_3, current_updates);
let fetched = active_history.fetch_message(&message_id_3, current_updates, 1);

if let HistorySearchResult::Present(SetUpdateOrDelete::Set(msg)) = fetched {
// Check the updates were applied correctly
Expand All @@ -561,7 +567,7 @@ mod test {
// Expect updates to be empty (or default) here
{
let current_updates = AsyncMessageUpdate::default();
let fetched = active_history.fetch_message(&message_id_3_2, current_updates);
let fetched = active_history.fetch_message(&message_id_3_2, current_updates, 1);
if let HistorySearchResult::Present(SetUpdateOrDelete::Update(updates)) = fetched {
assert_eq!(updates, AsyncMessageUpdate::default());
} else {
Expand Down
Loading

0 comments on commit fabd1ad

Please sign in to comment.