commit cb0be3719b68cb6b2ac5d9dc22d700a0ad71ee80 Author: agatha Date: Wed Jun 12 20:37:48 2024 -0400 initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..345c4eb --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.idea/ +venv/ +config.json +__pycache__ +*.py[cod] diff --git a/README.md b/README.md new file mode 100644 index 0000000..6d914ac --- /dev/null +++ b/README.md @@ -0,0 +1,14 @@ +# matrix-alerts +RabbitMQ consumer that sends alerts to Matrix. + +## Configuration +``` +{ + "matrix": { + "homeserver": "", + "username": "", + "password": "", + "room_id": "" + } +} +``` \ No newline at end of file diff --git a/src/bots/__init__.py b/src/bots/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/bots/matrix.py b/src/bots/matrix.py new file mode 100644 index 0000000..7316d1a --- /dev/null +++ b/src/bots/matrix.py @@ -0,0 +1,90 @@ +import markdown +from loguru import logger +from nio import AsyncClient, LoginResponse + + +class MatrixBot: + def __init__(self, config: dict): + # TODO: Test configuration for required settings + self.config = config + + self.client = AsyncClient( + homeserver=self.config['homeserver'], + user=self.config['username'] + ) + self.logged_in = False + + async def ensure_logged_in(self): + if not self.logged_in: + try: + response = await self.client.login(password=self.config['password']) + if isinstance(response, LoginResponse): + self.logged_in = True + logger.info(f"Logged in as {self.config['username']}") + else: + logger.error(f"Failed to login as {self.config['username']}: {response}") + logger.error("Closing nio session") + await self.client.close() + except Exception as e: + logger.error(f"Exception during login: {e}") + await self.client.close() + raise + + async def send_message(self, message: str): + await self.ensure_logged_in() + + if not self.logged_in: + logger.error("Unable to send message, login failed") + return + + try: + await self.client.room_send( + room_id=self.config['room_id'], + message_type="m.room.message", + content={ + "msgtype": "m.text", + "body": message + } + ) + logger.info("Message sent") + except Exception as e: + logger.error(f"Exception during sending message: {e}") + raise + + async def send_markdown(self, message: str): + await self.ensure_logged_in() + + if not self.logged_in: + logger.error("Unable to send message, login failed") + return + + try: + # Convert message to markdown + html = markdown.markdown(message) + + # Send markdown formatted message + await self.client.room_send( + room_id=self.config['room_id'], + message_type="m.room.message", + content={ + "msgtype": "m.text", + "body": message, + "format": "org.matrix.custom.html", + "formatted_body": html + } + ) + logger.info("Markdown message sent") + except Exception as e: + logger.error(f"Exception during sending markdown message: {e}") + raise + + async def close(self): + if self.logged_in: + try: + await self.client.logout() + self.logged_in = False + logger.info(f"Logged out from {self.config['homeserver']}") + except Exception as e: + logger.error(f"Exception during logout: {e}") + finally: + await self.client.close() # Ensure the client is closed diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..1e90f4c --- /dev/null +++ b/src/main.py @@ -0,0 +1,92 @@ +import json +import asyncio +import signal +from loguru import logger + +from bots.matrix import MatrixBot +from rmq.consumers import RMQConsumer + + +def load_config(path: str) -> dict: + with open(path, 'r', encoding='utf-8') as f: + return json.loads(f.read()) + + +async def main(): + # Load configuration + config = load_config('config.json') + + # Initialize Matrix bot + bot = MatrixBot(config['matrix']) + + # Initialize RabbitMQ consumer + consumer = RMQConsumer( + host='localhost', + exchange='matrix_alerts' + ) + + # RabbitMQ message handler + async def on_message(ch, method, properties, body): + try: + logger.debug(f"Event received: {body}") + message = json.loads(body) + await bot.send_markdown(message['body']) + logger.debug(f"Message sent to Matrix: {message['body']}") + except Exception as e: + logger.error(f"Failed to send message to Matrix: {e}") + + # Ensure cleanup on shutdown + def shutdown(): + logger.info("Shutdown initiated.") + all_tasks = asyncio.all_tasks(loop) + for task in all_tasks: + task.cancel() + + loop = asyncio.get_event_loop() + loop.add_signal_handler(signal.SIGINT, shutdown) + loop.add_signal_handler(signal.SIGTERM, shutdown) + + # Start the RabbitMQ consumer + consume_task = asyncio.create_task(consumer.start_consuming(on_message)) + + try: + # Await the consume task to keep the main running until shutdown + await consume_task + except asyncio.CancelledError: + logger.info("Main task cancellation requested.") + finally: + logger.info("Stopping consumer and cleaning up resources.") + + # Stop the RabbitMQ consumer if it is running + if consumer.channel: + try: + await consumer.stop_consuming() + except Exception as e: + logger.warning(f"Failed to stop consumer cleanly: {e}") + + # Close the Matrix bot + logger.info("Shutting down bot.") + await bot.close() + + logger.info("Shutdown complete.") + + +def main_with_shutdown(): + loop = asyncio.get_event_loop() + main_task = loop.create_task(main()) + + try: + loop.run_until_complete(main_task) + except asyncio.CancelledError: + logger.info("Main task has been cancelled.") + finally: + pending_tasks = [t for t in asyncio.all_tasks(loop) if not t.done()] + if pending_tasks: + loop.run_until_complete(asyncio.gather(*pending_tasks, return_exceptions=True)) + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.close() + logger.info("Event loop closed.") + + +if __name__ == '__main__': + main_with_shutdown() diff --git a/src/requirements.txt b/src/requirements.txt new file mode 100644 index 0000000..29a7c04 --- /dev/null +++ b/src/requirements.txt @@ -0,0 +1,4 @@ +matrix-nio +loguru +markdown +pika \ No newline at end of file diff --git a/src/rmq/__init__.py b/src/rmq/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/rmq/consumers.py b/src/rmq/consumers.py new file mode 100644 index 0000000..00d283f --- /dev/null +++ b/src/rmq/consumers.py @@ -0,0 +1,93 @@ +import asyncio +import pika +from loguru import logger +from pika.channel import Channel +from typing import Callable, Optional, Awaitable + + +class RMQConsumer: + """ + A handler class for RabbitMQ to manage connection and message consumption. + + Attributes: + host (str): RabbitMQ server host. + queue_name (str): Name of the queue. + exchange (str): Name of the exchange. + exchange_type (str): Type of the exchange (default is 'fanout'). + port (int): Port number for RabbitMQ server (default is 5672). + virtual_host (str): Virtual host (default is '/'). + username (str): Username for RabbitMQ (default is 'guest'). + password (str): Password for RabbitMQ (default is 'guest'). + """ + + def __init__(self, host: str, queue_name: str = '', exchange: str = '', exchange_type: str = 'fanout', + port: int = 5672, virtual_host: str = '/', username: str = 'guest', password: str = 'guest'): + self.host = host + self.queue_name = queue_name + self.exchange = exchange + self.exchange_type = exchange_type + self.port = port + self.virtual_host = virtual_host + self.username = username + self.password = password + self.connection: Optional[pika.BlockingConnection] = None + self.channel: Optional[Channel] = None + + def connect(self) -> None: + """Establish a connection to RabbitMQ and set up the channel and queue.""" + credentials = pika.PlainCredentials(self.username, self.password) + parameters = pika.ConnectionParameters( + host=self.host, + port=self.port, + virtual_host=self.virtual_host, + credentials=credentials + ) + + try: + self.connection = pika.BlockingConnection(parameters) + self.channel = self.connection.channel() + self.channel.exchange_declare(exchange=self.exchange, exchange_type=self.exchange_type) + + result = self.channel.queue_declare(queue=self.queue_name, durable=True) + self.queue_name = result.method.queue + self.channel.queue_bind(exchange=self.exchange, queue=self.queue_name) + except pika.exceptions.AMQPError as error: + logger.error(f"Error connecting to RabbitMQ: {error}") + + async def start_consuming(self, on_message_callback: Callable[ + [Channel, pika.spec.Basic.Deliver, pika.spec.BasicProperties, bytes], Awaitable[None]] + ) -> None: + """ + Start consuming messages from the queue. + + Args: + on_message_callback (Callable): User-defined callback function for processing messages. + """ + + def callback_wrapper(ch, method, properties, body): + asyncio.create_task(on_message_callback(ch, method, properties, body)) + + self.connect() + if not self.channel: + logger.error("Failed to connect to RabbitMQ.") + return + + self.channel.basic_consume(queue=self.queue_name, on_message_callback=callback_wrapper, auto_ack=True) + await self._consume_async() + + async def stop_consuming(self) -> None: + """Stop consuming messages and close the connection.""" + if self.channel and self.connection: + self.channel.stop_consuming() + await self.close() + + async def _consume_async(self) -> None: + """Asynchronously process events.""" + while self.channel and self.channel.is_open: + self.connection.process_data_events(time_limit=1) + await asyncio.sleep(0.1) + + async def close(self) -> None: + """Close the RabbitMQ connection.""" + if self.connection and not self.connection.is_closed: + self.connection.close()