[2f988f42] fix(company-explorer): Implement robust quantitative potential and atomic opener generation

- Refactored ClassificationService for two-stage metric extraction (direct area and proxy).
- Enhanced MetricParser for targeted value matching and robust number parsing.
- Implemented persona-specific 'Atomic Opener' generation using segmented pains.
- Fixed logging configuration and Pydantic response models.
- Added dedicated debugging script and updated documentation (GEMINI.md, MIGRATION_PLAN.md).

This commit is contained in:
2026-02-21 08:01:07 +00:00
parent 26644f0e54
commit cf7cac2819
13 changed files with 666 additions and 534 deletions

View File

@@ -1 +1 @@
{"task_id": "2ff88f42-8544-8018-883f-e8837c0421af", "token": "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", "session_start_time": "2026-02-20T13:24:58.251700"}
{"task_id": "2f988f42-8544-8100-9dba-e69ee2376730", "token": "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", "session_start_time": "2026-02-21T04:23:04.206814"}

View File

@@ -173,3 +173,44 @@ Since the "Golden Record" for Industry Verticals (Pains, Gains, Products) reside
**Troubleshooting:**
* **"BaseModel" Error:** Usually a mix-up between Pydantic and SQLAlchemy `Base`. Check imports in `database.py`.
* **Missing Dependencies:** The CLI agent runs in `/app` but not necessarily inside the container's venv. Use standard tools (`grep`, `sqlite3`) where possible.
---
## Critical Debugging Session (Feb 21, 2026) - Re-Stabilizing the Analysis Engine
A critical session was required to fix a series of cascading failures in the `ClassificationService`. The key takeaways are documented here to prevent future issues.
1. **The "Phantom" `NameError`:**
* **Symptom:** The application crashed with a `NameError: name 'joinedload' is not defined`, even though the import was correctly added to `classification.py`.
* **Root Cause:** The `uvicorn` server's hot-reload mechanism within the Docker container did not reliably pick up file changes made from outside the container. A simple `docker-compose restart` was insufficient to clear the process's cached state.
* **Solution:** After any significant code change, especially to imports or core logic, a forced-recreation of the container is **mandatory**.
```bash
# Correct Way to Apply Changes:
docker-compose up -d --build --force-recreate company-explorer
```
2. **The "Invisible" Logs:**
* **Symptom:** No debug logs were being written, making it impossible to trace the execution flow.
* **Root Cause:** The `LOG_DIR` path in `/company-explorer/backend/config.py` was misconfigured (`/app/logs_debug`) and did not point to the actual, historical log directory (`/app/Log_from_docker`).
* **Solution:** Configuration paths must be treated as absolute and verified. Correcting the `LOG_DIR` path immediately resolved the issue.
3. **Inefficient Debugging Loop:**
* **Symptom:** The cycle of triggering a background job via API, waiting, and then manually checking logs was slow and inefficient.
* **Root Cause:** Lack of a tool to test the core application logic in isolation.
* **Solution:** A dedicated, interactive test script was created (`/company-explorer/backend/scripts/debug_single_company.py`). It runs the entire analysis for a single company in the foreground, providing immediate, detailed feedback. This pattern is invaluable for complex, multi-step processes and should become standard for future development.

View File

View File

@@ -159,3 +159,98 @@ Instructions for the "construction crew" (Gemini CLI).
* **Path:** `/volume1/homes/Floke/python/brancheneinstufung/company-explorer`
* **DB:** `/app/companies_v3_fixed_2.db`
* **Sync:** `docker exec -it company-explorer python backend/scripts/sync_notion_to_ce_enhanced.py`
---
## 17. Analysis Logic v3.0 (Feb 2026): Quantitative Potential Analysis & "Atomic Opener"
After several unstable iterations, the core logic of the `ClassificationService` has been finalized. This section serves as the single source of truth to prevent future regressions.
### 17.1 The Big Picture: From Content to Finished Analysis
The process is strictly sequential; each stage builds on the previous one.
```
1. Industry Classification
   |
   -> Detected industry: "Healthcare - Hospital"
   |
2. Quantitative Potential Analysis (two-stage cascade)
   |
   --> 2a. Stage 1: Direct area search ("Fläche in m²")
   |     |
   |     --> Result: FAILURE
   |
   --> 2b. Stage 2: Industry-specific proxy search
         |
         --> Search term (from industry): "Anzahl Betten" (number of beds)
         --> Formula (from industry): "wert * 100"
         |
         -> Result: 250 beds -> 25000 m²
   |
3. "Atomic Opener" Generation (two separate personas)
   |
   --> 3a. Opener 1 (primary): focus on the infrastructure decision-maker
   |     |
   |     --> Product context: wet-cleaning robot (primary product)
   |     --> Pain context: hygiene audits, germ load
   |
   --> 3b. Opener 2 (secondary): focus on the operational decision-maker
         |
         --> Product context: service robot (secondary product, since "ops_focus_secondary" is active)
         |
         --> Pain context: staff shortage, relief for nursing staff
   |
4. FINAL COMMIT
```
### 17.2 Quantitative Potential Analysis in Detail
**Goal:** Determine a `standardized_metric_value` in `m²` for every company.
* **Stage 1: Direct area search (direct hit)**
    * The system **always** searches for direct area figures first (keywords: "Fläche", "m²", "Quadratmeter").
    * If the `MetricParser` finds a plausible value, it is written straight to `standardized_metric_value` and this stage is complete. In that case `calculated_metric_value` is identical.
* **Stage 2: Proxy metric search (fallback)**
    * **Only if Stage 1 fails** is the industry-specific logic from the `industries` settings applied.
    * **Search term:** `scraper_search_term` (e.g. "Anzahl Betten" = number of beds, "Anzahl Passagiere" = number of passengers).
    * **Extraction:** The `MetricParser` extracts the raw value (e.g. `250`), which is stored in `calculated_metric_value`.
    * **Standardization:** The formula from `standardization_logic` (e.g. `wert * 100`) is applied to the raw value, and the result is written to `standardized_metric_value`.
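The two-stage cascade can be sketched in a few lines. This is a minimal illustration under assumptions: the helper names (`determine_potential`, `safe_eval_formula`) and the dict-based `industry` settings are hypothetical stand-ins, not the actual `ClassificationService` API.

```python
import re

def safe_eval_formula(formula: str, value: float) -> float:
    """Apply a standardization formula such as 'wert * 100' to a raw value."""
    expr = formula.replace("wert", str(value))
    # Whitelist arithmetic characters before evaluating.
    if not re.fullmatch(r"[\d\.\s\+\-\*/\(\)]+", expr):
        raise ValueError(f"unsafe formula: {formula!r}")
    return float(eval(expr))

def determine_potential(text: str, industry: dict) -> dict:
    """Two-stage cascade: direct area hit first, industry proxy as fallback."""
    # Stage 1: direct area search ("Fläche", "m²", "Quadratmeter")
    m = re.search(r"([\d\.]+)\s*(?:m²|qm|Quadratmeter)", text)
    if m:
        # Treat the dot as a German thousands separator for this sketch.
        value = float(m.group(1).replace(".", ""))
        return {"calculated": value, "standardized": value, "source": "direct"}
    # Stage 2: proxy metric from the industry settings (e.g. "Anzahl Betten")
    term = re.escape(industry["scraper_search_term"])
    m = re.search(term + r"\D{0,10}(\d+)", text, re.IGNORECASE)
    if m:
        raw = float(m.group(1))
        return {
            "calculated": raw,
            "standardized": safe_eval_formula(industry["standardization_logic"], raw),
            "source": "proxy",
        }
    return {"calculated": None, "standardized": None, "source": None}
```

For the hospital example above, a text containing "Anzahl Betten: 250" with formula `wert * 100` yields a standardized value of 25000 m², while any direct area figure short-circuits Stage 2 entirely.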
### 17.3 "Atomic Opener" Generation in Detail
**Goal:** Generate two highly personalized, punchy opening lines (1-2 sentences) that imply an operational challenge without naming the solution.
* **Two separate contexts:** One sentence is generated for each of two personas:
    1. **`ai_opener` (primary):** Targets the **infrastructure decision-maker** (e.g. facility manager, technical director).
    2. **`ai_opener_secondary` (secondary):** Targets the **operational decision-maker** (e.g. production manager, head of nursing).
* **Persona-specific product selection:**
    * The primary opener (infrastructure) **always** refers to the industry's `primary_category`.
    * The secondary opener (operations) refers to:
        * the `primary_category` by default;
        * **exception:** the `secondary_category` whenever the industry has `ops_focus_secondary = True`.
* **The "1komma5°" prompt:**
    * Generation uses a proven prompt that instructs the language model to analyze the company's business model and formulate an appreciative observation.
    * **"Ammunition":** The prompt is dynamically enriched with the highly specific, predefined `pains` and `gains` of the respective industry.
* **Rule:** The product itself is **not** named in the opener. The sentence focuses purely on articulating the challenge; the reveal follows in the subsequent persona-specific text modules.
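The persona-specific product selection boils down to one branch. The sketch below illustrates it under assumptions: `Industry` here is a plain stand-in container (not the project's SQLAlchemy model), and the persona labels are illustrative.

```python
from dataclasses import dataclass

@dataclass
class Industry:
    primary_category: str
    secondary_category: str
    ops_focus_secondary: bool = False

def product_context(industry: Industry, persona: str) -> str:
    """Pick the product category used as context for an opener.

    persona: "infrastructure" -> ai_opener, "operations" -> ai_opener_secondary
    """
    if persona == "operations" and industry.ops_focus_secondary:
        return industry.secondary_category
    # The infrastructure opener always uses the primary category; the
    # operations opener falls back to it unless ops_focus_secondary is set.
    return industry.primary_category
```

For a hospital industry with `ops_focus_secondary = True`, the infrastructure opener is built around the primary product while the operations opener switches to the secondary one.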
### 17.4 Debugging & Lessons Learned (Feb 21, 2026)
Implementing the v3.0 logic was plagued by several stubborn problems whose resolution yielded important lessons for future development.
1. **"Phantom" `NameError` for `joinedload`:**
    * **Problem:** A `NameError` was raised despite a correct `import` statement.
    * **Solution:** A forced container recreation (`--force-recreate`) is essential after critical code changes, especially to imports.
2. **The "hospital battle" (proxy metrics & parser interference):**
    * **Problem:** For hospitals, the value "100" was often extracted (from "100%ige Trägerschaft", i.e. 100% public ownership) instead of the correct bed count. Standardization also failed on leftover units in the formula (e.g. `wert * 100 (m²)`).
    * **Solution 1 (targeted matching):** The `MetricParser` was rebuilt to prioritize a "hint" (the value the LLM expects). It now searches the full text for exactly the digit sequence the LLM identified and ignores all other plausible numbers.
    * **Solution 2 (aggressive formula cleaning):** `_parse_standardization_logic` now strips everything in parentheses and all non-arithmetic characters before calling `safe_eval_math`, preventing `SyntaxError`s caused by database leftovers.
3. **Persona-specific pains:**
    * **Insight:** For the openers to truly distinguish infrastructure from operations, the `pains` in the database must be tagged with markers such as `[Primary Product]` and `[Secondary Product]`. The logic was adapted to extract these segments specifically.
These points underline the need for robust deployment processes, aggressive data cleaning, and dedicated test tools that isolate complex application logic.
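The "aggressive formula cleaning" from lesson 2 can be sketched like this. The helper names are illustrative; the real logic lives in `_parse_standardization_logic` and `safe_eval_math`.

```python
import re

def clean_formula(raw: str) -> str:
    """Strip parenthesized leftovers and non-arithmetic characters.

    'wert * 100 (m²)' -> 'wert * 100', so evaluation no longer raises
    a SyntaxError on unit residue stored in the database.
    """
    no_parens = re.sub(r"\([^)]*\)", " ", raw)
    # Keep only digits, the 'wert' placeholder letters, and operators.
    cleaned = re.sub(r"[^0-9wert\.\+\-\*/\s]", "", no_parens)
    return cleaned.strip()

def apply_formula(formula: str, value: float) -> float:
    """Clean the formula, substitute the raw value, and evaluate safely."""
    expr = clean_formula(formula).replace("wert", str(value))
    if not re.fullmatch(r"[0-9\.\+\-\*/\s]+", expr):
        raise ValueError(f"formula still unsafe after cleaning: {formula!r}")
    return float(eval(expr))
```

With this cleaning, the problematic `wert * 100 (m²)` from the hospital case reduces to `wert * 100` and evaluates to 25000 for a raw value of 250.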

View File

@@ -32,7 +32,7 @@ setup_logging()
import logging
logger = logging.getLogger(__name__)
from .database import init_db, get_db, Company, Signal, EnrichmentData, RoboticsCategory, Contact, Industry, JobRoleMapping, ReportedMistake, MarketingMatrix, Persona
from .database import init_db, get_db, Company, Signal, EnrichmentData, RoboticsCategory, Contact, Industry, JobRoleMapping, ReportedMistake, MarketingMatrix, Persona, RawJobTitle
from .services.deduplication import Deduplicator
from .services.discovery import DiscoveryService
from .services.scraping import ScraperService
@@ -101,6 +101,71 @@ class ProvisioningResponse(BaseModel):
opener_secondary: Optional[str] = None # Secondary opener (Service/Logistics)
texts: Dict[str, Optional[str]] = {}
class IndustryDetails(BaseModel):
pains: Optional[str] = None
gains: Optional[str] = None
priority: Optional[str] = None
notes: Optional[str] = None
ops_focus_secondary: bool = False
class Config:
from_attributes = True
class ContactResponse(BaseModel):
id: int
first_name: Optional[str] = None
last_name: Optional[str] = None
job_title: Optional[str] = None
role: Optional[str] = None
email: Optional[str] = None
is_primary: bool
class Config:
from_attributes = True
class EnrichmentDataResponse(BaseModel):
id: int
source_type: str
content: Dict[str, Any]
is_locked: bool
wiki_verified_empty: bool
updated_at: datetime
class Config:
from_attributes = True
class CompanyDetailsResponse(BaseModel):
id: int
name: str
website: Optional[str] = None
city: Optional[str] = None
country: Optional[str] = None
industry_ai: Optional[str] = None
status: str
# Metrics
calculated_metric_name: Optional[str] = None
calculated_metric_value: Optional[float] = None
calculated_metric_unit: Optional[str] = None
standardized_metric_value: Optional[float] = None
standardized_metric_unit: Optional[str] = None
metric_source: Optional[str] = None
metric_proof_text: Optional[str] = None
metric_source_url: Optional[str] = None
metric_confidence: Optional[float] = None
# Openers
ai_opener: Optional[str] = None
ai_opener_secondary: Optional[str] = None
# Relations
industry_details: Optional[IndustryDetails] = None
contacts: List[ContactResponse] = []
enrichment_data: List[EnrichmentDataResponse] = []
class Config:
from_attributes = True
# --- Events ---
@app.on_event("startup")
def on_startup():
@@ -336,7 +401,7 @@ def export_companies_csv(db: Session = Depends(get_db), username: str = Depends(
headers={"Content-Disposition": f"attachment; filename=company_export_{datetime.utcnow().strftime('%Y-%m-%d')}.csv"}
)
@app.get("/api/companies/{company_id}")
@app.get("/api/companies/{company_id}", response_model=CompanyDetailsResponse)
def get_company(company_id: int, db: Session = Depends(get_db), username: str = Depends(authenticate_user)):
company = db.query(Company).options(
joinedload(Company.enrichment_data),
@@ -350,28 +415,14 @@ def get_company(company_id: int, db: Session = Depends(get_db), username: str =
if company.industry_ai:
ind = db.query(Industry).filter(Industry.name == company.industry_ai).first()
if ind:
industry_details = {
"pains": ind.pains,
"gains": ind.gains,
"priority": ind.priority,
"notes": ind.notes,
"ops_focus_secondary": ind.ops_focus_secondary
}
industry_details = IndustryDetails.model_validate(ind)
# HACK: Attach to response object (Pydantic would be cleaner, but this works for fast prototyping)
# We convert to dict and append
resp = company.__dict__.copy()
resp["industry_details"] = industry_details
# Handle SQLAlchemy internal state
if "_sa_instance_state" in resp: del resp["_sa_instance_state"]
# Handle relationships manually if needed, or let FastAPI encode the SQLAlchemy model + extra dict
# Better: return a custom dict merging both
# FastAPI will automatically serialize the 'company' ORM object into the
# CompanyDetailsResponse schema. We just need to attach the extra 'industry_details'.
response_data = CompanyDetailsResponse.model_validate(company)
response_data.industry_details = industry_details
# Since we use joinedload, relationships are loaded.
# Let's rely on FastAPI's ability to serialize the object, but we need to inject the extra field.
# The safest way without changing Pydantic schemas everywhere is to return a dict.
return {**resp, "enrichment_data": company.enrichment_data, "contacts": company.contacts, "signals": company.signals}
return response_data
@app.post("/api/companies")
def create_company(company: CompanyCreate, db: Session = Depends(get_db), username: str = Depends(authenticate_user)):
@@ -797,23 +848,21 @@ def run_analysis_task(company_id: int):
db = SessionLocal()
try:
company = db.query(Company).filter(Company.id == company_id).first()
if not company: return
if not company:
logger.error(f"Analysis Task: Company with ID {company_id} not found.")
return
logger.info(f"Running Analysis Task for {company.name}")
logger.info(f"--- [BACKGROUND TASK] Starting for {company.name} ---")
# --- 1. Scrape Website (if not locked) ---
# Check for existing scrape data first
existing_scrape = db.query(EnrichmentData).filter(
EnrichmentData.company_id == company.id,
EnrichmentData.source_type == "website_scrape"
).first()
# If it doesn't exist or is not locked, we perform a scrape
if not existing_scrape or not existing_scrape.is_locked:
logger.info(f"Scraping website for {company.name}...")
scrape_res = scraper.scrape_url(company.website) # Use singleton
# Now, either create new or update existing
scrape_res = scraper.scrape_url(company.website)
if not existing_scrape:
db.add(EnrichmentData(company_id=company.id, source_type="website_scrape", content=scrape_res))
logger.info("Created new website_scrape entry.")
@@ -825,15 +874,16 @@ def run_analysis_task(company_id: int):
else:
logger.info("Website scrape is locked. Skipping.")
# 2. Classify Industry & Metrics
# IMPORTANT: Using the new method name and passing db session
# --- 2. Classify Industry & Metrics ---
logger.info(f"Handing over to ClassificationService for {company.name}...")
classifier.classify_company_potential(company, db)
company.status = "ENRICHED"
db.commit()
logger.info(f"Analysis complete for {company.name}")
logger.info(f"--- [BACKGROUND TASK] Successfully finished for {company.name} ---")
except Exception as e:
logger.error(f"Analyze Task Error: {e}", exc_info=True)
logger.critical(f"--- [BACKGROUND TASK] CRITICAL ERROR for Company ID {company_id} ---", exc_info=True)
finally:
db.close()

View File

@@ -22,7 +22,7 @@ try:
SERP_API_KEY: Optional[str] = None
# Paths
LOG_DIR: str = "/app/logs_debug"
LOG_DIR: str = "/app/Log_from_docker"
class Config:
env_file = ".env"
@@ -40,7 +40,7 @@ except ImportError:
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
SERP_API_KEY = os.getenv("SERP_API_KEY")
LOG_DIR = "/app/logs_debug"
LOG_DIR = "/app/Log_from_docker"
settings = FallbackSettings()

View File

@@ -68,6 +68,10 @@ class Company(Base):
metric_source_url = Column(Text, nullable=True) # URL where the proof was found
metric_confidence = Column(Float, nullable=True) # 0.0 - 1.0
metric_confidence_reason = Column(Text, nullable=True) # Why is it high/low?
# NEW: AI-generated Marketing Openers
ai_opener = Column(Text, nullable=True)
ai_opener_secondary = Column(Text, nullable=True)
# Relationships
signals = relationship("Signal", back_populates="company", cascade="all, delete-orphan")

View File

@@ -23,52 +23,43 @@ class MetricParser:
# 1. Pre-cleaning
text_processed = str(text).strip()
logger.info(f"[MetricParser] Processing: '{text_processed}' (Expected: {expected_value})")
logger.info(f"[MetricParser] Processing text (len: {len(text_processed)}) (Hint: {expected_value})")
# Optimize: If we have an expected value, try to clean and parse THAT first
# Optimize: If we have an expected value (hint), try to find that specific number first
if expected_value:
# Try to parse the LLM's raw value directly first (it's often cleaner: "200000")
try:
# Remove simple noise from expected value
# Aggressively strip units and text to isolate the number
clean_expected = str(expected_value).lower()
# Remove common units
for unit in ['m²', 'qm', 'sqm', 'mitarbeiter', 'employees', 'eur', 'usd', 'chf', '€', '$', '£', '¥']:
clean_expected = clean_expected.replace(unit, "")
# Remove multipliers text (we handle multipliers via is_revenue later, but for expected value matching we want the raw number)
# Actually, expected_value "2.5 Mio" implies we want to match 2.5 in the text, OR 2500000?
# Usually the LLM extract matches the text representation.
clean_expected = clean_expected.replace("mio", "").replace("millionen", "").replace("mrd", "").replace("milliarden", "")
clean_expected = clean_expected.replace("tsd", "").replace("tausend", "")
# Final cleanup of non-numeric chars (allow . , ' -)
# But preserve structure for robust parser
clean_expected = clean_expected.replace(" ", "").replace("'", "")
# If it looks like a clean number already, try parsing it
# But use the robust parser to handle German decimals if present in expected
val = MetricParser._parse_robust_number(clean_expected, is_revenue)
# Check if this value (or a close representation) actually exists in the text
# This prevents hallucination acceptance, but allows the LLM to guide us to the *second* number in a string.
# Simplified check: is the digits sequence present?
# No, better: Let the parser run on the FULL text, find all candidates, and pick the one closest to 'val'.
except:
pass
try:
# Clean the hint to get the target digits (e.g. "352" from "352 Betten")
# We only take the FIRST sequence of digits as the target
hint_match = re.search(r'[\d\.,\']+', str(expected_value))
if hint_match:
target_str = hint_match.group(0)
target_digits = re.sub(r'[^0-9]', '', target_str)
if target_digits:
# Find all numbers in the text and check if they match our target
all_numbers_in_text = re.findall(r'[\d\.,\']+', text_processed)
for num_str in all_numbers_in_text:
if target_digits == re.sub(r'[^0-9]', '', num_str):
# Exact digit match!
val = MetricParser._parse_robust_number(num_str, is_revenue)
if val is not None:
logger.info(f"[MetricParser] Found targeted value via hint: '{num_str}' -> {val}")
return val
except Exception as e:
logger.error(f"Error while parsing with hint: {e}")
# Fallback: Classic robust parsing
# Normalize quotes
text_processed = text_processed.replace("’", "'").replace("‘", "'")
# 2. Remove noise: Citations [1] and Year/Date in parentheses (2020)
# We remove everything in parentheses/brackets as it's almost always noise for the metric itself.
text_processed = re.sub(r'\(.*?\)|\[.*?\]', ' ', text_processed).strip()
# 3. Remove common prefixes and currency symbols
prefixes = [
r'ca\.?\s*', r'circa\s*', r'rund\s*', r'etwa\s*', r'über\s*', r'unter\s*',
r'ca\.?:?\s*', r'circa\s*', r'rund\s*', r'etwa\s*', r'über\s*', r'unter\s*',
r'mehr als\s*', r'weniger als\s*', r'bis zu\s*', r'about\s*', r'over\s*',
r'approx\.?\s*', r'around\s*', r'up to\s*', r'~\s*', r'rd\.?\s*'
r'approx\.?:?\s*', r'around\s*', r'up to\s*', r'~\s*', r'rd\.?:?\s*'
]
currencies = [
r'€', r'EUR', r'US\$', r'USD', r'CHF', r'GBP', r'£', r'¥', r'JPY'
@@ -79,23 +70,16 @@ class MetricParser:
for c in currencies:
text_processed = re.sub(f'(?i){c}', '', text_processed).strip()
# 4. Remove Range Splitting (was too aggressive, cutting off text after dashes)
# Old: text_processed = re.split(r'\s*(-||bis|to)\s*', text_processed, 1)[0].strip()
# 5. Extract Multipliers (Mio, Mrd)
# 4. Extract Multipliers (Mio, Mrd)
multiplier = 1.0
lower_text = text_processed.lower()
def has_unit(text, units):
for u in units:
# Escape special chars if any, though mostly alphanumeric here
# Use word boundaries \b for safe matching
if re.search(r'\b' + re.escape(u) + r'\b', text):
return True
return False
# For Revenue, we normalize to Millions (User Rule)
# For others (Employees), we scale to absolute numbers
if is_revenue:
if has_unit(lower_text, ['mrd', 'milliarden', 'billion', 'bn']):
multiplier = 1000.0
@@ -111,214 +95,92 @@ class MetricParser:
elif has_unit(lower_text, ['tsd', 'tausend', 'k']):
multiplier = 1000.0
# 6. Extract the number candidate
# Loop through matches to find the best candidate (skipping years if possible)
# 5. Extract the first valid number candidate
candidates = re.finditer(r'([\d\.,\'\s]+)', text_processed)
selected_candidate = None
best_candidate_val = None
matches = [m for m in candidates]
# logger.info(f"DEBUG matches: {[m.group(1) for m in matches]}")
# logger.info(f"DEBUG: Found {len(matches)} matches: {[m.group(1) for m in matches]}")
# Helper to parse a candidate string
def parse_cand(c):
# Extract temporary multiplier for this specific candidate context?
# Complex. For now, we assume the global multiplier applies or we rely on the candidates raw numeric value.
# Actually, simpler: We parse the candidate as is (treating as raw number)
try:
# Remove thousands separators for comparison
c_clean = c.replace("'", "").replace(".", "").replace(" ", "").replace(",", ".") # Rough EN/DE mix
return float(c_clean)
except:
return None
# Parse expected value for comparison
target_val = None
if expected_value:
try:
# Re-apply aggressive cleaning to ensure we have a valid float for comparison
clean_expected = str(expected_value).lower()
for unit in ['m²', 'qm', 'sqm', 'mitarbeiter', 'employees', 'eur', 'usd', 'chf', '€', '$', '£', '¥']:
clean_expected = clean_expected.replace(unit, "")
clean_expected = clean_expected.replace("mio", "").replace("millionen", "").replace("mrd", "").replace("milliarden", "")
clean_expected = clean_expected.replace("tsd", "").replace("tausend", "")
clean_expected = clean_expected.replace(" ", "").replace("'", "")
target_val = MetricParser._parse_robust_number(clean_expected, is_revenue)
except:
pass
for i, match in enumerate(matches):
for match in candidates:
cand = match.group(1).strip()
if not cand: continue
if not cand or not re.search(r'\d', cand):
continue
# Clean candidate for analysis (remove separators)
# Clean candidate
clean_cand = cand.replace("'", "").replace(".", "").replace(",", "").replace(" ", "")
# Check if it looks like a year (4 digits, 1900-2100)
is_year_like = False
# Year detection
if clean_cand.isdigit() and len(clean_cand) == 4:
val = int(clean_cand)
if 1900 <= val <= 2100:
is_year_like = True
continue # Skip years
# Smart Year Skip (Legacy Logic)
if is_year_like and not target_val: # Only skip if we don't have a specific target
if i < len(matches) - 1:
logger.info(f"[MetricParser] Skipping year-like candidate '{cand}' because another number follows.")
continue
# Clean candidate for checking (remove internal spaces if they look like thousands separators)
# Simple approach: Remove all spaces for parsing check
cand_clean_for_parse = cand.replace(" ", "")
# If we have a target value from LLM, check if this candidate matches it
if target_val is not None:
try:
curr_val = MetricParser._parse_robust_number(cand_clean_for_parse, is_revenue)
if abs(curr_val - target_val) < 0.1 or abs(curr_val - target_val/1000) < 0.1 or abs(curr_val - target_val*1000) < 0.1:
selected_candidate = cand # Keep original with spaces for final processing
logger.info(f"[MetricParser] Found candidate '{cand}' matching expected '{expected_value}'")
break
except:
pass
# Fallback logic:
# If we have NO target value, we take the first valid one we find.
# If we DO have a target value, we only take a fallback if we reach the end and haven't found the target?
# Better: We keep the FIRST valid candidate as a fallback in a separate variable.
if selected_candidate is None:
# Check if it's a valid number at all before storing as fallback
try:
MetricParser._parse_robust_number(cand_clean_for_parse, is_revenue)
if not is_year_like:
if best_candidate_val is None: # Store first valid non-year
best_candidate_val = cand
except:
pass
# Smart separator handling for spaces
if " " in cand:
parts = cand.split()
if len(parts) > 1:
if not (len(parts[1]) == 3 and parts[1].isdigit()):
cand = parts[0]
else:
merged = parts[0]
for p in parts[1:]:
if len(p) == 3 and p.isdigit():
merged += p
else:
break
cand = merged
# If we found a specific match, use it. Otherwise use the fallback.
if selected_candidate:
candidate = selected_candidate
elif best_candidate_val:
candidate = best_candidate_val
else:
return None
# logger.info(f"DEBUG: Selected candidate: '{candidate}'")
# Smart separator handling (on the chosen candidate):
# Smart separator handling:
# A space is only a thousands-separator if it's followed by 3 digits.
# Otherwise it's likely a separator between unrelated numbers (e.g. "80 2020")
if " " in candidate:
parts = candidate.split()
if len(parts) > 1:
# Basic check: if second part is not 3 digits, we take only the first part
if not (len(parts[1]) == 3 and parts[1].isdigit()):
candidate = parts[0]
else:
# It might be 1 000. Keep merging if subsequent parts are also 3 digits.
merged = parts[0]
for p in parts[1:]:
if len(p) == 3 and p.isdigit():
merged += p
else:
break
candidate = merged
# Remove thousands separators (Quote)
candidate = candidate.replace("'", "")
if not candidate or not re.search(r'\d', candidate):
return None
try:
val = MetricParser._parse_robust_number(cand, is_revenue)
if val is not None:
final = val * multiplier
logger.info(f"[MetricParser] Found value: '{cand}' -> {final}")
return final
except:
continue
# Count separators for rule checks
dots = candidate.count('.')
commas = candidate.count(',')
# 7. Concatenated Year Detection (Bug Fix for 802020)
# If the number is long (5-7 digits) and ends with a recent year (2018-2026),
# and has no separators, it's likely a concatenation like "802020".
if dots == 0 and commas == 0 and " " not in candidate:
if len(candidate) >= 5 and len(candidate) <= 7:
for year in range(2018, 2027):
y_str = str(year)
if candidate.endswith(y_str):
val_str = candidate[:-4]
if val_str.isdigit():
logger.warning(f"[MetricParser] Caught concatenated year BUG: '{candidate}' -> '{val_str}' (Year {year})")
candidate = val_str
break
try:
val = MetricParser._parse_robust_number(candidate, is_revenue)
final = val * multiplier
logger.info(f"[MetricParser] Candidate: '{candidate}' -> Multiplier: {multiplier} -> Value: {final}")
return final
except Exception as e:
logger.debug(f"Failed to parse number string '{candidate}': {e}")
return None
return None
@staticmethod
def _parse_robust_number(s: str, is_revenue: bool) -> float:
def _parse_robust_number(s: str, is_revenue: bool) -> Optional[float]:
"""
Parses a number string dealing with ambiguous separators.
Standardizes to Python float.
"""
# Count separators
s = s.strip().replace("'", "")
if not s:
return None
dots = s.count('.')
commas = s.count(',')
# Case 1: Both present (e.g. 1.234,56 or 1,234.56)
if dots > 0 and commas > 0:
# Check which comes last
if s.rfind('.') > s.rfind(','): # US Style: 1,234.56
try:
# Case 1: Both present
if dots > 0 and commas > 0:
if s.rfind('.') > s.rfind(','): # US Style
return float(s.replace(',', ''))
else: # German Style
return float(s.replace('.', '').replace(',', '.'))
# Case 2: Multiple dots
if dots > 1:
return float(s.replace('.', ''))
# Case 3: Multiple commas
if commas > 1:
return float(s.replace(',', ''))
else: # German Style: 1.234,56
return float(s.replace('.', '').replace(',', '.'))
# Case 2: Multiple dots (Thousands: 1.000.000)
if dots > 1:
return float(s.replace('.', ''))
# Case 3: Multiple commas (Unusual, but treat as thousands)
if commas > 1:
return float(s.replace(',', ''))
# Case 4: Only Comma
if commas == 1:
# In German context "1,5" is 1.5. "1.000" is usually 1000.
# If it looks like decimal (1-2 digits after comma), treat as decimal.
# Except if it's exactly 3 digits and not is_revenue? No, comma is almost always decimal in DE.
return float(s.replace(',', '.'))
# Case 5: Only Dot
if dots == 1:
# Ambiguity: "1.005" (1005) vs "1.5" (1.5)
# Rule from Lesson 1: "1.005 Mitarbeiter" extracted as "1" (wrong).
# If dot followed by exactly 3 digits (and no comma), it's a thousands separator.
# FOR REVENUE: dots are generally decimals (375.6 Mio) unless unambiguous.
# Case 4: Only Comma
if commas == 1:
return float(s.replace(',', '.'))
parts = s.split('.')
if len(parts[1]) == 3:
if is_revenue:
# Revenue: 375.600 Mio? Unlikely compared to 375.6 Mio.
# But 1.000 Mio is 1 Billion? No, 1.000 (thousand) millions.
# User Rule: "Revenue: dots are generally treated as decimals"
# "1.005" as revenue -> 1.005 (Millions)
# "1.005" as employees -> 1005
return float(s)
else:
return float(s.replace('.', ''))
# Case 5: Only Dot
if dots == 1:
parts = s.split('.')
if len(parts[1]) == 3:
if is_revenue:
return float(s)
else:
return float(s.replace('.', ''))
return float(s)
return float(s)
return float(s)
except:
return None
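The separator rules in `_parse_robust_number` can be exercised with a compact standalone sketch (a simplified reimplementation for illustration, not the module itself):

```python
from typing import Optional

def parse_number(s: str, is_revenue: bool = False) -> Optional[float]:
    """Disambiguate German vs. US thousands/decimal separators."""
    s = s.strip().replace("'", "")
    if not s:
        return None
    dots, commas = s.count("."), s.count(",")
    try:
        if dots and commas:
            if s.rfind(".") > s.rfind(","):            # US style: 1,234.56
                return float(s.replace(",", ""))
            return float(s.replace(".", "").replace(",", "."))  # DE: 1.234,56
        if dots > 1:                                    # 1.000.000 -> thousands
            return float(s.replace(".", ""))
        if commas > 1:                                  # 1,000,000 -> thousands
            return float(s.replace(",", ""))
        if commas == 1:                                 # DE decimal: 1,5 -> 1.5
            return float(s.replace(",", "."))
        if dots == 1:
            tail = s.split(".")[1]
            if len(tail) == 3 and not is_revenue:       # 1.005 employees -> 1005
                return float(s.replace(".", ""))
            return float(s)                             # revenue keeps the decimal
        return float(s)
    except ValueError:
        return None
```

This mirrors the rule from the commit: a lone dot before exactly three digits is a thousands separator for head counts but a decimal for revenue figures.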

View File

@@ -0,0 +1,72 @@
import os
import sys
import argparse
import logging
# Add the backend directory to the Python path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from backend.database import get_db, Company
from backend.services.classification import ClassificationService
from backend.lib.logging_setup import setup_logging
# --- CONFIGURATION ---
# Setup logging to be very verbose for this script
setup_logging()
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
def run_debug_analysis(company_id: int):
"""
Runs the full classification and enrichment process for a single company
in the foreground and prints detailed results.
"""
logger.info(f"--- Starting Interactive Debug for Company ID: {company_id} ---")
db_session = next(get_db())
try:
# 1. Fetch the company
company = db_session.query(Company).filter(Company.id == company_id).first()
if not company:
logger.error(f"Company with ID {company_id} not found.")
return
logger.info(f"Found Company: {company.name}")
# --- PRE-ANALYSIS STATE ---
print("\n--- METRICS BEFORE ---")
print(f"Calculated: {company.calculated_metric_value} {company.calculated_metric_unit}")
print(f"Standardized: {company.standardized_metric_value} {company.standardized_metric_unit}")
print("----------------------\n")
# 2. Instantiate the service
classifier = ClassificationService()
# 3. RUN THE CORE LOGIC
# This will now print all the detailed logs we added
updated_company = classifier.classify_company_potential(company, db_session)
# --- POST-ANALYSIS STATE ---
print("\n--- METRICS AFTER ---")
print(f"Industry (AI): {updated_company.industry_ai}")
print(f"Metric Source: {updated_company.metric_source}")
print(f"Proof Text: {updated_company.metric_proof_text}")
print(f"Calculated: {updated_company.calculated_metric_value} {updated_company.calculated_metric_unit}")
print(f"Standardized: {updated_company.standardized_metric_value} {updated_company.standardized_metric_unit}")
print(f"\nOpener 1 (Infra): {updated_company.ai_opener}")
print(f"Opener 2 (Ops): {updated_company.ai_opener_secondary}")
print("---------------------")
logger.info(f"--- Interactive Debug Finished for Company ID: {company_id} ---")
finally:
db_session.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run a single company analysis for debugging.")
parser.add_argument("--id", type=int, default=1, help="The ID of the company to analyze.")
args = parser.parse_args()
run_debug_analysis(args.id)


@@ -0,0 +1,67 @@
import requests
import os
import time
import argparse
import sys
import logging
# Add the backend directory to the Python path for relative imports to work
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
# --- Configuration ---
def load_env_manual(path):
if not os.path.exists(path):
# print(f"⚠️ Warning: .env file not found at {path}") # Suppress for cleaner output in container
return
with open(path) as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, val = line.split('=', 1)
os.environ.setdefault(key.strip(), val.strip())
# Load .env (assuming it's in /app) - this needs to be run from /app or adjusted
# For docker-compose exec from project root, /app is the container's WORKDIR
load_env_manual('/app/.env')
API_USER = os.getenv("API_USER")
API_PASS = os.getenv("API_PASSWORD")
# When run INSIDE the container, the service is reachable via localhost
CE_URL = "http://localhost:8000"
ANALYZE_ENDPOINT = f"{CE_URL}/api/enrich/analyze"
def trigger_analysis(company_id: int):
print("="*60)
print(f"🚀 Triggering REAL analysis for Company ID: {company_id}")
print("="*60)
payload = {"company_id": company_id}
try:
# Debug-only trace of the request target (avoid logging credentials in production)
logging.debug(f"API Call to {ANALYZE_ENDPOINT} with user {API_USER}")
response = requests.post(ANALYZE_ENDPOINT, json=payload, auth=(API_USER, API_PASS), timeout=30) # Increased timeout
if response.status_code == 200 and response.json().get("status") == "queued":
print(" ✅ SUCCESS: Analysis task has been queued on the server.")
print(" The result will be available in the database and UI shortly.")
return True
else:
print(f" ❌ FAILURE: Server responded with status {response.status_code}")
print(f" Response: {response.text}")
return False
except requests.exceptions.RequestException as e:
print(f" ❌ FATAL: Could not connect to the server: {e}")
return False
if __name__ == "__main__":
# Add a basic logger to the script itself for clearer output
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
parser = argparse.ArgumentParser(description="Trigger Company Explorer Analysis Task")
parser.add_argument("--company-id", type=int, required=True, help="ID of the company to analyze")
args = parser.parse_args()
trigger_analysis(args.company_id)


@@ -5,7 +5,7 @@ import re
from datetime import datetime
from typing import Optional, Dict, Any, List, Tuple
from sqlalchemy.orm import Session
from sqlalchemy.orm import Session, joinedload
from backend.database import Company, Industry, RoboticsCategory, EnrichmentData
from backend.lib.core_utils import call_gemini_flash, safe_eval_math, run_serp_search
@@ -19,9 +19,12 @@ class ClassificationService:
pass
def _load_industry_definitions(self, db: Session) -> List[Industry]:
industries = db.query(Industry).all()
industries = db.query(Industry).options(
joinedload(Industry.primary_category),
joinedload(Industry.secondary_category)
).all()
if not industries:
logger.warning("No industry definitions found in DB. Classification might be limited.")
logger.warning("No industry definitions found in DB.")
return industries
def _get_wikipedia_content(self, db: Session, company_id: int) -> Optional[Dict[str, Any]]:
@@ -49,18 +52,11 @@ Return ONLY the exact name of the industry.
try:
response = call_gemini_flash(prompt)
if not response: return "Others"
cleaned = response.strip().replace('"', '').replace("'", "")
# Simple fuzzy match check
valid_names = [i['name'] for i in industry_definitions] + ["Others"]
if cleaned in valid_names:
return cleaned
# Fallback: Try to find name in response
if cleaned in valid_names: return cleaned
for name in valid_names:
if name in cleaned:
return name
if name in cleaned: return name
return "Others"
except Exception as e:
logger.error(f"Classification Prompt Error: {e}")
@@ -79,23 +75,20 @@ Return a JSON object with:
- "raw_unit": The unit found (e.g. "Betten", "m²").
- "proof_text": A short quote from the text proving this value.
**IMPORTANT:** Ignore obvious year numbers (like 1900-2026) if other, more plausible metric values are present in the text. Focus on the target metric.
JSON ONLY.
"""
try:
response = call_gemini_flash(prompt, json_mode=True)
if not response: return None
if isinstance(response, str):
response = response.replace("```json", "").replace("```", "").strip()
data = json.loads(response)
try:
data = json.loads(response.replace("```json", "").replace("```", "").strip())
except: return None
else:
data = response
# Basic cleanup
if isinstance(data, list) and data: data = data[0]
if not isinstance(data, dict): return None
if data.get("raw_value") == "null": data["raw_value"] = None
return data
except Exception as e:
logger.error(f"LLM Extraction Parse Error: {e}")
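The cleanup steps in the extraction handler above (strip markdown fences, unwrap single-element lists, normalize a literal `"null"` string) can be isolated into a small sketch. The helper name `parse_llm_json` is illustrative:

```python
import json

FENCE = "`" * 3  # literal markdown code fence


def parse_llm_json(response):
    # Models often wrap JSON in markdown fences; strip them before parsing.
    if isinstance(response, str):
        cleaned = response.replace(FENCE + "json", "").replace(FENCE, "").strip()
        try:
            data = json.loads(cleaned)
        except json.JSONDecodeError:
            return None
    else:
        data = response
    if isinstance(data, list) and data:
        data = data[0]            # some models return a one-element list
    if not isinstance(data, dict):
        return None
    if data.get("raw_value") == "null":
        data["raw_value"] = None  # literal "null" string -> real None
    return data


raw = FENCE + 'json\n{"raw_value": "352", "raw_unit": "Betten"}\n' + FENCE
print(parse_llm_json(raw))  # {'raw_value': '352', 'raw_unit': 'Betten'}
```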
@@ -103,38 +96,37 @@ JSON ONLY.
def _is_metric_plausible(self, metric_name: str, value: Optional[float]) -> bool:
if value is None: return False
try:
val_float = float(value)
return val_float > 0
except:
return False
try: return float(value) > 0
except: return False
def _parse_standardization_logic(self, formula: str, raw_value: float) -> Optional[float]:
if not formula or raw_value is None:
return None
formula_cleaned = formula.replace("wert", str(raw_value)).replace("Value", str(raw_value)).replace("Wert", str(raw_value))
formula_cleaned = re.sub(r'(?i)m[²2]', '', formula_cleaned)
formula_cleaned = re.sub(r'(?i)qm', '', formula_cleaned)
formula_cleaned = re.sub(r'\s*\(.*\)\s*$', '', formula_cleaned).strip()
if not formula or raw_value is None: return None
# Clean formula: remove anything in parentheses first (often units or comments)
clean_formula = re.sub(r'\(.*?\)', '', formula.lower())
# Replace 'wert' with the actual value
expression = clean_formula.replace("wert", str(raw_value))
# Remove any non-math characters
expression = re.sub(r'[^0-9\.\+\-\*\/]', '', expression)
try:
return safe_eval_math(formula_cleaned)
return safe_eval_math(expression)
except Exception as e:
logger.error(f"Failed to parse standardization logic '{formula}' with value {raw_value}: {e}")
logger.error(f"Failed to parse logic '{formula}' with value {raw_value}: {e}")
return None
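The rewritten `_parse_standardization_logic` reduces a human-written formula such as `wert * 100 (m²)` to a plain arithmetic expression before evaluating it. A minimal self-contained sketch, with a simple two-operand evaluator standing in for the project's `safe_eval_math`:

```python
import re


def standardize(formula: str, raw_value: float):
    # Drop parenthesised comments/units, e.g. "(m²)".
    expr = re.sub(r"\(.*?\)", "", formula.lower())
    # Substitute the 'wert' placeholder with the actual value.
    expr = expr.replace("wert", str(raw_value))
    # Keep only digits, dots, and basic math operators.
    expr = re.sub(r"[^0-9\.\+\-\*\/]", "", expr)
    # Stand-in for safe_eval_math: handles a single binary operation.
    m = re.fullmatch(r"([\d.]+)([\+\-\*\/])([\d.]+)", expr)
    if not m:
        return None
    a, op, b = float(m.group(1)), m.group(2), float(m.group(3))
    return {"+": a + b, "-": a - b, "*": a * b, "/": a / b}[op]


print(standardize("wert * 100 (m²)", 352.0))  # 35200.0
```

With the hospital example from this commit, 352 beds standardize to 35200 m² of relevant area.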
def _get_best_metric_result(self, results_list: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
if not results_list:
return None
if not results_list: return None
source_priority = {"wikipedia": 0, "website": 1, "serpapi": 2}
valid_results = [r for r in results_list if r.get("calculated_metric_value") is not None]
if not valid_results:
return None
valid_results.sort(key=lambda r: (source_priority.get(r.get("metric_source"), 99), -r.get("metric_confidence", 0.0)))
logger.info(f"Best result chosen: {valid_results[0]}")
if not valid_results: return None
valid_results.sort(key=lambda r: source_priority.get(r.get("metric_source"), 99))
return valid_results[0]
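The selection logic above prefers curated sources over noisy ones (wikipedia < website < serpapi, lower rank wins) and discards results without a value. A self-contained sketch of that ranking:

```python
def pick_best(results):
    """Return the result from the most trusted source that carries a value."""
    priority = {"wikipedia": 0, "website": 1, "serpapi": 2}
    valid = [r for r in results if r.get("calculated_metric_value") is not None]
    if not valid:
        return None
    # Unknown sources sort last via the 99 default.
    return min(valid, key=lambda r: priority.get(r.get("metric_source"), 99))


candidates = [
    {"metric_source": "serpapi", "calculated_metric_value": 300.0},
    {"metric_source": "wikipedia", "calculated_metric_value": 352.0},
    {"metric_source": "website", "calculated_metric_value": None},
]
print(pick_best(candidates)["metric_source"])  # wikipedia
```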
def _get_website_content_and_url(self, company: Company) -> Tuple[Optional[str], Optional[str]]:
return scrape_website_content(company.website), company.website
def _get_website_content_and_url(self, db: Session, company: Company) -> Tuple[Optional[str], Optional[str]]:
enrichment = db.query(EnrichmentData).filter_by(company_id=company.id, source_type="website_scrape").order_by(EnrichmentData.created_at.desc()).first()
if enrichment and enrichment.content and "raw_text" in enrichment.content:
return enrichment.content["raw_text"], company.website
content = scrape_website_content(company.website)
return content, company.website
def _get_wikipedia_content_and_url(self, db: Session, company_id: int) -> Tuple[Optional[str], Optional[str]]:
wiki_data = self._get_wikipedia_content(db, company_id)
@@ -142,219 +134,135 @@ JSON ONLY.
def _get_serpapi_content_and_url(self, company: Company, search_term: str) -> Tuple[Optional[str], Optional[str]]:
serp_results = run_serp_search(f"{company.name} {company.city or ''} {search_term}")
if not serp_results:
return None, None
if not serp_results: return None, None
content = " ".join([res.get("snippet", "") for res in serp_results.get("organic_results", [])])
url = serp_results.get("organic_results", [{}])[0].get("link") if serp_results.get("organic_results") else None
return content, url
def _extract_and_calculate_metric_cascade(self, db: Session, company: Company, industry_name: str, search_term: str, standardization_logic: Optional[str], standardized_unit: Optional[str]) -> Dict[str, Any]:
final_result = {"calculated_metric_name": search_term, "calculated_metric_value": None, "calculated_metric_unit": None, "standardized_metric_value": None, "standardized_metric_unit": standardized_unit, "metric_source": None, "metric_proof_text": None, "metric_source_url": None, "metric_confidence": 0.0, "metric_confidence_reason": "No value found in any source."}
final_result = {"calculated_metric_name": search_term, "calculated_metric_value": None, "calculated_metric_unit": None, "standardized_metric_value": None, "standardized_metric_unit": standardized_unit, "metric_source": None, "proof_text": None, "metric_source_url": None}  # NOTE: key is 'proof_text', matched by the caller's metrics.get("proof_text")
sources = [
("website", self._get_website_content_and_url),
("wikipedia", self._get_wikipedia_content_and_url),
("serpapi", self._get_serpapi_content_and_url)
("website", lambda: self._get_website_content_and_url(db, company)),
("wikipedia", lambda: self._get_wikipedia_content_and_url(db, company.id)),
("serpapi", lambda: self._get_serpapi_content_and_url(company, search_term))
]
all_source_results = []
parser = MetricParser()
for source_name, content_loader in sources:
logger.info(f"Checking {source_name} for '{search_term}' for {company.name}")
logger.info(f" -> Checking source: [{source_name.upper()}] for '{search_term}'")
try:
args = (company,) if source_name == 'website' else (db, company.id) if source_name == 'wikipedia' else (company, search_term)
content_text, current_source_url = content_loader(*args)
if not content_text or len(content_text) < 100:
logger.info(f"No or insufficient content for {source_name} (Length: {len(content_text) if content_text else 0}).")
continue
content_text, current_source_url = content_loader()
if not content_text or len(content_text) < 100: continue
llm_result = self._run_llm_metric_extraction_prompt(content_text, search_term, industry_name)
if llm_result:
llm_result['source_url'] = current_source_url
all_source_results.append((source_name, llm_result))
except Exception as e:
logger.error(f"Error in {source_name} stage: {e}")
if llm_result and llm_result.get("proof_text"):
# Use the robust parser on the LLM's proof text or raw_value
hint = llm_result.get("raw_value") or llm_result.get("proof_text")
parsed_value = parser.extract_numeric_value(text=content_text, expected_value=str(hint))
if parsed_value is not None:
llm_result.update({"calculated_metric_value": parsed_value, "calculated_metric_unit": llm_result.get('raw_unit'), "metric_source": source_name, "metric_source_url": current_source_url})
all_source_results.append(llm_result)
except Exception as e: logger.error(f" -> Error in {source_name} stage: {e}")
processed_results = []
for source_name, llm_result in all_source_results:
metric_value = llm_result.get("raw_value")
metric_unit = llm_result.get("raw_unit")
if metric_value is not None and self._is_metric_plausible(search_term, metric_value):
standardized_value = None
if standardization_logic and metric_value is not None:
standardized_value = self._parse_standardization_logic(standardization_logic, metric_value)
processed_results.append({
"calculated_metric_name": search_term,
"calculated_metric_value": metric_value,
"calculated_metric_unit": metric_unit,
"standardized_metric_value": standardized_value,
"standardized_metric_unit": standardized_unit,
"metric_source": source_name,
"metric_proof_text": llm_result.get("proof_text"),
"metric_source_url": llm_result.get("source_url"),
"metric_confidence": 0.95,
"metric_confidence_reason": "Value found and extracted by LLM."
})
else:
logger.info(f"LLM found no plausible metric for {search_term} in {source_name}.")
best_result = self._get_best_metric_result(processed_results)
return best_result if best_result else final_result
best_result = self._get_best_metric_result(all_source_results)
if not best_result: return final_result
final_result.update(best_result)
if self._is_metric_plausible(search_term, final_result['calculated_metric_value']):
final_result['standardized_metric_value'] = self._parse_standardization_logic(standardization_logic, final_result['calculated_metric_value'])
return final_result
def extract_metrics_for_industry(self, company: Company, db: Session, industry: Industry) -> Company:
if not industry or not industry.scraper_search_term:
logger.warning(f"No metric configuration for industry '{industry.name if industry else 'None'}'")
return company
# Improved unit derivation
if "m²" in (industry.standardization_logic or "") or "m²" in (industry.scraper_search_term or ""):
std_unit = "m²"
else:
std_unit = "Einheiten"
metrics = self._extract_and_calculate_metric_cascade(
db, company, industry.name, industry.scraper_search_term, industry.standardization_logic, std_unit
)
company.calculated_metric_name = metrics["calculated_metric_name"]
company.calculated_metric_value = metrics["calculated_metric_value"]
company.calculated_metric_unit = metrics["calculated_metric_unit"]
company.standardized_metric_value = metrics["standardized_metric_value"]
company.standardized_metric_unit = metrics["standardized_metric_unit"]
company.metric_source = metrics["metric_source"]
company.metric_proof_text = metrics["metric_proof_text"]
company.metric_source_url = metrics.get("metric_source_url")
company.metric_confidence = metrics["metric_confidence"]
company.metric_confidence_reason = metrics["metric_confidence_reason"]
company.last_classification_at = datetime.utcnow()
# REMOVED: db.commit() - This should be handled by the calling function.
return company
def _find_direct_area(self, db: Session, company: Company, industry_name: str) -> Optional[Dict[str, Any]]:
logger.info(" -> (Helper) Running specific search for 'Fläche'...")
area_metrics = self._extract_and_calculate_metric_cascade(db, company, industry_name, search_term="Fläche", standardization_logic=None, standardized_unit="m²")
if area_metrics and area_metrics.get("calculated_metric_value") is not None:
unit = area_metrics.get("calculated_metric_unit", "").lower()
if any(u in unit for u in ["m²", "qm", "quadratmeter"]):
logger.info(" ✅ SUCCESS: Found direct area value.")
area_metrics['standardized_metric_value'] = area_metrics['calculated_metric_value']
return area_metrics
return None
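The unit check in `_find_direct_area` accepts a value only when its unit looks like an area. A tiny standalone version of that guard (assuming the `m²` variants used elsewhere in this service):

```python
def looks_like_area_unit(unit):
    """True when the extracted unit denotes square metres."""
    unit = (unit or "").lower()
    return any(u in unit for u in ("m²", "qm", "quadratmeter"))


print(looks_like_area_unit("Quadratmeter (ca.)"))  # True
print(looks_like_area_unit("Betten"))              # False
```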
def reevaluate_wikipedia_metric(self, company: Company, db: Session, industry: Industry) -> Company:
logger.info(f"Re-evaluating metric for {company.name}...")
return self.extract_metrics_for_industry(company, db, industry)
def _generate_marketing_opener(self, company: Company, industry: Industry, website_text: str, focus_mode: str = "primary") -> Optional[str]:
if not industry: return None
# 1. Determine Context & Pains/Gains
product_context = industry.primary_category.name if industry.primary_category else "Robotik-Lösungen"
raw_pains = industry.pains or ""
# Split pains/gains based on markers
def extract_segment(text, marker):
if not text: return ""
segments = re.split(r'\[(.*?)\]', text)
for i in range(1, len(segments), 2):
if marker.lower() in segments[i].lower():
return segments[i+1].strip()
return text # Fallback to full text if no markers found
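The marker-based pain segmentation above can be demonstrated in isolation; the `[Primary Product]` / `[Secondary Product]` markers and the sample pains string are illustrative:

```python
import re


def extract_segment(text, marker):
    # Pains are stored as "[Primary Product] ... [Secondary Product] ...";
    # re.split with a capturing group keeps the marker names at odd indices.
    if not text:
        return ""
    segments = re.split(r"\[(.*?)\]", text)
    for i in range(1, len(segments), 2):
        if marker.lower() in segments[i].lower():
            return segments[i + 1].strip()
    return text  # fallback to the full text if no markers are present


pains = ("[Primary Product] Hygienedruck auf großen Flächen. "
         "[Secondary Product] Personalmangel in der Logistik.")
print(extract_segment(pains, "Secondary Product"))
# Personalmangel in der Logistik.
```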
def _generate_marketing_opener(self, company_name: str, website_text: str, industry_name: str, industry_pains: str, focus_mode: str = "primary") -> Optional[str]:
"""
Generates the 'First Sentence' (Opener).
focus_mode: 'primary' (Standard/Cleaning) or 'secondary' (Service/Logistics).
"""
if not industry_pains:
industry_pains = "Effizienz und Personalmangel" # Fallback
# Dynamic Focus Instruction
if focus_mode == "secondary":
focus_instruction = """
- **FOKUS: SEKUNDÄR-PROZESSE (Logistik/Service/Versorgung).**
- Ignoriere das Thema Reinigung. Konzentriere dich auf **Abläufe, Materialfluss, Entlastung von Fachkräften** oder **Gäste-Service**.
- Der Satz muss einen operativen Entscheider (z.B. Pflegedienstleitung, Produktionsleiter) abholen."""
else:
focus_instruction = """
- **FOKUS: PRIMÄR-PROZESSE (Infrastruktur/Sauberkeit/Sicherheit).**
- Konzentriere dich auf Anforderungen an das Facility Management, Hygiene, Außenwirkung oder Arbeitssicherheit.
- Der Satz muss einen Infrastruktur-Entscheider (z.B. FM-Leiter, Geschäftsführer) abholen."""
relevant_pains = extract_segment(raw_pains, "Primary Product")
if focus_mode == "secondary" and industry.ops_focus_secondary and industry.secondary_category:
product_context = industry.secondary_category.name
relevant_pains = extract_segment(raw_pains, "Secondary Product")
prompt = f"""
Du bist ein exzellenter B2B-Stratege und Texter.
Deine Aufgabe ist es, einen hochpersonalisierten Einleitungssatz für eine E-Mail an ein potenzielles Kundenunternehmen zu formulieren.
Du bist ein exzellenter B2B-Stratege und Texter. Formuliere einen hochpersonalisierten Einleitungssatz (1-2 Sätze).
Unternehmen: {company.name}
Branche: {industry.name}
Fokus: {focus_mode.upper()}
Herausforderungen: {relevant_pains}
Kontext: {website_text[:2500]}
--- KONTEXT ---
Zielunternehmen: {company_name}
Branche: {industry_name}
Operative Herausforderung (Pain): "{industry_pains}"
Webseiten-Kontext:
{website_text[:2500]}
--- Denkprozess & Stilvorgaben ---
1. **Analysiere den Kontext:** Verstehe das Kerngeschäft.
2. **Identifiziere den Hebel:** Was ist der Erfolgsfaktor in Bezug auf den FOKUS?
3. **Formuliere den Satz (ca. 20-35 Wörter):**
- Wähle einen eleganten, aktiven Einstieg.
- Verbinde die **Tätigkeit** mit dem **Hebel** und den **Konsequenzen**.
- **WICHTIG:** Formuliere als positive Beobachtung über eine Kernkompetenz.
- **VERMEIDE:** Konkrete Zahlen.
- Verwende den Firmennamen: {company_name}.
{focus_instruction}
--- Deine Ausgabe ---
Gib NUR den finalen Satz aus. Keine Anführungszeichen.
REGEL: Nenne NICHT das Produkt "{product_context}". Fokussiere dich NUR auf die Herausforderung.
AUSGABE: NUR den fertigen Satz.
"""
try:
response = call_gemini_flash(prompt)
if response:
return response.strip().strip('"')
return None
return response.strip().strip('"') if response else None
except Exception as e:
logger.error(f"Opener Generation Error: {e}")
logger.error(f"Opener Error: {e}")
return None
def classify_company_potential(self, company: Company, db: Session) -> Company:
logger.info(f"Starting classification for {company.name}...")
# 1. Load Definitions
logger.info(f"--- Starting FULL Analysis v3.0 for {company.name} ---")
industries = self._load_industry_definitions(db)
industry_defs = [{"name": i.name, "description": i.description} for i in industries]
logger.debug(f"Loaded {len(industries)} industry definitions.")
# 2. Get Content (Website)
website_content, _ = self._get_website_content_and_url(company)
website_content, _ = self._get_website_content_and_url(db, company)
if not website_content or len(website_content) < 100:
logger.warning(f"No or insufficient website content for {company.name} (Length: {len(website_content) if website_content else 0}). Skipping classification.")
company.status = "ENRICH_FAILED"
db.commit()
return company
logger.debug(f"Website content length for classification: {len(website_content)}")
# 3. Classify Industry
logger.info(f"Running LLM classification prompt for {company.name}...")
industry_defs = [{"name": i.name, "description": i.description} for i in industries]
suggested_industry_name = self._run_llm_classification_prompt(website_content, company.name, industry_defs)
logger.info(f"AI suggests industry: {suggested_industry_name}")
# 4. Update Company & Generate Openers
matched_industry = next((i for i in industries if i.name == suggested_industry_name), None)
if not matched_industry:
company.industry_ai = "Others"
db.commit()
return company
if matched_industry:
company.industry_ai = matched_industry.name
logger.info(f"Matched company to industry: {matched_industry.name}")
# --- Generate PRIMARY Opener (Infrastructure/Cleaning) ---
logger.info(f"Generating PRIMARY opener for {company.name}...")
op_prim = self._generate_marketing_opener(
company.name, website_content, matched_industry.name, matched_industry.pains, "primary"
)
if op_prim:
company.ai_opener = op_prim
logger.info(f"Opener (Primary) generated and set.")
else:
logger.warning(f"Failed to generate PRIMARY opener for {company.name}.")
company.industry_ai = matched_industry.name
logger.info(f"✅ Industry: {matched_industry.name}")
# --- Generate SECONDARY Opener (Service/Logistics) ---
logger.info(f"Generating SECONDARY opener for {company.name}...")
op_sec = self._generate_marketing_opener(
company.name, website_content, matched_industry.name, matched_industry.pains, "secondary"
)
if op_sec:
company.ai_opener_secondary = op_sec
logger.info(f"Opener (Secondary) generated and set.")
else:
logger.warning(f"Failed to generate SECONDARY opener for {company.name}.")
else:
company.industry_ai = "Others"
logger.warning(f"No specific industry matched for {company.name}. Set to 'Others'.")
# 5. Extract Metrics (Cascade)
if matched_industry:
logger.info(f"Extracting metrics for {company.name} and industry {matched_industry.name}...")
try:
self.extract_metrics_for_industry(company, db, matched_industry)
logger.info(f"Metric extraction completed for {company.name}.")
except Exception as e:
logger.error(f"Error during metric extraction for {company.name}: {e}", exc_info=True)
else:
logger.warning(f"Skipping metric extraction for {company.name} as no specific industry was matched.")
metrics = self._find_direct_area(db, company, matched_industry.name)
if not metrics:
logger.info(" -> No direct area. Trying proxy...")
if matched_industry.scraper_search_term:
metrics = self._extract_and_calculate_metric_cascade(db, company, matched_industry.name, search_term=matched_industry.scraper_search_term, standardization_logic=matched_industry.standardization_logic, standardized_unit="m²")
if metrics and metrics.get("calculated_metric_value"):
logger.info(f" ✅ SUCCESS: {metrics.get('calculated_metric_value')} {metrics.get('calculated_metric_unit')}")
company.calculated_metric_name = metrics.get("calculated_metric_name", matched_industry.scraper_search_term or "Fläche")
company.calculated_metric_value = metrics.get("calculated_metric_value")
company.calculated_metric_unit = metrics.get("calculated_metric_unit")
company.standardized_metric_value = metrics.get("standardized_metric_value")
company.standardized_metric_unit = metrics.get("standardized_metric_unit")
company.metric_source = metrics.get("metric_source")
company.metric_proof_text = metrics.get("proof_text")
company.metric_source_url = metrics.get("metric_source_url")
company.metric_confidence = 0.8
company.metric_confidence_reason = "Metric processed."
company.ai_opener = self._generate_marketing_opener(company, matched_industry, website_content, "primary")
company.ai_opener_secondary = self._generate_marketing_opener(company, matched_industry, website_content, "secondary")
company.last_classification_at = datetime.utcnow()
company.status = "ENRICHED"
db.commit()
logger.info(f"Classification and enrichment for {company.name} completed and committed.")
logger.info(f"--- ✅ Analysis Finished for {company.name} ---")
return company
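The two-stage metric resolution in `classify_company_potential` (try a directly stated area first, fall back to an industry proxy converted via standardization logic) reduces to this schematic sketch; names and the `* 100` conversion are assumptions taken from the hospital example in this commit:

```python
def resolve_metric(direct_area, proxy, standardize):
    # Stage 1: a directly stated area ("Fläche") needs no conversion.
    if direct_area is not None:
        return {"value": direct_area, "unit": "m²", "stage": "direct"}
    # Stage 2: convert the industry proxy (e.g. hospital beds) into m²
    # via the industry's standardization logic.
    if proxy is not None:
        return {"value": standardize(proxy), "unit": "m²", "stage": "proxy"}
    return None


# 352 beds, no direct area found -> proxy path with "wert * 100".
print(resolve_metric(None, 352.0, lambda v: v * 100))
# {'value': 35200.0, 'unit': 'm²', 'stage': 'proxy'}
```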


@@ -0,0 +1,82 @@
import unittest
import os
import sys
from unittest.mock import MagicMock, patch
# Adjust path to allow importing from backend
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from backend.services.classification import ClassificationService
from backend.database import Company, Industry, RoboticsCategory, Session
class TestHospitalMetricFinal(unittest.TestCase):
def setUp(self):
self.service = ClassificationService()
self.mock_db = MagicMock(spec=Session)
self.mock_company = Company(id=8, name="Klinikum Landkreis Erding")
self.mock_industry_hospital = Industry(
id=1,
name="Healthcare - Hospital",
scraper_search_term="Anzahl Betten",
standardization_logic="wert * 100",
primary_category=RoboticsCategory(name="Reinigungsroboter"),
secondary_category=RoboticsCategory(name="Serviceroboter"),
)
self.mock_website_content = "Ein ausreichend langer Beispieltext über das Klinikum Landkreis Erding, der die 100-Zeichen-Mindestlänge der Website-Prüfung sicher übersteigt, damit die Analyse nicht vorzeitig abbricht."
@patch('backend.services.classification.ClassificationService._generate_marketing_opener')
@patch('backend.services.classification.ClassificationService._extract_and_calculate_metric_cascade')
@patch('backend.services.classification.ClassificationService._find_direct_area')
@patch('backend.services.classification.ClassificationService._run_llm_classification_prompt')
@patch('backend.services.classification.ClassificationService._get_website_content_and_url')
@patch('backend.services.classification.ClassificationService._load_industry_definitions')
def test_final_hospital_logic(
self,
mock_load_industries,
mock_get_website,
mock_classify,
mock_find_direct_area,
mock_extract_cascade,
mock_generate_opener
):
print("\n--- Running Final Hospital Logic Test ---")
# --- MOCK SETUP ---
mock_load_industries.return_value = [self.mock_industry_hospital]
mock_get_website.return_value = (self.mock_website_content, "http://mock.com")
mock_classify.return_value = "Healthcare - Hospital"
mock_find_direct_area.return_value = None # STAGE 1 MUST FAIL
proxy_metric_result = {
"calculated_metric_name": "Anzahl Betten",
"calculated_metric_value": 352.0,
"calculated_metric_unit": "Betten",
"standardized_metric_value": 35200.0,
"standardized_metric_unit": "m²",
"metric_source": "wikipedia",
}
mock_extract_cascade.return_value = proxy_metric_result
mock_generate_opener.side_effect = ["Primary Opener", "Secondary Opener"]
# --- EXECUTION ---
updated_company = self.service.classify_company_potential(self.mock_company, self.mock_db)
# --- ASSERTIONS ---
mock_find_direct_area.assert_called_once()
mock_extract_cascade.assert_called_once()
self.assertEqual(updated_company.calculated_metric_name, "Anzahl Betten")
self.assertEqual(updated_company.calculated_metric_value, 352.0)
self.assertEqual(updated_company.standardized_metric_value, 35200.0)
print(" ✅ Metrics from Stage 2 correctly applied.")
self.assertEqual(updated_company.ai_opener, "Primary Opener")
self.assertEqual(updated_company.ai_opener_secondary, "Secondary Opener")
print(" ✅ Openers correctly applied.")
print("\n--- ✅ PASSED: Final Hospital Logic Test. ---")
if __name__ == '__main__':
unittest.main()


@@ -1,49 +0,0 @@
import requests
import os
import time
# --- Configuration ---
def load_env_manual(path):
if not os.path.exists(path):
print(f"⚠️ Warning: .env file not found at {path}")
return
with open(path) as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, val = line.split('=', 1)
os.environ.setdefault(key.strip(), val.strip())
load_env_manual('/app/.env')
API_USER = os.getenv("API_USER")
API_PASS = os.getenv("API_PASSWORD")
CE_URL = "http://127.0.0.1:8000"
ANALYZE_ENDPOINT = f"{CE_URL}/api/enrich/analyze"
COMPANY_ID_TO_ANALYZE = 1 # Therme Erding
def trigger_analysis():
print("="*60)
print(f"🚀 Triggering REAL analysis for Company ID: {COMPANY_ID_TO_ANALYZE}")
print("="*60)
payload = {"company_id": COMPANY_ID_TO_ANALYZE}
try:
response = requests.post(ANALYZE_ENDPOINT, json=payload, auth=(API_USER, API_PASS), timeout=10)
if response.status_code == 200 and response.json().get("status") == "queued":
print(" ✅ SUCCESS: Analysis task has been queued on the server.")
print(" The result will be available in the database and UI shortly.")
return True
else:
print(f" ❌ FAILURE: Server responded with status {response.status_code}")
print(f" Response: {response.text}")
return False
except requests.exceptions.RequestException as e:
print(f" ❌ FATAL: Could not connect to the server: {e}")
return False
if __name__ == "__main__":
trigger_analysis()