feat(analysis): Ground Step 8 Reference Analysis

Improves the competitor reference analysis (Step 8) by replacing the previous LLM-only approach with a grounded, scraping-based method.

- Implemented a new scraper to actively search for and parse competitor reference/case study pages.
- The analysis is now based on actual website content, significantly increasing the accuracy and reliability of the results and preventing model hallucinations.
- Updated documentation to reflect the new 'Grounded References' architecture.
This commit is contained in:
2026-01-11 11:01:44 +00:00
parent e10e28c102
commit 63c7219800
4 changed files with 453 additions and 34 deletions

View File

@@ -24,7 +24,6 @@ RUN pip install --no-cache-dir -r requirements.txt
COPY --from=build-stage /app/dist ./dist
# Copy the orchestrator script and .env if needed (though env should be passed via docker-compose)
COPY competitor_analysis_orchestrator.py .
# Expose the port the app runs on
EXPOSE 8000

View File

@@ -55,6 +55,22 @@ if not API_KEY:
if HAS_OLD_GENAI:
old_genai.configure(api_key=API_KEY)
# --- LOGGING SETUP ---
log_dir = "/app/Log_from_docker"
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "competitor_analysis_debug.log")
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler()
],
force=True
)
logging.info("🚀 System started. Logging to {}".format(log_file))
app = FastAPI()
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
@@ -74,7 +90,10 @@ def scrape_text_from_url(url: str) -> str:
return ""
async def discover_and_scrape_website(start_url: str) -> str:
logging.info("Starting discovery for website")
logging.info("Starting discovery for website: {}".format(start_url))
if not start_url:
return ""
base_domain = urlparse(start_url).netloc
urls_to_scrape = {start_url}
@@ -89,7 +108,7 @@ async def discover_and_scrape_website(start_url: str) -> str:
if urlparse(full_url).netloc == base_domain:
urls_to_scrape.add(full_url)
except Exception as e:
logging.error("Failed homepage links: {}".format(e))
logging.error("Failed homepage links for {}: {}".format(start_url, e))
if SERPAPI_KEY:
try:
@@ -100,12 +119,60 @@ async def discover_and_scrape_website(start_url: str) -> str:
for result in results.get("organic_results", []):
urls_to_scrape.add(result["link"])
except Exception as e:
logging.error("SerpAPI failed: {}".format(e))
logging.error("SerpAPI failed for {}: {}".format(start_url, e))
tasks = [asyncio.to_thread(scrape_text_from_url, url) for url in urls_to_scrape]
# Limit to max 5 URLs to prevent timeouts
urls_list = list(urls_to_scrape)[:5]
logging.debug("Scraping URLs for {}: {}".format(start_url, urls_list))
tasks = [asyncio.to_thread(scrape_text_from_url, url) for url in urls_list]
scraped_contents = await asyncio.gather(*tasks)
full_text = "\n\n---" + "-" * 5 + " SEITE " + "-" * 5 + "---" + "\n\n".join(c for c in scraped_contents if c)
return full_text
return full_text[:50000] # Limit context size
async def discover_and_scrape_references_page(start_url: str) -> str:
logging.info("Starting reference discovery for website: {}".format(start_url))
if not start_url:
return ""
base_domain = urlparse(start_url).netloc
urls_to_scrape = {start_url} # Fallback
# 1. Direct Search on Homepage
try:
r = requests.get(start_url, timeout=10, verify=False)
soup = BeautifulSoup(r.content, 'html.parser')
link_keywords = ['referenz', 'kunde', 'case', 'erfolg', 'anwenderbericht', 'customer']
for a in soup.find_all('a', href=True):
href = a['href']
link_text = a.get_text().lower()
if any(k in href.lower() or k in link_text for k in link_keywords):
full_url = urljoin(start_url, href)
if urlparse(full_url).netloc == base_domain:
urls_to_scrape.add(full_url)
except Exception as e:
logging.error("Failed to find reference links on {}: {}".format(start_url, e))
# 2. SerpAPI Search if key is available
if SERPAPI_KEY:
try:
search_query = 'site:{} (Referenzen OR "Case Studies" OR Kundenstimmen OR Erfolgsgeschichten)'.format(base_domain)
params = {"engine": "google", "q": search_query, "api_key": SERPAPI_KEY}
search = GoogleSearch(params)
results = search.get_dict()
for result in results.get("organic_results", []):
urls_to_scrape.add(result["link"])
except Exception as e:
logging.error("SerpAPI for references failed for {}: {}".format(start_url, e))
# Limit to max 5 URLs to prevent timeouts
urls_list = list(urls_to_scrape)[:5]
logging.debug("Scraping reference URLs for {}: {}".format(start_url, urls_list))
tasks = [asyncio.to_thread(scrape_text_from_url, url) for url in urls_list]
scraped_contents = await asyncio.gather(*tasks)
full_text = "\n\n---" + "-" * 5 + " SEITE " + "-" * 5 + "---" + "\n\n".join(c for c in scraped_contents if c)
return full_text[:50000]
def parse_json_response(response_text: str) -> Any:
try:
@@ -127,7 +194,7 @@ async def call_gemini_robustly(prompt: str, schema: dict):
if HAS_OLD_GENAI:
try:
logging.debug("Attempting Legacy SDK gemini-2.0-flash")
gen_config = {"temperature": 0.3, "response_mime_type": "application/json"}
gen_config = {"temperature": 0.3, "response_mime_type": "application/json", "max_output_tokens": 8192}
if schema: gen_config["response_schema"] = schema
model = old_genai.GenerativeModel('gemini-2.0-flash', generation_config=gen_config)
logging.debug("PROMPT: {}".format(prompt[:500]))
@@ -142,7 +209,7 @@ async def call_gemini_robustly(prompt: str, schema: dict):
try:
logging.debug("Attempting Modern SDK gemini-1.5-flash")
client_new = genai.Client(api_key=API_KEY)
config_args = {"temperature": 0.3, "response_mime_type": "application/json"}
config_args = {"temperature": 0.3, "response_mime_type": "application/json", "max_output_tokens": 8192}
if schema: config_args["response_schema"] = schema
response = client_new.models.generate_content(
model='gemini-1.5-flash',
@@ -201,24 +268,97 @@ async def fetch_step3_data(request: FetchStep3DataRequest):
schema = {"type": "object", "properties": {"competitor_candidates": {"type": "array", "items": {"type": "object", "properties": {"name": {"type": "string"}, "url": {"type": "string"}, "confidence": {"type": "number"}, "why": {"type": "string"}, "evidence": {"type": "array", "items": evidence_schema}}, "required": ['name', 'url', 'confidence', 'why', 'evidence']}}}, "required": ['competitor_candidates']}
return await call_gemini_robustly(prompt.format(request.market_scope, ', '.join(k_terms)), schema)
# --- HELPER: Manual Logging ---
def log_debug(msg):
try:
with open("/app/Log_from_docker/competitor_analysis_debug.log", "a") as f:
f.write("{} [MANUAL] {}\n".format(time.strftime("%Y-%m-%d %H:%M:%S"), msg))
print(msg, flush=True) # Also to stdout for docker logs
except Exception as e:
print("Logging failed: {}".format(e))
async def analyze_single_competitor(competitor: Any, my_company: Any) -> Optional[Dict]:
c_name = competitor.get('name') if isinstance(competitor, dict) else getattr(competitor, 'name', 'Unknown')
c_url = competitor.get('url') if isinstance(competitor, dict) else getattr(competitor, 'url', '')
my_name = my_company.get('name') if isinstance(my_company, dict) else getattr(my_company, 'name', 'Me')
log_debug("➡️ Analyzing single competitor: {} ({})".format(c_name, c_url))
# 1. Scrape (Grounding)
content = ""
if c_url:
content = await discover_and_scrape_website(c_url)
# Context truncated to prevent overload (15k chars is approx 3-4k tokens)
context_text = content[:15000] if content else "Keine Website-Daten verfügbar."
# 2. Focused Prompt
prompt = r"""Du bist Strategie-Berater. Analysiere den Wettbewerber "{c_name}" im Vergleich zu meinem Unternehmen "{my_name}".
DATENBASIS ({c_name}):
{context}
AUFGABE:
Erstelle eine präzise Analyse. Antworte als valides JSON-Objekt (NICHT als Liste).
Struktur:
{{
"competitor": {{ "name": "{c_name}", "url": "{c_url}" }},
"portfolio": [ {{ "product": "...", "purpose": "..." }} ],
"target_industries": ["..."],
"delivery_model": "...",
"overlap_score": 0-100,
"differentiators": ["..."],
"evidence": [ {{ "url": "...", "snippet": "..." }} ]
}}
""".format(c_name=c_name, my_name=my_name, context=context_text, c_url=c_url)
# 3. Call AI
try:
# We use a simplified schema for the single object
single_analysis_schema = {
"type": "object",
"properties": {
"competitor": {"type": "object", "properties": {"name": {"type": "string"}, "url": {"type": "string"}}},
"portfolio": {"type": "array", "items": {"type": "object", "properties": {"product": {"type": "string"}, "purpose": {"type": "string"}}}},
"target_industries": {"type": "array", "items": {"type": "string"}},
"delivery_model": {"type": "string"},
"overlap_score": {"type": "integer"},
"differentiators": {"type": "array", "items": {"type": "string"}},
"evidence": {"type": "array", "items": evidence_schema}
},
"required": ['competitor', 'portfolio', 'target_industries', 'delivery_model', 'overlap_score', 'differentiators', 'evidence']
}
result = await call_gemini_robustly(prompt, single_analysis_schema)
if result:
log_debug("✅ Finished analysis for {}".format(c_name))
return result
else:
log_debug("⚠️ Empty result for {}".format(c_name))
return None
except Exception as e:
log_debug("❌ Error analyzing {}: {}".format(c_name, e))
return None
class FetchStep4DataRequest(BaseModel): company: Any; competitors: List[Any]; language: str
@app.post("/api/fetchStep4Data")
async def fetch_step4_data(request: FetchStep4DataRequest):
comps_list = []
for c in request.competitors:
name = c.get('name') if isinstance(c, dict) else getattr(c, 'name', 'Unknown')
url = c.get('url') if isinstance(c, dict) else getattr(c, 'url', '')
comps_list.append("- {}: {}".format(name, url))
log_debug("=== STEP 4 START ===")
log_debug("Received {} competitors for analysis.".format(len(request.competitors)))
# Parallel Execution: One AI Task per Competitor
tasks = [analyze_single_competitor(c, request.company) for c in request.competitors]
my_company = request.company
my_name = my_company.get('name') if isinstance(my_company, dict) else getattr(my_company, 'name', 'Me')
# Run all in parallel
results = await asyncio.gather(*tasks)
prompt = r"""Analysiere Portfolio für:
{}
Vergleiche mit {}. Antworte JSON."""
# Filter out None results (failures)
valid_analyses = [r for r in results if r is not None]
schema = {"type": "object", "properties": {"analyses": {"type": "array", "items": {"type": "object", "properties": {"competitor": {"type": "object", "properties": {"name": {"type": "string"}, "url": {"type": "string"}}}, "portfolio": {"type": "array", "items": {"type": "object", "properties": {"product": {"type": "string"}, "purpose": {"type": "string"}}}}, "target_industries": {"type": "array", "items": {"type": "string"}}, "delivery_model": {"type": "string"}, "overlap_score": {"type": "integer"}, "differentiators": {"type": "array", "items": {"type": "string"}}, "evidence": {"type": "array", "items": evidence_schema}}, "required": ['competitor', 'portfolio', 'target_industries', 'delivery_model', 'overlap_score', 'differentiators', 'evidence']}}}, "required": ['analyses']}
return await call_gemini_robustly(prompt.format('\n'.join(comps_list), my_name), schema)
log_debug("Step 4 Complete. Returning {}/{} analyses.".format(len(valid_analyses), len(request.competitors)))
return {"analyses": valid_analyses}
class FetchStep5DataSilverBulletsRequest(BaseModel): company: Any; analyses: List[Any]; language: str
@app.post("/api/fetchStep5Data_SilverBullets")
@@ -240,17 +380,264 @@ Antworte JSON."""
schema = {"type": "object", "properties": {"silver_bullets": {"type": "array", "items": {"type": "object", "properties": {"competitor_name": {"type": "string"}, "statement": {"type": "string"}}, "required": ['competitor_name', 'statement']}}}, "required": ['silver_bullets']}
return await call_gemini_robustly(prompt.format(my_name, '\n'.join(lines)), schema)
class FetchStep6DataConclusionRequest(BaseModel): company: Any; analyses: List[Any]; products: List[Any]; industries: List[Any]; silver_bullets: List[Any]; language: str
@app.post("/api/fetchStep6Data_Conclusion")
async def fetch_step6_data_conclusion(request: Any):
return await call_gemini_robustly(r"Erstelle Fazit der Analyse. Antworte JSON.", {{}})
async def fetch_step6_data_conclusion(request: FetchStep6DataConclusionRequest):
log_debug("=== STEP 6 START (Conclusion) ===")
my_company = request.company
my_name = my_company.get('name') if isinstance(my_company, dict) else getattr(my_company, 'name', 'Me')
# Context Preparation
product_names = [p.get('name') for p in request.products]
industry_names = [i.get('name') for i in request.industries]
prompt = r"""Du bist Strategie-Berater. Erstelle ein detailliertes Fazit für "{my_name}" basierend auf der Wettbewerbsanalyse.
DEINE PRODUKTE (Zeilen für Matrix 1): {products}
DEINE ZIELBRANCHEN (Zeilen für Matrix 2): {industries}
ANALYSE-DATEN DER WETTBEWERBER:
{analyses_summary}
AUFGABE:
Erstelle eine komplexe JSON-Struktur mit Matrizen.
REGELN FÜR "product_matrix":
1. Erstelle GENAU einen Eintrag pro Produkt aus der Liste "DEINE PRODUKTE".
2. Das Feld "product" darf NUR den Namen aus dieser Liste enthalten (z.B. "Reinigungsroboter"). KEINE Produktnamen der Wettbewerber!
3. WICHTIG: Das Array "availability" MUSS für JEDEN Wettbewerber einen Eintrag enthalten. ({count} Einträge pro Produkt!).
- "competitor": Exakter Name des Wettbewerbers.
- "has_offering": true, wenn er dieses Produkt anbietet, sonst false.
REGELN FÜR "industry_matrix":
1. Erstelle GENAU einen Eintrag pro Branche aus der Liste "DEINE ZIELBRANCHEN".
2. Das Feld "industry" darf NUR den Namen aus dieser Liste enthalten.
3. WICHTIG: Das Array "availability" MUSS für JEDEN Wettbewerber einen Eintrag enthalten.
Antworte strikt nach diesem Schema.
""".format(
my_name=my_name,
count=len(request.analyses),
products=", ".join(product_names),
industries=", ".join(industry_names),
analyses_summary=json.dumps([{ 'name': a.get('competitor',{}).get('name'), 'portfolio': a.get('portfolio'), 'industries': a.get('target_industries'), 'overlap': a.get('overlap_score') } for a in request.analyses], indent=2)
)
schema = {
"type": "object",
"properties": {
"product_matrix": {
"type": "array",
"items": {
"type": "object",
"properties": {
"product": {"type": "string"},
"availability": {
"type": "array",
"items": {
"type": "object",
"properties": {"competitor": {"type": "string"}, "has_offering": {"type": "boolean"}}
}
}
},
"required": ["product", "availability"]
}
},
"industry_matrix": {
"type": "array",
"items": {
"type": "object",
"properties": {
"industry": {"type": "string"},
"availability": {
"type": "array",
"items": {
"type": "object",
"properties": {"competitor": {"type": "string"}, "has_offering": {"type": "boolean"}}
}
}
},
"required": ["industry", "availability"]
}
},
"overlap_scores": {
"type": "array",
"items": {"type": "object", "properties": {"competitor": {"type": "string"}, "score": {"type": "integer"}}}
},
"summary": {"type": "string"},
"opportunities": {"type": "string"},
"next_questions": {"type": "array", "items": {"type": "string"}}
},
"required": ["product_matrix", "industry_matrix", "overlap_scores", "summary", "opportunities", "next_questions"]
}
# We return the object directly under 'conclusion' key in frontend state, but the API usually returns { conclusion: ... }
# Wait, the frontend code says: const { conclusion } = await fetchStep6...
# So we must return { "conclusion": result }
result = await call_gemini_robustly(prompt, schema)
log_debug("RESPONSE STEP 6: {}".format(json.dumps(result, indent=2)))
return {"conclusion": result}
class FetchStep7DataBattlecardsRequest(BaseModel): company: Any; analyses: List[Any]; silver_bullets: List[Any]; language: str
@app.post("/api/fetchStep7Data_Battlecards")
async def fetch_step7_data_battlecards(request: Any):
return await call_gemini_robustly(r"Erstelle Sales Battlecards. Antworte JSON.", {{}})
async def fetch_step7_data_battlecards(request: FetchStep7DataBattlecardsRequest):
log_debug("=== STEP 7 START (Battlecards) ===")
my_company = request.company
my_name = my_company.get('name') if isinstance(my_company, dict) else getattr(my_company, 'name', 'Me')
# Prepare context
comp_context = []
for a in request.analyses:
c_name = a.get('competitor', {}).get('name', 'Unknown')
diffs = a.get('differentiators', [])
comp_context.append(f"- {c_name}: {', '.join(diffs[:3])}")
silver_bullets_context = []
for sb in request.silver_bullets:
silver_bullets_context.append(f"- {sb.get('competitor_name')}: {sb.get('statement')}")
prompt = r"""Erstelle Sales Battlecards (Vertriebskarten) für die folgenden Wettbewerber von "{my_name}".
WETTBEWERBER & UNTERSCHEIDUNGSMERKMALE:
{competitors}
SILVER BULLETS (Argumentationshilfen):
{bullets}
AUFGABE:
Erstelle für JEDEN oben genannten Wettbewerber eine Battlecard.
- "competitor_name": Exakter Name aus der Liste.
- "win_themes": Warum gewinnen wir?
- "kill_points": Schwächen des Gegners.
- "silver_bullet": Das beste Argument (nutze die Silver Bullets als Inspiration).
Antworte JSON.
""".format(
my_name=my_name,
competitors="\n".join(comp_context),
bullets="\n".join(silver_bullets_context)
)
schema = {
"type": "object",
"properties": {
"battlecards": {
"type": "array",
"items": {
"type": "object",
"properties": {
"competitor_name": {"type": "string"},
"competitor_profile": {
"type": "object",
"properties": { "focus": {"type": "string"}, "positioning": {"type": "string"} }
},
"strengths_vs_weaknesses": {"type": "array", "items": {"type": "string"}},
"landmine_questions": {"type": "array", "items": {"type": "string"}},
"silver_bullet": {"type": "string"}
},
"required": ["competitor_name", "competitor_profile", "strengths_vs_weaknesses", "landmine_questions", "silver_bullet"]
}
}
},
"required": ["battlecards"]
}
result = await call_gemini_robustly(prompt, schema)
return result
async def analyze_single_competitor_references(competitor: Any) -> Optional[Dict]:
c_name = competitor.get('name') if isinstance(competitor, dict) else getattr(competitor, 'name', 'Unknown')
c_url = competitor.get('url') if isinstance(competitor, dict) else getattr(competitor, 'url', '')
log_debug("➡️ Analyzing references for single competitor: {} ({})".format(c_name, c_url))
# 1. Scrape (Grounding)
content = ""
if c_url:
content = await discover_and_scrape_references_page(c_url)
context_text = content[:20000] if content else "Keine Website-Daten für Referenzen verfügbar."
# 2. Focused Prompt
prompt = r"""Du bist ein Analyst. Extrahiere Referenzkunden und Case Studies aus dem folgenden Text für das Unternehmen "{c_name}".
DATENBASIS:
{context_text}
AUFGABE:
Identifiziere handfeste Referenzkunden. Wenn keine spezifischen Namen genannt werden, beschreibe die typischen Kunden und Branchen.
Erstelle eine Liste von Referenzen im JSON-Format. Das Ergebnis MUSS ein Objekt sein, das "competitor_name" und "references" enthält.
STRUKTUR:
{{
"competitor_name": "{c_name}",
"references": [
{{
"name": "...",
"industry": "...",
"testimonial_snippet": "...",
"case_study_url": "..."
}}
]
}}
""".format(c_name=c_name, context_text=context_text)
# 3. Call AI
try:
single_ref_schema = {
"type": "object",
"properties": {
"competitor_name": {"type": "string"},
"references": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"industry": {"type": "string"},
"testimonial_snippet": {"type": "string"},
"case_study_url": {"type": "string", "description": "Vollständige URL zur Case Study, falls gefunden."}
},
"required": ["name", "industry"]
}
}
},
"required": ["competitor_name", "references"]
}
result = await call_gemini_robustly(prompt, single_ref_schema)
if result and 'references' in result:
log_debug("✅ Finished reference analysis for {}".format(c_name))
result['competitor_name'] = c_name # Ensure correct name
return result
else:
log_debug("⚠️ Empty or invalid reference result for {}. Returning fallback.".format(c_name))
return {"competitor_name": c_name, "references": []}
except Exception as e:
log_debug("❌ Error analyzing references for {}: {}".format(c_name, e))
return {"competitor_name": c_name, "references": []}
class FetchStep8DataReferenceAnalysisRequest(BaseModel): competitors: List[Any]; language: str
@app.post("/api/fetchStep8Data_ReferenceAnalysis")
async def fetch_step8_data_reference_analysis(request: Any):
return await call_gemini_robustly(r"Finde Referenzkunden. Antworte JSON.", {{}})
async def fetch_step8_data_reference_analysis(request: FetchStep8DataReferenceAnalysisRequest):
log_debug("=== STEP 8 START (Grounded References) ===")
# Parallel Execution: One Task per Competitor
tasks = [analyze_single_competitor_references(c) for c in request.competitors]
results = await asyncio.gather(*tasks)
# Filter out None results and ensure structure
valid_analyses = [r for r in results if r is not None]
log_debug("Step 8 Complete. Returning {}/{} reference analyses.".format(len(valid_analyses), len(request.competitors)))
return {
"reference_analysis": valid_analyses,
"groundingMetadata": []
}
# Static Files
dist_path = os.path.join(os.getcwd(), "dist")