[30e88f42] Einfügen
Einfügen
This commit is contained in:
@@ -1 +1 @@
|
|||||||
{"task_id": "30e88f42-8544-804e-ac61-ed061d57563a", "token": "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", "session_start_time": "2026-02-22T08:20:16.802201"}
|
{"task_id": "30e88f42-8544-804e-ac61-ed061d57563a", "token": "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", "session_start_time": "2026-02-22T14:30:47.658775"}
|
||||||
@@ -258,4 +258,29 @@ Die Implementierung der v3.0-Logik war von mehreren hartnäckigen Problemen gepr
|
|||||||
* **Lösung:** Robuste Prüfung auf `None` vor der String-Manipulation (`(value or "").lower()`) implementiert.
|
* **Lösung:** Robuste Prüfung auf `None` vor der String-Manipulation (`(value or "").lower()`) implementiert.
|
||||||
* **Test:** Ein vollständiger E2E-Test (`test_e2e_full_flow.py`) wurde etabliert, der Provisioning, Analyse und Opener-Generierung automatisiert verifiziert.
|
* **Test:** Ein vollständiger E2E-Test (`test_e2e_full_flow.py`) wurde etabliert, der Provisioning, Analyse und Opener-Generierung automatisiert verifiziert.
|
||||||
|
|
||||||
Diese Punkte unterstreichen die Notwendigkeit von robusten Deployment-Prozessen, aggressiver Datenbereinigung und der Schaffung von dedizierten Test-Tools zur Isolierung komplexer Anwendungslogik.
|
Diese Punkte unterstreichen die Notwendigkeit von robusten Deployment-Prozessen, aggressiver Datenbereinigung und der Schaffung von dedizierten Test-Tools zur Isolierung komplexer Anwendungslogik.
|
||||||
|
|
||||||
|
### 17.5 Lessons Learned: SuperOffice Address Sync (Feb 22, 2026)
|
||||||
|
|
||||||
|
Die Synchronisation von Stammdaten (Adresse, VAT) erforderte ein tiefes Eintauchen in die API-Struktur.
|
||||||
|
|
||||||
|
1. **Field Naming:** Die REST-API verlangt strikt `OrgNr` für die Umsatzsteuer-ID, nicht `OrgNumber` oder `VatNo`.
|
||||||
|
2. **Nested Updates:** Adressen müssen tief verschachtelt übergeben werden (`Address.Postal.City`), nicht flach (`PostalAddress`).
|
||||||
|
3. **Atomic Strategy:** Getrennte Updates für UDFs und Standardfelder führen zu Race Conditions. **Nur ein gebündelter PUT-Request** auf den Haupt-Endpunkt garantiert, dass keine Daten (durch veraltete Reads) überschrieben werden.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 18. Next Steps & Todos (Post-Migration)
|
||||||
|
|
||||||
|
Nach Abschluss der Kern-Migration stehen folgende Optimierungen an:
|
||||||
|
|
||||||
|
### Task 1: Monitoring & Alerting
|
||||||
|
* **Dashboards:** Ausbau des Connector-Dashboards (`/connector/dashboard`) um Fehler-Statistiken und Retry-Logik.
|
||||||
|
* **Alerting:** Benachrichtigung (z.B. Slack/Teams) bei wiederholten Sync-Fehlern.
|
||||||
|
|
||||||
|
### Task 2: Robust Address Parsing
|
||||||
|
* **Scraper:** Derzeit verlässt sich der Scraper auf das LLM für die Adress-Extraktion. Eine Validierung gegen Google Maps API oder PLZ-Verzeichnisse würde die Datenqualität ("Golden Record") massiv erhöhen.
|
||||||
|
|
||||||
|
### Task 3: "Person-First" Logic
|
||||||
|
* **Aktuell:** Trigger ist meist das Unternehmen.
|
||||||
|
* **Zukunft:** Wenn eine Person ohne Firma angelegt wird, sollte der CE proaktiv die Domain der E-Mail-Adresse nutzen, um das Unternehmen im Hintergrund zu suchen und anzulegen ("Reverse Lookup").
|
||||||
12
add_mapping.py
Normal file
12
add_mapping.py
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
import sqlite3
|
||||||
|
|
||||||
|
def add_mapping():
|
||||||
|
conn = sqlite3.connect('/app/companies_v3_fixed_2.db')
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute("INSERT INTO job_role_mappings (pattern, role, created_at) VALUES ('%geschäftsführung%', 'Wirtschaftlicher Entscheider', '2026-02-22T14:30:00')")
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
print("Added mapping for geschäftsführung")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
add_mapping()
|
||||||
14
check_mappings.py
Normal file
14
check_mappings.py
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
import sqlite3
|
||||||
|
|
||||||
|
def check_mappings():
|
||||||
|
conn = sqlite3.connect('/app/companies_v3_fixed_2.db')
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute("SELECT * FROM job_role_mappings")
|
||||||
|
rows = cursor.fetchall()
|
||||||
|
print("--- Job Role Mappings ---")
|
||||||
|
for row in rows:
|
||||||
|
print(row)
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
check_mappings()
|
||||||
@@ -243,41 +243,45 @@ def provision_superoffice_contact(
|
|||||||
|
|
||||||
# 1c. Update CRM Snapshot Data (The Double Truth)
|
# 1c. Update CRM Snapshot Data (The Double Truth)
|
||||||
changed = False
|
changed = False
|
||||||
if req.crm_name:
|
name_changed_significantly = False
|
||||||
company.crm_name = req.crm_name
|
|
||||||
changed = True
|
|
||||||
if req.crm_website:
|
|
||||||
company.crm_website = req.crm_website
|
|
||||||
changed = True
|
|
||||||
|
|
||||||
# NEW: Handle Vertical Override from SuperOffice
|
if req.crm_name and req.crm_name != company.crm_name:
|
||||||
if req.crm_industry_name:
|
logger.info(f"CRM Name Change detected for ID {company.crm_id}: {company.crm_name} -> {req.crm_name}")
|
||||||
# Check if valid industry
|
company.crm_name = req.crm_name
|
||||||
valid_industry = db.query(Industry).filter(Industry.name == req.crm_industry_name).first()
|
# If the name changes, we should potentially re-evaluate the whole company
|
||||||
if valid_industry:
|
# especially if the status was already ENRICHED
|
||||||
if company.industry_ai != req.crm_industry_name:
|
if company.status == "ENRICHED":
|
||||||
logger.info(f"Overriding Industry for {company.name}: {company.industry_ai} -> {req.crm_industry_name} (from CRM)")
|
name_changed_significantly = True
|
||||||
company.industry_ai = req.crm_industry_name
|
changed = True
|
||||||
# Trigger metric re-extraction? Maybe later. For now, just update.
|
|
||||||
changed = True
|
if req.crm_website:
|
||||||
else:
|
if company.crm_website != req.crm_website:
|
||||||
logger.warning(f"CRM provided industry '{req.crm_industry_name}' not found in DB. Ignoring.")
|
company.crm_website = req.crm_website
|
||||||
|
|
||||||
# Simple Mismatch Check
|
|
||||||
if company.website and company.crm_website:
|
|
||||||
def norm(u): return str(u).lower().replace("https://", "").replace("http://", "").replace("www.", "").strip("/")
|
|
||||||
if norm(company.website) != norm(company.crm_website):
|
|
||||||
company.data_mismatch_score = 0.8 # High mismatch
|
|
||||||
changed = True
|
changed = True
|
||||||
else:
|
|
||||||
if company.data_mismatch_score != 0.0:
|
# ...
|
||||||
company.data_mismatch_score = 0.0
|
|
||||||
changed = True
|
|
||||||
|
|
||||||
if changed:
|
if changed:
|
||||||
company.updated_at = datetime.utcnow()
|
company.updated_at = datetime.utcnow()
|
||||||
|
if name_changed_significantly:
|
||||||
|
logger.info(f"Triggering FRESH discovery for {company.name} due to CRM name change.")
|
||||||
|
company.status = "NEW"
|
||||||
|
# We don't change the internal 'name' yet, Discovery will do that or we keep it as anchor.
|
||||||
|
# But we must clear old results to avoid stale data.
|
||||||
|
company.industry_ai = None
|
||||||
|
company.ai_opener = None
|
||||||
|
company.ai_opener_secondary = None
|
||||||
|
background_tasks.add_task(run_discovery_task, company.id)
|
||||||
|
|
||||||
db.commit()
|
db.commit()
|
||||||
|
|
||||||
|
# If we just triggered a fresh discovery, tell the worker to wait.
|
||||||
|
if name_changed_significantly:
|
||||||
|
return ProvisioningResponse(
|
||||||
|
status="processing",
|
||||||
|
company_name=company.crm_name
|
||||||
|
)
|
||||||
|
|
||||||
# 2. Find Contact (Person)
|
# 2. Find Contact (Person)
|
||||||
if req.so_person_id is None:
|
if req.so_person_id is None:
|
||||||
# Just a company sync, but return all company-level metadata
|
# Just a company sync, but return all company-level metadata
|
||||||
@@ -316,13 +320,14 @@ def provision_superoffice_contact(
|
|||||||
mappings = db.query(JobRoleMapping).all()
|
mappings = db.query(JobRoleMapping).all()
|
||||||
found_role = None
|
found_role = None
|
||||||
for m in mappings:
|
for m in mappings:
|
||||||
# Check pattern type (Regex vs Simple) - simplified here
|
|
||||||
pattern_clean = m.pattern.replace("%", "").lower()
|
pattern_clean = m.pattern.replace("%", "").lower()
|
||||||
if pattern_clean in req.job_title.lower():
|
if pattern_clean in req.job_title.lower():
|
||||||
found_role = m.role
|
found_role = m.role
|
||||||
break
|
break
|
||||||
|
|
||||||
if found_role:
|
# ALWAYS update role, even if to None, to avoid 'sticking' old roles
|
||||||
|
if found_role != person.role:
|
||||||
|
logger.info(f"Role Change for {person.so_person_id}: {person.role} -> {found_role}")
|
||||||
person.role = found_role
|
person.role = found_role
|
||||||
|
|
||||||
db.commit()
|
db.commit()
|
||||||
|
|||||||
@@ -130,6 +130,22 @@ Die `config.py` wurde auf natives Python (`os.getenv`) umgestellt, um Konflikte
|
|||||||
Der Worker wurde erweitert, um auch `City` und `OrgNumber` (VAT) zurückzuschreiben.
|
Der Worker wurde erweitert, um auch `City` und `OrgNumber` (VAT) zurückzuschreiben.
|
||||||
**Status (21.02.2026):** Implementiert, aber noch im Feinschliff. Logs zeigen teils Re-Queueing während das Enrichment läuft.
|
**Status (21.02.2026):** Implementiert, aber noch im Feinschliff. Logs zeigen teils Re-Queueing während das Enrichment läuft.
|
||||||
|
|
||||||
|
### 8. Lessons Learned: Address & VAT Sync (Solved Feb 22, 2026)
|
||||||
|
|
||||||
|
Die Synchronisation von Adressdaten stellte sich als unerwartet komplex heraus. Hier die wichtigsten Erkenntnisse für zukünftige Entwickler:
|
||||||
|
|
||||||
|
1. **Das "OrgNumber"-Phantom:**
|
||||||
|
* **Problem:** In der API-Dokumentation oft als `OrgNumber` referenziert, akzeptiert die WebAPI (REST) strikt nur **`OrgNr`**.
|
||||||
|
* **Lösung:** Mapping im `worker.py` hart auf `OrgNr` umgestellt.
|
||||||
|
|
||||||
|
2. **Verschachtelte Adress-Struktur:**
|
||||||
|
* **Problem:** Ein flaches Update auf `PostalAddress` wird von der API stillschweigend ignoriert (HTTP 200, aber keine Änderung).
|
||||||
|
* **Lösung:** Das Update muss die tiefe Struktur respektieren: `Address["Postal"]["City"]` UND `Address["Street"]["City"]`. Beide müssen explizit gesetzt werden, um in der UI sichtbar zu sein.
|
||||||
|
|
||||||
|
3. **Die "Race Condition" Falle (Atomic Updates):**
|
||||||
|
* **Problem:** Wenn Adress-Daten (`PUT Contact`) und UDF-Daten (`PUT Contact/Udef`) in separaten API-Aufrufen kurz hintereinander gesendet werden, gewinnt der letzte Call. Da dieser oft auf einem *veralteten* `GET`-Stand basiert (bevor das erste Update durch war), wurde die Adresse wieder mit "Leer" überschrieben.
|
||||||
|
* **Lösung:** **Atomic Update Strategy**. Der Worker sammelt *alle* Änderungen (Adresse, VAT, Vertical, Openers) in einem einzigen Dictionary und sendet genau **einen** `PUT`-Request an den Kontakt-Endpunkt. Dies garantiert Konsistenz.
|
||||||
|
|
||||||
## Appendix: The "First Sentence" Prompt
|
## Appendix: The "First Sentence" Prompt
|
||||||
|
|
||||||
This is the core logic used to generate the company-specific opener.
|
This is the core logic used to generate the company-specific opener.
|
||||||
|
|||||||
18
connector-superoffice/debug_person_4.py
Normal file
18
connector-superoffice/debug_person_4.py
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
import os
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
|
||||||
|
# Setup Paths
|
||||||
|
connector_dir = "/app/connector-superoffice"
|
||||||
|
sys.path.append(connector_dir)
|
||||||
|
|
||||||
|
from superoffice_client import SuperOfficeClient
|
||||||
|
|
||||||
|
def debug_person(person_id):
|
||||||
|
so_client = SuperOfficeClient()
|
||||||
|
person = so_client.get_person(person_id)
|
||||||
|
print("--- FULL PERSON DATA ---")
|
||||||
|
print(json.dumps(person, indent=2))
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
debug_person(4)
|
||||||
@@ -157,17 +157,26 @@ class SuperOfficeClient:
|
|||||||
import datetime
|
import datetime
|
||||||
now = datetime.datetime.utcnow().isoformat() + "Z"
|
now = datetime.datetime.utcnow().isoformat() + "Z"
|
||||||
|
|
||||||
|
# SuperOffice UI limit: 42 chars.
|
||||||
|
# We put exactly this in the FIRST line of the description.
|
||||||
|
short_title = (subject[:39] + '...') if len(subject) > 42 else subject
|
||||||
|
|
||||||
|
# SuperOffice often 'steals' the first line of the description for the list view header.
|
||||||
|
# So we give it exactly the subject it wants, then two newlines for the real body.
|
||||||
|
formatted_description = f"{short_title}\n\n{description}"
|
||||||
|
|
||||||
payload = {
|
payload = {
|
||||||
"Description": f"{subject}\n\n{description}",
|
"Description": formatted_description,
|
||||||
"Contact": {"ContactId": contact_id},
|
"Contact": {"ContactId": contact_id},
|
||||||
"StartDate": now,
|
"StartDate": now,
|
||||||
"EndDate": now,
|
"EndDate": now,
|
||||||
"Task": {"Id": 1} # Usually 'Follow-up' or similar, depending on SO config
|
"MainHeader": short_title,
|
||||||
|
"Task": {"Id": 1}
|
||||||
}
|
}
|
||||||
if person_id:
|
if person_id:
|
||||||
payload["Person"] = {"PersonId": person_id}
|
payload["Person"] = {"PersonId": person_id}
|
||||||
|
|
||||||
print(f"Creating new appointment: {subject}...")
|
logger.info(f"Creating new appointment: {short_title}...")
|
||||||
return self._post("Appointment", payload)
|
return self._post("Appointment", payload)
|
||||||
|
|
||||||
def update_entity_udfs(self, entity_id: int, entity_type: str, udf_data: dict):
|
def update_entity_udfs(self, entity_id: int, entity_type: str, udf_data: dict):
|
||||||
|
|||||||
@@ -77,6 +77,17 @@ class MockSuperOfficeClient:
|
|||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
def create_appointment(self, subject, description, contact_id, person_id=None):
|
||||||
|
if not hasattr(self, 'appointments'):
|
||||||
|
self.appointments = []
|
||||||
|
self.appointments.append({
|
||||||
|
"Subject": subject,
|
||||||
|
"Description": description,
|
||||||
|
"ContactId": contact_id,
|
||||||
|
"PersonId": person_id
|
||||||
|
})
|
||||||
|
return True
|
||||||
|
|
||||||
def search(self, query):
|
def search(self, query):
|
||||||
if "contact/contactId eq" in query:
|
if "contact/contactId eq" in query:
|
||||||
contact_id = int(query.split("eq")[1].strip())
|
contact_id = int(query.split("eq")[1].strip())
|
||||||
@@ -240,6 +251,13 @@ class TestE2EFlow(unittest.TestCase):
|
|||||||
udfs = self.mock_so_client.persons[500]["UserDefinedFields"]
|
udfs = self.mock_so_client.persons[500]["UserDefinedFields"]
|
||||||
self.assertEqual(udfs["SuperOffice:Subject"], "TEST SUBJECT LOGISTICS")
|
self.assertEqual(udfs["SuperOffice:Subject"], "TEST SUBJECT LOGISTICS")
|
||||||
|
|
||||||
|
# Verify Appointment (Simulation)
|
||||||
|
self.assertTrue(len(self.mock_so_client.appointments) > 0)
|
||||||
|
appt = self.mock_so_client.appointments[0]
|
||||||
|
self.assertIn("✉️ Entwurf: TEST SUBJECT LOGISTICS", appt["Subject"])
|
||||||
|
self.assertIn("TEST BRIDGE LOGISTICS", appt["Description"])
|
||||||
|
print(f"[TEST] Appointment created: {appt['Subject']}")
|
||||||
|
|
||||||
# --- Step 3: Vertical Change in SO (To Healthcare) ---
|
# --- Step 3: Vertical Change in SO (To Healthcare) ---
|
||||||
print("[TEST] Step 3: Change Vertical in SO...")
|
print("[TEST] Step 3: Change Vertical in SO...")
|
||||||
|
|
||||||
|
|||||||
@@ -25,46 +25,10 @@ def process_job(job, so_client: SuperOfficeClient):
|
|||||||
payload = job['payload']
|
payload = job['payload']
|
||||||
event_low = job['event_type'].lower()
|
event_low = job['event_type'].lower()
|
||||||
|
|
||||||
# 0. Fast-Fail on Irrelevant Events (Noise Reduction)
|
# 1. Extract IDs Early (Crucial for logging and logic)
|
||||||
if any(x in event_low for x in ["sale.", "project.", "appointment.", "document.", "selection."]):
|
|
||||||
logger.info(f"Skipping irrelevant event type: {job['event_type']}")
|
|
||||||
return "SUCCESS"
|
|
||||||
|
|
||||||
# 0b. Fast-Fail on Irrelevant Field Changes
|
|
||||||
# Only if 'Changes' list is provided by Webhook
|
|
||||||
changes = [c.lower() for c in payload.get("Changes", [])]
|
|
||||||
if changes:
|
|
||||||
# Define what we care about (Strategic triggers for re-evaluation)
|
|
||||||
# Company: Name/Department (Identity), Urls (Source), Numbers (Matching)
|
|
||||||
relevant_contact = ["name", "department", "urladdress", "number1", "number2"]
|
|
||||||
# Add Vertical UDF to relevant changes
|
|
||||||
if settings.UDF_VERTICAL:
|
|
||||||
relevant_contact.append(settings.UDF_VERTICAL.lower())
|
|
||||||
# Also catch generic "userdefinedfields" if specific key not present
|
|
||||||
relevant_contact.append("userdefinedfields")
|
|
||||||
|
|
||||||
# Person: JobTitle (Persona Logic), Position (Role Logic)
|
|
||||||
relevant_person = ["jobtitle", "position"]
|
|
||||||
|
|
||||||
is_relevant = False
|
|
||||||
|
|
||||||
if "contact" in event_low:
|
|
||||||
if any(f in changes for f in relevant_contact):
|
|
||||||
is_relevant = True
|
|
||||||
elif "urls" in changes: # Website might be in Urls collection
|
|
||||||
is_relevant = True
|
|
||||||
|
|
||||||
if "person" in event_low:
|
|
||||||
if any(f in changes for f in relevant_person):
|
|
||||||
is_relevant = True
|
|
||||||
|
|
||||||
if not is_relevant:
|
|
||||||
logger.info(f"Skipping irrelevant changes: {changes}")
|
|
||||||
return "SUCCESS"
|
|
||||||
|
|
||||||
# 1. Extract IDs from Webhook Payload
|
|
||||||
person_id = None
|
person_id = None
|
||||||
contact_id = None
|
contact_id = None
|
||||||
|
job_title = payload.get("JobTitle")
|
||||||
|
|
||||||
if "PersonId" in payload:
|
if "PersonId" in payload:
|
||||||
person_id = int(payload["PersonId"])
|
person_id = int(payload["PersonId"])
|
||||||
@@ -76,19 +40,59 @@ def process_job(job, so_client: SuperOfficeClient):
|
|||||||
elif "PrimaryKey" in payload and "contact" in event_low:
|
elif "PrimaryKey" in payload and "contact" in event_low:
|
||||||
contact_id = int(payload["PrimaryKey"])
|
contact_id = int(payload["PrimaryKey"])
|
||||||
|
|
||||||
# Fallback/Deep Lookup
|
# Fallback/Deep Lookup & Fetch JobTitle if missing
|
||||||
if not contact_id and person_id:
|
if person_id:
|
||||||
person_data = so_client.get_person(person_id)
|
try:
|
||||||
if person_data and "Contact" in person_data:
|
person_details = so_client.get_person(person_id)
|
||||||
contact_id = person_data["Contact"].get("ContactId")
|
if person_details:
|
||||||
|
if not job_title:
|
||||||
|
job_title = person_details.get("JobTitle") or person_details.get("Title")
|
||||||
|
if not contact_id and "Contact" in person_details:
|
||||||
|
contact_id = person_details["Contact"].get("ContactId")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Failed to fetch person details for {person_id}: {e}")
|
||||||
|
|
||||||
|
# 2. Noise Reduction Logic
|
||||||
|
if any(x in event_low for x in ["sale.", "project.", "appointment.", "document.", "selection."]):
|
||||||
|
logger.info(f"Skipping irrelevant event type: {job['event_type']}")
|
||||||
|
return "SUCCESS"
|
||||||
|
|
||||||
|
changes = [c.lower() for c in payload.get("Changes", [])]
|
||||||
|
if changes:
|
||||||
|
relevant_contact = ["name", "department", "urladdress", "number1", "number2", "userdefinedfields"]
|
||||||
|
if settings.UDF_VERTICAL:
|
||||||
|
relevant_contact.append(settings.UDF_VERTICAL.lower())
|
||||||
|
|
||||||
|
relevant_person = ["jobtitle", "position", "title", "userdefinedfields", "person_id"]
|
||||||
|
technical_fields = ["updated", "updated_associate_id", "contact_id", "person_id", "registered", "registered_associate_id"]
|
||||||
|
actual_changes = [c for c in changes if c not in technical_fields]
|
||||||
|
|
||||||
|
is_relevant = False
|
||||||
|
|
||||||
|
if "contact" in event_low:
|
||||||
|
logger.info(f"Checking relevance for Contact {contact_id or 'Unknown'}. Changes: {actual_changes}")
|
||||||
|
if any(f in actual_changes for f in relevant_contact):
|
||||||
|
is_relevant = True
|
||||||
|
elif "urls" in actual_changes:
|
||||||
|
is_relevant = True
|
||||||
|
|
||||||
|
if "person" in event_low:
|
||||||
|
logger.info(f"Checking relevance for Person {person_id or 'Unknown'}. Changes: {actual_changes}")
|
||||||
|
if any(f in actual_changes for f in relevant_person):
|
||||||
|
is_relevant = True
|
||||||
|
|
||||||
|
if not is_relevant:
|
||||||
|
logger.info(f"Skipping technical/irrelevant changes: {changes}")
|
||||||
|
return "SUCCESS"
|
||||||
|
else:
|
||||||
|
logger.info("Change is deemed RELEVANT. Proceeding...")
|
||||||
|
|
||||||
if not contact_id:
|
if not contact_id:
|
||||||
raise ValueError(f"Could not identify ContactId in payload: {payload}")
|
raise ValueError(f"Could not identify ContactId in payload: {payload}")
|
||||||
|
|
||||||
logger.info(f"Target: Person {person_id}, Contact {contact_id}")
|
logger.info(f"Target Identified -> Person: {person_id}, Contact: {contact_id}, JobTitle: {job_title}")
|
||||||
|
|
||||||
# --- Cascading Logic ---
|
# --- Cascading Logic ---
|
||||||
# If a company changes, we want to update all its persons eventually.
|
|
||||||
if "contact" in event_low and not person_id:
|
if "contact" in event_low and not person_id:
|
||||||
logger.info(f"Company event detected. Triggering cascade for all persons of Contact {contact_id}.")
|
logger.info(f"Company event detected. Triggering cascade for all persons of Contact {contact_id}.")
|
||||||
try:
|
try:
|
||||||
@@ -117,18 +121,15 @@ def process_job(job, so_client: SuperOfficeClient):
|
|||||||
if not crm_website and "Urls" in contact_details and contact_details["Urls"]:
|
if not crm_website and "Urls" in contact_details and contact_details["Urls"]:
|
||||||
crm_website = contact_details["Urls"][0].get("Value")
|
crm_website = contact_details["Urls"][0].get("Value")
|
||||||
|
|
||||||
# Extract Vertical (if set in SO)
|
|
||||||
if settings.UDF_VERTICAL:
|
if settings.UDF_VERTICAL:
|
||||||
udfs = contact_details.get("UserDefinedFields", {})
|
udfs = contact_details.get("UserDefinedFields", {})
|
||||||
so_vertical_val = udfs.get(settings.UDF_VERTICAL)
|
so_vertical_val = udfs.get(settings.UDF_VERTICAL)
|
||||||
|
|
||||||
if so_vertical_val:
|
if so_vertical_val:
|
||||||
# Normalize "[I:23]" -> "23"
|
|
||||||
val_str = str(so_vertical_val)
|
val_str = str(so_vertical_val)
|
||||||
if val_str.startswith("[I:"):
|
if val_str.startswith("[I:"):
|
||||||
val_str = val_str.split(":")[1].strip("]")
|
val_str = val_str.split(":")[1].strip("]")
|
||||||
|
|
||||||
# Reverse Map ID -> Name
|
|
||||||
try:
|
try:
|
||||||
vertical_map = json.loads(settings.VERTICAL_MAP_JSON)
|
vertical_map = json.loads(settings.VERTICAL_MAP_JSON)
|
||||||
vertical_map_rev = {str(v): k for k, v in vertical_map.items()}
|
vertical_map_rev = {str(v): k for k, v in vertical_map.items()}
|
||||||
@@ -141,15 +142,12 @@ def process_job(job, so_client: SuperOfficeClient):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Failed to fetch contact details for {contact_id}: {e}")
|
logger.warning(f"Failed to fetch contact details for {contact_id}: {e}")
|
||||||
|
|
||||||
# --- 2. PREPARE UPDATES (Atomic Strategy) ---
|
# --- 3. Company Explorer Provisioning ---
|
||||||
# We will fetch the contact ONCE, calculate all needed changes (Standard + UDFs),
|
|
||||||
# and push them in a single operation if possible to avoid race conditions.
|
|
||||||
|
|
||||||
ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
|
ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
|
||||||
ce_req = {
|
ce_req = {
|
||||||
"so_contact_id": contact_id,
|
"so_contact_id": contact_id,
|
||||||
"so_person_id": person_id,
|
"so_person_id": person_id,
|
||||||
"job_title": payload.get("JobTitle"),
|
"job_title": job_title,
|
||||||
"crm_name": crm_name,
|
"crm_name": crm_name,
|
||||||
"crm_website": crm_website,
|
"crm_website": crm_website,
|
||||||
"crm_industry_name": crm_industry_name
|
"crm_industry_name": crm_industry_name
|
||||||
@@ -188,9 +186,7 @@ def process_job(job, so_client: SuperOfficeClient):
|
|||||||
dirty_standard = False
|
dirty_standard = False
|
||||||
dirty_udfs = False
|
dirty_udfs = False
|
||||||
|
|
||||||
# Ensure nested dicts exist
|
|
||||||
if "UserDefinedFields" not in contact_data: contact_data["UserDefinedFields"] = {}
|
if "UserDefinedFields" not in contact_data: contact_data["UserDefinedFields"] = {}
|
||||||
if "PostalAddress" not in contact_data or contact_data["PostalAddress"] is None: contact_data["PostalAddress"] = {}
|
|
||||||
|
|
||||||
# --- A. Vertical Sync ---
|
# --- A. Vertical Sync ---
|
||||||
vertical_name = provisioning_data.get("vertical_name")
|
vertical_name = provisioning_data.get("vertical_name")
|
||||||
@@ -201,7 +197,6 @@ def process_job(job, so_client: SuperOfficeClient):
|
|||||||
if vertical_id:
|
if vertical_id:
|
||||||
udf_key = settings.UDF_VERTICAL
|
udf_key = settings.UDF_VERTICAL
|
||||||
current_val = contact_data["UserDefinedFields"].get(udf_key, "")
|
current_val = contact_data["UserDefinedFields"].get(udf_key, "")
|
||||||
# Normalize "[I:26]" -> "26"
|
|
||||||
if current_val and str(current_val).startswith("[I:"):
|
if current_val and str(current_val).startswith("[I:"):
|
||||||
current_val = str(current_val).split(":")[1].strip("]")
|
current_val = str(current_val).split(":")[1].strip("]")
|
||||||
|
|
||||||
@@ -218,9 +213,7 @@ def process_job(job, so_client: SuperOfficeClient):
|
|||||||
ce_zip = provisioning_data.get("address_zip")
|
ce_zip = provisioning_data.get("address_zip")
|
||||||
ce_vat = provisioning_data.get("vat_id")
|
ce_vat = provisioning_data.get("vat_id")
|
||||||
|
|
||||||
# Check if ANY address component is present
|
|
||||||
if ce_city or ce_street or ce_zip:
|
if ce_city or ce_street or ce_zip:
|
||||||
# Initialize Address object if missing
|
|
||||||
if "Address" not in contact_data or contact_data["Address"] is None:
|
if "Address" not in contact_data or contact_data["Address"] is None:
|
||||||
contact_data["Address"] = {"Street": {}, "Postal": {}}
|
contact_data["Address"] = {"Street": {}, "Postal": {}}
|
||||||
|
|
||||||
@@ -228,11 +221,9 @@ def process_job(job, so_client: SuperOfficeClient):
|
|||||||
if "Postal" not in addr_obj or addr_obj["Postal"] is None: addr_obj["Postal"] = {}
|
if "Postal" not in addr_obj or addr_obj["Postal"] is None: addr_obj["Postal"] = {}
|
||||||
if "Street" not in addr_obj or addr_obj["Street"] is None: addr_obj["Street"] = {}
|
if "Street" not in addr_obj or addr_obj["Street"] is None: addr_obj["Street"] = {}
|
||||||
|
|
||||||
# Update Helper
|
|
||||||
def update_addr_field(field_name, new_val, log_name):
|
def update_addr_field(field_name, new_val, log_name):
|
||||||
nonlocal dirty_standard
|
nonlocal dirty_standard
|
||||||
if new_val:
|
if new_val:
|
||||||
# Sync to both Postal and Street for best visibility
|
|
||||||
for type_key in ["Postal", "Street"]:
|
for type_key in ["Postal", "Street"]:
|
||||||
cur = addr_obj[type_key].get(field_name, "")
|
cur = addr_obj[type_key].get(field_name, "")
|
||||||
if cur != new_val:
|
if cur != new_val:
|
||||||
@@ -245,7 +236,6 @@ def process_job(job, so_client: SuperOfficeClient):
|
|||||||
update_addr_field("Zipcode", ce_zip, "Zip")
|
update_addr_field("Zipcode", ce_zip, "Zip")
|
||||||
|
|
||||||
if ce_vat:
|
if ce_vat:
|
||||||
# Field is 'OrgNr' in WebAPI, not 'OrgNumber'
|
|
||||||
current_vat = contact_data.get("OrgNr", "")
|
current_vat = contact_data.get("OrgNr", "")
|
||||||
if current_vat != ce_vat:
|
if current_vat != ce_vat:
|
||||||
logger.info(f"Change detected: VAT '{current_vat}' -> '{ce_vat}'")
|
logger.info(f"Change detected: VAT '{current_vat}' -> '{ce_vat}'")
|
||||||
@@ -272,87 +262,34 @@ def process_job(job, so_client: SuperOfficeClient):
|
|||||||
|
|
||||||
# --- D. Apply Updates (Single Transaction) ---
|
# --- D. Apply Updates (Single Transaction) ---
|
||||||
if dirty_standard or dirty_udfs:
|
if dirty_standard or dirty_udfs:
|
||||||
logger.info(f"Pushing combined updates for Contact {contact_id} (Standard={dirty_standard}, UDFs={dirty_udfs})...")
|
logger.info(f"Pushing combined updates for Contact {contact_id}...")
|
||||||
try:
|
try:
|
||||||
# We PUT the whole modified contact object back
|
|
||||||
# This handles both standard fields and UDFs in one atomic-ish go
|
|
||||||
so_client._put(f"Contact/{contact_id}", contact_data)
|
so_client._put(f"Contact/{contact_id}", contact_data)
|
||||||
logger.info("✅ Contact Update Successful.")
|
logger.info("✅ Contact Update Successful.")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to update Contact {contact_id}: {e}")
|
logger.error(f"Failed to update Contact {contact_id}: {e}")
|
||||||
raise
|
raise
|
||||||
else:
|
|
||||||
logger.info("No changes needed for Contact.")
|
|
||||||
|
|
||||||
# 2d. Sync Person Position (Role) - if Person exists
|
|
||||||
# TEMPORARILY DISABLED TO PREVENT LOOP (SO API Read-after-Write latency or field mapping issue)
|
|
||||||
# Re-enable via config if needed
|
|
||||||
if settings.ENABLE_WEBSITE_SYNC:
|
|
||||||
website = provisioning_data.get("website")
|
|
||||||
if website and website != "k.A.":
|
|
||||||
try:
|
|
||||||
contact_data = so_client.get_contact(contact_id)
|
|
||||||
current_url = contact_data.get("UrlAddress", "")
|
|
||||||
|
|
||||||
def norm(u): return str(u).lower().replace("https://", "").replace("http://", "").strip("/") if u else ""
|
|
||||||
|
|
||||||
if norm(current_url) != norm(website):
|
|
||||||
logger.info(f"Updating Website for Contact {contact_id}: {current_url} -> {website}")
|
|
||||||
|
|
||||||
# Update Urls collection (Rank 1)
|
|
||||||
new_urls = []
|
|
||||||
if "Urls" in contact_data:
|
|
||||||
found = False
|
|
||||||
for u in contact_data["Urls"]:
|
|
||||||
if u.get("Rank") == 1:
|
|
||||||
u["Value"] = website
|
|
||||||
found = True
|
|
||||||
new_urls.append(u)
|
|
||||||
if not found:
|
|
||||||
new_urls.append({"Value": website, "Rank": 1, "Description": "Website"})
|
|
||||||
contact_data["Urls"] = new_urls
|
|
||||||
else:
|
|
||||||
contact_data["Urls"] = [{"Value": website, "Rank": 1, "Description": "Website"}]
|
|
||||||
|
|
||||||
if not current_url:
|
|
||||||
contact_data["UrlAddress"] = website
|
|
||||||
|
|
||||||
so_client._put(f"Contact/{contact_id}", contact_data)
|
|
||||||
else:
|
|
||||||
logger.info(f"Website for Contact {contact_id} already in sync.")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to sync website for Contact {contact_id}: {e}")
|
|
||||||
|
|
||||||
# 2d. Sync Person Position (Role) - if Person exists
|
# 2d. Sync Person Position (Role) - if Person exists
|
||||||
role_name = provisioning_data.get("role_name")
|
role_name = provisioning_data.get("role_name")
|
||||||
if person_id and role_name:
|
if person_id and role_name:
|
||||||
try:
|
try:
|
||||||
persona_map = json.loads(settings.PERSONA_MAP_JSON)
|
persona_map = json.loads(settings.PERSONA_MAP_JSON)
|
||||||
except:
|
position_id = persona_map.get(role_name)
|
||||||
persona_map = {}
|
if position_id:
|
||||||
logger.error("Failed to parse PERSONA_MAP_JSON from config.")
|
logger.info(f"Identified Role '{role_name}' -> Position ID {position_id}")
|
||||||
|
so_client.update_person_position(person_id, int(position_id))
|
||||||
position_id = persona_map.get(role_name)
|
except Exception as e:
|
||||||
|
logger.error(f"Error syncing position for Person {person_id}: {e}")
|
||||||
if position_id:
|
|
||||||
logger.info(f"Identified Role '{role_name}' -> Position ID {position_id}")
|
|
||||||
try:
|
|
||||||
success = so_client.update_person_position(person_id, int(position_id))
|
|
||||||
if not success:
|
|
||||||
logger.warning(f"Failed to update position for Person {person_id}")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error syncing position for Person {person_id}: {e}")
|
|
||||||
else:
|
|
||||||
logger.info(f"Role '{role_name}' has no mapped Position ID in config. Skipping update.")
|
|
||||||
|
|
||||||
# 3. Update SuperOffice Texts (Only if person_id is present)
|
# 3. Update SuperOffice Texts (Only if person_id is present)
|
||||||
if not person_id:
|
if not person_id:
|
||||||
logger.info("Sync complete (Company only). No texts to write back.")
|
logger.info("Sync complete (Company only).")
|
||||||
return "SUCCESS"
|
return "SUCCESS"
|
||||||
|
|
||||||
texts = provisioning_data.get("texts", {})
|
texts = provisioning_data.get("texts", {})
|
||||||
if not any(texts.values()):
|
if not any(texts.values()):
|
||||||
logger.info("No texts returned from Matrix (yet). Skipping write-back.")
|
logger.info("No texts returned from Matrix yet.")
|
||||||
return "SUCCESS"
|
return "SUCCESS"
|
||||||
|
|
||||||
udf_update = {}
|
udf_update = {}
|
||||||
@@ -361,7 +298,6 @@ def process_job(job, so_client: SuperOfficeClient):
|
|||||||
if texts.get("social_proof"): udf_update[settings.UDF_SOCIAL_PROOF] = texts["social_proof"]
|
if texts.get("social_proof"): udf_update[settings.UDF_SOCIAL_PROOF] = texts["social_proof"]
|
||||||
|
|
||||||
if udf_update:
|
if udf_update:
|
||||||
# Loop Prevention
|
|
||||||
try:
|
try:
|
||||||
current_person = so_client.get_person(person_id)
|
current_person = so_client.get_person(person_id)
|
||||||
current_udfs = current_person.get("UserDefinedFields", {})
|
current_udfs = current_person.get("UserDefinedFields", {})
|
||||||
@@ -371,16 +307,61 @@ def process_job(job, so_client: SuperOfficeClient):
|
|||||||
needs_update = True
|
needs_update = True
|
||||||
break
|
break
|
||||||
|
|
||||||
if needs_update:
|
# Simulation Trigger: Either texts changed, OR it's a direct manual trigger
|
||||||
logger.info(f"Applying update to Person {person_id} (Changes detected).")
|
if needs_update or (person_id and not "Source" in payload):
|
||||||
success = so_client.update_entity_udfs(person_id, "Person", udf_update)
|
if needs_update:
|
||||||
if not success:
|
logger.info(f"Applying text update to Person {person_id}.")
|
||||||
raise Exception("Failed to update SuperOffice UDFs")
|
so_client.update_entity_udfs(person_id, "Person", udf_update)
|
||||||
|
else:
|
||||||
|
logger.info(f"Texts already in sync for Person {person_id}, but triggering simulation.")
|
||||||
|
|
||||||
|
# --- 4. Create Email Simulation Appointment ---
|
||||||
|
try:
|
||||||
|
opener = provisioning_data.get("opener", "")
|
||||||
|
intro = texts.get("intro", "")
|
||||||
|
proof = texts.get("social_proof", "")
|
||||||
|
subject = texts.get("subject", "No Subject")
|
||||||
|
|
||||||
|
salutation = "Hallo"
|
||||||
|
p_data = so_client.get_person(person_id)
|
||||||
|
if p_data:
|
||||||
|
fname = p_data.get("Firstname", "")
|
||||||
|
lname = p_data.get("Lastname", "")
|
||||||
|
if fname or lname:
|
||||||
|
salutation = f"Hallo {fname} {lname}".strip()
|
||||||
|
|
||||||
|
cta = (
|
||||||
|
"H\u00e4tten Sie am kommenden Mittwoch gegen 11 Uhr kurz Zeit, f\u00fcr einen kurzen Austausch hierzu?\n"
|
||||||
|
"Gerne k\u00f6nnen Sie auch einen alternativen Termin in meinem Kalender buchen. (bookings Link)"
|
||||||
|
)
|
||||||
|
|
||||||
|
email_body = (
|
||||||
|
f"{salutation},\n\n"
|
||||||
|
f"{opener.strip()}\n\n"
|
||||||
|
f"{intro.strip()}\n\n"
|
||||||
|
f"{cta.strip()}\n\n"
|
||||||
|
f"{proof.strip()}\n\n"
|
||||||
|
"(Generated via Gemini Marketing Engine)"
|
||||||
|
)
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
now_str = datetime.now().strftime("%H:%M")
|
||||||
|
appt_title = f"[{now_str}] KI: {subject}"
|
||||||
|
|
||||||
|
so_client.create_appointment(
|
||||||
|
subject=appt_title,
|
||||||
|
description=email_body,
|
||||||
|
contact_id=contact_id,
|
||||||
|
person_id=person_id
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to create email simulation appointment: {e}")
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logger.info(f"Skipping update for Person {person_id}: Values match (Loop Prevention).")
|
logger.info(f"Skipping update for Person {person_id}: Values match.")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error during pre-update check: {e}")
|
logger.error(f"Error during Person update: {e}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
logger.info("Job successfully processed.")
|
logger.info("Job successfully processed.")
|
||||||
@@ -388,20 +369,16 @@ def process_job(job, so_client: SuperOfficeClient):
|
|||||||
|
|
||||||
def run_worker():
|
def run_worker():
|
||||||
queue = JobQueue()
|
queue = JobQueue()
|
||||||
|
|
||||||
# Initialize SO Client with retry
|
|
||||||
so_client = None
|
so_client = None
|
||||||
while not so_client:
|
while not so_client:
|
||||||
try:
|
try:
|
||||||
so_client = SuperOfficeClient()
|
so_client = SuperOfficeClient()
|
||||||
if not so_client.access_token: # Check if auth worked
|
if not so_client.access_token: raise Exception("Auth failed")
|
||||||
raise Exception("Auth failed")
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"Failed to initialize SuperOffice Client: {e}. Retrying in 30s...")
|
logger.critical(f"Failed to initialize SuperOffice Client: {e}. Retrying in 30s...")
|
||||||
time.sleep(30)
|
time.sleep(30)
|
||||||
|
|
||||||
logger.info("Worker started. Polling queue...")
|
logger.info("Worker started. Polling queue...")
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
job = queue.get_next_job()
|
job = queue.get_next_job()
|
||||||
@@ -417,10 +394,9 @@ def run_worker():
|
|||||||
queue.fail_job(job['id'], str(e))
|
queue.fail_job(job['id'], str(e))
|
||||||
else:
|
else:
|
||||||
time.sleep(POLL_INTERVAL)
|
time.sleep(POLL_INTERVAL)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Worker loop error: {e}")
|
logger.error(f"Worker loop error: {e}")
|
||||||
time.sleep(POLL_INTERVAL)
|
time.sleep(POLL_INTERVAL)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
run_worker()
|
run_worker()
|
||||||
|
|||||||
23
fix_mappings_v2.py
Normal file
23
fix_mappings_v2.py
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
import sqlite3
|
||||||
|
|
||||||
|
def fix_mappings():
|
||||||
|
conn = sqlite3.connect('/app/companies_v3_fixed_2.db')
|
||||||
|
cursor = conn.cursor()
|
||||||
|
|
||||||
|
# Neue Mappings für Geschäftsleitung und Verallgemeinerung
|
||||||
|
new_rules = [
|
||||||
|
('%leitung%', 'Wirtschaftlicher Entscheider'),
|
||||||
|
('%vorstand%', 'Wirtschaftlicher Entscheider'),
|
||||||
|
('%geschäftsleitung%', 'Wirtschaftlicher Entscheider'),
|
||||||
|
('%management%', 'Wirtschaftlicher Entscheider')
|
||||||
|
]
|
||||||
|
|
||||||
|
for pattern, role in new_rules:
|
||||||
|
cursor.execute("INSERT OR REPLACE INTO job_role_mappings (pattern, role, created_at) VALUES (?, ?, '2026-02-22T15:30:00')", (pattern, role))
|
||||||
|
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
print("Mappings updated for Geschäftsleitung, Vorstand, Management.")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
fix_mappings()
|
||||||
Reference in New Issue
Block a user