Feat: Implement JWT bearer token authentication

Protects image upload, delete, and tag-update endpoints behind
Bearer token auth. Public read endpoints remain open. Angular SPA
gains a login page, auth interceptor, and route guard for /upload.

- JWTAuthProvider (HS256, sub/iat/exp, secrets.compare_digest)
- POST /api/v1/auth/token login endpoint
- require_auth FastAPI dependency on all write routes
- AuthService, LoginComponent, authInterceptor, authGuard
- Detail page hides write controls for unauthenticated visitors
- 43 unit tests passing; integration tests require Docker stack

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-03 19:12:38 +00:00
parent d91a65abe5
commit 5fbbc1e67f
36 changed files with 3998 additions and 42 deletions

View File

@@ -1,12 +1,26 @@
import os
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
# Provide required settings for the test environment before any app imports resolve them
os.environ.setdefault("JWT_SECRET_KEY", "test-secret-key-for-testing-only")
os.environ.setdefault("OWNER_USERNAME", "testowner")
os.environ.setdefault("OWNER_PASSWORD", "testpassword")
from app.main import app
from app.config import get_settings
from app.database import Base
from app.dependencies import get_db, get_storage, get_auth
from app.auth.jwt_provider import JWTAuthProvider
# Bust the LRU cache so get_settings() picks up the env vars set above
get_settings.cache_clear()
_TEST_JWT_SECRET = os.environ["JWT_SECRET_KEY"]
_TEST_OWNER_USERNAME = os.environ["OWNER_USERNAME"]
_TEST_OWNER_PASSWORD = os.environ["OWNER_PASSWORD"]
@pytest_asyncio.fixture(scope="session", loop_scope="session")
@@ -57,3 +71,40 @@ async def client(db_session):
yield c
app.dependency_overrides.clear()
@pytest_asyncio.fixture
async def jwt_auth_provider() -> JWTAuthProvider:
return JWTAuthProvider(
secret_key=_TEST_JWT_SECRET,
expiry_seconds=3600,
owner_username=_TEST_OWNER_USERNAME,
owner_password=_TEST_OWNER_PASSWORD,
)
@pytest_asyncio.fixture
async def authed_client(db_session, jwt_auth_provider):
from app.storage.s3_backend import S3StorageBackend
storage = S3StorageBackend()
auth = jwt_auth_provider
async def override_db():
yield db_session
def override_storage():
return storage
def override_auth():
return auth
app.dependency_overrides[get_db] = override_db
app.dependency_overrides[get_storage] = override_storage
app.dependency_overrides[get_auth] = override_auth
valid_token = auth.create_token()
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c:
yield c, valid_token
app.dependency_overrides.clear()

View File

@@ -0,0 +1,51 @@
import pytest
_VALID_CREDS = {"username": "testowner", "password": "testpassword"}
@pytest.mark.asyncio
async def test_login_success(authed_client):
client, _ = authed_client
response = await client.post("/api/v1/auth/token", json=_VALID_CREDS)
assert response.status_code == 200
body = response.json()
assert isinstance(body.get("access_token"), str)
assert len(body["access_token"]) > 0
assert body.get("token_type") == "bearer"
assert body.get("expires_in", 0) > 0
@pytest.mark.asyncio
async def test_login_wrong_password(authed_client):
client, _ = authed_client
response = await client.post(
"/api/v1/auth/token",
json={"username": "testowner", "password": "wrongpassword"},
)
assert response.status_code == 401
assert response.json().get("code") == "invalid_credentials"
@pytest.mark.asyncio
async def test_login_wrong_username(authed_client):
client, _ = authed_client
response = await client.post(
"/api/v1/auth/token",
json={"username": "notowner", "password": "testpassword"},
)
assert response.status_code == 401
assert response.json().get("code") == "invalid_credentials"
@pytest.mark.asyncio
async def test_login_missing_password(authed_client):
client, _ = authed_client
response = await client.post("/api/v1/auth/token", json={"username": "testowner"})
assert response.status_code == 422
@pytest.mark.asyncio
async def test_login_missing_username(authed_client):
client, _ = authed_client
response = await client.post("/api/v1/auth/token", json={"password": "testpassword"})
assert response.status_code == 422

View File

@@ -0,0 +1,95 @@
"""
Tests that write endpoints require authentication (US2).
These use the authed_client fixture which wires JWTAuthProvider.
"""
import io
import uuid
import pytest
def _minimal_jpeg() -> bytes:
return (
b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x02"
b"\xff\xd9"
)
@pytest.mark.asyncio
async def test_upload_without_token_returns_401(authed_client):
client, _ = authed_client
data = _minimal_jpeg()
response = await client.post(
"/api/v1/images",
files={"file": ("test.jpg", io.BytesIO(data), "image/jpeg")},
)
assert response.status_code == 401
assert response.json().get("code") == "unauthorized"
@pytest.mark.asyncio
async def test_upload_with_valid_token_succeeds(authed_client):
client, token = authed_client
data = _minimal_jpeg()
response = await client.post(
"/api/v1/images",
files={"file": ("test.jpg", io.BytesIO(data), "image/jpeg")},
headers={"Authorization": f"Bearer {token}"},
)
assert response.status_code in (200, 201)
@pytest.mark.asyncio
async def test_delete_without_token_returns_401(authed_client):
client, _ = authed_client
fake_id = uuid.uuid4()
response = await client.delete(f"/api/v1/images/{fake_id}")
assert response.status_code == 401
assert response.json().get("code") == "unauthorized"
@pytest.mark.asyncio
async def test_delete_with_valid_token_succeeds(authed_client):
client, token = authed_client
data = _minimal_jpeg()
upload = await client.post(
"/api/v1/images",
files={"file": ("del-protected.jpg", io.BytesIO(data), "image/jpeg")},
headers={"Authorization": f"Bearer {token}"},
)
image_id = upload.json()["id"]
response = await client.delete(
f"/api/v1/images/{image_id}",
headers={"Authorization": f"Bearer {token}"},
)
assert response.status_code == 204
@pytest.mark.asyncio
async def test_patch_tags_without_token_returns_401(authed_client):
client, _ = authed_client
fake_id = uuid.uuid4()
response = await client.patch(
f"/api/v1/images/{fake_id}/tags",
json={"tags": ["a"]},
)
assert response.status_code == 401
assert response.json().get("code") == "unauthorized"
@pytest.mark.asyncio
async def test_patch_tags_with_valid_token_succeeds(authed_client):
client, token = authed_client
data = _minimal_jpeg()
upload = await client.post(
"/api/v1/images",
files={"file": ("tag-protected.jpg", io.BytesIO(data), "image/jpeg")},
headers={"Authorization": f"Bearer {token}"},
)
image_id = upload.json()["id"]
response = await client.patch(
f"/api/v1/images/{image_id}/tags",
json={"tags": ["protected-tag"]},
headers={"Authorization": f"Bearer {token}"},
)
assert response.status_code == 200

View File

@@ -0,0 +1,71 @@
"""
US3 regression tests: all read endpoints must remain accessible without a token
even after require_auth is applied to write endpoints.
"""
import io
import uuid
import pytest
def _minimal_jpeg() -> bytes:
return (
b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x03"
b"\xff\xd9"
)
@pytest.mark.asyncio
async def test_list_images_without_token_is_200(authed_client):
client, _ = authed_client
response = await client.get("/api/v1/images")
assert response.status_code == 200
@pytest.mark.asyncio
async def test_get_image_without_token_is_200(authed_client):
client, token = authed_client
data = _minimal_jpeg()
upload = await client.post(
"/api/v1/images",
files={"file": ("pub-test.jpg", io.BytesIO(data), "image/jpeg")},
headers={"Authorization": f"Bearer {token}"},
)
image_id = upload.json()["id"]
response = await client.get(f"/api/v1/images/{image_id}")
assert response.status_code == 200
@pytest.mark.asyncio
async def test_serve_file_without_token_is_200(authed_client):
client, token = authed_client
data = _minimal_jpeg()
upload = await client.post(
"/api/v1/images",
files={"file": ("pub-file.jpg", io.BytesIO(data), "image/jpeg")},
headers={"Authorization": f"Bearer {token}"},
)
image_id = upload.json()["id"]
response = await client.get(f"/api/v1/images/{image_id}/file")
assert response.status_code == 200
@pytest.mark.asyncio
async def test_serve_thumbnail_without_token_is_200(authed_client):
client, token = authed_client
data = _minimal_jpeg()
upload = await client.post(
"/api/v1/images",
files={"file": ("pub-thumb.jpg", io.BytesIO(data), "image/jpeg")},
headers={"Authorization": f"Bearer {token}"},
)
image_id = upload.json()["id"]
response = await client.get(f"/api/v1/images/{image_id}/thumbnail")
assert response.status_code == 200
@pytest.mark.asyncio
async def test_list_tags_without_token_is_200(authed_client):
client, _ = authed_client
response = await client.get("/api/v1/tags")
assert response.status_code == 200

View File

@@ -2,14 +2,27 @@ import os
import pytest
_BASE_ENV = {
"DATABASE_URL": "postgresql+asyncpg://u:p@localhost/db",
"S3_ENDPOINT_URL": "http://localhost:9000",
"S3_BUCKET_NAME": "test-bucket",
"S3_ACCESS_KEY_ID": "key",
"S3_SECRET_ACCESS_KEY": "secret",
"S3_REGION": "us-east-1",
"API_BASE_URL": "http://localhost:8000",
"JWT_SECRET_KEY": "test-secret",
"OWNER_USERNAME": "admin",
"OWNER_PASSWORD": "password",
}
def _apply_env(monkeypatch, extra=None):
for k, v in {**_BASE_ENV, **(extra or {})}.items():
monkeypatch.setenv(k, v)
def test_settings_load_from_env(monkeypatch):
monkeypatch.setenv("DATABASE_URL", "postgresql+asyncpg://u:p@localhost/db")
monkeypatch.setenv("S3_ENDPOINT_URL", "http://localhost:9000")
monkeypatch.setenv("S3_BUCKET_NAME", "test-bucket")
monkeypatch.setenv("S3_ACCESS_KEY_ID", "key")
monkeypatch.setenv("S3_SECRET_ACCESS_KEY", "secret")
monkeypatch.setenv("S3_REGION", "us-east-1")
monkeypatch.setenv("API_BASE_URL", "http://localhost:8000")
_apply_env(monkeypatch)
# Import inside test to pick up monkeypatched env
import importlib
@@ -20,17 +33,13 @@ def test_settings_load_from_env(monkeypatch):
assert s.database_url == "postgresql+asyncpg://u:p@localhost/db"
assert s.s3_bucket_name == "test-bucket"
assert s.max_upload_bytes == 52428800 # default
assert s.jwt_secret_key == "test-secret"
assert s.jwt_expiry_seconds == 86400 # default
assert s.owner_username == "admin"
def test_settings_max_upload_bytes_override(monkeypatch):
monkeypatch.setenv("DATABASE_URL", "postgresql+asyncpg://u:p@localhost/db")
monkeypatch.setenv("S3_ENDPOINT_URL", "http://localhost:9000")
monkeypatch.setenv("S3_BUCKET_NAME", "test-bucket")
monkeypatch.setenv("S3_ACCESS_KEY_ID", "key")
monkeypatch.setenv("S3_SECRET_ACCESS_KEY", "secret")
monkeypatch.setenv("S3_REGION", "us-east-1")
monkeypatch.setenv("API_BASE_URL", "http://localhost:8000")
monkeypatch.setenv("MAX_UPLOAD_BYTES", "10485760")
_apply_env(monkeypatch, {"MAX_UPLOAD_BYTES": "10485760"})
import importlib
import app.config as config_module
@@ -38,3 +47,14 @@ def test_settings_max_upload_bytes_override(monkeypatch):
s = config_module.Settings()
assert s.max_upload_bytes == 10485760
def test_settings_jwt_expiry_override(monkeypatch):
_apply_env(monkeypatch, {"JWT_EXPIRY_SECONDS": "3600"})
import importlib
import app.config as config_module
importlib.reload(config_module)
s = config_module.Settings()
assert s.jwt_expiry_seconds == 3600

View File

@@ -0,0 +1,97 @@
import time
import pytest
import jwt as pyjwt
from fastapi import HTTPException
from app.auth.jwt_provider import JWTAuthProvider
SECRET = "test-secret-key"
USERNAME = "owner"
PASSWORD = "hunter2"
def make_provider(**kwargs) -> JWTAuthProvider:
defaults = dict(
secret_key=SECRET,
expiry_seconds=3600,
owner_username=USERNAME,
owner_password=PASSWORD,
)
return JWTAuthProvider(**{**defaults, **kwargs})
def test_create_token_is_valid_jwt():
provider = make_provider()
token = provider.create_token()
payload = pyjwt.decode(token, SECRET, algorithms=["HS256"])
assert payload["sub"] == "owner"
assert "iat" in payload
assert "exp" in payload
@pytest.mark.asyncio
async def test_get_identity_returns_owner():
provider = make_provider()
token = provider.create_token()
identity = await provider.get_identity(f"Bearer {token}")
assert identity.id == "owner"
assert identity.anonymous is False
@pytest.mark.asyncio
async def test_get_identity_raises_on_expired_token():
provider = make_provider(expiry_seconds=-1)
token = provider.create_token()
with pytest.raises(HTTPException) as exc_info:
await provider.get_identity(f"Bearer {token}")
assert exc_info.value.status_code == 401
@pytest.mark.asyncio
async def test_get_identity_raises_on_wrong_key():
provider = make_provider()
other = make_provider(secret_key="different-secret")
token = other.create_token()
with pytest.raises(HTTPException) as exc_info:
await provider.get_identity(f"Bearer {token}")
assert exc_info.value.status_code == 401
@pytest.mark.asyncio
async def test_get_identity_raises_on_garbage():
provider = make_provider()
with pytest.raises(HTTPException) as exc_info:
await provider.get_identity("Bearer not.a.real.token")
assert exc_info.value.status_code == 401
@pytest.mark.asyncio
async def test_get_identity_raises_on_missing_header():
provider = make_provider()
with pytest.raises(HTTPException) as exc_info:
await provider.get_identity(None)
assert exc_info.value.status_code == 401
@pytest.mark.asyncio
async def test_get_identity_raises_on_missing_bearer_prefix():
provider = make_provider()
token = provider.create_token()
with pytest.raises(HTTPException) as exc_info:
await provider.get_identity(token)
assert exc_info.value.status_code == 401
def test_verify_credentials_true():
provider = make_provider()
assert provider.verify_credentials(USERNAME, PASSWORD) is True
def test_verify_credentials_false_wrong_password():
provider = make_provider()
assert provider.verify_credentials(USERNAME, "wrongpassword") is False
def test_verify_credentials_false_wrong_username():
provider = make_provider()
assert provider.verify_credentials("notowner", PASSWORD) is False