import os import pytest from httpx import AsyncClient from app.auth.rate_limiter import LoginRateLimiter from app.main import app BAD_CREDS = {"username": "attacker", "password": "wrong"} VALID_CREDS = { "username": os.environ.get("OWNER_USERNAME", "testowner"), "password": os.environ.get("OWNER_PASSWORD", "testpassword"), } def _fresh_limiter(): return LoginRateLimiter(max_failures=3, window_seconds=60, cooldown_seconds=30) @pytest.mark.asyncio async def test_repeated_failures_trigger_429(client: AsyncClient): original_limiter = app.state.login_rate_limiter original_networks = app.state.login_trusted_networks app.state.login_rate_limiter = _fresh_limiter() app.state.login_trusted_networks = [] try: for _ in range(3): await client.post("/api/v1/auth/token", json=BAD_CREDS) resp = await client.post("/api/v1/auth/token", json=BAD_CREDS) assert resp.status_code == 429 assert resp.json()["code"] == "login_rate_limited" finally: app.state.login_rate_limiter = original_limiter app.state.login_trusted_networks = original_networks @pytest.mark.asyncio async def test_success_resets_counter(client: AsyncClient): original_limiter = app.state.login_rate_limiter original_networks = app.state.login_trusted_networks app.state.login_rate_limiter = _fresh_limiter() app.state.login_trusted_networks = [] try: for _ in range(2): await client.post("/api/v1/auth/token", json=BAD_CREDS) await client.post("/api/v1/auth/token", json=VALID_CREDS) for _ in range(3): resp = await client.post("/api/v1/auth/token", json=BAD_CREDS) assert resp.status_code == 401, "counter should have reset after success" finally: app.state.login_rate_limiter = original_limiter app.state.login_trusted_networks = original_networks @pytest.mark.asyncio async def test_429_has_retry_after_header(client: AsyncClient): original_limiter = app.state.login_rate_limiter original_networks = app.state.login_trusted_networks app.state.login_rate_limiter = _fresh_limiter() app.state.login_trusted_networks = [] try: for _ in range(3): await client.post("/api/v1/auth/token", json=BAD_CREDS) resp = await client.post("/api/v1/auth/token", json=BAD_CREDS) assert resp.status_code == 429 assert "Retry-After" in resp.headers assert int(resp.headers["Retry-After"]) > 0 finally: app.state.login_rate_limiter = original_limiter app.state.login_trusted_networks = original_networks @pytest.mark.asyncio async def test_429_body_shape(client: AsyncClient): original_limiter = app.state.login_rate_limiter original_networks = app.state.login_trusted_networks app.state.login_rate_limiter = _fresh_limiter() app.state.login_trusted_networks = [] try: for _ in range(3): await client.post("/api/v1/auth/token", json=BAD_CREDS) resp = await client.post("/api/v1/auth/token", json=BAD_CREDS) assert resp.status_code == 429 assert resp.json() == { "detail": "Too many failed login attempts. Please try again later.", "code": "login_rate_limited", } finally: app.state.login_rate_limiter = original_limiter app.state.login_trusted_networks = original_networks @pytest.mark.asyncio async def test_xff_header_ignored_when_no_trusted_networks(client: AsyncClient): original_limiter = app.state.login_rate_limiter original_networks = app.state.login_trusted_networks app.state.login_rate_limiter = _fresh_limiter() app.state.login_trusted_networks = [] try: # Send 3 failures all claiming to be "1.2.3.4" via XFF for _ in range(3): await client.post( "/api/v1/auth/token", json=BAD_CREDS, headers={"X-Forwarded-For": "1.2.3.4"}, ) # 4th request with a *different* XFF — if XFF were trusted, this # would appear to be a fresh IP and get 401. Since XFF is ignored, # the real peer ("testclient") is blocked and we get 429. resp = await client.post( "/api/v1/auth/token", json=BAD_CREDS, headers={"X-Forwarded-For": "9.9.9.9"}, ) assert resp.status_code == 429, ( "XFF should be ignored when no trusted networks are configured; " "expected real peer to be blocked" ) finally: app.state.login_rate_limiter = original_limiter app.state.login_trusted_networks = original_networks