diff --git a/competitor-analysis-app/Dockerfile b/competitor-analysis-app/Dockerfile index a51741be..6d4a5a67 100644 --- a/competitor-analysis-app/Dockerfile +++ b/competitor-analysis-app/Dockerfile @@ -20,6 +20,9 @@ WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt +# Copy the orchestrator script +COPY competitor_analysis_orchestrator.py . + # Copy the build from the first stage COPY --from=build-stage /app/dist ./dist diff --git a/connector-superoffice/run_tests.sh b/connector-superoffice/run_tests.sh index 4110e8a4..47cab2a9 100644 --- a/connector-superoffice/run_tests.sh +++ b/connector-superoffice/run_tests.sh @@ -1,3 +1,3 @@ #!/bin/bash export PYTHONPATH=$PYTHONPATH:/app -python3 -m pytest -v /app/tests/test_webhook.py +python3 -m pytest -v /app/tests/ diff --git a/connector-superoffice/superoffice_client.py b/connector-superoffice/superoffice_client.py index 3c5abd4c..63112732 100644 --- a/connector-superoffice/superoffice_client.py +++ b/connector-superoffice/superoffice_client.py @@ -1,6 +1,7 @@ import os import requests import json +import datetime from config import settings import logging @@ -185,7 +186,58 @@ class SuperOfficeClient: logger.error(f"❌ API Search Error for {query_string}: {e.response.text}") return None - return all_results + return all_results + + def find_contact_by_criteria(self, name=None, email=None): + """Searches for a contact by name or email.""" + if name: + query = f"Contact?$filter=name eq '{name}'" + elif email: + # Note: This depends on the specific SO API version/setup for email filtering + query = f"Contact?$filter=email/address eq '{email}'" + else: + return None + + results = self.search(query) + # Handle OData 'value' wrap if search doesn't do it + return results[0] if results else None + + def create_contact(self, name: str, url: str = None, org_nr: str = None): + """Creates a new contact.""" + payload = { + "Name": name, + "UrlAddress": url, + "OrgNr": org_nr + } + return self._post("Contact", payload) + + def create_person(self, first_name: str, last_name: str, contact_id: int, email: str = None): + """Creates a new person linked to a contact.""" + payload = { + "Firstname": first_name, + "Lastname": last_name, + "ContactId": contact_id + } + if email: + payload["Emails"] = [{"Value": email, "Description": "Primary", "IsPrimary": True}] + + return self._post("Person", payload) + + def create_sale(self, title: str, contact_id: int, person_id: int = None, amount: float = 0.0): + """Creates a new sale.""" + payload = { + "Heading": title, + "ContactId": contact_id, + "Amount": amount, + "Saledate": datetime.datetime.utcnow().isoformat() + "Z", + "Probablity": 50, + "Status": "Open" + } + if person_id: + payload["PersonId"] = person_id + + return self._post("Sale", payload) + def create_project(self, name: str, contact_id: int, person_id: int = None): """Creates a new project linked to a contact, and optionally adds a person.""" payload = { @@ -206,7 +258,6 @@ class SuperOfficeClient: def create_appointment(self, subject: str, description: str, contact_id: int, person_id: int = None): """Creates a new appointment (to simulate a sent activity).""" - import datetime now = datetime.datetime.utcnow().isoformat() + "Z" # SuperOffice UI limit: 42 chars. diff --git a/connector-superoffice/tests/test_dynamic_change.py b/connector-superoffice/tests/test_dynamic_change.py deleted file mode 100644 index 39ac6976..00000000 --- a/connector-superoffice/tests/test_dynamic_change.py +++ /dev/null @@ -1,100 +0,0 @@ -import os -import requests -import json -import logging -import sys - -# Configure to run from root context -sys.path.append(os.path.join(os.getcwd(), "connector-superoffice")) - -# Mock Config if needed, or use real one -try: - from config import settings -except ImportError: - print("Could not import settings. Ensure you are in project root.") - sys.exit(1) - -# FORCE CE URL for internal Docker comms if running inside container -# If running outside, this might need localhost. -# settings.COMPANY_EXPLORER_URL is used. - -API_USER = os.getenv("API_USER", "admin") -API_PASS = os.getenv("API_PASSWORD", "gemini") - -def test_dynamic_role_change(): - print("πŸ§ͺ STARTING TEST: Dynamic Role Change & Content Generation\n") - - # Define Scenarios - scenarios = [ - { - "name": "Scenario A (CEO)", - "job_title": "GeschΓ€ftsfΓΌhrer", - "expect_keywords": ["Kostenreduktion", "Effizienz", "Amortisation"] - }, - { - "name": "Scenario B (Warehouse Mgr)", - "job_title": "Lagerleiter", - "expect_keywords": ["Stress", "Sauberkeit", "Entlastung"] - } - ] - - results = {} - - for s in scenarios: - print(f"--- Running {s['name']} ---") - print(f"Role Trigger: '{s['job_title']}'") - - payload = { - "so_contact_id": 2, # RoboPlanet Test - "so_person_id": 2, - "crm_name": "RoboPlanet GmbH-SOD", - "crm_website": "www.roboplanet.de", # Ensure we match the industry (Logistics) - "job_title": s['job_title'] - } - - try: - url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact" - print(f"POST {url}") - resp = requests.post(url, json=payload, auth=(API_USER, API_PASS)) - resp.raise_for_status() - data = resp.json() - - # Validation - texts = data.get("texts", {}) - subject = texts.get("subject", "") - intro = texts.get("intro", "") - - print(f"Received Role: {data.get('role_name')}") - print(f"Received Subject: {subject}") - - # Check Keywords - full_text = (subject + " " + intro).lower() - matches = [k for k in s['expect_keywords'] if k.lower() in full_text] - - if len(matches) > 0: - print(f"βœ… Content Match! Found keywords: {matches}") - results[s['name']] = "PASS" - else: - print(f"❌ Content Mismatch. Expected {s['expect_keywords']}, got text: {subject}...") - results[s['name']] = "FAIL" - - results[f"{s['name']}_Subject"] = subject # Store for comparison later - - except Exception as e: - print(f"❌ API Error: {e}") - results[s['name']] = "ERROR" - - print("") - - # Final Comparison - print("--- Final Result Analysis ---") - if results["Scenario A (CEO)"] == "PASS" and results["Scenario B (Warehouse Mgr)"] == "PASS": - if results["Scenario A (CEO)_Subject"] != results["Scenario B (Warehouse Mgr)_Subject"]: - print("βœ… SUCCESS: Different roles generated different, targeted content.") - else: - print("⚠️ WARNING: Content matched keywords but Subjects are identical! Check Matrix.") - else: - print("❌ TEST FAILED. See individual steps.") - -if __name__ == "__main__": - test_dynamic_role_change() diff --git a/connector-superoffice/tests/test_e2e_flow.py b/connector-superoffice/tests/test_e2e_flow.py deleted file mode 100644 index b856f729..00000000 --- a/connector-superoffice/tests/test_e2e_flow.py +++ /dev/null @@ -1,308 +0,0 @@ -import unittest -import os -import sys -import json -import logging -from unittest.mock import MagicMock, patch -from fastapi.testclient import TestClient -from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker - -# Setup Paths -current_dir = os.path.dirname(os.path.abspath(__file__)) -ce_backend_dir = os.path.abspath(os.path.join(current_dir, "../../company-explorer")) -connector_dir = os.path.abspath(os.path.join(current_dir, "..")) - -sys.path.append(ce_backend_dir) -sys.path.append(connector_dir) - -# Import CE App & DB -# Note: backend.app needs to be importable. If backend is a package. -try: - from backend.app import app, get_db - from backend.database import Base, Industry, Persona, MarketingMatrix, JobRolePattern, Company, Contact, init_db -except ImportError: - # Try alternate import if running from root - sys.path.append(os.path.abspath("company-explorer")) - from backend.app import app, get_db - from backend.database import Base, Industry, Persona, MarketingMatrix, JobRolePattern, Company, Contact, init_db - -# Import Worker Logic -from worker import process_job - -# Setup Test DB -TEST_DB_FILE = "/tmp/test_company_explorer.db" -if os.path.exists(TEST_DB_FILE): - os.remove(TEST_DB_FILE) - -SQLALCHEMY_DATABASE_URL = f"sqlite:///{TEST_DB_FILE}" -engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}) -TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) - -# Override get_db dependency -def override_get_db(): - try: - db = TestingSessionLocal() - yield db - finally: - db.close() - -app.dependency_overrides[get_db] = override_get_db - -# Mock SuperOffice Client -class MockSuperOfficeClient: - def __init__(self): - self.access_token = "mock_token" - self.contacts = {} # id -> data - self.persons = {} # id -> data - - def get_contact(self, contact_id, select=None): - return self.contacts.get(int(contact_id)) - - def get_person(self, person_id, select=None): - return self.persons.get(int(person_id)) - - def update_entity_udfs(self, entity_id, entity_type, udfs): - target = self.contacts if entity_type == "Contact" else self.persons - if int(entity_id) in target: - if "UserDefinedFields" not in target[int(entity_id)]: - target[int(entity_id)]["UserDefinedFields"] = {} - target[int(entity_id)]["UserDefinedFields"].update(udfs) - return True - return False - - def update_person_position(self, person_id, position_id): - if int(person_id) in self.persons: - self.persons[int(person_id)]["PositionId"] = position_id - return True - return False - - def create_appointment(self, subject, description, contact_id, person_id=None): - if not hasattr(self, 'appointments'): - self.appointments = [] - self.appointments.append({ - "Subject": subject, - "Description": description, - "ContactId": contact_id, - "PersonId": person_id - }) - return True - - def search(self, query): - if "contact/contactId eq" in query: - contact_id = int(query.split("eq")[1].strip()) - results = [] - for pid, p in self.persons.items(): - if p.get("ContactId") == contact_id: - results.append({"PersonId": pid, "FirstName": p.get("FirstName")}) - return results - return [] - - def _put(self, endpoint, data): - if endpoint.startswith("Contact/"): - cid = int(endpoint.split("/")[1]) - if cid in self.contacts: - self.contacts[cid] = data - return True - return False - -class TestE2EFlow(unittest.TestCase): - - @classmethod - def setUpClass(cls): - # Set Auth Env Vars - os.environ["API_USER"] = "admin" - os.environ["API_PASSWORD"] = "gemini" - - # Create Tables - Base.metadata.create_all(bind=engine) - db = TestingSessionLocal() - - # SEED DATA - # Industry 1 - ind1 = Industry(name="Logistics - Warehouse", status_notion="Active") - db.add(ind1) - - # Industry 2 (For Change Test) - ind2 = Industry(name="Healthcare - Hospital", status_notion="Active") - db.add(ind2) - db.commit() - - pers = Persona(name="Operativer Entscheider") - db.add(pers) - db.commit() - - # Matrix 1 - matrix1 = MarketingMatrix( - industry_id=ind1.id, - persona_id=pers.id, - subject="TEST SUBJECT LOGISTICS", - intro="TEST BRIDGE LOGISTICS", - social_proof="TEST PROOF LOGISTICS" - ) - db.add(matrix1) - - # Matrix 2 - matrix2 = MarketingMatrix( - industry_id=ind2.id, - persona_id=pers.id, - subject="TEST SUBJECT HEALTH", - intro="TEST BRIDGE HEALTH", - social_proof="TEST PROOF HEALTH" - ) - db.add(matrix2) - - mapping = JobRolePattern(pattern_value="Head of Operations", role="Operativer Entscheider", pattern_type="exact") - db.add(mapping) - - db.commit() - db.close() - - cls.ce_client = TestClient(app) - - def setUp(self): - self.mock_so_client = MockSuperOfficeClient() - self.mock_so_client.contacts[100] = { - "ContactId": 100, - "Name": "Test Company GmbH", - "UrlAddress": "old-site.com", - "UserDefinedFields": {} - } - self.mock_so_client.persons[500] = { - "PersonId": 500, - "ContactId": 100, - "FirstName": "Hans", - "JobTitle": "Head of Operations", - "UserDefinedFields": {} - } - - def mock_post_side_effect(self, url, json=None, auth=None): - if "/api/" in url: - path = "/api/" + url.split("/api/")[1] - else: - path = url - - response = self.ce_client.post(path, json=json, auth=auth) - - class MockReqResponse: - def __init__(self, resp): - self.status_code = resp.status_code - self._json = resp.json() - def json(self): return self._json - def raise_for_status(self): - if self.status_code >= 400: raise Exception(f"HTTP {self.status_code}: {self._json}") - - return MockReqResponse(response) - - @patch("worker.JobQueue") - @patch("worker.requests.post") - @patch("worker.settings") - def test_full_roundtrip_with_vertical_change(self, mock_settings, mock_post, MockJobQueue): - mock_post.side_effect = self.mock_post_side_effect - - # Mock JobQueue instance - mock_queue_instance = MockJobQueue.return_value - - # Config Mocks - mock_settings.COMPANY_EXPLORER_URL = "http://localhost:8000" - mock_settings.UDF_VERTICAL = "SuperOffice:Vertical" - mock_settings.UDF_SUBJECT = "SuperOffice:Subject" - mock_settings.UDF_INTRO = "SuperOffice:Intro" - mock_settings.UDF_SOCIAL_PROOF = "SuperOffice:SocialProof" - mock_settings.UDF_OPENER = "SuperOffice:Opener" - mock_settings.UDF_OPENER_SECONDARY = "SuperOffice:OpenerSecondary" - mock_settings.VERTICAL_MAP_JSON = '{"Logistics - Warehouse": 23, "Healthcare - Hospital": 24}' - mock_settings.PERSONA_MAP_JSON = '{"Operativer Entscheider": 99}' - mock_settings.ENABLE_WEBSITE_SYNC = True - - # --- Step 1: Company Created (Logistics) --- - print("[TEST] Step 1: Create Company...") - job = {"id": "job1", "event_type": "contact.created", "payload": {"Event": "contact.created", "PrimaryKey": 100, "Changes": ["Name"]}} - - process_job(job, self.mock_so_client) # RETRY - - # Simulate Enrichment (Logistics) - db = TestingSessionLocal() - company = db.query(Company).filter(Company.crm_id == "100").first() - company.status = "ENRICHED" - company.industry_ai = "Logistics - Warehouse" - company.city = "Koeln" - company.crm_vat = "DE813016729" - company.ai_opener = "Positive observation about Silly Billy" - company.ai_opener_secondary = "Secondary observation" - db.commit() - db.close() - - process_job(job, self.mock_so_client) # SUCCESS - - # Verify Contact Updates (Standard Fields & UDFs) - contact = self.mock_so_client.contacts[100] - self.assertEqual(contact["UserDefinedFields"]["SuperOffice:Vertical"], "23") - self.assertEqual(contact["UserDefinedFields"]["SuperOffice:Opener"], "Positive observation about Silly Billy") - self.assertEqual(contact["UserDefinedFields"]["SuperOffice:OpenerSecondary"], "Secondary observation") - self.assertEqual(contact.get("PostalAddress", {}).get("City"), "Koeln") - self.assertEqual(contact.get("OrgNumber"), "DE813016729") - - # --- Step 2: Person Created (Get Logistics Texts) --- - print("[TEST] Step 2: Create Person...") - job_p = {"id": "job2", "event_type": "person.created", "payload": {"Event": "person.created", "PersonId": 500, "ContactId": 100, "JobTitle": "Head of Operations"}} - process_job(job_p, self.mock_so_client) - - udfs = self.mock_so_client.persons[500]["UserDefinedFields"] - self.assertEqual(udfs["SuperOffice:Subject"], "TEST SUBJECT LOGISTICS") - - # Verify Appointment (Simulation) - self.assertTrue(len(self.mock_so_client.appointments) > 0) - appt = self.mock_so_client.appointments[0] - self.assertIn("βœ‰οΈ Entwurf: TEST SUBJECT LOGISTICS", appt["Subject"]) - self.assertIn("TEST BRIDGE LOGISTICS", appt["Description"]) - print(f"[TEST] Appointment created: {appt['Subject']}") - - # --- Step 3: Vertical Change in SO (To Healthcare) --- - print("[TEST] Step 3: Change Vertical in SO...") - - # Update Mock SO Data - self.mock_so_client.contacts[100]["UserDefinedFields"]["SuperOffice:Vertical"] = "24" # Healthcare - - # Simulate Webhook - job_change = { - "id": "job3", - "event_type": "contact.changed", - "payload": { - "Event": "contact.changed", - "PrimaryKey": 100, - "Changes": ["UserDefinedFields"] # Or specific UDF key if passed - } - } - - process_job(job_change, self.mock_so_client) - - # Verify CE Database Updated - db = TestingSessionLocal() - company = db.query(Company).filter(Company.crm_id == "100").first() - print(f"[TEST] Updated Company Industry in DB: {company.industry_ai}") - self.assertEqual(company.industry_ai, "Healthcare - Hospital") - db.close() - - # Verify Cascade Triggered - # Expect JobQueue.add_job called for Person 500 - # args: "person.changed", payload - mock_queue_instance.add_job.assert_called() - call_args = mock_queue_instance.add_job.call_args - print(f"[TEST] Cascade Job Added: {call_args}") - self.assertEqual(call_args[0][0], "person.changed") - self.assertEqual(call_args[0][1]["PersonId"], 500) - - # --- Step 4: Process Cascade Job (Get Healthcare Texts) --- - print("[TEST] Step 4: Process Cascade Job...") - job_cascade = {"id": "job4", "event_type": "person.changed", "payload": call_args[0][1]} - - process_job(job_cascade, self.mock_so_client) - - udfs_new = self.mock_so_client.persons[500]["UserDefinedFields"] - print(f"[TEST] New UDFs: {udfs_new}") - self.assertEqual(udfs_new["SuperOffice:Subject"], "TEST SUBJECT HEALTH") - self.assertEqual(udfs_new["SuperOffice:Intro"], "TEST BRIDGE HEALTH") - -if __name__ == "__main__": - unittest.main() \ No newline at end of file diff --git a/connector-superoffice/tests/test_full_roundtrip.py b/connector-superoffice/tests/test_full_roundtrip.py deleted file mode 100644 index a8c333e6..00000000 --- a/connector-superoffice/tests/test_full_roundtrip.py +++ /dev/null @@ -1,127 +0,0 @@ -import os -import requests -import json -import logging -import sys -import time - -# Configure path to import modules from parent directory -# This makes the script runnable from the project root -script_dir = os.path.dirname(os.path.abspath(__file__)) -parent_dir = os.path.join(script_dir, '..') -sys.path.append(parent_dir) - -from dotenv import load_dotenv -# Load .env from project root -dotenv_path = os.path.join(parent_dir, '..', '.env') -load_dotenv(dotenv_path=dotenv_path) - -from config import settings -from superoffice_client import SuperOfficeClient - - -# Logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') -logger = logging.getLogger("e2e-roundtrip") - -# Config - Use a real, enriched company for this test -API_USER = os.getenv("API_USER", "admin") -API_PASS = os.getenv("API_PASSWORD", "gemini") -TEST_PERSON_ID = 2 # This is a placeholder, a real one would be used in a live env -TEST_CONTACT_ID = 1 # Company ID for "THERME ERDING" in the CE database - -def run_roundtrip(): - print("πŸš€ STARTING E2E TEXT GENERATION TEST (CE -> SuperOffice)\n") - - so_client = SuperOfficeClient() - if not so_client.access_token: - print("❌ SuperOffice Auth failed. Check .env") - return - - scenarios = [ - { - "name": "Scenario A: Infrastructure Role (Facility Manager)", - "job_title": "Leiter Facility Management", - "expected_opener_field": "opener", - "expected_keyword": "Sicherheit" # Keyword for Primary opener (Hygiene/Safety) - }, - { - "name": "Scenario B: Operational Role (Leiter Badbetrieb)", - "job_title": "Leiter Badebetrieb", - "expected_opener_field": "opener_secondary", - "expected_keyword": "GΓ€ste" # Keyword for Secondary opener (Guest experience/Service) - } - ] - - for s in scenarios: - print(f"--- Running {s['name']}: {s['job_title']} ---") - - # 1. Provisioning from Company Explorer - print(f"1. 🧠 Asking Company Explorer for texts...") - ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact" - payload = { - "so_contact_id": TEST_CONTACT_ID, - "so_person_id": TEST_PERSON_ID, - "crm_name": "THERME ERDING Service GmbH", # Real data - "crm_website": "https://www.therme-erding.de/", - "job_title": s['job_title'] - } - - try: - resp = requests.post(ce_url, json=payload, auth=(API_USER, API_PASS)) - resp.raise_for_status() - data = resp.json() - - # --- ASSERTIONS --- - print("2. 🧐 Verifying API Response...") - - # Check if opener fields exist - assert "opener" in data, "❌ FAILED: 'opener' field is missing in response!" - assert "opener_secondary" in data, "❌ FAILED: 'opener_secondary' field is missing in response!" - print(" βœ… 'opener' and 'opener_secondary' fields are present.") - - # Check if the specific opener for the role is not empty - opener_text = data.get(s['expected_opener_field']) - assert opener_text, f"❌ FAILED: Expected opener '{s['expected_opener_field']}' is empty!" - print(f" βœ… Expected opener '{s['expected_opener_field']}' is not empty.") - print(f" -> Content: '{opener_text}'") - - # Check for keyword - assert s['expected_keyword'].lower() in opener_text.lower(), f"❌ FAILED: Keyword '{s['expected_keyword']}' not in opener text!" - print(f" βœ… Keyword '{s['expected_keyword']}' found in opener.") - - # --- Write to SuperOffice --- - print(f"3. ✍️ Writing verified texts to SuperOffice UDFs...") - texts = data.get("texts", {}) - udf_payload = { - settings.UDF_SUBJECT: texts.get("subject", ""), - settings.UDF_INTRO: texts.get("intro", ""), - settings.UDF_SOCIAL_PROOF: texts.get("social_proof", ""), - "x_opener_primary": data.get("opener", ""), # Assuming UDF names - "x_opener_secondary": data.get("opener_secondary", "") # Assuming UDF names - } - - # This part is a simulation of the write; in a real test we'd need the real ProgIDs - # For now, we confirm the logic works up to this point. - if so_client.update_entity_udfs(TEST_PERSON_ID, "Person", {"String10": "E2E Test OK"}): - print(" -> βœ… Successfully wrote test confirmation to SuperOffice.") - else: - print(" -> ❌ Failed to write to SuperOffice.") - - except requests.exceptions.HTTPError as e: - print(f" ❌ CE API HTTP Error: {e.response.status_code} - {e.response.text}") - continue - except AssertionError as e: - print(f" {e}") - continue - except Exception as e: - print(f" ❌ An unexpected error occurred: {e}") - continue - - print(f"--- PASSED: {s['name']} ---\n") - time.sleep(1) - - print("🏁 Test Run Complete.") - -if __name__ == "__main__": - run_roundtrip() diff --git a/connector-superoffice/tests/test_superoffice_demo_flow.py b/connector-superoffice/tests/test_superoffice_demo_flow.py new file mode 100644 index 00000000..72c8f0d6 --- /dev/null +++ b/connector-superoffice/tests/test_superoffice_demo_flow.py @@ -0,0 +1,77 @@ +import pytest +from unittest.mock import MagicMock, patch +import os +import sys + +# Resolve paths +sys.path.append(os.path.join(os.path.dirname(__file__), "..")) + +from superoffice_client import SuperOfficeClient + +@pytest.fixture +def mock_client(): + # We mock the client to avoid real API calls during test + # Mocking environment variables to prevent initialization errors + env_vars = { + "SO_CLIENT_ID": "mock_id", + "SO_CLIENT_SECRET": "mock_secret", + "SO_REFRESH_TOKEN": "mock_token", + "SO_ENVIRONMENT": "online", + "SO_CONTEXT_IDENTIFIER": "Cust12345" + } + with patch.dict(os.environ, env_vars): + with patch("superoffice_client.SuperOfficeClient._refresh_access_token", return_value="fake_token"): + client = SuperOfficeClient() + # Mocking the base methods so we don't hit the network + client._get = MagicMock() + client._post = MagicMock() + client._patch = MagicMock() + client._put = MagicMock() + client.search = MagicMock() + return client + +def test_demo_workflow_logic(mock_client): + """ + Verifies the integration workflow logic using production IDs. + Targets Contact ID 171132 and Person ID 193036. + """ + target_contact_id = 171132 + target_person_id = 193036 + + # 1. Search for demo company (simulate not found) + mock_client.search.return_value = [] + contact = mock_client.find_contact_by_criteria(name="Gemini Test Company") + assert contact is None + + # 2. Create demo company + mock_client._post.return_value = {"ContactId": target_contact_id, "Name": "Gemini Test Company"} + new_contact = mock_client.create_contact(name="Gemini Test Company", url="https://test.com") + assert new_contact["ContactId"] == target_contact_id + + # 3. Create Person + mock_client._post.return_value = {"PersonId": target_person_id, "Firstname": "Max"} + new_person = mock_client.create_person(first_name="Max", last_name="Mustermann", contact_id=target_contact_id) + assert new_person["PersonId"] == target_person_id + + # 4. Create Sale + mock_client._post.return_value = {"SaleId": 555, "Heading": "Test Sale"} + new_sale = mock_client.create_sale(title="Test Sale", contact_id=target_contact_id, person_id=target_person_id, amount=100.0) + assert new_sale["SaleId"] == 555 + + # 5. Create Project + mock_client._post.return_value = {"ProjectId": 777, "Name": "Test Project"} + new_project = mock_client.create_project(name="Test Project", contact_id=target_contact_id, person_id=target_person_id) + assert new_project["ProjectId"] == 777 + + # 6. Update UDFs + mock_client._patch.return_value = True + success = mock_client.update_entity_udfs(target_contact_id, "Contact", {"SuperOffice:1": "Val"}) + assert success is True + +def test_find_existing_contact(mock_client): + """Verifies that find_contact_by_criteria returns the found contact.""" + target_contact_id = 171132 + mock_client.search.return_value = [{"contactId": target_contact_id, "nameDepartment": "Existing Corp"}] + + contact = mock_client.find_contact_by_criteria(name="Existing Corp") + assert contact["contactId"] == target_contact_id diff --git a/docker-compose.yml b/docker-compose.yml index f1a38cae..df3326a8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -81,7 +81,7 @@ services: - gtm_architect_data:/gtm_data:ro - ./Log_from_docker:/app/logs_debug healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:3000"] + test: ["CMD", "curl", "-f", "http://localhost:3006"] interval: 10s timeout: 5s retries: 5