Compare commits

..

No commits in common. "master" and "dev-db" have entirely different histories.

6 changed files with 14 additions and 94 deletions

View File

@ -3,7 +3,6 @@ Uniswap PairCreation monitor that sends alerts to Matrix.
## Requirements ## Requirements
- Infura API URL - Infura API URL
- Etherscan API key
- Matrix bot account - Matrix bot account
## Configuration ## Configuration
@ -16,7 +15,6 @@ Uniswap PairCreation monitor that sends alerts to Matrix.
"room_id": "" "room_id": ""
}, },
"infura_url": "", "infura_url": "",
"pool_address": "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f", "pool_address": "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f"
"etherscan_key": ""
} }
``` ```

16
main.py
View File

@ -11,8 +11,7 @@ from util import load_config
def init_logger(): def init_logger():
logger.remove() logger.add(sys.stdout, level="DEBUG")
logger.add(sys.stdout, level="INFO")
async def main(): async def main():
@ -22,25 +21,17 @@ async def main():
monitor = EventMonitor( monitor = EventMonitor(
config={ config={
"infura_url": config['infura_url'], "infura_url": config['infura_url'],
"pool_address": config['pool_address'], "pool_address": config['pool_address']
"etherscan_key": config['etherscan_key']
}, },
queue=queue queue=queue
) )
monitor_thread = threading.Thread(target=monitor.log_loop, args=(15,)) monitor_thread = threading.Thread(target=monitor.log_loop, args=(15,))
monitor_thread.start() monitor_thread.start()
# Send "online message"
try:
await bot.send_markdown("**MBOT ONLINE**")
await bot.logout()
except nio.exceptions.LocalProtocolError:
pass
while True: while True:
event = queue.get() event = queue.get()
try: try:
await bot.send_markdown(event) await bot.send_message(event)
await bot.logout() await bot.logout()
queue.task_done() queue.task_done()
except nio.exceptions.LocalProtocolError: except nio.exceptions.LocalProtocolError:
@ -48,5 +39,4 @@ async def main():
if __name__ == '__main__': if __name__ == '__main__':
init_logger()
asyncio.get_event_loop().run_until_complete(main()) asyncio.get_event_loop().run_until_complete(main())

View File

@ -1,4 +1,3 @@
import markdown
from loguru import logger from loguru import logger
from nio import AsyncClient, LoginResponse from nio import AsyncClient, LoginResponse
@ -29,28 +28,6 @@ class MatrixBot:
) )
logger.info("Message sent") logger.info("Message sent")
async def send_markdown(self, message: str):
if not self.client.access_token:
logged_in = await self.login()
if not logged_in:
return
# Convert message to markdown
html = markdown.markdown(message)
# Send markdown formatted message
await self.client.room_send(
room_id=self.config['room_id'],
message_type="m.room.message",
content={
"msgtype": "m.text",
"body": message,
"format": "org.matrix.custom.html",
"formatted_body": html
}
)
logger.info("Markdown message sent")
async def login(self): async def login(self):
response = await self.client.login( response = await self.client.login(
password=self.config['password'] password=self.config['password']

View File

@ -2,7 +2,7 @@ import time
from queue import Queue from queue import Queue
from loguru import logger from loguru import logger
from web3 import Web3 from web3 import Web3
from util import fetch_abi, format_matrix_alert from util import fetch_abi
from db import Token, Pair, get_session from db import Token, Pair, get_session
@ -15,7 +15,7 @@ class EventMonitor:
self.web3 = Web3(Web3.HTTPProvider(self.config['infura_url'])) self.web3 = Web3(Web3.HTTPProvider(self.config['infura_url']))
pool_address = self.config['pool_address'] pool_address = self.config['pool_address']
pool_abi = fetch_abi(pool_address, key=self.config['etherscan_key']) pool_abi = fetch_abi(pool_address)
self.contract = self.web3.eth.contract( self.contract = self.web3.eth.contract(
address=pool_address, address=pool_address,
abi=pool_abi abi=pool_abi
@ -37,12 +37,9 @@ class EventMonitor:
def fetch_token_info(self, token_address): def fetch_token_info(self, token_address):
try: try:
token_abi = fetch_abi(token_address, key=self.config['etherscan_key']) token_abi = fetch_abi(token_address)
except Exception as err: except Exception as err:
logger.warning(f"Failed to fetch info for {token_address}: {err}") logger.warning(f"Failed to fetch info for {token_address}: {err}")
token_abi = None
if not token_abi:
return { return {
'name': None, 'name': None,
'symbol': None, 'symbol': None,
@ -122,8 +119,7 @@ class EventMonitor:
# Add alert to queue # Add alert to queue
logger.info(f"New pair: {token0_address} + {token1_address}") logger.info(f"New pair: {token0_address} + {token1_address}")
formatted_alert = format_matrix_alert( self.queue.put(
token0={'name': token0_info['name'], 'address': token0_address}, f"New Uniswap pair: [{token0_info['name']}](https://etherscan.io/address/{token0_address}) +"
token1={'name': token1_info['name'], 'address': token1_address} f"[{token1_info['name']}](https://etherscan.io/address/{token1_address})"
) )
self.queue.put(formatted_alert)

View File

@ -1,6 +1,4 @@
matrix-nio matrix-nio
loguru loguru
web3 web3
requests
SQLAlchemy SQLAlchemy
markdown

45
util.py
View File

@ -4,40 +4,10 @@ from loguru import logger
from typing import Any, Optional, Dict from typing import Any, Optional, Dict
from web3.types import ChecksumAddress from web3.types import ChecksumAddress
STANDARD_ERC20_ABI = [
{
"constant": True,
"inputs": [],
"name": "name",
"outputs": [{"name": "", "type": "string"}],
"payable": False,
"stateMutability": "view",
"type": "function",
},
{
"constant": True,
"inputs": [],
"name": "symbol",
"outputs": [{"name": "", "type": "string"}],
"payable": False,
"stateMutability": "view",
"type": "function",
},
{
"constant": True,
"inputs": [],
"name": "totalSupply",
"outputs": [{"name": "", "type": "uint256"}],
"payable": False,
"stateMutability": "view",
"type": "function",
}
]
def fetch_abi(address: ChecksumAddress, headers: Optional[Dict[str, Any]] = None, def fetch_abi(address: ChecksumAddress, headers: Optional[Dict[str, Any]] = None,
params: Optional[Dict[str, Any]] = None, key: Optional[str] = None) -> Optional[Dict[str, Any]]: params: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
url = f'https://api.etherscan.io/api?module=contract&action=getabi&address={address}&apikey={key}' url = f'https://api.etherscan.io/api?module=contract&action=getabi&address={address}'
response = requests.get(url, headers=headers, params=params) response = requests.get(url, headers=headers, params=params)
response.raise_for_status() response.raise_for_status()
data = response.json() data = response.json()
@ -45,8 +15,7 @@ def fetch_abi(address: ChecksumAddress, headers: Optional[Dict[str, Any]] = None
# Check for error in the response # Check for error in the response
if data['status'] == '0': if data['status'] == '0':
logger.error(f"Error fetching ABI: {data.get('result')}") logger.error(f"Error fetching ABI: {data.get('result')}")
logger.warning(f"Using fallback ABI.") return None
return STANDARD_ERC20_ABI
return json.loads(data['result']) return json.loads(data['result'])
@ -54,11 +23,3 @@ def fetch_abi(address: ChecksumAddress, headers: Optional[Dict[str, Any]] = None
def load_config(path: str) -> dict: def load_config(path: str) -> dict:
with open(path, 'r', encoding='utf-8') as f: with open(path, 'r', encoding='utf-8') as f:
return json.loads(f.read()) return json.loads(f.read())
def format_matrix_alert(token0: dict, token1: dict) -> str:
return f"""
🚨 New Uniswap Pair Alert 🚨\n
🌐 **Asset Pair:** [{token0['name']}](https://etherscan.io/token/{token0['address']}) + [{token1['name']}](https://etherscan.io/token/{token1['address']})\n
🏠 **Network:** ETH
"""