"""Matrix Bot Framework""" import asyncio import json from loguru import logger from nio import AsyncClient, LoginResponse class MatrixBot: def __init__(self, config: dict): # TODO: Test configuration for required settings self.config = config self.client = AsyncClient( homeserver=self.config['homeserver'], user=self.config['username'] ) async def send_message(self, message: str): if not self.client.access_token: logged_in = await self.login() if not logged_in: return await self.client.room_send( room_id=self.config['room_id'], message_type="m.room.message", content={ "msgtype": "m.text", "body": message } ) logger.info("Message sent") async def login(self): response = await self.client.login( password=self.config['password'] ) if isinstance(response, LoginResponse): logger.info(f"Logged in as {self.config['username']}") else: logger.error(f"Failed to login as {self.config['username']}: {response}") return False return True async def logout(self): await self.client.logout() await self.client.close() logger.info(f"Logged out from {self.config['homeserver']}") def load_config(path: str) -> dict: with open(path, 'r', encoding='utf-8') as f: return json.loads(f.read()) async def main(): config = load_config("config.json") bot = MatrixBot(config['matrix']) await bot.send_message("beep boop, i'm a bot") await bot.logout() if __name__ == '__main__': asyncio.get_event_loop().run_until_complete(main())