Multiplex WebSocket on the same sockets as STARTTLS and Direct TLS

This commit is contained in:
Travis Burtrum 2022-02-11 00:55:36 -05:00
parent 14baf94efb
commit d5dca49008
11 changed files with 81 additions and 69 deletions

View File

@ -20,7 +20,7 @@ xmpp-proxy in outgoing mode will:
1. listen on any number of interfaces/ports 1. listen on any number of interfaces/ports
2. accept any plain-text TCP connection from a local XMPP server or client 2. accept any plain-text TCP connection from a local XMPP server or client
3. look up the required SRV records 3. look up the required SRV records
4. connect to a real XMPP server across the internet over STARTTLS, Direct TLS, or QUIC 4. connect to a real XMPP server across the internet over STARTTLS, Direct TLS, QUIC, or WebSocket
5. fallback to next SRV target or defaults as required to fully connect 5. fallback to next SRV target or defaults as required to fully connect
6. perform all the proper required certificate validation logic 6. perform all the proper required certificate validation logic
7. limit incoming stanza sizes as configured 7. limit incoming stanza sizes as configured
@ -127,7 +127,7 @@ xmpp-proxy has 5 compile-time features:
1. `incoming` - enables `incoming_listen` config option for reverse proxy STARTTLS/TLS 1. `incoming` - enables `incoming_listen` config option for reverse proxy STARTTLS/TLS
2. `outgoing` - enables `outgoing_listen` config option for outgoing proxy STARTTLS/TLS 2. `outgoing` - enables `outgoing_listen` config option for outgoing proxy STARTTLS/TLS
3. `quic` - enables `quic_listen` config option for reverse proxy QUIC, and QUIC support for `outgoing` if it is enabled 3. `quic` - enables `quic_listen` config option for reverse proxy QUIC, and QUIC support for `outgoing` if it is enabled
4. `websocket` - enables `websocket_listen` config option for reverse proxy WebSocket 4. `websocket` - enables reverse proxy WebSocket on `incoming_listen`, and WebSocket support for `outgoing` if it is enabled
5. `logging` - enables configurable logging 5. `logging` - enables configurable logging
So to build only supporting reverse proxy STARTTLS/TLS, no QUIC, run: `cargo build --release --no-default-features --features incoming` So to build only supporting reverse proxy STARTTLS/TLS, no QUIC, run: `cargo build --release --no-default-features --features incoming`
@ -140,5 +140,4 @@ Thanks [rxml](https://github.com/horazont/rxml) for afl-fuzz seeds
#### todo #### todo
1. sasl external for s2s, initiating and receiving 1. sasl external for s2s, initiating and receiving
2. websocket outgoing 2. XEP for XMPP-over-QUIC and XMPP-S2S-over-WebSocket
3. XEP for XMPP-over-QUIC and XMPP-S2S-over-WebSocket

View File

@ -1,10 +1,8 @@
# interfaces to listen for reverse proxy STARTTLS/Direct TLS XMPP connections on, should be open to the internet # interfaces to listen for reverse proxy STARTTLS/Direct TLS XMPP connections on, should be open to the internet
incoming_listen = [ ] incoming_listen = [ "0.0.0.0:5281" ]
# interfaces to listen for reverse proxy QUIC XMPP connections on, should be open to the internet # interfaces to listen for reverse proxy QUIC XMPP connections on, should be open to the internet
quic_listen = [ ] quic_listen = [ ]
# interfaces to listen for reverse proxy TLS WebSocket (wss) XMPP connections on, should be open to the internet
websocket_listen = [ "0.0.0.0:5281" ]
# interfaces to listen for outgoing proxy TCP XMPP connections on, should be localhost # interfaces to listen for outgoing proxy TCP XMPP connections on, should be localhost
outgoing_listen = [ ] outgoing_listen = [ ]

View File

@ -1,10 +1,8 @@
# interfaces to listen for reverse proxy STARTTLS/Direct TLS XMPP connections on, should be open to the internet # interfaces to listen for reverse proxy STARTTLS/Direct TLS XMPP connections on, should be open to the internet
incoming_listen = [ "0.0.0.0:5222" ] incoming_listen = [ "0.0.0.0:5222", "0.0.0.0:5281" ]
# interfaces to listen for reverse proxy QUIC XMPP connections on, should be open to the internet # interfaces to listen for reverse proxy QUIC XMPP connections on, should be open to the internet
quic_listen = [ ] quic_listen = [ ]
# interfaces to listen for reverse proxy TLS WebSocket (wss) XMPP connections on, should be open to the internet
websocket_listen = [ "0.0.0.0:5281" ]
# interfaces to listen for outgoing proxy TCP XMPP connections on, should be localhost # interfaces to listen for outgoing proxy TCP XMPP connections on, should be localhost
outgoing_listen = [ "0.0.0.0:15270" ] outgoing_listen = [ "0.0.0.0:15270" ]

View File

@ -1,10 +1,8 @@
# interfaces to listen for reverse proxy STARTTLS/Direct TLS XMPP connections on, should be open to the internet # interfaces to listen for reverse proxy STARTTLS/Direct TLS XMPP connections on, should be open to the internet
incoming_listen = [ "0.0.0.0:5222" ] incoming_listen = [ "0.0.0.0:5222", "0.0.0.0:5281" ]
# interfaces to listen for reverse proxy QUIC XMPP connections on, should be open to the internet # interfaces to listen for reverse proxy QUIC XMPP connections on, should be open to the internet
quic_listen = [ ] quic_listen = [ ]
# interfaces to listen for reverse proxy TLS WebSocket (wss) XMPP connections on, should be open to the internet
websocket_listen = [ "0.0.0.0:5281" ]
# interfaces to listen for outgoing proxy TCP XMPP connections on, should be localhost # interfaces to listen for outgoing proxy TCP XMPP connections on, should be localhost
outgoing_listen = [ "0.0.0.0:15270" ] outgoing_listen = [ "0.0.0.0:15270" ]

View File

@ -9,14 +9,14 @@ use futures_util::{
stream::{SplitSink, SplitStream}, stream::{SplitSink, SplitStream},
SinkExt, TryStreamExt, SinkExt, TryStreamExt,
}; };
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufStream};
#[cfg(feature = "websocket")] #[cfg(feature = "websocket")]
use tokio_tungstenite::{tungstenite::Message::*, WebSocketStream}; use tokio_tungstenite::{tungstenite::Message::*, WebSocketStream};
#[cfg(feature = "websocket")] #[cfg(feature = "websocket")]
type WsWr = SplitSink<WebSocketStream<tokio_rustls::TlsStream<tokio::net::TcpStream>>, tokio_tungstenite::tungstenite::Message>; type WsWr = SplitSink<WebSocketStream<BufStream<tokio_rustls::TlsStream<tokio::net::TcpStream>>>, tokio_tungstenite::tungstenite::Message>;
#[cfg(feature = "websocket")] #[cfg(feature = "websocket")]
type WsRd = SplitStream<WebSocketStream<tokio_rustls::TlsStream<tokio::net::TcpStream>>>; type WsRd = SplitStream<WebSocketStream<BufStream<tokio_rustls::TlsStream<tokio::net::TcpStream>>>>;
pub enum StanzaWrite { pub enum StanzaWrite {
AsyncWrite(Box<dyn AsyncWrite + Unpin + Send>), AsyncWrite(Box<dyn AsyncWrite + Unpin + Send>),
@ -25,14 +25,17 @@ pub enum StanzaWrite {
} }
pub enum StanzaRead { pub enum StanzaRead {
AsyncRead(StanzaReader<BufReader<Box<dyn AsyncRead + Unpin + Send>>>), AsyncRead(StanzaReader<Box<dyn AsyncRead + Unpin + Send>>),
#[cfg(feature = "websocket")] #[cfg(feature = "websocket")]
WebSocketRead(WsRd), WebSocketRead(WsRd),
} }
impl StanzaWrite { impl StanzaWrite {
pub fn new(wr: Box<dyn AsyncWrite + Unpin + Send>) -> Self { #[inline(always)]
AsyncWrite(wr) pub fn new<T: 'static + AsyncWrite + Unpin + Send>(wr: T) -> Self {
AsyncWrite(Box::new(wr))
// todo: investigate buffering this, but don't double buffer
//AsyncWrite(Box::new(tokio::io::BufWriter::with_capacity(8192, wr)))
} }
pub async fn write_all<'a>(&'a mut self, is_c2s: bool, buf: &'a [u8], end_of_first_tag: usize, client_addr: &'a str) -> Result<()> { pub async fn write_all<'a>(&'a mut self, is_c2s: bool, buf: &'a [u8], end_of_first_tag: usize, client_addr: &'a str) -> Result<()> {
@ -69,9 +72,16 @@ impl StanzaWrite {
} }
impl StanzaRead { impl StanzaRead {
pub fn new(rd: Box<dyn AsyncRead + Unpin + Send>) -> Self { #[inline(always)]
pub fn new<T: 'static + AsyncRead + Unpin + Send>(rd: T) -> Self {
// we naively read 1 byte at a time, which buffering significantly speeds up // we naively read 1 byte at a time, which buffering significantly speeds up
AsyncRead(StanzaReader(BufReader::with_capacity(crate::IN_BUFFER_SIZE, rd))) AsyncRead(StanzaReader(Box::new(BufReader::with_capacity(crate::IN_BUFFER_SIZE, rd))))
}
#[inline(always)]
pub fn already_buffered<T: 'static + AsyncRead + Unpin + Send>(rd: T) -> Self {
// we naively read 1 byte at a time, which buffering significantly speeds up
AsyncRead(StanzaReader(Box::new(rd)))
} }
pub async fn next<'a>(&'a mut self, filter: &'a mut StanzaFilter, client_addr: &'a str, wrt: &mut StanzaWrite) -> Result<Option<(&'a [u8], usize)>> { pub async fn next<'a>(&'a mut self, filter: &'a mut StanzaFilter, client_addr: &'a str, wrt: &mut StanzaWrite) -> Result<Option<(&'a [u8], usize)>> {

View File

@ -93,7 +93,6 @@ struct Config {
tls_cert: String, tls_cert: String,
incoming_listen: Option<Vec<String>>, incoming_listen: Option<Vec<String>>,
quic_listen: Option<Vec<String>>, quic_listen: Option<Vec<String>>,
websocket_listen: Option<Vec<String>>,
outgoing_listen: Option<Vec<String>>, outgoing_listen: Option<Vec<String>>,
max_stanza_size_bytes: usize, max_stanza_size_bytes: usize,
s2s_target: SocketAddr, s2s_target: SocketAddr,
@ -177,8 +176,8 @@ async fn shuffle_rd_wr_filter(
shuffle_rd_wr_filter_only( shuffle_rd_wr_filter_only(
in_rd, in_rd,
in_wr, in_wr,
StanzaRead::new(Box::new(out_rd)), StanzaRead::new(out_rd),
StanzaWrite::new(Box::new(out_wr)), StanzaWrite::new(out_wr),
is_c2s, is_c2s,
config.max_stanza_size_bytes, config.max_stanza_size_bytes,
client_addr, client_addr,
@ -326,13 +325,6 @@ async fn main() {
handles.push(spawn_quic_listener(listener.parse().die("invalid listener address"), config.clone(), quic_config.clone())); handles.push(spawn_quic_listener(listener.parse().die("invalid listener address"), config.clone(), quic_config.clone()));
} }
} }
#[cfg(feature = "websocket")]
if let Some(ref listeners) = main_config.websocket_listen {
let acceptor = main_config.tls_acceptor().die("invalid cert/key ?");
for listener in listeners {
handles.push(spawn_websocket_listener(listener.parse().die("invalid listener address"), config.clone(), acceptor.clone()));
}
}
#[cfg(feature = "outgoing")] #[cfg(feature = "outgoing")]
if let Some(ref listeners) = main_config.outgoing_listen { if let Some(ref listeners) = main_config.outgoing_listen {
for listener in listeners { for listener in listeners {

View File

@ -7,8 +7,8 @@ async fn handle_outgoing_connection(stream: tokio::net::TcpStream, client_addr:
let (in_rd, in_wr) = tokio::io::split(stream); let (in_rd, in_wr) = tokio::io::split(stream);
let mut in_rd = StanzaRead::new(Box::new(in_rd)); let mut in_rd = StanzaRead::new(in_rd);
let mut in_wr = StanzaWrite::new(Box::new(in_wr)); let mut in_wr = StanzaWrite::new(in_wr);
// now read to figure out client vs server // now read to figure out client vs server
let (stream_open, is_c2s) = stream_preamble(&mut in_rd, &mut in_wr, client_addr.log_to(), &mut in_filter).await?; let (stream_open, is_c2s) = stream_preamble(&mut in_rd, &mut in_wr, client_addr.log_to(), &mut in_filter).await?;

View File

@ -20,7 +20,7 @@ pub async fn quic_connect(target: SocketAddr, server_name: &str, is_c2s: bool) -
trace!("quic connected: addr={}", connection.remote_address()); trace!("quic connected: addr={}", connection.remote_address());
let (wrt, rd) = connection.open_bi().await?; let (wrt, rd) = connection.open_bi().await?;
Ok((StanzaWrite::AsyncWrite(Box::new(wrt)), StanzaRead::new(Box::new(rd)))) Ok((StanzaWrite::new(wrt), StanzaRead::new(rd)))
} }
impl Config { impl Config {
@ -80,7 +80,7 @@ pub fn spawn_quic_listener(local_addr: SocketAddr, config: CloneableConfig, serv
let mut client_addr = client_addr.clone(); let mut client_addr = client_addr.clone();
info!("{} connected new stream", client_addr.log_from()); info!("{} connected new stream", client_addr.log_from());
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = shuffle_rd_wr(StanzaRead::new(Box::new(rd)), StanzaWrite::new(Box::new(wrt)), config, local_addr, &mut client_addr).await { if let Err(e) = shuffle_rd_wr(StanzaRead::new(rd), StanzaWrite::new(wrt), config, local_addr, &mut client_addr).await {
error!("{} {}", client_addr.log_from(), e); error!("{} {}", client_addr.log_from(), e);
} }
}); });

View File

@ -1,5 +1,6 @@
use crate::*; use crate::*;
use std::convert::TryFrom; use std::convert::TryFrom;
use tokio::io::{AsyncBufReadExt, BufStream};
#[cfg(any(feature = "incoming", feature = "outgoing"))] #[cfg(any(feature = "incoming", feature = "outgoing"))]
use tokio_rustls::{ use tokio_rustls::{
@ -37,7 +38,7 @@ pub async fn tls_connect(target: SocketAddr, server_name: &str, is_c2s: bool) ->
SERVER_TLS_CONFIG.connect(dnsname, stream).await? SERVER_TLS_CONFIG.connect(dnsname, stream).await?
}; };
let (rd, wrt) = tokio::io::split(stream); let (rd, wrt) = tokio::io::split(stream);
Ok((StanzaWrite::AsyncWrite(Box::new(wrt)), StanzaRead::new(Box::new(rd)))) Ok((StanzaWrite::new(wrt), StanzaRead::new(rd)))
} }
#[cfg(feature = "outgoing")] #[cfg(feature = "outgoing")]
@ -85,7 +86,7 @@ pub async fn starttls_connect(target: SocketAddr, server_name: &str, is_c2s: boo
SERVER_TLS_CONFIG.connect(dnsname, stream).await? SERVER_TLS_CONFIG.connect(dnsname, stream).await?
}; };
let (rd, wrt) = tokio::io::split(stream); let (rd, wrt) = tokio::io::split(stream);
Ok((StanzaWrite::AsyncWrite(Box::new(wrt)), StanzaRead::new(Box::new(rd)))) Ok((StanzaWrite::new(wrt), StanzaRead::new(rd)))
} }
#[cfg(feature = "incoming")] #[cfg(feature = "incoming")]
@ -203,9 +204,43 @@ async fn handle_tls_connection(mut stream: tokio::net::TcpStream, client_addr: &
let stream = acceptor.accept(stream).await?; let stream = acceptor.accept(stream).await?;
// todo: try to peek stream here and handle websocket on these ports too? #[cfg(not(feature = "websocket"))]
{
let (in_rd, in_wr) = tokio::io::split(stream);
shuffle_rd_wr_filter(StanzaRead::new(in_rd), StanzaWrite::new(in_wr), config, local_addr, client_addr, in_filter).await
}
let (in_rd, in_wr) = tokio::io::split(stream); #[cfg(feature = "websocket")]
{
let stream: tokio_rustls::TlsStream<tokio::net::TcpStream> = stream.into();
let mut stream = BufStream::with_capacity(crate::IN_BUFFER_SIZE, 0, stream);
let websocket = {
// wait up to 10 seconds until 3 bytes have been read
use std::time::{Duration, Instant};
let duration = Duration::from_secs(10);
let now = Instant::now();
let mut buf = stream.fill_buf().await?;
loop {
if buf.len() >= 3 {
break; // success
}
if buf.is_empty() {
bail!("not enough bytes");
}
if Instant::now() - now > duration {
bail!("less than 3 bytes in 10 seconds, closed connection?");
}
buf = stream.fill_buf().await?;
}
shuffle_rd_wr_filter(StanzaRead::new(Box::new(in_rd)), StanzaWrite::new(Box::new(in_wr)), config, local_addr, client_addr, in_filter).await buf[..3] == b"GET"[..]
};
if websocket {
handle_websocket_connection(stream, client_addr, local_addr, config).await
} else {
let (in_rd, in_wr) = tokio::io::split(stream);
shuffle_rd_wr_filter(StanzaRead::already_buffered(in_rd), StanzaWrite::new(in_wr), config, local_addr, client_addr, in_filter).await
}
}
} }

View File

@ -6,33 +6,14 @@ use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
// https://datatracker.ietf.org/doc/html/rfc7395 // https://datatracker.ietf.org/doc/html/rfc7395
pub fn spawn_websocket_listener(local_addr: SocketAddr, config: CloneableConfig, acceptor: TlsAcceptor) -> JoinHandle<Result<()>> { pub async fn handle_websocket_connection(
tokio::spawn(async move { stream: BufStream<tokio_rustls::TlsStream<tokio::net::TcpStream>>,
let listener = TcpListener::bind(&local_addr).await.die("cannot listen on port/interface"); client_addr: &mut Context<'_>,
loop { local_addr: SocketAddr,
let (stream, client_addr) = listener.accept().await?; config: CloneableConfig,
let config = config.clone(); ) -> Result<()> {
let acceptor = acceptor.clone();
tokio::spawn(async move {
let mut client_addr = Context::new("websocket-in", client_addr);
if let Err(e) = handle_websocket_connection(stream, &mut client_addr, local_addr, config, acceptor).await {
error!("{} {}", client_addr.log_from(), e);
}
});
}
#[allow(unreachable_code)]
Ok(())
})
}
async fn handle_websocket_connection(stream: tokio::net::TcpStream, client_addr: &mut Context<'_>, local_addr: SocketAddr, config: CloneableConfig, acceptor: TlsAcceptor) -> Result<()> {
info!("{} connected", client_addr.log_from()); info!("{} connected", client_addr.log_from());
// start TLS
let stream = acceptor.accept(stream).await?;
let stream: tokio_rustls::TlsStream<tokio::net::TcpStream> = stream.into();
// accept the websocket // accept the websocket
// todo: check SEC_WEBSOCKET_PROTOCOL or ORIGIN ? // todo: check SEC_WEBSOCKET_PROTOCOL or ORIGIN ?
let stream = tokio_tungstenite::accept_async_with_config( let stream = tokio_tungstenite::accept_async_with_config(
@ -106,6 +87,7 @@ pub fn to_ws_new(buf: &[u8], mut end_of_first_tag: usize, is_c2s: bool) -> Resul
use rustls::ServerName; use rustls::ServerName;
use std::convert::TryFrom; use std::convert::TryFrom;
use tokio::io::BufStream;
use tokio_rustls::TlsConnector; use tokio_rustls::TlsConnector;
use tokio_tungstenite::tungstenite::client::IntoClientRequest; use tokio_tungstenite::tungstenite::client::IntoClientRequest;
@ -127,6 +109,8 @@ pub async fn websocket_connect(target: SocketAddr, server_name: &str, url: &Uri,
let stream = connector.connect(dnsname, stream).await?; let stream = connector.connect(dnsname, stream).await?;
let stream: tokio_rustls::TlsStream<tokio::net::TcpStream> = stream.into(); let stream: tokio_rustls::TlsStream<tokio::net::TcpStream> = stream.into();
// todo: tokio_tungstenite seems to have a bug, if the write buffer is non-zero, it'll hang forever, even though we always flush, investigate
let stream = BufStream::with_capacity(crate::IN_BUFFER_SIZE, 0, stream);
let (stream, _) = tokio_tungstenite::client_async_with_config(request, stream, None).await?; let (stream, _) = tokio_tungstenite::client_async_with_config(request, stream, None).await?;

View File

@ -1,10 +1,8 @@
# interfaces to listen for reverse proxy STARTTLS/Direct TLS XMPP connections on, should be open to the internet # interfaces to listen for reverse proxy STARTTLS/Direct TLS/TLS WebSocket (wss) XMPP connections on, should be open to the internet
incoming_listen = [ "0.0.0.0:5222", "0.0.0.0:5269" ] incoming_listen = [ "0.0.0.0:5222", "0.0.0.0:5269", "0.0.0.0:443" ]
# interfaces to listen for reverse proxy QUIC XMPP connections on, should be open to the internet # interfaces to listen for reverse proxy QUIC XMPP connections on, should be open to the internet
quic_listen = [ "0.0.0.0:443" ] quic_listen = [ "0.0.0.0:443" ]
# interfaces to listen for reverse proxy TLS WebSocket (wss) XMPP connections on, should be open to the internet
websocket_listen = [ "0.0.0.0:443" ]
# interfaces to listen for outgoing proxy TCP XMPP connections on, should be localhost # interfaces to listen for outgoing proxy TCP XMPP connections on, should be localhost
outgoing_listen = [ "127.0.0.1:15270" ] outgoing_listen = [ "127.0.0.1:15270" ]