# Plugin system design

## Overview

The plugin system allows extending Proxy Pool's functionality without modifying core code. Plugins can add new proxy list parsers, new validation methods, and new notification channels. The system uses Python's `typing.Protocol` for structural typing — plugins implement the right interface without inheriting from any base class.

## Plugin types

### SourceParser

Responsible for extracting proxy entries from raw scraped content. Each parser handles a specific format (plain text lists, HTML tables, JSON APIs, etc.).

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class SourceParser(Protocol):
    name: str

    def supports(self, url: str) -> bool:
        """Return True if this parser can handle the given source URL.

        Used as a fallback when no parser_name is explicitly set on a
        ProxySource. The registry calls supports() on each registered
        parser and uses the first match.
        """
        ...

    async def parse(self, raw: bytes, source: ProxySource) -> list[DiscoveredProxy]:
        """Extract proxy entries from raw scraped content.

        Arguments:
            raw: The raw bytes fetched from the source URL. The parser is
                responsible for decoding (the encoding may vary by source).
            source: The ProxySource record, providing context like
                default_protocol.

        Returns:
            A list of DiscoveredProxy objects. Duplicates within a single
            parse call are acceptable — deduplication happens at the
            upsert layer.
        """
        ...

    def default_schedule(self) -> str | None:
        """Optional cron expression for scrape frequency.

        If None, the schedule configured on the ProxySource record is used.
        This allows parsers to suggest a sensible default (e.g.
        "*/30 * * * *" for sources that update frequently).
        """
        ...
```

**Registration key**: `parser_name` on the `ProxySource` record maps to `SourceParser.name`.

**Built-in parsers**: `plaintext` (one `ip:port` per line), `html_table` (HTML table with IP/port columns), `json_api` (JSON array or nested structure).
### ProxyChecker

Runs a single validation check against a proxy. Checkers are organized into stages — all checkers in stage N run before any in stage N+1. Within a stage, checkers run concurrently.

```python
@runtime_checkable
class ProxyChecker(Protocol):
    name: str
    stage: int      # Pipeline ordering. Lower stages run first.
    priority: int   # Ordering within a stage. Lower priority runs first.
    timeout: float  # Per-check timeout in seconds.

    async def check(self, proxy: Proxy, context: CheckContext) -> CheckResult:
        """Run this check against the proxy.

        Arguments:
            proxy: The proxy being validated.
            context: Shared mutable state across the pipeline. Checkers in
                earlier stages populate fields (exit_ip, tcp_latency_ms)
                that later stages can read. Also provides a pre-configured
                httpx.AsyncClient.

        Returns:
            CheckResult with passed=True/False and a detail string.
        """
        ...

    def should_skip(self, proxy: Proxy) -> bool:
        """Return True to skip this check for the given proxy.

        Example: A SOCKS5-specific checker returns True for HTTP-only
        proxies.
        """
        ...
```

**Pipeline execution**: The orchestrator in `proxy_pool.proxy.pipeline` groups checkers by stage, runs each group concurrently via `asyncio.gather`, and aborts the pipeline on the first stage with any failure. Every individual check result is logged to the `proxy_checks` table.
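The stage-grouped execution described above can be sketched as follows. This is a simplified model of the behavior attributed to `proxy_pool.proxy.pipeline`, not its actual code: per-check timeout enforcement and logging to `proxy_checks` are elided, and `CheckResult` is reduced to the two required fields.

```python
import asyncio
from dataclasses import dataclass
from itertools import groupby


@dataclass
class CheckResult:
    passed: bool
    detail: str


async def run_pipeline(checkers, proxy, context) -> list[CheckResult]:
    """Run checkers stage by stage; within a stage, run them concurrently."""
    # groupby needs sorted input; (stage, priority) matches the registry's ordering.
    ordered = sorted(checkers, key=lambda c: (c.stage, c.priority))
    results: list[CheckResult] = []
    for _stage, group in groupby(ordered, key=lambda c: c.stage):
        active = [c for c in group if not c.should_skip(proxy)]
        # All checkers in this stage run concurrently.
        stage_results = await asyncio.gather(*(c.check(proxy, context) for c in active))
        results.extend(stage_results)
        if any(not r.passed for r in stage_results):
            break  # Abort: later stages never run once a stage has a failure
    return results
```

Aborting per stage (rather than per check) means cheap reachability checks in stage 1 gate the more expensive HTTP and GeoIP work in later stages, while checks within a stage still overlap in time.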
**`CheckContext`**: A mutable dataclass that travels through the pipeline:

```python
@dataclass
class CheckContext:
    started_at: datetime
    http_client: httpx.AsyncClient

    # Populated by checkers as they run:
    exit_ip: str | None = None
    tcp_latency_ms: float | None = None
    http_latency_ms: float | None = None
    anonymity_level: AnonymityLevel | None = None
    country: str | None = None
    headers_forwarded: list[str] = field(default_factory=list)

    def elapsed_ms(self) -> float:
        return (utcnow() - self.started_at).total_seconds() * 1000
```

**`CheckResult`**: The return type from every checker:

```python
@dataclass
class CheckResult:
    passed: bool
    detail: str
    latency_ms: float | None = None
    metadata: dict[str, Any] = field(default_factory=dict)
```

**Built-in checkers**:

| Name | Stage | What it does |
|------|-------|--------------|
| `tcp_connect` | 1 | Opens a TCP connection to verify the proxy is reachable |
| `socks_handshake` | 1 | Performs a SOCKS4/5 handshake (skipped for HTTP proxies) |
| `http_anonymity` | 2 | Sends an HTTP request through the proxy to a judge URL, determines exit IP and which headers are forwarded |
| `geoip_lookup` | 2 | Resolves the exit IP to a country code using MaxMind GeoLite2 |
| `site_reach` | 3 | Optional: tests whether the proxy can reach specific target URLs |

### Notifier

Reacts to system events. Notifiers are called asynchronously (fire-and-forget) and must never block the main application path.

```python
@runtime_checkable
class Notifier(Protocol):
    name: str
    subscribes_to: list[str]  # Glob patterns: "proxy.*", "credits.low_balance"

    async def notify(self, event: Event) -> None:
        """Handle an event.

        Called via asyncio.create_task — exceptions are caught and logged
        but do not propagate. Implementations should handle their own
        retries if needed.
        """
        ...

    async def health_check(self) -> bool:
        """Verify the notification backend is reachable.

        Called periodically and surfaced in the admin stats endpoint.
        Return False if the backend is unreachable.
        """
        ...
```

**Event types**:

| Event | Payload | When emitted |
|-------|---------|--------------|
| `proxy.pool_low` | `{active_count, threshold}` | Active proxy count drops below configured threshold |
| `proxy.new_batch` | `{source_id, count}` | A scrape discovers new proxies |
| `source.failed` | `{source_id, error}` | A scrape attempt fails |
| `source.stale` | `{source_id, hours_since_success}` | A source hasn't produced results in N hours |
| `credits.low_balance` | `{user_id, balance, threshold}` | User balance drops below threshold |
| `credits.exhausted` | `{user_id}` | User balance reaches zero |

**Glob matching**: `proxy.*` matches all events starting with `proxy.`. Exact matches like `credits.low_balance` match only that event. A notifier can subscribe to multiple patterns.

**Built-in notifiers**: `smtp` (email alerts), `webhook` (HTTP POST to a configured URL).

## Plugin registry

The `PluginRegistry` class is the central coordinator. It stores registered plugins, validates them against Protocol contracts at registration time, and provides lookup methods used by the pipeline and event bus.

```python
class PluginRegistry:
    def __init__(self) -> None:
        self._parsers: dict[str, SourceParser] = {}
        self._checkers: list[ProxyChecker] = []  # Sorted by (stage, priority)
        self._notifiers: list[Notifier] = []
        self._event_subs: dict[str, list[Notifier]] = {}

    def register_parser(self, plugin: SourceParser) -> None: ...
    def register_checker(self, plugin: ProxyChecker) -> None: ...
    def register_notifier(self, plugin: Notifier) -> None: ...

    @property
    def notifiers(self) -> list[Notifier]: ...  # Read-only view; used by the startup health check

    def get_parser(self, name: str) -> SourceParser: ...
    def get_parser_for_url(self, url: str) -> SourceParser | None: ...
    def get_checker_pipeline(self) -> list[ProxyChecker]: ...
    async def emit(self, event: Event) -> None: ...
```

**Validation**: At registration time, `_validate_protocol()` uses `isinstance()` (enabled by `@runtime_checkable` on each Protocol) as a structural check, then inspects for missing attributes/methods and raises `PluginValidationError` with a descriptive message.

**Conflict detection**: Two parsers with the same `name` raise `PluginConflictError`. Checkers and notifiers are additive (duplicates are allowed, though unusual).

## Plugin discovery

Plugins are discovered at application startup by scanning two directories:

- `proxy_pool/plugins/builtin/` — Ships with the application. Tested in CI.
- `proxy_pool/plugins/contrib/` — User-provided plugins. Can be mounted as a Docker volume.

### Convention

Each plugin is a Python module (single `.py` file or package directory) that defines a `create_plugin(settings: Settings)` function. This factory function:

- Receives the application settings (for reading config like SMTP credentials)
- Returns a plugin instance, or `None` if the plugin should not activate (e.g. SMTP not configured)

```python
# Example: proxy_pool/plugins/builtin/parsers/plaintext.py

class PlaintextParser:
    name = "plaintext"

    def supports(self, url: str) -> bool:
        return url.endswith(".txt")

    async def parse(self, raw: bytes, source: ProxySource) -> list[DiscoveredProxy]:
        # ... parsing logic ...
        return results

    def default_schedule(self) -> str | None:
        return "*/30 * * * *"


def create_plugin(settings: Settings) -> PlaintextParser:
    return PlaintextParser()
```

### Discovery algorithm

```python
async def discover_plugins(plugins_dir: Path, registry: PluginRegistry, settings: Settings):
    for path in sorted(plugins_dir.rglob("*.py")):
        if path.name.startswith("_"):
            continue
        module = importlib.import_module(derive_module_path(path))
        if not hasattr(module, "create_plugin"):
            continue
        plugin = module.create_plugin(settings)
        if plugin is None:
            continue  # Plugin opted out (unconfigured)

        # Type-based routing:
        match plugin:
            case SourceParser():
                registry.register_parser(plugin)
            case ProxyChecker():
                registry.register_checker(plugin)
            case Notifier():
                registry.register_notifier(plugin)
```

The `match` statement uses structural pattern matching against `@runtime_checkable` Protocol types. The order matters — if a plugin somehow satisfies multiple Protocols, the first match wins.

## Wiring into FastAPI

The plugin registry is created during the FastAPI lifespan and stored on `app.state`:

```python
@asynccontextmanager
async def lifespan(app: FastAPI):
    settings = get_settings()
    registry = PluginRegistry()

    # Discover from both builtin and contrib directories
    await discover_plugins(Path("proxy_pool/plugins/builtin"), registry, settings)
    await discover_plugins(Path("/app/plugins-contrib"), registry, settings)

    # Health-check notifiers
    for notifier in registry.notifiers:
        healthy = await notifier.health_check()
        if not healthy:
            logger.warning(f"Notifier '{notifier.name}' failed health check at startup")

    app.state.registry = registry
    # ... other setup (db, redis) ...
    yield
    # ... cleanup ...
```

Route handlers access the registry via a FastAPI dependency:

```python
async def get_registry(request: Request) -> PluginRegistry:
    return request.app.state.registry
```

## Writing a third-party plugin

1. Create a `.py` file in `plugins/contrib/` (or mount a directory as a Docker volume at `/app/plugins-contrib/`).
2. Implement the relevant Protocol methods and attributes.
3. Define `create_plugin(settings: Settings) -> YourPlugin | None`.
4. Restart the application. The plugin will be discovered and registered automatically.

No inheritance required. No registration decorators. Just implement the shape and provide the factory.

### Testing a plugin

```python
from proxy_pool.config import Settings
from your_plugin import create_plugin


def test_plugin_creates_successfully():
    settings = Settings(smtp_host="localhost", ...)
    plugin = create_plugin(settings)
    assert plugin is not None
    assert plugin.name == "my_custom_parser"


async def test_parser_extracts_proxies():
    plugin = create_plugin(Settings(...))
    raw = b"192.168.1.1:8080\n10.0.0.1:3128\n"
    source = make_test_source()
    results = await plugin.parse(raw, source)
    assert len(results) == 2
    assert results[0].ip == "192.168.1.1"
```
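When testing a Notifier plugin, it is also worth checking its `subscribes_to` patterns against the events you expect it to receive. Assuming the registry's glob matching follows shell-style `fnmatch` semantics (the doc describes `proxy.*` prefix matching; the exact implementation is not shown here), a small helper makes that check easy:

```python
from fnmatch import fnmatch


def matches(subscribes_to: list[str], event_name: str) -> bool:
    """True if any subscribed glob pattern matches the event name."""
    return any(fnmatch(event_name, pattern) for pattern in subscribes_to)


# Wildcard pattern catches every proxy.* event; exact patterns catch only themselves.
assert matches(["proxy.*"], "proxy.pool_low")
assert matches(["credits.low_balance"], "credits.low_balance")
assert not matches(["credits.low_balance"], "credits.exhausted")
```

One caveat of this sketch: `fnmatch`'s `*` also matches across dots, so `proxy.*` would match a hypothetical nested name like `proxy.pool.low` as well; the real registry may be stricter.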