import sqlite3
import os
import logging
from contextlib import contextmanager
from datetime import datetime
import sys

# Configure logging for stdout only
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    stream=sys.stdout
)
logger = logging.getLogger(__name__)


class DatabaseHandler:
    """SQLite-backed storage for videos, chats, feedback and evaluation data.

    Constructing an instance creates the database file's parent directory
    (if any), creates missing tables, adds missing ``videos`` columns and
    migrates a legacy ``user_feedback`` table that lacks ``chat_id``.
    Every method opens its own short-lived connection to ``db_path``.
    """

    # Column definitions shared by create_tables() and migrate_database() so
    # a freshly created table and a migrated one always have the same shape.
    _USER_FEEDBACK_COLUMNS = '''
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        video_id TEXT,
        query TEXT,
        response TEXT,
        feedback INTEGER CHECK (feedback IN (-1, 1)),
        timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        chat_id INTEGER,
        FOREIGN KEY (video_id) REFERENCES videos (youtube_id),
        FOREIGN KEY (chat_id) REFERENCES chat_history (id)
    '''

    def __init__(self, db_path='data/sqlite.db'):
        """Prepare the database at *db_path*, creating it when necessary."""
        self.db_path = db_path
        self.conn = None
        # os.path.dirname() is '' for a bare filename and os.makedirs('')
        # raises FileNotFoundError, so only create a directory if one is named.
        db_dir = os.path.dirname(db_path)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)
        self.create_tables()
        self.update_schema()
        self.migrate_database()

    @contextmanager
    def _connect(self):
        """Yield a connection that is committed (or rolled back) and closed.

        ``with sqlite3.connect(...)`` alone only manages the transaction and
        leaves the connection open; this wrapper also closes it.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:
                yield conn
        finally:
            conn.close()

    def create_tables(self):
        """Create every table that does not exist yet.

        Existing tables (and their rows) are left untouched; an outdated
        ``user_feedback`` layout is handled by ``migrate_database``.
        """
        statements = (
            # Videos table
            '''
            CREATE TABLE IF NOT EXISTS videos (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                youtube_id TEXT UNIQUE,
                title TEXT,
                channel_name TEXT,
                processed_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                upload_date TEXT,
                view_count INTEGER,
                like_count INTEGER,
                comment_count INTEGER,
                video_duration TEXT,
                transcript_content TEXT
            )
            ''',
            # Chat History table
            '''
            CREATE TABLE IF NOT EXISTS chat_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                video_id TEXT,
                user_message TEXT,
                assistant_message TEXT,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (video_id) REFERENCES videos (youtube_id)
            )
            ''',
            # User Feedback table
            f'CREATE TABLE IF NOT EXISTS user_feedback ({self._USER_FEEDBACK_COLUMNS})',
            # Embedding Models table
            '''
            CREATE TABLE IF NOT EXISTS embedding_models (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                model_name TEXT UNIQUE,
                description TEXT
            )
            ''',
            # Elasticsearch Indices table
            '''
            CREATE TABLE IF NOT EXISTS elasticsearch_indices (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                video_id INTEGER,
                index_name TEXT,
                embedding_model_id INTEGER,
                FOREIGN KEY (video_id) REFERENCES videos (id),
                FOREIGN KEY (embedding_model_id) REFERENCES embedding_models (id)
            )
            ''',
            # Ground Truth table
            '''
            CREATE TABLE IF NOT EXISTS ground_truth (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                video_id TEXT,
                question TEXT,
                generation_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(video_id, question),
                FOREIGN KEY (video_id) REFERENCES videos (youtube_id)
            )
            ''',
            # Search Performance table
            '''
            CREATE TABLE IF NOT EXISTS search_performance (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                video_id TEXT,
                hit_rate REAL,
                mrr REAL,
                evaluation_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (video_id) REFERENCES videos (youtube_id)
            )
            ''',
            # Search Parameters table
            '''
            CREATE TABLE IF NOT EXISTS search_parameters (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                video_id TEXT,
                parameter_name TEXT,
                parameter_value REAL,
                score REAL,
                evaluation_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (video_id) REFERENCES videos (youtube_id)
            )
            ''',
            # RAG Evaluations table
            '''
            CREATE TABLE IF NOT EXISTS rag_evaluations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                video_id TEXT,
                question TEXT,
                answer TEXT,
                relevance TEXT,
                explanation TEXT,
                evaluation_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (video_id) REFERENCES videos (youtube_id)
            )
            ''',
        )
        with self._connect() as conn:
            cursor = conn.cursor()
            for statement in statements:
                cursor.execute(statement)

    def update_schema(self):
        """Add any ``videos`` columns missing from an older database."""
        expected_columns = [
            ("upload_date", "TEXT"),
            ("view_count", "INTEGER"),
            ("like_count", "INTEGER"),
            ("comment_count", "INTEGER"),
            ("video_duration", "TEXT"),
            ("transcript_content", "TEXT")
        ]
        with self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute("PRAGMA table_info(videos)")
            # Row index 1 of PRAGMA table_info is the column name.
            present = {row[1] for row in cursor.fetchall()}
            for col_name, col_type in expected_columns:
                if col_name not in present:
                    # Names/types come from the hard-coded list above, never
                    # from callers, so interpolating them here is safe.
                    cursor.execute(f"ALTER TABLE videos ADD COLUMN {col_name} {col_type}")

    # Video Management Methods
    def add_video(self, video_data):
        """Insert a video, or refresh the stored row for a known youtube id.

        Args:
            video_data: Mapping with the keys ``video_id``, ``title``,
                ``author``, ``upload_date``, ``view_count``, ``like_count``,
                ``comment_count``, ``video_duration`` and
                ``transcript_content``.

        Returns:
            The ``videos.id`` of the inserted or updated row.

        Raises:
            Exception: Any error (missing key, database failure) is logged
                and re-raised.
        """
        try:
            youtube_id = video_data['video_id']
            details = (
                video_data['title'],
                video_data['author'],
                video_data['upload_date'],
                video_data['view_count'],
                video_data['like_count'],
                video_data['comment_count'],
                video_data['video_duration'],
                video_data['transcript_content']
            )
            with self._connect() as conn:
                cursor = conn.cursor()
                cursor.execute('SELECT id FROM videos WHERE youtube_id = ?', (youtube_id,))
                existing = cursor.fetchone()
                if existing is None:
                    cursor.execute('''
                        INSERT INTO videos
                        (youtube_id, title, channel_name, upload_date, view_count,
                         like_count, comment_count, video_duration, transcript_content)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ''', (youtube_id,) + details)
                    return cursor.lastrowid
                # Update in place rather than INSERT OR REPLACE: REPLACE deletes
                # the row and allocates a new id, which would orphan the
                # elasticsearch_indices rows that reference videos.id.
                cursor.execute('''
                    UPDATE videos
                    SET title = ?, channel_name = ?, upload_date = ?, view_count = ?,
                        like_count = ?, comment_count = ?, video_duration = ?,
                        transcript_content = ?, processed_date = CURRENT_TIMESTAMP
                    WHERE id = ?
                ''', details + (existing[0],))
                return existing[0]
        except Exception as e:
            logger.error(f"Error adding video: {str(e)}")
            raise

    def get_video_by_youtube_id(self, youtube_id):
        """Return the full ``videos`` row for *youtube_id*, or ``None``."""
        with self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute('SELECT * FROM videos WHERE youtube_id = ?', (youtube_id,))
            return cursor.fetchone()

    def get_all_videos(self):
        """Return ``(youtube_id, title, channel_name, upload_date)`` rows,
        ordered by ``upload_date`` descending."""
        with self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT youtube_id, title, channel_name, upload_date
                FROM videos
                ORDER BY upload_date DESC
            ''')
            return cursor.fetchall()

    # Chat and Feedback Methods
    def add_chat_message(self, video_id, user_message, assistant_message):
        """Store one user/assistant exchange and return its ``chat_history.id``."""
        with self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO chat_history (video_id, user_message, assistant_message)
                VALUES (?, ?, ?)
            ''', (video_id, user_message, assistant_message))
            return cursor.lastrowid

    def get_chat_history(self, video_id):
        """Return ``(id, user_message, assistant_message, timestamp)`` rows
        for *video_id*, oldest first."""
        with self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT id, user_message, assistant_message, timestamp
                FROM chat_history
                WHERE video_id = ?
                ORDER BY timestamp ASC
            ''', (video_id,))
            return cursor.fetchall()

    def add_user_feedback(self, video_id, chat_id, query, response, feedback):
        """Record feedback for a response and return the new row id.

        Args:
            video_id: YouTube id of a video already present in ``videos``.
            chat_id: Optional ``chat_history.id``; verified only when truthy.
            query: The user's question.
            response: The assistant's answer.
            feedback: ``1`` or ``-1`` (enforced by a CHECK constraint).

        Raises:
            ValueError: If the video, or a supplied chat message, is unknown.
            sqlite3.Error: On database failure (logged, then re-raised).
        """
        try:
            with self._connect() as conn:
                cursor = conn.cursor()

                # First verify the video exists
                cursor.execute('SELECT id FROM videos WHERE youtube_id = ?', (video_id,))
                if not cursor.fetchone():
                    logger.error(f"Video {video_id} not found in database")
                    raise ValueError(f"Video {video_id} not found")

                # Then verify the chat message exists if chat_id is provided
                if chat_id:
                    cursor.execute('SELECT id FROM chat_history WHERE id = ?', (chat_id,))
                    if not cursor.fetchone():
                        logger.error(f"Chat message {chat_id} not found in database")
                        raise ValueError(f"Chat message {chat_id} not found")

                # Insert the feedback
                cursor.execute('''
                    INSERT INTO user_feedback
                    (video_id, chat_id, query, response, feedback)
                    VALUES (?, ?, ?, ?, ?)
                ''', (video_id, chat_id, query, response, feedback))
                # Commit before logging so the success message is only
                # emitted once the row is durable.
                conn.commit()
                logger.info(f"Added feedback for video {video_id}, chat {chat_id}")
                return cursor.lastrowid
        except sqlite3.Error as e:
            logger.error(f"Database error: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"Error adding feedback: {str(e)}")
            raise

    def get_user_feedback_stats(self, video_id):
        """Return ``(positive_count, negative_count)`` for *video_id*.

        Database errors are logged and reported as ``(0, 0)``.
        """
        try:
            with self._connect() as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    SELECT
                        COUNT(CASE WHEN feedback = 1 THEN 1 END) as positive_feedback,
                        COUNT(CASE WHEN feedback = -1 THEN 1 END) as negative_feedback
                    FROM user_feedback
                    WHERE video_id = ?
                ''', (video_id,))
                return cursor.fetchone() or (0, 0)  # Return (0, 0) if no feedback exists
        except sqlite3.Error as e:
            logger.error(f"Database error getting feedback stats: {str(e)}")
            return (0, 0)

    # Embedding and Index Methods
    def add_embedding_model(self, model_name, description):
        """Register an embedding model and return its ``embedding_models.id``.

        If a model with this name already exists the insert is ignored and
        the id of the existing row is returned.
        """
        with self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT OR IGNORE INTO embedding_models (model_name, description)
                VALUES (?, ?)
            ''', (model_name, description))
            if cursor.rowcount > 0:
                return cursor.lastrowid
            # The insert was ignored, so lastrowid does not identify the
            # model; look up the row that caused the conflict instead.
            cursor.execute('SELECT id FROM embedding_models WHERE model_name = ?', (model_name,))
            existing = cursor.fetchone()
            return existing[0] if existing else cursor.lastrowid

    def add_elasticsearch_index(self, video_id, index_name, embedding_model_id):
        """Link an Elasticsearch index name to a video row and embedding model.

        *video_id* here is the integer ``videos.id`` (the lookups below join
        on it), not the YouTube id.
        """
        with self._connect() as conn:
            conn.execute('''
                INSERT INTO elasticsearch_indices (video_id, index_name, embedding_model_id)
                VALUES (?, ?, ?)
            ''', (video_id, index_name, embedding_model_id))

    def get_elasticsearch_index(self, video_id, embedding_model):
        """Return the index name for a YouTube id and model name, or ``None``."""
        with self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT ei.index_name
                FROM elasticsearch_indices ei
                JOIN embedding_models em ON ei.embedding_model_id = em.id
                JOIN videos v ON ei.video_id = v.id
                WHERE v.youtube_id = ? AND em.model_name = ?
            ''', (video_id, embedding_model))
            row = cursor.fetchone()
            return row[0] if row else None

    def get_elasticsearch_index_by_youtube_id(self, youtube_id):
        """Return an index name recorded for *youtube_id*, or ``None``."""
        with self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT ei.index_name
                FROM elasticsearch_indices ei
                JOIN videos v ON ei.video_id = v.id
                WHERE v.youtube_id = ?
            ''', (youtube_id,))
            row = cursor.fetchone()
            return row[0] if row else None

    # Ground Truth Methods
    def add_ground_truth_questions(self, video_id, questions):
        """Store *questions* for *video_id*, silently skipping duplicates.

        Duplicates are dropped by ``INSERT OR IGNORE`` against the
        ``UNIQUE(video_id, question)`` constraint.
        """
        with self._connect() as conn:
            conn.executemany('''
                INSERT OR IGNORE INTO ground_truth (video_id, question)
                VALUES (?, ?)
            ''', [(video_id, question) for question in questions])

    def get_ground_truth_by_video(self, video_id):
        """Return ground-truth rows (plus channel name) for one video, newest first."""
        with self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT gt.*, v.channel_name
                FROM ground_truth gt
                JOIN videos v ON gt.video_id = v.youtube_id
                WHERE gt.video_id = ?
                ORDER BY gt.generation_date DESC
            ''', (video_id,))
            return cursor.fetchall()

    def get_ground_truth_by_channel(self, channel_name):
        """Return ground-truth rows (plus channel name) for one channel, newest first."""
        with self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT gt.*, v.channel_name
                FROM ground_truth gt
                JOIN videos v ON gt.video_id = v.youtube_id
                WHERE v.channel_name = ?
                ORDER BY gt.generation_date DESC
            ''', (channel_name,))
            return cursor.fetchall()

    def get_all_ground_truth(self):
        """Return every ground-truth row (plus channel name), newest first."""
        with self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT gt.*, v.channel_name
                FROM ground_truth gt
                JOIN videos v ON gt.video_id = v.youtube_id
                ORDER BY gt.generation_date DESC
            ''')
            return cursor.fetchall()

    # Evaluation Methods
    def save_search_performance(self, video_id, hit_rate, mrr):
        """Record a hit-rate / MRR measurement for *video_id*."""
        with self._connect() as conn:
            conn.execute('''
                INSERT INTO search_performance (video_id, hit_rate, mrr)
                VALUES (?, ?, ?)
            ''', (video_id, hit_rate, mrr))

    def save_search_parameters(self, video_id, parameters, score):
        """Record one row per ``name -> value`` entry of *parameters*, all
        sharing the same *score*."""
        with self._connect() as conn:
            conn.executemany('''
                INSERT INTO search_parameters
                (video_id, parameter_name, parameter_value, score)
                VALUES (?, ?, ?, ?)
            ''', [
                (video_id, param_name, param_value, score)
                for param_name, param_value in parameters.items()
            ])

    def save_rag_evaluation(self, evaluation_data):
        """Store one RAG evaluation.

        Args:
            evaluation_data: Mapping with the keys ``video_id``, ``question``,
                ``answer``, ``relevance`` and ``explanation``.
        """
        with self._connect() as conn:
            conn.execute('''
                INSERT INTO rag_evaluations
                (video_id, question, answer, relevance, explanation)
                VALUES (?, ?, ?, ?, ?)
            ''', (
                evaluation_data['video_id'],
                evaluation_data['question'],
                evaluation_data['answer'],
                evaluation_data['relevance'],
                evaluation_data['explanation']
            ))

    def get_latest_evaluation_results(self, video_id=None):
        """Return RAG evaluation rows, newest first.

        Rows are limited to *video_id* when it is truthy; otherwise all rows
        are returned.
        """
        with self._connect() as conn:
            cursor = conn.cursor()
            if video_id:
                cursor.execute('''
                    SELECT * FROM rag_evaluations
                    WHERE video_id = ?
                    ORDER BY evaluation_date DESC
                ''', (video_id,))
            else:
                cursor.execute('''
                    SELECT * FROM rag_evaluations
                    ORDER BY evaluation_date DESC
                ''')
            return cursor.fetchall()

    def get_latest_search_performance(self, video_id=None):
        """Return search-performance rows as a list, newest first.

        With a truthy *video_id* only that video's most recent row is
        returned (still wrapped in a list); otherwise all rows are returned.
        """
        with self._connect() as conn:
            cursor = conn.cursor()
            if video_id:
                cursor.execute('''
                    SELECT * FROM search_performance
                    WHERE video_id = ?
                    ORDER BY evaluation_date DESC
                    LIMIT 1
                ''', (video_id,))
            else:
                cursor.execute('''
                    SELECT * FROM search_performance
                    ORDER BY evaluation_date DESC
                ''')
            return cursor.fetchall()

    def migrate_database(self):
        """Rebuild a legacy ``user_feedback`` table that lacks ``chat_id``.

        Existing rows are copied into the new layout with ``chat_id`` left
        NULL. Does nothing when the column is already present.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            with self._connect() as conn:
                cursor = conn.cursor()

                # Check if chat_id column exists in user_feedback
                cursor.execute("PRAGMA table_info(user_feedback)")
                columns = [column[1] for column in cursor.fetchall()]
                if 'chat_id' in columns:
                    return

                logger.info("Migrating user_feedback table")

                # A previous failed attempt may have left the temporary table
                # behind; remove it so the migration can be retried.
                cursor.execute('DROP TABLE IF EXISTS user_feedback_new')

                # Create temporary table with new schema
                cursor.execute(
                    f'CREATE TABLE user_feedback_new ({self._USER_FEEDBACK_COLUMNS})'
                )

                # Copy existing data
                cursor.execute('''
                    INSERT INTO user_feedback_new
                    (video_id, query, response, feedback, timestamp)
                    SELECT video_id, query, response, feedback, timestamp
                    FROM user_feedback
                ''')

                # Drop old table and rename new one
                cursor.execute('DROP TABLE user_feedback')
                cursor.execute('ALTER TABLE user_feedback_new RENAME TO user_feedback')

                logger.info("Migration completed successfully")
        except Exception as e:
            logger.error(f"Error during migration: {str(e)}")
            raise