"""Sync leads with the Company Explorer (CE) API: match or create companies, trigger discovery and collect the results."""
import json
|
|
import requests
|
|
import os
|
|
import time
|
|
from requests.auth import HTTPBasicAuth
|
|
from db import update_lead_status, get_leads, DB_PATH
|
|
import sqlite3
|
|
|
|
# --- Configuration ---
# Connection settings for the Company Explorer (CE) API. Each value can be
# overridden through the environment variable of the same name.
# NOTE(review): the fallbacks contain a private-network address and default
# credentials; consider requiring the variables outside of local development.
CE_API_URL = os.environ.get("CE_API_URL", "http://192.168.178.6:8090/ce/api")
CE_API_USER = os.environ.get("CE_API_USER", "admin")
CE_API_PASS = os.environ.get("CE_API_PASS", "gemini")
|
|
|
|
def get_auth():
    """Build the HTTP Basic auth object for CE API requests from the configured user and password."""
    credentials = HTTPBasicAuth(CE_API_USER, CE_API_PASS)
    return credentials
|
|
|
|
# 1. CORE LOGIC: Interaction with Company Explorer
|
|
|
|
def find_company(name):
    """Look up a company in the Company Explorer (CE) by name.

    Sends the name as the ``search`` parameter with ``limit=1`` and returns
    the first entry of the ``items`` list in the response.

    Args:
        name: Company name to search for.

    Returns:
        The first matching company (as decoded from JSON), or None when
        nothing matched. None is also returned when the request fails, the
        status is not 200 or the body is not usable; those cases are logged
        so they can be told apart from a real "no match" in the output.
    """
    try:
        res = requests.get(
            f"{CE_API_URL}/companies",
            params={"search": name, "limit": 1},
            auth=get_auth(),
            timeout=5,
        )
        if res.status_code != 200:
            # Previously dropped silently, which made an API error look
            # exactly like "company does not exist".
            print(f"❌ Find Failed: {res.status_code} {res.text}")
            return None
        data = res.json()
    except (requests.RequestException, ValueError) as e:
        # Transport problems and undecodable JSON bodies end up here.
        print(f"❌ CE API Error (Find '{name}'): {e}")
        return None

    # Only trust the documented shape: an object with a non-empty "items" list.
    items = data.get("items") if isinstance(data, dict) else None
    if isinstance(items, list) and items:
        return items[0]  # first match
    return None
|
|
|
|
def create_company(lead_data):
    """Create a company record in the Company Explorer (CE).

    Args:
        lead_data: Mapping with a required 'company_name' entry and an
            optional 'city' entry (defaults to an empty string).

    Returns:
        The decoded JSON body of the CE response on success, otherwise None
        (the failure is logged).

    Raises:
        KeyError: If ``lead_data`` has no 'company_name' entry.
    """
    company_name = lead_data['company_name']
    payload = {
        "name": company_name,
        "city": lead_data.get('city', ''),
        "country": "DE",   # country is fixed to DE for every created record
        "website": None,   # sent empty; callers trigger discovery afterwards
    }
    try:
        res = requests.post(
            f"{CE_API_URL}/companies",
            json=payload,
            auth=get_auth(),
            timeout=5,
        )
        # 201 Created is the standard success code for resource creation;
        # accept it alongside 200 so a successful create is not reported as
        # a failure.
        if res.status_code in (200, 201):
            return res.json()
        print(f"❌ Create Failed: {res.status_code} {res.text}")
    except (requests.RequestException, ValueError) as e:
        # Transport problems and undecodable JSON bodies end up here.
        print(f"❌ CE API Error (Create '{company_name}'): {e}")
    return None
|
|
|
|
def trigger_discovery(company_id):
    """Start the automatic website discovery for a company in the CE.

    Args:
        company_id: CE identifier of the company to enrich.

    Returns:
        True when the CE answered with a 2xx status, False otherwise
        (rejections and transport errors are logged).
    """
    try:
        res = requests.post(
            f"{CE_API_URL}/enrich/discover",
            json={"company_id": company_id},
            auth=get_auth(),
            timeout=5,
        )
    except requests.RequestException as e:
        print(f"❌ Trigger Discovery Failed (ID {company_id}): {e}")
        return False

    # Any 2xx status means the request was accepted (e.g. 202 for queued
    # work); only the status is inspected, the body is never read.
    if 200 <= res.status_code < 300:
        return True
    print(f"❌ Trigger Discovery Failed (ID {company_id}): {res.status_code} {res.text}")
    return False
|
|
|
|
def get_ce_status(company_id):
    """Fetch the current record of a company from the CE.

    Args:
        company_id: CE identifier of the company.

    Returns:
        The decoded JSON body on a 200 response, otherwise None (the failure
        is logged).
    """
    try:
        res = requests.get(
            f"{CE_API_URL}/companies/{company_id}",
            auth=get_auth(),
            timeout=5,
        )
        if res.status_code == 200:
            return res.json()
        # Previously dropped silently; log it so a failing lookup is visible.
        print(f"❌ Get Status Failed (ID {company_id}): {res.status_code} {res.text}")
    except (requests.RequestException, ValueError) as e:
        # Transport problems and undecodable JSON bodies end up here.
        print(f"❌ Get Status Failed (ID {company_id}): {e}")
    return None
|
|
|
|
def get_pitch(company_id):
    """Try to obtain a pitch text for a company.

    There is no dedicated pitch endpoint in use yet: the company record is
    fetched and a simple template is filled with its 'industry_ai' field.

    Returns:
        The pitch string, or None when the company could not be fetched or
        has no 'industry_ai' value.
    """
    # TODO: Implement real endpoint once CE has it (e.g. /companies/{id}/analysis/pitch)
    record = get_ce_status(company_id)
    if not record:
        return None

    industry = record.get('industry_ai')
    if not industry:
        return None

    # Simple industry-based template
    return f"Wir haben gesehen, dass Sie im Bereich {industry} tätig sind. Für {industry} haben wir spezielle Roboter-Lösungen..."
|
|
|
|
# 2. ORCHESTRATION: The Sync Process
|
|
|
|
def _extract_city(raw_body):
    """Return the text following the first 'Stadt:' marker in raw_body, or '' if there is none."""
    if not raw_body:
        return ""
    for line in raw_body.split('\n'):
        if "Stadt:" in line:
            return line.split("Stadt:")[1].strip()
    return ""


def process_new_lead(lead):
    """Phase 1: identify the lead's company in the CE, creating it if needed.

    Looks the company up by name. An existing company is linked (and
    discovery is triggered when its CE status is 'NEW'); otherwise the
    company is created and discovery is triggered. The outcome is stored in
    the lead's enrichment data.

    The lead is marked 'synced' only when a CE id was obtained. If creation
    fails the lead is stored with status 'new' so the next sync cycle retries
    it; marking it 'synced' without a CE id would make phase 2 skip it
    forever.

    Args:
        lead: Mapping-like lead row providing 'id', 'company_name' and
            'raw_body'.
    """
    print(f"\n⚙️ [Phase 1] Syncing Lead: {lead['company_name']} ({lead['id']})")

    ce_company = find_company(lead['company_name'])

    enrichment_info = {
        "ce_status": "unknown",
        "ce_id": None,
        "message": "",
        "last_check": time.time(),
    }
    next_status = 'synced'

    if ce_company:
        print(f"✅ Company exists in CE (ID: {ce_company['id']})")
        enrichment_info["ce_status"] = "linked"  # linked, but maybe not enriched yet
        enrichment_info["ce_id"] = ce_company['id']
        enrichment_info["ce_data"] = ce_company
        enrichment_info["message"] = f"Matched existing account (Status: {ce_company.get('status')})."

        # A raw (not yet enriched) account still needs discovery.
        if ce_company.get('status') == 'NEW':
            # Report what actually happened instead of always claiming success.
            if trigger_discovery(ce_company['id']):
                enrichment_info["message"] += " Triggered Discovery for existing raw account."
            else:
                enrichment_info["message"] += " Discovery trigger for existing raw account failed."
    else:
        print("✨ New Company. Creating in CE...")
        new_company = create_company({
            "company_name": lead['company_name'],
            "city": _extract_city(lead['raw_body']),
        })

        if new_company:
            print(f"✅ Created (ID: {new_company['id']}).")
            enrichment_info["ce_status"] = "created"
            enrichment_info["ce_id"] = new_company['id']

            if trigger_discovery(new_company['id']):
                enrichment_info["message"] = "Created & Discovery started."
                print("🚀 Discovery queued.")
            else:
                enrichment_info["message"] = "Created, but Discovery trigger failed."
        else:
            enrichment_info["message"] = "Failed to create in CE."
            # No CE id: keep the lead in phase 1 so it is retried.
            next_status = 'new'

    # Save state
    update_lead_enrichment(lead['id'], enrichment_info, status=next_status)
|
|
|
|
def process_synced_lead(lead):
    """Phase 2: close the loop by collecting the CE results for a synced lead.

    Reads the CE id from the lead's stored enrichment data, fetches the
    company and, once its CE status is 'ENRICHED' and a pitch is available,
    stores pitch and company data and moves the lead to 'drafted'.

    Leads whose enrichment data is missing, not valid JSON or not a JSON
    object are skipped with a warning instead of raising, so one bad row
    cannot abort the whole sync run.

    Args:
        lead: Mapping-like lead row providing 'id', 'company_name' and
            'enrichment_data' (a JSON string).
    """
    try:
        enrichment = json.loads(lead['enrichment_data'])
    except (ValueError, TypeError):
        # TypeError: value is None / not a string; ValueError: invalid JSON.
        enrichment = None
    if not isinstance(enrichment, dict):
        print(f"⚠️ Lead {lead['id']}: enrichment data missing or malformed, skipping.")
        return

    ce_id = enrichment.get('ce_id')
    if not ce_id:
        return  # should not happen if status is synced

    print(f"\n🔄 [Phase 2] Checking Enrichment for: {lead['company_name']} (CE ID: {ce_id})")

    company = get_ce_status(ce_id)
    if not company:
        return

    # Check whether the CE is done (values seen: NEW, DISCOVERING, ENRICHED...)
    ce_status = company.get('status')
    if ce_status != 'ENRICHED':
        print(f"⏳ Still processing in CE (Status: {ce_status})...")
        return

    print("✅ Analysis Complete! Fetching Pitch...")

    # Get pitch (mocked or real)
    pitch = get_pitch(ce_id)
    if not pitch:
        print("⚠️ Enriched, but no pitch generated yet.")
        return

    enrichment['pitch'] = pitch
    enrichment['ce_data'] = company  # update with latest data
    enrichment['message'] = "Enrichment complete. Pitch ready."

    # Save final state; 'drafted' means ready for human review
    update_lead_enrichment(lead['id'], enrichment, status='drafted')
    print("🎉 Lead is ready for response!")
|
|
|
|
def update_lead_enrichment(lead_id, data, status=None):
    """Store enrichment data (and optionally a new status) for a lead.

    Args:
        lead_id: Value of the ``id`` column of the lead row to update.
        data: JSON-serialisable object written to ``enrichment_data``.
        status: New value for the ``status`` column; when falsy the status
            column is left untouched.
    """
    # Serialise first so a non-serialisable payload fails before a
    # connection is opened.
    payload = json.dumps(data)

    conn = sqlite3.connect(DB_PATH)
    try:
        # The connection context manager commits on success and rolls back
        # if the statement raises.
        with conn:
            if status:
                conn.execute(
                    'UPDATE leads SET enrichment_data = ?, status = ? WHERE id = ?',
                    (payload, status, lead_id),
                )
            else:
                conn.execute(
                    'UPDATE leads SET enrichment_data = ? WHERE id = ?',
                    (payload, lead_id),
                )
    finally:
        # Always release the connection, even when the update fails.
        conn.close()
|
|
|
|
def run_sync():
    """Run one sync cycle over all leads.

    Leads with status 'new', with no status, or carrying mock enrichment
    data go through phase 1 (``process_new_lead``); leads with status
    'synced' go through phase 2 (``process_synced_lead``). All other leads
    are skipped.
    """
    leads = get_leads()
    print(f"Scanning {len(leads)} leads...")

    count = 0
    for lead in leads:
        status = lead['status']

        # Fallback for old/mock leads: enrichment data written by the mock
        # step is recognised by its 'recommendation' marker.
        is_mock = False
        if lead['enrichment_data']:
            try:
                enrichment = json.loads(lead['enrichment_data'])
            except (ValueError, TypeError):
                # Unreadable enrichment data simply means "not mock"; only
                # decoding errors are swallowed, not e.g. KeyboardInterrupt.
                enrichment = None
            if isinstance(enrichment, dict):
                is_mock = enrichment.get('recommendation') == 'Manual Review (Mock Data)'

        if status == 'new' or not status or is_mock:
            process_new_lead(lead)
            count += 1
        elif status == 'synced':
            process_synced_lead(lead)
            count += 1

    print(f"Sync cycle finished. Processed {count} leads.")
|
|
|
|
# Script entry point: run a single sync cycle when executed directly.
if __name__ == "__main__":
    run_sync()
|