diff --git a/CHANGES.md b/CHANGES.md index 549feae..4dff9af 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,11 @@ # Changes +## [3.0.0] - 2024-05-28 + +* Switch to individual ntex_* crates + +* Use ntex-service 3.0 + ## [2.0.2] - 2024-05-15 * Remove non_exhaustive marker diff --git a/Cargo.toml b/Cargo.toml index cf6ff04..9f7ee4d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-mqtt" -version = "2.0.2" +version = "3.0.0" authors = ["ntex contributors "] description = "Client and Server framework for MQTT v5 and v3.1.1 protocols" documentation = "https://docs.rs/ntex-mqtt" @@ -15,8 +15,13 @@ edition = "2021" features = ["ntex/tokio"] [dependencies] -ntex = "1.2" -ntex-io = "1.2" +ntex-io = "2" +ntex-net = "2" +ntex-util = "2" +ntex-service = "3" +ntex-bytes = "0.1" +ntex-codec = "0.6" +ntex-router = "0.5" bitflags = "2" log = "0.4" pin-project-lite = "0.2" @@ -27,7 +32,8 @@ thiserror = "1" [dev-dependencies] rand = "0.8" env_logger = "0.11" -ntex-tls = "1.1" +ntex-tls = "2" +ntex-macros = "0.1" openssl = "0.10" test-case = "3.2" -ntex = { version = "1", features = ["tokio", "openssl"] } +ntex = { version = "2", features = ["tokio", "openssl"] } diff --git a/examples/mqtt-ws-client.rs b/examples/mqtt-ws-client.rs index 6cc74d7..3e99b44 100644 --- a/examples/mqtt-ws-client.rs +++ b/examples/mqtt-ws-client.rs @@ -1,7 +1,7 @@ //! Mqtt-over-WS client use std::{io, rc::Rc}; -use ntex::connect::{openssl::Connector, Connect, ConnectError}; +use ntex::connect::{openssl::SslConnector, Connect, ConnectError}; use ntex::time::{sleep, Millis, Seconds}; use ntex::{util::Bytes, ws}; use ntex_mqtt::v3; @@ -21,9 +21,12 @@ async fn main() -> std::io::Result<()> { // we need custom connector that would open ws connection and enable ws transport let ws_client = Rc::new( - ws::WsClient::with_connector("https://127.0.0.1:8883", Connector::new(builder.build())) - .finish() - .unwrap(), + ws::WsClient::with_connector( + "https://127.0.0.1:8883", + SslConnector::new(builder.build()), + ) + .finish() + .unwrap(), ); // connect to server diff --git a/examples/openssl-client.rs b/examples/openssl-client.rs index 788711f..ae04ff1 100644 --- a/examples/openssl-client.rs +++ b/examples/openssl-client.rs @@ -1,4 +1,4 @@ -use ntex::connect::openssl::Connector; +use ntex::connect::openssl::SslConnector; use ntex::time::{sleep, Millis, Seconds}; use ntex_mqtt::v5; use openssl::ssl; @@ -35,7 +35,7 @@ async fn main() -> std::io::Result<()> { // connect to server let client = v5::client::MqttConnector::new("127.0.0.1:8883") - .connector(Connector::new(builder.build())) + .connector(SslConnector::new(builder.build())) .client_id("user") .keep_alive(Seconds::ONE) .max_packet_size(30) diff --git a/src/error.rs b/src/error.rs index 69076a5..87abef5 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,6 +1,6 @@ use std::{fmt, io, num::NonZeroU16}; -use ntex::util::Either; +use ntex_util::future::Either; use crate::v5::codec::DisconnectReasonCode; @@ -195,7 +195,7 @@ pub enum ClientError { Disconnected(Option), /// Connect error #[error("Connect error: {}", _0)] - Connect(#[from] ntex::connect::ConnectError), + Connect(#[from] ntex_net::connect::ConnectError), } impl From for ClientError { diff --git a/src/inflight.rs b/src/inflight.rs index 00807f9..bbecfcd 100644 --- a/src/inflight.rs +++ b/src/inflight.rs @@ -1,8 +1,8 @@ //! Service that limits number of in-flight async requests. -use std::{cell::Cell, rc::Rc, task::Context, task::Poll}; +use std::{cell::Cell, future::poll_fn, rc::Rc, task::Context, task::Poll}; -use ntex::service::{Service, ServiceCtx}; -use ntex::task::LocalWaker; +use ntex_service::{Service, ServiceCtx}; +use ntex_util::task::LocalWaker; pub(crate) trait SizedRequest { fn size(&self) -> u32; @@ -27,21 +27,13 @@ where type Response = T::Response; type Error = T::Error; - ntex::forward_poll_shutdown!(service); + ntex_service::forward_shutdown!(service); #[inline] - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - let p1 = self.service.poll_ready(cx)?.is_pending(); - let p2 = !self.count.available(cx); - if p2 { - log::trace!("InFlight limit exceeded"); - } - - if p1 || p2 { - Poll::Pending - } else { - Poll::Ready(Ok(())) - } + async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + ctx.ready(&self.service).await?; + self.count.available().await; + Ok(()) } #[inline] @@ -77,8 +69,8 @@ impl Counter { CounterGuard::new(size, self.0.clone()) } - fn available(&self, cx: &Context<'_>) -> bool { - self.0.available(cx) + async fn available(&self) { + poll_fn(|cx| if self.0.available(cx) { Poll::Ready(()) } else { Poll::Pending }).await } } @@ -134,7 +126,8 @@ impl CounterInner { mod tests { use std::{future::poll_fn, time::Duration}; - use ntex::{service::Pipeline, time::sleep, util::lazy}; + use ntex_service::Pipeline; + use ntex_util::{future::lazy, task::LocalWaker, time::sleep}; use super::*; @@ -157,63 +150,69 @@ mod tests { } } - #[ntex::test] + #[ntex_macros::rt_test] async fn test_inflight() { let wait_time = Duration::from_millis(50); - let srv = Pipeline::new(InFlightService::new(1, 0, SleepService(wait_time))); + let srv = Pipeline::new(InFlightService::new(1, 0, SleepService(wait_time))).bind(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); let srv2 = srv.clone(); - ntex::rt::spawn(async move { + ntex_util::spawn(async move { let _ = srv2.call(()).await; }); - ntex::time::sleep(Duration::from_millis(25)).await; + ntex_util::time::sleep(Duration::from_millis(25)).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending); - ntex::time::sleep(Duration::from_millis(50)).await; + ntex_util::time::sleep(Duration::from_millis(50)).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); assert!(lazy(|cx| srv.poll_shutdown(cx)).await.is_ready()); } - #[ntex::test] + #[ntex_macros::rt_test] async fn test_inflight2() { let wait_time = Duration::from_millis(50); - let srv = Pipeline::new(InFlightService::new(0, 10, SleepService(wait_time))); + let srv = Pipeline::new(InFlightService::new(0, 10, SleepService(wait_time))).bind(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); let srv2 = srv.clone(); - ntex::rt::spawn(async move { + ntex_util::spawn(async move { let _ = srv2.call(()).await; }); - ntex::time::sleep(Duration::from_millis(25)).await; + ntex_util::time::sleep(Duration::from_millis(25)).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending); - ntex::time::sleep(Duration::from_millis(100)).await; + ntex_util::time::sleep(Duration::from_millis(100)).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); } struct Srv2 { dur: Duration, cnt: Cell, + waker: LocalWaker, } impl Service<()> for Srv2 { type Response = (); type Error = (); - fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { - if !self.cnt.get() { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } + async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), ()> { + poll_fn(|cx| { + if !self.cnt.get() { + Poll::Ready(Ok(())) + } else { + self.waker.register(cx.waker()); + Poll::Pending + } + }) + .await } async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> { let fut = sleep(self.dur); self.cnt.set(true); + self.waker.wake(); let _ = fut.await; self.cnt.set(false); @@ -224,27 +223,28 @@ mod tests { /// InflightService::poll_ready() must always register waker, /// otherwise it can lose wake up if inner service's poll_ready /// does not wakes dispatcher. - #[ntex::test] + #[ntex_macros::rt_test] async fn test_inflight3() { let wait_time = Duration::from_millis(50); let srv = Pipeline::new(InFlightService::new( 1, 10, - Srv2 { dur: wait_time, cnt: Cell::new(false) }, - )); + Srv2 { dur: wait_time, cnt: Cell::new(false), waker: LocalWaker::new() }, + )) + .bind(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); let srv2 = srv.clone(); - ntex::rt::spawn(async move { + ntex_util::spawn(async move { let _ = srv2.call(()).await; }); - ntex::time::sleep(Duration::from_millis(25)).await; + ntex_util::time::sleep(Duration::from_millis(25)).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending); let srv2 = srv.clone(); - let (tx, rx) = ntex::channel::oneshot::channel(); - ntex::rt::spawn(async move { + let (tx, rx) = ntex_util::channel::oneshot::channel(); + ntex_util::spawn(async move { let _ = poll_fn(|cx| srv2.poll_ready(cx)).await; let _ = tx.send(()); }); diff --git a/src/io.rs b/src/io.rs index 79ec567..53ee33c 100644 --- a/src/io.rs +++ b/src/io.rs @@ -1,13 +1,14 @@ //! Framed transport dispatcher -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use std::{cell::RefCell, collections::VecDeque, future::Future, pin::Pin, rc::Rc}; -use ntex::codec::{Decoder, Encoder}; -use ntex::io::{ +use ntex_bytes::Pool; +use ntex_codec::{Decoder, Encoder}; +use ntex_io::{ Decoded, DispatchItem, DispatcherConfig, IoBoxed, IoRef, IoStatusUpdate, RecvError, }; -use ntex::service::{IntoService, Pipeline, PipelineCall, Service}; -use ntex::{time::Seconds, util::ready, util::Pool}; +use ntex_service::{IntoService, Pipeline, PipelineBinding, PipelineCall, Service}; +use ntex_util::time::Seconds; type Response = ::Item; @@ -41,7 +42,7 @@ struct DispatcherInner>, U: Encoder + Decoder + 'stat io: IoBoxed, flags: Flags, codec: U, - service: Pipeline, + service: PipelineBinding>, st: IoDispatcherState, state: Rc>>, config: DispatcherConfig, @@ -136,7 +137,7 @@ where } else { Flags::empty() }, - service: Pipeline::new(service.into_service()), + service: Pipeline::new(service.into_service()).bind(), config: config.clone(), st: IoDispatcherState::Processing, response: None, @@ -393,7 +394,7 @@ where { fn call_service(&mut self, cx: &mut Context<'_>, item: DispatchItem) { let mut state = self.state.borrow_mut(); - let mut fut = self.service.call_static(item); + let mut fut = self.service.call(item); // optimize first call if self.response.is_none() { @@ -429,7 +430,7 @@ where let state = self.state.clone(); #[allow(clippy::let_underscore_future)] - let _ = ntex::rt::spawn(async move { + let _ = ntex_util::spawn(async move { let item = fut.await; state.borrow_mut().handle_result(item, response_idx, &st, &codec, true); }); @@ -584,10 +585,12 @@ where mod tests { use std::{cell::Cell, io, sync::Arc, sync::Mutex}; - use ntex::channel::condition::Condition; - use ntex::time::{sleep, Millis}; - use ntex::util::{Bytes, BytesMut}; - use ntex::{codec::BytesCodec, io as nio, service::ServiceCtx, testing::Io}; + use ntex_bytes::{Bytes, BytesMut}; + use ntex_codec::BytesCodec; + use ntex_io::{self as nio, testing::IoTest as Io}; + use ntex_service::ServiceCtx; + use ntex_util::channel::condition::Condition; + use ntex_util::time::{sleep, Millis}; use rand::Rng; use super::*; @@ -632,7 +635,7 @@ mod tests { state, config, keepalive_timeout, - service: Pipeline::new(service.into_service()), + service: Pipeline::new(service.into_service()).bind(), response: None, response_idx: 0, io: IoBoxed::from(io), @@ -652,7 +655,7 @@ mod tests { } } - #[ntex::test] + #[ntex_macros::rt_test] async fn test_basic() { let (client, server) = Io::create(); client.remote_buffer_cap(1024); @@ -661,7 +664,7 @@ mod tests { let (disp, _) = Dispatcher::new_debug( nio::Io::new(server), BytesCodec, - ntex::service::fn_service(|msg: DispatchItem| async move { + ntex_service::fn_service(|msg: DispatchItem| async move { sleep(Millis(50)).await; if let DispatchItem::Item(msg) = msg { Ok::<_, ()>(Some(msg.freeze())) @@ -670,7 +673,7 @@ mod tests { } }), ); - ntex::rt::spawn(async move { + ntex_util::spawn(async move { let _ = disp.await; }); sleep(Millis(25)).await; @@ -686,7 +689,7 @@ mod tests { assert!(client.is_server_dropped()); } - #[ntex::test] + #[ntex_macros::rt_test] async fn test_ordering() { let (client, server) = Io::create(); client.remote_buffer_cap(1024); @@ -698,7 +701,7 @@ mod tests { let (disp, _) = Dispatcher::new_debug( nio::Io::new(server), BytesCodec, - ntex::service::fn_service(move |msg: DispatchItem| { + ntex_service::fn_service(move |msg: DispatchItem| { let waiter = waiter.clone(); async move { waiter.await; @@ -710,7 +713,7 @@ mod tests { } }), ); - ntex::rt::spawn(async move { + ntex_util::spawn(async move { let _ = disp.await; }); sleep(Millis(50)).await; @@ -728,7 +731,7 @@ mod tests { assert!(client.is_server_dropped()); } - #[ntex::test] + #[ntex_macros::rt_test] async fn test_sink() { let (client, server) = Io::create(); client.remote_buffer_cap(1024); @@ -737,7 +740,7 @@ mod tests { let (disp, io) = Dispatcher::new_debug( nio::Io::new(server), BytesCodec, - ntex::service::fn_service(|msg: DispatchItem| async move { + ntex_service::fn_service(|msg: DispatchItem| async move { if let DispatchItem::Item(msg) = msg { Ok::<_, ()>(Some(msg.freeze())) } else { @@ -745,7 +748,7 @@ mod tests { } }), ); - ntex::rt::spawn(async move { + ntex_util::spawn(async move { let _ = disp.await; }); @@ -761,7 +764,7 @@ mod tests { assert!(client.is_server_dropped()); } - #[ntex::test] + #[ntex_macros::rt_test] async fn test_err_in_service() { let (client, server) = Io::create(); client.remote_buffer_cap(0); @@ -770,11 +773,11 @@ mod tests { let (disp, io) = Dispatcher::new_debug( nio::Io::new(server), BytesCodec, - ntex::service::fn_service(|_: DispatchItem| async move { + ntex_service::fn_service(|_: DispatchItem| async move { Err::, _>(()) }), ); - ntex::rt::spawn(async move { + ntex_util::spawn(async move { let _ = disp.await; }); @@ -793,7 +796,7 @@ mod tests { assert!(client.is_server_dropped()); } - #[ntex::test] + #[ntex_macros::rt_test] async fn test_err_in_service_ready() { let (client, server) = Io::create(); client.remote_buffer_cap(0); @@ -807,9 +810,9 @@ mod tests { type Response = Option>; type Error = (); - fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { + async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), ()> { self.0.set(self.0.get() + 1); - Poll::Ready(Err(())) + Err(()) } async fn call( @@ -824,7 +827,7 @@ mod tests { let (disp, io) = Dispatcher::new_debug(nio::Io::new(server), BytesCodec, Srv(counter.clone())); io.encode(Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"), &BytesCodec).unwrap(); - ntex::rt::spawn(async move { + ntex_util::spawn(async move { let _ = disp.await; }); @@ -844,7 +847,7 @@ mod tests { assert_eq!(counter.get(), 1); } - #[ntex::test] + #[ntex_macros::rt_test] async fn test_write_backpressure() { let (client, server) = Io::create(); // do not allow to write to socket @@ -857,7 +860,7 @@ mod tests { let (disp, io) = Dispatcher::new_debug( nio::Io::new(server), BytesCodec, - ntex::service::fn_service(move |msg: DispatchItem| { + ntex_service::fn_service(move |msg: DispatchItem| { let data = data2.clone(); async move { match msg { @@ -886,7 +889,7 @@ mod tests { pool.set_read_params(8 * 1024, 1024); pool.set_write_params(16 * 1024, 1024); - ntex::rt::spawn(async move { + ntex_util::spawn(async move { let _ = disp.await; }); @@ -913,7 +916,7 @@ mod tests { assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 1, 2]); } - #[ntex::test] + #[ntex_macros::rt_test] async fn test_shutdown_dispatcher_waker() { let (client, server) = Io::create(); let server = nio::Io::new(server); @@ -926,7 +929,7 @@ mod tests { let (disp, _io) = Dispatcher::new_debug( server, BytesCodec, - ntex::service::fn_service(move |item: DispatchItem| { + ntex_service::fn_service(move |item: DispatchItem| { let first = flag2.get(); flag2.set(false); let io = server_ref.clone(); @@ -946,8 +949,8 @@ mod tests { } }), ); - let (tx, rx) = ntex::channel::oneshot::channel(); - ntex::rt::spawn(async move { + let (tx, rx) = ntex_util::channel::oneshot::channel(); + ntex_util::spawn(async move { let _ = disp.await; let _ = tx.send(()); }); @@ -970,7 +973,7 @@ mod tests { } /// Update keep-alive timer after receiving frame - #[ntex::test] + #[ntex_macros::rt_test] async fn test_keepalive() { let (client, server) = Io::create(); client.remote_buffer_cap(1024); @@ -981,7 +984,7 @@ mod tests { let (disp, _) = Dispatcher::new_debug( nio::Io::new(server), BytesCodec, - ntex::service::fn_service(move |msg: DispatchItem| { + ntex_service::fn_service(move |msg: DispatchItem| { let data = data2.clone(); async move { match msg { @@ -998,7 +1001,7 @@ mod tests { } }), ); - ntex::rt::spawn(async move { + ntex_util::spawn(async move { let _ = disp.keepalive_timeout(Seconds(2)).await; }); @@ -1049,7 +1052,7 @@ mod tests { } /// Do not use keep-alive timer if not configured - #[ntex::test] + #[ntex_macros::rt_test] async fn test_no_keepalive_err_after_frame_timeout() { let (client, server) = Io::create(); client.remote_buffer_cap(1024); @@ -1064,7 +1067,7 @@ mod tests { nio::Io::new(server), BytesLenCodec(2), config, - ntex::service::fn_service(move |msg: DispatchItem| { + ntex_service::fn_service(move |msg: DispatchItem| { let data = data2.clone(); async move { match msg { @@ -1081,7 +1084,7 @@ mod tests { } }), ); - ntex::rt::spawn(async move { + ntex_util::spawn(async move { let _ = disp.await; }); @@ -1095,7 +1098,7 @@ mod tests { assert_eq!(&data.lock().unwrap().borrow()[..], &[0]); } - #[ntex::test] + #[ntex_macros::rt_test] async fn test_read_timeout() { let (client, server) = Io::create(); client.remote_buffer_cap(1024); @@ -1114,7 +1117,7 @@ mod tests { nio::Io::new(server), BytesLenCodec(8), config, - ntex::service::fn_service(move |msg: DispatchItem| { + ntex_service::fn_service(move |msg: DispatchItem| { let data = data2.clone(); async move { match msg { @@ -1131,7 +1134,7 @@ mod tests { } }), ); - ntex::rt::spawn(async move { + ntex_util::spawn(async move { let _ = disp.await; }); diff --git a/src/server.rs b/src/server.rs index a6481b9..cf9caac 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,9 +1,9 @@ -use std::{fmt, io, marker, task::Context, task::Poll}; +use std::{fmt, io, marker}; -use ntex::io::{Filter, Io, IoBoxed}; -use ntex::service::{Service, ServiceCtx, ServiceFactory}; -use ntex::time::{Deadline, Millis, Seconds}; -use ntex::util::{join, select, Either}; +use ntex_io::{Filter, Io, IoBoxed}; +use ntex_service::{Service, ServiceCtx, ServiceFactory}; +use ntex_util::future::{join, select, Either}; +use ntex_util::time::{Deadline, Millis, Seconds}; use crate::version::{ProtocolVersion, VersionCodec}; use crate::{error::HandshakeError, error::MqttError, v3, v5}; @@ -214,27 +214,17 @@ where type Error = MqttError; #[inline] - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - let ready1 = self.handlers.0.poll_ready(cx)?.is_ready(); - let ready2 = self.handlers.1.poll_ready(cx)?.is_ready(); - - if ready1 && ready2 { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } + async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + let (ready1, ready2) = + join(ctx.ready(&self.handlers.0), ctx.ready(&self.handlers.1)).await; + ready1?; + ready2 } #[inline] - fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> { - let ready1 = self.handlers.0.poll_shutdown(cx).is_ready(); - let ready2 = self.handlers.1.poll_shutdown(cx).is_ready(); - - if ready1 && ready2 { - Poll::Ready(()) - } else { - Poll::Pending - } + async fn shutdown(&self) { + self.handlers.0.shutdown().await; + self.handlers.1.shutdown().await; } #[inline] @@ -290,13 +280,13 @@ where type Error = MqttError; #[inline] - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - Service::::poll_ready(self, cx) + async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + Service::::ready(self, ctx).await } #[inline] - fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> { - Service::::poll_shutdown(self, cx) + async fn shutdown(&self) { + Service::::shutdown(self).await } #[inline] diff --git a/src/service.rs b/src/service.rs index 714ff01..a0f743e 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,9 +1,9 @@ use std::{fmt, marker::PhantomData, rc::Rc}; -use ntex::codec::{Decoder, Encoder}; -use ntex::io::{DispatchItem, DispatcherConfig, Filter, Io, IoBoxed}; -use ntex::service::{Service, ServiceCtx, ServiceFactory}; -use ntex::time::Seconds; +use ntex_codec::{Decoder, Encoder}; +use ntex_io::{DispatchItem, DispatcherConfig, Filter, Io, IoBoxed}; +use ntex_service::{Service, ServiceCtx, ServiceFactory}; +use ntex_util::time::Seconds; use crate::io::Dispatcher; @@ -112,8 +112,8 @@ where type Response = (); type Error = C::Error; - ntex::forward_poll_ready!(connect); - ntex::forward_poll_shutdown!(connect); + ntex_service::forward_ready!(connect); + ntex_service::forward_shutdown!(connect); async fn call(&self, req: IoBoxed, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { let tag = req.tag(); @@ -150,8 +150,8 @@ where type Response = (); type Error = C::Error; - ntex::forward_poll_ready!(connect); - ntex::forward_poll_shutdown!(connect); + ntex_service::forward_ready!(connect); + ntex_service::forward_shutdown!(connect); #[inline] async fn call(&self, io: Io, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { diff --git a/src/topic.rs b/src/topic.rs index 0f945f6..d06bbad 100644 --- a/src/topic.rs +++ b/src/topic.rs @@ -1,6 +1,6 @@ use std::{fmt, fmt::Write, io}; -use ntex::util::ByteString; +use ntex_bytes::ByteString; pub(crate) fn is_valid(topic: &str) -> bool { if topic.is_empty() { diff --git a/src/utils.rs b/src/utils.rs index b28589d..c0f7c4f 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,6 +1,6 @@ use std::{io::Cursor, num::NonZeroU16, num::NonZeroU32}; -use ntex::util::{Buf, BufMut, ByteString, Bytes, BytesMut}; +use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut}; use crate::error::{DecodeError, EncodeError}; diff --git a/src/v3/client/connection.rs b/src/v3/client/connection.rs index 4e948c7..cc145ec 100644 --- a/src/v3/client/connection.rs +++ b/src/v3/client/connection.rs @@ -1,11 +1,11 @@ #![allow(clippy::let_underscore_future)] use std::{fmt, marker::PhantomData, rc::Rc}; -use ntex::io::{DispatcherConfig, IoBoxed}; -use ntex::router::{IntoPattern, Router, RouterBuilder}; -use ntex::service::{boxed, into_service, IntoService, Pipeline, Service}; -use ntex::time::{sleep, Millis, Seconds}; -use ntex::util::{Either, Ready}; +use ntex_io::{DispatcherConfig, IoBoxed}; +use ntex_router::{IntoPattern, Router, RouterBuilder}; +use ntex_service::{boxed, fn_service, IntoService, Pipeline, Service}; +use ntex_util::future::{Either, Ready}; +use ntex_util::time::{sleep, Millis, Seconds}; use crate::v3::{codec, shared::MqttShared, sink::MqttSink, ControlAck, Publish}; use crate::{error::MqttError, io::Dispatcher}; @@ -96,14 +96,14 @@ impl Client { pub async fn start_default(self) { if self.keepalive.non_zero() { let _ = - ntex::rt::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive)); + ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive)); } let dispatcher = create_dispatcher( self.shared.clone(), self.max_receive, - into_service(|pkt| Ready::Ok(Either::Right(pkt))), - into_service(|msg: Control<()>| Ready::<_, ()>::Ok(msg.disconnect())), + fn_service(|pkt| Ready::Ok(Either::Right(pkt))), + fn_service(|msg: Control<()>| Ready::<_, ()>::Ok(msg.disconnect())), ); let _ = Dispatcher::new(self.io, self.shared.clone(), dispatcher, &self.config).await; @@ -118,13 +118,13 @@ impl Client { { if self.keepalive.non_zero() { let _ = - ntex::rt::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive)); + ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive)); } let dispatcher = create_dispatcher( self.shared.clone(), self.max_receive, - into_service(|pkt| Ready::Ok(Either::Right(pkt))), + fn_service(|pkt| Ready::Ok(Either::Right(pkt))), service.into_service(), ); @@ -182,14 +182,14 @@ where pub async fn start_default(self) { if self.keepalive.non_zero() { let _ = - ntex::rt::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive)); + ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive)); } let dispatcher = create_dispatcher( self.shared.clone(), self.max_receive, dispatch(self.builder.finish(), self.handlers), - into_service(|msg: Control| Ready::<_, Err>::Ok(msg.disconnect())), + fn_service(|msg: Control| Ready::<_, Err>::Ok(msg.disconnect())), ); let _ = Dispatcher::new(self.io, self.shared.clone(), dispatcher, &self.config).await; @@ -203,7 +203,7 @@ where { if self.keepalive.non_zero() { let _ = - ntex::rt::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive)); + ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive)); } let dispatcher = create_dispatcher( @@ -227,7 +227,7 @@ where { let handlers = Rc::new(handlers); - into_service(move |mut req: Publish| { + fn_service(move |mut req: Publish| { if let Some((idx, _info)) = router.recognize(req.topic_mut()) { // exec handler let idx = *idx; diff --git a/src/v3/client/connector.rs b/src/v3/client/connector.rs index 1978bed..ef9588a 100644 --- a/src/v3/client/connector.rs +++ b/src/v3/client/connector.rs @@ -1,10 +1,10 @@ use std::rc::Rc; -use ntex::connect::{self, Address, Connect, Connector}; -use ntex::io::{DispatcherConfig, IoBoxed}; -use ntex::service::{IntoService, Pipeline, Service}; -use ntex::time::{timeout_checked, Seconds}; -use ntex::util::{ByteString, Bytes, PoolId}; +use ntex_bytes::{ByteString, Bytes, PoolId}; +use ntex_io::{DispatcherConfig, IoBoxed}; +use ntex_net::connect::{self, Address, Connect, Connector}; +use ntex_service::{IntoService, Pipeline, Service}; +use ntex_util::time::{timeout_checked, Seconds}; use super::{codec, connection::Client, error::ClientError, error::ProtocolError}; use crate::v3::shared::{MqttShared, MqttSinkPool}; diff --git a/src/v3/client/dispatcher.rs b/src/v3/client/dispatcher.rs index c340573..6bbf2df 100644 --- a/src/v3/client/dispatcher.rs +++ b/src/v3/client/dispatcher.rs @@ -1,9 +1,9 @@ -use std::task::{Context, Poll}; use std::{cell::RefCell, marker::PhantomData, num::NonZeroU16, rc::Rc}; -use ntex::io::DispatchItem; -use ntex::service::{Pipeline, Service, ServiceCtx}; -use ntex::util::{inflight::InFlightService, BoxFuture, Either, HashSet}; +use ntex_io::DispatchItem; +use ntex_service::{Pipeline, Service, ServiceCtx}; +use ntex_util::future::{join, Either}; +use ntex_util::{services::inflight::InFlightService, HashSet}; use crate::error::{HandshakeError, MqttError, ProtocolError}; use crate::v3::shared::{Ack, MqttShared}; @@ -33,7 +33,6 @@ where /// Mqtt protocol dispatcher pub(crate) struct Dispatcher>, E> { publish: T, - shutdown: RefCell>>, inner: Rc>, _t: PhantomData, } @@ -52,7 +51,6 @@ where pub(crate) fn new(sink: Rc, publish: T, control: C) -> Self { Self { publish, - shutdown: RefCell::new(None), inner: Rc::new(Inner { sink, control, inflight: RefCell::new(HashSet::default()) }), _t: PhantomData, } @@ -68,35 +66,19 @@ where type Response = Option; type Error = MqttError; - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - let res1 = self.publish.poll_ready(cx).map_err(MqttError::Service)?; - let res2 = self.inner.control.poll_ready(cx)?; - - if res1.is_pending() || res2.is_pending() { - Poll::Pending - } else { - Poll::Ready(Ok(())) - } + #[inline] + async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + let (res1, res2) = join(ctx.ready(&self.publish), ctx.ready(&self.inner.control)).await; + res1.map_err(MqttError::Service)?; + res2 } - fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> { - let mut shutdown = self.shutdown.borrow_mut(); - if !shutdown.is_some() { - self.inner.sink.close(); - let inner = self.inner.clone(); - *shutdown = Some(Box::pin(async move { - let _ = Pipeline::new(&inner.control).call(Control::closed()).await; - })); - } + async fn shutdown(&self) { + self.inner.sink.close(); + let _ = Pipeline::new(&self.inner.control).call(Control::closed()).await; - let res0 = shutdown.as_mut().expect("guard above").as_mut().poll(cx); - let res1 = self.publish.poll_shutdown(cx); - let res2 = self.inner.control.poll_shutdown(cx); - if res0.is_pending() || res1.is_pending() || res2.is_pending() { - Poll::Pending - } else { - Poll::Ready(()) - } + self.publish.shutdown().await; + self.inner.control.shutdown().await } async fn call( @@ -251,15 +233,18 @@ where #[cfg(test)] mod tests { - use ntex::time::{sleep, Seconds}; - use ntex::util::{lazy, ByteString, Bytes, Ready}; - use ntex::{io::Io, service::fn_service, testing::IoTest}; use std::{future::Future, pin::Pin}; + use ntex_bytes::{ByteString, Bytes}; + use ntex_io::{testing::IoTest, Io}; + use ntex_service::fn_service; + use ntex_util::future::{lazy, Ready}; + use ntex_util::time::{sleep, Seconds}; + use super::*; use crate::v3::{codec::Codec, MqttSink, QoS}; - #[ntex::test] + #[ntex_macros::rt_test] async fn test_dup_packet_id() { let io = Io::new(IoTest::create().0); let codec = codec::Codec::default(); @@ -309,7 +294,7 @@ mod tests { } } - #[ntex::test] + #[ntex_macros::rt_test] async fn test_wr_backpressure() { let io = Io::new(IoTest::create().0); let codec = Codec::default(); diff --git a/src/v3/codec/codec.rs b/src/v3/codec/codec.rs index aa291b8..e04d2fa 100644 --- a/src/v3/codec/codec.rs +++ b/src/v3/codec/codec.rs @@ -1,7 +1,7 @@ use std::cell::Cell; -use ntex::codec::{Decoder, Encoder}; -use ntex::util::{Buf, BytesMut}; +use ntex_bytes::{Buf, BytesMut}; +use ntex_codec::{Decoder, Encoder}; use super::{decode, encode, Packet, Publish}; use crate::error::{DecodeError, EncodeError}; @@ -115,7 +115,7 @@ impl Encoder for Codec { #[cfg(test)] mod tests { use super::*; - use ntex::util::{ByteString, Bytes}; + use ntex_bytes::{ByteString, Bytes}; #[test] fn test_max_size() { diff --git a/src/v3/codec/decode.rs b/src/v3/codec/decode.rs index a28535a..5123891 100644 --- a/src/v3/codec/decode.rs +++ b/src/v3/codec/decode.rs @@ -1,6 +1,6 @@ use std::num::NonZeroU16; -use ntex::util::{Buf, ByteString, Bytes}; +use ntex_bytes::{Buf, ByteString, Bytes}; use crate::error::DecodeError; use crate::types::{packet_type, QoS, MQTT, MQTT_LEVEL_3, WILL_QOS_SHIFT}; diff --git a/src/v3/codec/encode.rs b/src/v3/codec/encode.rs index 61c090d..8910920 100644 --- a/src/v3/codec/encode.rs +++ b/src/v3/codec/encode.rs @@ -1,4 +1,4 @@ -use ntex::util::{BufMut, ByteString, BytesMut}; +use ntex_bytes::{BufMut, ByteString, BytesMut}; use crate::error::EncodeError; use crate::types::{packet_type, ConnectFlags, QoS, MQTT, MQTT_LEVEL_3, WILL_QOS_SHIFT}; @@ -224,7 +224,7 @@ fn encode_connect(connect: &Connect, dst: &mut BytesMut) -> Result<(), EncodeErr #[cfg(test)] mod tests { - use ntex::util::Bytes; + use ntex_bytes::Bytes; use std::num::NonZeroU16; use super::*; diff --git a/src/v3/codec/packet.rs b/src/v3/codec/packet.rs index a69e9a3..80a9f1c 100644 --- a/src/v3/codec/packet.rs +++ b/src/v3/codec/packet.rs @@ -1,6 +1,6 @@ use std::{fmt, num::NonZeroU16}; -use ntex::util::{ByteString, Bytes}; +use ntex_bytes::{ByteString, Bytes}; use crate::types::{packet_type, QoS}; diff --git a/src/v3/control.rs b/src/v3/control.rs index 072269d..424aa1d 100644 --- a/src/v3/control.rs +++ b/src/v3/control.rs @@ -1,4 +1,4 @@ -use ntex::util::ByteString; +use ntex_bytes::ByteString; use std::{io, marker::PhantomData, num::NonZeroU16}; use super::codec; diff --git a/src/v3/default.rs b/src/v3/default.rs index 448b2bf..242531c 100644 --- a/src/v3/default.rs +++ b/src/v3/default.rs @@ -1,6 +1,6 @@ use std::{fmt, marker::PhantomData}; -use ntex::service::{Service, ServiceCtx, ServiceFactory}; +use ntex_service::{Service, ServiceCtx, ServiceFactory}; use super::control::{Control, ControlAck, ControlAckKind}; use super::publish::Publish; diff --git a/src/v3/dispatcher.rs b/src/v3/dispatcher.rs index 3187939..5f27d7e 100644 --- a/src/v3/dispatcher.rs +++ b/src/v3/dispatcher.rs @@ -1,10 +1,10 @@ -use std::task::{Context, Poll}; use std::{cell::RefCell, marker::PhantomData, num::NonZeroU16, rc::Rc}; -use ntex::io::DispatchItem; -use ntex::service::{self, Pipeline, Service, ServiceCtx, ServiceFactory}; -use ntex::util::buffer::{BufferService, BufferServiceError}; -use ntex::util::{inflight::InFlightService, join, BoxFuture, HashSet}; +use ntex_io::DispatchItem; +use ntex_service::{Pipeline, Service, ServiceCtx, ServiceFactory}; +use ntex_util::services::buffer::{BufferService, BufferServiceError}; +use ntex_util::services::inflight::InFlightService; +use ntex_util::{future::join, HashSet}; use crate::error::{HandshakeError, MqttError, ProtocolError}; use crate::types::QoS; @@ -35,7 +35,7 @@ where { let factories = Rc::new((publish, control)); - service::fn_factory_with_config(move |session: Session| { + ntex_service::fn_factory_with_config(move |session: Session| { let factories = factories.clone(); async move { @@ -92,7 +92,6 @@ pub(crate) struct Dispatcher>, E> { publish: T, max_qos: QoS, handle_qos_after_disconnect: Option, - shutdown: RefCell>>, inner: Rc>, _t: PhantomData<(E,)>, } @@ -120,7 +119,6 @@ where publish, max_qos, handle_qos_after_disconnect, - shutdown: RefCell::new(None), inner: Rc::new(Inner { sink, control, inflight: RefCell::new(HashSet::default()) }), _t: PhantomData, } @@ -136,35 +134,19 @@ where type Response = Option; type Error = MqttError; - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - let res1 = self.publish.poll_ready(cx).map_err(|e| MqttError::Service(e.into()))?; - let res2 = self.inner.control.poll_ready(cx)?; - - if res1.is_pending() || res2.is_pending() { - Poll::Pending - } else { - Poll::Ready(Ok(())) - } + #[inline] + async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + let (res1, res2) = join(ctx.ready(&self.publish), ctx.ready(&self.inner.control)).await; + res1.map_err(|e| MqttError::Service(e.into()))?; + res2 } - fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> { - let mut shutdown = self.shutdown.borrow_mut(); - if !shutdown.is_some() { - self.inner.sink.close(); - let inner = self.inner.clone(); - *shutdown = Some(Box::pin(async move { - let _ = Pipeline::new(&inner.control).call(Control::closed()).await; - })); - } + async fn shutdown(&self) { + self.inner.sink.close(); + let _ = Pipeline::new(&self.inner.control).call(Control::closed()).await; - let res0 = shutdown.as_mut().expect("guard above").as_mut().poll(cx); - let res1 = self.publish.poll_shutdown(cx); - let res2 = self.inner.control.poll_shutdown(cx); - if res0.is_pending() || res1.is_pending() || res2.is_pending() { - Poll::Pending - } else { - Poll::Ready(()) - } + self.publish.shutdown().await; + self.inner.control.shutdown().await; } async fn call( @@ -442,15 +424,18 @@ where #[cfg(test)] mod tests { - use ntex::time::{sleep, Seconds}; - use ntex::util::{lazy, ByteString, Bytes, Ready}; - use ntex::{io::Io, service::fn_service, testing::IoTest}; use std::{future::Future, pin::Pin}; + use ntex_bytes::{ByteString, Bytes}; + use ntex_io::{testing::IoTest, Io}; + use ntex_service::fn_service; + use ntex_util::future::{lazy, Ready}; + use ntex_util::time::{sleep, Seconds}; + use super::*; use crate::v3::MqttSink; - #[ntex::test] + #[ntex_macros::rt_test] async fn test_dup_packet_id() { let io = Io::new(IoTest::create().0); let codec = codec::Codec::default(); @@ -503,7 +488,7 @@ mod tests { assert!(*err.borrow()); } - #[ntex::test] + #[ntex_macros::rt_test] async fn test_wr_backpressure() { let io = Io::new(IoTest::create().0); let codec = codec::Codec::default(); diff --git a/src/v3/handshake.rs b/src/v3/handshake.rs index 1bf6c95..71990ce 100644 --- a/src/v3/handshake.rs +++ b/src/v3/handshake.rs @@ -1,10 +1,9 @@ use std::{fmt, rc::Rc}; -use ntex::{io::IoBoxed, time::Seconds}; +use ntex_io::IoBoxed; +use ntex_util::time::Seconds; -use super::codec as mqtt; -use super::shared::MqttShared; -use super::sink::MqttSink; +use super::{codec as mqtt, shared::MqttShared, sink::MqttSink}; const DEFAULT_KEEPALIVE: Seconds = Seconds(30); diff --git a/src/v3/publish.rs b/src/v3/publish.rs index 6cb3649..0e7be1c 100644 --- a/src/v3/publish.rs +++ b/src/v3/publish.rs @@ -1,7 +1,7 @@ use std::{mem, num::NonZeroU16}; -use ntex::router::Path; -use ntex::util::{ByteString, Bytes}; +use ntex_bytes::{ByteString, Bytes}; +use ntex_router::Path; use serde::de::DeserializeOwned; use serde_json::Error as JsonError; diff --git a/src/v3/router.rs b/src/v3/router.rs index 578b62d..40a504e 100644 --- a/src/v3/router.rs +++ b/src/v3/router.rs @@ -1,8 +1,8 @@ -use std::{rc::Rc, task::Context, task::Poll}; +use std::rc::Rc; -use ntex::router::{IntoPattern, RouterBuilder}; -use ntex::service::boxed::{self, BoxService, BoxServiceFactory}; -use ntex::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory}; +use ntex_router::{IntoPattern, RouterBuilder}; +use ntex_service::boxed::{self, BoxService, BoxServiceFactory}; +use ntex_service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory}; use super::{publish::Publish, Session}; @@ -32,7 +32,7 @@ where + 'static, { Router { - router: ntex::router::Router::build(), + router: ntex_router::Router::build(), handlers: Vec::new(), default: boxed::factory(default_service.into_factory()), } @@ -67,7 +67,7 @@ where } pub struct RouterFactory { - router: Rc>, + router: Rc>, handlers: Vec>, default: Handler, } @@ -99,7 +99,7 @@ where } pub struct RouterService { - router: Rc>, + router: Rc>, handlers: Vec>, default: HandlerService, } @@ -108,23 +108,12 @@ impl Service for RouterService { type Response = (); type Error = Err; - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - let mut not_ready = false; + #[inline] + async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { for hnd in &self.handlers { - if hnd.poll_ready(cx)?.is_pending() { - not_ready = true; - } - } - - if self.default.poll_ready(cx)?.is_pending() { - not_ready = true; - } - - if not_ready { - Poll::Pending - } else { - Poll::Ready(Ok(())) + ctx.ready(hnd).await?; } + ctx.ready(&self.default).await } async fn call( diff --git a/src/v3/server.rs b/src/v3/server.rs index f228b6c..1ccbb5a 100644 --- a/src/v3/server.rs +++ b/src/v3/server.rs @@ -1,8 +1,8 @@ use std::{fmt, marker::PhantomData, rc::Rc}; -use ntex::io::{DispatchItem, DispatcherConfig, IoBoxed}; -use ntex::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory}; -use ntex::time::{timeout_checked, Millis, Seconds}; +use ntex_io::{DispatchItem, DispatcherConfig, IoBoxed}; +use ntex_service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory}; +use ntex_util::time::{timeout_checked, Millis, Seconds}; use crate::error::{HandshakeError, MqttError, ProtocolError}; use crate::{service, types::QoS}; @@ -375,8 +375,8 @@ where type Response = (IoBoxed, Rc, Session, Seconds); type Error = MqttError; - ntex::forward_poll_ready!(service, MqttError::Service); - ntex::forward_poll_shutdown!(service); + ntex_service::forward_ready!(service, MqttError::Service); + ntex_service::forward_shutdown!(service); async fn call( &self, diff --git a/src/v3/shared.rs b/src/v3/shared.rs index a84c73f..1784fb3 100644 --- a/src/v3/shared.rs +++ b/src/v3/shared.rs @@ -1,9 +1,9 @@ use std::{cell::Cell, cell::RefCell, collections::VecDeque, num::NonZeroU16, rc::Rc}; -use ntex::channel::pool; -use ntex::codec::{Decoder, Encoder}; -use ntex::io::IoRef; -use ntex::util::{BytesMut, HashSet, PoolId, PoolRef}; +use ntex_bytes::{BytesMut, PoolId, PoolRef}; +use ntex_codec::{Decoder, Encoder}; +use ntex_io::IoRef; +use ntex_util::{channel::pool, HashSet}; use crate::error::{DecodeError, EncodeError, ProtocolError, SendPacketError}; use crate::{types::packet_type, v3::codec}; diff --git a/src/v3/sink.rs b/src/v3/sink.rs index 0f4a4c3..3fed6fb 100644 --- a/src/v3/sink.rs +++ b/src/v3/sink.rs @@ -1,6 +1,7 @@ use std::{fmt, future::ready, future::Future, num::NonZeroU16, rc::Rc}; -use ntex::util::{ByteString, Bytes, Either, Ready}; +use ntex_bytes::{ByteString, Bytes}; +use ntex_util::future::{Either, Ready}; use super::{codec, error::SendPacketError, shared::AckType, shared::MqttShared}; diff --git a/src/v5/client/connection.rs b/src/v5/client/connection.rs index 909b1dd..93c65af 100644 --- a/src/v5/client/connection.rs +++ b/src/v5/client/connection.rs @@ -1,11 +1,12 @@ #![allow(clippy::let_underscore_future)] use std::{cell::RefCell, fmt, marker, num::NonZeroU16, rc::Rc}; -use ntex::io::{DispatcherConfig, IoBoxed}; -use ntex::router::{IntoPattern, Path, Router, RouterBuilder}; -use ntex::service::{boxed, into_service, IntoService, Pipeline, Service}; -use ntex::time::{sleep, Millis, Seconds}; -use ntex::util::{ByteString, Either, HashMap, Ready}; +use ntex_bytes::ByteString; +use ntex_io::{DispatcherConfig, IoBoxed}; +use ntex_router::{IntoPattern, Path, Router, RouterBuilder}; +use ntex_service::{boxed, fn_service, IntoService, Pipeline, Service}; +use ntex_util::time::{sleep, Millis, Seconds}; +use ntex_util::{future::Either, future::Ready, HashMap}; use crate::v5::publish::{Publish, PublishAck}; use crate::v5::{codec, shared::MqttShared, sink::MqttSink, ControlAck}; @@ -104,15 +105,15 @@ impl Client { pub async fn start_default(self) { if self.keepalive.non_zero() { let _ = - ntex::rt::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive)); + ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive)); } let dispatcher = create_dispatcher( MqttSink::new(self.shared.clone()), self.max_receive, 16, - into_service(|pkt| Ready::Ok(Either::Left(pkt))), - into_service(|msg: Control<()>| { + fn_service(|pkt| Ready::Ok(Either::Left(pkt))), + fn_service(|msg: Control<()>| { Ready::Ok(msg.disconnect(codec::Disconnect::default())) }), ); @@ -129,14 +130,14 @@ impl Client { { if self.keepalive.non_zero() { let _ = - ntex::rt::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive)); + ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive)); } let dispatcher = create_dispatcher( MqttSink::new(self.shared.clone()), self.max_receive, 16, - into_service(|pkt| Ready::Ok(Either::Left(pkt))), + fn_service(|pkt| Ready::Ok(Either::Left(pkt))), service.into_service(), ); @@ -195,7 +196,7 @@ where pub async fn start_default(self) { if self.keepalive.non_zero() { let _ = - ntex::rt::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive)); + ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive)); } let dispatcher = create_dispatcher( @@ -203,7 +204,7 @@ where self.max_receive, 16, dispatch(self.builder.finish(), self.handlers), - into_service(|msg: Control| { + fn_service(|msg: Control| { Ready::Ok(msg.disconnect(codec::Disconnect::default())) }), ); @@ -219,7 +220,7 @@ where { if self.keepalive.non_zero() { let _ = - ntex::rt::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive)); + ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive)); } let dispatcher = create_dispatcher( @@ -252,7 +253,7 @@ where RefCell::new(HashMap::default()); let handlers = Rc::new(handlers); - into_service(move |mut req: Publish| { + fn_service(move |mut req: Publish| { let idx = if !req.publish_topic().is_empty() { if let Some((idx, _info)) = router.recognize(req.topic_mut()) { // save info for topic alias diff --git a/src/v5/client/connector.rs b/src/v5/client/connector.rs index f0ebba1..3f93e66 100644 --- a/src/v5/client/connector.rs +++ b/src/v5/client/connector.rs @@ -1,10 +1,10 @@ use std::{num::NonZeroU16, num::NonZeroU32, rc::Rc}; -use ntex::connect::{self, Address, Connect, Connector}; -use ntex::io::{DispatcherConfig, IoBoxed}; -use ntex::service::{IntoService, Pipeline, Service}; -use ntex::time::{timeout_checked, Seconds}; -use ntex::util::{ByteString, Bytes, PoolId}; +use ntex_bytes::{ByteString, Bytes, PoolId}; +use ntex_io::{DispatcherConfig, IoBoxed}; +use ntex_net::connect::{self, Address, Connect, Connector}; +use ntex_service::{IntoService, Pipeline, Service}; +use ntex_util::time::{timeout_checked, Seconds}; use super::{codec, connection::Client, error::ClientError, error::ProtocolError}; use crate::v5::shared::{MqttShared, MqttSinkPool}; diff --git a/src/v5/client/control.rs b/src/v5/client/control.rs index 8abd4b3..faaab20 100644 --- a/src/v5/client/control.rs +++ b/src/v5/client/control.rs @@ -1,6 +1,6 @@ use std::io; -use ntex::util::ByteString; +use ntex_bytes::ByteString; use crate::{error, v5::codec}; diff --git a/src/v5/client/dispatcher.rs b/src/v5/client/dispatcher.rs index 96bd10a..00861ee 100644 --- a/src/v5/client/dispatcher.rs +++ b/src/v5/client/dispatcher.rs @@ -1,9 +1,9 @@ -use std::task::{Context, Poll}; use std::{cell::RefCell, marker::PhantomData, num::NonZeroU16, rc::Rc}; -use ntex::io::DispatchItem; -use ntex::service::{Pipeline, Service, ServiceCtx}; -use ntex::util::{BoxFuture, ByteString, Either, HashMap, HashSet}; +use ntex_bytes::ByteString; +use ntex_io::DispatchItem; +use ntex_service::{Pipeline, Service, ServiceCtx}; +use ntex_util::{future::join, future::Either, HashMap, HashSet}; use crate::error::{HandshakeError, MqttError, ProtocolError}; use crate::types::packet_type; @@ -38,7 +38,6 @@ where /// Mqtt protocol dispatcher pub(crate) struct Dispatcher>, E> { publish: T, - shutdown: RefCell>>, max_receive: usize, max_topic_alias: u16, inner: Rc>, @@ -72,7 +71,6 @@ where publish, max_receive, max_topic_alias, - shutdown: RefCell::new(None), inner: Rc::new(Inner { control, sink: sink.shared(), @@ -94,35 +92,19 @@ where type Response = Option; type Error = MqttError; - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - let res1 = self.publish.poll_ready(cx).map_err(MqttError::Service)?; - let res2 = self.inner.control.poll_ready(cx)?; - - if res1.is_pending() || res2.is_pending() { - Poll::Pending - } else { - Poll::Ready(Ok(())) - } + #[inline] + async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + let (res1, res2) = join(ctx.ready(&self.publish), ctx.ready(&self.inner.control)).await; + res1.map_err(MqttError::Service)?; + res2 } - fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> { - let mut shutdown = self.shutdown.borrow_mut(); - if !shutdown.is_some() { - self.inner.sink.drop_sink(); - let inner = self.inner.clone(); - *shutdown = Some(Box::pin(async move { - let _ = Pipeline::new(&inner.control).call(Control::closed()).await; - })); - } + async fn shutdown(&self) { + self.inner.sink.drop_sink(); + let _ = Pipeline::new(&self.inner.control).call(Control::closed()).await; - let res0 = shutdown.as_mut().expect("guard above").as_mut().poll(cx); - let res1 = self.publish.poll_shutdown(cx); - let res2 = self.inner.control.poll_shutdown(cx); - if res0.is_pending() || res1.is_pending() || res2.is_pending() { - Poll::Pending - } else { - Poll::Ready(()) - } + self.publish.shutdown().await; + self.inner.control.shutdown().await; } #[allow(clippy::await_holding_refcell_ref)] @@ -427,14 +409,16 @@ where #[cfg(test)] mod tests { - use ntex::{io::Io, service::fn_service, testing::IoTest, util::lazy, util::Ready}; + use ntex_io::{testing::IoTest, Io}; + use ntex_service::fn_service; + use ntex_util::future::{lazy, Ready}; use super::*; #[derive(Debug)] struct TestError; - #[ntex::test] + #[ntex_macros::rt_test] async fn test_wr_backpressure() { let io = Io::new(IoTest::create().0); let codec = codec::Codec::default(); diff --git a/src/v5/codec/codec.rs b/src/v5/codec/codec.rs index ca4f195..6f419fe 100644 --- a/src/v5/codec/codec.rs +++ b/src/v5/codec/codec.rs @@ -1,7 +1,7 @@ use std::cell::Cell; -use ntex::codec::{Decoder, Encoder}; -use ntex::util::{Buf, BytesMut}; +use ntex_bytes::{Buf, BytesMut}; +use ntex_codec::{Decoder, Encoder}; use super::{decode::decode_packet, encode::EncodeLtd, Packet}; use crate::error::{DecodeError, EncodeError}; diff --git a/src/v5/codec/decode.rs b/src/v5/codec/decode.rs index b7bdbb6..112d899 100644 --- a/src/v5/codec/decode.rs +++ b/src/v5/codec/decode.rs @@ -1,4 +1,4 @@ -use ntex::util::{ByteString, Bytes}; +use ntex_bytes::{ByteString, Bytes}; use super::{packet::*, UserProperty}; use crate::error::DecodeError; @@ -38,7 +38,7 @@ impl Decode for UserProperty { #[cfg(test)] mod tests { - use ntex::util::BytesMut; + use ntex_bytes::BytesMut; use std::num::{NonZeroU16, NonZeroU32}; use super::*; @@ -55,7 +55,7 @@ mod tests { let (_len, consumed) = decode_variable_length(&bytes[1..]).unwrap().unwrap(); let cur = Bytes::copy_from_slice(&bytes[consumed + 1..]); let mut tmp = BytesMut::with_capacity(4096); - ntex::codec::Encoder::encode(&crate::v5::codec::Codec::new(), res.clone(), &mut tmp) + ntex_codec::Encoder::encode(&crate::v5::codec::Codec::new(), res.clone(), &mut tmp) .unwrap(); let decoded = decode_packet(cur, fixed); let res = Ok(res); diff --git a/src/v5/codec/encode.rs b/src/v5/codec/encode.rs index ca7593d..4b3c912 100644 --- a/src/v5/codec/encode.rs +++ b/src/v5/codec/encode.rs @@ -1,4 +1,4 @@ -use ntex::util::{BufMut, ByteString, BytesMut}; +use ntex_bytes::{BufMut, ByteString, BytesMut}; use super::packet::{property_type as pt, *}; use super::{UserProperties, UserProperty}; @@ -271,7 +271,7 @@ pub(super) fn reduce_limit(limit: u32, reduction: usize) -> u32 { #[cfg(test)] mod tests { - use ntex::util::Bytes; + use ntex_bytes::Bytes; use std::num::{NonZeroU16, NonZeroU32}; use super::*; diff --git a/src/v5/codec/mod.rs b/src/v5/codec/mod.rs index 20b0c5c..bebea49 100644 --- a/src/v5/codec/mod.rs +++ b/src/v5/codec/mod.rs @@ -1,6 +1,6 @@ //! MQTT v5 Protocol codec -use ntex::util::ByteString; +use ntex_bytes::ByteString; #[allow(clippy::module_inception)] mod codec; diff --git a/src/v5/codec/packet/auth.rs b/src/v5/codec/packet/auth.rs index 7c81122..ffd4e2a 100644 --- a/src/v5/codec/packet/auth.rs +++ b/src/v5/codec/packet/auth.rs @@ -1,4 +1,4 @@ -use ntex::util::{Buf, BufMut, ByteString, Bytes, BytesMut}; +use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut}; use crate::error::{DecodeError, EncodeError}; use crate::utils::{self, Decode, Property}; diff --git a/src/v5/codec/packet/connack.rs b/src/v5/codec/packet/connack.rs index a71bc57..ca82224 100644 --- a/src/v5/codec/packet/connack.rs +++ b/src/v5/codec/packet/connack.rs @@ -1,6 +1,6 @@ use std::num::NonZeroU16; -use ntex::util::{Buf, BufMut, ByteString, Bytes, BytesMut}; +use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut}; use crate::error::{DecodeError, EncodeError}; use crate::types::{ConnectAckFlags, QoS}; diff --git a/src/v5/codec/packet/connect.rs b/src/v5/codec/packet/connect.rs index 24f9962..5e62c64 100644 --- a/src/v5/codec/packet/connect.rs +++ b/src/v5/codec/packet/connect.rs @@ -1,6 +1,6 @@ use std::num::{NonZeroU16, NonZeroU32}; -use ntex::util::{Buf, BufMut, ByteString, Bytes, BytesMut}; +use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut}; use crate::error::{DecodeError, EncodeError}; use crate::types::{ConnectFlags, QoS, MQTT, MQTT_LEVEL_5, WILL_QOS_SHIFT}; diff --git a/src/v5/codec/packet/disconnect.rs b/src/v5/codec/packet/disconnect.rs index 39b6e3a..cb8800c 100644 --- a/src/v5/codec/packet/disconnect.rs +++ b/src/v5/codec/packet/disconnect.rs @@ -1,4 +1,4 @@ -use ntex::util::{Buf, BufMut, ByteString, Bytes, BytesMut}; +use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut}; use crate::error::{DecodeError, EncodeError}; use crate::utils::{self, Decode, Property}; diff --git a/src/v5/codec/packet/mod.rs b/src/v5/codec/packet/mod.rs index 5f99658..ee679a9 100644 --- a/src/v5/codec/packet/mod.rs +++ b/src/v5/codec/packet/mod.rs @@ -1,4 +1,4 @@ -use ntex::util::{Buf, BufMut, ByteString, Bytes, BytesMut}; +use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut}; pub use crate::types::{ConnectAckFlags, ConnectFlags, QoS}; diff --git a/src/v5/codec/packet/pubacks.rs b/src/v5/codec/packet/pubacks.rs index cfbe441..025d3e8 100644 --- a/src/v5/codec/packet/pubacks.rs +++ b/src/v5/codec/packet/pubacks.rs @@ -1,6 +1,6 @@ use std::num::NonZeroU16; -use ntex::util::{Buf, BufMut, ByteString, Bytes, BytesMut}; +use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut}; use super::ack_props; use crate::error::{DecodeError, EncodeError}; diff --git a/src/v5/codec/packet/publish.rs b/src/v5/codec/packet/publish.rs index bd73cfd..f7097d1 100644 --- a/src/v5/codec/packet/publish.rs +++ b/src/v5/codec/packet/publish.rs @@ -1,6 +1,6 @@ use std::{fmt, num::NonZeroU16, num::NonZeroU32}; -use ntex::util::{Buf, BufMut, ByteString, Bytes, BytesMut}; +use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut}; use crate::error::{DecodeError, EncodeError}; use crate::types::QoS; diff --git a/src/v5/codec/packet/subscribe.rs b/src/v5/codec/packet/subscribe.rs index c3030fe..246cd70 100644 --- a/src/v5/codec/packet/subscribe.rs +++ b/src/v5/codec/packet/subscribe.rs @@ -1,6 +1,6 @@ use std::num::{NonZeroU16, NonZeroU32}; -use ntex::util::{Buf, BufMut, ByteString, Bytes, BytesMut}; +use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut}; use super::ack_props; use crate::error::{DecodeError, EncodeError}; @@ -326,7 +326,7 @@ impl EncodeLtd for UnsubscribeAck { #[cfg(test)] mod tests { - use ntex::codec::{Decoder, Encoder}; + use ntex_codec::{Decoder, Encoder}; use super::super::super::{Codec, Packet}; use super::*; diff --git a/src/v5/control.rs b/src/v5/control.rs index 9f5b1d0..6d60b9c 100644 --- a/src/v5/control.rs +++ b/src/v5/control.rs @@ -1,6 +1,6 @@ use std::{io, marker::PhantomData}; -use ntex::util::ByteString; +use ntex_bytes::ByteString; use super::codec::{self, DisconnectReasonCode, QoS, UserProperties}; use crate::error; diff --git a/src/v5/default.rs b/src/v5/default.rs index 84c31b5..e9d08ae 100644 --- a/src/v5/default.rs +++ b/src/v5/default.rs @@ -1,6 +1,6 @@ use std::{fmt, marker::PhantomData}; -use ntex::service::{Service, ServiceCtx, ServiceFactory}; +use ntex_service::{Service, ServiceCtx, ServiceFactory}; use super::control::{Control, ControlAck}; use super::publish::{Publish, PublishAck}; diff --git a/src/v5/dispatcher.rs b/src/v5/dispatcher.rs index 1500062..ac6ed4b 100644 --- a/src/v5/dispatcher.rs +++ b/src/v5/dispatcher.rs @@ -1,10 +1,11 @@ -use std::{cell::RefCell, marker, num, rc::Rc, task::Context, task::Poll}; +use std::{cell::RefCell, marker, num, rc::Rc}; -use ntex::io::DispatchItem; -use ntex::util::inflight::InFlightService; -use ntex::util::{buffer::BufferService, buffer::BufferServiceError}; -use ntex::util::{join, BoxFuture, ByteString, HashMap, HashSet}; -use ntex::{service, Pipeline, Service, ServiceCtx, ServiceFactory}; +use ntex_bytes::ByteString; +use ntex_io::DispatchItem; +use ntex_service::{self as service, Pipeline, Service, ServiceCtx, ServiceFactory}; +use ntex_util::services::inflight::InFlightService; +use ntex_util::services::{buffer::BufferService, buffer::BufferServiceError}; +use ntex_util::{future::join, HashMap, HashSet}; use crate::error::{HandshakeError, MqttError, ProtocolError}; use crate::types::QoS; @@ -83,7 +84,6 @@ impl crate::inflight::SizedRequest for DispatchItem> { pub(crate) struct Dispatcher>, E> { publish: T, handle_qos_after_disconnect: Option, - shutdown: RefCell>>, inner: Rc>, _t: marker::PhantomData, } @@ -115,7 +115,6 @@ where Self { publish, handle_qos_after_disconnect, - shutdown: RefCell::new(None), inner: Rc::new(Inner { sink, control, @@ -139,35 +138,19 @@ where type Response = Option; type Error = MqttError; - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - let res1 = self.publish.poll_ready(cx).map_err(|e| MqttError::Service(e.into()))?; - let res2 = self.inner.control.poll_ready(cx)?; - - if res1.is_pending() || res2.is_pending() { - Poll::Pending - } else { - Poll::Ready(Ok(())) - } + #[inline] + async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + let (res1, res2) = join(ctx.ready(&self.publish), ctx.ready(&self.inner.control)).await; + res1.map_err(|e| MqttError::Service(e.into()))?; + res2 } - fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> { - let mut shutdown = self.shutdown.borrow_mut(); - if !shutdown.is_some() { - self.inner.sink.drop_sink(); - let inner = self.inner.clone(); - *shutdown = Some(Box::pin(async move { - let _ = Pipeline::new(&inner.control).call(Control::closed()).await; - })); - } + async fn shutdown(&self) { + self.inner.sink.drop_sink(); + let _ = Pipeline::new(&self.inner.control).call(Control::closed()).await; - let res0 = shutdown.as_mut().expect("guard above").as_mut().poll(cx); - let res1 = self.publish.poll_shutdown(cx); - let res2 = self.inner.control.poll_shutdown(cx); - if res0.is_pending() || res1.is_pending() || res2.is_pending() { - Poll::Pending - } else { - Poll::Ready(()) - } + self.publish.shutdown().await; + self.inner.control.shutdown().await; } #[allow(clippy::await_holding_refcell_ref)] @@ -580,7 +563,9 @@ where #[cfg(test)] mod tests { - use ntex::{io::Io, service::fn_service, testing::IoTest, util::lazy, util::Ready}; + use ntex_io::{testing::IoTest, Io}; + use ntex_service::fn_service; + use ntex_util::future::{lazy, Ready}; use super::*; use crate::v5::MqttSink; @@ -596,7 +581,7 @@ mod tests { } } - #[ntex::test] + #[ntex_macros::rt_test] async fn test_wr_backpressure() { let io = Io::new(IoTest::create().0); let codec = codec::Codec::default(); diff --git a/src/v5/handshake.rs b/src/v5/handshake.rs index 6e321a4..b2143a1 100644 --- a/src/v5/handshake.rs +++ b/src/v5/handshake.rs @@ -1,4 +1,4 @@ -use ntex::io::IoBoxed; +use ntex_io::IoBoxed; use std::{fmt, num::NonZeroU16, rc::Rc}; use super::{codec, shared::MqttShared, sink::MqttSink}; diff --git a/src/v5/publish.rs b/src/v5/publish.rs index 666213c..86a5f23 100644 --- a/src/v5/publish.rs +++ b/src/v5/publish.rs @@ -1,7 +1,7 @@ use std::{mem, num::NonZeroU16}; -use ntex::router::Path; -use ntex::util::{ByteString, Bytes}; +use ntex_bytes::{ByteString, Bytes}; +use ntex_router::Path; use serde::de::DeserializeOwned; use serde_json::Error as JsonError; diff --git a/src/v5/router.rs b/src/v5/router.rs index dac0b4c..7aab22d 100644 --- a/src/v5/router.rs +++ b/src/v5/router.rs @@ -1,12 +1,12 @@ -use std::{cell::RefCell, num::NonZeroU16, rc::Rc, task::Context, task::Poll}; +use std::{cell::RefCell, num::NonZeroU16, rc::Rc}; -use ntex::router::{IntoPattern, Path, RouterBuilder}; -use ntex::service::boxed::{self, BoxService, BoxServiceFactory}; -use ntex::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory}; -use ntex::util::{ByteString, HashMap}; +use ntex_bytes::ByteString; +use ntex_router::{IntoPattern, Path, RouterBuilder}; +use ntex_service::boxed::{self, BoxService, BoxServiceFactory}; +use ntex_service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory}; +use ntex_util::HashMap; -use super::publish::{Publish, PublishAck}; -use super::Session; +use super::{publish::Publish, publish::PublishAck, Session}; type Handler = BoxServiceFactory, Publish, PublishAck, E, E>; type HandlerService = BoxService; @@ -39,7 +39,7 @@ where > + 'static, { Router { - router: ntex::router::Router::build(), + router: ntex_router::Router::build(), handlers: Vec::new(), default: boxed::factory(default_service.into_factory()), } @@ -79,7 +79,7 @@ where } pub struct RouterFactory { - router: ntex::router::Router, + router: ntex_router::Router, handlers: Rc>>, default: Handler, } @@ -112,7 +112,7 @@ where } pub struct RouterService { - router: ntex::router::Router, + router: ntex_router::Router, default: HandlerService, handlers: Vec>, aliases: RefCell)>>, @@ -122,23 +122,12 @@ impl Service for RouterService { type Response = PublishAck; type Error = Err; - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - let mut not_ready = false; + async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { for hnd in self.handlers.iter() { - if hnd.poll_ready(cx)?.is_pending() { - not_ready = true; - } - } - - if self.default.poll_ready(cx)?.is_pending() { - not_ready = true; + ctx.ready(hnd).await?; } - if not_ready { - Poll::Pending - } else { - Poll::Ready(Ok(())) - } + ctx.ready(&self.default).await } #[allow(clippy::await_holding_refcell_ref)] diff --git a/src/v5/server.rs b/src/v5/server.rs index 6860e8d..da0b6c2 100644 --- a/src/v5/server.rs +++ b/src/v5/server.rs @@ -1,8 +1,8 @@ use std::{fmt, marker::PhantomData, rc::Rc}; -use ntex::io::{DispatchItem, DispatcherConfig, IoBoxed}; -use ntex::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory}; -use ntex::time::{timeout_checked, Millis, Seconds}; +use ntex_io::{DispatchItem, DispatcherConfig, IoBoxed}; +use ntex_service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory}; +use ntex_util::time::{timeout_checked, Millis, Seconds}; use crate::error::{HandshakeError, MqttError, ProtocolError}; use crate::{service, types::QoS}; @@ -352,8 +352,8 @@ where type Response = (IoBoxed, Rc, Session, Seconds); type Error = MqttError; - ntex::forward_poll_ready!(service, MqttError::Service); - ntex::forward_poll_shutdown!(service); + ntex_service::forward_ready!(service, MqttError::Service); + ntex_service::forward_shutdown!(service); async fn call( &self, diff --git a/src/v5/shared.rs b/src/v5/shared.rs index 980a74c..660a2c9 100644 --- a/src/v5/shared.rs +++ b/src/v5/shared.rs @@ -1,8 +1,9 @@ use std::{cell::Cell, cell::RefCell, collections::VecDeque, num::NonZeroU16, rc::Rc}; -use ntex::codec::{Decoder, Encoder}; -use ntex::util::{BytesMut, HashSet, PoolId, PoolRef}; -use ntex::{channel::pool, io::IoRef}; +use ntex_bytes::{BytesMut, PoolId, PoolRef}; +use ntex_codec::{Decoder, Encoder}; +use ntex_io::IoRef; +use ntex_util::{channel::pool, HashSet}; use crate::{error, error::SendPacketError, types::packet_type, v5::codec, QoS}; diff --git a/src/v5/sink.rs b/src/v5/sink.rs index ab0ca4c..da6de4d 100644 --- a/src/v5/sink.rs +++ b/src/v5/sink.rs @@ -1,6 +1,7 @@ use std::{fmt, future::ready, future::Future, num::NonZeroU16, num::NonZeroU32, rc::Rc}; -use ntex::util::{ByteString, Bytes, Either, Ready}; +use ntex_bytes::{ByteString, Bytes}; +use ntex_util::future::{Either, Ready}; use super::{ codec, codec::EncodeLtd, error::SendPacketError, shared::AckType, shared::MqttShared, diff --git a/src/version.rs b/src/version.rs index e780f56..5f0b75b 100644 --- a/src/version.rs +++ b/src/version.rs @@ -1,5 +1,5 @@ -use ntex::codec::{Decoder, Encoder}; -use ntex::util::BytesMut; +use ntex_bytes::BytesMut; +use ntex_codec::{Decoder, Encoder}; use crate::error::{DecodeError, EncodeError}; use crate::types::{packet_type, MQTT, MQTT_LEVEL_3, MQTT_LEVEL_5}; diff --git a/tests/test_server.rs b/tests/test_server.rs index d7eddd0..cb2a9a1 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -559,7 +559,7 @@ async fn test_large_publish_openssl() -> std::io::Result<()> { let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); builder.set_verify(SslVerifyMode::NONE); - let con = Pipeline::new(ntex::connect::openssl::Connector::new(builder.build())); + let con = Pipeline::new(ntex::connect::openssl::SslConnector::new(builder.build())); let addr = format!("127.0.0.1:{}", srv.addr().port()); let io = con.call(addr.into()).await.unwrap();