From 1ab3e7bcfab9e6217a1f83d03f5302806ae6bd03 Mon Sep 17 00:00:00 2001 From: agatha Date: Wed, 12 Jun 2024 21:43:13 -0400 Subject: [PATCH] initial trash commit, this code is baaaaaaad --- .gitignore | 6 ++ README.md | 0 src/db.py | 50 +++++++++++++++++ src/main.py | 68 +++++++++++++++++++++++ src/monitor.py | 129 +++++++++++++++++++++++++++++++++++++++++++ src/requirements.txt | 4 ++ src/rmq.py | 41 ++++++++++++++ src/util.py | 64 +++++++++++++++++++++ 8 files changed, 362 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 src/db.py create mode 100644 src/main.py create mode 100644 src/monitor.py create mode 100644 src/requirements.txt create mode 100644 src/rmq.py create mode 100644 src/util.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..37997b1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.idea/ +venv/ +config.json +uniswap.db +__pycache__/ +*.py[cod] diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/src/db.py b/src/db.py new file mode 100644 index 0000000..ef4219f --- /dev/null +++ b/src/db.py @@ -0,0 +1,50 @@ +from sqlalchemy import create_engine, Column, String, BigInteger, ForeignKey, TIMESTAMP, UniqueConstraint +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import relationship, sessionmaker +from datetime import datetime + +Base = declarative_base() + + +class Token(Base): + __tablename__ = 'tokens' + + id = Column(String(42), primary_key=True) + name = Column(String(255), nullable=True) + symbol = Column(String(32), nullable=True) + total_supply = Column(String, nullable=True) + + +class Pair(Base): + __tablename__ = 'pairs' + + id = Column(String(42), primary_key=True) + token0 = Column(String(42), ForeignKey('tokens.id'), nullable=False) + token1 = Column(String(42), ForeignKey('tokens.id'), nullable=False) + created_at = Column(TIMESTAMP, default=datetime.utcnow) + + # Ensuring the combination of token0, token1, and pool_address is unique + __table_args__ = (UniqueConstraint('token0', 'token1', name='_token_pool_uc'),) + + # Relationships + token0_rel = relationship("Token", foreign_keys=[token0]) + token1_rel = relationship("Token", foreign_keys=[token1]) + + +# Engine creation +engine = create_engine('sqlite:///uniswap.db', echo=True) + +# Create tables +Base.metadata.create_all(engine) + +# Create a session factory +SessionLocal = sessionmaker(bind=engine) + + +# Dependency to get a database session +def get_session(): + session = SessionLocal() + try: + yield session + finally: + session.close() diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..463754e --- /dev/null +++ b/src/main.py @@ -0,0 +1,68 @@ +import asyncio +import sys +import threading +from queue import Queue +from loguru import logger +from monitor import EventMonitor +from util import load_config +from rmq import RabbitMQProducer + + +def init_logger(): + logger.remove() + logger.add(sys.stdout, level="INFO") + + +async def main(): + config = load_config("config.json") + queue = Queue() + monitor = EventMonitor( + config={ + "infura_url": config['infura_url'], + "pool_address": config['pool_address'], + "etherscan_key": config['etherscan_key'] + }, + queue=queue + ) + monitor_thread = threading.Thread(target=monitor.log_loop, args=(15,)) + monitor_thread.start() + + # RabbitMQ + host = 'localhost' + port = 5672 + exchange = 'matrix_alerts' + producer = RabbitMQProducer(host=host, port=port, exchange=exchange) + + while True: + alert_text = queue.get() + producer.send_event(message={'body': alert_text}) + queue.task_done() + + # try: + # await bot.send_markdown(event) + # await bot.logout() + # queue.task_done() + # except nio.exceptions.LocalProtocolError: + # pass + + +if __name__ == '__main__': + init_logger() + asyncio.get_event_loop().run_until_complete(main()) + + +# def main(): +# host = 'localhost' +# port = 5672 +# exchange = 'matrix_alerts' +# +# producer = RabbitMQProducer(host=host, port=port, exchange=exchange) +# +# message = {'body': 'This is a test of RabbitMQ.'} +# producer.send_event(message=message) +# +# producer.close_connection() +# +# +# if __name__ == '__main__': +# main() diff --git a/src/monitor.py b/src/monitor.py new file mode 100644 index 0000000..45334fe --- /dev/null +++ b/src/monitor.py @@ -0,0 +1,129 @@ +import time +from queue import Queue +from loguru import logger +from web3 import Web3 +from util import fetch_abi, format_matrix_alert +from db import Token, Pair, get_session + + +class EventMonitor: + """Monitor Ethereum event logs""" + def __init__(self, config: dict, queue: Queue): + self.config = config + self.queue = queue + + self.web3 = Web3(Web3.HTTPProvider(self.config['infura_url'])) + + pool_address = self.config['pool_address'] + pool_abi = fetch_abi(pool_address, key=self.config['etherscan_key']) + self.contract = self.web3.eth.contract( + address=pool_address, + abi=pool_abi + ) + self.event_filter = self.contract.events.PairCreated.create_filter(fromBlock='latest') + + def log_loop(self, interval: int): + while True: + try: + if not self._check_connection(): + logger.error("No connection to Ethereum RPC") + continue + for event in self.event_filter.get_new_entries(): + self._handle_event(event) + except Exception as e: + logger.error(e) + + time.sleep(interval) + + def fetch_token_info(self, token_address): + try: + token_abi = fetch_abi(token_address, key=self.config['etherscan_key']) + except Exception as err: + logger.warning(f"Failed to fetch info for {token_address}: {err}") + token_abi = None + + if not token_abi: + return { + 'name': None, + 'symbol': None, + 'total_supply': None + } + + # Create the contract instance + token_contract = self.web3.eth.contract(address=token_address, abi=token_abi) + + # Fetch the name, symbol, and total supply + name = token_contract.functions.name().call() + symbol = token_contract.functions.symbol().call() + total_supply = token_contract.functions.totalSupply().call() + + return { + 'name': name, + 'symbol': symbol, + 'total_supply': str(total_supply) + } + + def _check_connection(self): + if self.web3.is_connected(): + logger.debug("Connected to Ethereum RPC") + return True + else: + logger.warning("Failed to connect to Ethereum RPC") + return False + + def _handle_event(self, event): + # Get token addresses from the event + token0_address = event.args['token0'] + token1_address = event.args['token1'] + pair_address = event.args['pair'] + + # Try to resolve name, symbol, and supply + token0_info = self.fetch_token_info(token0_address) + token1_info = self.fetch_token_info(token1_address) + + # Create SQLAlchemy objects + new_token0 = Token( + id=token0_address, + name=token0_info['name'], + symbol=token0_info['symbol'], + total_supply=token0_info['total_supply'] + ) + new_token1 = Token( + id=token1_address, + name=token1_info['name'], + symbol=token1_info['symbol'], + total_supply=token1_info['total_supply'] + ) + new_pair = Pair(id=pair_address, token0=token0_address, token1=token1_address) + + # Add token0 + try: + session = next(get_session()) + session.add(new_token0) + session.commit() + except Exception as err: + logger.warning(f"Could not add token0: {err}") + + # Add token1 + try: + session = next(get_session()) + session.add(new_token1) + session.commit() + except Exception as err: + logger.warning(f"Could not add token1: {err}") + + # Add pair + try: + session = next(get_session()) + session.add(new_pair) + session.commit() + except Exception as err: + logger.warning(f"Could not add new pair: {err}") + + # Add alert to queue + logger.info(f"New pair: {token0_address} + {token1_address}") + formatted_alert = format_matrix_alert( + token0={'name': token0_info['name'], 'address': token0_address}, + token1={'name': token1_info['name'], 'address': token1_address} + ) + self.queue.put(formatted_alert) diff --git a/src/requirements.txt b/src/requirements.txt new file mode 100644 index 0000000..10082b8 --- /dev/null +++ b/src/requirements.txt @@ -0,0 +1,4 @@ +loguru +web3 +requests +SQLAlchemy \ No newline at end of file diff --git a/src/rmq.py b/src/rmq.py new file mode 100644 index 0000000..779f1d3 --- /dev/null +++ b/src/rmq.py @@ -0,0 +1,41 @@ +import json +import pika +from loguru import logger + + +class RabbitMQProducer: + def __init__(self, host: str, port: int, exchange: str, exchange_type: str = 'fanout'): + self.host = host + self.port = port + self.exchange = exchange + self.exchange_type = exchange_type + + self.connection = None + self.channel = None + self.connect() + + def connect(self): + try: + self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, port=self.port)) + self.channel = self.connection.channel() + self.channel.exchange_declare(exchange=self.exchange, exchange_type=self.exchange_type) + except Exception as e: + logger.warning(f"Could not connect to RabbitMQ: {e}") + + def send_event(self, message: dict): + if self.connection.is_closed: + self.connect() + + try: + self.channel.basic_publish( + exchange=self.exchange, + routing_key='', # Empty routing key for fanout exchange + body=json.dumps(message), + properties=pika.BasicProperties(content_type='application/json') + ) + except Exception as e: + logger.warning(f"Failed to send message: {e}") + + def close_connection(self): + if self.connection and not self.connection.is_closed: + self.connection.close() diff --git a/src/util.py b/src/util.py new file mode 100644 index 0000000..5fe6ad9 --- /dev/null +++ b/src/util.py @@ -0,0 +1,64 @@ +import json +import requests +from loguru import logger +from typing import Any, Optional, Dict +from web3.types import ChecksumAddress + +STANDARD_ERC20_ABI = [ + { + "constant": True, + "inputs": [], + "name": "name", + "outputs": [{"name": "", "type": "string"}], + "payable": False, + "stateMutability": "view", + "type": "function", + }, + { + "constant": True, + "inputs": [], + "name": "symbol", + "outputs": [{"name": "", "type": "string"}], + "payable": False, + "stateMutability": "view", + "type": "function", + }, + { + "constant": True, + "inputs": [], + "name": "totalSupply", + "outputs": [{"name": "", "type": "uint256"}], + "payable": False, + "stateMutability": "view", + "type": "function", + } +] + + +def fetch_abi(address: ChecksumAddress, headers: Optional[Dict[str, Any]] = None, + params: Optional[Dict[str, Any]] = None, key: Optional[str] = None) -> Optional[Dict[str, Any]]: + url = f'https://api.etherscan.io/api?module=contract&action=getabi&address={address}&apikey={key}' + response = requests.get(url, headers=headers, params=params) + response.raise_for_status() + data = response.json() + + # Check for error in the response + if data['status'] == '0': + logger.error(f"Error fetching ABI: {data.get('result')}") + logger.warning(f"Using fallback ABI.") + return STANDARD_ERC20_ABI + + return json.loads(data['result']) + + +def load_config(path: str) -> dict: + with open(path, 'r', encoding='utf-8') as f: + return json.loads(f.read()) + + +def format_matrix_alert(token0: dict, token1: dict) -> str: + return f""" +🚨 New Uniswap Pair Alert 🚨\n +🌐 **Asset Pair:** [{token0['name']}](https://etherscan.io/token/{token0['address']}) + [{token1['name']}](https://etherscan.io/token/{token1['address']})\n +🏠 **Network:** ETH +""" \ No newline at end of file