Files
Brancheneinstufung2/gtm_architect_orchestrator.py

432 lines
19 KiB
Python

import argparse
import json
import logging
import re
import sys
import os
import requests
from bs4 import BeautifulSoup
from datetime import datetime
from config import Config
# Append the current directory to sys.path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from helpers import call_gemini_flash
# Configure logging to file
LOG_DIR = "Log_from_docker"
if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR)
timestamp = datetime.now().strftime("%Y-%m-%d")
log_file = os.path.join(LOG_DIR, f"{timestamp}_gtm_architect.log")
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file, mode='a', encoding='utf-8'),
logging.StreamHandler(sys.stderr)
]
)
def log_to_stderr(msg):
sys.stderr.write(f"[GTM-ORCHESTRATOR] {msg}\n")
sys.stderr.flush()
# --- SCRAPING HELPER ---
def get_text_from_url(url):
try:
log_to_stderr(f"Scraping URL: {url}")
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
response = requests.get(url, headers=headers, timeout=15)
response.raise_for_status()
# Using html.parser
soup = BeautifulSoup(response.content, 'html.parser')
# Remove noise
for element in soup(['script', 'style', 'noscript', 'iframe', 'svg', 'header', 'footer', 'nav', 'aside']):
element.decompose()
# Get text
text = soup.get_text(separator=' ', strip=True)
log_to_stderr(f"Scraping success. Length: {len(text)}")
return text[:30000] # Limit length
except Exception as e:
log_to_stderr(f"Scraping failed: {e}")
logging.warning(f"Could not scrape URL {url}: {e}")
return ""
# --- SYSTEM PROMPTS (Constructed reliably) ---
def get_system_instruction(lang):
if lang == 'de':
return "\n".join([
"# IDENTITY & PURPOSE",
'Du bist die "GTM Architect Engine" für Roboplanet. Deine Aufgabe ist es, für neue technische Produkte (Roboter) eine präzise Go-to-Market-Strategie zu entwickeln.',
"Du handelst nicht als kreativer Werbetexter, sondern als strategischer Analyst. Dein oberstes Ziel ist Product-Market-Fit und operative Umsetzbarkeit.",
"Antworte IMMER auf DEUTSCH.",
"",
"# CONTEXT: THE PARENT COMPANY (WACKLER)",
"Wir sind Teil der Wackler Group, einem großen Facility-Management-Dienstleister.",
'Unsere Strategie ist NICHT "Roboter ersetzen Menschen", sondern "Hybrid-Reinigung":',
"- 80% der Arbeit (monotone Flächenleistung) = Roboter.",
"- 20% der Arbeit (Edge Cases, Winterdienst, Treppen, Grobschmutz) = Manuelle Reinigung durch Wackler.",
"",
"# STRICT ANALYSIS RULES (MUST FOLLOW):",
"1. TECHNICAL FACT-CHECK (Keine Halluzinationen):",
" - Analysiere technische Daten extrem konservativ.",
' - Vakuumsystem = Kein "Winterdienst" (Schnee) und keine "Schwerindustrie" (Metallspäne), außer explizit genannt.',
" - Erfinde keine Features, nur um eine Zielgruppe passend zu machen.",
" ",
"2. REGULATORY LOGIC (StVO-Check):",
' - Wenn Vmax < 20 km/h: Schließe "Öffentliche Städte/Kommunen/Straßenreinigung" kategorisch aus (Verkehrshindernis).',
' - Fokusänderung: Konzentriere dich stattdessen ausschließlich auf "Große, zusammenhängende Privatflächen" (Gated Areas).',
"",
"3. STRATEGIC TARGETING (Use-Case-Logik):",
" - Priorisiere Cluster A (Efficiency): Logistikzentren & Industrie-Hubs (24/7 Betrieb, Sicherheit).",
" - Priorisiere Cluster B (Experience): Shopping Center, Outlets & Freizeitparks (Sauberkeit als Visitenkarte).",
" - Entferne reine E-Commerce-Händler ohne physische Kundenfläche.",
"",
'4. THE "HYBRID SERVICE" LOGIC (RULE 5):',
'Wann immer du ein "Hartes Constraint" oder eine technische Limitierung identifizierst (z.B. "Kein Winterdienst" oder "Kommt nicht in Ecken"), darfst du dies niemals als reines "Nein" stehen lassen.',
'Wende stattdessen die **"Yes, and..." Logik** an:',
' 1. **Identifiziere die Lücke:** (z.B. "Roboter kann bei Schnee nicht fahren").',
' 2. **Fülle die Lücke mit Service:** Schlage explizit vor, diesen Teil durch "Wackler Human Manpower" abzudecken.',
' 3. **Formuliere den USP:** Positioniere das Gesamtpaket als "100% Coverage" (Roboter + Mensch aus einer Hand).'
])
else:
return "\n".join([
"# IDENTITY & PURPOSE",
'You are the "GTM Architect Engine" for Roboplanet. Your task is to develop a precise Go-to-Market strategy for new technical products (robots).',
"You do not act as a creative copywriter, but as a strategic analyst. Your top goal is product-market fit and operational feasibility.",
"ALWAYS respond in ENGLISH.",
"",
"# CONTEXT: THE PARENT COMPANY (WACKLER)",
"We are part of the Wackler Group, a major facility management service provider.",
'Our strategy is NOT "Robots replace humans", but "Hybrid Cleaning":',
"- 80% of work (monotonous area coverage) = Robots.",
"- 20% of work (Edge cases, winter service, stairs, heavy debris) = Manual cleaning by Wackler.",
"",
"# STRICT ANALYSIS RULES (MUST FOLLOW):",
"1. TECHNICAL FACT-CHECK (No Hallucinations):",
" - Analyze technical data extremely conservatively.",
' - Vacuum System = No "Winter Service" (snow) and no "Heavy Industry" (metal shavings), unless explicitly stated.',
" - Do not invent features just to fit a target audience.",
"",
"2. REGULATORY LOGIC (Traffic Regs):",
' - If Vmax < 20 km/h: Categorically exclude "Public Cities/Streets" (traffic obstruction).',
' - Change Focus: Concentrate exclusively on "Large, contiguous private areas" (Gated Areas).',
"",
"3. STRATEGIC TARGETING (Use Case Logic):",
" - Prioritize Cluster A (Efficiency): Logistics Centers & Industrial Hubs (24/7 ops, safety).",
" - Prioritize Cluster B (Experience): Shopping Centers, Outlets & Theme Parks (Cleanliness as a calling card).",
" - Remove pure E-commerce retailers without physical customer areas.",
"",
'4. THE "HYBRID SERVICE" LOGIC (RULE 5):',
'Whenever you identify a "Hard Constraint" or technical limitation (e.g., "No winter service" or "Cannot reach corners"), never let this stand as a simple "No".',
'Instead, apply the **"Yes, and..." logic**:',
' 1. **Identify the gap:** (e.g., "Robot cannot operate in snow").',
' 2. **Fill the gap with service:** Explicitly suggest covering this part with "Wackler Human Manpower".',
' 3. **Formulate the USP:** Position the total package as "100% Coverage" (Robot + Human from a single source).'
])
# --- ORCHESTRATOR LOGIC ---
def analyze_product(product_input, lang):
# 1. Scraping if URL
content = product_input
if re.match(r'^https?://', product_input.strip()):
logging.info(f"Detected URL: {product_input}. Scraping...")
scraped_text = get_text_from_url(product_input.strip())
if scraped_text:
content = scraped_text
logging.info(f"Scraped {len(content)} chars.")
else:
logging.warning("Scraping failed, using URL as input.")
sys_instr = get_system_instruction(lang)
# 1. Extraction
prompt_extract = "\n".join([
"PHASE 1-A: TECHNICAL EXTRACTION",
f'Input Product Description: "{content[:25000]}"',
"",
"Task:",
"1. Extract key technical features (specs, capabilities).",
'2. Derive "Hard Constraints". IMPORTANT: Check Vmax (<20km/h = Private Grounds) and Cleaning Type (Vacuum != Heavy Debris/Snow).',
"3. Create a short raw analysis summary.",
"",
"Output JSON format ONLY:",
"{",
' "features": ["feature1", "feature2"],
' "constraints": ["constraint1", "constraint2"],
' "rawAnalysis": "summary text"',
"}"
])
log_to_stderr("Starting Phase 1-A: Technical Extraction...")
raw_response = call_gemini_flash(prompt_extract, system_instruction=sys_instr, json_mode=True)
try:
data = json.loads(raw_response)
except json.JSONDecodeError:
logging.error(f"Failed to parse Phase 1 JSON: {raw_response}")
return {"features": [], "constraints": [], "rawAnalysis": "Error parsing AI response."}
# 2. Conflict Check
prompt_conflict = "\n".join([
"PHASE 1-B: PORTFOLIO CONFLICT CHECK",
"",
f"New Product Features: {json.dumps(data.get('features'))}",
f"New Product Constraints: {json.dumps(data.get('constraints'))}",
"",
"Existing Portfolio:",
'1. "Indoor Scrubber 50": Indoor cleaning, hard floor, supermarkets.',
'2. "Service Bot Bella": Service/Gastro, indoor, restaurants.',
"",
"Task:",
"Check if the new product overlaps significantly with existing ones (is it just a clone?).",
"",
"Output JSON format ONLY:",
"{",
' "conflictCheck": {',
' "hasConflict": true/false,',
' "details": "explanation",',
' "relatedProduct": "name or null"',
" }
])
log_to_stderr("Starting Phase 1-B: Conflict Check...")
conflict_response = call_gemini_flash(prompt_conflict, system_instruction=sys_instr, json_mode=True)
try:
conflict_data = json.loads(conflict_response)
data.update(conflict_data)
except:
pass # Ignore conflict check error
return data
def discover_icps(phase1_result, lang):
sys_instr = get_system_instruction(lang)
prompt = "\n".join([
"PHASE 2: ICP DISCOVERY & DATA PROXIES",
f"Based on the product features: {json.dumps(phase1_result.get('features'))}",
f"And constraints: {json.dumps(phase1_result.get('constraints'))}",
"",
"Task:",
"1. Negative Selection: Which industries are impossible? (Remember Vmax & Vacuum rules!)",
"2. High Pain: Identify Cluster A (Logistics/Industry) and Cluster B (Shopping/Outlets).",
"3. Data Proxy Generation: How to find them digitally via data traces (e.g. satellite, registries).",
"",
"Output JSON format ONLY:",
"{",
' "icps": [',
' { "name": "Industry Name", "rationale": "Why this is a good fit" }',
" ],
' "dataProxies": [',
' { "target": "Specific criteria", "method": "How to find" }',
" ]
])
log_to_stderr("Starting Phase 2: ICP Discovery...")
response = call_gemini_flash(prompt, system_instruction=sys_instr, json_mode=True)
return json.loads(response)
def hunt_whales(phase2_result, lang):
sys_instr = get_system_instruction(lang)
prompt = "\n".join([
"PHASE 3: WHALE HUNTING",
f"Target ICPs (Industries): {json.dumps(phase2_result.get('icps'))}",
"",
"Task:",
"1. Group 'Whales' (Key Accounts) strictly by the identified ICP industries.",
"2. Identify 3-5 concrete top companies in the DACH market per industry.",
"3. Define Buying Center Roles.",
"",
"Output JSON format ONLY:",
"{",
' "whales": [',
' { "industry": "Name of ICP Industry", "accounts": ["Company A", "Company B"] }',
" ],
' "roles": ["Job Title 1", "Job Title 2"]
])
log_to_stderr("Starting Phase 3: Whale Hunting...")
response = call_gemini_flash(prompt, system_instruction=sys_instr, json_mode=True)
return json.loads(response)
def develop_strategy(phase3_result, phase1_result, lang):
sys_instr = get_system_instruction(lang)
all_accounts = []
for w in phase3_result.get('whales', []):
all_accounts.extend(w.get('accounts', []))
prompt = "\n".join([
"PHASE 4: STRATEGY & ANGLE DEVELOPMENT",
f"Accounts: {json.dumps(all_accounts)}",
f"Product Features: {json.dumps(phase1_result.get('features'))}",
"",
"Task:",
"1. Develop specific 'Angle' per target/industry.",
"2. Consistency Check against Product Matrix.",
'3. **IMPORTANT:** Apply "Hybrid Service Logic" if technical constraints exist!',
"",
"Output JSON format ONLY:",
"{",
' "strategyMatrix": [',
" {",
' "segment": "Target Segment",',
' "painPoint": "Specific Pain",',
' "angle": "Our Marketing Angle",',
' "differentiation": "How it differs"',
" }
])
log_to_stderr("Starting Phase 4: Strategy...")
response = call_gemini_flash(prompt, system_instruction=sys_instr, json_mode=True)
return json.loads(response)
def generate_assets(phase4_result, phase3_result, phase2_result, phase1_result, lang):
sys_instr = get_system_instruction(lang)
prompt = "\n".join([
"PHASE 5: ASSET GENERATION & FINAL REPORT",
"",
"CONTEXT DATA:",
f"- Technical: {json.dumps(phase1_result)}",
f"- ICPs: {json.dumps(phase2_result)}",
f"- Targets (Whales): {json.dumps(phase3_result)}",
f"- Strategy: {json.dumps(phase4_result)}",
"",
"TASK:",
'1. Create a "GTM STRATEGY REPORT" in Markdown.',
"2. Report Structure: Executive Summary, Product Analysis, Target Audience, Target Accounts, Strategy Matrix, Assets.",
'3. Hybrid-Check: Ensure "Hybrid Service Logic" is visible.',
"",
"Output:",
'Return strictly MARKDOWN formatted text. Start with "# GTM STRATEGY REPORT".'
])
# For Phase 5, we expect TEXT (Markdown), not JSON. So json_mode=False.
log_to_stderr("Starting Phase 5: Asset Generation...")
response = call_gemini_flash(prompt, system_instruction=sys_instr, json_mode=False)
# The frontend expects a string here, not a JSON object wrapping it?
return response
def generate_sales_enablement(phase4_result, phase3_result, phase1_result, lang):
sys_instr = get_system_instruction(lang)
prompt = "\n".join([
"PHASE 6: SALES ENABLEMENT & VISUALS",
"",
"CONTEXT:",
f"- Product Features: {json.dumps(phase1_result.get('features'))}",
f"- Accounts (Personas): {json.dumps(phase3_result.get('roles'))}",
f"- Strategy: {json.dumps(phase4_result.get('strategyMatrix'))}",
"",
"TASK:",
"1. Anticipate Friction & Objections.",
"2. Formulate Battlecards.",
"3. Create Visual Prompts.",
"",
"Output JSON format ONLY:",
"{",
' "battlecards": [',
" {",
' "persona": "Role",',
' "objection": "Objection quote",',
' "responseScript": "Response"',
" }
],
' "visualPrompts": [',
" {",
' "title": "Title",',
' "context": "Context",',
' "prompt": "Prompt Code"',
" }
],
])
log_to_stderr("Starting Phase 6: Sales Enablement...")
response = call_gemini_flash(prompt, system_instruction=sys_instr, json_mode=True)
return json.loads(response)
# --- MAIN ---
def main():
log_to_stderr("--- GTM Orchestrator Starting ---")
# --- CRITICAL FIXES FOR API KEY & SCRAPING ---
# 1. Load API keys manually because helpers.py relies on Config class state
try:
Config.load_api_keys()
log_to_stderr("API Keys loaded.")
logging.info("Config.load_api_keys() called successfully.")
except Exception as e:
log_to_stderr(f"CRITICAL: Failed to load API keys: {e}")
logging.critical(f"Failed to load API keys: {e}")
# ---------------------------------------------
parser = argparse.ArgumentParser()
parser.add_argument('--mode', required=True)
parser.add_argument('--data', required=True)
try:
args = parser.parse_args()
data_in = json.loads(args.data)
mode = args.mode
lang = data_in.get('language', 'de')
log_to_stderr(f"Processing mode: {mode} in language: {lang}")
logging.info(f"Processing mode: {mode} in language: {lang}")
result = {}
if mode == 'analyze_product':
product_input = data_in.get('productInput')
result = analyze_product(product_input, lang)
elif mode == 'discover_icps':
phase1_result = data_in.get('phase1Result')
result = discover_icps(phase1_result, lang)
elif mode == 'hunt_whales':
phase2_result = data_in.get('phase2Result')
result = hunt_whales(phase2_result, lang)
elif mode == 'develop_strategy':
phase3_result = data_in.get('phase3Result')
phase1_result = data_in.get('phase1Result')
result = develop_strategy(phase3_result, phase1_result, lang)
elif mode == 'generate_assets':
phase4_result = data_in.get('phase4Result')
phase3_result = data_in.get('phase3Result')
phase2_result = data_in.get('phase2Result')
phase1_result = data_in.get('phase1Result')
# Returns a string (Markdown)
markdown_report = generate_assets(phase4_result, phase3_result, phase2_result, phase1_result, lang)
print(json.dumps(markdown_report))
log_to_stderr("Finished Phase 5. Output sent to stdout.")
return
elif mode == 'generate_sales_enablement':
phase4_result = data_in.get('phase4Result')
phase3_result = data_in.get('phase3Result')
phase1_result = data_in.get('phase1Result')
result = generate_sales_enablement(phase4_result, phase3_result, phase1_result, lang)
else:
logging.error(f"Unknown mode: {mode}")
result = {"error": f"Unknown mode: {mode}"}
print(json.dumps(result))
log_to_stderr("Finished. Output sent to stdout.")
except Exception as e:
log_to_stderr(f"CRITICAL ERROR: {e}")
logging.error(f"Error in orchestrator: {e}", exc_info=True)
# Return error as JSON so server.cjs can handle it gracefully
print(json.dumps({"error": str(e)}))
sys.exit(1)
if __name__ == "__main__":
main()