Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions examples/agent_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,7 @@ async def create_token_with_agent_dispatch() -> str:
.with_grants(api.VideoGrants(room_join=True, room=room_name))
.with_room_config(
api.RoomConfiguration(
agents=[
api.RoomAgentDispatch(
agent_name="test-agent", metadata="my_metadata"
)
],
agents=[api.RoomAgentDispatch(agent_name="test-agent", metadata="my_metadata")],
),
)
.to_jwt()
Expand Down
16 changes: 4 additions & 12 deletions examples/basic_room.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,11 @@
async def main(room: rtc.Room) -> None:
@room.on("participant_connected")
def on_participant_connected(participant: rtc.RemoteParticipant) -> None:
logging.info(
"participant connected: %s %s", participant.sid, participant.identity
)
logging.info("participant connected: %s %s", participant.sid, participant.identity)

@room.on("participant_disconnected")
def on_participant_disconnected(participant: rtc.RemoteParticipant):
logging.info(
"participant disconnected: %s %s", participant.sid, participant.identity
)
logging.info("participant disconnected: %s %s", participant.sid, participant.identity)

@room.on("local_track_published")
def on_local_track_published(
Expand Down Expand Up @@ -78,9 +74,7 @@ def on_track_unsubscribed(
logging.info("track unsubscribed: %s", publication.sid)

@room.on("track_muted")
def on_track_muted(
publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant
):
def on_track_muted(publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant):
logging.info("track muted: %s", publication.sid)

@room.on("track_unmuted")
Expand All @@ -94,9 +88,7 @@ def on_data_received(data: rtc.DataPacket):
logging.info("received data from %s: %s", data.participant.identity, data.data)

@room.on("connection_quality_changed")
def on_connection_quality_changed(
participant: rtc.Participant, quality: rtc.ConnectionQuality
):
def on_connection_quality_changed(participant: rtc.Participant, quality: rtc.ConnectionQuality):
logging.info("connection quality changed for %s", participant.identity)

@room.on("track_subscription_failed")
Expand Down
20 changes: 5 additions & 15 deletions examples/data-streams/data_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,12 @@ async def greetParticipant(identity: str):
topic="files",
)

async def on_chat_message_received(
reader: rtc.TextStreamReader, participant_identity: str
):
async def on_chat_message_received(reader: rtc.TextStreamReader, participant_identity: str):
full_text = await reader.read_all()
logger.info(
"Received chat message from %s: '%s'", participant_identity, full_text
)
logger.info("Received chat message from %s: '%s'", participant_identity, full_text)

async def on_welcome_image_received(
reader: rtc.ByteStreamReader, participant_identity: str
):
logger.info(
"Received image from %s: '%s'", participant_identity, reader.info["name"]
)
async def on_welcome_image_received(reader: rtc.ByteStreamReader, participant_identity: str):
logger.info("Received image from %s: '%s'", participant_identity, reader.info["name"])
with open(reader.info["name"], mode="wb") as f:
async for chunk in reader:
f.write(chunk)
Expand All @@ -49,9 +41,7 @@ async def on_welcome_image_received(

@room.on("participant_connected")
def on_participant_connected(participant: rtc.RemoteParticipant):
logger.info(
"participant connected: %s %s", participant.sid, participant.identity
)
logger.info("participant connected: %s %s", participant.sid, participant.identity)
asyncio.create_task(greetParticipant(participant.identity))

room.set_text_stream_handler(
Expand Down
4 changes: 1 addition & 3 deletions examples/e2ee.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,7 @@ async def draw_cube(source: rtc.VideoSource):

async def main(room: rtc.Room):
@room.on("e2ee_state_changed")
def on_e2ee_state_changed(
participant: rtc.Participant, state: rtc.EncryptionState
) -> None:
def on_e2ee_state_changed(participant: rtc.Participant, state: rtc.EncryptionState) -> None:
logging.info("e2ee state changed: %s %s", participant.identity, state)

logging.info("connecting to %s", URL)
Expand Down
8 changes: 2 additions & 6 deletions examples/face_landmark/face_landmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ def draw_landmarks_on_image(rgb_image, detection_result):
face_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
face_landmarks_proto.landmark.extend(
[
landmark_pb2.NormalizedLandmark(
x=landmark.x, y=landmark.y, z=landmark.z
)
landmark_pb2.NormalizedLandmark(x=landmark.x, y=landmark.y, z=landmark.z)
for landmark in face_landmarks
]
)
Expand Down Expand Up @@ -113,9 +111,7 @@ async def frame_loop(video_stream: rtc.VideoStream) -> None:
arr = arr.reshape((buffer.height, buffer.width, 3))

mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=arr)
detection_result = landmarker.detect_for_video(
mp_image, frame_event.timestamp_us
)
detection_result = landmarker.detect_for_video(mp_image, frame_event.timestamp_us)

draw_landmarks_on_image(arr, detection_result)

Expand Down
4 changes: 1 addition & 3 deletions examples/room_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ async def main():

@room.on("participant_connected")
def on_participant_connected(participant: rtc.RemoteParticipant):
logger.info(
"participant connected: %s %s", participant.sid, participant.identity
)
logger.info("participant connected: %s %s", participant.sid, participant.identity)

async def receive_frames(stream: rtc.VideoStream):
async for frame in stream:
Expand Down
20 changes: 5 additions & 15 deletions examples/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
LIVEKIT_API_SECRET = os.getenv("LIVEKIT_API_SECRET")
LIVEKIT_URL = os.getenv("LIVEKIT_URL")
if not LIVEKIT_API_KEY or not LIVEKIT_API_SECRET or not LIVEKIT_URL:
raise ValueError(
"Missing required environment variables. Please check your .env.local file."
)
raise ValueError("Missing required environment variables. Please check your .env.local file.")


async def main():
Expand Down Expand Up @@ -82,9 +80,7 @@ async def main():
finally:
# Clean up all rooms
print("Disconnecting all participants...")
await asyncio.gather(
*(room.disconnect() for room in rooms), return_exceptions=True
)
await asyncio.gather(*(room.disconnect() for room in rooms), return_exceptions=True)
print("Cleanup complete")


Expand Down Expand Up @@ -121,9 +117,7 @@ async def divide_method(
json_data = json.loads(data.payload)
dividend = json_data["dividend"]
divisor = json_data["divisor"]
print(
f"[Math Genius] {data.caller_identity} wants to divide {dividend} by {divisor}."
)
print(f"[Math Genius] {data.caller_identity} wants to divide {dividend} by {divisor}.")

result = dividend / divisor
return json.dumps({"result": result})
Expand All @@ -132,9 +126,7 @@ async def divide_method(
async def long_calculation_method(
data: RpcInvocationData,
):
print(
f"[Math Genius] Starting a very long calculation for {data.caller_identity}"
)
print(f"[Math Genius] Starting a very long calculation for {data.caller_identity}")
print(
f"[Math Genius] This will take 30 seconds even though you're only giving me {data.response_timeout} seconds"
)
Expand Down Expand Up @@ -202,9 +194,7 @@ async def perform_divide(room: rtc.Room):
print(f"[Caller] The result is {parsed_response['result']}")
except rtc.RpcError as error:
if error.code == rtc.RpcError.ErrorCode.APPLICATION_ERROR:
print(
"[Caller] Aww something went wrong with that one, lets try something else."
)
print("[Caller] Aww something went wrong with that one, lets try something else.")
else:
print(f"[Caller] RPC call failed with unexpected RpcError: {error}")
except Exception as error:
Expand Down
12 changes: 3 additions & 9 deletions examples/video-stream/audio_wave.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,7 @@ def draw_timestamp(self, canvas: np.ndarray, fps: float):
y = int((height - text_height) * 0.4 + baseline)
cv2.putText(canvas, text, (x, y), font_face, font_scale, (0, 0, 0), thickness)

def draw_current_wave(
self, canvas: np.ndarray, audio_samples: np.ndarray
) -> np.ndarray:
def draw_current_wave(self, canvas: np.ndarray, audio_samples: np.ndarray) -> np.ndarray:
"""Draw the current waveform and return the current values"""
height, width = canvas.shape[:2]
center_y = height // 2 + 100
Expand Down Expand Up @@ -136,9 +134,7 @@ def draw_volume_history(self, canvas: np.ndarray, current_volume: float):
center_y = height // 2

self.volume_history.append(current_volume)
cv2.line(
canvas, (0, center_y - 250), (width, center_y - 250), (200, 200, 200), 1
)
cv2.line(canvas, (0, center_y - 250), (width, center_y - 250), (200, 200, 200), 1)

volume_x = np.linspace(0, width, len(self.volume_history), dtype=int)
volume_y = center_y - 250 + (np.array(self.volume_history) * 200)
Expand All @@ -158,9 +154,7 @@ async def video_generator(
input_audio: asyncio.Queue[Union[rtc.AudioFrame, _AudioEndSentinel]],
av_sync: rtc.AVSynchronizer, # only used for drawing the actual fps on the video
) -> AsyncIterable[tuple[rtc.VideoFrame, Optional[rtc.AudioFrame]]]:
canvas = np.zeros(
(media_info.video_height, media_info.video_width, 4), dtype=np.uint8
)
canvas = np.zeros((media_info.video_height, media_info.video_width, 4), dtype=np.uint8)
canvas.fill(255)

def _np_to_video_frame(image: np.ndarray) -> rtc.VideoFrame:
Expand Down
4 changes: 1 addition & 3 deletions examples/video-stream/video_play.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
try:
import av
except ImportError:
raise RuntimeError(
"av is required to run this example, install with `pip install av`"
)
raise RuntimeError("av is required to run this example, install with `pip install av`")

# ensure LIVEKIT_URL, LIVEKIT_API_KEY, and LIVEKIT_API_SECRET are set

Expand Down
4 changes: 1 addition & 3 deletions livekit-api/livekit/api/_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@


class Service(ABC):
def __init__(
self, session: aiohttp.ClientSession, host: str, api_key: str, api_secret: str
):
def __init__(self, session: aiohttp.ClientSession, host: str, api_key: str, api_secret: str):
self._client = TwirpClient(session, host, "livekit")
self.api_key = api_key
self.api_secret = api_secret
Expand Down
24 changes: 6 additions & 18 deletions livekit-api/livekit/api/access_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,7 @@ def asdict(self) -> dict:
claims = dataclasses.asdict(
self,
dict_factory=lambda items: {
snake_to_lower_camel(k): v
for k, v in items
if v is not None and v != ""
snake_to_lower_camel(k): v for k, v in items if v is not None and v != ""
},
)
if self.room_config:
Expand Down Expand Up @@ -178,13 +176,9 @@ def to_jwt(self) -> str:
{
"sub": self.identity,
"iss": self.api_key,
"nbf": calendar.timegm(
datetime.datetime.now(datetime.timezone.utc).utctimetuple()
),
"nbf": calendar.timegm(datetime.datetime.now(datetime.timezone.utc).utctimetuple()),
"exp": calendar.timegm(
(
datetime.datetime.now(datetime.timezone.utc) + self.ttl
).utctimetuple()
(datetime.datetime.now(datetime.timezone.utc) + self.ttl).utctimetuple()
),
}
)
Expand Down Expand Up @@ -220,16 +214,12 @@ def verify(self, token: str) -> Claims:

video_dict = claims.get("video", dict())
video_dict = {camel_to_snake(k): v for k, v in video_dict.items()}
video_dict = {
k: v for k, v in video_dict.items() if k in VideoGrants.__dataclass_fields__
}
video_dict = {k: v for k, v in video_dict.items() if k in VideoGrants.__dataclass_fields__}
video = VideoGrants(**video_dict)

sip_dict = claims.get("sip", dict())
sip_dict = {camel_to_snake(k): v for k, v in sip_dict.items()}
sip_dict = {
k: v for k, v in sip_dict.items() if k in SIPGrants.__dataclass_fields__
}
sip_dict = {k: v for k, v in sip_dict.items() if k in SIPGrants.__dataclass_fields__}
sip = SIPGrants(**sip_dict)

grant_claims = Claims(
Expand Down Expand Up @@ -259,6 +249,4 @@ def camel_to_snake(t: str):


def snake_to_lower_camel(t: str):
return "".join(
word.capitalize() if i else word for i, word in enumerate(t.split("_"))
)
return "".join(word.capitalize() if i else word for i, word in enumerate(t.split("_")))
8 changes: 2 additions & 6 deletions livekit-api/livekit/api/agent_dispatch_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ class AgentDispatchService(Service):
```
"""

def __init__(
self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str
):
def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str):
super().__init__(session, url, api_key, api_secret)

async def create_dispatch(self, req: CreateAgentDispatchRequest) -> AgentDispatch:
Expand Down Expand Up @@ -89,9 +87,7 @@ async def list_dispatch(self, room_name: str) -> list[AgentDispatch]:
)
return list(res.agent_dispatches)

async def get_dispatch(
self, dispatch_id: str, room_name: str
) -> Optional[AgentDispatch]:
async def get_dispatch(self, dispatch_id: str, room_name: str) -> Optional[AgentDispatch]:
"""Get an Agent dispatch by ID

Args:
Expand Down
16 changes: 4 additions & 12 deletions livekit-api/livekit/api/egress_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,10 @@ class EgressService(Service):
Also see https://docs.livekit.io/home/egress/overview/
"""

def __init__(
self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str
):
def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str):
super().__init__(session, url, api_key, api_secret)

async def start_room_composite_egress(
self, start: RoomCompositeEgressRequest
) -> EgressInfo:
async def start_room_composite_egress(self, start: RoomCompositeEgressRequest) -> EgressInfo:
return await self._client.request(
SVC,
"StartRoomCompositeEgress",
Expand All @@ -58,9 +54,7 @@ async def start_web_egress(self, start: WebEgressRequest) -> EgressInfo:
EgressInfo,
)

async def start_participant_egress(
self, start: ParticipantEgressRequest
) -> EgressInfo:
async def start_participant_egress(self, start: ParticipantEgressRequest) -> EgressInfo:
return await self._client.request(
SVC,
"StartParticipantEgress",
Expand All @@ -69,9 +63,7 @@ async def start_participant_egress(
EgressInfo,
)

async def start_track_composite_egress(
self, start: TrackCompositeEgressRequest
) -> EgressInfo:
async def start_track_composite_egress(self, start: TrackCompositeEgressRequest) -> EgressInfo:
return await self._client.request(
SVC,
"StartTrackCompositeEgress",
Expand Down
4 changes: 1 addition & 3 deletions livekit-api/livekit/api/ingress_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ class IngressService(Service):
Also see https://docs.livekit.io/home/ingress/overview/
"""

def __init__(
self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str
):
def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str):
super().__init__(session, url, api_key, api_secret)

async def create_ingress(self, create: CreateIngressRequest) -> IngressInfo:
Expand Down
4 changes: 1 addition & 3 deletions livekit-api/livekit/api/livekit_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ def __init__(
self._ingress = IngressService(self._session, url, api_key, api_secret)
self._egress = EgressService(self._session, url, api_key, api_secret)
self._sip = SipService(self._session, url, api_key, api_secret)
self._agent_dispatch = AgentDispatchService(
self._session, url, api_key, api_secret
)
self._agent_dispatch = AgentDispatchService(self._session, url, api_key, api_secret)

@property
def agent_dispatch(self) -> AgentDispatchService:
Expand Down
Loading
Loading