Short IDs become the canonical identifier in URLs (/i/:short_id), MinIO/R2 storage keys, and all API responses. Hash-based deduplication is preserved. Includes two-phase Alembic migration (003 adds nullable column, 004 enforces NOT NULL) with a backfill script to copy storage objects and populate short_id for existing images. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
108 lines
3.3 KiB
Python
108 lines
3.3 KiB
Python
"""
|
|
Migrate existing images to use short_id-based storage keys.
|
|
|
|
Run after applying Alembic migration 003 (adds short_id column).
|
|
Run before applying migration 004 (sets short_id NOT NULL).
|
|
|
|
Usage:
|
|
python -m scripts.migrate_to_short_ids
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from typing import Any
|
|
|
|
from sqlalchemy import select
|
|
|
|
from app.database import get_session_factory
|
|
from app.models import Image
|
|
from app.storage.s3_backend import S3StorageBackend
|
|
from app.utils import generate_short_id
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def migrate_image(image: Any, storage: Any, session: Any) -> bool:
|
|
"""Migrate one image to a short_id-based key. Returns True if migrated, False if skipped."""
|
|
if image.short_id is not None:
|
|
return False
|
|
|
|
new_short_id = generate_short_id()
|
|
old_key = image.storage_key
|
|
old_thumb_key = image.thumbnail_key
|
|
|
|
try:
|
|
data = await storage.get(old_key)
|
|
await storage.put(new_short_id, data, image.mime_type)
|
|
# Verify copy succeeded
|
|
await storage.get(new_short_id)
|
|
except Exception as exc:
|
|
logger.error("Failed to copy storage object for image %s: %s", image.id, exc)
|
|
return False
|
|
|
|
new_thumb_key: str | None = None
|
|
if old_thumb_key:
|
|
try:
|
|
thumb_data = await storage.get(old_thumb_key)
|
|
new_thumb_key = f"{new_short_id}-thumb"
|
|
await storage.put(new_thumb_key, thumb_data, "image/webp")
|
|
await storage.get(new_thumb_key)
|
|
except Exception as exc:
|
|
logger.warning("Failed to copy thumbnail for image %s: %s", image.id, exc)
|
|
new_thumb_key = None
|
|
|
|
try:
|
|
image.short_id = new_short_id
|
|
image.storage_key = new_short_id
|
|
image.thumbnail_key = new_thumb_key
|
|
await session.flush()
|
|
|
|
await storage.delete(old_key)
|
|
if old_thumb_key and new_thumb_key:
|
|
await storage.delete(old_thumb_key)
|
|
except Exception as exc:
|
|
logger.error("Failed to update DB record for image %s: %s", image.id, exc)
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
async def run_migration(images: list, storage: Any, session: Any) -> tuple[int, int, int]:
|
|
"""Process a list of images. Returns (migrated, skipped, failed) counts."""
|
|
migrated = skipped = failed = 0
|
|
for image in images:
|
|
if image.short_id is not None:
|
|
skipped += 1
|
|
continue
|
|
try:
|
|
success = await migrate_image(image, storage, session)
|
|
if success:
|
|
migrated += 1
|
|
else:
|
|
failed += 1
|
|
except Exception as exc:
|
|
logger.error("Unexpected error migrating image %s: %s", image.id, exc)
|
|
failed += 1
|
|
|
|
return migrated, skipped, failed
|
|
|
|
|
|
async def main() -> None:
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
storage = S3StorageBackend()
|
|
|
|
async with get_session_factory()() as session:
|
|
result = await session.execute(select(Image).where(Image.short_id.is_(None)))
|
|
images = list(result.scalars().all())
|
|
logger.info("Found %d images to migrate", len(images))
|
|
|
|
migrated, skipped, failed = await run_migration(images, storage, session)
|
|
await session.commit()
|
|
|
|
print(f"Migrated: {migrated}, Skipped: {skipped}, Failed: {failed}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|