Skip to content

Commit

Permalink
Properly request repaints from background threads.
Browse files Browse the repository at this point in the history
  • Loading branch information
KoffeinFlummi committed Nov 21, 2023
1 parent bd3c5b2 commit 5d40843
Show file tree
Hide file tree
Showing 10 changed files with 57 additions and 63 deletions.
3 changes: 0 additions & 3 deletions src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ pub trait DataSource {
/// Send an authenticated uplink command
fn send_command(&mut self, cmd: Command) -> Result<(), SendError<UplinkMessage>>;

/// The minimum fps required for the data source. Occasional redraws
/// are necessary if data source is live.
fn minimum_fps(&self) -> Option<u64>;
fn end(&self) -> Option<Instant>;

fn status_bar_ui(&mut self, _ui: &mut egui::Ui) {
Expand Down
14 changes: 5 additions & 9 deletions src/data_source/log_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl LogFileDataSource {
}

impl DataSource for LogFileDataSource {
fn update(&mut self, _ctx: &egui::Context) {
fn update(&mut self, ctx: &egui::Context) {
if let Some(file) = self.file.as_mut() {
if let Err(e) = file.read_to_end(&mut self.buffer) {
error!("Failed to read log file: {:?}", e);
Expand Down Expand Up @@ -130,6 +130,10 @@ impl DataSource for LogFileDataSource {
for (t, msg) in self.messages.drain(..pointer) {
self.vehicle_states.push((t, msg.into()));
}

if self.replay {
ctx.request_repaint_after(Duration::from_millis(16));
}
}

fn vehicle_states<'a>(&'a self) -> Iter<'_, (Instant, VehicleState)> {
Expand Down Expand Up @@ -157,14 +161,6 @@ impl DataSource for LogFileDataSource {
Ok(())
}

fn minimum_fps(&self) -> Option<u64> {
if self.replay && self.last_time.map(|t| t > Instant::now()).unwrap_or(true) {
Some(60)
} else {
None
}
}

fn end(&self) -> Option<Instant> {
if self.replay {
let last = self.last_time.unwrap_or(Instant::now());
Expand Down
27 changes: 15 additions & 12 deletions src/data_source/serial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub enum SerialStatus {
/// messages.
#[cfg(not(target_arch = "wasm32"))] // TODO: serial ports on wasm?
pub fn downlink_port(
ctx: Option<egui::Context>,
downlink_tx: &mut Sender<DownlinkMessage>,
uplink_rx: &mut Receiver<UplinkMessage>,
port: String,
Expand Down Expand Up @@ -113,6 +114,9 @@ pub fn downlink_port(
// If successful, send msg through channel.
downlink_tx.send(msg)?;
last_message = now;
if let Some(ctx) = &ctx {
ctx.request_repaint();
}
}

now = Instant::now();
Expand Down Expand Up @@ -141,6 +145,7 @@ pub fn find_serial_port() -> Option<String> {
/// Run in a separate thread using `spawn_downlink_monitor`.
#[cfg(not(target_arch = "wasm32"))] // TODO: serial ports on wasm?
pub fn downlink_monitor(
ctx: Option<egui::Context>,
serial_status_tx: Sender<(SerialStatus, Option<String>)>,
mut downlink_tx: Sender<DownlinkMessage>,
mut uplink_rx: Receiver<UplinkMessage>,
Expand All @@ -150,9 +155,12 @@ pub fn downlink_monitor(
// If a device was connected, start reading messages.
if let Some(p) = find_serial_port() {
serial_status_tx.send((SerialStatus::Connected, Some(p.clone())))?;
if let Err(e) = downlink_port(&mut downlink_tx, &mut uplink_rx, p.clone(), send_heartbeats) {
if let Err(e) = downlink_port(ctx.clone(), &mut downlink_tx, &mut uplink_rx, p.clone(), send_heartbeats) {
eprintln!("{:?}", e);
serial_status_tx.send((SerialStatus::Error, Some(p)))?;
if let Some(ctx) = &ctx {
ctx.request_repaint();
}
}
}

Expand All @@ -163,13 +171,14 @@ pub fn downlink_monitor(
/// Spawns `downlink_monitor` in a new thread.
#[cfg(not(target_arch = "wasm32"))] // TODO: serial ports on wasm?
pub fn spawn_downlink_monitor(
ctx: Option<egui::Context>,
serial_status_tx: Sender<(SerialStatus, Option<String>)>,
downlink_tx: Sender<DownlinkMessage>,
uplink_rx: Receiver<UplinkMessage>,
send_heartbeats: bool,
) -> JoinHandle<()> {
std::thread::spawn(move || {
downlink_monitor(serial_status_tx, downlink_tx, uplink_rx, send_heartbeats).unwrap_or_default()
downlink_monitor(ctx, serial_status_tx, downlink_tx, uplink_rx, send_heartbeats).unwrap_or_default()
})
}

Expand All @@ -194,13 +203,15 @@ pub struct SerialDataSource {

impl SerialDataSource {
/// Create a new serial port data source.
pub fn new(lora_settings: LoRaSettings) -> Self {
pub fn new(ctx: &egui::Context, lora_settings: LoRaSettings) -> Self {
let (downlink_tx, downlink_rx) = std::sync::mpsc::channel::<DownlinkMessage>();
let (uplink_tx, uplink_rx) = std::sync::mpsc::channel::<UplinkMessage>();
let (serial_status_tx, serial_status_rx) = std::sync::mpsc::channel::<(SerialStatus, Option<String>)>();

let ctx = ctx.clone();

#[cfg(not(target_arch = "wasm32"))] // TODO: can't spawn threads on wasm
spawn_downlink_monitor(serial_status_tx, downlink_tx, uplink_rx, true);
spawn_downlink_monitor(Some(ctx), serial_status_tx, downlink_tx, uplink_rx, true);

let telemetry_log_path = Self::new_telemetry_log_path();
let telemetry_log_file = File::create(&telemetry_log_path);
Expand Down Expand Up @@ -355,14 +366,6 @@ impl DataSource for SerialDataSource {
self.send(UplinkMessage::Command(cmd))
}

fn minimum_fps(&self) -> Option<u64> {
if self.last_time.map(|t| t.elapsed() > Duration::from_secs_f64(10.0)).unwrap_or(false) {
Some(1)
} else {
Some(u64::max(30, u64::min(self.message_receipt_times.len() as u64, 60)))
}
}

fn end(&self) -> Option<Instant> {
let postroll = Duration::from_secs_f64(10.0);

Expand Down
4 changes: 0 additions & 4 deletions src/data_source/simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,6 @@ impl DataSource for SimulationDataSource {
Ok(())
}

fn minimum_fps(&self) -> Option<u64> {
None
}

fn end(&self) -> Option<Instant> {
self.vehicle_states.last().map(|(t, _vs)| *t)
}
Expand Down
22 changes: 9 additions & 13 deletions src/gui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ pub struct Sam {
impl Sam {
/// Initialize the application, including the state objects for widgets
/// such as plots and maps.
pub fn init(ctx: &egui::Context, settings: AppSettings, data_source: Box<dyn DataSource>) -> Self {
pub fn init(ctx: &egui::Context, settings: AppSettings, data_source: Option<Box<dyn DataSource>>) -> Self {
let data_source = data_source.unwrap_or(Box::new(SerialDataSource::new(ctx, settings.lora.clone())));

let mut fonts = egui::FontDefinitions::default();
let roboto = egui::FontData::from_static(include_bytes!("../assets/fonts/RobotoMono-Regular.ttf"));
let lato = egui::FontData::from_static(include_bytes!("../assets/fonts/Overpass-Light.ttf"));
Expand Down Expand Up @@ -70,8 +72,8 @@ impl Sam {
}

/// Closes the currently opened data source
fn close_data_source(&mut self) {
self.data_source = Box::new(SerialDataSource::new(self.settings.lora.clone()));
fn close_data_source(&mut self, ctx: &egui::Context) {
self.data_source = Box::new(SerialDataSource::new(ctx, self.settings.lora.clone()));
}

pub fn ui(&mut self, ctx: &egui::Context) {
Expand Down Expand Up @@ -187,13 +189,6 @@ impl Sam {
}
}
});

// If we have live data coming in, we need to tell egui to repaint.
// If we don't, we shouldn't.
if let Some(fps) = self.data_source.minimum_fps().or(self.archive_window.open.then(|| 60)) {
let t = std::time::Duration::from_millis(1000 / fps);
ctx.request_repaint_after(t);
}
}
}

Expand All @@ -210,9 +205,10 @@ impl eframe::App for Sam {
pub fn main(log_file: Option<PathBuf>) -> Result<(), Box<dyn std::error::Error>> {
let app_settings = AppSettings::load().ok().unwrap_or(AppSettings::default());

let data_source: Box<dyn DataSource> = match log_file {
Some(path) => Box::new(LogFileDataSource::new(path)?),
None => Box::new(SerialDataSource::new(app_settings.lora.clone())),
let data_source: Option<Box<dyn DataSource>> = if let Some(path) = log_file {
Some(Box::new(LogFileDataSource::new(path)?))
} else {
None
};

#[cfg(feature = "profiling")]
Expand Down
18 changes: 11 additions & 7 deletions src/gui/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ fn load_tile_bytes(tile: &Tile, access_token: &String) -> Result<Vec<u8>, Box<dy
}

#[cfg(not(target_arch = "wasm32"))]
fn load_tile_image(tile: &Tile, access_token: &String) -> Result<ColorImage, Box<dyn std::error::Error>> {
fn load_tile_image(ctx: egui::Context, tile: &Tile, access_token: &String) -> Result<ColorImage, Box<dyn std::error::Error>> {
let bytes = load_tile_bytes(tile, access_token)?;
let image = egui_extras::image::load_image_bytes(&bytes)?;
ctx.request_repaint();
Ok(image)
}

Expand All @@ -90,9 +91,10 @@ async fn load_tile_bytes(tile: &Tile, access_token: &String) -> Result<Vec<u8>,
}

#[cfg(target_arch = "wasm32")]
async fn load_tile_image(tile: &Tile, access_token: &String) -> Result<ColorImage, Box<dyn std::error::Error>> {
async fn load_tile_image(ctx: egui::Context, tile: &Tile, access_token: &String) -> Result<ColorImage, Box<dyn std::error::Error>> {
let bytes = load_tile_bytes(tile, access_token).await?;
let image = egui_extras::image::load_image_bytes(&bytes)?;
ctx.request_repaint();
Ok(image)
}

Expand Down Expand Up @@ -305,11 +307,12 @@ impl MapState {
}

#[cfg(not(target_arch = "wasm32"))]
fn load_tile(&self, tile: Tile) {
fn load_tile(&self, ctx: &egui::Context, tile: Tile) {
let ctx = ctx.clone();
let cache = self.tile_cache.clone();
let at = self.access_token.clone();
std::thread::spawn(move || {
match load_tile_image(&tile, &at) {
match load_tile_image(ctx, &tile, &at) {
Ok(image) => cache.lock().insert(tile, image),
Err(e) => log::error!("{:?}", e),
}
Expand All @@ -319,11 +322,12 @@ impl MapState {
}

#[cfg(target_arch = "wasm32")]
fn load_tile(&self, tile: Tile) {
fn load_tile(&self, ctx: &egui::Context, tile: Tile) {
let ctx = ctx.clone();
let cache = self.tile_cache.clone();
let access_token = self.access_token.clone();
wasm_bindgen_futures::spawn_local(async move {
match load_tile_image(&tile, &access_token).await {
match load_tile_image(ctx, &tile, &access_token).await {
Ok(image) => cache.lock().insert(tile, image),
Err(e) => log::error!("{:?}", e),
}
Expand Down Expand Up @@ -351,7 +355,7 @@ impl MapState {
let iter = bbox.tiles_for_zoom(zoom as u8).filter_map(|tile| {
let result = self.tile_cache.lock().cached_image(ctx, &tile);
if result.is_none() && self.tile_cache.lock().loading.insert(tile_id(&tile)) {
self.load_tile(tile);
self.load_tile(ctx, tile);
}
result
});
Expand Down
2 changes: 1 addition & 1 deletion src/gui/panels/menu_bar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl MenuBarPanel {
ui.allocate_ui_with_layout(ui.available_size(), Layout::right_to_left(Align::Center), |ui| {
if data_source_is_log || data_source_is_sim {
if ui.button("❌").clicked() {
sam.close_data_source();
sam.close_data_source(ctx);
}
}
});
Expand Down
21 changes: 13 additions & 8 deletions src/gui/windows/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub struct ArchiveWindow {
}

impl ArchiveWindow {
async fn load_log(url: &str, progress_sender: Sender<ArchiveLoadProgress>) {
async fn load_log(ctx: egui::Context, url: &str, progress_sender: Sender<ArchiveLoadProgress>) {
let start = Instant::now();
let response = match reqwest::Client::new().get(url).send().await {
Ok(res) => res,
Expand All @@ -63,6 +63,7 @@ impl ArchiveWindow {

let total_size = response.content_length().unwrap_or(0);
progress_sender.send(ArchiveLoadProgress::Progress((0, total_size))).unwrap();
ctx.request_repaint();

let mut cursor = std::io::Cursor::new(Vec::with_capacity(total_size as usize));
let (mut progress, mut last_progress) = (0, 0);
Expand All @@ -75,10 +76,12 @@ impl ArchiveWindow {
if progress == total_size || progress > last_progress + 256 * 1024 {
let _ = progress_sender.send(ArchiveLoadProgress::Progress((progress, total_size)));
last_progress = progress;
ctx.request_repaint();
}
}
Err(e) => {
progress_sender.send(ArchiveLoadProgress::Error(e)).unwrap();
ctx.request_repaint();
return;
}
}
Expand All @@ -88,23 +91,26 @@ impl ArchiveWindow {
let duration = start.elapsed().as_secs_f32();
let mib = (total_size as f32) / 1024.0 / 1024.0;
info!("Downloaded {}MiB in {:.1}ms ({}MiB/s)", mib, duration * 1000.0, mib / duration);
ctx.request_repaint();
}

#[cfg(not(target_arch = "wasm32"))]
fn open_log(&mut self, url: &'static str) {
fn open_log(&mut self, ctx: &egui::Context, url: &'static str) {
let ctx = ctx.clone();
let (sender, receiver) = std::sync::mpsc::channel();
self.progress_receiver = Some(receiver);
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread().enable_io().enable_time().build().unwrap();
rt.block_on(Self::load_log(url, sender));
rt.block_on(Self::load_log(ctx, url, sender));
});
}

#[cfg(target_arch = "wasm32")]
fn open_log(&mut self, url: &'static str) {
fn open_log(&mut self, ctx: &egui::Context, url: &'static str) {
let ctx = ctx.clone();
let (sender, receiver) = std::sync::mpsc::channel();
self.progress_receiver = Some(receiver);
wasm_bindgen_futures::spawn_local(Self::load_log(url, sender));
wasm_bindgen_futures::spawn_local(Self::load_log(ctx, url, sender));
}

pub fn show_if_open(&mut self, ctx: &egui::Context) -> Option<LogFileDataSource> {
Expand All @@ -114,7 +120,6 @@ impl ArchiveWindow {
self.progress = Some(progress);
}
Ok(ArchiveLoadProgress::Complete(bytes)) => {
// TODO
self.open = false;
self.progress_receiver = None;
self.progress = None;
Expand Down Expand Up @@ -154,11 +159,11 @@ impl ArchiveWindow {
ui.label(*title);
ui.with_layout(Layout::right_to_left(Align::Center), |ui| {
if ui.add_enabled(flash.is_some(), Button::new("🖴 Flash")).clicked() {
self.open_log(flash.unwrap());
self.open_log(ctx, flash.unwrap());
}

if ui.add_enabled(telem.is_some(), Button::new("📡 Telemetry")).clicked() {
self.open_log(telem.unwrap());
self.open_log(ctx, telem.unwrap());
}
});
});
Expand Down
5 changes: 1 addition & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,11 @@ impl WebHandle {
/// Call this once from JavaScript to start your app.
#[wasm_bindgen]
pub async fn start(&self, canvas_id: &str) -> Result<(), wasm_bindgen::JsValue> {
use crate::data_source::SerialDataSource;
let data_source = Box::new(SerialDataSource::new(mithril::settings::LoRaSettings::default()));

self.runner
.start(
canvas_id,
eframe::WebOptions::default(),
Box::new(|cc| Box::new(Sam::init(&cc.egui_ctx, AppSettings::default(), data_source))),
Box::new(|cc| Box::new(Sam::init(&cc.egui_ctx, AppSettings::default(), None))),
)
.await
}
Expand Down
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ fn logcat(verbose: bool) -> Result<(), Box<dyn std::error::Error>> {
let (downlink_tx, downlink_rx) = channel::<DownlinkMessage>();
let (_uplink_tx, uplink_rx) = channel::<UplinkMessage>();
let (serial_status_tx, serial_status_rx) = channel::<(SerialStatus, Option<String>)>();
spawn_downlink_monitor(serial_status_tx, downlink_tx, uplink_rx, true);
spawn_downlink_monitor(None, serial_status_tx, downlink_tx, uplink_rx, true);

loop {
for (status, port) in serial_status_rx.try_iter() {
Expand Down Expand Up @@ -182,7 +182,7 @@ fn dump_flash(path: PathBuf, force: bool, raw: bool, start: Option<u32>) -> Resu
let (downlink_tx, downlink_rx) = channel::<DownlinkMessage>();
let (uplink_tx, uplink_rx) = channel::<UplinkMessage>();
let (serial_status_tx, _serial_status_rx) = channel::<(SerialStatus, Option<String>)>();
spawn_downlink_monitor(serial_status_tx, downlink_tx, uplink_rx, false);
spawn_downlink_monitor(None, serial_status_tx, downlink_tx, uplink_rx, false);

let flash_size = FLASH_SIZE;

Expand Down

0 comments on commit 5d40843

Please sign in to comment.