#!/usr/bin/env python3
"""
Google My Business (Maps) Scraper v4 — Production Grade
========================================================
Pain-aware lead generation engine for Darwisyah Digital Media.
Extracts business data, detects pain signals, checks website health,
and generates personalized apex pitches.

Usage:
    # Basic scrape (backward compatible)
    python3 gmb_scraper.py -q "lawyers Perth CBD" --min-rating 4.0

    # Pain-aware scrape (recommended)
    python3 gmb_scraper.py -q "dentists Joondalup" --detect-pain --check-websites

    # Filter by pain level
    python3 gmb_scraper.py -q "accountants Perth" --detect-pain --min-pain 20

    # Generate pitch report
    python3 gmb_scraper.py -q "lawyers Perth" --detect-pain --pitch-report --channel sms

    # Full analysis (reviews + websites + pitches)
    python3 gmb_scraper.py -q "dentists Perth" --detect-pain --scrape-reviews --check-websites --pitch-report

Output: CSV with pain scores, signals, and optional pitch drafts.
"""

import argparse
import csv
import json
import os
import sys
import re
import time
import random
import urllib.parse
from pathlib import Path
from datetime import datetime

from dotenv import load_dotenv
from playwright.sync_api import sync_playwright, TimeoutError as PwTimeout

# Load .env file
load_dotenv(Path(__file__).parent / '.env')

# Add lib to path (must happen before the lib.* imports below)
sys.path.insert(0, str(Path(__file__).parent))

from lib.logger import setup_logger, get_logger, ScraperStats
from lib.retry import retry_with_backoff
from lib.stealth import apply_stealth, create_stealth_context, human_delay, human_scroll_delay
from lib.validator import validate_lead, deduplicate_leads
from lib.pain_detector import detect_pain_signals, calculate_pain_score, format_pain_summary
from lib.review_scraper import scrape_reviews
from lib.health_checker import check_website_health
from lib.pitch_generator import generate_apex_pitch


# Directory used for the default output file and the "<query>_latest.csv" copy.
DEFAULT_CACHE_DIR = Path("/root/.hermes/cache/gmb")

# Multiplier applied to waits when --slow is given.
SLOW_DELAY_FACTOR = 2.0

# Upper bound on scroll iterations so a feed that keeps growing cannot loop forever.
MAX_SCROLL_ROUNDS = 300

# Columns always written to the CSV; optional groups are appended in main().
BASE_FIELDS = ["name", "address", "phone", "website", "rating",
               "review_count", "category", "hours", "maps_url"]


def parse_args():
    """Build the command-line interface and return the parsed arguments."""
    p = argparse.ArgumentParser(
        description="GMB Scraper v4 β€” Pain-Aware Lead Generation",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s -q "lawyers Perth CBD" --detect-pain --check-websites
  %(prog)s -q "dentists Joondalup" --detect-pain --scrape-reviews --pitch-report
  %(prog)s -q "accountants Perth" --detect-pain --min-pain 25 --channel email
        """
    )

    # Basic options
    p.add_argument("--query", "-q", required=True, help="Search query")
    p.add_argument("--min-rating", type=float, default=0.0, help="Min star rating (default: 0)")
    p.add_argument("--min-reviews", type=int, default=0, help="Min review count (default: 0)")
    p.add_argument("--max-results", type=int, default=100, help="Max results (default: 100)")
    p.add_argument("--output", "-o", default=None, help="Output CSV path")
    p.add_argument("--json", action="store_true", help="Also output JSON")

    # Pain detection
    p.add_argument("--detect-pain", action="store_true", help="Enable pain signal detection")
    p.add_argument("--min-pain", type=int, default=0, help="Min pain score to include (default: 0)")
    p.add_argument("--scrape-reviews", action="store_true", help="Scrape reviews for pain keywords")
    p.add_argument("--max-reviews", type=int, default=30, help="Max reviews per business (default: 30)")
    p.add_argument("--check-websites", action="store_true", help="Check website health (SSL, speed, forms)")

    # Pitch generation
    p.add_argument("--pitch-report", action="store_true", help="Generate pitch report")
    p.add_argument("--channel", default="sms", choices=["sms", "email", "call", "gumtree"],
                   help="Pitch channel (default: sms)")

    # Performance
    p.add_argument("--headful", action="store_true", help="Show browser")
    p.add_argument("--slow", action="store_true", help="Longer delays (safer)")
    p.add_argument("--no-stealth", action="store_true", help="Disable stealth mode (faster)")
    p.add_argument("--proxy", default=None, help="Proxy URL (e.g., http://user:pass@host:port)")

    return p.parse_args()


def _collect_place_links(page, collected):
    """Add place links not yet in *collected* (href -> aria-label); return how many were added."""
    added = 0
    for anchor in page.locator('a[href*="/maps/place/"]').all():
        try:
            href = anchor.get_attribute("href") or ""
            label = anchor.get_attribute("aria-label") or ""
        except Exception:
            # Best-effort: skip an anchor whose attributes cannot be read.
            continue
        if href and href not in collected:
            collected[href] = label
            added += 1
    return added


@retry_with_backoff(max_attempts=3, base_delay=2.0, retry_on=(PwTimeout, Exception))
def scroll_feed(page, max_results, slow=False):
    """Scroll results feed and collect all place URLs.

    Args:
        page: Playwright page already showing a Maps search result list.
        max_results: Stop once this many distinct place links were seen.
        slow: When true, every wait is multiplied by SLOW_DELAY_FACTOR.

    Returns:
        Dict mapping place href -> aria-label, in discovery order, holding
        at most ``max_results`` entries.
    """
    logger = get_logger()
    delay_factor = SLOW_DELAY_FACTOR if slow else 1.0
    collected = {}

    for i in range(MAX_SCROLL_ROUNDS):
        new = _collect_place_links(page, collected)

        if len(collected) >= max_results:
            break

        # Scroll the feed; fall back to the End key if the feed element is unavailable
        try:
            page.locator('[role="feed"]').first.evaluate("el => el.scrollBy(0, 1000)")
        except Exception:
            page.keyboard.press("End")

        time.sleep(human_scroll_delay() * delay_factor)

        # If no new items, wait longer then look once more before giving up
        if new == 0:
            time.sleep(2 * delay_factor)
            if _collect_place_links(page, collected) == 0:
                logger.info(f"Scroll complete: {len(collected)} businesses loaded")
                break

        if (i + 1) % 10 == 0:
            logger.info(f"Scroll {i+1}: {len(collected)} businesses loaded...")

    return dict(list(collected.items())[:max_results])


@retry_with_backoff(max_attempts=2, base_delay=1.5)
def extract_details(page):
    """Extract all details from an open business page.

    Every field is read best-effort: a failed lookup leaves the field at its
    default ("" / 0.0 / 0) instead of raising.

    Returns:
        Dict with keys name, address, phone, website, rating, review_count,
        category, hours and maps_url.
    """
    data = {
        "name": "",
        "address": "",
        "phone": "",
        "website": "",
        "rating": 0.0,
        "review_count": 0,
        "category": "",
        "hours": "",
        "maps_url": page.url,
    }

    # Get body text as a list of non-empty, stripped lines
    try:
        body = page.locator("body").inner_text(timeout=5000)
        lines = [l.strip() for l in body.split("\n") if l.strip()]
    except Exception:
        lines = []

    # Parse rating and reviews from body text: the first line shaped like "4.6"
    # is the rating, the next line "(123)" the review count, and the line after
    # that is taken as the category unless it is a tab label.
    for i, line in enumerate(lines):
        if re.match(r"^\d\.\d$", line):
            data["rating"] = float(line)
            if i + 1 < len(lines):
                rm = re.match(r"^\((\d[\d,]*)\)$", lines[i + 1])
                if rm:
                    data["review_count"] = int(rm.group(1).replace(",", ""))
            if i + 2 < len(lines):
                candidate = lines[i + 2]
                if len(candidate) < 60 and candidate not in ("Overview", "Reviews", "About"):
                    data["category"] = candidate
            break

    # Name from h1; fall back to the text line directly above the rating
    try:
        data["name"] = page.locator("h1").first.inner_text(timeout=3000).strip()
    except Exception:
        for i, line in enumerate(lines):
            if re.match(r"^\d\.\d$", line) and i > 0:
                data["name"] = lines[i - 1]
                break

    # Address
    try:
        addr_btn = page.locator('button[data-item-id="address"]').first
        if addr_btn.count() > 0:
            aria = addr_btn.get_attribute("aria-label") or ""
            data["address"] = aria.replace("Address: ", "").strip()
    except Exception:
        pass

    # Phone
    try:
        for btn in page.locator('button[data-item-id^="phone"]').all():
            aria = btn.get_attribute("aria-label") or ""
            if aria.startswith("Phone:"):
                data["phone"] = aria.replace("Phone: ", "").strip()
                break
    except Exception:
        pass

    # Website: prefer the labelled link, otherwise the first external http(s) link
    try:
        website_links = page.locator('a[aria-label^="Website:"]').all()
        if website_links:
            aria = website_links[0].get_attribute("aria-label") or ""
            data["website"] = aria.replace("Website: ", "").strip()
        else:
            for link in page.locator("a").all():
                try:
                    href = link.get_attribute("href") or ""
                    if (href.startswith("http") and
                            "google.com" not in href and
                            "gstatic.com" not in href and
                            "ggpht.com" not in href and
                            "tel:" not in href and
                            len(href) > 10):
                        data["website"] = href
                        break
                except Exception:
                    continue
    except Exception:
        pass

    # Hours
    try:
        hours_btn = page.locator('button[data-item-id="oh"]').first
        if hours_btn.count() > 0:
            aria = hours_btn.get_attribute("aria-label") or ""
            data["hours"] = aria.strip()
    except Exception:
        pass

    return data


def _proxy_settings(proxy_url):
    """Convert a proxy URL into the ``proxy`` dict Playwright expects.

    Credentials embedded in the URL are moved to the separate
    ``username`` / ``password`` keys; a bare ``host:port`` is passed through.
    """
    if "://" not in proxy_url:
        return {"server": proxy_url}
    parts = urllib.parse.urlsplit(proxy_url)
    host = parts.netloc.rpartition("@")[2]
    settings = {"server": f"{parts.scheme}://{host}"}
    if parts.username:
        settings["username"] = urllib.parse.unquote(parts.username)
    if parts.password:
        settings["password"] = urllib.parse.unquote(parts.password)
    return settings


def _pain_score(lead):
    """Return the lead's pain score, or 0 when no pain data is attached."""
    return (lead.get('pain_data') or {}).get('pain_score', 0)


def _lead_to_row(lead, args):
    """Flatten one lead (plus optional pain/health/pitch data) into a CSV row dict."""
    row = lead.copy()

    pain = lead.get('pain_data')
    if args.detect_pain and pain:
        row['pain_score'] = pain['pain_score']
        row['pain_signals'] = '; '.join(pain['signals'])
        row['primary_service'] = pain.get('primary_service', '')
        row['confidence'] = pain.get('confidence', '')

    health = lead.get('health_check')
    if args.check_websites and health:
        row['website_reachable'] = health.get('reachable', False)
        row['website_ssl'] = health.get('ssl_valid', False)
        row['website_load_time'] = health.get('load_time', 0)
        row['website_mobile'] = health.get('mobile_friendly', False)
        row['website_form'] = health.get('has_contact_form', False)

    if args.pitch_report and lead.get('pitch'):
        row['pitch'] = lead['pitch']['pitch']

    return row


def _write_csv(path, fields, results, args):
    """Write *results* to *path* as CSV with the given column order."""
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fields, extrasaction='ignore')
        writer.writeheader()
        for lead in results:
            writer.writerow(_lead_to_row(lead, args))


def _scrape_business(page, href, args, stats, logger):
    """Visit one place URL and return its lead dict, or None when it is filtered out.

    Prints the per-business status suffix and updates *stats*. Exceptions from
    navigation or extraction propagate to the caller.
    """
    delay_factor = SLOW_DELAY_FACTOR if args.slow else 1.0

    page.goto(href, wait_until="domcontentloaded", timeout=15000)
    time.sleep(human_delay(1.0, 2.0) * delay_factor)

    try:
        page.wait_for_selector("h1", timeout=5000)
    except PwTimeout:
        # extract_details falls back to body text when the h1 is missing
        pass

    # Extract basic details
    data = extract_details(page)

    # Apply basic filters (a rating of 0 means "no rating found" and is not filtered)
    if 0 < data["rating"] < args.min_rating:
        print(f" ⏭️ {data['rating']}β˜… < {args.min_rating}")
        stats.increment('businesses_filtered')
        return None

    if data["review_count"] < args.min_reviews:
        print(f" ⏭️ {data['review_count']} reviews < {args.min_reviews}")
        stats.increment('businesses_filtered')
        return None

    # Validate lead
    data, is_valid, _issues = validate_lead(data)
    if not is_valid:
        print(" ⏭️ Invalid lead")
        stats.increment('businesses_filtered')
        return None

    stats.increment('businesses_scraped')

    # === PAIN DETECTION ===
    pain_data = None
    reviews = []

    if args.detect_pain:
        print(" πŸ”", end="")

        # Scrape reviews if requested
        if args.scrape_reviews:
            print(" πŸ“", end="")
            try:
                reviews = scrape_reviews(page, max_reviews=args.max_reviews)
                stats.increment('reviews_scraped', len(reviews))
            except Exception as e:
                logger.warning(f"Review scrape failed for {data['name']}: {e}")

        # Detect pain signals (website health is added later, in post-processing)
        pain_data = detect_pain_signals(data, reviews=reviews, health_check=None)
        stats.increment('pain_signals_detected', pain_data['signal_count'])

        # Filter by pain score
        if pain_data['pain_score'] < args.min_pain:
            print(f" ⏭️ pain={pain_data['pain_score']} < {args.min_pain}")
            stats.increment('businesses_filtered')
            return None

    # Store data
    data['reviews'] = reviews if args.scrape_reviews else []
    data['pain_data'] = pain_data

    # Print status
    if pain_data:
        print(f" βœ… {data['rating']}β˜… ({data['review_count']}r) pain={pain_data['pain_score']}", end="")
    else:
        web_flag = "🌐" if data["website"] else ""
        print(f" βœ… {data['rating']}β˜… ({data['review_count']}r) {web_flag}", end="")

    return data


def main():
    """Run the scraper end to end: scrape, enrich, write CSV/JSON, print a summary."""
    args = parse_args()

    # Setup logging
    logger = setup_logger('gmb_scraper')
    stats = ScraperStats(logger)

    # Setup output path
    safe = re.sub(r"[^\w]", "_", args.query)[:40]
    if not args.output:
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        args.output = str(DEFAULT_CACHE_DIR / f"{safe}_{ts}.csv")
    Path(args.output).parent.mkdir(parents=True, exist_ok=True)

    # Banner
    print("=" * 80)
    print(" πŸ—ΊοΈ GMB Scraper v4 β€” Pain-Aware Lead Generation")
    print("=" * 80)
    print(f" Query: {args.query}")
    print(f" Max results: {args.max_results}")
    print(f" Min rating: {args.min_rating}β˜…")
    print(f" Min reviews: {args.min_reviews}")
    print(f" Pain detection: {'βœ…' if args.detect_pain else '❌'}")
    print(f" Review scraping: {'βœ…' if args.scrape_reviews else '❌'}")
    print(f" Website checks: {'βœ…' if args.check_websites else '❌'}")
    print(f" Pitch report: {'βœ…' if args.pitch_report else '❌'}")
    print(f" Output: {args.output}")
    print("=" * 80)

    results = []
    encoded = urllib.parse.quote_plus(args.query)
    url = f"https://www.google.com/maps/search/{encoded}"

    with sync_playwright() as pw:
        launch_kwargs = {
            "headless": not args.headful,
            "args": ["--disable-blink-features=AutomationControlled",
                     "--no-sandbox", "--disable-dev-shm-usage"],
        }
        # In stealth mode the proxy is handed to create_stealth_context; without
        # stealth it would otherwise be dropped, so apply it at launch instead.
        if args.no_stealth and args.proxy:
            launch_kwargs["proxy"] = _proxy_settings(args.proxy)

        browser = pw.chromium.launch(**launch_kwargs)
        try:
            # Create context
            if args.no_stealth:
                context = browser.new_context(
                    viewport={"width": 1920, "height": 1080},
                    user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
                    locale="en-AU",
                    timezone_id="Australia/Perth",
                )
            else:
                context = create_stealth_context(browser, headless=not args.headful, proxy=args.proxy)

            page = context.new_page()

            # Apply stealth to page
            if not args.no_stealth:
                apply_stealth(context, page)

            # Block images/fonts for speed
            page.route("**/*.{png,jpg,jpeg,gif,svg,webp,ico,woff,woff2}", lambda r: r.abort())

            # Navigate
            logger.info(f"Searching: \"{args.query}\"")
            page.goto(url, wait_until="domcontentloaded", timeout=30000)
            time.sleep(human_delay(3, 5))

            # Wait for feed
            try:
                page.wait_for_selector('[role="feed"]', timeout=15000)
            except PwTimeout:
                logger.warning("Feed slow, waiting extra...")
                time.sleep(5)

            # Scroll and collect
            logger.info(f"Scrolling for results (max {args.max_results})...")
            hrefs = scroll_feed(page, args.max_results, args.slow)
            logger.info(f"Found {len(hrefs)} businesses")
            stats.increment('businesses_found', len(hrefs))

            if not hrefs:
                logger.error("No results found. Try a broader query.")
                return

            # Visit each business page
            total = len(hrefs)
            for i, (href, aria_name) in enumerate(hrefs.items()):
                progress = f"[{i+1}/{total}]"
                print(f"\n{progress} {aria_name[:45]:<45}", end="")

                try:
                    lead = _scrape_business(page, href, args, stats, logger)
                except Exception as e:
                    # One failing business must not abort the whole run
                    logger.error(f"Error scraping {aria_name}: {e}")
                    stats.increment('errors')
                    lead = None

                if lead is not None:
                    results.append(lead)

                # Anti-detection pause every 10 visits, whether or not the
                # business was kept
                if (i + 1) % 10 == 0:
                    pause = random.uniform(3, 6)
                    logger.debug(f"Anti-detection pause: {pause:.1f}s")
                    time.sleep(pause)
        finally:
            # Close the browser on every exit path, including errors
            browser.close()

    # === POST-PROCESSING: Website Health Checks ===
    if args.check_websites and args.detect_pain:
        print(f"\n{'='*80}")
        print(f"πŸ” Checking website health for {len(results)} businesses...")
        print(f"{'='*80}")

        for i, lead in enumerate(results):
            if lead.get('website'):
                print(f" [{i+1}/{len(results)}] {lead['name'][:40]:<40}", end=" β†’ ")
                try:
                    health = check_website_health(lead['website'])
                    stats.increment('websites_checked')
                    lead['health_check'] = health

                    # Re-detect pain with health data
                    pain_data = detect_pain_signals(
                        lead,
                        reviews=lead.get('reviews', []),
                        health_check=health
                    )
                    lead['pain_data'] = pain_data

                    print(f"{'βœ…' if health.get('reachable') else '❌'} "
                          f"ssl={'βœ…' if health.get('ssl_valid') else '❌'} "
                          f"load={health.get('load_time', 0):.1f}s "
                          f"pain={pain_data['pain_score']}")
                except Exception as e:
                    print(f"❌ {str(e)[:50]}")
                    lead['health_check'] = None
            else:
                print(f" [{i+1}/{len(results)}] {lead['name'][:40]:<40} β†’ No website")

    # === POST-PROCESSING: Pitch Generation ===
    if args.pitch_report and args.detect_pain:
        print(f"\n{'='*80}")
        print(f"πŸ“ Generating apex pitches ({args.channel})...")
        print(f"{'='*80}")

        for i, lead in enumerate(results):
            pain_data = lead.get('pain_data')
            if pain_data and pain_data.get('pain_score', 0) > 0:
                pitch = generate_apex_pitch(lead, pain_data, channel=args.channel)
                lead['pitch'] = pitch
                stats.increment('pitches_generated')
                print(f" [{i+1}/{len(results)}] {lead['name'][:40]:<40} β†’ {pitch['primary_service']}")

    # === OUTPUT ===
    print(f"\n{'='*80}")
    print(f"πŸ“Š RESULTS: {len(results)} businesses (filtered from {total})")
    print(f"{'='*80}")

    if not results:
        logger.error("No results matched filters. Try lowering thresholds.")
        stats.log_summary()
        return

    # Build CSV fields
    fields = list(BASE_FIELDS)
    if args.detect_pain:
        fields.extend(["pain_score", "pain_signals", "primary_service", "confidence"])
        if args.check_websites:
            fields.extend(["website_reachable", "website_ssl", "website_load_time",
                           "website_mobile", "website_form"])
        if args.pitch_report:
            fields.append("pitch")

    # Write CSV
    _write_csv(args.output, fields, results, args)
    print(f"\nπŸ’Ύ CSV: {args.output}")

    # Write JSON next to the CSV. with_suffix() only touches the extension, so
    # an output path that does not end in ".csv" is never overwritten.
    if args.json:
        out_path = Path(args.output)
        jp = out_path.with_suffix(".json")
        if jp == out_path:
            jp = out_path.with_name(out_path.name + ".json")
        with open(jp, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2, ensure_ascii=False, default=str)
        print(f"πŸ’Ύ JSON: {jp}")

    # Write a "<query>_latest.csv" copy (a regular file, not a symlink).
    # Best-effort: the primary CSV is already on disk, so a failure here is
    # reported but does not abort the summary.
    latest = DEFAULT_CACHE_DIR / f"{safe}_latest.csv"
    try:
        latest.parent.mkdir(parents=True, exist_ok=True)
        _write_csv(latest, fields, results, args)
    except OSError as e:
        logger.warning(f"Could not write latest copy {latest}: {e}")

    # Print summary table
    if args.detect_pain:
        print(f"\n{'#':<3} {'NAME':<30} {'RATING':<6} {'PAIN':<5} {'SERVICE':<25} {'CONF':<5}")
        print("-" * 80)
        sorted_results = sorted(results, key=_pain_score, reverse=True)
        for i, r in enumerate(sorted_results[:30], 1):
            name = (r["name"][:28] + "..") if len(r["name"]) > 30 else r["name"]
            pain_info = r.get('pain_data') or {}
            pain = pain_info.get('pain_score', 0)
            service = (pain_info.get('primary_service', 'β€”') or 'β€”')[:23]
            conf = pain_info.get('confidence', 'β€”')
            print(f"{i:<3} {name:<30} {r['rating']:<6} {pain:<5} {service:<25} {conf:<5}")
    else:
        print(f"\n{'#':<3} {'NAME':<35} {'RATING':<6} {'REV':<5} {'PHONE':<16} {'WEB':<4}")
        print("-" * 75)
        for i, r in enumerate(results[:30], 1):
            name = (r["name"][:33] + "..") if len(r["name"]) > 35 else r["name"]
            phone = (r["phone"][:14]) if r["phone"] else "β€”"
            web = "βœ…" if r["website"] else "β€”"
            print(f"{i:<3} {name:<35} {r['rating']:<6} {r['review_count']:<5} {phone:<16} {web}")

    if len(results) > 30:
        print(f"\n ... +{len(results)-30} more in CSV")

    # Stats
    with_web = sum(1 for r in results if r["website"])
    with_phone = sum(1 for r in results if r["phone"])
    rated = [r["rating"] for r in results if r["rating"] > 0]
    avg_r = sum(rated) / max(len(rated), 1)

    print(f"\nπŸ“ˆ Stats:")
    print(f" Total: {len(results)} businesses")
    print(f" Avg rating: {avg_r:.1f}β˜…")
    print(f" With website: {with_web}")
    print(f" With phone: {with_phone}")

    if args.detect_pain:
        scores = [_pain_score(r) for r in results]
        high_pain = sum(1 for s in scores if s >= 30)
        med_pain = sum(1 for s in scores if 15 <= s < 30)
        low_pain = sum(1 for s in scores if 0 < s < 15)
        print(f" High pain: {high_pain} (score β‰₯30)")
        print(f" Medium pain: {med_pain} (score 15-29)")
        print(f" Low pain: {low_pain} (score 1-14)")

    # Log final stats
    stats.log_summary()

    print(f"\n🎯 Done! Results saved to {args.output}")


if __name__ == "__main__":
    main()