From 223eeff0be12ed8e6749550bfdc9e74c37556f70 Mon Sep 17 00:00:00 2001 From: agatha Date: Wed, 12 Jun 2024 22:45:54 -0400 Subject: [PATCH] thx gpt --- src/main.py | 14 ++++++-- src/requirements.txt | 1 + src/rmq.py | 79 ++++++++++++++++++++++++++++++++++---------- 3 files changed, 74 insertions(+), 20 deletions(-) diff --git a/src/main.py b/src/main.py index e8555d4..1db91b8 100644 --- a/src/main.py +++ b/src/main.py @@ -1,6 +1,7 @@ import asyncio import os import sys +import time import threading from queue import Queue from loguru import logger @@ -15,6 +16,10 @@ def init_logger(): async def main(): + # LAZY HACK THIS IS BAD + # TODO: Get rid of this trash and add retries if RabbitMQ isn't ready + time.sleep(5) + config = load_config("config.json") queue = Queue() monitor = EventMonitor( @@ -36,8 +41,13 @@ async def main(): while True: alert_text = queue.get() - producer.send_event(message={'body': alert_text}) - queue.task_done() + try: + producer.send_event(message={'body': alert_text}) + queue.task_done() + except Exception as e: + logger.error(f"bigtime issue here: {e}") + queue.put(alert_text) + # try: # await bot.send_markdown(event) diff --git a/src/requirements.txt b/src/requirements.txt index 10082b8..961bf3e 100644 --- a/src/requirements.txt +++ b/src/requirements.txt @@ -1,3 +1,4 @@ +pika loguru web3 requests diff --git a/src/rmq.py b/src/rmq.py index 779f1d3..d51c0c9 100644 --- a/src/rmq.py +++ b/src/rmq.py @@ -1,5 +1,6 @@ import json import pika +import time from loguru import logger @@ -10,32 +11,74 @@ class RabbitMQProducer: self.exchange = exchange self.exchange_type = exchange_type + self.max_retries = 5 + self.initial_delay = 1 # Initial delay in seconds for exponential backoff + self.connection = None self.channel = None self.connect() def connect(self): - try: - self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, port=self.port)) - self.channel = self.connection.channel() - self.channel.exchange_declare(exchange=self.exchange, exchange_type=self.exchange_type) - except Exception as e: - logger.warning(f"Could not connect to RabbitMQ: {e}") + retries = 0 + while retries < self.max_retries: + try: + self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, port=self.port)) + self.channel = self.connection.channel() + self.channel.exchange_declare(exchange=self.exchange, exchange_type=self.exchange_type) + logger.info(f"Successfully connected to RabbitMQ at {self.host}:{self.port}") + return + except Exception as e: + retries += 1 + wait_time = self.initial_delay * (2 ** (retries - 1)) + logger.warning(f"Could not connect to RabbitMQ (attempt {retries}/{self.max_retries}): {e}") + if retries < self.max_retries: + logger.info(f"Retrying in {wait_time} seconds...") + time.sleep(wait_time) + else: + logger.error("All connection attempts failed.") + raise def send_event(self, message: dict): - if self.connection.is_closed: - self.connect() + retries = 0 - try: - self.channel.basic_publish( - exchange=self.exchange, - routing_key='', # Empty routing key for fanout exchange - body=json.dumps(message), - properties=pika.BasicProperties(content_type='application/json') - ) - except Exception as e: - logger.warning(f"Failed to send message: {e}") + while retries < self.max_retries: + if self.connection is None or not self.connection.is_open: + logger.info("Connection is closed. Reconnecting...") + self.connect() + + try: + self.channel.basic_publish( + exchange=self.exchange, + routing_key='', # Empty routing key for fanout exchange + body=json.dumps(message), + properties=pika.BasicProperties(content_type='application/json') + ) + logger.info(f"Message sent: {message}") + return + except pika.exceptions.AMQPConnectionError as e: + retries += 1 + wait_time = self.initial_delay * (2 ** (retries - 1)) + logger.warning(f"Connection error (attempt {retries}/{self.max_retries}): {e}") + if retries < self.max_retries: + logger.info(f"Retrying in {wait_time} seconds...") + self.connect() + time.sleep(wait_time) + else: + logger.error("All retry attempts failed.") + raise + except Exception as e: + logger.warning(f"Failed to send message: {e}") + raise def close_connection(self): - if self.connection and not self.connection.is_closed: + if self.connection and self.connection.is_open: self.connection.close() + + +# Usage example: +if __name__ == "__main__": + producer = RabbitMQProducer(host='localhost', port=5672, exchange='logs') + try: + producer.send_event({"message": "Hello, RabbitMQ!"}) + finally: + producer.close_connection()