Compare commits

...

2 Commits

Author SHA1 Message Date
223eeff0be thx gpt 2024-06-12 22:45:54 -04:00
cb64c49092 use env var for rabbitmq host 2024-06-12 22:00:21 -04:00
3 changed files with 76 additions and 21 deletions

View File

@ -1,5 +1,7 @@
import asyncio import asyncio
import os
import sys import sys
import time
import threading import threading
from queue import Queue from queue import Queue
from loguru import logger from loguru import logger
@ -14,6 +16,10 @@ def init_logger():
async def main(): async def main():
# LAZY HACK THIS IS BAD
# TODO: Get rid of this trash and add retries if RabbitMQ isn't ready
time.sleep(5)
config = load_config("config.json") config = load_config("config.json")
queue = Queue() queue = Queue()
monitor = EventMonitor( monitor = EventMonitor(
@ -28,15 +34,20 @@ async def main():
monitor_thread.start() monitor_thread.start()
# RabbitMQ # RabbitMQ
host = 'localhost' host = os.getenv("RABBITMQ_HOST", "localhost")
port = 5672 port = 5672
exchange = 'matrix_alerts' exchange = 'matrix_alerts'
producer = RabbitMQProducer(host=host, port=port, exchange=exchange) producer = RabbitMQProducer(host=host, port=port, exchange=exchange)
while True: while True:
alert_text = queue.get() alert_text = queue.get()
producer.send_event(message={'body': alert_text}) try:
queue.task_done() producer.send_event(message={'body': alert_text})
queue.task_done()
except Exception as e:
logger.error(f"bigtime issue here: {e}")
queue.put(alert_text)
# try: # try:
# await bot.send_markdown(event) # await bot.send_markdown(event)

View File

@ -1,3 +1,4 @@
pika
loguru loguru
web3 web3
requests requests

View File

@ -1,5 +1,6 @@
import json import json
import pika import pika
import time
from loguru import logger from loguru import logger
@ -10,32 +11,74 @@ class RabbitMQProducer:
self.exchange = exchange self.exchange = exchange
self.exchange_type = exchange_type self.exchange_type = exchange_type
self.max_retries = 5
self.initial_delay = 1 # Initial delay in seconds for exponential backoff
self.connection = None self.connection = None
self.channel = None self.channel = None
self.connect() self.connect()
def connect(self): def connect(self):
try: retries = 0
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, port=self.port)) while retries < self.max_retries:
self.channel = self.connection.channel() try:
self.channel.exchange_declare(exchange=self.exchange, exchange_type=self.exchange_type) self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, port=self.port))
except Exception as e: self.channel = self.connection.channel()
logger.warning(f"Could not connect to RabbitMQ: {e}") self.channel.exchange_declare(exchange=self.exchange, exchange_type=self.exchange_type)
logger.info(f"Successfully connected to RabbitMQ at {self.host}:{self.port}")
return
except Exception as e:
retries += 1
wait_time = self.initial_delay * (2 ** (retries - 1))
logger.warning(f"Could not connect to RabbitMQ (attempt {retries}/{self.max_retries}): {e}")
if retries < self.max_retries:
logger.info(f"Retrying in {wait_time} seconds...")
time.sleep(wait_time)
else:
logger.error("All connection attempts failed.")
raise
def send_event(self, message: dict): def send_event(self, message: dict):
if self.connection.is_closed: retries = 0
self.connect()
try: while retries < self.max_retries:
self.channel.basic_publish( if self.connection is None or not self.connection.is_open:
exchange=self.exchange, logger.info("Connection is closed. Reconnecting...")
routing_key='', # Empty routing key for fanout exchange self.connect()
body=json.dumps(message),
properties=pika.BasicProperties(content_type='application/json') try:
) self.channel.basic_publish(
except Exception as e: exchange=self.exchange,
logger.warning(f"Failed to send message: {e}") routing_key='', # Empty routing key for fanout exchange
body=json.dumps(message),
properties=pika.BasicProperties(content_type='application/json')
)
logger.info(f"Message sent: {message}")
return
except pika.exceptions.AMQPConnectionError as e:
retries += 1
wait_time = self.initial_delay * (2 ** (retries - 1))
logger.warning(f"Connection error (attempt {retries}/{self.max_retries}): {e}")
if retries < self.max_retries:
logger.info(f"Retrying in {wait_time} seconds...")
self.connect()
time.sleep(wait_time)
else:
logger.error("All retry attempts failed.")
raise
except Exception as e:
logger.warning(f"Failed to send message: {e}")
raise
def close_connection(self): def close_connection(self):
if self.connection and not self.connection.is_closed: if self.connection and self.connection.is_open:
self.connection.close() self.connection.close()
# Usage example:
if __name__ == "__main__":
producer = RabbitMQProducer(host='localhost', port=5672, exchange='logs')
try:
producer.send_event({"message": "Hello, RabbitMQ!"})
finally:
producer.close_connection()