import os
import json
import logging
import re
import requests
import hashlib
import torch
from collections import defaultdict
from urllib.parse import urlparse, urljoin
from typing import List, Dict, Optional, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from bs4 import BeautifulSoup
import PyPDF2
from io import BytesIO
from transformers import pipeline, AutoTokenizer, AutoModelForQuestionAnswering
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sentence_transformers import SentenceTransformer
import spacy
import gradio as gr


logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class AdvancedSEOAnalyzer:
    """Crawl a sitemap and run AI-assisted SEO analysis on every page."""

    def __init__(self, sitemap_url: str):
        self.sitemap_url = sitemap_url
        self.session = self._configure_session()
        self.models = self._load_models()
        self.processed_urls = set()
        self.link_graph = defaultdict(list)
        self.content_store = {}
        self.documents = []

    def _configure_session(self) -> requests.Session:
        session = requests.Session()
        retry = Retry(
            total=5,
            backoff_factor=1,
            status_forcelist=[500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retry)
        session.mount('https://', adapter)
        session.headers.update({
            'User-Agent': 'Mozilla/5.0 (compatible; SEOBot/1.0; +https://seo.example.com/bot)'
        })
        return session

    def _load_models(self) -> Dict:
        return {
            'summarization': pipeline("summarization",
                                      model="facebook/bart-large-cnn",
                                      device=0 if torch.cuda.is_available() else -1),
            'qa': pipeline("question-answering",
                           model="deepset/roberta-base-squad2",
                           tokenizer="deepset/roberta-base-squad2"),
            'ner': pipeline("ner",
                            model="dslim/bert-base-NER",
                            aggregation_strategy="simple"),
            'semantic': SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2'),
            'spacy': spacy.load("en_core_web_lg")
        }

    def download_content(self, url: str) -> Optional[Dict]:
        # Synchronous on purpose: run_analysis() submits this to a ThreadPoolExecutor.
        if url in self.processed_urls:
            return None
        self.processed_urls.add(url)

        try:
            response = self.session.get(url, timeout=15)
            response.raise_for_status()
            content_type = response.headers.get('Content-Type', '')

            if 'application/pdf' in content_type:
                return self._process_pdf(url, response.content)
            elif 'text/html' in content_type:
                return self._process_html(url, response.text)
            else:
                logger.warning(f"Unsupported content type: {content_type}")
                return None

        except Exception as e:
            logger.error(f"Error downloading {url}: {str(e)}")
            return None

    def _process_pdf(self, url: str, content: bytes) -> Dict:
        text = ""
        with BytesIO(content) as pdf_file:
            reader = PyPDF2.PdfReader(pdf_file)
            for page in reader.pages:
                # extract_text() can return an empty value for pages without a text layer.
                text += page.extract_text() or ""

        doc_hash = hashlib.sha256(content).hexdigest()
        self._save_document(url, content, 'pdf')

        return {
            'url': url,
            'type': 'pdf',
            'content': text,
            'hash': doc_hash,
            'links': []
        }

    def _process_html(self, url: str, html: str) -> Dict:
        soup = BeautifulSoup(html, 'lxml')
        main_content = self._extract_main_content(soup)
        links = self._extract_links(url, soup)

        self._save_document(url, html.encode('utf-8'), 'html')

        return {
            'url': url,
            'type': 'html',
            'content': main_content,
            'hash': hashlib.sha256(html.encode()).hexdigest(),
            'links': links,
            'metadata': self._extract_metadata(soup)
        }

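    # The code above calls self._extract_main_content() but the listing never defines it.
    # A minimal sketch, assuming the goal is boilerplate-free body text; the tag choices
    # (main/article preference, script/style/nav/footer removal) are assumptions.
    def _extract_main_content(self, soup: BeautifulSoup) -> str:
        # Drop elements that rarely carry indexable copy.
        for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']):
            tag.decompose()

        # Prefer an explicit <main> or <article> region, fall back to the whole page.
        container = soup.find('main') or soup.find('article') or soup.body or soup
        return container.get_text(separator=' ', strip=True)
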
    def _extract_links(self, base_url: str, soup: BeautifulSoup) -> List[Dict]:
        links = []
        base_domain = urlparse(base_url).netloc

        for tag in soup.find_all(['a', 'link'], href=True):
            href = tag['href']
            full_url = urljoin(base_url, href)
            parsed = urlparse(full_url)

            link_type = 'internal' if parsed.netloc == base_domain else 'external'
            file_type = 'other'

            if parsed.path.lower().endswith(('.pdf', '.doc', '.docx')):
                file_type = 'document'
            elif parsed.path.lower().endswith(('.jpg', '.png', '.gif')):
                file_type = 'image'

            links.append({
                'url': full_url,
                'type': link_type,
                'file_type': file_type,
                'anchor': tag.text.strip()
            })

        return links

    def _extract_metadata(self, soup: BeautifulSoup) -> Dict:
        metadata = {
            'title': soup.title.string if soup.title and soup.title.string else '',
            'description': '',
            'keywords': [],
            'open_graph': {}
        }

        for tag in soup.find_all('meta'):
            name = tag.get('name', '').lower()
            prop = tag.get('property', '').lower()  # avoid shadowing the built-in `property`
            content = tag.get('content', '')

            if name == 'description':
                metadata['description'] = content
            elif name == 'keywords':
                metadata['keywords'] = [kw.strip() for kw in content.split(',')]
            elif prop.startswith('og:'):
                key = prop[3:]
                metadata['open_graph'][key] = content

        return metadata

    def analyze_content(self, content: Dict) -> Dict:
        text = content['content']
        # Rough character caps: the BART summarizer accepts ~1024 tokens and the
        # BERT NER model ~512, so long pages are trimmed before model calls.
        summary_input = text[:4000]
        ner_input = text[:2000]

        analysis = {
            'summary': self.models['summarization'](summary_input,
                                                    max_length=150,
                                                    min_length=30)[0]['summary_text'],
            'entities': self.models['ner'](ner_input),
            'semantic_embedding': self.models['semantic'].encode(text),
            'seo_analysis': self._perform_seo_analysis(content)
        }

        if content['type'] == 'pdf':
            analysis['document_analysis'] = self._analyze_document_structure(content)

        return analysis

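    # self._analyze_document_structure() is referenced for PDFs but missing from the
    # listing. A minimal sketch working only from the extracted text (the parsed page
    # objects are not kept); the paragraph heuristic is an assumption.
    def _analyze_document_structure(self, content: Dict) -> Dict:
        text = content['content']
        paragraphs = [p for p in text.split('\n\n') if p.strip()]
        words = text.split()

        return {
            'word_count': len(words),
            'paragraph_count': len(paragraphs),
            'avg_paragraph_length': len(words) / len(paragraphs) if paragraphs else 0,
        }
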
    def _perform_seo_analysis(self, content: Dict) -> Dict:
        text = content['content']
        doc = self.models['spacy'](text)

        return {
            'readability_score': self._calculate_readability(text),
            'keyword_density': self._calculate_keyword_density(text),
            'heading_structure': self._analyze_headings(doc),
            'content_length': len(text.split()),
            'semantic_topics': self._extract_semantic_topics(text)
        }

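    # The three helpers below are called by _perform_seo_analysis() but are not part of
    # the listing. These are rough sketches: the Flesch formula is standard, but the
    # syllable and heading heuristics are simplifying assumptions.
    def _calculate_readability(self, text: str) -> float:
        sentences = [s for s in re.split(r'[.!?]+', text) if s.strip()]
        words = re.findall(r"[A-Za-z']+", text)
        if not sentences or not words:
            return 0.0
        # Approximate syllables as groups of consecutive vowels.
        syllables = sum(max(1, len(re.findall(r'[aeiouyAEIOUY]+', w))) for w in words)
        # Flesch reading ease.
        return 206.835 - 1.015 * (len(words) / len(sentences)) - 84.6 * (syllables / len(words))

    def _calculate_keyword_density(self, text: str, top_n: int = 10) -> Dict[str, float]:
        tokens = re.findall(r"[a-z']+", text.lower())
        if not tokens:
            return {}
        counts: Dict[str, int] = {}
        for token in tokens:
            counts[token] = counts.get(token, 0) + 1
        ranked = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)[:top_n]
        return {word: count / len(tokens) for word, count in ranked}

    def _analyze_headings(self, doc) -> Dict[str, int]:
        # Plain text carries no <h1>-<h6> markup, so treat short, unterminated lines as
        # heading candidates; real heading structure would come from the HTML soup.
        candidates = [line.strip() for line in doc.text.splitlines()
                      if line.strip()
                      and len(line.split()) <= 8
                      and not line.strip().endswith(('.', '!', '?'))]
        return {'heading_candidates': len(candidates)}
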
    def _extract_semantic_topics(self, text: str) -> List[str]:
        vectorizer = TfidfVectorizer(stop_words='english', ngram_range=(1, 2))
        tfidf = vectorizer.fit_transform([text])
        feature_array = np.array(vectorizer.get_feature_names_out())
        tfidf_sorting = np.argsort(tfidf.toarray()).flatten()[::-1]

        return feature_array[tfidf_sorting][:5].tolist()

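    # run_analysis() relies on self._parse_sitemap(), which the listing omits. A minimal
    # sketch that fetches self.sitemap_url and pulls <loc> entries with a regex; nested
    # sitemap indexes and robots.txt discovery are not handled here.
    def _parse_sitemap(self) -> List[str]:
        try:
            response = self.session.get(self.sitemap_url, timeout=15)
            response.raise_for_status()
        except Exception as e:
            logger.error(f"Error fetching sitemap {self.sitemap_url}: {e}")
            return []

        urls = re.findall(r'<loc>\s*(.*?)\s*</loc>', response.text)
        logger.info(f"Found {len(urls)} URLs in sitemap")
        return urls
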
    def run_analysis(self, max_workers: int = 4) -> Dict:
        sitemap_urls = self._parse_sitemap()
        results = []

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(self.download_content, url)
                       for url in sitemap_urls]

            for future in as_completed(futures):
                result = future.result()
                if result:
                    analyzed = self.analyze_content(result)
                    results.append({**result, **analyzed})
                    self._update_link_graph(result)

        self._save_full_analysis(results)
        return {
            'total_pages': len(results),
            'document_types': self._count_document_types(results),
            'link_analysis': self._analyze_link_graph(),
            'content_analysis': self._aggregate_content_stats(results)
        }

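    # _update_link_graph() and _analyze_link_graph() are called above but missing from
    # the listing. Minimal sketches, assuming self.link_graph maps a source URL to the
    # list of internal URLs it links to.
    def _update_link_graph(self, result: Dict) -> None:
        for link in result.get('links', []):
            if link['type'] == 'internal':
                self.link_graph[result['url']].append(link['url'])

    def _analyze_link_graph(self) -> Dict:
        inbound: Dict[str, int] = {}
        for source, targets in self.link_graph.items():
            for target in targets:
                inbound[target] = inbound.get(target, 0) + 1

        top_linked = sorted(inbound.items(), key=lambda kv: kv[1], reverse=True)[:10]
        return {
            'pages_with_outlinks': len(self.link_graph),
            'total_internal_links': sum(len(t) for t in self.link_graph.values()),
            'most_linked_pages': top_linked,
        }
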
    def _save_document(self, url: str, content: bytes, file_type: str) -> None:
        parsed = urlparse(url)
        path = parsed.path.lstrip('/')
        filename = f"documents/{parsed.netloc}/{path}" if path else f"documents/{parsed.netloc}/index"

        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename + f'.{file_type}', 'wb') as f:
            f.write(content)

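    # The reporting helpers below (_save_full_analysis, _count_document_types,
    # _aggregate_content_stats) are referenced in run_analysis() but absent from the
    # listing. Hedged sketches; the output filename and the chosen aggregates are assumptions.
    def _save_full_analysis(self, results: List[Dict]) -> None:
        serializable = []
        for item in results:
            item = dict(item)
            # Embeddings are numpy arrays and are not JSON serializable as-is.
            embedding = item.pop('semantic_embedding', None)
            if embedding is not None:
                item['semantic_embedding'] = np.asarray(embedding).tolist()
            serializable.append(item)
        with open('analysis_results.json', 'w', encoding='utf-8') as f:
            json.dump(serializable, f, ensure_ascii=False, indent=2, default=str)

    def _count_document_types(self, results: List[Dict]) -> Dict[str, int]:
        counts: Dict[str, int] = {}
        for item in results:
            counts[item['type']] = counts.get(item['type'], 0) + 1
        return counts

    def _aggregate_content_stats(self, results: List[Dict]) -> Dict:
        lengths = [r['seo_analysis']['content_length'] for r in results if 'seo_analysis' in r]
        readability = [r['seo_analysis']['readability_score'] for r in results if 'seo_analysis' in r]
        return {
            'avg_content_length': sum(lengths) / len(lengths) if lengths else 0,
            'avg_readability': sum(readability) / len(readability) if readability else 0,
        }
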
    def launch_interface(self):
        def analyze(sitemap_url: str) -> Dict:
            # Gradio passes the textbox value here; point the analyzer at it before running.
            if sitemap_url:
                self.sitemap_url = sitemap_url
            return self.run_analysis()

        # The summary dict is returned to the UI; the full per-page results are written
        # to disk by _save_full_analysis() during run_analysis().
        interface = gr.Interface(
            fn=analyze,
            inputs=gr.Textbox(label="Sitemap URL"),
            outputs=gr.JSON(label="Analysis Results"),
            title="Advanced SEO Analyzer",
            description="Analyze websites with AI-powered SEO insights"
        )
        interface.launch()


if __name__ == "__main__":
    analyzer = AdvancedSEOAnalyzer("https://www.example.com/sitemap.xml")
    analyzer.launch_interface()