This commit is contained in:
agatha 2024-06-12 22:45:54 -04:00
parent cb64c49092
commit 223eeff0be
3 changed files with 74 additions and 20 deletions

View File

@ -1,6 +1,7 @@
import asyncio import asyncio
import os import os
import sys import sys
import time
import threading import threading
from queue import Queue from queue import Queue
from loguru import logger from loguru import logger
@ -15,6 +16,10 @@ def init_logger():
async def main(): async def main():
# LAZY HACK THIS IS BAD
# TODO: Get rid of this trash and add retries if RabbitMQ isn't ready
time.sleep(5)
config = load_config("config.json") config = load_config("config.json")
queue = Queue() queue = Queue()
monitor = EventMonitor( monitor = EventMonitor(
@ -36,8 +41,13 @@ async def main():
while True: while True:
alert_text = queue.get() alert_text = queue.get()
producer.send_event(message={'body': alert_text}) try:
queue.task_done() producer.send_event(message={'body': alert_text})
queue.task_done()
except Exception as e:
logger.error(f"bigtime issue here: {e}")
queue.put(alert_text)
# try: # try:
# await bot.send_markdown(event) # await bot.send_markdown(event)

View File

@ -1,3 +1,4 @@
pika
loguru loguru
web3 web3
requests requests

View File

@ -1,5 +1,6 @@
import json import json
import pika import pika
import time
from loguru import logger from loguru import logger
@ -10,32 +11,74 @@ class RabbitMQProducer:
self.exchange = exchange self.exchange = exchange
self.exchange_type = exchange_type self.exchange_type = exchange_type
self.max_retries = 5
self.initial_delay = 1 # Initial delay in seconds for exponential backoff
self.connection = None self.connection = None
self.channel = None self.channel = None
self.connect() self.connect()
def connect(self): def connect(self):
try: retries = 0
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, port=self.port)) while retries < self.max_retries:
self.channel = self.connection.channel() try:
self.channel.exchange_declare(exchange=self.exchange, exchange_type=self.exchange_type) self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, port=self.port))
except Exception as e: self.channel = self.connection.channel()
logger.warning(f"Could not connect to RabbitMQ: {e}") self.channel.exchange_declare(exchange=self.exchange, exchange_type=self.exchange_type)
logger.info(f"Successfully connected to RabbitMQ at {self.host}:{self.port}")
return
except Exception as e:
retries += 1
wait_time = self.initial_delay * (2 ** (retries - 1))
logger.warning(f"Could not connect to RabbitMQ (attempt {retries}/{self.max_retries}): {e}")
if retries < self.max_retries:
logger.info(f"Retrying in {wait_time} seconds...")
time.sleep(wait_time)
else:
logger.error("All connection attempts failed.")
raise
def send_event(self, message: dict): def send_event(self, message: dict):
if self.connection.is_closed: retries = 0
self.connect()
try: while retries < self.max_retries:
self.channel.basic_publish( if self.connection is None or not self.connection.is_open:
exchange=self.exchange, logger.info("Connection is closed. Reconnecting...")
routing_key='', # Empty routing key for fanout exchange self.connect()
body=json.dumps(message),
properties=pika.BasicProperties(content_type='application/json') try:
) self.channel.basic_publish(
except Exception as e: exchange=self.exchange,
logger.warning(f"Failed to send message: {e}") routing_key='', # Empty routing key for fanout exchange
body=json.dumps(message),
properties=pika.BasicProperties(content_type='application/json')
)
logger.info(f"Message sent: {message}")
return
except pika.exceptions.AMQPConnectionError as e:
retries += 1
wait_time = self.initial_delay * (2 ** (retries - 1))
logger.warning(f"Connection error (attempt {retries}/{self.max_retries}): {e}")
if retries < self.max_retries:
logger.info(f"Retrying in {wait_time} seconds...")
self.connect()
time.sleep(wait_time)
else:
logger.error("All retry attempts failed.")
raise
except Exception as e:
logger.warning(f"Failed to send message: {e}")
raise
def close_connection(self): def close_connection(self):
if self.connection and not self.connection.is_closed: if self.connection and self.connection.is_open:
self.connection.close() self.connection.close()
# Usage example:
if __name__ == "__main__":
producer = RabbitMQProducer(host='localhost', port=5672, exchange='logs')
try:
producer.send_event({"message": "Hello, RabbitMQ!"})
finally:
producer.close_connection()