""" Migrate existing images to use short_id-based storage keys. Run after applying Alembic migration 003 (adds short_id column). Run before applying migration 004 (sets short_id NOT NULL). Usage: python -m scripts.migrate_to_short_ids """ import asyncio import logging from typing import Any from sqlalchemy import select from app.database import get_session_factory from app.models import Image from app.storage.s3_backend import S3StorageBackend from app.utils import generate_short_id logger = logging.getLogger(__name__) async def migrate_image(image: Any, storage: Any, session: Any) -> bool: """Migrate one image to a short_id-based key. Returns True if migrated, False if skipped.""" if image.short_id is not None: return False new_short_id = generate_short_id() old_key = image.storage_key old_thumb_key = image.thumbnail_key try: data = await storage.get(old_key) await storage.put(new_short_id, data, image.mime_type) # Verify copy succeeded await storage.get(new_short_id) except Exception as exc: logger.error("Failed to copy storage object for image %s: %s", image.id, exc) return False new_thumb_key: str | None = None if old_thumb_key: try: thumb_data = await storage.get(old_thumb_key) new_thumb_key = f"{new_short_id}-thumb" await storage.put(new_thumb_key, thumb_data, "image/webp") await storage.get(new_thumb_key) except Exception as exc: logger.warning("Failed to copy thumbnail for image %s: %s", image.id, exc) new_thumb_key = None try: image.short_id = new_short_id image.storage_key = new_short_id image.thumbnail_key = new_thumb_key await session.flush() await storage.delete(old_key) if old_thumb_key and new_thumb_key: await storage.delete(old_thumb_key) except Exception as exc: logger.error("Failed to update DB record for image %s: %s", image.id, exc) return False return True async def run_migration(images: list, storage: Any, session: Any) -> tuple[int, int, int]: """Process a list of images. Returns (migrated, skipped, failed) counts.""" migrated = skipped = failed = 0 for image in images: if image.short_id is not None: skipped += 1 continue try: success = await migrate_image(image, storage, session) if success: migrated += 1 else: failed += 1 except Exception as exc: logger.error("Unexpected error migrating image %s: %s", image.id, exc) failed += 1 return migrated, skipped, failed async def main() -> None: logging.basicConfig(level=logging.INFO) storage = S3StorageBackend() async with get_session_factory()() as session: result = await session.execute(select(Image).where(Image.short_id.is_(None))) images = list(result.scalars().all()) logger.info("Found %d images to migrate", len(images)) migrated, skipped, failed = await run_migration(images, storage, session) await session.commit() print(f"Migrated: {migrated}, Skipped: {skipped}, Failed: {failed}") if __name__ == "__main__": asyncio.run(main())