From 90a5f1837cfa2b95a17cc11f0bc3a4513e1495b0 Mon Sep 17 00:00:00 2001 From: moparisthebest Date: Sat, 24 Jul 2021 01:53:00 -0400 Subject: [PATCH] much improved logging --- Cargo.lock | 5 ++- Cargo.toml | 6 ++- README.md | 3 +- src/lib.rs | 114 +++++++++++++++++++++++++++++++++++++++++++++++- src/main.rs | 40 +++++++++-------- src/outgoing.rs | 27 ++++++------ src/quic.rs | 12 ++--- src/srv.rs | 50 +++++++++++---------- src/tls.rs | 22 +++++----- xmpp-proxy.toml | 3 ++ 10 files changed, 205 insertions(+), 77 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2eacd07..628541a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -130,9 +130,9 @@ dependencies = [ [[package]] name = "env_logger" -version = "0.8.3" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17392a012ea30ef05a610aa97dfb49496e71c9f676b27879922ea5bdf60d9d3f" +checksum = "0b2cf0344971ee6c64c31be0d530793fba457d322dfec2810c453d0ef228f9c3" dependencies = [ "atty", "humantime", @@ -1163,6 +1163,7 @@ dependencies = [ "lazy_static", "log", "quinn", + "rand", "serde", "serde_derive", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 48110f6..f0e578a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,8 @@ tokio = { version = "1.4", features = ["net", "rt", "rt-multi-thread", "macros", # logging deps log = "0.4" -env_logger = { version = "0.8", optional = true } +rand = { version = "0.8", optional = true, features = [] } +env_logger = { version = "0.9", optional = true, features = [] } # incoming deps tokio-rustls = { version = "0.22", optional = true } @@ -45,7 +46,7 @@ trust-dns-resolver = { version = "0.20", optional = true } quinn = { version = "0.7", optional = true } [features] -default = ["incoming", "outgoing", "quic", "env_logger"] +default = ["incoming", "outgoing", "quic", "logging"] #default = ["incoming", "outgoing"] #default = ["incoming", "quic"] #default = ["outgoing", "quic"] @@ -55,3 +56,4 @@ default = ["incoming", "outgoing", "quic", "env_logger"] incoming = ["tokio-rustls"] outgoing = ["tokio-rustls", "trust-dns-resolver", "webpki-roots", "lazy_static"] quic = ["quinn"] +logging = ["rand", "env_logger"] diff --git a/README.md b/README.md index 3598926..f207483 100644 --- a/README.md +++ b/README.md @@ -140,5 +140,4 @@ Thanks [rxml](https://github.com/horazont/rxml) for afl-fuzz seeds #### todo 1. sasl external for s2s, initiating and receiving - 2. better debug log output - 3. websocket incoming and outgoing, maybe even for s2s + 2. websocket incoming and outgoing, maybe even for s2s diff --git a/src/lib.rs b/src/lib.rs index 765feab..4a8029d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,12 @@ mod stanzafilter; pub use stanzafilter::*; -pub use log::{debug, error, info, trace}; +mod slicesubsequence; +use slicesubsequence::*; + +use std::net::SocketAddr; + +pub use log::{debug, error, info, log_enabled, trace}; pub fn to_str(buf: &[u8]) -> std::borrow::Cow<'_, str> { String::from_utf8_lossy(buf) @@ -14,3 +19,110 @@ pub fn c2s(is_c2s: bool) -> &'static str { "s2s" } } + +#[derive(Clone)] +pub struct Context<'a> { + conn_id: String, + log_from: String, + log_to: String, + proto: &'a str, + is_c2s: Option, + to: Option, + to_addr: Option, + from: Option, + client_addr: SocketAddr, +} + +impl<'a> Context<'a> { + pub fn new(proto: &'static str, client_addr: SocketAddr) -> Context { + let (log_to, log_from, conn_id) = if log_enabled!(log::Level::Info) { + #[cfg(feature = "env_logger")] + let conn_id = { + use rand::distributions::Alphanumeric; + use rand::{thread_rng, Rng}; + thread_rng().sample_iter(&Alphanumeric).take(10).map(char::from).collect() + }; + #[cfg(not(feature = "env_logger"))] + let conn_id = "".to_string(); + ( + format!("{}: ({} <- ({}-unk)):", conn_id, client_addr, proto), + format!("{}: ({} -> ({}-unk)):", conn_id, client_addr, proto), + conn_id, + ) + } else { + ("".to_string(), "".to_string(), "".to_string()) + }; + + Context { + conn_id, + log_from, + log_to, + proto, + client_addr, + is_c2s: None, + to: None, + to_addr: None, + from: None, + } + } + + fn re_calc(&mut self) { + // todo: make this good + self.log_from = format!( + "{}: ({} ({}) -> ({}-{}) -> {} ({})):", + self.conn_id, + self.client_addr, + if self.from.is_some() { self.from.as_ref().unwrap() } else { "unk" }, + self.proto, + if self.is_c2s.is_some() { c2s(self.is_c2s.unwrap()) } else { "unk" }, + if self.to_addr.is_some() { self.to_addr.as_ref().unwrap().to_string() } else { "unk".to_string() }, + if self.to.is_some() { self.to.as_ref().unwrap() } else { "unk" }, + ); + self.log_to = self.log_from.replace(" -> ", " <- "); + } + + pub fn log_from(&self) -> &str { + &self.log_from + } + + pub fn log_to(&self) -> &str { + &self.log_to + } + + pub fn client_addr(&self) -> &SocketAddr { + &self.client_addr + } + + pub fn set_proto(&mut self, proto: &'static str) { + if log_enabled!(log::Level::Info) { + self.proto = proto; + self.to_addr = None; + self.re_calc(); + } + } + + pub fn set_c2s_stream_open(&mut self, is_c2s: bool, stream_open: &[u8]) { + if log_enabled!(log::Level::Info) { + self.is_c2s = Some(is_c2s); + self.from = stream_open + .extract_between(b" from='", b"'") + .or_else(|_| stream_open.extract_between(b" from=\"", b"\"")) + .map(|b| to_str(b).to_string()) + .ok(); + self.to = stream_open + .extract_between(b" to='", b"'") + .or_else(|_| stream_open.extract_between(b" to=\"", b"\"")) + .map(|b| to_str(b).to_string()) + .ok(); + self.re_calc(); + info!("{} stream data set", &self.log_from()); + } + } + + pub fn set_to_addr(&mut self, to_addr: SocketAddr) { + if log_enabled!(log::Level::Info) { + self.to_addr = Some(to_addr); + self.re_calc(); + } + } +} diff --git a/src/main.rs b/src/main.rs index 6dc4dde..a83caaf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -61,8 +61,8 @@ struct Config { quic_listen: Option>, outgoing_listen: Option>, max_stanza_size_bytes: usize, - s2s_target: String, - c2s_target: String, + s2s_target: SocketAddr, + c2s_target: SocketAddr, proxy: bool, #[cfg(feature = "env_logger")] log_level: Option, @@ -73,8 +73,8 @@ struct Config { #[derive(Clone)] pub struct CloneableConfig { max_stanza_size_bytes: usize, - s2s_target: String, - c2s_target: String, + s2s_target: SocketAddr, + c2s_target: SocketAddr, proxy: bool, } @@ -111,7 +111,7 @@ impl Config { } } -async fn shuffle_rd_wr(in_rd: R, in_wr: W, config: CloneableConfig, local_addr: SocketAddr, client_addr: SocketAddr) -> Result<()> { +async fn shuffle_rd_wr(in_rd: R, in_wr: W, config: CloneableConfig, local_addr: SocketAddr, client_addr: &mut Context<'_>) -> Result<()> { let filter = StanzaFilter::new(config.max_stanza_size_bytes); shuffle_rd_wr_filter(in_rd, in_wr, config, local_addr, client_addr, filter).await } @@ -121,18 +121,18 @@ async fn shuffle_rd_wr_filter( mut in_wr: W, config: CloneableConfig, local_addr: SocketAddr, - client_addr: SocketAddr, + client_addr: &mut Context<'_>, in_filter: StanzaFilter, ) -> Result<()> { // we naively read 1 byte at a time, which buffering significantly speeds up let in_rd = tokio::io::BufReader::with_capacity(IN_BUFFER_SIZE, in_rd); // now read to figure out client vs server - let (stream_open, is_c2s, mut in_rd, mut in_filter) = stream_preamble(StanzaReader(in_rd), client_addr, in_filter).await?; + let (stream_open, is_c2s, mut in_rd, mut in_filter) = stream_preamble(StanzaReader(in_rd), &client_addr, in_filter).await?; let target = if is_c2s { config.c2s_target } else { config.s2s_target }; - - info!("{} is_c2s: {}, target: {}", client_addr, is_c2s, target); + client_addr.set_to_addr(target); + client_addr.set_c2s_stream_open(is_c2s, &stream_open); let out_stream = tokio::net::TcpStream::connect(target).await?; let (mut out_rd, mut out_wr) = tokio::io::split(out_stream); @@ -149,17 +149,17 @@ async fn shuffle_rd_wr_filter( write!( &mut in_filter.buf[0..], "PROXY TCP{} {} {} {} {}\r\n", - if client_addr.is_ipv4() { '4' } else { '6' }, - client_addr.ip(), + if client_addr.client_addr().is_ipv4() { '4' } else { '6' }, + client_addr.client_addr().ip(), local_addr.ip(), - client_addr.port(), + client_addr.client_addr().port(), local_addr.port() )?; let end_idx = &(&in_filter.buf[0..]).first_index_of(b"\n")? + 1; - trace!("< {} {} '{}'", client_addr, c2s(is_c2s), to_str(&in_filter.buf[0..end_idx])); + trace!("{} '{}'", client_addr.log_from(), to_str(&in_filter.buf[0..end_idx])); out_wr.write_all(&in_filter.buf[0..end_idx]).await?; } - trace!("< {} {} '{}'", client_addr, c2s(is_c2s), to_str(&stream_open)); + trace!("{} '{}'", client_addr.log_from(), to_str(&stream_open)); out_wr.write_all(&stream_open).await?; out_wr.flush().await?; drop(stream_open); @@ -172,7 +172,7 @@ async fn shuffle_rd_wr_filter( match buf { None => break, Some(buf) => { - trace!("< {} {} '{}'", client_addr, c2s(is_c2s), to_str(buf)); + trace!("{} '{}'", client_addr.log_from(), to_str(buf)); out_wr.write_all(buf).await?; out_wr.flush().await?; } @@ -184,21 +184,21 @@ async fn shuffle_rd_wr_filter( if n == 0 { break; } - trace!("> {} {} '{}'", client_addr, c2s(is_c2s), to_str(&out_buf[0..n])); + trace!("{} '{}'", client_addr.log_to(), to_str(&out_buf[0..n])); in_wr.write_all(&out_buf[0..n]).await?; in_wr.flush().await?; }, } } - info!("{} disconnected", client_addr); + info!("{} disconnected", client_addr.log_from()); Ok(()) } -async fn stream_preamble(mut in_rd: StanzaReader, client_addr: SocketAddr, mut in_filter: StanzaFilter) -> Result<(Vec, bool, StanzaReader, StanzaFilter)> { +async fn stream_preamble(mut in_rd: StanzaReader, client_addr: &Context<'_>, mut in_filter: StanzaFilter) -> Result<(Vec, bool, StanzaReader, StanzaFilter)> { let mut stream_open = Vec::new(); while let Ok(Some(buf)) = in_rd.next(&mut in_filter).await { - trace!("received pre- stanza: {} '{}'", client_addr, to_str(&buf)); + trace!("{} received pre- stanza: '{}'", client_addr.log_from(), to_str(&buf)); if buf.starts_with(b" Result<()> { - info!("out {} connected", client_addr); +async fn handle_outgoing_connection(stream: tokio::net::TcpStream, client_addr: &mut Context<'_>, max_stanza_size_bytes: usize) -> Result<()> { + info!("{} connected", client_addr.log_from()); let in_filter = StanzaFilter::new(max_stanza_size_bytes); @@ -11,17 +11,15 @@ async fn handle_outgoing_connection(stream: tokio::net::TcpStream, client_addr: //let in_rd = tokio::io::BufReader::with_capacity(IN_BUFFER_SIZE, in_rd); // now read to figure out client vs server - let (stream_open, is_c2s, in_rd, mut in_filter) = stream_preamble(StanzaReader(in_rd), client_addr, in_filter).await?; + let (stream_open, is_c2s, in_rd, mut in_filter) = stream_preamble(StanzaReader(in_rd), &client_addr, in_filter).await?; + client_addr.set_c2s_stream_open(is_c2s, &stream_open); // pull raw reader back out of StanzaReader let mut in_rd = in_rd.0; - // todo: unsure how legit changing to a string here is... - let domain = to_str(stream_open.extract_between(b" to='", b"'").or_else(|_| stream_open.extract_between(b" to=\"", b"\""))?); + // we require a valid to= here or we fail + let to = std::str::from_utf8(stream_open.extract_between(b" to='", b"'").or_else(|_| stream_open.extract_between(b" to=\"", b"\""))?)?; - info!("out {} is_c2s: {}, domain: {}", client_addr, is_c2s, domain); - - trace!("out < {} {} '{}'", client_addr, c2s(is_c2s), to_str(&stream_open)); - let (mut out_wr, mut out_rd, stream_open) = srv_connect(&domain, is_c2s, &stream_open, &mut in_filter).await?; + let (mut out_wr, mut out_rd, stream_open) = srv_connect(&to, is_c2s, &stream_open, &mut in_filter, client_addr).await?; // send server response to client in_wr.write_all(&stream_open).await?; in_wr.flush().await?; @@ -35,7 +33,7 @@ async fn handle_outgoing_connection(stream: tokio::net::TcpStream, client_addr: match buf { None => break, Some(buf) => { - trace!("out < {} {} '{}'", domain, c2s(is_c2s), to_str(buf)); + trace!("{} '{}'", client_addr.log_to(), to_str(buf)); in_wr.write_all(buf).await?; in_wr.flush().await?; } @@ -47,14 +45,14 @@ async fn handle_outgoing_connection(stream: tokio::net::TcpStream, client_addr: if n == 0 { break; } - trace!("out > {} {} '{}'", domain, c2s(is_c2s), to_str(&out_buf[0..n])); + trace!("{} '{}'", client_addr.log_from(), to_str(&out_buf[0..n])); out_wr.write_all(&out_buf[0..n]).await?; out_wr.flush().await?; }, } } - info!("out {} disconnected", client_addr); + info!("{} disconnected", client_addr.log_from()); Ok(()) } @@ -64,8 +62,9 @@ pub fn spawn_outgoing_listener(local_addr: SocketAddr, max_stanza_size_bytes: us loop { let (stream, client_addr) = listener.accept().await?; tokio::spawn(async move { - if let Err(e) = handle_outgoing_connection(stream, client_addr, max_stanza_size_bytes).await { - error!("{} {}", client_addr, e); + let mut client_addr = Context::new("unk-out", client_addr); + if let Err(e) = handle_outgoing_connection(stream, &mut client_addr, max_stanza_size_bytes).await { + error!("{} {}", client_addr.log_from(), e); } }); } diff --git a/src/quic.rs b/src/quic.rs index 40c9c82..72312c6 100644 --- a/src/quic.rs +++ b/src/quic.rs @@ -78,21 +78,23 @@ pub fn spawn_quic_listener(local_addr: SocketAddr, config: CloneableConfig, serv let config = config.clone(); tokio::spawn(async move { if let Ok(mut new_conn) = incoming_conn.await { - let client_addr = new_conn.connection.remote_address(); - info!("{} quic connected", client_addr); + let client_addr = crate::Context::new("quic-in", new_conn.connection.remote_address()); + info!("{} connected new connection", client_addr.log_from()); while let Some(Ok((wrt, rd))) = new_conn.bi_streams.next().await { let config = config.clone(); + let mut client_addr = client_addr.clone(); + info!("{} connected new stream", client_addr.log_from()); tokio::spawn(async move { - if let Err(e) = shuffle_rd_wr(rd, wrt, config, local_addr, client_addr).await { - error!("{} {}", client_addr, e); + if let Err(e) = shuffle_rd_wr(rd, wrt, config, local_addr, &mut client_addr).await { + error!("{} {}", client_addr.log_from(), e); } }); } } }); } - info!("quic listener shutting down, should never happen????"); + error!("quic listener shutting down, should never happen????"); #[allow(unreachable_code)] Ok(()) }) diff --git a/src/srv.rs b/src/srv.rs index 947a638..e3641e3 100644 --- a/src/srv.rs +++ b/src/srv.rs @@ -46,29 +46,30 @@ impl XmppConnection { is_c2s: bool, stream_open: &[u8], mut in_filter: &mut crate::StanzaFilter, - ) -> Result<(Box, Box)> { + client_addr: &mut Context<'_>, + ) -> Result<(Box, Box, SocketAddr, &'static str)> { + debug!("{} attempting connection to SRV: {:?}", client_addr.log_from(), self); // todo: need to set options to Ipv4AndIpv6 let ips = RESOLVER.lookup_ip(self.target.clone()).await?; - debug!("trying 1 domain {}, SRV: {:?}", domain, self); for ip in ips.iter() { - debug!("trying domain {}, ip {}, is_c2s: {}, SRV: {:?}", domain, ip, is_c2s, self); + let to_addr = SocketAddr::new(ip, self.port); + debug!("{} trying ip {}", client_addr.log_from(), to_addr); match self.conn_type { - XmppConnectionType::StartTLS => match crate::starttls_connect(SocketAddr::new(ip, self.port), domain, is_c2s, &stream_open, &mut in_filter).await { - Ok((wr, rd)) => return Ok((wr, rd)), - Err(e) => error!("starttls connection failed to IP {} from SRV {}, error: {}", ip, self.target, e), + XmppConnectionType::StartTLS => match crate::starttls_connect(to_addr, domain, is_c2s, &stream_open, &mut in_filter).await { + Ok((wr, rd)) => return Ok((wr, rd, to_addr, "starttls-out")), + Err(e) => error!("starttls connection failed to IP {} from SRV {}, error: {}", to_addr, self.target, e), }, - XmppConnectionType::DirectTLS => match crate::tls_connect(SocketAddr::new(ip, self.port), domain, is_c2s).await { - Ok((wr, rd)) => return Ok((wr, rd)), - Err(e) => error!("direct tls connection failed to IP {} from SRV {}, error: {}", ip, self.target, e), + XmppConnectionType::DirectTLS => match crate::tls_connect(to_addr, domain, is_c2s).await { + Ok((wr, rd)) => return Ok((wr, rd, to_addr, "directtls-out")), + Err(e) => error!("direct tls connection failed to IP {} from SRV {}, error: {}", to_addr, self.target, e), }, #[cfg(feature = "quic")] - XmppConnectionType::QUIC => match crate::quic_connect(SocketAddr::new(ip, self.port), domain, is_c2s).await { - Ok((wr, rd)) => return Ok((wr, rd)), - Err(e) => error!("quic connection failed to IP {} from SRV {}, error: {}", ip, self.target, e), + XmppConnectionType::QUIC => match crate::quic_connect(to_addr, domain, is_c2s).await { + Ok((wr, rd)) => return Ok((wr, rd, to_addr, "quic-out")), + Err(e) => error!("quic connection failed to IP {} from SRV {}, error: {}", to_addr, self.target, e), }, } } - debug!("trying 2 domain {}, SRV: {:?}", domain, self); bail!("cannot connect to any IPs for SRV: {}", self.target) } } @@ -160,19 +161,23 @@ pub async fn srv_connect( is_c2s: bool, stream_open: &[u8], mut in_filter: &mut crate::StanzaFilter, + client_addr: &mut Context<'_>, ) -> Result<(Box, StanzaReader>>, Vec)> { for srv in get_xmpp_connections(&domain, is_c2s).await? { - debug!("main srv: {:?}", srv); - let connect = srv.connect(&domain, is_c2s, &stream_open, &mut in_filter).await; + let connect = srv.connect(&domain, is_c2s, &stream_open, &mut in_filter, client_addr).await; if connect.is_err() { continue; } - let (mut out_wr, out_rd) = connect.unwrap(); - debug!("main srv out: {:?}", srv); + let (mut out_wr, out_rd, to_addr, proto) = connect.unwrap(); + // if any of these ? returns early with an Err, these will stay set, I think that's ok though, the connection will be closed + client_addr.set_proto(proto); + client_addr.set_to_addr(to_addr); + debug!("{} connected", client_addr.log_from()); // we naively read 1 byte at a time, which buffering significantly speeds up let mut out_rd = StanzaReader(tokio::io::BufReader::with_capacity(crate::IN_BUFFER_SIZE, out_rd)); + trace!("{} '{}'", client_addr.log_from(), to_str(&stream_open)); out_wr.write_all(&stream_open).await?; out_wr.flush().await?; @@ -180,7 +185,7 @@ pub async fn srv_connect( // let's read to first Result<()> { - info!("{} connected", client_addr); +async fn handle_tls_connection(mut stream: tokio::net::TcpStream, client_addr: &mut Context<'_>, local_addr: SocketAddr, config: CloneableConfig, acceptor: TlsAcceptor) -> Result<()> { + info!("{} connected", client_addr.log_from()); let mut in_filter = StanzaFilter::new(config.max_stanza_size_bytes); @@ -141,7 +142,8 @@ async fn handle_tls_connection(mut stream: tokio::net::TcpStream, client_addr: S p[0] == 0x16 && p[1] == 0x03 && p[2] <= 0x03 }; - info!("{} direct_tls: {}", client_addr, direct_tls); + client_addr.set_proto(if direct_tls { "directtls-in" } else { "starttls-in" }); + info!("{} direct_tls sniffed", client_addr.log_from()); // starttls if !direct_tls { @@ -153,9 +155,9 @@ async fn handle_tls_connection(mut stream: tokio::net::TcpStream, client_addr: S let mut in_rd = StanzaReader(in_rd); while let Ok(Some(buf)) = in_rd.next(&mut in_filter).await { - trace!("received pre-tls stanza: {} '{}'", client_addr, to_str(&buf)); + trace!("{} received pre-tls stanza: '{}'", client_addr.log_from(), to_str(&buf)); if buf.starts_with(b" {} '{}'", client_addr, to_str(&buf)); + trace!("{} '{}'", client_addr.log_to(), to_str(&buf)); in_wr.write_all(&buf).await?; in_wr.flush().await?; } else if buf.starts_with(b" {} '{}'", client_addr, to_str(&buf)); + trace!("{} '{}'", client_addr.log_to(), to_str(&buf)); in_wr.write_all(&buf).await?; // ejabberd never sends with the first, only the second? //let buf = br###""###; let buf = br###""###; - trace!("> {} '{}'", client_addr, to_str(buf)); + trace!("{} '{}'", client_addr.log_to(), to_str(buf)); in_wr.write_all(buf).await?; in_wr.flush().await?; } else if buf.starts_with(b""###; - trace!("> {} '{}'", client_addr, to_str(buf)); + trace!("{} '{}'", client_addr.log_to(), to_str(buf)); in_wr.write_all(buf).await?; in_wr.flush().await?; proceed_sent = true; diff --git a/xmpp-proxy.toml b/xmpp-proxy.toml index 8e03a1c..a12a379 100644 --- a/xmpp-proxy.toml +++ b/xmpp-proxy.toml @@ -35,5 +35,8 @@ tls_cert = "/etc/xmpp-proxy/fullchain.cer" # can also set env variables XMPP_PROXY_LOG_LEVEL and/or XMPP_PROXY_LOG_STYLE, but values in this file override them # many options, trace is XML-console-level, refer to: https://docs.rs/env_logger/0.8.3/env_logger/#enabling-logging #log_level = "info" +# for development/debugging: +#log_level = "info,xmpp_proxy=trace" + # one of auto, always, never, refer to: https://docs.rs/env_logger/0.8.3/env_logger/#disabling-colors #log_style = "never" \ No newline at end of file