From 0944e233bb0d8e2db8d21f5dc88f0a715ec991f2 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 5 Dec 2025 10:44:25 +0000 Subject: [PATCH 1/3] feat: add Sentry trace propagation from Rust to API - Add TraceHeaders struct to owhisper-client for passing sentry-trace and baggage headers - Update ListenClientBuilder to support trace_headers() method - Inject trace headers into WebSocket upgrade requests - Add build_trace_headers() helper in listener plugin to extract Sentry context - Enrich Sentry scope with domain tags (session_id, stt_provider, channel_mode, model, languages, onboarding) - Add structured tracing logs with domain context at listener actor startup - Update apps/api/src/listen.ts to continue trace from headers using Sentry.continueTrace() - Add breadcrumbs at key WebSocket lifecycle points (connect, open, close, error) Co-Authored-By: yujonglee --- Cargo.lock | 1 + apps/api/src/listen.ts | 147 ++++++++++++++++-------- owhisper/owhisper-client/src/lib.rs | 23 ++++ plugins/listener/Cargo.toml | 2 + plugins/listener/src/actors/listener.rs | 67 ++++++++++- 5 files changed, 188 insertions(+), 52 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6dab1900a0..23c1fa44e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15933,6 +15933,7 @@ dependencies = [ "ractor", "ractor-supervisor", "rodio", + "sentry", "serde", "serde_json", "specta", diff --git a/apps/api/src/listen.ts b/apps/api/src/listen.ts index e9a85cfd2d..4290734e0e 100644 --- a/apps/api/src/listen.ts +++ b/apps/api/src/listen.ts @@ -13,58 +13,107 @@ export const listenSocketHandler: Handler = async (c, next) => { const clientUrl = new URL(c.req.url, "http://localhost"); const provider = clientUrl.searchParams.get("provider") ?? "deepgram"; - let connection: WsProxyConnection; - try { - connection = createProxyFromRequest(clientUrl, c.req.raw.headers); - await connection.preconnectUpstream(); - Metrics.websocketConnected(provider); - } catch (error) { - Sentry.captureException(error, { - tags: { provider, operation: "upstream_connect" }, - }); - const detail = - error instanceof Error ? error.message : "upstream_connect_failed"; - const status = detail === "upstream_connect_timeout" ? 504 : 502; - return c.json({ error: "upstream_connect_failed", detail }, status); - } + const sentryTrace = c.req.header("sentry-trace"); + const baggage = c.req.header("baggage"); - const connectionStartTime = performance.now(); + return Sentry.continueTrace({ sentryTrace, baggage }, () => { + return Sentry.startSpan( + { name: `WebSocket /listen ${provider}`, op: "websocket.server" }, + async () => { + Sentry.addBreadcrumb({ + category: "websocket", + message: `Starting WebSocket connection for provider: ${provider}`, + level: "info", + data: { provider }, + }); - const handler = upgradeWebSocket(() => { - return { - onOpen(_event, ws) { - connection.initializeUpstream(ws.raw); - }, - async onMessage(event) { - const payload = await normalizeWsData(event.data); - if (!payload) { - return; + let connection: WsProxyConnection; + try { + connection = createProxyFromRequest(clientUrl, c.req.raw.headers); + await connection.preconnectUpstream(); + Metrics.websocketConnected(provider); + Sentry.addBreadcrumb({ + category: "websocket", + message: "Upstream STT service connected", + level: "info", + data: { provider }, + }); + } catch (error) { + Sentry.addBreadcrumb({ + category: "websocket", + message: "Upstream connection failed", + level: "error", + data: { provider, error: String(error) }, + }); + Sentry.captureException(error, { + tags: { provider, operation: "upstream_connect" }, + }); + const detail = + error instanceof Error ? error.message : "upstream_connect_failed"; + const status = detail === "upstream_connect_timeout" ? 504 : 502; + return c.json({ error: "upstream_connect_failed", detail }, status); } - await connection.sendToUpstream(payload); - }, - onClose(event) { - const code = event?.code ?? 1000; - const reason = event?.reason || "client_closed"; - connection.closeConnections(code, reason); - Metrics.websocketDisconnected( - provider, - performance.now() - connectionStartTime, - ); - }, - onError(event) { - Sentry.captureException( - event instanceof Error ? event : new Error("websocket_client_error"), - { tags: { provider, operation: "websocket" } }, - ); - connection.closeConnections(1011, "client_error"); + + const connectionStartTime = performance.now(); + + const handler = upgradeWebSocket(() => { + return { + onOpen(_event, ws) { + connection.initializeUpstream(ws.raw); + Sentry.addBreadcrumb({ + category: "websocket", + message: "Client WebSocket opened", + level: "info", + data: { provider }, + }); + }, + async onMessage(event) { + const payload = await normalizeWsData(event.data); + if (!payload) { + return; + } + await connection.sendToUpstream(payload); + }, + onClose(event) { + const code = event?.code ?? 1000; + const reason = event?.reason || "client_closed"; + connection.closeConnections(code, reason); + Sentry.addBreadcrumb({ + category: "websocket", + message: "Client WebSocket closed", + level: "info", + data: { provider, code, reason }, + }); + Metrics.websocketDisconnected( + provider, + performance.now() - connectionStartTime, + ); + }, + onError(event) { + Sentry.addBreadcrumb({ + category: "websocket", + message: "Client WebSocket error", + level: "error", + data: { provider }, + }); + Sentry.captureException( + event instanceof Error + ? event + : new Error("websocket_client_error"), + { tags: { provider, operation: "websocket" } }, + ); + connection.closeConnections(1011, "client_error"); + }, + }; + }); + + const response = await handler(c, next); + if (!response) { + connection.closeConnections(); + return c.json({ error: "upgrade_failed" }, 400); + } + return response; }, - }; + ); }); - - const response = await handler(c, next); - if (!response) { - connection.closeConnections(); - return c.json({ error: "upgrade_failed" }, 400); - } - return response; }; diff --git a/owhisper/owhisper-client/src/lib.rs b/owhisper/owhisper-client/src/lib.rs index 9cda052329..164bdb2b7c 100644 --- a/owhisper/owhisper-client/src/lib.rs +++ b/owhisper/owhisper-client/src/lib.rs @@ -23,15 +23,23 @@ pub struct ListenClientBuilder { api_base: Option, api_key: Option, params: Option, + trace_headers: Option, _marker: PhantomData, } +#[derive(Clone, Default)] +pub struct TraceHeaders { + pub sentry_trace: Option, + pub baggage: Option, +} + impl Default for ListenClientBuilder { fn default() -> Self { Self { api_base: None, api_key: None, params: None, + trace_headers: None, _marker: PhantomData, } } @@ -53,11 +61,17 @@ impl ListenClientBuilder { self } + pub fn trace_headers(mut self, headers: TraceHeaders) -> Self { + self.trace_headers = Some(headers); + self + } + pub fn adapter(self) -> ListenClientBuilder { ListenClientBuilder { api_base: self.api_base, api_key: self.api_key, params: self.params, + trace_headers: self.trace_headers, _marker: PhantomData, } } @@ -86,6 +100,15 @@ impl ListenClientBuilder { request = request.with_header(header_name, header_value); } + if let Some(ref trace_headers) = self.trace_headers { + if let Some(ref sentry_trace) = trace_headers.sentry_trace { + request = request.with_header("sentry-trace", sentry_trace.clone()); + } + if let Some(ref baggage) = trace_headers.baggage { + request = request.with_header("baggage", baggage.clone()); + } + } + request } diff --git a/plugins/listener/Cargo.toml b/plugins/listener/Cargo.toml index f393347b91..0684fc62ed 100644 --- a/plugins/listener/Cargo.toml +++ b/plugins/listener/Cargo.toml @@ -63,3 +63,5 @@ tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } tokio-stream = { workspace = true } tokio-util = { workspace = true } tracing = { workspace = true } + +sentry = { workspace = true } diff --git a/plugins/listener/src/actors/listener.rs b/plugins/listener/src/actors/listener.rs index fd50f92b07..c9d121bc1c 100644 --- a/plugins/listener/src/actors/listener.rs +++ b/plugins/listener/src/actors/listener.rs @@ -6,7 +6,7 @@ use tokio::time::error::Elapsed; use owhisper_client::{ AdapterKind, ArgmaxAdapter, AssemblyAIAdapter, DeepgramAdapter, FinalizeHandle, - FireworksAdapter, OpenAIAdapter, RealtimeSttAdapter, SonioxAdapter, + FireworksAdapter, OpenAIAdapter, RealtimeSttAdapter, SonioxAdapter, TraceHeaders, }; use owhisper_interface::stream::{Extra, StreamResponse}; use owhisper_interface::{ControlMessage, MixedMessage}; @@ -270,6 +270,41 @@ fn build_extra(args: &ListenerArgs) -> (f64, Extra) { (session_offset_secs, extra) } +fn build_trace_headers(args: &ListenerArgs, provider_name: &str) -> TraceHeaders { + let mut trace_headers = TraceHeaders::default(); + + sentry::configure_scope(|scope| { + scope.set_tag("session_id", &args.session_id); + scope.set_tag("stt_provider", provider_name); + scope.set_tag("channel_mode", format!("{:?}", args.mode)); + scope.set_tag("model", &args.model); + scope.set_tag( + "languages", + args.languages + .iter() + .map(|l| l.iso639().code()) + .collect::>() + .join(","), + ); + scope.set_tag("onboarding", args.onboarding.to_string()); + + if let Some(span) = scope.get_span() { + trace_headers.sentry_trace = Some( + span.iter_headers() + .find(|(k, _)| *k == "sentry-trace") + .map(|(_, v)| v.to_string()) + .unwrap_or_default(), + ); + trace_headers.baggage = span + .iter_headers() + .find(|(k, _)| *k == "baggage") + .map(|(_, v)| v.to_string()); + } + }); + + trace_headers +} + async fn spawn_rx_task_single_with_adapter( args: ListenerArgs, myself: ActorRef, @@ -286,11 +321,23 @@ async fn spawn_rx_task_single_with_adapter( let (tx, rx) = tokio::sync::mpsc::channel::>(32); + let adapter = A::default(); + let trace_headers = build_trace_headers(&args, adapter.provider_name()); + + tracing::info!( + session_id = %args.session_id, + channel_mode = ?args.mode, + model = %args.model, + provider = adapter.provider_name(), + "listener_actor_starting(single)" + ); + let client = owhisper_client::ListenClient::builder() .adapter::() .api_base(args.base_url.clone()) .api_key(args.api_key.clone()) .params(build_listen_params(&args)) + .trace_headers(trace_headers) .build_single(); let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); @@ -302,12 +349,13 @@ async fn spawn_rx_task_single_with_adapter( Err(_elapsed) => { tracing::error!( timeout_secs = LISTEN_CONNECT_TIMEOUT.as_secs_f32(), + session_id = %args.session_id, "listen_ws_connect_timeout(single)" ); return Err(actor_error("listen_ws_connect_timeout")); } Ok(Err(e)) => { - tracing::error!(error = ?e, "listen_ws_connect_failed(single)"); + tracing::error!(error = ?e, session_id = %args.session_id, "listen_ws_connect_failed(single)"); return Err(actor_error(format!("listen_ws_connect_failed: {:?}", e))); } Ok(Ok(res)) => res, @@ -345,11 +393,23 @@ async fn spawn_rx_task_dual_with_adapter( let (tx, rx) = tokio::sync::mpsc::channel::>(32); + let adapter = A::default(); + let trace_headers = build_trace_headers(&args, adapter.provider_name()); + + tracing::info!( + session_id = %args.session_id, + channel_mode = ?args.mode, + model = %args.model, + provider = adapter.provider_name(), + "listener_actor_starting(dual)" + ); + let client = owhisper_client::ListenClient::builder() .adapter::() .api_base(args.base_url.clone()) .api_key(args.api_key.clone()) .params(build_listen_params(&args)) + .trace_headers(trace_headers) .build_dual(); let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); @@ -361,12 +421,13 @@ async fn spawn_rx_task_dual_with_adapter( Err(_elapsed) => { tracing::error!( timeout_secs = LISTEN_CONNECT_TIMEOUT.as_secs_f32(), + session_id = %args.session_id, "listen_ws_connect_timeout(dual)" ); return Err(actor_error("listen_ws_connect_timeout")); } Ok(Err(e)) => { - tracing::error!(error = ?e, "listen_ws_connect_failed(dual)"); + tracing::error!(error = ?e, session_id = %args.session_id, "listen_ws_connect_failed(dual)"); return Err(actor_error(format!("listen_ws_connect_failed: {:?}", e))); } Ok(Ok(res)) => res, From 2e9f8dd373c348e9e9650d6bec4bc7c3f11c0981 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 5 Dec 2025 10:54:26 +0000 Subject: [PATCH 2/3] fix: filter out empty trace headers instead of sending empty strings - Changed sentry_trace extraction to use .filter(|s| !s.is_empty()) instead of .unwrap_or_default() - Added same filter to baggage header for consistency - This ensures None is returned when headers are absent or empty, rather than Some("") Co-Authored-By: yujonglee --- plugins/listener/src/actors/listener.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/plugins/listener/src/actors/listener.rs b/plugins/listener/src/actors/listener.rs index c9d121bc1c..6922beaa3c 100644 --- a/plugins/listener/src/actors/listener.rs +++ b/plugins/listener/src/actors/listener.rs @@ -289,16 +289,16 @@ fn build_trace_headers(args: &ListenerArgs, provider_name: &str) -> TraceHeaders scope.set_tag("onboarding", args.onboarding.to_string()); if let Some(span) = scope.get_span() { - trace_headers.sentry_trace = Some( - span.iter_headers() - .find(|(k, _)| *k == "sentry-trace") - .map(|(_, v)| v.to_string()) - .unwrap_or_default(), - ); + trace_headers.sentry_trace = span + .iter_headers() + .find(|(k, _)| *k == "sentry-trace") + .map(|(_, v)| v.to_string()) + .filter(|s| !s.is_empty()); trace_headers.baggage = span .iter_headers() .find(|(k, _)| *k == "baggage") - .map(|(_, v)| v.to_string()); + .map(|(_, v)| v.to_string()) + .filter(|s| !s.is_empty()); } }); From fcc1a242e2330013f9111852b8de9db6f0162848 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 5 Dec 2025 11:44:12 +0000 Subject: [PATCH 3/3] fix: remove excessive breadcrumbs and fix Sentry scope leaking - Remove non-error breadcrumbs (onOpen, onClose, starting, upstream connected) to reduce noise - Use sentry::with_scope() instead of configure_scope() to prevent tags from leaking to unrelated events Co-Authored-By: yujonglee --- apps/api/src/listen.ts | 25 ---------- plugins/listener/src/actors/listener.rs | 61 +++++++++++++------------ 2 files changed, 33 insertions(+), 53 deletions(-) diff --git a/apps/api/src/listen.ts b/apps/api/src/listen.ts index 4290734e0e..921c5a809d 100644 --- a/apps/api/src/listen.ts +++ b/apps/api/src/listen.ts @@ -20,24 +20,11 @@ export const listenSocketHandler: Handler = async (c, next) => { return Sentry.startSpan( { name: `WebSocket /listen ${provider}`, op: "websocket.server" }, async () => { - Sentry.addBreadcrumb({ - category: "websocket", - message: `Starting WebSocket connection for provider: ${provider}`, - level: "info", - data: { provider }, - }); - let connection: WsProxyConnection; try { connection = createProxyFromRequest(clientUrl, c.req.raw.headers); await connection.preconnectUpstream(); Metrics.websocketConnected(provider); - Sentry.addBreadcrumb({ - category: "websocket", - message: "Upstream STT service connected", - level: "info", - data: { provider }, - }); } catch (error) { Sentry.addBreadcrumb({ category: "websocket", @@ -60,12 +47,6 @@ export const listenSocketHandler: Handler = async (c, next) => { return { onOpen(_event, ws) { connection.initializeUpstream(ws.raw); - Sentry.addBreadcrumb({ - category: "websocket", - message: "Client WebSocket opened", - level: "info", - data: { provider }, - }); }, async onMessage(event) { const payload = await normalizeWsData(event.data); @@ -78,12 +59,6 @@ export const listenSocketHandler: Handler = async (c, next) => { const code = event?.code ?? 1000; const reason = event?.reason || "client_closed"; connection.closeConnections(code, reason); - Sentry.addBreadcrumb({ - category: "websocket", - message: "Client WebSocket closed", - level: "info", - data: { provider, code, reason }, - }); Metrics.websocketDisconnected( provider, performance.now() - connectionStartTime, diff --git a/plugins/listener/src/actors/listener.rs b/plugins/listener/src/actors/listener.rs index 6922beaa3c..e6439fc2d0 100644 --- a/plugins/listener/src/actors/listener.rs +++ b/plugins/listener/src/actors/listener.rs @@ -273,34 +273,39 @@ fn build_extra(args: &ListenerArgs) -> (f64, Extra) { fn build_trace_headers(args: &ListenerArgs, provider_name: &str) -> TraceHeaders { let mut trace_headers = TraceHeaders::default(); - sentry::configure_scope(|scope| { - scope.set_tag("session_id", &args.session_id); - scope.set_tag("stt_provider", provider_name); - scope.set_tag("channel_mode", format!("{:?}", args.mode)); - scope.set_tag("model", &args.model); - scope.set_tag( - "languages", - args.languages - .iter() - .map(|l| l.iso639().code()) - .collect::>() - .join(","), - ); - scope.set_tag("onboarding", args.onboarding.to_string()); - - if let Some(span) = scope.get_span() { - trace_headers.sentry_trace = span - .iter_headers() - .find(|(k, _)| *k == "sentry-trace") - .map(|(_, v)| v.to_string()) - .filter(|s| !s.is_empty()); - trace_headers.baggage = span - .iter_headers() - .find(|(k, _)| *k == "baggage") - .map(|(_, v)| v.to_string()) - .filter(|s| !s.is_empty()); - } - }); + sentry::with_scope( + |scope| { + scope.set_tag("session_id", &args.session_id); + scope.set_tag("stt_provider", provider_name); + scope.set_tag("channel_mode", format!("{:?}", args.mode)); + scope.set_tag("model", &args.model); + scope.set_tag( + "languages", + args.languages + .iter() + .map(|l| l.iso639().code()) + .collect::>() + .join(","), + ); + scope.set_tag("onboarding", args.onboarding.to_string()); + }, + || { + sentry::configure_scope(|scope| { + if let Some(span) = scope.get_span() { + trace_headers.sentry_trace = span + .iter_headers() + .find(|(k, _)| *k == "sentry-trace") + .map(|(_, v)| v.to_string()) + .filter(|s| !s.is_empty()); + trace_headers.baggage = span + .iter_headers() + .find(|(k, _)| *k == "baggage") + .map(|(_, v)| v.to_string()) + .filter(|s| !s.is_empty()); + } + }); + }, + ); trace_headers }