Worker and task reference
Overview
Background tasks run in a separate ARQ worker process. The worker connects to the same PostgreSQL and Redis instances as the API. Tasks are defined in proxy_pool.worker.tasks_* modules and registered in proxy_pool.worker.settings.
Running the worker
# Development
uv run arq proxy_pool.worker.settings.WorkerSettings
# Docker
docker compose up worker
The worker process is independent of the API process. You can run multiple worker instances, though for most deployments one is sufficient (ARQ handles job deduplication via Redis).
Worker settings
# proxy_pool/worker/settings.py
class WorkerSettings:
functions = [
scrape_source,
scrape_all,
validate_proxy,
revalidate_sweep,
prune_dead_proxies,
prune_old_checks,
expire_leases,
]
cron_jobs = [
cron(scrape_all, minute={0, 30}), # Every 30 minutes
cron(revalidate_sweep, minute={10, 25, 40, 55}), # Every 15 minutes
cron(prune_dead_proxies, hour={3}, minute={0}), # Daily at 3:00 AM
cron(prune_old_checks, hour={4}, minute={0}), # Daily at 4:00 AM
cron(expire_leases, minute=set(range(60))), # Every minute
]
redis_settings = RedisSettings.from_dsn(settings.redis_url)
max_jobs = 50
job_timeout = 300 # 5 minutes
keep_result = 3600 # Keep results for 1 hour
Task definitions
Scrape tasks
scrape_all(ctx)
Periodic task that iterates over all active ProxySource records and enqueues a scrape_source job for each one. Sources whose cron_schedule or parser's default_schedule() indicates they aren't due yet are skipped.
Schedule: Every 30 minutes (configurable).
Behavior: Enqueues individual scrape_source jobs rather than scraping inline. This allows the worker pool to parallelize across sources and provides per-source error isolation.
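The fan-out can be sketched as follows. The real task loads ProxySource rows from PostgreSQL and enqueues via the worker context; the dict-based source records and the due_sources helper here are illustrative stand-ins, not the service's actual code.

```python
from datetime import datetime, timedelta, timezone

def due_sources(sources, now, default_interval=timedelta(minutes=30)):
    """Return IDs of sources whose last scrape is older than their interval."""
    due = []
    for src in sources:
        interval = src.get("interval") or default_interval
        last = src.get("last_scraped_at")
        if last is None or now - last >= interval:
            due.append(src["id"])
    return due

async def scrape_all(ctx, sources):
    """Enqueue one scrape_source job per due source (per-source isolation)."""
    now = datetime.now(timezone.utc)
    for source_id in due_sources(sources, now):
        await ctx["redis"].enqueue_job("scrape_source", source_id)
```

Enqueuing one job per source (rather than scraping inline) lets a slow or failing source time out on its own without delaying the others.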
scrape_source(ctx, source_id: str)
Fetches the URL for a single ProxySource, selects the appropriate SourceParser plugin, parses the content, and upserts discovered proxies.
Steps:
- Load the ProxySource by ID.
- Fetch the URL via httpx.AsyncClient with a configurable timeout (default: 30s).
- Look up the parser by source.parser_name in the plugin registry.
- Call parser.parse(raw_bytes, source) to get a list of DiscoveredProxy.
- Upsert each proxy using INSERT ... ON CONFLICT (ip, port, protocol) DO UPDATE SET source_id = ?, last_seen_at = now().
- Update source.last_scraped_at.
- Emit a proxy.new_batch event if new proxies were discovered.
- On failure, emit a source.failed event and log the error.
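The upsert step can be written as a single statement. The table name proxies and the asyncpg-style $n placeholders are assumptions; the conflict target and updated columns follow the text above.

```python
UPSERT_PROXY_SQL = """
INSERT INTO proxies (ip, port, protocol, source_id, last_seen_at)
VALUES ($1, $2, $3, $4, now())
ON CONFLICT (ip, port, protocol)
DO UPDATE SET source_id = EXCLUDED.source_id,
              last_seen_at = now()
"""

def upsert_params(discovered: dict, source_id: str) -> tuple:
    """Map a DiscoveredProxy-like record to the statement's parameters."""
    return (discovered["ip"], discovered["port"], discovered["protocol"], source_id)
```

Because the conflict target is (ip, port, protocol), a proxy rediscovered by a different source keeps a single row; only its source_id and last_seen_at are refreshed.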
Error handling: HTTP errors, parse errors, and database errors are caught and logged. The source is not deactivated on failure — transient errors are expected. A separate source.stale event is emitted if a source hasn't produced results in a configurable number of hours.
Timeout: 60 seconds (includes fetch + parse + upsert).
Validation tasks
revalidate_sweep(ctx)
Periodic task that selects proxies due for revalidation and enqueues validate_proxy jobs.
Selection criteria (in priority order):
- Proxies with status = unchecked (never validated, highest priority).
- Proxies with status = active and last_checked_at < now() - interval (stale active proxies).
- Proxies with status = dead and last_checked_at < now() - longer_interval (periodic dead re-check, lower frequency).
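The selection logic above can be sketched as a runnable function over plain dicts instead of ORM rows. The priority tiers and defaults mirror the text; field names and the tie-breaking by input order are assumptions.

```python
from datetime import timedelta

def select_for_revalidation(proxies, now,
                            active_interval=timedelta(minutes=10),
                            dead_interval=timedelta(hours=6),
                            batch_size=200):
    """Pick up to batch_size proxies due for revalidation, highest tier first."""
    def tier(p):
        if p["status"] == "unchecked":
            return 0  # never validated: highest priority
        if p["status"] == "active" and now - p["last_checked_at"] >= active_interval:
            return 1  # stale active proxy
        if p["status"] == "dead" and now - p["last_checked_at"] >= dead_interval:
            return 2  # periodic dead re-check
        return None   # not due
    ranked = [(tier(p), i, p) for i, p in enumerate(proxies)]
    due = [p for t, _, p in sorted(r for r in ranked if r[0] is not None)]
    return due[:batch_size]
```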
Configurable intervals:
- Active proxy recheck: every 10 minutes (default).
- Dead proxy recheck: every 6 hours (default).
- Batch size per sweep: 200 proxies (default).
Schedule: Every 15 minutes.
validate_proxy(ctx, proxy_id: str)
Runs the full checker pipeline for a single proxy.
Steps:
- Load the Proxy by ID.
- Create a CheckContext with a fresh httpx.AsyncClient.
- Call run_checker_pipeline(proxy, registry, http_client, db_session).
- The pipeline runs all registered checkers in stage order (see plugin system docs).
- Compute the composite score from the results.
- Update the proxy record with the new status, score, latency, uptime, exit IP, country, and anonymity.
- If the proxy transitioned from active to dead or vice versa, check pool health thresholds and emit proxy.pool_low if needed.
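The last three steps can be sketched as pure functions. The mean-of-scores aggregation, the 0.5 activation threshold, and the pool_min default below are illustrative assumptions, not the service's real values.

```python
def composite_score(results):
    """Mean of per-checker scores in [0.0, 1.0]; no results means 0.0."""
    if not results:
        return 0.0
    return sum(r.get("score", 0.0) for r in results) / len(results)

def next_status(score, threshold=0.5):
    """Derive the proxy's new status from its composite score."""
    return "active" if score >= threshold else "dead"

def transition_events(old_status, new_status, active_count, pool_min=50):
    """Emit proxy.pool_low when an active-to-dead flip drops the pool too low."""
    if old_status == "active" and new_status == "dead" and active_count < pool_min:
        return ["proxy.pool_low"]
    return []
```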
Timeout: 120 seconds (individual checker timeouts are enforced within the pipeline).
Concurrency: Multiple validate_proxy jobs can run simultaneously. Each job operates on a different proxy, so there are no conflicts. ARQ's job_id parameter is set to validate:{proxy_id} to prevent duplicate validation of the same proxy.
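The deduplicating enqueue uses ARQ's real _job_id keyword: if a job with the same id is already queued or running, enqueue_job returns None instead of a new Job, so concurrent sweeps cannot double-validate a proxy. The helper name below is illustrative.

```python
def validation_job_id(proxy_id: str) -> str:
    """Stable per-proxy job id, as described in the text."""
    return f"validate:{proxy_id}"

async def enqueue_validation(redis, proxy_id: str):
    """`redis` is an arq ArqRedis pool; returns None if already enqueued."""
    return await redis.enqueue_job(
        "validate_proxy", proxy_id, _job_id=validation_job_id(proxy_id)
    )
```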
Cleanup tasks
prune_dead_proxies(ctx)
Removes proxies that have been dead for an extended period.
Criteria: status = dead AND last_checked_at < now() - retention_days (default: 30 days).
Behavior: Hard deletes the proxy row. CASCADE deletes remove associated proxy_checks, proxy_tags, and proxy_leases.
Schedule: Daily at 3:00 AM.
prune_old_checks(ctx)
Trims the proxy_checks table to control storage growth.
Strategy: For each proxy, keep the most recent N check records (default: 100) and apply a retention period (default: 7 days). A check is deleted only when both conditions hold: it falls outside the proxy's newest 100 checks and it is older than 7 days. In other words, checks from the last 7 days are kept even when a proxy has more than 100, and a proxy's newest 100 checks survive even past the retention window.
Schedule: Daily at 4:00 AM.
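One way to express "outside the newest N AND past retention" as a single DELETE is a window function. The column names (id, proxy_id, checked_at) and asyncpg-style $n parameters are assumptions; $1 is the keep-count, $2 the retention interval.

```python
PRUNE_CHECKS_SQL = """
DELETE FROM proxy_checks
WHERE id IN (
    SELECT id FROM (
        SELECT id,
               checked_at,
               row_number() OVER (PARTITION BY proxy_id
                                  ORDER BY checked_at DESC) AS rn
        FROM proxy_checks
    ) ranked
    WHERE rn > $1                    -- outside the newest N for this proxy
      AND checked_at < now() - $2    -- and older than the retention window
)
"""
```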
expire_leases(ctx)
Cleans up expired proxy leases.
Steps:
- Query proxy_leases WHERE is_released = false AND expires_at < now().
- For each expired lease, set is_released = true.
- Delete the corresponding Redis lease key (if it still exists; it should have expired via TTL, but this is a safety net).
Schedule: Every minute.
Note: Redis TTL is the primary expiration mechanism. This task is a consistency backstop that ensures the PostgreSQL records are accurate even if Redis keys expire silently.
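The sweep can be sketched against in-memory stand-ins: leases plays the proxy_leases table and redis_keys the per-lease Redis keys. The lease:{id} key naming is hypothetical.

```python
from datetime import datetime, timezone

def expire_leases(leases, redis_keys, now=None):
    """Mark expired leases released and drop any leftover Redis keys."""
    now = now or datetime.now(timezone.utc)
    expired = []
    for lease in leases:
        if not lease["is_released"] and lease["expires_at"] < now:
            lease["is_released"] = True                  # SQL side of the sweep
            redis_keys.discard(f"lease:{lease['id']}")   # TTL safety net
            expired.append(lease["id"])
    return expired
```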
Task retry behavior
ARQ retries are configured per-task:
| Task | Max retries | Retry delay |
|---|---|---|
| scrape_source | 2 | 60s, exponential |
| validate_proxy | 1 | 30s |
| prune_dead_proxies | 0 | — |
| prune_old_checks | 0 | — |
| expire_leases | 1 | 10s |
scrape_source retries back off exponentially; the other retrying tasks use a fixed delay. Once a task exhausts its retries, the failure is logged and the job result is stored in Redis for inspection.
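In an ARQ task body, a delay like this is handed to ARQ's Retry exception (raise Retry(defer=delay)). The doubling schedule below is an assumption consistent with the 60s exponential entry in the table.

```python
def retry_delay(attempt: int, base: int = 60) -> int:
    """60s for the first retry, 120s for the second, doubling from there."""
    return base * 2 ** (attempt - 1)
```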
Monitoring
Job results
ARQ stores job results in Redis for keep_result seconds (default: 3600). Query results via:
from arq import create_pool
from arq.connections import RedisSettings
from arq.jobs import Job

redis = await create_pool(RedisSettings.from_dsn(settings.redis_url))
result = await Job("job_id", redis).result()
Health indicators
The GET /stats/pool endpoint includes last_scrape_at and last_validation_at timestamps. If these fall behind schedule, the worker may be down or stuck.
Logging
Tasks emit structured log lines: INFO on start and completion, WARN/ERROR on failures:
INFO scrape_source source_id=abc count_new=23 count_updated=119 duration_ms=1540
WARN scrape_source source_id=def error="HTTP 503" retrying=true attempt=2
ERROR validate_proxy proxy_id=ghi error="Pipeline timeout after 120s"