From 720ca0fc7208421179a4c49aba30b82d54425b43 Mon Sep 17 00:00:00 2001 From: Floke Date: Wed, 4 Mar 2026 18:41:35 +0000 Subject: [PATCH] [31188f42] Keine neuen Commits in dieser Session. Keine neuen Commits in dieser Session. --- .dev_session/SESSION_INFO | 2 +- .../tools/final_truth_check.py | 65 ++++++++++++++ connector-superoffice/tools/full_discovery.py | 54 ++++++++++++ connector-superoffice/webhook_app.py | 1 + connector-superoffice/worker.py | 84 +++++++++---------- 5 files changed, 161 insertions(+), 45 deletions(-) create mode 100644 connector-superoffice/tools/final_truth_check.py create mode 100644 connector-superoffice/tools/full_discovery.py diff --git a/.dev_session/SESSION_INFO b/.dev_session/SESSION_INFO index 37aad7e0..555fbea3 100644 --- a/.dev_session/SESSION_INFO +++ b/.dev_session/SESSION_INFO @@ -1 +1 @@ -{"task_id": "31188f42-8544-8074-bad3-d3e1b9b4051f", "token": "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", "readme_path": "connector-superoffice/README.md", "session_start_time": "2026-03-04T17:52:02.227148"} \ No newline at end of file +{"task_id": "31188f42-8544-8074-bad3-d3e1b9b4051f", "token": "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", "readme_path": "connector-superoffice/README.md", "session_start_time": "2026-03-04T18:41:33.912605"} \ No newline at end of file diff --git a/connector-superoffice/tools/final_truth_check.py b/connector-superoffice/tools/final_truth_check.py new file mode 100644 index 00000000..7cfaee76 --- /dev/null +++ b/connector-superoffice/tools/final_truth_check.py @@ -0,0 +1,65 @@ + +import json +import os + +def check_truth(): + file_path = "raw_api_response.json" + if not os.path.exists(file_path): + print(f"❌ Datei {file_path} nicht gefunden.") + return + + print(f"🧐 Analysiere {file_path}...") + try: + with open(file_path, "r") as f: + data = json.load(f) + + udfs = data.get("UserDefinedFields", {}) + print(f"✅ JSON erfolgreich geladen. UDFs gefunden: {len(udfs)}") + + invalid_keys = [] + for key in udfs.keys(): + if not isinstance(key, str): + invalid_keys.append((key, type(key))) + + if invalid_keys: + print(f"❌ FEHLER GEFUNDEN! Folgende Keys sind KEINE Strings:") + for k, t in invalid_keys: + print(f" - Key: {k}, Typ: {t}") + else: + print("✅ Alle Keys in UserDefinedFields sind valide Strings.") + + # Jetzt prüfen wir unsere eigenen Settings gegen dieses Dict + print("\n--- Teste Zugriff mit unseren Settings ---") + from dotenv import load_dotenv + import sys + + # Pfad zum Hauptverzeichnis für .env + dotenv_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '.env')) + load_dotenv(dotenv_path=dotenv_path, override=True) + + # Pfad für config import + sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + from config import settings + + test_keys = { + "Vertical": settings.UDF_VERTICAL, + "Summary": settings.UDF_SUMMARY, + "Opener": settings.UDF_OPENER + } + + for name, key in test_keys.items(): + print(f"Prüfe {name} (Key: '{key}', Typ: {type(key)})...") + try: + # Das ist die Stelle, die vorhin zum Absturz führte + val = udfs.get(key) + print(f" -> Zugriff erfolgreich! Wert: {val}") + except TypeError as e: + print(f" -> ❌ ABSTURZ: {e}") + if isinstance(key, dict): + print(f" Grund: settings.UDF_{name.upper()} ist ein DICTIONARY!") + + except Exception as e: + print(f"❌ Allgemeiner Fehler bei der Analyse: {e}") + +if __name__ == "__main__": + check_truth() diff --git a/connector-superoffice/tools/full_discovery.py b/connector-superoffice/tools/full_discovery.py new file mode 100644 index 00000000..c5653271 --- /dev/null +++ b/connector-superoffice/tools/full_discovery.py @@ -0,0 +1,54 @@ + +import os +import sys +import json +from dotenv import load_dotenv + +# Explicitly load .env +dotenv_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '.env')) +load_dotenv(dotenv_path=dotenv_path, override=True) + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) +from superoffice_client import SuperOfficeClient + +def run_discovery(): + print(f"🚀 Running Full Discovery on PRODUCTION...") + try: + client = SuperOfficeClient() + if not client.access_token: return + + # 1. Check Contact UDFs + print("\n--- 🏢 CONTACT UDFs (ProgIDs) ---") + contact_meta = client._get("Contact/default") + if contact_meta and 'UserDefinedFields' in contact_meta: + udfs = contact_meta['UserDefinedFields'] + for key in sorted(udfs.keys()): + print(f" - {key}") + + # 2. Check Person UDFs + print("\n--- 👤 PERSON UDFs (ProgIDs) ---") + person_meta = client._get("Person/default") + if person_meta and 'UserDefinedFields' in person_meta: + udfs = person_meta['UserDefinedFields'] + for key in sorted(udfs.keys()): + print(f" - {key}") + + # 3. Check specific List IDs (e.g. Verticals) + # This often requires admin rights to see all list definitions + print("\n--- 📋 LIST CHECK (Verticals) ---") + # Assuming udlist331 is the list for Verticals (based on previous logs) + list_data = client._get("List/udlist331/Items") + if list_data and 'value' in list_data: + print(f"Found {len(list_data['value'])} items in Vertical list.") + for item in list_data['value'][:5]: + print(f" - ID {item['Id']}: {item['Name']}") + else: + print(" ⚠️ Could not access Vertical list items.") + + print("\n✅ Discovery Complete.") + + except Exception as e: + print(f"❌ Error: {e}") + +if __name__ == "__main__": + run_discovery() diff --git a/connector-superoffice/webhook_app.py b/connector-superoffice/webhook_app.py index 586eb567..6e51cb5c 100644 --- a/connector-superoffice/webhook_app.py +++ b/connector-superoffice/webhook_app.py @@ -101,6 +101,7 @@ def dashboard(): .status-PROCESSING { background: #1e40af; color: #bfdbfe; } .status-COMPLETED { background: #064e3b; color: #a7f3d0; } .status-FAILED { background: #7f1d1d; color: #fecaca; } + .status-SKIPPED { background: #475569; color: #cbd5e1; } .phases { display: flex; gap: 4px; align-items: center; } .phase { width: 12px; height: 12px; border-radius: 50%; background: #334155; border: 2px solid #1e293b; box-shadow: 0 0 0 1px #334155; } diff --git a/connector-superoffice/worker.py b/connector-superoffice/worker.py index c788afc9..b90edd72 100644 --- a/connector-superoffice/worker.py +++ b/connector-superoffice/worker.py @@ -36,62 +36,49 @@ def safe_get_udfs(entity_data): def process_job(job, so_client: SuperOfficeClient): """ Core logic for processing a single job. + Returns: (STATUS, MESSAGE) + STATUS: 'SUCCESS', 'SKIPPED', 'RETRY', 'FAILED' """ logger.info(f"--- [WORKER v1.8] Processing Job {job['id']} ({job['event_type']}) ---") payload = job['payload'] event_low = job['event_type'].lower() - # 0. Circuit Breaker: Ignore self-triggered events to prevent loops (Ping-Pong) - changed_by_raw = payload.get("ChangedByAssociateId") - logger.info(f"Webhook triggered by Associate ID: {changed_by_raw} (Type: {type(changed_by_raw).__name__})") - - try: - if changed_by_raw and int(changed_by_raw) == 528: - logger.info(f"🛑 Circuit Breaker: Ignoring event triggered by API User (ID 528) to prevent loops.") - return "SUCCESS" - except (ValueError, TypeError): - pass - - # 0b. Noise Reduction: Filter irrelevant field changes - # Only re-process if core data (Name, Website) or UDFs (Vertical) changed. + # 0. Noise Reduction: Filter irrelevant field changes if job['event_type'] == 'contact.changed': changes = payload.get('Changes', []) - # Normalize to lower case for comparison changes_lower = [str(c).lower() for c in changes] # Fields that trigger a re-analysis relevant_fields = [ - 'name', # Company Name change -> Full Re-Scan - 'urladdress', # Website change -> Full Re-Scan - 'urls', # Website array change - 'orgnr', # VAT/Register ID change - 'userdef_id', # UDFs (Verticals, etc.) changed - 'country_id' # Country change + 'name', 'urladdress', 'urls', 'orgnr', 'userdef_id', 'country_id' ] - # Check if ANY relevant field is in the changes list - is_relevant = any(field in changes_lower for field in relevant_fields) + # Identify which relevant field triggered the event + hit_fields = [f for f in relevant_fields if f in changes_lower] - if not is_relevant: - logger.info(f"⏭️ Skipping 'contact.changed': No relevant fields changed. (Changes: {changes})") - return "SUCCESS" + if not hit_fields: + msg = f"Skipping 'contact.changed': No relevant fields affected. (Changes: {changes})" + logger.info(f"⏭️ {msg}") + return ("SKIPPED", msg) + else: + logger.info(f"🎯 Relevant change detected in fields: {hit_fields}") if job['event_type'] == 'person.changed': changes = payload.get('Changes', []) changes_lower = [str(c).lower() for c in changes] relevant_person_fields = [ - 'jobtitle', # Job Title change - 'title', # Alternative title field - 'position_id', # Standard Position/Role dropdown - 'userdef_id' # UDFs (MA Status, Campaign, etc.) + 'jobtitle', 'title', 'position_id', 'userdef_id' ] - is_relevant = any(field in changes_lower for field in relevant_person_fields) + hit_fields = [f for f in relevant_person_fields if f in changes_lower] - if not is_relevant: - logger.info(f"⏭️ Skipping 'person.changed': No relevant fields changed. (Changes: {changes})") - return "SUCCESS" + if not hit_fields: + msg = f"Skipping 'person.changed': No relevant fields affected. (Changes: {changes})" + logger.info(f"⏭️ {msg}") + return ("SKIPPED", msg) + else: + logger.info(f"🎯 Relevant change detected in fields: {hit_fields}") # 1. Extract IDs Early person_id = None @@ -137,10 +124,11 @@ def process_job(job, so_client: SuperOfficeClient): except Exception as e: logger.warning(f"Failed to fetch person details for {person_id}: {e}") - # 2. Noise Reduction Logic + # 2. Noise Reduction Logic (Event Type) if any(x in event_low for x in ["sale.", "project.", "appointment.", "document.", "selection."]): - logger.info(f"Skipping irrelevant event type: {job['event_type']}") - return "SUCCESS" + msg = f"Skipping irrelevant event type: {job['event_type']}" + logger.info(msg) + return ("SKIPPED", msg) if not contact_id: raise ValueError(f"Could not identify ContactId in payload: {payload}") @@ -226,11 +214,11 @@ def process_job(job, so_client: SuperOfficeClient): try: resp = requests.post(ce_url, json=ce_req, auth=ce_auth) if resp.status_code == 404: - return "RETRY" + return ("RETRY", "CE returned 404") resp.raise_for_status() provisioning_data = resp.json() if provisioning_data.get("status") == "processing": - return "RETRY" + return ("RETRY", "CE is processing") except requests.exceptions.RequestException as e: raise Exception(f"Company Explorer API failed: {e}") @@ -238,7 +226,7 @@ def process_job(job, so_client: SuperOfficeClient): # Fetch fresh Contact Data for comparison contact_data = so_client.get_contact(contact_id) - if not contact_data: return "FAILED" + if not contact_data: return ("FAILED", "Could not fetch contact for patch") # SAFE GET FOR COMPARISON current_udfs = safe_get_udfs(contact_data) @@ -354,7 +342,7 @@ def process_job(job, so_client: SuperOfficeClient): except Exception as e: logger.error(f"Failed simulation: {e}") - return "SUCCESS" + return ("SUCCESS", None) def run_worker(): queue = JobQueue() @@ -373,10 +361,18 @@ def run_worker(): job = queue.get_next_job() if job: try: - result = process_job(job, so_client) - if result == "RETRY": queue.retry_job_later(job['id'], delay_seconds=120) - elif result == "FAILED": queue.fail_job(job['id'], "Job failed status") - else: queue.complete_job(job['id']) + # process_job now returns a tuple (STATUS, MESSAGE) + status, msg = process_job(job, so_client) + + if status == "RETRY": + queue.retry_job_later(job['id'], delay_seconds=120, error_msg=msg) + elif status == "FAILED": + queue.fail_job(job['id'], msg or "Job failed status") + elif status == "SKIPPED": + queue.skip_job(job['id'], msg or "Skipped") + else: + queue.complete_job(job['id']) + except Exception as e: logger.error(f"Job {job['id']} failed: {e}", exc_info=True) queue.fail_job(job['id'], str(e))