[31388f42] Deep CE Sync: Support contact creation and automated enrichment workflow

This commit is contained in:
2026-03-02 10:01:11 +00:00
parent d08899a7a9
commit aa1a8699e0
3 changed files with 143 additions and 73 deletions

View File

@@ -64,10 +64,24 @@ def trigger_analysis(company_id: int) -> dict:
def get_company_details(company_id: int) -> dict:
    """Fetch the full detail record for a company from the API."""
    endpoint = f"/companies/{company_id}"
    return _make_api_request("GET", endpoint)
def create_contact(company_id: int, contact_data: dict) -> dict:
    """Create a new contact for a company in the Company Explorer.

    Fields missing from ``contact_data`` are sent as ``None``;
    ``is_primary`` defaults to ``True`` when not supplied.
    """
    payload = {"company_id": company_id}
    for field in ("first_name", "last_name", "email", "job_title", "role"):
        payload[field] = contact_data.get(field)
    payload["is_primary"] = contact_data.get("is_primary", True)
    return _make_api_request("POST", "/contacts", json_data=payload)
def handle_company_workflow(company_name: str) -> dict:
def handle_company_workflow(company_name: str, contact_info: dict = None) -> dict:
"""
Haupt-Workflow: Prüft, erstellt und reichert ein Unternehmen an.
Optional wird auch ein Kontakt angelegt.
Gibt die finalen Unternehmensdaten zurück.
"""
print(f"Workflow gestartet für: '{company_name}'")
@@ -75,70 +89,60 @@ def handle_company_workflow(company_name: str) -> dict:
# 1. Prüfen, ob das Unternehmen existiert
existence_check = check_company_existence(company_name)
company_id = None
if existence_check.get("exists"):
company_id = existence_check["company"]["id"]
print(f"Unternehmen '{company_name}' (ID: {company_id}) existiert bereits.")
final_company_data = get_company_details(company_id)
return {"status": "found", "data": final_company_data}
if "error" in existence_check:
elif "error" in existence_check:
print(f"Fehler bei der Existenzprüfung: {existence_check['error']}")
return {"status": "error", "message": existence_check['error']}
else:
# 2. Wenn nicht, Unternehmen erstellen
print(f"Unternehmen '{company_name}' nicht gefunden. Erstelle es...")
creation_response = create_company(company_name)
if "error" in creation_response:
return {"status": "error", "message": creation_response['error']}
company_id = creation_response.get("id")
print(f"Unternehmen '{company_name}' erfolgreich mit ID {company_id} erstellt.")
# 2. Wenn nicht, Unternehmen erstellen
print(f"Unternehmen '{company_name}' nicht gefunden. Erstelle es...")
creation_response = create_company(company_name)
if "error" in creation_response:
print(f"Fehler bei der Erstellung: {creation_response['error']}")
return {"status": "error", "message": creation_response['error']}
company_id = creation_response.get("id")
if not company_id:
print(f"Fehler: Konnte keine ID aus der Erstellungs-Antwort extrahieren: {creation_response}")
return {"status": "error", "message": "Failed to get company ID after creation."}
print(f"Unternehmen '{company_name}' erfolgreich mit ID {company_id} erstellt.")
# 2b. Kontakt anlegen/aktualisieren (falls Info vorhanden)
if company_id and contact_info:
print(f"Lege Kontakt für {contact_info.get('last_name')} an...")
contact_res = create_contact(company_id, contact_info)
if "error" in contact_res:
print(f"Hinweis: Kontakt konnte nicht angelegt werden: {contact_res['error']}")
# 3. Discovery anstoßen
print(f"Starte Discovery für ID {company_id}...")
discovery_status = trigger_discovery(company_id)
if "error" in discovery_status:
print(f"Fehler beim Anstoßen der Discovery: {discovery_status['error']}")
return {"status": "error", "message": discovery_status['error']}
# 4. Warten, bis Discovery eine Website gefunden hat (Polling)
max_wait_time = 30
start_time = time.time()
website_found = False
print("Warte auf Abschluss der Discovery (max. 30s)...")
while time.time() - start_time < max_wait_time:
details = get_company_details(company_id)
if details.get("website") and details["website"] not in ["", "k.A."]:
print(f"Website gefunden: {details['website']}")
website_found = True
break
time.sleep(3)
print(".")
if not website_found:
print("Discovery hat nach 30s keine Website gefunden. Breche Analyse ab.")
final_data = get_company_details(company_id)
return {"status": "created_discovery_timeout", "data": final_data}
# 5. Analyse anstoßen
print(f"Starte Analyse für ID {company_id}...")
analysis_status = trigger_analysis(company_id)
if "error" in analysis_status:
print(f"Fehler beim Anstoßen der Analyse: {analysis_status['error']}")
return {"status": "error", "message": analysis_status['error']}
print("Analyse-Prozess erfolgreich in die Warteschlange gestellt.")
# 3. Discovery anstoßen (falls Status NEW)
# Wir holen Details, um den Status zu prüfen
details = get_company_details(company_id)
if details.get("status") == "NEW":
print(f"Starte Discovery für ID {company_id}...")
trigger_discovery(company_id)
# 4. Warten, bis Discovery eine Website gefunden hat (Polling)
max_wait_time = 30
start_time = time.time()
website_found = False
print("Warte auf Abschluss der Discovery (max. 30s)...")
while time.time() - start_time < max_wait_time:
details = get_company_details(company_id)
if details.get("website") and details["website"] not in ["", "k.A."]:
print(f"Website gefunden: {details['website']}")
website_found = True
break
time.sleep(3)
print(".")
# 5. Analyse anstoßen (falls Website da, aber noch nicht ENRICHED)
if details.get("website") and details["website"] not in ["", "k.A."] and details.get("status") != "ENRICHED":
print(f"Starte Analyse für ID {company_id}...")
trigger_analysis(company_id)
# 6. Finale Daten abrufen und zurückgeben
final_company_data = get_company_details(company_id)
return {"status": "created_and_enriched", "data": final_company_data}
return {"status": "synced", "data": final_company_data}
if __name__ == "__main__":

View File

@@ -6,7 +6,8 @@ import sqlite3
# Füge das Hauptverzeichnis zum Python-Pfad hinzu, damit der Connector gefunden wird
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from company_explorer_connector import handle_company_workflow, get_company_details
from db import get_leads, DB_PATH
from db import get_leads, DB_PATH, update_lead_metadata
from lookup_role import lookup_person_role
def update_lead_enrichment(lead_id, data, status):
"""Aktualisiert einen Lead in der Datenbank mit neuen Enrichment-Daten und einem neuen Status."""
@@ -26,18 +27,54 @@ def refresh_ce_data(lead_id, ce_id):
print(f"Refreshing data for CE ID {ce_id}...")
ce_data = get_company_details(ce_id)
# Bestehende Enrichment-Daten holen, um nichts zu überschreiben
# (Vereinfachung: Wir bauen das dict neu auf)
enrichment_data = {
# Bestehende Enrichment-Daten holen
leads = get_leads()
lead = next((l for l in leads if l['id'] == lead_id), None)
enrichment_data = {}
if lead and lead.get('enrichment_data'):
try:
enrichment_data = json.loads(lead['enrichment_data'])
except:
pass
enrichment_data.update({
"sync_status": "refreshed",
"ce_id": ce_id,
"message": "Data refreshed from CE",
"ce_data": ce_data
}
})
update_lead_enrichment(lead_id, enrichment_data, status='synced')
return ce_data
def enrich_contact_role(lead):
    """Look up the contact's role via SerpAPI and persist it in the lead metadata.

    Parameters:
        lead: dict-like lead row with at least 'id', 'contact_name' and
              'company_name'; may carry a JSON string under 'lead_metadata'.

    Returns:
        The role string (pre-existing or newly found), or None when no
        role could be determined.
    """
    meta = {}
    raw_meta = lead.get('lead_metadata')
    if raw_meta:
        try:
            meta = json.loads(raw_meta)
        except (TypeError, ValueError):
            # Malformed or legacy metadata: fall back to an empty dict rather
            # than aborting the sync run. (Was a bare `except:`, which would
            # also have swallowed KeyboardInterrupt/SystemExit.)
            pass
    if not isinstance(meta, dict):
        # Valid JSON but not an object (e.g. a list) — treat as no metadata.
        meta = {}
    # Skip the (slow, rate-limited) lookup if we already have a usable role.
    if meta.get('role') and meta.get('role') != "Unbekannt":
        return meta.get('role')
    print(f"Looking up role for {lead['contact_name']} at {lead['company_name']}...")
    role = lookup_person_role(lead['contact_name'], lead['company_name'])
    if role:
        print(f" -> Found role: {role}")
        meta['role'] = role
        update_lead_metadata(lead['id'], meta)
    else:
        print(" -> No role found.")
    return role
def run_sync():
"""
Haupt-Synchronisationsprozess.
@@ -56,15 +93,33 @@ def run_sync():
company_name = lead['company_name']
print(f"\n--- Processing Lead ID: {lead['id']}, Company: '{company_name}' ---")
# Rufe den zentralen Workflow auf, den wir im Connector definiert haben
# Diese Funktion kümmert sich um alles: prüfen, erstellen, discovern, pollen, analysieren
result = handle_company_workflow(company_name)
# 1. Contact Enrichment (Role Lookup via SerpAPI)
role = enrich_contact_role(lead)
# 2. Prepare Contact Info for CE
meta = {}
if lead.get('lead_metadata'):
try:
meta = json.loads(lead.get('lead_metadata'))
except:
pass
contact_info = {
"first_name": meta.get('contact_first', ''),
"last_name": meta.get('contact_last', lead['contact_name'].split(' ')[-1] if lead['contact_name'] else ''),
"email": lead['email'],
"job_title": meta.get('role', role), # The raw title or Gemini result
"role": meta.get('role', role) # Currently mapped to same field
}
# 3. Company Enrichment (CE Workflow with Contact)
result = handle_company_workflow(company_name, contact_info=contact_info)
# Bereite die Daten für die Speicherung in der DB vor
enrichment_data = {
"sync_status": result.get("status"),
"ce_id": result.get("data", {}).get("id") if result.get("data") else None,
"message": result.get("message"),
"message": result.get("message", "Sync successful"),
"ce_data": result.get("data")
}

View File

@@ -111,7 +111,7 @@ def parse_tradingtwins_html(html_body):
return data
def process_leads():
def process_leads(auto_sync=True):
init_db()
new_count = 0
try:
@@ -120,11 +120,21 @@ def process_leads():
logger.info(f"Found {len(emails)} Tradingtwins emails.")
for email in emails:
# ... (parsing logic remains same)
body = email.get('body', {}).get('content', '')
lead_data = parse_tradingtwins_html(body)
received_at_str = email.get('receivedDateTime')
# Add raw body for reference
# Convert ISO string to datetime object
received_at = None
if received_at_str:
try:
received_at = datetime.fromisoformat(received_at_str.replace('Z', '+00:00'))
except:
pass
lead_data = parse_tradingtwins_html(body)
lead_data['raw_body'] = body
lead_data['received_at'] = received_at
company_name = lead_data.get('company')
if not company_name or company_name == '-':
@@ -132,17 +142,18 @@ def process_leads():
lead_data['company'] = company_name
if not company_name:
logger.warning(f"Skipping email {email['id']}: No company or contact name found.")
continue
logger.info(f"Ingesting Lead: {company_name} (ID: {lead_data.get('id')})")
lead_data['id'] = lead_data.get('source_id') or f"tt_{int(datetime.now().timestamp())}"
# Save to local DB (status=new)
if insert_lead(lead_data):
logger.info(f" -> Successfully saved to DB.")
logger.info(f" -> Ingested: {company_name}")
new_count += 1
else:
logger.info(f" -> Lead already exists (skipped).")
if new_count > 0 and auto_sync:
logger.info(f"Triggering auto-sync for {new_count} new leads...")
from enrich import run_sync
run_sync()
return new_count