[31188f42] einfügen

einfügen
This commit is contained in:
2026-02-24 06:47:35 +00:00
parent 391ed60a19
commit 0c2c17df1e
21 changed files with 1575 additions and 152 deletions

View File

@@ -32,11 +32,12 @@ setup_logging()
import logging
logger = logging.getLogger(__name__)
from .database import init_db, get_db, Company, Signal, EnrichmentData, RoboticsCategory, Contact, Industry, JobRoleMapping, ReportedMistake, MarketingMatrix, Persona, RawJobTitle
from .database import init_db, get_db, Company, Signal, EnrichmentData, RoboticsCategory, Contact, Industry, JobRolePattern, ReportedMistake, MarketingMatrix, Persona, RawJobTitle
from .services.deduplication import Deduplicator
from .services.discovery import DiscoveryService
from .services.scraping import ScraperService
from .services.classification import ClassificationService
from .services.role_mapping import RoleMappingService
# Initialize App
app = FastAPI(
@@ -119,6 +120,25 @@ class IndustryDetails(BaseModel):
class Config:
from_attributes = True
class MarketingMatrixUpdate(BaseModel):
    """Partial-update payload for a marketing-matrix entry.

    Fields left as ``None`` are left untouched by the PUT handler, so a
    client may update any subset of the three text fields.
    """
    subject: Optional[str] = None
    intro: Optional[str] = None
    social_proof: Optional[str] = None
class MarketingMatrixResponse(BaseModel):
    """API representation of one industry × persona marketing-matrix cell.

    ``industry_name`` / ``persona_name`` are denormalized from the joined
    Industry and Persona rows; endpoints fill in "Unknown" when a relation
    is missing.
    """
    id: int
    industry_id: int
    persona_id: int
    industry_name: str
    persona_name: str
    subject: Optional[str] = None
    intro: Optional[str] = None
    social_proof: Optional[str] = None
    updated_at: datetime

    class Config:
        # Allow construction directly from SQLAlchemy ORM objects.
        from_attributes = True
class ContactResponse(BaseModel):
id: int
first_name: Optional[str] = None
@@ -314,23 +334,21 @@ def provision_superoffice_contact(
logger.info(f"Created new person {req.so_person_id} for company {company.name}")
# Update Job Title & Role logic
if req.job_title:
if req.job_title and req.job_title != person.job_title:
person.job_title = req.job_title
# Simple classification fallback
mappings = db.query(JobRoleMapping).all()
found_role = None
for m in mappings:
pattern_clean = m.pattern.replace("%", "").lower()
if pattern_clean in req.job_title.lower():
found_role = m.role
break
# New, service-based classification
role_mapping_service = RoleMappingService(db)
found_role = role_mapping_service.get_role_for_job_title(req.job_title)
# ALWAYS update role, even if to None, to avoid 'sticking' old roles
if found_role != person.role:
logger.info(f"Role Change for {person.so_person_id}: {person.role} -> {found_role}")
logger.info(f"Role Change for {person.so_person_id} via Mapping Service: {person.role} -> {found_role}")
person.role = found_role
if not found_role:
# If no role was found, we log it for future pattern mining
role_mapping_service.add_or_update_unclassified_title(req.job_title)
db.commit()
db.refresh(person)
@@ -429,6 +447,8 @@ def export_companies_csv(db: Session = Depends(get_db), username: str = Depends(
from fastapi.responses import StreamingResponse
output = io.StringIO()
# Add UTF-8 BOM for Excel
output.write('\ufeff')
writer = csv.writer(output)
# Header
@@ -567,7 +587,229 @@ def list_industries(db: Session = Depends(get_db), username: str = Depends(authe
@app.get("/api/job_roles")
def list_job_roles(db: Session = Depends(get_db), username: str = Depends(authenticate_user)):
return db.query(JobRoleMapping).order_by(JobRoleMapping.pattern.asc()).all()
return db.query(JobRolePattern).order_by(JobRolePattern.priority.asc()).all()
# --- Marketing Matrix Endpoints ---
@app.get("/api/matrix", response_model=List[MarketingMatrixResponse])
def get_marketing_matrix(
    industry_id: Optional[int] = Query(None),
    persona_id: Optional[int] = Query(None),
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """Return marketing-matrix entries, optionally filtered by industry and/or persona."""
    matrix_query = db.query(MarketingMatrix).options(
        joinedload(MarketingMatrix.industry),
        joinedload(MarketingMatrix.persona)
    )
    if industry_id:
        matrix_query = matrix_query.filter(MarketingMatrix.industry_id == industry_id)
    if persona_id:
        matrix_query = matrix_query.filter(MarketingMatrix.persona_id == persona_id)

    # Convert ORM rows to the response schema, denormalizing the names.
    responses = []
    for row in matrix_query.all():
        responses.append(
            MarketingMatrixResponse(
                id=row.id,
                industry_id=row.industry_id,
                persona_id=row.persona_id,
                industry_name=row.industry.name if row.industry else "Unknown",
                persona_name=row.persona.name if row.persona else "Unknown",
                subject=row.subject,
                intro=row.intro,
                social_proof=row.social_proof,
                updated_at=row.updated_at,
            )
        )
    return responses
@app.get("/api/matrix/export")
def export_matrix_csv(
    industry_id: Optional[int] = Query(None),
    persona_id: Optional[int] = Query(None),
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """
    Exports a CSV of the marketing matrix, optionally filtered by industry
    and/or persona. The CSV is prefixed with a UTF-8 BOM so Excel detects
    the encoding, and is streamed back as an attachment.
    """
    import io
    import csv
    from fastapi.responses import StreamingResponse

    query = db.query(MarketingMatrix).options(
        joinedload(MarketingMatrix.industry),
        joinedload(MarketingMatrix.persona)
    )
    if industry_id:
        query = query.filter(MarketingMatrix.industry_id == industry_id)
    if persona_id:
        query = query.filter(MarketingMatrix.persona_id == persona_id)
    entries = query.all()

    output = io.StringIO()
    # Add UTF-8 BOM for Excel
    output.write('\ufeff')
    writer = csv.writer(output)
    # Header
    writer.writerow([
        "ID", "Industry", "Persona", "Subject", "Intro", "Social Proof", "Last Updated"
    ])
    for e in entries:
        writer.writerow([
            e.id,
            e.industry.name if e.industry else "Unknown",
            e.persona.name if e.persona else "Unknown",
            e.subject,
            e.intro,
            e.social_proof,
            e.updated_at.strftime('%Y-%m-%d %H:%M:%S') if e.updated_at else "-"
        ])
    output.seek(0)

    filename = f"marketing_matrix_{datetime.utcnow().strftime('%Y-%m-%d')}.csv"
    return StreamingResponse(
        output,
        media_type="text/csv",
        # BUG FIX: the computed `filename` was never interpolated into the
        # header, so downloads got a broken literal filename.
        headers={"Content-Disposition": f"attachment; filename={filename}"}
    )
@app.put("/api/matrix/{entry_id}", response_model=MarketingMatrixResponse)
def update_matrix_entry(
    entry_id: int,
    data: MarketingMatrixUpdate,
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """Apply a partial update to one marketing-matrix entry and return the result."""
    entry = db.query(MarketingMatrix).options(
        joinedload(MarketingMatrix.industry),
        joinedload(MarketingMatrix.persona)
    ).filter(MarketingMatrix.id == entry_id).first()
    if entry is None:
        raise HTTPException(status_code=404, detail="Matrix entry not found")

    # Only overwrite fields the client actually sent (None means "keep").
    for field_name in ("subject", "intro", "social_proof"):
        new_value = getattr(data, field_name)
        if new_value is not None:
            setattr(entry, field_name, new_value)
    entry.updated_at = datetime.utcnow()

    db.commit()
    db.refresh(entry)

    return MarketingMatrixResponse(
        id=entry.id,
        industry_id=entry.industry_id,
        persona_id=entry.persona_id,
        industry_name=entry.industry.name if entry.industry else "Unknown",
        persona_name=entry.persona.name if entry.persona else "Unknown",
        subject=entry.subject,
        intro=entry.intro,
        social_proof=entry.social_proof,
        updated_at=entry.updated_at,
    )
@app.get("/api/matrix/personas")
def list_personas(db: Session = Depends(get_db), username: str = Depends(authenticate_user)):
    """Return all Persona records."""
    personas = db.query(Persona)
    return personas.all()
class JobRolePatternCreate(BaseModel):
    """Payload for creating (or replacing the fields of) a JobRolePattern."""
    pattern_type: str  # 'exact' or 'regex'
    pattern_value: str  # the title text or regex to match
    role: str  # target role name
    priority: int = 100  # lower number means higher priority
class JobRolePatternResponse(BaseModel):
    """API representation of a stored JobRolePattern row."""
    id: int
    pattern_type: str  # 'exact' or 'regex'
    pattern_value: str
    role: str
    priority: int
    is_active: bool
    created_by: str  # provenance: e.g. 'system', 'user', 'llm_batch'
    created_at: datetime
    updated_at: datetime

    class Config:
        # Allow construction directly from SQLAlchemy ORM objects.
        from_attributes = True
class ClassificationResponse(BaseModel):
    """Result envelope for the batch-classification endpoint.

    The endpoint queues a background task, so it returns status "queued"
    with processed/new_patterns both 0.
    """
    status: str
    processed: int
    new_patterns: int
@app.post("/api/job_roles", response_model=JobRolePatternResponse)
def create_job_role(
    job_role: JobRolePatternCreate,
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """Persist a new JobRolePattern; provenance is recorded as 'user'."""
    new_pattern = JobRolePattern(
        pattern_type=job_role.pattern_type,
        pattern_value=job_role.pattern_value,
        role=job_role.role,
        priority=job_role.priority,
        created_by="user",
    )
    db.add(new_pattern)
    db.commit()
    db.refresh(new_pattern)
    return new_pattern
@app.put("/api/job_roles/{role_id}", response_model=JobRolePatternResponse)
def update_job_role(
    role_id: int,
    job_role: JobRolePatternCreate,
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """Replace the editable fields of an existing JobRolePattern."""
    existing = db.query(JobRolePattern).filter(JobRolePattern.id == role_id).first()
    if existing is None:
        raise HTTPException(status_code=404, detail="Job role not found")

    # Copy all client-editable fields from the payload onto the row.
    for attr in ("pattern_type", "pattern_value", "role", "priority"):
        setattr(existing, attr, getattr(job_role, attr))
    existing.updated_at = datetime.utcnow()

    db.commit()
    db.refresh(existing)
    return existing
@app.delete("/api/job_roles/{role_id}")
def delete_job_role(
    role_id: int,
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """Delete a JobRolePattern by id; 404 if it does not exist."""
    pattern = db.query(JobRolePattern).filter(JobRolePattern.id == role_id).first()
    if pattern is None:
        raise HTTPException(status_code=404, detail="Job role not found")
    db.delete(pattern)
    db.commit()
    return {"status": "deleted"}
@app.post("/api/job_roles/classify-batch", response_model=ClassificationResponse)
def classify_batch_job_roles(
    background_tasks: BackgroundTasks,
    username: str = Depends(authenticate_user)
):
    """
    Triggers a background task to classify all unmapped job titles from the inbox.

    Returns immediately with status "queued"; processed/new_patterns are 0
    because the real work happens asynchronously in
    run_batch_classification_task.
    """
    background_tasks.add_task(run_batch_classification_task)
    return {"status": "queued", "processed": 0, "new_patterns": 0}
@app.get("/api/job_roles/raw")
def list_raw_job_titles(
@@ -947,6 +1189,66 @@ def run_analysis_task(company_id: int):
finally:
db.close()
def run_batch_classification_task():
    """Background task: classify unmapped RawJobTitle rows via the LLM.

    Fetches every RawJobTitle with ``is_mapped == False``, sends them to
    Gemini in batches of BATCH_SIZE alongside the list of Persona names,
    and stores each confident classification as an 'exact' JobRolePattern
    (priority 90, created_by='llm_batch'). Each batch is committed on its
    own, so a failing batch does not lose earlier results.
    """
    from .database import SessionLocal
    from .lib.core_utils import call_gemini_flash
    import json

    db = SessionLocal()
    logger.info("--- [BACKGROUND TASK] Starting Batch Job Title Classification ---")
    BATCH_SIZE = 50
    try:
        # Valid target roles are exactly the Persona names.
        personas = db.query(Persona).all()
        available_roles = [p.name for p in personas]
        if not available_roles:
            logger.error("No Personas found. Aborting classification task.")
            return

        unmapped_titles = db.query(RawJobTitle).filter(RawJobTitle.is_mapped == False).all()
        if not unmapped_titles:
            logger.info("No unmapped titles to process.")
            return
        logger.info(f"Found {len(unmapped_titles)} unmapped titles. Processing in batches of {BATCH_SIZE}.")

        for i in range(0, len(unmapped_titles), BATCH_SIZE):
            batch = unmapped_titles[i:i + BATCH_SIZE]
            title_strings = [item.title for item in batch]
            prompt = f'''You are an expert in B2B contact segmentation. Classify the following job titles into one of the provided roles: {', '.join(available_roles)}. Respond ONLY with a valid JSON object mapping the title to the role. Use "Influencer" as a fallback. Titles: {json.dumps(title_strings)}'''

            response_text = ""
            try:
                response_text = call_gemini_flash(prompt, json_mode=True)
                # Strip optional markdown code fences around the JSON payload.
                # BUG FIX: the previous slice-based cleanup (strip()[7:-4])
                # assumed the exact shape "```json\n...\n```" and could
                # truncate valid JSON for other fence variants.
                cleaned = response_text.strip()
                if cleaned.startswith("```json"):
                    cleaned = cleaned[len("```json"):]
                elif cleaned.startswith("```"):
                    cleaned = cleaned[3:]
                if cleaned.endswith("```"):
                    cleaned = cleaned[:-3]
                classifications = json.loads(cleaned.strip())
            except Exception as e:
                logger.error(f"LLM response error for batch, skipping. Error: {e}. Response: {response_text}")
                continue

            new_patterns = 0
            for title_obj in batch:
                original_title = title_obj.title
                assigned_role = classifications.get(original_title)
                if assigned_role and assigned_role in available_roles:
                    # Create a pattern only once per distinct title text.
                    if not db.query(JobRolePattern).filter(JobRolePattern.pattern_value == original_title).first():
                        db.add(JobRolePattern(pattern_type='exact', pattern_value=original_title, role=assigned_role, priority=90, created_by='llm_batch'))
                        new_patterns += 1
                    title_obj.is_mapped = True
            # Commit per batch so earlier batches survive a later failure.
            db.commit()
            logger.info(f"Batch {i//BATCH_SIZE + 1} complete. Created {new_patterns} new patterns.")
    except Exception as e:
        logger.critical(f"--- [BACKGROUND TASK] CRITICAL ERROR during classification ---", exc_info=True)
        db.rollback()
    finally:
        db.close()
        logger.info("--- [BACKGROUND TASK] Finished Batch Job Title Classification ---")
# --- Serve Frontend ---
static_path = "/frontend_static"
if not os.path.exists(static_path):

View File

@@ -157,17 +157,24 @@ class Industry(Base):
created_at = Column(DateTime, default=datetime.utcnow)
class JobRolePattern(Base):
    """
    Maps job title patterns (regex or exact string) to internal Roles.
    """
    __tablename__ = "job_role_patterns"

    id = Column(Integer, primary_key=True, index=True)
    pattern_type = Column(String, default="exact", index=True)  # 'exact' or 'regex'
    pattern_value = Column(String, unique=True)  # e.g. "Technischer Leiter" or "(?i)leiter.*technik"
    role = Column(String, index=True)  # The target Role, maps to Persona.name
    priority = Column(Integer, default=100)  # Lower number means higher priority
    is_active = Column(Boolean, default=True)
    created_by = Column(String, default="system")  # 'system', 'user', 'llm'
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class RawJobTitle(Base):
"""
@@ -196,7 +203,7 @@ class Persona(Base):
__tablename__ = "personas"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, unique=True, index=True) # Matches the 'role' string in JobRoleMapping
name = Column(String, unique=True, index=True) # Matches the 'role' string in JobRolePattern
pains = Column(Text, nullable=True) # JSON list or multiline string
gains = Column(Text, nullable=True) # JSON list or multiline string

View File

@@ -5,14 +5,14 @@ import os
# Setup Environment
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))
from backend.database import SessionLocal, JobRoleMapping
from backend.database import SessionLocal, JobRolePattern
def check_mappings():
    """Print a quick sanity summary of the job_role_patterns table."""
    db = SessionLocal()
    count = db.query(JobRolePattern).count()
    print(f"Total JobRolePatterns: {count}")
    examples = db.query(JobRolePattern).limit(5).all()
    for ex in examples:
        # BUG FIX: JobRolePattern stores the match text in `pattern_value`;
        # the old `pattern` attribute no longer exists after the rename and
        # raised AttributeError here.
        print(f" - {ex.pattern_value} -> {ex.role}")

View File

@@ -0,0 +1,171 @@
import sys
import os
import argparse
import json
import logging
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime
from datetime import datetime
# --- Standalone Configuration ---
# Add the project root to the Python path to find the LLM utility
sys.path.insert(0, '/app')
from company_explorer.backend.lib.core_utils import call_gemini_flash
DATABASE_URL = "sqlite:////app/companies_v3_fixed_2.db"
LOG_FILE = "/app/Log_from_docker/batch_classifier.log"
BATCH_SIZE = 50 # Number of titles to process in one LLM call
# --- Logging Setup ---
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(LOG_FILE),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# --- SQLAlchemy Models (self-contained) ---
Base = declarative_base()
class RawJobTitle(Base):
    """Self-contained copy of the app's raw_job_titles table for this script."""
    __tablename__ = 'raw_job_titles'
    id = Column(Integer, primary_key=True)
    title = Column(String, unique=True, index=True)  # raw job title text
    count = Column(Integer, default=1)  # occurrence counter
    source = Column(String)  # where the title came from
    is_mapped = Column(Boolean, default=False)  # set True once classified
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
class JobRolePattern(Base):
    """Self-contained copy of the app's job_role_patterns table for this script."""
    __tablename__ = "job_role_patterns"
    id = Column(Integer, primary_key=True, index=True)
    pattern_type = Column(String, default="exact", index=True)  # 'exact' or 'regex'
    pattern_value = Column(String, unique=True)  # title text or regex to match
    role = Column(String, index=True)  # target role name
    priority = Column(Integer, default=100)  # lower number means higher priority
    is_active = Column(Boolean, default=True)
    created_by = Column(String, default="system")  # provenance marker
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class Persona(Base):
    """Self-contained copy of the app's personas table; names define the valid roles."""
    __tablename__ = "personas"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, unique=True, index=True)  # role name used for classification
    pains = Column(String)
    gains = Column(String)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# --- Database Connection ---
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def build_classification_prompt(titles_to_classify, available_roles):
    """Render the LLM prompt requesting a {title: role} JSON mapping."""
    roles_listing = ', '.join(available_roles)
    titles_json = json.dumps(titles_to_classify, indent=2)
    return f"""
You are an expert in B2B contact segmentation. Your task is to classify a list of job titles into predefined roles.
Analyze the following list of job titles and assign each one to the most appropriate role from the list provided.
The available roles are:
- {roles_listing}
RULES:
1. Respond ONLY with a valid JSON object. Do not include any text, explanations, or markdown code fences before or after the JSON.
2. The JSON object should have the original job title as the key and the assigned role as the value.
3. If a job title is ambiguous or you cannot confidently classify it, assign the value "Influencer". Use this as a fallback.
4. Do not invent new roles. Only use the roles from the provided list.
Here are the job titles to classify:
{titles_json}
Your JSON response:
"""
def classify_and_store_titles():
    """Classify all unmapped RawJobTitle rows via the LLM and persist results.

    Flow: load Persona names as the allowed roles, fetch unmapped titles,
    send them to Gemini in BATCH_SIZE chunks, then store each confident
    classification as an 'exact' JobRolePattern (priority 90,
    created_by='llm_batch') and flag the title as mapped. Commits happen
    per batch; a failed LLM call skips only that batch.
    """
    db = SessionLocal()
    try:
        # 1. Fetch available persona names (roles)
        personas = db.query(Persona).all()
        available_roles = [p.name for p in personas]
        if not available_roles:
            logger.error("No Personas/Roles found in the database. Cannot classify. Please seed personas first.")
            return
        logger.info(f"Classifying based on these roles: {available_roles}")
        # 2. Fetch unmapped titles
        unmapped_titles = db.query(RawJobTitle).filter(RawJobTitle.is_mapped == False).all()
        if not unmapped_titles:
            logger.info("No unmapped job titles found. Nothing to do.")
            return
        logger.info(f"Found {len(unmapped_titles)} unmapped job titles to process.")
        # 3. Process in batches
        for i in range(0, len(unmapped_titles), BATCH_SIZE):
            batch = unmapped_titles[i:i + BATCH_SIZE]
            title_strings = [item.title for item in batch]
            logger.info(f"Processing batch {i//BATCH_SIZE + 1} of { (len(unmapped_titles) + BATCH_SIZE - 1) // BATCH_SIZE } with {len(title_strings)} titles...")
            # 4. Call LLM
            prompt = build_classification_prompt(title_strings, available_roles)
            response_text = ""
            try:
                response_text = call_gemini_flash(prompt, json_mode=True)
                # Clean potential markdown fences
                # NOTE(review): the [7:-4] slice assumes the exact shape
                # "```json\n...\n```"; other fence variants would corrupt
                # the JSON — confirm against real model output.
                if response_text.strip().startswith("```json"):
                    response_text = response_text.strip()[7:-4]
                classifications = json.loads(response_text)
            except Exception as e:
                logger.error(f"Failed to get or parse LLM response for batch. Skipping. Error: {e}")
                logger.error(f"Raw response was: {response_text}")
                continue
            # 5. Process results
            new_patterns = 0
            for title_obj in batch:
                original_title = title_obj.title
                assigned_role = classifications.get(original_title)
                if assigned_role and assigned_role in available_roles:
                    # Only create a pattern once per distinct title text.
                    exists = db.query(JobRolePattern).filter(JobRolePattern.pattern_value == original_title).first()
                    if not exists:
                        new_pattern = JobRolePattern(
                            pattern_type='exact',
                            pattern_value=original_title,
                            role=assigned_role,
                            priority=90,
                            created_by='llm_batch'
                        )
                        db.add(new_pattern)
                        new_patterns += 1
                    title_obj.is_mapped = True
                else:
                    logger.warning(f"Could not classify '{original_title}' or role '{assigned_role}' is invalid. It will be re-processed later.")
            # Commit per batch so earlier batches survive a later failure.
            db.commit()
            logger.info(f"Batch {i//BATCH_SIZE + 1} complete. Created {new_patterns} new mapping patterns.")
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}", exc_info=True)
        db.rollback()
    finally:
        db.close()
if __name__ == "__main__":
    # No options yet; argparse still provides --help and usage errors.
    argparse.ArgumentParser(
        description="Batch classify unmapped job titles using an LLM."
    ).parse_args()
    logger.info("--- Starting Batch Classification Script ---")
    classify_and_store_titles()
    logger.info("--- Batch Classification Script Finished ---")

View File

@@ -1,95 +1,66 @@
import sys
import os
import csv
from collections import Counter
import argparse
from datetime import datetime
# Setup Environment
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))
# Add the 'backend' directory to the path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from backend.database import SessionLocal, RawJobTitle, init_db, engine, Base
from database import SessionLocal, RawJobTitle
from lib.logging_setup import setup_logging
import logging
def import_titles(file_path: str, delimiter: str = ';'):
print(f"🚀 Starting Import from {file_path}...")
# Ensure Table Exists
RawJobTitle.__table__.create(bind=engine, checkfirst=True)
setup_logging()
logger = logging.getLogger(__name__)
def import_job_titles_from_csv(file_path: str):
db = SessionLocal()
total_rows = 0
new_titles = 0
updated_titles = 0
try:
with open(file_path, 'r', encoding='utf-8-sig') as f: # utf-8-sig handles BOM from Excel
# Try to detect header
sample = f.read(1024)
has_header = csv.Sniffer().has_header(sample)
f.seek(0)
reader = csv.reader(f, delimiter=delimiter)
if has_header:
headers = next(reader)
print(f" Header detected: {headers}")
# Try to find the right column index
col_idx = 0
for i, h in enumerate(headers):
if h.lower() in ['funktion', 'jobtitle', 'title', 'position', 'rolle']:
col_idx = i
print(f" -> Using column '{h}' (Index {i})")
break
else:
col_idx = 0
print(" No header detected, using first column.")
logger.info(f"Starting import of job titles from {file_path}")
# Use Counter to get frequencies directly from the CSV
job_title_counts = Counter()
total_rows = 0
# Process Rows
with open(file_path, 'r', encoding='utf-8') as f:
reader = csv.reader(f)
# Assuming the CSV contains only job titles, one per row
for row in reader:
if not row: continue
if len(row) <= col_idx: continue
raw_title = row[col_idx].strip()
if not raw_title: continue # Skip empty
total_rows += 1
# Check existance
existing = db.query(RawJobTitle).filter(RawJobTitle.title == raw_title).first()
if existing:
existing.count += 1
existing.updated_at = datetime.utcnow()
updated_titles += 1
else:
db.add(RawJobTitle(title=raw_title, count=1))
new_titles += 1
if total_rows % 100 == 0:
db.commit()
print(f" Processed {total_rows} rows...", end='\r')
if row and row[0].strip():
title = row[0].strip()
job_title_counts[title] += 1
total_rows += 1
logger.info(f"Read {total_rows} total job title entries. Found {len(job_title_counts)} unique titles.")
added_count = 0
updated_count = 0
for title, count in job_title_counts.items():
existing_title = db.query(RawJobTitle).filter(RawJobTitle.title == title).first()
if existing_title:
if existing_title.count != count:
existing_title.count = count
updated_count += 1
# If it exists and count is the same, do nothing.
else:
new_title = RawJobTitle(title=title, count=count, source="csv_import", is_mapped=False)
db.add(new_title)
added_count += 1
db.commit()
logger.info(f"Import complete. Added {added_count} new unique titles, updated {updated_count} existing titles.")
db.commit()
except Exception as e:
print(f"\n❌ Error: {e}")
logger.error(f"Error during job title import: {e}", exc_info=True)
db.rollback()
finally:
db.close()
print(f"\n✅ Import Complete.")
print(f" Total Processed: {total_rows}")
print(f" New Unique Titles: {new_titles}")
print(f" Updated Frequencies: {updated_titles}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Import Job Titles from CSV")
parser.add_argument("file", help="Path to CSV file")
parser.add_argument("--delimiter", default=";", help="CSV Delimiter (default: ';')")
parser = argparse.ArgumentParser(description="Import job titles from a CSV file into the RawJobTitle database table.")
parser.add_argument("file_path", type=str, help="Path to the CSV file containing job titles.")
args = parser.parse_args()
if not os.path.exists(args.file):
print(f"❌ File not found: {args.file}")
sys.exit(1)
import_titles(args.file, args.delimiter)
import_job_titles_from_csv(args.file_path)

View File

@@ -4,7 +4,7 @@ import json
# Setup Environment to import backend modules
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))
from backend.database import SessionLocal, Persona, JobRoleMapping
from backend.database import SessionLocal, Persona, JobRolePattern
def seed_archetypes():
db = SessionLocal()
@@ -87,33 +87,41 @@ def seed_archetypes():
db.commit()
# --- 2. Update JobRoleMappings to map to Archetypes ---
# --- 2. Update JobRolePatterns to map to Archetypes ---
# We map the patterns to the new 4 Archetypes
mapping_updates = [
# Wirtschaftlicher Entscheider
{"role": "Wirtschaftlicher Entscheider", "patterns": ["%geschäftsführer%", "%ceo%", "%director%", "%einkauf%", "%procurement%", "%finance%", "%cfo%"]},
{"role": "Wirtschaftlicher Entscheider", "patterns": ["geschäftsführer", "ceo", "director", "einkauf", "procurement", "finance", "cfo"]},
# Operativer Entscheider
{"role": "Operativer Entscheider", "patterns": ["%housekeeping%", "%hausdame%", "%hauswirtschaft%", "%reinigung%", "%restaurant%", "%f&b%", "%werksleiter%", "%produktionsleiter%", "%lager%", "%logistik%", "%operations%", "%coo%"]},
{"role": "Operativer Entscheider", "patterns": ["housekeeping", "hausdame", "hauswirtschaft", "reinigung", "restaurant", "f&b", "werksleiter", "produktionsleiter", "lager", "logistik", "operations", "coo"]},
# Infrastruktur-Verantwortlicher
{"role": "Infrastruktur-Verantwortlicher", "patterns": ["%facility%", "%technik%", "%instandhaltung%", "%it-leiter%", "%cto%", "%admin%", "%building%"]},
{"role": "Infrastruktur-Verantwortlicher", "patterns": ["facility", "technik", "instandhaltung", "it-leiter", "cto", "admin", "building"]},
# Innovations-Treiber
{"role": "Innovations-Treiber", "patterns": ["%innovation%", "%digital%", "%transformation%", "%business dev%", "%marketing%"]}
{"role": "Innovations-Treiber", "patterns": ["innovation", "digital", "transformation", "business dev", "marketing"]}
]
# Clear old mappings to prevent confusion
db.query(JobRoleMapping).delete()
db.query(JobRolePattern).delete()
db.commit()
print("Cleared old JobRoleMappings.")
print("Cleared old JobRolePatterns.")
for group in mapping_updates:
role_name = group["role"]
for pattern in group["patterns"]:
print(f"Mapping '{pattern}' -> '{role_name}'")
db.add(JobRoleMapping(pattern=pattern, role=role_name))
for pattern_text in group["patterns"]:
print(f"Mapping '{pattern_text}' -> '{role_name}'")
# All seeded patterns are regex contains checks
new_pattern = JobRolePattern(
pattern_type='regex',
pattern_value=pattern_text, # Stored without wildcards
role=role_name,
priority=100, # Default priority for seeded patterns
created_by='system'
)
db.add(new_pattern)
db.commit()
print("Archetypes and Mappings Seeded Successfully.")

View File

@@ -5,15 +5,15 @@ import os
# Setup Environment
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))
from backend.database import SessionLocal, JobRoleMapping, Persona
from backend.database import SessionLocal, JobRolePattern, Persona
def test_mapping(job_title):
db = SessionLocal()
print(f"\n--- Testing Mapping for '{job_title}' ---")
# 1. Find Role Name via JobRoleMapping
# 1. Find Role Name via JobRolePattern
role_name = None
mappings = db.query(JobRoleMapping).all()
mappings = db.query(JobRolePattern).all()
for m in mappings:
pattern_clean = m.pattern.replace("%", "").lower()
if pattern_clean in job_title.lower():

View File

@@ -6,7 +6,7 @@ import os
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))
# Import everything to ensure metadata is populated
from backend.database import engine, Base, Company, Contact, Industry, JobRoleMapping, Persona, Signal, EnrichmentData, RoboticsCategory, ImportLog, ReportedMistake, MarketingMatrix
from backend.database import engine, Base, Company, Contact, Industry, JobRolePattern, Persona, Signal, EnrichmentData, RoboticsCategory, ImportLog, ReportedMistake, MarketingMatrix
def migrate():
print("Migrating Database Schema...")

View File

@@ -7,10 +7,10 @@ from typing import Optional, Dict, Any, List
from sqlalchemy.orm import Session, joinedload
from backend.database import Company, Industry, RoboticsCategory, EnrichmentData
from backend.lib.core_utils import call_gemini_flash, safe_eval_math, run_serp_search
from backend.services.scraping import scrape_website_content
from backend.lib.metric_parser import MetricParser
from ..database import Company, Industry, RoboticsCategory, EnrichmentData
from ..lib.core_utils import call_gemini_flash, safe_eval_math, run_serp_search
from .scraping import scrape_website_content
from ..lib.metric_parser import MetricParser
logger = logging.getLogger(__name__)

View File

@@ -0,0 +1,63 @@
import logging
import re
from sqlalchemy.orm import Session
from typing import Optional
from ..database import JobRolePattern, RawJobTitle, Persona, Contact
logger = logging.getLogger(__name__)
class RoleMappingService:
    """Resolves job titles to internal roles using JobRolePattern rules."""

    def __init__(self, db: Session):
        # Shares the caller's session; commits made here affect that session.
        self.db = db

    def get_role_for_job_title(self, job_title: str) -> Optional[str]:
        """
        Finds the corresponding role for a given job title using a multi-step process.
        1. Check for exact matches.
        2. Evaluate regex patterns.

        NOTE(review): exact patterns always win over regex patterns here,
        even when a regex row has a numerically lower (= higher) priority;
        priority only orders rows within the fetched list — confirm this
        is the intended precedence.
        """
        if not job_title:
            return None
        # Normalize job title for matching
        normalized_title = job_title.lower().strip()
        # 1. Fetch all active patterns from the database, ordered by priority
        patterns = self.db.query(JobRolePattern).filter(
            JobRolePattern.is_active == True
        ).order_by(JobRolePattern.priority.asc()).all()
        # 2. Separate patterns for easier processing
        exact_patterns = {p.pattern_value.lower(): p.role for p in patterns if p.pattern_type == 'exact'}
        regex_patterns = [(p.pattern_value, p.role) for p in patterns if p.pattern_type == 'regex']
        # 3. Check for exact match first (most efficient)
        if normalized_title in exact_patterns:
            return exact_patterns[normalized_title]
        # 4. Evaluate regex patterns
        for pattern, role in regex_patterns:
            try:
                if re.search(pattern, job_title, re.IGNORECASE):
                    return role
            except re.error as e:
                # Skip broken regexes rather than failing the whole lookup.
                logger.error(f"Invalid regex for role '{role}': {pattern}. Error: {e}")
                continue
        return None

    def add_or_update_unclassified_title(self, job_title: str):
        """
        Logs an unclassified job title or increments its count if already present.
        """
        if not job_title:
            return
        entry = self.db.query(RawJobTitle).filter(RawJobTitle.title == job_title).first()
        if entry:
            entry.count += 1
        else:
            entry = RawJobTitle(title=job_title, count=1)
            self.db.add(entry)
        self.db.commit()