feat: add plaintext source parser plugin

This commit is contained in:
agatha 2026-03-14 15:30:54 -04:00
parent ce39232c08
commit ce918b8fc2

View File

@ -0,0 +1,47 @@
from __future__ import annotations
import re
from typing import Any
from proxy_pool.config import Settings
from proxy_pool.plugins.protocols import DiscoveredProxy
_LINE_RE = re.compile(r"^(\d{1,3}(?:\.\d{1,3}){3}):(\d{2,5})$")
class PlaintextParser:
name = "plaintext"
def supports(self, url: str) -> bool:
return url.endswith(".txt") or "plain" in url.lower()
async def parse(
self,
raw: bytes,
source_url: str,
source_id: Any,
default_protocol: str,
) -> list[DiscoveredProxy]:
results: list[DiscoveredProxy] = []
text = raw.decode("utf-8", errors="ignore")
for line in text.splitlines():
match = _LINE_RE.match(line.strip())
if match:
results.append(
DiscoveredProxy(
ip=match.group(1),
port=int(match.group(2)),
protocol=default_protocol,
source_id=source_id,
)
)
return results
def default_schedule(self) -> str | None:
return "*/30 * * * *"
def create_plugin(settings: Settings) -> PlaintextParser:
return PlaintextParser()