"""Crawl a website and convert its pages into deduplicated markdown chunks."""

import hashlib
from typing import Dict, List, Optional, Set, Tuple
from urllib.parse import urljoin, urlparse, urlunparse

import requests
from bs4 import BeautifulSoup, NavigableString, Tag


class ContentCrawler:
    def __init__(
        self,
        base_url: str,
        ignore_prefixes: Optional[List[str]] = None,
        max_length: int = 8000,
    ):
        """
        Initialize the crawler with the base URL, a list of URL prefixes to
        ignore, and the maximum chunk size.

        Args:
            base_url: The website URL to crawl.
            ignore_prefixes: List of URL path prefixes to ignore.
            max_length: Maximum allowed size for a chunk, in characters.
        """
        self.base_url = base_url
        self.visited = set()
        self.results = []
        self.max_length = max_length
        self.ignore_prefixes = ignore_prefixes or [
            "manage/",
            "password/",
            "media/",
            "notes-de-mises-a-jour/",
        ]
        # Content hashes already seen, used to skip duplicate pages
        self.content_hashes = set()

    def crawl(self) -> List[Dict[str, str]]:
        """
        Recursively crawl the website starting from the homepage.

        Returns:
            A list of dictionaries with keys 'url' and 'text' (in markdown format).
        """
        try:
            homepage_response = requests.get(self.base_url)
            homepage_response.raise_for_status()
        except Exception as e:
            print(f"Error fetching homepage {self.base_url}: {e}")
            return []

        homepage_soup = BeautifulSoup(homepage_response.text, "html.parser")
        initial_links = self._get_internal_links(homepage_soup)

        # Use a set to avoid duplicate URLs in the queue
        queue = set()
        for link in initial_links:
            full_url = self._normalize_url(urljoin(self.base_url, link))
            if full_url != self.base_url:
                queue.add(full_url)
                self.visited.add(full_url)

        # Convert to a list for processing
        queue_list = list(queue)
        while queue_list:
            current_url = queue_list.pop(0)
            print(f"Processing {current_url}")
            result, new_links = self._parse_page(current_url)
            if result:
                self.results.extend(result)

            # Only enqueue links that have not been visited yet
            for link in new_links:
                full_url = self._normalize_url(urljoin(self.base_url, link))
                if full_url not in self.visited and full_url != self.base_url:
                    self.visited.add(full_url)
                    queue_list.append(full_url)

        return self.results

    def _normalize_url(self, url: str) -> str:
        """Normalize the URL by stripping fragments and query parameters."""
        parsed = urlparse(url)
        # Drop the fragment and query params
        normalized = parsed._replace(fragment="", query="")
        return urlunparse(normalized)

    def _get_internal_links(self, soup: BeautifulSoup) -> Set[str]:
        """
        Retrieve internal links from the BeautifulSoup object, ignoring those
        whose path starts with any of the specified prefixes.
""" links = set() for a_tag in soup.find_all("a", href=True): href = a_tag["href"] if href.startswith("#") or href.startswith("javascript:"): continue parsed_href = urlparse(href) path = parsed_href.path.lstrip("/") if any(path.startswith(prefix) for prefix in self.ignore_prefixes): continue # S'assurer que le lien est interne is_internal = ( not parsed_href.netloc or self.base_url in href or parsed_href.netloc == urlparse(self.base_url).netloc ) if is_internal: links.add(href) return links def _parse_page(self, url: str) -> Tuple[List[Dict[str, str]], Set[str]]: """Parse une page et extrait son contenu ainsi que ses liens.""" try: response = requests.get(url) response.raise_for_status() except Exception as e: print(f"Error fetching {url}: {e}") return [], set() soup = BeautifulSoup(response.text, "html.parser") # Trouver la div principale de contenu content_div = soup.find(id="content") if not content_div: print(f"No content div found in {url}") return [], self._get_internal_links(soup) # Nettoyer le contenu for script in content_div.find_all(["script", "style"]): script.decompose() # Récupérer le titre h1_tag = content_div.find("h1") page_title = h1_tag.get_text(strip=True) if h1_tag else "" # Créer le markdown complet markdown_content = self._extract_structured_content(content_div, page_title) # Vérifier si le contenu est un doublon content_hash = self._hash_content(markdown_content) if content_hash in self.content_hashes: print(f"Duplicate content skipped for {url}") return [], self._get_internal_links(soup) self.content_hashes.add(content_hash) # Diviser en chunks si nécessaire chunks = self._split_content(markdown_content) # Créer la liste des résultats results = [] for i, chunk in enumerate(chunks): results.append({"url": f"{url}#chunk-{i+1}", "text": chunk}) return results, self._get_internal_links(soup) def _extract_structured_content(self, content_div: Tag, page_title: str) -> str: """Extrait le contenu de manière structurée en respectant la hiérarchie de titres.""" lines = [] # Ajouter le titre principal if page_title: lines.append(f"# {page_title}") # Identifier tous les titres et le contenu current_element = content_div.find_next() while current_element and current_element.parent == content_div: if current_element.name in ["h1", "h2", "h3", "h4", "h5", "h6"]: # Convertir le niveau de titre level = int(current_element.name[1]) text = current_element.get_text(strip=True) lines.append(f"{'#' * level} {text}") else: markdown = self._convert_element_to_markdown(current_element) if markdown: lines.append(markdown) # Passer à l'élément suivant au même niveau current_element = current_element.find_next_sibling() return "\n\n".join(line for line in lines if line) def _convert_element_to_markdown(self, element) -> str: """Convertit un élément HTML en markdown.""" if isinstance(element, NavigableString): text = element.strip() return text if text else "" if isinstance(element, Tag): if element.name in ["script", "style", "iframe"]: return "" if element.name == "p": return element.get_text(strip=True) elif element.name == "a" and element.get("href"): text = element.get_text(strip=True) href = element.get("href") return f"[{text}]({href})" elif element.name in ["ul", "ol"]: items = [] for li in element.find_all("li", recursive=False): text = li.get_text(strip=True) if text: items.append(f"* {text}") return "\n".join(items) elif element.name == "table": # Extraction basique des tableaux rows = [] for tr in element.find_all("tr"): cols = [] for td in tr.find_all(["td", "th"]): 
                        cols.append(td.get_text(strip=True))
                    rows.append(" | ".join(cols))
                if rows:
                    # Add the markdown separator row after the header
                    if len(rows) > 1:
                        header_cols = rows[0].count(" | ") + 1
                        rows.insert(1, " | ".join(["---"] * header_cols))
                    return "\n".join(rows)
                return ""
            elif element.name in ["div", "section", "article"]:
                parts = []
                for child in element.children:
                    part = self._convert_element_to_markdown(child)
                    if part:
                        parts.append(part)
                return "\n\n".join(parts)
            else:
                text = element.get_text(strip=True)
                return text if text else ""

        return ""

    def _split_content(self, content: str) -> List[str]:
        """Split the content into chunks no larger than max_length."""
        if len(content) <= self.max_length:
            return [content]

        # Extract the main title so it can be repeated in every chunk
        lines = content.split("\n\n")
        main_title = lines[0] if lines and lines[0].startswith("# ") else ""

        chunks = []
        current_chunk = main_title
        current_length = len(main_title)

        for line in lines:
            # Skip the main title, which is already handled
            if line == main_title:
                continue

            line_length = len(line)

            # If the line alone exceeds the max size, it has to be split
            if line_length > self.max_length:
                # First flush the current chunk if it has content
                if current_length > len(main_title):
                    chunks.append(current_chunk)

                # Split this long line into sub-parts
                start = 0
                while start < line_length:
                    part = line[start : start + self.max_length]
                    if main_title and not part.startswith("#"):
                        chunks.append(f"{main_title}\n\n{part}")
                    else:
                        chunks.append(part)
                    start += self.max_length

                # Reset the current chunk
                current_chunk = main_title
                current_length = len(main_title)
            else:
                # If adding this line would exceed the max size, start a new chunk
                if current_length + line_length + 2 > self.max_length:  # +2 for "\n\n"
                    chunks.append(current_chunk)
                    current_chunk = main_title
                    current_length = len(main_title)

                # Append the line to the current chunk (the separator is added here)
                if current_chunk:
                    current_chunk += "\n\n" + line
                    current_length += line_length + 2
                else:
                    current_chunk = line
                    current_length = line_length

        # Append the last chunk if any content remains
        if current_length > len(main_title):
            chunks.append(current_chunk)

        return chunks

    def _hash_content(self, content: str) -> str:
        """Hash the content to identify duplicates."""
        # Only the main content (not the URLs) is used for duplicate detection
        return hashlib.md5(content.encode()).hexdigest()
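

# Example usage -- a minimal sketch showing how the crawler might be run.
# The base URL and the output file name below are illustrative assumptions,
# not values defined by the crawler itself.
if __name__ == "__main__":
    import json

    crawler = ContentCrawler(
        base_url="https://example.com",  # hypothetical site to crawl
        max_length=4000,
    )
    chunks = crawler.crawl()
    print(f"Collected {len(chunks)} chunks")

    # Persist the chunks (each a dict with 'url' and 'text') for later use.
    with open("chunks.json", "w", encoding="utf-8") as f:
        json.dump(chunks, f, ensure_ascii=False, indent=2)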