[31388f42] Fix ingest: Increase limit to 200, handle None-Subject, client-side filter
This commit is contained in:
@@ -10,16 +10,18 @@ from dotenv import load_dotenv
|
|||||||
# Ensure we can import from root directory
|
# Ensure we can import from root directory
|
||||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
||||||
|
|
||||||
# Import from root modules
|
# Import db functions
|
||||||
try:
|
try:
|
||||||
from company_explorer_connector import handle_company_workflow
|
from db import insert_lead, init_db
|
||||||
except ImportError:
|
except ImportError:
|
||||||
# Fallback/Mock for testing if run in isolation without full env
|
# Fallback for direct execution
|
||||||
def handle_company_workflow(company_name):
|
sys.path.append(os.path.dirname(__file__))
|
||||||
return {"status": "mock", "data": {"name": company_name, "id": "mock-id"}}
|
from db import insert_lead, init_db
|
||||||
|
|
||||||
# Configuration
|
# Configuration
|
||||||
load_dotenv(override=True)
|
env_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '.env'))
|
||||||
|
load_dotenv(dotenv_path=env_path, override=True)
|
||||||
|
|
||||||
CLIENT_ID = os.getenv("INFO_Application_ID")
|
CLIENT_ID = os.getenv("INFO_Application_ID")
|
||||||
TENANT_ID = os.getenv("INFO_Tenant_ID")
|
TENANT_ID = os.getenv("INFO_Tenant_ID")
|
||||||
CLIENT_SECRET = os.getenv("INFO_Secret")
|
CLIENT_SECRET = os.getenv("INFO_Secret")
|
||||||
@@ -41,26 +43,32 @@ def get_access_token():
|
|||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
return response.json().get("access_token")
|
return response.json().get("access_token")
|
||||||
|
|
||||||
def fetch_tradingtwins_emails(token, limit=20):
|
def fetch_tradingtwins_emails(token, limit=200):
|
||||||
url = f"https://graph.microsoft.com/v1.0/users/{USER_EMAIL}/messages"
|
url = f"https://graph.microsoft.com/v1.0/users/{USER_EMAIL}/messages"
|
||||||
headers = {
|
headers = {
|
||||||
"Authorization": f"Bearer {token}",
|
"Authorization": f"Bearer {token}",
|
||||||
"Content-Type": "application/json"
|
"Content-Type": "application/json"
|
||||||
}
|
}
|
||||||
# Filter for Tradingtwins subject
|
|
||||||
|
# Graph API restriction: 'contains' on subject is often blocked.
|
||||||
|
# Strategy: Fetch metadata + body for last 200 messages and filter client-side.
|
||||||
params = {
|
params = {
|
||||||
"$top": limit,
|
"$top": limit,
|
||||||
"$select": "id,subject,receivedDateTime,body",
|
"$select": "id,subject,receivedDateTime,body",
|
||||||
"$orderby": "receivedDateTime desc"
|
"$orderby": "receivedDateTime desc"
|
||||||
}
|
}
|
||||||
|
|
||||||
response = requests.get(url, headers=headers, params=params)
|
response = requests.get(url, headers=headers, params=params)
|
||||||
if response.status_code != 200:
|
if response.status_code != 200:
|
||||||
logger.error(f"Graph API Error: {response.status_code} - {response.text}")
|
logger.error(f"Graph API Error: {response.status_code} - {response.text}")
|
||||||
return []
|
return []
|
||||||
|
|
||||||
all_msgs = response.json().get("value", [])
|
all_msgs = response.json().get("value", [])
|
||||||
# Filter strictly for the subject pattern we saw
|
|
||||||
return [m for m in all_msgs if "Neue Anfrage zum Thema Roboter" in m.get('subject', '')]
|
# Filter strictly for the subject pattern locally
|
||||||
|
# Handle case where subject might be None
|
||||||
|
filtered = [m for m in all_msgs if "Neue Anfrage zum Thema Roboter" in (m.get('subject') or '')]
|
||||||
|
return filtered
|
||||||
|
|
||||||
def parse_tradingtwins_html(html_body):
|
def parse_tradingtwins_html(html_body):
|
||||||
"""
|
"""
|
||||||
@@ -72,41 +80,40 @@ def parse_tradingtwins_html(html_body):
|
|||||||
# Map label names in HTML to our keys
|
# Map label names in HTML to our keys
|
||||||
field_map = {
|
field_map = {
|
||||||
'Firma': 'company',
|
'Firma': 'company',
|
||||||
'Vorname': 'first_name',
|
'Vorname': 'contact_first', # Key fixed to match ingest.py logic
|
||||||
'Nachname': 'last_name',
|
'Nachname': 'contact_last', # Key fixed to match ingest.py logic
|
||||||
'E-Mail': 'email',
|
'E-Mail': 'email',
|
||||||
'Rufnummer': 'phone',
|
'Rufnummer': 'phone',
|
||||||
'Einsatzzweck': 'purpose',
|
'Einsatzzweck': 'purpose', # Specific field
|
||||||
'Reinigungs-Fläche': 'area',
|
'Reinigungs-Fläche': 'area', # Specific field
|
||||||
'PLZ': 'zip',
|
'PLZ': 'zip',
|
||||||
'Stadt': 'city',
|
'Stadt': 'city',
|
||||||
'Lead-ID': 'lead_id'
|
'Lead-ID': 'source_id' # Mapped to DB column source_id
|
||||||
}
|
}
|
||||||
|
|
||||||
for label, key in field_map.items():
|
for label, key in field_map.items():
|
||||||
# Regex explanation:
|
|
||||||
# >\s*{label}:\s*</p> -> Finds the label inside a p tag, ending with colon
|
|
||||||
# .*? -> Non-greedy match for table cell closing/opening
|
|
||||||
# <p[^>]*> -> Finds the start of the value paragraph
|
|
||||||
# (.*?) -> Captures the value
|
|
||||||
# </p> -> Ends at closing paragraph tag
|
|
||||||
pattern = fr'>\s*{re.escape(label)}:\s*</p>.*?<p[^>]*>(.*?)</p>'
|
pattern = fr'>\s*{re.escape(label)}:\s*</p>.*?<p[^>]*>(.*?)</p>'
|
||||||
|
|
||||||
match = re.search(pattern, html_body, re.DOTALL | re.IGNORECASE)
|
match = re.search(pattern, html_body, re.DOTALL | re.IGNORECASE)
|
||||||
if match:
|
if match:
|
||||||
# Clean up the value (remove HTML tags inside if any, though usually plain text)
|
|
||||||
raw_val = match.group(1).strip()
|
raw_val = match.group(1).strip()
|
||||||
# Remove any link tags if present (e.g. for email/phone)
|
|
||||||
clean_val = re.sub(r'<[^>]+>', '', raw_val).strip()
|
clean_val = re.sub(r'<[^>]+>', '', raw_val).strip()
|
||||||
data[key] = clean_val
|
data[key] = clean_val
|
||||||
|
|
||||||
# Composite fields
|
# Composite fields
|
||||||
if data.get('first_name') and data.get('last_name'):
|
if data.get('contact_first') and data.get('contact_last'):
|
||||||
data['contact_name'] = f"{data['first_name']} {data['last_name']}"
|
data['contact'] = f"{data['contact_first']} {data['contact_last']}"
|
||||||
|
|
||||||
|
# Ensure source_id is present and map to 'id' for db.py compatibility
|
||||||
|
if not data.get('source_id'):
|
||||||
|
data['source_id'] = f"tt_unknown_{int(datetime.now().timestamp())}"
|
||||||
|
|
||||||
|
data['id'] = data['source_id'] # db.py expects 'id' for source_id column
|
||||||
|
|
||||||
return data
|
return data
|
||||||
|
|
||||||
def process_leads():
|
def process_leads():
|
||||||
|
init_db()
|
||||||
|
new_count = 0
|
||||||
try:
|
try:
|
||||||
token = get_access_token()
|
token = get_access_token()
|
||||||
emails = fetch_tradingtwins_emails(token)
|
emails = fetch_tradingtwins_emails(token)
|
||||||
@@ -116,28 +123,33 @@ def process_leads():
|
|||||||
body = email.get('body', {}).get('content', '')
|
body = email.get('body', {}).get('content', '')
|
||||||
lead_data = parse_tradingtwins_html(body)
|
lead_data = parse_tradingtwins_html(body)
|
||||||
|
|
||||||
|
# Add raw body for reference
|
||||||
|
lead_data['raw_body'] = body
|
||||||
|
|
||||||
company_name = lead_data.get('company')
|
company_name = lead_data.get('company')
|
||||||
if not company_name or company_name == '-':
|
if not company_name or company_name == '-':
|
||||||
# Fallback if company is empty (sometimes happens with private persons)
|
company_name = lead_data.get('contact')
|
||||||
# Use contact name as company name
|
lead_data['company'] = company_name
|
||||||
company_name = lead_data.get('contact_name')
|
|
||||||
|
|
||||||
if not company_name:
|
if not company_name:
|
||||||
logger.warning(f"Skipping email {email['id']}: No company or contact name found.")
|
logger.warning(f"Skipping email {email['id']}: No company or contact name found.")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
logger.info(f"Processing Lead: {company_name} (ID: {lead_data.get('lead_id')})")
|
logger.info(f"Ingesting Lead: {company_name} (ID: {lead_data.get('id')})")
|
||||||
|
|
||||||
# Trigger Company Explorer Workflow
|
# Save to local DB (status=new)
|
||||||
# Note: In a real scenario, we might want to check if we already processed this message ID
|
if insert_lead(lead_data):
|
||||||
# to avoid duplicates. For now, we rely on the Company Explorer's deduplication.
|
logger.info(f" -> Successfully saved to DB.")
|
||||||
logger.info(f" -> Triggering Company Explorer for '{company_name}'...")
|
new_count += 1
|
||||||
result = handle_company_workflow(company_name)
|
else:
|
||||||
|
logger.info(f" -> Lead already exists (skipped).")
|
||||||
logger.info(f" -> Result: {result.get('status')} (ID: {result.get('data', {}).get('id')})")
|
|
||||||
|
return new_count
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error in process_leads: {e}")
|
logger.error(f"Error in process_leads: {e}")
|
||||||
|
return 0
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
process_leads()
|
count = process_leads()
|
||||||
|
print(f"Ingested {count} new leads.")
|
||||||
|
|||||||
Reference in New Issue
Block a user