Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tokio 1.0 対応 #37

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,17 @@ travis-ci = {repository = "frugalos/libfrugalos"}

[dependencies]
bytecodec = { version = "0.4", features = ["bincode_codec"] }
fibers = "0.1"
fibers_rpc = "0.3"
futures = "0.1"
libc = "0.2"
pin-project = "1"
serde = "1"
serde_derive = "1"
trackable = { version = "0.2", features = ["serialize"] }
trackable = { version = "1", features = ["serialize"] }
futures03 = { package = "futures", version = "0.3" }


[patch.crates-io]
fibers = { git = "https://github.com/dwango/fibers-rs", branch = "feature/tokio-1.0" }
fibers_rpc = { git = "https://github.com/koba-e964/fibers_rpc", branch = "feature/tokio-1.0" }
trackable = { git = "https://github.com/koba-e964/trackable", branch = "feature/trackable-for-poll"}
116 changes: 56 additions & 60 deletions src/client/config.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
//! 構成管理系API用のRPCクライアント。
use fibers_rpc::client::ClientServiceHandle as RpcServiceHandle;
use fibers_rpc::Call as RpcCall;
use futures::{Async, Future, Poll};
use futures03::Future;
use pin_project::pin_project;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};

use super::Response;
use crate::entity::bucket::{Bucket, BucketId, BucketSummary};
use crate::entity::device::{Device, DeviceId, DeviceSummary};
use crate::entity::server::{Server, ServerId, ServerSummary};
use crate::schema::config;
use crate::{Error, ErrorKind, Result};
use crate::{ErrorKind, Result};

/// RPCクライアント。
#[derive(Debug)]
Expand All @@ -27,90 +30,76 @@ impl Client {
}

/// `ListServersRpc`を実行する。
pub fn list_servers(&self) -> impl Future<Item = Vec<ServerSummary>, Error = Error> {
pub fn list_servers(&self) -> impl Future<Output = Result<Vec<ServerSummary>>> {
Call::<config::ListServersRpc, _>::new(self, ())
}

/// `GetServerRpc`を実行する。
pub fn get_server(
&self,
server: ServerId,
) -> impl Future<Item = Option<Server>, Error = Error> {
pub fn get_server(&self, server: ServerId) -> impl Future<Output = Result<Option<Server>>> {
Call::<config::GetServerRpc, _>::new(self, server)
}

/// `PutServerRpc`を実行する。
pub fn put_server(&self, server: Server) -> impl Future<Item = Server, Error = Error> {
pub fn put_server(&self, server: Server) -> impl Future<Output = Result<Server>> {
Call::<config::PutServerRpc, _>::new(self, server)
}

/// `DeleteServerRpc`を実行する。
pub fn delete_server(
&self,
server: ServerId,
) -> impl Future<Item = Option<Server>, Error = Error> {
pub fn delete_server(&self, server: ServerId) -> impl Future<Output = Result<Option<Server>>> {
Call::<config::DeleteServerRpc, _>::new(self, server)
}

/// `ListDevicesRpc`を実行する。
pub fn list_devices(&self) -> impl Future<Item = Vec<DeviceSummary>, Error = Error> {
pub fn list_devices(&self) -> impl Future<Output = Result<Vec<DeviceSummary>>> {
Call::<config::ListDevicesRpc, _>::new(self, ())
}

/// `GetDeviceRpc`を実行する。
pub fn get_device(
&self,
device: DeviceId,
) -> impl Future<Item = Option<Device>, Error = Error> {
pub fn get_device(&self, device: DeviceId) -> impl Future<Output = Result<Option<Device>>> {
Call::<config::GetDeviceRpc, _>::new(self, device)
}

/// `PutDeviceRpc`を実行する。
pub fn put_device(&self, device: Device) -> impl Future<Item = Device, Error = Error> {
pub fn put_device(&self, device: Device) -> impl Future<Output = Result<Device>> {
Call::<config::PutDeviceRpc, _>::new(self, device)
}

/// `DeleteDeviceRpc`を実行する。
pub fn delete_device(
&self,
device: DeviceId,
) -> impl Future<Item = Option<Device>, Error = Error> {
pub fn delete_device(&self, device: DeviceId) -> impl Future<Output = Result<Option<Device>>> {
Call::<config::DeleteDeviceRpc, _>::new(self, device)
}

/// `ListBucketsRpc`を実行する。
pub fn list_buckets(&self) -> impl Future<Item = Vec<BucketSummary>, Error = Error> {
pub fn list_buckets(&self) -> impl Future<Output = Result<Vec<BucketSummary>>> {
Call::<config::ListBucketsRpc, _>::new(self, ())
}

/// `GetBucketRpc`を実行する。
pub fn get_bucket(
&self,
bucket: BucketId,
) -> impl Future<Item = Option<Bucket>, Error = Error> {
pub fn get_bucket(&self, bucket: BucketId) -> impl Future<Output = Result<Option<Bucket>>> {
Call::<config::GetBucketRpc, _>::new(self, bucket)
}

/// `PutBucketRpc`を実行する。
pub fn put_bucket(&self, bucket: Bucket) -> impl Future<Item = Bucket, Error = Error> {
pub fn put_bucket(&self, bucket: Bucket) -> impl Future<Output = Result<Bucket>> {
Call::<config::PutBucketRpc, _>::new(self, bucket)
}

/// `DeleteBucketRpc`を実行する。
pub fn delete_bucket(
&self,
bucket: BucketId,
) -> impl Future<Item = Option<Bucket>, Error = Error> {
pub fn delete_bucket(&self, bucket: BucketId) -> impl Future<Output = Result<Option<Bucket>>> {
Call::<config::DeleteBucketRpc, _>::new(self, bucket)
}
}

#[pin_project]
#[derive(Debug)]
struct Call<T: RpcCall, U> {
contact_server: SocketAddr,
rpc_service: RpcServiceHandle,
#[pin]
leader: Response<SocketAddr>,
#[pin]
request: T::Req,
#[pin]
response: Option<Response<U>>,
is_retried: bool,
}
Expand Down Expand Up @@ -141,42 +130,49 @@ where
T::ReqEncoder: Default,
T::ResDecoder: Default,
{
type Item = U;
type Error = Error;
type Output = Result<U>;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
match self.response.poll() {
Err(e) => {
if *e.kind() == ErrorKind::NotLeader {
track_assert!(
!self.is_retried,
ErrorKind::Unavailable,
"Unstable cluster: RPC={}",
T::NAME
);

self.is_retried = true;
let future = config::GetLeaderRpc::client(&self.rpc_service)
.call(self.contact_server, ());
self.leader = Response(future);
self.response = None;
} else {
return Err(track!(e, T::NAME));
let mut this = self.as_mut().project();
match this.response.as_mut().as_pin_mut().map(|fut| fut.poll(cx)) {
None => {}
Some(result) => match result {
Poll::Ready(Err(e)) => {
if *e.kind() == ErrorKind::NotLeader {
if let Err(e) = (|| {
track_assert!(
!*this.is_retried,
ErrorKind::Unavailable,
"Unstable cluster: RPC={}",
T::NAME
);
Ok(())
})() {
return Poll::Ready(Err(e));
}

*this.is_retried = true;
let future = config::GetLeaderRpc::client(&this.rpc_service)
.call(*this.contact_server, ());
this.leader.set(Response(future));
this.response.set(None);
} else {
return Poll::Ready(Err(track!(e, T::NAME)));
}
}
}
Ok(Async::NotReady) => break,
Ok(Async::Ready(None)) => {}
Ok(Async::Ready(Some(response))) => return Ok(Async::Ready(response)),
Poll::Pending => break,
Poll::Ready(Ok(response)) => return Poll::Ready(Ok(response)),
},
}

if let Async::Ready(leader) = track!(self.leader.poll())? {
let future = T::client(&self.rpc_service).call(leader, self.request.clone());
self.response = Some(Response(future));
if let Poll::Ready(leader) = track!(this.leader.poll(cx))? {
let future = T::client(&this.rpc_service).call(leader, this.request.clone());
this.response.set(Some(Response(future)));
} else {
break;
}
}
Ok(Async::NotReady)
Poll::Pending
}
}
36 changes: 18 additions & 18 deletions src/client/frugalos.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Frugalosの公開API用のRPCクライアント。
use fibers_rpc::client::ClientServiceHandle as RpcServiceHandle;
use fibers_rpc::Call as RpcCall;
use futures::Future;
use futures03::Future;
use std::collections::BTreeSet;
use std::net::SocketAddr;
use std::ops::Range;
Expand All @@ -19,7 +19,7 @@ use crate::expect::Expect;
use crate::multiplicity::MultiplicityConfig;
use crate::repair::RepairConfig;
use crate::schema::frugalos;
use crate::Error;
use crate::Result;

/// RPCクライアント。
#[derive(Debug)]
Expand All @@ -44,7 +44,7 @@ impl Client {
deadline: Duration,
expect: Expect,
consistency: ReadConsistency,
) -> impl Future<Item = Option<(ObjectVersion, Vec<u8>)>, Error = Error> {
) -> impl Future<Output = Result<Option<(ObjectVersion, Vec<u8>)>>> {
let request = frugalos::ObjectRequest {
bucket_id,
object_id,
Expand All @@ -61,7 +61,7 @@ impl Client {
bucket_id: BucketId,
segment: u16,
consistency: ReadConsistency,
) -> impl Future<Item = Vec<ObjectSummary>, Error = Error> {
) -> impl Future<Output = Result<Vec<ObjectSummary>>> {
let request = frugalos::ListObjectsRequest {
bucket_id,
segment,
Expand All @@ -76,7 +76,7 @@ impl Client {
bucket_id: BucketId,
prefix: ObjectPrefix,
deadline: Duration,
) -> impl Future<Item = Vec<ObjectSummary>, Error = Error> {
) -> impl Future<Output = Result<Vec<ObjectSummary>>> {
let request = frugalos::PrefixRequest {
bucket_id,
prefix,
Expand All @@ -92,7 +92,7 @@ impl Client {
&self,
bucket_id: BucketId,
segment: u16,
) -> impl Future<Item = Option<ObjectSummary>, Error = Error> {
) -> impl Future<Output = Result<Option<ObjectSummary>>> {
let request = frugalos::SegmentRequest { bucket_id, segment };
Response(
frugalos::GetLatestVersionRpc::client(&self.rpc_service).call(self.server, request),
Expand All @@ -107,7 +107,7 @@ impl Client {
deadline: Duration,
expect: Expect,
consistency: ReadConsistency,
) -> impl Future<Item = Option<FragmentsSummary>, Error = Error> {
) -> impl Future<Output = Result<Option<FragmentsSummary>>> {
let request = frugalos::CountFragmentsRequest {
bucket_id,
object_id,
Expand All @@ -127,7 +127,7 @@ impl Client {
expect: Expect,
consistency: ReadConsistency,
check_storage: bool,
) -> impl Future<Item = Option<ObjectVersion>, Error = Error> {
) -> impl Future<Output = Result<Option<ObjectVersion>>> {
let request = frugalos::HeadObjectRequest {
bucket_id,
object_id,
Expand All @@ -148,7 +148,7 @@ impl Client {
deadline: Duration,
expect: Expect,
multiplicity_config: MultiplicityConfig,
) -> impl Future<Item = (ObjectVersion, bool), Error = Error> {
) -> impl Future<Output = Result<(ObjectVersion, bool)>> {
let request = frugalos::PutObjectRequest {
bucket_id,
object_id,
Expand All @@ -167,7 +167,7 @@ impl Client {
object_id: ObjectId,
deadline: Duration,
expect: Expect,
) -> impl Future<Item = Option<ObjectVersion>, Error = Error> {
) -> impl Future<Output = Result<Option<ObjectVersion>>> {
let request = frugalos::ObjectRequest {
bucket_id,
object_id,
Expand All @@ -185,7 +185,7 @@ impl Client {
segment: u16,
object_version: ObjectVersion,
deadline: Duration,
) -> impl Future<Item = Option<ObjectVersion>, Error = Error> {
) -> impl Future<Output = Result<Option<ObjectVersion>>> {
let request = frugalos::VersionRequest {
bucket_id,
segment,
Expand All @@ -205,7 +205,7 @@ impl Client {
segment: u16,
targets: Range<ObjectVersion>,
deadline: Duration,
) -> impl Future<Item = Vec<ObjectSummary>, Error = Error> {
) -> impl Future<Output = Result<Vec<ObjectSummary>>> {
let request = frugalos::RangeRequest {
bucket_id,
segment,
Expand All @@ -223,7 +223,7 @@ impl Client {
bucket_id: BucketId,
prefix: ObjectPrefix,
deadline: Duration,
) -> impl Future<Item = DeleteObjectsByPrefixSummary, Error = Error> {
) -> impl Future<Output = Result<DeleteObjectsByPrefixSummary>> {
let request = frugalos::PrefixRequest {
bucket_id,
prefix,
Expand All @@ -241,7 +241,7 @@ impl Client {
bucket_id: BucketId,
device_id: DeviceId,
object_ids: BTreeSet<ObjectId>,
) -> impl Future<Item = (), Error = Error> {
) -> impl Future<Output = Result<()>> {
Response(
frugalos::DeleteObjectSetFromDeviceRpc::client(&self.rpc_service).call(
self.server,
Expand All @@ -255,28 +255,28 @@ impl Client {
}

/// `StopRpc`を実行する。
pub fn stop(&self) -> impl Future<Item = (), Error = Error> {
pub fn stop(&self) -> impl Future<Output = Result<()>> {
Response(frugalos::StopRpc::client(&self.rpc_service).call(self.server, ()))
}

/// `TakeSnapshotRpc`を実行する。
pub fn take_snapshot(&self) -> impl Future<Item = (), Error = Error> {
pub fn take_snapshot(&self) -> impl Future<Output = Result<()>> {
Response(frugalos::TakeSnapshotRpc::client(&self.rpc_service).call(self.server, ()))
}

/// Executes `SetRepairConfigRpc`
pub fn set_repair_config(
&self,
repair_config: RepairConfig,
) -> impl Future<Item = (), Error = Error> {
) -> impl Future<Output = Result<()>> {
Response(
frugalos::SetRepairConfigRpc::client(&self.rpc_service)
.call(self.server, repair_config),
)
}

/// Executes `TruncateBucketRpc`
pub fn truncate_bucket(&self, bucket_seqno: u32) -> impl Future<Item = (), Error = Error> {
pub fn truncate_bucket(&self, bucket_seqno: u32) -> impl Future<Output = Result<()>> {
Response(
frugalos::TruncateBucketRpc::client(&self.rpc_service)
.call(self.server, frugalos::BucketSeqnoRequest { bucket_seqno }),
Expand Down
Loading