Spaces:
Running
Running

Enhance requirements and refactor migration script: add new dependencies for SQLAlchemy and Alembic, improve MongoDB migration logic with collection creation and indexing, and update logging for better traceability.
e2eee76
#!/usr/bin/env python3
"""
MongoDB database migration script.

Creates the application's MongoDB collections and their indexes. When run
directly as a script, the process exits with status 0 on success and 1 on
failure.
"""
import os
import sys
import logging
from motor.motor_asyncio import AsyncIOMotorClient
import asyncio
from datetime import datetime

# Add the app directory to the Python path.
# This puts the parent of this script's directory first on sys.path so the
# `app` package imported further down can be resolved.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

# Configure logging: INFO level, timestamped records that include the logger
# name and the level.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Import app configuration after setting up path.
# This import must stay below the sys.path.insert() call above.
from app.db import MONGO_URI, DB_NAME, AUDIOBOOKS_COLLECTION, VOICES_COLLECTION, AUDIO_CACHE_COLLECTION
async def _ensure_collection(db, existing_names, name, index_specs):
    """Create collection ``name`` if it is missing, then ensure its indexes.

    Args:
        db: Motor database handle the collection lives in.
        existing_names: Collection names already present in ``db``.
        name: Name of the collection to create and index.
        index_specs: Iterable of ``(field, options)`` pairs; ``options`` is
            forwarded to ``create_index`` as keyword arguments.
    """
    if name not in existing_names:
        logger.info(f"Creating collection: {name}")
        await db.create_collection(name)

    # Indexes are ensured whether or not the collection was just created:
    # create_index is a no-op when an identical index already exists, and a
    # collection that was auto-created by an earlier insert would otherwise
    # never receive its indexes.
    for field, options in index_specs:
        await db[name].create_index(field, **options)


async def run_migrations():
    """Run database migrations.

    Connects to MongoDB, makes sure the audiobooks, voices and audio-cache
    collections exist, and ensures the indexes each of them needs.

    Raises:
        Exception: any error raised while connecting or migrating is logged
            and re-raised unchanged.
    """
    # Bound before the ``try`` so the ``finally`` clause can tell whether a
    # client was actually created; otherwise a failing constructor would
    # surface as an UnboundLocalError that hides the real error.
    client = None
    try:
        logger.info(f"Starting MongoDB migrations for database: {DB_NAME}")

        # Connect to MongoDB
        client = AsyncIOMotorClient(MONGO_URI)
        db = client[DB_NAME]

        # Snapshot of the collections that already exist
        collections = await db.list_collection_names()

        # Audiobooks collection
        await _ensure_collection(
            db,
            collections,
            AUDIOBOOKS_COLLECTION,
            (
                ("id", {"unique": True}),
                ("created_at", {}),
                ("status", {}),
            ),
        )

        # Voices collection
        await _ensure_collection(
            db,
            collections,
            VOICES_COLLECTION,
            (
                ("id", {"unique": True}),
                ("name", {}),
                ("type", {}),
            ),
        )

        # Audio cache collection
        await _ensure_collection(
            db,
            collections,
            AUDIO_CACHE_COLLECTION,
            (
                ("hash", {"unique": True}),
                ("created_at", {}),
            ),
        )

        # Add any future migrations here
        # Example:
        # await migrate_v1_to_v2(db)

        logger.info("Database migrations completed successfully")
    except Exception as e:
        logger.error(f"Error during migrations: {str(e)}")
        raise
    finally:
        # Close the client connection (only if one was opened)
        if client is not None:
            client.close()
async def migrate_v1_to_v2(db):
    """
    Example migration function for future use.

    Updates documents from v1 to v2 schema: every audiobook document that has
    no ``version`` field gets ``version`` set to ``"2.0"`` and ``updated_at``
    set to the current UTC time.

    Args:
        db: Motor database handle to migrate.

    Raises:
        Exception: any error from the update is logged and re-raised.
    """
    # Function-scope import: the module header only imports ``datetime``.
    from datetime import timezone

    try:
        # Example: Add a new field to all documents
        result = await db[AUDIOBOOKS_COLLECTION].update_many(
            {"version": {"$exists": False}},
            {
                "$set": {
                    "version": "2.0",
                    # Timezone-aware replacement for datetime.utcnow(), which
                    # is deprecated since Python 3.12. The stored instant is
                    # the same: PyMongo converts aware datetimes to UTC
                    # before encoding them.
                    "updated_at": datetime.now(timezone.utc),
                }
            },
        )
        logger.info(f"Migrated {result.modified_count} documents to v2")
    except Exception as e:
        logger.error(f"Error in v1 to v2 migration: {str(e)}")
        raise
def main():
    """Script entry point: run the migrations and exit with a status code.

    Exits the process with status 0 when the migrations finish and with
    status 1 when they raise; the failure is logged before exiting.
    """
    exit_status = 0
    try:
        # Drive the async migration routine to completion
        asyncio.run(run_migrations())
        logger.info("Migrations completed successfully")
    except Exception as err:
        logger.error(f"Migration failed: {str(err)}")
        exit_status = 1

    # Single exit point; SystemExit was never caught by the handler above,
    # so raising it here is equivalent to raising it inside the try block.
    sys.exit(exit_status)
# Run the migrations only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()