import json import socket import threading import database from database.models import Agent from loguru import logger from datetime import datetime class Server: def __init__(self, host, port): """ Initializes a new Server instance. This method will initialize a new Server instance. It takes in the host and port to listen on, and will create a new database session and a lock to protect access to the self.commands dictionary. Args: host (str): The host to listen on. port (int): The port to listen on. """ self.host = host self.port = port self.commands = {} self.lock = threading.Lock() self.session = database.init_db() def start(self): """Starts the server. This method will start the server and listen on the specified host and port. It will then accept incoming connections and start a new thread to handle each connection. Raises: Exception: If there is an error starting the server. """ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind((self.host, self.port)) s.listen() logger.info(f"Listening on {self.host}:{self.port}") while True: conn, addr = s.accept() logger.info(f"Connection from {addr}") threading.Thread(target=self.handle_agent, args=(conn, addr)).start() def handle_agent(self, conn, addr): """Handles a connection from an agent. This method is responsible for handling a connection from an agent. It will either register the agent if it's the first time it's connecting, or update the agent's information. It will then send a command to the agent based on the command registered for that agent. If the agent is new, the command will be to set the agent's id. If the agent is already known, the command will be the last command registered for that agent. Args: conn (socket.socket): The socket object for the connection to the agent. addr (tuple): The address of the agent. Raises: Exception: If there is an error handling the agent. """ try: agent_info = json.loads(conn.recv(1024).decode()) agent_id = agent_info.get("id", None) if not agent_id: agent_id = self.register_agent(agent_info) logger.info(f"Agent {agent_id} registered from {addr}") self.commands[agent_id] = f"set_id {agent_id}" else: self.update_agent(agent_id, agent_info) logger.info(f"Agent {agent_id} connected from {addr}") with self.lock: if agent_id in self.commands: command = self.commands[agent_id] del self.commands[agent_id] else: command = "nop" conn.send(json.dumps({"command": command}).encode()) except Exception as e: logger.error(f"Error handling agent {agent_id}: {e}") finally: conn.close() def register_agent(self, agent_info): """ Registers a new agent with the server. This method will register a new agent with the server. It will create a new Agent instance with the provided agent_info and add it to the database. If the registration is successful, it will return the id of the newly registered agent. If there is an error registering the agent, it will log the error and return None. Args: agent_info (dict): The information about the agent to register. Returns: int or None: The id of the newly registered agent, or None if there is an error. """ session = self.session() try: agent = Agent(**agent_info) session.add(agent) session.commit() agent_id = agent.id except Exception as e: logger.error(f"Error registering agent: {e}") session.rollback() agent_id = None finally: session.close() return agent_id def update_agent(self, agent_id, agent_info): """ Updates an existing agent with the server. This method will update an existing agent with the server. It will query the database for the agent with the provided agent_id, and then update the agent's information with the provided agent_info. If the agent does not exist, it will create a new Agent instance with the provided agent_info and add it to the database. If there is an error updating the agent, it will log the error. Args: agent_id (int): The id of the agent to update. agent_info (dict): The information about the agent to update. Returns: None """ session = self.session() try: agent = session.query(Agent).filter_by(id=agent_id).first() if not agent: agent = Agent(**agent_info) session.add(agent) else: agent_info["last_seen"] = datetime.utcnow() for key, value in agent_info.items(): setattr(agent, key, value) session.commit() except Exception as e: logger.error(f"Error updating agent {agent_id}: {e}") session.rollback() finally: session.close() if __name__ == '__main__': server = Server("0.0.0.0", 9999) server.start()