From ce6c76602006a723181a34d77cc10ee48838802d Mon Sep 17 00:00:00 2001 From: agatha Date: Sat, 14 Mar 2026 14:49:05 -0400 Subject: [PATCH] feat: define plugin protocol contracts and vocabulary types --- src/proxy_pool/plugins/protocols.py | 90 +++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 src/proxy_pool/plugins/protocols.py diff --git a/src/proxy_pool/plugins/protocols.py b/src/proxy_pool/plugins/protocols.py new file mode 100644 index 0000000..364c874 --- /dev/null +++ b/src/proxy_pool/plugins/protocols.py @@ -0,0 +1,90 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any, Protocol, runtime_checkable + +import httpx + + +@dataclass +class DiscoveredProxy: + ip: str + port: int + protocol: str + source_id: Any = None + + +@dataclass +class CheckContext: + started_at: datetime + http_client: httpx.AsyncClient + exit_ip: str | None = None + tcp_latency_ms: float | None = None + http_latency_ms: float | None = None + anonymity_level: str | None = None + country: str | None = None + headers_forwarded: list[str] = field(default_factory=list) + + def elapsed_ms(self) -> float: + return (datetime.now() - self.started_at).total_seconds() * 1000 + + +@dataclass +class CheckResult: + passed: bool + detail: str + latency_ms: float | None = None + metadata: dict[str, Any] = field(default_factory=dict) + + +@dataclass +class Event: + type: str + payload: dict[str, Any] + timestamp: datetime = field(default_factory=datetime.now) + + +@runtime_checkable +class SourceParser(Protocol): + name: str + + def supports(self, url: str) -> bool: ... + + async def parse( + self, + raw: bytes, + source_url: str, + source_id: Any, + default_protocol: str, + ) -> list[DiscoveredProxy]: ... + + def default_schedule(self) -> str | None: ... + + +@runtime_checkable +class ProxyChecker(Protocol): + name: str + stage: int + priority: int + timeout: float + + async def check( + self, + proxy_ip: str, + proxy_port: int, + proxy_protocol: str, + context: CheckContext, + ) -> CheckResult: ... + + def should_skip(self, proxy_protocol: str) -> bool: ... + + +@runtime_checkable +class Notifier(Protocol): + name: str + subscribe_to: list[str] + + async def notify(self, event: Event) -> None: ... + + async def health_check(self) -> bool: ...