- Bereinigt alle Dockerfiles (market, content, b2b) von COPY gemini_api_key.txt. - Aktualisiert market_intel_orchestrator.py und b2b_marketing_orchestrator.py, um API-Keys strikt aus Umgebungsvariablen zu lesen. - Verhindert Build-Fehler durch fehlende lokale Token-Dateien.
674 lines
30 KiB
Python
import argparse
|
|
import json
|
|
import os
|
|
import sys # Import sys for stderr
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
import logging
|
|
from datetime import datetime
|
|
import re # Für Regex-Operationen
|
|
|
|
# --- AUTARKES LOGGING SETUP --- #
|
|
def create_self_contained_log_filename(mode):
    """Return a date-stamped log file path for the orchestrator.

    Uses a fixed log directory inside the Docker container and only one
    file per calendar day, so repeated runs do not spam the directory
    with new files.

    Args:
        mode: Kept for interface compatibility; currently unused.

    Returns:
        Absolute path of today's log file.
    """
    log_dir_path = "/app/Log"  # Fixed directory inside the container
    # exist_ok=True makes a prior os.path.exists() check unnecessary and
    # avoids a race between the check and the creation.
    os.makedirs(log_dir_path, exist_ok=True)

    # Date only (no time), so that all runs of one day share a single file.
    date_str = datetime.now().strftime("%Y-%m-%d")
    return os.path.join(log_dir_path, f"{date_str}_market_intel.log")
|
|
|
|
# Self-contained logging: everything goes to a per-day file and, in
# parallel, to stderr. stderr (not stdout) is deliberate — stdout carries
# the JSON result that the calling process parses.
log_filename = create_self_contained_log_filename("market_intel_orchestrator")
logging.basicConfig(
    level=logging.DEBUG,  # capture everything; handlers do not filter further
    format='[%(asctime)s] %(levelname)s [%(funcName)s]: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    handlers=[
        # Append mode so all runs of the same day accumulate in one file.
        logging.FileHandler(log_filename, mode='a', encoding='utf-8'),
        logging.StreamHandler(sys.stderr)
    ]
)
logger = logging.getLogger(__name__)
# --- END SELF-CONTAINED LOGGING SETUP --- #
|
|
|
|
def load_gemini_api_key(file_path=None):
    """Load the Gemini API key, preferring the environment.

    Reads GEMINI_API_KEY from the environment. If it is unset and
    *file_path* points to an existing file, the key is read from that
    file instead (local development fallback only).

    Args:
        file_path: Optional path to a key file used as a fallback.

    Returns:
        The API key string.

    Raises:
        ValueError: If no key is available from either source.
    """
    api_key = os.getenv("GEMINI_API_KEY")
    if api_key:
        return api_key
    # Fallback for local dev if absolutely necessary, but prefer env.
    if file_path and os.path.exists(file_path):
        with open(file_path, "r") as f:
            return f.read().strip()
    # Only escalate to CRITICAL once every source has failed — the
    # original logged CRITICAL even when the file fallback succeeded.
    logger.critical("Gemini API Key not found in environment variables.")
    raise ValueError("GEMINI_API_KEY not set")
|
|
|
|
def load_serp_api_key(file_path=None):
    """Return the SerpAPI key from the environment, or None if unset."""
    key = os.getenv("SERP_API_KEY")
    if key:
        return key
    logger.warning("SerpAPI Key not found in environment variables.")
    return None
|
|
|
|
def get_website_text(url):
    """Fetch *url* and return up to 15000 chars of visible page text.

    Prepends https:// when the scheme is missing, strips script/style/
    nav/footer/header elements plus non-printable characters, and
    returns None when the request or parsing fails.
    """
    # Auto-fix missing scheme
    if url and not url.startswith('http'):
        url = 'https://' + url

    logger.info(f"Scraping URL: {url}")
    # Use a more realistic, modern User-Agent to avoid blocking
    request_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.9,de;q=0.8',
        'Referer': 'https://www.google.com/'
    }
    try:
        response = requests.get(url, headers=request_headers, timeout=15)  # Increased timeout
        response.raise_for_status()

        soup = BeautifulSoup(response.text, 'lxml')
        # Drop boilerplate elements before extracting the visible text.
        for element in soup(['script', 'style', 'nav', 'footer', 'header']):
            element.decompose()

        page_text = soup.get_text(separator=' ', strip=True)
        # Keep only printable ASCII plus basic whitespace.
        page_text = re.sub(r'[^\x20-\x7E\n\r\t]', '', page_text)
        return page_text[:15000]  # Increased limit
    except Exception as e:
        logger.error(f"Scraping failed for {url}: {e}")
        return None
|
|
|
|
def serp_search(query, num_results=3):
    """Run a Google search through SerpAPI.

    Returns a list of {"title", "link", "snippet"} dicts; an empty list
    when the API key is missing or the request fails.
    """
    api_key = load_serp_api_key()
    if not api_key:
        logger.warning("SerpAPI Key fehlt. Suche übersprungen.")
        return []

    logger.info(f"SerpAPI Suche: {query}")
    search_params = {
        "engine": "google",
        "q": query,
        "api_key": api_key,
        "num": num_results,
        "hl": "de",
        "gl": "de",
    }
    try:
        response = requests.get("https://serpapi.com/search", params=search_params, timeout=20)
        response.raise_for_status()
        data = response.json()
        # Only the organic results are of interest; missing key -> [].
        return [
            {
                "title": entry.get("title"),
                "link": entry.get("link"),
                "snippet": entry.get("snippet"),
            }
            for entry in data.get("organic_results", [])
        ]
    except Exception as e:
        logger.error(f"SerpAPI Fehler: {e}")
        return []
|
|
|
|
def _extract_target_industries_from_context(context_content):
|
|
md = context_content
|
|
# Versuche verschiedene Muster für die Tabelle, falls das Format variiert
|
|
step2_match = re.search(r'##\s*Schritt\s*2:[\s\S]*?(?=\n##\s*Schritt\s*\d:|\s*$)', md, re.IGNORECASE)
|
|
if not step2_match:
|
|
# Fallback: Suche nach "Zielbranche" irgendwo im Text
|
|
match = re.search(r'Zielbranche\s*\|?\s*([^|\n]+)', md, re.IGNORECASE)
|
|
if match:
|
|
return [s.strip() for s in match.group(1).split(',')]
|
|
return []
|
|
|
|
table_lines = []
|
|
in_table = False
|
|
for line in step2_match.group(0).split('\n'):
|
|
if line.strip().startswith('|'):
|
|
in_table = True
|
|
table_lines.append(line.strip())
|
|
elif in_table:
|
|
break
|
|
|
|
if len(table_lines) < 3: return []
|
|
header = [s.strip() for s in table_lines[0].split('|') if s.strip()]
|
|
industry_col = next((h for h in header if re.search(r'zielbranche|segment|branche|industrie', h, re.IGNORECASE)), None)
|
|
if not industry_col: return []
|
|
|
|
col_idx = header.index(industry_col)
|
|
industries = []
|
|
for line in table_lines[2:]:
|
|
cells = [s.strip() for s in line.split('|') if s.strip()]
|
|
if len(cells) > col_idx: industries.append(cells[col_idx])
|
|
return list(set(industries))
|
|
|
|
def _extract_json_from_text(text):
|
|
"""
|
|
Versucht, ein JSON-Objekt aus einem Textstring zu extrahieren,
|
|
unabhängig von Markdown-Formatierung (```json ... ```).
|
|
"""
|
|
try:
|
|
# 1. Versuch: Direktersatz von Markdown-Tags (falls vorhanden)
|
|
clean_text = text.replace("```json", "").replace("```", "").strip()
|
|
return json.loads(clean_text)
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
try:
|
|
# 2. Versuch: Regex Suche nach dem ersten { und letzten }
|
|
json_match = re.search(r"(\{[\s\S]*\})", text)
|
|
if json_match:
|
|
return json.loads(json_match.group(1))
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
logger.error(f"JSON Parsing fehlgeschlagen. Roher Text: {text[:500]}...")
|
|
return None
|
|
|
|
def generate_search_strategy(reference_url, context_content, language='de'):
    """Build a lookalike search strategy for the reference client via Gemini.

    Scrapes the reference homepage (best effort), combines it with the
    strategic context and asks Gemini for an ICP plus exactly four digital
    signals. Returns the parsed strategy dict, or a placeholder dict with
    empty signals when generation fails (so the frontend does not crash).
    """
    logger.info(f"Generating strategy for {reference_url} (Language: {language})")
    api_key = load_gemini_api_key()
    # NOTE(review): currently unused in the prompt — kept for future use.
    target_industries = _extract_target_industries_from_context(context_content)

    homepage_text = get_website_text(reference_url)
    if not homepage_text:
        logger.warning(f"Strategy Generation: Could not scrape {reference_url}. Relying on context.")
        homepage_text = "[WEBSITE ACCESS DENIED] - The strategy must be developed based on the provided STRATEGIC CONTEXT and the URL name alone."

    # Switch to stable 2.5-pro model (which works for v1beta)
    GEMINI_API_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-pro:generateContent?key={api_key}"

    lang_instruction = "GERMAN (Deutsch)" if language == 'de' else "ENGLISH"

    prompt = f"""
You are a B2B Market Intelligence Architect.

--- ROLE DEFINITION ---
You are working for the company described in the "STRATEGIC CONTEXT" below (The "Hunter").
Your goal is to find new potential customers who look exactly like the "REFERENCE CLIENT" described below (The "Seed" / "Prey").

--- STRATEGIC CONTEXT (YOUR COMPANY / THE OFFER) ---
{context_content}

--- REFERENCE CLIENT HOMEPAGE (THE IDEAL CUSTOMER TO CLONE) ---
URL: {reference_url}
CONTENT: {homepage_text[:10000]}

--- TASK ---
Develop a search strategy to find **Lookalikes of the Reference Client** who would be interested in **Your Company's Offer**.

1. **summaryOfOffer**: A 1-sentence summary of what the **REFERENCE CLIENT** does (NOT what your company does). We need this to search for similar companies.
2. **idealCustomerProfile**: A concise definition of the Ideal Customer Profile (ICP) based on the Reference Client's characteristics.
3. **searchStrategyICP**: A detailed description of the Ideal Customer Profile (ICP) based on the analysis.
4. **digitalSignals**: Identification and description of relevant digital signals that indicate purchase interest or engagement for YOUR offer.
5. **targetPages**: A list of the most important target pages on the company website relevant for marketing and sales activities.
6. **signals**: Identify exactly 4 specific digital signals to check on potential lookalikes.
   - **CRITICAL**: One signal MUST be "Technographic / Incumbent Search". It must look for existing competitor software or legacy systems that **YOUR COMPANY'S OFFER** replaces or complements.
   - The other 3 signals should focus on business pains or strategic fit.

--- SIGNAL DEFINITION ---
For EACH signal, you MUST provide:
- `id`: A unique ID (e.g., "sig_1").
- `name`: A short, descriptive name.
- `description`: What does this signal indicate?
- `targetPageKeywords`: A list of 3-5 keywords to look for on a company's website (e.g., ["career", "jobs"] for a hiring signal).
- `proofStrategy`: An object containing:
  - `likelySource`: Where on the website or web is this info found? (e.g., "Careers Page").
  - `searchQueryTemplate`: A Google search query to find this. Use `{{COMPANY}}` as a placeholder for the company name.
    Example: `site:{{COMPANY}} "software engineer" OR "developer"`

--- LANGUAGE INSTRUCTION ---
IMPORTANT: The entire JSON content (descriptions, rationale, summaries) MUST be in {lang_instruction}. Translate if necessary.

--- OUTPUT FORMAT ---
Return ONLY a valid JSON object.
{{
  "summaryOfOffer": "The Reference Client provides...",
  "idealCustomerProfile": "...",
  "searchStrategyICP": "...",
  "digitalSignals": "...",
  "targetPages": "...",
  "signals": [ ... ]
}}
"""

    payload = {"contents": [{"parts": [{"text": prompt}]}]}
    logger.info("Sende Anfrage an Gemini API...")
    try:
        # Explicit timeout so a hung API call cannot block the pipeline forever.
        response = requests.post(GEMINI_API_URL, json=payload, headers={'Content-Type': 'application/json'}, timeout=120)
        response.raise_for_status()
        res_json = response.json()
        logger.info(f"Gemini API-Antwort erhalten (Status: {response.status_code}).")

        text = res_json['candidates'][0]['content']['parts'][0]['text']

        # Raw model output belongs at DEBUG level, not ERROR (it is not a failure).
        logger.debug(f"RAW GEMINI JSON RESPONSE: {text}")

        result = _extract_json_from_text(text)

        if not result:
            raise ValueError("Konnte kein valides JSON extrahieren")

        return result

    except Exception as e:
        logger.error(f"Strategy generation failed: {e}")
        # Return fallback to avoid frontend crash
        return {
            "summaryOfOffer": "Error generating strategy. Please check logs.",
            "idealCustomerProfile": "Error generating ICP. Please check logs.",
            "searchStrategyICP": "Error generating Search Strategy ICP. Please check logs.",
            "digitalSignals": "Error generating Digital Signals. Please check logs.",
            "targetPages": "Error generating Target Pages. Please check logs.",
            "signals": []
        }
|
|
|
|
def identify_competitors(reference_url, target_market, industries, summary_of_offer=None, language='de'):
    """Ask Gemini for 3-5 lookalike competitors of the reference client.

    Groups results into local / national / international competitor
    lists. Returns the parsed dict, or a dict of three empty lists on
    failure so callers never crash on a missing key.
    """
    logger.info(f"Identifying competitors for {reference_url} (Language: {language})")
    api_key = load_gemini_api_key()
    # Switch to stable 2.5-pro model
    GEMINI_API_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-pro:generateContent?key={api_key}"

    lang_instruction = "GERMAN (Deutsch)" if language == 'de' else "ENGLISH"

    prompt = f"""
You are a B2B Market Analyst. Find 3-5 direct competitors or highly similar companies (lookalikes) for the company at `{reference_url}`.

--- CONTEXT ---
- Reference Client Business (What they do): {summary_of_offer}
- Target Market: {target_market}
- Relevant Industries: {', '.join(industries)}

--- TASK ---
Identify companies that are **similar to the Reference Client** (i.e., Lookalikes).
We are looking for other companies that do the same thing as `{reference_url}`.

Categorize them into three groups:
1. 'localCompetitors': Competitors in the same immediate region/city.
2. 'nationalCompetitors': Competitors operating across the same country.
3. 'internationalCompetitors': Global players.

For EACH competitor, you MUST provide:
- `id`: A unique, URL-friendly identifier (e.g., "competitor-name-gmbh").
- `name`: The official, full name of the company.
- `description`: A concise explanation of why they are a competitor.

--- LANGUAGE INSTRUCTION ---
IMPORTANT: The entire JSON content (descriptions) MUST be in {lang_instruction}.

--- OUTPUT FORMAT ---
Return ONLY a valid JSON object with the following structure:
{{
  "localCompetitors": [ {{ "id": "...", "name": "...", "description": "..." }} ],
  "nationalCompetitors": [ ... ],
  "internationalCompetitors": [ ... ]
}}
"""

    payload = {"contents": [{"parts": [{"text": prompt}]}]}
    logger.info("Sende Anfrage an Gemini API...")
    # logger.debug(f"Rohe Gemini API-Anfrage (JSON): {json.dumps(payload, indent=2)}")
    try:
        # Explicit timeout so a hung API call cannot block the pipeline forever.
        response = requests.post(GEMINI_API_URL, json=payload, headers={'Content-Type': 'application/json'}, timeout=120)
        response.raise_for_status()
        res_json = response.json()
        logger.info(f"Gemini API-Antwort erhalten (Status: {response.status_code}).")

        text = res_json['candidates'][0]['content']['parts'][0]['text']
        result = _extract_json_from_text(text)

        if not result:
            raise ValueError("Konnte kein valides JSON extrahieren")

        return result

    except Exception as e:
        logger.error(f"Competitor identification failed: {e}")
        return {"localCompetitors": [], "nationalCompetitors": [], "internationalCompetitors": []}
|
|
|
|
def analyze_company(company_name, strategy, target_market, language='de'):
    """Run a deep "digital trace" audit for one company.

    Finds the company website (SerpAPI first, Gemini as a low-confidence
    fallback), gathers tech-stack / firmographic / signal evidence via
    targeted searches, then asks Gemini for a structured audit JSON.
    Always returns a dict; on failure a placeholder with status "Unklar".
    """
    logger.info(f"--- STARTING DEEP TECH AUDIT FOR: {company_name} (Language: {language}) ---")
    api_key = load_gemini_api_key()
    GEMINI_API_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-pro:generateContent?key={api_key}"

    lang_instruction = "GERMAN (Deutsch)" if language == 'de' else "ENGLISH"

    # 1. Website finding (SerpAPI, falling back to Gemini).
    url = None
    website_search_results = serp_search(f"{company_name} offizielle Website")
    if website_search_results:
        url = website_search_results[0].get("link")
        logger.info(f"Website via SerpAPI gefunden: {url}")

    if not url:
        # Fallback: ask Gemini (low confidence).
        logger.info("Keine URL via SerpAPI, frage Gemini...")
        prompt_url = f"What is the official homepage URL for the company '{company_name}' in the market '{target_market}'? Respond with ONLY the single, complete URL and nothing else."
        payload_url = {"contents": [{"parts": [{"text": prompt_url}]}]}
        logger.info("Sende Anfrage an Gemini API (URL Fallback)...")
        try:
            res = requests.post(GEMINI_API_URL, json=payload_url, headers={'Content-Type': 'application/json'}, timeout=15)
            res.raise_for_status()
            res_json = res.json()
            candidate = res_json.get('candidates', [{}])[0]
            content = candidate.get('content', {}).get('parts', [{}])[0]
            text_response = content.get('text', '').strip()
            url_match = re.search(r'(https?://[^\s"]+)', text_response)
            if url_match:
                url = url_match.group(1)
        except Exception as e:
            logger.error(f"Gemini URL Fallback failed: {e}")

    if not url or not url.startswith("http"):
        return {"error": f"Could not find website for {company_name}"}

    homepage_text = ""
    scraping_note = ""

    if url and url.startswith("http"):
        scraped_content = get_website_text(url)
        if scraped_content:
            homepage_text = scraped_content
        else:
            homepage_text = "[WEBSITE ACCESS DENIED]"
            scraping_note = "(Website Content Unavailable)"
    else:
        homepage_text = "No valid URL found."
        scraping_note = "(No URL found)"

    tech_evidence = []

    # Dynamic search based on the strategy instead of a hardcoded list.
    # We no longer proactively search for SAP Ariba unless the strategy
    # says so; instead we run a generic "tech stack" search.
    tech_queries = [
        f'site:{url.split("//")[-1].split("/")[0] if url and "//" in url else company_name} "software" OR "technology" OR "system"',
        f'"{company_name}" "technology stack"',
        f'"{company_name}" "partners"'
    ]

    # Add explicit tech signals from strategy if they exist
    signals = strategy.get('signals', [])
    for signal in signals:
        if "technographic" in signal.get('id', '').lower() or "incumbent" in signal.get('id', '').lower():
            keywords = signal.get('targetPageKeywords', [])
            for kw in keywords:
                tech_queries.append(f'"{company_name}" "{kw}"')

    # Deduplicate queries and limit
    tech_queries = list(set(tech_queries))[:4]

    for q in tech_queries:
        results = serp_search(q, num_results=3)
        if results:
            for r in results:
                tech_evidence.append(f"- Found: {r['title']}\n Snippet: {r['snippet']}\n Link: {r['link']}")

    tech_evidence_text = "\n".join(tech_evidence)
    signal_evidence = []
    firmographics_results = serp_search(f"{company_name} Umsatz Mitarbeiterzahl 2023")
    firmographics_context = "\n".join([f"- {r['snippet']} ({r['link']})" for r in firmographics_results])

    for signal in signals:
        # Skip technographic signals here as they are handled above or via generic search
        if "incumbent" in signal['id'].lower() or "technographic" in signal['id'].lower():
            continue

        proof_strategy = signal.get('proofStrategy', {})
        query_template = proof_strategy.get('searchQueryTemplate')
        search_context = ""
        if query_template:
            try:
                domain = url.split("//")[-1].split("/")[0].replace("www.", "")
            except Exception:  # narrowed from a bare except
                domain = ""
            query = query_template.replace("{{COMPANY}}", company_name).replace("{COMPANY}", company_name).replace("{{domain}}", domain).replace("{domain}", domain)
            results = serp_search(query, num_results=3)
            if results:
                search_context = "\n".join([f" * Snippet: {r['snippet']}\n Source: {r['link']}" for r in results])
        if search_context:
            signal_evidence.append(f"SIGNAL '{signal['name']}':\n{search_context}")

    evidence_text = "\n\n".join(signal_evidence)

    prompt = f"""
You are a Strategic B2B Sales Consultant.
Analyze the company '{company_name}' ({url}) to create a "best-of-breed" sales pitch strategy.

--- STRATEGY (What we are looking for) ---
{json.dumps(signals, indent=2)}

--- EVIDENCE 1: EXTERNAL TECH-STACK INTELLIGENCE ---
Analyze the search results below. Do NOT hallucinate technologies. Only list what is explicitly found.
{tech_evidence_text}

--- EVIDENCE 2: HOMEPAGE CONTENT {scraping_note} ---
{homepage_text[:8000]}

--- EVIDENCE 3: FIRMOGRAPHICS SEARCH ---
{firmographics_context}

--- EVIDENCE 4: TARGETED SIGNAL SEARCH RESULTS ---
{evidence_text}
----------------------------------

TASK:
1. **Firmographics**: Estimate Revenue and Employees.
2. **Technographic Audit**: Check if any relevant competitor technology or legacy system is ACTUALLY found in the evidence.
   - **CRITICAL:** If no specific competitor software is found, assume the status is "Greenfield" (Manual Process / Status Quo). Do NOT invent a competitor like SAP Ariba just because it's a common tool.
3. **Status**:
   - Set to "Nutzt Wettbewerber" ONLY if a direct competitor is explicitly found.
   - Set to "Greenfield" if no competitor tech is found.
   - Set to "Bestandskunde" if they already use our solution.
4. **Evaluate Signals**: For each signal, provide a "value" (Yes/No/Partial) and "proof".
5. **Recommendation (Pitch Strategy)**:
   - If Greenfield: Pitch against the manual status quo (efficiency, error reduction).
   - If Competitor: Pitch replacement/upgrade.
   - **Tone**: Strategic, insider-knowledge, specific.

--- LANGUAGE INSTRUCTION ---
IMPORTANT: The entire JSON content (especially 'recommendation', 'proof', 'value') MUST be in {lang_instruction}.

STRICTLY output only JSON:
{{
  "companyName": "{company_name}",
  "status": "...",
  "revenue": "...",
  "employees": "...",
  "tier": "Tier 1/2/3",
  "dynamicAnalysis": {{
    "sig_id_from_strategy": {{ "value": "...", "proof": "..." }}
  }},
  "recommendation": "..."
}}
"""

    payload = {
        "contents": [{"parts": [{"text": prompt}]}],
        "generationConfig": {"response_mime_type": "application/json"}
    }

    try:
        logger.info("Sende Audit-Anfrage an Gemini API...")
        # Explicit timeout (the URL-fallback POST above already had one);
        # prevents the audit from hanging indefinitely.
        response = requests.post(GEMINI_API_URL, json=payload, headers={'Content-Type': 'application/json'}, timeout=180)
        response.raise_for_status()
        response_data = response.json()
        logger.info(f"Gemini API-Antwort erhalten (Status: {response.status_code}).")

        text = response_data['candidates'][0]['content']['parts'][0]['text']
        result = _extract_json_from_text(text)

        if not result:
            raise ValueError("Konnte kein valides JSON extrahieren")

        result['dataSource'] = "Digital Trace Audit (Deep Dive)"
        return result
    except Exception as e:
        logger.error(f"Audit failed for {company_name}: {e}")
        return {
            "companyName": company_name,
            "status": "Unklar",
            "revenue": "Error",
            "employees": "Error",
            "tier": "Tier 3",
            "dynamicAnalysis": {},
            "recommendation": f"Audit failed: {str(e)}",
            "dataSource": "Error"
        }
|
|
|
|
def generate_outreach_campaign(company_data_json, knowledge_base_content, reference_url, specific_role=None, language='de'):
    """Create personalized e-mail campaigns for an audited company.

    Two modes: with *specific_role* a single 3-step sequence for exactly
    that role is generated (Mode B); without it, Gemini picks the most
    promising role, drafts its campaign and lists further candidate
    roles (Mode A). Returns the parsed dict, or {"error": ...} on failure.
    """
    company_name = company_data_json.get('companyName', 'Unknown')
    logger.info(f"--- STARTING OUTREACH GENERATION FOR: {company_name} (Role: {specific_role if specific_role else 'Top 5'}) [Lang: {language}] ---")

    api_key = load_gemini_api_key()
    GEMINI_API_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-pro:generateContent?key={api_key}"

    lang_instruction = "GERMAN (Deutsch)" if language == 'de' else "ENGLISH"

    if specific_role:
        # --- MODE B: SINGLE ROLE GENERATION (On Demand) ---
        task_description = f"""
--- TASK ---
1. **Focus**: Create a highly specific 3-step email campaign ONLY for the role: '{specific_role}'.
2. **Analyze**: Use the Audit Facts to find specific hooks for this role.
3. **Draft**: Write the sequence (Opening, Follow-up, Break-up).
"""
        output_format = """
--- OUTPUT FORMAT (Strictly JSON) ---
{
  "target_role": "The requested role",
  "rationale": "Why this fits...",
  "emails": [ ... ]
}
"""
    else:
        # --- MODE A: INITIAL START (TOP 1 + SUGGESTIONS) ---
        task_description = f"""
--- TASK ---
1. **Analyze**: Match the Target Company (Input 2) to the most relevant 'Zielbranche/Segment' from the Knowledge Base (Input 1).
2. **Identify Roles**: Identify ALL relevant 'Rollen' (Personas) from the Knowledge Base that fit this company.
3. **Select Best**: Choose the SINGLE most promising role for immediate outreach based on the Audit findings.
4. **Draft Campaign**: Write a 3-step email sequence for this ONE role.
5. **List Others**: List ALL other relevant roles (including the other top candidates) in 'available_roles' so the user can generate them later.
"""
        output_format = """
--- OUTPUT FORMAT (Strictly JSON) ---
{
  "campaigns": [
    {
      "target_role": "Role Name",
      "rationale": "Why selected...",
      "emails": [ ... ]
    }
  ],
  "available_roles": [ "Role 2", "Role 3", "Role 4", "Role 5", ... ]
}
"""

    prompt = f"""
You are a Strategic Key Account Manager and deeply technical Industry Insider.
Your goal is to write highly personalized, **operationally specific** outreach emails to the company '{company_name}'.

--- INPUT 1: YOUR IDENTITY & STRATEGY (The Sender) ---
{knowledge_base_content}

--- INPUT 2: THE TARGET COMPANY (Audit Facts) ---
{json.dumps(company_data_json, indent=2)}

--- INPUT 3: THE REFERENCE CLIENT (Social Proof) ---
Reference Client URL: {reference_url}

CRITICAL: This 'Reference Client' is an existing happy customer of ours. You MUST mention them by name to establish trust.

{task_description}

--- TONE & STYLE GUIDELINES (CRITICAL) ---
1. **Professional & Flowing:** Aim for approx. 500-600 characters per email. Use full sentences and professional courtesies. It should feel like a high-quality human message.
2. **Stance:** Act as an **astute industry observer** and peer consultant. You have analyzed their specific situation and identified a strategic bottleneck.
3. **The Opportunity Bridge (Email 1):** Bridge observation to a strategic solution immediately using concrete terms (e.g., "autonome Reinigungsrobotik").
4. **Context-Sensitive Technographics:** Only mention discovered IT or Procurement systems (e.g., SAP Ariba) if it is highly relevant to the **specific role** (e.g., for CEO, CFO, or Head of Procurement). For **purely operational roles** (e.g., Facility Manager, Head of Operations), AVOID mentioning these systems as it may cause confusion; focus entirely on the operational pain (labor shortage) and growth bottlenecks instead.
5. **Soft-Sell vs. Hard-Pitch:** Position technology as a logical answer to the bottleneck. Pitch the **outcome/capability**, not features.
6. **Social Proof as the Engine:** Let the Reference Client ({reference_url}) provide the evidence. Use a role-specific KPI.
7. **Operational Grit:** Use domain-specific terms (e.g., "ASNs", "8D", "TCO") to establish authority.
8. **Language:** {lang_instruction}.

{output_format}
"""

    payload = {
        "contents": [{"parts": [{"text": prompt}]}],
        "generationConfig": {"response_mime_type": "application/json"}
    }

    try:
        logger.info("Sende Campaign-Anfrage an Gemini API...")
        # Explicit timeout so a hung API call cannot block the pipeline forever.
        response = requests.post(GEMINI_API_URL, json=payload, headers={'Content-Type': 'application/json'}, timeout=180)
        response.raise_for_status()
        response_data = response.json()
        logger.info(f"Gemini API-Antwort erhalten (Status: {response.status_code}).")

        text = response_data['candidates'][0]['content']['parts'][0]['text']
        result = _extract_json_from_text(text)

        if not result:
            raise ValueError("Konnte kein valides JSON extrahieren")

        return result
    except Exception as e:
        logger.error(f"Campaign generation failed for {company_name}: {e}")
        return {"error": str(e)}
|
|
|
|
def main():
    """CLI entry point: dispatch on --mode and print a JSON result to stdout."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode", required=True)
    parser.add_argument("--reference_url")
    parser.add_argument("--context_file")
    parser.add_argument("--target_market")
    parser.add_argument("--company_name")
    parser.add_argument("--strategy_json")
    parser.add_argument("--summary_of_offer")
    parser.add_argument("--company_data_file")
    parser.add_argument("--specific_role")
    parser.add_argument("--language", default="de")  # New Argument
    args = parser.parse_args()

    # All file reads use explicit UTF-8: the context / knowledge-base files
    # contain German umlauts and would break under a platform default
    # encoding such as cp1252 on Windows.
    if args.mode == "generate_strategy":
        with open(args.context_file, "r", encoding="utf-8") as f:
            context = f.read()
        print(json.dumps(generate_search_strategy(args.reference_url, context, args.language)))
    elif args.mode == "identify_competitors":
        industries = []
        if args.context_file:
            with open(args.context_file, "r", encoding="utf-8") as f:
                context = f.read()
            industries = _extract_target_industries_from_context(context)
        print(json.dumps(identify_competitors(args.reference_url, args.target_market, industries, args.summary_of_offer, args.language)))
    elif args.mode == "analyze_company":
        strategy = json.loads(args.strategy_json)
        print(json.dumps(analyze_company(args.company_name, strategy, args.target_market, args.language)))
    elif args.mode == "generate_outreach":
        with open(args.company_data_file, "r", encoding="utf-8") as f:
            company_data = json.load(f)
        with open(args.context_file, "r", encoding="utf-8") as f:
            knowledge_base = f.read()
        print(json.dumps(generate_outreach_campaign(company_data, knowledge_base, args.reference_url, args.specific_role, args.language)))
|
|
|
|
|
|
if __name__ == "__main__":
    # Force UTF-8 stdout so German characters in the JSON survive piping
    # regardless of the platform's default console encoding.
    sys.stdout.reconfigure(encoding='utf-8')
    try:
        main()
        sys.stdout.flush()
    except Exception as e:
        logger.critical(f"Unhandled Exception in Main: {e}", exc_info=True)
        # Fallback JSON output so the server doesn't crash on parse error
        error_json = json.dumps({"error": f"Critical Script Error: {str(e)}", "details": "Check market_intel.log"})
        print(error_json)
        sys.exit(1)