diff --git a/Cargo.lock b/Cargo.lock index 3a55797..b36c8e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -55,12 +55,27 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" +[[package]] +name = "block-buffer" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" +dependencies = [ + "generic-array", +] + [[package]] name = "bumpalo" version = "3.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c59e7af012c713f529e7a3ee57ce9b31ddd858d4b512923602f74608b009631" +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + [[package]] name = "bytes" version = "1.0.1" @@ -95,6 +110,15 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea221b5284a47e40033bf9b66f35f984ec0ea2931eb03505246cd27a963f981b" +[[package]] +name = "cpufeatures" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66c99696f6c9dd7f35d486b9d04d7e6e202aa3e8c40d553f2fdf5e7e0c6a71ef" +dependencies = [ + "libc", +] + [[package]] name = "ct-logs" version = "0.8.0" @@ -116,6 +140,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8634d5e6139f7364a4e99bd718b2f511f2f25863146360e70909bc45a016290" +[[package]] +name = "digest" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" +dependencies = [ + "generic-array", +] + [[package]] name = "enum-as-inner" version = "0.3.3" @@ -141,6 +174,12 @@ dependencies = [ "termcolor", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "form_urlencoded" version = "1.0.1" @@ -245,6 +284,16 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-array" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "501466ecc8a30d1d3b7fc9229b122b2ce8ed6e9d9223f1138d4babb253e51817" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "getrandom" version = "0.2.3" @@ -285,6 +334,23 @@ dependencies = [ "winapi", ] +[[package]] +name = "http" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "527e8c9ac747e28542699a951517aa9a6945af506cd1f2e1b53a576c17b6cc11" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "httparse" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3a87b616e37e93c22fb19bcd386f02f3af5ea98a25670ad0fce773de23c5e68" + [[package]] name = "humantime" version = "2.1.0" @@ -302,6 +368,15 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "input_buffer" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f97967975f448f1a7ddb12b0bc41069d09ed6a1c161a92687e057325db35d413" +dependencies = [ + "bytes", +] + [[package]] name = "instant" version = "0.1.10" @@ -329,6 +404,12 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68f2d64f2edebec4ce84ad108148e67e1064789bee435edc5b60ad398714a3a9" +[[package]] +name = "itoa" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736" + [[package]] name = "js-sys" version = "0.3.51" @@ -448,6 +529,12 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56" +[[package]] +name = "opaque-debug" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" + [[package]] name = "openssl-probe" version = "0.1.4" @@ -485,6 +572,26 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "pin-project" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.7" @@ -763,6 +870,19 @@ dependencies = [ "syn", ] +[[package]] +name = "sha-1" +version = "0.9.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a0c8611594e2ab4ebbf06ec7cbbf0a99450b8570e96cbf5188b5d5f6ef18d81" +dependencies = [ + "block-buffer", + "cfg-if", + "cpufeatures", + "digest", + "opaque-debug", +] + [[package]] name = "slab" version = "0.4.3" @@ -886,6 +1006,19 @@ dependencies = [ "webpki", ] +[[package]] +name = "tokio-tungstenite" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e96bb520beab540ab664bd5a9cfeaa1fcd846fa68c830b42e2c8963071251d2" +dependencies = [ + "futures-util", + "log", + "pin-project", + "tokio", + "tungstenite", +] + [[package]] name = "toml" version = "0.5.8" @@ -972,6 +1105,32 @@ dependencies = [ "trust-dns-proto", ] +[[package]] +name = "tungstenite" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fe8dada8c1a3aeca77d6b51a4f1314e0f4b8e438b7b1b71e3ddaca8080e4093" +dependencies = [ + "base64", + "byteorder", + "bytes", + "http", + "httparse", + "input_buffer", + "log", + "rand", + "sha-1", + "thiserror", + "url", + "utf-8", +] + +[[package]] +name = "typenum" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879f6906492a7cd215bfa4cf595b600146ccfac0c79bcbd1f3000162af5e8b06" + [[package]] name = "unicode-bidi" version = "0.3.5" @@ -1020,6 +1179,18 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + +[[package]] +name = "version_check" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe" + [[package]] name = "wasi" version = "0.10.2+wasi-snapshot-preview1" @@ -1171,6 +1342,7 @@ dependencies = [ "serde_derive", "tokio", "tokio-rustls", + "tokio-tungstenite", "toml", "trust-dns-resolver", "webpki-roots", diff --git a/Cargo.toml b/Cargo.toml index ae73a7e..a2ca7cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,8 +45,11 @@ trust-dns-resolver = { version = "0.20", optional = true } # quic deps quinn = { version = "0.7", optional = true } +# websocket deps +tokio-tungstenite = { version = "0.14", optional = true } + [features] -default = ["incoming", "outgoing", "quic", "logging"] +default = ["incoming", "outgoing", "quic", "websocket", "logging"] #default = ["incoming", "outgoing"] #default = ["incoming", "quic"] #default = ["outgoing", "quic"] @@ -56,4 +59,5 @@ default = ["incoming", "outgoing", "quic", "logging"] incoming = ["tokio-rustls"] outgoing = ["tokio-rustls", "trust-dns-resolver", "webpki-roots", "lazy_static"] quic = ["quinn"] +websocket = ["tokio-tungstenite"] logging = ["rand", "env_logger"] diff --git a/README.md b/README.md index f207483..a292fcb 100644 --- a/README.md +++ b/README.md @@ -3,12 +3,13 @@ [![Build Status](https://ci.moparisthe.best/job/moparisthebest/job/xmpp-proxy/job/master/badge/icon%3Fstyle=plastic)](https://ci.moparisthe.best/job/moparisthebest/job/xmpp-proxy/job/master/) xmpp-proxy is a reverse proxy and outgoing proxy for XMPP servers and clients, providing STARTTLS, -[Direct TLS](https://xmpp.org/extensions/xep-0368.html), and [QUIC](https://datatracker.ietf.org/doc/html/draft-ietf-quic-transport) -connectivity to plain-text XMPP servers and clients and limiting stanza sizes without an XML parser. +[Direct TLS](https://xmpp.org/extensions/xep-0368.html), [QUIC](https://datatracker.ietf.org/doc/html/draft-ietf-quic-transport), +and [WebSocket](https://datatracker.ietf.org/doc/html/rfc7395) connectivity to plain-text XMPP servers and clients and +limiting stanza sizes without an XML parser. xmpp-proxy in reverse proxy (incoming) mode will: 1. listen on any number of interfaces/ports - 2. accept any STARTTLS, Direct TLS, or QUIC c2s or s2s connections from the internet + 2. accept any STARTTLS, Direct TLS, QUIC, or WebSocket c2s or s2s connections from the internet 3. terminate TLS 4. connect them to a local real XMPP server over plain-text TCP 5. send the [PROXY protocol](https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt) v1 header if configured, so the @@ -125,10 +126,12 @@ s2s_ports = {15268} If you are a grumpy power user who wants to build xmpp-proxy with exactly the features you want, nothing less, nothing more, this section is for you! -xmpp-proxy has 3 compile-time features: +xmpp-proxy has 5 compile-time features: 1. `incoming` - enables `incoming_listen` config option for reverse proxy STARTTLS/TLS 2. `outgoing` - enables `outgoing_listen` config option for outgoing proxy STARTTLS/TLS 3. `quic` - enables `quic_listen` config option for reverse proxy QUIC, and QUIC support for `outgoing` if it is enabled + 4. `websocket` - enables `websocket_listen` config option for reverse proxy WebSocket + 5. `logging` - enables configurable logging So to build only supporting reverse proxy STARTTLS/TLS, no QUIC, run: `cargo build --release --no-default-features --features incoming` To build a reverse proxy only, but supporting all of STARTTLS/TLS/QUIC, run: `cargo build --release --no-default-features --features incoming,quic` @@ -140,4 +143,5 @@ Thanks [rxml](https://github.com/horazont/rxml) for afl-fuzz seeds #### todo 1. sasl external for s2s, initiating and receiving - 2. websocket incoming and outgoing, maybe even for s2s + 2. websocket outgoing + 3. XEP for XMPP-over-QUIC and XMPP-S2S-over-WebSocket diff --git a/src/main.rs b/src/main.rs index a83caaf..492a6b1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,7 +11,7 @@ use die::Die; use serde_derive::Deserialize; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf}; use tokio::net::TcpListener; use tokio::task::JoinHandle; @@ -47,6 +47,11 @@ mod srv; #[cfg(feature = "outgoing")] use crate::srv::*; +#[cfg(feature = "websocket")] +mod websocket; +#[cfg(feature = "websocket")] +use crate::websocket::*; + const IN_BUFFER_SIZE: usize = 8192; const OUT_BUFFER_SIZE: usize = 8192; @@ -59,6 +64,7 @@ struct Config { tls_cert: String, incoming_listen: Option>, quic_listen: Option>, + websocket_listen: Option>, outgoing_listen: Option>, max_stanza_size_bytes: usize, s2s_target: SocketAddr, @@ -89,8 +95,8 @@ impl Config { fn get_cloneable_cfg(&self) -> CloneableConfig { CloneableConfig { max_stanza_size_bytes: self.max_stanza_size_bytes, - s2s_target: self.s2s_target.clone(), - c2s_target: self.c2s_target.clone(), + s2s_target: self.s2s_target, + c2s_target: self.c2s_target, proxy: self.proxy, } } @@ -130,38 +136,7 @@ async fn shuffle_rd_wr_filter( // now read to figure out client vs server let (stream_open, is_c2s, mut in_rd, mut in_filter) = stream_preamble(StanzaReader(in_rd), &client_addr, in_filter).await?; - let target = if is_c2s { config.c2s_target } else { config.s2s_target }; - client_addr.set_to_addr(target); - client_addr.set_c2s_stream_open(is_c2s, &stream_open); - - let out_stream = tokio::net::TcpStream::connect(target).await?; - let (mut out_rd, mut out_wr) = tokio::io::split(out_stream); - - if config.proxy { - /* - https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt - PROXY TCP4 255.255.255.255 255.255.255.255 65535 65535\r\n - PROXY TCP6 ffff:f...f:ffff ffff:f...f:ffff 65535 65535\r\n - PROXY TCP6 SOURCE_IP DEST_IP SOURCE_PORT DEST_PORT\r\n - */ - // tokio AsyncWrite doesn't have write_fmt so have to go through this buffer for some crazy reason - //write!(out_wr, "PROXY TCP{} {} {} {} {}\r\n", if client_addr.is_ipv4() { '4' } else {'6' }, client_addr.ip(), local_addr.ip(), client_addr.port(), local_addr.port())?; - write!( - &mut in_filter.buf[0..], - "PROXY TCP{} {} {} {} {}\r\n", - if client_addr.client_addr().is_ipv4() { '4' } else { '6' }, - client_addr.client_addr().ip(), - local_addr.ip(), - client_addr.client_addr().port(), - local_addr.port() - )?; - let end_idx = &(&in_filter.buf[0..]).first_index_of(b"\n")? + 1; - trace!("{} '{}'", client_addr.log_from(), to_str(&in_filter.buf[0..end_idx])); - out_wr.write_all(&in_filter.buf[0..end_idx]).await?; - } - trace!("{} '{}'", client_addr.log_from(), to_str(&stream_open)); - out_wr.write_all(&stream_open).await?; - out_wr.flush().await?; + let (mut out_rd, mut out_wr) = open_incoming(config, local_addr, client_addr, &stream_open, is_c2s, &mut in_filter).await?; drop(stream_open); let mut out_buf = [0u8; OUT_BUFFER_SIZE]; @@ -195,6 +170,49 @@ async fn shuffle_rd_wr_filter( Ok(()) } +async fn open_incoming( + config: CloneableConfig, + local_addr: SocketAddr, + client_addr: &mut Context<'_>, + stream_open: &[u8], + is_c2s: bool, + in_filter: &mut StanzaFilter, +) -> Result<(ReadHalf, WriteHalf)> { + let target = if is_c2s { config.c2s_target } else { config.s2s_target }; + client_addr.set_to_addr(target); + client_addr.set_c2s_stream_open(is_c2s, &stream_open); + + let out_stream = tokio::net::TcpStream::connect(target).await?; + let (out_rd, mut out_wr) = tokio::io::split(out_stream); + + if config.proxy { + /* + https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt + PROXY TCP4 255.255.255.255 255.255.255.255 65535 65535\r\n + PROXY TCP6 ffff:f...f:ffff ffff:f...f:ffff 65535 65535\r\n + PROXY TCP6 SOURCE_IP DEST_IP SOURCE_PORT DEST_PORT\r\n + */ + // tokio AsyncWrite doesn't have write_fmt so have to go through this buffer for some crazy reason + //write!(out_wr, "PROXY TCP{} {} {} {} {}\r\n", if client_addr.is_ipv4() { '4' } else {'6' }, client_addr.ip(), local_addr.ip(), client_addr.port(), local_addr.port())?; + write!( + &mut in_filter.buf[0..], + "PROXY TCP{} {} {} {} {}\r\n", + if client_addr.client_addr().is_ipv4() { '4' } else { '6' }, + client_addr.client_addr().ip(), + local_addr.ip(), + client_addr.client_addr().port(), + local_addr.port() + )?; + let end_idx = &(&in_filter.buf[0..]).first_index_of(b"\n")? + 1; + trace!("{} '{}'", client_addr.log_from(), to_str(&in_filter.buf[0..end_idx])); + out_wr.write_all(&in_filter.buf[0..end_idx]).await?; + } + trace!("{} '{}'", client_addr.log_from(), to_str(&stream_open)); + out_wr.write_all(&stream_open).await?; + out_wr.flush().await?; + Ok((out_rd, out_wr)) +} + async fn stream_preamble(mut in_rd: StanzaReader, client_addr: &Context<'_>, mut in_filter: StanzaFilter) -> Result<(Vec, bool, StanzaReader, StanzaFilter)> { let mut stream_open = Vec::new(); while let Ok(Some(buf)) = in_rd.next(&mut in_filter).await { @@ -253,6 +271,13 @@ async fn main() { handles.push(spawn_quic_listener(listener.parse().die("invalid listener address"), config.clone(), quic_config.clone())); } } + #[cfg(feature = "websocket")] + if let Some(ref listeners) = main_config.websocket_listen { + let acceptor = main_config.tls_acceptor().die("invalid cert/key ?"); + for listener in listeners { + handles.push(spawn_websocket_listener(listener.parse().die("invalid listener address"), config.clone(), acceptor.clone())); + } + } #[cfg(feature = "outgoing")] if let Some(ref listeners) = main_config.outgoing_listen { for listener in listeners { diff --git a/src/slicesubsequence.rs b/src/slicesubsequence.rs index e587f95..5642d6a 100644 --- a/src/slicesubsequence.rs +++ b/src/slicesubsequence.rs @@ -29,9 +29,11 @@ impl SliceSubsequence for &[T] { } fn first_index_of(&self, needle: &[T]) -> Result { - for i in 0..self.len() - needle.len() + 1 { - if self[i..i + needle.len()] == needle[..] { - return Ok(i); + if self.len() >= needle.len() { + for i in 0..self.len() - needle.len() + 1 { + if self[i..i + needle.len()] == needle[..] { + return Ok(i); + } } } Err(anyhow!("not found")) diff --git a/src/stanzafilter.rs b/src/stanzafilter.rs index 4fad7ac..f83b868 100644 --- a/src/stanzafilter.rs +++ b/src/stanzafilter.rs @@ -23,6 +23,7 @@ enum StanzaState { pub struct StanzaFilter { buf_size: usize, pub buf: Vec, + end_of_first_tag: usize, cnt: usize, tag_cnt: usize, state: StanzaState, @@ -43,6 +44,7 @@ impl StanzaFilter { StanzaFilter { buf_size, buf: vec![0u8; buf_size], + end_of_first_tag: 0, cnt: 0, tag_cnt: 0, state: OutsideStanza, @@ -96,6 +98,9 @@ impl StanzaFilter { }, InsideTag => match b { b'>' => { + if self.end_of_first_tag == 0 { + self.end_of_first_tag = self.cnt; + } if self.buf[self.cnt - 1] == b'/' { // state can't be InsideTag unless we are on at least the second character, so can't go out of range // self-closing tag @@ -150,6 +155,9 @@ impl StanzaFilter { EndStream => { if b == b'>' { if self.last_equals(b"")? { + if self.end_of_first_tag == 0 { + self.end_of_first_tag = self.cnt; + } return self.stanza_end(); } else { bail!("illegal stanza: {}", to_str(&self.buf[..(self.cnt + 1)])); @@ -205,6 +213,23 @@ impl StanzaReader { } } } + + #[cfg(feature = "websocket")] + pub async fn next_eoft<'a>(&'a mut self, filter: &'a mut StanzaFilter) -> Result> { + use tokio::io::AsyncReadExt; + + loop { + let n = self.0.read(filter.current_buf()).await?; + if n == 0 { + return Ok(None); + } + if let Some(idx) = filter.process_next_byte_idx()? { + let end_of_first_tag = filter.end_of_first_tag; + filter.end_of_first_tag = 0; + return Ok(Some((&filter.buf[0..idx], end_of_first_tag))); + } + } + } } #[cfg(test)] @@ -226,7 +251,6 @@ mod tests { async fn process_next_byte() -> Result<()> { let mut filter = StanzaFilter::new(262_144); - //todo: This is going to be fun. assert_eq!( StanzaReader(Cursor::new( br###" diff --git a/src/tls.rs b/src/tls.rs index fe65f0f..bf8ee2c 100644 --- a/src/tls.rs +++ b/src/tls.rs @@ -201,6 +201,8 @@ async fn handle_tls_connection(mut stream: tokio::net::TcpStream, client_addr: & let stream = acceptor.accept(stream).await?; + // todo: try to peek stream here and handle websocket on these ports too? + let (in_rd, in_wr) = tokio::io::split(stream); shuffle_rd_wr_filter(in_rd, in_wr, config, local_addr, client_addr, in_filter).await diff --git a/src/websocket.rs b/src/websocket.rs new file mode 100644 index 0000000..c49fe56 --- /dev/null +++ b/src/websocket.rs @@ -0,0 +1,212 @@ +use crate::*; +use futures::{SinkExt, StreamExt, TryStreamExt}; + +use tokio_tungstenite::tungstenite::protocol::Message::*; +use tokio_tungstenite::tungstenite::protocol::WebSocketConfig; + +// https://datatracker.ietf.org/doc/html/rfc7395 + +pub fn spawn_websocket_listener(local_addr: SocketAddr, config: CloneableConfig, acceptor: TlsAcceptor) -> JoinHandle> { + tokio::spawn(async move { + let listener = TcpListener::bind(&local_addr).await.die("cannot listen on port/interface"); + loop { + let (stream, client_addr) = listener.accept().await?; + let config = config.clone(); + let acceptor = acceptor.clone(); + tokio::spawn(async move { + let mut client_addr = Context::new("websocket-in", client_addr); + if let Err(e) = handle_websocket_connection(stream, &mut client_addr, local_addr, config, acceptor).await { + error!("{} {}", client_addr.log_from(), e); + } + }); + } + #[allow(unreachable_code)] + Ok(()) + }) +} + +async fn handle_websocket_connection(stream: tokio::net::TcpStream, client_addr: &mut Context<'_>, local_addr: SocketAddr, config: CloneableConfig, acceptor: TlsAcceptor) -> Result<()> { + info!("{} connected", client_addr.log_from()); + + // start TLS + let stream = acceptor.accept(stream).await?; + + // accept the websocket + let stream = tokio_tungstenite::accept_async_with_config( + stream, + Some(WebSocketConfig { + max_send_queue: None, // unlimited + max_frame_size: Some(config.max_stanza_size_bytes), // this is exactly the stanza size + max_message_size: Some(config.max_stanza_size_bytes * 4), // this is the message size, default is 4x frame size, so I guess we'll do the same here + accept_unmasked_frames: true, + }), + ) + .await?; + + let (mut in_wr, mut in_rd) = stream.split(); + + // https://docs.rs/tungstenite/0.14.0/tungstenite/protocol/enum.Message.html + // https://datatracker.ietf.org/doc/html/rfc7395#section-3.2 Data frame messages in the XMPP subprotocol MUST be of the text type and contain UTF-8 encoded data. + let (stanza, is_c2s) = match in_rd.try_next().await? { + // todo: c2s is xmlns="urn:ietf:params:xml:ns:xmpp-framing", let's make up s2s ? xmlns="urn:ietf:params:xml:ns:xmpp-framing-server" sounds good to me + Some(Text(stanza)) => { + let is_c2s = stanza.contains(r#" xmlns="urn:ietf:params:xml:ns:xmpp-framing""#) || stanza.contains(r#" xmlns='urn:ietf:params:xml:ns:xmpp-framing'"#); + (stanza, is_c2s) + } + _ => bail!("expected first websocket frame to be open"), + }; + + let stanza = from_ws(stanza); + let stream_open = stanza.as_bytes(); + + // websocket frame size filters incoming stanza size from client, this is used to split the + // stanzas from the servers up so we can send them across websocket frames + let mut in_filter = StanzaFilter::new(config.max_stanza_size_bytes); + + let (out_rd, mut out_wr) = open_incoming(config, local_addr, client_addr, &stream_open, is_c2s, &mut in_filter).await?; + + let mut out_rd = StanzaReader(out_rd); + + loop { + tokio::select! { + // server to client + Ok(buf) = out_rd.next_eoft(&mut in_filter) => { + match buf { + None => break, + Some((buf, end_of_first_tag)) => { + // ignore this + if buf.starts_with(b" { + match msg { + // actual XMPP stanzas + Text(stanza) => { + let stanza = from_ws(stanza); + trace!("{} '{}'", client_addr.log_from(), stanza); + out_wr.write_all(stanza.as_bytes()).await?; + out_wr.flush().await?; + } + // websocket ping/pong + Ping(msg) => { + in_wr.feed(Pong(msg)).await?; + in_wr.flush().await?; + }, + // handle Close, just break from loop, hopefully client sent before + Close(_) => break, + _ => bail!("invalid websocket message: {}", msg) // Binary or Pong + } + }, + // todo: should we also send pings to the client ourselves on a schedule? StanzaFilter strips out whitespace pings if the server uses them... + } + } + + info!("{} disconnected", client_addr.log_from()); + Ok(()) +} + +pub fn from_ws(stanza: String) -> String { + if stanza.starts_with("", ">"); + } else if stanza.starts_with("".to_string(); + } + stanza +} + +pub fn to_ws_new(buf: &[u8], mut end_of_first_tag: usize, is_c2s: bool) -> Result { + if end_of_first_tag == 0 { + return Ok(String::from_utf8(buf.to_vec())?); + } + if buf.starts_with(b"", "/>")); + } + if buf.starts_with(b""#.to_string()); + } + if buf[end_of_first_tag - 1] == b'/' { + end_of_first_tag -= 1; + } + let first_tag_bytes = &buf[0..end_of_first_tag]; + if first_tag_bytes.first_index_of(b" xmlns='").is_ok() || first_tag_bytes.first_index_of(br#" xmlns=""#).is_ok() { + // already set, do nothing + return Ok(String::from_utf8(buf.to_vec())?); + } + // otherwise add proper xmlns before end of tag + let mut ret = String::with_capacity(buf.len() + 22); + ret.push_str(std::str::from_utf8(&first_tag_bytes)?); + ret.push_str(if is_c2s { " xmlns='jabber:client'" } else { " xmlns='jabber:server'" }); + ret.push_str(std::str::from_utf8(&buf[end_of_first_tag..])?); + Ok(ret) +} + +#[cfg(test)] +mod tests { + use crate::websocket::*; + use std::io::Cursor; + + #[test] + fn test_from_ws() { + assert_eq!( + from_ws(r#""#.to_string()), + r#""#.to_string() + ); + assert_eq!(from_ws(r#""#.to_string()), r#""#.to_string()); + } + + async fn to_vec_eoft<'a, T: tokio::io::AsyncRead + Unpin>(mut stanza_reader: StanzaReader, filter: &'a mut StanzaFilter) -> Result> { + let mut ret = Vec::new(); + while let Some((buf, end_of_first_tag)) = stanza_reader.next_eoft(filter).await? { + ret.push(to_ws_new(buf, end_of_first_tag, true)?); + } + return Ok(ret); + } + + #[tokio::test] + async fn test_to_ws() -> Result<()> { + let mut filter = StanzaFilter::new(262_144); + + assert_eq!( + to_vec_eoft( + StanzaReader(Cursor::new( + br###" + + + + PLAINSCRAM-SHA-1 + test1@test.moparisthe.best/gajim.12S9XM42 + test1@test.moparisthe.best/gajim.12S9XM42 + "###, + )), + &mut filter + ) + .await?, + vec![ + r#""#, + r#""#, + r#""#, + r#"PLAINSCRAM-SHA-1"#, + r#"test1@test.moparisthe.best/gajim.12S9XM42"#, + r#"test1@test.moparisthe.best/gajim.12S9XM42"#, + ] + ); + + Ok(()) + } +} diff --git a/xmpp-proxy.toml b/xmpp-proxy.toml index a12a379..e8ca4b2 100644 --- a/xmpp-proxy.toml +++ b/xmpp-proxy.toml @@ -3,6 +3,8 @@ incoming_listen = [ "0.0.0.0:5222", "0.0.0.0:5269" ] # interfaces to listen for reverse proxy QUIC XMPP connections on, should be open to the internet quic_listen = [ "0.0.0.0:443" ] +# interfaces to listen for reverse proxy TLS WebSocket (wss) XMPP connections on, should be open to the internet +websocket_listen = [ "0.0.0.0:443" ] # interfaces to listen for outgoing proxy TCP XMPP connections on, should be localhost outgoing_listen = [ "127.0.0.1:15270" ]