jamiya / scripts /migrate.py
jameszokah's picture
Enhance requirements and refactor migration script: add new dependencies for SQLAlchemy and Alembic, improve MongoDB migration logic with collection creation and indexing, and update logging for better traceability.
e2eee76
#!/usr/bin/env python3
"""
MongoDB database migration script.
"""
import os
import sys
import logging
from motor.motor_asyncio import AsyncIOMotorClient
import asyncio
from datetime import datetime
# Add the app directory to the Python path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Import app configuration after setting up path
from app.db import MONGO_URI, DB_NAME, AUDIOBOOKS_COLLECTION, VOICES_COLLECTION, AUDIO_CACHE_COLLECTION
async def run_migrations():
"""Run database migrations."""
try:
logger.info(f"Starting MongoDB migrations for database: {DB_NAME}")
# Connect to MongoDB
client = AsyncIOMotorClient(MONGO_URI)
db = client[DB_NAME]
# Create collections if they don't exist
collections = await db.list_collection_names()
# Audiobooks collection
if AUDIOBOOKS_COLLECTION not in collections:
logger.info(f"Creating collection: {AUDIOBOOKS_COLLECTION}")
await db.create_collection(AUDIOBOOKS_COLLECTION)
# Create indexes
await db[AUDIOBOOKS_COLLECTION].create_index("id", unique=True)
await db[AUDIOBOOKS_COLLECTION].create_index("created_at")
await db[AUDIOBOOKS_COLLECTION].create_index("status")
# Voices collection
if VOICES_COLLECTION not in collections:
logger.info(f"Creating collection: {VOICES_COLLECTION}")
await db.create_collection(VOICES_COLLECTION)
# Create indexes
await db[VOICES_COLLECTION].create_index("id", unique=True)
await db[VOICES_COLLECTION].create_index("name")
await db[VOICES_COLLECTION].create_index("type")
# Audio cache collection
if AUDIO_CACHE_COLLECTION not in collections:
logger.info(f"Creating collection: {AUDIO_CACHE_COLLECTION}")
await db.create_collection(AUDIO_CACHE_COLLECTION)
# Create indexes
await db[AUDIO_CACHE_COLLECTION].create_index("hash", unique=True)
await db[AUDIO_CACHE_COLLECTION].create_index("created_at")
# Add any future migrations here
# Example:
# await migrate_v1_to_v2(db)
logger.info("Database migrations completed successfully")
except Exception as e:
logger.error(f"Error during migrations: {str(e)}")
raise
finally:
# Close the client connection
client.close()
async def migrate_v1_to_v2(db):
"""
Example migration function for future use.
Updates documents from v1 to v2 schema.
"""
try:
# Example: Add a new field to all documents
result = await db[AUDIOBOOKS_COLLECTION].update_many(
{"version": {"$exists": False}},
{
"$set": {
"version": "2.0",
"updated_at": datetime.utcnow()
}
}
)
logger.info(f"Migrated {result.modified_count} documents to v2")
except Exception as e:
logger.error(f"Error in v1 to v2 migration: {str(e)}")
raise
def main():
"""Main entry point for migrations."""
try:
# Run migrations
asyncio.run(run_migrations())
logger.info("Migrations completed successfully")
sys.exit(0)
except Exception as e:
logger.error(f"Migration failed: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()