From dddc92a6c3e38b1b0565e9defe6e4f10f1c38bb4 Mon Sep 17 00:00:00 2001
From: Floke
Date: Mon, 2 Mar 2026 07:58:11 +0000
Subject: [PATCH] [31388f42] Fix ingest: Increase limit to 200, handle
None-Subject, client-side filter
---
lead-engine/trading_twins_ingest.py | 90 ++++++++++++++++-------------
1 file changed, 51 insertions(+), 39 deletions(-)
diff --git a/lead-engine/trading_twins_ingest.py b/lead-engine/trading_twins_ingest.py
index 6e6e5987..ea581fd3 100644
--- a/lead-engine/trading_twins_ingest.py
+++ b/lead-engine/trading_twins_ingest.py
@@ -10,16 +10,18 @@ from dotenv import load_dotenv
# Ensure we can import from root directory
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
-# Import from root modules
+# Import db functions
try:
- from company_explorer_connector import handle_company_workflow
+ from db import insert_lead, init_db
except ImportError:
- # Fallback/Mock for testing if run in isolation without full env
- def handle_company_workflow(company_name):
- return {"status": "mock", "data": {"name": company_name, "id": "mock-id"}}
+ # Fallback for direct execution
+ sys.path.append(os.path.dirname(__file__))
+ from db import insert_lead, init_db
# Configuration
-load_dotenv(override=True)
+env_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '.env'))
+load_dotenv(dotenv_path=env_path, override=True)
+
CLIENT_ID = os.getenv("INFO_Application_ID")
TENANT_ID = os.getenv("INFO_Tenant_ID")
CLIENT_SECRET = os.getenv("INFO_Secret")
@@ -41,26 +43,32 @@ def get_access_token():
response.raise_for_status()
return response.json().get("access_token")
-def fetch_tradingtwins_emails(token, limit=20):
+def fetch_tradingtwins_emails(token, limit=200):
url = f"https://graph.microsoft.com/v1.0/users/{USER_EMAIL}/messages"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
- # Filter for Tradingtwins subject
+
+ # Graph API restriction: 'contains' on subject is often blocked.
+ # Strategy: Fetch metadata + body for last 200 messages and filter client-side.
params = {
"$top": limit,
"$select": "id,subject,receivedDateTime,body",
"$orderby": "receivedDateTime desc"
}
+
response = requests.get(url, headers=headers, params=params)
if response.status_code != 200:
logger.error(f"Graph API Error: {response.status_code} - {response.text}")
return []
all_msgs = response.json().get("value", [])
- # Filter strictly for the subject pattern we saw
- return [m for m in all_msgs if "Neue Anfrage zum Thema Roboter" in m.get('subject', '')]
+
+ # Filter strictly for the subject pattern locally
+ # Handle case where subject might be None
+ filtered = [m for m in all_msgs if "Neue Anfrage zum Thema Roboter" in (m.get('subject') or '')]
+ return filtered
def parse_tradingtwins_html(html_body):
"""
@@ -72,41 +80,40 @@ def parse_tradingtwins_html(html_body):
# Map label names in HTML to our keys
field_map = {
'Firma': 'company',
- 'Vorname': 'first_name',
- 'Nachname': 'last_name',
+ 'Vorname': 'contact_first', # Key fixed to match ingest.py logic
+ 'Nachname': 'contact_last', # Key fixed to match ingest.py logic
'E-Mail': 'email',
'Rufnummer': 'phone',
- 'Einsatzzweck': 'purpose',
- 'Reinigungs-Fläche': 'area',
+ 'Einsatzzweck': 'purpose', # Specific field
+ 'Reinigungs-Fläche': 'area', # Specific field
'PLZ': 'zip',
'Stadt': 'city',
- 'Lead-ID': 'lead_id'
+ 'Lead-ID': 'source_id' # Mapped to DB column source_id
}
for label, key in field_map.items():
- # Regex explanation:
- # >\s*{label}:\s*</p> -> Finds the label inside a p tag, ending with colon
- # .*? -> Non-greedy match for table cell closing/opening
- # <p[^>]*> -> Finds the start of the value paragraph
- # (.*?) -> Captures the value
- # </p> -> Ends at closing paragraph tag
pattern = fr'>\s*{re.escape(label)}:\s*</p>.*?<p[^>]*>(.*?)</p>'
-
match = re.search(pattern, html_body, re.DOTALL | re.IGNORECASE)
if match:
- # Clean up the value (remove HTML tags inside if any, though usually plain text)
raw_val = match.group(1).strip()
- # Remove any link tags if present (e.g. for email/phone)
clean_val = re.sub(r'<[^>]+>', '', raw_val).strip()
data[key] = clean_val
# Composite fields
- if data.get('first_name') and data.get('last_name'):
- data['contact_name'] = f"{data['first_name']} {data['last_name']}"
-
+ if data.get('contact_first') and data.get('contact_last'):
+ data['contact'] = f"{data['contact_first']} {data['contact_last']}"
+
+ # Ensure source_id is present and map to 'id' for db.py compatibility
+ if not data.get('source_id'):
+ data['source_id'] = f"tt_unknown_{int(datetime.now().timestamp())}"
+
+ data['id'] = data['source_id'] # db.py expects 'id' for source_id column
+
return data
def process_leads():
+ init_db()
+ new_count = 0
try:
token = get_access_token()
emails = fetch_tradingtwins_emails(token)
@@ -116,28 +123,33 @@ def process_leads():
body = email.get('body', {}).get('content', '')
lead_data = parse_tradingtwins_html(body)
+ # Add raw body for reference
+ lead_data['raw_body'] = body
+
company_name = lead_data.get('company')
if not company_name or company_name == '-':
- # Fallback if company is empty (sometimes happens with private persons)
- # Use contact name as company name
- company_name = lead_data.get('contact_name')
+ company_name = lead_data.get('contact')
+ lead_data['company'] = company_name
if not company_name:
logger.warning(f"Skipping email {email['id']}: No company or contact name found.")
continue
- logger.info(f"Processing Lead: {company_name} (ID: {lead_data.get('lead_id')})")
+ logger.info(f"Ingesting Lead: {company_name} (ID: {lead_data.get('id')})")
- # Trigger Company Explorer Workflow
- # Note: In a real scenario, we might want to check if we already processed this message ID
- # to avoid duplicates. For now, we rely on the Company Explorer's deduplication.
- logger.info(f" -> Triggering Company Explorer for '{company_name}'...")
- result = handle_company_workflow(company_name)
-
- logger.info(f" -> Result: {result.get('status')} (ID: {result.get('data', {}).get('id')})")
+ # Save to local DB (status=new)
+ if insert_lead(lead_data):
+ logger.info(f" -> Successfully saved to DB.")
+ new_count += 1
+ else:
+ logger.info(f" -> Lead already exists (skipped).")
+
+ return new_count
except Exception as e:
logger.error(f"Error in process_leads: {e}")
+ return 0
if __name__ == "__main__":
- process_leads()
+ count = process_leads()
+ print(f"Ingested {count} new leads.")