diff --git a/Cargo.toml b/Cargo.toml index d2eea51..fd7566f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,10 +14,17 @@ travis-ci = {repository = "frugalos/libfrugalos"} [dependencies] bytecodec = { version = "0.4", features = ["bincode_codec"] } -fibers = "0.1" fibers_rpc = "0.3" futures = "0.1" libc = "0.2" +pin-project = "1" serde = "1" serde_derive = "1" -trackable = { version = "0.2", features = ["serialize"] } +trackable = { version = "1", features = ["serialize"] } +futures03 = { package = "futures", version = "0.3" } + + +[patch.crates-io] +fibers = { git = "https://github.com/dwango/fibers-rs", branch = "feature/tokio-1.0" } +fibers_rpc = { git = "https://github.com/koba-e964/fibers_rpc", branch = "feature/tokio-1.0" } +trackable = { git = "https://github.com/koba-e964/trackable", branch = "feature/trackable-for-poll"} diff --git a/src/client/config.rs b/src/client/config.rs index c8ac73e..012634e 100644 --- a/src/client/config.rs +++ b/src/client/config.rs @@ -1,15 +1,18 @@ //! 構成管理系API用のRPCクライアント。 use fibers_rpc::client::ClientServiceHandle as RpcServiceHandle; use fibers_rpc::Call as RpcCall; -use futures::{Async, Future, Poll}; +use futures03::Future; +use pin_project::pin_project; use std::net::SocketAddr; +use std::pin::Pin; +use std::task::{Context, Poll}; use super::Response; use crate::entity::bucket::{Bucket, BucketId, BucketSummary}; use crate::entity::device::{Device, DeviceId, DeviceSummary}; use crate::entity::server::{Server, ServerId, ServerSummary}; use crate::schema::config; -use crate::{Error, ErrorKind, Result}; +use crate::{ErrorKind, Result}; /// RPCクライアント。 #[derive(Debug)] @@ -27,90 +30,76 @@ impl Client { } /// `ListServersRpc`を実行する。 - pub fn list_servers(&self) -> impl Future, Error = Error> { + pub fn list_servers(&self) -> impl Future>> { Call::::new(self, ()) } /// `GetServerRpc`を実行する。 - pub fn get_server( - &self, - server: ServerId, - ) -> impl Future, Error = Error> { + pub fn get_server(&self, server: ServerId) -> impl Future>> { Call::::new(self, server) } /// `PutServerRpc`を実行する。 - pub fn put_server(&self, server: Server) -> impl Future { + pub fn put_server(&self, server: Server) -> impl Future> { Call::::new(self, server) } /// `DeleteServerRpc`を実行する。 - pub fn delete_server( - &self, - server: ServerId, - ) -> impl Future, Error = Error> { + pub fn delete_server(&self, server: ServerId) -> impl Future>> { Call::::new(self, server) } /// `ListDevicesRpc`を実行する。 - pub fn list_devices(&self) -> impl Future, Error = Error> { + pub fn list_devices(&self) -> impl Future>> { Call::::new(self, ()) } /// `GetDeviceRpc`を実行する。 - pub fn get_device( - &self, - device: DeviceId, - ) -> impl Future, Error = Error> { + pub fn get_device(&self, device: DeviceId) -> impl Future>> { Call::::new(self, device) } /// `PutDeviceRpc`を実行する。 - pub fn put_device(&self, device: Device) -> impl Future { + pub fn put_device(&self, device: Device) -> impl Future> { Call::::new(self, device) } /// `DeleteDeviceRpc`を実行する。 - pub fn delete_device( - &self, - device: DeviceId, - ) -> impl Future, Error = Error> { + pub fn delete_device(&self, device: DeviceId) -> impl Future>> { Call::::new(self, device) } /// `ListBucketsRpc`を実行する。 - pub fn list_buckets(&self) -> impl Future, Error = Error> { + pub fn list_buckets(&self) -> impl Future>> { Call::::new(self, ()) } /// `GetBucketRpc`を実行する。 - pub fn get_bucket( - &self, - bucket: BucketId, - ) -> impl Future, Error = Error> { + pub fn get_bucket(&self, bucket: BucketId) -> impl Future>> { Call::::new(self, bucket) } /// `PutBucketRpc`を実行する。 - pub fn put_bucket(&self, bucket: Bucket) -> impl Future { + pub fn put_bucket(&self, bucket: Bucket) -> impl Future> { Call::::new(self, bucket) } /// `DeleteBucketRpc`を実行する。 - pub fn delete_bucket( - &self, - bucket: BucketId, - ) -> impl Future, Error = Error> { + pub fn delete_bucket(&self, bucket: BucketId) -> impl Future>> { Call::::new(self, bucket) } } +#[pin_project] #[derive(Debug)] struct Call { contact_server: SocketAddr, rpc_service: RpcServiceHandle, + #[pin] leader: Response, + #[pin] request: T::Req, + #[pin] response: Option>, is_retried: bool, } @@ -141,42 +130,49 @@ where T::ReqEncoder: Default, T::ResDecoder: Default, { - type Item = U; - type Error = Error; + type Output = Result; - fn poll(&mut self) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { loop { - match self.response.poll() { - Err(e) => { - if *e.kind() == ErrorKind::NotLeader { - track_assert!( - !self.is_retried, - ErrorKind::Unavailable, - "Unstable cluster: RPC={}", - T::NAME - ); - - self.is_retried = true; - let future = config::GetLeaderRpc::client(&self.rpc_service) - .call(self.contact_server, ()); - self.leader = Response(future); - self.response = None; - } else { - return Err(track!(e, T::NAME)); + let mut this = self.as_mut().project(); + match this.response.as_mut().as_pin_mut().map(|fut| fut.poll(cx)) { + None => {} + Some(result) => match result { + Poll::Ready(Err(e)) => { + if *e.kind() == ErrorKind::NotLeader { + if let Err(e) = (|| { + track_assert!( + !*this.is_retried, + ErrorKind::Unavailable, + "Unstable cluster: RPC={}", + T::NAME + ); + Ok(()) + })() { + return Poll::Ready(Err(e)); + } + + *this.is_retried = true; + let future = config::GetLeaderRpc::client(&this.rpc_service) + .call(*this.contact_server, ()); + this.leader.set(Response(future)); + this.response.set(None); + } else { + return Poll::Ready(Err(track!(e, T::NAME))); + } } - } - Ok(Async::NotReady) => break, - Ok(Async::Ready(None)) => {} - Ok(Async::Ready(Some(response))) => return Ok(Async::Ready(response)), + Poll::Pending => break, + Poll::Ready(Ok(response)) => return Poll::Ready(Ok(response)), + }, } - if let Async::Ready(leader) = track!(self.leader.poll())? { - let future = T::client(&self.rpc_service).call(leader, self.request.clone()); - self.response = Some(Response(future)); + if let Poll::Ready(leader) = track!(this.leader.poll(cx))? { + let future = T::client(&this.rpc_service).call(leader, this.request.clone()); + this.response.set(Some(Response(future))); } else { break; } } - Ok(Async::NotReady) + Poll::Pending } } diff --git a/src/client/frugalos.rs b/src/client/frugalos.rs index 44e2a3f..6f4c350 100644 --- a/src/client/frugalos.rs +++ b/src/client/frugalos.rs @@ -1,7 +1,7 @@ //! Frugalosの公開API用のRPCクライアント。 use fibers_rpc::client::ClientServiceHandle as RpcServiceHandle; use fibers_rpc::Call as RpcCall; -use futures::Future; +use futures03::Future; use std::collections::BTreeSet; use std::net::SocketAddr; use std::ops::Range; @@ -19,7 +19,7 @@ use crate::expect::Expect; use crate::multiplicity::MultiplicityConfig; use crate::repair::RepairConfig; use crate::schema::frugalos; -use crate::Error; +use crate::Result; /// RPCクライアント。 #[derive(Debug)] @@ -44,7 +44,7 @@ impl Client { deadline: Duration, expect: Expect, consistency: ReadConsistency, - ) -> impl Future)>, Error = Error> { + ) -> impl Future)>>> { let request = frugalos::ObjectRequest { bucket_id, object_id, @@ -61,7 +61,7 @@ impl Client { bucket_id: BucketId, segment: u16, consistency: ReadConsistency, - ) -> impl Future, Error = Error> { + ) -> impl Future>> { let request = frugalos::ListObjectsRequest { bucket_id, segment, @@ -76,7 +76,7 @@ impl Client { bucket_id: BucketId, prefix: ObjectPrefix, deadline: Duration, - ) -> impl Future, Error = Error> { + ) -> impl Future>> { let request = frugalos::PrefixRequest { bucket_id, prefix, @@ -92,7 +92,7 @@ impl Client { &self, bucket_id: BucketId, segment: u16, - ) -> impl Future, Error = Error> { + ) -> impl Future>> { let request = frugalos::SegmentRequest { bucket_id, segment }; Response( frugalos::GetLatestVersionRpc::client(&self.rpc_service).call(self.server, request), @@ -107,7 +107,7 @@ impl Client { deadline: Duration, expect: Expect, consistency: ReadConsistency, - ) -> impl Future, Error = Error> { + ) -> impl Future>> { let request = frugalos::CountFragmentsRequest { bucket_id, object_id, @@ -127,7 +127,7 @@ impl Client { expect: Expect, consistency: ReadConsistency, check_storage: bool, - ) -> impl Future, Error = Error> { + ) -> impl Future>> { let request = frugalos::HeadObjectRequest { bucket_id, object_id, @@ -148,7 +148,7 @@ impl Client { deadline: Duration, expect: Expect, multiplicity_config: MultiplicityConfig, - ) -> impl Future { + ) -> impl Future> { let request = frugalos::PutObjectRequest { bucket_id, object_id, @@ -167,7 +167,7 @@ impl Client { object_id: ObjectId, deadline: Duration, expect: Expect, - ) -> impl Future, Error = Error> { + ) -> impl Future>> { let request = frugalos::ObjectRequest { bucket_id, object_id, @@ -185,7 +185,7 @@ impl Client { segment: u16, object_version: ObjectVersion, deadline: Duration, - ) -> impl Future, Error = Error> { + ) -> impl Future>> { let request = frugalos::VersionRequest { bucket_id, segment, @@ -205,7 +205,7 @@ impl Client { segment: u16, targets: Range, deadline: Duration, - ) -> impl Future, Error = Error> { + ) -> impl Future>> { let request = frugalos::RangeRequest { bucket_id, segment, @@ -223,7 +223,7 @@ impl Client { bucket_id: BucketId, prefix: ObjectPrefix, deadline: Duration, - ) -> impl Future { + ) -> impl Future> { let request = frugalos::PrefixRequest { bucket_id, prefix, @@ -241,7 +241,7 @@ impl Client { bucket_id: BucketId, device_id: DeviceId, object_ids: BTreeSet, - ) -> impl Future { + ) -> impl Future> { Response( frugalos::DeleteObjectSetFromDeviceRpc::client(&self.rpc_service).call( self.server, @@ -255,12 +255,12 @@ impl Client { } /// `StopRpc`を実行する。 - pub fn stop(&self) -> impl Future { + pub fn stop(&self) -> impl Future> { Response(frugalos::StopRpc::client(&self.rpc_service).call(self.server, ())) } /// `TakeSnapshotRpc`を実行する。 - pub fn take_snapshot(&self) -> impl Future { + pub fn take_snapshot(&self) -> impl Future> { Response(frugalos::TakeSnapshotRpc::client(&self.rpc_service).call(self.server, ())) } @@ -268,7 +268,7 @@ impl Client { pub fn set_repair_config( &self, repair_config: RepairConfig, - ) -> impl Future { + ) -> impl Future> { Response( frugalos::SetRepairConfigRpc::client(&self.rpc_service) .call(self.server, repair_config), @@ -276,7 +276,7 @@ impl Client { } /// Executes `TruncateBucketRpc` - pub fn truncate_bucket(&self, bucket_seqno: u32) -> impl Future { + pub fn truncate_bucket(&self, bucket_seqno: u32) -> impl Future> { Response( frugalos::TruncateBucketRpc::client(&self.rpc_service) .call(self.server, frugalos::BucketSeqnoRequest { bucket_seqno }), diff --git a/src/client/mds.rs b/src/client/mds.rs index 4fdaae4..7bcdb8f 100644 --- a/src/client/mds.rs +++ b/src/client/mds.rs @@ -1,8 +1,11 @@ //! MDS(metadata store)用のRPCクライアント。 use fibers_rpc::client::ClientServiceHandle as RpcServiceHandle; use fibers_rpc::{Call as RpcCall, Cast as RpcCast}; -use futures::{Async, Future, Poll}; +use futures03::{future::OptionFuture, Future}; +use pin_project::pin_project; use std::ops::Range; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::time::Duration; use super::Response; @@ -13,7 +16,7 @@ use crate::entity::object::{ }; use crate::expect::Expect; use crate::schema::mds; -use crate::{Error, ErrorKind, Result}; +use crate::{ErrorKind, Result}; /// RPCクライアント。 #[derive(Debug)] @@ -37,7 +40,7 @@ impl Client { pub fn list_objects( &self, consistency: ReadConsistency, - ) -> impl Future, Vec), Error = Error> { + ) -> impl Future, Vec)>> { let request = mds::ListObjectsRequest { node_id: self.node.1.clone(), consistency, @@ -49,7 +52,7 @@ impl Client { pub fn list_objects_by_prefix( &self, prefix: ObjectPrefix, - ) -> impl Future, Vec), Error = Error> { + ) -> impl Future, Vec)>> { let request = mds::PrefixRequest { node_id: self.node.1.clone(), prefix, @@ -60,7 +63,7 @@ impl Client { /// `GetLatestVersionRpc`を実行する。 pub fn latest_version( &self, - ) -> impl Future, Option), Error = Error> { + ) -> impl Future, Option)>> { Call::::new(self, self.node.1.clone()) } @@ -68,7 +71,7 @@ impl Client { pub fn object_count( &self, consistency: ReadConsistency, - ) -> impl Future, u64), Error = Error> { + ) -> impl Future, u64)>> { let request = mds::ObjectCountRequest { node_id: self.node.1.clone(), consistency, @@ -82,7 +85,7 @@ impl Client { id: ObjectId, expect: Expect, consistency: ReadConsistency, - ) -> impl Future, Option), Error = Error> { + ) -> impl Future, Option)>> { let request = mds::ObjectRequest { node_id: self.node.1.clone(), object_id: id, @@ -98,7 +101,7 @@ impl Client { id: ObjectId, expect: Expect, consistency: ReadConsistency, - ) -> impl Future, Option), Error = Error> { + ) -> impl Future, Option)>> { let request = mds::ObjectRequest { node_id: self.node.1.clone(), object_id: id, @@ -115,7 +118,7 @@ impl Client { metadata: Vec, expect: Expect, put_content_timeout: Duration, - ) -> impl Future, (ObjectVersion, Option)), Error = Error> + ) -> impl Future, (ObjectVersion, Option))>> { let request = mds::PutObjectRequest { node_id: self.node.1.clone(), @@ -132,7 +135,7 @@ impl Client { &self, id: ObjectId, expect: Expect, - ) -> impl Future, Option), Error = Error> { + ) -> impl Future, Option)>> { let request = mds::ObjectRequest { node_id: self.node.1.clone(), object_id: id, @@ -146,7 +149,7 @@ impl Client { pub fn delete_object_by_version( &self, version: ObjectVersion, - ) -> impl Future, Option), Error = Error> { + ) -> impl Future, Option)>> { let request = mds::VersionRequest { node_id: self.node.1.clone(), object_version: version, @@ -158,7 +161,7 @@ impl Client { pub fn delete_by_range( &self, targets: Range, - ) -> impl Future, Vec), Error = Error> { + ) -> impl Future, Vec)>> { let request = mds::RangeRequest { node_id: self.node.1.clone(), targets, @@ -170,8 +173,7 @@ impl Client { pub fn delete_by_prefix( &self, prefix: ObjectPrefix, - ) -> impl Future, DeleteObjectsByPrefixSummary), Error = Error> - { + ) -> impl Future, DeleteObjectsByPrefixSummary)>> { let request = mds::PrefixRequest { node_id: self.node.1.clone(), prefix, @@ -224,13 +226,17 @@ impl SetNodeId for mds::PutObjectRequest { } } +#[pin_project] #[derive(Debug)] struct Call { node: RemoteNodeId, rpc_service: RpcServiceHandle, - leader: Option>, + #[pin] + leader: OptionFuture>, + is_leader: bool, request: T::Req, - response: Option>, + #[pin] + response: OptionFuture>, retried_count: usize, } impl Call @@ -246,9 +252,10 @@ where Call { node: client.node.clone(), rpc_service: client.rpc_service.clone(), - leader: None, + leader: None.into(), + is_leader: false, request, - response: Some(Response(future)), + response: Some(Response(future)).into(), retried_count: 0, } } @@ -261,47 +268,57 @@ where T::ReqEncoder: Default, T::ResDecoder: Default, { - type Item = (Option, U); - type Error = Error; + type Output = Result<(Option, U)>; - fn poll(&mut self) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { loop { - match self.response.poll() { - Err(e) => { + let mut this = self.as_mut().project(); + match this.response.as_mut().poll(cx) { + Poll::Ready(Some(Err(e))) => { if *e.kind() == ErrorKind::NotLeader { - track_assert!( - self.retried_count < 2, - ErrorKind::Unavailable, - "Unstable cluster: RPC={}", - T::NAME - ); + if let Err(e) = (|| { + track_assert!( + *this.retried_count < 2, + ErrorKind::Unavailable, + "Unstable cluster: RPC={}", + T::NAME + ); + Ok(()) + })() { + return Poll::Ready(Err(e)); + } - self.retried_count += 1; - let future = mds::GetLeaderRpc::client(&self.rpc_service) - .call(self.node.0, self.node.1.clone()); - self.leader = Some(Response(future)); - self.response = None; + *this.retried_count += 1; + let future = mds::GetLeaderRpc::client(&this.rpc_service) + .call(this.node.0, this.node.1.clone()); + this.leader.set(Some(Response(future)).into()); + *this.is_leader = true; + this.response.set(None.into()); } else { - return Err(track!(e, T::NAME)); + return Poll::Ready(Err(track!(e, T::NAME))); } } - Ok(Async::NotReady) => break, - Ok(Async::Ready(None)) => {} - Ok(Async::Ready(Some(response))) => { - let new_leader = self.leader.as_ref().map(|_| self.node.clone()); - return Ok(Async::Ready((new_leader, response))); + Poll::Pending => break, + Poll::Ready(None) => {} + Poll::Ready(Some(Ok(response))) => { + let new_leader = if *this.is_leader { + Some(this.node.clone()) + } else { + None + }; + return Poll::Ready(Ok((new_leader, response))); } } - if let Async::Ready(Some(leader)) = track!(self.leader.poll())? { - self.node = leader; - self.request.set_node_id(self.node.1.clone()); - let future = T::client(&self.rpc_service).call(self.node.0, self.request.clone()); - self.response = Some(Response(future)); + if let Poll::Ready(Some(leader)) = track!(this.leader.poll(cx))? { + *this.node = leader; + this.request.set_node_id(this.node.1.clone()); + let future = T::client(&this.rpc_service).call(this.node.0, this.request.clone()); + this.response.set(Some(Response(future)).into()); } else { break; } } - Ok(Async::NotReady) + Poll::Pending } } diff --git a/src/client/mod.rs b/src/client/mod.rs index 07708eb..e87e057 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,32 +1,35 @@ //! RPCクライアント。 -use futures::{Async, Future, Poll}; +use futures03::Future; +use pin_project::pin_project; +use std::pin::Pin; +use std::task::{Context, Poll}; use trackable::error::ErrorKindExt; -use crate::{Error, ErrorKind, Result}; +use crate::{ErrorKind, Result}; pub mod config; pub mod frugalos; pub mod mds; +#[pin_project] #[derive(Debug)] -struct Response(fibers_rpc::client::Response>); +struct Response(#[pin] fibers_rpc::client::Response>); impl Future for Response { - type Item = T; - type Error = Error; + type Output = Result; - fn poll(&mut self) -> Poll { - match self.0.poll() { - Err(e) => { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + match self.project().0.poll(cx) { + Poll::Ready(Err(e)) => { let kind = match *e.kind() { fibers_rpc::ErrorKind::InvalidInput => ErrorKind::InvalidInput, fibers_rpc::ErrorKind::Unavailable => ErrorKind::Unavailable, fibers_rpc::ErrorKind::Timeout => ErrorKind::Timeout, fibers_rpc::ErrorKind::Other => ErrorKind::Other, }; - Err(track!(kind.takes_over(e)).into()) + Poll::Ready(Err(track!(kind.takes_over(e)).into())) } - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(result)) => track!(result.map(Async::Ready)), + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(result)) => track!(Poll::Ready(result)), } } } diff --git a/src/lib.rs b/src/lib.rs index bf1e955..0262050 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,7 +3,6 @@ //! [frugalos]: https://github.com/frugalos/frugalos #![warn(missing_docs)] extern crate bytecodec; -extern crate fibers; extern crate fibers_rpc; extern crate futures; extern crate libc;