import argparse
import json
import logging
import os
import re
import sys
from datetime import datetime

import requests
from bs4 import BeautifulSoup

# --- SELF-CONTAINED LOGGING SETUP --- #


def create_self_contained_log_filename(mode):
    """Build a timestamped log-file path for this run.

    Logs go to /app/Log (the expected container layout); if that directory
    cannot be created (e.g. running outside the container), fall back to a
    local ./Log directory instead of crashing at import time.
    """
    log_dir_path = "/app/Log"
    try:
        os.makedirs(log_dir_path, exist_ok=True)
    except OSError:
        # Not running in the container; keep logs next to the script.
        log_dir_path = os.path.join(os.getcwd(), "Log")
        os.makedirs(log_dir_path, exist_ok=True)
    now = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    version_str = "orchestrator_v2"
    filename = f"{now}_{version_str}_Modus-{mode}.log"
    return os.path.join(log_dir_path, filename)


log_filename = create_self_contained_log_filename("market_intel_orchestrator")
logging.basicConfig(
    level=logging.DEBUG,
    format='[%(asctime)s] %(levelname)s [%(funcName)s]: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    handlers=[
        logging.FileHandler(log_filename, mode='a', encoding='utf-8'),
        # Log to stderr so stdout stays clean for the JSON printed by main().
        logging.StreamHandler(sys.stderr)
    ]
)
logger = logging.getLogger(__name__)
# --- END SELF-CONTAINED LOGGING SETUP --- #


def load_gemini_api_key(file_path="gemini_api_key.txt"):
    """Load the Gemini API key from *file_path*.

    Raises:
        Exception: re-raised after logging if the key file cannot be read.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            return f.read().strip()
    except Exception as e:
        logger.critical(f"Fehler beim Laden des Gemini API Keys: {e}")
        raise


def load_serp_api_key(file_path="serpapikey.txt"):
    """Load the SerpAPI key from *file_path* or the SERP_API_KEY env var.

    Returns None when no key is available (callers treat search as optional).
    """
    try:
        if os.path.exists(file_path):
            with open(file_path, "r", encoding="utf-8") as f:
                return f.read().strip()
        # Fallback: environment variable.
        return os.environ.get("SERP_API_KEY")
    except Exception as e:
        logger.warning(f"Konnte SerpAPI Key nicht laden: {e}")
        return None


def get_website_text(url):
    """Fetch *url* and return its visible text (max 15000 chars), or None.

    Strips script/style/nav/footer/header tags so the LLM gets mostly
    content text. Any scraping failure is logged and returns None.
    """
    logger.info(f"Scraping URL: {url}")
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/91.0.4472.124 Safari/537.36'
        }
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'lxml')
        for tag in soup(['script', 'style', 'nav', 'footer', 'header']):
            tag.decompose()
        text = soup.get_text(separator=' ', strip=True)
        return text[:15000]  # generous limit for better LLM context
    except Exception as e:
        logger.error(f"Scraping failed for {url}: {e}")
        return None


def serp_search(query, num_results=3):
    """Run a Google search via SerpAPI.

    Returns a list of {"title", "link", "snippet"} dicts; empty list when
    the API key is missing or the request fails (search is best-effort).
    """
    api_key = load_serp_api_key()
    if not api_key:
        logger.warning("SerpAPI Key fehlt. Suche übersprungen.")
        return []
    logger.info(f"SerpAPI Suche: {query}")
    try:
        params = {
            "engine": "google",
            "q": query,
            "api_key": api_key,
            "num": num_results,
            "hl": "de",
            "gl": "de"
        }
        response = requests.get("https://serpapi.com/search", params=params, timeout=20)
        response.raise_for_status()
        data = response.json()
        results = []
        for result in data.get("organic_results", []):
            results.append({
                "title": result.get("title"),
                "link": result.get("link"),
                "snippet": result.get("snippet")
            })
        return results
    except Exception as e:
        logger.error(f"SerpAPI Fehler: {e}")
        return []


def _extract_target_industries_from_context(context_content):
    """Extract target-industry names from the "Schritt 2" markdown table.

    Looks for a '## Schritt 2:' section, takes its first markdown table,
    finds the column whose header matches branche/segment/industrie, and
    returns the deduplicated cell values (original order preserved).
    """
    md = context_content
    step2_match = re.search(
        r'##\s*Schritt\s*2:[\s\S]*?(?=\n##\s*Schritt\s*\d:|\s*$)',
        md, re.IGNORECASE)
    if not step2_match:
        return []
    table_lines = []
    in_table = False
    for line in step2_match.group(0).split('\n'):
        if line.strip().startswith('|'):
            in_table = True
            table_lines.append(line.strip())
        elif in_table:
            # First non-table line after the table ends the scan.
            break
    # Need at least header, separator and one data row.
    if len(table_lines) < 3:
        return []
    header = [s.strip() for s in table_lines[0].split('|') if s.strip()]
    industry_col = next(
        (h for h in header
         if re.search(r'zielbranche|segment|branche|industrie', h, re.IGNORECASE)),
        None)
    if not industry_col:
        return []
    col_idx = header.index(industry_col)
    industries = []
    for line in table_lines[2:]:  # skip header + separator row
        cells = [s.strip() for s in line.split('|') if s.strip()]
        if len(cells) > col_idx:
            industries.append(cells[col_idx])
    # dict.fromkeys dedupes while keeping a deterministic order
    # (list(set(...)) produced a different order on every run).
    return list(dict.fromkeys(industries))


def _gemini_api_url(api_key):
    """Return the Gemini generateContent endpoint for *api_key*."""
    return ("https://generativelanguage.googleapis.com/v1/models/"
            f"gemini-2.5-pro:generateContent?key={api_key}")


def _strip_json_fence(text):
    """Strip a ```json ... ``` fence from an LLM response, if present."""
    if "```json" in text:
        text = text.split("```json")[1].split("```")[0].strip()
    return text


def _call_gemini(prompt, api_key, generation_config=None, timeout=120):
    """POST a single-turn *prompt* to Gemini and return the raw answer text.

    Raises requests/parsing exceptions to the caller; callers wrap this in
    their own try/except and convert failures to {"error": ...} results.
    """
    payload = {"contents": [{"parts": [{"text": prompt}]}]}
    if generation_config:
        payload["generationConfig"] = generation_config
    response = requests.post(
        _gemini_api_url(api_key), json=payload,
        headers={'Content-Type': 'application/json'},
        timeout=timeout)  # original calls had no timeout and could hang
    response.raise_for_status()
    return response.json()['candidates'][0]['content']['parts'][0]['text']


def generate_search_strategy(reference_url, context_content):
    """Ask Gemini to build a search strategy (offer summary, ICP, signals).

    Returns the parsed strategy dict, or {"error": ...} on failure.
    """
    logger.info(f"Generating strategy for {reference_url}")
    api_key = load_gemini_api_key()
    target_industries = _extract_target_industries_from_context(context_content)
    # Guard against a failed scrape so the prompt never contains "None".
    homepage_text = get_website_text(reference_url) or ""
    # NOTE: this is a plain string (not an f-string) so '{COMPANY}' below is
    # sent literally; analyze_company substitutes that placeholder later.
    prompt = f"""
You are a B2B Market Intelligence Architect.
--- STRATEGIC CONTEXT ---
{context_content}
--- EXTRACTED TARGET INDUSTRIES ---
{', '.join(target_industries)}
--- REFERENCE CLIENT HOMEPAGE ---
{homepage_text}

TASK:
1. Create a 1-sentence 'summaryOfOffer'.
2. Define an 'idealCustomerProfile' based on the reference client.
3. Identify 3-5 'signals'. FOR EACH SIGNAL, you MUST define a 'proofStrategy':
   - 'likelySource': Where to find the proof (e.g., "Datenschutz", "Jobs", "Case Studies", "Homepage", "Press").
   - 'searchQueryTemplate': A specific Google search query template to find this proof. Use '{{COMPANY}}' as placeholder for the company name. Example: "site:{{COMPANY}} 'it-leiter' sap" or "{{COMPANY}} nachhaltigkeitsbericht 2024 filetype:pdf".

STRICTLY output only valid JSON:
{{
  "summaryOfOffer": "...",
  "idealCustomerProfile": "...",
  "signals": [
    {{
      "id": "sig_1",
      "name": "...",
      "description": "...",
      "targetPageKeywords": ["homepage"],
      "proofStrategy": {{
        "likelySource": "...",
        "searchQueryTemplate": "..."
      }}
    }}
  ]
}}
"""
    try:
        text = _call_gemini(prompt, api_key)
        return json.loads(_strip_json_fence(text))
    except Exception as e:
        logger.error(f"Strategy generation failed: {e}")
        return {"error": str(e)}


def identify_competitors(reference_url, target_market, industries, summary_of_offer=None):
    """Ask Gemini for 3-5 competitors/lookalikes of *reference_url*.

    Returns the parsed dict (local/national/international buckets), or
    {"error": ...} on failure.
    """
    logger.info(f"Identifying competitors for {reference_url}")
    api_key = load_gemini_api_key()
    prompt = f"""
Find 3-5 competitors/lookalikes for the company at {reference_url}.
Offer context: {summary_of_offer}
Target Market: {target_market}
Industries: {', '.join(industries)}
Categorize into 'localCompetitors', 'nationalCompetitors', 'internationalCompetitors'.
Return ONLY JSON.
"""
    try:
        text = _call_gemini(prompt, api_key)
        return json.loads(_strip_json_fence(text))
    except Exception as e:
        logger.error(f"Competitor identification failed: {e}")
        return {"error": str(e)}


def _substitute_query_placeholders(query_template, company_name, domain):
    """Fill company/domain placeholders into a search query template.

    The strategy prompt is an f-string, so its '{{COMPANY}}' renders as
    '{COMPANY}' in the text Gemini actually sees — the original code only
    replaced the literal '{{COMPANY}}' and therefore never substituted
    templates that followed the prompt's instructions. Replace both brace
    forms (double first, so no stray braces remain).
    """
    return (query_template
            .replace("{{COMPANY}}", company_name)
            .replace("{COMPANY}", company_name)
            .replace("{{domain}}", domain)
            .replace("{domain}", domain))


def analyze_company(company_name, strategy, target_market):
    """Run the full "deep tech audit" for one company.

    Pipeline: find the official website (SerpAPI, Gemini fallback), scrape
    the homepage, run targeted searches for each strategy signal, then have
    Gemini synthesize a structured audit JSON. Returns the audit dict or
    {"error": ...} at any unrecoverable step.
    """
    logger.info(f"--- STARTING DEEP TECH AUDIT FOR: {company_name} ---")
    api_key = load_gemini_api_key()

    # 1. Website finding (SerpAPI, fallback to Gemini).
    url = None
    website_search_results = serp_search(f"{company_name} offizielle Website")
    if website_search_results:
        url = website_search_results[0].get("link")
        logger.info(f"Website via SerpAPI gefunden: {url}")
    if not url:
        # Fallback: ask Gemini (low confidence).
        logger.info("Keine URL via SerpAPI, frage Gemini...")
        prompt_url = (f"Find the official website URL for '{company_name}' "
                      f"in '{target_market}'. Output ONLY the URL.")
        try:
            url = _call_gemini(prompt_url, api_key).strip()
        except (requests.RequestException, KeyError, IndexError, ValueError) as e:
            # Best-effort fallback: log instead of silently swallowing.
            logger.warning(f"Gemini URL fallback failed: {e}")
    if not url or not url.startswith("http"):
        return {"error": f"Could not find website for {company_name}"}

    # 2. Homepage scraping.
    homepage_text = get_website_text(url)
    if not homepage_text:
        return {"error": f"Could not scrape website {url}"}

    # 3. Targeted signal search (the "Hunter" phase).
    signal_evidence = []
    # Firmographics search.
    firmographics_results = serp_search(f"{company_name} Umsatz Mitarbeiterzahl 2023")
    firmographics_context = "\n".join(
        f"- {r['snippet']} ({r['link']})" for r in firmographics_results)
    # Signal searches.
    signals = strategy.get('signals', [])
    for signal in signals:
        proof_strategy = signal.get('proofStrategy', {})
        query_template = proof_strategy.get('searchQueryTemplate')
        search_context = ""
        if query_template:
            # Extract the bare domain for site:-style queries (e.g. site:firma.de).
            domain = url.split("//")[-1].split("/")[0].replace("www.", "")
            query = _substitute_query_placeholders(query_template, company_name, domain)
            logger.info(f"Signal Search '{signal.get('name')}': {query}")
            results = serp_search(query, num_results=3)
            if results:
                search_context = "\n".join(
                    f"  * Snippet: {r['snippet']}\n    Source: {r['link']}"
                    for r in results)
        if search_context:
            signal_evidence.append(f"SIGNAL '{signal.get('name')}':\n{search_context}")

    # 4. Final analysis & synthesis (the "Judge" phase).
    evidence_text = "\n\n".join(signal_evidence)
    prompt = f"""
You are a B2B Market Intelligence Auditor. Audit the company '{company_name}' ({url}) based on the collected evidence.

--- STRATEGY (Signals to find) ---
{json.dumps(signals, indent=2)}

--- EVIDENCE SOURCE 1: HOMEPAGE CONTENT ---
{homepage_text[:10000]}

--- EVIDENCE SOURCE 2: FIRMOGRAPHICS SEARCH ---
{firmographics_context}

--- EVIDENCE SOURCE 3: TARGETED SIGNAL SEARCH RESULTS ---
{evidence_text}
----------------------------------
TASK:
1. **Firmographics**: Estimate Revenue and Employees based on Source 1 & 2. Be realistic. Use buckets if unsure.
2. **Status**: Determine 'status' (Bestandskunde, Nutzt Wettbewerber, Greenfield, Unklar).
3. **Evaluate Signals**: For each signal, decide 'value' (Yes/No/Partial).
   - **CRITICAL**: You MUST cite your source for the 'proof'.
   - If found in Source 3 (Search), write: "Found in job posting/doc: [Snippet]" and include the URL.
   - If found in Source 1 (Homepage), write: "On homepage: [Quote]".
   - If not found, write: "Not found".
4. **Recommendation**: 1-sentence verdict.

STRICTLY output only JSON:
{{
  "companyName": "{company_name}",
  "status": "...",
  "revenue": "...",
  "employees": "...",
  "tier": "Tier 1/2/3",
  "dynamicAnalysis": {{
    "sig_id_from_strategy": {{ "value": "...", "proof": "..." }}
  }},
  "recommendation": "..."
}}
"""
    try:
        response_text = _call_gemini(
            prompt, api_key,
            generation_config={"response_mime_type": "application/json"})
        result = json.loads(_strip_json_fence(response_text))
        result['dataSource'] = "Digital Trace Audit (Deep Dive)"  # mark as verified
        logger.info(f"Audit für {company_name} erfolgreich abgeschlossen.")
        return result
    except Exception as e:
        logger.error(f"Audit failed for {company_name}: {e}")
        return {"error": str(e)}


def main():
    """CLI entry point: dispatch on --mode and print the result JSON to stdout."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode", required=True)
    parser.add_argument("--reference_url")
    parser.add_argument("--context_file")
    parser.add_argument("--target_market")
    parser.add_argument("--company_name")
    parser.add_argument("--strategy_json")
    parser.add_argument("--summary_of_offer")
    args = parser.parse_args()

    if args.mode == "generate_strategy":
        with open(args.context_file, "r", encoding="utf-8") as f:
            context = f.read()
        print(json.dumps(generate_search_strategy(args.reference_url, context)))
    elif args.mode == "identify_competitors":
        industries = []
        if args.context_file:
            with open(args.context_file, "r", encoding="utf-8") as f:
                context = f.read()
            industries = _extract_target_industries_from_context(context)
        print(json.dumps(identify_competitors(
            args.reference_url, args.target_market, industries,
            args.summary_of_offer)))
    elif args.mode == "analyze_company":
        strategy = json.loads(args.strategy_json)
        print(json.dumps(analyze_company(
            args.company_name, strategy, args.target_market)))


if __name__ == "__main__":
    main()