Skip to content

Commit

Permalink
Merge pull request #656 from primitivefinance/colin/fix-event-logger
Browse files Browse the repository at this point in the history
`data_collection.rs` stability improvements
  • Loading branch information
0xJepsen authored Oct 27, 2023
2 parents 1e983a6 + e69eb77 commit dc7bb8c
Show file tree
Hide file tree
Showing 8 changed files with 325 additions and 195 deletions.
214 changes: 135 additions & 79 deletions Cargo.lock

Large diffs are not rendered by default.

212 changes: 124 additions & 88 deletions arbiter-core/src/data_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,27 @@
//! `Sync`, and has a static lifetime.
//! * `E` - Type that implements the `EthLogDecode`, `Debug`, `Serialize`
//! traits, and has a static lifetime.
use std::{collections::BTreeMap, env::current_dir, fmt::Debug, sync::Arc};
use std::{
collections::BTreeMap, fmt::Debug, io::BufWriter, marker::PhantomData, mem::transmute,
sync::Arc,
};

use ethers::{
abi::RawLog,
contract::{builders::Event, EthLogDecode},
providers::StreamExt as ProviderStreamExt,
providers::Middleware,
types::{Filter, FilteredParams},
};
use serde::Serialize;
use serde_json::Value;
use tokio::io::AsyncWriteExt;
use tracing::info;

use crate::middleware::{errors::RevmMiddlewareError, RevmMiddleware};
use crate::{
environment::Broadcast,
middleware::{cast::revm_logs_to_ethers_logs, errors::RevmMiddlewareError, RevmMiddleware},
};

type FilterDecoder =
BTreeMap<String, (FilteredParams, Box<dyn Fn(&RawLog) -> String + Send + Sync>)>;
/// `EventLogger` is a struct that logs events from the Ethereum network.
///
/// It contains a BTreeMap of events, where each event is represented by a
Expand All @@ -45,8 +53,10 @@ use crate::middleware::{errors::RevmMiddlewareError, RevmMiddleware};
/// * `E` - Type that implements the `EthLogDecode`, `Debug`, `Serialize`
/// traits, and has a static lifetime.
pub struct EventLogger {
events: tokio::task::JoinSet<()>,
path: Option<String>,
directory: Option<String>,
file_name: Option<String>,
decoder: FilterDecoder,
receiver: Option<crossbeam_channel::Receiver<Broadcast>>,
}

impl EventLogger {
Expand All @@ -58,8 +68,10 @@ impl EventLogger {
/// no specified path.
pub fn builder() -> Self {
Self {
events: tokio::task::JoinSet::new(),
path: None,
directory: None,
file_name: None,
decoder: BTreeMap::new(),
receiver: None,
}
}

Expand All @@ -73,93 +85,59 @@ impl EventLogger {
/// # Returns
///
/// The `EventLogger` instance with the added event.
pub fn add<S: Into<String>, E: EthLogDecode + Debug + Serialize + 'static>(
pub fn add<S: Into<String>, D: EthLogDecode + Debug + Serialize + 'static>(
mut self,
event: Event<Arc<RevmMiddleware>, RevmMiddleware, E>,
event: Event<Arc<RevmMiddleware>, RevmMiddleware, D>,
name: S,
) -> Self {
let name = name.into();
let event_dir = current_dir()
.unwrap()
.join(self.path.clone().unwrap_or("events".into()))
.join(name);
std::fs::create_dir_all(&event_dir).unwrap();
self.events.spawn(async move {
let mut stream = event.stream().await.unwrap();
let mut files: BTreeMap<String, tokio::fs::File> = BTreeMap::new();
let mut columns_written: BTreeMap<String, bool> = BTreeMap::new();
while let Some(Ok(log)) = stream.next().await {
let serialized = serde_json::to_string(&log).unwrap();
let deserialized: BTreeMap<String, Value> =
serde_json::from_str(&serialized).unwrap();
let (key, value) = deserialized.iter().next().unwrap();
let file_name = event_dir.join(format!("{}.csv", key));
let file_key = file_name.to_str().unwrap();
let file_value = files.get(file_key);
let toggle_written_columns = columns_written.get(file_key).unwrap_or(&false);
if file_value.is_none() {
files.insert(
file_key.into(),
tokio::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&file_name)
.await
.unwrap(),
);
}

let file = files.get_mut(file_key).unwrap();

if toggle_written_columns == &true {
let values = value
.as_object()
.unwrap()
.values()
.map(|x| x.to_string())
.collect::<Vec<String>>()
.join(",");
file.write_all(values.as_bytes()).await.unwrap();
file.write_all("\n".as_bytes()).await.unwrap();
} else {
columns_written.entry(file_key.into()).or_insert(true);
let columns = value
.as_object()
.unwrap()
.keys()
.map(|x| x.to_string())
.collect::<Vec<String>>()
.join(",");
file.write_all(columns.as_bytes()).await.unwrap();
file.write_all("\n".as_bytes()).await.unwrap();
let values = value
.as_object()
.unwrap()
.values()
.map(|x| x.to_string())
.collect::<Vec<String>>()
.join(",");
file.write_all(values.as_bytes()).await.unwrap();
file.write_all("\n".as_bytes()).await.unwrap();
}
continue;
}
});
// Grab the connection from the client and add a new event sender so that we
// have a distinct channel to now receive events over
let event_transmuted: EventTransmuted<Arc<RevmMiddleware>, RevmMiddleware, D> =
unsafe { transmute(event) };
let middleware = event_transmuted.provider.clone();
let decoder = |x: &_| serde_json::to_string(&D::decode_log(x).unwrap()).unwrap();
let filter = event_transmuted.filter.clone();
self.decoder.insert(
name.into(),
(FilteredParams::new(Some(filter)), Box::new(decoder)),
);
let connection = middleware.provider().as_ref();
if self.receiver.is_none() {
let (event_sender, event_receiver) = crossbeam_channel::unbounded::<Broadcast>();
connection
.event_broadcaster
.lock()
.unwrap()
.add_sender(event_sender);
self.receiver = Some(event_receiver);
}
self
}
/// Sets the directory for the `EventLogger`.
///
/// # Arguments
///
/// * `directory` - The directory where the event logs will be stored.
///
/// # Returns
///
/// The `EventLogger` instance with the specified directory.
pub fn directory<S: Into<String>>(mut self, path: S) -> Self {
self.directory = Some(path.into());
self
}

/// Sets the path for the `EventLogger`.
/// Sets the output file name for the `EventLogger`.
///
/// # Arguments
///
/// * `path` - The path where the event logs will be stored.
/// * `file_name` - The file where the event logs will be stored.
///
/// # Returns
///
/// The `EventLogger` instance with the specified path.
pub fn path<S: Into<String>>(mut self, path: S) -> Self {
self.path = Some(path.into());
/// The `EventLogger` instance with the specified file.
pub fn file_name<S: Into<String>>(mut self, path: S) -> Self {
self.file_name = Some(path.into());
self
}

Expand All @@ -184,12 +162,70 @@ impl EventLogger {
/// This function will return an error if there is a problem creating the
/// directories or files, or writing to the files.
pub fn run(self) -> Result<(), RevmMiddlewareError> {
tokio::spawn(async move {
let mut set = self.events;
while let Some(res) = set.join_next().await {
info!("task completed: {:?}", res);
let receiver = self.receiver.unwrap();
let dir = self.directory.unwrap_or("./out".into());
let file_name = self.file_name.unwrap_or("output".into());
std::thread::spawn(move || {
let mut logs: BTreeMap<String, BTreeMap<String, Vec<Value>>> = BTreeMap::new();
while let Ok(broadcast) = receiver.recv() {
match broadcast {
Broadcast::StopSignal => {
// create new directory with path
let output_dir = std::env::current_dir().unwrap().join(dir);
std::fs::create_dir_all(&output_dir).unwrap();
let file_path = output_dir.join(format!("{}.json", file_name));
let file = std::fs::File::create(file_path).unwrap();
let writer = BufWriter::new(file);
serde_json::to_writer(writer, &logs).expect("Unable to write data");
break;
}
Broadcast::Event(event) => {
let ethers_logs = revm_logs_to_ethers_logs(event);
for log in ethers_logs {
for (contract_name, (filter, decoder)) in self.decoder.iter() {
if filter.filter_address(&log) && filter.filter_topics(&log) {
let cloned_logs = log.clone();
let event_as_value = serde_json::from_str::<Value>(&decoder(
&cloned_logs.into(),
))
.unwrap();
let event_as_object = event_as_value.as_object().unwrap();

let contract = logs.get(contract_name);
if contract.is_none() {
logs.insert(contract_name.clone(), BTreeMap::new());
}
let contract = logs.get_mut(contract_name).unwrap();

let event_name =
event_as_object.clone().keys().collect::<Vec<&String>>()[0]
.clone();

let event = contract.get_mut(&event_name);
if event.is_none() {
contract.insert(event_name.to_string(), vec![]);
}
let event = contract.get_mut(&event_name).unwrap();

for (_key, value) in event_as_object {
event.push(value.clone());
}
}
}
}
}
}
}
});
Ok(())
}
}

struct EventTransmuted<B, M, D> {
/// The event filter's state
pub filter: Filter,
pub(crate) provider: B,
/// Stores the event datatype
pub(crate) datatype: PhantomData<D>,
pub(crate) _m: PhantomData<M>,
}
2 changes: 1 addition & 1 deletion arbiter-core/src/environment/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub enum EnvironmentError {
/// [`EventBroadcaster`] fails to broadcast events. This should be
/// rare (if not impossible). If this is thrown, please report this error!
#[error("error broadcasting! the source error is: {0}")]
Broadcast(#[from] crossbeam_channel::SendError<Vec<Log>>),
Broadcast(#[from] crossbeam_channel::SendError<Broadcast>),

/// [`EnvironmentError::Conversion`] is thrown when a type fails to
/// convert into another (typically a type used in `revm` versus a type used
Expand Down
37 changes: 32 additions & 5 deletions arbiter-core/src/environment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub(crate) type OutcomeReceiver = Receiver<Result<Outcome, EnvironmentError>>;

/// Alias for the sender used in the [`EventBroadcaster`] that transmits
/// contract events via [`Log`].
pub(crate) type EventSender = Sender<Vec<Log>>;
pub(crate) type EventSender = Sender<Broadcast>;

/// Represents a sandboxed EVM environment.
///
Expand Down Expand Up @@ -487,7 +487,7 @@ impl Environment {
transaction_index: transaction_index.into(),
cumulative_gas_per_block,
};
event_broadcaster.broadcast(execution_result.logs())?;
event_broadcaster.broadcast(Some(execution_result.logs()), false)?;
outcome_sender
.send(Ok(Outcome::TransactionCompleted(
execution_result,
Expand Down Expand Up @@ -581,6 +581,7 @@ impl Environment {
outcome_sender
.send(Ok(Outcome::StopCompleted))
.map_err(|e| EnvironmentError::Communication(e.to_string()))?;
event_broadcaster.lock().unwrap().broadcast(None, true)?;
break;
}
}
Expand Down Expand Up @@ -647,6 +648,22 @@ pub(crate) struct Socket {
pub(crate) event_broadcaster: Arc<Mutex<EventBroadcaster>>,
}

/// Enum representing the types of broadcasts that can be sent.
///
/// This enum is used to differentiate between different types of broadcasts
/// that can be sent from the environment to external entities.
///
/// Variants:
/// * `StopSignal`: Represents a signal to stop the event logger process.
/// * `Event(Vec<Log>)`: Represents a broadcast of a vector of Ethereum logs.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Broadcast {
/// Represents a signal to stop the event logger process.
StopSignal,
/// Represents a broadcast of a vector of Ethereum logs.
Event(Vec<Log>),
}

/// Responsible for broadcasting Ethereum logs to subscribers.
///
/// Maintains a list of senders to which logs are sent whenever they are
Expand All @@ -668,9 +685,19 @@ impl EventBroadcaster {

/// Loop through each sender and send `Vec<Log>` emitted from a transaction
/// downstream to any and all receivers
fn broadcast(&self, logs: Vec<Log>) -> Result<(), EnvironmentError> {
for sender in &self.0 {
sender.send(logs.clone())?;
fn broadcast(&self, logs: Option<Vec<Log>>, stop_signal: bool) -> Result<(), EnvironmentError> {
if stop_signal {
for sender in &self.0 {
sender.send(Broadcast::StopSignal)?;
}
return Ok(());
} else {
if logs.is_none() {
panic!("This shouldn't happen, but we should probably make this an error. Somehow we got told to broadcast `None` logs!");
}
for sender in &self.0 {
sender.send(Broadcast::Event(logs.clone().unwrap()))?;
}
}
Ok(())
}
Expand Down
29 changes: 20 additions & 9 deletions arbiter-core/src/middleware/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use ethers::{
use serde::{de::DeserializeOwned, Serialize};

use super::cast::revm_logs_to_ethers_logs;
use crate::environment::{EventBroadcaster, InstructionSender, OutcomeReceiver, OutcomeSender};
use crate::environment::{
Broadcast, EventBroadcaster, InstructionSender, OutcomeReceiver, OutcomeSender,
};

/// Represents a connection to the EVM contained in the corresponding
/// [`Environment`].
Expand Down Expand Up @@ -90,13 +92,22 @@ impl JsonRpcClient for Connection {
))?;
let mut logs = vec![];
let filtered_params = FilteredParams::new(Some(filter_receiver.filter.clone()));
if let Ok(received_logs) = filter_receiver.receiver.try_recv() {
let ethers_logs = revm_logs_to_ethers_logs(received_logs);
for log in ethers_logs {
if filtered_params.filter_address(&log)
&& filtered_params.filter_topics(&log)
{
logs.push(log);
if let Ok(broadcast) = filter_receiver.receiver.try_recv() {
match broadcast {
Broadcast::Event(received_logs) => {
let ethers_logs = revm_logs_to_ethers_logs(received_logs);
for log in ethers_logs {
if filtered_params.filter_address(&log)
&& filtered_params.filter_topics(&log)
{
logs.push(log);
}
}
}
Broadcast::StopSignal => {
return Err(ProviderError::CustomError(
"The `EventBroadcaster` has stopped!".to_string(),
))
}
}
}
Expand All @@ -121,5 +132,5 @@ pub(crate) struct FilterReceiver {

/// The receiver for the channel that receives logs from the broadcaster.
/// These are filtered upon reception.
pub(crate) receiver: crossbeam_channel::Receiver<Vec<revm::primitives::Log>>,
pub(crate) receiver: crossbeam_channel::Receiver<Broadcast>,
}
Loading

0 comments on commit dc7bb8c

Please sign in to comment.