diff --git a/app/config/config.py b/app/config/config.py index 3cc21cc1..232cabd8 100644 --- a/app/config/config.py +++ b/app/config/config.py @@ -66,6 +66,12 @@ def __init__(self): self.cdp_api_key_name = self.load("CDP_API_KEY_NAME") self.cdp_api_key_private_key = self.load("CDP_API_KEY_PRIVATE_KEY") self.openai_api_key = self.load("OPENAI_API_KEY") + self.slack_token = self.load("SLACK_TOKEN") + self.slack_channel = self.load("SLACK_CHANNEL") + self.tg_base_url = self.load("TG_BASE_URL") + self.tg_server_host = self.load("TG_SERVER_HOST", "127.0.0.1") + self.tg_server_port = self.load("TG_SERVER_PORT", "8081") + self.tg_new_agent_poll_interval = self.load("TG_NEW_AGENT_POLL_INTERVAL", "60") self.twitter_endpoint_interval = int( self.load("TWITTER_ENDPOINT_INTERVAL", "15") ) # in minutes diff --git a/app/entrypoints/tg.py b/app/entrypoints/tg.py new file mode 100644 index 00000000..f18c7e51 --- /dev/null +++ b/app/entrypoints/tg.py @@ -0,0 +1,80 @@ +import asyncio +import logging +import signal +import sys + +from sqlmodel import Session, select + +from app.config.config import config +from app.models.db import get_engine, init_db +from app.models.agent import Agent +from tg.bot import pool +from tg.bot.pool import BotPool + +logger = logging.getLogger(__name__) + + +class AgentScheduler: + def __init__(self, bot_pool): + self.bot_pool = bot_pool + + def check_new_bots(self): + with Session(get_engine()) as db: + # Get all telegram agents + agents = db.exec( + select(Agent).where( + Agent.telegram_enabled, + ) + ).all() + + new_bots = [] + for agent in agents: + if agent.telegram_config["token"] not in pool._bots: + agent.telegram_config["agent_id"] = agent.id + new_bots.append(agent.telegram_config) + logger.info("New agent with id {id} found...".format(id=agent.id)) + + return new_bots + + async def start(self, interval): + logger.info("New agent addition tracking started...") + while True: + logger.info("check for new bots...") + await asyncio.sleep(interval) + if self.check_new_bots() is not None: + for new_bot in self.check_new_bots(): + await self.bot_pool.init_new_bot( + new_bot["agent_id"], new_bot["kind"], new_bot["token"] + ) + + +def run_telegram_server() -> None: + # Initialize database connection + init_db(**config.db) + + # Signal handler for graceful shutdown + def signal_handler(signum, frame): + logger.info("Received termination signal. Shutting down gracefully...") + scheduler.shutdown() + sys.exit(0) + + # Register signal handlers + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + logger.info("Initialize bot pool...") + bot_pool = BotPool(config.tg_base_url) + + bot_pool.init_god_bot() + bot_pool.init_all_dispatchers() + + scheduler = AgentScheduler(bot_pool) + + loop = asyncio.new_event_loop() + loop.create_task(scheduler.start(int(config.tg_new_agent_poll_interval))) + + bot_pool.start(loop, config.tg_server_host, int(config.tg_server_port)) + + +if __name__ == "__main__": + run_telegram_server() diff --git a/app/models/agent.py b/app/models/agent.py index 42039773..d4e94b3e 100644 --- a/app/models/agent.py +++ b/app/models/agent.py @@ -34,6 +34,11 @@ class Agent(SQLModel, table=True): # twitter skills require config, but not require twitter_enabled flag. # As long as twitter_skills is not empty, the corresponding skills will be loaded. twitter_skills: Optional[List[str]] = Field(sa_column=Column(ARRAY(String))) + telegram_enabled: bool = Field(default=False) + telegram_config: Optional[dict] = Field(sa_column=Column(JSONB, nullable=True)) + # twitter skills require config, but not require twitter_enabled flag. + # As long as twitter_skills is not empty, the corresponding skills will be loaded. + telegram_skills: Optional[List[str]] = Field(sa_column=Column(ARRAY(String))) # crestal skills crestal_skills: Optional[List[str]] = Field(sa_column=Column(ARRAY(String))) # skills not require config diff --git a/debug/create_agent.py b/debug/create_agent.py index 5ffa0547..c6be4f61 100644 --- a/debug/create_agent.py +++ b/debug/create_agent.py @@ -26,6 +26,9 @@ twitter_enabled=False, twitter_config={}, # Ensure this dict structure aligns with expected config format twitter_skills=[], # Confirm if no specific Twitter skills are to be enabled + telegram_enabled=False, + telegram_config={}, # Ensure this dict structure aligns with expected config format + telegram_skills=[], # Confirm if no specific Telegram skills are to be enabled common_skills=[], # Confirm if no common skills are to be added initially ) diff --git a/example.env b/example.env index b1bd7f58..5dd77c50 100644 --- a/example.env +++ b/example.env @@ -11,5 +11,9 @@ DB_PASSWORD= DB_NAME= DB_AUTO_MIGRATE=true +TG_TOKEN_GOD_BOT= +TG_BASE_URL= +TG_NEW_AGENT_POLL_INTERVAL= + CDP_API_KEY_NAME= CDP_API_KEY_PRIVATE_KEY= diff --git a/poetry.lock b/poetry.lock index e5addfd9..4c8d60d3 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,5 +1,46 @@ # This file is automatically @generated by Poetry 1.8.5 and should not be changed by hand. +[[package]] +name = "aiofiles" +version = "24.1.0" +description = "File support for asyncio." +optional = false +python-versions = ">=3.8" +files = [ + {file = "aiofiles-24.1.0-py3-none-any.whl", hash = "sha256:b4ec55f4195e3eb5d7abd1bf7e061763e864dd4954231fb8539a0ef8bb8260e5"}, + {file = "aiofiles-24.1.0.tar.gz", hash = "sha256:22a075c9e5a3810f0c2e48f3008c94d68c65d763b9b03857924c99e57355166c"}, +] + +[[package]] +name = "aiogram" +version = "3.16.0" +description = "Modern and fully asynchronous framework for Telegram Bot API" +optional = false +python-versions = ">=3.9" +files = [ + {file = "aiogram-3.16.0-py3-none-any.whl", hash = "sha256:c92a52aff032217bce5df4089a8ff8f0b86ce0533126c22f3fe55e6b1b230a66"}, + {file = "aiogram-3.16.0.tar.gz", hash = "sha256:b7fd7a6c6434d831472c1d6d971e23348966a9aeb71a1c0e575a01990390e0f1"}, +] + +[package.dependencies] +aiofiles = ">=23.2.1,<24.2" +aiohttp = ">=3.9.0,<3.12" +certifi = ">=2023.7.22" +magic-filter = ">=1.0.12,<1.1" +pydantic = ">=2.4.1,<2.11" +typing-extensions = ">=4.7.0,<=5.0" + +[package.extras] +cli = ["aiogram-cli (>=1.1.0,<2.0.0)"] +dev = ["black (>=24.4.2,<24.5.0)", "isort (>=5.13.2,<5.14.0)", "motor-types (>=1.0.0b4,<1.1.0)", "mypy (>=1.10.0,<1.11.0)", "packaging (>=24.1,<25.0)", "pre-commit (>=3.5,<4.0)", "ruff (>=0.5.1,<0.6.0)", "toml (>=0.10.2,<0.11.0)"] +docs = ["furo (>=2024.8.6,<2024.9.0)", "markdown-include (>=0.8.1,<0.9.0)", "pygments (>=2.18.0,<2.19.0)", "pymdown-extensions (>=10.3,<11.0)", "sphinx (>=8.0.2,<8.1.0)", "sphinx-autobuild (>=2024.9.3,<2024.10.0)", "sphinx-copybutton (>=0.5.2,<0.6.0)", "sphinx-intl (>=2.2.0,<2.3.0)", "sphinx-substitution-extensions (>=2024.8.6,<2024.9.0)", "sphinxcontrib-towncrier (>=0.4.0a0,<0.5.0)", "towncrier (>=24.8.0,<24.9.0)"] +fast = ["aiodns (>=3.0.0)", "uvloop (>=0.17.0)", "uvloop (>=0.21.0)"] +i18n = ["babel (>=2.13.0,<2.14.0)"] +mongo = ["motor (>=3.3.2,<3.7.0)"] +proxy = ["aiohttp-socks (>=0.8.3,<0.9.0)"] +redis = ["redis[hiredis] (>=5.0.1,<5.1.0)"] +test = ["aresponses (>=2.1.6,<2.2.0)", "pycryptodomex (>=3.19.0,<3.20.0)", "pytest (>=7.4.2,<7.5.0)", "pytest-aiohttp (>=1.0.5,<1.1.0)", "pytest-asyncio (>=0.21.1,<0.22.0)", "pytest-cov (>=4.1.0,<4.2.0)", "pytest-html (>=4.0.2,<4.1.0)", "pytest-lazy-fixture (>=0.6.3,<0.7.0)", "pytest-mock (>=3.12.0,<3.13.0)", "pytest-mypy (>=0.10.3,<0.11.0)", "pytz (>=2023.3,<2024.0)"] + [[package]] name = "aiohappyeyeballs" version = "2.4.4" @@ -416,13 +457,13 @@ files = [ [[package]] name = "botocore" -version = "1.35.87" +version = "1.35.90" description = "Low-level, data-driven core of boto 3." optional = false python-versions = ">=3.8" files = [ - {file = "botocore-1.35.87-py3-none-any.whl", hash = "sha256:81cf84f12030d9ab3829484b04765d5641697ec53c2ac2b3987a99eefe501692"}, - {file = "botocore-1.35.87.tar.gz", hash = "sha256:3062d073ce4170a994099270f469864169dc1a1b8b3d4a21c14ce0ae995e0f89"}, + {file = "botocore-1.35.90-py3-none-any.whl", hash = "sha256:51dcbe1b32e2ac43dac17091f401a00ce5939f76afe999081802009cce1e92e4"}, + {file = "botocore-1.35.90.tar.gz", hash = "sha256:f007f58e8e3c1ad0412a6ddfae40ed92a7bca571c068cb959902bcf107f2ae48"}, ] [package.dependencies] @@ -1989,6 +2030,20 @@ requests-toolbelt = ">=1.0.0,<2.0.0" compression = ["zstandard (>=0.23.0,<0.24.0)"] langsmith-pyo3 = ["langsmith-pyo3 (>=0.1.0rc2,<0.2.0)"] +[[package]] +name = "magic-filter" +version = "1.0.12" +description = "" +optional = false +python-versions = ">=3.7" +files = [ + {file = "magic_filter-1.0.12-py3-none-any.whl", hash = "sha256:e5929e544f310c2b1f154318db8c5cdf544dd658efa998172acd2e4ba0f6c6a6"}, + {file = "magic_filter-1.0.12.tar.gz", hash = "sha256:4751d0b579a5045d1dc250625c4c508c18c3def5ea6afaf3957cb4530d03f7f9"}, +] + +[package.extras] +dev = ["black (>=22.8.0,<22.9.0)", "flake8 (>=5.0.4,<5.1.0)", "isort (>=5.11.5,<5.12.0)", "mypy (>=1.4.1,<1.5.0)", "pre-commit (>=2.20.0,<2.21.0)", "pytest (>=7.1.3,<7.2.0)", "pytest-cov (>=3.0.0,<3.1.0)", "pytest-html (>=3.1.1,<3.2.0)", "types-setuptools (>=65.3.0,<65.4.0)"] + [[package]] name = "mako" version = "1.3.8" @@ -3938,4 +3993,4 @@ propcache = ">=0.2.0" [metadata] lock-version = "2.0" python-versions = "^3.12" -content-hash = "99ab1654f5e31068c163f274f578e436de0f8c6c428d727a6af546ca65caefd7" +content-hash = "0a3147bee62ac900c21007e9778ead24d22460bffd8d34a725d5e2d131a4ed81" diff --git a/pyproject.toml b/pyproject.toml index 842f3300..e5a0745e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,7 +39,8 @@ anyio = "^4.7.0" slack-sdk = "^3.34.0" requests = "^2.32.3" aws-secretsmanager-caching = "^1.1.3" -botocore = "^1.35.81" +botocore = "^1.35.90" +aiogram = "^3.16.0" [tool.poetry.group.dev] optional = true diff --git a/tg/__init__.py b/tg/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tg/bot/__init__.py b/tg/bot/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tg/bot/filter/chat_type.py b/tg/bot/filter/chat_type.py new file mode 100644 index 00000000..225b8270 --- /dev/null +++ b/tg/bot/filter/chat_type.py @@ -0,0 +1,18 @@ +from aiogram.filters import BaseFilter +from aiogram.types import Message + + +class ChatTypeFilter(BaseFilter): + def __init__(self, chat_type: str | list): + self.chat_type = chat_type + + async def __call__(self, message: Message) -> bool: + if isinstance(self.chat_type, str): + return message.chat.type == self.chat_type + else: + return message.chat.type in self.chat_type + + +class GroupOnlyFilter(ChatTypeFilter): + def __init__(self): + super().__init__(["group", "supergroup"]) diff --git a/tg/bot/filter/content_type.py b/tg/bot/filter/content_type.py new file mode 100644 index 00000000..a54a3185 --- /dev/null +++ b/tg/bot/filter/content_type.py @@ -0,0 +1,15 @@ +from aiogram.filters import BaseFilter +from aiogram.types import Message, ContentType + + +class ContentTypeFilter(BaseFilter): + def __init__(self, content_types: ContentType | list): + self.content_types = content_types + + async def __call__(self, message: Message) -> bool: + return message.content_type in self.content_types + + +class TextOnlyFilter(ContentTypeFilter): + def __init__(self): + super().__init__([ContentType.TEXT]) diff --git a/tg/bot/kind/__init__.py b/tg/bot/kind/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tg/bot/kind/ai_relayer/__init__.py b/tg/bot/kind/ai_relayer/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tg/bot/kind/ai_relayer/router.py b/tg/bot/kind/ai_relayer/router.py new file mode 100644 index 00000000..48ed0c5f --- /dev/null +++ b/tg/bot/kind/ai_relayer/router.py @@ -0,0 +1,81 @@ +from aiogram import Router +from aiogram.filters import CommandStart +from aiogram.fsm.context import FSMContext +from aiogram.fsm.state import State, StatesGroup +from aiogram.types import Message + +from app.core.ai import execute_agent +from tg.bot import pool +from tg.bot.filter.chat_type import GroupOnlyFilter +from tg.bot.filter.content_type import TextOnlyFilter + +general_router = Router() + + +class GeneralForm(StatesGroup): + name = State() + like_bots = State() + language = State() + + +## group commands and messages + + +@general_router.message(GroupOnlyFilter(), TextOnlyFilter(), CommandStart()) +async def gp_command_start(message: Message): + if message.from_user.is_bot: + return + + group_title = message.from_user.first_name + await message.answer( + text=f"🤖 Hi Everybody, {group_title}! 🎉\nGreetings, traveler of the digital realm! You've just awakened the mighty powers of this chat bot. Brace yourself for an adventure filled with wit, wisdom, and possibly a few jokes.", + ) + + +@general_router.message(GroupOnlyFilter(), TextOnlyFilter()) +async def gp_process_message(message: Message) -> None: + if message.from_user.is_bot: + return + + bot = await message.bot.get_me() + if ( + message.reply_to_message + and message.reply_to_message.from_user.id == message.bot.id + ) or bot.username in message.text: + agent_id = pool.bot_by_token(message.bot.token)["agent_id"] + thread_id = pool.agent_thread_id(agent_id, message.chat.id) + response = execute_agent(agent_id, message.text, thread_id) + await message.answer( + text="\n".join(response), + reply_to_message_id=message.message_id, + ) + + +## direct commands and messages + + +@general_router.message(CommandStart(), TextOnlyFilter()) +async def command_start(message: Message, state: FSMContext) -> None: + if message.from_user.is_bot: + return + + first_name = message.from_user.first_name + await message.answer( + text=f"🤖 Hi, {first_name}! 🎉\nGreetings, traveler of the digital realm! You've just awakened the mighty powers of this chat bot. Brace yourself for an adventure filled with wit, wisdom, and possibly a few jokes.", + ) + + +@general_router.message( + TextOnlyFilter(), +) +async def process_message(message: Message, state: FSMContext) -> None: + if message.from_user.is_bot: + return + + agent_id = pool.bot_by_token(message.bot.token)["agent_id"] + thread_id = pool.agent_thread_id(agent_id, message.chat.id) + response = execute_agent(agent_id, message.text, thread_id) + await message.answer( + text="\n".join(response), + reply_to_message_id=message.message_id, + ) diff --git a/tg/bot/kind/god/__init__.py b/tg/bot/kind/god/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tg/bot/kind/god/router.py b/tg/bot/kind/god/router.py new file mode 100644 index 00000000..bb2aa974 --- /dev/null +++ b/tg/bot/kind/god/router.py @@ -0,0 +1,31 @@ +from typing import Any, Dict, Union + +from aiogram import Bot, F, Router +from aiogram.exceptions import TelegramUnauthorizedError +from aiogram.filters import Command, CommandObject +from aiogram.types import Message +from aiogram.utils.token import TokenValidationError, validate_token + +god_router = Router() + + +def is_bot_token(value: str) -> Union[bool, Dict[str, Any]]: + try: + validate_token(value) + except TokenValidationError: + return False + return True + + +@god_router.message(Command("add", magic=F.args.func(is_bot_token))) +async def command_add_bot(message: Message, command: CommandObject, bot: Bot) -> Any: + new_bot = Bot(token=command.args, session=bot.session) + try: + bot_user = await new_bot.get_me() + except TelegramUnauthorizedError: + return message.answer("Invalid token") + # await new_bot.delete_webhook(drop_pending_updates=True) + # await new_bot.set_webhook(OTHER_BOTS_URL.format(bot_token=command.args)) + return await message.answer( + f"Your Bot is @{bot_user.username} but, it should be registered in Intent Kit first!" + ) diff --git a/tg/bot/kind/god/startup.py b/tg/bot/kind/god/startup.py new file mode 100644 index 00000000..6e0769dd --- /dev/null +++ b/tg/bot/kind/god/startup.py @@ -0,0 +1,11 @@ +from os import getenv + +from aiogram import Bot, Dispatcher + +BASE_URL = getenv("TG_BASE_URL") +GOD_BOT_PATH = "/webhook/god" +GOD_BOT_TOKEN = getenv("TG_TOKEN_GOD_BOT") + + +async def on_startup(dispatcher: Dispatcher, bot: Bot): + await bot.set_webhook(f"{BASE_URL}{GOD_BOT_PATH}") diff --git a/tg/bot/pool.py b/tg/bot/pool.py new file mode 100644 index 00000000..ec7b1730 --- /dev/null +++ b/tg/bot/pool.py @@ -0,0 +1,103 @@ +import logging + +from aiogram import Bot, Dispatcher +from aiogram.client.bot import DefaultBotProperties +from aiogram.enums import ParseMode +from aiogram.fsm.storage.memory import MemoryStorage +from aiogram.webhook.aiohttp_server import ( + SimpleRequestHandler, + TokenBasedRequestHandler, + setup_application, +) +from aiohttp import web + +from tg.bot.kind.ai_relayer.router import general_router +from tg.bot.kind.god.router import god_router +from tg.bot.kind.god.startup import GOD_BOT_PATH, GOD_BOT_TOKEN, on_startup +from tg.bot.types.kind import Kind +from tg.bot.types.router_obj import RouterObj + +logger = logging.getLogger(__name__) + +BOTS_PATH = "/webhook/tgbot/{kind}/{bot_token}" + +_bots = {} +_agent_bots = {} + + +def bot_by_token(token): + return _bots[token] + + +def bot_by_agent_id(agent_id): + return _agent_bots[agent_id] + + +def agent_thread_id(agent_id, chat_id): + return f"{agent_id}-telegram-{chat_id}" + + +async def health_handler(request): + """Health check endpoint handler.""" + return web.json_response({"status": "healthy"}) + + +class BotPool: + def __init__(self, base_url): + self.app = web.Application() + self.app.router.add_get("/health", health_handler) + self.base_url = f"{base_url}{BOTS_PATH}" + self.routers = { + Kind.AiRelayer: RouterObj(general_router), + } + + def init_god_bot(self): + if GOD_BOT_TOKEN is not None: + logger.info("Initialize god bot...") + self.god_bot = Bot( + token=GOD_BOT_TOKEN, + default=DefaultBotProperties(parse_mode=ParseMode.HTML), + ) + storage = MemoryStorage() + # In order to use RedisStorage you need to use Key Builder with bot ID: + # storage = RedisStorage.from_url(TG_REDIS_DSN, key_builder=DefaultKeyBuilder(with_bot_id=True)) + dp = Dispatcher(storage=storage) + dp.include_router(god_router) + dp.startup.register(on_startup) + SimpleRequestHandler(dispatcher=dp, bot=self.god_bot).register( + self.app, path=GOD_BOT_PATH + ) + setup_application(self.app, dp, bot=self.god_bot) + + def init_all_dispatchers(self): + logger.info("Initialize all dispatchers...") + for kind, b in self.routers.items(): + storage = MemoryStorage() + # In order to use RedisStorage you need to use Key Builder with bot ID: + # storage = RedisStorage.from_url(TG_REDIS_DSN, key_builder=DefaultKeyBuilder(with_bot_id=True)) + b.set_dispatcher(Dispatcher(storage=storage)) + b.get_dispatcher().include_router(b.get_router()) + TokenBasedRequestHandler( + dispatcher=b.get_dispatcher(), + default=DefaultBotProperties(parse_mode=ParseMode.HTML), + ).register( + self.app, + path=BOTS_PATH.format(kind=kind.value, bot_token="{bot_token}"), + ) + setup_application(self.app, b.get_dispatcher()) + logger.info("{kind} router initialized...".format(kind=kind)) + + async def init_new_bot(self, agent_id, kind, token): + bot = Bot( + token=token, + default=DefaultBotProperties(parse_mode=ParseMode.HTML), + ) + await bot.delete_webhook(drop_pending_updates=True) + await bot.set_webhook(self.base_url.format(kind=kind, bot_token=token)) + + _bots[token] = {"agent_id": agent_id, "bot": bot} + _agent_bots[agent_id] = {"token": token, "bot": bot} + logger.info("Bot with token {token} initialized...".format(token=token)) + + def start(self, asyncio_loop, host, port): + web.run_app(self.app, loop=asyncio_loop, host=host, port=port) diff --git a/tg/bot/types/kind.py b/tg/bot/types/kind.py new file mode 100644 index 00000000..14caab06 --- /dev/null +++ b/tg/bot/types/kind.py @@ -0,0 +1,5 @@ +from enum import Enum + + +class Kind(Enum): + AiRelayer = 1 diff --git a/tg/bot/types/router_obj.py b/tg/bot/types/router_obj.py new file mode 100644 index 00000000..fc46dd33 --- /dev/null +++ b/tg/bot/types/router_obj.py @@ -0,0 +1,12 @@ +class RouterObj: + def __init__(self, router): + self.router = router + + def get_router(self): + return self.router + + def set_dispatcher(self, dp): + self.dispatcher = dp + + def get_dispatcher(self): + return self.dispatcher