w3sandbox/monitor/gpt2.py

120 lines
4.3 KiB
Python
Raw Normal View History

2024-06-02 22:58:01 +00:00
import json
import sys
import time
import threading
from web3 import Web3
from loguru import logger
from typing import Dict
CONFIG_PATH = "config.json"
# Set the default logging level to INFO
logger.remove() # Remove the default logger
logger.add(sys.stdout, level="INFO") # Add a new logger with INFO level
class InfuraMonitor:
def __init__(self, infura_url: str):
self.infura_url = infura_url
self.last_block = None
self.addresses = []
self.web3 = Web3(Web3.HTTPProvider(self.infura_url))
if self.web3.is_connected():
self.last_block = self.web3.eth.block_number
logger.info(f"Connected to Infura. Last block: #{self.last_block}")
else:
raise Exception("Couldn't connect to Infura")
def check_txs(self, start_block: int, end_block: int, address_names: Dict[str, str]):
for block_num in range(start_block, end_block + 1):
try:
block = self.web3.eth.get_block(block_num, full_transactions=True)
logger.debug(f"Fetched block #{block_num}")
except Exception as e:
logger.error(f"Error fetching block #{block_num}: {e}")
continue
for tx in block['transactions']:
from_address = tx.get('from', None)
to_address = tx.get('to', None)
if from_address or to_address:
from_name = address_names.get(from_address, from_address)
to_name = address_names.get(to_address, to_address)
if from_address in self.addresses or to_address in self.addresses:
2024-06-03 04:14:43 +00:00
logger.info(f"#{block_num}: {from_name} -> {to_name}")
2024-06-02 22:58:01 +00:00
logger.debug(f"Transaction details: {tx}")
def start_monitor(self, addresses: Dict[str, str]):
self.addresses = list(addresses.keys())
while True:
try:
current_block = self.web3.eth.block_number
logger.debug(f"Current block: #{current_block}. Checking transactions from block #{self.last_block + 1} to #{current_block}")
if current_block > self.last_block:
self.check_txs(self.last_block + 1, current_block, addresses)
self.last_block = current_block
else:
logger.debug(f"No new blocks since last check. Last block: #{self.last_block}")
except Exception as e:
logger.error(f"Error during monitoring: {e}")
time.sleep(10)
def load_config(path: str) -> dict:
try:
with open(path, 'r', encoding='utf-8') as f:
config = json.load(f)
return config
except FileNotFoundError:
logger.error(f"Configuration file not found: {path}")
raise
except json.JSONDecodeError:
logger.error(f"Error decoding JSON config: {path}")
raise
def monitor_thread(monitor, addresses):
try:
monitor.start_monitor(addresses)
except Exception as e:
logger.error(f"Error in monitoring thread: {e}")
def main():
try:
config = load_config(CONFIG_PATH)
infura_url = config['infura_url']
addresses = {
"0x482702745260Ffd69FC19943f70cFFE2caCd70e9": "$JENNER CONTRACT",
"0xb41af5ce8c1b86e0204a0bc6625041376c70ba81": "DEVELOPER WALLET",
"0xC152A863312F4AB4C5B6a52447abe8bFDD741aa2": "MARKETING WALLET",
"0xedc3D54605d6d25cF405f214B56d63b7bCD80d1f": "LIQUIDITY WALLET",
"0x8588f0c49849c011d5b5e3318bb0d1fb8534266b": "UNISWAP POOL",
}
monitor = InfuraMonitor(infura_url)
monitor_thread_instance = threading.Thread(target=monitor_thread, args=(monitor, addresses))
monitor_thread_instance.start()
logger.info("Press Ctrl+C to stop the monitoring...")
try:
while monitor_thread_instance.is_alive():
monitor_thread_instance.join(1) # Wait a little for the thread to finish
except KeyboardInterrupt:
logger.info("Shutting down monitoring due to user interruption.")
monitor_thread_instance.join() # Ensure the thread has finished
except Exception as e:
logger.error(f"Error in main execution: {e}")
if __name__ == '__main__':
main()