# Worker and task reference
## Overview
Background tasks run in a separate ARQ worker process. The worker connects to the same PostgreSQL and Redis instances as the API. Tasks are defined in `proxy_pool.worker.tasks_*` modules and registered in `proxy_pool.worker.settings`.
## Running the worker
```bash
# Development
uv run arq proxy_pool.worker.settings.WorkerSettings
# Docker
docker compose up worker
```
The worker process is independent of the API process. You can run multiple worker instances, though for most deployments one is sufficient (ARQ handles job deduplication via Redis).
## Worker settings
```python
# proxy_pool/worker/settings.py
class WorkerSettings:
    functions = [
        scrape_source,
        scrape_all,
        validate_proxy,
        revalidate_sweep,
        prune_dead_proxies,
        prune_old_checks,
        expire_leases,
    ]
    cron_jobs = [
        cron(scrape_all, minute={0, 30}),                 # Every 30 minutes
        cron(revalidate_sweep, minute={10, 25, 40, 55}),  # Every 15 minutes
        cron(prune_dead_proxies, hour={3}, minute={0}),   # Daily at 3:00 AM
        cron(prune_old_checks, hour={4}, minute={0}),     # Daily at 4:00 AM
        cron(expire_leases, minute=set(range(60))),       # Every minute
    ]
    redis_settings = RedisSettings.from_dsn(settings.redis_url)
    max_jobs = 50
    job_timeout = 300   # 5 minutes
    keep_result = 3600  # Keep results for 1 hour
```
## Task definitions
### Scrape tasks
#### `scrape_all(ctx)`
Periodic task that iterates over all active `ProxySource` records and enqueues a `scrape_source` job for each one. Sources whose `cron_schedule` or parser's `default_schedule()` indicates they aren't due yet are skipped.
**Schedule**: Every 30 minutes (configurable).
**Behavior**: Enqueues individual `scrape_source` jobs rather than scraping inline. This allows the worker pool to parallelize across sources and provides per-source error isolation.
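The skip logic described above can be sketched as a pure due-check. The `source_is_due` helper and its flat-interval signature are illustrative only; the real task consults each source's `cron_schedule` or the parser's `default_schedule()`:

```python
from datetime import datetime, timedelta, timezone

def source_is_due(last_scraped_at, interval_minutes, now=None):
    """Return True when a source should be scraped again.

    A source that has never been scraped is always due; otherwise it is
    due once `interval_minutes` have elapsed since `last_scraped_at`.
    """
    now = now or datetime.now(timezone.utc)
    if last_scraped_at is None:
        return True
    return now - last_scraped_at >= timedelta(minutes=interval_minutes)

now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
assert source_is_due(None, 30, now)                         # never scraped
assert source_is_due(now - timedelta(minutes=45), 30, now)  # overdue
assert not source_is_due(now - timedelta(minutes=10), 30, now)
```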
#### `scrape_source(ctx, source_id: str)`
Fetches the URL for a single `ProxySource`, selects the appropriate `SourceParser` plugin, parses the content, and upserts discovered proxies.
**Steps**:
1. Load the `ProxySource` by ID.
2. Fetch the URL via `httpx.AsyncClient` with a configurable timeout (default: 30s).
3. Look up the parser by `source.parser_name` in the plugin registry.
4. Call `parser.parse(raw_bytes, source)` to get a list of `DiscoveredProxy`.
5. Upsert each proxy using `INSERT ... ON CONFLICT (ip, port, protocol) DO UPDATE SET source_id = ?, last_seen_at = now()`.
6. Update `source.last_scraped_at`.
7. Emit `proxy.new_batch` event if new proxies were discovered.
8. On failure, emit `source.failed` event and log the error.
**Error handling**: HTTP errors, parse errors, and database errors are caught and logged. The source is not deactivated on failure — transient errors are expected. A separate `source.stale` event is emitted if a source hasn't produced results in a configurable number of hours.
**Timeout**: 60 seconds (includes fetch + parse + upsert).
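Step 5's upsert can be exercised in isolation. SQLite stands in for PostgreSQL below, since both accept the same `ON CONFLICT ... DO UPDATE` syntax; the schema is simplified to the columns the statement touches:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE proxies (
        ip TEXT, port INTEGER, protocol TEXT,
        source_id TEXT, last_seen_at TEXT,
        UNIQUE (ip, port, protocol)
    )
""")

def upsert_proxy(conn, ip, port, protocol, source_id, seen_at):
    # On a (ip, port, protocol) collision, refresh source and last-seen
    # instead of inserting a duplicate row.
    conn.execute(
        """
        INSERT INTO proxies (ip, port, protocol, source_id, last_seen_at)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT (ip, port, protocol)
        DO UPDATE SET source_id = excluded.source_id,
                      last_seen_at = excluded.last_seen_at
        """,
        (ip, port, protocol, source_id, seen_at),
    )

upsert_proxy(conn, "10.0.0.1", 8080, "http", "src-a", "2024-01-01")
upsert_proxy(conn, "10.0.0.1", 8080, "http", "src-b", "2024-01-02")  # same key
rows = conn.execute("SELECT source_id, last_seen_at FROM proxies").fetchall()
assert rows == [("src-b", "2024-01-02")]  # one row, updated in place
```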
### Validation tasks
#### `revalidate_sweep(ctx)`
Periodic task that selects proxies due for revalidation and enqueues `validate_proxy` jobs.
**Selection criteria** (in priority order):
1. Proxies with `status = unchecked` (never validated, highest priority).
2. Proxies with `status = active` and `last_checked_at < now() - interval` (stale active proxies).
3. Proxies with `status = dead` and `last_checked_at < now() - longer_interval` (periodic dead re-check, lower frequency).
**Configurable intervals**:
- Active proxy recheck: every 10 minutes (default).
- Dead proxy recheck: every 6 hours (default).
- Batch size per sweep: 200 proxies (default).
**Schedule**: Every 15 minutes.
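A minimal sketch of the selection rules above, using in-memory dicts in place of the real query (the helper name and the integer priority encoding are illustrative):

```python
from datetime import datetime, timedelta, timezone

ACTIVE_RECHECK = timedelta(minutes=10)  # default active-recheck interval
DEAD_RECHECK = timedelta(hours=6)       # default dead-recheck interval

def select_for_revalidation(proxies, batch_size, now):
    """Rank candidates by the sweep's priority rules, then truncate."""
    def priority(p):
        if p["status"] == "unchecked":
            return 0  # never validated: highest priority
        if p["status"] == "active" and now - p["last_checked_at"] > ACTIVE_RECHECK:
            return 1  # stale active proxy
        if p["status"] == "dead" and now - p["last_checked_at"] > DEAD_RECHECK:
            return 2  # periodic dead re-check
        return None   # not due

    due = [(priority(p), p) for p in proxies]
    due = [(r, p) for r, p in due if r is not None]
    due.sort(key=lambda rp: rp[0])
    return [p for _, p in due[:batch_size]]

now = datetime(2024, 1, 1, tzinfo=timezone.utc)
pool = [
    {"id": 1, "status": "active", "last_checked_at": now - timedelta(minutes=30)},
    {"id": 2, "status": "unchecked", "last_checked_at": None},
    {"id": 3, "status": "dead", "last_checked_at": now - timedelta(hours=12)},
    {"id": 4, "status": "active", "last_checked_at": now - timedelta(minutes=2)},
]
picked = select_for_revalidation(pool, batch_size=3, now=now)
assert [p["id"] for p in picked] == [2, 1, 3]  # fresh active proxy (4) skipped
```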
#### `validate_proxy(ctx, proxy_id: str)`
Runs the full checker pipeline for a single proxy.
**Steps**:
1. Load the `Proxy` by ID.
2. Create a `CheckContext` with a fresh `httpx.AsyncClient`.
3. Call `run_checker_pipeline(proxy, registry, http_client, db_session)`.
4. The pipeline runs all registered checkers in stage order (see plugin system docs).
5. Compute composite score from results.
6. Update the proxy record with new status, score, latency, uptime, exit IP, country, anonymity.
7. If the proxy transitioned from `active` to `dead` or vice versa, check pool health thresholds and emit `proxy.pool_low` if needed.
**Timeout**: 120 seconds (individual checker timeouts are enforced within the pipeline).
**Concurrency**: Multiple `validate_proxy` jobs can run simultaneously. Each job operates on a different proxy, so there are no conflicts. ARQ's `job_id` parameter is set to `validate:{proxy_id}` to prevent duplicate validation of the same proxy.
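Step 5's composite score might look something like the following. The weights, the 5-second latency ceiling, and the anonymity bonus are invented for illustration and are not the pipeline's actual formula:

```python
def composite_score(latency_ms, uptime_ratio, anonymity):
    """Illustrative scoring only: blend uptime, latency, and an
    anonymity bonus into a 0.0-1.0 score."""
    # 0 ms maps to 1.0, 5000 ms or worse maps to 0.0 (assumed ceiling).
    latency_component = max(0.0, 1.0 - latency_ms / 5000.0)
    anonymity_bonus = {"transparent": 0.0, "anonymous": 0.05, "elite": 0.1}[anonymity]
    score = 0.5 * uptime_ratio + 0.4 * latency_component + anonymity_bonus
    return round(min(score, 1.0), 3)

assert composite_score(0, 1.0, "elite") == 1.0
assert composite_score(5000, 0.5, "transparent") == 0.25
```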
### Cleanup tasks
#### `prune_dead_proxies(ctx)`
Removes proxies that have been dead for an extended period.
**Criteria**: `status = dead` AND `last_checked_at < now() - retention_days` (default: 30 days).
**Behavior**: Hard deletes the proxy row. CASCADE deletes remove associated `proxy_checks`, `proxy_tags`, and `proxy_leases`.
**Schedule**: Daily at 3:00 AM.
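The deletion criterion can be exercised end to end with SQLite standing in for PostgreSQL; ISO-8601 timestamps are stored as text here so they compare correctly as strings:

```python
import sqlite3
from datetime import datetime, timedelta, timezone

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE proxies (id INTEGER, status TEXT, last_checked_at TEXT)")
conn.executemany(
    "INSERT INTO proxies VALUES (?, ?, ?)",
    [
        (1, "dead", "2023-12-01T00:00:00+00:00"),    # dead and stale: pruned
        (2, "dead", "2024-01-25T00:00:00+00:00"),    # dead but recent: kept
        (3, "active", "2023-12-01T00:00:00+00:00"),  # stale but active: kept
    ],
)
now = datetime(2024, 2, 1, tzinfo=timezone.utc)
cutoff = (now - timedelta(days=30)).isoformat()
# Mirrors `status = 'dead' AND last_checked_at < now() - retention_days`.
conn.execute(
    "DELETE FROM proxies WHERE status = 'dead' AND last_checked_at < ?", (cutoff,)
)
remaining = sorted(r[0] for r in conn.execute("SELECT id FROM proxies"))
assert remaining == [2, 3]
```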
#### `prune_old_checks(ctx)`
Trims the `proxy_checks` table to control storage growth.
**Strategy**: For each proxy, a check record is deleted only when both conditions hold: it falls outside the proxy's most recent N checks (default: 100) and it is older than the retention period (default: 7 days). Checks newer than the retention window are kept even when a proxy has more than N, and a proxy's latest N checks survive even when they are old.
**Schedule**: Daily at 4:00 AM.
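A sketch of the both-conditions rule, operating on in-memory check records (the helper is hypothetical; the real task expresses this as SQL):

```python
from datetime import datetime, timedelta, timezone

def checks_to_delete(checks, keep_n=100, retention=timedelta(days=7), now=None):
    """A check is deleted only when it is BOTH outside the proxy's most
    recent `keep_n` checks AND older than `retention`."""
    now = now or datetime.now(timezone.utc)
    ordered = sorted(checks, key=lambda c: c["checked_at"], reverse=True)
    return [
        c["id"]
        for rank, c in enumerate(ordered)
        if rank >= keep_n and now - c["checked_at"] > retention
    ]

now = datetime(2024, 1, 10, tzinfo=timezone.utc)
checks = [{"id": i, "checked_at": now - timedelta(days=i)} for i in range(12)]
# keep_n=5: ids 0-4 are inside the recency window; of the rest, only
# those older than 7 days (ids 8-11) are actually deleted.
doomed = checks_to_delete(checks, keep_n=5, retention=timedelta(days=7), now=now)
assert doomed == [8, 9, 10, 11]
```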
#### `expire_leases(ctx)`
Cleans up expired proxy leases.
**Steps**:
1. Query `proxy_leases WHERE is_released = false AND expires_at < now()`.
2. For each expired lease, set `is_released = true`.
3. Delete the corresponding Redis lease key (if it still exists — it should have expired via TTL, but this is a safety net).
**Schedule**: Every minute.
**Note**: Redis TTL is the primary expiration mechanism. This task is a consistency backstop that ensures the PostgreSQL records are accurate even if Redis keys expire silently.
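The three steps can be sketched with plain data structures standing in for the database and Redis (the `lease:{id}` key scheme is an assumption):

```python
from datetime import datetime, timedelta, timezone

def expire_leases(leases, redis_keys, now):
    """Mark expired, unreleased leases as released and drop any Redis
    key that outlived its TTL (the safety-net case from step 3)."""
    for lease in leases:
        if not lease["is_released"] and lease["expires_at"] < now:
            lease["is_released"] = True
            redis_keys.discard(f"lease:{lease['id']}")  # hypothetical key scheme

now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
leases = [
    {"id": "a", "is_released": False, "expires_at": now - timedelta(minutes=5)},
    {"id": "b", "is_released": False, "expires_at": now + timedelta(minutes=5)},
]
keys = {"lease:a", "lease:b"}
expire_leases(leases, keys, now)
assert leases[0]["is_released"] and not leases[1]["is_released"]
assert keys == {"lease:b"}
```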
## Task retry behavior
ARQ retries are configured per-task:
| Task | Max retries | Retry delay |
|------|-------------|-------------|
| `scrape_source` | 2 | 60s exponential |
| `validate_proxy` | 1 | 30s |
| `prune_dead_proxies` | 0 | — |
| `prune_old_checks` | 0 | — |
| `expire_leases` | 1 | 10s |
Only `scrape_source` uses exponential backoff for its retry delay; the other tasks retry after a fixed delay. Once a task exhausts its retries, the failure is logged and the job result is stored in Redis for inspection.
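For `scrape_source`, the exponential delay schedule might be computed like this (the 60-second base matches the table; the 600-second cap is an assumption):

```python
def retry_delay(attempt, base=60, cap=600):
    """Exponential backoff: 60s, 120s, 240s, ... capped at `cap` seconds.
    The cap value is illustrative, not the project's setting."""
    return min(base * 2 ** (attempt - 1), cap)

assert [retry_delay(a) for a in (1, 2, 3, 4, 5)] == [60, 120, 240, 480, 600]
```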
## Monitoring
### Job results
ARQ stores job results in Redis for `keep_result` seconds (default: 3600). Query results via:
```python
from arq import create_pool
from arq.connections import RedisSettings
from arq.jobs import Job

redis = await create_pool(RedisSettings.from_dsn(settings.redis_url))
job = Job("job_id", redis)
result = await job.result_info()  # None once the result has expired
```
### Health indicators
The `GET /stats/pool` endpoint includes `last_scrape_at` and `last_validation_at` timestamps. If these fall behind schedule, the worker may be down or stuck.
### Logging
Tasks emit structured logs at INFO level on start and completion, and at WARN/ERROR level on failure:
```
INFO scrape_source source_id=abc count_new=23 count_updated=119 duration_ms=1540
WARN scrape_source source_id=def error="HTTP 503" retrying=true attempt=2
ERROR validate_proxy proxy_id=ghi error="Pipeline timeout after 120s"
```