# Reconstructed from a git patch whose line breaks had been collapsed.
# Patch metadata, kept for provenance:
#   From a38d6a5e366daee4b6f123d2fe2e82a5887e6c34 Mon Sep 17 00:00:00 2001
#   From: agatha
#   Date: Sat, 14 Mar 2026 16:15:49 -0400
#   Subject: [PATCH] feat: add proxy service layer with upsert and query
#   Target: src/proxy_pool/proxy/service.py (new file)
"""Proxy service layer: bulk upsert of discovered proxies and filtered queries."""

from sqlalchemy import func, select
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.asyncio import AsyncSession

from proxy_pool.proxy.models import (
    AnonymityLevel,
    Proxy,
    ProxyProtocol,
    ProxyStatus,
)

# Conflict target of the upsert; also used to de-duplicate a batch before
# it is sent to the database.
_CONFLICT_KEYS = ("ip", "port", "protocol")


def _dedupe_by_conflict_key(rows: list[dict]) -> list[dict]:
    """Collapse rows that share a conflict key, keeping the last occurrence.

    PostgreSQL rejects an ``INSERT ... ON CONFLICT DO UPDATE`` that would
    touch the same row twice, so duplicates inside one batch must be removed
    up front. Rows that do not carry every conflict key (or carry an
    unhashable value) are passed through untouched.
    """
    unique: dict[tuple, dict] = {}
    passthrough: list[dict] = []
    for row in rows:
        if not all(key in row for key in _CONFLICT_KEYS):
            passthrough.append(row)
            continue
        try:
            # Re-assigning an existing key keeps "last one wins" semantics,
            # matching what sequential single-row upserts would produce.
            unique[tuple(row[key] for key in _CONFLICT_KEYS)] = row
        except TypeError:
            passthrough.append(row)
    return [*unique.values(), *passthrough]


async def upsert_proxies(
    db: AsyncSession,
    proxies: list[dict],
) -> tuple[int, int]:
    """Insert discovered proxies, updating ``source_id`` on conflict.

    Rows conflicting on ``(ip, port, protocol)`` have their ``source_id``
    overwritten with the incoming value. The statement is executed and
    committed on ``db``.

    Args:
        db: Session used to execute and commit the statement.
        proxies: Row dicts passed to ``insert(Proxy).values(...)``.
            NOTE(review): assumed to be keyed by column name, including
            ``ip``, ``port`` and ``protocol`` -- confirm against callers.

    Returns:
        ``(affected, 0)``. ``affected`` is the statement's ``rowcount``,
        which covers inserted and updated rows together; the two are not
        distinguished, so the second element is always ``0``. An empty
        ``proxies`` list returns ``(0, 0)`` without touching the database.
    """
    if not proxies:
        return 0, 0

    rows = _dedupe_by_conflict_key(proxies)

    stmt = insert(Proxy).values(rows)
    stmt = stmt.on_conflict_do_update(
        index_elements=list(_CONFLICT_KEYS),
        set_={"source_id": stmt.excluded.source_id},
    )

    result = await db.execute(stmt)
    total = result.rowcount
    await db.commit()

    # rowcount from on_conflict_do_update counts both inserts and updates.
    # We can't easily distinguish them without a subquery, so return total.
    return total, 0


async def query_proxies(
    db: AsyncSession,
    *,
    status: ProxyStatus | None = None,
    protocol: ProxyProtocol | None = None,
    anonymity: AnonymityLevel | None = None,
    country: str | None = None,
    min_score: float | None = None,
    max_latency_ms: float | None = None,
    min_uptime_pct: float | None = None,
    verified_within_minutes: int | None = None,
    sort_by: str = "score",
    sort_order: str = "desc",
    limit: int = 50,
    offset: int = 0,
) -> tuple[list[Proxy], int]:
    """Query proxies with filtering, sorting and pagination.

    Every filter is optional; ``None`` means "do not filter on this field".
    Filters are combined with AND.

    Args:
        db: Session used to run the count and the page query.
        status: Exact match on ``Proxy.status``.
        protocol: Exact match on ``Proxy.protocol``.
        anonymity: Exact match on ``Proxy.anonymity``.
        country: Matched against ``Proxy.country`` after upper-casing.
        min_score: Lower bound (inclusive) on ``Proxy.score``.
        max_latency_ms: Upper bound (inclusive) on ``Proxy.avg_latency_ms``.
        min_uptime_pct: Lower bound (inclusive) on ``Proxy.uptime_pct``.
        verified_within_minutes: Keep proxies whose ``last_checked_at`` is
            no older than this many minutes, measured by the database clock.
        sort_by: One of ``score``, ``latency``, ``uptime``, ``last_checked``,
            ``created``. Unknown values fall back to ``score``.
        sort_order: ``"asc"`` (case-insensitive) for ascending; anything
            else sorts descending. NULLs are placed last either way.
        limit: Maximum number of rows to return.
        offset: Number of rows to skip.

    Returns:
        ``(proxies, total_count)`` where ``total_count`` is the number of
        rows matching the filters before pagination.
    """
    conditions = []

    if status is not None:
        conditions.append(Proxy.status == status)
    if protocol is not None:
        conditions.append(Proxy.protocol == protocol)
    if anonymity is not None:
        conditions.append(Proxy.anonymity == anonymity)
    if country is not None:
        conditions.append(Proxy.country == country.upper())
    if min_score is not None:
        conditions.append(Proxy.score >= min_score)
    if max_latency_ms is not None:
        conditions.append(Proxy.avg_latency_ms <= max_latency_ms)
    if min_uptime_pct is not None:
        conditions.append(Proxy.uptime_pct >= min_uptime_pct)
    if verified_within_minutes is not None:
        # make_interval(years, months, weeks, days, hours, mins, secs)
        cutoff = func.now() - func.make_interval(
            0, 0, 0, 0, 0, verified_within_minutes, 0
        )
        conditions.append(Proxy.last_checked_at >= cutoff)

    query = select(Proxy)
    if conditions:
        query = query.where(*conditions)

    # Count before ordering and pagination are applied.
    count_query = select(func.count()).select_from(query.subquery())
    total = (await db.execute(count_query)).scalar_one()

    # Sorting
    sort_columns = {
        "score": Proxy.score,
        "latency": Proxy.avg_latency_ms,
        "uptime": Proxy.uptime_pct,
        "last_checked": Proxy.last_checked_at,
        "created": Proxy.created_at,
    }
    sort_col = sort_columns.get(sort_by, Proxy.score)
    ascending = (sort_order or "").lower() == "asc"
    primary = sort_col.asc() if ascending else sort_col.desc()

    # The sort columns are not unique, so without a tie-breaker the database
    # may order equal rows differently between requests and OFFSET/LIMIT
    # pages could repeat or skip rows. The primary key makes the order total.
    tie_breakers = tuple(Proxy.__table__.primary_key.columns)
    query = query.order_by(primary.nulls_last(), *tie_breakers)

    query = query.offset(offset).limit(limit)

    result = await db.execute(query)
    proxies = list(result.scalars().all())

    return proxies, total