#!/usr/bin/env python3 """ GMB Scraper → Pipecat Voice Agent Bridge ========================================== Takes GMB scraper CSV output and imports businesses as leads into the Pipecat AI cold caller at voice.darwisyah.com. Usage: # Import from existing scraper CSV into a campaign python3 gmb_to_voice.py --csv /path/to/results.csv --campaign CAMPAIGN_ID # Create campaign + scrape + import in one shot python3 gmb_to_voice.py --query "phone repair Wangara Perth" --campaign-name "My Campaign" # Just create a campaign (no leads yet) python3 gmb_to_voice.py --create-only --campaign-name "New Campaign" --topic "IT services" # List campaigns python3 gmb_to_voice.py --list-campaigns """ import argparse import csv import json import os import sys import time import urllib.request import urllib.error from pathlib import Path from datetime import datetime from dotenv import load_dotenv # Load .env file load_dotenv(Path(__file__).parent / '.env') VOICE_API = os.environ.get("VOICE_API_URL", "https://voice.darwisyah.com") API_KEY = os.environ.get("PIPECAT_API_KEY", "pipeca...wart") def api_request(method, path, data=None, expect_json=True): """Make API request to voice agent.""" url = f"{VOICE_API}{path}" body = json.dumps(data).encode() if data else None req = urllib.request.Request( url, data=body, method=method, headers={ "Content-Type": "application/json", "X-API-Key": API_KEY, }, ) try: with urllib.request.urlopen(req, timeout=30) as resp: if expect_json: return json.loads(resp.read().decode()) return resp.read().decode() except urllib.error.HTTPError as e: body = e.read().decode() if e.fp else "" print(f"❌ API Error {e.code}: {body}") return None except Exception as e: print(f"❌ Request failed: {e}") return None def create_campaign(name, topic, pain_point, greeting, system_prompt, referrer="GMB_Scraper"): """Create a new campaign in the voice agent.""" payload = { "name": name, "description": f"AI outreach campaign: {topic}", "status": "active", "campaign_type": "cold_outreach", "locale": "en-AU", "greeting_override": greeting, "system_prompt_override": system_prompt, "default_referrer": referrer, "default_topic": topic, "default_pain": pain_point, "list_name": name.lower().replace(" ", "_")[:50], } result = api_request("POST", "/api/campaigns", payload) if result and "campaign_id" in result: return result["campaign_id"] return None def list_campaigns(): """List all campaigns.""" result = api_request("GET", "/api/campaigns") if result is None: return [] return result if isinstance(result, list) else [] def import_leads_csv(csv_path, campaign_id): """Import GMB scraper CSV as leads into the voice agent.""" if not Path(csv_path).exists(): print(f"❌ CSV not found: {csv_path}") return None # Read the GMB scraper CSV and transform for voice agent leads_csv = [] with open(csv_path, "r", encoding="utf-8") as f: reader = csv.DictReader(f) for row in reader: phone = row.get("phone", "").strip() if not phone: continue # Parse name into first/last name = row.get("name", "").strip() parts = name.split(" ", 1) first_name = parts[0] if parts else "" last_name = parts[1] if len(parts) > 1 else "" lead = { "phone": phone, "campaign_id": campaign_id, "first_name": first_name, "last_name": last_name, "company": name, "email": "", "title": "", "referrer": "GMB_Scraper", "topic": row.get("category", ""), "pain_point": "", "timezone": "Australia/Perth", } leads_csv.append(lead) if not leads_csv: print("❌ No leads with phone numbers found in CSV") return None # Build CSV string for import import io output = io.StringIO() writer = csv.DictWriter( output, fieldnames=["phone", "campaign_id", "first_name", "last_name", "company", "email", "title", "referrer", "topic", "pain_point", "timezone"], ) writer.writeheader() writer.writerows(leads_csv) csv_text = output.getvalue() # POST to import endpoint url = f"{VOICE_API}/api/leads/import" req = urllib.request.Request( url, data=csv_text.encode("utf-8"), method="POST", headers={ "Content-Type": "text/csv", "X-API-Key": API_KEY, }, ) try: with urllib.request.urlopen(req, timeout=30) as resp: result = json.loads(resp.read().decode()) return result except urllib.error.HTTPError as e: body = e.read().decode() if e.fp else "" print(f"❌ Import Error {e.code}: {body}") return None except Exception as e: print(f"❌ Import failed: {e}") return None def get_dialer_stats(): """Get current dialer stats.""" return api_request("GET", "/api/dialer/stats") def start_dialer(): """Start the auto-dialer.""" return api_request("POST", "/api/dialer/start") def main(): parser = argparse.ArgumentParser(description="GMB Scraper → Voice Agent Bridge") parser.add_argument("--csv", help="Path to GMB scraper CSV file") parser.add_argument("--campaign", help="Existing campaign ID to import into") parser.add_argument("--campaign-name", help="Create new campaign with this name") parser.add_argument("--query", help="Run GMB scraper with this query first, then import") parser.add_argument("--topic", default="Business Services", help="Campaign topic") parser.add_argument("--pain", default="", help="Pain point for the campaign") parser.add_argument("--greeting", default="", help="Custom greeting for the AI") parser.add_argument("--prompt", default="", help="Custom system prompt for the AI") parser.add_argument("--list-campaigns", action="store_true", help="List all campaigns") parser.add_argument("--create-only", action="store_true", help="Just create campaign, don't import") parser.add_argument("--start-dialer", action="store_true", help="Start the dialer after import") parser.add_argument("--stats", action="store_true", help="Show dialer stats") args = parser.parse_args() # List campaigns if args.list_campaigns: print("📋 Campaigns:") campaigns = list_campaigns() if not campaigns: print(" (none)") else: for c in campaigns: if isinstance(c, dict): print(f" {c.get('campaign_id', c.get('id', '?'))} | {c.get('name', '?')} | {c.get('status', '?')}") return # Stats if args.stats: stats = get_dialer_stats() if stats: print(json.dumps(stats, indent=2)) return # Create campaign campaign_id = args.campaign if args.campaign_name and not campaign_id: greeting = args.greeting or f"Hi there! This is Zul calling about {args.topic}. Do you have a quick minute?" prompt = args.prompt or f"You are Zul, a friendly local business owner in Perth WA. You're calling about {args.topic}. Be warm, professional, and conversational. Australian accent and manner." print(f"📢 Creating campaign: {args.campaign_name}") campaign_id = create_campaign( name=args.campaign_name, topic=args.topic, pain_point=args.pain, greeting=greeting, system_prompt=prompt, ) if campaign_id: print(f"✅ Campaign created: {campaign_id}") else: print("❌ Failed to create campaign") return if args.create_only: return # Run scraper if query provided if args.query and not args.csv: import subprocess safe = "".join(c if c.isalnum() else "_" for c in args.query)[:40] ts = datetime.now().strftime("%Y%m%d_%H%M%S") csv_path = f"/root/.hermes/cache/gmb/{safe}_{ts}.csv" print(f"\n🔍 Running GMB scraper: \"{args.query}\"") result = subprocess.run( [ sys.executable, "/root/tools/gmb-scraper/gmb_scraper.py", "-q", args.query, "-o", csv_path, "--min-rating", "4.0", "--max-results", "50", ], capture_output=False, ) if result.returncode != 0: print("❌ Scraper failed") return args.csv = csv_path # Import CSV if args.csv and campaign_id: print(f"\n📥 Importing leads from: {args.csv}") print(f" Campaign: {campaign_id}") result = import_leads_csv(args.csv, campaign_id) if result: print(f"\n✅ Import complete:") print(f" Imported: {result.get('imported', 0)}") print(f" Skipped: {result.get('skipped', 0)}") print(f" Errors: {result.get('errors', 0)}") if args.start_dialer: print("\n📞 Starting dialer...") dialer = start_dialer() if dialer: print(f"✅ Dialer started: {json.dumps(dialer, indent=2)}") else: print("❌ Import failed") elif not args.csv and not args.query: print("Provide --csv or --query to import leads") parser.print_help() if __name__ == "__main__": main()