proxy-pool/docs/05-worker-tasks.md

Worker and task reference

Overview

Background tasks run in a separate ARQ worker process. The worker connects to the same PostgreSQL and Redis instances as the API. Tasks are defined in proxy_pool.worker.tasks_* modules and registered in proxy_pool.worker.settings.

Running the worker

# Development
uv run arq proxy_pool.worker.settings.WorkerSettings

# Docker
docker compose up worker

The worker process is independent of the API process. You can run multiple worker instances, though one is sufficient for most deployments. ARQ coordinates workers through Redis, so a cron job fires once per scheduled time rather than once per worker.

Worker settings

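# proxy_pool/worker/settings.py

from arq import cron
from arq.connections import RedisSettings

# The task functions listed below are imported from the
# proxy_pool.worker.tasks_* modules; `settings` is the application config.
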
class WorkerSettings:
    functions = [
        scrape_source,
        scrape_all,
        validate_proxy,
        revalidate_sweep,
        prune_dead_proxies,
        prune_old_checks,
        expire_leases,
    ]

    cron_jobs = [
        cron(scrape_all,        minute={0, 30}),           # Every 30 minutes
        cron(revalidate_sweep,  minute={10, 25, 40, 55}),  # Every 15 minutes
        cron(prune_dead_proxies, hour={3}, minute={0}),    # Daily at 3:00 AM
        cron(prune_old_checks,  hour={4}, minute={0}),     # Daily at 4:00 AM
        cron(expire_leases,     minute=set(range(60))),     # Every minute
    ]

    redis_settings = RedisSettings.from_dsn(settings.redis_url)
    max_jobs = 50
    job_timeout = 300  # 5 minutes
    keep_result = 3600  # Keep results for 1 hour

Task definitions

Scrape tasks

scrape_all(ctx)

Periodic task that iterates over all active ProxySource records and enqueues a scrape_source job for each one. Sources whose cron_schedule or parser's default_schedule() indicates they aren't due yet are skipped.

Schedule: Every 30 minutes (configurable).

Behavior: Enqueues individual scrape_source jobs rather than scraping inline. This allows the worker pool to parallelize across sources and provides per-source error isolation.
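
The fan-out described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the project's implementation: the `sources` argument and the `is_due` callback are hypothetical stand-ins for the ProxySource query and the schedule check, and the sketch assumes `ctx["redis"]` is the ArqRedis pool that ARQ places in the job context.

```python
from typing import Any, Callable, Iterable


async def scrape_all_sketch(
    ctx: dict[str, Any],
    sources: Iterable[Any],
    is_due: Callable[[Any], bool],
) -> int:
    """Enqueue one scrape_source job per due source; return how many were enqueued."""
    enqueued = 0
    for source in sources:
        if not is_due(source):
            continue  # hypothetical schedule check says this source is not due yet
        # enqueue_job only schedules the named task; another worker slot runs it,
        # so one failing source cannot abort the others.
        await ctx["redis"].enqueue_job("scrape_source", str(source.id))
        enqueued += 1
    return enqueued
```

Because each source becomes its own job, a slow or failing source only affects that one job's result.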

scrape_source(ctx, source_id: str)

Fetches the URL for a single ProxySource, selects the appropriate SourceParser plugin, parses the content, and upserts discovered proxies.

Steps:

  1. Load the ProxySource by ID.
  2. Fetch the URL via httpx.AsyncClient with a configurable timeout (default: 30s).
  3. Look up the parser by source.parser_name in the plugin registry.
  4. Call parser.parse(raw_bytes, source) to get a list of DiscoveredProxy.
  5. Upsert each proxy using INSERT ... ON CONFLICT (ip, port, protocol) DO UPDATE SET source_id = ?, last_seen_at = now().
  6. Update source.last_scraped_at.
  7. Emit proxy.new_batch event if new proxies were discovered.
  8. On failure, emit source.failed event and log the error.
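
The upsert in step 5 can be tried without a PostgreSQL instance. The sketch below uses Python's built-in sqlite3 module purely as a stand-in (SQLite 3.24 and later accept the same ON CONFLICT ... DO UPDATE clause). The reduced table layout and the `upsert_proxies` helper are assumptions made for illustration; the real schema has more columns.

```python
import sqlite3

UPSERT_SQL = """
INSERT INTO proxies (ip, port, protocol, source_id, last_seen_at)
VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT (ip, port, protocol)
DO UPDATE SET source_id = excluded.source_id, last_seen_at = CURRENT_TIMESTAMP
"""


def upsert_proxies(conn, source_id, discovered):
    """Insert or refresh proxies; return how many (ip, port, protocol) keys were new."""
    new_count = 0
    for ip, port, protocol in discovered:
        exists = conn.execute(
            "SELECT 1 FROM proxies WHERE ip = ? AND port = ? AND protocol = ?",
            (ip, port, protocol),
        ).fetchone()
        conn.execute(UPSERT_SQL, (ip, port, protocol, source_id))
        if exists is None:
            new_count += 1
    conn.commit()
    return new_count


# Stand-in table with the unique key that the ON CONFLICT clause targets.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE proxies (ip TEXT, port INTEGER, protocol TEXT, source_id TEXT,"
    " last_seen_at TEXT, UNIQUE (ip, port, protocol))"
)
```

A non-zero return value corresponds to the condition under which step 7 would emit proxy.new_batch.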

Error handling: HTTP errors, parse errors, and database errors are caught and logged. The source is not deactivated on failure — transient errors are expected. A separate source.stale event is emitted if a source hasn't produced results in a configurable number of hours.

Timeout: 60 seconds (includes fetch + parse + upsert).

Validation tasks

revalidate_sweep(ctx)

Periodic task that selects proxies due for revalidation and enqueues validate_proxy jobs.

Selection criteria (in priority order):

  1. Proxies with status = unchecked (never validated, highest priority).
  2. Proxies with status = active and last_checked_at < now() - interval (stale active proxies).
  3. Proxies with status = dead and last_checked_at < now() - longer_interval (periodic dead re-check, lower frequency).

Configurable intervals:

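  • Active proxy recheck: after 10 minutes (default). Because the sweep itself runs every 15 minutes, an active proxy is picked up by the first sweep after this interval has elapsed.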
  • Dead proxy recheck: every 6 hours (default).
  • Batch size per sweep: 200 proxies (default).
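
The selection logic can be sketched as a pure function over in-memory rows. This is only an illustration: `ProxyRow` and `select_due` are hypothetical names, the real task presumably performs the selection in SQL, and treating a missing last_checked_at as due is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class ProxyRow:
    id: str
    status: str  # "unchecked", "active", or "dead"
    last_checked_at: Optional[datetime]


def select_due(proxies, now, active_interval=timedelta(minutes=10),
               dead_interval=timedelta(hours=6), batch_size=200):
    """Return up to batch_size proxies due for validation, highest priority first."""
    def is_stale(p, interval):
        return p.last_checked_at is None or p.last_checked_at < now - interval

    def priority(p):
        if p.status == "unchecked":
            return 0
        if p.status == "active" and is_stale(p, active_interval):
            return 1
        if p.status == "dead" and is_stale(p, dead_interval):
            return 2
        return None  # not due for revalidation

    ranked = [(priority(p), p) for p in proxies]
    due = [(rank, p) for rank, p in ranked if rank is not None]
    # list.sort is stable, so proxies keep their input order within each tier.
    due.sort(key=lambda pair: pair[0])
    return [p for _, p in due[:batch_size]]
```

With the batch size applied after sorting, unchecked proxies can use up the whole batch before any dead proxy is re-checked, which matches the stated priority order.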

Schedule: Every 15 minutes.

validate_proxy(ctx, proxy_id: str)

Runs the full checker pipeline for a single proxy.

Steps:

  1. Load the Proxy by ID.
  2. Create a CheckContext with a fresh httpx.AsyncClient.
  3. Call run_checker_pipeline(proxy, registry, http_client, db_session).
  4. The pipeline runs all registered checkers in stage order (see plugin system docs).
  5. Compute composite score from results.
  6. Update the proxy record with new status, score, latency, uptime, exit IP, country, anonymity.
  7. If the proxy transitioned from active to dead or vice versa, check pool health thresholds and emit proxy.pool_low if needed.

Timeout: 120 seconds (individual checker timeouts are enforced within the pipeline).

Concurrency: Multiple validate_proxy jobs can run simultaneously. Each job operates on a different proxy, so they do not conflict. Jobs are enqueued with ARQ's _job_id parameter set to validate:{proxy_id}, which prevents duplicate validation of the same proxy.
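
A minimal sketch of the deduplicating enqueue, assuming jobs are submitted through ArqRedis.enqueue_job (whose keyword for a custom ID is `_job_id`). The `enqueue_validation` helper name is hypothetical.

```python
from typing import Any


async def enqueue_validation(redis: Any, proxy_id: str) -> bool:
    """Enqueue validate_proxy for one proxy; return False if the job ID is taken."""
    # With an explicit _job_id, enqueue_job returns None instead of a Job
    # when a job with that ID already exists.
    job = await redis.enqueue_job(
        "validate_proxy", proxy_id, _job_id=f"validate:{proxy_id}"
    )
    return job is not None
```

One thing worth checking in a real deployment: ARQ also treats a stored result as an existing job, so a custom ID may stay taken until that result expires. If so, the result retention for validate_proxy would need to be shorter than the recheck interval.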

Cleanup tasks

prune_dead_proxies(ctx)

Removes proxies that have been dead for an extended period.

Criteria: status = dead AND last_checked_at < now() - retention_days (default: 30 days).

Behavior: Hard-deletes the proxy row. Cascading deletes (ON DELETE CASCADE) remove the associated proxy_checks, proxy_tags, and proxy_leases rows.

Schedule: Daily at 3:00 AM.

prune_old_checks(ctx)

Trims the proxy_checks table to control storage growth.

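Strategy: For each proxy, keep the most recent N check records (default: 100) and delete records older than a retention period (default: 7 days). A check is deleted only when both conditions hold: it falls outside the proxy's N most recent checks and it is older than the retention period. Checks within the retention period are therefore kept even if a proxy has more than 100 of them, while older checks are pruned once they drop out of the most recent N.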
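
One reading of this strategy, sketched as a pure function over a single proxy's check timestamps. The reading assumed here is that a check is deleted only if it is both outside the N most recent and older than the retention period; `checks_to_delete` is a hypothetical helper, and the actual task would express this in SQL.

```python
from datetime import datetime, timedelta


def checks_to_delete(checked_at, now, keep_last=100, retention=timedelta(days=7)):
    """Return the timestamps of one proxy's checks that should be deleted.

    Assumed rule: delete a check only if it is outside the keep_last most
    recent checks AND strictly older than the retention cutoff.
    """
    newest_first = sorted(checked_at, reverse=True)
    cutoff = now - retention
    return [ts for ts in newest_first[keep_last:] if ts < cutoff]


# Example: ten daily checks, keep the newest three, seven-day retention.
now = datetime(2024, 1, 31)
stamps = [now - timedelta(days=d) for d in range(10)]
doomed = checks_to_delete(stamps, now, keep_last=3)
```

In the example, only the checks that are 8 and 9 days old qualify; the 7-day-old check sits exactly on the cutoff and is kept.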

Schedule: Daily at 4:00 AM.

expire_leases(ctx)

Cleans up expired proxy leases.

Steps:

  1. Query proxy_leases WHERE is_released = false AND expires_at < now().
  2. For each expired lease, set is_released = true.
  3. Delete the corresponding Redis lease key (if it still exists — it should have expired via TTL, but this is a safety net).
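
The three steps can be sketched with stand-ins: sqlite3 in place of PostgreSQL and a plain dict in place of Redis. The `lease:{id}` key format, the reduced table layout, and the `expire_leases_sketch` name are all assumptions for illustration.

```python
import sqlite3


def expire_leases_sketch(conn, redis_keys, now):
    """Mark overdue leases as released and drop their cache keys; return the count."""
    rows = conn.execute(
        "SELECT id FROM proxy_leases WHERE is_released = 0 AND expires_at < ?",
        (now,),
    ).fetchall()
    for (lease_id,) in rows:
        conn.execute(
            "UPDATE proxy_leases SET is_released = 1 WHERE id = ?", (lease_id,)
        )
        # Safety net only: the key normally disappears on its own via TTL,
        # so a missing key is not an error.
        redis_keys.pop(f"lease:{lease_id}", None)
    conn.commit()
    return len(rows)
```

Timestamps are passed as ISO 8601 strings here, which compare correctly as text in SQLite; the task is idempotent, so running it twice in the same minute releases nothing the second time.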

Schedule: Every minute.

Note: Redis TTL is the primary expiration mechanism. This task is a consistency backstop that ensures the PostgreSQL records are accurate even if Redis keys expire silently.

Task retry behavior

ARQ retries are configured per-task:

Task                 Max retries   Retry delay
scrape_source        2             60s, exponential
validate_proxy       1             30s
prune_dead_proxies   0             -
prune_old_checks     0             -
expire_leases        1             10s

Delays marked exponential in the table grow with each attempt; the others are fixed. Tasks that still fail after the maximum number of retries are logged, and the job result is stored in Redis for inspection.
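
A small sketch of how such a delay could be computed. The doubling factor is an assumption (the document does not state the growth rate), and `retry_delay` is a hypothetical helper. In ARQ, a task requests a retry by raising `arq.Retry(defer=...)`, and `ctx["job_try"]` holds the current attempt number, starting at 1.

```python
def retry_delay(base_seconds, job_try, exponential=False):
    """Seconds to wait before the next attempt; job_try is 1 for the first run."""
    if not exponential:
        return base_seconds
    # Assumed growth: the delay doubles on every further attempt.
    return base_seconds * 2 ** (job_try - 1)


# Inside a task, this might be used as:
#     raise Retry(defer=retry_delay(60, ctx["job_try"], exponential=True))
delays = [retry_delay(60, attempt, exponential=True) for attempt in (1, 2)]
```

Under this assumption, scrape_source would wait 60 seconds after its first failure and 120 seconds after its second.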

Monitoring

Job results

ARQ stores job results in Redis for keep_result seconds (default: 3600). Query results via:

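from arq import create_pool
from arq.connections import RedisSettings
from arq.jobs import Job

redis = await create_pool(RedisSettings.from_dsn(settings.redis_url))
info = await Job("job_id", redis).result_info()  # None if the result is missing or expired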

Health indicators

The GET /stats/pool endpoint includes last_scrape_at and last_validation_at timestamps. If these fall behind schedule, the worker may be down or stuck.
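
A monitoring check built on these timestamps might look like the sketch below. The `is_behind` helper and its `grace` factor are hypothetical; the expected intervals would come from the cron schedules (30 minutes for scraping, 15 minutes for validation).

```python
from datetime import datetime, timedelta, timezone


def is_behind(last_run_at, expected_every, now, grace=2.0):
    """True if the last run is older than grace times the expected interval."""
    if last_run_at is None:
        return True  # the task has never reported a run
    return now - last_run_at > expected_every * grace


# Example: the last scrape finished 61 minutes ago on a 30-minute schedule.
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
scrape_stale = is_behind(now - timedelta(minutes=61), timedelta(minutes=30), now)
```

The grace factor leaves room for one missed or slow run before the check reports a problem.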

Logging

Tasks emit structured log lines: INFO on start and completion, WARN or ERROR on failure:

INFO  scrape_source source_id=abc count_new=23 count_updated=119 duration_ms=1540
WARN  scrape_source source_id=def error="HTTP 503" retrying=true attempt=2
ERROR validate_proxy proxy_id=ghi error="Pipeline timeout after 120s"