10 Commits

Author SHA1 Message Date
e4d738990a [2ff88f42] Update GEMINI.md with Notion Tooling Documentation 2026-02-20 13:28:04 +00:00
101f67936a [2ff88f42] einfügen
einfügen
2026-02-20 13:25:21 +00:00
653bd79e1f [2ff88f42] Finalize Verticals Pains/Gains in Notion & Update Docs
Updated all Notion Verticals with sharpened Pains/Gains based on internal strategy (Ops vs Infra focus). Updated SuperOffice Connector README to reflect the 'Static Magic' architecture.
2026-02-20 13:24:13 +00:00
46f650a350 Merge branch 'main' of https://floke-gitea.duckdns.org/Floke/Brancheneinstufung2 2026-02-20 13:20:06 +01:00
e792ebcb39 [2ff88f42] einfügen
einfügen
2026-02-20 10:56:26 +00:00
027d114d08 [2ff88f42] Finalize SuperOffice Integration: Enhanced Persona model with Influencer role, switched Matrix Generator to Gemini, implemented Noise Reduction for Webhooks, and added E2E test scenarios. 2026-02-20 10:55:57 +00:00
405a7656f6 [2ff88f42] Refine Webhook Filter: Restrict to Strategic Fields (Name, URL, JobTitle, Position) only 2026-02-20 09:36:34 +00:00
f400bed368 [2ff88f42] Implement Webhook Noise Reduction: Filter irrelevant events (Sale, Project) and minor field changes 2026-02-20 08:15:19 +00:00
510112b238 [2ff88f42] Finalize SuperOffice Connector: Centralized Config, Added Position/Role Mapping Logic, and Discovery Tools 2026-02-20 07:20:26 +00:00
a54d8b9c4e [2ff88f42] multiplikation vorbereitet
multiplikation vorbereitet
2026-02-19 20:59:04 +00:00
35 changed files with 2786 additions and 453 deletions

View File

@@ -1 +1 @@
{"task_id": "2ff88f42-8544-8000-8314-c9013414d1d0", "token": "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", "session_start_time": "2026-02-19T16:06:04.236614"}
{"task_id": "2ff88f42-8544-8018-883f-e8837c0421af", "token": "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", "session_start_time": "2026-02-20T13:24:58.251700"}

View File

@@ -122,6 +122,29 @@ To ensure the stability and accuracy of the metric extraction logic, a dedicated
These tests are crucial for preventing regressions as the parser logic evolves.
## Notion Maintenance & Data Sync
Since the "Golden Record" for Industry Verticals (Pains, Gains, Products) resides in Notion, specific tools are available to read and sync this data.
**Location:** `/app/company-explorer/backend/scripts/notion_maintenance/`
**Prerequisites:**
- Ensure `.env` is loaded with `NOTION_API_KEY` and correct DB IDs.
**Key Scripts:**
1. **`check_relations.py` (Reader - Deep):**
- **Purpose:** Reads Verticals and resolves linked Product Categories (Relation IDs -> Names). Essential for verifying the "Primary/Secondary Product" logic.
- **Usage:** `python3 check_relations.py`
2. **`update_notion_full.py` (Writer - Batch):**
- **Purpose:** Batch updates Pains and Gains for multiple verticals. Use this as a template when refining the messaging strategy.
- **Usage:** Edit the dictionary in the script, then run `python3 update_notion_full.py`.
3. **`list_notion_structure.py` (Schema Discovery):**
- **Purpose:** Lists all property keys and page titles. Use this to debug schema changes (e.g. if a column was renamed).
- **Usage:** `python3 list_notion_structure.py`
## Next Steps
* **Marketing Automation:** Implement the actual sending logic (or export) based on the contact status.
* **Job Role Mapping Engine:** Connect the configured patterns to the contact import/creation process to auto-assign roles.

View File

@@ -32,7 +32,7 @@ setup_logging()
import logging
logger = logging.getLogger(__name__)
from .database import init_db, get_db, Company, Signal, EnrichmentData, RoboticsCategory, Contact, Industry, JobRoleMapping, ReportedMistake, MarketingMatrix
from .database import init_db, get_db, Company, Signal, EnrichmentData, RoboticsCategory, Contact, Industry, JobRoleMapping, ReportedMistake, MarketingMatrix, Persona
from .services.deduplication import Deduplicator
from .services.discovery import DiscoveryService
from .services.scraping import ScraperService
@@ -89,6 +89,7 @@ class ProvisioningRequest(BaseModel):
so_person_id: Optional[int] = None
crm_name: Optional[str] = None
crm_website: Optional[str] = None
job_title: Optional[str] = None
class ProvisioningResponse(BaseModel):
status: str
@@ -223,22 +224,18 @@ def provision_superoffice_contact(
if vertical_name and role_name:
industry_obj = db.query(Industry).filter(Industry.name == vertical_name).first()
persona_obj = db.query(Persona).filter(Persona.name == role_name).first()
if industry_obj:
# Find any mapping for this role to query the Matrix
# (Assuming Matrix is linked to *one* canonical mapping for this role string)
role_ids = [m.id for m in db.query(JobRoleMapping).filter(JobRoleMapping.role == role_name).all()]
if industry_obj and persona_obj:
matrix_entry = db.query(MarketingMatrix).filter(
MarketingMatrix.industry_id == industry_obj.id,
MarketingMatrix.persona_id == persona_obj.id
).first()
if role_ids:
matrix_entry = db.query(MarketingMatrix).filter(
MarketingMatrix.industry_id == industry_obj.id,
MarketingMatrix.role_id.in_(role_ids)
).first()
if matrix_entry:
texts["subject"] = matrix_entry.subject
texts["intro"] = matrix_entry.intro
texts["social_proof"] = matrix_entry.social_proof
if matrix_entry:
texts["subject"] = matrix_entry.subject
texts["intro"] = matrix_entry.intro
texts["social_proof"] = matrix_entry.social_proof
return ProvisioningResponse(
status="success",
@@ -457,6 +454,22 @@ def list_industries(db: Session = Depends(get_db), username: str = Depends(authe
def list_job_roles(db: Session = Depends(get_db), username: str = Depends(authenticate_user)):
return db.query(JobRoleMapping).order_by(JobRoleMapping.pattern.asc()).all()
@app.get("/api/job_roles/raw")
def list_raw_job_titles(
limit: int = 100,
unmapped_only: bool = True,
db: Session = Depends(get_db),
username: str = Depends(authenticate_user)
):
"""
Returns unique raw job titles from CRM imports, prioritized by frequency.
"""
query = db.query(RawJobTitle)
if unmapped_only:
query = query.filter(RawJobTitle.is_mapped == False)
return query.order_by(RawJobTitle.count.desc()).limit(limit).all()
@app.get("/api/mistakes")
def list_reported_mistakes(
status: Optional[str] = Query(None),

View File

@@ -150,7 +150,7 @@ class Industry(Base):
created_at = Column(DateTime, default=datetime.utcnow)
class JobRoleMapping(Base):
class JobRoleMapping(BaseModel):
"""
Maps job title patterns (regex or simple string) to Roles.
"""
@@ -162,6 +162,41 @@ class JobRoleMapping(Base):
created_at = Column(DateTime, default=datetime.utcnow)
class RawJobTitle(BaseModel):
    """
    Stores raw unique job titles imported from CRM to assist in pattern mining.
    Tracks frequency to prioritize high-impact patterns.
    """
    # NOTE(review): sibling models in this file inherit from ``Base``; confirm
    # that ``BaseModel`` is the intended declarative base here.
    __tablename__ = "raw_job_titles"
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, unique=True, index=True)  # The raw string, e.g. "Senior Sales Mgr."
    count = Column(Integer, default=1)  # How often this title appears in the CRM
    source = Column(String, default="import")  # Origin of the record (default: CSV import)
    # Status Flags
    is_mapped = Column(Boolean, default=False)  # True if a pattern currently covers this title
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class Persona(BaseModel):
    """
    Represents a generalized persona/role (e.g. 'Geschäftsführer', 'IT-Leiter')
    independent of the specific job title pattern.
    Stores the strategic messaging components.
    """
    # NOTE(review): sibling models in this file inherit from ``Base``; confirm
    # that ``BaseModel`` is the intended declarative base here.
    __tablename__ = "personas"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, unique=True, index=True)  # Matches the 'role' string in JobRoleMapping
    pains = Column(Text, nullable=True)  # JSON list or multiline string
    gains = Column(Text, nullable=True)  # JSON list or multiline string
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class Signal(Base):
"""
@@ -254,8 +289,8 @@ class ReportedMistake(Base):
class MarketingMatrix(Base):
"""
Stores the static marketing texts for Industry x Role combinations.
Source: Notion (synced).
Stores the static marketing texts for Industry x Persona combinations.
Source: Generated via AI.
"""
__tablename__ = "marketing_matrix"
@@ -263,7 +298,7 @@ class MarketingMatrix(Base):
# The combination keys
industry_id = Column(Integer, ForeignKey("industries.id"), nullable=False)
role_id = Column(Integer, ForeignKey("job_role_mappings.id"), nullable=False)
persona_id = Column(Integer, ForeignKey("personas.id"), nullable=False)
# The Content
subject = Column(Text, nullable=True)
@@ -273,7 +308,7 @@ class MarketingMatrix(Base):
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
industry = relationship("Industry")
role = relationship("JobRoleMapping")
persona = relationship("Persona")
# ==============================================================================
@@ -329,4 +364,4 @@ def get_db():
try:
yield db
finally:
db.close()
db.close()

View File

@@ -0,0 +1,22 @@
import sys
import os
# Setup Environment
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))
from backend.database import SessionLocal, JobRoleMapping
def check_mappings():
    """Print the total number of JobRoleMappings plus a few sample rows."""
    session = SessionLocal()
    total = session.query(JobRoleMapping).count()
    print(f"Total JobRoleMappings: {total}")
    # Show a handful of examples so the mapping quality can be eyeballed.
    for mapping in session.query(JobRoleMapping).limit(5).all():
        print(f" - {mapping.pattern} -> {mapping.role}")
    session.close()
if __name__ == "__main__":
    check_mappings()

View File

@@ -0,0 +1,187 @@
import sys
import os
import json
import argparse
from typing import List
import google.generativeai as genai
# Setup Environment
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))
from backend.database import SessionLocal, Industry, Persona, MarketingMatrix
from backend.config import settings
# --- Configuration ---
MODEL_NAME = "gemini-1.5-pro-latest" # High quality copy
def generate_prompt(industry: Industry, persona: Persona) -> str:
"""
Builds the prompt for the AI to generate the marketing texts.
Combines Industry context with Persona specific pains/gains.
"""
# Safely load JSON lists
try:
persona_pains = json.loads(persona.pains) if persona.pains else []
persona_gains = json.loads(persona.gains) if persona.gains else []
except:
persona_pains = [persona.pains] if persona.pains else []
persona_gains = [persona.gains] if persona.gains else []
industry_pains = industry.pains if industry.pains else "Allgemeine Effizienzprobleme"
prompt = f"""
Du bist ein erfahrener B2B-Copywriter für Robotik-Lösungen (Reinigung, Transport, Service).
Ziel: Erstelle personalisierte E-Mail-Textbausteine für einen Outreach.
--- KONTEXT ---
ZIELBRANCHE: {industry.name}
BRANCHEN-KONTEXT: {industry.description or 'Keine spezifische Beschreibung'}
BRANCHEN-PAINS: {industry_pains}
ZIELPERSON (ARCHETYP): {persona.name}
PERSÖNLICHE PAINS (Herausforderungen):
{chr(10).join(['- ' + p for p in persona_pains])}
GEWÜNSCHTE GAINS (Ziele):
{chr(10).join(['- ' + g for g in persona_gains])}
--- AUFGABE ---
Erstelle ein JSON-Objekt mit genau 3 Textbausteinen.
Tonalität: Professionell, lösungsorientiert, auf den Punkt. Keine Marketing-Floskeln ("Game Changer").
1. "subject": Betreffzeile (Max 6 Wörter). Muss neugierig machen und einen Pain adressieren.
2. "intro": Einleitungssatz (1-2 Sätze). Verbinde die Branchen-Herausforderung mit der persönlichen Rolle des Empfängers. Zeige Verständnis für seine Situation.
3. "social_proof": Ein Satz, der Vertrauen aufbaut. Nenne generische Erfolge (z.B. "Unternehmen in der {industry.name} senken so ihre Kosten um 15%"), da wir noch keine spezifischen Logos nennen dürfen.
--- FORMAT ---
Respond ONLY with a valid JSON object. Do not add markdown formatting like ```json ... ```.
Format:
{{
"subject": "...",
"intro": "...",
"social_proof": "..."
}}
"""
return prompt
def mock_call(prompt: str):
    """Simulate a Gemini call for dry runs; echoes a prompt preview."""
    preview = prompt[:300]
    print(f"\n--- [MOCK] GENERATING PROMPT ---\n{preview}...\n--------------------------------")
    canned_response = {
        "subject": "[MOCK] Effizienzsteigerung in der Produktion",
        "intro": "[MOCK] Als Produktionsleiter wissen Sie, wie teuer Stillstand ist. Unsere Roboter helfen.",
        "social_proof": "[MOCK] Ähnliche Betriebe sparten 20% Kosten.",
    }
    return canned_response
def real_gemini_call(prompt: str):
    """Send the prompt to Gemini and return the parsed JSON payload.

    Raises ValueError when no API key is configured; re-raises JSON parse
    failures after printing the raw model output for debugging.
    """
    if not settings.GEMINI_API_KEY:
        raise ValueError("GEMINI_API_KEY not set in config/env")
    genai.configure(api_key=settings.GEMINI_API_KEY)
    # Configure Model — JSON mime type asks Gemini for machine-readable output.
    model = genai.GenerativeModel(
        model_name=MODEL_NAME,
        generation_config={
            "temperature": 0.7,
            "top_p": 0.95,
            "top_k": 64,
            "max_output_tokens": 1024,
            "response_mime_type": "application/json",
        },
    )
    response = model.generate_content(prompt)
    try:
        # Clean response if necessary (Gemini usually returns clean JSON with mime_type set, but safety first)
        payload = response.text.strip()
        if payload.startswith("```json"):
            payload = payload[7:-3].strip()
        elif payload.startswith("```"):
            payload = payload[3:-3].strip()
        return json.loads(payload)
    except Exception as e:
        print(f"JSON Parse Error: {e}. Raw Response: {response.text}")
        raise
def run_matrix_generation(dry_run: bool = True, force: bool = False):
    """
    Generate MarketingMatrix entries for every Industry x Persona combination.

    Existing entries are skipped unless ``force`` is set. In dry-run mode a
    mock generator is used and nothing is written to the database; in live
    mode results are committed once at the end of the batch.

    Args:
        dry_run: When True, no API calls are made and no DB writes happen.
        force: When True, existing matrix entries are regenerated/overwritten.
    """
    db = SessionLocal()
    try:
        industries = db.query(Industry).all()
        personas = db.query(Persona).all()
        print(f"Found {len(industries)} Industries and {len(personas)} Personas.")
        print(f"Mode: {'DRY RUN (No API calls, no DB writes)' if dry_run else 'LIVE - GEMINI GENERATION'}")
        total_combinations = len(industries) * len(personas)
        processed = 0
        for ind in industries:
            for pers in personas:
                processed += 1
                print(f"[{processed}/{total_combinations}] Check: {ind.name} x {pers.name}")
                # Check existing
                existing = db.query(MarketingMatrix).filter(
                    MarketingMatrix.industry_id == ind.id,
                    MarketingMatrix.persona_id == pers.id
                ).first()
                if existing and not force:
                    print(f" -> Skipped (Already exists)")
                    continue
                # Generate
                prompt = generate_prompt(ind, pers)
                if dry_run:
                    result = mock_call(prompt)
                else:
                    try:
                        result = real_gemini_call(prompt)
                        # Basic Validation
                        if not result.get("subject") or not result.get("intro"):
                            print(" -> Invalid result structure. Skipping.")
                            continue
                    except Exception as e:
                        # One failed combination must not abort the batch.
                        print(f" -> API ERROR: {e}")
                        continue
                # Write to DB (only if not dry run)
                if not dry_run:
                    if not existing:
                        new_entry = MarketingMatrix(
                            industry_id=ind.id,
                            persona_id=pers.id,
                            subject=result.get("subject"),
                            intro=result.get("intro"),
                            social_proof=result.get("social_proof")
                        )
                        db.add(new_entry)
                        print(f" -> Created new entry.")
                    else:
                        existing.subject = result.get("subject")
                        existing.intro = result.get("intro")
                        existing.social_proof = result.get("social_proof")
                        print(f" -> Updated entry.")
        db.commit()
    except Exception as e:
        # Fix: roll back partial writes so the session is left clean
        # (the original swallowed the error without rolling back).
        db.rollback()
        print(f"Error: {e}")
    finally:
        db.close()
if __name__ == "__main__":
    # CLI entry point: defaults to a safe dry run; --live enables real calls.
    cli = argparse.ArgumentParser()
    cli.add_argument("--live", action="store_true", help="Actually call Gemini and write to DB")
    cli.add_argument("--force", action="store_true", help="Overwrite existing matrix entries")
    opts = cli.parse_args()
    run_matrix_generation(dry_run=not opts.live, force=opts.force)

View File

@@ -0,0 +1,95 @@
import sys
import os
import csv
import argparse
from datetime import datetime
# Setup Environment
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))
from backend.database import SessionLocal, RawJobTitle, init_db, engine, Base
def import_titles(file_path: str, delimiter: str = ';'):
    """
    Import raw job titles from a CSV file into the RawJobTitle table.

    New titles are inserted with count=1; existing titles get their
    frequency counter incremented. Commits in batches of 100 rows and
    rolls back on error.

    Args:
        file_path: Path to the CSV file.
        delimiter: Column separator (default ';', typical for German Excel exports).
    """
    print(f"🚀 Starting Import from {file_path}...")
    # Ensure Table Exists
    RawJobTitle.__table__.create(bind=engine, checkfirst=True)
    db = SessionLocal()
    total_rows = 0
    new_titles = 0
    updated_titles = 0
    try:
        with open(file_path, 'r', encoding='utf-8-sig') as f:  # utf-8-sig handles BOM from Excel
            # Try to detect header. Fix: Sniffer.has_header() sniffs the
            # dialect internally and raises csv.Error on ambiguous content
            # (e.g. single-column files) — fall back to "no header".
            sample = f.read(1024)
            try:
                has_header = csv.Sniffer().has_header(sample)
            except csv.Error:
                has_header = False
            f.seek(0)
            reader = csv.reader(f, delimiter=delimiter)
            if has_header:
                headers = next(reader)
                print(f" Header detected: {headers}")
                # Try to find the right column index
                col_idx = 0
                for i, h in enumerate(headers):
                    if h.lower() in ['funktion', 'jobtitle', 'title', 'position', 'rolle']:
                        col_idx = i
                        print(f" -> Using column '{h}' (Index {i})")
                        break
            else:
                col_idx = 0
                print(" No header detected, using first column.")
            # Process Rows
            for row in reader:
                if not row:
                    continue
                if len(row) <= col_idx:
                    continue
                raw_title = row[col_idx].strip()
                if not raw_title:
                    continue  # Skip empty
                total_rows += 1
                # Check existence
                existing = db.query(RawJobTitle).filter(RawJobTitle.title == raw_title).first()
                if existing:
                    existing.count += 1
                    existing.updated_at = datetime.utcnow()
                    updated_titles += 1
                else:
                    db.add(RawJobTitle(title=raw_title, count=1))
                    new_titles += 1
                if total_rows % 100 == 0:
                    db.commit()
                    print(f" Processed {total_rows} rows...", end='\r')
        db.commit()
    except Exception as e:
        print(f"\n❌ Error: {e}")
        db.rollback()
    finally:
        db.close()
    print(f"\n✅ Import Complete.")
    print(f" Total Processed: {total_rows}")
    print(f" New Unique Titles: {new_titles}")
    print(f" Updated Frequencies: {updated_titles}")
if __name__ == "__main__":
    # Script entry: validate the path before touching the database.
    cli = argparse.ArgumentParser(description="Import Job Titles from CSV")
    cli.add_argument("file", help="Path to CSV file")
    cli.add_argument("--delimiter", default=";", help="CSV Delimiter (default: ';')")
    opts = cli.parse_args()
    if not os.path.exists(opts.file):
        print(f"❌ File not found: {opts.file}")
        sys.exit(1)
    import_titles(opts.file, opts.delimiter)

View File

@@ -0,0 +1,117 @@
import os
import requests
import json
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
NOTION_API_KEY = os.getenv("NOTION_API_KEY")
NOTION_DB_ID = "2ec88f4285448014ab38ea664b4c2b81" # ID from the user's link
if not NOTION_API_KEY:
print("Error: NOTION_API_KEY not found in environment.")
exit(1)
headers = {
"Authorization": f"Bearer {NOTION_API_KEY}",
"Notion-Version": "2022-06-28",
"Content-Type": "application/json"
}
def _extract_category(props, key):
    """Return the select/multi-select value(s) for *key* as a string, or "N/A".

    Handles both a single select object and a multi-select list, since the
    Notion schema for these columns is not fixed.
    """
    if key not in props:
        return "N/A"
    data = props[key].get("select") or props[key].get("multi_select")
    if not data:
        return "N/A"
    if isinstance(data, list):
        return ", ".join(item["name"] for item in data)
    return data["name"]
def get_vertical_data(vertical_name):
    """Fetch one vertical's Pains/Gains/product fields from Notion.

    Args:
        vertical_name: Substring matched against the "Vertical" title property.

    Returns:
        Dict with keys name/pains/gains/ops_focus_secondary/primary_product/
        secondary_product, or None when the query fails or nothing matches.
    """
    url = f"https://api.notion.com/v1/databases/{NOTION_DB_ID}/query"
    payload = {
        "filter": {
            "property": "Vertical",
            "title": {
                "contains": vertical_name
            }
        }
    }
    # Fix: timeout added so a network stall cannot hang the script forever.
    response = requests.post(url, headers=headers, json=payload, timeout=30)
    if response.status_code != 200:
        print(f"Error fetching data for '{vertical_name}': {response.status_code} - {response.text}")
        return None
    results = response.json().get("results", [])
    if not results:
        print(f"No entry found for vertical '{vertical_name}'")
        return None
    # Assuming the first result is the correct one
    page = results[0]
    props = page["properties"]
    # Extract Pains (first rich_text fragment only — long fields may be split)
    pains_prop = props.get("Pains", {}).get("rich_text", [])
    pains = pains_prop[0]["plain_text"] if pains_prop else "N/A"
    # Extract Gains
    gains_prop = props.get("Gains", {}).get("rich_text", [])
    gains = gains_prop[0]["plain_text"] if gains_prop else "N/A"
    # Ops Focus checkbox: exact property name uncertain — try known variants.
    ops_focus = "Unknown"
    if "Ops. Focus: Secondary" in props:
        ops_focus = props["Ops. Focus: Secondary"].get("checkbox", False)
    elif "Ops Focus" in props:  # Fallback guess
        ops_focus = props["Ops Focus"].get("checkbox", False)
    # Product categories may be select or multi-select depending on schema.
    primary_product = _extract_category(props, "Primary Product Category")
    secondary_product = _extract_category(props, "Secondary Product Category")
    return {
        "name": vertical_name,
        "pains": pains,
        "gains": gains,
        "ops_focus_secondary": ops_focus,
        "primary_product": primary_product,
        "secondary_product": secondary_product
    }
verticals_to_check = [
"Krankenhaus",
"Pflege", # Might be "Altenheim" or similar
"Hotel",
"Industrie", # Might be "Manufacturing"
"Logistik",
"Einzelhandel",
"Facility Management"
]
print("-" * 60)
for v in verticals_to_check:
data = get_vertical_data(v)
if data:
print(f"VERTICAL: {data['name']}")
print(f" Primary Product: {data['primary_product']}")
print(f" Secondary Product: {data['secondary_product']}")
print(f" Ops. Focus Secondary: {data['ops_focus_secondary']}")
print(f" PAINS: {data['pains']}")
print(f" GAINS: {data['gains']}")
print("-" * 60)

View File

@@ -0,0 +1,90 @@
import os
import requests
import json
from dotenv import load_dotenv
load_dotenv()
NOTION_API_KEY = os.getenv("NOTION_API_KEY")
NOTION_DB_ID = "2ec88f4285448014ab38ea664b4c2b81" # Verticals DB
PRODUCT_DB_ID = "2ec88f42854480f0b154f7a07342eb58" # Product Categories DB (from user link)
headers = {
"Authorization": f"Bearer {NOTION_API_KEY}",
"Notion-Version": "2022-06-28",
"Content-Type": "application/json"
}
# 1. Fetch Product Map (ID -> Name)
product_map = {}
def fetch_products():
    """Populate the module-level ``product_map`` (page ID -> product name).

    Queries the Product Categories database once. The title property may be
    named "Name" or "Product Category" depending on schema.
    NOTE(review): only the first 100 rows are fetched (no pagination) —
    confirm the product DB stays below that size.
    """
    url = f"https://api.notion.com/v1/databases/{PRODUCT_DB_ID}/query"
    # Fix: timeout prevents the script from hanging forever on network issues.
    response = requests.post(url, headers=headers, json={"page_size": 100}, timeout=30)
    if response.status_code == 200:
        results = response.json().get("results", [])
        for p in results:
            p_id = p["id"]
            # Name property might be "Name" or "Product Category"
            props = p["properties"]
            name = "Unknown"
            if "Name" in props:
                name = props["Name"]["title"][0]["plain_text"] if props["Name"]["title"] else "N/A"
            elif "Product Category" in props:
                name = props["Product Category"]["title"][0]["plain_text"] if props["Product Category"]["title"] else "N/A"
            product_map[p_id] = name
    else:
        print(f"Error fetching products: {response.status_code}")
# 2. Check Verticals with Relation Resolution
def check_vertical_relations(search_term):
    """Print a vertical's resolved product relations and Pains length.

    Relation IDs are translated to names via the module-level ``product_map``
    (populated by ``fetch_products``); unknown IDs are printed as-is.

    Args:
        search_term: Substring matched against the "Vertical" title property.
    """
    url = f"https://api.notion.com/v1/databases/{NOTION_DB_ID}/query"
    payload = {
        "filter": {
            "property": "Vertical",
            "title": {
                "contains": search_term
            }
        }
    }
    # Fix: timeout added so a network stall cannot hang the script forever.
    resp = requests.post(url, headers=headers, json=payload, timeout=30)
    if resp.status_code != 200:
        print(f"Error fetching vertical: {resp.status_code}")
        return
    results = resp.json().get("results", [])
    if not results:
        print(f"❌ No vertical found for '{search_term}'")
        return
    for page in results:
        props = page["properties"]
        # Fix: guard against pages with an empty title (was an IndexError);
        # matches the safe extraction used in the sibling detail script.
        title_list = props.get("Vertical", {}).get("title", [])
        title = title_list[0]["plain_text"] if title_list else "Unknown Title"
        # Resolve Primary
        pp_ids = [r["id"] for r in props.get("Primary Product Category", {}).get("relation", [])]
        pp_names = [product_map.get(pid, pid) for pid in pp_ids]
        # Resolve Secondary
        sp_ids = [r["id"] for r in props.get("Secondary Product", {}).get("relation", [])]
        sp_names = [product_map.get(pid, pid) for pid in sp_ids]
        print(f"\n🔹 VERTICAL: {title}")
        print(f" Primary Product (Rel): {', '.join(pp_names)}")
        print(f" Secondary Product (Rel): {', '.join(sp_names)}")
        # Pains/Gains short check
        pains = props.get("Pains", {}).get("rich_text", [])
        print(f" Pains Length: {len(pains[0]['plain_text']) if pains else 0} chars")
# Run
print("Fetching Product Map...")
fetch_products()
print(f"Loaded {len(product_map)} products.")
print("\nChecking Verticals...")
targets = ["Hospital", "Hotel", "Logistics", "Manufacturing", "Retail", "Reinigungs", "Dienstleister", "Facility"]
for t in targets:
check_vertical_relations(t)

View File

@@ -0,0 +1,87 @@
import os
import requests
import json
from dotenv import load_dotenv
load_dotenv()
NOTION_API_KEY = os.getenv("NOTION_API_KEY")
NOTION_DB_ID = "2ec88f4285448014ab38ea664b4c2b81"
if not NOTION_API_KEY:
print("Error: NOTION_API_KEY not found.")
exit(1)
headers = {
"Authorization": f"Bearer {NOTION_API_KEY}",
"Notion-Version": "2022-06-28",
"Content-Type": "application/json"
}
def get_vertical_details(vertical_name_contains):
    """Print Pains/Gains, Ops-Focus flag and product selects for a vertical.

    Args:
        vertical_name_contains: Substring matched against the "Vertical"
            title property; every matching page is printed.
    """
    url = f"https://api.notion.com/v1/databases/{NOTION_DB_ID}/query"
    payload = {
        "filter": {
            "property": "Vertical",
            "title": {
                "contains": vertical_name_contains
            }
        }
    }
    # Fix: timeout added so a network stall cannot hang the script forever.
    response = requests.post(url, headers=headers, json=payload, timeout=30)
    if response.status_code != 200:
        print(f"Error: {response.status_code}")
        return
    results = response.json().get("results", [])
    if not results:
        print(f"❌ No entry found containing '{vertical_name_contains}'")
        return
    for page in results:
        props = page["properties"]
        # safely extract title
        title_list = props.get("Vertical", {}).get("title", [])
        title = title_list[0]["plain_text"] if title_list else "Unknown Title"
        # Pains
        pains_list = props.get("Pains", {}).get("rich_text", [])
        pains = pains_list[0]["plain_text"] if pains_list else "N/A"
        # Gains
        gains_list = props.get("Gains", {}).get("rich_text", [])
        gains = gains_list[0]["plain_text"] if gains_list else "N/A"
        # Ops Focus (note: sibling script uses "Ops. Focus: Secondary" — the
        # exact property name should be confirmed against the live schema)
        ops_focus = props.get("Ops Focus: Secondary", {}).get("checkbox", False)
        # Products — both assumed to be select properties here
        pp_select = props.get("Primary Product Category", {}).get("select")
        pp = pp_select["name"] if pp_select else "N/A"
        sp_select = props.get("Secondary Product", {}).get("select")
        sp = sp_select["name"] if sp_select else "N/A"
        print(f"\n🔹 VERTICAL: {title}")
        print(f" Primary: {pp}")
        print(f" Secondary: {sp}")
        print(f" Ops Focus Secondary? {'✅ YES' if ops_focus else '❌ NO'}")
        print(f" PAINS:\n {pains}")
        print(f" GAINS:\n {gains}")
        print("-" * 40)
targets = [
"Hospital",
"Hotel",
"Logistics",
"Manufacturing",
"Retail",
"Facility Management"
]
for t in targets:
get_vertical_details(t)

View File

@@ -0,0 +1,66 @@
import os
import requests
import json
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
NOTION_API_KEY = os.getenv("NOTION_API_KEY")
NOTION_DB_ID = "2ec88f4285448014ab38ea664b4c2b81"
if not NOTION_API_KEY:
print("Error: NOTION_API_KEY not found in environment.")
exit(1)
headers = {
"Authorization": f"Bearer {NOTION_API_KEY}",
"Notion-Version": "2022-06-28",
"Content-Type": "application/json"
}
def list_pages_and_keys():
    """Dump the property keys and page titles of the Verticals database.

    Schema-discovery helper: prints all property names from the first page
    plus the titles of up to 10 pages, so renamed columns can be spotted.
    """
    url = f"https://api.notion.com/v1/databases/{NOTION_DB_ID}/query"
    payload = {
        "page_size": 10  # Just list a few to see structure
    }
    # Fix: timeout added so a network stall cannot hang the script forever.
    response = requests.post(url, headers=headers, json=payload, timeout=30)
    if response.status_code != 200:
        print(f"Error fetching data: {response.status_code} - {response.text}")
        return
    results = response.json().get("results", [])
    if not results:
        print("No pages found.")
        return
    print(f"Found {len(results)} pages.")
    # Print keys from the first page
    first_page = results[0]
    props = first_page["properties"]
    print("\n--- Property Keys Found ---")
    for key in props.keys():
        print(f"- {key}")
    print("\n--- Page Titles (Verticals) ---")
    for page in results:
        title_prop = page["properties"].get("Vertical", {}).get("title", [])  # Assuming title prop is named "Vertical" based on user input
        if not title_prop:
            # Try finding the title property dynamically if "Vertical" is wrong
            for k, v in page["properties"].items():
                if v["id"] == "title":
                    title_prop = v["title"]
                    break
        if title_prop:
            title = title_prop[0]["plain_text"]
            print(f"- {title}")
        else:
            print("- (No Title)")
if __name__ == "__main__":
list_pages_and_keys()

View File

@@ -0,0 +1,89 @@
import os
import requests
import json
from dotenv import load_dotenv
load_dotenv()
NOTION_API_KEY = os.getenv("NOTION_API_KEY")
NOTION_DB_ID = "2ec88f4285448014ab38ea664b4c2b81"
if not NOTION_API_KEY:
print("Error: NOTION_API_KEY not found.")
exit(1)
headers = {
"Authorization": f"Bearer {NOTION_API_KEY}",
"Notion-Version": "2022-06-28",
"Content-Type": "application/json"
}
# COMPLETE LIST OF UPDATES
updates = {
"Infrastructure - Transport": { # Airports, Stations
"Pains": "Sicherheitsbereiche erfordern personalintensives Screening von externen Reinigungskräften. Verschmutzte Böden (Winter/Salz) erhöhen das Rutschrisiko für Passagiere und Klagerisiken.",
"Gains": "Autonome Reinigung innerhalb der Sicherheitszonen ohne externe Personalwechsel. Permanente Trocknung von Nässe (Schneematsch) in Eingangsbereichen."
},
"Leisure - Indoor Active": { # Bowling, Cinema, Gym
"Pains": "Personal ist rar und teuer, Gäste erwarten aber Service am Platz. Reinigung im laufenden Betrieb stört den Erlebnischarakter.",
"Gains": "Service-Roboter als Event-Faktor und Entlastung: Getränke kommen zum Gast, Personal bleibt an der Bar/Theke. Konstante Sauberkeit auch bei hoher Frequenz."
},
"Leisure - Outdoor Park": { # Zoos, Theme Parks
"Pains": "Enorme Flächenleistung (Wege) erfordert viele Arbeitskräfte für die Grobschmutzbeseitigung (Laub, Müll). Sichtbare Reinigungstrupps stören die Immersion der Gäste.",
"Gains": "Autonome Großflächenreinigung (Kehren) in den frühen Morgenstunden vor Parköffnung. Erhalt der 'heilen Welt' (Immersion) für Besucher."
},
"Leisure - Wet & Spa": { # Pools, Thermen
"Pains": "Hohes Unfallrisiko durch Nässe auf Fliesen (Rutschgefahr). Hoher Aufwand für permanente Desinfektion und Trocknung im laufenden Betrieb bindet Aufsichtspersonal.",
"Gains": "Permanente Trocknung und Desinfektion kritischer Barfußbereiche. Reduktion der Rutschgefahr und Haftungsrisiken. Entlastung der Bademeister (Fokus auf Aufsicht)."
},
"Retail - Shopping Center": { # Malls
"Pains": "Food-Court ist der Schmutz-Hotspot: Verschüttete Getränke und Essensreste wirken unhygienisch und binden Personal dauerhaft. Dreckige Böden senken die Verweildauer.",
"Gains": "Sofortige Beseitigung von Malheuren im Food-Court. Steigerung der Aufenthaltsqualität und Verweildauer der Kunden durch sichtbare Sauberkeit."
},
"Retail - Non-Food": { # DIY, Furniture
"Pains": "Riesige Gangflächen verstauben schnell, Personal ist knapp und soll beraten, nicht kehren. Verschmutzte Böden wirken im Premium-Segment (Möbel) wertmindernd.",
"Gains": "Staubfreie Umgebung für angenehmes Einkaufsklima. Roboter reinigen autonom große Flächen, während Mitarbeiter für Kundenberatung verfügbar sind."
},
"Infrastructure - Public": { # Fairs, Schools
"Pains": "Extrem kurze Turnaround-Zeiten zwischen Messetagen oder Events. Hohe Nachtzuschläge für die Endreinigung der Hallengänge oder Klassenzimmer.",
"Gains": "Automatisierte Nachtreinigung der Gänge/Flure stellt die Optik für den nächsten Morgen sicher. Kalkulierbare Kosten ohne Nachtzuschlag."
},
"Hospitality - Gastronomy": { # Restaurants
"Pains": "Servicepersonal verbringt Zeit auf Laufwegen statt am Gast ('Teller-Taxi'). Personalmangel führt zu langen Wartezeiten und Umsatzverlust.",
"Gains": "Servicekräfte werden von Laufwegen befreit und haben Zeit für aktive Beratung und Verkauf (Upselling). Steigerung der Tischumschlagshäufigkeit."
}
}
def update_vertical(vertical_name, new_data):
    """Overwrite the Pains/Gains rich-text of the first matching vertical.

    Args:
        vertical_name: Substring matched against the "Vertical" title.
        new_data: Dict with "Pains" and "Gains" string values.
    """
    url = f"https://api.notion.com/v1/databases/{NOTION_DB_ID}/query"
    payload = {
        "filter": {
            "property": "Vertical",
            "title": {
                "contains": vertical_name
            }
        }
    }
    # Fix: timeouts added; query failures are now reported instead of
    # silently returning.
    resp = requests.post(url, headers=headers, json=payload, timeout=30)
    if resp.status_code != 200:
        print(f"❌ Query failed for {vertical_name}: {resp.status_code}")
        return
    results = resp.json().get("results", [])
    if not results:
        print(f"Skipping {vertical_name} (Not found)")
        return
    page_id = results[0]["id"]
    update_url = f"https://api.notion.com/v1/pages/{page_id}"
    update_payload = {
        "properties": {
            "Pains": {"rich_text": [{"text": {"content": new_data["Pains"]}}]},
            "Gains": {"rich_text": [{"text": {"content": new_data["Gains"]}}]}
        }
    }
    # Fix: the patch result was never checked — the script printed "Updated"
    # even when the write failed.
    patch_resp = requests.patch(update_url, headers=headers, json=update_payload, timeout=30)
    if patch_resp.status_code == 200:
        print(f"✅ Updated {vertical_name}")
    else:
        print(f"❌ Update failed for {vertical_name}: {patch_resp.status_code} - {patch_resp.text}")
print("Starting FULL Notion Update...")
for v_name, data in updates.items():
update_vertical(v_name, data)
print("Done.")

View File

@@ -0,0 +1,94 @@
import os
import requests
import json
from dotenv import load_dotenv
# Load NOTION_API_KEY from a local .env file, if one is present.
load_dotenv()
NOTION_API_KEY = os.getenv("NOTION_API_KEY")
# Notion database holding the Verticals (one page per vertical).
NOTION_DB_ID = "2ec88f4285448014ab38ea664b4c2b81"
if not NOTION_API_KEY:
    print("Error: NOTION_API_KEY not found.")
    exit(1)
# Shared headers for every Notion REST call; the API version is pinned
# so payload shapes stay stable.
headers = {
    "Authorization": f"Bearer {NOTION_API_KEY}",
    "Notion-Version": "2022-06-28",
    "Content-Type": "application/json"
}
# Define the updates with "Sharp" Pains/Gains
# Maps a Notion 'Vertical' title (matched via a `contains` filter) to the
# new rich-text content for its 'Pains' and 'Gains' properties.
# The texts are customer-facing German marketing copy — do not translate.
updates = {
    "Healthcare - Hospital": {
        "Pains": "Fachpflegekräfte sind bis zu 30% der Schichtzeit mit logistischen Routinetätigkeiten (Wäsche, Essen, Laborproben) gebunden ('Hände weg vom Bett'). Steigende Hygienerisiken bei gleichzeitigem Personalmangel im Reinigungsteam führen zu lückenhafter Dokumentation und Gefährdung der RKI-Konformität.",
        "Gains": "Rückgewinnung von ca. 2,5h Fachkraft-Kapazität pro Schicht durch automatisierte Stationslogistik. Validierbare, RKI-konforme Reinigungsqualität rund um die Uhr, unabhängig vom Krankenstand des Reinigungsteams."
    },
    "Hospitality - Hotel": {
        "Pains": "Enorme Fluktuation im Housekeeping gefährdet die pünktliche Zimmer-Freigabe (Check-in 15:00 Uhr). Hohe Nachtzuschläge oder fehlendes Personal verhindern, dass die Lobby und Konferenzbereiche morgens um 06:00 Uhr perfekt glänzen.",
        "Gains": "Lautlose Nachtreinigung der Lobby und Flure ohne Personalzuschläge. Servicekräfte im Restaurant werden von Laufwegen ('Teller-Taxi') befreit und haben Zeit für aktives Upselling am Gast."
    },
    "Logistics - Warehouse": {
        "Pains": "Verschmutzte Fahrwege durch Palettenabrieb und Staub gefährden die Sensorik von FTS (Fahrerlosen Transportsystemen) und erhöhen das Unfallrisiko für Flurförderzeuge. Manuelle Reinigung stört den 24/7-Betrieb und bindet Fachpersonal.",
        "Gains": "Permanente Staubreduktion im laufenden Betrieb schützt empfindliche Anlagentechnik (Lichtschranken). Saubere Hallen als Visitenkarte und Sicherheitsfaktor (Rutschgefahr), ohne operative Unterbrechungen."
    },
    "Industry - Manufacturing": {
        "Pains": "Hochbezahlte Facharbeiter unterbrechen die Wertschöpfung für unproduktive Such- und Holzeiten von Material (C-Teile). Intransparente Materialflüsse an der Linie führen zu Mikrostillständen und gefährden die Taktzeit.",
        "Gains": "Just-in-Time Materialversorgung direkt an die Linie. Fachkräfte bleiben an der Maschine. Stabilisierung der Taktzeiten und OEE durch automatisierten Nachschub."
    },
    "Reinigungsdienstleister": {  # Facility Management
        "Pains": "Margendruck durch steigende Tariflöhne bei gleichzeitigem Preisdiktat der Auftraggeber. Hohe Fluktuation (>30%) führt zu ständiger Rekrutierung ('No-Show'-Quote), was Objektleiter bindet und die Qualitätskontrolle vernachlässigt.",
        "Gains": "Kalkulationssicherheit durch Fixkosten statt variabler Personalkosten. Garantierte Reinigungsleistung in Objekten unabhängig vom Personalstand. Innovationsträger für Ausschreibungen."
    },
    "Retail - Food": {  # Supermarkets
        "Pains": "Reinigungskosten steigen linear zur Fläche, während Kundenfrequenz schwankt. Sichtbare Reinigungsmaschinen blockieren tagsüber Kundenwege ('Störfaktor'). Abends/Nachts schwer Personal zu finden.",
        "Gains": "Unsichtbare Reinigung: Roboter fahren in Randzeiten oder weichen Kunden dynamisch aus. Konstantes Sauberkeits-Level ('Lobby-Effekt') steigert Verweildauer."
    }
}
def update_vertical(vertical_name, new_data):
    """Locate the Notion page whose 'Vertical' title contains
    `vertical_name` and overwrite its 'Pains'/'Gains' rich-text
    properties with the values from `new_data`.
    """
    def as_rich_text(content):
        # Notion payload shape for a plain-string rich_text property.
        return {"rich_text": [{"text": {"content": content}}]}

    # 1. Find Page ID
    search = requests.post(
        f"https://api.notion.com/v1/databases/{NOTION_DB_ID}/query",
        headers=headers,
        json={"filter": {"property": "Vertical", "title": {"contains": vertical_name}}},
    )
    if search.status_code != 200:
        print(f"Error searching {vertical_name}: {search.status_code}")
        return
    matches = search.json().get("results", [])
    if not matches:
        print(f"Skipping {vertical_name} (Not found)")
        return
    # 2. Update Page
    page_id = matches[0]["id"]
    patch = requests.patch(
        f"https://api.notion.com/v1/pages/{page_id}",
        headers=headers,
        json={
            "properties": {
                "Pains": as_rich_text(new_data["Pains"]),
                "Gains": as_rich_text(new_data["Gains"]),
            }
        },
    )
    if patch.status_code == 200:
        print(f"✅ Updated {vertical_name}")
    else:
        print(f"❌ Failed to update {vertical_name}: {patch.text}")
print("Starting Notion Update...")
for v_name, data in updates.items():
    update_vertical(v_name, data)
print("Done.")

View File

@@ -0,0 +1,123 @@
import sys
import os
import json
# Setup Environment to import backend modules
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))
from backend.database import SessionLocal, Persona, JobRoleMapping
def seed_archetypes():
    """Seed the four strategic buyer archetypes (Personas) and rebuild the
    JobRoleMapping patterns that route raw CRM job titles onto them.

    Destructive by design: wipes all existing Persona and JobRoleMapping rows
    so the database holds exactly the archetypes defined below. The session
    is always closed, and a failed delete is rolled back so the session stays
    usable for the subsequent inserts.
    """
    db = SessionLocal()
    print("Seeding Strategic Archetypes (Pains & Gains)...")
    # --- 1. The 4 Strategic Archetypes ---
    # Based on user input and synthesis of previous specific roles.
    # Pains/gains are customer-facing German copy, stored as JSON lists.
    archetypes = [
        {
            "name": "Operativer Entscheider",
            "pains": [
                "Personelle Unterbesetzung und hohe Fluktuation führen zu Überstunden und Qualitätsmängeln.",
                "Manuelle, wiederkehrende Prozesse binden wertvolle Ressourcen und senken die Effizienz.",
                "Sicherstellung gleichbleibend hoher Standards (Hygiene/Service) ist bei Personalmangel kaum möglich."
            ],
            "gains": [
                "Spürbare Entlastung des Teams von Routineaufgaben (20-40%).",
                "Garantierte, gleichbleibend hohe Ausführungsqualität rund um die Uhr.",
                "Stabilisierung der operativen Abläufe unabhängig von kurzfristigen Personalausfällen."
            ]
        },
        {
            "name": "Infrastruktur-Verantwortlicher",
            "pains": [
                "Integration neuer Systeme in bestehende Gebäude/IT ist oft komplex und risikobehaftet.",
                "Sorge vor hohen Ausfallzeiten und aufwändiger Fehlerbehebung ohne internes Spezialwissen.",
                "Unklare Wartungsaufwände und Schnittstellenprobleme (WLAN, Aufzüge, Türen)."
            ],
            "gains": [
                "Reibungslose, fachgerechte Integration in die bestehende Infrastruktur.",
                "Maximale Betriebssicherheit durch proaktives Monitoring und schnelle Reaktionszeiten.",
                "Volle Transparenz über Systemstatus und Wartungsbedarf."
            ]
        },
        {
            "name": "Wirtschaftlicher Entscheider",
            "pains": [
                "Steigende operative Kosten (Personal, Material) drücken auf die Margen.",
                "Unklare Amortisation (ROI) und Risiko von Fehlinvestitionen bei neuen Technologien.",
                "Intransparente Folgekosten (TCO) über die Lebensdauer der Anlagen."
            ],
            "gains": [
                "Nachweisbare Senkung der operativen Kosten (10-25%).",
                "Transparente und planbare Kostenstruktur (TCO) ohne versteckte Überraschungen.",
                "Schneller, messbarer Return on Investment durch Effizienzsteigerung."
            ]
        },
        {
            "name": "Innovations-Treiber",
            "pains": [
                "Verlust der Wettbewerbsfähigkeit durch veraltete Prozesse und Kundenangebote.",
                "Schwierigkeit, das Unternehmen als modernes, zukunftsorientiertes Brand zu positionieren.",
                "Verpasste Chancen durch fehlende Datengrundlage für Optimierungen."
            ],
            "gains": [
                "Positionierung als Innovationsführer und Steigerung der Arbeitgeberattraktivität.",
                "Nutzung modernster Technologie als sichtbares Differenzierungsmerkmal.",
                "Gewinnung wertvoller Daten zur kontinuierlichen Prozessoptimierung."
            ]
        }
    ]
    try:
        # Clear existing Personas to avoid mix-up with old granular ones
        # (a clean slate is acceptable here: this is a seed script).
        try:
            db.query(Persona).delete()
            db.commit()
            print("Cleared old Personas.")
        except Exception as e:
            # A failed flush leaves the session in an unusable state until
            # rolled back; without this the inserts below would also fail.
            db.rollback()
            print(f"Warning clearing personas: {e}")
        for p_data in archetypes:
            print(f"Creating Archetype: {p_data['name']}")
            db.add(Persona(
                name=p_data["name"],
                pains=json.dumps(p_data["pains"]),
                gains=json.dumps(p_data["gains"])
            ))
        db.commit()
        # --- 2. Update JobRoleMappings to map to Archetypes ---
        # SQL LIKE patterns (lowercased job titles) mapped onto the 4 archetypes.
        mapping_updates = [
            # Wirtschaftlicher Entscheider
            {"role": "Wirtschaftlicher Entscheider", "patterns": ["%geschäftsführer%", "%ceo%", "%director%", "%einkauf%", "%procurement%", "%finance%", "%cfo%"]},
            # Operativer Entscheider
            {"role": "Operativer Entscheider", "patterns": ["%housekeeping%", "%hausdame%", "%hauswirtschaft%", "%reinigung%", "%restaurant%", "%f&b%", "%werksleiter%", "%produktionsleiter%", "%lager%", "%logistik%", "%operations%", "%coo%"]},
            # Infrastruktur-Verantwortlicher
            {"role": "Infrastruktur-Verantwortlicher", "patterns": ["%facility%", "%technik%", "%instandhaltung%", "%it-leiter%", "%cto%", "%admin%", "%building%"]},
            # Innovations-Treiber
            {"role": "Innovations-Treiber", "patterns": ["%innovation%", "%digital%", "%transformation%", "%business dev%", "%marketing%"]}
        ]
        # Clear old mappings to prevent confusion
        db.query(JobRoleMapping).delete()
        db.commit()
        print("Cleared old JobRoleMappings.")
        for group in mapping_updates:
            role_name = group["role"]
            for pattern in group["patterns"]:
                print(f"Mapping '{pattern}' -> '{role_name}'")
                db.add(JobRoleMapping(pattern=pattern, role=role_name))
        db.commit()
        print("Archetypes and Mappings Seeded Successfully.")
    finally:
        # Always release the session, even if a commit above raised.
        db.close()
if __name__ == "__main__":
    seed_archetypes()

View File

@@ -0,0 +1,134 @@
import sys
import os
import requests
import json
import logging
# Add company-explorer to path (parent of backend)
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
from backend.database import SessionLocal, Persona, init_db
from backend.config import settings
# Setup Logging
# Setup Logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Token file mounted into the container (written outside this script).
NOTION_TOKEN_FILE = "/app/notion_token.txt"
# Sector & Persona Master DB
PERSONAS_DB_ID = "2e288f42-8544-8113-b878-ec99c8a02a6b"
# Only these four archetype names are synced; all other Notion pages
# in the Personas DB are skipped.
VALID_ARCHETYPES = {
    "Wirtschaftlicher Entscheider",
    "Operativer Entscheider",
    "Infrastruktur-Verantwortlicher",
    "Innovations-Treiber"
}
def load_notion_token():
    """Read and return the Notion API token from the mounted token file.

    Exits the process with status 1 if the file is missing.
    """
    try:
        with open(NOTION_TOKEN_FILE, "r") as handle:
            raw = handle.read()
    except FileNotFoundError:
        logger.error(f"Notion token file not found at {NOTION_TOKEN_FILE}")
        sys.exit(1)
    return raw.strip()
def query_notion_db(token, db_id):
    """Fetch every page of a Notion database, following pagination cursors.

    Returns the accumulated list of page objects. On an HTTP error the
    error is logged and whatever was collected so far is returned.
    """
    endpoint = f"https://api.notion.com/v1/databases/{db_id}/query"
    request_headers = {
        "Authorization": f"Bearer {token}",
        "Notion-Version": "2022-06-28",
        "Content-Type": "application/json"
    }
    pages = []
    cursor = None
    while True:
        body = {"start_cursor": cursor} if cursor else {}
        response = requests.post(endpoint, headers=request_headers, json=body)
        if response.status_code != 200:
            logger.error(f"Error querying Notion DB {db_id}: {response.text}")
            break
        data = response.json()
        pages.extend(data.get("results", []))
        if not data.get("has_more", False):
            break
        cursor = data.get("next_cursor")
    return pages
def extract_title(prop):
    """Concatenate the plain-text fragments of a Notion title property.

    Returns "" when the property is missing or empty.
    """
    if not prop:
        return ""
    fragments = (part.get("plain_text", "") for part in prop.get("title", []))
    return "".join(fragments)
def extract_rich_text_to_list(prop):
    """
    Extracts rich text and converts bullet points/newlines into a list of strings.

    Returns [] for a missing/empty property. Leading "- ", "• " or "* "
    bullet markers are stripped from each non-empty line.
    """
    if not prop:
        return []
    full_text = "".join([t.get("plain_text", "") for t in prop.get("rich_text", [])])
    # Split by newline and clean up bullets
    cleaned_lines = []
    for line in full_text.split('\n'):
        line = line.strip()
        if not line:
            continue
        # BUG FIX: the previous second check was `line.startswith("")`, which
        # is always True and silently chopped the first two characters off
        # every line that did not start with "- ".
        for bullet in ("- ", "• ", "* "):
            if line.startswith(bullet):
                line = line[len(bullet):]
                break
        cleaned_lines.append(line)
    return cleaned_lines
def sync_personas(token, session):
    """Upsert the four archetype Personas from the Notion master DB into the
    local database. Pages whose title is not a known archetype are skipped.
    """
    logger.info("Syncing Personas from Notion...")
    synced = 0
    for page in query_notion_db(token, PERSONAS_DB_ID):
        properties = page.get("properties", {})
        name = extract_title(properties.get("Name"))
        if name not in VALID_ARCHETYPES:
            logger.debug(f"Skipping '{name}' (Not a target Archetype)")
            continue
        logger.info(f"Processing Persona: {name}")
        pains = extract_rich_text_to_list(properties.get("Pains"))
        gains = extract_rich_text_to_list(properties.get("Gains"))
        # Upsert Logic: reuse the existing row if present, else create one.
        persona = session.query(Persona).filter(Persona.name == name).first()
        if persona:
            logger.info(f" -> Updating existing entry")
        else:
            persona = Persona(name=name)
            session.add(persona)
            logger.info(f" -> Creating new entry")
        persona.pains = json.dumps(pains, ensure_ascii=False)
        persona.gains = json.dumps(gains, ensure_ascii=False)
        synced += 1
    session.commit()
    logger.info(f"Sync complete. Updated {synced} personas.")
if __name__ == "__main__":
    # Entry point: read the token, open one session, run the sync,
    # and always release the session afterwards.
    notion_token = load_notion_token()
    session = SessionLocal()
    try:
        sync_personas(notion_token, session)
    except Exception as e:
        logger.error(f"Sync failed: {e}", exc_info=True)
    finally:
        session.close()

View File

@@ -7,7 +7,7 @@ import logging
# /app/backend/scripts/sync.py -> /app
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
from backend.database import SessionLocal, Industry, RoboticsCategory, init_db
from backend.database import SessionLocal, Industry, RoboticsCategory, Persona, init_db
from dotenv import load_dotenv
# Try loading from .env in root if exists
@@ -76,6 +76,21 @@ def extract_number(prop):
if not prop or "number" not in prop: return None
return prop["number"]
def extract_rich_text_to_list(prop):
    """Convert a Notion rich-text property into a list of cleaned strings,
    one per non-empty line, with leading bullet markers removed.
    """
    if not prop or "rich_text" not in prop:
        return []
    full_text = "".join([t.get("plain_text", "") for t in prop.get("rich_text", [])])
    cleaned_lines = []
    for line in full_text.split('\n'):
        line = line.strip()
        if not line:
            continue
        # BUG FIX: the former `elif line.startswith("")` branch is always
        # True, so every non-"- " line lost its first two characters.
        for bullet in ("- ", "• ", "* "):
            if line.startswith(bullet):
                line = line[len(bullet):]
                break
        cleaned_lines.append(line)
    return cleaned_lines
def sync():
logger.info("--- Starting Enhanced Sync ---")
@@ -83,6 +98,48 @@ def sync():
init_db()
session = SessionLocal()
# --- 4. Sync Personas (NEW) ---
# Sector & Persona Master ID
PERSONAS_DB_ID = "2e288f42-8544-8113-b878-ec99c8a02a6b"
VALID_ARCHETYPES = {
"Wirtschaftlicher Entscheider",
"Operativer Entscheider",
"Infrastruktur-Verantwortlicher",
"Innovations-Treiber"
}
if PERSONAS_DB_ID:
logger.info(f"Syncing Personas from {PERSONAS_DB_ID}...")
pages = query_all(PERSONAS_DB_ID)
p_count = 0
# We assume Personas are cumulative, so we don't delete all first (safer for IDs)
# But we could if we wanted a clean slate. Upsert is better.
for page in pages:
props = page["properties"]
name = extract_title(props.get("Name"))
if name not in VALID_ARCHETYPES:
continue
import json
pains_list = extract_rich_text_to_list(props.get("Pains"))
gains_list = extract_rich_text_to_list(props.get("Gains"))
persona = session.query(Persona).filter(Persona.name == name).first()
if not persona:
persona = Persona(name=name)
session.add(persona)
persona.pains = json.dumps(pains_list, ensure_ascii=False)
persona.gains = json.dumps(gains_list, ensure_ascii=False)
p_count += 1
session.commit()
logger.info(f"✅ Synced {p_count} Personas.")
# 2. Sync Categories (Products)
cat_db_id = find_db_id("Product Categories") or find_db_id("Products")
if cat_db_id:

View File

@@ -0,0 +1,47 @@
import sys
import os
# Setup Environment
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))
from backend.database import SessionLocal, JobRoleMapping, Persona
def test_mapping(job_title):
    """Print how `job_title` resolves: first the matching JobRoleMapping
    pattern, then the Persona registered under the mapped role name.

    The DB session is always closed — the original leaked it when no
    pattern matched (early `return` skipped `db.close()`).
    """
    db = SessionLocal()
    try:
        print(f"\n--- Testing Mapping for '{job_title}' ---")
        # 1. Find Role Name via JobRoleMapping (LIKE patterns, '%' stripped
        # so they can be tested with a plain substring check).
        role_name = None
        for m in db.query(JobRoleMapping).all():
            pattern_clean = m.pattern.replace("%", "").lower()
            if pattern_clean in job_title.lower():
                role_name = m.role
                print(f" -> Matched Pattern: '{m.pattern}' => Role: '{role_name}'")
                break
        if not role_name:
            print(" -> No Pattern Matched.")
            return
        # 2. Find Persona via Role Name
        persona = db.query(Persona).filter(Persona.name == role_name).first()
        if persona:
            print(f" -> Found Persona ID: {persona.id} (Name: {persona.name})")
        else:
            print(f" -> ERROR: Persona '{role_name}' not found in DB!")
    finally:
        db.close()
if __name__ == "__main__":
    # Smoke-test the mapping against a representative set of CRM job titles.
    for sample_title in (
        "Leiter Hauswirtschaft",
        "CTO",
        "Geschäftsführer",
        "Head of Marketing",
        "Einkaufsleiter",
    ):
        test_mapping(sample_title)

View File

@@ -0,0 +1,33 @@
import sys
import os
# Add parent directory to path to allow import of backend.database
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))
# Import everything to ensure metadata is populated
from backend.database import engine, Base, Company, Contact, Industry, JobRoleMapping, Persona, Signal, EnrichmentData, RoboticsCategory, ImportLog, ReportedMistake, MarketingMatrix
def migrate():
    """Apply the schema migration: drop the outdated marketing_matrix table,
    then (re)create all ORM tables — including the new 'personas' table.

    Drop failures are printed and do not abort the migration; create_all
    always runs.
    """
    print("Migrating Database Schema...")
    try:
        # Hacky migration for MarketingMatrix: drop if exists to enforce new schema.
        # BUG FIX: use engine.begin() so the DDL is committed — with a plain
        # engine.connect() SQLAlchemy 2.x rolls the transaction back on close,
        # silently undoing the DROP.
        with engine.begin() as con:
            print("Dropping old MarketingMatrix table to enforce schema change...")
            try:
                from sqlalchemy import text
                con.execute(text("DROP TABLE IF EXISTS marketing_matrix"))
                print("Dropped marketing_matrix.")
            except Exception as e:
                print(f"Could not drop marketing_matrix: {e}")
    except Exception as e:
        print(f"Pre-migration cleanup error: {e}")
    # This creates 'personas' table AND re-creates 'marketing_matrix'
    Base.metadata.create_all(bind=engine)
    print("Migration complete. 'personas' table created and 'marketing_matrix' refreshed.")
if __name__ == "__main__":
    migrate()

View File

@@ -32,6 +32,7 @@ export function RoboticsSettings({ isOpen, onClose, apiBase }: RoboticsSettingsP
const [roboticsCategories, setRoboticsCategories] = useState<any[]>([])
const [industries, setIndustries] = useState<any[]>([])
const [jobRoles, setJobRoles] = useState<any[]>([])
const [rawJobTitles, setRawJobTitles] = useState<any[]>([])
const [reportedMistakes, setReportedMistakes] = useState<ReportedMistake[]>([])
const [currentMistakeStatusFilter, setCurrentMistakeStatusFilter] = useState<string>("PENDING");
const [isLoading, setIsLoading] = useState(false);
@@ -39,15 +40,17 @@ export function RoboticsSettings({ isOpen, onClose, apiBase }: RoboticsSettingsP
const fetchAllData = async () => {
setIsLoading(true);
try {
const [resRobotics, resIndustries, resJobRoles, resMistakes] = await Promise.all([
const [resRobotics, resIndustries, resJobRoles, resRawTitles, resMistakes] = await Promise.all([
axios.get(`${apiBase}/robotics/categories`),
axios.get(`${apiBase}/industries`),
axios.get(`${apiBase}/job_roles`),
axios.get(`${apiBase}/job_roles/raw`),
axios.get(`${apiBase}/mistakes?status=${currentMistakeStatusFilter}`),
]);
setRoboticsCategories(resRobotics.data);
setIndustries(resIndustries.data);
setJobRoles(resJobRoles.data);
setRawJobTitles(resRawTitles.data);
setReportedMistakes(resMistakes.data.items);
} catch (e) {
console.error("Failed to fetch settings data:", e);
@@ -251,22 +254,58 @@ export function RoboticsSettings({ isOpen, onClose, apiBase }: RoboticsSettingsP
</div>
</div>
<div key="roles-content" className={clsx("space-y-4", { 'hidden': isLoading || activeTab !== 'roles' })}>
<div className="flex justify-between items-center"><h3 className="text-sm font-bold text-slate-700 dark:text-slate-300">Job Title Mapping Patterns</h3><button onClick={handleAddJobRole} className="flex items-center gap-1 px-3 py-1.5 bg-blue-600 hover:bg-blue-500 text-white text-xs font-bold rounded"><Plus className="h-3 w-3" /> ADD PATTERN</button></div>
<div className="bg-slate-50 dark:bg-slate-950 border border-slate-200 dark:border-slate-800 rounded-lg overflow-hidden">
<table className="w-full text-left text-xs">
<thead className="bg-slate-100 dark:bg-slate-900 border-b border-slate-200 dark:border-slate-800 text-slate-500 font-bold uppercase"><tr><th className="p-3">Job Title Pattern (Regex/Text)</th><th className="p-3">Mapped Role</th><th className="p-3 w-10"></th></tr></thead>
<tbody className="divide-y divide-slate-200 dark:divide-slate-800">
{jobRoles.map(role => (
<tr key={role.id} className="group">
<td className="p-2"><input className="w-full bg-transparent border border-transparent hover:border-slate-300 dark:hover:border-slate-700 rounded px-2 py-1 text-slate-900 dark:text-slate-200 outline-none focus:border-blue-500" defaultValue={role.pattern} /></td>
<td className="p-2"><select className="w-full bg-transparent border border-transparent hover:border-slate-300 dark:hover:border-slate-700 rounded px-2 py-1 text-slate-900 dark:text-slate-200 outline-none focus:border-blue-500" defaultValue={role.role}><option>Operativer Entscheider</option><option>Infrastruktur-Verantwortlicher</option><option>Wirtschaftlicher Entscheider</option><option>Innovations-Treiber</option></select></td>
<td className="p-2 text-center"><button onClick={() => handleDeleteJobRole(role.id)} className="text-slate-400 hover:text-red-500 opacity-0 group-hover:opacity-100 transition-opacity"><Trash2 className="h-4 w-4" /></button></td>
</tr>
))}
{jobRoles.length === 0 && (<tr><td colSpan={3} className="p-8 text-center text-slate-500 italic">No patterns defined yet.</td></tr>)}
</tbody>
</table>
<div key="roles-content" className={clsx("space-y-8", { 'hidden': isLoading || activeTab !== 'roles' })}>
{/* Existing Patterns */}
<div className="space-y-4">
<div className="flex justify-between items-center">
<div>
<h3 className="text-sm font-bold text-slate-700 dark:text-slate-300">Active Mapping Patterns</h3>
<p className="text-[10px] text-slate-500 uppercase font-semibold">Deterministic Regex/Text rules</p>
</div>
<button onClick={handleAddJobRole} className="flex items-center gap-1 px-3 py-1.5 bg-blue-600 hover:bg-blue-500 text-white text-xs font-bold rounded shadow-lg shadow-blue-500/20"><Plus className="h-3 w-3" /> ADD PATTERN</button>
</div>
<div className="bg-white dark:bg-slate-950 border border-slate-200 dark:border-slate-800 rounded-xl overflow-hidden shadow-sm">
<table className="w-full text-left text-xs">
<thead className="bg-slate-50 dark:bg-slate-900/50 border-b border-slate-200 dark:border-slate-800 text-slate-500 font-bold uppercase tracking-wider"><tr><th className="p-3">Pattern (% for wildcard)</th><th className="p-3">Target Persona Role</th><th className="p-3 w-10"></th></tr></thead>
<tbody className="divide-y divide-slate-100 dark:divide-slate-800/50">
{jobRoles.map(role => (
<tr key={role.id} className="group hover:bg-slate-50/50 dark:hover:bg-slate-800/30 transition-colors">
<td className="p-2"><input className="w-full bg-transparent border border-transparent hover:border-slate-300 dark:hover:border-slate-700 rounded px-2 py-1 text-slate-900 dark:text-slate-200 outline-none focus:border-blue-500 font-mono" defaultValue={role.pattern} /></td>
<td className="p-2"><select className="w-full bg-transparent border border-transparent hover:border-slate-300 dark:hover:border-slate-700 rounded px-2 py-1 text-slate-900 dark:text-slate-200 outline-none focus:border-blue-500" defaultValue={role.role}><option>Operativer Entscheider</option><option>Infrastruktur-Verantwortlicher</option><option>Wirtschaftlicher Entscheider</option><option>Innovations-Treiber</option><option>Influencer</option></select></td>
<td className="p-2 text-center"><button onClick={() => handleDeleteJobRole(role.id)} className="text-slate-400 hover:text-red-500 opacity-0 group-hover:opacity-100 transition-all transform hover:scale-110"><Trash2 className="h-4 w-4" /></button></td>
</tr>
))}
</tbody>
</table>
</div>
</div>
{/* Discovery Inbox */}
<div className="space-y-4 pt-4 border-t border-slate-200 dark:border-slate-800">
<div className="flex justify-between items-center">
<div>
<h3 className="text-sm font-bold text-slate-700 dark:text-slate-300">Discovery Inbox</h3>
<p className="text-[10px] text-slate-500 uppercase font-semibold">Unmapped job titles from CRM, prioritized by frequency</p>
</div>
</div>
<div className="bg-slate-50/50 dark:bg-slate-900/20 border border-dashed border-slate-300 dark:border-slate-700 rounded-xl overflow-hidden">
<table className="w-full text-left text-xs">
<thead className="bg-slate-100/50 dark:bg-slate-900/80 border-b border-slate-200 dark:border-slate-800 text-slate-400 font-bold uppercase tracking-wider"><tr><th className="p-3">Job Title from CRM</th><th className="p-3 w-20 text-center">Frequency</th><th className="p-3 w-10"></th></tr></thead>
<tbody className="divide-y divide-slate-100 dark:divide-slate-800/50">
{rawJobTitles.map(raw => (
<tr key={raw.id} className="group hover:bg-white dark:hover:bg-slate-800 transition-colors">
<td className="p-3 font-medium text-slate-600 dark:text-slate-400 italic">{raw.title}</td>
<td className="p-3 text-center"><span className="px-2 py-1 bg-slate-200 dark:bg-slate-800 rounded-full font-bold text-[10px] text-slate-500">{raw.count}x</span></td>
<td className="p-3 text-center"><button onClick={async () => {
await axios.post(`${apiBase}/job_roles`, { pattern: `%${raw.title.toLowerCase()}%`, role: "Influencer" });
fetchAllData();
}} className="p-1 text-blue-500 hover:bg-blue-100 dark:hover:bg-blue-900/30 rounded transition-all"><Plus className="h-4 w-4" /></button></td>
</tr>
))}
{rawJobTitles.length === 0 && (<tr><td colSpan={3} className="p-12 text-center text-slate-400 italic">Discovery inbox is empty. Import raw job titles to see data here.</td></tr>)}
</tbody>
</table>
</div>
</div>
</div>

View File

@@ -1,90 +1,113 @@
# SuperOffice Connector ("The Muscle") - GTM Engine
# SuperOffice Connector & GTM Engine ("The Muscle & The Brain")
Dies ist der "dumme" Microservice zur Anbindung von **SuperOffice CRM** an die **Company Explorer Intelligence**.
Der Connector agiert als reiner Bote ("Muscle"): Er nimmt Webhook-Events entgegen, fragt das "Gehirn" (Company Explorer) nach Instruktionen und führt diese im CRM aus.
Dieses Dokument beschreibt die Architektur der **Go-to-Market (GTM) Engine**, die SuperOffice CRM mit der Company Explorer Intelligence verbindet.
## 1. Architektur: "The Intelligent Hub & The Loyal Messenger"
Ziel des Systems ist der vollautomatisierte Versand von **hyper-personalisierten E-Mails**, die so wirken, als wären sie manuell von einem Branchenexperten geschrieben worden.
Wir haben uns für eine **Event-gesteuerte Architektur** entschieden, um Skalierbarkeit und Echtzeit-Verarbeitung zu gewährleisten.
---
**Der Datenfluss:**
1. **Auslöser:** User ändert in SuperOffice einen Kontakt (z.B. Status -> `Init`).
2. **Transport:** SuperOffice sendet ein `POST` Event an unseren Webhook-Endpunkt (`:8003/webhook`).
3. **Queueing:** Der `Webhook Receiver` validiert das Event und legt es sofort in eine lokale `SQLite`-Queue (`connector_queue.db`).
4. **Verarbeitung:** Ein separater `Worker`-Prozess holt den Job ab.
5. **Provisioning:** Der Worker fragt den **Company Explorer** (`POST /api/provision/superoffice-contact`): "Was soll ich mit Person ID 123 tun?".
6. **Write-Back:** Der Company Explorer liefert das fertige Text-Paket (Subject, Intro, Proof) zurück. Der Worker schreibt dies via REST API in die UDF-Felder von SuperOffice.
## 1. Das Konzept: "Static Magic"
## 2. Kern-Komponenten
Anders als bei üblichen KI-Tools, die E-Mails "on the fly" generieren, setzt dieses System auf **vorberechnete, statische Textbausteine**.
* **`webhook_app.py` (FastAPI):**
* Lauscht auf Port `8000` (Extern: `8003`).
* Nimmt Events entgegen, prüft Token (`WEBHOOK_SECRET`).
* Schreibt Jobs in die Queue.
* Endpunkt: `POST /webhook`.
**Warum?**
1. **Qualitätssicherung:** Jeder Baustein kann vor dem Versand geprüft werden.
2. **Performance:** SuperOffice muss beim Versand keine KI anfragen, sondern nur Felder zusammenfügen.
3. **Konsistenz:** Ein "Finanzleiter im Maschinenbau" bekommt immer dieselbe, perfekte Argumentation – egal bei welchem Unternehmen.
* **`queue_manager.py` (SQLite):**
* Verwaltet die lokale Job-Queue.
* Status: `PENDING` -> `PROCESSING` -> `COMPLETED` / `FAILED`.
* Persistiert Jobs auch bei Container-Neustart.
### Die E-Mail-Formel
* **`worker.py`:**
* Läuft als Hintergrundprozess.
* Pollt die Queue alle 5 Sekunden.
* Kommuniziert mit Company Explorer (Intern: `http://company-explorer:8000`) und SuperOffice API.
* Behandelt Fehler und Retries.
Eine E-Mail setzt sich aus **drei statischen Komponenten** zusammen, die im CRM (SuperOffice) gespeichert sind:
* **`superoffice_client.py`:**
* Kapselt die SuperOffice REST API (Auth, GET, PUT).
* Verwaltet Refresh Tokens.
## 3. Setup & Konfiguration
### Docker Service
Der Service läuft im Container `connector-superoffice`.
Startet via `start.sh` sowohl den Webserver als auch den Worker.
### Konfiguration (`.env`)
Der Connector benötigt folgende Variablen (in `docker-compose.yml` gesetzt):
```yaml
environment:
API_USER: "admin"
API_PASSWORD: "..."
COMPANY_EXPLORER_URL: "http://company-explorer:8000" # Interne Docker-Adresse
WEBHOOK_SECRET: "changeme" # Muss mit SO-Webhook Config übereinstimmen
# Plus die SuperOffice Credentials (Client ID, Secret, Refresh Token)
```text
[1. Opener (Unternehmens-Spezifisch)] + [2. Bridge (Persona x Vertical)] + [3. Social Proof (Vertical)]
```
## 4. API-Schnittstelle (Intern)
* **1. Opener (Der Haken):** Bezieht sich zu 100% auf das spezifische Unternehmen und dessen Geschäftsmodell.
* *Quelle:* `Company`-Objekt (Feld: `ai_opener`).
* *Beispiel:* "Die präzise Just-in-Time-Fertigung von **Müller CNC** erfordert einen reibungslosen Materialfluss ohne Mikrostillstände."
* **2. Bridge (Die Relevanz):** Holt die Person in ihrer Rolle ab und verknüpft sie mit dem Branchen-Pain.
* *Quelle:* `Matrix`-Tabelle (Feld: `intro`).
* *Beispiel:* "Für Sie als **Produktionsleiter** bedeutet das, trotz Fachkräftemangel die Taktzeiten an der Linie stabil zu halten."
* **3. Social Proof (Die Lösung):** Zeigt Referenzen und den konkreten Nutzen (Gains).
* *Quelle:* `Matrix`-Tabelle (Feld: `social_proof`).
* *Beispiel:* "Unternehmen wie **Jungheinrich** nutzen unsere Transportroboter, um Fachkräfte an der Maschine zu halten und Suchzeiten um 30% zu senken."
Der Connector ruft den Company Explorer auf und liefert dabei **Live-Daten** aus dem CRM für den "Double Truth" Abgleich:
---
**Request:** `POST /api/provision/superoffice-contact`
```json
{
"so_contact_id": 12345,
"so_person_id": 67890,
"crm_name": "RoboPlanet GmbH",
"crm_website": "www.roboplanet.de",
"job_title": "Geschäftsführer"
}
```
## 2. Die Datenbasis (Foundation)
**Response:**
```json
{
"status": "success",
"texts": {
"subject": "Optimierung Ihrer Logistik...",
"intro": "Als Logistikleiter kennen Sie...",
"social_proof": "Wir helfen bereits Firma X..."
}
}
```
Die Qualität der Texte steht und fällt mit der Datenbasis. Diese wird zentral in **Notion** gepflegt und in den Company Explorer synchronisiert.
## 5. Offene To-Dos (Roadmap)
### A. Verticals (Branchen)
Definiert die **Makro-Pains** und **Gains** einer Branche sowie das **passende Produkt**.
* *Beispiel:* Healthcare -> Pain: "Pflegekräfte machen Logistik" -> Gain: "Hände fürs Bett" -> Produkt: Service-Roboter.
* *Wichtig:* Unterscheidung nach **Ops-Focus** (Operativ vs. Infrastruktur) steuert das Produkt (Reinigung vs. Service).
* [ ] **UDF-Mapping:** Aktuell sind die `ProgId`s (z.B. `SuperOffice:5`) im Code (`worker.py`) hartkodiert. Dies muss in eine Config ausgelagert werden.
* [ ] **Fehlerbehandlung:** Was passiert, wenn der Company Explorer "404 Not Found" meldet? (Aktuell: Log Warning & Skip).
* [ ] **Redis:** Bei sehr hoher Last (>100 Events/Sekunde) sollte die SQLite-Queue durch Redis ersetzt werden.
### B. Personas (Rollen)
Definiert die **persönlichen Pains** einer Rolle.
* *Beispiel:* Produktionsleiter -> Pain: "OEE / Taktzeit".
* *Beispiel:* Geschäftsführer -> Pain: "ROI / Amortisation".
---
## 3. Die Matrix-Engine (Multiplikation)
Das Skript `generate_matrix.py` (im Backend) ist das Herzstück. Es berechnet **alle möglichen Kombinationen** aus Verticals und Personas voraus.
**Logik:**
1. Lade alle Verticals (`V`) und Personas (`P`).
2. Für jede Kombination `V x P`:
* Lade `V.Pains` und `P.Pains`.
* Generiere via Gemini einen **perfekten Satz 2 (Bridge)** und **Satz 3 (Proof)**.
* Generiere ein **Subject**, das den Persona-Pain trifft.
3. Speichere das Ergebnis in der Tabelle `marketing_matrix`.
*Ergebnis:* Eine Lookup-Tabelle, aus der für jeden Kontakt sofort der passende Text gezogen werden kann.
---
## 4. Der "Opener" (First Sentence)
Dieser Baustein ist der einzige, der **pro Unternehmen** generiert wird (bei der Analyse/Discovery).
**Logik:**
1. Scrape Website-Content.
2. Identifiziere das **Vertical** (z.B. Maschinenbau).
3. Lade den **Core-Pain** des Verticals (z.B. "Materialfluss").
4. **Prompt:** "Analysiere das Geschäftsmodell von [Firma]. Formuliere einen Satz, der erklärt, warum [Core-Pain] für genau dieses Geschäftsmodell kritisch ist."
*Ergebnis:* Ein Satz, der beweist: "Ich habe verstanden, was ihr tut."
---
## 5. SuperOffice Connector ("The Muscle")
Der Connector ist der Bote, der diese Daten in das CRM bringt.
**Workflow:**
1. **Trigger:** Kontakt-Änderung in SuperOffice (Webhook).
2. **Enrichment:** Connector fragt Company Explorer: "Gib mir Daten für Firma X, Person Y".
3. **Lookup:** Company Explorer...
* Holt den `Opener` aus der Company-Tabelle.
* Bestimmt `Vertical` und `Persona`.
* Sucht den passenden Eintrag in der `MarketingMatrix`.
4. **Write-Back:** Connector schreibt die Texte in die UDF-Felder (User Defined Fields) des Kontakts in SuperOffice.
* `UDF_Opener`
* `UDF_Bridge`
* `UDF_Proof`
* `UDF_Subject`
---
## 6. Setup & Wartung
### Neue Branche hinzufügen
1. In **Notion** anlegen (Pains/Gains/Produkte definieren).
2. Sync-Skript laufen lassen: `python3 backend/scripts/sync_notion_industries.py`.
3. Matrix neu berechnen: `python3 backend/scripts/generate_matrix.py --live`.
### Prompt-Tuning
Die Prompts für Matrix und Opener liegen in:
* Matrix: `backend/scripts/generate_matrix.py`
* Opener: `backend/services/classification.py` (oder `enrichment.py`)

View File

@@ -1,45 +1,44 @@
import os
from dotenv import load_dotenv
from pydantic_settings import BaseSettings
# Load environment variables
if os.path.exists(".env"):
load_dotenv(".env", override=True)
elif os.path.exists("../.env"):
load_dotenv("../.env", override=True)
class Config:
# SuperOffice API Configuration
SO_CLIENT_ID = os.getenv("SO_SOD")
SO_CLIENT_SECRET = os.getenv("SO_CLIENT_SECRET")
SO_CONTEXT_IDENTIFIER = os.getenv("SO_CONTEXT_IDENTIFIER")
SO_REFRESH_TOKEN = os.getenv("SO_REFRESH_TOKEN")
class Settings(BaseSettings):
# --- Infrastructure ---
# Internal Docker URL for Company Explorer
COMPANY_EXPLORER_URL: str = "http://company-explorer:8000"
# Company Explorer Configuration
CE_API_URL = os.getenv("CE_API_URL", "http://company-explorer:8000")
CE_API_USER = os.getenv("CE_API_USER", "admin")
CE_API_PASSWORD = os.getenv("CE_API_PASSWORD", "gemini")
# --- SuperOffice API Credentials ---
SO_ENVIRONMENT: str = "sod" # 'sod' or 'online'
SO_CLIENT_ID: str = ""
SO_CLIENT_SECRET: str = ""
SO_REFRESH_TOKEN: str = ""
SO_REDIRECT_URI: str = "http://localhost"
SO_CONTEXT_IDENTIFIER: str = "Cust55774" # e.g. Cust12345
# --- Feature Flags ---
ENABLE_WEBSITE_SYNC: bool = False # Disabled by default to prevent loops
# --- Mappings (IDs from SuperOffice) ---
# Vertical IDs (List Items)
# Default values match the current hardcoded DEV IDs
# Format: "Name In Explorer": ID_In_SuperOffice
VERTICAL_MAP_JSON: str = '{"Logistics - Warehouse": 23, "Healthcare - Hospital": 24, "Infrastructure - Transport": 25, "Leisure - Indoor Active": 26}'
# UDF Mapping (ProgIds) - Defaulting to SOD values, should be overridden in Prod
UDF_CONTACT_MAPPING = {
"ai_challenge_sentence": os.getenv("UDF_CONTACT_CHALLENGE", "SuperOffice:1"),
"ai_sentence_timestamp": os.getenv("UDF_CONTACT_TIMESTAMP", "SuperOffice:2"),
"ai_sentence_source_hash": os.getenv("UDF_CONTACT_HASH", "SuperOffice:3"),
"ai_last_outreach_date": os.getenv("UDF_CONTACT_OUTREACH", "SuperOffice:4")
}
# Persona / Job Role IDs (List Items for "Position" field)
# To be filled after discovery
PERSONA_MAP_JSON: str = '{}'
UDF_PERSON_MAPPING = {
"ai_email_draft": os.getenv("UDF_PERSON_DRAFT", "SuperOffice:1"),
"ma_status": os.getenv("UDF_PERSON_STATUS", "SuperOffice:2")
}
# User Defined Fields (ProgIDs)
# The technical names of the fields in SuperOffice
# Default values match the current hardcoded DEV UDFs
UDF_SUBJECT: str = "SuperOffice:5"
UDF_INTRO: str = "SuperOffice:6"
UDF_SOCIAL_PROOF: str = "SuperOffice:7"
UDF_VERTICAL: str = "SuperOffice:5" # NOTE: Currently same as Subject in dev? Need to verify. worker.py had 'SuperOffice:5' for vertical AND 'SuperOffice:5' for subject in the map?
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
extra = "ignore" # Ignore extra fields in .env
# MA Status ID Mapping (Text -> ID) - Defaulting to discovered SOD values
MA_STATUS_ID_MAP = {
"Ready_to_Send": int(os.getenv("MA_STATUS_ID_READY", 11)),
"Sent_Week1": int(os.getenv("MA_STATUS_ID_WEEK1", 12)),
"Sent_Week2": int(os.getenv("MA_STATUS_ID_WEEK2", 13)),
"Bounced": int(os.getenv("MA_STATUS_ID_BOUNCED", 14)),
"Soft_Denied": int(os.getenv("MA_STATUS_ID_DENIED", 15)),
"Interested": int(os.getenv("MA_STATUS_ID_INTERESTED", 16)),
"Out_of_Office": int(os.getenv("MA_STATUS_ID_OOO", 17)),
"Unsubscribed": int(os.getenv("MA_STATUS_ID_UNSUB", 18))
}
# Global instance
settings = Settings()

View File

@@ -1,89 +1,82 @@
# connector-superoffice/discover_fields.py (Standalone & Robust)
import os
import requests
import json
from dotenv import load_dotenv
from superoffice_client import SuperOfficeClient
import logging
# Load environment variables
load_dotenv(override=True)
# Setup Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("discovery")
# Configuration
SO_ENV = os.getenv("SO_ENVIRONMENT", "sod") # sod, stage, online
SO_CLIENT_ID = os.getenv("SO_CLIENT_ID") or os.getenv("SO_SOD")
SO_CLIENT_SECRET = os.getenv("SO_CLIENT_SECRET")
# SO_REDIRECT_URI often required for validation even in refresh flow
SO_REDIRECT_URI = os.getenv("SO_REDIRECT_URI", "http://localhost")
SO_REFRESH_TOKEN = os.getenv("SO_REFRESH_TOKEN")
def discover():
print("🔍 Starting SuperOffice Discovery Tool...")
client = SuperOfficeClient()
if not client.access_token:
print("❌ Auth failed. Check .env")
return
def get_access_token():
"""Refreshes the access token using the refresh token."""
url = f"https://{SO_ENV}.superoffice.com/login/common/oauth/tokens"
data = {
"grant_type": "refresh_token",
"client_id": SO_CLIENT_ID,
"client_secret": SO_CLIENT_SECRET,
"refresh_token": SO_REFRESH_TOKEN,
"redirect_uri": SO_REDIRECT_URI
}
# 1. Discover UDFs (User Defined Fields)
print("\n--- 1. User Defined Fields (UDFs) ---")
print(f"DEBUG: Refreshing token at {url} for Client ID {SO_CLIENT_ID[:5]}...")
response = requests.post(url, data=data)
if response.status_code == 200:
print("✅ Access Token refreshed.")
return response.json().get("access_token")
else:
print(f"❌ Error getting token: {response.text}")
return None
def discover_udfs(base_url, token, entity="Contact"):
"""
Fetches the UDF layout for a specific entity.
entity: 'Contact' (Firma) or 'Person'
"""
endpoint = "Contact" if entity == "Contact" else "Person"
url = f"{base_url}/api/v1/{endpoint}?$top=1&$select=userDefinedFields"
headers = {
"Authorization": f"Bearer {token}",
"Accept": "application/json"
}
print(f"\n--- DISCOVERING UDFS FOR: {entity} ---")
# Contact UDFs
try:
response = requests.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
if data['value']:
item = data['value'][0]
udfs = item.get('userDefinedFields', {})
print(f"Found {len(udfs)} UDFs on this record.")
# Filter logic: Show interesting fields
relevant_udfs = {k: v for k, v in udfs.items() if "marketing" in k.lower() or "robotic" in k.lower() or "challenge" in k.lower() or "ai" in k.lower()}
if relevant_udfs:
print("✅ FOUND RELEVANT FIELDS (ProgId : Value):")
print(json.dumps(relevant_udfs, indent=2))
else:
print("⚠️ No fields matching 'marketing/robotic/ai' found.")
print("First 5 UDFs for context:")
print(json.dumps(list(udfs.keys())[:5], indent=2))
print("Fetching a sample Contact to inspect UDFs...")
contacts = client.search("Contact?$top=1")
if contacts:
# Inspect keys of first result
first_contact = contacts[0]
# Try to find ID
c_id = first_contact.get('ContactId') or first_contact.get('PrimaryKey')
if c_id:
c = client.get_contact(c_id)
udfs = c.get("UserDefinedFields", {})
print(f"Found {len(udfs)} UDFs on Contact {c_id}:")
for k, v in udfs.items():
print(f" - Key (ProgId): {k} | Value: {v}")
else:
print("No records found to inspect.")
print(f"⚠️ Could not find ID in search result: {first_contact.keys()}")
else:
print(f"Error {response.status_code}: {response.text}")
print("⚠️ No contacts found. Cannot inspect Contact UDFs.")
print("\nFetching a sample Person to inspect UDFs...")
persons = client.search("Person?$top=1")
if persons:
first_person = persons[0]
p_id = first_person.get('PersonId') or first_person.get('PrimaryKey')
if p_id:
p = client.get_person(p_id)
udfs = p.get("UserDefinedFields", {})
print(f"Found {len(udfs)} UDFs on Person {p_id}:")
for k, v in udfs.items():
print(f" - Key (ProgId): {k} | Value: {v}")
else:
print(f"⚠️ Could not find ID in search result: {first_person.keys()}")
else:
print("⚠️ No persons found. Cannot inspect Person UDFs.")
except Exception as e:
print(f"Request failed: {e}")
print(f"❌ Error inspecting UDFs: {e}")
# 2. Discover Lists (MDO Providers)
print("\n--- 2. MDO Lists (Positions, Business/Industry) ---")
lists_to_check = ["position", "business"]
for list_name in lists_to_check:
print(f"\nChecking List: '{list_name}'...")
try:
# Endpoint: GET /List/{list_name}/Items
items = client._get(f"List/{list_name}/Items")
if items:
print(f"Found {len(items)} items in '{list_name}':")
for item in items:
print(f" - ID: {item['Id']} | Name: '{item['Name']}'")
else:
print(f" (List '{list_name}' is empty or not accessible)")
except Exception as e:
print(f" ❌ Failed to fetch list '{list_name}': {e}")
if __name__ == "__main__":
token = get_access_token()
if token:
# Hardcoded Base URL for Cust55774 (Fix: Use app-sod as per README)
base_url = "https://app-sod.superoffice.com/Cust55774"
discover_udfs(base_url, token, "Person")
discover_udfs(base_url, token, "Contact")
else:
print("Could not get Access Token. Check .env")
discover()

View File

@@ -0,0 +1,70 @@
import datetime
from datetime import date
import holidays
# Configuration
YEARS_TO_GENERATE = [2025, 2026]
COUNTRY_CODE = "DE"
SUB_REGION = "BY" # Bayern (Wackler HQ)
def generate_crm_script():
    """Generate a CRMScript snippet that imports public holidays into SuperOffice.

    Computes the public holidays for COUNTRY_CODE/SUB_REGION over
    YEARS_TO_GENERATE and writes a ready-to-paste CRMScript file that inserts
    one row per holiday into the custom table ``y_holidays``
    (columns: x_date, x_name). The target table must already exist.
    """
    print(f"Generating CRMScript for Holidays ({COUNTRY_CODE}-{SUB_REGION})...")

    # 1. Calculate Holidays
    # NOTE: country_holidays() must be given the years explicitly — a fresh
    # holidays object is lazily populated, so iterating it without 'years'
    # yields nothing. (holidays.Country is not a callable factory.)
    de_holidays = holidays.country_holidays(
        COUNTRY_CODE, subdiv=SUB_REGION, years=YEARS_TO_GENERATE
    )
    # Sort chronologically; items() yields (datetime.date, name) pairs.
    holidays_list = sorted(de_holidays.items(), key=lambda x: x[0])

    # 2. Generate CRMScript Code
    script = "// --- AUTO-GENERATED HOLIDAY IMPORT SCRIPT ---\n"
    script += f"// Generated for: {COUNTRY_CODE}-{SUB_REGION} (Years: {YEARS_TO_GENERATE})\n"
    script += "// Target Table: y_holidays (Must exist! Columns: x_date, x_name)\n\n"
    script += "Integer count = 0;\n"
    script += "DateTime date;\n"
    script += "String name;\n\n"

    for d, name in holidays_list:
        # CRMScript's String.toDateTime() accepts the ISO date format.
        date_str = d.strftime("%Y-%m-%d")
        script += f"date = String(\"{date_str}\").toDateTime();\n"
        script += f"name = \"{name}\";\n"
        # No duplicate check here: the admin is expected to clear the table
        # before re-running the generated script.
        script += f"// Inserting {date_str} - {name}\n"
        script += "GenericEntity holiday = getDatabaseConnection().createGenericEntity(\"y_holidays\");\n"
        script += "holiday.setValue(\"x_date\", date);\n"
        script += "holiday.setValue(\"x_name\", name);\n"
        script += "holiday.save();\n"
        script += "count++;\n\n"

    script += "print(\"Imported \" + count.toString() + \" holidays.\");\n"

    # 3. Output
    output_filename = "import_holidays_CRMSCRIPT.txt"
    with open(output_filename, "w", encoding="utf-8") as f:
        f.write(script)

    print(f"✅ CRMScript generated: {output_filename}")
    print("👉 Copy the content of this file and run it in SuperOffice (Settings -> CRMScript -> Execute).")
if __name__ == "__main__":
generate_crm_script()

View File

@@ -0,0 +1,90 @@
import os
import requests
import json
import logging
from superoffice_client import SuperOfficeClient
from config import settings
# Setup Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("simulation-e2e")
def simulate_sendout(contact_id: int, person_id: int):
    """Run an end-to-end sendout simulation for one SuperOffice contact/person.

    Steps:
      1. Authenticate against SuperOffice.
      2. Ask Company Explorer for provisioning data (marketing texts).
      3. Write the texts into the person's UDF fields.
      4. Create an appointment in SuperOffice as the "sendout proof".

    Progress and errors are reported via print(); the function returns early
    on any failure.
    """
    print(f"🚀 Starting E2E Sendout Simulation for Contact {contact_id}, Person {person_id}...")

    # 1. Initialize SuperOffice Client
    so_client = SuperOfficeClient()
    if not so_client.access_token:
        print("❌ Auth failed. Check .env")
        return

    # 2. Get Data from Company Explorer — we simulate what the worker would do
    print(f"📡 Requesting provisioning from Company Explorer...")
    ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
    ce_req = {
        "so_contact_id": contact_id,
        "so_person_id": person_id,
        "crm_name": "RoboPlanet GmbH",
        "crm_website": "www.roboplanet.de",
        "job_title": "Geschäftsführer"  # Explicit job title for persona mapping
    }
    ce_auth = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini"))
    try:
        resp = requests.post(ce_url, json=ce_req, auth=ce_auth)
        resp.raise_for_status()
        provisioning_data = resp.json()
    except Exception as e:
        print(f"❌ CE API failed: {e}")
        return

    print(f"✅ Received Data: {json.dumps(provisioning_data, indent=2)}")

    if provisioning_data.get("status") == "processing":
        print("⏳ CE is still processing. Please wait 1-2 minutes and try again.")
        return

    texts = provisioning_data.get("texts", {})
    if not texts.get("subject"):
        print("⚠️ No marketing texts found for this combination (Vertical x Persona).")
        return

    # 3. Write Texts to SuperOffice UDFs
    # Use .get() with defaults so a partially-filled matrix entry cannot crash
    # the simulation with a KeyError — only 'subject' is mandatory (checked above).
    print("✍️ Writing marketing texts to SuperOffice UDFs...")
    udf_payload = {
        settings.UDF_SUBJECT: texts["subject"],
        settings.UDF_INTRO: texts.get("intro", ""),
        settings.UDF_SOCIAL_PROOF: texts.get("social_proof", "")
    }
    success = so_client.update_entity_udfs(person_id, "Person", udf_payload)
    if success:
        print("✅ UDFs updated successfully.")
    else:
        print("❌ Failed to update UDFs.")
        return

    # 4. Create Appointment (The "Sendout Proof")
    print("📅 Creating Appointment as sendout proof...")
    app_subject = f"[SIMULATION] Mail Sent: {texts['subject']}"
    app_desc = f"Content Simulation:\n\n{texts.get('intro', '')}\n\n{texts.get('social_proof', '')}"
    appointment = so_client.create_appointment(
        subject=app_subject,
        description=app_desc,
        contact_id=contact_id,
        person_id=person_id
    )
    if appointment:
        print(f"✅ Simulation Complete! Appointment ID: {appointment.get('AppointmentId')}")
        print(f"🔗 Check SuperOffice for Contact {contact_id} and look at the activities.")
    else:
        print("❌ Failed to create appointment.")
if __name__ == "__main__":
    # Using the IDs we know exist from previous tests/status.
    # NOTE(review): hard-coded dev IDs — confirm they exist in the target tenant.
    TEST_CONTACT_ID = 2
    TEST_PERSON_ID = 2  # Usually same or linked
    simulate_sendout(TEST_CONTACT_ID, TEST_PERSON_ID)

View File

@@ -1,129 +1,118 @@
import os
import requests
import json
from dotenv import load_dotenv
from config import settings
import logging
load_dotenv(override=True)
# Configure Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("superoffice-client")
class SuperOfficeClient:
"""A client for interacting with the SuperOffice REST API."""
def __init__(self):
# Helper to strip quotes if Docker passed them literally
def get_clean_env(key, default=None):
val = os.getenv(key)
if val and val.strip(): # Check if not empty string
return val.strip('"').strip("'")
return default
self.client_id = get_clean_env("SO_CLIENT_ID") or get_clean_env("SO_SOD")
self.client_secret = get_clean_env("SO_CLIENT_SECRET")
self.refresh_token = get_clean_env("SO_REFRESH_TOKEN")
self.redirect_uri = get_clean_env("SO_REDIRECT_URI", "http://localhost")
self.env = get_clean_env("SO_ENVIRONMENT", "sod")
self.cust_id = get_clean_env("SO_CONTEXT_IDENTIFIER", "Cust55774") # Fallback for your dev
# Configuration
self.client_id = settings.SO_CLIENT_ID
self.client_secret = settings.SO_CLIENT_SECRET
self.refresh_token = settings.SO_REFRESH_TOKEN
self.env = settings.SO_ENVIRONMENT
self.cust_id = settings.SO_CONTEXT_IDENTIFIER
if not all([self.client_id, self.client_secret, self.refresh_token]):
raise ValueError("SuperOffice credentials missing in .env file.")
# Graceful failure: Log error but allow init (for help/docs/discovery scripts)
logger.error("❌ SuperOffice credentials missing in .env file (or environment variables).")
self.base_url = None
self.access_token = None
return
self.base_url = f"https://app-{self.env}.superoffice.com/{self.cust_id}/api/v1"
self.access_token = self._refresh_access_token()
if not self.access_token:
raise Exception("Failed to authenticate with SuperOffice.")
self.headers = {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",
"Accept": "application/json"
}
print("✅ SuperOffice Client initialized and authenticated.")
if not self.access_token:
logger.error("❌ Failed to authenticate with SuperOffice.")
else:
self.headers = {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",
"Accept": "application/json"
}
logger.info("✅ SuperOffice Client initialized and authenticated.")
def _refresh_access_token(self):
"""Refreshes and returns a new access token."""
url = f"https://{self.env}.superoffice.com/login/common/oauth/tokens"
print(f"DEBUG: Refresh URL: '{url}' (Env: '{self.env}')") # DEBUG
logger.debug(f"DEBUG: Refresh URL: '{url}' (Env: '{self.env}')")
data = {
"grant_type": "refresh_token",
"client_id": self.client_id,
"client_secret": self.client_secret,
"refresh_token": self.refresh_token,
"redirect_uri": self.redirect_uri
"redirect_uri": settings.SO_REDIRECT_URI
}
try:
resp = requests.post(url, data=data)
resp.raise_for_status()
return resp.json().get("access_token")
except requests.exceptions.HTTPError as e:
print(f"❌ Token Refresh Error: {e.response.text}")
logger.error(f"❌ Token Refresh Error: {e.response.text}")
return None
except Exception as e:
print(f"❌ Connection Error during token refresh: {e}")
logger.error(f"❌ Connection Error during token refresh: {e}")
return None
def _get(self, endpoint):
"""Generic GET request."""
if not self.access_token: return None
try:
resp = requests.get(f"{self.base_url}/{endpoint}", headers=self.headers)
resp.raise_for_status()
return resp.json()
except requests.exceptions.HTTPError as e:
print(f"❌ API GET Error for {endpoint}: {e.response.text}")
logger.error(f"❌ API GET Error for {endpoint}: {e.response.text}")
return None
def _put(self, endpoint, payload):
"""Generic PUT request."""
if not self.access_token: return None
try:
resp = requests.put(f"{self.base_url}/{endpoint}", headers=self.headers, json=payload)
resp.raise_for_status()
return resp.json()
except requests.exceptions.HTTPError as e:
print(f"❌ API PUT Error for {endpoint}: {e.response.text}")
logger.error(f"❌ API PUT Error for {endpoint}: {e.response.text}")
return None
def _post(self, endpoint, payload):
"""Generic POST request."""
if not self.access_token: return None
try:
resp = requests.post(f"{self.base_url}/{endpoint}", headers=self.headers, json=payload)
resp.raise_for_status()
return resp.json()
except requests.exceptions.HTTPError as e:
logger.error(f"❌ API POST Error for {endpoint} (Status: {e.response.status_code}): {e.response.text}")
return None
except Exception as e:
logger.error(f"❌ Connection Error during POST for {endpoint}: {e}")
return None
# --- Convenience Wrappers ---
def get_person(self, person_id):
    """Fetch a single person via GET /Person/{id}; returns the parsed JSON dict, or None on API error."""
    return self._get(f"Person/{person_id}")
def get_contact(self, contact_id):
    """Fetch a single contact (company) via GET /Contact/{id}; returns the parsed JSON dict, or None on API error."""
    return self._get(f"Contact/{contact_id}")
def update_udfs(self, entity: str, entity_id: int, udf_payload: dict):
"""
Updates the UserDefinedFields for a given entity (Person or Contact).
Args:
entity (str): "Person" or "Contact".
entity_id (int): The ID of the entity.
udf_payload (dict): A dictionary of ProgId:Value pairs.
"""
endpoint = f"{entity}/{entity_id}"
# 1. GET the full entity object
existing_data = self._get(endpoint)
if not existing_data:
return False # Error is printed in _get
# 2. Merge the UDF payload
if "UserDefinedFields" not in existing_data:
existing_data["UserDefinedFields"] = {}
existing_data["UserDefinedFields"].update(udf_payload)
# 3. PUT the full object back
print(f"Updating {entity} {entity_id} with new UDFs...")
result = self._put(endpoint, existing_data)
if result:
print(f"✅ Successfully updated {entity} {entity_id}")
return True
return False
def search(self, query_string: str):
"""
Performs a search using OData syntax and handles pagination.
Example: "Person?$select=personId&$filter=lastname eq 'Godelmann'"
"""
if not self.access_token: return None
all_results = []
next_page_url = f"{self.base_url}/{query_string}"
@@ -133,91 +122,14 @@ class SuperOfficeClient:
resp.raise_for_status()
data = resp.json()
# Add the items from the current page
all_results.extend(data.get('value', []))
# Check for the next page link
next_page_url = data.get('next_page_url', None)
except requests.exceptions.HTTPError as e:
print(f"❌ API Search Error for {query_string}: {e.response.text}")
logger.error(f"❌ API Search Error for {query_string}: {e.response.text}")
return None
return all_results
def find_contact_by_criteria(self, name=None, org_nr=None, url=None):
"""
Finds a contact (company) by name, OrgNr, or URL.
Returns the first matching contact or None.
"""
filter_parts = []
if name:
filter_parts.append(f"Name eq '{name}'")
if org_nr:
filter_parts.append(f"OrgNr eq '{org_nr}'")
if url:
filter_parts.append(f"UrlAddress eq '{url}'")
if not filter_parts:
print("❌ No criteria provided for contact search.")
return None
query_string = "Contact?$filter=" + " or ".join(filter_parts)
results = self.search(query_string)
if results:
return results[0] # Return the first match
return None
def _post(self, endpoint, payload):
"""Generic POST request."""
try:
resp = requests.post(f"{self.base_url}/{endpoint}", headers=self.headers, json=payload)
resp.raise_for_status()
return resp.json()
except requests.exceptions.HTTPError as e:
print(f"❌ API POST Error for {endpoint} (Status: {e.response.status_code}): {e.response.text}")
return None
except Exception as e:
print(f"❌ Connection Error during POST for {endpoint}: {e}")
return None
def create_contact(self, name: str, url: str = None, org_nr: str = None):
"""Creates a new contact (company)."""
payload = {"Name": name}
if url:
payload["UrlAddress"] = url
if org_nr:
payload["OrgNr"] = org_nr
print(f"Creating new contact: {name} with payload: {payload}...") # Added payload to log
return self._post("Contact", payload)
def create_person(self, first_name: str, last_name: str, contact_id: int, email: str = None):
"""Creates a new person linked to a contact."""
payload = {
"Firstname": first_name,
"Lastname": last_name,
"Contact": {"ContactId": contact_id}
}
if email:
payload["EmailAddress"] = email
print(f"Creating new person: {first_name} {last_name} for Contact ID {contact_id}...")
return self._post("Person", payload)
def create_sale(self, title: str, contact_id: int, person_id: int, amount: float = None):
"""Creates a new sale (opportunity) linked to a contact and person."""
payload = {
"Heading": title,
"Contact": {"ContactId": contact_id},
"Person": {"PersonId": person_id}
}
if amount:
payload["Amount"] = amount
print(f"Creating new sale: {title}...")
return self._post("Sale", payload)
def create_project(self, name: str, contact_id: int, person_id: int = None):
"""Creates a new project linked to a contact, and optionally adds a person."""
payload = {
@@ -235,29 +147,74 @@ class SuperOfficeClient:
print(f"Creating new project: {name}...")
return self._post("Project", payload)
def create_appointment(self, subject: str, description: str, contact_id: int, person_id: int = None):
    """Creates a new appointment (to simulate a sent activity).

    The appointment starts and ends "now" (UTC, 'Z'-suffixed ISO string) and
    is linked to the given contact and, optionally, person. Returns the
    created entity dict from the API, or None on failure.
    """
    import datetime
    # datetime.utcnow() is deprecated since Python 3.12; build the identical
    # naive-UTC ISO timestamp from an aware datetime instead.
    now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    payload = {
        "Description": f"{subject}\n\n{description}",
        "Contact": {"ContactId": contact_id},
        "StartDate": now,
        "EndDate": now,
        "Task": {"Id": 1}  # Usually 'Follow-up' or similar, depending on SO config
    }
    if person_id:
        payload["Person"] = {"PersonId": person_id}
    print(f"Creating new appointment: {subject}...")
    return self._post("Appointment", payload)
def update_entity_udfs(self, entity_id: int, entity_type: str, udf_data: dict):
"""
Updates UDFs for a given entity (Contact or Person).
Args:
entity_id (int): ID of the entity.
entity_type (str): 'Contact' or 'Person'.
udf_data (dict): Dictionary with ProgId:Value pairs for UDFs.
Returns:
dict: The updated entity object from the API, or None on failure.
entity_type: 'Contact' or 'Person'
udf_data: {ProgId: Value}
"""
# We need to GET the existing entity, update its UDFs, then PUT it back.
endpoint = f"{entity_type}/{entity_id}"
# 1. GET existing
existing_entity = self._get(endpoint)
if not existing_entity:
print(f"❌ Failed to retrieve existing {entity_type} {entity_id} for UDF update.")
return None
logger.error(f"❌ Failed to retrieve existing {entity_type} {entity_id} for UDF update.")
return False
if "UserDefinedFields" not in existing_entity:
existing_entity["UserDefinedFields"] = {}
# 2. Merge payload
existing_entity["UserDefinedFields"].update(udf_data)
print(f"Updating {entity_type} {entity_id} UDFs: {udf_data}...")
return self._put(endpoint, existing_entity)
logger.info(f"Updating {entity_type} {entity_id} UDFs: {udf_data}...")
# 3. PUT update
result = self._put(endpoint, existing_entity)
return bool(result)
def update_person_position(self, person_id: int, position_id: int):
    """
    Updates the standard 'Position' field of a Person.

    Fetches the person, skips the write when the position already matches
    (string-compared, since the API may return the ID as int or str),
    otherwise sets Position to the given list-item ID and PUTs the full
    object back. Returns True on success or no-op, False on failure.
    """
    endpoint = f"Person/{person_id}"
    # 1. GET existing
    existing_person = self._get(endpoint)
    if not existing_person:
        logger.error(f"❌ Failed to retrieve Person {person_id} for Position update.")
        return False
    # 2. Check current value to avoid redundant updates
    current_pos = existing_person.get("Position", {})
    if current_pos and str(current_pos.get("Id")) == str(position_id):
        logger.info(f"Person {person_id} Position already set to {position_id}. Skipping.")
        return True
    # 3. Update
    existing_person["Position"] = {"Id": int(position_id)}
    logger.info(f"Updating Person {person_id} Position to ID {position_id}...")
    # 4. PUT
    result = self._put(endpoint, existing_person)
    return bool(result)

View File

@@ -0,0 +1,80 @@
import time
import json
import logging
from queue_manager import JobQueue
from worker import process_job
from superoffice_client import SuperOfficeClient
from config import settings
from unittest.mock import MagicMock
# Setup Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("e2e-test")
def test_e2e():
    """End-to-end simulation of one webhook job through the worker.

    Uses the real SuperOffice client when credentials are configured,
    otherwise falls back to a MagicMock, then feeds a fake
    'contact.changed' job through the worker's process_job().
    """
    print("🚀 Starting End-to-End Simulation...")

    # 1. Pick the SuperOffice client: real if credentials exist, mock otherwise.
    real_client = False
    if settings.SO_CLIENT_ID and settings.SO_REFRESH_TOKEN:
        print("✅ Real Credentials found. Attempting real connection...")
        try:
            so_client = SuperOfficeClient()
            if so_client.access_token:
                real_client = True
        except Exception:  # narrowed from bare 'except:' — keep SystemExit/KeyboardInterrupt raisable
            print("⚠️ Real connection failed. Falling back to Mock.")

    if not real_client:
        print("⚠️ Using MOCKED SuperOffice Client.")
        so_client = MagicMock()
        so_client.get_contact.return_value = {"ContactId": 123, "Name": "Test Company", "UserDefinedFields": {}}
        so_client.get_person.return_value = {"PersonId": 456, "Contact": {"ContactId": 123}, "UserDefinedFields": {}}
        so_client.update_entity_udfs.return_value = True
    else:
        # CAUTION: this writes to the real system. ID 2 is the known dev/test contact.
        TEST_CONTACT_ID = 2
        # Verify it exists before attempting a write-test.
        c = so_client.get_contact(TEST_CONTACT_ID)
        if not c:
            print(f"❌ Test Contact {TEST_CONTACT_ID} not found. Aborting real write-test.")
            return
        print(f" Using Real Contact: {c.get('Name')} (ID: {TEST_CONTACT_ID})")

    # 2. Create a Fake Job
    fake_job = {
        "id": "test-job-001",
        "event_type": "contact.changed",
        "payload": {
            "PrimaryKey": 2,  # Use the real ID
            "ContactId": 2,
            "JobTitle": "Geschäftsführer"  # Trigger mapping
        },
        "created_at": time.time()
    }

    # 3. Process the Job (using worker logic)
    # NOTE(review): assumes settings.COMPANY_EXPLORER_URL is reachable from here
    # (Docker DNS name inside the compose network, localhost otherwise) — confirm.
    print(f"\n⚙️ Processing Job with CE URL: {settings.COMPANY_EXPLORER_URL}...")
    try:
        result = process_job(fake_job, so_client)
        print(f"\n✅ Job Result: {result}")
    except Exception as e:
        print(f"\n❌ Job Failed: {e}")
if __name__ == "__main__":
test_e2e()

View File

@@ -0,0 +1,100 @@
import os
import requests
import json
import logging
import sys
# Configure to run from root context
sys.path.append(os.path.join(os.getcwd(), "connector-superoffice"))
# Mock Config if needed, or use real one
try:
from config import settings
except ImportError:
print("Could not import settings. Ensure you are in project root.")
sys.exit(1)
# FORCE CE URL for internal Docker comms if running inside container
# If running outside, this might need localhost.
# settings.COMPANY_EXPLORER_URL is used.
API_USER = os.getenv("API_USER", "admin")
API_PASS = os.getenv("API_PASSWORD", "gemini")
def test_dynamic_role_change():
    """Verify that different job titles produce different, role-targeted content.

    Runs two provisioning requests against Company Explorer (CEO vs Warehouse
    Manager for the same test company), checks each response for role-specific
    keywords, and finally compares the two generated subjects to prove the
    matrix actually differentiates by persona. Results are reported via print().
    """
    print("🧪 STARTING TEST: Dynamic Role Change & Content Generation\n")
    # Define Scenarios
    scenarios = [
        {
            "name": "Scenario A (CEO)",
            "job_title": "Geschäftsführer",
            "expect_keywords": ["Kostenreduktion", "Effizienz", "Amortisation"]
        },
        {
            "name": "Scenario B (Warehouse Mgr)",
            "job_title": "Lagerleiter",
            "expect_keywords": ["Stress", "Sauberkeit", "Entlastung"]
        }
    ]
    results = {}
    for s in scenarios:
        print(f"--- Running {s['name']} ---")
        print(f"Role Trigger: '{s['job_title']}'")
        payload = {
            "so_contact_id": 2,  # RoboPlanet Test
            "so_person_id": 2,
            "crm_name": "RoboPlanet GmbH-SOD",
            "crm_website": "www.roboplanet.de",  # Ensure we match the industry (Logistics)
            "job_title": s['job_title']
        }
        try:
            url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
            print(f"POST {url}")
            resp = requests.post(url, json=payload, auth=(API_USER, API_PASS))
            resp.raise_for_status()
            data = resp.json()
            # Validation: look for the expected keywords in subject + intro.
            texts = data.get("texts", {})
            subject = texts.get("subject", "")
            intro = texts.get("intro", "")
            print(f"Received Role: {data.get('role_name')}")
            print(f"Received Subject: {subject}")
            # Check Keywords (case-insensitive substring match)
            full_text = (subject + " " + intro).lower()
            matches = [k for k in s['expect_keywords'] if k.lower() in full_text]
            if len(matches) > 0:
                print(f"✅ Content Match! Found keywords: {matches}")
                results[s['name']] = "PASS"
            else:
                print(f"❌ Content Mismatch. Expected {s['expect_keywords']}, got text: {subject}...")
                results[s['name']] = "FAIL"
            results[f"{s['name']}_Subject"] = subject  # Store for comparison later
        except Exception as e:
            print(f"❌ API Error: {e}")
            results[s['name']] = "ERROR"
        print("")
    # Final Comparison: both scenarios must pass AND produce distinct subjects.
    print("--- Final Result Analysis ---")
    if results["Scenario A (CEO)"] == "PASS" and results["Scenario B (Warehouse Mgr)"] == "PASS":
        if results["Scenario A (CEO)_Subject"] != results["Scenario B (Warehouse Mgr)_Subject"]:
            print("✅ SUCCESS: Different roles generated different, targeted content.")
        else:
            print("⚠️ WARNING: Content matched keywords but Subjects are identical! Check Matrix.")
    else:
        print("❌ TEST FAILED. See individual steps.")
if __name__ == "__main__":
test_dynamic_role_change()

View File

@@ -0,0 +1,111 @@
import os
import requests
import json
import logging
import sys
import time
# Configure path to import modules from parent directory
sys.path.append(os.path.join(os.getcwd(), "connector-superoffice"))
try:
from config import settings
from superoffice_client import SuperOfficeClient
except ImportError:
print("❌ Import Error. Ensure you are running from the project root.")
sys.exit(1)
# Logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("e2e-roundtrip")
# Config
API_USER = os.getenv("API_USER", "admin")
API_PASS = os.getenv("API_PASSWORD", "gemini")
TEST_PERSON_ID = 2
TEST_CONTACT_ID = 2
def run_roundtrip():
    """Full E2E roundtrip: Company Explorer provisioning -> SuperOffice write-back.

    For two role scenarios (CEO and Warehouse Manager) this:
      1. Requests marketing texts from Company Explorer for the test contact.
      2. Writes the texts into the test person's UDFs in SuperOffice.
      3. Creates an appointment in SuperOffice as sendout proof.

    A scenario is skipped (continue) if the Company Explorer call fails;
    SuperOffice write failures are reported but do not abort the run.
    """
    print("🚀 STARTING FULL E2E ROUNDTRIP TEST (API -> SO Write)\n")
    so_client = SuperOfficeClient()
    if not so_client.access_token:
        print("❌ SuperOffice Auth failed. Check .env")
        return
    scenarios = [
        {
            "name": "Scenario A",
            "role_label": "Geschäftsführer",
            "expect_keyword": "Kosten"
        },
        {
            "name": "Scenario B",
            "role_label": "Lagerleiter",
            "expect_keyword": "Sauberkeit"
        }
    ]
    for s in scenarios:
        print(f"--- Running {s['name']}: {s['role_label']} ---")
        # 1. Provisioning (Company Explorer)
        print(f"1. 🧠 Asking Company Explorer (Trigger: {s['role_label']})...")
        ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
        payload = {
            "so_contact_id": TEST_CONTACT_ID,
            "so_person_id": TEST_PERSON_ID,
            "crm_name": "RoboPlanet GmbH-SOD",
            "crm_website": "www.roboplanet.de",
            "job_title": s['role_label']  # <-- THE TRIGGER for persona mapping
        }
        try:
            resp = requests.post(ce_url, json=payload, auth=(API_USER, API_PASS))
            resp.raise_for_status()
            data = resp.json()
            texts = data.get("texts", {})
            subject = texts.get("subject", "N/A")
            intro = texts.get("intro", "N/A")
            print(f" -> Received Subject: '{subject}'")
            # Soft check only: a missing keyword warns but does not abort.
            if s['expect_keyword'].lower() not in (subject + intro).lower():
                print(f" ⚠️ WARNING: Expected keyword '{s['expect_keyword']}' not found!")
        except Exception as e:
            print(f" ❌ CE API Failed: {e}")
            continue
        # 2. Write to SuperOffice (UDFs)
        print(f"2. ✍️ Writing Texts to SuperOffice UDFs...")
        udf_payload = {
            settings.UDF_SUBJECT: subject,
            settings.UDF_INTRO: intro,
            settings.UDF_SOCIAL_PROOF: texts.get("social_proof", "")
        }
        if so_client.update_entity_udfs(TEST_PERSON_ID, "Person", udf_payload):
            print(" -> UDFs Updated.")
        else:
            print(" -> ❌ UDF Update Failed.")
        # 3. Create Appointment (Proof)
        print(f"3. 📅 Creating Appointment in SuperOffice...")
        appt_subject = f"[E2E TEST] {s['role_label']}: {subject}"
        appt_desc = f"GENERATED CONTENT:\n\n{intro}\n\n{texts.get('social_proof')}"
        appt = so_client.create_appointment(appt_subject, appt_desc, TEST_CONTACT_ID, TEST_PERSON_ID)
        if appt:
            print(f" -> ✅ Appointment Created (ID: {appt.get('AppointmentId')})")
        else:
            print(" -> ❌ Appointment Creation Failed.")
        print("")
        time.sleep(1)  # Brief pause between scenarios
    print("🏁 Test Run Complete.")
if __name__ == "__main__":
run_roundtrip()

View File

@@ -5,6 +5,7 @@ import requests
import json
from queue_manager import JobQueue
from superoffice_client import SuperOfficeClient
from config import settings
# Setup Logging
logging.basicConfig(
@@ -13,17 +14,9 @@ logging.basicConfig(
)
logger = logging.getLogger("connector-worker")
# Config
COMPANY_EXPLORER_URL = os.getenv("COMPANY_EXPLORER_URL", "http://company-explorer:8000")
# Poll Interval
POLL_INTERVAL = 5 # Seconds
# UDF Mapping (DEV) - Should be moved to config later
UDF_MAPPING = {
"subject": "SuperOffice:5",
"intro": "SuperOffice:6",
"social_proof": "SuperOffice:7"
}
def process_job(job, so_client: SuperOfficeClient):
"""
Core logic for processing a single job.
@@ -31,6 +24,37 @@ def process_job(job, so_client: SuperOfficeClient):
logger.info(f"Processing Job {job['id']} ({job['event_type']})")
payload = job['payload']
event_low = job['event_type'].lower()
# 0. Fast-Fail on Irrelevant Events (Noise Reduction)
if any(x in event_low for x in ["sale.", "project.", "appointment.", "document.", "selection."]):
logger.info(f"Skipping irrelevant event type: {job['event_type']}")
return "SUCCESS"
# 0b. Fast-Fail on Irrelevant Field Changes
# Only if 'Changes' list is provided by Webhook
changes = [c.lower() for c in payload.get("Changes", [])]
if changes:
# Define what we care about (Strategic triggers for re-evaluation)
# Company: Name/Department (Identity), Urls (Source), Numbers (Matching)
relevant_contact = ["name", "department", "urladdress", "number1", "number2"]
# Person: JobTitle (Persona Logic), Position (Role Logic)
relevant_person = ["jobtitle", "position"]
is_relevant = False
if "contact" in event_low:
if any(f in changes for f in relevant_contact):
is_relevant = True
elif "urls" in changes: # Website might be in Urls collection
is_relevant = True
if "person" in event_low:
if any(f in changes for f in relevant_person):
is_relevant = True
if not is_relevant:
logger.info(f"Skipping irrelevant changes: {changes}")
return "SUCCESS"
# 1. Extract IDs from Webhook Payload
person_id = None
@@ -59,7 +83,6 @@ def process_job(job, so_client: SuperOfficeClient):
# --- Cascading Logic ---
# If a company changes, we want to update all its persons eventually.
# We do this by adding "person.changed" jobs for each person to the queue.
if "contact" in event_low and not person_id:
logger.info(f"Company event detected. Triggering cascade for all persons of Contact {contact_id}.")
try:
@@ -88,7 +111,7 @@ def process_job(job, so_client: SuperOfficeClient):
logger.warning(f"Failed to fetch contact details for {contact_id}: {e}")
# 2. Call Company Explorer Provisioning API
ce_url = f"{COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
ce_req = {
"so_contact_id": contact_id,
"so_person_id": person_id,
@@ -97,6 +120,7 @@ def process_job(job, so_client: SuperOfficeClient):
"crm_website": crm_website
}
# Simple Basic Auth for internal API
ce_auth = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini"))
try:
@@ -112,28 +136,22 @@ def process_job(job, so_client: SuperOfficeClient):
logger.info(f"Company Explorer is processing {provisioning_data.get('company_name', 'Unknown')}. Re-queueing job.")
return "RETRY"
if provisioning_data.get("status") == "processing":
logger.info(f"Company Explorer is processing {provisioning_data.get('company_name', 'Unknown')}. Re-queueing job.")
return "RETRY"
except requests.exceptions.RequestException as e:
raise Exception(f"Company Explorer API failed: {e}")
logger.info(f"CE Response for Contact {contact_id}: {json.dumps(provisioning_data)}") # DEBUG
logger.info(f"CE Response for Contact {contact_id}: {json.dumps(provisioning_data)}")
# 2b. Sync Vertical to SuperOffice (Company Level)
vertical_name = provisioning_data.get("vertical_name")
if vertical_name:
# Mappings from README
VERTICAL_MAP = {
"Logistics - Warehouse": 23,
"Healthcare - Hospital": 24,
"Infrastructure - Transport": 25,
"Leisure - Indoor Active": 26
}
vertical_id = VERTICAL_MAP.get(vertical_name)
try:
vertical_map = json.loads(settings.VERTICAL_MAP_JSON)
except:
vertical_map = {}
logger.error("Failed to parse VERTICAL_MAP_JSON from config.")
vertical_id = vertical_map.get(vertical_name)
if vertical_id:
logger.info(f"Identified Vertical '{vertical_name}' -> ID {vertical_id}")
@@ -141,7 +159,10 @@ def process_job(job, so_client: SuperOfficeClient):
# Check current value to avoid loops
current_contact = so_client.get_contact(contact_id)
current_udfs = current_contact.get("UserDefinedFields", {})
current_val = current_udfs.get("SuperOffice:5", "")
# Use Config UDF key
udf_key = settings.UDF_VERTICAL
current_val = current_udfs.get(udf_key, "")
# Normalize SO list ID format (e.g., "[I:26]" -> "26")
if current_val and current_val.startswith("[I:"):
@@ -149,7 +170,7 @@ def process_job(job, so_client: SuperOfficeClient):
if str(current_val) != str(vertical_id):
logger.info(f"Updating Contact {contact_id} Vertical: {current_val} -> {vertical_id}")
so_client.update_entity_udfs(contact_id, "Contact", {"SuperOffice:5": str(vertical_id)})
so_client.update_entity_udfs(contact_id, "Contact", {udf_key: str(vertical_id)})
else:
logger.info(f"Vertical for Contact {contact_id} already in sync ({vertical_id}).")
except Exception as e:
@@ -159,49 +180,66 @@ def process_job(job, so_client: SuperOfficeClient):
# 2c. Sync Website (Company Level)
# TEMPORARILY DISABLED TO PREVENT LOOP (SO API Read-after-Write latency or field mapping issue)
"""
website = provisioning_data.get("website")
if website and website != "k.A.":
try:
# Re-fetch contact to ensure we work on latest version (Optimistic Concurrency)
contact_data = so_client.get_contact(contact_id)
current_url = contact_data.get("UrlAddress", "")
# Normalize for comparison
def norm(u): return str(u).lower().replace("https://", "").replace("http://", "").strip("/") if u else ""
if norm(current_url) != norm(website):
logger.info(f"Updating Website for Contact {contact_id}: {current_url} -> {website}")
# Re-enable via config if needed
if settings.ENABLE_WEBSITE_SYNC:
website = provisioning_data.get("website")
if website and website != "k.A.":
try:
contact_data = so_client.get_contact(contact_id)
current_url = contact_data.get("UrlAddress", "")
# Update Urls collection (Rank 1)
new_urls = []
if "Urls" in contact_data:
found = False
for u in contact_data["Urls"]:
if u.get("Rank") == 1:
u["Value"] = website
found = True
new_urls.append(u)
if not found:
new_urls.append({"Value": website, "Rank": 1, "Description": "Website"})
contact_data["Urls"] = new_urls
def norm(u): return str(u).lower().replace("https://", "").replace("http://", "").strip("/") if u else ""
if norm(current_url) != norm(website):
logger.info(f"Updating Website for Contact {contact_id}: {current_url} -> {website}")
# Update Urls collection (Rank 1)
new_urls = []
if "Urls" in contact_data:
found = False
for u in contact_data["Urls"]:
if u.get("Rank") == 1:
u["Value"] = website
found = True
new_urls.append(u)
if not found:
new_urls.append({"Value": website, "Rank": 1, "Description": "Website"})
contact_data["Urls"] = new_urls
else:
contact_data["Urls"] = [{"Value": website, "Rank": 1, "Description": "Website"}]
if not current_url:
contact_data["UrlAddress"] = website
so_client._put(f"Contact/{contact_id}", contact_data)
else:
contact_data["Urls"] = [{"Value": website, "Rank": 1, "Description": "Website"}]
# Also set main field if empty
if not current_url:
contact_data["UrlAddress"] = website
logger.info(f"Website for Contact {contact_id} already in sync.")
except Exception as e:
logger.error(f"Failed to sync website for Contact {contact_id}: {e}")
# Write back full object
so_client._put(f"Contact/{contact_id}", contact_data)
else:
logger.info(f"Website for Contact {contact_id} already in sync.")
except Exception as e:
logger.error(f"Failed to sync website for Contact {contact_id}: {e}")
"""
# 2d. Sync Person Position (Role) - if Person exists
role_name = provisioning_data.get("role_name")
if person_id and role_name:
try:
persona_map = json.loads(settings.PERSONA_MAP_JSON)
except:
persona_map = {}
logger.error("Failed to parse PERSONA_MAP_JSON from config.")
position_id = persona_map.get(role_name)
if position_id:
logger.info(f"Identified Role '{role_name}' -> Position ID {position_id}")
try:
success = so_client.update_person_position(person_id, int(position_id))
if not success:
logger.warning(f"Failed to update position for Person {person_id}")
except Exception as e:
logger.error(f"Error syncing position for Person {person_id}: {e}")
else:
logger.info(f"Role '{role_name}' has no mapped Position ID in config. Skipping update.")
# 3. Update SuperOffice (Only if person_id is present)
# 3. Update SuperOffice Texts (Only if person_id is present)
if not person_id:
logger.info("Sync complete (Company only). No texts to write back.")
return "SUCCESS"
@@ -212,9 +250,9 @@ def process_job(job, so_client: SuperOfficeClient):
return "SUCCESS"
udf_update = {}
if texts.get("subject"): udf_update[UDF_MAPPING["subject"]] = texts["subject"]
if texts.get("intro"): udf_update[UDF_MAPPING["intro"]] = texts["intro"]
if texts.get("social_proof"): udf_update[UDF_MAPPING["social_proof"]] = texts["social_proof"]
if texts.get("subject"): udf_update[settings.UDF_SUBJECT] = texts["subject"]
if texts.get("intro"): udf_update[settings.UDF_INTRO] = texts["intro"]
if texts.get("social_proof"): udf_update[settings.UDF_SOCIAL_PROOF] = texts["social_proof"]
if udf_update:
# Loop Prevention
@@ -250,6 +288,8 @@ def run_worker():
while not so_client:
try:
so_client = SuperOfficeClient()
if not so_client.access_token: # Check if auth worked
raise Exception("Auth failed")
except Exception as e:
logger.critical(f"Failed to initialize SuperOffice Client: {e}. Retrying in 30s...")
time.sleep(30)

36
debug_notion_schema.py Normal file
View File

@@ -0,0 +1,36 @@
import requests
import json

# Notion Config
# The integration token is read from a local file so it never lands in the repo.
try:
    with open("notion_token.txt", "r") as f:
        NOTION_TOKEN = f.read().strip()
except FileNotFoundError:
    print("Error: notion_token.txt not found.")
    exit(1)

# Pinned Notion REST API version header value.
NOTION_VERSION = "2022-06-28"
NOTION_API_BASE_URL = "https://api.notion.com/v1"

# Shared headers for every Notion API request in this script.
HEADERS = {
    "Authorization": f"Bearer {NOTION_TOKEN}",
    "Notion-Version": NOTION_VERSION,
    "Content-Type": "application/json",
}

# DB ID from import_product.py
DB_ID = "2e288f42-8544-8113-b878-ec99c8a02a6b"
def get_db_properties(database_id):
    """Return the property schema of a Notion database, or None on any error.

    Debug helper: failures are printed rather than raised so the script can
    report the problem and continue.
    """
    endpoint = f"{NOTION_API_BASE_URL}/databases/{database_id}"
    try:
        resp = requests.get(endpoint, headers=HEADERS)
        resp.raise_for_status()
        return resp.json().get("properties")
    except Exception as exc:
        print(f"Error: {exc}")
        return None
# Fetch the schema once at import time and pretty-print it for inspection.
props = get_db_properties(DB_ID)
if props:
    print(json.dumps(props, indent=2))

70
fix_industry_units.py Normal file
View File

@@ -0,0 +1,70 @@
import sqlite3

DB_PATH = "companies_v3_fixed_2.db"

# Maps industry name -> unit label for its headline metric.
# An empty string means the metric is an area (m²) and needs no unit word.
# NOTE(review): "Infrastructure Parking" lacks the " - " separator used by the
# other keys — confirm it matches the name stored in the DB.
UNIT_MAPPING = {
    "Logistics - Warehouse": "",
    "Healthcare - Hospital": "Betten",
    "Infrastructure - Transport": "Passagiere",
    "Leisure - Indoor Active": "",
    "Retail - Food": "",
    "Retail - Shopping Center": "",
    "Hospitality - Gastronomy": "Sitzplätze",
    "Leisure - Outdoor Park": "Besucher",
    "Leisure - Wet & Spa": "Besucher",
    "Infrastructure - Public": "Kapazität",
    "Retail - Non-Food": "",
    "Hospitality - Hotel": "Zimmer",
    "Leisure - Entertainment": "Besucher",
    "Healthcare - Care Home": "Plätze",
    "Industry - Manufacturing": "Mitarbeiter",
    "Energy - Grid & Utilities": "Kunden",
    "Leisure - Fitness": "Mitglieder",
    "Corporate - Campus": "Mitarbeiter",
    "Energy - Solar/Wind": "MWp",
    "Tech - Data Center": "Racks",
    "Automotive - Dealer": "Fahrzeuge",
    "Infrastructure Parking": "Stellplätze",
    "Reinigungsdienstleister": "Mitarbeiter",
    "Infrastructure - Communities": "Einwohner",
}


def fix_units(db_path=DB_PATH):
    """Rewrite ``industries.scraper_search_term`` with the correct unit label.

    For every industry row, look up its unit in ``UNIT_MAPPING``. Unmapped or
    empty entries fall back to "" for area-based metrics (AREA_IN/AREA_OUT)
    and to the generic "Anzahl" otherwise. Only rows whose current value
    differs are updated; all changes are committed in one transaction and
    rolled back on error.

    Args:
        db_path: Path to the SQLite database (defaults to ``DB_PATH``),
            so tests and one-off runs can target another file.
    """
    print(f"Connecting to {db_path}...")
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    try:
        cursor.execute("SELECT id, name, scraper_search_term, metric_type FROM industries")
        rows = cursor.fetchall()
        updated_count = 0
        for ind_id, name, current_term, m_type in rows:
            new_term = UNIT_MAPPING.get(name)
            # Fallback logic: area metrics carry no unit word; everything
            # else defaults to "Anzahl".
            if not new_term:
                new_term = "" if m_type in ("AREA_IN", "AREA_OUT") else "Anzahl"
            if current_term != new_term:
                print(f"Updating '{name}': '{current_term}' -> '{new_term}'")
                cursor.execute(
                    "UPDATE industries SET scraper_search_term = ? WHERE id = ?",
                    (new_term, ind_id),
                )
                updated_count += 1
        conn.commit()
        print(f"\n✅ Updated {updated_count} industries with correct units.")
    except Exception as e:
        print(f"❌ Error: {e}")
        conn.rollback()
    finally:
        conn.close()


if __name__ == "__main__":
    fix_units()

View File

@@ -21,6 +21,37 @@ gitea: none
---
# Projekt: Automatisierte Unternehmensbewertung & Lead-Generierung v2.2.1
## Current Status (Feb 20, 2026) - SuperOffice Integration Ready (v2.0)
### 1. SuperOffice Connector v2.0 ("The Muscle")
* **Event-Driven Architecture:** Hochperformante Webhook-Verarbeitung mit intelligenter "Noise Reduction". Ignoriert irrelevante Änderungen (z.B. Telefonnummern) und verhindert Kaskaden-Effekte.
* **Persona Mapping Engine:** Automatische Zuweisung von SuperOffice-Rollen ("Position") basierend auf Jobtiteln. Neue Rolle **"Influencer"** für Einkäufer/Techniker integriert.
* **Robustheit:** Konfiguration vollständig in `.env` ausgelagert. End-to-End Tests mit Termin-Simulation (statt E-Mail) verifiziert.
### 2. Marketing Matrix Engine ("The Brain")
* **Gemini 1.5 Pro Integration:** Der Matrix-Generator erstellt nun vollautomatisch hyper-personalisierte E-Mail-Texte für alle 125 Kombinationen (25 Branchen x 5 Personas).
* **Intelligente Prompts:** Kombiniert Branchen-Pains (z.B. "Logistik-Druck") mit Rollen-Pains (z.B. "Effizienz-Zwang GF").
### 3. UI/UX & Data Quality
* **Unit Fix:** Korrektur der Einheiten-Anzeige im Frontend (m², Betten, etc.).
* **Influencer Role:** Im Frontend nun als Mapping-Option verfügbar.
---
## 🚀 Next Steps for User (Immediate Actions)
1. **Content Generierung (Matrix füllen):**
Lassen Sie den Generator einmal laufen, um die Texte für alle Branchen zu erstellen (Dauer: ca. 10 Min).
```bash
export PYTHONPATH=$PYTHONPATH:/app/company-explorer
python3 company-explorer/backend/scripts/generate_matrix.py --live
```
2. **Produktions-Deployment:**
Folgen Sie der Anleitung in `connector-superoffice/README.md`, um die App im Developer Portal zu registrieren und den Webhook anzulegen.
---
## 1. Projektübersicht & Architektur
Dieses Projekt ist eine modulare "Lead Enrichment Factory", die darauf ausgelegt ist, Unternehmensdaten aus einem D365-CRM-System automatisiert anzureichern, zu analysieren und für Marketing- & Vertriebszwecke aufzubereiten.
@@ -63,7 +94,14 @@ VII. DIE STRATEGIE-SCHMIEDE (GTM Architect)
├── gtm-architect/ (React Frontend)
└── server.cjs (Node.js API-Bridge)
VIII. DAS FUNDAMENT
VIII. MARKETING AUTOMATION CORE (Company Explorer)
└── Backend-Logik für hyper-personalisierte E-Mail-Texte (vertical x persona)
├── database.py (Neue 'Persona' Tabelle, angepasste 'MarketingMatrix')
├── scripts/seed_marketing_data.py (Befüllt 'Persona' mit Pains/Gains)
├── scripts/sync_notion_personas.py (Synchronisiert Personas aus Notion)
└── scripts/generate_matrix.py (Generiert Texte für alle Vertical x Persona Kombinationen)
IX. DAS FUNDAMENT
└── config.py (Einstellungen & Konstanten für ALLE)
```

64
seed_test_matrix.py Normal file
View File

@@ -0,0 +1,64 @@
import sqlite3
import datetime

DB_PATH = "companies_v3_fixed_2.db"


def seed_matrix(db_path=DB_PATH):
    """Upsert two realistic test rows into the ``marketing_matrix`` table.

    Each scenario pairs the Logistics industry (id 1) with a persona and
    carries ready-made subject/intro/social-proof copy. Existing
    (industry_id, persona_id) cells are updated in place; missing ones are
    inserted. Commits on success, rolls back on error.

    Args:
        db_path: Path to the SQLite database (defaults to ``DB_PATH``),
            so tests can run against a scratch file.
    """
    print(f"Connecting to {db_path}...")
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    # Configuration of Test Scenarios
    scenarios = [
        {
            "ind_id": 1,   # Logistics
            "pers_id": 3,  # Wirtschaftlicher Entscheider (GF)
            "subject": "Kostenreduktion in Ihrer Intralogistik durch autonome Reinigung",
            "intro": "als Geschäftsführer wissen Sie: Effizienz ist der Schlüssel. Unsere Roboter senken Ihre Reinigungskosten um bis zu 30% und amortisieren sich in unter 12 Monaten.",
            "proof": "Referenzkunden wie DB Schenker und DHL setzen bereits auf unsere Flotte und konnten ihre Prozesskosten signifikant senken."
        },
        {
            "ind_id": 1,   # Logistics
            "pers_id": 1,  # Operativer Entscheider (Lagerleiter maps here!)
            "subject": "Weniger Stress mit der Sauberkeit in Ihren Hallen",
            "intro": "kennen Sie das Problem: Die Reinigungskräfte fallen aus und der Staub legt sich auf die Ware. Unsere autonomen Systeme reinigen nachts, zuverlässig und ohne, dass Sie sich darum kümmern müssen.",
            "proof": "Lagerleiter bei Fiege berichten von einer deutlichen Entlastung des Teams und saubereren Böden ohne Mehraufwand."
        }
    ]

    try:
        # Timezone-aware "now", stored as naive UTC ISO text to keep the
        # legacy format written by datetime.utcnow() (which is deprecated).
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat()
        for s in scenarios:
            # Check existence of the (industry, persona) cell.
            cursor.execute(
                "SELECT id FROM marketing_matrix WHERE industry_id = ? AND persona_id = ?",
                (s['ind_id'], s['pers_id'])
            )
            existing = cursor.fetchone()
            if existing:
                print(f"Updating Matrix for Ind {s['ind_id']} / Pers {s['pers_id']}...")
                cursor.execute("""
                    UPDATE marketing_matrix
                    SET subject = ?, intro = ?, social_proof = ?, updated_at = ?
                    WHERE id = ?
                """, (s['subject'], s['intro'], s['proof'], now, existing[0]))
            else:
                print(f"Inserting Matrix for Ind {s['ind_id']} / Pers {s['pers_id']}...")
                cursor.execute("""
                    INSERT INTO marketing_matrix (industry_id, persona_id, subject, intro, social_proof, updated_at)
                    VALUES (?, ?, ?, ?, ?, ?)
                """, (s['ind_id'], s['pers_id'], s['subject'], s['intro'], s['proof'], now))
        conn.commit()
        print("✅ Matrix updated with realistic test data.")
    except Exception as e:
        print(f"❌ Error: {e}")
        conn.rollback()
    finally:
        conn.close()


if __name__ == "__main__":
    seed_matrix()

161
sync_archetypes_final.py Normal file
View File

@@ -0,0 +1,161 @@
import requests
import json
import os

# --- Configuration ---
# The Notion integration token lives in a local, git-ignored file.
try:
    with open("notion_token.txt", "r") as f:
        NOTION_TOKEN = f.read().strip()
except FileNotFoundError:
    print("Error: notion_token.txt not found.")
    exit(1)

# Pinned Notion REST API version header value.
NOTION_VERSION = "2022-06-28"
NOTION_API_BASE_URL = "https://api.notion.com/v1"

# Shared headers for every Notion API request in this script.
HEADERS = {
    "Authorization": f"Bearer {NOTION_TOKEN}",
    "Notion-Version": NOTION_VERSION,
    "Content-Type": "application/json",
}

# DB: Personas / Roles
DB_ID = "30588f42854480c38919e22d74d945ea"
# --- Data for Archetypes ---
archetypes = [
{
"name": "Wirtschaftlicher Entscheider",
"pains": [
"Steigende Personalkosten im Reinigungs- und Servicebereich gefährden Profitabilität.",
"Fachkräftemangel und Schwierigkeiten bei der Stellenbesetzung.",
"Inkonsistente Qualitätsstandards schaden dem Ruf des Hauses.",
"Hoher Managementaufwand für manuelle operative Prozesse."
],
"gains": [
"Reduktion operativer Personalkosten um 10-25%.",
"Deutliche Abnahme der Überstunden (bis zu 50%).",
"Sicherstellung konstant hoher Qualitätsstandards.",
"Erhöhung der operativen Effizienz durch präzise Datenanalysen."
],
"kpis": "Betriebskosten pro Einheit, Gästezufriedenheit (NPS), Mitarbeiterfluktuation.",
"positions": "Direktor, Geschäftsführer, C-Level, Einkaufsleiter."
},
{
"name": "Operativer Entscheider",
"pains": [
"Team ist überlastet und gestresst (Gefahr hoher Fluktuation).",
"Zu viele manuelle Routineaufgaben wie Abräumen oder Materialtransport.",
"Mangelnde Personalverfügbarkeit in Stoßzeiten führt zu Engpässen."
],
"gains": [
"Signifikante Entlastung des Personals von Routineaufgaben (20-40% Zeitgewinn).",
"Garantierte Reinigungszyklen unabhängig von Personalausfällen.",
"Mehr Zeit für wertschöpfende Aufgaben (Gästebetreuung, Upselling)."
],
"kpis": "Zeitaufwand für Routineaufgaben, Abdeckungsrate der Zyklen, Servicegeschwindigkeit.",
"positions": "Leiter Housekeeping, F&B Manager, Restaurantleiter, Stationsleitung."
},
{
"name": "Infrastruktur-Verantwortlicher",
"pains": [
"Technische Komplexität der Integration in bestehende Infrastruktur (Aufzüge, WLAN).",
"Sorge vor hohen Ausfallzeiten und unplanmäßigen Wartungskosten.",
"Fehlendes internes Fachpersonal für die Wartung autonomer Systeme."
],
"gains": [
"Reibungslose Integration (20-30% schnellere Implementierung).",
"Minimierung von Ausfallzeiten um 80-90% durch proaktives Monitoring.",
"Planbare Wartung und transparente Kosten durch feste SLAs."
],
"kpis": "System-Uptime, Implementierungszeit, Wartungskosten (TCO).",
"positions": "Technischer Leiter, Facility Manager, IT-Leiter."
},
{
"name": "Innovations-Treiber",
"pains": [
"Verlust der Wettbewerbsfähigkeit durch veraltete Prozesse.",
"Schwierigkeit das Unternehmen als modernen Arbeitgeber zu positionieren.",
"Statische Informations- und Marketingflächen werden oft ignoriert."
],
"gains": [
"Positionierung als Innovationsführer am Markt.",
"Steigerung der Kundeninteraktion um 20-30%.",
"Gewinnung wertvoller Daten zur kontinuierlichen Prozessoptimierung.",
"Erhöhte Attraktivität für junge, technikaffine Talente."
],
"kpis": "Besucherinteraktionsrate, Anzahl Prozessinnovationen, Modernitäts-Sentiment.",
"positions": "Marketingleiter, Center Manager, CDO, Business Development."
}
]
# --- Helper Functions ---
def format_rich_text(text):
    """Wrap plain text in Notion's rich_text property payload structure."""
    node = {"type": "text", "text": {"content": text}}
    return {"rich_text": [node]}
def format_title(text):
    """Wrap plain text in Notion's title property payload structure."""
    node = {"type": "text", "text": {"content": text}}
    return {"title": [node]}
def find_page(title):
    """Return the first page in DB_ID whose 'Role' title equals *title*, else None."""
    query_url = f"{NOTION_API_BASE_URL}/databases/{DB_ID}/query"
    body = {"filter": {"property": "Role", "title": {"equals": title}}}
    resp = requests.post(query_url, headers=HEADERS, json=body)
    resp.raise_for_status()
    matches = resp.json().get("results")
    if matches:
        return matches[0]
    return None
def create_page(properties):
    """Create a new page carrying *properties* under database DB_ID."""
    body = {"parent": {"database_id": DB_ID}, "properties": properties}
    response = requests.post(f"{NOTION_API_BASE_URL}/pages", headers=HEADERS, json=body)
    response.raise_for_status()
    print("Created.")
def update_page(page_id, properties):
    """Patch the properties of an existing Notion page."""
    endpoint = f"{NOTION_API_BASE_URL}/pages/{page_id}"
    response = requests.patch(endpoint, headers=HEADERS, json={"properties": properties})
    response.raise_for_status()
    print("Updated.")
# --- Main Logic ---
def main():
    """Upsert every archetype persona into the Notion 'Personas / Roles' database."""
    print(f"Syncing {len(archetypes)} Personas to Notion DB {DB_ID}...")
    for persona in archetypes:
        print(f"Processing '{persona['name']}'...")
        # Render the pains/gains lists as newline-separated bullet lines,
        # since the Notion columns are plain rich-text fields.
        pains_block = "\n".join(f"- {entry}" for entry in persona["pains"])
        gains_block = "\n".join(f"- {entry}" for entry in persona["gains"])
        props = {
            "Role": format_title(persona["name"]),
            "Pains": format_rich_text(pains_block),
            "Gains": format_rich_text(gains_block),
            "KPIs": format_rich_text(persona.get("kpis", "")),
            "Typische Positionen": format_rich_text(persona.get("positions", "")),
        }
        page = find_page(persona["name"])
        if page:
            print(f" -> Found existing page {page['id']}. Updating...")
            update_page(page["id"], props)
        else:
            print(" -> Creating new page...")
            create_page(props)
    print("Sync complete.")


if __name__ == "__main__":
    main()

150
sync_personas_to_notion.py Normal file
View File

@@ -0,0 +1,150 @@
import requests
import json

# --- Configuration ---
# The Notion integration token lives in a local, git-ignored file.
try:
    with open("notion_token.txt", "r") as f:
        NOTION_TOKEN = f.read().strip()
except FileNotFoundError:
    print("Error: notion_token.txt not found.")
    exit(1)

# Pinned Notion REST API version header value.
NOTION_VERSION = "2022-06-28"
NOTION_API_BASE_URL = "https://api.notion.com/v1"

# Shared headers for every Notion API request in this script.
HEADERS = {
    "Authorization": f"Bearer {NOTION_TOKEN}",
    "Notion-Version": NOTION_VERSION,
    "Content-Type": "application/json",
}

# DB: Sector & Persona Master
DB_ID = "2e288f42-8544-8113-b878-ec99c8a02a6b"
# --- Data ---
archetypes = [
{
"name": "Wirtschaftlicher Entscheider",
"pains": [
"Steigende operative Personalkosten und Fachkräftemangel gefährden die Profitabilität.",
"Unklare Amortisation (ROI) und Risiko von Fehlinvestitionen bei neuen Technologien.",
"Intransparente Folgekosten (TCO) und schwierige Budgetplanung über die Lebensdauer."
],
"gains": [
"Nachweisbare Senkung der operativen Kosten (10-25%) und schnelle Amortisation.",
"Sicherung der Wettbewerbsfähigkeit durch effizientere Kostenstrukturen.",
"Volle Transparenz und Planbarkeit durch klare Service-Modelle (SLAs)."
]
},
{
"name": "Operativer Entscheider",
"pains": [
"Personelle Unterbesetzung führt zu Überstunden, Stress und Qualitätsmängeln.",
"Wiederkehrende Routineaufgaben binden wertvolle Fachkräfte-Ressourcen.",
"Schwierigkeit, gleichbleibend hohe Standards (Hygiene/Service) 24/7 zu garantieren."
],
"gains": [
"Spürbare Entlastung des Teams von Routineaufgaben (20-40%).",
"Garantierte, gleichbleibend hohe Ausführungsqualität unabhängig von der Tagesform.",
"Stabilisierung der operativen Abläufe und Kompensation von Personalausfällen."
]
},
{
"name": "Infrastruktur-Verantwortlicher",
"pains": [
"Sorge vor komplexer Integration in bestehende IT- und Gebäudeinfrastruktur (WLAN, Türen, Aufzüge).",
"Risiko von hohen Ausfallzeiten und aufwändiger Fehlerbehebung ohne internes Spezialwissen.",
"Unklare Wartungsaufwände und Angst vor 'Insel-Lösungen' ohne Schnittstellen."
],
"gains": [
"Reibungslose, fachgerechte Integration durch Experten-Support (Plug & Play).",
"Maximale Betriebssicherheit durch proaktives Monitoring und schnelle Reaktionszeiten.",
"Zentrales Management und volle Transparenz über Systemstatus und Wartungsbedarf."
]
},
{
"name": "Innovations-Treiber",
"pains": [
"Verlust der Attraktivität als moderner Arbeitgeber oder Dienstleister (Veraltetes Image).",
"Fehlende 'Wow-Effekte' in der Kundeninteraktion und mangelnde Differenzierung vom Wettbewerb.",
"Verpasste Chancen durch fehlende Datengrundlage für digitale Optimierungen."
],
"gains": [
"Positionierung als Innovationsführer und Steigerung der Markenattraktivität.",
"Schaffung einzigartiger Kundenerlebnisse durch sichtbare High-Tech-Lösungen.",
"Gewinnung wertvoller Daten zur kontinuierlichen Prozessoptimierung und Digitalisierung."
]
}
]
# --- Helper Functions ---
def format_rich_text(text):
    """Convert *text* into the rich_text payload Notion expects."""
    text_obj = {"content": text}
    return {"rich_text": [{"type": "text", "text": text_obj}]}
def format_title(text):
    """Convert *text* into the title payload Notion expects."""
    text_obj = {"content": text}
    return {"title": [{"type": "text", "text": text_obj}]}
def find_page(title):
    """Return the first page in DB_ID whose 'Name' title equals *title*, else None."""
    body = {"filter": {"property": "Name", "title": {"equals": title}}}
    resp = requests.post(
        f"{NOTION_API_BASE_URL}/databases/{DB_ID}/query",
        headers=HEADERS,
        json=body,
    )
    resp.raise_for_status()
    hits = resp.json().get("results")
    return hits[0] if hits else None
def create_page(properties):
    """Create a new page carrying *properties* under database DB_ID."""
    response = requests.post(
        f"{NOTION_API_BASE_URL}/pages",
        headers=HEADERS,
        json={"parent": {"database_id": DB_ID}, "properties": properties},
    )
    response.raise_for_status()
    print("Created.")
def update_page(page_id, properties):
    """Patch the properties of an existing Notion page."""
    body = {"properties": properties}
    response = requests.patch(f"{NOTION_API_BASE_URL}/pages/{page_id}", headers=HEADERS, json=body)
    response.raise_for_status()
    print("Updated.")
# --- Main Sync Loop ---
def main():
    """Upsert each persona archetype into the Notion Sector & Persona master DB."""
    print(f"Syncing {len(archetypes)} Personas to Notion DB {DB_ID}...")
    for persona in archetypes:
        print(f"Processing '{persona['name']}'...")
        # Pains/Gains are stored as bullet lists inside plain text fields.
        pains_block = "\n".join(f"- {entry}" for entry in persona["pains"])
        gains_block = "\n".join(f"- {entry}" for entry in persona["gains"])
        props = {
            "Name": format_title(persona["name"]),
            "Pains": format_rich_text(pains_block),
            "Gains": format_rich_text(gains_block),
            # Personas share this DB with Sectors; they are distinguished by
            # name uniqueness only — no extra tag property is written.
        }
        existing = find_page(persona["name"])
        if existing:
            print(f" -> Found existing page {existing['id']}. Updating...")
            update_page(existing["id"], props)
        else:
            print(" -> Creating new page...")
            create_page(props)
    print("Sync complete.")


if __name__ == "__main__":
    main()