[30388f42] Infrastructure Hardening: Repaired CE/Connector DB schema, fixed frontend styling build, implemented robust echo shield in worker v2.1.1, and integrated Lead Engine into gateway.
This commit is contained in:
91
ARCHIVE_legacy_scripts/test_opener_api.py
Normal file
91
ARCHIVE_legacy_scripts/test_opener_api.py
Normal file
@@ -0,0 +1,91 @@
|
||||
import requests
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
|
||||
# Load credentials from .env
|
||||
# Simple manual parser to avoid dependency on python-dotenv
|
||||
def load_env(path):
|
||||
if not os.path.exists(path):
|
||||
print(f"Warning: .env file not found at {path}")
|
||||
return
|
||||
with open(path) as f:
|
||||
for line in f:
|
||||
if line.strip() and not line.startswith('#'):
|
||||
key, val = line.strip().split('=', 1)
|
||||
os.environ.setdefault(key, val)
|
||||
|
||||
load_env('/app/.env')
|
||||
|
||||
API_USER = os.getenv("API_USER", "admin")
|
||||
API_PASS = os.getenv("API_PASSWORD", "gemini")
|
||||
CE_URL = "http://127.0.0.1:8000" # Target the local container (assuming port 8000 is mapped)
|
||||
TEST_CONTACT_ID = 1 # Therme Erding
|
||||
|
||||
def run_test():
|
||||
print("🚀 STARTING API-LEVEL E2E TEXT GENERATION TEST\n")
|
||||
|
||||
# --- Health Check ---
|
||||
print("Waiting for Company Explorer API to be ready...")
|
||||
for i in range(10):
|
||||
try:
|
||||
health_resp = requests.get(f"{CE_URL}/api/health", auth=(API_USER, API_PASS), timeout=2)
|
||||
if health_resp.status_code == 200:
|
||||
print("✅ API is ready.")
|
||||
break
|
||||
except requests.exceptions.RequestException:
|
||||
pass
|
||||
if i == 9:
|
||||
print("❌ API not ready after 20 seconds. Aborting.")
|
||||
return False
|
||||
time.sleep(2)
|
||||
|
||||
scenarios = [
|
||||
{"name": "Infrastructure Role", "job_title": "Facility Manager", "opener_field": "opener", "keyword": "Sicherheit"},
|
||||
{"name": "Operational Role", "job_title": "Leiter Badbetrieb", "opener_field": "opener_secondary", "keyword": "Gäste"}
|
||||
]
|
||||
|
||||
all_passed = True
|
||||
for s in scenarios:
|
||||
print(f"--- Testing: {s['name']} ---")
|
||||
endpoint = f"{CE_URL}/api/provision/superoffice-contact"
|
||||
payload = {
|
||||
"so_contact_id": TEST_CONTACT_ID,
|
||||
"job_title": s['job_title']
|
||||
}
|
||||
|
||||
try:
|
||||
resp = requests.post(endpoint, json=payload, auth=(API_USER, API_PASS))
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
|
||||
# --- Assertions ---
|
||||
opener = data.get('opener')
|
||||
opener_sec = data.get('opener_secondary')
|
||||
|
||||
assert opener, "❌ FAIL: Primary opener is missing!"
|
||||
print(f" ✅ Primary Opener: '{opener}'")
|
||||
|
||||
assert opener_sec, "❌ FAIL: Secondary opener is missing!"
|
||||
print(f" ✅ Secondary Opener: '{opener_sec}'")
|
||||
|
||||
target_opener_text = data.get(s['opener_field'])
|
||||
assert s['keyword'].lower() in target_opener_text.lower(), f"❌ FAIL: Keyword '{s['keyword']}' not in '{s['opener_field']}'!"
|
||||
print(f" ✅ Keyword '{s['keyword']}' found in correct opener.")
|
||||
|
||||
print(f"--- ✅ PASSED: {s['name']} ---\\n")
|
||||
|
||||
except Exception as e:
|
||||
print(f" ❌ TEST FAILED: {e}")
|
||||
if hasattr(e, 'response') and e.response is not None:
|
||||
print(f" Response: {e.response.text}")
|
||||
all_passed = False
|
||||
|
||||
return all_passed
|
||||
|
||||
if __name__ == "__main__":
|
||||
if run_test():
|
||||
print("🏁 All scenarios passed successfully!")
|
||||
else:
|
||||
print("🔥 Some scenarios failed.")
|
||||
sys.exit(1)
|
||||
Reference in New Issue
Block a user