initial commit

This commit is contained in:
agatha 2024-06-02 18:58:01 -04:00
commit a730487c71
9 changed files with 483 additions and 0 deletions

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
.idea/
venv/
__pycache__
*.py[cod]

2
README.md Normal file
View File

@ -0,0 +1,2 @@
# sandbox
web3 related shit that barely works, mainly to show potential ideas

3
monitor/config.json Normal file
View File

@ -0,0 +1,3 @@
{
"infura_url": "https://mainnet.infura.io/v3/c7ac880c27d9477280ae53b18fc26391"
}

91
monitor/gpt.py Normal file
View File

@ -0,0 +1,91 @@
import json
import time
from web3 import Web3
from loguru import logger
from typing import Dict, List
CONFIG_PATH = "config.json"
class InfuraMonitor:
def __init__(self, infura_url: str):
self.infura_url = infura_url
self.last_block = None
self.addresses = []
self.web3 = Web3(Web3.HTTPProvider(self.infura_url))
if self.web3.is_connected():
self.last_block = self.web3.eth.block_number
logger.info(f"Connected to Infura. Last block: #{self.last_block}")
else:
raise Exception("Couldn't connect to Infura")
def check_txs(self, start_block: int, end_block: int, address_names: Dict[str, str]):
for block_num in range(start_block, end_block + 1):
try:
block = self.web3.eth.get_block(block_num, full_transactions=True)
except Exception as e:
logger.error(f"Error fetching block #{block_num}: {e}")
continue
for tx in block['transactions']:
from_address = tx['from']
to_address = tx['to']
from_name = address_names.get(from_address, from_address)
to_name = address_names.get(to_address, to_address)
if from_address in self.addresses or to_address in self.addresses:
logger.info(f"Transaction from {from_name} to {to_name} detected in block #{block_num}")
logger.info(f"Transaction details: {tx}")
def start_monitor(self, addresses: Dict[str, str]):
self.addresses = list(addresses.keys())
while True:
try:
current_block = self.web3.eth.block_number
logger.info(f"Current block: #{current_block}. Checking transactions from block #{self.last_block + 1} to #{current_block}")
if current_block > self.last_block:
self.check_txs(self.last_block + 1, current_block, addresses)
self.last_block = current_block
else:
logger.info(f"No new blocks since last check. Last block: #{self.last_block}")
except Exception as e:
logger.error(f"Error during monitoring: {e}")
time.sleep(15)
def load_config(path: str) -> dict:
try:
with open(path, 'r', encoding='utf-8') as f:
config = json.load(f)
return config
except FileNotFoundError:
logger.error(f"Configuration file not found: {path}")
raise
except json.JSONDecodeError:
logger.error(f"Error decoding JSON config: {path}")
raise
def main():
try:
config = load_config(CONFIG_PATH)
infura_url = config['infura_url']
addresses = {
"0x482702745260Ffd69FC19943f70cFFE2caCd70e9": "$JENNER CONTRACT",
"0xb41af5ce8c1b86e0204a0bc6625041376c70ba81": "DEV WALLET",
"0xC152A863312F4AB4C5B6a52447abe8bFDD741aa2": "MARKETING WALLET",
"0xedc3D54605d6d25cF405f214B56d63b7bCD80d1f": "LIQUIDITY WALLET",
}
monitor = InfuraMonitor(infura_url)
monitor.start_monitor(addresses)
except Exception as e:
logger.error(f"Error in main execution: {e}")
if __name__ == '__main__':
main()

119
monitor/gpt2.py Normal file
View File

@ -0,0 +1,119 @@
import json
import sys
import time
import threading
from web3 import Web3
from loguru import logger
from typing import Dict
CONFIG_PATH = "config.json"
# Set the default logging level to INFO
logger.remove() # Remove the default logger
logger.add(sys.stdout, level="INFO") # Add a new logger with INFO level
class InfuraMonitor:
def __init__(self, infura_url: str):
self.infura_url = infura_url
self.last_block = None
self.addresses = []
self.web3 = Web3(Web3.HTTPProvider(self.infura_url))
if self.web3.is_connected():
self.last_block = self.web3.eth.block_number
logger.info(f"Connected to Infura. Last block: #{self.last_block}")
else:
raise Exception("Couldn't connect to Infura")
def check_txs(self, start_block: int, end_block: int, address_names: Dict[str, str]):
for block_num in range(start_block, end_block + 1):
try:
block = self.web3.eth.get_block(block_num, full_transactions=True)
logger.debug(f"Fetched block #{block_num}")
except Exception as e:
logger.error(f"Error fetching block #{block_num}: {e}")
continue
for tx in block['transactions']:
from_address = tx.get('from', None)
to_address = tx.get('to', None)
if from_address or to_address:
from_name = address_names.get(from_address, from_address)
to_name = address_names.get(to_address, to_address)
if from_address in self.addresses or to_address in self.addresses:
logger.info(f"TX detected in block #{block_num}: {from_name} -> {to_name}")
logger.debug(f"Transaction details: {tx}")
def start_monitor(self, addresses: Dict[str, str]):
self.addresses = list(addresses.keys())
while True:
try:
current_block = self.web3.eth.block_number
logger.debug(f"Current block: #{current_block}. Checking transactions from block #{self.last_block + 1} to #{current_block}")
if current_block > self.last_block:
self.check_txs(self.last_block + 1, current_block, addresses)
self.last_block = current_block
else:
logger.debug(f"No new blocks since last check. Last block: #{self.last_block}")
except Exception as e:
logger.error(f"Error during monitoring: {e}")
time.sleep(10)
def load_config(path: str) -> dict:
try:
with open(path, 'r', encoding='utf-8') as f:
config = json.load(f)
return config
except FileNotFoundError:
logger.error(f"Configuration file not found: {path}")
raise
except json.JSONDecodeError:
logger.error(f"Error decoding JSON config: {path}")
raise
def monitor_thread(monitor, addresses):
try:
monitor.start_monitor(addresses)
except Exception as e:
logger.error(f"Error in monitoring thread: {e}")
def main():
try:
config = load_config(CONFIG_PATH)
infura_url = config['infura_url']
addresses = {
"0x482702745260Ffd69FC19943f70cFFE2caCd70e9": "$JENNER CONTRACT",
"0xb41af5ce8c1b86e0204a0bc6625041376c70ba81": "DEVELOPER WALLET",
"0xC152A863312F4AB4C5B6a52447abe8bFDD741aa2": "MARKETING WALLET",
"0xedc3D54605d6d25cF405f214B56d63b7bCD80d1f": "LIQUIDITY WALLET",
"0x8588f0c49849c011d5b5e3318bb0d1fb8534266b": "UNISWAP POOL",
}
monitor = InfuraMonitor(infura_url)
monitor_thread_instance = threading.Thread(target=monitor_thread, args=(monitor, addresses))
monitor_thread_instance.start()
logger.info("Press Ctrl+C to stop the monitoring...")
try:
while monitor_thread_instance.is_alive():
monitor_thread_instance.join(1) # Wait a little for the thread to finish
except KeyboardInterrupt:
logger.info("Shutting down monitoring due to user interruption.")
monitor_thread_instance.join() # Ensure the thread has finished
except Exception as e:
logger.error(f"Error in main execution: {e}")
if __name__ == '__main__':
main()

62
monitor/main.py Normal file
View File

@ -0,0 +1,62 @@
import json
import time
from web3 import Web3
CONFIG_PATH = "config.json"
class InfuraMonitor:
def __init__(self, infura_url: str):
self.infura_url = infura_url
self.last_block = None
self.addresses = []
self.web3 = Web3(Web3.HTTPProvider(self.infura_url))
if self.web3.is_connected():
self.last_block = self.web3.eth.block_number
print(f"[#{self.last_block}] Connected to Infura")
else:
raise Exception("couldn't connect")
def check_txs(self, start_block, end_block):
for block_num in range(start_block, end_block + 1):
block = self.web3.eth.get_block(block_num, full_transactions=True)
for tx in block['transactions']:
if tx['from'] in self.addresses or tx['to'] in self.addresses:
print(tx)
def start_monitor(self, addresses):
self.addresses = addresses
while True:
current_block = self.web3.eth.block_number
print(f"[#{current_block}] Checking TXes")
self.check_txs(self.last_block, current_block)
time.sleep(10)
self.last_block = current_block
def load_config(path: str) -> dict:
with open(path, 'r', encoding='utf-8') as f:
return json.loads(f.read())
def main():
config = load_config(CONFIG_PATH)
infura_url = config['infura_url']
addresses = [
"0x482702745260Ffd69FC19943f70cFFE2caCd70e9", # Contract address
"0xb41af5ce8c1b86e0204a0bc6625041376c70ba81", # Owner/dev wallet
"0xC152A863312F4AB4C5B6a52447abe8bFDD741aa2", # Marketing wallet
"0xedc3D54605d6d25cF405f214B56d63b7bCD80d1f", # Liquidity wallet
]
monitor = InfuraMonitor(infura_url)
monitor.start_monitor(addresses)
if __name__ == '__main__':
main()

2
monitor/requirements.txt Normal file
View File

@ -0,0 +1,2 @@
web3
loguru

112
staking/gpt.py Normal file
View File

@ -0,0 +1,112 @@
from decimal import Decimal, getcontext
# Set precision context if needed
getcontext().prec = 28
class StakingContract:
def __init__(self):
self.rewards_pct = Decimal('0.6')
self.community_pct = Decimal('0.3')
self.developer_pct = Decimal('0.1')
self.balances = {
"stakes": Decimal('0.0'),
"reward": Decimal('0.0'),
"developer": Decimal('0.0'),
"community": Decimal('0.0'),
}
self.users = {}
def deposit(self, user: str, amount: float):
amount = Decimal(amount) # Convert to Decimal
stake, reward, community, developer = self.calc_fee(amount)
print(f'User {user} staked {amount} ($JENNER)')
self.add_fees_to_balances(stake, reward, community, developer)
self.add_stake_to_user(user, stake)
def withdraw(self, user: str):
if user not in self.users:
print(f"No staked amount found for user {user}.")
return Decimal('0.0')
initial_stake = self.users[user]
reward_share = self.calculate_reward_share(user)
# Update balances
self.balances['stakes'] -= initial_stake
self.balances['reward'] -= reward_share
# Reset user stake to 0
self.users[user] = Decimal('0.0')
# Total amount returned to the user
total_withdrawal = initial_stake + reward_share
print(f'User {user} withdrew {total_withdrawal} ($JENNER)')
return total_withdrawal
def calculate_reward_share(self, user):
total_stakes = sum(self.users.values())
if total_stakes == 0: # Avoid division by zero
return Decimal('0.0')
user_stake = self.users[user]
reward_share = (user_stake / total_stakes) * self.balances['reward']
return reward_share
def calc_fee(self, amount):
fee = amount * Decimal('0.03')
stake_amount = amount - fee
rewards = fee * self.rewards_pct
community = fee * self.community_pct
developer = fee * self.developer_pct
return stake_amount, rewards, community, developer
def show_stats(self):
print('\n--- POOL STATS ---')
for k, v in self.balances.items():
print(f'{k}: {v}')
print('')
print('--- USERS IN POOL ---')
for k, v in self.users.items():
print(f'{k}: {v}')
print('')
def add_stake_to_user(self, user: str, stake: Decimal):
if user in self.users:
self.users[user] += stake
else:
self.users[user] = stake
def add_fees_to_balances(self, stake: Decimal, reward: Decimal, community: Decimal, developer: Decimal):
self.balances['stakes'] += stake
self.balances['reward'] += reward
self.balances['developer'] += developer
self.balances['community'] += community
def main():
pool = StakingContract()
pool.deposit("owner", 1)
pool.deposit("user5", 1000000)
pool.deposit("user1", 1000)
pool.deposit("user2", 200000)
pool.deposit("user3", 90283)
pool.deposit("user4", 1000)
pool.show_stats()
pool.withdraw("user1")
pool.withdraw("user2")
pool.withdraw("user3")
pool.withdraw("user4")
pool.withdraw("user5")
pool.withdraw("owner")
pool.show_stats()
if __name__ == '__main__':
main()

87
staking/stakecalc.py Normal file
View File

@ -0,0 +1,87 @@
"""
CONCEPT HODLJENNER.COM
A community-driven staking project for the Etherum token $JENNER designed to encourage and reward the community by
holding onto the token.
Dedicated $JENNER HODLERS (not holders) can stake some of their $JENNER to the pool. When doing so, 3% of the total
staked amount will be shared amongst the stakers already in the pool.
A staker can exit the pool at any time they wish. They will be given back their INITIAL STAKE (minus fees) plus
their share of the Fee rewards.
"""
class StakingContract:
def __init__(self):
self.rewards_pct = 0.6
self.community_pct = 0.3
self.developer_pct = 0.1
self.balances = {
"stakes": 0.0,
"reward": 0.0,
"developer": 0.0,
"community": 0.0,
}
self.users = {}
def deposit(self, user: str, amount: float):
stake, reward, community, developer = self.calc_fee(amount)
print('--- DEPOSIT ---')
print(f'user address: {user}')
print(f'staked amount: {stake}')
print(f'reward amount: {reward}')
print(f'community amount: {community}')
print(f'developer amount: {developer}')
print('')
self.balances['stakes'] += stake
self.balances['reward'] += reward
self.balances['developer'] += developer
self.balances['community'] += community
if user in self.users.keys():
self.users[user] += stake
else:
self.users[user] = stake
def withdraw(self, user: str):
pass
def calc_fee(self, amount):
fee = amount * 0.03
stake_amount = amount - fee
rewards = fee * self.rewards_pct
community = fee * self.community_pct
developer = fee * self.developer_pct
return stake_amount, rewards, community, developer
def show_stats(self):
print('--- POOL STATS ---')
for k, v in self.balances.items():
print(f'{k}: {v}')
print('')
print('--- USERS IN POOL ---')
for k, v in self.users.items():
print(f'{k}: {v}')
print('')
def main():
pool = StakingContract()
pool.show_stats()
pool.deposit("user1", 1000)
pool.deposit("user2", 200000)
pool.deposit("user3", 90283)
pool.show_stats()
user = pool.withdraw("user1")
pool.show_stats()
if __name__ == '__main__':
main()