import os
import google.generativeai as genai
import requests
import streamlit as st
from bs4 import BeautifulSoup
import pandas as pd
import asyncio
import aiohttp
from fpdf import FPDF
# Set the page configuration as the first Streamlit command in the app
st.set_page_config(page_title="AI Web Scraper", page_icon=":robot_face:", layout="wide")

# Configure generative AI
genai.configure(api_key=os.environ["AI_API_KEY"])
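# NOTE: this assumes an AI_API_KEY environment variable is available (e.g. set as a
# Space secret); the genai client is configured here but not used further in this file.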

BASE_URL = "https://transcripts.sl.nsw.gov.au"


async def fetch_html(session, url):
    """Fetch the raw HTML content from a given URL asynchronously."""
    try:
        headers = {'User-Agent': 'Mozilla/5.0'}
        async with session.get(url, headers=headers) as response:
            response.raise_for_status()
            return await response.text()
    except Exception as e:
        print(f"Error fetching {url}: {e}")
        return None
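# Both network failures and non-2xx responses (via raise_for_status) land in the
# except branch above, so callers must handle a None return value.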


async def parse_shortlink(soup):
    """Extract the shortlink (node ID) from the parsed HTML."""
    shortlink_tag = soup.find("link", rel="shortlink")
    shortlink = shortlink_tag["href"].strip() if shortlink_tag and "href" in shortlink_tag.attrs else None
    print(f"Extracted Shortlink: {shortlink}")
    return shortlink
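# Assumption: on Drupal-based sites such as this one, the shortlink typically has the
# form "/node/<id>", which is why its value is treated as a node ID throughout.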


async def extract_page_data(session, soup, limit=10):
    """Extract the page title, URL, and node ID for the given page content."""
    data = []
    # Extract the page title from the Dublin Core metadata
    title_meta = soup.find("meta", attrs={"name": "dcterms.title"})
    page_title = (title_meta.get("content", "").strip() if title_meta else "") or "Unknown Title"
    # Extract links and node IDs from the listing rows
    for row in soup.find_all("div", class_="views-row")[:limit]:
        a_tag = row.find("a")
        if a_tag:
            url = BASE_URL + a_tag.get("href", "").strip()
            node_html = await fetch_html(session, url)
            if node_html:
                node_soup = BeautifulSoup(node_html, "html.parser")
                node_id = await parse_shortlink(node_soup)
                data.append([page_title, url, node_id])
    return pd.DataFrame(data, columns=["Page Title", "URL", "NodeId"])
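# NOTE: the listing is capped by `limit` (default 10 rows per page); traverse_links
# passes its own `limit` straight through, and its default of None removes the cap.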


async def traverse_links(session, url, visited=None, limit=None):
    """Recursively traverse series, document, and pages URLs, extracting shortlinks asynchronously."""
    # Use a fresh set per top-level call; a mutable default argument would be
    # shared across separate scrapes.
    if visited is None:
        visited = set()
    if url in visited:
        print(f"Skipping already visited URL: {url}")
        return pd.DataFrame()
    visited.add(url)
    print(f"Traversing URL: {url}")
    html = await fetch_html(session, url)
    if not html:
        return pd.DataFrame()
    soup = BeautifulSoup(html, "html.parser")
    page_data = await extract_page_data(session, soup, limit)
    recursive_data = pd.DataFrame(columns=["Page Title", "URL", "NodeId"])
    # Recursively traverse deeper levels concurrently
    tasks = []
    for link in soup.find_all("a", href=True):
        href = link["href"].strip()
        if href.startswith(("/series/", "/document/", "/pages/")):
            full_url = BASE_URL + href
            tasks.append(traverse_links(session, full_url, visited, limit))
    results = await asyncio.gather(*tasks)
    for result in results:
        recursive_data = pd.concat([recursive_data, result], ignore_index=True)
    return pd.concat([page_data, recursive_data], ignore_index=True)
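# The recursion above fans out with asyncio.gather, so every matching child link on
# a page is traversed concurrently, while the shared `visited` set keeps the same
# URL from being fetched more than once.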


async def process_url_field(session, table_df, limit=None, field="document"):
    """Process the 'document' or 'page' URLs from the provided table and fetch their node IDs asynchronously."""
    data = []
    tasks = []
    for url in table_df["URL"]:
        tasks.append(process_url_field_helper(session, url, field))
    results = await asyncio.gather(*tasks)
    for result in results:
        data.extend(result)
    return pd.DataFrame(data, columns=["URL", "NodeId"]).drop_duplicates(subset=["URL"])


async def process_url_field_helper(session, url, field):
    """Helper to process each URL asynchronously."""
    data = []
    html = await fetch_html(session, url)
    if html:
        node_soup = BeautifulSoup(html, "html.parser")
        node_id = await parse_shortlink(node_soup)
        # Find all hrefs that contain the given field ("document" or "page") and process each
        for link in node_soup.find_all("a", href=True):
            href = link["href"].strip()
            if field in href:
                new_url = BASE_URL + href  # Prepend the base URL to build a full URL
                print(f"Found '{field}' link: {new_url}")
                new_html = await fetch_html(session, new_url)
                if new_html:
                    new_node_soup = BeautifulSoup(new_html, "html.parser")
                    new_node_id = await parse_shortlink(new_node_soup)
                    data.append([new_url, new_node_id])
        data.append([url, node_id])  # Add the original URL and its node ID
    return data


async def combine_data_frames(df1, df2, df3):
    """Combine multiple dataframes and maintain the series -> document -> page hierarchy."""
    combined_data = pd.concat([df1, df2, df3], ignore_index=True)
    combined_data = combined_data.drop_duplicates(subset=["URL"])
    # Fill in columns that only exist in some of the input tables
    combined_data['Page Title'] = combined_data['Page Title'].fillna("Unknown Title")
    combined_data['NodeId'] = combined_data['NodeId'].fillna("Unknown Node ID")
    return combined_data[['Page Title', 'URL', 'NodeId']]
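# Tables 2 and 3 carry only URL/NodeId columns, so their rows surface here with
# "Unknown Title"; the concat order (series, then document, then page rows) is what
# keeps the rough series -> document -> page grouping in the combined table.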


async def main():
    st.title("Custom Scrape")
    url = st.text_input("Enter the URL to scrape:")
    if st.button("Scrape"):
        print("Starting the scraping process...")
        async with aiohttp.ClientSession() as session:
            # Step 1: Scrape the initial data and build Table 1 (series links)
            result_df = await traverse_links(session, url)
            if not result_df.empty:
                # Step 2: Fetch additional data using the 'URL' field from result_df (Table 2 - document links)
                additional_data_df = await process_url_field(session, result_df, field="document")
                if not additional_data_df.empty:
                    # Step 3: Fetch data from the 'page' links in Table 2 and build Table 3
                    table_3_df = await process_url_field(session, additional_data_df, field="page")
                    if not table_3_df.empty:
                        # Combine all the tables (result_df, additional_data_df, and table_3_df)
                        final_combined_df = await combine_data_frames(result_df, additional_data_df, table_3_df)
                        # Only show the final combined dataframe
                        st.subheader("Final Combined Data (Series -> Document -> Page Hierarchy)")
                        st.dataframe(final_combined_df)  # Display the combined data
                    else:
                        st.error("No data found for the 'page' URLs.")
                else:
                    st.error("No data found for the 'document' URLs.")
            else:
                st.error("Failed to fetch data or no shortlinks found.")
        print("Scraping process completed.")


if __name__ == "__main__":
    asyncio.run(main())
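# To run locally (assuming this file is saved as app.py):
#   streamlit run app.py
# Streamlit re-executes the whole script on every interaction, so asyncio.run(main())
# drives the async scraping flow on each rerun.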