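"""SEO analyzer for XML sitemaps.

Downloads the pages listed in a sitemap (HTML and PDF), then applies NLP models
(summarization, NER, sentence embeddings, spaCy) and TF-IDF keyword extraction
to produce statistics, link analysis, semantic similarities, and SEO recommendations.
"""
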
import os
import logging
import re
import requests
import hashlib
import PyPDF2
import numpy as np
import pandas as pd
from io import BytesIO
from typing import List, Dict, Any, Tuple
from urllib.parse import urlparse, urljoin
from concurrent.futures import ThreadPoolExecutor, as_completed
from bs4 import BeautifulSoup
from pathlib import Path
from datetime import datetime
from sklearn.feature_extraction.text import TfidfVectorizer
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from transformers import pipeline
from sentence_transformers import SentenceTransformer, util
import torch
import spacy
import matplotlib.pyplot as plt

from utils import sanitize_filename

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
|
|
class SEOSpaceAnalyzer:
    """Analyzes pages listed in an XML sitemap for SEO-relevant signals."""

    def __init__(self, max_urls: int = 20, max_workers: int = 4) -> None:
        self.max_urls = max_urls
        self.max_workers = max_workers
        self.session = self._configure_session()
        self.models = self._load_models()
        self.base_dir = Path("content_storage")
        self.base_dir.mkdir(parents=True, exist_ok=True)
        self.current_analysis: Dict[str, Any] = {}

    def _load_models(self) -> Dict[str, Any]:
        """Load the NLP models: summarizer, NER, sentence embeddings, and spaCy."""
        try:
            device = 0 if torch.cuda.is_available() else -1
            logger.info("Loading NLP models...")
            models = {
                'summarizer': pipeline("summarization", model="facebook/bart-large-cnn", device=device),
                'ner': pipeline("ner", model="dslim/bert-base-NER", aggregation_strategy="simple", device=device),
                'semantic': SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2'),
                'spacy': spacy.load("es_core_news_lg")
            }
            logger.info("Models loaded successfully.")
            return models
        except Exception as e:
            logger.error(f"Error loading models: {e}")
            raise
|
    def _configure_session(self) -> requests.Session:
        session = requests.Session()
        retry = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[500, 502, 503, 504],
            allowed_methods=['GET', 'HEAD']
        )
        adapter = HTTPAdapter(max_retries=retry)
        session.mount('http://', adapter)
        session.mount('https://', adapter)
        session.headers.update({
            'User-Agent': 'Mozilla/5.0 (compatible; SEOBot/1.0)',
            'Accept-Language': 'es-ES,es;q=0.9'
        })
        return session
|
    def analyze_sitemap(self, sitemap_url: str) -> Tuple[Dict, List[str], Dict, Dict, List[Dict], Dict, Dict]:
        """Fetch and analyze up to `max_urls` pages from the given sitemap.

        Returns stats, recommendations, content analysis, link analysis,
        per-URL details, summaries, and semantic similarities.
        """
        try:
            urls = self._parse_sitemap(sitemap_url)
            if not urls:
                return {"error": "Could not extract any URLs from the sitemap"}, [], {}, {}, [], {}, {}

            results: List[Dict] = []
            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                futures = {executor.submit(self._process_url, url): url for url in urls[:self.max_urls]}
                for future in as_completed(futures):
                    url = futures[future]
                    try:
                        res = future.result()
                        results.append(res)
                        logger.info(f"Processed: {url}")
                    except Exception as e:
                        logger.error(f"Error processing {url}: {e}")
                        results.append({'url': url, 'status': 'error', 'error': str(e)})

            summaries, entities = self._apply_nlp(results)
            similarities = self._compute_semantic_similarity(results)

            self.current_analysis = {
                'stats': self._calculate_stats(results),
                'content_analysis': self._analyze_content(results),
                'links': self._analyze_links(results),
                'recommendations': self._generate_seo_recommendations(results),
                'details': results,
                'summaries': summaries,
                'entities': entities,
                'similarities': similarities,
                'timestamp': datetime.now().isoformat()
            }
            a = self.current_analysis
            return a['stats'], a['recommendations'], a['content_analysis'], a['links'], a['details'], a['summaries'], a['similarities']
        except Exception as e:
            logger.error(f"Error during analysis: {e}")
            return {"error": str(e)}, [], {}, {}, [], {}, {}
|
    def _process_url(self, url: str) -> Dict:
        try:
            response = self.session.get(url, timeout=15)
            response.raise_for_status()
            content_type = response.headers.get('Content-Type', '')
            result: Dict[str, Any] = {'url': url, 'status': 'success'}
            if 'application/pdf' in content_type:
                result.update(self._process_pdf(response.content))
            elif 'text/html' in content_type:
                result.update(self._process_html(response.text, url))
            else:
                result.update({'type': 'unknown', 'content': '', 'word_count': 0})
            self._save_content(url, response.content)
            return result
        except requests.exceptions.Timeout:
            return {'url': url, 'status': 'error', 'error': 'Timeout'}
        except requests.exceptions.HTTPError as e:
            return {'url': url, 'status': 'error', 'error': f'HTTP error: {e}'}
        except Exception as e:
            return {'url': url, 'status': 'error', 'error': str(e)}
|
    def _process_html(self, html: str, base_url: str) -> Dict:
        soup = BeautifulSoup(html, 'html.parser')
        clean_text = self._clean_text(soup.get_text())
        return {
            'type': 'html',
            'content': clean_text,
            'word_count': len(clean_text.split()),
            'metadata': self._extract_metadata(soup),
            'links': self._extract_links(soup, base_url)
        }
|
    def _process_pdf(self, content: bytes) -> Dict:
        try:
            text = ""
            with BytesIO(content) as pdf_file:
                reader = PyPDF2.PdfReader(pdf_file)
                # Count pages while the stream is still open; PdfReader reads lazily.
                page_count = len(reader.pages)
                for page in reader.pages:
                    extracted = page.extract_text()
                    text += extracted if extracted else ""
            clean_text = self._clean_text(text)
            return {
                'type': 'pdf',
                'content': clean_text,
                'word_count': len(clean_text.split()),
                'page_count': page_count
            }
        except Exception as e:
            return {'type': 'pdf', 'error': str(e)}
|
    def _clean_text(self, text: str) -> str:
        if not text:
            return ""
        text = re.sub(r'\s+', ' ', text)
        return re.sub(r'[^\w\sáéíóúñÁÉÍÓÚÑ]', ' ', text).strip()
|
    def _extract_metadata(self, soup: BeautifulSoup) -> Dict:
        metadata = {'title': '', 'description': '', 'keywords': [], 'og': {}}
        if soup.title and soup.title.string:
            metadata['title'] = soup.title.string.strip()[:200]
        for meta in soup.find_all('meta'):
            name = meta.get('name', '').lower()
            prop = meta.get('property', '').lower()
            content = meta.get('content', '')
            if name == 'description':
                metadata['description'] = content[:300]
            elif name == 'keywords':
                metadata['keywords'] = [kw.strip() for kw in content.split(',') if kw.strip()]
            elif prop.startswith('og:'):
                metadata['og'][prop[3:]] = content
        return metadata
|
    def _extract_links(self, soup: BeautifulSoup, base_url: str) -> List[Dict]:
        links: List[Dict] = []
        base_netloc = urlparse(base_url).netloc
        for tag in soup.find_all('a', href=True):
            try:
                href = tag['href'].strip()
                if not href or href.startswith('javascript:'):
                    continue
                full_url = urljoin(base_url, href)
                parsed = urlparse(full_url)
                links.append({
                    'url': full_url,
                    'type': 'internal' if parsed.netloc == base_netloc else 'external',
                    'anchor': self._clean_text(tag.get_text())[:100],
                    'file_type': self._get_file_type(parsed.path)
                })
            except Exception:
                continue
        return links
|
    def _get_file_type(self, path: str) -> str:
        ext = Path(path).suffix.lower()
        return ext[1:] if ext else 'html'
|
    def _parse_sitemap(self, sitemap_url: str) -> List[str]:
        """Return the unique URLs in a sitemap, recursing into sitemap indexes."""
        try:
            response = self.session.get(sitemap_url, timeout=10)
            response.raise_for_status()
            if 'xml' not in response.headers.get('Content-Type', ''):
                logger.warning(f"Unexpected Content-Type for sitemap: {sitemap_url}")
                return []
            soup = BeautifulSoup(response.text, 'lxml-xml')
            urls: List[str] = []
            if soup.find('sitemapindex'):
                for sitemap in soup.find_all('loc'):
                    url = sitemap.text.strip()
                    if url.endswith('.xml'):
                        urls.extend(self._parse_sitemap(url))
            else:
                urls = [loc.text.strip() for loc in soup.find_all('loc')]
            return list({url for url in urls if url.startswith('http')})
        except Exception as e:
            logger.error(f"Error parsing sitemap {sitemap_url}: {e}")
            return []
|
    def _save_content(self, url: str, content: bytes) -> None:
        """Best-effort save of the raw response body under content_storage/<domain>/."""
        try:
            parsed = urlparse(url)
            domain_dir = self.base_dir / parsed.netloc
            raw_path = parsed.path.lstrip('/')
            if not raw_path or raw_path.endswith('/'):
                raw_path = os.path.join(raw_path, 'index.html') if raw_path else 'index.html'
            safe_path = sanitize_filename(raw_path)
            save_path = domain_dir / safe_path
            save_path.parent.mkdir(parents=True, exist_ok=True)
            with open(save_path, 'wb') as f:
                f.write(content)
        except Exception as e:
            logger.debug(f"Could not save content for {url}: {e}")
|
    def _apply_nlp(self, results: List[Dict]) -> Tuple[Dict[str, str], Dict[str, List[str]]]:
        """Generate summaries (for longer pages) and named entities per URL."""
        summaries = {}
        entities = {}
        for r in results:
            if r.get('status') != 'success' or not r.get('content'):
                continue
            content = r['content']
            if len(content.split()) > 300:
                try:
                    summary = self.models['summarizer'](content[:1024], max_length=100, min_length=30, do_sample=False)[0]['summary_text']
                    summaries[r['url']] = summary
                except Exception as e:
                    logger.debug(f"Summarization failed for {r['url']}: {e}")
            try:
                ents = self.models['ner'](content[:1000])
                entities[r['url']] = list({ent['word'] for ent in ents if ent['entity_group'] in ['PER', 'ORG', 'LOC']})
            except Exception as e:
                logger.debug(f"NER failed for {r['url']}: {e}")
        return summaries, entities
|
    def _compute_semantic_similarity(self, results: List[Dict]) -> Dict[str, List[Dict]]:
        """For each URL, return up to three other URLs with cosine similarity above 0.5."""
        contents = [(r['url'], r['content']) for r in results if r.get('status') == 'success' and r.get('content')]
        if len(contents) < 2:
            return {}
        try:
            urls, texts = zip(*contents)
            embeddings = self.models['semantic'].encode(list(texts), convert_to_tensor=True)
            sim_matrix = util.pytorch_cos_sim(embeddings, embeddings)
            similarity_dict = {}
            for i, url in enumerate(urls):
                scores = list(sim_matrix[i])
                top_indices = sorted(range(len(scores)), key=lambda j: scores[j], reverse=True)
                top_similar = [
                    {"url": urls[j], "score": float(scores[j])}
                    for j in top_indices if j != i and float(scores[j]) > 0.5
                ][:3]
                similarity_dict[url] = top_similar
            return similarity_dict
        except Exception as e:
            logger.error(f"Error computing semantic similarity: {e}")
            return {}
|
    def _calculate_stats(self, results: List[Dict]) -> Dict:
        successful = [r for r in results if r.get('status') == 'success']
        content_types = [r.get('type', 'unknown') for r in successful]
        avg_word_count = round(np.mean([r.get('word_count', 0) for r in successful]) if successful else 0, 1)
        return {
            'total_urls': len(results),
            'successful': len(successful),
            'failed': len(results) - len(successful),
            'content_types': pd.Series(content_types).value_counts().to_dict(),
            'avg_word_count': avg_word_count,
            'failed_urls': [r['url'] for r in results if r.get('status') != 'success']
        }
|
    def _analyze_content(self, results: List[Dict]) -> Dict:
        """Extract top TF-IDF keywords and short content samples from successful pages."""
        successful = [r for r in results if r.get('status') == 'success' and r.get('content')]
        texts = [r['content'] for r in successful if len(r['content'].split()) > 10]
        if not texts:
            return {'top_keywords': [], 'content_samples': []}
        try:
            stop_words = list(self.models['spacy'].Defaults.stop_words)
            vectorizer = TfidfVectorizer(stop_words=stop_words, max_features=50, ngram_range=(1, 2))
            tfidf = vectorizer.fit_transform(texts)
            feature_names = vectorizer.get_feature_names_out()
            sorted_indices = np.argsort(np.asarray(tfidf.sum(axis=0)).ravel())[-10:]
            top_keywords = feature_names[sorted_indices][::-1].tolist()
        except Exception as e:
            logger.debug(f"Keyword extraction failed: {e}")
            top_keywords = []
        samples = [
            {'url': r['url'], 'sample': r['content'][:500] + '...' if len(r['content']) > 500 else r['content']}
            for r in successful[:3]
        ]
        return {'top_keywords': top_keywords, 'content_samples': samples}
|
    def _analyze_links(self, results: List[Dict]) -> Dict:
        all_links = []
        for result in results:
            if result.get('links'):
                all_links.extend(result['links'])
        if not all_links:
            return {'internal_links': {}, 'external_domains': {}, 'common_anchors': {}, 'file_types': {}}
        df = pd.DataFrame(all_links)
        return {
            'internal_links': df[df['type'] == 'internal']['url'].value_counts().head(20).to_dict(),
            'external_domains': df[df['type'] == 'external']['url'].apply(lambda x: urlparse(x).netloc).value_counts().head(10).to_dict(),
            'common_anchors': df['anchor'].value_counts().head(10).to_dict(),
            'file_types': df['file_type'].value_counts().to_dict()
        }
|
    def _generate_seo_recommendations(self, results: List[Dict]) -> List[str]:
        successful = [r for r in results if r.get('status') == 'success']
        if not successful:
            return ["No content could be analyzed successfully"]
        recs = []
        missing_titles = sum(1 for r in successful if not r.get('metadata', {}).get('title'))
        if missing_titles:
            recs.append(f"📌 Add titles to {missing_titles} pages")
        missing_descriptions = sum(1 for r in successful if not r.get('metadata', {}).get('description'))
        if missing_descriptions:
            recs.append(f"📌 Add meta descriptions to {missing_descriptions} pages")
        short_content = sum(1 for r in successful if r.get('word_count', 0) < 300)
        if short_content:
            recs.append(f"📝 Expand content on {short_content} pages (fewer than 300 words)")
        all_links = [link for r in results for link in r.get('links', [])]
        if all_links:
            df_links = pd.DataFrame(all_links)
            internal_links = df_links[df_links['type'] == 'internal']
            if len(internal_links) > 100:
                recs.append(f"🔗 Optimize internal link structure ({len(internal_links)} links)")
        return recs if recs else ["✅ No critical SEO issues detected"]
|
    def plot_internal_links(self, links_data: Dict) -> Any:
        """Plot a horizontal bar chart of the most-linked internal URLs."""
        internal_links = links_data.get('internal_links', {})
        fig, ax = plt.subplots()
        if not internal_links:
            ax.text(0.5, 0.5, 'No internal links', ha='center', va='center', transform=ax.transAxes)
            ax.axis('off')
        else:
            names = list(internal_links.keys())
            counts = list(internal_links.values())
            ax.barh(names, counts)
            ax.set_xlabel("Number of links")
            ax.set_title("Top 20 Internal Links")
        plt.tight_layout()
        return fig
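

# Minimal usage sketch (illustrative, not part of the original module): shows how the
# analyzer is expected to be driven. The sitemap URL below is a placeholder, and the
# run assumes the model downloads and the `utils.sanitize_filename` helper are available.
if __name__ == "__main__":
    analyzer = SEOSpaceAnalyzer(max_urls=10, max_workers=4)
    stats, recommendations, content_analysis, links, details, summaries, similarities = analyzer.analyze_sitemap(
        "https://example.com/sitemap.xml"  # placeholder sitemap URL
    )
    print(stats)
    for rec in recommendations:
        print(rec)
    # Save the internal-links chart produced by plot_internal_links().
    fig = analyzer.plot_internal_links(links)
    fig.savefig("internal_links.png")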