Files
Brancheneinstufung2/market_intel_orchestrator.py
Floke b1f8f64483 feat(market-intel): implement deep tech audit and industry extraction
- Added `_extract_target_industries_from_context` to parse industries from Markdown.
- Added `identify_competitors` to find local/national/international lookalikes.
- Added a function for deep tech audit (website search, scraping, AI analysis).
- Updated prompt engineering for better results grounding.
2025-12-21 20:59:15 +00:00

457 lines
22 KiB
Python

import argparse
import json
import os
import sys # Import sys for stderr
import requests
from bs4 import BeautifulSoup
import logging
from datetime import datetime
import re # Für Regex-Operationen
# --- AUTARKES LOGGING SETUP --- #
# Dieses Setup ist vollständig selbstständig und benötigt KEINE Imports aus config.py oder helpers.py.
# Es schreibt auf stderr (für Docker Logs) und in eine zeitgestempelte Datei im /app/Log Verzeichnis im Container.
def create_self_contained_log_filename(mode, log_dir_path="/app/Log"):
    """
    Build a timestamped log file path for the orchestrator.

    The target directory is created (including parents) when it is missing.

    Args:
        mode: Label embedded in the file name after ``Modus-``.
        log_dir_path: Directory that receives the log file. Defaults to the
            fixed ``/app/Log`` directory used inside the Docker container.

    Returns:
        A path of the form
        ``<log_dir_path>/<YYYY-MM-DD_HH-MM-SS>_orchestrator_v1_Modus-<mode>.log``.
    """
    # exist_ok=True already covers the "directory exists" case; a separate
    # os.path.exists() probe would only add a check-then-act race window.
    os.makedirs(log_dir_path, exist_ok=True)
    now = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    # Hard-coded version: Config.VERSION is deliberately not imported to
    # keep this module free of project dependencies.
    version_str = "orchestrator_v1"
    filename = f"{now}_{version_str}_Modus-{mode}.log"
    return os.path.join(log_dir_path, filename)
# Configure logging: one timestamped file plus a console stream.
log_filename = create_self_contained_log_filename("market_intel_orchestrator")
logging.basicConfig(
    handlers=[
        logging.FileHandler(log_filename, mode='a', encoding='utf-8'),
        # IMPORTANT: console output goes to stderr so stdout stays reserved for JSON.
        logging.StreamHandler(sys.stderr),
    ],
    datefmt='%Y-%m-%d %H:%M:%S',
    format='[%(asctime)s] %(levelname)s [%(funcName)s]: %(message)s',
    # DEBUG so that every detail ends up in the log.
    level=logging.DEBUG,
)
logger = logging.getLogger(__name__)
logger.info("Autarkes Logging für Market Intelligence Orchestrator konfiguriert (Konsole & Datei).")
logger.info(f"Logdatei: {log_filename}")
# --- END AUTARKES LOGGING SETUP --- #
# Load the Gemini API key from a text file
def load_gemini_api_key(file_path="gemini_api_key.txt"):
    """
    Read the Gemini API key from ``file_path``.

    Args:
        file_path: Path of the text file holding the key.

    Returns:
        The file content with surrounding whitespace removed.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        RuntimeError: For every other failure. This includes an empty key:
            the ValueError raised for it is caught by the broad handler
            below and resurfaces as RuntimeError, which is kept so existing
            callers see the same exception types as before.
    """
    try:
        # utf-8-sig drops a leading byte-order mark (as written by some
        # Windows editors) that would otherwise become part of the key.
        with open(file_path, "r", encoding="utf-8-sig") as f:
            api_key = f.read().strip()
        if not api_key:
            logger.error("Gemini API Key ist leer. Bitte tragen Sie Ihren Schlüssel in die Datei gemini_api_key.txt ein.")
            raise ValueError("Gemini API Key ist leer. Bitte tragen Sie Ihren Schlüssel in die Datei gemini_api_key.txt ein.")
        logger.info("Gemini API Key erfolgreich geladen.")
        return api_key
    except FileNotFoundError as e:
        logger.critical(f"Die Datei {file_path} wurde nicht gefunden. Bitte stellen Sie sicher, dass Ihr Gemini API Key dort hinterlegt ist.")
        raise FileNotFoundError(f"Die Datei {file_path} wurde nicht gefunden. Bitte stellen Sie sicher, dass Ihr Gemini API Key dort hinterlegt ist.") from e
    except Exception as e:
        logger.critical(f"Fehler beim Laden des Gemini API Keys: {e}")
        # Chain the cause explicitly so the original error stays visible.
        raise RuntimeError(f"Fehler beim Laden des Gemini API Keys: {e}") from e
# Scrape a web page and reduce it to plain text
def get_website_text(url, max_chars=8000, timeout=10):
    """
    Download ``url`` and return its visible text.

    Script, style, navigation, header, footer, aside and noscript elements
    are removed before the text is extracted.

    Args:
        url: Address of the page to fetch.
        max_chars: Upper bound on the length of the returned text.
        timeout: Timeout in seconds handed to ``requests.get``.

    Returns:
        The extracted text, truncated to ``max_chars`` characters, or
        ``None`` when the request or the parsing fails (the error is logged).
    """
    logger.info(f"Starte Web-Scraping für URL: {url}")
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()  # raises HTTPError for 4xx/5xx responses
        logger.info(f"Webseite {url} erfolgreich abgerufen (Status: {response.status_code}).")
        # response.text decodes text/* bodies as ISO-8859-1 when the server
        # sends no charset, which garbles UTF-8 pages (umlauts etc.). Hand
        # the raw bytes to BeautifulSoup so it can detect the encoding from
        # the document itself, and only force the encoding when the HTTP
        # header actually declares one.
        content_type = response.headers.get('Content-Type', '')
        declared_encoding = response.encoding if 'charset=' in content_type.lower() else None
        soup = BeautifulSoup(response.content, 'lxml', from_encoding=declared_encoding)
        for unwanted_tag in soup(['script', 'style', 'nav', 'header', 'footer', 'aside', 'noscript']):
            unwanted_tag.decompose()
        text = soup.get_text(separator=' ', strip=True)
        text = text[:max_chars]  # cap the amount of text returned
        logger.info(f"Text von {url} erfolgreich extrahiert und auf {len(text)} Zeichen begrenzt.")
        logger.debug(f"Gescrapter Text-Auszug: {text[:500]}...")
        return text
    except requests.exceptions.RequestException as e:
        logger.error(f"Fehler beim Abrufen der Webseite {url}: {e}")
        return None
    except Exception as e:
        logger.error(f"Fehler beim Parsen der Webseite {url}: {e}", exc_info=True)
        return None
def _parse_markdown_table(table_text):
    """
    Parse a Markdown table into a list of dictionaries.

    Mirrors the n8n function ``parseMarkdownTable``. Only lines that start
    and end with ``|`` (after trimming) are considered table rows. The first
    row is the header; a separator row (``|---|:--:|``) directly below it is
    skipped when present.

    Args:
        table_text: Markdown source of the table; may be empty or ``None``.

    Returns:
        One dict per data row, keyed by the non-empty header cells. Cells
        missing at the end of a row are filled with ``''``. Returns ``[]``
        when there is no header plus at least one further row.
    """
    if not table_text:
        return []

    def split_row(line):
        # Drop exactly the outer pipes and KEEP empty cells: filtering them
        # out would shift every following value into the wrong column.
        return [cell.strip() for cell in line[1:-1].split('|')]

    rows = []
    for raw_line in table_text.strip().split('\n'):
        line = raw_line.strip()
        if len(line) >= 2 and line.startswith('|') and line.endswith('|'):
            rows.append(split_row(line))

    if len(rows) < 2:
        return []  # need a header plus at least one more row

    header = rows[0]
    # Only skip the second row when it really is a separator; a table
    # without one must not lose its first data row.
    has_separator = all(re.fullmatch(r':?-+:?', cell) for cell in rows[1])
    data_rows = rows[2:] if has_separator else rows[1:]

    parsed_data = []
    for cells in data_rows:
        parsed_data.append({
            name: (cells[index] if index < len(cells) else '')
            for index, name in enumerate(header)
            if name  # unnamed columns cannot be addressed by key
        })
    return parsed_data
def _extract_target_industries_from_context(context_content):
    """
    Extract the list of target industries from the Markdown context document.

    Based on the n8n logic this module was ported from: isolate the
    "Schritt 2" section, take the first contiguous table inside it, and read
    the column whose header looks like an industry/segment column.

    Args:
        context_content: Markdown text of the strategy document; an empty
            value or ``None`` yields an empty result.

    Returns:
        The distinct, non-empty industry names in document order, or ``[]``
        when the section, the table or a suitable column is missing.
    """
    logger.info("Starte Extraktion von Zielbranchen aus dem Kontextdokument.")
    md = context_content or ''
    # 1) Isolate the step-2 section (up to the next "## Schritt" or the end)
    step2_match = re.search(r'##\s*Schritt\s*2:[\s\S]*?(?=\n##\s*Schritt\s*\d:|\s*$)', md, re.IGNORECASE)
    step2 = step2_match.group(0) if step2_match else ''
    logger.debug(f"Schritt 2 Sektion gefunden: {bool(step2_match)}")
    if not step2:
        logger.warning("Keine 'Schritt 2' Sektion im Kontextdokument gefunden.")
        return []
    # 2) Collect the first run of consecutive lines that look like table rows
    table_lines = []
    in_table = False
    for line in step2.split('\n'):
        stripped = line.strip()
        if stripped.startswith('|') and stripped.endswith('|'):
            in_table = True
            table_lines.append(stripped)
        elif in_table:
            break  # first non-table line after the table ends the block
    table_text = '\n'.join(table_lines)
    logger.debug(f"Tabellenblock gefunden: {bool(table_text)}")
    parsed_rows = _parse_markdown_table(table_text)
    logger.debug(f"Geparste Tabellenzeilen: {len(parsed_rows)}")
    # 3) Locate the target column (tolerant of small naming variations)
    industries = []
    if parsed_rows:
        headers = parsed_rows[0].keys()  # every row is built from the same header
        industry_col = next((h for h in headers if re.search(r'zielbranche|segment|branche|industrie', h, re.IGNORECASE)), None)
        if industry_col:
            industries = [r[industry_col].strip() for r in parsed_rows if r.get(industry_col) and r[industry_col].strip()]
            # Deduplicate while keeping document order. A plain set() would
            # reorder the names differently on every run and make the
            # generated prompt non-reproducible.
            industries = list(dict.fromkeys(industries))
            logger.info(f"Extrahierte Zielbranchen: {industries}")
        else:
            logger.warning("Keine geeignete Branchenspalte in der Tabelle gefunden.")
    return industries
# Main entry point for strategy generation
def generate_search_strategy(reference_url, context_content):
    """
    Ask the Gemini API for a "Digital Trace Strategy" for a reference client.

    The prompt combines the strategy document, the target industries
    extracted from it and the scraped homepage text of ``reference_url``.

    Args:
        reference_url: Homepage URL of the reference client.
        context_content: Markdown text of the strategy document.

    Returns:
        The parsed JSON object returned by the model, or a dict with an
        ``"error"`` key (plus ``"response_text"`` for API failures) when the
        homepage cannot be scraped, the request fails or the answer is not
        valid JSON.

    Raises:
        FileNotFoundError, RuntimeError: Propagated from
            ``load_gemini_api_key`` when the key cannot be loaded.
    """
    logger.info("Starte Strategiegenerierung.")
    logger.info(f"Referenz-URL: {reference_url}")
    logger.info(f"Kontext-Inhalt Länge: {len(context_content)} Zeichen")
    logger.debug(f"Kontext-Inhalt Auszug: {context_content[:500]}...")
    api_key = load_gemini_api_key()
    # Extract the target industries from the context document
    extracted_target_industries = _extract_target_industries_from_context(context_content)
    industry_list_for_prompt = "\n List of target industries extracted from the strategic context: " + ", ".join(extracted_target_industries) + "\n Use these as primary categories for any industry-related analysis." if extracted_target_industries else ""
    logger.debug(f"Branchenliste für Prompt: {industry_list_for_prompt}")
    # The key is sent as a header instead of a query parameter so that it
    # never shows up in the logged URL or in HTTPError messages (which embed
    # the request URL and are returned to the caller on stdout).
    GEMINI_API_URL = "https://generativelanguage.googleapis.com/v1/models/gemini-2.5-pro:generateContent"
    request_headers = {'Content-Type': 'application/json', 'x-goog-api-key': api_key}
    logger.debug(f"Gemini API URL: {GEMINI_API_URL}")
    homepage_text = get_website_text(reference_url)
    if homepage_text is None:
        logger.error(f"Konnte Webseite für {reference_url} nicht abrufen oder parsen.")
        return {"error": f"Could not retrieve or parse homepage text for {reference_url}"}
    prompt = f"""
You are a B2B Market Intelligence Architect.
--- STRATEGIC CONTEXT (Uploaded Document) ---
{context_content}
---------------------------------------------
--- REFERENZ-BRANCHENLISTE (aus Upload extrahiert) ---
{industry_list_for_prompt}
---------------------------------------------------
--- REFERENCE CLIENT HOMEPAGE TEXT ---
{homepage_text}
------------------------------------
Reference Client URL: "{reference_url}"
Task: Create a "Digital Trace Strategy" to identify high-potential leads based on the Strategic Context, the **Reference Industry List**, and the **factual content of the Reference Client Homepage Text**.
1. ANALYZE the uploaded context (Offer, Personas, Pain Points).
2. **CRITICAL**: Use the **Reference Industry List** to guide your industry identification for the Ideal Customer Profile.
3. EXTRACT a 1-sentence summary of what is being sold ("summaryOfOffer") from the Strategic Context.
4. DEFINE an Ideal Customer Profile (ICP) derived from the "Target Groups" in the context and what you learned from the Reference Client's homepage. The ICP should include the most relevant industry from the **Reference Industry List**.
5. **CRITICAL**: Identify 3-5 specific "Digital Signals" (Traces) that are **ACTUALLY VISIBLE and demonstrable from the provided Homepage Text** that indicate a match for the Pain Points/Needs defined in the context.
- Use the "Pain Points" and "Offer" from the Strategic Context to derive these signals.
- Signals MUST be directly supported by evidence from the "REFERENCE CLIENT HOMEPAGE TEXT". Do not invent signals that are not verifiable from the text.
- Example: If the context mentions "Pain: High return rates", and the homepage text mentions "easy returns within 14 days", a Signal could be "Mentions detailed return policy".
OUTPUT LANGUAGE: German (Deutsch) for all text fields.
STRICTLY output only a valid JSON object matching this format. DO NOT include any additional text or markdown code blocks (e.g., ```json```).
{{
"summaryOfOffer": "<Short 1-sentence summary of the product/service>",
"idealCustomerProfile": "<Detailed ICP based on context and homepage analysis>",
"signals": [
{{
"id": "sig_1",
"name": "<Short Name (e.g. 'Tech Stack')>",
"description": "<What specifically to look for? (e.g. 'Look for Shopify in source code')>",
"targetPageKeywords": ["homepage"]
}}
]
}}
"""
    # Build the REST payload (generationConfig without response_mime_type)
    payload = {
        "contents": [
            {
                "parts": [
                    {
                        "text": prompt
                    }
                ]
            }
        ]
    }
    logger.debug(f"Gesamter Prompt, gesendet an Gemini API:\n{prompt}")
    logger.debug(f"Payload für Gemini API: {json.dumps(payload, indent=2)}")
    # Bound before the try block so the error handlers can tell whether a
    # response was received at all.
    response = None
    try:
        logger.info("Sende Anfrage an Gemini API...")
        # (connect, read) timeout: without one a stalled connection would
        # block the orchestrator forever. The read limit is generous
        # because generation can take a while.
        response = requests.post(GEMINI_API_URL, json=payload, headers=request_headers, timeout=(10, 300))
        response.raise_for_status()  # raises for HTTP status codes 4xx/5xx
        logger.info(f"Gemini API-Antwort erhalten (Status: {response.status_code}).")
        response_data = response.json()
        logger.debug(f"Rohe API-Antwort (JSON): {json.dumps(response_data, indent=2)}")
        response_text = response_data['candidates'][0]['content']['parts'][0]['text']
        logger.debug(f"Extrahierter Text aus API-Antwort: {response_text}")
        # The model sometimes wraps the JSON in a Markdown code fence despite
        # the instructions; tolerate leading whitespace and a bare ``` fence.
        response_text = response_text.strip()
        if response_text.startswith('```'):
            logger.debug("JSON-Antwort im Markdown-Code-Block erkannt. Extrahiere reines JSON.")
            response_text = re.sub(r'^```(?:json)?', '', response_text, flags=re.IGNORECASE).split('```')[0].strip()
        strategy = json.loads(response_text)
        logger.info("Strategie erfolgreich als JSON geparst.")
        logger.info(f"Generierte Strategie: {json.dumps(strategy, indent=2)}")
        return strategy
    except requests.exceptions.HTTPError as http_err:
        error_message = f"HTTP Fehler bei der Gemini API-Anfrage: {http_err}"
        logger.error(error_message, exc_info=True)
        return {"error": error_message, "response_text": response.text}
    except Exception as e:
        error_message = f"Fehler bei der Gemini API-Anfrage oder beim Parsen der Antwort: {e}"
        logger.error(error_message, exc_info=True)
        # No response exists when the request itself failed (e.g. connection error).
        raw_response_text = response.text if response is not None else ""
        return {"error": error_message, "response_text": raw_response_text}
def identify_competitors(reference_url, target_market, extracted_industries, reference_city=None, reference_country=None, summary_of_offer=None):
    """
    Ask the Gemini API for local, national and international competitors.

    Args:
        reference_url: Homepage URL of the reference company.
        target_market: Target market label placed in the prompt.
        extracted_industries: Industry names for the prompt; may be empty.
        reference_city: Optional city of the reference company.
        reference_country: Optional country of the reference company.
        summary_of_offer: Optional one-line description of the offer.

    Returns:
        The parsed JSON object returned by the model, or a dict with
        ``"error"`` and ``"response_text"`` keys when the request fails or
        the answer is not valid JSON.

    Raises:
        FileNotFoundError, RuntimeError: Propagated from
            ``load_gemini_api_key`` when the key cannot be loaded.
    """
    logger.info("Starte Konkurrenten-Identifikation.")
    logger.info(f"Referenz-URL: {reference_url}")
    logger.info(f"Zielmarkt: {target_market}")
    logger.info(f"Extrahierte Industrien: {extracted_industries}")
    logger.info(f"Referenz Stadt: {reference_city}, Land: {reference_country}")
    logger.info(f"Summary of Offer: {summary_of_offer}")
    api_key = load_gemini_api_key()
    # The key is sent as a header instead of a query parameter so that it
    # never shows up in the logged URL or in HTTPError messages (which embed
    # the request URL and are returned to the caller on stdout).
    GEMINI_API_URL = "https://generativelanguage.googleapis.com/v1/models/gemini-2.5-pro:generateContent"
    request_headers = {'Content-Type': 'application/json', 'x-goog-api-key': api_key}
    logger.debug(f"Gemini API URL: {GEMINI_API_URL}")
    # Build the prompt for the competitor identification. Only the offer
    # summary needs a pre-built fragment; industries, city and country are
    # formatted inline in the prompt.
    offer_prompt = f"\n Offer Summary: {summary_of_offer}" if summary_of_offer else ""
    prompt = f"""
You are a B2B Market Intelligence Analyst specializing in competitor analysis.
--- REFERENCE COMPANY CONTEXT ---
Reference URL: {reference_url}
Target Market: {target_market}
Extracted Industries (Target Groups): {', '.join(extracted_industries) if extracted_industries else 'Not specified'}{offer_prompt}
Reference City: {reference_city if reference_city else 'Not specified'}
Reference Country: {reference_country if reference_country else 'Not specified'}
----------------------------------
Task: Identify competitors for the reference company. Categorize them into 'Local', 'National', and 'International'.
**CRITICAL**: Use the 'Offer Summary' (if provided) to understand the company's specific business. The 'Extracted Industries' often represent the TARGET GROUPS/CLIENTS, not necessarily the competitor's own industry. Focus on finding companies that offer SIMILAR PRODUCTS/SERVICES to the reference company.
1. **Local Competitors**: Companies operating in the immediate vicinity or specific region of the reference company, offering similar products/services. Focus on direct geographical overlap.
2. **National Competitors**: Major players operating across the entire country (or relevant large region within the target market), offering comparable products/services. These are the main national rivals.
3. **International Competitors**: Global or large multinational corporations that operate on an international scale and compete with the reference company in its product/service domain.
OUTPUT LANGUAGE: German (Deutsch) for all text fields.
STRICTLY output only a valid JSON object matching this format. DO NOT include any additional text or markdown code blocks (e.g., ```json```).
{{
"localCompetitors": [
{{
"name": "<Competitor Name>",
"url": "<Homepage URL, if available>",
"description": "<1-2 sentences describing their similar offering/market>"
}}
],
"nationalCompetitors": [
{{
"name": "<Competitor Name>",
"url": "<Homepage URL, if available>",
"description": "<1-2 sentences describing their similar offering/market>"
}}
],
"internationalCompetitors": [
{{
"name": "<Competitor Name>",
"url": "<Homepage URL, if available>",
"description": "<1-2 sentences describing their similar offering/market>"
}}
]
}}
"""
    payload = {
        "contents": [
            {
                "parts": [
                    {
                        "text": prompt
                    }
                ]
            }
        ]
    }
    logger.debug(f"Gesamter Prompt (identify_competitors), gesendet an Gemini API:\n{prompt}")
    logger.debug(f"Payload (identify_competitors) für Gemini API: {json.dumps(payload, indent=2)}")
    # Bound before the try block so the error handlers can tell whether a
    # response was received at all.
    response = None
    try:
        logger.info("Sende Anfrage für Konkurrenten-Identifikation an Gemini API...")
        # (connect, read) timeout: without one a stalled connection would
        # block the orchestrator forever. The read limit is generous
        # because generation can take a while.
        response = requests.post(GEMINI_API_URL, json=payload, headers=request_headers, timeout=(10, 300))
        response.raise_for_status()
        logger.info(f"Gemini API-Antwort für Konkurrenten erhalten (Status: {response.status_code}).")
        response_data = response.json()
        logger.debug(f"Rohe API-Antwort (identify_competitors, JSON): {json.dumps(response_data, indent=2)}")
        response_text = response_data['candidates'][0]['content']['parts'][0]['text']
        logger.debug(f"Extrahierter Text (identify_competitors) aus API-Antwort: {response_text}")
        # The model sometimes wraps the JSON in a Markdown code fence despite
        # the instructions; tolerate leading whitespace and a bare ``` fence.
        response_text = response_text.strip()
        if response_text.startswith('```'):
            logger.debug("JSON-Antwort im Markdown-Code-Block erkannt. Extrahiere reines JSON.")
            response_text = re.sub(r'^```(?:json)?', '', response_text, flags=re.IGNORECASE).split('```')[0].strip()
        competitors_data = json.loads(response_text)
        logger.info("Konkurrenten-Daten erfolgreich als JSON geparst.")
        logger.info(f"Generierte Konkurrenten: {json.dumps(competitors_data, indent=2)}")
        return competitors_data
    except requests.exceptions.HTTPError as http_err:
        error_message = f"HTTP Fehler bei der Gemini API-Anfrage (identify_competitors): {http_err}"
        logger.error(error_message, exc_info=True)
        return {"error": error_message, "response_text": response.text}
    except Exception as e:
        error_message = f"Fehler bei der Gemini API-Anfrage oder beim Parsen der Antwort (identify_competitors): {e}"
        logger.error(error_message, exc_info=True)
        # No response exists when the request itself failed (e.g. connection error).
        raw_response_text = response.text if response is not None else ""
        return {"error": error_message, "response_text": raw_response_text}
# Main CLI logic
def main():
    """
    Parse the command line, run the requested mode and print JSON to stdout.

    Supported modes are ``generate_strategy`` (needs ``--reference_url`` and
    ``--context_file``) and ``identify_competitors`` (needs
    ``--reference_url`` and ``--target_market``). Validation problems and an
    unreadable context file are reported as a JSON object with an
    ``"error"`` key on stdout; log output goes through ``logger``.
    """
    logger.info("Starte Market Intelligence Backend Orchestrator.")
    parser = argparse.ArgumentParser(description="Market Intelligence Backend Orchestrator.")
    parser.add_argument("--mode", required=True, help="Der auszuführende Modus (z.B. generate_strategy, identify_competitors).")
    parser.add_argument("--reference_url", help="Die URL des Referenzkunden.")
    parser.add_argument("--context_file", help="Pfad zur Datei mit dem Strategie-Dokument.")
    parser.add_argument("--target_market", help="Der Zielmarkt (z.B. 'Germany').")
    parser.add_argument("--reference_city", help="Die Stadt des Referenzkunden (optional).")
    parser.add_argument("--reference_country", help="Das Land des Referenzkunden (optional).")
    parser.add_argument("--summary_of_offer", help="Zusammenfassung des Angebots (für Konkurrentensuche).")
    args = parser.parse_args()
    logger.info(f"Modus: {args.mode}")
    context_content = ""
    if args.context_file:
        try:
            # Explicit UTF-8: the document contains German text, and the
            # platform default encoding is not reliable across environments.
            with open(args.context_file, "r", encoding="utf-8") as f:
                context_content = f.read()
            logger.info(f"Kontext-Datei {args.context_file} erfolgreich gelesen.")
        except FileNotFoundError:
            logger.critical(f"Kontext-Datei nicht gefunden: {args.context_file}")
            print(json.dumps({"error": f"Context file not found: {args.context_file}"}))
            return
        except (OSError, UnicodeDecodeError) as e:
            # Report other read failures the same way, so the caller still
            # gets a JSON error object on stdout instead of only a traceback.
            logger.critical(f"Kontext-Datei konnte nicht gelesen werden: {args.context_file}: {e}")
            print(json.dumps({"error": f"Context file could not be read: {args.context_file}: {e}"}))
            return
    if args.mode == "generate_strategy":
        if not args.reference_url or not args.context_file:
            logger.error("Für den Modus 'generate_strategy' sind --reference_url und --context_file erforderlich.")
            print(json.dumps({"error": "Für den Modus 'generate_strategy' sind --reference_url und --context_file erforderlich."}))
            return
        result = generate_search_strategy(args.reference_url, context_content)
        print(json.dumps(result, indent=2))
    elif args.mode == "identify_competitors":
        if not args.reference_url or not args.target_market:
            logger.error("Für den Modus 'identify_competitors' sind --reference_url und --target_market erforderlich.")
            print(json.dumps({"error": "Für den Modus 'identify_competitors' sind --reference_url und --target_market erforderlich."}))
            return
        # The industries are extracted here as well to ground the competitor search
        extracted_industries = _extract_target_industries_from_context(context_content)
        result = identify_competitors(
            args.reference_url,
            args.target_market,
            extracted_industries,
            args.reference_city,
            args.reference_country,
            args.summary_of_offer
        )
        print(json.dumps(result, indent=2))
    else:
        logger.error(f"Unbekannter Modus: {args.mode}")
        print(json.dumps({"error": f"Unbekannter Modus: {args.mode}"}))
# Run the CLI only when the file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()