blamaybe
Some checks failed
moparisthebest/xmpp-proxy/pipeline/head There was a failure building this commit

This commit is contained in:
Travis Burtrum 2024-09-18 01:16:19 -04:00
parent 9644f88144
commit cf0eac9e0b
Signed by: moparisthebest
GPG Key ID: 88C93BFE27BC8229
3 changed files with 38 additions and 7 deletions

21
Cargo.lock generated
View File

@ -914,6 +914,26 @@ version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "pin-project"
version = "1.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
version = "0.2.13"
@ -2270,6 +2290,7 @@ dependencies = [
"lazy_static",
"log",
"nix",
"pin-project",
"quinn",
"rand",
"reqwest",

View File

@ -77,6 +77,7 @@ nix = { version = "0.27", optional = true, default-features = false, features =
# tokio-xmpp if you want a ServerConnector impl
tokio-xmpp = { version = "4.0", optional = true, default-features = false }
tokio-util = { version = "0.7", optional = true, features = ["codec"] }
pin-project = "1" # todo: optional?
[features]
default = ["c2s-incoming", "c2s-outgoing", "s2s-incoming", "s2s-outgoing", "tls", "quic", "websocket", "webtransport", "logging", "tls-ca-roots-native", "systemd", "tokio-xmpp"]

View File

@ -139,8 +139,7 @@ pub struct StanzaStream {
wr: StanzaWrite,
rd: StanzaRead,
fut_next_stanza: Option<u64>,
fut_next_stanza: Option<Pin<Box<dyn Future<Output = Result<Option<(&'static [u8], usize)>>> + Send>>>,
send_stream_open: bool,
stream_open: Vec<u8>,
@ -194,6 +193,10 @@ impl StanzaStream {
}
}
pub async fn next_stanza_vec(&mut self) -> Result<Option<(Vec<u8>, usize)>> {
self.next_stanza().await.map(|o| o.map(|(v, u)| (v.to_vec(), u)))
}
pub async fn next_stanza<'a>(&'a mut self) -> Result<Option<(&'a [u8], usize)>> {
if self.send_stream_open {
self.send_stream_open = false;
@ -246,10 +249,16 @@ impl AsyncRead for StanzaStream {
fn poll_read(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &mut tokio::io::ReadBuf<'_>) -> std::task::Poll<std::io::Result<()>> {
// todo: instead of waiting for a whole stanza, if self is AsyncRead, we could go directly to that and skip stanzafilter, problem is this would break Stream::poll_next and XmppStream::next_stanza, so maybe we need a different struct to do that?
// todo: instead of using our StanzaFilter and copying bytes from it, we could make one out of the buf?
let future = self.next_stanza();
// self.fut_next_stanza = Some(future);
let future = std::pin::pin!(future);
match future.poll(cx) {
let mut future = match self.fut_next_stanza.take() {
Some(fut) => fut,
None => {
let this: &'static mut std::pin::Pin<&mut Self> = unsafe { std::mem::transmute(&mut self) };
let future = this.next_stanza();
Box::pin(future)
}
};
match future.as_mut().poll(cx) {
std::task::Poll::Ready(res) => {
if let Some((stanza, _)) = res.map_err(|e| IoError::other(e))? {
if stanza.len() >= buf.remaining() {
@ -260,7 +269,7 @@ impl AsyncRead for StanzaStream {
return Poll::Ready(Ok(()));
}
std::task::Poll::Pending => {
// self.fut_next_stanza = Some(future);
self.fut_next_stanza = Some(future);
std::task::Poll::Pending
}
}