Feat: Full integration with CE API (Create + Discover)

This commit is contained in:
Jarvis
2026-01-30 19:27:05 +00:00
parent ee7350d727
commit 907b9ab621

190
enrich.py
View File

@@ -1,107 +1,149 @@
import json import json
import requests import requests
import os import os
import time
from requests.auth import HTTPBasicAuth from requests.auth import HTTPBasicAuth
from db import update_lead_status, get_leads, DB_PATH from db import update_lead_status, get_leads, DB_PATH
import sqlite3 import sqlite3
# Config # --- Configuration ---
# Defaults matching our successful test
CE_API_URL = os.getenv("CE_API_URL", "http://192.168.178.6:8090/ce/api") CE_API_URL = os.getenv("CE_API_URL", "http://192.168.178.6:8090/ce/api")
CE_API_USER = os.getenv("CE_API_USER", "admin") CE_API_USER = os.getenv("CE_API_USER", "admin")
CE_API_PASS = os.getenv("CE_API_PASS", "") CE_API_PASS = os.getenv("CE_API_PASS", "gemini")
def query_company_explorer(company_name): def get_auth():
""" return HTTPBasicAuth(CE_API_USER, CE_API_PASS)
Fragt die Company Explorer API nach Details zur Firma.
Gibt None zurück, wenn die API nicht erreichbar ist.
"""
if not CE_API_PASS:
print("⚠️ Warning: No CE_API_PASS set. Skipping API call.")
return None
def find_company(name):
"""Prüft, ob Firma im CE existiert."""
try: try:
# Beispiel-Endpoint: Suche nach Firma res = requests.get(
# Wir nehmen an, es gibt einen /search Endpoint f"{CE_API_URL}/companies",
response = requests.get( params={"search": name, "limit": 1},
f"{CE_API_URL}/companies/search", auth=get_auth(),
params={"q": company_name},
auth=HTTPBasicAuth(CE_API_USER, CE_API_PASS),
timeout=5 timeout=5
) )
if res.status_code == 200:
if response.status_code == 200: data = res.json()
results = response.json() if data.get("items"):
if results and isinstance(results, list) and len(results) > 0: return data["items"][0] # Return first match
return results[0] # Best Match
elif response.status_code == 401:
print("❌ API Auth failed (401). Check password.")
except Exception as e: except Exception as e:
print(f"❌ API Error: {e}") print(f" CE API Error (Find): {e}")
return None return None
def get_mock_enrichment(company_name, raw_body): def create_company(lead_data):
"""Fallback Logic wenn API nicht verfügbar""" """Legt Firma im CE an."""
enrichment = { payload = {
'vertical': 'Unknown', "name": lead_data['company_name'],
'score': 0, "city": lead_data.get('city', ''), # Falls Parser City extrahiert hat
'recommendation': 'Manual Review (Mock Data)' "country": "DE",
"website": None # Discovery soll suchen
}
try:
res = requests.post(
f"{CE_API_URL}/companies",
json=payload,
auth=get_auth(),
timeout=5
)
if res.status_code == 200:
return res.json()
else:
print(f"❌ Create Failed: {res.status_code} {res.text}")
except Exception as e:
print(f"❌ CE API Error (Create): {e}")
return None
def trigger_discovery(company_id):
"""Startet die automatische Webseiten-Suche im CE."""
try:
res = requests.post(
f"{CE_API_URL}/enrich/discover",
json={"company_id": company_id},
auth=get_auth(),
timeout=5
)
return res.status_code == 200
except Exception as e:
print(f"❌ Trigger Discovery Failed: {e}")
return False
def process_lead(lead):
print(f"\n⚙️ Processing Lead: {lead['company_name']} ({lead['id']})")
# 1. Parsing Helper (falls City im raw_body steckt, aber nicht in DB Spalte)
# Hier vereinfacht: Wir nehmen an, company_name ist sauber.
# 2. Check Existence
ce_company = find_company(lead['company_name'])
enrichment_info = {
"ce_status": "unknown",
"ce_id": None,
"message": ""
} }
name_lower = company_name.lower() if ce_company:
print(f"✅ Company exists in CE (ID: {ce_company['id']})")
if 'küche' in name_lower or 'einbau' in name_lower: enrichment_info["ce_status"] = "existing"
enrichment['vertical'] = 'Manufacturing / Woodworking' enrichment_info["ce_id"] = ce_company['id']
enrichment['score'] = 85 enrichment_info["ce_data"] = ce_company
enrichment['recommendation'] = 'High Priority - Pitch Dust Control' enrichment_info["message"] = f"Matched existing account. Status: {ce_company.get('status')}"
enrichment['area_estimate'] = '> 1000m²'
if 'shop' in name_lower or 'roller' in name_lower:
enrichment['vertical'] = 'Retail / Automotive'
enrichment['score'] = 60
enrichment['recommendation'] = 'Medium Priority - Pitch Showroom Cleanliness'
enrichment['area_estimate'] = '300-500m²'
return enrichment
def enrich_lead(lead):
print(f"🔍 Enriching: {lead['company_name']}...")
# 1. Versuch: Echte API
api_data = query_company_explorer(lead['company_name'])
enrichment = {}
if api_data:
print("✅ API Hit!")
# Mapping der API-Daten auf unser Format
enrichment = {
'vertical': api_data.get('industry', 'Unknown'),
'score': api_data.get('score', 50), # Annahme: API liefert Score
'recommendation': 'Data from Company Explorer',
'details': api_data
}
else: else:
print("⚠️ API Miss/Fail -> Using Mock Fallback") print(f"✨ New Company. Creating in CE...")
enrichment = get_mock_enrichment(lead['company_name'], lead['raw_body']) # Versuch Stadt aus raw_body zu extrahieren (Quick Hack für MVP)
city = ""
if "Stadt:" in lead['raw_body']:
try:
for line in lead['raw_body'].split('\n'):
if "Stadt:" in line:
city = line.split("Stadt:")[1].strip()
break
except: pass
# Update DB new_company = create_company({
"company_name": lead['company_name'],
"city": city
})
if new_company:
print(f"✅ Created (ID: {new_company['id']}). Triggering Discovery...")
enrichment_info["ce_status"] = "created"
enrichment_info["ce_id"] = new_company['id']
enrichment_info["message"] = "Created new account."
# 3. Trigger Discovery
if trigger_discovery(new_company['id']):
enrichment_info["message"] += " Discovery started."
print("🚀 Discovery queued.")
else:
enrichment_info["message"] += " Discovery trigger failed."
else:
enrichment_info["message"] = "Failed to create in CE."
# 4. Save State locally
conn = sqlite3.connect(DB_PATH) conn = sqlite3.connect(DB_PATH)
c = conn.cursor() c = conn.cursor()
c.execute('UPDATE leads SET enrichment_data = ?, status = ? WHERE id = ?', c.execute('UPDATE leads SET enrichment_data = ?, status = ? WHERE id = ?',
(json.dumps(enrichment), 'enriched', lead['id'])) (json.dumps(enrichment_info), 'synced', lead['id']))
conn.commit() conn.commit()
conn.close() conn.close()
def run_enrichment(): def run_sync():
leads = get_leads() leads = get_leads()
print(f"Found {len(leads)} leads in local DB.")
count = 0 count = 0
for lead in leads: for lead in leads:
if lead['status'] == 'new': # Wir verarbeiten 'new' leads oder solche, die nur 'mock' daten haben (status=enriched aber kein ce_id)
enrich_lead(lead) enrichment = json.loads(lead['enrichment_data']) if lead['enrichment_data'] else {}
is_mock = enrichment.get('recommendation') == 'Manual Review (Mock Data)'
if lead['status'] == 'new' or is_mock:
process_lead(lead)
count += 1 count += 1
print(f"Done. Enriched {count} leads.")
print(f"Sync complete. Processed {count} leads.")
if __name__ == "__main__": if __name__ == "__main__":
run_enrichment() run_sync()