File size: 5,841 Bytes
185fa42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b33456a
185fa42
e1d9193
 
 
 
 
 
185fa42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import streamlit as st

# Page-level configuration. Streamlit requires this to run before any other
# Streamlit command on the page, which is why it precedes the remaining imports.
# NOTE(review): page_title sets the browser-tab title; Streamlit's multipage
# sidebar ordering is driven by the page *filename* prefix, so the "01_" in the
# title likely has no ordering effect — confirm before relying on it.
st.set_page_config(
    layout="wide",
    page_icon="📥",
    page_title="01_Data_Ingestion",
)

import pandas as pd
from transcript_extractor import get_transcript, extract_video_id, get_channel_videos
from database import DatabaseHandler
from data_processor import DataProcessor
from utils import process_single_video
import logging
import sys

# Route root-logger output to stdout (no file handler) at INFO level.
logging.basicConfig(
    stream=sys.stdout,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

@st.cache_resource
def init_components():
    """Build the database handler and data processor used by this page.

    Wrapped in ``st.cache_resource`` so Streamlit reuses the same two
    instances across reruns instead of reconstructing them every time.

    Returns:
        A ``(DatabaseHandler, DataProcessor)`` tuple.
    """
    db_handler = DatabaseHandler()
    data_processor = DataProcessor()
    return db_handler, data_processor

def process_multiple_videos(db_handler, data_processor, video_ids, embedding_model):
    """Ingest several videos one after another while showing a progress bar.

    NOTE(review): this function is defined a second time further down in this
    module; that later definition is the one in effect at runtime. Keep the two
    in sync or delete one of them.

    Each id is handed to ``process_single_video``; a truthy return value counts
    as a success. The bar advances after every attempted video (success or
    failure), so it always reaches 100% when the loop finishes.

    Args:
        db_handler: database handler forwarded to ``process_single_video``.
        data_processor: data processor forwarded to ``process_single_video``.
        video_ids: sized sequence of YouTube video ids (``len()`` is taken).
        embedding_model: embedding model name forwarded to ``process_single_video``.
    """
    progress_bar = st.progress(0)
    processed = 0
    total = len(video_ids)

    for attempted, video_id in enumerate(video_ids, start=1):
        if process_single_video(db_handler, data_processor, video_id, embedding_model):
            processed += 1
        # Advance on every attempt, not only on success; otherwise any failure
        # leaves the bar stuck below 100%.
        progress_bar.progress(attempted / total)

    st.success(f"Processed {processed} out of {total} videos")

def _show_processed_videos(db_handler):
    """Render the table of already-ingested videos with an optional channel filter."""
    st.header("Processed Videos")
    videos = db_handler.get_all_videos()
    if not videos:
        st.info("No videos processed yet. Use the form below to add videos.")
        return

    # Assumes each row from get_all_videos() carries exactly these four fields,
    # in this order — TODO confirm against DatabaseHandler.
    video_df = pd.DataFrame(videos, columns=['youtube_id', 'title', 'channel_name', 'upload_date'])
    # dropna(): sorting a mix of str and None/NaN raises TypeError.
    channels = sorted(video_df['channel_name'].dropna().unique())

    selected_channel = st.selectbox("Filter by Channel", ["All"] + channels)
    if selected_channel != "All":
        video_df = video_df[video_df['channel_name'] == selected_channel]

    st.dataframe(video_df)


def _process_submission(db_handler, data_processor, input_type, input_value, embedding_model):
    """Validate the submitted form value and dispatch it to single/multi-video ingestion."""
    input_value = input_value.strip()
    if not input_value:
        st.warning("Please enter a URL or ID.")
        return

    data_processor.set_embedding_model(embedding_model)

    with st.spinner("Processing..."):
        if input_type == "Video URL":
            video_id = extract_video_id(input_value)
            if video_id:
                process_single_video(db_handler, data_processor, video_id, embedding_model)
            else:
                # Previously an unparseable URL silently did nothing.
                st.error("Could not extract a video ID from the URL")

        elif input_type == "Channel URL":
            channel_videos = get_channel_videos(input_value)
            if channel_videos:
                video_ids = [video['video_id'] for video in channel_videos]
                process_multiple_videos(db_handler, data_processor, video_ids, embedding_model)
            else:
                st.error("Failed to retrieve videos from the channel")

        else:  # YouTube ID
            process_single_video(db_handler, data_processor, input_value, embedding_model)


def main():
    """Render the Data Ingestion page.

    The page lets the user pick an embedding model, shows the videos already
    stored in the database, and provides a form to ingest a new video by URL,
    a whole channel by URL, or a raw YouTube ID.
    """
    st.title("Data Ingestion 📥")

    db_handler, data_processor = init_components()

    # Model selection
    embedding_model = st.selectbox(
        "Select embedding model:",
        ["multi-qa-MiniLM-L6-cos-v1", "all-mpnet-base-v2"]
    )

    _show_processed_videos(db_handler)

    # Process new videos
    st.header("Process New Video")
    with st.form("process_video_form"):
        input_type = st.radio("Select input type:", ["Video URL", "Channel URL", "YouTube ID"])
        input_value = st.text_input("Enter the URL or ID:")
        submit_button = st.form_submit_button("Process")

        if submit_button:
            _process_submission(db_handler, data_processor, input_type, input_value, embedding_model)

def process_single_video(db_handler, data_processor, video_id, embedding_model):
    """Ingest one YouTube video and return the name of its search index.

    NOTE(review): this module-level definition shadows the
    ``process_single_video`` imported from ``utils`` at the top of the file;
    this one is what the page actually calls.

    If the database already records an index for ``video_id``, that index is
    returned as-is. Otherwise the transcript is fetched and processed, the
    video row is stored via ``db_handler.add_video``, and an index named
    ``video_<id>_<model>`` (lower-cased) is built through ``data_processor``.

    Args:
        db_handler: handler exposing ``get_elasticsearch_index_by_youtube_id``
            and ``add_video``.
        data_processor: processor exposing ``process_transcript`` and
            ``build_index``.
        video_id: YouTube video id.
        embedding_model: embedding model name; it is embedded in the index name.

    Returns:
        The index name on success, or ``None`` on any failure. Failures are
        shown to the user with ``st.error``; unexpected exceptions are also
        logged with a traceback.
    """
    try:
        existing_index = db_handler.get_elasticsearch_index_by_youtube_id(video_id)
        if existing_index:
            st.info(f"Video {video_id} already processed. Using existing index.")
            return existing_index

        transcript_data = get_transcript(video_id)
        if not transcript_data:
            st.error("Failed to retrieve transcript.")
            return None

        # Process transcript and create indices
        processed_data = data_processor.process_transcript(video_id, transcript_data)
        if not processed_data:
            st.error("Failed to process transcript.")
            return None

        # Save to database and create index
        metadata = transcript_data['metadata']
        video_data = {
            'video_id': video_id,
            'title': metadata.get('title', 'Unknown'),
            'author': metadata.get('author', 'Unknown'),
            'upload_date': metadata.get('upload_date', ''),
            'view_count': metadata.get('view_count', 0),
            'like_count': metadata.get('like_count', 0),
            'comment_count': metadata.get('comment_count', 0),
            'video_duration': metadata.get('duration', ''),
            'transcript_content': processed_data['content']
        }

        db_handler.add_video(video_data)

        index_name = f"video_{video_id}_{embedding_model}".lower()
        index_name = data_processor.build_index(index_name)

        # Previously a falsy result fell off the end of the function and
        # returned None with no feedback to the user.
        if not index_name:
            st.error("Failed to build index.")
            logger.error("Index build failed for video %s", video_id)
            return None

        st.success(f"Successfully processed video: {video_data['title']}")
        return index_name
    except Exception as e:
        # UI boundary: report to the user, keep the traceback in the log.
        st.error(f"Error processing video: {str(e)}")
        logger.exception("Error processing video %s: %s", video_id, e)
        return None

def process_multiple_videos(db_handler, data_processor, video_ids, embedding_model):
    """Ingest several videos one after another while showing a progress bar.

    NOTE(review): an identical-purpose definition appears earlier in this
    module; this later one overrides it at import time. Consider removing the
    duplicate.

    Each id is handed to ``process_single_video``; a truthy return value counts
    as a success. The bar advances after every attempted video (success or
    failure), so it always reaches 100% when the loop finishes.

    Args:
        db_handler: database handler forwarded to ``process_single_video``.
        data_processor: data processor forwarded to ``process_single_video``.
        video_ids: sized sequence of YouTube video ids (``len()`` is taken).
        embedding_model: embedding model name forwarded to ``process_single_video``.
    """
    progress_bar = st.progress(0)
    processed = 0
    total = len(video_ids)

    for attempted, video_id in enumerate(video_ids, start=1):
        if process_single_video(db_handler, data_processor, video_id, embedding_model):
            processed += 1
        # Advance on every attempt, not only on success; otherwise any failure
        # leaves the bar stuck below 100%.
        progress_bar.progress(attempted / total)

    st.success(f"Processed {processed} out of {total} videos")

# Entry point: render the page when this file is executed as a script.
if __name__ == "__main__":
    main()