Skip to content

Commit

Permalink
updating quic plugin to v2.1.9
Browse files Browse the repository at this point in the history
  • Loading branch information
godmodegalactus committed Jan 24, 2025
1 parent 753d264 commit 8615148
Show file tree
Hide file tree
Showing 16 changed files with 910 additions and 242 deletions.
979 changes: 802 additions & 177 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ license = "AGPL"
edition = "2021"

[workspace.dependencies]
solana-sdk = "~2.0.19"
solana-program = "~2.0.19"
solana-transaction-status = "~2.0.19"
solana-logger = "~2.0.19"
solana-rpc-client = "~2.0.19"
solana-rpc-client-api = "~2.0.19"
solana-account-decoder = "~2.0.19"
agave-geyser-plugin-interface = "=2.0.19"
solana-sdk = "~2.1.9"
solana-program = "~2.1.9"
solana-transaction-status = "~2.1.9"
solana-logger = "~2.1.9"
solana-rpc-client = "~2.1.9"
solana-rpc-client-api = "~2.1.9"
solana-account-decoder = "~2.1.9"
agave-geyser-plugin-interface = "=2.1.9"

itertools = "0.10.5"
serde = "1.0.201"
Expand Down
26 changes: 17 additions & 9 deletions block-builder/src/block_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,23 @@ pub fn build_blocks(
}
}
}
ChannelMessage::Slot(slot, _parent_slot, commitment) => {
if commitment.is_finalized() {
// dispactch partially build blocks if not already dispatched
dispatch_partial_block(
&mut partially_build_blocks,
slot,
&output,
compression_type,
);
ChannelMessage::Slot(slot, _parent_slot, slot_status) => {
match slot_status {
quic_geyser_common::types::block_meta::SlotStatus::Finalized => {
// dispactch partially build blocks if not already dispatched
dispatch_partial_block(
&mut partially_build_blocks,
slot,
&output,
compression_type,
);
}
quic_geyser_common::types::block_meta::SlotStatus::Dead => {
partially_build_blocks.remove(&slot);
}
_ => {
// do nothing
}
}
}
ChannelMessage::BlockMeta(meta) => {
Expand Down
7 changes: 3 additions & 4 deletions block-builder/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@ use itertools::Itertools;
use quic_geyser_common::{
channel_message::{AccountData, ChannelMessage},
types::{
block_meta::BlockMeta,
block_meta::{BlockMeta, SlotStatus},
slot_identifier::SlotIdentifier,
transaction::{Transaction, TransactionMeta},
},
};
use solana_sdk::{
account::Account,
commitment_config::CommitmentConfig,
hash::Hash,
message::{
v0::{LoadedAddresses, Message as SolanaMessage},
Expand Down Expand Up @@ -884,7 +883,7 @@ fn test_block_creation_incomplete_slot() {
.send(ChannelMessage::Transaction(Box::new(tx2.clone())))
.unwrap();
channelmsg_sx
.send(ChannelMessage::Slot(5, 4, CommitmentConfig::processed()))
.send(ChannelMessage::Slot(5, 4, SlotStatus::Processed))
.unwrap();

let tx3 = Transaction {
Expand Down Expand Up @@ -926,7 +925,7 @@ fn test_block_creation_incomplete_slot() {
sleep(Duration::from_millis(1));
assert_eq!(msg_rx.try_recv(), Err(TryRecvError::Empty));
channelmsg_sx
.send(ChannelMessage::Slot(5, 4, CommitmentConfig::finalized()))
.send(ChannelMessage::Slot(5, 4, SlotStatus::Finalized))
.unwrap();
sleep(Duration::from_millis(1));

Expand Down
13 changes: 6 additions & 7 deletions blocking_client/src/quiche_client_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,10 @@ mod tests {
filters::Filter,
message::Message,
net::parse_host_port,
types::{block_meta::SlotMeta, connections_parameters::ConnectionParameters},
types::{
block_meta::{SlotMeta, SlotStatus},
connections_parameters::ConnectionParameters,
},
};

use super::client_loop;
Expand All @@ -384,11 +387,7 @@ mod tests {
let port = 10900;
let maximum_concurrent_streams = 100;

let message_1 = ChannelMessage::Slot(
3,
2,
solana_sdk::commitment_config::CommitmentConfig::confirmed(),
);
let message_1 = ChannelMessage::Slot(3, 2, SlotStatus::Confirmed);
let message_2 = ChannelMessage::Account(
AccountData {
pubkey: Pubkey::new_unique(),
Expand Down Expand Up @@ -514,7 +513,7 @@ mod tests {
Message::SlotMsg(SlotMeta {
slot: 3,
parent: 2,
commitment_config: solana_sdk::commitment_config::CommitmentConfig::confirmed()
slot_status: SlotStatus::Confirmed
})
);

Expand Down
7 changes: 3 additions & 4 deletions common/src/channel_message.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use solana_sdk::{
account::Account, clock::Slot, commitment_config::CommitmentConfig, pubkey::Pubkey,
};
use solana_sdk::{account::Account, clock::Slot, pubkey::Pubkey};

use crate::types::block_meta::SlotStatus;
use crate::types::{block::Block, block_meta::BlockMeta, transaction::Transaction};

#[derive(Debug, Clone, PartialEq, Eq)]
Expand All @@ -14,7 +13,7 @@ pub struct AccountData {
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChannelMessage {
Account(AccountData, Slot, bool),
Slot(u64, u64, CommitmentConfig),
Slot(u64, u64, SlotStatus),
BlockMeta(BlockMeta),
Transaction(Box<Transaction>),
Block(Block),
Expand Down
16 changes: 12 additions & 4 deletions common/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl Message {
mod tests {
use itertools::Itertools;
use rand::{rngs::ThreadRng, Rng};
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};
use solana_sdk::pubkey::Pubkey;

use crate::types::{
account::Account,
Expand All @@ -76,7 +76,15 @@ mod tests {
0 => Message::SlotMsg(SlotMeta {
slot: rng.gen(),
parent: rng.gen(),
commitment_config: CommitmentConfig::processed(),
slot_status: match rng.gen::<u8>() % 6 {
0 => crate::types::block_meta::SlotStatus::Processed,
1 => crate::types::block_meta::SlotStatus::Confirmed,
2 => crate::types::block_meta::SlotStatus::Finalized,
3 => crate::types::block_meta::SlotStatus::FirstShredReceived,
4 => crate::types::block_meta::SlotStatus::LastShredReceived,
5 => crate::types::block_meta::SlotStatus::Dead,
_ => unreachable!(),
},
}),
1 => {
let data_length = rng.gen_range(10..128);
Expand Down Expand Up @@ -116,7 +124,7 @@ mod tests {
let message = Message::SlotMsg(SlotMeta {
slot: 73282,
parent: 8392983,
commitment_config: CommitmentConfig::finalized(),
slot_status: crate::types::block_meta::SlotStatus::Finalized,
});
let binary = message.to_binary_stream();
assert_eq!(binary.len(), 32);
Expand All @@ -127,7 +135,7 @@ mod tests {
let message = Message::SlotMsg(SlotMeta {
slot: 73282,
parent: 8392983,
commitment_config: CommitmentConfig::finalized(),
slot_status: crate::types::block_meta::SlotStatus::Finalized,
});
let binary = message.to_binary_stream();
let (msg_2, _) = Message::from_binary_stream(&binary).unwrap();
Expand Down
6 changes: 3 additions & 3 deletions common/src/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ mod tests {
use circular_buffer::CircularBuffer;
use itertools::Itertools;
use rand::{rngs::ThreadRng, Rng};
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};
use solana_sdk::pubkey::Pubkey;

use crate::{
message::Message,
types::{
account::Account,
block_meta::{BlockMeta, SlotMeta},
block_meta::{BlockMeta, SlotMeta, SlotStatus},
slot_identifier::SlotIdentifier,
},
};
Expand All @@ -101,7 +101,7 @@ mod tests {
0 => Message::SlotMsg(SlotMeta {
slot: rng.gen(),
parent: rng.gen(),
commitment_config: CommitmentConfig::processed(),
slot_status: SlotStatus::Processed,
}),
1 => {
let data_length = rng.gen_range(10..128);
Expand Down
14 changes: 12 additions & 2 deletions common/src/types/block_meta.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
use serde::{Deserialize, Serialize};
use solana_sdk::commitment_config::CommitmentConfig;
use solana_transaction_status::Reward;

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[repr(u8)]
pub enum SlotStatus {
Processed = 0,
Confirmed = 1,
Finalized = 2,
FirstShredReceived = 3,
LastShredReceived = 4,
Dead = 5,
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[repr(C)]
pub struct SlotMeta {
pub slot: u64,
pub parent: u64,
pub commitment_config: CommitmentConfig,
pub slot_status: SlotStatus,
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
Expand Down
9 changes: 6 additions & 3 deletions examples/tester-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use std::{
use clap::Parser;
use cli::Args;
use quic_geyser_client::non_blocking::client::Client;
use quic_geyser_common::{filters::Filter, types::connections_parameters::ConnectionParameters};
use quic_geyser_common::{
filters::Filter,
types::{block_meta::SlotStatus, connections_parameters::ConnectionParameters},
};
use solana_rpc_client::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig;

Expand Down Expand Up @@ -177,7 +180,7 @@ fn blocking(args: Args, client_stats: ClientStats, break_thread: Arc<AtomicBool>
client_stats
.slot_notifications
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if slot.commitment_config == CommitmentConfig::processed() {
if slot.slot_status == SlotStatus::FirstShredReceived {
client_stats
.slot_slot
.store(slot.slot, std::sync::atomic::Ordering::Relaxed);
Expand Down Expand Up @@ -298,7 +301,7 @@ async fn non_blocking(args: Args, client_stats: ClientStats, break_thread: Arc<A
client_stats
.slot_notifications
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if slot.commitment_config == CommitmentConfig::processed() {
if slot.slot_status == SlotStatus::FirstShredReceived {
client_stats
.slot_slot
.store(slot.slot, std::sync::atomic::Ordering::Relaxed);
Expand Down
22 changes: 18 additions & 4 deletions examples/tester-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use quic_geyser_common::{
};
use quic_geyser_server::quic_server::QuicServer;
use rand::{thread_rng, Rng};
use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey};
use solana_sdk::{account::Account, pubkey::Pubkey};
use std::time::Duration;

pub mod cli;
Expand Down Expand Up @@ -65,21 +65,35 @@ pub fn main() {
.send_message(ChannelMessage::Slot(
slot,
slot - 1,
CommitmentConfig::processed(),
quic_geyser_common::types::block_meta::SlotStatus::FirstShredReceived,
))
.unwrap();
quic_server
.send_message(ChannelMessage::Slot(
slot,
slot - 1,
quic_geyser_common::types::block_meta::SlotStatus::LastShredReceived,
))
.unwrap();
quic_server
.send_message(ChannelMessage::Slot(
slot,
slot - 1,
quic_geyser_common::types::block_meta::SlotStatus::Processed,
))
.unwrap();
quic_server
.send_message(ChannelMessage::Slot(
slot - 1,
slot - 2,
CommitmentConfig::confirmed(),
quic_geyser_common::types::block_meta::SlotStatus::Confirmed,
))
.unwrap();
quic_server
.send_message(ChannelMessage::Slot(
slot - 2,
slot - 3,
CommitmentConfig::finalized(),
quic_geyser_common::types::block_meta::SlotStatus::Finalized,
))
.unwrap();
for i in 1..args.accounts_per_second + 1 {
Expand Down
25 changes: 14 additions & 11 deletions plugin/src/quic_plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,13 @@ use quic_geyser_common::{
channel_message::{AccountData, ChannelMessage},
plugin_error::QuicGeyserError,
types::{
block_meta::BlockMeta,
block_meta::{BlockMeta, SlotStatus as QuicSlotStatus},
slot_identifier::SlotIdentifier,
transaction::{Transaction, TransactionMeta},
},
};
use quic_geyser_server::quic_server::QuicServer;
use solana_sdk::{
account::Account, clock::Slot, commitment_config::CommitmentConfig, message::v0::Message,
pubkey::Pubkey,
};
use solana_sdk::{account::Account, clock::Slot, message::v0::Message, pubkey::Pubkey};

#[derive(Debug, Default)]
pub struct QuicGeyserPlugin {
Expand Down Expand Up @@ -130,18 +127,24 @@ impl GeyserPlugin for QuicGeyserPlugin {
&self,
slot: Slot,
parent: Option<u64>,
status: SlotStatus,
status: &SlotStatus,
) -> PluginResult<()> {
// Todo
let Some(quic_server) = &self.quic_server else {
return Ok(());
};
let commitment_level = match status {
SlotStatus::Processed => CommitmentConfig::processed(),
SlotStatus::Rooted => CommitmentConfig::finalized(),
SlotStatus::Confirmed => CommitmentConfig::confirmed(),
let quic_slot_status = match status {
SlotStatus::Processed => QuicSlotStatus::Processed,
SlotStatus::Rooted => QuicSlotStatus::Finalized,
SlotStatus::Confirmed => QuicSlotStatus::Confirmed,
SlotStatus::FirstShredReceived => QuicSlotStatus::FirstShredReceived,
SlotStatus::Completed => QuicSlotStatus::LastShredReceived,
SlotStatus::CreatedBank => {
return Ok(());
}
SlotStatus::Dead(_) => QuicSlotStatus::Dead,
};
let slot_message = ChannelMessage::Slot(slot, parent.unwrap_or_default(), commitment_level);
let slot_message = ChannelMessage::Slot(slot, parent.unwrap_or_default(), quic_slot_status);

if let Some(block_channel) = &self.block_builder_channel {
let _ = block_channel.send(slot_message.clone());
Expand Down
2 changes: 1 addition & 1 deletion proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub fn main() -> anyhow::Result<()> {
quic_geyser_common::message::Message::SlotMsg(slot_message) => ChannelMessage::Slot(
slot_message.slot,
slot_message.parent,
slot_message.commitment_config,
slot_message.slot_status,
),
quic_geyser_common::message::Message::BlockMetaMsg(block_meta_message) => {
ChannelMessage::BlockMeta(block_meta_message)
Expand Down
4 changes: 2 additions & 2 deletions run_clippy_fmt.sh
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
cargo +1.76.0 fmt --all
cargo +1.76.0 clippy --locked --workspace --all-targets -- -D warnings
cargo +1.81.0 fmt --all
cargo +1.81.0 clippy --locked --workspace --all-targets -- -D warnings
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[toolchain]
channel = "1.78.0"
channel = "1.81.0"
Loading

0 comments on commit 8615148

Please sign in to comment.