From 1acdad9923a71a3dc404392f9bd9f2e46b51bf17 Mon Sep 17 00:00:00 2001 From: Floke Date: Sat, 21 Feb 2026 20:05:25 +0000 Subject: [PATCH] Feat: End-to-End Test & Bidirectional Vertical Sync [30e88f42] - Implemented comprehensive E2E test covering full roundtrip and manual overrides. - Enhanced worker.py to detect manual Vertical changes in SuperOffice and sync them to Company Explorer. - Updated app.py to handle industry overrides from CRM and auto-persist Person/Contact data for robust cascade updates. --- company-explorer/backend/app.py | 46 ++- connector-superoffice/tests/test_e2e_flow.py | 277 +++++++++++++++++++ connector-superoffice/worker.py | 34 ++- 3 files changed, 350 insertions(+), 7 deletions(-) create mode 100644 connector-superoffice/tests/test_e2e_flow.py diff --git a/company-explorer/backend/app.py b/company-explorer/backend/app.py index 31727d19..15820043 100644 --- a/company-explorer/backend/app.py +++ b/company-explorer/backend/app.py @@ -90,6 +90,7 @@ class ProvisioningRequest(BaseModel): crm_name: Optional[str] = None crm_website: Optional[str] = None job_title: Optional[str] = None + crm_industry_name: Optional[str] = None class ProvisioningResponse(BaseModel): status: str @@ -242,6 +243,19 @@ def provision_superoffice_contact( company.crm_website = req.crm_website changed = True + # NEW: Handle Vertical Override from SuperOffice + if req.crm_industry_name: + # Check if valid industry + valid_industry = db.query(Industry).filter(Industry.name == req.crm_industry_name).first() + if valid_industry: + if company.industry_ai != req.crm_industry_name: + logger.info(f"Overriding Industry for {company.name}: {company.industry_ai} -> {req.crm_industry_name} (from CRM)") + company.industry_ai = req.crm_industry_name + # Trigger metric re-extraction? Maybe later. For now, just update. + changed = True + else: + logger.warning(f"CRM provided industry '{req.crm_industry_name}' not found in DB. 
Ignoring.") + # Simple Mismatch Check if company.website and company.crm_website: def norm(u): return str(u).lower().replace("https://", "").replace("http://", "").replace("www.", "").strip("/") @@ -269,19 +283,39 @@ def provision_superoffice_contact( person = db.query(Contact).filter(Contact.so_person_id == req.so_person_id).first() - # 3. Determine Role - role_name = None - if person and person.role: - role_name = person.role - elif req.job_title: + # Auto-Create/Update Person + if not person: + person = Contact( + company_id=company.id, + so_contact_id=req.so_contact_id, + so_person_id=req.so_person_id, + status="ACTIVE" + ) + db.add(person) + logger.info(f"Created new person {req.so_person_id} for company {company.name}") + + # Update Job Title & Role logic + if req.job_title: + person.job_title = req.job_title + # Simple classification fallback mappings = db.query(JobRoleMapping).all() + found_role = None for m in mappings: # Check pattern type (Regex vs Simple) - simplified here pattern_clean = m.pattern.replace("%", "").lower() if pattern_clean in req.job_title.lower(): - role_name = m.role + found_role = m.role break + + if found_role: + person.role = found_role + + db.commit() + db.refresh(person) + + # 3. Determine Role + role_name = person.role # 4. 
Determine Vertical (Industry) vertical_name = company.industry_ai diff --git a/connector-superoffice/tests/test_e2e_flow.py b/connector-superoffice/tests/test_e2e_flow.py new file mode 100644 index 00000000..a8969b6f --- /dev/null +++ b/connector-superoffice/tests/test_e2e_flow.py @@ -0,0 +1,277 @@ +import unittest +import os +import sys +import json +import logging +from unittest.mock import MagicMock, patch +from fastapi.testclient import TestClient +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +# Setup Paths +current_dir = os.path.dirname(os.path.abspath(__file__)) +ce_backend_dir = os.path.abspath(os.path.join(current_dir, "../../company-explorer")) +connector_dir = os.path.abspath(os.path.join(current_dir, "..")) + +sys.path.append(ce_backend_dir) +sys.path.append(connector_dir) + +# Import CE App & DB +# Note: backend.app needs to be importable. If backend is a package. +try: + from backend.app import app, get_db + from backend.database import Base, Industry, Persona, MarketingMatrix, JobRoleMapping, Company, Contact, init_db +except ImportError: + # Try alternate import if running from root + sys.path.append(os.path.abspath("company-explorer")) + from backend.app import app, get_db + from backend.database import Base, Industry, Persona, MarketingMatrix, JobRoleMapping, Company, Contact, init_db + +# Import Worker Logic +from worker import process_job + +# Setup Test DB +TEST_DB_FILE = "/tmp/test_company_explorer.db" +if os.path.exists(TEST_DB_FILE): + os.remove(TEST_DB_FILE) + +SQLALCHEMY_DATABASE_URL = f"sqlite:///{TEST_DB_FILE}" +engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}) +TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + +# Override get_db dependency +def override_get_db(): + try: + db = TestingSessionLocal() + yield db + finally: + db.close() + +app.dependency_overrides[get_db] = override_get_db + +# Mock SuperOffice Client +class 
MockSuperOfficeClient: + def __init__(self): + self.access_token = "mock_token" + self.contacts = {} # id -> data + self.persons = {} # id -> data + + def get_contact(self, contact_id): + return self.contacts.get(int(contact_id)) + + def get_person(self, person_id): + return self.persons.get(int(person_id)) + + def update_entity_udfs(self, entity_id, entity_type, udfs): + target = self.contacts if entity_type == "Contact" else self.persons + if int(entity_id) in target: + if "UserDefinedFields" not in target[int(entity_id)]: + target[int(entity_id)]["UserDefinedFields"] = {} + target[int(entity_id)]["UserDefinedFields"].update(udfs) + return True + return False + + def update_person_position(self, person_id, position_id): + if int(person_id) in self.persons: + self.persons[int(person_id)]["PositionId"] = position_id + return True + return False + + def search(self, query): + if "contact/contactId eq" in query: + contact_id = int(query.split("eq")[1].strip()) + results = [] + for pid, p in self.persons.items(): + if p.get("ContactId") == contact_id: + results.append({"PersonId": pid, "FirstName": p.get("FirstName")}) + return results + return [] + + def _put(self, endpoint, data): + if endpoint.startswith("Contact/"): + cid = int(endpoint.split("/")[1]) + if cid in self.contacts: + self.contacts[cid] = data + return True + return False + +class TestE2EFlow(unittest.TestCase): + + @classmethod + def setUpClass(cls): + # Set Auth Env Vars + os.environ["API_USER"] = "admin" + os.environ["API_PASSWORD"] = "gemini" + + # Create Tables + Base.metadata.create_all(bind=engine) + db = TestingSessionLocal() + + # SEED DATA + # Industry 1 + ind1 = Industry(name="Logistics - Warehouse", status_notion="Active") + db.add(ind1) + + # Industry 2 (For Change Test) + ind2 = Industry(name="Healthcare - Hospital", status_notion="Active") + db.add(ind2) + db.commit() + + pers = Persona(name="Operativer Entscheider") + db.add(pers) + db.commit() + + # Matrix 1 + matrix1 = 
MarketingMatrix( + industry_id=ind1.id, + persona_id=pers.id, + subject="TEST SUBJECT LOGISTICS", + intro="TEST BRIDGE LOGISTICS", + social_proof="TEST PROOF LOGISTICS" + ) + db.add(matrix1) + + # Matrix 2 + matrix2 = MarketingMatrix( + industry_id=ind2.id, + persona_id=pers.id, + subject="TEST SUBJECT HEALTH", + intro="TEST BRIDGE HEALTH", + social_proof="TEST PROOF HEALTH" + ) + db.add(matrix2) + + mapping = JobRoleMapping(pattern="%Head of Operations%", role="Operativer Entscheider") + db.add(mapping) + + db.commit() + db.close() + + cls.ce_client = TestClient(app) + + def setUp(self): + self.mock_so_client = MockSuperOfficeClient() + self.mock_so_client.contacts[100] = { + "ContactId": 100, + "Name": "Test Company GmbH", + "UrlAddress": "old-site.com", + "UserDefinedFields": {} + } + self.mock_so_client.persons[500] = { + "PersonId": 500, + "ContactId": 100, + "FirstName": "Hans", + "JobTitle": "Head of Operations", + "UserDefinedFields": {} + } + + def mock_post_side_effect(self, url, json=None, auth=None): + if "/api/" in url: + path = "/api/" + url.split("/api/")[1] + else: + path = url + + response = self.ce_client.post(path, json=json, auth=auth) + + class MockReqResponse: + def __init__(self, resp): + self.status_code = resp.status_code + self._json = resp.json() + def json(self): return self._json + def raise_for_status(self): + if self.status_code >= 400: raise Exception(f"HTTP {self.status_code}: {self._json}") + + return MockReqResponse(response) + + @patch("worker.JobQueue") + @patch("worker.requests.post") + @patch("worker.settings") + def test_full_roundtrip_with_vertical_change(self, mock_settings, mock_post, MockJobQueue): + mock_post.side_effect = self.mock_post_side_effect + + # Mock JobQueue instance + mock_queue_instance = MockJobQueue.return_value + + # Config Mocks + mock_settings.COMPANY_EXPLORER_URL = "http://localhost:8000" + mock_settings.UDF_VERTICAL = "SuperOffice:Vertical" + mock_settings.UDF_SUBJECT = "SuperOffice:Subject" + 
mock_settings.UDF_INTRO = "SuperOffice:Intro" + mock_settings.UDF_SOCIAL_PROOF = "SuperOffice:SocialProof" + mock_settings.VERTICAL_MAP_JSON = '{"Logistics - Warehouse": 23, "Healthcare - Hospital": 24}' + mock_settings.PERSONA_MAP_JSON = '{"Operativer Entscheider": 99}' + mock_settings.ENABLE_WEBSITE_SYNC = True + + # --- Step 1: Company Created (Logistics) --- + print("[TEST] Step 1: Create Company...") + job = {"id": "job1", "event_type": "contact.created", "payload": {"Event": "contact.created", "PrimaryKey": 100, "Changes": ["Name"]}} + + process_job(job, self.mock_so_client) # RETRY + + # Simulate Enrichment (Logistics) + db = TestingSessionLocal() + company = db.query(Company).filter(Company.crm_id == "100").first() + company.status = "ENRICHED" + company.industry_ai = "Logistics - Warehouse" + db.commit() + db.close() + + process_job(job, self.mock_so_client) # SUCCESS + assert self.mock_so_client.contacts[100]["UserDefinedFields"]["SuperOffice:Vertical"] == "23" + + # --- Step 2: Person Created (Get Logistics Texts) --- + print("[TEST] Step 2: Create Person...") + job_p = {"id": "job2", "event_type": "person.created", "payload": {"Event": "person.created", "PersonId": 500, "ContactId": 100, "JobTitle": "Head of Operations"}} + process_job(job_p, self.mock_so_client) + + udfs = self.mock_so_client.persons[500]["UserDefinedFields"] + self.assertEqual(udfs["SuperOffice:Subject"], "TEST SUBJECT LOGISTICS") + + # --- Step 3: Vertical Change in SO (To Healthcare) --- + print("[TEST] Step 3: Change Vertical in SO...") + + # Update Mock SO Data + self.mock_so_client.contacts[100]["UserDefinedFields"]["SuperOffice:Vertical"] = "24" # Healthcare + + # Simulate Webhook + job_change = { + "id": "job3", + "event_type": "contact.changed", + "payload": { + "Event": "contact.changed", + "PrimaryKey": 100, + "Changes": ["UserDefinedFields"] # Or specific UDF key if passed + } + } + + process_job(job_change, self.mock_so_client) + + # Verify CE Database Updated + db = 
TestingSessionLocal() + company = db.query(Company).filter(Company.crm_id == "100").first() + print(f"[TEST] Updated Company Industry in DB: {company.industry_ai}") + self.assertEqual(company.industry_ai, "Healthcare - Hospital") + db.close() + + # Verify Cascade Triggered + # Expect JobQueue.add_job called for Person 500 + # args: "person.changed", payload + mock_queue_instance.add_job.assert_called() + call_args = mock_queue_instance.add_job.call_args + print(f"[TEST] Cascade Job Added: {call_args}") + self.assertEqual(call_args[0][0], "person.changed") + self.assertEqual(call_args[0][1]["PersonId"], 500) + + # --- Step 4: Process Cascade Job (Get Healthcare Texts) --- + print("[TEST] Step 4: Process Cascade Job...") + job_cascade = {"id": "job4", "event_type": "person.changed", "payload": call_args[0][1]} + + process_job(job_cascade, self.mock_so_client) + + udfs_new = self.mock_so_client.persons[500]["UserDefinedFields"] + print(f"[TEST] New UDFs: {udfs_new}") + self.assertEqual(udfs_new["SuperOffice:Subject"], "TEST SUBJECT HEALTH") + self.assertEqual(udfs_new["SuperOffice:Intro"], "TEST BRIDGE HEALTH") + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/connector-superoffice/worker.py b/connector-superoffice/worker.py index 891ef9f8..785d5730 100644 --- a/connector-superoffice/worker.py +++ b/connector-superoffice/worker.py @@ -37,6 +37,12 @@ def process_job(job, so_client: SuperOfficeClient): # Define what we care about (Strategic triggers for re-evaluation) # Company: Name/Department (Identity), Urls (Source), Numbers (Matching) relevant_contact = ["name", "department", "urladdress", "number1", "number2"] + # Add Vertical UDF to relevant changes + if settings.UDF_VERTICAL: + relevant_contact.append(settings.UDF_VERTICAL.lower()) + # Also catch generic "userdefinedfields" if specific key not present + relevant_contact.append("userdefinedfields") + # Person: JobTitle (Persona Logic), Position (Role Logic) relevant_person = 
["jobtitle", "position"] @@ -100,6 +106,9 @@ def process_job(job, so_client: SuperOfficeClient): # 1b. Fetch full contact details for 'Double Truth' check (Master Data Sync) crm_name = None crm_website = None + crm_industry_name = None + contact_details = None + try: contact_details = so_client.get_contact(contact_id) if contact_details: @@ -107,6 +116,28 @@ def process_job(job, so_client: SuperOfficeClient): crm_website = contact_details.get("UrlAddress") if not crm_website and "Urls" in contact_details and contact_details["Urls"]: crm_website = contact_details["Urls"][0].get("Value") + + # Extract Vertical (if set in SO) + if settings.UDF_VERTICAL: + udfs = contact_details.get("UserDefinedFields", {}) + so_vertical_val = udfs.get(settings.UDF_VERTICAL) + + if so_vertical_val: + # Normalize "[I:23]" -> "23" + val_str = str(so_vertical_val) + if val_str.startswith("[I:"): + val_str = val_str.split(":")[1].strip("]") + + # Reverse Map ID -> Name + try: + vertical_map = json.loads(settings.VERTICAL_MAP_JSON) + vertical_map_rev = {str(v): k for k, v in vertical_map.items()} + if val_str in vertical_map_rev: + crm_industry_name = vertical_map_rev[val_str] + logger.info(f"Detected CRM Vertical Override: {so_vertical_val} -> {crm_industry_name}") + except Exception as ex: + logger.error(f"Error mapping vertical ID {val_str}: {ex}") + except Exception as e: logger.warning(f"Failed to fetch contact details for {contact_id}: {e}") @@ -117,7 +148,8 @@ def process_job(job, so_client: SuperOfficeClient): "so_person_id": person_id, "job_title": payload.get("JobTitle"), "crm_name": crm_name, - "crm_website": crm_website + "crm_website": crm_website, + "crm_industry_name": crm_industry_name } # Simple Basic Auth for internal API