"""Ingest lead emails from the Roboplanet info mailbox into the lead database.

The script authenticates against Microsoft Graph with the client-credentials
flow, downloads the most recent messages of ``USER_EMAIL``, keeps the ones
whose subject matches a known lead source, parses them and stores each lead
through ``db.insert_lead``.
"""

import os
import sys
import re
import logging
import requests
import json
from datetime import datetime
from typing import Any, Dict, List, Optional

from dotenv import load_dotenv

# Ensure we can import from root directory
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

# Import db functions and parsers
try:
    from db import insert_lead, init_db
    from ingest import parse_roboplanet_form, parse_tradingtwins_html, is_free_mail
except ImportError:
    # Fallback for direct execution
    sys.path.append(os.path.dirname(__file__))
    from db import insert_lead, init_db
    from ingest import parse_roboplanet_form, parse_tradingtwins_html, is_free_mail

# Configuration
env_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '.env'))
load_dotenv(dotenv_path=env_path, override=True)

CLIENT_ID = os.getenv("INFO_Application_ID")
TENANT_ID = os.getenv("INFO_Tenant_ID")
CLIENT_SECRET = os.getenv("INFO_Secret")
USER_EMAIL = "info@robo-planet.de"

# Seconds to wait for Microsoft endpoints; without a timeout `requests`
# blocks forever on a stalled connection.
REQUEST_TIMEOUT = 30

# Subject markers identifying the two supported lead sources.
TRADINGTWINS_SUBJECT = "Neue Anfrage zum Thema Roboter"
ROBOPLANET_SUBJECT = "Kontaktformular Roboplanet"

# (subject marker, body parser, source_id prefix, display name stored in the DB).
# Order matters: the first matching marker wins.
LEAD_SOURCES = (
    (TRADINGTWINS_SUBJECT, parse_tradingtwins_html, "tt", "TradingTwins"),
    (ROBOPLANET_SUBJECT, parse_roboplanet_form, "rp", "Website-Formular"),
)

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


def get_access_token() -> Optional[str]:
    """Request an app-only Microsoft Graph token via client credentials.

    Returns:
        The ``access_token`` field of the token response, or ``None`` if the
        response does not contain one.

    Raises:
        requests.HTTPError: If the token endpoint answers with an error status.
        requests.RequestException: On connection problems or timeout.
    """
    url = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token"
    data = {
        "client_id": CLIENT_ID,
        "scope": "https://graph.microsoft.com/.default",
        "client_secret": CLIENT_SECRET,
        "grant_type": "client_credentials",
    }
    response = requests.post(url, data=data, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    return response.json().get("access_token")


def fetch_new_leads_emails(token, limit=200) -> List[Dict[str, Any]]:
    """Fetch the newest mailbox messages and keep those that look like leads.

    Only a single page of ``limit`` messages (newest first) is requested; the
    subject filter is applied client-side.

    Args:
        token: Bearer token for Microsoft Graph.
        limit: Value sent as ``$top``.

    Returns:
        The messages whose subject contains one of the known lead markers, or
        an empty list when Graph does not answer with HTTP 200.
    """
    url = f"https://graph.microsoft.com/v1.0/users/{USER_EMAIL}/messages"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    params = {
        "$top": limit,
        "$select": "id,subject,receivedDateTime,body",
        "$orderby": "receivedDateTime desc",
    }
    response = requests.get(url, headers=headers, params=params, timeout=REQUEST_TIMEOUT)
    if response.status_code != 200:
        logger.error("Graph API Error: %s - %s", response.status_code, response.text)
        return []

    all_msgs = response.json().get("value", [])

    # Filter client-side for both TradingTwins and Roboplanet contact forms
    markers = [source[0] for source in LEAD_SOURCES]
    return [
        msg for msg in all_msgs
        if any(marker in (msg.get('subject') or '') for marker in markers)
    ]


def _parse_received_at(value) -> Optional[datetime]:
    """Convert a Graph ``receivedDateTime`` string to a datetime, or ``None``."""
    if not value:
        return None
    try:
        # A trailing 'Z' is rewritten as an explicit UTC offset before parsing.
        return datetime.fromisoformat(value.replace('Z', '+00:00'))
    except (ValueError, TypeError, AttributeError):
        # Best effort: the lead is still stored, just without a timestamp.
        logger.warning("Could not parse receivedDateTime %r", value)
        return None


def _ingest_email(email) -> bool:
    """Parse one Graph message and store it as a lead.

    Returns:
        ``True`` if ``insert_lead`` reported the lead as inserted, ``False``
        if the message was skipped or the insert returned a falsy value.
    """
    subject = email.get('subject') or ''
    # Graph may deliver "body": null, so guard before reading the content.
    body = (email.get('body') or {}).get('content') or ''

    source = next((src for src in LEAD_SOURCES if src[0] in subject), None)
    if source is None:
        # Should not happen with current filtering, but good for robustness
        logger.warning("Skipping unknown email type: %s", subject)
        return False
    _, parser, source_prefix, source_display_name = source

    # Treat a parser that returns nothing as an empty lead; it is then
    # rejected below for lacking both company and contact.
    lead_data = parser(body) or {}
    lead_data['source'] = source_display_name  # Add the new source field for the DB
    lead_data['raw_body'] = body
    lead_data['received_at'] = _parse_received_at(email.get('receivedDateTime'))

    # Apply general quality checks (if not already done by parser)
    if 'is_free_mail' not in lead_data:
        lead_data['is_free_mail'] = is_free_mail(lead_data.get('email', ''))
    if 'is_low_quality' not in lead_data:
        company_name_check = lead_data.get('company', '')
        # Consider company name '-' as missing/invalid
        if company_name_check == '-':
            company_name_check = ''
        lead_data['is_low_quality'] = lead_data['is_free_mail'] or not company_name_check

    company_name = lead_data.get('company')
    if not company_name or company_name == '-':
        # Fallback: if company name is missing, use contact name as company
        company_name = lead_data.get('contact')
        lead_data['company'] = company_name
    if not company_name:
        logger.warning("Skipping lead due to missing company and contact name: %s", subject)
        return False

    # Ensure source_id and 'id' for db.py compatibility
    if not lead_data.get('source_id'):
        # Prefer the Graph message id: a wall-clock timestamp is identical for
        # every lead handled within the same second and changes on every run,
        # so it can neither tell leads apart nor recognise a repeated email.
        # NOTE(review): assumes db.py accepts an id of Graph-message-id length
        # - confirm against the schema.
        suffix = email.get('id') or int(datetime.now().timestamp())
        lead_data['source_id'] = f"{source_prefix}_unknown_{suffix}"
    lead_data['id'] = lead_data['source_id']  # db.py expects 'id' for source_id column

    if insert_lead(lead_data):
        logger.info(" -> Ingested (%s): %s", source_prefix, company_name)
        return True
    return False


def process_leads(auto_sync=False):
    """Fetch lead emails, store new leads and optionally trigger enrichment.

    A failure while handling a single email is logged and does not stop the
    remaining emails from being processed.

    Args:
        auto_sync: When true and at least one lead was inserted, call
            ``enrich.run_sync()`` afterwards.

    Returns:
        Number of leads for which ``insert_lead`` returned a truthy value, or
        0 if initialisation of the run (token, fetch) or the auto-sync raised.
    """
    init_db()
    new_count = 0
    try:
        token = get_access_token()
        emails = fetch_new_leads_emails(token)
        logger.info("Found %d potential lead emails.", len(emails))

        for email in emails:
            try:
                if _ingest_email(email):
                    new_count += 1
            except Exception:
                # Isolate per-email failures so one malformed message does not
                # discard the rest of the batch.
                logger.exception(
                    "Failed to process email: %s", email.get('subject') or ''
                )

        if new_count > 0 and auto_sync:
            logger.info("Triggering auto-sync for %d new leads...", new_count)
            from enrich import run_sync
            run_sync()

        return new_count
    except Exception as e:
        logger.exception("Error in process_leads: %s", e)
        return 0


if __name__ == "__main__":
    count = process_leads()
    print(f"Ingested {count} new leads.")