126 lines
4.1 KiB
Python
126 lines
4.1 KiB
Python
import time
|
|
from queue import Queue
|
|
from loguru import logger
|
|
from web3 import Web3
|
|
from util import fetch_abi
|
|
from db import Token, Pair, get_session
|
|
|
|
|
|
class EventMonitor:
|
|
"""Monitor Ethereum event logs"""
|
|
def __init__(self, config: dict, queue: Queue):
|
|
self.config = config
|
|
self.queue = queue
|
|
|
|
self.web3 = Web3(Web3.HTTPProvider(self.config['infura_url']))
|
|
|
|
pool_address = self.config['pool_address']
|
|
pool_abi = fetch_abi(pool_address)
|
|
self.contract = self.web3.eth.contract(
|
|
address=pool_address,
|
|
abi=pool_abi
|
|
)
|
|
self.event_filter = self.contract.events.PairCreated.create_filter(fromBlock='latest')
|
|
|
|
def log_loop(self, interval: int):
|
|
while True:
|
|
try:
|
|
if not self._check_connection():
|
|
logger.error("No connection to Ethereum RPC")
|
|
continue
|
|
for event in self.event_filter.get_new_entries():
|
|
self._handle_event(event)
|
|
except Exception as e:
|
|
logger.error(e)
|
|
|
|
time.sleep(interval)
|
|
|
|
def fetch_token_info(self, token_address):
|
|
try:
|
|
token_abi = fetch_abi(token_address)
|
|
except Exception as err:
|
|
logger.warning(f"Failed to fetch info for {token_address}: {err}")
|
|
return {
|
|
'name': None,
|
|
'symbol': None,
|
|
'total_supply': None
|
|
}
|
|
|
|
# Create the contract instance
|
|
token_contract = self.web3.eth.contract(address=token_address, abi=token_abi)
|
|
|
|
# Fetch the name, symbol, and total supply
|
|
name = token_contract.functions.name().call()
|
|
symbol = token_contract.functions.symbol().call()
|
|
total_supply = token_contract.functions.totalSupply().call()
|
|
|
|
return {
|
|
'name': name,
|
|
'symbol': symbol,
|
|
'total_supply': str(total_supply)
|
|
}
|
|
|
|
def _check_connection(self):
|
|
if self.web3.is_connected():
|
|
logger.debug("Connected to Ethereum RPC")
|
|
return True
|
|
else:
|
|
logger.warning("Failed to connect to Ethereum RPC")
|
|
return False
|
|
|
|
def _handle_event(self, event):
|
|
# Get token addresses from the event
|
|
token0_address = event.args['token0']
|
|
token1_address = event.args['token1']
|
|
pair_address = event.args['pair']
|
|
|
|
# Try to resolve name, symbol, and supply
|
|
token0_info = self.fetch_token_info(token0_address)
|
|
token1_info = self.fetch_token_info(token1_address)
|
|
|
|
# Create SQLAlchemy objects
|
|
new_token0 = Token(
|
|
id=token0_address,
|
|
name=token0_info['name'],
|
|
symbol=token0_info['symbol'],
|
|
total_supply=token0_info['total_supply']
|
|
)
|
|
new_token1 = Token(
|
|
id=token1_address,
|
|
name=token1_info['name'],
|
|
symbol=token1_info['symbol'],
|
|
total_supply=token1_info['total_supply']
|
|
)
|
|
new_pair = Pair(id=pair_address, token0=token0_address, token1=token1_address)
|
|
|
|
# Add token0
|
|
try:
|
|
session = next(get_session())
|
|
session.add(new_token0)
|
|
session.commit()
|
|
except Exception as err:
|
|
logger.warning(f"Could not add token0: {err}")
|
|
|
|
# Add token1
|
|
try:
|
|
session = next(get_session())
|
|
session.add(new_token1)
|
|
session.commit()
|
|
except Exception as err:
|
|
logger.warning(f"Could not add token1: {err}")
|
|
|
|
# Add pair
|
|
try:
|
|
session = next(get_session())
|
|
session.add(new_pair)
|
|
session.commit()
|
|
except Exception as err:
|
|
logger.warning(f"Could not add new pair: {err}")
|
|
|
|
# Add alert to queue
|
|
logger.info(f"New pair: {token0_address} + {token1_address}")
|
|
self.queue.put(
|
|
f"New Uniswap pair: [{token0_info['name']}](https://etherscan.io/address/{token0_address}) +"
|
|
f"[{token1_info['name']}](https://etherscan.io/address/{token1_address})"
|
|
)
|