This commit integrates the Roboplanet website contact form submissions into the Lead Engine, allowing them to be processed alongside TradingTwins leads.
Key changes:
- **Database Schema Update (db.py):** Added a new `source` column to the `leads` table for tracking lead origin (`TradingTwins` or `Website-Formular`). Includes a migration check to safely add the column if it is missing.
- **Improved HTML Parsing (ingest.py):** Refined the `parse_roboplanet_form` function to accurately extract data from the specific HTML structure of Roboplanet contact form emails.
- **Enhanced Ingestion Logic (trading_twins_ingest.py):**
- Renamed `fetch_tradingtwins_emails` to `fetch_new_leads_emails` and updated it to fetch emails from both lead sources.
- Modified `process_leads` to dynamically select the correct parser based on email subject.
- Ensured `source` field is correctly populated and `is_low_quality` checks are applied for both lead types.
- **UI Enhancement (app.py):** Updated the Streamlit UI to visually distinguish lead types with icons and improved the "Low Quality Lead" warning message.
This feature enables a unified processing pipeline for different lead sources and provides better visibility in the Lead Engine dashboard.
220 lines · 8.1 KiB · Python
import os
|
|
import sys
|
|
import re
|
|
import logging
|
|
import requests
|
|
import json
|
|
from datetime import datetime
|
|
from dotenv import load_dotenv
|
|
|
|
# Ensure we can import from root directory
|
|
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
|
|
|
# Import db functions and parsers
|
|
try:
|
|
from db import insert_lead, init_db
|
|
from ingest import parse_roboplanet_form, parse_tradingtwins_html, is_free_mail
|
|
except ImportError:
|
|
# Fallback for direct execution
|
|
sys.path.append(os.path.dirname(__file__))
|
|
from db import insert_lead, init_db
|
|
from ingest import parse_roboplanet_form, parse_tradingtwins_html, is_free_mail
|
|
|
|
# Configuration
|
|
env_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '.env'))
|
|
load_dotenv(dotenv_path=env_path, override=True)
|
|
|
|
CLIENT_ID = os.getenv("INFO_Application_ID")
|
|
TENANT_ID = os.getenv("INFO_Tenant_ID")
|
|
CLIENT_SECRET = os.getenv("INFO_Secret")
|
|
USER_EMAIL = "info@robo-planet.de"
|
|
|
|
# Setup logging
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def get_access_token():
|
|
url = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token"
|
|
data = {
|
|
"client_id": CLIENT_ID,
|
|
"scope": "https://graph.microsoft.com/.default",
|
|
"client_secret": CLIENT_SECRET,
|
|
"grant_type": "client_credentials"
|
|
}
|
|
response = requests.post(url, data=data)
|
|
response.raise_for_status()
|
|
return response.json().get("access_token")
|
|
|
|
def fetch_new_leads_emails(token, limit=200):
|
|
url = f"https://graph.microsoft.com/v1.0/users/{USER_EMAIL}/messages"
|
|
headers = {
|
|
"Authorization": f"Bearer {token}",
|
|
"Content-Type": "application/json"
|
|
}
|
|
|
|
params = {
|
|
"$top": limit,
|
|
"$select": "id,subject,receivedDateTime,body",
|
|
"$orderby": "receivedDateTime desc"
|
|
}
|
|
|
|
response = requests.get(url, headers=headers, params=params)
|
|
if response.status_code != 200:
|
|
logger.error(f"Graph API Error: {response.status_code} - {response.text}")
|
|
return []
|
|
|
|
all_msgs = response.json().get("value", [])
|
|
|
|
# Filter client-side for both TradingTwins and Roboplanet contact forms
|
|
filtered = [m for m in all_msgs if (
|
|
"Neue Anfrage zum Thema Roboter" in (m.get('subject') or '') or
|
|
"Kontaktformular Roboplanet" in (m.get('subject') or '')
|
|
)]
|
|
return filtered
|
|
|
|
def is_free_mail(email_addr):
|
|
"""Checks if an email belongs to a known free-mail provider."""
|
|
if not email_addr: return False
|
|
free_domains = {
|
|
'gmail.com', 'googlemail.com', 'outlook.com', 'hotmail.com', 'live.com',
|
|
'msn.com', 'icloud.com', 'me.com', 'mac.com', 'yahoo.com', 'ymail.com',
|
|
'rocketmail.com', 'gmx.de', 'gmx.net', 'web.de', 't-online.de',
|
|
'freenet.de', 'mail.com', 'protonmail.com', 'proton.me', 'online.de'
|
|
}
|
|
domain = email_addr.split('@')[-1].lower()
|
|
return domain in free_domains
|
|
|
|
def parse_tradingtwins_html(html_body):
|
|
"""
|
|
Extracts data from the Tradingtwins HTML table structure.
|
|
Pattern: <p ...>Label:</p>...<p ...>Value</p>
|
|
"""
|
|
data = {}
|
|
|
|
# Map label names in HTML to our keys
|
|
field_map = {
|
|
'Firma': 'company',
|
|
'Vorname': 'contact_first',
|
|
'Nachname': 'contact_last',
|
|
'Anrede': 'salutation',
|
|
'E-Mail': 'email',
|
|
'Rufnummer': 'phone',
|
|
'Einsatzzweck': 'purpose',
|
|
'Reinigungs-Funktionen': 'cleaning_functions',
|
|
'Reinigungs-Fläche': 'area',
|
|
'PLZ': 'zip',
|
|
'Stadt': 'city',
|
|
'Lead-ID': 'source_id'
|
|
}
|
|
|
|
for label, key in field_map.items():
|
|
pattern = fr'>\s*{re.escape(label)}:\s*</p>.*?<p[^>]*>(.*?)</p>'
|
|
match = re.search(pattern, html_body, re.DOTALL | re.IGNORECASE)
|
|
if match:
|
|
raw_val = match.group(1).strip()
|
|
clean_val = re.sub(r'<[^>]+>', '', raw_val).strip()
|
|
data[key] = clean_val
|
|
|
|
# Composite fields
|
|
if data.get('contact_first') and data.get('contact_last'):
|
|
data['contact'] = f"{data['contact_first']} {data['contact_last']}"
|
|
|
|
# Quality Check: Free mail or missing company
|
|
email = data.get('email', '')
|
|
company = data.get('company', '-')
|
|
|
|
data['is_free_mail'] = is_free_mail(email)
|
|
data['is_low_quality'] = data['is_free_mail'] or company == '-' or not company
|
|
|
|
# Ensure source_id is present and map to 'id' for db.py compatibility
|
|
if not data.get('source_id'):
|
|
data['source_id'] = f"tt_unknown_{int(datetime.now().timestamp())}"
|
|
|
|
data['id'] = data['source_id'] # db.py expects 'id' for source_id column
|
|
|
|
return data
|
|
|
|
def process_leads(auto_sync=False):
|
|
init_db()
|
|
new_count = 0
|
|
try:
|
|
token = get_access_token()
|
|
emails = fetch_new_leads_emails(token) # Use the new function
|
|
logger.info(f"Found {len(emails)} potential lead emails.")
|
|
|
|
for email in emails:
|
|
subject = email.get('subject') or ''
|
|
body = email.get('body', {}).get('content', '')
|
|
received_at_str = email.get('receivedDateTime')
|
|
|
|
received_at = None
|
|
if received_at_str:
|
|
try:
|
|
received_at = datetime.fromisoformat(received_at_str.replace('Z', '+00:00'))
|
|
except:
|
|
pass
|
|
|
|
lead_data = {}
|
|
source_prefix = "unknown"
|
|
source_display_name = "Unknown"
|
|
|
|
if "Neue Anfrage zum Thema Roboter" in subject:
|
|
lead_data = parse_tradingtwins_html(body)
|
|
source_prefix = "tt"
|
|
source_display_name = "TradingTwins"
|
|
elif "Kontaktformular Roboplanet" in subject:
|
|
lead_data = parse_roboplanet_form(body)
|
|
source_prefix = "rp"
|
|
source_display_name = "Website-Formular"
|
|
else:
|
|
# Should not happen with current filtering, but good for robustness
|
|
logger.warning(f"Skipping unknown email type: {subject}")
|
|
continue
|
|
|
|
lead_data['source'] = source_display_name # Add the new source field for the DB
|
|
lead_data['raw_body'] = body
|
|
lead_data['received_at'] = received_at
|
|
|
|
# Apply general quality checks (if not already done by parser)
|
|
if 'is_free_mail' not in lead_data:
|
|
lead_data['is_free_mail'] = is_free_mail(lead_data.get('email', ''))
|
|
if 'is_low_quality' not in lead_data:
|
|
company_name_check = lead_data.get('company', '')
|
|
# Consider company name '-' as missing/invalid
|
|
if company_name_check == '-': company_name_check = ''
|
|
lead_data['is_low_quality'] = lead_data['is_free_mail'] or not company_name_check
|
|
|
|
company_name = lead_data.get('company')
|
|
if not company_name or company_name == '-':
|
|
# Fallback: if company name is missing, use contact name as company
|
|
company_name = lead_data.get('contact')
|
|
lead_data['company'] = company_name
|
|
|
|
if not company_name:
|
|
logger.warning(f"Skipping lead due to missing company and contact name: {subject}")
|
|
continue
|
|
|
|
# Ensure source_id and 'id' for db.py compatibility
|
|
if not lead_data.get('source_id'):
|
|
lead_data['source_id'] = f"{source_prefix}_unknown_{int(datetime.now().timestamp())}"
|
|
lead_data['id'] = lead_data['source_id'] # db.py expects 'id' for source_id column
|
|
|
|
if insert_lead(lead_data):
|
|
logger.info(f" -> Ingested ({source_prefix}): {company_name}")
|
|
new_count += 1
|
|
|
|
if new_count > 0 and auto_sync:
|
|
logger.info(f"Triggering auto-sync for {new_count} new leads...")
|
|
from enrich import run_sync
|
|
run_sync()
|
|
|
|
return new_count
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in process_leads: {e}")
|
|
return 0
|
|
|
|
if __name__ == "__main__":
|
|
count = process_leads()
|
|
print(f"Ingested {count} new leads.")
|