initial commit

This commit is contained in:
agatha 2024-06-12 20:37:48 -04:00
commit cb0be3719b
8 changed files with 298 additions and 0 deletions

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
.idea/
venv/
config.json
__pycache__
*.py[cod]

14
README.md Normal file
View File

@ -0,0 +1,14 @@
# matrix-alerts
RabbitMQ consumer that sends alerts to Matrix.
## Configuration
```
{
"matrix": {
"homeserver": "",
"username": "",
"password": "",
"room_id": ""
}
}
```

0
src/bots/__init__.py Normal file
View File

90
src/bots/matrix.py Normal file
View File

@ -0,0 +1,90 @@
import markdown
from loguru import logger
from nio import AsyncClient, LoginResponse
class MatrixBot:
def __init__(self, config: dict):
# TODO: Test configuration for required settings
self.config = config
self.client = AsyncClient(
homeserver=self.config['homeserver'],
user=self.config['username']
)
self.logged_in = False
async def ensure_logged_in(self):
if not self.logged_in:
try:
response = await self.client.login(password=self.config['password'])
if isinstance(response, LoginResponse):
self.logged_in = True
logger.info(f"Logged in as {self.config['username']}")
else:
logger.error(f"Failed to login as {self.config['username']}: {response}")
logger.error("Closing nio session")
await self.client.close()
except Exception as e:
logger.error(f"Exception during login: {e}")
await self.client.close()
raise
async def send_message(self, message: str):
await self.ensure_logged_in()
if not self.logged_in:
logger.error("Unable to send message, login failed")
return
try:
await self.client.room_send(
room_id=self.config['room_id'],
message_type="m.room.message",
content={
"msgtype": "m.text",
"body": message
}
)
logger.info("Message sent")
except Exception as e:
logger.error(f"Exception during sending message: {e}")
raise
async def send_markdown(self, message: str):
await self.ensure_logged_in()
if not self.logged_in:
logger.error("Unable to send message, login failed")
return
try:
# Convert message to markdown
html = markdown.markdown(message)
# Send markdown formatted message
await self.client.room_send(
room_id=self.config['room_id'],
message_type="m.room.message",
content={
"msgtype": "m.text",
"body": message,
"format": "org.matrix.custom.html",
"formatted_body": html
}
)
logger.info("Markdown message sent")
except Exception as e:
logger.error(f"Exception during sending markdown message: {e}")
raise
async def close(self):
if self.logged_in:
try:
await self.client.logout()
self.logged_in = False
logger.info(f"Logged out from {self.config['homeserver']}")
except Exception as e:
logger.error(f"Exception during logout: {e}")
finally:
await self.client.close() # Ensure the client is closed

92
src/main.py Normal file
View File

@ -0,0 +1,92 @@
import json
import asyncio
import signal
from loguru import logger
from bots.matrix import MatrixBot
from rmq.consumers import RMQConsumer
def load_config(path: str) -> dict:
with open(path, 'r', encoding='utf-8') as f:
return json.loads(f.read())
async def main():
# Load configuration
config = load_config('config.json')
# Initialize Matrix bot
bot = MatrixBot(config['matrix'])
# Initialize RabbitMQ consumer
consumer = RMQConsumer(
host='localhost',
exchange='matrix_alerts'
)
# RabbitMQ message handler
async def on_message(ch, method, properties, body):
try:
logger.debug(f"Event received: {body}")
message = json.loads(body)
await bot.send_markdown(message['body'])
logger.debug(f"Message sent to Matrix: {message['body']}")
except Exception as e:
logger.error(f"Failed to send message to Matrix: {e}")
# Ensure cleanup on shutdown
def shutdown():
logger.info("Shutdown initiated.")
all_tasks = asyncio.all_tasks(loop)
for task in all_tasks:
task.cancel()
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, shutdown)
loop.add_signal_handler(signal.SIGTERM, shutdown)
# Start the RabbitMQ consumer
consume_task = asyncio.create_task(consumer.start_consuming(on_message))
try:
# Await the consume task to keep the main running until shutdown
await consume_task
except asyncio.CancelledError:
logger.info("Main task cancellation requested.")
finally:
logger.info("Stopping consumer and cleaning up resources.")
# Stop the RabbitMQ consumer if it is running
if consumer.channel:
try:
await consumer.stop_consuming()
except Exception as e:
logger.warning(f"Failed to stop consumer cleanly: {e}")
# Close the Matrix bot
logger.info("Shutting down bot.")
await bot.close()
logger.info("Shutdown complete.")
def main_with_shutdown():
loop = asyncio.get_event_loop()
main_task = loop.create_task(main())
try:
loop.run_until_complete(main_task)
except asyncio.CancelledError:
logger.info("Main task has been cancelled.")
finally:
pending_tasks = [t for t in asyncio.all_tasks(loop) if not t.done()]
if pending_tasks:
loop.run_until_complete(asyncio.gather(*pending_tasks, return_exceptions=True))
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
logger.info("Event loop closed.")
if __name__ == '__main__':
main_with_shutdown()

4
src/requirements.txt Normal file
View File

@ -0,0 +1,4 @@
matrix-nio
loguru
markdown
pika

0
src/rmq/__init__.py Normal file
View File

93
src/rmq/consumers.py Normal file
View File

@ -0,0 +1,93 @@
import asyncio
import pika
from loguru import logger
from pika.channel import Channel
from typing import Callable, Optional, Awaitable
class RMQConsumer:
"""
A handler class for RabbitMQ to manage connection and message consumption.
Attributes:
host (str): RabbitMQ server host.
queue_name (str): Name of the queue.
exchange (str): Name of the exchange.
exchange_type (str): Type of the exchange (default is 'fanout').
port (int): Port number for RabbitMQ server (default is 5672).
virtual_host (str): Virtual host (default is '/').
username (str): Username for RabbitMQ (default is 'guest').
password (str): Password for RabbitMQ (default is 'guest').
"""
def __init__(self, host: str, queue_name: str = '', exchange: str = '', exchange_type: str = 'fanout',
port: int = 5672, virtual_host: str = '/', username: str = 'guest', password: str = 'guest'):
self.host = host
self.queue_name = queue_name
self.exchange = exchange
self.exchange_type = exchange_type
self.port = port
self.virtual_host = virtual_host
self.username = username
self.password = password
self.connection: Optional[pika.BlockingConnection] = None
self.channel: Optional[Channel] = None
def connect(self) -> None:
"""Establish a connection to RabbitMQ and set up the channel and queue."""
credentials = pika.PlainCredentials(self.username, self.password)
parameters = pika.ConnectionParameters(
host=self.host,
port=self.port,
virtual_host=self.virtual_host,
credentials=credentials
)
try:
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange=self.exchange, exchange_type=self.exchange_type)
result = self.channel.queue_declare(queue=self.queue_name, durable=True)
self.queue_name = result.method.queue
self.channel.queue_bind(exchange=self.exchange, queue=self.queue_name)
except pika.exceptions.AMQPError as error:
logger.error(f"Error connecting to RabbitMQ: {error}")
async def start_consuming(self, on_message_callback: Callable[
[Channel, pika.spec.Basic.Deliver, pika.spec.BasicProperties, bytes], Awaitable[None]]
) -> None:
"""
Start consuming messages from the queue.
Args:
on_message_callback (Callable): User-defined callback function for processing messages.
"""
def callback_wrapper(ch, method, properties, body):
asyncio.create_task(on_message_callback(ch, method, properties, body))
self.connect()
if not self.channel:
logger.error("Failed to connect to RabbitMQ.")
return
self.channel.basic_consume(queue=self.queue_name, on_message_callback=callback_wrapper, auto_ack=True)
await self._consume_async()
async def stop_consuming(self) -> None:
"""Stop consuming messages and close the connection."""
if self.channel and self.connection:
self.channel.stop_consuming()
await self.close()
async def _consume_async(self) -> None:
"""Asynchronously process events."""
while self.channel and self.channel.is_open:
self.connection.process_data_events(time_limit=1)
await asyncio.sleep(0.1)
async def close(self) -> None:
"""Close the RabbitMQ connection."""
if self.connection and not self.connection.is_closed:
self.connection.close()