From c0a8adc3e0d39c4a046b281aec31643dd89cbae5 Mon Sep 17 00:00:00 2001 From: moparisthebest Date: Fri, 25 Mar 2022 01:32:10 -0400 Subject: [PATCH] Add support for proxying outgoing WebSocket connections --- README.md | 2 +- src/in_out.rs | 8 ++++---- src/lib.rs | 24 ++++++++++++++++++++++++ src/outgoing.rs | 10 +++++++--- src/tls.rs | 38 ++++++++------------------------------ src/websocket.rs | 39 ++++++++++++++++++++------------------- xmpp-proxy.toml | 2 +- 7 files changed, 65 insertions(+), 58 deletions(-) diff --git a/README.md b/README.md index 3aeb049..b447d12 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ xmpp-proxy in reverse proxy (incoming) mode will: xmpp-proxy in outgoing mode will: 1. listen on any number of interfaces/ports - 2. accept any plain-text TCP connection from a local XMPP server or client + 2. accept any plain-text TCP or WebSocket connection from a local XMPP server or client 3. look up the required SRV records 4. connect to a real XMPP server across the internet over STARTTLS, Direct TLS, QUIC, or WebSocket 5. fallback to next SRV target or defaults as required to fully connect diff --git a/src/in_out.rs b/src/in_out.rs index 4b700d8..7cde7e4 100644 --- a/src/in_out.rs +++ b/src/in_out.rs @@ -2,21 +2,21 @@ #[cfg(feature = "websocket")] use crate::{from_ws, to_ws_new}; -use crate::{slicesubsequence::SliceSubsequence, trace, StanzaFilter, StanzaRead::*, StanzaReader, StanzaWrite::*}; +use crate::{slicesubsequence::SliceSubsequence, trace, AsyncReadAndWrite, StanzaFilter, StanzaRead::*, StanzaReader, StanzaWrite::*}; use anyhow::{bail, Result}; #[cfg(feature = "websocket")] use futures_util::{ stream::{SplitSink, SplitStream}, SinkExt, TryStreamExt, }; -use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufStream}; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; #[cfg(feature = "websocket")] use tokio_tungstenite::{tungstenite::Message::*, WebSocketStream}; #[cfg(feature = "websocket")] -type WsWr = SplitSink>>, tokio_tungstenite::tungstenite::Message>; +type WsWr = SplitSink>, tokio_tungstenite::tungstenite::Message>; #[cfg(feature = "websocket")] -type WsRd = SplitStream>>>; +type WsRd = SplitStream>>; pub enum StanzaWrite { AsyncWrite(Box), diff --git a/src/lib.rs b/src/lib.rs index ea2a7b2..d9493a8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ pub use stanzafilter::*; mod slicesubsequence; use slicesubsequence::*; +use anyhow::bail; use std::net::SocketAddr; pub use log::{debug, error, info, log_enabled, trace}; @@ -21,6 +22,29 @@ pub fn c2s(is_c2s: bool) -> &'static str { } } +pub async fn first_bytes_match(stream: &tokio::net::TcpStream, p: &mut [u8], matcher: fn(&[u8]) -> bool) -> anyhow::Result { + // sooo... I don't think peek here can be used for > 1 byte without this timer craziness... can it? + let len = p.len(); + // wait up to 10 seconds until len bytes have been read + use std::time::{Duration, Instant}; + let duration = Duration::from_secs(10); + let now = Instant::now(); + loop { + let n = stream.peek(p).await?; + if n == len { + break; // success + } + if n == 0 { + bail!("not enough bytes"); + } + if Instant::now() - now > duration { + bail!("less than {} bytes in 10 seconds, closed connection?", len); + } + } + + Ok(matcher(p)) +} + #[derive(Clone)] pub struct Context<'a> { conn_id: String, diff --git a/src/outgoing.rs b/src/outgoing.rs index c14b65c..d7089cc 100644 --- a/src/outgoing.rs +++ b/src/outgoing.rs @@ -5,10 +5,14 @@ async fn handle_outgoing_connection(stream: tokio::net::TcpStream, client_addr: let mut in_filter = StanzaFilter::new(config.max_stanza_size_bytes); - let (in_rd, in_wr) = tokio::io::split(stream); + let is_ws = first_bytes_match(&stream, &mut in_filter.buf[0..3], |p| p == b"GET").await?; - let mut in_rd = StanzaRead::new(in_rd); - let mut in_wr = StanzaWrite::new(in_wr); + let (mut in_rd, mut in_wr) = if is_ws { + incoming_websocket_connection(Box::new(stream), config.max_stanza_size_bytes).await? + } else { + let (in_rd, in_wr) = tokio::io::split(stream); + (StanzaRead::new(in_rd), StanzaWrite::new(in_wr)) + }; // now read to figure out client vs server let (stream_open, is_c2s) = stream_preamble(&mut in_rd, &mut in_wr, client_addr.log_to(), &mut in_filter).await?; diff --git a/src/tls.rs b/src/tls.rs index 33d2a09..b235340 100644 --- a/src/tls.rs +++ b/src/tls.rs @@ -83,35 +83,13 @@ async fn handle_tls_connection(mut stream: tokio::net::TcpStream, client_addr: & let mut in_filter = StanzaFilter::new(config.max_stanza_size_bytes); - let direct_tls = { - // sooo... I don't think peek here can be used for > 1 byte without this timer - // craziness... can it? this could be switched to only peek 1 byte and assume - // a leading 0x16 is TLS, it would *probably* be ok ? - //let mut p = [0u8; 3]; - let p = &mut in_filter.buf[0..3]; - // wait up to 10 seconds until 3 bytes have been read - use std::time::{Duration, Instant}; - let duration = Duration::from_secs(10); - let now = Instant::now(); - loop { - let n = stream.peek(p).await?; - if n == 3 { - break; // success - } - if n == 0 { - bail!("not enough bytes"); - } - if Instant::now() - now > duration { - bail!("less than 3 bytes in 10 seconds, closed connection?"); - } - } - - /* TLS packet starts with a record "Hello" (0x16), followed by version - * (0x03 0x00-0x03) (RFC6101 A.1) - * This means we reject SSLv2 and lower, which is actually a good thing (RFC6176) - */ - p[0] == 0x16 && p[1] == 0x03 && p[2] <= 0x03 - }; + /* TLS packet starts with a record "Hello" (0x16), followed by version + * (0x03 0x00-0x03) (RFC6101 A.1) + * This means we reject SSLv2 and lower, which is actually a good thing (RFC6176) + * + * could just check the leading 0x16 is TLS, it would *probably* be ok ? + */ + let direct_tls = first_bytes_match(&stream, &mut in_filter.buf[0..3], |p| p[0] == 0x16 && p[1] == 0x03 && p[2] <= 0x03).await?; client_addr.set_proto(if direct_tls { "directtls-in" } else { "starttls-in" }); info!("{} direct_tls sniffed", client_addr.log_from()); @@ -218,7 +196,7 @@ async fn handle_tls_connection(mut stream: tokio::net::TcpStream, client_addr: & }; if websocket { - handle_websocket_connection(stream, config, server_certs, local_addr, client_addr, in_filter).await + handle_websocket_connection(Box::new(stream), config, server_certs, local_addr, client_addr, in_filter).await } else { let (in_rd, in_wr) = tokio::io::split(stream); shuffle_rd_wr_filter(StanzaRead::already_buffered(in_rd), StanzaWrite::new(in_wr), config, server_certs, local_addr, client_addr, in_filter).await diff --git a/src/websocket.rs b/src/websocket.rs index 1d5a4cf..fafea4a 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -15,8 +15,22 @@ fn ws_cfg(max_stanza_size_bytes: usize) -> Option { }) } +pub trait AsyncReadAndWrite: tokio::io::AsyncRead + tokio::io::AsyncWrite {} + +impl AsyncReadAndWrite for T {} + +pub async fn incoming_websocket_connection(stream: Box, max_stanza_size_bytes: usize) -> Result<(StanzaRead, StanzaWrite)> { + // accept the websocket + // todo: check SEC_WEBSOCKET_PROTOCOL or ORIGIN ? + let stream = tokio_tungstenite::accept_async_with_config(stream, ws_cfg(max_stanza_size_bytes)).await?; + + let (in_wr, in_rd) = stream.split(); + + Ok((StanzaRead::WebSocketRead(in_rd), StanzaWrite::WebSocketClientWrite(in_wr))) +} + pub async fn handle_websocket_connection( - stream: BufStream>, + stream: Box, config: CloneableConfig, server_certs: ServerCerts, local_addr: SocketAddr, @@ -26,22 +40,9 @@ pub async fn handle_websocket_connection( client_addr.set_proto("websocket-in"); info!("{} connected", client_addr.log_from()); - // accept the websocket - // todo: check SEC_WEBSOCKET_PROTOCOL or ORIGIN ? - let stream = tokio_tungstenite::accept_async_with_config(stream, ws_cfg(config.max_stanza_size_bytes)).await?; + let (in_rd, in_wr) = incoming_websocket_connection(stream, config.max_stanza_size_bytes).await?; - let (in_wr, in_rd) = stream.split(); - - shuffle_rd_wr_filter( - StanzaRead::WebSocketRead(in_rd), - StanzaWrite::WebSocketClientWrite(in_wr), - config, - server_certs, - local_addr, - client_addr, - in_filter, - ) - .await + shuffle_rd_wr_filter(in_rd, in_wr, config, server_certs, local_addr, client_addr, in_filter).await } pub fn from_ws(stanza: String) -> String { @@ -97,7 +98,6 @@ pub fn to_ws_new(buf: &[u8], mut end_of_first_tag: usize, is_c2s: bool) -> Resul use rustls::ServerName; use std::convert::TryFrom; -use tokio::io::BufStream; use tokio_tungstenite::tungstenite::client::IntoClientRequest; use tokio_tungstenite::tungstenite::http::header::{ORIGIN, SEC_WEBSOCKET_PROTOCOL}; @@ -113,9 +113,10 @@ pub async fn websocket_connect(target: SocketAddr, server_name: &str, url: &Uri, let stream = tokio::net::TcpStream::connect(target).await?; let stream = config.connector.connect(dnsname, stream).await?; - let stream: tokio_rustls::TlsStream = stream.into(); + //let stream: tokio_rustls::TlsStream = stream.into(); // todo: tokio_tungstenite seems to have a bug, if the write buffer is non-zero, it'll hang forever, even though we always flush, investigate - let stream = BufStream::with_capacity(crate::IN_BUFFER_SIZE, 0, stream); + //let stream = BufStream::with_capacity(crate::IN_BUFFER_SIZE, 0, stream); + let stream: Box = Box::new(stream); let (stream, _) = tokio_tungstenite::client_async_with_config(request, stream, ws_cfg(config.max_stanza_size_bytes)).await?; diff --git a/xmpp-proxy.toml b/xmpp-proxy.toml index d9130c1..3e3f6e0 100644 --- a/xmpp-proxy.toml +++ b/xmpp-proxy.toml @@ -3,7 +3,7 @@ incoming_listen = [ "0.0.0.0:5222", "0.0.0.0:5269", "0.0.0.0:443" ] # interfaces to listen for reverse proxy QUIC XMPP connections on, should be open to the internet quic_listen = [ "0.0.0.0:443" ] -# interfaces to listen for outgoing proxy TCP XMPP connections on, should be localhost +# interfaces to listen for outgoing proxy TCP or WebSocket XMPP connections on, should be localhost outgoing_listen = [ "127.0.0.1:15270" ] # these ports shouldn't do any TLS, but should assume any connection from xmpp-proxy is secure