# Origin: commit 67089c570c9d97f3cc792efc4ff4dd69b999ac4a ("feat: add scrape tasks"),
# which created src/proxy_pool/worker/tasks_scrape.py.
from __future__ import annotations

import logging
from datetime import datetime
from uuid import UUID

import httpx
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.asyncio import async_sessionmaker

from proxy_pool.plugins.protocols import Event
from proxy_pool.plugins.registry import PluginRegistry
from proxy_pool.proxy.models import Proxy, ProxySource, ProxyStatus

logger = logging.getLogger(__name__)

# Maximum rows sent in one INSERT. Every row binds five parameters, and a
# single multi-VALUES statement for a very large proxy list can exceed the
# driver/server limit on bind parameters per statement, so upserts are chunked.
_UPSERT_CHUNK_SIZE = 1000


def _build_upsert_rows(discovered, source_id) -> list[dict]:
    """Turn parser output into insert rows, one per (ip, port, protocol)."""
    # PostgreSQL rejects an INSERT ... ON CONFLICT DO UPDATE whose VALUES list
    # contains the same conflict key twice, so duplicates are dropped here.
    unique: dict[tuple, dict] = {}
    for p in discovered:
        unique.setdefault(
            (p.ip, p.port, p.protocol),
            {
                "ip": p.ip,
                "port": p.port,
                "protocol": p.protocol,
                "source_id": source_id,
                "status": ProxyStatus.UNCHECKED,
            },
        )
    return list(unique.values())


async def _upsert_proxies(db, rows: list[dict]) -> None:
    """Upsert ``rows`` into Proxy in bounded chunks (caller commits)."""
    for start in range(0, len(rows), _UPSERT_CHUNK_SIZE):
        stmt = insert(Proxy).values(rows[start : start + _UPSERT_CHUNK_SIZE])
        stmt = stmt.on_conflict_do_update(
            index_elements=["ip", "port", "protocol"],
            # An already-known proxy is re-attributed to the source that
            # listed it most recently; its other columns are left untouched.
            set_={
                "source_id": stmt.excluded.source_id,
            },
        )
        await db.execute(stmt)


async def scrape_source(ctx: dict, source_id: str) -> dict:
    """Scrape a single proxy source and upsert discovered proxies.

    Args:
        ctx: Worker context. Reads ``ctx["session_factory"]`` (async session
            factory), ``ctx["registry"]`` (plugin registry used for parser
            lookup and event emission) and ``ctx["settings"]`` (reads
            ``settings.proxy.scrape_timeout_seconds`` and
            ``settings.proxy.scrape_user_agent``).
        source_id: String form of the ``ProxySource`` primary key (a UUID).

    Returns:
        One of:
        ``{"status": "skipped", "reason": "not_found" | "inactive"}``,
        ``{"status": "error", "reason": ...}`` when the parser is missing or
        the HTTP fetch fails (a ``source.failed`` event is emitted),
        ``{"status": "ok", "new": 0, "total": 0}`` when nothing was parsed, or
        ``{"status": "ok", "total": <number of parsed entries>}`` after a
        successful upsert (a ``proxy.new_batch`` event is emitted).

    Raises:
        ValueError: If ``source_id`` is not a valid UUID string.
    """
    session_factory: async_sessionmaker = ctx["session_factory"]
    registry: PluginRegistry = ctx["registry"]
    settings = ctx["settings"]

    async with session_factory() as db:
        source = await db.get(ProxySource, UUID(source_id))
        if source is None:
            logger.warning("Source %s not found, skipping", source_id)
            return {"status": "skipped", "reason": "not_found"}

        if not source.is_active:
            return {"status": "skipped", "reason": "inactive"}

        # Get the parser
        try:
            parser = registry.get_parser(source.parser_name)
        except Exception:
            # Any lookup failure is reported as "parser not found".
            logger.error(
                "No parser '%s' for source %s",
                source.parser_name,
                source_id,
            )
            await registry.emit(
                Event(
                    type="source.failed",
                    payload={
                        "source_id": source_id,
                        "error": f"Parser '{source.parser_name}' not found",
                    },
                )
            )
            return {"status": "error", "reason": "parser_not_found"}

        # Fetch the URL
        try:
            async with httpx.AsyncClient(
                timeout=settings.proxy.scrape_timeout_seconds,
                headers={"User-Agent": settings.proxy.scrape_user_agent},
            ) as client:
                response = await client.get(str(source.url))
                response.raise_for_status()
                raw = response.content
        except httpx.HTTPError as err:
            logger.warning(
                "Failed to fetch source %s: %s",
                source.url,
                err,
            )
            await registry.emit(
                Event(
                    type="source.failed",
                    payload={"source_id": source_id, "error": str(err)},
                )
            )
            return {"status": "error", "reason": str(err)}

        # Parse
        discovered = await parser.parse(
            raw=raw,
            source_url=str(source.url),
            source_id=source.id,
            default_protocol=source.default_protocol.value,
        )

        if not discovered:
            logger.info("Source %s returned no proxies", source.url)
            return {"status": "ok", "new": 0, "total": 0}

        # Upsert (de-duplicated and chunked; single transaction)
        await _upsert_proxies(db, _build_upsert_rows(discovered, source.id))

        # Update source timestamp
        # NOTE(review): this is a naive local-time datetime; confirm whether
        # ProxySource.last_scraped_at expects a timezone-aware (UTC) value.
        source.last_scraped_at = datetime.now()
        await db.commit()

        # Reported count is the number of parsed entries (duplicates included),
        # matching what was reported before de-duplication was introduced.
        count = len(discovered)
        logger.info(
            "Scraped source %s: %d proxies discovered",
            source.url,
            count,
        )

        # ``discovered`` is non-empty here (the empty case returned above),
        # so the batch event is always emitted.
        await registry.emit(
            Event(
                type="proxy.new_batch",
                payload={
                    "source_id": source_id,
                    "count": count,
                },
            )
        )

        return {"status": "ok", "total": count}


async def scrape_all(ctx: dict) -> dict:
    """Scrape all active sources sequentially.

    Source ids are loaded first and the listing session is closed before any
    scraping starts; each source is then handled by ``scrape_source`` with its
    own session. An unexpected exception from one source is logged and
    recorded as an error result so the remaining sources are still scraped.

    Args:
        ctx: Worker context; see ``scrape_source`` for the keys that are read.

    Returns:
        ``{"sources": <sources attempted>, "total_proxies": <sum of the
        "total" values reported by the individual scrapes>}``.
    """
    session_factory: async_sessionmaker = ctx["session_factory"]

    async with session_factory() as db:
        active = await db.execute(
            select(ProxySource).where(ProxySource.is_active.is_(True))
        )
        source_ids = [str(s.id) for s in active.scalars().all()]

    results = []
    for source_id in source_ids:
        try:
            outcome = await scrape_source(ctx, source_id)
        except Exception as err:
            # Isolate failures: one broken source must not prevent the
            # sources after it from being scraped in this sweep.
            logger.exception(
                "Unhandled error while scraping source %s",
                source_id,
            )
            outcome = {"status": "error", "reason": str(err)}
        results.append({"source_id": source_id, **outcome})

    total = sum(r.get("total", 0) for r in results)
    logger.info(
        "Scrape sweep complete: %d sources, %d total proxies",
        len(results),
        total,
    )
    return {"sources": len(results), "total_proxies": total}