Short IDs become the canonical identifier in URLs (/i/:short_id), MinIO/R2 storage keys, and all API responses. Hash-based deduplication is preserved. Includes two-phase Alembic migration (003 adds nullable column, 004 enforces NOT NULL) with a backfill script to copy storage objects and populate short_id for existing images. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
100 lines
3.2 KiB
Python
100 lines
3.2 KiB
Python
import ipaddress
|
|
import logging
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from ipaddress import IPv4Network, IPv6Network
|
|
from threading import Lock
|
|
|
|
from starlette.requests import Request
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_client_ip(
|
|
request: Request,
|
|
trusted_networks: list[IPv4Network | IPv6Network],
|
|
) -> str:
|
|
"""Return the resolved client IP.
|
|
|
|
Prefers X-Real-IP over X-Forwarded-For when the TCP peer is a trusted
|
|
proxy. ingress-nginx sets X-Real-IP via its realip module using an
|
|
authoritative CIDR allowlist; it overwrites any client-supplied value, so
|
|
it cannot be spoofed via XFF injection. XFF[0] is the fallback for paths
|
|
that lack nginx (none currently exist, but kept for defence in depth).
|
|
"""
|
|
peer = request.client.host if request.client else "unknown"
|
|
if trusted_networks and peer != "unknown":
|
|
try:
|
|
peer_addr = ipaddress.ip_address(peer)
|
|
if any(peer_addr in net for net in trusted_networks):
|
|
real_ip = request.headers.get("X-Real-IP", "").strip()
|
|
if real_ip:
|
|
return real_ip
|
|
# XFF[0] fallback — warn because this path should not be
|
|
# reached in production (nginx always sets X-Real-IP).
|
|
xff = request.headers.get("X-Forwarded-For", "").split(",")[0].strip()
|
|
if xff:
|
|
logger.warning(
|
|
"X-Real-IP absent from trusted peer %s; falling back to XFF[0]", peer
|
|
)
|
|
return xff
|
|
except ValueError:
|
|
pass
|
|
return peer
|
|
|
|
|
|
@dataclass
|
|
class _Record:
|
|
failures: int = 0
|
|
window_start: float = field(default_factory=time.time)
|
|
blocked_until: float = 0.0
|
|
|
|
|
|
class LoginRateLimiter:
|
|
def __init__(
|
|
self,
|
|
max_failures: int = 5,
|
|
window_seconds: int = 300,
|
|
cooldown_seconds: int = 900,
|
|
) -> None:
|
|
self._max = max_failures
|
|
self._window = window_seconds
|
|
self._cooldown = cooldown_seconds
|
|
self._store: dict[str, _Record] = {}
|
|
self._lock = Lock()
|
|
|
|
@property
|
|
def cooldown_seconds(self) -> int:
|
|
return self._cooldown
|
|
|
|
def is_blocked(self, ip: str) -> bool:
|
|
now = time.time()
|
|
with self._lock:
|
|
rec = self._store.get(ip)
|
|
if rec is None:
|
|
return False
|
|
if rec.blocked_until > now:
|
|
return True
|
|
if rec.blocked_until > 0:
|
|
del self._store[ip]
|
|
return False
|
|
|
|
def record_failure(self, ip: str) -> None:
|
|
now = time.time()
|
|
with self._lock:
|
|
rec = self._store.get(ip)
|
|
if rec is None:
|
|
rec = _Record(window_start=now)
|
|
self._store[ip] = rec
|
|
if now - rec.window_start > self._window:
|
|
rec.failures = 0
|
|
rec.window_start = now
|
|
rec.failures += 1
|
|
if rec.failures >= self._max:
|
|
rec.blocked_until = now + self._cooldown
|
|
logger.warning("Login blocked for %s after %d failures", ip, rec.failures)
|
|
|
|
def record_success(self, ip: str) -> None:
|
|
with self._lock:
|
|
self._store.pop(ip, None)
|