feat: add proxy service layer with upsert and query
This commit is contained in:
parent
8532b9ca8d
commit
a38d6a5e36
99
src/proxy_pool/proxy/service.py
Normal file
99
src/proxy_pool/proxy/service.py
Normal file
@ -0,0 +1,99 @@
|
||||
from sqlalchemy import func, select
|
||||
from sqlalchemy.dialects.postgresql import insert
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from proxy_pool.proxy.models import (
|
||||
AnonymityLevel,
|
||||
Proxy,
|
||||
ProxyProtocol,
|
||||
ProxyStatus,
|
||||
)
|
||||
|
||||
|
||||
async def upsert_proxies(
|
||||
db: AsyncSession,
|
||||
proxies: list[dict],
|
||||
) -> tuple[int, int]:
|
||||
"""Upsert discovered proxies. Returns (new_count, updated_count)."""
|
||||
if not proxies:
|
||||
return 0, 0
|
||||
|
||||
stmt = insert(Proxy).values(proxies)
|
||||
stmt = stmt.on_conflict_do_update(
|
||||
index_elements=["ip", "port", "protocol"],
|
||||
set_={"source_id": stmt.excluded.source_id},
|
||||
)
|
||||
|
||||
result = await db.execute(stmt)
|
||||
total = result.rowcount
|
||||
await db.commit()
|
||||
|
||||
# rowcount from on_conflict_do_update counts both inserts and updates
|
||||
# We can't easily distinguish them without a subquery, so return total
|
||||
return total, 0
|
||||
|
||||
|
||||
async def query_proxies(
|
||||
db: AsyncSession,
|
||||
*,
|
||||
status: ProxyStatus | None = None,
|
||||
protocol: ProxyProtocol | None = None,
|
||||
anonymity: AnonymityLevel | None = None,
|
||||
country: str | None = None,
|
||||
min_score: float | None = None,
|
||||
max_latency_ms: float | None = None,
|
||||
min_uptime_pct: float | None = None,
|
||||
verified_within_minutes: int | None = None,
|
||||
sort_by: str = "score",
|
||||
sort_order: str = "desc",
|
||||
limit: int = 50,
|
||||
offset: int = 0,
|
||||
) -> tuple[list[Proxy], int]:
|
||||
"""Query proxies with filtering and sorting. Returns (proxies, total_count)."""
|
||||
query = select(Proxy)
|
||||
|
||||
if status is not None:
|
||||
query = query.where(Proxy.status == status)
|
||||
if protocol is not None:
|
||||
query = query.where(Proxy.protocol == protocol)
|
||||
if anonymity is not None:
|
||||
query = query.where(Proxy.anonymity == anonymity)
|
||||
if country is not None:
|
||||
query = query.where(Proxy.country == country.upper())
|
||||
if min_score is not None:
|
||||
query = query.where(Proxy.score >= min_score)
|
||||
if max_latency_ms is not None:
|
||||
query = query.where(Proxy.avg_latency_ms <= max_latency_ms)
|
||||
if min_uptime_pct is not None:
|
||||
query = query.where(Proxy.uptime_pct >= min_uptime_pct)
|
||||
if verified_within_minutes is not None:
|
||||
query = query.where(
|
||||
Proxy.last_checked_at
|
||||
>= func.now()
|
||||
- func.make_interval(0, 0, 0, 0, 0, verified_within_minutes, 0)
|
||||
)
|
||||
|
||||
# Count before pagination
|
||||
count_query = select(func.count()).select_from(query.subquery())
|
||||
total = (await db.execute(count_query)).scalar_one()
|
||||
|
||||
# Sorting
|
||||
sort_columns = {
|
||||
"score": Proxy.score,
|
||||
"latency": Proxy.avg_latency_ms,
|
||||
"uptime": Proxy.uptime_pct,
|
||||
"last_checked": Proxy.last_checked_at,
|
||||
"created": Proxy.created_at,
|
||||
}
|
||||
sort_col = sort_columns.get(sort_by, Proxy.score)
|
||||
if sort_order == "asc":
|
||||
query = query.order_by(sort_col.asc().nulls_last())
|
||||
else:
|
||||
query = query.order_by(sort_col.desc().nulls_last())
|
||||
|
||||
query = query.offset(offset).limit(limit)
|
||||
|
||||
result = await db.execute(query)
|
||||
proxies = list(result.scalars().all())
|
||||
|
||||
return proxies, total
|
||||
Loading…
x
Reference in New Issue
Block a user