From 000d45c4c403fb2c51c7720fd99d0e04f13774e1 Mon Sep 17 00:00:00 2001 From: Gerry Date: Mon, 16 Dec 2024 14:23:23 -0800 Subject: [PATCH 1/4] Add threading.Thread Class Override --- .../clients/common/v1/abstract_sync_websocket.py | 10 +++++++--- deepgram/clients/listen/v1/websocket/client.py | 12 ++++++++---- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/deepgram/clients/common/v1/abstract_sync_websocket.py b/deepgram/clients/common/v1/abstract_sync_websocket.py index 75c6ab71..52645f9f 100644 --- a/deepgram/clients/common/v1/abstract_sync_websocket.py +++ b/deepgram/clients/common/v1/abstract_sync_websocket.py @@ -4,7 +4,7 @@ import json import time import logging -from typing import Dict, Union, Optional, cast, Any, Callable +from typing import Dict, Union, Optional, cast, Any, Callable, Type from datetime import datetime import threading from abc import ABC, abstractmethod @@ -52,12 +52,14 @@ class AbstractSyncWebSocketClient(ABC): # pylint: disable=too-many-instance-att _listen_thread: Union[threading.Thread, None] _delegate: Optional[Speaker] = None + _thread_cls: Type[threading.Thread] + _kwargs: Optional[Dict] = None _addons: Optional[Dict] = None _options: Optional[Dict] = None _headers: Optional[Dict] = None - def __init__(self, config: DeepgramClientOptions, endpoint: str = ""): + def __init__(self, config: DeepgramClientOptions, endpoint: str = "", thread_cls: Type[threading.Thread] = threading.Thread) -> None: if config is None: raise DeepgramError("Config is required") if endpoint == "": @@ -73,6 +75,8 @@ def __init__(self, config: DeepgramClientOptions, endpoint: str = ""): self._listen_thread = None + self._thread_cls = thread_cls + # exit self._exit_event = threading.Event() @@ -152,7 +156,7 @@ def start( self._delegate.set_push_callback(self._process_message) else: self._logger.notice("create _listening thread") - self._listen_thread = threading.Thread(target=self._listening) + self._listen_thread = self._thread_cls(target=self._listening) self._listen_thread.start() # debug the threads diff --git a/deepgram/clients/listen/v1/websocket/client.py b/deepgram/clients/listen/v1/websocket/client.py index a8743007..616886b9 100644 --- a/deepgram/clients/listen/v1/websocket/client.py +++ b/deepgram/clients/listen/v1/websocket/client.py @@ -4,7 +4,7 @@ import json import time import logging -from typing import Dict, Union, Optional, cast, Any, Callable +from typing import Dict, Union, Optional, cast, Any, Callable, Type from datetime import datetime import threading @@ -55,12 +55,14 @@ class ListenWebSocketClient( _flush_thread: Union[threading.Thread, None] _last_datagram: Optional[datetime] = None + _thread_cls: Type[threading.Thread] + _kwargs: Optional[Dict] = None _addons: Optional[Dict] = None _options: Optional[Dict] = None _headers: Optional[Dict] = None - def __init__(self, config: DeepgramClientOptions): + def __init__(self, config: DeepgramClientOptions, thread_cls: Type[threading.Thread] = threading.Thread): if config is None: raise DeepgramError("Config is required") @@ -78,6 +80,8 @@ def __init__(self, config: DeepgramClientOptions): self._last_datagram = None self._lock_flush = threading.Lock() + self._thread_cls = thread_cls + # init handlers self._event_handlers = { event: [] for event in LiveTranscriptionEvents.__members__.values() @@ -154,7 +158,7 @@ def start( # keepalive thread if self._config.is_keep_alive_enabled(): self._logger.notice("keepalive is enabled") - self._keep_alive_thread = threading.Thread(target=self._keep_alive) + self._keep_alive_thread = self._thread_cls(target=self._keep_alive) self._keep_alive_thread.start() else: self._logger.notice("keepalive is disabled") @@ -162,7 +166,7 @@ def start( # flush thread if self._config.is_auto_flush_reply_enabled(): self._logger.notice("autoflush is enabled") - self._flush_thread = threading.Thread(target=self._flush) + self._flush_thread = self._thread_cls(target=self._flush) self._flush_thread.start() else: self._logger.notice("autoflush is disabled") From 4c0a60135b040d3b5f855f0a44123a47e9218aa4 Mon Sep 17 00:00:00 2001 From: Gerry Date: Mon, 16 Dec 2024 14:28:53 -0800 Subject: [PATCH 2/4] lint --- deepgram/clients/common/v1/abstract_sync_websocket.py | 7 ++++++- deepgram/clients/listen/v1/websocket/client.py | 6 +++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/deepgram/clients/common/v1/abstract_sync_websocket.py b/deepgram/clients/common/v1/abstract_sync_websocket.py index 52645f9f..1a587a84 100644 --- a/deepgram/clients/common/v1/abstract_sync_websocket.py +++ b/deepgram/clients/common/v1/abstract_sync_websocket.py @@ -59,7 +59,12 @@ class AbstractSyncWebSocketClient(ABC): # pylint: disable=too-many-instance-att _options: Optional[Dict] = None _headers: Optional[Dict] = None - def __init__(self, config: DeepgramClientOptions, endpoint: str = "", thread_cls: Type[threading.Thread] = threading.Thread) -> None: + def __init__( + self, + config: DeepgramClientOptions, + endpoint: str = "", + thread_cls: Type[threading.Thread] = threading.Thread, + ): if config is None: raise DeepgramError("Config is required") if endpoint == "": diff --git a/deepgram/clients/listen/v1/websocket/client.py b/deepgram/clients/listen/v1/websocket/client.py index 616886b9..28ce396f 100644 --- a/deepgram/clients/listen/v1/websocket/client.py +++ b/deepgram/clients/listen/v1/websocket/client.py @@ -62,7 +62,11 @@ class ListenWebSocketClient( _options: Optional[Dict] = None _headers: Optional[Dict] = None - def __init__(self, config: DeepgramClientOptions, thread_cls: Type[threading.Thread] = threading.Thread): + def __init__( + self, + config: DeepgramClientOptions, + thread_cls: Type[threading.Thread] = threading.Thread, + ): if config is None: raise DeepgramError("Config is required") From e84be76608d52c45e58097337f2ae959e2964908 Mon Sep 17 00:00:00 2001 From: Gerry Date: Tue, 17 Dec 2024 09:53:02 -0800 Subject: [PATCH 3/4] pass in --- deepgram/clients/listen/v1/websocket/client.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/deepgram/clients/listen/v1/websocket/client.py b/deepgram/clients/listen/v1/websocket/client.py index 28ce396f..28549d9d 100644 --- a/deepgram/clients/listen/v1/websocket/client.py +++ b/deepgram/clients/listen/v1/websocket/client.py @@ -92,7 +92,11 @@ def __init__( } # call the parent constructor - super().__init__(self._config, self._endpoint) + super().__init__( + config=self._config, + endpoint=self._endpoint, + thread_cls=self._thread_cls, + ) # pylint: disable=too-many-statements,too-many-branches def start( From e4b2ab72921f1ca159cfdd350c110ad248b6cf07 Mon Sep 17 00:00:00 2001 From: Gerry Date: Mon, 23 Dec 2024 17:47:57 -0800 Subject: [PATCH 4/4] address task rabbit comments --- deepgram/clients/common/v1/abstract_sync_websocket.py | 6 ++++++ deepgram/clients/listen/v1/websocket/client.py | 8 +++++--- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/deepgram/clients/common/v1/abstract_sync_websocket.py b/deepgram/clients/common/v1/abstract_sync_websocket.py index 1a587a84..6dc44375 100644 --- a/deepgram/clients/common/v1/abstract_sync_websocket.py +++ b/deepgram/clients/common/v1/abstract_sync_websocket.py @@ -38,6 +38,12 @@ class AbstractSyncWebSocketClient(ABC): # pylint: disable=too-many-instance-att This class provides methods to establish a WebSocket connection generically for use in all WebSocket clients. + + Args: + config (DeepgramClientOptions): all the options for the client + endpoint (str): the endpoint to connect to + thread_cls (Type[threading.Thread]): optional thread class to use for creating threads, + defaults to threading.Thread. Useful for custom thread management like ContextVar support. """ _logger: verboselogs.VerboseLogger diff --git a/deepgram/clients/listen/v1/websocket/client.py b/deepgram/clients/listen/v1/websocket/client.py index 28549d9d..e6633689 100644 --- a/deepgram/clients/listen/v1/websocket/client.py +++ b/deepgram/clients/listen/v1/websocket/client.py @@ -38,10 +38,12 @@ class ListenWebSocketClient( """ Client for interacting with Deepgram's live transcription services over WebSockets. - This class provides methods to establish a WebSocket connection for live transcription and handle real-time transcription events. + This class provides methods to establish a WebSocket connection for live transcription and handle real-time transcription events. - Args: - config (DeepgramClientOptions): all the options for the client. + Args: + config (DeepgramClientOptions): all the options for the client. + thread_cls (Type[threading.Thread]): optional thread class to use for creating threads, + defaults to threading.Thread. Useful for custom thread management like ContextVar support. """ _logger: verboselogs.VerboseLogger