Spaces:
Running
Running
# 1. modules/database/database_init.py
import os | |
import logging | |
from azure.cosmos import CosmosClient | |
from pymongo import MongoClient | |
import certifi | |
# Configure the root logger at DEBUG level and create this module's logger.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Module-level handles for the Cosmos DB SQL API.
# A value of None means the handle has not been bound yet.
cosmos_client = None
user_database = None
user_container = None
application_requests_container = None
user_feedback_container = None
user_sessions_container = None

# Module-level handles for the Cosmos DB MongoDB API.
# A value of None means the handle has not been bound yet.
mongo_client = None
mongo_db = None
def verify_container_partition_key(container, expected_path):
    """Check whether a container's partition key paths include a given path.

    Args:
        container: Object exposing ``read()`` (returning a mapping with a
            ``partitionKey`` -> ``paths`` entry) and an ``id`` attribute.
        expected_path: Partition key path to look for, e.g. ``"/id"``.

    Returns:
        True when ``expected_path`` is among the configured paths; False
        when it is not, or when reading/inspecting the properties raises
        (the error is logged instead of propagated).
    """
    try:
        properties = container.read()
        key_definition = properties['partitionKey']
        configured_paths = key_definition['paths']
        logger.info(f"Container: {container.id}, Partition Key Paths: {configured_paths}")
        # Kept inside the try so a malformed payload also yields False.
        path_matches = expected_path in configured_paths
        return path_matches
    except Exception as exc:
        logger.error(f"Error verificando partition key en {container.id}: {str(exc)}")
        return False
def get_container(container_name):
    """Return the container client registered under ``container_name``.

    If the matching module-level global is still None, the Cosmos DB SQL
    connection is initialized once and the global is read again.

    Args:
        container_name: One of ``"users"``, ``"application_requests"``,
            ``"user_feedback"`` or ``"users_sessions"``.

    Returns:
        The container client, or None when the name is unknown or the
        container is still unbound after the initialization attempt.
    """
    # Container name -> name of the module-level global holding its client.
    # Names are resolved at lookup time; capturing the values up front would
    # hand back the stale pre-initialization None.
    global_names = {
        "users": "user_container",
        "application_requests": "application_requests_container",
        "user_feedback": "user_feedback_container",
        "users_sessions": "user_sessions_container",
    }

    global_name = global_names.get(container_name)
    if global_name is None:
        logger.error(f"Contenedor no válido: {container_name}")
        return None

    container = globals().get(global_name)
    if container is None:
        initialize_cosmos_sql_connection()
        # Re-read the live global: initialization rebinds it.
        container = globals().get(global_name)
    return container
def initialize_cosmos_sql_connection():
    """Initialize and verify the Cosmos DB SQL API connection.

    Reads ``COSMOS_ENDPOINT`` and ``COSMOS_KEY`` from the environment,
    opens the ``user_database`` database, verifies the partition key path
    of every known container and then binds the container clients to the
    module-level globals.

    Returns:
        True when every container was verified and bound. False when a
        credential is missing, a partition key check fails, or any
        exception is raised; errors are logged, never propagated.
    """
    global cosmos_client, user_database
    global user_container, application_requests_container
    global user_feedback_container, user_sessions_container

    try:
        cosmos_endpoint = os.environ.get("COSMOS_ENDPOINT")
        cosmos_key = os.environ.get("COSMOS_KEY")
        if not cosmos_endpoint or not cosmos_key:
            raise ValueError("COSMOS_ENDPOINT and COSMOS_KEY environment variables must be set")

        cosmos_client = CosmosClient(cosmos_endpoint, cosmos_key)
        user_database = cosmos_client.get_database_client("user_database")

        # Container id -> partition key path it is expected to use.
        expected_partition_paths = {
            "users": "/id",
            "application_requests": "/email",
            "user_feedback": "/username",
            "users_sessions": "/username",
        }

        # Verify everything first so the globals are bound all-or-nothing.
        verified = {}
        for container_id, expected_path in expected_partition_paths.items():
            container = user_database.get_container_client(container_id)
            if not verify_container_partition_key(container, expected_path):
                logger.error(f"Verificación de partition key falló para {container_id}")
                return False
            verified[container_id] = container

        # Bind explicitly: the container ids ("users", "users_sessions") do
        # not match the global names (user_container,
        # user_sessions_container), so deriving the global name from the id
        # would create new, unrelated globals.
        user_container = verified["users"]
        application_requests_container = verified["application_requests"]
        user_feedback_container = verified["user_feedback"]
        user_sessions_container = verified["users_sessions"]

        logger.info("Conexión a Cosmos DB SQL API exitosa")
        return True
    except Exception as e:
        logger.error(f"Error al conectar con Cosmos DB SQL API: {str(e)}")
        return False
def initialize_mongodb_connection():
    """Initialize the Cosmos DB MongoDB API connection.

    Reads ``MONGODB_CONNECTION_STRING`` from the environment, creates a
    TLS-enabled client, confirms reachability with a ``ping`` command and
    only then publishes the client and the ``aideatext_db`` database to
    the module-level globals.

    Returns:
        True when the ping succeeded and the globals were bound. False
        when the connection string is missing or connecting fails; the
        error is logged, never propagated, and the globals keep their
        previous values.
    """
    global mongo_client, mongo_db

    client = None
    try:
        cosmos_mongodb_connection_string = os.getenv("MONGODB_CONNECTION_STRING")
        if not cosmos_mongodb_connection_string:
            raise ValueError("MONGODB_CONNECTION_STRING environment variable is not set")

        client = MongoClient(
            cosmos_mongodb_connection_string,
            tls=True,
            tlsCAFile=certifi.where(),
            retryWrites=False,
            serverSelectionTimeoutMS=5000,
            connectTimeoutMS=10000,
            socketTimeoutMS=10000,
        )
        # The ping is what actually exercises the connection.
        client.admin.command('ping')
    except Exception as e:
        logger.error(f"Error al conectar con Cosmos DB MongoDB API: {str(e)}")
        if client is not None:
            # Release the failed client so retries do not accumulate
            # unused client objects.
            client.close()
        return False

    # Publish only a client that answered the ping.
    mongo_client = client
    mongo_db = client['aideatext_db']
    logger.info("Conexión a Cosmos DB MongoDB API exitosa")
    return True
def initialize_database_connections():
    """Initialize every database connection used by the application.

    The Cosmos DB SQL API connection is attempted first; the MongoDB API
    connection is attempted only if that one succeeds.

    Returns:
        The result of the MongoDB initialization when the SQL API
        initialization succeeded, otherwise the (falsy) SQL API result.
    """
    sql_ready = initialize_cosmos_sql_connection()
    if not sql_ready:
        # Same short-circuit as ``a() and b()``: skip MongoDB on failure.
        return sql_ready
    return initialize_mongodb_connection()
def get_mongodb():
    """Return the module-level MongoDB database handle.

    When the handle is still None, one initialization attempt is made
    before returning.

    Returns:
        The current value of ``mongo_db``; it is still None if the
        initialization attempt did not bind it.
    """
    if mongo_db is not None:
        return mongo_db
    initialize_mongodb_connection()
    # Read the global again: initialization may have rebound it.
    return mongo_db