mbot/monitor.py

130 lines
4.2 KiB
Python
Raw Normal View History

2024-06-07 22:00:14 +00:00
import time
from queue import Queue
from loguru import logger
from web3 import Web3
2024-06-08 20:31:07 +00:00
from util import fetch_abi, format_matrix_alert
2024-06-08 18:16:53 +00:00
from db import Token, Pair, get_session
2024-06-07 22:00:14 +00:00
class EventMonitor:
    """Monitor Ethereum event logs.

    Polls a ``PairCreated`` event filter on the configured contract. For
    every new event the two token addresses are looked up, ``Token`` and
    ``Pair`` objects are added through a database session, and a formatted
    alert is put on the queue.

    The ``config`` keys read by this class are ``infura_url``,
    ``pool_address`` and ``etherscan_key``.
    """

    def __init__(self, config: dict, queue: Queue):
        """Connect to the RPC endpoint and create the event filter.

        Args:
            config: Settings mapping; see the class docstring for the keys used.
            queue: Queue that receives one formatted alert per handled event.
        """
        self.config = config
        self.queue = queue
        self.web3 = Web3(Web3.HTTPProvider(self.config['infura_url']))

        pool_address = self.config['pool_address']
        pool_abi = fetch_abi(pool_address, key=self.config['etherscan_key'])

        self.contract = self.web3.eth.contract(
            address=pool_address,
            abi=pool_abi
        )
        # Only events emitted after start-up are of interest.
        self.event_filter = self.contract.events.PairCreated.create_filter(fromBlock='latest')

    def log_loop(self, interval: int):
        """Poll the event filter forever, sleeping ``interval`` seconds per pass.

        This method never returns. Errors are logged and the loop keeps going.

        Args:
            interval: Seconds to wait between two polls.
        """
        while True:
            try:
                if self._check_connection():
                    for event in self.event_filter.get_new_entries():
                        # One failing event must not discard the remaining
                        # entries that were already pulled from the filter.
                        try:
                            self._handle_event(event)
                        except Exception as err:
                            logger.error(err)
                else:
                    logger.error("No connection to Ethereum RPC")
            except Exception as e:
                logger.error(e)
            # Always wait, including after a failed connection check, so an
            # RPC outage does not turn into a busy loop.
            time.sleep(interval)

    def fetch_token_info(self, token_address):
        """Look up name, symbol and total supply of a token contract.

        Args:
            token_address: Address of the token contract.

        Returns:
            A dict with the keys ``name``, ``symbol`` and ``total_supply``.
            ``total_supply`` is converted to ``str``. Any value that could not
            be obtained (no ABI, or a failing contract call) is ``None``.
        """
        info = {
            'name': None,
            'symbol': None,
            'total_supply': None
        }

        try:
            token_abi = fetch_abi(token_address, key=self.config['etherscan_key'])
        except Exception as err:
            logger.warning(f"Failed to fetch info for {token_address}: {err}")
            token_abi = None

        if not token_abi:
            return info

        try:
            # Create the contract instance
            token_contract = self.web3.eth.contract(address=token_address, abi=token_abi)
        except Exception as err:
            logger.warning(f"Failed to fetch info for {token_address}: {err}")
            return info

        # Fetch the name, symbol, and total supply; each read is independent
        # so one missing or reverting function does not lose the others.
        info['name'] = self._read_token_field(token_contract, 'name', token_address)
        info['symbol'] = self._read_token_field(token_contract, 'symbol', token_address)
        total_supply = self._read_token_field(token_contract, 'totalSupply', token_address)
        if total_supply is not None:
            info['total_supply'] = str(total_supply)
        return info

    def _read_token_field(self, token_contract, function_name, token_address):
        """Call one argument-less contract function; return None on failure."""
        try:
            return getattr(token_contract.functions, function_name)().call()
        except Exception as err:
            logger.warning(
                f"Failed to read {function_name}() for {token_address}: {err}"
            )
            return None

    def _check_connection(self):
        """Return True when the web3 provider reports a live connection."""
        if self.web3.is_connected():
            logger.debug("Connected to Ethereum RPC")
            return True
        logger.warning("Failed to connect to Ethereum RPC")
        return False

    def _persist(self, record, label):
        """Add ``record`` through a fresh session and commit it (best effort).

        Failures are logged as warnings and never raised, so the remaining
        records of an event are still attempted.

        NOTE(review): assumes ``get_session()`` is a generator that yields a
        session, as the ``next(...)`` usage suggests - confirm in ``db``.
        """
        session_gen = None
        try:
            session_gen = get_session()
            session = next(session_gen)
            session.add(record)
            session.commit()
        except Exception as err:
            logger.warning(f"Could not add {label}: {err}")
        finally:
            # Close the generator explicitly so that any cleanup written
            # after its ``yield`` runs now, after the commit, instead of
            # whenever the generator object happens to be collected.
            close = getattr(session_gen, 'close', None)
            if close is not None:
                try:
                    close()
                except Exception as err:
                    logger.warning(f"Could not release session for {label}: {err}")

    def _handle_event(self, event):
        """Store the tokens and pair of one event and queue an alert.

        Args:
            event: Event entry whose ``args`` provide ``token0``, ``token1``
                and ``pair``.
        """
        # Get token addresses from the event
        token0_address = event.args['token0']
        token1_address = event.args['token1']
        pair_address = event.args['pair']

        # Try to resolve name, symbol, and supply
        token0_info = self.fetch_token_info(token0_address)
        token1_info = self.fetch_token_info(token1_address)

        # Create SQLAlchemy objects
        new_token0 = Token(
            id=token0_address,
            name=token0_info['name'],
            symbol=token0_info['symbol'],
            total_supply=token0_info['total_supply']
        )
        new_token1 = Token(
            id=token1_address,
            name=token1_info['name'],
            symbol=token1_info['symbol'],
            total_supply=token1_info['total_supply']
        )
        new_pair = Pair(id=pair_address, token0=token0_address, token1=token1_address)

        # Each record gets its own session and commit, so a failure on one
        # (it is only logged) does not prevent the others from being stored.
        self._persist(new_token0, "token0")
        self._persist(new_token1, "token1")
        self._persist(new_pair, "new pair")

        # Add alert to queue
        logger.info(f"New pair: {token0_address} + {token1_address}")
        formatted_alert = format_matrix_alert(
            token0={'name': token0_info['name'], 'address': token0_address},
            token1={'name': token1_info['name'], 'address': token1_address}
        )
        self.queue.put(formatted_alert)