Files
Brancheneinstufung2/dealfront_enrichment.py

168 lines
6.2 KiB
Python

#!/usr/bin/env python3
import os
import sys
import json
import time
import logging
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# ────────────────────────────────────────────────────────────────
# Constants
# Page opened first by DealfrontScraper.login_and_select_search().
LOGIN_URL = "https://app.dealfront.com/login"
# Link text of the navigation tab that is clicked right after login.
TARGET_TAB = "Target"
# Value of the `title` attribute of the sidebar entry (saved search) to open.
SEARCH_NAME = "Facility Management"
# JSON file read by load_creds(); must provide "username" and "password".
CREDS_FILE = "dealfront_credentials.json"
# Directory that main() creates (if needed) and writes results.json into.
OUTPUT_DIR = "output"
# Path of the chromedriver executable handed to selenium's Service in main().
CHROMEDRIVER_PATH = "/usr/bin/chromedriver"
# Log line layout: timestamp, level name left-aligned to 8 characters, message.
LOG_FORMAT = "%(asctime)s %(levelname)-8s %(message)s"
# ────────────────────────────────────────────────────────────────
# force=True makes basicConfig replace any handlers already attached to the
# root logger, so LOG_FORMAT applies even if logging was configured earlier.
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, force=True)
logger = logging.getLogger(__name__)
# Only let warnings and errors from the "selenium" logger hierarchy through.
logging.getLogger("selenium").setLevel(logging.WARNING)
def load_creds(path):
    """Read the login credentials from a JSON file.

    The file must contain a JSON object with non-empty ``"username"`` and
    ``"password"`` entries.

    Args:
        path: Path of the UTF-8 encoded JSON credentials file.

    Returns:
        A ``(username, password)`` tuple.

    Raises:
        SystemExit: With exit code 1 (after logging the reason) when the
            file is missing, is not valid JSON, or lacks one of the two
            required entries.
    """
    try:
        # The context manager guarantees the handle is closed again; the
        # previous ``json.load(open(...))`` leaked it.
        with open(path, encoding="utf-8") as fh:
            creds = json.load(fh)
    except FileNotFoundError:
        logger.error("Credentials-Datei nicht gefunden: %s", path)
        sys.exit(1)
    except json.JSONDecodeError as err:
        # Report malformed JSON the same way as the other credential
        # problems instead of dying with a raw traceback.
        logger.error("Credentials-Datei %s ist kein gültiges JSON: %s", path, err)
        sys.exit(1)

    # A top-level list/string/number has no .get(); treat it as "keys missing".
    if isinstance(creds, dict):
        u, p = creds.get("username"), creds.get("password")
    else:
        u, p = None, None

    if not u or not p:
        logger.error("username/password fehlen in %s", path)
        sys.exit(1)
    return u, p
class DealfrontScraper:
    """Log in to Dealfront, open a saved search and scrape its result table.

    The scraper drives an already constructed Selenium WebDriver together
    with a matching ``WebDriverWait``; it neither creates nor quits the
    driver itself.
    """

    # Implicit wait (seconds) while probing individual rows: kept short so a
    # row without the optional website link falls through to the fallback
    # quickly.
    ROW_LOOKUP_IMPLICIT_WAIT = 1
    # Implicit wait (seconds) that is put back once a page has been extracted.
    DEFAULT_IMPLICIT_WAIT = 10

    def __init__(self, driver, wait, user, pwd):
        """Store the browser handles and the login credentials.

        Args:
            driver: Selenium WebDriver used for all page interaction.
            wait: ``WebDriverWait`` bound to ``driver`` for explicit waits.
            user: Login name typed into the user/e-mail field.
            pwd: Password typed into the password field.
        """
        self.driver = driver
        self.wait = wait
        self.user = user
        self.pwd = pwd

    @staticmethod
    def _normalize_site(href):
        """Return ``href`` without URL scheme and trailing slashes.

        ``None`` or an empty string yields ``""`` so callers never have to
        deal with a missing attribute value.
        """
        if not href:
            return ""
        return href.split("://")[-1].rstrip("/")

    def login_and_select_search(self):
        """Log in, switch to the target tab and open the configured search.

        Returns once the first result link of the search is visible.
        Explicit waits that expire raise selenium's ``TimeoutException``.
        """
        user_field_sel = "input[type='email'],input[type='text']"

        # 1) Open the login page and wait for the user field.
        self.driver.get(LOGIN_URL)
        self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, user_field_sel)))

        # 2) Fill in the credentials and submit the form.
        self.driver.find_element(By.CSS_SELECTOR, user_field_sel).send_keys(self.user)
        self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.pwd)
        self.driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click()

        # 3) Click the target tab.
        self.wait.until(EC.element_to_be_clickable((By.LINK_TEXT, TARGET_TAB))).click()

        # 4) Wait for the sidebar that lists the saved searches.
        sidebar_sel = "ul[data-userpilot-id='sidebar-searches-list']"
        self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, sidebar_sel)))

        # 5) Open the saved search (div[title=...]); the click is dispatched
        #    through JavaScript rather than as a native WebDriver click.
        div_sel = f"div[title='{SEARCH_NAME}']"
        el = self.wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, div_sel)))
        self.driver.execute_script("arguments[0].click()", el)

        # 6) Wait for the first result link, then pause briefly.
        self.wait.until(EC.visibility_of_element_located(
            (By.CSS_SELECTOR, "a.t-highlight-text.t-highlight-text-snippet")
        ))
        time.sleep(1)

    def extract_current_page_results(self):
        """Collect name and website of every row on the current result page.

        Rows without a name link are skipped. The website is taken from the
        website link's ``href`` (scheme and trailing slash removed); when the
        link or its ``href`` is missing, the text of the row's third cell is
        used instead.

        Returns:
            A list of ``{"name": ..., "website": ...}`` dicts.
        """
        # Short implicit wait so rows lacking an optional element do not
        # stall the loop.
        self.driver.implicitly_wait(self.ROW_LOOKUP_IMPLICIT_WAIT)
        try:
            # Wait until at least one row is present.
            rows = self.wait.until(EC.presence_of_all_elements_located((
                By.CSS_SELECTOR, "table#t-result-table tbody tr[id]"
            )))
            data = []
            for row in rows:
                name_links = row.find_elements(
                    By.CSS_SELECTOR, "a.t-highlight-text.t-highlight-text-snippet"
                )
                if not name_links:
                    continue
                name = (name_links[0].get_attribute("title") or name_links[0].text).strip()

                site_links = row.find_elements(By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text")
                # get_attribute() returns None for an absent attribute, which
                # previously crashed on .split(); fall back to the cell text.
                href = site_links[0].get_attribute("href") if site_links else None
                if href:
                    site = self._normalize_site(href)
                else:
                    cells = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3)")
                    site = cells[0].text.strip() if cells else ""
                data.append({"name": name, "website": site})
        finally:
            # Always restore the longer implicit wait, even if extraction
            # failed half-way, so later lookups are not left on the short one.
            self.driver.implicitly_wait(self.DEFAULT_IMPLICIT_WAIT)
        logger.info(" Extrahiert: %d Zeilen", len(data))
        return data

    def click_next_page(self):
        """Advance to the next result page.

        The last paginator button is treated as the "next" control.

        Returns:
            ``True`` after the active page indicator changed, ``False`` when
            there is no paginator or the "next" button is disabled.
        """
        btns = self.driver.find_elements(
            By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button"
        )
        if not btns:
            return False
        nxt = btns[-1]

        # End reached? A missing class attribute comes back as None, so
        # normalize it before the substring test.
        css_classes = nxt.get_attribute("class") or ""
        if not nxt.is_enabled() or "disabled" in css_classes:
            return False

        active_sel = "nav.eb-pagination a.eb-pagination-button.active"
        current = self.driver.find_element(By.CSS_SELECTOR, active_sel).text
        nxt.click()
        # Wait for the page switch: the active button's label must change.
        self.wait.until(
            lambda d: d.find_element(By.CSS_SELECTOR, active_sel).text != current
        )
        return True

    def run(self):
        """Log in and scrape every result page of the configured search.

        Returns:
            The concatenated row dicts of all pages, in page order.
        """
        logger.info("Starte Login und Sucheauswahl…")
        self.login_and_select_search()
        all_res = []
        page = 1
        while True:
            logger.info("Seite %d: Extrahiere Daten…", page)
            all_res.extend(self.extract_current_page_results())
            if not self.click_next_page():
                break
            page += 1
        return all_res
def main():
    """Scrape the configured Dealfront search and dump the rows as JSON.

    Loads the credentials, starts a headless Chrome session, runs the
    scraper and always quits the browser afterwards. The collected rows are
    then written to ``results.json`` inside ``OUTPUT_DIR``.
    """
    user, pwd = load_creds(CREDS_FILE)

    # Headless Chrome with the sandbox and /dev/shm usage switched off.
    chrome_opts = Options()
    for flag in ("--headless", "--no-sandbox", "--disable-dev-shm-usage"):
        chrome_opts.add_argument(flag)

    driver = webdriver.Chrome(service=Service(CHROMEDRIVER_PATH), options=chrome_opts)
    wait = WebDriverWait(driver, 30)
    try:
        results = DealfrontScraper(driver, wait, user, pwd).run()
    finally:
        # Shut the browser down whether or not scraping succeeded.
        driver.quit()

    os.makedirs(OUTPUT_DIR, exist_ok=True)
    path = os.path.join(OUTPUT_DIR, "results.json")
    serialized = json.dumps(results, ensure_ascii=False, indent=2)
    with open(path, "w", encoding="utf-8") as out_file:
        out_file.write(serialized)
    logger.info(f"✅ Fertig: {len(results)} Einträge in {path}")


if __name__ == "__main__":
    main()