From e4bf1094585c3c5d650dec98f999803f0f07f452 Mon Sep 17 00:00:00 2001 From: Floke Date: Sun, 1 Feb 2026 19:55:12 +0000 Subject: [PATCH] feat(company-explorer-integration): Implement Company Explorer Connector and Lead Engine Sync [2f988f42]\n\nIntegrates the Company Explorer API into the Lead Engine workflow, allowing for robust company existence checks, creation, and asynchronous enrichment.\n\n- Introduced as a central client wrapper for the Company Explorer API, handling find-or-create logic, discovery, polling for website data, and analysis triggers.\n- Updated to utilize the new connector for syncing new leads with the Company Explorer.\n- Added for comprehensive unit testing of the connector logic.\n- Created as a demonstration script for the end-to-end integration.\n- Updated to document the new client integration architecture pattern.\n\nThis enhances the Lead Engine with a reliable mechanism for company data enrichment. --- MIGRATION_PLAN.md | 11 +- company_explorer_connector.py | 154 +++++++++++++++++ lead-engine/enrich.py | 263 +++++------------------------ test_company_explorer_connector.py | 98 +++++++++++ trading_twins_tool.py | 79 +++++++++ 5 files changed, 385 insertions(+), 220 deletions(-) create mode 100644 company_explorer_connector.py create mode 100644 test_company_explorer_connector.py create mode 100644 trading_twins_tool.py diff --git a/MIGRATION_PLAN.md b/MIGRATION_PLAN.md index 8f0f08f6..0da6a432 100644 --- a/MIGRATION_PLAN.md +++ b/MIGRATION_PLAN.md @@ -38,7 +38,16 @@ Das System wird in `company-explorer/` neu aufgebaut. Wir lösen Abhängigkeiten * **Identifizierte Komponente:** `company-explorer/frontend/src/components/Inspector.tsx` * **View 3: "List Matcher":** Upload einer Excel-Liste -> Anzeige von Duplikaten -> Button "Neue importieren". * **View 4: "Settings":** Konfiguration von Branchen, Rollen und Robotik-Logik. - * **Identifizierte Komponente:** `company-explorer/frontend/src/components/RoboticsSettings.tsx` + * **Frontend "Settings" Komponente:** `company-explorer/frontend/src/components/RoboticsSettings.tsx` + +### C. Architekturmuster für die Client-Integration + +Um externen Diensten (wie der `lead-engine`) eine einfache und robuste Anbindung an den `company-explorer` zu ermöglichen, wurde ein standardisiertes Client-Connector-Muster implementiert. + +| Komponente | Aufgabe & Neue Logik | +| :--- | :--- | +| **`company_explorer_connector.py`** | **NEU:** Ein zentrales Python-Skript, das als "offizieller" Client-Wrapper für die API des Company Explorers dient. Es kapselt die Komplexität der asynchronen Enrichment-Prozesse. | +| **`handle_company_workflow()`** | Die Kernfunktion des Connectors. Sie implementiert den vollständigen "Find-or-Create-and-Enrich"-Workflow:
1. **Prüfen:** Stellt fest, ob ein Unternehmen bereits existiert.
2. **Erstellen:** Legt das Unternehmen an, falls es neu ist.
3. **Anstoßen:** Startet den asynchronen `discover`-Prozess.
4. **Warten (Polling):** Überwacht den Status des Unternehmens, bis eine Website gefunden wurde.
5. **Analysieren:** Startet den asynchronen `analyze`-Prozess.
**Vorteil:** Bietet dem aufrufenden Dienst eine einfache, quasi-synchrone Schnittstelle und stellt sicher, dass die Prozessschritte in der korrekten Reihenfolge ausgeführt werden. | ## 3. Umgang mit Shared Code (`helpers.py` & Co.) diff --git a/company_explorer_connector.py b/company_explorer_connector.py new file mode 100644 index 00000000..8e8c74a3 --- /dev/null +++ b/company_explorer_connector.py @@ -0,0 +1,154 @@ +import requests +import os +import base64 +import json +import time + +# --- Konfiguration --- +BASE_URL = "http://192.168.178.6:8090/ce/api" +API_USER = os.getenv("COMPANY_EXPLORER_API_USER", "admin") +API_PASSWORD = os.getenv("COMPANY_EXPLORER_API_PASSWORD", "gemini") + +def _make_api_request(method, endpoint, params=None, json_data=None): + """Eine zentrale Hilfsfunktion für API-Anfragen.""" + url = f"{BASE_URL}{endpoint}" + try: + response = requests.request( + method, + url, + auth=(API_USER, API_PASSWORD), + params=params, + json=json_data, + timeout=20 + ) + response.raise_for_status() + if response.status_code == 204 or not response.content: + return {} + return response.json() + except requests.exceptions.HTTPError as http_err: + return {"error": f"HTTP error occurred: {http_err} - {response.text}"} + except requests.exceptions.ConnectionError as conn_err: + return {"error": f"Connection error: {conn_err}."} + except requests.exceptions.Timeout as timeout_err: + return {"error": f"Timeout error: {timeout_err}."} + except requests.exceptions.RequestException as req_err: + return {"error": f"An unexpected error occurred: {req_err}"} + except json.JSONDecodeError: + return {"error": f"Failed to decode JSON from response: {response.text}"} + +def check_company_existence(company_name: str) -> dict: + """Prüft die Existenz eines Unternehmens.""" + response = _make_api_request("GET", "/companies", params={"search": company_name}) + if "error" in response: + return {"exists": False, "error": response["error"]} + + if response.get("total", 0) > 0: + for company in response.get("items", []): + if company.get("name", "").lower() == company_name.lower(): + return {"exists": True, "company": company} + + return {"exists": False, "message": f"Company '{company_name}' not found."} + +def create_company(company_name: str) -> dict: + """Erstellt ein neues Unternehmen.""" + return _make_api_request("POST", "/companies", json_data={"name": company_name, "country": "DE"}) + +def trigger_discovery(company_id: int) -> dict: + """Startet den Discovery-Prozess.""" + return _make_api_request("POST", "/enrich/discover", json_data={"company_id": company_id}) + +def trigger_analysis(company_id: int) -> dict: + """Startet den Analyse-Prozess.""" + return _make_api_request("POST", "/enrich/analyze", json_data={"company_id": company_id}) + +def get_company_details(company_id: int) -> dict: + """Holt die vollständigen Details zu einem Unternehmen.""" + return _make_api_request("GET", f"/companies/{company_id}") + +def handle_company_workflow(company_name: str) -> dict: + """ + Haupt-Workflow: Prüft, erstellt und reichert ein Unternehmen an. + Gibt die finalen Unternehmensdaten zurück. + """ + print(f"Workflow gestartet für: '{company_name}'") + + # 1. Prüfen, ob das Unternehmen existiert + existence_check = check_company_existence(company_name) + + if existence_check.get("exists"): + company_id = existence_check["company"]["id"] + print(f"Unternehmen '{company_name}' (ID: {company_id}) existiert bereits.") + final_company_data = get_company_details(company_id) + return {"status": "found", "data": final_company_data} + + if "error" in existence_check: + print(f"Fehler bei der Existenzprüfung: {existence_check['error']}") + return {"status": "error", "message": existence_check['error']} + + # 2. Wenn nicht, Unternehmen erstellen + print(f"Unternehmen '{company_name}' nicht gefunden. Erstelle es...") + creation_response = create_company(company_name) + + if "error" in creation_response: + print(f"Fehler bei der Erstellung: {creation_response['error']}") + return {"status": "error", "message": creation_response['error']} + + company_id = creation_response.get("id") + if not company_id: + print(f"Fehler: Konnte keine ID aus der Erstellungs-Antwort extrahieren: {creation_response}") + return {"status": "error", "message": "Failed to get company ID after creation."} + + print(f"Unternehmen '{company_name}' erfolgreich mit ID {company_id} erstellt.") + + # 3. Discovery anstoßen + print(f"Starte Discovery für ID {company_id}...") + discovery_status = trigger_discovery(company_id) + if "error" in discovery_status: + print(f"Fehler beim Anstoßen der Discovery: {discovery_status['error']}") + return {"status": "error", "message": discovery_status['error']} + + # 4. Warten, bis Discovery eine Website gefunden hat (Polling) + max_wait_time = 30 + start_time = time.time() + website_found = False + print("Warte auf Abschluss der Discovery (max. 30s)...") + while time.time() - start_time < max_wait_time: + details = get_company_details(company_id) + if details.get("website") and details["website"] not in ["", "k.A."]: + print(f"Website gefunden: {details['website']}") + website_found = True + break + time.sleep(3) + print(".") + + if not website_found: + print("Discovery hat nach 30s keine Website gefunden. Breche Analyse ab.") + final_data = get_company_details(company_id) + return {"status": "created_discovery_timeout", "data": final_data} + + # 5. Analyse anstoßen + print(f"Starte Analyse für ID {company_id}...") + analysis_status = trigger_analysis(company_id) + if "error" in analysis_status: + print(f"Fehler beim Anstoßen der Analyse: {analysis_status['error']}") + return {"status": "error", "message": analysis_status['error']} + + print("Analyse-Prozess erfolgreich in die Warteschlange gestellt.") + + # 6. Finale Daten abrufen und zurückgeben + final_company_data = get_company_details(company_id) + + return {"status": "created_and_enriched", "data": final_company_data} + + +if __name__ == "__main__": + test_company_existing = "Robo-Planet GmbH" + test_company_new = f"Zufallsfirma {int(time.time())}" + + print(f"--- Szenario 1: Test mit einem existierenden Unternehmen: '{test_company_existing}' ---") + result_existing = handle_company_workflow(test_company_existing) + print(json.dumps(result_existing, indent=2, ensure_ascii=False)) + + print(f"\n--- Szenario 2: Test mit einem neuen Unternehmen: '{test_company_new}' ---") + result_new = handle_company_workflow(test_company_new) + print(json.dumps(result_new, indent=2, ensure_ascii=False)) diff --git a/lead-engine/enrich.py b/lead-engine/enrich.py index fff920a0..015af4fc 100644 --- a/lead-engine/enrich.py +++ b/lead-engine/enrich.py @@ -1,235 +1,60 @@ import json -import requests +import sys import os -import time -from requests.auth import HTTPBasicAuth -from db import update_lead_status, get_leads, DB_PATH import sqlite3 -# --- Configuration --- -CE_API_URL = os.getenv("CE_API_URL", "http://192.168.178.6:8090/ce/api") -CE_API_USER = os.getenv("CE_API_USER", "admin") -CE_API_PASS = os.getenv("CE_API_PASS", "gemini") +# Füge das Hauptverzeichnis zum Python-Pfad hinzu, damit der Connector gefunden wird +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) +from company_explorer_connector import handle_company_workflow +from db import get_leads, DB_PATH -def get_auth(): - return HTTPBasicAuth(CE_API_USER, CE_API_PASS) - -# 1. CORE LOGIC: Interaction with Company Explorer - -def find_company(name): - """Prüft, ob Firma im CE existiert.""" - try: - res = requests.get( - f"{CE_API_URL}/companies", - params={"search": name, "limit": 1}, - auth=get_auth(), - timeout=5 - ) - if res.status_code == 200: - data = res.json() - if data.get("items"): - return data["items"][0] # Return first match - except Exception as e: - print(f"❌ CE API Error (Find '{name}'): {e}") - return None - -def create_company(lead_data): - """Legt Firma im CE an.""" - payload = { - "name": lead_data['company_name'], - "city": lead_data.get('city', ''), - "country": "DE", - "website": None - } - try: - res = requests.post( - f"{CE_API_URL}/companies", - json=payload, - auth=get_auth(), - timeout=5 - ) - if res.status_code == 200: - return res.json() - else: - print(f"❌ Create Failed: {res.status_code} {res.text}") - except Exception as e: - print(f"❌ CE API Error (Create '{lead_data['company_name']}'): {e}") - return None - -def trigger_discovery(company_id): - """Startet die automatische Webseiten-Suche im CE.""" - try: - res = requests.post( - f"{CE_API_URL}/enrich/discover", - json={"company_id": company_id}, - auth=get_auth(), - timeout=5 - ) - return res.status_code == 200 - except Exception as e: - print(f"❌ Trigger Discovery Failed (ID {company_id}): {e}") - return False - -def get_ce_status(company_id): - """Holt den aktuellen Status einer Firma aus dem CE.""" - try: - res = requests.get( - f"{CE_API_URL}/companies/{company_id}", - auth=get_auth(), - timeout=5 - ) - if res.status_code == 200: - return res.json() - except Exception as e: - print(f"❌ Get Status Failed (ID {company_id}): {e}") - return None - -def get_pitch(company_id): - """ - Versucht, einen generierten Pitch abzurufen. - (Hier simulieren wir es erst mal, oder nutzen ein Feld aus der Company, falls CE das schon bietet) - """ - # TODO: Implement real endpoint once CE has it (e.g. /companies/{id}/analysis/pitch) - # For now, we fetch the company and look for 'industry_ai' or similar fields to construct a simple pitch locally - company = get_ce_status(company_id) - if company and company.get('industry_ai'): - industry = company['industry_ai'] - # Simple Template based on industry - return f"Wir haben gesehen, dass Sie im Bereich {industry} tätig sind. Für {industry} haben wir spezielle Roboter-Lösungen..." - return None - -# 2. ORCHESTRATION: The Sync Process - -def process_new_lead(lead): - """Phase 1: Identifikation & Anlage""" - print(f"\n⚙️ [Phase 1] Syncing Lead: {lead['company_name']} ({lead['id']})") - - ce_company = find_company(lead['company_name']) - - enrichment_info = { - "ce_status": "unknown", - "ce_id": None, - "message": "", - "last_check": time.time() - } - - if ce_company: - print(f"✅ Company exists in CE (ID: {ce_company['id']})") - enrichment_info["ce_status"] = "linked" # Linked but maybe not fully enriched/fetched - enrichment_info["ce_id"] = ce_company['id'] - enrichment_info["ce_data"] = ce_company - enrichment_info["message"] = f"Matched existing account (Status: {ce_company.get('status')})." - - # If existing company is not enriched yet, trigger discovery too? - if ce_company.get('status') == 'NEW': - trigger_discovery(ce_company['id']) - enrichment_info["message"] += " Triggered Discovery for existing raw account." - - else: - print(f"✨ New Company. Creating in CE...") - city = "" - # Simple extraction - if lead['raw_body'] and "Stadt:" in lead['raw_body']: - try: - for line in lead['raw_body'].split('\n'): - if "Stadt:" in line: - city = line.split("Stadt:")[1].strip() - break - except: pass - - new_company = create_company({ - "company_name": lead['company_name'], - "city": city - }) - - if new_company: - print(f"✅ Created (ID: {new_company['id']}).") - enrichment_info["ce_status"] = "created" - enrichment_info["ce_id"] = new_company['id'] - - if trigger_discovery(new_company['id']): - enrichment_info["message"] = "Created & Discovery started." - print("🚀 Discovery queued.") - else: - enrichment_info["message"] = "Created, but Discovery trigger failed." - else: - enrichment_info["message"] = "Failed to create in CE." - - # Save State - update_lead_enrichment(lead['id'], enrichment_info, status='synced') - -def process_synced_lead(lead): - """Phase 2: Closing the Loop (Abrufen der Ergebnisse)""" - enrichment = json.loads(lead['enrichment_data']) - ce_id = enrichment.get('ce_id') - - if not ce_id: - return # Should not happen if status is synced - - print(f"\n🔄 [Phase 2] Checking Enrichment for: {lead['company_name']} (CE ID: {ce_id})") - - company = get_ce_status(ce_id) - if not company: - return - - # Check if CE is done - ce_status = company.get('status') # NEW, DISCOVERING, ENRICHED... - - if ce_status == 'ENRICHED': - print(f"✅ Analysis Complete! Fetching Pitch...") - - # Get Pitch (Mocked or Real) - pitch = get_pitch(ce_id) - - if pitch: - enrichment['pitch'] = pitch - enrichment['ce_data'] = company # Update with latest data - enrichment['message'] = "Enrichment complete. Pitch ready." - - # Save final state - update_lead_enrichment(lead['id'], enrichment, status='drafted') # 'drafted' means ready for human review - print(f"🎉 Lead is ready for response!") - else: - print("⚠️ Enriched, but no pitch generated yet.") - else: - print(f"⏳ Still processing in CE (Status: {ce_status})...") - -def update_lead_enrichment(lead_id, data, status=None): +def update_lead_enrichment(lead_id, data, status): + """Aktualisiert einen Lead in der Datenbank mit neuen Enrichment-Daten und einem neuen Status.""" conn = sqlite3.connect(DB_PATH) c = conn.cursor() - if status: - c.execute('UPDATE leads SET enrichment_data = ?, status = ? WHERE id = ?', - (json.dumps(data), status, lead_id)) - else: - c.execute('UPDATE leads SET enrichment_data = ? WHERE id = ?', - (json.dumps(data), lead_id)) + c.execute('UPDATE leads SET enrichment_data = ?, status = ? WHERE id = ?', + (json.dumps(data), status, lead_id)) conn.commit() conn.close() + print(f"Lead {lead_id} aktualisiert. Neuer Status: {status}") def run_sync(): - leads = get_leads() - print(f"Scanning {len(leads)} leads...") + """ + Haupt-Synchronisationsprozess. + Holt alle neuen Leads und stößt den Company Explorer Workflow für jeden an. + """ + # Hole nur die Leads, die wirklich neu sind und noch nicht verarbeitet wurden + leads_to_process = [lead for lead in get_leads() if lead['status'] == 'new'] - count = 0 - for lead in leads: - status = lead['status'] - - # Fallback for old/mock leads or explicit 'new' status - is_mock = False - if lead['enrichment_data']: - try: - enrichment = json.loads(lead['enrichment_data']) - is_mock = enrichment.get('recommendation') == 'Manual Review (Mock Data)' - except: pass + print(f"Found {len(leads_to_process)} new leads to sync with Company Explorer.") + + if not leads_to_process: + print("No new leads to process. Sync finished.") + return - if status == 'new' or not status or is_mock: - process_new_lead(lead) - count += 1 - elif status == 'synced': - process_synced_lead(lead) - count += 1 - - print(f"Sync cycle finished. Processed {count} leads.") + for lead in leads_to_process: + company_name = lead['company_name'] + print(f"\n--- Processing Lead ID: {lead['id']}, Company: '{company_name}' ---") + + # Rufe den zentralen Workflow auf, den wir im Connector definiert haben + # Diese Funktion kümmert sich um alles: prüfen, erstellen, discovern, pollen, analysieren + result = handle_company_workflow(company_name) + + # Bereite die Daten für die Speicherung in der DB vor + enrichment_data = { + "sync_status": result.get("status"), + "ce_id": result.get("data", {}).get("id") if result.get("data") else None, + "message": result.get("message"), + "ce_data": result.get("data") + } + + # Setze den finalen Status und speichere die Daten in der DB + update_lead_enrichment(lead['id'], enrichment_data, status='synced') + + print("\n--- Sync cycle finished. ---") if __name__ == "__main__": + # Dieser Block ermöglicht es, das Skript direkt für Debugging-Zwecke auszuführen + print("Running manual sync...") run_sync() + print("Manual sync finished.") diff --git a/test_company_explorer_connector.py b/test_company_explorer_connector.py new file mode 100644 index 00000000..d1ed8a78 --- /dev/null +++ b/test_company_explorer_connector.py @@ -0,0 +1,98 @@ +import unittest +from unittest.mock import patch, MagicMock +import os +import requests + +# Den Pfad anpassen, damit das Modul gefunden wird +import sys +sys.path.insert(0, os.path.abspath(os.path.dirname(__file__))) + +from check_company_existence import check_company_existence_with_company_explorer + +class TestCompanyExistenceChecker(unittest.TestCase): + + @patch('check_company_existence.requests.get') + def test_company_exists_exact_match(self, mock_get): + """Testet, ob ein exakt passendes Unternehmen korrekt als 'existent' erkannt wird.""" + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "total": 1, + "items": [ + {"id": 123, "name": "TestCorp"} + ] + } + mock_get.return_value = mock_response + + result = check_company_existence_with_company_explorer("TestCorp") + + self.assertTrue(result["exists"]) + self.assertEqual(result["company_id"], 123) + self.assertEqual(result["company_name"], "TestCorp") + + @patch('check_company_existence.requests.get') + def test_company_does_not_exist(self, mock_get): + """Testet, ob ein nicht existentes Unternehmen korrekt als 'nicht existent' erkannt wird.""" + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"total": 0, "items": []} + mock_get.return_value = mock_response + + result = check_company_existence_with_company_explorer("NonExistentCorp") + + self.assertFalse(result["exists"]) + self.assertIn("not found", result["message"]) + + @patch('check_company_existence.requests.get') + def test_company_partial_match_only(self, mock_get): + """Testet den Fall, in dem die Suche Ergebnisse liefert, aber kein exakter Match dabei ist.""" + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "total": 1, + "items": [ + {"id": 124, "name": "TestCorp Inc"} + ] + } + mock_get.return_value = mock_response + + result = check_company_existence_with_company_explorer("TestCorp") + + self.assertFalse(result["exists"]) + self.assertIn("not found as an exact match", result["message"]) + + @patch('check_company_existence.requests.get') + def test_http_error_handling(self, mock_get): + """Testet das Fehlerhandling bei einem HTTP 401 Unauthorized Error.""" + # Importiere requests innerhalb des Test-Scopes, um den side_effect zu verwenden + import requests + + mock_response = MagicMock() + mock_response.status_code = 401 + mock_response.text = "Unauthorized" + # Die raise_for_status Methode muss eine Ausnahme auslösen + mock_response.raise_for_status.side_effect = requests.exceptions.HTTPError("401 Client Error: Unauthorized for url") + mock_get.return_value = mock_response + + result = check_company_existence_with_company_explorer("AnyCompany") + + self.assertFalse(result["exists"]) + self.assertIn("HTTP error occurred", result["error"]) + + @patch('check_company_existence.requests.get') + def test_connection_error_handling(self, mock_get): + """Testet das Fehlerhandling bei einem Connection Error.""" + # Importiere requests hier, damit die Ausnahme im Patch-Kontext ist + import requests + mock_get.side_effect = requests.exceptions.ConnectionError("Connection failed") + + result = check_company_existence_with_company_explorer("AnyCompany") + + self.assertFalse(result["exists"]) + self.assertIn("Connection error occurred", result["error"]) + + +if __name__ == '__main__': + # Füge 'requests' zum globalen Scope hinzu, damit es im Test-HTTP-Error-Handling-Test verwendet werden kann + import requests + unittest.main(argv=['first-arg-is-ignored'], exit=False) diff --git a/trading_twins_tool.py b/trading_twins_tool.py new file mode 100644 index 00000000..773cc18b --- /dev/null +++ b/trading_twins_tool.py @@ -0,0 +1,79 @@ +import json +import time +import os +from company_explorer_connector import handle_company_workflow + +def run_trading_twins_process(target_company_name: str): + """ + Startet den Trading Twins Prozess für ein Zielunternehmen. + Ruft den Company Explorer Workflow auf, um das Unternehmen zu finden, + zu erstellen oder anzureichern. + """ + print(f"\n{'='*50}") + print(f"Starte Trading Twins Analyse für: {target_company_name}") + print(f"{'='*50}\n") + + # Aufruf des Company Explorer Workflows + # Diese Funktion prüft, ob die Firma existiert. + # Wenn nicht, erstellt sie die Firma und startet die Anreicherung. + # Sie gibt am Ende die Daten aus dem Company Explorer zurück. + company_data_result = handle_company_workflow(target_company_name) + + # Verarbeitung der Rückgabe (für den POC genügt eine Ausgabe) + print("\n--- Ergebnis vom Company Explorer Connector (für Trading Twins) ---") + + status = company_data_result.get("status") + data = company_data_result.get("data") + + if status == "error": + print(f"Ein Fehler ist aufgetreten: {company_data_result.get('message')}") + elif status == "found": + print(f"Unternehmen gefunden. ID: {data.get('id')}, Name: {data.get('name')}") + print(json.dumps(data, indent=2, ensure_ascii=False)) + elif status == "created_and_enriched": + print(f"Unternehmen erstellt und Enrichment angestoßen. ID: {data.get('id')}, Name: {data.get('name')}") + print("Hinweis: Enrichment-Prozesse laufen im Hintergrund und können einige Zeit dauern, bis alle Daten verfügbar sind.") + print(json.dumps(data, indent=2, ensure_ascii=False)) + elif status == "created_discovery_timeout": + print(f"Unternehmen erstellt, aber Discovery konnte keine Website finden (ID: {data.get('id')}, Name: {data.get('name')}).") + print("Der Analyse-Prozess wurde daher nicht gestartet.") + print(json.dumps(data, indent=2, ensure_ascii=False)) + else: + print("Ein unerwarteter Status ist aufgetreten.") + print(json.dumps(company_data_result, indent=2, ensure_ascii=False)) + + print(f"\n{'='*50}") + print(f"Trading Twins Analyse für {target_company_name} abgeschlossen.") + print(f"{'='*50}\n") + + +if __name__ == "__main__": + # Simulieren der Umgebungsvariablen für diesen Testlauf, falls nicht gesetzt + if "COMPANY_EXPLORER_API_USER" not in os.environ: + os.environ["COMPANY_EXPLORER_API_USER"] = "admin" + if "COMPANY_EXPLORER_API_PASSWORD" not in os.environ: + os.environ["COMPANY_EXPLORER_API_PASSWORD"] = "gemini" + + # Testfall 1: Ein Unternehmen, das wahrscheinlich bereits existiert + # Da 'Robo-Planet GmbH' bei den vorherigen Läufen erstellt wurde, sollte es jetzt gefunden werden. + run_trading_twins_process("Robo-Planet GmbH") + + # Kurze Pause zwischen den Testläufen + time.sleep(5) + + # Testfall 1b: Ein bekanntes, real existierendes Unternehmen + run_trading_twins_process("Klinikum Landkreis Erding") + + # Kurze Pause zwischen den Testläufen + time.sleep(5) + + # Testfall 2: Ein neues, eindeutiges Unternehmen + new_unique_company_name = f"Trading Twins New Target {int(time.time())}" + run_trading_twins_process(new_unique_company_name) + + # Kurze Pause + time.sleep(5) + + # Testfall 3: Ein weiteres neues Unternehmen, um die Erstellung zu prüfen + another_new_company_name = f"Another Demo Corp {int(time.time())}" + run_trading_twins_process(another_new_company_name)