# Streamlit page: YouTube transcript data ingestion (01_Data_Ingestion).
import streamlit as st

# st.set_page_config must be the very first Streamlit call on the page,
# so it is issued right after the streamlit import, before anything else.
st.set_page_config(
    page_title="01_Data_Ingestion",  # numeric prefix keeps sidebar page ordering
    page_icon="📥",
    layout="wide",
)

import logging
import sys

import pandas as pd

from data_processor import DataProcessor
from database import DatabaseHandler
from transcript_extractor import extract_video_id, get_channel_videos, get_transcript
# NOTE(review): process_single_video is redefined later in this file, which
# shadows this import — confirm which implementation is actually intended.
from utils import process_single_video

# Log to stdout only so messages appear in the container/terminal output.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    stream=sys.stdout,
)
logger = logging.getLogger(__name__)
@st.cache_resource
def init_components():
    """Create and return the shared (DatabaseHandler, DataProcessor) pair.

    Decorated with st.cache_resource so a single instance of each is
    constructed once and reused across Streamlit reruns.
    """
    db = DatabaseHandler()
    processor = DataProcessor()
    return db, processor
def process_multiple_videos(db_handler, data_processor, video_ids, embedding_model):
    """Process a batch of video IDs, advancing a progress bar per video.

    NOTE(review): an identical function with the same name is defined again
    near the bottom of this file; since that later definition is the one
    live at runtime, this copy is dead code — confirm and remove one.
    """
    bar = st.progress(0)
    total = len(video_ids)
    done = 0
    for vid in video_ids:
        ok = process_single_video(db_handler, data_processor, vid, embedding_model)
        if ok:
            done += 1
        # Fraction of the batch attempted so far is approximated by successes/total.
        bar.progress(done / total)
    st.success(f"Processed {done} out of {total} videos")
def main():
    """Render the Data Ingestion page.

    Shows already-processed videos (filterable by channel) and a form to
    ingest a new video by URL/ID or a whole channel by URL.
    """
    st.title("Data Ingestion 📥")
    db_handler, data_processor = init_components()

    # Embedding model used when vectorizing transcript chunks.
    embedding_model = st.selectbox(
        "Select embedding model:",
        ["multi-qa-MiniLM-L6-cos-v1", "all-mpnet-base-v2"]
    )

    # Display what has already been ingested.
    st.header("Processed Videos")
    videos = db_handler.get_all_videos()
    if videos:
        video_df = pd.DataFrame(videos, columns=['youtube_id', 'title', 'channel_name', 'upload_date'])
        channels = sorted(video_df['channel_name'].unique())
        selected_channel = st.selectbox("Filter by Channel", ["All"] + channels)
        if selected_channel != "All":
            video_df = video_df[video_df['channel_name'] == selected_channel]
        st.dataframe(video_df)
    else:
        st.info("No videos processed yet. Use the form below to add videos.")

    # Ingestion form: single video (URL or raw ID) or an entire channel.
    st.header("Process New Video")
    with st.form("process_video_form"):
        input_type = st.radio("Select input type:", ["Video URL", "Channel URL", "YouTube ID"])
        input_value = st.text_input("Enter the URL or ID:")
        submit_button = st.form_submit_button("Process")

    if submit_button:
        input_value = input_value.strip()
        if not input_value:
            # Guard against an empty submission instead of passing "" downstream.
            st.error("Please enter a URL or ID.")
            return
        data_processor.set_embedding_model(embedding_model)
        with st.spinner("Processing..."):
            if input_type == "Video URL":
                video_id = extract_video_id(input_value)
                if video_id:
                    process_single_video(db_handler, data_processor, video_id, embedding_model)
                else:
                    # Previously failed silently; surface the parse failure.
                    st.error("Could not extract a video ID from the given URL.")
            elif input_type == "Channel URL":
                channel_videos = get_channel_videos(input_value)
                if channel_videos:
                    video_ids = [video['video_id'] for video in channel_videos]
                    process_multiple_videos(db_handler, data_processor, video_ids, embedding_model)
                else:
                    st.error("Failed to retrieve videos from the channel")
            else:  # YouTube ID entered directly
                process_single_video(db_handler, data_processor, input_value, embedding_model)
def process_single_video(db_handler, data_processor, video_id, embedding_model):
    """Fetch, process, store and index one video's transcript.

    Returns the Elasticsearch index name on success (including the case
    where the video was already indexed), or None on any failure.

    NOTE(review): this definition shadows the process_single_video imported
    from utils at the top of the file — confirm which one is intended.
    """
    try:
        # Skip all work if the video has already been indexed.
        existing_index = db_handler.get_elasticsearch_index_by_youtube_id(video_id)
        if existing_index:
            st.info(f"Video {video_id} already processed. Using existing index.")
            return existing_index

        transcript_data = get_transcript(video_id)
        if not transcript_data:
            st.error("Failed to retrieve transcript.")
            return None

        # Chunk/clean the transcript for indexing.
        processed_data = data_processor.process_transcript(video_id, transcript_data)
        if not processed_data:
            st.error("Failed to process transcript.")
            return None

        # Persist video metadata plus the processed transcript content.
        metadata = transcript_data['metadata']
        video_data = {
            'video_id': video_id,
            'title': metadata.get('title', 'Unknown'),
            'author': metadata.get('author', 'Unknown'),
            'upload_date': metadata.get('upload_date', ''),
            'view_count': metadata.get('view_count', 0),
            'like_count': metadata.get('like_count', 0),
            'comment_count': metadata.get('comment_count', 0),
            'video_duration': metadata.get('duration', ''),
            'transcript_content': processed_data['content'],
        }
        db_handler.add_video(video_data)

        # Elasticsearch index names must be lowercase.
        index_name = f"video_{video_id}_{embedding_model}".lower()
        index_name = data_processor.build_index(index_name)
        if index_name:
            st.success(f"Successfully processed video: {video_data['title']}")
            return index_name
        # Previously fell through silently when index creation failed.
        st.error("Failed to build the search index.")
        return None
    except Exception as e:
        st.error(f"Error processing video: {str(e)}")
        # logger.exception records the traceback, unlike logger.error.
        logger.exception("Error processing video %s: %s", video_id, e)
        return None
def process_multiple_videos(db_handler, data_processor, video_ids, embedding_model):
    """Process each video ID in turn, updating a progress bar per video.

    Shows a final summary of how many videos succeeded.

    NOTE(review): this redefines an identical function declared earlier in
    the file; this later definition is the live one — remove the duplicate.
    """
    if not video_ids:
        # Previously reported a misleading "Processed 0 out of 0 videos" success.
        st.warning("No videos to process.")
        return
    progress_bar = st.progress(0)
    processed = 0
    total = len(video_ids)
    for video_id in video_ids:
        if process_single_video(db_handler, data_processor, video_id, embedding_model):
            processed += 1
        progress_bar.progress(processed / total)
    st.success(f"Processed {processed} out of {total} videos")
# Script entry point. (A stray trailing "|" scrape artifact after main()
# was removed — it was a syntax error.)
if __name__ == "__main__":
    main()