Files
Brancheneinstufung2/competitor-analysis-app/competitor_analysis_orchestrator.py
Floke b5e6c415c7 feat(market-intel): Finalize Level 4 Competitive Radar (Semantics & Relations)
- Implemented semantic classification for Products (e.g. 'Cleaning', 'Logistics') and Battlecards (e.g. 'Price', 'Support').
- Created 'import_competitive_radar.py' for full 4-database relational import to Notion.
- Updated Orchestrator with new prompts for structured output.
- Cleaned up obsolete scripts.
2026-01-11 12:54:12 +00:00

688 lines
29 KiB
Python

import os
import json
import asyncio
import logging
import random
import time
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
from urllib.parse import urljoin, urlparse
# --- DEPENDENCIES ---
import requests
from bs4 import BeautifulSoup
from serpapi import GoogleSearch
# --- LOGGING SETUP ---
# Logging is configured before anything else logs. A module-level logging call
# on an unconfigured root logger installs a default handler while the root
# level is still WARNING, so the INFO messages emitted during SDK detection
# below would otherwise be dropped.
log_dir = "/app/Log_from_docker"
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "competitor_analysis_debug.log")
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        # Explicit UTF-8: messages logged by this file contain emoji and umlauts.
        logging.FileHandler(log_file, encoding="utf-8"),
        logging.StreamHandler(),
    ],
    # Replace any handlers already attached to the root logger.
    force=True,
)
logging.info("🚀 System started. Logging to {}".format(log_file))

# --- DUAL SDK IMPORTS ---
# Both Gemini SDKs are optional; the flags record which ones could be imported.
HAS_NEW_GENAI = False
HAS_OLD_GENAI = False
try:
    from google import genai
    from google.genai import types
    HAS_NEW_GENAI = True
    logging.info("✅ SUCCESS: Loaded 'google-genai' SDK.")
except ImportError:
    logging.warning("⚠️ WARNING: 'google-genai' not found. Fallback.")
# NOTE(review): the legacy import is treated as independent of the one above
# (both SDKs may be loaded at once) because call_gemini_robustly checks both
# flags -- confirm this matches the intended layout.
try:
    import google.generativeai as old_genai
    HAS_OLD_GENAI = True
    logging.info("✅ SUCCESS: Loaded legacy 'google.generativeai' SDK.")
except ImportError:
    logging.warning("⚠️ WARNING: Legacy 'google.generativeai' not found.")

# Load environment variables
load_dotenv()
API_KEY = os.getenv("GEMINI_API_KEY")
SERPAPI_KEY = os.getenv("SERPAPI_KEY")

# Robust API Key Loading: fall back to a key file when the variable is unset.
if not API_KEY:
    key_file_path = "/app/gemini_api_key.txt"
    if os.path.exists(key_file_path):
        with open(key_file_path, 'r') as f:
            API_KEY = f.read().strip()
if not API_KEY:
    raise ValueError("GEMINI_API_KEY not set.")

# Configure SDKs (only the legacy SDK takes a process-wide key; the modern
# client receives the key when it is constructed).
if HAS_OLD_GENAI:
    old_genai.configure(api_key=API_KEY)

app = FastAPI()
# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# site issue credentialed requests -- restrict allow_origins if this service is
# reachable from untrusted networks.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
# --- CORE SCRAPING & AI LOGIC ---
def scrape_text_from_url(url: str) -> str:
    """Download *url* and return its visible text as one space-joined string.

    ``script``, ``style``, ``nav``, ``footer`` and ``aside`` elements are
    removed before the text is extracted. This is a blocking call.

    Returns:
        The extracted text, or ``""`` when the request or parsing fails for
        any reason (the failure is logged together with the URL).
    """
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
    try:
        # NOTE(review): verify=False disables TLS certificate validation --
        # confirm this is intentional for the sites being scraped.
        response = requests.get(url, headers=headers, timeout=10, verify=False)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')
        for element in soup(['script', 'style', 'nav', 'footer', 'aside']):
            element.decompose()
        return ' '.join(soup.stripped_strings)
    except Exception as e:
        # Best-effort by design: one broken page must not abort the whole
        # scrape. The URL is logged so that failures can be attributed when
        # several pages are fetched in parallel.
        logging.warning("Failed to scrape {}: {}".format(url, e))
        return ""
# Marker placed between the texts of individual scraped pages.
_PAGE_SEPARATOR = "\n\n" + "-" * 8 + " SEITE " + "-" * 8 + "\n\n"
# Upper bound on pages fetched per discovery run, to prevent timeouts.
_MAX_PAGES = 5
# Upper bound on the characters returned, to limit the LLM context size.
_MAX_CONTEXT_CHARS = 50000


def _same_domain_links(start_url, link_keywords, match_link_text=False):
    """Return links on the page at *start_url* that match a keyword (blocking).

    A link matches when a keyword occurs in its lower-cased ``href`` or, if
    *match_link_text* is true, in its lower-cased link text. Only links that
    resolve to the same network location as *start_url* are returned, in
    document order.
    """
    base_domain = urlparse(start_url).netloc
    response = requests.get(start_url, timeout=10, verify=False)
    soup = BeautifulSoup(response.content, 'html.parser')
    links = []
    for anchor in soup.find_all('a', href=True):
        href = anchor['href']
        haystacks = [href.lower()]
        if match_link_text:
            haystacks.append(anchor.get_text().lower())
        if not any(k in text for k in link_keywords for text in haystacks):
            continue
        full_url = urljoin(start_url, href)
        if urlparse(full_url).netloc == base_domain:
            links.append(full_url)
    return links


def _serpapi_links(search_query):
    """Return the organic result links SerpAPI reports for *search_query* (blocking)."""
    params = {"engine": "google", "q": search_query, "api_key": SERPAPI_KEY}
    results = GoogleSearch(params).get_dict()
    return [r["link"] for r in results.get("organic_results", []) if r.get("link")]


def _select_urls(candidates):
    """De-duplicate *candidates* in order and keep at most ``_MAX_PAGES`` of them."""
    return list(dict.fromkeys(candidates))[:_MAX_PAGES]


async def _scrape_pages(urls):
    """Scrape *urls* in worker threads and join the non-empty page texts.

    Returns ``""`` when no page produced any text, so callers can detect the
    "nothing scraped" case with a simple truthiness check.
    """
    pages = await asyncio.gather(*(asyncio.to_thread(scrape_text_from_url, u) for u in urls))
    return _PAGE_SEPARATOR.join(p for p in pages if p)[:_MAX_CONTEXT_CHARS]


async def discover_and_scrape_website(start_url: str) -> str:
    """Collect product/solution pages of a website and return their text.

    Candidate pages are the start page itself, same-domain links on it whose
    ``href`` contains a product/industry keyword, and (when ``SERPAPI_KEY`` is
    set) the results of a ``site:`` search. The start page always comes first,
    so it is never lost to the page cap. Failures of either discovery step are
    logged and do not abort the run.

    Returns:
        The page texts joined by a separator line and truncated to 50,000
        characters, or ``""`` if *start_url* is empty or nothing was scraped.
    """
    logging.info("Starting discovery for website: {}".format(start_url))
    if not start_url:
        return ""
    base_domain = urlparse(start_url).netloc
    candidates = [start_url]
    try:
        link_keywords = ['product', 'solution', 'industrie', 'branche', 'lösung', 'anwendung']
        # The HTTP request blocks, so it runs in a worker thread to keep the
        # event loop free for the other competitors analysed in parallel.
        candidates += await asyncio.to_thread(_same_domain_links, start_url, link_keywords)
    except Exception as e:
        logging.error("Failed homepage links for {}: {}".format(start_url, e))
    if SERPAPI_KEY:
        try:
            search_query = 'site:{} (produkte OR solutions OR branchen)'.format(base_domain)
            candidates += await asyncio.to_thread(_serpapi_links, search_query)
        except Exception as e:
            logging.error("SerpAPI failed for {}: {}".format(start_url, e))
    urls_list = _select_urls(candidates)
    logging.debug("Scraping URLs for {}: {}".format(start_url, urls_list))
    return await _scrape_pages(urls_list)


async def discover_and_scrape_references_page(start_url: str) -> str:
    """Collect reference / case-study pages of a website and return their text.

    Works like ``discover_and_scrape_website`` but looks for customer-reference
    keywords, matching them against both the ``href`` and the link text.

    Returns:
        The page texts joined by a separator line and truncated to 50,000
        characters, or ``""`` if *start_url* is empty or nothing was scraped.
    """
    logging.info("Starting reference discovery for website: {}".format(start_url))
    if not start_url:
        return ""
    base_domain = urlparse(start_url).netloc
    candidates = [start_url]  # Fallback
    # 1. Direct Search on Homepage
    try:
        link_keywords = ['referenz', 'kunde', 'case', 'erfolg', 'anwenderbericht', 'customer']
        candidates += await asyncio.to_thread(_same_domain_links, start_url, link_keywords, True)
    except Exception as e:
        logging.error("Failed to find reference links on {}: {}".format(start_url, e))
    # 2. SerpAPI Search if key is available
    if SERPAPI_KEY:
        try:
            search_query = 'site:{} (Referenzen OR "Case Studies" OR Kundenstimmen OR Erfolgsgeschichten)'.format(base_domain)
            candidates += await asyncio.to_thread(_serpapi_links, search_query)
        except Exception as e:
            logging.error("SerpAPI for references failed for {}: {}".format(start_url, e))
    urls_list = _select_urls(candidates)
    logging.debug("Scraping reference URLs for {}: {}".format(start_url, urls_list))
    return await _scrape_pages(urls_list)
def parse_json_response(response_text: str) -> Any:
    """Parse a model response into JSON, tolerating Markdown code fences.

    A response wrapped in a fenced code block (opening fence with optional
    language tag, closing fence) is unwrapped first. If the decoded value is a
    list, only its first element is returned.

    Returns:
        The decoded JSON value, or ``{}`` when the input is empty or
        whitespace-only, decodes to an empty list, or cannot be parsed. This
        function never raises.
    """
    try:
        cleaned_text = response_text.strip() if response_text else ""
        if not cleaned_text:
            # Nothing to parse is not an error; skip the failure log.
            return {}
        if cleaned_text.startswith("```"):
            # Drop the opening fence line and, if present, the closing one.
            lines = cleaned_text.splitlines()[1:]
            if lines and lines[-1].strip().startswith("```"):
                lines = lines[:-1]
            cleaned_text = "\n".join(lines).strip()
        result = json.loads(cleaned_text)
    except Exception as e:
        # Deliberately broad: callers rely on always getting a value back.
        logging.error("CRITICAL: Failed JSON: {}".format(e))
        return {}
    if isinstance(result, list):
        # Callers expect a single object; an empty list means "no data" and is
        # reported the same way as every other empty outcome.
        return result[0] if result else {}
    return result
async def call_gemini_robustly(prompt: str, schema: dict):
    """Send *prompt* to Gemini and return the parsed JSON answer.

    The legacy ``google.generativeai`` SDK is tried first when it is loaded;
    if it is missing or fails, the modern ``google-genai`` SDK is tried.

    Args:
        prompt: The full prompt text.
        schema: Response schema passed to the model; skipped when falsy.

    Returns:
        The value produced by ``parse_json_response`` for the model output.

    Raises:
        HTTPException: Status 500 when the modern SDK call fails, when the
            legacy call failed and no modern SDK is loaded, or when no SDK is
            available at all.
    """
    last_err = None
    if HAS_OLD_GENAI:
        try:
            logging.debug("Attempting Legacy SDK gemini-2.0-flash")
            gen_config = {"temperature": 0.3, "response_mime_type": "application/json", "max_output_tokens": 8192}
            if schema:
                gen_config["response_schema"] = schema
            model = old_genai.GenerativeModel('gemini-2.0-flash', generation_config=gen_config)
            logging.debug("PROMPT: {}".format(prompt[:500]))
            response = await model.generate_content_async(prompt)
            logging.debug("RESPONSE: {}".format(response.text[:500]))
            return parse_json_response(response.text)
        except Exception as e:
            # Remember the failure so it can be reported if no fallback exists.
            last_err = e
            logging.warning("Legacy failed: {}".format(e))
    if HAS_NEW_GENAI:
        try:
            logging.debug("Attempting Modern SDK gemini-1.5-flash")
            client_new = genai.Client(api_key=API_KEY)
            config_args = {"temperature": 0.3, "response_mime_type": "application/json", "max_output_tokens": 8192}
            if schema:
                config_args["response_schema"] = schema
            # The google-genai SDK takes the generation settings via ``config``
            # (``generation_config`` is the legacy SDK's name and is rejected
            # here). The ``aio`` client is awaited so the event loop is not
            # blocked while the model answers.
            response = await client_new.aio.models.generate_content(
                model='gemini-1.5-flash',
                contents=prompt,
                config=types.GenerateContentConfig(**config_args),
            )
            return parse_json_response(response.text)
        except Exception as e:
            logging.error("Modern SDK failed: {}".format(e))
            raise HTTPException(status_code=500, detail=str(e)) from e
    if last_err is not None:
        # The legacy SDK was loaded but its call failed and there is no modern
        # SDK to fall back to: surface the real error instead of claiming that
        # no SDK is installed.
        raise HTTPException(status_code=500, detail=str(last_err)) from last_err
    raise HTTPException(status_code=500, detail="No Gemini SDK available.")
# --- Schemas ---
# Shared response-schema fragments reused by the endpoint schemas below.

# A source reference: the page URL plus the quoted text found there.
evidence_schema = {
    "type": "object",
    "properties": {
        "url": {"type": "string"},
        "snippet": {"type": "string"},
    },
    "required": ["url", "snippet"],
}

# A product with its purpose and the evidence backing it.
product_schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "purpose": {"type": "string"},
        "evidence": {"type": "array", "items": evidence_schema},
    },
    "required": ["name", "purpose", "evidence"],
}

# A target industry with the evidence backing it.
industry_schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "evidence": {"type": "array", "items": evidence_schema},
    },
    "required": ["name", "evidence"],
}
# --- Endpoints ---
class ProductDetailsRequest(BaseModel):
    """Request body for ``/api/fetchProductDetails``."""

    name: str
    url: str
    language: str


@app.post("/api/fetchProductDetails")
async def fetch_product_details(request: ProductDetailsRequest):
    """Ask the model for a short purpose description of one product.

    The prompt is filled with the request's URL and product name; the answer
    is requested in the shape of ``product_schema``.
    """
    template = r"""Analysiere die URL {} und beschreibe den Zweck von "{}" in 1-2 Sätzen. Antworte JSON."""
    filled_prompt = template.format(request.url, request.name)
    return await call_gemini_robustly(filled_prompt, product_schema)
class FetchStep1DataRequest(BaseModel):
    """Request body for ``/api/fetchStep1Data``."""

    start_url: str
    language: str


@app.post("/api/fetchStep1Data")
async def fetch_step1_data(request: FetchStep1DataRequest):
    """Extract main products and target industries from the scraped website.

    The website behind ``start_url`` is scraped first; its text is embedded in
    the prompt as grounding material.
    """
    response_schema = {
        "type": "object",
        "properties": {
            "products": {"type": "array", "items": product_schema},
            "target_industries": {"type": "array", "items": industry_schema},
        },
        "required": ['products', 'target_industries'],
    }
    template = r"""Extrahiere Hauptprodukte und Zielbranchen aus dem Text.
TEXT:
{}
Antworte JSON."""
    website_text = await discover_and_scrape_website(request.start_url)
    return await call_gemini_robustly(template.format(website_text), response_schema)
class FetchStep2DataRequest(BaseModel):
    """Request body for ``/api/fetchStep2Data``."""

    products: List[Any]
    industries: List[Any]
    language: str


@app.post("/api/fetchStep2Data")
async def fetch_step2_data(request: FetchStep2DataRequest):
    """Derive research keywords from the submitted product names.

    Products may be dicts or objects with a ``name`` attribute; anything else
    is represented by its string form. Entries without a usable name are
    skipped. Only ``request.products`` is used to build the prompt.
    """
    p_names = []
    for p in request.products:
        name = p.get('name') if isinstance(p, dict) else getattr(p, 'name', str(p))
        # A dict without "name" yields None, which would make str.join raise.
        if name:
            p_names.append(str(name))
    prompt = r"""Leite Keywords für Recherche ab: {}. Antworte JSON."""
    schema = {"type": "object", "properties": {"keywords": {"type": "array", "items": {"type": "object", "properties": {"term": {"type": "string"}, "rationale": {"type": "string"}}, "required": ['term', 'rationale']}}}, "required": ['keywords']}
    return await call_gemini_robustly(prompt.format(', '.join(p_names)), schema)
class FetchStep3DataRequest(BaseModel):
    """Request body for ``/api/fetchStep3Data``."""

    keywords: List[Any]
    market_scope: str
    language: str


@app.post("/api/fetchStep3Data")
async def fetch_step3_data(request: FetchStep3DataRequest):
    """Ask the model for competitor candidates in the given market.

    Keywords may be dicts or objects with a ``term`` attribute; anything else
    is represented by its string form. Entries without a usable term are
    skipped.
    """
    k_terms = []
    for k in request.keywords:
        term = k.get('term') if isinstance(k, dict) else getattr(k, 'term', str(k))
        # A dict without "term" yields None, which would make str.join raise.
        if term:
            k_terms.append(str(term))
    prompt = r"""Finde Wettbewerber für Markt {} basierend auf: {}. Antworte JSON."""
    schema = {"type": "object", "properties": {"competitor_candidates": {"type": "array", "items": {"type": "object", "properties": {"name": {"type": "string"}, "url": {"type": "string"}, "confidence": {"type": "number"}, "why": {"type": "string"}, "evidence": {"type": "array", "items": evidence_schema}}, "required": ['name', 'url', 'confidence', 'why', 'evidence']}}}, "required": ['competitor_candidates']}
    return await call_gemini_robustly(prompt.format(request.market_scope, ', '.join(k_terms)), schema)
# --- HELPER: Manual Logging ---
def log_debug(msg):
    """Append *msg* to the debug log file and echo it to stdout.

    The two outputs are independent: a failure to write the file (for example
    a missing log directory) no longer suppresses the stdout copy that ends up
    in the docker logs. This function never raises.
    """
    try:
        # Explicit UTF-8 because messages contain emoji and umlauts.
        with open("/app/Log_from_docker/competitor_analysis_debug.log", "a", encoding="utf-8") as f:
            f.write("{} [MANUAL] {}\n".format(time.strftime("%Y-%m-%d %H:%M:%S"), msg))
    except Exception as e:
        print("Logging failed: {}".format(e))
    try:
        print(msg, flush=True)  # Also to stdout for docker logs
    except Exception as e:
        print("Logging failed: {}".format(e))
async def analyze_single_competitor(competitor: Any, my_company: Any) -> Optional[Dict]:
    """Scrape one competitor's website and have the model analyse it.

    Both arguments may be dicts or objects exposing ``name`` (and, for the
    competitor, ``url``). The scraped text is cut to 15,000 characters before
    it is embedded in the prompt.

    Returns:
        The parsed analysis, or ``None`` when the model returned nothing or
        the call raised.
    """
    if isinstance(competitor, dict):
        competitor_name = competitor.get('name')
        competitor_url = competitor.get('url')
    else:
        competitor_name = getattr(competitor, 'name', 'Unknown')
        competitor_url = getattr(competitor, 'url', '')
    if isinstance(my_company, dict):
        own_name = my_company.get('name')
    else:
        own_name = getattr(my_company, 'name', 'Me')
    log_debug("➡️ Analyzing single competitor: {} ({})".format(competitor_name, competitor_url))

    # 1. Scrape (Grounding)
    scraped = await discover_and_scrape_website(competitor_url) if competitor_url else ""
    # Context truncated to prevent overload (15k chars is approx 3-4k tokens)
    grounding = scraped[:15000] if scraped else "Keine Website-Daten verfügbar."

    # 2. Focused Prompt
    prompt = r"""Du bist Strategie-Berater. Analysiere den Wettbewerber "{c_name}" im Vergleich zu meinem Unternehmen "{my_name}".
DATENBASIS ({c_name}):
{context}
AUFGABE:
Erstelle eine präzise Analyse. Antworte als valides JSON-Objekt (NICHT als Liste).
STANDARD-KATEGORIEN FÜR PRODUKTE:
- "Cleaning (Indoor)"
- "Cleaning (Outdoor)"
- "Transport/Logistics"
- "Service/Gastro"
- "Security/Inspection"
- "Software/Fleet Mgmt"
- "Other"
Struktur:
{{
"competitor": {{ "name": "{c_name}", "url": "{c_url}" }},
"portfolio": [ {{ "product": "...", "purpose": "...", "category": "..." }} ],
"target_industries": ["..."],
"delivery_model": "...",
"overlap_score": 0-100,
"differentiators": ["..."],
"evidence": [ {{ "url": "...", "snippet": "..." }} ]
}}
""".format(c_name=competitor_name, my_name=own_name, context=grounding, c_url=competitor_url)

    # 3. Call AI
    try:
        # Simplified schema describing the single analysis object.
        product_categories = [
            "Cleaning (Indoor)",
            "Cleaning (Outdoor)",
            "Transport/Logistics",
            "Service/Gastro",
            "Security/Inspection",
            "Software/Fleet Mgmt",
            "Other",
        ]
        portfolio_item = {
            "type": "object",
            "properties": {
                "product": {"type": "string"},
                "purpose": {"type": "string"},
                "category": {"type": "string", "enum": product_categories},
            },
        }
        string_list = {"type": "array", "items": {"type": "string"}}
        single_analysis_schema = {
            "type": "object",
            "properties": {
                "competitor": {"type": "object", "properties": {"name": {"type": "string"}, "url": {"type": "string"}}},
                "portfolio": {"type": "array", "items": portfolio_item},
                "target_industries": string_list,
                "delivery_model": {"type": "string"},
                "overlap_score": {"type": "integer"},
                "differentiators": string_list,
                "evidence": {"type": "array", "items": evidence_schema},
            },
            "required": ['competitor', 'portfolio', 'target_industries', 'delivery_model', 'overlap_score', 'differentiators', 'evidence'],
        }
        analysis = await call_gemini_robustly(prompt, single_analysis_schema)
        if not analysis:
            log_debug("⚠️ Empty result for {}".format(competitor_name))
            return None
        log_debug("✅ Finished analysis for {}".format(competitor_name))
        return analysis
    except Exception as e:
        log_debug("❌ Error analyzing {}: {}".format(competitor_name, e))
        return None
class FetchStep4DataRequest(BaseModel):
    """Request body for ``/api/fetchStep4Data``."""

    company: Any
    competitors: List[Any]
    language: str


@app.post("/api/fetchStep4Data")
async def fetch_step4_data(request: FetchStep4DataRequest):
    """Analyse every submitted competitor concurrently.

    One analysis task is started per competitor; competitors whose analysis
    came back as ``None`` are left out of the response.
    """
    competitors = request.competitors
    total = len(competitors)
    log_debug("=== STEP 4 START ===")
    log_debug("Received {} competitors for analysis.".format(total))

    # One AI task per competitor, all awaited together.
    outcomes = await asyncio.gather(
        *(analyze_single_competitor(entry, request.company) for entry in competitors)
    )

    # Failed analyses are reported as None and dropped here.
    valid_analyses = list(filter(lambda outcome: outcome is not None, outcomes))
    log_debug("Step 4 Complete. Returning {}/{} analyses.".format(len(valid_analyses), total))
    return {"analyses": valid_analyses}
class FetchStep5DataSilverBulletsRequest(BaseModel):
    """Request body for ``/api/fetchStep5Data_SilverBullets``."""

    company: Any
    analyses: List[Any]
    language: str


@app.post("/api/fetchStep5Data_SilverBullets")
async def fetch_step5_data_silver_bullets(request: FetchStep5DataSilverBulletsRequest):
    """Ask the model for one "silver bullet" statement per analysed competitor.

    Each analysis contributes one prompt line with the competitor name and its
    differentiators. Analyses and the company may be dicts or objects; missing
    names fall back to ``'Unknown'`` and missing or null differentiators are
    treated as an empty list.
    """
    lines = []
    for a in request.analyses:
        comp_obj = a.get('competitor') if isinstance(a, dict) else getattr(a, 'competitor', {})
        name = comp_obj.get('name') if isinstance(comp_obj, dict) else getattr(comp_obj, 'name', 'Unknown')
        diffs_list = a.get('differentiators') if isinstance(a, dict) else getattr(a, 'differentiators', None)
        if isinstance(diffs_list, str):
            # A bare string would otherwise be joined character by character.
            diffs_list = [diffs_list]
        # None entries or a null list would make str.join raise.
        diffs_text = ', '.join(str(d) for d in (diffs_list or []) if d is not None)
        lines.append("- {}: {}".format(name or 'Unknown', diffs_text))
    my_company = request.company
    my_name = my_company.get('name') if isinstance(my_company, dict) else getattr(my_company, 'name', 'Me')
    prompt = r"""Erstelle Silver Bullets für {} gegen:
{}
Antworte JSON."""
    schema = {"type": "object", "properties": {"silver_bullets": {"type": "array", "items": {"type": "object", "properties": {"competitor_name": {"type": "string"}, "statement": {"type": "string"}}, "required": ['competitor_name', 'statement']}}}, "required": ['silver_bullets']}
    return await call_gemini_robustly(prompt.format(my_name, '\n'.join(lines)), schema)
class FetchStep6DataConclusionRequest(BaseModel):
    """Request body for ``/api/fetchStep6Data_Conclusion``."""

    company: Any
    analyses: List[Any]
    products: List[Any]
    industries: List[Any]
    silver_bullets: List[Any]
    language: str


@app.post("/api/fetchStep6Data_Conclusion")
async def fetch_step6_data_conclusion(request: FetchStep6DataConclusionRequest):
    """Build the final conclusion (matrices, scores, summary) via the model.

    Products, industries, analyses and the company may be dicts or objects,
    matching what the other step endpoints accept. Entries without a name are
    left out of the product and industry lists. ``request.silver_bullets`` is
    accepted but not used in the prompt.

    Returns:
        ``{"conclusion": <parsed model answer>}``.
    """
    def _get(obj, key, default=None):
        # Read a field from either a dict or an attribute-style object.
        if isinstance(obj, dict):
            return obj.get(key, default)
        return getattr(obj, key, default)

    log_debug("=== STEP 6 START (Conclusion) ===")
    my_company = request.company
    my_name = my_company.get('name') if isinstance(my_company, dict) else getattr(my_company, 'name', 'Me')

    # Context Preparation. None names are dropped because str.join rejects them.
    product_names = [str(n) for n in (_get(p, 'name') for p in request.products) if n]
    industry_names = [str(n) for n in (_get(i, 'name') for i in request.industries) if n]
    analyses_rows = []
    for a in request.analyses:
        # "competitor" may be missing or null; fall back to an empty mapping.
        competitor = _get(a, 'competitor') or {}
        analyses_rows.append({
            'name': _get(competitor, 'name'),
            'portfolio': _get(a, 'portfolio'),
            'industries': _get(a, 'target_industries'),
            'overlap': _get(a, 'overlap_score'),
        })

    prompt = r"""Du bist Strategie-Berater. Erstelle ein detailliertes Fazit für "{my_name}" basierend auf der Wettbewerbsanalyse.
DEINE PRODUKTE (Zeilen für Matrix 1): {products}
DEINE ZIELBRANCHEN (Zeilen für Matrix 2): {industries}
ANALYSE-DATEN DER WETTBEWERBER:
{analyses_summary}
AUFGABE:
Erstelle eine komplexe JSON-Struktur mit Matrizen.
REGELN FÜR "product_matrix":
1. Erstelle GENAU einen Eintrag pro Produkt aus der Liste "DEINE PRODUKTE".
2. Das Feld "product" darf NUR den Namen aus dieser Liste enthalten (z.B. "Reinigungsroboter"). KEINE Produktnamen der Wettbewerber!
3. WICHTIG: Das Array "availability" MUSS für JEDEN Wettbewerber einen Eintrag enthalten. ({count} Einträge pro Produkt!).
- "competitor": Exakter Name des Wettbewerbers.
- "has_offering": true, wenn er dieses Produkt anbietet, sonst false.
REGELN FÜR "industry_matrix":
1. Erstelle GENAU einen Eintrag pro Branche aus der Liste "DEINE ZIELBRANCHEN".
2. Das Feld "industry" darf NUR den Namen aus dieser Liste enthalten.
3. WICHTIG: Das Array "availability" MUSS für JEDEN Wettbewerber einen Eintrag enthalten.
Antworte strikt nach diesem Schema.
""".format(
        my_name=my_name,
        count=len(request.analyses),
        products=", ".join(product_names),
        industries=", ".join(industry_names),
        # ensure_ascii=False keeps umlauts literal, so the "exact competitor
        # name" the prompt asks for appears as written, not as \uXXXX escapes.
        analyses_summary=json.dumps(analyses_rows, indent=2, ensure_ascii=False)
    )
    schema = {
        "type": "object",
        "properties": {
            "product_matrix": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "product": {"type": "string"},
                        "availability": {
                            "type": "array",
                            "items": {
                                "type": "object",
                                "properties": {"competitor": {"type": "string"}, "has_offering": {"type": "boolean"}}
                            }
                        }
                    },
                    "required": ["product", "availability"]
                }
            },
            "industry_matrix": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "industry": {"type": "string"},
                        "availability": {
                            "type": "array",
                            "items": {
                                "type": "object",
                                "properties": {"competitor": {"type": "string"}, "has_offering": {"type": "boolean"}}
                            }
                        }
                    },
                    "required": ["industry", "availability"]
                }
            },
            "overlap_scores": {
                "type": "array",
                "items": {"type": "object", "properties": {"competitor": {"type": "string"}, "score": {"type": "integer"}}}
            },
            "summary": {"type": "string"},
            "opportunities": {"type": "string"},
            "next_questions": {"type": "array", "items": {"type": "string"}}
        },
        "required": ["product_matrix", "industry_matrix", "overlap_scores", "summary", "opportunities", "next_questions"]
    }
    # The frontend destructures `conclusion` from the response, so the model
    # answer is wrapped under that key.
    result = await call_gemini_robustly(prompt, schema)
    log_debug("RESPONSE STEP 6: {}".format(json.dumps(result, indent=2)))
    return {"conclusion": result}
class FetchStep7DataBattlecardsRequest(BaseModel):
    """Request body for ``/api/fetchStep7Data_Battlecards``."""

    company: Any
    analyses: List[Any]
    silver_bullets: List[Any]
    language: str


@app.post("/api/fetchStep7Data_Battlecards")
async def fetch_step7_data_battlecards(request: FetchStep7DataBattlecardsRequest):
    """Generate one sales battlecard per analysed competitor via the model.

    The prompt lists each competitor with up to three differentiators and the
    previously generated silver bullets. Analyses, silver bullets and the
    company may be dicts or objects, matching the other step endpoints. The
    ``category`` of every landmine question and strength/weakness item is
    restricted to the categories named in the prompt.
    """
    def _get(obj, key, default=None):
        # Read a field from either a dict or an attribute-style object.
        if isinstance(obj, dict):
            return obj.get(key, default)
        return getattr(obj, key, default)

    log_debug("=== STEP 7 START (Battlecards) ===")
    my_company = request.company
    my_name = my_company.get('name') if isinstance(my_company, dict) else getattr(my_company, 'name', 'Me')

    # Prepare context
    comp_context = []
    for a in request.analyses:
        # "competitor" and "differentiators" may be missing or null.
        c_name = _get(_get(a, 'competitor') or {}, 'name', 'Unknown')
        diffs = _get(a, 'differentiators') or []
        if isinstance(diffs, str):
            diffs = [diffs]
        top_diffs = ', '.join(str(d) for d in list(diffs)[:3] if d is not None)
        comp_context.append(f"- {c_name}: {top_diffs}")
    silver_bullets_context = []
    for sb in request.silver_bullets:
        silver_bullets_context.append(f"- {_get(sb, 'competitor_name')}: {_get(sb, 'statement')}")

    # NOTE(review): the task list below names "win_themes" and "kill_points",
    # which the response schema does not define, while "competitor_profile" is
    # required by the schema but never explained -- align prompt and schema.
    prompt = r"""Erstelle Sales Battlecards (Vertriebskarten) für die folgenden Wettbewerber von "{my_name}".
WETTBEWERBER & UNTERSCHEIDUNGSMERKMALE:
{competitors}
SILVER BULLETS (Argumentationshilfen):
{bullets}
KATEGORIEN FÜR LANDMINES & SCHWÄCHEN:
- "Price/TCO"
- "Service/Support"
- "Technology/AI"
- "Performance"
- "Trust/Reliability"
- "Company Viability"
AUFGABE:
Erstelle für JEDEN oben genannten Wettbewerber eine Battlecard.
- "competitor_name": Exakter Name aus der Liste.
- "win_themes": Warum gewinnen wir?
- "kill_points": Schwächen des Gegners.
- "silver_bullet": Das beste Argument.
- "landmine_questions": Kritische Fragen für den Kunden.
- WICHTIG: Ordne jedem Punkt in "landmine_questions" und "strengths_vs_weaknesses" eine der oben genannten Kategorien zu.
Antworte JSON.
""".format(
        my_name=my_name,
        competitors="\n".join(comp_context),
        bullets="\n".join(silver_bullets_context)
    )

    # The prompt demands one of these categories per item; the enum enforces
    # it in the schema, as the product categories already are in step 4.
    battlecard_categories = [
        "Price/TCO",
        "Service/Support",
        "Technology/AI",
        "Performance",
        "Trust/Reliability",
        "Company Viability",
    ]
    categorized_item = {
        "type": "object",
        "properties": {
            "text": {"type": "string"},
            "category": {"type": "string", "enum": battlecard_categories},
        },
    }
    schema = {
        "type": "object",
        "properties": {
            "battlecards": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "competitor_name": {"type": "string"},
                        "competitor_profile": {
                            "type": "object",
                            "properties": {"focus": {"type": "string"}, "positioning": {"type": "string"}}
                        },
                        "strengths_vs_weaknesses": {"type": "array", "items": categorized_item},
                        "landmine_questions": {"type": "array", "items": categorized_item},
                        "silver_bullet": {"type": "string"}
                    },
                    "required": ["competitor_name", "competitor_profile", "strengths_vs_weaknesses", "landmine_questions", "silver_bullet"]
                }
            }
        },
        "required": ["battlecards"]
    }
    result = await call_gemini_robustly(prompt, schema)
    return result
async def analyze_single_competitor_references(competitor: Any) -> Optional[Dict]:
    """Scrape one competitor's reference pages and extract its customers.

    The competitor may be a dict or an object exposing ``name`` and ``url``.
    The scraped text is cut to 20,000 characters before it is embedded in the
    prompt.

    Returns:
        A dict with ``competitor_name`` (always the name taken from the input)
        and ``references``. When the model answer is empty, lacks
        ``references``, or the call raises, ``references`` is an empty list.
    """
    from_dict = isinstance(competitor, dict)
    competitor_name = competitor.get('name') if from_dict else getattr(competitor, 'name', 'Unknown')
    competitor_url = competitor.get('url') if from_dict else getattr(competitor, 'url', '')
    log_debug("➡️ Analyzing references for single competitor: {} ({})".format(competitor_name, competitor_url))

    def no_references():
        # Fresh fallback payload for every failure path.
        return {"competitor_name": competitor_name, "references": []}

    # 1. Scrape (Grounding)
    scraped = await discover_and_scrape_references_page(competitor_url) if competitor_url else ""
    grounding = scraped[:20000] if scraped else "Keine Website-Daten für Referenzen verfügbar."

    # 2. Focused Prompt
    prompt = r"""Du bist ein Analyst. Extrahiere Referenzkunden und Case Studies aus dem folgenden Text für das Unternehmen "{c_name}".
DATENBASIS:
{context_text}
AUFGABE:
Identifiziere handfeste Referenzkunden. Wenn keine spezifischen Namen genannt werden, beschreibe die typischen Kunden und Branchen.
Erstelle eine Liste von Referenzen im JSON-Format. Das Ergebnis MUSS ein Objekt sein, das "competitor_name" und "references" enthält.
STRUKTUR:
{{
"competitor_name": "{c_name}",
"references": [
{{
"name": "...",
"industry": "...",
"testimonial_snippet": "...",
"case_study_url": "..."
}}
]
}}
""".format(c_name=competitor_name, context_text=grounding)

    # 3. Call AI
    try:
        reference_item = {
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "industry": {"type": "string"},
                "testimonial_snippet": {"type": "string"},
                "case_study_url": {"type": "string", "description": "Vollständige URL zur Case Study, falls gefunden."},
            },
            "required": ["name", "industry"],
        }
        single_ref_schema = {
            "type": "object",
            "properties": {
                "competitor_name": {"type": "string"},
                "references": {"type": "array", "items": reference_item},
            },
            "required": ["competitor_name", "references"],
        }
        answer = await call_gemini_robustly(prompt, single_ref_schema)
        if not (answer and 'references' in answer):
            log_debug("⚠️ Empty or invalid reference result for {}. Returning fallback.".format(competitor_name))
            return no_references()
        log_debug("✅ Finished reference analysis for {}".format(competitor_name))
        answer['competitor_name'] = competitor_name  # Ensure correct name
        return answer
    except Exception as e:
        log_debug("❌ Error analyzing references for {}: {}".format(competitor_name, e))
        return no_references()
class FetchStep8DataReferenceAnalysisRequest(BaseModel):
    """Request body for ``/api/fetchStep8Data_ReferenceAnalysis``."""

    competitors: List[Any]
    language: str


@app.post("/api/fetchStep8Data_ReferenceAnalysis")
async def fetch_step8_data_reference_analysis(request: FetchStep8DataReferenceAnalysisRequest):
    """Run the reference analysis for every submitted competitor concurrently.

    Returns:
        A dict with the non-``None`` per-competitor results under
        ``reference_analysis`` and an empty ``groundingMetadata`` list.
    """
    competitors = request.competitors
    log_debug("=== STEP 8 START (Grounded References) ===")

    # One task per competitor, all awaited together.
    outcomes = await asyncio.gather(
        *(analyze_single_competitor_references(entry) for entry in competitors)
    )

    # Drop None results so the response only carries structured entries.
    valid_analyses = list(filter(lambda outcome: outcome is not None, outcomes))
    log_debug("Step 8 Complete. Returning {}/{} reference analyses.".format(len(valid_analyses), len(competitors)))

    response = {"reference_analysis": valid_analyses}
    response["groundingMetadata"] = []
    return response
# Static Files
# The "dist" directory in the current working directory is served at the
# application root when it exists; otherwise only the API routes are exposed.
dist_path = os.path.join(os.getcwd(), "dist")
if os.path.exists(dist_path):
    _static_files = StaticFiles(directory=dist_path, html=True)
    app.mount("/", _static_files, name="static")

# Running the file directly starts a uvicorn server on all interfaces, port 8000.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)