feat: add proxy acquire and release endpoints

This commit is contained in:
agatha 2026-03-14 16:43:02 -04:00
parent 27cfa144db
commit 6f8658c08f
2 changed files with 188 additions and 0 deletions

View File

@ -10,6 +10,7 @@ from proxy_pool.config import get_settings
from proxy_pool.db.session import create_session_factory
from proxy_pool.plugins.discovery import discover_plugins
from proxy_pool.plugins.registry import PluginRegistry
from proxy_pool.proxy.acquire import router as acquire_router
from proxy_pool.proxy.router import proxy_router
from proxy_pool.proxy.router import router as source_router
@ -64,6 +65,7 @@ def create_app() -> FastAPI:
app.include_router(health_router)
app.include_router(source_router)
app.include_router(proxy_router)
app.include_router(acquire_router)
app.include_router(auth_router)
app.include_router(account_router)

View File

@ -0,0 +1,186 @@
from __future__ import annotations
from datetime import datetime, timedelta
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, Field
from redis.asyncio import Redis
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from proxy_pool.accounts.auth import get_current_user
from proxy_pool.accounts.models import CreditTxType, ProxyLease, User
from proxy_pool.accounts.service import debit_credits, get_credit_balance
from proxy_pool.common.dependencies import get_db, get_redis
from proxy_pool.config import get_settings
from proxy_pool.proxy.models import (
AnonymityLevel,
Proxy,
ProxyProtocol,
ProxyStatus,
)
from proxy_pool.proxy.schemas import ProxyResponse
router = APIRouter(prefix="/proxies", tags=["proxies"])
class AcquireRequest(BaseModel):
protocol: ProxyProtocol | None = None
country: str | None = Field(default=None, min_length=2, max_length=2)
anonymity: AnonymityLevel | None = None
min_score: float | None = Field(default=None, ge=0.0, le=1.0)
lease_duration_seconds: int = Field(default=300, ge=30, le=3600)
class AcquireResponse(BaseModel):
lease_id: str
proxy: ProxyResponse
expires_at: datetime
credits_remaining: int
class ReleaseResponse(BaseModel):
lease_id: str
released: bool
@router.post("/acquire", response_model=AcquireResponse)
async def acquire_proxy(
body: AcquireRequest,
user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
redis: Redis = Depends(get_redis),
) -> AcquireResponse:
settings = get_settings()
key_prefix = settings.redis.key_prefix
# Step 1: Check credit balance
balance = await get_credit_balance(db, user.id)
if balance < 1:
raise HTTPException(
status_code=status.HTTP_402_PAYMENT_REQUIRED,
detail=f"Insufficient credits. Current balance: {balance}",
)
# Step 2: Build query for eligible proxies
query = (
select(Proxy)
.where(Proxy.status == ProxyStatus.ACTIVE)
.order_by(Proxy.score.desc().nulls_last())
)
if body.protocol is not None:
query = query.where(Proxy.protocol == body.protocol)
if body.country is not None:
query = query.where(Proxy.country == body.country.upper())
if body.anonymity is not None:
query = query.where(Proxy.anonymity == body.anonymity)
if body.min_score is not None:
query = query.where(Proxy.score >= body.min_score)
result = await db.execute(query)
candidates = list(result.scalars().all())
if not candidates:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="No proxies match the requested filters",
)
# Step 3: Try to lease a proxy (skip already-leased ones)
proxy = None
lease_key = None
for candidate in candidates:
lease_key = f"{key_prefix}lease:{candidate.id}"
acquired = await redis.set(
lease_key,
str(user.id),
ex=body.lease_duration_seconds,
nx=True,
)
if acquired:
proxy = candidate
break
if proxy is None:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="All matching proxies are currently leased",
)
# Step 4: Debit credit and record lease in DB
try:
new_balance = await debit_credits(
db,
user_id=user.id,
amount=1,
tx_type=CreditTxType.ACQUIRE,
description=f"Proxy acquired: {proxy.ip}:{proxy.port}",
reference_id=proxy.id,
)
except ValueError:
# Race condition: balance changed between check and debit
await redis.delete(lease_key)
raise HTTPException(
status_code=status.HTTP_402_PAYMENT_REQUIRED,
detail="Insufficient credits",
) from None
# Step 5: Record lease in PostgreSQL for audit trail
expires_at = datetime.now() + timedelta(seconds=body.lease_duration_seconds)
lease = ProxyLease(
user_id=user.id,
proxy_id=proxy.id,
expires_at=expires_at,
)
db.add(lease)
await db.commit()
# Step 6: Invalidate cached credit balance
await redis.delete(f"{key_prefix}credits:{user.id}")
return AcquireResponse(
lease_id=str(lease.id),
proxy=ProxyResponse.model_validate(proxy),
expires_at=expires_at,
credits_remaining=new_balance,
)
@router.post("/acquire/{lease_id}/release", response_model=ReleaseResponse)
async def release_proxy(
lease_id: str,
user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
redis: Redis = Depends(get_redis),
) -> ReleaseResponse:
settings = get_settings()
key_prefix = settings.redis.key_prefix
result = await db.execute(
select(ProxyLease).where(
ProxyLease.id == lease_id,
ProxyLease.user_id == user.id,
ProxyLease.is_released.is_(False),
)
)
lease = result.scalar_one_or_none()
if lease is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Lease not found or already released",
)
lease.is_released = True
await db.commit()
# Remove Redis lease key so proxy is immediately available
await redis.delete(f"{key_prefix}lease:{lease.proxy_id}")
return ReleaseResponse(
lease_id=str(lease.id),
released=True,
)