3 Commits

7a835d3172 Feat: Rate-limit login endpoint to block brute-force attacks
After LOGIN_MAX_FAILURES consecutive failed attempts from the same source
IP within LOGIN_WINDOW_SECONDS, POST /api/v1/auth/token returns HTTP 429
with a Retry-After header for LOGIN_COOLDOWN_SECONDS. A successful login
resets the counter. Trusted upstream proxy IPs/CIDRs can be declared via
LOGIN_TRUSTED_PROXY_IPS so X-Forwarded-For is honoured correctly behind
nginx ingress or similar reverse proxies.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 21:01:37 +00:00
f3e0021ee8 Feat: Enforce PostgreSQL for integration tests; add Docker test stack
- conftest.py: pytest_configure guard rejects non-postgresql+asyncpg:// URLs
  before any test collects (per constitution §2.5/§5.2 v1.3.0)
- docker-compose.test.yml: isolated postgres-test (5433) + minio-test (9002)
  + api-test runner; one command runs the full suite against real PostgreSQL
- Makefile: test-unit and test-integration targets
- .env.test.example: documents variables needed to run tests outside Docker
- Fix pre-existing test bug: integration tests using client fixture (NoOpAuthProvider)
  for write operations (upload/delete/patch) now use authed_client with Bearer
  token — these were never caught because tests never ran against a live stack

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 19:14:12 +00:00
354c85292d Docs: Bump constitution to v1.3.0 — require PostgreSQL for integration tests
§2.5: Remove the planned PostgreSQL→SQLite refactor note; prohibit
alternative database engines in integration tests.
§5.2: Explicitly require a real PostgreSQL instance for integration
tests; ban SQLite — a GROUP BY/HAVING production bug was masked by
SQLite's permissive dialect in feature 007.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 18:47:57 +00:00
33 changed files with 2093 additions and 53 deletions


@@ -19,3 +19,11 @@ JWT_SECRET_KEY=change-me-to-a-long-random-string
JWT_EXPIRY_SECONDS=86400
OWNER_USERNAME=owner
OWNER_PASSWORD=change-me
# Login brute-force protection
LOGIN_MAX_FAILURES=5
LOGIN_WINDOW_SECONDS=300
LOGIN_COOLDOWN_SECONDS=900
# Comma-separated IPs/CIDRs of trusted upstream proxies (e.g. nginx ingress pod CIDR).
# Leave empty when not behind a reverse proxy.
LOGIN_TRUSTED_PROXY_IPS=

.env.test.example

@@ -0,0 +1,36 @@
# Integration test environment variables
# Used when running pytest directly on the host (outside Docker).
#
# Start test services first:
# docker compose -f docker-compose.test.yml up -d postgres-test minio-test minio-init-test
#
# Then source this file and run tests:
# export $(grep -v '^#' .env.test.example | xargs)
# cd api && python -m pytest tests/integration/ -v
# PostgreSQL test database (postgres-test container on host port 5433)
TEST_DATABASE_URL=postgresql+asyncpg://reactbin:reactbin@localhost:5433/reactbin_test
DATABASE_URL=postgresql+asyncpg://reactbin:reactbin@localhost:5433/reactbin_test
# MinIO test instance (minio-test container on host port 9002)
S3_ENDPOINT_URL=http://localhost:9002
S3_BUCKET_NAME=reactbin-test
S3_ACCESS_KEY_ID=minioadmin
S3_SECRET_ACCESS_KEY=minioadmin
S3_REGION=us-east-1
# Auth (test values — not for production)
JWT_SECRET_KEY=test-secret-key-for-testing-only
OWNER_USERNAME=testowner
OWNER_PASSWORD=testpassword
# API
API_BASE_URL=http://localhost:8000
MAX_UPLOAD_BYTES=52428800
# Login brute-force protection
LOGIN_MAX_FAILURES=5
LOGIN_WINDOW_SECONDS=300
LOGIN_COOLDOWN_SECONDS=900
# Comma-separated IPs/CIDRs of trusted upstream proxies; leave empty for direct connections.
LOGIN_TRUSTED_PROXY_IPS=
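The `export $(grep -v '^#' … | xargs)` tip above can be mirrored in Python when a shell isn't convenient. A minimal illustrative loader — `load_env` is hypothetical, not part of the repo:

```python
import tempfile

def load_env(path: str) -> dict[str, str]:
    """Parse KEY=VALUE lines, skipping comments and blanks
    (same effect as the grep | xargs one-liner above)."""
    env: dict[str, str] = {}
    with open(path) as fh:
        for line in fh:
            line = line.strip()
            if line and not line.startswith("#") and "=" in line:
                key, _, value = line.partition("=")  # split at the first '='
                env[key.strip()] = value.strip()
    return env

# Demo against a throwaway file shaped like .env.test.example
with tempfile.NamedTemporaryFile("w", suffix=".env", delete=False) as fh:
    fh.write("# comment\nLOGIN_MAX_FAILURES=5\nLOGIN_TRUSTED_PROXY_IPS=\n")
    path = fh.name

print(load_env(path))  # {'LOGIN_MAX_FAILURES': '5', 'LOGIN_TRUSTED_PROXY_IPS': ''}
```

Note the empty `LOGIN_TRUSTED_PROXY_IPS=` still yields a key with an empty value, matching how the real settings treat "not behind a proxy".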

.gitignore

@@ -5,6 +5,7 @@ notes/
.env
.env.*
!.env.example
!.env.test.example
# Python
__pycache__/


@@ -1 +1,3 @@
{"feature_directory":"specs/007-tag-browser"}
{
"feature_directory": "specs/009-login-rate-limiting"
}


@@ -1,8 +1,8 @@
<!--
SYNC IMPACT REPORT
==================
Version change: 1.1.1 → 1.2.0
Ratified: 2026-05-01 | Last amended: 2026-05-03
Version change: 1.2.0 → 1.3.0
Ratified: 2026-05-01 | Last amended: 2026-05-06
Principles introduced (first population from docs/CONSTITUTION.md):
- §2 Architecture Principles (6 sub-principles)
@@ -94,10 +94,11 @@ The constitution acknowledges all three; the spec governs which is built.
### 2.5 Database abstraction
PostgreSQL is the Phase 1 database. All DB access MUST go through a repository
layer (one repository class per domain aggregate). Raw SQL or an ORM is
acceptable, but no query logic MAY live outside a repository. This makes the
planned PostgreSQL → SQLite refactor a repository-layer change only.
PostgreSQL is the database. All DB access MUST go through a repository layer
(one repository class per domain aggregate). Raw SQL or an ORM is acceptable,
but no query logic MAY live outside a repository. No alternative database
engine (SQLite, DuckDB, in-memory substitutes) MAY be used in integration
tests — dialect differences mask production bugs.
### 2.6 No speculative abstraction
@@ -179,8 +180,11 @@ before any implementation step.
### 5.2 Test pyramid
- **Unit tests** — pure logic, repository mocks, no I/O
- **Integration tests** — API routes tested against a real (test) database
and a real (test) S3-compatible bucket (e.g. MinIO in Docker)
- **Integration tests** — API routes tested against a real PostgreSQL instance
and a real S3-compatible bucket (e.g. MinIO in Docker). SQLite and other
in-memory database substitutes are **prohibited** — PostgreSQL-specific
behaviour (GROUP BY enforcement, JSON operators, constraint handling) MUST
be exercised by the test suite.
- **E2E tests** — Angular + API, minimal set covering the core happy paths
Unit and integration tests are required. E2E tests are best-effort in v1.
@@ -284,7 +288,8 @@ Phase 1 design is complete.
| 1.1.0 | 2026-05-02 | Adopted into Spec Kit memory; fixed duplicate §4.3 → §4.4; strengthened "should" language to MUST/MUST NOT; added §9 Governance |
| 1.1.1 | 2026-05-03 | Clarify that the only acceptable form of image transformation or editing is thumbnail generation |
| 1.2.0 | 2026-05-03 | §2.4: Mark Phase 2 (JWT bearer auth) complete, reword phase status; §6: Add PyJWT to tech stack table; §8: Remove username/password auth from out-of-scope (now shipped) |
| 1.3.0 | 2026-05-06 | §2.5: Remove planned PostgreSQL → SQLite refactor note; prohibit alternative database engines in integration tests. §5.2: Explicitly require PostgreSQL for integration tests; prohibit SQLite — a production HAVING/GROUP BY bug was masked by SQLite's permissive dialect. |
---
**Version**: 1.2.0 | **Ratified**: 2026-05-01 | **Last Amended**: 2026-05-03
**Version**: 1.3.0 | **Ratified**: 2026-05-01 | **Last Amended**: 2026-05-06


@@ -1,5 +1,5 @@
<!-- SPECKIT START -->
For additional context about technologies to be used, project structure,
shell commands, and other important information, read the current plan at
`specs/007-tag-browser/plan.md`.
`specs/009-login-rate-limiting/plan.md`.
<!-- SPECKIT END -->

Makefile

@@ -0,0 +1,7 @@
.PHONY: test-unit test-integration

test-unit:
	cd api && python -m pytest tests/unit/ -v

test-integration:
	docker compose -f docker-compose.test.yml run --rm api-test


@@ -0,0 +1,91 @@
import ipaddress
import logging
import time
from dataclasses import dataclass, field
from ipaddress import IPv4Network, IPv6Network
from threading import Lock

from starlette.requests import Request

logger = logging.getLogger(__name__)


def get_client_ip(
    request: Request,
    trusted_networks: list[IPv4Network | IPv6Network],
) -> str:
    """Return the resolved client IP, honouring X-Forwarded-For when the
    TCP peer is a trusted upstream proxy. Falls back to the TCP peer address
    when no trusted networks are configured or the peer is not in the list."""
    peer = request.client.host if request.client else "unknown"
    if trusted_networks and peer != "unknown":
        try:
            peer_addr = ipaddress.ip_address(peer)
            if any(peer_addr in net for net in trusted_networks):
                xff = request.headers.get("X-Forwarded-For", "").split(",")[0].strip()
                if xff:
                    return xff
                real_ip = request.headers.get("X-Real-IP", "").strip()
                if real_ip:
                    return real_ip
        except ValueError:
            pass
    return peer


@dataclass
class _Record:
    failures: int = 0
    window_start: float = field(default_factory=time.time)
    blocked_until: float = 0.0


class LoginRateLimiter:
    def __init__(
        self,
        max_failures: int = 5,
        window_seconds: int = 300,
        cooldown_seconds: int = 900,
    ) -> None:
        self._max = max_failures
        self._window = window_seconds
        self._cooldown = cooldown_seconds
        self._store: dict[str, _Record] = {}
        self._lock = Lock()

    @property
    def cooldown_seconds(self) -> int:
        return self._cooldown

    def is_blocked(self, ip: str) -> bool:
        now = time.time()
        with self._lock:
            rec = self._store.get(ip)
            if rec is None:
                return False
            if rec.blocked_until > now:
                return True
            if rec.blocked_until > 0:
                del self._store[ip]
            return False

    def record_failure(self, ip: str) -> None:
        now = time.time()
        with self._lock:
            rec = self._store.get(ip)
            if rec is None:
                rec = _Record(window_start=now)
                self._store[ip] = rec
            if now - rec.window_start > self._window:
                rec.failures = 0
                rec.window_start = now
            rec.failures += 1
            if rec.failures >= self._max:
                rec.blocked_until = now + self._cooldown
                logger.warning(
                    "Login blocked for %s after %d failures", ip, rec.failures
                )

    def record_success(self, ip: str) -> None:
        with self._lock:
            self._store.pop(ip, None)
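The fixed-window-plus-cooldown behaviour of this limiter can be sketched in a condensed, standalone form. This is an illustration only — `MiniLimiter` and the injectable `clock` are hypothetical stand-ins, not the repo's class:

```python
import time

class MiniLimiter:
    """Condensed illustration of the fixed-window + cooldown scheme."""
    def __init__(self, max_failures=3, window=60, cooldown=30, clock=time.time):
        self.max, self.window, self.cooldown, self.clock = max_failures, window, cooldown, clock
        self.store = {}  # ip -> [failures, window_start, blocked_until]

    def is_blocked(self, ip):
        rec = self.store.get(ip)
        return bool(rec) and rec[2] > self.clock()

    def record_failure(self, ip):
        now = self.clock()
        rec = self.store.setdefault(ip, [0, now, 0.0])
        if now - rec[1] > self.window:   # stale window: restart the count
            rec[0], rec[1] = 0, now
        rec[0] += 1
        if rec[0] >= self.max:
            rec[2] = now + self.cooldown  # enter cooldown
    def record_success(self, ip):
        self.store.pop(ip, None)          # successful login clears all state

# Three failures inside the window trigger the block; the cooldown then expires.
t = [0.0]
lim = MiniLimiter(clock=lambda: t[0])
for _ in range(3):
    lim.record_failure("10.0.0.1")
print(lim.is_blocked("10.0.0.1"))  # True: cooldown active
t[0] = 31.0                        # past the 30 s cooldown
print(lim.is_blocked("10.0.0.1"))  # False
```

The injected clock is only there to make the cooldown expiry observable without sleeping; the real class uses `time.time()` directly.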


@@ -18,6 +18,10 @@ class Settings(BaseSettings):
    jwt_expiry_seconds: int = 86400
    owner_username: str
    owner_password: str
    login_max_failures: int = 5
    login_window_seconds: int = 300
    login_cooldown_seconds: int = 900
    login_trusted_proxy_ips: str = ""
@lru_cache


@@ -1,17 +1,30 @@
from contextlib import asynccontextmanager
import ipaddress
from contextlib import asynccontextmanager, suppress

from fastapi import FastAPI, Request
from fastapi.exceptions import HTTPException
from fastapi.responses import JSONResponse

from app.auth.rate_limiter import LoginRateLimiter
from app.config import get_settings
from app.database import Base, get_engine


@asynccontextmanager
async def lifespan(application: FastAPI):
    get_settings()
    # Verify DB connection and run migrations on startup
    settings = get_settings()
    application.state.login_rate_limiter = LoginRateLimiter(
        max_failures=settings.login_max_failures,
        window_seconds=settings.login_window_seconds,
        cooldown_seconds=settings.login_cooldown_seconds,
    )
    trusted_networks = []
    for part in settings.login_trusted_proxy_ips.split(","):
        part = part.strip()
        if part:
            with suppress(ValueError):
                trusted_networks.append(ipaddress.ip_network(part, strict=False))
    application.state.login_trusted_networks = trusted_networks

    engine = get_engine()
    async with engine.begin() as conn:
        # In production, Alembic handles migrations; this is a dev convenience
@@ -22,6 +35,10 @@ async def lifespan(application: FastAPI):
app = FastAPI(title="Reactbin API", version="1.0.0", lifespan=lifespan)

# Defaults so app.state is populated even when lifespan doesn't run (e.g. tests)
app.state.login_rate_limiter = LoginRateLimiter()
app.state.login_trusted_networks = []


@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
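The `LOGIN_TRUSTED_PROXY_IPS` parsing in the lifespan above tolerates blanks and silently skips invalid tokens via `suppress(ValueError)`. A standalone sketch of the same parsing (`parse_trusted` is an illustrative name, not an app function):

```python
import ipaddress
from contextlib import suppress

def parse_trusted(raw: str):
    """Comma-separated IPs/CIDRs -> list of networks; bad entries are skipped."""
    nets = []
    for part in raw.split(","):
        part = part.strip()
        if part:
            with suppress(ValueError):
                # strict=False accepts host bits set, e.g. "10.42.0.1/16"
                nets.append(ipaddress.ip_network(part, strict=False))
    return nets

nets = parse_trusted("10.42.0.0/16, 192.168.1.1, not-an-ip, ")
print([str(n) for n in nets])  # ['10.42.0.0/16', '192.168.1.1/32']
```

A bare IP becomes a /32 (or /128) network, so single proxy addresses and pod CIDRs go through the same membership check.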
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):


@@ -1,7 +1,9 @@
from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel

from app.auth.jwt_provider import JWTAuthProvider
from app.auth.rate_limiter import LoginRateLimiter, get_client_ip
from app.dependencies import get_jwt_auth

router = APIRouter(tags=["auth"])
@@ -19,12 +21,32 @@ class TokenResponse(BaseModel):
@router.post("/auth/token", response_model=TokenResponse)
async def login(body: LoginRequest, auth: JWTAuthProvider = Depends(get_jwt_auth)):
async def login(
    request: Request,
    body: LoginRequest,
    auth: JWTAuthProvider = Depends(get_jwt_auth),
):
    limiter: LoginRateLimiter = request.app.state.login_rate_limiter
    ip: str = get_client_ip(request, request.app.state.login_trusted_networks)
    if limiter.is_blocked(ip):
        return JSONResponse(
            status_code=429,
            content={
                "detail": "Too many failed login attempts. Please try again later.",
                "code": "login_rate_limited",
            },
            headers={"Retry-After": str(limiter.cooldown_seconds)},
        )
    if not auth.verify_credentials(body.username, body.password):
        limiter.record_failure(ip)
        raise HTTPException(
            status_code=401,
            detail={"detail": "Invalid credentials", "code": "invalid_credentials"},
        )
    limiter.record_success(ip)
    token = auth.create_token()
    return TokenResponse(
        access_token=token,
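The proxy-aware IP resolution used by this route (`get_client_ip`) can be reduced to a pure function over the peer address, headers, and trusted networks. A hedged standalone sketch — `resolve_ip` is illustrative and drops the Starlette `Request` plumbing:

```python
import ipaddress

def resolve_ip(peer: str, headers: dict[str, str], trusted: list[str]) -> str:
    """Honour X-Forwarded-For / X-Real-IP only when the TCP peer is a trusted proxy."""
    nets = [ipaddress.ip_network(c, strict=False) for c in trusted]
    try:
        peer_addr = ipaddress.ip_address(peer)
    except ValueError:
        return peer
    if any(peer_addr in net for net in nets):
        # First hop in X-Forwarded-For is the original client
        xff = headers.get("X-Forwarded-For", "").split(",")[0].strip()
        if xff:
            return xff
        real_ip = headers.get("X-Real-IP", "").strip()
        if real_ip:
            return real_ip
    return peer

# Untrusted peer: the spoofable header is ignored
print(resolve_ip("203.0.113.7", {"X-Forwarded-For": "1.2.3.4"}, ["10.0.0.0/8"]))
# Trusted nginx pod: the forwarded client IP wins
print(resolve_ip("10.42.0.3", {"X-Forwarded-For": "1.2.3.4, 10.42.0.3"}, ["10.0.0.0/8"]))
```

The key property, exercised by the `test_xff_header_ignored_when_no_trusted_networks` test below, is that with no trusted networks the header never overrides the real peer.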


@@ -1,5 +1,6 @@
import os

import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
@@ -9,11 +10,11 @@ os.environ.setdefault("JWT_SECRET_KEY", "test-secret-key-for-testing-only")
os.environ.setdefault("OWNER_USERNAME", "testowner")
os.environ.setdefault("OWNER_PASSWORD", "testpassword")

from app.auth.jwt_provider import JWTAuthProvider
from app.config import get_settings
from app.database import Base
from app.dependencies import get_auth, get_db, get_storage
from app.main import app
from app.auth.jwt_provider import JWTAuthProvider  # noqa: E402
from app.config import get_settings  # noqa: E402
from app.database import Base  # noqa: E402
from app.dependencies import get_auth, get_db, get_storage  # noqa: E402
from app.main import app  # noqa: E402

# Bust the LRU cache so get_settings() picks up the env vars set above
get_settings.cache_clear()
@@ -26,8 +27,6 @@ _TEST_OWNER_PASSWORD = os.environ["OWNER_PASSWORD"]
@pytest_asyncio.fixture(scope="session", loop_scope="session")
async def engine():
    settings = get_settings()
    # Use a separate test database URL if TEST_DATABASE_URL is set
    import os
    db_url = os.getenv("TEST_DATABASE_URL", settings.database_url)
    eng = create_async_engine(db_url, echo=False)
    async with eng.begin() as conn:
@@ -108,3 +107,15 @@ async def authed_client(db_session, jwt_auth_provider):
    yield c, valid_token
    app.dependency_overrides.clear()


def pytest_configure(config):
    db_url = os.getenv("TEST_DATABASE_URL") or os.getenv("DATABASE_URL", "")
    if not db_url.startswith("postgresql+asyncpg://"):
        pytest.exit(
            "Integration tests require a PostgreSQL database "
            "(postgresql+asyncpg://...). "
            "Set TEST_DATABASE_URL or DATABASE_URL accordingly. "
            f"Got: {db_url!r}",
            returncode=1,
        )


@@ -19,15 +19,18 @@ def _minimal_jpeg_v2() -> bytes:
@pytest.mark.asyncio
async def test_delete_removes_record(client):
async def test_delete_removes_record(authed_client):
    client, token = authed_client
    headers = {"Authorization": f"Bearer {token}"}
    data = _minimal_jpeg_v2()
    upload = await client.post(
        "/api/v1/images",
        files={"file": ("del-test.jpg", io.BytesIO(data), "image/jpeg")},
        headers=headers,
    )
    image_id = upload.json()["id"]

    delete_resp = await client.delete(f"/api/v1/images/{image_id}")
    delete_resp = await client.delete(f"/api/v1/images/{image_id}", headers=headers)
    assert delete_resp.status_code == 204

    get_resp = await client.get(f"/api/v1/images/{image_id}")
@@ -36,16 +39,19 @@ async def test_delete_removes_record(client):
@pytest.mark.asyncio
async def test_delete_removes_storage_object(client):
async def test_delete_removes_storage_object(authed_client):
    client, token = authed_client
    headers = {"Authorization": f"Bearer {token}"}
    data = _minimal_jpeg_v2() + b"\x00"
    upload = await client.post(
        "/api/v1/images",
        files={"file": ("del-storage-test.jpg", io.BytesIO(data), "image/jpeg")},
        headers=headers,
    )
    assert upload.status_code in (200, 201)
    image_id = upload.json()["id"]

    delete_resp = await client.delete(f"/api/v1/images/{image_id}")
    delete_resp = await client.delete(f"/api/v1/images/{image_id}", headers=headers)
    assert delete_resp.status_code == 204

    # Confirm storage redirect no longer works (404 since record is gone)
@@ -54,15 +60,21 @@ async def test_delete_removes_storage_object(client):
@pytest.mark.asyncio
async def test_delete_unknown_id_returns_404(client):
    response = await client.delete(f"/api/v1/images/{uuid.uuid4()}")
async def test_delete_unknown_id_returns_404(authed_client):
    client, token = authed_client
    response = await client.delete(
        f"/api/v1/images/{uuid.uuid4()}",
        headers={"Authorization": f"Bearer {token}"},
    )
    assert response.status_code == 404
    body = response.json()
    assert body["code"] == "image_not_found"

@pytest.mark.asyncio
async def test_delete_removes_thumbnail(client):
async def test_delete_removes_thumbnail(authed_client):
    client, token = authed_client
    headers = {"Authorization": f"Bearer {token}"}
    buf = io.BytesIO()
    PILImage.new("RGB", (200, 150), color=(60, 90, 120)).save(buf, format="JPEG")
    data = buf.getvalue()
@@ -70,12 +82,13 @@ async def test_delete_removes_thumbnail(client):
    upload = await client.post(
        "/api/v1/images",
        files={"file": ("thumb-del.jpg", io.BytesIO(data), "image/jpeg")},
        headers=headers,
    )
    assert upload.status_code == 201
    image_id = upload.json()["id"]
    assert upload.json()["thumbnail_key"] is not None

    delete_resp = await client.delete(f"/api/v1/images/{image_id}")
    delete_resp = await client.delete(f"/api/v1/images/{image_id}", headers=headers)
    assert delete_resp.status_code == 204

    thumb_resp = await client.get(f"/api/v1/images/{image_id}/thumbnail")


@@ -0,0 +1,121 @@
import os

import pytest
from httpx import AsyncClient

from app.auth.rate_limiter import LoginRateLimiter
from app.main import app

BAD_CREDS = {"username": "attacker", "password": "wrong"}
VALID_CREDS = {
    "username": os.environ.get("OWNER_USERNAME", "testowner"),
    "password": os.environ.get("OWNER_PASSWORD", "testpassword"),
}


def _fresh_limiter():
    return LoginRateLimiter(max_failures=3, window_seconds=60, cooldown_seconds=30)


@pytest.mark.asyncio
async def test_repeated_failures_trigger_429(client: AsyncClient):
    original_limiter = app.state.login_rate_limiter
    original_networks = app.state.login_trusted_networks
    app.state.login_rate_limiter = _fresh_limiter()
    app.state.login_trusted_networks = []
    try:
        for _ in range(3):
            await client.post("/api/v1/auth/token", json=BAD_CREDS)
        resp = await client.post("/api/v1/auth/token", json=BAD_CREDS)
        assert resp.status_code == 429
        assert resp.json()["code"] == "login_rate_limited"
    finally:
        app.state.login_rate_limiter = original_limiter
        app.state.login_trusted_networks = original_networks


@pytest.mark.asyncio
async def test_success_resets_counter(client: AsyncClient):
    original_limiter = app.state.login_rate_limiter
    original_networks = app.state.login_trusted_networks
    app.state.login_rate_limiter = _fresh_limiter()
    app.state.login_trusted_networks = []
    try:
        for _ in range(2):
            await client.post("/api/v1/auth/token", json=BAD_CREDS)
        await client.post("/api/v1/auth/token", json=VALID_CREDS)
        for _ in range(3):
            resp = await client.post("/api/v1/auth/token", json=BAD_CREDS)
            assert resp.status_code == 401, "counter should have reset after success"
    finally:
        app.state.login_rate_limiter = original_limiter
        app.state.login_trusted_networks = original_networks


@pytest.mark.asyncio
async def test_429_has_retry_after_header(client: AsyncClient):
    original_limiter = app.state.login_rate_limiter
    original_networks = app.state.login_trusted_networks
    app.state.login_rate_limiter = _fresh_limiter()
    app.state.login_trusted_networks = []
    try:
        for _ in range(3):
            await client.post("/api/v1/auth/token", json=BAD_CREDS)
        resp = await client.post("/api/v1/auth/token", json=BAD_CREDS)
        assert resp.status_code == 429
        assert "Retry-After" in resp.headers
        assert int(resp.headers["Retry-After"]) > 0
    finally:
        app.state.login_rate_limiter = original_limiter
        app.state.login_trusted_networks = original_networks


@pytest.mark.asyncio
async def test_429_body_shape(client: AsyncClient):
    original_limiter = app.state.login_rate_limiter
    original_networks = app.state.login_trusted_networks
    app.state.login_rate_limiter = _fresh_limiter()
    app.state.login_trusted_networks = []
    try:
        for _ in range(3):
            await client.post("/api/v1/auth/token", json=BAD_CREDS)
        resp = await client.post("/api/v1/auth/token", json=BAD_CREDS)
        assert resp.status_code == 429
        assert resp.json() == {
            "detail": "Too many failed login attempts. Please try again later.",
            "code": "login_rate_limited",
        }
    finally:
        app.state.login_rate_limiter = original_limiter
        app.state.login_trusted_networks = original_networks


@pytest.mark.asyncio
async def test_xff_header_ignored_when_no_trusted_networks(client: AsyncClient):
    original_limiter = app.state.login_rate_limiter
    original_networks = app.state.login_trusted_networks
    app.state.login_rate_limiter = _fresh_limiter()
    app.state.login_trusted_networks = []
    try:
        # Send 3 failures all claiming to be "1.2.3.4" via XFF
        for _ in range(3):
            await client.post(
                "/api/v1/auth/token",
                json=BAD_CREDS,
                headers={"X-Forwarded-For": "1.2.3.4"},
            )
        # 4th request with a *different* XFF — if XFF were trusted, this
        # would appear to be a fresh IP and get 401. Since XFF is ignored,
        # the real peer ("testclient") is blocked and we get 429.
        resp = await client.post(
            "/api/v1/auth/token",
            json=BAD_CREDS,
            headers={"X-Forwarded-For": "9.9.9.9"},
        )
        assert resp.status_code == 429, (
            "XFF should be ignored when no trusted networks are configured; "
            "expected real peer to be blocked"
        )
    finally:
        app.state.login_rate_limiter = original_limiter
        app.state.login_trusted_networks = original_networks


@@ -16,7 +16,9 @@ def _minimal_gif() -> bytes:
@pytest.mark.asyncio
async def test_and_filter_returns_only_matching_images(client):
async def test_and_filter_returns_only_matching_images(authed_client):
    client, token = authed_client
    headers = {"Authorization": f"Bearer {token}"}
    data = _minimal_gif()

    # Image with both tags
@@ -24,6 +26,7 @@ async def test_and_filter_returns_only_matching_images(client):
        "/api/v1/images",
        files={"file": ("both.gif", io.BytesIO(data), "image/gif")},
        data={"tags": "andcat,andfunny"},
        headers=headers,
    )
    both_id = r_both.json()["id"]
@@ -32,6 +35,7 @@ async def test_and_filter_returns_only_matching_images(client):
        "/api/v1/images",
        files={"file": ("one.gif", io.BytesIO(data + b"\x00"), "image/gif")},
        data={"tags": "andcat"},
        headers=headers,
    )

    response = await client.get("/api/v1/images?tags=andcat,andfunny")
@@ -43,7 +47,9 @@ async def test_and_filter_returns_only_matching_images(client):
@pytest.mark.asyncio
async def test_filter_excludes_partial_tag_match(client):
async def test_filter_excludes_partial_tag_match(authed_client):
    client, token = authed_client
    headers = {"Authorization": f"Bearer {token}"}
    data = _minimal_gif()

    # Image with only "exclcat"
@@ -51,6 +57,7 @@ async def test_filter_excludes_partial_tag_match(client):
        "/api/v1/images",
        files={"file": ("partial.gif", io.BytesIO(data + b"\x01"), "image/gif")},
        data={"tags": "exclcat"},
        headers=headers,
    )

    # Filter requires both exclcat and exclother


@@ -29,11 +29,13 @@ def _minimal_webp() -> bytes:
@pytest.mark.asyncio
async def test_file_returns_200_with_content(client):
async def test_file_returns_200_with_content(authed_client):
    client, token = authed_client
    data = _minimal_webp()
    upload = await client.post(
        "/api/v1/images",
        files={"file": ("img.webp", io.BytesIO(data), "image/webp")},
        headers={"Authorization": f"Bearer {token}"},
    )
    assert upload.status_code in (200, 201)
    upload_body = upload.json()
@@ -57,11 +59,13 @@ async def test_file_unknown_id_returns_404(client):
@pytest.mark.asyncio
async def test_file_response_exposes_no_storage_details(client):
async def test_file_response_exposes_no_storage_details(authed_client):
    client, token = authed_client
    data = _minimal_webp()
    upload = await client.post(
        "/api/v1/images",
        files={"file": ("img.webp", io.BytesIO(data), "image/webp")},
        headers={"Authorization": f"Bearer {token}"},
    )
    assert upload.status_code in (200, 201)
    image_id = upload.json()["id"]
@@ -75,11 +79,13 @@ async def test_file_response_exposes_no_storage_details(client):
@pytest.mark.asyncio
async def test_thumbnail_returns_webp(client):
async def test_thumbnail_returns_webp(authed_client):
    client, token = authed_client
    data = _real_jpeg()
    upload = await client.post(
        "/api/v1/images",
        files={"file": ("t.jpg", io.BytesIO(data), "image/jpeg")},
        headers={"Authorization": f"Bearer {token}"},
    )
    assert upload.status_code == 201
    body = upload.json()
@@ -95,11 +101,13 @@ async def test_thumbnail_returns_webp(client):
@pytest.mark.asyncio
async def test_thumbnail_fallback_returns_original(client, db_session):
async def test_thumbnail_fallback_returns_original(authed_client, db_session):
    client, token = authed_client
    data = _real_jpeg()
    upload = await client.post(
        "/api/v1/images",
        files={"file": ("fallback.jpg", io.BytesIO(data), "image/jpeg")},
        headers={"Authorization": f"Bearer {token}"},
    )
    assert upload.status_code == 201
    image_id = upload.json()["id"]


@@ -31,12 +31,14 @@ def _minimal_png() -> bytes:
@pytest.mark.asyncio
async def test_upload_with_tags_persists_tags(client):
async def test_upload_with_tags_persists_tags(authed_client):
    client, token = authed_client
    data = _minimal_png()
    response = await client.post(
        "/api/v1/images",
        files={"file": ("img.png", io.BytesIO(data), "image/png")},
        data={"tags": "cat,funny"},
        headers={"Authorization": f"Bearer {token}"},
    )
    assert response.status_code == 201
    body = response.json()
@@ -44,12 +46,15 @@ async def test_upload_with_tags_persists_tags(client):
@pytest.mark.asyncio
async def test_duplicate_upload_tags_unchanged(client):
async def test_duplicate_upload_tags_unchanged(authed_client):
    client, token = authed_client
    headers = {"Authorization": f"Bearer {token}"}
    data = _minimal_png()
    r1 = await client.post(
        "/api/v1/images",
        files={"file": ("img.png", io.BytesIO(data), "image/png")},
        data={"tags": "original-tag"},
        headers=headers,
    )
    assert r1.status_code in (200, 201)
    original_tags = set(r1.json()["tags"])
@@ -58,6 +63,7 @@ async def test_duplicate_upload_tags_unchanged(client):
        "/api/v1/images",
        files={"file": ("img.png", io.BytesIO(data), "image/png")},
        data={"tags": "different-tag"},
        headers=headers,
    )
    assert r2.status_code == 200
    assert r2.json()["duplicate"] is True
@@ -65,18 +71,22 @@ async def test_duplicate_upload_tags_unchanged(client):
@pytest.mark.asyncio
async def test_patch_replaces_tag_set(client):
async def test_patch_replaces_tag_set(authed_client):
    client, token = authed_client
    headers = {"Authorization": f"Bearer {token}"}
    data = _minimal_png()
    r1 = await client.post(
        "/api/v1/images",
        files={"file": ("patch-test.png", io.BytesIO(data), "image/png")},
        data={"tags": "old-tag"},
        headers=headers,
    )
    image_id = r1.json()["id"]

    patch = await client.patch(
        f"/api/v1/images/{image_id}/tags",
        json={"tags": ["new-tag", "another"]},
        headers=headers,
    )
    assert patch.status_code == 200
    body = patch.json()
@@ -85,17 +95,21 @@ async def test_patch_replaces_tag_set(client):
@pytest.mark.asyncio
async def test_patch_invalid_tag_returns_422(client):
async def test_patch_invalid_tag_returns_422(authed_client):
    client, token = authed_client
    headers = {"Authorization": f"Bearer {token}"}
    data = _minimal_png()
    r1 = await client.post(
        "/api/v1/images",
        files={"file": ("invalid-tag-test.png", io.BytesIO(data), "image/png")},
        headers=headers,
    )
    image_id = r1.json()["id"]

    patch = await client.patch(
        f"/api/v1/images/{image_id}/tags",
        json={"tags": ["valid", "INVALID TAG WITH SPACES!"]},
        headers=headers,
    )
    assert patch.status_code == 422
    body = patch.json()
@@ -103,12 +117,14 @@ async def test_patch_invalid_tag_returns_422(client):
@pytest.mark.asyncio
async def test_list_tags_alphabetical_with_counts(client):
async def test_list_tags_alphabetical_with_counts(authed_client):
    client, token = authed_client
    data = _minimal_png()
    await client.post(
        "/api/v1/images",
        files={"file": ("tag-list-test.png", io.BytesIO(data), "image/png")},
        data={"tags": "zebra,apple"},
        headers={"Authorization": f"Bearer {token}"},
    )

    response = await client.get("/api/v1/tags")
    assert response.status_code == 200
@@ -121,12 +137,14 @@ async def test_list_tags_alphabetical_with_counts(client):
@pytest.mark.asyncio
async def test_list_tags_prefix_filter(client):
async def test_list_tags_prefix_filter(authed_client):
    client, token = authed_client
    data = _minimal_png()
    await client.post(
        "/api/v1/images",
        files={"file": ("prefix-test.png", io.BytesIO(data), "image/png")},
        data={"tags": "cat,catfish,caterpillar,dog"},
        headers={"Authorization": f"Bearer {token}"},
    )

    response = await client.get("/api/v1/tags?q=cat")
    assert response.status_code == 200
@@ -155,13 +173,16 @@ def _unique_png(seed: int) -> bytes:
@pytest.mark.asyncio
async def test_list_tags_sort_count_desc(client):
async def test_list_tags_sort_count_desc(authed_client):
    client, token = authed_client
    headers = {"Authorization": f"Bearer {token}"}
    # popular-sort-tag appears on 2 images, rare-sort-tag on 1 — verify count_desc ordering
    for seed in (100, 101):
        await client.post(
            "/api/v1/images",
            files={"file": (f"sort-{seed}.png", io.BytesIO(_unique_png(seed)), "image/png")},
            data={"tags": "popular-sort-tag,rare-sort-tag" if seed == 100 else "popular-sort-tag"},
            headers=headers,
        )

    response = await client.get("/api/v1/tags?sort=count_desc")
    assert response.status_code == 200
@@ -177,13 +198,16 @@ async def test_list_tags_sort_count_desc(client):
@pytest.mark.asyncio
async def test_list_tags_min_count_excludes_below_threshold(client):
async def test_list_tags_min_count_excludes_below_threshold(authed_client):
    client, token = authed_client
    headers = {"Authorization": f"Bearer {token}"}
    # common-min-tag appears on 2 images, uncommon-min-tag on 1
    for seed in (200, 201):
        await client.post(
            "/api/v1/images",
            files={"file": (f"min-{seed}.png", io.BytesIO(_unique_png(seed)), "image/png")},
            data={"tags": "common-min-tag,uncommon-min-tag" if seed == 200 else "common-min-tag"},
            headers=headers,
        )

    # min_count=2 should exclude uncommon-min-tag (count=1) but keep common-min-tag (count=2)
    response = await client.get("/api/v1/tags?min_count=2")


@@ -6,6 +6,7 @@ T029 — file > MAX_UPLOAD_BYTES → 422 file_too_large
 T079 — GET /api/v1/images/{id} 404 → error envelope shape
 """
 import io
+import uuid
 from unittest.mock import patch
 import pytest
@@ -27,11 +28,13 @@ def _minimal_jpeg() -> bytes:
 @pytest.mark.asyncio
-async def test_upload_new_image_returns_201(client):
+async def test_upload_new_image_returns_201(authed_client):
+    client, token = authed_client
     data = _minimal_jpeg()
     response = await client.post(
         "/api/v1/images",
         files={"file": ("test.jpg", io.BytesIO(data), "image/jpeg")},
+        headers={"Authorization": f"Bearer {token}"},
     )
     assert response.status_code == 201
     body = response.json()
@@ -44,12 +47,15 @@ async def test_upload_new_image_returns_201(client):
 @pytest.mark.asyncio
-async def test_upload_duplicate_returns_200_with_flag(client):
+async def test_upload_duplicate_returns_200_with_flag(authed_client):
+    client, token = authed_client
     data = _minimal_jpeg()
+    headers = {"Authorization": f"Bearer {token}"}
     # First upload
     r1 = await client.post(
         "/api/v1/images",
         files={"file": ("test.jpg", io.BytesIO(data), "image/jpeg")},
+        headers=headers,
     )
     assert r1.status_code in (200, 201)
@@ -57,6 +63,7 @@ async def test_upload_duplicate_returns_200_with_flag(client):
     r2 = await client.post(
         "/api/v1/images",
         files={"file": ("test.jpg", io.BytesIO(data), "image/jpeg")},
+        headers=headers,
     )
     assert r2.status_code == 200
     body = r2.json()
@@ -65,10 +72,12 @@ async def test_upload_duplicate_returns_200_with_flag(client):
 @pytest.mark.asyncio
-async def test_upload_invalid_mime_type_returns_422(client):
+async def test_upload_invalid_mime_type_returns_422(authed_client):
+    client, token = authed_client
     response = await client.post(
         "/api/v1/images",
         files={"file": ("doc.pdf", io.BytesIO(b"%PDF-1.4"), "application/pdf")},
+        headers={"Authorization": f"Bearer {token}"},
     )
     assert response.status_code == 422
     body = response.json()
@@ -77,11 +86,12 @@ async def test_upload_invalid_mime_type_returns_422(client):
 @pytest.mark.asyncio
-async def test_upload_oversized_file_returns_422(client):
+async def test_upload_oversized_file_returns_422(authed_client):
     import os
     from app.config import get_settings
+    client, token = authed_client
     os.environ["MAX_UPLOAD_BYTES"] = "10"
     get_settings.cache_clear()
@@ -89,6 +99,7 @@ async def test_upload_oversized_file_returns_422(client):
     response = await client.post(
         "/api/v1/images",
         files={"file": ("big.jpg", io.BytesIO(b"x" * 11), "image/jpeg")},
+        headers={"Authorization": f"Bearer {token}"},
     )
     assert response.status_code == 422
     body = response.json()
@@ -100,7 +111,6 @@ async def test_upload_oversized_file_returns_422(client):
 @pytest.mark.asyncio
 async def test_get_unknown_image_returns_404_with_envelope(client):
-    import uuid
     response = await client.get(f"/api/v1/images/{uuid.uuid4()}")
     assert response.status_code == 404
     body = response.json()
@@ -109,11 +119,13 @@ async def test_get_unknown_image_returns_404_with_envelope(client):
 @pytest.mark.asyncio
-async def test_upload_returns_thumbnail_key(client):
+async def test_upload_returns_thumbnail_key(authed_client):
+    client, token = authed_client
     data = _real_jpeg(color=(100, 150, 200))
     response = await client.post(
         "/api/v1/images",
         files={"file": ("thumb_test.jpg", io.BytesIO(data), "image/jpeg")},
+        headers={"Authorization": f"Bearer {token}"},
     )
     assert response.status_code == 201
     body = response.json()
@@ -123,17 +135,21 @@ async def test_upload_returns_thumbnail_key(client):
 @pytest.mark.asyncio
-async def test_duplicate_upload_reuses_thumbnail_key(client):
+async def test_duplicate_upload_reuses_thumbnail_key(authed_client):
+    client, token = authed_client
+    headers = {"Authorization": f"Bearer {token}"}
     data = _real_jpeg(color=(200, 100, 50))
     r1 = await client.post(
         "/api/v1/images",
         files={"file": ("dup.jpg", io.BytesIO(data), "image/jpeg")},
+        headers=headers,
     )
     assert r1.status_code in (200, 201)
     r2 = await client.post(
         "/api/v1/images",
         files={"file": ("dup.jpg", io.BytesIO(data), "image/jpeg")},
+        headers=headers,
     )
     assert r2.status_code == 200
@@ -144,12 +160,14 @@ async def test_duplicate_upload_reuses_thumbnail_key(client):
 @pytest.mark.asyncio
-async def test_upload_succeeds_when_thumbnail_fails(client):
+async def test_upload_succeeds_when_thumbnail_fails(authed_client):
+    client, token = authed_client
     data = _real_jpeg(color=(50, 200, 150))
     with patch("app.routers.images.generate_thumbnail", side_effect=RuntimeError("simulated")):
         response = await client.post(
             "/api/v1/images",
             files={"file": ("no_thumb.jpg", io.BytesIO(data), "image/jpeg")},
+            headers={"Authorization": f"Bearer {token}"},
         )
     assert response.status_code in (200, 201)
     body = response.json()
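The `authed_client` fixture these tests now use is not part of this diff. Assuming it obtains a Bearer token by posting the owner credentials to `POST /api/v1/auth/token` (the fixture wiring, field names, and credential source are assumptions, not shown here), its core logic might be sketched as a plain helper:

```python
async def login_and_get_token(client, username: str, password: str) -> str:
    """Log in against the token endpoint and return the JWT access token."""
    response = await client.post(
        "/api/v1/auth/token",
        json={"username": username, "password": password},
    )
    response.raise_for_status()  # surface auth failures immediately in tests
    return response.json()["access_token"]
```

A pytest fixture could call this helper with the test owner credentials and yield `(client, token)`, matching the `client, token = authed_client` unpacking seen in the diff above.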

View File

@@ -0,0 +1,98 @@
import ipaddress
from unittest.mock import MagicMock

from starlette.requests import Request

from app.auth.rate_limiter import LoginRateLimiter, get_client_ip

# ---------------------------------------------------------------------------
# LoginRateLimiter tests
# ---------------------------------------------------------------------------

def make_limiter():
    return LoginRateLimiter(max_failures=3, window_seconds=60, cooldown_seconds=300)

def test_not_blocked_initially():
    assert make_limiter().is_blocked("1.2.3.4") is False

def test_blocked_after_threshold():
    limiter = make_limiter()
    for _ in range(3):
        limiter.record_failure("1.2.3.4")
    assert limiter.is_blocked("1.2.3.4") is True

def test_success_clears_failures():
    limiter = make_limiter()
    limiter.record_failure("1.2.3.4")
    limiter.record_failure("1.2.3.4")
    limiter.record_success("1.2.3.4")
    assert limiter.is_blocked("1.2.3.4") is False

def test_ips_are_isolated():
    limiter = make_limiter()
    for _ in range(3):
        limiter.record_failure("1.1.1.1")
    assert limiter.is_blocked("2.2.2.2") is False

def test_window_resets_after_expiry():
    import time
    limiter = LoginRateLimiter(max_failures=3, window_seconds=0, cooldown_seconds=300)
    limiter.record_failure("1.2.3.4")
    limiter.record_failure("1.2.3.4")
    time.sleep(0.01)
    limiter.record_failure("1.2.3.4")
    # window expired — counter reset on third call, so failures = 1, not 3
    assert limiter.is_blocked("1.2.3.4") is False

def test_log_warning_on_lockout(caplog):
    import logging
    limiter = make_limiter()
    with caplog.at_level(logging.WARNING, logger="app.auth.rate_limiter"):
        for _ in range(3):
            limiter.record_failure("5.6.7.8")
    assert "Login blocked" in caplog.text
    assert "5.6.7.8" in caplog.text

# ---------------------------------------------------------------------------
# get_client_ip tests
# ---------------------------------------------------------------------------

def make_request(peer: str, headers: dict) -> MagicMock:
    req = MagicMock(spec=Request)
    req.client.host = peer
    req.headers = headers
    return req

def test_get_client_ip_no_trusted_networks_returns_peer():
    req = make_request("203.0.113.1", {"X-Forwarded-For": "10.0.0.1"})
    assert get_client_ip(req, []) == "203.0.113.1"

def test_get_client_ip_trusted_peer_uses_xff():
    req = make_request("10.0.0.1", {"X-Forwarded-For": "203.0.113.5"})
    nets = [ipaddress.ip_network("10.0.0.0/8")]
    assert get_client_ip(req, nets) == "203.0.113.5"

def test_get_client_ip_untrusted_peer_ignores_xff():
    req = make_request("8.8.8.8", {"X-Forwarded-For": "203.0.113.5"})
    nets = [ipaddress.ip_network("10.0.0.0/8")]
    assert get_client_ip(req, nets) == "8.8.8.8"

def test_get_client_ip_trusted_peer_falls_back_to_real_ip():
    req = make_request("10.0.0.1", {"X-Real-IP": "203.0.113.9"})
    nets = [ipaddress.ip_network("10.0.0.0/8")]
    assert get_client_ip(req, nets) == "203.0.113.9"
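The module under test is not included in this listing. A minimal in-memory sketch consistent with the assertions above (the real `app.auth.rate_limiter` may differ in details such as locking, persistence, or configuration wiring) could look like:

```python
import ipaddress
import logging
import time

logger = logging.getLogger("app.auth.rate_limiter")

class LoginRateLimiter:
    """Track consecutive login failures per source IP, in memory."""

    def __init__(self, max_failures: int, window_seconds: float, cooldown_seconds: float):
        self.max_failures = max_failures
        self.window_seconds = window_seconds
        self.cooldown_seconds = cooldown_seconds
        self._failures: dict[str, tuple[int, float]] = {}   # ip -> (count, window start)
        self._blocked_until: dict[str, float] = {}          # ip -> unblock timestamp

    def is_blocked(self, ip: str) -> bool:
        until = self._blocked_until.get(ip)
        if until is None:
            return False
        if time.monotonic() >= until:                       # cooldown elapsed
            self._blocked_until.pop(ip, None)
            self._failures.pop(ip, None)
            return False
        return True

    def record_failure(self, ip: str) -> None:
        now = time.monotonic()
        count, start = self._failures.get(ip, (0, now))
        if now - start > self.window_seconds:               # window expired: start over
            count, start = 0, now
        count += 1
        self._failures[ip] = (count, start)
        if count >= self.max_failures:
            self._blocked_until[ip] = now + self.cooldown_seconds
            logger.warning("Login blocked for %s after %d failures", ip, count)

    def record_success(self, ip: str) -> None:
        self._failures.pop(ip, None)
        self._blocked_until.pop(ip, None)

def get_client_ip(request, trusted_networks) -> str:
    """Honour X-Forwarded-For / X-Real-IP only when the peer is a trusted proxy."""
    peer = request.client.host
    if any(ipaddress.ip_address(peer) in net for net in trusted_networks):
        forwarded = request.headers.get("X-Forwarded-For")
        if forwarded:
            return forwarded.split(",")[0].strip()          # left-most entry = original client
        real_ip = request.headers.get("X-Real-IP")
        if real_ip:
            return real_ip
    return peer
```

Note the proxy-header rule: headers are only trusted when the immediate peer is inside a configured trusted network, which is exactly what the four `get_client_ip` tests assert.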

docker-compose.test.yml Normal file
View File

@@ -0,0 +1,67 @@
services:
  postgres-test:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: reactbin
      POSTGRES_PASSWORD: reactbin
      POSTGRES_DB: reactbin_test
    ports:
      - "5433:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U reactbin"]
      interval: 5s
      timeout: 5s
      retries: 5

  minio-test:
    image: minio/minio:latest
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    ports:
      - "9002:9000"
      - "9003:9001"
    healthcheck:
      test: ["CMD", "mc", "ready", "local"]
      interval: 5s
      timeout: 5s
      retries: 5

  minio-init-test:
    image: minio/mc:latest
    depends_on:
      minio-test:
        condition: service_healthy
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    entrypoint: >
      /bin/sh -c "
      mc alias set local http://minio-test:9000 $$MINIO_ROOT_USER $$MINIO_ROOT_PASSWORD &&
      mc mb --ignore-existing local/reactbin-test
      "

  api-test:
    build:
      context: ./api
    environment:
      TEST_DATABASE_URL: postgresql+asyncpg://reactbin:reactbin@postgres-test:5432/reactbin_test
      DATABASE_URL: postgresql+asyncpg://reactbin:reactbin@postgres-test:5432/reactbin_test
      S3_ENDPOINT_URL: http://minio-test:9000
      S3_BUCKET_NAME: reactbin-test
      S3_ACCESS_KEY_ID: minioadmin
      S3_SECRET_ACCESS_KEY: minioadmin
      S3_REGION: us-east-1
      JWT_SECRET_KEY: test-secret-key-for-testing-only
      OWNER_USERNAME: testowner
      OWNER_PASSWORD: testpassword
      API_BASE_URL: http://localhost:8000
      MAX_UPLOAD_BYTES: "52428800"
    depends_on:
      postgres-test:
        condition: service_healthy
      minio-init-test:
        condition: service_completed_successfully
    command: ["python", "-m", "pytest", "tests/", "-v"]
    working_dir: /app

View File

@@ -0,0 +1,236 @@
# Implementation Plan: PostgreSQL Integration Test Infrastructure
**Branch**: `master` | **Date**: 2026-05-06 | **Spec**: specs/008-postgres-integration-tests/spec.md
**Input**: Feature specification from `specs/008-postgres-integration-tests/spec.md`
---
## Summary
Enforce the constitution's PostgreSQL mandate (§2.5, §5.2 v1.3.0) for integration tests. Three concrete deliverables: (1) a fast-fail guard in `conftest.py` that rejects non-PostgreSQL URLs before any test collects, (2) a `docker-compose.test.yml` that provides isolated `postgres-test` and `minio-test` services and an `api-test` runner, and (3) a `Makefile` + `.env.test.example` that document the canonical test commands.
---
## Technical Context
**Language/Version**: Python 3.12, Docker Compose v2
**Primary Dependencies**: pytest, pytest-asyncio, asyncpg, SQLAlchemy 2.x (all already in `pyproject.toml [dev]`)
**Storage**: PostgreSQL 16-alpine (test instance), MinIO (test instance)
**Testing**: pytest — this feature *is* the test infrastructure change
**Target Platform**: Developer workstation (Linux/macOS) with Docker
**Project Type**: Infrastructure / developer-experience
**Performance Goals**: Guard exits in < 2 s; full integration suite continues to run in < 60 s
**Constraints**: Must not break the existing dev compose stack; no changes to application source code
**Scale/Scope**: One guard, one compose file, one Makefile, one env example
---
## Constitution Check
| Principle | Status | Notes |
|-----------|--------|-------|
| §2.5 Database abstraction — no alternative DB in integration tests | ✅ ENFORCED | This feature implements the enforcement |
| §5.1 TDD — failing test before implementation | ✅ | Guard itself is tested by running with a bad URL before adding the guard |
| §5.2 Test pyramid — integration tests use real PostgreSQL | ✅ ENFORCED | docker-compose.test.yml provides the real instance |
| §5.4 CI must pass before task is done | ✅ | Verified by running the full suite via compose |
| §6 Tech stack — asyncpg driver, Docker Compose | ✅ | No new technologies introduced |
| §7.1 One-command local start | ✅ | `docker compose -f docker-compose.test.yml run --rm api-test` |
| §7.2 Environment config via env vars | ✅ | .env.test.example documents all vars |
| §7.3 Linting not optional | ✅ | ruff will run as part of task validation |
No violations.
---
## Project Structure
### Documentation (this feature)
```text
specs/008-postgres-integration-tests/
├── plan.md ← this file
├── research.md ← decisions made above
├── spec.md ← feature specification
└── tasks.md ← generated by /speckit-tasks
```
### Source changes
```text
# New files
docker-compose.test.yml ← isolated test services + api-test runner
.env.test.example ← documents test environment variables
Makefile ← test-unit / test-integration targets
# Modified files
api/tests/integration/conftest.py ← add postgresql+asyncpg:// dialect guard
```
No application source files (`api/app/`) are modified. No UI files are touched.
---
## Detailed Design
### 1. conftest.py — dialect guard
Add a module-level `pytest_configure` hook at the top of `api/tests/integration/conftest.py`. It resolves the database URL with the same precedence as the `engine` fixture (prefer `TEST_DATABASE_URL`, then fall back to `DATABASE_URL`) and calls `pytest.exit()` if the scheme is not `postgresql+asyncpg`:
```python
import os

import pytest

def pytest_configure(config):
    db_url = os.getenv("TEST_DATABASE_URL") or os.getenv("DATABASE_URL", "")
    if not db_url.startswith("postgresql+asyncpg://"):
        pytest.exit(
            "Integration tests require a PostgreSQL database "
            "(postgresql+asyncpg://...). "
            "Set TEST_DATABASE_URL or DATABASE_URL accordingly. "
            f"Got: {db_url!r}",
            returncode=1,
        )
```
The hook runs before any fixture or collection, giving an immediate, unambiguous error.
**Note**: This guard goes in `api/tests/integration/conftest.py` only, not in `api/tests/conftest.py`, so that unit tests (which use no database) are unaffected.
### 2. docker-compose.test.yml
```yaml
services:
  postgres-test:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: reactbin
      POSTGRES_PASSWORD: reactbin
      POSTGRES_DB: reactbin_test
    ports:
      - "5433:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U reactbin"]
      interval: 5s
      timeout: 5s
      retries: 5

  minio-test:
    image: minio/minio:latest
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    ports:
      - "9002:9000"
      - "9003:9001"
    healthcheck:
      test: ["CMD", "mc", "ready", "local"]
      interval: 5s
      timeout: 5s
      retries: 5

  minio-init-test:
    image: minio/mc:latest
    depends_on:
      minio-test:
        condition: service_healthy
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    entrypoint: >
      /bin/sh -c "
      mc alias set local http://minio-test:9000 $$MINIO_ROOT_USER $$MINIO_ROOT_PASSWORD &&
      mc mb --ignore-existing local/reactbin-test
      "

  api-test:
    build:
      context: ./api
    environment:
      TEST_DATABASE_URL: postgresql+asyncpg://reactbin:reactbin@postgres-test:5432/reactbin_test
      DATABASE_URL: postgresql+asyncpg://reactbin:reactbin@postgres-test:5432/reactbin_test
      S3_ENDPOINT_URL: http://minio-test:9000
      S3_BUCKET_NAME: reactbin-test
      S3_ACCESS_KEY_ID: minioadmin
      S3_SECRET_ACCESS_KEY: minioadmin
      S3_REGION: us-east-1
      JWT_SECRET_KEY: test-secret-key-for-testing-only
      OWNER_USERNAME: testowner
      OWNER_PASSWORD: testpassword
      API_BASE_URL: http://localhost:8000
      MAX_UPLOAD_BYTES: "52428800"
    depends_on:
      postgres-test:
        condition: service_healthy
      minio-init-test:
        condition: service_completed_successfully
    command: ["python", "-m", "pytest", "tests/", "-v"]
    working_dir: /app
```
### 3. .env.test.example
Documents the variables needed to run integration tests from the host (with postgres-test and minio-test already running via compose):
```bash
# Integration test environment — used when running pytest directly on the host
# Start test services first: docker compose -f docker-compose.test.yml up -d postgres-test minio-test minio-init-test
TEST_DATABASE_URL=postgresql+asyncpg://reactbin:reactbin@localhost:5433/reactbin_test
DATABASE_URL=postgresql+asyncpg://reactbin:reactbin@localhost:5433/reactbin_test
S3_ENDPOINT_URL=http://localhost:9002
S3_BUCKET_NAME=reactbin-test
S3_ACCESS_KEY_ID=minioadmin
S3_SECRET_ACCESS_KEY=minioadmin
S3_REGION=us-east-1
JWT_SECRET_KEY=test-secret-key-for-testing-only
OWNER_USERNAME=testowner
OWNER_PASSWORD=testpassword
API_BASE_URL=http://localhost:8000
MAX_UPLOAD_BYTES=52428800
```
### 4. Makefile
```makefile
.PHONY: test-unit test-integration

test-unit:
	cd api && python -m pytest tests/unit/ -v

test-integration:
	docker compose -f docker-compose.test.yml run --rm api-test
```
---
## Phase Breakdown
### Phase 1: Guard (FR-001) — US1
- Write a failing test: run `pytest api/tests/integration/` with `TEST_DATABASE_URL=sqlite+aiosqlite:///test.db` — confirm it does NOT exit early (test that the guard is absent)
- Add `pytest_configure` guard to `api/tests/integration/conftest.py`
- Verify: running with SQLite URL now exits immediately with the correct message
- Verify: running with a PostgreSQL URL proceeds normally
### Phase 2: Docker Compose test stack (FR-002, FR-003) — US2
- Write `docker-compose.test.yml` with `postgres-test`, `minio-test`, `minio-init-test`, `api-test`
- Run `docker compose -f docker-compose.test.yml run --rm api-test` — all tests pass
- Confirm dev stack (port 5432, 9000) is unaffected
### Phase 3: Documentation (FR-004, FR-005) — US3
- Write `.env.test.example`
- Write `Makefile` with `test-unit` and `test-integration`
- Verify `make test-unit` runs unit tests without Docker
- Verify `make test-integration` invokes the compose command
### Phase 4: Polish
- `ruff check api/app/ api/tests/` — zero violations
- `ng lint` is unaffected (no UI changes)
---
## No data model or API contracts
This feature touches only developer tooling. No new API endpoints, database schema changes, or UI components.

View File

@@ -0,0 +1,38 @@
# Quickstart: Integration Test Infrastructure
## Run the full integration test suite (Docker, recommended)
```bash
docker compose -f docker-compose.test.yml run --rm api-test
```
Test services start automatically. The command exits with pytest's return code.
## Run unit tests only (no Docker required)
```bash
make test-unit
# or directly:
cd api && python -m pytest tests/unit/ -v
```
## Run integration tests from the host (test services must be running)
```bash
# Start test services
docker compose -f docker-compose.test.yml up -d postgres-test minio-test minio-init-test
# Copy and source test env vars
cp .env.test.example .env.test
export $(grep -v '^#' .env.test | xargs)  # skip comment lines, which would break export
# Run tests
cd api && python -m pytest tests/integration/ -v
```
## Validate the guard works
```bash
TEST_DATABASE_URL=sqlite+aiosqlite:///test.db python -m pytest api/tests/integration/
# Expected: exits immediately with "Integration tests require postgresql+asyncpg://"
```

View File

@@ -0,0 +1,55 @@
# Research: PostgreSQL Integration Test Infrastructure
## Decision 1: How to enforce the PostgreSQL dialect in conftest.py
**Decision**: Add a `pytest_configure` hook (or a module-level guard in `conftest.py`) that calls `pytest.exit()` if the resolved database URL does not start with `postgresql+asyncpg://`.
**Rationale**: `pytest_configure` runs before collection, giving the clearest possible failure signal. A module-level assertion would also work but produces a less readable traceback. `pytest.exit()` with a human-readable message is the idiomatic approach.
**Alternatives considered**:
- A custom pytest plugin in a separate file — unnecessary complexity for a one-liner guard.
- Raising an exception in the `engine` fixture — runs too late (after collection); developers see confusing fixture errors instead of a clear message.
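As a dependency-free illustration of the chosen check (the real hook additionally calls `pytest.exit()`; the resolution order and scheme prefix below mirror the decision, while the helper names are invented for this sketch):

```python
REQUIRED_SCHEME = "postgresql+asyncpg://"

def resolve_test_db_url(environ: dict) -> str:
    """Resolution order used by the guard: TEST_DATABASE_URL, then DATABASE_URL."""
    return environ.get("TEST_DATABASE_URL") or environ.get("DATABASE_URL", "")

def is_allowed(url: str) -> bool:
    # Plain string matching is enough: any other dialect (or an unset URL) fails fast.
    return url.startswith(REQUIRED_SCHEME)

assert is_allowed("postgresql+asyncpg://reactbin:reactbin@localhost:5433/reactbin_test")
assert not is_allowed("sqlite+aiosqlite:///test.db")
assert not is_allowed("")
```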
---
## Decision 2: Separate docker-compose.test.yml vs profiles in docker-compose.yml
**Decision**: Use a standalone `docker-compose.test.yml` at the repo root.
**Rationale**: Docker Compose profiles require the developer to remember `--profile test` on every command. A separate file is explicit and self-contained. The test file can define its own service names and ports without touching the dev compose file at all.
**Alternatives considered**:
- `docker-compose.yml` with a `test` profile — profile discovery is non-obvious; modifying the dev file risks breaking the dev stack.
- A `docker-compose.override.yml` — override files apply automatically to `docker compose up`, which is the opposite of what we want for tests.
---
## Decision 3: Port assignments for test services
**Decision**:
- `postgres-test`: host port 5433 (standard offset from dev 5432)
- `minio-test` API: host port 9002 (offset from dev 9000)
- `minio-test` console: host port 9003 (offset from dev 9001)
**Rationale**: Predictable offsets make it easy to remember. Developers running both stacks simultaneously won't hit port conflicts.
---
## Decision 4: S3 isolation strategy for tests
**Decision**: The `api-test` service sets `S3_BUCKET_NAME=reactbin-test` pointing to the dedicated `minio-test` instance. The `minio-init-test` sidecar creates that bucket before tests run.
**Rationale**: The existing conftest already manages database isolation via `create_all` / `drop_all`. MinIO requires bucket pre-creation (same as dev). A dedicated test bucket on a dedicated test MinIO instance gives full isolation. No changes to application storage code are needed.
---
## Decision 5: Makefile vs shell scripts
**Decision**: A `Makefile` at the repo root with `test-unit` and `test-integration` targets.
**Rationale**: `make` is universally available on Linux/macOS developer machines. The targets are short wrappers that document the canonical test invocation. No build logic; just convenience aliases.
**Alternatives considered**:
- Shell scripts (`scripts/test.sh`) — no discoverability; `make help` is more ergonomic.
- `package.json` scripts — wrong tool for a Python/Docker project.
- `justfile` — not universally installed.

View File

@@ -0,0 +1,95 @@
# Feature Specification: PostgreSQL Integration Test Infrastructure
**Feature Branch**: `008-postgres-integration-tests`
**Created**: 2026-05-06
**Status**: Draft
---
## Overview
Integration tests currently permit any SQLAlchemy-compatible database URL, including SQLite. This allowed a real production bug (incorrect `HAVING` without `GROUP BY`) to ship undetected because SQLite's permissive dialect did not reject it. The project constitution (§2.5, §5.2 v1.3.0) now explicitly mandates PostgreSQL for integration tests. This feature enforces that mandate with infrastructure and guardrails.
---
## User Scenarios & Testing
### User Story 1 — Integration tests are enforced to run against PostgreSQL (Priority: P1)
A developer running `pytest` against a non-PostgreSQL database URL receives an immediate, descriptive failure before any test runs.
**Why this priority**: Directly addresses the production bug that prompted this feature. Without this, the constitution mandate has no teeth.
**Independent Test**: Set `TEST_DATABASE_URL=sqlite+aiosqlite:///test.db` and run `pytest api/tests/integration/`. Confirm pytest exits immediately with a message identifying the dialect problem and naming the required scheme.
**Acceptance Scenarios**:
1. **Given** `TEST_DATABASE_URL` is set to a SQLite URL, **When** `pytest api/tests/integration/` is invoked, **Then** pytest exits before collecting any test with an error: `Integration tests require postgresql+asyncpg://`.
2. **Given** `DATABASE_URL` is unset and `TEST_DATABASE_URL` is unset, **When** pytest is invoked, **Then** pytest exits with a clear message about the missing database URL.
3. **Given** `TEST_DATABASE_URL` is a valid `postgresql+asyncpg://` URL, **When** pytest is invoked, **Then** tests collect and run normally.
---
### User Story 2 — One-command integration test run against isolated services (Priority: P1)
A developer can run the entire integration test suite against dedicated, isolated PostgreSQL and MinIO instances with a single command.
**Why this priority**: Without this, the PostgreSQL requirement is mandated but impractical — developers have no easy way to satisfy it.
**Independent Test**: From the repo root with Docker available, run `docker compose -f docker-compose.test.yml run --rm api-test`. Confirm all integration tests pass, test containers start and stop cleanly, and dev database/bucket are untouched.
**Acceptance Scenarios**:
1. **Given** Docker is running and dev services are stopped, **When** the test command is run, **Then** isolated `postgres-test` and `minio-test` services start, all tests run against them, and the command exits with pytest's return code.
2. **Given** dev services are running on their normal ports, **When** the test command is run, **Then** test services use different ports (5433, 9002/9003) and do not interfere with the dev stack.
3. **Given** any test data is written during the run, **When** the test run completes, **Then** all test schema is dropped (conftest teardown is unchanged).
---
### User Story 3 — Test infrastructure is documented (Priority: P2)
A developer new to the project can understand how to run unit tests vs integration tests without reading the source code.
**Independent Test**: Read `.env.test.example` and `Makefile`. Confirm all required environment variables are documented and `make test-unit` / `make test-integration` targets are present.
**Acceptance Scenarios**:
1. **Given** a fresh clone, **When** the developer reads `.env.test.example`, **Then** they see every variable needed to run integration tests outside Docker, with example values.
2. **Given** the Makefile, **When** the developer runs `make test-unit`, **Then** the pytest unit suite runs without requiring Docker.
3. **Given** the Makefile, **When** the developer runs `make test-integration`, **Then** the Docker Compose test command runs.
---
### Edge Cases
- What if `TEST_DATABASE_URL` is set but malformed? — The guard should still catch a non-PostgreSQL scheme; asyncpg will raise its own error for a malformed URL.
- What if Docker is not available? — `make test-integration` fails at the Docker level with Docker's own error; the Makefile does not need to guard for this.
- What if the test PostgreSQL port (5433) is already in use? — Standard Docker port conflict error; no special handling needed.
---
## Requirements
### Functional Requirements
- **FR-001**: `conftest.py` MUST assert the resolved database URL starts with `postgresql+asyncpg://` and call `pytest.exit()` with a descriptive message before any test collects.
- **FR-002**: A `docker-compose.test.yml` MUST define isolated `postgres-test` (port 5433) and `minio-test` (ports 9002/9003) services and an `api-test` runner service.
- **FR-003**: The `api-test` service MUST set `TEST_DATABASE_URL` pointing to `postgres-test` and all S3 env vars pointing to `minio-test`.
- **FR-004**: A `.env.test.example` MUST document all environment variables required to run integration tests outside Docker.
- **FR-005**: A `Makefile` MUST provide `test-unit` and `test-integration` targets.
---
## Success Criteria
- **SC-001**: Running `pytest api/tests/integration/` with a SQLite URL exits in under 2 seconds with a clear error message — no tests run.
- **SC-002**: `docker compose -f docker-compose.test.yml run --rm api-test` completes successfully with all integration tests passing.
- **SC-003**: Dev services (postgres on 5432, minio on 9000) are unaffected when the test command runs.
---
## Assumptions
- Docker Compose v2 (`docker compose`) is available in the developer environment.
- The existing `conftest.py` `engine` fixture (session-scoped `create_all` / `drop_all`) continues to handle schema lifecycle; no per-test transaction rollback mechanism is introduced.
- CI/CD pipeline configuration is out of scope for this feature.

View File

@@ -0,0 +1,113 @@
# Tasks: PostgreSQL Integration Test Infrastructure
**Input**: Design documents from `specs/008-postgres-integration-tests/`
**Prerequisites**: plan.md ✅, spec.md ✅, research.md ✅, quickstart.md ✅
**Tests**: TDD is non-negotiable (§5.1). For infrastructure tasks the "failing test" is a verification step that confirms the thing being built is absent before building it, then confirms it works after. Every user story has an explicit TDD red step before its implementation task.
**Organization**: No foundational blocking phase — all three user stories touch independent files and can proceed in order.
## Format: `[ID] [P?] [Story] Description`
- **[P]**: Can run in parallel with other [P] tasks in the same phase
- **[Story]**: Which user story this task belongs to
- Exact file paths included in every task description
---
## Phase 1: Setup
No new project structure required. The existing layout accommodates all changes.
---
## Phase 2: User Story 1 — Dialect guard in conftest (Priority: P1) 🎯 MVP
**Goal**: `pytest api/tests/integration/` exits immediately with a clear message if the database URL is not `postgresql+asyncpg://`.
**Independent Test**: Run `TEST_DATABASE_URL=sqlite+aiosqlite:///test.db python -m pytest api/tests/integration/ -q` — command exits in < 2 s with the error message `Integration tests require postgresql+asyncpg://` and no tests are collected.
- [X] T001 [US1] Confirm guard is absent (TDD red): from `api/`, run `TEST_DATABASE_URL=sqlite+aiosqlite:///test.db python -m pytest tests/integration/ -q --co 2>&1 | head -20` — observe that tests ARE collected and note the count (guard not yet in place)
- [X] T002 [US1] Add `pytest_configure` hook to `api/tests/integration/conftest.py` — resolve URL via `os.getenv("TEST_DATABASE_URL") or os.getenv("DATABASE_URL", "")`, call `pytest.exit("Integration tests require postgresql+asyncpg://...", returncode=1)` if URL does not start with `postgresql+asyncpg://`; place hook before any imports that depend on the database URL
- [X] T003 [US1] Verify guard works (TDD green): run `TEST_DATABASE_URL=sqlite+aiosqlite:///test.db python -m pytest api/tests/integration/ -q` — confirm immediate exit with the correct error message and zero tests collected; also confirm a valid `postgresql+asyncpg://` URL does not trigger the guard
**Checkpoint**: Dialect-mismatched test runs are blocked before any test collects.
---
## Phase 3: User Story 2 — Docker Compose test stack (Priority: P1)
**Goal**: `docker compose -f docker-compose.test.yml run --rm api-test` runs the full integration suite against isolated PostgreSQL and MinIO services on different ports than the dev stack.
**Independent Test**: Run `docker compose -f docker-compose.test.yml run --rm api-test` from the repo root — all tests pass; verify `docker compose ps` shows dev services (if running) are unaffected on their original ports.
- [X] T004 [US2] Confirm compose file is absent (TDD red): run `test -f docker-compose.test.yml && echo EXISTS || echo ABSENT` — confirm output is `ABSENT`
- [X] T005 [US2] Create `docker-compose.test.yml` at the repo root with four services: `postgres-test` (image `postgres:16-alpine`, host port 5433, db `reactbin_test`), `minio-test` (image `minio/minio:latest`, host ports 9002/9003), `minio-init-test` (creates bucket `reactbin-test`, depends on `minio-test` healthy), and `api-test` (builds from `./api`, runs `python -m pytest tests/ -v`, depends on `postgres-test` healthy and `minio-init-test` completed, environment sets `TEST_DATABASE_URL=postgresql+asyncpg://reactbin:reactbin@postgres-test:5432/reactbin_test`, `DATABASE_URL` to same value, and all S3 vars pointing to `minio-test:9000` with bucket `reactbin-test`) — follow exact design in `specs/008-postgres-integration-tests/plan.md`
- [X] T006 [US2] Verify compose stack (TDD green): run `docker compose -f docker-compose.test.yml run --rm api-test` — confirm all integration tests pass; confirm no errors about missing env vars or connection failures
**Checkpoint**: Full integration suite runs against real PostgreSQL via one command.
---
## Phase 4: User Story 3 — Test documentation (Priority: P2)
**Goal**: `.env.test.example` and `Makefile` document how to run both test tiers.
**Independent Test**: Read `.env.test.example` — all variables needed for integration tests are present with example values. Run `make test-unit` — pytest unit suite runs without Docker and passes.
- [X] T007 [P] [US3] Create `.env.test.example` at the repo root documenting all variables required to run integration tests outside Docker: `TEST_DATABASE_URL`, `DATABASE_URL`, `S3_ENDPOINT_URL`, `S3_BUCKET_NAME`, `S3_ACCESS_KEY_ID`, `S3_SECRET_ACCESS_KEY`, `S3_REGION`, `JWT_SECRET_KEY`, `OWNER_USERNAME`, `OWNER_PASSWORD`, `API_BASE_URL`, `MAX_UPLOAD_BYTES` — with example values pointing to `localhost:5433` and `localhost:9002` (test service ports); include a comment explaining how to start test services first — follow exact design in `specs/008-postgres-integration-tests/plan.md`
- [X] T008 [P] [US3] Create `Makefile` at the repo root with `.PHONY: test-unit test-integration`, `test-unit` target running `cd api && python -m pytest tests/unit/ -v`, and `test-integration` target running `docker compose -f docker-compose.test.yml run --rm api-test`
- [X] T009 [US3] Verify `make test-unit` — unit tests pass without Docker (validates the Makefile target and confirms unit tests have no Docker dependency)
- [X] T010 Verify `make test-integration` — Docker integration suite passes end-to-end (cross-story verification: exercises the US2 compose stack via the US3 Makefile target)
**Checkpoint**: All three user stories independently functional.
---
## Phase 5: Polish & Cross-Cutting Concerns
- [X] T011 Run `ruff check api/app/ api/tests/` — zero violations (conftest change must pass ruff; fix any issues)
---
## Dependencies & Execution Order
### Phase Dependencies
- **Phase 2 (US1)**: No external dependencies — can start immediately
- **Phase 3 (US2)**: Depends on Phase 2 (guard must be in place so the compose stack run exercises it)
- **Phase 4 (US3)**: T007 and T008 are independent file writes (can run in parallel with each other after Phase 3); T009 requires T008; T010 requires T008 and T006
- **Phase 5 (Polish)**: Depends on all prior phases
### Within Phase 4
- T007 ∥ T008 (different files, no dependency)
- T009 after T008 (Makefile must exist)
- T010 after T008 and T006 (requires both Makefile and compose stack)
### Execution Order Summary
```
Step 1: T001, T002, T003 (sequential — TDD for guard)
Step 2: T004, T005, T006 (sequential — TDD for compose stack)
Step 3 (parallel): T007, T008
Step 4: T009 (after T008), T010 (after T008 + T006)
Step 5: T011
```
---
## Implementation Strategy
### MVP (US1 — the guard)
1. Complete T001-T003
2. **Validate**: SQLite URL is blocked; PostgreSQL URL proceeds
3. US2 and US3 add the infrastructure to make the mandate practical
### Incremental Delivery
- After Phase 2: Dialect bugs are caught immediately — core safety net is in place
- After Phase 3: Full integration suite runs against PostgreSQL via one Docker command
- After Phase 4: Both test tiers are documented and accessible via `make`
- After Phase 5: Lint clean, ready for merge


@@ -0,0 +1,34 @@
# Specification Quality Checklist: Login Brute-Force Protection
**Purpose**: Validate specification completeness and quality before proceeding to planning
**Created**: 2026-05-06
**Feature**: [spec.md](../spec.md)
## Content Quality
- [X] No implementation details (languages, frameworks, APIs)
- [X] Focused on user value and business needs
- [X] Written for non-technical stakeholders
- [X] All mandatory sections completed
## Requirement Completeness
- [X] No [NEEDS CLARIFICATION] markers remain
- [X] Requirements are testable and unambiguous
- [X] Success criteria are measurable
- [X] Success criteria are technology-agnostic (no implementation details)
- [X] All acceptance scenarios are defined
- [X] Edge cases are identified
- [X] Scope is clearly bounded
- [X] Dependencies and assumptions identified
## Feature Readiness
- [X] All functional requirements have clear acceptance criteria
- [X] User scenarios cover primary flows
- [X] Feature meets measurable outcomes defined in Success Criteria
- [X] No implementation details leak into specification
## Notes
- All items pass. Spec is ready for `/speckit-plan`.


@@ -0,0 +1,85 @@
# API Contract: Authentication
## POST /api/v1/auth/token
Authenticates the owner and returns a JWT access token.
**This endpoint is modified by feature 009** to enforce brute-force protection.
All previous behaviour is preserved. One new response code (429) is added.
### Request
```
POST /api/v1/auth/token
Content-Type: application/json
```
```json
{
"username": "string",
"password": "string"
}
```
### Responses
#### 200 OK — Credentials accepted
```json
{
"access_token": "<jwt>",
"token_type": "bearer",
"expires_in": 86400
}
```
Side effect: resets the failure counter for the caller's IP address.
---
#### 401 Unauthorized — Credentials rejected
```json
{
"detail": "Invalid credentials",
"code": "invalid_credentials"
}
```
Side effect: increments the failure counter for the caller's IP address. If the
counter reaches `LOGIN_MAX_FAILURES`, subsequent requests from this IP will receive
429 until the cooldown expires.
---
#### 429 Too Many Requests — Source blocked after repeated failures
**This response is new in feature 009.**
```
HTTP/1.1 429 Too Many Requests
Retry-After: 900
Content-Type: application/json
```
```json
{
"detail": "Too many failed login attempts. Please try again later.",
"code": "login_rate_limited"
}
```
The `Retry-After` header value is the configured cooldown duration in seconds (default: 900).
It reflects the maximum possible wait, not the exact remaining lockout time.
No credentials are verified when this response is returned — the request is
rejected before authentication is attempted.
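Clients consuming this contract only need the delta-seconds form of `Retry-After`. A minimal parsing helper (illustrative only — not part of the API surface) might look like:

```python
def retry_after_seconds(headers: dict[str, str], default: int = 900) -> int:
    """Parse a Retry-After header given in delta-seconds; fall back to a default."""
    value = headers.get("Retry-After")
    try:
        return max(0, int(value))
    except (TypeError, ValueError):
        # Missing header or HTTP-date form: use the documented default cooldown
        return default
```

A client would sleep for this many seconds before retrying the login.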
---
### Notes
- The failure counter is keyed by the resolved source IP address: the TCP peer by default, or the first `X-Forwarded-For` entry when the peer matches a trusted proxy listed in `LOGIN_TRUSTED_PROXY_IPS`.
- Threshold values (`LOGIN_MAX_FAILURES`, `LOGIN_WINDOW_SECONDS`, `LOGIN_COOLDOWN_SECONDS`)
are not disclosed in any response.
- Counters are in-memory and reset on process restart.


@@ -0,0 +1,53 @@
# Data Model: Login Brute-Force Protection
## Overview
This feature introduces no new database tables. The only data entity is a transient,
in-memory rate-limit record that does not survive process restarts. This is intentional
(see research.md Decision 3).
---
## Entity: Rate-Limit Record (in-memory only)
| Field | Type | Description |
|----------------|---------|-----------------------------------------------------------------------------|
| `failures` | int | Count of consecutive failed login attempts in the current window |
| `window_start` | float | Unix timestamp marking when the current counting window began |
| `blocked_until`| float | Unix timestamp after which the source is no longer blocked; 0.0 if not blocked |
**Keyed by**: resolved client IP address string (e.g., `"192.168.1.1"`); see `get_client_ip()` in `rate_limiter.py` for resolution logic
**Lifecycle**:
1. Record is created on the first failed login from a source.
2. `failures` increments on each subsequent failure within the window.
3. When `failures >= LOGIN_MAX_FAILURES`, `blocked_until` is set to `now + LOGIN_COOLDOWN_SECONDS`.
4. When `blocked_until` has passed, the record is deleted on the next request from that source.
5. A successful login deletes the record immediately (failure counter reset).
6. If `now - window_start > LOGIN_WINDOW_SECONDS` without triggering lockout, the counter resets within the existing record.
**State machine**:
```
[no record]
     │ first failure
     ▼
[tracking] ──── failure N ≥ max ────► [blocked]
     │                                    │
     │ success / window expires           │ cooldown expires
     ▼                                    ▼
[no record] ◄──────────────────────── [no record]
```
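The lifecycle rules can be exercised deterministically with an injectable clock. The sketch below is a standalone toy model of the record above (names are illustrative; the production class lives in `rate_limiter.py`):

```python
import time

class FailureWindow:
    """Toy model of the rate-limit record lifecycle (fixed window + cooldown)."""

    def __init__(self, max_failures=5, window=300, cooldown=900, clock=time.time):
        self.max_failures = max_failures
        self.window = window
        self.cooldown = cooldown
        self.clock = clock
        # ip -> [failures, window_start, blocked_until]
        self.records: dict[str, list[float]] = {}

    def is_blocked(self, ip: str) -> bool:
        rec = self.records.get(ip)
        if rec is None:
            return False
        now = self.clock()
        if rec[2] > now:
            return True              # still in cooldown (rule 3)
        if rec[2] > 0:
            del self.records[ip]     # cooldown expired -> record deleted (rule 4)
        return False

    def record_failure(self, ip: str) -> None:
        now = self.clock()
        rec = self.records.setdefault(ip, [0, now, 0.0])  # created on first failure (rule 1)
        if now - rec[1] > self.window:
            rec[0], rec[1] = 0, now  # window expired -> counter resets (rule 6)
        rec[0] += 1                  # rule 2
        if rec[0] >= self.max_failures:
            rec[2] = now + self.cooldown
    def record_success(self, ip: str) -> None:
        self.records.pop(ip, None)   # rule 5
```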
---
## Configuration Entity: Rate-Limit Settings
Stored as environment variables; loaded via `app.config.Settings`:
| Env Var | Default | Description |
|----------------------------|---------|----------------------------------------------------------|
| `LOGIN_MAX_FAILURES` | `5` | Failures within window before lockout |
| `LOGIN_WINDOW_SECONDS` | `300` | Rolling window duration in seconds (5 minutes) |
| `LOGIN_COOLDOWN_SECONDS` | `900` | Lockout duration in seconds after threshold exceeded (15 minutes) |
| `LOGIN_TRUSTED_PROXY_IPS` | `""` | Comma-separated IPs/CIDRs of trusted upstream proxies (e.g., `10.0.0.0/8`); empty = disabled |


@@ -0,0 +1,388 @@
# Implementation Plan: Login Brute-Force Protection
**Branch**: `009-login-rate-limiting` | **Date**: 2026-05-06 | **Spec**: [spec.md](spec.md)
**Input**: Feature specification from `specs/009-login-rate-limiting/spec.md`
## Summary
Add failure-counting brute-force protection to the login endpoint (`POST /api/v1/auth/token`).
After a configurable number of consecutive failed attempts from the same resolved client IP,
the endpoint returns HTTP 429 with a `Retry-After` header for a configurable cooldown period.
A successful login resets the counter. All thresholds are configurable via environment variables.
When deployed behind a reverse proxy (nginx, Kubernetes ingress), a `LOGIN_TRUSTED_PROXY_IPS`
setting enables extraction of the real client IP from `X-Forwarded-For`. No new infrastructure
(no Redis, no new DB table) — counters live in process memory.
---
## Technical Context
**Language/Version**: Python 3.12+
**Primary Dependencies**: FastAPI, pydantic-settings (already in use); no new dependencies added
**Storage**: In-memory `dict` (no persistence across restarts — intentional)
**Testing**: pytest + pytest-asyncio (existing test infrastructure)
**Target Platform**: Linux server (Docker)
**Project Type**: Web service (API only — this feature has no UI surface)
**Performance Goals**: Rate limiter adds negligible overhead (dict lookup + lock acquisition; sub-millisecond)
**Constraints**: Must not add new runtime service dependencies; must not change any auth behaviour for non-blocked sources
**Scale/Scope**: Single process, single user; in-memory store is sufficient
---
## Constitution Check
| Principle | Status | Notes |
|-----------|--------|-------|
| §2.4 Auth abstraction (AuthProvider interface) | ✅ Pass | Rate limiter is a guard *before* `JWTAuthProvider.verify_credentials()`, not a bypass of the interface |
| §2.5 DB abstraction (repository layer) | ✅ Pass | No database access; in-memory only |
| §2.6 No speculative abstraction | ✅ Pass | Concrete `LoginRateLimiter` class, no interface; only one implementation planned |
| §3.3 Error envelope (`detail` + `code`) | ✅ Pass | 429 response uses `{"detail": "...", "code": "login_rate_limited"}` |
| §5.1 TDD | ✅ Required | Tasks follow red → green order |
| §5.2 Integration tests against PostgreSQL | ✅ Pass | Integration test for the login endpoint will run against the Docker PostgreSQL stack |
| §7.2 Environment configuration | ✅ Pass | `LOGIN_MAX_FAILURES`, `LOGIN_WINDOW_SECONDS`, `LOGIN_COOLDOWN_SECONDS`, `LOGIN_TRUSTED_PROXY_IPS` from env vars |
| §7.3 Linting (ruff) | ✅ Required | All new files must pass `ruff check` |
**Gate result**: No violations. Cleared to proceed.
---
## Project Structure
### Documentation (this feature)
```text
specs/009-login-rate-limiting/
├── plan.md ← this file
├── research.md ← decisions on approach
├── data-model.md ← rate-limit record entity
├── quickstart.md ← curl runbook
├── contracts/
│ └── auth.md ← updated POST /api/v1/auth/token with 429
└── tasks.md ← generated by /speckit-tasks
```
### Source Code Changes
```text
api/
├── app/
│ ├── auth/
│ │ ├── rate_limiter.py ← NEW: LoginRateLimiter class
│ │ ├── jwt_provider.py (unchanged)
│ │ ├── noop.py (unchanged)
│ │ └── provider.py (unchanged)
│ ├── config.py ← add login_max_failures, login_window_seconds, login_cooldown_seconds, login_trusted_proxy_ips
│ ├── main.py ← init LoginRateLimiter in lifespan, attach to app.state
│ └── routers/
│ └── auth.py ← check rate limit before auth, record outcome
└── tests/
├── unit/
│ └── test_rate_limiter.py ← NEW: unit tests for LoginRateLimiter logic
└── integration/
└── test_login_rate_limit.py ← NEW: integration tests for 429 behaviour via HTTP
```
---
## Implementation Detail
### `api/app/auth/rate_limiter.py`
```python
import ipaddress
import logging
import time
from dataclasses import dataclass, field
from ipaddress import IPv4Network, IPv6Network
from threading import Lock
from starlette.requests import Request
logger = logging.getLogger(__name__)
def get_client_ip(
request: Request,
trusted_networks: list[IPv4Network | IPv6Network],
) -> str:
"""Return the resolved client IP, honouring X-Forwarded-For when the
TCP peer is a trusted upstream proxy. Falls back to the TCP peer address
when no trusted networks are configured or the peer is not in the list."""
peer = request.client.host if request.client else "unknown"
if trusted_networks and peer != "unknown":
try:
peer_addr = ipaddress.ip_address(peer)
if any(peer_addr in net for net in trusted_networks):
xff = request.headers.get("X-Forwarded-For", "").split(",")[0].strip()
if xff:
return xff
real_ip = request.headers.get("X-Real-IP", "").strip()
if real_ip:
return real_ip
except ValueError:
pass
return peer
@dataclass
class _Record:
failures: int = 0
window_start: float = field(default_factory=time.time)
blocked_until: float = 0.0
class LoginRateLimiter:
def __init__(
self,
max_failures: int = 5,
window_seconds: int = 300,
cooldown_seconds: int = 900,
) -> None:
self._max = max_failures
self._window = window_seconds
self._cooldown = cooldown_seconds
self._store: dict[str, _Record] = {}
self._lock = Lock()
@property
def cooldown_seconds(self) -> int:
return self._cooldown
def is_blocked(self, ip: str) -> bool:
now = time.time()
with self._lock:
rec = self._store.get(ip)
if rec is None:
return False
if rec.blocked_until > now:
return True
if rec.blocked_until > 0:
del self._store[ip]
return False
def record_failure(self, ip: str) -> None:
now = time.time()
with self._lock:
rec = self._store.get(ip)
if rec is None:
rec = _Record(window_start=now)
self._store[ip] = rec
if now - rec.window_start > self._window:
rec.failures = 0
rec.window_start = now
rec.failures += 1
if rec.failures >= self._max:
rec.blocked_until = now + self._cooldown
logger.warning(
"Login blocked for %s after %d failures", ip, rec.failures
)
def record_success(self, ip: str) -> None:
with self._lock:
self._store.pop(ip, None)
```
### `api/app/config.py` additions
```python
login_max_failures: int = 5
login_window_seconds: int = 300
login_cooldown_seconds: int = 900
login_trusted_proxy_ips: str = "" # comma-separated IPs/CIDRs; empty = disabled
```
### `api/app/main.py` lifespan update
```python
import ipaddress
from app.auth.rate_limiter import LoginRateLimiter
@asynccontextmanager
async def lifespan(application: FastAPI):
settings = get_settings()
application.state.login_rate_limiter = LoginRateLimiter(
max_failures=settings.login_max_failures,
window_seconds=settings.login_window_seconds,
cooldown_seconds=settings.login_cooldown_seconds,
)
trusted_networks = []
for part in settings.login_trusted_proxy_ips.split(","):
part = part.strip()
if part:
try:
trusted_networks.append(ipaddress.ip_network(part, strict=False))
except ValueError:
pass # invalid entry — skip silently
application.state.login_trusted_networks = trusted_networks
# ... existing DB setup unchanged
engine = get_engine()
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
await engine.dispose()
```
### `api/app/routers/auth.py` update
```python
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from app.auth.jwt_provider import JWTAuthProvider
from app.auth.rate_limiter import LoginRateLimiter, get_client_ip
from app.dependencies import get_jwt_auth
router = APIRouter(tags=["auth"])
class LoginRequest(BaseModel):
username: str
password: str
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
expires_in: int
@router.post("/auth/token", response_model=TokenResponse)
async def login(
request: Request,
body: LoginRequest,
auth: JWTAuthProvider = Depends(get_jwt_auth),
):
limiter: LoginRateLimiter = request.app.state.login_rate_limiter
ip: str = get_client_ip(request, request.app.state.login_trusted_networks)
if limiter.is_blocked(ip):
return JSONResponse(
status_code=429,
content={
"detail": "Too many failed login attempts. Please try again later.",
"code": "login_rate_limited",
},
headers={"Retry-After": str(limiter.cooldown_seconds)},
)
if not auth.verify_credentials(body.username, body.password):
limiter.record_failure(ip)
raise HTTPException(
status_code=401,
detail={"detail": "Invalid credentials", "code": "invalid_credentials"},
)
limiter.record_success(ip)
token = auth.create_token()
return TokenResponse(
access_token=token,
token_type="bearer",
        expires_in=auth._expiry_seconds,  # assumes the provider exposes its expiry; a public property would be cleaner
)
```
### `api/tests/unit/test_rate_limiter.py` (representative cases)
```python
import time
import pytest
from app.auth.rate_limiter import LoginRateLimiter
def test_not_blocked_initially():
limiter = LoginRateLimiter(max_failures=3, window_seconds=60, cooldown_seconds=300)
assert limiter.is_blocked("1.2.3.4") is False
def test_blocked_after_threshold():
limiter = LoginRateLimiter(max_failures=3, window_seconds=60, cooldown_seconds=300)
for _ in range(3):
limiter.record_failure("1.2.3.4")
assert limiter.is_blocked("1.2.3.4") is True
def test_success_clears_failures():
limiter = LoginRateLimiter(max_failures=3, window_seconds=60, cooldown_seconds=300)
limiter.record_failure("1.2.3.4")
limiter.record_failure("1.2.3.4")
limiter.record_success("1.2.3.4")
assert limiter.is_blocked("1.2.3.4") is False
def test_ips_are_isolated():
limiter = LoginRateLimiter(max_failures=2, window_seconds=60, cooldown_seconds=300)
limiter.record_failure("1.1.1.1")
limiter.record_failure("1.1.1.1")
assert limiter.is_blocked("2.2.2.2") is False
```
### `api/tests/integration/test_login_rate_limit.py` (representative cases)
```python
import pytest
from httpx import AsyncClient
# Uses the 'client' fixture (NoOpAuthProvider) from conftest — sufficient for this
# endpoint since we're testing the rate-limit layer, not auth correctness.
# The login endpoint instantiates its own limiter via app.state, so we need
# the full ASGI app.
BAD_CREDS = {"username": "attacker", "password": "wrong"}
@pytest.mark.asyncio
async def test_repeated_failures_trigger_429(client: AsyncClient):
# Use a custom limiter with low threshold to avoid slow tests
# (the app.state.login_rate_limiter is set in lifespan; override for test)
from app.auth.rate_limiter import LoginRateLimiter
from app.main import app
original = app.state.login_rate_limiter
app.state.login_rate_limiter = LoginRateLimiter(
max_failures=3, window_seconds=60, cooldown_seconds=30
)
try:
for _ in range(3):
await client.post("/api/v1/auth/token", json=BAD_CREDS)
resp = await client.post("/api/v1/auth/token", json=BAD_CREDS)
assert resp.status_code == 429
assert resp.json()["code"] == "login_rate_limited"
assert "Retry-After" in resp.headers
finally:
app.state.login_rate_limiter = original
```
---
## Implementation Phases
### Phase 1 (MVP — P1): Blocking after repeated failures
1. Add `login_max_failures`, `login_window_seconds`, `login_cooldown_seconds`, `login_trusted_proxy_ips` to `api/app/config.py`
2. Create `api/app/auth/rate_limiter.py` with `LoginRateLimiter` and `get_client_ip()`
3. Initialize rate limiter and parse trusted networks in `api/app/main.py` lifespan; attach both to `app.state`
4. Update `api/app/routers/auth.py` to resolve client IP via `get_client_ip()`, then check + record outcomes
5. Unit tests: `api/tests/unit/test_rate_limiter.py`
6. Integration tests: `api/tests/integration/test_login_rate_limit.py`
### Phase 2 (US2 — observability): Logging and response hints
Delivered as part of Phase 1 (the `logger.warning(...)` call and `Retry-After` header
are embedded in the same implementation). No separate phase needed.
---
## Environment Variables to Add to `.env.example`
```dotenv
# Login brute-force protection
LOGIN_MAX_FAILURES=5
LOGIN_WINDOW_SECONDS=300
LOGIN_COOLDOWN_SECONDS=900
# Comma-separated IPs/CIDRs of trusted upstream proxies (e.g. nginx ingress pod CIDR).
# Leave empty when not behind a reverse proxy.
LOGIN_TRUSTED_PROXY_IPS=
```
These are optional (have defaults) so existing `.env` files without them continue working.


@@ -0,0 +1,112 @@
# Quickstart: Login Brute-Force Protection
## Prerequisites
- API running (via `docker compose up` or locally with `.env` set)
- `curl` available
---
## Scenario 1: Trigger the rate limiter
Send 6 consecutive failed login attempts (default threshold is 5):
```bash
for i in $(seq 1 6); do
echo "Attempt $i:"
curl -s -o /dev/null -w "%{http_code}\n" \
-X POST http://localhost:8000/api/v1/auth/token \
-H "Content-Type: application/json" \
-d '{"username": "wrong", "password": "wrong"}'
done
```
Expected output:
```
Attempt 1: 401
Attempt 2: 401
Attempt 3: 401
Attempt 4: 401
Attempt 5: 401
Attempt 6: 429
```
The 6th attempt returns 429. Inspect the headers:
```bash
curl -i -X POST http://localhost:8000/api/v1/auth/token \
-H "Content-Type: application/json" \
-d '{"username": "wrong", "password": "wrong"}'
```
Expected headers include:
```
HTTP/1.1 429 Too Many Requests
Retry-After: 900
```
Expected body:
```json
{"detail": "Too many failed login attempts. Please try again later.", "code": "login_rate_limited"}
```
---
## Scenario 2: Successful login resets the counter
Make some failed attempts, then log in with valid credentials:
```bash
# Fail twice
for i in 1 2; do
curl -s -o /dev/null -w "fail $i: %{http_code}\n" \
-X POST http://localhost:8000/api/v1/auth/token \
-H "Content-Type: application/json" \
-d '{"username": "wrong", "password": "wrong"}'
done
# Succeed — resets counter
curl -s -o /dev/null -w "success: %{http_code}\n" \
-X POST http://localhost:8000/api/v1/auth/token \
-H "Content-Type: application/json" \
-d '{"username": "'"$OWNER_USERNAME"'", "password": "'"$OWNER_PASSWORD"'"}'
# Now fail 5 more times — counter was reset, so no 429 yet
for i in $(seq 1 5); do
curl -s -o /dev/null -w "fail after reset $i: %{http_code}\n" \
-X POST http://localhost:8000/api/v1/auth/token \
-H "Content-Type: application/json" \
-d '{"username": "wrong", "password": "wrong"}'
done
```
Expected: all "fail after reset" lines return 401 (not 429), confirming the counter was reset.
---
## Scenario 3: Observe log output
While triggering the rate limiter (Scenario 1), watch API logs:
```bash
docker compose logs -f api
```
After the threshold is crossed you should see a line like:
```
WARNING app.auth.rate_limiter:rate_limiter.py:NN Login blocked for 172.18.0.1 after 5 failures
```
---
## Environment variable overrides
To test with a lower threshold without code changes:
```bash
LOGIN_MAX_FAILURES=2 LOGIN_WINDOW_SECONDS=60 LOGIN_COOLDOWN_SECONDS=30 \
uvicorn app.main:app --reload
```
Then only 2 failures trigger the lockout, and it clears after 30 seconds.


@@ -0,0 +1,67 @@
# Research: Login Brute-Force Protection
## Decision 1: Library vs. custom implementation
**Decision**: Custom in-memory failure tracker (no new library dependency)
**Rationale**: The requirement is to count *failed* login attempts specifically and reset on success — not to rate-limit all requests regardless of outcome. Popular libraries like `slowapi` count all requests to a route, which would break FR-004 (reset on success) without significant workarounds. A purpose-built 60-line class is simpler, more auditable, and has no dependency footprint.
**Alternatives considered**:
- `slowapi` (built on `limits`): Counts all requests, not failures. Requires patching the exception handler to decrement on success — fragile and non-obvious.
- `slowapi` with a custom key function: Could be done, but the library's storage model doesn't expose a "reset this key" API in a clean way.
- Redis-backed counter: Overkill for a single-user personal app with one instance. No new infrastructure justified.
---
## Decision 2: Fixed window vs. sliding window
**Decision**: Fixed window with per-source reset on successful login
**Rationale**: Fixed window is simpler to implement correctly and sufficient for this use case. The main attack — rapid sequential guessing — is fully addressed. The known "burst at window boundary" weakness is irrelevant here because: (a) the cooldown period is separate from the counting window, and (b) a successful login resets the counter entirely.
**Alternatives considered**:
- Sliding window: More accurate, but adds complexity (requires storing timestamps of each request). The marginal security benefit doesn't justify the implementation cost for a personal single-user app.
---
## Decision 3: In-memory backing store
**Decision**: Python `dict` keyed by source IP, protected by a threading `Lock`
**Rationale**: The application runs as a single process. In-memory storage means counters reset on restart — this is acceptable and matches the "fail open" assumption in the spec. No new infrastructure (Redis, database table) is required.
**Alternatives considered**:
- Database-backed counters: Persistent across restarts, but adds a DB round-trip to every login request (including successful ones). Not justified.
- Redis: Distributed-safe and persistent, but requires a new service dependency. Out of scope for a personal single-instance app.
---
## Decision 4: Source identifier
**Decision**: `request.client.host` (the TCP peer address) by default; `X-Forwarded-For`/`X-Real-IP` are honoured only when the peer matches a trusted proxy configured in `LOGIN_TRUSTED_PROXY_IPS`
**Rationale**: The spec explicitly states not to trust `X-Forwarded-For` headers unless the app is known to be behind a trusted proxy. `request.client.host` in Starlette/FastAPI is the actual TCP peer IP — it cannot be spoofed by an attacker sending arbitrary headers, so forwarded headers are only consulted once the peer itself proves trustworthy.
**Alternatives considered**:
- `X-Forwarded-For` first value: Spoofable if the app is not behind a trusted proxy (attacker can set arbitrary header values).
- `X-Real-IP`: Same spoofing concern.
---
## Decision 5: 429 response and Retry-After header
**Decision**: Return HTTP 429 with `{"detail": "...", "code": "login_rate_limited"}` and a `Retry-After` header set to the configured cooldown duration in seconds
**Rationale**: HTTP 429 is the standard "Too Many Requests" status defined in RFC 6585, which also allows the response to carry a `Retry-After` header. The header is explicitly mentioned in the spec (US2 acceptance scenario). Setting it to the *configured* cooldown (not the exact remaining time) satisfies FR-005: it doesn't reveal precise expiry, just the maximum wait. The response body follows §3.3 of the constitution (error envelope with `detail` and `code`).
---
## Decision 6: Default threshold values
**Decision**: `LOGIN_MAX_FAILURES=5`, `LOGIN_WINDOW_SECONDS=300` (5 min), `LOGIN_COOLDOWN_SECONDS=900` (15 min)
**Rationale**: Industry standard for web apps. 5 attempts is enough for legitimate typos but makes brute-force infeasible at human scale. A 5-minute counting window matches typical "I fat-fingered my password" retry patterns. A 15-minute cooldown is a meaningful deterrent without locking out a legitimate owner indefinitely.
**Alternatives considered**:
- 3 failures / 60 s window / 300 s cooldown: More aggressive, but too likely to lock out the legitimate owner on a bad day.
- 10 failures: Too permissive for a brute-force defense.


@@ -0,0 +1,84 @@
# Feature Specification: Login Brute-Force Protection
**Feature Branch**: `009-login-rate-limiting`
**Created**: 2026-05-06
**Status**: Draft
**Input**: User description: "Login API endpoints should be rate limited or otherwise protected against brute force attacks"
## User Scenarios & Testing *(mandatory)*
### User Story 1 - Repeated failed logins are blocked (Priority: P1)
An attacker (or misconfigured client) sending many rapid login attempts with the wrong password is slowed or blocked before they can exhaustively guess credentials. After a threshold number of consecutive failures from the same source, the system refuses further attempts for a cooldown period and returns a clear, non-leaking error.
**Why this priority**: Directly prevents credential-stuffing and brute-force attacks against the sole privileged account. Without this, the owner account is exposed to automated password guessing with no friction.
**Independent Test**: Send more than the allowed number of failed login requests in quick succession and confirm that subsequent attempts are rejected with a rate-limit or lockout response — without knowing or changing the real password.
**Acceptance Scenarios**:
1. **Given** an attacker sends N+1 failed login attempts within the configured window, **When** the (N+1)th request arrives, **Then** the system returns an error response indicating the request is blocked (not the normal "invalid credentials" error) and does not process the login attempt.
2. **Given** a legitimate user has been temporarily blocked after too many failures, **When** the cooldown period elapses and they retry with the correct password, **Then** they are logged in successfully.
3. **Given** a legitimate user makes a few failed attempts and then waits beyond the cooldown window, **When** they retry within the next window, **Then** their failure counter resets and they are not blocked.
---
### User Story 2 - Operators can observe and reason about blocking activity (Priority: P2)
When the protection triggers, the system produces enough observable signal (log entries, response metadata) that an operator can confirm the feature is working, diagnose false positives, and tune thresholds — without exposing sensitive details to the client.
**Why this priority**: Invisible security controls are unmanageable. Operators need to know the system is doing what it claims, and blocked legitimate users need a clear (but not exploitable) explanation.
**Independent Test**: Trigger the rate limiter and confirm that: (a) the response body or headers communicate that the request was blocked and when the client may retry; (b) the server logs an entry identifying the blocked source and the reason.
**Acceptance Scenarios**:
1. **Given** a source is blocked, **When** they receive the rejection response, **Then** the response indicates they should wait before retrying (e.g., a `Retry-After` hint) without disclosing the exact threshold values.
2. **Given** the rate limiter fires, **When** an operator inspects server logs, **Then** there is a log entry at WARNING level or above recording the blocked source and timestamp.
---
### Edge Cases
- What happens when a distributed attacker rotates IPs to avoid per-IP limits?
- How does the system behave if the backing store for rate-limit counters is temporarily unavailable — does it fail open (allow all) or fail closed (block all)?
- Are IPv6 addresses and IPv4-mapped-IPv6 addresses treated consistently?
- Does a successful login reset the failure counter for that source?
- What happens if many legitimate users share a NAT/proxy IP (e.g., corporate network)?
- What if `LOGIN_TRUSTED_PROXY_IPS` is configured to include an IP that an external attacker controls? (An attacker could then spoof `X-Forwarded-For` and rotate fake source IPs to bypass the rate limiter — operators must only list genuinely trusted upstream infrastructure.)
## Requirements *(mandatory)*
### Functional Requirements
- **FR-001**: The system MUST enforce a maximum number of failed login attempts per source identifier (the resolved client IP address) within a rolling time window before blocking further attempts.
- **FR-002**: Once a source exceeds the failure threshold, the system MUST reject subsequent login requests for a configurable cooldown period, returning a distinct response (not the normal invalid-credentials response).
- **FR-003**: After the cooldown period expires, the system MUST permit the source to attempt login again, resetting its failure count.
- **FR-004**: A successful login MUST reset the failure counter for that source, preventing accumulation of old failures from blocking future legitimate access.
- **FR-005**: The rejection response MUST NOT reveal the specific threshold values or remaining lockout duration in a way that aids an attacker in timing their attempts, but MUST provide enough information (e.g., "try again later") for a legitimate user to understand the situation.
- **FR-006**: The system MUST log a structured warning event whenever a source is blocked, including the source identifier and timestamp.
- **FR-007**: Rate-limit thresholds (maximum attempts, window duration, cooldown duration) MUST be configurable without code changes.
- **FR-008**: The system MUST support a configurable list of trusted upstream proxy IP addresses and CIDR ranges. When the TCP peer address matches a trusted proxy, the resolved client IP MUST be extracted from the `X-Forwarded-For` request header (first entry) or, if absent, `X-Real-IP`. When no trusted proxies are configured, the TCP peer address MUST be used directly and forwarded-IP headers MUST be ignored.
### Key Entities
- **Rate-limit record**: Tracks the number of consecutive failures and the window start time for a given source identifier; expires automatically after the cooldown period.
- **Source identifier**: The resolved client IP address used to key rate-limit records. When `LOGIN_TRUSTED_PROXY_IPS` is empty (default), this is the TCP peer address. When one or more proxy IPs/CIDRs are configured and the TCP peer matches, the first `X-Forwarded-For` entry (or `X-Real-IP`) is used instead.
## Success Criteria *(mandatory)*
### Measurable Outcomes
- **SC-001**: An automated script sending 100 consecutive failed login requests completes with at least 90 of those requests rejected after the threshold is crossed — verified in a controlled test environment.
- **SC-002**: A legitimate user who has been temporarily blocked can successfully log in within 5 minutes of the cooldown period expiring without any manual intervention.
- **SC-003**: Zero information about threshold values or exact lockout expiry is present in blocked response bodies or headers.
- **SC-004**: Every blocking event produces a corresponding log entry; 100% of triggered blocking events are observable in logs during testing.
## Assumptions
- The application has a single login endpoint used by all clients (the owner login introduced in feature 004).
- Source identification uses the resolved client IP address. By default (when `LOGIN_TRUSTED_PROXY_IPS` is empty) this is the TCP peer address. When one or more proxy IPs/CIDRs are configured, the first entry of `X-Forwarded-For` (or `X-Real-IP`) is used instead — but only when the TCP peer is in the trusted list, preventing header spoofing by external clients.
- If the rate-limit backing store is unavailable, the system fails open (allows the attempt through) rather than blocking all logins — this preserves the owner's access, which is critical for a single-user admin application.
- No CAPTCHA or multi-factor step is in scope; protection is purely count/time-based.
- The feature targets the login endpoint only; other endpoints are out of scope.
- The single-user nature of the app means IP-based identification is sufficient — there is no need for per-username lockout, and using IP (rather than username) avoids contributing to username enumeration risk.
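The fail-open assumption above could be realised with a guard like the following. This is purely illustrative: the limiter in this feature is in-memory and has no external store, so a wrapper like this would only matter if the backing store were later swapped for Redis or similar (the wrapper name and shape here are hypothetical):

```python
import logging

logger = logging.getLogger(__name__)


def is_blocked_failopen(limiter, ip: str) -> bool:
    """Fail open: if the backing store errors, allow the attempt through."""
    try:
        return limiter.is_blocked(ip)
    except Exception:  # deliberate broad catch: owner access beats blocking
        logger.exception("Rate-limit store unavailable; failing open for %s", ip)
        return False
```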


# Tasks: Login Brute-Force Protection
**Input**: Design documents from `specs/009-login-rate-limiting/`
**Prerequisites**: plan.md ✅, spec.md ✅, research.md ✅, data-model.md ✅, contracts/auth.md ✅, quickstart.md ✅
**Tests**: TDD is non-negotiable (§5.1). Every test task appears before the implementation task it covers. For each red step, run the test and confirm it fails before proceeding to the implementation.
**Organization**: Phase 1 adds env vars; Phase 2 adds config fields (shared by both stories); Phase 3 implements the core blocking behaviour (US1 MVP); Phase 4 adds observability-specific test coverage (US2); Phase 5 is polish.
## Format: `[ID] [P?] [Story] Description`
- **[P]**: Can run in parallel with other [P] tasks in the same phase
- **[Story]**: Which user story this task belongs to
- Exact file paths included in every task description
---
## Phase 1: Setup
- [X] T001 Add a `# Login brute-force protection` comment block with `LOGIN_MAX_FAILURES=5`, `LOGIN_WINDOW_SECONDS=300`, `LOGIN_COOLDOWN_SECONDS=900`, and `LOGIN_TRUSTED_PROXY_IPS=` (empty by default, with an inline comment explaining it accepts comma-separated IPs/CIDRs) to both `.env.example` and `.env.test.example` at the repo root
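The resulting block in both `.env.example` and `.env.test.example` would look roughly like this (inline comments are illustrative):

```shell
# Login brute-force protection
LOGIN_MAX_FAILURES=5        # consecutive failures before blocking
LOGIN_WINDOW_SECONDS=300    # rolling window for counting failures
LOGIN_COOLDOWN_SECONDS=900  # block duration once triggered
LOGIN_TRUSTED_PROXY_IPS=    # comma-separated trusted proxy IPs/CIDRs, e.g. 10.0.0.0/8
```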
---
## Phase 2: Foundational
**Purpose**: Add the four new settings fields — required before any story implementation.
- [X] T002 Add `login_max_failures: int = 5`, `login_window_seconds: int = 300`, `login_cooldown_seconds: int = 900`, `login_trusted_proxy_ips: str = ""` to the `Settings` class in `api/app/config.py` (append after `owner_password`)
**Checkpoint**: `api/app/config.py` accepts all four new env vars with defaults.
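The `login_trusted_proxy_ips` string added here is consumed later in the lifespan (T006); the parsing described there amounts to roughly the following sketch (the helper name is illustrative — T006 inlines this in `main.py`):

```python
import ipaddress


def parse_trusted_networks(raw: str) -> list:
    """Split a comma-separated IP/CIDR string, skipping blanks and invalid entries."""
    networks = []
    for part in raw.split(","):
        part = part.strip()
        if not part:
            continue
        try:
            # strict=False accepts host addresses with host bits set (e.g. "10.0.0.1/8")
            networks.append(ipaddress.ip_network(part, strict=False))
        except ValueError:
            continue  # invalid entries are skipped silently, per T006
    return networks
```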
---
## Phase 3: User Story 1 — Repeated failed logins are blocked (Priority: P1) 🎯 MVP
**Goal**: After `LOGIN_MAX_FAILURES` consecutive failed login attempts from the same source IP within `LOGIN_WINDOW_SECONDS`, `POST /api/v1/auth/token` returns HTTP 429 for `LOGIN_COOLDOWN_SECONDS`. A successful login resets the counter.
**Independent Test**: `cd api && python -m pytest tests/unit/test_rate_limiter.py tests/integration/test_login_rate_limit.py::test_repeated_failures_trigger_429 tests/integration/test_login_rate_limit.py::test_success_resets_counter tests/integration/test_login_rate_limit.py::test_429_has_retry_after_header tests/integration/test_login_rate_limit.py::test_xff_header_ignored_when_no_trusted_networks -v` — all pass.
### Tests for User Story 1 (TDD red — write first, confirm failure before T005)
- [X] T003 [P] [US1] Create `api/tests/unit/test_rate_limiter.py` with ten failing unit tests — import `LoginRateLimiter` and `get_client_ip` from `app.auth.rate_limiter`; for `LoginRateLimiter` (instantiate with `max_failures=3, window_seconds=60, cooldown_seconds=300`): `test_not_blocked_initially`, `test_blocked_after_threshold`, `test_success_clears_failures`, `test_ips_are_isolated`, `test_window_resets_after_expiry`, `test_log_warning_on_lockout` (caplog at WARNING level: call `record_failure()` until threshold, assert `"Login blocked" in caplog.text` and IP in log output); for `get_client_ip` (construct a mock using `from unittest.mock import MagicMock` and `from starlette.requests import Request`: `req = MagicMock(spec=Request); req.client.host = "10.0.0.1"; req.headers = {"X-Forwarded-For": "203.0.113.5"}`): `test_get_client_ip_no_trusted_networks_returns_peer` (empty `trusted_networks=[]` → returns `req.client.host`), `test_get_client_ip_trusted_peer_uses_xff` (peer `"10.0.0.1"` in trusted CIDR `"10.0.0.0/8"` → returns `"203.0.113.5"`), `test_get_client_ip_untrusted_peer_ignores_xff` (peer `"8.8.8.8"` not in trusted CIDR `"10.0.0.0/8"` → returns `"8.8.8.8"` despite XFF), `test_get_client_ip_trusted_peer_falls_back_to_real_ip` (peer trusted, no XFF header, `X-Real-IP: "203.0.113.9"` → returns `"203.0.113.9"`); run `python -m pytest tests/unit/test_rate_limiter.py -v` and confirm `ImportError` or `ModuleNotFoundError` (red)
- [X] T004 [P] [US1] Create `api/tests/integration/test_login_rate_limit.py` with four failing integration tests; each must override both `app.state.login_rate_limiter` (fresh `LoginRateLimiter(max_failures=3, window_seconds=60, cooldown_seconds=30)`) and `app.state.login_trusted_networks` (set to `[]` for all four tests — the `ASGITransport` peer is `"testclient"`, not a valid IP, so trusted-network matching can't be exercised here; proxy extraction is fully covered by T003 unit tests) via try/finally: (1) `test_repeated_failures_trigger_429` — POST three bad-credential requests then assert fourth returns 429 with `resp.json()["code"] == "login_rate_limited"`; (2) `test_success_resets_counter` — two failures → one valid login using `{"username": os.environ["OWNER_USERNAME"], "password": os.environ["OWNER_PASSWORD"]}` (matching conftest.py defaults: `testowner`/`testpassword`) → three more failures → assert all three return 401, not 429; (3) `test_429_has_retry_after_header` — trigger lockout (three failures), then assert `"Retry-After" in resp.headers` and `int(resp.headers["Retry-After"]) > 0`; (4) `test_xff_header_ignored_when_no_trusted_networks` — send three bad-cred requests with `headers={"X-Forwarded-For": "1.2.3.4"}` then a fourth with `headers={"X-Forwarded-For": "9.9.9.9"}` — assert the fourth returns 429 (not 401), proving the limiter tracked the real peer `"testclient"` for all requests and XFF was ignored; run `python -m pytest tests/integration/test_login_rate_limit.py -v` and confirm failure (red)
### Implementation for User Story 1
- [X] T005 [US1] Create `api/app/auth/rate_limiter.py` with two exports: (1) `get_client_ip(request: Request, trusted_networks: list[IPv4Network | IPv6Network]) -> str` — imports `ipaddress`, `from ipaddress import IPv4Network, IPv6Network`, `from starlette.requests import Request`; extracts `peer = request.client.host if request.client else "unknown"`; if `trusted_networks` is non-empty and peer is parseable as an IP address and falls within any trusted network, returns first `X-Forwarded-For` entry (strip whitespace) or `X-Real-IP` value, otherwise returns `peer`; wraps `ipaddress.ip_address(peer)` in `try/except ValueError` and falls back to `peer` on error; (2) `LoginRateLimiter` class: `__init__(self, max_failures: int = 5, window_seconds: int = 300, cooldown_seconds: int = 900)` storing params as `_max`, `_window`, `_cooldown`; `_store: dict[str, _Record]` and `_lock: threading.Lock`; `@dataclass _Record` with `failures: int = 0`, `window_start: float = field(default_factory=time.time)`, `blocked_until: float = 0.0`; `is_blocked(ip: str) -> bool`, `record_failure(ip: str) -> None` (logs WARNING on lockout), `record_success(ip: str) -> None`, `cooldown_seconds` property; stdlib imports: `import ipaddress, logging, time`, `from dataclasses import dataclass, field`, `from threading import Lock`
- [X] T006 [US1] Update `api/app/main.py` lifespan: add `import ipaddress` at top; import `LoginRateLimiter` from `app.auth.rate_limiter`; inside `lifespan` before `engine = get_engine()`, consolidate to `settings = get_settings()` (remove the existing bare `get_settings()` call), then set `application.state.login_rate_limiter = LoginRateLimiter(max_failures=settings.login_max_failures, window_seconds=settings.login_window_seconds, cooldown_seconds=settings.login_cooldown_seconds)`; then parse `settings.login_trusted_proxy_ips` — split on `","`, strip each part, skip empty strings, call `ipaddress.ip_network(part, strict=False)` inside a `try/except ValueError` (skip invalid entries silently), collect results into `trusted_networks: list`; set `application.state.login_trusted_networks = trusted_networks`
- [X] T007 [US1] Update `api/app/routers/auth.py` login endpoint: add `Request` to FastAPI imports and add `from fastapi.responses import JSONResponse`; add `from app.auth.rate_limiter import LoginRateLimiter, get_client_ip`; add `request: Request` as first parameter to `login()`; extract `limiter: LoginRateLimiter = request.app.state.login_rate_limiter` and `ip: str = get_client_ip(request, request.app.state.login_trusted_networks)`; add guard block — if `limiter.is_blocked(ip)`: return `JSONResponse(status_code=429, content={"detail": "Too many failed login attempts. Please try again later.", "code": "login_rate_limited"}, headers={"Retry-After": str(limiter.cooldown_seconds)})`; after `verify_credentials` returns False: call `limiter.record_failure(ip)` before the existing `HTTPException`; after `auth.create_token()`: call `limiter.record_success(ip)` before returning `TokenResponse`
- [X] T008 [US1] Verify TDD green: run `cd api && python -m pytest tests/unit/test_rate_limiter.py -v` — all 10 pass; run `make test-integration` — all tests pass including `test_repeated_failures_trigger_429`, `test_success_resets_counter`, `test_429_has_retry_after_header`, and `test_xff_header_ignored_when_no_trusted_networks`
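A condensed sketch of the limiter T005 describes — the real module additionally exports `get_client_ip` and logs a WARNING on lockout, which are omitted here for brevity:

```python
import threading
import time
from dataclasses import dataclass, field


@dataclass
class _Record:
    failures: int = 0
    window_start: float = field(default_factory=time.time)
    blocked_until: float = 0.0


class LoginRateLimiter:
    """In-memory, per-IP consecutive-failure counter (sketch of T005)."""

    def __init__(self, max_failures: int = 5, window_seconds: int = 300,
                 cooldown_seconds: int = 900) -> None:
        self._max = max_failures
        self._window = window_seconds
        self._cooldown = cooldown_seconds
        self._store: dict[str, _Record] = {}
        self._lock = threading.Lock()

    @property
    def cooldown_seconds(self) -> int:
        return self._cooldown

    def is_blocked(self, ip: str) -> bool:
        with self._lock:
            rec = self._store.get(ip)
            return rec is not None and rec.blocked_until > time.time()

    def record_failure(self, ip: str) -> None:
        now = time.time()
        with self._lock:
            rec = self._store.setdefault(ip, _Record())
            if now - rec.window_start > self._window:
                rec.failures, rec.window_start = 0, now  # stale window: restart
            rec.failures += 1
            if rec.failures >= self._max:
                rec.blocked_until = now + self._cooldown

    def record_success(self, ip: str) -> None:
        with self._lock:
            self._store.pop(ip, None)  # success resets the counter entirely
```

Keying the whole record by IP and dropping it on success gives the "consecutive failures" semantics: any successful login wipes the slate for that source.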
**Checkpoint**: Brute-force blocking is live. Automated repeated failures are stopped after threshold; the owner can still log in after cooldown; unit and integration tests pass.
---
## Phase 4: User Story 2 — Operators can observe blocking activity (Priority: P2)
**Goal**: The 429 response includes a `Retry-After` header with a positive integer; the response body `code` is `"login_rate_limited"` and contains no threshold numeric values; server logs a WARNING when blocking triggers.
**Independent Test**: Trigger the rate limiter (already works from Phase 3) and assert `Retry-After` header is present in the response and `code` field is `"login_rate_limited"`.
### Tests for User Story 2 (TDD red — extend existing file)
- [X] T009 [US2] Add one test to `api/tests/integration/test_login_rate_limit.py` targeting observability properties not yet covered: `test_429_body_shape` — override `app.state.login_rate_limiter` with a fresh `LoginRateLimiter(max_failures=3, window_seconds=60, cooldown_seconds=30)` via try/finally (same isolation pattern as T004), trigger lockout (three failures), then assert `resp.json() == {"detail": "Too many failed login attempts. Please try again later.", "code": "login_rate_limited"}` (exact match — confirms no threshold values leak and shape is correct); confirm this test is green immediately against the US1 implementation (T007 already returns this exact body)
**Checkpoint**: US2 observability properties are explicitly exercised by integration tests; a future regression in the Retry-After header or code field will be caught.
---
## Phase 5: Polish & Cross-Cutting Concerns
- [X] T010 Run `cd api && ruff check app/auth/rate_limiter.py app/routers/auth.py app/config.py app/main.py tests/unit/test_rate_limiter.py tests/integration/test_login_rate_limit.py` — fix any violations
---
## Dependencies & Execution Order
### Phase Dependencies
- **Phase 1 (Setup)**: No external dependencies — can start immediately
- **Phase 2 (Foundational)**: No external dependencies — can start immediately (parallel with Phase 1)
- **Phase 3 (US1)**: Depends on Phase 2 (T002 must exist before T006 can use `settings.login_max_failures`)
- **Phase 4 (US2)**: Depends on Phase 3 (tests verify behaviour implemented in T007)
- **Phase 5 (Polish)**: Depends on all prior phases
### Within Phase 3
- T003 ∥ T004 (different files, no dependency — write tests in parallel)
- T005 after T003, T004 (implement after tests confirm they fail)
- T006 ∥ T007 after T005 (both import from `rate_limiter.py`; write to different files — `main.py` and `auth.py`; T006 sets `app.state.login_trusted_networks` which T007's router reads)
- T008 after T005, T006, T007 (verify all pass)
### Execution Order Summary
```
Step 1: T001 ∥ T002 (setup + foundational — parallel, different files)
Step 2: T003 ∥ T004 (write failing tests — parallel)
Step 3: T005 (implement LoginRateLimiter — after red tests confirmed)
Step 4: T006 ∥ T007 (wire limiter into app — parallel, different files)
Step 5: T008 (verify green)
Step 6: T009 (US2 observability tests — verify green immediately)
Step 7: T010 (ruff clean)
```
---
## Implementation Strategy
### MVP (US1 — the blocker)
1. Complete T001–T002 (config setup)
2. Complete T003–T008 (core blocking)
3. **Validate**: Run `make test-integration` — all 88 existing tests still pass; the four new rate-limit integration tests pass
4. US2 adds verification coverage for already-implemented observability features
### Incremental Delivery
- After Phase 3: Brute-force attacks on the login endpoint are blocked — core security net is in place
- After Phase 4: Observability properties are explicitly tested — regressions in headers/logs will be caught
- After Phase 5: Lint clean, ready for merge