5 Commits

Author SHA1 Message Date
f35a702216 [31e88f42] Achieved: The stability of the SuperOffice connector was significantly improved to prevent infinite loops during job processing and to optimize the dashboard display. The processing of relevant changes was made more precise.
Implementation details:
 * Stable job processing (poison pill): A poison-pill mechanism was introduced in queue_manager.py. Jobs are now automatically marked as FAILED after a maximum of 5 failed attempts.
 * Robust SuperOffice API client authentication: Error handling in superoffice_client.py was hardened. Authentication failures and other critical API problems now raise specific exceptions.
 * Fix for the worker startup error: An ImportError (ContactNotFoundException) in the worker was resolved.
 * Precise trigger logic: A re-assessment of accounts is now only triggered by changes to the fields name, urladdress, urls, or the Vertical UDF (SuperOffice:83).
 * Correct date display in the dashboard: The dashboard formatting logic was adjusted to display updated_at timestamps.
2026-03-09 12:38:08 +00:00
93318bc38d Docs: Updated documentation for task [31e88f42] 2026-03-09 12:38:08 +00:00
e1a3fc563b [31988f42] Docs: Added ToDos for MS Bookings tracking via Graph API (pragmatic approach) and CRM sync to SuperOffice 2026-03-09 10:51:42 +00:00
92c491a548 [31988f42] Feat: Added lunch break (12:00-12:30) and 2026 Bavarian holidays to slot finding logic 2026-03-09 10:45:57 +00:00
ca4967fc0a [31988f42] Feat: Added fallback MS Bookings / WordPress link directly into the email body as an alternative booking option 2026-03-09 10:42:56 +00:00
9 changed files with 137 additions and 48 deletions

View File

@@ -1 +1 @@
{"task_id": "31e88f42-8544-8024-ad7c-da1733e94f9a", "token": "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", "readme_path": "connector-superoffice/README.md", "session_start_time": "2026-03-09T08:46:32.104282"} {"task_id": "31e88f42-8544-8024-ad7c-da1733e94f9a", "token": "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", "readme_path": "connector-superoffice/README.md", "session_start_time": "2026-03-09T12:38:07.040119"}

View File

@@ -38,10 +38,18 @@ This directory contains Python scripts designed to integrate with the SuperOffic
 * Before extensive processing, the worker now checks `connector_queue.db` via `queue_manager.check_for_recent_duplicate()`.
 * If a `contact.created` event for the same company name is already `PROCESSING` or has been `COMPLETED` within the last 5 minutes, the current job is immediately `SKIPPED` as a duplicate.
+
+#### 💊 E. Job Stability (Poison Pill)
+* **Problem:** API errors or transient issues could cause a job to enter an infinite retry loop, blocking the entire queue indefinitely.
+* **Solution:** A **"poison pill" mechanism** was added to the queue manager.
+* A `retry_count` is now tracked for every job.
+* If a job fails and is retried, this counter is incremented.
+* If the counter exceeds a maximum threshold (currently 5), the job is automatically moved to the `FAILED` state and will not be retried again. This ensures that problematic jobs are cleared and do not block new incoming events.
 ### 3. Advanced API Handling (Critical Fixes)
 * **OData Pagination:** We implemented `odata.nextLink` support (Manuel Zierl's advice) to correctly handle large result sets (>1000 records).
 * **Case Sensitivity:** We learned that SuperOffice API keys are case-sensitive in responses (e.g., `contactId` vs `ContactId`) depending on the endpoint. Our code now handles both.
+* **Robust Authentication:** The API client was hardened to raise exceptions on authentication failures instead of returning `None`. This ensures that auth errors are properly caught and jobs are failed or retried correctly.
 * **Auth Consistency:** Always use `load_dotenv(override=True)` to prevent stale environment variables (like expired tokens) from lingering in the shell process.
 * **Identity Crisis:** The `Associate/Me` and `Associate/{id}` endpoints return 500 errors if the API user lacks a linked Person record. This is a known configuration blocker for automated mailing.

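The poison-pill cap described in the README section above can be sketched in a few lines. This is a minimal illustration of the state transition only; the real logic lives in `queue_manager.retry_job_later()`, and the function name `decide_retry` is illustrative, not part of the connector's API.

```python
# Minimal sketch of the poison-pill retry cap: after MAX_RETRIES failed
# attempts a job is marked FAILED permanently instead of being re-queued.
MAX_RETRIES = 5

def decide_retry(current_retry_count, max_retries=MAX_RETRIES):
    """Return the job's next status and its updated retry counter."""
    if current_retry_count >= max_retries:
        return ("FAILED", current_retry_count)   # poisoned: never retried again
    return ("PENDING", current_retry_count + 1)  # re-queued for one more attempt

# A persistently failing job walks through the states like this:
history = []
status, count = "PENDING", 0
while status == "PENDING":
    status, count = decide_retry(count)
    history.append(status)
print(history)  # → ['PENDING', 'PENDING', 'PENDING', 'PENDING', 'PENDING', 'FAILED']
```

The key property is that the loop always terminates: a job gets at most five re-queues before it is cleared out of the way of new events.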
View File

@@ -0,0 +1,31 @@
+import sqlite3
+import sys
+
+DB_PATH = "/data/connector_queue.db"
+
+def kill_pending_jobs():
+    """Sets the status of stuck jobs to FAILED."""
+    try:
+        conn = sqlite3.connect(DB_PATH)
+        cursor = conn.cursor()
+        query = """
+            UPDATE jobs
+            SET status = 'FAILED', error_msg = 'Manually failed by admin to clear queue.'
+            WHERE status = 'PROCESSING' OR status = 'PENDING'
+        """
+        cursor.execute(query)
+        changes = conn.total_changes
+        conn.commit()
+        conn.close()
+        print(f"OK: Successfully marked {changes} jobs as FAILED.")
+        return 0
+    except sqlite3.Error as e:
+        print(f"ERROR: Could not update jobs. Reason: {e}")
+        return 1
+
+if __name__ == "__main__":
+    sys.exit(kill_pending_jobs())

View File

@@ -30,7 +30,8 @@ class JobQueue:
                 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                 error_msg TEXT,
-                next_try_at TIMESTAMP
+                next_try_at TIMESTAMP,
+                retry_count INTEGER DEFAULT 0
             )
         """)
         # Migration for existing DBs
@@ -42,6 +43,9 @@ class JobQueue:
         try: conn.execute("ALTER TABLE jobs ADD COLUMN associate_name TEXT")
         except sqlite3.OperationalError: pass
+        try: conn.execute("ALTER TABLE jobs ADD COLUMN retry_count INTEGER DEFAULT 0")
+        except sqlite3.OperationalError: pass
         conn.commit()
         logger.info("Database initialized with PRAGMA settings (DELETE, NORMAL, mmap=0).")
     except Exception as e:
@@ -87,7 +91,7 @@ class JobQueue:
             cursor = conn.cursor()
             try:
                 cursor.execute("""
-                    SELECT id, event_type, payload, created_at
+                    SELECT id, event_type, payload, created_at, retry_count
                     FROM jobs
                     WHERE status = 'PENDING'
                     AND (next_try_at IS NULL OR next_try_at <= datetime('now'))
@@ -118,22 +122,25 @@
         return job

-    def retry_job_later(self, job_id, delay_seconds=60, error_msg=None):
+    def retry_job_later(self, job_id, current_retry_count, delay_seconds=60, error_msg=None):
+        MAX_RETRIES = 5
+        if current_retry_count >= MAX_RETRIES:
+            fail_msg = f"Job reached maximum retry limit of {MAX_RETRIES}. Last error: {error_msg}"
+            logger.error(f"Job {job_id} FAILED permanently: {fail_msg}")
+            self.fail_job(job_id, fail_msg)
+            return
         next_try = datetime.utcnow() + timedelta(seconds=delay_seconds)
+        new_retry_count = current_retry_count + 1
         with sqlite3.connect(DB_PATH, timeout=30) as conn:
             try:
-                if error_msg:
-                    conn.execute(
-                        "UPDATE jobs SET status = 'PENDING', next_try_at = ?, updated_at = datetime('now'), error_msg = ? WHERE id = ?",
-                        (next_try, str(error_msg), job_id)
-                    )
-                else:
-                    conn.execute(
-                        "UPDATE jobs SET status = 'PENDING', next_try_at = ?, updated_at = datetime('now') WHERE id = ?",
-                        (next_try, job_id)
-                    )
+                conn.execute(
+                    "UPDATE jobs SET status = 'PENDING', next_try_at = ?, updated_at = datetime('now'), error_msg = ?, retry_count = ? WHERE id = ?",
+                    (next_try, str(error_msg), new_retry_count, job_id)
+                )
                 conn.commit()
-                logger.warning(f"Job {job_id} set to RETRY. Next attempt at {next_try}.")
+                logger.warning(f"Job {job_id} set to RETRY (Attempt {new_retry_count}/{MAX_RETRIES}). Next attempt at {next_try}.")
             except Exception as e:
                 logger.error(f"❌ Failed to set job {job_id} to RETRY: {e}", exc_info=True)
                 conn.rollback()
@@ -271,7 +278,6 @@ class JobQueue:
             if entity_id in id_to_runs:
                 for run in id_to_runs[entity_id]:
                     run_latest_time = datetime.strptime(run['updated_at'], "%Y-%m-%d %H:%M:%S")
-                    # If this job is within 15 mins of the run's activity
                     if abs((run_latest_time - job_time).total_seconds()) < 900:
                         target_run = run
                         break
@@ -316,7 +322,12 @@ class JobQueue:
                     end = datetime.strptime(target_run["updated_at"], "%Y-%m-%d %H:%M:%S")
                     diff = end - start
                     seconds = int(diff.total_seconds())
-                    target_run["duration"] = f"{seconds}s" if seconds < 60 else f"{seconds // 60}m {seconds % 60}s"
+                    # Display a minimal duration for skipped runs to avoid confusion
+                    if target_run["status"] == "SKIPPED":
+                        target_run["duration"] = "~1s"
+                    else:
+                        target_run["duration"] = f"{seconds}s" if seconds < 60 else f"{seconds // 60}m {seconds % 60}s"
                 except: pass
         # Resolve Name & Associate (if not already set from a newer job in this cluster)

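The diff above relies on an idempotent migration pattern: on every startup, `ALTER TABLE ... ADD COLUMN` is attempted, and the `OperationalError` SQLite raises for an already-existing column is swallowed. A self-contained sketch of that pattern, using an in-memory database instead of `connector_queue.db`:

```python
import sqlite3

def ensure_retry_count_column(conn):
    """Add the retry_count column if it is missing; silently skip otherwise."""
    try:
        conn.execute("ALTER TABLE jobs ADD COLUMN retry_count INTEGER DEFAULT 0")
    except sqlite3.OperationalError:
        pass  # "duplicate column name" - the column already exists

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, status TEXT)")
ensure_retry_count_column(conn)
ensure_retry_count_column(conn)  # second call is a no-op, not an error
cols = [row[1] for row in conn.execute("PRAGMA table_info(jobs)")]
print(cols)  # → ['id', 'status', 'retry_count']
```

This is why existing databases pick up the new column automatically without a separate migration script.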
View File

@@ -13,6 +13,10 @@ class ContactNotFoundException(Exception):
     """Custom exception for 404 errors on Contact/Person lookups."""
     pass

+class SuperOfficeAuthenticationError(Exception):
+    """Custom exception for auth/token refresh failures."""
+    pass
+
 class SuperOfficeClient:
     """A client for interacting with the SuperOffice REST API."""
@@ -86,7 +90,7 @@ class SuperOfficeClient:
         """Helper to handle 401 Unauthorized with auto-refresh."""
         if not self.access_token:
             if not self._refresh_access_token():
-                return None
+                raise SuperOfficeAuthenticationError("FATAL: Could not get initial access token.")

         url = f"{self.base_url}/{endpoint}"
         try:
@@ -113,7 +117,7 @@
                     return self._request_with_retry(method, endpoint, payload, retry=False)
                 else:
                     logger.error("❌ Token Refresh failed during retry.")
-                    return None
+                    raise SuperOfficeAuthenticationError("Could not refresh token during retry.")

             if resp.status_code == 204:
                 return True
@@ -128,10 +132,10 @@
                 raise ContactNotFoundException(f"Entity not found at {endpoint}") from e
             logger.error(f"❌ API {method} Error for {endpoint} (Status: {e.response.status_code}): {e.response.text}")
-            return None
+            raise  # Re-raise the original HTTPError
         except Exception as e:
             logger.error(f"❌ Connection Error during {method} for {endpoint}: {e}")
-            return None
+            raise  # Re-raise other exceptions

     def _get(self, endpoint):
         return self._request_with_retry("GET", endpoint)

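Raising instead of returning `None` changes the caller's contract: the worker can now distinguish a recoverable auth failure (re-queue with backoff) from a silent empty result. A sketch of the intended consumption pattern, with the client stubbed out — `fetch_contact` and `broken_get` are illustrative names, not part of the actual worker:

```python
class SuperOfficeAuthenticationError(Exception):
    """Auth/token refresh failure (mirrors the exception added above)."""

def fetch_contact(client_get, contact_id):
    """Classify a lookup so the queue can decide between OK and RETRY."""
    try:
        return ("OK", client_get(f"Contact/{contact_id}"))
    except SuperOfficeAuthenticationError as e:
        # Token problems may recover on their own -> re-queue with backoff
        # instead of treating the job as successfully processed.
        return ("RETRY", str(e))

def broken_get(endpoint):
    """Stub simulating a client whose token refresh keeps failing."""
    raise SuperOfficeAuthenticationError("Could not refresh token during retry.")

status, msg = fetch_contact(broken_get, 42)
print(status)  # → RETRY
```

With the old `return None` behavior, the same failure would have looked identical to "contact has no data" and the job would never have been retried.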
View File

@@ -233,7 +233,7 @@ def dashboard():
                     <td>${phasesHtml}</td>
                     <td><span class="meta">${acc.duration || '0s'}</span></td>
                     <td><span class="status status-${acc.status}">${acc.status}</span></td>
-                    <td>${new Date(acc.updated_at + "Z").toLocaleTimeString()}</td>
+                    <td>${new Date(acc.updated_at + "Z").toLocaleString('de-DE', { day: '2-digit', month: '2-digit', hour: '2-digit', minute: '2-digit' })}</td>
                     <td><pre>${acc.error_msg || 'No issues'}</pre></td>
                 `;
                 tbody.appendChild(tr);
@@ -256,7 +256,7 @@ def dashboard():
                 tr.innerHTML = `
                     <td>#${job.id}</td>
                     <td><span class="status status-${job.status}">${job.status}</span></td>
-                    <td>${new Date(job.updated_at + "Z").toLocaleTimeString()}</td>
+                    <td>${new Date(job.updated_at + "Z").toLocaleString('de-DE', { day: '2-digit', month: '2-digit', hour: '2-digit', minute: '2-digit' })}</td>
                     <td>${job.event_type}</td>
                     <td><pre>${details}</pre></td>
                 `;

View File

@@ -5,7 +5,7 @@ import requests
 import json
 from datetime import datetime
 from queue_manager import JobQueue
-from superoffice_client import SuperOfficeClient, ContactNotFoundException
+from superoffice_client import SuperOfficeClient, ContactNotFoundException, SuperOfficeAuthenticationError
 from config import settings

 # Setup Logging
@@ -75,7 +75,7 @@ def run_worker():
             status, msg = process_job(job, so_client, queue)
             if status == "RETRY":
-                queue.retry_job_later(job['id'], delay_seconds=120, error_msg=msg)
+                queue.retry_job_later(job['id'], job['retry_count'], delay_seconds=120, error_msg=msg)
             elif status == "FAILED":
                 queue.fail_job(job['id'], msg or "Job failed status")
             elif status == "SKIPPED":
@@ -104,30 +104,26 @@ def process_job(job, so_client: SuperOfficeClient, queue: JobQueue):
     payload = job['payload']
     event_low = job['event_type'].lower()

-    # --- 1. HARD ECHO SHIELD (Who triggered this?) ---
-    changed_by = payload.get("ChangedByAssociateId")
-    self_id = job.get('self_associate_id')
-    if changed_by and self_id and int(changed_by) == int(self_id):
-        msg = f"🛡️ ECHO DETECTED: Event triggered by myself (ID {self_id}). Stopping immediately."
-        logger.info(msg)
-        return ("SKIPPED", msg)
-
-    # --- 2. NOISE REDUCTION: FIELD FILTER (What changed?) ---
+    # --- Strict Whitelist Filter ---
+    # A job is only processed if the webhook indicates a change to a relevant field.
+    # This is the primary defense against noise and echoes.
     changes = [c.lower() for c in payload.get("Changes", [])]
-    if "person" in event_low:
-        # We allow contact_id changes (linking to company) and basic identity changes
-        if "name" not in changes and "email" not in changes and "jobtitle" not in changes and "contact_id" not in changes:
-            msg = f"Skipping person event: No relevant changes (Name/Email/JobTitle/Mapping) in {changes}."
-            logger.info(f"⏭️ {msg}")
-            return ("SKIPPED", msg)
-    elif "contact" in event_low:
-        if "name" not in changes and "urladdress" not in changes:
-            msg = f"Skipping contact event: No relevant changes (Name/Website) in {changes}."
-            logger.info(f"⏭️ {msg}")
-            return ("SKIPPED", msg)
+
+    # Define fields that are significant enough to trigger a full re-assessment.
+    trigger_fields = ["name", "urladdress", "urls"]
+
+    # The ProgID for the Vertical UDF. A change to this field MUST trigger a resync.
+    vertical_udf_key = settings.UDF_VERTICAL.lower() if settings.UDF_VERTICAL else "superoffice:83"
+
+    # Check if any of the trigger fields or the specific vertical UDF is in the changes list.
+    has_trigger_field = any(f in changes for f in trigger_fields) or vertical_udf_key in changes
+
+    # For '...created' events, we always process. For all other events, we check the whitelist.
+    if "created" not in event_low and not has_trigger_field:
+        msg = f"⏭️ Skipping event '{event_low}': No relevant field changes in {changes}."
+        logger.info(msg)
+        return ("SKIPPED", msg)

     # 0. ID Extraction & Early Exit for irrelevant jobs
     person_id = None

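The whitelist filter in the worker diff above can be sketched as a standalone predicate. The trigger fields and the `superoffice:83` fallback are taken from the diff; the function name `should_process` is illustrative, and the `settings.UDF_VERTICAL` lookup is replaced by a plain constant:

```python
# Sketch of the strict whitelist filter: 'created' events always run;
# update events run only when a whitelisted field (or the Vertical UDF)
# appears in the webhook's Changes list.
TRIGGER_FIELDS = ["name", "urladdress", "urls"]
VERTICAL_UDF_KEY = "superoffice:83"  # default when settings.UDF_VERTICAL is unset

def should_process(event_type, changed_fields):
    event_low = event_type.lower()
    changes = [c.lower() for c in changed_fields]
    if "created" in event_low:
        return True
    return any(f in changes for f in TRIGGER_FIELDS) or VERTICAL_UDF_KEY in changes

print(should_process("contact.created", []))              # → True
print(should_process("contact.changed", ["department"]))  # → False
print(should_process("contact.changed", ["UrlAddress"]))  # → True  (case-insensitive)
```

Note that this single predicate replaces both the old echo shield and the per-entity field filters: any echo event that does not touch a whitelisted field is skipped by the same check.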
View File

@@ -91,6 +91,17 @@ docker-compose up -d --build --force-recreate lead-engine
 * **Problem:** If a lead neither replies to the email nor books an appointment, the contact is lost.
 * **Solution:** Add a follow-up mechanism after 5 days. This could be implemented either via a flag in the CRM system or via a scheduled email sent directly from the lead engine.
+
+### Task: Success Measurement & Tracking (Microsoft Bookings Evaluation)
+* **Goal:** Get an overview of how many meetings were actually booked through this new channel (Trading Twins / email).
+* **Approach (pragmatic):**
+    1. The email address of the shared Bookings page (`KennenlernenmitRoboplanet@wackler-group.de`) must be added by IT to the existing **Exchange ApplicationAccessPolicy** of the `CAL_APPID`.
+    2. This allows our existing "read" app to also read the appointments of this central mailbox via the MS Graph API (`GET /users/{bookings-email}/calendar/events`).
+    3. A scheduled job (e.g. daily or weekly) counts the newly added appointments and produces a short KPI report.
+
+### Task: CRM Synchronization of Booked Appointments (SuperOffice)
+* **Goal:** The appointments generated via Bookings must end up in the CRM system (SuperOffice) for history and documentation.
+* **Solution:** The appointments read in the previous task are handed over to the CRM via the `connector-superoffice` and stored there as an "appointment" or "activity" directly on the matching contact (matched via email address).
+
 ```env
 # Info mailbox (App 1 - write)
 INFO_Application_ID=...

View File

@@ -102,8 +102,25 @@ def find_slots(start, view, interval):
     """
     Parses availability string: '0'=Free, '2'=Busy.
     Returns 2 free slots (start times) within business hours (09:00 - 16:30),
-    excluding weekends (Sat/Sun), with approx. 3 hours distance between them.
+    excluding weekends (Sat/Sun), lunch breaks, and holidays.
     """
+    # Hardcoded Bavarian holidays for 2026 (as observed in Munich)
+    HOLIDAYS_2026 = {
+        (1, 1),    # Neujahr
+        (1, 6),    # Heilige Drei Könige
+        (4, 3),    # Karfreitag
+        (4, 6),    # Ostermontag
+        (5, 1),    # Maifeiertag
+        (5, 14),   # Christi Himmelfahrt
+        (5, 25),   # Pfingstmontag
+        (6, 4),    # Fronleichnam
+        (8, 15),   # Mariä Himmelfahrt
+        (10, 3),   # Tag der Deutschen Einheit
+        (11, 1),   # Allerheiligen
+        (12, 25),  # 1. Weihnachtstag
+        (12, 26),  # 2. Weihnachtstag
+    }

     slots = []
     first_slot = None
@@ -116,7 +133,15 @@
             # 1. Mon-Fri only
             # 2. Business hours (09:00 - 16:30)
             # 3. Future only
-            if slot_time.weekday() < 5 and (9 <= slot_time.hour < 17) and slot_time > datetime.now(TZ_BERLIN):
+            # 4. No holidays (Bavaria)
+            # 5. Lunch break (12:00 - 12:30)
+            is_holiday = (slot_time.month, slot_time.day) in HOLIDAYS_2026
+            is_weekend = slot_time.weekday() >= 5
+            is_lunch = slot_time.hour == 12 and slot_time.minute < 30
+            is_business_hours = 9 <= slot_time.hour < 17
+
+            if not is_weekend and not is_holiday and not is_lunch and is_business_hours and slot_time > datetime.now(TZ_BERLIN):
                 # Max start time 16:30
                 if slot_time.hour == 16 and slot_time.minute > 30:
                     continue
@@ -411,6 +436,9 @@ def process_lead(request_id, company, opener, receiver, name):
             booking_html += f'<li><a href="{link}">{format_date_for_email(s)}</a></li>'
         booking_html += "</ul>"
+
+        fallback_url = WORDPRESS_BOOKING_URL if WORDPRESS_BOOKING_URL else MS_BOOKINGS_URL
+        booking_html += f'<br><p>Alternativ buchen Sie direkt einen passenden Termin in <a href="{fallback_url}">meinem Kalender</a>.</p>'

         try:
             with open(SIGNATURE_FILE_PATH, 'r') as f:
                 sig = f.read()
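The slot-filtering rules added above (weekend, Bavarian holiday, lunch break, business hours, latest start 16:30) can be isolated into a single predicate for clarity. This sketch deliberately omits the "future only" check so it stays deterministic; the holiday set is the 2026 list from the diff, and `is_bookable` is an illustrative name, not a function in the lead engine:

```python
from datetime import datetime

HOLIDAYS_2026 = {(1, 1), (1, 6), (4, 3), (4, 6), (5, 1), (5, 14), (5, 25),
                 (6, 4), (8, 15), (10, 3), (11, 1), (12, 25), (12, 26)}

def is_bookable(slot_time):
    """Apply the static slot rules: weekday, holiday, lunch, business hours."""
    if slot_time.weekday() >= 5:                            # weekend (Sat/Sun)
        return False
    if (slot_time.month, slot_time.day) in HOLIDAYS_2026:   # Bavarian holiday
        return False
    if slot_time.hour == 12 and slot_time.minute < 30:      # lunch 12:00-12:30
        return False
    if not (9 <= slot_time.hour < 17):                      # business hours
        return False
    if slot_time.hour == 16 and slot_time.minute > 30:      # latest start 16:30
        return False
    return True

print(is_bookable(datetime(2026, 3, 10, 10, 0)))   # Tuesday 10:00 → True
print(is_bookable(datetime(2026, 3, 10, 12, 15)))  # lunch break → False
print(is_bookable(datetime(2026, 5, 14, 10, 0)))   # Christi Himmelfahrt → False
```

One caveat worth noting about the hardcoded set: it matches on `(month, day)` only, so it will misfire for movable feasts (Karfreitag, Ostermontag, Himmelfahrt, Pfingstmontag, Fronleichnam) in any year other than 2026 and will need to be refreshed annually.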