diff --git a/Cargo.lock b/Cargo.lock index 482de563f8..e803271936 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17398,6 +17398,7 @@ dependencies = [ "ractor", "ractor-supervisor", "rodio", + "sentry", "serde", "serde_json", "specta", diff --git a/apps/api/src/listen.ts b/apps/api/src/listen.ts index cf6b3899db..f71292ddf4 100644 --- a/apps/api/src/listen.ts +++ b/apps/api/src/listen.ts @@ -16,87 +16,124 @@ export const listenSocketHandler: Handler = async (c, next) => { const clientUrl = new URL(c.req.url, "http://localhost"); const provider = clientUrl.searchParams.get("provider") ?? "deepgram"; - let connection: WsProxyConnection; - try { - connection = createProxyFromRequest(clientUrl, c.req.raw.headers); - await connection.preconnectUpstream(); - emit({ type: "stt.websocket.connected", userId, provider }); - } catch (error) { - const errorMessage = - error instanceof Error ? error.message : "upstream_connect_failed"; - console.error("[listen] preconnect failed:", { - provider, - error: errorMessage, - stack: error instanceof Error ? error.stack : undefined, - }); - Sentry.captureException(error, { - tags: { - operation: "stt_preconnect", - provider, - }, - extra: { - errorMessage, - userId, - }, - }); - emit({ - type: "stt.websocket.error", - userId, - provider, - error: - error instanceof Error ? error : new Error("upstream_connect_failed"), - }); - const status = errorMessage === "upstream_connect_timeout" ? 504 : 502; - return c.json( - { error: "upstream_connect_failed", detail: errorMessage }, - status, - ); - } - - const connectionStartTime = performance.now(); + const sentryTrace = c.req.header("sentry-trace"); + const baggage = c.req.header("baggage"); - const handler = upgradeWebSocket(() => { - return { - onOpen(_event, ws) { - connection.initializeUpstream(ws.raw); - }, - async onMessage(event) { - const payload = await normalizeWsData(event.data); - if (!payload) { - return; + return Sentry.continueTrace({ sentryTrace, baggage }, () => { + return Sentry.startSpan( + { name: `WebSocket /listen ${provider}`, op: "websocket.server" }, + async () => { + let connection: WsProxyConnection; + try { + connection = createProxyFromRequest(clientUrl, c.req.raw.headers); + await connection.preconnectUpstream(); + emit({ type: "stt.websocket.connected", userId, provider }); + Sentry.addBreadcrumb({ + category: "websocket", + message: "Upstream STT service connected", + level: "info", + data: { provider }, + }); + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : "upstream_connect_failed"; + console.error("[listen] preconnect failed:", { + provider, + error: errorMessage, + stack: error instanceof Error ? error.stack : undefined, + }); + Sentry.addBreadcrumb({ + category: "websocket", + message: "Upstream connection failed", + level: "error", + data: { provider, error: String(error) }, + }); + Sentry.captureException(error, { + tags: { + operation: "stt_preconnect", + provider, + }, + extra: { + errorMessage, + userId, + }, + }); + emit({ + type: "stt.websocket.error", + userId, + provider, + error: + error instanceof Error + ? error + : new Error("upstream_connect_failed"), + }); + const status = + errorMessage === "upstream_connect_timeout" ? 504 : 502; + return c.json( + { error: "upstream_connect_failed", detail: errorMessage }, + status, + ); } - await connection.sendToUpstream(payload); - }, - onClose(event) { - const code = event?.code ?? 1000; - const reason = event?.reason || "client_closed"; - connection.closeConnections(code, reason); - emit({ - type: "stt.websocket.disconnected", - userId, - provider, - durationMs: performance.now() - connectionStartTime, - }); - }, - onError(event) { - emit({ - type: "stt.websocket.error", - userId, - provider, - error: - event instanceof Error - ? event - : new Error("websocket_client_error"), + + const connectionStartTime = performance.now(); + + const handler = upgradeWebSocket(() => { + return { + onOpen(_event, ws) { + connection.initializeUpstream(ws.raw); + }, + async onMessage(event) { + const payload = await normalizeWsData(event.data); + if (!payload) { + return; + } + await connection.sendToUpstream(payload); + }, + onClose(event) { + const code = event?.code ?? 1000; + const reason = event?.reason || "client_closed"; + connection.closeConnections(code, reason); + Sentry.addBreadcrumb({ + category: "websocket", + message: "Client WebSocket closed", + level: "info", + data: { provider, code, reason }, + }); + emit({ + type: "stt.websocket.disconnected", + userId, + provider, + durationMs: performance.now() - connectionStartTime, + }); + }, + onError(event) { + Sentry.addBreadcrumb({ + category: "websocket", + message: "Client WebSocket error", + level: "error", + data: { provider }, + }); + emit({ + type: "stt.websocket.error", + userId, + provider, + error: + event instanceof Error + ? event + : new Error("websocket_client_error"), + }); + connection.closeConnections(1011, "client_error"); + }, + }; }); - connection.closeConnections(1011, "client_error"); + + const response = await handler(c, next); + if (!response) { + connection.closeConnections(); + return c.json({ error: "upgrade_failed" }, 400); + } + return response; }, - }; + ); }); - - const response = await handler(c, next); - if (!response) { - connection.closeConnections(); - return c.json({ error: "upgrade_failed" }, 400); - } - return response; }; diff --git a/crates/owhisper-client/src/lib.rs b/crates/owhisper-client/src/lib.rs index c5330516d9..9b0ba2a9bc 100644 --- a/crates/owhisper-client/src/lib.rs +++ b/crates/owhisper-client/src/lib.rs @@ -27,15 +27,23 @@ pub struct ListenClientBuilder { api_base: Option, api_key: Option, params: Option, + trace_headers: Option, _marker: PhantomData, } +#[derive(Clone, Default)] +pub struct TraceHeaders { + pub sentry_trace: Option, + pub baggage: Option, +} + impl Default for ListenClientBuilder { fn default() -> Self { Self { api_base: None, api_key: None, params: None, + trace_headers: None, _marker: PhantomData, } } @@ -57,11 +65,17 @@ impl ListenClientBuilder { self } + pub fn trace_headers(mut self, headers: TraceHeaders) -> Self { + self.trace_headers = Some(headers); + self + } + pub fn adapter(self) -> ListenClientBuilder { ListenClientBuilder { api_base: self.api_base, api_key: self.api_key, params: self.params, + trace_headers: self.trace_headers, _marker: PhantomData, } } @@ -100,6 +114,15 @@ impl ListenClientBuilder { request = request.with_header(header_name, header_value); } + if let Some(ref trace_headers) = self.trace_headers { + if let Some(ref sentry_trace) = trace_headers.sentry_trace { + request = request.with_header("sentry-trace", sentry_trace.clone()); + } + if let Some(ref baggage) = trace_headers.baggage { + request = request.with_header("baggage", baggage.clone()); + } + } + request } diff --git a/plugins/listener/Cargo.toml b/plugins/listener/Cargo.toml index 4927ad3385..16cbe0378a 100644 --- a/plugins/listener/Cargo.toml +++ b/plugins/listener/Cargo.toml @@ -65,3 +65,5 @@ tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } tokio-stream = { workspace = true } tokio-util = { workspace = true } tracing = { workspace = true } + +sentry = { workspace = true } diff --git a/plugins/listener/src/actors/listener.rs b/plugins/listener/src/actors/listener.rs index 6842dec621..ebc4968fb6 100644 --- a/plugins/listener/src/actors/listener.rs +++ b/plugins/listener/src/actors/listener.rs @@ -7,6 +7,7 @@ use tokio::time::error::Elapsed; use owhisper_client::{ AdapterKind, ArgmaxAdapter, AssemblyAIAdapter, DeepgramAdapter, FinalizeHandle, FireworksAdapter, GladiaAdapter, OpenAIAdapter, RealtimeSttAdapter, SonioxAdapter, + TraceHeaders, }; use owhisper_interface::stream::{Extra, StreamResponse}; use owhisper_interface::{ControlMessage, MixedMessage}; @@ -306,6 +307,46 @@ fn build_extra(args: &ListenerArgs) -> (f64, Extra) { (session_offset_secs, extra) } +fn build_trace_headers(args: &ListenerArgs, provider_name: &str) -> TraceHeaders { + let mut trace_headers = TraceHeaders::default(); + + sentry::with_scope( + |scope| { + scope.set_tag("session_id", &args.session_id); + scope.set_tag("stt_provider", provider_name); + scope.set_tag("channel_mode", format!("{:?}", args.mode)); + scope.set_tag("model", &args.model); + scope.set_tag( + "languages", + args.languages + .iter() + .map(|l| l.iso639().code()) + .collect::>() + .join(","), + ); + scope.set_tag("onboarding", args.onboarding.to_string()); + }, + || { + sentry::configure_scope(|scope| { + if let Some(span) = scope.get_span() { + trace_headers.sentry_trace = span + .iter_headers() + .find(|(k, _)| *k == "sentry-trace") + .map(|(_, v)| v.to_string()) + .filter(|s| !s.is_empty()); + trace_headers.baggage = span + .iter_headers() + .find(|(k, _)| *k == "baggage") + .map(|(_, v)| v.to_string()) + .filter(|s| !s.is_empty()); + } + }); + }, + ); + + trace_headers +} + async fn spawn_rx_task_single_with_adapter( args: ListenerArgs, myself: ActorRef, @@ -322,11 +363,23 @@ async fn spawn_rx_task_single_with_adapter( let (tx, rx) = tokio::sync::mpsc::channel::>(32); + let adapter = A::default(); + let trace_headers = build_trace_headers(&args, adapter.provider_name()); + + tracing::info!( + session_id = %args.session_id, + channel_mode = ?args.mode, + model = %args.model, + provider = adapter.provider_name(), + "listener_actor_starting(single)" + ); + let client = owhisper_client::ListenClient::builder() .adapter::() .api_base(args.base_url.clone()) .api_key(args.api_key.clone()) .params(build_listen_params(&args)) + .trace_headers(trace_headers) .build_single() .await; @@ -339,6 +392,7 @@ async fn spawn_rx_task_single_with_adapter( Err(_elapsed) => { tracing::error!( timeout_secs = LISTEN_CONNECT_TIMEOUT.as_secs_f32(), + session_id = %args.session_id, "listen_ws_connect_timeout(single)" ); let _ = (SessionErrorEvent::ConnectionError { @@ -350,7 +404,7 @@ async fn spawn_rx_task_single_with_adapter( return Err(actor_error("listen_ws_connect_timeout")); } Ok(Err(e)) => { - tracing::error!(error = ?e, "listen_ws_connect_failed(single)"); + tracing::error!(error = ?e, session_id = %args.session_id, "listen_ws_connect_failed(single)"); let _ = (SessionErrorEvent::ConnectionError { session_id: args.session_id.clone(), error: format!("listen_ws_connect_failed: {:?}", e), @@ -394,11 +448,23 @@ async fn spawn_rx_task_dual_with_adapter( let (tx, rx) = tokio::sync::mpsc::channel::>(32); + let adapter = A::default(); + let trace_headers = build_trace_headers(&args, adapter.provider_name()); + + tracing::info!( + session_id = %args.session_id, + channel_mode = ?args.mode, + model = %args.model, + provider = adapter.provider_name(), + "listener_actor_starting(dual)" + ); + let client = owhisper_client::ListenClient::builder() .adapter::() .api_base(args.base_url.clone()) .api_key(args.api_key.clone()) .params(build_listen_params(&args)) + .trace_headers(trace_headers) .build_dual() .await; @@ -411,6 +477,7 @@ async fn spawn_rx_task_dual_with_adapter( Err(_elapsed) => { tracing::error!( timeout_secs = LISTEN_CONNECT_TIMEOUT.as_secs_f32(), + session_id = %args.session_id, "listen_ws_connect_timeout(dual)" ); let _ = (SessionErrorEvent::ConnectionError { @@ -422,7 +489,7 @@ async fn spawn_rx_task_dual_with_adapter( return Err(actor_error("listen_ws_connect_timeout")); } Ok(Err(e)) => { - tracing::error!(error = ?e, "listen_ws_connect_failed(dual)"); + tracing::error!(error = ?e, session_id = %args.session_id, "listen_ws_connect_failed(dual)"); let _ = (SessionErrorEvent::ConnectionError { session_id: args.session_id.clone(), error: format!("listen_ws_connect_failed: {:?}", e),