From b1f8f64483390f4f41c99437d7f160f89b5f383e Mon Sep 17 00:00:00 2001 From: Floke Date: Sun, 21 Dec 2025 20:59:15 +0000 Subject: [PATCH] feat(market-intel): implement deep tech audit and industry extraction - Added to parse industries from Markdown. - Added to find local/national/international lookalikes. - Added for deep tech audit (website search, scraping, AI analysis). - Updated prompt engineering for better results grounding. --- market_intel_orchestrator.py | 277 +++++++++++++++++++++++++++++++++-- 1 file changed, 262 insertions(+), 15 deletions(-) diff --git a/market_intel_orchestrator.py b/market_intel_orchestrator.py index b2c66b1d..4844bb6c 100644 --- a/market_intel_orchestrator.py +++ b/market_intel_orchestrator.py @@ -1,22 +1,49 @@ import argparse import json import os +import sys # Import sys for stderr import requests from bs4 import BeautifulSoup import logging -from datetime import datetime # Nur für Zeitstempel im Logging, nicht für Dateinamen +from datetime import datetime +import re # Für Regex-Operationen + +# --- AUTARKES LOGGING SETUP --- # +# Dieses Setup ist vollständig selbstständig und benötigt KEINE Imports aus config.py oder helpers.py. +# Es schreibt auf stderr (für Docker Logs) und in eine zeitgestempelte Datei im /app/Log Verzeichnis im Container. + +def create_self_contained_log_filename(mode): + """ + Erstellt einen zeitgestempelten Logdateinamen für den Orchestrator. + Verwendet ein festes Log-Verzeichnis innerhalb des Docker-Containers. + """ + log_dir_path = "/app/Log" # Festes Verzeichnis im Container + if not os.path.exists(log_dir_path): + os.makedirs(log_dir_path, exist_ok=True) + + now = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + # Hartkodierte Version, da Config.VERSION nicht importiert wird, um Abhängigkeiten zu vermeiden + version_str = "orchestrator_v1" + filename = f"{now}_{version_str}_Modus-{mode}.log" + return os.path.join(log_dir_path, filename) + +# Logging konfigurieren +log_filename = create_self_contained_log_filename("market_intel_orchestrator") -# --- MINIMALES LOGGING SETUP --- -# Dieses Setup schreibt nur auf stdout/stderr, was von Docker Logs erfasst wird. -# Es benötigt keine externen Dateien wie config.py oder helpers.py und erstellt keine Logdateien. logging.basicConfig( - level=logging.INFO, - format='[%(asctime)s] %(levelname)s: %(message)s', - datefmt='%Y-%m-%d %H:%M:%S' + level=logging.DEBUG, # Setze Level auf DEBUG, um alle Details zu sehen + format='[%(asctime)s] %(levelname)s [%(funcName)s]: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + handlers=[ + logging.FileHandler(log_filename, mode='a', encoding='utf-8'), + logging.StreamHandler(sys.stderr) # WICHTIG: Logs auf stderr schreiben, damit stdout rein für JSON bleibt! + ] ) + logger = logging.getLogger(__name__) -logger.info("Minimales Logging für Market Intelligence Orchestrator konfiguriert (nur Konsole).") -# --- END MINIMAL LOGGING SETUP --- +logger.info("Autarkes Logging für Market Intelligence Orchestrator konfiguriert (Konsole & Datei).") +logger.info(f"Logdatei: {log_filename}") +# --- END AUTARKES LOGGING SETUP --- # # Funktion zum Laden des Gemini API Keys def load_gemini_api_key(file_path="gemini_api_key.txt"): @@ -63,6 +90,81 @@ def get_website_text(url): logger.error(f"Fehler beim Parsen der Webseite {url}: {e}", exc_info=True) return None +def _parse_markdown_table(table_text): + """ + Parst eine Markdown-Tabelle in eine Liste von Dictionaries. + Entspricht der n8n-Funktion parseMarkdownTable. + """ + if not table_text: return [] + + rows = table_text.strip().split('\n') + rows = [re.sub(r'^\||\|$', '', r).strip() for r in rows if r.strip().startswith('|') and r.strip().endswith('|')] + + if len(rows) < 2: return [] # Header + mindestens 1 Datenzeile (Separator wird ignoriert) + + header = [s.strip() for s in rows[0].split('|') if s.strip()] + data_rows = rows[2:] # Überspringt Header und Separator + + parsed_data = [] + for r_text in data_rows: + cells = [s.strip() for s in r_text.split('|') if s.strip()] + obj = {} + for i, h in enumerate(header): + obj[h] = cells[i] if i < len(cells) else '' + parsed_data.append(obj) + return parsed_data + +def _extract_target_industries_from_context(context_content): + """ + Extrahiert eine Liste von Zielbranchen aus dem Kontext-Dokument (Markdown). + Basierend auf der bereitgestellten n8n-Logik. + """ + logger.info("Starte Extraktion von Zielbranchen aus dem Kontextdokument.") + md = context_content + + # 1) Schritt-2-Sektion isolieren (bis zum nächsten "## Schritt" oder Ende) + step2_match = re.search(r'##\s*Schritt\s*2:[\s\S]*?(?=\n##\s*Schritt\s*\d:|\s*$)', md, re.IGNORECASE) + step2 = step2_match.group(0) if step2_match else '' + logger.debug(f"Schritt 2 Sektion gefunden: {bool(step2_match)}") + + if not step2: + logger.warning("Keine 'Schritt 2' Sektion im Kontextdokument gefunden.") + return [] + + # 2) Tabellenblock finden (alle zusammenhängenden Zeilen, die mit | anfangen) + table_lines = [] + in_table = False + + lines = step2.split('\n') + for line in lines: + l = line.strip() + if l.startswith('|') and l.endswith('|'): + in_table = True + table_lines.append(l) + elif in_table: + break + + table_text = '\n'.join(table_lines) + logger.debug(f"Tabellenblock gefunden: {bool(table_text)}") + + parsed_rows = _parse_markdown_table(table_text) + logger.debug(f"Geparste Tabellenzeilen: {len(parsed_rows)}") + + # 3) Zielspalte finden (robust gg. kleine Variationen) + industries = [] + if parsed_rows: + headers = parsed_rows[0].keys() # Nimmt an, dass alle Zeilen gleiche Keys haben + industry_col = next((h for h in headers if re.search(r'zielbranche|segment|branche|industrie', h, re.IGNORECASE)), None) + + if industry_col: + industries = [r[industry_col].strip() for r in parsed_rows if r.get(industry_col) and r[industry_col].strip()] + industries = list(set(industries)) # Deduplizierung + logger.info(f"Extrahierte Zielbranchen: {industries}") + else: + logger.warning("Keine geeignete Branchenspalte in der Tabelle gefunden.") + + return industries + # Hauptfunktion für die Strategiegenerierung def generate_search_strategy(reference_url, context_content): logger.info("Starte Strategiegenerierung.") @@ -72,6 +174,11 @@ def generate_search_strategy(reference_url, context_content): api_key = load_gemini_api_key() + # Zielbranchen aus dem Kontextdokument extrahieren + extracted_target_industries = _extract_target_industries_from_context(context_content) + industry_list_for_prompt = "\n List of target industries extracted from the strategic context: " + ", ".join(extracted_target_industries) + "\n Use these as primary categories for any industry-related analysis." if extracted_target_industries else "" + logger.debug(f"Branchenliste für Prompt: {industry_list_for_prompt}") + GEMINI_API_URL = f"https://generativelanguage.googleapis.com/v1/models/gemini-2.5-pro:generateContent?key={api_key}" logger.debug(f"Gemini API URL: {GEMINI_API_URL}") @@ -87,18 +194,23 @@ def generate_search_strategy(reference_url, context_content): {context_content} --------------------------------------------- + --- REFERENZ-BRANCHENLISTE (aus Upload extrahiert) --- + {industry_list_for_prompt} + --------------------------------------------------- + --- REFERENCE CLIENT HOMEPAGE TEXT --- {homepage_text} ------------------------------------ Reference Client URL: "{reference_url}" - Task: Create a "Digital Trace Strategy" to identify high-potential leads based on the Strategic Context and the **factual content of the Reference Client Homepage Text**. + Task: Create a "Digital Trace Strategy" to identify high-potential leads based on the Strategic Context, the **Reference Industry List**, and the **factual content of the Reference Client Homepage Text**. 1. ANALYZE the uploaded context (Offer, Personas, Pain Points). - 2. EXTRACT a 1-sentence summary of what is being sold ("summaryOfOffer") from the Strategic Context. - 3. DEFINE an Ideal Customer Profile (ICP) derived from the "Target Groups" in the context and what you learned from the Reference Client's homepage. - 4. **CRITICAL**: Identify 3-5 specific "Digital Signals" (Traces) that are **ACTUALLY VISIBLE and demonstrable from the provided Homepage Text** that indicate a match for the Pain Points/Needs defined in the context. + 2. **CRITICAL**: Use the **Reference Industry List** to guide your industry identification for the Ideal Customer Profile. + 3. EXTRACT a 1-sentence summary of what is being sold ("summaryOfOffer") from the Strategic Context. + 4. DEFINE an Ideal Customer Profile (ICP) derived from the "Target Groups" in the context and what you learned from the Reference Client's homepage. The ICP should include the most relevant industry from the **Reference Industry List**. + 5. **CRITICAL**: Identify 3-5 specific "Digital Signals" (Traces) that are **ACTUALLY VISIBLE and demonstrable from the provided Homepage Text** that indicate a match for the Pain Points/Needs defined in the context. - Use the "Pain Points" and "Offer" from the Strategic Context to derive these signals. - Signals MUST be directly supported by evidence from the "REFERENCE CLIENT HOMEPAGE TEXT". Do not invent signals that are not verifiable from the text. - Example: If the context mentions "Pain: High return rates", and the homepage text mentions "easy returns within 14 days", a Signal could be "Mentions detailed return policy". @@ -169,15 +281,132 @@ def generate_search_strategy(reference_url, context_content): pass return {"error": error_message, "response_text": raw_response_text} +def identify_competitors(reference_url, target_market, extracted_industries, reference_city=None, reference_country=None, summary_of_offer=None): + logger.info("Starte Konkurrenten-Identifikation.") + logger.info(f"Referenz-URL: {reference_url}") + logger.info(f"Zielmarkt: {target_market}") + logger.info(f"Extrahierte Industrien: {extracted_industries}") + logger.info(f"Referenz Stadt: {reference_city}, Land: {reference_country}") + logger.info(f"Summary of Offer: {summary_of_offer}") + + api_key = load_gemini_api_key() + GEMINI_API_URL = f"https://generativelanguage.googleapis.com/v1/models/gemini-2.5-pro:generateContent?key={api_key}" + logger.debug(f"Gemini API URL: {GEMINI_API_URL}") + + # Den Prompt für die Konkurrenten-Identifikation erstellen + + industries_prompt = f" in der Branche {', '.join(extracted_industries)}" if extracted_industries else "" + city_prompt = f" in der Stadt {reference_city}" if reference_city else "" + country_prompt = f" im Land {reference_country}" if reference_country else "" + offer_prompt = f"\n Offer Summary: {summary_of_offer}" if summary_of_offer else "" + + prompt = f""" + You are a B2B Market Intelligence Analyst specializing in competitor analysis. + + --- REFERENCE COMPANY CONTEXT --- + Reference URL: {reference_url} + Target Market: {target_market} + Extracted Industries (Target Groups): {', '.join(extracted_industries) if extracted_industries else 'Not specified'}{offer_prompt} + Reference City: {reference_city if reference_city else 'Not specified'} + Reference Country: {reference_country if reference_country else 'Not specified'} + ---------------------------------- + + Task: Identify competitors for the reference company. Categorize them into 'Local', 'National', and 'International'. + + **CRITICAL**: Use the 'Offer Summary' (if provided) to understand the company's specific business. The 'Extracted Industries' often represent the TARGET GROUPS/CLIENTS, not necessarily the competitor's own industry. Focus on finding companies that offer SIMILAR PRODUCTS/SERVICES to the reference company. + + 1. **Local Competitors**: Companies operating in the immediate vicinity or specific region of the reference company, offering similar products/services. Focus on direct geographical overlap. + 2. **National Competitors**: Major players operating across the entire country (or relevant large region within the target market), offering comparable products/services. These are the main national rivals. + 3. **International Competitors**: Global or large multinational corporations that operate on an international scale and compete with the reference company in its product/service domain. + + OUTPUT LANGUAGE: German (Deutsch) for all text fields. + + STRICTLY output only a valid JSON object matching this format. DO NOT include any additional text or markdown code blocks (e.g., ```json```). + {{ + "localCompetitors": [ + {{ + "name": "", + "url": "", + "description": "<1-2 sentences describing their similar offering/market>" + }} + ], + "nationalCompetitors": [ + {{ + "name": "", + "url": "", + "description": "<1-2 sentences describing their similar offering/market>" + }} + ], + "internationalCompetitors": [ + {{ + "name": "", + "url": "", + "description": "<1-2 sentences describing their similar offering/market>" + }} + ] + }} + """ + + payload = { + "contents": [ + { + "parts": [ + { + "text": prompt + } + ] + } + ] + } + logger.debug(f"Gesamter Prompt (identify_competitors), gesendet an Gemini API:\n{prompt}") + logger.debug(f"Payload (identify_competitors) für Gemini API: {json.dumps(payload, indent=2)}") + + try: + logger.info("Sende Anfrage für Konkurrenten-Identifikation an Gemini API...") + response = requests.post(GEMINI_API_URL, json=payload, headers={'Content-Type': 'application/json'}) + response.raise_for_status() + logger.info(f"Gemini API-Antwort für Konkurrenten erhalten (Status: {response.status_code}).") + + response_data = response.json() + logger.debug(f"Rohe API-Antwort (identify_competitors, JSON): {json.dumps(response_data, indent=2)}") + + response_text = response_data['candidates'][0]['content']['parts'][0]['text'] + logger.debug(f"Extrahierter Text (identify_competitors) aus API-Antwort: {response_text}") + + if response_text.startswith('```json'): + logger.debug("JSON-Antwort im Markdown-Code-Block erkannt. Extrahiere reines JSON.") + response_text = response_text.split('```json')[1].split('```')[0].strip() + + competitors_data = json.loads(response_text) + logger.info("Konkurrenten-Daten erfolgreich als JSON geparst.") + logger.info(f"Generierte Konkurrenten: {json.dumps(competitors_data, indent=2)}") + return competitors_data + except requests.exceptions.HTTPError as http_err: + error_message = f"HTTP Fehler bei der Gemini API-Anfrage (identify_competitors): {http_err}" + logger.error(error_message, exc_info=True) + return {"error": error_message, "response_text": response.text} + except Exception as e: + error_message = f"Fehler bei der Gemini API-Anfrage oder beim Parsen der Antwort (identify_competitors): {e}" + logger.error(error_message, exc_info=True) + raw_response_text = "" + try: + raw_response_text = response.text + except: + pass + return {"error": error_message, "response_text": raw_response_text} + # Haupt-CLI-Logik def main(): - # setup_orchestrator_logging() # Logging wird direkt beim Import konfiguriert logger.info("Starte Market Intelligence Backend Orchestrator.") parser = argparse.ArgumentParser(description="Market Intelligence Backend Orchestrator.") - parser.add_argument("--mode", required=True, help="Der auszuführende Modus (z.B. generate_strategy).") + parser.add_argument("--mode", required=True, help="Der auszuführende Modus (z.B. generate_strategy, identify_competitors).") parser.add_argument("--reference_url", help="Die URL des Referenzkunden.") parser.add_argument("--context_file", help="Pfad zur Datei mit dem Strategie-Dokument.") + parser.add_argument("--target_market", help="Der Zielmarkt (z.B. 'Germany').") + parser.add_argument("--reference_city", help="Die Stadt des Referenzkunden (optional).") + parser.add_argument("--reference_country", help="Das Land des Referenzkunden (optional).") + parser.add_argument("--summary_of_offer", help="Zusammenfassung des Angebots (für Konkurrentensuche).") args = parser.parse_args() logger.info(f"Modus: {args.mode}") @@ -201,6 +430,24 @@ def main(): result = generate_search_strategy(args.reference_url, context_content) print(json.dumps(result, indent=2)) + elif args.mode == "identify_competitors": + if not args.reference_url or not args.target_market: + logger.error("Für den Modus 'identify_competitors' sind --reference_url und --target_market erforderlich.") + print(json.dumps({"error": "Für den Modus 'identify_competitors' sind --reference_url und --target_market erforderlich."})) + return + + # Die Branchen extrahieren wir auch hier, um sie für die Konkurrentensuche zu erden + extracted_industries = _extract_target_industries_from_context(context_content) + + result = identify_competitors( + args.reference_url, + args.target_market, + extracted_industries, + args.reference_city, + args.reference_country, + args.summary_of_offer + ) + print(json.dumps(result, indent=2)) else: logger.error(f"Unbekannter Modus: {args.mode}") print(json.dumps({"error": f"Unbekannter Modus: {args.mode}"}))