From fe927b7c36ab1af58d2cb8af4e1945a03587ca1a Mon Sep 17 00:00:00 2001 From: Alexey Gerasimov Date: Fri, 24 Apr 2020 23:11:57 +0500 Subject: [PATCH 01/10] Add ext trait --- src/futures/mod.rs | 3 + src/futures/try_sink.rs | 239 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 242 insertions(+) create mode 100644 src/futures/try_sink.rs diff --git a/src/futures/mod.rs b/src/futures/mod.rs index 3f0ded34..ef22ff28 100644 --- a/src/futures/mod.rs +++ b/src/futures/mod.rs @@ -8,9 +8,12 @@ //! [feature flag]: crate::guide::feature_flags pub mod try_future; +pub mod try_sink; pub mod try_stream; #[doc(inline)] pub use self::try_future::TryFutureExt; #[doc(inline)] +pub use self::try_sink::SnafuSinkExt; +#[doc(inline)] pub use self::try_stream::TryStreamExt; diff --git a/src/futures/try_sink.rs b/src/futures/try_sink.rs new file mode 100644 index 00000000..9ab89c63 --- /dev/null +++ b/src/futures/try_sink.rs @@ -0,0 +1,239 @@ +//! Additions to the [`TryStream`] trait. +//! +//! [`TryStream`]: futures_core_crate::TryStream + +use crate::{Error, ErrorCompat, IntoError, ResultExt}; +use core::{ + marker::PhantomData, + pin::Pin, + task::{Context as TaskContext, Poll}, +}; +use futures_crate::{Sink, SinkExt}; +use pin_project::pin_project; + +/// Additions to [`SinkExt`]. +pub trait SnafuSinkExt: SinkExt + Sized { + /// Extend a [`SinkExt`]'s error with additional context-sensitive + /// information. + /// + /// ```rust + /// # use futures_crate as futures; + /// use futures::Sink; + /// # use futures::sink; + /// use snafu::{futures::SnafuSinkExt, Snafu}; + /// + /// #[derive(Debug, Snafu)] + /// enum Error { + /// Authenticating { + /// user_name: String, + /// user_id: i32, + /// source: ApiError, + /// }, + /// } + /// + /// fn example() -> impl Sink { + /// transactions().context(Authenticating { + /// user_name: "admin", + /// user_id: 42, + /// }) + /// } + /// + /// # type ApiError = Box; + /// fn transactions() -> impl Sink { + /// /* ... */ + /// # sink::drain() + /// } + /// ``` + /// + /// Note that the context selector will call [`Into::into`] on + /// each field, so the types are not required to exactly match. + fn context(self, context: C) -> Context + where + C: IntoError + Clone, + E: Error + ErrorCompat; + + /// Extend a [`SinkExt`]'s error with lazily-generated + /// context-sensitive information. + /// + /// ```rust + /// # use futures_crate as futures; + /// use futures::Sink; + /// # use futures::sink; + /// use snafu::{futures::SnafuSinkExt, Snafu}; + /// + /// #[derive(Debug, Snafu)] + /// enum Error { + /// Authenticating { + /// user_name: String, + /// user_id: i32, + /// source: ApiError, + /// }, + /// } + /// + /// fn example() -> impl Sink { + /// transactions().with_context(|| Authenticating { + /// user_name: "admin", + /// user_id: 42, + /// }) + /// } + /// + /// # type ApiError = Box; + /// fn transactions() -> impl Sink { + /// /* ... */ + /// # sink::drain() + /// } + /// ``` + /// + /// Note that this *may not* be needed in many cases because the + /// context selector will call [`Into::into`] on each field. + fn with_context(self, context: F) -> WithContext + where + F: FnMut() -> C, + C: IntoError, + E: Error + ErrorCompat; +} + +impl SnafuSinkExt for Si +where + Si: Sink, +{ + fn context(self, context: C) -> Context + where + C: IntoError + Clone, + E: Error + ErrorCompat, + { + Context { + inner: self, + context, + _e: Default::default(), + } + } + + fn with_context(self, context: F) -> WithContext + where + F: FnMut() -> C, + C: IntoError, + E: Error + ErrorCompat, + { + WithContext { + inner: self, + context, + _e: PhantomData, + } + } +} + +/// Sink for the [`context`](SnafuStreamExt::context) combinator. +/// +/// See the [`SnafuStreamExt::context`] method for more details. +#[pin_project] +#[derive(Debug)] +#[must_use = "sinks do nothing unless polled"] +pub struct Context { + #[pin] + inner: Si, + context: C, + _e: PhantomData, +} + +impl Sink for Context +where + Sk: SinkExt, + C: IntoError + Clone, + E: Error + ErrorCompat, +{ + type Error = E; + + fn poll_ready(self: Pin<&mut Self>, ctx: &mut TaskContext) -> Poll> { + let this = self.project(); + let inner = this.inner; + let context = this.context; + + inner + .poll_ready(ctx) + .map_err(|e| context.clone().into_error(e)) + } + + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { + let this = self.project(); + let inner = this.inner; + let context = this.context; + + inner.start_send(item).context(context.clone()) + } + + fn poll_flush(self: Pin<&mut Self>, ctx: &mut TaskContext) -> Poll> { + let this = self.project(); + let inner = this.inner; + let context = this.context; + + inner + .poll_flush(ctx) + .map_err(|e| context.clone().into_error(e)) + } + + fn poll_close(self: Pin<&mut Self>, ctx: &mut TaskContext) -> Poll> { + let this = self.project(); + let inner = this.inner; + let context = this.context; + + inner + .poll_close(ctx) + .map_err(|e| context.clone().into_error(e)) + } +} + +/// Sink for the [`with_context`](SnafuSinkExt::with_context) combinator. +/// +/// See the [`SnafuSinkExt::with_context`] method for more details. +#[pin_project] +#[derive(Debug)] +#[must_use = "sinks do nothing unless polled"] +pub struct WithContext { + #[pin] + inner: St, + context: F, + _e: PhantomData, +} + +impl Sink for WithContext +where + Si: Sink, + F: FnMut() -> C, + C: IntoError, + E: Error + ErrorCompat, +{ + type Error = E; + + fn poll_ready(self: Pin<&mut Self>, ctx: &mut TaskContext) -> Poll> { + let this = self.project(); + let inner = this.inner; + let context = this.context; + + inner.poll_ready(ctx).map_err(|e| context().into_error(e)) + } + + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { + let this = self.project(); + let inner = this.inner; + let context = this.context; + + inner.start_send(item).with_context(context) + } + + fn poll_flush(self: Pin<&mut Self>, ctx: &mut TaskContext) -> Poll> { + let this = self.project(); + let inner = this.inner; + let context = this.context; + + inner.poll_flush(ctx).map_err(|e| context().into_error(e)) + } + + fn poll_close(self: Pin<&mut Self>, ctx: &mut TaskContext) -> Poll> { + let this = self.project(); + let inner = this.inner; + let context = this.context; + + inner.poll_close(ctx).map_err(|e| context().into_error(e)) + } +} From 0a80abaf202c17e617f73c4ea7c4ab148eee7105 Mon Sep 17 00:00:00 2001 From: Alexey Gerasimov Date: Fri, 24 Apr 2020 23:35:15 +0500 Subject: [PATCH 02/10] Feature gate ext trait --- Cargo.toml | 3 +++ src/futures/mod.rs | 2 ++ 2 files changed, 5 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index f616a5e7..70d30308 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,9 @@ futures-01 = ["futures-01-crate"] # The standard library's implementation of futures futures = ["futures-core-crate", "pin-project"] +# Add extension trait for the SinkExt trait +sink = ["futures-crate"] + # No public user should make use of this feature # https://github.com/rust-lang/cargo/issues/1596 "internal-dev-dependencies" = ["futures-crate"] diff --git a/src/futures/mod.rs b/src/futures/mod.rs index ef22ff28..2b66b7c1 100644 --- a/src/futures/mod.rs +++ b/src/futures/mod.rs @@ -8,11 +8,13 @@ //! [feature flag]: crate::guide::feature_flags pub mod try_future; +#[cfg(feature = "sink")] pub mod try_sink; pub mod try_stream; #[doc(inline)] pub use self::try_future::TryFutureExt; +#[cfg(feature = "sink")] #[doc(inline)] pub use self::try_sink::SnafuSinkExt; #[doc(inline)] From f2c6ea288b714f81e2b8ccef393d53281e852a6b Mon Sep 17 00:00:00 2001 From: Alexey Gerasimov Date: Sat, 25 Apr 2020 00:15:31 +0500 Subject: [PATCH 03/10] Add tests --- compatibility-tests/futures/Cargo.toml | 2 +- compatibility-tests/futures/src/lib.rs | 48 +++++++++++++++++++++++--- 2 files changed, 44 insertions(+), 6 deletions(-) diff --git a/compatibility-tests/futures/Cargo.toml b/compatibility-tests/futures/Cargo.toml index 532a9152..d52693d9 100644 --- a/compatibility-tests/futures/Cargo.toml +++ b/compatibility-tests/futures/Cargo.toml @@ -5,5 +5,5 @@ authors = ["Jake Goulding "] edition = "2018" [dependencies] -snafu = { path = "../..", features = ["futures"] } +snafu = { path = "../..", features = ["futures", "sink"] } futures = "0.3.0" diff --git a/compatibility-tests/futures/src/lib.rs b/compatibility-tests/futures/src/lib.rs index 83f0ba55..0ba6585a 100644 --- a/compatibility-tests/futures/src/lib.rs +++ b/compatibility-tests/futures/src/lib.rs @@ -1,14 +1,21 @@ #![cfg(test)] mod api { - use futures::{stream, StreamExt, TryStream}; + use futures::{sink, stream, Sink, SinkExt, StreamExt, TryStream}; use snafu::Snafu; + use std::convert::Infallible; - #[derive(Debug, Snafu)] + #[derive(Debug, Clone, Snafu)] pub enum Error { InvalidUrl { url: String }, } + impl From for Error { + fn from(e: Infallible) -> Self { + match e {} + } + } + pub async fn fetch_page(url: &str) -> Result { InvalidUrl { url }.fail() } @@ -16,21 +23,33 @@ mod api { pub fn keep_fetching_page<'u>(url: &'u str) -> impl TryStream + 'u { stream::repeat(()).then(move |_| fetch_page(url)) } + + pub async fn upload_str(url: &str, _: &str) -> Result { + InvalidUrl { url }.fail() + } + + pub fn upload<'u>(url: &'u str) -> impl Sink + 'u { + sink::drain().with(move |s: String| async move { upload_str(url, &s).await }) + } } +use futures::future::ok; use futures::{ future, - stream::{StreamExt as _, TryStreamExt as _}, + stream::{self, StreamExt as _, TryStreamExt as _}, + SinkExt, }; use snafu::{ - futures::{TryFutureExt as _, TryStreamExt as _}, + futures::{SnafuSinkExt as _, TryFutureExt as _, TryStreamExt as _}, ResultExt, Snafu, }; -#[derive(Debug, Snafu)] +#[derive(Debug, Clone, Snafu)] enum Error { UnableToLoadAppleStock { source: api::Error }, UnableToLoadGoogleStock { source: api::Error, name: String }, + UnableToUploadApple { source: api::Error }, + UnableToUploadGoogle { source: api::Error, name: String }, } // Normal `Result` code works with `await` @@ -97,6 +116,22 @@ async fn load_stock_data_series() -> Result { .await } +// Can be used as a `SinkExt` combinator +async fn upload_strings() -> Result<(), Error> { + let apple = api::upload("apple").context(UnableToUploadApple); + let google = api::upload("google").with_context(|| UnableToUploadGoogle { + name: String::from("sink"), + }); + + let together = apple.fanout(google); + + stream::repeat(Ok("str".to_owned())) + .take(10) + .forward(together) + .await?; + Ok(()) +} + #[test] fn implements_error() { fn check() {} @@ -115,4 +150,7 @@ fn implements_error() { let d = block_on(load_stock_data_series()); d.unwrap_err(); + + let d = block_on(upload_strings()); + d.unwrap_err(); } From 318259d68fe7bbcf9d88da6d58c2139028af8495 Mon Sep 17 00:00:00 2001 From: Alexey Gerasimov Date: Thu, 30 Apr 2020 13:25:46 +0500 Subject: [PATCH 04/10] Rename try_sink.rs to sink.rs --- src/futures/mod.rs | 4 ++-- src/futures/{try_sink.rs => sink.rs} | 0 2 files changed, 2 insertions(+), 2 deletions(-) rename src/futures/{try_sink.rs => sink.rs} (100%) diff --git a/src/futures/mod.rs b/src/futures/mod.rs index 2b66b7c1..7450b993 100644 --- a/src/futures/mod.rs +++ b/src/futures/mod.rs @@ -9,13 +9,13 @@ pub mod try_future; #[cfg(feature = "sink")] -pub mod try_sink; +pub mod sink; pub mod try_stream; #[doc(inline)] pub use self::try_future::TryFutureExt; #[cfg(feature = "sink")] #[doc(inline)] -pub use self::try_sink::SnafuSinkExt; +pub use self::sink::SnafuSinkExt; #[doc(inline)] pub use self::try_stream::TryStreamExt; diff --git a/src/futures/try_sink.rs b/src/futures/sink.rs similarity index 100% rename from src/futures/try_sink.rs rename to src/futures/sink.rs From 6f5b7f760f7561e847d496657e020a86d3af49b0 Mon Sep 17 00:00:00 2001 From: Alexey Gerasimov Date: Thu, 30 Apr 2020 16:11:05 +0500 Subject: [PATCH 05/10] Add futures 0.1 ext trait --- src/futures01/mod.rs | 3 + src/futures01/sink.rs | 204 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 207 insertions(+) create mode 100644 src/futures01/sink.rs diff --git a/src/futures01/mod.rs b/src/futures01/mod.rs index d638baa9..eac42cda 100644 --- a/src/futures01/mod.rs +++ b/src/futures01/mod.rs @@ -8,9 +8,12 @@ //! [feature flag]: crate::guide::feature_flags pub mod future; +pub mod sink; pub mod stream; #[doc(inline)] pub use self::future::FutureExt; #[doc(inline)] +pub use self::sink::SinkExt; +#[doc(inline)] pub use self::stream::StreamExt; diff --git a/src/futures01/sink.rs b/src/futures01/sink.rs new file mode 100644 index 00000000..96cfe42e --- /dev/null +++ b/src/futures01/sink.rs @@ -0,0 +1,204 @@ +//! Additions to the Futures 0.1 [`Sink`] trait. +//! +//! [`Sink`]: futures_01_crate::Sink + +use crate::{Error, ErrorCompat, IntoError, ResultExt}; +use core::marker::PhantomData; +use futures_01_crate::{Async, AsyncSink, Sink}; + +/// Additions to [`Sink`]. +pub trait SinkExt: Sink + Sized { + /// Extend a [`Sink`]'s error with additional context-sensitive + /// information. + /// + /// [`Sink`]: futures_01_crate::Sink] + /// + /// ```rust + /// # use futures_01_crate as futures; + /// use futures::Sink; + /// use snafu::{futures01::SinkExt, Snafu}; + /// + /// #[derive(Debug, Snafu)] + /// enum Error { + /// Authenticating { + /// user_name: String, + /// user_id: i32, + /// source: ApiError, + /// }, + /// } + /// + /// fn example() -> impl Sink { + /// stock_prices().context(Authenticating { + /// user_name: "admin", + /// user_id: 42, + /// }) + /// } + /// + /// # type ApiError = Box; + /// fn stock_prices() -> impl Sink { + /// /* ... */ + /// # Vec::new() + /// } + /// ``` + /// + /// Note that the context selector will call [`Into::into`] on + /// each field, so the types are not required to exactly match. + fn context(self, context: C) -> Context + where + C: IntoError + Clone, + E: Error + ErrorCompat; + + /// Extend a [`Sink`]'s error with lazily-generated context-sensitive + /// information. + /// + /// [`Sink`]: futures_01_crate::Sink] + /// + /// ```rust + /// # use futures_01_crate as futures; + /// use futures::Sink; + /// # use futures::stream; + /// use snafu::{futures01::SinkExt, Snafu}; + /// + /// #[derive(Debug, Snafu)] + /// enum Error { + /// Authenticating { + /// user_name: String, + /// user_id: i32, + /// source: ApiError, + /// }, + /// } + /// + /// fn example() -> impl Sink { + /// stock_prices().with_context(|| Authenticating { + /// user_name: "admin".to_string(), + /// user_id: 42, + /// }) + /// } + /// + /// # type ApiError = std::io::Error; + /// fn stock_prices() -> impl Sink { + /// /* ... */ + /// # stream::empty() + /// } + /// ``` + /// + /// Note that this *may not* be needed in many cases because the + /// context selector will call [`Into::into`] on each field. + fn with_context(self, context: F) -> WithContext + where + F: FnMut() -> C, + C: IntoError, + E: Error + ErrorCompat; +} + +impl SinkExt for St +where + St: Sink, +{ + fn context(self, context: C) -> Context + where + C: IntoError + Clone, + E: Error + ErrorCompat, + { + Context { + sink: self, + context, + _e: PhantomData, + } + } + + fn with_context(self, context: F) -> WithContext + where + F: FnMut() -> C, + C: IntoError, + E: Error + ErrorCompat, + { + WithContext { + sink: self, + context, + _e: PhantomData, + } + } +} + +/// Sink for the [`context`](SinkExt::context) combinator. +/// +/// See the [`SinkExt::context`] method for more details. +pub struct Context { + sink: Si, + context: C, + _e: PhantomData, +} + +impl Sink for Context +where + Si: Sink, + C: IntoError + Clone, + E: Error + ErrorCompat, +{ + type SinkItem = Si::SinkItem; + type SinkError = E; + + // fn poll(&mut self) -> Result>, Self::SinkError> { + // self.sink + // .poll() + // .map_err(|error| self.context.clone().into_error(error)) + // } + + fn start_send( + &mut self, + item: Self::SinkItem, + ) -> Result, Self::SinkError> { + self.sink.start_send(item).context(self.context.clone()) + } + + fn poll_complete(&mut self) -> Result, Self::SinkError> { + self.sink.poll_complete().context(self.context.clone()) + } + + fn close(&mut self) -> Result, Self::SinkError> { + self.sink.close().context(self.context.clone()) + } +} + +/// Sink for the [`with_context`](SinkExt::with_context) combinator. +/// +/// See the [`SinkExt::with_context`] method for more details. +pub struct WithContext { + sink: Si, + context: F, + _e: PhantomData, +} + +impl Sink for WithContext +where + St: Sink, + F: FnMut() -> C, + C: IntoError, + E: Error + ErrorCompat, +{ + type SinkItem = St::SinkItem; + type SinkError = E; + + // fn poll(&mut self) -> Result>, Self::SinkError> { + // self.sink.poll().map_err(|error| { + // let context = &mut self.context; + // context().into_error(error) + // }) + // } + + fn start_send( + &mut self, + item: Self::SinkItem, + ) -> Result, Self::SinkError> { + self.sink.start_send(item).with_context(|| (self.context)()) + } + + fn poll_complete(&mut self) -> Result, Self::SinkError> { + self.sink.poll_complete().with_context(|| (self.context)()) + } + + fn close(&mut self) -> Result, Self::SinkError> { + self.sink.close().with_context(|| (self.context)()) + } +} From adc3d8450066fd90a5bdf8f366635ecbb611692f Mon Sep 17 00:00:00 2001 From: Alexey Gerasimov Date: Thu, 30 Apr 2020 16:11:19 +0500 Subject: [PATCH 06/10] Add futures 0.1 tests --- compatibility-tests/futures-0.1/src/lib.rs | 41 ++++++++++++++++++++-- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/compatibility-tests/futures-0.1/src/lib.rs b/compatibility-tests/futures-0.1/src/lib.rs index c13a933d..316a57ef 100644 --- a/compatibility-tests/futures-0.1/src/lib.rs +++ b/compatibility-tests/futures-0.1/src/lib.rs @@ -1,12 +1,19 @@ #![cfg(test)] mod api { - use futures::{future, stream, Future, Stream}; + use futures::{future, stream, Future, Stream, Sink}; use snafu::Snafu; #[derive(Debug, Snafu)] pub enum Error { InvalidUrl { url: String }, + InfallibleFailed, + } + + impl From<()> for Error { + fn from(_: ()) -> Self { + Error::InfallibleFailed + } } pub fn fetch_page(url: &str) -> impl Future { @@ -16,11 +23,19 @@ mod api { pub fn keep_fetching_page<'u>(url: &'u str) -> impl Stream + 'u { stream::repeat::<_, ()>(()).then(move |_| fetch_page(url)) } + + pub fn upload_str(url: &str, _: &str) -> Result { + InvalidUrl { url }.fail() + } + + pub fn upload<'u>(url: &'u str) -> impl Sink + 'u { + Vec::new().with(move |s: String| { upload_str(url, &s) }) + } } -use futures::{future, Future, Stream}; +use futures::{future, stream, Future, Stream, Sink}; use snafu::{ - futures01::{future::FutureExt as _, stream::StreamExt as _}, + futures01::{future::FutureExt as _, stream::StreamExt as _, sink::SinkExt as _}, Snafu, }; @@ -28,6 +43,8 @@ use snafu::{ enum Error { UnableToLoadAppleStock { source: api::Error }, UnableToLoadGoogleStock { source: api::Error, name: String }, + UnableToUploadApple { source: api::Error }, + UnableToUploadGoogle { source: api::Error, name: String }, } // Can be used as a `Future` combinator @@ -59,6 +76,21 @@ fn load_stock_data_series() -> impl Future { }) } +// Can be used as a `SinkExt` combinator +fn upload_strings() -> impl Future { + let apple = api::upload("apple").context(UnableToUploadApple); + let google = api::upload("google").with_context(|| UnableToUploadGoogle { + name: String::from("sink"), + }); + + let together = apple.fanout(google); + + stream::repeat("str".to_owned()) + .take(10) + .forward(together) + .map(|_| ()) +} + #[test] fn implements_error() { fn check() {} @@ -69,4 +101,7 @@ fn implements_error() { let c = load_stock_data_series().wait(); c.unwrap_err(); + + let d = upload_strings().wait(); + d.unwrap_err(); } From 18d5d669c88fffabed4618783f45e1664ba5316e Mon Sep 17 00:00:00 2001 From: Alexey Gerasimov Date: Thu, 30 Apr 2020 16:11:26 +0500 Subject: [PATCH 07/10] fmt --- src/futures/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/futures/mod.rs b/src/futures/mod.rs index 7450b993..68341b69 100644 --- a/src/futures/mod.rs +++ b/src/futures/mod.rs @@ -7,15 +7,15 @@ //! [`TryStream`]: futures_core_crate::TryStream //! [feature flag]: crate::guide::feature_flags -pub mod try_future; #[cfg(feature = "sink")] pub mod sink; +pub mod try_future; pub mod try_stream; -#[doc(inline)] -pub use self::try_future::TryFutureExt; #[cfg(feature = "sink")] #[doc(inline)] pub use self::sink::SnafuSinkExt; #[doc(inline)] +pub use self::try_future::TryFutureExt; +#[doc(inline)] pub use self::try_stream::TryStreamExt; From f28f0e6ae1cf658eb2e7bb86868cb4058c181e98 Mon Sep 17 00:00:00 2001 From: Alexey Gerasimov Date: Thu, 30 Apr 2020 16:19:42 +0500 Subject: [PATCH 08/10] Cleanup --- src/futures01/sink.rs | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/src/futures01/sink.rs b/src/futures01/sink.rs index 96cfe42e..40ba16dd 100644 --- a/src/futures01/sink.rs +++ b/src/futures01/sink.rs @@ -56,7 +56,6 @@ pub trait SinkExt: Sink + Sized { /// ```rust /// # use futures_01_crate as futures; /// use futures::Sink; - /// # use futures::stream; /// use snafu::{futures01::SinkExt, Snafu}; /// /// #[derive(Debug, Snafu)] @@ -78,7 +77,7 @@ pub trait SinkExt: Sink + Sized { /// # type ApiError = std::io::Error; /// fn stock_prices() -> impl Sink { /// /* ... */ - /// # stream::empty() + /// # Vec::new() /// } /// ``` /// @@ -139,12 +138,6 @@ where type SinkItem = Si::SinkItem; type SinkError = E; - // fn poll(&mut self) -> Result>, Self::SinkError> { - // self.sink - // .poll() - // .map_err(|error| self.context.clone().into_error(error)) - // } - fn start_send( &mut self, item: Self::SinkItem, @@ -180,13 +173,6 @@ where type SinkItem = St::SinkItem; type SinkError = E; - // fn poll(&mut self) -> Result>, Self::SinkError> { - // self.sink.poll().map_err(|error| { - // let context = &mut self.context; - // context().into_error(error) - // }) - // } - fn start_send( &mut self, item: Self::SinkItem, From 9009d6c2d01cc7bb0859deba1ddd042a62ee8727 Mon Sep 17 00:00:00 2001 From: Alexey Gerasimov Date: Thu, 30 Apr 2020 17:17:11 +0500 Subject: [PATCH 09/10] Add dev dependency on futures 0.1 crate to be able to use Sink impl for Vec in doc examples --- Cargo.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 70d30308..42e7da15 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,3 +66,6 @@ futures-01-crate = { package = "futures", version = "0.1", optional = true, defa futures-crate = { package = "futures", version = "0.3.0", optional = true, default-features = false } futures-core-crate = { package = "futures-core", version = "0.3.0", optional = true, default-features = false } pin-project = { version = "0.4", optional = true, default-features = false } + +[dev-dependencies] +futures-01-crate = { package = "futures", version = "0.1" } From f9a5dd8551ff6c567f4b9b0864b903eeb3248e40 Mon Sep 17 00:00:00 2001 From: Alexey Gerasimov Date: Thu, 30 Apr 2020 17:17:23 +0500 Subject: [PATCH 10/10] Fix doc examples --- src/futures/sink.rs | 8 ++++---- src/futures01/sink.rs | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/futures/sink.rs b/src/futures/sink.rs index 9ab89c63..096b1a3a 100644 --- a/src/futures/sink.rs +++ b/src/futures/sink.rs @@ -18,7 +18,7 @@ pub trait SnafuSinkExt: SinkExt + Sized { /// /// ```rust /// # use futures_crate as futures; - /// use futures::Sink; + /// use futures::{Sink, SinkExt}; /// # use futures::sink; /// use snafu::{futures::SnafuSinkExt, Snafu}; /// @@ -41,7 +41,7 @@ pub trait SnafuSinkExt: SinkExt + Sized { /// # type ApiError = Box; /// fn transactions() -> impl Sink { /// /* ... */ - /// # sink::drain() + /// # sink::drain().sink_err_into() /// } /// ``` /// @@ -57,7 +57,7 @@ pub trait SnafuSinkExt: SinkExt + Sized { /// /// ```rust /// # use futures_crate as futures; - /// use futures::Sink; + /// use futures::{Sink, SinkExt}; /// # use futures::sink; /// use snafu::{futures::SnafuSinkExt, Snafu}; /// @@ -80,7 +80,7 @@ pub trait SnafuSinkExt: SinkExt + Sized { /// # type ApiError = Box; /// fn transactions() -> impl Sink { /// /* ... */ - /// # sink::drain() + /// # sink::drain().sink_err_into() /// } /// ``` /// diff --git a/src/futures01/sink.rs b/src/futures01/sink.rs index 40ba16dd..0274146f 100644 --- a/src/futures01/sink.rs +++ b/src/futures01/sink.rs @@ -37,7 +37,7 @@ pub trait SinkExt: Sink + Sized { /// # type ApiError = Box; /// fn stock_prices() -> impl Sink { /// /* ... */ - /// # Vec::new() + /// # Vec::new().sink_map_err(|_| String::new()).sink_from_err() /// } /// ``` /// @@ -74,10 +74,10 @@ pub trait SinkExt: Sink + Sized { /// }) /// } /// - /// # type ApiError = std::io::Error; + /// # type ApiError = Box; /// fn stock_prices() -> impl Sink { /// /* ... */ - /// # Vec::new() + /// # Vec::new().sink_map_err(|_| String::new()).sink_from_err() /// } /// ``` ///