167 lines
7.2 KiB
Python
167 lines
7.2 KiB
Python
import requests
|
|
import os
|
|
import base64
|
|
import json
|
|
import time
|
|
|
|
# --- Configuration ---
# Every setting is read from the environment; the literal next to it is the
# fallback used when the variable is not set.
# The fallback base URL points at the internal Docker service.
# NOTE(review): a default password is committed here - confirm that this is
# acceptable for every environment this module runs in.
BASE_URL = os.environ.get("COMPANY_EXPLORER_URL", "http://company-explorer:8000/api")
API_USER = os.environ.get("COMPANY_EXPLORER_API_USER", "admin")
API_PASSWORD = os.environ.get("COMPANY_EXPLORER_API_PASSWORD", "gemini")
|
|
|
|
def _make_api_request(method, endpoint, params=None, json_data=None):
    """Send one HTTP request to the API and return the decoded response body.

    Args:
        method: HTTP verb handed to ``requests.request`` (e.g. "GET", "POST").
        endpoint: Path appended to ``BASE_URL`` (e.g. "/companies").
        params: Optional query-string parameters.
        json_data: Optional object sent as the JSON request body.

    Returns:
        The decoded JSON body on success, ``{}`` for a 204 or empty response,
        or ``{"error": "<description>"}`` when the request fails or the body
        is not valid JSON. Exceptions from ``requests`` are caught and
        reported through that error dict instead of being raised.
    """
    # Join with exactly one slash, however BASE_URL and endpoint are written.
    url = f"{BASE_URL.rstrip('/')}/{endpoint.lstrip('/')}"

    try:
        # Auth is technically handled by Nginx; when talking to port 8000
        # directly the backend might not enforce it, but it is sent anyway.
        response = requests.request(
            method,
            url,
            auth=(API_USER, API_PASSWORD),
            params=params,
            json=json_data,
            timeout=20,
        )
        response.raise_for_status()
    except requests.exceptions.HTTPError as http_err:
        # Read the body from the exception itself so this handler never
        # depends on the local ``response`` being bound.
        failed_response = http_err.response
        body_text = failed_response.text if failed_response is not None else ""
        return {"error": f"HTTP error occurred: {http_err} - {body_text}"}
    except requests.exceptions.ConnectionError as conn_err:
        return {"error": f"Connection error: {conn_err}."}
    except requests.exceptions.Timeout as timeout_err:
        return {"error": f"Timeout error: {timeout_err}."}
    except requests.exceptions.RequestException as req_err:
        return {"error": f"An unexpected error occurred: {req_err}"}

    if response.status_code == 204 or not response.content:
        return {}

    # Decoding is handled outside the request try-block: newer versions of
    # requests raise a JSON error that is also a RequestException, which the
    # generic handler above would otherwise swallow with the wrong message.
    # Every JSON decode error involved here is a ValueError subclass.
    try:
        return response.json()
    except ValueError:
        return {"error": f"Failed to decode JSON from response: {response.text}"}
|
|
|
|
def check_company_existence(company_name: str) -> dict:
    """Check whether a company with exactly this name exists.

    The API is searched with ``company_name``; a hit only counts when an
    item's ``name`` equals ``company_name`` ignoring case.

    Args:
        company_name: Name to look for.

    Returns:
        ``{"exists": True, "company": <item>}`` for the first matching item,
        ``{"exists": False, "error": <description>}`` when the API call
        failed, or ``{"exists": False, "message": <text>}`` when nothing
        matched.
    """
    response = _make_api_request("GET", "/companies", params={"search": company_name})
    if "error" in response:
        return {"exists": False, "error": response["error"]}

    if response.get("total", 0) > 0:
        wanted_name = company_name.lower()
        for company in response.get("items", []):
            # ``name`` can be present but null; treat that as an empty name
            # instead of failing on ``None.lower()``.
            if (company.get("name") or "").lower() == wanted_name:
                return {"exists": True, "company": company}

    return {"exists": False, "message": f"Company '{company_name}' not found."}
|
|
|
|
def create_company(company_name: str) -> dict:
    """Create a new company via ``POST /companies``.

    The country is always sent as "DE". Returns the result of
    ``_make_api_request`` unchanged.
    """
    new_company = {"name": company_name, "country": "DE"}
    return _make_api_request(method="POST", endpoint="/companies", json_data=new_company)
|
|
|
|
def trigger_discovery(company_id: int) -> dict:
    """Start the discovery process for a company via ``POST /enrich/discover``.

    Returns the result of ``_make_api_request`` unchanged.
    """
    request_body = {"company_id": company_id}
    return _make_api_request(
        method="POST",
        endpoint="/enrich/discover",
        json_data=request_body,
    )
|
|
|
|
def trigger_analysis(company_id: int) -> dict:
    """Start the analysis process for a company via ``POST /enrich/analyze``.

    Returns the result of ``_make_api_request`` unchanged.
    """
    request_body = {"company_id": company_id}
    return _make_api_request(
        method="POST",
        endpoint="/enrich/analyze",
        json_data=request_body,
    )
|
|
|
|
def get_company_details(company_id: int) -> dict:
    """Fetch the full details of one company via ``GET /companies/<id>``.

    Returns the result of ``_make_api_request`` unchanged.
    """
    detail_path = "/companies/{}".format(company_id)
    return _make_api_request(method="GET", endpoint=detail_path)
|
|
|
|
def create_contact(company_id: int, contact_data: dict) -> dict:
    """Create a new contact for a company via ``POST /contacts``.

    Args:
        company_id: Id of the company the contact is attached to.
        contact_data: Source dict; the keys ``first_name``, ``last_name``,
            ``email``, ``job_title`` and ``role`` are copied (``None`` when
            absent) and ``is_primary`` defaults to ``True``.

    Returns:
        The result of ``_make_api_request`` unchanged.
    """
    copied_fields = ("first_name", "last_name", "email", "job_title", "role")

    body = {"company_id": company_id}
    for field_name in copied_fields:
        body[field_name] = contact_data.get(field_name)
    body["is_primary"] = contact_data.get("is_primary", True)

    return _make_api_request(method="POST", endpoint="/contacts", json_data=body)
|
|
|
|
def handle_company_workflow(company_name: str, contact_info: dict = None) -> dict:
    """Run the main workflow: look up, create if needed, and enrich a company.

    Steps, in order:
      1. Check whether the company already exists.
      2. Create it when it does not; optionally create a contact.
      3. Trigger discovery when the company's status is "NEW".
      4. Poll the company details for up to 30 seconds until a website shows up.
      5. Trigger analysis when a website is known and the status is not
         "ENRICHED".
      6. Fetch and return the final company details.

    Args:
        company_name: Name of the company to process.
        contact_info: Optional contact fields forwarded to ``create_contact``.
            A failure to create the contact is only printed, not fatal.

    Returns:
        ``{"status": "synced", "data": <company details>}`` on success, or
        ``{"status": "error", "message": <description>}`` when the existence
        check, the creation, or the final detail fetch fails, or when the
        creation response carries no id.
    """

    def _has_website(record):
        # An empty string and "k.A." are both treated as "no website"
        # ("k.A." is presumably a placeholder for "not specified" - confirm).
        website = record.get("website")
        return bool(website) and website not in ["", "k.A."]

    print(f"Workflow gestartet für: '{company_name}'")

    # 1. Check whether the company exists.
    existence_check = check_company_existence(company_name)

    company_id = None
    if existence_check.get("exists"):
        company_id = existence_check["company"]["id"]
        print(f"Unternehmen '{company_name}' (ID: {company_id}) existiert bereits.")
    elif "error" in existence_check:
        print(f"Fehler bei der Existenzprüfung: {existence_check['error']}")
        return {"status": "error", "message": existence_check['error']}
    else:
        # 2. Not found: create the company.
        print(f"Unternehmen '{company_name}' nicht gefunden. Erstelle es...")
        creation_response = create_company(company_name)

        if "error" in creation_response:
            return {"status": "error", "message": creation_response['error']}

        company_id = creation_response.get("id")
        if company_id is None:
            # Without an id every following call would target /companies/None.
            return {
                "status": "error",
                "message": f"Company '{company_name}' was created but the response contained no id.",
            }
        print(f"Unternehmen '{company_name}' erfolgreich mit ID {company_id} erstellt.")

    # 2b. Create the contact when contact information was supplied.
    if company_id and contact_info:
        print(f"Lege Kontakt für {contact_info.get('last_name')} an...")
        contact_res = create_contact(company_id, contact_info)
        if "error" in contact_res:
            print(f"Hinweis: Kontakt konnte nicht angelegt werden: {contact_res['error']}")

    # 3. Trigger discovery when the status is NEW (details are fetched to
    #    read the current status).
    details = get_company_details(company_id)
    if details.get("status") == "NEW":
        print(f"Starte Discovery für ID {company_id}...")
        discovery_res = trigger_discovery(company_id)
        if "error" in discovery_res:
            print(f"Hinweis: Discovery konnte nicht gestartet werden: {discovery_res['error']}")

    # 4. Poll until discovery has produced a website or the time budget is
    #    used up. A monotonic clock keeps the budget immune to wall-clock jumps.
    max_wait_time = 30
    poll_interval = 3
    start_time = time.monotonic()
    print("Warte auf Abschluss der Discovery (max. 30s)...")
    while time.monotonic() - start_time < max_wait_time:
        details = get_company_details(company_id)
        if _has_website(details):
            print(f"Website gefunden: {details['website']}")
            break
        time.sleep(poll_interval)
        print(".")

    # 5. Trigger analysis when a website is known but the company is not
    #    ENRICHED yet.
    if _has_website(details) and details.get("status") != "ENRICHED":
        print(f"Starte Analyse für ID {company_id}...")
        analysis_res = trigger_analysis(company_id)
        if "error" in analysis_res:
            print(f"Hinweis: Analyse konnte nicht gestartet werden: {analysis_res['error']}")

    # 6. Fetch the final data; a failed fetch must not be reported as synced.
    final_company_data = get_company_details(company_id)
    if "error" in final_company_data:
        return {"status": "error", "message": final_company_data["error"]}
    return {"status": "synced", "data": final_company_data}
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: one company that is expected to exist already and
    # one freshly generated name that should not exist yet.
    test_company_existing = "Robo-Planet GmbH"
    test_company_new = f"Zufallsfirma {int(time.time())}"

    scenarios = (
        (
            f"--- Szenario 1: Test mit einem existierenden Unternehmen: '{test_company_existing}' ---",
            test_company_existing,
        ),
        (
            f"\n--- Szenario 2: Test mit einem neuen Unternehmen: '{test_company_new}' ---",
            test_company_new,
        ),
    )

    for banner, scenario_company in scenarios:
        print(banner)
        outcome = handle_company_workflow(scenario_company)
        print(json.dumps(outcome, indent=2, ensure_ascii=False))
|