w3sandbox/monitor/main.py
2024-06-02 18:58:01 -04:00

63 lines
1.8 KiB
Python

import json
import time
from web3 import Web3
CONFIG_PATH = "config.json"
class InfuraMonitor:
def __init__(self, infura_url: str):
self.infura_url = infura_url
self.last_block = None
self.addresses = []
self.web3 = Web3(Web3.HTTPProvider(self.infura_url))
if self.web3.is_connected():
self.last_block = self.web3.eth.block_number
print(f"[#{self.last_block}] Connected to Infura")
else:
raise Exception("couldn't connect")
def check_txs(self, start_block, end_block):
for block_num in range(start_block, end_block + 1):
block = self.web3.eth.get_block(block_num, full_transactions=True)
for tx in block['transactions']:
if tx['from'] in self.addresses or tx['to'] in self.addresses:
print(tx)
def start_monitor(self, addresses):
self.addresses = addresses
while True:
current_block = self.web3.eth.block_number
print(f"[#{current_block}] Checking TXes")
self.check_txs(self.last_block, current_block)
time.sleep(10)
self.last_block = current_block
def load_config(path: str) -> dict:
with open(path, 'r', encoding='utf-8') as f:
return json.loads(f.read())
def main():
config = load_config(CONFIG_PATH)
infura_url = config['infura_url']
addresses = [
"0x482702745260Ffd69FC19943f70cFFE2caCd70e9", # Contract address
"0xb41af5ce8c1b86e0204a0bc6625041376c70ba81", # Owner/dev wallet
"0xC152A863312F4AB4C5B6a52447abe8bFDD741aa2", # Marketing wallet
"0xedc3D54605d6d25cF405f214B56d63b7bCD80d1f", # Liquidity wallet
]
monitor = InfuraMonitor(infura_url)
monitor.start_monitor(addresses)
if __name__ == '__main__':
main()