Compare commits

..

No commits in common. "7bd73cae08c5f932508a07ff79ccb38bef0d56d7" and "0cfff490b787c28d3ec1f71c86f63c4f64ca6684" have entirely different histories.

7 changed files with 16 additions and 141 deletions

1
.gitignore vendored
View File

@ -2,5 +2,4 @@
venv/ venv/
__pycache__/ __pycache__/
*.py[cod] *.py[cod]
*.db
config.json config.json

View File

@ -1,9 +1,5 @@
# mbot # mbot
Uniswap PairCreation monitor that sends alerts to Matrix. Simple Matrix bot that will eventually send Web3 `PairCreation` event alerts to the Juggalol Homeserver.
## Requirements
- Infura API URL
- Matrix bot account
## Configuration ## Configuration
```json ```json
@ -15,6 +11,6 @@ Uniswap PairCreation monitor that sends alerts to Matrix.
"room_id": "" "room_id": ""
}, },
"infura_url": "", "infura_url": "",
"pool_address": "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f" "pool_address": ""
} }
``` ```

49
db.py
View File

@ -1,49 +0,0 @@
from sqlalchemy import create_engine, Column, String, BigInteger, ForeignKey, TIMESTAMP, UniqueConstraint
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker
from datetime import datetime
Base = declarative_base()
class Token(Base):
__tablename__ = 'tokens'
id = Column(String(42), primary_key=True)
name = Column(String(255), nullable=True)
symbol = Column(String(32), nullable=True)
total_supply = Column(String, nullable=True)
class Pair(Base):
__tablename__ = 'pairs'
id = Column(String(42), primary_key=True)
token0 = Column(String(42), ForeignKey('tokens.id'), nullable=False)
token1 = Column(String(42), ForeignKey('tokens.id'), nullable=False)
created_at = Column(TIMESTAMP, default=datetime.utcnow)
# Ensuring the combination of token0, token1, and pool_address is unique
__table_args__ = (UniqueConstraint('token0', 'token1', name='_token_pool_uc'),)
# Relationships
token0_rel = relationship("Token", foreign_keys=[token0])
token1_rel = relationship("Token", foreign_keys=[token1])
# Engine creation
engine = create_engine('sqlite:///uniswap.db', echo=True)
# Create tables
Base.metadata.create_all(engine)
# Create a session factory
SessionLocal = sessionmaker(bind=engine)
# Dependency to get a database session
def get_session():
session = SessionLocal()
try:
yield session
finally:
session.close()

View File

@ -19,11 +19,7 @@ async def main():
queue = Queue() queue = Queue()
bot = MatrixBot(config['matrix']) bot = MatrixBot(config['matrix'])
monitor = EventMonitor( monitor = EventMonitor(
config={ config={"infura_url": config['infura_url'], "pool_address": config['pool_address']},
"infura_url": config['infura_url'],
"pool_address": config['pool_address'],
"etherscan_key": config['etherscan_key']
},
queue=queue queue=queue
) )
monitor_thread = threading.Thread(target=monitor.log_loop, args=(15,)) monitor_thread = threading.Thread(target=monitor.log_loop, args=(15,))

View File

@ -3,7 +3,6 @@ from queue import Queue
from loguru import logger from loguru import logger
from web3 import Web3 from web3 import Web3
from util import fetch_abi from util import fetch_abi
from db import Token, Pair, get_session
class EventMonitor: class EventMonitor:
@ -15,7 +14,7 @@ class EventMonitor:
self.web3 = Web3(Web3.HTTPProvider(self.config['infura_url'])) self.web3 = Web3(Web3.HTTPProvider(self.config['infura_url']))
pool_address = self.config['pool_address'] pool_address = self.config['pool_address']
pool_abi = fetch_abi(pool_address, key=self.config['etherscan_key']) pool_abi = fetch_abi(pool_address)
self.contract = self.web3.eth.contract( self.contract = self.web3.eth.contract(
address=pool_address, address=pool_address,
abi=pool_abi abi=pool_abi
@ -35,31 +34,6 @@ class EventMonitor:
time.sleep(interval) time.sleep(interval)
def fetch_token_info(self, token_address):
try:
token_abi = fetch_abi(token_address, key=self.config['etherscan_key'])
except Exception as err:
logger.warning(f"Failed to fetch info for {token_address}: {err}")
return {
'name': None,
'symbol': None,
'total_supply': None
}
# Create the contract instance
token_contract = self.web3.eth.contract(address=token_address, abi=token_abi)
# Fetch the name, symbol, and total supply
name = token_contract.functions.name().call()
symbol = token_contract.functions.symbol().call()
total_supply = token_contract.functions.totalSupply().call()
return {
'name': name,
'symbol': symbol,
'total_supply': str(total_supply)
}
def _check_connection(self): def _check_connection(self):
if self.web3.is_connected(): if self.web3.is_connected():
logger.debug("Connected to Ethereum RPC") logger.debug("Connected to Ethereum RPC")
@ -69,57 +43,16 @@ class EventMonitor:
return False return False
def _handle_event(self, event): def _handle_event(self, event):
# Get token addresses from the event # TODO: Get token names
token0_address = event.args['token0'] # TODO: Resolve token names
token1_address = event.args['token1'] # TODO: Create data structure for the event
pair_address = event.args['pair']
# Try to resolve name, symbol, and supply token0 = event.args['token0']
token0_info = self.fetch_token_info(token0_address) token1 = event.args['token1']
token1_info = self.fetch_token_info(token1_address)
# Create SQLAlchemy objects logger.info(f"New pair: {token0} + {token1}")
new_token0 = Token(
id=token0_address,
name=token0_info['name'],
symbol=token0_info['symbol'],
total_supply=token0_info['total_supply']
)
new_token1 = Token(
id=token1_address,
name=token1_info['name'],
symbol=token1_info['symbol'],
total_supply=token1_info['total_supply']
)
new_pair = Pair(id=pair_address, token0=token0_address, token1=token1_address)
# Add token0
try:
session = next(get_session())
session.add(new_token0)
session.commit()
except Exception as err:
logger.warning(f"Could not add token0: {err}")
# Add token1
try:
session = next(get_session())
session.add(new_token1)
session.commit()
except Exception as err:
logger.warning(f"Could not add token1: {err}")
# Add pair
try:
session = next(get_session())
session.add(new_pair)
session.commit()
except Exception as err:
logger.warning(f"Could not add new pair: {err}")
# Add alert to queue
logger.info(f"New pair: {token0_address} + {token1_address}")
self.queue.put( self.queue.put(
f"New Uniswap pair: [{token0_info['name']}](https://etherscan.io/address/{token0_address}) +" f"New Uniswap pair: https://etherscan.io/address/{token0} + https://etherscan.io/address/{token1}"
f"[{token1_info['name']}](https://etherscan.io/address/{token1_address})"
) )

View File

@ -1,4 +1,3 @@
matrix-nio matrix-nio
loguru loguru
web3 web3
SQLAlchemy

View File

@ -6,8 +6,8 @@ from web3.types import ChecksumAddress
def fetch_abi(address: ChecksumAddress, headers: Optional[Dict[str, Any]] = None, def fetch_abi(address: ChecksumAddress, headers: Optional[Dict[str, Any]] = None,
params: Optional[Dict[str, Any]] = None, key: Optional[str] = None) -> Optional[Dict[str, Any]]: params: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
url = f'https://api.etherscan.io/api?module=contract&action=getabi&address={address}&apikey={key}' url = f'https://api.etherscan.io/api?module=contract&action=getabi&address={address}'
response = requests.get(url, headers=headers, params=params) response = requests.get(url, headers=headers, params=params)
response.raise_for_status() response.raise_for_status()
data = response.json() data = response.json()
@ -23,3 +23,4 @@ def fetch_abi(address: ChecksumAddress, headers: Optional[Dict[str, Any]] = None
def load_config(path: str) -> dict: def load_config(path: str) -> dict:
with open(path, 'r', encoding='utf-8') as f: with open(path, 'r', encoding='utf-8') as f:
return json.loads(f.read()) return json.loads(f.read())