[2ff88f42] multiplikation vorbereitet

multiplikation vorbereitet
This commit is contained in:
2026-02-19 20:59:04 +00:00
parent 95b80f0bbc
commit f65df42f55
15 changed files with 982 additions and 27 deletions

View File

@@ -0,0 +1,22 @@
import sys
import os
# Setup Environment
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))
from backend.database import SessionLocal, JobRoleMapping
def check_mappings():
    """Print the total JobRoleMapping count plus up to five sample rows."""
    db = SessionLocal()
    try:
        # Summary count first, then a few concrete pattern -> role examples.
        count = db.query(JobRoleMapping).count()
        print(f"Total JobRoleMappings: {count}")
        examples = db.query(JobRoleMapping).limit(5).all()
        for ex in examples:
            print(f" - {ex.pattern} -> {ex.role}")
    finally:
        # BUGFIX: previously the session leaked if a query raised before close.
        db.close()


if __name__ == "__main__":
    check_mappings()

View File

@@ -0,0 +1,162 @@
import sys
import os
import json
import argparse
from typing import List
# Setup Environment
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))
from backend.database import SessionLocal, Industry, Persona, MarketingMatrix
# --- Configuration ---
# OpenAI chat model used for all live generations (see real_openai_call).
MODEL = "gpt-4o"
def generate_prompt(industry: Industry, persona: Persona) -> str:
"""
Builds the prompt for the AI to generate the marketing texts.
Combines Industry context with Persona specific pains/gains.
"""
# Safely load JSON lists
try:
persona_pains = json.loads(persona.pains) if persona.pains else []
persona_gains = json.loads(persona.gains) if persona.gains else []
except:
persona_pains = [persona.pains] if persona.pains else []
persona_gains = [persona.gains] if persona.gains else []
industry_pains = industry.pains if industry.pains else "Allgemeine Effizienzprobleme"
prompt = f"""
Du bist ein erfahrener B2B-Copywriter für Robotik-Lösungen (Reinigung, Transport, Service).
Ziel: Erstelle personalisierte E-Mail-Textbausteine für einen Outreach.
--- KONTEXT ---
ZIELBRANCHE: {industry.name}
BRANCHEN-KONTEXT: {industry.description or 'Keine spezifische Beschreibung'}
BRANCHEN-PAINS: {industry_pains}
ZIELPERSON (ARCHETYP): {persona.name}
PERSÖNLICHE PAINS (Herausforderungen):
{chr(10).join(['- ' + p for p in persona_pains])}
GEWÜNSCHTE GAINS (Ziele):
{chr(10).join(['- ' + g for g in persona_gains])}
--- AUFGABE ---
Erstelle ein JSON-Objekt mit genau 3 Textbausteinen.
Tonalität: Professionell, lösungsorientiert, auf den Punkt. Keine Marketing-Floskeln ("Game Changer").
1. "subject": Betreffzeile (Max 6 Wörter). Muss neugierig machen und einen Pain adressieren.
2. "intro": Einleitungssatz (1-2 Sätze). Verbinde die Branchen-Herausforderung mit der persönlichen Rolle des Empfängers. Zeige Verständnis für seine Situation.
3. "social_proof": Ein Satz, der Vertrauen aufbaut. Nenne generische Erfolge (z.B. "Unternehmen in der {industry.name} senken so ihre Kosten um 15%"), da wir noch keine spezifischen Logos nennen dürfen.
--- FORMAT ---
{{
"subject": "...",
"intro": "...",
"social_proof": "..."
}}
"""
return prompt
def mock_openai_call(prompt: str):
    """Simulates an API call for dry runs."""
    # Echo a truncated preview of the prompt so dry runs stay inspectable.
    print(f"\n--- [MOCK] GENERATING PROMPT ---\n{prompt[:300]}...\n--------------------------------")
    canned_response = {
        "subject": "[MOCK] Effizienzsteigerung in der Produktion",
        "intro": "[MOCK] Als Produktionsleiter wissen Sie, wie teuer Stillstand ist. Unsere Roboter helfen.",
        "social_proof": "[MOCK] Ähnliche Betriebe sparten 20% Kosten.",
    }
    return canned_response
def real_openai_call(prompt: str):
    """Send the prompt to OpenAI and return the parsed JSON reply as a dict."""
    # Imported lazily so dry runs never need the OpenAI SDK or settings.
    import openai
    from backend.config import settings

    if not settings.OPENAI_API_KEY:
        raise ValueError("OPENAI_API_KEY not set")
    client = openai.OpenAI(api_key=settings.OPENAI_API_KEY)
    completion = client.chat.completions.create(
        model=MODEL,
        response_format={"type": "json_object"},
        messages=[{"role": "user", "content": prompt}],
        temperature=0.7,
    )
    raw_content = completion.choices[0].message.content
    return json.loads(raw_content)
def run_matrix_generation(dry_run: bool = True, force: bool = False):
    """Generate marketing texts for every Industry x Persona combination.

    Args:
        dry_run: When True, use mocked AI responses and perform no DB writes.
        force: When True, regenerate entries that already exist.
    """
    db = SessionLocal()
    try:
        industries = db.query(Industry).all()
        personas = db.query(Persona).all()
        print(f"Found {len(industries)} Industries and {len(personas)} Personas.")
        print(f"Mode: {'DRY RUN (No API calls, no DB writes)' if dry_run else 'LIVE'}")
        total_combinations = len(industries) * len(personas)
        processed = 0
        for ind in industries:
            for pers in personas:
                processed += 1
                print(f"[{processed}/{total_combinations}] Check: {ind.name} x {pers.name}")
                # Skip combinations that already have texts unless --force.
                existing = db.query(MarketingMatrix).filter(
                    MarketingMatrix.industry_id == ind.id,
                    MarketingMatrix.persona_id == pers.id
                ).first()
                if existing and not force:
                    print(f" -> Skipped (Already exists)")
                    continue
                # Generate
                prompt = generate_prompt(ind, pers)
                if dry_run:
                    result = mock_openai_call(prompt)
                else:
                    try:
                        result = real_openai_call(prompt)
                    except Exception as e:
                        # One failed combination must not abort the batch.
                        print(f" -> API ERROR: {e}")
                        continue
                # Write to DB (only if not dry run)
                if not dry_run:
                    if not existing:
                        new_entry = MarketingMatrix(
                            industry_id=ind.id,
                            persona_id=pers.id,
                            subject=result.get("subject"),
                            intro=result.get("intro"),
                            social_proof=result.get("social_proof")
                        )
                        db.add(new_entry)
                        print(f" -> Created new entry.")
                    else:
                        existing.subject = result.get("subject")
                        existing.intro = result.get("intro")
                        existing.social_proof = result.get("social_proof")
                        print(f" -> Updated entry.")
        # Single commit keeps the whole run atomic.
        db.commit()
    except Exception as e:
        # BUGFIX: discard partial state so the session is not left in a
        # dirty/failed transaction before close.
        db.rollback()
        print(f"Error: {e}")
    finally:
        db.close()
if __name__ == "__main__":
    cli = argparse.ArgumentParser()
    cli.add_argument("--live", action="store_true", help="Actually call OpenAI and write to DB")
    cli.add_argument("--force", action="store_true", help="Overwrite existing matrix entries")
    options = cli.parse_args()
    # Dry run is the default; --live enables real API calls and DB writes.
    run_matrix_generation(dry_run=not options.live, force=options.force)

View File

@@ -0,0 +1,123 @@
import sys
import os
import json
# Setup Environment to import backend modules
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))
from backend.database import SessionLocal, Persona, JobRoleMapping
def seed_archetypes():
    """Seed the four strategic Persona archetypes and their JobRoleMappings.

    Replaces all existing Personas and JobRoleMappings with a clean slate:
    four archetypes (pains/gains stored as JSON text) plus the SQL-LIKE
    patterns that map raw job titles onto those archetypes.
    """
    db = SessionLocal()
    try:
        print("Seeding Strategic Archetypes (Pains & Gains)...")
        # --- 1. The 4 Strategic Archetypes ---
        # Based on user input and synthesis of previous specific roles
        archetypes = [
            {
                "name": "Operativer Entscheider",
                "pains": [
                    "Personelle Unterbesetzung und hohe Fluktuation führen zu Überstunden und Qualitätsmängeln.",
                    "Manuelle, wiederkehrende Prozesse binden wertvolle Ressourcen und senken die Effizienz.",
                    "Sicherstellung gleichbleibend hoher Standards (Hygiene/Service) ist bei Personalmangel kaum möglich."
                ],
                "gains": [
                    "Spürbare Entlastung des Teams von Routineaufgaben (20-40%).",
                    "Garantierte, gleichbleibend hohe Ausführungsqualität rund um die Uhr.",
                    "Stabilisierung der operativen Abläufe unabhängig von kurzfristigen Personalausfällen."
                ]
            },
            {
                "name": "Infrastruktur-Verantwortlicher",
                "pains": [
                    "Integration neuer Systeme in bestehende Gebäude/IT ist oft komplex und risikobehaftet.",
                    "Sorge vor hohen Ausfallzeiten und aufwändiger Fehlerbehebung ohne internes Spezialwissen.",
                    "Unklare Wartungsaufwände und Schnittstellenprobleme (WLAN, Aufzüge, Türen)."
                ],
                "gains": [
                    "Reibungslose, fachgerechte Integration in die bestehende Infrastruktur.",
                    "Maximale Betriebssicherheit durch proaktives Monitoring und schnelle Reaktionszeiten.",
                    "Volle Transparenz über Systemstatus und Wartungsbedarf."
                ]
            },
            {
                "name": "Wirtschaftlicher Entscheider",
                "pains": [
                    "Steigende operative Kosten (Personal, Material) drücken auf die Margen.",
                    "Unklare Amortisation (ROI) und Risiko von Fehlinvestitionen bei neuen Technologien.",
                    "Intransparente Folgekosten (TCO) über die Lebensdauer der Anlagen."
                ],
                "gains": [
                    "Nachweisbare Senkung der operativen Kosten (10-25%).",
                    "Transparente und planbare Kostenstruktur (TCO) ohne versteckte Überraschungen.",
                    "Schneller, messbarer Return on Investment durch Effizienzsteigerung."
                ]
            },
            {
                "name": "Innovations-Treiber",
                "pains": [
                    "Verlust der Wettbewerbsfähigkeit durch veraltete Prozesse und Kundenangebote.",
                    "Schwierigkeit, das Unternehmen als modernes, zukunftsorientiertes Brand zu positionieren.",
                    "Verpasste Chancen durch fehlende Datengrundlage für Optimierungen."
                ],
                "gains": [
                    "Positionierung als Innovationsführer und Steigerung der Arbeitgeberattraktivität.",
                    "Nutzung modernster Technologie als sichtbares Differenzierungsmerkmal.",
                    "Gewinnung wertvoller Daten zur kontinuierlichen Prozessoptimierung."
                ]
            }
        ]
        # Clear existing Personas to avoid mix-up with old granular ones
        # (In production, we might want to be more careful, but here we want a clean slate for the new archetypes)
        try:
            db.query(Persona).delete()
            db.commit()
            print("Cleared old Personas.")
        except Exception as e:
            # BUGFIX: roll back so the session is usable after a failed delete;
            # without this, the following inserts would error out.
            db.rollback()
            print(f"Warning clearing personas: {e}")
        for p_data in archetypes:
            print(f"Creating Archetype: {p_data['name']}")
            new_persona = Persona(
                name=p_data["name"],
                pains=json.dumps(p_data["pains"]),
                gains=json.dumps(p_data["gains"])
            )
            db.add(new_persona)
        db.commit()
        # --- 2. Update JobRoleMappings to map to Archetypes ---
        # We map the patterns to the new 4 Archetypes
        mapping_updates = [
            # Wirtschaftlicher Entscheider
            {"role": "Wirtschaftlicher Entscheider", "patterns": ["%geschäftsführer%", "%ceo%", "%director%", "%einkauf%", "%procurement%", "%finance%", "%cfo%"]},
            # Operativer Entscheider
            {"role": "Operativer Entscheider", "patterns": ["%housekeeping%", "%hausdame%", "%hauswirtschaft%", "%reinigung%", "%restaurant%", "%f&b%", "%werksleiter%", "%produktionsleiter%", "%lager%", "%logistik%", "%operations%", "%coo%"]},
            # Infrastruktur-Verantwortlicher
            {"role": "Infrastruktur-Verantwortlicher", "patterns": ["%facility%", "%technik%", "%instandhaltung%", "%it-leiter%", "%cto%", "%admin%", "%building%"]},
            # Innovations-Treiber
            {"role": "Innovations-Treiber", "patterns": ["%innovation%", "%digital%", "%transformation%", "%business dev%", "%marketing%"]}
        ]
        # Clear old mappings to prevent confusion
        db.query(JobRoleMapping).delete()
        db.commit()
        print("Cleared old JobRoleMappings.")
        for group in mapping_updates:
            role_name = group["role"]
            for pattern in group["patterns"]:
                print(f"Mapping '{pattern}' -> '{role_name}'")
                db.add(JobRoleMapping(pattern=pattern, role=role_name))
        db.commit()
        print("Archetypes and Mappings Seeded Successfully.")
    finally:
        # BUGFIX: previously the session leaked if any step above raised.
        db.close()


if __name__ == "__main__":
    seed_archetypes()

View File

@@ -0,0 +1,134 @@
import sys
import os
import requests
import json
import logging
# Add company-explorer to path (parent of backend)
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
from backend.database import SessionLocal, Persona, init_db
from backend.config import settings
# Setup Logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Path to the mounted file holding the Notion integration token
# (presumably provided via a container volume — confirm in deployment config).
NOTION_TOKEN_FILE = "/app/notion_token.txt"
# Sector & Persona Master DB
PERSONAS_DB_ID = "2e288f42-8544-8113-b878-ec99c8a02a6b"
# Only these four strategic archetypes are synced; all other Notion rows are skipped.
VALID_ARCHETYPES = {
    "Wirtschaftlicher Entscheider",
    "Operativer Entscheider",
    "Infrastruktur-Verantwortlicher",
    "Innovations-Treiber"
}
def load_notion_token():
    """Read the Notion API token from disk; exit the script if it is missing."""
    try:
        with open(NOTION_TOKEN_FILE, "r") as fh:
            token = fh.read()
    except FileNotFoundError:
        logger.error(f"Notion token file not found at {NOTION_TOKEN_FILE}")
        sys.exit(1)
    return token.strip()
def query_notion_db(token, db_id):
    """Fetch every page of a Notion database, following cursor pagination.

    Returns the accumulated list of page objects. On a non-200 response the
    error is logged and whatever was collected so far is returned.
    """
    url = f"https://api.notion.com/v1/databases/{db_id}/query"
    headers = {
        "Authorization": f"Bearer {token}",
        "Notion-Version": "2022-06-28",
        "Content-Type": "application/json"
    }
    results = []
    has_more = True
    next_cursor = None
    while has_more:
        payload = {}
        if next_cursor:
            payload["start_cursor"] = next_cursor
        # BUGFIX: a timeout prevents the sync from hanging forever on a
        # stalled request (requests has no default timeout).
        response = requests.post(url, headers=headers, json=payload, timeout=30)
        if response.status_code != 200:
            logger.error(f"Error querying Notion DB {db_id}: {response.text}")
            break
        data = response.json()
        results.extend(data.get("results", []))
        has_more = data.get("has_more", False)
        next_cursor = data.get("next_cursor")
    return results
def extract_title(prop):
    """Concatenate the plain-text parts of a Notion title property."""
    if not prop:
        return ""
    fragments = [part.get("plain_text", "") for part in prop.get("title", [])]
    return "".join(fragments)
def extract_rich_text_to_list(prop):
    """
    Extracts rich text and converts bullet points/newlines into a list of strings.
    """
    if not prop:
        return []
    full_text = "".join([t.get("plain_text", "") for t in prop.get("rich_text", [])])
    # Split by newline and clean up bullets
    cleaned_lines = []
    for line in full_text.split('\n'):
        line = line.strip()
        if not line:
            continue
        # BUGFIX: the previous elif tested line.startswith("") — always
        # True — which sliced two characters off every non-"- " line and
        # mangled plain text. Strip only real bullet markers.
        for bullet in ("- ", "• ", "* "):
            if line.startswith(bullet):
                line = line[len(bullet):]
                break
        cleaned_lines.append(line)
    return cleaned_lines
def sync_personas(token, session):
    """Pull the archetype personas from Notion and upsert them into the DB."""
    logger.info("Syncing Personas from Notion...")
    pages = query_notion_db(token, PERSONAS_DB_ID)
    count = 0
    for page in pages:
        props = page.get("properties", {})
        name = extract_title(props.get("Name"))
        # Only the four strategic archetypes are mirrored locally.
        if name not in VALID_ARCHETYPES:
            logger.debug(f"Skipping '{name}' (Not a target Archetype)")
            continue
        logger.info(f"Processing Persona: {name}")
        pains_list = extract_rich_text_to_list(props.get("Pains"))
        gains_list = extract_rich_text_to_list(props.get("Gains"))
        # Upsert: reuse the existing row when the persona is already known.
        persona = session.query(Persona).filter(Persona.name == name).first()
        if persona is None:
            persona = Persona(name=name)
            session.add(persona)
            logger.info(f" -> Creating new entry")
        else:
            logger.info(f" -> Updating existing entry")
        persona.pains = json.dumps(pains_list, ensure_ascii=False)
        persona.gains = json.dumps(gains_list, ensure_ascii=False)
        count += 1
    session.commit()
    logger.info(f"Sync complete. Updated {count} personas.")
if __name__ == "__main__":
    notion_token = load_notion_token()
    session = SessionLocal()
    try:
        sync_personas(notion_token, session)
    except Exception as e:
        logger.error(f"Sync failed: {e}", exc_info=True)
    finally:
        # Always release the session, even when the sync blows up.
        session.close()

View File

@@ -7,7 +7,7 @@ import logging
# /app/backend/scripts/sync.py -> /app
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
from backend.database import SessionLocal, Industry, RoboticsCategory, init_db
from backend.database import SessionLocal, Industry, RoboticsCategory, Persona, init_db
from dotenv import load_dotenv
# Try loading from .env in root if exists
@@ -76,6 +76,21 @@ def extract_number(prop):
if not prop or "number" not in prop: return None
return prop["number"]
def extract_rich_text_to_list(prop):
    """Split a Notion rich-text property into cleaned, bullet-free lines."""
    if not prop or "rich_text" not in prop:
        return []
    full_text = "".join([t.get("plain_text", "") for t in prop.get("rich_text", [])])
    cleaned_lines = []
    for line in full_text.split('\n'):
        line = line.strip()
        if not line:
            continue
        # BUGFIX: the previous elif tested line.startswith("") — always
        # True — which sliced two characters off every non-"- " line.
        for bullet in ("- ", "• ", "* "):
            if line.startswith(bullet):
                line = line[len(bullet):]
                break
        cleaned_lines.append(line)
    return cleaned_lines
def sync():
logger.info("--- Starting Enhanced Sync ---")
@@ -83,6 +98,48 @@ def sync():
init_db()
session = SessionLocal()
# --- 4. Sync Personas (NEW) ---
# Sector & Persona Master ID
PERSONAS_DB_ID = "2e288f42-8544-8113-b878-ec99c8a02a6b"
VALID_ARCHETYPES = {
"Wirtschaftlicher Entscheider",
"Operativer Entscheider",
"Infrastruktur-Verantwortlicher",
"Innovations-Treiber"
}
if PERSONAS_DB_ID:
logger.info(f"Syncing Personas from {PERSONAS_DB_ID}...")
pages = query_all(PERSONAS_DB_ID)
p_count = 0
# We assume Personas are cumulative, so we don't delete all first (safer for IDs)
# But we could if we wanted a clean slate. Upsert is better.
for page in pages:
props = page["properties"]
name = extract_title(props.get("Name"))
if name not in VALID_ARCHETYPES:
continue
import json
pains_list = extract_rich_text_to_list(props.get("Pains"))
gains_list = extract_rich_text_to_list(props.get("Gains"))
persona = session.query(Persona).filter(Persona.name == name).first()
if not persona:
persona = Persona(name=name)
session.add(persona)
persona.pains = json.dumps(pains_list, ensure_ascii=False)
persona.gains = json.dumps(gains_list, ensure_ascii=False)
p_count += 1
session.commit()
logger.info(f"✅ Synced {p_count} Personas.")
# 2. Sync Categories (Products)
cat_db_id = find_db_id("Product Categories") or find_db_id("Products")
if cat_db_id:

View File

@@ -0,0 +1,47 @@
import sys
import os
# Setup Environment
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))
from backend.database import SessionLocal, JobRoleMapping, Persona
def test_mapping(job_title):
    """Resolve a job title to a Persona via JobRoleMapping patterns and print the path."""
    db = SessionLocal()
    try:
        print(f"\n--- Testing Mapping for '{job_title}' ---")
        # 1. Find Role Name via JobRoleMapping
        role_name = None
        mappings = db.query(JobRoleMapping).all()
        for m in mappings:
            # Patterns are SQL-LIKE strings ('%ceo%'); strip the wildcards
            # for a plain case-insensitive substring check.
            pattern_clean = m.pattern.replace("%", "").lower()
            if pattern_clean in job_title.lower():
                role_name = m.role
                print(f" -> Matched Pattern: '{m.pattern}' => Role: '{role_name}'")
                break
        if not role_name:
            print(" -> No Pattern Matched.")
            return
        # 2. Find Persona via Role Name
        persona = db.query(Persona).filter(Persona.name == role_name).first()
        if persona:
            print(f" -> Found Persona ID: {persona.id} (Name: {persona.name})")
        else:
            print(f" -> ERROR: Persona '{role_name}' not found in DB!")
    finally:
        # BUGFIX: the early return on "no match" previously leaked the session.
        db.close()
if __name__ == "__main__":
    # Representative titles covering all four archetype mappings.
    sample_titles = [
        "Leiter Hauswirtschaft",
        "CTO",
        "Geschäftsführer",
        "Head of Marketing",
        "Einkaufsleiter",
    ]
    for title in sample_titles:
        test_mapping(title)

View File

@@ -0,0 +1,33 @@
import sys
import os
# Add parent directory to path to allow import of backend.database
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))
# Import everything to ensure metadata is populated
from backend.database import engine, Base, Company, Contact, Industry, JobRoleMapping, Persona, Signal, EnrichmentData, RoboticsCategory, ImportLog, ReportedMistake, MarketingMatrix
def migrate():
    """Create missing tables and force-refresh the marketing_matrix schema.

    create_all never alters existing tables, so marketing_matrix is dropped
    first to pick up its new column layout. All data in that table is lost.
    """
    print("Migrating Database Schema...")
    try:
        # Hacky migration for MarketingMatrix: Drop if exists to enforce new schema.
        # BUGFIX: engine.begin() commits the DDL on success — with a plain
        # engine.connect() the transaction is rolled back on close under
        # SQLAlchemy 2.x and the DROP would silently not persist.
        with engine.begin() as con:
            print("Dropping old MarketingMatrix table to enforce schema change...")
            try:
                from sqlalchemy import text
                con.execute(text("DROP TABLE IF EXISTS marketing_matrix"))
                print("Dropped marketing_matrix.")
            except Exception as e:
                print(f"Could not drop marketing_matrix: {e}")
    except Exception as e:
        print(f"Pre-migration cleanup error: {e}")
    # This creates 'personas' table AND re-creates 'marketing_matrix'
    Base.metadata.create_all(bind=engine)
    print("Migration complete. 'personas' table created and 'marketing_matrix' refreshed.")


if __name__ == "__main__":
    migrate()