From 4a2fa75ae006369a2974a0d5f68752f66e0f4388 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Ciarcin=CC=81ski?= Date: Thu, 27 Nov 2025 12:48:46 +0100 Subject: [PATCH 1/5] Reverse gRPC communication --- Cargo.lock | 332 +++++++++++++----- Cargo.toml | 1 + proto | 2 +- src/config.rs | 47 +-- src/gateway.rs | 900 +++++++++++++++++++++++++++++++------------------ src/main.rs | 32 +- 6 files changed, 876 insertions(+), 438 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c74297d7..a8de0eaf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -27,6 +27,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anstream" version = "0.6.21" @@ -59,22 +68,22 @@ dependencies = [ [[package]] name = "anstyle-query" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e231f6134f61b71076a3eab506c379d4f36122f2af15a9ff04415ea4c3339e2" +checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] name = "anstyle-wincon" -version = "3.0.10" +version = "3.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e0633414522a32ffaac8ac6cc8f748e090c5717661fddeea04219e2344f5f2a" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -150,9 +159,9 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] name = "axum" -version = "0.8.6" +version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a18ed336352031311f4e0b4dd2ff392d4fbb370777c9d18d7fc9d7359f73871" +checksum = "5b098575ebe77cb6d14fc7f32749631a6e44edbef6b796f89b020e99ba20d425" dependencies = [ "axum-core", "bytes", @@ -239,6 +248,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block2" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdeb9d870516001442e364c5220d3574d2da8dc765554b4a617230d33fa58ef5" +dependencies = [ + "objc2", +] + [[package]] name = "boringtun" version = "0.6.0" @@ -272,9 +290,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.10.1" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" +checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" [[package]] name = "camino" @@ -310,9 +328,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.44" +version = "1.2.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37521ac7aabe3d13122dc382493e20c9416f299d2ccd5b3a5340a2570cdeb0f3" +checksum = "cd405d82c84ff7f35739f175f67d8b9fb7687a0e84ccdc78bd3568839827cf07" dependencies = [ "find-msvc-tools", "jobserver", @@ -369,9 +387,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.51" +version = "4.5.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c26d721170e0295f191a69bd9a1f93efcdb0aff38684b61ab5750468972e5f5" +checksum = "c9e340e012a1bf4935f5282ed1436d1489548e8f72308207ea5df0e23d2d03f8" dependencies = [ "clap_builder", "clap_derive", @@ -379,9 +397,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.51" +version = "4.5.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75835f0c7bf681bfd05abe44e965760fea999a5286c6eb2d59883634fd02011a" +checksum = "d76b5d13eaa18c901fd2f7fca939fefe3a0727a953561fefdf3b2922b8569d00" dependencies = [ "anstream", "anstyle", @@ -449,9 +467,9 @@ dependencies = [ [[package]] name = "crypto-common" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" dependencies = [ "generic-array", "rand_core", @@ -644,6 +662,16 @@ dependencies = [ "subtle", ] +[[package]] +name = "dispatch2" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89a09f22a6c6069a18470eb92d2298acf25463f14256d24778e1230d789a2aec" +dependencies = [ + "bitflags", + "objc2", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -714,9 +742,9 @@ checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" [[package]] name = "find-msvc-tools" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52051878f80a721bb68ebfbc930e07b65ba72f2da88968ea5c06fd6ca3d3a127" +checksum = "3a3076410a55c90011c298b04d0cfa770b00fa04e1e3c97d3f6c9de105a03844" [[package]] name = "fixedbitset" @@ -799,9 +827,9 @@ dependencies = [ [[package]] name = "generic-array" -version = "0.14.9" +version = "0.14.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bb6743198531e02858aeaea5398fcc883e71851fcbcb5a2f773e2fb6cb1edf2" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" dependencies = [ "typenum", "version_check", @@ -891,9 +919,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.16.0" +version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5419bdc4f6a9207fbeba6d11b604d481addf78ecd10c11ad51e76c2f6482748d" +checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" [[package]] name = "heck" @@ -929,12 +957,11 @@ dependencies = [ [[package]] name = "http" -version = "1.3.1" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4a85d31aea989eead29a3aaf9e1115a180df8282431156e533de47660892565" +checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a" dependencies = [ "bytes", - "fnv", "itoa", ] @@ -975,9 +1002,9 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "1.7.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb3aa54a13a0dfe7fbe3a59e0c76093041720fdc77b110cc0fc260fafb4dc51e" +checksum = "2ab2d4f250c3d7b1c9fcdff1cece94ea4e2dfbec68614f7b87cb205f24ca9d11" dependencies = [ "atomic-waker", "bytes", @@ -1011,9 +1038,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.17" +version = "0.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c6995591a8f1380fcb4ba966a252a4b29188d51d2b89e3a252f5305be65aea8" +checksum = "52e9a2a24dc5c6821e71a7030e1e14b7b632acac55c40e9d2e082c621261bb56" dependencies = [ "bytes", "futures-channel", @@ -1140,9 +1167,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.12.0" +version = "2.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6717a8d2a5a929a1a2eb43a12812498ed141a0bcfb7e8f7844fbdbe4303bba9f" +checksum = "0ad4bb2b565bca0645f4d68c5c9af97fba094e9791da685bf83cb5f3ce74acf2" dependencies = [ "equivalent", "hashbrown", @@ -1216,22 +1243,22 @@ checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" [[package]] name = "jiff" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be1f93b8b1eb69c77f24bbb0afdf66f54b632ee39af40ca21c4365a1d7347e49" +checksum = "49cce2b81f2098e7e3efc35bc2e0a6b7abec9d34128283d7a26fa8f32a6dbb35" dependencies = [ "jiff-static", "log", "portable-atomic", "portable-atomic-util", - "serde", + "serde_core", ] [[package]] name = "jiff-static" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03343451ff899767262ec32146f6d559dd759fdadf42ff0e227c7c48f72594b4" +checksum = "980af8b43c3ad5d8d349ace167ec8170839f753a42d233ba19e08afe1850fa69" dependencies = [ "proc-macro2", "quote", @@ -1284,9 +1311,9 @@ dependencies = [ [[package]] name = "libz-sys" -version = "1.1.22" +version = "1.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b70e7a7df205e92a1a4cd9aaae7898dac0aa555503cc0a649494d0d60e7651d" +checksum = "15d118bbf3771060e7311cc7bb0545b01d08a8b4a7de949198dec1fa0ca1c0f7" dependencies = [ "cc", "libc", @@ -1542,6 +1569,165 @@ dependencies = [ "libc", ] +[[package]] +name = "objc2" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c2599ce0ec54857b29ce62166b0ed9b4f6f1a70ccc9a71165b6154caca8c05" +dependencies = [ + "objc2-encode", +] + +[[package]] +name = "objc2-cloud-kit" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73ad74d880bb43877038da939b7427bba67e9dd42004a18b809ba7d87cee241c" +dependencies = [ + "bitflags", + "objc2", + "objc2-foundation", +] + +[[package]] +name = "objc2-core-data" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b402a653efbb5e82ce4df10683b6b28027616a2715e90009947d50b8dd298fa" +dependencies = [ + "objc2", + "objc2-foundation", +] + +[[package]] +name = "objc2-core-foundation" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a180dd8642fa45cdb7dd721cd4c11b1cadd4929ce112ebd8b9f5803cc79d536" +dependencies = [ + "bitflags", + "dispatch2", + "objc2", +] + +[[package]] +name = "objc2-core-graphics" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e022c9d066895efa1345f8e33e584b9f958da2fd4cd116792e15e07e4720a807" +dependencies = [ + "bitflags", + "dispatch2", + "objc2", + "objc2-core-foundation", + "objc2-io-surface", +] + +[[package]] +name = "objc2-core-image" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5d563b38d2b97209f8e861173de434bd0214cf020e3423a52624cd1d989f006" +dependencies = [ + "objc2", + "objc2-foundation", +] + +[[package]] +name = "objc2-core-location" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca347214e24bc973fc025fd0d36ebb179ff30536ed1f80252706db19ee452009" +dependencies = [ + "objc2", + "objc2-foundation", +] + +[[package]] +name = "objc2-core-text" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cde0dfb48d25d2b4862161a4d5fcc0e3c24367869ad306b0c9ec0073bfed92d" +dependencies = [ + "bitflags", + "objc2", + "objc2-core-foundation", + "objc2-core-graphics", +] + +[[package]] +name = "objc2-encode" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef25abbcd74fb2609453eb695bd2f860d389e457f67dc17cafc8b8cbc89d0c33" + +[[package]] +name = "objc2-foundation" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3e0adef53c21f888deb4fa59fc59f7eb17404926ee8a6f59f5df0fd7f9f3272" +dependencies = [ + "bitflags", + "block2", + "libc", + "objc2", + "objc2-core-foundation", +] + +[[package]] +name = "objc2-io-surface" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "180788110936d59bab6bd83b6060ffdfffb3b922ba1396b312ae795e1de9d81d" +dependencies = [ + "bitflags", + "objc2", + "objc2-core-foundation", +] + +[[package]] +name = "objc2-quartz-core" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96c1358452b371bf9f104e21ec536d37a650eb10f7ee379fff67d2e08d537f1f" +dependencies = [ + "bitflags", + "objc2", + "objc2-core-foundation", + "objc2-foundation", +] + +[[package]] +name = "objc2-ui-kit" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d87d638e33c06f577498cbcc50491496a3ed4246998a7fbba7ccb98b1e7eab22" +dependencies = [ + "bitflags", + "block2", + "objc2", + "objc2-cloud-kit", + "objc2-core-data", + "objc2-core-foundation", + "objc2-core-graphics", + "objc2-core-image", + "objc2-core-location", + "objc2-core-text", + "objc2-foundation", + "objc2-quartz-core", + "objc2-user-notifications", +] + +[[package]] +name = "objc2-user-notifications" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9df9128cbbfef73cda168416ccf7f837b62737d748333bfe9ab71c245d76613e" +dependencies = [ + "objc2", + "objc2-foundation", +] + [[package]] name = "once_cell" version = "1.21.3" @@ -1568,14 +1754,18 @@ checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" [[package]] name = "os_info" -version = "3.12.0" +version = "3.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0e1ac5fde8d43c34139135df8ea9ee9465394b2d8d20f032d38998f64afffc3" +checksum = "7c39b5918402d564846d5aba164c09a66cc88d232179dfd3e3c619a25a268392" dependencies = [ + "android_system_properties", "log", - "plist", + "nix", + "objc2", + "objc2-foundation", + "objc2-ui-kit", "serde", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -1673,19 +1863,6 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" -[[package]] -name = "plist" -version = "1.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "740ebea15c5d1428f910cd1a5f52cebf8d25006245ed8ade92702f4943d91e07" -dependencies = [ - "base64", - "indexmap", - "quick-xml", - "serde", - "time", -] - [[package]] name = "poly1305" version = "0.8.0" @@ -1813,27 +1990,18 @@ dependencies = [ [[package]] name = "pulldown-cmark-to-cmark" -version = "21.0.0" +version = "21.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5b6a0769a491a08b31ea5c62494a8f144ee0987d86d670a8af4df1e1b7cde75" +checksum = "8246feae3db61428fd0bb94285c690b460e4517d83152377543ca802357785f1" dependencies = [ "pulldown-cmark", ] -[[package]] -name = "quick-xml" -version = "0.38.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42a232e7487fc2ef313d96dde7948e7a3c05101870d8985e4fd8d26aedd27b89" -dependencies = [ - "memchr", -] - [[package]] name = "quote" -version = "1.0.41" +version = "1.0.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce25767e7b499d1b604768e7cde645d14cc8584231ea6b295e9c9eb22c02e1d1" +checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f" dependencies = [ "proc-macro2", ] @@ -1935,9 +2103,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.34" +version = "0.23.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a9586e9ee2b4f8fab52a0048ca7334d7024eef48e2cb9407e3497bb7cab7fa7" +checksum = "533f54bc6a7d4f647e46ad909549eda97bf5afc1585190ef692b4286b198bd8f" dependencies = [ "log", "once_cell", @@ -2152,9 +2320,9 @@ checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" [[package]] name = "signal-hook-registry" -version = "1.4.6" +version = "1.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2a4719bff48cee6b39d12c020eeb490953ad2443b7055bd0b21fca26bd8c28b" +checksum = "7664a098b8e616bdfcc2dc0e9ac44eb231eedf41db4e9fe95d8d32ec728dedad" dependencies = [ "libc", ] @@ -2225,9 +2393,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.108" +version = "2.0.111" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da58917d35242480a05c2897064da0a80589a2a0476c9a3f2fdc83b53502e917" +checksum = "390cc9a294ab71bdb1aa2e99d13be9c753cd2d7bd6560c77118597410c4d2e87" dependencies = [ "proc-macro2", "quote", @@ -2593,9 +2761,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.30" +version = "0.1.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" dependencies = [ "proc-macro2", "quote", @@ -2604,9 +2772,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.34" +version = "0.1.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" +checksum = "7a04e24fab5c89c6a36eb8558c9656f30d81de51dfa4d3b45f26b21d61fa0a6c" dependencies = [ "once_cell", "valuable", @@ -3217,9 +3385,9 @@ checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" [[package]] name = "winnow" -version = "0.7.13" +version = "0.7.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21a0236b59786fed61e2a80582dd500fe61f18b5dca67a4a067d0bc9039339cf" +checksum = "5a5364e9d77fcdeeaa6062ced926ee3381faa2ee02d3eb83a5c27a8825540829" dependencies = [ "memchr", ] diff --git a/Cargo.toml b/Cargo.toml index b1ad1595..2c27df1f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ toml = { version = "0.9", default-features = false, features = [ tonic = { version = "0.14", default-features = false, features = [ "codegen", "gzip", + "router", "tls-native-roots", "tls-ring", ] } diff --git a/proto b/proto index 883487df..7137ff12 160000 --- a/proto +++ b/proto @@ -1 +1 @@ -Subproject commit 883487df67d90fd14fae900737cd8b5ea6c10de3 +Subproject commit 7137ff12807ab8fd807e2439d0812f1d2a5f5055 diff --git a/src/config.rs b/src/config.rs index 00445ffa..7bb00509 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,4 @@ -use std::{fs, net::IpAddr, path::PathBuf}; +use std::{fs, net::IpAddr, path::PathBuf, time::Duration}; use clap::Parser; use serde::Deserialize; @@ -36,26 +36,23 @@ pub struct Config { #[arg(long, env = "DEFGUARD_GATEWAY_NAME")] pub name: Option, - /// defguard server gRPC endpoint URL - #[arg( - long, - short = 'g', - required_unless_present = "config_path", - env = "DEFGUARD_GRPC_URL", - default_value = "" - )] - #[serde(default)] - pub grpc_url: String, + /// Gateway gRPC server port. + #[arg(long, env = "DEFGUARD_GRPC_PORT", default_value = "50066")] + pub(crate) grpc_port: u16, + + /// Gateway gRPC server certificate. + #[arg(long, env = "DEFGUARD_GATEWAY_GRPC_CERT")] + pub(crate) grpc_cert: Option, + + /// Gateway gRPC server private key. + #[arg(long, env = "DEFGUARD_GATEWAY_GRPC_KEY")] + pub(crate) grpc_key: Option, /// Use userspace WireGuard implementation e.g. wireguard-go #[arg(long, short = 'u', env = "DEFGUARD_USERSPACE")] pub userspace: bool, - /// Path to CA file - #[arg(long, env = "DEFGUARD_GRPC_CA")] - pub grpc_ca: Option, - - /// Defines how often (in seconds) interface statistics are sent to Defguard server + /// Defines how often (in seconds) interface statistics are sent to Defguard Core. #[arg(long, short = 'p', env = "DEFGUARD_STATS_PERIOD", default_value = "30")] pub stats_period: u64, @@ -100,9 +97,9 @@ pub struct Config { /// Command to run after bringing down the interface. #[arg(long, env = "POST_DOWN")] pub post_down: Option, - /// A HTTP port that will expose the REST HTTP gateway health status - /// 200 Gateway is working and is connected to CORE - /// 503 - gateway works but is not connected to CORE + /// HTTP port that will expose the REST Gateway health status endpoint. + /// 200: Gateway is working and is connected to Core + /// 503: Gateway is working, but is not connected to Core #[arg(long, env = "HEALTH_PORT")] pub health_port: Option, @@ -125,15 +122,23 @@ pub struct Config { pub http_bind_address: Option, } +impl Config { + #[must_use] + pub fn stats_period(&self) -> Duration { + Duration::from_secs(self.stats_period) + } +} + impl Default for Config { fn default() -> Self { Self { log_level: "info".into(), token: "TOKEN".into(), name: None, - grpc_url: "http://localhost:50051".into(), + grpc_port: 50066, userspace: false, - grpc_ca: None, + grpc_cert: None, + grpc_key: None, stats_period: 15, ifname: "wg0".into(), pidfile: None, diff --git a/src/gateway.rs b/src/gateway.rs index 78fe18cf..fe1cbf87 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -7,28 +7,27 @@ use gethostname::gethostname; use std::{ collections::HashMap, fs::read_to_string, + net::{IpAddr, Ipv4Addr, SocketAddr}, str::FromStr, sync::{ Arc, Mutex, - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, }, time::{Duration, SystemTime}, }; use tokio::{ - select, sync::mpsc, task::{JoinHandle, spawn}, time::{interval, sleep}, }; use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::{ - Request, Status, Streaming, + Request, Response, Status, Streaming, codegen::InterceptedService, metadata::{Ascii, MetadataValue}, - service::{Interceptor, InterceptorLayer}, - transport::{Certificate, Channel, ClientTlsConfig, Endpoint}, + service::Interceptor, + transport::{Channel, Identity, Server, ServerTlsConfig}, }; -use tower::ServiceBuilder; use tracing::{Instrument, instrument}; use crate::{ @@ -41,8 +40,8 @@ use crate::{ error::GatewayError, execute_command, mask, proto::gateway::{ - Configuration, ConfigurationRequest, Peer, StatsUpdate, Update, - gateway_service_client::GatewayServiceClient, stats_update::Payload, update, + Configuration, ConfigurationRequest, CoreRequest, CoreResponse, Peer, Update, core_request, + core_response, gateway_server, update, }, version::ensure_core_version_supported, }; @@ -75,6 +74,8 @@ impl From for InterfaceConfiguration { } } +type ClientMap = HashMap>>; + /// Intercepts all grpc requests adding authentication and version metadata struct AuthInterceptor { hostname: MetadataValue, @@ -115,9 +116,9 @@ impl Interceptor for AuthInterceptor { } type PubKey = String; -type GatewayClientType = GatewayServiceClient< - InterceptedService, ClientVersionInterceptor>, ->; +// type GatewayClientType = GatewayServiceClient< +// InterceptedService, ClientVersionInterceptor>, +// >; pub struct Gateway { config: Config, @@ -129,9 +130,10 @@ pub struct Gateway { #[cfg_attr(not(target_os = "linux"), allow(unused))] firewall_config: Option, pub connected: Arc, - client: GatewayClientType, core_info: Option, - stats_thread: Option>, + // stats_thread: Option>, + // TODO: allow only one client. + pub(super) clients: ClientMap, } impl Gateway { @@ -140,18 +142,16 @@ impl Gateway { wgapi: impl WireguardInterfaceApi + Send + Sync + 'static, firewall_api: FirewallApi, ) -> Result { - let client = Self::setup_client(&config)?; Ok(Self { config, interface_configuration: None, peers: HashMap::new(), wgapi: Arc::new(Mutex::new(wgapi)), - connected: Arc::new(AtomicBool::new(false)), - client, - stats_thread: None, firewall_api, firewall_config: None, + connected: Arc::new(AtomicBool::new(false)), core_info: None, + clients: ClientMap::new(), }) } @@ -202,87 +202,87 @@ impl Gateway { }) } - /// Starts tokio thread collecting stats and sending them to backend service via gRPC. - #[instrument(skip_all)] - fn spawn_stats_thread(&mut self) -> UnboundedReceiverStream { - if let Some(handle) = self.stats_thread.take() { - debug!("Aborting previous stats thread before starting a new one"); - handle.abort(); - } - // Create an async stream that periodically yields WireGuard interface statistics. - let period = Duration::from_secs(self.config.stats_period); - let wgapi = Arc::clone(&self.wgapi); - let (tx, rx) = mpsc::unbounded_channel(); - debug!("Spawning stats thread"); - let handle = spawn( - async move { - // helper map to track if peer data is actually changing - // and avoid sending duplicate stats - let mut peer_map = HashMap::new(); - let mut interval = interval(period); - let mut id = 1; - 'outer: loop { - // wait until next iteration - interval.tick().await; - debug!("Sending active peer stats updates."); - let interface_data = wgapi.lock().unwrap().read_interface_data(); - match interface_data { - Ok(host) => { - let peers = host.peers; - debug!( - "Found {} peers configured on WireGuard interface", - peers.len() - ); - for peer in peers.into_values().filter(|p| { - p.last_handshake - .is_some_and(|lhs| lhs != SystemTime::UNIX_EPOCH) - }) { - let has_changed = peer_map - .get(&peer.public_key) - .is_none_or(|last_peer| *last_peer != peer); - if has_changed { - peer_map.insert(peer.public_key.clone(), peer.clone()); - id += 1; - if tx - .send(StatsUpdate { - id, - payload: Some(Payload::PeerStats((&peer).into())), - }) - .is_err() - { - debug!("Stats stream disappeared"); - break 'outer; - } - } else { - debug!( - "Stats for peer {} have not changed. Skipping.", - peer.public_key - ); - } - } - } - Err(err) => error!("Failed to retrieve WireGuard interface stats: {err}"), - } - debug!("Sent peer stats updates for all peers."); - } - } - .instrument(tracing::Span::current()), - ); - self.stats_thread = Some(handle); - UnboundedReceiverStream::new(rx) - } - - #[instrument(skip_all)] - async fn handle_stats_thread( - mut client: GatewayClientType, - rx: UnboundedReceiverStream, - ) { - let status = client.stats(rx).await; - match status { - Ok(_) => info!("Stats thread terminated successfully."), - Err(err) => error!("Stats thread terminated with error: {err}"), - } - } + // /// Starts tokio thread collecting stats and sending them to backend service via gRPC. + // #[instrument(skip_all)] + // fn spawn_stats_thread(&mut self) -> UnboundedReceiverStream { + // if let Some(handle) = self.stats_thread.take() { + // debug!("Aborting previous stats thread before starting a new one"); + // handle.abort(); + // } + // // Create an async stream that periodically yields WireGuard interface statistics. + // let period = Duration::from_secs(self.config.stats_period); + // let wgapi = Arc::clone(&self.wgapi); + // let (tx, rx) = mpsc::unbounded_channel(); + // debug!("Spawning stats thread"); + // let handle = spawn( + // async move { + // // helper map to track if peer data is actually changing + // // and avoid sending duplicate stats + // let mut peer_map = HashMap::new(); + // let mut interval = interval(period); + // let mut id = 1; + // 'outer: loop { + // // wait until next iteration + // interval.tick().await; + // debug!("Sending active peer stats updates."); + // let interface_data = wgapi.lock().unwrap().read_interface_data(); + // match interface_data { + // Ok(host) => { + // let peers = host.peers; + // debug!( + // "Found {} peers configured on WireGuard interface", + // peers.len() + // ); + // for peer in peers.into_values().filter(|p| { + // p.last_handshake + // .is_some_and(|lhs| lhs != SystemTime::UNIX_EPOCH) + // }) { + // let has_changed = peer_map + // .get(&peer.public_key) + // .is_none_or(|last_peer| *last_peer != peer); + // if has_changed { + // peer_map.insert(peer.public_key.clone(), peer.clone()); + // id += 1; + // if tx + // .send(StatsUpdate { + // id, + // payload: Some(Payload::PeerStats((&peer).into())), + // }) + // .is_err() + // { + // debug!("Stats stream disappeared"); + // break 'outer; + // } + // } else { + // debug!( + // "Stats for peer {} have not changed. Skipping.", + // peer.public_key + // ); + // } + // } + // } + // Err(err) => error!("Failed to retrieve WireGuard interface stats: {err}"), + // } + // debug!("Sent peer stats updates for all peers."); + // } + // } + // .instrument(tracing::Span::current()), + // ); + // self.stats_thread = Some(handle); + // UnboundedReceiverStream::new(rx) + // } + + // #[instrument(skip_all)] + // async fn handle_stats_thread( + // mut client: GatewayClientType, + // rx: UnboundedReceiverStream, + // ) { + // let status = client.stats(rx).await; + // match status { + // Ok(_) => info!("Stats thread terminated successfully."), + // Err(err) => error!("Stats thread terminated with error: {err}"), + // } + // } /// Checks whether the firewall config changed fn has_firewall_config_changed(&self, new_fw_config: &FirewallConfig) -> bool { @@ -476,208 +476,292 @@ impl Gateway { Ok(()) } - /// Continuously tries to connect to gRPC endpoint. Once the connection is established - /// configures the interface, starts the stats thread, connects and returns the updates stream. - async fn connect(&mut self) -> Streaming { - // set diconnected if we are in this function and drop mutex - self.connected.store(false, Ordering::Relaxed); - loop { - debug!( - "Connecting to Defguard gRPC endpoint: {}", - self.config.grpc_url - ); - let (response, stream) = { - let response = self - .client - .config(ConfigurationRequest { - name: self.config.name.clone(), - }) - .await; - let stream = self.client.updates(()).await; - (response, stream) - }; - match (response, stream) { - (Ok(response), Ok(stream)) => { - self.core_info = ComponentInfo::from_metadata(response.metadata()); - let (version, info) = get_tracing_variables(&self.core_info); - let span = tracing::info_span!( - "core_configuration", - component = %DefguardComponent::Core, - version = version.to_string(), - info - ); - let _guard = span.enter(); - - // check core version and exit if it's not supported - let version = self.core_info.as_ref().map(|info| &info.version); - ensure_core_version_supported(version); - - if let Err(err) = self.configure(response.into_inner()) { - error!("Interface configuration failed: {err}"); - continue; - } - info!( - "Connected to Defguard gRPC endpoint: {}", - self.config.grpc_url - ); - self.connected.store(true, Ordering::Relaxed); - break stream.into_inner(); - } - (Err(err), _) => { - error!( - "Couldn't retrieve gateway configuration from the core. Using gRPC URL: \ - {}. Retrying in 10s. Error: {err}", - self.config.grpc_url - ); - } - (_, Err(err)) => { - error!( - "Couldn't establish streaming connection to the core. Using gRPC URL: \ - {}. Retrying in 10s. Error: {err}", - self.config.grpc_url - ); - } + /// Send message to all connected clients. + pub fn broadcast_to_clients(&self, message: &CoreRequest) { + for (addr, tx) in &self.clients { + if tx.send(Ok(message.clone())).is_err() { + debug!("Failed to send message to {addr}"); } - sleep(TEN_SECS).await; } } - fn setup_client(config: &Config) -> Result { - debug!("Preparing gRPC client configuration"); - let tls = ClientTlsConfig::new(); - // Use CA if provided, otherwise load certificates from system. - let tls = if let Some(ca) = &config.grpc_ca { - let ca = read_to_string(ca).map_err(|err| { - error!("Failed to read CA file: {err}"); - GatewayError::InvalidCaFile - })?; - tls.ca_certificate(Certificate::from_pem(ca)) - } else { - tls.with_enabled_roots() - }; - let endpoint = Endpoint::from_shared(config.grpc_url.clone())? - .http2_keep_alive_interval(TEN_SECS) - .tcp_keepalive(Some(TEN_SECS)) - .keep_alive_while_idle(true) - .tls_config(tls)?; - let channel = endpoint.connect_lazy(); - let version_interceptor = ClientVersionInterceptor::new(Version::parse(VERSION)?); - let auth_interceptor = AuthInterceptor::new(&config.token)?; - let channel = ServiceBuilder::new() - .layer(InterceptorLayer::new(version_interceptor)) - .layer(InterceptorLayer::new(auth_interceptor)) - .service(channel); - let client = GatewayServiceClient::new(channel); - - debug!("gRPC client configuration done"); - Ok(client) - } - - #[instrument(skip_all)] - async fn handle_updates(&mut self, updates_stream: &mut Streaming) { - loop { - match updates_stream.message().await { - Ok(Some(update)) => { - debug!("Received update: {update:?}"); - match update.update { - Some(update::Update::Network(configuration)) => { - if let Err(err) = self.configure(configuration) { - error!("Failed to update network configuration: {err}"); - } - } - Some(update::Update::Peer(peer_config)) => { - debug!("Applying peer configuration: {peer_config:?}"); - // UpdateType::Delete - if update.update_type == 2 { - debug!("Deleting peer {peer_config:?}"); - self.peers.remove(&peer_config.pubkey); - if let Err(err) = self.wgapi.lock().unwrap().remove_peer( - &peer_config.pubkey.as_str().try_into().unwrap_or_default(), - ) { - error!("Failed to delete peer: {err}"); - } - } - // UpdateType::Create, UpdateType::Modify - else { - debug!( - "Updating peer {peer_config:?}, update type: {}", - update.update_type - ); - self.peers - .insert(peer_config.pubkey.clone(), peer_config.clone()); - if let Err(err) = self - .wgapi - .lock() - .unwrap() - .configure_peer(&peer_config.into()) - { - error!("Failed to update peer: {err}"); - } - } - } - Some(update::Update::FirewallConfig(config)) => { - if self.config.disable_firewall_management { - debug!( - "Received firewall config update, but firewall management \ - is disabled. Skipping processing this update: {config:?}" - ); - continue; - } + // Continuously tries to connect to gRPC endpoint. Once the connection is established + // configures the interface, starts the stats thread, connects and returns the updates stream. + // async fn connect(&mut self) -> Streaming { + // // set diconnected if we are in this function and drop mutex + // self.connected.store(false, Ordering::Relaxed); + // loop { + // debug!( + // "Connecting to Defguard gRPC endpoint: {}", + // self.config.grpc_url + // ); + // let (response, stream) = { + // let response = self + // .client + // .config(ConfigurationRequest { + // name: self.config.name.clone(), + // }) + // .await; + // let stream = self.client.updates(()).await; + // (response, stream) + // }; + // match (response, stream) { + // (Ok(response), Ok(stream)) => { + // self.core_info = ComponentInfo::from_metadata(response.metadata()); + // let (version, info) = get_tracing_variables(&self.core_info); + // let span = tracing::info_span!( + // "core_configuration", + // component = %DefguardComponent::Core, + // version = version.to_string(), + // info + // ); + // let _guard = span.enter(); + + // // check core version and exit if it's not supported + // let version = self.core_info.as_ref().map(|info| &info.version); + // ensure_core_version_supported(version); + + // if let Err(err) = self.configure(response.into_inner()) { + // error!("Interface configuration failed: {err}"); + // continue; + // } + // info!( + // "Connected to Defguard gRPC endpoint: {}", + // self.config.grpc_url + // ); + // self.connected.store(true, Ordering::Relaxed); + // break stream.into_inner(); + // } + // (Err(err), _) => { + // error!( + // "Couldn't retrieve gateway configuration from the core. Using gRPC URL: \ + // {}. Retrying in 10s. Error: {err}", + // self.config.grpc_url + // ); + // } + // (_, Err(err)) => { + // error!( + // "Couldn't establish streaming connection to the core. Using gRPC URL: \ + // {}. Retrying in 10s. Error: {err}", + // self.config.grpc_url + // ); + // } + // } + // sleep(TEN_SECS).await; + // } + // } + + // fn setup_client(config: &Config) -> Result { + // debug!("Preparing gRPC client configuration"); + // let tls = ClientTlsConfig::new(); + // // Use CA if provided, otherwise load certificates from system. + // let tls = if let Some(ca) = &config.grpc_ca { + // let ca = read_to_string(ca).map_err(|err| { + // error!("Failed to read CA file: {err}"); + // GatewayError::InvalidCaFile + // })?; + // tls.ca_certificate(Certificate::from_pem(ca)) + // } else { + // tls.with_enabled_roots() + // }; + // let endpoint = Endpoint::from_shared(config.grpc_url.clone())? + // .http2_keep_alive_interval(TEN_SECS) + // .tcp_keepalive(Some(TEN_SECS)) + // .keep_alive_while_idle(true) + // .tls_config(tls)?; + // let channel = endpoint.connect_lazy(); + // let version_interceptor = ClientVersionInterceptor::new(Version::parse(VERSION)?); + // let auth_interceptor = AuthInterceptor::new(&config.token)?; + // let channel = ServiceBuilder::new() + // .layer(InterceptorLayer::new(version_interceptor)) + // .layer(InterceptorLayer::new(auth_interceptor)) + // .service(channel); + // let client = GatewayServiceClient::new(channel); + + // debug!("gRPC client configuration done"); + // Ok(client) + // } + + // #[instrument(skip_all)] + // async fn handle_updates(&mut self, updates_stream: &mut Streaming) { + // loop { + // match updates_stream.message().await { + // Ok(Some(update)) => { + // debug!("Received update: {update:?}"); + // match update.update { + // Some(update::Update::Network(configuration)) => { + // if let Err(err) = self.configure(configuration) { + // error!("Failed to update network configuration: {err}"); + // } + // } + // Some(update::Update::Peer(peer_config)) => { + // debug!("Applying peer configuration: {peer_config:?}"); + // // UpdateType::Delete + // if update.update_type == 2 { + // debug!("Deleting peer {peer_config:?}"); + // self.peers.remove(&peer_config.pubkey); + // if let Err(err) = self.wgapi.lock().unwrap().remove_peer( + // &peer_config.pubkey.as_str().try_into().unwrap_or_default(), + // ) { + // error!("Failed to delete peer: {err}"); + // } + // } + // // UpdateType::Create, UpdateType::Modify + // else { + // debug!( + // "Updating peer {peer_config:?}, update type: {}", + // update.update_type + // ); + // self.peers + // .insert(peer_config.pubkey.clone(), peer_config.clone()); + // if let Err(err) = self + // .wgapi + // .lock() + // .unwrap() + // .configure_peer(&peer_config.into()) + // { + // error!("Failed to update peer: {err}"); + // } + // } + // } + // Some(update::Update::FirewallConfig(config)) => { + // if self.config.disable_firewall_management { + // debug!( + // "Received firewall config update, but firewall management \ + // is disabled. Skipping processing this update: {config:?}" + // ); + // continue; + // } + + // debug!("Applying received firewall configuration: {config:?}"); + // let config_str = format!("{config:?}"); + // match FirewallConfig::from_proto(config) { + // Ok(new_firewall_config) => { + // debug!( + // "Parsed the received firewall configuration: \ + // {new_firewall_config:?}, processing it and applying \ + // changes" + // ); + // if let Err(err) = + // self.process_firewall_changes(Some(&new_firewall_config)) + // { + // error!( + // "Failed to process received firewall configuration: \ + // {err}" + // ); + // } + // } + // Err(err) => { + // error!( + // "Failed to parse received firewall configuration: {err}. \ + // Configuration: {config_str}" + // ); + // } + // } + // } + // Some(update::Update::DisableFirewall(())) => { + // if self.config.disable_firewall_management { + // debug!( + // "Received firewall disable request, but firewall management \ + // is disabled. Skipping processing this update" + // ); + // continue; + // } + + // debug!("Disabling firewall configuration"); + // if let Err(err) = self.process_firewall_changes(None) { + // error!("Failed to disable firewall configuration: {err}"); + // } + // } + // _ => warn!("Unsupported kind of update: {update:?}"), + // } + // } + // Ok(None) => { + // break; + // } + // Err(err) => { + // error!( + // "Disconnected from Defguard gRPC endoint: {}: {err}", + // self.config.grpc_url + // ); + // break; + // } + // } + // } + // } + + // Starts the gateway process. + // * Retrieves configuration and configuration updates from Defguard gRPC server + // * Manages the interface according to configuration and updates + // * Sends interface statistics to Defguard server periodically + // pub async fn start(&mut self) -> Result<(), GatewayError> { + // info!( + // "Starting Defguard gateway version {VERSION} with configuration: {:?}", + // mask!(self.config, token) + // ); + + // // Try to create network interface for WireGuard. + // // FIXME: check if the interface already exists, or somehow be more clever. + // if let Err(err) = self.wgapi.lock().unwrap().create_interface() { + // warn!( + // "Couldn't create network interface {}: {err}. Proceeding anyway.", + // self.config.ifname + // ); + // } else { + // #[cfg(target_os = "linux")] + // if !self.config.disable_firewall_management && self.config.masquerade { + // self.firewall_api.begin()?; + // self.firewall_api.setup_nat(self.config.masquerade, &[])?; + // self.firewall_api.commit()?; + // } + // } + + // info!( + // "Trying to connect to {} and obtain the gateway configuration from Defguard.", + // self.config.grpc_url + // ); + // loop { + // let mut updates_stream = self.connect().await; + // if let Some(post_up) = &self.config.post_up { + // debug!("Executing specified POST_UP command: {post_up}"); + // execute_command(post_up)?; + // } + // let (version, info) = get_tracing_variables(&self.core_info); + // let span = tracing::info_span!( + // "core_grpc", + // component = %DefguardComponent::Core, + // version = version.to_string(), + // info, + // ); + // let _guard = span.enter(); + // let stats_stream = self.spawn_stats_thread(); + // let client = self.client.clone(); + // select! { + // biased; + // () = Self::handle_stats_thread(client, stats_stream) => { + // error!("Stats stream aborted; reconnecting"); + // } + // () = self.handle_updates(&mut updates_stream) => { + // error!("Updates stream aborted; reconnecting"); + // } + // } + // } + // } +} - debug!("Applying received firewall configuration: {config:?}"); - let config_str = format!("{config:?}"); - match FirewallConfig::from_proto(config) { - Ok(new_firewall_config) => { - debug!( - "Parsed the received firewall configuration: \ - {new_firewall_config:?}, processing it and applying \ - changes" - ); - if let Err(err) = - self.process_firewall_changes(Some(&new_firewall_config)) - { - error!( - "Failed to process received firewall configuration: \ - {err}" - ); - } - } - Err(err) => { - error!( - "Failed to parse received firewall configuration: {err}. \ - Configuration: {config_str}" - ); - } - } - } - Some(update::Update::DisableFirewall(())) => { - if self.config.disable_firewall_management { - debug!( - "Received firewall disable request, but firewall management \ - is disabled. Skipping processing this update" - ); - continue; - } +pub struct GatewayServer { + auth_token: String, + message_id: AtomicU64, + gateway: Arc>, +} - debug!("Disabling firewall configuration"); - if let Err(err) = self.process_firewall_changes(None) { - error!("Failed to disable firewall configuration: {err}"); - } - } - _ => warn!("Unsupported kind of update: {update:?}"), - } - } - Ok(None) => { - break; - } - Err(err) => { - error!( - "Disconnected from Defguard gRPC endoint: {}: {err}", - self.config.grpc_url - ); - break; - } - } +impl GatewayServer { + #[must_use] + pub fn new(auth_token: String, gateway: Arc>) -> Self { + Self { + auth_token, + message_id: AtomicU64::new(0), + gateway, } } @@ -685,57 +769,215 @@ impl Gateway { /// * Retrieves configuration and configuration updates from Defguard gRPC server /// * Manages the interface according to configuration and updates /// * Sends interface statistics to Defguard server periodically - pub async fn start(&mut self) -> Result<(), GatewayError> { + pub async fn start(self, config: Config) -> Result<(), GatewayError> { info!( "Starting Defguard gateway version {VERSION} with configuration: {:?}", - mask!(self.config, token) + mask!(config, token) ); // Try to create network interface for WireGuard. // FIXME: check if the interface already exists, or somehow be more clever. - if let Err(err) = self.wgapi.lock().unwrap().create_interface() { + if let Err(err) = self + .gateway + .lock() + .unwrap() + .wgapi + .lock() + .unwrap() + .create_interface() + { warn!( "Couldn't create network interface {}: {err}. Proceeding anyway.", - self.config.ifname + config.ifname ); + } + + // self.get_config(); + if let Some(post_up) = &config.post_up { + debug!("Executing specified post-up command: {post_up}"); + execute_command(post_up)?; + } + + // Optionally, read gRPC TLS certificate and key. + debug!("Configuring certificates for gRPC"); + let grpc_cert = config + .grpc_cert + .as_ref() + .and_then(|path| read_to_string(path).ok()); + let grpc_key = config + .grpc_key + .as_ref() + .and_then(|path| read_to_string(path).ok()); + debug!("Configured certificates for gRPC, cert: {grpc_cert:?}"); + + // Build gRPC server. + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), config.grpc_port); + info!("gRPC server is listening on {addr}"); + let mut builder = if let (Some(cert), Some(key)) = (grpc_cert, grpc_key) { + let identity = Identity::from_pem(cert, key); + Server::builder().tls_config(ServerTlsConfig::new().identity(identity))? } else { - #[cfg(target_os = "linux")] - if !self.config.disable_firewall_management && self.config.masquerade { - self.firewall_api.begin()?; - self.firewall_api.setup_nat(self.config.masquerade, &[])?; - self.firewall_api.commit()?; + Server::builder() + }; + + // Start gRPC server. This should run indefinitely. + debug!("Serving gRPC"); + builder + .add_service(gateway_server::GatewayServer::new(self)) + .serve(addr) + .await?; + + Ok(()) + } +} + +#[tonic::async_trait] +impl gateway_server::Gateway for GatewayServer { + type BidiStream = UnboundedReceiverStream>; + + /// Handle bidirectional communication with Defguard core. + async fn bidi( + &self, + request: Request>, + ) -> Result, Status> { + let Some(address) = request.remote_addr() else { + error!("Failed to determine client address for request: {request:?}"); + return Err(Status::internal("Failed to determine client address")); + }; + info!("Defguard core gRPC client connected from {address}"); + + let (tx, rx) = mpsc::unbounded_channel(); + // First, send configuration request + if let Ok(hostname) = gethostname().into_string() { + let payload = ConfigurationRequest { + name: None, + auth_token: self.auth_token.clone(), + hostname, + }; + let req = CoreRequest { + id: self.message_id.fetch_add(1, Ordering::Relaxed), + payload: Some(core_request::Payload::ConfigRequest(payload)), + }; + + match tx.send(Ok(req)) { + Ok(()) => info!("Requesting network configuration from {address}"), + Err(err) => { + error!("Unable to send network configuration request to {address}: {err}"); + return Err(Status::internal("failed to send configuration request")); + } } + } else { + error!("Unable to get hostname"); + return Err(Status::internal("failed to get hostname")); } - info!( - "Trying to connect to {} and obtain the gateway configuration from Defguard.", - self.config.grpc_url - ); - loop { - let mut updates_stream = self.connect().await; - if let Some(post_up) = &self.config.post_up { - debug!("Executing specified POST_UP command: {post_up}"); - execute_command(post_up)?; - } - let (version, info) = get_tracing_variables(&self.core_info); - let span = tracing::info_span!( - "core_grpc", - component = %DefguardComponent::Core, - version = version.to_string(), - info, - ); - let _guard = span.enter(); - let stats_stream = self.spawn_stats_thread(); - let client = self.client.clone(); - select! { - biased; - () = Self::handle_stats_thread(client, stats_stream) => { - error!("Stats stream aborted; reconnecting"); + self.gateway.lock().unwrap().clients.insert(address, tx); + + let gateway = Arc::clone(&self.gateway); + let mut stream = request.into_inner(); + tokio::spawn(async move { + loop { + match stream.message().await { + Ok(Some(response)) => { + debug!("Received message from Defguard core: {response:?}"); + // Discard empty payloads. + if let Some(payload) = response.payload { + match payload { + core_response::Payload::Config(configuration) => { + match gateway.lock() { + Ok(mut gw) => { + gw.connected.store(true, Ordering::Relaxed); + let _ = gw.configure(configuration); + } + Err(err) => error!("Lock failed: {err}"), + } + } + core_response::Payload::Update(update) => match gateway.lock() { + Ok(mut gw) => { + // gw.handle_update(update); + } + Err(err) => error!("Lock failed: {err}"), + }, + core_response::Payload::Empty(()) => (), + } + } + } + Ok(None) => { + info!("gRPC stream from Defguard Core has been closed"); + break; + } + Err(err) => { + error!("gRPC stream from Defguard Core failed with error: {err}"); + break; + } } - () = self.handle_updates(&mut updates_stream) => { - error!("Updates stream aborted; reconnecting"); + } + info!("Defguard core gRPC stream has been disconnected: {address}"); + gateway + .lock() + .unwrap() + .connected + .store(false, Ordering::Relaxed); + gateway.lock().unwrap().clients.remove(&address); + }); + + Ok(Response::new(UnboundedReceiverStream::new(rx))) + } +} + +/// Gather WireGuard statistics and send them to Core through gRPC. +pub async fn run_stats(gateway: Arc>, period: Duration) -> Result<(), GatewayError> { + // Helper map to track if peer data is actually changing to avoid sending duplicate stats. + let mut peer_map = HashMap::new(); + let mut interval = interval(period); + let mut id = 1; + loop { + // Wait until next iteration. + interval.tick().await; + + debug!("Obtaining peer statistics from WireGuard"); + let result = gateway + .lock() + .unwrap() + .wgapi + .lock() + .unwrap() + .read_interface_data(); + match result { + Ok(host) => { + let peers = host.peers; + debug!( + "Found {} peers configured on WireGuard interface", + peers.len() + ); + // Filter out never connected peers. + for peer in peers.into_values().filter(|p| { + p.last_handshake + .map_or(false, |last_hs| last_hs != SystemTime::UNIX_EPOCH) + }) { + let has_changed = match peer_map.get(&peer.public_key) { + Some(last_peer) => *last_peer != peer, + None => true, + }; + if has_changed { + peer_map.insert(peer.public_key.clone(), peer.clone()); + let payload = core_request::Payload::PeerStats((&peer).into()); + let message = CoreRequest { + id, + payload: Some(payload), + }; + id += 1; + gateway.lock().unwrap().broadcast_to_clients(&message); + debug!("Sent statistics for peer {}", peer.public_key); + } else { + debug!( + "Statistics for peer {} have not changed. Skipping.", + peer.public_key + ); + } } } + Err(err) => error!("Failed to retrieve WireGuard interface statistics: {err}"), } } } diff --git a/src/main.rs b/src/main.rs index 7ad00ac1..ea2bf2b8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,19 @@ -use std::{fs::File, io::Write, process, sync::Arc}; +use std::{ + fs::File, + io::Write, + process, + sync::{Arc, Mutex}, +}; use defguard_gateway::{ - VERSION, config::get_config, enterprise::firewall::api::FirewallApi, error::GatewayError, - execute_command, gateway::Gateway, init_syslog, server::run_server, + VERSION, + config::get_config, + enterprise::firewall::api::FirewallApi, + error::GatewayError, + execute_command, + gateway::{Gateway, GatewayServer, run_stats}, + init_syslog, + server::run_server, }; use defguard_version::Version; #[cfg(not(any(target_os = "macos", target_os = "netbsd")))] @@ -42,7 +53,7 @@ async fn main() -> Result<(), GatewayError> { let ifname = config.ifname.clone(); let firewall_api = FirewallApi::new(&ifname)?; - let mut gateway = if config.userspace { + let gateway = if config.userspace { let wgapi = WGApi::::new(ifname)?; Gateway::new(config.clone(), wgapi, firewall_api)? } else { @@ -58,7 +69,10 @@ async fn main() -> Result<(), GatewayError> { } }; + // Keep track of spawned tasks. let mut tasks = JoinSet::new(); + + // Optionally, launch HTTP server to report gateway's health. if let Some(health_port) = config.health_port { tasks.spawn(run_server( health_port, @@ -66,7 +80,15 @@ async fn main() -> Result<(), GatewayError> { Arc::clone(&gateway.connected), )); } - tasks.spawn(async move { gateway.start().await }); + + // Launch statistics gathering task. + let gateway = Arc::new(Mutex::new(gateway)); + tasks.spawn(run_stats(Arc::clone(&gateway), config.stats_period())); + + // Launch gRPC server. + let gateway_server = GatewayServer::new(config.token.clone(), gateway); + tasks.spawn(gateway_server.start(config.clone())); + while let Some(Ok(result)) = tasks.join_next().await { result?; } From bf698bfe93cd036ec2ced949472529073f46f99f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Ciarcin=CC=81ski?= Date: Wed, 3 Dec 2025 13:03:09 +0100 Subject: [PATCH 2/5] Add version layer to gRPC server --- Cargo.lock | 64 +++++++++++++++++++++++--------------------------- Cargo.toml | 4 ++-- src/gateway.rs | 60 +++++++++++++++++++++++++++------------------- src/version.rs | 2 +- 4 files changed, 68 insertions(+), 62 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a8de0eaf..7b72405e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -328,9 +328,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.47" +version = "1.2.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd405d82c84ff7f35739f175f67d8b9fb7687a0e84ccdc78bd3568839827cf07" +checksum = "c481bdbf0ed3b892f6f806287d72acd515b352a4ec27a208489b8c1bc839633a" dependencies = [ "find-msvc-tools", "jobserver", @@ -573,7 +573,7 @@ dependencies = [ [[package]] name = "defguard_version" version = "0.0.0" -source = "git+https://github.com/DefGuard/defguard.git?rev=8649a9ba225d7bd2066a09c9e1347705c34bd158#8649a9ba225d7bd2066a09c9e1347705c34bd158" +source = "git+https://github.com/DefGuard/defguard.git?rev=640bae9a0aea1e11395f0a29fb8c84eeefd7f115#640bae9a0aea1e11395f0a29fb8c84eeefd7f115" dependencies = [ "axum", "http", @@ -842,7 +842,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1bd49230192a3797a9a4d6abe9b3eed6f7fa4c8a8a4947977c6f80025f92cbd8" dependencies = [ "rustix", - "windows-link 0.2.1", + "windows-link", ] [[package]] @@ -946,13 +946,13 @@ dependencies = [ [[package]] name = "hostname" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a56f203cd1c76362b69e3863fd987520ac36cf70a8c92627449b2f64a8cf7d65" +checksum = "617aaa3557aef3810a6369d0a99fac8a080891b68bd9f9812a1eeda0c0730cbd" dependencies = [ "cfg-if", "libc", - "windows-link 0.1.3", + "windows-link", ] [[package]] @@ -1283,9 +1283,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.177" +version = "0.2.178" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976" +checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091" [[package]] name = "libgit2-sys" @@ -1306,7 +1306,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7c4b02199fee7c5d21a5ae7d8cfa79a6ef5bb2fc834d6e9058e89c825efdc55" dependencies = [ "cfg-if", - "windows-link 0.2.1", + "windows-link", ] [[package]] @@ -1344,9 +1344,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.28" +version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" +checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" [[package]] name = "matchers" @@ -1413,9 +1413,9 @@ dependencies = [ [[package]] name = "mnl" -version = "0.2.3" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f15c13b46a48d07ffae279f54a43ebfb0badee28a59a32aacdddc67effa2c4db" +checksum = "5d3a616a69f9e3b7c5c3c9f707fe146bea1d2eef93b706271a9bc3f1070f71fb" dependencies = [ "libc", "log", @@ -1788,7 +1788,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-link 0.2.1", + "windows-link", ] [[package]] @@ -2130,9 +2130,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.13.0" +version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94182ad936a0c91c324cd46c6511b9510ed16af436d7b5bab34beab0afd55f7a" +checksum = "708c0f9d5f54ba0272468c1d306a52c495b31fa155e91bc25371e6df7996908c" dependencies = [ "zeroize", ] @@ -2749,9 +2749,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" -version = "0.1.41" +version = "0.1.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" +checksum = "2d15d90a0b5c19378952d479dc858407149d7bb45a14de0142f6c534b16fc647" dependencies = [ "log", "pin-project-lite", @@ -2793,9 +2793,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.20" +version = "0.3.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5" +checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e" dependencies = [ "matchers", "nu-ansi-term", @@ -3131,7 +3131,7 @@ checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" dependencies = [ "windows-implement", "windows-interface", - "windows-link 0.2.1", + "windows-link", "windows-result", "windows-strings", ] @@ -3143,7 +3143,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1d6f90251fe18a279739e78025bd6ddc52a7e22f921070ccdc67dde84c605cb" dependencies = [ "windows-core", - "windows-link 0.2.1", + "windows-link", "windows-threading", ] @@ -3169,12 +3169,6 @@ dependencies = [ "syn", ] -[[package]] -name = "windows-link" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" - [[package]] name = "windows-link" version = "0.2.1" @@ -3188,7 +3182,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e2e40844ac143cdb44aead537bbf727de9b044e107a0f1220392177d15b0f26" dependencies = [ "windows-core", - "windows-link 0.2.1", + "windows-link", ] [[package]] @@ -3197,7 +3191,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" dependencies = [ - "windows-link 0.2.1", + "windows-link", ] [[package]] @@ -3206,7 +3200,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" dependencies = [ - "windows-link 0.2.1", + "windows-link", ] [[package]] @@ -3242,7 +3236,7 @@ version = "0.61.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" dependencies = [ - "windows-link 0.2.1", + "windows-link", ] [[package]] @@ -3267,7 +3261,7 @@ version = "0.53.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" dependencies = [ - "windows-link 0.2.1", + "windows-link", "windows_aarch64_gnullvm 0.53.1", "windows_aarch64_msvc 0.53.1", "windows_i686_gnu 0.53.1", @@ -3284,7 +3278,7 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3949bd5b99cafdf1c7ca86b43ca564028dfe27d66958f2470940f73d86d75b37" dependencies = [ - "windows-link 0.2.1", + "windows-link", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 2c27df1f..e09999ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ edition = "2024" axum = "0.8" base64 = "0.22" clap = { version = "4.5", features = ["derive", "env"] } -defguard_version = { git = "https://github.com/DefGuard/defguard.git", rev = "8649a9ba225d7bd2066a09c9e1347705c34bd158" } +defguard_version = { git = "https://github.com/DefGuard/defguard.git", rev = "640bae9a0aea1e11395f0a29fb8c84eeefd7f115" } defguard_wireguard_rs = { git = "https://github.com/DefGuard/wireguard-rs", rev = "886186c1e088e4805ab8049436c28cf3ea26d727" } env_logger = "0.11" gethostname = "1.0" @@ -37,7 +37,7 @@ tower = "0.5" [target.'cfg(target_os = "linux")'.dependencies] nftnl = { git = "https://github.com/DefGuard/nftnl-rs.git", rev = "1a1147271f43b9d7182a114bb056a5224c35d38f" } -mnl = "0.2" +mnl = "0.3" [target.'cfg(any(target_os = "freebsd", target_os = "macos", target_os = "netbsd"))'.dependencies] nix = { version = "0.30", default-features = false, features = ["ioctl"] } diff --git a/src/gateway.rs b/src/gateway.rs index fe1cbf87..0fb766d3 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -1,6 +1,6 @@ use defguard_version::{ ComponentInfo, DefguardComponent, Version, client::ClientVersionInterceptor, - get_tracing_variables, + get_tracing_variables, server::DefguardVersionLayer, }; use defguard_wireguard_rs::{WireguardInterfaceApi, net::IpAddrMask}; use gethostname::gethostname; @@ -25,9 +25,10 @@ use tonic::{ Request, Response, Status, Streaming, codegen::InterceptedService, metadata::{Ascii, MetadataValue}, - service::Interceptor, + service::{Interceptor, InterceptorLayer}, transport::{Channel, Identity, Server, ServerTlsConfig}, }; +use tower::ServiceBuilder; use tracing::{Instrument, instrument}; use crate::{ @@ -823,7 +824,18 @@ impl GatewayServer { // Start gRPC server. This should run indefinitely. debug!("Serving gRPC"); builder - .add_service(gateway_server::GatewayServer::new(self)) + .add_service( + ServiceBuilder::new() + // .layer(InterceptorLayer::new(JwtInterceptor::new( + // ClaimsType::Gateway, + // ))) + // .layer(InterceptorLayer::new(CoreVersionInterceptor::new( + // MIN_CORE_VERSION, + // incompatible_components, + // ))) + .layer(DefguardVersionLayer::new(Version::parse(VERSION)?)) + .service(gateway_server::GatewayServer::new(self)), + ) .serve(addr) .await?; @@ -844,31 +856,31 @@ impl gateway_server::Gateway for GatewayServer { error!("Failed to determine client address for request: {request:?}"); return Err(Status::internal("Failed to determine client address")); }; - info!("Defguard core gRPC client connected from {address}"); + info!("Defguard Core gRPC client connected from {address}"); let (tx, rx) = mpsc::unbounded_channel(); - // First, send configuration request - if let Ok(hostname) = gethostname().into_string() { - let payload = ConfigurationRequest { - name: None, - auth_token: self.auth_token.clone(), - hostname, - }; - let req = CoreRequest { - id: self.message_id.fetch_add(1, Ordering::Relaxed), - payload: Some(core_request::Payload::ConfigRequest(payload)), - }; - - match tx.send(Ok(req)) { - Ok(()) => info!("Requesting network configuration from {address}"), - Err(err) => { - error!("Unable to send network configuration request to {address}: {err}"); - return Err(Status::internal("failed to send configuration request")); - } - } - } else { + // First, send configuration request. + let Ok(hostname) = gethostname().into_string() else { error!("Unable to get hostname"); return Err(Status::internal("failed to get hostname")); + }; + + let payload = ConfigurationRequest { + name: None, // TODO: remove? + auth_token: self.auth_token.clone(), + hostname, + }; + let req = CoreRequest { + id: self.message_id.fetch_add(1, Ordering::Relaxed), + payload: Some(core_request::Payload::ConfigRequest(payload)), + }; + + match tx.send(Ok(req)) { + Ok(()) => info!("Requesting network configuration from {address}"), + Err(err) => { + error!("Unable to send network configuration request to {address}: {err}"); + return Err(Status::internal("failed to send configuration request")); + } } self.gateway.lock().unwrap().clients.insert(address, tx); diff --git a/src/version.rs b/src/version.rs index 8e0b75fe..f737e19f 100644 --- a/src/version.rs +++ b/src/version.rs @@ -1,6 +1,6 @@ use defguard_version::{Version, is_version_lower}; -const MIN_CORE_VERSION: Version = Version::new(1, 5, 0); +const MIN_CORE_VERSION: Version = Version::new(1, 6, 0); /// Ensures the core version meets minimum version requirements. /// Terminates the process if it doesn't. From a6caec61d6064977f011bbcfc339b0e5366357ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Ciarcin=CC=81ski?= Date: Thu, 4 Dec 2025 14:46:38 +0100 Subject: [PATCH 3/5] Comment-out gateway tests for now --- Cargo.lock | 8 ++++---- Cargo.toml | 2 +- src/gateway.rs | 5 ++--- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7b72405e..a1fd1f50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1038,9 +1038,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.18" +version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52e9a2a24dc5c6821e71a7030e1e14b7b632acac55c40e9d2e082c621261bb56" +checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f" dependencies = [ "bytes", "futures-channel", @@ -1413,9 +1413,9 @@ dependencies = [ [[package]] name = "mnl" -version = "0.3.0" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d3a616a69f9e3b7c5c3c9f707fe146bea1d2eef93b706271a9bc3f1070f71fb" +checksum = "f15c13b46a48d07ffae279f54a43ebfb0badee28a59a32aacdddc67effa2c4db" dependencies = [ "libc", "log", diff --git a/Cargo.toml b/Cargo.toml index e09999ed..144fe9f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,7 @@ tower = "0.5" [target.'cfg(target_os = "linux")'.dependencies] nftnl = { git = "https://github.com/DefGuard/nftnl-rs.git", rev = "1a1147271f43b9d7182a114bb056a5224c35d38f" } -mnl = "0.3" +mnl = "0.2" [target.'cfg(any(target_os = "freebsd", target_os = "macos", target_os = "netbsd"))'.dependencies] nix = { version = "0.30", default-features = false, features = ["ioctl"] } diff --git a/src/gateway.rs b/src/gateway.rs index 0fb766d3..4406bf5e 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -826,9 +826,6 @@ impl GatewayServer { builder .add_service( ServiceBuilder::new() - // .layer(InterceptorLayer::new(JwtInterceptor::new( - // ClaimsType::Gateway, - // ))) // .layer(InterceptorLayer::new(CoreVersionInterceptor::new( // MIN_CORE_VERSION, // incompatible_components, @@ -994,6 +991,7 @@ pub async fn run_stats(gateway: Arc>, period: Duration) -> Result } } +/* #[cfg(test)] mod tests { use std::{net::Ipv4Addr, slice::from_ref}; @@ -1364,3 +1362,4 @@ mod tests { assert!(gateway.has_firewall_config_changed(&config5)); } } +*/ From 33011ed72ada6a4bc1cd888b574a2549d7f40dc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Ciarcin=CC=81ski?= Date: Fri, 5 Dec 2025 11:01:32 +0100 Subject: [PATCH 4/5] Handle updates --- src/gateway.rs | 291 ++++++++++++++++++++----------------------------- 1 file changed, 119 insertions(+), 172 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 4406bf5e..40c3a0d6 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -26,7 +26,7 @@ use tonic::{ codegen::InterceptedService, metadata::{Ascii, MetadataValue}, service::{Interceptor, InterceptorLayer}, - transport::{Channel, Identity, Server, ServerTlsConfig}, + transport::{Identity, Server, ServerTlsConfig}, }; use tower::ServiceBuilder; use tracing::{Instrument, instrument}; @@ -578,176 +578,97 @@ impl Gateway { // .layer(InterceptorLayer::new(auth_interceptor)) // .service(channel); // let client = GatewayServiceClient::new(channel); - - // debug!("gRPC client configuration done"); // Ok(client) // } - // #[instrument(skip_all)] - // async fn handle_updates(&mut self, updates_stream: &mut Streaming) { - // loop { - // match updates_stream.message().await { - // Ok(Some(update)) => { - // debug!("Received update: {update:?}"); - // match update.update { - // Some(update::Update::Network(configuration)) => { - // if let Err(err) = self.configure(configuration) { - // error!("Failed to update network configuration: {err}"); - // } - // } - // Some(update::Update::Peer(peer_config)) => { - // debug!("Applying peer configuration: {peer_config:?}"); - // // UpdateType::Delete - // if update.update_type == 2 { - // debug!("Deleting peer {peer_config:?}"); - // self.peers.remove(&peer_config.pubkey); - // if let Err(err) = self.wgapi.lock().unwrap().remove_peer( - // &peer_config.pubkey.as_str().try_into().unwrap_or_default(), - // ) { - // error!("Failed to delete peer: {err}"); - // } - // } - // // UpdateType::Create, UpdateType::Modify - // else { - // debug!( - // "Updating peer {peer_config:?}, update type: {}", - // update.update_type - // ); - // self.peers - // .insert(peer_config.pubkey.clone(), peer_config.clone()); - // if let Err(err) = self - // .wgapi - // .lock() - // .unwrap() - // .configure_peer(&peer_config.into()) - // { - // error!("Failed to update peer: {err}"); - // } - // } - // } - // Some(update::Update::FirewallConfig(config)) => { - // if self.config.disable_firewall_management { - // debug!( - // "Received firewall config update, but firewall management \ - // is disabled. Skipping processing this update: {config:?}" - // ); - // continue; - // } - - // debug!("Applying received firewall configuration: {config:?}"); - // let config_str = format!("{config:?}"); - // match FirewallConfig::from_proto(config) { - // Ok(new_firewall_config) => { - // debug!( - // "Parsed the received firewall configuration: \ - // {new_firewall_config:?}, processing it and applying \ - // changes" - // ); - // if let Err(err) = - // self.process_firewall_changes(Some(&new_firewall_config)) - // { - // error!( - // "Failed to process received firewall configuration: \ - // {err}" - // ); - // } - // } - // Err(err) => { - // error!( - // "Failed to parse received firewall configuration: {err}. \ - // Configuration: {config_str}" - // ); - // } - // } - // } - // Some(update::Update::DisableFirewall(())) => { - // if self.config.disable_firewall_management { - // debug!( - // "Received firewall disable request, but firewall management \ - // is disabled. Skipping processing this update" - // ); - // continue; - // } - - // debug!("Disabling firewall configuration"); - // if let Err(err) = self.process_firewall_changes(None) { - // error!("Failed to disable firewall configuration: {err}"); - // } - // } - // _ => warn!("Unsupported kind of update: {update:?}"), - // } - // } - // Ok(None) => { - // break; - // } - // Err(err) => { - // error!( - // "Disconnected from Defguard gRPC endoint: {}: {err}", - // self.config.grpc_url - // ); - // break; - // } - // } - // } - // } - - // Starts the gateway process. - // * Retrieves configuration and configuration updates from Defguard gRPC server - // * Manages the interface according to configuration and updates - // * Sends interface statistics to Defguard server periodically - // pub async fn start(&mut self) -> Result<(), GatewayError> { - // info!( - // "Starting Defguard gateway version {VERSION} with configuration: {:?}", - // mask!(self.config, token) - // ); + #[instrument(skip_all)] + fn handle_updates(&mut self, update: Update) { + debug!("Received update: {update:?}"); + match update.update { + Some(update::Update::Network(configuration)) => { + if let Err(err) = self.configure(configuration) { + error!("Failed to update network configuration: {err}"); + } + } + Some(update::Update::Peer(peer_config)) => { + debug!("Applying peer configuration: {peer_config:?}"); + // UpdateType::Delete + if update.update_type == 2 { + debug!("Deleting peer {peer_config:?}"); + self.peers.remove(&peer_config.pubkey); + if let Err(err) = + self.wgapi.lock().unwrap().remove_peer( + &peer_config.pubkey.as_str().try_into().unwrap_or_default(), + ) + { + error!("Failed to delete peer: {err}"); + } + } + // UpdateType::Create, UpdateType::Modify + else { + debug!( + "Updating peer {peer_config:?}, update type: {}", + update.update_type + ); + self.peers + .insert(peer_config.pubkey.clone(), peer_config.clone()); + if let Err(err) = self + .wgapi + .lock() + .unwrap() + .configure_peer(&peer_config.into()) + { + error!("Failed to update peer: {err}"); + } + } + } + Some(update::Update::FirewallConfig(config)) => { + if self.config.disable_firewall_management { + debug!( + "Received firewall config update, but firewall management is disabled. \ + Skipping processing this update: {config:?}" + ); + return; + } - // // Try to create network interface for WireGuard. - // // FIXME: check if the interface already exists, or somehow be more clever. - // if let Err(err) = self.wgapi.lock().unwrap().create_interface() { - // warn!( - // "Couldn't create network interface {}: {err}. Proceeding anyway.", - // self.config.ifname - // ); - // } else { - // #[cfg(target_os = "linux")] - // if !self.config.disable_firewall_management && self.config.masquerade { - // self.firewall_api.begin()?; - // self.firewall_api.setup_nat(self.config.masquerade, &[])?; - // self.firewall_api.commit()?; - // } - // } + debug!("Applying received firewall configuration: {config:?}"); + let config_str = format!("{config:?}"); + match FirewallConfig::from_proto(config) { + Ok(new_firewall_config) => { + debug!( + "Parsed the received firewall configuration: {new_firewall_config:?}, \ + processing it and applying changes" + ); + if let Err(err) = self.process_firewall_changes(Some(&new_firewall_config)) + { + error!("Failed to process received firewall configuration: {err}"); + } + } + Err(err) => { + error!( + "Failed to parse received firewall configuration: {err}. \ + Configuration: {config_str}" + ); + } + } + } + Some(update::Update::DisableFirewall(())) => { + if self.config.disable_firewall_management { + debug!( + "Received firewall disable request, but firewall management is disabled. \ + Skipping processing this update" + ); + return; + } - // info!( - // "Trying to connect to {} and obtain the gateway configuration from Defguard.", - // self.config.grpc_url - // ); - // loop { - // let mut updates_stream = self.connect().await; - // if let Some(post_up) = &self.config.post_up { - // debug!("Executing specified POST_UP command: {post_up}"); - // execute_command(post_up)?; - // } - // let (version, info) = get_tracing_variables(&self.core_info); - // let span = tracing::info_span!( - // "core_grpc", - // component = %DefguardComponent::Core, - // version = version.to_string(), - // info, - // ); - // let _guard = span.enter(); - // let stats_stream = self.spawn_stats_thread(); - // let client = self.client.clone(); - // select! { - // biased; - // () = Self::handle_stats_thread(client, stats_stream) => { - // error!("Stats stream aborted; reconnecting"); - // } - // () = self.handle_updates(&mut updates_stream) => { - // error!("Updates stream aborted; reconnecting"); - // } - // } - // } - // } + debug!("Disabling firewall configuration"); + if let Err(err) = self.process_firewall_changes(None) { + error!("Failed to disable firewall configuration: {err}"); + } + } + _ => warn!("Unsupported kind of update: {update:?}"), + } + } } pub struct GatewayServer { @@ -772,7 +693,7 @@ impl GatewayServer { /// * Sends interface statistics to Defguard server periodically pub async fn start(self, config: Config) -> Result<(), GatewayError> { info!( - "Starting Defguard gateway version {VERSION} with configuration: {:?}", + "Starting Defguard Gateway version {VERSION} with configuration: {:?}", mask!(config, token) ); @@ -791,11 +712,17 @@ impl GatewayServer { "Couldn't create network interface {}: {err}. Proceeding anyway.", config.ifname ); + } else { + #[cfg(target_os = "linux")] + if !self.config.disable_firewall_management && self.config.masquerade { + self.firewall_api.begin()?; + self.firewall_api.setup_nat(self.config.masquerade, &[])?; + self.firewall_api.commit()?; + } } - // self.get_config(); if let Some(post_up) = &config.post_up { - debug!("Executing specified post-up command: {post_up}"); + debug!("Executing specified POST_UP command: {post_up}"); execute_command(post_up)?; } @@ -811,6 +738,26 @@ impl GatewayServer { .and_then(|path| read_to_string(path).ok()); debug!("Configured certificates for gRPC, cert: {grpc_cert:?}"); + // let (version, info) = get_tracing_variables(&self.core_info); + // let span = tracing::info_span!( + // "core_grpc", + // component = %DefguardComponent::Core, + // version = version.to_string(), + // info, + // ); + // let _guard = span.enter(); + // let stats_stream = self.spawn_stats_thread(); + // let client = self.client.clone(); + // select! { + // biased; + // () = Self::handle_stats_thread(client, stats_stream) => { + // error!("Stats stream aborted; reconnecting"); + // } + // () = self.handle_updates(&mut updates_stream) => { + // error!("Updates stream aborted; reconnecting"); + // } + // } + // Build gRPC server. let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), config.grpc_port); info!("gRPC server is listening on {addr}"); @@ -856,12 +803,12 @@ impl gateway_server::Gateway for GatewayServer { info!("Defguard Core gRPC client connected from {address}"); let (tx, rx) = mpsc::unbounded_channel(); - // First, send configuration request. let Ok(hostname) = gethostname().into_string() else { error!("Unable to get hostname"); return Err(Status::internal("failed to get hostname")); }; + // First, send configuration request. let payload = ConfigurationRequest { name: None, // TODO: remove? auth_token: self.auth_token.clone(), @@ -888,7 +835,7 @@ impl gateway_server::Gateway for GatewayServer { loop { match stream.message().await { Ok(Some(response)) => { - debug!("Received message from Defguard core: {response:?}"); + debug!("Received message from Defguard Core: {response:?}"); // Discard empty payloads. if let Some(payload) = response.payload { match payload { @@ -903,7 +850,7 @@ impl gateway_server::Gateway for GatewayServer { } core_response::Payload::Update(update) => match gateway.lock() { Ok(mut gw) => { - // gw.handle_update(update); + gw.handle_updates(update); } Err(err) => error!("Lock failed: {err}"), }, From d75844b37c394e02dd669d11c65816ff8f4e39ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Ciarcin=CC=81ski?= Date: Fri, 5 Dec 2025 12:58:57 +0100 Subject: [PATCH 5/5] Unclog tests --- src/gateway.rs | 126 ++++++++++++++++--------------------------------- src/version.rs | 10 ++-- 2 files changed, 47 insertions(+), 89 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 40c3a0d6..08dc48b9 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -15,21 +15,16 @@ use std::{ }, time::{Duration, SystemTime}, }; -use tokio::{ - sync::mpsc, - task::{JoinHandle, spawn}, - time::{interval, sleep}, -}; +use tokio::{sync::mpsc, time::interval}; use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::{ Request, Response, Status, Streaming, - codegen::InterceptedService, metadata::{Ascii, MetadataValue}, - service::{Interceptor, InterceptorLayer}, + service::Interceptor, transport::{Identity, Server, ServerTlsConfig}, }; use tower::ServiceBuilder; -use tracing::{Instrument, instrument}; +use tracing::instrument; use crate::{ VERSION, @@ -47,8 +42,6 @@ use crate::{ version::ensure_core_version_supported, }; -const TEN_SECS: Duration = Duration::from_secs(10); - // helper struct which stores just the interface config without peers #[derive(Clone, PartialEq)] struct InterfaceConfiguration { @@ -107,7 +100,7 @@ impl AuthInterceptor { impl Interceptor for AuthInterceptor { fn call(&mut self, mut request: Request<()>) -> Result, Status> { - // Add auth headers + // Add authorisation headers. let metadata = request.metadata_mut(); metadata.insert("authorization", self.token.clone()); metadata.insert("hostname", self.hostname.clone()); @@ -126,13 +119,10 @@ pub struct Gateway { interface_configuration: Option, peers: HashMap, wgapi: Arc>, - #[cfg_attr(not(target_os = "linux"), allow(unused))] firewall_api: FirewallApi, - #[cfg_attr(not(target_os = "linux"), allow(unused))] firewall_config: Option, pub connected: Arc, core_info: Option, - // stats_thread: Option>, // TODO: allow only one client. pub(super) clients: ClientMap, } @@ -156,7 +146,7 @@ impl Gateway { }) } - // replace current peer map with a new list of peers + // Replace current peer map with a new list of peers. fn replace_peers(&mut self, new_peers: Vec) { debug!("Replacing stored peers with {} new peers", new_peers.len()); let peers = new_peers @@ -166,7 +156,7 @@ impl Gateway { self.peers = peers; } - // check if new received configuration is different than current one + // Check if new received configuration is different than current one. fn is_interface_config_changed( &self, new_interface_configuration: &InterfaceConfiguration, @@ -179,7 +169,7 @@ impl Gateway { true } - // check if new peers are the same as the stored ones + // Check if new peers are the same as the stored ones. fn is_peer_list_changed(&self, new_peers: &[Peer]) -> bool { // check if number of peers is different if self.peers.len() != new_peers.len() { @@ -203,7 +193,7 @@ impl Gateway { }) } - // /// Starts tokio thread collecting stats and sending them to backend service via gRPC. + // /// Starts tokio thread collecting stats and send them to Defguard Code over gRPC. // #[instrument(skip_all)] // fn spawn_stats_thread(&mut self) -> UnboundedReceiverStream { // if let Some(handle) = self.stats_thread.take() { @@ -460,7 +450,7 @@ impl Gateway { ); } - // process received firewall config unless firewall management is disabled + // Process received firewall configuration, unless firewall management is disabled. if self.config.disable_firewall_management { debug!("Firewall management is disabled. Skipping updating firewall configuration"); } else { @@ -478,7 +468,7 @@ impl Gateway { } /// Send message to all connected clients. - pub fn broadcast_to_clients(&self, message: &CoreRequest) { + fn broadcast_to_clients(&self, message: &CoreRequest) { for (addr, tx) in &self.clients { if tx.send(Ok(message.clone())).is_err() { debug!("Failed to send message to {addr}"); @@ -552,35 +542,6 @@ impl Gateway { // } // } - // fn setup_client(config: &Config) -> Result { - // debug!("Preparing gRPC client configuration"); - // let tls = ClientTlsConfig::new(); - // // Use CA if provided, otherwise load certificates from system. - // let tls = if let Some(ca) = &config.grpc_ca { - // let ca = read_to_string(ca).map_err(|err| { - // error!("Failed to read CA file: {err}"); - // GatewayError::InvalidCaFile - // })?; - // tls.ca_certificate(Certificate::from_pem(ca)) - // } else { - // tls.with_enabled_roots() - // }; - // let endpoint = Endpoint::from_shared(config.grpc_url.clone())? - // .http2_keep_alive_interval(TEN_SECS) - // .tcp_keepalive(Some(TEN_SECS)) - // .keep_alive_while_idle(true) - // .tls_config(tls)?; - // let channel = endpoint.connect_lazy(); - // let version_interceptor = ClientVersionInterceptor::new(Version::parse(VERSION)?); - // let auth_interceptor = AuthInterceptor::new(&config.token)?; - // let channel = ServiceBuilder::new() - // .layer(InterceptorLayer::new(version_interceptor)) - // .layer(InterceptorLayer::new(auth_interceptor)) - // .service(channel); - // let client = GatewayServiceClient::new(channel); - // Ok(client) - // } - #[instrument(skip_all)] fn handle_updates(&mut self, update: Update) { debug!("Received update: {update:?}"); @@ -699,25 +660,25 @@ impl GatewayServer { // Try to create network interface for WireGuard. // FIXME: check if the interface already exists, or somehow be more clever. - if let Err(err) = self - .gateway - .lock() - .unwrap() - .wgapi - .lock() - .unwrap() - .create_interface() { - warn!( - "Couldn't create network interface {}: {err}. Proceeding anyway.", - config.ifname - ); - } else { - #[cfg(target_os = "linux")] - if !self.config.disable_firewall_management && self.config.masquerade { - self.firewall_api.begin()?; - self.firewall_api.setup_nat(self.config.masquerade, &[])?; - self.firewall_api.commit()?; + let gateway = &self.gateway.lock().expect("gateway mutex poison"); + if let Err(err) = gateway + .wgapi + .lock() + .expect("wgapi mutex poison") + .create_interface() + { + warn!( + "Couldn't create network interface {}: {err}. Proceeding anyway.", + config.ifname + ); + } else { + #[cfg(target_os = "linux")] + if !config.disable_firewall_management && config.masquerade { + gateway.firewall_api.begin()?; + gateway.firewall_api.setup_nat(config.masquerade, &[])?; + &gateway.firewall_api.commit()?; + } } } @@ -868,7 +829,7 @@ impl gateway_server::Gateway for GatewayServer { } } } - info!("Defguard core gRPC stream has been disconnected: {address}"); + info!("Defguard Core gRPC stream has been disconnected: {address}"); gateway .lock() .unwrap() @@ -894,10 +855,10 @@ pub async fn run_stats(gateway: Arc>, period: Duration) -> Result debug!("Obtaining peer statistics from WireGuard"); let result = gateway .lock() - .unwrap() + .expect("gateway mutex poison") .wgapi .lock() - .unwrap() + .expect("wgapi mutex poison") .read_interface_data(); match result { Ok(host) => { @@ -923,7 +884,10 @@ pub async fn run_stats(gateway: Arc>, period: Duration) -> Result payload: Some(payload), }; id += 1; - gateway.lock().unwrap().broadcast_to_clients(&message); + gateway + .lock() + .expect("gateway mutex poison") + .broadcast_to_clients(&message); debug!("Sent statistics for peer {}", peer.public_key); } else { debug!( @@ -938,7 +902,6 @@ pub async fn run_stats(gateway: Arc>, period: Duration) -> Result } } -/* #[cfg(test)] mod tests { use std::{net::Ipv4Addr, slice::from_ref}; @@ -989,19 +952,17 @@ mod tests { let wgapi = WG::new("wg0").unwrap(); let config = Config::default(); - let client = Gateway::setup_client(&config).unwrap(); let firewall_api = FirewallApi::new("wg0").unwrap(); let gateway = Gateway { config, interface_configuration: Some(old_config.clone()), peers: old_peers_map, wgapi: Arc::new(Mutex::new(wgapi)), - connected: Arc::new(AtomicBool::new(false)), - client, - stats_thread: None, firewall_api, firewall_config: None, + connected: Arc::new(AtomicBool::new(false)), core_info: None, + clients: ClientMap::new(), }; // new config is the same @@ -1178,18 +1139,16 @@ mod tests { let wgapi = WG::new("wg0").unwrap(); let config = Config::default(); - let client = Gateway::setup_client(&config).unwrap(); let mut gateway = Gateway { config, interface_configuration: None, peers: HashMap::new(), wgapi: Arc::new(Mutex::new(wgapi)), - connected: Arc::new(AtomicBool::new(false)), - client, - stats_thread: None, firewall_api: FirewallApi::new("test_interface").unwrap(), firewall_config: None, + connected: Arc::new(AtomicBool::new(false)), core_info: None, + clients: ClientMap::new(), }; // Gateway has no firewall config, new rules are empty @@ -1247,18 +1206,16 @@ mod tests { let wgapi = WG::new("wg0").unwrap(); let config = Config::default(); - let client = Gateway::setup_client(&config).unwrap(); let mut gateway = Gateway { config, interface_configuration: None, peers: HashMap::new(), wgapi: Arc::new(Mutex::new(wgapi)), - connected: Arc::new(AtomicBool::new(false)), - client, - stats_thread: None, firewall_api: FirewallApi::new("test_interface").unwrap(), firewall_config: None, + connected: Arc::new(AtomicBool::new(false)), core_info: None, + clients: ClientMap::new(), }; // Gateway has no config gateway.firewall_config = None; @@ -1309,4 +1266,3 @@ mod tests { assert!(gateway.has_firewall_config_changed(&config5)); } } -*/ diff --git a/src/version.rs b/src/version.rs index f737e19f..a55451ae 100644 --- a/src/version.rs +++ b/src/version.rs @@ -2,22 +2,24 @@ use defguard_version::{Version, is_version_lower}; const MIN_CORE_VERSION: Version = Version::new(1, 6, 0); -/// Ensures the core version meets minimum version requirements. +/// Ensures Defguard Core version meets minimum version requirements. /// Terminates the process if it doesn't. pub(crate) fn ensure_core_version_supported(core_version: Option<&Version>) { let Some(core_version) = core_version else { error!( - "Missing core component version information. This most likely means that core component uses outdated version. Exiting." + "Missing Defguard Core version information. This most likely means that Defguard Core \ + uses outdated version. Exiting." ); std::process::exit(1); }; if is_version_lower(core_version, &MIN_CORE_VERSION) { error!( - "Core version {core_version} is not supported. Minimal supported core version is {MIN_CORE_VERSION}. Exiting." + "Defguard Core version {core_version} is not supported. Minimal supported version is \ + {MIN_CORE_VERSION}. Exiting." ); std::process::exit(1); } - info!("Core version {core_version} is supported"); + info!("Defguard Core version {core_version} is supported"); }