Files
Brancheneinstufung2/market_intel_orchestrator.py
Floke 8e99891f44 fix: Robustify Market Intel Audit - Fallback when scraping fails
- market_intel_orchestrator.py: Updated analyze_company to NOT abort if homepage scraping fails (e.g. 403 Forbidden). Instead, it sets a placeholder and proceeds using external search signals.

- market_intel_orchestrator.py: Updated get_website_text to use a modern, realistic User-Agent to reduce blocking.

- market_intel_orchestrator.py: Adjusted Gemini prompt to handle missing homepage content gracefully.
2025-12-29 13:21:08 +00:00

410 lines
18 KiB
Python

import argparse
import json
import os
import sys # Import sys for stderr
import requests
from bs4 import BeautifulSoup
import logging
from datetime import datetime
import re # Für Regex-Operationen
# --- AUTARKES LOGGING SETUP --- #
def create_self_contained_log_filename(mode, log_dir_path="/app/Log"):
    """Build the date-stamped log file path for the orchestrator.

    Only the date (not the time) is used in the filename, so every run of
    the same day appends to a single file and log spam is avoided.

    Args:
        mode: Identifier of the calling component. Currently unused; kept
            for backward compatibility with existing callers.
        log_dir_path: Directory where log files live. Defaults to the fixed
            path inside the Docker container; parameterized so tests and
            non-container deployments can redirect it.

    Returns:
        Absolute/relative path ``<log_dir_path>/<YYYY-MM-DD>_market_intel.log``.
    """
    # exist_ok=True already tolerates a pre-existing directory, so the
    # previous os.path.exists() pre-check was redundant (and racy).
    os.makedirs(log_dir_path, exist_ok=True)
    date_str = datetime.now().strftime("%Y-%m-%d")
    return os.path.join(log_dir_path, f"{date_str}_market_intel.log")
# Resolve today's log path once at import time; both handlers share it.
log_filename = create_self_contained_log_filename("market_intel_orchestrator")
logging.basicConfig(
    level=logging.DEBUG,
    format='[%(asctime)s] %(levelname)s [%(funcName)s]: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    handlers=[
        # Append mode: all runs of the same day accumulate in one file.
        logging.FileHandler(log_filename, mode='a', encoding='utf-8'),
        # Mirror logs to stderr so stdout stays clean for the JSON results
        # that main() prints for the calling process.
        logging.StreamHandler(sys.stderr)
    ]
)
logger = logging.getLogger(__name__)
# --- END AUTARKES LOGGING SETUP --- #
def load_gemini_api_key(file_path="gemini_api_key.txt"):
    """Load the Gemini API key from *file_path* and return it stripped.

    Raises:
        Re-raises any exception from reading the file (after logging it
        as critical) — the orchestrator cannot work without this key.
    """
    try:
        # Explicit encoding avoids platform-dependent default decoding.
        with open(file_path, "r", encoding="utf-8") as f:
            return f.read().strip()
    except Exception as e:
        logger.critical(f"Fehler beim Laden des Gemini API Keys: {e}")
        raise
def load_serp_api_key(file_path="serpapikey.txt"):
    """Load the SerpAPI key; return None if it cannot be found.

    Lookup order: key file on disk, then the SERP_API_KEY environment
    variable. Any read error is logged as a warning and swallowed so the
    pipeline can degrade gracefully without SerpAPI.
    """
    try:
        if os.path.exists(file_path):
            # Explicit encoding avoids platform-dependent default decoding.
            with open(file_path, "r", encoding="utf-8") as f:
                return f.read().strip()
        # Fallback: try the environment variable
        return os.environ.get("SERP_API_KEY")
    except Exception as e:
        logger.warning(f"Konnte SerpAPI Key nicht laden: {e}")
        return None
def get_website_text(url):
    """Download *url* and return its visible text (printable ASCII, max 15,000 chars).

    Returns None when the request or parsing fails; failures are logged
    rather than raised so callers can fall back to other evidence sources.
    """
    logger.info(f"Scraping URL: {url}")
    try:
        # Browser-like headers: many sites answer 403 to the default requests UA.
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9,de;q=0.8',
            'Referer': 'https://www.google.com/'
        }
        resp = requests.get(url, headers=headers, timeout=15)
        resp.raise_for_status()
        page = BeautifulSoup(resp.text, 'lxml')
        # Remove boilerplate/non-content elements before extracting text.
        for junk in page(['script', 'style', 'nav', 'footer', 'header']):
            junk.decompose()
        visible = page.get_text(separator=' ', strip=True)
        # Keep printable ASCII plus whitespace only. NOTE: this also strips
        # umlauts and any other non-ASCII characters from the page text.
        visible = re.sub(r'[^\x20-\x7E\n\r\t]', '', visible)
        return visible[:15000]
    except Exception as e:
        logger.error(f"Scraping failed for {url}: {e}")
        return None
# ... (omitted parts) ...
def analyze_company(company_name, strategy, target_market):
    """Run a multi-source "deep tech audit" for one company and return a result dict.

    Pipeline: find the company website (SerpAPI), scrape the homepage with a
    graceful fallback when blocked (e.g. 403), collect external tech-stack,
    firmographic and signal evidence via search, then ask Gemini to
    synthesize a structured sales-audit JSON.

    Args:
        company_name: Name of the company to audit.
        strategy: Strategy dict; its 'signals' list drives the targeted searches.
        target_market: Target market descriptor (not referenced in the visible
            part of this function — presumably used in the omitted sections;
            TODO confirm).

    Returns:
        dict with audit fields (companyName, status, revenue, employees, tier,
        dynamicAnalysis, recommendation, dataSource). On API/parsing failure a
        placeholder dict with status "Unklar / Manuelle Prüfung" is returned
        instead of raising.
    """
    logger.info(f"--- STARTING DEEP TECH AUDIT FOR: {company_name} ---")
    api_key = load_gemini_api_key()
    GEMINI_API_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-pro:generateContent?key={api_key}"
    # 1. Website Finding (SerpAPI fallback to Gemini)
    url = None
    website_search_results = serp_search(f"{company_name} offizielle Website")
    if website_search_results:
        url = website_search_results[0].get("link")
        logger.info(f"Website via SerpAPI gefunden: {url}")
    if not url:
        # Fallback: ask Gemini for the URL
        # ... (Gemini URL fallback logic remains same) ...
        pass
    # 2. Homepage Scraping with GRACEFUL FALLBACK
    homepage_text = ""
    scraping_note = ""
    if url and url.startswith("http"):
        scraped_content = get_website_text(url)
        if scraped_content:
            homepage_text = scraped_content
        else:
            # Scraping failed (e.g. 403 Forbidden) — do NOT abort; continue
            # with a placeholder so the audit can rely on external evidence.
            homepage_text = "[WEBSITE ACCESS DENIED] - The audit must rely on external search signals (Tech Stack, Job Postings, News) as the homepage content is unavailable."
            scraping_note = "(Website Content Unavailable - Analysis based on Digital Footprint)"
            logger.warning(f"Audit continuing without website content for {company_name}")
    else:
        homepage_text = "No valid URL found. Analysis based on Name ONLY."
        scraping_note = "(No URL found)"
    # --- ENHANCED: EXTERNAL TECHNOGRAPHIC INTELLIGENCE ---
    # ... (remains same) ...
    # NOTE(review): tech_evidence (appended to below) appears to be
    # initialized in the omitted section above — verify before refactoring.
    # List of known competitor / incumbent platforms
    known_incumbents = [
        "SAP Ariba", "Jaggaer", "Coupa", "SynerTrade", "Ivalua",
        "ServiceNow", "Salesforce", "Oracle SCM", "Zycus", "GEP",
        "SupplyOn", "EcoVadis", "IntegrityNext"
    ]
    # Search 1: direct links to software vendors (case studies, news, etc.).
    # Build OR-combined queries to save API calls; split the incumbent list
    # into two groups to keep each query length within limits.
    half = len(known_incumbents) // 2
    group1 = " OR ".join([f'"{inc}"' for inc in known_incumbents[:half]])
    group2 = " OR ".join([f'"{inc}"' for inc in known_incumbents[half:]])
    tech_queries = [
        f'"{company_name}" ({group1})',
        f'"{company_name}" ({group2})',
        f'"{company_name}" "supplier portal" login'  # search for the portal itself
    ]
    logger.info(f"Starte erweiterte Tech-Stack-Suche für {company_name}...")
    for q in tech_queries:
        logger.info(f"Tech Search: {q}")
        results = serp_search(q, num_results=4)  # slightly more results here
        if results:
            for r in results:
                tech_evidence.append(f"- Found: {r['title']}\n Snippet: {r['snippet']}\n Link: {r['link']}")
    tech_evidence_text = "\n".join(tech_evidence)
    # --- END ENHANCED TECH SEARCH ---
    # 3. Targeted Signal Search (The "Hunter" Phase) - driven by the strategy
    signal_evidence = []
    # Firmographics Search
    firmographics_results = serp_search(f"{company_name} Umsatz Mitarbeiterzahl 2023")
    firmographics_context = "\n".join([f"- {r['snippet']} ({r['link']})" for r in firmographics_results])
    # Signal Searches (Original Strategy)
    signals = strategy.get('signals', [])
    for signal in signals:
        # Skip signals already covered extensively by the enhanced tech
        # search above, unless they are very specific.
        if "incumbent" in signal['id'].lower() or "tech" in signal['id'].lower():
            logger.info(f"Skipping generic signal search '{signal['name']}' in favor of Enhanced Tech Search.")
            continue
        proof_strategy = signal.get('proofStrategy', {})
        query_template = proof_strategy.get('searchQueryTemplate')
        search_context = ""
        if query_template:
            try:
                # Derive the bare domain for {domain} placeholder substitution.
                domain = url.split("//")[-1].split("/")[0].replace("www.", "")
            except:
                # url may be None when no website was found.
                domain = ""
            query = query_template.replace("{{COMPANY}}", company_name).replace("{COMPANY}", company_name)
            query = query.replace("{{domain}}", domain).replace("{domain}", domain)
            logger.info(f"Signal Search '{signal['name']}': {query}")
            results = serp_search(query, num_results=3)
            if results:
                search_context = "\n".join([f" * Snippet: {r['snippet']}\n Source: {r['link']}" for r in results])
        if search_context:
            signal_evidence.append(f"SIGNAL '{signal['name']}':\n{search_context}")
    # 4. Final Analysis & Synthesis (The "Judge" Phase)
    evidence_text = "\n\n".join(signal_evidence)
    # Prompt body is left unindented on purpose: these lines are part of the
    # f-string sent to Gemini, so leading whitespace would change the payload.
    prompt = f"""
You are a Strategic B2B Sales Consultant.
Analyze the company '{company_name}' ({url}) to create a "best-of-breed" sales pitch strategy.
--- STRATEGY (What we are looking for) ---
{json.dumps(signals, indent=2)}
--- EVIDENCE 1: EXTERNAL TECH-STACK INTELLIGENCE (CRITICAL) ---
Look closely here for mentions of competitors like SAP Ariba, Jaggaer, SynerTrade, Coupa, etc.
{tech_evidence_text}
--- EVIDENCE 2: HOMEPAGE CONTENT {scraping_note} ---
{homepage_text[:8000]}
--- EVIDENCE 3: FIRMOGRAPHICS SEARCH ---
{firmographics_context}
--- EVIDENCE 4: TARGETED SIGNAL SEARCH RESULTS ---
{evidence_text}
----------------------------------
TASK:
1. **Firmographics**: Estimate Revenue and Employees.
2. **Technographic Audit**: Look for specific competitor software or legacy systems mentioned in EVIDENCE 1 (e.g., "Partner of SynerTrade", "Login to Jaggaer Portal").
3. **Status**:
- Set to "Nutzt Wettbewerber" if ANY competitor technology is found (Ariba, Jaggaer, SynerTrade, Coupa, etc.).
- Set to "Greenfield" ONLY if absolutely no competitor tech is found.
- Set to "Bestandskunde" if they already use our solution.
4. **Evaluate Signals**: For each signal, provide a "value" (Yes/No/Partial) and "proof".
- NOTE: If Homepage Content is unavailable, rely on Evidence 1, 3, and 4.
5. **Recommendation (Pitch Strategy)**:
- DO NOT write a generic verdict.
- If they use a competitor (e.g., Ariba), explain how to position against it (e.g., "Pitch as a specialized add-on for logistics, filling Ariba's gaps").
- If Greenfield, explain the entry point.
- **Tone**: Strategic, insider-knowledge, specific.
STRICTLY output only JSON:
{{
"companyName": "{company_name}",
"status": "...",
"revenue": "...",
"employees": "...",
"tier": "Tier 1/2/3",
"dynamicAnalysis": {{
"sig_id_from_strategy": {{ "value": "...", "proof": "..." }}
}},
"recommendation": "..."
}}
"""
    payload = {
        "contents": [{"parts": [{"text": prompt}]}],
        # Ask Gemini to return JSON directly instead of markdown-wrapped text.
        "generationConfig": {"response_mime_type": "application/json"}
    }
    try:
        logger.info("Sende Audit-Anfrage an Gemini API...")
        # logger.debug(f"Rohe Gemini API-Anfrage (JSON): {json.dumps(payload, indent=2)}")
        response = requests.post(GEMINI_API_URL, json=payload, headers={'Content-Type': 'application/json'})
        response.raise_for_status()
        response_data = response.json()
        logger.info(f"Gemini API-Antwort erhalten (Status: {response.status_code}).")
        text = response_data['candidates'][0]['content']['parts'][0]['text']
        result = _extract_json_from_text(text)
        if not result:
            raise ValueError("Konnte kein valides JSON extrahieren")
        result['dataSource'] = "Digital Trace Audit (Deep Dive)"
        logger.info(f"Audit für {company_name} erfolgreich abgeschlossen.")
        return result
    except Exception as e:
        # Never propagate: return a manual-review placeholder so batch runs
        # over many companies keep going.
        logger.error(f"Audit failed for {company_name}: {e}")
        return {
            "companyName": company_name,
            "status": "Unklar / Manuelle Prüfung",
            "revenue": "Error",
            "employees": "Error",
            "tier": "Tier 3",
            "dynamicAnalysis": {},
            "recommendation": f"Audit failed due to API Error: {str(e)}",
            "dataSource": "Error"
        }
def generate_outreach_campaign(company_data_json, knowledge_base_content, reference_url):
    """Generate personalized, role-based email campaigns from audit data.

    Combines the sender's structured knowledge base with the target company's
    audit facts and asks Gemini to draft a 3-step email sequence for each of
    the two most relevant personas (roles).

    Args:
        company_data_json: Audit result dict for the target company
            (as produced by analyze_company).
        knowledge_base_content: Markdown describing the sender's identity,
            products, and strategy.
        reference_url: URL of the existing reference client used as social
            proof in the emails.

    Returns:
        list of campaign dicts (one per selected role), or
        [{"error": "..."}] if the Gemini call or JSON extraction fails.
    """
    company_name = company_data_json.get('companyName', 'Unknown')
    logger.info(f"--- STARTING ROLE-BASED OUTREACH GENERATION FOR: {company_name} ---")
    api_key = load_gemini_api_key()
    # Switch to stable 2.5-pro model
    GEMINI_API_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-pro:generateContent?key={api_key}"
    # Prompt body is left unindented on purpose: these lines are part of the
    # f-string sent to Gemini, so leading whitespace would change the payload.
    prompt = f"""
You are a Strategic Key Account Manager and deeply technical Industry Insider.
Your goal is to write highly personalized, **operationally specific** outreach emails to the company '{company_name}'.
--- INPUT 1: YOUR IDENTITY & STRATEGY (The Sender) ---
The following Markdown contains your company's identity, products, and strategy.
You act as the sales representative for the company described here:
{knowledge_base_content}
--- INPUT 2: THE TARGET COMPANY (Audit Facts) ---
{json.dumps(company_data_json, indent=2)}
--- INPUT 3: THE REFERENCE CLIENT (Social Proof) ---
Reference Client URL: {reference_url}
CRITICAL: This 'Reference Client' is an existing happy customer of ours. They are the "Seed Company" used to find the Target Company (Lookalike).
You MUST mention this Reference Client by name (derive it from the URL, e.g., 'schindler.com' -> 'Schindler') to establish trust.
--- TASK ---
1. **Analyze**: Match the Target Company (Input 2) to the most relevant 'Zielbranche/Segment' from the Knowledge Base (Input 1).
2. **Select Roles**: Identify the top 2 most distinct and relevant 'Rollen' (Personas) from the Knowledge Base for this specific company situation.
- *Example:* If the audit says they use a competitor (risk of lock-in), select a role like "Strategic Purchaser" or "Head of R&D" who cares about "Second Source".
- *Example:* If they have quality issues or complex logistics, pick "Quality Manager" or "Logistics Head".
3. **Draft Campaigns**: For EACH of the 2 selected roles, write a 3-step email sequence.
--- TONE & STYLE GUIDELINES (CRITICAL) ---
- **Perspective:** Operational Expert & Insider. NOT generic marketing.
- **Be Gritty & Specific:** Do NOT use fluff like "optimize efficiency" or "streamline processes" without context.
- Use **hard, operational keywords** from the Knowledge Base (e.g., "ASNs", "VMI", "8D-Reports", "Maverick Buying", "Bandstillstand", "Sonderfahrten", "PPAP").
- Show you understand their daily pain.
- **Narrative Arc:**
1. "I noticed [Fact from Audit/Tech Stack]..." (e.g., "You rely on PDF orders via Jaggaer...")
2. "In [Industry], this often leads to [Operational Pain]..." (e.g., "missing ASNs causing delays at the hub.")
3. "We helped [Reference Client Name] solve exactly this by [Specific Solution]..."
4. "Let's discuss how to get [Operational Gain] without replacing your ERP."
- **Mandatory Social Proof:** You MUST mention the Reference Client Name (from Input 3) in the email body or footer.
- **Language:** German (as the inputs are German).
--- OUTPUT FORMAT (Strictly JSON) ---
Returns a list of campaigns.
[
{{
"target_role": "Name of the Role (e.g. Leiter F&E)",
"rationale": "Why this role? (e.g. Because the audit found dependency on Competitor X...)",
"emails": [
{{
"subject": "Specific Subject Line",
"body": "Email Body..."
}},
{{
"subject": "Re: Subject",
"body": "Follow-up Body..."
}},
{{
"subject": "Final Check",
"body": "Final Body..."
}}
]
}},
... (Second Role)
]
"""
    payload = {
        "contents": [{"parts": [{"text": prompt}]}],
        # Ask Gemini to return JSON directly instead of markdown-wrapped text.
        "generationConfig": {"response_mime_type": "application/json"}
    }
    try:
        logger.info("Sende Campaign-Anfrage an Gemini API...")
        # logger.debug(f"Rohe Gemini API-Anfrage (JSON): {json.dumps(payload, indent=2)}")
        response = requests.post(GEMINI_API_URL, json=payload, headers={'Content-Type': 'application/json'})
        response.raise_for_status()
        response_data = response.json()
        logger.info(f"Gemini API-Antwort erhalten (Status: {response.status_code}).")
        # logger.debug(f"Rohe API-Antwort (JSON): {json.dumps(response_data, indent=2)}")
        text = response_data['candidates'][0]['content']['parts'][0]['text']
        result = _extract_json_from_text(text)
        if not result:
            raise ValueError("Konnte kein valides JSON extrahieren")
        return result
    except Exception as e:
        # Never propagate: return an error marker so the caller's JSON
        # output stays well-formed.
        logger.error(f"Campaign generation failed for {company_name}: {e}")
        return [{"error": str(e)}]
def main():
    """CLI entry point: parse arguments and dispatch to the requested mode.

    Results are emitted as JSON on stdout (logs go to stderr/file), so the
    calling process can consume the output directly.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode", required=True)
    # Optional flags; which ones are needed depends on --mode.
    for flag in ("--reference_url", "--context_file", "--target_market",
                 "--company_name", "--strategy_json", "--summary_of_offer",
                 "--company_data_file"):
        parser.add_argument(flag)
    args = parser.parse_args()

    mode = args.mode
    if mode == "generate_strategy":
        with open(args.context_file, "r") as fh:
            context = fh.read()
        print(json.dumps(generate_search_strategy(args.reference_url, context)))
    elif mode == "identify_competitors":
        industries = []
        if args.context_file:
            with open(args.context_file, "r") as fh:
                industries = _extract_target_industries_from_context(fh.read())
        result = identify_competitors(
            args.reference_url, args.target_market, industries, args.summary_of_offer
        )
        print(json.dumps(result))
    elif mode == "analyze_company":
        parsed_strategy = json.loads(args.strategy_json)
        print(json.dumps(analyze_company(args.company_name, parsed_strategy, args.target_market)))
    elif mode == "generate_outreach":
        with open(args.company_data_file, "r") as fh:
            company_data = json.load(fh)
        with open(args.context_file, "r") as fh:
            knowledge_base = fh.read()
        print(json.dumps(generate_outreach_campaign(company_data, knowledge_base, args.reference_url)))
if __name__ == "__main__":
    main()