# publik_rag/crawler.py
import hashlib
from typing import Dict, List, Optional, Set, Tuple
from urllib.parse import urljoin, urlparse, urlunparse
import requests
from bs4 import BeautifulSoup, NavigableString, Tag
class ContentCrawler:
def __init__(
        self, base_url: str, ignore_prefixes: Optional[List[str]] = None, max_length: int = 8000
):
"""
Initialize the crawler with the base URL, a list of URL prefixes to ignore, and the maximum chunk size.
Args:
base_url: The website URL to crawl.
ignore_prefixes: List of URL path prefixes to ignore.
max_length: Maximum allowed size for a chunk.
"""
self.base_url = base_url
self.visited = set()
self.results = []
self.max_length = max_length
self.ignore_prefixes = ignore_prefixes or [
"manage/",
"password/",
"media/",
"notes-de-mises-a-jour/",
]
        # Used to avoid duplicate content
self.content_hashes = set()
def crawl(self) -> List[Dict[str, str]]:
"""
Recursively crawl the website starting from the homepage.
Returns:
A list of dictionaries with keys 'url' and 'text' (in markdown format).
"""
try:
            homepage_response = requests.get(self.base_url, timeout=30)
            homepage_response.raise_for_status()
        except requests.RequestException as e:
print(f"Error fetching homepage {self.base_url}: {e}")
return []
homepage_soup = BeautifulSoup(homepage_response.text, "html.parser")
initial_links = self._get_internal_links(homepage_soup)
        # Use a set to avoid duplicate URLs in the queue
queue = set()
for link in initial_links:
full_url = self._normalize_url(urljoin(self.base_url, link))
if full_url != self.base_url:
queue.add(full_url)
self.visited.add(full_url)
        # Convert to a list for processing
queue_list = list(queue)
while queue_list:
current_url = queue_list.pop(0)
print(f"Processing {current_url}")
result, new_links = self._parse_page(current_url)
if result:
self.results.extend(result)
            # Only add links that have not been visited yet
for link in new_links:
full_url = self._normalize_url(urljoin(self.base_url, link))
if full_url not in self.visited and full_url != self.base_url:
self.visited.add(full_url)
queue_list.append(full_url)
return self.results
def _normalize_url(self, url: str) -> str:
"""Normaliser l'URL en supprimant les fragments et paramètres de requête."""
parsed = urlparse(url)
        # Drop the fragment and query params
normalized = parsed._replace(fragment="", query="")
return urlunparse(normalized)
def _get_internal_links(self, soup: BeautifulSoup) -> Set[str]:
"""
Retrieve internal links from the BeautifulSoup object,
ignoring those whose path starts with any of the specified prefixes.
"""
links = set()
for a_tag in soup.find_all("a", href=True):
href = a_tag["href"]
if href.startswith("#") or href.startswith("javascript:"):
continue
parsed_href = urlparse(href)
path = parsed_href.path.lstrip("/")
if any(path.startswith(prefix) for prefix in self.ignore_prefixes):
continue
            # Make sure the link is internal
is_internal = (
not parsed_href.netloc
or self.base_url in href
or parsed_href.netloc == urlparse(self.base_url).netloc
)
if is_internal:
links.add(href)
return links
def _parse_page(self, url: str) -> Tuple[List[Dict[str, str]], Set[str]]:
"""Parse une page et extrait son contenu ainsi que ses liens."""
try:
            response = requests.get(url, timeout=30)
            response.raise_for_status()
        except requests.RequestException as e:
print(f"Error fetching {url}: {e}")
return [], set()
soup = BeautifulSoup(response.text, "html.parser")
        # Find the main content div
content_div = soup.find(id="content")
if not content_div:
print(f"No content div found in {url}")
return [], self._get_internal_links(soup)
        # Clean up the content
for script in content_div.find_all(["script", "style"]):
script.decompose()
        # Grab the page title
h1_tag = content_div.find("h1")
page_title = h1_tag.get_text(strip=True) if h1_tag else ""
        # Build the full markdown document
markdown_content = self._extract_structured_content(content_div, page_title)
        # Check whether the content is a duplicate
content_hash = self._hash_content(markdown_content)
if content_hash in self.content_hashes:
print(f"Duplicate content skipped for {url}")
return [], self._get_internal_links(soup)
self.content_hashes.add(content_hash)
        # Split into chunks if needed
chunks = self._split_content(markdown_content)
        # Build the list of results
results = []
for i, chunk in enumerate(chunks):
results.append({"url": f"{url}#chunk-{i+1}", "text": chunk})
return results, self._get_internal_links(soup)
def _extract_structured_content(self, content_div: Tag, page_title: str) -> str:
"""Extrait le contenu de manière structurée en respectant la hiérarchie de titres."""
lines = []
        # Add the main title
if page_title:
lines.append(f"# {page_title}")
        # Walk through all the headings and content
current_element = content_div.find_next()
while current_element and current_element.parent == content_div:
if current_element.name in ["h1", "h2", "h3", "h4", "h5", "h6"]:
                # Convert the heading level
level = int(current_element.name[1])
text = current_element.get_text(strip=True)
lines.append(f"{'#' * level} {text}")
else:
markdown = self._convert_element_to_markdown(current_element)
if markdown:
lines.append(markdown)
            # Move on to the next element at the same level
current_element = current_element.find_next_sibling()
return "\n\n".join(line for line in lines if line)
def _convert_element_to_markdown(self, element) -> str:
"""Convertit un élément HTML en markdown."""
if isinstance(element, NavigableString):
text = element.strip()
return text if text else ""
if isinstance(element, Tag):
if element.name in ["script", "style", "iframe"]:
return ""
if element.name == "p":
return element.get_text(strip=True)
elif element.name == "a" and element.get("href"):
text = element.get_text(strip=True)
href = element.get("href")
return f"[{text}]({href})"
elif element.name in ["ul", "ol"]:
items = []
for li in element.find_all("li", recursive=False):
text = li.get_text(strip=True)
if text:
items.append(f"* {text}")
return "\n".join(items)
elif element.name == "table":
                # Basic table extraction
rows = []
for tr in element.find_all("tr"):
cols = []
for td in tr.find_all(["td", "th"]):
cols.append(td.get_text(strip=True))
rows.append(" | ".join(cols))
if rows:
                    # Add the markdown separator row after the header
                    if len(rows) > 1:
                        rows.insert(1, " | ".join(["---"] * (rows[0].count(" | ") + 1)))
return "\n".join(rows)
return ""
elif element.name in ["div", "section", "article"]:
parts = []
for child in element.children:
part = self._convert_element_to_markdown(child)
if part:
parts.append(part)
return "\n\n".join(parts)
else:
text = element.get_text(strip=True)
return text if text else ""
return ""
def _split_content(self, content: str) -> List[str]:
"""Divise le contenu en chunks de taille maximale."""
if len(content) <= self.max_length:
return [content]
        # Extract the main title so it can be preserved in every chunk
lines = content.split("\n\n")
main_title = lines[0] if lines and lines[0].startswith("# ") else ""
chunks = []
current_chunk = main_title
current_length = len(main_title)
for line in lines:
            # Skip the main title, which has already been handled
if line == main_title:
continue
line_length = len(line)
            # If this line alone exceeds the max size, it has to be split
if line_length > self.max_length:
                # First flush the current chunk if it has any content
if current_length > len(main_title):
chunks.append(current_chunk)
                # Split this long line into sub-parts
start = 0
while start < line_length:
part = line[start : start + self.max_length]
if main_title and not part.startswith("#"):
chunks.append(f"{main_title}\n\n{part}")
else:
chunks.append(part)
start += self.max_length
                # Reset the current chunk
current_chunk = main_title
current_length = len(main_title)
else:
                # If adding this line would exceed the max size, start a new chunk
                if current_length + line_length + 4 > self.max_length:  # +4 for the "\n\n" separators
chunks.append(current_chunk)
current_chunk = main_title
current_length = len(main_title)
if main_title and current_chunk:
current_chunk += "\n\n"
current_length += 2
                # Append the line to the current chunk
if current_chunk:
current_chunk += "\n\n" + line
current_length += line_length + 2
else:
current_chunk = line
current_length = line_length
        # Append the last chunk if any content remains
if current_length > len(main_title):
chunks.append(current_chunk)
return chunks
def _hash_content(self, content: str) -> str:
"""Crée un hash du contenu pour identifier les doublons."""
# Utiliser seulement le contenu principal (pas les URLs) pour la détection de doublons
return hashlib.md5(content.encode()).hexdigest()
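

# Minimal usage sketch, assuming this module is executed directly. The base URL
# below is a placeholder and the ignore_prefixes / max_length values are only
# illustrative, not settings required by the project.
if __name__ == "__main__":
    crawler = ContentCrawler(
        base_url="https://example.org/",  # hypothetical site, replace with a real one
        ignore_prefixes=["manage/", "media/"],
        max_length=8000,
    )
    documents = crawler.crawl()
    print(f"Collected {len(documents)} chunks")
    for doc in documents[:3]:
        # Each entry has the shape {"url": "<page url>#chunk-N", "text": "<markdown>"}
        print(doc["url"], len(doc["text"]))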