"""Monitor a fixed set of Ethereum addresses through an Infura HTTP endpoint.

The script polls the node for new blocks and logs every transaction whose
sender or recipient is one of the watched addresses.
"""

import json
import sys
import threading
import time
from typing import Dict, Optional

from loguru import logger
from web3 import Web3

CONFIG_PATH = "config.json"
# Seconds to wait between two polls of the node for a new block number.
POLL_INTERVAL_SECONDS = 10

# Set the default logging level to INFO
logger.remove()  # Remove the default logger
logger.add(sys.stdout, level="INFO")  # Add a new logger with INFO level


def _normalize_address(address) -> Optional[str]:
    """Return *address* lowercased for comparison, or None if it is not a string."""
    return address.lower() if isinstance(address, str) else None


class InfuraMonitor:
    """Poll an Ethereum node and log transactions that touch watched addresses."""

    def __init__(self, infura_url: str):
        """Connect to the node at *infura_url* and remember its current block.

        Raises:
            ConnectionError: if the provider reports that it is not connected.
        """
        self.infura_url = infura_url
        self.last_block = None
        self.addresses = []
        # Set by stop(); lets the polling loop exit instead of sleeping forever.
        self._stop_event = threading.Event()
        self.web3 = Web3(Web3.HTTPProvider(self.infura_url))
        if not self.web3.is_connected():
            raise ConnectionError("Couldn't connect to Infura")
        self.last_block = self.web3.eth.block_number
        logger.info(f"Connected to Infura. Last block: #{self.last_block}")

    def stop(self) -> None:
        """Ask a running start_monitor() loop to return after its current pass."""
        self._stop_event.set()

    def check_txs(self, start_block: int, end_block: int, address_names: Dict[str, str]):
        """Log transactions in blocks start_block..end_block that touch a watched address.

        Args:
            start_block: first block number to inspect (inclusive).
            end_block: last block number to inspect (inclusive).
            address_names: mapping of address -> display name used in the log line.

        A block that cannot be fetched is logged and skipped; the remaining
        blocks are still processed.
        """
        # Addresses are compared case-insensitively: the configured keys mix
        # checksummed and all-lowercase forms, while web3 reports checksummed
        # addresses, so an exact string comparison would miss lowercase keys.
        watched = set()
        for address in self.addresses:
            key = _normalize_address(address)
            if key is not None:
                watched.add(key)

        names = {}
        for address, name in address_names.items():
            key = _normalize_address(address)
            if key is not None:
                names[key] = name

        for block_num in range(start_block, end_block + 1):
            try:
                block = self.web3.eth.get_block(block_num, full_transactions=True)
            except Exception as e:
                logger.error(f"Error fetching block #{block_num}: {e}")
                continue
            else:
                logger.debug(f"Fetched block #{block_num}")

            for tx in block['transactions']:
                from_address = tx.get('from', None)
                to_address = tx.get('to', None)
                if not (from_address or to_address):
                    continue

                from_key = _normalize_address(from_address)
                to_key = _normalize_address(to_address)
                if from_key not in watched and to_key not in watched:
                    continue

                # Fall back to the raw address when no display name is known.
                from_name = names.get(from_key, from_address)
                to_name = names.get(to_key, to_address)
                logger.info(f"#{block_num}: {from_name} -> {to_name}")
                logger.debug(f"Transaction details: {tx}")

    def start_monitor(self, addresses: Dict[str, str]):
        """Poll for new blocks until stop() is called, checking each new range.

        Args:
            addresses: mapping of watched address -> display name.

        Errors raised during a pass are logged and the loop continues with the
        next poll. If stop() was already called, this returns immediately.
        """
        self.addresses = list(addresses)
        while not self._stop_event.is_set():
            try:
                current_block = self.web3.eth.block_number
                logger.debug(f"Current block: #{current_block}. Checking transactions from block #{self.last_block + 1} to #{current_block}")
                if current_block > self.last_block:
                    self.check_txs(self.last_block + 1, current_block, addresses)
                    self.last_block = current_block
                else:
                    logger.debug(f"No new blocks since last check. Last block: #{self.last_block}")
            except Exception as e:
                logger.error(f"Error during monitoring: {e}")
            # Interruptible pause: returns early as soon as stop() is called.
            self._stop_event.wait(POLL_INTERVAL_SECONDS)


def load_config(path: str) -> dict:
    """Load and return the JSON configuration stored at *path*.

    Raises:
        FileNotFoundError: if *path* does not exist (logged, then re-raised).
        json.JSONDecodeError: if the file is not valid JSON (logged, then re-raised).
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        logger.error(f"Configuration file not found: {path}")
        raise
    except json.JSONDecodeError:
        logger.error(f"Error decoding JSON config: {path}")
        raise


def monitor_thread(monitor, addresses):
    """Thread target: run monitor.start_monitor(addresses) and log any escaping error."""
    try:
        monitor.start_monitor(addresses)
    except Exception as e:
        logger.error(f"Error in monitoring thread: {e}")


def main():
    """Load the configuration, start the monitor thread and wait for Ctrl+C."""
    try:
        config = load_config(CONFIG_PATH)
        infura_url = config['infura_url']

        addresses = {
            "0x482702745260Ffd69FC19943f70cFFE2caCd70e9": "$JENNER CONTRACT",
            "0xb41af5ce8c1b86e0204a0bc6625041376c70ba81": "DEVELOPER WALLET",
            "0xC152A863312F4AB4C5B6a52447abe8bFDD741aa2": "MARKETING WALLET",
            "0xedc3D54605d6d25cF405f214B56d63b7bCD80d1f": "LIQUIDITY WALLET",
            "0x8588f0c49849c011d5b5e3318bb0d1fb8534266b": "UNISWAP POOL",
        }

        monitor = InfuraMonitor(infura_url)

        # Daemon thread: an unexpected failure in the main thread must not
        # leave the process hanging on a polling loop nobody can stop.
        monitor_thread_instance = threading.Thread(
            target=monitor_thread,
            args=(monitor, addresses),
            daemon=True,
        )
        monitor_thread_instance.start()

        logger.info("Press Ctrl+C to stop the monitoring...")

        try:
            # Short joins keep the main thread responsive to KeyboardInterrupt.
            while monitor_thread_instance.is_alive():
                monitor_thread_instance.join(1)  # Wait a little for the thread to finish
        except KeyboardInterrupt:
            logger.info("Shutting down monitoring due to user interruption.")
            # Without this request the loop never ends and join() blocks forever.
            monitor.stop()
            monitor_thread_instance.join()  # Ensure the thread has finished
    except Exception as e:
        logger.error(f"Error in main execution: {e}")


if __name__ == '__main__':
    main()