From b02793ae9ba61c4d292a89b639226e4ef78ba479 Mon Sep 17 00:00:00 2001 From: agatha Date: Sun, 15 Mar 2026 15:22:36 -0400 Subject: [PATCH] feat: add ARQ worker lifespan context --- src/proxy_pool/worker/context.py | 48 ++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 src/proxy_pool/worker/context.py diff --git a/src/proxy_pool/worker/context.py b/src/proxy_pool/worker/context.py new file mode 100644 index 0000000..8dfd9ab --- /dev/null +++ b/src/proxy_pool/worker/context.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +import logging + +from redis.asyncio import from_url as redis_from_url + +from proxy_pool.config import get_settings +from proxy_pool.db.session import create_session_factory +from proxy_pool.plugins.discovery import discover_plugins +from proxy_pool.plugins.registry import PluginRegistry + +logger = logging.getLogger(__name__) + + +async def startup(ctx: dict) -> None: + """Called once when the ARQ worker boots. + + Populates ctx with shared resources that every task can access. + """ + settings = get_settings() + + ctx["settings"] = settings + ctx["session_factory"] = create_session_factory(settings) + ctx["redis"] = redis_from_url(settings.redis.url, decode_responses=True) + + registry = PluginRegistry() + discover_plugins("proxy_pool.plugins.builtin.parsers", registry, settings) + discover_plugins("proxy_pool.plugins.builtin.checkers", registry, settings) + discover_plugins("proxy_pool.plugins.builtin.notifiers", registry, settings) + ctx["registry"] = registry + + logger.info( + "Worker started: %d parsers, %d checkers, %d notifiers", + len(registry.parsers), + len(registry.checkers), + len(registry.notifiers), + ) + + +async def shutdown(ctx: dict) -> None: + """Called once when the ARQ worker shuts down.""" + if "redis" in ctx: + await ctx["redis"].close() + if "session_factory" in ctx: + engine = ctx["session_factory"].kw["bind"] + await engine.dispose() + + logger.info("Worker shut down")