[2f988f42] fix(company-explorer): Implement robust quantitative potential and atomic opener generation

- Refactored ClassificationService for two-stage metric extraction (direct area and proxy).
- Enhanced MetricParser for targeted value matching and robust number parsing.
- Implemented persona-specific "Atomic Opener" generation using segmented pains.
- Fixed logging configuration and Pydantic response models.
- Added dedicated debugging script and updated documentation (GEMINI.md, MIGRATION_PLAN.md).

This commit is contained in:
2026-02-21 08:01:07 +00:00
parent a283884afe
commit dce995b24f
13 changed files with 666 additions and 534 deletions

View File

@@ -1 +1 @@
-{"task_id": "2ff88f42-8544-8018-883f-e8837c0421af", "token": "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", "session_start_time": "2026-02-20T13:24:58.251700"}
+{"task_id": "2f988f42-8544-8100-9dba-e69ee2376730", "token": "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", "session_start_time": "2026-02-21T04:23:04.206814"}

View File

@@ -173,3 +173,44 @@ Since the "Golden Record" for Industry Verticals (Pains, Gains, Products) reside
**Troubleshooting:**
* **"BaseModel" Error:** Usually a mix-up between Pydantic and SQLAlchemy `Base`. Check imports in `database.py`.
* **Missing Dependencies:** The CLI agent runs in `/app` but not necessarily inside the container's venv. Use standard tools (`grep`, `sqlite3`) where possible.
---
## Critical Debugging Session (Feb 21, 2026) - Re-Stabilizing the Analysis Engine
A critical session was required to fix a series of cascading failures in the `ClassificationService`. The key takeaways are documented here to prevent future issues.
1. **The "Phantom" `NameError`:**
* **Symptom:** The application crashed with a `NameError: name 'joinedload' is not defined`, even though the import was correctly added to `classification.py`.
* **Root Cause:** The `uvicorn` server's hot-reload mechanism within the Docker container did not reliably pick up file changes made from outside the container. A simple `docker-compose restart` was insufficient to clear the process's cached state.
* **Solution:** After any significant code change, especially to imports or core logic, a forced-recreation of the container is **mandatory**.
```bash
# Correct Way to Apply Changes:
docker-compose up -d --build --force-recreate company-explorer
```
2. **The "Invisible" Logs:**
* **Symptom:** No debug logs were being written, making it impossible to trace the execution flow.
* **Root Cause:** The `LOG_DIR` path in `/company-explorer/backend/config.py` was misconfigured (`/app/logs_debug`) and did not point to the actual, historical log directory (`/app/Log_from_docker`).
* **Solution:** Configuration paths must be treated as absolute and verified. Correcting the `LOG_DIR` path immediately resolved the issue.
3. **Inefficient Debugging Loop:**
* **Symptom:** The cycle of triggering a background job via API, waiting, and then manually checking logs was slow and inefficient.
* **Root Cause:** Lack of a tool to test the core application logic in isolation.
    * **Solution:** A dedicated, interactive test script was created (`/company-explorer/backend/scripts/debug_single_company.py`). It runs the entire analysis for a single company in the foreground, providing immediate, detailed feedback. This pattern is invaluable for complex, multi-step processes and should become standard for future development.
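The foreground-debugging pattern can be sketched as follows. This is a minimal stand-in, not the actual script: the `run_in_foreground` helper and the stub steps are illustrative assumptions; the real script calls the project's scraping and classification services.

```python
import logging
import sys

logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logger = logging.getLogger("debug_single_company")

def run_in_foreground(company_name: str, steps) -> dict:
    """Run each analysis step synchronously and collect results, so failures
    surface immediately instead of hiding inside a background job."""
    results = {}
    for name, step in steps:
        logger.info("Step '%s' for %s ...", name, company_name)
        try:
            results[name] = ("ok", step(company_name))
        except Exception as exc:
            results[name] = ("error", repr(exc))
            break  # stop at the first failure for immediate feedback
    return results

# Stub steps standing in for scraping/classification:
steps = [
    ("scrape", lambda c: f"<html for {c}>"),
    ("classify", lambda c: "Healthcare - Hospital"),
]
print(run_in_foreground("Example Clinic", steps))
```

Because every step runs in the calling process, a stack trace points directly at the failing stage instead of a silent background task.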
--- End of Context from: GEMINI.md ---

Here are the available functions:
[
"list_directory",
"read_file",
"search_file_content",
"glob",
"activate_skill",
"replace",
"write_file",
"web_fetch",
"run_shell_command",
"save_memory",
"google_web_search",
"write_todos",
"delegate_to_agent"
]


View File

@@ -159,3 +159,98 @@ Instructions for the "Bautrupp" (Gemini CLI).
* **Path:** `/volume1/homes/Floke/python/brancheneinstufung/company-explorer`
* **DB:** `/app/companies_v3_fixed_2.db`
* **Sync:** `docker exec -it company-explorer python backend/scripts/sync_notion_to_ce_enhanced.py`
---
## 17. Analysis Logic v3.0 (Feb 2026): Quantitative Potential Analysis & "Atomic Opener"
After several unstable iterations, the core logic of the `ClassificationService` has been finalized. This section serves as the single source of truth to prevent future regressions.
### 17.1 The Big Picture: From Content to Finished Analysis
The process is strictly sequential, with each stage building on the previous one.
```
1. Industry Classification
   |
   -> Detected industry: "Healthcare - Hospital"
   |
2. Quantitative Potential Analysis (two-stage cascade)
   |
   --> 2a. Stage 1: Direct area search ("Fläche in m²")
   |       |
   |       --> Result: FAILURE
   |
   --> 2b. Stage 2: Industry-specific proxy search
           |
           --> Search term (from industry): "Anzahl Betten" (number of beds)
           --> Formula (from industry): "wert * 100"
           |
           -> Result: 250 beds -> 25000 m²
   |
3. "Atomic Opener" Generation (two separate personas)
   |
   --> 3a. Opener 1 (primary): focus on infrastructure decision-makers
   |       |
   |       --> Product context: wet-cleaning robot (primary product)
   |       --> Pain context: hygiene audits, germ load
   |
   --> 3b. Opener 2 (secondary): focus on operational decision-makers
           |
           --> Product context: service robot (secondary product, since "ops_focus_secondary" is active)
           |
           --> Pain context: staff shortage, relieving nursing staff
   |
4. FINAL COMMIT
```
### 17.2 Quantitative Potential Analysis in Detail
**Goal:** Determine a `standardized_metric_value` in `m²` for every company.
* **Stage 1: Direct area search (direct hit)**
    * The system **always** searches for direct area figures first (keywords: "Fläche", "m²", "Quadratmeter").
    * If the `MetricParser` finds a plausible value, it is written directly to `standardized_metric_value` and this stage is complete. In that case `calculated_metric_value` is identical.
* **Stage 2: Proxy metric search (fallback)**
    * **Only if Stage 1 fails** is the industry-specific logic from the `industries` settings applied.
    * **Search term:** `scraper_search_term` (e.g. "Anzahl Betten", "Anzahl Passagiere").
    * **Extraction:** The `MetricParser` extracts the raw value (e.g. `250`), which is stored in `calculated_metric_value`.
    * **Standardization:** The formula from `standardization_logic` (e.g. `wert * 100`) is applied to the raw value; the result is written to `standardized_metric_value`.
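The two-stage cascade can be sketched roughly as follows. This is an illustrative stand-in, not the real implementation: the regex patterns, function names, and the `factor` argument are assumptions replacing the actual `MetricParser` and the `industries` settings.

```python
import re
from typing import Optional

def extract_area_m2(text: str) -> Optional[float]:
    """Stage 1: look for a direct area figure, e.g. '12.500 m²' (German format)."""
    m = re.search(r'([\d.,]+)\s*(?:m²|m2|qm|Quadratmeter)', text, re.IGNORECASE)
    if not m:
        return None
    return float(m.group(1).replace('.', '').replace(',', '.'))

def standardized_metric(text: str, proxy_pattern: str, factor: float):
    """Two-stage cascade: direct hit first, proxy metric only on failure.
    Returns (calculated_value, standardized_value, source)."""
    direct = extract_area_m2(text)
    if direct is not None:
        return direct, direct, "direct_area"   # Stage 1: both values are identical
    m = re.search(proxy_pattern, text, re.IGNORECASE)
    if not m:
        return None, None, None
    raw = float(m.group(1))                    # e.g. 250 beds
    return raw, raw * factor, "proxy"          # e.g. wert * 100 -> 25000 m²

print(standardized_metric("Klinik mit 250 Betten", r'(\d+)\s*Betten', 100.0))
```

The key property is the strict ordering: the proxy branch is never evaluated when a direct area figure exists.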
### 17.3 "Atomic Opener" Generation in Detail
**Goal:** Generate two highly personalized, punchy opening lines (1-2 sentences) that imply an operational challenge without naming the solution.
* **Two separate contexts:** Two sentences are generated for two personas:
    1. **`ai_opener` (primary):** Targets the **infrastructure decision-maker** (e.g. facility manager, technical director).
    2. **`ai_opener_secondary` (secondary):** Targets the **operational decision-maker** (e.g. production manager, head of nursing).
* **Persona-specific product selection:**
    * The primary opener (infrastructure) **always** refers to the industry's `primary_category`.
    * The secondary opener (operations) refers to:
        * The `primary_category` by default.
        * **Exception:** If the industry has `ops_focus_secondary = True`, it refers to the `secondary_category`.
* **The "1komma5°" prompt:**
    * Generation uses a proven prompt that instructs the language model to analyze the company's business model and formulate an appreciative observation.
    * **"Ammunition":** The prompt is dynamically enriched with the highly specific, predefined `pains` and `gains` of the respective industry.
    * **Rule:** The product itself is **not** mentioned in the opener. The sentence focuses purely on articulating the challenge; the resolution follows in the subsequent persona-specific text modules.
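The persona-to-product selection rule above can be sketched as a small decision function. The `Industry` dataclass and `product_context` helper are hypothetical; only the field names (`primary_category`, `secondary_category`, `ops_focus_secondary`) come from the documented schema.

```python
from dataclasses import dataclass

@dataclass
class Industry:
    primary_category: str
    secondary_category: str
    ops_focus_secondary: bool = False

def product_context(industry: Industry, persona: str) -> str:
    """Select the product category used as context for each opener persona:
    'infrastructure' always gets the primary category; 'operations' only
    switches to the secondary category when ops_focus_secondary is set."""
    if persona == "operations" and industry.ops_focus_secondary:
        return industry.secondary_category
    return industry.primary_category

hospital = Industry("Nassreinigungsroboter", "Serviceroboter", ops_focus_secondary=True)
print(product_context(hospital, "infrastructure"))
print(product_context(hospital, "operations"))
```

Keeping this as one pure function makes the exception rule trivially testable, independent of the LLM prompt.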
### 17.4 Debugging & Lessons Learned (Feb 21, 2026)
Implementing the v3.0 logic was marked by several stubborn problems; resolving them yielded important lessons for future development.
1. **"Phantom" `NameError` for `joinedload`:**
    * **Problem:** A `NameError` was raised despite a correct `import` statement.
    * **Solution:** A forced container recreation (`--force-recreate`) is essential after critical code changes, especially to imports.
2. **The "hospital battle" (proxy metrics & parser interference):**
    * **Problem:** For hospitals, the value "100" was often extracted (from "100%ige Trägerschaft", i.e. "100% ownership") instead of the correct number of beds. Standardization also failed on leftover units in the formula (e.g. `wert * 100 (m²)`).
    * **Solution 1 (targeted matching):** The `MetricParser` was rebuilt to prioritize a "hint" (the value expected by the LLM). It now searches the full text for exactly the digit sequence the LLM identified and ignores all other plausible numbers.
    * **Solution 2 (aggressive formula cleaning):** `_parse_standardization_logic` now consistently removes everything in parentheses and all non-arithmetic characters before calling `safe_eval_math`. This prevents `SyntaxError`s caused by database leftovers.
3. **Persona-specific pains:**
    * **Lesson:** For the openers to truly distinguish between infrastructure and operations, the `pains` in the database must be tagged with markers such as `[Primary Product]` and `[Secondary Product]`. The logic was adapted to extract these segments specifically.
These points underline the need for robust deployment processes, aggressive data cleaning, and dedicated test tools that isolate complex application logic.
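Two of these lessons can be illustrated with minimal stand-ins. `clean_formula` and `apply_formula` mimic the aggressive formula cleaning (plain `eval` stands in for the project's `safe_eval_math`), and `pains_for` sketches marker-based pain segmentation. All helper names here are hypothetical, not the project's actual API.

```python
import re

def clean_formula(expr: str) -> str:
    """Aggressively clean a standardization formula from the DB:
    drop everything in parentheses, then keep only digits, the letters of
    'wert', arithmetic operators, dots and whitespace (crude whitelist)."""
    expr = re.sub(r'\(.*?\)', ' ', expr)
    expr = re.sub(r'[^0-9wert+\-*/. ]', ' ', expr)
    return ' '.join(expr.split())

def apply_formula(expr: str, wert: float) -> float:
    """Evaluate the cleaned formula with 'wert' bound.
    NOTE: eval() here is only a stand-in for a proper safe evaluator."""
    return eval(clean_formula(expr), {"__builtins__": {}}, {"wert": wert})

def pains_for(pains: str, marker: str) -> str:
    """Extract the pain segment tagged for one product,
    e.g. marker='[Primary Product]'. Falls back to the full text."""
    m = re.search(re.escape(marker) + r'(.*?)(?=\[|$)', pains, re.DOTALL)
    return m.group(1).strip() if m else pains

print(apply_formula("wert * 100 (m²)", 250))
print(pains_for("[Primary Product] Hygiene audits [Secondary Product] Staff shortage",
                "[Secondary Product]"))
```

The parenthesis strip runs first, so a unit leftover like `(m²)` can never reach the evaluator and trigger a `SyntaxError`.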

View File

@@ -32,7 +32,7 @@ setup_logging()
import logging
logger = logging.getLogger(__name__)
-from .database import init_db, get_db, Company, Signal, EnrichmentData, RoboticsCategory, Contact, Industry, JobRoleMapping, ReportedMistake, MarketingMatrix, Persona
+from .database import init_db, get_db, Company, Signal, EnrichmentData, RoboticsCategory, Contact, Industry, JobRoleMapping, ReportedMistake, MarketingMatrix, Persona, RawJobTitle
from .services.deduplication import Deduplicator
from .services.discovery import DiscoveryService
from .services.scraping import ScraperService
@@ -101,6 +101,71 @@ class ProvisioningResponse(BaseModel):
    opener_secondary: Optional[str] = None  # Secondary opener (Service/Logistics)
    texts: Dict[str, Optional[str]] = {}
class IndustryDetails(BaseModel):
pains: Optional[str] = None
gains: Optional[str] = None
priority: Optional[str] = None
notes: Optional[str] = None
ops_focus_secondary: bool = False
class Config:
from_attributes = True
class ContactResponse(BaseModel):
id: int
first_name: Optional[str] = None
last_name: Optional[str] = None
job_title: Optional[str] = None
role: Optional[str] = None
email: Optional[str] = None
is_primary: bool
class Config:
from_attributes = True
class EnrichmentDataResponse(BaseModel):
id: int
source_type: str
content: Dict[str, Any]
is_locked: bool
wiki_verified_empty: bool
updated_at: datetime
class Config:
from_attributes = True
class CompanyDetailsResponse(BaseModel):
id: int
name: str
website: Optional[str] = None
city: Optional[str] = None
country: Optional[str] = None
industry_ai: Optional[str] = None
status: str
# Metrics
calculated_metric_name: Optional[str] = None
calculated_metric_value: Optional[float] = None
calculated_metric_unit: Optional[str] = None
standardized_metric_value: Optional[float] = None
standardized_metric_unit: Optional[str] = None
metric_source: Optional[str] = None
metric_proof_text: Optional[str] = None
metric_source_url: Optional[str] = None
metric_confidence: Optional[float] = None
# Openers
ai_opener: Optional[str] = None
ai_opener_secondary: Optional[str] = None
# Relations
industry_details: Optional[IndustryDetails] = None
contacts: List[ContactResponse] = []
enrichment_data: List[EnrichmentDataResponse] = []
class Config:
from_attributes = True
# --- Events ---
@app.on_event("startup")
def on_startup():
@@ -336,7 +401,7 @@ def export_companies_csv(db: Session = Depends(get_db), username: str = Depends(
        headers={"Content-Disposition": f"attachment; filename=company_export_{datetime.utcnow().strftime('%Y-%m-%d')}.csv"}
    )
-@app.get("/api/companies/{company_id}")
+@app.get("/api/companies/{company_id}", response_model=CompanyDetailsResponse)
def get_company(company_id: int, db: Session = Depends(get_db), username: str = Depends(authenticate_user)):
    company = db.query(Company).options(
        joinedload(Company.enrichment_data),
@@ -350,28 +415,14 @@ def get_company(company_id: int, db: Session = Depends(get_db), username: str =
    if company.industry_ai:
        ind = db.query(Industry).filter(Industry.name == company.industry_ai).first()
        if ind:
-            industry_details = {
-                "pains": ind.pains,
-                "gains": ind.gains,
-                "priority": ind.priority,
-                "notes": ind.notes,
-                "ops_focus_secondary": ind.ops_focus_secondary
-            }
+            industry_details = IndustryDetails.model_validate(ind)
-    # HACK: Attach to response object (Pydantic would be cleaner, but this works for fast prototyping)
-    # We convert to dict and append
-    resp = company.__dict__.copy()
-    resp["industry_details"] = industry_details
-    # Handle SQLAlchemy internal state
-    if "_sa_instance_state" in resp: del resp["_sa_instance_state"]
-    # Handle relationships manually if needed, or let FastAPI encode the SQLAlchemy model + extra dict
-    # Better: return a custom dict merging both
-    # Since we use joinedload, relationships are loaded.
-    # Let's rely on FastAPI's ability to serialize the object, but we need to inject the extra field.
-    # The safest way without changing Pydantic schemas everywhere is to return a dict.
-    return {**resp, "enrichment_data": company.enrichment_data, "contacts": company.contacts, "signals": company.signals}
+    # FastAPI will automatically serialize the 'company' ORM object into the
+    # CompanyDetailsResponse schema. We just need to attach the extra 'industry_details'.
+    response_data = CompanyDetailsResponse.model_validate(company)
+    response_data.industry_details = industry_details
+    return response_data
@app.post("/api/companies") @app.post("/api/companies")
def create_company(company: CompanyCreate, db: Session = Depends(get_db), username: str = Depends(authenticate_user)): def create_company(company: CompanyCreate, db: Session = Depends(get_db), username: str = Depends(authenticate_user)):
@@ -797,23 +848,21 @@ def run_analysis_task(company_id: int):
    db = SessionLocal()
    try:
        company = db.query(Company).filter(Company.id == company_id).first()
-        if not company: return
+        if not company:
+            logger.error(f"Analysis Task: Company with ID {company_id} not found.")
+            return
-        logger.info(f"Running Analysis Task for {company.name}")
+        logger.info(f"--- [BACKGROUND TASK] Starting for {company.name} ---")
        # --- 1. Scrape Website (if not locked) ---
+        # Check for existing scrape data first
        existing_scrape = db.query(EnrichmentData).filter(
            EnrichmentData.company_id == company.id,
            EnrichmentData.source_type == "website_scrape"
        ).first()
+        # If it doesn't exist or is not locked, we perform a scrape
        if not existing_scrape or not existing_scrape.is_locked:
            logger.info(f"Scraping website for {company.name}...")
-            scrape_res = scraper.scrape_url(company.website) # Use singleton
+            scrape_res = scraper.scrape_url(company.website)
+            # Now, either create new or update existing
            if not existing_scrape:
                db.add(EnrichmentData(company_id=company.id, source_type="website_scrape", content=scrape_res))
                logger.info("Created new website_scrape entry.")
@@ -825,15 +874,16 @@ def run_analysis_task(company_id: int):
        else:
            logger.info("Website scrape is locked. Skipping.")
-        # 2. Classify Industry & Metrics
-        # IMPORTANT: Using the new method name and passing db session
+        # --- 2. Classify Industry & Metrics ---
+        logger.info(f"Handing over to ClassificationService for {company.name}...")
        classifier.classify_company_potential(company, db)
        company.status = "ENRICHED"
        db.commit()
-        logger.info(f"Analysis complete for {company.name}")
+        logger.info(f"--- [BACKGROUND TASK] Successfully finished for {company.name} ---")
    except Exception as e:
-        logger.error(f"Analyze Task Error: {e}", exc_info=True)
+        logger.critical(f"--- [BACKGROUND TASK] CRITICAL ERROR for Company ID {company_id} ---", exc_info=True)
    finally:
        db.close()

View File

@@ -22,7 +22,7 @@ try:
        SERP_API_KEY: Optional[str] = None
        # Paths
-        LOG_DIR: str = "/app/logs_debug"
+        LOG_DIR: str = "/app/Log_from_docker"
        class Config:
            env_file = ".env"
@@ -40,7 +40,7 @@ except ImportError:
        GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
        OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
        SERP_API_KEY = os.getenv("SERP_API_KEY")
-        LOG_DIR = "/app/logs_debug"
+        LOG_DIR = "/app/Log_from_docker"
    settings = FallbackSettings()

View File

@@ -68,6 +68,10 @@ class Company(Base):
    metric_source_url = Column(Text, nullable=True)  # URL where the proof was found
    metric_confidence = Column(Float, nullable=True)  # 0.0 - 1.0
    metric_confidence_reason = Column(Text, nullable=True)  # Why is it high/low?
+    # NEW: AI-generated Marketing Openers
+    ai_opener = Column(Text, nullable=True)
+    ai_opener_secondary = Column(Text, nullable=True)
    # Relationships
    signals = relationship("Signal", back_populates="company", cascade="all, delete-orphan")

View File

@@ -23,52 +23,43 @@ class MetricParser:
        # 1. Pre-cleaning
        text_processed = str(text).strip()
-        logger.info(f"[MetricParser] Processing: '{text_processed}' (Expected: {expected_value})")
+        logger.info(f"[MetricParser] Processing text (len: {len(text_processed)}) (Hint: {expected_value})")
-        # Optimize: If we have an expected value, try to clean and parse THAT first
+        # Optimize: If we have an expected value (hint), try to find that specific number first
        if expected_value:
-            # Try to parse the LLM's raw value directly first (it's often cleaner: "200000")
-            try:
-                # Remove simple noise from expected value
-                # Aggressively strip units and text to isolate the number
-                clean_expected = str(expected_value).lower()
-                # Remove common units
-                for unit in ['m²', 'qm', 'sqm', 'mitarbeiter', 'employees', 'eur', 'usd', 'chf', '€', '$', '£', '¥']:
-                    clean_expected = clean_expected.replace(unit, "")
-                # Remove multipliers text (we handle multipliers via is_revenue later, but for expected value matching we want the raw number)
-                # Actually, expected_value "2.5 Mio" implies we want to match 2.5 in the text, OR 2500000?
-                # Usually the LLM extract matches the text representation.
-                clean_expected = clean_expected.replace("mio", "").replace("millionen", "").replace("mrd", "").replace("milliarden", "")
-                clean_expected = clean_expected.replace("tsd", "").replace("tausend", "")
-                # Final cleanup of non-numeric chars (allow . , ' -)
-                # But preserve structure for robust parser
-                clean_expected = clean_expected.replace(" ", "").replace("'", "")
-                # If it looks like a clean number already, try parsing it
-                # But use the robust parser to handle German decimals if present in expected
-                val = MetricParser._parse_robust_number(clean_expected, is_revenue)
-                # Check if this value (or a close representation) actually exists in the text
-                # This prevents hallucination acceptance, but allows the LLM to guide us to the *second* number in a string.
-                # Simplified check: is the digits sequence present?
-                # No, better: Let the parser run on the FULL text, find all candidates, and pick the one closest to 'val'.
-            except:
-                pass
-        # Fallback: Classic robust parsing
+            try:
+                # Clean the hint to get the target digits (e.g. "352" from "352 Betten")
+                # We only take the FIRST sequence of digits as the target
+                hint_match = re.search(r'[\d\.,\']+', str(expected_value))
+                if hint_match:
+                    target_str = hint_match.group(0)
+                    target_digits = re.sub(r'[^0-9]', '', target_str)
+                    if target_digits:
+                        # Find all numbers in the text and check if they match our target
+                        all_numbers_in_text = re.findall(r'[\d\.,\']+', text_processed)
+                        for num_str in all_numbers_in_text:
+                            if target_digits == re.sub(r'[^0-9]', '', num_str):
+                                # Exact digit match!
+                                val = MetricParser._parse_robust_number(num_str, is_revenue)
+                                if val is not None:
+                                    logger.info(f"[MetricParser] Found targeted value via hint: '{num_str}' -> {val}")
+                                    return val
+            except Exception as e:
+                logger.error(f"Error while parsing with hint: {e}")
        # Normalize quotes
        text_processed = text_processed.replace("’", "'").replace("‘", "'")
        # 2. Remove noise: Citations [1] and Year/Date in parentheses (2020)
+        # We remove everything in parentheses/brackets as it's almost always noise for the metric itself.
        text_processed = re.sub(r'\(.*?\)|\[.*?\]', ' ', text_processed).strip()
        # 3. Remove common prefixes and currency symbols
        prefixes = [
-            r'ca\.?\s*', r'circa\s*', r'rund\s*', r'etwa\s*', r'über\s*', r'unter\s*',
+            r'ca\.?:?\s*', r'circa\s*', r'rund\s*', r'etwa\s*', r'über\s*', r'unter\s*',
            r'mehr als\s*', r'weniger als\s*', r'bis zu\s*', r'about\s*', r'over\s*',
-            r'approx\.?\s*', r'around\s*', r'up to\s*', r'~\s*', r'rd\.?\s*'
+            r'approx\.?:?\s*', r'around\s*', r'up to\s*', r'~\s*', r'rd\.?:?\s*'
        ]
        currencies = [
            r'€', r'EUR', r'US\$', r'USD', r'CHF', r'GBP', r'£', r'¥', r'JPY'
@@ -79,23 +70,16 @@ class MetricParser:
        for c in currencies:
            text_processed = re.sub(f'(?i){c}', '', text_processed).strip()
-        # 4. Remove Range Splitting (was too aggressive, cutting off text after dashes)
-        # Old: text_processed = re.split(r'\s*(-|–|bis|to)\s*', text_processed, 1)[0].strip()
-        # 5. Extract Multipliers (Mio, Mrd)
+        # 4. Extract Multipliers (Mio, Mrd)
        multiplier = 1.0
        lower_text = text_processed.lower()
        def has_unit(text, units):
            for u in units:
-                # Escape special chars if any, though mostly alphanumeric here
-                # Use word boundaries \b for safe matching
                if re.search(r'\b' + re.escape(u) + r'\b', text):
                    return True
            return False
-        # For Revenue, we normalize to Millions (User Rule)
-        # For others (Employees), we scale to absolute numbers
        if is_revenue:
            if has_unit(lower_text, ['mrd', 'milliarden', 'billion', 'bn']):
                multiplier = 1000.0
@@ -111,214 +95,92 @@ class MetricParser:
        elif has_unit(lower_text, ['tsd', 'tausend', 'k']):
            multiplier = 1000.0
-        # 6. Extract the number candidate
-        # Loop through matches to find the best candidate (skipping years if possible)
+        # 5. Extract the first valid number candidate
        candidates = re.finditer(r'([\d\.,\'\s]+)', text_processed)
-        selected_candidate = None
-        best_candidate_val = None
-        matches = [m for m in candidates]
-        # logger.info(f"DEBUG matches: {[m.group(1) for m in matches]}")
-        # logger.info(f"DEBUG: Found {len(matches)} matches: {[m.group(1) for m in matches]}")
-        # Helper to parse a candidate string
-        def parse_cand(c):
-            # Extract temporary multiplier for this specific candidate context?
-            # Complex. For now, we assume the global multiplier applies or we rely on the candidate's raw numeric value.
-            # Actually, simpler: We parse the candidate as is (treating as raw number)
-            try:
-                # Remove thousands separators for comparison
-                c_clean = c.replace("'", "").replace(".", "").replace(" ", "").replace(",", ".") # Rough EN/DE mix
-                return float(c_clean)
-            except:
-                return None
-        # Parse expected value for comparison
-        target_val = None
-        if expected_value:
-            try:
-                # Re-apply aggressive cleaning to ensure we have a valid float for comparison
-                clean_expected = str(expected_value).lower()
-                for unit in ['m²', 'qm', 'sqm', 'mitarbeiter', 'employees', 'eur', 'usd', 'chf', '€', '$', '£', '¥']:
-                    clean_expected = clean_expected.replace(unit, "")
-                clean_expected = clean_expected.replace("mio", "").replace("millionen", "").replace("mrd", "").replace("milliarden", "")
-                clean_expected = clean_expected.replace("tsd", "").replace("tausend", "")
-                clean_expected = clean_expected.replace(" ", "").replace("'", "")
-                target_val = MetricParser._parse_robust_number(clean_expected, is_revenue)
-            except:
-                pass
-        for i, match in enumerate(matches):
+        for match in candidates:
            cand = match.group(1).strip()
-            if not cand: continue
+            if not cand or not re.search(r'\d', cand):
+                continue
-            # Clean candidate for analysis (remove separators)
+            # Clean candidate
            clean_cand = cand.replace("'", "").replace(".", "").replace(",", "").replace(" ", "")
-            # Check if it looks like a year (4 digits, 1900-2100)
-            is_year_like = False
+            # Year detection
            if clean_cand.isdigit() and len(clean_cand) == 4:
                val = int(clean_cand)
                if 1900 <= val <= 2100:
-                    is_year_like = True
+                    continue # Skip years
-            # Smart Year Skip (Legacy Logic)
-            if is_year_like and not target_val: # Only skip if we don't have a specific target
-                if i < len(matches) - 1:
-                    logger.info(f"[MetricParser] Skipping year-like candidate '{cand}' because another number follows.")
-                    continue
-            # Clean candidate for checking (remove internal spaces if they look like thousands separators)
-            # Simple approach: Remove all spaces for parsing check
-            cand_clean_for_parse = cand.replace(" ", "")
-            # If we have a target value from LLM, check if this candidate matches it
-            if target_val is not None:
-                try:
-                    curr_val = MetricParser._parse_robust_number(cand_clean_for_parse, is_revenue)
-                    if abs(curr_val - target_val) < 0.1 or abs(curr_val - target_val/1000) < 0.1 or abs(curr_val - target_val*1000) < 0.1:
-                        selected_candidate = cand # Keep original with spaces for final processing
-                        logger.info(f"[MetricParser] Found candidate '{cand}' matching expected '{expected_value}'")
-                        break
-                except:
-                    pass
-            # Fallback logic:
-            # If we have NO target value, we take the first valid one we find.
-            # If we DO have a target value, we only take a fallback if we reach the end and haven't found the target?
-            # Better: We keep the FIRST valid candidate as a fallback in a separate variable.
-            if selected_candidate is None:
-                # Check if it's a valid number at all before storing as fallback
-                try:
-                    MetricParser._parse_robust_number(cand_clean_for_parse, is_revenue)
-                    if not is_year_like:
-                        if best_candidate_val is None: # Store first valid non-year
-                            best_candidate_val = cand
-                except:
-                    pass
-        # If we found a specific match, use it. Otherwise use the fallback.
-        if selected_candidate:
-            candidate = selected_candidate
-        elif best_candidate_val:
-            candidate = best_candidate_val
-        else:
-            return None
-        # logger.info(f"DEBUG: Selected candidate: '{candidate}'")
-        # Smart separator handling (on the chosen candidate):
-        # Smart separator handling:
-        # A space is only a thousands-separator if it's followed by 3 digits.
-        # Otherwise it's likely a separator between unrelated numbers (e.g. "80 2020")
-        if " " in candidate:
-            parts = candidate.split()
-            if len(parts) > 1:
-                # Basic check: if second part is not 3 digits, we take only the first part
-                if not (len(parts[1]) == 3 and parts[1].isdigit()):
-                    candidate = parts[0]
-                else:
-                    # It might be 1 000. Keep merging if subsequent parts are also 3 digits.
-                    merged = parts[0]
-                    for p in parts[1:]:
-                        if len(p) == 3 and p.isdigit():
-                            merged += p
-                        else:
-                            break
-                    candidate = merged
-        # Remove thousands separators (Quote)
-        candidate = candidate.replace("'", "")
-        if not candidate or not re.search(r'\d', candidate):
-            return None
-        # Count separators for rule checks
-        dots = candidate.count('.')
-        commas = candidate.count(',')
-        # 7. Concatenated Year Detection (Bug Fix for 802020)
-        # If the number is long (5-7 digits) and ends with a recent year (2018-2026),
-        # and has no separators, it's likely a concatenation like "802020".
-        if dots == 0 and commas == 0 and " " not in candidate:
-            if len(candidate) >= 5 and len(candidate) <= 7:
-                for year in range(2018, 2027):
-                    y_str = str(year)
-                    if candidate.endswith(y_str):
-                        val_str = candidate[:-4]
-                        if val_str.isdigit():
-                            logger.warning(f"[MetricParser] Caught concatenated year BUG: '{candidate}' -> '{val_str}' (Year {year})")
-                            candidate = val_str
-                            break
-        try:
-            val = MetricParser._parse_robust_number(candidate, is_revenue)
-            final = val * multiplier
-            logger.info(f"[MetricParser] Candidate: '{candidate}' -> Multiplier: {multiplier} -> Value: {final}")
-            return final
-        except Exception as e:
-            logger.debug(f"Failed to parse number string '{candidate}': {e}")
-            return None
+            # Smart separator handling for spaces
+            if " " in cand:
+                parts = cand.split()
+                if len(parts) > 1:
+                    if not (len(parts[1]) == 3 and parts[1].isdigit()):
+                        cand = parts[0]
+                    else:
+                        merged = parts[0]
+                        for p in parts[1:]:
+                            if len(p) == 3 and p.isdigit():
+                                merged += p
+                            else:
+                                break
+                        cand = merged
+            try:
+                val = MetricParser._parse_robust_number(cand, is_revenue)
+                if val is not None:
+                    final = val * multiplier
+                    logger.info(f"[MetricParser] Found value: '{cand}' -> {final}")
+                    return final
+            except:
+                continue
+        return None
    @staticmethod
    def _parse_robust_number(s: str, is_revenue: bool) -> Optional[float]:
        """
        Parses a number string dealing with ambiguous separators.
        Standardizes to Python float.
        """
        s = s.strip().replace("'", "")
        if not s:
            return None
        dots = s.count('.')
        commas = s.count(',')
        try:
            # Case 1: Both separators present
            if dots > 0 and commas > 0:
                if s.rfind('.') > s.rfind(','):  # US style: 1,234.56
                    return float(s.replace(',', ''))
                else:  # German style: 1.234,56
                    return float(s.replace('.', '').replace(',', '.'))
            # Case 2: Multiple dots (thousands: 1.000.000)
            if dots > 1:
                return float(s.replace('.', ''))
            # Case 3: Multiple commas (unusual, but treat as thousands)
            if commas > 1:
                return float(s.replace(',', ''))
            # Case 4: Only a comma - in German context almost always a decimal ("1,5")
            if commas == 1:
                return float(s.replace(',', '.'))
            # Case 5: Only a dot - ambiguous: "1.005" (thousand) vs "1.5" (decimal).
            # If the dot is followed by exactly 3 digits it is a thousands separator
            # for count metrics ("1.005 Mitarbeiter" -> 1005); for revenue, dots are
            # treated as decimals ("375.6 Mio" -> 375.6).
            if dots == 1:
                parts = s.split('.')
                if len(parts[1]) == 3:
                    if is_revenue:
                        return float(s)
                    else:
                        return float(s.replace('.', ''))
                return float(s)
            return float(s)
        except:
            return None
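These separator rules are easy to get subtly wrong, so they are worth exercising on concrete inputs. A standalone mirror of the rule set (same logic as `_parse_robust_number`, minus the logging; this helper is illustrative, not part of the repo):

```python
from typing import Optional

def parse_number(s: str, is_revenue: bool = False) -> Optional[float]:
    """Mixed separators resolved by position, repeated separators treated as
    thousands marks, a lone comma as a German decimal, and a lone dot decided
    by the 3-digit / is_revenue rule."""
    s = s.strip().replace("'", "")
    if not s:
        return None
    dots, commas = s.count('.'), s.count(',')
    try:
        if dots and commas:
            if s.rfind('.') > s.rfind(','):                     # US: 1,234.56
                return float(s.replace(',', ''))
            return float(s.replace('.', '').replace(',', '.'))  # DE: 1.234,56
        if dots > 1:
            return float(s.replace('.', ''))                    # 1.000.000
        if commas > 1:
            return float(s.replace(',', ''))
        if commas == 1:
            return float(s.replace(',', '.'))                   # 1,5 -> 1.5
        if dots == 1 and len(s.split('.')[1]) == 3 and not is_revenue:
            return float(s.replace('.', ''))                    # 1.005 Betten -> 1005
        return float(s)
    except ValueError:
        return None
```

The `is_revenue` flag encodes the domain rule from the lessons-learned notes: "1.005" as an employee count means 1005, but "1.005" as revenue in millions means 1.005.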
@@ -0,0 +1,72 @@
import os
import sys
import argparse
import logging
# Add the backend directory to the Python path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from backend.database import get_db, Company
from backend.services.classification import ClassificationService
from backend.lib.logging_setup import setup_logging
# --- CONFIGURATION ---
# Setup logging to be very verbose for this script
setup_logging()
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
def run_debug_analysis(company_id: int):
"""
Runs the full classification and enrichment process for a single company
in the foreground and prints detailed results.
"""
logger.info(f"--- Starting Interactive Debug for Company ID: {company_id} ---")
db_session = next(get_db())
try:
# 1. Fetch the company
company = db_session.query(Company).filter(Company.id == company_id).first()
if not company:
logger.error(f"Company with ID {company_id} not found.")
return
logger.info(f"Found Company: {company.name}")
# --- PRE-ANALYSIS STATE ---
print("\n--- METRICS BEFORE ---")
print(f"Calculated: {company.calculated_metric_value} {company.calculated_metric_unit}")
print(f"Standardized: {company.standardized_metric_value} {company.standardized_metric_unit}")
print("----------------------\n")
# 2. Instantiate the service
classifier = ClassificationService()
# 3. RUN THE CORE LOGIC
# This will now print all the detailed logs we added
updated_company = classifier.classify_company_potential(company, db_session)
# --- POST-ANALYSIS STATE ---
print("\n--- METRICS AFTER ---")
print(f"Industry (AI): {updated_company.industry_ai}")
print(f"Metric Source: {updated_company.metric_source}")
print(f"Proof Text: {updated_company.metric_proof_text}")
print(f"Calculated: {updated_company.calculated_metric_value} {updated_company.calculated_metric_unit}")
print(f"Standardized: {updated_company.standardized_metric_value} {updated_company.standardized_metric_unit}")
print(f"\nOpener 1 (Infra): {updated_company.ai_opener}")
print(f"Opener 2 (Ops): {updated_company.ai_opener_secondary}")
print("---------------------")
logger.info(f"--- Interactive Debug Finished for Company ID: {company_id} ---")
finally:
db_session.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run a single company analysis for debugging.")
parser.add_argument("--id", type=int, default=1, help="The ID of the company to analyze.")
args = parser.parse_args()
run_debug_analysis(args.id)
@@ -0,0 +1,67 @@
import requests
import os
import time
import argparse
import sys
import logging
# Add the backend directory to the Python path for relative imports to work
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
# --- Configuration ---
# Module-level logger so functions like trigger_analysis() can log even when
# this file is imported rather than run as a script (avoids a NameError).
logger = logging.getLogger(__name__)
def load_env_manual(path):
if not os.path.exists(path):
# print(f"⚠️ Warning: .env file not found at {path}") # Suppress for cleaner output in container
return
with open(path) as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, val = line.split('=', 1)
os.environ.setdefault(key.strip(), val.strip())
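The per-line rule in `load_env_manual` splits on the first `=` only, so values may themselves contain `=` (query strings, connection URLs). A quick standalone check of that rule — `parse_env_line` is an illustrative helper, not part of the repo:

```python
def parse_env_line(line: str):
    """Mirror of load_env_manual's per-line rule: skip blanks and comments,
    split on the FIRST '=' only, trim both key and value."""
    line = line.strip()
    if line and not line.startswith('#') and '=' in line:
        key, val = line.split('=', 1)
        return key.strip(), val.strip()
    return None
```

Note that the script applies values with `os.environ.setdefault`, so real environment variables always win over `.env` entries.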
# Load .env (assuming it's in /app) - this needs to be run from /app or adjusted
# For docker-compose exec from project root, /app is the container's WORKDIR
load_env_manual('/app/.env')
API_USER = os.getenv("API_USER")
API_PASS = os.getenv("API_PASSWORD")
# When run INSIDE the container, the service is reachable via localhost
CE_URL = "http://localhost:8000"
ANALYZE_ENDPOINT = f"{CE_URL}/api/enrich/analyze"
def trigger_analysis(company_id: int):
print("="*60)
print(f"🚀 Triggering REAL analysis for Company ID: {company_id}")
print("="*60)
payload = {"company_id": company_id}
try:
# Added logging for API user/pass (debug only, remove in prod)
logger.debug(f"API Call to {ANALYZE_ENDPOINT} with user {API_USER}")
response = requests.post(ANALYZE_ENDPOINT, json=payload, auth=(API_USER, API_PASS), timeout=30) # Increased timeout
if response.status_code == 200 and response.json().get("status") == "queued":
print(" ✅ SUCCESS: Analysis task has been queued on the server.")
print(" The result will be available in the database and UI shortly.")
return True
else:
print(f" ❌ FAILURE: Server responded with status {response.status_code}")
print(f" Response: {response.text}")
return False
except requests.exceptions.RequestException as e:
print(f" ❌ FATAL: Could not connect to the server: {e}")
return False
if __name__ == "__main__":
# Add a basic logger to the script itself for clearer output
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
parser = argparse.ArgumentParser(description="Trigger Company Explorer Analysis Task")
parser.add_argument("--company-id", type=int, required=True, help="ID of the company to analyze")
args = parser.parse_args()
trigger_analysis(args.company_id)
@@ -5,7 +5,7 @@ import re
from datetime import datetime
from typing import Optional, Dict, Any, List, Tuple
from sqlalchemy.orm import Session, joinedload
from backend.database import Company, Industry, RoboticsCategory, EnrichmentData
from backend.lib.core_utils import call_gemini_flash, safe_eval_math, run_serp_search
@@ -19,9 +19,12 @@ class ClassificationService:
        pass

    def _load_industry_definitions(self, db: Session) -> List[Industry]:
        # Eager-load both category relationships so they remain usable later
        industries = db.query(Industry).options(
            joinedload(Industry.primary_category),
            joinedload(Industry.secondary_category)
        ).all()
        if not industries:
            logger.warning("No industry definitions found in DB.")
        return industries
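The switch from `db.query(Industry).all()` to explicit `joinedload`s avoids N+1 lazy loads, and avoids `DetachedInstanceError` when the categories are read after the session is gone. A self-contained sketch with an invented mini-schema — table and column names here are illustrative, not the project's actual models:

```python
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import declarative_base, relationship, sessionmaker, joinedload

Base = declarative_base()

class RoboticsCategory(Base):
    __tablename__ = "robotics_category"
    id = Column(Integer, primary_key=True)
    name = Column(String)

class Industry(Base):
    __tablename__ = "industry"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    primary_category_id = Column(Integer, ForeignKey("robotics_category.id"))
    secondary_category_id = Column(Integer, ForeignKey("robotics_category.id"))
    # Two FKs into the same table, so each relationship needs explicit foreign_keys
    primary_category = relationship("RoboticsCategory", foreign_keys=[primary_category_id])
    secondary_category = relationship("RoboticsCategory", foreign_keys=[secondary_category_id])

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

with Session() as db:
    clean = RoboticsCategory(name="Reinigungsroboter")
    serv = RoboticsCategory(name="Serviceroboter")
    db.add(Industry(name="Healthcare - Hospital",
                    primary_category=clean, secondary_category=serv))
    db.commit()

with Session() as db:
    # Both relationships are fetched in the same SELECT via LEFT OUTER JOINs,
    # so reading them after the session closes triggers no lazy load.
    industries = db.query(Industry).options(
        joinedload(Industry.primary_category),
        joinedload(Industry.secondary_category),
    ).all()
```

Because `Industry` points at `robotics_category` twice, omitting `foreign_keys` on the relationships would raise an `AmbiguousForeignKeysError` at mapper configuration time.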
    def _get_wikipedia_content(self, db: Session, company_id: int) -> Optional[Dict[str, Any]]:
@@ -49,18 +52,11 @@ Return ONLY the exact name of the industry.
        try:
            response = call_gemini_flash(prompt)
            if not response: return "Others"
            cleaned = response.strip().replace('"', '').replace("'", "")
            valid_names = [i['name'] for i in industry_definitions] + ["Others"]
            if cleaned in valid_names: return cleaned
            for name in valid_names:
                if name in cleaned: return name
            return "Others"
        except Exception as e:
            logger.error(f"Classification Prompt Error: {e}")
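The exact-match-then-substring fallback is easy to exercise standalone. A sketch of the same rule (names are invented samples):

```python
def match_industry(llm_response: str, valid_names) -> str:
    """Exact name first, then substring fallback for chatty LLM responses;
    anything unrecognized maps to the catch-all 'Others'."""
    cleaned = llm_response.strip().replace('"', '').replace("'", "")
    if cleaned in valid_names:
        return cleaned
    for name in valid_names:
        if name in cleaned:
            return name
    return "Others"
```

The substring pass is what rescues responses like `Die Branche ist "Healthcare - Hospital".` that would otherwise fail the exact comparison.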
@@ -79,23 +75,20 @@ Return a JSON object with:
- "raw_unit": The unit found (e.g. "Betten", "m²").
- "proof_text": A short quote from the text proving this value.

**IMPORTANT:** Ignore obvious year numbers (like 1900-2026) if other, more plausible metric values are present in the text. Focus on the target metric.

JSON ONLY.
"""
        try:
            response = call_gemini_flash(prompt, json_mode=True)
            if not response: return None
            if isinstance(response, str):
                try:
                    data = json.loads(response.replace("```json", "").replace("```", "").strip())
                except: return None
            else:
                data = response
            if isinstance(data, list) and data: data = data[0]
            if not isinstance(data, dict): return None
            if data.get("raw_value") == "null": data["raw_value"] = None
            return data
        except Exception as e:
            logger.error(f"LLM Extraction Parse Error: {e}")
@@ -103,38 +96,37 @@ JSON ONLY.
    def _is_metric_plausible(self, metric_name: str, value: Optional[float]) -> bool:
        if value is None: return False
        try: return float(value) > 0
        except: return False

    def _parse_standardization_logic(self, formula: str, raw_value: float) -> Optional[float]:
        if not formula or raw_value is None: return None
        # Clean formula: remove anything in parentheses first (often units or comments)
        clean_formula = re.sub(r'\(.*?\)', '', formula.lower())
        # Replace 'wert' with the actual value
        expression = clean_formula.replace("wert", str(raw_value))
        # Remove any non-math characters
        expression = re.sub(r'[^0-9\.\+\-\*\/]', '', expression)
        try:
            return safe_eval_math(expression)
        except Exception as e:
            logger.error(f"Failed to parse logic '{formula}' with value {raw_value}: {e}")
            return None
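The three cleaning steps (drop parenthesised comments, substitute the German placeholder `wert`, keep only math characters) can be checked in isolation. A sketch that stops short of evaluation, since `safe_eval_math` is project-internal:

```python
import re

def clean_and_substitute(formula: str, raw_value: float) -> str:
    """Illustrative mirror of the formula-cleaning pipeline: the result is a
    bare arithmetic expression ready for a sandboxed evaluator."""
    expr = re.sub(r'\(.*?\)', '', formula.lower())   # "(qm pro Bett)" -> gone
    expr = expr.replace("wert", str(raw_value))      # placeholder -> number
    return re.sub(r'[^0-9\.\+\-\*\/]', '', expr)     # strip units/whitespace
```

Lowercasing first is what lets a single `"wert"` replacement cover `Wert`, `wert`, and `WERT` spellings in the DB-stored formulas.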
    def _get_best_metric_result(self, results_list: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        if not results_list: return None
        source_priority = {"wikipedia": 0, "website": 1, "serpapi": 2}
        valid_results = [r for r in results_list if r.get("calculated_metric_value") is not None]
        if not valid_results: return None
        valid_results.sort(key=lambda r: source_priority.get(r.get("metric_source"), 99))
        logger.info(f"Best result chosen: {valid_results[0]}")
        return valid_results[0]
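The source-priority pick is pure data manipulation and worth a quick standalone check; a sketch with invented result dicts:

```python
# Lower rank wins; unknown sources sort last via the default of 99.
source_priority = {"wikipedia": 0, "website": 1, "serpapi": 2}

def best_result(results):
    """Mirror of _get_best_metric_result: drop value-less entries, then pick
    the entry from the most trusted source."""
    valid = [r for r in results if r.get("calculated_metric_value") is not None]
    if not valid:
        return None
    valid.sort(key=lambda r: source_priority.get(r.get("metric_source"), 99))
    return valid[0]
```

Wikipedia outranking the company website reflects the lesson that curated encyclopedic figures tend to be cleaner than marketing copy.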
    def _get_website_content_and_url(self, db: Session, company: Company) -> Tuple[Optional[str], Optional[str]]:
        # Prefer the most recent cached scrape before hitting the live site
        enrichment = db.query(EnrichmentData).filter_by(company_id=company.id, source_type="website_scrape").order_by(EnrichmentData.created_at.desc()).first()
        if enrichment and enrichment.content and "raw_text" in enrichment.content:
            return enrichment.content["raw_text"], company.website
        content = scrape_website_content(company.website)
        return content, company.website

    def _get_wikipedia_content_and_url(self, db: Session, company_id: int) -> Tuple[Optional[str], Optional[str]]:
        wiki_data = self._get_wikipedia_content(db, company_id)
@@ -142,219 +134,135 @@ JSON ONLY.
    def _get_serpapi_content_and_url(self, company: Company, search_term: str) -> Tuple[Optional[str], Optional[str]]:
        serp_results = run_serp_search(f"{company.name} {company.city or ''} {search_term}")
        if not serp_results: return None, None
        content = " ".join([res.get("snippet", "") for res in serp_results.get("organic_results", [])])
        url = serp_results.get("organic_results", [{}])[0].get("link") if serp_results.get("organic_results") else None
        return content, url

    def _extract_and_calculate_metric_cascade(self, db: Session, company: Company, industry_name: str, search_term: str, standardization_logic: Optional[str], standardized_unit: Optional[str]) -> Dict[str, Any]:
        final_result = {"calculated_metric_name": search_term, "calculated_metric_value": None, "calculated_metric_unit": None, "standardized_metric_value": None, "standardized_metric_unit": standardized_unit, "metric_source": None, "proof_text": None, "metric_source_url": None}
        sources = [
            ("website", lambda: self._get_website_content_and_url(db, company)),
            ("wikipedia", lambda: self._get_wikipedia_content_and_url(db, company.id)),
            ("serpapi", lambda: self._get_serpapi_content_and_url(company, search_term))
        ]
        all_source_results = []
        parser = MetricParser()
        for source_name, content_loader in sources:
            logger.info(f" -> Checking source: [{source_name.upper()}] for '{search_term}'")
            try:
                content_text, current_source_url = content_loader()
                if not content_text or len(content_text) < 100: continue
                llm_result = self._run_llm_metric_extraction_prompt(content_text, search_term, industry_name)
                if llm_result and llm_result.get("proof_text"):
                    # Use the robust parser on the LLM's raw value or proof text
                    hint = llm_result.get("raw_value") or llm_result.get("proof_text")
                    parsed_value = parser.extract_numeric_value(text=content_text, expected_value=str(hint))
                    if parsed_value is not None:
                        llm_result.update({"calculated_metric_value": parsed_value, "calculated_metric_unit": llm_result.get('raw_unit'), "metric_source": source_name, "metric_source_url": current_source_url})
                        all_source_results.append(llm_result)
            except Exception as e:
                logger.error(f" -> Error in {source_name} stage: {e}")

        best_result = self._get_best_metric_result(all_source_results)
        if not best_result: return final_result
        final_result.update(best_result)
        if self._is_metric_plausible(search_term, final_result['calculated_metric_value']):
            final_result['standardized_metric_value'] = self._parse_standardization_logic(standardization_logic, final_result['calculated_metric_value'])
        return final_result
    def _find_direct_area(self, db: Session, company: Company, industry_name: str) -> Optional[Dict[str, Any]]:
        logger.info(" -> (Helper) Running specific search for 'Fläche'...")
        area_metrics = self._extract_and_calculate_metric_cascade(db, company, industry_name, search_term="Fläche", standardization_logic=None, standardized_unit="m²")
        if area_metrics and area_metrics.get("calculated_metric_value") is not None:
            unit = (area_metrics.get("calculated_metric_unit") or "").lower()
            if any(u in unit for u in ["m²", "qm", "quadratmeter"]):
                logger.info(" ✅ SUCCESS: Found direct area value.")
                area_metrics['standardized_metric_value'] = area_metrics['calculated_metric_value']
                return area_metrics
        return None

    def _generate_marketing_opener(self, company: Company, industry: Industry, website_text: str, focus_mode: str = "primary") -> Optional[str]:
        if not industry: return None

        # 1. Determine Context & Pains/Gains
        product_context = industry.primary_category.name if industry.primary_category else "Robotik-Lösungen"
        raw_pains = industry.pains or ""

        # Split pains/gains based on [Marker] segments
        def extract_segment(text, marker):
            if not text: return ""
            segments = re.split(r'\[(.*?)\]', text)
            for i in range(1, len(segments), 2):
                if marker.lower() in segments[i].lower():
                    return segments[i+1].strip()
            return text  # Fallback to full text if no markers found
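The `[Marker]`-based splitting can be exercised directly; the same helper with an invented sample pains string (not real industry data):

```python
import re

def extract_segment(text: str, marker: str) -> str:
    """Split '[Marker] body' pairs via a capturing re.split: odd indices hold
    marker names, the following even index holds that marker's body."""
    if not text:
        return ""
    segments = re.split(r'\[(.*?)\]', text)
    for i in range(1, len(segments), 2):
        if marker.lower() in segments[i].lower():
            return segments[i + 1].strip()
    return text  # Fallback to full text if no markers found

pains = ("[Primary Product] Hygiene und Flächenleistung "
         "[Secondary Product] Materialfluss und Personalmangel")
```

The substring match on the marker (`"secondary" in "secondary product"`) keeps the lookup tolerant of partial keys.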
        relevant_pains = extract_segment(raw_pains, "Primary Product")
        if focus_mode == "secondary" and industry.ops_focus_secondary and industry.secondary_category:
            product_context = industry.secondary_category.name
            relevant_pains = extract_segment(raw_pains, "Secondary Product")

        prompt = f"""
Du bist ein exzellenter B2B-Stratege und Texter. Formuliere einen hochpersonalisierten Einleitungssatz (1-2 Sätze).
Unternehmen: {company.name}
Branche: {industry.name}
Fokus: {focus_mode.upper()}
Herausforderungen: {relevant_pains}
Kontext: {website_text[:2500]}

REGEL: Nenne NICHT das Produkt "{product_context}". Fokussiere dich NUR auf die Herausforderung.
AUSGABE: NUR den fertigen Satz.
"""
        try:
            response = call_gemini_flash(prompt)
            return response.strip().strip('"') if response else None
        except Exception as e:
            logger.error(f"Opener Error: {e}")
            return None
    def classify_company_potential(self, company: Company, db: Session) -> Company:
        logger.info(f"--- Starting FULL Analysis v3.0 for {company.name} ---")
        industries = self._load_industry_definitions(db)
        website_content, _ = self._get_website_content_and_url(db, company)
        if not website_content or len(website_content) < 100:
            company.status = "ENRICH_FAILED"
            db.commit()
            return company

        industry_defs = [{"name": i.name, "description": i.description} for i in industries]
        suggested_industry_name = self._run_llm_classification_prompt(website_content, company.name, industry_defs)
        matched_industry = next((i for i in industries if i.name == suggested_industry_name), None)
        if not matched_industry:
            company.industry_ai = "Others"
            db.commit()
            return company

        company.industry_ai = matched_industry.name
        logger.info(f"✅ Industry: {matched_industry.name}")

        metrics = self._find_direct_area(db, company, matched_industry.name)
        if not metrics:
            logger.info(" -> No direct area. Trying proxy...")
            if matched_industry.scraper_search_term:
                metrics = self._extract_and_calculate_metric_cascade(db, company, matched_industry.name, search_term=matched_industry.scraper_search_term, standardization_logic=matched_industry.standardization_logic, standardized_unit="m²")

        if metrics and metrics.get("calculated_metric_value"):
            logger.info(f" ✅ SUCCESS: {metrics.get('calculated_metric_value')} {metrics.get('calculated_metric_unit')}")
            company.calculated_metric_name = metrics.get("calculated_metric_name", matched_industry.scraper_search_term or "Fläche")
            company.calculated_metric_value = metrics.get("calculated_metric_value")
            company.calculated_metric_unit = metrics.get("calculated_metric_unit")
            company.standardized_metric_value = metrics.get("standardized_metric_value")
            company.standardized_metric_unit = metrics.get("standardized_metric_unit")
            company.metric_source = metrics.get("metric_source")
            company.metric_proof_text = metrics.get("proof_text")
            company.metric_source_url = metrics.get("metric_source_url")
            company.metric_confidence = 0.8
            company.metric_confidence_reason = "Metric processed."

        company.ai_opener = self._generate_marketing_opener(company, matched_industry, website_content, "primary")
        company.ai_opener_secondary = self._generate_marketing_opener(company, matched_industry, website_content, "secondary")

        company.last_classification_at = datetime.utcnow()
        company.status = "ENRICHED"
        db.commit()
        logger.info(f"--- ✅ Analysis Finished for {company.name} ---")
        return company
@@ -0,0 +1,82 @@
import unittest
import os
import sys
from unittest.mock import MagicMock, patch

# Adjust path to allow importing from backend
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from backend.services.classification import ClassificationService
from backend.database import Company, Industry, RoboticsCategory, Session


class TestHospitalMetricFinal(unittest.TestCase):
    def setUp(self):
        self.service = ClassificationService()
        self.mock_db = MagicMock(spec=Session)
        self.mock_company = Company(id=8, name="Klinikum Landkreis Erding")
        self.mock_industry_hospital = Industry(
            id=1,
            name="Healthcare - Hospital",
            scraper_search_term="Anzahl Betten",
            standardization_logic="wert * 100",
            primary_category=RoboticsCategory(name="Reinigungsroboter"),
            secondary_category=RoboticsCategory(name="Serviceroboter"),
        )
        self.mock_website_content = "Ein langer Text, der die 100-Zeichen-Prüfung besteht."

    @patch('backend.services.classification.ClassificationService._generate_marketing_opener')
    @patch('backend.services.classification.ClassificationService._extract_and_calculate_metric_cascade')
    @patch('backend.services.classification.ClassificationService._find_direct_area')
    @patch('backend.services.classification.ClassificationService._run_llm_classification_prompt')
    @patch('backend.services.classification.ClassificationService._get_website_content_and_url')
    @patch('backend.services.classification.ClassificationService._load_industry_definitions')
    def test_final_hospital_logic(
        self,
        mock_load_industries,
        mock_get_website,
        mock_classify,
        mock_find_direct_area,
        mock_extract_cascade,
        mock_generate_opener,
    ):
        print("\n--- Running Final Hospital Logic Test ---")

        # --- MOCK SETUP ---
        mock_load_industries.return_value = [self.mock_industry_hospital]
        mock_get_website.return_value = (self.mock_website_content, "http://mock.com")
        mock_classify.return_value = "Healthcare - Hospital"
        mock_find_direct_area.return_value = None  # STAGE 1 MUST FAIL
        proxy_metric_result = {
            "calculated_metric_name": "Anzahl Betten",
            "calculated_metric_value": 352.0,
            "calculated_metric_unit": "Betten",
            "standardized_metric_value": 35200.0,
            "standardized_metric_unit": "",
            "metric_source": "wikipedia",
        }
        mock_extract_cascade.return_value = proxy_metric_result
        mock_generate_opener.side_effect = ["Primary Opener", "Secondary Opener"]

        # --- EXECUTION ---
        updated_company = self.service.classify_company_potential(self.mock_company, self.mock_db)

        # --- ASSERTIONS ---
        mock_find_direct_area.assert_called_once()
        mock_extract_cascade.assert_called_once()
        self.assertEqual(updated_company.calculated_metric_name, "Anzahl Betten")
        self.assertEqual(updated_company.calculated_metric_value, 352.0)
        self.assertEqual(updated_company.standardized_metric_value, 35200.0)
        print("   ✅ Metrics from Stage 2 correctly applied.")

        self.assertEqual(updated_company.ai_opener, "Primary Opener")
        self.assertEqual(updated_company.ai_opener_secondary, "Secondary Opener")
        print("   ✅ Openers correctly applied.")

        print("\n--- ✅ PASSED: Final Hospital Logic Test. ---")


if __name__ == '__main__':
    unittest.main()
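The test fixture stores the proxy conversion as a string (`standardization_logic="wert * 100"`, where "wert" is German for "value") and asserts that 352 beds become 35 200. How the service evaluates that expression is not shown in this diff; one safe way to do it without `eval()` is an AST walk — `apply_standardization` below is a hypothetical helper, not the actual implementation:

```python
import ast
import operator

# Whitelist of arithmetic operators the expression string may use.
_OPS = {
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Add: operator.add,
    ast.Sub: operator.sub,
}

def apply_standardization(expression: str, wert: float) -> float:
    """Evaluate a formula like 'wert * 100' with 'wert' bound to the raw metric."""
    tree = ast.parse(expression, mode="eval")

    def _eval(node: ast.AST) -> float:
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
            return _OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return float(node.value)
        if isinstance(node, ast.Name) and node.id == "wert":
            return wert
        raise ValueError(f"Unsupported standardization expression: {expression!r}")

    return _eval(tree)
```

Rejecting anything outside the whitelist keeps admin-editable formulas from becoming an injection vector, which `eval()` would not.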

View File

@@ -1,49 +0,0 @@
import requests
import os
import time

# --- Configuration ---
def load_env_manual(path):
    if not os.path.exists(path):
        print(f"⚠️ Warning: .env file not found at {path}")
        return
    with open(path) as f:
        for line in f:
            line = line.strip()
            if line and not line.startswith('#') and '=' in line:
                key, val = line.split('=', 1)
                os.environ.setdefault(key.strip(), val.strip())

load_env_manual('/app/.env')

API_USER = os.getenv("API_USER")
API_PASS = os.getenv("API_PASSWORD")
CE_URL = "http://127.0.0.1:8000"
ANALYZE_ENDPOINT = f"{CE_URL}/api/enrich/analyze"
COMPANY_ID_TO_ANALYZE = 1  # Therme Erding

def trigger_analysis():
    print("=" * 60)
    print(f"🚀 Triggering REAL analysis for Company ID: {COMPANY_ID_TO_ANALYZE}")
    print("=" * 60)
    payload = {"company_id": COMPANY_ID_TO_ANALYZE}
    try:
        response = requests.post(ANALYZE_ENDPOINT, json=payload, auth=(API_USER, API_PASS), timeout=10)
        if response.status_code == 200 and response.json().get("status") == "queued":
            print("   ✅ SUCCESS: Analysis task has been queued on the server.")
            print("      The result will be available in the database and UI shortly.")
            return True
        else:
            print(f"   ❌ FAILURE: Server responded with status {response.status_code}")
            print(f"      Response: {response.text}")
            return False
    except requests.exceptions.RequestException as e:
        print(f"   ❌ FATAL: Could not connect to the server: {e}")
        return False

if __name__ == "__main__":
    trigger_analysis()
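The deleted script's manual `.env` parser is self-contained and easy to exercise in isolation. A quick usage sketch with a throwaway file (the key names here are arbitrary demo values, not real credentials):

```python
import os
import tempfile

def load_env_manual(path):
    # Same parsing rules as the script above: skip blanks and comments,
    # split on the FIRST '=', and never overwrite existing variables.
    if not os.path.exists(path):
        print(f"⚠️ Warning: .env file not found at {path}")
        return
    with open(path) as f:
        for line in f:
            line = line.strip()
            if line and not line.startswith('#') and '=' in line:
                key, val = line.split('=', 1)
                os.environ.setdefault(key.strip(), val.strip())

# Write a small demo file and load it.
with tempfile.NamedTemporaryFile("w", suffix=".env", delete=False) as tmp:
    tmp.write("# comment line\nDEMO_USER = alice\nDEMO_TOKEN=a=b\n")
    env_path = tmp.name

load_env_manual(env_path)
print(os.environ["DEMO_USER"])   # whitespace around '=' is stripped
print(os.environ["DEMO_TOKEN"])  # only the first '=' splits, so 'a=b' survives
os.unlink(env_path)
```

The `split('=', 1)` detail matters for values like the Notion-style tokens in this repo's `.env`, which can themselves contain `=` characters.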