2021-04-27 02:02:02 -04:00
use crate ::* ;
use futures ::StreamExt ;
2022-01-19 01:09:32 -05:00
use quinn ::{ ServerConfig , TransportConfig } ;
use rustls ::client ::ClientConfig ;
2021-04-27 02:02:02 -04:00
use std ::{ net ::SocketAddr , sync ::Arc } ;
use tokio ::io ::{ AsyncRead , AsyncWrite , ReadBuf } ;
use anyhow ::Result ;
2022-02-10 01:18:35 -05:00
pub async fn quic_connect ( target : SocketAddr , server_name : & str , is_c2s : bool ) -> Result < ( StanzaWrite , StanzaRead ) > {
2021-04-27 02:02:02 -04:00
let bind_addr = " 0.0.0.0:0 " . parse ( ) . unwrap ( ) ;
2022-01-19 01:09:32 -05:00
let mut client_cfg = ClientConfig ::builder ( ) . with_safe_defaults ( ) . with_root_certificates ( root_cert_store ( ) ) . with_no_client_auth ( ) ; // todo: for s2s do client auth
client_cfg . alpn_protocols . push ( if is_c2s { ALPN_XMPP_CLIENT } else { ALPN_XMPP_SERVER } . to_vec ( ) ) ;
let mut endpoint = quinn ::Endpoint ::client ( bind_addr ) ? ;
endpoint . set_default_client_config ( quinn ::ClientConfig ::new ( Arc ::new ( client_cfg ) ) ) ;
2021-04-27 02:02:02 -04:00
// connect to server
2022-01-19 01:09:32 -05:00
let quinn ::NewConnection { connection , .. } = endpoint . connect ( target , server_name ) ? . await ? ;
2021-06-08 00:14:22 -04:00
trace! ( " quic connected: addr={} " , connection . remote_address ( ) ) ;
2021-04-27 02:02:02 -04:00
2021-05-15 00:32:36 -04:00
let ( wrt , rd ) = connection . open_bi ( ) . await ? ;
2022-02-10 01:18:35 -05:00
Ok ( ( StanzaWrite ::AsyncWrite ( Box ::new ( wrt ) ) , StanzaRead ::new ( Box ::new ( rd ) ) ) )
2021-04-27 02:02:02 -04:00
}
impl Config {
pub fn quic_server_config ( & self ) -> Result < ServerConfig > {
let transport_config = TransportConfig ::default ( ) ;
// todo: configure transport_config here if needed
2022-01-19 01:09:32 -05:00
let mut server_config = self . server_config ( ) ? ;
// todo: will connecting without alpn work then?
server_config . alpn_protocols . push ( ALPN_XMPP_CLIENT . to_vec ( ) ) ;
server_config . alpn_protocols . push ( ALPN_XMPP_SERVER . to_vec ( ) ) ;
let mut server_config = quinn ::ServerConfig ::with_crypto ( Arc ::new ( server_config ) ) ;
2021-04-27 02:02:02 -04:00
server_config . transport = Arc ::new ( transport_config ) ;
2022-01-19 01:09:32 -05:00
Ok ( server_config )
2021-04-27 02:02:02 -04:00
}
}
/// Stateless I/O placeholder: its `AsyncWrite` impl reports every write as fully accepted
/// without storing the bytes, and its `AsyncRead` impl never yields data.
struct NoopIo ;
use core ::pin ::Pin ;
use core ::task ::{ Context , Poll } ;
// todo: could change this to return Error and kill the stream instead, after all, s2s *should* not be receiving any bytes back
impl AsyncWrite for NoopIo {
    /// Claims the whole of `buf` was written; the bytes are simply dropped.
    fn poll_write(self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
        let discarded = buf.len();
        Poll::Ready(Ok(discarded))
    }

    /// Nothing is ever buffered, so flushing completes immediately.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let done: io::Result<()> = Ok(());
        Poll::Ready(done)
    }

    /// There is no underlying resource to close, so shutdown completes immediately.
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let done: io::Result<()> = Ok(());
        Poll::Ready(done)
    }
}
impl AsyncRead for NoopIo {
    /// Never produces data: every poll reports `Pending` and the destination buffer is untouched.
    /// No waker is registered, so this read is never woken up by `NoopIo` itself.
    fn poll_read(self: Pin<&mut Self>, _ctx: &mut Context<'_>, _dst: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
        Poll::Pending
    }
}
pub fn spawn_quic_listener ( local_addr : SocketAddr , config : CloneableConfig , server_config : ServerConfig ) -> JoinHandle < Result < ( ) > > {
2022-01-19 01:09:32 -05:00
let ( _endpoint , mut incoming ) = quinn ::Endpoint ::server ( server_config , local_addr ) . die ( " cannot listen on port/interface " ) ;
2021-04-27 02:02:02 -04:00
tokio ::spawn ( async move {
2021-05-23 22:27:12 -04:00
// when could this return None, do we quit?
while let Some ( incoming_conn ) = incoming . next ( ) . await {
let config = config . clone ( ) ;
tokio ::spawn ( async move {
if let Ok ( mut new_conn ) = incoming_conn . await {
2021-07-24 01:53:00 -04:00
let client_addr = crate ::Context ::new ( " quic-in " , new_conn . connection . remote_address ( ) ) ;
info! ( " {} connected new connection " , client_addr . log_from ( ) ) ;
2021-04-27 02:02:02 -04:00
2021-05-23 22:27:12 -04:00
while let Some ( Ok ( ( wrt , rd ) ) ) = new_conn . bi_streams . next ( ) . await {
let config = config . clone ( ) ;
2021-07-24 01:53:00 -04:00
let mut client_addr = client_addr . clone ( ) ;
info! ( " {} connected new stream " , client_addr . log_from ( ) ) ;
2021-05-23 22:27:12 -04:00
tokio ::spawn ( async move {
2022-02-10 01:18:35 -05:00
if let Err ( e ) = shuffle_rd_wr ( StanzaRead ::new ( Box ::new ( rd ) ) , StanzaWrite ::new ( Box ::new ( wrt ) ) , config , local_addr , & mut client_addr ) . await {
2021-07-24 01:53:00 -04:00
error! ( " {} {} " , client_addr . log_from ( ) , e ) ;
2021-05-23 22:27:12 -04:00
}
} ) ;
2021-05-15 00:32:36 -04:00
}
2021-05-23 22:27:12 -04:00
}
} ) ;
}
2021-07-24 01:53:00 -04:00
error! ( " quic listener shutting down, should never happen???? " ) ;
2021-04-27 02:02:02 -04:00
#[ allow(unreachable_code) ]
Ok ( ( ) )
} )
}