diff --git a/src/proxy_pool/worker/context.py b/src/proxy_pool/worker/context.py new file mode 100644 index 0000000..8dfd9ab --- /dev/null +++ b/src/proxy_pool/worker/context.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +import logging + +from redis.asyncio import from_url as redis_from_url + +from proxy_pool.config import get_settings +from proxy_pool.db.session import create_session_factory +from proxy_pool.plugins.discovery import discover_plugins +from proxy_pool.plugins.registry import PluginRegistry + +logger = logging.getLogger(__name__) + + +async def startup(ctx: dict) -> None: + """Called once when the ARQ worker boots. + + Populates ctx with shared resources that every task can access. + """ + settings = get_settings() + + ctx["settings"] = settings + ctx["session_factory"] = create_session_factory(settings) + ctx["redis"] = redis_from_url(settings.redis.url, decode_responses=True) + + registry = PluginRegistry() + discover_plugins("proxy_pool.plugins.builtin.parsers", registry, settings) + discover_plugins("proxy_pool.plugins.builtin.checkers", registry, settings) + discover_plugins("proxy_pool.plugins.builtin.notifiers", registry, settings) + ctx["registry"] = registry + + logger.info( + "Worker started: %d parsers, %d checkers, %d notifiers", + len(registry.parsers), + len(registry.checkers), + len(registry.notifiers), + ) + + +async def shutdown(ctx: dict) -> None: + """Called once when the ARQ worker shuts down.""" + if "redis" in ctx: + await ctx["redis"].close() + if "session_factory" in ctx: + engine = ctx["session_factory"].kw["bind"] + await engine.dispose() + + logger.info("Worker shut down")