Major re-factor, add XMPP-over-QUIC and outgoing forward support
moparisthebest/xmpp-proxy/pipeline/head This commit looks good Details

This commit is contained in:
Travis Burtrum 2021-04-27 02:02:02 -04:00
parent 6a41f503d6
commit 377fbe0b73
10 changed files with 1343 additions and 159 deletions

572
Cargo.lock generated
View File

@ -6,6 +6,17 @@ version = "1.0.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28b2cd92db5cbd74e8e5028f7e27dd7aa3090e89e4f2a197cc7c8dfb69c7063b"
[[package]]
name = "async-trait"
version = "0.1.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b98e84bbb4cbcdd97da190ba0c58a1bb0de2c1fdf67d159e192ed766aeca722"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "autocfg"
version = "1.0.1"
@ -18,6 +29,12 @@ version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]]
name = "bitflags"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]]
name = "bumpalo"
version = "3.6.1"
@ -42,12 +59,65 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "core-foundation"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a89e2ae426ea83155dccf10c0fa6b1463ef6d5fcb44cee0b224a408fa640a62"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "core-foundation-sys"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea221b5284a47e40033bf9b66f35f984ec0ea2931eb03505246cd27a963f981b"
[[package]]
name = "ct-logs"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1a816186fa68d9e426e3cb4ae4dff1fcd8e4a2c34b781bf7a822574a0d0aac8"
dependencies = [
"sct",
]
[[package]]
name = "data-encoding"
version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ee2393c4a91429dffb4bedf19f4d6abf27d8a732c8ce4980305d782e5426d57"
[[package]]
name = "die"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8634d5e6139f7364a4e99bd718b2f511f2f25863146360e70909bc45a016290"
[[package]]
name = "enum-as-inner"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c5f0096a91d210159eceb2ff5e1c4da18388a170e1e3ce948aac9c8fdbbf595"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "form_urlencoded"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fc25a87fa4fd2094bffb06925852034d90a17f0d1e05197d4956d3555752191"
dependencies = [
"matches",
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.3.14"
@ -140,6 +210,26 @@ dependencies = [
"slab",
]
[[package]]
name = "getrandom"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9495705279e7140bf035dde1f6e750c162df8b625267cd52cc44e0b156732c8"
dependencies = [
"cfg-if",
"libc",
"wasi",
]
[[package]]
name = "heck"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87cbf45460356b7deeb5e3415b5563308c0a9b057c85e12b06ad551f98d0a6ac"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "hermit-abi"
version = "0.1.18"
@ -149,6 +239,55 @@ dependencies = [
"libc",
]
[[package]]
name = "hostname"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867"
dependencies = [
"libc",
"match_cfg",
"winapi",
]
[[package]]
name = "idna"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8"
dependencies = [
"matches",
"unicode-bidi",
"unicode-normalization",
]
[[package]]
name = "instant"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec"
dependencies = [
"cfg-if",
]
[[package]]
name = "ipconfig"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7e2f18aece9709094573a9f24f483c4f65caa4298e2f7ae1b71cc65d853fad7"
dependencies = [
"socket2",
"widestring",
"winapi",
"winreg",
]
[[package]]
name = "ipnet"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47be2f14c678be2fdcab04ab1171db51b2762ce6f0a8ee87c8dd4a04ed216135"
[[package]]
name = "js-sys"
version = "0.3.50"
@ -170,6 +309,21 @@ version = "0.2.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9385f66bf6105b241aa65a61cb923ef20efc665cb9f9bb50ac2f0c4b7f378d41"
[[package]]
name = "linked-hash-map"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3"
[[package]]
name = "lock_api"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a3c91c24eae6777794bb1997ad98bbb87daf92890acab859f7eaa4320333176"
dependencies = [
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.14"
@ -179,6 +333,27 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "lru-cache"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c"
dependencies = [
"linked-hash-map",
]
[[package]]
name = "match_cfg"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
[[package]]
name = "matches"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
[[package]]
name = "memchr"
version = "2.3.4"
@ -232,6 +407,43 @@ version = "1.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af8b08b04175473088b46763e51ee54da5f9a164bc162f615b91bc179dbf15a3"
[[package]]
name = "openssl-probe"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de"
[[package]]
name = "parking_lot"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb"
dependencies = [
"instant",
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018"
dependencies = [
"cfg-if",
"instant",
"libc",
"redox_syscall",
"smallvec",
"winapi",
]
[[package]]
name = "percent-encoding"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "pin-project-lite"
version = "0.2.6"
@ -244,6 +456,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "ppv-lite86"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
@ -265,6 +483,51 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "quick-error"
version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quinn"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c82c0a393b300104f989f3db8b8637c0d11f7a32a9c214560b47849ba8f119aa"
dependencies = [
"bytes",
"futures",
"lazy_static",
"libc",
"mio",
"quinn-proto",
"rustls",
"socket2",
"thiserror",
"tokio",
"tracing",
"webpki",
]
[[package]]
name = "quinn-proto"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09027365a21874b71e1fbd9d31cb99bff8e11ba81cc9ef2b9425bb607e42d3b2"
dependencies = [
"bytes",
"ct-logs",
"rand",
"ring",
"rustls",
"rustls-native-certs",
"slab",
"thiserror",
"tinyvec",
"tracing",
"webpki",
]
[[package]]
name = "quote"
version = "1.0.9"
@ -274,6 +537,65 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ef9e7e66b4468674bfcb0c81af8b7fa0bb154fa9f28eb840da5c447baeb8d7e"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
"rand_hc",
]
[[package]]
name = "rand_chacha"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e12735cf05c9e10bf21534da50a147b924d555dc7a547c42e6bb2d5b6017ae0d"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34cf66eb183df1c5876e2dcf6b13d57340741e8dc255b48e40a26de954d06ae7"
dependencies = [
"getrandom",
]
[[package]]
name = "rand_hc"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3190ef7066a446f2e7f42e239d161e905420ccab01eb967c9eb27d21b2322a73"
dependencies = [
"rand_core",
]
[[package]]
name = "redox_syscall"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8270314b5ccceb518e7e578952f0b72b88222d02e8f77f5ecf7abbb673539041"
dependencies = [
"bitflags",
]
[[package]]
name = "resolv-conf"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52e44394d2086d010551b14b53b1f24e31647570cd1deb0379e2c21b329aba00"
dependencies = [
"hostname",
"quick-error",
]
[[package]]
name = "ring"
version = "0.16.20"
@ -302,6 +624,34 @@ dependencies = [
"webpki",
]
[[package]]
name = "rustls-native-certs"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a07b7c1885bd8ed3831c289b7870b13ef46fe0e856d288c30d9cc17d75a2092"
dependencies = [
"openssl-probe",
"rustls",
"schannel",
"security-framework",
]
[[package]]
name = "schannel"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75"
dependencies = [
"lazy_static",
"winapi",
]
[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "sct"
version = "0.6.1"
@ -312,6 +662,29 @@ dependencies = [
"untrusted",
]
[[package]]
name = "security-framework"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3670b1d2fdf6084d192bc71ead7aabe6c06aa2ea3fbd9cc3ac111fa5c2b1bd84"
dependencies = [
"bitflags",
"core-foundation",
"core-foundation-sys",
"libc",
"security-framework-sys",
]
[[package]]
name = "security-framework-sys"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3676258fd3cfe2c9a0ec99ce3038798d847ce3e4bb17746373eb9f0f1ac16339"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "serde"
version = "1.0.125"
@ -338,6 +711,23 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
[[package]]
name = "smallvec"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
[[package]]
name = "socket2"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e"
dependencies = [
"cfg-if",
"libc",
"winapi",
]
[[package]]
name = "spin"
version = "0.5.2"
@ -355,6 +745,41 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "thiserror"
version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0f4a65597094d4483ddaed134f409b2cb7c1beccf25201a9f73c719254fa98e"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7765189610d8241a44529806d6fd1f2e0a08734313a35d5b3a556f92b381f3c0"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tinyvec"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b5220f05bb7de7f3f53c7c065e1199b3172696fe2db9f9c4d8ad9b4ee74c342"
dependencies = [
"tinyvec_macros",
]
[[package]]
name = "tinyvec_macros"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.5.0"
@ -402,6 +827,107 @@ dependencies = [
"serde",
]
[[package]]
name = "tracing"
version = "0.1.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01ebdc2bb4498ab1ab5f5b73c5803825e60199229ccba0698170e3be0e7f959f"
dependencies = [
"cfg-if",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c42e6fa53307c8a17e4ccd4dc81cf5ec38db9209f59b222210375b54ee40d1e2"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tracing-core"
version = "0.1.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f50de3927f93d202783f4513cda820ab47ef17f624b03c096e86ef00c67e6b5f"
dependencies = [
"lazy_static",
]
[[package]]
name = "trust-dns-proto"
version = "0.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "952a078337565ba39007de99b151770f41039253a31846f0a3d5cd5a4ac8eedf"
dependencies = [
"async-trait",
"cfg-if",
"data-encoding",
"enum-as-inner",
"futures-channel",
"futures-io",
"futures-util",
"idna",
"ipnet",
"lazy_static",
"log",
"rand",
"smallvec",
"thiserror",
"tinyvec",
"tokio",
"url",
]
[[package]]
name = "trust-dns-resolver"
version = "0.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da9c97f7d103e0f94dbe384a57908833505ae5870126492f166821b7cf685589"
dependencies = [
"cfg-if",
"futures-util",
"ipconfig",
"lazy_static",
"log",
"lru-cache",
"parking_lot",
"resolv-conf",
"smallvec",
"thiserror",
"tokio",
"trust-dns-proto",
]
[[package]]
name = "unicode-bidi"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eeb8be209bb1c96b7c177c7420d26e04eccacb0eeae6b980e35fcb74678107e0"
dependencies = [
"matches",
]
[[package]]
name = "unicode-normalization"
version = "0.1.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07fbfce1c8a97d547e8b5334978438d9d6ec8c20e38f56d4a4374d181493eaef"
dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-segmentation"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb0d2e7be6ae3a5fa87eed5fb451aff96f2573d2694942e40543ae0bbe19c796"
[[package]]
name = "unicode-xid"
version = "0.2.1"
@ -414,6 +940,24 @@ version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "url"
version = "2.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ccd964113622c8e9322cfac19eb1004a07e636c545f325da085d5cdde6f1f8b"
dependencies = [
"form_urlencoded",
"idna",
"matches",
"percent-encoding",
]
[[package]]
name = "wasi"
version = "0.10.2+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
[[package]]
name = "wasm-bindgen"
version = "0.2.73"
@ -488,6 +1032,21 @@ dependencies = [
"untrusted",
]
[[package]]
name = "webpki-roots"
version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aabe153544e473b775453675851ecc86863d2a81d786d741f6b76778f2a48940"
dependencies = [
"webpki",
]
[[package]]
name = "widestring"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c168940144dd21fd8046987c16a46a33d5fc84eec29ef9dcddc2ac9e31526b7c"
[[package]]
name = "winapi"
version = "0.3.9"
@ -510,6 +1069,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "winreg"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2986deb581c4fe11b621998a5e53361efe6b48a151178d0cd9eeffa4dc6acc9"
dependencies = [
"winapi",
]
[[package]]
name = "xmpp-proxy"
version = "1.0.0"
@ -517,9 +1085,13 @@ dependencies = [
"anyhow",
"die",
"futures",
"lazy_static",
"quinn",
"serde",
"serde_derive",
"tokio",
"tokio-rustls",
"toml",
"trust-dns-resolver",
"webpki-roots",
]

View File

@ -24,7 +24,30 @@ toml = "0.5"
serde_derive = "1.0"
serde = { version = "1.0", features = ["derive"] }
futures = "0.3"
die = "0.2.0"
die = "0.2"
anyhow = "1.0"
tokio = { version = "1.4", features = ["net", "rt", "rt-multi-thread", "macros", "io-util"] }
tokio-rustls = "0.22"
# incoming deps
tokio-rustls = { version = "0.22", optional = true }
# outgoing deps
lazy_static = { version = "1.4", optional = true }
webpki-roots = { version = "0.21", optional = true }
trust-dns-resolver = { version = "0.20", optional = true }
#trust-dns-resolver = { version = "0.20", features = ["dns-over-rustls"], optional = true }
# quic deps
quinn = { version = "0.7", optional = true }
[features]
default = ["incoming", "outgoing", "quic"]
#default = ["incoming", "outgoing"]
#default = ["incoming", "quic"]
#default = ["outgoing", "quic"]
#default = ["quic"]
#default = ["outgoing"]
#default = ["incoming"]
incoming = ["tokio-rustls"]
outgoing = ["trust-dns-resolver", "webpki-roots", "lazy_static", "tokio-rustls"]
quic = ["quinn"]

View File

@ -1,7 +1,7 @@
use std::ffi::OsString;
use std::fs::File;
use std::io;
use std::io::{BufReader, Read};
use std::io::{BufReader, Read, Write};
use std::iter::Iterator;
use std::net::SocketAddr;
use std::path::Path;
@ -11,13 +11,16 @@ use die::Die;
use serde_derive::Deserialize;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpListener;
use tokio::task::JoinHandle;
use tokio_rustls::rustls::internal::pemfile::{certs, pkcs8_private_keys};
use tokio_rustls::rustls::{NoClientAuth, ServerConfig};
use tokio_rustls::TlsAcceptor;
#[cfg(feature = "incoming")]
use tokio_rustls::{
rustls::internal::pemfile::{certs, pkcs8_private_keys},
rustls::{NoClientAuth, ServerConfig},
TlsAcceptor,
};
use anyhow::{bail, Result};
@ -27,9 +30,30 @@ use slicesubsequence::*;
mod stanzafilter;
use stanzafilter::*;
#[cfg(feature = "quic")]
mod quic;
#[cfg(feature = "quic")]
use crate::quic::*;
mod tls;
use crate::tls::*;
#[cfg(feature = "outgoing")]
mod outgoing;
#[cfg(feature = "outgoing")]
use crate::outgoing::*;
#[cfg(feature = "outgoing")]
mod srv;
#[cfg(feature = "outgoing")]
use crate::srv::*;
const IN_BUFFER_SIZE: usize = 8192;
const OUT_BUFFER_SIZE: usize = 8192;
const ALPN_XMPP_CLIENT: &[&[u8]] = &[b"xmpp-client"];
const ALPN_XMPP_SERVER: &[&[u8]] = &[b"xmpp-server"];
#[cfg(debug_assertions)]
fn c2s(is_c2s: bool) -> &'static str {
if is_c2s {
@ -55,7 +79,9 @@ macro_rules! debug {
struct Config {
tls_key: String,
tls_cert: String,
listen: Vec<String>,
listen: Option<Vec<String>>,
quic_listen: Option<Vec<String>>,
outgoing_listen: Option<Vec<String>>,
max_stanza_size_bytes: usize,
s2s_target: String,
c2s_target: String,
@ -63,12 +89,11 @@ struct Config {
}
#[derive(Clone)]
struct CloneableConfig {
pub struct CloneableConfig {
max_stanza_size_bytes: usize,
s2s_target: String,
c2s_target: String,
proxy: bool,
acceptor: TlsAcceptor,
}
impl Config {
@ -79,16 +104,16 @@ impl Config {
Ok(toml::from_str(&input)?)
}
fn get_cloneable_cfg(&self) -> Result<CloneableConfig> {
Ok(CloneableConfig {
fn get_cloneable_cfg(&self) -> CloneableConfig {
CloneableConfig {
max_stanza_size_bytes: self.max_stanza_size_bytes,
s2s_target: self.s2s_target.clone(),
c2s_target: self.c2s_target.clone(),
proxy: self.proxy,
acceptor: self.tls_acceptor()?,
})
}
}
#[cfg(feature = "incoming")]
fn tls_acceptor(&self) -> Result<TlsAcceptor> {
let mut tls_key = pkcs8_private_keys(&mut BufReader::new(File::open(&self.tls_key)?)).map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid key"))?;
if tls_key.is_empty() {
@ -99,142 +124,61 @@ impl Config {
let tls_cert = certs(&mut BufReader::new(File::open(&self.tls_cert)?)).map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid cert"))?;
let mut config = ServerConfig::new(NoClientAuth::new());
config.set_single_cert(tls_cert, tls_key).map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
config.set_single_cert(tls_cert, tls_key)?;
Ok(TlsAcceptor::from(Arc::new(config)))
}
}
#[derive(PartialEq)]
pub enum AllowedType {
ClientOnly,
ServerOnly,
Any,
}
fn to_str(buf: &[u8]) -> std::borrow::Cow<'_, str> {
//&str {
//std::str::from_utf8(buf).unwrap_or("[invalid utf-8]")
String::from_utf8_lossy(buf)
}
async fn handle_connection(mut stream: tokio::net::TcpStream, client_addr: SocketAddr, local_addr: SocketAddr, config: CloneableConfig) -> Result<()> {
println!("INFO: {} connected", client_addr);
async fn shuffle_rd_wr<R: AsyncRead + Unpin, W: AsyncWrite + Unpin>(
in_rd: R,
in_wr: W,
config: CloneableConfig,
local_addr: SocketAddr,
client_addr: SocketAddr,
allowed_type: AllowedType,
) -> Result<()> {
let filter = StanzaFilter::new(config.max_stanza_size_bytes);
shuffle_rd_wr_filter(in_rd, in_wr, config, local_addr, client_addr, allowed_type, filter).await
}
let mut in_filter = StanzaFilter::new(config.max_stanza_size_bytes);
let direct_tls = {
// sooo... I don't think peek here can be used for > 1 byte without this timer
// craziness... can it? this could be switched to only peek 1 byte and assume
// a leading 0x16 is TLS, it would *probably* be ok ?
//let mut p = [0u8; 3];
let mut p = &mut in_filter.buf[0..3];
// wait up to 10 seconds until 3 bytes have been read
use std::time::{Duration, Instant};
let duration = Duration::from_secs(10);
let now = Instant::now();
loop {
let n = stream.peek(&mut p).await?;
if n == 3 {
break; // success
}
if n == 0 {
bail!("not enough bytes");
}
if Instant::now() - now > duration {
bail!("less than 3 bytes in 10 seconds, closed connection?");
}
}
/* TLS packet starts with a record "Hello" (0x16), followed by version
* (0x03 0x00-0x03) (RFC6101 A.1)
* This means we reject SSLv2 and lower, which is actually a good thing (RFC6176)
*/
p[0] == 0x16 && p[1] == 0x03 && p[2] <= 0x03
};
println!("INFO: {} direct_tls: {}", client_addr, direct_tls);
// starttls
if !direct_tls {
let mut proceed_sent = false;
let (in_rd, mut in_wr) = stream.split();
// we naively read 1 byte at a time, which buffering significantly speeds up
let in_rd = tokio::io::BufReader::with_capacity(IN_BUFFER_SIZE, in_rd);
let mut in_rd = StanzaReader(in_rd);
while let Ok(Some(buf)) = in_rd.next(&mut in_filter).await {
debug!("received pre-tls stanza: {} '{}'", client_addr, to_str(&buf));
if buf.starts_with(b"<?xml ") {
debug!("> {} '{}'", client_addr, to_str(&buf));
in_wr.write_all(&buf).await?;
in_wr.flush().await?;
} else if buf.starts_with(b"<stream:stream ") {
// gajim seems to REQUIRE an id here...
let buf = if buf.contains_seq(b"id=") {
buf.replace_first(b" id='", b" id='xmpp-proxy")
.replace_first(br#" id=""#, br#" id="xmpp-proxy"#)
.replace_first(b" to=", br#" bla toblala="#)
.replace_first(b" from=", b" to=")
.replace_first(br#" bla toblala="#, br#" from="#)
} else {
buf.replace_first(b" to=", br#" bla toblala="#)
.replace_first(b" from=", b" to=")
.replace_first(br#" bla toblala="#, br#" id='xmpp-proxy' from="#)
};
debug!("> {} '{}'", client_addr, to_str(&buf));
in_wr.write_all(&buf).await?;
// ejabberd never sends <starttls/> with the first, only the second?
//let buf = br###"<features xmlns="http://etherx.jabber.org/streams"><starttls xmlns="urn:ietf:params:xml:ns:xmpp-tls"><required/></starttls></features>"###;
let buf = br###"<stream:features><starttls xmlns="urn:ietf:params:xml:ns:xmpp-tls"><required/></starttls></stream:features>"###;
debug!("> {} '{}'", client_addr, to_str(buf));
in_wr.write_all(buf).await?;
in_wr.flush().await?;
} else if buf.starts_with(b"<starttls ") {
let buf = br###"<proceed xmlns="urn:ietf:params:xml:ns:xmpp-tls" />"###;
debug!("> {} '{}'", client_addr, to_str(buf));
in_wr.write_all(buf).await?;
in_wr.flush().await?;
proceed_sent = true;
break;
} else {
bail!("bad pre-tls stanza: {}", to_str(&buf));
}
}
if !proceed_sent {
bail!("stream ended before open");
}
}
let stream = config.acceptor.accept(stream).await?;
let (in_rd, mut in_wr) = tokio::io::split(stream);
async fn shuffle_rd_wr_filter<R: AsyncRead + Unpin, W: AsyncWrite + Unpin>(
in_rd: R,
mut in_wr: W,
config: CloneableConfig,
local_addr: SocketAddr,
client_addr: SocketAddr,
allowed_type: AllowedType,
in_filter: StanzaFilter,
) -> Result<()> {
// we naively read 1 byte at a time, which buffering significantly speeds up
let in_rd = tokio::io::BufReader::with_capacity(IN_BUFFER_SIZE, in_rd);
let mut in_rd = StanzaReader(in_rd);
// now read to figure out client vs server
let (stream_open, is_c2s) = {
let mut stream_open = Vec::new();
let mut ret = None;
let (stream_open, is_c2s, mut in_rd, mut in_filter) = stream_preamble(StanzaReader(in_rd), client_addr, in_filter).await?;
while let Ok(Some(buf)) = in_rd.next(&mut in_filter).await {
debug!("received pre-<stream:stream> stanza: {} '{}'", client_addr, to_str(&buf));
if buf.starts_with(b"<?xml ") {
stream_open.extend_from_slice(buf);
} else if buf.starts_with(b"<stream:stream ") {
stream_open.extend_from_slice(buf);
//return (stream_open, stanza.contains(r#" xmlns="jabber:client""#) || stanza.contains(r#" xmlns='jabber:client'"#));
ret = Some((stream_open, buf.contains_seq(br#" xmlns="jabber:client""#) || buf.contains_seq(br#" xmlns='jabber:client'"#)));
break;
} else {
bail!("bad pre-<stream:stream> stanza: {}", to_str(&buf));
}
let target = if is_c2s {
if allowed_type == AllowedType::ServerOnly {
bail!("c2s requested when only s2s allowed");
}
if ret.is_some() {
ret.unwrap()
} else {
bail!("stream ended before open");
config.c2s_target
} else {
if allowed_type == AllowedType::ClientOnly {
bail!("s2s requested when only c2s allowed");
}
config.s2s_target
};
let target = if is_c2s { config.c2s_target } else { config.s2s_target };
println!("INFO: {} is_c2s: {}, target: {}", client_addr, is_c2s, target);
let out_stream = tokio::net::TcpStream::connect(target).await?;
@ -249,7 +193,6 @@ async fn handle_connection(mut stream: tokio::net::TcpStream, client_addr: Socke
*/
// tokio AsyncWrite doesn't have write_fmt so have to go through this buffer for some crazy reason
//write!(out_wr, "PROXY TCP{} {} {} {} {}\r\n", if client_addr.is_ipv4() { '4' } else {'6' }, client_addr.ip(), local_addr.ip(), client_addr.port(), local_addr.port())?;
use std::io::Write;
write!(
&mut in_filter.buf[0..],
"PROXY TCP{} {} {} {} {}\r\n",
@ -299,26 +242,25 @@ async fn handle_connection(mut stream: tokio::net::TcpStream, client_addr: Socke
Ok(())
}
/*
async fn handle_connection(mut stream: tokio::net::TcpStream, client_addr: SocketAddr, local_addr: SocketAddr, config: CloneableConfig) -> Result<()> {
Ok(())
}
*/
fn spawn_listener(listener: TcpListener, config: CloneableConfig) -> JoinHandle<Result<()>> {
let local_addr = listener.local_addr().die("could not get local_addr?");
tokio::spawn(async move {
loop {
let (stream, client_addr) = listener.accept().await?;
let config = config.clone();
tokio::spawn(async move {
if let Err(e) = handle_connection(stream, client_addr, local_addr, config).await {
eprintln!("ERROR: {} {}", client_addr, e);
}
});
async fn stream_preamble<R: AsyncRead + Unpin>(mut in_rd: StanzaReader<R>, client_addr: SocketAddr, mut in_filter: StanzaFilter) -> Result<(Vec<u8>, bool, StanzaReader<R>, StanzaFilter)> {
let mut stream_open = Vec::new();
while let Ok(Some(buf)) = in_rd.next(&mut in_filter).await {
debug!("received pre-<stream:stream> stanza: {} '{}'", client_addr, to_str(&buf));
if buf.starts_with(b"<?xml ") {
stream_open.extend_from_slice(buf);
} else if buf.starts_with(b"<stream:stream ") {
stream_open.extend_from_slice(buf);
return Ok((
stream_open,
buf.contains_seq(br#" xmlns="jabber:client""#) || buf.contains_seq(br#" xmlns='jabber:client'"#),
in_rd,
in_filter,
));
} else {
bail!("bad pre-<stream:stream> stanza: {}", to_str(&buf));
}
#[allow(unreachable_code)]
Ok(())
})
}
bail!("stream ended before open");
}
#[tokio::main]
@ -326,12 +268,28 @@ fn spawn_listener(listener: TcpListener, config: CloneableConfig) -> JoinHandle<
async fn main() {
let main_config = Config::parse(std::env::args_os().skip(1).next().unwrap_or(OsString::from("/etc/xmpp-proxy/xmpp-proxy.toml"))).die("invalid config file");
let config = main_config.get_cloneable_cfg().die("invalid cert/key ?");
let config = main_config.get_cloneable_cfg();
let mut handles = Vec::with_capacity(main_config.listen.len());
for listener in main_config.listen {
let listener = TcpListener::bind(&listener).await.die("cannot listen on port/interface");
handles.push(spawn_listener(listener, config.clone()));
let mut handles: Vec<JoinHandle<Result<()>>> = Vec::new();
#[cfg(feature = "incoming")]
if let Some(ref listeners) = main_config.listen {
let acceptor = main_config.tls_acceptor().die("invalid cert/key ?");
for listener in listeners {
handles.push(spawn_tls_listener(listener.parse().die("invalid listener address"), config.clone(), acceptor.clone()));
}
}
#[cfg(feature = "quic")]
if let Some(ref listeners) = main_config.quic_listen {
let quic_config = main_config.quic_server_config().die("invalid cert/key ?");
for listener in listeners {
handles.push(spawn_quic_listener(listener.parse().die("invalid listener address"), config.clone(), quic_config.clone()));
}
}
#[cfg(feature = "outgoing")]
if let Some(ref listeners) = main_config.outgoing_listen {
for listener in listeners {
handles.push(spawn_outgoing_listener(listener.parse().die("invalid listener address"), config.max_stanza_size_bytes));
}
}
futures::future::join_all(handles).await;
}

75
src/outgoing.rs Normal file
View File

@ -0,0 +1,75 @@
use crate::*;
async fn handle_outgoing_connection(stream: tokio::net::TcpStream, client_addr: SocketAddr, max_stanza_size_bytes: usize) -> Result<()> {
println!("INFO: outgoing {} connected", client_addr);
let in_filter = StanzaFilter::new(max_stanza_size_bytes);
let (in_rd, mut in_wr) = tokio::io::split(stream);
// we naively read 1 byte at a time, which buffering significantly speeds up
//let in_rd = tokio::io::BufReader::with_capacity(IN_BUFFER_SIZE, in_rd);
// now read to figure out client vs server
let (stream_open, is_c2s, in_rd, mut in_filter) = stream_preamble(StanzaReader(in_rd), client_addr, in_filter).await?;
// pull raw reader back out of StanzaReader
let mut in_rd = in_rd.0;
// todo: unsure how legit changing to a string here is...
let domain = to_str(stream_open.extract_between(b" to='", b"'").or_else(|_| stream_open.extract_between(b" to=\"", b"\""))?);
println!("INFO: {} is_c2s: {}, domain: {}", client_addr, is_c2s, domain);
debug!("< {} {} '{}'", client_addr, c2s(is_c2s), to_str(&stream_open));
let (mut out_wr, mut out_rd, stream_open) = srv_connect(&domain, is_c2s, &stream_open, &mut in_filter).await?;
// send server response to client
in_wr.write_all(&stream_open).await?;
in_wr.flush().await?;
drop(stream_open);
let mut out_buf = [0u8; OUT_BUFFER_SIZE];
loop {
tokio::select! {
Ok(buf) = out_rd.next(&mut in_filter) => {
match buf {
None => break,
Some(buf) => {
debug!("< {} {} '{}'", client_addr, c2s(is_c2s), to_str(buf));
in_wr.write_all(buf).await?;
in_wr.flush().await?;
}
}
},
// we could filter outgoing from-client stanzas by size here too by doing same as above
// but instead, we'll just send whatever the client sends as it sends it...
Ok(n) = in_rd.read(&mut out_buf) => {
if n == 0 {
break;
}
debug!("> {} {} '{}'", client_addr, c2s(is_c2s), to_str(&out_buf[0..n]));
out_wr.write_all(&out_buf[0..n]).await?;
out_wr.flush().await?;
},
}
}
println!("INFO: {} disconnected", client_addr);
Ok(())
}
pub fn spawn_outgoing_listener(local_addr: SocketAddr, max_stanza_size_bytes: usize) -> JoinHandle<Result<()>> {
tokio::spawn(async move {
let listener = TcpListener::bind(&local_addr).await.die("cannot listen on port/interface");
loop {
let (stream, client_addr) = listener.accept().await?;
tokio::spawn(async move {
if let Err(e) = handle_outgoing_connection(stream, client_addr, max_stanza_size_bytes).await {
eprintln!("ERROR: {} {}", client_addr, e);
}
});
}
#[allow(unreachable_code)]
Ok(())
})
}

113
src/quic.rs Normal file
View File

@ -0,0 +1,113 @@
use crate::*;
use futures::StreamExt;
use quinn::{ClientConfigBuilder, Endpoint, ServerConfig, ServerConfigBuilder, TransportConfig};
use std::{net::SocketAddr, sync::Arc};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use anyhow::Result;
pub async fn quic_connect(target: SocketAddr, server_name: &str, is_c2s: bool) -> Result<(Box<dyn AsyncWrite + Unpin + Send>, Box<dyn AsyncRead + Unpin + Send>)> {
let bind_addr = "0.0.0.0:0".parse().unwrap();
let mut client_cfg = ClientConfigBuilder::default();
client_cfg.protocols(if is_c2s { ALPN_XMPP_CLIENT } else { ALPN_XMPP_SERVER });
let client_cfg = client_cfg.build();
let mut endpoint_builder = Endpoint::builder();
endpoint_builder.default_client_config(client_cfg);
let (endpoint, _incoming) = endpoint_builder.bind(&bind_addr)?;
// connect to server
let quinn::NewConnection { connection, .. } = endpoint.connect(&target, server_name).unwrap().await.unwrap();
debug!("[client] connected: addr={}", connection.remote_address());
if is_c2s {
let (wrt, rd) = connection.open_bi().await.unwrap();
Ok((Box::new(wrt), Box::new(rd)))
} else {
let wrt = connection.open_uni().await.unwrap();
Ok((Box::new(wrt), Box::new(NoopIo)))
}
}
impl Config {
pub fn quic_server_config(&self) -> Result<ServerConfig> {
let pem = std::fs::read(&self.tls_key).expect("error reading key");
let tls_key = quinn::PrivateKey::from_pem(&pem).expect("error parsing key");
let pem = std::fs::read(&self.tls_cert).expect("error reading certificates");
let cert_chain = quinn::CertificateChain::from_pem(&pem).expect("error parsing certificates");
let transport_config = TransportConfig::default();
// todo: configure transport_config here if needed
let mut server_config = ServerConfig::default();
server_config.transport = Arc::new(transport_config);
let mut cfg_builder = ServerConfigBuilder::new(server_config);
cfg_builder.certificate(cert_chain, tls_key)?;
Ok(cfg_builder.build())
}
}
struct NoopIo;
use core::pin::Pin;
use core::task::{Context, Poll};
// todo: could change this to return Error and kill the stream instead, after all, s2s *should* not be receiving any bytes back
impl AsyncWrite for NoopIo {
fn poll_write(self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
Poll::Ready(Ok(buf.len()))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}
impl AsyncRead for NoopIo {
fn poll_read(self: Pin<&mut Self>, _cx: &mut Context<'_>, _buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
Poll::Pending
}
}
pub fn spawn_quic_listener(local_addr: SocketAddr, config: CloneableConfig, server_config: ServerConfig) -> JoinHandle<Result<()>> {
//let (mut incoming, server_cert) = make_server_endpoint(local_addr).die("cannot listen on port/interface");
let mut endpoint_builder = Endpoint::builder();
endpoint_builder.listen(server_config);
let (_endpoint, mut incoming) = endpoint_builder.bind(&local_addr).die("cannot listen on port/interface");
// accept a single connection
tokio::spawn(async move {
let incoming_conn = incoming.next().await.unwrap();
let mut new_conn = incoming_conn.await.unwrap();
let client_addr = new_conn.connection.remote_address();
let config = config.clone();
tokio::spawn(async move {
println!("INFO: {} quic connected", client_addr);
loop {
tokio::select! {
Some(Ok((wrt, rd))) = new_conn.bi_streams.next() => {
let config = config.clone();
tokio::spawn(async move {
if let Err(e) = shuffle_rd_wr(rd, wrt, config, local_addr, client_addr, AllowedType::ClientOnly).await {
eprintln!("ERROR: {} {}", client_addr, e);
}
});
},
Some(Ok(rd)) = new_conn.uni_streams.next() => {
let config = config.clone();
tokio::spawn(async move {
if let Err(e) = shuffle_rd_wr(rd, NoopIo, config, local_addr, client_addr, AllowedType::ServerOnly).await {
eprintln!("ERROR: {} {}", client_addr, e);
}
});
},
}
}
});
#[allow(unreachable_code)]
Ok(())
})
}

View File

@ -4,6 +4,7 @@ pub trait SliceSubsequence<T> {
fn trim_start(&self, needle: &[T]) -> &[T];
fn first_index_of(&self, needle: &[T]) -> Result<usize>;
fn replace_first(self, needle: &[T], replacement: &[T]) -> Vec<T>;
fn extract_between(&self, before: &[T], after: &[T]) -> Result<&[T]>;
fn contains_seq(&self, needle: &[T]) -> bool {
self.first_index_of(needle).is_ok()
@ -39,6 +40,11 @@ impl<T: PartialEq + Clone> SliceSubsequence<T> for &[T] {
fn replace_first(self, needle: &[T], replacement: &[T]) -> Vec<T> {
self.to_vec().replace_first(needle, replacement)
}
fn extract_between(&self, before: &[T], after: &[T]) -> Result<&[T]> {
let first = &self[self.first_index_of(before)? + before.len()..];
Ok(&first[..first.first_index_of(after)? + after.len() - 1])
}
}
impl<T: PartialEq + Clone> SliceSubsequence<T> for Vec<T> {
@ -47,7 +53,7 @@ impl<T: PartialEq + Clone> SliceSubsequence<T> for Vec<T> {
}
fn first_index_of(&self, needle: &[T]) -> Result<usize> {
return (self.as_slice()).first_index_of(needle);
(self.as_slice()).first_index_of(needle)
}
fn replace_first(self, needle: &[T], replacement: &[T]) -> Vec<T> {
@ -62,6 +68,11 @@ impl<T: PartialEq + Clone> SliceSubsequence<T> for Vec<T> {
self
}
}
fn extract_between(&self, before: &[T], after: &[T]) -> Result<&[T]> {
let first = &self[self.first_index_of(before)? + before.len()..];
Ok(&first[..first.first_index_of(after)? + after.len() - 1])
}
}
#[cfg(test)]
@ -99,4 +110,17 @@ mod tests {
let buf = buf.replace_first(b" bla2", b"");
assert_eq!(buf, b"bla to='tsnhaou'");
}
#[test]
fn extract_between() {
let buf = &b"bla to='tsnhaou' bla2"[..];
assert_eq!(buf.extract_between(b" to='", b"'").unwrap(), b"tsnhaou");
let buf = &br###"<stream:stream xmlns='jabber:server' xmlns:stream='http://etherx.jabber.org/streams' xmlns:db='jabber:server:dialback' version='1.0' to='example.org' from='example.com' xml:lang='en'>"###[..];
assert_eq!(buf.extract_between(b" to='", b"'").or_else(|_| buf.extract_between(b" to=\"", b"\"")).unwrap(), b"example.org");
let buf = &br###"<stream:stream xmlns="jabber:server" xmlns:stream="http://etherx.jabber.org/streams" xmlns:db="jabber:server:dialback" version="1.0" to="example.org" from="example.com" xml:lang="en">"###[..];
assert_eq!(buf.extract_between(b" to='", b"'").or_else(|_| buf.extract_between(b" to=\"", b"\"")).unwrap(), b"example.org");
}
}

208
src/srv.rs Normal file
View File

@ -0,0 +1,208 @@
use std::net::SocketAddr;
use trust_dns_resolver::error::ResolveError;
use trust_dns_resolver::lookup::SrvLookup;
use trust_dns_resolver::{IntoName, TokioAsyncResolver};
use anyhow::{bail, Result};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use crate::stanzafilter::StanzaReader;
use crate::*;
lazy_static::lazy_static! {
static ref RESOLVER: TokioAsyncResolver = TokioAsyncResolver::tokio_from_system_conf().unwrap();
}
#[derive(Copy, Clone, Debug)]
pub enum XmppConnectionType {
StartTLS,
DirectTLS,
#[cfg(feature = "quic")]
QUIC,
}
#[derive(Debug)]
pub struct XmppConnection {
conn_type: XmppConnectionType,
priority: u16,
weight: u16,
port: u16,
target: String,
}
impl XmppConnection {
pub async fn connect(
&self,
domain: &str,
is_c2s: bool,
stream_open: &[u8],
mut in_filter: &mut crate::StanzaFilter,
) -> Result<(Box<dyn AsyncWrite + Unpin + Send>, Box<dyn AsyncRead + Unpin + Send>)> {
// todo: need to set options to Ipv4AndIpv6
let ips = RESOLVER.lookup_ip(self.target.clone()).await?;
debug!("trying 1 domain {}, SRV: {:?}", domain, self);
for ip in ips.iter() {
debug!("trying domain {}, ip {}, SRV: {:?}", domain, ip, self);
match self.conn_type {
XmppConnectionType::StartTLS => match crate::starttls_connect(SocketAddr::new(ip, self.port), domain, is_c2s, &stream_open, &mut in_filter).await {
Ok((wr, rd)) => return Ok((wr, rd)),
Err(e) => println!("ERROR: starttls connection failed to IP {} from SRV {}, error: {}", ip, self.target, e),
},
XmppConnectionType::DirectTLS => match crate::tls_connect(SocketAddr::new(ip, self.port), domain, is_c2s).await {
Ok((wr, rd)) => return Ok((wr, rd)),
Err(e) => println!("ERROR: direct tls connection failed to IP {} from SRV {}, error: {}", ip, self.target, e),
},
#[cfg(feature = "quic")]
XmppConnectionType::QUIC => match crate::quic_connect(SocketAddr::new(ip, self.port), domain, is_c2s).await {
Ok((wr, rd)) => return Ok((wr, rd)),
Err(e) => println!("ERROR: quic connection failed to IP {} from SRV {}, error: {}", ip, self.target, e),
},
}
}
debug!("trying 2 domain {}, SRV: {:?}", domain, self);
bail!("cannot connect to any IPs for SRV: {}", self.target)
}
}
fn collect_srvs(ret: &mut Vec<XmppConnection>, srv_records: std::result::Result<SrvLookup, ResolveError>, conn_type: XmppConnectionType) {
if let Ok(srv_records) = srv_records {
for srv in srv_records.iter() {
if !srv.target().is_root() {
ret.push(XmppConnection {
conn_type,
priority: srv.priority(),
weight: srv.weight(),
port: srv.port(),
target: srv.target().to_ascii(),
});
}
}
}
}
pub async fn get_xmpp_connections(domain: &str, is_c2s: bool) -> Result<Vec<XmppConnection>> {
let (starttls, direct_tls, quic) = if is_c2s {
("_xmpp-client._tcp", "_xmpps-client._tcp", "_xmppq-client._udp")
} else {
("_xmpp-server._tcp", "_xmpps-server._tcp", "_xmppq-server._udp")
};
let starttls = format!("{}.{}.", starttls, domain).into_name()?;
let direct_tls = format!("{}.{}.", direct_tls, domain).into_name()?;
let quic = format!("{}.{}.", quic, domain).into_name()?;
// this lets them run concurrently but not in parallel, could spawn parallel tasks but... worth it ?
let (starttls, direct_tls, quic) = tokio::join!(RESOLVER.srv_lookup(starttls), RESOLVER.srv_lookup(direct_tls), RESOLVER.srv_lookup(quic),);
let mut ret = Vec::new();
collect_srvs(&mut ret, starttls, XmppConnectionType::StartTLS);
collect_srvs(&mut ret, direct_tls, XmppConnectionType::DirectTLS);
#[cfg(feature = "quic")]
collect_srvs(&mut ret, quic, XmppConnectionType::QUIC);
ret.sort_by(|a, b| a.priority.cmp(&b.priority));
// todo: do something with weight
if ret.is_empty() {
// default starttls ports
ret.push(XmppConnection {
priority: 0,
weight: 0,
target: domain.to_string(),
conn_type: XmppConnectionType::StartTLS,
port: if is_c2s { 5222 } else { 5269 },
});
// by spec there are no default direct/quic ports, but we are going 443
ret.push(XmppConnection {
priority: 0,
weight: 0,
target: domain.to_string(),
conn_type: XmppConnectionType::DirectTLS,
port: 443,
});
#[cfg(feature = "quic")]
ret.push(XmppConnection {
priority: 0,
weight: 0,
target: domain.to_string(),
conn_type: XmppConnectionType::QUIC,
port: 443,
});
}
/*
// manual target for testing
ret.clear();
ret.push(XmppConnection {
priority: 0,
weight: 0,
target: "127.0.0.1".to_string(),
conn_type: XmppConnectionType::QUIC,
port: 4443,
});
*/
debug!("{} records for {}: {:?}", ret.len(), domain, ret);
Ok(ret)
}
pub async fn srv_connect(
domain: &str,
is_c2s: bool,
stream_open: &[u8],
mut in_filter: &mut crate::StanzaFilter,
) -> Result<(Box<dyn AsyncWrite + Unpin + Send>, StanzaReader<tokio::io::BufReader<Box<dyn AsyncRead + Unpin + Send>>>, Vec<u8>)> {
for srv in get_xmpp_connections(&domain, is_c2s).await? {
debug!("main srv: {:?}", srv);
let connect = srv.connect(&domain, is_c2s, &stream_open, &mut in_filter).await;
if connect.is_err() {
continue;
}
let (mut out_wr, out_rd) = connect.unwrap();
debug!("main srv out: {:?}", srv);
// we naively read 1 byte at a time, which buffering significantly speeds up
let mut out_rd = StanzaReader(tokio::io::BufReader::with_capacity(crate::IN_BUFFER_SIZE, out_rd));
out_wr.write_all(&stream_open).await?;
out_wr.flush().await?;
let mut server_response = Vec::new();
if is_c2s {
// let's read to first <stream:stream to make sure we are successfully connected to a real XMPP server
let mut stream_received = false;
while let Ok(Some(buf)) = out_rd.next(&mut in_filter).await {
debug!("received pre-tls stanza: {} '{}'", domain, to_str(&buf));
if buf.starts_with(b"<?xml ") {
server_response.extend_from_slice(&buf);
} else if buf.starts_with(b"<stream:stream ") {
server_response.extend_from_slice(&buf);
stream_received = true;
break;
} else {
debug!("bad pre-tls stanza: {}", to_str(&buf));
break;
}
}
if !stream_received {
debug!("bad server response, going to next record");
continue;
}
}
return Ok((Box::new(out_wr), out_rd, server_response));
}
bail!("all connection attempts failed")
}
#[cfg(test)]
mod tests {
use crate::srv::*;
#[tokio::test]
async fn srv() -> Result<()> {
get_xmpp_connections("burtrum.org", true).await?;
Ok(())
}
}

View File

@ -211,7 +211,7 @@ mod tests {
}
#[tokio::test]
async fn process_next_byte() -> std::result::Result<(), anyhow::Error> {
async fn process_next_byte() -> Result<()> {
let mut filter = StanzaFilter::new(262_144);
//todo: <x a='/>'>This is going to be fun.</x>

209
src/tls.rs Normal file
View File

@ -0,0 +1,209 @@
use crate::*;
#[cfg(any(feature = "incoming", feature = "outgoing"))]
use tokio_rustls::{rustls::ClientConfig, webpki::DNSNameRef, TlsConnector};
#[cfg(feature = "outgoing")]
lazy_static::lazy_static! {
static ref CLIENT_TLS_CONFIG: TlsConnector = {
let mut config = ClientConfig::new();
config.root_store.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
config.alpn_protocols.push(ALPN_XMPP_CLIENT[0].to_vec());
let config = TlsConnector::from(Arc::new(config));
config
};
static ref SERVER_TLS_CONFIG: TlsConnector = {
let mut config = ClientConfig::new();
config.root_store.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
config.alpn_protocols.push(ALPN_XMPP_SERVER[0].to_vec());
let config = TlsConnector::from(Arc::new(config));
config
};
}
#[cfg(feature = "outgoing")]
pub async fn tls_connect(target: SocketAddr, server_name: &str, is_c2s: bool) -> Result<(Box<dyn AsyncWrite + Unpin + Send>, Box<dyn AsyncRead + Unpin + Send>)> {
let dnsname = DNSNameRef::try_from_ascii_str(server_name)?;
let stream = tokio::net::TcpStream::connect(target).await?;
let stream = if is_c2s {
CLIENT_TLS_CONFIG.connect(dnsname, stream).await?
} else {
SERVER_TLS_CONFIG.connect(dnsname, stream).await?
};
let (rd, wrt) = tokio::io::split(stream);
Ok((Box::new(wrt), Box::new(rd)))
}
#[cfg(feature = "outgoing")]
pub async fn starttls_connect(
target: SocketAddr,
server_name: &str,
is_c2s: bool,
stream_open: &[u8],
mut in_filter: &mut StanzaFilter,
) -> Result<(Box<dyn AsyncWrite + Unpin + Send>, Box<dyn AsyncRead + Unpin + Send>)> {
let dnsname = DNSNameRef::try_from_ascii_str(server_name)?;
let mut stream = tokio::net::TcpStream::connect(target).await?;
let (in_rd, mut in_wr) = stream.split();
// send the stream_open
debug!("starttls sending: {} '{}'", server_name, to_str(&stream_open));
in_wr.write_all(&stream_open).await?;
in_wr.flush().await?;
// we naively read 1 byte at a time, which buffering significantly speeds up
let in_rd = tokio::io::BufReader::with_capacity(IN_BUFFER_SIZE, in_rd);
let mut in_rd = StanzaReader(in_rd);
let mut proceed_received = false;
debug!("starttls reading stream open {}", server_name);
while let Ok(Some(buf)) = in_rd.next(&mut in_filter).await {
debug!("received pre-tls stanza: {} '{}'", server_name, to_str(&buf));
if buf.starts_with(b"<?xml ") {
// ignore this
} else if buf.starts_with(b"<stream:stream ") {
// ignore this
} else if buf.starts_with(b"<stream:features") {
// we send starttls regardless, it could have been stripped out, we don't do plaintext
let buf = br###"<starttls xmlns='urn:ietf:params:xml:ns:xmpp-tls'/>"###;
debug!("> {} '{}'", server_name, to_str(buf));
in_wr.write_all(buf).await?;
in_wr.flush().await?;
} else if buf.starts_with(b"<proceed ") {
proceed_received = true;
break;
} else {
bail!("bad pre-tls stanza: {}", to_str(&buf));
}
}
if !proceed_received {
bail!("stream ended before proceed");
}
debug!("starttls starting TLS {}", server_name);
let stream = if is_c2s {
CLIENT_TLS_CONFIG.connect(dnsname, stream).await?
} else {
SERVER_TLS_CONFIG.connect(dnsname, stream).await?
};
let (rd, wrt) = tokio::io::split(stream);
Ok((Box::new(wrt), Box::new(rd)))
}
#[cfg(feature = "incoming")]
pub fn spawn_tls_listener(local_addr: SocketAddr, config: CloneableConfig, acceptor: TlsAcceptor) -> JoinHandle<Result<()>> {
tokio::spawn(async move {
let listener = TcpListener::bind(&local_addr).await.die("cannot listen on port/interface");
loop {
let (stream, client_addr) = listener.accept().await?;
let config = config.clone();
let acceptor = acceptor.clone();
tokio::spawn(async move {
if let Err(e) = handle_tls_connection(stream, client_addr, local_addr, config, acceptor).await {
eprintln!("ERROR: {} {}", client_addr, e);
}
});
}
#[allow(unreachable_code)]
Ok(())
})
}
#[cfg(feature = "incoming")]
async fn handle_tls_connection(mut stream: tokio::net::TcpStream, client_addr: SocketAddr, local_addr: SocketAddr, config: CloneableConfig, acceptor: TlsAcceptor) -> Result<()> {
println!("INFO: {} connected", client_addr);
let mut in_filter = StanzaFilter::new(config.max_stanza_size_bytes);
let direct_tls = {
// sooo... I don't think peek here can be used for > 1 byte without this timer
// craziness... can it? this could be switched to only peek 1 byte and assume
// a leading 0x16 is TLS, it would *probably* be ok ?
//let mut p = [0u8; 3];
let mut p = &mut in_filter.buf[0..3];
// wait up to 10 seconds until 3 bytes have been read
use std::time::{Duration, Instant};
let duration = Duration::from_secs(10);
let now = Instant::now();
loop {
let n = stream.peek(&mut p).await?;
if n == 3 {
break; // success
}
if n == 0 {
bail!("not enough bytes");
}
if Instant::now() - now > duration {
bail!("less than 3 bytes in 10 seconds, closed connection?");
}
}
/* TLS packet starts with a record "Hello" (0x16), followed by version
* (0x03 0x00-0x03) (RFC6101 A.1)
* This means we reject SSLv2 and lower, which is actually a good thing (RFC6176)
*/
p[0] == 0x16 && p[1] == 0x03 && p[2] <= 0x03
};
println!("INFO: {} direct_tls: {}", client_addr, direct_tls);
// starttls
if !direct_tls {
let mut proceed_sent = false;
let (in_rd, mut in_wr) = stream.split();
// we naively read 1 byte at a time, which buffering significantly speeds up
let in_rd = tokio::io::BufReader::with_capacity(IN_BUFFER_SIZE, in_rd);
let mut in_rd = StanzaReader(in_rd);
while let Ok(Some(buf)) = in_rd.next(&mut in_filter).await {
debug!("received pre-tls stanza: {} '{}'", client_addr, to_str(&buf));
if buf.starts_with(b"<?xml ") {
debug!("> {} '{}'", client_addr, to_str(&buf));
in_wr.write_all(&buf).await?;
in_wr.flush().await?;
} else if buf.starts_with(b"<stream:stream ") {
// gajim seems to REQUIRE an id here...
let buf = if buf.contains_seq(b"id=") {
buf.replace_first(b" id='", b" id='xmpp-proxy")
.replace_first(br#" id=""#, br#" id="xmpp-proxy"#)
.replace_first(b" to=", br#" bla toblala="#)
.replace_first(b" from=", b" to=")
.replace_first(br#" bla toblala="#, br#" from="#)
} else {
buf.replace_first(b" to=", br#" bla toblala="#)
.replace_first(b" from=", b" to=")
.replace_first(br#" bla toblala="#, br#" id='xmpp-proxy' from="#)
};
debug!("> {} '{}'", client_addr, to_str(&buf));
in_wr.write_all(&buf).await?;
// ejabberd never sends <starttls/> with the first, only the second?
//let buf = br###"<features xmlns="http://etherx.jabber.org/streams"><starttls xmlns="urn:ietf:params:xml:ns:xmpp-tls"><required/></starttls></features>"###;
let buf = br###"<stream:features><starttls xmlns="urn:ietf:params:xml:ns:xmpp-tls"><required/></starttls></stream:features>"###;
debug!("> {} '{}'", client_addr, to_str(buf));
in_wr.write_all(buf).await?;
in_wr.flush().await?;
} else if buf.starts_with(b"<starttls ") {
let buf = br###"<proceed xmlns="urn:ietf:params:xml:ns:xmpp-tls" />"###;
debug!("> {} '{}'", client_addr, to_str(buf));
in_wr.write_all(buf).await?;
in_wr.flush().await?;
proceed_sent = true;
break;
} else {
bail!("bad pre-tls stanza: {}", to_str(&buf));
}
}
if !proceed_sent {
bail!("stream ended before open");
}
}
let stream = acceptor.accept(stream).await?;
let (in_rd, in_wr) = tokio::io::split(stream);
shuffle_rd_wr_filter(in_rd, in_wr, config, local_addr, client_addr, AllowedType::Any, in_filter).await
}

View File

@ -1,6 +1,8 @@
# interfaces to listen for external XMPP connections on
listen = [ "0.0.0.0:5222", "0.0.0.0:5269" ]
quic_listen = [ "0.0.0.0:443" ]
outgoing_listen = [ "0.0.0.0:5252" ]
# these ports shouldn't do any TLS, but should assume any connection from xmpp-proxy is secure
# prosody module: https://modules.prosody.im/mod_secure_interfaces.html