# Files
# Brancheneinstufung2/company_explorer_connector.py
#
# 155 lines
# 6.4 KiB
# Python

import requests
import os
import base64
import json
import time
# --- Configuration ---
# Connection settings for the Company Explorer API. Each value can be overridden
# through an environment variable; the literals below are only fallbacks.
# A trailing "/" is stripped from BASE_URL because every endpoint string passed
# to _make_api_request in this module already starts with "/".
BASE_URL = os.getenv("COMPANY_EXPLORER_BASE_URL", "http://192.168.178.6:8090/ce/api").rstrip("/")
API_USER = os.getenv("COMPANY_EXPLORER_API_USER", "admin")
# NOTE(review): a fallback password committed to source is a credential-leak
# risk; prefer always setting COMPANY_EXPLORER_API_PASSWORD in the environment.
API_PASSWORD = os.getenv("COMPANY_EXPLORER_API_PASSWORD", "gemini")
def _make_api_request(method, endpoint, params=None, json_data=None, timeout=20):
    """Send one authenticated request to the Company Explorer API.

    Central helper used by the public functions of this module. Failures are
    never raised to the caller; instead a dict with a single ``"error"`` key
    describing the problem is returned.

    Args:
        method: HTTP method, e.g. ``"GET"`` or ``"POST"``.
        endpoint: Path appended verbatim to ``BASE_URL``.
        params: Optional query-string parameters.
        json_data: Optional request body, sent as JSON.
        timeout: Request timeout in seconds (default 20, the previous
            hard-coded value).

    Returns:
        The decoded JSON body; ``{}`` for a 204 response or an empty body; or
        ``{"error": "<message>"}`` on HTTP, network, timeout or decode errors.
    """
    url = f"{BASE_URL}{endpoint}"
    try:
        response = requests.request(
            method,
            url,
            auth=(API_USER, API_PASSWORD),
            params=params,
            json=json_data,
            timeout=timeout,
        )
        response.raise_for_status()
    except requests.exceptions.HTTPError as http_err:
        # `response` is bound here: HTTPError comes from raise_for_status().
        return {"error": f"HTTP error occurred: {http_err} - {response.text}"}
    except requests.exceptions.ConnectionError as conn_err:
        return {"error": f"Connection error: {conn_err}."}
    except requests.exceptions.Timeout as timeout_err:
        return {"error": f"Timeout error: {timeout_err}."}
    except requests.exceptions.RequestException as req_err:
        return {"error": f"An unexpected error occurred: {req_err}"}

    # Nothing to decode for "204 No Content" or any otherwise empty body.
    if response.status_code == 204 or not response.content:
        return {}
    try:
        return response.json()
    except ValueError:
        # Decoding is kept out of the RequestException handler on purpose: since
        # requests 2.27, response.json() raises requests.exceptions.JSONDecodeError,
        # which subclasses RequestException *and* ValueError, so the old ordering
        # reported it as "unexpected". ValueError also covers json/simplejson
        # decode errors raised by older requests versions.
        return {"error": f"Failed to decode JSON from response: {response.text}"}
def check_company_existence(company_name: str) -> dict:
    """Check whether a company with exactly this name already exists.

    Searches ``/companies`` for ``company_name`` and scans the returned
    ``items`` for a case-insensitive exact name match.

    Returns:
        ``{"exists": True, "company": <item>}`` on a match;
        ``{"exists": False, "error": <message>}`` if the request failed;
        ``{"exists": False, "message": ...}`` otherwise.
    """
    response = _make_api_request("GET", "/companies", params={"search": company_name})
    if "error" in response:
        return {"exists": False, "error": response["error"]}

    # casefold() (not lower()) so that e.g. "Straße" and "STRASSE" compare equal.
    wanted = company_name.casefold()
    # `or []` / `or ""` guard against JSON nulls ("items": null, "name": null),
    # which previously raised TypeError / AttributeError. Scanning the items
    # directly also avoids depending on a separate "total" field.
    for company in response.get("items") or []:
        if (company.get("name") or "").casefold() == wanted:
            return {"exists": True, "company": company}
    return {"exists": False, "message": f"Company '{company_name}' not found."}
def create_company(company_name: str, country: str = "DE") -> dict:
    """Create a new company record via ``POST /companies``.

    Args:
        company_name: Name of the company to create.
        country: Country value sent with the record. Defaults to ``"DE"``, the
            previously hard-coded value, so existing callers are unaffected.

    Returns:
        The API's decoded JSON response (presumably the created record,
        including its ``"id"`` — confirm against the API), or
        ``{"error": ...}`` on failure.
    """
    payload = {"name": company_name, "country": country}
    return _make_api_request("POST", "/companies", json_data=payload)
def trigger_discovery(company_id: int) -> dict:
    """Ask the API to start the discovery job for one company.

    Returns the decoded API response, or a dict with an ``"error"`` key if
    the request failed.
    """
    payload = {"company_id": company_id}
    endpoint = "/enrich/discover"
    return _make_api_request("POST", endpoint, json_data=payload)
def trigger_analysis(company_id: int) -> dict:
    """Ask the API to start the analysis job for one company.

    Returns the decoded API response, or a dict with an ``"error"`` key if
    the request failed.
    """
    payload = {"company_id": company_id}
    endpoint = "/enrich/analyze"
    return _make_api_request("POST", endpoint, json_data=payload)
def get_company_details(company_id: int) -> dict:
    """Fetch the full record of one company from ``/companies/<id>``.

    Returns the decoded API response, or a dict with an ``"error"`` key if
    the request failed.
    """
    endpoint = f"/companies/{company_id}"
    return _make_api_request("GET", endpoint)
def _website_from_details(details: dict):
    """Return the website from a company-details payload, or None if missing or the "k.A." placeholder."""
    website = details.get("website")
    if website and website not in ("", "k.A."):
        return website
    return None


def _wait_for_website(company_id: int, max_wait_time: float, poll_interval: float):
    """Poll the company details until a website appears; return it, or None on timeout."""
    # monotonic() is immune to wall-clock adjustments, unlike time.time().
    deadline = time.monotonic() + max_wait_time
    while True:
        website = _website_from_details(get_company_details(company_id))
        if website:
            return website
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        # Never sleep past the deadline; the loop then re-checks once more, so a
        # website set during the final interval is not reported as a timeout.
        time.sleep(min(poll_interval, remaining))
        print(".")


def handle_company_workflow(company_name: str, max_wait_time: float = 30, poll_interval: float = 3) -> dict:
    """Main workflow: look up a company, or create and enrich it.

    Steps:
      1. Search for an exact name match; if found, return its details.
      2. Otherwise create the company.
      3. Trigger discovery.
      4. Poll the details (every ``poll_interval`` seconds, for at most
         ``max_wait_time`` seconds) until discovery has set a website.
      5. Trigger analysis (only queued; the function does not wait for it).
      6. Return the latest details.

    Args:
        company_name: Name to look up / create.
        max_wait_time: Seconds to wait for discovery to find a website
            (default 30).
        poll_interval: Seconds between polls (default 3).

    Returns:
        ``{"status": <status>, ...}``. Status ``"found"``,
        ``"created_discovery_timeout"`` or ``"created_and_enriched"`` comes with
        ``"data"`` (the company details). Status ``"error"`` comes with
        ``"message"``.
    """
    print(f"Workflow gestartet für: '{company_name}'")

    # 1. Check whether the company already exists.
    existence_check = check_company_existence(company_name)
    if "error" in existence_check:
        print(f"Fehler bei der Existenzprüfung: {existence_check['error']}")
        return {"status": "error", "message": existence_check['error']}
    if existence_check.get("exists"):
        company_id = existence_check["company"]["id"]
        print(f"Unternehmen '{company_name}' (ID: {company_id}) existiert bereits.")
        final_company_data = get_company_details(company_id)
        # Do not report "found" with an error payload as its data.
        if "error" in final_company_data:
            print(f"Fehler beim Abrufen der Details: {final_company_data['error']}")
            return {"status": "error", "message": final_company_data['error']}
        return {"status": "found", "data": final_company_data}

    # 2. Not found: create the company.
    print(f"Unternehmen '{company_name}' nicht gefunden. Erstelle es...")
    creation_response = create_company(company_name)
    if "error" in creation_response:
        print(f"Fehler bei der Erstellung: {creation_response['error']}")
        return {"status": "error", "message": creation_response['error']}
    company_id = creation_response.get("id")
    if not company_id:
        print(f"Fehler: Konnte keine ID aus der Erstellungs-Antwort extrahieren: {creation_response}")
        return {"status": "error", "message": "Failed to get company ID after creation."}
    print(f"Unternehmen '{company_name}' erfolgreich mit ID {company_id} erstellt.")

    # 3. Trigger discovery.
    print(f"Starte Discovery für ID {company_id}...")
    discovery_status = trigger_discovery(company_id)
    if "error" in discovery_status:
        print(f"Fehler beim Anstoßen der Discovery: {discovery_status['error']}")
        return {"status": "error", "message": discovery_status['error']}

    # 4. Wait until discovery has found a website (polling).
    print(f"Warte auf Abschluss der Discovery (max. {max_wait_time}s)...")
    website = _wait_for_website(company_id, max_wait_time, poll_interval)
    if website is None:
        print(f"Discovery hat nach {max_wait_time}s keine Website gefunden. Breche Analyse ab.")
        final_data = get_company_details(company_id)
        return {"status": "created_discovery_timeout", "data": final_data}
    print(f"Website gefunden: {website}")

    # 5. Trigger analysis.
    print(f"Starte Analyse für ID {company_id}...")
    analysis_status = trigger_analysis(company_id)
    if "error" in analysis_status:
        print(f"Fehler beim Anstoßen der Analyse: {analysis_status['error']}")
        return {"status": "error", "message": analysis_status['error']}
    print("Analyse-Prozess erfolgreich in die Warteschlange gestellt.")

    # 6. Fetch and return the latest data.
    final_company_data = get_company_details(company_id)
    return {"status": "created_and_enriched", "data": final_company_data}
if __name__ == "__main__":
    # Manual smoke test against the live API: one company that should already
    # exist, plus a timestamp-suffixed name that forces the create/enrich path.
    test_company_existing = "Robo-Planet GmbH"
    test_company_new = f"Zufallsfirma {int(time.time())}"
    scenarios = (
        (f"--- Szenario 1: Test mit einem existierenden Unternehmen: '{test_company_existing}' ---", test_company_existing),
        (f"\n--- Szenario 2: Test mit einem neuen Unternehmen: '{test_company_new}' ---", test_company_new),
    )
    for header, scenario_name in scenarios:
        print(header)
        outcome = handle_company_workflow(scenario_name)
        print(json.dumps(outcome, indent=2, ensure_ascii=False))