From 04641fe6f86ffa069ae2ba1140fd3d992bbf0491 Mon Sep 17 00:00:00 2001 From: Tim Diekmann Date: Tue, 9 Sep 2025 16:12:22 +0200 Subject: [PATCH 1/4] Implement Wall profiler --- .env | 1 + Cargo.lock | 14 +- Cargo.toml | 1 + libs/@local/telemetry/Cargo.toml | 3 +- libs/@local/telemetry/src/lib.rs | 48 +++-- .../telemetry/src/profiling/collector.rs | 131 ++++++++++++ libs/@local/telemetry/src/profiling/mod.rs | 192 ++++++++++++++++++ .../telemetry/src/profiling/uploader.rs | 95 +++++++++ tests/graph/benches/graph/lib.rs | 19 +- tests/graph/benches/util.rs | 24 ++- 10 files changed, 486 insertions(+), 42 deletions(-) create mode 100644 libs/@local/telemetry/src/profiling/collector.rs create mode 100644 libs/@local/telemetry/src/profiling/mod.rs create mode 100644 libs/@local/telemetry/src/profiling/uploader.rs diff --git a/.env b/.env index ee15e462ef3..5f6c20cac6f 100644 --- a/.env +++ b/.env @@ -80,6 +80,7 @@ HASH_REDIS_HOST=localhost HASH_REDIS_PORT=6379 HASH_OTLP_ENDPOINT=http://localhost:4317 +HASH_PROFILER_ENDPOINT=http://localhost:4040 HASH_SEARCH_LOADER_PORT=3838 HASH_SEARCH_QUEUE_NAME=search diff --git a/Cargo.lock b/Cargo.lock index 45ad812537f..218e342b151 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3790,12 +3790,14 @@ version = "0.0.0" dependencies = [ "clap", "clap_builder", + "crossbeam-channel", "derive_more 2.0.1", "error-stack", "opentelemetry", "opentelemetry-appender-tracing", "opentelemetry-otlp", "opentelemetry_sdk", + "reqwest", "sentry", "sentry-core", "sentry-types", @@ -3805,7 +3807,6 @@ dependencies = [ "tracing-appender", "tracing-core", "tracing-error", - "tracing-flame", "tracing-opentelemetry", "tracing-subscriber", ] @@ -9685,17 +9686,6 @@ dependencies = [ "tracing-subscriber", ] -[[package]] -name = "tracing-flame" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bae117ee14789185e129aaee5d93750abe67fdc5a9a62650452bfe4e122a3a9" -dependencies = [ - "lazy_static", - "tracing", - "tracing-subscriber", -] - [[package]] name = "tracing-indicatif" version = "0.3.13" diff --git a/Cargo.toml b/Cargo.toml index 2765b53a554..29065039127 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -125,6 +125,7 @@ console_error_panic_hook = { version = "=0.1.7", default-features = fa convert_case = { version = "=0.8.0", default-features = false } criterion = { version = "=0.7.0" } criterion-macro = { version = "=0.4.0", default-features = false } +crossbeam-channel = { version = "=0.5.15", default-features = false } dashu-base = { version = "=0.4.1", default-features = false } dashu-float = { version = "=0.4.3", default-features = false } deadpool = { version = "=0.12.2", default-features = false } diff --git a/libs/@local/telemetry/Cargo.toml b/libs/@local/telemetry/Cargo.toml index f041cff4a4c..aecdcf14f37 100644 --- a/libs/@local/telemetry/Cargo.toml +++ b/libs/@local/telemetry/Cargo.toml @@ -25,14 +25,15 @@ tracing-subscriber = { workspace = true, public = true, features = ["std", "ansi # Private third-party dependencies clap = { workspace = true, optional = true, features = ["derive", "env"] } +crossbeam-channel = { workspace = true } derive_more = { workspace = true, features = ["display", "error"] } opentelemetry-appender-tracing = { workspace = true } opentelemetry_sdk = { workspace = true, features = ["trace", "logs", "metrics", "rt-tokio"] } +reqwest = { workspace = true } sentry = { workspace = true } simple-mermaid = { workspace = true } tracing = { workspace = true } tracing-error = { workspace = true } -tracing-flame = { workspace = true } tracing-opentelemetry = { workspace = true } [features] diff --git a/libs/@local/telemetry/src/lib.rs b/libs/@local/telemetry/src/lib.rs index 7f5052f3baa..6b11cf8fd75 100644 --- a/libs/@local/telemetry/src/lib.rs +++ b/libs/@local/telemetry/src/lib.rs @@ -17,15 +17,12 @@ extern crate alloc; pub mod logging; pub mod metrics; +pub mod profiling; pub mod traces; mod otlp; -use std::{ - fs::{self, File}, - io::BufWriter, - path::PathBuf, -}; +use core::time::Duration; use error_stack::{Report, ResultExt as _}; use opentelemetry_sdk::{ @@ -33,12 +30,12 @@ use opentelemetry_sdk::{ }; use tracing::{Dispatch, subscriber::DefaultGuard, warn}; use tracing_error::ErrorLayer; -use tracing_flame::{FlameLayer, FlushGuard}; use tracing_subscriber::{layer::SubscriberExt as _, util::SubscriberInitExt as _}; pub use self::otlp::OtlpConfig; use self::{ logging::{ConsoleConfig, FileConfig, LoggingConfig}, + profiling::{ProfilerCliConfig, ProfilerConfig}, traces::sentry::SentryConfig, }; @@ -54,6 +51,9 @@ pub struct TracingConfig { #[cfg_attr(feature = "clap", clap(flatten))] pub sentry: SentryConfig, + + #[cfg_attr(feature = "clap", clap(flatten))] + pub profile: ProfilerCliConfig, } #[derive(Debug, derive_more::Display, derive_more::Error)] @@ -63,11 +63,11 @@ pub struct InitTracingError; #[derive(Debug, Default)] pub struct TelemetryRegistry { error_layer: bool, + profiler: Option, console_logging: Option, file_logging: Option, sentry: Option, otlp: Option<(OtlpConfig, &'static str)>, - flamegraph_path: Option, } impl TelemetryRegistry { @@ -111,8 +111,10 @@ impl TelemetryRegistry { } #[must_use] - pub fn with_flamegraph(mut self, target_dir: impl Into) -> Self { - self.flamegraph_path = Some(target_dir.into()); + pub fn with_tracing_profiler(mut self, config: ProfilerConfig) -> Self { + if config.pyroscope_endpoint.is_some() || config.folded_path.is_some() { + self.profiler = Some(config); + } self } @@ -157,13 +159,11 @@ impl TelemetryRegistry { }) .transpose()?; - let (flame_layer, flamegraph_guard) = self - .flamegraph_path - .map(|path| { - fs::create_dir_all(&path).change_context(InitTracingError)?; - FlameLayer::with_file(path.join("tracing.folded")).change_context(InitTracingError) - }) - .transpose()? + let (profiling_layer, profiler_guard) = self + .profiler + .map(ProfilerConfig::build) + .transpose() + .change_context(InitTracingError)? .unzip(); let guard = TelemetryGuard { @@ -171,7 +171,7 @@ impl TelemetryRegistry { otlp_logs_provider, otlp_metrics_provider, _file_guard: file_guard, - _flamegraph: flamegraph_guard, + _profiler: profiler_guard, }; Ok(( @@ -182,7 +182,7 @@ impl TelemetryRegistry { .with(otlp_traces_layer) .with(console_layer) .with(file_layer) - .with(flame_layer), + .with(profiling_layer), guard, )) } @@ -245,15 +245,15 @@ impl TelemetryRegistry { } } -struct TelemetryGuard { +struct TelemetryGuard { otlp_traces_provider: Option, otlp_logs_provider: Option, otlp_metrics_provider: Option, _file_guard: F, - _flamegraph: Option>>, + _profiler: Option

, } -impl Drop for TelemetryGuard { +impl Drop for TelemetryGuard { fn drop(&mut self) { if let Some(provider) = self.otlp_metrics_provider.take() && let Err(error) = provider.shutdown() @@ -290,5 +290,11 @@ pub fn init_tracing( .with_file_logging(config.logging.file) .with_sentry(config.sentry) .with_otlp(config.otlp, service_name) + .with_tracing_profiler(ProfilerConfig { + pyroscope_endpoint: config.profile.profile_endpoint, + folded_path: None, + service_name: service_name.to_owned(), + flush_interval: Duration::from_secs(config.profile.flush_interval_seconds), + }) .init_global() } diff --git a/libs/@local/telemetry/src/profiling/collector.rs b/libs/@local/telemetry/src/profiling/collector.rs new file mode 100644 index 00000000000..df3e78f6abf --- /dev/null +++ b/libs/@local/telemetry/src/profiling/collector.rs @@ -0,0 +1,131 @@ +use core::{ops::ControlFlow, time::Duration}; +use std::{ + fs::{self, File}, + io::{self, BufWriter, Write as _}, + thread, +}; + +use crossbeam_channel::{Receiver, select, tick}; +use error_stack::Report; + +use super::{ControlMessage, Message, ProfilerConfig, SpanMessage, uploader::ProfileUploader}; + +#[derive(Debug)] +pub(super) struct ProfileCollector { + uploader: Option, + folded_file: Option>, + flush_interval: Duration, +} + +impl ProfileCollector { + pub(crate) fn new(mut config: ProfilerConfig) -> Result> { + if let Some(endpoint) = &mut config.pyroscope_endpoint { + endpoint.push_str("/ingest"); + } + config.service_name = format!("{}.wall", config.service_name.replace(' ', "-")); + + Ok(Self { + uploader: config + .pyroscope_endpoint + .map(|endpoint| ProfileUploader::new(endpoint, config.service_name)), + folded_file: config + .folded_path + .as_ref() + .map(|path| { + fs::create_dir_all(path)?; + File::create(path.join("tracing.folded")).map(BufWriter::new) + }) + .transpose()?, + flush_interval: config.flush_interval, + }) + } + + pub(crate) fn run( + mut self, + message_rx: Receiver, + ctrl_rx: Receiver, + ) -> thread::JoinHandle<()> { + let tick = tick(self.flush_interval); + thread::spawn(move || { + loop { + let ctrl = select! { + recv(message_rx) -> message => match message { + Ok(message) => self.process_message(message), + Err(_) => self.process_ctrl_message(ControlMessage::Shutdown), + }, + recv(ctrl_rx) -> ctrl => match ctrl { + Ok(ctrl) => self.process_ctrl_message(ctrl), + Err(_) => self.process_ctrl_message(ControlMessage::Shutdown), + }, + recv(tick) -> _ => self.process_ctrl_message(ControlMessage::Flush), + }; + + match ctrl { + ControlFlow::Continue(()) => {} + ControlFlow::Break(()) => break, + } + } + }) + } + + fn process_message(&mut self, message: Message) -> ControlFlow<()> { + match message { + Message::RecordSpan(span) => { + self.record_message(&span); + ControlFlow::Continue(()) + } + } + } + + fn process_ctrl_message(&mut self, message: ControlMessage) -> ControlFlow<()> { + match message { + ControlMessage::Flush => { + self.flush(); + ControlFlow::Continue(()) + } + ControlMessage::Shutdown => { + self.flush(); + ControlFlow::Break(()) + } + } + } + + fn record_message(&mut self, message: &SpanMessage) { + let scopes = message.scopes.join(";"); + + let args = if scopes.is_empty() { + format_args!("{} {}\n", message.thread_name, message.duration.as_nanos()) + } else { + format_args!( + "{}; {} {}\n", + message.thread_name, + scopes, + message.duration.as_nanos() + ) + }; + + if let Some(file) = &mut self.folded_file + && let Err(error) = file.write_fmt(args) + { + tracing::error!(%error, "Failed to write profile data to file"); + } + if let Some(buffer) = &mut self.uploader + && let Err(error) = buffer.write_fmt(args) + { + tracing::error!(%error, "Failed to write profile data to uploader"); + } + } + + fn flush(&mut self) { + if let Some(file) = &mut self.folded_file + && let Err(error) = file.flush() + { + tracing::error!(%error, "Failed to flush profile data to file"); + } + if let Some(buffer) = &mut self.uploader + && let Err(error) = buffer.flush() + { + tracing::error!(%error, "Failed to flush profile data to uploader"); + } + } +} diff --git a/libs/@local/telemetry/src/profiling/mod.rs b/libs/@local/telemetry/src/profiling/mod.rs new file mode 100644 index 00000000000..74f64c8dbb6 --- /dev/null +++ b/libs/@local/telemetry/src/profiling/mod.rs @@ -0,0 +1,192 @@ +mod collector; +mod uploader; + +use alloc::sync::Arc; +use core::{fmt::Write as _, time::Duration}; +use std::{io, path::PathBuf, thread::JoinHandle, time::Instant}; + +use crossbeam_channel::Sender; +use error_stack::Report; +use tracing::{Subscriber, span}; +use tracing_subscriber::{ + Layer, + layer::Context, + registry::{LookupSpan, SpanRef}, +}; + +use self::collector::ProfileCollector; + +thread_local! { + /// Use thread name if available (e.g., "main", "tokio-runtime-worker"), otherwise thread ID. + /// + /// With work stealing (e.g., Tokio), the actual execution thread may differ from the thread + /// where the span was created. Typically, work-stealing threads share the same name so in the + /// context of profiling the spans share the same thread name. + static THREAD_NAME: Arc = { + let thread = std::thread::current(); + thread.name().map_or_else(|| Arc::from(format!("{:?}", thread.id())), Arc::from) + }; +} + +struct SpanMessage { + thread_name: Arc, + scopes: Vec, + duration: Duration, +} + +impl SpanMessage { + fn from_span<'r, R>(span: &SpanRef<'r, R>, duration: Duration) -> Self + where + R: LookupSpan<'r>, + { + Self { + thread_name: THREAD_NAME.with(Arc::clone), + scopes: span + .scope() + .from_root() + .map(|span| { + let mut span_str = String::new(); + if let Some(module_path) = span.metadata().module_path() { + let _ = write!(span_str, "{module_path}::"); + } + + _ = write!(span_str, "{}", span.name()); + + if let Some(file) = span.metadata().file() { + let _ = write!(span_str, ":{file}"); + } + if let Some(line) = span.metadata().line() { + let _ = write!(span_str, ":{line}"); + } + span_str + }) + .collect(), + duration, + } + } +} + +enum Message { + RecordSpan(SpanMessage), +} + +#[derive(Debug, Copy, Clone)] +enum ControlMessage { + Flush, + Shutdown, +} + +#[derive(Debug, Clone)] +#[cfg_attr(feature = "clap", derive(clap::Parser), clap(next_help_heading = Some("Profiler")))] +pub struct ProfilerCliConfig { + #[cfg_attr( + feature = "clap", + arg(long = "profile-endpoint", env = "HASH_PROFILER_ENDPOINT",) + )] + pub profile_endpoint: Option, + + #[cfg_attr( + feature = "clap", + arg( + long = "profile-flush-interval-seconds", + env = "HASH_GRAPH_PROFILE_FLUSH_INTERVAL_SECONDS", + default_value_t = 10, + ) + )] + pub flush_interval_seconds: u64, +} + +#[derive(Debug)] +pub struct ProfilerConfig { + pub pyroscope_endpoint: Option, + pub folded_path: Option, + pub service_name: String, + pub flush_interval: Duration, +} + +impl ProfilerConfig { + /// Builds the profiling layer and its associated dropper. + /// + /// # Errors + /// + /// Returns an error if the profiling layer cannot be created. + pub fn build(self) -> Result<(impl Layer, impl Drop), Report> + where + S: Subscriber + for<'a> LookupSpan<'a>, + { + struct Dropper { + join: Option>, + control_tx: Sender, + } + + impl Drop for Dropper { + fn drop(&mut self) { + _ = self.control_tx.send(ControlMessage::Shutdown); + self.join.take().map(JoinHandle::join); + } + } + + let (message_tx, message_rx) = crossbeam_channel::bounded(1_000); + let (control_tx, control_rx) = crossbeam_channel::unbounded(); + + Ok(( + ProfileLayer { message_tx }, + Dropper { + join: Some(ProfileCollector::new(self)?.run(message_rx, control_rx)), + control_tx, + }, + )) + } +} + +struct Timings { + last: Instant, + // Track total time of all children for exclusive calculation + child_total: Duration, +} +#[derive(Debug)] +struct ProfileLayer { + message_tx: Sender, +} + +impl Layer for ProfileLayer +where + S: Subscriber + for<'s> LookupSpan<'s>, +{ + fn on_new_span(&self, _attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) { + let Some(span) = ctx.span(id) else { + return; + }; + + span.extensions_mut().insert(Timings { + last: Instant::now(), + child_total: Duration::ZERO, + }); + } + + fn on_close(&self, id: span::Id, ctx: Context<'_, S>) { + let Some(span) = ctx.span(&id) else { + return; + }; + + let duration = if let Some(timings) = span.extensions_mut().get_mut::() { + let total_duration = timings.last.elapsed(); + + // Add this span's total duration to parent's child_total for exclusive calculation + if let Some(parent) = span.parent() + && let Some(parent_timings) = parent.extensions_mut().get_mut::() + { + parent_timings.child_total += total_duration; + } + + // Calculate exclusive time by subtracting child durations + total_duration.saturating_sub(timings.child_total) + } else { + return; + }; + + _ = self + .message_tx + .send(Message::RecordSpan(SpanMessage::from_span(&span, duration))); + } +} diff --git a/libs/@local/telemetry/src/profiling/uploader.rs b/libs/@local/telemetry/src/profiling/uploader.rs new file mode 100644 index 00000000000..c306d4a83cb --- /dev/null +++ b/libs/@local/telemetry/src/profiling/uploader.rs @@ -0,0 +1,95 @@ +use core::mem; +use std::{ + io::{self, Write}, + time::{SystemTime, UNIX_EPOCH}, +}; + +#[derive(Debug)] +pub(crate) struct ProfileUploader { + client: reqwest::Client, + buffer: Vec, + endpoint: String, + service_name: String, + last_upload_seconds: u64, + runtime_handle: Option, +} + +impl ProfileUploader { + pub(crate) fn new(endpoint: String, service_name: String) -> Self { + Self { + client: reqwest::Client::new(), + buffer: Vec::with_capacity(1024 * 1024), // 1MB initial capacity + endpoint, + service_name, + last_upload_seconds: SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_secs(), + runtime_handle: tokio::runtime::Handle::try_current().ok(), + } + } +} + +impl Write for ProfileUploader { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.buffer.extend_from_slice(buf); + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_secs(); + + if !self.buffer.is_empty() { + let request = async { + self.client + .post(&self.endpoint) + .query(&[ + ("format", "folded"), + ("units", "nanoseconds"), + ("sampleRate", "1000000000"), // nanosecond precision + ("name", &self.service_name), + ("from", &self.last_upload_seconds.to_string()), + ("until", &now.to_string()), + ]) + .header("Content-Type", "text/plain") + .body(mem::take(&mut self.buffer)) + .send() + .await + .map_err(io::Error::other) + }; + + let response = if let Some(handle) = &self.runtime_handle { + handle.block_on(request)? + } else if let Ok(handle) = tokio::runtime::Handle::try_current() { + handle.block_on(request)? + } else { + tracing::warn!( + "No Tokio runtime found, spawning a new one for Pyroscope upload. Consider \ + initializing a Tokio runtime earlier to avoid this overhead." + ); + + // If we're not in a Tokio runtime, spawn a new one for this blocking task. + tokio::runtime::Runtime::new() + .map_err(io::Error::other)? + .block_on(request)? + }; + + if !response.status().is_success() { + tracing::warn!( + status = %response.status(), + from = self.last_upload_seconds, + until = now, + "Pyroscope upload failed" + ); + return Ok(()); + } + } + + self.last_upload_seconds = now; + + Ok(()) + } +} diff --git a/tests/graph/benches/graph/lib.rs b/tests/graph/benches/graph/lib.rs index 248b589e1e9..ceab2647405 100644 --- a/tests/graph/benches/graph/lib.rs +++ b/tests/graph/benches/graph/lib.rs @@ -20,6 +20,7 @@ mod scenario; #[path = "../util.rs"] mod util; +use core::time::Duration; use std::{ collections::HashMap, fs::{self}, @@ -33,6 +34,7 @@ use hash_repo_chores::benches::generate_path; use hash_telemetry::{ OtlpConfig, TelemetryRegistry, logging::{ColorOption, ConsoleConfig, ConsoleStream, LogFormat}, + profiling::ProfilerConfig, }; use self::scenario::run_scenario_file; @@ -53,11 +55,18 @@ fn init_tracing(scenario: &str, bench: &str) -> impl Drop { }, "Graph Benches", ) - .with_flamegraph(Path::new("out").join(generate_path( - "scenarios", - Some(scenario), - Some(bench), - ))) + .with_tracing_profiler(ProfilerConfig { + pyroscope_endpoint: std::env::var("HASH_PROFILER_ENDPOINT").ok(), + folded_path: Some(Path::new("out").join(generate_path( + "scenarios", + Some(scenario), + Some(bench), + ))), + service_name: format!( + "graph-benches{{group=scenarios,function={scenario},value={bench}}}" + ), + flush_interval: Duration::from_secs(10), + }) .init() .expect("Failed to initialize tracing") } diff --git a/tests/graph/benches/util.rs b/tests/graph/benches/util.rs index 5d5aea2fcdd..4fe122ff7bc 100644 --- a/tests/graph/benches/util.rs +++ b/tests/graph/benches/util.rs @@ -1,6 +1,6 @@ #![allow(dead_code)] -use core::mem::ManuallyDrop; +use core::{mem::ManuallyDrop, time::Duration}; use std::{collections::HashMap, path::Path}; use hash_graph_postgres_store::{ @@ -19,7 +19,7 @@ use hash_graph_store::{ query::ConflictBehavior, }; use hash_repo_chores::benches::generate_path; -use hash_telemetry::TelemetryRegistry; +use hash_telemetry::{TelemetryRegistry, profiling::ProfilerConfig}; use regex::Regex; use time::OffsetDateTime; use tokio::runtime::Runtime; @@ -59,8 +59,26 @@ pub fn setup_subscriber( function_id: Option<&str>, value_str: Option<&str>, ) -> impl Drop { + let labels = match (function_id, value_str) { + (Some(function_id), Some(value_str)) => { + format!(",function={function_id},value={value_str}") + } + (Some(function_id), None) => format!(",function={function_id}"), + (None, Some(value_str)) => format!(",value={value_str}"), + (None, None) => String::new(), + }; + TelemetryRegistry::default() - .with_flamegraph(Path::new("out").join(generate_path(group_id, function_id, value_str))) + .with_tracing_profiler(ProfilerConfig { + pyroscope_endpoint: std::env::var("HASH_PROFILER_ENDPOINT").ok(), + folded_path: Some(Path::new("out").join(generate_path( + group_id, + function_id, + value_str, + ))), + service_name: format!("graph-benches{{group={group_id}{labels}}}"), + flush_interval: Duration::from_secs(10), + }) .init() .expect("Failed to initialize tracing") } From ed827bc205c67cce16026fb4446f4201e6bf368c Mon Sep 17 00:00:00 2001 From: Tim Diekmann Date: Wed, 10 Sep 2025 11:19:52 +0200 Subject: [PATCH 2/4] Fix Route53 in AWS --- .../terraform/hash/hash_application/graph.tf | 3 + .../hash/hash_application/type_fetcher.tf | 2 + .../hash/hash_application/variables.tf | 5 ++ infra/terraform/hash/main.tf | 1 + .../hash/observability/alloy/target_groups.tf | 4 +- .../hash/observability/load_balancer.tf | 2 +- infra/terraform/hash/observability/outputs.tf | 5 ++ infra/terraform/hash/observability/tls.tf | 74 ++++--------------- infra/terraform/hash/tls.tf | 17 +++++ 9 files changed, 52 insertions(+), 61 deletions(-) diff --git a/infra/terraform/hash/hash_application/graph.tf b/infra/terraform/hash/hash_application/graph.tf index 322ebfd0f28..f933428beda 100644 --- a/infra/terraform/hash/hash_application/graph.tf +++ b/infra/terraform/hash/hash_application/graph.tf @@ -180,6 +180,8 @@ locals { ], [ { name = "HASH_OTLP_ENDPOINT", value = "grpc://${local.otel_grpc_container_port_dns}:${local.otel_grpc_container_port}" }, + { name = "HASH_GRAPH_LOG_LEVEL", value = "info" }, + # { name = "HASH_PROFILER_ENDPOINT", value = "https://${var.profile_exporter_endpoint}" }, ] ) secrets = [ @@ -241,6 +243,7 @@ locals { { name = "HASH_OTLP_ENDPOINT", value = "grpc://${local.otel_grpc_container_port_dns}:${local.otel_grpc_container_port}" }, { name = "HASH_TEMPORAL_SERVER_HOST", value = var.temporal_host }, { name = "HASH_TEMPORAL_SERVER_PORT", value = var.temporal_port }, + # { name = "HASH_PROFILER_ENDPOINT", value = "https://${var.profile_exporter_endpoint}" }, ] ) diff --git a/infra/terraform/hash/hash_application/type_fetcher.tf b/infra/terraform/hash/hash_application/type_fetcher.tf index c28565870c6..ea9d81d2727 100644 --- a/infra/terraform/hash/hash_application/type_fetcher.tf +++ b/infra/terraform/hash/hash_application/type_fetcher.tf @@ -36,6 +36,8 @@ locals { { name = "HASH_GRAPH_TYPE_FETCHER_HOST", value = "0.0.0.0" }, { name = "HASH_GRAPH_TYPE_FETCHER_PORT", value = tostring(local.type_fetcher_container_port) }, { name = "HASH_OTLP_ENDPOINT", value = "grpc://${local.otel_grpc_container_port_dns}:${local.otel_grpc_container_port}" }, + { name = "HASH_GRAPH_LOG_LEVEL", value = "info" }, + # { name = "HASH_PROFILER_ENDPOINT", value = "https://${var.profile_exporter_endpoint}" }, ] essential = true diff --git a/infra/terraform/hash/hash_application/variables.tf b/infra/terraform/hash/hash_application/variables.tf index cfe287fb2f9..9b471a56b42 100644 --- a/infra/terraform/hash/hash_application/variables.tf +++ b/infra/terraform/hash/hash_application/variables.tf @@ -215,6 +215,11 @@ variable "otel_exporter_otlp_endpoint" { description = "The DNS endpoint for the OpenTelemetry Collector OTLP exporter." } +variable "profile_exporter_endpoint" { + type = string + description = "The DNS endpoint for the Pyroscope profiler." +} + variable "amazon_trust_ca_bundle" { type = string description = "Combined Amazon Trust Services CA bundle for ACM certificate verification." diff --git a/infra/terraform/hash/main.tf b/infra/terraform/hash/main.tf index ec65635d2c9..06f11ea4ff0 100644 --- a/infra/terraform/hash/main.tf +++ b/infra/terraform/hash/main.tf @@ -515,5 +515,6 @@ module "application" { temporal_port = module.temporal.port otel_exporter_otlp_endpoint = module.observability.otel_otlp_endpoint + profile_exporter_endpoint = module.observability.profile_exporter_endpoint amazon_trust_ca_bundle = local.amazon_trust_ca_bundle } diff --git a/infra/terraform/hash/observability/alloy/target_groups.tf b/infra/terraform/hash/observability/alloy/target_groups.tf index 69c49f7daa0..333570dcb73 100644 --- a/infra/terraform/hash/observability/alloy/target_groups.tf +++ b/infra/terraform/hash/observability/alloy/target_groups.tf @@ -20,7 +20,7 @@ resource "aws_lb_target_group" "profile_internal" { tags = { Name = "${var.prefix}-profile-int" - Purpose = "OpenTelemetry Collector internal profile target group" + Purpose = "Alloy internal profile target group" } } @@ -46,6 +46,6 @@ resource "aws_lb_target_group" "profile_external" { tags = { Name = "${var.prefix}-profile-ext" - Purpose = "OpenTelemetry Collector external profile target group" + Purpose = "Alloy external profile target group" } } diff --git a/infra/terraform/hash/observability/load_balancer.tf b/infra/terraform/hash/observability/load_balancer.tf index df89ee99fbd..262987b7cde 100644 --- a/infra/terraform/hash/observability/load_balancer.tf +++ b/infra/terraform/hash/observability/load_balancer.tf @@ -295,7 +295,7 @@ resource "aws_lb_listener" "internal_https" { port = "443" protocol = "HTTPS" ssl_policy = "ELBSecurityPolicy-TLS13-1-2-2021-06" - certificate_arn = aws_acm_certificate_validation.otlp.certificate_arn + certificate_arn = aws_acm_certificate_validation.vpc_wildcard.certificate_arn # Default action - return 404 for unknown hosts default_action { diff --git a/infra/terraform/hash/observability/outputs.tf b/infra/terraform/hash/observability/outputs.tf index f5334cdbba6..7dc9d25069a 100644 --- a/infra/terraform/hash/observability/outputs.tf +++ b/infra/terraform/hash/observability/outputs.tf @@ -4,3 +4,8 @@ output "otel_otlp_endpoint" { description = "OTLP HTTPS DNS name for sending telemetry via internal domain" value = aws_route53_record.otlp.fqdn } + +output "profile_exporter_endpoint" { + description = "Profile exporter DNS name for sending profiling data via internal domain" + value = aws_route53_record.profile.fqdn +} diff --git a/infra/terraform/hash/observability/tls.tf b/infra/terraform/hash/observability/tls.tf index 7b6c759b168..f0b0c9aa4b7 100644 --- a/infra/terraform/hash/observability/tls.tf +++ b/infra/terraform/hash/observability/tls.tf @@ -4,16 +4,9 @@ data "cloudflare_zones" "hash_ai" { name = "hash.ai" } -resource "aws_route53_record" "otlp" { - zone_id = var.vpc_zone_id - name = "otlp" - type = "CNAME" - ttl = 300 - records = [aws_lb.observability_internal.dns_name] -} -resource "aws_acm_certificate" "otlp" { - domain_name = "otlp.vpc.hash.ai" +resource "aws_acm_certificate" "vpc_wildcard" { + domain_name = "*.vpc.hash.ai" validation_method = "DNS" lifecycle { @@ -21,14 +14,14 @@ resource "aws_acm_certificate" "otlp" { } tags = { - Name = "otlp.vpc.hash.ai" + Name = "vpc-wildcard.hash.ai" } } # Cloudflare DNS Records for ACM validation -resource "cloudflare_dns_record" "otlp_cert_validation" { +resource "cloudflare_dns_record" "vpc_wildcard_cert_validation" { for_each = { - for dvo in aws_acm_certificate.otlp.domain_validation_options : dvo.domain_name => { + for dvo in aws_acm_certificate.vpc_wildcard.domain_validation_options : dvo.domain_name => { name = dvo.resource_record_name record = dvo.resource_record_value type = dvo.resource_record_type @@ -45,62 +38,27 @@ resource "cloudflare_dns_record" "otlp_cert_validation" { } # Wait for ACM validation -resource "aws_acm_certificate_validation" "otlp" { - certificate_arn = aws_acm_certificate.otlp.arn - validation_record_fqdns = [for record in cloudflare_dns_record.otlp_cert_validation : trimsuffix(record.name, ".")] +resource "aws_acm_certificate_validation" "vpc_wildcard" { + certificate_arn = aws_acm_certificate.vpc_wildcard.arn + validation_record_fqdns = [for record in cloudflare_dns_record.vpc_wildcard_cert_validation : trimsuffix(record.name, ".")] timeouts { create = "5m" } } -resource "aws_route53_record" "profile" { +resource "aws_route53_record" "otlp" { zone_id = var.vpc_zone_id - name = "profile" + name = "otlp" type = "CNAME" ttl = 300 records = [aws_lb.observability_internal.dns_name] } - -resource "aws_acm_certificate" "profile" { - domain_name = "profile.vpc.hash.ai" - validation_method = "DNS" - - lifecycle { - create_before_destroy = true - } - - tags = { - Name = "profile.vpc.hash.ai" - } -} - -# Cloudflare DNS Records for ACM validation -resource "cloudflare_dns_record" "profile_cert_validation" { - for_each = { - for dvo in aws_acm_certificate.profile.domain_validation_options : dvo.domain_name => { - name = dvo.resource_record_name - record = dvo.resource_record_value - type = dvo.resource_record_type - } - } - - zone_id = data.cloudflare_zones.hash_ai.result[0].id - name = each.value.name - content = each.value.record - type = each.value.type - ttl = 1 - - tags = ["terraform"] -} - -# Wait for ACM validation -resource "aws_acm_certificate_validation" "profile" { - certificate_arn = aws_acm_certificate.profile.arn - validation_record_fqdns = [for record in cloudflare_dns_record.profile_cert_validation : trimsuffix(record.name, ".")] - - timeouts { - create = "5m" - } +resource "aws_route53_record" "profile" { + zone_id = var.vpc_zone_id + name = "profile" + type = "CNAME" + ttl = 300 + records = [aws_lb.observability_internal.dns_name] } diff --git a/infra/terraform/hash/tls.tf b/infra/terraform/hash/tls.tf index a5726d579a1..a2f6bb98370 100644 --- a/infra/terraform/hash/tls.tf +++ b/infra/terraform/hash/tls.tf @@ -52,6 +52,23 @@ resource "cloudflare_dns_record" "caa_hash_ai" { tags = ["terraform"] } +# CAA Record to allow AWS Certificate Manager to issue wildcard certificates +resource "cloudflare_dns_record" "caa_hash_ai_wildcard" { + zone_id = data.cloudflare_zones.hash_ai.result[0].id + name = "hash.ai" + type = "CAA" + + data = { + flags = "0" + tag = "issuewild" + value = "amazon.com" + } + + ttl = 1 + + tags = ["terraform"] +} + # Private Hosted Zone for internal AWS services # Shared across all modules for internal service DNS resolution resource "aws_route53_zone" "vpc" { From 8bad15df90ffc46c95edd897fc74db288293ef5a Mon Sep 17 00:00:00 2001 From: Tim Diekmann Date: Wed, 10 Sep 2025 12:57:28 +0200 Subject: [PATCH 3/4] Support CPU profiling in benchmarks --- Cargo.lock | 203 ++++++++++++++++-- Cargo.toml | 2 + libs/@local/telemetry/Cargo.toml | 2 + libs/@local/telemetry/src/lib.rs | 8 +- .../telemetry/src/profiling/collector.rs | 24 ++- libs/@local/telemetry/src/profiling/mod.rs | 102 +++++++-- .../telemetry/src/profiling/uploader.rs | 44 ++-- tests/graph/benches/graph/lib.rs | 12 +- .../manual_queries/entity_queries/mod.rs | 1 + .../graph/benches/policy/benchmark_matrix.rs | 1 + .../read_scaling/knowledge/complete/entity.rs | 2 + .../read_scaling/knowledge/linkless/entity.rs | 1 + .../graph/benches/representative_read/lib.rs | 8 +- tests/graph/benches/util.rs | 31 ++- 14 files changed, 361 insertions(+), 80 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 218e342b151..3f2f21e7876 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -35,6 +35,12 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" +[[package]] +name = "adler32" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" + [[package]] name = "aead" version = "0.5.2" @@ -1721,6 +1727,15 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "417bef24afe1460300965a25ff4a24b8b45ad011948302ec221e8a0a81eb2c79" +[[package]] +name = "cpp_demangle" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96e58d342ad113c2b878f16d5d034c03be492ae460cdbc02b7f0f2284d310c7d" +dependencies = [ + "cfg-if", +] + [[package]] name = "cpufeatures" version = "0.2.17" @@ -3797,6 +3812,8 @@ dependencies = [ "opentelemetry-appender-tracing", "opentelemetry-otlp", "opentelemetry_sdk", + "pyroscope", + "pyroscope_pprofrs", "reqwest", "sentry", "sentry-core", @@ -4707,6 +4724,15 @@ dependencies = [ "nom", ] +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.12.1" @@ -4955,6 +4981,26 @@ version = "0.2.175" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a82ae493e598baaea5209805c49bbf2ea7de956d50d7da0da1164f9c6d28543" +[[package]] +name = "libflate" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ff4ae71b685bbad2f2f391fe74f6b7659a34871c08b210fdc039e43bee07d18" +dependencies = [ + "adler32", + "crc32fast", + "libflate_lz77", +] + +[[package]] +name = "libflate_lz77" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a52d3a8bfc85f250440e4424db7d857e241a3aebbbe301f3eb606ab15c39acbf" +dependencies = [ + "rle-decode-fast", +] + [[package]] name = "libfuzzer-sys" version = "0.4.10" @@ -5544,6 +5590,15 @@ version = "2.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" +[[package]] +name = "memmap2" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "843a98750cd611cc2965a8213b53b43e715f13c37a9e096c6408e69990961db7" +dependencies = [ + "libc", +] + [[package]] name = "miette" version = "7.6.0" @@ -5698,6 +5753,15 @@ dependencies = [ "unsigned-varint 0.7.2", ] +[[package]] +name = "names" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bddcd3bf5144b6392de80e04c347cd7fab2508f6df16a85fc496ecd5cec39bc" +dependencies = [ + "rand 0.8.5", +] + [[package]] name = "napi" version = "2.16.17" @@ -5891,6 +5955,17 @@ dependencies = [ "libc", ] +[[package]] +name = "nix" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" +dependencies = [ + "bitflags 2.9.3", + "cfg-if", + "libc", +] + [[package]] name = "nohash-hasher" version = "0.2.0" @@ -6148,7 +6223,7 @@ dependencies = [ "opentelemetry", "opentelemetry-proto", "opentelemetry_sdk", - "prost", + "prost 0.13.5", "thiserror 2.0.16", "tokio", "tonic 0.13.1", @@ -6162,7 +6237,7 @@ checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc" dependencies = [ "opentelemetry", "opentelemetry_sdk", - "prost", + "prost 0.13.5", "tonic 0.13.1", ] @@ -6953,6 +7028,26 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" +[[package]] +name = "pprof2" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8961ed0a916b512e565f8070eb0dfa05773dd140160b45ac9a5ad339b557adeb" +dependencies = [ + "backtrace", + "cfg-if", + "findshlibs", + "libc", + "log", + "nix 0.27.1", + "once_cell", + "parking_lot", + "smallvec 1.15.1", + "symbolic-demangle", + "tempfile", + "thiserror 2.0.16", +] + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -7134,6 +7229,16 @@ dependencies = [ "unarray", ] +[[package]] +name = "prost" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" +dependencies = [ + "bytes", + "prost-derive 0.11.9", +] + [[package]] name = "prost" version = "0.13.5" @@ -7141,7 +7246,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.13.5", ] [[package]] @@ -7157,13 +7262,26 @@ dependencies = [ "once_cell", "petgraph 0.7.1", "prettyplease", - "prost", + "prost 0.13.5", "prost-types", "regex", "syn 2.0.106", "tempfile", ] +[[package]] +name = "prost-derive" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" +dependencies = [ + "anyhow", + "itertools 0.10.5", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "prost-derive" version = "0.13.5" @@ -7183,7 +7301,7 @@ version = "0.13.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" dependencies = [ - "prost", + "prost 0.13.5", ] [[package]] @@ -7194,7 +7312,7 @@ checksum = "497e1e938f0c09ef9cabe1d49437b4016e03e8f82fbbe5d1c62a9b61b9decae1" dependencies = [ "chrono", "inventory", - "prost", + "prost 0.13.5", "serde", "serde_derive", "serde_json", @@ -7208,7 +7326,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07b8bf115b70a7aa5af1fd5d6e9418492e9ccb6e4785e858c938e28d132a884b" dependencies = [ "heck", - "prost", + "prost 0.13.5", "prost-build", "prost-types", "quote", @@ -7221,7 +7339,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8cdde6df0a98311c839392ca2f2f0bcecd545f86a62b4e3c6a49c336e970fe5" dependencies = [ "chrono", - "prost", + "prost 0.13.5", "prost-build", "prost-types", "prost-wkt", @@ -7241,6 +7359,35 @@ dependencies = [ "cc", ] +[[package]] +name = "pyroscope" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3a5f63b0d2727095db59045e6a0ef3259b28b90d481ae88f0e3d866d0234ce8" +dependencies = [ + "libc", + "libflate", + "log", + "names", + "prost 0.11.9", + "reqwest", + "serde_json", + "thiserror 1.0.69", + "url", + "winapi", +] + +[[package]] +name = "pyroscope_pprofrs" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50da7a8950c542357de489aa9ee628f46322b1beaac1f4fa3313bcdebe85b4ea" +dependencies = [ + "log", + "pprof2", + "pyroscope", +] + [[package]] name = "qoi" version = "0.4.1" @@ -7785,6 +7932,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rle-decode-fast" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" + [[package]] name = "roaring" version = "0.11.2" @@ -7854,7 +8007,7 @@ dependencies = [ "netlink-packet-utils", "netlink-proto", "netlink-sys", - "nix", + "nix 0.26.4", "thiserror 1.0.69", "tokio", ] @@ -8761,6 +8914,29 @@ version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7401a30af6cb5818bb64852270bb722533397edcfc7344954a38f420819ece2" +[[package]] +name = "symbolic-common" +version = "12.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "918a8461341098b48201d587de5e059f46a21e3bb871b4c6f2be4f95f01bbe69" +dependencies = [ + "debugid", + "memmap2", + "stable_deref_trait", + "uuid", +] + +[[package]] +name = "symbolic-demangle" +version = "12.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59a41a39f701a19a0f7db09520b2e15c70f59fd16abc1c4f3d34e265ffef9228" +dependencies = [ + "cpp_demangle", + "rustc-demangle", + "symbolic-common", +] + [[package]] name = "syn" version = "1.0.109" @@ -8768,6 +8944,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" dependencies = [ "proc-macro2", + "quote", "unicode-ident", ] @@ -8978,7 +9155,7 @@ dependencies = [ "async-trait", "derive_builder", "derive_more 1.0.0", - "prost", + "prost 0.13.5", "prost-types", "serde_json", "temporal-sdk-core-protos", @@ -8996,7 +9173,7 @@ dependencies = [ "anyhow", "base64 0.22.1", "derive_more 1.0.0", - "prost", + "prost 0.13.5", "prost-build", "prost-types", "prost-wkt", @@ -9492,7 +9669,7 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "prost", + "prost 0.13.5", "rustls-native-certs", "rustls-pemfile", "socket2 0.5.10", @@ -9522,7 +9699,7 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "prost", + "prost 0.13.5", "rustls-native-certs", "tokio", "tokio-rustls", diff --git a/Cargo.toml b/Cargo.toml index 29065039127..287f74aad85 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -214,6 +214,8 @@ proc-macro-error2 = { version = "=2.0.1", default-features = fa proc-macro2 = { version = "=1.0.101", default-features = false } prometheus-client = { version = "=0.24.0", default-features = false } proptest = { version = "=1.7.0", default-features = false, features = ["alloc", "std"] } # `std` or `no_std` are required, `no_std` pulls in `libm` +pyroscope = { version = "=0.5.8", default-features = false } +pyroscope_pprofrs = { version = "=0.2.10", default-features = false } quote = { version = "=1.0.40", default-features = false } radix_trie = { version = "=0.2.1", default-features = false } rand = { version = "=0.9.2", default-features = false } diff --git a/libs/@local/telemetry/Cargo.toml b/libs/@local/telemetry/Cargo.toml index aecdcf14f37..650f3383a62 100644 --- a/libs/@local/telemetry/Cargo.toml +++ b/libs/@local/telemetry/Cargo.toml @@ -29,6 +29,8 @@ crossbeam-channel = { workspace = true } derive_more = { workspace = true, features = ["display", "error"] } opentelemetry-appender-tracing = { workspace = true } opentelemetry_sdk = { workspace = true, features = ["trace", "logs", "metrics", "rt-tokio"] } +pyroscope = { workspace = true } +pyroscope_pprofrs = { workspace = true } reqwest = { workspace = true } sentry = { workspace = true } simple-mermaid = { workspace = true } diff --git a/libs/@local/telemetry/src/lib.rs b/libs/@local/telemetry/src/lib.rs index 6b11cf8fd75..5a53a76f347 100644 --- a/libs/@local/telemetry/src/lib.rs +++ b/libs/@local/telemetry/src/lib.rs @@ -23,6 +23,7 @@ pub mod traces; mod otlp; use core::time::Duration; +use std::collections::HashMap; use error_stack::{Report, ResultExt as _}; use opentelemetry_sdk::{ @@ -112,9 +113,7 @@ impl TelemetryRegistry { #[must_use] pub fn with_tracing_profiler(mut self, config: ProfilerConfig) -> Self { - if config.pyroscope_endpoint.is_some() || config.folded_path.is_some() { - self.profiler = Some(config); - } + self.profiler = Some(config); self } @@ -291,9 +290,12 @@ pub fn init_tracing( .with_sentry(config.sentry) .with_otlp(config.otlp, service_name) .with_tracing_profiler(ProfilerConfig { + enable_cpu: true, + enable_wall: true, pyroscope_endpoint: config.profile.profile_endpoint, folded_path: None, service_name: service_name.to_owned(), + labels: HashMap::new(), flush_interval: Duration::from_secs(config.profile.flush_interval_seconds), }) .init_global() diff --git a/libs/@local/telemetry/src/profiling/collector.rs b/libs/@local/telemetry/src/profiling/collector.rs index df3e78f6abf..50124b35ea9 100644 --- a/libs/@local/telemetry/src/profiling/collector.rs +++ b/libs/@local/telemetry/src/profiling/collector.rs @@ -22,12 +22,20 @@ impl ProfileCollector { if let Some(endpoint) = &mut config.pyroscope_endpoint { endpoint.push_str("/ingest"); } - config.service_name = format!("{}.wall", config.service_name.replace(' ', "-")); + let tags = config + .labels + .iter() + .map(|(key, value)| format!("{key}={value}")) + .collect::>() + .join(","); + + // Format is `.{}` + config.service_name = format!("{}.wall{{{}}}", config.service_name, tags); Ok(Self { uploader: config .pyroscope_endpoint - .map(|endpoint| ProfileUploader::new(endpoint, config.service_name)), + .map(|endpoint| dbg!(ProfileUploader::new(endpoint, config.service_name))), folded_file: config .folded_path .as_ref() @@ -107,12 +115,10 @@ impl ProfileCollector { if let Some(file) = &mut self.folded_file && let Err(error) = file.write_fmt(args) { - tracing::error!(%error, "Failed to write profile data to file"); + tracing::error!(?error, "Failed to write profile data to file"); } - if let Some(buffer) = &mut self.uploader - && let Err(error) = buffer.write_fmt(args) - { - tracing::error!(%error, "Failed to write profile data to uploader"); + if let Some(buffer) = &mut self.uploader { + buffer.write_fmt(args); } } @@ -120,12 +126,12 @@ impl ProfileCollector { if let Some(file) = &mut self.folded_file && let Err(error) = file.flush() { - tracing::error!(%error, "Failed to flush profile data to file"); + tracing::error!(?error, "Failed to flush profile data to file"); } if let Some(buffer) = &mut self.uploader && let Err(error) = buffer.flush() { - tracing::error!(%error, "Failed to flush profile data to uploader"); + tracing::error!(?error, "Failed to flush profile data to uploader"); } } } diff --git a/libs/@local/telemetry/src/profiling/mod.rs b/libs/@local/telemetry/src/profiling/mod.rs index 74f64c8dbb6..cc4b63f8063 100644 --- a/libs/@local/telemetry/src/profiling/mod.rs +++ b/libs/@local/telemetry/src/profiling/mod.rs @@ -1,12 +1,14 @@ mod collector; mod uploader; -use alloc::sync::Arc; -use core::{fmt::Write as _, time::Duration}; -use std::{io, path::PathBuf, thread::JoinHandle, time::Instant}; +use alloc::{borrow::Cow, sync::Arc}; +use core::{error::Error, fmt::Write as _, time::Duration}; +use std::{collections::HashMap, path::PathBuf, thread::JoinHandle, time::Instant}; use crossbeam_channel::Sender; -use error_stack::Report; +use error_stack::{Report, ResultExt as _}; +use pyroscope::{PyroscopeAgent, pyroscope::PyroscopeAgentRunning}; +use pyroscope_pprofrs::{PprofConfig, pprof_backend}; use tracing::{Subscriber, span}; use tracing_subscriber::{ Layer, @@ -96,11 +98,24 @@ pub struct ProfilerCliConfig { pub flush_interval_seconds: u64, } +#[derive(Debug, derive_more::Display)] +pub enum ProfileConfigError { + #[display("Failed to initialize CPU profiling")] + Cpu, + #[display("Failed to create Wall profiling")] + Wall, +} + +impl Error for ProfileConfigError {} + #[derive(Debug)] pub struct ProfilerConfig { + pub enable_wall: bool, + pub enable_cpu: bool, pub pyroscope_endpoint: Option, pub folded_path: Option, pub service_name: String, + pub labels: HashMap<&'static str, Cow<'static, str>>, pub flush_interval: Duration, } @@ -110,32 +125,83 @@ impl ProfilerConfig { /// # Errors /// /// Returns an error if the profiling layer cannot be created. - pub fn build(self) -> Result<(impl Layer, impl Drop), Report> + pub fn build(mut self) -> Result<(impl Layer, impl Drop), Report> where S: Subscriber + for<'a> LookupSpan<'a>, { struct Dropper { - join: Option>, - control_tx: Sender, + wall: Option<(Sender, JoinHandle<()>)>, + cpu: Option>, } + // TODO: We probably need to switch from `std::thread` to `tokio::task` but this prevents a + // `Drop` implementation. + // see https://linear.app/hash/issue/H-5339/implement-continuous-profiling-for-the-graph impl Drop for Dropper { fn drop(&mut self) { - _ = self.control_tx.send(ControlMessage::Shutdown); - self.join.take().map(JoinHandle::join); + if let Some((control_tx, handle)) = self.wall.take() { + _ = control_tx.send(ControlMessage::Shutdown); + if handle.join().is_err() { + tracing::warn!("Failed to join profiling thread"); + } + } + if let Some(agent) = self.cpu.take() { + match agent.stop() { + Ok(agent) => agent.shutdown(), + Err(error) => tracing::warn!("Failed to stop Pyroscope agent: {error}"), + } + } } } - let (message_tx, message_rx) = crossbeam_channel::bounded(1_000); - let (control_tx, control_rx) = crossbeam_channel::unbounded(); + self.service_name = self.service_name.replace(' ', "-").to_lowercase(); + + let cpu = if self.enable_cpu + && let Some(pyroscope_endpoint) = self.pyroscope_endpoint.as_deref() + { + let agent = PyroscopeAgent::builder(pyroscope_endpoint, "hash-graph-benchmarks") + .backend(pprof_backend(PprofConfig::new().sample_rate(100))) + .func(|mut report| { + report.data.retain(|trace, _| { + trace.frames.iter().any(|frame| { + frame.name.as_deref() != Some("std::thread::Thread::unpark") + }) + }); + report + }) + .application_name(&self.service_name) + .tags( + self.labels + .iter() + .map(|(key, value)| (*key, value.as_ref())) + .collect(), + ) + .build() + .change_context(ProfileConfigError::Cpu)?; + Some(agent.start().change_context(ProfileConfigError::Cpu)?) + } else { + None + }; + + let (layer, wall) = if self.enable_wall + && (self.pyroscope_endpoint.is_some() || self.folded_path.is_some()) + { + let (message_tx, message_rx) = crossbeam_channel::bounded(1_000); + let (control_tx, control_rx) = crossbeam_channel::unbounded(); + ( + Some(ProfileLayer { message_tx }), + Some(( + control_tx, + ProfileCollector::new(self) + .change_context(ProfileConfigError::Wall)? + .run(message_rx, control_rx), + )), + ) + } else { + (None, None) + }; - Ok(( - ProfileLayer { message_tx }, - Dropper { - join: Some(ProfileCollector::new(self)?.run(message_rx, control_rx)), - control_tx, - }, - )) + Ok((layer, Dropper { wall, cpu })) } } diff --git a/libs/@local/telemetry/src/profiling/uploader.rs b/libs/@local/telemetry/src/profiling/uploader.rs index c306d4a83cb..1c77b899efe 100644 --- a/libs/@local/telemetry/src/profiling/uploader.rs +++ b/libs/@local/telemetry/src/profiling/uploader.rs @@ -1,8 +1,17 @@ -use core::mem; -use std::{ - io::{self, Write}, - time::{SystemTime, UNIX_EPOCH}, -}; +use core::{error::Error, fmt, mem}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use error_stack::{Report, ResultExt as _}; + +#[derive(Debug, derive_more::Display)] +pub(crate) enum UploaderError { + #[display("Failed to send profile data")] + Send, + #[display("Failed to create runtime")] + Runtime, +} + +impl Error for UploaderError {} #[derive(Debug)] pub(crate) struct ProfileUploader { @@ -28,15 +37,16 @@ impl ProfileUploader { runtime_handle: tokio::runtime::Handle::try_current().ok(), } } -} -impl Write for ProfileUploader { - fn write(&mut self, buf: &[u8]) -> io::Result { + pub(crate) fn write(&mut self, buf: &[u8]) { self.buffer.extend_from_slice(buf); - Ok(buf.len()) } - fn flush(&mut self) -> io::Result<()> { + pub(crate) fn write_fmt(&mut self, fmt: fmt::Arguments) { + self.write(fmt.to_string().as_bytes()); + } + + pub(crate) fn flush(&mut self) -> Result<(), Report> { let now = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("Time went backwards") @@ -58,22 +68,14 @@ impl Write for ProfileUploader { .body(mem::take(&mut self.buffer)) .send() .await - .map_err(io::Error::other) + .change_context(UploaderError::Send) }; let response = if let Some(handle) = &self.runtime_handle { handle.block_on(request)? - } else if let Ok(handle) = tokio::runtime::Handle::try_current() { - handle.block_on(request)? } else { - tracing::warn!( - "No Tokio runtime found, spawning a new one for Pyroscope upload. Consider \ - initializing a Tokio runtime earlier to avoid this overhead." - ); - - // If we're not in a Tokio runtime, spawn a new one for this blocking task. - tokio::runtime::Runtime::new() - .map_err(io::Error::other)? + tokio::runtime::Handle::try_current() + .change_context(UploaderError::Runtime)? .block_on(request)? }; diff --git a/tests/graph/benches/graph/lib.rs b/tests/graph/benches/graph/lib.rs index ceab2647405..7afb66e0bb7 100644 --- a/tests/graph/benches/graph/lib.rs +++ b/tests/graph/benches/graph/lib.rs @@ -20,6 +20,7 @@ mod scenario; #[path = "../util.rs"] mod util; +use alloc::borrow::Cow; use core::time::Duration; use std::{ collections::HashMap, @@ -56,15 +57,20 @@ fn init_tracing(scenario: &str, bench: &str) -> impl Drop { "Graph Benches", ) .with_tracing_profiler(ProfilerConfig { + enable_wall: true, + enable_cpu: true, pyroscope_endpoint: std::env::var("HASH_PROFILER_ENDPOINT").ok(), folded_path: Some(Path::new("out").join(generate_path( "scenarios", Some(scenario), Some(bench), ))), - service_name: format!( - "graph-benches{{group=scenarios,function={scenario},value={bench}}}" - ), + service_name: "graph-benches".to_owned(), + labels: HashMap::from([ + ("group", Cow::Borrowed("scenarios")), + ("function", Cow::Owned(scenario.to_owned())), + ("value", Cow::Owned(bench.to_owned())), + ]), flush_interval: Duration::from_secs(10), }) .init() diff --git a/tests/graph/benches/manual_queries/entity_queries/mod.rs b/tests/graph/benches/manual_queries/entity_queries/mod.rs index 779b54d7144..bdc51c030a6 100644 --- a/tests/graph/benches/manual_queries/entity_queries/mod.rs +++ b/tests/graph/benches/manual_queries/entity_queries/mod.rs @@ -385,6 +385,7 @@ fn bench_json_queries(crit: &mut Criterion) { for (request, parameter) in request.prepare_request() { group.bench_function(BenchmarkId::new(name, ¶meter), |bencher| { + let _enter = runtime.enter(); let _guard = setup_subscriber(&group_id, Some(name), Some(¶meter)); bencher.to_async(&runtime).iter_batched( diff --git a/tests/graph/benches/policy/benchmark_matrix.rs b/tests/graph/benches/policy/benchmark_matrix.rs index 20bcb75ab08..a5f76b98228 100644 --- a/tests/graph/benches/policy/benchmark_matrix.rs +++ b/tests/graph/benches/policy/benchmark_matrix.rs @@ -255,6 +255,7 @@ fn run_benchmark_seed_group( BenchmarkId::new("resolve_policies_for_actor", &bench_id), &(&test_actor, &actions), |bencher, &(test_actor, actions)| { + let _enter = runtime.enter(); let _guard = setup_subscriber( &group_name, Some("resolve_policies_for_actor"), diff --git a/tests/graph/benches/read_scaling/knowledge/complete/entity.rs b/tests/graph/benches/read_scaling/knowledge/complete/entity.rs index c7fc075ea78..7a7f1050cb0 100644 --- a/tests/graph/benches/read_scaling/knowledge/complete/entity.rs +++ b/tests/graph/benches/read_scaling/knowledge/complete/entity.rs @@ -305,6 +305,7 @@ fn bench_scaling_read_entity_zero_depths(crit: &mut Criterion) { BenchmarkId::new(function_id, ¶meter), &(account_id, entity_metadata_list), |bencher, (_account_id, entity_list)| { + let _enter = runtime.enter(); let _guard = setup_subscriber(group_id, Some(function_id), Some(¶meter)); bench_get_entity_by_id( bencher, @@ -358,6 +359,7 @@ fn bench_scaling_read_entity_one_depth(crit: &mut Criterion) { BenchmarkId::new(function_id, ¶meter), &(account_id, entity_metadata_list), |bencher, (_account_id, entity_metadata_list)| { + let _enter = runtime.enter(); let _guard = setup_subscriber(group_id, Some(function_id), Some(¶meter)); bench_get_entity_by_id( bencher, diff --git a/tests/graph/benches/read_scaling/knowledge/linkless/entity.rs b/tests/graph/benches/read_scaling/knowledge/linkless/entity.rs index 094fdd919ab..4489dfb9993 100644 --- a/tests/graph/benches/read_scaling/knowledge/linkless/entity.rs +++ b/tests/graph/benches/read_scaling/knowledge/linkless/entity.rs @@ -236,6 +236,7 @@ fn bench_scaling_read_entity(crit: &mut Criterion) { BenchmarkId::new(function_id, ¶meter), &(account_id, entity_uuids), |bencher, (_account_id, entity_list)| { + let _enter = runtime.enter(); let _guard = setup_subscriber(group_id, Some(function_id), Some(¶meter)); bench_get_entity_by_id(bencher, &runtime, store, account_id, entity_list); }, diff --git a/tests/graph/benches/representative_read/lib.rs b/tests/graph/benches/representative_read/lib.rs index fb5969bc47a..28fb1e98690 100644 --- a/tests/graph/benches/representative_read/lib.rs +++ b/tests/graph/benches/representative_read/lib.rs @@ -81,6 +81,7 @@ fn bench_representative_read_entity(crit: &mut Criterion) { BenchmarkId::new(function_id, ¶meter), &(account_id, entity_type_id, entity_uuids), |bencher, (_account_id, _entity_type_id, entity_uuids)| { + let _enter = runtime.enter(); let _guard = setup_subscriber(group_id, Some(function_id), Some(¶meter)); knowledge::entity::bench_get_entity_by_id( bencher, @@ -286,7 +287,7 @@ fn bench_representative_read_multiple_entities(crit: &mut Criterion) { for graph_resolve_depth in graph_resolve_depths { let function_id = "entity_by_property"; let parameter = format!( - "depths: DT={}, PT={}, ET={}, E={}", + "depths: DT={} PT={} ET={} E={}", [ graph_resolve_depth.constrains_values_on.incoming, graph_resolve_depth.constrains_values_on.outgoing, @@ -325,6 +326,7 @@ fn bench_representative_read_multiple_entities(crit: &mut Criterion) { BenchmarkId::new(function_id, ¶meter), &graph_resolve_depth, |bencher, graph_resolve_depth| { + let _enter = runtime.enter(); let _guard = setup_subscriber(group_id, Some(function_id), Some(¶meter)); knowledge::entity::bench_get_entities_by_property( bencher, @@ -340,7 +342,7 @@ fn bench_representative_read_multiple_entities(crit: &mut Criterion) { for graph_resolve_depth in graph_resolve_depths { let function_id = "link_by_source_by_property"; let parameter = format!( - "depths: DT={}, PT={}, ET={}, E={}", + "depths: DT={} PT={} ET={} E={}", [ graph_resolve_depth.constrains_values_on.incoming, graph_resolve_depth.constrains_values_on.outgoing, @@ -379,6 +381,7 @@ fn bench_representative_read_multiple_entities(crit: &mut Criterion) { BenchmarkId::new(function_id, ¶meter), &graph_resolve_depth, |bencher, graph_resolve_depth| { + let _enter = runtime.enter(); let _guard = setup_subscriber(group_id, Some(function_id), Some(¶meter)); knowledge::entity::bench_get_link_by_target_by_property( bencher, @@ -414,6 +417,7 @@ fn bench_representative_read_entity_type(crit: &mut Criterion) { BenchmarkId::new(function_id, ¶meter), &(account_id, entity_type_ids), |bencher, (_account_id, entity_type_ids)| { + let _enter = runtime.enter(); let _guard = setup_subscriber(group_id, Some(function_id), Some(¶meter)); ontology::entity_type::bench_get_entity_type_by_id( bencher, diff --git a/tests/graph/benches/util.rs b/tests/graph/benches/util.rs index 4fe122ff7bc..7851561a8aa 100644 --- a/tests/graph/benches/util.rs +++ b/tests/graph/benches/util.rs @@ -1,5 +1,8 @@ #![allow(dead_code)] +extern crate alloc; + +use alloc::borrow::Cow; use core::{mem::ManuallyDrop, time::Duration}; use std::{collections::HashMap, path::Path}; @@ -19,12 +22,16 @@ use hash_graph_store::{ query::ConflictBehavior, }; use hash_repo_chores::benches::generate_path; -use hash_telemetry::{TelemetryRegistry, profiling::ProfilerConfig}; +use hash_telemetry::{ + TelemetryRegistry, + logging::{ColorOption, ConsoleConfig, ConsoleStream, LogFormat}, + profiling::ProfilerConfig, +}; use regex::Regex; use time::OffsetDateTime; use tokio::runtime::Runtime; use tokio_postgres::NoTls; -use tracing::Instrument as _; +use tracing::{Instrument as _, Level}; use type_system::{ ontology::{ data_type::DataType, @@ -59,24 +66,26 @@ pub fn setup_subscriber( function_id: Option<&str>, value_str: Option<&str>, ) -> impl Drop { - let labels = match (function_id, value_str) { - (Some(function_id), Some(value_str)) => { - format!(",function={function_id},value={value_str}") - } - (Some(function_id), None) => format!(",function={function_id}"), - (None, Some(value_str)) => format!(",value={value_str}"), - (None, None) => String::new(), - }; + let mut labels = HashMap::from([("group", Cow::Owned(group_id.to_owned()))]); + if let Some(function_id) = function_id { + labels.insert("function", Cow::Owned(function_id.to_owned())); + } + if let Some(value_str) = value_str { + labels.insert("value", Cow::Owned(value_str.to_owned())); + } TelemetryRegistry::default() .with_tracing_profiler(ProfilerConfig { + enable_wall: true, + enable_cpu: true, pyroscope_endpoint: std::env::var("HASH_PROFILER_ENDPOINT").ok(), folded_path: Some(Path::new("out").join(generate_path( group_id, function_id, value_str, ))), - service_name: format!("graph-benches{{group={group_id}{labels}}}"), + service_name: "graph-benches".to_owned(), + labels, flush_interval: Duration::from_secs(10), }) .init() From 00ff4c724ff338693009fc7ab6bc1da0571e2694 Mon Sep 17 00:00:00 2001 From: Tim Diekmann Date: Wed, 10 Sep 2025 13:28:35 +0200 Subject: [PATCH 4/4] Fix linting --- libs/@local/telemetry/src/profiling/collector.rs | 2 +- tests/graph/benches/util.rs | 8 ++------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/libs/@local/telemetry/src/profiling/collector.rs b/libs/@local/telemetry/src/profiling/collector.rs index 50124b35ea9..131bc924365 100644 --- a/libs/@local/telemetry/src/profiling/collector.rs +++ b/libs/@local/telemetry/src/profiling/collector.rs @@ -35,7 +35,7 @@ impl ProfileCollector { Ok(Self { uploader: config .pyroscope_endpoint - .map(|endpoint| dbg!(ProfileUploader::new(endpoint, config.service_name))), + .map(|endpoint| ProfileUploader::new(endpoint, config.service_name)), folded_file: config .folded_path .as_ref() diff --git a/tests/graph/benches/util.rs b/tests/graph/benches/util.rs index 7851561a8aa..0bffb8d22d8 100644 --- a/tests/graph/benches/util.rs +++ b/tests/graph/benches/util.rs @@ -22,16 +22,12 @@ use hash_graph_store::{ query::ConflictBehavior, }; use hash_repo_chores::benches::generate_path; -use hash_telemetry::{ - TelemetryRegistry, - logging::{ColorOption, ConsoleConfig, ConsoleStream, LogFormat}, - profiling::ProfilerConfig, -}; +use hash_telemetry::{TelemetryRegistry, profiling::ProfilerConfig}; use regex::Regex; use time::OffsetDateTime; use tokio::runtime::Runtime; use tokio_postgres::NoTls; -use tracing::{Instrument as _, Level}; +use tracing::Instrument as _; use type_system::{ ontology::{ data_type::DataType,