[Spec Kit] Implementation progress

Implements all 88 tasks for the Reaction Image Board (specs/001-reaction-image-board):

- docker-compose.yml: postgres, minio, minio-init, api, ui services with healthchecks
- api/: FastAPI app with SQLAlchemy 2.x async, Alembic migrations, S3/MinIO storage,
  full integration + unit test suite (pytest + pytest-asyncio)
- ui/: Angular 19 standalone app (Library, Upload, Detail, NotFound components)
- .env.example: all required environment variables
- .gitignore: Python, Node, Docker, IDE, .env patterns

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-02 16:13:23 +00:00
parent 691f7570fe
commit 8bf6ef443a
74 changed files with 3005 additions and 88 deletions

14
api/.dockerignore Normal file
View File

@@ -0,0 +1,14 @@
.git/
.venv/
venv/
__pycache__/
*.pyc
.pytest_cache/
.ruff_cache/
.coverage
htmlcov/
*.egg-info/
dist/
.env
.env.*
!.env.example

12
api/Dockerfile Normal file
View File

@@ -0,0 +1,12 @@
FROM python:3.12-slim
WORKDIR /app
# uv is installed as a faster drop-in for pip during dependency installation.
RUN pip install --no-cache-dir uv
# Copy only the manifest first so the dependency layer is cached
# independently of source-code changes.
COPY pyproject.toml .
RUN uv pip install --system --no-cache -e ".[dev]"
COPY . .
# NOTE(review): --reload is a development flag; presumably this CMD is
# overridden (or the image rebuilt) for production — confirm deployment setup.
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]

41
api/alembic.ini Normal file
View File

@@ -0,0 +1,41 @@
[alembic]
script_location = alembic
prepend_sys_path = .
version_path_separator = os
sqlalchemy.url = driver://user:pass@localhost/dbname
[post_write_hooks]
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

60
api/alembic/env.py Normal file
View File

@@ -0,0 +1,60 @@
"""Alembic migration environment.

Wires Alembic to the application's async SQLAlchemy engine and declarative
metadata so offline SQL generation and online migrations both work against
the configured database.
"""
import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
from app.config import get_settings
from app.database import Base
import app.models # noqa: F401 — ensure all models are imported
# Alembic Config object: provides access to values in alembic.ini.
config = context.config
settings = get_settings()
# Point Alembic at the application's configured database instead of the
# placeholder URL in alembic.ini.
config.set_main_option("sqlalchemy.url", settings.database_url)
if config.config_file_name is not None:
    fileConfig(config.config_file_name)
# Autogenerate compares against the application's declarative metadata.
target_metadata = Base.metadata
def run_migrations_offline() -> None:
    """Emit migration SQL without a live database connection."""
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )
    with context.begin_transaction():
        context.run_migrations()
def do_run_migrations(connection: Connection) -> None:
    """Run migrations on an already-established (sync-facade) connection."""
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()
async def run_async_migrations() -> None:
    """Build an async engine from the Alembic config and migrate through it."""
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    async with connectable.connect() as connection:
        # run_sync bridges Alembic's synchronous API onto the async connection.
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()
def run_migrations_online() -> None:
    """Entry point for online migrations; drives the async runner."""
    asyncio.run(run_async_migrations())
if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

View File

@@ -0,0 +1,25 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

View File

@@ -0,0 +1,63 @@
"""initial schema — images, tags, image_tags
Revision ID: 001
Revises:
Create Date: 2026-05-02
"""
from typing import Sequence, Union
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
from alembic import op
revision: str = "001"
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"images",
sa.Column("id", postgresql.UUID(as_uuid=True), nullable=False),
sa.Column("hash", sa.String(64), nullable=False),
sa.Column("filename", sa.String(), nullable=False),
sa.Column("mime_type", sa.String(20), nullable=False),
sa.Column("size_bytes", sa.BigInteger(), nullable=False),
sa.Column("width", sa.Integer(), nullable=False),
sa.Column("height", sa.Integer(), nullable=False),
sa.Column("storage_key", sa.String(64), nullable=False),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("hash", name="uq_images_hash"),
)
op.create_index("ix_images_hash", "images", ["hash"])
op.create_table(
"tags",
sa.Column("id", postgresql.UUID(as_uuid=True), nullable=False),
sa.Column("name", sa.String(64), nullable=False),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("name", name="uq_tags_name"),
)
op.create_index("ix_tags_name", "tags", ["name"])
op.create_index("ix_tags_name_prefix", "tags", ["name"], postgresql_ops={"name": "varchar_pattern_ops"})
op.create_table(
"image_tags",
sa.Column("image_id", postgresql.UUID(as_uuid=True), nullable=False),
sa.Column("tag_id", postgresql.UUID(as_uuid=True), nullable=False),
sa.ForeignKeyConstraint(["image_id"], ["images.id"], ondelete="CASCADE"),
sa.ForeignKeyConstraint(["tag_id"], ["tags.id"], ondelete="RESTRICT"),
sa.PrimaryKeyConstraint("image_id", "tag_id"),
sa.UniqueConstraint("image_id", "tag_id", name="uq_image_tag"),
)
op.create_index("ix_image_tags_image_id", "image_tags", ["image_id"])
op.create_index("ix_image_tags_tag_id", "image_tags", ["tag_id"])
def downgrade() -> None:
op.drop_table("image_tags")
op.drop_table("tags")
op.drop_table("images")

0
api/app/__init__.py Normal file
View File

0
api/app/auth/__init__.py Normal file
View File

8
api/app/auth/noop.py Normal file
View File

@@ -0,0 +1,8 @@
from app.auth.provider import AuthProvider, Identity

# Single shared anonymous identity: every request resolves to the same object.
_ANONYMOUS = Identity(id="anonymous", anonymous=True)


class NoOpAuthProvider(AuthProvider):
    """Auth provider used when authentication is disabled.

    Always resolves the request to the shared anonymous identity.
    """

    async def get_identity(self) -> Identity:
        """Return the module-level anonymous identity."""
        return _ANONYMOUS

14
api/app/auth/provider.py Normal file
View File

@@ -0,0 +1,14 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
@dataclass
class Identity:
    """The resolved identity of a request principal."""
    # Stable identifier for the principal (e.g. "anonymous").
    id: str
    # True when the request carried no authenticated identity.
    anonymous: bool = True
class AuthProvider(ABC):
    """Pluggable authentication strategy; implementations resolve a request to an Identity."""
    @abstractmethod
    async def get_identity(self) -> Identity:
        """Resolve the request identity."""

20
api/app/config.py Normal file
View File

@@ -0,0 +1,20 @@
from functools import lru_cache
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
    """Application configuration loaded from the environment (and a local .env)."""
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", extra="ignore")
    # Async SQLAlchemy URL — required (no default).
    database_url: str
    # S3/MinIO connection details — all required.
    s3_endpoint_url: str
    s3_bucket_name: str
    s3_access_key_id: str
    s3_secret_access_key: str
    s3_region: str = "us-east-1"
    # NOTE(review): api_base_url is not referenced in the visible code — confirm usage.
    api_base_url: str = "http://localhost:8000"
    # Upload size ceiling enforced by validation on each upload.
    max_upload_bytes: int = 52_428_800 # 50 MiB
@lru_cache
def get_settings() -> Settings:
    """Return the process-wide Settings instance (cached after first call)."""
    return Settings()

26
api/app/database.py Normal file
View File

@@ -0,0 +1,26 @@
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
from app.config import get_settings
_engine = None
_session_factory = None
class Base(DeclarativeBase):
pass
def get_engine():
global _engine
if _engine is None:
settings = get_settings()
_engine = create_async_engine(settings.database_url, echo=False)
return _engine
def get_session_factory():
global _session_factory
if _session_factory is None:
_session_factory = async_sessionmaker(get_engine(), expire_on_commit=False)
return _session_factory

34
api/app/dependencies.py Normal file
View File

@@ -0,0 +1,34 @@
from typing import AsyncGenerator
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.auth.noop import NoOpAuthProvider
from app.auth.provider import AuthProvider
from app.database import get_session_factory
from app.storage.backend import StorageBackend
from app.storage.s3_backend import S3StorageBackend
# Process-wide singletons, created lazily on first request.
_storage: StorageBackend | None = None
_auth: AuthProvider | None = None
def get_storage() -> StorageBackend:
    """FastAPI dependency: the shared storage backend (S3/MinIO)."""
    global _storage
    if _storage is None:
        _storage = S3StorageBackend()
    return _storage
def get_auth() -> AuthProvider:
    """FastAPI dependency: the shared auth provider (no-op by default)."""
    global _auth
    if _auth is None:
        _auth = NoOpAuthProvider()
    return _auth
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """FastAPI dependency: yield a session wrapped in a transaction.

    session.begin() commits when the request handler returns normally and
    rolls back if it raises.
    """
    factory = get_session_factory()
    async with factory() as session:
        async with session.begin():
            yield session

33
api/app/main.py Normal file
View File

@@ -0,0 +1,33 @@
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.config import get_settings
from app.database import get_engine, get_session_factory, Base


@asynccontextmanager
async def lifespan(application: FastAPI):
    """Application lifespan: prepare the database on startup, dispose on shutdown."""
    # Instantiate settings eagerly so a misconfigured environment fails at
    # startup rather than on the first request (fixes the unused `settings`
    # local while keeping the fail-fast side effect).
    get_settings()
    engine = get_engine()
    async with engine.begin() as conn:
        # In production, Alembic handles migrations; this is a dev convenience
        await conn.run_sync(Base.metadata.create_all)
    yield
    await engine.dispose()


app = FastAPI(title="Reactbin API", version="1.0.0", lifespan=lifespan)


@app.get("/api/v1/health")
async def health():
    """Liveness probe used by container healthchecks."""
    return {"status": "ok"}


# Routers registered after all modules are defined to avoid circular imports
from app.routers import images, tags  # noqa: E402

app.include_router(images.router, prefix="/api/v1")
app.include_router(tags.router, prefix="/api/v1")

61
api/app/models.py Normal file
View File

@@ -0,0 +1,61 @@
import uuid
from datetime import datetime, timezone
from sqlalchemy import String, Integer, BigInteger, DateTime, ForeignKey, UniqueConstraint, Index
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base
def _utcnow() -> datetime:
return datetime.now(timezone.utc)
class Image(Base):
    """An uploaded image, deduplicated by content hash."""
    __tablename__ = "images"
    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    # SHA-256 hex digest of the file bytes; unique to enforce dedup.
    hash: Mapped[str] = mapped_column(String(64), unique=True, nullable=False, index=True)
    filename: Mapped[str] = mapped_column(String, nullable=False)
    mime_type: Mapped[str] = mapped_column(String(20), nullable=False)
    size_bytes: Mapped[int] = mapped_column(BigInteger, nullable=False)
    width: Mapped[int] = mapped_column(Integer, nullable=False)
    height: Mapped[int] = mapped_column(Integer, nullable=False)
    # Key of the blob in object storage (the content hash is reused as key).
    storage_key: Mapped[str] = mapped_column(String(64), nullable=False)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=_utcnow, nullable=False)
    image_tags: Mapped[list["ImageTag"]] = relationship(back_populates="image", cascade="all, delete-orphan")
    @property
    def tags(self) -> list[str]:
        """Names of this image's tags.

        Assumes image_tags (and each .tag) is already loaded — the
        repositories eager-load it via selectinload; accessing this on an
        unloaded instance from async code would fail.
        """
        return [it.tag.name for it in self.image_tags if it.tag]
class Tag(Base):
    """A normalised tag name that can be attached to many images."""
    __tablename__ = "tags"
    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name: Mapped[str] = mapped_column(String(64), unique=True, nullable=False, index=True)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=_utcnow, nullable=False)
    image_tags: Mapped[list["ImageTag"]] = relationship(back_populates="tag")
class ImageTag(Base):
    """Association row linking one image to one tag."""
    __tablename__ = "image_tags"
    __table_args__ = (
        UniqueConstraint("image_id", "tag_id", name="uq_image_tag"),
        Index("ix_image_tags_image_id", "image_id"),
        Index("ix_image_tags_tag_id", "tag_id"),
    )
    # Deleting an image cascades to its associations; tags in use are protected.
    image_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), ForeignKey("images.id", ondelete="CASCADE"), primary_key=True
    )
    tag_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), ForeignKey("tags.id", ondelete="RESTRICT"), primary_key=True
    )
    image: Mapped["Image"] = relationship(back_populates="image_tags")
    tag: Mapped["Tag"] = relationship(back_populates="image_tags")

View File

View File

@@ -0,0 +1,84 @@
import uuid
from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models import Image, ImageTag, Tag
class ImageRepository:
    """Data-access layer for Image rows and their tag associations."""

    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    @staticmethod
    def _with_tags(query):
        """Eager-load image_tags -> tag so Image.tags never lazy-loads from async code."""
        return query.options(selectinload(Image.image_tags).selectinload(ImageTag.tag))

    async def get_by_hash(self, hash_hex: str) -> Optional[Image]:
        """Return the image whose content hash equals *hash_hex*, or None."""
        result = await self._session.execute(
            self._with_tags(select(Image).where(Image.hash == hash_hex))
        )
        return result.scalar_one_or_none()

    async def get_by_id(self, image_id: uuid.UUID) -> Optional[Image]:
        """Return the image with primary key *image_id*, or None."""
        result = await self._session.execute(
            self._with_tags(select(Image).where(Image.id == image_id))
        )
        return result.scalar_one_or_none()

    async def create(
        self,
        *,
        hash_hex: str,
        filename: str,
        mime_type: str,
        size_bytes: int,
        width: int,
        height: int,
        storage_key: str,
    ) -> Image:
        """Insert a new image row and return it with image_tags initialised."""
        image = Image(
            hash=hash_hex,
            filename=filename,
            mime_type=mime_type,
            size_bytes=size_bytes,
            width=width,
            height=height,
            storage_key=storage_key,
        )
        self._session.add(image)
        # Flush assigns the primary key; refresh loads the (empty) tag list so
        # Image.tags is safe to read immediately after creation.
        await self._session.flush()
        await self._session.refresh(image, ["image_tags"])
        return image

    async def list_images(
        self,
        tag_names: list[str] | None = None,
        limit: int = 50,
        offset: int = 0,
    ) -> tuple[list[Image], int]:
        """Return (page, total) of images, newest first.

        With *tag_names*, only images carrying ALL of the named tags match
        (AND semantics): one membership subquery per requested tag.
        """
        from sqlalchemy import func  # fix: dropped the unused `and_` import

        query = self._with_tags(select(Image))
        if tag_names:
            for tag_name in tag_names:
                tagged = (
                    select(ImageTag.image_id)
                    .join(Tag, ImageTag.tag_id == Tag.id)
                    .where(Tag.name == tag_name)
                    .scalar_subquery()
                )
                query = query.where(Image.id.in_(tagged))
        count_query = select(func.count()).select_from(query.subquery())
        total = (await self._session.execute(count_query)).scalar_one()
        page = query.order_by(Image.created_at.desc()).limit(limit).offset(offset)
        result = await self._session.execute(page)
        # list(...) so the declared return type holds (scalars().all() is a Sequence).
        return list(result.scalars().all()), total

    async def delete(self, image: Image) -> None:
        """Delete *image*; the database cascades removal of its tag associations."""
        await self._session.delete(image)
        await self._session.flush()

View File

@@ -0,0 +1,102 @@
import re
import uuid
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.models import Image, ImageTag, Tag
_TAG_PATTERN = re.compile(r"^[a-z0-9_-]{1,64}$")


class TagRepository:
    """Data-access layer for tags and image-tag associations."""

    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    @staticmethod
    def normalise(name: str) -> str:
        """Lower-case and strip surrounding whitespace from a tag name."""
        return name.strip().lower()

    @staticmethod
    def normalise_and_validate(name: str) -> str:
        """Normalise *name* and raise ValueError if it is not a legal tag name.

        Legal tags are 1-64 characters of [a-z0-9_-] after normalisation.
        """
        # Consistency fix: reuse the single normalisation rule instead of
        # duplicating strip().lower() here.
        normalised = TagRepository.normalise(name)
        if not _TAG_PATTERN.match(normalised):
            raise ValueError(
                f"Invalid tag '{name}': must match ^[a-z0-9_-]{{1,64}}$ after normalisation"
            )
        return normalised

    async def upsert_by_name(self, name: str) -> Tag:
        """Return the tag named *name*, creating it if it does not exist."""
        result = await self._session.execute(select(Tag).where(Tag.name == name))
        tag = result.scalar_one_or_none()
        if tag is None:
            tag = Tag(name=name)
            self._session.add(tag)
            await self._session.flush()
        return tag

    async def get_by_image_id(self, image_id: uuid.UUID) -> list[Tag]:
        """Return the tags attached to an image, ordered by name."""
        result = await self._session.execute(
            select(Tag)
            .join(ImageTag, ImageTag.tag_id == Tag.id)
            .where(ImageTag.image_id == image_id)
            .order_by(Tag.name)
        )
        return list(result.scalars().all())

    async def attach_tags(self, image: Image, tag_names: list[str]) -> None:
        """Attach the named tags to *image*, skipping already-attached ones."""
        for name in tag_names:
            tag = await self.upsert_by_name(name)
            existing = await self._session.execute(
                select(ImageTag).where(
                    ImageTag.image_id == image.id, ImageTag.tag_id == tag.id
                )
            )
            if existing.scalar_one_or_none() is None:
                self._session.add(ImageTag(image_id=image.id, tag_id=tag.id))
        await self._session.flush()

    async def replace_tags_on_image(self, image: Image, tag_names: list[str]) -> None:
        """Replace the full tag set on *image* with *tag_names*."""
        # Remove all existing associations
        existing_links = await self._session.execute(
            select(ImageTag).where(ImageTag.image_id == image.id)
        )
        for link in existing_links.scalars().all():
            await self._session.delete(link)
        await self._session.flush()
        # Add new associations
        for name in tag_names:
            tag = await self.upsert_by_name(name)
            self._session.add(ImageTag(image_id=image.id, tag_id=tag.id))
        await self._session.flush()

    async def list_tags(
        self,
        prefix: str | None = None,
        limit: int = 100,
        offset: int = 0,
    ) -> tuple[list[dict], int]:
        """Return ({id, name, image_count} dicts, total) ordered by name.

        *prefix* restricts results to tags whose name starts with the
        literal prefix.
        """
        count_subq = (
            select(func.count(ImageTag.image_id))
            .where(ImageTag.tag_id == Tag.id)
            .correlate(Tag)
            .scalar_subquery()
        )
        query = select(Tag, count_subq.label("image_count"))
        if prefix:
            # Fix: escape LIKE wildcards so user input is matched literally —
            # '_' is both a legal tag character and a LIKE wildcard.
            literal = (
                prefix.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
            )
            query = query.where(Tag.name.like(f"{literal}%", escape="\\"))
        total_query = select(func.count()).select_from(query.subquery())
        total = (await self._session.execute(total_query)).scalar_one()
        paginated = query.order_by(Tag.name).limit(limit).offset(offset)
        rows = await self._session.execute(paginated)
        items = [
            {"id": str(tag.id), "name": tag.name, "image_count": count}
            for tag, count in rows.all()
        ]
        return items, total

View File

271
api/app/routers/images.py Normal file
View File

@@ -0,0 +1,271 @@
import io
import struct
import uuid
import zlib
from typing import Annotated, Any
from fastapi import APIRouter, Depends, File, Form, HTTPException, Response, UploadFile
from fastapi.responses import RedirectResponse
from sqlalchemy.ext.asyncio import AsyncSession
from app.auth.provider import AuthProvider
from app.config import get_settings
from app.dependencies import get_auth, get_db, get_storage
from app.models import Image
from app.repositories.image_repo import ImageRepository
from app.repositories.tag_repo import TagRepository
from app.storage.backend import StorageBackend
from app.utils import compute_sha256
from app.validation import FileSizeError, MimeTypeError, validate_file_size, validate_mime_type
router = APIRouter(tags=["images"])
def _error(detail: str, code: str, status: int):
    """Raise an HTTPException carrying the API's {detail, code} error payload.

    NOTE(review): not referenced anywhere in this module's visible code — the
    handlers below raise HTTPException directly; consider using or removing it.
    """
    raise HTTPException(status_code=status, detail={"detail": detail, "code": code})
def _image_to_dict(image: Image, *, duplicate: bool | None = None) -> dict[str, Any]:
data: dict[str, Any] = {
"id": str(image.id),
"hash": image.hash,
"filename": image.filename,
"mime_type": image.mime_type,
"size_bytes": image.size_bytes,
"width": image.width,
"height": image.height,
"storage_key": image.storage_key,
"created_at": image.created_at.isoformat(),
"tags": image.tags,
}
if duplicate is not None:
data["duplicate"] = duplicate
return data
def _read_image_dimensions(data: bytes, mime_type: str) -> tuple[int, int]:
"""Return (width, height) from raw image bytes. Falls back to (0, 0)."""
try:
if mime_type == "image/jpeg":
return _jpeg_dimensions(data)
elif mime_type == "image/png":
return _png_dimensions(data)
elif mime_type == "image/gif":
return _gif_dimensions(data)
elif mime_type == "image/webp":
return _webp_dimensions(data)
except Exception:
pass
return 0, 0
def _jpeg_dimensions(data: bytes) -> tuple[int, int]:
i = 0
while i < len(data):
if data[i] != 0xFF:
break
i += 1
marker = data[i]
i += 1
if marker in (0xD8, 0xD9):
continue
length = struct.unpack(">H", data[i : i + 2])[0]
if marker in (0xC0, 0xC1, 0xC2):
h, w = struct.unpack(">HH", data[i + 3 : i + 7])
return w, h
i += length
return 0, 0
def _png_dimensions(data: bytes) -> tuple[int, int]:
w, h = struct.unpack(">II", data[16:24])
return w, h
def _gif_dimensions(data: bytes) -> tuple[int, int]:
w, h = struct.unpack("<HH", data[6:10])
return w, h
def _webp_dimensions(data: bytes) -> tuple[int, int]:
if data[8:12] == b"VP8 ":
w = struct.unpack("<H", data[26:28])[0] & 0x3FFF
h = struct.unpack("<H", data[28:30])[0] & 0x3FFF
return w, h
elif data[8:12] == b"VP8L":
bits = struct.unpack("<I", data[21:25])[0]
w = (bits & 0x3FFF) + 1
h = ((bits >> 14) & 0x3FFF) + 1
return w, h
return 0, 0
@router.post("/images", status_code=201)
async def upload_image(
    file: UploadFile = File(...),
    tags: str | None = Form(None),
    db: AsyncSession = Depends(get_db),
    storage: StorageBackend = Depends(get_storage),
    auth: AuthProvider = Depends(get_auth),
    settings=Depends(get_settings),
):
    """Upload an image (multipart) with optional tags.

    Returns 201 with the new image, or 200 with ``duplicate: true`` when an
    image with the same SHA-256 content hash already exists. Responds 422 for
    unsupported MIME types, empty/oversized files, and invalid tag names.
    """
    # Local import keeps the module's import block untouched; fixes the
    # original `__import__("json")` hack for building the duplicate response.
    from fastapi.responses import JSONResponse

    data = await file.read()
    mime_type = file.content_type or "application/octet-stream"
    try:
        validate_mime_type(mime_type)
    except MimeTypeError:
        raise HTTPException(
            status_code=422,
            detail={"detail": f"Unsupported file type: {mime_type}", "code": "invalid_mime_type"},
        )
    try:
        validate_file_size(len(data), max_bytes=settings.max_upload_bytes)
    except FileSizeError as exc:
        raise HTTPException(
            status_code=422,
            detail={"detail": str(exc), "code": "file_too_large"},
        )
    hash_hex = compute_sha256(data)
    image_repo = ImageRepository(db)
    existing = await image_repo.get_by_hash(hash_hex)
    if existing:
        # Content-level dedup: report the existing record instead of storing twice.
        # NOTE(review): tags supplied alongside a duplicate upload are ignored —
        # confirm this matches the spec.
        return JSONResponse(content=_image_to_dict(existing, duplicate=True), status_code=200)
    # Parse and validate tag names (comma- or whitespace-separated).
    tag_names: list[str] = []
    if tags:
        tag_repo = TagRepository(db)
        raw = [t.strip() for t in tags.replace(",", " ").split() if t.strip()]
        try:
            tag_names = [tag_repo.normalise_and_validate(t) for t in raw]
        except ValueError as exc:
            raise HTTPException(
                status_code=422,
                detail={"detail": str(exc), "code": "invalid_tag"},
            )
    width, height = _read_image_dimensions(data, mime_type)
    # Store the blob first; the DB row references it by hash as storage key.
    await storage.put(hash_hex, data, mime_type)
    image = await image_repo.create(
        hash_hex=hash_hex,
        filename=file.filename or "upload",
        mime_type=mime_type,
        size_bytes=len(data),
        width=width,
        height=height,
        storage_key=hash_hex,
    )
    if tag_names:
        tag_repo = TagRepository(db)
        await tag_repo.attach_tags(image, tag_names)
    await db.refresh(image, ["image_tags"])
    return _image_to_dict(image, duplicate=False)
@router.get("/images")
async def list_images(
    tags: str | None = None,
    limit: int = 50,
    offset: int = 0,
    db: AsyncSession = Depends(get_db),
):
    """Paginated image listing, optionally AND-filtered by comma-separated tags."""
    page_size = min(limit, 100)  # clamp so a caller cannot request unbounded pages
    wanted_tags = None
    if tags:
        wanted_tags = [name.strip() for name in tags.split(",") if name.strip()]
    repo = ImageRepository(db)
    images, total = await repo.list_images(tag_names=wanted_tags, limit=page_size, offset=offset)
    return {
        "items": [_image_to_dict(img) for img in images],
        "total": total,
        "limit": page_size,
        "offset": offset,
    }
@router.get("/images/{image_id}")
async def get_image(
    image_id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
):
    """Fetch a single image's metadata by id; 404 when unknown."""
    image = await ImageRepository(db).get_by_id(image_id)
    if image is None:
        raise HTTPException(
            status_code=404,
            detail={"detail": "Image not found", "code": "image_not_found"},
        )
    return _image_to_dict(image)
@router.get("/images/{image_id}/file")
async def serve_image_file(
    image_id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
    storage: StorageBackend = Depends(get_storage),
):
    """Redirect (302) to a pre-signed URL for the raw image bytes; 404 when unknown."""
    image = await ImageRepository(db).get_by_id(image_id)
    if image is None:
        raise HTTPException(
            status_code=404,
            detail={"detail": "Image not found", "code": "image_not_found"},
        )
    presigned = await storage.get_presigned_url(image.storage_key, expires_in_seconds=3600)
    return RedirectResponse(url=presigned, status_code=302)
@router.patch("/images/{image_id}/tags")
async def update_image_tags(
    image_id: uuid.UUID,
    body: dict,
    db: AsyncSession = Depends(get_db),
):
    """Replace the full tag set on an image. Body shape: {"tags": [names...]}."""
    image_repo = ImageRepository(db)
    image = await image_repo.get_by_id(image_id)
    if not image:
        raise HTTPException(
            status_code=404,
            detail={"detail": "Image not found", "code": "image_not_found"},
        )
    raw_tags = body.get("tags", [])
    # Robustness fix: a non-list (or non-string element) payload previously
    # escaped as an unhandled exception (500); reject it explicitly with 422.
    if not isinstance(raw_tags, list) or not all(isinstance(t, str) for t in raw_tags):
        raise HTTPException(
            status_code=422,
            detail={"detail": "'tags' must be a list of strings", "code": "invalid_tag"},
        )
    tag_repo = TagRepository(db)
    try:
        tag_names = [tag_repo.normalise_and_validate(t) for t in raw_tags]
    except ValueError as exc:
        raise HTTPException(
            status_code=422,
            detail={"detail": str(exc), "code": "invalid_tag"},
        )
    await tag_repo.replace_tags_on_image(image, tag_names)
    await db.refresh(image, ["image_tags"])
    return _image_to_dict(image)
@router.delete("/images/{image_id}", status_code=204)
async def delete_image(
    image_id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
    storage: StorageBackend = Depends(get_storage),
):
    """Delete an image record and its stored object; 404 when unknown."""
    repo = ImageRepository(db)
    image = await repo.get_by_id(image_id)
    if image is None:
        raise HTTPException(
            status_code=404,
            detail={"detail": "Image not found", "code": "image_not_found"},
        )
    # Capture the key before the ORM object is deleted, then remove the blob.
    key = image.storage_key
    await repo.delete(image)
    await storage.delete(key)
    return Response(status_code=204)

20
api/app/routers/tags.py Normal file
View File

@@ -0,0 +1,20 @@
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.dependencies import get_db
from app.repositories.tag_repo import TagRepository

router = APIRouter(tags=["tags"])

_MAX_PAGE_SIZE = 200


@router.get("/tags")
async def list_tags(
    q: str | None = None,
    limit: int = 100,
    offset: int = 0,
    db: AsyncSession = Depends(get_db),
):
    """List tags with their image counts, optionally filtered by name prefix *q*."""
    # Clamp the page size so a caller cannot request unbounded results.
    effective_limit = min(limit, _MAX_PAGE_SIZE)
    repo = TagRepository(db)
    items, total = await repo.list_tags(prefix=q, limit=effective_limit, offset=offset)
    return {"items": items, "total": total, "limit": effective_limit, "offset": offset}

View File

View File

@@ -0,0 +1,15 @@
from abc import ABC, abstractmethod
class StorageBackend(ABC):
    """Abstract blob-storage interface; keys are opaque strings chosen by callers."""
    @abstractmethod
    async def put(self, key: str, data: bytes, content_type: str) -> None:
        """Store object at key with given content type."""
    @abstractmethod
    async def get_presigned_url(self, key: str, expires_in_seconds: int = 3600) -> str:
        """Return a pre-signed URL valid for expires_in_seconds."""
    @abstractmethod
    async def delete(self, key: str) -> None:
        """Delete object at key."""

View File

@@ -0,0 +1,46 @@
from contextlib import asynccontextmanager
import aiobotocore.session
from app.config import get_settings
from app.storage.backend import StorageBackend


class S3StorageBackend(StorageBackend):
    """StorageBackend implementation for any S3-compatible service (AWS S3, MinIO)."""

    def __init__(self) -> None:
        self._settings = get_settings()
        self._session = aiobotocore.session.get_session()

    @asynccontextmanager
    async def _client(self):
        """Yield a short-lived S3 client configured from application settings."""
        cfg = self._settings
        client_ctx = self._session.create_client(
            "s3",
            region_name=cfg.s3_region,
            endpoint_url=cfg.s3_endpoint_url or None,
            aws_access_key_id=cfg.s3_access_key_id,
            aws_secret_access_key=cfg.s3_secret_access_key,
        )
        async with client_ctx as client:
            yield client

    async def put(self, key: str, data: bytes, content_type: str) -> None:
        """Upload *data* under *key* with the given Content-Type."""
        bucket = self._settings.s3_bucket_name
        async with self._client() as client:
            await client.put_object(
                Bucket=bucket,
                Key=key,
                Body=data,
                ContentType=content_type,
            )

    async def get_presigned_url(self, key: str, expires_in_seconds: int = 3600) -> str:
        """Return a time-limited GET URL for the object at *key*."""
        bucket = self._settings.s3_bucket_name
        async with self._client() as client:
            return await client.generate_presigned_url(
                "get_object",
                Params={"Bucket": bucket, "Key": key},
                ExpiresIn=expires_in_seconds,
            )

    async def delete(self, key: str) -> None:
        """Delete the object stored at *key*."""
        bucket = self._settings.s3_bucket_name
        async with self._client() as client:
            await client.delete_object(Bucket=bucket, Key=key)

5
api/app/utils.py Normal file
View File

@@ -0,0 +1,5 @@
import hashlib


def compute_sha256(data: bytes) -> str:
    """Return the lowercase hex SHA-256 digest of *data*."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()

21
api/app/validation.py Normal file
View File

@@ -0,0 +1,21 @@
# The only image formats the board accepts for upload.
ACCEPTED_MIME_TYPES = frozenset({"image/jpeg", "image/png", "image/gif", "image/webp"})


class MimeTypeError(ValueError):
    """Raised when an upload's MIME type is not an accepted image type."""


class FileSizeError(ValueError):
    """Raised when an upload is empty or exceeds the configured size limit."""


def validate_mime_type(mime_type: str) -> None:
    """Raise MimeTypeError unless *mime_type* is an accepted image type."""
    if mime_type not in ACCEPTED_MIME_TYPES:
        raise MimeTypeError(f"Unsupported MIME type: {mime_type}")


def validate_file_size(size_bytes: int, max_bytes: int) -> None:
    """Raise FileSizeError when *size_bytes* is non-positive or above *max_bytes*."""
    if size_bytes <= 0:
        raise FileSizeError("File must not be empty")
    if size_bytes > max_bytes:
        raise FileSizeError(f"File size {size_bytes} exceeds limit of {max_bytes} bytes")

42
api/pyproject.toml Normal file
View File

@@ -0,0 +1,42 @@
[build-system]
requires = ["setuptools>=68"]
build-backend = "setuptools.build_meta"
[project]
name = "reactbin-api"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
"fastapi>=0.111",
"uvicorn[standard]>=0.29",
"sqlalchemy[asyncio]>=2.0",
"asyncpg>=0.29",
"alembic>=1.13",
"aiobotocore>=2.13",
"pydantic-settings>=2.2",
"python-multipart>=0.0.9",
]
[project.optional-dependencies]
dev = [
"pytest>=8.2",
"pytest-asyncio>=0.23",
"httpx>=0.27",
"anyio>=4.4",
]
[tool.ruff]
line-length = 100
target-version = "py312"
[tool.ruff.lint]
select = ["E", "F", "I", "UP", "B", "SIM"]
ignore = []
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
[tool.setuptools.packages.find]
where = ["."]
include = ["app*"]

0
api/tests/__init__.py Normal file
View File

View File

View File

@@ -0,0 +1,59 @@
"""Shared pytest fixtures: database engine, per-test session, and HTTP client."""
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from app.main import app
from app.config import get_settings
from app.database import Base
from app.dependencies import get_db, get_storage, get_auth
@pytest_asyncio.fixture(scope="session")
async def engine():
    """Session-scoped engine: creates the schema up front, drops it at the end.

    NOTE(review): a session-scoped async fixture needs pytest-asyncio's event
    loop scope configured to match — confirm this works under asyncio_mode="auto".
    """
    settings = get_settings()
    # Use a separate test database URL if TEST_DATABASE_URL is set
    import os
    db_url = os.getenv("TEST_DATABASE_URL", settings.database_url)
    eng = create_async_engine(db_url, echo=False)
    async with eng.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield eng
    async with eng.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
    await eng.dispose()
@pytest_asyncio.fixture
async def db_session(engine):
    """Per-test session; rolled back after the test for isolation."""
    session_factory = async_sessionmaker(engine, expire_on_commit=False)
    async with session_factory() as session:
        yield session
        await session.rollback()
@pytest_asyncio.fixture
async def client(db_session):
    """ASGI test client with the app's DB/storage/auth dependencies overridden.

    NOTE(review): storage is the real S3 backend (not a fake), so these tests
    hit MinIO — confirm it is available wherever the suite runs.
    """
    from app.storage.s3_backend import S3StorageBackend
    from app.auth.noop import NoOpAuthProvider
    storage = S3StorageBackend()
    auth = NoOpAuthProvider()
    async def override_db():
        yield db_session
    def override_storage():
        return storage
    def override_auth():
        return auth
    app.dependency_overrides[get_db] = override_db
    app.dependency_overrides[get_storage] = override_storage
    app.dependency_overrides[get_auth] = override_auth
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c:
        yield c
    app.dependency_overrides.clear()

View File

@@ -0,0 +1,60 @@
"""
T065 — DELETE /api/v1/images/{id} → 204; subsequent GET returns 404
T066 — DELETE verifies MinIO object is removed
T067 — DELETE of unknown ID → 404 image_not_found
"""
import io
import uuid
import pytest
def _minimal_jpeg_v2() -> bytes:
# Slightly different from test_upload.py to avoid cross-test dedup
return (
b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x01"
b"\xff\xd9"
)
@pytest.mark.asyncio
async def test_delete_removes_record(client):
data = _minimal_jpeg_v2()
upload = await client.post(
"/api/v1/images",
files={"file": ("del-test.jpg", io.BytesIO(data), "image/jpeg")},
)
image_id = upload.json()["id"]
delete_resp = await client.delete(f"/api/v1/images/{image_id}")
assert delete_resp.status_code == 204
get_resp = await client.get(f"/api/v1/images/{image_id}")
assert get_resp.status_code == 404
assert get_resp.json()["code"] == "image_not_found"
@pytest.mark.asyncio
async def test_delete_removes_storage_object(client):
data = _minimal_jpeg_v2() + b"\x00"
upload = await client.post(
"/api/v1/images",
files={"file": ("del-storage-test.jpg", io.BytesIO(data), "image/jpeg")},
)
assert upload.status_code in (200, 201)
image_id = upload.json()["id"]
storage_key = upload.json()["hash"]
delete_resp = await client.delete(f"/api/v1/images/{image_id}")
assert delete_resp.status_code == 204
# Confirm storage redirect no longer works (404 since record is gone)
file_resp = await client.get(f"/api/v1/images/{image_id}/file")
assert file_resp.status_code == 404
@pytest.mark.asyncio
async def test_delete_unknown_id_returns_404(client):
response = await client.delete(f"/api/v1/images/{uuid.uuid4()}")
assert response.status_code == 404
body = response.json()
assert body["code"] == "image_not_found"

View File

@@ -0,0 +1,8 @@
import pytest
@pytest.mark.asyncio
async def test_health_returns_ok(client):
    """The health endpoint responds 200 with its static ok payload."""
    response = await client.get("/api/v1/health")
    assert response.status_code == 200
    assert response.json() == {"status": "ok"}

View File

@@ -0,0 +1,60 @@
"""
T041 — GET /api/v1/images?tags=cat,funny → only images with both tags
T042 — same query excludes images with only one matching tag
"""
import io
import pytest
def _minimal_gif() -> bytes:
return (
b"GIF89a\x01\x00\x01\x00\x80\x00\x00\xff\xff\xff\x00\x00\x00"
b"!\xf9\x04\x00\x00\x00\x00\x00,\x00\x00\x00\x00\x01\x00\x01"
b"\x00\x00\x02\x02D\x01\x00;"
)
@pytest.mark.asyncio
async def test_and_filter_returns_only_matching_images(client):
    """T041: ?tags=a,b returns only images tagged with BOTH a and b."""
    data = _minimal_gif()
    # Image carrying both tags.
    r_both = await client.post(
        "/api/v1/images",
        files={"file": ("both.gif", io.BytesIO(data), "image/gif")},
        data={"tags": "andcat,andfunny"},
    )
    # Guard both uploads: a failure should not surface as a KeyError below.
    assert r_both.status_code in (200, 201)
    both_id = r_both.json()["id"]
    # Image carrying only one of the two tags; different bytes avoid hash dedup.
    r_one = await client.post(
        "/api/v1/images",
        files={"file": ("one.gif", io.BytesIO(data + b"\x00"), "image/gif")},
        data={"tags": "andcat"},
    )
    assert r_one.status_code in (200, 201)
    response = await client.get("/api/v1/images?tags=andcat,andfunny")
    assert response.status_code == 200
    ids = [item["id"] for item in response.json()["items"]]
    assert both_id in ids
    assert r_one.json()["id"] not in ids
@pytest.mark.asyncio
async def test_filter_excludes_partial_tag_match(client):
    """T042: an image matching only one of the requested tags is excluded."""
    data = _minimal_gif()
    # Image with only "exclcat"; unique bytes avoid hash dedup.
    r_partial = await client.post(
        "/api/v1/images",
        files={"file": ("partial.gif", io.BytesIO(data + b"\x01"), "image/gif")},
        data={"tags": "exclcat"},
    )
    # Guard: a failed upload should fail here, not as a KeyError below.
    assert r_partial.status_code in (200, 201)
    # Filter requires both exclcat and exclother.
    response = await client.get("/api/v1/images?tags=exclcat,exclother")
    assert response.status_code == 200
    ids = [item["id"] for item in response.json()["items"]]
    assert r_partial.json()["id"] not in ids

View File

@@ -0,0 +1,41 @@
"""
T055 — GET /api/v1/images/{id}/file → 302 with Location header
T056 — /file for unknown ID → 404 image_not_found
"""
import io
import uuid
import pytest
def _minimal_webp() -> bytes:
# Minimal VP8L WebP
return (
b"RIFF$\x00\x00\x00WEBPVP8L\x18\x00\x00\x00"
b"/\x00\x00\x00\x00\x18\xf0\x1f\xfe\xff\x02\xfe\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
)
@pytest.mark.asyncio
async def test_file_redirect_returns_302(client):
    """T055: /file responds with a 302 carrying a non-empty Location header."""
    payload = _minimal_webp()
    upload = await client.post(
        "/api/v1/images",
        files={"file": ("img.webp", io.BytesIO(payload), "image/webp")},
    )
    assert upload.status_code in (200, 201)
    image_id = upload.json()["id"]
    # follow_redirects=False lets us inspect the redirect response itself.
    resp = await client.get(
        f"/api/v1/images/{image_id}/file", follow_redirects=False
    )
    assert resp.status_code == 302
    location = resp.headers.get("Location")
    assert location  # header present and non-empty
@pytest.mark.asyncio
async def test_file_unknown_id_returns_404(client):
    """T056: /file for a nonexistent image yields the image_not_found envelope."""
    resp = await client.get(f"/api/v1/images/{uuid.uuid4()}/file")
    assert resp.status_code == 404
    assert resp.json()["code"] == "image_not_found"

View File

@@ -0,0 +1,132 @@
"""
T039 — upload with tags → tags persisted and returned
T040 — duplicate upload → existing record returned, tags unchanged
T057 — PATCH replaces tags, old tags unlinked, new tags upserted
T058 — PATCH with invalid tag → 422 invalid_tag
T073 — GET /api/v1/tags returns all tags alphabetically with correct image_count
T074 — GET /api/v1/tags?q=ca returns only tags prefixed "ca"
"""
import io
import pytest
def _minimal_png() -> bytes:
import struct, zlib
def chunk(name: bytes, data: bytes) -> bytes:
c = name + data
return struct.pack(">I", len(data)) + c + struct.pack(">I", zlib.crc32(c) & 0xFFFFFFFF)
ihdr = struct.pack(">IIBBBBB", 1, 1, 8, 2, 0, 0, 0)
idat_data = zlib.compress(b"\x00\xFF\xFF\xFF")
return (
b"\x89PNG\r\n\x1a\n"
+ chunk(b"IHDR", ihdr)
+ chunk(b"IDAT", idat_data)
+ chunk(b"IEND", b"")
)
@pytest.mark.asyncio
async def test_upload_with_tags_persists_tags(client):
    """T039: tags supplied at upload time are persisted and echoed back."""
    response = await client.post(
        "/api/v1/images",
        files={"file": ("img.png", io.BytesIO(_minimal_png()), "image/png")},
        data={"tags": "cat,funny"},
    )
    assert response.status_code == 201
    returned_tags = set(response.json()["tags"])
    assert returned_tags == {"cat", "funny"}
@pytest.mark.asyncio
async def test_duplicate_upload_tags_unchanged(client):
    """T040: a duplicate upload returns the existing record with its original tags."""
    payload = _minimal_png()
    first = await client.post(
        "/api/v1/images",
        files={"file": ("img.png", io.BytesIO(payload), "image/png")},
        data={"tags": "original-tag"},
    )
    assert first.status_code in (200, 201)
    original_tags = set(first.json()["tags"])
    second = await client.post(
        "/api/v1/images",
        files={"file": ("img.png", io.BytesIO(payload), "image/png")},
        data={"tags": "different-tag"},
    )
    assert second.status_code == 200
    duplicate = second.json()
    assert duplicate["duplicate"] is True
    assert set(duplicate["tags"]) == original_tags
@pytest.mark.asyncio
async def test_patch_replaces_tag_set(client):
    """T057: PATCH /tags replaces the whole tag set; old tags are unlinked."""
    data = _minimal_png()
    r1 = await client.post(
        "/api/v1/images",
        files={"file": ("patch-test.png", io.BytesIO(data), "image/png")},
        data={"tags": "old-tag"},
    )
    # Guard: a failed upload should fail here, not as a KeyError on "id".
    assert r1.status_code in (200, 201)
    image_id = r1.json()["id"]
    patch = await client.patch(
        f"/api/v1/images/{image_id}/tags",
        json={"tags": ["new-tag", "another"]},
    )
    assert patch.status_code == 200
    body = patch.json()
    assert "old-tag" not in body["tags"]
    assert set(body["tags"]) == {"new-tag", "another"}
@pytest.mark.asyncio
async def test_patch_invalid_tag_returns_422(client):
    """T058: PATCH with an invalid tag name yields a 422 invalid_tag envelope."""
    data = _minimal_png()
    r1 = await client.post(
        "/api/v1/images",
        files={"file": ("invalid-tag-test.png", io.BytesIO(data), "image/png")},
    )
    # Guard: a failed upload should fail here, not as a KeyError on "id".
    assert r1.status_code in (200, 201)
    image_id = r1.json()["id"]
    patch = await client.patch(
        f"/api/v1/images/{image_id}/tags",
        json={"tags": ["valid", "INVALID TAG WITH SPACES!"]},
    )
    assert patch.status_code == 422
    body = patch.json()
    assert body["code"] == "invalid_tag"
@pytest.mark.asyncio
async def test_list_tags_alphabetical_with_counts(client):
    """T073: GET /api/v1/tags lists all tags alphabetically with image_count."""
    data = _minimal_png()
    upload = await client.post(
        "/api/v1/images",
        files={"file": ("tag-list-test.png", io.BytesIO(data), "image/png")},
        data={"tags": "zebra,apple"},
    )
    assert upload.status_code in (200, 201)
    response = await client.get("/api/v1/tags")
    assert response.status_code == 200
    items = response.json()["items"]
    names = [item["name"] for item in items]
    assert names == sorted(names)
    for item in items:
        assert "image_count" in item
        assert item["image_count"] >= 0
    if upload.status_code == 201:
        # Fresh (non-duplicate) upload: the submitted tags must appear.
        # Original loop asserted nothing when the listing was empty.
        assert {"apple", "zebra"} <= set(names)
@pytest.mark.asyncio
async def test_list_tags_prefix_filter(client):
    """T074: GET /api/v1/tags?q=ca returns only tags with that prefix."""
    data = _minimal_png()
    upload = await client.post(
        "/api/v1/images",
        files={"file": ("prefix-test.png", io.BytesIO(data), "image/png")},
        data={"tags": "cat,catfish,caterpillar,dog"},
    )
    assert upload.status_code in (200, 201)
    response = await client.get("/api/v1/tags?q=cat")
    assert response.status_code == 200
    names = {item["name"] for item in response.json()["items"]}
    assert all(name.startswith("cat") for name in names)
    assert "dog" not in names
    if upload.status_code == 201:
        # Fresh upload: the prefixed tags must be present; the original loop
        # passed vacuously when the filtered result was empty.
        assert {"cat", "catfish", "caterpillar"} <= names

View File

@@ -0,0 +1,100 @@
"""
T026 — valid JPEG upload → 201, record in DB, object in MinIO
T027 — same image uploaded twice → 200, duplicate: true, no second MinIO object
T028 — invalid MIME type → 422 invalid_mime_type (error envelope with code field)
T029 — file > MAX_UPLOAD_BYTES → 422 file_too_large
T079 — GET /api/v1/images/{id} 404 → error envelope shape
"""
import io
import pytest
def _minimal_jpeg() -> bytes:
# Minimal valid JPEG bytes (SOI + APP0 + EOI)
return (
b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00"
b"\xff\xd9"
)
@pytest.mark.asyncio
async def test_upload_new_image_returns_201(client):
    """T026: a fresh JPEG upload creates a record and echoes it back."""
    response = await client.post(
        "/api/v1/images",
        files={"file": ("test.jpg", io.BytesIO(_minimal_jpeg()), "image/jpeg")},
    )
    assert response.status_code == 201
    record = response.json()
    assert record["duplicate"] is False
    assert record["filename"] == "test.jpg"
    assert record["mime_type"] == "image/jpeg"
    assert "id" in record
    assert "hash" in record
    assert len(record["hash"]) == 64  # sha-256 hex digest length
@pytest.mark.asyncio
async def test_upload_duplicate_returns_200_with_flag(client):
    """T027: re-uploading identical bytes yields 200, duplicate flag, same id."""
    payload = _minimal_jpeg()
    first = await client.post(
        "/api/v1/images",
        files={"file": ("test.jpg", io.BytesIO(payload), "image/jpeg")},
    )
    assert first.status_code in (200, 201)
    # Second upload of the exact same bytes.
    second = await client.post(
        "/api/v1/images",
        files={"file": ("test.jpg", io.BytesIO(payload), "image/jpeg")},
    )
    assert second.status_code == 200
    duplicate = second.json()
    assert duplicate["duplicate"] is True
    assert duplicate["id"] == first.json()["id"]
@pytest.mark.asyncio
async def test_upload_invalid_mime_type_returns_422(client):
    """T028: non-image MIME types are rejected with the error envelope."""
    resp = await client.post(
        "/api/v1/images",
        files={"file": ("doc.pdf", io.BytesIO(b"%PDF-1.4"), "application/pdf")},
    )
    assert resp.status_code == 422
    envelope = resp.json()
    assert envelope["code"] == "invalid_mime_type"
    assert "detail" in envelope
@pytest.mark.asyncio
async def test_upload_oversized_file_returns_422(client, monkeypatch):
    """T029: a file larger than MAX_UPLOAD_BYTES is rejected with file_too_large."""
    import app.config as config_module
    original_settings = config_module.get_settings()
    # Proxy that mirrors the real settings but shrinks max_upload_bytes to 10,
    # so an 11-byte body counts as "oversized" without a 50 MB fixture.
    class SmallSettings:
        def __getattr__(self, name):
            val = getattr(original_settings, name)
            if name == "max_upload_bytes":
                return 10
            return val
    # NOTE(review): this patches app.config.get_settings. If the route module
    # did `from app.config import get_settings` at import time, that binding
    # would not see this patch — confirm against the endpoint's import style.
    monkeypatch.setattr(config_module, "get_settings", lambda: SmallSettings())
    response = await client.post(
        "/api/v1/images",
        files={"file": ("big.jpg", io.BytesIO(b"x" * 11), "image/jpeg")},
    )
    assert response.status_code == 422
    body = response.json()
    assert body["code"] == "file_too_large"
@pytest.mark.asyncio
async def test_get_unknown_image_returns_404_with_envelope(client):
    """T079: GET for an unknown image uses the standard error envelope."""
    import uuid

    random_id = uuid.uuid4()
    response = await client.get(f"/api/v1/images/{random_id}")
    assert response.status_code == 404
    payload = response.json()
    assert payload["code"] == "image_not_found"
    assert "detail" in payload

View File

View File

@@ -0,0 +1,40 @@
import os
import pytest
def test_settings_load_from_env(monkeypatch):
    """Settings reads required variables from the environment; defaults apply."""
    env = {
        "DATABASE_URL": "postgresql+asyncpg://u:p@localhost/db",
        "S3_ENDPOINT_URL": "http://localhost:9000",
        "S3_BUCKET_NAME": "test-bucket",
        "S3_ACCESS_KEY_ID": "key",
        "S3_SECRET_ACCESS_KEY": "secret",
        "S3_REGION": "us-east-1",
        "API_BASE_URL": "http://localhost:8000",
    }
    for key, value in env.items():
        monkeypatch.setenv(key, value)
    # Reload so the module picks up the monkeypatched environment.
    import importlib
    import app.config as config_module
    importlib.reload(config_module)
    settings = config_module.Settings()
    assert settings.database_url == env["DATABASE_URL"]
    assert settings.s3_bucket_name == "test-bucket"
    assert settings.max_upload_bytes == 52428800  # default when unset
def test_settings_max_upload_bytes_override(monkeypatch):
    """MAX_UPLOAD_BYTES from the environment overrides the built-in default."""
    env = {
        "DATABASE_URL": "postgresql+asyncpg://u:p@localhost/db",
        "S3_ENDPOINT_URL": "http://localhost:9000",
        "S3_BUCKET_NAME": "test-bucket",
        "S3_ACCESS_KEY_ID": "key",
        "S3_SECRET_ACCESS_KEY": "secret",
        "S3_REGION": "us-east-1",
        "API_BASE_URL": "http://localhost:8000",
        "MAX_UPLOAD_BYTES": "10485760",
    }
    for key, value in env.items():
        monkeypatch.setenv(key, value)
    # Reload so the module picks up the monkeypatched environment.
    import importlib
    import app.config as config_module
    importlib.reload(config_module)
    assert config_module.Settings().max_upload_bytes == 10485760

View File

@@ -0,0 +1,20 @@
import hashlib
from app.utils import compute_sha256
def test_sha256_known_bytes():
    """compute_sha256 matches hashlib's hex digest for a known payload."""
    payload = b"hello world"
    assert compute_sha256(payload) == hashlib.sha256(payload).hexdigest()
def test_sha256_empty_bytes():
    """The empty input hashes to the well-known empty-string digest."""
    assert compute_sha256(b"") == hashlib.sha256(b"").hexdigest()
def test_sha256_returns_64_char_hex():
    """Digests are 64 lowercase hexadecimal characters."""
    digest = compute_sha256(b"test data")
    assert len(digest) == 64
    assert set(digest) <= set("0123456789abcdef")

View File

@@ -0,0 +1,42 @@
"""
T037 — tag normalisation: uppercase → lowercase, whitespace stripped
T038 — tag validation: rejects names > 64 chars, invalid chars
"""
import pytest
from app.repositories.tag_repo import TagRepository
@pytest.mark.parametrize(
    ("raw", "expected"),
    [
        ("Cat", "cat"),
        (" funny ", "funny"),
        ("REACTION", "reaction"),
        (" MiXeD ", "mixed"),
    ],
)
def test_normalise_lowercases_and_strips(raw, expected):
    """T037: normalise lowercases and trims surrounding whitespace."""
    assert TagRepository.normalise(raw) == expected
def test_validate_accepts_valid_tags():
    """Lowercase letters, digits, hyphen/underscore, and 64-char names pass."""
    valid_names = ["cat", "funny-face", "my_tag", "tag123", "a" * 64]
    for name in valid_names:
        TagRepository.normalise_and_validate(name)  # must not raise
def test_validate_rejects_too_long():
    """T038: 65 characters exceeds the 64-char limit and raises ValueError."""
    too_long = "a" * 65
    with pytest.raises(ValueError):
        TagRepository.normalise_and_validate(too_long)
def test_validate_rejects_invalid_chars():
    """T038: whitespace and punctuation are not allowed in tag names."""
    invalid = "bad tag!"  # contains a space and an exclamation mark
    with pytest.raises(ValueError):
        TagRepository.normalise_and_validate(invalid)
def test_validate_rejects_empty():
    """An empty string is not a valid tag name."""
    with pytest.raises(ValueError):
        TagRepository.normalise_and_validate("")
def test_validate_applies_normalisation_first():
    """Validation runs on the normalised form: "CAT" is accepted as "cat"."""
    assert TagRepository.normalise_and_validate("CAT") == "cat"

View File

@@ -0,0 +1,34 @@
import pytest
from app.validation import validate_mime_type, validate_file_size, MimeTypeError, FileSizeError
# MIME types the upload validator must accept vs. reject (see validate_mime_type).
ACCEPTED_TYPES = ["image/jpeg", "image/png", "image/gif", "image/webp"]
REJECTED_TYPES = ["application/pdf", "video/mp4", "text/plain", "application/octet-stream"]
@pytest.mark.parametrize("mime_type", ACCEPTED_TYPES)
def test_mime_type_accepts_images(mime_type):
    """Each supported image MIME type passes validation without raising."""
    validate_mime_type(mime_type)
@pytest.mark.parametrize("mime_type", REJECTED_TYPES)
def test_mime_type_rejects_non_images(mime_type):
    """Non-image MIME types raise MimeTypeError."""
    with pytest.raises(MimeTypeError):
        validate_mime_type(mime_type)
def test_file_size_accepts_within_limit():
    """A size comfortably below the cap is accepted without raising."""
    validate_file_size(1024, max_bytes=52_428_800)
def test_file_size_accepts_exact_limit():
    """The limit is inclusive: exactly max_bytes is still valid."""
    limit = 52_428_800
    validate_file_size(limit, max_bytes=limit)
def test_file_size_rejects_over_limit():
    """One byte past the cap raises FileSizeError."""
    limit = 52_428_800
    with pytest.raises(FileSizeError):
        validate_file_size(limit + 1, max_bytes=limit)
def test_file_size_rejects_zero():
    """A zero-byte file is invalid and raises FileSizeError."""
    with pytest.raises(FileSizeError):
        validate_file_size(0, max_bytes=52_428_800)