[31388f42] Deep CE Sync: Support contact creation and automated enrichment workflow
This commit is contained in:
@@ -64,10 +64,24 @@ def trigger_analysis(company_id: int) -> dict:
|
|||||||
def get_company_details(company_id: int) -> dict:
    """Fetch the complete details of a single company.

    Issues a GET request for ``/companies/<company_id>`` through
    ``_make_api_request`` and hands its result back unchanged.

    Args:
        company_id: Identifier of the company to look up.

    Returns:
        Whatever ``_make_api_request`` returns for the request.
    """
    endpoint = f"/companies/{company_id}"
    return _make_api_request("GET", endpoint)
|
||||||
|
|
||||||
|
def create_contact(company_id: int, contact_data: dict) -> dict:
    """Create a new contact for a company in the Company Explorer.

    Builds the request payload from ``contact_data`` and POSTs it to
    ``/contacts`` through ``_make_api_request``.

    Args:
        company_id: Identifier of the company the contact belongs to.
        contact_data: Mapping that may provide ``first_name``, ``last_name``,
            ``email``, ``job_title``, ``role`` and ``is_primary``. Missing
            keys are sent as ``None``; ``is_primary`` defaults to ``True``.

    Returns:
        Whatever ``_make_api_request`` returns for the request.
    """
    # Fields copied verbatim from contact_data, in the order they are sent.
    copied_fields = ("first_name", "last_name", "email", "job_title", "role")

    body = {"company_id": company_id}
    for field in copied_fields:
        body[field] = contact_data.get(field)
    body["is_primary"] = contact_data.get("is_primary", True)

    return _make_api_request("POST", "/contacts", json_data=body)
|
||||||
|
|
||||||
def handle_company_workflow(company_name: str) -> dict:
|
def handle_company_workflow(company_name: str, contact_info: dict = None) -> dict:
|
||||||
"""
|
"""
|
||||||
Haupt-Workflow: Prüft, erstellt und reichert ein Unternehmen an.
|
Haupt-Workflow: Prüft, erstellt und reichert ein Unternehmen an.
|
||||||
|
Optional wird auch ein Kontakt angelegt.
|
||||||
Gibt die finalen Unternehmensdaten zurück.
|
Gibt die finalen Unternehmensdaten zurück.
|
||||||
"""
|
"""
|
||||||
print(f"Workflow gestartet für: '{company_name}'")
|
print(f"Workflow gestartet für: '{company_name}'")
|
||||||
@@ -75,70 +89,60 @@ def handle_company_workflow(company_name: str) -> dict:
|
|||||||
# 1. Prüfen, ob das Unternehmen existiert
|
# 1. Prüfen, ob das Unternehmen existiert
|
||||||
existence_check = check_company_existence(company_name)
|
existence_check = check_company_existence(company_name)
|
||||||
|
|
||||||
|
company_id = None
|
||||||
if existence_check.get("exists"):
|
if existence_check.get("exists"):
|
||||||
company_id = existence_check["company"]["id"]
|
company_id = existence_check["company"]["id"]
|
||||||
print(f"Unternehmen '{company_name}' (ID: {company_id}) existiert bereits.")
|
print(f"Unternehmen '{company_name}' (ID: {company_id}) existiert bereits.")
|
||||||
final_company_data = get_company_details(company_id)
|
elif "error" in existence_check:
|
||||||
return {"status": "found", "data": final_company_data}
|
|
||||||
|
|
||||||
if "error" in existence_check:
|
|
||||||
print(f"Fehler bei der Existenzprüfung: {existence_check['error']}")
|
print(f"Fehler bei der Existenzprüfung: {existence_check['error']}")
|
||||||
return {"status": "error", "message": existence_check['error']}
|
return {"status": "error", "message": existence_check['error']}
|
||||||
|
else:
|
||||||
|
# 2. Wenn nicht, Unternehmen erstellen
|
||||||
|
print(f"Unternehmen '{company_name}' nicht gefunden. Erstelle es...")
|
||||||
|
creation_response = create_company(company_name)
|
||||||
|
|
||||||
|
if "error" in creation_response:
|
||||||
|
return {"status": "error", "message": creation_response['error']}
|
||||||
|
|
||||||
|
company_id = creation_response.get("id")
|
||||||
|
print(f"Unternehmen '{company_name}' erfolgreich mit ID {company_id} erstellt.")
|
||||||
|
|
||||||
# 2. Wenn nicht, Unternehmen erstellen
|
# 2b. Kontakt anlegen/aktualisieren (falls Info vorhanden)
|
||||||
print(f"Unternehmen '{company_name}' nicht gefunden. Erstelle es...")
|
if company_id and contact_info:
|
||||||
creation_response = create_company(company_name)
|
print(f"Lege Kontakt für {contact_info.get('last_name')} an...")
|
||||||
|
contact_res = create_contact(company_id, contact_info)
|
||||||
if "error" in creation_response:
|
if "error" in contact_res:
|
||||||
print(f"Fehler bei der Erstellung: {creation_response['error']}")
|
print(f"Hinweis: Kontakt konnte nicht angelegt werden: {contact_res['error']}")
|
||||||
return {"status": "error", "message": creation_response['error']}
|
|
||||||
|
|
||||||
company_id = creation_response.get("id")
|
|
||||||
if not company_id:
|
|
||||||
print(f"Fehler: Konnte keine ID aus der Erstellungs-Antwort extrahieren: {creation_response}")
|
|
||||||
return {"status": "error", "message": "Failed to get company ID after creation."}
|
|
||||||
|
|
||||||
print(f"Unternehmen '{company_name}' erfolgreich mit ID {company_id} erstellt.")
|
|
||||||
|
|
||||||
# 3. Discovery anstoßen
|
# 3. Discovery anstoßen (falls Status NEW)
|
||||||
print(f"Starte Discovery für ID {company_id}...")
|
# Wir holen Details, um den Status zu prüfen
|
||||||
discovery_status = trigger_discovery(company_id)
|
details = get_company_details(company_id)
|
||||||
if "error" in discovery_status:
|
if details.get("status") == "NEW":
|
||||||
print(f"Fehler beim Anstoßen der Discovery: {discovery_status['error']}")
|
print(f"Starte Discovery für ID {company_id}...")
|
||||||
return {"status": "error", "message": discovery_status['error']}
|
trigger_discovery(company_id)
|
||||||
|
|
||||||
# 4. Warten, bis Discovery eine Website gefunden hat (Polling)
|
# 4. Warten, bis Discovery eine Website gefunden hat (Polling)
|
||||||
max_wait_time = 30
|
max_wait_time = 30
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
website_found = False
|
website_found = False
|
||||||
print("Warte auf Abschluss der Discovery (max. 30s)...")
|
print("Warte auf Abschluss der Discovery (max. 30s)...")
|
||||||
while time.time() - start_time < max_wait_time:
|
while time.time() - start_time < max_wait_time:
|
||||||
details = get_company_details(company_id)
|
details = get_company_details(company_id)
|
||||||
if details.get("website") and details["website"] not in ["", "k.A."]:
|
if details.get("website") and details["website"] not in ["", "k.A."]:
|
||||||
print(f"Website gefunden: {details['website']}")
|
print(f"Website gefunden: {details['website']}")
|
||||||
website_found = True
|
website_found = True
|
||||||
break
|
break
|
||||||
time.sleep(3)
|
time.sleep(3)
|
||||||
print(".")
|
print(".")
|
||||||
|
|
||||||
if not website_found:
|
# 5. Analyse anstoßen (falls Website da, aber noch nicht ENRICHED)
|
||||||
print("Discovery hat nach 30s keine Website gefunden. Breche Analyse ab.")
|
if details.get("website") and details["website"] not in ["", "k.A."] and details.get("status") != "ENRICHED":
|
||||||
final_data = get_company_details(company_id)
|
print(f"Starte Analyse für ID {company_id}...")
|
||||||
return {"status": "created_discovery_timeout", "data": final_data}
|
trigger_analysis(company_id)
|
||||||
|
|
||||||
# 5. Analyse anstoßen
|
|
||||||
print(f"Starte Analyse für ID {company_id}...")
|
|
||||||
analysis_status = trigger_analysis(company_id)
|
|
||||||
if "error" in analysis_status:
|
|
||||||
print(f"Fehler beim Anstoßen der Analyse: {analysis_status['error']}")
|
|
||||||
return {"status": "error", "message": analysis_status['error']}
|
|
||||||
|
|
||||||
print("Analyse-Prozess erfolgreich in die Warteschlange gestellt.")
|
|
||||||
|
|
||||||
# 6. Finale Daten abrufen und zurückgeben
|
# 6. Finale Daten abrufen und zurückgeben
|
||||||
final_company_data = get_company_details(company_id)
|
final_company_data = get_company_details(company_id)
|
||||||
|
return {"status": "synced", "data": final_company_data}
|
||||||
return {"status": "created_and_enriched", "data": final_company_data}
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
@@ -6,7 +6,8 @@ import sqlite3
|
|||||||
# Füge das Hauptverzeichnis zum Python-Pfad hinzu, damit der Connector gefunden wird
|
# Füge das Hauptverzeichnis zum Python-Pfad hinzu, damit der Connector gefunden wird
|
||||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
||||||
from company_explorer_connector import handle_company_workflow, get_company_details
|
from company_explorer_connector import handle_company_workflow, get_company_details
|
||||||
from db import get_leads, DB_PATH
|
from db import get_leads, DB_PATH, update_lead_metadata
|
||||||
|
from lookup_role import lookup_person_role
|
||||||
|
|
||||||
def update_lead_enrichment(lead_id, data, status):
|
def update_lead_enrichment(lead_id, data, status):
|
||||||
"""Aktualisiert einen Lead in der Datenbank mit neuen Enrichment-Daten und einem neuen Status."""
|
"""Aktualisiert einen Lead in der Datenbank mit neuen Enrichment-Daten und einem neuen Status."""
|
||||||
@@ -26,18 +27,54 @@ def refresh_ce_data(lead_id, ce_id):
|
|||||||
print(f"Refreshing data for CE ID {ce_id}...")
|
print(f"Refreshing data for CE ID {ce_id}...")
|
||||||
ce_data = get_company_details(ce_id)
|
ce_data = get_company_details(ce_id)
|
||||||
|
|
||||||
# Bestehende Enrichment-Daten holen, um nichts zu überschreiben
|
# Bestehende Enrichment-Daten holen
|
||||||
# (Vereinfachung: Wir bauen das dict neu auf)
|
leads = get_leads()
|
||||||
enrichment_data = {
|
lead = next((l for l in leads if l['id'] == lead_id), None)
|
||||||
|
|
||||||
|
enrichment_data = {}
|
||||||
|
if lead and lead.get('enrichment_data'):
|
||||||
|
try:
|
||||||
|
enrichment_data = json.loads(lead['enrichment_data'])
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
enrichment_data.update({
|
||||||
"sync_status": "refreshed",
|
"sync_status": "refreshed",
|
||||||
"ce_id": ce_id,
|
"ce_id": ce_id,
|
||||||
"message": "Data refreshed from CE",
|
"message": "Data refreshed from CE",
|
||||||
"ce_data": ce_data
|
"ce_data": ce_data
|
||||||
}
|
})
|
||||||
|
|
||||||
update_lead_enrichment(lead_id, enrichment_data, status='synced')
|
update_lead_enrichment(lead_id, enrichment_data, status='synced')
|
||||||
return ce_data
|
return ce_data
|
||||||
|
|
||||||
|
def enrich_contact_role(lead):
    """
    Look up the contact's role and store it in the lead metadata.

    A role already present in ``lead['lead_metadata']`` (a JSON string) is
    reused as long as it is not the placeholder ``"Unbekannt"``. Otherwise the
    role is looked up with ``lookup_person_role`` (per the original note, a
    SerpAPI-backed lookup) and, when found, written back via
    ``update_lead_metadata``.

    Args:
        lead: Mapping with ``id``, ``contact_name`` and ``company_name``;
            ``lead_metadata`` is optional.

    Returns:
        The stored or newly found role; otherwise the falsy result of
        ``lookup_person_role``.
    """
    meta = {}
    raw_meta = lead.get('lead_metadata')
    if raw_meta:
        try:
            parsed = json.loads(raw_meta)
        except (TypeError, ValueError) as exc:
            # json.JSONDecodeError is a ValueError; TypeError covers values
            # that are not str/bytes. Stay best-effort, but do not swallow
            # unrelated exceptions such as KeyboardInterrupt.
            print(f"Warning: could not parse lead_metadata: {exc}")
        else:
            # Valid JSON that is not an object (e.g. a list) has no .get().
            if isinstance(parsed, dict):
                meta = parsed

    # Skip the lookup if we already have a role (and it's not None/Unknown).
    known_role = meta.get('role')
    if known_role and known_role != "Unbekannt":
        return known_role

    print(f"Looking up role for {lead['contact_name']} at {lead['company_name']}...")
    role = lookup_person_role(lead['contact_name'], lead['company_name'])

    if role:
        print(f" -> Found role: {role}")
        meta['role'] = role
        update_lead_metadata(lead['id'], meta)
    else:
        print(" -> No role found.")

    return role
|
||||||
|
|
||||||
def run_sync():
|
def run_sync():
|
||||||
"""
|
"""
|
||||||
Haupt-Synchronisationsprozess.
|
Haupt-Synchronisationsprozess.
|
||||||
@@ -56,15 +93,33 @@ def run_sync():
|
|||||||
company_name = lead['company_name']
|
company_name = lead['company_name']
|
||||||
print(f"\n--- Processing Lead ID: {lead['id']}, Company: '{company_name}' ---")
|
print(f"\n--- Processing Lead ID: {lead['id']}, Company: '{company_name}' ---")
|
||||||
|
|
||||||
# Rufe den zentralen Workflow auf, den wir im Connector definiert haben
|
# 1. Contact Enrichment (Role Lookup via SerpAPI)
|
||||||
# Diese Funktion kümmert sich um alles: prüfen, erstellen, discovern, pollen, analysieren
|
role = enrich_contact_role(lead)
|
||||||
result = handle_company_workflow(company_name)
|
|
||||||
|
# 2. Prepare Contact Info for CE
|
||||||
|
meta = {}
|
||||||
|
if lead.get('lead_metadata'):
|
||||||
|
try:
|
||||||
|
meta = json.loads(lead.get('lead_metadata'))
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
contact_info = {
|
||||||
|
"first_name": meta.get('contact_first', ''),
|
||||||
|
"last_name": meta.get('contact_last', lead['contact_name'].split(' ')[-1] if lead['contact_name'] else ''),
|
||||||
|
"email": lead['email'],
|
||||||
|
"job_title": meta.get('role', role), # The raw title or Gemini result
|
||||||
|
"role": meta.get('role', role) # Currently mapped to same field
|
||||||
|
}
|
||||||
|
|
||||||
|
# 3. Company Enrichment (CE Workflow with Contact)
|
||||||
|
result = handle_company_workflow(company_name, contact_info=contact_info)
|
||||||
|
|
||||||
# Bereite die Daten für die Speicherung in der DB vor
|
# Bereite die Daten für die Speicherung in der DB vor
|
||||||
enrichment_data = {
|
enrichment_data = {
|
||||||
"sync_status": result.get("status"),
|
"sync_status": result.get("status"),
|
||||||
"ce_id": result.get("data", {}).get("id") if result.get("data") else None,
|
"ce_id": result.get("data", {}).get("id") if result.get("data") else None,
|
||||||
"message": result.get("message"),
|
"message": result.get("message", "Sync successful"),
|
||||||
"ce_data": result.get("data")
|
"ce_data": result.get("data")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -111,7 +111,7 @@ def parse_tradingtwins_html(html_body):
|
|||||||
|
|
||||||
return data
|
return data
|
||||||
|
|
||||||
def process_leads():
|
def process_leads(auto_sync=True):
|
||||||
init_db()
|
init_db()
|
||||||
new_count = 0
|
new_count = 0
|
||||||
try:
|
try:
|
||||||
@@ -120,11 +120,21 @@ def process_leads():
|
|||||||
logger.info(f"Found {len(emails)} Tradingtwins emails.")
|
logger.info(f"Found {len(emails)} Tradingtwins emails.")
|
||||||
|
|
||||||
for email in emails:
|
for email in emails:
|
||||||
|
# ... (parsing logic remains same)
|
||||||
body = email.get('body', {}).get('content', '')
|
body = email.get('body', {}).get('content', '')
|
||||||
lead_data = parse_tradingtwins_html(body)
|
received_at_str = email.get('receivedDateTime')
|
||||||
|
|
||||||
# Add raw body for reference
|
# Convert ISO string to datetime object
|
||||||
|
received_at = None
|
||||||
|
if received_at_str:
|
||||||
|
try:
|
||||||
|
received_at = datetime.fromisoformat(received_at_str.replace('Z', '+00:00'))
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
lead_data = parse_tradingtwins_html(body)
|
||||||
lead_data['raw_body'] = body
|
lead_data['raw_body'] = body
|
||||||
|
lead_data['received_at'] = received_at
|
||||||
|
|
||||||
company_name = lead_data.get('company')
|
company_name = lead_data.get('company')
|
||||||
if not company_name or company_name == '-':
|
if not company_name or company_name == '-':
|
||||||
@@ -132,17 +142,18 @@ def process_leads():
|
|||||||
lead_data['company'] = company_name
|
lead_data['company'] = company_name
|
||||||
|
|
||||||
if not company_name:
|
if not company_name:
|
||||||
logger.warning(f"Skipping email {email['id']}: No company or contact name found.")
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
logger.info(f"Ingesting Lead: {company_name} (ID: {lead_data.get('id')})")
|
lead_data['id'] = lead_data.get('source_id') or f"tt_{int(datetime.now().timestamp())}"
|
||||||
|
|
||||||
# Save to local DB (status=new)
|
|
||||||
if insert_lead(lead_data):
|
if insert_lead(lead_data):
|
||||||
logger.info(f" -> Successfully saved to DB.")
|
logger.info(f" -> Ingested: {company_name}")
|
||||||
new_count += 1
|
new_count += 1
|
||||||
else:
|
|
||||||
logger.info(f" -> Lead already exists (skipped).")
|
if new_count > 0 and auto_sync:
|
||||||
|
logger.info(f"Triggering auto-sync for {new_count} new leads...")
|
||||||
|
from enrich import run_sync
|
||||||
|
run_sync()
|
||||||
|
|
||||||
return new_count
|
return new_count
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user