diff --git a/src/proxy_pool/app.py b/src/proxy_pool/app.py index 9801269..55dc2ec 100644 --- a/src/proxy_pool/app.py +++ b/src/proxy_pool/app.py @@ -10,6 +10,7 @@ from proxy_pool.config import get_settings from proxy_pool.db.session import create_session_factory from proxy_pool.plugins.discovery import discover_plugins from proxy_pool.plugins.registry import PluginRegistry +from proxy_pool.proxy.acquire import router as acquire_router from proxy_pool.proxy.router import proxy_router from proxy_pool.proxy.router import router as source_router @@ -64,6 +65,7 @@ def create_app() -> FastAPI: app.include_router(health_router) app.include_router(source_router) app.include_router(proxy_router) + app.include_router(acquire_router) app.include_router(auth_router) app.include_router(account_router) diff --git a/src/proxy_pool/proxy/acquire.py b/src/proxy_pool/proxy/acquire.py new file mode 100644 index 0000000..bc1e568 --- /dev/null +++ b/src/proxy_pool/proxy/acquire.py @@ -0,0 +1,186 @@ +from __future__ import annotations + +from datetime import datetime, timedelta + +from fastapi import APIRouter, Depends, HTTPException, status +from pydantic import BaseModel, Field +from redis.asyncio import Redis +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from proxy_pool.accounts.auth import get_current_user +from proxy_pool.accounts.models import CreditTxType, ProxyLease, User +from proxy_pool.accounts.service import debit_credits, get_credit_balance +from proxy_pool.common.dependencies import get_db, get_redis +from proxy_pool.config import get_settings +from proxy_pool.proxy.models import ( + AnonymityLevel, + Proxy, + ProxyProtocol, + ProxyStatus, +) +from proxy_pool.proxy.schemas import ProxyResponse + +router = APIRouter(prefix="/proxies", tags=["proxies"]) + + +class AcquireRequest(BaseModel): + protocol: ProxyProtocol | None = None + country: str | None = Field(default=None, min_length=2, max_length=2) + anonymity: AnonymityLevel | None = None + min_score: float | None = Field(default=None, ge=0.0, le=1.0) + lease_duration_seconds: int = Field(default=300, ge=30, le=3600) + + +class AcquireResponse(BaseModel): + lease_id: str + proxy: ProxyResponse + expires_at: datetime + credits_remaining: int + + +class ReleaseResponse(BaseModel): + lease_id: str + released: bool + + +@router.post("/acquire", response_model=AcquireResponse) +async def acquire_proxy( + body: AcquireRequest, + user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), + redis: Redis = Depends(get_redis), +) -> AcquireResponse: + settings = get_settings() + key_prefix = settings.redis.key_prefix + + # Step 1: Check credit balance + balance = await get_credit_balance(db, user.id) + if balance < 1: + raise HTTPException( + status_code=status.HTTP_402_PAYMENT_REQUIRED, + detail=f"Insufficient credits. Current balance: {balance}", + ) + + # Step 2: Build query for eligible proxies + query = ( + select(Proxy) + .where(Proxy.status == ProxyStatus.ACTIVE) + .order_by(Proxy.score.desc().nulls_last()) + ) + + if body.protocol is not None: + query = query.where(Proxy.protocol == body.protocol) + if body.country is not None: + query = query.where(Proxy.country == body.country.upper()) + if body.anonymity is not None: + query = query.where(Proxy.anonymity == body.anonymity) + if body.min_score is not None: + query = query.where(Proxy.score >= body.min_score) + + result = await db.execute(query) + candidates = list(result.scalars().all()) + + if not candidates: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="No proxies match the requested filters", + ) + + # Step 3: Try to lease a proxy (skip already-leased ones) + proxy = None + lease_key = None + + for candidate in candidates: + lease_key = f"{key_prefix}lease:{candidate.id}" + acquired = await redis.set( + lease_key, + str(user.id), + ex=body.lease_duration_seconds, + nx=True, + ) + if acquired: + proxy = candidate + break + + if proxy is None: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="All matching proxies are currently leased", + ) + + # Step 4: Debit credit and record lease in DB + try: + new_balance = await debit_credits( + db, + user_id=user.id, + amount=1, + tx_type=CreditTxType.ACQUIRE, + description=f"Proxy acquired: {proxy.ip}:{proxy.port}", + reference_id=proxy.id, + ) + except ValueError: + # Race condition: balance changed between check and debit + await redis.delete(lease_key) + raise HTTPException( + status_code=status.HTTP_402_PAYMENT_REQUIRED, + detail="Insufficient credits", + ) from None + + # Step 5: Record lease in PostgreSQL for audit trail + expires_at = datetime.now() + timedelta(seconds=body.lease_duration_seconds) + + lease = ProxyLease( + user_id=user.id, + proxy_id=proxy.id, + expires_at=expires_at, + ) + db.add(lease) + await db.commit() + + # Step 6: Invalidate cached credit balance + await redis.delete(f"{key_prefix}credits:{user.id}") + + return AcquireResponse( + lease_id=str(lease.id), + proxy=ProxyResponse.model_validate(proxy), + expires_at=expires_at, + credits_remaining=new_balance, + ) + + +@router.post("/acquire/{lease_id}/release", response_model=ReleaseResponse) +async def release_proxy( + lease_id: str, + user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), + redis: Redis = Depends(get_redis), +) -> ReleaseResponse: + settings = get_settings() + key_prefix = settings.redis.key_prefix + + result = await db.execute( + select(ProxyLease).where( + ProxyLease.id == lease_id, + ProxyLease.user_id == user.id, + ProxyLease.is_released.is_(False), + ) + ) + lease = result.scalar_one_or_none() + + if lease is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Lease not found or already released", + ) + + lease.is_released = True + await db.commit() + + # Remove Redis lease key so proxy is immediately available + await redis.delete(f"{key_prefix}lease:{lease.proxy_id}") + + return ReleaseResponse( + lease_id=str(lease.id), + released=True, + )