Plugin system design
Overview
The plugin system allows extending Proxy Pool's functionality without modifying core code. Plugins can add new proxy list parsers, new validation methods, and new notification channels. The system uses Python's typing.Protocol for structural typing — plugins implement the right interface without inheriting from any base class.
Plugin types
SourceParser
Responsible for extracting proxy entries from raw scraped content. Each parser handles a specific format (plain text lists, HTML tables, JSON APIs, etc.).
    from typing import Protocol, runtime_checkable


    @runtime_checkable
    class SourceParser(Protocol):
        name: str

        def supports(self, url: str) -> bool:
            """Return True if this parser can handle the given source URL.

            Used as a fallback when no parser_name is explicitly set on a ProxySource.
            The registry calls supports() on each registered parser and uses the first match.
            """
            ...

        async def parse(self, raw: bytes, source: ProxySource) -> list[DiscoveredProxy]:
            """Extract proxy entries from raw scraped content.

            Arguments:
                raw: The raw bytes fetched from the source URL. The parser is responsible
                    for decoding (the encoding may vary by source).
                source: The ProxySource record, providing context like default_protocol.

            Returns:
                A list of DiscoveredProxy objects. Duplicates within a single parse call
                are acceptable; deduplication happens at the upsert layer.
            """
            ...

        def default_schedule(self) -> str | None:
            """Optional cron expression for scrape frequency.

            If None, the schedule configured on the ProxySource record is used.
            This allows parsers to suggest a sensible default (e.g. "*/30 * * * *"
            for sources that update frequently).
            """
            ...
Registration key: parser_name on the ProxySource record maps to SourceParser.name.
Built-in parsers: plaintext (one ip:port per line), html_table (HTML table with IP/port columns), json_api (JSON array or nested structure).
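The plaintext format (one ip:port per line) could be parsed roughly as follows. This is a minimal sketch, not the built-in parser: `parse_plaintext` is a hypothetical helper, `DiscoveredProxy` is a simplified stand-in whose `port` field is an assumption, and decoding as UTF-8 with replacement is also an assumption.

```python
from dataclasses import dataclass
import ipaddress


@dataclass(frozen=True)
class DiscoveredProxy:
    # Simplified stand-in for the project's DiscoveredProxy; fields are assumed.
    ip: str
    port: int


def parse_plaintext(raw: bytes) -> list[DiscoveredProxy]:
    """Parse one ip:port entry per line, skipping anything malformed."""
    results: list[DiscoveredProxy] = []
    # Decoding is the parser's job; UTF-8 with replacement is an assumption.
    for line in raw.decode("utf-8", errors="replace").splitlines():
        host, sep, port_text = line.strip().rpartition(":")
        if not sep:
            continue  # blank line, comment, or entry without a port
        try:
            ipaddress.ip_address(host)  # rejects hostnames and garbage
            port = int(port_text)
        except ValueError:
            continue
        if 1 <= port <= 65535:
            results.append(DiscoveredProxy(ip=host, port=port))
    return results
```

Malformed lines are dropped silently here; a real parser might prefer to count or log them so that a source changing its format is noticed.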
ProxyChecker
Runs a single validation check against a proxy. Checkers are organized into stages — all checkers in stage N run before any in stage N+1. Within a stage, checkers run concurrently.
    @runtime_checkable
    class ProxyChecker(Protocol):
        name: str
        stage: int      # Pipeline ordering. Lower stages run first.
        priority: int   # Ordering within a stage. Lower priority runs first.
        timeout: float  # Per-check timeout in seconds.

        async def check(self, proxy: Proxy, context: CheckContext) -> CheckResult:
            """Run this check against the proxy.

            Arguments:
                proxy: The proxy being validated.
                context: Shared mutable state across the pipeline. Checkers in earlier
                    stages populate fields (exit_ip, tcp_latency_ms) that later
                    stages can read. Also provides a pre-configured httpx.AsyncClient.

            Returns:
                CheckResult with passed=True/False and a detail string.
            """
            ...

        def should_skip(self, proxy: Proxy) -> bool:
            """Return True to skip this check for the given proxy.

            Example: A SOCKS5-specific checker returns True for HTTP-only proxies.
            """
            ...
Pipeline execution: The orchestrator in proxy_pool.proxy.pipeline groups checkers by stage, runs each group concurrently via asyncio.gather, and aborts the pipeline on the first stage with any failure. Every individual check result is logged to the proxy_checks table.
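The stage-by-stage execution described above might look roughly like the sketch below. It is not the code in proxy_pool.proxy.pipeline: `run_pipeline` is a hypothetical name, `CheckResult` is trimmed to two fields, and treating a timeout or an unexpected exception as a failed check is an assumption.

```python
import asyncio
from dataclasses import dataclass
from itertools import groupby


@dataclass
class CheckResult:
    passed: bool
    detail: str


async def run_pipeline(checkers, proxy, context) -> list[CheckResult]:
    """Run checkers stage by stage; stop after the first stage with a failure."""
    results: list[CheckResult] = []
    ordered = sorted(checkers, key=lambda c: (c.stage, c.priority))
    for _stage, group in groupby(ordered, key=lambda c: c.stage):
        active = [c for c in group if not c.should_skip(proxy)]
        # All checkers of one stage run concurrently, each under its own timeout.
        outcomes = await asyncio.gather(
            *(asyncio.wait_for(c.check(proxy, context), c.timeout) for c in active),
            return_exceptions=True,
        )
        for checker, outcome in zip(active, outcomes):
            if isinstance(outcome, BaseException):
                # Assumption: a timeout or crash counts as a failed check.
                outcome = CheckResult(False, f"{checker.name}: {outcome!r}")
            results.append(outcome)  # the real orchestrator also logs to proxy_checks
        if not all(r.passed for r in results):
            break  # later stages never start
    return results
```

Because every checker in a failing stage still runs to completion, the caller gets a result for each of them, which matches the requirement that every individual check is logged.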
CheckContext: A mutable dataclass that travels through the pipeline:
    @dataclass
    class CheckContext:
        started_at: datetime
        http_client: httpx.AsyncClient

        # Populated by checkers as they run:
        exit_ip: str | None = None
        tcp_latency_ms: float | None = None
        http_latency_ms: float | None = None
        anonymity_level: AnonymityLevel | None = None
        country: str | None = None
        headers_forwarded: list[str] = field(default_factory=list)

        def elapsed_ms(self) -> float:
            return (utcnow() - self.started_at).total_seconds() * 1000
CheckResult: The return type from every checker:
    @dataclass
    class CheckResult:
        passed: bool
        detail: str
        latency_ms: float | None = None
        metadata: dict[str, Any] = field(default_factory=dict)
Built-in checkers:
| Name | Stage | What it does |
|---|---|---|
| tcp_connect | 1 | Opens a TCP connection to verify the proxy is reachable |
| socks_handshake | 1 | Performs a SOCKS4/5 handshake (skipped for HTTP proxies) |
| http_anonymity | 2 | Sends an HTTP request through the proxy to a judge URL, determines exit IP and which headers are forwarded |
| geoip_lookup | 2 | Resolves the exit IP to a country code using MaxMind GeoLite2 |
| site_reach | 3 | Optional: tests whether the proxy can reach specific target URLs |
Notifier
Reacts to system events. Notifiers are called asynchronously (fire-and-forget) and must never block the main application path.
    @runtime_checkable
    class Notifier(Protocol):
        name: str
        subscribes_to: list[str]  # Glob patterns: "proxy.*", "credits.low_balance"

        async def notify(self, event: Event) -> None:
            """Handle an event.

            Called via asyncio.create_task; exceptions are caught and logged
            but do not propagate. Implementations should handle their own
            retries if needed.
            """
            ...

        async def health_check(self) -> bool:
            """Verify the notification backend is reachable.

            Called periodically and surfaced in the admin stats endpoint.
            Return False if the backend is unreachable.
            """
            ...
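The fire-and-forget behaviour could be implemented along the lines of the sketch below. `dispatch` and `_safe_notify` are hypothetical names, and the module-level task set is one common way to keep background tasks alive, not necessarily what the event bus does.

```python
import asyncio
import logging

logger = logging.getLogger("proxy_pool.plugins")
_background: set[asyncio.Task] = set()


async def _safe_notify(notifier, event) -> None:
    try:
        await notifier.notify(event)
    except Exception:
        # Logged, never propagated to the code that emitted the event.
        logger.exception("Notifier %r failed", notifier.name)


def dispatch(notifiers, event) -> list[asyncio.Task]:
    """Schedule notify() on each notifier without awaiting the result."""
    tasks = []
    for notifier in notifiers:
        task = asyncio.create_task(_safe_notify(notifier, event))
        # Keep a strong reference so the task is not garbage-collected mid-flight.
        _background.add(task)
        task.add_done_callback(_background.discard)
        tasks.append(task)
    return tasks
```

`dispatch` must be called from inside a running event loop. Returning the tasks is only a convenience for tests; production callers would normally ignore the return value.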
Event types:
| Event | Payload | When emitted |
|---|---|---|
| proxy.pool_low | {active_count, threshold} | Active proxy count drops below configured threshold |
| proxy.new_batch | {source_id, count} | A scrape discovers new proxies |
| source.failed | {source_id, error} | A scrape attempt fails |
| source.stale | {source_id, hours_since_success} | A source hasn't produced results in N hours |
| credits.low_balance | {user_id, balance, threshold} | User balance drops below threshold |
| credits.exhausted | {user_id} | User balance reaches zero |
Glob matching: proxy.* matches all events starting with proxy.. Exact matches like credits.low_balance match only that event. A notifier can subscribe to multiple patterns.
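One way to implement this matching is with the standard library's `fnmatch` module. The sketch below assumes shell-style glob semantics, which the source does not spell out; `matches` is a hypothetical helper name.

```python
from fnmatch import fnmatchcase


def matches(patterns: list[str], event_type: str) -> bool:
    """Return True if any subscription pattern matches the event type."""
    # fnmatchcase is case-sensitive on every platform, unlike fnmatch.fnmatch.
    return any(fnmatchcase(event_type, pattern) for pattern in patterns)


subscriptions = ["proxy.*", "credits.low_balance"]
print(matches(subscriptions, "proxy.pool_low"))     # True
print(matches(subscriptions, "credits.exhausted"))  # False
```

With these semantics `*` also matches dots, so `proxy.*` would match a deeper name such as `proxy.pool.low` if one were ever introduced.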
Built-in notifiers: smtp (email alerts), webhook (HTTP POST to a configured URL).
Plugin registry
The PluginRegistry class is the central coordinator. It stores registered plugins, validates them against Protocol contracts at registration time, and provides lookup methods used by the pipeline and event bus.
    class PluginRegistry:
        def __init__(self) -> None:
            self._parsers: dict[str, SourceParser] = {}
            self._checkers: list[ProxyChecker] = []  # Sorted by (stage, priority)
            self._notifiers: list[Notifier] = []
            self._event_subs: dict[str, list[Notifier]] = {}

        def register_parser(self, plugin: SourceParser) -> None: ...
        def register_checker(self, plugin: ProxyChecker) -> None: ...
        def register_notifier(self, plugin: Notifier) -> None: ...

        def get_parser(self, name: str) -> SourceParser: ...
        def get_parser_for_url(self, url: str) -> SourceParser | None: ...
        def get_checker_pipeline(self) -> list[ProxyChecker]: ...

        async def emit(self, event: Event) -> None: ...
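Validation: At registration time, _validate_protocol() uses isinstance() (enabled by @runtime_checkable on each Protocol) as a structural check, then inspects the plugin for missing attributes and methods and raises PluginValidationError with a descriptive message if any are absent. Note that isinstance() against a runtime-checkable Protocol confirms only that the required members exist; it does not verify method signatures or attribute types.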
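A rough sketch of such a validation step follows. The real _validate_protocol() is not shown in this document, so the function name, the explicit `required` tuple, and the error message format are all assumptions.

```python
from typing import Protocol, runtime_checkable


class PluginValidationError(Exception):
    pass


@runtime_checkable
class Notifier(Protocol):
    name: str
    subscribes_to: list[str]

    async def notify(self, event) -> None: ...
    async def health_check(self) -> bool: ...


def validate_protocol(plugin: object, protocol: type, required: tuple[str, ...]) -> None:
    """Raise PluginValidationError naming every required member the plugin lacks."""
    if isinstance(plugin, protocol):
        return  # every member is present (signatures are not checked)
    missing = [name for name in required if not hasattr(plugin, name)]
    raise PluginValidationError(
        f"{type(plugin).__name__} does not satisfy {protocol.__name__}: "
        f"missing {', '.join(missing) or 'unknown members'}"
    )
```

The member names are passed in explicitly because introspecting a Protocol's members portably is awkward across Python versions.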
Conflict detection: Two parsers with the same name raise PluginConflictError. Checkers and notifiers are additive (duplicates are allowed, though unusual).
Plugin discovery
Plugins are discovered at application startup by scanning two directories:
- proxy_pool/plugins/builtin/: Ships with the application. Tested in CI.
- proxy_pool/plugins/contrib/: User-provided plugins. Can be mounted as a Docker volume.
Convention
Each plugin is a Python module (single .py file or package directory) that defines a create_plugin(settings: Settings) function. This factory function:
- Receives the application settings (for reading config like SMTP credentials)
- Returns a plugin instance, or None if the plugin should not activate (e.g. SMTP not configured)
    # Example: proxy_pool/plugins/builtin/parsers/plaintext.py

    class PlaintextParser:
        name = "plaintext"

        def supports(self, url: str) -> bool:
            return url.endswith(".txt")

        async def parse(self, raw: bytes, source: ProxySource) -> list[DiscoveredProxy]:
            # ... parsing logic ...
            return results

        def default_schedule(self) -> str | None:
            return "*/30 * * * *"


    def create_plugin(settings: Settings) -> PlaintextParser:
        return PlaintextParser()
Discovery algorithm
    async def discover_plugins(plugins_dir: Path, registry: PluginRegistry, settings: Settings):
        for path in sorted(plugins_dir.rglob("*.py")):
            if path.name.startswith("_"):
                continue

            module = importlib.import_module(derive_module_path(path))
            if not hasattr(module, "create_plugin"):
                continue

            plugin = module.create_plugin(settings)
            if plugin is None:
                continue  # Plugin opted out (unconfigured)

            # Type-based routing:
            match plugin:
                case SourceParser():
                    registry.register_parser(plugin)
                case ProxyChecker():
                    registry.register_checker(plugin)
                case Notifier():
                    registry.register_notifier(plugin)
The match statement uses structural pattern matching against @runtime_checkable Protocol types. The order matters — if a plugin somehow satisfies multiple Protocols, the first match wins.
Wiring into FastAPI
The plugin registry is created during the FastAPI lifespan and stored on app.state:
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        settings = get_settings()
        registry = PluginRegistry()

        # Discover from both builtin and contrib directories
        await discover_plugins(Path("proxy_pool/plugins/builtin"), registry, settings)
        await discover_plugins(Path("/app/plugins-contrib"), registry, settings)

        # Health-check notifiers
        for notifier in registry.notifiers:
            healthy = await notifier.health_check()
            if not healthy:
                logger.warning(f"Notifier '{notifier.name}' failed health check at startup")

        app.state.registry = registry
        # ... other setup (db, redis) ...
        yield
        # ... cleanup ...
Route handlers access the registry via a FastAPI dependency:
    async def get_registry(request: Request) -> PluginRegistry:
        return request.app.state.registry
Writing a third-party plugin
- Create a .py file in plugins/contrib/ (or mount a directory as a Docker volume at /app/plugins-contrib/).
- Implement the relevant Protocol methods and attributes.
- Define create_plugin(settings: Settings) -> YourPlugin | None.
- Restart the application. The plugin will be discovered and registered automatically.
No inheritance required. No registration decorators. Just implement the shape and provide the factory.
Testing a plugin
    from proxy_pool.config import Settings
    from your_plugin import create_plugin


    def test_plugin_creates_successfully():
        settings = Settings(smtp_host="localhost", ...)
        plugin = create_plugin(settings)
        assert plugin is not None
        assert plugin.name == "my_custom_parser"


    async def test_parser_extracts_proxies():
        plugin = create_plugin(Settings(...))
        raw = b"192.168.1.1:8080\n10.0.0.1:3128\n"
        source = make_test_source()
        results = await plugin.parse(raw, source)
        assert len(results) == 2
        assert results[0].ip == "192.168.1.1"