diff --git a/Cargo.lock b/Cargo.lock index e263ccd..5bf136d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -914,6 +914,26 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pin-project" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -2270,6 +2290,7 @@ dependencies = [ "lazy_static", "log", "nix", + "pin-project", "quinn", "rand", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index 73d79be..7d6c981 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,6 +77,7 @@ nix = { version = "0.27", optional = true, default-features = false, features = # tokio-xmpp if you want a ServerConnector impl tokio-xmpp = { version = "4.0", optional = true, default-features = false } tokio-util = { version = "0.7", optional = true, features = ["codec"] } +pin-project = "1" # todo: optional? [features] default = ["c2s-incoming", "c2s-outgoing", "s2s-incoming", "s2s-outgoing", "tls", "quic", "websocket", "webtransport", "logging", "tls-ca-roots-native", "systemd", "tokio-xmpp"] diff --git a/src/in_out.rs b/src/in_out.rs index d411ef8..2ee5a8c 100644 --- a/src/in_out.rs +++ b/src/in_out.rs @@ -139,8 +139,7 @@ pub struct StanzaStream { wr: StanzaWrite, rd: StanzaRead, - fut_next_stanza: Option, - + fut_next_stanza: Option>> + Send>>>, send_stream_open: bool, stream_open: Vec, @@ -194,6 +193,10 @@ impl StanzaStream { } } + pub async fn next_stanza_vec(&mut self) -> Result, usize)>> { + self.next_stanza().await.map(|o| o.map(|(v, u)| (v.to_vec(), u))) + } + pub async fn next_stanza<'a>(&'a mut self) -> Result> { if self.send_stream_open { self.send_stream_open = false; @@ -246,10 +249,16 @@ impl AsyncRead for StanzaStream { fn poll_read(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &mut tokio::io::ReadBuf<'_>) -> std::task::Poll> { // todo: instead of waiting for a whole stanza, if self is AsyncRead, we could go directly to that and skip stanzafilter, problem is this would break Stream::poll_next and XmppStream::next_stanza, so maybe we need a different struct to do that? // todo: instead of using our StanzaFilter and copying bytes from it, we could make one out of the buf? - let future = self.next_stanza(); - // self.fut_next_stanza = Some(future); - let future = std::pin::pin!(future); - match future.poll(cx) { + let mut future = match self.fut_next_stanza.take() { + Some(fut) => fut, + None => { + let this: &'static mut std::pin::Pin<&mut Self> = unsafe { std::mem::transmute(&mut self) }; + let future = this.next_stanza(); + Box::pin(future) + } + }; + + match future.as_mut().poll(cx) { std::task::Poll::Ready(res) => { if let Some((stanza, _)) = res.map_err(|e| IoError::other(e))? { if stanza.len() >= buf.remaining() { @@ -260,7 +269,7 @@ impl AsyncRead for StanzaStream { return Poll::Ready(Ok(())); } std::task::Poll::Pending => { - // self.fut_next_stanza = Some(future); + self.fut_next_stanza = Some(future); std::task::Poll::Pending } }