diff --git a/src/proxy_pool/worker/settings.py b/src/proxy_pool/worker/settings.py index 726c196..9cd49e2 100644 --- a/src/proxy_pool/worker/settings.py +++ b/src/proxy_pool/worker/settings.py @@ -41,5 +41,5 @@ class WorkerSettings: redis_settings = RedisSettings.from_dsn(settings.redis.url) max_jobs = 50 - job_timeout = 300 + job_timeout = 600 keep_result = 3600 diff --git a/src/proxy_pool/worker/tasks_validate.py b/src/proxy_pool/worker/tasks_validate.py index 3c3f0c1..5b7a444 100644 --- a/src/proxy_pool/worker/tasks_validate.py +++ b/src/proxy_pool/worker/tasks_validate.py @@ -222,10 +222,22 @@ async def revalidate_sweep(ctx: dict) -> dict: all_ids = unchecked_ids + stale_active_ids + dead_ids - results = [] - for proxy_id in all_ids: - result = await validate_proxy(ctx, proxy_id) - results.append(result) + semaphore = asyncio.Semaphore(10) + + async def validate_with_limit(proxy_id: str) -> dict: + async with semaphore: + return await validate_proxy(ctx, proxy_id) + + results = await asyncio.gather( + *(validate_with_limit(pid) for pid in all_ids), + return_exceptions=True, + ) + + completed = [r for r in results if isinstance(r, dict)] + errors = [r for r in results if isinstance(r, Exception)] + + if errors: + logger.warning("Revalidation had %d errors", len(errors)) logger.info( "Revalidation sweep: %d unchecked, %d stale active, %d dead recheck", @@ -235,7 +247,9 @@ async def revalidate_sweep(ctx: dict) -> dict: ) return { - "total": len(results), + "total": len(all_ids), + "completed": len(completed), + "errors": len(errors), "unchecked": len(unchecked_ids), "stale_active": len(stale_active_ids), "dead_recheck": len(dead_ids),