File size: 11,579 Bytes
50705f6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
import hashlib
from typing import Dict, List, Set, Tuple
from urllib.parse import urljoin, urlparse, urlunparse

import requests
from bs4 import BeautifulSoup, NavigableString, Tag


class ContentCrawler:
    def __init__(
        self, base_url: str, ignore_prefixes: List[str] = None, max_length: int = 8000
    ):
        """
        Initialize the crawler with the base URL, a list of URL prefixes to ignore, and the maximum chunk size.

        Args:
            base_url: The website URL to crawl.
            ignore_prefixes: List of URL path prefixes to ignore.
            max_length: Maximum allowed size for a chunk.
        """
        self.base_url = base_url
        self.visited = set()
        self.results = []
        self.max_length = max_length
        self.ignore_prefixes = ignore_prefixes or [
            "manage/",
            "password/",
            "media/",
            "notes-de-mises-a-jour/",
        ]
        # Pour éviter les doublons de contenu
        self.content_hashes = set()

    def crawl(self) -> List[Dict[str, str]]:
        """
        Recursively crawl the website starting from the homepage.

        Returns:
            A list of dictionaries with keys 'url' and 'text' (in markdown format).
        """
        try:
            homepage_response = requests.get(self.base_url)
            homepage_response.raise_for_status()
        except Exception as e:
            print(f"Error fetching homepage {self.base_url}: {e}")
            return []

        homepage_soup = BeautifulSoup(homepage_response.text, "html.parser")
        initial_links = self._get_internal_links(homepage_soup)

        # Utiliser un ensemble pour éviter les doublons d'URLs dans la file
        queue = set()
        for link in initial_links:
            full_url = self._normalize_url(urljoin(self.base_url, link))
            if full_url != self.base_url:
                queue.add(full_url)
                self.visited.add(full_url)

        # Convertir en liste pour le traitement
        queue_list = list(queue)
        while queue_list:
            current_url = queue_list.pop(0)
            print(f"Processing {current_url}")
            result, new_links = self._parse_page(current_url)

            if result:
                self.results.extend(result)

            # Ajouter seulement les liens non visités
            for link in new_links:
                full_url = self._normalize_url(urljoin(self.base_url, link))
                if full_url not in self.visited and full_url != self.base_url:
                    self.visited.add(full_url)
                    queue_list.append(full_url)

        return self.results

    def _normalize_url(self, url: str) -> str:
        """Normaliser l'URL en supprimant les fragments et paramètres de requête."""
        parsed = urlparse(url)
        # Supprimer fragment et query params
        normalized = parsed._replace(fragment="", query="")
        return urlunparse(normalized)

    def _get_internal_links(self, soup: BeautifulSoup) -> Set[str]:
        """
        Retrieve internal links from the BeautifulSoup object,
        ignoring those whose path starts with any of the specified prefixes.
        """
        links = set()
        for a_tag in soup.find_all("a", href=True):
            href = a_tag["href"]
            if href.startswith("#") or href.startswith("javascript:"):
                continue

            parsed_href = urlparse(href)
            path = parsed_href.path.lstrip("/")

            if any(path.startswith(prefix) for prefix in self.ignore_prefixes):
                continue

            # S'assurer que le lien est interne
            is_internal = (
                not parsed_href.netloc
                or self.base_url in href
                or parsed_href.netloc == urlparse(self.base_url).netloc
            )

            if is_internal:
                links.add(href)

        return links

    def _parse_page(self, url: str) -> Tuple[List[Dict[str, str]], Set[str]]:
        """Parse une page et extrait son contenu ainsi que ses liens."""
        try:
            response = requests.get(url)
            response.raise_for_status()
        except Exception as e:
            print(f"Error fetching {url}: {e}")
            return [], set()

        soup = BeautifulSoup(response.text, "html.parser")

        # Trouver la div principale de contenu
        content_div = soup.find(id="content")
        if not content_div:
            print(f"No content div found in {url}")
            return [], self._get_internal_links(soup)

        # Nettoyer le contenu
        for script in content_div.find_all(["script", "style"]):
            script.decompose()

        # Récupérer le titre
        h1_tag = content_div.find("h1")
        page_title = h1_tag.get_text(strip=True) if h1_tag else ""

        # Créer le markdown complet
        markdown_content = self._extract_structured_content(content_div, page_title)

        # Vérifier si le contenu est un doublon
        content_hash = self._hash_content(markdown_content)
        if content_hash in self.content_hashes:
            print(f"Duplicate content skipped for {url}")
            return [], self._get_internal_links(soup)

        self.content_hashes.add(content_hash)

        # Diviser en chunks si nécessaire
        chunks = self._split_content(markdown_content)

        # Créer la liste des résultats
        results = []
        for i, chunk in enumerate(chunks):
            results.append({"url": f"{url}#chunk-{i+1}", "text": chunk})

        return results, self._get_internal_links(soup)

    def _extract_structured_content(self, content_div: Tag, page_title: str) -> str:
        """Extrait le contenu de manière structurée en respectant la hiérarchie de titres."""
        lines = []

        # Ajouter le titre principal
        if page_title:
            lines.append(f"# {page_title}")

        # Identifier tous les titres et le contenu
        current_element = content_div.find_next()

        while current_element and current_element.parent == content_div:
            if current_element.name in ["h1", "h2", "h3", "h4", "h5", "h6"]:
                # Convertir le niveau de titre
                level = int(current_element.name[1])
                text = current_element.get_text(strip=True)
                lines.append(f"{'#' * level} {text}")
            else:
                markdown = self._convert_element_to_markdown(current_element)
                if markdown:
                    lines.append(markdown)

            # Passer à l'élément suivant au même niveau
            current_element = current_element.find_next_sibling()

        return "\n\n".join(line for line in lines if line)

    def _convert_element_to_markdown(self, element) -> str:
        """Convertit un élément HTML en markdown."""
        if isinstance(element, NavigableString):
            text = element.strip()
            return text if text else ""

        if isinstance(element, Tag):
            if element.name in ["script", "style", "iframe"]:
                return ""

            if element.name == "p":
                return element.get_text(strip=True)

            elif element.name == "a" and element.get("href"):
                text = element.get_text(strip=True)
                href = element.get("href")
                return f"[{text}]({href})"

            elif element.name in ["ul", "ol"]:
                items = []
                for li in element.find_all("li", recursive=False):
                    text = li.get_text(strip=True)
                    if text:
                        items.append(f"* {text}")
                return "\n".join(items)

            elif element.name == "table":
                # Extraction basique des tableaux
                rows = []
                for tr in element.find_all("tr"):
                    cols = []
                    for td in tr.find_all(["td", "th"]):
                        cols.append(td.get_text(strip=True))
                    rows.append(" | ".join(cols))

                if rows:
                    # Ajouter la ligne de séparation après l'en-tête
                    if len(rows) > 1:
                        rows.insert(1, "-" * len(rows[0]))
                    return "\n".join(rows)
                return ""

            elif element.name in ["div", "section", "article"]:
                parts = []
                for child in element.children:
                    part = self._convert_element_to_markdown(child)
                    if part:
                        parts.append(part)
                return "\n\n".join(parts)

            else:
                text = element.get_text(strip=True)
                return text if text else ""

        return ""

    def _split_content(self, content: str) -> List[str]:
        """Divise le contenu en chunks de taille maximale."""
        if len(content) <= self.max_length:
            return [content]

        # Extraction du titre principal pour le préserver dans chaque chunk
        lines = content.split("\n\n")
        main_title = lines[0] if lines and lines[0].startswith("# ") else ""

        chunks = []
        current_chunk = main_title
        current_length = len(main_title)

        for line in lines:
            # Ignorer le titre principal déjà traité
            if line == main_title:
                continue

            line_length = len(line)

            # Si la ligne seule dépasse la taille max, on doit la diviser
            if line_length > self.max_length:
                # D'abord ajouter le chunk courant s'il y a du contenu
                if current_length > len(main_title):
                    chunks.append(current_chunk)

                # Diviser cette longue ligne en sous-parties
                start = 0
                while start < line_length:
                    part = line[start : start + self.max_length]
                    if main_title and not part.startswith("#"):
                        chunks.append(f"{main_title}\n\n{part}")
                    else:
                        chunks.append(part)
                    start += self.max_length

                # Réinitialiser le chunk courant
                current_chunk = main_title
                current_length = len(main_title)
            else:
                # Si l'ajout de cette ligne dépasse la taille max, créer un nouveau chunk
                if current_length + line_length + 4 > self.max_length:  # +4 pour \n\n
                    chunks.append(current_chunk)
                    current_chunk = main_title
                    current_length = len(main_title)

                    if main_title and current_chunk:
                        current_chunk += "\n\n"
                        current_length += 2

                # Ajouter la ligne au chunk courant
                if current_chunk:
                    current_chunk += "\n\n" + line
                    current_length += line_length + 2
                else:
                    current_chunk = line
                    current_length = line_length

        # Ajouter le dernier chunk s'il reste du contenu
        if current_length > len(main_title):
            chunks.append(current_chunk)

        return chunks

    def _hash_content(self, content: str) -> str:
        """Crée un hash du contenu pour identifier les doublons."""
        # Utiliser seulement le contenu principal (pas les URLs) pour la détection de doublons
        return hashlib.md5(content.encode()).hexdigest()