import time
from queue import Queue

from loguru import logger
from web3 import Web3

from util import fetch_abi


class EventMonitor:
    """Monitor Ethereum event logs.

    Builds a ``PairCreated`` event filter on the contract found at
    ``config['pool_address']`` and, while :meth:`log_loop` runs, pushes a
    human-readable message onto ``queue`` for every new event.
    """

    def __init__(self, config: dict, queue: Queue):
        """Connect to the RPC endpoint and create the event filter.

        Args:
            config: Settings mapping; the keys ``'infura_url'`` (HTTP RPC
                endpoint) and ``'pool_address'`` (contract address) are read.
            queue: Queue that receives one message string per handled event.
        """
        self.config = config
        self.queue = queue

        self.web3 = Web3(Web3.HTTPProvider(self.config['infura_url']))

        pool_address = self.config['pool_address']
        # fetch_abi is a project helper; presumably it returns the contract
        # ABI for the given address -- confirm against util.fetch_abi.
        pool_abi = fetch_abi(pool_address)
        self.contract = self.web3.eth.contract(
            address=pool_address,
            abi=pool_abi
        )

        # Only events emitted from the latest block onwards are reported.
        self.event_filter = self.contract.events.PairCreated.create_filter(fromBlock='latest')

    def log_loop(self, interval: int) -> None:
        """Poll the event filter forever, pausing ``interval`` seconds per pass.

        This method never returns. Any exception raised while polling or
        handling events is logged and the loop carries on.

        Args:
            interval: Seconds to sleep between polls.
        """
        while True:
            try:
                if self._check_connection():
                    for event in self.event_filter.get_new_entries():
                        self._handle_event(event)
                else:
                    logger.error("No connection to Ethereum RPC")
            except Exception as e:
                # Top-level boundary of the polling loop: log and keep going.
                logger.error(e)

            # Sleep on every pass -- including when the connection is down --
            # so an unreachable RPC endpoint does not cause a busy loop.
            time.sleep(interval)

    def _check_connection(self) -> bool:
        """Return True if the web3 provider reports a live connection."""
        if self.web3.is_connected():
            logger.debug("Connected to Ethereum RPC")
            return True

        logger.warning("Failed to connect to Ethereum RPC")
        return False

    def _handle_event(self, event) -> None:
        """Log a new pair event and enqueue an Etherscan message for it.

        Args:
            event: Event entry from the filter; ``event.args['token0']`` and
                ``event.args['token1']`` are read.
        """
        # TODO: Get token names
        # TODO: Resolve token names
        # TODO: Create data structure for the event
        token0 = event.args['token0']
        token1 = event.args['token1']

        logger.info(f"New pair: {token0} + {token1}")
        self.queue.put(
            f"New Uniswap pair: https://etherscan.io/address/{token0} + https://etherscan.io/address/{token1}"
        )