import requests import os import sys import time # Load credentials from .env # Simple manual parser to avoid dependency on python-dotenv def load_env(path): if not os.path.exists(path): print(f"Warning: .env file not found at {path}") return with open(path) as f: for line in f: if line.strip() and not line.startswith('#'): key, val = line.strip().split('=', 1) os.environ.setdefault(key, val) load_env('/app/.env') API_USER = os.getenv("API_USER", "admin") API_PASS = os.getenv("API_PASSWORD", "gemini") CE_URL = "http://127.0.0.1:8000" # Target the local container (assuming port 8000 is mapped) TEST_CONTACT_ID = 1 # Therme Erding def run_test(): print("πŸš€ STARTING API-LEVEL E2E TEXT GENERATION TEST\n") # --- Health Check --- print("Waiting for Company Explorer API to be ready...") for i in range(10): try: health_resp = requests.get(f"{CE_URL}/api/health", auth=(API_USER, API_PASS), timeout=2) if health_resp.status_code == 200: print("βœ… API is ready.") break except requests.exceptions.RequestException: pass if i == 9: print("❌ API not ready after 20 seconds. Aborting.") return False time.sleep(2) scenarios = [ {"name": "Infrastructure Role", "job_title": "Facility Manager", "opener_field": "opener", "keyword": "Sicherheit"}, {"name": "Operational Role", "job_title": "Leiter Badbetrieb", "opener_field": "opener_secondary", "keyword": "GΓ€ste"} ] all_passed = True for s in scenarios: print(f"--- Testing: {s['name']} ---") endpoint = f"{CE_URL}/api/provision/superoffice-contact" payload = { "so_contact_id": TEST_CONTACT_ID, "job_title": s['job_title'] } try: resp = requests.post(endpoint, json=payload, auth=(API_USER, API_PASS)) resp.raise_for_status() data = resp.json() # --- Assertions --- opener = data.get('opener') opener_sec = data.get('opener_secondary') assert opener, "❌ FAIL: Primary opener is missing!" print(f" βœ… Primary Opener: '{opener}'") assert opener_sec, "❌ FAIL: Secondary opener is missing!" print(f" βœ… Secondary Opener: '{opener_sec}'") target_opener_text = data.get(s['opener_field']) assert s['keyword'].lower() in target_opener_text.lower(), f"❌ FAIL: Keyword '{s['keyword']}' not in '{s['opener_field']}'!" print(f" βœ… Keyword '{s['keyword']}' found in correct opener.") print(f"--- βœ… PASSED: {s['name']} ---\\n") except Exception as e: print(f" ❌ TEST FAILED: {e}") if hasattr(e, 'response') and e.response is not None: print(f" Response: {e.response.text}") all_passed = False return all_passed if __name__ == "__main__": if run_test(): print("🏁 All scenarios passed successfully!") else: print("πŸ”₯ Some scenarios failed.") sys.exit(1)