# File: Brancheneinstufung2/gtm_architect_orchestrator.py
# Repository viewer metadata: 442 lines, 19 KiB, Python

import argparse
import base64
import json
import logging
import re
import sys
import os
import requests
from bs4 import BeautifulSoup
from datetime import datetime
from config import Config
import gtm_db_manager as db_manager
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from helpers import call_gemini_flash, scrape_website_details
from config import Config, BASE_DIR # Import Config and BASE_DIR
# --- Logging / runtime bootstrap (executed at import time) ---

# Directory that receives the run log and the per-step dumps of log_and_save.
LOG_DIR = "Log_from_docker"
# exist_ok=True creates the directory only when it is missing and avoids the
# check-then-create race of a separate os.path.exists() test.
os.makedirs(LOG_DIR, exist_ok=True)

ORCHESTRATOR_VERSION = "1.3.0" # Bump version for image fix & language enforcement

# One timestamp per process; it prefixes the run log and every dump file.
run_timestamp = datetime.now().strftime("%y-%m-%d_%H-%M-%S")
log_file_path = os.path.join(LOG_DIR, f"{run_timestamp}_gtm_orchestrator_run.log")

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_file_path, mode='a', encoding='utf-8'),
        # Diagnostics go to stderr; stdout is used by main() for the JSON result.
        logging.StreamHandler(sys.stderr),
    ],
)
logging.info(f"GTM Architect Orchestrator v{ORCHESTRATOR_VERSION} ({run_timestamp}) starting...")

# !!! CRITICAL FIX: Load API keys at the very beginning !!!
# This ensures Config.API_KEYS is populated before any AI functions are called.
Config.load_api_keys()
def log_and_save(project_id, step_name, data_type, content):
    """Log a step artifact and dump it to a timestamped text file in LOG_DIR.

    Dicts and lists are written as indented JSON with non-ASCII characters
    kept as-is; any other value is written via ``str()``. Writing is
    best-effort: a failure is logged and swallowed, never raised.

    Args:
        project_id: Project identifier; only used in the log message.
        step_name: Step label (e.g. "phase1"); becomes part of the file name.
        data_type: Artifact label (e.g. "prompt"); becomes part of the file name.
        content: The value to persist.
    """
    logging.info(f"Project {project_id} - Step: {step_name} - Type: {data_type}")
    target_path = os.path.join(LOG_DIR, f"{run_timestamp}_{step_name}_{data_type}.txt")
    is_structured = isinstance(content, (dict, list))
    try:
        with open(target_path, 'w', encoding='utf-8') as out:
            if not is_structured:
                out.write(str(content))
            else:
                json.dump(content, out, indent=4, ensure_ascii=False)
        logging.info(f"Saved {data_type} to {target_path}")
    except Exception as exc:
        # Deliberately broad: a logging problem must not abort the caller.
        logging.error(f"Failed to save {data_type} to file: {exc}")
def get_system_instruction(lang):
    """Return the system prompt that frames the model as a GTM expert.

    Args:
        lang: Language code. ``'de'`` selects the German prompt; every other
            value (including ``None``) falls back to the English prompt.

    Returns:
        The multi-line system instruction text.
    """
    german_prompt = """
Du bist ein internationaler Go-to-Market (GTM) Experte für B2B-Technologie-Unternehmen im Bereich Robotik, Facility Management und IoT.
Deine Aufgabe ist es, aus technischen Spezifikationen und Produktbeschreibungen eine umfassende GTM-Strategie zu entwickeln.
Du arbeitest strukturiert, datengetrieben und präzise. Deine Antworten sind immer klar, professionell und direkt auf den Punkt.
Wenn du JSON ausgeben sollst, gib NUR das JSON-Objekt aus, ohne umschließende Text- oder Code-Formatierungen.
Behalte während des gesamten Prozesses eine konsistente Logik bei. Alle Phasen bauen aufeinander auf.
Führe eine interne Plausibilitätsprüfung durch, bevor du eine Antwort gibst.
Verwende "Wackler Symbiosis" als internes Framework für die Analyse von Produkt-Synergien.
Nutze das "Hybrid Service Logic" Konzept, um zu bewerten, ob ein Produkt mit einer Dienstleistung kombiniert werden muss (z.B. bei hohen Wartungsanforderungen).
WICHTIG: Antworte IMMER in der vom User geforderten Sprache (Deutsch), auch wenn der Input Englisch ist.
"""
    english_prompt = """
You are an international Go-to-Market (GTM) expert for B2B technology companies in robotics, facility management, and IoT.
Your task is to develop a comprehensive GTM strategy from technical specifications and product descriptions.
You are structured, data-driven, and precise. Your answers are always clear, professional, and to the point.
When asked to output JSON, provide ONLY the JSON object without any surrounding text or code formatting.
Maintain consistent logic throughout the process. All phases build on each other.
Perform an internal plausibility check before providing an answer.
Use "Wackler Symbiosis" as an internal framework for analyzing product synergies.
Use the "Hybrid Service Logic" concept to evaluate if a product needs to be combined with a service (e.g., due to high maintenance requirements).
"""
    # English is the default for anything that is not exactly 'de'.
    return german_prompt if lang == 'de' else english_prompt
def get_output_lang_instruction(lang):
    """Return a one-line directive that pins the model's output language.

    Args:
        lang: Language code. ``'de'`` yields the German directive; any other
            value yields the English one.

    Returns:
        The directive string to embed in a prompt.
    """
    wants_german = lang == 'de'
    if not wants_german:
        return "IMPORTANT: The entire output MUST be in ENGLISH."
    return "ACHTUNG: Die gesamte Ausgabe (JSON-Werte, Texte, Analysen) MUSS in DEUTSCH sein. Übersetze englische Input-Daten."
# --- ORCHESTRATOR PHASES ---
def phase1(payload):
    """Run phase 1: product analysis and constraints.

    If ``productInput`` starts with ``http`` it is treated as a URL and
    scraped via ``scrape_website_details``; otherwise the text is analyzed
    directly. The prompt and the model response are dumped with
    ``log_and_save``; a successfully parsed result is stored through
    ``db_manager.save_gtm_result`` under ``'phase1_result'``.

    Args:
        payload: Dict with ``productInput`` (text or URL), optional ``lang``
            (defaults to ``'de'``) and ``projectId``.

    Returns:
        The parsed JSON object from the model, or a dict with ``error`` and
        ``details`` keys when the response could not be parsed as JSON
        (nothing is saved in that case).
    """
    # `or ''` also covers an explicit null, which would crash on .strip().
    product_input = payload.get('productInput') or ''
    lang = payload.get('lang', 'de')
    project_id = payload.get('projectId')

    # Check if input is a URL and scrape it
    if product_input.strip().startswith('http'):
        logging.info(f"Input detected as URL. Starting scrape for: {product_input}")
        analysis_content = scrape_website_details(product_input)
        # The scraper is assumed to signal failure with a "Fehler:" marker in
        # its text (TODO confirm against helpers). A non-string result is
        # treated as a failure as well instead of raising on the `in` test.
        if not isinstance(analysis_content, str) or "Fehler:" in analysis_content:
            # If scraping fails, use the URL itself with a note for the AI.
            analysis_content = f"Scraping der URL {product_input} ist fehlgeschlagen. Analysiere das Produkt basierend auf der URL und deinem allgemeinen Wissen."
            logging.warning("Scraping failed. Using URL as fallback content for analysis.")
    else:
        analysis_content = product_input
        logging.info("Input is raw text. Analyzing directly.")

    sys_instr = get_system_instruction(lang)
    lang_instr = get_output_lang_instruction(lang)
    prompt = f"""
PHASE 1: PRODUCT ANALYSIS & CONSTRAINTS
Input: "{analysis_content}"
Task:
1. Extract and CONSOLIDATE technical features into 8-12 high-level core capabilities or value propositions. Group minor specs (e.g., specific ports like USB/Ethernet) into broader categories (e.g., "Connectivity & Integration"). Do NOT list every single hardware spec individually. Focus on what matters for the buyer.
2. Define hard constraints (e.g., physical dimensions, max payload, environment limitations).
3. Check for internal portfolio conflicts (hypothetical product "Scrubber 5000").
{lang_instr}
Output JSON format ONLY: {{"features": [], "constraints": [], "conflictCheck": {{"hasConflict": false, "details": "", "relatedProduct": ""}}, "rawAnalysis": ""}}
"""
    log_and_save(project_id, "phase1", "prompt", prompt)
    response = call_gemini_flash(prompt, system_instruction=sys_instr, json_mode=True)
    log_and_save(project_id, "phase1", "response", response)

    # Only the parse is guarded, so a database error is never misreported as
    # a JSON problem. TypeError covers a non-string response such as None.
    try:
        data = json.loads(response)
    except (json.JSONDecodeError, TypeError):
        logging.error(f"Failed to decode JSON from Gemini response in phase1: {response}")
        return {
            "error": "Die Antwort des KI-Modells war kein gültiges JSON. Das passiert manchmal bei hoher Auslastung. Bitte versuchen Sie es in Kürze erneut.",
            "details": response,
        }
    db_manager.save_gtm_result(project_id, 'phase1_result', json.dumps(data))
    return data
def phase2(payload):
    """Run phase 2: ideal customer profiles (ICPs) and data proxies.

    Builds the prompt from ``phase1Data``, dumps prompt and response with
    ``log_and_save`` and stores a parsed result through
    ``db_manager.save_gtm_result`` under ``'phase2_result'``.

    Args:
        payload: Dict with ``phase1Data``, optional ``lang`` (defaults to
            ``'de'``) and ``projectId``.

    Returns:
        The parsed JSON object from the model, or a dict with ``error`` and
        ``details`` keys (the shape phase1 uses) when the response is not
        valid JSON; nothing is saved in that case.
    """
    # `or {}` also covers an explicit null in the payload.
    phase1_data = payload.get('phase1Data') or {}
    lang = payload.get('lang', 'de')
    project_id = payload.get('projectId')

    sys_instr = get_system_instruction(lang)
    lang_instr = get_output_lang_instruction(lang)
    prompt = f"""
PHASE 2: IDEAL CUSTOMER PROFILE (ICP) & DATA PROXIES
Product Context: {json.dumps(phase1_data)}
Task: 1. Identify top 3 ICPs (Ideal Customer Profiles/Industries). 2. Define data proxies for identifying these ICPs online.
{lang_instr}
Output JSON format ONLY: {{"icps": [{{"name": "", "rationale": ""}}], "dataProxies": [{{"target": "", "method": ""}}]}}
"""
    log_and_save(project_id, "phase2", "prompt", prompt)
    response = call_gemini_flash(prompt, system_instruction=sys_instr, json_mode=True)
    log_and_save(project_id, "phase2", "response", response)

    # Same invalid-JSON handling as phase1; TypeError covers a None response.
    try:
        data = json.loads(response)
    except (json.JSONDecodeError, TypeError):
        logging.error(f"Failed to decode JSON from Gemini response in phase2: {response}")
        return {
            "error": "Die Antwort des KI-Modells war kein gültiges JSON. Das passiert manchmal bei hoher Auslastung. Bitte versuchen Sie es in Kürze erneut.",
            "details": response,
        }
    db_manager.save_gtm_result(project_id, 'phase2_result', json.dumps(data))
    return data
def phase3(payload):
    """Run phase 3: "whale hunting" (key accounts and buying-center roles).

    Builds the prompt from the ``icps`` entry of ``phase2Data``, dumps prompt
    and response with ``log_and_save`` and stores a parsed result through
    ``db_manager.save_gtm_result`` under ``'phase3_result'``.

    Args:
        payload: Dict with ``phase2Data``, optional ``lang`` (defaults to
            ``'de'``) and ``projectId``.

    Returns:
        The parsed JSON object from the model, or a dict with ``error`` and
        ``details`` keys (the shape phase1 uses) when the response is not
        valid JSON; nothing is saved in that case.
    """
    # `or {}` also covers an explicit null in the payload.
    phase2_data = payload.get('phase2Data') or {}
    lang = payload.get('lang', 'de')
    project_id = payload.get('projectId')

    sys_instr = get_system_instruction(lang)
    lang_instr = get_output_lang_instruction(lang)
    prompt = f"""
PHASE 3: WHALE HUNTING
Target ICPs (Industries): {json.dumps(phase2_data.get('icps'))}
Task: 1. Group 'Whales' (Key Accounts) strictly by ICP industries. 2. Identify 3-5 concrete top companies in DACH market per industry. 3. Define Buying Center Roles.
{lang_instr}
Output JSON format ONLY: {{"whales": [{{"industry": "", "accounts": []}}], "roles": []}}
"""
    log_and_save(project_id, "phase3", "prompt", prompt)
    response = call_gemini_flash(prompt, system_instruction=sys_instr, json_mode=True)
    log_and_save(project_id, "phase3", "response", response)

    # Same invalid-JSON handling as phase1; TypeError covers a None response.
    try:
        data = json.loads(response)
    except (json.JSONDecodeError, TypeError):
        logging.error(f"Failed to decode JSON from Gemini response in phase3: {response}")
        return {
            "error": "Die Antwort des KI-Modells war kein gültiges JSON. Das passiert manchmal bei hoher Auslastung. Bitte versuchen Sie es in Kürze erneut.",
            "details": response,
        }
    db_manager.save_gtm_result(project_id, 'phase3_result', json.dumps(data))
    return data
def phase4(payload):
    """Run phase 4: strategy and angle development.

    Flattens all accounts from the ``whales`` list of ``phase3Data``, lists
    their industries, and combines both with the ``features`` of
    ``phase1Data`` into the prompt. Prompt and response are dumped with
    ``log_and_save``; a parsed result is stored through
    ``db_manager.save_gtm_result`` under ``'phase4_result'``.

    Args:
        payload: Dict with ``phase1Data``, ``phase3Data``, optional ``lang``
            (defaults to ``'de'``) and ``projectId``.

    Returns:
        The parsed JSON object from the model, or a dict with ``error`` and
        ``details`` keys (the shape phase1 uses) when the response is not
        valid JSON; nothing is saved in that case.
    """
    # `or {}` / `or []` also cover explicit nulls in the payload.
    phase3_data = payload.get('phase3Data') or {}
    phase1_data = payload.get('phase1Data') or {}
    lang = payload.get('lang', 'de')
    project_id = payload.get('projectId')

    sys_instr = get_system_instruction(lang)
    lang_instr = get_output_lang_instruction(lang)

    # Read the whale list once; both prompt fields are derived from it.
    whales = phase3_data.get('whales') or []
    all_accounts = [
        account
        for whale in whales
        for account in (whale.get('accounts') or [])
    ]
    industries = [whale.get('industry') for whale in whales]

    prompt = f"""
PHASE 4: STRATEGY & ANGLE DEVELOPMENT
Accounts: {json.dumps(all_accounts)}
Target Industries: {json.dumps(industries)}
Product Features: {json.dumps(phase1_data.get('features'))}
Task: 1. Develop specific "Angle" per target/industry. 2. Consistency Check against Product Matrix. 3. **IMPORTANT:** Apply "Hybrid Service Logic" if constraints exist!
{lang_instr}
Output JSON format ONLY: {{"strategyMatrix": [{{"segment": "", "painPoint": "", "angle": "", "differentiation": ""}}]}}
"""
    log_and_save(project_id, "phase4", "prompt", prompt)
    response = call_gemini_flash(prompt, system_instruction=sys_instr, json_mode=True)
    log_and_save(project_id, "phase4", "response", response)

    # Same invalid-JSON handling as phase1; TypeError covers a None response.
    try:
        data = json.loads(response)
    except (json.JSONDecodeError, TypeError):
        logging.error(f"Failed to decode JSON from Gemini response in phase4: {response}")
        return {
            "error": "Die Antwort des KI-Modells war kein gültiges JSON. Das passiert manchmal bei hoher Auslastung. Bitte versuchen Sie es in Kürze erneut.",
            "details": response,
        }
    db_manager.save_gtm_result(project_id, 'phase4_result', json.dumps(data))
    return data
def phase5(payload):
    """Run phase 5: generate the consolidated GTM strategy report.

    Serializes the results of phases 1-4 into the prompt and asks the model
    for Markdown (``json_mode=False``). Prompt and response are dumped with
    ``log_and_save``; the report is stored through
    ``db_manager.save_gtm_result`` under ``'phase5_result'``.

    Args:
        payload: Dict with ``phase1Data`` .. ``phase4Data``, optional ``lang``
            (defaults to ``'de'``) and ``projectId``.

    Returns:
        ``{"report": <model output>}``.
    """
    project_id = payload.get('projectId')
    lang = payload.get('lang', 'de')

    # Serialize each phase result up front so the template below stays readable.
    technical_json = json.dumps(payload.get('phase1Data', {}))
    icps_json = json.dumps(payload.get('phase2Data', {}))
    whales_json = json.dumps(payload.get('phase3Data', {}))
    strategy_json = json.dumps(payload.get('phase4Data', {}))

    system_prompt = get_system_instruction(lang)
    language_rule = get_output_lang_instruction(lang)
    prompt = f"""
PHASE 5: ASSET GENERATION & FINAL REPORT
CONTEXT DATA:
- Technical: {technical_json}
- ICPs: {icps_json}
- Targets (Whales): {whales_json}
- Strategy: {strategy_json}
TASK:
1. Create a "GTM STRATEGY REPORT" in Markdown.
2. CONSOLIDATE ALL PREVIOUS PHASES (1-4) into the report. Don't skip details.
3. Report Structure: Executive Summary, Product Analysis, Target Audience (ICPs), Target Accounts (Whales), Strategy Matrix, Next Steps.
4. Hybrid-Check: Ensure "Hybrid Service Logic" is visible.
{language_rule}
Output: Return strictly MARKDOWN formatted text. Start with "# GTM STRATEGY REPORT".
"""
    log_and_save(project_id, "phase5", "prompt", prompt)
    markdown_report = call_gemini_flash(prompt, system_instruction=system_prompt, json_mode=False)
    log_and_save(project_id, "phase5", "response", markdown_report)

    result = {"report": markdown_report}
    db_manager.save_gtm_result(project_id, 'phase5_result', json.dumps(result))
    return result
def phase6(payload):
    """Run phase 6: sales enablement (battlecards) and visual prompts.

    Builds the prompt from the ``features`` of ``phase1Data``, the ``roles``
    of ``phase3Data`` and the ``strategyMatrix`` of ``phase4Data``. Prompt and
    response are dumped with ``log_and_save``; a parsed result is stored
    through ``db_manager.save_gtm_result`` under ``'phase6_result'``.

    Args:
        payload: Dict with ``phase1Data``, ``phase3Data``, ``phase4Data``,
            optional ``lang`` (defaults to ``'de'``) and ``projectId``.

    Returns:
        The parsed JSON object from the model, or a dict with ``error`` and
        ``details`` keys (the shape phase1 uses) when the response is not
        valid JSON; nothing is saved in that case.
    """
    # `or {}` also covers explicit nulls in the payload.
    phase4_data = payload.get('phase4Data') or {}
    phase3_data = payload.get('phase3Data') or {}
    phase1_data = payload.get('phase1Data') or {}
    lang = payload.get('lang', 'de')
    project_id = payload.get('projectId')

    sys_instr = get_system_instruction(lang)
    lang_instr = get_output_lang_instruction(lang)
    prompt = f"""
PHASE 6: SALES ENABLEMENT & VISUALS
CONTEXT: - Product Features: {json.dumps(phase1_data.get('features'))} - Personas: {json.dumps(phase3_data.get('roles'))} - Strategy: {json.dumps(phase4_data.get('strategyMatrix'))}
TASK: 1. Anticipate Friction & Objections. 2. Formulate Battlecards. 3. Create Visual Prompts.
{lang_instr}
Output JSON format ONLY: {{"battlecards": [{{"persona": "", "objection": "", "responseScript": ""}}], "visualPrompts": [{{"title": "", "context": "", "prompt": ""}}]}}
"""
    log_and_save(project_id, "phase6", "prompt", prompt)
    response = call_gemini_flash(prompt, system_instruction=sys_instr, json_mode=True)
    log_and_save(project_id, "phase6", "response", response)

    # Same invalid-JSON handling as phase1; TypeError covers a None response.
    try:
        data = json.loads(response)
    except (json.JSONDecodeError, TypeError):
        logging.error(f"Failed to decode JSON from Gemini response in phase6: {response}")
        return {
            "error": "Die Antwort des KI-Modells war kein gültiges JSON. Das passiert manchmal bei hoher Auslastung. Bitte versuchen Sie es in Kürze erneut.",
            "details": response,
        }
    db_manager.save_gtm_result(project_id, 'phase6_result', json.dumps(data))
    return data
def phase7(payload):
    """Run phase 7: vertical landing-page copy.

    Builds the prompt from the ``icps`` of ``phase2Data`` and the
    ``strategyMatrix`` of ``phase4Data``. Prompt and response are dumped with
    ``log_and_save``; a parsed result is stored through
    ``db_manager.save_gtm_result`` under ``'phase7_result'``.

    Args:
        payload: Dict with ``phase2Data``, ``phase4Data``, optional ``lang``
            (defaults to ``'de'``) and ``projectId``.

    Returns:
        The parsed JSON object from the model, or a dict with ``error`` and
        ``details`` keys (the shape phase1 uses) when the response is not
        valid JSON; nothing is saved in that case.
    """
    # `or {}` also covers explicit nulls in the payload.
    phase4_data = payload.get('phase4Data') or {}
    phase2_data = payload.get('phase2Data') or {}
    lang = payload.get('lang', 'de')
    project_id = payload.get('projectId')

    sys_instr = get_system_instruction(lang)
    lang_instr = get_output_lang_instruction(lang)
    prompt = f"""
PHASE 7: VERTICAL LANDING PAGE COPY (Conversion Optimization)
ICPs: {json.dumps(phase2_data.get('icps'))}
Strategy: {json.dumps(phase4_data.get('strategyMatrix'))}
TASK: 1. Transform generic features into specific benefits for the Top 2 ICPs. 2. Apply "Wackler Symbiosis". 3. Create Landing Page Drafts (Hero Section).
{lang_instr}
Output JSON format ONLY: {{"landingPages": [{{"industry": "", "headline": "", "subline": "", "bullets": [], "cta": ""}}]}}
"""
    log_and_save(project_id, "phase7", "prompt", prompt)
    response = call_gemini_flash(prompt, system_instruction=sys_instr, json_mode=True)
    log_and_save(project_id, "phase7", "response", response)

    # Same invalid-JSON handling as phase1; TypeError covers a None response.
    try:
        data = json.loads(response)
    except (json.JSONDecodeError, TypeError):
        logging.error(f"Failed to decode JSON from Gemini response in phase7: {response}")
        return {
            "error": "Die Antwort des KI-Modells war kein gültiges JSON. Das passiert manchmal bei hoher Auslastung. Bitte versuchen Sie es in Kürze erneut.",
            "details": response,
        }
    db_manager.save_gtm_result(project_id, 'phase7_result', json.dumps(data))
    return data
def phase8(payload):
    """Run phase 8: business case builder (financial argumentation).

    Builds the prompt from the ``icps`` of ``phase2Data`` and the
    ``features`` of ``phase1Data``. Prompt and response are dumped with
    ``log_and_save``; a parsed result is stored through
    ``db_manager.save_gtm_result`` under ``'phase8_result'``.

    Args:
        payload: Dict with ``phase1Data``, ``phase2Data``, optional ``lang``
            (defaults to ``'de'``) and ``projectId``.

    Returns:
        The parsed JSON object from the model, or a dict with ``error`` and
        ``details`` keys (the shape phase1 uses) when the response is not
        valid JSON; nothing is saved in that case.
    """
    # `or {}` also covers explicit nulls in the payload.
    phase2_data = payload.get('phase2Data') or {}
    phase1_data = payload.get('phase1Data') or {}
    lang = payload.get('lang', 'de')
    project_id = payload.get('projectId')

    sys_instr = get_system_instruction(lang)
    lang_instr = get_output_lang_instruction(lang)
    prompt = f"""
PHASE 8: BUSINESS CASE BUILDER (The CFO Pitch)
Input: ICPs: {json.dumps(phase2_data.get('icps'))}, Features: {json.dumps(phase1_data.get('features'))}
TASK: 1. Estimate labor costs/pain points. 2. Compare against Robot Leasing (approx 330-600€/month). 3. Develop ROI logic. 4. Create "Financial Argumentation Guide" for each ICP.
{lang_instr}
Output JSON format ONLY: {{"businessCases": [{{"industry": "", "costDriver": "", "efficiencyGain": "", "riskArgument": ""}}]}}
"""
    log_and_save(project_id, "phase8", "prompt", prompt)
    response = call_gemini_flash(prompt, system_instruction=sys_instr, json_mode=True)
    log_and_save(project_id, "phase8", "response", response)

    # Same invalid-JSON handling as phase1; TypeError covers a None response.
    try:
        data = json.loads(response)
    except (json.JSONDecodeError, TypeError):
        logging.error(f"Failed to decode JSON from Gemini response in phase8: {response}")
        return {
            "error": "Die Antwort des KI-Modells war kein gültiges JSON. Das passiert manchmal bei hoher Auslastung. Bitte versuchen Sie es in Kürze erneut.",
            "details": response,
        }
    db_manager.save_gtm_result(project_id, 'phase8_result', json.dumps(data))
    return data
def phase9(payload):
    """Run phase 9: the "feature-to-value" translator.

    Builds the prompt from the ``features`` of ``phase1Data`` and the
    ``painPoint`` of every entry in the ``strategyMatrix`` of ``phase4Data``.
    Prompt and response are dumped with ``log_and_save``; a parsed result is
    stored through ``db_manager.save_gtm_result`` under ``'phase9_result'``.

    Args:
        payload: Dict with ``phase1Data``, ``phase4Data``, optional ``lang``
            (defaults to ``'de'``) and ``projectId``.

    Returns:
        The parsed JSON object from the model, or a dict with ``error`` and
        ``details`` keys (the shape phase1 uses) when the response is not
        valid JSON; nothing is saved in that case.
    """
    # `or {}` / `or []` also cover explicit nulls in the payload.
    phase1_data = payload.get('phase1Data') or {}
    phase4_data = payload.get('phase4Data') or {}
    lang = payload.get('lang', 'de')
    project_id = payload.get('projectId')

    sys_instr = get_system_instruction(lang)
    lang_instr = get_output_lang_instruction(lang)
    pain_points = [
        entry.get('painPoint')
        for entry in (phase4_data.get('strategyMatrix') or [])
    ]
    prompt = f"""
PHASE 9: THE "FEATURE-TO-VALUE" TRANSLATOR
Input Features: {json.dumps(phase1_data.get('features'))}
Strategy Pains: {json.dumps(pain_points)}
TASK: 1. Take a tech feature. 2. Ask "So what?". 3. Ask "So what?" again. 4. Formulate benefit without jargon. Create a table.
{lang_instr}
Output JSON format ONLY: {{"techTranslations": [{{"feature": "", "story": "", "headline": ""}}]}}
"""
    log_and_save(project_id, "phase9", "prompt", prompt)
    response = call_gemini_flash(prompt, system_instruction=sys_instr, json_mode=True)
    log_and_save(project_id, "phase9", "response", response)

    # Same invalid-JSON handling as phase1; TypeError covers a None response.
    try:
        data = json.loads(response)
    except (json.JSONDecodeError, TypeError):
        logging.error(f"Failed to decode JSON from Gemini response in phase9: {response}")
        return {
            "error": "Die Antwort des KI-Modells war kein gültiges JSON. Das passiert manchmal bei hoher Auslastung. Bitte versuchen Sie es in Kürze erneut.",
            "details": response,
        }
    db_manager.save_gtm_result(project_id, 'phase9_result', json.dumps(data))
    return data
def translate(payload):
    """Placeholder for report translation; not implemented yet.

    Args:
        payload: Ignored for now.

    Returns:
        A dict with a fixed placeholder text under ``"report"``.
    """
    # ... (to be implemented)
    placeholder_text = "Translated report will be here."
    return {"report": placeholder_text}
def image(payload):
    """Return a placeholder SVG as a base64 data URI.

    Image generation is not configured, so a static SVG showing a notice and
    the first 50 characters of the requested prompt is produced instead.

    Args:
        payload: Dict with an optional ``prompt`` entry.

    Returns:
        ``{"imageBase64": "data:image/svg+xml;base64,<...>"}``.
    """
    # Local import keeps this block self-contained (stdlib only).
    from xml.sax.saxutils import escape

    raw_prompt = payload.get('prompt', 'No Prompt')
    if raw_prompt is None:
        # An explicit null would otherwise crash on slicing.
        raw_prompt = 'No Prompt'

    # Truncate first, then escape, so an entity is never cut in half. Without
    # escaping, a prompt containing '<' or '&' produces malformed SVG/XML.
    prompt = escape(str(raw_prompt)[:50] + "...")

    # Create a simple SVG placeholder to avoid frontend crash
    svg = f"""
<svg width="512" height="512" xmlns="http://www.w3.org/2000/svg">
<rect width="100%" height="100%" fill="#eee"/>
<text x="50%" y="50%" font-family="Arial" font-size="20" fill="#555" text-anchor="middle" dy=".3em">
Image Generation Not Configured
</text>
<text x="50%" y="60%" font-family="Arial" font-size="12" fill="#888" text-anchor="middle" dy=".3em">
{prompt}
</text>
</svg>
"""
    svg_b64 = base64.b64encode(svg.encode('utf-8')).decode('utf-8')
    return {"imageBase64": f"data:image/svg+xml;base64,{svg_b64}"}
def main():
    """
    Main entry point of the script.

    Parses command-line arguments, loads the JSON payload (from
    ``--payload_file`` or the deprecated ``--payload_base64``), dispatches to
    the handler selected by ``--mode`` and prints the handler's result as a
    single JSON document on stdout.

    On an invalid payload, an unknown mode or a handler exception, a JSON
    object with an ``error`` key is printed to stdout and the process exits
    with status 1.
    """
    parser = argparse.ArgumentParser(description="GTM Architect Orchestrator")
    parser.add_argument("--mode", required=True, help="The execution mode (e.g., phase1, phase2).")
    parser.add_argument("--payload_base64", help="The Base64 encoded JSON payload (deprecated, use payload_file).")
    parser.add_argument("--payload_file", help="Path to a JSON file containing the payload (preferred).")
    args = parser.parse_args()

    payload = {}
    try:
        if args.payload_file:
            if not os.path.exists(args.payload_file):
                raise FileNotFoundError(f"Payload file not found: {args.payload_file}")
            with open(args.payload_file, 'r', encoding='utf-8') as f:
                payload = json.load(f)
        elif args.payload_base64:
            payload_str = base64.b64decode(args.payload_base64).decode('utf-8')
            payload = json.loads(payload_str)
        else:
            raise ValueError("No payload provided (neither --payload_file nor --payload_base64).")
        # Every handler calls payload.get(...), so anything but a JSON object
        # is rejected here with a clear message.
        if not isinstance(payload, dict):
            raise ValueError("Payload must be a JSON object.")
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError, binascii.Error and
        # UnicodeDecodeError; OSError covers FileNotFoundError as well as
        # unreadable files (permissions, path is a directory).
        logging.error(f"Failed to load payload: {e}")
        # Print error as JSON to stdout for the server to catch
        print(json.dumps({"error": "Invalid payload.", "details": str(e)}))
        sys.exit(1)

    # Function mapping to dynamically call the correct phase
    modes = {
        "phase1": phase1,
        "phase2": phase2,
        "phase3": phase3,
        "phase4": phase4,
        "phase5": phase5,
        "phase6": phase6,
        "phase7": phase7,
        "phase8": phase8,
        "phase9": phase9,
        "translate": translate,
        "image": image,
    }
    mode_function = modes.get(args.mode)
    if not mode_function:
        logging.error(f"Invalid mode specified: {args.mode}")
        print(json.dumps({"error": f"Invalid mode: {args.mode}"}))
        sys.exit(1)

    try:
        logging.info(f"Executing mode: {args.mode}")
        result = mode_function(payload)
        # Ensure the output is always a JSON string
        print(json.dumps(result, ensure_ascii=False))
        logging.info(f"Successfully executed mode: {args.mode}")
    except Exception as e:
        # Top-level boundary: log with traceback, report as JSON, exit non-zero.
        logging.error(f"An error occurred during execution of mode '{args.mode}': {e}", exc_info=True)
        print(json.dumps({"error": f"An error occurred in {args.mode}.", "details": str(e)}))
        sys.exit(1)


if __name__ == "__main__":
    main()