# seo_analyzer.py
import os
import logging
import re
import requests
import hashlib
import PyPDF2
import numpy as np
import pandas as pd
from io import BytesIO
from typing import List, Dict, Any, Tuple
from urllib.parse import urlparse, urljoin
from concurrent.futures import ThreadPoolExecutor, as_completed
from bs4 import BeautifulSoup
from pathlib import Path
from datetime import datetime
from sklearn.feature_extraction.text import TfidfVectorizer
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from transformers import pipeline
from sentence_transformers import SentenceTransformer
import torch
import spacy
import matplotlib.pyplot as plt
from utils import sanitize_filename
# Logging configuration
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class SEOSpaceAnalyzer:
def __init__(self, max_urls: int = 20, max_workers: int = 4) -> None:
"""
Inicializa la sesión HTTP, carga modelos NLP y prepara el directorio de almacenamiento.
Args:
max_urls: Número máximo de URLs a procesar por análisis.
max_workers: Número de hilos para la ejecución concurrente.
"""
self.max_urls = max_urls
self.max_workers = max_workers
self.session = self._configure_session()
self.models = self._load_models()
self.base_dir = Path("content_storage")
self.base_dir.mkdir(parents=True, exist_ok=True)
self.current_analysis: Dict[str, Any] = {}
def _load_models(self) -> Dict[str, Any]:
"""Carga los modelos NLP de Hugging Face y spaCy."""
try:
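            # Run the transformers pipelines on GPU (device 0) when CUDA is available, otherwise on CPU (-1).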
device = 0 if torch.cuda.is_available() else -1
logger.info("Cargando modelos NLP...")
models = {
'summarizer': pipeline("summarization", model="facebook/bart-large-cnn", device=device),
'ner': pipeline("ner", model="dslim/bert-base-NER", aggregation_strategy="simple", device=device),
'semantic': SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2'),
'spacy': spacy.load("es_core_news_lg")
}
logger.info("Modelos cargados correctamente.")
return models
except Exception as e:
logger.error(f"Error cargando modelos: {e}")
raise
def _configure_session(self) -> requests.Session:
"""Configura una sesión HTTP con reintentos y headers personalizados."""
session = requests.Session()
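        # Retry idempotent GET/HEAD requests up to 3 times on transient 5xx responses, with exponential backoff.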
retry = Retry(
total=3,
backoff_factor=1,
status_forcelist=[500, 502, 503, 504],
allowed_methods=['GET', 'HEAD']
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
session.headers.update({
'User-Agent': 'Mozilla/5.0 (compatible; SEOBot/1.0)',
'Accept-Language': 'es-ES,es;q=0.9'
})
return session
def analyze_sitemap(self, sitemap_url: str) -> Tuple[Dict, List[str], Dict, Dict, List[Dict]]:
"""
Procesa el sitemap: extrae URLs, analiza cada página individualmente y devuelve datos agregados.
Args:
sitemap_url: URL del sitemap XML.
Returns:
Una tupla con 5 elementos:
- Estadísticas generales (dict)
- Recomendaciones SEO (lista de strings)
- Análisis de contenido agregado (dict)
- Análisis de enlaces (dict)
- Detalle individual de cada URL procesada (lista de dicts)
"""
try:
urls = self._parse_sitemap(sitemap_url)
if not urls:
return {"error": "No se pudieron extraer URLs del sitemap"}, [], {}, {}, []
results: List[Dict] = []
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {executor.submit(self._process_url, url): url for url in urls[:self.max_urls]}
for future in as_completed(futures):
url = futures[future]
try:
res = future.result()
results.append(res)
logger.info(f"Procesado: {url}")
except Exception as e:
logger.error(f"Error procesando {url}: {e}")
results.append({'url': url, 'status': 'error', 'error': str(e)})
self.current_analysis = {
'stats': self._calculate_stats(results),
'content_analysis': self._analyze_content(results),
'links': self._analyze_links(results),
'recommendations': self._generate_seo_recommendations(results),
'details': results,
'timestamp': datetime.now().isoformat()
}
analysis = self.current_analysis
return analysis['stats'], analysis['recommendations'], analysis['content_analysis'], analysis['links'], analysis['details']
except Exception as e:
logger.error(f"Error en análisis: {e}")
return {"error": str(e)}, [], {}, {}, []
def _process_url(self, url: str) -> Dict:
"""Procesa una URL individual extrayendo contenido, metadatos y enlaces."""
try:
response = self.session.get(url, timeout=15)
response.raise_for_status()
content_type = response.headers.get('Content-Type', '')
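            # Dispatch on Content-Type: PDFs and HTML pages get full parsing; other types are only stored to disk.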
result: Dict[str, Any] = {'url': url, 'status': 'success'}
if 'application/pdf' in content_type:
result.update(self._process_pdf(response.content))
elif 'text/html' in content_type:
result.update(self._process_html(response.text, url))
else:
result.update({'type': 'unknown', 'content': '', 'word_count': 0})
self._save_content(url, response.content)
return result
except requests.exceptions.RequestException as e:
logger.warning(f"Error procesando {url}: {str(e)}")
return {'url': url, 'status': 'error', 'error': str(e)}
except Exception as e:
logger.error(f"Error inesperado en {url}: {str(e)}")
return {'url': url, 'status': 'error', 'error': str(e)}
def _process_html(self, html: str, base_url: str) -> Dict:
"""Extrae y limpia el contenido HTML, metadatos y enlaces de la página."""
soup = BeautifulSoup(html, 'html.parser')
clean_text = self._clean_text(soup.get_text())
return {
'type': 'html',
'content': clean_text,
'word_count': len(clean_text.split()),
'metadata': self._extract_metadata(soup),
'links': self._extract_links(soup, base_url)
}
def _process_pdf(self, content: bytes) -> Dict:
"""Extrae texto de un documento PDF y calcula estadísticas básicas."""
try:
text = ""
with BytesIO(content) as pdf_file:
reader = PyPDF2.PdfReader(pdf_file)
for page in reader.pages:
extracted = page.extract_text()
text += extracted if extracted else ""
clean_text = self._clean_text(text)
return {
'type': 'pdf',
'content': clean_text,
'word_count': len(clean_text.split()),
'page_count': len(reader.pages)
}
        except PyPDF2.errors.PdfReadError as e:
logger.error(f"Error leyendo PDF: {e}")
return {'type': 'pdf', 'error': str(e)}
def _clean_text(self, text: str) -> str:
"""Limpia y normaliza el texto removiendo espacios y caracteres especiales."""
if not text:
return ""
text = re.sub(r'\s+', ' ', text)
return re.sub(r'[^\w\sáéíóúñÁÉÍÓÚÑ]', ' ', text).strip()
def _extract_metadata(self, soup: BeautifulSoup) -> Dict:
"""Extrae metadatos relevantes (título, descripción, keywords, Open Graph) de la página."""
metadata = {'title': '', 'description': '', 'keywords': [], 'og': {}}
if soup.title and soup.title.string:
metadata['title'] = soup.title.string.strip()[:200]
for meta in soup.find_all('meta'):
name = meta.get('name', '').lower()
prop = meta.get('property', '').lower()
content = meta.get('content', '')
if name == 'description':
metadata['description'] = content[:300]
elif name == 'keywords':
metadata['keywords'] = [kw.strip() for kw in content.split(',') if kw.strip()]
elif prop.startswith('og:'):
metadata['og'][prop[3:]] = content
return metadata
def _extract_links(self, soup: BeautifulSoup, base_url: str) -> List[Dict]:
"""Extrae enlaces de la página, distinguiendo entre internos y externos."""
links: List[Dict] = []
base_netloc = urlparse(base_url).netloc
for tag in soup.find_all('a', href=True):
try:
href = tag['href'].strip()
if not href or href.startswith('javascript:'):
continue
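                # Resolve relative hrefs against the page URL, then classify each link as internal or external by domain.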
full_url = urljoin(base_url, href)
parsed = urlparse(full_url)
links.append({
'url': full_url,
'type': 'internal' if parsed.netloc == base_netloc else 'external',
'anchor': self._clean_text(tag.get_text())[:100],
'file_type': self._get_file_type(parsed.path)
})
except Exception as e:
logger.warning(f"Error procesando enlace {tag.get('href')}: {e}")
continue
return links
def _get_file_type(self, path: str) -> str:
"""Determina el tipo de archivo según la extensión."""
ext = Path(path).suffix.lower()
return ext[1:] if ext else 'html'
def _parse_sitemap(self, sitemap_url: str) -> List[str]:
"""Parsea un sitemap XML (y posibles índices de sitemaps) para extraer URLs."""
try:
response = self.session.get(sitemap_url, timeout=10)
response.raise_for_status()
if 'xml' not in response.headers.get('Content-Type', ''):
logger.warning(f"El sitemap no parece ser XML: {sitemap_url}")
return []
soup = BeautifulSoup(response.text, 'lxml-xml')
urls: List[str] = []
if soup.find('sitemapindex'):
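                # Sitemap index: recurse into each referenced child sitemap (.xml entries).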
for sitemap in soup.find_all('loc'):
url = sitemap.text.strip()
if url.endswith('.xml'):
urls.extend(self._parse_sitemap(url))
else:
urls = [loc.text.strip() for loc in soup.find_all('loc')]
filtered_urls = list({url for url in urls if url.startswith('http')})
return filtered_urls
except Exception as e:
logger.error(f"Error al parsear sitemap {sitemap_url}: {e}")
return []
def _save_content(self, url: str, content: bytes) -> None:
"""
Guarda el contenido descargado en una estructura de directorios organizada por dominio,
sanitizando el nombre del archivo y evitando sobrescribir archivos idénticos mediante hash.
"""
try:
parsed = urlparse(url)
domain_dir = self.base_dir / parsed.netloc
path = parsed.path.lstrip('/')
if not path or path.endswith('/'):
path = os.path.join(path, 'index.html')
safe_path = sanitize_filename(path)
save_path = domain_dir / safe_path
save_path.parent.mkdir(parents=True, exist_ok=True)
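            # Hash the new content and skip the write when an identical file is already stored.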
new_hash = hashlib.md5(content).hexdigest()
if save_path.exists():
with open(save_path, 'rb') as f:
existing_content = f.read()
existing_hash = hashlib.md5(existing_content).hexdigest()
if new_hash == existing_hash:
logger.debug(f"El contenido de {url} ya está guardado.")
return
with open(save_path, 'wb') as f:
f.write(content)
logger.info(f"Guardado contenido en: {save_path}")
except Exception as e:
logger.error(f"Error guardando contenido para {url}: {e}")
def _calculate_stats(self, results: List[Dict]) -> Dict:
"""Calcula estadísticas generales del análisis."""
successful = [r for r in results if r.get('status') == 'success']
content_types = [r.get('type', 'unknown') for r in successful]
avg_word_count = round(np.mean([r.get('word_count', 0) for r in successful]) if successful else 0, 1)
return {
'total_urls': len(results),
'successful': len(successful),
'failed': len(results) - len(successful),
'content_types': pd.Series(content_types).value_counts().to_dict(),
'avg_word_count': avg_word_count,
'failed_urls': [r['url'] for r in results if r.get('status') != 'success']
}
def _analyze_content(self, results: List[Dict]) -> Dict:
"""Genera un análisis de contenido agregado usando TF-IDF para extraer las palabras clave principales y muestras."""
successful = [r for r in results if r.get('status') == 'success' and r.get('content')]
texts = [r['content'] for r in successful if len(r['content'].split()) > 10]
if not texts:
return {'top_keywords': [], 'content_samples': []}
try:
stop_words = list(self.models['spacy'].Defaults.stop_words)
vectorizer = TfidfVectorizer(stop_words=stop_words, max_features=50, ngram_range=(1, 2))
tfidf = vectorizer.fit_transform(texts)
feature_names = vectorizer.get_feature_names_out()
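            # Sum TF-IDF weights per term across all documents and keep the 10 highest-scoring terms.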
sorted_indices = np.argsort(np.asarray(tfidf.sum(axis=0)).ravel())[-10:]
top_keywords = feature_names[sorted_indices][::-1].tolist()
except Exception as e:
logger.error(f"Error en análisis TF-IDF: {e}")
top_keywords = []
samples = [{'url': r['url'], 'sample': (r['content'][:500] + '...') if len(r['content']) > 500 else r['content']} for r in successful[:3]]
return {'top_keywords': top_keywords, 'content_samples': samples}
def _analyze_links(self, results: List[Dict]) -> Dict:
"""Genera un análisis de enlaces internos, dominios externos, anclas y tipos de archivos."""
all_links = []
for result in results:
if result.get('links'):
all_links.extend(result['links'])
if not all_links:
return {'internal_links': {}, 'external_domains': {}, 'common_anchors': {}, 'file_types': {}}
df = pd.DataFrame(all_links)
return {
'internal_links': df[df['type'] == 'internal']['url'].value_counts().head(20).to_dict(),
'external_domains': df[df['type'] == 'external']['url'].apply(lambda x: urlparse(x).netloc).value_counts().head(10).to_dict(),
'common_anchors': df['anchor'].value_counts().head(10).to_dict(),
'file_types': df['file_type'].value_counts().to_dict()
}
def _generate_seo_recommendations(self, results: List[Dict]) -> List[str]:
"""Genera recomendaciones SEO en base a las deficiencias encontradas en el análisis."""
successful = [r for r in results if r.get('status') == 'success']
if not successful:
return ["No se pudo analizar ningún contenido exitosamente"]
recs = []
missing_titles = sum(1 for r in successful if not r.get('metadata', {}).get('title'))
if missing_titles:
recs.append(f"📌 Añadir títulos a {missing_titles} páginas")
        missing_descriptions = sum(1 for r in successful if not r.get('metadata', {}).get('description'))
        if missing_descriptions:
            recs.append(f"📌 Añadir meta descripciones a {missing_descriptions} páginas")
short_content = sum(1 for r in successful if r.get('word_count', 0) < 300)
if short_content:
recs.append(f"📝 Ampliar contenido en {short_content} páginas (menos de 300 palabras)")
all_links = [link for r in results for link in r.get('links', [])]
if all_links:
df_links = pd.DataFrame(all_links)
internal_links = df_links[df_links['type'] == 'internal']
if len(internal_links) > 100:
recs.append(f"🔗 Optimizar estructura de enlaces internos ({len(internal_links)} enlaces)")
return recs if recs else ["✅ No se detectaron problemas críticos de SEO"]
def plot_internal_links(self, links_data: Dict) -> Any:
"""Genera un gráfico de barras horizontales mostrando los 20 principales enlaces internos."""
internal_links = links_data.get('internal_links', {})
if not internal_links:
return {}
fig, ax = plt.subplots()
names = list(internal_links.keys())
counts = list(internal_links.values())
ax.barh(names, counts)
ax.set_xlabel("Cantidad de enlaces")
ax.set_title("Top 20 Enlaces Internos")
plt.tight_layout()
return fig
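

# Minimal usage sketch, assuming network access and a publicly reachable sitemap;
# "https://example.com/sitemap.xml" below is a placeholder URL, not a real target.
if __name__ == "__main__":
    analyzer = SEOSpaceAnalyzer(max_urls=10, max_workers=4)
    stats, recommendations, content_analysis, links, details = analyzer.analyze_sitemap(
        "https://example.com/sitemap.xml"  # placeholder sitemap URL
    )
    print(stats)
    print(content_analysis.get('top_keywords', []))
    for rec in recommendations:
        print(rec)
    fig = analyzer.plot_internal_links(links)
    if fig:
        fig.savefig("internal_links.png")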