commit a730487c717d2075b18e715e70f8a909945c12a5 Author: agatha Date: Sun Jun 2 18:58:01 2024 -0400 initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4145371 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.idea/ +venv/ + +__pycache__ +*.py[cod] diff --git a/README.md b/README.md new file mode 100644 index 0000000..8913cbb --- /dev/null +++ b/README.md @@ -0,0 +1,2 @@ +# sandbox +web3 related shit that barely works, mainly to show potential ideas diff --git a/monitor/config.json b/monitor/config.json new file mode 100644 index 0000000..097b048 --- /dev/null +++ b/monitor/config.json @@ -0,0 +1,3 @@ +{ + "infura_url": "https://mainnet.infura.io/v3/c7ac880c27d9477280ae53b18fc26391" +} \ No newline at end of file diff --git a/monitor/gpt.py b/monitor/gpt.py new file mode 100644 index 0000000..fa9869c --- /dev/null +++ b/monitor/gpt.py @@ -0,0 +1,91 @@ +import json +import time +from web3 import Web3 +from loguru import logger +from typing import Dict, List + +CONFIG_PATH = "config.json" + + +class InfuraMonitor: + def __init__(self, infura_url: str): + self.infura_url = infura_url + self.last_block = None + self.addresses = [] + + self.web3 = Web3(Web3.HTTPProvider(self.infura_url)) + if self.web3.is_connected(): + self.last_block = self.web3.eth.block_number + logger.info(f"Connected to Infura. Last block: #{self.last_block}") + else: + raise Exception("Couldn't connect to Infura") + + def check_txs(self, start_block: int, end_block: int, address_names: Dict[str, str]): + for block_num in range(start_block, end_block + 1): + try: + block = self.web3.eth.get_block(block_num, full_transactions=True) + except Exception as e: + logger.error(f"Error fetching block #{block_num}: {e}") + continue + + for tx in block['transactions']: + from_address = tx['from'] + to_address = tx['to'] + from_name = address_names.get(from_address, from_address) + to_name = address_names.get(to_address, to_address) + + if from_address in self.addresses or to_address in self.addresses: + logger.info(f"Transaction from {from_name} to {to_name} detected in block #{block_num}") + logger.info(f"Transaction details: {tx}") + + def start_monitor(self, addresses: Dict[str, str]): + self.addresses = list(addresses.keys()) + + while True: + try: + current_block = self.web3.eth.block_number + logger.info(f"Current block: #{current_block}. Checking transactions from block #{self.last_block + 1} to #{current_block}") + if current_block > self.last_block: + self.check_txs(self.last_block + 1, current_block, addresses) + self.last_block = current_block + else: + logger.info(f"No new blocks since last check. Last block: #{self.last_block}") + except Exception as e: + logger.error(f"Error during monitoring: {e}") + + time.sleep(15) + + +def load_config(path: str) -> dict: + try: + with open(path, 'r', encoding='utf-8') as f: + config = json.load(f) + return config + except FileNotFoundError: + logger.error(f"Configuration file not found: {path}") + raise + except json.JSONDecodeError: + logger.error(f"Error decoding JSON config: {path}") + raise + + +def main(): + try: + config = load_config(CONFIG_PATH) + infura_url = config['infura_url'] + + addresses = { + "0x482702745260Ffd69FC19943f70cFFE2caCd70e9": "$JENNER CONTRACT", + "0xb41af5ce8c1b86e0204a0bc6625041376c70ba81": "DEV WALLET", + "0xC152A863312F4AB4C5B6a52447abe8bFDD741aa2": "MARKETING WALLET", + "0xedc3D54605d6d25cF405f214B56d63b7bCD80d1f": "LIQUIDITY WALLET", + } + + monitor = InfuraMonitor(infura_url) + monitor.start_monitor(addresses) + except Exception as e: + logger.error(f"Error in main execution: {e}") + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/monitor/gpt2.py b/monitor/gpt2.py new file mode 100644 index 0000000..d01a842 --- /dev/null +++ b/monitor/gpt2.py @@ -0,0 +1,119 @@ +import json +import sys +import time +import threading +from web3 import Web3 +from loguru import logger +from typing import Dict + +CONFIG_PATH = "config.json" + +# Set the default logging level to INFO +logger.remove() # Remove the default logger +logger.add(sys.stdout, level="INFO") # Add a new logger with INFO level + + +class InfuraMonitor: + def __init__(self, infura_url: str): + self.infura_url = infura_url + self.last_block = None + self.addresses = [] + + self.web3 = Web3(Web3.HTTPProvider(self.infura_url)) + if self.web3.is_connected(): + self.last_block = self.web3.eth.block_number + logger.info(f"Connected to Infura. Last block: #{self.last_block}") + else: + raise Exception("Couldn't connect to Infura") + + def check_txs(self, start_block: int, end_block: int, address_names: Dict[str, str]): + for block_num in range(start_block, end_block + 1): + try: + block = self.web3.eth.get_block(block_num, full_transactions=True) + logger.debug(f"Fetched block #{block_num}") + except Exception as e: + logger.error(f"Error fetching block #{block_num}: {e}") + continue + + for tx in block['transactions']: + from_address = tx.get('from', None) + to_address = tx.get('to', None) + + if from_address or to_address: + from_name = address_names.get(from_address, from_address) + to_name = address_names.get(to_address, to_address) + + if from_address in self.addresses or to_address in self.addresses: + logger.info(f"TX detected in block #{block_num}: {from_name} -> {to_name}") + logger.debug(f"Transaction details: {tx}") + + def start_monitor(self, addresses: Dict[str, str]): + self.addresses = list(addresses.keys()) + + while True: + try: + current_block = self.web3.eth.block_number + logger.debug(f"Current block: #{current_block}. Checking transactions from block #{self.last_block + 1} to #{current_block}") + if current_block > self.last_block: + self.check_txs(self.last_block + 1, current_block, addresses) + self.last_block = current_block + else: + logger.debug(f"No new blocks since last check. Last block: #{self.last_block}") + except Exception as e: + logger.error(f"Error during monitoring: {e}") + + time.sleep(10) + + +def load_config(path: str) -> dict: + try: + with open(path, 'r', encoding='utf-8') as f: + config = json.load(f) + return config + except FileNotFoundError: + logger.error(f"Configuration file not found: {path}") + raise + except json.JSONDecodeError: + logger.error(f"Error decoding JSON config: {path}") + raise + + +def monitor_thread(monitor, addresses): + try: + monitor.start_monitor(addresses) + except Exception as e: + logger.error(f"Error in monitoring thread: {e}") + + +def main(): + try: + config = load_config(CONFIG_PATH) + infura_url = config['infura_url'] + + addresses = { + "0x482702745260Ffd69FC19943f70cFFE2caCd70e9": "$JENNER CONTRACT", + "0xb41af5ce8c1b86e0204a0bc6625041376c70ba81": "DEVELOPER WALLET", + "0xC152A863312F4AB4C5B6a52447abe8bFDD741aa2": "MARKETING WALLET", + "0xedc3D54605d6d25cF405f214B56d63b7bCD80d1f": "LIQUIDITY WALLET", + "0x8588f0c49849c011d5b5e3318bb0d1fb8534266b": "UNISWAP POOL", + } + + monitor = InfuraMonitor(infura_url) + + monitor_thread_instance = threading.Thread(target=monitor_thread, args=(monitor, addresses)) + monitor_thread_instance.start() + + logger.info("Press Ctrl+C to stop the monitoring...") + + try: + while monitor_thread_instance.is_alive(): + monitor_thread_instance.join(1) # Wait a little for the thread to finish + except KeyboardInterrupt: + logger.info("Shutting down monitoring due to user interruption.") + monitor_thread_instance.join() # Ensure the thread has finished + except Exception as e: + logger.error(f"Error in main execution: {e}") + + +if __name__ == '__main__': + main() diff --git a/monitor/main.py b/monitor/main.py new file mode 100644 index 0000000..373c490 --- /dev/null +++ b/monitor/main.py @@ -0,0 +1,62 @@ +import json +import time +from web3 import Web3 + +CONFIG_PATH = "config.json" + + +class InfuraMonitor: + def __init__(self, infura_url: str): + self.infura_url = infura_url + self.last_block = None + self.addresses = [] + + self.web3 = Web3(Web3.HTTPProvider(self.infura_url)) + if self.web3.is_connected(): + self.last_block = self.web3.eth.block_number + print(f"[#{self.last_block}] Connected to Infura") + else: + raise Exception("couldn't connect") + + def check_txs(self, start_block, end_block): + for block_num in range(start_block, end_block + 1): + block = self.web3.eth.get_block(block_num, full_transactions=True) + for tx in block['transactions']: + if tx['from'] in self.addresses or tx['to'] in self.addresses: + print(tx) + + def start_monitor(self, addresses): + self.addresses = addresses + + while True: + current_block = self.web3.eth.block_number + print(f"[#{current_block}] Checking TXes") + self.check_txs(self.last_block, current_block) + + time.sleep(10) + + self.last_block = current_block + + +def load_config(path: str) -> dict: + with open(path, 'r', encoding='utf-8') as f: + return json.loads(f.read()) + + +def main(): + config = load_config(CONFIG_PATH) + infura_url = config['infura_url'] + + addresses = [ + "0x482702745260Ffd69FC19943f70cFFE2caCd70e9", # Contract address + "0xb41af5ce8c1b86e0204a0bc6625041376c70ba81", # Owner/dev wallet + "0xC152A863312F4AB4C5B6a52447abe8bFDD741aa2", # Marketing wallet + "0xedc3D54605d6d25cF405f214B56d63b7bCD80d1f", # Liquidity wallet + ] + + monitor = InfuraMonitor(infura_url) + monitor.start_monitor(addresses) + + +if __name__ == '__main__': + main() diff --git a/monitor/requirements.txt b/monitor/requirements.txt new file mode 100644 index 0000000..b76e898 --- /dev/null +++ b/monitor/requirements.txt @@ -0,0 +1,2 @@ +web3 +loguru \ No newline at end of file diff --git a/staking/gpt.py b/staking/gpt.py new file mode 100644 index 0000000..aff5248 --- /dev/null +++ b/staking/gpt.py @@ -0,0 +1,112 @@ +from decimal import Decimal, getcontext + +# Set precision context if needed +getcontext().prec = 28 + +class StakingContract: + def __init__(self): + self.rewards_pct = Decimal('0.6') + self.community_pct = Decimal('0.3') + self.developer_pct = Decimal('0.1') + + self.balances = { + "stakes": Decimal('0.0'), + "reward": Decimal('0.0'), + "developer": Decimal('0.0'), + "community": Decimal('0.0'), + } + + self.users = {} + + def deposit(self, user: str, amount: float): + amount = Decimal(amount) # Convert to Decimal + stake, reward, community, developer = self.calc_fee(amount) + print(f'User {user} staked {amount} ($JENNER)') + + self.add_fees_to_balances(stake, reward, community, developer) + self.add_stake_to_user(user, stake) + + def withdraw(self, user: str): + if user not in self.users: + print(f"No staked amount found for user {user}.") + return Decimal('0.0') + + initial_stake = self.users[user] + reward_share = self.calculate_reward_share(user) + + # Update balances + self.balances['stakes'] -= initial_stake + self.balances['reward'] -= reward_share + + # Reset user stake to 0 + self.users[user] = Decimal('0.0') + + # Total amount returned to the user + total_withdrawal = initial_stake + reward_share + print(f'User {user} withdrew {total_withdrawal} ($JENNER)') + return total_withdrawal + + def calculate_reward_share(self, user): + total_stakes = sum(self.users.values()) + if total_stakes == 0: # Avoid division by zero + return Decimal('0.0') + user_stake = self.users[user] + reward_share = (user_stake / total_stakes) * self.balances['reward'] + return reward_share + + def calc_fee(self, amount): + fee = amount * Decimal('0.03') + stake_amount = amount - fee + + rewards = fee * self.rewards_pct + community = fee * self.community_pct + developer = fee * self.developer_pct + + return stake_amount, rewards, community, developer + + def show_stats(self): + print('\n--- POOL STATS ---') + for k, v in self.balances.items(): + print(f'{k}: {v}') + print('') + print('--- USERS IN POOL ---') + for k, v in self.users.items(): + print(f'{k}: {v}') + print('') + + def add_stake_to_user(self, user: str, stake: Decimal): + if user in self.users: + self.users[user] += stake + else: + self.users[user] = stake + + def add_fees_to_balances(self, stake: Decimal, reward: Decimal, community: Decimal, developer: Decimal): + self.balances['stakes'] += stake + self.balances['reward'] += reward + self.balances['developer'] += developer + self.balances['community'] += community + + +def main(): + pool = StakingContract() + + pool.deposit("owner", 1) + pool.deposit("user5", 1000000) + pool.deposit("user1", 1000) + pool.deposit("user2", 200000) + pool.deposit("user3", 90283) + pool.deposit("user4", 1000) + pool.show_stats() + + pool.withdraw("user1") + pool.withdraw("user2") + pool.withdraw("user3") + pool.withdraw("user4") + pool.withdraw("user5") + pool.withdraw("owner") + + pool.show_stats() + + +if __name__ == '__main__': + main() diff --git a/staking/stakecalc.py b/staking/stakecalc.py new file mode 100644 index 0000000..a8e0b1e --- /dev/null +++ b/staking/stakecalc.py @@ -0,0 +1,87 @@ +""" +CONCEPT HODLJENNER.COM +A community-driven staking project for the Etherum token $JENNER designed to encourage and reward the community by +holding onto the token. + +Dedicated $JENNER HODLERS (not holders) can stake some of their $JENNER to the pool. When doing so, 3% of the total +staked amount will be shared amongst the stakers already in the pool. + +A staker can exit the pool at any time they wish. They will be given back their INITIAL STAKE (minus fees) plus +their share of the Fee rewards. +""" + + +class StakingContract: + def __init__(self): + self.rewards_pct = 0.6 + self.community_pct = 0.3 + self.developer_pct = 0.1 + + self.balances = { + "stakes": 0.0, + "reward": 0.0, + "developer": 0.0, + "community": 0.0, + } + + self.users = {} + + def deposit(self, user: str, amount: float): + stake, reward, community, developer = self.calc_fee(amount) + print('--- DEPOSIT ---') + print(f'user address: {user}') + print(f'staked amount: {stake}') + print(f'reward amount: {reward}') + print(f'community amount: {community}') + print(f'developer amount: {developer}') + print('') + + self.balances['stakes'] += stake + self.balances['reward'] += reward + self.balances['developer'] += developer + self.balances['community'] += community + + if user in self.users.keys(): + self.users[user] += stake + else: + self.users[user] = stake + + def withdraw(self, user: str): + pass + + def calc_fee(self, amount): + fee = amount * 0.03 + stake_amount = amount - fee + + rewards = fee * self.rewards_pct + community = fee * self.community_pct + developer = fee * self.developer_pct + + return stake_amount, rewards, community, developer + + def show_stats(self): + print('--- POOL STATS ---') + for k, v in self.balances.items(): + print(f'{k}: {v}') + print('') + print('--- USERS IN POOL ---') + for k, v in self.users.items(): + print(f'{k}: {v}') + print('') + + +def main(): + pool = StakingContract() + pool.show_stats() + pool.deposit("user1", 1000) + pool.deposit("user2", 200000) + pool.deposit("user3", 90283) + pool.show_stats() + + user = pool.withdraw("user1") + pool.show_stats() + + + +if __name__ == '__main__': + main()