# (file-viewer metadata at capture time: Python, 120 lines, 4.3 KiB)
import json
|
|
import sys
|
|
import time
|
|
import threading
|
|
from web3 import Web3
|
|
from loguru import logger
|
|
from typing import Dict
|
|
|
|
# Location of the JSON configuration file read at startup.
CONFIG_PATH = "config.json"

# Replace the logger's default handler with one that writes INFO and above
# to standard output.
logger.remove()
logger.add(sys.stdout, level="INFO")
|
|
|
|
|
|
class InfuraMonitor:
    """Poll an Ethereum node over HTTP and log transactions that involve watched addresses.

    Attributes:
        infura_url: Endpoint URL handed to ``Web3.HTTPProvider``.
        last_block: Number of the newest block already handled (set at connect time).
        addresses: Watched addresses, as given to :meth:`start_monitor`.
        web3: The connected ``Web3`` client.
    """

    def __init__(self, infura_url: str):
        """Connect to ``infura_url`` and record the current head block number.

        Raises:
            ConnectionError: If the provider reports that it is not connected.
        """
        self.infura_url = infura_url
        self.last_block = None
        self.addresses = []

        self.web3 = Web3(Web3.HTTPProvider(self.infura_url))
        if not self.web3.is_connected():
            # ConnectionError is an Exception subclass, so callers that catch
            # Exception (as this script's entry point does) are unaffected.
            raise ConnectionError("Couldn't connect to Infura")

        self.last_block = self.web3.eth.block_number
        logger.info(f"Connected to Infura. Last block: #{self.last_block}")

    @staticmethod
    def _normalize(address):
        """Return ``address`` lower-cased so addresses compare case-insensitively.

        Non-string values (for example ``None`` when a transaction has no
        ``to`` field) are returned unchanged.
        """
        return address.lower() if isinstance(address, str) else address

    @classmethod
    def _describe_match(cls, tx, watched, names):
        """Return ``"<from> -> <to>"`` when ``tx`` involves a watched address, else ``None``.

        Args:
            tx: Mapping with optional ``'from'`` and ``'to'`` entries.
            watched: Set of lower-cased watched addresses.
            names: Mapping of lower-cased address to display name.

        An address without a display name is shown as it appears in ``tx``.
        """
        from_address = tx.get('from', None)
        to_address = tx.get('to', None)
        if not (from_address or to_address):
            return None

        from_key = cls._normalize(from_address)
        to_key = cls._normalize(to_address)
        if from_key not in watched and to_key not in watched:
            return None

        from_name = names.get(from_key, from_address)
        to_name = names.get(to_key, to_address)
        return f"{from_name} -> {to_name}"

    def check_txs(self, start_block: int, end_block: int, address_names: Dict[str, str]):
        """Log every transaction in ``start_block``..``end_block`` (inclusive) that
        has a watched address as sender or recipient.

        Args:
            start_block: First block number to scan.
            end_block: Last block number to scan.
            address_names: Mapping of address to the display name used in log lines.

        A block that cannot be fetched is logged as an error and skipped.
        """
        # Compare lower-cased forms: the same address can be written in
        # checksummed (mixed-case) or all-lower-case form, and a plain string
        # comparison would treat those as different addresses.
        # Built once per call so the per-transaction check is a set lookup.
        watched = {self._normalize(address) for address in self.addresses}
        names = {self._normalize(address): name for address, name in address_names.items()}

        for block_num in range(start_block, end_block + 1):
            try:
                block = self.web3.eth.get_block(block_num, full_transactions=True)
            except Exception as e:
                logger.error(f"Error fetching block #{block_num}: {e}")
                continue
            logger.debug(f"Fetched block #{block_num}")

            for tx in block['transactions']:
                summary = self._describe_match(tx, watched, names)
                if summary is not None:
                    logger.info(f"#{block_num}: {summary}")
                    logger.debug(f"Transaction details: {tx}")

    def start_monitor(self, addresses: Dict[str, str]):
        """Watch ``addresses`` forever, scanning new blocks every 10 seconds.

        Args:
            addresses: Mapping of address to display name; its keys become
                ``self.addresses``.

        This method does not return. Errors raised during a polling round are
        logged and the loop carries on with the next round.
        """
        self.addresses = list(addresses.keys())

        while True:
            try:
                current_block = self.web3.eth.block_number
                logger.debug(f"Current block: #{current_block}. Checking transactions from block #{self.last_block + 1} to #{current_block}")
                if current_block > self.last_block:
                    self.check_txs(self.last_block + 1, current_block, addresses)
                    # NOTE: last_block advances even if check_txs skipped a
                    # block it could not fetch; such a block is not retried.
                    self.last_block = current_block
                else:
                    logger.debug(f"No new blocks since last check. Last block: #{self.last_block}")
            except Exception as e:
                logger.error(f"Error during monitoring: {e}")

            time.sleep(10)
|
|
|
|
|
|
def load_config(path: str) -> dict:
    """Read the JSON file at ``path`` and return the decoded object.

    Args:
        path: Location of the configuration file (opened as UTF-8 text).

    Returns:
        The value produced by decoding the file's JSON content.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        json.JSONDecodeError: If the file content is not valid JSON.

    Both failures are logged before being re-raised to the caller.
    """
    try:
        with open(path, 'r', encoding='utf-8') as config_file:
            return json.load(config_file)
    except (FileNotFoundError, json.JSONDecodeError) as failure:
        if isinstance(failure, FileNotFoundError):
            logger.error(f"Configuration file not found: {path}")
        else:
            logger.error(f"Error decoding JSON config: {path}")
        raise
|
|
|
|
|
|
def monitor_thread(monitor, addresses):
    """Thread entry point: run ``monitor.start_monitor(addresses)``.

    Args:
        monitor: Object exposing ``start_monitor(addresses)``.
        addresses: Passed through to ``start_monitor`` unchanged.

    Any ``Exception`` raised by the call is logged rather than propagated,
    so the function then returns normally.
    """
    try:
        monitor.start_monitor(addresses)
    except Exception as e:
        # Report the failure through the logger instead of letting it escape
        # from the thread target.
        logger.error(f"Error in monitoring thread: {e}")
    return None
|
|
|
|
|
|
def main():
    """Load the configuration, connect to the node and monitor the watched addresses.

    Reads ``infura_url`` from the JSON file at ``CONFIG_PATH``, starts the
    monitor in a background thread and waits until that thread ends or the
    user presses Ctrl+C. Any ``Exception`` raised along the way is logged.
    """
    try:
        config = load_config(CONFIG_PATH)
        infura_url = config['infura_url']

        # Address -> display name used in log lines.
        addresses = {
            "0x482702745260Ffd69FC19943f70cFFE2caCd70e9": "$JENNER CONTRACT",
            "0xb41af5ce8c1b86e0204a0bc6625041376c70ba81": "DEVELOPER WALLET",
            "0xC152A863312F4AB4C5B6a52447abe8bFDD741aa2": "MARKETING WALLET",
            "0xedc3D54605d6d25cF405f214B56d63b7bCD80d1f": "LIQUIDITY WALLET",
            "0x8588f0c49849c011d5b5e3318bb0d1fb8534266b": "UNISWAP POOL",
        }

        monitor = InfuraMonitor(infura_url)

        # daemon=True: the monitoring loop never returns on its own, so a
        # non-daemon thread would keep the process alive after Ctrl+C.
        monitor_thread_instance = threading.Thread(
            target=monitor_thread,
            args=(monitor, addresses),
            daemon=True,
        )
        monitor_thread_instance.start()

        logger.info("Press Ctrl+C to stop the monitoring...")

        try:
            # Join in short slices so the main thread stays responsive to
            # KeyboardInterrupt while the worker is running.
            while monitor_thread_instance.is_alive():
                monitor_thread_instance.join(1)
        except KeyboardInterrupt:
            # No unbounded join here: the worker loops forever, so waiting
            # for it would hang. The daemon thread ends with the process.
            logger.info("Shutting down monitoring due to user interruption.")
    except Exception as e:
        logger.error(f"Error in main execution: {e}")
|
|
|
|
|
|
# Run the monitor only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|