Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/game/game_state_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
are atomic).
"""

import asyncio
import dataclasses

from loguru import logger
Expand Down Expand Up @@ -42,6 +43,7 @@ class GameStateController:
def __init__(self):
self.state_device_stack: list[tuple[sd.StateDevice, StackState]] = []
self.add_state_device(room.room_manager.get_room(cache.get_cache()["player_location"]))
self.input_lock: asyncio.Lock = asyncio.Lock()

# Built-ins

Expand Down Expand Up @@ -121,6 +123,23 @@ def deliver_input(self, user_input: any) -> bool:

return False

async def deliver_input_async(self, user_input: any) -> bool:
"""
Deliver the user's input to the top sd.StateDevice. Returns True if the
device accepts the input.

Args:
user_input: Input that the user delivers to the service via the API

Returns: True if the input is accepted, False otherwise.
"""
async with self.input_lock:
if self._get_state_device().validate_input(user_input):
self._get_state_device().input(user_input)
return True

return False

def add_state_device(self, device: sd.StateDevice) -> None:
"""
Appends a sd.StateDevice to the top of the state_device_stack
Expand Down
30 changes: 29 additions & 1 deletion src/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import game

from fastapi import FastAPI
from fastapi import FastAPI, WebSocket
from loguru import logger

from timeit import default_timer
Expand All @@ -27,6 +27,34 @@ def root_put(user_input: int | str):
return r


@tx_engine.websocket("/")
async def websocket_endpoint(websocket: WebSocket):
"""
An interactive websocket endpoint. Used to communicate in real time with clients instead of in blocking-series.
Functionally equivalent to get/put.

Args:
websocket: The active websocket object used to manage the connection

Returns: None
"""

await websocket.accept()
while True:
data = await websocket.receive_json()
start = default_timer()
r = game.state_device_controller.deliver_input(data)
duration = default_timer() - start
logger.info(f"Completed input submission in {duration}s")

start = default_timer()
r = game.state_device_controller.get_current_frame()
duration = default_timer() - start
logger.info(f"Completed state retrieval in {duration}s")

await websocket.send_text(r.model_dump_json())


@tx_engine.get("/cache")
def cache(cache_path: str):
from game.cache import get_cache
Expand Down
133 changes: 133 additions & 0 deletions src/viewer/ws_viewer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import json
import os
import asyncio
from websockets.asyncio.client import connect

from loguru import logger
from rich import print


def formatting_to_tags(tags: list[str], opening_tag: bool = None, closing_tag: bool = None) -> str:
buf = ""
if opening_tag:
for tag in tags:
buf = buf + f"[{tag}]"

elif closing_tag:
for tag in tags:
buf = buf + f"[/{tag}]"

return buf


def format_string(content: str, tags: list[str]) -> str:
return formatting_to_tags(tags, opening_tag=True) + content + formatting_to_tags(tags, closing_tag=True)


def parse_content(content: list) -> str:
buf = ""
for element in content:
if type(element) is str:
buf = buf + element
elif type(element) is dict:
buf = (
buf
+ formatting_to_tags(element["formatting"], opening_tag=True)
+ element["value"]
+ formatting_to_tags(element["formatting"], closing_tag=True)
)
return buf


class WebsocketViewer:
"""
A primitive TXEngine client built for websocket connections.
"""

def __init__(self):
self._ip = input("Enter ip address (default: localhost)")
if self._ip.strip() == "":
self._ip = "localhost"

@classmethod
def clear(cls):
os.system("cls")

def get_text_header(self, tx_engine_response: dict) -> str:
input_type = (
tx_engine_response["input_type"]
if type(tx_engine_response["input_type"]) is str
else tx_engine_response["input_type"][0]
)
input_range = tx_engine_response["input_range"]

formatting = ["italic"]

if input_type == "int":
hdr = f"Enter a number between ({input_range['min']} and {input_range['max']}):"

elif input_type == "none":
hdr = "Press any key:"

elif input_type == "str":
hdr = "Enter a string: "

elif input_type == "affirmative":
hdr = "Enter y, n, yes, or no:"
elif input_type == "any":
hdr = "Press any key..."
else:
logger.error(f"Unexpected input type: {input_type}")
logger.debug(f"Failed frame: {str(tx_engine_response)}")
raise ValueError(f"Unexpected input type: {input_type}")

return format_string(hdr, formatting)

def display(self, tx_engine_response: dict):
"""
Primitively print GET results
"""
self.clear()

def entity_to_str(entity_dict: dict[str, any]) -> str:
entity_name = entity_dict["name"]
primary_resource_name = entity_dict["primary_resource_name"]
primary_resource_value = entity_dict["primary_resource_val"]
primary_resource_max = entity_dict["primary_resource_max"]
return f"{entity_name}\n{primary_resource_name}]: [{primary_resource_value}/{primary_resource_max}]"

if "enemies" in tx_engine_response["components"]:
print("ENEMIES")
for enemy in tx_engine_response["components"]["enemies"]:
print(entity_to_str(enemy))

if "allies" in tx_engine_response["components"]:
print("ALLIES")
for ally in tx_engine_response["components"]["allies"]:
print(entity_to_str(ally))

print(parse_content(tx_engine_response["components"]["content"]))

if "options" in tx_engine_response["components"] and type(tx_engine_response["components"]["options"]) is list:
for idx, opt in enumerate(tx_engine_response["components"]["options"]):
print(f"[{idx}] {parse_content(opt)}")

print(self.get_text_header(tx_engine_response))

async def client(self) -> None:
async with connect(f"ws://{self._ip}:8000") as websocket:
await websocket.send("{}") # Ping to get a baseline response
response = await websocket.recv()
while True:
self.clear()
self.display(json.loads(response))
user_input = input()
if user_input.strip() == "":
user_input = "{}"
await websocket.send(user_input)
response = await websocket.recv()


if __name__ == "__main__":
client = WebsocketViewer()
asyncio.run(client.client())