Feat: Replace UUID image identifiers with 8-character base62 short IDs

Short IDs become the canonical identifier in URLs (/i/:short_id),
MinIO/R2 storage keys, and all API responses. Hash-based deduplication
is preserved. Includes a two-phase Alembic migration (003 adds a nullable
column, 004 enforces NOT NULL) with a backfill script to copy storage
objects and populate short_id for existing images.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-10 00:13:55 +00:00
parent 87eb2703f5
commit 61d923d5be
41 changed files with 1445 additions and 137 deletions

View File

@@ -0,0 +1,24 @@
"""add short_id column to images
Revision ID: 003
Revises: 002
Create Date: 2026-05-09
"""
from alembic import op
import sqlalchemy as sa
revision = "003"
down_revision = "002"
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Add the nullable ``images.short_id`` column and a unique index on it."""
    short_id_column = sa.Column("short_id", sa.String(8), nullable=True)
    op.add_column("images", short_id_column)
    op.create_index(
        "ix_images_short_id",
        "images",
        ["short_id"],
        unique=True,
    )
def downgrade() -> None:
    """Drop the ``short_id`` index first, then the column itself."""
    table = "images"
    op.drop_index("ix_images_short_id", table_name=table)
    op.drop_column(table, "short_id")

View File

@@ -0,0 +1,24 @@
"""set short_id NOT NULL on images
Revision ID: 004
Revises: 003
Create Date: 2026-05-09
IMPORTANT: Run migrate_to_short_ids.py script BEFORE applying this migration.
This migration will fail if any rows still have short_id IS NULL.
"""
from alembic import op
revision = "004"
down_revision = "003"
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Make ``images.short_id`` NOT NULL."""
    table, column = "images", "short_id"
    op.alter_column(table, column, nullable=False)
def downgrade() -> None:
    """Allow NULL in ``images.short_id`` again."""
    table, column = "images", "short_id"
    op.alter_column(table, column, nullable=True)

View File

@@ -92,9 +92,7 @@ class LoginRateLimiter:
rec.failures += 1
if rec.failures >= self._max:
rec.blocked_until = now + self._cooldown
logger.warning(
"Login blocked for %s after %d failures", ip, rec.failures
)
logger.warning("Login blocked for %s after %d failures", ip, rec.failures)
def record_success(self, ip: str) -> None:
with self._lock:

View File

@@ -22,6 +22,7 @@ class Image(Base):
size_bytes: Mapped[int] = mapped_column(BigInteger, nullable=False)
width: Mapped[int] = mapped_column(Integer, nullable=False)
height: Mapped[int] = mapped_column(Integer, nullable=False)
short_id: Mapped[str | None] = mapped_column(String(8), unique=True, nullable=True, index=True)
storage_key: Mapped[str] = mapped_column(String(64), nullable=False)
thumbnail_key: Mapped[str | None] = mapped_column(String(70), nullable=True, default=None)
created_at: Mapped[datetime] = mapped_column(

View File

@@ -27,6 +27,14 @@ class ImageRepository:
)
return result.scalar_one_or_none()
async def get_by_short_id(self, short_id: str) -> Image | None:
    """Return the image whose ``short_id`` equals the argument, or ``None``.

    The statement also requests select-in loading of ``image_tags`` and each
    entry's ``tag``.
    """
    tag_loading = selectinload(Image.image_tags).selectinload(ImageTag.tag)
    query = select(Image).where(Image.short_id == short_id).options(tag_loading)
    rows = await self._session.execute(query)
    return rows.scalar_one_or_none()
async def create(
self,
*,
@@ -37,6 +45,7 @@ class ImageRepository:
width: int,
height: int,
storage_key: str,
short_id: str,
thumbnail_key: str | None = None,
) -> Image:
image = Image(
@@ -47,6 +56,7 @@ class ImageRepository:
width=width,
height=height,
storage_key=storage_key,
short_id=short_id,
thumbnail_key=thumbnail_key,
)
self._session.add(image)

View File

@@ -48,9 +48,7 @@ class TagRepository:
for name in tag_names:
tag = await self.upsert_by_name(name)
existing = await self._session.execute(
select(ImageTag).where(
ImageTag.image_id == image.id, ImageTag.tag_id == tag.id
)
select(ImageTag).where(ImageTag.image_id == image.id, ImageTag.tag_id == tag.id)
)
if existing.scalar_one_or_none() is None:
self._session.add(ImageTag(image_id=image.id, tag_id=tag.id))
@@ -102,7 +100,6 @@ class TagRepository:
rows = await self._session.execute(paginated)
items = [
{"id": str(tag.id), "name": tag.name, "image_count": count}
for tag, count in rows.all()
{"id": str(tag.id), "name": tag.name, "image_count": count} for tag, count in rows.all()
]
return items, total

View File

@@ -1,7 +1,7 @@
import asyncio
import logging
import re
import struct
import uuid
from typing import Any
from fastapi import APIRouter, Depends, File, Form, HTTPException, Response, UploadFile
@@ -15,7 +15,7 @@ from app.repositories.image_repo import ImageRepository
from app.repositories.tag_repo import TagRepository
from app.storage.backend import StorageBackend
from app.thumbnail import generate_thumbnail
from app.utils import compute_sha256
from app.utils import compute_sha256, generate_short_id
from app.validation import FileSizeError, MimeTypeError, validate_file_size, validate_mime_type
logger = logging.getLogger(__name__)
@@ -23,22 +23,35 @@ logger = logging.getLogger(__name__)
router = APIRouter(tags=["images"])
_SHORT_ID_RE = re.compile(r"^[a-zA-Z0-9]{8}$")
def _error(detail: str, code: str, status: int):
    """Raise an ``HTTPException`` whose detail is the ``detail``/``code`` envelope."""
    envelope = {"detail": detail, "code": code}
    raise HTTPException(status_code=status, detail=envelope)
def _validate_short_id(short_id: str) -> str:
    """Return ``short_id`` unchanged when it is exactly 8 base62 characters.

    Raises:
        HTTPException: status 422 with code ``invalid_short_id`` for any other value.
    """
    # fullmatch rather than match: the pattern's ``$`` also matches just before
    # a trailing newline, so ``match`` would accept a 9-character value made of
    # a valid ID followed by a line feed.
    if _SHORT_ID_RE.fullmatch(short_id) is None:
        raise HTTPException(
            status_code=422,
            detail={"detail": "Invalid image ID", "code": "invalid_short_id"},
        )
    return short_id
def _image_to_dict(
image: Image, *, cdn_base: str | None = None, duplicate: bool | None = None
) -> dict[str, Any]:
_base = cdn_base.strip().rstrip("/") if cdn_base else None
file_url = f"{_base}/{image.storage_key}" if _base else f"/api/v1/images/{image.id}/file"
file_url = f"{_base}/{image.storage_key}" if _base else f"/api/v1/i/{image.short_id}/file"
thumbnail_url = (
(f"{_base}/{image.thumbnail_key}" if _base else f"/api/v1/images/{image.id}/thumbnail")
(f"{_base}/{image.thumbnail_key}" if _base else f"/api/v1/i/{image.short_id}/thumbnail")
if image.thumbnail_key
else None
)
data: dict[str, Any] = {
"id": str(image.id),
"short_id": image.short_id,
"hash": image.hash,
"filename": image.filename,
"mime_type": image.mime_type,
@@ -169,29 +182,49 @@ async def upload_image(
)
width, height = _read_image_dimensions(data, mime_type)
await storage.put(hash_hex, data, mime_type)
thumbnail_key: str | None = None
try:
thumb_bytes = await asyncio.to_thread(generate_thumbnail, data, mime_type)
await storage.put(f"{hash_hex}-thumb", thumb_bytes, "image/webp")
thumbnail_key = f"{hash_hex}-thumb"
except Exception:
logger.warning(
"Thumbnail generation failed for %s; upload will proceed without thumbnail", hash_hex
from sqlalchemy.exc import IntegrityError
for _ in range(10):
short_id = generate_short_id()
await storage.put(short_id, data, mime_type)
thumbnail_key: str | None = None
try:
thumb_bytes = await asyncio.to_thread(generate_thumbnail, data, mime_type)
await storage.put(f"{short_id}-thumb", thumb_bytes, "image/webp")
thumbnail_key = f"{short_id}-thumb"
except Exception:
logger.warning(
"Thumbnail generation failed for %s; proceeding without thumbnail", short_id
)
try:
image = await image_repo.create(
hash_hex=hash_hex,
filename=file.filename or "upload",
mime_type=mime_type,
size_bytes=len(data),
width=width,
height=height,
storage_key=short_id,
short_id=short_id,
thumbnail_key=thumbnail_key,
)
break
except IntegrityError:
await db.rollback()
await storage.delete(short_id)
if thumbnail_key:
await storage.delete(thumbnail_key)
thumbnail_key = None
continue
else:
raise HTTPException(
status_code=500,
detail={"detail": "Failed to assign unique ID", "code": "id_collision"},
)
image = await image_repo.create(
hash_hex=hash_hex,
filename=file.filename or "upload",
mime_type=mime_type,
size_bytes=len(data),
width=width,
height=height,
storage_key=hash_hex,
thumbnail_key=thumbnail_key,
)
if tag_names:
tag_repo = TagRepository(db)
await tag_repo.attach_tags(image, tag_names)
@@ -221,15 +254,16 @@ async def list_images(
}
@router.get("/images/{image_id}")
@router.get("/i/{short_id}")
async def get_image(
image_id: uuid.UUID,
short_id: str,
db: AsyncSession = Depends(get_db),
settings=Depends(get_settings),
):
_validate_short_id(short_id)
_cdn_base = settings.s3_public_base_url
image_repo = ImageRepository(db)
image = await image_repo.get_by_id(image_id)
image = await image_repo.get_by_short_id(short_id)
if not image:
raise HTTPException(
status_code=404,
@@ -238,14 +272,15 @@ async def get_image(
return _image_to_dict(image, cdn_base=_cdn_base)
@router.get("/images/{image_id}/file")
@router.get("/i/{short_id}/file")
async def serve_image_file(
image_id: uuid.UUID,
short_id: str,
db: AsyncSession = Depends(get_db),
storage: StorageBackend = Depends(get_storage),
):
_validate_short_id(short_id)
image_repo = ImageRepository(db)
image = await image_repo.get_by_id(image_id)
image = await image_repo.get_by_short_id(short_id)
if not image:
raise HTTPException(
status_code=404,
@@ -268,14 +303,15 @@ async def serve_image_file(
)
@router.get("/images/{image_id}/thumbnail")
@router.get("/i/{short_id}/thumbnail")
async def serve_image_thumbnail(
image_id: uuid.UUID,
short_id: str,
db: AsyncSession = Depends(get_db),
storage: StorageBackend = Depends(get_storage),
):
_validate_short_id(short_id)
image_repo = ImageRepository(db)
image = await image_repo.get_by_id(image_id)
image = await image_repo.get_by_short_id(short_id)
if not image:
raise HTTPException(
status_code=404,
@@ -300,17 +336,18 @@ async def serve_image_thumbnail(
)
@router.patch("/images/{image_id}/tags")
@router.patch("/i/{short_id}/tags")
async def update_image_tags(
image_id: uuid.UUID,
short_id: str,
body: dict,
db: AsyncSession = Depends(get_db),
_: Identity = Depends(require_auth),
settings=Depends(get_settings),
):
_validate_short_id(short_id)
_cdn_base = settings.s3_public_base_url
image_repo = ImageRepository(db)
image = await image_repo.get_by_id(image_id)
image = await image_repo.get_by_short_id(short_id)
if not image:
raise HTTPException(
status_code=404,
@@ -332,15 +369,16 @@ async def update_image_tags(
return _image_to_dict(image, cdn_base=_cdn_base)
@router.delete("/images/{image_id}", status_code=204)
@router.delete("/i/{short_id}", status_code=204)
async def delete_image(
image_id: uuid.UUID,
short_id: str,
db: AsyncSession = Depends(get_db),
storage: StorageBackend = Depends(get_storage),
_: Identity = Depends(require_auth),
):
_validate_short_id(short_id)
image_repo = ImageRepository(db)
image = await image_repo.get_by_id(image_id)
image = await image_repo.get_by_short_id(short_id)
if not image:
raise HTTPException(
status_code=404,

View File

@@ -1,5 +1,13 @@
import hashlib
import secrets
import string
BASE62 = string.ascii_letters + string.digits
def compute_sha256(data: bytes) -> str:
    """Return the SHA-256 digest of ``data`` as a hexadecimal string."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()
def generate_short_id(length: int = 8) -> str:
    """Return ``length`` characters, each picked from ``BASE62`` with ``secrets.choice``."""
    picked = [secrets.choice(BASE62) for _ in range(length)]
    return "".join(picked)

0
api/scripts/__init__.py Normal file
View File

View File

@@ -0,0 +1,107 @@
"""
Migrate existing images to use short_id-based storage keys.
Run after applying Alembic migration 003 (adds short_id column).
Run before applying migration 004 (sets short_id NOT NULL).
Usage:
python -m scripts.migrate_to_short_ids
"""
import asyncio
import logging
from typing import Any
from sqlalchemy import select
from app.database import get_session_factory
from app.models import Image
from app.storage.s3_backend import S3StorageBackend
from app.utils import generate_short_id
logger = logging.getLogger(__name__)
async def migrate_image(image: Any, storage: Any, session: Any) -> bool:
    """Migrate one image to a short_id-based storage key.

    The stored object (and thumbnail, when there is one) is copied to keys
    derived from a freshly generated short ID, the row is pointed at the copies
    and flushed, and finally the old objects are deleted.

    Args:
        image: Row to migrate. ``short_id``, ``storage_key``, ``thumbnail_key``,
            ``mime_type`` and ``id`` are read; the first three are rewritten.
        storage: Backend exposing awaitable ``get``, ``put`` and ``delete``.
        session: Session whose awaitable ``flush`` sends the row update.

    Returns:
        True if the row now points at the new keys. False if the image already
        had a short_id, or if copying the main object or flushing the row failed.
    """
    if image.short_id is not None:
        return False

    new_short_id = generate_short_id()
    old_key = image.storage_key
    old_thumb_key = image.thumbnail_key

    try:
        data = await storage.get(old_key)
        await storage.put(new_short_id, data, image.mime_type)
        # Read the copy back so a write that did not land is caught before the
        # row is switched over.
        await storage.get(new_short_id)
    except Exception as exc:
        logger.error("Failed to copy storage object for image %s: %s", image.id, exc)
        return False

    # A thumbnail that cannot be copied is not fatal: the row simply ends up
    # without a thumbnail key.
    new_thumb_key: str | None = None
    if old_thumb_key:
        try:
            thumb_data = await storage.get(old_thumb_key)
            new_thumb_key = f"{new_short_id}-thumb"
            await storage.put(new_thumb_key, thumb_data, "image/webp")
            await storage.get(new_thumb_key)
        except Exception as exc:
            logger.warning("Failed to copy thumbnail for image %s: %s", image.id, exc)
            new_thumb_key = None

    try:
        image.short_id = new_short_id
        image.storage_key = new_short_id
        image.thumbnail_key = new_thumb_key
        await session.flush()
    except Exception as exc:
        logger.error("Failed to update DB record for image %s: %s", image.id, exc)
        return False

    # From here on the row references the new keys, so the migration itself has
    # succeeded. Removing the old objects is best-effort cleanup: a failed
    # delete leaves an orphaned object behind but must not be reported as a DB
    # failure, and must not stop the old thumbnail from being removed too.
    # NOTE(review): the old objects are removed after flush() but before the
    # caller commits; if that commit fails, the rolled-back rows would still
    # reference keys deleted here — confirm this ordering is acceptable.
    stale_keys = [old_key]
    if old_thumb_key and new_thumb_key:
        stale_keys.append(old_thumb_key)
    for stale_key in stale_keys:
        try:
            await storage.delete(stale_key)
        except Exception as exc:
            logger.warning(
                "Failed to delete old storage object %s for image %s: %s",
                stale_key,
                image.id,
                exc,
            )

    return True
async def run_migration(images: list, storage: Any, session: Any) -> tuple[int, int, int]:
    """Run ``migrate_image`` over ``images`` and tally the outcomes.

    Images that already carry a short_id are counted as skipped without being
    passed on. An exception escaping ``migrate_image`` is logged and counted as
    a failure so the remaining images are still processed.

    Returns:
        ``(migrated, skipped, failed)`` counts.
    """
    tally = {"migrated": 0, "skipped": 0, "failed": 0}

    for image in images:
        if image.short_id is not None:
            tally["skipped"] += 1
            continue

        try:
            outcome = "migrated" if await migrate_image(image, storage, session) else "failed"
        except Exception as exc:
            logger.error("Unexpected error migrating image %s: %s", image.id, exc)
            outcome = "failed"
        tally[outcome] += 1

    return tally["migrated"], tally["skipped"], tally["failed"]
async def main() -> None:
    """Migrate every image whose short_id is NULL, commit, and print the totals."""
    logging.basicConfig(level=logging.INFO)

    storage = S3StorageBackend()
    session_factory = get_session_factory()

    async with session_factory() as session:
        pending = select(Image).where(Image.short_id.is_(None))
        rows = await session.execute(pending)
        images = list(rows.scalars().all())
        logger.info("Found %d images to migrate", len(images))

        migrated, skipped, failed = await run_migration(images, storage, session)
        await session.commit()

    print(f"Migrated: {migrated}, Skipped: {skipped}, Failed: {failed}")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,10 +1,9 @@
"""
T065 — DELETE /api/v1/images/{id} → 204; subsequent GET returns 404
T065 — DELETE /api/v1/i/{short_id} → 204; subsequent GET returns 404
T066 — DELETE verifies MinIO object is removed
T067 — DELETE of unknown ID → 404 image_not_found
"""
import io
import uuid
import pytest
from PIL import Image as PILImage
@@ -28,12 +27,12 @@ async def test_delete_removes_record(authed_client):
files={"file": ("del-test.jpg", io.BytesIO(data), "image/jpeg")},
headers=headers,
)
image_id = upload.json()["id"]
image_id = upload.json()["short_id"]
delete_resp = await client.delete(f"/api/v1/images/{image_id}", headers=headers)
delete_resp = await client.delete(f"/api/v1/i/{image_id}", headers=headers)
assert delete_resp.status_code == 204
get_resp = await client.get(f"/api/v1/images/{image_id}")
get_resp = await client.get(f"/api/v1/i/{image_id}")
assert get_resp.status_code == 404
assert get_resp.json()["code"] == "image_not_found"
@@ -49,13 +48,13 @@ async def test_delete_removes_storage_object(authed_client):
headers=headers,
)
assert upload.status_code in (200, 201)
image_id = upload.json()["id"]
image_id = upload.json()["short_id"]
delete_resp = await client.delete(f"/api/v1/images/{image_id}", headers=headers)
delete_resp = await client.delete(f"/api/v1/i/{image_id}", headers=headers)
assert delete_resp.status_code == 204
# Confirm storage redirect no longer works (404 since record is gone)
file_resp = await client.get(f"/api/v1/images/{image_id}/file")
file_resp = await client.get(f"/api/v1/i/{image_id}/file")
assert file_resp.status_code == 404
@@ -63,7 +62,7 @@ async def test_delete_removes_storage_object(authed_client):
async def test_delete_unknown_id_returns_404(authed_client):
client, token = authed_client
response = await client.delete(
f"/api/v1/images/{uuid.uuid4()}",
"/api/v1/i/NotFound",
headers={"Authorization": f"Bearer {token}"},
)
assert response.status_code == 404
@@ -85,12 +84,12 @@ async def test_delete_removes_thumbnail(authed_client):
headers=headers,
)
assert upload.status_code == 201
image_id = upload.json()["id"]
image_id = upload.json()["short_id"]
assert upload.json()["thumbnail_key"] is not None
delete_resp = await client.delete(f"/api/v1/images/{image_id}", headers=headers)
delete_resp = await client.delete(f"/api/v1/i/{image_id}", headers=headers)
assert delete_resp.status_code == 204
thumb_resp = await client.get(f"/api/v1/images/{image_id}/thumbnail")
thumb_resp = await client.get(f"/api/v1/i/{image_id}/thumbnail")
assert thumb_resp.status_code == 404
assert thumb_resp.json()["code"] == "image_not_found"

View File

@@ -3,7 +3,6 @@ Tests that write endpoints require authentication (US2).
These use the authed_client fixture which wires JWTAuthProvider.
"""
import io
import uuid
import pytest
@@ -42,8 +41,7 @@ async def test_upload_with_valid_token_succeeds(authed_client):
@pytest.mark.asyncio
async def test_delete_without_token_returns_401(authed_client):
client, _ = authed_client
fake_id = uuid.uuid4()
response = await client.delete(f"/api/v1/images/{fake_id}")
response = await client.delete("/api/v1/i/NotFound")
assert response.status_code == 401
assert response.json().get("code") == "unauthorized"
@@ -57,9 +55,9 @@ async def test_delete_with_valid_token_succeeds(authed_client):
files={"file": ("del-protected.jpg", io.BytesIO(data), "image/jpeg")},
headers={"Authorization": f"Bearer {token}"},
)
image_id = upload.json()["id"]
image_id = upload.json()["short_id"]
response = await client.delete(
f"/api/v1/images/{image_id}",
f"/api/v1/i/{image_id}",
headers={"Authorization": f"Bearer {token}"},
)
assert response.status_code == 204
@@ -68,9 +66,8 @@ async def test_delete_with_valid_token_succeeds(authed_client):
@pytest.mark.asyncio
async def test_patch_tags_without_token_returns_401(authed_client):
client, _ = authed_client
fake_id = uuid.uuid4()
response = await client.patch(
f"/api/v1/images/{fake_id}/tags",
"/api/v1/i/NotFound/tags",
json={"tags": ["a"]},
)
assert response.status_code == 401
@@ -86,9 +83,9 @@ async def test_patch_tags_with_valid_token_succeeds(authed_client):
files={"file": ("tag-protected.jpg", io.BytesIO(data), "image/jpeg")},
headers={"Authorization": f"Bearer {token}"},
)
image_id = upload.json()["id"]
image_id = upload.json()["short_id"]
response = await client.patch(
f"/api/v1/images/{image_id}/tags",
f"/api/v1/i/{image_id}/tags",
json={"tags": ["protected-tag"]},
headers={"Authorization": f"Bearer {token}"},
)

View File

@@ -30,8 +30,8 @@ async def test_get_image_without_token_is_200(authed_client):
files={"file": ("pub-test.jpg", io.BytesIO(data), "image/jpeg")},
headers={"Authorization": f"Bearer {token}"},
)
image_id = upload.json()["id"]
response = await client.get(f"/api/v1/images/{image_id}")
image_id = upload.json()["short_id"]
response = await client.get(f"/api/v1/i/{image_id}")
assert response.status_code == 200
@@ -44,8 +44,8 @@ async def test_serve_file_without_token_is_200(authed_client):
files={"file": ("pub-file.jpg", io.BytesIO(data), "image/jpeg")},
headers={"Authorization": f"Bearer {token}"},
)
image_id = upload.json()["id"]
response = await client.get(f"/api/v1/images/{image_id}/file")
image_id = upload.json()["short_id"]
response = await client.get(f"/api/v1/i/{image_id}/file")
assert response.status_code == 200
@@ -58,8 +58,8 @@ async def test_serve_thumbnail_without_token_is_200(authed_client):
files={"file": ("pub-thumb.jpg", io.BytesIO(data), "image/jpeg")},
headers={"Authorization": f"Bearer {token}"},
)
image_id = upload.json()["id"]
response = await client.get(f"/api/v1/images/{image_id}/thumbnail")
image_id = upload.json()["short_id"]
response = await client.get(f"/api/v1/i/{image_id}/thumbnail")
assert response.status_code == 200

View File

@@ -1,10 +1,9 @@
"""
T055 — GET /api/v1/images/{id}/file → 200 with binary content, ETag, Cache-Control
T055 — GET /api/v1/i/{short_id}/file → 200 with binary content, ETag, Cache-Control
T056 — /file for unknown ID → 404 image_not_found
T057 — /file response exposes no storage-specific details
"""
import io
import uuid
import pytest
from PIL import Image as PILImage
@@ -39,10 +38,10 @@ async def test_file_returns_200_with_content(authed_client):
)
assert upload.status_code in (200, 201)
upload_body = upload.json()
image_id = upload_body["id"]
image_id = upload_body["short_id"]
image_hash = upload_body["hash"]
response = await client.get(f"/api/v1/images/{image_id}/file")
response = await client.get(f"/api/v1/i/{image_id}/file")
assert response.status_code == 200
assert response.headers["content-type"].startswith("image/")
assert response.headers["etag"] == f'"{image_hash}"'
@@ -52,7 +51,7 @@ async def test_file_returns_200_with_content(authed_client):
@pytest.mark.asyncio
async def test_file_unknown_id_returns_404(client):
response = await client.get(f"/api/v1/images/{uuid.uuid4()}/file")
response = await client.get("/api/v1/i/NotFound/file")
assert response.status_code == 404
body = response.json()
assert body["code"] == "image_not_found"
@@ -68,9 +67,9 @@ async def test_file_response_exposes_no_storage_details(authed_client):
headers={"Authorization": f"Bearer {token}"},
)
assert upload.status_code in (200, 201)
image_id = upload.json()["id"]
image_id = upload.json()["short_id"]
response = await client.get(f"/api/v1/images/{image_id}/file")
response = await client.get(f"/api/v1/i/{image_id}/file")
assert response.status_code == 200
assert "location" not in response.headers
assert "minio" not in response.text.lower()
@@ -89,10 +88,10 @@ async def test_thumbnail_returns_webp(authed_client):
)
assert upload.status_code == 201
body = upload.json()
image_id = body["id"]
image_id = body["short_id"]
image_hash = body["hash"]
response = await client.get(f"/api/v1/images/{image_id}/thumbnail")
response = await client.get(f"/api/v1/i/{image_id}/thumbnail")
assert response.status_code == 200
assert response.headers["content-type"] == "image/webp"
assert response.headers["etag"] == f'"{image_hash}"'
@@ -110,15 +109,15 @@ async def test_thumbnail_fallback_returns_original(authed_client, db_session):
headers={"Authorization": f"Bearer {token}"},
)
assert upload.status_code == 201
image_id = upload.json()["id"]
image_id = upload.json()["short_id"]
await db_session.execute(
update(Image).where(Image.id == uuid.UUID(image_id)).values(thumbnail_key=None)
update(Image).where(Image.short_id == image_id).values(thumbnail_key=None)
)
await db_session.flush()
db_session.expire_all()
response = await client.get(f"/api/v1/images/{image_id}/thumbnail")
response = await client.get(f"/api/v1/i/{image_id}/thumbnail")
assert response.status_code == 200
assert "image/jpeg" in response.headers["content-type"]
assert len(response.content) > 0
@@ -126,7 +125,7 @@ async def test_thumbnail_fallback_returns_original(authed_client, db_session):
@pytest.mark.asyncio
async def test_thumbnail_unknown_id_returns_404(client):
response = await client.get(f"/api/v1/images/{uuid.uuid4()}/thumbnail")
response = await client.get("/api/v1/i/NotFound/thumbnail")
assert response.status_code == 404
body = response.json()
assert body["code"] == "image_not_found"

View File

@@ -81,10 +81,10 @@ async def test_patch_replaces_tag_set(authed_client):
data={"tags": "old-tag"},
headers=headers,
)
image_id = r1.json()["id"]
image_id = r1.json()["short_id"]
patch = await client.patch(
f"/api/v1/images/{image_id}/tags",
f"/api/v1/i/{image_id}/tags",
json={"tags": ["new-tag", "another"]},
headers=headers,
)
@@ -104,10 +104,10 @@ async def test_patch_invalid_tag_returns_422(authed_client):
files={"file": ("invalid-tag-test.png", io.BytesIO(data), "image/png")},
headers=headers,
)
image_id = r1.json()["id"]
image_id = r1.json()["short_id"]
patch = await client.patch(
f"/api/v1/images/{image_id}/tags",
f"/api/v1/i/{image_id}/tags",
json={"tags": ["valid", "INVALID TAG WITH SPACES!"]},
headers=headers,
)

View File

@@ -3,10 +3,10 @@ T026 — valid JPEG upload → 201, record in DB, object in MinIO
T027 — same image uploaded twice → 200, duplicate: true, no second MinIO object
T028 — invalid MIME type → 422 invalid_mime_type (error envelope with code field)
T029 — file > MAX_UPLOAD_BYTES → 422 file_too_large
T079 — GET /api/v1/images/{id} 404 → error envelope shape
T013 — upload produces short_id; storage_key equals short_id; thumbnail_key = {short_id}-thumb
"""
import io
import uuid
import re
from unittest.mock import patch
import pytest
@@ -111,13 +111,81 @@ async def test_upload_oversized_file_returns_422(authed_client):
@pytest.mark.asyncio
async def test_get_unknown_image_returns_404_with_envelope(client):
response = await client.get(f"/api/v1/images/{uuid.uuid4()}")
response = await client.get("/api/v1/i/NotFound")
assert response.status_code == 404
body = response.json()
assert body["code"] == "image_not_found"
assert "detail" in body
_SHORT_ID_RE = re.compile(r"^[a-zA-Z0-9]{8}$")
@pytest.mark.asyncio
async def test_upload_returns_short_id(authed_client):
    """A new upload answers 201 and its body carries a short_id matching the ID pattern."""
    client, token = authed_client
    jpeg_bytes = _minimal_jpeg()
    auth_headers = {"Authorization": f"Bearer {token}"}

    response = await client.post(
        "/api/v1/images",
        files={"file": ("s1.jpg", io.BytesIO(jpeg_bytes), "image/jpeg")},
        headers=auth_headers,
    )

    assert response.status_code == 201
    body = response.json()
    assert "short_id" in body
    assert _SHORT_ID_RE.match(body["short_id"]), f"short_id invalid: {body['short_id']}"
@pytest.mark.asyncio
async def test_upload_storage_key_equals_short_id(authed_client):
    """The storage_key reported for a new upload is the same value as its short_id."""
    client, token = authed_client
    jpeg_bytes = _real_jpeg(color=(10, 20, 30))
    auth_headers = {"Authorization": f"Bearer {token}"}

    response = await client.post(
        "/api/v1/images",
        files={"file": ("s2.jpg", io.BytesIO(jpeg_bytes), "image/jpeg")},
        headers=auth_headers,
    )

    assert response.status_code == 201
    body = response.json()
    assert body["storage_key"] == body["short_id"]
@pytest.mark.asyncio
async def test_upload_thumbnail_key_equals_short_id_thumb(authed_client):
    """When a thumbnail_key is returned, it is the short_id followed by ``-thumb``."""
    client, token = authed_client
    jpeg_bytes = _real_jpeg(color=(30, 60, 90))
    auth_headers = {"Authorization": f"Bearer {token}"}

    response = await client.post(
        "/api/v1/images",
        files={"file": ("s3.jpg", io.BytesIO(jpeg_bytes), "image/jpeg")},
        headers=auth_headers,
    )

    assert response.status_code == 201
    body = response.json()
    thumbnail_key = body["thumbnail_key"]
    # Nothing further is checked when the upload came back without a thumbnail.
    if thumbnail_key is None:
        return
    assert thumbnail_key == f"{body['short_id']}-thumb"
@pytest.mark.asyncio
async def test_duplicate_upload_returns_same_short_id(authed_client):
    """Uploading identical bytes twice yields a duplicate response with the first short_id."""
    client, token = authed_client
    data = _real_jpeg(color=(200, 100, 50))
    headers = {"Authorization": f"Bearer {token}"}

    async def post_once():
        # A fresh BytesIO per request so each upload reads the payload from the start.
        return await client.post(
            "/api/v1/images",
            files={"file": ("dup_short.jpg", io.BytesIO(data), "image/jpeg")},
            headers=headers,
        )

    first = await post_once()
    assert first.status_code in (200, 201)

    second = await post_once()
    assert second.status_code == 200
    assert second.json()["duplicate"] is True
    assert second.json()["short_id"] == first.json()["short_id"]
@pytest.mark.asyncio
async def test_upload_returns_thumbnail_key(authed_client):
client, token = authed_client
@@ -133,9 +201,9 @@ async def test_upload_returns_thumbnail_key(authed_client):
assert body["thumbnail_key"] is not None
assert body["thumbnail_key"].endswith("-thumb")
assert "file_url" in body
assert body["file_url"].startswith("/api/v1/images/")
assert body["file_url"].startswith("/api/v1/i/")
assert "thumbnail_url" in body
assert body["thumbnail_url"].startswith("/api/v1/images/")
assert body["thumbnail_url"].startswith("/api/v1/i/")
@pytest.mark.asyncio
@@ -177,5 +245,5 @@ async def test_upload_succeeds_when_thumbnail_fails(authed_client):
body = response.json()
assert body["thumbnail_key"] is None
assert "file_url" in body
assert body["file_url"].startswith("/api/v1/images/")
assert body["file_url"].startswith("/api/v1/i/")
assert body["thumbnail_url"] is None

View File

@@ -1,5 +1,3 @@
_BASE_ENV = {
"DATABASE_URL": "postgresql+asyncpg://u:p@localhost/db",
"S3_ENDPOINT_URL": "http://localhost:9000",
@@ -26,6 +24,7 @@ def test_settings_load_from_env(monkeypatch):
import importlib
import app.config as config_module
importlib.reload(config_module)
s = config_module.Settings()
@@ -43,6 +42,7 @@ def test_settings_max_upload_bytes_override(monkeypatch):
import importlib
import app.config as config_module
importlib.reload(config_module)
s = config_module.Settings()
@@ -55,6 +55,7 @@ def test_settings_jwt_expiry_override(monkeypatch):
import importlib
import app.config as config_module
importlib.reload(config_module)
s = config_module.Settings()
@@ -67,6 +68,7 @@ def test_api_docs_enabled_default(monkeypatch):
import importlib
import app.config as config_module
importlib.reload(config_module)
s = config_module.Settings()
@@ -79,6 +81,7 @@ def test_api_docs_enabled_false(monkeypatch):
import importlib
import app.config as config_module
importlib.reload(config_module)
s = config_module.Settings()
@@ -91,6 +94,7 @@ def test_api_docs_invalid_value_defaults_to_enabled(monkeypatch):
import importlib
import app.config as config_module
importlib.reload(config_module)
s = config_module.Settings()

View File

@@ -1,6 +1,6 @@
import hashlib
from app.utils import compute_sha256
from app.utils import compute_sha256, generate_short_id
def test_sha256_known_bytes():
@@ -19,3 +19,24 @@ def test_sha256_returns_64_char_hex():
result = compute_sha256(b"test data")
assert len(result) == 64
assert all(c in "0123456789abcdef" for c in result)
def test_generate_short_id_length():
    """With no arguments the generated ID is 8 characters long."""
    short_id = generate_short_id()
    assert len(short_id) == 8
def test_generate_short_id_charset():
    """Every character of a generated ID is an ASCII letter or digit."""
    allowed = set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
    result = generate_short_id()
    assert set(result) <= allowed
def test_generate_short_id_randomness():
    """Two consecutive calls do not return the same ID."""
    first = generate_short_id()
    second = generate_short_id()
    assert first != second
def test_generate_short_id_importable():
    """``generate_short_id`` can be imported from ``app.utils`` and is callable."""
    from app.utils import generate_short_id as imported

    is_callable = callable(imported)
    assert is_callable

View File

@@ -0,0 +1,110 @@
"""Unit tests for migrate_to_short_ids script logic."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@pytest.fixture
def mock_image_null_short_id():
    """Mock image row that still has hash-style keys and no short_id."""
    return MagicMock(
        id="img-uuid-1",
        short_id=None,
        storage_key="oldhashkey1234567890",
        thumbnail_key="oldhashkey1234567890-thumb",
        mime_type="image/jpeg",
    )
@pytest.fixture
def mock_image_with_short_id():
    """Mock image row whose short_id and storage keys are already short-ID based."""
    return MagicMock(
        id="img-uuid-2",
        short_id="AbCd1234",
        storage_key="AbCd1234",
        thumbnail_key="AbCd1234-thumb",
        mime_type="image/jpeg",
    )
@pytest.mark.asyncio
async def test_migrate_processes_image_without_short_id(mock_image_null_short_id):
    """Images with short_id IS NULL are processed: storage copied, DB updated, old keys deleted."""
    from scripts.migrate_to_short_ids import migrate_image

    storage = MagicMock()
    storage.get = AsyncMock(return_value=b"imagedata")
    storage.put = AsyncMock()
    storage.delete = AsyncMock()

    session = MagicMock()
    session.execute = AsyncMock()
    session.flush = AsyncMock()

    image = mock_image_null_short_id
    old_key = image.storage_key
    old_thumb_key = image.thumbnail_key
    new_short_id = "NewSh123"

    with patch("scripts.migrate_to_short_ids.generate_short_id", return_value=new_short_id):
        result = await migrate_image(image, storage, session)

    assert result is True

    # Storage copied: both the original object and its thumbnail.
    storage.put.assert_any_call(new_short_id, b"imagedata", "image/jpeg")
    storage.put.assert_any_call(f"{new_short_id}-thumb", b"imagedata", "image/webp")

    # DB updated: the row points at the new keys and was flushed.
    assert image.short_id == new_short_id
    assert image.storage_key == new_short_id
    assert image.thumbnail_key == f"{new_short_id}-thumb"
    session.flush.assert_awaited_once()

    # Old keys deleted: original and thumbnail.
    storage.delete.assert_any_call(old_key)
    storage.delete.assert_any_call(old_thumb_key)
@pytest.mark.asyncio
async def test_migrate_skips_image_with_short_id(mock_image_with_short_id):
    """Images that already have a short_id are skipped without touching storage or the DB."""
    from scripts.migrate_to_short_ids import migrate_image

    storage = MagicMock()
    session = MagicMock()

    result = await migrate_image(mock_image_with_short_id, storage, session)

    assert result is False
    # Attributes of a MagicMock are mocks themselves, so these checks always
    # run; no hasattr guard is needed.
    storage.get.assert_not_called()
    storage.put.assert_not_called()
    storage.delete.assert_not_called()
    session.flush.assert_not_called()
    assert mock_image_with_short_id.short_id == "AbCd1234"
@pytest.mark.asyncio
async def test_migrate_continues_on_storage_error(mock_image_null_short_id):
    """A failing storage read makes migrate_image return False and never reach put."""
    from scripts.migrate_to_short_ids import migrate_image

    storage = MagicMock(
        get=AsyncMock(side_effect=Exception("storage read error")),
        put=AsyncMock(),
        delete=AsyncMock(),
    )
    session = MagicMock(execute=AsyncMock(), flush=AsyncMock())

    with patch("scripts.migrate_to_short_ids.generate_short_id", return_value="ErrSh123"):
        outcome = await migrate_image(mock_image_null_short_id, storage, session)

    assert outcome is False
    storage.put.assert_not_called()
@pytest.mark.asyncio
async def test_migrate_summary_counts(mock_image_null_short_id, mock_image_with_short_id):
    """One unmigrated plus one already-migrated image yields counts of (1, 1, 0)."""
    from scripts.migrate_to_short_ids import run_migration

    storage = MagicMock(
        get=AsyncMock(return_value=b"data"),
        put=AsyncMock(),
        delete=AsyncMock(),
    )
    session = MagicMock(execute=AsyncMock(), flush=AsyncMock())
    images = [mock_image_null_short_id, mock_image_with_short_id]

    with patch("scripts.migrate_to_short_ids.generate_short_id", return_value="NewSh999"):
        counts = await run_migration(images, storage, session)

    migrated, skipped, failed = counts
    assert migrated == 1
    assert skipped == 1
    assert failed == 0

View File

@@ -0,0 +1,59 @@
"""Unit tests for short_id generation, validation, and repository lookup."""
import re
from unittest.mock import AsyncMock, MagicMock
import pytest
from fastapi import HTTPException
from app.routers.images import _validate_short_id
from app.utils import generate_short_id
# Shape of a short ID: eight characters, each drawn from a-z, A-Z or 0-9.
# NOTE(review): no test visible in this chunk references this pattern — confirm it is still needed.
_SHORT_ID_RE = re.compile(r"^[a-zA-Z0-9]{8}$")
def test_validate_short_id_accepts_valid():
    """An 8-character alphanumeric ID passes validation without raising."""
    well_formed = "AbCd1234"
    _validate_short_id(well_formed)  # any exception here fails the test
def test_validate_short_id_rejects_too_long():
    """A 9-character, purely alphanumeric ID is rejected with HTTP 422.

    The input uses only valid characters so that length is the sole reason
    for rejection; an input with punctuation would still be rejected by a
    character check even if the length check were missing.
    """
    with pytest.raises(HTTPException) as exc:
        _validate_short_id("AbCd12345")
    # Inspect the captured exception only after the context manager exits.
    assert exc.value.status_code == 422
def test_validate_short_id_rejects_too_short():
    """A 5-character ID is rejected with HTTP 422."""
    with pytest.raises(HTTPException) as raised:
        _validate_short_id("short")

    status = raised.value.status_code
    assert status == 422
def test_validate_short_id_rejects_invalid_chars():
    """An 8-character ID containing a space and '!' is rejected with HTTP 422."""
    bad_chars = "has spa!"  # correct length, so only the characters are at fault
    with pytest.raises(HTTPException) as raised:
        _validate_short_id(bad_chars)

    status = raised.value.status_code
    assert status == 422
def test_generate_short_id_unique():
    """100 consecutive generate_short_id() calls yield 100 distinct IDs."""
    draws = 100
    ids = {generate_short_id() for _ in range(draws)}
    # A collision in 100 draws from an 8-character base62 space (62**8, about
    # 2.2e14 values) is astronomically unlikely, so require every ID to be
    # distinct instead of tolerating up to nine duplicates.
    assert len(ids) == draws
def test_repo_get_by_short_id_uses_correct_field():
    """get_by_short_id issues a statement that filters on short_id.

    The statement passed to session.execute is compiled with literal binds
    and must mention both the short_id column and the requested value.
    """
    import asyncio

    from app.repositories.image_repo import ImageRepository

    mock_session = MagicMock()
    scalar = MagicMock()
    scalar.scalar_one_or_none = MagicMock(return_value=None)
    mock_session.execute = AsyncMock(return_value=scalar)
    repo = ImageRepository(mock_session)

    # asyncio.run creates and closes its own event loop. Calling
    # asyncio.get_event_loop() with no running loop is deprecated and
    # raises on recent Python versions.
    asyncio.run(repo.get_by_short_id("AbCd1234"))

    # First positional argument of the execute() call is the statement.
    statement = mock_session.execute.call_args[0][0]
    sql = str(statement.compile(compile_kwargs={"literal_binds": True}))
    assert "short_id" in sql
    assert "AbCd1234" in sql

View File

@@ -2,17 +2,21 @@
T037 — tag normalisation: uppercase → lowercase, whitespace stripped
T038 — tag validation: rejects names > 64 chars, invalid chars
"""
import pytest
from app.repositories.tag_repo import TagRepository
# A single parametrize decorator: stacking two decorators over the same
# argument names makes pytest fail collection with a duplicate-parametrization error.
@pytest.mark.parametrize(
    "raw,expected",
    [
        ("Cat", "cat"),
        (" funny ", "funny"),
        ("REACTION", "reaction"),
        (" MiXeD ", "mixed"),
    ],
)
def test_normalise_lowercases_and_strips(raw, expected):
    """TagRepository.normalise lowercases the tag and strips surrounding whitespace."""
    assert TagRepository.normalise(raw) == expected

View File

@@ -1,4 +1,5 @@
"""Unit tests for thumbnail generation utility."""
import io
from PIL import Image as PILImage

View File

@@ -1,14 +1,13 @@
import uuid
from unittest.mock import MagicMock
import pytest
from app.routers.images import _image_to_dict
def _make_image(*, thumbnail_key=None):
img = MagicMock()
img.id = uuid.UUID("00000000-0000-0000-0000-000000000001")
img.short_id = "AbCd1234"
img.hash = "abc123"
img.filename = "test.jpg"
img.mime_type = "image/jpeg"
@@ -27,6 +26,7 @@ def test_cdn_configured_with_thumbnail():
result = _image_to_dict(img, cdn_base="https://cdn.example.com")
assert result["file_url"] == "https://cdn.example.com/abc123storagekey"
assert result["thumbnail_url"] == "https://cdn.example.com/abc123storagekey-thumb"
assert result["short_id"] == "AbCd1234"
def test_cdn_configured_no_thumbnail():
@@ -34,19 +34,20 @@ def test_cdn_configured_no_thumbnail():
result = _image_to_dict(img, cdn_base="https://cdn.example.com")
assert result["file_url"] == "https://cdn.example.com/abc123storagekey"
assert result["thumbnail_url"] is None
assert result["short_id"] == "AbCd1234"
def test_no_cdn_with_thumbnail():
    """Without a CDN base, file and thumbnail URLs use the short_id API routes."""
    img = _make_image(thumbnail_key="abc123storagekey-thumb")
    result = _image_to_dict(img, cdn_base=None)
    # Only the short_id-based URLs are asserted. The superseded UUID-based
    # expectations targeted the same keys and could never hold at the same time.
    assert result["file_url"] == "/api/v1/i/AbCd1234/file"
    assert result["thumbnail_url"] == "/api/v1/i/AbCd1234/thumbnail"
def test_no_cdn_no_thumbnail():
    """Without a CDN base or thumbnail, file_url uses the short_id route and thumbnail_url is None."""
    img = _make_image(thumbnail_key=None)
    result = _image_to_dict(img, cdn_base=None)
    # The superseded UUID-based file_url expectation is dropped: it contradicted
    # the short_id-based one on the same key.
    assert result["file_url"] == "/api/v1/i/AbCd1234/file"
    assert result["thumbnail_url"] is None
@@ -63,3 +64,9 @@ def test_cdn_trailing_whitespace_normalised():
result = _image_to_dict(img, cdn_base="https://cdn.example.com ")
assert result["file_url"] == "https://cdn.example.com/abc123storagekey"
assert result["thumbnail_url"] == "https://cdn.example.com/abc123storagekey-thumb"
def test_short_id_in_response():
    """The serialised image dict exposes the image's short_id."""
    payload = _image_to_dict(_make_image(), cdn_base=None)
    assert payload["short_id"] == "AbCd1234"