# File: lead-engine-mvp/enrich.py
# (108 lines, 3.4 KiB, Python)

import json
import requests
import os
from requests.auth import HTTPBasicAuth
from db import update_lead_status, get_leads, DB_PATH
import sqlite3
# Company Explorer API connection settings. Each value can be overridden
# through the environment variable of the same name.
CE_API_URL = os.environ.get("CE_API_URL", "http://192.168.178.6:8090/ce/api")
CE_API_USER = os.environ.get("CE_API_USER", "admin")
# An empty password makes query_company_explorer skip the API call.
CE_API_PASS = os.environ.get("CE_API_PASS", "")
def query_company_explorer(company_name):
    """Look up a company in the Company Explorer API.

    Sends a GET request to ``{CE_API_URL}/companies/search`` with the company
    name as the ``q`` parameter, using HTTP basic auth and a 5 second timeout.

    Returns the first entry of the response when the API answers with status
    200 and a non-empty JSON list. Returns ``None`` in every other case: no
    password configured, authentication failure, any other status code, an
    empty or non-list payload, or any exception raised during the request.
    """
    if not CE_API_PASS:
        print("⚠️ Warning: No CE_API_PASS set. Skipping API call.")
        return None

    best_match = None
    try:
        # Example endpoint: company search. We assume a /search endpoint
        # exists on the API.
        reply = requests.get(
            f"{CE_API_URL}/companies/search",
            params={"q": company_name},
            auth=HTTPBasicAuth(CE_API_USER, CE_API_PASS),
            timeout=5,
        )
        status = reply.status_code
        if status == 200:
            matches = reply.json()
            if matches and isinstance(matches, list):
                # The first list entry is treated as the best match.
                best_match = matches[0]
        elif status == 401:
            print("❌ API Auth failed (401). Check password.")
    except Exception as err:
        # Best effort: any failure is reported and the caller falls back.
        print(f"❌ API Error: {err}")
    return best_match
def get_mock_enrichment(company_name, raw_body):
    """Build a keyword-based enrichment record as a fallback for the API.

    The company name is lower-cased and checked against two keyword rules.
    The rules are applied in order and are not exclusive: when a name matches
    both, the values of the later (retail) rule overwrite the earlier ones.

    Args:
        company_name: Company name to classify. ``None`` or an empty string
            is treated as "no keyword matched".
        raw_body: Raw lead text. Currently unused; kept so existing callers
            do not have to change.

    Returns:
        A dict with ``vertical``, ``score`` and ``recommendation``. The key
        ``area_estimate`` is present only when a keyword rule matched.
    """
    enrichment = {
        'vertical': 'Unknown',
        'score': 0,
        'recommendation': 'Manual Review (Mock Data)',
    }

    # Guard against a missing name (None) so the fallback path does not
    # raise AttributeError on .lower().
    name_lower = (company_name or '').lower()

    if 'küche' in name_lower or 'einbau' in name_lower:
        enrichment['vertical'] = 'Manufacturing / Woodworking'
        enrichment['score'] = 85
        enrichment['recommendation'] = 'High Priority - Pitch Dust Control'
        enrichment['area_estimate'] = '> 1000m²'

    # Deliberately a separate `if`, not `elif`: a name matching both rules
    # ends up with the retail values, as before.
    if 'shop' in name_lower or 'roller' in name_lower:
        enrichment['vertical'] = 'Retail / Automotive'
        enrichment['score'] = 60
        enrichment['recommendation'] = 'Medium Priority - Pitch Showroom Cleanliness'
        enrichment['area_estimate'] = '300-500m²'

    return enrichment
def enrich_lead(lead):
    """Enrich a single lead and store the result in the ``leads`` table.

    The Company Explorer API is tried first. When it returns nothing, the
    keyword-based mock enrichment is used instead. The lead's row is then
    updated with the JSON-encoded enrichment and its status is set to
    ``'enriched'``.

    Args:
        lead: Mapping-like row that provides ``id``, ``company_name`` and
            ``raw_body``.

    Raises:
        sqlite3.Error: If the database update fails. The connection is
            closed before the error propagates.
    """
    print(f"🔍 Enriching: {lead['company_name']}...")

    # First attempt: the real API.
    api_data = query_company_explorer(lead['company_name'])

    if api_data:
        print("✅ API Hit!")
        # Map the API payload onto our enrichment format.
        enrichment = {
            'vertical': api_data.get('industry', 'Unknown'),
            # Assumption: the API supplies a score; 50 is used otherwise.
            'score': api_data.get('score', 50),
            'recommendation': 'Data from Company Explorer',
            'details': api_data,
        }
    else:
        print("⚠️ API Miss/Fail -> Using Mock Fallback")
        enrichment = get_mock_enrichment(lead['company_name'], lead['raw_body'])

    # Serialise before opening the connection so a serialisation error
    # never leaves a connection behind.
    payload = json.dumps(enrichment)

    # Update the DB. try/finally guarantees the connection is closed even
    # when the UPDATE or the commit raises.
    conn = sqlite3.connect(DB_PATH)
    try:
        c = conn.cursor()
        c.execute(
            'UPDATE leads SET enrichment_data = ?, status = ? WHERE id = ?',
            (payload, 'enriched', lead['id']),
        )
        conn.commit()
    finally:
        conn.close()
def run_enrichment():
    """Enrich every lead whose status is ``'new'`` and print how many were processed."""
    enriched_total = 0
    # Lazy filter: each lead's status is checked right before it is
    # enriched, one lead at a time.
    pending = (row for row in get_leads() if row['status'] == 'new')
    for row in pending:
        enrich_lead(row)
        enriched_total += 1
    print(f"Done. Enriched {enriched_total} leads.")
# Script entry point: run the enrichment only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    run_enrichment()