Files
reactbin/api/app/repositories/image_repo.py
agatha f953c88984 Feat: Pre-generate WebP thumbnails on upload for faster library load
- Add Pillow dependency and thumbnail.py with generate_thumbnail() — produces
  WebP ≤400px, preserves aspect ratio, never upscales, handles GIF frame 0
- Alembic migration 002 adds nullable thumbnail_key column to images table
- Upload route generates thumbnail via asyncio.to_thread (non-blocking),
  stores at {hash}-thumb; failure is tolerated and upload succeeds with null key
- New GET /api/v1/images/{id}/thumbnail endpoint: serves WebP thumbnail or
  falls back to original for pre-feature images; ETag + immutable cache headers
- Delete route cleans up thumbnail storage object alongside original
- Library grid switches from /file to /thumbnail for all image src bindings
- 59 tests passing (46 existing + 13 new across unit, upload, serving, delete)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-03 17:26:16 +00:00

96 lines
3.1 KiB
Python

import uuid
from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models import Image, ImageTag, Tag
class ImageRepository:
    """Data-access helper for ``Image`` rows, bound to one ``AsyncSession``.

    Methods in this class flush pending changes but never commit; the
    transaction boundary is left to whoever owns the session.
    """

    def __init__(self, session: AsyncSession) -> None:
        """Store the session that every query in this repository runs on."""
        self._session = session

    @staticmethod
    def _select_with_tags():
        """Return ``select(Image)`` that eager-loads ``image_tags`` and each ``tag``."""
        return select(Image).options(
            selectinload(Image.image_tags).selectinload(ImageTag.tag)
        )

    async def get_by_hash(self, hash_hex: str) -> Optional[Image]:
        """Return the image whose ``hash`` equals *hash_hex*, or ``None``.

        The image's tag associations are eager-loaded.
        """
        stmt = self._select_with_tags().where(Image.hash == hash_hex)
        result = await self._session.execute(stmt)
        return result.scalar_one_or_none()

    async def get_by_id(self, image_id: uuid.UUID) -> Optional[Image]:
        """Return the image whose ``id`` equals *image_id*, or ``None``.

        The image's tag associations are eager-loaded.
        """
        stmt = self._select_with_tags().where(Image.id == image_id)
        result = await self._session.execute(stmt)
        return result.scalar_one_or_none()

    async def create(
        self,
        *,
        hash_hex: str,
        filename: str,
        mime_type: str,
        size_bytes: int,
        width: int,
        height: int,
        storage_key: str,
        thumbnail_key: str | None = None,
    ) -> Image:
        """Add a new ``Image`` row to the session, flush it, and return it.

        All arguments are keyword-only and are copied onto the matching
        ``Image`` columns (``hash_hex`` is stored as ``hash``).
        ``thumbnail_key`` may be ``None``.
        """
        image = Image(
            hash=hash_hex,
            filename=filename,
            mime_type=mime_type,
            size_bytes=size_bytes,
            width=width,
            height=height,
            storage_key=storage_key,
            thumbnail_key=thumbnail_key,
        )
        self._session.add(image)
        await self._session.flush()
        # Load the (empty) tag collection now so later attribute access does
        # not trigger a lazy load outside of an awaited call.
        await self._session.refresh(image, ["image_tags"])
        return image

    async def list_images(
        self,
        tag_names: list[str] | None = None,
        limit: int = 50,
        offset: int = 0,
    ) -> tuple[list[Image], int]:
        """Return one page of images plus the total number of matches.

        Args:
            tag_names: When given and non-empty, only images that carry
                *every* listed tag are returned (one IN-subquery per tag).
            limit: Maximum number of images in the returned page.
            offset: Number of matching images to skip before the page.

        Returns:
            ``(images, total)`` where ``images`` is ordered newest first and
            ``total`` counts all matches, ignoring ``limit`` and ``offset``.
        """
        from sqlalchemy import func

        base_query = self._select_with_tags()

        # dict.fromkeys drops repeated names while keeping their order, so a
        # duplicated tag does not add a redundant subquery.
        for tag_name in dict.fromkeys(tag_names or ()):
            tagged_image_ids = (
                select(ImageTag.image_id)
                .join(Tag, ImageTag.tag_id == Tag.id)
                .where(Tag.name == tag_name)
                .scalar_subquery()
            )
            base_query = base_query.where(Image.id.in_(tagged_image_ids))

        count_query = select(func.count()).select_from(base_query.subquery())
        total = (await self._session.execute(count_query)).scalar_one()

        # created_at alone is not unique; the id tiebreaker gives LIMIT/OFFSET
        # a total order so rows cannot repeat or vanish between pages.
        paginated = (
            base_query.order_by(Image.created_at.desc(), Image.id.desc())
            .limit(limit)
            .offset(offset)
        )
        result = await self._session.execute(paginated)
        return list(result.scalars().all()), total

    async def reload_with_tags(self, image_id: uuid.UUID) -> Image:
        """Re-query the image with *image_id* and its tags, refreshing loaded state.

        ``populate_existing=True`` makes the query overwrite attributes of an
        instance that is already present in the session. ``scalar_one()`` is
        used, so exactly one matching row is expected.
        """
        stmt = (
            self._select_with_tags()
            .where(Image.id == image_id)
            .execution_options(populate_existing=True)
        )
        result = await self._session.execute(stmt)
        return result.scalar_one()

    async def delete(self, image: Image) -> None:
        """Mark *image* for deletion and flush the session."""
        await self._session.delete(image)
        await self._session.flush()