Skip to content

Commit

Permalink
cargo c compiles
Browse files Browse the repository at this point in the history
  • Loading branch information
koba-e964 committed Apr 2, 2021
1 parent 868811f commit ccac9c1
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 136 deletions.
10 changes: 9 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ fibers = "0.1"
fibers_rpc = "0.3"
futures = "0.1"
libc = "0.2"
pin-project = "1"
serde = "1"
serde_derive = "1"
trackable = { version = "0.2", features = ["serialize"] }
trackable = { version = "1", features = ["serialize"] }
futures03 = { package = "futures", version = "0.3" }


[patch.crates-io]
fibers = { git = "https://github.com/dwango/fibers-rs", branch = "feature/tokio-1.0" }
fibers_rpc = { git = "https://github.com/koba-e964/fibers_rpc", branch = "feature/tokio-1.0" }
trackable = { git = "https://github.com/koba-e964/trackable", branch = "feature/trackable-for-poll"}
116 changes: 56 additions & 60 deletions src/client/config.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
//! 構成管理系API用のRPCクライアント。
use fibers_rpc::client::ClientServiceHandle as RpcServiceHandle;
use fibers_rpc::Call as RpcCall;
use futures::{Async, Future, Poll};
use futures03::Future;
use pin_project::pin_project;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};

use super::Response;
use crate::entity::bucket::{Bucket, BucketId, BucketSummary};
use crate::entity::device::{Device, DeviceId, DeviceSummary};
use crate::entity::server::{Server, ServerId, ServerSummary};
use crate::schema::config;
use crate::{Error, ErrorKind, Result};
use crate::{ErrorKind, Result};

/// RPCクライアント。
#[derive(Debug)]
Expand All @@ -27,90 +30,76 @@ impl Client {
}

/// `ListServersRpc`を実行する。
pub fn list_servers(&self) -> impl Future<Item = Vec<ServerSummary>, Error = Error> {
pub fn list_servers(&self) -> impl Future<Output = Result<Vec<ServerSummary>>> {
Call::<config::ListServersRpc, _>::new(self, ())
}

/// `GetServerRpc`を実行する。
pub fn get_server(
&self,
server: ServerId,
) -> impl Future<Item = Option<Server>, Error = Error> {
pub fn get_server(&self, server: ServerId) -> impl Future<Output = Result<Option<Server>>> {
Call::<config::GetServerRpc, _>::new(self, server)
}

/// `PutServerRpc`を実行する。
pub fn put_server(&self, server: Server) -> impl Future<Item = Server, Error = Error> {
pub fn put_server(&self, server: Server) -> impl Future<Output = Result<Server>> {
Call::<config::PutServerRpc, _>::new(self, server)
}

/// `DeleteServerRpc`を実行する。
pub fn delete_server(
&self,
server: ServerId,
) -> impl Future<Item = Option<Server>, Error = Error> {
pub fn delete_server(&self, server: ServerId) -> impl Future<Output = Result<Option<Server>>> {
Call::<config::DeleteServerRpc, _>::new(self, server)
}

/// `ListDevicesRpc`を実行する。
pub fn list_devices(&self) -> impl Future<Item = Vec<DeviceSummary>, Error = Error> {
pub fn list_devices(&self) -> impl Future<Output = Result<Vec<DeviceSummary>>> {
Call::<config::ListDevicesRpc, _>::new(self, ())
}

/// `GetDeviceRpc`を実行する。
pub fn get_device(
&self,
device: DeviceId,
) -> impl Future<Item = Option<Device>, Error = Error> {
pub fn get_device(&self, device: DeviceId) -> impl Future<Output = Result<Option<Device>>> {
Call::<config::GetDeviceRpc, _>::new(self, device)
}

/// `PutDeviceRpc`を実行する。
pub fn put_device(&self, device: Device) -> impl Future<Item = Device, Error = Error> {
pub fn put_device(&self, device: Device) -> impl Future<Output = Result<Device>> {
Call::<config::PutDeviceRpc, _>::new(self, device)
}

/// `DeleteDeviceRpc`を実行する。
pub fn delete_device(
&self,
device: DeviceId,
) -> impl Future<Item = Option<Device>, Error = Error> {
pub fn delete_device(&self, device: DeviceId) -> impl Future<Output = Result<Option<Device>>> {
Call::<config::DeleteDeviceRpc, _>::new(self, device)
}

/// `ListBucketsRpc`を実行する。
pub fn list_buckets(&self) -> impl Future<Item = Vec<BucketSummary>, Error = Error> {
pub fn list_buckets(&self) -> impl Future<Output = Result<Vec<BucketSummary>>> {
Call::<config::ListBucketsRpc, _>::new(self, ())
}

/// `GetBucketRpc`を実行する。
pub fn get_bucket(
&self,
bucket: BucketId,
) -> impl Future<Item = Option<Bucket>, Error = Error> {
pub fn get_bucket(&self, bucket: BucketId) -> impl Future<Output = Result<Option<Bucket>>> {
Call::<config::GetBucketRpc, _>::new(self, bucket)
}

/// `PutBucketRpc`を実行する。
pub fn put_bucket(&self, bucket: Bucket) -> impl Future<Item = Bucket, Error = Error> {
pub fn put_bucket(&self, bucket: Bucket) -> impl Future<Output = Result<Bucket>> {
Call::<config::PutBucketRpc, _>::new(self, bucket)
}

/// `DeleteBucketRpc`を実行する。
pub fn delete_bucket(
&self,
bucket: BucketId,
) -> impl Future<Item = Option<Bucket>, Error = Error> {
pub fn delete_bucket(&self, bucket: BucketId) -> impl Future<Output = Result<Option<Bucket>>> {
Call::<config::DeleteBucketRpc, _>::new(self, bucket)
}
}

#[pin_project]
#[derive(Debug)]
struct Call<T: RpcCall, U> {
contact_server: SocketAddr,
rpc_service: RpcServiceHandle,
#[pin]
leader: Response<SocketAddr>,
#[pin]
request: T::Req,
#[pin]
response: Option<Response<U>>,
is_retried: bool,
}
Expand Down Expand Up @@ -141,42 +130,49 @@ where
T::ReqEncoder: Default,
T::ResDecoder: Default,
{
type Item = U;
type Error = Error;
type Output = Result<U>;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
match self.response.poll() {
Err(e) => {
if *e.kind() == ErrorKind::NotLeader {
track_assert!(
!self.is_retried,
ErrorKind::Unavailable,
"Unstable cluster: RPC={}",
T::NAME
);

self.is_retried = true;
let future = config::GetLeaderRpc::client(&self.rpc_service)
.call(self.contact_server, ());
self.leader = Response(future);
self.response = None;
} else {
return Err(track!(e, T::NAME));
let mut this = self.as_mut().project();
match this.response.as_mut().as_pin_mut().map(|fut| fut.poll(cx)) {
None => {}
Some(result) => match result {
Poll::Ready(Err(e)) => {
if *e.kind() == ErrorKind::NotLeader {
if let Err(e) = (|| {
track_assert!(
!*this.is_retried,
ErrorKind::Unavailable,
"Unstable cluster: RPC={}",
T::NAME
);
Ok(())
})() {
return Poll::Ready(Err(e));
}

*this.is_retried = true;
let future = config::GetLeaderRpc::client(&this.rpc_service)
.call(*this.contact_server, ());
this.leader.set(Response(future));
this.response.set(None);
} else {
return Poll::Ready(Err(track!(e, T::NAME)));
}
}
}
Ok(Async::NotReady) => break,
Ok(Async::Ready(None)) => {}
Ok(Async::Ready(Some(response))) => return Ok(Async::Ready(response)),
Poll::Pending => break,
Poll::Ready(Ok(response)) => return Poll::Ready(Ok(response)),
},
}

if let Async::Ready(leader) = track!(self.leader.poll())? {
let future = T::client(&self.rpc_service).call(leader, self.request.clone());
self.response = Some(Response(future));
if let Poll::Ready(leader) = track!(this.leader.poll(cx))? {
let future = T::client(&this.rpc_service).call(leader, this.request.clone());
this.response.set(Some(Response(future)));
} else {
break;
}
}
Ok(Async::NotReady)
Poll::Pending
}
}
36 changes: 18 additions & 18 deletions src/client/frugalos.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Frugalosの公開API用のRPCクライアント。
use fibers_rpc::client::ClientServiceHandle as RpcServiceHandle;
use fibers_rpc::Call as RpcCall;
use futures::Future;
use futures03::Future;
use std::collections::BTreeSet;
use std::net::SocketAddr;
use std::ops::Range;
Expand All @@ -19,7 +19,7 @@ use crate::expect::Expect;
use crate::multiplicity::MultiplicityConfig;
use crate::repair::RepairConfig;
use crate::schema::frugalos;
use crate::Error;
use crate::Result;

/// RPCクライアント。
#[derive(Debug)]
Expand All @@ -44,7 +44,7 @@ impl Client {
deadline: Duration,
expect: Expect,
consistency: ReadConsistency,
) -> impl Future<Item = Option<(ObjectVersion, Vec<u8>)>, Error = Error> {
) -> impl Future<Output = Result<Option<(ObjectVersion, Vec<u8>)>>> {
let request = frugalos::ObjectRequest {
bucket_id,
object_id,
Expand All @@ -61,7 +61,7 @@ impl Client {
bucket_id: BucketId,
segment: u16,
consistency: ReadConsistency,
) -> impl Future<Item = Vec<ObjectSummary>, Error = Error> {
) -> impl Future<Output = Result<Vec<ObjectSummary>>> {
let request = frugalos::ListObjectsRequest {
bucket_id,
segment,
Expand All @@ -76,7 +76,7 @@ impl Client {
bucket_id: BucketId,
prefix: ObjectPrefix,
deadline: Duration,
) -> impl Future<Item = Vec<ObjectSummary>, Error = Error> {
) -> impl Future<Output = Result<Vec<ObjectSummary>>> {
let request = frugalos::PrefixRequest {
bucket_id,
prefix,
Expand All @@ -92,7 +92,7 @@ impl Client {
&self,
bucket_id: BucketId,
segment: u16,
) -> impl Future<Item = Option<ObjectSummary>, Error = Error> {
) -> impl Future<Output = Result<Option<ObjectSummary>>> {
let request = frugalos::SegmentRequest { bucket_id, segment };
Response(
frugalos::GetLatestVersionRpc::client(&self.rpc_service).call(self.server, request),
Expand All @@ -107,7 +107,7 @@ impl Client {
deadline: Duration,
expect: Expect,
consistency: ReadConsistency,
) -> impl Future<Item = Option<FragmentsSummary>, Error = Error> {
) -> impl Future<Output = Result<Option<FragmentsSummary>>> {
let request = frugalos::CountFragmentsRequest {
bucket_id,
object_id,
Expand All @@ -127,7 +127,7 @@ impl Client {
expect: Expect,
consistency: ReadConsistency,
check_storage: bool,
) -> impl Future<Item = Option<ObjectVersion>, Error = Error> {
) -> impl Future<Output = Result<Option<ObjectVersion>>> {
let request = frugalos::HeadObjectRequest {
bucket_id,
object_id,
Expand All @@ -148,7 +148,7 @@ impl Client {
deadline: Duration,
expect: Expect,
multiplicity_config: MultiplicityConfig,
) -> impl Future<Item = (ObjectVersion, bool), Error = Error> {
) -> impl Future<Output = Result<(ObjectVersion, bool)>> {
let request = frugalos::PutObjectRequest {
bucket_id,
object_id,
Expand All @@ -167,7 +167,7 @@ impl Client {
object_id: ObjectId,
deadline: Duration,
expect: Expect,
) -> impl Future<Item = Option<ObjectVersion>, Error = Error> {
) -> impl Future<Output = Result<Option<ObjectVersion>>> {
let request = frugalos::ObjectRequest {
bucket_id,
object_id,
Expand All @@ -185,7 +185,7 @@ impl Client {
segment: u16,
object_version: ObjectVersion,
deadline: Duration,
) -> impl Future<Item = Option<ObjectVersion>, Error = Error> {
) -> impl Future<Output = Result<Option<ObjectVersion>>> {
let request = frugalos::VersionRequest {
bucket_id,
segment,
Expand All @@ -205,7 +205,7 @@ impl Client {
segment: u16,
targets: Range<ObjectVersion>,
deadline: Duration,
) -> impl Future<Item = Vec<ObjectSummary>, Error = Error> {
) -> impl Future<Output = Result<Vec<ObjectSummary>>> {
let request = frugalos::RangeRequest {
bucket_id,
segment,
Expand All @@ -223,7 +223,7 @@ impl Client {
bucket_id: BucketId,
prefix: ObjectPrefix,
deadline: Duration,
) -> impl Future<Item = DeleteObjectsByPrefixSummary, Error = Error> {
) -> impl Future<Output = Result<DeleteObjectsByPrefixSummary>> {
let request = frugalos::PrefixRequest {
bucket_id,
prefix,
Expand All @@ -241,7 +241,7 @@ impl Client {
bucket_id: BucketId,
device_id: DeviceId,
object_ids: BTreeSet<ObjectId>,
) -> impl Future<Item = (), Error = Error> {
) -> impl Future<Output = Result<()>> {
Response(
frugalos::DeleteObjectSetFromDeviceRpc::client(&self.rpc_service).call(
self.server,
Expand All @@ -255,28 +255,28 @@ impl Client {
}

/// `StopRpc`を実行する。
pub fn stop(&self) -> impl Future<Item = (), Error = Error> {
pub fn stop(&self) -> impl Future<Output = Result<()>> {
Response(frugalos::StopRpc::client(&self.rpc_service).call(self.server, ()))
}

/// `TakeSnapshotRpc`を実行する。
pub fn take_snapshot(&self) -> impl Future<Item = (), Error = Error> {
pub fn take_snapshot(&self) -> impl Future<Output = Result<()>> {
Response(frugalos::TakeSnapshotRpc::client(&self.rpc_service).call(self.server, ()))
}

/// Executes `SetRepairConfigRpc`
pub fn set_repair_config(
&self,
repair_config: RepairConfig,
) -> impl Future<Item = (), Error = Error> {
) -> impl Future<Output = Result<()>> {
Response(
frugalos::SetRepairConfigRpc::client(&self.rpc_service)
.call(self.server, repair_config),
)
}

/// Executes `TruncateBucketRpc`
pub fn truncate_bucket(&self, bucket_seqno: u32) -> impl Future<Item = (), Error = Error> {
pub fn truncate_bucket(&self, bucket_seqno: u32) -> impl Future<Output = Result<()>> {
Response(
frugalos::TruncateBucketRpc::client(&self.rpc_service)
.call(self.server, frugalos::BucketSeqnoRequest { bucket_seqno }),
Expand Down
Loading

0 comments on commit ccac9c1

Please sign in to comment.