From b0b18fc7448373c637af6c301e3ae6bea01eac51 Mon Sep 17 00:00:00 2001 From: agatha Date: Sun, 15 Mar 2026 18:29:04 -0400 Subject: [PATCH] feat: add header classification --- src/proxy_pool/accounts/models.py | 2 +- src/proxy_pool/config.py | 4 + .../builtin/checkers/http_anonymity.py | 112 ++++++++++++-- src/proxy_pool/plugins/protocols.py | 1 + src/proxy_pool/proxy/models.py | 6 +- src/proxy_pool/worker/tasks_validate.py | 10 ++ tests/plugins/test_http_anonymity_checker.py | 139 +++++++++++++++++- 7 files changed, 251 insertions(+), 23 deletions(-) diff --git a/src/proxy_pool/accounts/models.py b/src/proxy_pool/accounts/models.py index ff07e86..9912736 100644 --- a/src/proxy_pool/accounts/models.py +++ b/src/proxy_pool/accounts/models.py @@ -9,7 +9,7 @@ from sqlalchemy.orm import Mapped, mapped_column, relationship from proxy_pool.db.base import Base, TimestampMixin, UUIDPrimaryKeyMixin -class CreditTxType(str, enum.StrEnum): +class CreditTxType(enum.StrEnum): PURCHASE = "purchase" ACQUIRE = "acquire" REFUND = "refund" diff --git a/src/proxy_pool/config.py b/src/proxy_pool/config.py index f725333..bcf9367 100644 --- a/src/proxy_pool/config.py +++ b/src/proxy_pool/config.py @@ -70,6 +70,10 @@ class ProxyPipelineSettings(BaseSettings): default="http://httpbin.org/ip", description="URL for determining proxy exit IP", ) + judge_headers_url: str = Field( + default="http://httpbin.org/get", + description="URL that echoes back request headers and origin IP", + ) check_tcp_timeout: float = Field(default=5.0) check_http_timeout: float = Field(default=10.0) check_pipeline_timeout: float = Field(default=120.0) diff --git a/src/proxy_pool/plugins/builtin/checkers/http_anonymity.py b/src/proxy_pool/plugins/builtin/checkers/http_anonymity.py index 1ab209c..49010d8 100644 --- a/src/proxy_pool/plugins/builtin/checkers/http_anonymity.py +++ b/src/proxy_pool/plugins/builtin/checkers/http_anonymity.py @@ -5,6 +5,57 @@ import httpx from proxy_pool.config import Settings from 
proxy_pool.plugins.protocols import CheckContext, CheckResult +# Headers that reveal proxy usage or the client's real IP +REVEALING_HEADERS = { + "x-forwarded-for", + "x-real-ip", + "forwarded", + "via", + "x-proxy-id", + "x-proxy-connection", + "proxy-connection", + "x-forwarded-host", + "x-forwarded-proto", + "x-originating-ip", + "client-ip", + "true-client-ip", +} + + +def classify_anonymity( + headers: dict[str, str], + origin_ip: str | None, + proxy_ip: str, + real_ip: str | None, +) -> tuple[str, list[str]]: + """Classify anonymity level from echoed headers. + + Returns (anonymity_level, list of leaked header names). + """ + leaked: list[str] = [] + reveals_real_ip = False + + normalized = {k.lower(): v for k, v in headers.items()} + + for header_name in REVEALING_HEADERS: + value = normalized.get(header_name) + if value is None: + continue + + leaked.append(header_name) + + # Check if any revealing header contains the real IP + if real_ip and real_ip in value: + reveals_real_ip = True + + if reveals_real_ip: + return "transparent", leaked + + if leaked: + return "anonymous", leaked + + return "elite", leaked + class HttpAnonymityChecker: name = "http_anonymity" @@ -29,11 +80,14 @@ class HttpAnonymityChecker: from httpx_socks import AsyncProxyTransport transport = AsyncProxyTransport.from_url(proxy_url) - client_kwargs = {"transport": transport, "timeout": self.timeout} + client_kwargs = { + "transport": transport, + "timeout": self.timeout, + } except ImportError: return CheckResult( passed=False, - detail="httpx-socks not installed, cannot check SOCKS proxies", + detail="httpx-socks not installed", ) else: proxy_url = f"http://{proxy_ip}:{proxy_port}" @@ -50,7 +104,7 @@ class HttpAnonymityChecker: except httpx.TimeoutException: return CheckResult( passed=False, - detail=f"HTTP request through proxy timed out after {self.timeout}s", + detail=f"HTTP request timed out after {self.timeout}s", ) except httpx.ProxyError as err: return CheckResult( @@ -71,35 
+125,63 @@ class HttpAnonymityChecker: latency = context.elapsed_ms() - (context.tcp_latency_ms or 0) context.http_latency_ms = latency + # Parse the judge response try: data = response.json() - exit_ip = data.get("origin") or data.get("ip") except Exception: - exit_ip = response.text.strip() + return CheckResult( + passed=True, + detail="Judge returned non-JSON response", + latency_ms=latency, + ) + # Extract origin IP + exit_ip = data.get("origin") if exit_ip: + # httpbin sometimes returns "ip1, ip2" for chained proxies + exit_ip = exit_ip.split(",")[0].strip() context.exit_ip = exit_ip - if exit_ip and exit_ip != proxy_ip: - context.anonymity_level = "elite" - elif exit_ip and exit_ip == proxy_ip: - context.anonymity_level = "anonymous" - else: - context.anonymity_level = "transparent" + # Extract headers the judge saw + echoed_headers = data.get("headers", {}) + + # Classify anonymity based on header analysis + # context.real_ip is detected by the worker before validation; + # when it is None we can still tell "anonymous" (proxy headers + # leaked) from "elite" (no leaks), but "transparent" requires + # the real IP to check whether it appears in echoed headers + anonymity, leaked = classify_anonymity( + headers=echoed_headers, + origin_ip=exit_ip, + proxy_ip=proxy_ip, + real_ip=context.real_ip, + ) + + context.anonymity_level = anonymity + context.headers_forwarded = leaked + + detail_parts = [f"Exit IP: {exit_ip}", f"Anonymity: {anonymity}"] + if leaked: + detail_parts.append(f"Leaked headers: {', '.join(leaked)}") return CheckResult( passed=True, - detail=f"Exit IP: {exit_ip}", + detail="; ".join(detail_parts), latency_ms=latency, - metadata={"exit_ip": exit_ip}, + metadata={ + "exit_ip": exit_ip, + "anonymity": anonymity, + "leaked_headers": leaked, + "echoed_headers": echoed_headers, + }, ) def should_skip(self, proxy_protocol: str) -> bool: - return False # We handle all protocols now + return False def create_plugin(settings: Settings) -> HttpAnonymityChecker: return 
HttpAnonymityChecker( - judge_url=settings.proxy.judge_url, + judge_url=settings.proxy.judge_headers_url, timeout=settings.proxy.check_http_timeout, ) diff --git a/src/proxy_pool/plugins/protocols.py b/src/proxy_pool/plugins/protocols.py index 2f3ccf1..d476721 100644 --- a/src/proxy_pool/plugins/protocols.py +++ b/src/proxy_pool/plugins/protocols.py @@ -25,6 +25,7 @@ class CheckContext: anonymity_level: str | None = None country: str | None = None headers_forwarded: list[str] = field(default_factory=list) + real_ip: str | None = None def elapsed_ms(self) -> float: return (datetime.now() - self.started_at).total_seconds() * 1000 diff --git a/src/proxy_pool/proxy/models.py b/src/proxy_pool/proxy/models.py index d1faf7f..4a3837d 100644 --- a/src/proxy_pool/proxy/models.py +++ b/src/proxy_pool/proxy/models.py @@ -18,20 +18,20 @@ from sqlalchemy.orm import Mapped, mapped_column, relationship from proxy_pool.db.base import Base, TimestampMixin, UUIDPrimaryKeyMixin -class ProxyProtocol(str, enum.StrEnum): +class ProxyProtocol(enum.StrEnum): HTTP = "http" HTTPS = "https" SOCKS4 = "socks4" SOCKS5 = "socks5" -class ProxyStatus(str, enum.StrEnum): +class ProxyStatus(enum.StrEnum): UNCHECKED = "unchecked" ACTIVE = "active" DEAD = "dead" -class AnonymityLevel(str, enum.StrEnum): +class AnonymityLevel(enum.StrEnum): TRANSPARENT = "transparent" ANONYMOUS = "anonymous" ELITE = "elite" diff --git a/src/proxy_pool/worker/tasks_validate.py b/src/proxy_pool/worker/tasks_validate.py index a24fd95..cd0eebb 100644 --- a/src/proxy_pool/worker/tasks_validate.py +++ b/src/proxy_pool/worker/tasks_validate.py @@ -68,9 +68,19 @@ async def validate_proxy(ctx: dict, proxy_id: str) -> dict: async with httpx.AsyncClient( timeout=settings.proxy.check_http_timeout, ) as http_client: + # Detect our own IP for transparent proxy detection + real_ip = None + try: + ip_resp = await http_client.get(settings.proxy.judge_headers_url) + ip_data = ip_resp.json() + real_ip = ip_data.get("origin", 
"").split(",")[0].strip() or None + except Exception: + logger.debug("Could not detect real IP for anonymity classification") + context = CheckContext( started_at=datetime.now(), http_client=http_client, + real_ip=real_ip, ) all_results: list[tuple[object, CheckResult]] = [] diff --git a/tests/plugins/test_http_anonymity_checker.py b/tests/plugins/test_http_anonymity_checker.py index 5e12732..3f4687e 100644 --- a/tests/plugins/test_http_anonymity_checker.py +++ b/tests/plugins/test_http_anonymity_checker.py @@ -2,14 +2,17 @@ from datetime import datetime import pytest -from proxy_pool.plugins.builtin.checkers.http_anonymity import HttpAnonymityChecker +from proxy_pool.plugins.builtin.checkers.http_anonymity import ( + HttpAnonymityChecker, + classify_anonymity, +) from proxy_pool.plugins.protocols import CheckContext @pytest.fixture def checker(): return HttpAnonymityChecker( - judge_url="http://httpbin.org/ip", + judge_url="http://httpbin.org/get", timeout=10.0, ) @@ -20,9 +23,139 @@ def context(): started_at=datetime.now(), http_client=None, tcp_latency_ms=50.0, + real_ip="198.51.100.1", ) +class TestClassifyAnonymity: + def test_elite_no_revealing_headers(self): + headers = { + "Host": "httpbin.org", + "Accept": "*/*", + "User-Agent": "python-httpx/0.27", + } + + level, leaked = classify_anonymity( + headers=headers, + origin_ip="203.0.113.1", + proxy_ip="203.0.113.1", + real_ip="198.51.100.1", + ) + + assert level == "elite" + assert leaked == [] + + def test_transparent_real_ip_in_forwarded_for(self): + headers = { + "Host": "httpbin.org", + "X-Forwarded-For": "198.51.100.1", + } + + level, leaked = classify_anonymity( + headers=headers, + origin_ip="203.0.113.1", + proxy_ip="203.0.113.1", + real_ip="198.51.100.1", + ) + + assert level == "transparent" + assert "x-forwarded-for" in leaked + + def test_transparent_real_ip_in_chained_header(self): + headers = { + "X-Forwarded-For": "198.51.100.1, 10.0.0.1", + } + + level, leaked = classify_anonymity( + 
headers=headers, + origin_ip="203.0.113.1", + proxy_ip="203.0.113.1", + real_ip="198.51.100.1", + ) + + assert level == "transparent" + + def test_anonymous_via_header_present(self): + headers = { + "Via": "1.1 proxy.example.com", + } + + level, leaked = classify_anonymity( + headers=headers, + origin_ip="203.0.113.1", + proxy_ip="203.0.113.1", + real_ip="198.51.100.1", + ) + + assert level == "anonymous" + assert "via" in leaked + + def test_anonymous_proxy_headers_without_real_ip(self): + headers = { + "X-Forwarded-For": "10.0.0.1", + "Via": "1.1 squid", + } + + level, leaked = classify_anonymity( + headers=headers, + origin_ip="203.0.113.1", + proxy_ip="203.0.113.1", + real_ip="198.51.100.1", + ) + + assert level == "anonymous" + assert "x-forwarded-for" in leaked + assert "via" in leaked + + def test_multiple_revealing_headers(self): + headers = { + "X-Forwarded-For": "198.51.100.1", + "Via": "1.1 proxy", + "X-Real-Ip": "198.51.100.1", + } + + level, leaked = classify_anonymity( + headers=headers, + origin_ip="203.0.113.1", + proxy_ip="203.0.113.1", + real_ip="198.51.100.1", + ) + + assert level == "transparent" + assert len(leaked) == 3 + + def test_no_real_ip_known_falls_back_to_header_presence(self): + headers = { + "X-Forwarded-For": "10.0.0.1", + } + + level, leaked = classify_anonymity( + headers=headers, + origin_ip="203.0.113.1", + proxy_ip="203.0.113.1", + real_ip=None, + ) + + assert level == "anonymous" + assert "x-forwarded-for" in leaked + + def test_case_insensitive_header_matching(self): + headers = { + "X-FORWARDED-FOR": "198.51.100.1", + "VIA": "1.1 proxy", + } + + level, leaked = classify_anonymity( + headers=headers, + origin_ip="203.0.113.1", + proxy_ip="203.0.113.1", + real_ip="198.51.100.1", + ) + + assert level == "transparent" + assert len(leaked) == 2 + + class TestHttpAnonymityChecker: def test_stage_and_priority(self, checker): assert checker.stage == 2 @@ -30,8 +163,6 @@ class TestHttpAnonymityChecker: def 
test_does_not_skip_any_protocol(self, checker): assert checker.should_skip("http") is False - assert checker.should_skip("https") is False - assert checker.should_skip("socks4") is False assert checker.should_skip("socks5") is False async def test_fails_on_unreachable_proxy(self, checker, context):