diff --git a/src/proxy_pool/worker/tasks_cleanup.py b/src/proxy_pool/worker/tasks_cleanup.py new file mode 100644 index 0000000..e1688a1 --- /dev/null +++ b/src/proxy_pool/worker/tasks_cleanup.py @@ -0,0 +1,89 @@ +from __future__ import annotations + +import logging +from datetime import datetime, timedelta + +from sqlalchemy import delete, select +from sqlalchemy.ext.asyncio import async_sessionmaker + +from proxy_pool.accounts.models import ProxyLease +from proxy_pool.proxy.models import Proxy, ProxyCheck, ProxyStatus + +logger = logging.getLogger(__name__) + + +async def prune_dead_proxies(ctx: dict) -> dict: + """Remove proxies that have been dead beyond the retention period.""" + session_factory: async_sessionmaker = ctx["session_factory"] + settings = ctx["settings"] + + cutoff = datetime.now() - timedelta(days=settings.cleanup.prune_dead_after_days) + + async with session_factory() as db: + result = await db.execute( + delete(Proxy).where( + Proxy.status == ProxyStatus.DEAD, + Proxy.last_checked_at < cutoff, + ) + ) + count = result.rowcount + await db.commit() + + logger.info( + "Pruned %d dead proxies older than %d days", + count, + settings.cleanup.prune_dead_after_days, + ) + return {"pruned": count} + + +async def prune_old_checks(ctx: dict) -> dict: + """Delete old check history beyond the retention period.""" + session_factory: async_sessionmaker = ctx["session_factory"] + settings = ctx["settings"] + + cutoff = datetime.now() - timedelta(days=settings.cleanup.prune_checks_after_days) + + async with session_factory() as db: + result = await db.execute( + delete(ProxyCheck).where(ProxyCheck.created_at < cutoff) + ) + count = result.rowcount + await db.commit() + + logger.info( + "Pruned %d check records older than %d days", + count, + settings.cleanup.prune_checks_after_days, + ) + return {"pruned": count} + + +async def expire_leases(ctx: dict) -> dict: + """Mark expired leases as released and clean up Redis keys.""" + session_factory: async_sessionmaker = ctx["session_factory"] + redis = ctx["redis"] + settings = ctx["settings"] + key_prefix = settings.redis.key_prefix + + now = datetime.now() + + async with session_factory() as db: + result = await db.execute( + select(ProxyLease).where( + ProxyLease.is_released.is_(False), + ProxyLease.expires_at < now, + ) + ) + expired = result.scalars().all() + + for lease in expired: + lease.is_released = True + # Safety net: delete Redis key in case TTL didn't fire + await redis.delete(f"{key_prefix}lease:{lease.proxy_id}") + + await db.commit() + + if expired: + logger.info("Expired %d leases", len(expired)) + return {"expired": len(expired)}