feat: add scrape tasks

agatha 2026-03-15 15:23:13 -04:00
parent b02793ae9b
commit 67089c570c

@@ -0,0 +1,157 @@
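"""Scrape tasks: fetch configured proxy sources, parse them, and upsert discovered proxies."""
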
from __future__ import annotations

import logging
from datetime import datetime
from uuid import UUID

import httpx
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.asyncio import async_sessionmaker

from proxy_pool.plugins.protocols import Event
from proxy_pool.plugins.registry import PluginRegistry
from proxy_pool.proxy.models import Proxy, ProxySource, ProxyStatus

logger = logging.getLogger(__name__)


async def scrape_source(ctx: dict, source_id: str) -> dict:
    """Scrape a single proxy source and upsert discovered proxies."""
    session_factory: async_sessionmaker = ctx["session_factory"]
    registry: PluginRegistry = ctx["registry"]
    settings = ctx["settings"]

    async with session_factory() as db:
        source = await db.get(ProxySource, UUID(source_id))
        if source is None:
            logger.warning("Source %s not found, skipping", source_id)
            return {"status": "skipped", "reason": "not_found"}
        if not source.is_active:
            return {"status": "skipped", "reason": "inactive"}
        # Get the parser
        try:
            parser = registry.get_parser(source.parser_name)
        except Exception:
            logger.error(
                "No parser '%s' for source %s",
                source.parser_name,
                source_id,
            )
            await registry.emit(
                Event(
                    type="source.failed",
                    payload={
                        "source_id": source_id,
                        "error": f"Parser '{source.parser_name}' not found",
                    },
                )
            )
            return {"status": "error", "reason": "parser_not_found"}
        # Fetch the URL
        try:
            async with httpx.AsyncClient(
                timeout=settings.proxy.scrape_timeout_seconds,
                headers={"User-Agent": settings.proxy.scrape_user_agent},
            ) as client:
                response = await client.get(str(source.url))
                response.raise_for_status()
                raw = response.content
        except httpx.HTTPError as err:
            logger.warning(
                "Failed to fetch source %s: %s",
                source.url,
                err,
            )
            await registry.emit(
                Event(
                    type="source.failed",
                    payload={"source_id": source_id, "error": str(err)},
                )
            )
            return {"status": "error", "reason": str(err)}
        # Parse
        discovered = await parser.parse(
            raw=raw,
            source_url=str(source.url),
            source_id=source.id,
            default_protocol=source.default_protocol.value,
        )
        if not discovered:
            logger.info("Source %s returned no proxies", source.url)
            return {"status": "ok", "new": 0, "total": 0}
        # Upsert
        values = [
            {
                "ip": p.ip,
                "port": p.port,
                "protocol": p.protocol,
                "source_id": source.id,
                "status": ProxyStatus.UNCHECKED,
            }
            for p in discovered
        ]
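        # Single bulk INSERT ... ON CONFLICT keyed on the (ip, port, protocol)
        # unique constraint: rows that already exist only get source_id
        # refreshed, so their current status is left untouched.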
        stmt = insert(Proxy).values(values)
        stmt = stmt.on_conflict_do_update(
            index_elements=["ip", "port", "protocol"],
            set_={
                "source_id": stmt.excluded.source_id,
            },
        )
        await db.execute(stmt)
        # Update source timestamp
        source.last_scraped_at = datetime.now()
        await db.commit()

        count = len(discovered)
        logger.info(
            "Scraped source %s: %d proxies discovered",
            source.url,
            count,
        )
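        # Announce the batch so event listeners can react to the new proxies.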
        if count > 0:
            await registry.emit(
                Event(
                    type="proxy.new_batch",
                    payload={
                        "source_id": source_id,
                        "count": count,
                    },
                )
            )
        return {"status": "ok", "total": count}


async def scrape_all(ctx: dict) -> dict:
    """Scrape all active sources sequentially."""
    session_factory: async_sessionmaker = ctx["session_factory"]

    async with session_factory() as db:
        result = await db.execute(
            select(ProxySource).where(ProxySource.is_active.is_(True))
        )
        sources = result.scalars().all()
        source_ids = [str(s.id) for s in sources]
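    # The listing session is closed before the loop; scrape_source opens its
    # own session per source, so a long sweep never holds one session open.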
    results = []
    for source_id in source_ids:
        result = await scrape_source(ctx, source_id)
        results.append({"source_id": source_id, **result})

    total = sum(r.get("total", 0) for r in results)
    logger.info(
        "Scrape sweep complete: %d sources, %d total proxies",
        len(results),
        total,
    )
    return {"sources": len(results), "total_proxies": total}