Skip to content

Commit

Permalink
Process each expired epoch in separate Kafka + DB transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
DJAndries committed Nov 15, 2024
1 parent 8ff985e commit 82d2bb6
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 16 deletions.
13 changes: 2 additions & 11 deletions src/aggregator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ mod spot;

use crate::aggregator::spot::check_spot_termination_status;
use crate::epoch::EpochConfig;
use crate::models::{
begin_db_transaction, commit_db_transaction, DBConnectionType, DBPool, DBStorageConnections,
PgStoreError,
};
use crate::models::{DBConnectionType, DBPool, DBStorageConnections, PgStoreError};
use crate::profiler::{Profiler, ProfilerStat};
use crate::record_stream::{
get_data_channel_topic_from_env, KafkaRecordStream, KafkaRecordStreamConfig, RecordStream,
Expand Down Expand Up @@ -214,21 +211,15 @@ pub async fn start_aggregation(
let mut out_stream = create_output_stream(output_measurements_to_stdout, channel_name)?;
if let Some(out_stream) = out_stream.as_mut() {
out_stream.init_producer_queues().await;
out_stream.begin_producer_transaction()?;
}
let db_conn = Arc::new(Mutex::new(db_pool.get().await?));
begin_db_transaction(db_conn.clone())?;
process_expired_epochs(
db_conn.clone(),
&epoch_config,
out_stream.as_ref().map(|v| v.as_ref()),
out_stream.as_ref(),
profiler.clone(),
)
.await?;
if let Some(out_stream) = out_stream.as_ref() {
wait_and_commit_producer(out_stream).await?;
}
commit_db_transaction(db_conn)?;
info!("Profiler summary:\n{}", profiler.summary().await);

info!("Finished aggregation");
Expand Down
21 changes: 16 additions & 5 deletions src/aggregator/processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ use super::group::{GroupedMessages, MessageChunk};
use super::recovered::RecoveredMessages;
use super::report::report_measurements;
use super::AggregatorError;
use crate::aggregator::wait_and_commit_producer;
use crate::epoch::EpochConfig;
use crate::models::{
DBConnection, DBPool, DBStorageConnections, MessageWithThreshold, PendingMessage,
RecoveredMessage,
begin_db_transaction, commit_db_transaction, DBConnection, DBPool, DBStorageConnections,
MessageWithThreshold, PendingMessage, RecoveredMessage,
};
use crate::profiler::{Profiler, ProfilerStat};
use crate::record_stream::{DynRecordStream, RecordStreamArc};
use crate::record_stream::RecordStreamArc;
use crate::star::{recover_key, recover_msgs, AppSTARError, MsgRecoveryInfo};
use star_constellation::api::NestedMessage;
use star_constellation::Error as ConstellationError;
Expand All @@ -20,7 +21,7 @@ use tokio::task::JoinHandle;
pub async fn process_expired_epochs(
conn: Arc<Mutex<DBConnection>>,
epoch_config: &EpochConfig,
out_stream: Option<&DynRecordStream>,
out_stream: Option<&RecordStreamArc>,
profiler: Arc<Profiler>,
) -> Result<(), AggregatorError> {
let epochs = RecoveredMessage::list_distinct_epochs(conn.clone()).await?;
Expand All @@ -29,6 +30,11 @@ pub async fn process_expired_epochs(
continue;
}
info!("Detected expired epoch '{}', processing...", epoch);
if let Some(out_stream) = out_stream.as_ref() {
out_stream.begin_producer_transaction()?;
}
begin_db_transaction(conn.clone())?;

let mut rec_msgs = RecoveredMessages::default();
rec_msgs
.fetch_all_recovered_with_nonzero_count(conn.clone(), epoch as u8, profiler.clone())
Expand All @@ -39,12 +45,17 @@ pub async fn process_expired_epochs(
epoch_config,
epoch as u8,
true,
out_stream,
out_stream.as_ref().map(|v| v.as_ref()),
profiler.clone(),
)
.await?;
RecoveredMessage::delete_epoch(conn.clone(), epoch, profiler.clone()).await?;
PendingMessage::delete_epoch(conn.clone(), epoch, profiler.clone()).await?;

if let Some(out_stream) = out_stream.as_ref() {
wait_and_commit_producer(out_stream).await?;
}
commit_db_transaction(conn.clone())?;
}
Ok(())
}
Expand Down

0 comments on commit 82d2bb6

Please sign in to comment.