import markdown
from loguru import logger
from nio import AsyncClient, LoginResponse, RoomSendResponse


class MatrixBot:
    """Small helper that posts plain-text or Markdown messages to one Matrix room.

    The bot logs in lazily: the first send attempts a password login if the
    underlying nio client holds no access token yet.
    """

    # Settings read from ``config`` somewhere in this class.
    _REQUIRED_SETTINGS = ("homeserver", "username", "password", "room_id")

    def __init__(self, config: dict):
        """Create the nio client from ``config``.

        Args:
            config: Mapping with the keys ``homeserver``, ``username``,
                ``password`` and ``room_id``.

        Raises:
            KeyError: If one or more required settings are missing.
        """
        # Fail early with one clear message instead of a bare KeyError in the
        # middle of a later send or login.
        missing = [key for key in self._REQUIRED_SETTINGS if key not in config]
        if missing:
            raise KeyError(
                f"Missing required Matrix settings: {', '.join(missing)}"
            )

        self.config = config
        self.client = AsyncClient(
            homeserver=self.config['homeserver'],
            user=self.config['username'],
        )

    async def send_message(self, message: str):
        """Send ``message`` to the configured room as plain text.

        Logs in first when needed; returns without sending if login fails.
        """
        if not await self._ensure_logged_in():
            return

        await self._room_send(
            {
                "msgtype": "m.text",
                "body": message,
            },
            "Message sent",
        )

    async def send_markdown(self, message: str):
        """Send ``message`` to the configured room, rendered from Markdown.

        The raw text is kept as ``body`` and the rendered HTML is attached as
        ``formatted_body``. Logs in first when needed; returns without sending
        if login fails.
        """
        if not await self._ensure_logged_in():
            return

        # Render the Markdown source to HTML for the formatted body.
        html = markdown.markdown(message)

        await self._room_send(
            {
                "msgtype": "m.text",
                "body": message,
                "format": "org.matrix.custom.html",
                "formatted_body": html,
            },
            "Markdown message sent",
        )

    async def login(self) -> bool:
        """Log in with the configured password.

        Returns:
            True when the server answered with a ``LoginResponse``; False
            otherwise, in which case the nio session has been closed.
        """
        response = await self.client.login(
            password=self.config['password']
        )

        if not isinstance(response, LoginResponse):
            logger.error(f"Failed to login as {self.config['username']}: {response}")
            logger.error("Closing nio session")
            await self.client.close()
            return False

        logger.info(f"Logged in as {self.config['username']}")
        return True

    async def logout(self):
        """Log out (when logged in) and always close the nio session."""
        logged_out = False
        try:
            # Without an access token there is no session to end on the server.
            if self.client.access_token:
                await self.client.logout()
                logged_out = True
        finally:
            # Close the session even if the logout request raised.
            await self.client.close()

        if logged_out:
            logger.info(f"Logged out from {self.config['homeserver']}")

    async def _ensure_logged_in(self) -> bool:
        """Return True if an access token is present or a login succeeds."""
        if self.client.access_token:
            return True
        return await self.login()

    async def _room_send(self, content: dict, success_message: str):
        """Send an ``m.room.message`` event and log the outcome."""
        response = await self.client.room_send(
            room_id=self.config['room_id'],
            message_type="m.room.message",
            content=content,
        )

        # Anything other than RoomSendResponse is treated as a failed send.
        if isinstance(response, RoomSendResponse):
            logger.info(success_message)
        else:
            logger.error(
                f"Failed to send message to {self.config['room_id']}: {response}"
            )