feat: add header classification

This commit is contained in:
agatha 2026-03-15 18:29:04 -04:00
parent f45229ceb2
commit b0b18fc744
7 changed files with 251 additions and 23 deletions

View File

@ -9,7 +9,7 @@ from sqlalchemy.orm import Mapped, mapped_column, relationship
from proxy_pool.db.base import Base, TimestampMixin, UUIDPrimaryKeyMixin
class CreditTxType(str, enum.StrEnum):
class CreditTxType(enum.StrEnum):
PURCHASE = "purchase"
ACQUIRE = "acquire"
REFUND = "refund"

View File

@ -70,6 +70,10 @@ class ProxyPipelineSettings(BaseSettings):
default="http://httpbin.org/ip",
description="URL for determining proxy exit IP",
)
judge_headers_url: str = Field(
default="http://httpbin.org/get",
description="URL that echoes back request headers and origin IP",
)
check_tcp_timeout: float = Field(default=5.0)
check_http_timeout: float = Field(default=10.0)
check_pipeline_timeout: float = Field(default=120.0)

View File

@ -5,6 +5,57 @@ import httpx
from proxy_pool.config import Settings
from proxy_pool.plugins.protocols import CheckContext, CheckResult
# Headers that reveal proxy usage or the client's real IP
REVEALING_HEADERS = {
"x-forwarded-for",
"x-real-ip",
"forwarded",
"via",
"x-proxy-id",
"x-proxy-connection",
"proxy-connection",
"x-forwarded-host",
"x-forwarded-proto",
"x-originating-ip",
"client-ip",
"true-client-ip",
}
def classify_anonymity(
headers: dict[str, str],
origin_ip: str | None,
proxy_ip: str,
real_ip: str | None,
) -> tuple[str, list[str]]:
"""Classify anonymity level from echoed headers.
Returns (anonymity_level, list of leaked header names).
"""
leaked: list[str] = []
reveals_real_ip = False
normalized = {k.lower(): v for k, v in headers.items()}
for header_name in REVEALING_HEADERS:
value = normalized.get(header_name)
if value is None:
continue
leaked.append(header_name)
# Check if any revealing header contains the real IP
if real_ip and real_ip in value:
reveals_real_ip = True
if reveals_real_ip:
return "transparent", leaked
if leaked:
return "anonymous", leaked
return "elite", leaked
class HttpAnonymityChecker:
name = "http_anonymity"
@ -29,11 +80,14 @@ class HttpAnonymityChecker:
from httpx_socks import AsyncProxyTransport
transport = AsyncProxyTransport.from_url(proxy_url)
client_kwargs = {"transport": transport, "timeout": self.timeout}
client_kwargs = {
"transport": transport,
"timeout": self.timeout,
}
except ImportError:
return CheckResult(
passed=False,
detail="httpx-socks not installed, cannot check SOCKS proxies",
detail="httpx-socks not installed",
)
else:
proxy_url = f"http://{proxy_ip}:{proxy_port}"
@ -50,7 +104,7 @@ class HttpAnonymityChecker:
except httpx.TimeoutException:
return CheckResult(
passed=False,
detail=f"HTTP request through proxy timed out after {self.timeout}s",
detail=f"HTTP request timed out after {self.timeout}s",
)
except httpx.ProxyError as err:
return CheckResult(
@ -71,35 +125,63 @@ class HttpAnonymityChecker:
latency = context.elapsed_ms() - (context.tcp_latency_ms or 0)
context.http_latency_ms = latency
# Parse the judge response
try:
data = response.json()
exit_ip = data.get("origin") or data.get("ip")
except Exception:
exit_ip = response.text.strip()
return CheckResult(
passed=True,
detail="Judge returned non-JSON response",
latency_ms=latency,
)
# Extract origin IP
exit_ip = data.get("origin")
if exit_ip:
# httpbin sometimes returns "ip1, ip2" for chained proxies
exit_ip = exit_ip.split(",")[0].strip()
context.exit_ip = exit_ip
if exit_ip and exit_ip != proxy_ip:
context.anonymity_level = "elite"
elif exit_ip and exit_ip == proxy_ip:
context.anonymity_level = "anonymous"
else:
context.anonymity_level = "transparent"
# Extract headers the judge saw
echoed_headers = data.get("headers", {})
# Classify anonymity from the echoed headers. real_ip comes from
# context.real_ip (detected upstream by the worker) and may be None;
# without it we can still tell "anonymous" (proxy headers leaked)
# from "elite" (no leaks), but transparent detection via real-IP
# exposure is only possible when real_ip is known.
anonymity, leaked = classify_anonymity(
headers=echoed_headers,
origin_ip=exit_ip,
proxy_ip=proxy_ip,
real_ip=context.real_ip,
)
context.anonymity_level = anonymity
context.headers_forwarded = leaked
detail_parts = [f"Exit IP: {exit_ip}", f"Anonymity: {anonymity}"]
if leaked:
detail_parts.append(f"Leaked headers: {', '.join(leaked)}")
return CheckResult(
passed=True,
detail=f"Exit IP: {exit_ip}",
detail="; ".join(detail_parts),
latency_ms=latency,
metadata={"exit_ip": exit_ip},
metadata={
"exit_ip": exit_ip,
"anonymity": anonymity,
"leaked_headers": leaked,
"echoed_headers": echoed_headers,
},
)
def should_skip(self, proxy_protocol: str) -> bool:
    """Never skip: this checker supports every proxy protocol."""
    return False
def create_plugin(settings: Settings) -> HttpAnonymityChecker:
    """Plugin factory: build the anonymity checker from app settings."""
    return HttpAnonymityChecker(
        timeout=settings.proxy.check_http_timeout,
        judge_url=settings.proxy.judge_headers_url,
    )

View File

@ -25,6 +25,7 @@ class CheckContext:
anonymity_level: str | None = None
country: str | None = None
headers_forwarded: list[str] = field(default_factory=list)
real_ip: str | None = None
def elapsed_ms(self) -> float:
    """Milliseconds elapsed since this check context was created."""
    delta = datetime.now() - self.started_at
    return delta.total_seconds() * 1000

View File

@ -18,20 +18,20 @@ from sqlalchemy.orm import Mapped, mapped_column, relationship
from proxy_pool.db.base import Base, TimestampMixin, UUIDPrimaryKeyMixin
class ProxyProtocol(str, enum.StrEnum):
class ProxyProtocol(enum.StrEnum):
HTTP = "http"
HTTPS = "https"
SOCKS4 = "socks4"
SOCKS5 = "socks5"
class ProxyStatus(str, enum.StrEnum):
class ProxyStatus(enum.StrEnum):
UNCHECKED = "unchecked"
ACTIVE = "active"
DEAD = "dead"
class AnonymityLevel(str, enum.StrEnum):
class AnonymityLevel(enum.StrEnum):
TRANSPARENT = "transparent"
ANONYMOUS = "anonymous"
ELITE = "elite"

View File

@ -68,9 +68,19 @@ async def validate_proxy(ctx: dict, proxy_id: str) -> dict:
async with httpx.AsyncClient(
timeout=settings.proxy.check_http_timeout,
) as http_client:
# Detect our own IP for transparent proxy detection
real_ip = None
try:
ip_resp = await http_client.get(settings.proxy.judge_headers_url)
ip_data = ip_resp.json()
real_ip = ip_data.get("origin", "").split(",")[0].strip() or None
except Exception:
logger.debug("Could not detect real IP for anonymity classification")
context = CheckContext(
started_at=datetime.now(),
http_client=http_client,
real_ip=real_ip,
)
all_results: list[tuple[object, CheckResult]] = []

View File

@ -2,14 +2,17 @@ from datetime import datetime
import pytest
from proxy_pool.plugins.builtin.checkers.http_anonymity import HttpAnonymityChecker
from proxy_pool.plugins.builtin.checkers.http_anonymity import (
HttpAnonymityChecker,
classify_anonymity,
)
from proxy_pool.plugins.protocols import CheckContext
@pytest.fixture
def checker():
return HttpAnonymityChecker(
judge_url="http://httpbin.org/ip",
judge_url="http://httpbin.org/get",
timeout=10.0,
)
@ -20,9 +23,139 @@ def context():
started_at=datetime.now(),
http_client=None,
tcp_latency_ms=50.0,
real_ip="198.51.100.1",
)
class TestClassifyAnonymity:
    """Unit tests for the classify_anonymity() helper.

    Fixture convention: 198.51.100.1 plays the client's real IP and
    203.0.113.1 the proxy's exit IP (both documentation ranges).
    """

    def test_elite_no_revealing_headers(self):
        """Only ordinary request headers echoed -> classified elite."""
        headers = {
            "Host": "httpbin.org",
            "Accept": "*/*",
            "User-Agent": "python-httpx/0.27",
        }
        level, leaked = classify_anonymity(
            headers=headers,
            origin_ip="203.0.113.1",
            proxy_ip="203.0.113.1",
            real_ip="198.51.100.1",
        )
        assert level == "elite"
        assert leaked == []

    def test_transparent_real_ip_in_forwarded_for(self):
        """Real IP forwarded verbatim -> classified transparent."""
        headers = {
            "Host": "httpbin.org",
            "X-Forwarded-For": "198.51.100.1",
        }
        level, leaked = classify_anonymity(
            headers=headers,
            origin_ip="203.0.113.1",
            proxy_ip="203.0.113.1",
            real_ip="198.51.100.1",
        )
        assert level == "transparent"
        assert "x-forwarded-for" in leaked

    def test_transparent_real_ip_in_chained_header(self):
        """Real IP buried in a comma-separated proxy chain still counts."""
        headers = {
            "X-Forwarded-For": "198.51.100.1, 10.0.0.1",
        }
        level, leaked = classify_anonymity(
            headers=headers,
            origin_ip="203.0.113.1",
            proxy_ip="203.0.113.1",
            real_ip="198.51.100.1",
        )
        assert level == "transparent"

    def test_anonymous_via_header_present(self):
        """Via reveals proxying but not the real IP -> anonymous."""
        headers = {
            "Via": "1.1 proxy.example.com",
        }
        level, leaked = classify_anonymity(
            headers=headers,
            origin_ip="203.0.113.1",
            proxy_ip="203.0.113.1",
            real_ip="198.51.100.1",
        )
        assert level == "anonymous"
        assert "via" in leaked

    def test_anonymous_proxy_headers_without_real_ip(self):
        """Revealing headers carrying only internal IPs -> anonymous."""
        headers = {
            "X-Forwarded-For": "10.0.0.1",
            "Via": "1.1 squid",
        }
        level, leaked = classify_anonymity(
            headers=headers,
            origin_ip="203.0.113.1",
            proxy_ip="203.0.113.1",
            real_ip="198.51.100.1",
        )
        assert level == "anonymous"
        assert "x-forwarded-for" in leaked
        assert "via" in leaked

    def test_multiple_revealing_headers(self):
        """All leaking headers are reported, and real-IP leak wins."""
        headers = {
            "X-Forwarded-For": "198.51.100.1",
            "Via": "1.1 proxy",
            "X-Real-Ip": "198.51.100.1",
        }
        level, leaked = classify_anonymity(
            headers=headers,
            origin_ip="203.0.113.1",
            proxy_ip="203.0.113.1",
            real_ip="198.51.100.1",
        )
        assert level == "transparent"
        assert len(leaked) == 3

    def test_no_real_ip_known_falls_back_to_header_presence(self):
        """With real_ip=None, header presence alone yields anonymous."""
        headers = {
            "X-Forwarded-For": "10.0.0.1",
        }
        level, leaked = classify_anonymity(
            headers=headers,
            origin_ip="203.0.113.1",
            proxy_ip="203.0.113.1",
            real_ip=None,
        )
        assert level == "anonymous"
        assert "x-forwarded-for" in leaked

    def test_case_insensitive_header_matching(self):
        """Header names are normalized to lower case before matching."""
        headers = {
            "X-FORWARDED-FOR": "198.51.100.1",
            "VIA": "1.1 proxy",
        }
        level, leaked = classify_anonymity(
            headers=headers,
            origin_ip="203.0.113.1",
            proxy_ip="203.0.113.1",
            real_ip="198.51.100.1",
        )
        assert level == "transparent"
        assert len(leaked) == 2
class TestHttpAnonymityChecker:
def test_stage_and_priority(self, checker):
assert checker.stage == 2
@ -30,8 +163,6 @@ class TestHttpAnonymityChecker:
def test_does_not_skip_any_protocol(self, checker):
assert checker.should_skip("http") is False
assert checker.should_skip("https") is False
assert checker.should_skip("socks4") is False
assert checker.should_skip("socks5") is False
async def test_fails_on_unreachable_proxy(self, checker, context):