#!/usr/bin/env python3
"""
MongoDB database migration script.

Ensures that the application's collections exist and that their indexes are
created. Run directly as a script; the process exits with status 0 on
success and 1 if any migration step raises.
"""
import asyncio
import logging
import os
import sys
from datetime import datetime, timezone

from motor.motor_asyncio import AsyncIOMotorClient

# Add the parent of this script's directory to the Python path so that the
# `app` package imported below can be resolved.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Import app configuration after setting up path
from app.db import (  # noqa: E402
    MONGO_URI,
    DB_NAME,
    AUDIOBOOKS_COLLECTION,
    VOICES_COLLECTION,
    AUDIO_CACHE_COLLECTION,
)

# Collections managed by this script, in the order they are processed.
# Each entry is (collection name, ((index key, create_index kwargs), ...)).
_COLLECTION_INDEXES = (
    (
        AUDIOBOOKS_COLLECTION,
        (
            ("id", {"unique": True}),
            ("created_at", {}),
            ("status", {}),
        ),
    ),
    (
        VOICES_COLLECTION,
        (
            ("id", {"unique": True}),
            ("name", {}),
            ("type", {}),
        ),
    ),
    (
        AUDIO_CACHE_COLLECTION,
        (
            ("hash", {"unique": True}),
            ("created_at", {}),
        ),
    ),
)


async def _ensure_collection(db, existing, name, indexes):
    """Create collection *name* if it is missing, then create its indexes.

    Args:
        db: Database handle obtained from the Motor client.
        existing: Collection names already present in the database.
        name: Name of the collection to ensure.
        indexes: Iterable of ``(key, options)`` pairs; ``options`` is passed
            to ``create_index`` as keyword arguments.
    """
    if name not in existing:
        logger.info("Creating collection: %s", name)
        await db.create_collection(name)

    # Indexes are requested on every run, whether or not the collection
    # was just created.
    for key, options in indexes:
        await db[name].create_index(key, **options)


async def run_migrations():
    """Run database migrations.

    Connects to MongoDB, ensures every collection listed in
    ``_COLLECTION_INDEXES`` exists with its indexes, and closes the client
    connection afterwards.

    Raises:
        Exception: Any error raised while connecting or migrating is logged
            and re-raised unchanged.
    """
    # Bound before the try block so the finally clause can tell whether a
    # client was actually created; otherwise a failure in the constructor
    # would surface as an UnboundLocalError instead of the real error.
    client = None
    try:
        logger.info("Starting MongoDB migrations for database: %s", DB_NAME)

        # Connect to MongoDB
        client = AsyncIOMotorClient(MONGO_URI)
        db = client[DB_NAME]

        # Create collections if they don't exist
        existing = await db.list_collection_names()
        for name, indexes in _COLLECTION_INDEXES:
            await _ensure_collection(db, existing, name, indexes)

        # Add any future migrations here
        # Example:
        # await migrate_v1_to_v2(db)

        logger.info("Database migrations completed successfully")

    except Exception as e:
        logger.error("Error during migrations: %s", e)
        raise
    finally:
        # Close the client connection (only if it was opened)
        if client is not None:
            client.close()


async def migrate_v1_to_v2(db):
    """
    Example migration function for future use.

    Sets ``version`` to ``"2.0"`` and ``updated_at`` to the current UTC time
    on every audiobook document that has no ``version`` field.

    Args:
        db: Database handle obtained from the Motor client.

    Raises:
        Exception: Any error raised by the update is logged and re-raised.
    """
    try:
        # Example: Add a new field to all documents
        result = await db[AUDIOBOOKS_COLLECTION].update_many(
            {"version": {"$exists": False}},
            {
                "$set": {
                    "version": "2.0",
                    # Timezone-aware replacement for the deprecated
                    # datetime.utcnow(); still the current UTC instant.
                    "updated_at": datetime.now(timezone.utc),
                }
            },
        )
        logger.info("Migrated %s documents to v2", result.modified_count)
    except Exception as e:
        logger.error("Error in v1 to v2 migration: %s", e)
        raise


def main():
    """Main entry point for migrations.

    Runs the migrations on a fresh event loop and terminates the process:
    exit status 0 on success, 1 if the migrations raised.
    """
    try:
        # Run migrations
        asyncio.run(run_migrations())
        logger.info("Migrations completed successfully")
        # SystemExit is not an Exception subclass, so this is not caught
        # by the handler below.
        sys.exit(0)
    except Exception as e:
        logger.error("Migration failed: %s", e)
        sys.exit(1)


if __name__ == "__main__":
    main()