from __future__ import annotations import asyncio import fnmatch import logging from proxy_pool.plugins.protocols import ( Event, Notifier, ProxyChecker, SourceParser, ) logger = logging.getLogger(__name__) class PluginError(Exception): pass class PluginConflictError(PluginError): pass class PluginValidationError(PluginError): pass class PluginNotFoundError(PluginError): pass class PluginRegistry: def __init__(self) -> None: self._parsers: dict[str, SourceParser] = {} self._checkers: list[ProxyChecker] = [] self._notifiers: list[Notifier] = [] self._event_subs: dict[str, list[Notifier]] = {} def register_parser(self, plugin: SourceParser) -> None: if not isinstance(plugin, SourceParser): msg = f"{type(plugin).__name__} does not satisfy SourceParser protocol" raise PluginValidationError(msg) if plugin.name in self._parsers: msg = f"Parser '{plugin.name}' already registered" raise PluginConflictError(msg) self._parsers[plugin.name] = plugin logger.info("Registered parser: %s", plugin.name) def register_checker(self, plugin: ProxyChecker) -> None: if not isinstance(plugin, ProxyChecker): msg = f"{type(plugin).__name__} does not satisfy ProxyChecker protocol" raise PluginValidationError(msg) self._checkers.append(plugin) self._checkers.sort(key=lambda c: (c.stage, c.priority)) logger.info( "Registered checker: %s (stage=%d, priority=%d)", plugin.name, plugin.stage, plugin.priority, ) def register_notifier(self, plugin: Notifier) -> None: if not isinstance(plugin, Notifier): msg = f"{type(plugin).__name__} does not satisfy Notifier protocol" raise PluginValidationError(msg) self._notifiers.append(plugin) for pattern in plugin.subscribes_to: self._event_subs.setdefault(pattern, []).append(plugin) logger.info( "Registered notifier: %s (subscribes_to=%s)", plugin.name, plugin.subscribes_to, ) def get_parser(self, name: str) -> SourceParser: try: return self._parsers[name] except KeyError as err: raise PluginNotFoundError( f"No parser registered with name '{name}'" ) from err def get_parser_for_url(self, url: str) -> SourceParser: return next((p for p in self._parsers.values() if p.supports(url)), None) def get_checker_pipeline(self) -> list[ProxyChecker]: return list(self._checkers) @property def parsers(self) -> dict[str, SourceParser]: return dict(self._parsers) @property def checkers(self) -> list[ProxyChecker]: return list(self._checkers) @property def notifiers(self) -> list[Notifier]: return list(self._notifiers) async def emit(self, event: Event) -> None: notified: set[str] = set() for pattern, subscribers in self._event_subs.items(): if fnmatch.fnmatch(event.type, pattern): for notifier in subscribers: if notifier.name in notified: continue notified.add(notifier.name) asyncio.create_task( self._safe_notify(notifier, event), ) @staticmethod async def _safe_notify(notifier: Notifier, event: Event) -> None: try: await notifier.notify(event) except Exception: logger.exception( "Notifier '%s' failed handling event '%s'", notifier.name, event.type, )