feat(company-explorer-integration): Implement Company Explorer Connector and Lead Engine Sync [2f988f42]\n\nIntegrates the Company Explorer API into the Lead Engine workflow, allowing for robust company existence checks, creation, and asynchronous enrichment.\n\n- Introduced `company_explorer_connector.py` as a central client wrapper for the Company Explorer API, handling find-or-create logic, discovery, polling for website data, and analysis triggers.\n- Updated the Lead Engine sync script to utilize the new connector for syncing new leads with the Company Explorer.\n- Added `test_company_explorer_connector.py` for comprehensive unit testing of the connector logic.\n- Created `trading_twins_tool.py` as a demonstration script for the end-to-end integration.\n- Updated the architecture documentation to document the new client integration pattern.\n\nThis enhances the Lead Engine with a reliable mechanism for company data enrichment.
This commit is contained in:
@@ -38,7 +38,16 @@ Das System wird in `company-explorer/` neu aufgebaut. Wir lösen Abhängigkeiten
|
|||||||
* **Identifizierte Komponente:** `company-explorer/frontend/src/components/Inspector.tsx`
|
* **Identifizierte Komponente:** `company-explorer/frontend/src/components/Inspector.tsx`
|
||||||
* **View 3: "List Matcher":** Upload einer Excel-Liste -> Anzeige von Duplikaten -> Button "Neue importieren".
|
* **View 3: "List Matcher":** Upload einer Excel-Liste -> Anzeige von Duplikaten -> Button "Neue importieren".
|
||||||
* **View 4: "Settings":** Konfiguration von Branchen, Rollen und Robotik-Logik.
|
* **View 4: "Settings":** Konfiguration von Branchen, Rollen und Robotik-Logik.
|
||||||
* **Identifizierte Komponente:** `company-explorer/frontend/src/components/RoboticsSettings.tsx`
|
* **Frontend "Settings" Komponente:** `company-explorer/frontend/src/components/RoboticsSettings.tsx`
|
||||||
|
|
||||||
|
### C. Architekturmuster für die Client-Integration
|
||||||
|
|
||||||
|
Um externen Diensten (wie der `lead-engine`) eine einfache und robuste Anbindung an den `company-explorer` zu ermöglichen, wurde ein standardisiertes Client-Connector-Muster implementiert.
|
||||||
|
|
||||||
|
| Komponente | Aufgabe & Neue Logik |
|
||||||
|
| :--- | :--- |
|
||||||
|
| **`company_explorer_connector.py`** | **NEU:** Ein zentrales Python-Skript, das als "offizieller" Client-Wrapper für die API des Company Explorers dient. Es kapselt die Komplexität der asynchronen Enrichment-Prozesse. |
|
||||||
|
| **`handle_company_workflow()`** | Die Kernfunktion des Connectors. Sie implementiert den vollständigen "Find-or-Create-and-Enrich"-Workflow: <br> 1. **Prüfen:** Stellt fest, ob ein Unternehmen bereits existiert. <br> 2. **Erstellen:** Legt das Unternehmen an, falls es neu ist. <br> 3. **Anstoßen:** Startet den asynchronen `discover`-Prozess. <br> 4. **Warten (Polling):** Überwacht den Status des Unternehmens, bis eine Website gefunden wurde. <br> 5. **Analysieren:** Startet den asynchronen `analyze`-Prozess. <br> **Vorteil:** Bietet dem aufrufenden Dienst eine einfache, quasi-synchrone Schnittstelle und stellt sicher, dass die Prozessschritte in der korrekten Reihenfolge ausgeführt werden. |
|
||||||
|
|
||||||
## 3. Umgang mit Shared Code (`helpers.py` & Co.)
|
## 3. Umgang mit Shared Code (`helpers.py` & Co.)
|
||||||
|
|
||||||
|
|||||||
154
company_explorer_connector.py
Normal file
154
company_explorer_connector.py
Normal file
@@ -0,0 +1,154 @@
|
|||||||
|
import requests
|
||||||
|
import os
|
||||||
|
import base64
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
|
||||||
|
# --- Configuration ---
# Base URL of the Company Explorer API. It can be overridden through the
# environment so the connector is not tied to one host. A trailing slash is
# stripped because every endpoint handed to _make_api_request() starts with "/".
BASE_URL = os.getenv(
    "COMPANY_EXPLORER_API_URL", "http://192.168.178.6:8090/ce/api"
).rstrip("/")
# HTTP basic-auth credentials.
# NOTE(review): the fallbacks below are hard-coded default credentials; set the
# environment variables in any real deployment.
API_USER = os.getenv("COMPANY_EXPLORER_API_USER", "admin")
API_PASSWORD = os.getenv("COMPANY_EXPLORER_API_PASSWORD", "gemini")
|
||||||
|
|
||||||
|
def _make_api_request(method, endpoint, params=None, json_data=None):
    """Send one authenticated request to the Company Explorer API.

    Args:
        method: HTTP verb handed to ``requests.request`` (e.g. ``"GET"``).
        endpoint: Path appended verbatim to ``BASE_URL``.
        params: Optional query-string parameters.
        json_data: Optional JSON request body.

    Returns:
        The decoded JSON payload, ``{}`` for a 204 or an empty body, or
        ``{"error": "<description>"}`` when the request or the decoding failed.
        Request failures are reported through that dict rather than raised.
    """
    url = f"{BASE_URL}{endpoint}"
    try:
        response = requests.request(
            method,
            url,
            auth=(API_USER, API_PASSWORD),
            params=params,
            json=json_data,
            timeout=20,
        )
        response.raise_for_status()
    except requests.exceptions.HTTPError as http_err:
        # raise_for_status() only runs after `response` is bound, so the body
        # is available here.
        return {"error": f"HTTP error occurred: {http_err} - {response.text}"}
    except requests.exceptions.ConnectionError as conn_err:
        return {"error": f"Connection error: {conn_err}."}
    except requests.exceptions.Timeout as timeout_err:
        return {"error": f"Timeout error: {timeout_err}."}
    except requests.exceptions.RequestException as req_err:
        return {"error": f"An unexpected error occurred: {req_err}"}

    if response.status_code == 204 or not response.content:
        return {}

    # Decoding gets its own guard: the decode errors raised by response.json()
    # are ValueError subclasses in every requests version, whereas recent
    # versions also make them RequestException subclasses, which previously
    # caused them to be reported as an "unexpected error".
    try:
        return response.json()
    except ValueError:
        return {"error": f"Failed to decode JSON from response: {response.text}"}
|
||||||
|
|
||||||
|
def check_company_existence(company_name: str) -> dict:
    """Check whether a company with exactly this name exists.

    Searches ``GET /companies`` and compares each returned item's ``name`` to
    *company_name*, ignoring case and surrounding whitespace.

    Returns:
        ``{"exists": True, "company": <item>}`` on an exact match,
        ``{"exists": False, "error": ...}`` when the API call failed or the
        payload has an unexpected shape, otherwise
        ``{"exists": False, "message": ...}``.
    """
    response = _make_api_request("GET", "/companies", params={"search": company_name})
    if not isinstance(response, dict):
        # A JSON list/scalar would break the key lookups below.
        return {"exists": False, "error": f"Unexpected response format: {response!r}"}
    if "error" in response:
        return {"exists": False, "error": response["error"]}

    wanted = company_name.strip().casefold()
    # `or []` / `or ""` tolerate explicit nulls in the payload.
    for company in response.get("items") or []:
        candidate = (company.get("name") or "").strip().casefold()
        if candidate == wanted:
            return {"exists": True, "company": company}

    return {"exists": False, "message": f"Company '{company_name}' not found."}
|
||||||
|
|
||||||
|
def create_company(company_name: str) -> dict:
    """Create a company via ``POST /companies``.

    The request body carries the given name and a fixed ``country`` of
    ``"DE"``. The result of ``_make_api_request`` is returned unchanged.
    """
    payload = {"name": company_name, "country": "DE"}
    return _make_api_request("POST", "/companies", json_data=payload)
|
||||||
|
|
||||||
|
def trigger_discovery(company_id: int) -> dict:
    """Start the discovery process via ``POST /enrich/discover``.

    The result of ``_make_api_request`` is returned unchanged.
    """
    payload = {"company_id": company_id}
    return _make_api_request("POST", "/enrich/discover", json_data=payload)
|
||||||
|
|
||||||
|
def trigger_analysis(company_id: int) -> dict:
    """Start the analysis process via ``POST /enrich/analyze``.

    The result of ``_make_api_request`` is returned unchanged.
    """
    payload = {"company_id": company_id}
    return _make_api_request("POST", "/enrich/analyze", json_data=payload)
|
||||||
|
|
||||||
|
def get_company_details(company_id: int) -> dict:
    """Fetch the full record of one company via ``GET /companies/<id>``.

    The result of ``_make_api_request`` is returned unchanged.
    """
    endpoint = f"/companies/{company_id}"
    return _make_api_request("GET", endpoint)
|
||||||
|
|
||||||
|
def handle_company_workflow(
    company_name: str,
    *,
    max_wait_time: float = 30,
    poll_interval: float = 3,
) -> dict:
    """Run the find-or-create-and-enrich workflow for one company.

    Steps: check existence; create the company if missing; trigger discovery;
    poll the company record until a website appears; trigger analysis; fetch
    the final record.

    Args:
        company_name: Name to look up or create.
        max_wait_time: Seconds to wait for discovery to yield a website.
        poll_interval: Seconds between two polls of the company record.

    Returns:
        A dict with ``status`` set to one of ``"found"``, ``"error"``,
        ``"created_discovery_timeout"`` or ``"created_and_enriched"``.
        Error results carry ``message``; all others carry ``data``.
        ``"created_and_enriched"`` means the analysis was queued, not that it
        has finished.
    """
    print(f"Workflow gestartet für: '{company_name}'")

    # 1. Check whether the company already exists.
    existence_check = check_company_existence(company_name)

    if existence_check.get("exists"):
        company = existence_check["company"]
        company_id = company["id"]
        print(f"Unternehmen '{company_name}' (ID: {company_id}) existiert bereits.")
        final_company_data = get_company_details(company_id)
        if "error" in final_company_data:
            # Do not hand an error dict back as company data; the search hit
            # still carries the id and name.
            print(f"Warnung: Details konnten nicht geladen werden: {final_company_data['error']}")
            final_company_data = company
        return {"status": "found", "data": final_company_data}

    if "error" in existence_check:
        print(f"Fehler bei der Existenzprüfung: {existence_check['error']}")
        return {"status": "error", "message": existence_check['error']}

    # 2. Not found: create it.
    print(f"Unternehmen '{company_name}' nicht gefunden. Erstelle es...")
    creation_response = create_company(company_name)

    if "error" in creation_response:
        print(f"Fehler bei der Erstellung: {creation_response['error']}")
        return {"status": "error", "message": creation_response['error']}

    company_id = creation_response.get("id")
    if not company_id:
        print(f"Fehler: Konnte keine ID aus der Erstellungs-Antwort extrahieren: {creation_response}")
        return {"status": "error", "message": "Failed to get company ID after creation."}

    print(f"Unternehmen '{company_name}' erfolgreich mit ID {company_id} erstellt.")

    # 3. Trigger discovery.
    print(f"Starte Discovery für ID {company_id}...")
    discovery_status = trigger_discovery(company_id)
    if "error" in discovery_status:
        print(f"Fehler beim Anstoßen der Discovery: {discovery_status['error']}")
        return {"status": "error", "message": discovery_status['error']}

    # 4. Poll until discovery has found a website.
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + max_wait_time
    website_found = False
    print(f"Warte auf Abschluss der Discovery (max. {max_wait_time}s)...")
    while time.monotonic() < deadline:
        details = get_company_details(company_id)
        website = details.get("website") if isinstance(details, dict) else None
        # "k.A." is treated as "no website", as is an empty value.
        if website and website != "k.A.":
            print(f"Website gefunden: {website}")
            website_found = True
            break
        # Never sleep past the deadline.
        time.sleep(max(0.0, min(poll_interval, deadline - time.monotonic())))
        print(".")

    if not website_found:
        print(f"Discovery hat nach {max_wait_time}s keine Website gefunden. Breche Analyse ab.")
        final_data = get_company_details(company_id)
        return {"status": "created_discovery_timeout", "data": final_data}

    # 5. Trigger analysis.
    print(f"Starte Analyse für ID {company_id}...")
    analysis_status = trigger_analysis(company_id)
    if "error" in analysis_status:
        print(f"Fehler beim Anstoßen der Analyse: {analysis_status['error']}")
        return {"status": "error", "message": analysis_status['error']}

    print("Analyse-Prozess erfolgreich in die Warteschlange gestellt.")

    # 6. Fetch and return the final record.
    final_company_data = get_company_details(company_id)
    return {"status": "created_and_enriched", "data": final_company_data}
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # Manual demo: one company expected to exist, one freshly generated name.
    existing_name = "Robo-Planet GmbH"
    fresh_name = f"Zufallsfirma {int(time.time())}"

    scenarios = (
        (f"--- Szenario 1: Test mit einem existierenden Unternehmen: '{existing_name}' ---", existing_name),
        (f"\n--- Szenario 2: Test mit einem neuen Unternehmen: '{fresh_name}' ---", fresh_name),
    )
    for headline, name in scenarios:
        print(headline)
        outcome = handle_company_workflow(name)
        print(json.dumps(outcome, indent=2, ensure_ascii=False))
|
||||||
@@ -1,235 +1,60 @@
|
|||||||
import json
|
import json
|
||||||
import requests
|
import sys
|
||||||
import os
|
import os
|
||||||
import time
|
|
||||||
from requests.auth import HTTPBasicAuth
|
|
||||||
from db import update_lead_status, get_leads, DB_PATH
|
|
||||||
import sqlite3
|
import sqlite3
|
||||||
|
|
||||||
# --- Configuration ---
|
# Füge das Hauptverzeichnis zum Python-Pfad hinzu, damit der Connector gefunden wird
|
||||||
CE_API_URL = os.getenv("CE_API_URL", "http://192.168.178.6:8090/ce/api")
|
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
||||||
CE_API_USER = os.getenv("CE_API_USER", "admin")
|
from company_explorer_connector import handle_company_workflow
|
||||||
CE_API_PASS = os.getenv("CE_API_PASS", "gemini")
|
from db import get_leads, DB_PATH
|
||||||
|
|
||||||
def get_auth():
|
def update_lead_enrichment(lead_id, data, status):
|
||||||
return HTTPBasicAuth(CE_API_USER, CE_API_PASS)
|
"""Aktualisiert einen Lead in der Datenbank mit neuen Enrichment-Daten und einem neuen Status."""
|
||||||
|
|
||||||
# 1. CORE LOGIC: Interaction with Company Explorer
|
|
||||||
|
|
||||||
def find_company(name):
    """Search the Company Explorer for *name* and return the first hit.

    Returns the first entry of the response's ``items`` list, or ``None`` when
    the status is not 200, there are no items, or the request raised (the
    exception is printed, not propagated).
    """
    try:
        reply = requests.get(
            f"{CE_API_URL}/companies",
            params={"search": name, "limit": 1},
            auth=get_auth(),
            timeout=5,
        )
        if reply.status_code != 200:
            return None
        hits = reply.json().get("items")
        if hits:
            return hits[0]  # first match only
    except Exception as e:
        print(f"❌ CE API Error (Find '{name}'): {e}")
    return None
|
|
||||||
|
|
||||||
def create_company(lead_data):
    """Create a company in the Company Explorer from lead data.

    Reads ``company_name`` (required) and ``city`` (optional) from
    *lead_data*. Returns the decoded response on HTTP 200, otherwise ``None``;
    failures are printed, not raised.
    """
    company_name = lead_data['company_name']
    body = {
        "name": company_name,
        "city": lead_data.get('city', ''),
        "country": "DE",
        "website": None,
    }
    try:
        reply = requests.post(
            f"{CE_API_URL}/companies",
            json=body,
            auth=get_auth(),
            timeout=5,
        )
        if reply.status_code != 200:
            print(f"❌ Create Failed: {reply.status_code} {reply.text}")
            return None
        return reply.json()
    except Exception as e:
        print(f"❌ CE API Error (Create '{company_name}'): {e}")
    return None
|
|
||||||
|
|
||||||
def trigger_discovery(company_id):
    """Start the automatic website search for a company in the CE.

    Returns ``True`` when the endpoint answers with HTTP 200, ``False`` on any
    other status or when the request raised (the exception is printed).
    """
    try:
        reply = requests.post(
            f"{CE_API_URL}/enrich/discover",
            json={"company_id": company_id},
            auth=get_auth(),
            timeout=5,
        )
    except Exception as e:
        print(f"❌ Trigger Discovery Failed (ID {company_id}): {e}")
        return False
    return reply.status_code == 200
|
|
||||||
|
|
||||||
def get_ce_status(company_id):
    """Fetch the current record of a company from the CE.

    Returns the decoded response on HTTP 200, otherwise ``None``; exceptions
    are printed, not raised.
    """
    try:
        reply = requests.get(
            f"{CE_API_URL}/companies/{company_id}",
            auth=get_auth(),
            timeout=5,
        )
        if reply.status_code != 200:
            return None
        return reply.json()
    except Exception as e:
        print(f"❌ Get Status Failed (ID {company_id}): {e}")
    return None
|
|
||||||
|
|
||||||
def get_pitch(company_id):
    """Build a simple pitch text from the company's ``industry_ai`` field.

    The pitch is a local template filled with the industry; there is no real
    pitch endpoint yet. Returns ``None`` when the company cannot be fetched or
    has no ``industry_ai`` value.
    """
    # TODO: Implement real endpoint once CE has it (e.g. /companies/{id}/analysis/pitch)
    company = get_ce_status(company_id)
    if not company:
        return None
    industry = company.get('industry_ai')
    if not industry:
        return None
    # Simple industry-based template.
    return f"Wir haben gesehen, dass Sie im Bereich {industry} tätig sind. Für {industry} haben wir spezielle Roboter-Lösungen..."
|
|
||||||
|
|
||||||
# 2. ORCHESTRATION: The Sync Process
|
|
||||||
|
|
||||||
def _extract_city(raw_body):
    """Return the text after the first "Stadt:" marker in *raw_body*, or ""."""
    if not isinstance(raw_body, str):
        return ""
    for line in raw_body.split('\n'):
        if "Stadt:" in line:
            return line.split("Stadt:")[1].strip()
    return ""


def process_new_lead(lead):
    """Phase 1: identify the lead's company in the CE or create it.

    Looks the company up by name. An existing company is linked, and discovery
    is triggered if its status is ``'NEW'``. Otherwise the company is created
    (with a city parsed from the lead's raw body) and discovery is triggered.
    The resulting enrichment info is stored on the lead.

    The lead only advances to ``'synced'`` when a CE id was obtained. If
    creation failed, the lead keeps its current status so a later sync cycle
    retries it instead of leaving it stuck without a ``ce_id``.
    """
    print(f"\n⚙️ [Phase 1] Syncing Lead: {lead['company_name']} ({lead['id']})")

    ce_company = find_company(lead['company_name'])

    enrichment_info = {
        "ce_status": "unknown",
        "ce_id": None,
        "message": "",
        "last_check": time.time()
    }

    if ce_company:
        print(f"✅ Company exists in CE (ID: {ce_company['id']})")
        enrichment_info["ce_status"] = "linked"  # linked, but maybe not fully enriched yet
        enrichment_info["ce_id"] = ce_company['id']
        enrichment_info["ce_data"] = ce_company
        enrichment_info["message"] = f"Matched existing account (Status: {ce_company.get('status')})."

        # An existing but still raw account gets discovery as well.
        if ce_company.get('status') == 'NEW':
            trigger_discovery(ce_company['id'])
            enrichment_info["message"] += " Triggered Discovery for existing raw account."
    else:
        print(f"✨ New Company. Creating in CE...")
        new_company = create_company({
            "company_name": lead['company_name'],
            "city": _extract_city(lead['raw_body'])
        })

        if new_company:
            print(f"✅ Created (ID: {new_company['id']}).")
            enrichment_info["ce_status"] = "created"
            enrichment_info["ce_id"] = new_company['id']

            if trigger_discovery(new_company['id']):
                enrichment_info["message"] = "Created & Discovery started."
                print("🚀 Discovery queued.")
            else:
                enrichment_info["message"] = "Created, but Discovery trigger failed."
        else:
            enrichment_info["message"] = "Failed to create in CE."

    # Save state. Without a CE id the lead's status is written back unchanged.
    next_status = 'synced' if enrichment_info["ce_id"] is not None else lead['status']
    update_lead_enrichment(lead['id'], enrichment_info, status=next_status)
|
|
||||||
|
|
||||||
def process_synced_lead(lead):
    """Phase 2: close the loop by fetching enrichment results from the CE.

    Reads the CE id from the lead's stored enrichment JSON, fetches the
    company, and, once its status is ``'ENRICHED'`` and a pitch is available,
    stores pitch and company data on the lead and sets its status to
    ``'drafted'``.
    """
    enrichment = json.loads(lead['enrichment_data'])
    ce_id = enrichment.get('ce_id')
    if not ce_id:
        return  # should not happen for a synced lead

    print(f"\n🔄 [Phase 2] Checking Enrichment for: {lead['company_name']} (CE ID: {ce_id})")

    company = get_ce_status(ce_id)
    if not company:
        return

    ce_status = company.get('status')  # NEW, DISCOVERING, ENRICHED...
    if ce_status != 'ENRICHED':
        print(f"⏳ Still processing in CE (Status: {ce_status})...")
        return

    print(f"✅ Analysis Complete! Fetching Pitch...")
    pitch = get_pitch(ce_id)
    if not pitch:
        print("⚠️ Enriched, but no pitch generated yet.")
        return

    enrichment.update({
        'pitch': pitch,
        'ce_data': company,  # refresh with the latest record
        'message': "Enrichment complete. Pitch ready.",
    })
    # 'drafted' means ready for human review.
    update_lead_enrichment(lead['id'], enrichment, status='drafted')
    print(f"🎉 Lead is ready for response!")
|
|
||||||
|
|
||||||
def update_lead_enrichment(lead_id, data, status=None):
|
|
||||||
conn = sqlite3.connect(DB_PATH)
|
conn = sqlite3.connect(DB_PATH)
|
||||||
c = conn.cursor()
|
c = conn.cursor()
|
||||||
if status:
|
c.execute('UPDATE leads SET enrichment_data = ?, status = ? WHERE id = ?',
|
||||||
c.execute('UPDATE leads SET enrichment_data = ?, status = ? WHERE id = ?',
|
(json.dumps(data), status, lead_id))
|
||||||
(json.dumps(data), status, lead_id))
|
|
||||||
else:
|
|
||||||
c.execute('UPDATE leads SET enrichment_data = ? WHERE id = ?',
|
|
||||||
(json.dumps(data), lead_id))
|
|
||||||
conn.commit()
|
conn.commit()
|
||||||
conn.close()
|
conn.close()
|
||||||
|
print(f"Lead {lead_id} aktualisiert. Neuer Status: {status}")
|
||||||
|
|
||||||
def run_sync():
|
def run_sync():
|
||||||
leads = get_leads()
|
"""
|
||||||
print(f"Scanning {len(leads)} leads...")
|
Haupt-Synchronisationsprozess.
|
||||||
|
Holt alle neuen Leads und stößt den Company Explorer Workflow für jeden an.
|
||||||
|
"""
|
||||||
|
# Hole nur die Leads, die wirklich neu sind und noch nicht verarbeitet wurden
|
||||||
|
leads_to_process = [lead for lead in get_leads() if lead['status'] == 'new']
|
||||||
|
|
||||||
count = 0
|
print(f"Found {len(leads_to_process)} new leads to sync with Company Explorer.")
|
||||||
for lead in leads:
|
|
||||||
status = lead['status']
|
if not leads_to_process:
|
||||||
|
print("No new leads to process. Sync finished.")
|
||||||
# Fallback for old/mock leads or explicit 'new' status
|
return
|
||||||
is_mock = False
|
|
||||||
if lead['enrichment_data']:
|
|
||||||
try:
|
|
||||||
enrichment = json.loads(lead['enrichment_data'])
|
|
||||||
is_mock = enrichment.get('recommendation') == 'Manual Review (Mock Data)'
|
|
||||||
except: pass
|
|
||||||
|
|
||||||
if status == 'new' or not status or is_mock:
|
for lead in leads_to_process:
|
||||||
process_new_lead(lead)
|
company_name = lead['company_name']
|
||||||
count += 1
|
print(f"\n--- Processing Lead ID: {lead['id']}, Company: '{company_name}' ---")
|
||||||
elif status == 'synced':
|
|
||||||
process_synced_lead(lead)
|
# Rufe den zentralen Workflow auf, den wir im Connector definiert haben
|
||||||
count += 1
|
# Diese Funktion kümmert sich um alles: prüfen, erstellen, discovern, pollen, analysieren
|
||||||
|
result = handle_company_workflow(company_name)
|
||||||
print(f"Sync cycle finished. Processed {count} leads.")
|
|
||||||
|
# Bereite die Daten für die Speicherung in der DB vor
|
||||||
|
enrichment_data = {
|
||||||
|
"sync_status": result.get("status"),
|
||||||
|
"ce_id": result.get("data", {}).get("id") if result.get("data") else None,
|
||||||
|
"message": result.get("message"),
|
||||||
|
"ce_data": result.get("data")
|
||||||
|
}
|
||||||
|
|
||||||
|
# Setze den finalen Status und speichere die Daten in der DB
|
||||||
|
update_lead_enrichment(lead['id'], enrichment_data, status='synced')
|
||||||
|
|
||||||
|
print("\n--- Sync cycle finished. ---")
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
# Dieser Block ermöglicht es, das Skript direkt für Debugging-Zwecke auszuführen
|
||||||
|
print("Running manual sync...")
|
||||||
run_sync()
|
run_sync()
|
||||||
|
print("Manual sync finished.")
|
||||||
|
|||||||
98
test_company_explorer_connector.py
Normal file
98
test_company_explorer_connector.py
Normal file
@@ -0,0 +1,98 @@
|
|||||||
|
import unittest
|
||||||
|
from unittest.mock import patch, MagicMock
|
||||||
|
import os
|
||||||
|
import requests
|
||||||
|
|
||||||
|
# Den Pfad anpassen, damit das Modul gefunden wird
|
||||||
|
import sys
|
||||||
|
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
|
||||||
|
|
||||||
|
from check_company_existence import check_company_existence_with_company_explorer
|
||||||
|
|
||||||
|
class TestCompanyExistenceChecker(unittest.TestCase):
    """Tests for ``check_company_existence_with_company_explorer``.

    Every test patches ``check_company_existence.requests.get`` so that no
    real HTTP request is made.
    """

    @staticmethod
    def _fake_response(status_code, payload=None):
        """Build a mock HTTP response with the given status and JSON payload."""
        fake = MagicMock()
        fake.status_code = status_code
        if payload is not None:
            fake.json.return_value = payload
        return fake

    @patch('check_company_existence.requests.get')
    def test_company_exists_exact_match(self, mock_get):
        """An exactly matching company is reported as existing."""
        mock_get.return_value = self._fake_response(
            200, {"total": 1, "items": [{"id": 123, "name": "TestCorp"}]}
        )

        result = check_company_existence_with_company_explorer("TestCorp")

        self.assertTrue(result["exists"])
        self.assertEqual(result["company_id"], 123)
        self.assertEqual(result["company_name"], "TestCorp")

    @patch('check_company_existence.requests.get')
    def test_company_does_not_exist(self, mock_get):
        """An empty search result is reported as not existing."""
        mock_get.return_value = self._fake_response(200, {"total": 0, "items": []})

        result = check_company_existence_with_company_explorer("NonExistentCorp")

        self.assertFalse(result["exists"])
        self.assertIn("not found", result["message"])

    @patch('check_company_existence.requests.get')
    def test_company_partial_match_only(self, mock_get):
        """Search hits without an exact name match count as not existing."""
        mock_get.return_value = self._fake_response(
            200, {"total": 1, "items": [{"id": 124, "name": "TestCorp Inc"}]}
        )

        result = check_company_existence_with_company_explorer("TestCorp")

        self.assertFalse(result["exists"])
        self.assertIn("not found as an exact match", result["message"])

    @patch('check_company_existence.requests.get')
    def test_http_error_handling(self, mock_get):
        """An HTTP 401 surfaces as an error result instead of an exception."""
        unauthorized = self._fake_response(401)
        unauthorized.text = "Unauthorized"
        # raise_for_status() has to raise for the error path to be taken.
        unauthorized.raise_for_status.side_effect = requests.exceptions.HTTPError(
            "401 Client Error: Unauthorized for url"
        )
        mock_get.return_value = unauthorized

        result = check_company_existence_with_company_explorer("AnyCompany")

        self.assertFalse(result["exists"])
        self.assertIn("HTTP error occurred", result["error"])

    @patch('check_company_existence.requests.get')
    def test_connection_error_handling(self, mock_get):
        """A connection failure surfaces as an error result."""
        mock_get.side_effect = requests.exceptions.ConnectionError("Connection failed")

        result = check_company_existence_with_company_explorer("AnyCompany")

        self.assertFalse(result["exists"])
        self.assertIn("Connection error occurred", result["error"])
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
    # `requests` is already imported at module level, so no re-import is
    # needed here. argv is overridden so unittest ignores the real command
    # line; exit=False makes unittest.main() return instead of calling
    # sys.exit().
    unittest.main(argv=['first-arg-is-ignored'], exit=False)
|
||||||
79
trading_twins_tool.py
Normal file
79
trading_twins_tool.py
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
import json
|
||||||
|
import time
|
||||||
|
import os
|
||||||
|
from company_explorer_connector import handle_company_workflow
|
||||||
|
|
||||||
|
def run_trading_twins_process(target_company_name: str):
    """Run the Trading Twins process for one target company.

    Calls ``handle_company_workflow`` (find, create or enrich the company in
    the Company Explorer) and prints a summary of the result. For this proof
    of concept the result is only printed, not returned.
    """
    banner = "=" * 50
    print(f"\n{banner}")
    print(f"Starte Trading Twins Analyse für: {target_company_name}")
    print(f"{banner}\n")

    company_data_result = handle_company_workflow(target_company_name)

    print("\n--- Ergebnis vom Company Explorer Connector (für Trading Twins) ---")

    status = company_data_result.get("status")
    # A result without "data" (or with a null value) must not break the
    # .get() lookups below.
    data = company_data_result.get("data") or {}

    if status == "error":
        print(f"Ein Fehler ist aufgetreten: {company_data_result.get('message')}")
    elif status == "found":
        print(f"Unternehmen gefunden. ID: {data.get('id')}, Name: {data.get('name')}")
        print(json.dumps(data, indent=2, ensure_ascii=False))
    elif status == "created_and_enriched":
        print(f"Unternehmen erstellt und Enrichment angestoßen. ID: {data.get('id')}, Name: {data.get('name')}")
        print("Hinweis: Enrichment-Prozesse laufen im Hintergrund und können einige Zeit dauern, bis alle Daten verfügbar sind.")
        print(json.dumps(data, indent=2, ensure_ascii=False))
    elif status == "created_discovery_timeout":
        print(f"Unternehmen erstellt, aber Discovery konnte keine Website finden (ID: {data.get('id')}, Name: {data.get('name')}).")
        print("Der Analyse-Prozess wurde daher nicht gestartet.")
        print(json.dumps(data, indent=2, ensure_ascii=False))
    else:
        print("Ein unerwarteter Status ist aufgetreten.")
        print(json.dumps(company_data_result, indent=2, ensure_ascii=False))

    print(f"\n{banner}")
    print(f"Trading Twins Analyse für {target_company_name} abgeschlossen.")
    print(f"{banner}\n")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # Provide fallback credentials for this demo run when none are set.
    # NOTE(review): company_explorer_connector is imported at the top of this
    # file, before these lines run; if it reads the variables at import time,
    # setting them here does not affect it. Confirm.
    os.environ.setdefault("COMPANY_EXPLORER_API_USER", "admin")
    os.environ.setdefault("COMPANY_EXPLORER_API_PASSWORD", "gemini")

    # Names are produced lazily so the timestamped ones are generated right
    # before their own run.
    target_factories = (
        # Case 1: a company that probably exists from earlier runs.
        lambda: "Robo-Planet GmbH",
        # Case 1b: a well-known, real company.
        lambda: "Klinikum Landkreis Erding",
        # Case 2: a new, unique company.
        lambda: f"Trading Twins New Target {int(time.time())}",
        # Case 3: another new company to check creation.
        lambda: f"Another Demo Corp {int(time.time())}",
    )
    for position, make_name in enumerate(target_factories):
        if position:
            # Short pause between runs.
            time.sleep(5)
        run_trading_twins_process(make_name())
|
||||||
Reference in New Issue
Block a user