[30388f42] Infrastructure Hardening: Repaired CE/Connector DB schema, fixed frontend styling build, implemented robust echo shield in worker v2.1.1, and integrated Lead Engine into gateway.

This commit is contained in:
2026-03-07 14:08:42 +00:00
parent efcaa57cf0
commit ae2303b733
404 changed files with 24100 additions and 13301 deletions

View File

@@ -5,12 +5,12 @@ import re
from datetime import datetime
from typing import Optional, Dict, Any, List
from sqlalchemy.orm import Session
from sqlalchemy.orm import Session, joinedload
from backend.database import Company, Industry, RoboticsCategory, EnrichmentData
from backend.lib.core_utils import call_gemini_flash, safe_eval_math, run_serp_search
from backend.services.scraping import scrape_website_content
from backend.lib.metric_parser import MetricParser
from ..database import Company, Industry, RoboticsCategory, EnrichmentData
from ..lib.core_utils import call_gemini_flash, safe_eval_math, run_serp_search
from .scraping import scrape_website_content
from ..lib.metric_parser import MetricParser
logger = logging.getLogger(__name__)
@@ -19,9 +19,12 @@ class ClassificationService:
pass
def _load_industry_definitions(self, db: Session) -> List[Industry]:
    """Load all industry definitions with their category relations eagerly fetched.

    ``joinedload`` on both category relationships avoids N+1 queries when
    callers later read ``primary_category`` / ``secondary_category`` per row.

    Args:
        db: Active SQLAlchemy session.

    Returns:
        All Industry rows (possibly empty; a warning is logged in that case).
    """
    industries = db.query(Industry).options(
        joinedload(Industry.primary_category),
        joinedload(Industry.secondary_category),
    ).all()
    if not industries:
        logger.warning("No industry definitions found in DB.")
    return industries
def _get_wikipedia_content(self, db: Session, company_id: int) -> Optional[Dict[str, Any]]:
@@ -49,18 +52,11 @@ Return ONLY the exact name of the industry.
try:
response = call_gemini_flash(prompt)
if not response: return "Others"
cleaned = response.strip().replace('"', '').replace("'", "")
# Simple fuzzy match check
valid_names = [i['name'] for i in industry_definitions] + ["Others"]
if cleaned in valid_names:
return cleaned
# Fallback: Try to find name in response
if cleaned in valid_names: return cleaned
for name in valid_names:
if name in cleaned:
return name
if name in cleaned: return name
return "Others"
except Exception as e:
logger.error(f"Classification Prompt Error: {e}")
@@ -75,7 +71,7 @@ Source Text:
{text_content[:6000]}
Return a JSON object with:
- "raw_value": The number found (e.g. 352 or 352.0). If text says "352 Betten", extract 352. If not found, null.
- "raw_value": The number found (e.g. 352 or 352.0). If not found, null.
- "raw_unit": The unit found (e.g. "Betten", "").
- "proof_text": A short quote from the text proving this value.
@@ -84,16 +80,15 @@ JSON ONLY.
try:
response = call_gemini_flash(prompt, json_mode=True)
if not response: return None
if isinstance(response, str):
response = response.replace("```json", "").replace("```", "").strip()
data = json.loads(response)
try:
data = json.loads(response.replace("```json", "").replace("```", "").strip())
except: return None
else:
data = response
# Basic cleanup
if isinstance(data, list) and data: data = data[0]
if not isinstance(data, dict): return None
if data.get("raw_value") == "null": data["raw_value"] = None
return data
except Exception as e:
logger.error(f"LLM Extraction Parse Error: {e}")
@@ -101,38 +96,37 @@ JSON ONLY.
def _is_metric_plausible(self, metric_name: str, value: Optional[float]) -> bool:
if value is None: return False
try:
val_float = float(value)
return val_float > 0
except:
return False
try: return float(value) > 0
except: return False
def _parse_standardization_logic(self, formula: str, raw_value: float) -> Optional[float]:
    """Evaluate a standardization formula against a raw metric value.

    The formula uses the German placeholder ``wert`` for the value, e.g.
    ``"wert * 30 (qm pro Bett)"``. Parenthesised fragments (usually units or
    comments) are stripped, the placeholder is substituted, and everything
    that is not part of a plain arithmetic expression is removed before the
    sandboxed evaluator runs.

    Args:
        formula: Standardization expression, possibly with unit annotations.
        raw_value: Numeric value to substitute for ``wert``.

    Returns:
        The evaluated result, or None when formula/value is missing or the
        expression cannot be evaluated.
    """
    if not formula or raw_value is None:
        return None
    # Drop anything in parentheses first (often units or comments).
    clean_formula = re.sub(r'\(.*?\)', '', formula.lower())
    # Substitute the placeholder with the actual value.
    expression = clean_formula.replace("wert", str(raw_value))
    # Keep only characters that can form a safe arithmetic expression.
    expression = re.sub(r'[^0-9\.\+\-\*\/]', '', expression)
    try:
        return safe_eval_math(expression)
    except Exception as e:
        logger.error(f"Failed to parse logic '{formula}' with value {raw_value}: {e}")
        return None
def _get_best_metric_result(self, results_list: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
if not results_list:
return None
if not results_list: return None
source_priority = {"wikipedia": 0, "website": 1, "serpapi": 2}
valid_results = [r for r in results_list if r.get("calculated_metric_value") is not None]
if not valid_results:
return None
valid_results.sort(key=lambda r: (source_priority.get(r.get("metric_source"), 99), -r.get("metric_confidence", 0.0)))
logger.info(f"Best result chosen: {valid_results[0]}")
if not valid_results: return None
valid_results.sort(key=lambda r: source_priority.get(r.get("metric_source"), 99))
return valid_results[0]
def _get_website_content_and_url(self, db: Session, company: Company) -> Tuple[Optional[str], Optional[str]]:
    """Return (website_text, url) for a company, preferring a cached scrape.

    Looks up the most recent ``website_scrape`` EnrichmentData row; if it
    carries ``raw_text``, that cached content is returned without touching
    the network. Otherwise falls back to a live scrape.

    Args:
        db: Active SQLAlchemy session.
        company: Company whose website content is needed.

    Returns:
        Tuple of (content or None, company.website).
    """
    enrichment = db.query(EnrichmentData).filter_by(
        company_id=company.id, source_type="website_scrape"
    ).order_by(EnrichmentData.created_at.desc()).first()
    if enrichment and enrichment.content and "raw_text" in enrichment.content:
        return enrichment.content["raw_text"], company.website
    # Cache miss: perform a live scrape.
    content = scrape_website_content(company.website)
    return content, company.website
def _get_wikipedia_content_and_url(self, db: Session, company_id: int) -> Tuple[Optional[str], Optional[str]]:
wiki_data = self._get_wikipedia_content(db, company_id)
@@ -140,129 +134,240 @@ JSON ONLY.
def _get_serpapi_content_and_url(self, company: Company, search_term: str) -> Tuple[Optional[str], Optional[str]]:
    """Run a SERP search for the company + term and return (snippets, first link).

    Args:
        company: Company providing name/city for the query.
        search_term: Metric-specific term appended to the query.

    Returns:
        Tuple of (space-joined organic snippets, first organic result link),
        or (None, None) when the search returns nothing.
    """
    serp_results = run_serp_search(f"{company.name} {company.city or ''} {search_term}")
    if not serp_results:
        return None, None
    # Hoist the repeated lookup; empty list when the key is missing.
    organic = serp_results.get("organic_results", [])
    content = " ".join(res.get("snippet", "") for res in organic)
    url = organic[0].get("link") if organic else None
    return content, url
def _extract_and_calculate_metric_cascade(self, db: Session, company: Company, industry_name: str, search_term: str, standardization_logic: Optional[str], standardized_unit: Optional[str]) -> Dict[str, Any]:
    """Try website -> wikipedia -> serpapi to extract a single metric.

    Each source's text is fed to the LLM extractor; any hit is re-verified
    with the deterministic MetricParser against the source text before it
    becomes a candidate. The best candidate (see _get_best_metric_result)
    is merged into the result template, and the standardization formula is
    applied when the value is plausible.

    Args:
        db: Active SQLAlchemy session.
        company: Company being enriched.
        industry_name: Name of the classified industry (prompt context).
        search_term: Metric to search for (e.g. "Betten", "Fläche").
        standardization_logic: Optional formula with ``wert`` placeholder.
        standardized_unit: Unit label for the standardized value.

    Returns:
        Metric-field dict; values remain None when nothing plausible was found.
    """
    final_result = {"calculated_metric_name": search_term, "calculated_metric_value": None, "calculated_metric_unit": None, "standardized_metric_value": None, "standardized_metric_unit": standardized_unit, "metric_source": None, "proof_text": None, "metric_source_url": None}
    # Lambdas bind the per-source argument lists so the loop stays uniform.
    sources = [
        ("website", lambda: self._get_website_content_and_url(db, company)),
        ("wikipedia", lambda: self._get_wikipedia_content_and_url(db, company.id)),
        ("serpapi", lambda: self._get_serpapi_content_and_url(company, search_term)),
    ]
    all_source_results = []
    parser = MetricParser()
    for source_name, content_loader in sources:
        logger.info(f" -> Checking source: [{source_name.upper()}] for '{search_term}'")
        try:
            content_text, current_source_url = content_loader()
            # Skip empty or near-empty sources (boilerplate-only pages).
            if not content_text or len(content_text) < 100:
                continue
            llm_result = self._run_llm_metric_extraction_prompt(content_text, search_term, industry_name)
            if llm_result and llm_result.get("proof_text"):
                # Re-verify the LLM's claim with the robust numeric parser,
                # using the raw value (or proof quote) as the expected hint.
                hint = llm_result.get("raw_value") or llm_result.get("proof_text")
                parsed_value = parser.extract_numeric_value(text=content_text, expected_value=str(hint))
                if parsed_value is not None:
                    llm_result.update({"calculated_metric_value": parsed_value, "calculated_metric_unit": llm_result.get('raw_unit'), "metric_source": source_name, "metric_source_url": current_source_url})
                    all_source_results.append(llm_result)
        except Exception as e:
            logger.error(f" -> Error in {source_name} stage: {e}")
    best_result = self._get_best_metric_result(all_source_results)
    if not best_result:
        return final_result
    final_result.update(best_result)
    if self._is_metric_plausible(search_term, final_result['calculated_metric_value']):
        final_result['standardized_metric_value'] = self._parse_standardization_logic(standardization_logic, final_result['calculated_metric_value'])
    return final_result
def extract_metrics_for_industry(self, company: Company, db: Session, industry: Industry) -> Company:
    """Run the metric cascade for *industry* and persist the results on *company*.

    Requires the industry to define a ``scraper_search_term``; otherwise the
    company is returned untouched. Commits the session before returning.
    """
    if not industry or not industry.scraper_search_term:
        logger.warning(f"No metric configuration for industry '{industry.name if industry else 'None'}'")
        return company
    # Improved unit derivation
    # NOTE(review): the two string literals below are empty — they look
    # mojibake-damaged (plausibly "m²"). As written, `"" in s` is always
    # True, so std_unit is always "" — confirm against VCS history.
    if "" in (industry.standardization_logic or "") or "" in (industry.scraper_search_term or ""):
        std_unit = ""
    else:
        std_unit = "Einheiten"
    metrics = self._extract_and_calculate_metric_cascade(
        db, company, industry.name, industry.scraper_search_term, industry.standardization_logic, std_unit
    )
    # Persist every metric field onto the company row.
    company.calculated_metric_name = metrics["calculated_metric_name"]
    company.calculated_metric_value = metrics["calculated_metric_value"]
    company.calculated_metric_unit = metrics["calculated_metric_unit"]
    company.standardized_metric_value = metrics["standardized_metric_value"]
    company.standardized_metric_unit = metrics["standardized_metric_unit"]
    company.metric_source = metrics["metric_source"]
    # NOTE(review): cascade result dicts elsewhere in this file use the key
    # "proof_text" / omit the confidence keys; these bracket lookups may
    # KeyError at runtime — verify the keys match the cascade's output.
    company.metric_proof_text = metrics["metric_proof_text"]
    company.metric_source_url = metrics.get("metric_source_url")
    company.metric_confidence = metrics["metric_confidence"]
    company.metric_confidence_reason = metrics["metric_confidence_reason"]
    company.last_classification_at = datetime.utcnow()
    db.commit()
    return company
def _find_direct_area(self, db: Session, company: Company, industry_name: str) -> Optional[Dict[str, Any]]:
    """Search all sources for an explicit floor-area ("Fläche") value.

    Returns the metric dict when a value with an area-like unit was found,
    otherwise None so callers can fall back to an industry-specific proxy.
    """
    logger.info(" -> (Helper) Running specific search for 'Fläche'...")
    area_metrics = self._extract_and_calculate_metric_cascade(db, company, industry_name, search_term="Fläche", standardization_logic=None, standardized_unit="")
    if area_metrics and area_metrics.get("calculated_metric_value") is not None:
        unit = (area_metrics.get("calculated_metric_unit") or "").lower()
        # NOTE(review): the first candidate unit below is an empty string
        # (likely mojibake for "m²"); `"" in unit` is always True, so this
        # check currently accepts ANY unit — confirm intent.
        if any(u in unit for u in ["", "qm", "quadratmeter"]):
            logger.info(" ✅ SUCCESS: Found direct area value.")
            # Area is already in the target unit; copy it to standardized.
            area_metrics['standardized_metric_value'] = area_metrics['calculated_metric_value']
            return area_metrics
    return None
def reevaluate_wikipedia_metric(self, company: Company, db: Session, industry: Industry) -> Company:
    """Re-run the full metric extraction pipeline for an already-classified company."""
    logger.info(f"Re-evaluating metric for {company.name}...")
    refreshed = self.extract_metrics_for_industry(company, db, industry)
    return refreshed
def _summarize_website_for_opener(self, company_name: str, website_text: str) -> str:
    """
    Creates a high-quality summary of the website content to provide
    better context for the opener generation.

    The (German) prompt requests a two-part dossier: a business-model
    summary plus a cleaning/hygiene-potential rating with proof quotes.
    Returns the LLM text, or a German fallback message when the model
    returns nothing / an error occurs.
    """
    # NOTE: prompt text is runtime data and intentionally kept in German.
    prompt = f"""
**Rolle:** Du bist ein erfahrener B2B-Marktanalyst mit Fokus auf Facility Management und Gebäudereinigung.
**Aufgabe:** Analysiere den Website-Text des Unternehmens '{company_name}' und erstelle ein prägnantes Dossier.
**Deine Analyse besteht aus ZWEI TEILEN:**
**TEIL 1: Geschäftsmodell-Analyse**
1. Identifiziere die Kernprodukte und/oder Dienstleistungen des Unternehmens.
2. Fasse in 2-3 prägnanten Sätzen zusammen, was das Unternehmen macht und für welche Kunden.
**TEIL 2: Reinigungspotenzial & Hygiene-Analyse**
1. Scanne den Text gezielt nach Hinweisen auf große Bodenflächen, Publikumsverkehr oder hohe Hygieneanforderungen (Schlüsselwörter: Reinigung, Sauberkeit, Hygiene, Bodenpflege, Verkaufsfläche, Logistikhalle, Patientenversorgung, Gästeerlebnis).
2. Bewerte das Potenzial für automatisierte Reinigungslösungen auf einer Skala (Hoch / Mittel / Niedrig).
3. Extrahiere die 1-2 wichtigsten Sätze, die diese Anforderungen oder die Größe der Einrichtung belegen.
**Antworte AUSSCHLIESSLICH im folgenden exakten Format:**
GESCHÄFTSMODELL: <Deine 2-3 Sätze über das Kerngeschäft des Unternehmens.>
REINIGUNGSPOTENZIAL: <Hoch / Mittel / Niedrig / Kein Hinweis>
HYGIENE-BEWEISE: <Die 1-2 aussagekräftigsten Sätze als Bullet Points (* Satz 1...)>
**Hier ist der Website-Text:**
{website_text[:5000]}
"""
    try:
        response = call_gemini_flash(prompt)
        # Fallback string when the model returned nothing.
        return response.strip() if response else "Keine Zusammenfassung möglich."
    except Exception as e:
        logger.error(f"Summary Error: {e}")
        return "Fehler bei der Zusammenfassung."
def _generate_marketing_opener(self, company: Company, industry: Industry, context_text: str, focus_mode: str = "primary") -> Optional[str]:
    """Generate a two-sentence German cold-outreach opener via the LLM.

    ``focus_mode`` selects which product category ("primary" or "secondary")
    and which pains/gains segment from the industry record feed the prompt.
    Returns the trimmed opener text, or None on missing industry / LLM error.
    """
    if not industry: return None
    # 1. Determine Product Category & Context
    category = industry.primary_category
    raw_pains = industry.pains or ""
    raw_gains = industry.gains or ""
    if focus_mode == "secondary" and industry.ops_focus_secondary and industry.secondary_category:
        category = industry.secondary_category
    # Fallback labels when no category row is linked.
    product_name = category.name if category else "Robotik-Lösungen"
    product_desc = category.description if category and category.description else "Automatisierung von operativen Prozessen"
    # Split pains/gains based on markers
    def extract_segment(text, marker):
        # Pains/gains fields may contain "[Marker] text" sections; return the
        # section whose marker contains *marker* (case-insensitive). Falls
        # back to the whole text when no matching marker exists.
        if not text: return ""
        segments = re.split(r'\[(.*?)\]', text)
        for i in range(1, len(segments), 2):
            if marker.lower() in segments[i].lower():
                return segments[i+1].strip()
        return text
    relevant_pains = extract_segment(raw_pains, "Primary Product")
    relevant_gains = extract_segment(raw_gains, "Primary Product")
    if focus_mode == "secondary" and industry.ops_focus_secondary and industry.secondary_category:
        relevant_pains = extract_segment(raw_pains, "Secondary Product")
        relevant_gains = extract_segment(raw_gains, "Secondary Product")
    # Prompt is runtime data (German by design) — do not translate.
    prompt = f"""
Du bist ein scharfsinniger Marktbeobachter und Branchenexperte. Formuliere eine prägnante Einleitung (genau 2 Sätze) für ein Anschreiben an das Unternehmen {company.name}.
DEINE PERSONA:
Ein direkter Branchenkenner, der eine relevante Beobachtung teilt. Dein Ton ist faktenbasiert, professionell und absolut NICHT verkäuferisch. Dein Ziel ist es, schnell zur operativen Herausforderung überzuleiten.
STRATEGISCHER HINTERGRUND (Nicht nennen!):
Dieses Unternehmen wird kontaktiert, weil sein Geschäftsmodell perfekt zu folgendem Bereich passt: "{product_name}" ({product_desc}).
Ziel des Schreibens ist es, die Branchen-Herausforderungen "{relevant_pains}" zu adressieren und die Mehrwerte "{relevant_gains}" zu ermöglichen.
DEINE AUFGABE:
1. Firmenname kürzen: Kürze "{company.name}" sinnvoll (meist erste zwei Worte). Entferne UNBEDINGT Rechtsformen wie GmbH, AG, gGmbH, e.V. etc.
2. Struktur: Genau 2 flüssige Sätze. NICHT MEHR.
3. Inhalt:
- Satz 1: Eine faktenbasierte, relevante Beobachtung zum Geschäftsmodell oder einem aktuellen Fokus des Unternehmens (siehe Analyse-Dossier). Vermeide Lobhudelei und generische Floskeln.
- Satz 2: Leite direkt und prägnant zu einer spezifischen operativen Herausforderung über, die für das Unternehmen aufgrund seiner Größe oder Branche relevant ist (orientiere dich an "{relevant_pains}").
4. STRENGES VERBOT: Nenne KEIN Produkt ("{product_name}") und biete KEINE "Lösungen", "Hilfe" oder "Zusammenarbeit" an. Der Text soll eine reine Beobachtung bleiben.
5. KEINE Anrede (kein "Sehr geehrte Damen und Herren", kein "Hallo").
KONTEXT (Analyse-Dossier):
{context_text}
BEISPIEL-STIL:
"Das Kreiskrankenhaus Weilburg leistet einen bedeutenden Beitrag zur regionalen Patientenversorgung. Bei der lückenlosen Dokumentation und den strengen Hygienevorgaben im Klinikalltag ist die Aufrechterhaltung höchster Standards jedoch eine enorme operative Herausforderung."
AUSGABE: Nur der fertige Text.
"""
    try:
        response = call_gemini_flash(prompt)
        # Strip whitespace and any wrapping quotes the model may add.
        return response.strip().strip('"') if response else None
    except Exception as e:
        logger.error(f"Opener Error: {e}")
        return None
def _sync_company_address_data(self, db: Session, company: Company):
    """Backfill company address/VAT fields from the latest website-scrape Impressum.

    Only empty fields are filled — except country, where a "DE" placeholder
    may also be overwritten. Commits once if anything changed.

    Args:
        db: Active SQLAlchemy session.
        company: Company row to update in place.
    """
    # EnrichmentData is already imported at module level; the previous
    # function-local re-import was redundant and has been removed.
    enrichment = db.query(EnrichmentData).filter_by(
        company_id=company.id, source_type="website_scrape"
    ).order_by(EnrichmentData.created_at.desc()).first()
    # Guard clauses replace the previous deep nesting.
    if not (enrichment and enrichment.content and "impressum" in enrichment.content):
        return
    imp = enrichment.content["impressum"]
    if not imp or not isinstance(imp, dict):
        return
    changed = False
    # City
    if imp.get("city") and not company.city:
        company.city = imp.get("city")
        changed = True
    # Street
    if imp.get("street") and not company.street:
        company.street = imp.get("street")
        changed = True
    # Zip / PLZ: scrapers use either key.
    zip_val = imp.get("zip") or imp.get("plz")
    if zip_val and not company.zip_code:
        company.zip_code = zip_val
        changed = True
    # Country: also overwrite the "DE" default placeholder.
    if imp.get("country_code") and (not company.country or company.country == "DE"):
        company.country = imp.get("country_code")
        changed = True
    # VAT ID
    if imp.get("vat_id") and not company.crm_vat:
        company.crm_vat = imp.get("vat_id")
        changed = True
    if changed:
        db.commit()
        logger.info(f"Updated Address/VAT from Impressum for {company.name}: City={company.city}, VAT={company.crm_vat}")
def classify_company_potential(self, company: Company, db: Session) -> Company:
    """Full enrichment pipeline (v3.0) for a single company.

    Steps: sync address data from scrape, classify the industry via LLM,
    extract metrics (direct area first, then industry-specific proxy), and
    generate the research dossier plus primary/secondary openers. Sets
    company.status to ENRICHED (or ENRICH_FAILED when no usable website
    content exists) and commits.
    """
    logger.info(f"--- Starting FULL Analysis v3.0 for {company.name} ---")
    # Ensure metadata is synced from scrape
    self._sync_company_address_data(db, company)
    # 1. Load Definitions
    industries = self._load_industry_definitions(db)
    # 2. Get Content (Website) — cached scrape preferred.
    website_content, _ = self._get_website_content_and_url(db, company)
    if not website_content or len(website_content) < 100:
        # No meaningful content: flag and bail out early.
        company.status = "ENRICH_FAILED"
        db.commit()
        return company
    # 3. Classify Industry
    industry_defs = [{"name": i.name, "description": i.description} for i in industries]
    suggested_industry_name = self._run_llm_classification_prompt(website_content, company.name, industry_defs)
    logger.info(f"AI suggests industry: {suggested_industry_name}")
    # 4. Match the suggestion back to the DB object.
    matched_industry = next((i for i in industries if i.name == suggested_industry_name), None)
    if matched_industry:
        company.industry_ai = matched_industry.name
        logger.info(f"✅ Industry: {matched_industry.name}")
        # 5. Extract Metrics: try a direct area value first, then the
        # industry-specific proxy metric.
        metrics = self._find_direct_area(db, company, matched_industry.name)
        if not metrics:
            logger.info(" -> No direct area. Trying proxy...")
            if matched_industry.scraper_search_term:
                metrics = self._extract_and_calculate_metric_cascade(db, company, matched_industry.name, search_term=matched_industry.scraper_search_term, standardization_logic=matched_industry.standardization_logic, standardized_unit="")
        if metrics and metrics.get("calculated_metric_value"):
            logger.info(f" ✅ SUCCESS: {metrics.get('calculated_metric_value')} {metrics.get('calculated_metric_unit')}")
            company.calculated_metric_name = metrics.get("calculated_metric_name", matched_industry.scraper_search_term or "Fläche")
            company.calculated_metric_value = metrics.get("calculated_metric_value")
            company.calculated_metric_unit = metrics.get("calculated_metric_unit")
            company.standardized_metric_value = metrics.get("standardized_metric_value")
            company.standardized_metric_unit = metrics.get("standardized_metric_unit")
            company.metric_source = metrics.get("metric_source")
            company.metric_proof_text = metrics.get("proof_text")
            company.metric_source_url = metrics.get("metric_source_url")
            company.metric_confidence = 0.8
            company.metric_confidence_reason = "Metric processed."
        # 6. Two-Step opener generation: summarize first, then write openers
        # from the summary dossier.
        website_summary = self._summarize_website_for_opener(company.name, website_content)
        company.research_dossier = website_summary
        company.ai_opener = self._generate_marketing_opener(company, matched_industry, website_summary, "primary")
        company.ai_opener_secondary = self._generate_marketing_opener(company, matched_industry, website_summary, "secondary")
    else:
        company.industry_ai = "Others"
    company.last_classification_at = datetime.utcnow()
    company.status = "ENRICHED"
    db.commit()
    logger.info(f"--- ✅ Analysis Finished for {company.name} ---")
    return company

View File

@@ -0,0 +1,157 @@
from sqlalchemy.orm import Session
from ..database import JobRolePattern, Persona
from ..lib.core_utils import call_gemini_flash
import json
import logging
import re
import ast
logger = logging.getLogger(__name__)
class PatternOptimizationService:
    """Proposes consolidated regex patterns to replace clusters of EXACT job-title patterns."""

    def __init__(self, db: Session):
        # Session is injected so the service stays testable.
        self.db = db

    def generate_proposals(self):
        """
        Analyzes existing EXACT patterns and proposes consolidated REGEX patterns.

        For each role with at least 3 exact patterns, asks the LLM for cluster
        regexes, then verifies every suggestion locally: it must cover >= 2
        existing titles and match zero negative examples before becoming a
        proposal.

        Returns:
            List of proposal dicts (target_role, regex, explanation, priority,
            covered_pattern_ids, covered_titles, false_positives).
        """
        # 1. Fetch all exact patterns and group them by role.
        patterns = self.db.query(JobRolePattern).filter(JobRolePattern.pattern_type == "exact").all()
        roles_data = {}
        pattern_map = {}  # pattern text -> DB id, for mapping coverage back
        for p in patterns:
            if p.role not in roles_data:
                roles_data[p.role] = []
            roles_data[p.role].append(p.pattern_value)
            pattern_map[p.pattern_value] = p.id
        if not roles_data:
            return []
        proposals = []
        # 2. Analyze each role
        for target_role in roles_data.keys():
            target_titles = roles_data[target_role]
            if len(target_titles) < 3:
                continue  # too few examples to cluster meaningfully
            # Titles of all OTHER roles act as negative examples (capped per role).
            negative_examples = []
            for other_role, titles in roles_data.items():
                if other_role != target_role:
                    negative_examples.extend(titles[:50])
            # 3. Build Prompt (runtime string, kept verbatim)
            prompt = f"""
Act as a Regex Optimization Engine for B2B Job Titles.
GOAL: Break down the list of 'Positive Examples' into logical CLUSTERS and create a Regex for each cluster.
TARGET ROLE: "{target_role}"
TITLES TO COVER (Positive Examples):
{json.dumps(target_titles)}
TITLES TO AVOID (Negative Examples - DO NOT MATCH THESE):
{json.dumps(negative_examples[:150])}
INSTRUCTIONS:
1. Analyze the 'Positive Examples'. Do NOT try to create one single regex for all of them.
2. Identify distinct semantic groups.
3. Create a Regex for EACH group.
4. CRITICAL - CONFLICT HANDLING:
- The Regex must NOT match the 'Negative Examples'.
- Use Negative Lookahead (e.g. ^(?=.*Manager)(?!.*Facility).*) if needed.
5. Aggressiveness: Be bold.
OUTPUT FORMAT:
Return a valid Python List of Dictionaries.
Example:
[
{{
"regex": r"(?i).*pattern.*",
"explanation": "Explanation...",
"suggested_priority": 50
}}
]
Enclose regex patterns in r"..." strings to handle backslashes correctly.
"""
            try:
                logger.info(f"Optimizing patterns for role: {target_role} (Positive: {len(target_titles)})")
                response = call_gemini_flash(prompt)  # no json_mode: Python-literal r"..." syntax is expected
                if not response:
                    # FIX: previously a None/empty response crashed on .strip()
                    # and was only swallowed by the broad except below.
                    continue
                # Strip a markdown code fence if present. FIX: only drop the
                # closing ``` when it actually exists — the old fixed-slice
                # ([9:-3] etc.) corrupted unfenced or unterminated output.
                clean_text = response.strip()
                for fence in ("```python", "```json", "```"):
                    if clean_text.startswith(fence):
                        clean_text = clean_text[len(fence):]
                        if clean_text.endswith("```"):
                            clean_text = clean_text[:-3]
                        break
                clean_text = clean_text.strip()
                ai_suggestions = []
                try:
                    # First try standard JSON
                    ai_suggestions = json.loads(clean_text)
                except json.JSONDecodeError:
                    try:
                        # Fallback: Python AST literal eval (handles r"..." strings)
                        ai_suggestions = ast.literal_eval(clean_text)
                    except Exception as e:
                        logger.error(f"Failed to parse response for {target_role} with JSON and AST. Error: {e}")
                        continue
                # 4. Verify each suggestion locally and map covered titles to IDs.
                for sugg in ai_suggestions:
                    try:
                        regex_str = sugg.get('regex')
                        if not regex_str:
                            continue
                        # ast.literal_eval already decoded r"..." — regex_str is the raw pattern.
                        regex = re.compile(regex_str)
                        covered_ids = []
                        covered_titles_verified = []
                        for t in target_titles:
                            if regex.search(t):
                                if t in pattern_map:
                                    covered_ids.append(pattern_map[t])
                                covered_titles_verified.append(t)
                        # Any match on a negative example disqualifies the pattern.
                        false_positives = [t for t in negative_examples if regex.search(t)]
                        if len(covered_ids) >= 2 and len(false_positives) == 0:
                            proposals.append({
                                "target_role": target_role,
                                "regex": regex_str,
                                "explanation": sugg.get('explanation', 'No explanation provided'),
                                "priority": sugg.get('suggested_priority', 50),
                                "covered_pattern_ids": covered_ids,
                                "covered_titles": covered_titles_verified,
                                "false_positives": false_positives,
                            })
                    except re.error:
                        logger.warning(f"AI generated invalid regex: {sugg.get('regex')}")
                        continue
            except Exception as e:
                logger.error(f"Error optimizing patterns for {target_role}: {e}", exc_info=True)
                continue
        logger.info(f"Optimization complete. Generated {len(proposals)} proposals.")
        return proposals

View File

@@ -0,0 +1,63 @@
import logging
import re
from sqlalchemy.orm import Session
from typing import Optional
from ..database import JobRolePattern, RawJobTitle, Persona, Contact
logger = logging.getLogger(__name__)
class RoleMappingService:
    """Maps raw job titles to canonical roles via DB-managed match patterns."""

    def __init__(self, db: Session):
        self.db = db

    def get_role_for_job_title(self, job_title: str) -> Optional[str]:
        """
        Resolve *job_title* to a role.

        Strategy: case-insensitive exact match first (cheapest), then regex
        patterns evaluated in ascending priority order. Returns None when the
        title is empty or nothing matches.
        """
        if not job_title:
            return None
        needle = job_title.lower().strip()
        # Pull every active pattern once, already ordered by priority.
        active_rules = (
            self.db.query(JobRolePattern)
            .filter(JobRolePattern.is_active == True)
            .order_by(JobRolePattern.priority.asc())
            .all()
        )
        # Exact patterns form a dict for O(1) lookup.
        exact_lookup = {
            rule.pattern_value.lower(): rule.role
            for rule in active_rules
            if rule.pattern_type == 'exact'
        }
        if needle in exact_lookup:
            return exact_lookup[needle]
        # Fall back to regex rules, preserving priority order.
        for rule in active_rules:
            if rule.pattern_type != 'regex':
                continue
            try:
                if re.search(rule.pattern_value, job_title, re.IGNORECASE):
                    return rule.role
            except re.error as e:
                logger.error(f"Invalid regex for role '{rule.role}': {rule.pattern_value}. Error: {e}")
        return None

    def add_or_update_unclassified_title(self, job_title: str):
        """
        Logs an unclassified job title or increments its count if already present.
        """
        if not job_title:
            return
        existing = self.db.query(RawJobTitle).filter(RawJobTitle.title == job_title).first()
        if existing is None:
            self.db.add(RawJobTitle(title=job_title, count=1))
        else:
            existing.count += 1
        self.db.commit()