# Streamlit app: scrape the comments of a YouTube video with headless Chromium
# (Selenium) and classify each comment's sentiment with a DistilBERT model.

import re
import time

import pandas as pd
import plotly.express as px
import streamlit as st
import torch
import transformers
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.chrome import ChromeType

MODEL_NAME = "tabularisai/robust-sentiment-analysis"
COMMENT_SELECTOR = "#content #content-text"
SCROLL_STEPS = 30          # number of 500px scrolls used to trigger lazy loading
WAIT_TIMEOUT_SECONDS = 30  # explicit-wait timeout for Selenium conditions
INFERENCE_BATCH_SIZE = 32  # comments classified per forward pass

# Matches relative dates such as "3 days ago" or "1 year ago" inside a comment.
# NOTE(review): COMMENT_SELECTOR targets the comment body, so this only fires
# when the body itself contains such a phrase — confirm whether the published
# date should instead be read from a different element.
_RELATIVE_DATE_RE = re.compile(r'\d+ (day|week|month|year)s? ago')

st.subheader("YouTube Comments Sentiment Analysis", divider="red")


@st.cache_resource
def _load_sentiment_model():
    """Load the tokenizer and classifier once per server process.

    Streamlit re-executes this script on every widget interaction; caching
    avoids re-reading the model weights on each rerun.

    Returns:
        A ``(tokenizer, model)`` tuple.
    """
    loaded_tokenizer = transformers.DistilBertTokenizer.from_pretrained(MODEL_NAME)
    loaded_model = transformers.DistilBertForSequenceClassification.from_pretrained(MODEL_NAME)
    return loaded_tokenizer, loaded_model


tokenizer, model = _load_sentiment_model()

if 'url_count' not in st.session_state:
    st.session_state['url_count'] = 0

max_attempts = 2


def update_url_count():
    """Increment the per-session count of analysed URLs."""
    st.session_state['url_count'] += 1


def clear_question():
    """Button callback: reset the URL text input."""
    st.session_state["url"] = ""


def _build_driver():
    """Create a headless Chromium WebDriver configured for container use."""
    options = Options()
    for flag in (
        "--headless",
        "--disable-gpu",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--start-maximized",
    ):
        options.add_argument(flag)
    service = Service(ChromeDriverManager(chrome_type=ChromeType.CHROMIUM).install())
    return webdriver.Chrome(service=service, options=options)


def _scrape_comments(page_url):
    """Open *page_url*, scroll to load comments, and return them as row dicts.

    Progress and any scraping errors are reported through Streamlit widgets.
    Errors raised while opening the page propagate to the caller, but the
    browser is always shut down first.

    Args:
        page_url: Address of the YouTube video page.

    Returns:
        A list of ``{"Comment": str, "comment_date": str | None}`` dicts;
        empty when nothing could be scraped.
    """
    rows = []
    driver = _build_driver()
    try:
        wait = WebDriverWait(driver, WAIT_TIMEOUT_SECONDS)
        driver.get(page_url)

        placeholder = st.empty()
        progress_bar = st.progress(0)
        for step in range(1, SCROLL_STEPS + 1):
            try:
                driver.execute_script("window.scrollBy(0, 500);")
                wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, COMMENT_SELECTOR)))
                placeholder.text(f"Scrolled {step} times")
                progress_bar.progress(step / SCROLL_STEPS)
                time.sleep(1)  # give dynamically loaded comments time to render
            except Exception as e:
                st.error(f"Exception during scrolling: {e}")
                break
        placeholder.text("Scrolling complete.")
        progress_bar.empty()

        try:
            wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "#contents #contents")))
            for element in driver.find_elements(By.CSS_SELECTOR, COMMENT_SELECTOR):
                # Read the text once: every access is a round trip to the browser.
                comment_text = element.text
                date_match = _RELATIVE_DATE_RE.search(comment_text)
                rows.append({
                    "Comment": comment_text,
                    "comment_date": date_match.group(0) if date_match else None,
                })
        except Exception as e:
            # Keep whatever was collected before the failure.
            st.error(f"Exception during comment extraction: {e}")
    finally:
        # Always release the browser process, even if loading the page failed.
        driver.quit()
    return rows


def _classify_sentiments(comments):
    """Return the predicted sentiment label for each string in *comments*.

    Comments are processed in fixed-size batches so memory use does not grow
    with the number of scraped comments.
    """
    labels = []
    for start in range(0, len(comments), INFERENCE_BATCH_SIZE):
        batch = comments[start:start + INFERENCE_BATCH_SIZE]
        inputs = tokenizer(batch, return_tensors="pt", padding=True, truncation=True)
        with torch.no_grad():
            logits = model(**inputs).logits
        # Softmax is monotonic, so the argmax of the logits is the predicted class.
        labels.extend(model.config.id2label[idx] for idx in logits.argmax(dim=-1).tolist())
    return labels


def _render_results(df):
    """Show the scraped comments, sentiment charts, and a CSV download button."""
    st.dataframe(df)

    labels = _classify_sentiments(df['Comment'].tolist())
    sentiment_df = pd.DataFrame({
        'Review Number': range(1, len(labels) + 1),
        'Sentiment': labels,
    })
    final_df = (
        sentiment_df['Sentiment']
        .value_counts()
        .rename_axis('Sentiment')
        .reset_index(name='count')
    )
    result = pd.concat([df, sentiment_df], axis=1)

    tab1, tab2 = st.tabs(["Pie Chart", "Bar Chart"])
    with tab1:
        fig1 = px.pie(final_df, values='count', names='Sentiment',
                      hover_data=['count'], labels={'count': 'count'})
        fig1.update_traces(textposition='inside', textinfo='percent+label')
        st.plotly_chart(fig1)
        st.dataframe(result)
    with tab2:
        # Plot the per-sentiment counts; "comment_date" is rarely populated
        # and is not a numeric quantity, so it cannot serve as a bar height.
        fig2 = px.bar(final_df, x="Sentiment", y="count", color="Sentiment")
        st.plotly_chart(fig2)

    csv = result.to_csv(index=False)
    st.download_button(label="Download data as CSV", data=csv,
                       file_name='Summary of the results.csv', mime='text/csv')


url = st.text_input("Enter YouTube URL:", key="url")
st.button("Clear question", on_click=clear_question)

if st.button("Sentiment Analysis", type="secondary"):
    if st.session_state['url_count'] >= max_attempts:
        st.warning(f"You have reached the maximum URL attempts ({max_attempts}).")
    elif not url:
        st.warning("Please enter a URL.")
    else:
        # Count the attempt up front so the limit holds even if scraping fails.
        update_url_count()
        with st.spinner("Wait for it...", show_time=True):
            df = pd.DataFrame(_scrape_comments(url), columns=["Comment", "comment_date"])
            if not df.empty:
                _render_results(df)
            else:
                st.warning("No comments were scraped. Sentiment analysis could not be performed.")

if 'url_count' in st.session_state:
    st.write(f"URL pasted {st.session_state['url_count']} times.")