Implement WebSocket reverse proxy support
moparisthebest/xmpp-proxy/pipeline/head This commit looks good Details

This commit is contained in:
Travis Burtrum 2021-07-28 02:24:08 -04:00
parent c82869eb7c
commit 1bef5f2a9b
9 changed files with 492 additions and 45 deletions

172
Cargo.lock generated
View File

@ -55,12 +55,27 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]]
name = "block-buffer"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
dependencies = [
"generic-array",
]
[[package]] [[package]]
name = "bumpalo" name = "bumpalo"
version = "3.7.0" version = "3.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c59e7af012c713f529e7a3ee57ce9b31ddd858d4b512923602f74608b009631" checksum = "9c59e7af012c713f529e7a3ee57ce9b31ddd858d4b512923602f74608b009631"
[[package]]
name = "byteorder"
version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]] [[package]]
name = "bytes" name = "bytes"
version = "1.0.1" version = "1.0.1"
@ -95,6 +110,15 @@ version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea221b5284a47e40033bf9b66f35f984ec0ea2931eb03505246cd27a963f981b" checksum = "ea221b5284a47e40033bf9b66f35f984ec0ea2931eb03505246cd27a963f981b"
[[package]]
name = "cpufeatures"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66c99696f6c9dd7f35d486b9d04d7e6e202aa3e8c40d553f2fdf5e7e0c6a71ef"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "ct-logs" name = "ct-logs"
version = "0.8.0" version = "0.8.0"
@ -116,6 +140,15 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8634d5e6139f7364a4e99bd718b2f511f2f25863146360e70909bc45a016290" checksum = "f8634d5e6139f7364a4e99bd718b2f511f2f25863146360e70909bc45a016290"
[[package]]
name = "digest"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066"
dependencies = [
"generic-array",
]
[[package]] [[package]]
name = "enum-as-inner" name = "enum-as-inner"
version = "0.3.3" version = "0.3.3"
@ -141,6 +174,12 @@ dependencies = [
"termcolor", "termcolor",
] ]
[[package]]
name = "fnv"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]] [[package]]
name = "form_urlencoded" name = "form_urlencoded"
version = "1.0.1" version = "1.0.1"
@ -245,6 +284,16 @@ dependencies = [
"slab", "slab",
] ]
[[package]]
name = "generic-array"
version = "0.14.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "501466ecc8a30d1d3b7fc9229b122b2ce8ed6e9d9223f1138d4babb253e51817"
dependencies = [
"typenum",
"version_check",
]
[[package]] [[package]]
name = "getrandom" name = "getrandom"
version = "0.2.3" version = "0.2.3"
@ -285,6 +334,23 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "http"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "527e8c9ac747e28542699a951517aa9a6945af506cd1f2e1b53a576c17b6cc11"
dependencies = [
"bytes",
"fnv",
"itoa",
]
[[package]]
name = "httparse"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3a87b616e37e93c22fb19bcd386f02f3af5ea98a25670ad0fce773de23c5e68"
[[package]] [[package]]
name = "humantime" name = "humantime"
version = "2.1.0" version = "2.1.0"
@ -302,6 +368,15 @@ dependencies = [
"unicode-normalization", "unicode-normalization",
] ]
[[package]]
name = "input_buffer"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f97967975f448f1a7ddb12b0bc41069d09ed6a1c161a92687e057325db35d413"
dependencies = [
"bytes",
]
[[package]] [[package]]
name = "instant" name = "instant"
version = "0.1.10" version = "0.1.10"
@ -329,6 +404,12 @@ version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68f2d64f2edebec4ce84ad108148e67e1064789bee435edc5b60ad398714a3a9" checksum = "68f2d64f2edebec4ce84ad108148e67e1064789bee435edc5b60ad398714a3a9"
[[package]]
name = "itoa"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736"
[[package]] [[package]]
name = "js-sys" name = "js-sys"
version = "0.3.51" version = "0.3.51"
@ -448,6 +529,12 @@ version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56" checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56"
[[package]]
name = "opaque-debug"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]] [[package]]
name = "openssl-probe" name = "openssl-probe"
version = "0.1.4" version = "0.1.4"
@ -485,6 +572,26 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "pin-project"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "pin-project-lite" name = "pin-project-lite"
version = "0.2.7" version = "0.2.7"
@ -763,6 +870,19 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "sha-1"
version = "0.9.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a0c8611594e2ab4ebbf06ec7cbbf0a99450b8570e96cbf5188b5d5f6ef18d81"
dependencies = [
"block-buffer",
"cfg-if",
"cpufeatures",
"digest",
"opaque-debug",
]
[[package]] [[package]]
name = "slab" name = "slab"
version = "0.4.3" version = "0.4.3"
@ -886,6 +1006,19 @@ dependencies = [
"webpki", "webpki",
] ]
[[package]]
name = "tokio-tungstenite"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e96bb520beab540ab664bd5a9cfeaa1fcd846fa68c830b42e2c8963071251d2"
dependencies = [
"futures-util",
"log",
"pin-project",
"tokio",
"tungstenite",
]
[[package]] [[package]]
name = "toml" name = "toml"
version = "0.5.8" version = "0.5.8"
@ -972,6 +1105,32 @@ dependencies = [
"trust-dns-proto", "trust-dns-proto",
] ]
[[package]]
name = "tungstenite"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fe8dada8c1a3aeca77d6b51a4f1314e0f4b8e438b7b1b71e3ddaca8080e4093"
dependencies = [
"base64",
"byteorder",
"bytes",
"http",
"httparse",
"input_buffer",
"log",
"rand",
"sha-1",
"thiserror",
"url",
"utf-8",
]
[[package]]
name = "typenum"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "879f6906492a7cd215bfa4cf595b600146ccfac0c79bcbd1f3000162af5e8b06"
[[package]] [[package]]
name = "unicode-bidi" name = "unicode-bidi"
version = "0.3.5" version = "0.3.5"
@ -1020,6 +1179,18 @@ dependencies = [
"percent-encoding", "percent-encoding",
] ]
[[package]]
name = "utf-8"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "version_check"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe"
[[package]] [[package]]
name = "wasi" name = "wasi"
version = "0.10.2+wasi-snapshot-preview1" version = "0.10.2+wasi-snapshot-preview1"
@ -1171,6 +1342,7 @@ dependencies = [
"serde_derive", "serde_derive",
"tokio", "tokio",
"tokio-rustls", "tokio-rustls",
"tokio-tungstenite",
"toml", "toml",
"trust-dns-resolver", "trust-dns-resolver",
"webpki-roots", "webpki-roots",

View File

@ -45,8 +45,11 @@ trust-dns-resolver = { version = "0.20", optional = true }
# quic deps # quic deps
quinn = { version = "0.7", optional = true } quinn = { version = "0.7", optional = true }
# websocket deps
tokio-tungstenite = { version = "0.14", optional = true }
[features] [features]
default = ["incoming", "outgoing", "quic", "logging"] default = ["incoming", "outgoing", "quic", "websocket", "logging"]
#default = ["incoming", "outgoing"] #default = ["incoming", "outgoing"]
#default = ["incoming", "quic"] #default = ["incoming", "quic"]
#default = ["outgoing", "quic"] #default = ["outgoing", "quic"]
@ -56,4 +59,5 @@ default = ["incoming", "outgoing", "quic", "logging"]
incoming = ["tokio-rustls"] incoming = ["tokio-rustls"]
outgoing = ["tokio-rustls", "trust-dns-resolver", "webpki-roots", "lazy_static"] outgoing = ["tokio-rustls", "trust-dns-resolver", "webpki-roots", "lazy_static"]
quic = ["quinn"] quic = ["quinn"]
websocket = ["tokio-tungstenite"]
logging = ["rand", "env_logger"] logging = ["rand", "env_logger"]

View File

@ -3,12 +3,13 @@
[![Build Status](https://ci.moparisthe.best/job/moparisthebest/job/xmpp-proxy/job/master/badge/icon%3Fstyle=plastic)](https://ci.moparisthe.best/job/moparisthebest/job/xmpp-proxy/job/master/) [![Build Status](https://ci.moparisthe.best/job/moparisthebest/job/xmpp-proxy/job/master/badge/icon%3Fstyle=plastic)](https://ci.moparisthe.best/job/moparisthebest/job/xmpp-proxy/job/master/)
xmpp-proxy is a reverse proxy and outgoing proxy for XMPP servers and clients, providing STARTTLS, xmpp-proxy is a reverse proxy and outgoing proxy for XMPP servers and clients, providing STARTTLS,
[Direct TLS](https://xmpp.org/extensions/xep-0368.html), and [QUIC](https://datatracker.ietf.org/doc/html/draft-ietf-quic-transport) [Direct TLS](https://xmpp.org/extensions/xep-0368.html), [QUIC](https://datatracker.ietf.org/doc/html/draft-ietf-quic-transport),
connectivity to plain-text XMPP servers and clients and limiting stanza sizes without an XML parser. and [WebSocket](https://datatracker.ietf.org/doc/html/rfc7395) connectivity to plain-text XMPP servers and clients and
limiting stanza sizes without an XML parser.
xmpp-proxy in reverse proxy (incoming) mode will: xmpp-proxy in reverse proxy (incoming) mode will:
1. listen on any number of interfaces/ports 1. listen on any number of interfaces/ports
2. accept any STARTTLS, Direct TLS, or QUIC c2s or s2s connections from the internet 2. accept any STARTTLS, Direct TLS, QUIC, or WebSocket c2s or s2s connections from the internet
3. terminate TLS 3. terminate TLS
4. connect them to a local real XMPP server over plain-text TCP 4. connect them to a local real XMPP server over plain-text TCP
5. send the [PROXY protocol](https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt) v1 header if configured, so the 5. send the [PROXY protocol](https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt) v1 header if configured, so the
@ -125,10 +126,12 @@ s2s_ports = {15268}
If you are a grumpy power user who wants to build xmpp-proxy with exactly the features you want, nothing less, nothing If you are a grumpy power user who wants to build xmpp-proxy with exactly the features you want, nothing less, nothing
more, this section is for you! more, this section is for you!
xmpp-proxy has 3 compile-time features: xmpp-proxy has 5 compile-time features:
1. `incoming` - enables `incoming_listen` config option for reverse proxy STARTTLS/TLS 1. `incoming` - enables `incoming_listen` config option for reverse proxy STARTTLS/TLS
2. `outgoing` - enables `outgoing_listen` config option for outgoing proxy STARTTLS/TLS 2. `outgoing` - enables `outgoing_listen` config option for outgoing proxy STARTTLS/TLS
3. `quic` - enables `quic_listen` config option for reverse proxy QUIC, and QUIC support for `outgoing` if it is enabled 3. `quic` - enables `quic_listen` config option for reverse proxy QUIC, and QUIC support for `outgoing` if it is enabled
4. `websocket` - enables `websocket_listen` config option for reverse proxy WebSocket
5. `logging` - enables configurable logging
So to build only supporting reverse proxy STARTTLS/TLS, no QUIC, run: `cargo build --release --no-default-features --features incoming` So to build only supporting reverse proxy STARTTLS/TLS, no QUIC, run: `cargo build --release --no-default-features --features incoming`
To build a reverse proxy only, but supporting all of STARTTLS/TLS/QUIC, run: `cargo build --release --no-default-features --features incoming,quic` To build a reverse proxy only, but supporting all of STARTTLS/TLS/QUIC, run: `cargo build --release --no-default-features --features incoming,quic`
@ -140,4 +143,5 @@ Thanks [rxml](https://github.com/horazont/rxml) for afl-fuzz seeds
#### todo #### todo
1. sasl external for s2s, initiating and receiving 1. sasl external for s2s, initiating and receiving
2. websocket incoming and outgoing, maybe even for s2s 2. websocket outgoing
3. XEP for XMPP-over-QUIC and XMPP-S2S-over-WebSocket

View File

@ -11,7 +11,7 @@ use die::Die;
use serde_derive::Deserialize; use serde_derive::Deserialize;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf};
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
@ -47,6 +47,11 @@ mod srv;
#[cfg(feature = "outgoing")] #[cfg(feature = "outgoing")]
use crate::srv::*; use crate::srv::*;
#[cfg(feature = "websocket")]
mod websocket;
#[cfg(feature = "websocket")]
use crate::websocket::*;
const IN_BUFFER_SIZE: usize = 8192; const IN_BUFFER_SIZE: usize = 8192;
const OUT_BUFFER_SIZE: usize = 8192; const OUT_BUFFER_SIZE: usize = 8192;
@ -59,6 +64,7 @@ struct Config {
tls_cert: String, tls_cert: String,
incoming_listen: Option<Vec<String>>, incoming_listen: Option<Vec<String>>,
quic_listen: Option<Vec<String>>, quic_listen: Option<Vec<String>>,
websocket_listen: Option<Vec<String>>,
outgoing_listen: Option<Vec<String>>, outgoing_listen: Option<Vec<String>>,
max_stanza_size_bytes: usize, max_stanza_size_bytes: usize,
s2s_target: SocketAddr, s2s_target: SocketAddr,
@ -89,8 +95,8 @@ impl Config {
fn get_cloneable_cfg(&self) -> CloneableConfig { fn get_cloneable_cfg(&self) -> CloneableConfig {
CloneableConfig { CloneableConfig {
max_stanza_size_bytes: self.max_stanza_size_bytes, max_stanza_size_bytes: self.max_stanza_size_bytes,
s2s_target: self.s2s_target.clone(), s2s_target: self.s2s_target,
c2s_target: self.c2s_target.clone(), c2s_target: self.c2s_target,
proxy: self.proxy, proxy: self.proxy,
} }
} }
@ -130,38 +136,7 @@ async fn shuffle_rd_wr_filter<R: AsyncRead + Unpin, W: AsyncWrite + Unpin>(
// now read to figure out client vs server // now read to figure out client vs server
let (stream_open, is_c2s, mut in_rd, mut in_filter) = stream_preamble(StanzaReader(in_rd), &client_addr, in_filter).await?; let (stream_open, is_c2s, mut in_rd, mut in_filter) = stream_preamble(StanzaReader(in_rd), &client_addr, in_filter).await?;
let target = if is_c2s { config.c2s_target } else { config.s2s_target }; let (mut out_rd, mut out_wr) = open_incoming(config, local_addr, client_addr, &stream_open, is_c2s, &mut in_filter).await?;
client_addr.set_to_addr(target);
client_addr.set_c2s_stream_open(is_c2s, &stream_open);
let out_stream = tokio::net::TcpStream::connect(target).await?;
let (mut out_rd, mut out_wr) = tokio::io::split(out_stream);
if config.proxy {
/*
https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
PROXY TCP4 255.255.255.255 255.255.255.255 65535 65535\r\n
PROXY TCP6 ffff:f...f:ffff ffff:f...f:ffff 65535 65535\r\n
PROXY TCP6 SOURCE_IP DEST_IP SOURCE_PORT DEST_PORT\r\n
*/
// tokio AsyncWrite doesn't have write_fmt so have to go through this buffer for some crazy reason
//write!(out_wr, "PROXY TCP{} {} {} {} {}\r\n", if client_addr.is_ipv4() { '4' } else {'6' }, client_addr.ip(), local_addr.ip(), client_addr.port(), local_addr.port())?;
write!(
&mut in_filter.buf[0..],
"PROXY TCP{} {} {} {} {}\r\n",
if client_addr.client_addr().is_ipv4() { '4' } else { '6' },
client_addr.client_addr().ip(),
local_addr.ip(),
client_addr.client_addr().port(),
local_addr.port()
)?;
let end_idx = &(&in_filter.buf[0..]).first_index_of(b"\n")? + 1;
trace!("{} '{}'", client_addr.log_from(), to_str(&in_filter.buf[0..end_idx]));
out_wr.write_all(&in_filter.buf[0..end_idx]).await?;
}
trace!("{} '{}'", client_addr.log_from(), to_str(&stream_open));
out_wr.write_all(&stream_open).await?;
out_wr.flush().await?;
drop(stream_open); drop(stream_open);
let mut out_buf = [0u8; OUT_BUFFER_SIZE]; let mut out_buf = [0u8; OUT_BUFFER_SIZE];
@ -195,6 +170,49 @@ async fn shuffle_rd_wr_filter<R: AsyncRead + Unpin, W: AsyncWrite + Unpin>(
Ok(()) Ok(())
} }
async fn open_incoming(
config: CloneableConfig,
local_addr: SocketAddr,
client_addr: &mut Context<'_>,
stream_open: &[u8],
is_c2s: bool,
in_filter: &mut StanzaFilter,
) -> Result<(ReadHalf<tokio::net::TcpStream>, WriteHalf<tokio::net::TcpStream>)> {
let target = if is_c2s { config.c2s_target } else { config.s2s_target };
client_addr.set_to_addr(target);
client_addr.set_c2s_stream_open(is_c2s, &stream_open);
let out_stream = tokio::net::TcpStream::connect(target).await?;
let (out_rd, mut out_wr) = tokio::io::split(out_stream);
if config.proxy {
/*
https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
PROXY TCP4 255.255.255.255 255.255.255.255 65535 65535\r\n
PROXY TCP6 ffff:f...f:ffff ffff:f...f:ffff 65535 65535\r\n
PROXY TCP6 SOURCE_IP DEST_IP SOURCE_PORT DEST_PORT\r\n
*/
// tokio AsyncWrite doesn't have write_fmt so have to go through this buffer for some crazy reason
//write!(out_wr, "PROXY TCP{} {} {} {} {}\r\n", if client_addr.is_ipv4() { '4' } else {'6' }, client_addr.ip(), local_addr.ip(), client_addr.port(), local_addr.port())?;
write!(
&mut in_filter.buf[0..],
"PROXY TCP{} {} {} {} {}\r\n",
if client_addr.client_addr().is_ipv4() { '4' } else { '6' },
client_addr.client_addr().ip(),
local_addr.ip(),
client_addr.client_addr().port(),
local_addr.port()
)?;
let end_idx = &(&in_filter.buf[0..]).first_index_of(b"\n")? + 1;
trace!("{} '{}'", client_addr.log_from(), to_str(&in_filter.buf[0..end_idx]));
out_wr.write_all(&in_filter.buf[0..end_idx]).await?;
}
trace!("{} '{}'", client_addr.log_from(), to_str(&stream_open));
out_wr.write_all(&stream_open).await?;
out_wr.flush().await?;
Ok((out_rd, out_wr))
}
async fn stream_preamble<R: AsyncRead + Unpin>(mut in_rd: StanzaReader<R>, client_addr: &Context<'_>, mut in_filter: StanzaFilter) -> Result<(Vec<u8>, bool, StanzaReader<R>, StanzaFilter)> { async fn stream_preamble<R: AsyncRead + Unpin>(mut in_rd: StanzaReader<R>, client_addr: &Context<'_>, mut in_filter: StanzaFilter) -> Result<(Vec<u8>, bool, StanzaReader<R>, StanzaFilter)> {
let mut stream_open = Vec::new(); let mut stream_open = Vec::new();
while let Ok(Some(buf)) = in_rd.next(&mut in_filter).await { while let Ok(Some(buf)) = in_rd.next(&mut in_filter).await {
@ -253,6 +271,13 @@ async fn main() {
handles.push(spawn_quic_listener(listener.parse().die("invalid listener address"), config.clone(), quic_config.clone())); handles.push(spawn_quic_listener(listener.parse().die("invalid listener address"), config.clone(), quic_config.clone()));
} }
} }
#[cfg(feature = "websocket")]
if let Some(ref listeners) = main_config.websocket_listen {
let acceptor = main_config.tls_acceptor().die("invalid cert/key ?");
for listener in listeners {
handles.push(spawn_websocket_listener(listener.parse().die("invalid listener address"), config.clone(), acceptor.clone()));
}
}
#[cfg(feature = "outgoing")] #[cfg(feature = "outgoing")]
if let Some(ref listeners) = main_config.outgoing_listen { if let Some(ref listeners) = main_config.outgoing_listen {
for listener in listeners { for listener in listeners {

View File

@ -29,9 +29,11 @@ impl<T: PartialEq + Clone> SliceSubsequence<T> for &[T] {
} }
fn first_index_of(&self, needle: &[T]) -> Result<usize> { fn first_index_of(&self, needle: &[T]) -> Result<usize> {
for i in 0..self.len() - needle.len() + 1 { if self.len() >= needle.len() {
if self[i..i + needle.len()] == needle[..] { for i in 0..self.len() - needle.len() + 1 {
return Ok(i); if self[i..i + needle.len()] == needle[..] {
return Ok(i);
}
} }
} }
Err(anyhow!("not found")) Err(anyhow!("not found"))

View File

@ -23,6 +23,7 @@ enum StanzaState {
pub struct StanzaFilter { pub struct StanzaFilter {
buf_size: usize, buf_size: usize,
pub buf: Vec<u8>, pub buf: Vec<u8>,
end_of_first_tag: usize,
cnt: usize, cnt: usize,
tag_cnt: usize, tag_cnt: usize,
state: StanzaState, state: StanzaState,
@ -43,6 +44,7 @@ impl StanzaFilter {
StanzaFilter { StanzaFilter {
buf_size, buf_size,
buf: vec![0u8; buf_size], buf: vec![0u8; buf_size],
end_of_first_tag: 0,
cnt: 0, cnt: 0,
tag_cnt: 0, tag_cnt: 0,
state: OutsideStanza, state: OutsideStanza,
@ -96,6 +98,9 @@ impl StanzaFilter {
}, },
InsideTag => match b { InsideTag => match b {
b'>' => { b'>' => {
if self.end_of_first_tag == 0 {
self.end_of_first_tag = self.cnt;
}
if self.buf[self.cnt - 1] == b'/' { if self.buf[self.cnt - 1] == b'/' {
// state can't be InsideTag unless we are on at least the second character, so can't go out of range // state can't be InsideTag unless we are on at least the second character, so can't go out of range
// self-closing tag // self-closing tag
@ -150,6 +155,9 @@ impl StanzaFilter {
EndStream => { EndStream => {
if b == b'>' { if b == b'>' {
if self.last_equals(b"</stream:stream>")? { if self.last_equals(b"</stream:stream>")? {
if self.end_of_first_tag == 0 {
self.end_of_first_tag = self.cnt;
}
return self.stanza_end(); return self.stanza_end();
} else { } else {
bail!("illegal stanza: {}", to_str(&self.buf[..(self.cnt + 1)])); bail!("illegal stanza: {}", to_str(&self.buf[..(self.cnt + 1)]));
@ -205,6 +213,23 @@ impl<T: tokio::io::AsyncRead + Unpin> StanzaReader<T> {
} }
} }
} }
#[cfg(feature = "websocket")]
pub async fn next_eoft<'a>(&'a mut self, filter: &'a mut StanzaFilter) -> Result<Option<(&'a [u8], usize)>> {
use tokio::io::AsyncReadExt;
loop {
let n = self.0.read(filter.current_buf()).await?;
if n == 0 {
return Ok(None);
}
if let Some(idx) = filter.process_next_byte_idx()? {
let end_of_first_tag = filter.end_of_first_tag;
filter.end_of_first_tag = 0;
return Ok(Some((&filter.buf[0..idx], end_of_first_tag)));
}
}
}
} }
#[cfg(test)] #[cfg(test)]
@ -226,7 +251,6 @@ mod tests {
async fn process_next_byte() -> Result<()> { async fn process_next_byte() -> Result<()> {
let mut filter = StanzaFilter::new(262_144); let mut filter = StanzaFilter::new(262_144);
//todo: <x a='/>'>This is going to be fun.</x>
assert_eq!( assert_eq!(
StanzaReader(Cursor::new( StanzaReader(Cursor::new(
br###" br###"

View File

@ -201,6 +201,8 @@ async fn handle_tls_connection(mut stream: tokio::net::TcpStream, client_addr: &
let stream = acceptor.accept(stream).await?; let stream = acceptor.accept(stream).await?;
// todo: try to peek stream here and handle websocket on these ports too?
let (in_rd, in_wr) = tokio::io::split(stream); let (in_rd, in_wr) = tokio::io::split(stream);
shuffle_rd_wr_filter(in_rd, in_wr, config, local_addr, client_addr, in_filter).await shuffle_rd_wr_filter(in_rd, in_wr, config, local_addr, client_addr, in_filter).await

212
src/websocket.rs Normal file
View File

@ -0,0 +1,212 @@
use crate::*;
use futures::{SinkExt, StreamExt, TryStreamExt};
use tokio_tungstenite::tungstenite::protocol::Message::*;
use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
// https://datatracker.ietf.org/doc/html/rfc7395
pub fn spawn_websocket_listener(local_addr: SocketAddr, config: CloneableConfig, acceptor: TlsAcceptor) -> JoinHandle<Result<()>> {
tokio::spawn(async move {
let listener = TcpListener::bind(&local_addr).await.die("cannot listen on port/interface");
loop {
let (stream, client_addr) = listener.accept().await?;
let config = config.clone();
let acceptor = acceptor.clone();
tokio::spawn(async move {
let mut client_addr = Context::new("websocket-in", client_addr);
if let Err(e) = handle_websocket_connection(stream, &mut client_addr, local_addr, config, acceptor).await {
error!("{} {}", client_addr.log_from(), e);
}
});
}
#[allow(unreachable_code)]
Ok(())
})
}
async fn handle_websocket_connection(stream: tokio::net::TcpStream, client_addr: &mut Context<'_>, local_addr: SocketAddr, config: CloneableConfig, acceptor: TlsAcceptor) -> Result<()> {
info!("{} connected", client_addr.log_from());
// start TLS
let stream = acceptor.accept(stream).await?;
// accept the websocket
let stream = tokio_tungstenite::accept_async_with_config(
stream,
Some(WebSocketConfig {
max_send_queue: None, // unlimited
max_frame_size: Some(config.max_stanza_size_bytes), // this is exactly the stanza size
max_message_size: Some(config.max_stanza_size_bytes * 4), // this is the message size, default is 4x frame size, so I guess we'll do the same here
accept_unmasked_frames: true,
}),
)
.await?;
let (mut in_wr, mut in_rd) = stream.split();
// https://docs.rs/tungstenite/0.14.0/tungstenite/protocol/enum.Message.html
// https://datatracker.ietf.org/doc/html/rfc7395#section-3.2 Data frame messages in the XMPP subprotocol MUST be of the text type and contain UTF-8 encoded data.
let (stanza, is_c2s) = match in_rd.try_next().await? {
// todo: c2s is xmlns="urn:ietf:params:xml:ns:xmpp-framing", let's make up s2s ? xmlns="urn:ietf:params:xml:ns:xmpp-framing-server" sounds good to me
Some(Text(stanza)) => {
let is_c2s = stanza.contains(r#" xmlns="urn:ietf:params:xml:ns:xmpp-framing""#) || stanza.contains(r#" xmlns='urn:ietf:params:xml:ns:xmpp-framing'"#);
(stanza, is_c2s)
}
_ => bail!("expected first websocket frame to be open"),
};
let stanza = from_ws(stanza);
let stream_open = stanza.as_bytes();
// websocket frame size filters incoming stanza size from client, this is used to split the
// stanzas from the servers up so we can send them across websocket frames
let mut in_filter = StanzaFilter::new(config.max_stanza_size_bytes);
let (out_rd, mut out_wr) = open_incoming(config, local_addr, client_addr, &stream_open, is_c2s, &mut in_filter).await?;
let mut out_rd = StanzaReader(out_rd);
loop {
tokio::select! {
// server to client
Ok(buf) = out_rd.next_eoft(&mut in_filter) => {
match buf {
None => break,
Some((buf, end_of_first_tag)) => {
// ignore this
if buf.starts_with(b"<?xml ") {
continue;
}
let stanza = to_ws_new(buf, end_of_first_tag, is_c2s)?;
trace!("{} '{}'", client_addr.log_to(), stanza);
in_wr.feed(Text(stanza)).await?;
in_wr.flush().await?;
}
}
},
Ok(Some(msg)) = in_rd.try_next() => {
match msg {
// actual XMPP stanzas
Text(stanza) => {
let stanza = from_ws(stanza);
trace!("{} '{}'", client_addr.log_from(), stanza);
out_wr.write_all(stanza.as_bytes()).await?;
out_wr.flush().await?;
}
// websocket ping/pong
Ping(msg) => {
in_wr.feed(Pong(msg)).await?;
in_wr.flush().await?;
},
// handle Close, just break from loop, hopefully client sent <close/> before
Close(_) => break,
_ => bail!("invalid websocket message: {}", msg) // Binary or Pong
}
},
// todo: should we also send pings to the client ourselves on a schedule? StanzaFilter strips out whitespace pings if the server uses them...
}
}
info!("{} disconnected", client_addr.log_from());
Ok(())
}
pub fn from_ws(stanza: String) -> String {
if stanza.starts_with("<open ") {
return stanza
.replace("<open ", r#"<?xml version='1.0'?><stream:stream xmlns:stream="http://etherx.jabber.org/streams" "#)
.replace("urn:ietf:params:xml:ns:xmpp-framing-server", "jabber:server")
.replace("urn:ietf:params:xml:ns:xmpp-framing", "jabber:client")
.replace("/>", ">");
} else if stanza.starts_with("<close ") {
return "</stream:stream>".to_string();
}
stanza
}
pub fn to_ws_new(buf: &[u8], mut end_of_first_tag: usize, is_c2s: bool) -> Result<String> {
if end_of_first_tag == 0 {
return Ok(String::from_utf8(buf.to_vec())?);
}
if buf.starts_with(b"<stream:stream ") {
let buf = String::from_utf8(buf.to_vec())?;
return Ok(buf
.replace("<stream:stream ", "<open ")
.replace("jabber:server", "urn:ietf:params:xml:ns:xmpp-framing-server")
.replace("jabber:client", "urn:ietf:params:xml:ns:xmpp-framing")
.replace(">", "/>"));
}
if buf.starts_with(b"</stream:stream") {
return Ok(r#"<close xmlns="urn:ietf:params:xml:ns:xmpp-framing" />"#.to_string());
}
if buf[end_of_first_tag - 1] == b'/' {
end_of_first_tag -= 1;
}
let first_tag_bytes = &buf[0..end_of_first_tag];
if first_tag_bytes.first_index_of(b" xmlns='").is_ok() || first_tag_bytes.first_index_of(br#" xmlns=""#).is_ok() {
// already set, do nothing
return Ok(String::from_utf8(buf.to_vec())?);
}
// otherwise add proper xmlns before end of tag
let mut ret = String::with_capacity(buf.len() + 22);
ret.push_str(std::str::from_utf8(&first_tag_bytes)?);
ret.push_str(if is_c2s { " xmlns='jabber:client'" } else { " xmlns='jabber:server'" });
ret.push_str(std::str::from_utf8(&buf[end_of_first_tag..])?);
Ok(ret)
}
#[cfg(test)]
mod tests {
use crate::websocket::*;
use std::io::Cursor;
#[test]
fn test_from_ws() {
assert_eq!(
from_ws(r#"<open xmlns="urn:ietf:params:xml:ns:xmpp-framing" version="1.0" to="test.moparisthe.best" xml:lang="en" />"#.to_string()),
r#"<?xml version='1.0'?><stream:stream xmlns:stream="http://etherx.jabber.org/streams" xmlns="jabber:client" version="1.0" to="test.moparisthe.best" xml:lang="en" >"#.to_string()
);
assert_eq!(from_ws(r#"<close xmlns="urn:ietf:params:xml:ns:xmpp-framing" />"#.to_string()), r#"</stream:stream>"#.to_string());
}
async fn to_vec_eoft<'a, T: tokio::io::AsyncRead + Unpin>(mut stanza_reader: StanzaReader<T>, filter: &'a mut StanzaFilter) -> Result<Vec<String>> {
let mut ret = Vec::new();
while let Some((buf, end_of_first_tag)) = stanza_reader.next_eoft(filter).await? {
ret.push(to_ws_new(buf, end_of_first_tag, true)?);
}
return Ok(ret);
}
#[tokio::test]
async fn test_to_ws() -> Result<()> {
let mut filter = StanzaFilter::new(262_144);
assert_eq!(
to_vec_eoft(
StanzaReader(Cursor::new(
br###"
<stream:stream xmlns="jabber:client" version="1.0" to="test.moparisthe.best" xml:lang="en">
</stream:stream>
<iq type='result' id='6ef4a4b7-7f2b-462b-9176-83ec706c625e' to='test1@test.moparisthe.best/gajim.12S9XM42'/>
<stream:features><mechanisms xmlns='urn:ietf:params:xml:ns:xmpp-sasl'><mechanism>PLAIN</mechanism><mechanism>SCRAM-SHA-1</mechanism></mechanisms></stream:features>
<iq type='result' id='7b0d57bb-6446-4701-92e5-8b9354bbfabe'><bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'><jid>test1@test.moparisthe.best/gajim.12S9XM42</jid></bind></iq>
<iq type='result' id='7b0d57bb-6446-4701-92e5-8b9354bb>fabe'><bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'><jid>test1@test.moparisthe.best/gajim.12S9XM42</jid></bind></iq>
"###,
)),
&mut filter
)
.await?,
vec![
r#"<open xmlns="urn:ietf:params:xml:ns:xmpp-framing" version="1.0" to="test.moparisthe.best" xml:lang="en"/>"#,
r#"<close xmlns="urn:ietf:params:xml:ns:xmpp-framing" />"#,
r#"<iq type='result' id='6ef4a4b7-7f2b-462b-9176-83ec706c625e' to='test1@test.moparisthe.best/gajim.12S9XM42' xmlns='jabber:client'/>"#,
r#"<stream:features xmlns='jabber:client'><mechanisms xmlns='urn:ietf:params:xml:ns:xmpp-sasl'><mechanism>PLAIN</mechanism><mechanism>SCRAM-SHA-1</mechanism></mechanisms></stream:features>"#,
r#"<iq type='result' id='7b0d57bb-6446-4701-92e5-8b9354bbfabe' xmlns='jabber:client'><bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'><jid>test1@test.moparisthe.best/gajim.12S9XM42</jid></bind></iq>"#,
r#"<iq type='result' id='7b0d57bb-6446-4701-92e5-8b9354bb>fabe' xmlns='jabber:client'><bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'><jid>test1@test.moparisthe.best/gajim.12S9XM42</jid></bind></iq>"#,
]
);
Ok(())
}
}

View File

@ -3,6 +3,8 @@
incoming_listen = [ "0.0.0.0:5222", "0.0.0.0:5269" ] incoming_listen = [ "0.0.0.0:5222", "0.0.0.0:5269" ]
# interfaces to listen for reverse proxy QUIC XMPP connections on, should be open to the internet # interfaces to listen for reverse proxy QUIC XMPP connections on, should be open to the internet
quic_listen = [ "0.0.0.0:443" ] quic_listen = [ "0.0.0.0:443" ]
# interfaces to listen for reverse proxy TLS WebSocket (wss) XMPP connections on, should be open to the internet
websocket_listen = [ "0.0.0.0:443" ]
# interfaces to listen for outgoing proxy TCP XMPP connections on, should be localhost # interfaces to listen for outgoing proxy TCP XMPP connections on, should be localhost
outgoing_listen = [ "127.0.0.1:15270" ] outgoing_listen = [ "127.0.0.1:15270" ]