Skip to content

Commit

Permalink
Make AsyncSession clonable
Browse files Browse the repository at this point in the history
  • Loading branch information
ssrlive committed Sep 1, 2024
1 parent 5fff646 commit 10c8462
Showing 1 changed file with 20 additions and 6 deletions.
26 changes: 20 additions & 6 deletions src/async_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::handle::UnsafeHandle;
use futures::{AsyncRead, AsyncWrite};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use windows_sys::Win32::{
Foundation::{FALSE, HANDLE, WAIT_ABANDONED_0, WAIT_EVENT, WAIT_OBJECT_0},
Expand All @@ -15,13 +15,14 @@ enum WaitingStopReason {
Ready,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
enum ReadState {
Waiting(Option<blocking::Task<WaitingStopReason>>),
Waiting(Option<Arc<Mutex<blocking::Task<WaitingStopReason>>>>),
Idle,
Closed,
}

#[derive(Clone)]
pub struct AsyncSession {
session: Arc<crate::session::Session>,
read_state: ReadState,
Expand Down Expand Up @@ -69,15 +70,28 @@ impl AsyncRead for AsyncSession {
Ok(None) => {
let read_event = self.session.get_read_wait_event()?;
let shutdown_event = self.session.shutdown_event.get_handle();
self.read_state = ReadState::Waiting(Some(blocking::unblock(move || {
let task = Arc::new(Mutex::new(blocking::unblock(move || {
Self::wait_for_read(read_event, shutdown_event)
})));
self.read_state = ReadState::Waiting(Some(task));
}
Err(err) => return Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, err))),
},
ReadState::Waiting(task) => {
let mut task = task.take().unwrap();
self.read_state = match Pin::new(&mut task).poll(cx) {
let task = match task.take() {
Some(task) => task,
None => return Poll::Pending,
};
let task_clone = task.clone();
let mut task_guard = match task_clone.lock() {
Ok(guard) => guard,
Err(e) => {
self.read_state = ReadState::Waiting(Some(task));
use std::io::{Error, ErrorKind};
return Poll::Ready(Err(Error::new(ErrorKind::Other, format!("lock task fail: {:?}", e))));
}
};
self.read_state = match Pin::new(&mut *task_guard).poll(cx) {
Poll::Ready(WaitingStopReason::Shutdown) => ReadState::Closed,
Poll::Ready(WaitingStopReason::Ready) => ReadState::Idle,
Poll::Pending => ReadState::Waiting(Some(task)),
Expand Down

0 comments on commit 10c8462

Please sign in to comment.