# Worker and task reference

## Overview

Background tasks run in a separate ARQ worker process. The worker connects to the same PostgreSQL and Redis instances as the API. Tasks are defined in `proxy_pool.worker.tasks_*` modules and registered in `proxy_pool.worker.settings`.

## Running the worker

```bash
# Development
uv run arq proxy_pool.worker.settings.WorkerSettings

# Docker
docker compose up worker
```

The worker process is independent of the API process. You can run multiple worker instances, though for most deployments one is sufficient (ARQ handles job deduplication via Redis).

## Worker settings

```python
# proxy_pool/worker/settings.py
from arq import cron
from arq.connections import RedisSettings

# Task functions are imported from the proxy_pool.worker.tasks_* modules;
# `settings` is the application configuration object.

class WorkerSettings:
    functions = [
        scrape_source,
        scrape_all,
        validate_proxy,
        revalidate_sweep,
        prune_dead_proxies,
        prune_old_checks,
        expire_leases,
    ]

    cron_jobs = [
        cron(scrape_all, minute={0, 30}),                 # every 30 minutes
        cron(revalidate_sweep, minute={10, 25, 40, 55}),  # every 15 minutes
        cron(prune_dead_proxies, hour={3}, minute={0}),   # daily at 3:00 AM
        cron(prune_old_checks, hour={4}, minute={0}),     # daily at 4:00 AM
        cron(expire_leases, minute=set(range(60))),       # every minute
    ]

    redis_settings = RedisSettings.from_dsn(settings.redis_url)
    max_jobs = 50
    job_timeout = 300   # 5 minutes
    keep_result = 3600  # keep results for 1 hour
```

## Task definitions

### Scrape tasks

#### `scrape_all(ctx)`

Periodic task that iterates over all active `ProxySource` records and enqueues a `scrape_source` job for each one. Sources whose `cron_schedule` or parser's `default_schedule()` indicates they aren't due yet are skipped.

**Schedule**: Every 30 minutes (configurable).

**Behavior**: Enqueues individual `scrape_source` jobs rather than scraping inline. This allows the worker pool to parallelize across sources and provides per-source error isolation.
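
The fan-out reduces to a pure "is this source due?" filter; in the real task, each ID returned here would get an `enqueue_job("scrape_source", ...)` call. The field names (`active`, `last_scraped_at`, `interval`) are illustrative assumptions, a sketch rather than the actual implementation:

```python
from datetime import datetime, timedelta, timezone

def due_sources(sources, now, default_interval=timedelta(minutes=30)):
    """Return the active sources whose next scrape time has passed.

    `sources` is a list of dicts with hypothetical keys `active`,
    `last_scraped_at`, and `interval` (None means use the default).
    """
    due = []
    for s in sources:
        if not s["active"]:
            continue  # inactive sources are never scraped
        interval = s["interval"] or default_interval
        if s["last_scraped_at"] is None or s["last_scraped_at"] + interval <= now:
            due.append(s)
    return due

now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
sources = [
    {"id": "a", "active": True, "last_scraped_at": now - timedelta(hours=1), "interval": None},
    {"id": "b", "active": True, "last_scraped_at": now - timedelta(minutes=5), "interval": None},
    {"id": "c", "active": False, "last_scraped_at": None, "interval": None},
]
print([s["id"] for s in due_sources(sources, now)])  # ['a']
```

Source `b` was scraped five minutes ago and is not yet due; `c` is skipped because it is inactive.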

#### `scrape_source(ctx, source_id: str)`

Fetches the URL for a single `ProxySource`, selects the appropriate `SourceParser` plugin, parses the content, and upserts discovered proxies.

**Steps**:

1. Load the `ProxySource` by ID.
2. Fetch the URL via `httpx.AsyncClient` with a configurable timeout (default: 30s).
3. Look up the parser by `source.parser_name` in the plugin registry.
4. Call `parser.parse(raw_bytes, source)` to get a list of `DiscoveredProxy`.
5. Upsert each proxy using `INSERT ... ON CONFLICT (ip, port, protocol) DO UPDATE SET source_id = ?, last_seen_at = now()`.
6. Update `source.last_scraped_at`.
7. Emit a `proxy.new_batch` event if new proxies were discovered.
8. On failure, emit a `source.failed` event and log the error.

**Error handling**: HTTP errors, parse errors, and database errors are caught and logged. The source is not deactivated on failure, since transient errors are expected. A separate `source.stale` event is emitted if a source hasn't produced results in a configurable number of hours.

**Timeout**: 60 seconds (covers fetch + parse + upsert).
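
Step 5's upsert can be demonstrated with SQLite standing in for PostgreSQL (the `ON CONFLICT ... DO UPDATE` shape is the same; table and column names beyond those given in the steps are assumptions):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE proxies ("
    " ip TEXT, port INTEGER, protocol TEXT,"
    " source_id TEXT, last_seen_at TEXT,"
    " UNIQUE (ip, port, protocol))"
)

# Same shape as the task's INSERT ... ON CONFLICT ... DO UPDATE upsert.
UPSERT = """
INSERT INTO proxies (ip, port, protocol, source_id, last_seen_at)
VALUES (?, ?, ?, ?, datetime('now'))
ON CONFLICT (ip, port, protocol)
DO UPDATE SET source_id = excluded.source_id, last_seen_at = excluded.last_seen_at
"""

conn.execute(UPSERT, ("1.2.3.4", 8080, "http", "src-a"))
conn.execute(UPSERT, ("1.2.3.4", 8080, "http", "src-b"))  # same key: updates in place

count, source_id = conn.execute("SELECT count(*), source_id FROM proxies").fetchone()
print(count, source_id)  # 1 src-b
```

The second insert hits the unique `(ip, port, protocol)` key, so the row count stays at one and `source_id` is refreshed.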

### Validation tasks

#### `revalidate_sweep(ctx)`

Periodic task that selects proxies due for revalidation and enqueues `validate_proxy` jobs.

**Selection criteria** (in priority order):

1. Proxies with `status = unchecked` (never validated, highest priority).
2. Proxies with `status = active` and `last_checked_at < now() - interval` (stale active proxies).
3. Proxies with `status = dead` and `last_checked_at < now() - longer_interval` (periodic dead re-check, lower frequency).

**Configurable intervals**:

- Active proxy recheck: every 10 minutes (default).
- Dead proxy recheck: every 6 hours (default).
- Batch size per sweep: 200 proxies (default).

**Schedule**: Every 15 minutes.
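
A minimal sketch of the selection logic, assuming in-memory records with `status` and `last_checked_at` fields; the real sweep runs this as a SQL query:

```python
from datetime import datetime, timedelta, timezone

def select_due(proxies, now, active_interval=timedelta(minutes=10),
               dead_interval=timedelta(hours=6), batch_size=200):
    """Order candidates by the priority tiers above and cap at batch_size."""
    unchecked = [p for p in proxies if p["status"] == "unchecked"]
    stale_active = [p for p in proxies
                    if p["status"] == "active" and p["last_checked_at"] < now - active_interval]
    stale_dead = [p for p in proxies
                  if p["status"] == "dead" and p["last_checked_at"] < now - dead_interval]
    return (unchecked + stale_active + stale_dead)[:batch_size]

now = datetime(2024, 1, 1, tzinfo=timezone.utc)
proxies = [
    {"id": 1, "status": "active", "last_checked_at": now - timedelta(minutes=30)},
    {"id": 2, "status": "unchecked", "last_checked_at": None},
    {"id": 3, "status": "dead", "last_checked_at": now - timedelta(hours=1)},
]
print([p["id"] for p in select_due(proxies, now)])  # [2, 1]
```

The unchecked proxy comes first, the stale active proxy second, and the dead proxy is excluded because it was checked within the six-hour dead-recheck window.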

#### `validate_proxy(ctx, proxy_id: str)`

Runs the full checker pipeline for a single proxy.

**Steps**:

1. Load the `Proxy` by ID.
2. Create a `CheckContext` with a fresh `httpx.AsyncClient`.
3. Call `run_checker_pipeline(proxy, registry, http_client, db_session)`.
4. The pipeline runs all registered checkers in stage order (see the plugin system docs).
5. Compute the composite score from the results.
6. Update the proxy record with the new status, score, latency, uptime, exit IP, country, and anonymity.
7. If the proxy transitioned from `active` to `dead` or vice versa, check pool health thresholds and emit `proxy.pool_low` if needed.

**Timeout**: 120 seconds (individual checker timeouts are enforced within the pipeline).

**Concurrency**: Multiple `validate_proxy` jobs can run simultaneously. Each job operates on a different proxy, so there are no conflicts. ARQ's `job_id` parameter is set to `validate:{proxy_id}` to prevent duplicate validation of the same proxy.
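
The dedupe behavior can be illustrated with a stand-in for `ArqRedis` (ARQ's `enqueue_job` returns `None` when a job with the same `_job_id` is already queued; the fake below only mimics that contract):

```python
import asyncio

class FakeArqRedis:
    """Stand-in for ArqRedis, mimicking enqueue_job's dedupe-by-job-id behavior."""
    def __init__(self):
        self.jobs = {}

    async def enqueue_job(self, function, *args, _job_id=None):
        if _job_id is not None and _job_id in self.jobs:
            return None  # a job with this id already exists; nothing is enqueued
        self.jobs[_job_id] = (function, args)
        return _job_id

async def main():
    redis = FakeArqRedis()
    proxy_id = "42"
    first = await redis.enqueue_job("validate_proxy", proxy_id, _job_id=f"validate:{proxy_id}")
    duplicate = await redis.enqueue_job("validate_proxy", proxy_id, _job_id=f"validate:{proxy_id}")
    return first, duplicate

first, duplicate = asyncio.run(main())
print(first, duplicate)  # validate:42 None
```

Because the job ID is derived from the proxy ID, a second enqueue for the same proxy is a no-op until the first job finishes.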

### Cleanup tasks

#### `prune_dead_proxies(ctx)`

Removes proxies that have been dead for an extended period.

**Criteria**: `status = dead` AND `last_checked_at < now() - retention_days` (default: 30 days).

**Behavior**: Hard-deletes the proxy row. CASCADE deletes remove the associated `proxy_checks`, `proxy_tags`, and `proxy_leases` rows.

**Schedule**: Daily at 3:00 AM.
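
The cascade can be demonstrated with SQLite (SQLite needs `PRAGMA foreign_keys = ON`, while the production PostgreSQL schema declares `ON DELETE CASCADE` directly; string dates are a demo simplification):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite disables FK enforcement by default
conn.execute("CREATE TABLE proxies (id INTEGER PRIMARY KEY, status TEXT, last_checked_at TEXT)")
conn.execute(
    "CREATE TABLE proxy_checks (id INTEGER PRIMARY KEY,"
    " proxy_id INTEGER REFERENCES proxies(id) ON DELETE CASCADE)"
)
conn.execute("INSERT INTO proxies VALUES (1, 'dead', '2023-11-01')")
conn.execute("INSERT INTO proxy_checks VALUES (1, 1)")

# The prune: dead AND not checked since the retention cutoff.
conn.execute(
    "DELETE FROM proxies WHERE status = 'dead' AND last_checked_at < ?",
    ("2023-12-01",),
)
remaining = conn.execute("SELECT count(*) FROM proxy_checks").fetchone()[0]
print(remaining)  # 0: the check row was cascade-deleted
```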

#### `prune_old_checks(ctx)`

Trims the `proxy_checks` table to control storage growth.

**Strategy**: For each proxy, keep the most recent N check records (default: 100) and apply a retention period (default: 7 days). A check is deleted only when both conditions hold: it falls outside the proxy's most recent N records *and* it is older than the retention period. A proxy's newest checks therefore survive regardless of age, and checks within the retention window survive regardless of count.

**Schedule**: Daily at 4:00 AM.
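
The both-conditions rule can be sketched as a pure function over check timestamps (`keep_n=3` here just to keep the demo small; the defaults are 100 records and 7 days):

```python
from datetime import datetime, timedelta, timezone

def checks_to_delete(checked_ats, now, keep_n=100, retention=timedelta(days=7)):
    """Return timestamps that are BOTH outside the newest keep_n AND older than retention."""
    newest_first = sorted(checked_ats, reverse=True)
    cutoff = now - retention
    return [t for i, t in enumerate(newest_first) if i >= keep_n and t < cutoff]

now = datetime(2024, 1, 10, tzinfo=timezone.utc)
checks = [now - timedelta(days=d) for d in range(10)]  # ten checks, one per day

doomed = checks_to_delete(checks, now, keep_n=3)
print(len(doomed))  # 2
```

With `keep_n=3`, the three newest checks are kept by recency; of the rest, only the checks 8 and 9 days old are also past the 7-day retention cutoff, so exactly two are deleted.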

#### `expire_leases(ctx)`

Cleans up expired proxy leases.

**Steps**:

1. Query `proxy_leases WHERE is_released = false AND expires_at < now()`.
2. For each expired lease, set `is_released = true`.
3. Delete the corresponding Redis lease key if it still exists (it should already have expired via TTL, but this is a safety net).

**Schedule**: Every minute.

**Note**: Redis TTL is the primary expiration mechanism. This task is a consistency backstop that keeps the PostgreSQL records accurate even if Redis keys expire silently.
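
Steps 1 and 2 reduce to a small pure function (dict-based leases are a demo simplification; the real task updates PostgreSQL rows and also deletes the Redis keys):

```python
from datetime import datetime, timedelta, timezone

def expire_leases(leases, now):
    """Mark unreleased, past-expiry leases as released; return their ids."""
    expired_ids = []
    for lease in leases:
        if not lease["is_released"] and lease["expires_at"] < now:
            lease["is_released"] = True
            expired_ids.append(lease["id"])
    return expired_ids

now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
leases = [
    {"id": "a", "is_released": False, "expires_at": now - timedelta(minutes=1)},
    {"id": "b", "is_released": False, "expires_at": now + timedelta(minutes=5)},
    {"id": "c", "is_released": True, "expires_at": now - timedelta(hours=1)},
]
expired = expire_leases(leases, now)
print(expired)  # ['a']
```

Lease `b` has not yet expired and `c` was already released, so only `a` is touched; a repeat run is a no-op.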

## Task retry behavior

ARQ retries are configured per task:

| Task | Max retries | Retry delay |
|------|-------------|-------------|
| `scrape_source` | 2 | 60s, exponential |
| `validate_proxy` | 1 | 30s |
| `prune_dead_proxies` | 0 | — |
| `prune_old_checks` | 0 | — |
| `expire_leases` | 1 | 10s |

Retry delays use exponential backoff. A task that still fails after exhausting its retries is logged, and the job result is stored in Redis for inspection.
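
For `scrape_source`'s "60s exponential" entry, the delay doubles with each attempt; a hypothetical helper (not from the codebase) makes the schedule concrete:

```python
def retry_delay(base_seconds, attempt):
    """Exponential backoff: base, 2*base, 4*base, ... for attempts 1, 2, 3, ..."""
    return base_seconds * 2 ** (attempt - 1)

# scrape_source with a 60s base: first retry after 60s, second after 120s
delays = [retry_delay(60, n) for n in (1, 2)]
print(delays)  # [60, 120]
```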

## Monitoring

### Job results

ARQ stores job results in Redis for `keep_result` seconds (default: 3600). Query a result by job ID via the `Job` helper:

```python
from arq import create_pool
from arq.connections import RedisSettings
from arq.jobs import Job

redis = await create_pool(RedisSettings.from_dsn(settings.redis_url))
job = Job("job_id", redis)
result = await job.result_info()  # JobResult, or None once the result has expired
```

### Health indicators

The `GET /stats/pool` endpoint includes `last_scrape_at` and `last_validation_at` timestamps. If these fall behind schedule, the worker may be down or stuck.

### Logging

Tasks log structured INFO lines on start/completion and WARN/ERROR lines on failure:

```
INFO scrape_source source_id=abc count_new=23 count_updated=119 duration_ms=1540
WARN scrape_source source_id=def error="HTTP 503" retrying=true attempt=2
ERROR validate_proxy proxy_id=ghi error="Pipeline timeout after 120s"
```