- Adds a version and timestamp to the orchestrator's startup logs to verify code deployment. - Introduces extensive debug logging in config.py and helpers.py to trace the API key loading process, including exact file paths and environment variable checks. This will help diagnose the persistent 'API Key missing' error.
357 lines
16 KiB
Python
357 lines
16 KiB
Python
|
|
import argparse
|
|
import base64
|
|
import json
|
|
import logging
|
|
import re
|
|
import sys
|
|
import os
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
from datetime import datetime
|
|
from config import Config
|
|
import gtm_db_manager as db_manager
|
|
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from helpers import call_gemini_flash, scrape_website_details
|
|
|
|
# Directory that receives the run log and the per-step prompt/response dumps.
LOG_DIR = "Log_from_docker"
# exist_ok=True replaces the separate existence check and tolerates the
# directory appearing between the check and the creation.
os.makedirs(LOG_DIR, exist_ok=True)

ORCHESTRATOR_VERSION = "1.1.0"
# One timestamp per process; it prefixes the run log and every dump file name.
run_timestamp = datetime.now().strftime("%y-%m-%d_%H-%M-%S")
log_file_path = os.path.join(LOG_DIR, f"{run_timestamp}_gtm_orchestrator_run.log")

# Log to the per-run file and to stdout.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_file_path, mode='a', encoding='utf-8'),
        logging.StreamHandler(sys.stdout)
    ]
)
# Single braces so the values are interpolated. Doubled braces are escapes in
# an f-string and would log the placeholder names literally.
logging.info(f"GTM Architect Orchestrator v{ORCHESTRATOR_VERSION} ({run_timestamp}) starting...")
|
|
def log_and_save(project_id, step_name, data_type, content):
    """Log one pipeline step and dump its content to a text file in LOG_DIR.

    The file name is built from the run timestamp, the step name and the
    data type. dict/list content is written as indented JSON, anything else
    via str(). A failed write is logged as an error and not re-raised.
    """
    logging.info(f"Project {project_id} - Step: {step_name} - Type: {data_type}")

    filepath = os.path.join(LOG_DIR, f"{run_timestamp}_{step_name}_{data_type}.txt")
    is_structured = isinstance(content, (dict, list))

    try:
        with open(filepath, 'w', encoding='utf-8') as out:
            if not is_structured:
                out.write(str(content))
            else:
                json.dump(content, out, indent=4, ensure_ascii=False)
        logging.info(f"Saved {data_type} to {filepath}")
    except Exception as e:
        # Best effort: a failed dump must not abort the phase.
        logging.error(f"Failed to save {data_type} to file: {e}")
|
|
|
|
def get_system_instruction(lang):
    """Return the system instruction text for the model.

    'de' selects the German text; every other value (including None) gets
    the English text.
    """
    if lang != 'de':
        # Default to English
        return """
        You are an international Go-to-Market (GTM) expert for B2B technology companies in robotics, facility management, and IoT.
        Your task is to develop a comprehensive GTM strategy from technical specifications and product descriptions.
        You are structured, data-driven, and precise. Your answers are always clear, professional, and to the point.
        When asked to output JSON, provide ONLY the JSON object without any surrounding text or code formatting.
        Maintain consistent logic throughout the process. All phases build on each other.
        Perform an internal plausibility check before providing an answer.
        Use "Wackler Symbiosis" as an internal framework for analyzing product synergies.
        Use the "Hybrid Service Logic" concept to evaluate if a product needs to be combined with a service (e.g., due to high maintenance requirements).
        """

    return """
        Du bist ein internationaler Go-to-Market (GTM) Experte für B2B-Technologie-Unternehmen im Bereich Robotik, Facility Management und IoT.
        Deine Aufgabe ist es, aus technischen Spezifikationen und Produktbeschreibungen eine umfassende GTM-Strategie zu entwickeln.
        Du arbeitest strukturiert, datengetrieben und präzise. Deine Antworten sind immer klar, professionell und direkt auf den Punkt.
        Wenn du JSON ausgeben sollst, gib NUR das JSON-Objekt aus, ohne umschließende Text- oder Code-Formatierungen.
        Behalte während des gesamten Prozesses eine konsistente Logik bei. Alle Phasen bauen aufeinander auf.
        Führe eine interne Plausibilitätsprüfung durch, bevor du eine Antwort gibst.
        Verwende "Wackler Symbiosis" als internes Framework für die Analyse von Produkt-Synergien.
        Nutze das "Hybrid Service Logic" Konzept, um zu bewerten, ob ein Produkt mit einer Dienstleistung kombiniert werden muss (z.B. bei hohen Wartungsanforderungen).
        """
|
|
|
|
# --- ORCHESTRATOR PHASES ---
|
|
|
|
def phase1(payload):
    """Run phase 1: product analysis and constraints.

    Reads 'productInput', 'lang' (default 'de') and 'projectId' from payload.
    Input that starts with http:// or https:// is passed to
    scrape_website_details; any other input is analysed as raw text. The
    prompt and the model response are dumped with log_and_save, and a
    successfully parsed result is stored via db_manager.save_gtm_result
    under 'phase1_result'.

    Returns the parsed JSON from the model, or a dict with 'error' and
    'details' keys when the response cannot be parsed as JSON.
    """
    # `or ''` also covers an explicit null in the payload, which .get()'s
    # default would let through and then crash on .strip().
    product_input = payload.get('productInput') or ''
    lang = payload.get('lang', 'de')
    project_id = payload.get('projectId')

    # Check if input is a URL and scrape it. Matching the full scheme keeps
    # plain text that merely begins with "http" from being scraped.
    if product_input.strip().startswith(('http://', 'https://')):
        logging.info(f"Input detected as URL. Starting scrape for: {product_input}")
        analysis_content = scrape_website_details(product_input)
        # NOTE(review): "Fehler:" is presumably the marker scrape_website_details
        # puts into its error text - confirm against helpers.py. An empty
        # result is treated as a failed scrape as well.
        if not analysis_content or "Fehler:" in analysis_content:
            # If scraping fails, use the URL itself with a note for the AI.
            analysis_content = f"Scraping der URL {product_input} ist fehlgeschlagen. Analysiere das Produkt basierend auf der URL und deinem allgemeinen Wissen."
            logging.warning("Scraping failed. Using URL as fallback content for analysis.")
    else:
        analysis_content = product_input
        logging.info("Input is raw text. Analyzing directly.")

    sys_instr = get_system_instruction(lang)
    prompt = f"""
    PHASE 1: PRODUCT ANALYSIS & CONSTRAINTS
    Input: "{analysis_content}"
    Task: 1. Extract technical features. 2. Define hard constraints. 3. Check for internal portfolio conflicts (hypothetical product "Scrubber 5000").
    Output JSON format ONLY: {{"features": [], "constraints": [], "conflictCheck": {{"hasConflict": false, "details": "", "relatedProduct": ""}}, "rawAnalysis": ""}}
    """
    log_and_save(project_id, "phase1", "prompt", prompt)
    response = call_gemini_flash(prompt, system_instruction=sys_instr, json_mode=True)
    log_and_save(project_id, "phase1", "response", response)

    try:
        data = json.loads(response)
    except (json.JSONDecodeError, TypeError):
        # TypeError covers a non-string response (e.g. None), so that case
        # also yields the structured error instead of an unhandled exception.
        logging.error(f"Failed to decode JSON from Gemini response in phase1: {response}")
        # Return a structured error that the frontend can display
        return {
            "error": "Die Antwort des KI-Modells war kein gültiges JSON. Das passiert manchmal bei hoher Auslastung. Bitte versuchen Sie es in Kürze erneut.",
            "details": response
        }

    db_manager.save_gtm_result(project_id, 'phase1_result', json.dumps(data))
    return data
|
|
|
|
|
|
def phase2(payload):
    """Run phase 2: ideal customer profiles (ICPs) and data proxies.

    Uses 'phase1Data' (default {}), 'lang' (default 'de') and 'projectId'
    from payload. Dumps prompt and response with log_and_save, parses the
    response with json.loads, stores it via db_manager.save_gtm_result under
    'phase2_result' and returns it. Parse errors propagate to the caller.
    """
    project_id = payload.get('projectId')
    system_prompt = get_system_instruction(payload.get('lang', 'de'))
    product_context = json.dumps(payload.get('phase1Data', {}))

    prompt = f"""
    PHASE 2: IDEAL CUSTOMER PROFILE (ICP) & DATA PROXIES
    Product Context: {product_context}
    Task: 1. Identify top 3 ICPs (Ideal Customer Profiles/Industries). 2. Define data proxies for identifying these ICPs online.
    Output JSON format ONLY: {{"icps": [{{"name": "", "rationale": ""}}], "dataProxies": [{{"target": "", "method": ""}}]}}
    """
    log_and_save(project_id, "phase2", "prompt", prompt)

    raw_reply = call_gemini_flash(prompt, system_instruction=system_prompt, json_mode=True)
    log_and_save(project_id, "phase2", "response", raw_reply)

    parsed = json.loads(raw_reply)
    db_manager.save_gtm_result(project_id, 'phase2_result', json.dumps(parsed))
    return parsed
|
|
|
|
def phase3(payload):
    """Run phase 3: whale hunting (key accounts and buying-center roles).

    Uses the 'icps' entry of 'phase2Data' (default {}), 'lang' (default
    'de') and 'projectId' from payload. Dumps prompt and response with
    log_and_save, parses the response with json.loads, stores it via
    db_manager.save_gtm_result under 'phase3_result' and returns it. Parse
    errors propagate to the caller.
    """
    project_id = payload.get('projectId')
    icp_source = payload.get('phase2Data', {})
    system_prompt = get_system_instruction(payload.get('lang', 'de'))
    target_icps = json.dumps(icp_source.get('icps'))

    prompt = f"""
    PHASE 3: WHALE HUNTING
    Target ICPs (Industries): {target_icps}
    Task: 1. Group 'Whales' (Key Accounts) strictly by ICP industries. 2. Identify 3-5 concrete top companies in DACH market per industry. 3. Define Buying Center Roles.
    Output JSON format ONLY: {{"whales": [{{"industry": "", "accounts": []}}], "roles": []}}
    """
    log_and_save(project_id, "phase3", "prompt", prompt)

    raw_reply = call_gemini_flash(prompt, system_instruction=system_prompt, json_mode=True)
    log_and_save(project_id, "phase3", "response", raw_reply)

    parsed = json.loads(raw_reply)
    db_manager.save_gtm_result(project_id, 'phase3_result', json.dumps(parsed))
    return parsed
|
|
|
|
def phase4(payload):
    """Run phase 4: strategy and angle development.

    Flattens the 'accounts' of every entry in phase3Data['whales'], collects
    their 'industry' values and combines both with phase1Data['features']
    in the prompt. Dumps prompt and response with log_and_save, parses the
    response with json.loads, stores it via db_manager.save_gtm_result under
    'phase4_result' and returns it. Parse errors propagate to the caller.
    """
    whales = payload.get('phase3Data', {}).get('whales', [])
    product_data = payload.get('phase1Data', {})
    project_id = payload.get('projectId')
    system_prompt = get_system_instruction(payload.get('lang', 'de'))

    # One flat list of accounts across all industries.
    all_accounts = [account for whale in whales for account in whale.get('accounts', [])]
    industries = [whale.get('industry') for whale in whales]

    accounts_json = json.dumps(all_accounts)
    industries_json = json.dumps(industries)
    features_json = json.dumps(product_data.get('features'))

    prompt = f"""
    PHASE 4: STRATEGY & ANGLE DEVELOPMENT
    Accounts: {accounts_json}
    Target Industries: {industries_json}
    Product Features: {features_json}
    Task: 1. Develop specific "Angle" per target/industry. 2. Consistency Check against Product Matrix. 3. **IMPORTANT:** Apply "Hybrid Service Logic" if constraints exist!
    Output JSON format ONLY: {{"strategyMatrix": [{{"segment": "", "painPoint": "", "angle": "", "differentiation": ""}}]}}
    """
    log_and_save(project_id, "phase4", "prompt", prompt)

    raw_reply = call_gemini_flash(prompt, system_instruction=system_prompt, json_mode=True)
    log_and_save(project_id, "phase4", "response", raw_reply)

    parsed = json.loads(raw_reply)
    db_manager.save_gtm_result(project_id, 'phase4_result', json.dumps(parsed))
    return parsed
|
|
|
|
def phase5(payload):
    """Run phase 5: asset generation and the final Markdown report.

    Embeds phase1Data to phase4Data (each default {}) in the prompt and
    calls the model with json_mode=False. Dumps prompt and response with
    log_and_save, stores {"report": <text>} via db_manager.save_gtm_result
    under 'phase5_result' and returns that dict.
    """
    project_id = payload.get('projectId')
    system_prompt = get_system_instruction(payload.get('lang', 'de'))

    technical_json = json.dumps(payload.get('phase1Data', {}))
    icps_json = json.dumps(payload.get('phase2Data', {}))
    targets_json = json.dumps(payload.get('phase3Data', {}))
    strategy_json = json.dumps(payload.get('phase4Data', {}))

    prompt = f"""
    PHASE 5: ASSET GENERATION & FINAL REPORT
    CONTEXT DATA:
    - Technical: {technical_json}
    - ICPs: {icps_json}
    - Targets (Whales): {targets_json}
    - Strategy: {strategy_json}
    TASK:
    1. Create a "GTM STRATEGY REPORT" in Markdown.
    2. Report Structure: Executive Summary, Product Analysis, Target Audience, Target Accounts, Strategy Matrix, Assets.
    3. Hybrid-Check: Ensure "Hybrid Service Logic" is visible.
    Output: Return strictly MARKDOWN formatted text. Start with "# GTM STRATEGY REPORT".
    """
    log_and_save(project_id, "phase5", "prompt", prompt)

    report = call_gemini_flash(prompt, system_instruction=system_prompt, json_mode=False)
    log_and_save(project_id, "phase5", "response", report)

    # The same wrapper dict is persisted and returned.
    result = {"report": report}
    db_manager.save_gtm_result(project_id, 'phase5_result', json.dumps(result))
    return result
|
|
|
|
def phase6(payload):
    """Run phase 6: sales enablement (battlecards) and visual prompts.

    Uses phase1Data['features'], phase3Data['roles'] and
    phase4Data['strategyMatrix'] in the prompt. Dumps prompt and response
    with log_and_save, parses the response with json.loads, stores it via
    db_manager.save_gtm_result under 'phase6_result' and returns it. Parse
    errors propagate to the caller.
    """
    strategy_data = payload.get('phase4Data', {})
    whale_data = payload.get('phase3Data', {})
    product_data = payload.get('phase1Data', {})
    project_id = payload.get('projectId')
    system_prompt = get_system_instruction(payload.get('lang', 'de'))

    features_json = json.dumps(product_data.get('features'))
    personas_json = json.dumps(whale_data.get('roles'))
    strategy_json = json.dumps(strategy_data.get('strategyMatrix'))

    prompt = f"""
    PHASE 6: SALES ENABLEMENT & VISUALS
    CONTEXT: - Product Features: {features_json} - Personas: {personas_json} - Strategy: {strategy_json}
    TASK: 1. Anticipate Friction & Objections. 2. Formulate Battlecards. 3. Create Visual Prompts.
    Output JSON format ONLY: {{"battlecards": [{{"persona": "", "objection": "", "responseScript": ""}}], "visualPrompts": [{{"title": "", "context": "", "prompt": ""}}]}}
    """
    log_and_save(project_id, "phase6", "prompt", prompt)

    raw_reply = call_gemini_flash(prompt, system_instruction=system_prompt, json_mode=True)
    log_and_save(project_id, "phase6", "response", raw_reply)

    parsed = json.loads(raw_reply)
    db_manager.save_gtm_result(project_id, 'phase6_result', json.dumps(parsed))
    return parsed
|
|
|
|
def phase7(payload):
    """Run phase 7: vertical landing page copy.

    Uses phase2Data['icps'] and phase4Data['strategyMatrix'] in the prompt.
    Dumps prompt and response with log_and_save, parses the response with
    json.loads, stores it via db_manager.save_gtm_result under
    'phase7_result' and returns it. Parse errors propagate to the caller.
    """
    strategy_data = payload.get('phase4Data', {})
    icp_data = payload.get('phase2Data', {})
    project_id = payload.get('projectId')
    system_prompt = get_system_instruction(payload.get('lang', 'de'))

    icps_json = json.dumps(icp_data.get('icps'))
    strategy_json = json.dumps(strategy_data.get('strategyMatrix'))

    prompt = f"""
    PHASE 7: VERTICAL LANDING PAGE COPY (Conversion Optimization)
    ICPs: {icps_json}
    Strategy: {strategy_json}
    TASK: 1. Transform generic features into specific benefits for the Top 2 ICPs. 2. Apply "Wackler Symbiosis". 3. Create Landing Page Drafts (Hero Section).
    Output JSON format ONLY: {{"landingPages": [{{"industry": "", "headline": "", "subline": "", "bullets": [], "cta": ""}}]}}
    """
    log_and_save(project_id, "phase7", "prompt", prompt)

    raw_reply = call_gemini_flash(prompt, system_instruction=system_prompt, json_mode=True)
    log_and_save(project_id, "phase7", "response", raw_reply)

    parsed = json.loads(raw_reply)
    db_manager.save_gtm_result(project_id, 'phase7_result', json.dumps(parsed))
    return parsed
|
|
|
|
def phase8(payload):
    """Run phase 8: business case builder.

    Uses phase2Data['icps'] and phase1Data['features'] in the prompt. Dumps
    prompt and response with log_and_save, parses the response with
    json.loads, stores it via db_manager.save_gtm_result under
    'phase8_result' and returns it. Parse errors propagate to the caller.
    """
    icp_data = payload.get('phase2Data', {})
    product_data = payload.get('phase1Data', {})
    project_id = payload.get('projectId')
    system_prompt = get_system_instruction(payload.get('lang', 'de'))

    icps_json = json.dumps(icp_data.get('icps'))
    features_json = json.dumps(product_data.get('features'))

    prompt = f"""
    PHASE 8: BUSINESS CASE BUILDER (The CFO Pitch)
    Input: ICPs: {icps_json}, Features: {features_json}
    TASK: 1. Estimate labor costs/pain points. 2. Compare against Robot Leasing (approx 330-600€/month). 3. Develop ROI logic. 4. Create "Financial Argumentation Guide" for each ICP.
    Output JSON format ONLY: {{"businessCases": [{{"industry": "", "costDriver": "", "efficiencyGain": "", "riskArgument": ""}}]}}
    """
    log_and_save(project_id, "phase8", "prompt", prompt)

    raw_reply = call_gemini_flash(prompt, system_instruction=system_prompt, json_mode=True)
    log_and_save(project_id, "phase8", "response", raw_reply)

    parsed = json.loads(raw_reply)
    db_manager.save_gtm_result(project_id, 'phase8_result', json.dumps(parsed))
    return parsed
|
|
|
|
def phase9(payload):
    """Run phase 9: the feature-to-value translator.

    Uses phase1Data['features'] and the 'painPoint' of every entry in
    phase4Data['strategyMatrix'] in the prompt. Dumps prompt and response
    with log_and_save, parses the response with json.loads, stores it via
    db_manager.save_gtm_result under 'phase9_result' and returns it. Parse
    errors propagate to the caller.
    """
    product_data = payload.get('phase1Data', {})
    strategy_data = payload.get('phase4Data', {})
    project_id = payload.get('projectId')
    system_prompt = get_system_instruction(payload.get('lang', 'de'))

    features_json = json.dumps(product_data.get('features'))
    pain_points = [entry.get('painPoint') for entry in strategy_data.get('strategyMatrix', [])]
    pains_json = json.dumps(pain_points)

    prompt = f"""
    PHASE 9: THE "FEATURE-TO-VALUE" TRANSLATOR
    Input Features: {features_json}
    Strategy Pains: {pains_json}
    TASK: 1. Take a tech feature. 2. Ask "So what?". 3. Ask "So what?" again. 4. Formulate benefit without jargon. Create a table.
    Output JSON format ONLY: {{"techTranslations": [{{"feature": "", "story": "", "headline": ""}}]}}
    """
    log_and_save(project_id, "phase9", "prompt", prompt)

    raw_reply = call_gemini_flash(prompt, system_instruction=system_prompt, json_mode=True)
    log_and_save(project_id, "phase9", "response", raw_reply)

    parsed = json.loads(raw_reply)
    db_manager.save_gtm_result(project_id, 'phase9_result', json.dumps(parsed))
    return parsed
|
|
|
|
def translate(payload):
    """Placeholder for report translation; the payload is currently unused.

    Returns a dict whose 'report' key holds a fixed placeholder string.
    """
    # TODO: to be implemented
    placeholder = {"report": "Translated report will be here."}
    return placeholder
|
|
|
|
def image(payload):
    """Placeholder for image generation; the payload is currently unused.

    Returns a dict whose 'imageBase64' key holds an empty string.
    """
    # TODO: to be implemented
    placeholder = {"imageBase64": ""}
    return placeholder
|
|
|
|
def main():
    """
    Main entry point of the script.

    Parses --mode and --payload_base64 from the command line, decodes the
    Base64-encoded JSON payload and dispatches to the function registered
    for the mode. The result is printed to stdout as JSON. On an undecodable
    payload, an unknown mode or an exception inside the mode function, an
    error object is printed to stdout as JSON and the process exits with
    status 1.
    """
    parser = argparse.ArgumentParser(description="GTM Architect Orchestrator")
    parser.add_argument("--mode", required=True, help="The execution mode (e.g., phase1, phase2).")
    parser.add_argument("--payload_base64", required=True, help="The Base64 encoded JSON payload.")

    args = parser.parse_args()

    try:
        payload_str = base64.b64decode(args.payload_base64).decode('utf-8')
        payload = json.loads(payload_str)
    except ValueError as e:
        # binascii.Error (bad Base64), UnicodeDecodeError (bytes that are not
        # UTF-8) and json.JSONDecodeError are all ValueError subclasses, so
        # every decoding failure ends in the JSON error below rather than a
        # traceback.
        logging.error(f"Failed to decode payload: {e}")
        # Print error as JSON to stdout for the server to catch
        print(json.dumps({"error": "Invalid payload format.", "details": str(e)}))
        sys.exit(1)

    # Function mapping to dynamically call the correct phase
    modes = {
        "phase1": phase1,
        "phase2": phase2,
        "phase3": phase3,
        "phase4": phase4,
        "phase5": phase5,
        "phase6": phase6,
        "phase7": phase7,
        "phase8": phase8,
        "phase9": phase9,
        "translate": translate,
        "image": image,
    }

    mode_function = modes.get(args.mode)

    if not mode_function:
        logging.error(f"Invalid mode specified: {args.mode}")
        print(json.dumps({"error": f"Invalid mode: {args.mode}"}))
        sys.exit(1)

    try:
        logging.info(f"Executing mode: {args.mode}")
        result = mode_function(payload)
        # Ensure the output is always a JSON string
        print(json.dumps(result, ensure_ascii=False))
        logging.info(f"Successfully executed mode: {args.mode}")

    except Exception as e:
        # Top-level boundary: log with traceback, report as JSON, exit non-zero.
        logging.error(f"An error occurred during execution of mode '{args.mode}': {e}", exc_info=True)
        print(json.dumps({"error": f"An error occurred in {args.mode}.", "details": str(e)}))
        sys.exit(1)


if __name__ == "__main__":
    main()
|