Skip to content

Commit

Permalink
RUST-1983 Cherry-picks for 3.0.1 (#1164)
Browse files Browse the repository at this point in the history
  • Loading branch information
abr-egn authored Jul 9, 2024
1 parent 7bc5b39 commit 339d366
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 23 deletions.
15 changes: 8 additions & 7 deletions src/action/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl<'a, Session, T> Aggregate<'a, Session, T> {
);
}

impl<'a> Aggregate<'a, ImplicitSession> {
impl<'a, T> Aggregate<'a, ImplicitSession, T> {
/// Use the provided session when running the operation.
pub fn session(
self,
Expand All @@ -144,7 +144,7 @@ impl<'a> Aggregate<'a, ImplicitSession> {
}
}

impl<'a, Session> Aggregate<'a, Session, Document> {
impl<'a, Session, T> Aggregate<'a, Session, T> {
/// Use the provided type for the returned cursor.
///
/// ```rust
Expand All @@ -167,7 +167,7 @@ impl<'a, Session> Aggregate<'a, Session, Document> {
/// # Ok(())
/// # }
/// ```
pub fn with_type<T>(self) -> Aggregate<'a, Session, T> {
pub fn with_type<U>(self) -> Aggregate<'a, Session, U> {
Aggregate {
target: self.target,
pipeline: self.pipeline,
Expand Down Expand Up @@ -199,11 +199,11 @@ impl<'a, T> Action for Aggregate<'a, ImplicitSession, T> {
}
}

#[action_impl(sync = crate::sync::SessionCursor<Document>)]
impl<'a> Action for Aggregate<'a, ExplicitSession<'a>> {
#[action_impl(sync = crate::sync::SessionCursor<T>)]
impl<'a, T> Action for Aggregate<'a, ExplicitSession<'a>, T> {
type Future = AggregateSessionFuture;

async fn execute(mut self) -> Result<SessionCursor<Document>> {
async fn execute(mut self) -> Result<SessionCursor<T>> {
resolve_read_concern_with_session!(self.target, self.options, Some(&mut *self.session.0))?;
resolve_write_concern_with_session!(self.target, self.options, Some(&mut *self.session.0))?;
resolve_selection_criteria_with_session!(
Expand All @@ -218,8 +218,9 @@ impl<'a> Action for Aggregate<'a, ExplicitSession<'a>> {
self.options,
);
let client = self.target.client();
let session = self.session;
client
.execute_session_cursor_operation(aggregate, self.session.0)
.execute_session_cursor_operation(aggregate, session.0)
.await
}
}
Expand Down
35 changes: 20 additions & 15 deletions src/action/watch.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::time::Duration;
use std::{marker::PhantomData, time::Duration};

use bson::{Bson, Document, Timestamp};
use serde::de::DeserializeOwned;

use super::{action_impl, deeplink, option_setters, ExplicitSession, ImplicitSession};
use crate::{
Expand Down Expand Up @@ -96,11 +97,11 @@ where
/// Change streams require either a "majority" read concern or no read concern. Anything else
/// will cause a server error.
///
/// `await` will return d[`Result<ChangeStream<ChangeStreamEvent<Document>>>`] or
/// d[`Result<SessionChangeStream<ChangeStreamEvent<Document>>>`] if a
/// `await` will return d[`Result<ChangeStream<ChangeStreamEvent<T>>>`] or
/// d[`Result<SessionChangeStream<ChangeStreamEvent<T>>>`] if a
/// [`ClientSession`] has been provided.
#[deeplink]
pub fn watch(&self) -> Watch {
pub fn watch(&self) -> Watch<T> {
Watch::new(self.client(), self.namespace().into())
}
}
Expand Down Expand Up @@ -153,24 +154,25 @@ where
///
/// Change streams require either a "majority" read concern or no read concern. Anything else
/// will cause a server error.
pub fn watch(&self) -> Watch {
pub fn watch(&self) -> Watch<T> {
self.async_collection.watch()
}
}

/// Starts a new [`ChangeStream`] that receives events for all changes in a given scope. Create by
/// calling [`Client::watch`], [`Database::watch`], or [`Collection::watch`].
#[must_use]
pub struct Watch<'a, S = ImplicitSession> {
pub struct Watch<'a, T = Document, S = ImplicitSession> {
client: &'a Client,
target: AggregateTarget,
pipeline: Vec<Document>,
options: Option<ChangeStreamOptions>,
session: S,
cluster: bool,
phantom: PhantomData<fn() -> T>,
}

impl<'a> Watch<'a, ImplicitSession> {
impl<'a, T> Watch<'a, T, ImplicitSession> {
fn new(client: &'a Client, target: AggregateTarget) -> Self {
Self {
client,
Expand All @@ -179,6 +181,7 @@ impl<'a> Watch<'a, ImplicitSession> {
options: None,
session: ImplicitSession,
cluster: false,
phantom: PhantomData,
}
}

Expand All @@ -190,6 +193,7 @@ impl<'a> Watch<'a, ImplicitSession> {
options: None,
session: ImplicitSession,
cluster: true,
phantom: PhantomData,
}
}
}
Expand Down Expand Up @@ -235,28 +239,29 @@ impl<'a, S> Watch<'a, S> {
);
}

impl<'a> Watch<'a, ImplicitSession> {
impl<'a, T> Watch<'a, T, ImplicitSession> {
/// Use the provided ['ClientSession'].
pub fn session<'s>(
self,
session: impl Into<&'s mut ClientSession>,
) -> Watch<'a, ExplicitSession<'s>> {
) -> Watch<'a, T, ExplicitSession<'s>> {
Watch {
client: self.client,
target: self.target,
pipeline: self.pipeline,
options: self.options,
session: ExplicitSession(session.into()),
cluster: self.cluster,
phantom: PhantomData,
}
}
}

#[action_impl(sync = crate::sync::ChangeStream<ChangeStreamEvent<Document>>)]
impl<'a> Action for Watch<'a, ImplicitSession> {
#[action_impl(sync = crate::sync::ChangeStream<ChangeStreamEvent<T>>)]
impl<'a, T: DeserializeOwned + Unpin + Send + Sync> Action for Watch<'a, T, ImplicitSession> {
type Future = WatchFuture;

async fn execute(mut self) -> Result<ChangeStream<ChangeStreamEvent<Document>>> {
async fn execute(mut self) -> Result<ChangeStream<ChangeStreamEvent<T>>> {
resolve_options!(
self.client,
self.options,
Expand All @@ -273,11 +278,11 @@ impl<'a> Action for Watch<'a, ImplicitSession> {
}
}

#[action_impl(sync = crate::sync::SessionChangeStream<ChangeStreamEvent<Document>>)]
impl<'a> Action for Watch<'a, ExplicitSession<'a>> {
#[action_impl(sync = crate::sync::SessionChangeStream<ChangeStreamEvent<T>>)]
impl<'a, T: DeserializeOwned + Unpin + Send + Sync> Action for Watch<'a, T, ExplicitSession<'a>> {
type Future = WatchSessionFuture;

async fn execute(mut self) -> Result<SessionChangeStream<ChangeStreamEvent<Document>>> {
async fn execute(mut self) -> Result<SessionChangeStream<ChangeStreamEvent<T>>> {
resolve_read_concern_with_session!(self.client, self.options, Some(&mut *self.session.0))?;
resolve_selection_criteria_with_session!(
self.client,
Expand Down
10 changes: 10 additions & 0 deletions src/test/change_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,3 +662,13 @@ async fn split_large_event() -> Result<()> {

Ok(())
}

// Regression test: `Collection::watch` uses the type parameter. This is not flagged as a test to
// run because it's just asserting that this compiles.
#[allow(unreachable_code, unused_variables, clippy::diverging_sub_expression)]
async fn _collection_watch_typed() {
let coll: Collection<bson::RawDocumentBuf> = unimplemented!();
let mut stream = coll.watch().await.unwrap();
let _: Option<crate::error::Result<ChangeStreamEvent<bson::RawDocumentBuf>>> =
stream.next().await;
}
21 changes: 20 additions & 1 deletion src/test/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,26 @@ async fn aggregate_with_generics() {
let _: Cursor<Document> = database.aggregate(pipeline.clone()).await.unwrap();

// Assert that data is properly deserialized when using with_type
let mut cursor = database.aggregate(pipeline).with_type::<A>().await.unwrap();
let mut cursor = database
.aggregate(pipeline.clone())
.with_type::<A>()
.await
.unwrap();
assert!(cursor.advance().await.unwrap());
assert_eq!(&cursor.deserialize_current().unwrap().str, "hi");

// Assert that `with_type` can be used with an explicit session.
let mut session = client.start_session().await.unwrap();
let _ = database
.aggregate(pipeline.clone())
.session(&mut session)
.with_type::<A>()
.await
.unwrap();
let _ = database
.aggregate(pipeline.clone())
.with_type::<A>()
.session(&mut session)
.await
.unwrap();
}

0 comments on commit 339d366

Please sign in to comment.