chore: [30388f42] Finaler Migration-Readiness-Check

- Wiederherstellung aller Dienste in der  mit korrigierten Pfaden (, ).
- Rückverschiebung aktiver Skripte ( etc.) aus dem Archiv an ihre operativen Orte.
- Hinzufügen von  zur automatischen Prüfung aller Docker-Pfade.
- Systemstatus: Validiert und bereit für Umzug auf neue Infrastruktur.
This commit is contained in:
2026-03-06 13:43:03 +00:00
parent 0da0130325
commit 7f03aaf02e
6 changed files with 2196 additions and 10 deletions

View File

@@ -1,10 +1,7 @@
# WICHTIGER HINWEIS FÜR SPRACHMODELLE UND ENTWICKLER: # =================================================================
# Diese docker-compose.yml Datei ist die zentrale Orchestrierungsdatei für ALLE Docker-Services dieses Projekts. # ZENTRALE ORCHESTRIERUNGSDATEI - ROBO-PLANET GTM STACK
# Es ist strengstens untersagt, Service-Definitionen, Volumes, Netzwerke oder andere Konfigurationen # FINALER STAND FÜR MIGRATION (MÄRZ 2026)
# willkürlich zu löschen, auszukommentieren oder zu modifizieren, es sei denn, dies wurde # =================================================================
# explizit angefordert und die Auswirkungen wurden vollständig verstanden.
# Unbeabsichtigte Löschungen können zu massivem Datenverlust und Fehlfunktionen des Systems führen.
# Prüfe IMMER den gesamten Kontext der Datei und die Projektdokumentation (readme.md), bevor du Änderungen vornimmst.
version: '3.8' version: '3.8'
@@ -15,7 +12,7 @@ services:
container_name: gateway_proxy container_name: gateway_proxy
restart: unless-stopped restart: unless-stopped
ports: ports:
- "8090:80" # Synology Reverse Proxy should point to THIS port (8090) - "8090:80"
volumes: volumes:
- ./nginx-proxy.conf:/etc/nginx/nginx.conf:ro - ./nginx-proxy.conf:/etc/nginx/nginx.conf:ro
- ./.htpasswd:/etc/nginx/.htpasswd:ro - ./.htpasswd:/etc/nginx/.htpasswd:ro
@@ -23,6 +20,13 @@ services:
- company-explorer - company-explorer
- dashboard - dashboard
- connector-superoffice - connector-superoffice
- b2b-assistant
- market-frontend
- gtm-app
- transcription-app
- content-app
- competitor-analysis
- heatmap-frontend
# --- DASHBOARD --- # --- DASHBOARD ---
dashboard: dashboard:
@@ -50,7 +54,7 @@ services:
volumes: volumes:
- ./company-explorer:/app - ./company-explorer:/app
- ./data/companies_v3_fixed_2.db:/app/companies_v3_fixed_2.db - ./data/companies_v3_fixed_2.db:/app/companies_v3_fixed_2.db
- ./Log_from_docker:/app/logs_debug # Ensure logging path is correct - ./Log_from_docker:/app/logs_debug
connector-superoffice: connector-superoffice:
build: build:
@@ -59,7 +63,7 @@ services:
container_name: connector-superoffice container_name: connector-superoffice
restart: unless-stopped restart: unless-stopped
ports: ports:
- "8003:8000" # Expose internal 8000 to host 8003 (8002 was taken) - "8003:8000"
env_file: env_file:
- .env - .env
volumes: volumes:
@@ -71,3 +75,197 @@ services:
API_PASSWORD: "gemini" API_PASSWORD: "gemini"
DB_PATH: "/app/connector_queue.db" DB_PATH: "/app/connector_queue.db"
COMPANY_EXPLORER_URL: "http://company-explorer:8000" COMPANY_EXPLORER_URL: "http://company-explorer:8000"
# Lead engine: image built from ./lead-engine/Dockerfile; publishes host ports 8501 and 8004.
lead-engine:
  build:
    context: ./lead-engine
    dockerfile: Dockerfile
  container_name: lead-engine
  restart: unless-stopped
  ports:
    - "8501:8501"
    - "8004:8004"
  env_file:
    - .env
  environment:
    PYTHONUNBUFFERED: "1"
    COMPANY_EXPLORER_URL: "http://company-explorer:8000"
  volumes:
    - ./lead-engine:/app
    # Single-file mount placed inside the /app bind mount above.
    - ./company-explorer/company_explorer_connector.py:/app/company_explorer_connector.py
# --- APPS (Auxiliary) ---
# Transcription tool: image built from ./transcription-tool/Dockerfile; publishes host port 8001.
transcription-app:
  build:
    context: ./transcription-tool
    dockerfile: Dockerfile
  container_name: transcription-app
  restart: unless-stopped
  ports:
    - "8001:8001"
  env_file:
    - .env
  volumes:
    - ./transcription-tool/backend:/app/backend
    - ./transcription-tool/frontend/dist:/app/frontend/dist
    # SQLite file referenced by DATABASE_URL below.
    - ./data/transcripts.db:/app/transcripts.db
    - ./uploads_audio:/app/uploads_audio
  environment:
    PYTHONUNBUFFERED: "1"
    # Four slashes: "sqlite:///" followed by the absolute path /app/transcripts.db.
    DATABASE_URL: "sqlite:////app/transcripts.db"
# B2B marketing assistant: built from the repository root with dockerfiles/Dockerfile.b2b.
# No host ports are published in this definition.
b2b-assistant:
  build:
    context: .
    dockerfile: dockerfiles/Dockerfile.b2b
  container_name: b2b-assistant
  restart: unless-stopped
  env_file:
    - .env
  volumes:
    - ./b2b-marketing-assistant/b2b_marketing_orchestrator.py:/app/b2b_marketing_orchestrator.py
    - ./market_db_manager.py:/app/market_db_manager.py
    - ./b2b-marketing-assistant/server.cjs:/app/server.cjs
    # SQLite file referenced by DB_PATH below.
    - ./data/b2b_projects.db:/app/b2b_projects.db
    - ./Log_from_docker:/app/Log_from_docker
  environment:
    PYTHONUNBUFFERED: "1"
    DB_PATH: "/app/b2b_projects.db"
# Market intelligence backend: built from the repository root with dockerfiles/Dockerfile.market.
# No host ports are published in this definition.
market-backend:
  build:
    context: .
    dockerfile: dockerfiles/Dockerfile.market
  container_name: market-backend
  restart: unless-stopped
  env_file:
    - .env
  volumes:
    - ./general-market-intelligence/market_intel_orchestrator.py:/app/market_intel_orchestrator.py
    - ./market_db_manager.py:/app/market_db_manager.py
    - ./config.py:/app/config.py
    - ./helpers.py:/app/helpers.py
    - ./general-market-intelligence/server.cjs:/app/general-market-intelligence/server.cjs
    # SQLite file referenced by DB_PATH below.
    - ./data/market_intelligence.db:/app/market_intelligence.db
    # NOTE(review): this service mounts ./Log while the other services mount
    # ./Log_from_docker; confirm the differing host directory is intended.
    - ./Log:/app/Log
  environment:
    PYTHONUNBUFFERED: "1"
    DB_PATH: "/app/market_intelligence.db"
# Market intelligence frontend: image built from ./general-market-intelligence/Dockerfile.
# Started after market-backend; no host ports are published in this definition.
market-frontend:
  build:
    context: ./general-market-intelligence
    dockerfile: Dockerfile
  container_name: market-frontend
  restart: unless-stopped
  depends_on:
    - market-backend
# GTM architect app: built from the repository root with gtm-architect/Dockerfile.
# No host ports are published in this definition.
gtm-app:
  build:
    context: .
    dockerfile: gtm-architect/Dockerfile
  container_name: gtm-app
  restart: unless-stopped
  env_file:
    - .env
  volumes:
    - ./gtm-architect:/app/gtm-architect
    - ./gtm-architect/server.cjs:/app/server.cjs
    - ./gtm-architect/gtm_architect_orchestrator.py:/app/gtm_architect_orchestrator.py
    - ./helpers.py:/app/helpers.py
    - ./config.py:/app/config.py
    - ./gtm-architect/gtm_db_manager.py:/app/gtm_db_manager.py
    # SQLite file referenced by DB_PATH below; content-app mounts the same host file.
    - ./data/gtm_projects.db:/app/gtm_projects.db
    - ./Log_from_docker:/app/Log_from_docker
  environment:
    PYTHONUNBUFFERED: "1"
    DB_PATH: "/app/gtm_projects.db"
# Content engine: built from the repository root with content-engine/Dockerfile.
# No host ports are published in this definition.
content-app:
  build:
    context: .
    dockerfile: content-engine/Dockerfile
  container_name: content-app
  restart: unless-stopped
  env_file:
    - .env
  volumes:
    - ./content-engine:/app/content-engine
    - ./content-engine/server.cjs:/app/server.cjs
    - ./content-engine/content_orchestrator.py:/app/content_orchestrator.py
    - ./content-engine/content_db_manager.py:/app/content_db_manager.py
    # SQLite file referenced by DB_PATH below.
    - ./data/content_engine.db:/app/content_engine.db
    - ./helpers.py:/app/helpers.py
    - ./config.py:/app/config.py
    # Same host file that gtm-app mounts; referenced here via GTM_DB_PATH.
    - ./data/gtm_projects.db:/app/gtm_projects.db
    - ./Log_from_docker:/app/Log_from_docker
  environment:
    PYTHONUNBUFFERED: "1"
    DB_PATH: "/app/content_engine.db"
    GTM_DB_PATH: "/app/gtm_projects.db"
# Competitor analysis app: image built from ./competitor-analysis-app/Dockerfile.
# No host ports are published in this definition.
competitor-analysis:
  build:
    context: ./competitor-analysis-app
    dockerfile: Dockerfile
  container_name: competitor-analysis
  restart: unless-stopped
  env_file:
    - .env
  # Explicit resolvers for this container instead of the Docker default.
  dns:
    - 8.8.8.8
    - 8.8.4.4
  volumes:
    - ./competitor-analysis-app/competitor_analysis_orchestrator.py:/app/competitor_analysis_orchestrator.py
    - ./Log_from_docker:/app/logs_debug
  environment:
    PYTHONUNBUFFERED: "1"
# --- HEATMAP ---
# Heatmap backend: built from ./heatmap-tool/backend (short build syntax, default Dockerfile name).
heatmap-backend:
  build: ./heatmap-tool/backend
  container_name: heatmap-backend
  restart: unless-stopped
  env_file:
    - .env
  volumes:
    # The build directory itself is bind-mounted over /app.
    - ./heatmap-tool/backend:/app
# Heatmap frontend: built from ./heatmap-tool/frontend; started after heatmap-backend.
heatmap-frontend:
  build: ./heatmap-tool/frontend
  container_name: heatmap-frontend
  restart: unless-stopped
  env_file:
    - .env
  volumes:
    # The build directory itself is bind-mounted over /app.
    - ./heatmap-tool/frontend:/app
  depends_on:
    - heatmap-backend
# --- INFRASTRUCTURE ---
# DuckDNS container from the linuxserver.io registry, tracking the floating "latest" tag.
duckdns:
  image: lscr.io/linuxserver/duckdns:latest
  container_name: duckdns
  restart: unless-stopped
  env_file:
    - .env
  environment:
    PUID: "1000"
    PGID: "1000"
    TZ: "Europe/Berlin"
    # Same subdomain list as the dns-monitor service; keep both in sync.
    SUBDOMAINS: "floke,floke-ai,floke-gitea,floke-ha,floke-n8n"
# DNS monitor: plain alpine image (no tag pinned) that runs the script mounted from ./dns-monitor.
dns-monitor:
  image: alpine
  container_name: dns-monitor
  restart: unless-stopped
  # Explicit resolvers for this container instead of the Docker default.
  dns:
    - 8.8.8.8
    - 1.1.1.1
  environment:
    # Same subdomain list as the duckdns service; keep both in sync.
    SUBDOMAINS: "floke,floke-ai,floke-gitea,floke-ha,floke-n8n"
    TZ: "Europe/Berlin"
  volumes:
    - ./dns-monitor:/app
  # NOTE(review): monitor.sh comes from the bind mount; presumably it must be executable on the host.
  command: "/app/monitor.sh"

View File

@@ -0,0 +1,676 @@
import argparse
import json
import os
import sys # Import sys for stderr
import requests
from bs4 import BeautifulSoup
import logging
from datetime import datetime
import re # Für Regex-Operationen
# --- AUTARKES LOGGING SETUP --- #
def create_self_contained_log_filename(mode):
    """
    Build the path of the orchestrator's log file for the current day.

    The file lives in the fixed directory ``/app/Log``, which is created when
    it does not exist yet. Only the date (not the time) goes into the file
    name, so every run on the same day resolves to the same file.

    Args:
        mode: Accepted for call compatibility; not used when building the name.

    Returns:
        str: ``/app/Log/<YYYY-MM-DD>_market_intel.log``.
    """
    target_dir = "/app/Log"  # fixed directory inside the container
    if not os.path.exists(target_dir):
        os.makedirs(target_dir, exist_ok=True)

    # Date only, so all runs of one day share a single log file.
    today = datetime.now().strftime("%Y-%m-%d")
    return os.path.join(target_dir, f"{today}_market_intel.log")
# Resolve today's log file once at import time.
log_filename = create_self_contained_log_filename("market_intel_orchestrator")

# Log records go both to the daily file (appended, UTF-8) and to stderr.
_file_handler = logging.FileHandler(log_filename, mode='a', encoding='utf-8')
_stderr_handler = logging.StreamHandler(sys.stderr)

logging.basicConfig(
    handlers=[_file_handler, _stderr_handler],
    level=logging.DEBUG,
    format='[%(asctime)s] %(levelname)s [%(funcName)s]: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)

logger = logging.getLogger(__name__)
# --- END AUTARKES LOGGING SETUP --- #
def load_gemini_api_key(file_path="gemini_api_key.txt"):
    """
    Read the Gemini API key from a text file.

    Args:
        file_path: Path of the key file.

    Returns:
        str: File content with surrounding whitespace removed.

    Raises:
        Exception: Any error from opening or reading the file is logged at
            CRITICAL level and re-raised unchanged.
    """
    try:
        with open(file_path, "r") as key_file:
            return key_file.read().strip()
    except Exception as e:
        logger.critical(f"Fehler beim Laden des Gemini API Keys: {e}")
        raise
def load_serp_api_key(file_path="serpapikey.txt"):
    """
    Load the SerpAPI key from a file, falling back to the environment.

    Args:
        file_path: Path of the key file.

    Returns:
        str | None: The stripped file content when the file exists; otherwise
        the value of the ``SERP_API_KEY`` environment variable (``None`` when
        unset). Any error while reading is logged as a warning and yields ``None``.
    """
    try:
        if not os.path.exists(file_path):
            # Fallback: try the environment variable.
            return os.environ.get("SERP_API_KEY")
        with open(file_path, "r") as key_file:
            raw_key = key_file.read()
        return raw_key.strip()
    except Exception as e:
        logger.warning(f"Konnte SerpAPI Key nicht laden: {e}")
        return None
def get_website_text(url):
    """
    Download a page and return its visible text.

    Args:
        url: Page address; ``https://`` is prepended when it does not start
            with ``http``.

    Returns:
        str | None: At most 15000 characters of text with script, style, nav,
        footer and header elements removed, or ``None`` when the request or
        the parsing fails (the failure is logged).
    """
    # Auto-fix a missing scheme.
    if url and not url.startswith('http'):
        url = 'https://' + url
    logger.info(f"Scraping URL: {url}")

    # Browser-like request headers to reduce the chance of being blocked.
    browser_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.9,de;q=0.8',
        'Referer': 'https://www.google.com/'
    }

    try:
        page = requests.get(url, headers=browser_headers, timeout=15)
        page.raise_for_status()

        document = BeautifulSoup(page.text, 'lxml')
        for element in document(['script', 'style', 'nav', 'footer', 'header']):
            element.decompose()

        visible_text = document.get_text(separator=' ', strip=True)
        # NOTE(review): this keeps printable ASCII plus \n, \r, \t only, so
        # umlauts and every other non-ASCII character are removed as well.
        ascii_text = re.sub(r'[^\x20-\x7E\n\r\t]', '', visible_text)
        return ascii_text[:15000]
    except Exception as e:
        logger.error(f"Scraping failed for {url}: {e}")
        return None
def serp_search(query, num_results=3):
    """
    Run a Google search through SerpAPI.

    Args:
        query: Search string.
        num_results: Value sent as the ``num`` request parameter.

    Returns:
        list[dict]: One dict per organic result with the keys ``title``,
        ``link`` and ``snippet`` (each may be ``None``). An empty list is
        returned when no API key is available, when the response has no
        ``organic_results`` entry, or when the request fails.
    """
    api_key = load_serp_api_key()
    if not api_key:
        logger.warning("SerpAPI Key fehlt. Suche übersprungen.")
        return []

    logger.info(f"SerpAPI Suche: {query}")
    try:
        # Fixed to German language/region via hl and gl.
        request_params = {
            "engine": "google",
            "q": query,
            "api_key": api_key,
            "num": num_results,
            "hl": "de",
            "gl": "de"
        }
        reply = requests.get("https://serpapi.com/search", params=request_params, timeout=20)
        reply.raise_for_status()
        data = reply.json()

        if "organic_results" not in data:
            return []
        return [
            {
                "title": hit.get("title"),
                "link": hit.get("link"),
                "snippet": hit.get("snippet")
            }
            for hit in data["organic_results"]
        ]
    except Exception as e:
        logger.error(f"SerpAPI Fehler: {e}")
        return []
def _extract_target_industries_from_context(context_content):
md = context_content
# Versuche verschiedene Muster für die Tabelle, falls das Format variiert
step2_match = re.search(r'##\s*Schritt\s*2:[\s\S]*?(?=\n##\s*Schritt\s*\d:|\s*$)', md, re.IGNORECASE)
if not step2_match:
# Fallback: Suche nach "Zielbranche" irgendwo im Text
match = re.search(r'Zielbranche\s*\|?\s*([^|\n]+)', md, re.IGNORECASE)
if match:
return [s.strip() for s in match.group(1).split(',')]
return []
table_lines = []
in_table = False
for line in step2_match.group(0).split('\n'):
if line.strip().startswith('|'):
in_table = True
table_lines.append(line.strip())
elif in_table:
break
if len(table_lines) < 3: return []
header = [s.strip() for s in table_lines[0].split('|') if s.strip()]
industry_col = next((h for h in header if re.search(r'zielbranche|segment|branche|industrie', h, re.IGNORECASE)), None)
if not industry_col: return []
col_idx = header.index(industry_col)
industries = []
for line in table_lines[2:]:
cells = [s.strip() for s in line.split('|') if s.strip()]
if len(cells) > col_idx: industries.append(cells[col_idx])
return list(set(industries))
def _extract_json_from_text(text):
"""
Versucht, ein JSON-Objekt aus einem Textstring zu extrahieren,
unabhängig von Markdown-Formatierung (```json ... ```).
"""
try:
# 1. Versuch: Direktersatz von Markdown-Tags (falls vorhanden)
clean_text = text.replace("```json", "").replace("```", "").strip()
return json.loads(clean_text)
except json.JSONDecodeError:
pass
try:
# 2. Versuch: Regex Suche nach dem ersten { und letzten }
json_match = re.search(r"(\{[\s\S]*\})", text)
if json_match:
return json.loads(json_match.group(1))
except json.JSONDecodeError:
pass
logger.error(f"JSON Parsing fehlgeschlagen. Roher Text: {text[:500]}...")
return None
def generate_search_strategy(reference_url, context_content, language='de'):
    """
    Ask Gemini for a lookalike search strategy based on a reference client.

    The reference client's homepage is scraped and sent to the model together
    with the strategic context. When scraping fails, a placeholder text is
    sent instead so the model works from the context and the URL alone.

    Args:
        reference_url: Homepage of the reference client.
        context_content: Markdown text describing the own company / offer.
        language: ``'de'`` requests German output; any other value English.

    Returns:
        dict: The parsed model answer. On any request or parsing failure a
        fallback dict with the same top-level keys, error texts as values and
        an empty ``signals`` list is returned.

    Raises:
        Exception: Errors from ``load_gemini_api_key`` propagate, because the
            key is loaded before the guarded request.
    """
    logger.info(f"Generating strategy for {reference_url} (Language: {language})")
    api_key = load_gemini_api_key()
    homepage_text = get_website_text(reference_url)
    if not homepage_text:
        logger.warning(f"Strategy Generation: Could not scrape {reference_url}. Relying on context.")
        homepage_text = "[WEBSITE ACCESS DENIED] - The strategy must be developed based on the provided STRATEGIC CONTEXT and the URL name alone."
    # The key travels in a request header, not in the URL: requests puts the
    # full URL into HTTPError messages, which are written to the log below.
    GEMINI_API_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-pro:generateContent"
    gemini_headers = {'Content-Type': 'application/json', 'x-goog-api-key': api_key}
    lang_instruction = "GERMAN (Deutsch)" if language == 'de' else "ENGLISH"
    prompt = f"""
You are a B2B Market Intelligence Architect.
--- ROLE DEFINITION ---
You are working for the company described in the "STRATEGIC CONTEXT" below (The "Hunter").
Your goal is to find new potential customers who look exactly like the "REFERENCE CLIENT" described below (The "Seed" / "Prey").
--- STRATEGIC CONTEXT (YOUR COMPANY / THE OFFER) ---
{context_content}
--- REFERENCE CLIENT HOMEPAGE (THE IDEAL CUSTOMER TO CLONE) ---
URL: {reference_url}
CONTENT: {homepage_text[:10000]}
--- TASK ---
Develop a search strategy to find **Lookalikes of the Reference Client** who would be interested in **Your Company's Offer**.
1. **summaryOfOffer**: A 1-sentence summary of what the **REFERENCE CLIENT** does (NOT what your company does). We need this to search for similar companies.
2. **idealCustomerProfile**: A concise definition of the Ideal Customer Profile (ICP) based on the Reference Client's characteristics.
3. **searchStrategyICP**: A detailed description of the Ideal Customer Profile (ICP) based on the analysis.
4. **digitalSignals**: Identification and description of relevant digital signals that indicate purchase interest or engagement for YOUR offer.
5. **targetPages**: A list of the most important target pages on the company website relevant for marketing and sales activities.
6. **signals**: Identify exactly 4 specific digital signals to check on potential lookalikes.
- **CRITICAL**: One signal MUST be "Technographic / Incumbent Search". It must look for existing competitor software or legacy systems that **YOUR COMPANY'S OFFER** replaces or complements.
- The other 3 signals should focus on business pains or strategic fit.
--- SIGNAL DEFINITION ---
For EACH signal, you MUST provide:
- `id`: A unique ID (e.g., "sig_1").
- `name`: A short, descriptive name.
- `description`: What does this signal indicate?
- `targetPageKeywords`: A list of 3-5 keywords to look for on a company's website (e.g., ["career", "jobs"] for a hiring signal).
- `proofStrategy`: An object containing:
- `likelySource`: Where on the website or web is this info found? (e.g., "Careers Page").
- `searchQueryTemplate`: A Google search query to find this. Use `{{COMPANY}}` as a placeholder for the company name.
Example: `site:{{COMPANY}} "software engineer" OR "developer"`
--- LANGUAGE INSTRUCTION ---
IMPORTANT: The entire JSON content (descriptions, rationale, summaries) MUST be in {lang_instruction}. Translate if necessary.
--- OUTPUT FORMAT ---
Return ONLY a valid JSON object.
{{
"summaryOfOffer": "The Reference Client provides...",
"idealCustomerProfile": "...",
"searchStrategyICP": "...",
"digitalSignals": "...",
"targetPages": "...",
"signals": [ ... ]
}}
"""
    payload = {"contents": [{"parts": [{"text": prompt}]}]}
    logger.info("Sende Anfrage an Gemini API...")
    try:
        # Without a timeout requests waits indefinitely on a stalled connection.
        response = requests.post(GEMINI_API_URL, json=payload, headers=gemini_headers, timeout=180)
        response.raise_for_status()
        res_json = response.json()
        logger.info(f"Gemini API-Antwort erhalten (Status: {response.status_code}).")
        text = res_json['candidates'][0]['content']['parts'][0]['text']
        # Raw model output is diagnostic detail, so it is logged at DEBUG level.
        logger.debug(f"RAW GEMINI JSON RESPONSE: {text}")
        result = _extract_json_from_text(text)
        if not result:
            raise ValueError("Konnte kein valides JSON extrahieren")
        return result
    except Exception as e:
        logger.error(f"Strategy generation failed: {e}")
        # Return a fallback with the expected keys to avoid a frontend crash.
        return {
            "summaryOfOffer": "Error generating strategy. Please check logs.",
            "idealCustomerProfile": "Error generating ICP. Please check logs.",
            "searchStrategyICP": "Error generating Search Strategy ICP. Please check logs.",
            "digitalSignals": "Error generating Digital Signals. Please check logs.",
            "targetPages": "Error generating Target Pages. Please check logs.",
            "signals": []
        }
def identify_competitors(reference_url, target_market, industries, summary_of_offer=None, language='de'):
    """
    Ask Gemini for companies similar to the reference client.

    Args:
        reference_url: Homepage of the reference client.
        target_market: Market description inserted into the prompt.
        industries: Iterable of industry names; ``None`` is treated as empty.
        summary_of_offer: Optional one-sentence description of the reference
            client's business, inserted into the prompt as given.
        language: ``'de'`` requests German output; any other value English.

    Returns:
        dict: The parsed model answer. On any request or parsing failure a
        dict with empty ``localCompetitors``, ``nationalCompetitors`` and
        ``internationalCompetitors`` lists is returned.

    Raises:
        Exception: Errors from ``load_gemini_api_key`` propagate, because the
            key is loaded before the guarded request.
    """
    logger.info(f"Identifying competitors for {reference_url} (Language: {language})")
    api_key = load_gemini_api_key()
    # The key travels in a request header, not in the URL: requests puts the
    # full URL into HTTPError messages, which are written to the log below.
    GEMINI_API_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-pro:generateContent"
    gemini_headers = {'Content-Type': 'application/json', 'x-goog-api-key': api_key}
    lang_instruction = "GERMAN (Deutsch)" if language == 'de' else "ENGLISH"
    prompt = f"""
You are a B2B Market Analyst. Find 3-5 direct competitors or highly similar companies (lookalikes) for the company at `{reference_url}`.
--- CONTEXT ---
- Reference Client Business (What they do): {summary_of_offer}
- Target Market: {target_market}
- Relevant Industries: {', '.join(industries or [])}
--- TASK ---
Identify companies that are **similar to the Reference Client** (i.e., Lookalikes).
We are looking for other companies that do the same thing as `{reference_url}`.
Categorize them into three groups:
1. 'localCompetitors': Competitors in the same immediate region/city.
2. 'nationalCompetitors': Competitors operating across the same country.
3. 'internationalCompetitors': Global players.
For EACH competitor, you MUST provide:
- `id`: A unique, URL-friendly identifier (e.g., "competitor-name-gmbh").
- `name`: The official, full name of the company.
- `description`: A concise explanation of why they are a competitor.
--- LANGUAGE INSTRUCTION ---
IMPORTANT: The entire JSON content (descriptions) MUST be in {lang_instruction}.
--- OUTPUT FORMAT ---
Return ONLY a valid JSON object with the following structure:
{{
"localCompetitors": [ {{ "id": "...", "name": "...", "description": "..." }} ],
"nationalCompetitors": [ ... ],
"internationalCompetitors": [ ... ]
}}
"""
    payload = {"contents": [{"parts": [{"text": prompt}]}]}
    logger.info("Sende Anfrage an Gemini API...")
    try:
        # Without a timeout requests waits indefinitely on a stalled connection.
        response = requests.post(GEMINI_API_URL, json=payload, headers=gemini_headers, timeout=180)
        response.raise_for_status()
        res_json = response.json()
        logger.info(f"Gemini API-Antwort erhalten (Status: {response.status_code}).")
        text = res_json['candidates'][0]['content']['parts'][0]['text']
        result = _extract_json_from_text(text)
        if not result:
            raise ValueError("Konnte kein valides JSON extrahieren")
        return result
    except Exception as e:
        logger.error(f"Competitor identification failed: {e}")
        return {"localCompetitors": [], "nationalCompetitors": [], "internationalCompetitors": []}
def analyze_company(company_name, strategy, target_market, language='de'):
    """
    Run the audit for one company and return the model's assessment.

    Steps:
      1. Find the company website (SerpAPI first, Gemini as fallback).
      2. Scrape the homepage.
      3. Collect tech-stack evidence from up to four search queries.
      4. Collect firmographic snippets and per-signal search evidence.
      5. Send all evidence to Gemini and parse the JSON answer.

    Args:
        company_name: Name of the company to audit.
        strategy: Strategy dict; its ``signals`` list drives the searches.
        target_market: Market description used in the URL fallback prompt.
        language: ``'de'`` requests German output; any other value English.

    Returns:
        dict: The parsed audit with ``dataSource`` added. When no website is
        found, ``{"error": ...}``. When the final request or parsing fails, a
        fallback dict with ``status`` "Unklar" and ``dataSource`` "Error".

    Raises:
        Exception: Errors from ``load_gemini_api_key`` propagate, because the
            key is loaded before any guarded request.
    """
    logger.info(f"--- STARTING DEEP TECH AUDIT FOR: {company_name} (Language: {language}) ---")
    api_key = load_gemini_api_key()
    # The key travels in a request header, not in the URL: requests puts the
    # full URL into HTTPError messages, which are written to the log below.
    GEMINI_API_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-pro:generateContent"
    gemini_headers = {'Content-Type': 'application/json', 'x-goog-api-key': api_key}
    lang_instruction = "GERMAN (Deutsch)" if language == 'de' else "ENGLISH"

    def _is_technographic(signal):
        # The strategy prompt suggests ids like "sig_1" and puts the
        # "Technographic / Incumbent" wording into the name, so both fields
        # are inspected.
        label = f"{signal.get('id') or ''} {signal.get('name') or ''}".lower()
        return "technographic" in label or "incumbent" in label

    # 1. Website finding (SerpAPI, then Gemini as fallback)
    url = None
    website_search_results = serp_search(f"{company_name} offizielle Website")
    if website_search_results:
        url = website_search_results[0].get("link")
        logger.info(f"Website via SerpAPI gefunden: {url}")
    if not url:
        # Fallback: ask Gemini (low confidence)
        logger.info("Keine URL via SerpAPI, frage Gemini...")
        prompt_url = f"What is the official homepage URL for the company '{company_name}' in the market '{target_market}'? Respond with ONLY the single, complete URL and nothing else."
        payload_url = {"contents": [{"parts": [{"text": prompt_url}]}]}
        logger.info("Sende Anfrage an Gemini API (URL Fallback)...")
        try:
            res = requests.post(GEMINI_API_URL, json=payload_url, headers=gemini_headers, timeout=15)
            res.raise_for_status()
            res_json = res.json()
            candidate = res_json.get('candidates', [{}])[0]
            content = candidate.get('content', {}).get('parts', [{}])[0]
            text_response = content.get('text', '').strip()
            url_match = re.search(r'(https?://[^\s"]+)', text_response)
            if url_match:
                url = url_match.group(1)
        except Exception as e:
            logger.error(f"Gemini URL Fallback failed: {e}")
    if not url or not url.startswith("http"):
        return {"error": f"Could not find website for {company_name}"}

    # 2. Homepage scrape. From here on url is a non-empty http(s) string.
    scraping_note = ""
    homepage_text = get_website_text(url)
    if not homepage_text:
        homepage_text = "[WEBSITE ACCESS DENIED]"
        scraping_note = "(Website Content Unavailable)"

    # 3. Tech-stack evidence: generic queries plus keywords from technographic
    # signals of the strategy (no hard-coded vendor list).
    tech_evidence = []
    tech_queries = [
        f'site:{url.split("//")[-1].split("/")[0] if url and "//" in url else company_name} "software" OR "technology" OR "system"',
        f'"{company_name}" "technology stack"',
        f'"{company_name}" "partners"'
    ]
    signals = strategy.get('signals', [])
    for signal in signals:
        if _is_technographic(signal):
            for kw in signal.get('targetPageKeywords', []):
                tech_queries.append(f'"{company_name}" "{kw}"')
    # Deduplicate in insertion order so the limit of four always keeps the
    # same queries (a set would make the selection arbitrary).
    tech_queries = list(dict.fromkeys(tech_queries))[:4]
    for q in tech_queries:
        results = serp_search(q, num_results=3)
        if results:
            for r in results:
                tech_evidence.append(f"- Found: {r['title']}\n Snippet: {r['snippet']}\n Link: {r['link']}")
    tech_evidence_text = "\n".join(tech_evidence)

    # 4. Firmographics and targeted signal searches
    signal_evidence = []
    firmographics_results = serp_search(f"{company_name} Umsatz Mitarbeiterzahl 2023")
    firmographics_context = "\n".join([f"- {r['snippet']} ({r['link']})" for r in firmographics_results])
    # Host part of the URL without "www."; the same for every signal.
    domain = url.split("//")[-1].split("/")[0].replace("www.", "")
    for signal in signals:
        # Technographic signals are covered by the tech-stack search above.
        if _is_technographic(signal):
            continue
        proof_strategy = signal.get('proofStrategy', {})
        query_template = proof_strategy.get('searchQueryTemplate')
        search_context = ""
        if query_template:
            query = query_template.replace("{{COMPANY}}", company_name).replace("{COMPANY}", company_name).replace("{{domain}}", domain).replace("{domain}", domain)
            results = serp_search(query, num_results=3)
            if results:
                search_context = "\n".join([f" * Snippet: {r['snippet']}\n Source: {r['link']}" for r in results])
        if search_context:
            signal_evidence.append(f"SIGNAL '{signal.get('name', 'unknown')}':\n{search_context}")
    evidence_text = "\n\n".join(signal_evidence)

    # 5. Final audit prompt
    prompt = f"""
You are a Strategic B2B Sales Consultant.
Analyze the company '{company_name}' ({url}) to create a "best-of-breed" sales pitch strategy.
--- STRATEGY (What we are looking for) ---
{json.dumps(signals, indent=2)}
--- EVIDENCE 1: EXTERNAL TECH-STACK INTELLIGENCE ---
Analyze the search results below. Do NOT hallucinate technologies. Only list what is explicitly found.
{tech_evidence_text}
--- EVIDENCE 2: HOMEPAGE CONTENT {scraping_note} ---
{homepage_text[:8000]}
--- EVIDENCE 3: FIRMOGRAPHICS SEARCH ---
{firmographics_context}
--- EVIDENCE 4: TARGETED SIGNAL SEARCH RESULTS ---
{evidence_text}
----------------------------------
TASK:
1. **Firmographics**: Estimate Revenue and Employees.
2. **Technographic Audit**: Check if any relevant competitor technology or legacy system is ACTUALLY found in the evidence.
- **CRITICAL:** If no specific competitor software is found, assume the status is "Greenfield" (Manual Process / Status Quo). Do NOT invent a competitor like SAP Ariba just because it's a common tool.
3. **Status**:
- Set to "Nutzt Wettbewerber" ONLY if a direct competitor is explicitly found.
- Set to "Greenfield" if no competitor tech is found.
- Set to "Bestandskunde" if they already use our solution.
4. **Evaluate Signals**: For each signal, provide a "value" (Yes/No/Partial) and "proof".
5. **Recommendation (Pitch Strategy)**:
- If Greenfield: Pitch against the manual status quo (efficiency, error reduction).
- If Competitor: Pitch replacement/upgrade.
- **Tone**: Strategic, insider-knowledge, specific.
--- LANGUAGE INSTRUCTION ---
IMPORTANT: The entire JSON content (especially 'recommendation', 'proof', 'value') MUST be in {lang_instruction}.
STRICTLY output only JSON:
{{
"companyName": "{company_name}",
"status": "...",
"revenue": "...",
"employees": "...",
"tier": "Tier 1/2/3",
"dynamicAnalysis": {{
"sig_id_from_strategy": {{ "value": "...", "proof": "..." }}
}},
"recommendation": "..."
}}
"""
    payload = {
        "contents": [{"parts": [{"text": prompt}]}],
        "generationConfig": {"response_mime_type": "application/json"}
    }
    try:
        logger.info("Sende Audit-Anfrage an Gemini API...")
        # Without a timeout requests waits indefinitely on a stalled connection.
        response = requests.post(GEMINI_API_URL, json=payload, headers=gemini_headers, timeout=180)
        response.raise_for_status()
        response_data = response.json()
        logger.info(f"Gemini API-Antwort erhalten (Status: {response.status_code}).")
        text = response_data['candidates'][0]['content']['parts'][0]['text']
        result = _extract_json_from_text(text)
        if not result:
            raise ValueError("Konnte kein valides JSON extrahieren")
        result['dataSource'] = "Digital Trace Audit (Deep Dive)"
        return result
    except Exception as e:
        logger.error(f"Audit failed for {company_name}: {e}")
        return {
            "companyName": company_name,
            "status": "Unklar",
            "revenue": "Error",
            "employees": "Error",
            "tier": "Tier 3",
            "dynamicAnalysis": {},
            "recommendation": f"Audit failed: {str(e)}",
            "dataSource": "Error"
        }
def generate_outreach_campaign(company_data_json, knowledge_base_content, reference_url, specific_role=None, language='de'):
    """
    Create personalized e-mail campaigns for one audited company.

    Two modes:
      * ``specific_role`` given: one 3-step campaign for exactly that role.
      * ``specific_role`` omitted: one campaign for the best-fitting role plus
        a list of further roles that can be generated later.

    Args:
        company_data_json: Audit result dict of the target company; its
            ``companyName`` is used in the prompt and in log messages.
        knowledge_base_content: Text describing the sender's identity/strategy.
        reference_url: URL of the reference client used as social proof.
        specific_role: Optional role name selecting the single-role mode.
        language: ``'de'`` requests German output; any other value English.

    Returns:
        dict: The parsed model answer, or ``{"error": <message>}`` when the
        request or the parsing fails.

    Raises:
        Exception: Errors from ``load_gemini_api_key`` propagate, because the
            key is loaded before the guarded request.
    """
    company_name = company_data_json.get('companyName', 'Unknown')
    logger.info(f"--- STARTING OUTREACH GENERATION FOR: {company_name} (Role: {specific_role if specific_role else 'Top 5'}) [Lang: {language}] ---")
    api_key = load_gemini_api_key()
    # The key travels in a request header, not in the URL: requests puts the
    # full URL into HTTPError messages, which are written to the log below.
    GEMINI_API_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-pro:generateContent"
    gemini_headers = {'Content-Type': 'application/json', 'x-goog-api-key': api_key}
    lang_instruction = "GERMAN (Deutsch)" if language == 'de' else "ENGLISH"
    if specific_role:
        # --- MODE B: SINGLE ROLE GENERATION (On Demand) ---
        task_description = f"""
--- TASK ---
1. **Focus**: Create a highly specific 3-step email campaign ONLY for the role: '{specific_role}'.
2. **Analyze**: Use the Audit Facts to find specific hooks for this role.
3. **Draft**: Write the sequence (Opening, Follow-up, Break-up).
"""
        output_format = """
--- OUTPUT FORMAT (Strictly JSON) ---
{
"target_role": "The requested role",
"rationale": "Why this fits...",
"emails": [ ... ]
}
"""
    else:
        # --- MODE A: INITIAL START (TOP 1 + SUGGESTIONS) ---
        task_description = """
--- TASK ---
1. **Analyze**: Match the Target Company (Input 2) to the most relevant 'Zielbranche/Segment' from the Knowledge Base (Input 1).
2. **Identify Roles**: Identify ALL relevant 'Rollen' (Personas) from the Knowledge Base that fit this company.
3. **Select Best**: Choose the SINGLE most promising role for immediate outreach based on the Audit findings.
4. **Draft Campaign**: Write a 3-step email sequence for this ONE role.
5. **List Others**: List ALL other relevant roles (including the other top candidates) in 'available_roles' so the user can generate them later.
"""
        output_format = """
--- OUTPUT FORMAT (Strictly JSON) ---
{
"campaigns": [
{
"target_role": "Role Name",
"rationale": "Why selected...",
"emails": [ ... ]
}
],
"available_roles": [ "Role 2", "Role 3", "Role 4", "Role 5", ... ]
}
"""
    prompt = f"""
You are a Strategic Key Account Manager and deeply technical Industry Insider.
Your goal is to write highly personalized, **operationally specific** outreach emails to the company '{company_name}'.
--- INPUT 1: YOUR IDENTITY & STRATEGY (The Sender) ---
{knowledge_base_content}
--- INPUT 2: THE TARGET COMPANY (Audit Facts) ---
{json.dumps(company_data_json, indent=2)}
--- INPUT 3: THE REFERENCE CLIENT (Social Proof) ---
Reference Client URL: {reference_url}
CRITICAL: This 'Reference Client' is an existing happy customer of ours. You MUST mention them by name to establish trust.
{task_description}
--- TONE & STYLE GUIDELINES (CRITICAL) ---
1. **Professional & Flowing:** Aim for approx. 500-600 characters per email. Use full sentences and professional courtesies. It should feel like a high-quality human message.
2. **Stance:** Act as an **astute industry observer** and peer consultant. You have analyzed their specific situation and identified a strategic bottleneck.
3. **The Opportunity Bridge (Email 1):** Bridge observation to a strategic solution immediately using concrete terms (e.g., "autonome Reinigungsrobotik").
4. **Context-Sensitive Technographics:** Only mention discovered IT or Procurement systems (e.g., SAP Ariba) if it is highly relevant to the **specific role** (e.g., for CEO, CFO, or Head of Procurement). For **purely operational roles** (e.g., Facility Manager, Head of Operations), AVOID mentioning these systems as it may cause confusion; focus entirely on the operational pain (labor shortage) and growth bottlenecks instead.
5. **Soft-Sell vs. Hard-Pitch:** Position technology as a logical answer to the bottleneck. Pitch the **outcome/capability**, not features.
6. **Social Proof as the Engine:** Let the Reference Client ({reference_url}) provide the evidence. Use a role-specific KPI.
7. **Operational Grit:** Use domain-specific terms (e.g., "ASNs", "8D", "TCO") to establish authority.
8. **Language:** {lang_instruction}.
{output_format}
"""
    payload = {
        "contents": [{"parts": [{"text": prompt}]}],
        "generationConfig": {"response_mime_type": "application/json"}
    }
    try:
        logger.info("Sende Campaign-Anfrage an Gemini API...")
        # Without a timeout requests waits indefinitely on a stalled connection.
        response = requests.post(GEMINI_API_URL, json=payload, headers=gemini_headers, timeout=180)
        response.raise_for_status()
        response_data = response.json()
        logger.info(f"Gemini API-Antwort erhalten (Status: {response.status_code}).")
        text = response_data['candidates'][0]['content']['parts'][0]['text']
        result = _extract_json_from_text(text)
        if not result:
            raise ValueError("Konnte kein valides JSON extrahieren")
        return result
    except Exception as e:
        logger.error(f"Campaign generation failed for {company_name}: {e}")
        return {"error": str(e)}
def main():
    """Command-line entry point: run one analysis mode and print its result as JSON.

    The mode is chosen with --mode; the remaining options are passed to the
    selected function. Supported modes are "generate_strategy",
    "identify_competitors", "analyze_company" and "generate_outreach".

    Raises:
        ValueError: if --mode is not one of the supported modes. Without this
            the script would exit silently with no output at all.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode", required=True)
    parser.add_argument("--reference_url")
    parser.add_argument("--context_file")
    parser.add_argument("--target_market")
    parser.add_argument("--company_name")
    parser.add_argument("--strategy_json")
    parser.add_argument("--summary_of_offer")
    parser.add_argument("--company_data_file")
    parser.add_argument("--specific_role")
    parser.add_argument("--language", default="de")  # Output language code, defaults to German
    args = parser.parse_args()
    # Input files are read as UTF-8 explicitly so the result does not depend on
    # the locale of the host or container.
    if args.mode == "generate_strategy":
        with open(args.context_file, "r", encoding="utf-8") as f:
            context = f.read()
        print(json.dumps(generate_search_strategy(args.reference_url, context, args.language)))
    elif args.mode == "identify_competitors":
        industries = []
        # The context file is optional in this mode; without it no industries are passed on.
        if args.context_file:
            with open(args.context_file, "r", encoding="utf-8") as f:
                context = f.read()
            industries = _extract_target_industries_from_context(context)
        print(json.dumps(identify_competitors(args.reference_url, args.target_market, industries, args.summary_of_offer, args.language)))
    elif args.mode == "analyze_company":
        strategy = json.loads(args.strategy_json)
        print(json.dumps(analyze_company(args.company_name, strategy, args.target_market, args.language)))
    elif args.mode == "generate_outreach":
        with open(args.company_data_file, "r", encoding="utf-8") as f:
            company_data = json.load(f)
        with open(args.context_file, "r", encoding="utf-8") as f:
            knowledge_base = f.read()
        print(json.dumps(generate_outreach_campaign(company_data, knowledge_base, args.reference_url, args.specific_role, args.language)))
    else:
        raise ValueError(f"Unknown mode: {args.mode}")
if __name__ == "__main__":
    # Force UTF-8 on stdout before anything is printed.
    sys.stdout.reconfigure(encoding='utf-8')
    try:
        main()
        sys.stdout.flush()
    except Exception as exc:
        logger.critical(f"Unhandled Exception in Main: {exc}", exc_info=True)
        # Still emit a JSON object so the calling server gets parseable output
        # instead of failing on a parse error.
        failure_payload = {"error": f"Critical Script Error: {str(exc)}", "details": "Check market_intel.log"}
        print(json.dumps(failure_payload))
        sys.exit(1)

View File

@@ -0,0 +1,909 @@
import argparse
import base64
import json
import logging
import re
import sys
import os
import requests
from bs4 import BeautifulSoup
from datetime import datetime
from config import Config
import gtm_db_manager as db_manager
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from helpers import call_gemini_flash, scrape_website_details, call_gemini_image
from config import Config, BASE_DIR # Import Config and BASE_DIR
# Directory (relative to the working directory) that receives the run log files.
LOG_DIR = "Log_from_docker"
# exist_ok=True replaces the former exists()-then-makedirs() check, which could
# fail if the directory was created between the check and the call.
os.makedirs(LOG_DIR, exist_ok=True)
ORCHESTRATOR_VERSION = "1.3.0"  # Bump version for image fix & language enforcement
# Timestamp of this process start; used as a prefix for the log file name.
run_timestamp = datetime.now().strftime("%y-%m-%d_%H-%M-%S")
log_file_path = os.path.join(LOG_DIR, f"{run_timestamp}_gtm_orchestrator_run.log")
# Log to the per-run file and to stderr.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_file_path, mode='a', encoding='utf-8'),
        logging.StreamHandler(sys.stderr)
    ]
)
logging.info(f"GTM Architect Orchestrator v{ORCHESTRATOR_VERSION} ({run_timestamp}) starting...")
# !!! CRITICAL FIX: Load API keys at the very beginning !!!
# This ensures Config.API_KEYS is populated before any AI functions are called.
Config.load_api_keys()
def log_and_save(project_id, step_name, data_type, content):
    """Log a pipeline step and write its content to a text file in LOG_DIR.

    dict and list content is written as indented JSON with non-ASCII
    characters kept as-is; everything else is written via str(). Any error
    while writing is logged and not re-raised.
    """
    logging.info(f"Project {project_id} - Step: {step_name} - Type: {data_type}")
    target_path = os.path.join(LOG_DIR, f"{run_timestamp}_{step_name}_{data_type}.txt")
    is_structured = isinstance(content, (dict, list))
    try:
        with open(target_path, 'w', encoding='utf-8') as handle:
            if not is_structured:
                handle.write(str(content))
            else:
                json.dump(content, handle, indent=4, ensure_ascii=False)
        logging.info(f"Saved {data_type} to {target_path}")
    except Exception as err:
        logging.error(f"Failed to save {data_type} to file: {err}")
def get_system_instruction(lang):
    """Return the GTM-expert system prompt for the given language code.

    'de' selects the German prompt; every other value gets the English one.
    """
    german_prompt = """
Du bist ein internationaler Go-to-Market (GTM) Experte für B2B-Technologie-Unternehmen im Bereich Robotik, Facility Management und IoT.
Deine Aufgabe ist es, aus technischen Spezifikationen und Produktbeschreibungen eine umfassende GTM-Strategie zu entwickeln.
Du arbeitest strukturiert, datengetrieben und präzise. Deine Antworten sind immer klar, professionell und direkt auf den Punkt.
Wenn du JSON ausgeben sollst, gib NUR das JSON-Objekt aus, ohne umschließende Text- oder Code-Formatierungen.
Behalte während des gesamten Prozesses eine konsistente Logik bei. Alle Phasen bauen aufeinander auf.
Führe eine interne Plausibilitätsprüfung durch, bevor du eine Antwort gibst.
# CONTEXT: THE WACKLER GROUP ECOSYSTEM
Wir sind Teil der Wackler Group. Wir nutzen das gesamte Dienstleistungsportfolio der Muttergesellschaft, um Hardware-Schwächen in Service-Stärken zu verwandeln.
Das Ziel ist immer eine "Symbiose aus Mensch & Maschine".
# REGEL 5: THE "DYNAMIC SERVICE" LOGIC (UNIVERSAL)
Analysiere zuerst die **Kategorie** des Roboters und wende dann die passende Hybrid-Logik an:
1. CLEANING INDOOR (CARPET) - Sauger für Teppiche
* Robot: Macht die Fläche (80%).
* Human (Wackler Cleaning): Macht Kanten, Ecken, Fleckenentfernung (20%).
2. CLEANING INDOOR (WET SURFACE) - Scheuersauger (Hartboden)
* Robot: Reinigt Flure/Hallen kontinuierlich.
* Human (Wackler Cleaning): Sicherheits-Check (Rutschgefahr), Wasserwechsel, Hygiene-Audit.
3. CLEANING OUTDOOR (SWEEPER) - Kehrmaschine (Asphalt)
* Robot: Nimmt Feinstaub und Zigaretten auf.
* Human (Wackler Cleaning): Leert Mülleimer, entfernt Sperrmüll, pflegt Grünanlagen.
4. POS ROBOTER - Retail/Airport Assistenz
* Robot: Information, Wegweiser, Blickfang.
* Human (Wackler Service): Beratung, Verkauf, emotionale Kundenbindung.
5. SECURITY ROBOTER - Mobile Überwachung (Quadruped/Drohne)
* Robot: "Detektion & Präsenz". 24/7 Patrouille, Wärmebild, keine Müdigkeit.
* Human (Wackler Security): "Bewertung & Intervention". NSL bewertet Alarm, Interventionskraft fährt raus.
* Pitch: "Der Roboter sieht die Gefahr, Wackler beseitigt sie."
6. SERVICE ROBOTER - Transport (Gastro/Klinik)
* Robot: Schweres Tragen (Tabletts, Wäsche) von A nach B.
* Human (Wackler Service): Patientenkontakt, Tisch-Service, Hygiene.
7. TRANSPORT ROBOTER - Intralogistik (Lager)
* Robot: Paletten-Transport, Milkrun.
* Human (Wackler Logistics): Prozesssteuerung, Ausnahmebehandlung, Umpacken.
Wende diese spezifische Logik zwingend in PHASE 4 (Strategy) und PHASE 6 (Sales Enablement) an.
WICHTIG: Antworte IMMER in der vom User geforderten Sprache (Deutsch), auch wenn der Input Englisch ist.
"""
    english_prompt = """
You are an international Go-to-Market (GTM) expert for B2B technology companies in robotics, facility management, and IoT.
Your task is to develop a comprehensive GTM strategy from technical specifications and product descriptions.
You are structured, data-driven, and precise. Your answers are always clear, professional, and to the point.
When asked to output JSON, provide ONLY the JSON object without any surrounding text or code formatting.
Maintain consistent logic throughout the process. All phases build on each other.
Perform an internal plausibility check before providing an answer.
# CONTEXT: THE WACKLER GROUP ECOSYSTEM
We are part of the Wackler Group. We leverage the full service portfolio of the parent company to turn hardware weaknesses into service strengths.
The goal is always a "Symbiosis of Man & Machine".
# RULE 5: THE "DYNAMIC SERVICE" LOGIC (UNIVERSAL)
First analyze the **category** of the robot and then apply the appropriate hybrid logic:
1. CLEANING INDOOR (CARPET) - Vacuums for carpets
* Robot: Does the area (80%).
* Human (Wackler Cleaning): Does edges, corners, spot removal (20%).
2. CLEANING INDOOR (WET SURFACE) - Scrubber dryers (Hard floor)
* Robot: Cleans halls/corridors continuously.
* Human (Wackler Cleaning): Safety check (slip hazard), water change, hygiene audit.
3. CLEANING OUTDOOR (SWEEPER) - Sweepers (Asphalt)
* Robot: Picks up fine dust and cigarettes.
* Human (Wackler Cleaning): Empties bins, removes bulky waste, maintains greenery.
4. POS ROBOT - Retail/Airport Assistance
* Robot: Information, wayfinding, eye-catcher.
* Human (Wackler Service): Consultation, sales, emotional customer bonding.
5. SECURITY ROBOT - Mobile Surveillance (Quadruped/Drone)
* Robot: "Detection & Presence". 24/7 patrol, thermal imaging, no fatigue.
* Human (Wackler Security): "Evaluation & Intervention". NSL evaluates alarm, intervention force drives out.
* Pitch: "The robot sees the danger, Wackler eliminates it."
6. SERVICE ROBOT - Transport (Hospitality/Clinic)
* Robot: Heavy lifting (trays, laundry) from A to B.
* Human (Wackler Service): Patient contact, table service, hygiene.
7. TRANSPORT ROBOT - Intralogistics (Warehouse)
* Robot: Pallet transport, milkrun.
* Human (Wackler Logistics): Process control, exception handling, repacking.
Mandatory application of this logic in PHASE 4 (Strategy) and PHASE 6 (Sales Enablement).
IMPORTANT: Always answer in the requested language.
"""
    # Anything other than 'de' falls back to English.
    return german_prompt if lang == 'de' else english_prompt
def get_output_lang_instruction(lang):
    """Return the sentence appended to prompts to force the output language.

    'de' yields the German-output instruction; any other value yields the
    English-output instruction.
    """
    german_rule = "ACHTUNG: Die gesamte Ausgabe (JSON-Werte, Texte, Analysen) MUSS in DEUTSCH sein. Übersetze englische Input-Daten."
    english_rule = "IMPORTANT: The entire output MUST be in ENGLISH."
    return german_rule if lang == 'de' else english_rule
# --- ORCHESTRATOR PHASES ---
def list_history(payload):
    """Return all projects known to the database manager under the key 'projects'.

    The payload argument is accepted but not read.
    """
    return {"projects": db_manager.get_all_projects()}
def load_history(payload):
    """Load the stored data of one project and decode stringified phase results.

    Raises:
        ValueError: if the payload has no 'projectId' or the project has no data.
    """
    project_id = payload.get('projectId')
    if not project_id:
        raise ValueError("No projectId provided for loading history.")
    data = db_manager.get_project_data(project_id)
    if not data:
        raise ValueError(f"Project {project_id} not found.")
    # Phase results may be stored as JSON strings; turn those back into objects.
    if 'phases' in data:
        phases = data['phases']
        if isinstance(phases, dict):
            for phase_name, stored in phases.items():
                if not isinstance(stored, str):
                    continue
                try:
                    phases[phase_name] = json.loads(stored)
                except json.JSONDecodeError:
                    # Keep the raw string when it is not valid JSON.
                    logging.warning(f"Could not decode JSON for {phase_name} in project {project_id}. Leaving as is.")
    return data
def delete_session(payload):
    """Delete the project named by payload['projectId'] and return the manager's result.

    Raises:
        ValueError: if the payload has no 'projectId'.
    """
    target_id = payload.get('projectId')
    if target_id:
        return db_manager.delete_project(target_id)
    raise ValueError("No projectId provided for deletion.")
def phase1(payload):
    """Run phase 1: product analysis followed by hard-fact (spec) extraction.

    Reads 'productInput', 'lang' (default 'de') and 'projectId' from the payload.
    Input starting with 'http' is scraped with scrape_website_details; if the
    scrape result contains "Fehler:", a fallback note mentioning the URL is
    analysed instead. A project is created when no projectId is supplied and is
    renamed after the extracted brand/model when those are available.

    Returns:
        The parsed analysis dict with 'specs' and 'projectId' added, or an error
        dict with 'error', 'details' and 'projectId' when the analysis response
        is not valid JSON.
    """
    product_input = payload.get('productInput', '')
    lang = payload.get('lang', 'de')
    project_id = payload.get('projectId')
    # Check if input is a URL and scrape it
    if product_input.strip().startswith('http'):
        logging.info(f"Input detected as URL. Starting scrape for: {product_input}")
        analysis_content = scrape_website_details(product_input)
        if "Fehler:" in analysis_content:
            # If scraping fails, use the URL itself with a note for the AI.
            analysis_content = f"Scraping der URL {product_input} ist fehlgeschlagen. Analysiere das Produkt basierend auf der URL und deinem allgemeinen Wissen."
            logging.warning("Scraping failed. Using URL as fallback content for analysis.")
    else:
        analysis_content = product_input
        logging.info("Input is raw text. Analyzing directly.")
    # Automatic project creation
    if not project_id:
        # Derive a provisional project name from the input
        raw_name = product_input.strip()
        if raw_name.startswith('http'):
            name = f"Web Analysis: {raw_name[:30]}..."
        else:
            name = (raw_name[:30] + "...") if len(raw_name) > 30 else raw_name
        logging.info(f"Creating new project: {name}")
        new_proj = db_manager.create_project(name)
        project_id = new_proj['id']
        logging.info(f"New Project ID: {project_id}")
    sys_instr = get_system_instruction(lang)
    lang_instr = get_output_lang_instruction(lang)
    prompt = f"""
PHASE 1: PRODUCT ANALYSIS & CONSTRAINTS
Input: "{analysis_content}"
Task:
1. Extract and CONSOLIDATE technical features into 8-12 high-level core capabilities or value propositions. Group minor specs (e.g., specific ports like USB/Ethernet) into broader categories (e.g., "Connectivity & Integration"). Do NOT list every single hardware spec individually. Focus on what matters for the buyer.
2. Define hard constraints (e.g., physical dimensions, max payload, environment limitations).
3. Classify the product into one of the 7 Wackler Categories: [Cleaning Indoor (Carpet), Cleaning Indoor (Wet), Cleaning Outdoor (Sweeper), POS Robot, Security Robot, Service Robot, Transport Robot].
4. Check for internal portfolio conflicts (hypothetical product "Scrubber 5000").
{lang_instr}
Output JSON format ONLY: {{"features": [], "constraints": [], "category": "Identified Category", "conflictCheck": {{"hasConflict": false, "details": "", "relatedProduct": ""}}, "rawAnalysis": ""}}
"""
    log_and_save(project_id, "phase1", "prompt", prompt)
    response = call_gemini_flash(prompt, system_instruction=sys_instr, json_mode=True)
    log_and_save(project_id, "phase1", "response", response)
    try:
        data = json.loads(response)
        # --- PART 2: HARD FACTS EXTRACTION ---
        spec_schema = """
{
"metadata": {
"product_id": "string (slug)",
"brand": "string",
"model_name": "string",
"description": "string (short marketing description of the product)",
"category": "cleaning | service | security | industrial",
"manufacturer_url": "string"
},
"core_specs": {
"battery_runtime_min": "integer (standardized to minutes)",
"charge_time_min": "integer (standardized to minutes)",
"weight_kg": "float",
"dimensions_cm": { "l": "float", "w": "float", "h": "float" },
"max_slope_deg": "float",
"ip_rating": "string",
"climb_height_cm": "float",
"navigation_type": "string (e.g. SLAM, LiDAR, VSLAM)",
"connectivity": ["string"]
},
"layers": {
"cleaning": {
"fresh_water_l": "float",
"dirty_water_l": "float",
"area_performance_sqm_h": "float",
"mop_pressure_kg": "float"
},
"service": {
"max_payload_kg": "float",
"number_of_trays": "integer",
"display_size_inch": "float",
"ads_capable": "boolean"
},
"security": {
"camera_types": ["string"],
"night_vision": "boolean",
"gas_detection": ["string"],
"at_interface": "boolean"
}
},
"extended_features": [
{ "feature": "string", "value": "string", "unit": "string" }
]
}
"""
        specs_prompt = f"""
PHASE 1 (Part 2): HARD FACT EXTRACTION
Input: "{analysis_content}"
Task: Extract technical specifications strictly according to the provided JSON schema.
NORMALIZATION RULES (STRICTLY FOLLOW):
1. Time: Convert ALL time values (runtime, charging) to MINUTES (Integer). Example: "1:30 h" -> 90, "2 hours" -> 120.
2. Dimensions/Weight: All lengths in CM, weights in KG.
3. Performance: Area performance always in m²/h.
4. Booleans: Use true/false (not strings).
5. Unknowns: If a value is not in the text, set it to null. DO NOT HALLUCINATE.
LOGIC FOR LAYERS:
- If product uses water/brushes -> Fill 'layers.cleaning'.
- If product delivers items/trays -> Fill 'layers.service'.
- If product patrols/detects -> Fill 'layers.security'.
EXTENDED FEATURES:
- Put any technical feature that doesn't fit the schema into 'extended_features'.
Output JSON format ONLY based on this schema:
{spec_schema}
"""
        log_and_save(project_id, "phase1_specs", "prompt", specs_prompt)
        specs_response = call_gemini_flash(specs_prompt, system_instruction=sys_instr, json_mode=True)
        log_and_save(project_id, "phase1_specs", "response", specs_response)
        try:
            specs_data = json.loads(specs_response)
            # FORCE URL PERSISTENCE: If input was a URL, ensure it's in the metadata
            if product_input.strip().startswith('http'):
                # The prompt allows null for unknown values, so 'metadata' may be
                # present but null; replace anything that is not a dict.
                if not isinstance(specs_data.get('metadata'), dict):
                    specs_data['metadata'] = {}
                specs_data['metadata']['manufacturer_url'] = product_input.strip()
            # AUTO-RENAME PROJECT based on extracted metadata
            metadata = specs_data.get('metadata')
            if isinstance(metadata, dict):
                # "or ''" also covers keys that are present with a null value;
                # .get(key, '') alone would yield None and produce names like "None X".
                brand = metadata.get('brand') or ''
                model = metadata.get('model_name') or ''
                new_name = f"{brand} {model}".strip()
                if new_name:
                    logging.info(f"Renaming project {project_id} to: {new_name}")
                    db_manager.update_project_name(project_id, new_name)
            data['specs'] = specs_data
        except json.JSONDecodeError:
            logging.error(f"Failed to decode JSON from Gemini response in phase1 (specs): {specs_response}")
            data['specs'] = {"error": "Failed to extract specs", "raw": specs_response}
        db_manager.save_gtm_result(project_id, 'phase1_result', json.dumps(data))
        # Important: return the ID so the frontend can store it
        data['projectId'] = project_id
        return data
    except json.JSONDecodeError:
        logging.error(f"Failed to decode JSON from Gemini response in phase1: {response}")
        error_response = {
            "error": "Die Antwort des KI-Modells war kein gültiges JSON. Das passiert manchmal bei hoher Auslastung. Bitte versuchen Sie es in Kürze erneut.",
            "details": response,
            # NOTE(review): the ID is returned on failure as well, although no
            # phase result has been saved for it at this point.
            "projectId": project_id
        }
        return error_response
def phase2(payload):
    """Run phase 2: derive Ideal Customer Profiles and data proxies.

    Builds the prompt from payload['phase1Data'], sends it to the model in JSON
    mode, stores the parsed result as 'phase2_result' and returns it.
    """
    project_id = payload.get('projectId')
    lang = payload.get('lang', 'de')
    phase1_data = payload.get('phase1Data', {})
    lang_instr = get_output_lang_instruction(lang)
    sys_instr = get_system_instruction(lang)
    # Values interpolated into the prompt below.
    product_context = json.dumps(phase1_data)
    category = phase1_data.get('category', 'Unknown')
    prompt = f"""
PHASE 2: IDEAL CUSTOMER PROFILE (ICP) & DATA PROXIES - STRATEGIC ANALYSIS
**Product Context:**
{product_context}
**Your Task:**
Answer the following strategic questions to determine the Ideal Customer Profiles (ICPs).
**Strategic Questions:**
1. **ICP Identification:** Based on the product's category ({category}), which 3 industries face the most significant operational challenges (e.g., safety, efficiency, high manual labor costs, security risks) that this product directly solves?
2. **Rationale:** For each identified ICP, provide a concise rationale. Why is this product a perfect fit for this specific industry? (e.g., "Reduces inspection costs by X%", "Improves safety in hazardous environments", "Automates a critical but repetitive task").
3. **Data Proxies:** How can we find these companies online? What specific digital footprints (data proxies) do they leave? Think about:
* Keywords on their websites (e.g., 'plant safety', 'autonomous inspection', 'logistics automation').
* Specific job titles on LinkedIn (e.g., 'Head of Security', 'Logistics Manager', 'Maintenance Lead').
* Their participation in specific industry trade shows or publications.
{lang_instr}
**Output:**
Provide your analysis ONLY in the following JSON format:
{{"icps": [{{"name": "Industry Name", "rationale": "Why it's a fit."}}], "dataProxies": [{{"target": "e.g., Company Websites", "method": "How to find them."}}]}}
"""
    log_and_save(project_id, "phase2", "prompt", prompt)
    raw_reply = call_gemini_flash(prompt, system_instruction=sys_instr, json_mode=True)
    log_and_save(project_id, "phase2", "response", raw_reply)
    parsed = json.loads(raw_reply)
    db_manager.save_gtm_result(project_id, 'phase2_result', json.dumps(parsed))
    return parsed
def phase3(payload):
    """Run phase 3: identify 'whale' accounts and buying-center roles.

    Builds the prompt from the ICPs in payload['phase2Data'], sends it to the
    model in JSON mode, stores the parsed result as 'phase3_result' and
    returns it.
    """
    project_id = payload.get('projectId')
    lang = payload.get('lang', 'de')
    phase2_data = payload.get('phase2Data', {})
    lang_instr = get_output_lang_instruction(lang)
    sys_instr = get_system_instruction(lang)
    # Serialized ICP list interpolated into the prompt below.
    icps_json = json.dumps(phase2_data.get('icps'))
    prompt = f"""
PHASE 3: WHALE HUNTING & BUYING CENTER ANALYSIS - STRATEGIC ANALYSIS
**Target ICPs (Industries):**
{icps_json}
**Your Task:**
Answer the following strategic questions to identify key accounts and decision-makers.
**Strategic Questions:**
1. **Whale Identification:** For each ICP, identify 3-5 specific 'Whale' companies in the DACH market. These should be leaders, innovators, or companies with significant scale in that sector.
2. **Buying Center Roles:** Identify the specific job titles for the 4 Universal Strategic Archetypes in the context of these industries.
* **Operativer Entscheider:** Who feels the pain daily? (e.g., Plant Manager, Store Manager, Head of Logistics).
* **Infrastruktur Verantwortlicher:** Who has to integrate it? (e.g., IT Security, Facility Manager, Legal/Compliance).
* **Wirtschaftlicher Entscheider:** Who signs the check? (e.g., CFO, Purchasing Director).
* **Innovations-Treiber:** Who pushes for the pilot? (e.g., CDO, Strategy Lead).
{lang_instr}
**Output:**
Provide your analysis ONLY in the following JSON format:
{{"whales": [{{"industry": "ICP Name", "accounts": ["Company A", "Company B"]}}], "roles": ["Operativer Entscheider: [Job Titles]", "Infrastruktur Verantwortlicher: [Job Titles]", "Wirtschaftlicher Entscheider: [Job Titles]", "Innovations-Treiber: [Job Titles]"]}}
"""
    log_and_save(project_id, "phase3", "prompt", prompt)
    raw_reply = call_gemini_flash(prompt, system_instruction=sys_instr, json_mode=True)
    log_and_save(project_id, "phase3", "response", raw_reply)
    parsed = json.loads(raw_reply)
    db_manager.save_gtm_result(project_id, 'phase3_result', json.dumps(parsed))
    return parsed
def phase4(payload):
    """Run phase 4: build the strategy matrix (pain point, angle, differentiation).

    Builds the prompt from the category and features in payload['phase1Data']
    and the industries of the whales in payload['phase3Data'], sends it to the
    model in JSON mode, stores the parsed result as 'phase4_result' and
    returns it.
    """
    phase3_data = payload.get('phase3Data', {})
    phase1_data = payload.get('phase1Data', {})
    lang = payload.get('lang', 'de')
    project_id = payload.get('projectId')
    sys_instr = get_system_instruction(lang)
    lang_instr = get_output_lang_instruction(lang)
    # The former 'all_accounts' list was built here but never used, and it could
    # raise on a whale entry whose 'accounts' value is null; it has been removed.
    prompt = f"""
PHASE 4: STRATEGY & ANGLE DEVELOPMENT - STRATEGIC ANALYSIS
**Product Category:** {phase1_data.get('category')}
**Target Industries:** {json.dumps([w.get('industry') for w in phase3_data.get('whales', [])])}
**Product Features:** {json.dumps(phase1_data.get('features'))}
**Your Task:**
Answer the following strategic questions to build the core of our market approach.
**Strategic Questions:**
1. **Pain Point Analysis:** For each industry segment, what is the single most significant, measurable **Pain Point** this product solves?
2. **Develop the Angle:** What is our unique story? The "Angle" should directly connect a product capability to their primary pain point.
3. **Define Differentiation (Hybrid Service):** Why should they choose us? Explain the specific "Service Gap" that our Hybrid Model (Machine + Human) closes for this specific Category ({phase1_data.get('category')}). E.g., for Security, the gap is "Intervention"; for Cleaning, it is "Edges/Hygiene".
{lang_instr}
**Output:**
Provide your analysis ONLY in the following JSON format:
{{"strategyMatrix": [{{"segment": "Target Industry", "painPoint": "The core problem.", "angle": "Our unique story.", "differentiation": "Why us (Hybrid Service logic)."}}]}}
"""
    log_and_save(project_id, "phase4", "prompt", prompt)
    response = call_gemini_flash(prompt, system_instruction=sys_instr, json_mode=True)
    log_and_save(project_id, "phase4", "response", response)
    data = json.loads(response)
    db_manager.save_gtm_result(project_id, 'phase4_result', json.dumps(data))
    return data
def phase5(payload):
    """Run phase 5: generate the final GTM strategy report as Markdown.

    Combines the results of phases 1-4 from the payload into one prompt, calls
    the model with a report-specific system instruction (json_mode=False),
    strips a surrounding code fence from the answer, stores it as
    'phase5_result' and returns {"report": <markdown>}.
    """
    phase4_data = payload.get('phase4Data', {})
    phase3_data = payload.get('phase3Data', {})
    phase2_data = payload.get('phase2Data', {})
    phase1_data = payload.get('phase1Data', {})
    lang = payload.get('lang', 'de')
    project_id = payload.get('projectId')
    # Diagnostic logging of the incoming strategy matrix size.
    strat_matrix = phase4_data.get('strategyMatrix', [])
    logging.info(f"Phase 5 Input Check - Strategy Matrix Rows: {len(strat_matrix)}")
    # Report-specific system instruction: used instead of the general one
    # (which asks for JSON only) so the model writes long-form Markdown.
    if lang != 'de':
        report_sys_instr = """
You are a Senior Business Consultant at a top-tier firm (like McKinsey or BCG).
Your task is to write a strategically deep, detailed "Go-to-Market Strategy Report".
RULES:
1. **No JSON:** Your output is pure, cleanly formatted Markdown.
2. **Senior Grade:** Do not write "thin" bullet points. Write full sentences and explain the context ("Why it matters").
3. **Completeness:** Never stop in the middle of a table or sentence.
4. **Formatting:** Use bolding, lists, and tables to enhance readability.
"""
    else:
        report_sys_instr = """
Du bist ein Senior Business Consultant bei einer Top-Tier-Beratung (wie McKinsey oder BCG).
Deine Aufgabe ist es, einen strategisch tiefgehenden, detaillierten "Go-to-Market Strategy Report" zu verfassen.
REGELN:
1. **Kein JSON:** Deine Ausgabe ist reines, sauber formatiertes Markdown.
2. **Senior Grade:** Schreibe nicht stichpunktartig "dünn", sondern formuliere ganze Sätze und erkläre die Zusammenhänge ("Why it matters").
3. **Vollständigkeit:** Brich niemals mitten in einer Tabelle oder einem Satz ab.
4. **Formatierung:** Nutze Fettgedrucktes, Listen und Tabellen, um die Lesbarkeit zu erhöhen.
"""
    lang_instr = get_output_lang_instruction(lang)
    # Reduce the phase 1 data to the fields the report needs; 'specs' carries
    # the hard facts for the technical table.
    lean_phase1 = {
        "features": phase1_data.get('features', []),
        "constraints": phase1_data.get('constraints', []),
        "specs": phase1_data.get('specs', {}),
        "category": phase1_data.get('category', 'Unknown')
    }
    # Values interpolated into the prompt below.
    product_json = json.dumps(lean_phase1)
    icps_json = json.dumps(phase2_data.get('icps', []))
    targets_json = json.dumps(phase3_data.get('whales', []))
    matrix_json = json.dumps(phase4_data.get('strategyMatrix', []))
    category = lean_phase1.get('category')
    prompt = f"""
PHASE 5: FINAL REPORT GENERATION
INPUT DATA:
- Product: {product_json}
- ICPs: {icps_json}
- Targets: {targets_json}
- Strategy Matrix: {matrix_json}
TASK:
Write the "GTM STRATEGY REPORT v3.1" in Markdown.
Expand on the input data. Don't just copy it. Interpret it.
REQUIRED STRUCTURE & CONTENT:
# GTM STRATEGY REPORT v3.1
## 1. Strategic Core
* **Category Definition:** Explicitly state that this product falls under the '{category}' category.
* **Dynamic Service Logic:** Explain clearly how the "Machine Layer" (What the robot does) and the "Human Service Layer" (What Wackler does) work together for THIS specific category. Use the logic defined for '{category}'.
## 2. Executive Summary
* Write a compelling management summary (approx. 150 words) outlining the market opportunity and the core value proposition.
## 3. Product Reality Check (Technical Deep Dive)
* **Core Capabilities:** Summarize the top 3-5 capabilities.
* **Technical Constraints:** Create a detailed Markdown table for the Hard Facts.
* Include ALL available specs (Dimensions, Weight, Runtime, Limits, Sensor types, Cleaning performance, etc.) from the input.
* Make it as comprehensive as a technical datasheet to satisfy the "Evaluator" persona.
| Feature | Value | Implication |
| :--- | :--- | :--- |
| ... | ... | ... |
## 4. Target Architecture (ICPs)
* For each ICP, write a short paragraph explaining the "Strategic Fit". Why is this industry under pressure to buy?
* Mention key "Whale" accounts identified.
## 5. Strategy Matrix
* Create a detailed Markdown table mapping the strategy.
* **CRITICAL:** Ensure the table syntax is perfect. use <br> for line breaks inside cells.
* Columns: **Target Segment** | **The Pain (Operational)** | **The Angle (Story)** | **Differentiation (Service Gap)**
* Fill this table with the data from the 'Strategy Matrix' input.
## 6. Operational GTM Roadmap
* **Step 1: Lead Gen:** Recommend specific Inbound/Outbound tactics for these ICPs.
* **Step 2: Consultative Sales:** How to handle the site-check? What constraints need checking?
* **Step 3: Proof of Value:** Define the Pilot Phase (Paid Pilot vs. Free PoC).
* **Step 4: Expansion:** Path to RaaS/Service contracts.
## 7. Commercial Logic (ROI Framework)
* Present the ROI calculation logic.
* **The Formula:** Show the Net Value formula.
* **Input Variables:** List the specific variables the customer needs to provide.
* **Example Calculation:** Provide a hypothetical example calculation with plausible ranges (e.g. "Assuming 20-30% efficiency gain...") to illustrate the potential.
{lang_instr}
Output: Return strictly MARKDOWN formatted text.
"""
    log_and_save(project_id, "phase5", "prompt", prompt)
    # The report-specific system instruction is passed here, not the general one.
    report = call_gemini_flash(prompt, system_instruction=report_sys_instr, json_mode=False)
    # Strip a leading ```markdown / ``` fence and a trailing ``` fence, if present.
    report = report.strip()
    markdown_fence = "```markdown"
    plain_fence = "```"
    if report.startswith(markdown_fence):
        report = report[len(markdown_fence):]
    if report.startswith(plain_fence):
        report = report[len(plain_fence):]
    if report.endswith(plain_fence):
        report = report[:-len(plain_fence)]
    report = report.strip()
    log_and_save(project_id, "phase5", "response", report)
    result = {"report": report}
    db_manager.save_gtm_result(project_id, 'phase5_result', json.dumps(result))
    return {"report": report}
def phase6(payload):
    """Run phase 6: produce battlecards and visual prompts for sales enablement.

    Builds the prompt from the features (phase 1), roles (phase 3) and strategy
    matrix (phase 4) in the payload, sends it to the model in JSON mode, stores
    the parsed result as 'phase6_result' and returns it. If the model answers
    with a JSON array, its first element is used.
    """
    project_id = payload.get('projectId')
    lang = payload.get('lang', 'de')
    phase1_data = payload.get('phase1Data', {})
    phase3_data = payload.get('phase3Data', {})
    phase4_data = payload.get('phase4Data', {})
    lang_instr = get_output_lang_instruction(lang)
    sys_instr = get_system_instruction(lang)
    # Values interpolated into the prompt below.
    features_json = json.dumps(phase1_data.get('features'))
    personas_json = json.dumps(phase3_data.get('roles'))
    strategy_json = json.dumps(phase4_data.get('strategyMatrix'))
    prompt = f"""
PHASE 6: SALES ENABLEMENT & VISUALS - STRATEGIC ANALYSIS
**Context:**
- Product Features: {features_json}
- Personas: {personas_json}
- Strategy: {strategy_json}
**Your Task:**
Answer the following strategic questions to create sales enablement materials.
**Strategic Questions:**
1. **Anticipate Objections:** For each of the 4 key Archetypes (Operative, Infrastructure, Economic, Innovation), what is their most likely and critical **objection**?
* *Special Focus for 'Infrastructure Responsible' (Gatekeeper):* Address **Legal, Liability & Compliance** issues (e.g. GDPR, DGUV V3, accident liability) specifically.
2. **Formulate Battlecards:** For each objection, formulate a concise **response script**.
* *Requirement:* Use specific **proof points** (e.g., "Certified according to...", "Data hosted in Germany", "Insurance coverage by Wackler") instead of generic promises.
3. **Create Visual Prompts:** For the top 3 use cases, write a detailed **visual prompt** for an image generation AI.
{lang_instr}
**Output:**
Provide your analysis ONLY in the following JSON format:
{{"battlecards": [{{"persona": "Archetype (Job Title)", "objection": "The key objection.", "responseScript": "The compelling response with proof points."}}], "visualPrompts": [{{"title": "Image Title", "context": "Use case description.", "prompt": "Detailed photorealistic prompt."}}]}}
"""
    log_and_save(project_id, "phase6", "prompt", prompt)
    raw_reply = call_gemini_flash(prompt, system_instruction=sys_instr, json_mode=True)
    log_and_save(project_id, "phase6", "response", raw_reply)
    parsed = json.loads(raw_reply)
    # Unwrap an array-wrapped answer by taking its first element.
    parsed = parsed[0] if isinstance(parsed, list) else parsed
    db_manager.save_gtm_result(project_id, 'phase6_result', json.dumps(parsed))
    return parsed
def phase7(payload):
    """Run phase 7: write vertical landing-page copy for the top ICPs.

    Builds the prompt from the ICPs (phase 2) and strategy matrix (phase 4) in
    the payload, sends it to the model in JSON mode, stores the parsed result
    as 'phase7_result' and returns it. If the model answers with a JSON array,
    its first element is used.
    """
    project_id = payload.get('projectId')
    lang = payload.get('lang', 'de')
    phase2_data = payload.get('phase2Data', {})
    phase4_data = payload.get('phase4Data', {})
    lang_instr = get_output_lang_instruction(lang)
    sys_instr = get_system_instruction(lang)
    # Values interpolated into the prompt below.
    icps_json = json.dumps(phase2_data.get('icps'))
    strategy_json = json.dumps(phase4_data.get('strategyMatrix'))
    prompt = f"""
PHASE 7: VERTICAL LANDING PAGE COPY - STRATEGIC ANALYSIS
**Context:**
- ICPs: {icps_json}
- Strategy: {strategy_json}
**Your Task:**
Create conversion-optimized landing page copy for the top 2 ICPs by answering the following questions.
**Strategic Questions:**
1. **Headline:** What is the most powerful **outcome** for this industry? The headline must grab the attention of a Decider and state this primary result.
2. **Subline:** How can you elaborate on the headline? Briefly mention the core problem this industry faces and introduce our solution as the answer.
3. **Benefit Bullets:** Transform 3-5 key technical features into tangible **benefit statements** for this specific industry. Each bullet point should answer the customer's question: "What's in it for me?".
4. **Call-to-Action (CTA):** What is the logical next step we want the user to take? The CTA should be clear, concise, and action-oriented.
5. **Apply Wackler Symbiosis:** Ensure the copy clearly communicates the value of the robot combined with the human expert service.
{lang_instr}
**Output:**
Provide your analysis ONLY in the following JSON format:
{{"landingPages": [{{"industry": "ICP Name", "headline": "The compelling headline.", "subline": "The elaborating subline.", "bullets": ["Benefit 1", "Benefit 2"], "cta": "The call to action."}}]}}
"""
    log_and_save(project_id, "phase7", "prompt", prompt)
    raw_reply = call_gemini_flash(prompt, system_instruction=sys_instr, json_mode=True)
    log_and_save(project_id, "phase7", "response", raw_reply)
    parsed = json.loads(raw_reply)
    # Unwrap an array-wrapped answer by taking its first element.
    parsed = parsed[0] if isinstance(parsed, list) else parsed
    db_manager.save_gtm_result(project_id, 'phase7_result', json.dumps(parsed))
    return parsed
def phase8(payload):
    """Run phase 8: ask the model for a commercial/ROI calculation framework.

    Reads ``phase1Data['category']``, ``phase2Data['icps']``, ``lang``
    (default ``'de'``) and ``projectId`` from ``payload``, builds the prompt,
    logs prompt and response through ``log_and_save``, and stores the parsed
    result under ``'phase8_result'`` via ``db_manager.save_gtm_result``.

    Returns:
        The parsed JSON response; when the model answers with a list, its
        first element is used.
    """
    project_id = payload.get('projectId')
    lang = payload.get('lang', 'de')
    icp_source = payload.get('phase2Data', {})
    product_source = payload.get('phase1Data', {})

    sys_instr = get_system_instruction(lang)
    lang_instr = get_output_lang_instruction(lang)

    category = product_source.get('category')
    icps_json = json.dumps(icp_source.get('icps'))

    # The template lines start at column 0 on purpose: everything between the
    # triple quotes is sent to the model verbatim.
    prompt = f"""
PHASE 8: COMMERCIAL LOGIC & ROI CALCULATOR - STRATEGIC ANALYSIS
**Context:**
- Product Category: {category}
- ICPs: {icps_json}
**Your Task:**
Develop a calculation framework (NOT just random numbers) for the CFO pitch.
**Strategic Questions:**
1. **Identify the Cost Driver:** What is the unit of cost we are attacking?
2. **ROI Formula & Example:** Create a formula: `Net Value = (Savings + Risk Mitigation) - (TCO)`.
* *CRITICAL:* Provide **PLAUSIBLE EXAMPLE RANGES** for efficiency gains (e.g., "Estimate: 20-30% reduction in manual patrol time") instead of just listing the variable.
* **Do NOT output "undefined".** Give a realistic estimation based on the industry context.
3. **Risk Argument:** Financial value of avoiding the worst-case scenario.
{lang_instr}
**Output:**
Provide your analysis ONLY in the following JSON format:
{{"businessCases": [{{"industry": "ICP Name", "costDriver": "Unit of cost.", "efficiencyGain": "Plausible estimate range (e.g. 25-35%).", "roiFormula": "The formula with defined variables.", "riskArgument": "The cost of inaction."}}]}}
"""
    log_and_save(project_id, "phase8", "prompt", prompt)
    raw_answer = call_gemini_flash(prompt, system_instruction=sys_instr, json_mode=True)
    log_and_save(project_id, "phase8", "response", raw_answer)

    parsed = json.loads(raw_answer)
    # The model sometimes wraps the object in a one-element list.
    result = parsed[0] if isinstance(parsed, list) else parsed
    db_manager.save_gtm_result(project_id, 'phase8_result', json.dumps(result))
    return result
def phase9(payload):
    """Run phase 9: translate technical features into value statements.

    Reads ``phase1Data['features']``, the ``painPoint`` of every entry in
    ``phase4Data['strategyMatrix']``, ``lang`` (default ``'de'``) and
    ``projectId`` from ``payload``, builds the prompt, logs prompt and
    response through ``log_and_save``, and stores the parsed result under
    ``'phase9_result'`` via ``db_manager.save_gtm_result``.

    Returns:
        The parsed JSON response; when the model answers with a list, its
        first element is used (same handling as ``phase7``/``phase8``).
    """
    # `or {}` / `or []` also cover keys that are present but null, which
    # `.get(key, default)` alone would pass through as None.
    phase1_data = payload.get('phase1Data') or {}
    phase4_data = payload.get('phase4Data') or {}
    lang = payload.get('lang', 'de')
    project_id = payload.get('projectId')
    sys_instr = get_system_instruction(lang)
    lang_instr = get_output_lang_instruction(lang)

    pain_points = [
        entry.get('painPoint')
        for entry in (phase4_data.get('strategyMatrix') or [])
        if isinstance(entry, dict)
    ]

    # The template lines start at column 0 on purpose: everything between the
    # triple quotes is sent to the model verbatim.
    prompt = f"""
PHASE 9: THE "FEATURE-TO-VALUE" TRANSLATOR - STRATEGIC ANALYSIS
**Context:**
- Input Features: {json.dumps(phase1_data.get('features'))}
- Strategy Pains: {json.dumps(pain_points)}
**Your Task:**
Translate technical features into compelling, value-oriented benefits.
**Structured Process:**
1. **State the Feature:** Pick a key technical feature.
2. **Ask "So what?" (The Consequence):** What is the immediate consequence?
3. **Ask "So what?" again (The Value):** What is the ultimate benefit?
4. **Formulate Headline:** Short, powerful headline.
{lang_instr}
**Output:**
Provide your analysis ONLY in the following JSON format:
{{"techTranslations": [{{"feature": "The technical feature.", "story": "The 'So what? So what?' analysis.", "headline": "The final value headline."}}]}}
"""
    log_and_save(project_id, "phase9", "prompt", prompt)
    response = call_gemini_flash(prompt, system_instruction=sys_instr, json_mode=True)
    log_and_save(project_id, "phase9", "response", response)
    data = json.loads(response)
    # Unwrap a list-wrapped answer exactly like phase7 and phase8 do, so the
    # stored result always has the documented object shape.
    if isinstance(data, list):
        data = data[0]
    db_manager.save_gtm_result(project_id, 'phase9_result', json.dumps(data))
    return data
def update_specs(payload):
    """
    Updates the technical specifications (Hard Facts) for a project.
    This allows manual correction of AI-extracted data.

    Args:
        payload: Mapping with ``projectId`` and the replacement ``specs``.

    Returns:
        ``{"status": "success", "specs": <new specs>}`` once the updated
        phase 1 result has been written back.

    Raises:
        ValueError: If ``projectId`` or ``specs`` is missing, the project or
            its phase 1 result does not exist, the stored phase 1 result is
            not a JSON object, or the database layer reports a save error.
    """
    project_id = payload.get('projectId')
    new_specs = payload.get('specs')
    if not project_id:
        raise ValueError("No projectId provided for update_specs.")
    if not new_specs:
        raise ValueError("No specs provided for update_specs.")

    # Load current project data
    project_data = db_manager.get_project_data(project_id)
    if not project_data:
        raise ValueError(f"Project {project_id} not found.")
    phases = project_data.get('phases', {})
    phase1_result = phases.get('phase1_result')
    if not phase1_result:
        raise ValueError("Phase 1 result not found. Cannot update specs.")

    # Phase results are stored as JSON strings by the phase functions, so
    # decode before patching.
    if isinstance(phase1_result, str):
        try:
            phase1_result = json.loads(phase1_result)
        except json.JSONDecodeError as exc:
            raise ValueError("Phase 1 result is corrupted (invalid JSON string).") from exc
    # Anything but an object cannot carry a 'specs' key; fail with a clear
    # message instead of a TypeError from the item assignment below.
    if not isinstance(phase1_result, dict):
        raise ValueError("Phase 1 result is corrupted (not a JSON object).")

    # Update specs
    phase1_result['specs'] = new_specs

    # Save back to DB
    # We use save_gtm_result which expects a stringified JSON for the phase result
    save_status = db_manager.save_gtm_result(project_id, 'phase1_result', json.dumps(phase1_result))
    # NOTE(review): assumes the DB layer signals failure with an {"error": ...}
    # dict, as gtm_db_manager.save_gtm_result does - confirm db_manager is that module.
    if isinstance(save_status, dict) and save_status.get('error'):
        raise ValueError(f"Failed to save specs for project {project_id}: {save_status['error']}")

    logging.info(f"Updated specs for project {project_id}")
    return {"status": "success", "specs": new_specs}
def translate(payload):
    """Placeholder for the translation mode.

    Not implemented yet: ``payload`` is ignored and a fixed stub report is
    returned.
    """
    # ... (to be implemented)
    stub_report = {"report": "Translated report will be here."}
    return stub_report
def image(payload):
    """Generate an image through ``call_gemini_image``.

    Reads ``prompt`` (default ``'No Prompt'``), ``projectId``, ``aspectRatio``
    and an optional reference image from ``payload``. The reference image is
    the first entry of the ``referenceImagesBase64`` list when that list is
    non-empty, otherwise the ``referenceImage`` value if one is set.

    Returns:
        ``{"imageBase64": "data:image/png;base64,..."}`` on success, or an
        ``{"error": ..., "details": ...}`` dict when generation raises.
    """
    prompt = payload.get('prompt', 'No Prompt')
    project_id = payload.get('projectId')
    aspect_ratio = payload.get('aspectRatio')

    # Pick the reference image: list form first, single-value form second.
    candidates = payload.get('referenceImagesBase64')
    if isinstance(candidates, list) and candidates:
        ref_image = candidates[0]
    else:
        ref_image = payload.get('referenceImage') or None

    ratio_label = aspect_ratio or 'default'
    log_and_save(project_id, "image", "prompt", f"{prompt} (Ratio: {ratio_label})")
    if ref_image:
        logging.info(f"Image-Mode: Reference Image found (Length: {len(ref_image)})")

    try:
        image_b64 = call_gemini_image(prompt, reference_image_b64=ref_image, aspect_ratio=aspect_ratio)
        # Only a short preview goes to the log; the full payload is returned.
        log_and_save(project_id, "image", "response_b64_preview", image_b64[:100] + "...")
        return {"imageBase64": f"data:image/png;base64,{image_b64}"}
    except Exception as e:
        logging.error(f"Failed to generate image: {e}", exc_info=True)
        return {"error": "Image generation failed.", "details": str(e)}
def main():
    """
    Main entry point of the script.
    Parses command-line arguments to determine which phase to run.

    The payload is read from ``--payload_file`` (preferred) or decoded from
    ``--payload_base64``. The result of the selected mode is printed to
    stdout as JSON. On any failure a JSON object with an ``error`` key is
    printed instead and the process exits with status 1.
    """
    parser = argparse.ArgumentParser(description="GTM Architect Orchestrator")
    parser.add_argument("--mode", required=True, help="The execution mode (e.g., phase1, phase2).")
    parser.add_argument("--payload_base64", help="The Base64 encoded JSON payload (deprecated, use payload_file).")
    parser.add_argument("--payload_file", help="Path to a JSON file containing the payload (preferred).")
    args = parser.parse_args()

    payload = {}
    try:
        if args.payload_file:
            if not os.path.exists(args.payload_file):
                raise FileNotFoundError(f"Payload file not found: {args.payload_file}")
            with open(args.payload_file, 'r', encoding='utf-8') as f:
                payload = json.load(f)
        elif args.payload_base64:
            payload_str = base64.b64decode(args.payload_base64).decode('utf-8')
            payload = json.loads(payload_str)
        else:
            raise ValueError("No payload provided (neither --payload_file nor --payload_base64).")
    except (OSError, ValueError) as e:
        # OSError covers FileNotFoundError as well as unreadable paths
        # (permission denied, path is a directory). ValueError covers
        # json.JSONDecodeError, binascii.Error and UnicodeDecodeError, which
        # are all subclasses of it.
        logging.error(f"Failed to load payload: {e}")
        # Print error as JSON to stdout for the server to catch
        print(json.dumps({"error": "Invalid payload.", "details": str(e)}))
        sys.exit(1)

    # Function mapping to dynamically call the correct phase
    modes = {
        "phase1": phase1,
        "phase2": phase2,
        "phase3": phase3,
        "phase4": phase4,
        "phase5": phase5,
        "phase6": phase6,
        "phase7": phase7,
        "phase8": phase8,
        "phase9": phase9,
        "update_specs": update_specs,
        "translate": translate,
        "image": image,
        "list_history": list_history,
        "load_history": load_history,
        "delete_session": delete_session,
    }
    mode_function = modes.get(args.mode)
    if not mode_function:
        logging.error(f"Invalid mode specified: {args.mode}")
        print(json.dumps({"error": f"Invalid mode: {args.mode}"}))
        sys.exit(1)

    try:
        logging.info(f"Executing mode: {args.mode}")
        result = mode_function(payload)
        # Ensure the output is always a JSON string
        print(json.dumps(result, ensure_ascii=False))
        logging.info(f"Successfully executed mode: {args.mode}")
    except Exception as e:
        logging.error(f"An error occurred during execution of mode '{args.mode}': {e}", exc_info=True)
        print(json.dumps({"error": f"An error occurred in {args.mode}.", "details": str(e)}))
        sys.exit(1)


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,194 @@
import sqlite3
import json
import os
import uuid
from datetime import datetime
# Location of the GTM projects database file; the GTM_DB_PATH environment
# variable overrides the built-in default.
DB_PATH = os.environ.get("GTM_DB_PATH", "/app/gtm_projects.db")


def get_db_connection():
    """Open the SQLite database at ``DB_PATH``.

    Returns:
        A ``sqlite3.Connection`` whose rows are ``sqlite3.Row`` objects, so
        columns can be read by name. The caller is responsible for closing it.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def init_gtm_db():
    """Initializes the database and creates the gtm_projects table if it doesn't exist.

    Raises:
        sqlite3.Error: If the database cannot be opened or the table cannot
            be created.
    """
    # Connect before entering the try block: if connecting fails there is
    # nothing to close, and the original error must not be masked by an
    # UnboundLocalError from the cleanup code.
    conn = get_db_connection()
    try:
        # A flexible schema to store project-related data in a single JSON column
        conn.execute('''
            CREATE TABLE IF NOT EXISTS gtm_projects (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                data JSON NOT NULL
            )
        ''')
        conn.commit()
    finally:
        conn.close()
def create_project(name):
    """Insert a new project row and return its generated ID.

    The row's JSON document starts out as ``{"id", "name", "phases": {}}``.

    Args:
        name: Display name of the project.

    Returns:
        ``{"id": <uuid4 string>, "name": name}``.
    """
    connection = get_db_connection()
    try:
        new_id = str(uuid.uuid4())
        document = json.dumps({"id": new_id, "name": name, "phases": {}})
        connection.execute(
            'INSERT INTO gtm_projects (id, name, data) VALUES (?, ?, ?)',
            (new_id, name, document),
        )
        connection.commit()
        return {"id": new_id, "name": name}
    finally:
        connection.close()
def update_project_name(project_id, new_name):
    """Rename a project and refresh its ``updated_at`` timestamp.

    Only the ``name`` column is changed; the JSON document in ``data`` is
    left as is. The same status dict is returned whether or not a row with
    ``project_id`` exists.

    Returns:
        ``{"id": project_id, "name": new_name, "status": "updated"}``.
    """
    connection = get_db_connection()
    try:
        rename_sql = 'UPDATE gtm_projects SET name = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?'
        connection.execute(rename_sql, (new_name, project_id))
        connection.commit()
        return {"id": project_id, "name": new_name, "status": "updated"}
    finally:
        connection.close()
def save_gtm_result(project_id, phase, result):
    """Store ``result`` under ``phases[phase]`` in a project's JSON document.

    The document is read, patched in Python and written back together with a
    fresh ``updated_at`` timestamp.

    Returns:
        ``{"error": "Project not found"}`` when no row has ``project_id``,
        otherwise ``{"id": project_id, "status": "Phase '<phase>' saved successfully."}``.
    """
    connection = get_db_connection()
    try:
        cur = connection.cursor()

        # Step 1: fetch the current document.
        cur.execute('SELECT data FROM gtm_projects WHERE id = ?', (project_id,))
        existing = cur.fetchone()
        if existing is None:
            return {"error": "Project not found"}
        document = json.loads(existing['data'])

        # Step 2: patch the requested phase, creating the container if needed.
        document.setdefault('phases', {})[phase] = result

        # Step 3: write the whole document back.
        cur.execute(
            'UPDATE gtm_projects\n'
            'SET data = ?, updated_at = CURRENT_TIMESTAMP\n'
            'WHERE id = ?',
            (json.dumps(document), project_id),
        )
        connection.commit()
        return {"id": project_id, "status": f"Phase '{phase}' saved successfully."}
    finally:
        connection.close()
def get_project_data(project_id):
    """Load the decoded JSON document of one project.

    Returns:
        The parsed content of the row's ``data`` column, or ``None`` when no
        project has ``project_id``.
    """
    connection = get_db_connection()
    try:
        cur = connection.cursor()
        cur.execute('SELECT data FROM gtm_projects WHERE id = ?', (project_id,))
        found = cur.fetchone()
        if found is None:
            return None
        return json.loads(found['data'])
    finally:
        connection.close()
def _phase1_metadata(data_text):
    """Return ``phases.phase1_result.specs.metadata`` of a stored document, or ``{}``."""
    try:
        node = json.loads(data_text)
    except (TypeError, ValueError):
        return {}
    for key in ('phases', 'phase1_result', 'specs', 'metadata'):
        # Phase results may be stored as JSON-encoded strings nested inside
        # the document; decode them so the walk can continue.
        if isinstance(node, str):
            try:
                node = json.loads(node)
            except ValueError:
                return {}
        if not isinstance(node, dict):
            return {}
        node = node.get(key)
    return node if isinstance(node, dict) else {}


def get_all_projects():
    """Lists all projects with key details extracted from the JSON data.

    The product metadata is read in Python rather than with SQLite's
    ``json_extract``: when ``phase1_result`` is saved as a JSON-encoded
    string, SQL JSON paths cannot descend into it and every project would
    show only the fallback values.

    Returns:
        A list of dicts, newest ``updated_at`` first, with the keys ``id``,
        ``name``, ``updated_at``, ``productName``, ``productCategory``,
        ``productDescription`` and ``sourceUrl``. Missing metadata is
        replaced by a fallback (the project name or a fixed placeholder).
    """
    conn = get_db_connection()
    try:
        rows = conn.execute(
            'SELECT id, name, updated_at, data FROM gtm_projects ORDER BY updated_at DESC'
        ).fetchall()
        project_list = []
        for row in rows:
            metadata = _phase1_metadata(row['data'])
            project_dict = {
                'id': row['id'],
                'name': row['name'],
                'updated_at': row['updated_at'],
                'productName': metadata.get('model_name'),
                'productCategory': metadata.get('category'),
                'productDescription': metadata.get('description'),
                'sourceUrl': metadata.get('manufacturer_url'),
            }
            if project_dict['productName'] is None:
                project_dict['productName'] = project_dict['name']  # Fallback to project name
            if project_dict['productCategory'] is None:
                project_dict['productCategory'] = "Uncategorized"  # Default category
            if project_dict['productDescription'] is None:
                project_dict['productDescription'] = "No description available."  # Default description
            if project_dict['sourceUrl'] is None:
                project_dict['sourceUrl'] = "No source URL found."  # Default URL
            project_list.append(project_dict)
        return project_list
    finally:
        conn.close()
def delete_project(project_id):
    """Remove the project row with the given ID.

    The same status dict is returned whether or not such a row existed.

    Returns:
        ``{"status": "deleted", "id": project_id}``.
    """
    connection = get_db_connection()
    try:
        connection.execute('DELETE FROM gtm_projects WHERE id = ?', (project_id,))
        connection.commit()
        outcome = {"status": "deleted", "id": project_id}
        return outcome
    finally:
        connection.close()
if __name__ == "__main__":
    # Simple CLI for testing and potential Node.js bridge
    # Usage: python gtm_db_manager.py [init|create|save|load|list|delete] [args...]
    # Every outcome, including usage errors, is reported as one JSON object on
    # stdout so a caller parsing the output never receives a bare traceback.
    import sys

    def _exit_with_error(message):
        """Print a JSON error object and stop with exit status 1."""
        print(json.dumps({"error": message}))
        sys.exit(1)

    # Number of positional arguments each mode needs after the mode name.
    required_args = {"save": 3, "load": 1, "delete": 1}

    if len(sys.argv) < 2:
        _exit_with_error("Mode is required.")
    mode = sys.argv[1]
    cli_args = sys.argv[2:]
    if len(cli_args) < required_args.get(mode, 0):
        _exit_with_error(f"Mode '{mode}' requires {required_args[mode]} argument(s).")

    if mode == "init":
        init_gtm_db()
        print(json.dumps({"status": "GTM database initialized"}))
    elif mode == "create":
        project_name = cli_args[0] if cli_args else "Untitled GTM Project"
        print(json.dumps(create_project(project_name)))
    elif mode == "save":
        project_id, phase, result_json = cli_args[:3]
        try:
            result = json.loads(result_json)
        except json.JSONDecodeError as exc:
            _exit_with_error(f"Invalid result JSON: {exc}")
        print(json.dumps(save_gtm_result(project_id, phase, result)))
    elif mode == "load":
        project = get_project_data(cli_args[0])
        print(json.dumps(project if project else {"error": "Project not found"}))
    elif mode == "list":
        print(json.dumps(get_all_projects()))
    elif mode == "delete":
        print(json.dumps(delete_project(cli_args[0])))
    else:
        print(json.dumps({"error": f"Unknown mode: {mode}"}))

120
market_db_manager.py Normal file
View File

@@ -0,0 +1,120 @@
import sqlite3
import json
import os
import uuid
from datetime import datetime
# Location of the market-intelligence database file; the DB_PATH environment
# variable overrides the built-in default.
DB_PATH = os.environ.get("DB_PATH", "/app/market_intelligence.db")


def get_db_connection():
    """Open the SQLite database at ``DB_PATH``.

    Returns:
        A ``sqlite3.Connection`` whose rows are ``sqlite3.Row`` objects, so
        columns can be read by name. The caller is responsible for closing it.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the ``projects`` table if it does not exist yet.

    Raises:
        sqlite3.Error: If the table cannot be created. The connection is
            closed in that case as well.
    """
    conn = get_db_connection()
    try:
        # Flexible schema: We store almost everything in a 'data' JSON column
        conn.execute('''
            CREATE TABLE IF NOT EXISTS projects (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                data JSON NOT NULL
            )
        ''')
        conn.commit()
    finally:
        # Release the handle even when the statement fails, matching
        # save_project and delete_project.
        conn.close()
def save_project(project_data):
    """
    Saves a project. If 'id' exists in data, updates it. Otherwise creates new.

    When the given ID has no row yet (for example after the database file was
    replaced), the project is inserted under that ID instead of being
    silently dropped by an UPDATE that matches nothing.

    Args:
        project_data: Project document. ``name`` or ``companyName`` supplies
            the list-view name. When no ``id`` is present a UUID is
            generated and written into ``project_data``.

    Returns:
        ``{"id": <project id>, "status": "saved"}`` on success, or
        ``{"error": <message>}`` if anything fails.
    """
    conn = get_db_connection()
    try:
        project_id = project_data.get('id')
        # Extract a name for the list view (e.g. from companyName or referenceUrl)
        # We assume the frontend passes a 'name' field, or we derive it.
        name = project_data.get('name') or project_data.get('companyName') or "Untitled Project"
        insert_sql = 'INSERT INTO projects (id, name, data) VALUES (?, ?, ?)'

        if not project_id:
            # Create New
            project_id = str(uuid.uuid4())
            project_data['id'] = project_id
            conn.execute(insert_sql, (project_id, name, json.dumps(project_data)))
        else:
            # Update Existing
            serialized = json.dumps(project_data)
            cursor = conn.execute(
                '''UPDATE projects
                   SET name = ?, data = ?, updated_at = CURRENT_TIMESTAMP
                   WHERE id = ?''',
                (name, serialized, project_id)
            )
            if cursor.rowcount == 0:
                # Unknown ID: nothing was updated, so store it as a new row.
                conn.execute(insert_sql, (project_id, name, serialized))
        conn.commit()
        return {"id": project_id, "status": "saved"}
    except Exception as e:
        # Best-effort contract for the CLI bridge: report failures as data.
        return {"error": str(e)}
    finally:
        conn.close()
def get_all_projects():
    """List every project's ``id``, ``name``, ``created_at`` and ``updated_at``.

    Returns:
        A list of dicts, most recently updated first.

    Raises:
        sqlite3.Error: If the query fails (for example when the table has
            not been created yet). The connection is closed in that case
            as well.
    """
    conn = get_db_connection()
    try:
        projects = conn.execute(
            'SELECT id, name, created_at, updated_at FROM projects ORDER BY updated_at DESC'
        ).fetchall()
    finally:
        # Release the handle even when the query fails, matching
        # save_project and delete_project.
        conn.close()
    return [dict(row) for row in projects]
def load_project(project_id):
    """Load the decoded JSON document of one project.

    Returns:
        The parsed content of the row's ``data`` column, or ``None`` when no
        project has ``project_id``.

    Raises:
        sqlite3.Error: If the query fails. The connection is closed in that
            case as well.
    """
    conn = get_db_connection()
    try:
        project = conn.execute('SELECT data FROM projects WHERE id = ?', (project_id,)).fetchone()
    finally:
        # Release the handle even when the query fails, matching
        # save_project and delete_project.
        conn.close()
    if project:
        return json.loads(project['data'])
    return None
def delete_project(project_id):
    """Remove the project row with the given ID.

    Returns:
        ``{"status": "deleted", "id": project_id}`` when the statement ran
        (whether or not a row matched), or ``{"error": <message>}`` if it
        raised.
    """
    db = get_db_connection()
    try:
        db.execute('DELETE FROM projects WHERE id = ?', (project_id,))
        db.commit()
    except Exception as exc:
        outcome = {"error": str(exc)}
    else:
        outcome = {"status": "deleted", "id": project_id}
    finally:
        db.close()
    return outcome
if __name__ == "__main__":
    import sys
    # Simple CLI for Node.js bridge
    # Usage: python market_db_manager.py [init|list|save|load|delete] [args...]
    # Every outcome, including usage errors, is reported as one JSON object on
    # stdout so the caller never has to parse a traceback or empty output.
    cli_args = sys.argv[1:]
    if not cli_args:
        print(json.dumps({"error": "Mode is required."}))
        sys.exit(1)
    mode = cli_args[0]
    # save, load and delete each need one positional argument after the mode.
    if mode in ("save", "load", "delete") and len(cli_args) < 2:
        print(json.dumps({"error": f"Mode '{mode}' requires an argument."}))
        sys.exit(1)

    if mode == "init":
        init_db()
        print(json.dumps({"status": "initialized"}))
    elif mode == "list":
        print(json.dumps(get_all_projects()))
    elif mode == "save":
        # Data is passed as a JSON string file path to avoid command line length limits
        data_file = cli_args[1]
        # JSON interchange files are UTF-8; do not depend on the locale default.
        with open(data_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
        print(json.dumps(save_project(data)))
    elif mode == "load":
        p_id = cli_args[1]
        result = load_project(p_id)
        print(json.dumps(result if result else {"error": "Project not found"}))
    elif mode == "delete":
        p_id = cli_args[1]
        print(json.dumps(delete_project(p_id)))
    else:
        print(json.dumps({"error": f"Unknown mode: {mode}"}))

View File

@@ -0,0 +1,89 @@
import os
import sys
def check_path(path, description, context_dir="."):
    """Check that the host side of a compose path entry exists on disk.

    Args:
        path: A path or ``host:container[:mode]`` mapping as written in the
            compose file; surrounding quotes and whitespace are ignored.
        description: Label used in the printed FOUND/MISSING line.
        context_dir: Directory that relative host paths are resolved
            against. The default ``"."`` is the current working directory.

    Returns:
        ``True`` if the host path exists or the entry is not a filesystem
        path at all (named volume, plain setting); ``False`` if it is missing.
    """
    # Cleanup path string
    path = path.strip().strip('"').strip("'")

    # Only entries starting with "." or "/" are host paths. That includes
    # dot-files such as ".env" or ".htpasswd", which cannot be named volumes
    # because volume names start with an alphanumeric character.
    if not path.startswith(".") and not path.startswith("/"):
        # Assume named volume or config setting
        return True

    # Split host:container mapping
    host_path_raw = path.split(":")[0]

    # Absolute paths are used as written; everything else is resolved
    # relative to context_dir.
    if host_path_raw.startswith("/"):
        host_path = host_path_raw
    else:
        host_path = os.path.abspath(os.path.join(context_dir, host_path_raw))

    if os.path.exists(host_path):
        print(f"✅ FOUND: {description} -> {host_path_raw}")
        return True
    else:
        print(f"❌ MISSING: {description} -> {host_path_raw}")
        return False
def _strip_inline_comment(value):
    """Drop a trailing ``# comment`` and surrounding whitespace and quotes."""
    return value.split(" #", 1)[0].strip().strip('"').strip("'")


def _normalize_host_path(value, assume_path):
    """Rewrite a compose entry so that ``check_path`` treats it as a host path.

    Entries already starting with ``/``, ``./`` or ``..`` are returned as is.
    Dot-files (``.env``) always get a ``./`` prefix. Other relative values
    get the prefix only when ``assume_path`` is true and they are neither a
    URL nor a ``$`` variable reference.
    """
    if value.startswith(("/", "./", "..")):
        return value
    if value.startswith("."):
        return "./" + value
    if assume_path and value and "://" not in value and "$" not in value:
        return "./" + value
    return value


def validate_compose_text(compose_file="docker-compose.yml"):
    """Scan a compose file as text and verify that referenced host paths exist.

    The file is not parsed as YAML. Services, build contexts, ``.env``
    entries and volume lists are recognised from indentation and simple
    line patterns, and each host path is passed to ``check_path``.

    Args:
        compose_file: Path of the compose file to scan.

    Returns:
        ``True`` if every checked path exists, ``False`` if the compose file
        or any referenced path is missing.
    """
    print("--- 🚀 Starting Pre-Flight Check (Text-Based) ---")
    if not os.path.exists(compose_file):
        print("❌ CRITICAL: docker-compose.yml not found!")
        return False
    # Explicit encoding: the scan must not depend on the locale default when
    # the file contains non-ASCII comments.
    with open(compose_file, "r", encoding="utf-8") as f:
        lines = f.readlines()

    current_service = "Unknown"
    all_valid = True
    top_section = None      # name of the enclosing column-0 key
    service_indent = None   # indentation of service names, learned from the file
    volumes_indent = None   # indentation of the active `volumes:` key, if any

    for raw_line in lines:
        line = raw_line.rstrip()
        clean_line = line.strip()
        if not clean_line or clean_line.startswith("#"):
            continue
        indent = len(line) - len(line.lstrip(" "))
        is_list_item = clean_line.startswith("-")

        # Column-0 keys switch the top-level section (services, volumes, ...).
        if indent == 0 and not is_list_item:
            top_section = clean_line.split(":", 1)[0].strip()
            volumes_indent = None
            continue
        if top_section != "services":
            continue

        # Detect Service Block: the first nesting level below `services:`.
        if service_indent is None:
            service_indent = indent
        if indent == service_indent and not is_list_item and ":" in clean_line:
            current_service = clean_line.split(":", 1)[0].strip()
            print(f"\nScanning Service: [{current_service}]")
            volumes_indent = None
            continue

        # A sibling key or an outdented list item ends the active volume list.
        if volumes_indent is not None:
            if (not is_list_item and indent <= volumes_indent) or (is_list_item and indent < volumes_indent):
                volumes_indent = None

        # Check Volume Entries
        if volumes_indent is not None and is_list_item:
            entry = _strip_inline_comment(clean_line[1:])
            if ":" in entry:
                if not check_path(_normalize_host_path(entry, False), f"Volume ({current_service})"):
                    all_valid = False
            continue

        # Check Context
        if "context:" in clean_line:
            context_path = _strip_inline_comment(clean_line.split("context:", 1)[1])
            if not check_path(_normalize_host_path(context_path, True), f"Build Context ({current_service})"):
                all_valid = False

        # Check Env File: use the entry as written (".env", ".env.prod", ...).
        if is_list_item:
            entry = _strip_inline_comment(clean_line[1:])
            if entry.startswith(".env"):
                if not check_path(_normalize_host_path(entry, True), f"Env File ({current_service})"):
                    all_valid = False

        # Check Volumes Block Start
        if clean_line.startswith("volumes:"):
            volumes_indent = indent

    print("\n--- 🏁 Result ---")
    if all_valid:
        print("✅ READY FOR TAKEOFF: All referenced files and directories exist.")
    else:
        print("❌ ABORT: Missing files detected. Migration would fail.")
    return all_valid


if __name__ == "__main__":
    # Non-zero exit status lets shell scripts and CI stop on a failed check.
    sys.exit(0 if validate_compose_text() else 1)