feat(company-explorer): Initial Web UI & Backend with Enrichment Flow

This commit introduces the foundational elements for the new "Company Explorer" web application, marking a significant step away from the legacy Google Sheets / CLI system.

Key changes include:
- Project Structure: A new  directory with separate  (FastAPI) and  (React/Vite) components.
- Data Persistence: Migration from Google Sheets to a local SQLite database () using SQLAlchemy.
- Core Utilities: Extraction and cleanup of essential helper functions (LLM wrappers, text utilities) into .
- Backend Services: , ,  for AI-powered analysis, and  logic.
- Frontend UI: Basic React application with company table, import wizard, and dynamic inspector sidebar.
- Docker Integration: Updated  and  for multi-stage builds and sideloading.
- Deployment & Access: Integrated into central Nginx proxy and dashboard, accessible via .

Lessons Learned & Fixed during development:
- Frontend Asset Loading: Addressed issues with Vite's  path and FastAPI's .
- TypeScript Configuration: Added  and .
- Database Schema Evolution: Solved  errors by forcing a new database file and correcting  override.
- Logging: Implemented robust file-based logging ().

This new foundation provides a powerful and maintainable platform for future B2B robotics lead generation.
This commit is contained in:
2026-01-07 17:55:08 +00:00
parent 7405c2acb9
commit 2c7bb262ef
51 changed files with 3475 additions and 2 deletions

View File

@@ -0,0 +1,77 @@
import json
import logging
import os
from typing import Dict, Any, List
from ..lib.core_utils import call_gemini
from ..config import settings
logger = logging.getLogger(__name__)  # Module-level logger, namespaced to this service module
# Path to the JSON whitelist of industry labels, resolved relative to this file
# (lives one package level up, in the backend's data/ directory).
ALLOWED_INDUSTRIES_FILE = os.path.join(os.path.dirname(__file__), "../data/allowed_industries.json")
class ClassificationService:
    """LLM-backed classifier: assigns one whitelisted industry and four
    robotics-potential scores (cleaning/transport/security/service) to a
    company based on its scraped website text.
    """

    def __init__(self) -> None:
        # The industry whitelist is read from disk once per service instance.
        self.allowed_industries = self._load_allowed_industries()

    def _load_allowed_industries(self) -> List[str]:
        """Load the list of permitted industry labels from ALLOWED_INDUSTRIES_FILE.

        Falls back to the single bucket ["Sonstige"] ("other") if the file is
        missing or unreadable, so classification can still proceed.
        """
        try:
            with open(ALLOWED_INDUSTRIES_FILE, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            logger.error(f"Failed to load allowed industries: {e}")
            return ["Sonstige"]

    def analyze_robotics_potential(self, company_name: str, website_text: str) -> Dict[str, Any]:
        """
        Analyzes the company for robotics potential based on website content.
        Returns strict JSON.

        On failure (LLM error, unparseable response, too little input text)
        returns a dict of the form {"error": "..."} instead of raising.
        """
        # Guard: with fewer than 100 chars of text the LLM output is unreliable.
        if not website_text or len(website_text) < 100:
            return {"error": "Insufficient text content"}
        # NOTE: website_text is truncated to 15k chars inside the prompt to bound token usage.
        prompt = f"""
You are a Senior B2B Market Analyst for 'Roboplanet', a robotics distributor.
Your job is to analyze a target company based on their website text and determine their potential for using robots.
--- TARGET COMPANY ---
Name: {company_name}
Website Content (Excerpt):
{website_text[:15000]}
--- ALLOWED INDUSTRIES (STRICT) ---
You MUST assign the company to exactly ONE of these industries. If unsure, choose the closest match or "Sonstige".
{json.dumps(self.allowed_industries, ensure_ascii=False)}
--- ANALYSIS TASKS ---
1. **Industry Classification:** Pick one from the list.
2. **Robotics Potential Scoring (0-100):**
- **Cleaning:** Does the company manage large floors, hospitals, hotels, or public spaces? (Keywords: Hygiene, Cleaning, SPA, Facility Management)
- **Transport/Logistics:** Do they move goods internally? (Keywords: Warehouse, Intralogistics, Production line, Hospital logistics)
- **Security:** Do they have large perimeters or night patrols? (Keywords: Werkschutz, Security, Monitoring)
- **Service:** Do they interact with guests/patients? (Keywords: Reception, Restaurant, Nursing)
3. **Explanation:** A short, strategic reason for the scoring (German).
--- OUTPUT FORMAT (JSON ONLY) ---
{{
"industry": "String (from list)",
"summary": "Short business summary (German)",
"potentials": {{
"cleaning": {{ "score": 0-100, "reason": "..." }},
"transport": {{ "score": 0-100, "reason": "..." }},
"security": {{ "score": 0-100, "reason": "..." }},
"service": {{ "score": 0-100, "reason": "..." }}
}}
}}
"""
        try:
            response_text = call_gemini(
                prompt=prompt,
                json_mode=True,
                temperature=0.2 # Low temp for consistency
            )
            # call_gemini is expected to return a JSON string in json_mode;
            # json.loads failures fall through to the error handler below.
            return json.loads(response_text)
        except Exception as e:
            logger.error(f"Classification failed: {e}")
            return {"error": str(e)}

View File

@@ -0,0 +1,209 @@
import logging
import re
from collections import Counter
from typing import List, Tuple, Dict, Any, Optional
from sqlalchemy.orm import Session
from sqlalchemy import select
# External libs (must be in requirements.txt)
from thefuzz import fuzz
from ..database import Company
from ..lib.core_utils import clean_text, normalize_string
logger = logging.getLogger(__name__)

# --- Configuration (Ported from Legacy) ---
SCORE_THRESHOLD = 80          # Minimum similarity for a confident duplicate match
SCORE_THRESHOLD_WEAK = 95     # Stricter bar when neither domain nor location corroborates
MIN_NAME_FOR_DOMAIN = 70      # Retained from legacy; not referenced in this module — presumably used elsewhere
CITY_MISMATCH_PENALTY = 30    # Subtracted when both records have a city and they differ
COUNTRY_MISMATCH_PENALTY = 40 # Subtracted when both records have a country and they differ

# Legal forms and generic filler words stripped from company names before
# fuzzy comparison ("Acme Logistik GmbH" -> "acme").
# Fix: the original literal listed 'international' twice — harmless in a set,
# but misleading noise for maintainers; the duplicate is removed.
STOP_TOKENS_BASE = {
    'gmbh', 'mbh', 'ag', 'kg', 'ug', 'ohg', 'se', 'co', 'kgaa', 'inc', 'llc', 'ltd', 'sarl',
    'holding', 'gruppe', 'group', 'international', 'solutions', 'solution', 'service', 'services',
    'deutschland', 'austria', 'germany', 'technik', 'technology', 'technologies', 'systems', 'systeme',
    'logistik', 'logistics', 'industries', 'industrie', 'management', 'consulting', 'vertrieb', 'handel',
    'company', 'gesellschaft', 'mbh&co', 'mbhco', 'werke', 'werk'
}
# ==============================================================================
# Helpers
# ==============================================================================
def _tokenize(s: str) -> List[str]:
if not s: return []
return re.split(r"[^a-z0-9]+", str(s).lower())
def split_tokens(name: str) -> List[str]:
    """Tokenize a company name, keeping only tokens of length >= 3 that are
    not legal-form / filler stop words (see STOP_TOKENS_BASE).
    """
    if not name:
        return []
    long_enough = (tok for tok in _tokenize(name) if len(tok) >= 3)
    return [tok for tok in long_enough if tok not in STOP_TOKENS_BASE]
def clean_name_for_scoring(norm_name: str) -> Tuple[str, set]:
    """Return (space-joined token string, token set) for fuzzy name scoring."""
    tokens = split_tokens(norm_name)
    joined = " ".join(tokens)
    return joined, set(tokens)
# ==============================================================================
# Core Deduplication Logic
# ==============================================================================
class Deduplicator:
    """In-memory fuzzy duplicate detector for companies.

    Loads a slim projection of all companies once, builds blocking indexes
    (by normalized domain and by name token), and scores candidates with
    fuzzy string matching plus domain/location corroboration.
    """

    def __init__(self, db: Session) -> None:
        self.db = db
        self.reference_data = [] # Cache for DB records
        # normalized domain -> list of records sharing that domain
        self.domain_index = {}
        # token -> number of reference names containing it (for rarity ranking)
        self.token_freq = Counter()
        # token -> list of records whose name contains it (blocking index)
        self.token_index = {}
        self._load_reference_data()

    def _load_reference_data(self) -> None:
        """
        Loads minimal dataset from DB into RAM for fast fuzzy matching.
        Optimized for 10k-50k records.
        """
        logger.info("Loading reference data for deduplication...")
        # Only the columns needed for matching are selected, keeping RAM usage low.
        query = self.db.query(Company.id, Company.name, Company.website, Company.city, Company.country)
        companies = query.all()
        for c in companies:
            norm_name = normalize_string(c.name)
            norm_domain = normalize_string(c.website) # Simplified, should extract domain
            record = {
                'id': c.id,
                'name': c.name,
                'normalized_name': norm_name,
                'normalized_domain': norm_domain,
                'city': normalize_string(c.city),
                'country': normalize_string(c.country)
            }
            self.reference_data.append(record)
            # Build Indexes
            if norm_domain:
                self.domain_index.setdefault(norm_domain, []).append(record)
            # Token Frequency
            _, toks = clean_name_for_scoring(norm_name)
            for t in toks:
                self.token_freq[t] += 1
                self.token_index.setdefault(t, []).append(record)
        logger.info(f"Loaded {len(self.reference_data)} records for deduplication.")

    def _choose_rarest_token(self, norm_name: str) -> Optional[str]:
        """Pick the least-frequent (then longest) name token as the blocking key.

        Tokens unseen in the reference data sort first (frequency defaults to
        10**9 only for the tie-break ordering, i.e. rarest-first).
        """
        _, toks = clean_name_for_scoring(norm_name)
        if not toks: return None
        # Sort by frequency (asc) then length (desc)
        lst = sorted(list(toks), key=lambda x: (self.token_freq.get(x, 10**9), -len(x)))
        return lst[0] if lst else None

    def find_duplicates(self, candidate: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Checks a single candidate against the loaded index.
        Returns list of matches with score >= Threshold.

        The candidate dict may contain 'name', 'website', 'city', 'country'.
        Matches are returned sorted by descending score.
        """
        # Prepare Candidate
        c_norm_name = normalize_string(candidate.get('name', ''))
        c_norm_domain = normalize_string(candidate.get('website', ''))
        c_city = normalize_string(candidate.get('city', ''))
        c_country = normalize_string(candidate.get('country', ''))
        candidates_to_check = {} # Map ID -> Record
        # 1. Domain Match (Fastest)
        if c_norm_domain and c_norm_domain in self.domain_index:
            for r in self.domain_index[c_norm_domain]:
                candidates_to_check[r['id']] = r
        # 2. Rarest Token Match (Blocking) — only records sharing the rarest
        # name token are scored, keeping the comparison set small.
        rtok = self._choose_rarest_token(c_norm_name)
        if rtok and rtok in self.token_index:
            for r in self.token_index[rtok]:
                candidates_to_check[r['id']] = r
        if not candidates_to_check:
            return []
        # 3. Scoring
        matches = []
        for db_rec in candidates_to_check.values():
            score, details = self._calculate_similarity(
                cand={'n': c_norm_name, 'd': c_norm_domain, 'c': c_city, 'ct': c_country},
                ref=db_rec
            )
            # Threshold Logic (Weak vs Strong): without domain or location
            # corroboration a pure name match must clear the stricter bar.
            is_weak = (details['domain_match'] == 0 and not (details['loc_match']))
            threshold = SCORE_THRESHOLD_WEAK if is_weak else SCORE_THRESHOLD
            if score >= threshold:
                matches.append({
                    'company_id': db_rec['id'],
                    'name': db_rec['name'],
                    'score': score,
                    'details': details
                })
        matches.sort(key=lambda x: x['score'], reverse=True)
        return matches

    def _calculate_similarity(self, cand, ref):
        """Score one candidate/reference pair.

        Returns (score 0-100, detail dict with name_score/domain_match/
        loc_match/penalties). An exact normalized-name match short-circuits
        to 100 before any other signal is computed.
        """
        # Data Prep
        n1, n2 = cand['n'], ref['normalized_name']
        # Exact Name Shortcut
        if n1 and n1 == n2:
            return 100, {'exact': True, 'domain_match': 0, 'loc_match': 0}
        # Domain
        d1, d2 = cand['d'], ref['normalized_domain']
        domain_match = 1 if (d1 and d2 and d1 == d2) else 0
        # Location — a field only counts when present on BOTH sides.
        city_match = 1 if (cand['c'] and ref['city'] and cand['c'] == ref['city']) else 0
        country_match = 1 if (cand['ct'] and ref['country'] and cand['ct'] == ref['country']) else 0
        loc_match = city_match and country_match
        # Name Fuzzy Score — best of three thefuzz strategies.
        clean1, _ = clean_name_for_scoring(n1)
        clean2, _ = clean_name_for_scoring(n2)
        if clean1 and clean2:
            ts = fuzz.token_set_ratio(clean1, clean2)
            pr = fuzz.partial_ratio(clean1, clean2)
            ss = fuzz.token_sort_ratio(clean1, clean2)
            name_score = max(ts, pr, ss)
        else:
            name_score = 0
        # Penalties — applied only when both sides supplied the field and it differs.
        penalties = 0
        if cand['ct'] and ref['country'] and not country_match:
            penalties += COUNTRY_MISMATCH_PENALTY
        if cand['c'] and ref['city'] and not city_match:
            penalties += CITY_MISMATCH_PENALTY
        # Final Calc
        # Base weights: Domain is king (100), Name is mandatory (unless domain match)
        total = 0
        if domain_match:
            total = 100
        else:
            total = name_score
        if loc_match:
            total += 10 # Bonus
        total -= penalties
        # Capping
        total = min(100, max(0, total))
        return total, {
            'name_score': name_score,
            'domain_match': domain_match,
            'loc_match': loc_match,
            'penalties': penalties
        }

View File

@@ -0,0 +1,126 @@
import logging
import requests
import re
from typing import Optional, Dict, Tuple
from urllib.parse import urlparse
from ..config import settings
from ..lib.core_utils import retry_on_failure, normalize_string
logger = logging.getLogger(__name__)

# Domains to ignore when looking for official company homepage:
# social networks, business registers, job boards and other aggregators
# that frequently outrank the real corporate site in search results.
BLACKLIST_DOMAINS = {
    "linkedin.com", "xing.com", "facebook.com", "instagram.com", "twitter.com",
    "northdata.de", "northdata.com", "firmenwissen.de", "creditreform.de",
    "dnb.com", "kompass.com", "wer-zu-wem.de", "kununu.com", "glassdoor.com",
    "stepstone.de", "indeed.com", "monster.de", "youtube.com", "wikipedia.org"
}
class DiscoveryService:
    """Looks up official company homepages and Wikipedia articles via SerpAPI.

    All public lookup methods return a URL string, or the sentinel "k.A."
    ("keine Angabe" / not available) when nothing credible was found or the
    API key is missing.
    """

    def __init__(self):
        # Key may legitimately be absent; lookups then degrade to "k.A.".
        self.api_key = settings.SERP_API_KEY
        if not self.api_key:
            logger.warning("SERP_API_KEY not set. Discovery features will fail.")

    @retry_on_failure(max_retries=2)
    def find_company_website(self, company_name: str, city: Optional[str] = None) -> str:
        """
        Uses Google Search via SerpAPI to find the most likely official homepage.
        Returns "k.A." if nothing credible is found.
        """
        if not self.api_key:
            return "k.A."
        query = f"{company_name} offizielle Website"
        if city:
            query += f" {city}"
        logger.info(f"Searching website for: {query}")
        try:
            params = {
                "engine": "google",
                "q": query,
                "api_key": self.api_key,
                "num": 5,
                "gl": "de",  # geolocation: Germany
                "hl": "de"   # interface language: German
            }
            response = requests.get("https://serpapi.com/search", params=params, timeout=15)
            response.raise_for_status()
            data = response.json()
            if "organic_results" not in data:
                return "k.A."
            for result in data["organic_results"]:
                link = result.get("link", "")
                if self._is_credible_url(link):
                    # Simple heuristic: If the company name is part of the domain, high confidence
                    # Otherwise, take the first credible result.
                    return link
            return "k.A."
        except Exception as e:
            logger.error(f"SerpAPI Error: {e}")
            return "k.A."

    @retry_on_failure(max_retries=2)
    def find_wikipedia_url(self, company_name: str) -> str:
        """
        Searches for a specific German Wikipedia article.

        Returns the article URL only if the result title loosely matches the
        company name; otherwise "k.A.".
        """
        if not self.api_key:
            return "k.A."
        query = f"{company_name} Wikipedia"
        try:
            params = {
                "engine": "google",
                "q": query,
                "api_key": self.api_key,
                "num": 3,
                "gl": "de",
                "hl": "de"
            }
            response = requests.get("https://serpapi.com/search", params=params, timeout=15)
            response.raise_for_status()
            data = response.json()
            for result in data.get("organic_results", []):
                link = result.get("link", "")
                if "de.wikipedia.org/wiki/" in link:
                    # Basic validation: Is the title roughly the company?
                    title = result.get("title", "").replace(" Wikipedia", "")
                    if self._check_name_similarity(company_name, title):
                        return link
            return "k.A."
        except Exception as e:
            logger.error(f"Wiki Search Error: {e}")
            return "k.A."

    def _is_credible_url(self, url: str) -> bool:
        """Filters out social media, directories, and junk.

        True only when the URL's host is neither a blacklisted domain nor a
        subdomain of one (e.g. "de.linkedin.com").
        """
        if not url:
            return False
        try:
            domain = urlparse(url).netloc.lower()
            # Fix: strip only a *leading* "www." — the previous str.replace()
            # would also mangle hosts merely containing "www." elsewhere.
            if domain.startswith("www."):
                domain = domain[4:]
            for bad in BLACKLIST_DOMAINS:
                # Exact match or subdomain of a blacklisted domain.
                if domain == bad or domain.endswith("." + bad):
                    return False
            return True
        except Exception:
            # Fix: was a bare `except:` — keep the defensive fallback but stop
            # swallowing SystemExit/KeyboardInterrupt.
            return False

    def _check_name_similarity(self, name1: str, name2: str) -> bool:
        """Simple fuzzy check for validation.

        Very permissive: after normalization, one name must be contained in
        the other.
        """
        n1 = normalize_string(name1)
        n2 = normalize_string(name2)
        return n1 in n2 or n2 in n1

View File

@@ -0,0 +1,82 @@
import logging
import requests
import random
import re
from bs4 import BeautifulSoup
from typing import Optional, Dict
from ..lib.core_utils import clean_text, retry_on_failure
logger = logging.getLogger(__name__)

# Real-browser User-Agent strings; one is picked at random per request so
# scraping traffic looks less uniform (see ScraperService.scrape_url).
USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Safari/605.1.15',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0'
]
class ScraperService:
    """Fetches company web pages and distills them into clean text for LLM analysis."""

    def __init__(self, timeout: int = 15):
        # Per-request timeout in seconds.
        self.timeout = timeout

    @retry_on_failure(max_retries=2)
    def scrape_url(self, url: str) -> Dict[str, str]:
        """
        Fetches a URL and returns cleaned text content + meta info.

        Returns a dict with "title"/"description"/"text"/"emails" on success,
        or {"error": ...} on any failure (non-HTML content, network error, ...).
        """
        if not url.startswith("http"):
            url = "https://" + url
        try:
            headers = {'User-Agent': random.choice(USER_AGENTS)}
            # verify=False is risky but often needed for poorly configured corporate sites
            response = requests.get(url, headers=headers, timeout=self.timeout, verify=False)
            response.raise_for_status()
            # Check Content Type
            content_type = response.headers.get('Content-Type', '').lower()
            if 'text/html' not in content_type:
                logger.warning(f"Skipping non-HTML content for {url}: {content_type}")
                return {"error": "Not HTML"}
            return self._parse_html(response.content)
        except requests.exceptions.SSLError:
            # Retry once over plain HTTP if the TLS handshake fails.
            if url.startswith("https://"):
                logger.info(f"SSL failed for {url}, retrying with http://...")
                # Fix: replace only the scheme prefix; the old unbounded
                # str.replace() would also rewrite any later "https://"
                # occurrence embedded in the URL (e.g. in a query string).
                return self.scrape_url("http://" + url[len("https://"):])
            raise
        except Exception as e:
            logger.error(f"Scraping failed for {url}: {e}")
            return {"error": str(e)}

    def _parse_html(self, html_content: bytes) -> Dict[str, str]:
        """Strip boilerplate from raw HTML and extract title, description, text and e-mails."""
        soup = BeautifulSoup(html_content, 'html.parser')
        # 1. Cleanup Junk
        for element in soup(['script', 'style', 'noscript', 'iframe', 'svg', 'header', 'footer', 'nav', 'aside', 'form', 'button']):
            element.decompose()
        # 2. Extract Title & Meta Description
        # Fix: soup.title.string is None when <title> contains nested markup;
        # guard with "" so clean_text never receives None.
        title = (soup.title.string or "") if soup.title else ""
        meta_desc = ""
        meta_tag = soup.find('meta', attrs={'name': 'description'})
        if meta_tag:
            meta_desc = meta_tag.get('content', '')
        # 3. Extract Main Text
        # Prefer body, fallback to full soup
        body = soup.find('body')
        raw_text = body.get_text(separator=' ', strip=True) if body else soup.get_text(separator=' ', strip=True)
        cleaned_text = clean_text(raw_text)
        # 4. Extract Emails (Basic Regex)
        emails = set(re.findall(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', raw_text))
        return {
            "title": clean_text(title),
            "description": clean_text(meta_desc),
            "text": cleaned_text[:25000],  # Limit to avoid context overflow
            "emails": list(emails)[:5]  # Limit to 5
        }

View File

@@ -0,0 +1,103 @@
import os
import logging
from sqlalchemy.orm import Session
from ..database import Company
from ..interfaces import LeadData, TaskData, CRMRepository
from ..repositories.mock import MockRepository
from ..repositories.superoffice import SuperOfficeRepository
from ..config import settings
logger = logging.getLogger(__name__)  # Module-level logger for the CRM sync service
class CRMFactory:
    """Lazily builds and caches a single CRM repository for the whole process."""

    # Cached singleton; populated on first call to get_repository().
    _instance: CRMRepository = None

    @classmethod
    def get_repository(cls) -> CRMRepository:
        """Return the shared repository, creating it on first access.

        The backend is chosen via the CRM_TYPE environment variable:
        "SUPEROFFICE" selects the real CRM, anything else the mock.
        """
        if not cls._instance:
            cls._instance = cls._build_repository()
        return cls._instance

    @classmethod
    def _build_repository(cls) -> CRMRepository:
        """Instantiate the configured backend (called once)."""
        backend = os.getenv("CRM_TYPE", "MOCK").upper()
        if backend == "SUPEROFFICE":
            # Credentials come from the environment, never from code.
            tenant_id = os.getenv("SO_TENANT_ID", "")
            api_token = os.getenv("SO_API_TOKEN", "")
            logger.info("Initializing SuperOffice Repository...")
            return SuperOfficeRepository(tenant_id, api_token)
        logger.info("Initializing Mock Repository (Default)...")
        return MockRepository()
class SyncService:
    """Pushes locally enriched companies into the configured external CRM."""

    def __init__(self, db: Session) -> None:
        self.db = db
        # Shared repository (mock or SuperOffice) selected by CRMFactory.
        self.repo = CRMFactory.get_repository()

    def sync_company(self, company_id: int) -> dict:
        """
        Pushes a local company to the external CRM.

        Updates the CRM record if the company is already linked (or found by
        name), otherwise creates a new lead; links the CRM id back onto the
        local row and commits. Returns a result dict with keys
        status/action/crm/external_id, or {"error": ...} if the company is unknown.
        """
        local_company = self.db.query(Company).filter(Company.id == company_id).first()
        if not local_company:
            return {"error": "Company not found"}
        # 1. Map Data
        # Extract highest robotics potential score
        # (confidence is compared raw, then truncated to int for the payload).
        max_score = 0
        reason = ""
        for sig in local_company.signals:
            if sig.confidence > max_score:
                max_score = int(sig.confidence)
                reason = f"{sig.signal_type} ({sig.value})"
        lead_data = LeadData(
            name=local_company.name,
            website=local_company.website,
            city=local_company.city,
            country=local_company.country,
            industry=local_company.industry_ai, # We suggest our AI industry
            robotics_potential_score=max_score,
            robotics_potential_reason=reason
        )
        # 2. Check if already linked
        external_id = local_company.crm_id
        # 3. Check if exists in CRM (by name) if not linked yet
        if not external_id:
            external_id = self.repo.find_company(local_company.name)
        action = "none"
        if external_id:
            # Update
            success = self.repo.update_lead(external_id, lead_data)
            if success:
                action = "updated"
                # If we found it by search, link it locally
                if not local_company.crm_id:
                    local_company.crm_id = external_id
                self.db.commit()
            # NOTE(review): a failed update still returns status "success"
            # with action "none" — confirm this is intended.
        else:
            # Create
            new_id = self.repo.create_lead(lead_data)
            if new_id:
                action = "created"
                local_company.crm_id = new_id
                self.db.commit()
                # Create a task for the sales rep if high potential
                # (only on the create path — updates never create tasks).
                if max_score > 70:
                    self.repo.create_task(new_id, TaskData(
                        subject="🔥 Hot Robotics Lead",
                        description=f"AI detected high potential ({max_score}%). Reason: {reason}. Please check website."
                    ))
        return {
            "status": "success",
            "action": action,
            "crm": self.repo.get_name(),
            "external_id": local_company.crm_id
        }