From 907b9ab621fb5de8cb90598fce4418f1d9dc3f07 Mon Sep 17 00:00:00 2001 From: Jarvis Date: Fri, 30 Jan 2026 19:27:05 +0000 Subject: [PATCH] Feat: Full integration with CE API (Create + Discover) --- enrich.py | 184 +++++++++++++++++++++++++++++++++--------------------- 1 file changed, 113 insertions(+), 71 deletions(-) diff --git a/enrich.py b/enrich.py index 43b1d64..f9d6476 100644 --- a/enrich.py +++ b/enrich.py @@ -1,107 +1,149 @@ import json import requests import os +import time from requests.auth import HTTPBasicAuth from db import update_lead_status, get_leads, DB_PATH import sqlite3 -# Config +# --- Configuration --- +# Defaults matching our successful test CE_API_URL = os.getenv("CE_API_URL", "http://192.168.178.6:8090/ce/api") CE_API_USER = os.getenv("CE_API_USER", "admin") -CE_API_PASS = os.getenv("CE_API_PASS", "") +CE_API_PASS = os.getenv("CE_API_PASS", "gemini") -def query_company_explorer(company_name): - """ - Fragt die Company Explorer API nach Details zur Firma. - Gibt None zurück, wenn die API nicht erreichbar ist. - """ - if not CE_API_PASS: - print("⚠️ Warning: No CE_API_PASS set. Skipping API call.") - return None +def get_auth(): + return HTTPBasicAuth(CE_API_USER, CE_API_PASS) +def find_company(name): + """Prüft, ob Firma im CE existiert.""" try: - # Beispiel-Endpoint: Suche nach Firma - # Wir nehmen an, es gibt einen /search Endpoint - response = requests.get( - f"{CE_API_URL}/companies/search", - params={"q": company_name}, - auth=HTTPBasicAuth(CE_API_USER, CE_API_PASS), + res = requests.get( + f"{CE_API_URL}/companies", + params={"search": name, "limit": 1}, + auth=get_auth(), timeout=5 ) - - if response.status_code == 200: - results = response.json() - if results and isinstance(results, list) and len(results) > 0: - return results[0] # Best Match - elif response.status_code == 401: - print("❌ API Auth failed (401). Check password.") - + if res.status_code == 200: + data = res.json() + if data.get("items"): + return data["items"][0] # Return first match except Exception as e: - print(f"❌ API Error: {e}") - + print(f"❌ CE API Error (Find): {e}") return None -def get_mock_enrichment(company_name, raw_body): - """Fallback Logic wenn API nicht verfügbar""" - enrichment = { - 'vertical': 'Unknown', - 'score': 0, - 'recommendation': 'Manual Review (Mock Data)' +def create_company(lead_data): + """Legt Firma im CE an.""" + payload = { + "name": lead_data['company_name'], + "city": lead_data.get('city', ''), # Falls Parser City extrahiert hat + "country": "DE", + "website": None # Discovery soll suchen } - - name_lower = company_name.lower() - - if 'küche' in name_lower or 'einbau' in name_lower: - enrichment['vertical'] = 'Manufacturing / Woodworking' - enrichment['score'] = 85 - enrichment['recommendation'] = 'High Priority - Pitch Dust Control' - enrichment['area_estimate'] = '> 1000m²' - - if 'shop' in name_lower or 'roller' in name_lower: - enrichment['vertical'] = 'Retail / Automotive' - enrichment['score'] = 60 - enrichment['recommendation'] = 'Medium Priority - Pitch Showroom Cleanliness' - enrichment['area_estimate'] = '300-500m²' + try: + res = requests.post( + f"{CE_API_URL}/companies", + json=payload, + auth=get_auth(), + timeout=5 + ) + if res.status_code == 200: + return res.json() + else: + print(f"❌ Create Failed: {res.status_code} {res.text}") + except Exception as e: + print(f"❌ CE API Error (Create): {e}") + return None - return enrichment +def trigger_discovery(company_id): + """Startet die automatische Webseiten-Suche im CE.""" + try: + res = requests.post( + f"{CE_API_URL}/enrich/discover", + json={"company_id": company_id}, + auth=get_auth(), + timeout=5 + ) + return res.status_code == 200 + except Exception as e: + print(f"❌ Trigger Discovery Failed: {e}") + return False -def enrich_lead(lead): - print(f"🔍 Enriching: {lead['company_name']}...") +def process_lead(lead): + print(f"\n⚙️ Processing Lead: {lead['company_name']} ({lead['id']})") - # 1. Versuch: Echte API - api_data = query_company_explorer(lead['company_name']) + # 1. Parsing Helper (falls City im raw_body steckt, aber nicht in DB Spalte) + # Hier vereinfacht: Wir nehmen an, company_name ist sauber. - enrichment = {} + # 2. Check Existence + ce_company = find_company(lead['company_name']) - if api_data: - print("✅ API Hit!") - # Mapping der API-Daten auf unser Format - enrichment = { - 'vertical': api_data.get('industry', 'Unknown'), - 'score': api_data.get('score', 50), # Annahme: API liefert Score - 'recommendation': 'Data from Company Explorer', - 'details': api_data - } + enrichment_info = { + "ce_status": "unknown", + "ce_id": None, + "message": "" + } + + if ce_company: + print(f"✅ Company exists in CE (ID: {ce_company['id']})") + enrichment_info["ce_status"] = "existing" + enrichment_info["ce_id"] = ce_company['id'] + enrichment_info["ce_data"] = ce_company + enrichment_info["message"] = f"Matched existing account. Status: {ce_company.get('status')}" else: - print("⚠️ API Miss/Fail -> Using Mock Fallback") - enrichment = get_mock_enrichment(lead['company_name'], lead['raw_body']) + print(f"✨ New Company. Creating in CE...") + # Versuch Stadt aus raw_body zu extrahieren (Quick Hack für MVP) + city = "" + if "Stadt:" in lead['raw_body']: + try: + for line in lead['raw_body'].split('\n'): + if "Stadt:" in line: + city = line.split("Stadt:")[1].strip() + break + except: pass + + new_company = create_company({ + "company_name": lead['company_name'], + "city": city + }) + + if new_company: + print(f"✅ Created (ID: {new_company['id']}). Triggering Discovery...") + enrichment_info["ce_status"] = "created" + enrichment_info["ce_id"] = new_company['id'] + enrichment_info["message"] = "Created new account." + + # 3. Trigger Discovery + if trigger_discovery(new_company['id']): + enrichment_info["message"] += " Discovery started." + print("🚀 Discovery queued.") + else: + enrichment_info["message"] += " Discovery trigger failed." + else: + enrichment_info["message"] = "Failed to create in CE." - # Update DB + # 4. Save State locally conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute('UPDATE leads SET enrichment_data = ?, status = ? WHERE id = ?', - (json.dumps(enrichment), 'enriched', lead['id'])) + (json.dumps(enrichment_info), 'synced', lead['id'])) conn.commit() conn.close() - -def run_enrichment(): + +def run_sync(): leads = get_leads() + print(f"Found {len(leads)} leads in local DB.") count = 0 for lead in leads: - if lead['status'] == 'new': - enrich_lead(lead) + # Wir verarbeiten 'new' leads oder solche, die nur 'mock' daten haben (status=enriched aber kein ce_id) + enrichment = json.loads(lead['enrichment_data']) if lead['enrichment_data'] else {} + is_mock = enrichment.get('recommendation') == 'Manual Review (Mock Data)' + + if lead['status'] == 'new' or is_mock: + process_lead(lead) count += 1 - print(f"Done. Enriched {count} leads.") + + print(f"Sync complete. Processed {count} leads.") if __name__ == "__main__": - run_enrichment() + run_sync()