Copia y pega el código para usarlo en tu propio proyecto
SEO Migration Auditor - Interfaz Web
=====================================
Panel de configuración visual para ejecutar auditorías SEO entre entornos.
Ejecutar: python auditor_web.py
Abrir: http://localhost:5050
from flask import Flask, render_template_string, request, jsonify, send_file
from flask_socketio import SocketIO, emit
import threading
import os
import sys
import json
from datetime import datetime
# Importar el auditor original
from auditor_sitemaps import SEOAuditor, CONFIG, AuditResult
from tqdm import tqdm
import concurrent.futures
# Flask application object plus the Socket.IO layer used to push log and
# progress events to the browser.
app = Flask(__name__)
# NOTE(review): the secret key is a literal committed in the source.
app.config.update(SECRET_KEY='seo-auditor-secret-key')
# NOTE(review): "*" accepts Socket.IO connections from any origin.
socketio = SocketIO(app, cors_allowed_origins="*")
# Global state of the analysis, shared between the HTTP handlers and the
# background audit thread.
audit_state = dict(
    running=False,     # True while an audit thread is active
    progress=0,        # number of URLs processed so far
    total=0,           # number of URLs to process
    current_url="",
    status="idle",     # "idle" or "running"
    report_path=None,  # file name of the last generated report
    error=None,        # message of the last fatal error, if any
)
# ============================================================================
# HTML TEMPLATE - CONFIGURATION PANEL
# ============================================================================
# Page source rendered by the index route through render_template_string().
# NOTE(review): the string below holds only the visible text of the page (no
# tags, styles or scripts) — the markup appears to have been lost; confirm
# against the original template before relying on it.
HTML_TEMPLATE = '''
🔍 SEO Migration Auditor
SEO Migration Auditor
Compara entornos de Producción y Staging para detectar problemas SEO
v2.0 Professional
Análisis en Progreso
Analizando...
0 / 0 URLs
0%
Preparando análisis...
🚀 Iniciando auditoría...
¡Auditoría Completada!
Se analizaron X URLs correctamente.
'''
# ============================================================================
# RUTAS DE LA API
# ============================================================================
@app.route('/')
def index():
    """Render the main page that hosts the configuration panel."""
    page = render_template_string(HTML_TEMPLATE)
    return page
# Keys that the handler reads unconditionally from the posted configuration.
_REQUIRED_AUDIT_KEYS = (
    'prod_domain', 'stage_domain', 'prod_sitemap', 'stage_sitemap',
    'needs_auth', 'content_selector', 'threads', 'limit_urls',
    'smart_match_threshold', 'low_sim_trigger',
)

# Makes the "is an audit running?" check and the claim of the slot atomic.
_audit_start_lock = threading.Lock()


def _validate_audit_config(config):
    """Return an error message for an unusable payload, or None when it is valid."""
    if not isinstance(config, dict):
        return 'Configuración inválida: se esperaba un objeto JSON'
    required = list(_REQUIRED_AUDIT_KEYS)
    if config.get('needs_auth'):
        # Credentials are only read when authentication is requested.
        required += ['auth_user', 'auth_pass']
    missing = [key for key in required if key not in config]
    if missing:
        return f"Faltan campos de configuración: {', '.join(missing)}"
    for key in ('prod_domain', 'stage_domain'):
        # Both values get .rstrip('/') applied, so they must be strings.
        if not isinstance(config[key], str):
            return f"El campo {key} debe ser texto"
    return None


@app.route('/start-audit', methods=['POST'])
def start_audit():
    """Start a new audit using the configuration posted as JSON.

    The payload is validated, copied into the shared ``CONFIG`` dict and the
    audit is launched in a daemon thread.

    Returns:
        ``{'success': True}`` when the thread was started.
        ``{'success': False, 'error': ...}`` when an audit is already running
        (HTTP 200, as before) or when the payload is missing/invalid (HTTP 400).
    """
    print("\n📥 Solicitud de auditoría recibida")

    # Claim the single audit slot atomically; the worker thread only sets the
    # flag once it is running, which previously left a window where two
    # requests could both pass the check.
    with _audit_start_lock:
        if audit_state['running']:
            print("⚠️ Ya hay una auditoría en progreso")
            return jsonify({'success': False, 'error': 'Ya hay una auditoría en progreso'})
        audit_state['running'] = True

    try:
        # silent=True yields None instead of raising on a missing/invalid body.
        config = request.get_json(silent=True)
        error = _validate_audit_config(config)
        if error:
            print(f"⚠️ {error}")
            audit_state['running'] = False
            return jsonify({'success': False, 'error': error}), 400

        print(f"📋 Configuración recibida: {config.get('prod_domain', 'N/A')}")

        # Update the global CONFIG with the user's values.
        needs_auth = config['needs_auth']
        CONFIG['DOMAINS']['prod'] = config['prod_domain'].rstrip('/')
        CONFIG['DOMAINS']['stage'] = config['stage_domain'].rstrip('/')
        CONFIG['SITEMAP_PROD'] = config['prod_sitemap']
        CONFIG['SITEMAP_STAGE'] = config['stage_sitemap']
        CONFIG['STAGING_USER'] = config['auth_user'] if needs_auth else ''
        CONFIG['STAGING_PASS'] = config['auth_pass'] if needs_auth else ''
        CONFIG['CONTENT_SELECTOR'] = config['content_selector'] or 'main'
        CONFIG['THREADS'] = config['threads']
        CONFIG['LIMIT_URLS'] = config['limit_urls']
        CONFIG['SMART_MATCH_THRESHOLD'] = config['smart_match_threshold']
        CONFIG['LOW_SIMILARITY_TRIGGER'] = config['low_sim_trigger']

        # Crawling options.
        CONFIG['ENABLE_CRAWLING'] = config.get('enable_crawling', False)
        CONFIG['CRAWL_MAX_URLS'] = config.get('crawl_max_urls', 500)
        CONFIG['CRAWL_MAX_DEPTH'] = config.get('crawl_max_depth', 3)

        # Parse the comma-separated exclude patterns.
        # NOTE(review): an empty value leaves CRAWL_EXCLUDE_PATTERNS untouched,
        # so patterns from a previous request stay in effect — confirm intended.
        exclude_str = config.get('crawl_exclude', '')
        if exclude_str:
            CONFIG['CRAWL_EXCLUDE_PATTERNS'] = [
                p.strip() for p in exclude_str.split(',') if p.strip()
            ]

        print("✅ CONFIG actualizado:")
        print(f" - Prod: {CONFIG['DOMAINS']['prod']}")
        print(f" - Stage: {CONFIG['DOMAINS']['stage']}")
        print(f" - Crawling: {CONFIG['ENABLE_CRAWLING']}")

        # Run the audit in a separate thread; daemon so it dies with the app.
        print("🚀 Iniciando hilo de auditoría...")
        thread = threading.Thread(target=run_audit_thread)
        thread.daemon = True
        thread.start()
    except Exception:
        # Nothing was started: release the slot so later requests can retry.
        audit_state['running'] = False
        raise

    print("✅ Hilo iniciado correctamente")
    return jsonify({'success': True})
@app.route('/report/<filename>')
def serve_report(filename):
    """Serve a generated HTML report from the current working directory.

    Args:
        filename: Name of the report file, taken from the URL.

    Returns:
        The file contents, or a 404 response when the name does not resolve to
        a regular file located inside the working directory.
    """
    base_dir = os.path.realpath(os.getcwd())
    report_path = os.path.realpath(os.path.join(base_dir, filename))
    # The name comes from the URL: refuse anything that resolves outside the
    # working directory (e.g. "..", or backslash traversal on Windows).
    inside_base = report_path.startswith(base_dir + os.sep)
    if inside_base and os.path.isfile(report_path):
        return send_file(report_path)
    return "Reporte no encontrado", 404
# ============================================================================
# LÓGICA DE AUDITORÍA
# ============================================================================
def _emit_log(level, message):
    """Broadcast one log line of the given type over Socket.IO."""
    socketio.emit('log', {'type': level, 'message': message})


def _precache_staging(auditor):
    """Analyze every staging page once and cache the successful results.

    Results with status 200 and non-empty text are stored in
    ``auditor.staging_content_cache`` keyed by path (plus "?query" if any).

    Returns:
        The number of pages stored in the cache.
    """
    from urllib.parse import urlparse  # imported once instead of per iteration

    staging_urls = [f"{CONFIG['DOMAINS']['stage']}{p}" for p in auditor.staging_paths]
    total = len(staging_urls)
    cached = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=CONFIG['THREADS']) as executor:
        future_map = {executor.submit(auditor.analyze_page, url, True): url for url in staging_urls}
        for i, future in enumerate(concurrent.futures.as_completed(future_map)):
            try:
                result = future.result()
                if result.status == 200 and result.raw_text:
                    parsed = urlparse(result.url)
                    path = parsed.path
                    if parsed.query:
                        path += f"?{parsed.query}"
                    auditor.staging_content_cache[path] = result
                    cached += 1
            except Exception as exc:
                # Caching is best effort: a failing page must not stop the
                # audit, but leave a trace on the server console.
                print(f"⚠️ No se pudo cachear {future_map[future]}: {exc}")
            # Throttle cache progress events, but always send the final one.
            if i % 5 == 0 or i + 1 == total:
                socketio.emit('progress', {
                    'current': i + 1,
                    'total': total,
                    'percent': int((i + 1) / total * 100),
                    'url': f'Cacheando staging... ({cached} páginas)'
                })
    return cached


def _compare_all(auditor, urls):
    """Compare every URL across environments, emitting progress as each finishes.

    Returns:
        The list of results from the comparisons that did not raise.
    """
    total = len(urls)
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=CONFIG['THREADS']) as executor:
        future_map = {executor.submit(auditor.compare_environments, url): url for url in urls}
        for done, future in enumerate(concurrent.futures.as_completed(future_map), start=1):
            # as_completed yields in completion order, so the URL has to be
            # looked up from the future rather than indexed by position.
            url = future_map[future]
            try:
                result = future.result()
                results.append(result)
                # Log smart matches.
                if result.is_smart_match:
                    _emit_log('warning', f'🔀 Smart Match: {result.rel_uri} → {result.smart_match_uri}')
            except Exception as e:
                _emit_log('error', f'❌ Error: {str(e)[:50]}')
            socketio.emit('progress', {
                'current': done,
                'total': total,
                'percent': int(done / total * 100),
                'url': url
            })
            audit_state['progress'] = done
            audit_state['current_url'] = url
    return results


def run_audit_thread():
    """Run a complete audit; intended as the target of a background thread.

    Steps: discover URLs, pre-cache staging pages, compare environments and
    generate the HTML report. Log lines, progress and the final outcome are
    emitted over Socket.IO ('log', 'progress' and 'complete' events) and
    mirrored in ``audit_state``. Any exception is reported to the clients
    instead of propagating; ``audit_state`` always returns to idle.
    """
    print("\n🔄 run_audit_thread() iniciado")
    # Reset the counters too, so values from a previous run are not shown.
    audit_state.update(
        running=True, status='running', error=None,
        progress=0, total=0, current_url='',
    )
    try:
        print("📡 Emitiendo mensaje de inicio...")
        _emit_log('info', '📡 Conectando con los sitemaps...')

        auditor = SEOAuditor()

        _emit_log('info', '🔄 Descubriendo URLs desde sitemaps...')
        if CONFIG.get('ENABLE_CRAWLING', False):
            _emit_log('info', f'🕷️ Crawling activado (max: {CONFIG["CRAWL_MAX_URLS"]} URLs, profundidad: {CONFIG["CRAWL_MAX_DEPTH"]})')
        urls = auditor.get_master_list()
        if not urls:
            raise RuntimeError("No se encontraron URLs en los sitemaps")

        # Report where the URLs came from, when the auditor tracks it.
        if hasattr(auditor, 'url_sources'):
            sources = auditor.url_sources
            _emit_log('success', f'📋 Sitemap Prod: {sources.get("sitemap_prod", 0)} URLs')
            _emit_log('success', f'📋 Sitemap Stage: {sources.get("sitemap_stage", 0)} URLs')
            if sources.get('crawled_prod', 0) > 0:
                _emit_log('warning', f'🕷️ Crawling Prod: +{sources["crawled_prod"]} URLs adicionales')
            if sources.get('crawled_stage', 0) > 0:
                _emit_log('warning', f'🕷️ Crawling Stage: +{sources["crawled_stage"]} URLs adicionales')
        _emit_log('success', f'✅ Total: {len(urls)} URLs a analizar')

        # Pre-cache staging content.
        _emit_log('info', '🧠 Pre-cacheando contenido de Staging...')
        cached = _precache_staging(auditor)
        _emit_log('success', f'✅ {cached} páginas de staging cacheadas')

        # Compare environments.
        _emit_log('info', '📊 Iniciando comparación de entornos...')
        audit_state['total'] = len(urls)
        results = _compare_all(auditor, urls)

        # Generate the report.
        _emit_log('info', '📝 Generando reporte HTML...')
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        report_filename = f'Reporte_SEO_{timestamp}.html'
        auditor.generate_report(results, report_filename)
        audit_state['report_path'] = report_filename

        # Final statistics.
        smart_matches = sum(1 for r in results if r.is_smart_match)
        errors = sum(1 for r in results if r.stage.status >= 400)
        _emit_log('success', f'✨ Reporte generado: {report_filename}')
        socketio.emit('complete', {
            'success': True,
            'total_urls': len(results),
            'smart_matches': smart_matches,
            'errors': errors,
            'report_file': report_filename
        })
    except Exception as e:
        # Top-level boundary of the worker thread: report, never propagate.
        import traceback
        error_msg = str(e)
        traceback_str = traceback.format_exc()
        print("\n❌ ERROR EN AUDITORÍA:")
        print(traceback_str)
        _emit_log('error', f'❌ Error fatal: {error_msg}')
        socketio.emit('complete', {
            'success': False,
            'error': error_msg
        })
        audit_state['error'] = error_msg
    finally:
        audit_state['running'] = False
        audit_state['status'] = 'idle'
# ============================================================================
# MAIN
# ============================================================================
if __name__ == '__main__':
    # Port 5050 instead of 5000, because AirPlay uses port 5000 on macOS.
    PORT = 5050
    rule = "=" * 60
    banner = [
        "\n" + rule,
        "🔍 SEO MIGRATION AUDITOR - Interfaz Web",
        rule,
        f"\n📌 Abre tu navegador en: http://localhost:{PORT}\n",
        " Presiona Ctrl+C para detener el servidor\n",
        rule + "\n",
    ]
    for banner_line in banner:
        print(banner_line)
    # Start the Socket.IO server, listening on all interfaces.
    socketio.run(
        app,
        host='0.0.0.0',
        port=PORT,
        debug=False,
        allow_unsafe_werkzeug=True,
    )