Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

193 changes: 115 additions & 78 deletions apps/api/src/listen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,87 +16,124 @@ export const listenSocketHandler: Handler<AppBindings> = async (c, next) => {
const clientUrl = new URL(c.req.url, "http://localhost");
const provider = clientUrl.searchParams.get("provider") ?? "deepgram";

let connection: WsProxyConnection;
try {
connection = createProxyFromRequest(clientUrl, c.req.raw.headers);
await connection.preconnectUpstream();
emit({ type: "stt.websocket.connected", userId, provider });
} catch (error) {
const errorMessage =
error instanceof Error ? error.message : "upstream_connect_failed";
console.error("[listen] preconnect failed:", {
provider,
error: errorMessage,
stack: error instanceof Error ? error.stack : undefined,
});
Sentry.captureException(error, {
tags: {
operation: "stt_preconnect",
provider,
},
extra: {
errorMessage,
userId,
},
});
emit({
type: "stt.websocket.error",
userId,
provider,
error:
error instanceof Error ? error : new Error("upstream_connect_failed"),
});
const status = errorMessage === "upstream_connect_timeout" ? 504 : 502;
return c.json(
{ error: "upstream_connect_failed", detail: errorMessage },
status,
);
}

const connectionStartTime = performance.now();
const sentryTrace = c.req.header("sentry-trace");
const baggage = c.req.header("baggage");

const handler = upgradeWebSocket(() => {
return {
onOpen(_event, ws) {
connection.initializeUpstream(ws.raw);
},
async onMessage(event) {
const payload = await normalizeWsData(event.data);
if (!payload) {
return;
return Sentry.continueTrace({ sentryTrace, baggage }, () => {
return Sentry.startSpan(
{ name: `WebSocket /listen ${provider}`, op: "websocket.server" },
async () => {
let connection: WsProxyConnection;
try {
connection = createProxyFromRequest(clientUrl, c.req.raw.headers);
await connection.preconnectUpstream();
emit({ type: "stt.websocket.connected", userId, provider });
Sentry.addBreadcrumb({
category: "websocket",
message: "Upstream STT service connected",
level: "info",
data: { provider },
});
} catch (error) {
const errorMessage =
error instanceof Error ? error.message : "upstream_connect_failed";
console.error("[listen] preconnect failed:", {
provider,
error: errorMessage,
stack: error instanceof Error ? error.stack : undefined,
});
Sentry.addBreadcrumb({
category: "websocket",
message: "Upstream connection failed",
level: "error",
data: { provider, error: String(error) },
});
Sentry.captureException(error, {
tags: {
operation: "stt_preconnect",
provider,
},
extra: {
errorMessage,
userId,
},
});
emit({
type: "stt.websocket.error",
userId,
provider,
error:
error instanceof Error
? error
: new Error("upstream_connect_failed"),
});
const status =
errorMessage === "upstream_connect_timeout" ? 504 : 502;
return c.json(
{ error: "upstream_connect_failed", detail: errorMessage },
status,
);
}
await connection.sendToUpstream(payload);
},
onClose(event) {
const code = event?.code ?? 1000;
const reason = event?.reason || "client_closed";
connection.closeConnections(code, reason);
emit({
type: "stt.websocket.disconnected",
userId,
provider,
durationMs: performance.now() - connectionStartTime,
});
},
onError(event) {
emit({
type: "stt.websocket.error",
userId,
provider,
error:
event instanceof Error
? event
: new Error("websocket_client_error"),

const connectionStartTime = performance.now();

const handler = upgradeWebSocket(() => {
return {
onOpen(_event, ws) {
connection.initializeUpstream(ws.raw);
},
async onMessage(event) {
const payload = await normalizeWsData(event.data);
if (!payload) {
return;
}
await connection.sendToUpstream(payload);
},
onClose(event) {
const code = event?.code ?? 1000;
const reason = event?.reason || "client_closed";
connection.closeConnections(code, reason);
Sentry.addBreadcrumb({
category: "websocket",
message: "Client WebSocket closed",
level: "info",
data: { provider, code, reason },
});
emit({
type: "stt.websocket.disconnected",
userId,
provider,
durationMs: performance.now() - connectionStartTime,
});
},
onError(event) {
Sentry.addBreadcrumb({
category: "websocket",
message: "Client WebSocket error",
level: "error",
data: { provider },
});
emit({
type: "stt.websocket.error",
userId,
provider,
error:
event instanceof Error
? event
: new Error("websocket_client_error"),
});
connection.closeConnections(1011, "client_error");
},
};
});
connection.closeConnections(1011, "client_error");

const response = await handler(c, next);
if (!response) {
connection.closeConnections();
return c.json({ error: "upgrade_failed" }, 400);
}
return response;
},
};
);
});

const response = await handler(c, next);
if (!response) {
connection.closeConnections();
return c.json({ error: "upgrade_failed" }, 400);
}
return response;
};
23 changes: 23 additions & 0 deletions crates/owhisper-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,23 @@ pub struct ListenClientBuilder<A: RealtimeSttAdapter = DeepgramAdapter> {
api_base: Option<String>,
api_key: Option<String>,
params: Option<owhisper_interface::ListenParams>,
trace_headers: Option<TraceHeaders>,
_marker: PhantomData<A>,
}

#[derive(Clone, Default)]
pub struct TraceHeaders {
pub sentry_trace: Option<String>,
pub baggage: Option<String>,
}

impl Default for ListenClientBuilder {
fn default() -> Self {
Self {
api_base: None,
api_key: None,
params: None,
trace_headers: None,
_marker: PhantomData,
}
}
Expand All @@ -57,11 +65,17 @@ impl<A: RealtimeSttAdapter> ListenClientBuilder<A> {
self
}

pub fn trace_headers(mut self, headers: TraceHeaders) -> Self {
self.trace_headers = Some(headers);
self
}

pub fn adapter<B: RealtimeSttAdapter>(self) -> ListenClientBuilder<B> {
ListenClientBuilder {
api_base: self.api_base,
api_key: self.api_key,
params: self.params,
trace_headers: self.trace_headers,
_marker: PhantomData,
}
}
Expand Down Expand Up @@ -100,6 +114,15 @@ impl<A: RealtimeSttAdapter> ListenClientBuilder<A> {
request = request.with_header(header_name, header_value);
}

if let Some(ref trace_headers) = self.trace_headers {
if let Some(ref sentry_trace) = trace_headers.sentry_trace {
request = request.with_header("sentry-trace", sentry_trace.clone());
}
if let Some(ref baggage) = trace_headers.baggage {
request = request.with_header("baggage", baggage.clone());
}
}

request
}

Expand Down
2 changes: 2 additions & 0 deletions plugins/listener/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,5 @@ tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
tracing = { workspace = true }

sentry = { workspace = true }
Loading
Loading