import time
from queue import Queue

from loguru import logger
from web3 import Web3

from util import fetch_abi
from db import Token, Pair, get_session


class EventMonitor:
    """Monitor Ethereum event logs.

    Polls a ``PairCreated`` event filter on one contract, stores the two
    tokens and the pair via the ``db`` module, and pushes a Markdown alert
    string onto a queue for every event that is handled.
    """

    def __init__(self, config: dict, queue: Queue):
        """Connect to the RPC endpoint and install the ``PairCreated`` filter.

        Args:
            config: Settings mapping. The keys ``infura_url``,
                ``pool_address`` and ``etherscan_key`` are read.
            queue: Queue that receives one alert string per handled event.
        """
        self.config = config
        self.queue = queue
        self.web3 = Web3(Web3.HTTPProvider(self.config['infura_url']))

        pool_address = self.config['pool_address']
        pool_abi = fetch_abi(pool_address, key=self.config['etherscan_key'])
        self.contract = self.web3.eth.contract(
            address=pool_address,
            abi=pool_abi
        )
        # The filter is created with fromBlock='latest', so it is not asked
        # for historical events.
        self.event_filter = self.contract.events.PairCreated.create_filter(fromBlock='latest')

    def log_loop(self, interval: int) -> None:
        """Poll the event filter forever, sleeping between polls.

        This method never returns; exceptions are logged, not propagated.

        Args:
            interval: Seconds to sleep after every poll attempt.
        """
        while True:
            try:
                if self._check_connection():
                    for event in self.event_filter.get_new_entries():
                        # Isolate each event: the entries have already been
                        # taken from the filter, so one bad event must not
                        # discard the rest of the batch.
                        try:
                            self._handle_event(event)
                        except Exception as err:
                            logger.error(f"Failed to handle event: {err}")
                else:
                    logger.error("No connection to Ethereum RPC")
            except Exception as e:
                logger.error(e)
            # Sleep on every path, including "not connected"; otherwise a
            # dropped connection turns this loop into a busy spin.
            time.sleep(interval)

    def fetch_token_info(self, token_address) -> dict:
        """Resolve name, symbol and total supply of a token contract.

        Lookup is best-effort: any field that cannot be resolved is ``None``
        and the failure is logged as a warning.

        Args:
            token_address: Address of the token contract.

        Returns:
            Dict with the keys ``name``, ``symbol`` and ``total_supply``.
            ``total_supply`` is converted with ``str()`` when available.
        """
        unknown = {
            'name': None,
            'symbol': None,
            'total_supply': None
        }

        try:
            token_abi = fetch_abi(token_address, key=self.config['etherscan_key'])
        except Exception as err:
            logger.warning(f"Failed to fetch info for {token_address}: {err}")
            token_abi = None

        if not token_abi:
            return unknown

        # Create the contract instance
        try:
            token_contract = self.web3.eth.contract(address=token_address, abi=token_abi)
        except Exception as err:
            logger.warning(f"Failed to fetch info for {token_address}: {err}")
            return unknown

        # Fetch the name, symbol, and total supply; each call is guarded on
        # its own so one missing or reverting function does not lose the rest.
        name = self._call_token_function(token_contract, 'name', token_address)
        symbol = self._call_token_function(token_contract, 'symbol', token_address)
        total_supply = self._call_token_function(token_contract, 'totalSupply', token_address)

        return {
            'name': name,
            'symbol': symbol,
            'total_supply': None if total_supply is None else str(total_supply)
        }

    def _call_token_function(self, token_contract, function_name: str, token_address):
        """Call a zero-argument contract function, returning None on failure."""
        try:
            return getattr(token_contract.functions, function_name)().call()
        except Exception as err:
            logger.warning(f"Failed to call {function_name}() on {token_address}: {err}")
            return None

    def _check_connection(self) -> bool:
        """Return True if the RPC endpoint is reachable, logging the result."""
        if self.web3.is_connected():
            logger.debug("Connected to Ethereum RPC")
            return True
        logger.warning("Failed to connect to Ethereum RPC")
        return False

    def _persist(self, record, label: str) -> None:
        """Add one record in its own session, logging instead of raising.

        A failed add/commit is rolled back, and the generator returned by
        ``get_session()`` is closed afterwards so that any cleanup it defines
        can run.

        Args:
            record: Object handed to ``session.add``.
            label: Human-readable name used in the warning message.
        """
        session_source = get_session()
        try:
            session = next(session_source)
            try:
                session.add(record)
                session.commit()
            except Exception:
                session.rollback()
                raise
        except Exception as err:
            logger.warning(f"Could not add {label}: {err}")
        finally:
            # get_session() is consumed with next(), so it is an iterator;
            # close it when it supports that (generators do).
            close = getattr(session_source, 'close', None)
            if close is not None:
                try:
                    close()
                except Exception as err:
                    logger.warning(f"Could not close session for {label}: {err}")

    def _handle_event(self, event) -> None:
        """Store the tokens and pair of a ``PairCreated`` event and alert.

        Database writes are best-effort: each record is written separately
        and a failure is only logged, so the alert is queued regardless.

        Args:
            event: Event log entry; ``args['token0']``, ``args['token1']``
                and ``args['pair']`` are read.
        """
        # Get token addresses from the event
        token0_address = event.args['token0']
        token1_address = event.args['token1']
        pair_address = event.args['pair']

        # Try to resolve name, symbol, and supply
        token0_info = self.fetch_token_info(token0_address)
        token1_info = self.fetch_token_info(token1_address)

        # Create SQLAlchemy objects
        new_token0 = Token(
            id=token0_address,
            name=token0_info['name'],
            symbol=token0_info['symbol'],
            total_supply=token0_info['total_supply']
        )
        new_token1 = Token(
            id=token1_address,
            name=token1_info['name'],
            symbol=token1_info['symbol'],
            total_supply=token1_info['total_supply']
        )
        new_pair = Pair(id=pair_address, token0=token0_address, token1=token1_address)

        self._persist(new_token0, "token0")
        self._persist(new_token1, "token1")
        self._persist(new_pair, "new pair")

        # Add alert to queue. Fall back to the address as link text when the
        # name could not be resolved, instead of rendering "[None](...)".
        token0_label = token0_info['name'] or token0_address
        token1_label = token1_info['name'] or token1_address
        logger.info(f"New pair: {token0_address} + {token1_address}")
        self.queue.put(
            f"New Uniswap pair: [{token0_label}](https://etherscan.io/address/{token0_address}) + "
            f"[{token1_label}](https://etherscan.io/address/{token1_address})"
        )