import re  # needed for the relative-date regex below (was missing)
import time

import pandas as pd
import plotly.express as px
import streamlit as st
import torch
import transformers
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager

# ChromeType is not exported by webdriver_manager.chrome; in
# webdriver-manager 4.x it lives in core.os_manager (core.utils in 3.x).
from webdriver_manager.core.os_manager import ChromeType

st.subheader("YouTube Comments Sentiment Analysis", divider="red")

tokenizer = transformers.DistilBertTokenizer.from_pretrained("tabularisai/robust-sentiment-analysis")
model = transformers.DistilBertForSequenceClassification.from_pretrained("tabularisai/robust-sentiment-analysis")
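# Note: the sentiment label names used later come from this checkpoint's own
# id2label config; tabularisai/robust-sentiment-analysis appears to be a
# 5-class model (Very Negative .. Very Positive), but nothing below depends
# on that exact set. Wrapping the two from_pretrained() calls in a function
# decorated with @st.cache_resource would avoid reloading the weights on
# every Streamlit rerun.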

if 'url_count' not in st.session_state:
    st.session_state['url_count'] = 0

max_attempts = 2

def update_url_count():
    st.session_state['url_count'] += 1

def clear_question():
    st.session_state["url"] = ""

# on_change wires up update_url_count, which was otherwise never called,
# so the max_attempts limit below could never trigger.
url = st.text_input("Enter YouTube URL:", key="url", on_change=update_url_count)
st.button("Clear question", on_click=clear_question)

if st.button("Sentiment Analysis", type="secondary"):
    if st.session_state['url_count'] < max_attempts:
        if url:
            with st.spinner("Wait for it...", show_time=True):
                # Headless Chromium configured for container-friendly scraping.
                options = Options()
                options.add_argument("--headless")
                options.add_argument("--disable-gpu")
                options.add_argument("--no-sandbox")
                options.add_argument("--disable-dev-shm-usage")
                options.add_argument("--start-maximized")
                service = Service(ChromeDriverManager(chrome_type=ChromeType.CHROMIUM).install())
                driver = webdriver.Chrome(service=service, options=options)
                data = []
                wait = WebDriverWait(driver, 30)
                driver.get(url)
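                # YouTube lazy-loads its comment thread: driver.get() only
                # renders the player and description, so the loop below
                # scrolls repeatedly to force comments to appear.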

                placeholder = st.empty()
                progress_bar = st.progress(0)

                for item in range(30):
                    try:
                        driver.execute_script("window.scrollBy(0, 500);")
                        # Wait until at least one comment body has rendered.
                        wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "#content #content-text")))
                        placeholder.text(f"Scrolled {item + 1} times")
                        progress_bar.progress((item + 1) / 30)
                        time.sleep(1)
                    except Exception as e:
                        st.error(f"Exception during scrolling: {e}")
                        break

                placeholder.text("Scrolling complete.")
                progress_bar.empty()
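                # 30 scrolls of 500 px is an arbitrary budget; raising either
                # number loads more comments at the cost of a longer wait.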

                try:
                    # Wait for the nested comment container before collecting.
                    wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "#contents #contents")))
                    comments = driver.find_elements(By.CSS_SELECTOR, "#content #content-text")
                    for comment in comments:
                        timestamp = None
                        try:
                            comment_text = comment.text
                            # Look for a relative date such as "3 weeks ago" in the text.
                            date_match = re.search(r'\d+ (day|week|month|year)s? ago', comment_text)
                            if date_match:
                                timestamp = date_match.group(0)
                        except Exception as e:
                            st.error(f"Error extracting date with regex: {e}")
                        data.append({"Comment": comment.text, "comment_date": timestamp})
                except Exception as e:
                    st.error(f"Exception during comment extraction: {e}")
                finally:
                    # Always release the browser, even if extraction fails.
                    driver.quit()

                df = pd.DataFrame(data, columns=["Comment", "comment_date"])
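                # Caveat: #content-text holds only the comment body; YouTube
                # usually renders the "N weeks ago" stamp in a separate
                # element, so the regex above will often leave comment_date
                # as None. Reading the timestamp element directly would be
                # more reliable, but its selector shifts with YouTube's DOM.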

                if not df.empty:
                    st.dataframe(df)
                    # Tokenize the whole batch at once; truncation caps each
                    # comment at the model's maximum sequence length.
                    inputs = tokenizer(df['Comment'].tolist(), return_tensors="pt", padding=True, truncation=True)
                    with torch.no_grad():
                        logits = model(**inputs).logits
                    predicted_probabilities = torch.nn.functional.softmax(logits, dim=-1)
                    predicted_labels = predicted_probabilities.argmax(dim=1)
                    results = []
                    for i, label in enumerate(predicted_labels):
                        results.append({'Review Number': i + 1, 'Sentiment': model.config.id2label[label.item()]})
                    sentiment_df = pd.DataFrame(results)

                    value_counts1 = sentiment_df['Sentiment'].value_counts().rename_axis('Sentiment').reset_index(name='count')
                    final_df = value_counts1
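                    # For very large comment sets, a single tokenizer() call
                    # over every comment can exhaust memory; a chunked variant
                    # (sketch, not wired in) would look like:
                    #     for i in range(0, len(texts), 64):
                    #         batch = tokenizer(texts[i:i + 64], return_tensors="pt", padding=True, truncation=True)
                    #         with torch.no_grad():
                    #             logits_list.append(model(**batch).logits)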

                    tab1, tab2 = st.tabs(["Pie Chart", "Bar Chart"])
                    with tab1:
                        fig1 = px.pie(final_df, values='count', names='Sentiment', hover_data=['count'])
                        fig1.update_traces(textposition='inside', textinfo='percent+label')
                        st.plotly_chart(fig1)

                    result = pd.concat([df, sentiment_df], axis=1)
                    st.dataframe(result)

                    with tab2:
                        # A histogram of comment dates split by sentiment; the
                        # original px.bar() call used the non-numeric
                        # comment_date column as the y-axis, which does not
                        # produce a meaningful chart.
                        fig2 = px.histogram(result, x="comment_date", color="Sentiment")
                        st.plotly_chart(fig2)

                    csv = result.to_csv(index=False)
                    st.download_button(label="Download data as CSV", data=csv, file_name='Summary of the results.csv', mime='text/csv')
                else:
                    st.warning("No comments were scraped. Sentiment analysis could not be performed.")

        else:
            st.warning("Please enter a URL.")
    else:
        st.warning(f"You have reached the maximum URL attempts ({max_attempts}).")

# url_count is guaranteed to exist (initialized above), so no guard is needed.
st.write(f"URL pasted {st.session_state['url_count']} times.")