"""FastAPI backend exposing Gemini-backed competitor-analysis endpoints.

The module scrapes a company website (optionally widened with SerpAPI search
results), sends German-language prompts to Gemini through whichever Google SDK
is installed, and returns the parsed JSON answer to the caller.
"""
import os
import json
import asyncio
import logging
import random
import time
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
from urllib.parse import urljoin, urlparse

# --- DEPENDENCIES ---
import requests
from bs4 import BeautifulSoup
from serpapi import GoogleSearch

# --- DUAL SDK IMPORTS ---
# Both SDKs are optional; the flags record which ones could be imported.
HAS_NEW_GENAI = False
HAS_OLD_GENAI = False
try:
    from google import genai
    from google.genai import types
    HAS_NEW_GENAI = True
    logging.info("✅ SUCCESS: Loaded 'google-genai' SDK.")
except ImportError:
    logging.warning("⚠️ WARNING: 'google-genai' not found. Fallback.")

try:
    import google.generativeai as old_genai
    HAS_OLD_GENAI = True
    logging.info("✅ SUCCESS: Loaded legacy 'google.generativeai' SDK.")
except ImportError:
    logging.warning("⚠️ WARNING: Legacy 'google.generativeai' not found.")

# Load environment variables
load_dotenv()
API_KEY = os.getenv("GEMINI_API_KEY")
SERPAPI_KEY = os.getenv("SERPAPI_KEY")

# Robust API Key Loading: fall back to a key file when the env var is unset.
if not API_KEY:
    key_file_path = "/app/gemini_api_key.txt"
    if os.path.exists(key_file_path):
        with open(key_file_path, 'r') as f:
            API_KEY = f.read().strip()
if not API_KEY:
    raise ValueError("GEMINI_API_KEY not set.")

# Configure SDKs
if HAS_OLD_GENAI:
    old_genai.configure(api_key=API_KEY)

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# --- CORE SCRAPING & AI LOGIC ---

# Browser-like User-Agent sent with every outgoing scrape request.
_REQUEST_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
_REQUEST_TIMEOUT = 10

# Substrings that mark a homepage link as worth scraping.
_LINK_KEYWORDS = ('product', 'solution', 'industrie', 'branche', 'lösung', 'anwendung')

# Inserted between the texts of two scraped pages.
_PAGE_SEPARATOR = "\n\n-------- SEITE --------\n\n"


def _get_field(obj: Any, key: str, default: Any = None) -> Any:
    """Read ``key`` from a dict or attribute-style object.

    Returns ``default`` when the key/attribute is missing or its value is None.
    """
    if isinstance(obj, dict):
        value = obj.get(key)
    else:
        value = getattr(obj, key, None)
    return default if value is None else value


def scrape_text_from_url(url: str) -> str:
    """Download ``url`` and return its visible text as one space-joined string.

    script/style/nav/footer/aside elements are removed before text extraction.
    Any failure (network error, HTTP error status, parse error) is logged and
    yields an empty string, so callers can treat the result as best-effort.
    """
    try:
        # NOTE(review): verify=False disables TLS certificate verification.
        # Kept so sites with broken certificates can still be scraped, but it
        # allows man-in-the-middle tampering with the scraped content.
        response = requests.get(
            url, headers=_REQUEST_HEADERS, timeout=_REQUEST_TIMEOUT, verify=False
        )
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')
        for element in soup(['script', 'style', 'nav', 'footer', 'aside']):
            element.decompose()
        return ' '.join(soup.stripped_strings)
    except Exception as e:
        logging.warning("Failed to scrape: %s", e)
        return ""


def _discover_homepage_links(start_url: str, base_domain: str) -> set:
    """Return same-domain links on the start page whose href matches a keyword."""
    # NOTE(review): verify=False — see scrape_text_from_url.
    response = requests.get(
        start_url, headers=_REQUEST_HEADERS, timeout=_REQUEST_TIMEOUT, verify=False
    )
    soup = BeautifulSoup(response.content, 'html.parser')
    found = set()
    for anchor in soup.find_all('a', href=True):
        href = anchor['href']
        if not any(keyword in href.lower() for keyword in _LINK_KEYWORDS):
            continue
        try:
            full_url = urljoin(start_url, href)
            same_site = urlparse(full_url).netloc == base_domain
        except ValueError:
            # A malformed href must not abort discovery of the other links.
            continue
        if same_site:
            found.add(full_url)
    return found


def _discover_search_links(base_domain: str) -> set:
    """Return the links of SerpAPI organic results for a site-restricted query."""
    search_query = 'site:{} (produkte OR solutions OR branchen)'.format(base_domain)
    params = {"engine": "google", "q": search_query, "api_key": SERPAPI_KEY}
    results = GoogleSearch(params).get_dict()
    return {
        result["link"]
        for result in results.get("organic_results", [])
        if isinstance(result, dict) and result.get("link")
    }


async def discover_and_scrape_website(start_url: str) -> str:
    """Collect relevant pages of a website and return their combined text.

    The start URL is always scraped. Additional URLs come from keyword-matching
    links on the start page and, when SERPAPI_KEY is set, from a SerpAPI
    search. Discovery failures are logged and do not abort the scrape.

    Returns:
        The non-empty page texts joined by a "SEITE" separator line; an empty
        string when nothing could be scraped.
    """
    logging.info("Starting discovery for website")
    base_domain = urlparse(start_url).netloc
    urls_to_scrape = {start_url}

    # requests and the SerpAPI client are blocking, so they run in worker
    # threads to keep the event loop free.
    try:
        urls_to_scrape |= await asyncio.to_thread(
            _discover_homepage_links, start_url, base_domain
        )
    except Exception as e:
        logging.error("Failed homepage links: %s", e)

    if SERPAPI_KEY:
        try:
            urls_to_scrape |= await asyncio.to_thread(_discover_search_links, base_domain)
        except Exception as e:
            logging.error("SerpAPI failed: %s", e)

    tasks = [asyncio.to_thread(scrape_text_from_url, url) for url in urls_to_scrape]
    scraped_contents = await asyncio.gather(*tasks)
    return _PAGE_SEPARATOR.join(content for content in scraped_contents if content)


def parse_json_response(response_text: str) -> Any:
    """Parse a model response into Python data without ever raising.

    A surrounding Markdown code fence (```...```) is stripped first. When the
    decoded JSON is a non-empty list, its first element is returned. Empty
    input or any parse failure yields ``{}``.
    """
    try:
        if not response_text:
            return {}
        cleaned_text = response_text.strip()
        if cleaned_text.startswith("```"):
            # Drop the opening fence line and, if present, the closing one.
            lines = cleaned_text.splitlines()[1:]
            if lines and lines[-1].startswith("```"):
                lines = lines[:-1]
            cleaned_text = "\n".join(lines).strip()
        result = json.loads(cleaned_text)
        return result[0] if isinstance(result, list) and result else result
    except Exception as e:
        # Deliberate best-effort boundary: callers rely on getting a value back.
        logging.error("CRITICAL: Failed JSON: %s", e)
        return {}


async def call_gemini_robustly(prompt: str, schema: dict):
    """Send ``prompt`` to Gemini and return the parsed JSON response.

    The legacy ``google.generativeai`` SDK is tried first when installed; if it
    fails (or is absent) the ``google-genai`` SDK is used.

    Args:
        prompt: Full prompt text.
        schema: Response schema; an empty/falsy value requests JSON output
            without a schema.

    Raises:
        HTTPException: status 500 when every available SDK failed or when no
            SDK is installed.
    """
    last_err = None

    if HAS_OLD_GENAI:
        try:
            logging.debug("Attempting Legacy SDK gemini-2.0-flash")
            gen_config = {"temperature": 0.3, "response_mime_type": "application/json"}
            if schema:
                gen_config["response_schema"] = schema
            model = old_genai.GenerativeModel('gemini-2.0-flash', generation_config=gen_config)
            logging.debug("PROMPT: %s", prompt[:500])
            response = await model.generate_content_async(prompt)
            logging.debug("RESPONSE: %s", response.text[:500])
            return parse_json_response(response.text)
        except Exception as e:
            last_err = e
            logging.warning("Legacy failed: %s", e)

    if HAS_NEW_GENAI:
        try:
            logging.debug("Attempting Modern SDK gemini-1.5-flash")
            client_new = genai.Client(api_key=API_KEY)
            config_args = {"temperature": 0.3, "response_mime_type": "application/json"}
            if schema:
                config_args["response_schema"] = schema
            # The google-genai SDK takes the generation settings as ``config``.
            # The call is synchronous, so it runs in a worker thread.
            response = await asyncio.to_thread(
                client_new.models.generate_content,
                model='gemini-1.5-flash',
                contents=prompt,
                config=types.GenerateContentConfig(**config_args),
            )
            return parse_json_response(response.text)
        except Exception as e:
            logging.error("Modern SDK failed: %s", e)
            raise HTTPException(status_code=500, detail=str(e)) from e

    if last_err is not None:
        # Only the legacy SDK is installed and it failed: report the real cause.
        raise HTTPException(status_code=500, detail=str(last_err)) from last_err
    raise HTTPException(status_code=500, detail="No Gemini SDK available.")


# --- Schemas ---
evidence_schema = {
    "type": "object",
    "properties": {"url": {"type": "string"}, "snippet": {"type": "string"}},
    "required": ['url', 'snippet'],
}
product_schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "purpose": {"type": "string"},
        "evidence": {"type": "array", "items": evidence_schema},
    },
    "required": ['name', 'purpose', 'evidence'],
}
industry_schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "evidence": {"type": "array", "items": evidence_schema},
    },
    "required": ['name', 'evidence'],
}


# --- Endpoints ---
class ProductDetailsRequest(BaseModel):
    """Request body of /api/fetchProductDetails."""

    name: str
    url: str
    language: str


@app.post("/api/fetchProductDetails")
async def fetch_product_details(request: ProductDetailsRequest):
    """Ask Gemini for the purpose of one product, given its name and URL."""
    prompt = r"""Analysiere die URL {} und beschreibe den Zweck von "{}" in 1-2 Sätzen. Antworte JSON."""
    return await call_gemini_robustly(prompt.format(request.url, request.name), product_schema)


class FetchStep1DataRequest(BaseModel):
    """Request body of /api/fetchStep1Data."""

    start_url: str
    language: str


@app.post("/api/fetchStep1Data")
async def fetch_step1_data(request: FetchStep1DataRequest):
    """Scrape the website at ``start_url`` and extract products and target industries."""
    grounding_text = await discover_and_scrape_website(request.start_url)
    prompt = r"""Extrahiere Hauptprodukte und Zielbranchen aus dem Text. TEXT: {} Antworte JSON."""
    schema = {
        "type": "object",
        "properties": {
            "products": {"type": "array", "items": product_schema},
            "target_industries": {"type": "array", "items": industry_schema},
        },
        "required": ['products', 'target_industries'],
    }
    return await call_gemini_robustly(prompt.format(grounding_text), schema)


class FetchStep2DataRequest(BaseModel):
    """Request body of /api/fetchStep2Data."""

    products: List[Any]
    industries: List[Any]
    language: str


@app.post("/api/fetchStep2Data")
async def fetch_step2_data(request: FetchStep2DataRequest):
    """Derive research keywords from the product names."""
    # Items may be dicts, objects or plain strings; an item without a usable
    # name falls back to its string form so the join below cannot fail.
    p_names = [str(_get_field(p, 'name', p)) for p in request.products]
    prompt = r"""Leite Keywords für Recherche ab: {}. Antworte JSON."""
    schema = {
        "type": "object",
        "properties": {
            "keywords": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "term": {"type": "string"},
                        "rationale": {"type": "string"},
                    },
                    "required": ['term', 'rationale'],
                },
            }
        },
        "required": ['keywords'],
    }
    return await call_gemini_robustly(prompt.format(', '.join(p_names)), schema)


class FetchStep3DataRequest(BaseModel):
    """Request body of /api/fetchStep3Data."""

    keywords: List[Any]
    market_scope: str
    language: str


@app.post("/api/fetchStep3Data")
async def fetch_step3_data(request: FetchStep3DataRequest):
    """Find competitor candidates for a market scope based on keyword terms."""
    k_terms = [str(_get_field(k, 'term', k)) for k in request.keywords]
    prompt = "Finde Wettbewerber für Markt {} basierend auf: {}. \nAntworte JSON."
    schema = {
        "type": "object",
        "properties": {
            "competitor_candidates": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "url": {"type": "string"},
                        "confidence": {"type": "number"},
                        "why": {"type": "string"},
                        "evidence": {"type": "array", "items": evidence_schema},
                    },
                    "required": ['name', 'url', 'confidence', 'why', 'evidence'],
                },
            }
        },
        "required": ['competitor_candidates'],
    }
    return await call_gemini_robustly(
        prompt.format(request.market_scope, ', '.join(k_terms)), schema
    )


class FetchStep4DataRequest(BaseModel):
    """Request body of /api/fetchStep4Data."""

    company: Any
    competitors: List[Any]
    language: str


@app.post("/api/fetchStep4Data")
async def fetch_step4_data(request: FetchStep4DataRequest):
    """Analyse each competitor's portfolio and compare it with the own company."""
    comps_list = [
        "- {}: {}".format(_get_field(c, 'name', 'Unknown'), _get_field(c, 'url', ''))
        for c in request.competitors
    ]
    my_name = _get_field(request.company, 'name', 'Me')
    prompt = "Analysiere Portfolio für: {} Vergleiche mit {}. \nAntworte JSON."
    schema = {
        "type": "object",
        "properties": {
            "analyses": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "competitor": {
                            "type": "object",
                            "properties": {
                                "name": {"type": "string"},
                                "url": {"type": "string"},
                            },
                        },
                        "portfolio": {
                            "type": "array",
                            "items": {
                                "type": "object",
                                "properties": {
                                    "product": {"type": "string"},
                                    "purpose": {"type": "string"},
                                },
                            },
                        },
                        "target_industries": {"type": "array", "items": {"type": "string"}},
                        "delivery_model": {"type": "string"},
                        "overlap_score": {"type": "integer"},
                        "differentiators": {"type": "array", "items": {"type": "string"}},
                        "evidence": {"type": "array", "items": evidence_schema},
                    },
                    "required": [
                        'competitor', 'portfolio', 'target_industries', 'delivery_model',
                        'overlap_score', 'differentiators', 'evidence',
                    ],
                },
            }
        },
        "required": ['analyses'],
    }
    return await call_gemini_robustly(prompt.format('\n'.join(comps_list), my_name), schema)


class FetchStep5DataSilverBulletsRequest(BaseModel):
    """Request body of /api/fetchStep5Data_SilverBullets."""

    company: Any
    analyses: List[Any]
    language: str


@app.post("/api/fetchStep5Data_SilverBullets")
async def fetch_step5_data_silver_bullets(request: FetchStep5DataSilverBulletsRequest):
    """Create one "silver bullet" statement per analysed competitor."""
    lines = []
    for analysis in request.analyses:
        competitor = _get_field(analysis, 'competitor', {})
        name = _get_field(competitor, 'name', 'Unknown')
        differentiators = _get_field(analysis, 'differentiators', [])
        # Entries are coerced to str so a non-string differentiator cannot
        # break the join.
        lines.append("- {}: {}".format(name, ', '.join(str(d) for d in differentiators)))
    my_name = _get_field(request.company, 'name', 'Me')
    prompt = r"""Erstelle Silver Bullets für {} gegen: {} Antworte JSON."""
    schema = {
        "type": "object",
        "properties": {
            "silver_bullets": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "competitor_name": {"type": "string"},
                        "statement": {"type": "string"},
                    },
                    "required": ['competitor_name', 'statement'],
                },
            }
        },
        "required": ['silver_bullets'],
    }
    return await call_gemini_robustly(prompt.format(my_name, '\n'.join(lines)), schema)


# NOTE(review): the three endpoints below ignore ``request`` and send a fixed
# prompt. How FastAPI binds a parameter annotated ``Any`` (query vs. body)
# should be confirmed against the client; the signatures are left unchanged.
# An empty schema dict means "JSON output without a response schema".
@app.post("/api/fetchStep6Data_Conclusion")
async def fetch_step6_data_conclusion(request: Any):
    """Ask Gemini for a conclusion of the analysis."""
    return await call_gemini_robustly(r"Erstelle Fazit der Analyse. Antworte JSON.", {})


@app.post("/api/fetchStep7Data_Battlecards")
async def fetch_step7_data_battlecards(request: Any):
    """Ask Gemini for sales battlecards."""
    return await call_gemini_robustly(r"Erstelle Sales Battlecards. Antworte JSON.", {})


@app.post("/api/fetchStep8Data_ReferenceAnalysis")
async def fetch_step8_data_reference_analysis(request: Any):
    """Ask Gemini for reference customers."""
    return await call_gemini_robustly(r"Finde Referenzkunden. Antworte JSON.", {})


# Static Files: serve the built frontend when a ./dist directory exists.
# Mounted last so the API routes registered above take precedence over "/".
dist_path = os.path.join(os.getcwd(), "dist")
if os.path.exists(dist_path):
    app.mount("/", StaticFiles(directory=dist_path, html=True), name="static")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)