feat: add TCP connect checker plugin

This commit is contained in:
agatha 2026-03-14 15:31:56 -04:00
parent ce918b8fc2
commit 585fc260b0

View File

@ -0,0 +1,58 @@
from __future__ import annotations
import asyncio
from proxy_pool.config import Settings
from proxy_pool.plugins.protocols import CheckContext, CheckResult
class TcpConnectChecker:
name = "tcp_connect"
stage = 1
priority = 0
timeout = 5.0
def __init__(self, timeout: float = 5.0) -> None:
self.timeout = timeout
async def check(
self,
proxy_ip: str,
proxy_port: int,
proxy_protocol: str,
context: CheckContext,
) -> CheckResult:
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(proxy_ip, proxy_port),
timeout=self.timeout,
)
latency = context.elapsed_ms()
context.tcp_latency_ms = latency
writer.close()
await writer.wait_closed()
return CheckResult(
passed=True,
detail="TCP connect OK",
latency_ms=latency,
)
except TimeoutError:
return CheckResult(
passed=False,
detail=f"TCP connect timed out after {self.timeout}s",
)
except OSError as err:
return CheckResult(
passed=False,
detail=f"TCP connect failed: {err}",
)
def should_skip(self, proxy_protocol: str) -> bool:
return False
def create_plugin(settings: Settings) -> TcpConnectChecker:
return TcpConnectChecker(
timeout=settings.proxy.check_tcp_timeout,
)