feat(gtm-architect): Finalize migration and implement web scraping

- Refactors the gtm-architect Dockerfile for a flat, more efficient build process.
- Implements robust web scraping via BeautifulSoup in helpers.py for URL analysis in phase1.
- Makes shared library imports (gspread, pandas, etc.) in helpers.py optional to prevent ModuleNotFoundErrors in microservices.
- Implements the main execution logic in the orchestrator to handle command-line arguments.
- Updates documentation to reflect the new architecture, scraping feature, and dependency handling.
This commit is contained in:
2026-01-03 08:43:53 +00:00
parent 2663d85ae7
commit 302a211239
7 changed files with 282 additions and 64 deletions

View File

@@ -14,7 +14,7 @@ import gtm_db_manager as db_manager
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from helpers import call_gemini_flash
from helpers import call_gemini_flash, scrape_website_details
LOG_DIR = "Log_from_docker"
if not os.path.exists(LOG_DIR):
@@ -46,35 +46,92 @@ def log_and_save(project_id, step_name, data_type, content):
except Exception as e:
logging.error(f"Failed to save {data_type} to file: {e}")
def get_text_from_url(url):
    """Fetch a web page and return its visible text, capped at 30000 characters.

    Markup that carries no readable content (scripts, styles, navigation,
    header/footer and similar elements) is removed before the text is
    extracted.

    Args:
        url: Address of the page to download.

    Returns:
        The extracted text (at most 30000 characters), or an empty string
        when anything goes wrong while fetching or parsing. Failures are
        logged, never raised.
    """
    browser_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    # Elements whose content is boilerplate rather than page text.
    noise_tags = ['script', 'style', 'noscript', 'iframe', 'svg', 'header', 'footer', 'nav', 'aside']

    try:
        logging.info(f"Scraping URL: {url}")
        page = requests.get(url, headers=browser_headers, timeout=15)
        page.raise_for_status()

        document = BeautifulSoup(page.content, 'html.parser')
        for tag in document.find_all(noise_tags):
            tag.decompose()

        visible_text = document.get_text(separator=' ', strip=True)
        logging.info(f"Scraping successful. Content length: {len(visible_text)}")
    except Exception as e:
        # Best-effort scraping: callers get an empty string instead of an error.
        logging.error(f"Scraping failed for URL {url}: {e}")
        return ""

    return visible_text[:30000]
def get_system_instruction(lang):
    """Return the system prompt that sets up the GTM expert persona.

    Args:
        lang: Language code. 'de' selects the German prompt; every other
            value falls back to the English prompt.

    Returns:
        The multi-line system instruction string for the model.
    """
    if lang == 'de':
        return """
Du bist ein internationaler Go-to-Market (GTM) Experte für B2B-Technologie-Unternehmen im Bereich Robotik, Facility Management und IoT.
Deine Aufgabe ist es, aus technischen Spezifikationen und Produktbeschreibungen eine umfassende GTM-Strategie zu entwickeln.
Du arbeitest strukturiert, datengetrieben und präzise. Deine Antworten sind immer klar, professionell und direkt auf den Punkt.
Wenn du JSON ausgeben sollst, gib NUR das JSON-Objekt aus, ohne umschließende Text- oder Code-Formatierungen.
Behalte während des gesamten Prozesses eine konsistente Logik bei. Alle Phasen bauen aufeinander auf.
Führe eine interne Plausibilitätsprüfung durch, bevor du eine Antwort gibst.
Verwende "Wackler Symbiosis" als internes Framework für die Analyse von Produkt-Synergien.
Nutze das "Hybrid Service Logic" Konzept, um zu bewerten, ob ein Produkt mit einer Dienstleistung kombiniert werden muss (z.B. bei hohen Wartungsanforderungen).
"""

    # Default to English for any language other than German.
    return """
You are an international Go-to-Market (GTM) expert for B2B technology companies in robotics, facility management, and IoT.
Your task is to develop a comprehensive GTM strategy from technical specifications and product descriptions.
You are structured, data-driven, and precise. Your answers are always clear, professional, and to the point.
When asked to output JSON, provide ONLY the JSON object without any surrounding text or code formatting.
Maintain consistent logic throughout the process. All phases build on each other.
Perform an internal plausibility check before providing an answer.
Use "Wackler Symbiosis" as an internal framework for analyzing product synergies.
Use the "Hybrid Service Logic" concept to evaluate if a product needs to be combined with a service (e.g., due to high maintenance requirements).
"""
# --- ORCHESTRATOR PHASES ---
def phase1(payload):
    """Run phase 1: product analysis and constraints.

    If the product input is an http(s) URL, the page is scraped and its
    content is analysed; otherwise the input text is analysed directly.
    The prompt, the raw model response and (on success) the parsed result
    are persisted via `log_and_save` / `db_manager.save_gtm_result`.

    Args:
        payload: Dict read with the keys 'productInput' (text or URL),
            'lang' (defaults to 'de') and 'projectId'.

    Returns:
        The parsed JSON result from the model, or a dict with 'error' and
        'details' keys when the model response is not valid JSON.
    """
    # `or ''` also covers a payload that carries an explicit null productInput.
    product_input = payload.get('productInput') or ''
    lang = payload.get('lang', 'de')
    project_id = payload.get('projectId')

    # Only real http(s) URLs are scraped; free text that merely begins with
    # the letters "http" is analysed as text.
    candidate_url = product_input.strip()
    if candidate_url.lower().startswith(('http://', 'https://')):
        logging.info(f"Input detected as URL. Starting scrape for: {candidate_url}")
        analysis_content = scrape_website_details(candidate_url)
        # NOTE(review): assumes scrape_website_details reports failure through a
        # "Fehler:" marker in its return string - confirm against helpers.py.
        # A non-string result is treated as a failed scrape as well.
        if not isinstance(analysis_content, str) or "Fehler:" in analysis_content:
            # If scraping fails, use the URL itself with a note for the AI.
            analysis_content = f"Scraping der URL {candidate_url} ist fehlgeschlagen. Analysiere das Produkt basierend auf der URL und deinem allgemeinen Wissen."
            logging.warning("Scraping failed. Using URL as fallback content for analysis.")
    else:
        analysis_content = product_input
        logging.info("Input is raw text. Analyzing directly.")

    sys_instr = get_system_instruction(lang)
    prompt = f"""
PHASE 1: PRODUCT ANALYSIS & CONSTRAINTS
Input: "{analysis_content}"
Task: 1. Extract technical features. 2. Define hard constraints. 3. Check for internal portfolio conflicts (hypothetical product "Scrubber 5000").
Output JSON format ONLY: {{"features": [], "constraints": [], "conflictCheck": {{"hasConflict": false, "details": "", "relatedProduct": ""}}, "rawAnalysis": ""}}
"""
    log_and_save(project_id, "phase1", "prompt", prompt)
    response = call_gemini_flash(prompt, system_instruction=sys_instr, json_mode=True)
    log_and_save(project_id, "phase1", "response", response)

    try:
        data = json.loads(response)
    except json.JSONDecodeError:
        logging.error(f"Failed to decode JSON from Gemini response in phase1: {response}")
        # Return a structured error that the frontend can display
        return {
            "error": "Die Antwort des KI-Modells war kein gültiges JSON. Das passiert manchmal bei hoher Auslastung. Bitte versuchen Sie es in Kürze erneut.",
            "details": response,
        }

    db_manager.save_gtm_result(project_id, 'phase1_result', json.dumps(data))
    return data
def phase2(payload):
    """Run phase 2: ideal customer profiles (ICPs) and data proxies.

    Builds the prompt from the phase 1 result, sends it to the model and
    persists the prompt, the raw response and the parsed result via
    `log_and_save` / `db_manager.save_gtm_result`.

    Args:
        payload: Dict read with the keys 'phase1Data' (defaults to an empty
            dict), 'lang' (defaults to 'de') and 'projectId'.

    Returns:
        The parsed JSON result from the model.

    Raises:
        json.JSONDecodeError: If the model response is not valid JSON; this
            function does not handle it itself.
    """
    phase1_data = payload.get('phase1Data', {})
    lang = payload.get('lang', 'de')
    project_id = payload.get('projectId')

    sys_instr = get_system_instruction(lang)
    prompt = f"""
PHASE 2: IDEAL CUSTOMER PROFILE (ICP) & DATA PROXIES
Product Context: {json.dumps(phase1_data)}
Task: 1. Identify top 3 ICPs (Ideal Customer Profiles/Industries). 2. Define data proxies for identifying these ICPs online.
Output JSON format ONLY: {{"icps": [{{"name": "", "rationale": ""}}], "dataProxies": [{{"target": "", "method": ""}}]}}
"""
    log_and_save(project_id, "phase2", "prompt", prompt)
    response = call_gemini_flash(prompt, system_instruction=sys_instr, json_mode=True)
    log_and_save(project_id, "phase2", "response", response)

    data = json.loads(response)
    db_manager.save_gtm_result(project_id, 'phase2_result', json.dumps(data))
    return data
def phase3(payload):
phase2_data = payload.get('phase2Data', {})
@@ -241,8 +298,58 @@ def image(payload):
return {"imageBase64": ""}
def main():
    """Main entry point of the script.

    Parses the command-line arguments, decodes the Base64-encoded JSON
    payload and dispatches to the function registered for the requested
    mode. The result - or a JSON error object - is printed to stdout so the
    calling server can parse it.

    Command-line arguments:
        --mode: Execution mode (e.g. phase1, phase2, translate, image).
        --payload_base64: The Base64 encoded JSON payload.

    Exits with status 1 when the payload cannot be decoded, the mode is
    unknown, or the selected mode raises an exception.
    """
    parser = argparse.ArgumentParser(description="GTM Architect Orchestrator")
    parser.add_argument("--mode", required=True, help="The execution mode (e.g., phase1, phase2).")
    parser.add_argument("--payload_base64", required=True, help="The Base64 encoded JSON payload.")
    args = parser.parse_args()

    try:
        payload_str = base64.b64decode(args.payload_base64).decode('utf-8')
        payload = json.loads(payload_str)
    except ValueError as e:
        # binascii.Error (bad Base64), UnicodeDecodeError (bytes are not UTF-8)
        # and json.JSONDecodeError (not JSON) are all ValueError subclasses, so
        # every malformed payload is reported through the same JSON error.
        logging.error(f"Failed to decode payload: {e}")
        # Print error as JSON to stdout for the server to catch
        print(json.dumps({"error": "Invalid payload format.", "details": str(e)}))
        sys.exit(1)

    # Function mapping to dynamically call the correct phase
    modes = {
        "phase1": phase1,
        "phase2": phase2,
        "phase3": phase3,
        "phase4": phase4,
        "phase5": phase5,
        "phase6": phase6,
        "phase7": phase7,
        "phase8": phase8,
        "phase9": phase9,
        "translate": translate,
        "image": image,
    }

    mode_function = modes.get(args.mode)
    if not mode_function:
        logging.error(f"Invalid mode specified: {args.mode}")
        print(json.dumps({"error": f"Invalid mode: {args.mode}"}))
        sys.exit(1)

    try:
        logging.info(f"Executing mode: {args.mode}")
        result = mode_function(payload)
        # Ensure the output is always a JSON string
        print(json.dumps(result, ensure_ascii=False))
        logging.info(f"Successfully executed mode: {args.mode}")
    except Exception as e:
        # Top-level boundary: report any failure as JSON and signal it via the exit code.
        logging.error(f"An error occurred during execution of mode '{args.mode}': {e}", exc_info=True)
        print(json.dumps({"error": f"An error occurred in {args.mode}.", "details": str(e)}))
        sys.exit(1)
# Run the orchestrator only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()