import io import struct import uuid import zlib from typing import Annotated, Any from fastapi import APIRouter, Depends, File, Form, HTTPException, Response, UploadFile from fastapi.responses import RedirectResponse from sqlalchemy.ext.asyncio import AsyncSession from app.auth.provider import AuthProvider from app.config import get_settings from app.dependencies import get_auth, get_db, get_storage from app.models import Image from app.repositories.image_repo import ImageRepository from app.repositories.tag_repo import TagRepository from app.storage.backend import StorageBackend from app.utils import compute_sha256 from app.validation import FileSizeError, MimeTypeError, validate_file_size, validate_mime_type router = APIRouter(tags=["images"]) def _error(detail: str, code: str, status: int): raise HTTPException(status_code=status, detail={"detail": detail, "code": code}) def _image_to_dict(image: Image, *, duplicate: bool | None = None) -> dict[str, Any]: data: dict[str, Any] = { "id": str(image.id), "hash": image.hash, "filename": image.filename, "mime_type": image.mime_type, "size_bytes": image.size_bytes, "width": image.width, "height": image.height, "storage_key": image.storage_key, "created_at": image.created_at.isoformat(), "tags": image.tags, } if duplicate is not None: data["duplicate"] = duplicate return data def _read_image_dimensions(data: bytes, mime_type: str) -> tuple[int, int]: """Return (width, height) from raw image bytes. Falls back to (0, 0).""" try: if mime_type == "image/jpeg": return _jpeg_dimensions(data) elif mime_type == "image/png": return _png_dimensions(data) elif mime_type == "image/gif": return _gif_dimensions(data) elif mime_type == "image/webp": return _webp_dimensions(data) except Exception: pass return 0, 0 def _jpeg_dimensions(data: bytes) -> tuple[int, int]: i = 0 while i < len(data): if data[i] != 0xFF: break i += 1 marker = data[i] i += 1 if marker in (0xD8, 0xD9): continue length = struct.unpack(">H", data[i : i + 2])[0] if marker in (0xC0, 0xC1, 0xC2): h, w = struct.unpack(">HH", data[i + 3 : i + 7]) return w, h i += length return 0, 0 def _png_dimensions(data: bytes) -> tuple[int, int]: w, h = struct.unpack(">II", data[16:24]) return w, h def _gif_dimensions(data: bytes) -> tuple[int, int]: w, h = struct.unpack(" tuple[int, int]: if data[8:12] == b"VP8 ": w = struct.unpack("> 14) & 0x3FFF) + 1 return w, h return 0, 0 @router.post("/images", status_code=201) async def upload_image( file: UploadFile = File(...), tags: str | None = Form(None), db: AsyncSession = Depends(get_db), storage: StorageBackend = Depends(get_storage), auth: AuthProvider = Depends(get_auth), settings=Depends(get_settings), ): data = await file.read() mime_type = file.content_type or "application/octet-stream" try: validate_mime_type(mime_type) except MimeTypeError: raise HTTPException( status_code=422, detail={"detail": f"Unsupported file type: {mime_type}", "code": "invalid_mime_type"}, ) try: validate_file_size(len(data), max_bytes=settings.max_upload_bytes) except FileSizeError as exc: raise HTTPException( status_code=422, detail={"detail": str(exc), "code": "file_too_large"}, ) hash_hex = compute_sha256(data) image_repo = ImageRepository(db) existing = await image_repo.get_by_hash(hash_hex) if existing: return Response( content=__import__("json").dumps(_image_to_dict(existing, duplicate=True)), status_code=200, media_type="application/json", ) # Parse tag names tag_names: list[str] = [] if tags: tag_repo = TagRepository(db) raw = [t.strip() for t in tags.replace(",", " ").split() if t.strip()] try: tag_names = [tag_repo.normalise_and_validate(t) for t in raw] except ValueError as exc: raise HTTPException( status_code=422, detail={"detail": str(exc), "code": "invalid_tag"}, ) width, height = _read_image_dimensions(data, mime_type) await storage.put(hash_hex, data, mime_type) image = await image_repo.create( hash_hex=hash_hex, filename=file.filename or "upload", mime_type=mime_type, size_bytes=len(data), width=width, height=height, storage_key=hash_hex, ) if tag_names: tag_repo = TagRepository(db) await tag_repo.attach_tags(image, tag_names) image = await image_repo.reload_with_tags(image.id) return _image_to_dict(image, duplicate=False) @router.get("/images") async def list_images( tags: str | None = None, limit: int = 50, offset: int = 0, db: AsyncSession = Depends(get_db), ): limit = min(limit, 100) tag_names = [t.strip() for t in tags.split(",") if t.strip()] if tags else None image_repo = ImageRepository(db) images, total = await image_repo.list_images(tag_names=tag_names, limit=limit, offset=offset) return { "items": [_image_to_dict(img) for img in images], "total": total, "limit": limit, "offset": offset, } @router.get("/images/{image_id}") async def get_image( image_id: uuid.UUID, db: AsyncSession = Depends(get_db), ): image_repo = ImageRepository(db) image = await image_repo.get_by_id(image_id) if not image: raise HTTPException( status_code=404, detail={"detail": "Image not found", "code": "image_not_found"}, ) return _image_to_dict(image) @router.get("/images/{image_id}/file") async def serve_image_file( image_id: uuid.UUID, db: AsyncSession = Depends(get_db), storage: StorageBackend = Depends(get_storage), ): image_repo = ImageRepository(db) image = await image_repo.get_by_id(image_id) if not image: raise HTTPException( status_code=404, detail={"detail": "Image not found", "code": "image_not_found"}, ) url = await storage.get_presigned_url(image.storage_key, expires_in_seconds=3600) return RedirectResponse(url=url, status_code=302) @router.patch("/images/{image_id}/tags") async def update_image_tags( image_id: uuid.UUID, body: dict, db: AsyncSession = Depends(get_db), ): image_repo = ImageRepository(db) image = await image_repo.get_by_id(image_id) if not image: raise HTTPException( status_code=404, detail={"detail": "Image not found", "code": "image_not_found"}, ) raw_tags: list[str] = body.get("tags", []) tag_repo = TagRepository(db) try: tag_names = [tag_repo.normalise_and_validate(t) for t in raw_tags] except ValueError as exc: raise HTTPException( status_code=422, detail={"detail": str(exc), "code": "invalid_tag"}, ) await tag_repo.replace_tags_on_image(image, tag_names) image = await image_repo.reload_with_tags(image.id) return _image_to_dict(image) @router.delete("/images/{image_id}", status_code=204) async def delete_image( image_id: uuid.UUID, db: AsyncSession = Depends(get_db), storage: StorageBackend = Depends(get_storage), ): image_repo = ImageRepository(db) image = await image_repo.get_by_id(image_id) if not image: raise HTTPException( status_code=404, detail={"detail": "Image not found", "code": "image_not_found"}, ) storage_key = image.storage_key await image_repo.delete(image) await storage.delete(storage_key) return Response(status_code=204)