diff --git a/Cargo.toml b/Cargo.toml index f616a5e7..42e7da15 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,9 @@ futures-01 = ["futures-01-crate"] # The standard library's implementation of futures futures = ["futures-core-crate", "pin-project"] +# Add extension trait for the SinkExt trait +sink = ["futures-crate"] + # No public user should make use of this feature # https://github.com/rust-lang/cargo/issues/1596 "internal-dev-dependencies" = ["futures-crate"] @@ -63,3 +66,6 @@ futures-01-crate = { package = "futures", version = "0.1", optional = true, defa futures-crate = { package = "futures", version = "0.3.0", optional = true, default-features = false } futures-core-crate = { package = "futures-core", version = "0.3.0", optional = true, default-features = false } pin-project = { version = "0.4", optional = true, default-features = false } + +[dev-dependencies] +futures-01-crate = { package = "futures", version = "0.1" } diff --git a/compatibility-tests/futures-0.1/src/lib.rs b/compatibility-tests/futures-0.1/src/lib.rs index c13a933d..316a57ef 100644 --- a/compatibility-tests/futures-0.1/src/lib.rs +++ b/compatibility-tests/futures-0.1/src/lib.rs @@ -1,12 +1,19 @@ #![cfg(test)] mod api { - use futures::{future, stream, Future, Stream}; + use futures::{future, stream, Future, Stream, Sink}; use snafu::Snafu; #[derive(Debug, Snafu)] pub enum Error { InvalidUrl { url: String }, + InfallibleFailed, + } + + impl From<()> for Error { + fn from(_: ()) -> Self { + Error::InfallibleFailed + } } pub fn fetch_page(url: &str) -> impl Future { @@ -16,11 +23,19 @@ mod api { pub fn keep_fetching_page<'u>(url: &'u str) -> impl Stream + 'u { stream::repeat::<_, ()>(()).then(move |_| fetch_page(url)) } + + pub fn upload_str(url: &str, _: &str) -> Result { + InvalidUrl { url }.fail() + } + + pub fn upload<'u>(url: &'u str) -> impl Sink + 'u { + Vec::new().with(move |s: String| { upload_str(url, &s) }) + } } -use futures::{future, Future, Stream}; +use futures::{future, stream, Future, Stream, Sink}; use snafu::{ - futures01::{future::FutureExt as _, stream::StreamExt as _}, + futures01::{future::FutureExt as _, stream::StreamExt as _, sink::SinkExt as _}, Snafu, }; @@ -28,6 +43,8 @@ use snafu::{ enum Error { UnableToLoadAppleStock { source: api::Error }, UnableToLoadGoogleStock { source: api::Error, name: String }, + UnableToUploadApple { source: api::Error }, + UnableToUploadGoogle { source: api::Error, name: String }, } // Can be used as a `Future` combinator @@ -59,6 +76,21 @@ fn load_stock_data_series() -> impl Future { }) } +// Can be used as a `SinkExt` combinator +fn upload_strings() -> impl Future { + let apple = api::upload("apple").context(UnableToUploadApple); + let google = api::upload("google").with_context(|| UnableToUploadGoogle { + name: String::from("sink"), + }); + + let together = apple.fanout(google); + + stream::repeat("str".to_owned()) + .take(10) + .forward(together) + .map(|_| ()) +} + #[test] fn implements_error() { fn check() {} @@ -69,4 +101,7 @@ fn implements_error() { let c = load_stock_data_series().wait(); c.unwrap_err(); + + let d = upload_strings().wait(); + d.unwrap_err(); } diff --git a/compatibility-tests/futures/Cargo.toml b/compatibility-tests/futures/Cargo.toml index 532a9152..d52693d9 100644 --- a/compatibility-tests/futures/Cargo.toml +++ b/compatibility-tests/futures/Cargo.toml @@ -5,5 +5,5 @@ authors = ["Jake Goulding "] edition = "2018" [dependencies] -snafu = { path = "../..", features = ["futures"] } +snafu = { path = "../..", features = ["futures", "sink"] } futures = "0.3.0" diff --git a/compatibility-tests/futures/src/lib.rs b/compatibility-tests/futures/src/lib.rs index 83f0ba55..0ba6585a 100644 --- a/compatibility-tests/futures/src/lib.rs +++ b/compatibility-tests/futures/src/lib.rs @@ -1,14 +1,21 @@ #![cfg(test)] mod api { - use futures::{stream, StreamExt, TryStream}; + use futures::{sink, stream, Sink, SinkExt, StreamExt, TryStream}; use snafu::Snafu; + use std::convert::Infallible; - #[derive(Debug, Snafu)] + #[derive(Debug, Clone, Snafu)] pub enum Error { InvalidUrl { url: String }, } + impl From for Error { + fn from(e: Infallible) -> Self { + match e {} + } + } + pub async fn fetch_page(url: &str) -> Result { InvalidUrl { url }.fail() } @@ -16,21 +23,33 @@ mod api { pub fn keep_fetching_page<'u>(url: &'u str) -> impl TryStream + 'u { stream::repeat(()).then(move |_| fetch_page(url)) } + + pub async fn upload_str(url: &str, _: &str) -> Result { + InvalidUrl { url }.fail() + } + + pub fn upload<'u>(url: &'u str) -> impl Sink + 'u { + sink::drain().with(move |s: String| async move { upload_str(url, &s).await }) + } } +use futures::future::ok; use futures::{ future, - stream::{StreamExt as _, TryStreamExt as _}, + stream::{self, StreamExt as _, TryStreamExt as _}, + SinkExt, }; use snafu::{ - futures::{TryFutureExt as _, TryStreamExt as _}, + futures::{SnafuSinkExt as _, TryFutureExt as _, TryStreamExt as _}, ResultExt, Snafu, }; -#[derive(Debug, Snafu)] +#[derive(Debug, Clone, Snafu)] enum Error { UnableToLoadAppleStock { source: api::Error }, UnableToLoadGoogleStock { source: api::Error, name: String }, + UnableToUploadApple { source: api::Error }, + UnableToUploadGoogle { source: api::Error, name: String }, } // Normal `Result` code works with `await` @@ -97,6 +116,22 @@ async fn load_stock_data_series() -> Result { .await } +// Can be used as a `SinkExt` combinator +async fn upload_strings() -> Result<(), Error> { + let apple = api::upload("apple").context(UnableToUploadApple); + let google = api::upload("google").with_context(|| UnableToUploadGoogle { + name: String::from("sink"), + }); + + let together = apple.fanout(google); + + stream::repeat(Ok("str".to_owned())) + .take(10) + .forward(together) + .await?; + Ok(()) +} + #[test] fn implements_error() { fn check() {} @@ -115,4 +150,7 @@ fn implements_error() { let d = block_on(load_stock_data_series()); d.unwrap_err(); + + let d = block_on(upload_strings()); + d.unwrap_err(); } diff --git a/src/futures/mod.rs b/src/futures/mod.rs index 3f0ded34..68341b69 100644 --- a/src/futures/mod.rs +++ b/src/futures/mod.rs @@ -7,9 +7,14 @@ //! [`TryStream`]: futures_core_crate::TryStream //! [feature flag]: crate::guide::feature_flags +#[cfg(feature = "sink")] +pub mod sink; pub mod try_future; pub mod try_stream; +#[cfg(feature = "sink")] +#[doc(inline)] +pub use self::sink::SnafuSinkExt; #[doc(inline)] pub use self::try_future::TryFutureExt; #[doc(inline)] diff --git a/src/futures/sink.rs b/src/futures/sink.rs new file mode 100644 index 00000000..096b1a3a --- /dev/null +++ b/src/futures/sink.rs @@ -0,0 +1,239 @@ +//! Additions to the [`TryStream`] trait. +//! +//! [`TryStream`]: futures_core_crate::TryStream + +use crate::{Error, ErrorCompat, IntoError, ResultExt}; +use core::{ + marker::PhantomData, + pin::Pin, + task::{Context as TaskContext, Poll}, +}; +use futures_crate::{Sink, SinkExt}; +use pin_project::pin_project; + +/// Additions to [`SinkExt`]. +pub trait SnafuSinkExt: SinkExt + Sized { + /// Extend a [`SinkExt`]'s error with additional context-sensitive + /// information. + /// + /// ```rust + /// # use futures_crate as futures; + /// use futures::{Sink, SinkExt}; + /// # use futures::sink; + /// use snafu::{futures::SnafuSinkExt, Snafu}; + /// + /// #[derive(Debug, Snafu)] + /// enum Error { + /// Authenticating { + /// user_name: String, + /// user_id: i32, + /// source: ApiError, + /// }, + /// } + /// + /// fn example() -> impl Sink { + /// transactions().context(Authenticating { + /// user_name: "admin", + /// user_id: 42, + /// }) + /// } + /// + /// # type ApiError = Box; + /// fn transactions() -> impl Sink { + /// /* ... */ + /// # sink::drain().sink_err_into() + /// } + /// ``` + /// + /// Note that the context selector will call [`Into::into`] on + /// each field, so the types are not required to exactly match. + fn context(self, context: C) -> Context + where + C: IntoError + Clone, + E: Error + ErrorCompat; + + /// Extend a [`SinkExt`]'s error with lazily-generated + /// context-sensitive information. + /// + /// ```rust + /// # use futures_crate as futures; + /// use futures::{Sink, SinkExt}; + /// # use futures::sink; + /// use snafu::{futures::SnafuSinkExt, Snafu}; + /// + /// #[derive(Debug, Snafu)] + /// enum Error { + /// Authenticating { + /// user_name: String, + /// user_id: i32, + /// source: ApiError, + /// }, + /// } + /// + /// fn example() -> impl Sink { + /// transactions().with_context(|| Authenticating { + /// user_name: "admin", + /// user_id: 42, + /// }) + /// } + /// + /// # type ApiError = Box; + /// fn transactions() -> impl Sink { + /// /* ... */ + /// # sink::drain().sink_err_into() + /// } + /// ``` + /// + /// Note that this *may not* be needed in many cases because the + /// context selector will call [`Into::into`] on each field. + fn with_context(self, context: F) -> WithContext + where + F: FnMut() -> C, + C: IntoError, + E: Error + ErrorCompat; +} + +impl SnafuSinkExt for Si +where + Si: Sink, +{ + fn context(self, context: C) -> Context + where + C: IntoError + Clone, + E: Error + ErrorCompat, + { + Context { + inner: self, + context, + _e: Default::default(), + } + } + + fn with_context(self, context: F) -> WithContext + where + F: FnMut() -> C, + C: IntoError, + E: Error + ErrorCompat, + { + WithContext { + inner: self, + context, + _e: PhantomData, + } + } +} + +/// Sink for the [`context`](SnafuStreamExt::context) combinator. +/// +/// See the [`SnafuStreamExt::context`] method for more details. +#[pin_project] +#[derive(Debug)] +#[must_use = "sinks do nothing unless polled"] +pub struct Context { + #[pin] + inner: Si, + context: C, + _e: PhantomData, +} + +impl Sink for Context +where + Sk: SinkExt, + C: IntoError + Clone, + E: Error + ErrorCompat, +{ + type Error = E; + + fn poll_ready(self: Pin<&mut Self>, ctx: &mut TaskContext) -> Poll> { + let this = self.project(); + let inner = this.inner; + let context = this.context; + + inner + .poll_ready(ctx) + .map_err(|e| context.clone().into_error(e)) + } + + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { + let this = self.project(); + let inner = this.inner; + let context = this.context; + + inner.start_send(item).context(context.clone()) + } + + fn poll_flush(self: Pin<&mut Self>, ctx: &mut TaskContext) -> Poll> { + let this = self.project(); + let inner = this.inner; + let context = this.context; + + inner + .poll_flush(ctx) + .map_err(|e| context.clone().into_error(e)) + } + + fn poll_close(self: Pin<&mut Self>, ctx: &mut TaskContext) -> Poll> { + let this = self.project(); + let inner = this.inner; + let context = this.context; + + inner + .poll_close(ctx) + .map_err(|e| context.clone().into_error(e)) + } +} + +/// Sink for the [`with_context`](SnafuSinkExt::with_context) combinator. +/// +/// See the [`SnafuSinkExt::with_context`] method for more details. +#[pin_project] +#[derive(Debug)] +#[must_use = "sinks do nothing unless polled"] +pub struct WithContext { + #[pin] + inner: St, + context: F, + _e: PhantomData, +} + +impl Sink for WithContext +where + Si: Sink, + F: FnMut() -> C, + C: IntoError, + E: Error + ErrorCompat, +{ + type Error = E; + + fn poll_ready(self: Pin<&mut Self>, ctx: &mut TaskContext) -> Poll> { + let this = self.project(); + let inner = this.inner; + let context = this.context; + + inner.poll_ready(ctx).map_err(|e| context().into_error(e)) + } + + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { + let this = self.project(); + let inner = this.inner; + let context = this.context; + + inner.start_send(item).with_context(context) + } + + fn poll_flush(self: Pin<&mut Self>, ctx: &mut TaskContext) -> Poll> { + let this = self.project(); + let inner = this.inner; + let context = this.context; + + inner.poll_flush(ctx).map_err(|e| context().into_error(e)) + } + + fn poll_close(self: Pin<&mut Self>, ctx: &mut TaskContext) -> Poll> { + let this = self.project(); + let inner = this.inner; + let context = this.context; + + inner.poll_close(ctx).map_err(|e| context().into_error(e)) + } +} diff --git a/src/futures01/mod.rs b/src/futures01/mod.rs index d638baa9..eac42cda 100644 --- a/src/futures01/mod.rs +++ b/src/futures01/mod.rs @@ -8,9 +8,12 @@ //! [feature flag]: crate::guide::feature_flags pub mod future; +pub mod sink; pub mod stream; #[doc(inline)] pub use self::future::FutureExt; #[doc(inline)] +pub use self::sink::SinkExt; +#[doc(inline)] pub use self::stream::StreamExt; diff --git a/src/futures01/sink.rs b/src/futures01/sink.rs new file mode 100644 index 00000000..0274146f --- /dev/null +++ b/src/futures01/sink.rs @@ -0,0 +1,190 @@ +//! Additions to the Futures 0.1 [`Sink`] trait. +//! +//! [`Sink`]: futures_01_crate::Sink + +use crate::{Error, ErrorCompat, IntoError, ResultExt}; +use core::marker::PhantomData; +use futures_01_crate::{Async, AsyncSink, Sink}; + +/// Additions to [`Sink`]. +pub trait SinkExt: Sink + Sized { + /// Extend a [`Sink`]'s error with additional context-sensitive + /// information. + /// + /// [`Sink`]: futures_01_crate::Sink] + /// + /// ```rust + /// # use futures_01_crate as futures; + /// use futures::Sink; + /// use snafu::{futures01::SinkExt, Snafu}; + /// + /// #[derive(Debug, Snafu)] + /// enum Error { + /// Authenticating { + /// user_name: String, + /// user_id: i32, + /// source: ApiError, + /// }, + /// } + /// + /// fn example() -> impl Sink { + /// stock_prices().context(Authenticating { + /// user_name: "admin", + /// user_id: 42, + /// }) + /// } + /// + /// # type ApiError = Box; + /// fn stock_prices() -> impl Sink { + /// /* ... */ + /// # Vec::new().sink_map_err(|_| String::new()).sink_from_err() + /// } + /// ``` + /// + /// Note that the context selector will call [`Into::into`] on + /// each field, so the types are not required to exactly match. + fn context(self, context: C) -> Context + where + C: IntoError + Clone, + E: Error + ErrorCompat; + + /// Extend a [`Sink`]'s error with lazily-generated context-sensitive + /// information. + /// + /// [`Sink`]: futures_01_crate::Sink] + /// + /// ```rust + /// # use futures_01_crate as futures; + /// use futures::Sink; + /// use snafu::{futures01::SinkExt, Snafu}; + /// + /// #[derive(Debug, Snafu)] + /// enum Error { + /// Authenticating { + /// user_name: String, + /// user_id: i32, + /// source: ApiError, + /// }, + /// } + /// + /// fn example() -> impl Sink { + /// stock_prices().with_context(|| Authenticating { + /// user_name: "admin".to_string(), + /// user_id: 42, + /// }) + /// } + /// + /// # type ApiError = Box; + /// fn stock_prices() -> impl Sink { + /// /* ... */ + /// # Vec::new().sink_map_err(|_| String::new()).sink_from_err() + /// } + /// ``` + /// + /// Note that this *may not* be needed in many cases because the + /// context selector will call [`Into::into`] on each field. + fn with_context(self, context: F) -> WithContext + where + F: FnMut() -> C, + C: IntoError, + E: Error + ErrorCompat; +} + +impl SinkExt for St +where + St: Sink, +{ + fn context(self, context: C) -> Context + where + C: IntoError + Clone, + E: Error + ErrorCompat, + { + Context { + sink: self, + context, + _e: PhantomData, + } + } + + fn with_context(self, context: F) -> WithContext + where + F: FnMut() -> C, + C: IntoError, + E: Error + ErrorCompat, + { + WithContext { + sink: self, + context, + _e: PhantomData, + } + } +} + +/// Sink for the [`context`](SinkExt::context) combinator. +/// +/// See the [`SinkExt::context`] method for more details. +pub struct Context { + sink: Si, + context: C, + _e: PhantomData, +} + +impl Sink for Context +where + Si: Sink, + C: IntoError + Clone, + E: Error + ErrorCompat, +{ + type SinkItem = Si::SinkItem; + type SinkError = E; + + fn start_send( + &mut self, + item: Self::SinkItem, + ) -> Result, Self::SinkError> { + self.sink.start_send(item).context(self.context.clone()) + } + + fn poll_complete(&mut self) -> Result, Self::SinkError> { + self.sink.poll_complete().context(self.context.clone()) + } + + fn close(&mut self) -> Result, Self::SinkError> { + self.sink.close().context(self.context.clone()) + } +} + +/// Sink for the [`with_context`](SinkExt::with_context) combinator. +/// +/// See the [`SinkExt::with_context`] method for more details. +pub struct WithContext { + sink: Si, + context: F, + _e: PhantomData, +} + +impl Sink for WithContext +where + St: Sink, + F: FnMut() -> C, + C: IntoError, + E: Error + ErrorCompat, +{ + type SinkItem = St::SinkItem; + type SinkError = E; + + fn start_send( + &mut self, + item: Self::SinkItem, + ) -> Result, Self::SinkError> { + self.sink.start_send(item).with_context(|| (self.context)()) + } + + fn poll_complete(&mut self) -> Result, Self::SinkError> { + self.sink.poll_complete().with_context(|| (self.context)()) + } + + fn close(&mut self) -> Result, Self::SinkError> { + self.sink.close().with_context(|| (self.context)()) + } +}