feat(company-explorer-integration): Implement Company Explorer Connector and Lead Engine Sync [2f988f42]\n\nIntegrates the Company Explorer API into the Lead Engine workflow, allowing for robust company existence checks, creation, and asynchronous enrichment.\n\n- Introduced as a central client wrapper for the Company Explorer API, handling find-or-create logic, discovery, polling for website data, and analysis triggers.\n- Updated to utilize the new connector for syncing new leads with the Company Explorer.\n- Added for comprehensive unit testing of the connector logic.\n- Created as a demonstration script for the end-to-end integration.\n- Updated to document the new client integration architecture pattern.\n\nThis enhances the Lead Engine with a reliable mechanism for company data enrichment.

This commit is contained in:
2026-02-01 19:55:12 +00:00
parent 5a99865c01
commit 4f2106c832
5 changed files with 385 additions and 220 deletions

View File

@@ -1,235 +1,60 @@
import json
import requests
import sys
import os
import time
from requests.auth import HTTPBasicAuth
from db import update_lead_status, get_leads, DB_PATH
import sqlite3
# --- Configuration ---
CE_API_URL = os.getenv("CE_API_URL", "http://192.168.178.6:8090/ce/api")
CE_API_USER = os.getenv("CE_API_USER", "admin")
CE_API_PASS = os.getenv("CE_API_PASS", "gemini")
# Füge das Hauptverzeichnis zum Python-Pfad hinzu, damit der Connector gefunden wird
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from company_explorer_connector import handle_company_workflow
from db import get_leads, DB_PATH
def get_auth():
return HTTPBasicAuth(CE_API_USER, CE_API_PASS)
# 1. CORE LOGIC: Interaction with Company Explorer
def find_company(name):
"""Prüft, ob Firma im CE existiert."""
try:
res = requests.get(
f"{CE_API_URL}/companies",
params={"search": name, "limit": 1},
auth=get_auth(),
timeout=5
)
if res.status_code == 200:
data = res.json()
if data.get("items"):
return data["items"][0] # Return first match
except Exception as e:
print(f"❌ CE API Error (Find '{name}'): {e}")
return None
def create_company(lead_data):
"""Legt Firma im CE an."""
payload = {
"name": lead_data['company_name'],
"city": lead_data.get('city', ''),
"country": "DE",
"website": None
}
try:
res = requests.post(
f"{CE_API_URL}/companies",
json=payload,
auth=get_auth(),
timeout=5
)
if res.status_code == 200:
return res.json()
else:
print(f"❌ Create Failed: {res.status_code} {res.text}")
except Exception as e:
print(f"❌ CE API Error (Create '{lead_data['company_name']}'): {e}")
return None
def trigger_discovery(company_id):
"""Startet die automatische Webseiten-Suche im CE."""
try:
res = requests.post(
f"{CE_API_URL}/enrich/discover",
json={"company_id": company_id},
auth=get_auth(),
timeout=5
)
return res.status_code == 200
except Exception as e:
print(f"❌ Trigger Discovery Failed (ID {company_id}): {e}")
return False
def get_ce_status(company_id):
"""Holt den aktuellen Status einer Firma aus dem CE."""
try:
res = requests.get(
f"{CE_API_URL}/companies/{company_id}",
auth=get_auth(),
timeout=5
)
if res.status_code == 200:
return res.json()
except Exception as e:
print(f"❌ Get Status Failed (ID {company_id}): {e}")
return None
def get_pitch(company_id):
"""
Versucht, einen generierten Pitch abzurufen.
(Hier simulieren wir es erst mal, oder nutzen ein Feld aus der Company, falls CE das schon bietet)
"""
# TODO: Implement real endpoint once CE has it (e.g. /companies/{id}/analysis/pitch)
# For now, we fetch the company and look for 'industry_ai' or similar fields to construct a simple pitch locally
company = get_ce_status(company_id)
if company and company.get('industry_ai'):
industry = company['industry_ai']
# Simple Template based on industry
return f"Wir haben gesehen, dass Sie im Bereich {industry} tätig sind. Für {industry} haben wir spezielle Roboter-Lösungen..."
return None
# 2. ORCHESTRATION: The Sync Process
def process_new_lead(lead):
"""Phase 1: Identifikation & Anlage"""
print(f"\n⚙️ [Phase 1] Syncing Lead: {lead['company_name']} ({lead['id']})")
ce_company = find_company(lead['company_name'])
enrichment_info = {
"ce_status": "unknown",
"ce_id": None,
"message": "",
"last_check": time.time()
}
if ce_company:
print(f"✅ Company exists in CE (ID: {ce_company['id']})")
enrichment_info["ce_status"] = "linked" # Linked but maybe not fully enriched/fetched
enrichment_info["ce_id"] = ce_company['id']
enrichment_info["ce_data"] = ce_company
enrichment_info["message"] = f"Matched existing account (Status: {ce_company.get('status')})."
# If existing company is not enriched yet, trigger discovery too?
if ce_company.get('status') == 'NEW':
trigger_discovery(ce_company['id'])
enrichment_info["message"] += " Triggered Discovery for existing raw account."
else:
print(f"✨ New Company. Creating in CE...")
city = ""
# Simple extraction
if lead['raw_body'] and "Stadt:" in lead['raw_body']:
try:
for line in lead['raw_body'].split('\n'):
if "Stadt:" in line:
city = line.split("Stadt:")[1].strip()
break
except: pass
new_company = create_company({
"company_name": lead['company_name'],
"city": city
})
if new_company:
print(f"✅ Created (ID: {new_company['id']}).")
enrichment_info["ce_status"] = "created"
enrichment_info["ce_id"] = new_company['id']
if trigger_discovery(new_company['id']):
enrichment_info["message"] = "Created & Discovery started."
print("🚀 Discovery queued.")
else:
enrichment_info["message"] = "Created, but Discovery trigger failed."
else:
enrichment_info["message"] = "Failed to create in CE."
# Save State
update_lead_enrichment(lead['id'], enrichment_info, status='synced')
def process_synced_lead(lead):
"""Phase 2: Closing the Loop (Abrufen der Ergebnisse)"""
enrichment = json.loads(lead['enrichment_data'])
ce_id = enrichment.get('ce_id')
if not ce_id:
return # Should not happen if status is synced
print(f"\n🔄 [Phase 2] Checking Enrichment for: {lead['company_name']} (CE ID: {ce_id})")
company = get_ce_status(ce_id)
if not company:
return
# Check if CE is done
ce_status = company.get('status') # NEW, DISCOVERING, ENRICHED...
if ce_status == 'ENRICHED':
print(f"✅ Analysis Complete! Fetching Pitch...")
# Get Pitch (Mocked or Real)
pitch = get_pitch(ce_id)
if pitch:
enrichment['pitch'] = pitch
enrichment['ce_data'] = company # Update with latest data
enrichment['message'] = "Enrichment complete. Pitch ready."
# Save final state
update_lead_enrichment(lead['id'], enrichment, status='drafted') # 'drafted' means ready for human review
print(f"🎉 Lead is ready for response!")
else:
print("⚠️ Enriched, but no pitch generated yet.")
else:
print(f"⏳ Still processing in CE (Status: {ce_status})...")
def update_lead_enrichment(lead_id, data, status=None):
def update_lead_enrichment(lead_id, data, status):
"""Aktualisiert einen Lead in der Datenbank mit neuen Enrichment-Daten und einem neuen Status."""
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
if status:
c.execute('UPDATE leads SET enrichment_data = ?, status = ? WHERE id = ?',
(json.dumps(data), status, lead_id))
else:
c.execute('UPDATE leads SET enrichment_data = ? WHERE id = ?',
(json.dumps(data), lead_id))
c.execute('UPDATE leads SET enrichment_data = ?, status = ? WHERE id = ?',
(json.dumps(data), status, lead_id))
conn.commit()
conn.close()
print(f"Lead {lead_id} aktualisiert. Neuer Status: {status}")
def run_sync():
leads = get_leads()
print(f"Scanning {len(leads)} leads...")
"""
Haupt-Synchronisationsprozess.
Holt alle neuen Leads und stößt den Company Explorer Workflow für jeden an.
"""
# Hole nur die Leads, die wirklich neu sind und noch nicht verarbeitet wurden
leads_to_process = [lead for lead in get_leads() if lead['status'] == 'new']
count = 0
for lead in leads:
status = lead['status']
# Fallback for old/mock leads or explicit 'new' status
is_mock = False
if lead['enrichment_data']:
try:
enrichment = json.loads(lead['enrichment_data'])
is_mock = enrichment.get('recommendation') == 'Manual Review (Mock Data)'
except: pass
print(f"Found {len(leads_to_process)} new leads to sync with Company Explorer.")
if not leads_to_process:
print("No new leads to process. Sync finished.")
return
if status == 'new' or not status or is_mock:
process_new_lead(lead)
count += 1
elif status == 'synced':
process_synced_lead(lead)
count += 1
print(f"Sync cycle finished. Processed {count} leads.")
for lead in leads_to_process:
company_name = lead['company_name']
print(f"\n--- Processing Lead ID: {lead['id']}, Company: '{company_name}' ---")
# Rufe den zentralen Workflow auf, den wir im Connector definiert haben
# Diese Funktion kümmert sich um alles: prüfen, erstellen, discovern, pollen, analysieren
result = handle_company_workflow(company_name)
# Bereite die Daten für die Speicherung in der DB vor
enrichment_data = {
"sync_status": result.get("status"),
"ce_id": result.get("data", {}).get("id") if result.get("data") else None,
"message": result.get("message"),
"ce_data": result.get("data")
}
# Setze den finalen Status und speichere die Daten in der DB
update_lead_enrichment(lead['id'], enrichment_data, status='synced')
print("\n--- Sync cycle finished. ---")
if __name__ == "__main__":
# Dieser Block ermöglicht es, das Skript direkt für Debugging-Zwecke auszuführen
print("Running manual sync...")
run_sync()
print("Manual sync finished.")