From 7e769872a7197f5ab9fb50d2f3312c6ab2913b75 Mon Sep 17 00:00:00 2001 From: agatha Date: Sat, 14 Mar 2026 15:05:13 -0400 Subject: [PATCH] feat: implement plugin registry with validation and event bus --- src/proxy_pool/plugins/protocols.py | 2 +- src/proxy_pool/plugins/registry.py | 123 ++++++++++++++++++++++++++++ 2 files changed, 124 insertions(+), 1 deletion(-) create mode 100644 src/proxy_pool/plugins/registry.py diff --git a/src/proxy_pool/plugins/protocols.py b/src/proxy_pool/plugins/protocols.py index 364c874..2f3ccf1 100644 --- a/src/proxy_pool/plugins/protocols.py +++ b/src/proxy_pool/plugins/protocols.py @@ -83,7 +83,7 @@ class ProxyChecker(Protocol): @runtime_checkable class Notifier(Protocol): name: str - subscribe_to: list[str] + subscribes_to: list[str] async def notify(self, event: Event) -> None: ... diff --git a/src/proxy_pool/plugins/registry.py b/src/proxy_pool/plugins/registry.py new file mode 100644 index 0000000..75b882e --- /dev/null +++ b/src/proxy_pool/plugins/registry.py @@ -0,0 +1,123 @@ +from __future__ import annotations + +import asyncio +import fnmatch +import logging + +from proxy_pool.plugins.protocols import ( + Event, + Notifier, + ProxyChecker, + SourceParser, +) + +logger = logging.getLogger(__name__) + + +class PluginError(Exception): + pass + + +class PluginConflictError(PluginError): + pass + + +class PluginValidationError(PluginError): + pass + + +class PluginNotFoundError(PluginError): + pass + + +class PluginRegistry: + def __init__(self) -> None: + self._parsers: dict[str, SourceParser] = {} + self._checkers: list[ProxyChecker] = [] + self._notifiers: list[Notifier] = [] + self._event_subs: dict[str, list[Notifier]] = {} + + def register_parser(self, plugin: SourceParser) -> None: + if not isinstance(plugin, SourceParser): + msg = f"{type(plugin).__name__} does not satisfy SourceParser protocol" + raise PluginValidationError(msg) + if plugin.name in self._parsers: + msg = f"Parser '{plugin.name}' already registered" + raise PluginConflictError(msg) + self._parsers[plugin.name] = plugin + logger.info("Registered parser: %s", plugin.name) + + def register_checker(self, plugin: ProxyChecker) -> None: + if not isinstance(plugin, ProxyChecker): + msg = f"{type(plugin).__name__} does not satisfy ProxyChecker protocol" + raise PluginValidationError(msg) + self._checkers.append(plugin) + self._checkers.sort(key=lambda c: (c.stage, c.priority)) + logger.info( + "Registered checker: %s (stage=%d, priority=%d)", + plugin.name, + plugin.stage, + plugin.priority, + ) + + def register_notifier(self, plugin: Notifier) -> None: + if not isinstance(plugin, Notifier): + msg = f"{type(plugin).__name__} does not satisfy Notifier protocol" + raise PluginValidationError(msg) + self._notifiers.append(plugin) + for pattern in plugin.subscribes_to: + self._event_subs.setdefault(pattern, []).append(plugin) + logger.info( + "Registered notifier: %s (subscribes_to=%s)", + plugin.name, + plugin.subscribes_to, + ) + + def get_parser(self, name: str) -> SourceParser: + try: + return self._parsers[name] + except KeyError as err: + raise PluginNotFoundError( + f"No parser registered with name '{name}'" + ) from err + + def get_parser_for_url(self, url: str) -> SourceParser: + return next((p for p in self._parsers.values() if p.supports(url)), None) + + def get_checker_pipeline(self) -> list[ProxyChecker]: + return list(self._checkers) + + @property + def parsers(self) -> dict[str, SourceParser]: + return dict(self._parsers) + + @property + def checkers(self) -> list[ProxyChecker]: + return list(self._checkers) + + @property + def notifiers(self) -> list[Notifier]: + return list(self._notifiers) + + async def emit(self, event: Event) -> None: + notified: set[str] = set() + for pattern, subscribers in self._event_subs.items(): + if fnmatch.fnmatch(event.type, pattern): + for notifier in subscribers: + if notifier.name in notified: + continue + notified.add(notifier.name) + asyncio.create_task( + self._safe_notify(notifier, event), + ) + + @staticmethod + async def _safe_notify(notifier: Notifier, event: Event) -> None: + try: + await notifier.notify(event) + except Exception: + logger.exception( + "Notifier '%s' failed handling event '%s'", + notifier.name, + event.type, + )