import hashlib
from typing import Dict, List, Optional, Set, Tuple
from urllib.parse import urljoin, urlparse, urlunparse
import requests
from bs4 import BeautifulSoup, NavigableString, Tag
class ContentCrawler:
def __init__(
        self, base_url: str, ignore_prefixes: Optional[List[str]] = None, max_length: int = 8000
):
"""
Initialize the crawler with the base URL, a list of URL prefixes to ignore, and the maximum chunk size.
Args:
base_url: The website URL to crawl.
ignore_prefixes: List of URL path prefixes to ignore.
            max_length: Maximum allowed length of a chunk, in characters.
"""
self.base_url = base_url
self.visited = set()
self.results = []
self.max_length = max_length
self.ignore_prefixes = ignore_prefixes or [
"manage/",
"password/",
"media/",
"notes-de-mises-a-jour/",
]
        # Track hashes of page content to skip duplicates
self.content_hashes = set()
def crawl(self) -> List[Dict[str, str]]:
"""
Recursively crawl the website starting from the homepage.
Returns:
A list of dictionaries with keys 'url' and 'text' (in markdown format).
"""
try:
            homepage_response = requests.get(self.base_url, timeout=30)  # timeout (arbitrary value) so the crawl cannot hang indefinitely
homepage_response.raise_for_status()
except Exception as e:
print(f"Error fetching homepage {self.base_url}: {e}")
return []
homepage_soup = BeautifulSoup(homepage_response.text, "html.parser")
initial_links = self._get_internal_links(homepage_soup)
        # Use a set so the queue never holds duplicate URLs
queue = set()
for link in initial_links:
full_url = self._normalize_url(urljoin(self.base_url, link))
if full_url != self.base_url:
queue.add(full_url)
self.visited.add(full_url)
        # Convert to a list for FIFO processing
queue_list = list(queue)
while queue_list:
current_url = queue_list.pop(0)
print(f"Processing {current_url}")
result, new_links = self._parse_page(current_url)
if result:
self.results.extend(result)
            # Queue only links that have not been visited yet
for link in new_links:
full_url = self._normalize_url(urljoin(self.base_url, link))
if full_url not in self.visited and full_url != self.base_url:
self.visited.add(full_url)
queue_list.append(full_url)
return self.results
def _normalize_url(self, url: str) -> str:
"""Normaliser l'URL en supprimant les fragments et paramètres de requête."""
parsed = urlparse(url)
        # Drop the fragment and query parameters
normalized = parsed._replace(fragment="", query="")
return urlunparse(normalized)
def _get_internal_links(self, soup: BeautifulSoup) -> Set[str]:
"""
Retrieve internal links from the BeautifulSoup object,
ignoring those whose path starts with any of the specified prefixes.
"""
links = set()
for a_tag in soup.find_all("a", href=True):
href = a_tag["href"]
if href.startswith("#") or href.startswith("javascript:"):
continue
parsed_href = urlparse(href)
path = parsed_href.path.lstrip("/")
if any(path.startswith(prefix) for prefix in self.ignore_prefixes):
continue
            # Keep only internal links
is_internal = (
not parsed_href.netloc
or self.base_url in href
or parsed_href.netloc == urlparse(self.base_url).netloc
)
if is_internal:
links.add(href)
return links
def _parse_page(self, url: str) -> Tuple[List[Dict[str, str]], Set[str]]:
"""Parse une page et extrait son contenu ainsi que ses liens."""
try:
            response = requests.get(url, timeout=30)
response.raise_for_status()
except Exception as e:
print(f"Error fetching {url}: {e}")
return [], set()
soup = BeautifulSoup(response.text, "html.parser")
        # Locate the main content div
content_div = soup.find(id="content")
if not content_div:
print(f"No content div found in {url}")
return [], self._get_internal_links(soup)
        # Strip scripts and styles from the content
for script in content_div.find_all(["script", "style"]):
script.decompose()
        # Grab the page title
h1_tag = content_div.find("h1")
page_title = h1_tag.get_text(strip=True) if h1_tag else ""
        # Build the full markdown document
markdown_content = self._extract_structured_content(content_div, page_title)
        # Skip pages whose content has already been seen
content_hash = self._hash_content(markdown_content)
if content_hash in self.content_hashes:
print(f"Duplicate content skipped for {url}")
return [], self._get_internal_links(soup)
self.content_hashes.add(content_hash)
        # Split into chunks if needed
chunks = self._split_content(markdown_content)
        # Build the result list, one entry per chunk
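        # Each entry looks like {"url": "https://example.com/page#chunk-1", "text": "..."}
        # (illustrative URL); the synthetic #chunk-N suffix keeps chunk URLs unique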
results = []
for i, chunk in enumerate(chunks):
results.append({"url": f"{url}#chunk-{i+1}", "text": chunk})
return results, self._get_internal_links(soup)
def _extract_structured_content(self, content_div: Tag, page_title: str) -> str:
"""Extrait le contenu de manière structurée en respectant la hiérarchie de titres."""
lines = []
        # Add the main title
if page_title:
lines.append(f"# {page_title}")
        # Walk the top-level children of the content div: find_next() returns the
        # first tag inside the div, and find_next_sibling() then advances through
        # its siblings
        current_element = content_div.find_next()
        while current_element and current_element.parent == content_div:
if current_element.name in ["h1", "h2", "h3", "h4", "h5", "h6"]:
                # Map the heading tag to the matching markdown heading level
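                # e.g. an <h2> heading with text "Installation" becomes "## Installation"
                # (illustrative text)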
level = int(current_element.name[1])
text = current_element.get_text(strip=True)
lines.append(f"{'#' * level} {text}")
else:
markdown = self._convert_element_to_markdown(current_element)
if markdown:
lines.append(markdown)
            # Move on to the next sibling at the same level
current_element = current_element.find_next_sibling()
return "\n\n".join(line for line in lines if line)
def _convert_element_to_markdown(self, element) -> str:
"""Convertit un élément HTML en markdown."""
if isinstance(element, NavigableString):
text = element.strip()
return text if text else ""
if isinstance(element, Tag):
if element.name in ["script", "style", "iframe"]:
return ""
if element.name == "p":
return element.get_text(strip=True)
elif element.name == "a" and element.get("href"):
text = element.get_text(strip=True)
href = element.get("href")
return f"[{text}]({href})"
elif element.name in ["ul", "ol"]:
items = []
for li in element.find_all("li", recursive=False):
text = li.get_text(strip=True)
if text:
items.append(f"* {text}")
return "\n".join(items)
elif element.name == "table":
                # Basic table extraction: one row per line, cells joined with " | "
rows = []
for tr in element.find_all("tr"):
cols = []
for td in tr.find_all(["td", "th"]):
cols.append(td.get_text(strip=True))
rows.append(" | ".join(cols))
                if rows:
                    # Insert a markdown separator row after the header row
                    if len(rows) > 1:
                        num_cols = rows[0].count(" | ") + 1
                        rows.insert(1, " | ".join(["---"] * num_cols))
                    return "\n".join(rows)
return ""
elif element.name in ["div", "section", "article"]:
parts = []
for child in element.children:
part = self._convert_element_to_markdown(child)
if part:
parts.append(part)
return "\n\n".join(parts)
else:
text = element.get_text(strip=True)
return text if text else ""
return ""
def _split_content(self, content: str) -> List[str]:
"""Divise le contenu en chunks de taille maximale."""
if len(content) <= self.max_length:
return [content]
        # Extract the main title so it can be repeated at the top of each chunk
lines = content.split("\n\n")
main_title = lines[0] if lines and lines[0].startswith("# ") else ""
chunks = []
current_chunk = main_title
current_length = len(main_title)
for line in lines:
            # Skip the main title, it is already in the current chunk
if line == main_title:
continue
line_length = len(line)
            # If this single block exceeds the max size, it has to be split on its own
if line_length > self.max_length:
                # First flush the current chunk if it already holds content
if current_length > len(main_title):
chunks.append(current_chunk)
                # Split the long block into fixed-size pieces
start = 0
while start < line_length:
part = line[start : start + self.max_length]
if main_title and not part.startswith("#"):
chunks.append(f"{main_title}\n\n{part}")
else:
chunks.append(part)
start += self.max_length
                # Reset the current chunk
current_chunk = main_title
current_length = len(main_title)
            else:
                # If adding this block would overflow the chunk, start a new one
                if current_length + line_length + 2 > self.max_length:  # +2 for "\n\n"
                    chunks.append(current_chunk)
                    current_chunk = main_title
                    current_length = len(main_title)
                # Append the block to the current chunk
                if current_chunk:
                    current_chunk += "\n\n" + line
                    current_length += line_length + 2
else:
current_chunk = line
current_length = line_length
        # Flush the last chunk if it still holds content
if current_length > len(main_title):
chunks.append(current_chunk)
return chunks
def _hash_content(self, content: str) -> str:
"""Crée un hash du contenu pour identifier les doublons."""
        # Only the main content (not the URL) is used for duplicate detection
return hashlib.md5(content.encode()).hexdigest()
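
# Minimal usage sketch. "https://example.com" is a placeholder base URL: the crawler
# assumes the target site serves its main text inside a <div id="content"> element,
# as _parse_page expects.
if __name__ == "__main__":
    crawler = ContentCrawler(base_url="https://example.com", max_length=8000)
    documents = crawler.crawl()
    print(f"Collected {len(documents)} chunks from {len(crawler.visited)} visited URLs")
    for doc in documents[:3]:
        print(doc["url"], f"({len(doc['text'])} characters)")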