# SEO / seo_analyzer.py
# Hugging Face Space file (16.5 kB), author: Merlintxu.
# Commit a3047a6 (verified): "Update seo_analyzer.py".
import os
import logging
import re
import requests
import hashlib
import PyPDF2
import numpy as np
import pandas as pd
from io import BytesIO
from typing import List, Dict, Any, Tuple
from urllib.parse import urlparse, urljoin
from concurrent.futures import ThreadPoolExecutor, as_completed
from bs4 import BeautifulSoup
from pathlib import Path
from datetime import datetime
from sklearn.feature_extraction.text import TfidfVectorizer
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from transformers import pipeline
from sentence_transformers import SentenceTransformer, util
import torch
import spacy
import matplotlib.pyplot as plt
from utils import sanitize_filename
# Configure the root logger once at import time: INFO threshold and a
# "timestamp - level - message" record layout.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used by the code in this file.
logger = logging.getLogger(__name__)
class SEOSpaceAnalyzer:
    """Download the pages listed in a sitemap and build SEO-oriented reports.

    Up to ``max_urls`` sitemap entries are fetched concurrently. HTML pages
    yield cleaned text, metadata and links; PDFs yield cleaned text. Raw
    responses are stored under ``content_storage/``. The extracted text is
    then fed to the models loaded by ``_load_models`` (summarization, NER,
    sentence embeddings) and to a TF-IDF keyword extraction.

    Attributes:
        max_urls: Maximum number of sitemap URLs processed per analysis.
        max_workers: Thread-pool size used while downloading URLs.
        session: ``requests`` session with retry policy and default headers.
        models: Mapping of model name to loaded model object.
        base_dir: Directory where downloaded content is stored.
        current_analysis: Result of the most recent ``analyze_sitemap`` call.
    """

    def __init__(self, max_urls: int = 20, max_workers: int = 4) -> None:
        """Create the HTTP session, load the NLP models and the storage dir.

        Args:
            max_urls: Maximum number of sitemap URLs to process.
            max_workers: Number of worker threads used for downloads.

        Raises:
            Exception: Whatever ``_load_models`` raises when a model cannot
                be loaded.
        """
        self.max_urls = max_urls
        self.max_workers = max_workers
        self.session = self._configure_session()
        self.models = self._load_models()
        self.base_dir = Path("content_storage")
        self.base_dir.mkdir(parents=True, exist_ok=True)
        self.current_analysis: Dict[str, Any] = {}

    def _load_models(self) -> Dict[str, Any]:
        """Load the NLP models used by the analysis.

        Returns:
            Dict with the keys ``summarizer``, ``ner``, ``semantic`` and
            ``spacy``.

        Raises:
            Exception: Re-raised after logging if any model fails to load.
        """
        try:
            # transformers pipelines take a CUDA device index, or -1 for CPU.
            device = 0 if torch.cuda.is_available() else -1
            logger.info("Cargando modelos NLP...")
            models = {
                'summarizer': pipeline("summarization", model="facebook/bart-large-cnn", device=device),
                'ner': pipeline("ner", model="dslim/bert-base-NER", aggregation_strategy="simple", device=device),
                'semantic': SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2'),
                'spacy': spacy.load("es_core_news_lg"),
            }
            logger.info("Modelos cargados correctamente.")
            return models
        except Exception as e:
            logger.error(f"Error cargando modelos: {e}")
            raise

    def _configure_session(self) -> "requests.Session":
        """Build a session that retries GET/HEAD on 5xx responses.

        Returns:
            A ``requests.Session`` with the retry adapter mounted for both
            ``http://`` and ``https://`` and default request headers set.
        """
        session = requests.Session()
        retry = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[500, 502, 503, 504],
            allowed_methods=['GET', 'HEAD'],
        )
        adapter = HTTPAdapter(max_retries=retry)
        session.mount('http://', adapter)
        session.mount('https://', adapter)
        session.headers.update({
            'User-Agent': 'Mozilla/5.0 (compatible; SEOBot/1.0)',
            'Accept-Language': 'es-ES,es;q=0.9',
        })
        return session

    def analyze_sitemap(self, sitemap_url: str) -> Tuple[Dict, List[str], Dict, Dict, List[Dict], Dict, Dict]:
        """Run the full analysis for the URLs listed in ``sitemap_url``.

        Args:
            sitemap_url: URL of a sitemap or sitemap index.

        Returns:
            A 7-tuple ``(stats, recommendations, content_analysis, links,
            details, summaries, similarities)``. On failure the first element
            is ``{"error": <message>}`` and the remaining elements are empty.
            The complete result (including ``entities`` and ``timestamp``) is
            also stored in ``self.current_analysis``.
        """
        try:
            urls = self._parse_sitemap(sitemap_url)
            if not urls:
                return {"error": "No se pudieron extraer URLs del sitemap"}, [], {}, {}, [], {}, {}

            results: List[Dict] = []
            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                futures = {executor.submit(self._process_url, url): url for url in urls[:self.max_urls]}
                # Results are collected in completion order, not sitemap order.
                for future in as_completed(futures):
                    url = futures[future]
                    try:
                        res = future.result()
                        results.append(res)
                        logger.info(f"Procesado: {url}")
                    except Exception as e:
                        logger.error(f"Error procesando {url}: {e}")
                        results.append({'url': url, 'status': 'error', 'error': str(e)})

            summaries, entities = self._apply_nlp(results)
            similarities = self._compute_semantic_similarity(results)
            self.current_analysis = {
                'stats': self._calculate_stats(results),
                'content_analysis': self._analyze_content(results),
                'links': self._analyze_links(results),
                'recommendations': self._generate_seo_recommendations(results),
                'details': results,
                'summaries': summaries,
                'entities': entities,
                'similarities': similarities,
                'timestamp': datetime.now().isoformat(),
            }
            a = self.current_analysis
            return a['stats'], a['recommendations'], a['content_analysis'], a['links'], a['details'], a['summaries'], a['similarities']
        except Exception as e:
            logger.error(f"Error en análisis: {e}")
            return {"error": str(e)}, [], {}, {}, [], {}, {}

    def _process_url(self, url: str) -> Dict:
        """Download one URL and extract its content according to its type.

        Args:
            url: Absolute URL to fetch.

        Returns:
            A dict that always contains ``url`` and ``status``. On success it
            also carries the keys produced by ``_process_pdf`` /
            ``_process_html`` (or ``type='unknown'``); on failure it carries
            an ``error`` message. This method does not raise.
        """
        try:
            response = self.session.get(url, timeout=15)
            response.raise_for_status()
            # Media types are case-insensitive, so normalise before matching.
            content_type = response.headers.get('Content-Type', '').lower()
            result: Dict[str, Any] = {'url': url, 'status': 'success'}
            if 'application/pdf' in content_type:
                result.update(self._process_pdf(response.content))
            elif 'text/html' in content_type:
                result.update(self._process_html(response.text, url))
            else:
                result.update({'type': 'unknown', 'content': '', 'word_count': 0})
            self._save_content(url, response.content)
            return result
        except requests.exceptions.Timeout:
            return {'url': url, 'status': 'error', 'error': "Timeout"}
        except requests.exceptions.HTTPError:
            return {'url': url, 'status': 'error', 'error': "HTTP Error"}
        except Exception as e:
            return {'url': url, 'status': 'error', 'error': str(e)}

    def _process_html(self, html: str, base_url: str) -> Dict:
        """Extract cleaned text, metadata and links from an HTML document.

        Args:
            html: HTML source of the page.
            base_url: URL of the page, used to resolve relative links.

        Returns:
            Dict with ``type``, ``content``, ``word_count``, ``metadata`` and
            ``links``.
        """
        soup = BeautifulSoup(html, 'html.parser')
        clean_text = self._clean_text(soup.get_text())
        return {
            'type': 'html',
            'content': clean_text,
            'word_count': len(clean_text.split()),
            'metadata': self._extract_metadata(soup),
            'links': self._extract_links(soup, base_url),
        }

    def _process_pdf(self, content: bytes) -> Dict:
        """Extract cleaned text from a PDF given as raw bytes.

        Args:
            content: Raw bytes of the PDF file.

        Returns:
            Dict with ``type``, ``content``, ``word_count`` and ``page_count``.
            If extraction fails, ``content`` is empty, ``word_count`` is 0 and
            an ``error`` message is included instead of ``page_count``.
        """
        try:
            with BytesIO(content) as pdf_file:
                reader = PyPDF2.PdfReader(pdf_file)
                # extract_text() may yield nothing for a page; treat as "".
                text = "".join(page.extract_text() or "" for page in reader.pages)
                # Read the page count while the underlying stream is open.
                page_count = len(reader.pages)
            clean_text = self._clean_text(text)
            return {
                'type': 'pdf',
                'content': clean_text,
                'word_count': len(clean_text.split()),
                'page_count': page_count,
            }
        except Exception as e:
            # Keep the same keys as the success case so consumers can rely on
            # 'content' and 'word_count' being present.
            return {'type': 'pdf', 'content': '', 'word_count': 0, 'error': str(e)}

    def _clean_text(self, text: str) -> str:
        """Replace punctuation with spaces and collapse whitespace.

        Args:
            text: Raw text; may be empty or ``None``.

        Returns:
            The text with every character that is neither a word character
            nor whitespace replaced, runs of whitespace collapsed to a single
            space, and leading/trailing whitespace removed. Empty input yields
            ``""``.
        """
        if not text:
            return ""
        # Replace punctuation first: each replacement inserts a space, so the
        # whitespace collapse has to run afterwards to avoid double spaces.
        text = re.sub(r'[^\w\sáéíóúñÁÉÍÓÚÑ]', ' ', text)
        return re.sub(r'\s+', ' ', text).strip()

    def _extract_metadata(self, soup: "BeautifulSoup") -> Dict:
        """Collect title, description, keywords and Open Graph tags.

        Args:
            soup: Parsed HTML document.

        Returns:
            Dict with ``title`` (at most 200 chars), ``description`` (at most
            300 chars), ``keywords`` (list of non-empty strings) and ``og``
            (mapping of Open Graph property name, without the ``og:`` prefix,
            to its content).
        """
        metadata = {'title': '', 'description': '', 'keywords': [], 'og': {}}
        if soup.title and soup.title.string:
            metadata['title'] = soup.title.string.strip()[:200]
        for meta in soup.find_all('meta'):
            name = meta.get('name', '').lower()
            prop = meta.get('property', '').lower()
            content = meta.get('content', '')
            if name == 'description':
                metadata['description'] = content[:300]
            elif name == 'keywords':
                metadata['keywords'] = [kw.strip() for kw in content.split(',') if kw.strip()]
            elif prop.startswith('og:'):
                metadata['og'][prop[3:]] = content
        return metadata

    def _extract_links(self, soup: "BeautifulSoup", base_url: str) -> List[Dict]:
        """List the http(s) links found in ``<a href>`` tags.

        Args:
            soup: Parsed HTML document.
            base_url: URL of the page, used to resolve relative hrefs and to
                decide whether a link is internal.

        Returns:
            One dict per link with ``url`` (absolute), ``type`` (``internal``
            when the host equals the host of ``base_url``, else ``external``),
            ``anchor`` (cleaned link text, at most 100 chars) and
            ``file_type``.
        """
        links: List[Dict] = []
        base_netloc = urlparse(base_url).netloc
        for tag in soup.find_all('a', href=True):
            try:
                href = tag['href'].strip()
                if not href:
                    continue
                full_url = urljoin(base_url, href)
                parsed = urlparse(full_url)
                # Skip javascript:, mailto:, tel:, data: and similar targets;
                # they have no host and would be miscounted as external links.
                if parsed.scheme not in ('http', 'https'):
                    continue
                links.append({
                    'url': full_url,
                    'type': 'internal' if parsed.netloc == base_netloc else 'external',
                    'anchor': self._clean_text(tag.get_text())[:100],
                    'file_type': self._get_file_type(parsed.path),
                })
            except Exception as e:
                # Best effort: one malformed href must not discard the page.
                logger.debug(f"Enlace ignorado en {base_url}: {e}")
                continue
        return links

    def _get_file_type(self, path: str) -> str:
        """Return the lower-cased extension of ``path`` without the dot.

        Args:
            path: Path component of a URL.

        Returns:
            The extension (e.g. ``'pdf'``), or ``'html'`` when the path has
            no extension.
        """
        ext = Path(path).suffix.lower()
        return ext[1:] if ext else 'html'

    def _parse_sitemap(self, sitemap_url: str) -> List[str]:
        """Return the page URLs listed in a sitemap or sitemap index.

        Args:
            sitemap_url: URL of the sitemap to download.

        Returns:
            De-duplicated list of URLs starting with ``http``, in the order
            they were found. An empty list is returned when the sitemap
            cannot be fetched, is not served as XML, or cannot be parsed.
        """
        return self._collect_sitemap_urls(sitemap_url, set())

    def _collect_sitemap_urls(self, sitemap_url: str, visited: set) -> List[str]:
        """Recursive worker for ``_parse_sitemap``.

        Args:
            sitemap_url: URL of the sitemap to download.
            visited: Sitemap URLs already fetched in this traversal; used to
                stop sitemap indexes that reference each other from
                recursing forever. Mutated in place.

        Returns:
            De-duplicated, order-preserving list of URLs (see
            ``_parse_sitemap``).
        """
        if sitemap_url in visited:
            return []
        visited.add(sitemap_url)
        try:
            response = self.session.get(sitemap_url, timeout=10)
            response.raise_for_status()
            if 'xml' not in response.headers.get('Content-Type', '').lower():
                return []
            soup = BeautifulSoup(response.text, 'lxml-xml')
            urls: List[str] = []
            if soup.find('sitemapindex'):
                # A sitemap index lists child sitemaps; only .xml children
                # are followed.
                for loc in soup.find_all('loc'):
                    child_url = loc.text.strip()
                    if child_url.endswith('.xml'):
                        urls.extend(self._collect_sitemap_urls(child_url, visited))
            else:
                urls = [loc.text.strip() for loc in soup.find_all('loc')]
            # dict.fromkeys de-duplicates while keeping document order, so the
            # caller's urls[:max_urls] slice is stable between runs.
            return list(dict.fromkeys(url for url in urls if url.startswith('http')))
        except Exception as e:
            logger.warning(f"No se pudo procesar el sitemap {sitemap_url}: {e}")
            return []

    def _save_content(self, url: str, content: bytes) -> None:
        """Store a downloaded response under ``base_dir/<host>/<path>``.

        Failures are logged and otherwise ignored: saving is best effort and
        must not fail the URL being processed.

        Args:
            url: URL the content was downloaded from.
            content: Raw response body.
        """
        try:
            parsed = urlparse(url)
            domain_dir = self.base_dir / parsed.netloc
            raw_path = parsed.path.lstrip('/')
            # Directory-like URLs are stored as an index.html file.
            if not raw_path or raw_path.endswith('/'):
                raw_path = os.path.join(raw_path, 'index.html') if raw_path else 'index.html'
            safe_path = sanitize_filename(raw_path)
            save_path = domain_dir / safe_path
            # URLs come from remote sitemaps: refuse any target that resolves
            # outside the storage directory.
            if self.base_dir.resolve() not in save_path.resolve().parents:
                logger.warning(f"Ruta de guardado fuera de {self.base_dir}: {url}")
                return
            save_path.parent.mkdir(parents=True, exist_ok=True)
            with open(save_path, 'wb') as f:
                f.write(content)
        except Exception as e:
            logger.warning(f"No se pudo guardar el contenido de {url}: {e}")

    def _apply_nlp(self, results: List[Dict]) -> Tuple[Dict[str, str], Dict[str, List[str]]]:
        """Summarize long pages and extract named entities.

        Args:
            results: Per-URL result dicts as produced by ``_process_url``.

        Returns:
            ``(summaries, entities)``: ``summaries`` maps URL to a summary for
            pages with more than 300 words; ``entities`` maps URL to the
            sorted, de-duplicated PER/ORG/LOC entity strings. URLs whose
            model call fails are omitted from the corresponding mapping.
        """
        summaries: Dict[str, str] = {}
        entities: Dict[str, List[str]] = {}
        for r in results:
            if r.get('status') != 'success' or not r.get('content'):
                continue
            content = r['content']
            if len(content.split()) > 300:
                try:
                    # Only the first 1024 characters are sent to the model.
                    summary = self.models['summarizer'](content[:1024], max_length=100, min_length=30, do_sample=False)[0]['summary_text']
                    summaries[r['url']] = summary
                except Exception as e:
                    logger.warning(f"No se pudo resumir {r['url']}: {e}")
            try:
                ents = self.models['ner'](content[:1000])
                # sorted() gives a stable order; a bare set would not.
                entities[r['url']] = sorted({e['word'] for e in ents if e['entity_group'] in ['PER', 'ORG', 'LOC']})
            except Exception as e:
                logger.warning(f"No se pudieron extraer entidades de {r['url']}: {e}")
        return summaries, entities

    def _compute_semantic_similarity(self, results: List[Dict]) -> Dict[str, List[Dict]]:
        """Find, for each page, the most similar other pages.

        Args:
            results: Per-URL result dicts as produced by ``_process_url``.

        Returns:
            Mapping of URL to at most three ``{"url", "score"}`` dicts for
            other pages whose cosine similarity exceeds 0.5, best first.
            Empty when fewer than two pages have content or when the
            embedding step fails.
        """
        contents = [(r['url'], r['content']) for r in results if r.get('status') == 'success' and r.get('content')]
        if len(contents) < 2:
            return {}
        try:
            urls, texts = zip(*contents)
            embeddings = self.models['semantic'].encode(list(texts), convert_to_tensor=True)
            sim_matrix = util.pytorch_cos_sim(embeddings, embeddings)
            similarity_dict: Dict[str, List[Dict]] = {}
            for i, url in enumerate(urls):
                # Convert the row to plain floats once instead of comparing
                # 0-d tensors during the sort.
                scores = sim_matrix[i].tolist()
                ranked = sorted(range(len(scores)), key=scores.__getitem__, reverse=True)
                similarity_dict[url] = [
                    {"url": urls[j], "score": scores[j]}
                    for j in ranked if j != i and scores[j] > 0.5
                ][:3]
            return similarity_dict
        except Exception as e:
            logger.warning(f"No se pudo calcular la similitud semántica: {e}")
            return {}

    def _calculate_stats(self, results: List[Dict]) -> Dict:
        """Aggregate success/failure counts and basic content statistics.

        Args:
            results: Per-URL result dicts as produced by ``_process_url``.

        Returns:
            Dict with ``total_urls``, ``successful``, ``failed``,
            ``content_types`` (type -> count), ``avg_word_count`` (rounded to
            one decimal, 0.0 when nothing succeeded) and ``failed_urls``.
        """
        successful = [r for r in results if r.get('status') == 'success']
        content_types = [r.get('type', 'unknown') for r in successful]
        if successful:
            avg_word_count = float(round(np.mean([r.get('word_count', 0) for r in successful]), 1))
        else:
            avg_word_count = 0.0
        # Skip pandas for the empty case: there is nothing to count.
        type_counts = pd.Series(content_types).value_counts().to_dict() if content_types else {}
        return {
            'total_urls': len(results),
            'successful': len(successful),
            'failed': len(results) - len(successful),
            'content_types': type_counts,
            'avg_word_count': avg_word_count,
            'failed_urls': [r['url'] for r in results if r.get('status') != 'success'],
        }

    def _analyze_content(self, results: List[Dict]) -> Dict:
        """Extract top TF-IDF keywords and short content samples.

        Args:
            results: Per-URL result dicts as produced by ``_process_url``.

        Returns:
            Dict with ``top_keywords`` (up to 10 terms, highest summed TF-IDF
            first; empty if vectorization fails) and ``content_samples`` (up
            to three ``{"url", "sample"}`` dicts, samples cut at 500 chars).
        """
        successful = [r for r in results if r.get('status') == 'success' and r.get('content')]
        texts = [r['content'] for r in successful if len(r['content'].split()) > 10]
        if not texts:
            return {'top_keywords': [], 'content_samples': []}
        try:
            stop_words = list(self.models['spacy'].Defaults.stop_words)
            vectorizer = TfidfVectorizer(stop_words=stop_words, max_features=50, ngram_range=(1, 2))
            tfidf = vectorizer.fit_transform(texts)
            feature_names = vectorizer.get_feature_names_out()
            # argsort is ascending: take the last 10 and reverse them.
            sorted_indices = np.argsort(np.asarray(tfidf.sum(axis=0)).ravel())[-10:]
            top_keywords = feature_names[sorted_indices][::-1].tolist()
        except Exception as e:
            logger.warning(f"No se pudieron extraer palabras clave: {e}")
            top_keywords = []
        samples = [
            {'url': r['url'], 'sample': r['content'][:500] + '...' if len(r['content']) > 500 else r['content']}
            for r in successful[:3]
        ]
        return {'top_keywords': top_keywords, 'content_samples': samples}

    def _analyze_links(self, results: List[Dict]) -> Dict:
        """Summarize the links collected from all pages.

        Args:
            results: Per-URL result dicts as produced by ``_process_url``.

        Returns:
            Dict with ``internal_links`` (top 20 internal URLs by count),
            ``external_domains`` (top 10 external hosts by count),
            ``common_anchors`` (top 10 anchor texts by count) and
            ``file_types`` (file type -> count). All four are empty dicts
            when no links were collected.
        """
        all_links = []
        for result in results:
            if result.get('links'):
                all_links.extend(result['links'])
        if not all_links:
            return {'internal_links': {}, 'external_domains': {}, 'common_anchors': {}, 'file_types': {}}
        df = pd.DataFrame(all_links)
        return {
            'internal_links': df[df['type'] == 'internal']['url'].value_counts().head(20).to_dict(),
            'external_domains': df[df['type'] == 'external']['url'].apply(lambda x: urlparse(x).netloc).value_counts().head(10).to_dict(),
            'common_anchors': df['anchor'].value_counts().head(10).to_dict(),
            'file_types': df['file_type'].value_counts().to_dict(),
        }

    def _generate_seo_recommendations(self, results: List[Dict]) -> List[str]:
        """Turn the per-URL results into a list of recommendations.

        Title and description checks apply only to HTML pages, because other
        result types carry no ``metadata``. The short-content check applies
        to HTML and PDF results.

        Args:
            results: Per-URL result dicts as produced by ``_process_url``.

        Returns:
            List of recommendation strings; a single "no problems" entry when
            nothing was flagged, or a single failure entry when no URL was
            processed successfully.
        """
        successful = [r for r in results if r.get('status') == 'success']
        if not successful:
            return ["No se pudo analizar ningún contenido exitosamente"]
        html_pages = [r for r in successful if r.get('type') == 'html']
        text_docs = [r for r in successful if r.get('type') in ('html', 'pdf')]
        recs = []
        missing_titles = sum(1 for r in html_pages if not r.get('metadata', {}).get('title'))
        if missing_titles:
            recs.append(f"📌 Añadir títulos a {missing_titles} páginas")
        missing_descriptions = sum(1 for r in html_pages if not r.get('metadata', {}).get('description'))
        if missing_descriptions:
            recs.append(f"📌 Añadir meta descripciones a {missing_descriptions} páginas")
        short_content = sum(1 for r in text_docs if r.get('word_count', 0) < 300)
        if short_content:
            recs.append(f"📝 Ampliar contenido en {short_content} páginas (menos de 300 palabras)")
        all_links = [link for r in results for link in r.get('links', [])]
        if all_links:
            df_links = pd.DataFrame(all_links)
            internal_links = df_links[df_links['type'] == 'internal']
            if len(internal_links) > 100:
                recs.append(f"🔗 Optimizar estructura de enlaces internos ({len(internal_links)} enlaces)")
        return recs if recs else ["✅ No se detectaron problemas críticos de SEO"]

    def plot_internal_links(self, links_data: Dict) -> Any:
        """Draw a horizontal bar chart of internal link counts.

        Args:
            links_data: Dict with an ``internal_links`` mapping of URL to
                count, as returned by ``_analyze_links``.

        Returns:
            The matplotlib figure. When there are no internal links the
            figure only shows a centered "no links" message.
        """
        internal_links = links_data.get('internal_links', {})
        fig, ax = plt.subplots()
        if not internal_links:
            ax.text(0.5, 0.5, 'No hay enlaces internos', ha='center', va='center', transform=ax.transAxes)
            ax.axis('off')
        else:
            names = list(internal_links.keys())
            counts = list(internal_links.values())
            ax.barh(names, counts)
            ax.set_xlabel("Cantidad de enlaces")
            ax.set_title("Top 20 Enlaces Internos")
        # Lay out this figure explicitly; plt.tight_layout() would act on
        # whichever figure is currently active in pyplot's global state.
        fig.tight_layout()
        return fig