[31188f42] einfügen

einfügen
This commit is contained in:
2026-02-24 06:47:35 +00:00
parent 45fef41a6a
commit 5603d42574
21 changed files with 1575 additions and 152 deletions

View File

@@ -32,11 +32,12 @@ setup_logging()
import logging
logger = logging.getLogger(__name__)
from .database import init_db, get_db, Company, Signal, EnrichmentData, RoboticsCategory, Contact, Industry, JobRoleMapping, ReportedMistake, MarketingMatrix, Persona, RawJobTitle
from .database import init_db, get_db, Company, Signal, EnrichmentData, RoboticsCategory, Contact, Industry, JobRolePattern, ReportedMistake, MarketingMatrix, Persona, RawJobTitle
from .services.deduplication import Deduplicator
from .services.discovery import DiscoveryService
from .services.scraping import ScraperService
from .services.classification import ClassificationService
from .services.role_mapping import RoleMappingService
# Initialize App
app = FastAPI(
@@ -119,6 +120,25 @@ class IndustryDetails(BaseModel):
class Config:
from_attributes = True
class MarketingMatrixUpdate(BaseModel):
    """Partial-update payload for one marketing-matrix cell.

    All fields are optional; a field left as None is not touched by the
    PUT /api/matrix/{entry_id} handler (None means "leave as-is", it does
    not clear the stored value).
    """
    subject: Optional[str] = None
    intro: Optional[str] = None
    social_proof: Optional[str] = None
class MarketingMatrixResponse(BaseModel):
    """API shape of a marketing-matrix cell (industry x persona).

    industry_name / persona_name are denormalized from the related rows by
    the endpoints; they fall back to "Unknown" when a relation is missing.
    """
    id: int
    industry_id: int
    persona_id: int
    industry_name: str
    persona_name: str
    subject: Optional[str] = None
    intro: Optional[str] = None
    social_proof: Optional[str] = None
    updated_at: datetime

    class Config:
        # Allow construction straight from SQLAlchemy ORM objects.
        from_attributes = True
class ContactResponse(BaseModel):
id: int
first_name: Optional[str] = None
@@ -314,23 +334,21 @@ def provision_superoffice_contact(
logger.info(f"Created new person {req.so_person_id} for company {company.name}")
# Update Job Title & Role logic
if req.job_title:
if req.job_title and req.job_title != person.job_title:
person.job_title = req.job_title
# Simple classification fallback
mappings = db.query(JobRoleMapping).all()
found_role = None
for m in mappings:
pattern_clean = m.pattern.replace("%", "").lower()
if pattern_clean in req.job_title.lower():
found_role = m.role
break
# New, service-based classification
role_mapping_service = RoleMappingService(db)
found_role = role_mapping_service.get_role_for_job_title(req.job_title)
# ALWAYS update role, even if to None, to avoid 'sticking' old roles
if found_role != person.role:
logger.info(f"Role Change for {person.so_person_id}: {person.role} -> {found_role}")
logger.info(f"Role Change for {person.so_person_id} via Mapping Service: {person.role} -> {found_role}")
person.role = found_role
if not found_role:
# If no role was found, we log it for future pattern mining
role_mapping_service.add_or_update_unclassified_title(req.job_title)
db.commit()
db.refresh(person)
@@ -429,6 +447,8 @@ def export_companies_csv(db: Session = Depends(get_db), username: str = Depends(
from fastapi.responses import StreamingResponse
output = io.StringIO()
# Add UTF-8 BOM for Excel
output.write('\ufeff')
writer = csv.writer(output)
# Header
@@ -567,7 +587,229 @@ def list_industries(db: Session = Depends(get_db), username: str = Depends(authe
@app.get("/api/job_roles")
def list_job_roles(db: Session = Depends(get_db), username: str = Depends(authenticate_user)):
    """Return all job-role patterns, ordered by ascending priority.

    BUG FIX: the superseded JobRoleMapping query was left in place above the
    new JobRolePattern query, so the new query was unreachable dead code.
    Only the current JobRolePattern-based listing is kept.
    """
    return db.query(JobRolePattern).order_by(JobRolePattern.priority.asc()).all()
# --- Marketing Matrix Endpoints ---
@app.get("/api/matrix", response_model=List[MarketingMatrixResponse])
def get_marketing_matrix(
    industry_id: Optional[int] = Query(None),
    persona_id: Optional[int] = Query(None),
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """List marketing-matrix entries, optionally narrowed to one industry and/or persona."""
    # Eager-load both relationships so the mapping loop below does not fire
    # one lazy load per row.
    q = db.query(MarketingMatrix).options(
        joinedload(MarketingMatrix.industry),
        joinedload(MarketingMatrix.persona),
    )
    if industry_id:
        q = q.filter(MarketingMatrix.industry_id == industry_id)
    if persona_id:
        q = q.filter(MarketingMatrix.persona_id == persona_id)

    # Translate ORM rows into the response model, falling back to "Unknown"
    # when a relation is missing.
    results = []
    for row in q.all():
        results.append(
            MarketingMatrixResponse(
                id=row.id,
                industry_id=row.industry_id,
                persona_id=row.persona_id,
                industry_name=row.industry.name if row.industry else "Unknown",
                persona_name=row.persona.name if row.persona else "Unknown",
                subject=row.subject,
                intro=row.intro,
                social_proof=row.social_proof,
                updated_at=row.updated_at,
            )
        )
    return results
@app.get("/api/matrix/export")
def export_matrix_csv(
    industry_id: Optional[int] = Query(None),
    persona_id: Optional[int] = Query(None),
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """
    Exports a CSV of the marketing matrix, optionally filtered.

    Streams a UTF-8 (BOM-prefixed, for Excel) CSV with one row per matrix
    entry; the download filename carries the current UTC date.
    """
    import io
    import csv
    from fastapi.responses import StreamingResponse
    query = db.query(MarketingMatrix).options(
        joinedload(MarketingMatrix.industry),
        joinedload(MarketingMatrix.persona)
    )
    if industry_id:
        query = query.filter(MarketingMatrix.industry_id == industry_id)
    if persona_id:
        query = query.filter(MarketingMatrix.persona_id == persona_id)
    entries = query.all()
    output = io.StringIO()
    # Add UTF-8 BOM so Excel detects the encoding
    output.write('\ufeff')
    writer = csv.writer(output)
    # Header
    writer.writerow([
        "ID", "Industry", "Persona", "Subject", "Intro", "Social Proof", "Last Updated"
    ])
    for e in entries:
        writer.writerow([
            e.id,
            e.industry.name if e.industry else "Unknown",
            e.persona.name if e.persona else "Unknown",
            e.subject,
            e.intro,
            e.social_proof,
            e.updated_at.strftime('%Y-%m-%d %H:%M:%S') if e.updated_at else "-"
        ])
    output.seek(0)
    filename = f"marketing_matrix_{datetime.utcnow().strftime('%Y-%m-%d')}.csv"
    # BUG FIX: the Content-Disposition header previously contained a literal
    # placeholder instead of the generated name; interpolate `filename` so
    # browsers save the file under the dated name built above.
    return StreamingResponse(
        output,
        media_type="text/csv",
        headers={"Content-Disposition": f"attachment; filename={filename}"}
    )
@app.put("/api/matrix/{entry_id}", response_model=MarketingMatrixResponse)
def update_matrix_entry(
    entry_id: int,
    data: MarketingMatrixUpdate,
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """Partially update one marketing-matrix cell and return the refreshed row."""
    entry = (
        db.query(MarketingMatrix)
        .options(
            joinedload(MarketingMatrix.industry),
            joinedload(MarketingMatrix.persona),
        )
        .filter(MarketingMatrix.id == entry_id)
        .first()
    )
    if entry is None:
        raise HTTPException(status_code=404, detail="Matrix entry not found")

    # Only overwrite fields the client actually supplied; None means
    # "leave as-is", not "clear".
    for field in ("subject", "intro", "social_proof"):
        new_value = getattr(data, field)
        if new_value is not None:
            setattr(entry, field, new_value)
    entry.updated_at = datetime.utcnow()

    db.commit()
    db.refresh(entry)

    return MarketingMatrixResponse(
        id=entry.id,
        industry_id=entry.industry_id,
        persona_id=entry.persona_id,
        industry_name=entry.industry.name if entry.industry else "Unknown",
        persona_name=entry.persona.name if entry.persona else "Unknown",
        subject=entry.subject,
        intro=entry.intro,
        social_proof=entry.social_proof,
        updated_at=entry.updated_at,
    )
@app.get("/api/matrix/personas")
def list_personas(db: Session = Depends(get_db), username: str = Depends(authenticate_user)):
    """Return every persona row, unfiltered."""
    personas = db.query(Persona)
    return personas.all()
class JobRolePatternCreate(BaseModel):
    """Payload for creating or fully replacing a job-role pattern.

    priority defaults to 100; /api/job_roles lists patterns by ascending
    priority, so lower numbers come first.
    """
    pattern_type: str
    pattern_value: str
    role: str
    priority: int = 100
class JobRolePatternResponse(BaseModel):
    """API shape of a stored JobRolePattern row.

    created_by records the pattern's origin ("user" for manual entries,
    "llm_batch" for patterns produced by the batch classification task).
    """
    id: int
    pattern_type: str
    pattern_value: str
    role: str
    priority: int
    is_active: bool
    created_by: str
    created_at: datetime
    updated_at: datetime

    class Config:
        # Allow construction straight from SQLAlchemy ORM objects.
        from_attributes = True
class ClassificationResponse(BaseModel):
    """Result envelope for the batch-classification trigger endpoint."""
    status: str
    # Counts are placeholders (0) when the work is queued to a background task.
    processed: int
    new_patterns: int
@app.post("/api/job_roles", response_model=JobRolePatternResponse)
def create_job_role(
    job_role: JobRolePatternCreate,
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """Persist a new job-role pattern submitted through the API."""
    # Manually entered patterns are tagged "user" to distinguish them from
    # the "llm_batch" patterns the background task creates.
    new_pattern = JobRolePattern(
        created_by="user",
        pattern_type=job_role.pattern_type,
        pattern_value=job_role.pattern_value,
        role=job_role.role,
        priority=job_role.priority,
    )
    db.add(new_pattern)
    db.commit()
    db.refresh(new_pattern)
    return new_pattern
@app.put("/api/job_roles/{role_id}", response_model=JobRolePatternResponse)
def update_job_role(
    role_id: int,
    job_role: JobRolePatternCreate,
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """Overwrite an existing job-role pattern with the submitted values."""
    existing = db.query(JobRolePattern).filter(JobRolePattern.id == role_id).first()
    if existing is None:
        raise HTTPException(status_code=404, detail="Job role not found")
    # Copy every editable field from the payload onto the ORM row.
    for attr in ("pattern_type", "pattern_value", "role", "priority"):
        setattr(existing, attr, getattr(job_role, attr))
    existing.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(existing)
    return existing
@app.delete("/api/job_roles/{role_id}")
def delete_job_role(
    role_id: int,
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """Delete one job-role pattern; responds 404 when the id is unknown."""
    target = db.query(JobRolePattern).filter(JobRolePattern.id == role_id).first()
    if target is None:
        raise HTTPException(status_code=404, detail="Job role not found")
    db.delete(target)
    db.commit()
    return {"status": "deleted"}
@app.post("/api/job_roles/classify-batch", response_model=ClassificationResponse)
def classify_batch_job_roles(
    background_tasks: BackgroundTasks,
    username: str = Depends(authenticate_user)
):
    """
    Triggers a background task to classify all unmapped job titles from the inbox.

    Returns immediately with status "queued"; processed/new_patterns are 0
    placeholders because the actual counts are only known once
    run_batch_classification_task finishes (see its log output).
    """
    background_tasks.add_task(run_batch_classification_task)
    return {"status": "queued", "processed": 0, "new_patterns": 0}
@app.get("/api/job_roles/raw")
def list_raw_job_titles(
@@ -947,6 +1189,66 @@ def run_analysis_task(company_id: int):
finally:
db.close()
def run_batch_classification_task():
    """
    Background task: classify all unmapped raw job titles into personas via the LLM.

    Loads every RawJobTitle with is_mapped == False, sends the titles to
    Gemini in batches of BATCH_SIZE, and stores each accepted classification
    as an 'exact' JobRolePattern (priority 90, created_by='llm_batch').
    Commits after each batch so earlier progress survives a later failure;
    a bad LLM response skips only that batch.
    """
    from .database import SessionLocal
    from .lib.core_utils import call_gemini_flash
    import json
    db = SessionLocal()
    logger.info("--- [BACKGROUND TASK] Starting Batch Job Title Classification ---")
    BATCH_SIZE = 50
    try:
        personas = db.query(Persona).all()
        available_roles = [p.name for p in personas]
        if not available_roles:
            logger.error("No Personas found. Aborting classification task.")
            return
        unmapped_titles = db.query(RawJobTitle).filter(RawJobTitle.is_mapped == False).all()
        if not unmapped_titles:
            logger.info("No unmapped titles to process.")
            return
        logger.info(f"Found {len(unmapped_titles)} unmapped titles. Processing in batches of {BATCH_SIZE}.")
        for i in range(0, len(unmapped_titles), BATCH_SIZE):
            batch = unmapped_titles[i:i + BATCH_SIZE]
            title_strings = [item.title for item in batch]
            prompt = f'''You are an expert in B2B contact segmentation. Classify the following job titles into one of the provided roles: {', '.join(available_roles)}. Respond ONLY with a valid JSON object mapping the title to the role. Use "Influencer" as a fallback. Titles: {json.dumps(title_strings)}'''
            response_text = ""
            try:
                response_text = call_gemini_flash(prompt, json_mode=True)
                # BUG FIX: the previous fixed-offset slice ([7:-4]) corrupted
                # the payload whenever the closing ``` was not preceded by
                # exactly one newline, or the opening fence was plain ```.
                # Strip Markdown code fences structurally instead.
                cleaned = response_text.strip()
                if cleaned.startswith("```"):
                    # Drop the whole opening fence line (``` or ```json).
                    cleaned = cleaned.split("\n", 1)[1] if "\n" in cleaned else ""
                    cleaned = cleaned.rstrip()
                    if cleaned.endswith("```"):
                        cleaned = cleaned[:-3]
                classifications = json.loads(cleaned)
            except Exception as e:
                # Best effort: log and move on to the next batch.
                logger.error(f"LLM response error for batch, skipping. Error: {e}. Response: {response_text}")
                continue
            new_patterns = 0
            for title_obj in batch:
                original_title = title_obj.title
                assigned_role = classifications.get(original_title)
                # Only accept roles from the known persona list; anything
                # else leaves the title unmapped for a later run.
                if assigned_role and assigned_role in available_roles:
                    # Avoid duplicate patterns for the same literal title.
                    if not db.query(JobRolePattern).filter(JobRolePattern.pattern_value == original_title).first():
                        db.add(JobRolePattern(pattern_type='exact', pattern_value=original_title, role=assigned_role, priority=90, created_by='llm_batch'))
                        new_patterns += 1
                    title_obj.is_mapped = True
            db.commit()
            logger.info(f"Batch {i//BATCH_SIZE + 1} complete. Created {new_patterns} new patterns.")
    except Exception as e:
        logger.critical(f"--- [BACKGROUND TASK] CRITICAL ERROR during classification ---", exc_info=True)
        db.rollback()
    finally:
        db.close()
    logger.info("--- [BACKGROUND TASK] Finished Batch Job Title Classification ---")
# --- Serve Frontend ---
static_path = "/frontend_static"
if not os.path.exists(static_path):