diff --git a/company_explorer_connector.py b/company_explorer_connector.py index 8e8c74a3..6da60faf 100644 --- a/company_explorer_connector.py +++ b/company_explorer_connector.py @@ -64,10 +64,24 @@ def trigger_analysis(company_id: int) -> dict: def get_company_details(company_id: int) -> dict: """Holt die vollständigen Details zu einem Unternehmen.""" return _make_api_request("GET", f"/companies/{company_id}") + +def create_contact(company_id: int, contact_data: dict) -> dict: + """Erstellt einen neuen Kontakt für ein Unternehmen im Company Explorer.""" + payload = { + "company_id": company_id, + "first_name": contact_data.get("first_name"), + "last_name": contact_data.get("last_name"), + "email": contact_data.get("email"), + "job_title": contact_data.get("job_title"), + "role": contact_data.get("role"), + "is_primary": contact_data.get("is_primary", True) + } + return _make_api_request("POST", "/contacts", json_data=payload) -def handle_company_workflow(company_name: str) -> dict: +def handle_company_workflow(company_name: str, contact_info: dict = None) -> dict: """ Haupt-Workflow: Prüft, erstellt und reichert ein Unternehmen an. + Optional wird auch ein Kontakt angelegt. Gibt die finalen Unternehmensdaten zurück. """ print(f"Workflow gestartet für: '{company_name}'") @@ -75,70 +89,60 @@ def handle_company_workflow(company_name: str) -> dict: # 1. Prüfen, ob das Unternehmen existiert existence_check = check_company_existence(company_name) + company_id = None if existence_check.get("exists"): company_id = existence_check["company"]["id"] print(f"Unternehmen '{company_name}' (ID: {company_id}) existiert bereits.") - final_company_data = get_company_details(company_id) - return {"status": "found", "data": final_company_data} - - if "error" in existence_check: + elif "error" in existence_check: print(f"Fehler bei der Existenzprüfung: {existence_check['error']}") return {"status": "error", "message": existence_check['error']} + else: + # 2. Wenn nicht, Unternehmen erstellen + print(f"Unternehmen '{company_name}' nicht gefunden. Erstelle es...") + creation_response = create_company(company_name) + + if "error" in creation_response: + return {"status": "error", "message": creation_response['error']} + + company_id = creation_response.get("id") + print(f"Unternehmen '{company_name}' erfolgreich mit ID {company_id} erstellt.") - # 2. Wenn nicht, Unternehmen erstellen - print(f"Unternehmen '{company_name}' nicht gefunden. Erstelle es...") - creation_response = create_company(company_name) - - if "error" in creation_response: - print(f"Fehler bei der Erstellung: {creation_response['error']}") - return {"status": "error", "message": creation_response['error']} - - company_id = creation_response.get("id") - if not company_id: - print(f"Fehler: Konnte keine ID aus der Erstellungs-Antwort extrahieren: {creation_response}") - return {"status": "error", "message": "Failed to get company ID after creation."} - - print(f"Unternehmen '{company_name}' erfolgreich mit ID {company_id} erstellt.") + # 2b. Kontakt anlegen/aktualisieren (falls Info vorhanden) + if company_id and contact_info: + print(f"Lege Kontakt für {contact_info.get('last_name')} an...") + contact_res = create_contact(company_id, contact_info) + if "error" in contact_res: + print(f"Hinweis: Kontakt konnte nicht angelegt werden: {contact_res['error']}") - # 3. Discovery anstoßen - print(f"Starte Discovery für ID {company_id}...") - discovery_status = trigger_discovery(company_id) - if "error" in discovery_status: - print(f"Fehler beim Anstoßen der Discovery: {discovery_status['error']}") - return {"status": "error", "message": discovery_status['error']} - - # 4. Warten, bis Discovery eine Website gefunden hat (Polling) - max_wait_time = 30 - start_time = time.time() - website_found = False - print("Warte auf Abschluss der Discovery (max. 30s)...") - while time.time() - start_time < max_wait_time: - details = get_company_details(company_id) - if details.get("website") and details["website"] not in ["", "k.A."]: - print(f"Website gefunden: {details['website']}") - website_found = True - break - time.sleep(3) - print(".") - - if not website_found: - print("Discovery hat nach 30s keine Website gefunden. Breche Analyse ab.") - final_data = get_company_details(company_id) - return {"status": "created_discovery_timeout", "data": final_data} - - # 5. Analyse anstoßen - print(f"Starte Analyse für ID {company_id}...") - analysis_status = trigger_analysis(company_id) - if "error" in analysis_status: - print(f"Fehler beim Anstoßen der Analyse: {analysis_status['error']}") - return {"status": "error", "message": analysis_status['error']} - - print("Analyse-Prozess erfolgreich in die Warteschlange gestellt.") + # 3. Discovery anstoßen (falls Status NEW) + # Wir holen Details, um den Status zu prüfen + details = get_company_details(company_id) + if details.get("status") == "NEW": + print(f"Starte Discovery für ID {company_id}...") + trigger_discovery(company_id) + + # 4. Warten, bis Discovery eine Website gefunden hat (Polling) + max_wait_time = 30 + start_time = time.time() + website_found = False + print("Warte auf Abschluss der Discovery (max. 30s)...") + while time.time() - start_time < max_wait_time: + details = get_company_details(company_id) + if details.get("website") and details["website"] not in ["", "k.A."]: + print(f"Website gefunden: {details['website']}") + website_found = True + break + time.sleep(3) + print(".") + + # 5. Analyse anstoßen (falls Website da, aber noch nicht ENRICHED) + if details.get("website") and details["website"] not in ["", "k.A."] and details.get("status") != "ENRICHED": + print(f"Starte Analyse für ID {company_id}...") + trigger_analysis(company_id) # 6. Finale Daten abrufen und zurückgeben final_company_data = get_company_details(company_id) - - return {"status": "created_and_enriched", "data": final_company_data} + return {"status": "synced", "data": final_company_data} if __name__ == "__main__": diff --git a/lead-engine/enrich.py b/lead-engine/enrich.py index 4728f720..84673608 100644 --- a/lead-engine/enrich.py +++ b/lead-engine/enrich.py @@ -6,7 +6,8 @@ import sqlite3 # Füge das Hauptverzeichnis zum Python-Pfad hinzu, damit der Connector gefunden wird sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from company_explorer_connector import handle_company_workflow, get_company_details -from db import get_leads, DB_PATH +from db import get_leads, DB_PATH, update_lead_metadata +from lookup_role import lookup_person_role def update_lead_enrichment(lead_id, data, status): """Aktualisiert einen Lead in der Datenbank mit neuen Enrichment-Daten und einem neuen Status.""" @@ -26,18 +27,54 @@ def refresh_ce_data(lead_id, ce_id): print(f"Refreshing data for CE ID {ce_id}...") ce_data = get_company_details(ce_id) - # Bestehende Enrichment-Daten holen, um nichts zu überschreiben - # (Vereinfachung: Wir bauen das dict neu auf) - enrichment_data = { + # Bestehende Enrichment-Daten holen + leads = get_leads() + lead = next((l for l in leads if l['id'] == lead_id), None) + + enrichment_data = {} + if lead and lead.get('enrichment_data'): + try: + enrichment_data = json.loads(lead['enrichment_data']) + except: + pass + + enrichment_data.update({ "sync_status": "refreshed", "ce_id": ce_id, "message": "Data refreshed from CE", "ce_data": ce_data - } + }) update_lead_enrichment(lead_id, enrichment_data, status='synced') return ce_data +def enrich_contact_role(lead): + """ + Versucht, die Rolle des Kontakts via SerpAPI zu finden und speichert sie in den Metadaten. + """ + meta = {} + if lead.get('lead_metadata'): + try: + meta = json.loads(lead.get('lead_metadata')) + except: + pass + + # Skip if we already have a role (and it's not None/Unknown) + if meta.get('role') and meta.get('role') != "Unbekannt": + return meta.get('role') + + print(f"Looking up role for {lead['contact_name']} at {lead['company_name']}...") + role = lookup_person_role(lead['contact_name'], lead['company_name']) + + if role: + print(f" -> Found role: {role}") + meta['role'] = role + update_lead_metadata(lead['id'], meta) + else: + print(" -> No role found.") + + return role + def run_sync(): """ Haupt-Synchronisationsprozess. @@ -56,15 +93,33 @@ def run_sync(): company_name = lead['company_name'] print(f"\n--- Processing Lead ID: {lead['id']}, Company: '{company_name}' ---") - # Rufe den zentralen Workflow auf, den wir im Connector definiert haben - # Diese Funktion kümmert sich um alles: prüfen, erstellen, discovern, pollen, analysieren - result = handle_company_workflow(company_name) + # 1. Contact Enrichment (Role Lookup via SerpAPI) + role = enrich_contact_role(lead) + + # 2. Prepare Contact Info for CE + meta = {} + if lead.get('lead_metadata'): + try: + meta = json.loads(lead.get('lead_metadata')) + except: + pass + + contact_info = { + "first_name": meta.get('contact_first', ''), + "last_name": meta.get('contact_last', lead['contact_name'].split(' ')[-1] if lead['contact_name'] else ''), + "email": lead['email'], + "job_title": meta.get('role', role), # The raw title or Gemini result + "role": meta.get('role', role) # Currently mapped to same field + } + + # 3. Company Enrichment (CE Workflow with Contact) + result = handle_company_workflow(company_name, contact_info=contact_info) # Bereite die Daten für die Speicherung in der DB vor enrichment_data = { "sync_status": result.get("status"), "ce_id": result.get("data", {}).get("id") if result.get("data") else None, - "message": result.get("message"), + "message": result.get("message", "Sync successful"), "ce_data": result.get("data") } diff --git a/lead-engine/trading_twins_ingest.py b/lead-engine/trading_twins_ingest.py index ea581fd3..cb60107d 100644 --- a/lead-engine/trading_twins_ingest.py +++ b/lead-engine/trading_twins_ingest.py @@ -111,7 +111,7 @@ def parse_tradingtwins_html(html_body): return data -def process_leads(): +def process_leads(auto_sync=True): init_db() new_count = 0 try: @@ -120,11 +120,21 @@ def process_leads(): logger.info(f"Found {len(emails)} Tradingtwins emails.") for email in emails: + # ... (parsing logic remains same) body = email.get('body', {}).get('content', '') - lead_data = parse_tradingtwins_html(body) + received_at_str = email.get('receivedDateTime') - # Add raw body for reference + # Convert ISO string to datetime object + received_at = None + if received_at_str: + try: + received_at = datetime.fromisoformat(received_at_str.replace('Z', '+00:00')) + except: + pass + + lead_data = parse_tradingtwins_html(body) lead_data['raw_body'] = body + lead_data['received_at'] = received_at company_name = lead_data.get('company') if not company_name or company_name == '-': @@ -132,17 +142,18 @@ def process_leads(): lead_data['company'] = company_name if not company_name: - logger.warning(f"Skipping email {email['id']}: No company or contact name found.") continue - logger.info(f"Ingesting Lead: {company_name} (ID: {lead_data.get('id')})") + lead_data['id'] = lead_data.get('source_id') or f"tt_{int(datetime.now().timestamp())}" - # Save to local DB (status=new) if insert_lead(lead_data): - logger.info(f" -> Successfully saved to DB.") + logger.info(f" -> Ingested: {company_name}") new_count += 1 - else: - logger.info(f" -> Lead already exists (skipped).") + + if new_count > 0 and auto_sync: + logger.info(f"Triggering auto-sync for {new_count} new leads...") + from enrich import run_sync + run_sync() return new_count