diff --git a/main.py b/main.py index dbe6d74..af2c763 100644 --- a/main.py +++ b/main.py @@ -1,20 +1,38 @@ """Matrix Bot Framework""" import asyncio -import json - +import nio.exceptions +import sys +import threading +from queue import Queue +from loguru import logger from matrix import MatrixBot +from monitor import EventMonitor +from util import load_config -def load_config(path: str) -> dict: - with open(path, 'r', encoding='utf-8') as f: - return json.loads(f.read()) +def init_logger(): + logger.add(sys.stdout, level="DEBUG") async def main(): config = load_config("config.json") + queue = Queue() bot = MatrixBot(config['matrix']) - await bot.send_message("beep boop, i'm a bot") - await bot.logout() + monitor = EventMonitor( + config={"infura_url": config['infura_url'], "pool_address": config['pool_address']}, + queue=queue + ) + monitor_thread = threading.Thread(target=monitor.log_loop, args=(15,)) + monitor_thread.start() + + while True: + event = queue.get() + try: + await bot.send_message(event) + await bot.logout() + queue.task_done() + except nio.exceptions.LocalProtocolError: + pass if __name__ == '__main__': diff --git a/matrix.py b/matrix.py index af7e036..765b956 100644 --- a/matrix.py +++ b/matrix.py @@ -36,6 +36,8 @@ class MatrixBot: logger.info(f"Logged in as {self.config['username']}") else: logger.error(f"Failed to login as {self.config['username']}: {response}") + logger.error("Closing nio session") + await self.client.close() return False return True diff --git a/monitor.py b/monitor.py new file mode 100644 index 0000000..f5af22c --- /dev/null +++ b/monitor.py @@ -0,0 +1,58 @@ +import time +from queue import Queue +from loguru import logger +from web3 import Web3 +from util import fetch_abi + + +class EventMonitor: + """Monitor Ethereum event logs""" + def __init__(self, config: dict, queue: Queue): + self.config = config + self.queue = queue + + self.web3 = Web3(Web3.HTTPProvider(self.config['infura_url'])) + + pool_address = self.config['pool_address'] + pool_abi = fetch_abi(pool_address) + self.contract = self.web3.eth.contract( + address=pool_address, + abi=pool_abi + ) + self.event_filter = self.contract.events.PairCreated.create_filter(fromBlock='latest') + + def log_loop(self, interval: int): + while True: + try: + if not self._check_connection(): + logger.error("No connection to Ethereum RPC") + continue + for event in self.event_filter.get_new_entries(): + self._handle_event(event) + except Exception as e: + logger.error(e) + + time.sleep(interval) + + def _check_connection(self): + if self.web3.is_connected(): + logger.debug("Connected to Ethereum RPC") + return True + else: + logger.warning("Failed to connect to Ethereum RPC") + return False + + def _handle_event(self, event): + # TODO: Get token names + # TODO: Resolve token names + # TODO: Create data structure for the event + + token0 = event.args['token0'] + token1 = event.args['token1'] + + logger.info(f"New pair: {token0} + {token1}") + + self.queue.put( + f"New Uniswap pair: https://etherscan.io/address/{token0} + https://etherscan.io/address/{token1}" + ) + diff --git a/requirements.txt b/requirements.txt index 855170c..6076af8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ matrix-nio -loguru \ No newline at end of file +loguru +web3 \ No newline at end of file diff --git a/util.py b/util.py new file mode 100644 index 0000000..9dca619 --- /dev/null +++ b/util.py @@ -0,0 +1,26 @@ +import json +import requests +from loguru import logger +from typing import Any, Optional, Dict +from web3.types import ChecksumAddress + + +def fetch_abi(address: ChecksumAddress, headers: Optional[Dict[str, Any]] = None, + params: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]: + url = f'https://api.etherscan.io/api?module=contract&action=getabi&address={address}' + response = requests.get(url, headers=headers, params=params) + response.raise_for_status() + data = response.json() + + # Check for error in the response + if data['status'] == '0': + logger.error(f"Error fetching ABI: {data.get('result')}") + return None + + return json.loads(data['result']) + + +def load_config(path: str) -> dict: + with open(path, 'r', encoding='utf-8') as f: + return json.loads(f.read()) +