diff --git a/lead-engine/trading_twins_ingest.py b/lead-engine/trading_twins_ingest.py index 6e6e5987..ea581fd3 100644 --- a/lead-engine/trading_twins_ingest.py +++ b/lead-engine/trading_twins_ingest.py @@ -10,16 +10,18 @@ from dotenv import load_dotenv # Ensure we can import from root directory sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) -# Import from root modules +# Import db functions try: - from company_explorer_connector import handle_company_workflow + from db import insert_lead, init_db except ImportError: - # Fallback/Mock for testing if run in isolation without full env - def handle_company_workflow(company_name): - return {"status": "mock", "data": {"name": company_name, "id": "mock-id"}} + # Fallback for direct execution + sys.path.append(os.path.dirname(__file__)) + from db import insert_lead, init_db # Configuration -load_dotenv(override=True) +env_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '.env')) +load_dotenv(dotenv_path=env_path, override=True) + CLIENT_ID = os.getenv("INFO_Application_ID") TENANT_ID = os.getenv("INFO_Tenant_ID") CLIENT_SECRET = os.getenv("INFO_Secret") @@ -41,26 +43,32 @@ def get_access_token(): response.raise_for_status() return response.json().get("access_token") -def fetch_tradingtwins_emails(token, limit=20): +def fetch_tradingtwins_emails(token, limit=200): url = f"https://graph.microsoft.com/v1.0/users/{USER_EMAIL}/messages" headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json" } - # Filter for Tradingtwins subject + + # Graph API restriction: 'contains' on subject is often blocked. + # Strategy: Fetch metadata + body for last 200 messages and filter client-side. params = { "$top": limit, "$select": "id,subject,receivedDateTime,body", "$orderby": "receivedDateTime desc" } + response = requests.get(url, headers=headers, params=params) if response.status_code != 200: logger.error(f"Graph API Error: {response.status_code} - {response.text}") return [] all_msgs = response.json().get("value", []) - # Filter strictly for the subject pattern we saw - return [m for m in all_msgs if "Neue Anfrage zum Thema Roboter" in m.get('subject', '')] + + # Filter strictly for the subject pattern locally + # Handle case where subject might be None + filtered = [m for m in all_msgs if "Neue Anfrage zum Thema Roboter" in (m.get('subject') or '')] + return filtered def parse_tradingtwins_html(html_body): """ @@ -72,41 +80,40 @@ def parse_tradingtwins_html(html_body): # Map label names in HTML to our keys field_map = { 'Firma': 'company', - 'Vorname': 'first_name', - 'Nachname': 'last_name', + 'Vorname': 'contact_first', # Key fixed to match ingest.py logic + 'Nachname': 'contact_last', # Key fixed to match ingest.py logic 'E-Mail': 'email', 'Rufnummer': 'phone', - 'Einsatzzweck': 'purpose', - 'Reinigungs-Fläche': 'area', + 'Einsatzzweck': 'purpose', # Specific field + 'Reinigungs-Fläche': 'area', # Specific field 'PLZ': 'zip', 'Stadt': 'city', - 'Lead-ID': 'lead_id' + 'Lead-ID': 'source_id' # Mapped to DB column source_id } for label, key in field_map.items(): - # Regex explanation: - # >\s*{label}:\s*
-> Finds the label inside a p tag, ending with colon - # .*? -> Non-greedy match for table cell closing/opening - #]*> -> Finds the start of the value paragraph - # (.*?) -> Captures the value - #
-> Ends at closing paragraph tag pattern = fr'>\s*{re.escape(label)}:\s*.*?]*>(.*?)
' - match = re.search(pattern, html_body, re.DOTALL | re.IGNORECASE) if match: - # Clean up the value (remove HTML tags inside if any, though usually plain text) raw_val = match.group(1).strip() - # Remove any link tags if present (e.g. for email/phone) clean_val = re.sub(r'<[^>]+>', '', raw_val).strip() data[key] = clean_val # Composite fields - if data.get('first_name') and data.get('last_name'): - data['contact_name'] = f"{data['first_name']} {data['last_name']}" - + if data.get('contact_first') and data.get('contact_last'): + data['contact'] = f"{data['contact_first']} {data['contact_last']}" + + # Ensure source_id is present and map to 'id' for db.py compatibility + if not data.get('source_id'): + data['source_id'] = f"tt_unknown_{int(datetime.now().timestamp())}" + + data['id'] = data['source_id'] # db.py expects 'id' for source_id column + return data def process_leads(): + init_db() + new_count = 0 try: token = get_access_token() emails = fetch_tradingtwins_emails(token) @@ -116,28 +123,33 @@ def process_leads(): body = email.get('body', {}).get('content', '') lead_data = parse_tradingtwins_html(body) + # Add raw body for reference + lead_data['raw_body'] = body + company_name = lead_data.get('company') if not company_name or company_name == '-': - # Fallback if company is empty (sometimes happens with private persons) - # Use contact name as company name - company_name = lead_data.get('contact_name') + company_name = lead_data.get('contact') + lead_data['company'] = company_name if not company_name: logger.warning(f"Skipping email {email['id']}: No company or contact name found.") continue - logger.info(f"Processing Lead: {company_name} (ID: {lead_data.get('lead_id')})") + logger.info(f"Ingesting Lead: {company_name} (ID: {lead_data.get('id')})") - # Trigger Company Explorer Workflow - # Note: In a real scenario, we might want to check if we already processed this message ID - # to avoid duplicates. For now, we rely on the Company Explorer's deduplication. - logger.info(f" -> Triggering Company Explorer for '{company_name}'...") - result = handle_company_workflow(company_name) - - logger.info(f" -> Result: {result.get('status')} (ID: {result.get('data', {}).get('id')})") + # Save to local DB (status=new) + if insert_lead(lead_data): + logger.info(f" -> Successfully saved to DB.") + new_count += 1 + else: + logger.info(f" -> Lead already exists (skipped).") + + return new_count except Exception as e: logger.error(f"Error in process_leads: {e}") + return 0 if __name__ == "__main__": - process_leads() + count = process_leads() + print(f"Ingested {count} new leads.")