Files
Brancheneinstufung2/market_intel_orchestrator.py
Floke c682bd8576 docs: document deep tech audit and smart grounding
- Updated readme.md with new step 3 audit details.
- Updated market_intel_backend_plan.md with current status and achievements.
- Fixed SerpAPI key file path in orchestrator.
- Documented the transition to direct REST API calls and enhanced terminal UX.
2025-12-21 22:19:11 +00:00

365 lines
14 KiB
Python

import argparse
import json
import os
import sys # Import sys for stderr
import requests
from bs4 import BeautifulSoup
import logging
from datetime import datetime
import re # Für Regex-Operationen
# --- AUTARKES LOGGING SETUP --- #
def create_self_contained_log_filename(mode, log_dir_path="/app/Log"):
    """Build a timestamped log-file path for this run and ensure its directory exists.

    Args:
        mode: Free-form label embedded in the file name (e.g. the CLI mode).
        log_dir_path: Directory that receives the log file; created if missing.
            Defaults to "/app/Log" to preserve the original behavior.

    Returns:
        Full path of the log file as a string.
    """
    # exist_ok=True already tolerates a pre-existing directory, so the
    # former os.path.exists() pre-check was redundant (and race-prone).
    os.makedirs(log_dir_path, exist_ok=True)
    now = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    version_str = "orchestrator_v2"
    filename = f"{now}_{version_str}_Modus-{mode}.log"
    return os.path.join(log_dir_path, filename)
# One log file per process start; DEBUG level captures everything.
log_filename = create_self_contained_log_filename("market_intel_orchestrator")
logging.basicConfig(
    level=logging.DEBUG,
    format='[%(asctime)s] %(levelname)s [%(funcName)s]: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    handlers=[
        # Persist to the timestamped file and mirror everything to stderr,
        # so stdout stays clean for the JSON results printed by main().
        logging.FileHandler(log_filename, mode='a', encoding='utf-8'),
        logging.StreamHandler(sys.stderr)
    ]
)
logger = logging.getLogger(__name__)
# --- END SELF-CONTAINED LOGGING SETUP --- #
def load_gemini_api_key(file_path="gemini_api_key.txt"):
    """Read and return the Gemini API key from *file_path*.

    Logs a critical message and re-raises if the file cannot be read —
    the orchestrator cannot run without this key.
    """
    try:
        with open(file_path, "r") as key_file:
            return key_file.read().strip()
    except Exception as exc:
        logger.critical(f"Fehler beim Laden des Gemini API Keys: {exc}")
        raise
def load_serp_api_key(file_path="serpapikey.txt"):
    """Load the SerpAPI key; returns None when it cannot be found.

    Resolution order: key file first, then the SERP_API_KEY environment
    variable as a fallback. Never raises — callers treat None as "skip".
    """
    try:
        if not os.path.exists(file_path):
            # No key file on disk — try the environment instead.
            return os.environ.get("SERP_API_KEY")
        with open(file_path, "r") as key_file:
            return key_file.read().strip()
    except Exception as exc:
        logger.warning(f"Konnte SerpAPI Key nicht laden: {exc}")
        return None
def get_website_text(url):
    """Fetch *url* and return up to 15,000 characters of visible page text.

    Script/style/navigation boilerplate is stripped first. Returns None on
    any network or parsing failure (already logged).
    """
    logger.info(f"Scraping URL: {url}")
    browser_headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
    try:
        response = requests.get(url, headers=browser_headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'lxml')
        # Remove non-content containers before extracting text.
        for boilerplate in soup(['script', 'style', 'nav', 'footer', 'header']):
            boilerplate.decompose()
        page_text = soup.get_text(separator=' ', strip=True)
        # Generous cap so the downstream LLM gets enough context.
        return page_text[:15000]
    except Exception as exc:
        logger.error(f"Scraping failed for {url}: {exc}")
        return None
def serp_search(query, num_results=3):
    """Run a Google search via SerpAPI and return a list of result dicts.

    Each dict has 'title', 'link' and 'snippet' keys. Returns an empty list
    when no API key is available or the request fails (both are logged).
    """
    api_key = load_serp_api_key()
    if not api_key:
        logger.warning("SerpAPI Key fehlt. Suche übersprungen.")
        return []
    logger.info(f"SerpAPI Suche: {query}")
    request_params = {
        "engine": "google",
        "q": query,
        "api_key": api_key,
        "num": num_results,
        "hl": "de",
        "gl": "de"
    }
    try:
        response = requests.get("https://serpapi.com/search", params=request_params, timeout=20)
        response.raise_for_status()
        payload = response.json()
        return [
            {
                "title": item.get("title"),
                "link": item.get("link"),
                "snippet": item.get("snippet")
            }
            for item in payload.get("organic_results", [])
        ]
    except Exception as exc:
        logger.error(f"SerpAPI Fehler: {exc}")
        return []
def _extract_target_industries_from_context(context_content):
md = context_content
step2_match = re.search(r'##\s*Schritt\s*2:[\s\S]*?(?=\n##\s*Schritt\s*\d:|\s*$)', md, re.IGNORECASE)
if not step2_match: return []
table_lines = []
in_table = False
for line in step2_match.group(0).split('\n'):
if line.strip().startswith('|'):
in_table = True
table_lines.append(line.strip())
elif in_table: break
if len(table_lines) < 3: return []
header = [s.strip() for s in table_lines[0].split('|') if s.strip()]
industry_col = next((h for h in header if re.search(r'zielbranche|segment|branche|industrie', h, re.IGNORECASE)), None)
if not industry_col: return []
col_idx = header.index(industry_col)
industries = []
for line in table_lines[2:]:
cells = [s.strip() for s in line.split('|') if s.strip()]
if len(cells) > col_idx: industries.append(cells[col_idx])
return list(set(industries))
def generate_search_strategy(reference_url, context_content):
    """Ask Gemini to derive a search strategy from the context + reference site.

    Scrapes the reference client's homepage, extracts target industries from
    the context markdown and prompts Gemini for an offer summary, ICP and
    3-5 provable signals.

    Args:
        reference_url: Homepage URL of the reference client.
        context_content: Markdown text of the strategic context file.

    Returns:
        Parsed strategy dict on success, or {"error": "..."} on failure.
    """
    logger.info(f"Generating strategy for {reference_url}")
    api_key = load_gemini_api_key()
    target_industries = _extract_target_industries_from_context(context_content)
    homepage_text = get_website_text(reference_url)
    GEMINI_API_URL = f"https://generativelanguage.googleapis.com/v1/models/gemini-2.5-pro:generateContent?key={api_key}"
    prompt = f"""
You are a B2B Market Intelligence Architect.
--- STRATEGIC CONTEXT ---
{context_content}
--- EXTRACTED TARGET INDUSTRIES ---
{', '.join(target_industries)}
--- REFERENCE CLIENT HOMEPAGE ---
{homepage_text}
TASK:
1. Create a 1-sentence 'summaryOfOffer'.
2. Define an 'idealCustomerProfile' based on the reference client.
3. Identify 3-5 'signals'.
FOR EACH SIGNAL, you MUST define a 'proofStrategy':
- 'likelySource': Where to find the proof (e.g., "Datenschutz", "Jobs", "Case Studies", "Homepage", "Press").
- 'searchQueryTemplate': A specific Google search query template to find this proof. Use '{{COMPANY}}' as placeholder for the company name.
Example: "site:{{COMPANY}} 'it-leiter' sap" or "{{COMPANY}} nachhaltigkeitsbericht 2024 filetype:pdf".
STRICTLY output only valid JSON:
{{
"summaryOfOffer": "...",
"idealCustomerProfile": "...",
"signals": [
{{
"id": "sig_1",
"name": "...",
"description": "...",
"targetPageKeywords": ["homepage"],
"proofStrategy": {{
"likelySource": "...",
"searchQueryTemplate": "..."
}}
}}
]
}}
"""
    payload = {"contents": [{"parts": [{"text": prompt}]}]}
    try:
        # FIX: timeout added — every other HTTP call in this module has one;
        # without it a hung Gemini call blocks the whole pipeline forever.
        response = requests.post(GEMINI_API_URL, json=payload,
                                 headers={'Content-Type': 'application/json'},
                                 timeout=120)
        response.raise_for_status()
        res_json = response.json()
        text = res_json['candidates'][0]['content']['parts'][0]['text']
        # Strip a markdown code fence if the model wrapped its JSON.
        if "```json" in text:
            text = text.split("```json")[1].split("```")[0].strip()
        return json.loads(text)
    except Exception as e:
        logger.error(f"Strategy generation failed: {e}")
        return {"error": str(e)}
def identify_competitors(reference_url, target_market, industries, summary_of_offer=None):
    """Ask Gemini for 3-5 competitors/lookalikes of the reference company.

    Args:
        reference_url: Homepage URL of the reference client.
        target_market: Geographic market description (e.g. "DACH").
        industries: Target industry names used to steer the search.
        summary_of_offer: Optional 1-sentence offer summary for context.

    Returns:
        Parsed dict (expected keys: localCompetitors, nationalCompetitors,
        internationalCompetitors), or {"error": "..."} on failure.
    """
    logger.info(f"Identifying competitors for {reference_url}")
    api_key = load_gemini_api_key()
    GEMINI_API_URL = f"https://generativelanguage.googleapis.com/v1/models/gemini-2.5-pro:generateContent?key={api_key}"
    prompt = f"""
Find 3-5 competitors/lookalikes for the company at {reference_url}.
Offer context: {summary_of_offer}
Target Market: {target_market}
Industries: {', '.join(industries)}
Categorize into 'localCompetitors', 'nationalCompetitors', 'internationalCompetitors'.
Return ONLY JSON.
"""
    payload = {"contents": [{"parts": [{"text": prompt}]}]}
    try:
        # FIX: timeout added for consistency with the other HTTP calls;
        # prevents an indefinite hang on a stalled Gemini request.
        response = requests.post(GEMINI_API_URL, json=payload,
                                 headers={'Content-Type': 'application/json'},
                                 timeout=120)
        response.raise_for_status()
        res_json = response.json()
        text = res_json['candidates'][0]['content']['parts'][0]['text']
        # Strip a markdown code fence if the model wrapped its JSON.
        if "```json" in text:
            text = text.split("```json")[1].split("```")[0].strip()
        return json.loads(text)
    except Exception as e:
        logger.error(f"Competitor identification failed: {e}")
        return {"error": str(e)}
def analyze_company(company_name, strategy, target_market):
    """Run the full 'Deep Tech Audit' pipeline for one company.

    Phases: (1) find the official website (SerpAPI, Gemini fallback),
    (2) scrape the homepage, (3) hunt for proof of each strategy signal via
    targeted SerpAPI queries, (4) let Gemini synthesize a structured verdict.

    Args:
        company_name: Company to audit.
        strategy: Strategy dict from generate_search_strategy (uses 'signals').
        target_market: Geographic market hint for the website lookup.

    Returns:
        Parsed audit dict (firmographics, tier, per-signal evidence,
        recommendation) or {"error": "..."} on any hard failure.
    """
    logger.info(f"--- STARTING DEEP TECH AUDIT FOR: {company_name} ---")
    api_key = load_gemini_api_key()
    GEMINI_API_URL = f"https://generativelanguage.googleapis.com/v1/models/gemini-2.5-pro:generateContent?key={api_key}"
    # 1. Website finding (SerpAPI, fallback to Gemini)
    url = None
    website_search_results = serp_search(f"{company_name} offizielle Website")
    if website_search_results:
        url = website_search_results[0].get("link")
        logger.info(f"Website via SerpAPI gefunden: {url}")
    if not url:
        # Fallback: ask Gemini directly (low confidence).
        logger.info("Keine URL via SerpAPI, frage Gemini...")
        prompt_url = f"Find the official website URL for '{company_name}' in '{target_market}'. Output ONLY the URL."
        try:
            # FIX: timeout added; the fallback call could previously hang forever.
            res = requests.post(GEMINI_API_URL,
                                json={"contents": [{"parts": [{"text": prompt_url}]}]},
                                headers={'Content-Type': 'application/json'},
                                timeout=60)
            url = res.json()['candidates'][0]['content']['parts'][0]['text'].strip()
        except Exception as e:
            # FIX: was a bare `except: pass`, which swallowed KeyboardInterrupt
            # and hid every failure; narrow to Exception and log it. The
            # best-effort semantics (fall through to the error return) stay.
            logger.warning(f"Gemini URL fallback failed for {company_name}: {e}")
    if not url or not url.startswith("http"):
        return {"error": f"Could not find website for {company_name}"}
    # 2. Homepage scraping
    homepage_text = get_website_text(url)
    if not homepage_text:
        return {"error": f"Could not scrape website {url}"}
    # 3. Targeted signal search (the "Hunter" phase)
    signal_evidence = []
    # Firmographics search (revenue / headcount snippets)
    firmographics_results = serp_search(f"{company_name} Umsatz Mitarbeiterzahl 2023")
    firmographics_context = "\n".join([f"- {r['snippet']} ({r['link']})" for r in firmographics_results])
    signals = strategy.get('signals', [])
    for signal in signals:
        proof_strategy = signal.get('proofStrategy', {})
        query_template = proof_strategy.get('searchQueryTemplate')
        search_context = ""
        if query_template:
            # Extract the bare domain for site:-style queries (e.g. site:firma.de).
            domain = url.split("//")[-1].split("/")[0].replace("www.", "")
            # FIX: the strategy prompt's f-string renders '{{COMPANY}}' as
            # '{COMPANY}' (brace escaping), so Gemini returns templates with
            # SINGLE braces — the old code only replaced the double-brace
            # form and substitution silently never happened. Handle both
            # spellings, double-brace first.
            query = (query_template
                     .replace("{{COMPANY}}", company_name)
                     .replace("{COMPANY}", company_name)
                     .replace("{{domain}}", domain)
                     .replace("{domain}", domain))
            logger.info(f"Signal Search '{signal['name']}': {query}")
            results = serp_search(query, num_results=3)
            if results:
                search_context = "\n".join([f" * Snippet: {r['snippet']}\n Source: {r['link']}" for r in results])
        if search_context:
            signal_evidence.append(f"SIGNAL '{signal['name']}':\n{search_context}")
    # 4. Final analysis & synthesis (the "Judge" phase)
    evidence_text = "\n\n".join(signal_evidence)
    prompt = f"""
You are a B2B Market Intelligence Auditor.
Audit the company '{company_name}' ({url}) based on the collected evidence.
--- STRATEGY (Signals to find) ---
{json.dumps(signals, indent=2)}
--- EVIDENCE SOURCE 1: HOMEPAGE CONTENT ---
{homepage_text[:10000]}
--- EVIDENCE SOURCE 2: FIRMOGRAPHICS SEARCH ---
{firmographics_context}
--- EVIDENCE SOURCE 3: TARGETED SIGNAL SEARCH RESULTS ---
{evidence_text}
----------------------------------
TASK:
1. **Firmographics**: Estimate Revenue and Employees based on Source 1 & 2. Be realistic. Use buckets if unsure.
2. **Status**: Determine 'status' (Bestandskunde, Nutzt Wettbewerber, Greenfield, Unklar).
3. **Evaluate Signals**: For each signal, decide 'value' (Yes/No/Partial).
- **CRITICAL**: You MUST cite your source for the 'proof'.
- If found in Source 3 (Search), write: "Found in job posting/doc: [Snippet]" and include the URL.
- If found in Source 1 (Homepage), write: "On homepage: [Quote]".
- If not found, write: "Not found".
4. **Recommendation**: 1-sentence verdict.
STRICTLY output only JSON:
{{
"companyName": "{company_name}",
"status": "...",
"revenue": "...",
"employees": "...",
"tier": "Tier 1/2/3",
"dynamicAnalysis": {{
"sig_id_from_strategy": {{ "value": "...", "proof": "..." }}
}},
"recommendation": "..."
}}
"""
    payload = {
        "contents": [{"parts": [{"text": prompt}]}],
        "generationConfig": {"response_mime_type": "application/json"}
    }
    try:
        # FIX: timeout added; the synthesis call is the longest-running one.
        response = requests.post(GEMINI_API_URL, json=payload,
                                 headers={'Content-Type': 'application/json'},
                                 timeout=120)
        response.raise_for_status()
        response_data = response.json()
        response_text = response_data['candidates'][0]['content']['parts'][0]['text']
        # Strip a markdown code fence if the model wrapped its JSON.
        if response_text.startswith('```json'):
            response_text = response_text.split('```json')[1].split('```')[0].strip()
        result = json.loads(response_text)
        result['dataSource'] = "Digital Trace Audit (Deep Dive)"  # Mark as verified
        logger.info(f"Audit für {company_name} erfolgreich abgeschlossen.")
        return result
    except Exception as e:
        logger.error(f"Audit failed for {company_name}: {e}")
        return {"error": str(e)}
def main():
    """CLI entry point: dispatch on --mode and print the result as JSON to stdout.

    Modes:
        generate_strategy    — needs --reference_url and --context_file.
        identify_competitors — needs --reference_url, --target_market;
                               --context_file and --summary_of_offer optional.
        analyze_company      — needs --company_name, --strategy_json,
                               --target_market.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode", required=True)
    parser.add_argument("--reference_url")
    parser.add_argument("--context_file")
    parser.add_argument("--target_market")
    parser.add_argument("--company_name")
    parser.add_argument("--strategy_json")
    parser.add_argument("--summary_of_offer")
    args = parser.parse_args()
    if args.mode == "generate_strategy":
        # FIX: explicit encoding — context files contain German umlauts and
        # the platform default (e.g. cp1252 on Windows) can mis-decode them.
        with open(args.context_file, "r", encoding="utf-8") as f:
            context = f.read()
        print(json.dumps(generate_search_strategy(args.reference_url, context)))
    elif args.mode == "identify_competitors":
        industries = []
        if args.context_file:
            with open(args.context_file, "r", encoding="utf-8") as f:
                context = f.read()
            industries = _extract_target_industries_from_context(context)
        print(json.dumps(identify_competitors(args.reference_url, args.target_market,
                                              industries, args.summary_of_offer)))
    elif args.mode == "analyze_company":
        strategy = json.loads(args.strategy_json)
        print(json.dumps(analyze_company(args.company_name, strategy, args.target_market)))
    else:
        # FIX: an unknown mode previously exited silently with status 0,
        # which the calling process would read as "success with no output".
        parser.error(f"Unknown --mode: {args.mode}")


if __name__ == "__main__":
    main()