import argparse import json import os import sys # Import sys for stderr import requests from bs4 import BeautifulSoup import logging from datetime import datetime import re # Für Regex-Operationen # --- AUTARKES LOGGING SETUP --- # def create_self_contained_log_filename(mode): """ Erstellt einen zeitgestempelten Logdateinamen für den Orchestrator. Verwendet ein festes Log-Verzeichnis innerhalb des Docker-Containers. NEU: Nur eine Datei pro Tag, um Log-Spam zu verhindern. """ log_dir_path = "/app/Log" # Festes Verzeichnis im Container if not os.path.exists(log_dir_path): os.makedirs(log_dir_path, exist_ok=True) # Nur Datum verwenden, nicht Uhrzeit, damit alle Runs des Tages in einer Datei landen date_str = datetime.now().strftime("%Y-%m-%d") filename = f"{date_str}_market_intel.log" return os.path.join(log_dir_path, filename) log_filename = create_self_contained_log_filename("market_intel_orchestrator") logging.basicConfig( level=logging.DEBUG, format='[%(asctime)s] %(levelname)s [%(funcName)s]: %(message)s', datefmt='%Y-%m-%d %H:%M:%S', handlers=[ logging.FileHandler(log_filename, mode='a', encoding='utf-8'), logging.StreamHandler(sys.stderr) ] ) logger = logging.getLogger(__name__) # --- END AUTARKES LOGGING SETUP --- # def load_gemini_api_key(file_path="gemini_api_key.txt"): try: with open(file_path, "r") as f: api_key = f.read().strip() return api_key except Exception as e: logger.critical(f"Fehler beim Laden des Gemini API Keys: {e}") raise def load_serp_api_key(file_path="serpapikey.txt"): """Lädt den SerpAPI Key. Gibt None zurück, wenn nicht gefunden.""" try: if os.path.exists(file_path): with open(file_path, "r") as f: return f.read().strip() # Fallback: Versuche Umgebungsvariable return os.environ.get("SERP_API_KEY") except Exception as e: logger.warning(f"Konnte SerpAPI Key nicht laden: {e}") return None def get_website_text(url): # Auto-fix missing scheme if url and not url.startswith('http'): url = 'https://' + url logger.info(f"Scraping URL: {url}") try: # Use a more realistic, modern User-Agent to avoid blocking headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8', 'Accept-Language': 'en-US,en;q=0.9,de;q=0.8', 'Referer': 'https://www.google.com/' } response = requests.get(url, headers=headers, timeout=15) # Increased timeout response.raise_for_status() soup = BeautifulSoup(response.text, 'lxml') for tag in soup(['script', 'style', 'nav', 'footer', 'header']): tag.decompose() text = soup.get_text(separator=' ', strip=True) text = re.sub(r'[^\x20-\x7E\n\r\t]', '', text) return text[:15000] # Increased limit except Exception as e: logger.error(f"Scraping failed for {url}: {e}") return None def serp_search(query, num_results=3): """Führt eine Google-Suche über SerpAPI durch.""" api_key = load_serp_api_key() if not api_key: logger.warning("SerpAPI Key fehlt. Suche übersprungen.") return [] logger.info(f"SerpAPI Suche: {query}") try: params = { "engine": "google", "q": query, "api_key": api_key, "num": num_results, "hl": "de", "gl": "de" } response = requests.get("https://serpapi.com/search", params=params, timeout=20) response.raise_for_status() data = response.json() results = [] if "organic_results" in data: for result in data["organic_results"]: results.append({ "title": result.get("title"), "link": result.get("link"), "snippet": result.get("snippet") }) return results except Exception as e: logger.error(f"SerpAPI Fehler: {e}") return [] def _extract_target_industries_from_context(context_content): md = context_content # Versuche verschiedene Muster für die Tabelle, falls das Format variiert step2_match = re.search(r'##\s*Schritt\s*2:[\s\S]*?(?=\n##\s*Schritt\s*\d:|\s*$)', md, re.IGNORECASE) if not step2_match: # Fallback: Suche nach "Zielbranche" irgendwo im Text match = re.search(r'Zielbranche\s*\|?\s*([^|\n]+)', md, re.IGNORECASE) if match: return [s.strip() for s in match.group(1).split(',')] return [] table_lines = [] in_table = False for line in step2_match.group(0).split('\n'): if line.strip().startswith('|'): in_table = True table_lines.append(line.strip()) elif in_table: break if len(table_lines) < 3: return [] header = [s.strip() for s in table_lines[0].split('|') if s.strip()] industry_col = next((h for h in header if re.search(r'zielbranche|segment|branche|industrie', h, re.IGNORECASE)), None) if not industry_col: return [] col_idx = header.index(industry_col) industries = [] for line in table_lines[2:]: cells = [s.strip() for s in line.split('|') if s.strip()] if len(cells) > col_idx: industries.append(cells[col_idx]) return list(set(industries)) def _extract_json_from_text(text): """ Versucht, ein JSON-Objekt aus einem Textstring zu extrahieren, unabhängig von Markdown-Formatierung (```json ... ```). """ try: # 1. Versuch: Direktersatz von Markdown-Tags (falls vorhanden) clean_text = text.replace("```json", "").replace("```", "").strip() return json.loads(clean_text) except json.JSONDecodeError: pass try: # 2. Versuch: Regex Suche nach dem ersten { und letzten } json_match = re.search(r"(\{[\s\S]*\})", text) if json_match: return json.loads(json_match.group(1)) except json.JSONDecodeError: pass logger.error(f"JSON Parsing fehlgeschlagen. Roher Text: {text[:500]}...") return None def generate_search_strategy(reference_url, context_content, language='de'): logger.info(f"Generating strategy for {reference_url} (Language: {language})") api_key = load_gemini_api_key() target_industries = _extract_target_industries_from_context(context_content) homepage_text = get_website_text(reference_url) if not homepage_text: logger.warning(f"Strategy Generation: Could not scrape {reference_url}. Relying on context.") homepage_text = "[WEBSITE ACCESS DENIED] - The strategy must be developed based on the provided STRATEGIC CONTEXT and the URL name alone." # Switch to stable 2.5-pro model (which works for v1beta) GEMINI_API_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-pro:generateContent?key={api_key}" lang_instruction = "GERMAN (Deutsch)" if language == 'de' else "ENGLISH" prompt = f""" You are a B2B Market Intelligence Architect. --- ROLE DEFINITION --- You are working for the company described in the "STRATEGIC CONTEXT" below (The "Hunter"). Your goal is to find new potential customers who look exactly like the "REFERENCE CLIENT" described below (The "Seed" / "Prey"). --- STRATEGIC CONTEXT (YOUR COMPANY / THE OFFER) --- {context_content} --- REFERENCE CLIENT HOMEPAGE (THE IDEAL CUSTOMER TO CLONE) --- URL: {reference_url} CONTENT: {homepage_text[:10000]} --- TASK --- Develop a search strategy to find **Lookalikes of the Reference Client** who would be interested in **Your Company's Offer**. 1. **summaryOfOffer**: A 1-sentence summary of what the **REFERENCE CLIENT** does (NOT what your company does). We need this to search for similar companies. 2. **idealCustomerProfile**: A concise definition of the Ideal Customer Profile (ICP) based on the Reference Client's characteristics. 3. **searchStrategyICP**: A detailed description of the Ideal Customer Profile (ICP) based on the analysis. 4. **digitalSignals**: Identification and description of relevant digital signals that indicate purchase interest or engagement for YOUR offer. 5. **targetPages**: A list of the most important target pages on the company website relevant for marketing and sales activities. 6. **signals**: Identify exactly 4 specific digital signals to check on potential lookalikes. - **CRITICAL**: One signal MUST be "Technographic / Incumbent Search". It must look for existing competitor software or legacy systems that **YOUR COMPANY'S OFFER** replaces or complements. - The other 3 signals should focus on business pains or strategic fit. --- SIGNAL DEFINITION --- For EACH signal, you MUST provide: - `id`: A unique ID (e.g., "sig_1"). - `name`: A short, descriptive name. - `description`: What does this signal indicate? - `targetPageKeywords`: A list of 3-5 keywords to look for on a company's website (e.g., ["career", "jobs"] for a hiring signal). - `proofStrategy`: An object containing: - `likelySource`: Where on the website or web is this info found? (e.g., "Careers Page"). - `searchQueryTemplate`: A Google search query to find this. Use `{{COMPANY}}` as a placeholder for the company name. Example: `site:{{COMPANY}} "software engineer" OR "developer"` --- LANGUAGE INSTRUCTION --- IMPORTANT: The entire JSON content (descriptions, rationale, summaries) MUST be in {lang_instruction}. Translate if necessary. --- OUTPUT FORMAT --- Return ONLY a valid JSON object. {{ "summaryOfOffer": "The Reference Client provides...", "idealCustomerProfile": "...", "searchStrategyICP": "...", "digitalSignals": "...", "targetPages": "...", "signals": [ ... ] }} """ payload = {"contents": [{"parts": [{"text": prompt}]}]} logger.info("Sende Anfrage an Gemini API...") try: response = requests.post(GEMINI_API_URL, json=payload, headers={'Content-Type': 'application/json'}) response.raise_for_status() res_json = response.json() logger.info(f"Gemini API-Antwort erhalten (Status: {response.status_code}).") text = res_json['candidates'][0]['content']['parts'][0]['text'] # DEBUG LOGGING FOR RAW JSON logger.error(f"RAW GEMINI JSON RESPONSE: {text}") result = _extract_json_from_text(text) if not result: raise ValueError("Konnte kein valides JSON extrahieren") return result except Exception as e: logger.error(f"Strategy generation failed: {e}") # Return fallback to avoid frontend crash return { "summaryOfOffer": "Error generating strategy. Please check logs.", "idealCustomerProfile": "Error generating ICP. Please check logs.", "searchStrategyICP": "Error generating Search Strategy ICP. Please check logs.", "digitalSignals": "Error generating Digital Signals. Please check logs.", "targetPages": "Error generating Target Pages. Please check logs.", "signals": [] } def identify_competitors(reference_url, target_market, industries, summary_of_offer=None, language='de'): logger.info(f"Identifying competitors for {reference_url} (Language: {language})") api_key = load_gemini_api_key() # Switch to stable 2.5-pro model GEMINI_API_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-pro:generateContent?key={api_key}" lang_instruction = "GERMAN (Deutsch)" if language == 'de' else "ENGLISH" prompt = f""" You are a B2B Market Analyst. Find 3-5 direct competitors or highly similar companies (lookalikes) for the company at `{reference_url}`. --- CONTEXT --- - Reference Client Business (What they do): {summary_of_offer} - Target Market: {target_market} - Relevant Industries: {', '.join(industries)} --- TASK --- Identify companies that are **similar to the Reference Client** (i.e., Lookalikes). We are looking for other companies that do the same thing as `{reference_url}`. Categorize them into three groups: 1. 'localCompetitors': Competitors in the same immediate region/city. 2. 'nationalCompetitors': Competitors operating across the same country. 3. 'internationalCompetitors': Global players. For EACH competitor, you MUST provide: - `id`: A unique, URL-friendly identifier (e.g., "competitor-name-gmbh"). - `name`: The official, full name of the company. - `description`: A concise explanation of why they are a competitor. --- LANGUAGE INSTRUCTION --- IMPORTANT: The entire JSON content (descriptions) MUST be in {lang_instruction}. --- OUTPUT FORMAT --- Return ONLY a valid JSON object with the following structure: {{ "localCompetitors": [ {{ "id": "...", "name": "...", "description": "..." }} ], "nationalCompetitors": [ ... ], "internationalCompetitors": [ ... ] }} """ payload = {"contents": [{"parts": [{"text": prompt}]}]} logger.info("Sende Anfrage an Gemini API...") # logger.debug(f"Rohe Gemini API-Anfrage (JSON): {json.dumps(payload, indent=2)}") try: response = requests.post(GEMINI_API_URL, json=payload, headers={'Content-Type': 'application/json'}) response.raise_for_status() res_json = response.json() logger.info(f"Gemini API-Antwort erhalten (Status: {response.status_code}).") text = res_json['candidates'][0]['content']['parts'][0]['text'] result = _extract_json_from_text(text) if not result: raise ValueError("Konnte kein valides JSON extrahieren") return result except Exception as e: logger.error(f"Competitor identification failed: {e}") return {"localCompetitors": [], "nationalCompetitors": [], "internationalCompetitors": []} def analyze_company(company_name, strategy, target_market, language='de'): logger.info(f"--- STARTING DEEP TECH AUDIT FOR: {company_name} (Language: {language}) ---") api_key = load_gemini_api_key() GEMINI_API_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-pro:generateContent?key={api_key}" lang_instruction = "GERMAN (Deutsch)" if language == 'de' else "ENGLISH" # ... (Rest of function logic remains same, just update prompt) ... # 1. Website Finding (SerpAPI fallback to Gemini) url = None website_search_results = serp_search(f"{company_name} offizielle Website") if website_search_results: url = website_search_results[0].get("link") logger.info(f"Website via SerpAPI gefunden: {url}") if not url: # Fallback: Frage Gemini (Low Confidence) logger.info("Keine URL via SerpAPI, frage Gemini...") prompt_url = f"What is the official homepage URL for the company '{company_name}' in the market '{target_market}'? Respond with ONLY the single, complete URL and nothing else." payload_url = {"contents": [{"parts": [{"text": prompt_url}]}]} logger.info("Sende Anfrage an Gemini API (URL Fallback)...") try: res = requests.post(GEMINI_API_URL, json=payload_url, headers={'Content-Type': 'application/json'}, timeout=15) res.raise_for_status() res_json = res.json() candidate = res_json.get('candidates', [{}])[0] content = candidate.get('content', {}).get('parts', [{}])[0] text_response = content.get('text', '').strip() url_match = re.search(r'(https?://[^\s"]+)', text_response) if url_match: url = url_match.group(1) except Exception as e: logger.error(f"Gemini URL Fallback failed: {e}") pass if not url or not url.startswith("http"): return {"error": f"Could not find website for {company_name}"} homepage_text = "" scraping_note = "" if url and url.startswith("http"): scraped_content = get_website_text(url) if scraped_content: homepage_text = scraped_content else: homepage_text = "[WEBSITE ACCESS DENIED]" scraping_note = "(Website Content Unavailable)" else: homepage_text = "No valid URL found." scraping_note = "(No URL found)" tech_evidence = [] known_incumbents = ["SAP Ariba", "Jaggaer", "Coupa", "SynerTrade", "Ivalua", "ServiceNow", "Salesforce", "Oracle SCM", "Zycus", "GEP", "SupplyOn", "EcoVadis", "IntegrityNext"] half = len(known_incumbents) // 2 group1 = " OR ".join([f'"{inc}"' for inc in known_incumbents[:half]]) group2 = " OR ".join([f'"{inc}"' for inc in known_incumbents[half:]]) tech_queries = [f'"{company_name}" ({group1})', f'"{company_name}" ({group2})', f'"{company_name}" "supplier portal" login'] for q in tech_queries: results = serp_search(q, num_results=4) if results: for r in results: tech_evidence.append(f"- Found: {r['title']}\n Snippet: {r['snippet']}\n Link: {r['link']}") tech_evidence_text = "\n".join(tech_evidence) signal_evidence = [] firmographics_results = serp_search(f"{company_name} Umsatz Mitarbeiterzahl 2023") firmographics_context = "\n".join([f"- {r['snippet']} ({r['link']})" for r in firmographics_results]) signals = strategy.get('signals', []) for signal in signals: if "incumbent" in signal['id'].lower() or "tech" in signal['id'].lower(): continue proof_strategy = signal.get('proofStrategy', {}) query_template = proof_strategy.get('searchQueryTemplate') search_context = "" if query_template: try: domain = url.split("//")[-1].split("/")[0].replace("www.", "") except: domain = "" query = query_template.replace("{{COMPANY}}", company_name).replace("{COMPANY}", company_name).replace("{{domain}}", domain).replace("{domain}", domain) results = serp_search(query, num_results=3) if results: search_context = "\n".join([f" * Snippet: {r['snippet']}\n Source: {r['link']}" for r in results]) if search_context: signal_evidence.append(f"SIGNAL '{signal['name']}':\n{search_context}") evidence_text = "\n\n".join(signal_evidence) prompt = f""" You are a Strategic B2B Sales Consultant. Analyze the company '{company_name}' ({url}) to create a "best-of-breed" sales pitch strategy. --- STRATEGY (What we are looking for) --- {json.dumps(signals, indent=2)} --- EVIDENCE 1: EXTERNAL TECH-STACK INTELLIGENCE (CRITICAL) --- Look closely here for mentions of competitors like SAP Ariba, Jaggaer, SynerTrade, Coupa, etc. {tech_evidence_text} --- EVIDENCE 2: HOMEPAGE CONTENT {scraping_note} --- {homepage_text[:8000]} --- EVIDENCE 3: FIRMOGRAPHICS SEARCH --- {firmographics_context} --- EVIDENCE 4: TARGETED SIGNAL SEARCH RESULTS --- {evidence_text} ---------------------------------- TASK: 1. **Firmographics**: Estimate Revenue and Employees. 2. **Technographic Audit**: Look for specific competitor software or legacy systems mentioned in EVIDENCE 1 (e.g., "Partner of SynerTrade", "Login to Jaggaer Portal"). 3. **Status**: - Set to "Nutzt Wettbewerber" if ANY competitor technology is found. - Set to "Greenfield" ONLY if absolutely no competitor tech is found. - Set to "Bestandskunde" if they already use our solution. 4. **Evaluate Signals**: For each signal, provide a "value" (Yes/No/Partial) and "proof". 5. **Recommendation (Pitch Strategy)**: - If they use a competitor, explain how to position against it. - If Greenfield, explain the entry point. - **Tone**: Strategic, insider-knowledge, specific. --- LANGUAGE INSTRUCTION --- IMPORTANT: The entire JSON content (especially 'recommendation', 'proof', 'value') MUST be in {lang_instruction}. STRICTLY output only JSON: {{ "companyName": "{company_name}", "status": "...", "revenue": "...", "employees": "...", "tier": "Tier 1/2/3", "dynamicAnalysis": {{ "sig_id_from_strategy": {{ "value": "...", "proof": "..." }} }}, "recommendation": "..." }} """ payload = { "contents": [{"parts": [{"text": prompt}]}], "generationConfig": {"response_mime_type": "application/json"} } try: logger.info("Sende Audit-Anfrage an Gemini API...") response = requests.post(GEMINI_API_URL, json=payload, headers={'Content-Type': 'application/json'}) response.raise_for_status() response_data = response.json() logger.info(f"Gemini API-Antwort erhalten (Status: {response.status_code}).") text = response_data['candidates'][0]['content']['parts'][0]['text'] result = _extract_json_from_text(text) if not result: raise ValueError("Konnte kein valides JSON extrahieren") result['dataSource'] = "Digital Trace Audit (Deep Dive)" return result except Exception as e: logger.error(f"Audit failed for {company_name}: {e}") return { "companyName": company_name, "status": "Unklar", "revenue": "Error", "employees": "Error", "tier": "Tier 3", "dynamicAnalysis": {}, "recommendation": f"Audit failed: {str(e)}", "dataSource": "Error" } def generate_outreach_campaign(company_data_json, knowledge_base_content, reference_url, specific_role=None, language='de'): """ Erstellt personalisierte E-Mail-Kampagnen. """ company_name = company_data_json.get('companyName', 'Unknown') logger.info(f"--- STARTING OUTREACH GENERATION FOR: {company_name} (Role: {specific_role if specific_role else 'Top 5'}) [Lang: {language}] ---") api_key = load_gemini_api_key() GEMINI_API_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-pro:generateContent?key={api_key}" lang_instruction = "GERMAN (Deutsch)" if language == 'de' else "ENGLISH" if specific_role: # --- MODE B: SINGLE ROLE GENERATION (On Demand) --- task_description = f""" --- TASK --- 1. **Focus**: Create a highly specific 3-step email campaign ONLY for the role: '{specific_role}'. 2. **Analyze**: Use the Audit Facts to find specific hooks for this role. 3. **Draft**: Write the sequence (Opening, Follow-up, Break-up). """ output_format = """ --- OUTPUT FORMAT (Strictly JSON) --- { "target_role": "The requested role", "rationale": "Why this fits...", "emails": [ ... ] } """ else: # --- MODE A: INITIAL START (TOP 1 + SUGGESTIONS) --- task_description = f""" --- TASK --- 1. **Analyze**: Match the Target Company (Input 2) to the most relevant 'Zielbranche/Segment' from the Knowledge Base (Input 1). 2. **Identify Roles**: Identify ALL relevant 'Rollen' (Personas) from the Knowledge Base that fit this company. 3. **Select Best**: Choose the SINGLE most promising role for immediate outreach based on the Audit findings. 4. **Draft Campaign**: Write a 3-step email sequence for this ONE role. 5. **List Others**: List ALL other relevant roles (including the other top candidates) in 'available_roles' so the user can generate them later. """ output_format = """ --- OUTPUT FORMAT (Strictly JSON) --- { "campaigns": [ { "target_role": "Role Name", "rationale": "Why selected...", "emails": [ ... ] } ], "available_roles": [ "Role 2", "Role 3", "Role 4", "Role 5", ... ] } """ prompt = f""" You are a Strategic Key Account Manager and deeply technical Industry Insider. Your goal is to write highly personalized, **operationally specific** outreach emails to the company '{company_name}'. --- INPUT 1: YOUR IDENTITY & STRATEGY (The Sender) --- {knowledge_base_content} --- INPUT 2: THE TARGET COMPANY (Audit Facts) --- {json.dumps(company_data_json, indent=2)} --- INPUT 3: THE REFERENCE CLIENT (Social Proof) --- Reference Client URL: {reference_url} CRITICAL: This 'Reference Client' is an existing happy customer of ours. You MUST mention them by name to establish trust. {task_description} --- TONE & STYLE GUIDELINES (CRITICAL) --- - **Perspective:** Operational Expert & Insider. NOT generic marketing. - **Be Gritty & Specific:** Use hard, operational keywords from the Knowledge Base (e.g., "ASNs", "8D-Reports"). - **Language:** {lang_instruction}. {output_format} """ payload = { "contents": [{"parts": [{"text": prompt}]}], "generationConfig": {"response_mime_type": "application/json"} } try: logger.info("Sende Campaign-Anfrage an Gemini API...") response = requests.post(GEMINI_API_URL, json=payload, headers={'Content-Type': 'application/json'}) response.raise_for_status() response_data = response.json() logger.info(f"Gemini API-Antwort erhalten (Status: {response.status_code}).") text = response_data['candidates'][0]['content']['parts'][0]['text'] result = _extract_json_from_text(text) if not result: raise ValueError("Konnte kein valides JSON extrahieren") return result except Exception as e: logger.error(f"Campaign generation failed for {company_name}: {e}") return {"error": str(e)} def main(): parser = argparse.ArgumentParser() parser.add_argument("--mode", required=True) parser.add_argument("--reference_url") parser.add_argument("--context_file") parser.add_argument("--target_market") parser.add_argument("--company_name") parser.add_argument("--strategy_json") parser.add_argument("--summary_of_offer") parser.add_argument("--company_data_file") parser.add_argument("--specific_role") parser.add_argument("--language", default="de") # New Argument args = parser.parse_args() if args.mode == "generate_strategy": with open(args.context_file, "r") as f: context = f.read() print(json.dumps(generate_search_strategy(args.reference_url, context, args.language))) elif args.mode == "identify_competitors": industries = [] if args.context_file: with open(args.context_file, "r") as f: context = f.read() industries = _extract_target_industries_from_context(context) print(json.dumps(identify_competitors(args.reference_url, args.target_market, industries, args.summary_of_offer, args.language))) elif args.mode == "analyze_company": strategy = json.loads(args.strategy_json) print(json.dumps(analyze_company(args.company_name, strategy, args.target_market, args.language))) elif args.mode == "generate_outreach": with open(args.company_data_file, "r") as f: company_data = json.load(f) with open(args.context_file, "r") as f: knowledge_base = f.read() print(json.dumps(generate_outreach_campaign(company_data, knowledge_base, args.reference_url, args.specific_role, args.language))) if __name__ == "__main__": sys.stdout.reconfigure(encoding='utf-8') try: main() sys.stdout.flush() except Exception as e: logger.critical(f"Unhandled Exception in Main: {e}", exc_info=True) # Fallback JSON output so the server doesn't crash on parse error error_json = json.dumps({"error": f"Critical Script Error: {str(e)}", "details": "Check market_intel.log"}) print(error_json) sys.exit(1)