add etherscan api key as optional parameter
This commit is contained in:
parent
280a5f830f
commit
7bd73cae08
3
main.py
3
main.py
@ -21,7 +21,8 @@ async def main():
|
|||||||
monitor = EventMonitor(
|
monitor = EventMonitor(
|
||||||
config={
|
config={
|
||||||
"infura_url": config['infura_url'],
|
"infura_url": config['infura_url'],
|
||||||
"pool_address": config['pool_address']
|
"pool_address": config['pool_address'],
|
||||||
|
"etherscan_key": config['etherscan_key']
|
||||||
},
|
},
|
||||||
queue=queue
|
queue=queue
|
||||||
)
|
)
|
||||||
|
@ -15,7 +15,7 @@ class EventMonitor:
|
|||||||
self.web3 = Web3(Web3.HTTPProvider(self.config['infura_url']))
|
self.web3 = Web3(Web3.HTTPProvider(self.config['infura_url']))
|
||||||
|
|
||||||
pool_address = self.config['pool_address']
|
pool_address = self.config['pool_address']
|
||||||
pool_abi = fetch_abi(pool_address)
|
pool_abi = fetch_abi(pool_address, key=self.config['etherscan_key'])
|
||||||
self.contract = self.web3.eth.contract(
|
self.contract = self.web3.eth.contract(
|
||||||
address=pool_address,
|
address=pool_address,
|
||||||
abi=pool_abi
|
abi=pool_abi
|
||||||
@ -37,7 +37,7 @@ class EventMonitor:
|
|||||||
|
|
||||||
def fetch_token_info(self, token_address):
|
def fetch_token_info(self, token_address):
|
||||||
try:
|
try:
|
||||||
token_abi = fetch_abi(token_address)
|
token_abi = fetch_abi(token_address, key=self.config['etherscan_key'])
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
logger.warning(f"Failed to fetch info for {token_address}: {err}")
|
logger.warning(f"Failed to fetch info for {token_address}: {err}")
|
||||||
return {
|
return {
|
||||||
|
4
util.py
4
util.py
@ -6,8 +6,8 @@ from web3.types import ChecksumAddress
|
|||||||
|
|
||||||
|
|
||||||
def fetch_abi(address: ChecksumAddress, headers: Optional[Dict[str, Any]] = None,
|
def fetch_abi(address: ChecksumAddress, headers: Optional[Dict[str, Any]] = None,
|
||||||
params: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
|
params: Optional[Dict[str, Any]] = None, key: Optional[str] = None) -> Optional[Dict[str, Any]]:
|
||||||
url = f'https://api.etherscan.io/api?module=contract&action=getabi&address={address}'
|
url = f'https://api.etherscan.io/api?module=contract&action=getabi&address={address}&apikey={key}'
|
||||||
response = requests.get(url, headers=headers, params=params)
|
response = requests.get(url, headers=headers, params=params)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
data = response.json()
|
data = response.json()
|
||||||
|
Loading…
Reference in New Issue
Block a user