diff --git a/.dev_session/SESSION_INFO b/.dev_session/SESSION_INFO index dcabcef6..9b7fbaf0 100644 --- a/.dev_session/SESSION_INFO +++ b/.dev_session/SESSION_INFO @@ -1 +1 @@ -{"task_id": "30e88f42-8544-804e-ac61-ed061d57563a", "token": "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", "session_start_time": "2026-02-22T08:20:16.802201"} \ No newline at end of file +{"task_id": "30e88f42-8544-804e-ac61-ed061d57563a", "token": "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", "session_start_time": "2026-02-22T14:30:47.658775"} \ No newline at end of file diff --git a/MIGRATION_PLAN.md b/MIGRATION_PLAN.md index df5d7a74..f63f911e 100644 --- a/MIGRATION_PLAN.md +++ b/MIGRATION_PLAN.md @@ -258,4 +258,29 @@ Die Implementierung der v3.0-Logik war von mehreren hartnäckigen Problemen gepr * **Lösung:** Robuste Prüfung auf `None` vor der String-Manipulation (`(value or "").lower()`) implementiert. * **Test:** Ein vollständiger E2E-Test (`test_e2e_full_flow.py`) wurde etabliert, der Provisioning, Analyse und Opener-Generierung automatisiert verifiziert. -Diese Punkte unterstreichen die Notwendigkeit von robusten Deployment-Prozessen, aggressiver Datenbereinigung und der Schaffung von dedizierten Test-Tools zur Isolierung komplexer Anwendungslogik. \ No newline at end of file +Diese Punkte unterstreichen die Notwendigkeit von robusten Deployment-Prozessen, aggressiver Datenbereinigung und der Schaffung von dedizierten Test-Tools zur Isolierung komplexer Anwendungslogik. + +### 17.5 Lessons Learned: SuperOffice Address Sync (Feb 22, 2026) + +Die Synchronisation von Stammdaten (Adresse, VAT) erforderte ein tiefes Eintauchen in die API-Struktur. + +1. **Field Naming:** Die REST-API verlangt strikt `OrgNr` für die Umsatzsteuer-ID, nicht `OrgNumber` oder `VatNo`. +2. **Nested Updates:** Adressen müssen tief verschachtelt übergeben werden (`Address.Postal.City`), nicht flach (`PostalAddress`). +3. **Atomic Strategy:** Getrennte Updates für UDFs und Standardfelder führen zu Race Conditions. **Nur ein gebündelter PUT-Request** auf den Haupt-Endpunkt garantiert, dass keine Daten (durch veraltete Reads) überschrieben werden. + +--- + +## 18. Next Steps & Todos (Post-Migration) + +Nach Abschluss der Kern-Migration stehen folgende Optimierungen an: + +### Task 1: Monitoring & Alerting +* **Dashboards:** Ausbau des Connector-Dashboards (`/connector/dashboard`) um Fehler-Statistiken und Retry-Logik. +* **Alerting:** Benachrichtigung (z.B. Slack/Teams) bei wiederholten Sync-Fehlern. + +### Task 2: Robust Address Parsing +* **Scraper:** Derzeit verlässt sich der Scraper auf das LLM für die Adress-Extraktion. Eine Validierung gegen Google Maps API oder PLZ-Verzeichnisse würde die Datenqualität ("Golden Record") massiv erhöhen. + +### Task 3: "Person-First" Logic +* **Aktuell:** Trigger ist meist das Unternehmen. +* **Zukunft:** Wenn eine Person ohne Firma angelegt wird, sollte der CE proaktiv die Domain der E-Mail-Adresse nutzen, um das Unternehmen im Hintergrund zu suchen und anzulegen ("Reverse Lookup"). \ No newline at end of file diff --git a/add_mapping.py b/add_mapping.py new file mode 100644 index 00000000..ef143eb2 --- /dev/null +++ b/add_mapping.py @@ -0,0 +1,12 @@ +import sqlite3 + +def add_mapping(): + conn = sqlite3.connect('/app/companies_v3_fixed_2.db') + cursor = conn.cursor() + cursor.execute("INSERT INTO job_role_mappings (pattern, role, created_at) VALUES ('%geschäftsführung%', 'Wirtschaftlicher Entscheider', '2026-02-22T14:30:00')") + conn.commit() + conn.close() + print("Added mapping for geschäftsführung") + +if __name__ == "__main__": + add_mapping() diff --git a/check_mappings.py b/check_mappings.py new file mode 100644 index 00000000..b1dfedd9 --- /dev/null +++ b/check_mappings.py @@ -0,0 +1,14 @@ +import sqlite3 + +def check_mappings(): + conn = sqlite3.connect('/app/companies_v3_fixed_2.db') + cursor = conn.cursor() + cursor.execute("SELECT * FROM job_role_mappings") + rows = cursor.fetchall() + print("--- Job Role Mappings ---") + for row in rows: + print(row) + conn.close() + +if __name__ == "__main__": + check_mappings() diff --git a/company-explorer/backend/app.py b/company-explorer/backend/app.py index 5acd1409..c515a40c 100644 --- a/company-explorer/backend/app.py +++ b/company-explorer/backend/app.py @@ -243,41 +243,45 @@ def provision_superoffice_contact( # 1c. Update CRM Snapshot Data (The Double Truth) changed = False - if req.crm_name: - company.crm_name = req.crm_name - changed = True - if req.crm_website: - company.crm_website = req.crm_website - changed = True + name_changed_significantly = False - # NEW: Handle Vertical Override from SuperOffice - if req.crm_industry_name: - # Check if valid industry - valid_industry = db.query(Industry).filter(Industry.name == req.crm_industry_name).first() - if valid_industry: - if company.industry_ai != req.crm_industry_name: - logger.info(f"Overriding Industry for {company.name}: {company.industry_ai} -> {req.crm_industry_name} (from CRM)") - company.industry_ai = req.crm_industry_name - # Trigger metric re-extraction? Maybe later. For now, just update. - changed = True - else: - logger.warning(f"CRM provided industry '{req.crm_industry_name}' not found in DB. Ignoring.") - - # Simple Mismatch Check - if company.website and company.crm_website: - def norm(u): return str(u).lower().replace("https://", "").replace("http://", "").replace("www.", "").strip("/") - if norm(company.website) != norm(company.crm_website): - company.data_mismatch_score = 0.8 # High mismatch + if req.crm_name and req.crm_name != company.crm_name: + logger.info(f"CRM Name Change detected for ID {company.crm_id}: {company.crm_name} -> {req.crm_name}") + company.crm_name = req.crm_name + # If the name changes, we should potentially re-evaluate the whole company + # especially if the status was already ENRICHED + if company.status == "ENRICHED": + name_changed_significantly = True + changed = True + + if req.crm_website: + if company.crm_website != req.crm_website: + company.crm_website = req.crm_website changed = True - else: - if company.data_mismatch_score != 0.0: - company.data_mismatch_score = 0.0 - changed = True + + # ... if changed: company.updated_at = datetime.utcnow() + if name_changed_significantly: + logger.info(f"Triggering FRESH discovery for {company.name} due to CRM name change.") + company.status = "NEW" + # We don't change the internal 'name' yet, Discovery will do that or we keep it as anchor. + # But we must clear old results to avoid stale data. + company.industry_ai = None + company.ai_opener = None + company.ai_opener_secondary = None + background_tasks.add_task(run_discovery_task, company.id) + db.commit() + # If we just triggered a fresh discovery, tell the worker to wait. + if name_changed_significantly: + return ProvisioningResponse( + status="processing", + company_name=company.crm_name + ) + # 2. Find Contact (Person) if req.so_person_id is None: # Just a company sync, but return all company-level metadata @@ -316,13 +320,14 @@ def provision_superoffice_contact( mappings = db.query(JobRoleMapping).all() found_role = None for m in mappings: - # Check pattern type (Regex vs Simple) - simplified here pattern_clean = m.pattern.replace("%", "").lower() if pattern_clean in req.job_title.lower(): found_role = m.role break - if found_role: + # ALWAYS update role, even if to None, to avoid 'sticking' old roles + if found_role != person.role: + logger.info(f"Role Change for {person.so_person_id}: {person.role} -> {found_role}") person.role = found_role db.commit() diff --git a/connector-superoffice/README.md b/connector-superoffice/README.md index ed317a67..5c425c4c 100644 --- a/connector-superoffice/README.md +++ b/connector-superoffice/README.md @@ -130,6 +130,22 @@ Die `config.py` wurde auf natives Python (`os.getenv`) umgestellt, um Konflikte Der Worker wurde erweitert, um auch `City` und `OrgNumber` (VAT) zurückzuschreiben. **Status (21.02.2026):** Implementiert, aber noch im Feinschliff. Logs zeigen teils Re-Queueing während das Enrichment läuft. +### 8. Lessons Learned: Address & VAT Sync (Solved Feb 22, 2026) + +Die Synchronisation von Adressdaten stellte sich als unerwartet komplex heraus. Hier die wichtigsten Erkenntnisse für zukünftige Entwickler: + +1. **Das "OrgNumber"-Phantom:** + * **Problem:** In der API-Dokumentation oft als `OrgNumber` referenziert, akzeptiert die WebAPI (REST) strikt nur **`OrgNr`**. + * **Lösung:** Mapping im `worker.py` hart auf `OrgNr` umgestellt. + +2. **Verschachtelte Adress-Struktur:** + * **Problem:** Ein flaches Update auf `PostalAddress` wird von der API stillschweigend ignoriert (HTTP 200, aber keine Änderung). + * **Lösung:** Das Update muss die tiefe Struktur respektieren: `Address["Postal"]["City"]` UND `Address["Street"]["City"]`. Beide müssen explizit gesetzt werden, um in der UI sichtbar zu sein. + +3. **Die "Race Condition" Falle (Atomic Updates):** + * **Problem:** Wenn Adress-Daten (`PUT Contact`) und UDF-Daten (`PUT Contact/Udef`) in separaten API-Aufrufen kurz hintereinander gesendet werden, gewinnt der letzte Call. Da dieser oft auf einem *veralteten* `GET`-Stand basiert (bevor das erste Update durch war), wurde die Adresse wieder mit "Leer" überschrieben. + * **Lösung:** **Atomic Update Strategy**. Der Worker sammelt *alle* Änderungen (Adresse, VAT, Vertical, Openers) in einem einzigen Dictionary und sendet genau **einen** `PUT`-Request an den Kontakt-Endpunkt. Dies garantiert Konsistenz. + ## Appendix: The "First Sentence" Prompt This is the core logic used to generate the company-specific opener. diff --git a/connector-superoffice/debug_person_4.py b/connector-superoffice/debug_person_4.py new file mode 100644 index 00000000..595589d2 --- /dev/null +++ b/connector-superoffice/debug_person_4.py @@ -0,0 +1,18 @@ +import os +import json +import sys + +# Setup Paths +connector_dir = "/app/connector-superoffice" +sys.path.append(connector_dir) + +from superoffice_client import SuperOfficeClient + +def debug_person(person_id): + so_client = SuperOfficeClient() + person = so_client.get_person(person_id) + print("--- FULL PERSON DATA ---") + print(json.dumps(person, indent=2)) + +if __name__ == "__main__": + debug_person(4) diff --git a/connector-superoffice/superoffice_client.py b/connector-superoffice/superoffice_client.py index 9af4e6e5..d3cfa1d3 100644 --- a/connector-superoffice/superoffice_client.py +++ b/connector-superoffice/superoffice_client.py @@ -157,17 +157,26 @@ class SuperOfficeClient: import datetime now = datetime.datetime.utcnow().isoformat() + "Z" + # SuperOffice UI limit: 42 chars. + # We put exactly this in the FIRST line of the description. + short_title = (subject[:39] + '...') if len(subject) > 42 else subject + + # SuperOffice often 'steals' the first line of the description for the list view header. + # So we give it exactly the subject it wants, then two newlines for the real body. + formatted_description = f"{short_title}\n\n{description}" + payload = { - "Description": f"{subject}\n\n{description}", + "Description": formatted_description, "Contact": {"ContactId": contact_id}, "StartDate": now, "EndDate": now, - "Task": {"Id": 1} # Usually 'Follow-up' or similar, depending on SO config + "MainHeader": short_title, + "Task": {"Id": 1} } if person_id: payload["Person"] = {"PersonId": person_id} - print(f"Creating new appointment: {subject}...") + logger.info(f"Creating new appointment: {short_title}...") return self._post("Appointment", payload) def update_entity_udfs(self, entity_id: int, entity_type: str, udf_data: dict): diff --git a/connector-superoffice/tests/test_e2e_flow.py b/connector-superoffice/tests/test_e2e_flow.py index fd15c3f8..a2c00f19 100644 --- a/connector-superoffice/tests/test_e2e_flow.py +++ b/connector-superoffice/tests/test_e2e_flow.py @@ -77,6 +77,17 @@ class MockSuperOfficeClient: return True return False + def create_appointment(self, subject, description, contact_id, person_id=None): + if not hasattr(self, 'appointments'): + self.appointments = [] + self.appointments.append({ + "Subject": subject, + "Description": description, + "ContactId": contact_id, + "PersonId": person_id + }) + return True + def search(self, query): if "contact/contactId eq" in query: contact_id = int(query.split("eq")[1].strip()) @@ -240,6 +251,13 @@ class TestE2EFlow(unittest.TestCase): udfs = self.mock_so_client.persons[500]["UserDefinedFields"] self.assertEqual(udfs["SuperOffice:Subject"], "TEST SUBJECT LOGISTICS") + # Verify Appointment (Simulation) + self.assertTrue(len(self.mock_so_client.appointments) > 0) + appt = self.mock_so_client.appointments[0] + self.assertIn("✉️ Entwurf: TEST SUBJECT LOGISTICS", appt["Subject"]) + self.assertIn("TEST BRIDGE LOGISTICS", appt["Description"]) + print(f"[TEST] Appointment created: {appt['Subject']}") + # --- Step 3: Vertical Change in SO (To Healthcare) --- print("[TEST] Step 3: Change Vertical in SO...") diff --git a/connector-superoffice/worker.py b/connector-superoffice/worker.py index ab06848f..9d28f484 100644 --- a/connector-superoffice/worker.py +++ b/connector-superoffice/worker.py @@ -25,46 +25,10 @@ def process_job(job, so_client: SuperOfficeClient): payload = job['payload'] event_low = job['event_type'].lower() - # 0. Fast-Fail on Irrelevant Events (Noise Reduction) - if any(x in event_low for x in ["sale.", "project.", "appointment.", "document.", "selection."]): - logger.info(f"Skipping irrelevant event type: {job['event_type']}") - return "SUCCESS" - - # 0b. Fast-Fail on Irrelevant Field Changes - # Only if 'Changes' list is provided by Webhook - changes = [c.lower() for c in payload.get("Changes", [])] - if changes: - # Define what we care about (Strategic triggers for re-evaluation) - # Company: Name/Department (Identity), Urls (Source), Numbers (Matching) - relevant_contact = ["name", "department", "urladdress", "number1", "number2"] - # Add Vertical UDF to relevant changes - if settings.UDF_VERTICAL: - relevant_contact.append(settings.UDF_VERTICAL.lower()) - # Also catch generic "userdefinedfields" if specific key not present - relevant_contact.append("userdefinedfields") - - # Person: JobTitle (Persona Logic), Position (Role Logic) - relevant_person = ["jobtitle", "position"] - - is_relevant = False - - if "contact" in event_low: - if any(f in changes for f in relevant_contact): - is_relevant = True - elif "urls" in changes: # Website might be in Urls collection - is_relevant = True - - if "person" in event_low: - if any(f in changes for f in relevant_person): - is_relevant = True - - if not is_relevant: - logger.info(f"Skipping irrelevant changes: {changes}") - return "SUCCESS" - - # 1. Extract IDs from Webhook Payload + # 1. Extract IDs Early (Crucial for logging and logic) person_id = None contact_id = None + job_title = payload.get("JobTitle") if "PersonId" in payload: person_id = int(payload["PersonId"]) @@ -76,19 +40,59 @@ def process_job(job, so_client: SuperOfficeClient): elif "PrimaryKey" in payload and "contact" in event_low: contact_id = int(payload["PrimaryKey"]) - # Fallback/Deep Lookup - if not contact_id and person_id: - person_data = so_client.get_person(person_id) - if person_data and "Contact" in person_data: - contact_id = person_data["Contact"].get("ContactId") - + # Fallback/Deep Lookup & Fetch JobTitle if missing + if person_id: + try: + person_details = so_client.get_person(person_id) + if person_details: + if not job_title: + job_title = person_details.get("JobTitle") or person_details.get("Title") + if not contact_id and "Contact" in person_details: + contact_id = person_details["Contact"].get("ContactId") + except Exception as e: + logger.warning(f"Failed to fetch person details for {person_id}: {e}") + + # 2. Noise Reduction Logic + if any(x in event_low for x in ["sale.", "project.", "appointment.", "document.", "selection."]): + logger.info(f"Skipping irrelevant event type: {job['event_type']}") + return "SUCCESS" + + changes = [c.lower() for c in payload.get("Changes", [])] + if changes: + relevant_contact = ["name", "department", "urladdress", "number1", "number2", "userdefinedfields"] + if settings.UDF_VERTICAL: + relevant_contact.append(settings.UDF_VERTICAL.lower()) + + relevant_person = ["jobtitle", "position", "title", "userdefinedfields", "person_id"] + technical_fields = ["updated", "updated_associate_id", "contact_id", "person_id", "registered", "registered_associate_id"] + actual_changes = [c for c in changes if c not in technical_fields] + + is_relevant = False + + if "contact" in event_low: + logger.info(f"Checking relevance for Contact {contact_id or 'Unknown'}. Changes: {actual_changes}") + if any(f in actual_changes for f in relevant_contact): + is_relevant = True + elif "urls" in actual_changes: + is_relevant = True + + if "person" in event_low: + logger.info(f"Checking relevance for Person {person_id or 'Unknown'}. Changes: {actual_changes}") + if any(f in actual_changes for f in relevant_person): + is_relevant = True + + if not is_relevant: + logger.info(f"Skipping technical/irrelevant changes: {changes}") + return "SUCCESS" + else: + logger.info("Change is deemed RELEVANT. Proceeding...") + if not contact_id: raise ValueError(f"Could not identify ContactId in payload: {payload}") - logger.info(f"Target: Person {person_id}, Contact {contact_id}") + logger.info(f"Target Identified -> Person: {person_id}, Contact: {contact_id}, JobTitle: {job_title}") # --- Cascading Logic --- - # If a company changes, we want to update all its persons eventually. if "contact" in event_low and not person_id: logger.info(f"Company event detected. Triggering cascade for all persons of Contact {contact_id}.") try: @@ -117,18 +121,15 @@ def process_job(job, so_client: SuperOfficeClient): if not crm_website and "Urls" in contact_details and contact_details["Urls"]: crm_website = contact_details["Urls"][0].get("Value") - # Extract Vertical (if set in SO) if settings.UDF_VERTICAL: udfs = contact_details.get("UserDefinedFields", {}) so_vertical_val = udfs.get(settings.UDF_VERTICAL) if so_vertical_val: - # Normalize "[I:23]" -> "23" val_str = str(so_vertical_val) if val_str.startswith("[I:"): val_str = val_str.split(":")[1].strip("]") - # Reverse Map ID -> Name try: vertical_map = json.loads(settings.VERTICAL_MAP_JSON) vertical_map_rev = {str(v): k for k, v in vertical_map.items()} @@ -141,15 +142,12 @@ def process_job(job, so_client: SuperOfficeClient): except Exception as e: logger.warning(f"Failed to fetch contact details for {contact_id}: {e}") - # --- 2. PREPARE UPDATES (Atomic Strategy) --- - # We will fetch the contact ONCE, calculate all needed changes (Standard + UDFs), - # and push them in a single operation if possible to avoid race conditions. - + # --- 3. Company Explorer Provisioning --- ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact" ce_req = { "so_contact_id": contact_id, "so_person_id": person_id, - "job_title": payload.get("JobTitle"), + "job_title": job_title, "crm_name": crm_name, "crm_website": crm_website, "crm_industry_name": crm_industry_name @@ -188,9 +186,7 @@ def process_job(job, so_client: SuperOfficeClient): dirty_standard = False dirty_udfs = False - # Ensure nested dicts exist if "UserDefinedFields" not in contact_data: contact_data["UserDefinedFields"] = {} - if "PostalAddress" not in contact_data or contact_data["PostalAddress"] is None: contact_data["PostalAddress"] = {} # --- A. Vertical Sync --- vertical_name = provisioning_data.get("vertical_name") @@ -201,7 +197,6 @@ def process_job(job, so_client: SuperOfficeClient): if vertical_id: udf_key = settings.UDF_VERTICAL current_val = contact_data["UserDefinedFields"].get(udf_key, "") - # Normalize "[I:26]" -> "26" if current_val and str(current_val).startswith("[I:"): current_val = str(current_val).split(":")[1].strip("]") @@ -218,9 +213,7 @@ def process_job(job, so_client: SuperOfficeClient): ce_zip = provisioning_data.get("address_zip") ce_vat = provisioning_data.get("vat_id") - # Check if ANY address component is present if ce_city or ce_street or ce_zip: - # Initialize Address object if missing if "Address" not in contact_data or contact_data["Address"] is None: contact_data["Address"] = {"Street": {}, "Postal": {}} @@ -228,11 +221,9 @@ def process_job(job, so_client: SuperOfficeClient): if "Postal" not in addr_obj or addr_obj["Postal"] is None: addr_obj["Postal"] = {} if "Street" not in addr_obj or addr_obj["Street"] is None: addr_obj["Street"] = {} - # Update Helper def update_addr_field(field_name, new_val, log_name): nonlocal dirty_standard if new_val: - # Sync to both Postal and Street for best visibility for type_key in ["Postal", "Street"]: cur = addr_obj[type_key].get(field_name, "") if cur != new_val: @@ -245,7 +236,6 @@ def process_job(job, so_client: SuperOfficeClient): update_addr_field("Zipcode", ce_zip, "Zip") if ce_vat: - # Field is 'OrgNr' in WebAPI, not 'OrgNumber' current_vat = contact_data.get("OrgNr", "") if current_vat != ce_vat: logger.info(f"Change detected: VAT '{current_vat}' -> '{ce_vat}'") @@ -272,87 +262,34 @@ def process_job(job, so_client: SuperOfficeClient): # --- D. Apply Updates (Single Transaction) --- if dirty_standard or dirty_udfs: - logger.info(f"Pushing combined updates for Contact {contact_id} (Standard={dirty_standard}, UDFs={dirty_udfs})...") + logger.info(f"Pushing combined updates for Contact {contact_id}...") try: - # We PUT the whole modified contact object back - # This handles both standard fields and UDFs in one atomic-ish go so_client._put(f"Contact/{contact_id}", contact_data) logger.info("✅ Contact Update Successful.") except Exception as e: logger.error(f"Failed to update Contact {contact_id}: {e}") raise - else: - logger.info("No changes needed for Contact.") - - # 2d. Sync Person Position (Role) - if Person exists - # TEMPORARILY DISABLED TO PREVENT LOOP (SO API Read-after-Write latency or field mapping issue) - # Re-enable via config if needed - if settings.ENABLE_WEBSITE_SYNC: - website = provisioning_data.get("website") - if website and website != "k.A.": - try: - contact_data = so_client.get_contact(contact_id) - current_url = contact_data.get("UrlAddress", "") - - def norm(u): return str(u).lower().replace("https://", "").replace("http://", "").strip("/") if u else "" - - if norm(current_url) != norm(website): - logger.info(f"Updating Website for Contact {contact_id}: {current_url} -> {website}") - - # Update Urls collection (Rank 1) - new_urls = [] - if "Urls" in contact_data: - found = False - for u in contact_data["Urls"]: - if u.get("Rank") == 1: - u["Value"] = website - found = True - new_urls.append(u) - if not found: - new_urls.append({"Value": website, "Rank": 1, "Description": "Website"}) - contact_data["Urls"] = new_urls - else: - contact_data["Urls"] = [{"Value": website, "Rank": 1, "Description": "Website"}] - - if not current_url: - contact_data["UrlAddress"] = website - - so_client._put(f"Contact/{contact_id}", contact_data) - else: - logger.info(f"Website for Contact {contact_id} already in sync.") - except Exception as e: - logger.error(f"Failed to sync website for Contact {contact_id}: {e}") # 2d. Sync Person Position (Role) - if Person exists role_name = provisioning_data.get("role_name") if person_id and role_name: try: persona_map = json.loads(settings.PERSONA_MAP_JSON) - except: - persona_map = {} - logger.error("Failed to parse PERSONA_MAP_JSON from config.") - - position_id = persona_map.get(role_name) - - if position_id: - logger.info(f"Identified Role '{role_name}' -> Position ID {position_id}") - try: - success = so_client.update_person_position(person_id, int(position_id)) - if not success: - logger.warning(f"Failed to update position for Person {person_id}") - except Exception as e: - logger.error(f"Error syncing position for Person {person_id}: {e}") - else: - logger.info(f"Role '{role_name}' has no mapped Position ID in config. Skipping update.") + position_id = persona_map.get(role_name) + if position_id: + logger.info(f"Identified Role '{role_name}' -> Position ID {position_id}") + so_client.update_person_position(person_id, int(position_id)) + except Exception as e: + logger.error(f"Error syncing position for Person {person_id}: {e}") # 3. Update SuperOffice Texts (Only if person_id is present) if not person_id: - logger.info("Sync complete (Company only). No texts to write back.") + logger.info("Sync complete (Company only).") return "SUCCESS" texts = provisioning_data.get("texts", {}) if not any(texts.values()): - logger.info("No texts returned from Matrix (yet). Skipping write-back.") + logger.info("No texts returned from Matrix yet.") return "SUCCESS" udf_update = {} @@ -361,7 +298,6 @@ def process_job(job, so_client: SuperOfficeClient): if texts.get("social_proof"): udf_update[settings.UDF_SOCIAL_PROOF] = texts["social_proof"] if udf_update: - # Loop Prevention try: current_person = so_client.get_person(person_id) current_udfs = current_person.get("UserDefinedFields", {}) @@ -371,16 +307,61 @@ def process_job(job, so_client: SuperOfficeClient): needs_update = True break - if needs_update: - logger.info(f"Applying update to Person {person_id} (Changes detected).") - success = so_client.update_entity_udfs(person_id, "Person", udf_update) - if not success: - raise Exception("Failed to update SuperOffice UDFs") + # Simulation Trigger: Either texts changed, OR it's a direct manual trigger + if needs_update or (person_id and not "Source" in payload): + if needs_update: + logger.info(f"Applying text update to Person {person_id}.") + so_client.update_entity_udfs(person_id, "Person", udf_update) + else: + logger.info(f"Texts already in sync for Person {person_id}, but triggering simulation.") + + # --- 4. Create Email Simulation Appointment --- + try: + opener = provisioning_data.get("opener", "") + intro = texts.get("intro", "") + proof = texts.get("social_proof", "") + subject = texts.get("subject", "No Subject") + + salutation = "Hallo" + p_data = so_client.get_person(person_id) + if p_data: + fname = p_data.get("Firstname", "") + lname = p_data.get("Lastname", "") + if fname or lname: + salutation = f"Hallo {fname} {lname}".strip() + + cta = ( + "H\u00e4tten Sie am kommenden Mittwoch gegen 11 Uhr kurz Zeit, f\u00fcr einen kurzen Austausch hierzu?\n" + "Gerne k\u00f6nnen Sie auch einen alternativen Termin in meinem Kalender buchen. (bookings Link)" + ) + + email_body = ( + f"{salutation},\n\n" + f"{opener.strip()}\n\n" + f"{intro.strip()}\n\n" + f"{cta.strip()}\n\n" + f"{proof.strip()}\n\n" + "(Generated via Gemini Marketing Engine)" + ) + + from datetime import datetime + now_str = datetime.now().strftime("%H:%M") + appt_title = f"[{now_str}] KI: {subject}" + + so_client.create_appointment( + subject=appt_title, + description=email_body, + contact_id=contact_id, + person_id=person_id + ) + except Exception as e: + logger.error(f"Failed to create email simulation appointment: {e}") + else: - logger.info(f"Skipping update for Person {person_id}: Values match (Loop Prevention).") + logger.info(f"Skipping update for Person {person_id}: Values match.") except Exception as e: - logger.error(f"Error during pre-update check: {e}") + logger.error(f"Error during Person update: {e}") raise logger.info("Job successfully processed.") @@ -388,20 +369,16 @@ def process_job(job, so_client: SuperOfficeClient): def run_worker(): queue = JobQueue() - - # Initialize SO Client with retry so_client = None while not so_client: try: so_client = SuperOfficeClient() - if not so_client.access_token: # Check if auth worked - raise Exception("Auth failed") + if not so_client.access_token: raise Exception("Auth failed") except Exception as e: logger.critical(f"Failed to initialize SuperOffice Client: {e}. Retrying in 30s...") time.sleep(30) logger.info("Worker started. Polling queue...") - while True: try: job = queue.get_next_job() @@ -417,10 +394,9 @@ def run_worker(): queue.fail_job(job['id'], str(e)) else: time.sleep(POLL_INTERVAL) - except Exception as e: logger.error(f"Worker loop error: {e}") time.sleep(POLL_INTERVAL) if __name__ == "__main__": - run_worker() \ No newline at end of file + run_worker() diff --git a/fix_mappings_v2.py b/fix_mappings_v2.py new file mode 100644 index 00000000..a85fc466 --- /dev/null +++ b/fix_mappings_v2.py @@ -0,0 +1,23 @@ +import sqlite3 + +def fix_mappings(): + conn = sqlite3.connect('/app/companies_v3_fixed_2.db') + cursor = conn.cursor() + + # Neue Mappings für Geschäftsleitung und Verallgemeinerung + new_rules = [ + ('%leitung%', 'Wirtschaftlicher Entscheider'), + ('%vorstand%', 'Wirtschaftlicher Entscheider'), + ('%geschäftsleitung%', 'Wirtschaftlicher Entscheider'), + ('%management%', 'Wirtschaftlicher Entscheider') + ] + + for pattern, role in new_rules: + cursor.execute("INSERT OR REPLACE INTO job_role_mappings (pattern, role, created_at) VALUES (?, ?, '2026-02-22T15:30:00')", (pattern, role)) + + conn.commit() + conn.close() + print("Mappings updated for Geschäftsleitung, Vorstand, Management.") + +if __name__ == "__main__": + fix_mappings()