"""End-to-end test script for the Company Explorer provisioning API.

The script talks to a running API instance and walks through one scenario:

1. Provision a test company (without a person) to create it.
2. Look the company up via the search endpoint and poll until it has been
   discovered.
3. Explicitly trigger the analysis.
4. Poll until the company is enriched.
5. Check that both openers were generated.
6. Provision again with the full payload (including person and job title)
   and compare the response with the stored openers.

The test company is deleted afterwards. The process exits with status 1 if
the health check fails, the company cannot be found, any check fails, or an
unexpected exception occurs.
"""
import json
import logging
import os
import sys
import time

import requests

# Configure Logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger("E2E-Test")

# Configuration.
# The defaults are the original hard-coded values; the environment variables
# allow pointing the test at another instance without editing the file.
API_URL = os.environ.get("E2E_API_URL", "http://172.17.0.1:8000")
API_USER = os.environ.get("E2E_API_USER", "admin")
API_PASSWORD = os.environ.get("E2E_API_PASSWORD", "gemini")

# Test Data
TEST_COMPANY = {
    "so_contact_id": 99999,
    "so_person_id": 88888,
    "crm_name": "Klinikum Landkreis Erding (E2E Test)",
    # Using real URL for successful discovery
    "crm_website": "https://www.klinikum-erding.de",
    # Should map to Operative Decision Maker or C-Level
    "job_title": "Geschäftsführer",
}


class CompanyExplorerClient:
    """Thin HTTP client for the API endpoints used by this test."""

    def __init__(self, base_url, username, password, timeout=60):
        """Create a session that sends HTTP basic auth with every request.

        Args:
            base_url: Root URL of the API, without a trailing slash.
            username: Basic-auth user name.
            password: Basic-auth password.
            timeout: Timeout in seconds passed to every request except the
                health check, so that a stalled server cannot hang the test
                forever.
        """
        self.base_url = base_url
        self.auth = (username, password)
        self.timeout = timeout
        self.session = requests.Session()
        self.session.auth = self.auth

    def check_health(self):
        """Return True if ``GET /api/health`` succeeds, False otherwise."""
        try:
            res = self.session.get(f"{self.base_url}/api/health", timeout=5)
            res.raise_for_status()
            logger.info(f"✅ Health Check Passed: {res.json()}")
            return True
        except Exception as e:
            # Any failure here (network, HTTP status, invalid JSON) simply
            # means "not healthy"; the caller decides what to do.
            logger.error(f"❌ Health Check Failed: {e}")
            return False

    def provision_contact(self, payload):
        """POST ``payload`` to the provisioning endpoint.

        Args:
            payload: Dict to send as JSON; must contain ``crm_name``.

        Returns:
            The decoded JSON response.

        Raises:
            requests.exceptions.RequestException: On network errors,
                timeouts or a non-2xx status.
        """
        url = f"{self.base_url}/api/provision/superoffice-contact"
        logger.info(f"🚀 Provisioning Contact: {payload['crm_name']}")
        res = self.session.post(url, json=payload, timeout=self.timeout)
        res.raise_for_status()
        return res.json()

    def search_companies(self, search):
        """Return the decoded JSON of ``GET /api/companies?search=<search>``.

        The search term is passed via ``params`` so that spaces, parentheses
        and other special characters are URL-encoded correctly.

        Raises:
            requests.exceptions.RequestException: On network errors,
                timeouts or a non-2xx status.
        """
        res = self.session.get(
            f"{self.base_url}/api/companies",
            params={"search": search},
            timeout=self.timeout,
        )
        res.raise_for_status()
        return res.json()

    def trigger_analysis(self, company_id, force_scrape=True):
        """POST to ``/api/enrich/analyze`` and return the raw response.

        The status code is deliberately not checked here; the caller
        inspects it.
        """
        return self.session.post(
            f"{self.base_url}/api/enrich/analyze",
            json={"company_id": company_id, "force_scrape": force_scrape},
            timeout=self.timeout,
        )

    def get_company(self, company_id, retries=5, retry_delay=2):
        """Fetch one company, retrying when the connection is dropped.

        Args:
            company_id: ID of the company to fetch.
            retries: Maximum number of attempts.
            retry_delay: Seconds to wait between attempts.

        Returns:
            The decoded JSON response.

        Raises:
            RuntimeError: If every attempt failed with a dropped connection.
            requests.exceptions.RequestException: On other request errors
                (for example a non-2xx status), which are not retried.
        """
        url = f"{self.base_url}/api/companies/{company_id}"
        last_error = None
        # Retry logic for dev environment (uvicorn reloads on DB write)
        for attempt in range(1, retries + 1):
            try:
                res = self.session.get(url, timeout=self.timeout)
                res.raise_for_status()
                return res.json()
            except (
                requests.exceptions.ConnectionError,
                requests.exceptions.ChunkedEncodingError,
            ) as err:
                last_error = err
                logger.warning(
                    "Connection dropped (likely uvicorn reload). "
                    f"Retrying {attempt}/{retries}..."
                )
                # No point in sleeping when there is no attempt left.
                if attempt < retries:
                    time.sleep(retry_delay)
        raise RuntimeError("Failed to get company after retries") from last_error

    def delete_company(self, company_id):
        """DELETE the company and return the decoded JSON response.

        Raises:
            requests.exceptions.RequestException: On network errors,
                timeouts or a non-2xx status.
        """
        url = f"{self.base_url}/api/companies/{company_id}"
        logger.info(f"🗑️ Deleting Company ID: {company_id}")
        res = self.session.delete(url, timeout=self.timeout)
        res.raise_for_status()
        return res.json()


def _wait_for_status(client, company_id, targets, label, max_retries, delay):
    """Poll the company until its status is one of ``targets``.

    Returns True if a target status was seen within ``max_retries`` polls,
    False otherwise.
    """
    for attempt in range(1, max_retries + 1):
        status = client.get_company(company_id)["status"]
        logger.info(f"Polling for {label} ({attempt}/{max_retries}): {status}")
        if status in targets:
            return True
        if attempt < max_retries:
            time.sleep(delay)
    return False


def run_test():
    """Run the end-to-end scenario and exit non-zero if anything failed.

    Individual check failures do not abort the run: they are recorded so the
    remaining steps can still be inspected in the log, and the process exits
    with status 1 at the end if at least one was recorded.
    """
    client = CompanyExplorerClient(API_URL, API_USER, API_PASSWORD)

    if not client.check_health():
        logger.error("Aborting test due to health check failure.")
        sys.exit(1)

    # 1. Trigger Provisioning (Create & Discover)
    # We first send a request WITHOUT job title to just ensure the company
    # exists / starts discovery.
    initial_payload = {
        "so_contact_id": TEST_COMPANY["so_contact_id"],
        "crm_name": TEST_COMPANY["crm_name"],
        "crm_website": TEST_COMPANY["crm_website"],
        # No person/job title yet
    }

    company_id = None  # Stays None until the company has been located.
    failures = []  # Human-readable descriptions of every failed check.

    try:
        res = client.provision_contact(initial_payload)
        logger.info(f"Initial Provision Response: {res['status']}")

        # The provision response is not used to obtain the company ID here,
        # so the company is looked up by name before it can be polled.
        time.sleep(1)  # Wait for DB write
        search_res = client.search_companies(TEST_COMPANY["crm_name"])
        items = search_res.get("items")
        if not items:
            logger.error("❌ Company not found after creation!")
            sys.exit(1)

        # NOTE(review): this takes the first search hit and assumes it is
        # the test company; that ID is also the one deleted during cleanup.
        company_id = items[0]["id"]
        logger.info(f"Found Company ID: {company_id}")

        # 2. Poll for Status "DISCOVERED" first
        discovered = _wait_for_status(
            client,
            company_id,
            ("DISCOVERED", "ENRICHED"),
            "Discovery",
            max_retries=10,
            delay=2,
        )
        if not discovered:
            # Not fatal: the analysis is triggered explicitly below anyway.
            logger.warning("Timeout waiting for Discovery; continuing.")

        # 3. Explicitly Trigger Analysis
        # This ensures we don't rely on implicit side-effects of the
        # provision endpoint.
        logger.info("🚀 Triggering Analysis explicitly...")
        res_analyze = client.trigger_analysis(company_id, force_scrape=True)
        if res_analyze.status_code != 200:
            logger.warning(f"Analysis trigger warning: {res_analyze.text}")
        else:
            logger.info("✅ Analysis triggered.")

        # 4. Poll for Status "ENRICHED"
        # Give it more time (analysis takes time).
        enriched = _wait_for_status(
            client,
            company_id,
            ("ENRICHED",),
            "Enrichment",
            max_retries=40,
            delay=5,
        )
        if not enriched:
            logger.error("❌ Timeout waiting for Enrichment.")
            failures.append("timeout waiting for enrichment")
            # Don't exit, try to inspect what we have

        # 5. Verify Opener Logic
        final_company = client.get_company(company_id)

        logger.info("--- 🔍 Verifying Analysis Results ---")
        logger.info(f"Industry: {final_company.get('industry_ai')}")
        logger.info(
            f"Metrics: {final_company.get('calculated_metric_name')} = "
            f"{final_company.get('calculated_metric_value')}"
        )

        opener_primary = final_company.get("ai_opener")
        opener_secondary = final_company.get("ai_opener_secondary")

        logger.info(f"Opener (Primary): {opener_primary}")
        logger.info(f"Opener (Secondary): {opener_secondary}")

        if not opener_primary or not opener_secondary:
            logger.error("❌ Openers are missing!")
            failures.append("openers missing")
            # Continue to see if write-back works at least partially
        else:
            logger.info("✅ Openers generated.")

        # 6. Simulate Final Write-Back (Provisioning with Person)
        full_payload = TEST_COMPANY.copy()

        logger.info("🚀 Triggering Final Provisioning (Write-Back Simulation)...")
        final_res = client.provision_contact(full_payload)

        # "texts" may be absent or null; treat both as "no texts".
        texts = final_res.get("texts") or {}

        logger.info(f"Final Response Status: {final_res['status']}")
        logger.info(f"Role: {final_res.get('role_name')}")
        logger.info(f"Subject: {texts.get('subject')}")

        # Assertions
        if final_res["status"] != "success":
            logger.error(
                f"❌ Expected status 'success', got '{final_res['status']}'"
            )
            failures.append("final provisioning status is not 'success'")

        if final_res.get("opener") != opener_primary:
            logger.error("❌ Primary Opener mismatch in response")
            failures.append("primary opener mismatch")

        if final_res.get("opener_secondary") != opener_secondary:
            logger.error("❌ Secondary Opener mismatch in response")
            failures.append("secondary opener mismatch")

        if not texts.get("intro"):
            logger.warning("⚠️ Matrix Text (intro) missing (Check Seed Data)")
        else:
            logger.info("✅ Matrix Texts present.")

        if not failures:
            logger.info("🎉 E2E Test Completed Successfully (mostly)!")

    except Exception as e:
        logger.error(f"💥 Test Failed with Exception: {e}", exc_info=True)
        failures.append(f"exception: {e}")

    finally:
        # Cleanup: only possible once the company ID is known.
        try:
            if company_id is not None:
                client.delete_company(company_id)
                logger.info("✅ Cleanup complete.")
        except Exception as e:
            logger.error(f"Cleanup failed: {e}")
            failures.append(f"cleanup failed: {e}")
        finally:
            client.session.close()

    if failures:
        logger.error(
            f"❌ E2E Test finished with {len(failures)} failure(s): "
            + "; ".join(failures)
        )
        sys.exit(1)


if __name__ == "__main__":
    run_test()