diff --git a/examples/agent_dispatch.py b/examples/agent_dispatch.py index 9e754b44..c53c4ae2 100644 --- a/examples/agent_dispatch.py +++ b/examples/agent_dispatch.py @@ -43,11 +43,7 @@ async def create_token_with_agent_dispatch() -> str: .with_grants(api.VideoGrants(room_join=True, room=room_name)) .with_room_config( api.RoomConfiguration( - agents=[ - api.RoomAgentDispatch( - agent_name="test-agent", metadata="my_metadata" - ) - ], + agents=[api.RoomAgentDispatch(agent_name="test-agent", metadata="my_metadata")], ), ) .to_jwt() diff --git a/examples/basic_room.py b/examples/basic_room.py index bf5a8dde..ceff05fd 100644 --- a/examples/basic_room.py +++ b/examples/basic_room.py @@ -12,15 +12,11 @@ async def main(room: rtc.Room) -> None: @room.on("participant_connected") def on_participant_connected(participant: rtc.RemoteParticipant) -> None: - logging.info( - "participant connected: %s %s", participant.sid, participant.identity - ) + logging.info("participant connected: %s %s", participant.sid, participant.identity) @room.on("participant_disconnected") def on_participant_disconnected(participant: rtc.RemoteParticipant): - logging.info( - "participant disconnected: %s %s", participant.sid, participant.identity - ) + logging.info("participant disconnected: %s %s", participant.sid, participant.identity) @room.on("local_track_published") def on_local_track_published( @@ -78,9 +74,7 @@ def on_track_unsubscribed( logging.info("track unsubscribed: %s", publication.sid) @room.on("track_muted") - def on_track_muted( - publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant - ): + def on_track_muted(publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant): logging.info("track muted: %s", publication.sid) @room.on("track_unmuted") @@ -94,9 +88,7 @@ def on_data_received(data: rtc.DataPacket): logging.info("received data from %s: %s", data.participant.identity, data.data) @room.on("connection_quality_changed") - def on_connection_quality_changed( - participant: rtc.Participant, quality: rtc.ConnectionQuality - ): + def on_connection_quality_changed(participant: rtc.Participant, quality: rtc.ConnectionQuality): logging.info("connection quality changed for %s", participant.identity) @room.on("track_subscription_failed") diff --git a/examples/data-streams/data_streams.py b/examples/data-streams/data_streams.py index 997055bd..c35152c2 100644 --- a/examples/data-streams/data_streams.py +++ b/examples/data-streams/data_streams.py @@ -27,20 +27,12 @@ async def greetParticipant(identity: str): topic="files", ) - async def on_chat_message_received( - reader: rtc.TextStreamReader, participant_identity: str - ): + async def on_chat_message_received(reader: rtc.TextStreamReader, participant_identity: str): full_text = await reader.read_all() - logger.info( - "Received chat message from %s: '%s'", participant_identity, full_text - ) + logger.info("Received chat message from %s: '%s'", participant_identity, full_text) - async def on_welcome_image_received( - reader: rtc.ByteStreamReader, participant_identity: str - ): - logger.info( - "Received image from %s: '%s'", participant_identity, reader.info["name"] - ) + async def on_welcome_image_received(reader: rtc.ByteStreamReader, participant_identity: str): + logger.info("Received image from %s: '%s'", participant_identity, reader.info["name"]) with open(reader.info["name"], mode="wb") as f: async for chunk in reader: f.write(chunk) @@ -49,9 +41,7 @@ async def on_welcome_image_received( @room.on("participant_connected") def on_participant_connected(participant: rtc.RemoteParticipant): - logger.info( - "participant connected: %s %s", participant.sid, participant.identity - ) + logger.info("participant connected: %s %s", participant.sid, participant.identity) asyncio.create_task(greetParticipant(participant.identity)) room.set_text_stream_handler( diff --git a/examples/e2ee.py b/examples/e2ee.py index fb1f79f4..28212059 100644 --- a/examples/e2ee.py +++ b/examples/e2ee.py @@ -97,9 +97,7 @@ async def draw_cube(source: rtc.VideoSource): async def main(room: rtc.Room): @room.on("e2ee_state_changed") - def on_e2ee_state_changed( - participant: rtc.Participant, state: rtc.EncryptionState - ) -> None: + def on_e2ee_state_changed(participant: rtc.Participant, state: rtc.EncryptionState) -> None: logging.info("e2ee state changed: %s %s", participant.identity, state) logging.info("connecting to %s", URL) diff --git a/examples/face_landmark/face_landmark.py b/examples/face_landmark/face_landmark.py index edc20f36..99fe04e3 100644 --- a/examples/face_landmark/face_landmark.py +++ b/examples/face_landmark/face_landmark.py @@ -72,9 +72,7 @@ def draw_landmarks_on_image(rgb_image, detection_result): face_landmarks_proto = landmark_pb2.NormalizedLandmarkList() face_landmarks_proto.landmark.extend( [ - landmark_pb2.NormalizedLandmark( - x=landmark.x, y=landmark.y, z=landmark.z - ) + landmark_pb2.NormalizedLandmark(x=landmark.x, y=landmark.y, z=landmark.z) for landmark in face_landmarks ] ) @@ -113,9 +111,7 @@ async def frame_loop(video_stream: rtc.VideoStream) -> None: arr = arr.reshape((buffer.height, buffer.width, 3)) mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=arr) - detection_result = landmarker.detect_for_video( - mp_image, frame_event.timestamp_us - ) + detection_result = landmarker.detect_for_video(mp_image, frame_event.timestamp_us) draw_landmarks_on_image(arr, detection_result) diff --git a/examples/room_example.py b/examples/room_example.py index 336a762d..6efb0361 100644 --- a/examples/room_example.py +++ b/examples/room_example.py @@ -15,9 +15,7 @@ async def main(): @room.on("participant_connected") def on_participant_connected(participant: rtc.RemoteParticipant): - logger.info( - "participant connected: %s %s", participant.sid, participant.identity - ) + logger.info("participant connected: %s %s", participant.sid, participant.identity) async def receive_frames(stream: rtc.VideoStream): async for frame in stream: diff --git a/examples/rpc.py b/examples/rpc.py index ac1ce337..005701fa 100644 --- a/examples/rpc.py +++ b/examples/rpc.py @@ -10,9 +10,7 @@ LIVEKIT_API_SECRET = os.getenv("LIVEKIT_API_SECRET") LIVEKIT_URL = os.getenv("LIVEKIT_URL") if not LIVEKIT_API_KEY or not LIVEKIT_API_SECRET or not LIVEKIT_URL: - raise ValueError( - "Missing required environment variables. Please check your .env.local file." - ) + raise ValueError("Missing required environment variables. Please check your .env.local file.") async def main(): @@ -82,9 +80,7 @@ async def main(): finally: # Clean up all rooms print("Disconnecting all participants...") - await asyncio.gather( - *(room.disconnect() for room in rooms), return_exceptions=True - ) + await asyncio.gather(*(room.disconnect() for room in rooms), return_exceptions=True) print("Cleanup complete") @@ -121,9 +117,7 @@ async def divide_method( json_data = json.loads(data.payload) dividend = json_data["dividend"] divisor = json_data["divisor"] - print( - f"[Math Genius] {data.caller_identity} wants to divide {dividend} by {divisor}." - ) + print(f"[Math Genius] {data.caller_identity} wants to divide {dividend} by {divisor}.") result = dividend / divisor return json.dumps({"result": result}) @@ -132,9 +126,7 @@ async def divide_method( async def long_calculation_method( data: RpcInvocationData, ): - print( - f"[Math Genius] Starting a very long calculation for {data.caller_identity}" - ) + print(f"[Math Genius] Starting a very long calculation for {data.caller_identity}") print( f"[Math Genius] This will take 30 seconds even though you're only giving me {data.response_timeout} seconds" ) @@ -202,9 +194,7 @@ async def perform_divide(room: rtc.Room): print(f"[Caller] The result is {parsed_response['result']}") except rtc.RpcError as error: if error.code == rtc.RpcError.ErrorCode.APPLICATION_ERROR: - print( - "[Caller] Aww something went wrong with that one, lets try something else." - ) + print("[Caller] Aww something went wrong with that one, lets try something else.") else: print(f"[Caller] RPC call failed with unexpected RpcError: {error}") except Exception as error: diff --git a/examples/video-stream/audio_wave.py b/examples/video-stream/audio_wave.py index 9fbf1e76..8c600b9a 100644 --- a/examples/video-stream/audio_wave.py +++ b/examples/video-stream/audio_wave.py @@ -104,9 +104,7 @@ def draw_timestamp(self, canvas: np.ndarray, fps: float): y = int((height - text_height) * 0.4 + baseline) cv2.putText(canvas, text, (x, y), font_face, font_scale, (0, 0, 0), thickness) - def draw_current_wave( - self, canvas: np.ndarray, audio_samples: np.ndarray - ) -> np.ndarray: + def draw_current_wave(self, canvas: np.ndarray, audio_samples: np.ndarray) -> np.ndarray: """Draw the current waveform and return the current values""" height, width = canvas.shape[:2] center_y = height // 2 + 100 @@ -136,9 +134,7 @@ def draw_volume_history(self, canvas: np.ndarray, current_volume: float): center_y = height // 2 self.volume_history.append(current_volume) - cv2.line( - canvas, (0, center_y - 250), (width, center_y - 250), (200, 200, 200), 1 - ) + cv2.line(canvas, (0, center_y - 250), (width, center_y - 250), (200, 200, 200), 1) volume_x = np.linspace(0, width, len(self.volume_history), dtype=int) volume_y = center_y - 250 + (np.array(self.volume_history) * 200) @@ -158,9 +154,7 @@ async def video_generator( input_audio: asyncio.Queue[Union[rtc.AudioFrame, _AudioEndSentinel]], av_sync: rtc.AVSynchronizer, # only used for drawing the actual fps on the video ) -> AsyncIterable[tuple[rtc.VideoFrame, Optional[rtc.AudioFrame]]]: - canvas = np.zeros( - (media_info.video_height, media_info.video_width, 4), dtype=np.uint8 - ) + canvas = np.zeros((media_info.video_height, media_info.video_width, 4), dtype=np.uint8) canvas.fill(255) def _np_to_video_frame(image: np.ndarray) -> rtc.VideoFrame: diff --git a/examples/video-stream/video_play.py b/examples/video-stream/video_play.py index 72b9b0bc..f40ed4ec 100644 --- a/examples/video-stream/video_play.py +++ b/examples/video-stream/video_play.py @@ -14,9 +14,7 @@ try: import av except ImportError: - raise RuntimeError( - "av is required to run this example, install with `pip install av`" - ) + raise RuntimeError("av is required to run this example, install with `pip install av`") # ensure LIVEKIT_URL, LIVEKIT_API_KEY, and LIVEKIT_API_SECRET are set diff --git a/livekit-api/livekit/api/_service.py b/livekit-api/livekit/api/_service.py index 2a1e0553..7411b4ac 100644 --- a/livekit-api/livekit/api/_service.py +++ b/livekit-api/livekit/api/_service.py @@ -10,9 +10,7 @@ class Service(ABC): - def __init__( - self, session: aiohttp.ClientSession, host: str, api_key: str, api_secret: str - ): + def __init__(self, session: aiohttp.ClientSession, host: str, api_key: str, api_secret: str): self._client = TwirpClient(session, host, "livekit") self.api_key = api_key self.api_secret = api_secret diff --git a/livekit-api/livekit/api/access_token.py b/livekit-api/livekit/api/access_token.py index dd8bd277..0a86ed3b 100644 --- a/livekit-api/livekit/api/access_token.py +++ b/livekit-api/livekit/api/access_token.py @@ -91,9 +91,7 @@ def asdict(self) -> dict: claims = dataclasses.asdict( self, dict_factory=lambda items: { - snake_to_lower_camel(k): v - for k, v in items - if v is not None and v != "" + snake_to_lower_camel(k): v for k, v in items if v is not None and v != "" }, ) if self.room_config: @@ -178,13 +176,9 @@ def to_jwt(self) -> str: { "sub": self.identity, "iss": self.api_key, - "nbf": calendar.timegm( - datetime.datetime.now(datetime.timezone.utc).utctimetuple() - ), + "nbf": calendar.timegm(datetime.datetime.now(datetime.timezone.utc).utctimetuple()), "exp": calendar.timegm( - ( - datetime.datetime.now(datetime.timezone.utc) + self.ttl - ).utctimetuple() + (datetime.datetime.now(datetime.timezone.utc) + self.ttl).utctimetuple() ), } ) @@ -220,16 +214,12 @@ def verify(self, token: str) -> Claims: video_dict = claims.get("video", dict()) video_dict = {camel_to_snake(k): v for k, v in video_dict.items()} - video_dict = { - k: v for k, v in video_dict.items() if k in VideoGrants.__dataclass_fields__ - } + video_dict = {k: v for k, v in video_dict.items() if k in VideoGrants.__dataclass_fields__} video = VideoGrants(**video_dict) sip_dict = claims.get("sip", dict()) sip_dict = {camel_to_snake(k): v for k, v in sip_dict.items()} - sip_dict = { - k: v for k, v in sip_dict.items() if k in SIPGrants.__dataclass_fields__ - } + sip_dict = {k: v for k, v in sip_dict.items() if k in SIPGrants.__dataclass_fields__} sip = SIPGrants(**sip_dict) grant_claims = Claims( @@ -259,6 +249,4 @@ def camel_to_snake(t: str): def snake_to_lower_camel(t: str): - return "".join( - word.capitalize() if i else word for i, word in enumerate(t.split("_")) - ) + return "".join(word.capitalize() if i else word for i, word in enumerate(t.split("_"))) diff --git a/livekit-api/livekit/api/agent_dispatch_service.py b/livekit-api/livekit/api/agent_dispatch_service.py index 5cdfc1e7..0c8f1d7a 100644 --- a/livekit-api/livekit/api/agent_dispatch_service.py +++ b/livekit-api/livekit/api/agent_dispatch_service.py @@ -26,9 +26,7 @@ class AgentDispatchService(Service): ``` """ - def __init__( - self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str - ): + def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str): super().__init__(session, url, api_key, api_secret) async def create_dispatch(self, req: CreateAgentDispatchRequest) -> AgentDispatch: @@ -89,9 +87,7 @@ async def list_dispatch(self, room_name: str) -> list[AgentDispatch]: ) return list(res.agent_dispatches) - async def get_dispatch( - self, dispatch_id: str, room_name: str - ) -> Optional[AgentDispatch]: + async def get_dispatch(self, dispatch_id: str, room_name: str) -> Optional[AgentDispatch]: """Get an Agent dispatch by ID Args: diff --git a/livekit-api/livekit/api/egress_service.py b/livekit-api/livekit/api/egress_service.py index b5d4984e..862bd1a6 100644 --- a/livekit-api/livekit/api/egress_service.py +++ b/livekit-api/livekit/api/egress_service.py @@ -33,14 +33,10 @@ class EgressService(Service): Also see https://docs.livekit.io/home/egress/overview/ """ - def __init__( - self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str - ): + def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str): super().__init__(session, url, api_key, api_secret) - async def start_room_composite_egress( - self, start: RoomCompositeEgressRequest - ) -> EgressInfo: + async def start_room_composite_egress(self, start: RoomCompositeEgressRequest) -> EgressInfo: return await self._client.request( SVC, "StartRoomCompositeEgress", @@ -58,9 +54,7 @@ async def start_web_egress(self, start: WebEgressRequest) -> EgressInfo: EgressInfo, ) - async def start_participant_egress( - self, start: ParticipantEgressRequest - ) -> EgressInfo: + async def start_participant_egress(self, start: ParticipantEgressRequest) -> EgressInfo: return await self._client.request( SVC, "StartParticipantEgress", @@ -69,9 +63,7 @@ async def start_participant_egress( EgressInfo, ) - async def start_track_composite_egress( - self, start: TrackCompositeEgressRequest - ) -> EgressInfo: + async def start_track_composite_egress(self, start: TrackCompositeEgressRequest) -> EgressInfo: return await self._client.request( SVC, "StartTrackCompositeEgress", diff --git a/livekit-api/livekit/api/ingress_service.py b/livekit-api/livekit/api/ingress_service.py index 7b658bb8..296dabbe 100644 --- a/livekit-api/livekit/api/ingress_service.py +++ b/livekit-api/livekit/api/ingress_service.py @@ -28,9 +28,7 @@ class IngressService(Service): Also see https://docs.livekit.io/home/ingress/overview/ """ - def __init__( - self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str - ): + def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str): super().__init__(session, url, api_key, api_secret) async def create_ingress(self, create: CreateIngressRequest) -> IngressInfo: diff --git a/livekit-api/livekit/api/livekit_api.py b/livekit-api/livekit/api/livekit_api.py index 63efda02..82a93e95 100644 --- a/livekit-api/livekit/api/livekit_api.py +++ b/livekit-api/livekit/api/livekit_api.py @@ -53,9 +53,7 @@ def __init__( self._ingress = IngressService(self._session, url, api_key, api_secret) self._egress = EgressService(self._session, url, api_key, api_secret) self._sip = SipService(self._session, url, api_key, api_secret) - self._agent_dispatch = AgentDispatchService( - self._session, url, api_key, api_secret - ) + self._agent_dispatch = AgentDispatchService(self._session, url, api_key, api_secret) @property def agent_dispatch(self) -> AgentDispatchService: diff --git a/livekit-api/livekit/api/room_service.py b/livekit-api/livekit/api/room_service.py index e6d1a39a..023d041d 100644 --- a/livekit-api/livekit/api/room_service.py +++ b/livekit-api/livekit/api/room_service.py @@ -41,9 +41,7 @@ class RoomService(Service): Also see https://docs.livekit.io/home/server/managing-rooms/ and https://docs.livekit.io/home/server/managing-participants/ """ - def __init__( - self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str - ): + def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str): super().__init__(session, url, api_key, api_secret) async def create_room( @@ -130,9 +128,7 @@ async def update_room_metadata(self, update: UpdateRoomMetadataRequest) -> Room: Room, ) - async def list_participants( - self, list: ListParticipantsRequest - ) -> ListParticipantsResponse: + async def list_participants(self, list: ListParticipantsRequest) -> ListParticipantsResponse: """Lists all participants in a room. Args: @@ -226,9 +222,7 @@ async def mute_published_track( MuteRoomTrackResponse, ) - async def update_participant( - self, update: UpdateParticipantRequest - ) -> ParticipantInfo: + async def update_participant(self, update: UpdateParticipantRequest) -> ParticipantInfo: """Updates a participant's metadata or permissions. Args: diff --git a/livekit-api/livekit/api/sip_service.py b/livekit-api/livekit/api/sip_service.py index e404e0f6..4cdc6453 100644 --- a/livekit-api/livekit/api/sip_service.py +++ b/livekit-api/livekit/api/sip_service.py @@ -41,9 +41,7 @@ class SipService(Service): ``` """ - def __init__( - self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str - ): + def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str): super().__init__(session, url, api_key, api_secret) async def create_sip_trunk(self, create: CreateSIPTrunkRequest) -> SIPTrunkInfo: diff --git a/livekit-api/livekit/api/twirp_client.py b/livekit-api/livekit/api/twirp_client.py index 1c930270..decc952f 100644 --- a/livekit-api/livekit/api/twirp_client.py +++ b/livekit-api/livekit/api/twirp_client.py @@ -90,9 +90,7 @@ async def request( headers["Content-Type"] = "application/protobuf" serialized_data = data.SerializeToString() - async with self._session.post( - url, headers=headers, data=serialized_data - ) as resp: + async with self._session.post(url, headers=headers, data=serialized_data) as resp: if resp.status == 200: return response_class.FromString(await resp.read()) else: diff --git a/livekit-api/tests/test_access_token.py b/livekit-api/tests/test_access_token.py index 245e2eaa..48835ea1 100644 --- a/livekit-api/tests/test_access_token.py +++ b/livekit-api/tests/test_access_token.py @@ -75,11 +75,7 @@ def test_agent_config(): def test_verify_token_invalid(): - token = ( - AccessToken(TEST_API_KEY, TEST_API_SECRET) - .with_identity("test_identity") - .to_jwt() - ) + token = AccessToken(TEST_API_KEY, TEST_API_SECRET).with_identity("test_identity").to_jwt() token_verifier = TokenVerifier(TEST_API_KEY, "invalid_secret") with pytest.raises(Exception): diff --git a/livekit-rtc/livekit/rtc/audio_filter.py b/livekit-rtc/livekit/rtc/audio_filter.py index 8de4de03..a6ec7296 100644 --- a/livekit-rtc/livekit/rtc/audio_filter.py +++ b/livekit-rtc/livekit/rtc/audio_filter.py @@ -5,9 +5,7 @@ class AudioFilter: - def __init__( - self, module_id: str, path: str, dependencies: Optional[List[str]] = None - ) -> None: + def __init__(self, module_id: str, path: str, dependencies: Optional[List[str]] = None) -> None: self._path = path req = proto_ffi.FfiRequest() diff --git a/livekit-rtc/livekit/rtc/audio_frame.py b/livekit-rtc/livekit/rtc/audio_frame.py index 4c3d6ede..4c364efc 100644 --- a/livekit-rtc/livekit/rtc/audio_frame.py +++ b/livekit-rtc/livekit/rtc/audio_frame.py @@ -50,9 +50,7 @@ def __init__( """ data = memoryview(data).cast("B") - if len(data) < num_channels * samples_per_channel * ctypes.sizeof( - ctypes.c_int16 - ): + if len(data) < num_channels * samples_per_channel * ctypes.sizeof(ctypes.c_int16): raise ValueError( "data length must be >= num_channels * samples_per_channel * sizeof(int16)" ) @@ -69,9 +67,7 @@ def __init__( self._samples_per_channel = samples_per_channel @staticmethod - def create( - sample_rate: int, num_channels: int, samples_per_channel: int - ) -> "AudioFrame": + def create(sample_rate: int, num_channels: int, samples_per_channel: int) -> "AudioFrame": """ Create a new empty AudioFrame instance with specified sample rate, number of channels, and samples per channel. @@ -95,9 +91,7 @@ def _from_owned_info(owned_info: proto_audio.OwnedAudioFrameBuffer) -> "AudioFra cdata = (ctypes.c_int16 * size).from_address(info.data_ptr) data = bytearray(cdata) FfiHandle(owned_info.handle.id) - return AudioFrame( - data, info.sample_rate, info.num_channels, info.samples_per_channel - ) + return AudioFrame(data, info.sample_rate, info.num_channels, info.samples_per_channel) def remix_and_resample(self, sample_rate: int, num_channels: int) -> "AudioFrame": """Resample the audio frame to the given sample rate and number of channels. @@ -234,12 +228,8 @@ def validate_audio_frame(value: Any) -> "AudioFrame": core_schema.model_fields_schema( { "data": core_schema.model_field(core_schema.str_schema()), - "sample_rate": core_schema.model_field( - core_schema.int_schema() - ), - "num_channels": core_schema.model_field( - core_schema.int_schema() - ), + "sample_rate": core_schema.model_field(core_schema.int_schema()), + "num_channels": core_schema.model_field(core_schema.int_schema()), "samples_per_channel": core_schema.model_field( core_schema.int_schema() ), @@ -248,9 +238,7 @@ def validate_audio_frame(value: Any) -> "AudioFrame": core_schema.no_info_plain_validator_function(validate_audio_frame), ] ), - python_schema=core_schema.no_info_plain_validator_function( - validate_audio_frame - ), + python_schema=core_schema.no_info_plain_validator_function(validate_audio_frame), serialization=core_schema.plain_serializer_function_ser_schema( lambda instance: { "data": base64.b64encode(instance.data).decode("utf-8"), diff --git a/livekit-rtc/livekit/rtc/audio_resampler.py b/livekit-rtc/livekit/rtc/audio_resampler.py index c821d3b8..a8559b87 100644 --- a/livekit-rtc/livekit/rtc/audio_resampler.py +++ b/livekit-rtc/livekit/rtc/audio_resampler.py @@ -117,8 +117,7 @@ def push(self, data: bytearray | AudioFrame) -> list[AudioFrame]: output_data, self._output_rate, self._num_channels, - len(output_data) - // (self._num_channels * ctypes.sizeof(ctypes.c_int16)), + len(output_data) // (self._num_channels * ctypes.sizeof(ctypes.c_int16)), ) ] @@ -153,8 +152,7 @@ def flush(self) -> list[AudioFrame]: output_data, self._output_rate, self._num_channels, - len(output_data) - // (self._num_channels * ctypes.sizeof(ctypes.c_int16)), + len(output_data) // (self._num_channels * ctypes.sizeof(ctypes.c_int16)), ) ] diff --git a/livekit-rtc/livekit/rtc/audio_source.py b/livekit-rtc/livekit/rtc/audio_source.py index 839203d8..63cc1a5d 100644 --- a/livekit-rtc/livekit/rtc/audio_source.py +++ b/livekit-rtc/livekit/rtc/audio_source.py @@ -56,9 +56,7 @@ def __init__( self._loop = loop or asyncio.get_event_loop() req = proto_ffi.FfiRequest() - req.new_audio_source.type = ( - proto_audio_frame.AudioSourceType.AUDIO_SOURCE_NATIVE - ) + req.new_audio_source.type = proto_audio_frame.AudioSourceType.AUDIO_SOURCE_NATIVE req.new_audio_source.sample_rate = sample_rate req.new_audio_source.num_channels = num_channels req.new_audio_source.queue_size_ms = queue_size_ms @@ -142,8 +140,7 @@ async def capture_frame(self, frame: AudioFrame) -> None: try: resp = FfiClient.instance.request(req) cb: proto_ffi.FfiEvent = await queue.wait_for( - lambda e: e.capture_audio_frame.async_id - == resp.capture_audio_frame.async_id + lambda e: e.capture_audio_frame.async_id == resp.capture_audio_frame.async_id ) finally: FfiClient.instance.queue.unsubscribe(queue) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index 37008664..b06f05d4 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -226,9 +226,7 @@ def _create_owned_stream(self) -> Any: if self._audio_filter_module is not None: new_audio_stream.audio_filter_module_id = self._audio_filter_module if self._audio_filter_options is not None: - new_audio_stream.audio_filter_options = json.dumps( - self._audio_filter_options - ) + new_audio_stream.audio_filter_options = json.dumps(self._audio_filter_options) resp = FfiClient.instance.request(req) return resp.new_audio_stream.stream @@ -237,19 +235,13 @@ def _create_owned_stream_from_participant( ) -> Any: req = proto_ffi.FfiRequest() audio_stream_from_participant = req.audio_stream_from_participant - audio_stream_from_participant.participant_handle = ( - participant._ffi_handle.handle - ) + audio_stream_from_participant.participant_handle = participant._ffi_handle.handle audio_stream_from_participant.sample_rate = self._sample_rate audio_stream_from_participant.num_channels = self._num_channels - audio_stream_from_participant.type = ( - proto_audio_frame.AudioStreamType.AUDIO_STREAM_NATIVE - ) + audio_stream_from_participant.type = proto_audio_frame.AudioStreamType.AUDIO_STREAM_NATIVE audio_stream_from_participant.track_source = track_source if self._audio_filter_module is not None: - audio_stream_from_participant.audio_filter_module_id = ( - self._audio_filter_module - ) + audio_stream_from_participant.audio_filter_module_id = self._audio_filter_module if self._audio_filter_options is not None: audio_stream_from_participant.audio_filter_options = json.dumps( self._audio_filter_options diff --git a/livekit-rtc/livekit/rtc/data_stream.py b/livekit-rtc/livekit/rtc/data_stream.py index 4e8054de..8344f67c 100644 --- a/livekit-rtc/livekit/rtc/data_stream.py +++ b/livekit-rtc/livekit/rtc/data_stream.py @@ -112,9 +112,7 @@ def __init__(self, header: proto_DataStream.Header, capacity: int = 0) -> None: attributes=dict(header.attributes), name=header.byte_header.name, ) - self._queue: asyncio.Queue[proto_DataStream.Chunk | None] = asyncio.Queue( - capacity - ) + self._queue: asyncio.Queue[proto_DataStream.Chunk | None] = asyncio.Queue(capacity) async def _on_chunk_update(self, chunk: proto_DataStream.Chunk): await self._queue.put(chunk) @@ -182,8 +180,7 @@ async def _send_header(self): try: resp = FfiClient.instance.request(req) cb: proto_ffi.FfiEvent = await queue.wait_for( - lambda e: e.send_stream_header.async_id - == resp.send_stream_header.async_id + lambda e: e.send_stream_header.async_id == resp.send_stream_header.async_id ) finally: FfiClient.instance.queue.unsubscribe(queue) @@ -207,8 +204,7 @@ async def _send_chunk(self, chunk: proto_DataStream.Chunk): try: resp = FfiClient.instance.request(req) cb: proto_ffi.FfiEvent = await queue.wait_for( - lambda e: e.send_stream_chunk.async_id - == resp.send_stream_chunk.async_id + lambda e: e.send_stream_chunk.async_id == resp.send_stream_chunk.async_id ) finally: FfiClient.instance.queue.unsubscribe(queue) @@ -229,8 +225,7 @@ async def _send_trailer(self, trailer: proto_DataStream.Trailer): try: resp = FfiClient.instance.request(req) cb: proto_ffi.FfiEvent = await queue.wait_for( - lambda e: e.send_stream_trailer.async_id - == resp.send_stream_trailer.async_id + lambda e: e.send_stream_trailer.async_id == resp.send_stream_trailer.async_id ) finally: FfiClient.instance.queue.unsubscribe(queue) @@ -238,9 +233,7 @@ async def _send_trailer(self, trailer: proto_DataStream.Trailer): if cb.send_stream_chunk.error: raise ConnectionError(cb.send_stream_trailer.error) - async def aclose( - self, *, reason: str = "", attributes: Optional[Dict[str, str]] = None - ): + async def aclose(self, *, reason: str = "", attributes: Optional[Dict[str, str]] = None): if self._closed: raise RuntimeError("Stream already closed") self._closed = True @@ -343,8 +336,7 @@ def __init__( async def write(self, data: bytes): async with self._write_lock: chunked_data = [ - data[i : i + STREAM_CHUNK_SIZE] - for i in range(0, len(data), STREAM_CHUNK_SIZE) + data[i : i + STREAM_CHUNK_SIZE] for i in range(0, len(data), STREAM_CHUNK_SIZE) ] for chunk in chunked_data: diff --git a/livekit-rtc/livekit/rtc/e2ee.py b/livekit-rtc/livekit/rtc/e2ee.py index 58cf7e3b..6fe62258 100644 --- a/livekit-rtc/livekit/rtc/e2ee.py +++ b/livekit-rtc/livekit/rtc/e2ee.py @@ -180,9 +180,7 @@ def ratchet_key(self, participant_identity: str, key_index: int) -> bytes: class FrameCryptor: - def __init__( - self, room_handle: int, participant_identity: str, key_index: int, enabled: bool - ): + def __init__(self, room_handle: int, participant_identity: str, key_index: int, enabled: bool): self._room_handle = room_handle self._enabled = enabled self._participant_identity = participant_identity @@ -244,9 +242,7 @@ def __init__(self, room_handle: int, options: Optional[E2EEOptions]): self._enabled = options is not None if options is not None: - self._key_provider = KeyProvider( - self._room_handle, options.key_provider_options - ) + self._key_provider = KeyProvider(self._room_handle, options.key_provider_options) @property def key_provider(self) -> Optional[KeyProvider]: diff --git a/livekit-rtc/livekit/rtc/participant.py b/livekit-rtc/livekit/rtc/participant.py index e22f6648..5bd1f205 100644 --- a/livekit-rtc/livekit/rtc/participant.py +++ b/livekit-rtc/livekit/rtc/participant.py @@ -139,10 +139,7 @@ def disconnect_reason( - USER_REJECTED - SIP callee rejected the call (busy) - SIP_TRUNK_FAILURE - SIP protocol failure or unexpected response """ - if ( - self._info.disconnect_reason - == proto_participant.DisconnectReason.UNKNOWN_REASON - ): + if self._info.disconnect_reason == proto_participant.DisconnectReason.UNKNOWN_REASON: return None return self._info.disconnect_reason @@ -275,8 +272,7 @@ async def publish_transcription(self, transcription: Transcription) -> None: try: resp = FfiClient.instance.request(req) cb: proto_ffi.FfiEvent = await queue.wait_for( - lambda e: e.publish_transcription.async_id - == resp.publish_transcription.async_id + lambda e: e.publish_transcription.async_id == resp.publish_transcription.async_id ) finally: FfiClient.instance.queue.unsubscribe(queue) @@ -332,9 +328,7 @@ async def perform_rpc( def register_rpc_method( self, method_name: str, - handler: Optional[ - Callable[[RpcInvocationData], Union[Awaitable[str], str]] - ] = None, + handler: Optional[Callable[[RpcInvocationData], Union[Awaitable[str], str]]] = None, ) -> Union[None, Callable]: """ Establishes the participant as a receiver for calls of the specified RPC method. @@ -418,15 +412,9 @@ def set_track_subscription_permissions( participant_permissions = [] req = proto_ffi.FfiRequest() - req.set_track_subscription_permissions.local_participant_handle = ( - self._ffi_handle.handle - ) - req.set_track_subscription_permissions.all_participants_allowed = ( - allow_all_participants - ) - req.set_track_subscription_permissions.permissions.extend( - participant_permissions - ) + req.set_track_subscription_permissions.local_participant_handle = self._ffi_handle.handle + req.set_track_subscription_permissions.all_participants_allowed = allow_all_participants + req.set_track_subscription_permissions.permissions.extend(participant_permissions) FfiClient.instance.request(req) async def _handle_rpc_method_invocation( @@ -441,9 +429,7 @@ async def _handle_rpc_method_invocation( response_error: Optional[RpcError] = None response_payload: Optional[str] = None - params = RpcInvocationData( - request_id, caller_identity, payload, response_timeout - ) + params = RpcInvocationData(request_id, caller_identity, payload, response_timeout) handler = self._rpc_handlers.get(method) @@ -452,9 +438,7 @@ async def _handle_rpc_method_invocation( else: try: if asyncio.iscoroutinefunction(handler): - async_handler = cast( - Callable[[RpcInvocationData], Awaitable[str]], handler - ) + async_handler = cast(Callable[[RpcInvocationData], Awaitable[str]], handler) async def run_handler(): try: @@ -470,9 +454,7 @@ async def run_handler(): except asyncio.TimeoutError: raise RpcError._built_in(RpcError.ErrorCode.RESPONSE_TIMEOUT) except asyncio.CancelledError: - raise RpcError._built_in( - RpcError.ErrorCode.RECIPIENT_DISCONNECTED - ) + raise RpcError._built_in(RpcError.ErrorCode.RECIPIENT_DISCONNECTED) else: sync_handler = cast(Callable[[RpcInvocationData], str], handler) response_payload = sync_handler(params) @@ -484,9 +466,7 @@ async def run_handler(): "Returning APPLICATION_ERROR instead. " f"Original error: {error}" ) - response_error = RpcError._built_in( - RpcError.ErrorCode.APPLICATION_ERROR - ) + response_error = RpcError._built_in(RpcError.ErrorCode.APPLICATION_ERROR) req = proto_ffi.FfiRequest( rpc_method_invocation_response=RpcMethodInvocationResponseRequest( @@ -520,8 +500,7 @@ async def set_metadata(self, metadata: str) -> None: try: resp = FfiClient.instance.request(req) await queue.wait_for( - lambda e: e.set_local_metadata.async_id - == resp.set_local_metadata.async_id + lambda e: e.set_local_metadata.async_id == resp.set_local_metadata.async_id ) finally: FfiClient.instance.queue.unsubscribe(queue) @@ -573,8 +552,7 @@ async def set_attributes(self, attributes: dict[str, str]) -> None: try: resp = FfiClient.instance.request(req) await queue.wait_for( - lambda e: e.set_local_attributes.async_id - == resp.set_local_attributes.async_id + lambda e: e.set_local_attributes.async_id == resp.set_local_attributes.async_id ) finally: FfiClient.instance.queue.unsubscribe(queue) @@ -675,9 +653,7 @@ async def send_file( file_name = os.path.basename(file_path) mime_type, _ = mimetypes.guess_type(file_path) if mime_type is None: - mime_type = ( - "application/octet-stream" # Fallback MIME type for unknown files - ) + mime_type = "application/octet-stream" # Fallback MIME type for unknown files writer: ByteStreamWriter = await self.stream_bytes( name=file_name, diff --git a/livekit-rtc/livekit/rtc/room.py b/livekit-rtc/livekit/rtc/room.py index e7886383..db0f92bf 100644 --- a/livekit-rtc/livekit/rtc/room.py +++ b/livekit-rtc/livekit/rtc/room.py @@ -315,9 +315,7 @@ def on_participant_connected(participant): """ return super().on(event, callback) - async def connect( - self, url: str, token: str, options: RoomOptions = RoomOptions() - ) -> None: + async def connect(self, url: str, token: str, options: RoomOptions = RoomOptions()) -> None: """Connects to a LiveKit room using the specified URL and token. Parameters: @@ -370,9 +368,7 @@ def on_participant_connected(participant): req.connect.options.rtc_config.continual_gathering_policy = ( options.rtc_config.continual_gathering_policy ) # type: ignore - req.connect.options.rtc_config.ice_servers.extend( - options.rtc_config.ice_servers - ) + req.connect.options.rtc_config.ice_servers.extend(options.rtc_config.ice_servers) # subscribe before connecting so we don't miss any events self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop) @@ -447,9 +443,7 @@ async def disconnect(self) -> None: queue = FfiClient.instance.queue.subscribe() try: resp = FfiClient.instance.request(req) - await queue.wait_for( - lambda e: e.disconnect.async_id == resp.disconnect.async_id - ) + await queue.wait_for(lambda e: e.disconnect.async_id == resp.disconnect.async_id) finally: FfiClient.instance.queue.unsubscribe(queue) await self._task @@ -487,10 +481,7 @@ def _on_rpc_method_invocation(self, rpc_invocation: RpcMethodInvocationEvent): if self._local_participant is None: return - if ( - rpc_invocation.local_participant_handle - == self._local_participant._ffi_handle.handle - ): + if rpc_invocation.local_participant_handle == self._local_participant._ffi_handle.handle: task = self._loop.create_task( self._local_participant._handle_rpc_method_invocation( rpc_invocation.invocation_id, @@ -507,16 +498,12 @@ def _on_rpc_method_invocation(self, rpc_invocation: RpcMethodInvocationEvent): def _on_room_event(self, event: proto_room.RoomEvent): which = event.WhichOneof("message") if which == "participant_connected": - rparticipant = self._create_remote_participant( - event.participant_connected.info - ) + rparticipant = self._create_remote_participant(event.participant_connected.info) self.emit("participant_connected", rparticipant) elif which == "participant_disconnected": identity = event.participant_disconnected.participant_identity rparticipant = self._remote_participants.pop(identity) - rparticipant._info.disconnect_reason = ( - event.participant_disconnected.disconnect_reason - ) + rparticipant._info.disconnect_reason = event.participant_disconnected.disconnect_reason self.emit("participant_disconnected", rparticipant) elif which == "local_track_published": sid = event.local_track_published.track_sid @@ -533,16 +520,12 @@ def _on_room_event(self, event: proto_room.RoomEvent): lpublication._first_subscription.set_result(None) self.emit("local_track_subscribed", lpublication.track) elif which == "track_published": - rparticipant = self._remote_participants[ - event.track_published.participant_identity - ] + rparticipant = self._remote_participants[event.track_published.participant_identity] rpublication = RemoteTrackPublication(event.track_published.publication) rparticipant._track_publications[rpublication.sid] = rpublication self.emit("track_published", rpublication, rparticipant) elif which == "track_unpublished": - rparticipant = self._remote_participants[ - event.track_unpublished.participant_identity - ] + rparticipant = self._remote_participants[event.track_unpublished.participant_identity] rpublication = rparticipant._track_publications.pop( event.track_unpublished.publication_sid ) @@ -550,29 +533,21 @@ def _on_room_event(self, event: proto_room.RoomEvent): elif which == "track_subscribed": owned_track_info = event.track_subscribed.track track_info = owned_track_info.info - rparticipant = self._remote_participants[ - event.track_subscribed.participant_identity - ] + rparticipant = self._remote_participants[event.track_subscribed.participant_identity] rpublication = rparticipant.track_publications[track_info.sid] rpublication.subscribed = True if track_info.kind == TrackKind.KIND_VIDEO: remote_video_track = RemoteVideoTrack(owned_track_info) rpublication.track = remote_video_track - self.emit( - "track_subscribed", remote_video_track, rpublication, rparticipant - ) + self.emit("track_subscribed", remote_video_track, rpublication, rparticipant) elif track_info.kind == TrackKind.KIND_AUDIO: remote_audio_track = RemoteAudioTrack(owned_track_info) rpublication.track = remote_audio_track - self.emit( - "track_subscribed", remote_audio_track, rpublication, rparticipant - ) + self.emit("track_subscribed", remote_audio_track, rpublication, rparticipant) elif which == "track_unsubscribed": identity = event.track_unsubscribed.participant_identity rparticipant = self._remote_participants[identity] - rpublication = rparticipant.track_publications[ - event.track_unsubscribed.track_sid - ] + rpublication = rparticipant.track_publications[event.track_unsubscribed.track_sid] track = rpublication.track rpublication.track = None rpublication.subscribed = False @@ -646,9 +621,7 @@ def _on_room_event(self, event: proto_room.RoomEvent): assert isinstance(participant, Participant) old_name = participant.name participant._info.name = event.participant_name_changed.name - self.emit( - "participant_name_changed", participant, old_name, participant.name - ) + self.emit("participant_name_changed", participant, old_name, participant.name) elif which == "participant_attributes_changed": identity = event.participant_attributes_changed.participant_identity attributes = event.participant_attributes_changed.attributes @@ -659,9 +632,7 @@ def _on_room_event(self, event: proto_room.RoomEvent): participant = self._retrieve_participant(identity) assert isinstance(participant, Participant) participant._info.attributes.clear() - participant._info.attributes.update( - (entry.key, entry.value) for entry in attributes - ) + participant._info.attributes.update((entry.key, entry.value) for entry in attributes) self.emit( "participant_attributes_changed", changed_attributes, @@ -737,9 +708,7 @@ def _on_room_event(self, event: proto_room.RoomEvent): identity = event.e2ee_state_changed.participant_identity e2ee_state = event.e2ee_state_changed.state # TODO: pass participant identity - self.emit( - "e2ee_state_changed", self._retrieve_participant(identity), e2ee_state - ) + self.emit("e2ee_state_changed", self._retrieve_participant(identity), e2ee_state) elif which == "connection_state_changed": connection_state = event.connection_state_changed.state self._connection_state = connection_state @@ -758,9 +727,7 @@ def _on_room_event(self, event: proto_room.RoomEvent): event.stream_header_received.participant_identity, ) elif which == "stream_chunk_received": - task = asyncio.create_task( - self._handle_stream_chunk(event.stream_chunk_received.chunk) - ) + task = asyncio.create_task(self._handle_stream_chunk(event.stream_chunk_received.chunk)) self._data_stream_tasks.add(task) task.add_done_callback(self._data_stream_tasks.discard) @@ -835,9 +802,7 @@ async def _drain_data_stream_tasks(self) -> None: task.cancel() await asyncio.gather(*self._data_stream_tasks, return_exceptions=True) - def _retrieve_remote_participant( - self, identity: str - ) -> Optional[RemoteParticipant]: + def _retrieve_remote_participant(self, identity: str) -> Optional[RemoteParticipant]: """Retrieve a remote participant by identity""" return self._remote_participants.get(identity, None) diff --git a/livekit-rtc/livekit/rtc/rpc.py b/livekit-rtc/livekit/rtc/rpc.py index 10e4d6d8..02a5b7f4 100644 --- a/livekit-rtc/livekit/rtc/rpc.py +++ b/livekit-rtc/livekit/rtc/rpc.py @@ -117,8 +117,6 @@ def _to_proto(self) -> proto_rpc.RpcError: return proto_rpc.RpcError(code=self.code, message=self.message, data=self.data) @classmethod - def _built_in( - cls, code: "RpcError.ErrorCode", data: Optional[str] = None - ) -> "RpcError": + def _built_in(cls, code: "RpcError.ErrorCode", data: Optional[str] = None) -> "RpcError": message = cls.ErrorMessage[code] return cls(code, message, data) diff --git a/livekit-rtc/livekit/rtc/synchronizer.py b/livekit-rtc/livekit/rtc/synchronizer.py index dda1ea0c..8c5f6347 100644 --- a/livekit-rtc/livekit/rtc/synchronizer.py +++ b/livekit-rtc/livekit/rtc/synchronizer.py @@ -48,9 +48,7 @@ def __init__( self._last_video_time: float = 0 self._last_audio_time: float = 0 - self._video_queue_max_size = int( - self._video_fps * self._video_queue_size_ms / 1000 - ) + self._video_queue_max_size = int(self._video_fps * self._video_queue_size_ms / 1000) if self._video_queue_size_ms > 0: # ensure queue is bounded if queue size is specified self._video_queue_max_size = max(1, self._video_queue_max_size) @@ -128,9 +126,7 @@ def last_audio_time(self) -> float: class _FPSController: - def __init__( - self, *, expected_fps: float, max_delay_tolerance_ms: float = 300 - ) -> None: + def __init__(self, *, expected_fps: float, max_delay_tolerance_ms: float = 300) -> None: """Controls frame rate by adjusting sleep time based on actual FPS. Usage: @@ -178,16 +174,12 @@ async def wait_next_process(self) -> None: else: # check if significantly behind schedule if -sleep_time > self._max_delay_tolerance_secs: - logger.warning( - f"Frame capture was behind schedule for {-sleep_time * 1000:.2f} ms" - ) + logger.warning(f"Frame capture was behind schedule for {-sleep_time * 1000:.2f} ms") self._next_frame_time = time.perf_counter() def after_process(self) -> None: """Update timing information after processing a frame.""" - assert self._next_frame_time is not None, ( - "wait_next_process must be called first" - ) + assert self._next_frame_time is not None, "wait_next_process must be called first" # update timing information self._send_timestamps.append(time.perf_counter()) diff --git a/livekit-rtc/livekit/rtc/video_frame.py b/livekit-rtc/livekit/rtc/video_frame.py index cf34a8bd..b3fd65a5 100644 --- a/livekit-rtc/livekit/rtc/video_frame.py +++ b/livekit-rtc/livekit/rtc/video_frame.py @@ -111,9 +111,7 @@ def _from_owned_info(owned_info: proto_video.OwnedVideoBuffer) -> "VideoFrame": def _proto_info(self) -> proto_video.VideoBufferInfo: info = proto_video.VideoBufferInfo() addr = get_address(self.data) - info.components.extend( - _get_plane_infos(addr, self.type, self.width, self.height) - ) + info.components.extend(_get_plane_infos(addr, self.type, self.width, self.height)) info.width = self.width info.height = self.height info.type = self.type @@ -146,9 +144,7 @@ def get_plane(self, plane_nth: int) -> Optional[memoryview]: Optional[memoryview]: A memoryview of the specified plane's data, or None if the index is out of bounds for the format. """ - plane_infos = _get_plane_infos( - get_address(self.data), self.type, self.width, self.height - ) + plane_infos = _get_plane_infos(get_address(self.data), self.type, self.width, self.height) if plane_nth >= len(plane_infos): return None @@ -241,9 +237,7 @@ def validate_video_frame(value: Any) -> "VideoFrame": core_schema.no_info_plain_validator_function(validate_video_frame), ] ), - python_schema=core_schema.no_info_plain_validator_function( - validate_video_frame - ), + python_schema=core_schema.no_info_plain_validator_function(validate_video_frame), serialization=core_schema.plain_serializer_function_ser_schema( lambda instance: { "width": instance.width, @@ -265,9 +259,7 @@ def _component_info( return cmpt -def _get_plane_length( - type: proto_video.VideoBufferType.ValueType, width: int, height: int -) -> int: +def _get_plane_length(type: proto_video.VideoBufferType.ValueType, width: int, height: int) -> int: """ Return the size in bytes of a participant video buffer type based on its size (This ignores the strides) """ @@ -311,32 +303,22 @@ def _get_plane_infos( chroma_width = (width + 1) // 2 chroma_height = (height + 1) // 2 y = _component_info(addr, width, width * height) - u = _component_info( - y.data_ptr + y.size, chroma_width, chroma_width * chroma_height - ) - v = _component_info( - u.data_ptr + u.size, chroma_width, chroma_width * chroma_height - ) + u = _component_info(y.data_ptr + y.size, chroma_width, chroma_width * chroma_height) + v = _component_info(u.data_ptr + u.size, chroma_width, chroma_width * chroma_height) return [y, u, v] elif type == proto_video.VideoBufferType.I420A: chroma_width = (width + 1) // 2 chroma_height = (height + 1) // 2 y = _component_info(addr, width, width * height) - u = _component_info( - y.data_ptr + y.size, chroma_width, chroma_width * chroma_height - ) - v = _component_info( - u.data_ptr + u.size, chroma_width, chroma_width * chroma_height - ) + u = _component_info(y.data_ptr + y.size, chroma_width, chroma_width * chroma_height) + v = _component_info(u.data_ptr + u.size, chroma_width, chroma_width * chroma_height) a = _component_info(v.data_ptr + v.size, width, width * height) return [y, u, v, a] elif type == proto_video.VideoBufferType.I422: chroma_width = (width + 1) // 2 y = _component_info(addr, width, width * height) u = _component_info(y.data_ptr + y.size, chroma_width, chroma_width * height) - v = _component_info( - u.data_ptr + u.size + u.size, chroma_width, chroma_width * height - ) + v = _component_info(u.data_ptr + u.size + u.size, chroma_width, chroma_width * height) return [y, u, v] elif type == proto_video.VideoBufferType.I444: y = _component_info(addr, width, width * height) @@ -347,12 +329,8 @@ def _get_plane_infos( chroma_width = (width + 1) // 2 chroma_height = (height + 1) // 2 y = _component_info(addr, width * 2, width * height * 2) - u = _component_info( - y.data_ptr + y.size, chroma_width * 2, chroma_width * chroma_height * 2 - ) - v = _component_info( - u.data_ptr + u.size, chroma_width * 2, chroma_width * chroma_height * 2 - ) + u = _component_info(y.data_ptr + y.size, chroma_width * 2, chroma_width * chroma_height * 2) + v = _component_info(u.data_ptr + u.size, chroma_width * 2, chroma_width * chroma_height * 2) return [y, u, v] elif type == proto_video.VideoBufferType.NV12: chroma_width = (width + 1) // 2 diff --git a/livekit-rtc/livekit/rtc/video_stream.py b/livekit-rtc/livekit/rtc/video_stream.py index d47970d3..916328aa 100644 --- a/livekit-rtc/livekit/rtc/video_stream.py +++ b/livekit-rtc/livekit/rtc/video_stream.py @@ -122,12 +122,8 @@ def _create_owned_stream_from_participant( ) -> Any: req = proto_ffi.FfiRequest() video_stream_from_participant = req.video_stream_from_participant - video_stream_from_participant.participant_handle = ( - participant._ffi_handle.handle - ) - video_stream_from_participant.type = ( - proto_video_frame.VideoStreamType.VIDEO_STREAM_NATIVE - ) + video_stream_from_participant.participant_handle = participant._ffi_handle.handle + video_stream_from_participant.type = proto_video_frame.VideoStreamType.VIDEO_STREAM_NATIVE video_stream_from_participant.track_source = track_source video_stream_from_participant.normalize_stride = True if self._format is not None: diff --git a/livekit-rtc/tests/test_chat.py b/livekit-rtc/tests/test_chat.py index 161bf9be..0d2011d9 100644 --- a/livekit-rtc/tests/test_chat.py +++ b/livekit-rtc/tests/test_chat.py @@ -20,9 +20,9 @@ def test_message_serialization(): msg2 = ChatMessage.from_jsondict(json.loads(json.dumps(data))) assert msg2.message == msg.message, "message should be the same" assert msg2.id == msg.id, "id should be the same" - assert int(msg2.timestamp.timestamp() / 1000) == int( - msg.timestamp.timestamp() / 1000 - ), "timestamp should be the same" + assert int(msg2.timestamp.timestamp() / 1000) == int(msg.timestamp.timestamp() / 1000), ( + "timestamp should be the same" + ) assert not msg2.deleted, "not deleted" # deletion is handled diff --git a/ruff.toml b/ruff.toml index c1dc76a4..8d44a7dc 100644 --- a/ruff.toml +++ b/ruff.toml @@ -3,7 +3,7 @@ exclude = [ "livekit-protocol/livekit/protocol" ] -line-length = 88 +line-length = 100 indent-width = 4 target-version = "py39"