Docs: Aktualisierung der Dokumentation für Task [2ea88f42]

This commit is contained in:
2026-03-04 15:14:11 +00:00
parent 6b89c68edc
commit fdca0e5f54
6 changed files with 858 additions and 17 deletions

View File

@@ -1,4 +1,4 @@
from fastapi import FastAPI, Depends, HTTPException, Query, BackgroundTasks
from fastapi import FastAPI, Depends, HTTPException, Query, BackgroundTasks, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
@@ -9,6 +9,9 @@ from datetime import datetime
import os
import sys
import uuid
import shutil
import re
from collections import Counter
from fastapi.security import HTTPBasic, HTTPBasicCredentials
import secrets
@@ -39,6 +42,7 @@ from .services.discovery import DiscoveryService
from .services.scraping import ScraperService
from .services.classification import ClassificationService
from .services.role_mapping import RoleMappingService
from .services.optimization import PatternOptimizationService
# Initialize App
app = FastAPI(
@@ -60,6 +64,14 @@ scraper = ScraperService()
classifier = ClassificationService() # Now works without args
discovery = DiscoveryService()
# Global State for Long-Running Optimization Task
# NOTE(review): module-level dict, i.e. per-process state — with multiple
# workers the status would not be shared; confirm deployment uses one worker.
optimization_status = {
    "state": "idle",  # idle, processing, completed, error
    "progress": 0,  # reserved for progress reporting; not updated by the task in this file
    "result": None,  # list of proposals once state == "completed"
    "error": None  # error message string when state == "error"
}
# --- Pydantic Models ---
class CompanyCreate(BaseModel):
name: str
@@ -898,6 +910,96 @@ class ClassificationResponse(BaseModel):
processed: int
new_patterns: int
class OptimizationProposal(BaseModel):
    """Response schema for one consolidated regex proposal from the optimizer."""
    # Role the proposed regex classifies titles into.
    target_role: str
    # The proposed regex pattern (raw pattern string).
    regex: str
    # AI-supplied rationale for the proposal.
    explanation: str
    # Suggested matching priority for the new pattern.
    priority: int
    # IDs of the exact patterns this regex covers (deletion candidates).
    covered_pattern_ids: List[int]
    # Titles locally verified to match the regex.
    covered_titles: List[str]
    # Negative-example titles the regex matched (empty for accepted proposals).
    false_positives: List[str]
class ApplyOptimizationRequest(BaseModel):
    """Request schema for applying an optimization proposal."""
    # Role assigned to the newly created regex pattern.
    target_role: str
    # Regex pattern to store (saved with pattern_type="regex").
    regex: str
    # Priority for the new pattern.
    priority: int
    # IDs of the obsolete exact patterns to delete.
    ids_to_delete: List[int]
def run_optimization_task():
    """Run the pattern optimization synchronously and record the outcome.

    Executed as a FastAPI background task. Resets then updates the
    module-level ``optimization_status`` dict: stores the proposal list in
    ``result`` on success, or the error message in ``error`` on failure.
    """
    global optimization_status
    # Reset the shared status before doing any work.
    optimization_status.update(state="processing", result=None, error=None)
    from .database import SessionLocal
    session = SessionLocal()
    try:
        service = PatternOptimizationService(session)
        optimization_status["result"] = service.generate_proposals()
        optimization_status["state"] = "completed"
    except Exception as exc:
        logger.error(f"Optimization task failed: {exc}", exc_info=True)
        optimization_status["state"] = "error"
        optimization_status["error"] = str(exc)
    finally:
        # Always release the DB session, even on failure.
        session.close()
@app.post("/api/job_roles/optimize-start")
def start_pattern_optimization(
background_tasks: BackgroundTasks,
username: str = Depends(authenticate_user)
):
"""
Starts the optimization analysis in the background.
"""
global optimization_status
if optimization_status["state"] == "processing":
return {"status": "already_running"}
background_tasks.add_task(run_optimization_task)
return {"status": "started"}
@app.get("/api/job_roles/optimize-status")
def get_pattern_optimization_status(
username: str = Depends(authenticate_user)
):
"""
Poll this endpoint to get the result of the optimization.
"""
return optimization_status
@app.post("/api/job_roles/apply-optimization")
def apply_pattern_optimization(
req: ApplyOptimizationRequest,
db: Session = Depends(get_db),
username: str = Depends(authenticate_user)
):
"""
Applies a proposal: Creates the new regex and deletes the obsolete exact patterns.
"""
# 1. Create new Regex Pattern
# Check duplicate first
existing = db.query(JobRolePattern).filter(JobRolePattern.pattern_value == req.regex).first()
if not existing:
new_pattern = JobRolePattern(
pattern_type="regex",
pattern_value=req.regex,
role=req.target_role,
priority=req.priority,
created_by="optimizer"
)
db.add(new_pattern)
logger.info(f"Optimization: Created new regex {req.regex} for {req.target_role}")
# 2. Delete covered Exact Patterns
if req.ids_to_delete:
db.query(JobRolePattern).filter(JobRolePattern.id.in_(req.ids_to_delete)).delete(synchronize_session=False)
logger.info(f"Optimization: Deleted {len(req.ids_to_delete)} obsolete patterns.")
db.commit()
return {"status": "success", "message": f"Created regex and removed {len(req.ids_to_delete)} old patterns."}
@app.post("/api/job_roles", response_model=JobRolePatternResponse)
def create_job_role(
job_role: JobRolePatternCreate,
@@ -977,6 +1079,34 @@ def list_raw_job_titles(
return query.order_by(RawJobTitle.count.desc()).limit(limit).all()
@app.get("/api/job_roles/suggestions")
def get_job_role_suggestions(db: Session = Depends(get_db), username: str = Depends(authenticate_user)):
"""
Analyzes existing contacts to suggest regex patterns based on frequent keywords per role.
"""
contacts = db.query(Contact).filter(Contact.role != None, Contact.job_title != None).all()
role_groups = {}
for c in contacts:
if c.role not in role_groups:
role_groups[c.role] = []
role_groups[c.role].append(c.job_title)
suggestions = {}
for role, titles in role_groups.items():
all_tokens = []
for t in titles:
# Simple cleaning: keep alphanum, lower
cleaned = re.sub(r'[^\w\s]', ' ', t).lower()
tokens = [w for w in cleaned.split() if len(w) > 3] # Ignore short words
all_tokens.extend(tokens)
common = Counter(all_tokens).most_common(10)
suggestions[role] = [{"word": w, "count": c} for w, c in common]
return suggestions
@app.get("/api/mistakes")
def list_reported_mistakes(
status: Optional[str] = Query(None),
@@ -1024,6 +1154,87 @@ def update_reported_mistake_status(
logger.info(f"Updated status for mistake {mistake_id} to {mistake.status}")
return {"status": "success", "mistake": mistake}
# --- Database Management ---
@app.get("/api/admin/database/download")
def download_database(username: str = Depends(authenticate_user)):
"""
Downloads the current SQLite database file.
"""
db_path = "/app/companies_v3_fixed_2.db"
if not os.path.exists(db_path):
raise HTTPException(status_code=404, detail="Database file not found")
filename = f"companies_backup_{datetime.utcnow().strftime('%Y-%m-%d_%H-%M')}.db"
return FileResponse(db_path, media_type="application/octet-stream", filename=filename)
@app.post("/api/admin/database/upload")
async def upload_database(
file: UploadFile = File(...),
username: str = Depends(authenticate_user)
):
"""
Uploads and replaces the SQLite database file. Creating a backup first.
"""
db_path = "/app/companies_v3_fixed_2.db"
backup_path = f"{db_path}.bak.{datetime.utcnow().strftime('%Y-%m-%d_%H-%M-%S')}"
try:
# Create Backup
if os.path.exists(db_path):
shutil.copy2(db_path, backup_path)
logger.info(f"Created database backup at {backup_path}")
# Save new file
with open(db_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
logger.info(f"Database replaced via upload by user {username}")
return {"status": "success", "message": "Database uploaded successfully. Please restart the container to apply changes."}
except Exception as e:
logger.error(f"Database upload failed: {e}", exc_info=True)
# Try to restore backup if something went wrong during write
if os.path.exists(backup_path):
shutil.copy2(backup_path, db_path)
logger.warning("Restored database from backup due to upload failure.")
raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}")
# --- Regex Testing ---
class RegexTestRequest(BaseModel):
    """Request schema for ad-hoc testing of a pattern against a sample title."""
    # The pattern text to test.
    pattern: str
    pattern_type: str = "regex"  # regex, exact, startswith
    # Sample job title the pattern is matched against.
    test_string: str
@app.post("/api/job_roles/test-pattern")
def test_job_role_pattern(req: RegexTestRequest, username: str = Depends(authenticate_user)):
"""
Tests if a given pattern matches a test string.
"""
try:
is_match = False
normalized_test = req.test_string.lower().strip()
pattern = req.pattern.lower().strip()
if req.pattern_type == "regex":
if re.search(pattern, normalized_test, re.IGNORECASE):
is_match = True
elif req.pattern_type == "exact":
if pattern == normalized_test:
is_match = True
elif req.pattern_type == "startswith":
if normalized_test.startswith(pattern):
is_match = True
return {"match": is_match}
except re.error as e:
return {"match": False, "error": f"Invalid Regex: {str(e)}"}
except Exception as e:
logger.error(f"Pattern test error: {e}")
return {"match": False, "error": str(e)}
@app.post("/api/enrich/discover")
def discover_company(req: AnalysisRequest, background_tasks: BackgroundTasks, db: Session = Depends(get_db), username: str = Depends(authenticate_user)):
company = db.query(Company).filter(Company.id == req.company_id).first()

View File

@@ -0,0 +1,82 @@
import sys
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from collections import Counter
import re
# Add backend to path to import models
sys.path.append(os.path.join(os.path.dirname(__file__), "../.."))
from backend.config import settings
from backend.database import Contact, JobRolePattern
def clean_text(text):
    """Normalize *text* for tokenization.

    Every character that is neither alphanumeric nor whitespace is replaced
    by a space, then the result is lowercased and stripped. Falsy input
    (None or empty string) yields "".
    """
    if not text:
        return ""
    return re.sub(r'[^\w\s]', ' ', text).lower().strip()
def get_ngrams(tokens, n):
    """Return the space-joined n-grams of *tokens*, in original order.

    Produces an empty list when there are fewer than n tokens.
    """
    if len(tokens) < n:
        return []
    grams = []
    for start in range(len(tokens) - n + 1):
        window = tokens[start:start + n]
        grams.append(" ".join(window))
    return grams
def analyze_patterns():
    """Print a keyword/bigram frequency report for classified contacts.

    Connects to the configured database, groups contact job titles by their
    assigned role, and prints for each role the most common single words,
    word pairs, and a naive alternation-style regex suggestion. Read-only:
    writes nothing to the database, only to stdout.
    """
    print(f"Connecting to database: {settings.DATABASE_URL}")
    engine = create_engine(settings.DATABASE_URL)
    Session = sessionmaker(bind=engine)
    session = Session()
    try:
        # Fetch all contacts with a role
        contacts = session.query(Contact).filter(Contact.role != None, Contact.job_title != None).all()
        print(f"Found {len(contacts)} classified contacts to analyze.")
        # Group job titles under their assigned role.
        role_groups = {}
        for c in contacts:
            if c.role not in role_groups:
                role_groups[c.role] = []
            role_groups[c.role].append(c.job_title)
        print("\n" + "="*60)
        print(" JOB TITLE PATTERN ANALYSIS REPORT")
        print("="*60 + "\n")
        for role, titles in role_groups.items():
            print(f"--- ROLE: {role} ({len(titles)} samples) ---")
            # Tokenize all titles
            all_tokens = []
            all_bigrams = []
            for t in titles:
                cleaned = clean_text(t)
                tokens = [w for w in cleaned.split() if len(w) > 2] # Ignore short words
                all_tokens.extend(tokens)
                all_bigrams.extend(get_ngrams(tokens, 2))
            # Analyze frequencies
            common_words = Counter(all_tokens).most_common(15)
            common_bigrams = Counter(all_bigrams).most_common(10)
            print("Top Keywords:")
            for word, count in common_words:
                print(f" - {word}: {count}")
            print("\nTop Bigrams (Word Pairs):")
            for bg, count in common_bigrams:
                print(f" - \"{bg}\": {count}")
            print("\nSuggested Regex Components:")
            # Naive suggestion: alternation of the five most frequent keywords.
            top_5_words = [w[0] for w in common_words[:5]]
            print(f" ({ '|'.join(top_5_words) })")
            print("\n" + "-"*30 + "\n")
    except Exception as e:
        # Broad catch is acceptable here: standalone CLI report, just surface it.
        print(f"Error: {e}")
    finally:
        session.close()
# Allow running this analysis as a standalone script.
if __name__ == "__main__":
    analyze_patterns()

View File

@@ -0,0 +1,157 @@
from sqlalchemy.orm import Session
from ..database import JobRolePattern, Persona
from ..lib.core_utils import call_gemini_flash
import json
import logging
import re
import ast
logger = logging.getLogger(__name__)
class PatternOptimizationService:
    """Consolidates many 'exact' job-title patterns into fewer regex patterns.

    Proposals are generated by an LLM and then verified locally: a regex is
    only proposed when it covers at least two existing exact patterns and
    matches none of the other roles' titles (zero false positives).
    """

    def __init__(self, db: Session):
        # SQLAlchemy session used for all pattern queries (read-only here).
        self.db = db

    def generate_proposals(self):
        """
        Analyzes existing EXACT patterns and proposes consolidated REGEX patterns.

        Returns a list of proposal dicts with keys: target_role, regex,
        explanation, priority, covered_pattern_ids, covered_titles,
        false_positives. Returns [] when there are no exact patterns.
        """
        # 1. Fetch Data — only 'exact' patterns are consolidation candidates.
        patterns = self.db.query(JobRolePattern).filter(JobRolePattern.pattern_type == "exact").all()
        # Group by Role
        roles_data = {}
        pattern_map = {}  # pattern text -> DB id, to map coverage back to rows
        for p in patterns:
            if p.role not in roles_data:
                roles_data[p.role] = []
            roles_data[p.role].append(p.pattern_value)
            pattern_map[p.pattern_value] = p.id
        if not roles_data:
            return []
        proposals = []
        # 2. Analyze each role
        for target_role in roles_data.keys():
            target_titles = roles_data[target_role]
            if len(target_titles) < 3:
                # Too few samples to generalize a regex from.
                continue
            # Titles of every OTHER role act as negative examples,
            # capped at 50 per role to keep the prompt bounded.
            negative_examples = []
            for other_role, titles in roles_data.items():
                if other_role != target_role:
                    negative_examples.extend(titles[:50])
            # 3. Build Prompt
            prompt = f"""
Act as a Regex Optimization Engine for B2B Job Titles.
GOAL: Break down the list of 'Positive Examples' into logical CLUSTERS and create a Regex for each cluster.
TARGET ROLE: "{target_role}"
TITLES TO COVER (Positive Examples):
{json.dumps(target_titles)}
TITLES TO AVOID (Negative Examples - DO NOT MATCH THESE):
{json.dumps(negative_examples[:150])}
INSTRUCTIONS:
1. Analyze the 'Positive Examples'. Do NOT try to create one single regex for all of them.
2. Identify distinct semantic groups.
3. Create a Regex for EACH group.
4. CRITICAL - CONFLICT HANDLING:
- The Regex must NOT match the 'Negative Examples'.
- Use Negative Lookahead (e.g. ^(?=.*Manager)(?!.*Facility).*) if needed.
5. Aggressiveness: Be bold.
OUTPUT FORMAT:
Return a valid Python List of Dictionaries.
Example:
[
{{
"regex": r"(?i).*pattern.*",
"explanation": "Explanation...",
"suggested_priority": 50
}}
]
Enclose regex patterns in r"..." strings to handle backslashes correctly.
"""
            try:
                logger.info(f"Optimizing patterns for role: {target_role} (Positive: {len(target_titles)})")
                response = call_gemini_flash(prompt) # Removed json_mode=True to allow Python syntax
                # Cleanup markdown fences by fixed offsets.
                # NOTE(review): assumes the response ends with a closing ``` — confirm
                # against responses that lack a trailing fence.
                clean_text = response.strip()
                if clean_text.startswith("```python"):
                    clean_text = clean_text[9:-3]
                elif clean_text.startswith("```json"):
                    clean_text = clean_text[7:-3]
                elif clean_text.startswith("```"):
                    clean_text = clean_text[3:-3]
                clean_text = clean_text.strip()
                ai_suggestions = []
                try:
                    # First try standard JSON
                    ai_suggestions = json.loads(clean_text)
                except json.JSONDecodeError:
                    try:
                        # Fallback: Python AST Literal Eval (handles r"..." strings)
                        ai_suggestions = ast.literal_eval(clean_text)
                    except Exception as e:
                        logger.error(f"Failed to parse response for {target_role} with JSON and AST. Error: {e}")
                        continue
                # Verify and map back IDs
                for sugg in ai_suggestions:
                    try:
                        regex_str = sugg.get('regex')
                        if not regex_str: continue
                        # Python AST already handles r"..." decoding, so regex_str is the raw pattern
                        regex = re.compile(regex_str)
                        # Calculate coverage locally
                        covered_ids = []
                        covered_titles_verified = []
                        for t in target_titles:
                            if regex.search(t):
                                if t in pattern_map:
                                    covered_ids.append(pattern_map[t])
                                    covered_titles_verified.append(t)
                        # Calculate False Positives
                        false_positives = []
                        for t in negative_examples:
                            if regex.search(t):
                                false_positives.append(t)
                        # Accept only regexes that replace >= 2 exact patterns
                        # while matching zero negative examples.
                        if len(covered_ids) >= 2 and len(false_positives) == 0:
                            proposals.append({
                                "target_role": target_role,
                                "regex": regex_str,
                                "explanation": sugg.get('explanation', 'No explanation provided'),
                                "priority": sugg.get('suggested_priority', 50),
                                "covered_pattern_ids": covered_ids,
                                "covered_titles": covered_titles_verified,
                                "false_positives": false_positives
                            })
                    except re.error:
                        logger.warning(f"AI generated invalid regex: {sugg.get('regex')}")
                        continue
            except Exception as e:
                logger.error(f"Error optimizing patterns for {target_role}: {e}", exc_info=True)
                continue
        logger.info(f"Optimization complete. Generated {len(proposals)} proposals.")
        return proposals