83 lines
2.4 KiB
Python

from datetime import datetime
from uuid import UUID
from pydantic import BaseModel, Field, field_validator
from proxy_pool.proxy.models import AnonymityLevel, ProxyProtocol, ProxyStatus
class ProxySourceCreate(BaseModel):
    """Input schema for creating a proxy source.

    ``url`` and ``parser_name`` are required; the remaining fields fall
    back to their defaults when omitted.
    """

    # Required, length-capped strings.
    url: str = Field(..., max_length=2048)
    parser_name: str = Field(..., max_length=64)

    # Optional schedule string; only its length is validated here.
    cron_schedule: str | None = Field(None, max_length=64)

    # Protocol used when none is supplied.
    default_protocol: ProxyProtocol = Field(default=ProxyProtocol.HTTP)
class ProxySourceUpdate(BaseModel):
    """Input schema for partially updating a proxy source.

    Every field is optional and defaults to ``None``.
    NOTE(review): ``None`` presumably means "leave unchanged" -- confirm
    against the handler that applies this payload.
    """

    url: str | None = Field(default=None, max_length=2048)
    parser_name: str | None = Field(default=None, max_length=64)
    # Same 64-character cap as ProxySourceCreate.cron_schedule, so an update
    # cannot store a schedule string that creation would have rejected.
    cron_schedule: str | None = Field(default=None, max_length=64)
    default_protocol: ProxyProtocol | None = None
    is_active: bool | None = None
class ProxySourceResponse(BaseModel):
    """Output schema describing a stored proxy source."""

    # Permit building the model from objects that expose these fields as
    # attributes, not only from dicts.
    model_config = {"from_attributes": True}

    id: UUID

    # Source definition.
    url: str
    parser_name: str
    cron_schedule: str | None  # required key, value may be None
    default_protocol: ProxyProtocol
    is_active: bool

    # Timestamps.
    last_scraped_at: datetime | None  # required key, value may be None
    created_at: datetime
class ProxyResponse(BaseModel):
    """Output schema describing a single proxy record."""

    # Permit building the model from objects that expose these fields as
    # attributes, not only from dicts.
    model_config = {"from_attributes": True}

    id: UUID

    # Endpoint.
    ip: str
    port: int
    protocol: ProxyProtocol

    # Classification.
    status: ProxyStatus
    anonymity: AnonymityLevel | None
    exit_ip: str | None
    country: str | None

    # Quality metrics.
    score: float
    avg_latency_ms: float | None
    uptime_pct: float | None

    # Timestamps.
    last_checked_at: datetime | None
    first_seen_at: datetime
    created_at: datetime

    @field_validator("ip", "exit_ip", mode="before")
    @classmethod
    def coerce_inet(cls, value: object) -> str | None:
        """Stringify ``ip`` / ``exit_ip`` before field validation.

        ``None`` is passed through untouched; any other value is converted
        with ``str()``.
        NOTE(review): presumably the source objects carry non-string
        address values (e.g. ``ipaddress`` objects) -- confirm.
        """
        return None if value is None else str(value)
class ProxyListParams(BaseModel):
    """Filter, sort and pagination parameters for listing proxies.

    Every filter is optional and defaults to ``None``.
    """

    # Enum-valued filters.
    status: ProxyStatus | None = None
    protocol: ProxyProtocol | None = None
    anonymity: AnonymityLevel | None = None

    # Exactly two characters when provided.
    country: str | None = Field(None, min_length=2, max_length=2)

    # Numeric thresholds, each bounded as declared.
    min_score: float | None = Field(None, ge=0.0, le=1.0)
    max_latency_ms: float | None = Field(None, gt=0)
    min_uptime_pct: float | None = Field(None, ge=0.0, le=100.0)
    verified_within_minutes: int | None = Field(None, gt=0)

    # Sorting.
    # NOTE(review): sort_by accepts any string; the consumer should map it
    # onto a known set of sortable fields -- confirm this happens downstream.
    sort_by: str = "score"
    sort_order: str = Field("desc", pattern="^(asc|desc)$")

    # Pagination: page size limited to 1..200, offset is non-negative.
    limit: int = Field(50, ge=1, le=200)
    offset: int = Field(0, ge=0)
class ProxyListResponse(BaseModel):
    """Paginated list of proxies.

    Carries the returned items together with a total count and the
    ``limit`` / ``offset`` values for the page.
    """

    # Proxies on this page.
    items: list[ProxyResponse]

    # NOTE(review): presumably the number of matches across all pages --
    # confirm against the code that populates it.
    total_count: int

    # Pagination window for this page.
    limit: int
    offset: int