9 Commits

Author SHA1 Message Date
96fecd8a46 docs: add deep-dive analysis of notion verticals and check script [2ff88f42] 2026-02-23 07:09:28 +00:00
874853b6bc [2f988f42] Zusammenfassung einfügen
Zusammenfassung einfügen
2026-02-22 19:36:42 +00:00
4d9cdc9e6c General Maintenance: Add project overview table with sub-README links [2f988f42] 2026-02-22 19:36:03 +00:00
acd2062d0b [30f88f42] einfügen
einfügen
2026-02-22 19:22:39 +00:00
0d294cd0cc [30e88f42] Einfügen
Einfügen
2026-02-22 14:31:00 +00:00
11d2bc03bf [30e88f42] ✦ In dieser Sitzung haben wir den End-to-End-Test der SuperOffice-Schnittstelle erfolgreich von der automatisierten Simulation bis zum produktiven Live-Lauf
✦ In dieser Sitzung haben wir den End-to-End-Test der SuperOffice-Schnittstelle erfolgreich von der automatisierten Simulation bis zum produktiven Live-Lauf
  mit Echtdaten abgeschlossen.
2026-02-22 08:20:28 +00:00
32332c092d [30e88f42] 1. Erreichte Meilensteine
1. Erreichte Meilensteine
   * Stabile Authentifizierung: Kritische Fehler beim Token-Refresh behoben. Der Client fällt nun automatisch auf sod zurück, falls
     die Umgebungsvariable leer ist.
   * Pydantic V2 Kompatibilität: config.py auf natives Python umgestellt, um ModuleNotFoundError in Docker-Containern zu verhindern.
   * Automatisierter E2E-Test: Neuer Standalone-Test connector-superoffice/tests/test_e2e_flow.py verifiziert die gesamte Kette
     (Account-Anlage -> Anreicherung -> Person-Texte -> Vertical-Wechsel).
   * Bidirektionaler Vertical-Sync: Der Worker erkennt jetzt manuelle Branchen-Änderungen in SuperOffice, synchronisiert sie zum
     Company Explorer und triggert automatisch neue Texte für alle Personen (Kaskade).
   * Daten-Persistenz: Personen werden jetzt beim ersten Webhook im Company Explorer gespeichert, damit Updates (wie Branchenwechsel)
     auch ohne erneute Übermittlung des Jobtitels funktionieren.
   * Content-Generierung: Die Marketing-Matrix wurde live für die Branchen "Healthcare - Hospital" und "Leisure - Indoor Active"
     befüllt.
2026-02-21 21:27:12 +00:00
e47aa383c7 Enhance: Address/VAT Sync & Infrastructure Hardening [30e88f42]
- Implemented Address (City) and VAT (OrgNumber) sync back to SuperOffice.
- Hardened Infrastructure: Removed Pydantic dependency in config for better Docker compatibility.
- Improved SuperOffice Client error logging and handled empty SO_ENVIRONMENT variables.
- Updated Matrix Generator: Switched to gemini-2.0-flash, added industry filtering, and robust JSON parsing.
- Updated Documentation with session findings and troubleshooting steps.
2026-02-21 21:26:57 +00:00
4d48b2689b Feat: End-to-End Test & Bidirectional Vertical Sync [30e88f42]
- Implemented comprehensive E2E test  covering full roundtrip and manual overrides.
- Enhanced  to detect manual Vertical changes in SuperOffice and sync them to Company Explorer.
- Updated  to handle industry overrides from CRM and auto-persist Person/Contact data for robust cascade updates.
2026-02-21 20:05:25 +00:00
31 changed files with 2054 additions and 273 deletions

View File

@@ -1 +1 @@
{"task_id": "2f988f42-8544-8100-9dba-e69ee2376730", "token": "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", "session_start_time": "2026-02-21T10:32:38.618482"}
{"task_id": "2f988f42-8544-800e-abc1-d1b1c56ade4d", "token": "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", "session_start_time": "2026-02-22T19:36:18.401647"}

239
ANALYSIS_AND_PROPOSAL.md Normal file
View File

@@ -0,0 +1,239 @@
# Deep-Dive Analyse: Pains & Gains vs. Product Reality
**Status:** In Bearbeitung
**Basis:** Transkript (`transkript_verticals1.txt`) + Notion Ist-Stand + Logik-Check "Product Fit"
---
## 1. Automotive - Dealer (Autohäuser)
* **Primary Product:** Security Roboter
* **Secondary Product:** Cleaning Outdoor Roboter (Sweeper)
### Analyse (Chain of Thought)
* **Ist-Zustand Fehler:** "Doppel-Nutzen: Tagsüber Reinigung, nachts Bestreifung" bei Gains. Das ist technisch falsch. Ein Wachroboter hat keine Besen, eine Kehrmaschine keine Überwachungskameras (in der Regel).
* **Transcript Check:** "Teile-Diebstahl (Katalysatoren/Räder)", "Vandalismus", "Imageverlust (Laub/Dreck)". Christian sagt: "Sicherheit ist wichtiger als sauberer Hof".
* **Logik:**
* *Security:* Löst Diebstahl/Vandalismus. Ersetzt/Ergänzt Wachdienst.
* *Sweeper:* Löst "dreckigen Hof" (Imageproblem bei Premium-Autos).
### PROPOSAL
**Pains:**
[Primary Product: Security]
- Teile-Diebstahl: Organisierte Banden demontieren nachts Katalysatoren und Räder enormer Schaden und Versicherungsstress.
- Vandalismus: Zerkratzte Neuwagen auf dem Außenhof mindern den Verkaufswert drastisch.
- Personalkosten: Lückenlose menschliche Nachtbewachung ist für viele Standorte wirtschaftlich kaum darstellbar.
[Secondary Product: Cleaning Outdoor]
- Image-Verlust: Ein verschmutzter Außenbereich (Laub, Müll) passt nicht zum Premium-Anspruch der ausgestellten Fahrzeuge.
- Manueller Aufwand: Verkaufspersonal oder teure Hausmeisterdienste binden Zeit mit unproduktivem Fegen.
**Gains:**
[Primary Product: Security]
- Abschreckung & Intervention: Permanente Roboter-Präsenz wirkt präventiv; bei Alarm schaltet sich sofort eine Leitstelle auf.
- Asset-Schutz: Reduktion von Versicherungsschäden und Selbstbehalten durch lückenlose Dokumentation.
[Secondary Product: Cleaning Outdoor]
- Premium-Präsentation: Der Hof ist bereits morgens bei Kundenöffnung makellos sauber.
- Automatisierung: Täglich gereinigte Flächen ohne manuellen Eingriff.
---
## 2. Industry - Manufacturing (Produktion)
* **Primary Product:** Cleaning Indoor (Wet Surface)
* **Secondary Product:** Transport Roboter
### Analyse (Chain of Thought)
* **Ist-Zustand Fehler:** Die aktuellen Pains ("Such- und Holzeiten", "Materialfluss") beziehen sich zu 100% auf *Transport*, obwohl *Cleaning* das Primary Product ist.
* **Transcript Check:** Alex warnt explizit vor "Öl und Chemie" ("Wie Hundekacke"). Roboter schmieren das nur breit. Fokus muss auf *Staub*, *Prozesssicherheit* und *Mitarbeiterentlastung* liegen. Alex: "Betriebskosten senken", "Produktivität steigern".
* **Logik:**
* *Cleaning:* Darf nicht in Ölspuren fahren. Aber: Große Hallen verstauben. Staplerverkehr erzeugt Abrieb. Rutschgefahr für Mitarbeiter.
* *Transport:* "Facharbeiter rennt C-Teilen hinterher". Das ist der klassische Pain.
### PROPOSAL
**Pains:**
[Primary Product: Cleaning Indoor]
- Prozess-Sicherheit: Staub und Abrieb auf Fahrwegen gefährden empfindliche Sensorik (z.B. von FTS) und die Produktqualität.
- Arbeitssicherheit: Rutschgefahr durch feine Staubschichten oder ausgelaufene (nicht-chemische) Flüssigkeiten erhöht das Unfallrisiko.
- Ressourcen-Verschwendung: Hochbezahlte Fachkräfte müssen Maschinen stoppen, um ihr Umfeld zu reinigen.
[Secondary Product: Transport]
- Intransparenz & Suchzeiten: Facharbeiter unterbrechen die Wertschöpfung für unproduktive Materialbeschaffung ("C-Teile holen").
- Mikrostillstände: Fehlendes Material an der Linie stoppt den Takt.
**Gains:**
[Primary Product: Cleaning Indoor]
- Konstante Bodenqualität: Definierte Sauberkeitsstandards (Audit-Ready) rund um die Uhr.
- Unfallschutz: Reduktion von Arbeitsunfällen durch rutschfreie Verkehrswege.
[Secondary Product: Transport]
- Just-in-Time Logistik: Automatisierter Nachschub hält die Fachkraft wertschöpfend an der Maschine.
- Fluss-Optimierung: Stabilisierung der Taktzeiten und OEE durch verlässliche Materialflüsse.
---
## 3. Healthcare - Hospital (Krankenhaus)
* **Primary Product:** Cleaning Indoor (Wet Surface)
* **Secondary Product:** Service Roboter (Transport)
### Analyse (Chain of Thought)
* **Ist-Zustand Fehler:** Vermischung. "Fachpflegekräfte... logistische Routinetätigkeiten" steht bei Cleaning. Das ist falsch.
* **Transcript Check:** "Hände weg vom Bett" ist das Transport-Thema (Essen/Wäsche). "Hygienerisiko/Kreuzkontamination" ist das Cleaning-Thema. Alex: "Validierbare Reinigung".
* **Logik:**
* *Cleaning:* Muss Keime reduzieren, 24/7 laufen, dokumentieren.
* *Service/Transport:* Muss Schwestern entlasten (Schrittzähler reduzieren).
### PROPOSAL
**Pains:**
[Primary Product: Cleaning Indoor]
- Hygienerisiko & Kreuzkontamination: Manuelle Reinigung ist oft fehleranfällig und variiert stark in der Qualität (Gefahr für Patienten).
- Dokumentationspflicht: Der Nachweis RKI-konformer Reinigung bindet wertvolle Zeit und ist bei Personalmangel lückenhaft.
- Personalnot: Fehlende Reinigungskräfte führen zu gesperrten Bereichen oder sinkendem Hygienelevel.
[Secondary Product: Service]
- Berufsfremde Tätigkeiten: Pflegekräfte verbringen bis zu 30% der Schichtzeit mit Hol- und Bringdiensten (Essen, Wäsche, Labor).
- Physische Überlastung: Lange Laufwege in großen Kliniken erhöhen die Erschöpfung des Fachpersonals.
**Gains:**
[Primary Product: Cleaning Indoor]
- Validierbare Hygiene: Robotergarantierte, protokollierte Desinfektionsleistung audit-sicher auf Knopfdruck.
- 24/7 Verfügbarkeit: Konstantes Hygienelevel auch nachts und am Wochenende, unabhängig vom Dienstplan.
[Secondary Product: Service]
- Zeit für Patienten: Rückgewinnung von ca. 2,5 Stunden Fachkraft-Kapazität pro Schicht für die Pflege.
- Mitarbeiterzufriedenheit: Reduktion der Laufwege ("Schrittzähler") entlastet das Team spürbar.
---
## 4. Logistics - Warehouse (Lagerhalle)
* **Primary Product:** Cleaning Outdoor Roboter (Sweeper) -> *Korrektur: Sollte hier "Cleaning Indoor (Sweeper)" gemeint sein?*
* **Logik-Check:** Im Warehouse fährt man selten mit einer Straßenkehrmaschine. Aber: Man nutzt *Aufsitz-Kehrmaschinen* (Sweeper) für den Innenbereich (Palettenspäne, Staub). "Wet Surface" ist im Lager oft zweitrangig (außer Lebensmittel), da Wasser + Kartonage = schlecht.
* **Annahme:** Wir mappen "Sweeper" hier auf *Indoor Dry Cleaning*.
* **Transcript Check:** Alex: "Im Warehouse sehe ich eher die Kehrmaschine als Erstes... Verschmutzung ist eine andere... Paletten Dinger."
* **Secondary Product:** Cleaning Indoor (Wet Surface) -> Alex: "Anspruch an Nassreinigung im ersten Schritt nicht so hoch."
### PROPOSAL
**Pains:**
[Primary Product: Cleaning (Sweeper/Dry)]
- Grobschmutz & Palettenreste: Holzspäne und Verpackungsreste gefährden Reifen von Flurförderzeugen und blockieren Lichtschranken.
- Staubbelastung: Aufgewirbelter Staub legt sich auf Waren und Verpackungen (Reklamationsgrund) und schadet der Gesundheit.
- Manuelle Bindung: Mitarbeiter müssen große Flächen manuell kehren, statt zu kommissionieren.
[Secondary Product: Cleaning (Wet)]
- Hartnäckige Verschmutzungen: Eingefahrene Spuren, die durch reines Kehren nicht lösbar sind.
**Gains:**
[Primary Product: Cleaning (Sweeper/Dry)]
- Anlagenschutz: Sauberer Boden verhindert Störungen an Fördertechnik und Sensoren durch Staub/Teile.
- Staubfreie Ware: Produkte verlassen das Lager in sauberem Zustand (Qualitätsanspruch).
[Secondary Product: Cleaning (Wet)]
- Grundsauberkeit: Gelegentliche Nassreinigung für Tiefenhygiene in Fahrgassen.
---
## 5. Retail - Food (Supermarkt/LEH)
* **Primary Product:** Cleaning Indoor (Wet)
* **Secondary Product:** Service Roboter
### Analyse
* **Transcript Check:** "Kaufland/Aldi... große Flächen". "Reinigungskosten steigen". "Sichtbare Reinigungsmaschinen blockieren Kundenwege".
* **Pain:** Dreckige Böden (Milch/Joghurt ausgelaufen) = Rutschgefahr + Ekel. Personal ist knapp (Regalauffüller).
* **Logik:**
* *Cleaning:* Muss "Spot Cleaning" können (Malheur wegmachen) und Flächenleistung bringen.
* *Service:* Promotion? Oder "Wo ist die H-Milch?"
### PROPOSAL
**Pains:**
[Primary Product: Cleaning Indoor]
- "Malheur-Management": Zerbrochene Gläser oder ausgelaufene Flüssigkeiten (Haverien) bilden sofortige Rutschfallen und binden Personal.
- Optischer Eindruck: Grauschleier und verschmutzte Böden senken das Frische-Empfinden der Kunden massiv.
- Personal-Engpass: Marktpersonal soll Regale füllen und kassieren, nicht mit der Scheuersaugmaschine fahren.
[Secondary Product: Service]
- Fehlende Beratung: Kunden finden Produkte nicht und brechen den Kauf ab, da kein Personal greifbar ist.
**Gains:**
[Primary Product: Cleaning Indoor]
- Sofortige Sicherheit: Roboter beseitigt Rutschgefahren autonom und schnell.
- Frische-Optik: Permanent glänzende Böden ("Lobby-Effekt") unterstreichen die Qualität der Lebensmittel.
[Secondary Product: Service]
- Umsatz-Boost: Roboter führt Kunden direkt zum gesuchten Produkt oder bewirbt Aktionen aktiv am POS.
---
## 6. Hospitality - Gastronomy
* **Primary Product:** Cleaning Indoor (Wet)
* **Secondary Product:** Service Roboter
* **Achtung:** Im Transkript wurde diskutiert, dies evtl. zu tauschen ("L'Osteria... Service Robotik"). Aber Alex sagt auch: "Reinigungsrobotik als erstes".
* **Entscheidung:** Wir lassen Cleaning als Primary, aber schärfen Service als starken Secondary.
### PROPOSAL
**Pains:**
[Primary Product: Cleaning Indoor]
- Klebrige Böden: Verschüttete Getränke und Speisereste wirken unhygienisch und stören das Ambiente.
- Randzeiten-Problem: Nach Schließung ist es schwer, Personal für die Grundreinigung zu finden (Nachtzuschläge).
[Secondary Product: Service]
- "Teller-Taxi": Servicekräfte verbringen 80% der Zeit mit Laufen (Küche <-> Gast) statt mit Verkaufen/Betreuung.
- Personalmangel: Zu wenig Kellner führen zu langen Wartezeiten, kalten Speisen und genervten Gästen.
**Gains:**
[Primary Product: Cleaning Indoor]
- Makelloses Ambiente: Sauberer Boden als Visitenkarte des Restaurants.
- Zuverlässigkeit: Die Grundreinigung findet jede Nacht garantiert statt.
[Secondary Product: Service]
- Mehr Umsatz am Gast: Servicekraft hat Zeit für Empfehlungen (Wein, Dessert) und Upselling.
- Entlastung: Roboter übernimmt das schwere Tragen (Tabletts), Personal bleibt im Gastraum präsent.
---
## 7. Leisure - Outdoor Park (Freizeitparks)
* **Primary Product:** Cleaning Outdoor (Sweeper)
* **Secondary Product:** Service Roboter
### Analyse
* **Transcript Check:** "Kilometerlange Wege", "Grobschmutz (Laub, Müll)". Alex: "Große Kehrmaschine (VIGGO 100)".
* **Logik:** Es geht um Ästhetik ("Heile Welt") und Sicherheit (kein Müll).
### PROPOSAL
**Pains:**
[Primary Product: Cleaning Outdoor]
- Immersion-Breaker: Müll und Laub auf den Wegen stören die perfekte Illusion ("Heile Welt") des Parks.
- Enorme Flächen: Kilometerlange Wegenetze binden ganze Kolonnen von Reinigungskräften.
- Sicherheit: Rutschgefahr durch nasses Laub oder Abfall.
[Secondary Product: Service]
- Versorgungslücken: An abgelegenen Attraktionen fehlt oft Gastronomie-Angebot.
**Gains:**
[Primary Product: Cleaning Outdoor]
- Perfekte Inszenierung: Unsichtbare Reinigung in den frühen Morgenstunden sichert das perfekte Erlebnis bei Parköffnung.
- Effizienz: Ein Roboter schafft die Flächenleistung mehrerer manueller Kehrer.
[Secondary Product: Service]
- Mobiler Verkauf: Roboter bringen Getränke/Eis direkt zu den Warteschlangen (Zusatzumsatz).
---
## 8. Energy - Grid & Utilities (Energieversorger)
* **Primary Product:** Security Roboter
* **Secondary Product:** -
### Analyse
* **Logik:** KRITIS-Infrastruktur. Abgelegen. Kupferdiebstahl.
* **Pain:** Man kann nicht überall sein. Zäune werden durchschnitten.
### PROPOSAL
**Pains:**
[Primary Product: Security]
- Sabotage & Diebstahl: Kupferdiebstahl in Umspannwerken verursacht Millionenschäden und Versorgungsausfälle.
- Reaktionszeit: Entlegene Standorte sind für Interventionskräfte oft zu spät erreichbar.
- Sicherheitsrisiko Mensch: Alleinarbeit bei Kontrollgängen in Hochspannungsbereichen ist gefährlich.
**Gains:**
[Primary Product: Security]
- First Responder Maschine: Roboter ist bereits vor Ort, verifiziert Alarm und schreckt Täter ab.
- KRITIS-Compliance: Lückenlose, manipulationssichere Dokumentation aller Vorfälle für Behörden.
- Arbeitsschutz: Roboter übernimmt gefährliche Routinekontrollen (z.B. Thermografie an Trafos).

View File

@@ -218,24 +218,65 @@ Der Prozess ist streng sequenziell und baut aufeinander auf.
* **Extraktion:** Der `MetricParser` extrahiert den Rohwert (z.B. `250`). Dieser wird in `calculated_metric_value` gespeichert.
* **Standardisierung:** Die Formel aus `standardization_logic` (z.B. `wert * 100`) wird auf den Rohwert angewendet. Das Ergebnis wird in `standardized_metric_value` geschrieben.
### 17.3 "Atomic Opener" Generierung im Detail
### 17.3 "Atomic Opener" Generierung im Detail (Zweistufiger Prozess, Feb 22, 2026)
**Ziel:** Zwei hoch-personalisierte, schlagkräftige Einleitungssätze (1-2 Sätze) zu generieren, die eine operative Herausforderung implizieren, ohne die Lösung zu nennen.
**Ziel:** Zwei hoch-personalisierte, schlagkräftige Einleitungssätze (2-3 Sätze) zu generieren, die eine operative Herausforderung als wertschätzende Beobachtung formulieren.
* **Zwei getrennte Kontexte:** Es werden zwei Sätze für zwei Personas generiert:
1. **`ai_opener` (Primär):** Zielt auf den **Infrastruktur-Entscheider** (z.B. Facility Manager, Technischer Leiter).
2. **`ai_opener_secondary` (Sekundär):** Zielt auf den **Operativen Entscheider** (z.B. Produktionsleiter, Pflegedienstleitung).
Um die Qualität der Ergebnisse zu maximieren und "Rauschen" aus den Website-Texten zu eliminieren, wurde ein zweistufiger Generierungsprozess implementiert.
* **Persona-spezifische Produktauswahl:**
* Der primäre Opener (Infrastruktur) bezieht sich **immer** auf das `primary_category` der Branche.
* Der sekundäre Opener (Operativ) bezieht sich:
* Standardmäßig ebenfalls auf das `primary_category`.
* **Ausnahme:** Wenn in der Branche `ops_focus_secondary = True` gesetzt ist, bezieht er sich auf das `secondary_category`.
#### Schritt 1: Das Website-Dossier (Extraktion & Komprimierung)
Zunächst analysiert die KI den Rohtext der Website und erstellt ein strukturiertes Dossier. Dieser Zwischenschritt dient dazu, die wesentlichen Informationen über das Geschäftsmodell und das Reinigungspotenzial (Hygieneanforderungen, Flächengröße) zu isolieren.
* **Der "1komma5°"-Prompt:**
* Die Generierung nutzt einen bewährten Prompt, der das Sprachmodell anweist, das Geschäftsmodell des Unternehmens zu analysieren und eine wertschätzende Beobachtung zu formulieren.
* **"Munition":** Der Prompt wird dynamisch mit den hoch-spezifischen, vordefinierten `pains` und `gains` aus der jeweiligen Branche angereichert.
* **Regel:** Das Produkt selbst wird **nicht** im Opener genannt. Der Satz fokussiert sich rein auf die Formulierung der Herausforderung. Die Auflösung erfolgt in den nachfolgenden, persona-spezifischen Textbausteinen.
**Prompt (Website-Dossier - Fokus Reinigung):**
```text
**Rolle:** Du bist ein erfahrener B2B-Marktanalyst mit Fokus auf Facility Management und Gebäudereinigung.
**Aufgabe:** Analysiere den Website-Text des Unternehmens '{company_name}' und erstelle ein prägnantes Dossier.
**Deine Analyse besteht aus ZWEI TEILEN:**
**TEIL 1: Geschäftsmodell-Analyse**
1. Identifiziere die Kernprodukte und/oder Dienstleistungen des Unternehmens.
2. Fasse in 2-3 prägnanten Sätzen zusammen, was das Unternehmen macht und für welche Kunden.
**TEIL 2: Reinigungspotenzial & Hygiene-Analyse**
1. Scanne den Text gezielt nach Hinweisen auf große Bodenflächen, Publikumsverkehr oder hohe Hygieneanforderungen (Schlüsselwörter: Reinigung, Sauberkeit, Hygiene, Bodenpflege, Verkaufsfläche, Logistikhalle, Patientenversorgung, Gästeerlebnis).
2. Bewerte das Potenzial für automatisierte Reinigungslösungen auf einer Skala (Hoch / Mittel / Niedrig).
3. Extrahiere die 1-2 wichtigsten Sätze, die diese Anforderungen oder die Größe der Einrichtung belegen.
**Antworte AUSSCHLIESSLICH im folgenden exakten Format:**
GESCHÄFTSMODELL: <Deine 2-3 Sätze über das Kerngeschäft des Unternehmens.>
REINIGUNGSPOTENZIAL: <Hoch / Mittel / Niedrig / Kein Hinweis>
HYGIENE-BEWEISE: <Die 1-2 aussagekräftigsten Sätze als Bullet Points (* Satz 1...)>
```
#### Schritt 2: Formulierung des Openers (Scharfsinnige Beobachtung)
Basierend auf dem Dossier aus Schritt 1 und den branchenspezifischen Pains/Gains wird der finale Opener formuliert.
**Prompt (Finaler Opener - Optimiert Feb 22, 2026):**
```text
Du bist ein scharfsinniger Marktbeobachter und Branchenexperte. Formuliere eine wertschätzende Einleitung (genau 2-3 Sätze) für ein Anschreiben an das Unternehmen {company.name}.
DEINE PERSONA:
Ein anerkennender Branchenkenner, der eine scharfsinnige Beobachtung teilt. Dein Ton ist wertschätzend, professionell und absolut NICHT verkäuferisch.
STRATEGISCHER HINTERGRUND (Nicht nennen!):
Dieses Unternehmen wird kontaktiert, weil sein Geschäftsmodell perfekt zu folgendem Bereich passt: "{product_name}" ({product_desc}).
Ziel des Schreibens ist es, die Branchen-Herausforderungen "{relevant_pains}" zu adressieren und die Mehrwerte "{relevant_gains}" zu ermöglichen.
DEINE AUFGABE:
1. Firmenname kürzen: Kürze "{company.name}" sinnvoll (meist erste zwei Worte). Entferne UNBEDINGT Rechtsformen wie GmbH, AG, gGmbH, e.V. etc.
2. Struktur: Genau 2 bis 3 flüssige Sätze.
3. Inhalt:
- Satz 1: Eine wertschätzende Beobachtung zum Geschäftsmodell oder einem aktuellen Fokus des Unternehmens (siehe Analyse-Dossier).
- Satz 2-3: Leite elegant zu einer spezifischen operativen Herausforderung über, die für das Unternehmen aufgrund seiner Größe oder Branche relevant ist (orientiere dich an "{relevant_pains}").
4. STRENGES VERBOT: Nenne KEIN Produkt ("{product_name}") and biete KEINE "Lösungen", "Hilfe" oder "Zusammenarbeit" an. Der Text soll eine reine Beobachtung bleiben.
5. KEINE Anrede (kein "Sehr geehrte Damen und Herren", kein "Hallo").
KONTEXT (Analyse-Dossier):
{context_text}
```
* **Regel:** Das Produkt selbst wird **nicht** im Opener genannt. Der Satz fokussiert sich rein auf die Formulierung der Herausforderung. Die Auflösung erfolgt in den nachfolgenden, persona-spezifischen Textbausteinen.
### 17.4 Debugging & Lessons Learned (Feb 21, 2026)
@@ -258,4 +299,60 @@ Die Implementierung der v3.0-Logik war von mehreren hartnäckigen Problemen gepr
* **Lösung:** Robuste Prüfung auf `None` vor der String-Manipulation (`(value or "").lower()`) implementiert.
* **Test:** Ein vollständiger E2E-Test (`test_e2e_full_flow.py`) wurde etabliert, der Provisioning, Analyse und Opener-Generierung automatisiert verifiziert.
Diese Punkte unterstreichen die Notwendigkeit von robusten Deployment-Prozessen, aggressiver Datenbereinigung und der Schaffung von dedizierten Test-Tools zur Isolierung komplexer Anwendungslogik.
Diese Punkte unterstreichen die Notwendigkeit von robusten Deployment-Prozessen, aggressiver Datenbereinigung und der Schaffung von dedizierten Test-Tools zur Isolierung komplexer Anwendungslogik.
### 17.5 Lessons Learned: SuperOffice Address Sync (Feb 22, 2026)
Die Synchronisation von Stammdaten (Adresse, VAT) erforderte ein tiefes Eintauchen in die API-Struktur.
1. **Field Naming:** Die REST-API verlangt strikt `OrgNr` für die Umsatzsteuer-ID, nicht `OrgNumber` oder `VatNo`.
2. **Nested Updates:** Adressen müssen tief verschachtelt übergeben werden (`Address.Postal.City`), nicht flach (`PostalAddress`).
3. **Atomic Strategy:** Getrennte Updates für UDFs und Standardfelder führen zu Race Conditions. **Nur ein gebündelter PUT-Request** auf den Haupt-Endpunkt garantiert, dass keine Daten (durch veraltete Reads) überschrieben werden.
### 17.6 Lessons Learned: Full E-Mail Simulation & Persona Hardening (Feb 22, 2026)
Der End-to-End-Test der E-Mail-Simulation im SuperOffice-Kalender deckte subtile UI-Herausforderungen auf.
1. **Die "42-Zeichen-Falle" (Appointment Title):**
* **Problem:** Das `MainHeader`-Feld in der SuperOffice-Listenansicht ist auf ca. 42 Zeichen begrenzt. Längere Betreffzeilen werden abgeschnitten, und der Überhang wird oft als erste Zeile in den Textkörper verschoben.
* **Lösung:** Strikte serverseitige Kürzung des Titels auf 40 Zeichen. Zusätzlich wird der **vollständige Betreff** als allererste Zeile in die Description geschrieben, gefolgt von zwei Newlines. Dies stellt sicher, dass SuperOffice den Betreff "stiehlt" und die Anrede ("Hallo...") sicher im Body bleibt.
2. **Rollen-Dynamik & "Sticking":**
* **Problem:** Wenn ein Nutzer den Jobtitel ändert, blieb oft die alte Persona (z.B. "Infrastruktur") aktiv, wenn für den neuen Titel kein exakter Match vorlag.
* **Lösung:** Implementierung eines **Rollen-Resets**. Bei jeder Namens- oder Funktionsänderung wird die Persona im Company Explorer gelöscht und basierend auf den neuesten Mappings (z.B. neue Regeln für "Geschäftsleitung" -> "Wirtschaftlicher Entscheider") neu berechnet.
3. **Kaskadierung & Loop-Schutz:**
* **Problem:** Änderungen am Unternehmen (Contact) müssen alle Personen aktualisieren. Dies triggerte jedoch Endlosschleifen, da SuperOffice nach jedem Update einen eigenen Webhook sendete.
* **Lösung:** Implementierung einer **Deep-Filter-Logik**. Der Worker ignoriert nun Felder wie `updated`, `updated_associate_id` und `registered`. Nur inhaltliche Änderungen (Name, Branche, Funktion) lösen die Verarbeitungslogik aus.
### 17.7 Marketing Matrix Schärfung (v3.2 - Feb 22, 2026)
Die Qualität der Marketing-Matrix (Subject, Intro, Social Proof) ist entscheidend für den Erfolg des Outreachs. Daher wurde die Generierungslogik in `generate_matrix.py` massiv geschärft.
**Kern-Konzept: Der Strategische Brückenschlag**
Die KI agiert nicht mehr als reiner Copywriter, sondern als **scharfsinniger B2B-Strategieberater**. Der Prompt erzwingt die Verknüpfung von drei Ebenen:
1. **Persönlicher Druck (Persona):** Was hält den Entscheider nachts wach? (Pains der Rolle)
2. **Strategische Notwendigkeit (Branche):** Welchen externen Anforderungen muss das Unternehmen gerecht werden? (Pains/Gains der Branche)
3. **Technologischer Enabler (Produkt):** Wie genau löst die spezifische Robotik-Kategorie (Cleaning, Service, etc.) diesen Konflikt?
**Prompt-Anforderungen (Matrix):**
* **Betreff:** "Finger in die Wunde" (Max. 6 Wörter).
* **Einleitung:** Sofortiges Gefühl von "Verstanden-Werden" durch Verbindung von Rollen-Verantwortung und Branchen-Herausforderung.
* **Social Proof:** Quantifizierbare Effekte statt leerer Versprechungen, abgestimmt auf die Branche.
* **Produkt-Kontext:** Nutzung der offiziellen Kategorie-Beschreibungen aus der Datenbank zur Vermeidung von generischen Floskeln.
---
## 18. Next Steps & Todos (Post-Migration)
Nach Abschluss der Kern-Migration stehen folgende Optimierungen an:
### Task 1: Monitoring & Alerting
* **Dashboards:** Ausbau des Connector-Dashboards (`/connector/dashboard`) um Fehler-Statistiken und Retry-Logik.
* **Alerting:** Benachrichtigung (z.B. Slack/Teams) bei wiederholten Sync-Fehlern.
### Task 2: Robust Address Parsing
* **Scraper:** Derzeit verlässt sich der Scraper auf das LLM für die Adress-Extraktion. Eine Validierung gegen Google Maps API oder PLZ-Verzeichnisse würde die Datenqualität ("Golden Record") massiv erhöhen.
### Task 3: "Person-First" Logic
* **Aktuell:** Trigger ist meist das Unternehmen.
* **Zukunft:** Wenn eine Person ohne Firma angelegt wird, sollte der CE proaktiv die Domain der E-Mail-Adresse nutzen, um das Unternehmen im Hintergrund zu suchen und anzulegen ("Reverse Lookup").

View File

@@ -0,0 +1,45 @@
# Session Report: SuperOffice Connector & End-to-End Test
**Date:** Feb 21, 2026
**Focus:** End-to-End Testing, Infrastructure Hardening, Vertical Sync
## 1. Achievements
### ✅ Infrastructure & Stability
* **Authentication Fixed:** Resolved critical auth failures in `SuperOfficeClient`. Added fallback for empty `SO_ENVIRONMENT` variables and improved error logging.
* **Pydantic V2 Migration:** Rewrote `connector-superoffice/config.py` to remove dependency on `pydantic-settings`, resolving crash loops in Docker containers with older/mixed Python environments.
* **Network Path Clarified:** Confirmed that Webhooks reach the system via Nginx (`/connector/` route) on Port 80/8090, solving the "closed port 8003" mystery.
### ✅ Functional Improvements
* **Bidirectional Vertical Sync:** Implemented logic in `worker.py` to detect manual Vertical changes in SuperOffice (e.g. `[I:26] -> Leisure`) and sync them back to the Company Explorer.
* **Cascading Updates:** A Vertical change now correctly triggers a re-calculation of marketing texts for all associated persons.
* **Data Persistence:** Updated `company-explorer/backend/app.py` to automatically create/update `Contact` objects during provisioning, ensuring data consistency for cascade updates.
### ✅ Testing
* **Automated E2E Test:** Created `connector-superoffice/tests/test_e2e_flow.py`. This standalone script verifies the full data roundtrip and the vertical change scenario without needing external dependencies.
* **Matrix Content:** Generated live marketing texts for **"Healthcare - Hospital"** and **"Leisure - Indoor Active"** (5 Personas each) to enable real-world testing.
## 2. Current Status (Snapshot)
* **Connector:** Running, Authenticated (`✅ SuperOffice Client initialized`).
* **Worker:** Processing jobs. Currently correctly handling "Processing" state from CE by re-queueing (RETRY).
* **Write-Back:** Vertical Sync confirmed working. Address/VAT Sync implemented but requires final verification.
## 3. Open Issues / Next Steps
### 🔸 Address & VAT Sync Debugging
The logic for writing back `City` (PostalAddress) and `OrgNumber` (VAT) was added to `worker.py` but potentially causes loops or needs validation against the complex SuperOffice address model.
* **Todo:** Verify if address updates actually arrive in SuperOffice once the CE status switches from `PROCESSING` to `SUCCESS`.
### 🔸 UDF Configuration
There is a suspicion that `UDF_SUBJECT` and `UDF_VERTICAL` might share the same ID (`SuperOffice:5`) in `config.py`.
* **Todo:** Verify the correct ProgIDs for the UDFs in the SuperOffice Admin client and update `.env` / `config.py`.
### 🔸 Monitoring
* **Todo:** Consider a simple web-interface for the connector logs/queue status (as discussed).
## 4. How to Resume
1. **Check Logs:** Run `python3 show_logs.py` to see if the pending jobs for "Silly Billy Entertainment" have completed.
2. **Verify Data:** Check SuperOffice to see if Address and VAT were updated.
3. **Refine:** If address sync fails, debug `worker.py` section `2b.2 Sync Address & VAT`.

12
add_mapping.py Normal file
View File

@@ -0,0 +1,12 @@
import sqlite3
def add_mapping():
conn = sqlite3.connect('/app/companies_v3_fixed_2.db')
cursor = conn.cursor()
cursor.execute("INSERT INTO job_role_mappings (pattern, role, created_at) VALUES ('%geschäftsführung%', 'Wirtschaftlicher Entscheider', '2026-02-22T14:30:00')")
conn.commit()
conn.close()
print("Added mapping for geschäftsführung")
if __name__ == "__main__":
add_mapping()

40
check_benni.py Normal file
View File

@@ -0,0 +1,40 @@
import sqlite3
import os
import json
DB_PATH = "companies_v3_fixed_2.db"
def check_company_33():
if not os.path.exists(DB_PATH):
print(f"❌ Database not found at {DB_PATH}")
return
try:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
print(f"🔍 Checking Company ID 33 (Bennis Playland)...")
# Check standard fields
cursor.execute("SELECT id, name, city, street, zip_code FROM companies WHERE id = 33")
row = cursor.fetchone()
if row:
print(f" Standard: City='{row[2]}', Street='{row[3]}', Zip='{row[4]}'")
else:
print(" ❌ Company 33 not found in DB.")
# Check Enrichment
cursor.execute("SELECT content FROM enrichment_data WHERE company_id = 33 AND source_type = 'website_scrape'")
enrich_row = cursor.fetchone()
if enrich_row:
data = json.loads(enrich_row[0])
imp = data.get("impressum")
print(f" Impressum Data: {json.dumps(imp, indent=2) if imp else 'None'}")
else:
print(" ❌ No website_scrape found for Company 33.")
conn.close()
except Exception as e:
print(f"❌ Error: {e}")
if __name__ == "__main__":
check_company_33()

14
check_mappings.py Normal file
View File

@@ -0,0 +1,14 @@
import sqlite3
def check_mappings():
conn = sqlite3.connect('/app/companies_v3_fixed_2.db')
cursor = conn.cursor()
cursor.execute("SELECT * FROM job_role_mappings")
rows = cursor.fetchall()
print("--- Job Role Mappings ---")
for row in rows:
print(row)
conn.close()
if __name__ == "__main__":
check_mappings()

53
check_silly_billy.py Normal file
View File

@@ -0,0 +1,53 @@
import sqlite3
import os
DB_PATH = "companies_v3_fixed_2.db"
def check_company():
if not os.path.exists(DB_PATH):
print(f"❌ Database not found at {DB_PATH}")
return
try:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
print(f"🔍 Searching for 'Silly Billy' in {DB_PATH}...")
cursor.execute("SELECT id, name, crm_id, ai_opener, ai_opener_secondary, city, crm_vat, status FROM companies WHERE name LIKE '%Silly Billy%'")
rows = cursor.fetchall()
if not rows:
print("❌ No company found matching 'Silly Billy'")
else:
for row in rows:
company_id = row[0]
print("\n✅ Company Found:")
print(f" ID: {company_id}")
print(f" Name: {row[1]}")
print(f" CRM ID: {row[2]}")
print(f" Status: {row[7]}")
print(f" City: {row[5]}")
print(f" VAT: {row[6]}")
print(f" Opener (Primary): {row[3][:50]}..." if row[3] else " Opener (Primary): None")
# Check Enrichment Data
print(f"\n 🔍 Checking Enrichment Data for ID {company_id}...")
cursor.execute("SELECT content FROM enrichment_data WHERE company_id = ? AND source_type = 'website_scrape'", (company_id,))
enrich_row = cursor.fetchone()
if enrich_row:
import json
try:
data = json.loads(enrich_row[0])
imp = data.get("impressum")
print(f" Impressum Data in Scrape: {json.dumps(imp, indent=2) if imp else 'None'}")
except Exception as e:
print(f" ❌ Error parsing JSON: {e}")
else:
print(" ❌ No website_scrape enrichment data found.")
conn.close()
except Exception as e:
print(f"❌ Error reading DB: {e}")
if __name__ == "__main__":
check_company()

View File

@@ -90,6 +90,7 @@ class ProvisioningRequest(BaseModel):
crm_name: Optional[str] = None
crm_website: Optional[str] = None
job_title: Optional[str] = None
crm_industry_name: Optional[str] = None
class ProvisioningResponse(BaseModel):
status: str
@@ -100,6 +101,13 @@ class ProvisioningResponse(BaseModel):
opener: Optional[str] = None # Primary opener (Infrastructure/Cleaning)
opener_secondary: Optional[str] = None # Secondary opener (Service/Logistics)
texts: Dict[str, Optional[str]] = {}
# Enrichment Data for Write-Back
address_city: Optional[str] = None
address_zip: Optional[str] = None
address_street: Optional[str] = None
address_country: Optional[str] = None
vat_id: Optional[str] = None
class IndustryDetails(BaseModel):
pains: Optional[str] = None
@@ -157,6 +165,7 @@ class CompanyDetailsResponse(BaseModel):
# Openers
ai_opener: Optional[str] = None
ai_opener_secondary: Optional[str] = None
research_dossier: Optional[str] = None
# Relations
industry_details: Optional[IndustryDetails] = None
@@ -235,53 +244,98 @@ def provision_superoffice_contact(
# 1c. Update CRM Snapshot Data (The Double Truth)
changed = False
if req.crm_name:
company.crm_name = req.crm_name
changed = True
if req.crm_website:
company.crm_website = req.crm_website
changed = True
name_changed_significantly = False
# Simple Mismatch Check
if company.website and company.crm_website:
def norm(u): return str(u).lower().replace("https://", "").replace("http://", "").replace("www.", "").strip("/")
if norm(company.website) != norm(company.crm_website):
company.data_mismatch_score = 0.8 # High mismatch
if req.crm_name and req.crm_name != company.crm_name:
logger.info(f"CRM Name Change detected for ID {company.crm_id}: {company.crm_name} -> {req.crm_name}")
company.crm_name = req.crm_name
# If the name changes, we should potentially re-evaluate the whole company
# especially if the status was already ENRICHED
if company.status == "ENRICHED":
name_changed_significantly = True
changed = True
if req.crm_website:
if company.crm_website != req.crm_website:
company.crm_website = req.crm_website
changed = True
else:
if company.data_mismatch_score != 0.0:
company.data_mismatch_score = 0.0
changed = True
# ...
if changed:
company.updated_at = datetime.utcnow()
if name_changed_significantly:
logger.info(f"Triggering FRESH discovery for {company.name} due to CRM name change.")
company.status = "NEW"
# We don't change the internal 'name' yet, Discovery will do that or we keep it as anchor.
# But we must clear old results to avoid stale data.
company.industry_ai = None
company.ai_opener = None
company.ai_opener_secondary = None
background_tasks.add_task(run_discovery_task, company.id)
db.commit()
# If we just triggered a fresh discovery, tell the worker to wait.
if name_changed_significantly:
return ProvisioningResponse(
status="processing",
company_name=company.crm_name
)
# 2. Find Contact (Person)
if req.so_person_id is None:
# Just a company sync, no texts needed
# Just a company sync, but return all company-level metadata
return ProvisioningResponse(
status="success",
company_name=company.name,
website=company.website,
vertical_name=company.industry_ai
vertical_name=company.industry_ai,
opener=company.ai_opener,
opener_secondary=company.ai_opener_secondary,
address_city=company.city,
address_street=company.street,
address_zip=company.zip_code,
address_country=company.country,
vat_id=company.crm_vat
)
person = db.query(Contact).filter(Contact.so_person_id == req.so_person_id).first()
# 3. Determine Role
role_name = None
if person and person.role:
role_name = person.role
elif req.job_title:
# Auto-Create/Update Person
if not person:
person = Contact(
company_id=company.id,
so_contact_id=req.so_contact_id,
so_person_id=req.so_person_id,
status="ACTIVE"
)
db.add(person)
logger.info(f"Created new person {req.so_person_id} for company {company.name}")
# Update Job Title & Role logic
if req.job_title:
person.job_title = req.job_title
# Simple classification fallback
mappings = db.query(JobRoleMapping).all()
found_role = None
for m in mappings:
# Check pattern type (Regex vs Simple) - simplified here
pattern_clean = m.pattern.replace("%", "").lower()
if pattern_clean in req.job_title.lower():
role_name = m.role
found_role = m.role
break
# ALWAYS update role, even if to None, to avoid 'sticking' old roles
if found_role != person.role:
logger.info(f"Role Change for {person.so_person_id}: {person.role} -> {found_role}")
person.role = found_role
db.commit()
db.refresh(person)
# 3. Determine Role
role_name = person.role
# 4. Determine Vertical (Industry)
vertical_name = company.industry_ai
@@ -312,7 +366,13 @@ def provision_superoffice_contact(
role_name=role_name,
opener=company.ai_opener,
opener_secondary=company.ai_opener_secondary,
texts=texts
texts=texts,
address_city=company.city,
address_street=company.street,
address_zip=company.zip_code,
address_country=company.country,
# TODO: Add VAT field to Company model if not present, for now using crm_vat if available
vat_id=company.crm_vat
)
@app.get("/api/companies")

View File

@@ -34,6 +34,8 @@ class Company(Base):
industry_ai = Column(String, nullable=True) # The AI suggested industry
# Location (Golden Record)
street = Column(String, nullable=True) # NEW: Street + Number
zip_code = Column(String, nullable=True) # NEW: Postal Code
city = Column(String, nullable=True)
country = Column(String, default="DE")
@@ -72,6 +74,7 @@ class Company(Base):
# NEW: AI-generated Marketing Openers
ai_opener = Column(Text, nullable=True)
ai_opener_secondary = Column(Text, nullable=True)
research_dossier = Column(Text, nullable=True)
# Relationships
signals = relationship("Signal", back_populates="company", cascade="all, delete-orphan")

View File

@@ -12,50 +12,71 @@ from backend.database import SessionLocal, Industry, Persona, MarketingMatrix
from backend.config import settings
# --- Configuration ---
MODEL_NAME = "gemini-1.5-pro-latest" # High quality copy
MODEL_NAME = "gemini-2.0-flash" # High quality copy
def generate_prompt(industry: Industry, persona: Persona) -> str:
"""
Builds the prompt for the AI to generate the marketing texts.
Combines Industry context with Persona specific pains/gains.
Combines Industry context with Persona specific pains/gains and Product Category.
"""
# Safely load JSON lists
# 1. Determine Product Context
# We focus on the primary category for the general matrix,
# but we inform the AI about the secondary option if applicable.
primary_cat = industry.primary_category
product_context = f"{primary_cat.name}: {primary_cat.description}" if primary_cat else "Intelligente Robotik-Lösungen"
# 2. Extract specific segments from industry pains/gains
def extract_segment(text, marker):
if not text: return ""
import re
segments = re.split(r'\[(.*?)\]', text)
for i in range(1, len(segments), 2):
if marker.lower() in segments[i].lower():
return segments[i+1].strip()
return text
industry_pains = extract_segment(industry.pains, "Primary Product")
industry_gains = extract_segment(industry.gains, "Primary Product")
# 3. Handle Persona Data
try:
persona_pains = json.loads(persona.pains) if persona.pains else []
persona_gains = json.loads(persona.gains) if persona.gains else []
except:
persona_pains = [persona.pains] if persona.pains else []
persona_gains = [persona.gains] if persona.gains else []
industry_pains = industry.pains if industry.pains else "Allgemeine Effizienzprobleme"
prompt = f"""
Du bist ein erfahrener B2B-Copywriter für Robotik-Lösungen (Reinigung, Transport, Service).
Ziel: Erstelle personalisierte E-Mail-Textbausteine für einen Outreach.
Du bist ein scharfsinniger B2B-Strategieberater und exzellenter Copywriter.
Deine Aufgabe: Erstelle hochpräzise, "scharfe" Marketing-Textbausteine für einen Outreach an Entscheider.
--- KONTEXT ---
ZIELBRANCHE: {industry.name}
--- STRATEGISCHER RAHMEN ---
ZIELUNTERNEHMEN (Branche): {industry.name}
BRANCHEN-KONTEXT: {industry.description or 'Keine spezifische Beschreibung'}
BRANCHEN-PAINS: {industry_pains}
BRANCEHN-HERAUSFORDERUNGEN: {industry_pains}
ANGESTREBTE MEHRWERTE: {industry_gains}
ZIELPERSON (ARCHETYP): {persona.name}
PERSÖNLICHE PAINS (Herausforderungen):
ZIELPERSON (Rolle): {persona.name}
PERSÖNLICHER DRUCK (Pains der Rolle):
{chr(10).join(['- ' + p for p in persona_pains])}
GEWÜNSCHTE GAINS (Ziele):
GEWÜNSCHTE ERFOLGE (Gains der Rolle):
{chr(10).join(['- ' + g for g in persona_gains])}
--- AUFGABE ---
Erstelle ein JSON-Objekt mit genau 3 Textbausteinen.
Tonalität: Professionell, lösungsorientiert, auf den Punkt. Keine Marketing-Floskeln ("Game Changer").
ANGEBOTENE LÖSUNG (Produkt-Fokus):
{product_context}
1. "subject": Betreffzeile (Max 6 Wörter). Muss neugierig machen und einen Pain adressieren.
2. "intro": Einleitungssatz (1-2 Sätze). Verbinde die Branchen-Herausforderung mit der persönlichen Rolle des Empfängers. Zeige Verständnis für seine Situation.
3. "social_proof": Ein Satz, der Vertrauen aufbaut. Nenne generische Erfolge (z.B. "Unternehmen in der {industry.name} senken so ihre Kosten um 15%"), da wir noch keine spezifischen Logos nennen dürfen.
--- DEIN AUFTRAG ---
Erstelle ein JSON-Objekt mit 3 Textbausteinen, die den persönlichen Druck des Empfängers mit den strategischen Notwendigkeiten seiner Branche und der technologischen Lösung verknüpfen.
Tonalität: Wertschätzend, auf Augenhöhe, scharfsinnig, absolut NICHT marktschreierisch.
1. "subject": Eine Betreffzeile (Max 6 Wörter), die den Finger direkt in eine Wunde (Pain) legt oder ein hohes Ziel (Gain) verspricht.
2. "intro": Einleitung (2-3 Sätze). Verbinde die spezifische Branchen-Herausforderung mit der persönlichen Verantwortung des Empfängers. Er muss sich sofort verstanden fühlen.
3. "social_proof": Ein Beweissatz, der zeigt, dass diese Lösung in der Branche {industry.name} bereits reale Probleme (z.B. Personalmangel, Dokumentationsdruck) gelöst hat. Nenne keine konkreten Firmennamen, aber quantifizierbare Effekte.
--- FORMAT ---
Respond ONLY with a valid JSON object. Do not add markdown formatting like ```json ... ```.
Antworte NUR mit einem validen JSON-Objekt.
Format:
{{
"subject": "...",
@@ -104,15 +125,25 @@ def real_gemini_call(prompt: str):
elif text.startswith("```"):
text = text[3:-3].strip()
return json.loads(text)
parsed_json = json.loads(text)
if isinstance(parsed_json, list):
if len(parsed_json) > 0:
return parsed_json[0]
else:
raise ValueError("Empty list returned from API")
return parsed_json
except Exception as e:
print(f"JSON Parse Error: {e}. Raw Response: {response.text}")
raise
def run_matrix_generation(dry_run: bool = True, force: bool = False):
def run_matrix_generation(dry_run: bool = True, force: bool = False, specific_industry: str = None):
db = SessionLocal()
try:
industries = db.query(Industry).all()
query = db.query(Industry)
if specific_industry:
query = query.filter(Industry.name == specific_industry)
industries = query.all()
personas = db.query(Persona).all()
print(f"Found {len(industries)} Industries and {len(personas)} Personas.")
@@ -182,6 +213,7 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--live", action="store_true", help="Actually call Gemini and write to DB")
parser.add_argument("--force", action="store_true", help="Overwrite existing matrix entries")
parser.add_argument("--industry", type=str, help="Specific industry name to process")
args = parser.parse_args()
run_matrix_generation(dry_run=not args.live, force=args.force)
run_matrix_generation(dry_run=not args.live, force=args.force, specific_industry=args.industry)

View File

@@ -0,0 +1,112 @@
import os
import requests
import json
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
NOTION_API_KEY = os.getenv("NOTION_API_KEY")
NOTION_DB_VERTICALS = "2ec88f4285448014ab38ea664b4c2b81"
NOTION_DB_PRODUCTS = "2ec88f42854480f0b154f7a07342eb58"
if not NOTION_API_KEY:
print("Error: NOTION_API_KEY not found.")
exit(1)
headers = {
"Authorization": f"Bearer {NOTION_API_KEY}",
"Notion-Version": "2022-06-28",
"Content-Type": "application/json"
}
def fetch_all_pages(db_id):
pages = []
has_more = True
start_cursor = None
while has_more:
url = f"https://api.notion.com/v1/databases/{db_id}/query"
payload = {"page_size": 100}
if start_cursor:
payload["start_cursor"] = start_cursor
response = requests.post(url, headers=headers, json=payload)
if response.status_code != 200:
print(f"Error fetching DB {db_id}: {response.status_code} - {response.text}")
break
data = response.json()
pages.extend(data.get("results", []))
has_more = data.get("has_more", False)
start_cursor = data.get("next_cursor")
return pages
def get_property_text(page, prop_name):
props = page.get("properties", {})
prop = props.get(prop_name)
if not prop:
return ""
prop_type = prop.get("type")
if prop_type == "title":
return "".join([t["plain_text"] for t in prop.get("title", [])])
elif prop_type == "rich_text":
return "".join([t["plain_text"] for t in prop.get("rich_text", [])])
elif prop_type == "select":
select = prop.get("select")
return select.get("name") if select else ""
elif prop_type == "multi_select":
return ", ".join([s["name"] for s in prop.get("multi_select", [])])
elif prop_type == "relation":
return [r["id"] for r in prop.get("relation", [])]
else:
return f"[Type: {prop_type}]"
def main():
print("--- 1. Fetching Product Categories ---")
product_pages = fetch_all_pages(NOTION_DB_PRODUCTS)
product_map = {}
for p in product_pages:
p_id = p["id"]
# Product Category name is likely the title property
# Let's find the title property key dynamically
title_key = next((k for k, v in p["properties"].items() if v["id"] == "title"), "Name")
name = get_property_text(p, title_key)
product_map[p_id] = name
# print(f"Product: {name} ({p_id})")
print(f"Loaded {len(product_map)} products.")
print("\n--- 2. Fetching Verticals ---")
vertical_pages = fetch_all_pages(NOTION_DB_VERTICALS)
print("\n--- 3. Analysis ---")
for v in vertical_pages:
# Determine Title Key (Vertical Name)
title_key = next((k for k, v in v["properties"].items() if v["id"] == "title"), "Vertical")
vertical_name = get_property_text(v, title_key)
# Primary Product
pp_ids = get_property_text(v, "Primary Product Category")
pp_names = [product_map.get(pid, f"Unknown ({pid})") for pid in pp_ids] if isinstance(pp_ids, list) else []
# Secondary Product
sp_ids = get_property_text(v, "Secondary Product")
sp_names = [product_map.get(pid, f"Unknown ({pid})") for pid in sp_ids] if isinstance(sp_ids, list) else []
# Pains & Gains
pains = get_property_text(v, "Pains")
gains = get_property_text(v, "Gains")
print(f"\n### {vertical_name}")
print(f"**Primary Product:** {', '.join(pp_names)}")
print(f"**Secondary Product:** {', '.join(sp_names)}")
print(f"**Pains:**\n{pains.strip()}")
print(f"**Gains:**\n{gains.strip()}")
print("-" * 40)
if __name__ == "__main__":
main()

View File

@@ -181,12 +181,54 @@ JSON ONLY.
return area_metrics
return None
def _generate_marketing_opener(self, company: Company, industry: Industry, website_text: str, focus_mode: str = "primary") -> Optional[str]:
def _summarize_website_for_opener(self, company_name: str, website_text: str) -> str:
"""
Creates a high-quality summary of the website content to provide
better context for the opener generation.
"""
prompt = f"""
**Rolle:** Du bist ein erfahrener B2B-Marktanalyst mit Fokus auf Facility Management und Gebäudereinigung.
**Aufgabe:** Analysiere den Website-Text des Unternehmens '{company_name}' und erstelle ein prägnantes Dossier.
**Deine Analyse besteht aus ZWEI TEILEN:**
**TEIL 1: Geschäftsmodell-Analyse**
1. Identifiziere die Kernprodukte und/oder Dienstleistungen des Unternehmens.
2. Fasse in 2-3 prägnanten Sätzen zusammen, was das Unternehmen macht und für welche Kunden.
**TEIL 2: Reinigungspotenzial & Hygiene-Analyse**
1. Scanne den Text gezielt nach Hinweisen auf große Bodenflächen, Publikumsverkehr oder hohe Hygieneanforderungen (Schlüsselwörter: Reinigung, Sauberkeit, Hygiene, Bodenpflege, Verkaufsfläche, Logistikhalle, Patientenversorgung, Gästeerlebnis).
2. Bewerte das Potenzial für automatisierte Reinigungslösungen auf einer Skala (Hoch / Mittel / Niedrig).
3. Extrahiere die 1-2 wichtigsten Sätze, die diese Anforderungen oder die Größe der Einrichtung belegen.
**Antworte AUSSCHLIESSLICH im folgenden exakten Format:**
GESCHÄFTSMODELL: <Deine 2-3 Sätze über das Kerngeschäft des Unternehmens.>
REINIGUNGSPOTENZIAL: <Hoch / Mittel / Niedrig / Kein Hinweis>
HYGIENE-BEWEISE: <Die 1-2 aussagekräftigsten Sätze als Bullet Points (* Satz 1...)>
**Hier ist der Website-Text:**
{website_text[:5000]}
"""
try:
response = call_gemini_flash(prompt)
return response.strip() if response else "Keine Zusammenfassung möglich."
except Exception as e:
logger.error(f"Summary Error: {e}")
return "Fehler bei der Zusammenfassung."
def _generate_marketing_opener(self, company: Company, industry: Industry, context_text: str, focus_mode: str = "primary") -> Optional[str]:
if not industry: return None
# 1. Determine Context & Pains/Gains
product_context = industry.primary_category.name if industry.primary_category else "Robotik-Lösungen"
# 1. Determine Product Category & Context
category = industry.primary_category
raw_pains = industry.pains or ""
raw_gains = industry.gains or ""
if focus_mode == "secondary" and industry.ops_focus_secondary and industry.secondary_category:
category = industry.secondary_category
product_name = category.name if category else "Robotik-Lösungen"
product_desc = category.description if category and category.description else "Automatisierung von operativen Prozessen"
# Split pains/gains based on markers
def extract_segment(text, marker):
@@ -195,23 +237,41 @@ JSON ONLY.
for i in range(1, len(segments), 2):
if marker.lower() in segments[i].lower():
return segments[i+1].strip()
return text # Fallback to full text if no markers found
return text
relevant_pains = extract_segment(raw_pains, "Primary Product")
relevant_gains = extract_segment(raw_gains, "Primary Product")
if focus_mode == "secondary" and industry.ops_focus_secondary and industry.secondary_category:
product_context = industry.secondary_category.name
relevant_pains = extract_segment(raw_pains, "Secondary Product")
relevant_gains = extract_segment(raw_gains, "Secondary Product")
prompt = f"""
Du bist ein exzellenter B2B-Stratege und Texter. Formuliere einen hochpersonalisierten Einleitungssatz (1-2 Sätze).
Unternehmen: {company.name}
Branche: {industry.name}
Fokus: {focus_mode.upper()}
Herausforderungen: {relevant_pains}
Kontext: {website_text[:2500]}
Du bist ein scharfsinniger Marktbeobachter und Branchenexperte. Formuliere eine wertschätzende Einleitung (genau 2-3 Sätze) für ein Anschreiben an das Unternehmen {company.name}.
REGEL: Nenne NICHT das Produkt "{product_context}". Fokussiere dich NUR auf die Herausforderung.
AUSGABE: NUR den fertigen Satz.
DEINE PERSONA:
Ein anerkennender Branchenkenner, der eine scharfsinnige Beobachtung teilt. Dein Ton ist wertschätzend, professionell und absolut NICHT verkäuferisch.
STRATEGISCHER HINTERGRUND (Nicht nennen!):
Dieses Unternehmen wird kontaktiert, weil sein Geschäftsmodell perfekt zu folgendem Bereich passt: "{product_name}" ({product_desc}).
Ziel des Schreibens ist es, die Branchen-Herausforderungen "{relevant_pains}" zu adressieren und die Mehrwerte "{relevant_gains}" zu ermöglichen.
DEINE AUFGABE:
1. Firmenname kürzen: Kürze "{company.name}" sinnvoll (meist erste zwei Worte). Entferne UNBEDINGT Rechtsformen wie GmbH, AG, gGmbH, e.V. etc.
2. Struktur: Genau 2 bis 3 flüssige Sätze.
3. Inhalt:
- Satz 1: Eine wertschätzende Beobachtung zum Geschäftsmodell oder einem aktuellen Fokus des Unternehmens (siehe Analyse-Dossier).
- Satz 2-3: Leite elegant zu einer spezifischen operativen Herausforderung über, die für das Unternehmen aufgrund seiner Größe oder Branche relevant ist (orientiere dich an "{relevant_pains}").
4. STRENGES VERBOT: Nenne KEIN Produkt ("{product_name}") und biete KEINE "Lösungen", "Hilfe" oder "Zusammenarbeit" an. Der Text soll eine reine Beobachtung bleiben.
5. KEINE Anrede (kein "Sehr geehrte Damen und Herren", kein "Hallo").
KONTEXT (Analyse-Dossier):
{context_text}
BEISPIEL-STIL:
"Das Kreiskrankenhaus Weilburg leistet einen beeindruckenden Beitrag zur regionalen Patientenversorgung. Bei der lückenlosen Dokumentation und den strengen Hygienevorgaben im Klinikalltag stelle ich mir jedoch die Aufrechterhaltung höchster Standards bei gleichzeitigem Kostendruck als enorme operative Herausforderung vor."
AUSGABE: Nur der fertige Text.
"""
try:
response = call_gemini_flash(prompt)
@@ -220,8 +280,49 @@ AUSGABE: NUR den fertigen Satz.
logger.error(f"Opener Error: {e}")
return None
def _sync_company_address_data(self, db: Session, company: Company):
"""Extracts address and VAT data from website scrape if available."""
from ..database import EnrichmentData
enrichment = db.query(EnrichmentData).filter_by(
company_id=company.id, source_type="website_scrape"
).order_by(EnrichmentData.created_at.desc()).first()
if enrichment and enrichment.content and "impressum" in enrichment.content:
imp = enrichment.content["impressum"]
if imp and isinstance(imp, dict):
changed = False
# City
if imp.get("city") and not company.city:
company.city = imp.get("city")
changed = True
# Street
if imp.get("street") and not company.street:
company.street = imp.get("street")
changed = True
# Zip / PLZ
zip_val = imp.get("zip") or imp.get("plz")
if zip_val and not company.zip_code:
company.zip_code = zip_val
changed = True
# Country
if imp.get("country_code") and (not company.country or company.country == "DE"):
company.country = imp.get("country_code")
changed = True
# VAT ID
if imp.get("vat_id") and not company.crm_vat:
company.crm_vat = imp.get("vat_id")
changed = True
if changed:
db.commit()
logger.info(f"Updated Address/VAT from Impressum for {company.name}: City={company.city}, VAT={company.crm_vat}")
def classify_company_potential(self, company: Company, db: Session) -> Company:
logger.info(f"--- Starting FULL Analysis v3.0 for {company.name} ---")
# Ensure metadata is synced from scrape
self._sync_company_address_data(db, company)
industries = self._load_industry_definitions(db)
website_content, _ = self._get_website_content_and_url(db, company)
if not website_content or len(website_content) < 100:
@@ -259,8 +360,12 @@ AUSGABE: NUR den fertigen Satz.
company.metric_confidence = 0.8
company.metric_confidence_reason = "Metric processed."
company.ai_opener = self._generate_marketing_opener(company, matched_industry, website_content, "primary")
company.ai_opener_secondary = self._generate_marketing_opener(company, matched_industry, website_content, "secondary")
# NEW: Two-Step approach with summarization
website_summary = self._summarize_website_for_opener(company.name, website_content)
company.research_dossier = website_summary
company.ai_opener = self._generate_marketing_opener(company, matched_industry, website_summary, "primary")
company.ai_opener_secondary = self._generate_marketing_opener(company, matched_industry, website_summary, "secondary")
company.last_classification_at = datetime.utcnow()
company.status = "ENRICHED"
db.commit()

View File

@@ -107,13 +107,64 @@ Der Connector ist der Bote, der diese Daten in das CRM bringt.
2. Sync-Skript laufen lassen: `python3 backend/scripts/sync_notion_industries.py`.
3. Matrix neu berechnen: `python3 backend/scripts/generate_matrix.py --live`.
### Prompt-Tuning
Die Prompts für Matrix und Opener liegen in:
* Matrix: `backend/scripts/generate_matrix.py`
* Opener: `backend/services/classification.py` (oder `enrichment.py`)
### End-to-End Tests
Ein automatisierter Integrationstest (`tests/test_e2e_flow.py`) deckt den gesamten Zyklus ab:
1. **Company Creation:** Webhook -> CE Provisioning -> Write-back (Vertical).
2. **Person Creation:** Webhook -> CE Matrix Lookup -> Write-back (Texte).
3. **Vertical Change:** Änderung im CRM -> CE Update -> Cascade zu Personen -> Neue Texte.
Ausführen mittels:
```bash
python3 connector-superoffice/tests/test_e2e_flow.py
```
## 7. Troubleshooting & Known Issues
### Authentication "URL has an invalid label"
Tritt auf, wenn `SO_ENVIRONMENT` leer ist. Der Client fällt nun automatisch auf `sod` zurück.
### Pydantic V2 Compatibility
Die `config.py` wurde auf natives Python (`os.getenv`) umgestellt, um Konflikte mit `pydantic-settings` in Docker-Containern zu vermeiden.
### Address & VAT Sync (WIP)
Der Worker wurde erweitert, um auch `City` und `OrgNumber` (VAT) zurückzuschreiben.
**Status (21.02.2026):** Implementiert, aber noch im Feinschliff. Logs zeigen teils Re-Queueing während das Enrichment läuft.
### 8. Lessons Learned: Address & VAT Sync (Solved Feb 22, 2026)
Die Synchronisation von Adressdaten stellte sich als unerwartet komplex heraus. Hier die wichtigsten Erkenntnisse für zukünftige Entwickler:
1. **Das "OrgNumber"-Phantom:**
* **Problem:** In der API-Dokumentation oft als `OrgNumber` referenziert, akzeptiert die WebAPI (REST) strikt nur **`OrgNr`**.
* **Lösung:** Mapping im `worker.py` hart auf `OrgNr` umgestellt.
2. **Verschachtelte Adress-Struktur:**
* **Problem:** Ein flaches Update auf `PostalAddress` wird von der API stillschweigend ignoriert (HTTP 200, aber keine Änderung).
* **Lösung:** Das Update muss die tiefe Struktur respektieren: `Address["Postal"]["City"]` UND `Address["Street"]["City"]`. Beide müssen explizit gesetzt werden, um in der UI sichtbar zu sein.
3. **Die "Race Condition" Falle (Atomic Updates):**
* **Problem:** Wenn Adress-Daten (`PUT Contact`) und UDF-Daten (`PUT Contact/Udef`) in separaten API-Aufrufen kurz hintereinander gesendet werden, gewinnt der letzte Call. Da dieser oft auf einem *veralteten* `GET`-Stand basiert (bevor das erste Update durch war), wurde die Adresse wieder mit "Leer" überschrieben.
* **Lösung:** **Atomic Update Strategy**. Der Worker sammelt *alle* Änderungen (Adresse, VAT, Vertical, Openers) in einem einzigen Dictionary und sendet genau **einen** `PUT`-Request an den Kontakt-Endpunkt. Dies garantiert Konsistenz.
---
### 9. Lessons Learned: Appointment Simulation & Persona Matching
Die Simulation von E-Mails via Terminen (Appointments) erforderte Workarounds für das UI-Verhalten von SuperOffice.
1. **Header vs. Description (Die 42-Zeichen-Grenze):**
* SuperOffice nutzt im Kalender und in Listen den `MainHeader` als Titel. Dieser ist auf ca. 42 Zeichen begrenzt.
* Ist der Titel länger, schneidet SuperOffice ihn ab. Erscheint der Titel inkonsistent, "stiehlt" das UI oft die erste Zeile der `Description` als Titel-Ersatz.
* **Strategie:** Wir kürzen den `MainHeader` auf 40 Zeichen und stellen sicher, dass der **vollständige Betreff** als allererste Zeile in der `Description` steht. Danach folgen zwei Newlines. Damit landet der Betreff im UI-Header und die Anrede ("Hallo...") bleibt sicher im Textkörper.
2. **Mapping-Resilienz (Funktion vs. Titel):**
* Jobtitel (Funktion) landen in SuperOffice inkonsistent in den Feldern `JobTitle` oder `Title`.
* **Lösung:** Der Worker fragt nun beide Felder ab (`person.get("JobTitle") or person.get("Title")`), um die Rolle korrekt zuzuweisen.
3. **Rollen-Dynamik:**
* Um zu verhindern, dass alte Rollen (z.B. "Infrastruktur") nach einer Beförderung/Änderung in SuperOffice "kleben" bleiben, führt das System nun bei jeder Namens- oder Funktionsänderung einen **Rollen-Reset** durch.
## Appendix: The "First Sentence" Prompt
This is the core logic used to generate the company-specific opener.
**Goal:** Prove understanding of the business model + imply the pain (positive observation).

View File

@@ -1,44 +1,39 @@
import os
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
# --- Infrastructure ---
# Internal Docker URL for Company Explorer
COMPANY_EXPLORER_URL: str = "http://company-explorer:8000"
# --- SuperOffice API Credentials ---
SO_ENVIRONMENT: str = "sod" # 'sod' or 'online'
SO_CLIENT_ID: str = ""
SO_CLIENT_SECRET: str = ""
SO_REFRESH_TOKEN: str = ""
SO_REDIRECT_URI: str = "http://localhost"
SO_CONTEXT_IDENTIFIER: str = "Cust55774" # e.g. Cust12345
# --- Feature Flags ---
ENABLE_WEBSITE_SYNC: bool = False # Disabled by default to prevent loops
# --- Mappings (IDs from SuperOffice) ---
# Vertical IDs (List Items)
# Default values match the current hardcoded DEV IDs
# Format: "Name In Explorer": ID_In_SuperOffice
VERTICAL_MAP_JSON: str = '{"Logistics - Warehouse": 23, "Healthcare - Hospital": 24, "Infrastructure - Transport": 25, "Leisure - Indoor Active": 26}'
class Settings:
def __init__(self):
# --- Infrastructure ---
# Internal Docker URL for Company Explorer
self.COMPANY_EXPLORER_URL = os.getenv("COMPANY_EXPLORER_URL", "http://company-explorer:8000")
# --- SuperOffice API Credentials ---
# Fallback for empty string in env var
env_val = os.getenv("SO_ENVIRONMENT")
self.SO_ENVIRONMENT = env_val if env_val else "sod"
self.SO_CLIENT_ID = os.getenv("SO_CLIENT_ID", "")
self.SO_CLIENT_SECRET = os.getenv("SO_CLIENT_SECRET", "")
self.SO_REFRESH_TOKEN = os.getenv("SO_REFRESH_TOKEN", "")
self.SO_REDIRECT_URI = os.getenv("SO_REDIRECT_URI", "http://localhost")
self.SO_CONTEXT_IDENTIFIER = os.getenv("SO_CONTEXT_IDENTIFIER", "Cust55774") # e.g. Cust12345
# --- Feature Flags ---
self.ENABLE_WEBSITE_SYNC = os.getenv("ENABLE_WEBSITE_SYNC", "False").lower() in ("true", "1", "t")
# --- Mappings (IDs from SuperOffice) ---
# Vertical IDs (List Items)
self.VERTICAL_MAP_JSON = os.getenv("VERTICAL_MAP_JSON", '{"Logistics - Warehouse": 23, "Healthcare - Hospital": 24, "Infrastructure - Transport": 25, "Leisure - Indoor Active": 26}')
# Persona / Job Role IDs (List Items for "Position" field)
# To be filled after discovery
PERSONA_MAP_JSON: str = '{}'
# Persona / Job Role IDs (List Items for "Position" field)
self.PERSONA_MAP_JSON = os.getenv("PERSONA_MAP_JSON", '{}')
# User Defined Fields (ProgIDs)
# The technical names of the fields in SuperOffice
# Default values match the current hardcoded DEV UDFs
UDF_SUBJECT: str = "SuperOffice:5"
UDF_INTRO: str = "SuperOffice:6"
UDF_SOCIAL_PROOF: str = "SuperOffice:7"
UDF_VERTICAL: str = "SuperOffice:5" # NOTE: Currently same as Subject in dev? Need to verify. worker.py had 'SuperOffice:5' for vertical AND 'SuperOffice:5' for subject in the map?
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
extra = "ignore" # Ignore extra fields in .env
# User Defined Fields (ProgIDs)
self.UDF_SUBJECT = os.getenv("UDF_SUBJECT", "SuperOffice:5")
self.UDF_INTRO = os.getenv("UDF_INTRO", "SuperOffice:6")
self.UDF_SOCIAL_PROOF = os.getenv("UDF_SOCIAL_PROOF", "SuperOffice:7")
self.UDF_VERTICAL = os.getenv("UDF_VERTICAL", "SuperOffice:5")
self.UDF_OPENER = os.getenv("UDF_OPENER", "SuperOffice:6")
self.UDF_OPENER_SECONDARY = os.getenv("UDF_OPENER_SECONDARY", "SuperOffice:7")
# Global instance
settings = Settings()
settings = Settings()

View File

@@ -0,0 +1,18 @@
import os
import json
import sys
# Setup Paths
connector_dir = "/app/connector-superoffice"
sys.path.append(connector_dir)
from superoffice_client import SuperOfficeClient
def debug_person(person_id):
so_client = SuperOfficeClient()
person = so_client.get_person(person_id)
print("--- FULL PERSON DATA ---")
print(json.dumps(person, indent=2))
if __name__ == "__main__":
debug_person(4)

View File

@@ -104,3 +104,24 @@ class JobQueue:
cursor = conn.cursor()
cursor.execute("SELECT status, COUNT(*) FROM jobs GROUP BY status")
return dict(cursor.fetchall())
def get_recent_jobs(self, limit=50):
with sqlite3.connect(DB_PATH) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("""
SELECT id, event_type, status, created_at, updated_at, error_msg, payload
FROM jobs
ORDER BY updated_at DESC, created_at DESC
LIMIT ?
""", (limit,))
rows = cursor.fetchall()
results = []
for row in rows:
r = dict(row)
try:
r['payload'] = json.loads(r['payload'])
except:
pass
results.append(r)
return results

View File

@@ -19,6 +19,8 @@ class SuperOfficeClient:
self.env = settings.SO_ENVIRONMENT
self.cust_id = settings.SO_CONTEXT_IDENTIFIER
logger.info(f"DEBUG CONFIG: Env='{self.env}', CustID='{self.cust_id}', ClientID='{self.client_id[:4]}...'")
if not all([self.client_id, self.client_secret, self.refresh_token]):
# Graceful failure: Log error but allow init (for help/docs/discovery scripts)
logger.error("❌ SuperOffice credentials missing in .env file (or environment variables).")
@@ -57,7 +59,8 @@ class SuperOfficeClient:
resp.raise_for_status()
return resp.json().get("access_token")
except requests.exceptions.HTTPError as e:
logger.error(f"❌ Token Refresh Error: {e.response.text}")
logger.error(f"❌ Token Refresh Error (Status: {e.response.status_code}): {e.response.text}")
logger.debug(f"Response Headers: {e.response.headers}")
return None
except Exception as e:
logger.error(f"❌ Connection Error during token refresh: {e}")
@@ -71,7 +74,8 @@ class SuperOfficeClient:
resp.raise_for_status()
return resp.json()
except requests.exceptions.HTTPError as e:
logger.error(f"❌ API GET Error for {endpoint}: {e.response.text}")
logger.error(f"❌ API GET Error for {endpoint} (Status: {e.response.status_code}): {e.response.text}")
logger.debug(f"Response Headers: {e.response.headers}")
return None
def _put(self, endpoint, payload):
@@ -153,17 +157,26 @@ class SuperOfficeClient:
import datetime
now = datetime.datetime.utcnow().isoformat() + "Z"
# SuperOffice UI limit: 42 chars.
# We put exactly this in the FIRST line of the description.
short_title = (subject[:39] + '...') if len(subject) > 42 else subject
# SuperOffice often 'steals' the first line of the description for the list view header.
# So we give it exactly the subject it wants, then two newlines for the real body.
formatted_description = f"{short_title}\n\n{description}"
payload = {
"Description": f"{subject}\n\n{description}",
"Description": formatted_description,
"Contact": {"ContactId": contact_id},
"StartDate": now,
"EndDate": now,
"Task": {"Id": 1} # Usually 'Follow-up' or similar, depending on SO config
"MainHeader": short_title,
"Task": {"Id": 1}
}
if person_id:
payload["Person"] = {"PersonId": person_id}
print(f"Creating new appointment: {subject}...")
logger.info(f"Creating new appointment: {short_title}...")
return self._post("Appointment", payload)
def update_entity_udfs(self, entity_id: int, entity_type: str, udf_data: dict):

View File

@@ -0,0 +1,308 @@
import unittest
import os
import sys
import json
import logging
from unittest.mock import MagicMock, patch
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# Setup Paths
current_dir = os.path.dirname(os.path.abspath(__file__))
ce_backend_dir = os.path.abspath(os.path.join(current_dir, "../../company-explorer"))
connector_dir = os.path.abspath(os.path.join(current_dir, ".."))
sys.path.append(ce_backend_dir)
sys.path.append(connector_dir)
# Import CE App & DB
# Note: backend.app needs to be importable. If backend is a package.
try:
from backend.app import app, get_db
from backend.database import Base, Industry, Persona, MarketingMatrix, JobRoleMapping, Company, Contact, init_db
except ImportError:
# Try alternate import if running from root
sys.path.append(os.path.abspath("company-explorer"))
from backend.app import app, get_db
from backend.database import Base, Industry, Persona, MarketingMatrix, JobRoleMapping, Company, Contact, init_db
# Import Worker Logic
from worker import process_job
# Setup Test DB
TEST_DB_FILE = "/tmp/test_company_explorer.db"
if os.path.exists(TEST_DB_FILE):
os.remove(TEST_DB_FILE)
SQLALCHEMY_DATABASE_URL = f"sqlite:///{TEST_DB_FILE}"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Override get_db dependency
def override_get_db():
try:
db = TestingSessionLocal()
yield db
finally:
db.close()
app.dependency_overrides[get_db] = override_get_db
# Mock SuperOffice Client
class MockSuperOfficeClient:
def __init__(self):
self.access_token = "mock_token"
self.contacts = {} # id -> data
self.persons = {} # id -> data
def get_contact(self, contact_id):
return self.contacts.get(int(contact_id))
def get_person(self, person_id):
return self.persons.get(int(person_id))
def update_entity_udfs(self, entity_id, entity_type, udfs):
target = self.contacts if entity_type == "Contact" else self.persons
if int(entity_id) in target:
if "UserDefinedFields" not in target[int(entity_id)]:
target[int(entity_id)]["UserDefinedFields"] = {}
target[int(entity_id)]["UserDefinedFields"].update(udfs)
return True
return False
def update_person_position(self, person_id, position_id):
if int(person_id) in self.persons:
self.persons[int(person_id)]["PositionId"] = position_id
return True
return False
def create_appointment(self, subject, description, contact_id, person_id=None):
if not hasattr(self, 'appointments'):
self.appointments = []
self.appointments.append({
"Subject": subject,
"Description": description,
"ContactId": contact_id,
"PersonId": person_id
})
return True
def search(self, query):
if "contact/contactId eq" in query:
contact_id = int(query.split("eq")[1].strip())
results = []
for pid, p in self.persons.items():
if p.get("ContactId") == contact_id:
results.append({"PersonId": pid, "FirstName": p.get("FirstName")})
return results
return []
def _put(self, endpoint, data):
if endpoint.startswith("Contact/"):
cid = int(endpoint.split("/")[1])
if cid in self.contacts:
self.contacts[cid] = data
return True
return False
class TestE2EFlow(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Set Auth Env Vars
os.environ["API_USER"] = "admin"
os.environ["API_PASSWORD"] = "gemini"
# Create Tables
Base.metadata.create_all(bind=engine)
db = TestingSessionLocal()
# SEED DATA
# Industry 1
ind1 = Industry(name="Logistics - Warehouse", status_notion="Active")
db.add(ind1)
# Industry 2 (For Change Test)
ind2 = Industry(name="Healthcare - Hospital", status_notion="Active")
db.add(ind2)
db.commit()
pers = Persona(name="Operativer Entscheider")
db.add(pers)
db.commit()
# Matrix 1
matrix1 = MarketingMatrix(
industry_id=ind1.id,
persona_id=pers.id,
subject="TEST SUBJECT LOGISTICS",
intro="TEST BRIDGE LOGISTICS",
social_proof="TEST PROOF LOGISTICS"
)
db.add(matrix1)
# Matrix 2
matrix2 = MarketingMatrix(
industry_id=ind2.id,
persona_id=pers.id,
subject="TEST SUBJECT HEALTH",
intro="TEST BRIDGE HEALTH",
social_proof="TEST PROOF HEALTH"
)
db.add(matrix2)
mapping = JobRoleMapping(pattern="%Head of Operations%", role="Operativer Entscheider")
db.add(mapping)
db.commit()
db.close()
cls.ce_client = TestClient(app)
def setUp(self):
self.mock_so_client = MockSuperOfficeClient()
self.mock_so_client.contacts[100] = {
"ContactId": 100,
"Name": "Test Company GmbH",
"UrlAddress": "old-site.com",
"UserDefinedFields": {}
}
self.mock_so_client.persons[500] = {
"PersonId": 500,
"ContactId": 100,
"FirstName": "Hans",
"JobTitle": "Head of Operations",
"UserDefinedFields": {}
}
def mock_post_side_effect(self, url, json=None, auth=None):
if "/api/" in url:
path = "/api/" + url.split("/api/")[1]
else:
path = url
response = self.ce_client.post(path, json=json, auth=auth)
class MockReqResponse:
def __init__(self, resp):
self.status_code = resp.status_code
self._json = resp.json()
def json(self): return self._json
def raise_for_status(self):
if self.status_code >= 400: raise Exception(f"HTTP {self.status_code}: {self._json}")
return MockReqResponse(response)
@patch("worker.JobQueue")
@patch("worker.requests.post")
@patch("worker.settings")
def test_full_roundtrip_with_vertical_change(self, mock_settings, mock_post, MockJobQueue):
mock_post.side_effect = self.mock_post_side_effect
# Mock JobQueue instance
mock_queue_instance = MockJobQueue.return_value
# Config Mocks
mock_settings.COMPANY_EXPLORER_URL = "http://localhost:8000"
mock_settings.UDF_VERTICAL = "SuperOffice:Vertical"
mock_settings.UDF_SUBJECT = "SuperOffice:Subject"
mock_settings.UDF_INTRO = "SuperOffice:Intro"
mock_settings.UDF_SOCIAL_PROOF = "SuperOffice:SocialProof"
mock_settings.UDF_OPENER = "SuperOffice:Opener"
mock_settings.UDF_OPENER_SECONDARY = "SuperOffice:OpenerSecondary"
mock_settings.VERTICAL_MAP_JSON = '{"Logistics - Warehouse": 23, "Healthcare - Hospital": 24}'
mock_settings.PERSONA_MAP_JSON = '{"Operativer Entscheider": 99}'
mock_settings.ENABLE_WEBSITE_SYNC = True
# --- Step 1: Company Created (Logistics) ---
print("[TEST] Step 1: Create Company...")
job = {"id": "job1", "event_type": "contact.created", "payload": {"Event": "contact.created", "PrimaryKey": 100, "Changes": ["Name"]}}
process_job(job, self.mock_so_client) # RETRY
# Simulate Enrichment (Logistics)
db = TestingSessionLocal()
company = db.query(Company).filter(Company.crm_id == "100").first()
company.status = "ENRICHED"
company.industry_ai = "Logistics - Warehouse"
company.city = "Koeln"
company.crm_vat = "DE813016729"
company.ai_opener = "Positive observation about Silly Billy"
company.ai_opener_secondary = "Secondary observation"
db.commit()
db.close()
process_job(job, self.mock_so_client) # SUCCESS
# Verify Contact Updates (Standard Fields & UDFs)
contact = self.mock_so_client.contacts[100]
self.assertEqual(contact["UserDefinedFields"]["SuperOffice:Vertical"], "23")
self.assertEqual(contact["UserDefinedFields"]["SuperOffice:Opener"], "Positive observation about Silly Billy")
self.assertEqual(contact["UserDefinedFields"]["SuperOffice:OpenerSecondary"], "Secondary observation")
self.assertEqual(contact.get("PostalAddress", {}).get("City"), "Koeln")
self.assertEqual(contact.get("OrgNumber"), "DE813016729")
# --- Step 2: Person Created (Get Logistics Texts) ---
print("[TEST] Step 2: Create Person...")
job_p = {"id": "job2", "event_type": "person.created", "payload": {"Event": "person.created", "PersonId": 500, "ContactId": 100, "JobTitle": "Head of Operations"}}
process_job(job_p, self.mock_so_client)
udfs = self.mock_so_client.persons[500]["UserDefinedFields"]
self.assertEqual(udfs["SuperOffice:Subject"], "TEST SUBJECT LOGISTICS")
# Verify Appointment (Simulation)
self.assertTrue(len(self.mock_so_client.appointments) > 0)
appt = self.mock_so_client.appointments[0]
self.assertIn("✉️ Entwurf: TEST SUBJECT LOGISTICS", appt["Subject"])
self.assertIn("TEST BRIDGE LOGISTICS", appt["Description"])
print(f"[TEST] Appointment created: {appt['Subject']}")
# --- Step 3: Vertical Change in SO (To Healthcare) ---
print("[TEST] Step 3: Change Vertical in SO...")
# Update Mock SO Data
self.mock_so_client.contacts[100]["UserDefinedFields"]["SuperOffice:Vertical"] = "24" # Healthcare
# Simulate Webhook
job_change = {
"id": "job3",
"event_type": "contact.changed",
"payload": {
"Event": "contact.changed",
"PrimaryKey": 100,
"Changes": ["UserDefinedFields"] # Or specific UDF key if passed
}
}
process_job(job_change, self.mock_so_client)
# Verify CE Database Updated
db = TestingSessionLocal()
company = db.query(Company).filter(Company.crm_id == "100").first()
print(f"[TEST] Updated Company Industry in DB: {company.industry_ai}")
self.assertEqual(company.industry_ai, "Healthcare - Hospital")
db.close()
# Verify Cascade Triggered
# Expect JobQueue.add_job called for Person 500
# args: "person.changed", payload
mock_queue_instance.add_job.assert_called()
call_args = mock_queue_instance.add_job.call_args
print(f"[TEST] Cascade Job Added: {call_args}")
self.assertEqual(call_args[0][0], "person.changed")
self.assertEqual(call_args[0][1]["PersonId"], 500)
# --- Step 4: Process Cascade Job (Get Healthcare Texts) ---
print("[TEST] Step 4: Process Cascade Job...")
job_cascade = {"id": "job4", "event_type": "person.changed", "payload": call_args[0][1]}
process_job(job_cascade, self.mock_so_client)
udfs_new = self.mock_so_client.persons[500]["UserDefinedFields"]
print(f"[TEST] New UDFs: {udfs_new}")
self.assertEqual(udfs_new["SuperOffice:Subject"], "TEST SUBJECT HEALTH")
self.assertEqual(udfs_new["SuperOffice:Intro"], "TEST BRIDGE HEALTH")
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,56 @@
from superoffice_client import SuperOfficeClient
import json
import logging
# Setup minimal logging
logging.basicConfig(level=logging.ERROR)
def verify_contact(contact_id):
print(f"🔍 Verifying REAL SuperOffice Data for Contact {contact_id}...")
client = SuperOfficeClient()
if not client.access_token:
print("❌ Auth failed.")
return
contact = client.get_contact(contact_id)
if not contact:
print("❌ Contact not found.")
return
# 1. Standard Fields
print("\n--- Standard Fields ---")
print(f"Name: {contact.get('Name')}")
print(f"OrgNr: {contact.get('OrgNr')}") # Changed from OrgNumber
addr = contact.get("Address", {}) # Changed from PostalAddress
print(f"Raw Address JSON: {json.dumps(addr, indent=2)}")
if addr:
postal = addr.get("Postal", {})
street = addr.get("Street", {})
print(f"Postal City: {postal.get('City')}")
print(f"Street City: {street.get('City')}")
else:
print("Address: (Empty)")
print("Address: (Empty)")
# 2. UDFs
print("\n--- User Defined Fields (UDFs) ---")
udfs = contact.get("UserDefinedFields", {})
if not udfs:
print("(No UDFs found)")
else:
for k, v in udfs.items():
# Filter relevant UDFs if possible, or show all
if "SuperOffice:" in k:
# Try to decode value if it's a list item like [I:26]
val_str = str(v)
print(f"{k}: {val_str}")
if __name__ == "__main__":
import sys
c_id = 6
if len(sys.argv) > 1:
c_id = int(sys.argv[1])
verify_contact(c_id)

View File

@@ -1,4 +1,5 @@
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
from fastapi.responses import HTMLResponse
import logging
import os
import json
@@ -51,6 +52,104 @@ def health():
def stats():
return queue.get_stats()
@app.get("/api/jobs")
def get_jobs():
return queue.get_recent_jobs(limit=100)
@app.get("/dashboard", response_class=HTMLResponse)
def dashboard():
html_content = """
<!DOCTYPE html>
<html>
<head>
<title>Connector Dashboard</title>
<meta http-equiv="refresh" content="5">
<style>
body { font-family: sans-serif; padding: 20px; background: #f0f2f5; }
.container { max-width: 1200px; margin: 0 auto; background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
h1 { color: #333; }
table { width: 100%; border-collapse: collapse; margin-top: 20px; }
th, td { text-align: left; padding: 12px; border-bottom: 1px solid #ddd; font-size: 14px; }
th { background-color: #f8f9fa; color: #666; font-weight: 600; }
tr:hover { background-color: #f8f9fa; }
.status { padding: 4px 8px; border-radius: 4px; font-size: 12px; font-weight: bold; text-transform: uppercase; }
.status-PENDING { background: #e2e8f0; color: #475569; }
.status-PROCESSING { background: #dbeafe; color: #1e40af; }
.status-COMPLETED { background: #dcfce7; color: #166534; }
.status-FAILED { background: #fee2e2; color: #991b1b; }
.status-RETRY { background: #fef9c3; color: #854d0e; }
.meta { color: #888; font-size: 12px; }
pre { margin: 0; white-space: pre-wrap; word-break: break-word; color: #444; font-family: monospace; font-size: 11px; max-height: 60px; overflow-y: auto; }
</style>
</head>
<body>
<div class="container">
<div style="display: flex; justify-content: space-between; align-items: center;">
<h1>🔌 SuperOffice Connector Dashboard</h1>
<div id="stats"></div>
</div>
<table>
<thead>
<tr>
<th width="50">ID</th>
<th width="120">Status</th>
<th width="150">Updated</th>
<th width="150">Event</th>
<th>Payload / Error</th>
</tr>
</thead>
<tbody id="job-table">
<tr><td colspan="5" style="text-align:center;">Loading...</td></tr>
</tbody>
</table>
</div>
<script>
async function loadData() {
try {
// Use relative path to work behind Nginx /connector/ prefix
const response = await fetch('api/jobs');
const jobs = await response.json();
const tbody = document.getElementById('job-table');
tbody.innerHTML = '';
if (jobs.length === 0) {
tbody.innerHTML = '<tr><td colspan="5" style="text-align:center;">No jobs found</td></tr>';
return;
}
jobs.forEach(job => {
const tr = document.createElement('tr');
let details = JSON.stringify(job.payload, null, 2);
if (job.error_msg) {
details += "\\n\\n🔴 ERROR: " + job.error_msg;
}
tr.innerHTML = `
<td>#${job.id}</td>
<td><span class="status status-${job.status}">${job.status}</span></td>
<td>${new Date(job.updated_at + "Z").toLocaleTimeString()}</td>
<td>${job.event_type}</td>
<td><pre>${details}</pre></td>
`;
tbody.appendChild(tr);
});
} catch (e) {
console.error("Failed to load jobs", e);
}
}
loadData();
// Also handled by meta refresh, but JS refresh is smoother if we want to remove meta refresh
</script>
</body>
</html>
"""
return HTMLResponse(content=html_content, status_code=200)
if __name__ == "__main__":
import uvicorn
uvicorn.run("webhook_app:app", host="0.0.0.0", port=8000, reload=True)

View File

@@ -25,40 +25,10 @@ def process_job(job, so_client: SuperOfficeClient):
payload = job['payload']
event_low = job['event_type'].lower()
# 0. Fast-Fail on Irrelevant Events (Noise Reduction)
if any(x in event_low for x in ["sale.", "project.", "appointment.", "document.", "selection."]):
logger.info(f"Skipping irrelevant event type: {job['event_type']}")
return "SUCCESS"
# 0b. Fast-Fail on Irrelevant Field Changes
# Only if 'Changes' list is provided by Webhook
changes = [c.lower() for c in payload.get("Changes", [])]
if changes:
# Define what we care about (Strategic triggers for re-evaluation)
# Company: Name/Department (Identity), Urls (Source), Numbers (Matching)
relevant_contact = ["name", "department", "urladdress", "number1", "number2"]
# Person: JobTitle (Persona Logic), Position (Role Logic)
relevant_person = ["jobtitle", "position"]
is_relevant = False
if "contact" in event_low:
if any(f in changes for f in relevant_contact):
is_relevant = True
elif "urls" in changes: # Website might be in Urls collection
is_relevant = True
if "person" in event_low:
if any(f in changes for f in relevant_person):
is_relevant = True
if not is_relevant:
logger.info(f"Skipping irrelevant changes: {changes}")
return "SUCCESS"
# 1. Extract IDs from Webhook Payload
# 1. Extract IDs Early (Crucial for logging and logic)
person_id = None
contact_id = None
job_title = payload.get("JobTitle")
if "PersonId" in payload:
person_id = int(payload["PersonId"])
@@ -70,19 +40,59 @@ def process_job(job, so_client: SuperOfficeClient):
elif "PrimaryKey" in payload and "contact" in event_low:
contact_id = int(payload["PrimaryKey"])
# Fallback/Deep Lookup
if not contact_id and person_id:
person_data = so_client.get_person(person_id)
if person_data and "Contact" in person_data:
contact_id = person_data["Contact"].get("ContactId")
# Fallback/Deep Lookup & Fetch JobTitle if missing
if person_id:
try:
person_details = so_client.get_person(person_id)
if person_details:
if not job_title:
job_title = person_details.get("JobTitle") or person_details.get("Title")
if not contact_id and "Contact" in person_details:
contact_id = person_details["Contact"].get("ContactId")
except Exception as e:
logger.warning(f"Failed to fetch person details for {person_id}: {e}")
# 2. Noise Reduction Logic
if any(x in event_low for x in ["sale.", "project.", "appointment.", "document.", "selection."]):
logger.info(f"Skipping irrelevant event type: {job['event_type']}")
return "SUCCESS"
changes = [c.lower() for c in payload.get("Changes", [])]
if changes:
relevant_contact = ["name", "department", "urladdress", "number1", "number2", "userdefinedfields"]
if settings.UDF_VERTICAL:
relevant_contact.append(settings.UDF_VERTICAL.lower())
relevant_person = ["jobtitle", "position", "title", "userdefinedfields", "person_id"]
technical_fields = ["updated", "updated_associate_id", "contact_id", "person_id", "registered", "registered_associate_id"]
actual_changes = [c for c in changes if c not in technical_fields]
is_relevant = False
if "contact" in event_low:
logger.info(f"Checking relevance for Contact {contact_id or 'Unknown'}. Changes: {actual_changes}")
if any(f in actual_changes for f in relevant_contact):
is_relevant = True
elif "urls" in actual_changes:
is_relevant = True
if "person" in event_low:
logger.info(f"Checking relevance for Person {person_id or 'Unknown'}. Changes: {actual_changes}")
if any(f in actual_changes for f in relevant_person):
is_relevant = True
if not is_relevant:
logger.info(f"Skipping technical/irrelevant changes: {changes}")
return "SUCCESS"
else:
logger.info("Change is deemed RELEVANT. Proceeding...")
if not contact_id:
raise ValueError(f"Could not identify ContactId in payload: {payload}")
logger.info(f"Target: Person {person_id}, Contact {contact_id}")
logger.info(f"Target Identified -> Person: {person_id}, Contact: {contact_id}, JobTitle: {job_title}")
# --- Cascading Logic ---
# If a company changes, we want to update all its persons eventually.
if "contact" in event_low and not person_id:
logger.info(f"Company event detected. Triggering cascade for all persons of Contact {contact_id}.")
try:
@@ -100,6 +110,9 @@ def process_job(job, so_client: SuperOfficeClient):
# 1b. Fetch full contact details for 'Double Truth' check (Master Data Sync)
crm_name = None
crm_website = None
crm_industry_name = None
contact_details = None
try:
contact_details = so_client.get_contact(contact_id)
if contact_details:
@@ -107,20 +120,39 @@ def process_job(job, so_client: SuperOfficeClient):
crm_website = contact_details.get("UrlAddress")
if not crm_website and "Urls" in contact_details and contact_details["Urls"]:
crm_website = contact_details["Urls"][0].get("Value")
if settings.UDF_VERTICAL:
udfs = contact_details.get("UserDefinedFields", {})
so_vertical_val = udfs.get(settings.UDF_VERTICAL)
if so_vertical_val:
val_str = str(so_vertical_val)
if val_str.startswith("[I:"):
val_str = val_str.split(":")[1].strip("]")
try:
vertical_map = json.loads(settings.VERTICAL_MAP_JSON)
vertical_map_rev = {str(v): k for k, v in vertical_map.items()}
if val_str in vertical_map_rev:
crm_industry_name = vertical_map_rev[val_str]
logger.info(f"Detected CRM Vertical Override: {so_vertical_val} -> {crm_industry_name}")
except Exception as ex:
logger.error(f"Error mapping vertical ID {val_str}: {ex}")
except Exception as e:
logger.warning(f"Failed to fetch contact details for {contact_id}: {e}")
# 2. Call Company Explorer Provisioning API
# --- 3. Company Explorer Provisioning ---
ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
ce_req = {
"so_contact_id": contact_id,
"so_person_id": person_id,
"job_title": payload.get("JobTitle"),
"job_title": job_title,
"crm_name": crm_name,
"crm_website": crm_website
"crm_website": crm_website,
"crm_industry_name": crm_industry_name
}
# Simple Basic Auth for internal API
ce_auth = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini"))
try:
@@ -141,112 +173,123 @@ def process_job(job, so_client: SuperOfficeClient):
logger.info(f"CE Response for Contact {contact_id}: {json.dumps(provisioning_data)}")
# 2b. Sync Vertical to SuperOffice (Company Level)
vertical_name = provisioning_data.get("vertical_name")
# Fetch fresh Contact Data for comparison
try:
contact_data = so_client.get_contact(contact_id)
if not contact_data:
logger.error(f"Contact {contact_id} not found in SuperOffice.")
return "FAILED"
except Exception as e:
logger.error(f"Failed to fetch contact {contact_id}: {e}")
return "RETRY"
dirty_standard = False
dirty_udfs = False
if "UserDefinedFields" not in contact_data: contact_data["UserDefinedFields"] = {}
# --- A. Vertical Sync ---
vertical_name = provisioning_data.get("vertical_name")
if vertical_name:
try:
vertical_map = json.loads(settings.VERTICAL_MAP_JSON)
except:
vertical_map = {}
logger.error("Failed to parse VERTICAL_MAP_JSON from config.")
vertical_id = vertical_map.get(vertical_name)
if vertical_id:
logger.info(f"Identified Vertical '{vertical_name}' -> ID {vertical_id}")
try:
# Check current value to avoid loops
current_contact = so_client.get_contact(contact_id)
current_udfs = current_contact.get("UserDefinedFields", {})
# Use Config UDF key
vertical_id = vertical_map.get(vertical_name)
if vertical_id:
udf_key = settings.UDF_VERTICAL
current_val = current_udfs.get(udf_key, "")
# Normalize SO list ID format (e.g., "[I:26]" -> "26")
if current_val and current_val.startswith("[I:"):
current_val = current_val.split(":")[1].strip("]")
current_val = contact_data["UserDefinedFields"].get(udf_key, "")
if current_val and str(current_val).startswith("[I:"):
current_val = str(current_val).split(":")[1].strip("]")
if str(current_val) != str(vertical_id):
logger.info(f"Updating Contact {contact_id} Vertical: {current_val} -> {vertical_id}")
so_client.update_entity_udfs(contact_id, "Contact", {udf_key: str(vertical_id)})
else:
logger.info(f"Vertical for Contact {contact_id} already in sync ({vertical_id}).")
except Exception as e:
logger.error(f"Failed to sync vertical for Contact {contact_id}: {e}")
else:
logger.warning(f"Vertical '{vertical_name}' not found in internal mapping.")
logger.info(f"Change detected: Vertical {current_val} -> {vertical_id}")
contact_data["UserDefinedFields"][udf_key] = str(vertical_id)
dirty_udfs = True
except Exception as e:
logger.error(f"Vertical sync error: {e}")
# 2c. Sync Website (Company Level)
# TEMPORARILY DISABLED TO PREVENT LOOP (SO API Read-after-Write latency or field mapping issue)
# Re-enable via config if needed
if settings.ENABLE_WEBSITE_SYNC:
website = provisioning_data.get("website")
if website and website != "k.A.":
try:
contact_data = so_client.get_contact(contact_id)
current_url = contact_data.get("UrlAddress", "")
def norm(u): return str(u).lower().replace("https://", "").replace("http://", "").strip("/") if u else ""
if norm(current_url) != norm(website):
logger.info(f"Updating Website for Contact {contact_id}: {current_url} -> {website}")
# Update Urls collection (Rank 1)
new_urls = []
if "Urls" in contact_data:
found = False
for u in contact_data["Urls"]:
if u.get("Rank") == 1:
u["Value"] = website
found = True
new_urls.append(u)
if not found:
new_urls.append({"Value": website, "Rank": 1, "Description": "Website"})
contact_data["Urls"] = new_urls
else:
contact_data["Urls"] = [{"Value": website, "Rank": 1, "Description": "Website"}]
if not current_url:
contact_data["UrlAddress"] = website
# --- B. Address & VAT Sync ---
ce_city = provisioning_data.get("address_city")
ce_street = provisioning_data.get("address_street")
ce_zip = provisioning_data.get("address_zip")
ce_vat = provisioning_data.get("vat_id")
if ce_city or ce_street or ce_zip:
if "Address" not in contact_data or contact_data["Address"] is None:
contact_data["Address"] = {"Street": {}, "Postal": {}}
addr_obj = contact_data["Address"]
if "Postal" not in addr_obj or addr_obj["Postal"] is None: addr_obj["Postal"] = {}
if "Street" not in addr_obj or addr_obj["Street"] is None: addr_obj["Street"] = {}
so_client._put(f"Contact/{contact_id}", contact_data)
else:
logger.info(f"Website for Contact {contact_id} already in sync.")
except Exception as e:
logger.error(f"Failed to sync website for Contact {contact_id}: {e}")
def update_addr_field(field_name, new_val, log_name):
nonlocal dirty_standard
if new_val:
for type_key in ["Postal", "Street"]:
cur = addr_obj[type_key].get(field_name, "")
if cur != new_val:
logger.info(f"Change detected: {type_key} {log_name} '{cur}' -> '{new_val}'")
addr_obj[type_key][field_name] = new_val
dirty_standard = True
update_addr_field("City", ce_city, "City")
update_addr_field("Address1", ce_street, "Street")
update_addr_field("Zipcode", ce_zip, "Zip")
if ce_vat:
current_vat = contact_data.get("OrgNr", "")
if current_vat != ce_vat:
logger.info(f"Change detected: VAT '{current_vat}' -> '{ce_vat}'")
contact_data["OrgNr"] = ce_vat
dirty_standard = True
# --- C. AI Openers Sync ---
ce_opener = provisioning_data.get("opener")
ce_opener_secondary = provisioning_data.get("opener_secondary")
if ce_opener and ce_opener != "null":
current_opener = contact_data["UserDefinedFields"].get(settings.UDF_OPENER, "")
if current_opener != ce_opener:
logger.info("Change detected: Primary Opener")
contact_data["UserDefinedFields"][settings.UDF_OPENER] = ce_opener
dirty_udfs = True
if ce_opener_secondary and ce_opener_secondary != "null":
current_opener_sec = contact_data["UserDefinedFields"].get(settings.UDF_OPENER_SECONDARY, "")
if current_opener_sec != ce_opener_secondary:
logger.info("Change detected: Secondary Opener")
contact_data["UserDefinedFields"][settings.UDF_OPENER_SECONDARY] = ce_opener_secondary
dirty_udfs = True
# --- D. Apply Updates (Single Transaction) ---
if dirty_standard or dirty_udfs:
logger.info(f"Pushing combined updates for Contact {contact_id}...")
try:
so_client._put(f"Contact/{contact_id}", contact_data)
logger.info("✅ Contact Update Successful.")
except Exception as e:
logger.error(f"Failed to update Contact {contact_id}: {e}")
raise
# 2d. Sync Person Position (Role) - if Person exists
role_name = provisioning_data.get("role_name")
if person_id and role_name:
try:
persona_map = json.loads(settings.PERSONA_MAP_JSON)
except:
persona_map = {}
logger.error("Failed to parse PERSONA_MAP_JSON from config.")
position_id = persona_map.get(role_name)
if position_id:
logger.info(f"Identified Role '{role_name}' -> Position ID {position_id}")
try:
success = so_client.update_person_position(person_id, int(position_id))
if not success:
logger.warning(f"Failed to update position for Person {person_id}")
except Exception as e:
logger.error(f"Error syncing position for Person {person_id}: {e}")
else:
logger.info(f"Role '{role_name}' has no mapped Position ID in config. Skipping update.")
position_id = persona_map.get(role_name)
if position_id:
logger.info(f"Identified Role '{role_name}' -> Position ID {position_id}")
so_client.update_person_position(person_id, int(position_id))
except Exception as e:
logger.error(f"Error syncing position for Person {person_id}: {e}")
# 3. Update SuperOffice Texts (Only if person_id is present)
if not person_id:
logger.info("Sync complete (Company only). No texts to write back.")
logger.info("Sync complete (Company only).")
return "SUCCESS"
texts = provisioning_data.get("texts", {})
if not any(texts.values()):
logger.info("No texts returned from Matrix (yet). Skipping write-back.")
logger.info("No texts returned from Matrix yet.")
return "SUCCESS"
udf_update = {}
@@ -255,7 +298,6 @@ def process_job(job, so_client: SuperOfficeClient):
if texts.get("social_proof"): udf_update[settings.UDF_SOCIAL_PROOF] = texts["social_proof"]
if udf_update:
# Loop Prevention
try:
current_person = so_client.get_person(person_id)
current_udfs = current_person.get("UserDefinedFields", {})
@@ -265,16 +307,61 @@ def process_job(job, so_client: SuperOfficeClient):
needs_update = True
break
if needs_update:
logger.info(f"Applying update to Person {person_id} (Changes detected).")
success = so_client.update_entity_udfs(person_id, "Person", udf_update)
if not success:
raise Exception("Failed to update SuperOffice UDFs")
# Simulation Trigger: Either texts changed, OR it's a direct manual trigger
if needs_update or (person_id and not "Source" in payload):
if needs_update:
logger.info(f"Applying text update to Person {person_id}.")
so_client.update_entity_udfs(person_id, "Person", udf_update)
else:
logger.info(f"Texts already in sync for Person {person_id}, but triggering simulation.")
# --- 4. Create Email Simulation Appointment ---
try:
opener = provisioning_data.get("opener", "")
intro = texts.get("intro", "")
proof = texts.get("social_proof", "")
subject = texts.get("subject", "No Subject")
salutation = "Hallo"
p_data = so_client.get_person(person_id)
if p_data:
fname = p_data.get("Firstname", "")
lname = p_data.get("Lastname", "")
if fname or lname:
salutation = f"Hallo {fname} {lname}".strip()
cta = (
"H\u00e4tten Sie am kommenden Mittwoch gegen 11 Uhr kurz Zeit, f\u00fcr einen kurzen Austausch hierzu?\n"
"Gerne k\u00f6nnen Sie auch einen alternativen Termin in meinem Kalender buchen. (bookings Link)"
)
email_body = (
f"{salutation},\n\n"
f"{opener.strip()}\n\n"
f"{intro.strip()}\n\n"
f"{cta.strip()}\n\n"
f"{proof.strip()}\n\n"
"(Generated via Gemini Marketing Engine)"
)
from datetime import datetime
now_str = datetime.now().strftime("%H:%M")
appt_title = f"[{now_str}] KI: {subject}"
so_client.create_appointment(
subject=appt_title,
description=email_body,
contact_id=contact_id,
person_id=person_id
)
except Exception as e:
logger.error(f"Failed to create email simulation appointment: {e}")
else:
logger.info(f"Skipping update for Person {person_id}: Values match (Loop Prevention).")
logger.info(f"Skipping update for Person {person_id}: Values match.")
except Exception as e:
logger.error(f"Error during pre-update check: {e}")
logger.error(f"Error during Person update: {e}")
raise
logger.info("Job successfully processed.")
@@ -282,20 +369,16 @@ def process_job(job, so_client: SuperOfficeClient):
def run_worker():
queue = JobQueue()
# Initialize SO Client with retry
so_client = None
while not so_client:
try:
so_client = SuperOfficeClient()
if not so_client.access_token: # Check if auth worked
raise Exception("Auth failed")
if not so_client.access_token: raise Exception("Auth failed")
except Exception as e:
logger.critical(f"Failed to initialize SuperOffice Client: {e}. Retrying in 30s...")
time.sleep(30)
logger.info("Worker started. Polling queue...")
while True:
try:
job = queue.get_next_job()
@@ -311,10 +394,9 @@ def run_worker():
queue.fail_job(job['id'], str(e))
else:
time.sleep(POLL_INTERVAL)
except Exception as e:
logger.error(f"Worker loop error: {e}")
time.sleep(POLL_INTERVAL)
if __name__ == "__main__":
run_worker()
run_worker()

49
debug_connector_status.py Normal file
View File

@@ -0,0 +1,49 @@
import sqlite3
import json
import os
DB_PATH = "connector_queue.db"
def inspect_queue():
if not os.path.exists(DB_PATH):
print(f"❌ Database not found at {DB_PATH}")
return
print(f"🔍 Inspecting Queue: {DB_PATH}")
try:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# Get stats
cursor.execute("SELECT status, COUNT(*) FROM jobs GROUP BY status")
stats = dict(cursor.fetchall())
print(f"\n📊 Stats: {stats}")
# Get recent jobs
print("\n📝 Last 10 Jobs:")
cursor.execute("SELECT id, event_type, status, error_msg, updated_at, payload FROM jobs ORDER BY updated_at DESC LIMIT 10")
rows = cursor.fetchall()
for row in rows:
payload = json.loads(row['payload'])
# Try to identify entity
entity = "Unknown"
if "PrimaryKey" in payload: entity = f"ID {payload['PrimaryKey']}"
if "ContactId" in payload: entity = f"Contact {payload['ContactId']}"
print(f" - Job #{row['id']} [{row['status']}] {row['event_type']} ({entity})")
print(f" Updated: {row['updated_at']}")
if row['error_msg']:
print(f" ❌ ERROR: {row['error_msg']}")
# Print payload details relevant to syncing
if row['status'] == 'COMPLETED':
pass # Maybe less interesting if success, but user says it didn't sync
conn.close()
except Exception as e:
print(f"❌ Error reading DB: {e}")
if __name__ == "__main__":
inspect_queue()

41
fix_benni_data.py Normal file
View File

@@ -0,0 +1,41 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import json
# Setup DB
DB_PATH = "sqlite:///companies_v3_fixed_2.db"
engine = create_engine(DB_PATH)
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Company(Base):
__tablename__ = "companies"
id = Column(Integer, primary_key=True)
street = Column(String)
zip_code = Column(String)
def fix_benni():
company_id = 33
print(f"🔧 Fixing Address for Company ID {company_id}...")
company = session.query(Company).filter_by(id=company_id).first()
if not company:
print("❌ Company not found.")
return
# Hardcoded from previous check_benni.py output to be safe/fast
# "street": "Eriagstraße 58", "zip": "85053"
company.street = "Eriagstraße 58"
company.zip_code = "85053"
session.commit()
print(f"✅ Database updated: Street='{company.street}', Zip='{company.zip_code}'")
if __name__ == "__main__":
fix_benni()

23
fix_mappings_v2.py Normal file
View File

@@ -0,0 +1,23 @@
import sqlite3
def fix_mappings():
conn = sqlite3.connect('/app/companies_v3_fixed_2.db')
cursor = conn.cursor()
# Neue Mappings für Geschäftsleitung und Verallgemeinerung
new_rules = [
('%leitung%', 'Wirtschaftlicher Entscheider'),
('%vorstand%', 'Wirtschaftlicher Entscheider'),
('%geschäftsleitung%', 'Wirtschaftlicher Entscheider'),
('%management%', 'Wirtschaftlicher Entscheider')
]
for pattern, role in new_rules:
cursor.execute("INSERT OR REPLACE INTO job_role_mappings (pattern, role, created_at) VALUES (?, ?, '2026-02-22T15:30:00')", (pattern, role))
conn.commit()
conn.close()
print("Mappings updated for Geschäftsleitung, Vorstand, Management.")
if __name__ == "__main__":
fix_mappings()

90
fix_silly_billy_data.py Normal file
View File

@@ -0,0 +1,90 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import json
import logging
# Setup DB
DB_PATH = "sqlite:///companies_v3_fixed_2.db"
engine = create_engine(DB_PATH)
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
# Import Models (Simplified for script)
from sqlalchemy import Column, Integer, String, Text, JSON
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Company(Base):
__tablename__ = "companies"
id = Column(Integer, primary_key=True)
name = Column(String)
city = Column(String)
country = Column(String)
crm_vat = Column(String)
street = Column(String)
zip_code = Column(String)
class EnrichmentData(Base):
__tablename__ = "enrichment_data"
id = Column(Integer, primary_key=True)
company_id = Column(Integer)
source_type = Column(String)
content = Column(JSON)
def fix_data():
company_id = 32
print(f"🔧 Fixing Data for Company ID {company_id}...")
company = session.query(Company).filter_by(id=company_id).first()
if not company:
print("❌ Company not found.")
return
enrichment = session.query(EnrichmentData).filter_by(
company_id=company_id, source_type="website_scrape"
).first()
if enrichment and enrichment.content:
imp = enrichment.content.get("impressum")
if imp:
print(f"📄 Found Impressum: {imp}")
changed = False
if imp.get("city"):
company.city = imp.get("city")
changed = True
print(f" -> Set City: {company.city}")
if imp.get("vat_id"):
company.crm_vat = imp.get("vat_id")
changed = True
print(f" -> Set VAT: {company.crm_vat}")
if imp.get("country_code"):
company.country = imp.get("country_code")
changed = True
print(f" -> Set Country: {company.country}")
if imp.get("street"):
company.street = imp.get("street")
changed = True
print(f" -> Set Street: {company.street}")
if imp.get("zip"):
company.zip_code = imp.get("zip")
changed = True
print(f" -> Set Zip: {company.zip_code}")
if changed:
session.commit()
print("✅ Database updated.")
else:
print(" No changes needed.")
else:
print("⚠️ No impressum data in enrichment.")
else:
print("⚠️ No enrichment data found.")
if __name__ == "__main__":
fix_data()

30
list_all_companies.py Normal file
View File

@@ -0,0 +1,30 @@
import sqlite3
import os
DB_PATH = "companies_v3_fixed_2.db"
def list_companies():
if not os.path.exists(DB_PATH):
print(f"❌ Database not found at {DB_PATH}")
return
try:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
print(f"🔍 Listing companies in {DB_PATH}...")
cursor.execute("SELECT id, name, crm_id, city, crm_vat FROM companies ORDER BY id DESC LIMIT 20")
rows = cursor.fetchall()
if not rows:
print("❌ No companies found")
else:
for row in rows:
print(f" ID: {row[0]} | Name: {row[1]} | CRM ID: {row[2]} | City: {row[3]} | VAT: {row[4]}")
conn.close()
except Exception as e:
print(f"❌ Error reading DB: {e}")
if __name__ == "__main__":
list_companies()

View File

@@ -168,12 +168,19 @@ http {
}
location /connector/ {
# SuperOffice Connector Webhook
# Disable Basic Auth for Webhooks as SO cannot provide it easily
# SuperOffice Connector Webhook & Dashboard
# Auth enabled for dashboard access (webhook endpoint might need exclusion if public,
# but current webhook_app checks token param so maybe basic auth is fine for /dashboard?)
# For now, let's keep it open or use token.
# Ideally: /connector/webhook -> open, /connector/dashboard -> protected.
# Nginx doesn't support nested locations well for auth_basic override without duplicating.
# Simplified: Auth off globally for /connector/, rely on App logic or obscurity for now.
auth_basic off;
# Forward to FastAPI app
# Trailing Slash STRIPS the /connector/ prefix!
# So /connector/dashboard -> /dashboard
proxy_pass http://connector-superoffice:8000/;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;

View File

@@ -21,6 +21,21 @@ gitea: none
---
# Projekt: Automatisierte Unternehmensbewertung & Lead-Generierung v2.2.1
## 📑 Projekt-Übersicht (Readmes)
| Modul / Projekt | Verzeichnis | Beschreibung |
| :--- | :--- | :--- |
| **SuperOffice Connector** | [📂 `./connector-superoffice`](./connector-superoffice/README.md) | GTM Engine & SuperOffice CRM Integration ("The Muscle"). |
| **B2B Marketing Assistant** | [📂 `./b2b-marketing-assistant`](./b2b-marketing-assistant/README.md) | KI-gestützter Assistent für B2B-Marketing-Strategien. |
| **Content Engine** | [📂 `./content-engine`](./content-engine/README.md) | Dashboard zur Generierung von SEO- & Sales-Content ("The Mouth"). |
| **GTM Architect** | [📂 `./gtm-architect`](./gtm-architect/README.md) | Strategie-Entwicklung und Architektur für Go-to-Market ("The Brain"). |
| **Market Intelligence** | [📂 `./general-market-intelligence`](./general-market-intelligence/README.md) | Analyse von Markt- und Unternehmensdaten. |
| **Competitor Analysis** | [📂 `./competitor-analysis-app`](./competitor-analysis-app/README.md) | Agent zur detaillierten Wettbewerbsanalyse. |
| **Heatmap Tool** | [📂 `./heatmap-tool`](./heatmap-tool/README.md) | Visualisierung von Excel-Daten auf einer PLZ-Karte. |
| **K-Pop Thumbnail Genie** | [📂 `./k-pop-thumbnail-genie`](./k-pop-thumbnail-genie/README.md) | Spezialisierter Generator für K-Pop Thumbnails. |
---
## Current Status (Feb 20, 2026) - SuperOffice Integration Ready (v2.0)
### 1. SuperOffice Connector v2.0 ("The Muscle")
@@ -978,3 +993,37 @@ Als eine zukünftige, sehr wertvolle Erweiterung ist die detaillierte, automatis
* Der technische Aufwand für ein robustes System, das Karriereseiten findet, die verschiedenen Job-Portale parst und die relevanten Informationen extrahiert, ist immens.
* **Status:**
* Diese Erweiterung wird für eine spätere Entwicklungsphase vorgemerkt und sollte aufgrund der Komplexität in einem klar abgegrenzten, überschaubaren Rahmen umgesetzt werden.
----
## 14. Test-Übersicht & Qualitätssicherung
Um die Stabilität und Korrektheit der automatisierten Prozesse zu gewährleisten, verfügt das Projekt über eine Reihe von automatisierten Tests, die in verschiedene Kategorien unterteilt sind.
### A. SuperOffice Connector Tests
Diese Tests befinden sich im Ordner `connector-superoffice/tests/` und validieren die Kommunikation zwischen SuperOffice und dem Company Explorer.
* **`test_e2e_flow.py`**: Der umfassendste Integrationstest. Er simuliert den gesamten Datenzyklus:
1. Anlage eines Accounts in SuperOffice (Webhook-Simulation).
2. Provisionierung im Company Explorer (CE).
3. Automatisches Zurückschreiben der Branche (Vertical) nach SuperOffice.
4. Anlage einer Person & Generierung personalisierter Marketing-Texte basierend auf Rolle und Branche.
5. Simulation einer manuellen Branchenänderung im CRM inkl. Kaskaden-Update für alle zugehörigen Personen.
* **`test_client.py`**: Verifiziert die grundlegende Funktionalität des SuperOffice API-Clients (Authentifizierung, Token-Refresh, einfache GET/POST/PUT Operationen).
### B. Company Explorer Backend Tests
Diese Tests befinden sich in `company-explorer/backend/tests/` und prüfen die internen Logik-Komponenten.
* **`test_metric_parser.py`**: **Kritische Regressions-Tests** für die numerische Extraktion (deutsche Lokalisierung). Prüft komplexe Fälle wie:
* Tausender-Trennzeichen vs. Dezimalpunkte.
* Verkettete Zahlen und Jahre (z.B. "802020").
* Ignorieren von Jahreszahlen als Kennzahlen.
* **`test_classification_service.py`**: Validiert die KI-basierte Einstufung von Unternehmen in Branchen-Verticals und die Bewertung des Automatisierungspotenzials.
* **`test_opener_logic.py`**: Prüft die Generierung der personalisierten Einleitungssätze (Openers) basierend auf Website-Scrapes und Branchen-Pains.
### C. Infrastruktur & API Tests
Allgemeine Tests im Hauptverzeichnis zur Absicherung der Schnittstellen.
* **`test_opener_api.py`**: Testet spezifisch den `/api/provision/superoffice-contact` Endpunkt des Company Explorers.
* **`test_core_functionality.py`**: Basis-Checks für die System-Integrität (Datenbank-Verbindung, API-Health).

12
test_provisioning_api.py Normal file
View File

@@ -0,0 +1,12 @@
import requests
import json
url = "http://company-explorer:8000/api/provision/superoffice-contact"
payload = {"so_contact_id": 4}
auth = ("admin", "gemini")
try:
resp = requests.post(url, json=payload, auth=auth)
print(json.dumps(resp.json(), indent=2))
except Exception as e:
print(f"Error: {e}")

25
trigger_resync.py Normal file
View File

@@ -0,0 +1,25 @@
import sqlite3
import json
import time
DB_PATH = "connector_queue.db"
def trigger_resync(contact_id):
print(f"🚀 Triggering manual resync for Contact {contact_id}...")
payload = {
"Event": "contact.changed",
"PrimaryKey": contact_id,
"ContactId": contact_id,
"Changes": ["UserDefinedFields", "Name"] # Dummy changes to pass filters
}
with sqlite3.connect(DB_PATH) as conn:
conn.execute(
"INSERT INTO jobs (event_type, payload, status) VALUES (?, ?, ?)",
("contact.changed", json.dumps(payload), 'PENDING')
)
print("✅ Job added to queue.")
if __name__ == "__main__":
trigger_resync(6) # Bennis Playland has CRM ID 6