fix: add concurrency to revalidation sweep and increase job timeout

This commit is contained in:
agatha 2026-03-15 17:05:49 -04:00
parent 4dada38f60
commit 9452399fdf
2 changed files with 20 additions and 6 deletions

View File

@@ -41,5 +41,5 @@ class WorkerSettings:
 redis_settings = RedisSettings.from_dsn(settings.redis.url)
 max_jobs = 50
-job_timeout = 300
+job_timeout = 600
 keep_result = 3600

View File

@@ -222,10 +222,22 @@ async def revalidate_sweep(ctx: dict) -> dict:
 all_ids = unchecked_ids + stale_active_ids + dead_ids
-results = []
-for proxy_id in all_ids:
-result = await validate_proxy(ctx, proxy_id)
-results.append(result)
+semaphore = asyncio.Semaphore(10)
+async def validate_with_limit(proxy_id: str) -> dict:
+async with semaphore:
+return await validate_proxy(ctx, proxy_id)
+results = await asyncio.gather(
+*(validate_with_limit(pid) for pid in all_ids),
+return_exceptions=True,
+)
+completed = [r for r in results if isinstance(r, dict)]
+errors = [r for r in results if isinstance(r, Exception)]
+if errors:
+logger.warning("Revalidation had %d errors", len(errors))
 logger.info(
 "Revalidation sweep: %d unchecked, %d stale active, %d dead recheck",
@@ -235,7 +247,9 @@ async def revalidate_sweep(ctx: dict) -> dict:
 )
 return {
-"total": len(results),
+"total": len(all_ids),
+"completed": len(completed),
+"errors": len(errors),
 "unchecked": len(unchecked_ids),
 "stale_active": len(stale_active_ids),
 "dead_recheck": len(dead_ids),