diff --git a/README.md b/README.md index 8c6b480..5d93ad3 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ xmpp-proxy in outgoing mode will: 1. listen on any number of interfaces/ports 2. accept any plain-text TCP connection from a local XMPP server or client 3. look up the required SRV records - 4. connect to a real XMPP server across the internet over STARTTLS, Direct TLS, or QUIC + 4. connect to a real XMPP server across the internet over STARTTLS, Direct TLS, QUIC, or WebSocket 5. fallback to next SRV target or defaults as required to fully connect 6. perform all the proper required certificate validation logic 7. limit incoming stanza sizes as configured @@ -127,7 +127,7 @@ xmpp-proxy has 5 compile-time features: 1. `incoming` - enables `incoming_listen` config option for reverse proxy STARTTLS/TLS 2. `outgoing` - enables `outgoing_listen` config option for outgoing proxy STARTTLS/TLS 3. `quic` - enables `quic_listen` config option for reverse proxy QUIC, and QUIC support for `outgoing` if it is enabled - 4. `websocket` - enables `websocket_listen` config option for reverse proxy WebSocket + 4. `websocket` - enables reverse proxy WebSocket on `incoming_listen`, and WebSocket support for `outgoing` if it is enabled 5. `logging` - enables configurable logging So to build only supporting reverse proxy STARTTLS/TLS, no QUIC, run: `cargo build --release --no-default-features --features incoming` @@ -140,5 +140,4 @@ Thanks [rxml](https://github.com/horazont/rxml) for afl-fuzz seeds #### todo 1. sasl external for s2s, initiating and receiving - 2. websocket outgoing - 3. XEP for XMPP-over-QUIC and XMPP-S2S-over-WebSocket + 2. XEP for XMPP-over-QUIC and XMPP-S2S-over-WebSocket diff --git a/integration/11-c2s-websocket/xmpp-proxy1.toml b/integration/11-c2s-websocket/xmpp-proxy1.toml index 75521e8..e0cb4de 100644 --- a/integration/11-c2s-websocket/xmpp-proxy1.toml +++ b/integration/11-c2s-websocket/xmpp-proxy1.toml @@ -1,10 +1,8 @@ # interfaces to listen for reverse proxy STARTTLS/Direct TLS XMPP connections on, should be open to the internet -incoming_listen = [ ] +incoming_listen = [ "0.0.0.0:5281" ] # interfaces to listen for reverse proxy QUIC XMPP connections on, should be open to the internet quic_listen = [ ] -# interfaces to listen for reverse proxy TLS WebSocket (wss) XMPP connections on, should be open to the internet -websocket_listen = [ "0.0.0.0:5281" ] # interfaces to listen for outgoing proxy TCP XMPP connections on, should be localhost outgoing_listen = [ ] diff --git a/integration/18-s2s-websocket/xmpp-proxy1.toml b/integration/18-s2s-websocket/xmpp-proxy1.toml index 222ec01..37bf24f 100644 --- a/integration/18-s2s-websocket/xmpp-proxy1.toml +++ b/integration/18-s2s-websocket/xmpp-proxy1.toml @@ -1,10 +1,8 @@ # interfaces to listen for reverse proxy STARTTLS/Direct TLS XMPP connections on, should be open to the internet -incoming_listen = [ "0.0.0.0:5222" ] +incoming_listen = [ "0.0.0.0:5222", "0.0.0.0:5281" ] # interfaces to listen for reverse proxy QUIC XMPP connections on, should be open to the internet quic_listen = [ ] -# interfaces to listen for reverse proxy TLS WebSocket (wss) XMPP connections on, should be open to the internet -websocket_listen = [ "0.0.0.0:5281" ] # interfaces to listen for outgoing proxy TCP XMPP connections on, should be localhost outgoing_listen = [ "0.0.0.0:15270" ] diff --git a/integration/18-s2s-websocket/xmpp-proxy2.toml b/integration/18-s2s-websocket/xmpp-proxy2.toml index 54440e4..122c195 100644 --- a/integration/18-s2s-websocket/xmpp-proxy2.toml +++ b/integration/18-s2s-websocket/xmpp-proxy2.toml @@ -1,10 +1,8 @@ # interfaces to listen for reverse proxy STARTTLS/Direct TLS XMPP connections on, should be open to the internet -incoming_listen = [ "0.0.0.0:5222" ] +incoming_listen = [ "0.0.0.0:5222", "0.0.0.0:5281" ] # interfaces to listen for reverse proxy QUIC XMPP connections on, should be open to the internet quic_listen = [ ] -# interfaces to listen for reverse proxy TLS WebSocket (wss) XMPP connections on, should be open to the internet -websocket_listen = [ "0.0.0.0:5281" ] # interfaces to listen for outgoing proxy TCP XMPP connections on, should be localhost outgoing_listen = [ "0.0.0.0:15270" ] diff --git a/src/in_out.rs b/src/in_out.rs index 370a081..4b700d8 100644 --- a/src/in_out.rs +++ b/src/in_out.rs @@ -9,14 +9,14 @@ use futures_util::{ stream::{SplitSink, SplitStream}, SinkExt, TryStreamExt, }; -use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufStream}; #[cfg(feature = "websocket")] use tokio_tungstenite::{tungstenite::Message::*, WebSocketStream}; #[cfg(feature = "websocket")] -type WsWr = SplitSink>, tokio_tungstenite::tungstenite::Message>; +type WsWr = SplitSink>>, tokio_tungstenite::tungstenite::Message>; #[cfg(feature = "websocket")] -type WsRd = SplitStream>>; +type WsRd = SplitStream>>>; pub enum StanzaWrite { AsyncWrite(Box), @@ -25,14 +25,17 @@ pub enum StanzaWrite { } pub enum StanzaRead { - AsyncRead(StanzaReader>>), + AsyncRead(StanzaReader>), #[cfg(feature = "websocket")] WebSocketRead(WsRd), } impl StanzaWrite { - pub fn new(wr: Box) -> Self { - AsyncWrite(wr) + #[inline(always)] + pub fn new(wr: T) -> Self { + AsyncWrite(Box::new(wr)) + // todo: investigate buffering this, but don't double buffer + //AsyncWrite(Box::new(tokio::io::BufWriter::with_capacity(8192, wr))) } pub async fn write_all<'a>(&'a mut self, is_c2s: bool, buf: &'a [u8], end_of_first_tag: usize, client_addr: &'a str) -> Result<()> { @@ -69,9 +72,16 @@ impl StanzaWrite { } impl StanzaRead { - pub fn new(rd: Box) -> Self { + #[inline(always)] + pub fn new(rd: T) -> Self { // we naively read 1 byte at a time, which buffering significantly speeds up - AsyncRead(StanzaReader(BufReader::with_capacity(crate::IN_BUFFER_SIZE, rd))) + AsyncRead(StanzaReader(Box::new(BufReader::with_capacity(crate::IN_BUFFER_SIZE, rd)))) + } + + #[inline(always)] + pub fn already_buffered(rd: T) -> Self { + // we naively read 1 byte at a time, which buffering significantly speeds up + AsyncRead(StanzaReader(Box::new(rd))) } pub async fn next<'a>(&'a mut self, filter: &'a mut StanzaFilter, client_addr: &'a str, wrt: &mut StanzaWrite) -> Result> { diff --git a/src/main.rs b/src/main.rs index bdd61e5..e9cf9d8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -93,7 +93,6 @@ struct Config { tls_cert: String, incoming_listen: Option>, quic_listen: Option>, - websocket_listen: Option>, outgoing_listen: Option>, max_stanza_size_bytes: usize, s2s_target: SocketAddr, @@ -177,8 +176,8 @@ async fn shuffle_rd_wr_filter( shuffle_rd_wr_filter_only( in_rd, in_wr, - StanzaRead::new(Box::new(out_rd)), - StanzaWrite::new(Box::new(out_wr)), + StanzaRead::new(out_rd), + StanzaWrite::new(out_wr), is_c2s, config.max_stanza_size_bytes, client_addr, @@ -326,13 +325,6 @@ async fn main() { handles.push(spawn_quic_listener(listener.parse().die("invalid listener address"), config.clone(), quic_config.clone())); } } - #[cfg(feature = "websocket")] - if let Some(ref listeners) = main_config.websocket_listen { - let acceptor = main_config.tls_acceptor().die("invalid cert/key ?"); - for listener in listeners { - handles.push(spawn_websocket_listener(listener.parse().die("invalid listener address"), config.clone(), acceptor.clone())); - } - } #[cfg(feature = "outgoing")] if let Some(ref listeners) = main_config.outgoing_listen { for listener in listeners { diff --git a/src/outgoing.rs b/src/outgoing.rs index e30e9d1..d8da559 100644 --- a/src/outgoing.rs +++ b/src/outgoing.rs @@ -7,8 +7,8 @@ async fn handle_outgoing_connection(stream: tokio::net::TcpStream, client_addr: let (in_rd, in_wr) = tokio::io::split(stream); - let mut in_rd = StanzaRead::new(Box::new(in_rd)); - let mut in_wr = StanzaWrite::new(Box::new(in_wr)); + let mut in_rd = StanzaRead::new(in_rd); + let mut in_wr = StanzaWrite::new(in_wr); // now read to figure out client vs server let (stream_open, is_c2s) = stream_preamble(&mut in_rd, &mut in_wr, client_addr.log_to(), &mut in_filter).await?; diff --git a/src/quic.rs b/src/quic.rs index 2f3e617..8bf5f38 100644 --- a/src/quic.rs +++ b/src/quic.rs @@ -20,7 +20,7 @@ pub async fn quic_connect(target: SocketAddr, server_name: &str, is_c2s: bool) - trace!("quic connected: addr={}", connection.remote_address()); let (wrt, rd) = connection.open_bi().await?; - Ok((StanzaWrite::AsyncWrite(Box::new(wrt)), StanzaRead::new(Box::new(rd)))) + Ok((StanzaWrite::new(wrt), StanzaRead::new(rd))) } impl Config { @@ -80,7 +80,7 @@ pub fn spawn_quic_listener(local_addr: SocketAddr, config: CloneableConfig, serv let mut client_addr = client_addr.clone(); info!("{} connected new stream", client_addr.log_from()); tokio::spawn(async move { - if let Err(e) = shuffle_rd_wr(StanzaRead::new(Box::new(rd)), StanzaWrite::new(Box::new(wrt)), config, local_addr, &mut client_addr).await { + if let Err(e) = shuffle_rd_wr(StanzaRead::new(rd), StanzaWrite::new(wrt), config, local_addr, &mut client_addr).await { error!("{} {}", client_addr.log_from(), e); } }); diff --git a/src/tls.rs b/src/tls.rs index 9ca2ca0..00c88d8 100644 --- a/src/tls.rs +++ b/src/tls.rs @@ -1,5 +1,6 @@ use crate::*; use std::convert::TryFrom; +use tokio::io::{AsyncBufReadExt, BufStream}; #[cfg(any(feature = "incoming", feature = "outgoing"))] use tokio_rustls::{ @@ -37,7 +38,7 @@ pub async fn tls_connect(target: SocketAddr, server_name: &str, is_c2s: bool) -> SERVER_TLS_CONFIG.connect(dnsname, stream).await? }; let (rd, wrt) = tokio::io::split(stream); - Ok((StanzaWrite::AsyncWrite(Box::new(wrt)), StanzaRead::new(Box::new(rd)))) + Ok((StanzaWrite::new(wrt), StanzaRead::new(rd))) } #[cfg(feature = "outgoing")] @@ -85,7 +86,7 @@ pub async fn starttls_connect(target: SocketAddr, server_name: &str, is_c2s: boo SERVER_TLS_CONFIG.connect(dnsname, stream).await? }; let (rd, wrt) = tokio::io::split(stream); - Ok((StanzaWrite::AsyncWrite(Box::new(wrt)), StanzaRead::new(Box::new(rd)))) + Ok((StanzaWrite::new(wrt), StanzaRead::new(rd))) } #[cfg(feature = "incoming")] @@ -203,9 +204,43 @@ async fn handle_tls_connection(mut stream: tokio::net::TcpStream, client_addr: & let stream = acceptor.accept(stream).await?; - // todo: try to peek stream here and handle websocket on these ports too? + #[cfg(not(feature = "websocket"))] + { + let (in_rd, in_wr) = tokio::io::split(stream); + shuffle_rd_wr_filter(StanzaRead::new(in_rd), StanzaWrite::new(in_wr), config, local_addr, client_addr, in_filter).await + } - let (in_rd, in_wr) = tokio::io::split(stream); + #[cfg(feature = "websocket")] + { + let stream: tokio_rustls::TlsStream = stream.into(); + let mut stream = BufStream::with_capacity(crate::IN_BUFFER_SIZE, 0, stream); + let websocket = { + // wait up to 10 seconds until 3 bytes have been read + use std::time::{Duration, Instant}; + let duration = Duration::from_secs(10); + let now = Instant::now(); + let mut buf = stream.fill_buf().await?; + loop { + if buf.len() >= 3 { + break; // success + } + if buf.is_empty() { + bail!("not enough bytes"); + } + if Instant::now() - now > duration { + bail!("less than 3 bytes in 10 seconds, closed connection?"); + } + buf = stream.fill_buf().await?; + } - shuffle_rd_wr_filter(StanzaRead::new(Box::new(in_rd)), StanzaWrite::new(Box::new(in_wr)), config, local_addr, client_addr, in_filter).await + buf[..3] == b"GET"[..] + }; + + if websocket { + handle_websocket_connection(stream, client_addr, local_addr, config).await + } else { + let (in_rd, in_wr) = tokio::io::split(stream); + shuffle_rd_wr_filter(StanzaRead::already_buffered(in_rd), StanzaWrite::new(in_wr), config, local_addr, client_addr, in_filter).await + } + } } diff --git a/src/websocket.rs b/src/websocket.rs index 643380e..c486b5d 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -6,33 +6,14 @@ use tokio_tungstenite::tungstenite::protocol::WebSocketConfig; // https://datatracker.ietf.org/doc/html/rfc7395 -pub fn spawn_websocket_listener(local_addr: SocketAddr, config: CloneableConfig, acceptor: TlsAcceptor) -> JoinHandle> { - tokio::spawn(async move { - let listener = TcpListener::bind(&local_addr).await.die("cannot listen on port/interface"); - loop { - let (stream, client_addr) = listener.accept().await?; - let config = config.clone(); - let acceptor = acceptor.clone(); - tokio::spawn(async move { - let mut client_addr = Context::new("websocket-in", client_addr); - if let Err(e) = handle_websocket_connection(stream, &mut client_addr, local_addr, config, acceptor).await { - error!("{} {}", client_addr.log_from(), e); - } - }); - } - #[allow(unreachable_code)] - Ok(()) - }) -} - -async fn handle_websocket_connection(stream: tokio::net::TcpStream, client_addr: &mut Context<'_>, local_addr: SocketAddr, config: CloneableConfig, acceptor: TlsAcceptor) -> Result<()> { +pub async fn handle_websocket_connection( + stream: BufStream>, + client_addr: &mut Context<'_>, + local_addr: SocketAddr, + config: CloneableConfig, +) -> Result<()> { info!("{} connected", client_addr.log_from()); - // start TLS - let stream = acceptor.accept(stream).await?; - - let stream: tokio_rustls::TlsStream = stream.into(); - // accept the websocket // todo: check SEC_WEBSOCKET_PROTOCOL or ORIGIN ? let stream = tokio_tungstenite::accept_async_with_config( @@ -106,6 +87,7 @@ pub fn to_ws_new(buf: &[u8], mut end_of_first_tag: usize, is_c2s: bool) -> Resul use rustls::ServerName; use std::convert::TryFrom; +use tokio::io::BufStream; use tokio_rustls::TlsConnector; use tokio_tungstenite::tungstenite::client::IntoClientRequest; @@ -127,6 +109,8 @@ pub async fn websocket_connect(target: SocketAddr, server_name: &str, url: &Uri, let stream = connector.connect(dnsname, stream).await?; let stream: tokio_rustls::TlsStream = stream.into(); + // todo: tokio_tungstenite seems to have a bug, if the write buffer is non-zero, it'll hang forever, even though we always flush, investigate + let stream = BufStream::with_capacity(crate::IN_BUFFER_SIZE, 0, stream); let (stream, _) = tokio_tungstenite::client_async_with_config(request, stream, None).await?; diff --git a/xmpp-proxy.toml b/xmpp-proxy.toml index e8ca4b2..d9130c1 100644 --- a/xmpp-proxy.toml +++ b/xmpp-proxy.toml @@ -1,10 +1,8 @@ -# interfaces to listen for reverse proxy STARTTLS/Direct TLS XMPP connections on, should be open to the internet -incoming_listen = [ "0.0.0.0:5222", "0.0.0.0:5269" ] +# interfaces to listen for reverse proxy STARTTLS/Direct TLS/TLS WebSocket (wss) XMPP connections on, should be open to the internet +incoming_listen = [ "0.0.0.0:5222", "0.0.0.0:5269", "0.0.0.0:443" ] # interfaces to listen for reverse proxy QUIC XMPP connections on, should be open to the internet quic_listen = [ "0.0.0.0:443" ] -# interfaces to listen for reverse proxy TLS WebSocket (wss) XMPP connections on, should be open to the internet -websocket_listen = [ "0.0.0.0:443" ] # interfaces to listen for outgoing proxy TCP XMPP connections on, should be localhost outgoing_listen = [ "127.0.0.1:15270" ]