add event monitor

This commit is contained in:
agatha 2024-06-07 18:00:14 -04:00
parent bd57db05dc
commit fc1e2386c3
5 changed files with 113 additions and 8 deletions

30
main.py
View File

@ -1,20 +1,38 @@
"""Matrix Bot Framework""" """Matrix Bot Framework"""
import asyncio import asyncio
import json import nio.exceptions
import sys
import threading
from queue import Queue
from loguru import logger
from matrix import MatrixBot from matrix import MatrixBot
from monitor import EventMonitor
from util import load_config
def load_config(path: str) -> dict: def init_logger():
with open(path, 'r', encoding='utf-8') as f: logger.add(sys.stdout, level="DEBUG")
return json.loads(f.read())
async def main(): async def main():
config = load_config("config.json") config = load_config("config.json")
queue = Queue()
bot = MatrixBot(config['matrix']) bot = MatrixBot(config['matrix'])
await bot.send_message("beep boop, i'm a bot") monitor = EventMonitor(
config={"infura_url": config['infura_url'], "pool_address": config['pool_address']},
queue=queue
)
monitor_thread = threading.Thread(target=monitor.log_loop, args=(15,))
monitor_thread.start()
while True:
event = queue.get()
try:
await bot.send_message(event)
await bot.logout() await bot.logout()
queue.task_done()
except nio.exceptions.LocalProtocolError:
pass
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -36,6 +36,8 @@ class MatrixBot:
logger.info(f"Logged in as {self.config['username']}") logger.info(f"Logged in as {self.config['username']}")
else: else:
logger.error(f"Failed to login as {self.config['username']}: {response}") logger.error(f"Failed to login as {self.config['username']}: {response}")
logger.error("Closing nio session")
await self.client.close()
return False return False
return True return True

58
monitor.py Normal file
View File

@ -0,0 +1,58 @@
import time
from queue import Queue
from loguru import logger
from web3 import Web3
from util import fetch_abi
class EventMonitor:
"""Monitor Ethereum event logs"""
def __init__(self, config: dict, queue: Queue):
self.config = config
self.queue = queue
self.web3 = Web3(Web3.HTTPProvider(self.config['infura_url']))
pool_address = self.config['pool_address']
pool_abi = fetch_abi(pool_address)
self.contract = self.web3.eth.contract(
address=pool_address,
abi=pool_abi
)
self.event_filter = self.contract.events.PairCreated.create_filter(fromBlock='latest')
def log_loop(self, interval: int):
while True:
try:
if not self._check_connection():
logger.error("No connection to Ethereum RPC")
continue
for event in self.event_filter.get_new_entries():
self._handle_event(event)
except Exception as e:
logger.error(e)
time.sleep(interval)
def _check_connection(self):
if self.web3.is_connected():
logger.debug("Connected to Ethereum RPC")
return True
else:
logger.warning("Failed to connect to Ethereum RPC")
return False
def _handle_event(self, event):
# TODO: Get token names
# TODO: Resolve token names
# TODO: Create data structure for the event
token0 = event.args['token0']
token1 = event.args['token1']
logger.info(f"New pair: {token0} + {token1}")
self.queue.put(
f"New Uniswap pair: https://etherscan.io/address/{token0} + https://etherscan.io/address/{token1}"
)

View File

@ -1,2 +1,3 @@
matrix-nio matrix-nio
loguru loguru
web3

26
util.py Normal file
View File

@ -0,0 +1,26 @@
import json
import requests
from loguru import logger
from typing import Any, Optional, Dict
from web3.types import ChecksumAddress
def fetch_abi(address: ChecksumAddress, headers: Optional[Dict[str, Any]] = None,
params: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
url = f'https://api.etherscan.io/api?module=contract&action=getabi&address={address}'
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
data = response.json()
# Check for error in the response
if data['status'] == '0':
logger.error(f"Error fetching ABI: {data.get('result')}")
return None
return json.loads(data['result'])
def load_config(path: str) -> dict:
with open(path, 'r', encoding='utf-8') as f:
return json.loads(f.read())