GMB-Scraper/lib/health_checker.py

259 lines
8.3 KiB
Python
Raw Normal View History

"""
Website Health Checker Module
==============================
Check website health: SSL, speed, mobile-friendly, forms.
"""
import ssl
import re
import socket
import time
import requests
from urllib.parse import urlparse
from bs4 import BeautifulSoup
from .logger import get_logger
from .retry import retry_with_backoff
@retry_with_backoff(max_attempts=2, base_delay=1.0)
def check_website_health(website_url, timeout=10):
    """
    Run a full health audit against a single website.

    The audit proceeds in order: the SSL certificate is inspected via
    ``check_ssl``, the page is fetched with an iPhone User-Agent while
    being timed, and the returned HTML is scanned for a title, a meta
    description, a viewport meta tag, forms, a phone number and an
    email address.

    Args:
        website_url: Website URL to check
        timeout: Request timeout in seconds

    Returns:
        Dictionary with health check results, or None when
        ``website_url`` is empty. Every problem detected is listed
        under ``'issues'``; request failures and unexpected errors are
        recorded there as well instead of being raised.
    """
    logger = get_logger()
    if not website_url:
        return None

    result = {
        'url': website_url,
        'reachable': False,
        'ssl_valid': False,
        'ssl_expiry': None,
        'load_time': 0,
        'status_code': 0,
        'mobile_friendly': False,
        'has_contact_form': False,
        'has_phone_number': False,
        'has_email': False,
        'title': '',
        'meta_description': '',
        'issues': [],
    }
    issues = result['issues']

    def form_signature(form):
        # Collapse the attributes that usually name a form into one
        # lowercase string so keywords can be matched against it.
        css_classes = form.get('class', [])
        # BeautifulSoup returns class as a list
        if isinstance(css_classes, list):
            css_classes = ' '.join(css_classes)
        action = form.get('action', '') or ''
        form_id = form.get('id', '') or ''
        return f"{action} {form_id} {css_classes}".lower()

    try:
        # A URL without a host cannot be checked at all.
        hostname = urlparse(website_url).hostname
        if not hostname:
            issues.append("Invalid URL")
            return result

        # SSL certificate (always probed before the page is fetched)
        cert_info = check_ssl(hostname)
        result['ssl_valid'] = cert_info['valid']
        result['ssl_expiry'] = cert_info['expiry']
        if not cert_info['valid']:
            issues.append(f"SSL issue: {cert_info['error']}")

        # Fetch the page and time the round trip
        request_headers = {
            'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) AppleWebKit/605.1.15'
        }
        started_at = time.time()
        response = requests.get(
            website_url,
            timeout=timeout,
            headers=request_headers,
            allow_redirects=True,
        )
        load_time = time.time() - started_at

        status = response.status_code
        result['reachable'] = status == 200
        result['status_code'] = status
        result['load_time'] = round(load_time, 2)
        if status != 200:
            issues.append(f"HTTP {status}")

        soup = BeautifulSoup(response.text, 'lxml')

        # Page title
        title_tag = soup.find('title')
        if title_tag:
            result['title'] = title_tag.get_text().strip()
        else:
            result['title'] = ''
        if not result['title']:
            issues.append("Missing page title")

        # Meta description
        description_tag = soup.find('meta', attrs={'name': 'description'})
        if description_tag:
            result['meta_description'] = description_tag.get('content', '')
        else:
            result['meta_description'] = ''
        if not result['meta_description']:
            issues.append("Missing meta description")

        # Mobile-friendliness is judged solely by a viewport meta tag
        # whose content mentions "width".
        viewport_tag = soup.find('meta', attrs={'name': 'viewport'})
        if viewport_tag is None:
            result['mobile_friendly'] = False
        else:
            result['mobile_friendly'] = 'width' in viewport_tag.get('content', '')
        if not result['mobile_friendly']:
            issues.append("Not mobile-friendly (no viewport meta)")

        # Contact form: a keyword match on any form counts; failing
        # that, any form is better than none.
        forms = soup.find_all('form')
        contact_keywords = ['contact', 'inquiry', 'quote', 'book', 'appointment', 'message']
        keyword_match = any(
            any(kw in signature for kw in contact_keywords)
            for signature in (form_signature(form) for form in forms)
        )
        result['has_contact_form'] = True if keyword_match else len(forms) > 0
        if not result['has_contact_form']:
            issues.append("No contact form found")

        # Phone number and email are searched in the visible page text
        page_text = soup.get_text()
        phone_match = re.search(r'(\+61|0[2-8])\s*\d[\d\s-]{7,9}', page_text)
        result['has_phone_number'] = phone_match is not None
        if not result['has_phone_number']:
            issues.append("No phone number found")

        email_match = re.search(
            r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', page_text
        )
        result['has_email'] = email_match is not None

        # Anything slower than three seconds is flagged
        if load_time > 3.0:
            issues.append(f"Slow load time: {load_time:.1f}s")

        summary = ", ".join([
            f"reachable={result['reachable']}",
            f"ssl={result['ssl_valid']}",
            f"load={load_time:.1f}s",
            f"mobile={result['mobile_friendly']}",
            f"form={result['has_contact_form']}",
            f"issues={len(issues)}",
        ])
        logger.info(f"Website health for {hostname}: {summary}")
        return result

    except requests.Timeout:
        issues.append("Request timeout")
        logger.warning(f"Website timeout: {website_url}")
        return result
    except requests.ConnectionError:
        issues.append("Connection failed")
        logger.warning(f"Website unreachable: {website_url}")
        return result
    except Exception as e:
        # Best-effort check: report the failure in the result rather
        # than propagating it to the caller.
        issues.append(f"Error: {str(e)[:100]}")
        logger.error(f"Website check error for {website_url}: {e}")
        return result
def check_ssl(hostname, port=443):
    """
    Check SSL certificate validity and expiry.

    Opens a TLS connection to ``hostname:port`` with the default
    (verifying) SSL context, reads the peer certificate's ``notAfter``
    field and reports how close the certificate is to expiring.

    Args:
        hostname: Domain name
        port: SSL port

    Returns:
        Dictionary with SSL info:
            valid: True only when the handshake succeeded and the
                certificate has more than 7 full days left.
            expiry: Expiry time as a naive ISO-8601 string in UTC,
                or None when it could not be determined.
            error: Human-readable problem description, or None.
    """
    from datetime import datetime, timezone

    result = {
        'valid': False,
        'expiry': None,
        'error': None,
    }
    try:
        context = ssl.create_default_context()
        with socket.create_connection((hostname, port), timeout=5) as sock:
            with context.wrap_socket(sock, server_hostname=hostname) as ssock:
                cert = ssock.getpeercert()

        expiry_str = cert.get('notAfter')
        if not expiry_str:
            # Without an expiry date nothing can be asserted; say so
            # explicitly instead of leaving the error as None.
            result['error'] = "SSL certificate has no expiry date"
            return result

        # notAfter is expressed in GMT. cert_time_to_seconds parses it
        # independently of the process locale and returns epoch
        # seconds, so the comparison below is not skewed by the local
        # timezone of the machine running the check.
        expiry_ts = ssl.cert_time_to_seconds(expiry_str)
        expiry = datetime.fromtimestamp(expiry_ts, tz=timezone.utc)
        # Keep the historical naive "YYYY-MM-DDTHH:MM:SS" format.
        result['expiry'] = expiry.replace(tzinfo=None).isoformat()

        # Check if expired or expiring soon
        seconds_left = expiry_ts - time.time()
        days_until_expiry = int(seconds_left // 86400)
        result['valid'] = days_until_expiry > 7
        if seconds_left <= 0:
            result['error'] = "SSL certificate expired"
        elif days_until_expiry <= 7:
            # Less than a day left is "0 days", not "expired".
            result['error'] = f"SSL expires in {days_until_expiry} days"
        return result
    except ssl.SSLError as e:
        result['error'] = f"SSL error: {str(e)[:100]}"
        return result
    except socket.timeout:
        result['error'] = "Connection timeout"
        return result
    except Exception as e:
        # Connection refused, DNS failure, malformed certificate date, ...
        result['error'] = str(e)[:100]
        return result
def batch_health_check(websites, max_workers=5):
    """
    Run ``check_website_health`` over many websites concurrently.

    Falsy entries in ``websites`` are skipped. Results are collected
    as the individual checks finish, so their order follows completion
    rather than the order of ``websites``.

    Args:
        websites: List of website URLs
        max_workers: Number of parallel workers

    Returns:
        List of health check results. A check that raises is
        represented by a minimal dictionary holding the URL,
        ``reachable=False`` and the truncated error text.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    collected = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Remember which URL each future belongs to for error reporting.
        pending = {}
        for site in websites:
            if not site:
                continue
            pending[pool.submit(check_website_health, site)] = site

        for finished in as_completed(pending):
            site = pending[finished]
            try:
                outcome = finished.result()
            except Exception as e:
                outcome = {
                    'url': site,
                    'reachable': False,
                    'issues': [f"Error: {str(e)[:100]}"],
                }
            collected.append(outcome)
    return collected