@router.post("/{source_id}/scrape")
async def trigger_scrape(
    source_id: UUID,
    db: AsyncSession = Depends(get_db),
    registry: PluginRegistry = Depends(get_registry),
):
    """Fetch a proxy source's URL, parse it, and upsert the discovered proxies.

    Looks up the source, resolves its parser from the plugin registry,
    downloads the source URL, parses the payload into proxy records, and
    bulk-upserts them (conflict key: ip/port/protocol). Finally stamps
    ``last_scraped_at`` on the source and commits.

    Returns:
        A summary dict: the source id and the number of proxies the parser
        discovered (count is before de-duplication).

    Raises:
        HTTPException 404: the source id does not exist.
        HTTPException 422: the source's parser is not registered.
        HTTPException 502: the upstream fetch failed.
    """
    source = await db.get(ProxySource, source_id)
    if source is None:
        # No active exception here, so a ``from None`` suffix would be a
        # no-op — plain raise.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Source not found",
        )

    try:
        parser = registry.get_parser(source.parser_name)
    except Exception:
        # NOTE(review): the registry's lookup-failure exception type isn't
        # visible from this file; narrow this clause once
        # PluginRegistry.get_parser's contract is confirmed.
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"Parser '{source.parser_name}' not registered",
        ) from None

    settings = get_settings()

    try:
        async with httpx.AsyncClient(
            timeout=settings.proxy.scrape_timeout_seconds,
            headers={"User-Agent": settings.proxy.scrape_user_agent},
        ) as client:
            response = await client.get(str(source.url))
            response.raise_for_status()
    except httpx.HTTPError as err:
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail=f"Failed to fetch source: {err}",
        ) from None

    discovered = await parser.parse(
        raw=response.content,
        source_url=str(source.url),
        source_id=source.id,
        default_protocol=source.default_protocol.value,
    )

    if discovered:
        from sqlalchemy.dialects.postgresql import insert as pg_insert

        # ``Proxy`` is already imported at module level; only the status
        # enum needs a local import (kept local to match the original's
        # deferred-import style).
        from proxy_pool.proxy.models import ProxyStatus

        # De-duplicate on the conflict key before the upsert: PostgreSQL
        # rejects an INSERT ... ON CONFLICT DO UPDATE that would touch the
        # same row twice, and scraped lists often repeat a proxy. Last
        # occurrence wins.
        unique_rows = {}
        for p in discovered:
            unique_rows[(p.ip, p.port, p.protocol)] = {
                "ip": p.ip,
                "port": p.port,
                "protocol": p.protocol,
                "source_id": source.id,
                "status": ProxyStatus.UNCHECKED,
            }

        stmt = pg_insert(Proxy).values(list(unique_rows.values()))
        stmt = stmt.on_conflict_do_update(
            index_elements=["ip", "port", "protocol"],
            # On re-discovery, re-attribute the existing proxy row to this
            # source; all other columns are left untouched.
            set_={"source_id": stmt.excluded.source_id},
        )
        await db.execute(stmt)

    # NOTE(review): naive local-time stamp; if ``last_scraped_at`` is a
    # timezone-aware column, this should be datetime.now(timezone.utc) —
    # confirm against the Proxy/ProxySource model definitions.
    source.last_scraped_at = datetime.now()
    await db.commit()

    return {
        "source_id": str(source.id),
        "proxies_discovered": len(discovered),
    }