import hashlib
from typing import Dict, List, Set, Tuple
from urllib.parse import urljoin, urlparse, urlunparse

import requests
from bs4 import BeautifulSoup, NavigableString, Tag


class ContentCrawler:
    def __init__(
        self, base_url: str, ignore_prefixes: List[str] = None, max_length: int = 8000
    ):
        """
        Initialize the crawler with the base URL, a list of URL path prefixes
        to ignore, and the maximum chunk size.

        Args:
            base_url: The website URL to crawl.
            ignore_prefixes: List of URL path prefixes to ignore.
            max_length: Maximum allowed size (in characters) for a chunk.
        """
        self.base_url = base_url
        self.visited = set()
        self.results = []
        self.max_length = max_length
        self.ignore_prefixes = ignore_prefixes or [
            "manage/",
            "password/",
            "media/",
            "notes-de-mises-a-jour/",
        ]
        # Used to detect and skip duplicate content across pages
        self.content_hashes = set()

    def crawl(self) -> List[Dict[str, str]]:
        """
        Recursively crawl the website starting from the homepage.

        Returns:
            A list of dictionaries with keys 'url' and 'text' (in markdown format).
        """
        try:
            homepage_response = requests.get(self.base_url)
            homepage_response.raise_for_status()
        except Exception as e:
            print(f"Error fetching homepage {self.base_url}: {e}")
            return []

        homepage_soup = BeautifulSoup(homepage_response.text, "html.parser")
        initial_links = self._get_internal_links(homepage_soup)

        # Use a set to avoid duplicate URLs in the queue
        queue = set()
        for link in initial_links:
            full_url = self._normalize_url(urljoin(self.base_url, link))
            if full_url != self.base_url:
                queue.add(full_url)
                self.visited.add(full_url)

        # Convert to a list for processing
        queue_list = list(queue)
        while queue_list:
            current_url = queue_list.pop(0)
            print(f"Processing {current_url}")
            result, new_links = self._parse_page(current_url)
            if result:
                self.results.extend(result)

            # Only enqueue links that have not been visited yet
            for link in new_links:
                full_url = self._normalize_url(urljoin(self.base_url, link))
                if full_url not in self.visited and full_url != self.base_url:
                    self.visited.add(full_url)
                    queue_list.append(full_url)

        return self.results

    def _normalize_url(self, url: str) -> str:
        """Normalize a URL by stripping its fragment and query parameters."""
        parsed = urlparse(url)
        # Drop the fragment and the query string
        normalized = parsed._replace(fragment="", query="")
        return urlunparse(normalized)
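
    # Illustrative example (hypothetical URL, not from the original file):
    # _normalize_url would map "https://example.com/docs/page?lang=fr#section-2"
    # to "https://example.com/docs/page", so the same page is not queued twice
    # under different fragments or query strings.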

    def _get_internal_links(self, soup: BeautifulSoup) -> Set[str]:
        """
        Retrieve internal links from the BeautifulSoup object,
        ignoring those whose path starts with any of the configured prefixes.
        """
        links = set()
        for a_tag in soup.find_all("a", href=True):
            href = a_tag["href"]
            if href.startswith("#") or href.startswith("javascript:"):
                continue

            parsed_href = urlparse(href)
            path = parsed_href.path.lstrip("/")
            if any(path.startswith(prefix) for prefix in self.ignore_prefixes):
                continue

            # Keep only internal links
            is_internal = (
                not parsed_href.netloc
                or self.base_url in href
                or parsed_href.netloc == urlparse(self.base_url).netloc
            )
            if is_internal:
                links.add(href)

        return links

    def _parse_page(self, url: str) -> Tuple[List[Dict[str, str]], Set[str]]:
        """Parse a page and extract both its content and its links."""
        try:
            response = requests.get(url)
            response.raise_for_status()
        except Exception as e:
            print(f"Error fetching {url}: {e}")
            return [], set()

        soup = BeautifulSoup(response.text, "html.parser")

        # Locate the main content div
        content_div = soup.find(id="content")
        if not content_div:
            print(f"No content div found in {url}")
            return [], self._get_internal_links(soup)

        # Clean up the content
        for script in content_div.find_all(["script", "style"]):
            script.decompose()

        # Grab the page title
        h1_tag = content_div.find("h1")
        page_title = h1_tag.get_text(strip=True) if h1_tag else ""

        # Build the full markdown document
        markdown_content = self._extract_structured_content(content_div, page_title)

        # Skip the page if its content is a duplicate
        content_hash = self._hash_content(markdown_content)
        if content_hash in self.content_hashes:
            print(f"Duplicate content skipped for {url}")
            return [], self._get_internal_links(soup)
        self.content_hashes.add(content_hash)

        # Split into chunks if needed
        chunks = self._split_content(markdown_content)

        # Build the result list
        results = []
        for i, chunk in enumerate(chunks):
            results.append({"url": f"{url}#chunk-{i+1}", "text": chunk})

        return results, self._get_internal_links(soup)
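
    # Illustrative shape of the result (hypothetical URL): a long page at
    # https://example.com/guide split into two chunks would yield
    #   [{"url": "https://example.com/guide#chunk-1", "text": "..."},
    #    {"url": "https://example.com/guide#chunk-2", "text": "..."}]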

    def _extract_structured_content(self, content_div: Tag, page_title: str) -> str:
        """Extract the content in a structured way, respecting the heading hierarchy."""
        lines = []

        # Add the main title
        if page_title:
            lines.append(f"# {page_title}")

        # Walk the direct child elements of the content div in document order
        for current_element in content_div.find_all(recursive=False):
            if current_element.name in ["h1", "h2", "h3", "h4", "h5", "h6"]:
                text = current_element.get_text(strip=True)
                # Skip the h1 already emitted as the page title
                if current_element.name == "h1" and text == page_title:
                    continue
                # Map the heading level to the markdown prefix
                level = int(current_element.name[1])
                lines.append(f"{'#' * level} {text}")
            else:
                markdown = self._convert_element_to_markdown(current_element)
                if markdown:
                    lines.append(markdown)

        return "\n\n".join(line for line in lines if line)

    def _convert_element_to_markdown(self, element) -> str:
        """Convert an HTML element to markdown."""
        if isinstance(element, NavigableString):
            text = element.strip()
            return text if text else ""

        if isinstance(element, Tag):
            if element.name in ["script", "style", "iframe"]:
                return ""

            if element.name == "p":
                return element.get_text(strip=True)
            elif element.name == "a" and element.get("href"):
                text = element.get_text(strip=True)
                href = element.get("href")
                return f"[{text}]({href})"
            elif element.name in ["ul", "ol"]:
                items = []
                for li in element.find_all("li", recursive=False):
                    text = li.get_text(strip=True)
                    if text:
                        items.append(f"* {text}")
                return "\n".join(items)
            elif element.name == "table":
                # Basic table extraction
                rows = []
                for tr in element.find_all("tr"):
                    cols = []
                    for td in tr.find_all(["td", "th"]):
                        cols.append(td.get_text(strip=True))
                    rows.append(" | ".join(cols))
                if rows:
                    # Insert the markdown separator row after the header,
                    # with one "---" per column of the header row
                    if len(rows) > 1:
                        num_cols = rows[0].count(" | ") + 1
                        rows.insert(1, " | ".join(["---"] * num_cols))
                    return "\n".join(rows)
                return ""
            elif element.name in ["div", "section", "article"]:
                parts = []
                for child in element.children:
                    part = self._convert_element_to_markdown(child)
                    if part:
                        parts.append(part)
                return "\n\n".join(parts)
            else:
                text = element.get_text(strip=True)
                return text if text else ""

        return ""

    def _split_content(self, content: str) -> List[str]:
        """Split the content into chunks no larger than max_length."""
        if len(content) <= self.max_length:
            return [content]

        # Keep the main title so it can be repeated in every chunk
        lines = content.split("\n\n")
        main_title = lines[0] if lines and lines[0].startswith("# ") else ""

        chunks = []
        current_chunk = main_title
        current_length = len(main_title)

        for line in lines:
            # Skip the main title, which is already part of every chunk
            if line == main_title:
                continue

            line_length = len(line)

            # If a single block exceeds the maximum size on its own, split it up
            if line_length > self.max_length:
                # First flush the current chunk if it holds any content
                if current_length > len(main_title):
                    chunks.append(current_chunk)

                # Break the long block into sub-parts
                start = 0
                while start < line_length:
                    part = line[start : start + self.max_length]
                    if main_title and not part.startswith("#"):
                        chunks.append(f"{main_title}\n\n{part}")
                    else:
                        chunks.append(part)
                    start += self.max_length

                # Reset the current chunk
                current_chunk = main_title
                current_length = len(main_title)
            else:
                # Start a new chunk if adding this block would exceed the maximum
                # size (+4 leaves room for the "\n\n" separators)
                if current_length + line_length + 4 > self.max_length:
                    chunks.append(current_chunk)
                    current_chunk = main_title
                    current_length = len(main_title)

                # Append the block to the current chunk
                if current_chunk:
                    current_chunk += "\n\n" + line
                    current_length += line_length + 2
                else:
                    current_chunk = line
                    current_length = line_length

        # Keep the last chunk if it still holds content
        if current_length > len(main_title):
            chunks.append(current_chunk)

        return chunks
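
    # Illustrative example (assumed numbers, not from the original file): with
    # max_length=8000, a page whose markdown runs to roughly 20,000 characters
    # would be packed into about three chunks, each one re-starting with the
    # "# <page title>" line when the page has an h1, so every chunk stays
    # self-contained.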

    def _hash_content(self, content: str) -> str:
        """Hash the content to identify duplicates."""
        # Hash only the main content (not the URLs) for duplicate detection
        return hashlib.md5(content.encode()).hexdigest()
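

# Minimal usage sketch (not part of the original file): the base URL below is a
# placeholder, and a real crawl assumes the target site wraps its main content
# in a <div id="content"> element, as expected by _parse_page.
if __name__ == "__main__":
    crawler = ContentCrawler(
        base_url="https://example.com/",  # hypothetical site
        max_length=8000,
    )
    documents = crawler.crawl()
    print(f"Collected {len(documents)} chunk(s)")
    for doc in documents[:3]:
        # Each entry pairs a chunk URL with its markdown text
        print(doc["url"], "-", doc["text"][:80].replace("\n", " "))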