2021-04-27 02:02:02 -04:00
|
|
|
use crate::*;
|
|
|
|
|
|
|
|
async fn handle_outgoing_connection(stream: tokio::net::TcpStream, client_addr: SocketAddr, max_stanza_size_bytes: usize) -> Result<()> {
|
2021-06-08 00:14:22 -04:00
|
|
|
info!("out {} connected", client_addr);
|
2021-04-27 02:02:02 -04:00
|
|
|
|
|
|
|
let in_filter = StanzaFilter::new(max_stanza_size_bytes);
|
|
|
|
|
|
|
|
let (in_rd, mut in_wr) = tokio::io::split(stream);
|
|
|
|
|
|
|
|
// we naively read 1 byte at a time, which buffering significantly speeds up
|
|
|
|
//let in_rd = tokio::io::BufReader::with_capacity(IN_BUFFER_SIZE, in_rd);
|
|
|
|
|
|
|
|
// now read to figure out client vs server
|
|
|
|
let (stream_open, is_c2s, in_rd, mut in_filter) = stream_preamble(StanzaReader(in_rd), client_addr, in_filter).await?;
|
|
|
|
// pull raw reader back out of StanzaReader
|
|
|
|
let mut in_rd = in_rd.0;
|
|
|
|
|
|
|
|
// todo: unsure how legit changing to a string here is...
|
|
|
|
let domain = to_str(stream_open.extract_between(b" to='", b"'").or_else(|_| stream_open.extract_between(b" to=\"", b"\""))?);
|
|
|
|
|
2021-06-08 00:14:22 -04:00
|
|
|
info!("out {} is_c2s: {}, domain: {}", client_addr, is_c2s, domain);
|
2021-04-27 02:02:02 -04:00
|
|
|
|
2021-06-08 00:14:22 -04:00
|
|
|
trace!("out < {} {} '{}'", client_addr, c2s(is_c2s), to_str(&stream_open));
|
2021-04-27 02:02:02 -04:00
|
|
|
let (mut out_wr, mut out_rd, stream_open) = srv_connect(&domain, is_c2s, &stream_open, &mut in_filter).await?;
|
|
|
|
// send server response to client
|
|
|
|
in_wr.write_all(&stream_open).await?;
|
|
|
|
in_wr.flush().await?;
|
|
|
|
drop(stream_open);
|
|
|
|
|
|
|
|
let mut out_buf = [0u8; OUT_BUFFER_SIZE];
|
|
|
|
|
|
|
|
loop {
|
|
|
|
tokio::select! {
|
|
|
|
Ok(buf) = out_rd.next(&mut in_filter) => {
|
|
|
|
match buf {
|
|
|
|
None => break,
|
|
|
|
Some(buf) => {
|
2021-06-08 00:14:22 -04:00
|
|
|
trace!("out < {} {} '{}'", domain, c2s(is_c2s), to_str(buf));
|
2021-04-27 02:02:02 -04:00
|
|
|
in_wr.write_all(buf).await?;
|
|
|
|
in_wr.flush().await?;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
},
|
|
|
|
// we could filter outgoing from-client stanzas by size here too by doing same as above
|
|
|
|
// but instead, we'll just send whatever the client sends as it sends it...
|
|
|
|
Ok(n) = in_rd.read(&mut out_buf) => {
|
|
|
|
if n == 0 {
|
|
|
|
break;
|
|
|
|
}
|
2021-06-08 00:14:22 -04:00
|
|
|
trace!("out > {} {} '{}'", domain, c2s(is_c2s), to_str(&out_buf[0..n]));
|
2021-04-27 02:02:02 -04:00
|
|
|
out_wr.write_all(&out_buf[0..n]).await?;
|
|
|
|
out_wr.flush().await?;
|
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-06-08 00:14:22 -04:00
|
|
|
info!("out {} disconnected", client_addr);
|
2021-04-27 02:02:02 -04:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn spawn_outgoing_listener(local_addr: SocketAddr, max_stanza_size_bytes: usize) -> JoinHandle<Result<()>> {
|
|
|
|
tokio::spawn(async move {
|
|
|
|
let listener = TcpListener::bind(&local_addr).await.die("cannot listen on port/interface");
|
|
|
|
loop {
|
|
|
|
let (stream, client_addr) = listener.accept().await?;
|
|
|
|
tokio::spawn(async move {
|
|
|
|
if let Err(e) = handle_outgoing_connection(stream, client_addr, max_stanza_size_bytes).await {
|
2021-06-08 00:14:22 -04:00
|
|
|
error!("{} {}", client_addr, e);
|
2021-04-27 02:02:02 -04:00
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
#[allow(unreachable_code)]
|
|
|
|
Ok(())
|
|
|
|
})
|
|
|
|
}
|