feat: add cleanup tasks for dead proxies, old checks, and expired leases
This commit is contained in:
parent
e0cdf94063
commit
4ea2a2aba8
89
src/proxy_pool/worker/tasks_cleanup.py
Normal file
89
src/proxy_pool/worker/tasks_cleanup.py
Normal file
@ -0,0 +1,89 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
|
from sqlalchemy import delete, select
|
||||||
|
from sqlalchemy.ext.asyncio import async_sessionmaker
|
||||||
|
|
||||||
|
from proxy_pool.accounts.models import ProxyLease
|
||||||
|
from proxy_pool.proxy.models import Proxy, ProxyCheck, ProxyStatus
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
async def prune_dead_proxies(ctx: dict) -> dict:
|
||||||
|
"""Remove proxies that have been dead beyond the retention period."""
|
||||||
|
session_factory: async_sessionmaker = ctx["session_factory"]
|
||||||
|
settings = ctx["settings"]
|
||||||
|
|
||||||
|
cutoff = datetime.now() - timedelta(days=settings.cleanup.prune_dead_after_days)
|
||||||
|
|
||||||
|
async with session_factory() as db:
|
||||||
|
result = await db.execute(
|
||||||
|
delete(Proxy).where(
|
||||||
|
Proxy.status == ProxyStatus.DEAD,
|
||||||
|
Proxy.last_checked_at < cutoff,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
count = result.rowcount
|
||||||
|
await db.commit()
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"Pruned %d dead proxies older than %d days",
|
||||||
|
count,
|
||||||
|
settings.cleanup.prune_dead_after_days,
|
||||||
|
)
|
||||||
|
return {"pruned": count}
|
||||||
|
|
||||||
|
|
||||||
|
async def prune_old_checks(ctx: dict) -> dict:
|
||||||
|
"""Delete old check history beyond the retention period."""
|
||||||
|
session_factory: async_sessionmaker = ctx["session_factory"]
|
||||||
|
settings = ctx["settings"]
|
||||||
|
|
||||||
|
cutoff = datetime.now() - timedelta(days=settings.cleanup.prune_checks_after_days)
|
||||||
|
|
||||||
|
async with session_factory() as db:
|
||||||
|
result = await db.execute(
|
||||||
|
delete(ProxyCheck).where(ProxyCheck.created_at < cutoff)
|
||||||
|
)
|
||||||
|
count = result.rowcount
|
||||||
|
await db.commit()
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"Pruned %d check records older than %d days",
|
||||||
|
count,
|
||||||
|
settings.cleanup.prune_checks_after_days,
|
||||||
|
)
|
||||||
|
return {"pruned": count}
|
||||||
|
|
||||||
|
|
||||||
|
async def expire_leases(ctx: dict) -> dict:
|
||||||
|
"""Mark expired leases as released and clean up Redis keys."""
|
||||||
|
session_factory: async_sessionmaker = ctx["session_factory"]
|
||||||
|
redis = ctx["redis"]
|
||||||
|
settings = ctx["settings"]
|
||||||
|
key_prefix = settings.redis.key_prefix
|
||||||
|
|
||||||
|
now = datetime.now()
|
||||||
|
|
||||||
|
async with session_factory() as db:
|
||||||
|
result = await db.execute(
|
||||||
|
select(ProxyLease).where(
|
||||||
|
ProxyLease.is_released.is_(False),
|
||||||
|
ProxyLease.expires_at < now,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
expired = result.scalars().all()
|
||||||
|
|
||||||
|
for lease in expired:
|
||||||
|
lease.is_released = True
|
||||||
|
# Safety net: delete Redis key in case TTL didn't fire
|
||||||
|
await redis.delete(f"{key_prefix}lease:{lease.proxy_id}")
|
||||||
|
|
||||||
|
await db.commit()
|
||||||
|
|
||||||
|
if expired:
|
||||||
|
logger.info("Expired %d leases", len(expired))
|
||||||
|
return {"expired": len(expired)}
|
||||||
Loading…
x
Reference in New Issue
Block a user