Files
Brancheneinstufung2/company-explorer/backend/scripts/sync_notion_to_ce_enhanced.py

186 lines
6.4 KiB
Python

import sys
import os
import logging

import requests
from dotenv import load_dotenv

# Make the project root importable regardless of the CWD the script runs from.
# /app/backend/scripts/sync.py -> /app
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
from backend.database import SessionLocal, Industry, RoboticsCategory, init_db

# Load environment variables from the container root .env, if present.
load_dotenv(dotenv_path="/app/.env")

# Logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

NOTION_TOKEN = os.getenv("NOTION_API_KEY")
if not NOTION_TOKEN:
    # Fallback to a token file if the env var is missing (legacy deployment path).
    try:
        with open("/app/notion_token.txt", "r") as f:
            NOTION_TOKEN = f.read().strip()
    except OSError:
        # Was a bare `except:` — only file-access errors should trigger the abort,
        # not KeyboardInterrupt/SystemExit.
        logger.error("NOTION_API_KEY missing in ENV and file!")
        sys.exit(1)

# Shared headers for every Notion API request.
HEADERS = {
    "Authorization": f"Bearer {NOTION_TOKEN}",
    "Notion-Version": "2022-06-28",
    "Content-Type": "application/json"
}
def find_db_id(query_name):
    """Search Notion for a database whose title matches *query_name*.

    Returns the ID of the first matching database, or None when the search
    fails (non-200 response) or yields no results.
    """
    url = "https://api.notion.com/v1/search"
    payload = {"query": query_name, "filter": {"value": "database", "property": "object"}}
    # Explicit timeout: requests has no default and would otherwise hang the
    # whole sync on a stalled connection.
    resp = requests.post(url, headers=HEADERS, json=payload, timeout=30)
    if resp.status_code == 200:
        results = resp.json().get("results", [])
        if results:
            return results[0]['id']
    return None
def query_all(db_id):
    """Fetch every page of a Notion database, following pagination cursors.

    Raises requests.HTTPError on a non-2xx response instead of silently
    returning a partial result set.
    """
    url = f"https://api.notion.com/v1/databases/{db_id}/query"
    results = []
    has_more = True
    next_cursor = None
    while has_more:
        payload = {}
        if next_cursor:
            payload["start_cursor"] = next_cursor
        # Timeout guards against a hung request; raise_for_status prevents an
        # error body (which lacks "has_more") from ending the loop with
        # incomplete data that would then be synced as if complete.
        resp = requests.post(url, headers=HEADERS, json=payload, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        results.extend(data.get("results", []))
        has_more = data.get("has_more", False)
        next_cursor = data.get("next_cursor")
    return results
def extract_rich_text(prop):
    """Concatenate the plain-text segments of a Notion rich_text property.

    Returns "" when the property is missing or has no rich_text key.
    """
    if prop and "rich_text" in prop:
        segments = (item.get("plain_text", "") for item in prop.get("rich_text", []))
        return "".join(segments)
    return ""
def extract_title(prop):
    """Concatenate the plain-text segments of a Notion title property.

    Returns "" when the property is missing or has no title key.
    """
    if prop and "title" in prop:
        chunks = (item.get("plain_text", "") for item in prop.get("title", []))
        return "".join(chunks)
    return ""
def extract_select(prop):
    """Return the chosen option's name from a Notion select property, or ""."""
    selected = prop.get("select") if prop else None
    if not selected:
        return ""
    return selected["name"]
def extract_number(prop):
    """Return the value of a Notion number property, or None when absent."""
    if prop and "number" in prop:
        return prop["number"]
    return None
def _resolve_category_id(session, props, prop_name):
    """Map the first relation entry of *prop_name* to a local RoboticsCategory id.

    Returns None when the relation is empty or the referenced category has not
    been synced yet.
    """
    relations = props.get(prop_name, {}).get("relation", [])
    if not relations:
        return None
    cat = session.query(RoboticsCategory).filter(
        RoboticsCategory.notion_id == relations[0]["id"]
    ).first()
    return cat.id if cat else None


def _sync_categories(session):
    """Upsert RoboticsCategory rows from the Notion product-categories database.

    Looks the database up under two legacy names. Upserts are keyed on the
    Notion page id so repeated runs update rows in place.
    """
    cat_db_id = find_db_id("Product Categories") or find_db_id("Products")
    if not cat_db_id:
        logger.warning("Product DB not found!")
        return
    logger.info(f"Syncing Products from {cat_db_id}...")
    for page in query_all(cat_db_id):
        props = page["properties"]
        name = extract_title(props.get("Name") or props.get("Product Name"))
        if not name:
            continue  # skip untitled rows
        notion_id = page["id"]
        key = name.lower().replace(" ", "_")
        cat = session.query(RoboticsCategory).filter(
            RoboticsCategory.notion_id == notion_id
        ).first()
        if not cat:
            cat = RoboticsCategory(notion_id=notion_id, key=key)
            session.add(cat)
        cat.name = name
        cat.description = extract_rich_text(props.get("Description"))
    # Single commit after the loop instead of one per page.
    session.commit()


def _sync_industries(session):
    """Rebuild the Industry table from the Notion "Industries" database.

    Full refresh: existing rows are deleted first so rows removed in Notion
    disappear locally as well. Must run after _sync_categories so relation
    lookups can resolve.
    """
    ind_db_id = find_db_id("Industries")
    if not ind_db_id:
        logger.error("Industries DB not found!")
        return
    logger.info(f"Syncing Industries from {ind_db_id}...")
    # Clear existing rows (full refresh).
    session.query(Industry).delete()
    session.commit()
    count = 0
    for page in query_all(ind_db_id):
        props = page["properties"]
        name = extract_title(props.get("Vertical"))
        if not name:
            continue  # skip untitled rows
        ind = Industry(notion_id=page["id"], name=name)
        session.add(ind)
        # Descriptive fields
        ind.description = extract_rich_text(props.get("Definition"))
        ind.notes = extract_rich_text(props.get("Notes"))
        ind.pains = extract_rich_text(props.get("Pains"))
        ind.gains = extract_rich_text(props.get("Gains"))
        # Metrics & scraper configuration
        ind.metric_type = extract_select(props.get("Metric Type"))
        ind.min_requirement = extract_number(props.get("Min. Requirement"))
        ind.whale_threshold = extract_number(props.get("Whale Threshold"))
        ind.proxy_factor = extract_number(props.get("Proxy Factor"))
        ind.scraper_search_term = extract_rich_text(props.get("Scraper Search Term"))
        ind.scraper_keywords = extract_rich_text(props.get("Scraper Keywords"))
        ind.standardization_logic = extract_rich_text(props.get("Standardization Logic"))
        # Status / priority: "Priorität" column, falling back to "Freigegeben".
        prio = extract_select(props.get("Priorität")) or extract_select(props.get("Freigegeben"))
        ind.priority = prio
        ind.status_notion = prio  # legacy field, kept in sync with priority
        ind.is_focus = (prio == "Freigegeben")
        # Ops focus checkbox (optional column)
        if "Ops Focus: Secondary" in props:
            ind.ops_focus_secondary = props["Ops Focus: Secondary"].get("checkbox", False)
        # Relations to product categories synced earlier
        ind.primary_category_id = _resolve_category_id(session, props, "Primary Product Category")
        ind.secondary_category_id = _resolve_category_id(session, props, "Secondary Product")
        count += 1
    session.commit()
    logger.info(f"✅ Synced {count} industries.")


def sync():
    """Run the full Notion -> Company-Explorer sync.

    Ensures tables exist, then syncs product categories followed by
    industries (order matters: industries reference categories).
    """
    logger.info("--- Starting Enhanced Sync ---")
    init_db()  # ensure tables exist
    session = SessionLocal()
    try:
        _sync_categories(session)
        _sync_industries(session)
    finally:
        # Always release the DB session, even when the sync aborts mid-way
        # (the original leaked the session on any exception).
        session.close()
# Entry point: run the full Notion -> CE sync when executed as a script.
if __name__ == "__main__":
    sync()