Spaces:
Running
Running
import sqlite3 | |
import os | |
import logging | |
from datetime import datetime | |
import sys | |
# Configure logging for stdout only | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
stream=sys.stdout | |
) | |
logger = logging.getLogger(__name__) | |
class DatabaseHandler: | |
def __init__(self, db_path='data/sqlite.db'): | |
self.db_path = db_path | |
self.conn = None | |
os.makedirs(os.path.dirname(db_path), exist_ok=True) | |
self.create_tables() | |
self.update_schema() | |
self.migrate_database() | |
def create_tables(self): | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.cursor() | |
# First, drop the existing user_feedback table if it exists | |
cursor.execute('DROP TABLE IF EXISTS user_feedback') | |
# Recreate the user_feedback table with the correct schema | |
cursor.execute(''' | |
CREATE TABLE IF NOT EXISTS user_feedback ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
video_id TEXT, | |
query TEXT, | |
response TEXT, | |
feedback INTEGER CHECK (feedback IN (-1, 1)), | |
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
chat_id INTEGER, | |
FOREIGN KEY (video_id) REFERENCES videos (youtube_id), | |
FOREIGN KEY (chat_id) REFERENCES chat_history (id) | |
) | |
''') | |
# Videos table | |
cursor.execute(''' | |
CREATE TABLE IF NOT EXISTS videos ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
youtube_id TEXT UNIQUE, | |
title TEXT, | |
channel_name TEXT, | |
processed_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
upload_date TEXT, | |
view_count INTEGER, | |
like_count INTEGER, | |
comment_count INTEGER, | |
video_duration TEXT, | |
transcript_content TEXT | |
) | |
''') | |
# Chat History table | |
cursor.execute(''' | |
CREATE TABLE IF NOT EXISTS chat_history ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
video_id TEXT, | |
user_message TEXT, | |
assistant_message TEXT, | |
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
FOREIGN KEY (video_id) REFERENCES videos (youtube_id) | |
) | |
''') | |
# User Feedback table | |
cursor.execute(''' | |
CREATE TABLE IF NOT EXISTS user_feedback ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
video_id TEXT, | |
chat_id INTEGER, | |
query TEXT, | |
response TEXT, | |
feedback INTEGER CHECK (feedback IN (-1, 1)), | |
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
FOREIGN KEY (video_id) REFERENCES videos (youtube_id), | |
FOREIGN KEY (chat_id) REFERENCES chat_history (id) | |
) | |
''') | |
# Embedding Models table | |
cursor.execute(''' | |
CREATE TABLE IF NOT EXISTS embedding_models ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
model_name TEXT UNIQUE, | |
description TEXT | |
) | |
''') | |
# Elasticsearch Indices table | |
cursor.execute(''' | |
CREATE TABLE IF NOT EXISTS elasticsearch_indices ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
video_id INTEGER, | |
index_name TEXT, | |
embedding_model_id INTEGER, | |
FOREIGN KEY (video_id) REFERENCES videos (id), | |
FOREIGN KEY (embedding_model_id) REFERENCES embedding_models (id) | |
) | |
''') | |
# Ground Truth table | |
cursor.execute(''' | |
CREATE TABLE IF NOT EXISTS ground_truth ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
video_id TEXT, | |
question TEXT, | |
generation_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
UNIQUE(video_id, question), | |
FOREIGN KEY (video_id) REFERENCES videos (youtube_id) | |
) | |
''') | |
# Search Performance table | |
cursor.execute(''' | |
CREATE TABLE IF NOT EXISTS search_performance ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
video_id TEXT, | |
hit_rate REAL, | |
mrr REAL, | |
evaluation_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
FOREIGN KEY (video_id) REFERENCES videos (youtube_id) | |
) | |
''') | |
# Search Parameters table | |
cursor.execute(''' | |
CREATE TABLE IF NOT EXISTS search_parameters ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
video_id TEXT, | |
parameter_name TEXT, | |
parameter_value REAL, | |
score REAL, | |
evaluation_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
FOREIGN KEY (video_id) REFERENCES videos (youtube_id) | |
) | |
''') | |
# RAG Evaluations table | |
cursor.execute(''' | |
CREATE TABLE IF NOT EXISTS rag_evaluations ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
video_id TEXT, | |
question TEXT, | |
answer TEXT, | |
relevance TEXT, | |
explanation TEXT, | |
evaluation_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
FOREIGN KEY (video_id) REFERENCES videos (youtube_id) | |
) | |
''') | |
conn.commit() | |
def update_schema(self): | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.cursor() | |
# Check and update videos table | |
cursor.execute("PRAGMA table_info(videos)") | |
columns = [column[1] for column in cursor.fetchall()] | |
new_columns = [ | |
("upload_date", "TEXT"), | |
("view_count", "INTEGER"), | |
("like_count", "INTEGER"), | |
("comment_count", "INTEGER"), | |
("video_duration", "TEXT"), | |
("transcript_content", "TEXT") | |
] | |
for col_name, col_type in new_columns: | |
if col_name not in columns: | |
cursor.execute(f"ALTER TABLE videos ADD COLUMN {col_name} {col_type}") | |
conn.commit() | |
# Video Management Methods | |
def add_video(self, video_data): | |
try: | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.cursor() | |
cursor.execute(''' | |
INSERT OR REPLACE INTO videos | |
(youtube_id, title, channel_name, upload_date, view_count, like_count, | |
comment_count, video_duration, transcript_content) | |
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) | |
''', ( | |
video_data['video_id'], | |
video_data['title'], | |
video_data['author'], | |
video_data['upload_date'], | |
video_data['view_count'], | |
video_data['like_count'], | |
video_data['comment_count'], | |
video_data['video_duration'], | |
video_data['transcript_content'] | |
)) | |
conn.commit() | |
return cursor.lastrowid | |
except Exception as e: | |
logger.error(f"Error adding video: {str(e)}") | |
raise | |
def get_video_by_youtube_id(self, youtube_id): | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.cursor() | |
cursor.execute('SELECT * FROM videos WHERE youtube_id = ?', (youtube_id,)) | |
return cursor.fetchone() | |
def get_all_videos(self): | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.cursor() | |
cursor.execute(''' | |
SELECT youtube_id, title, channel_name, upload_date | |
FROM videos | |
ORDER BY upload_date DESC | |
''') | |
return cursor.fetchall() | |
# Chat and Feedback Methods | |
def add_chat_message(self, video_id, user_message, assistant_message): | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.cursor() | |
cursor.execute(''' | |
INSERT INTO chat_history (video_id, user_message, assistant_message) | |
VALUES (?, ?, ?) | |
''', (video_id, user_message, assistant_message)) | |
conn.commit() | |
return cursor.lastrowid | |
def get_chat_history(self, video_id): | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.cursor() | |
cursor.execute(''' | |
SELECT id, user_message, assistant_message, timestamp | |
FROM chat_history | |
WHERE video_id = ? | |
ORDER BY timestamp ASC | |
''', (video_id,)) | |
return cursor.fetchall() | |
def add_user_feedback(self, video_id, chat_id, query, response, feedback): | |
try: | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.cursor() | |
# First verify the video exists | |
cursor.execute('SELECT id FROM videos WHERE youtube_id = ?', (video_id,)) | |
if not cursor.fetchone(): | |
logger.error(f"Video {video_id} not found in database") | |
raise ValueError(f"Video {video_id} not found") | |
# Then verify the chat message exists if chat_id is provided | |
if chat_id: | |
cursor.execute('SELECT id FROM chat_history WHERE id = ?', (chat_id,)) | |
if not cursor.fetchone(): | |
logger.error(f"Chat message {chat_id} not found in database") | |
raise ValueError(f"Chat message {chat_id} not found") | |
# Insert the feedback | |
cursor.execute(''' | |
INSERT INTO user_feedback | |
(video_id, chat_id, query, response, feedback) | |
VALUES (?, ?, ?, ?, ?) | |
''', (video_id, chat_id, query, response, feedback)) | |
conn.commit() | |
logger.info(f"Added feedback for video {video_id}, chat {chat_id}") | |
return cursor.lastrowid | |
except sqlite3.Error as e: | |
logger.error(f"Database error: {str(e)}") | |
raise | |
except Exception as e: | |
logger.error(f"Error adding feedback: {str(e)}") | |
raise | |
def get_user_feedback_stats(self, video_id): | |
try: | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.cursor() | |
cursor.execute(''' | |
SELECT | |
COUNT(CASE WHEN feedback = 1 THEN 1 END) as positive_feedback, | |
COUNT(CASE WHEN feedback = -1 THEN 1 END) as negative_feedback | |
FROM user_feedback | |
WHERE video_id = ? | |
''', (video_id,)) | |
return cursor.fetchone() or (0, 0) # Return (0, 0) if no feedback exists | |
except sqlite3.Error as e: | |
logger.error(f"Database error getting feedback stats: {str(e)}") | |
return (0, 0) | |
# Embedding and Index Methods | |
def add_embedding_model(self, model_name, description): | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.cursor() | |
cursor.execute(''' | |
INSERT OR IGNORE INTO embedding_models (model_name, description) | |
VALUES (?, ?) | |
''', (model_name, description)) | |
conn.commit() | |
return cursor.lastrowid | |
def add_elasticsearch_index(self, video_id, index_name, embedding_model_id): | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.cursor() | |
cursor.execute(''' | |
INSERT INTO elasticsearch_indices (video_id, index_name, embedding_model_id) | |
VALUES (?, ?, ?) | |
''', (video_id, index_name, embedding_model_id)) | |
conn.commit() | |
def get_elasticsearch_index(self, video_id, embedding_model): | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.cursor() | |
cursor.execute(''' | |
SELECT ei.index_name | |
FROM elasticsearch_indices ei | |
JOIN embedding_models em ON ei.embedding_model_id = em.id | |
JOIN videos v ON ei.video_id = v.id | |
WHERE v.youtube_id = ? AND em.model_name = ? | |
''', (video_id, embedding_model)) | |
result = cursor.fetchone() | |
return result[0] if result else None | |
def get_elasticsearch_index_by_youtube_id(self, youtube_id): | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.cursor() | |
cursor.execute(''' | |
SELECT ei.index_name | |
FROM elasticsearch_indices ei | |
JOIN videos v ON ei.video_id = v.id | |
WHERE v.youtube_id = ? | |
''', (youtube_id,)) | |
result = cursor.fetchone() | |
return result[0] if result else None | |
# Ground Truth Methods | |
def add_ground_truth_questions(self, video_id, questions): | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.cursor() | |
for question in questions: | |
try: | |
cursor.execute(''' | |
INSERT OR IGNORE INTO ground_truth (video_id, question) | |
VALUES (?, ?) | |
''', (video_id, question)) | |
except sqlite3.IntegrityError: | |
continue # Skip duplicate questions | |
conn.commit() | |
def get_ground_truth_by_video(self, video_id): | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.cursor() | |
cursor.execute(''' | |
SELECT gt.*, v.channel_name | |
FROM ground_truth gt | |
JOIN videos v ON gt.video_id = v.youtube_id | |
WHERE gt.video_id = ? | |
ORDER BY gt.generation_date DESC | |
''', (video_id,)) | |
return cursor.fetchall() | |
def get_ground_truth_by_channel(self, channel_name): | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.cursor() | |
cursor.execute(''' | |
SELECT gt.*, v.channel_name | |
FROM ground_truth gt | |
JOIN videos v ON gt.video_id = v.youtube_id | |
WHERE v.channel_name = ? | |
ORDER BY gt.generation_date DESC | |
''', (channel_name,)) | |
return cursor.fetchall() | |
def get_all_ground_truth(self): | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.cursor() | |
cursor.execute(''' | |
SELECT gt.*, v.channel_name | |
FROM ground_truth gt | |
JOIN videos v ON gt.video_id = v.youtube_id | |
ORDER BY gt.generation_date DESC | |
''') | |
return cursor.fetchall() | |
# Evaluation Methods | |
def save_search_performance(self, video_id, hit_rate, mrr): | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.cursor() | |
cursor.execute(''' | |
INSERT INTO search_performance (video_id, hit_rate, mrr) | |
VALUES (?, ?, ?) | |
''', (video_id, hit_rate, mrr)) | |
conn.commit() | |
def save_search_parameters(self, video_id, parameters, score): | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.cursor() | |
for param_name, param_value in parameters.items(): | |
cursor.execute(''' | |
INSERT INTO search_parameters (video_id, parameter_name, parameter_value, score) | |
VALUES (?, ?, ?, ?) | |
''', (video_id, param_name, param_value, score)) | |
conn.commit() | |
def save_rag_evaluation(self, evaluation_data): | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.cursor() | |
cursor.execute(''' | |
INSERT INTO rag_evaluations | |
(video_id, question, answer, relevance, explanation) | |
VALUES (?, ?, ?, ?, ?) | |
''', ( | |
evaluation_data['video_id'], | |
evaluation_data['question'], | |
evaluation_data['answer'], | |
evaluation_data['relevance'], | |
evaluation_data['explanation'] | |
)) | |
conn.commit() | |
def get_latest_evaluation_results(self, video_id=None): | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.cursor() | |
if video_id: | |
cursor.execute(''' | |
SELECT * FROM rag_evaluations | |
WHERE video_id = ? | |
ORDER BY evaluation_date DESC | |
''', (video_id,)) | |
else: | |
cursor.execute(''' | |
SELECT * FROM rag_evaluations | |
ORDER BY evaluation_date DESC | |
''') | |
return cursor.fetchall() | |
def get_latest_search_performance(self, video_id=None): | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.cursor() | |
if video_id: | |
cursor.execute(''' | |
SELECT * FROM search_performance | |
WHERE video_id = ? | |
ORDER BY evaluation_date DESC | |
LIMIT 1 | |
''', (video_id,)) | |
else: | |
cursor.execute(''' | |
SELECT * FROM search_performance | |
ORDER BY evaluation_date DESC | |
''') | |
return cursor.fetchall() | |
def migrate_database(self): | |
try: | |
with sqlite3.connect(self.db_path) as conn: | |
cursor = conn.cursor() | |
# Check if chat_id column exists in user_feedback | |
cursor.execute("PRAGMA table_info(user_feedback)") | |
columns = [column[1] for column in cursor.fetchall()] | |
if 'chat_id' not in columns: | |
logger.info("Migrating user_feedback table") | |
# Create temporary table with new schema | |
cursor.execute(''' | |
CREATE TABLE user_feedback_new ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
video_id TEXT, | |
query TEXT, | |
response TEXT, | |
feedback INTEGER CHECK (feedback IN (-1, 1)), | |
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
chat_id INTEGER, | |
FOREIGN KEY (video_id) REFERENCES videos (youtube_id), | |
FOREIGN KEY (chat_id) REFERENCES chat_history (id) | |
) | |
''') | |
# Copy existing data | |
cursor.execute(''' | |
INSERT INTO user_feedback_new (video_id, query, response, feedback, timestamp) | |
SELECT video_id, query, response, feedback, timestamp | |
FROM user_feedback | |
''') | |
# Drop old table and rename new one | |
cursor.execute('DROP TABLE user_feedback') | |
cursor.execute('ALTER TABLE user_feedback_new RENAME TO user_feedback') | |
logger.info("Migration completed successfully") | |
conn.commit() | |
except Exception as e: | |
logger.error(f"Error during migration: {str(e)}") | |
raise |