# Brancheneinstufung2/dealfront_enrichment.py
# (pasted file-listing metadata: 165 lines, 6.1 KiB, Python)
#!/usr/bin/env python3
import os
import sys
import json
import time
import logging
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# ───────────────────────────────────────────────────────────────────────────────
# Constants
LOGIN_URL = "https://app.dealfront.com/login"
TARGET_SEARCH_NAME = "Facility Management"
CREDENTIALS_FILE = "dealfront_credentials.json"
OUTPUT_DIR = "output"
CHROMEDRIVER_PATH = "/usr/bin/chromedriver"
LOG_FORMAT = "%(asctime)s - %(levelname)-8s - %(message)s"
# ───────────────────────────────────────────────────────────────────────────────
# Configure logging (force=True replaces any handlers set up by imports)
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, force=True)
logger = logging.getLogger(__name__)
# Selenium's own logger is chatty at INFO; keep only warnings and above.
logging.getLogger("selenium").setLevel(logging.WARNING)
def load_credentials(path):
    """Read the Dealfront username/password pair from a JSON file.

    Parameters:
        path: Path to a JSON file containing "username" and "password" keys.

    Returns:
        Tuple ``(username, password)``.

    Exits the process with status 1 (after logging an error) when the file
    is missing, is not valid JSON, or lacks either credential — this is a
    CLI script, so a hard exit is the intended failure mode.
    """
    if not os.path.isfile(path):
        # Lazy %-style args instead of an f-string: the message is only
        # formatted if the record is actually emitted.
        logger.error("Credentials-Datei nicht gefunden: %s", path)
        sys.exit(1)
    try:
        with open(path, encoding="utf-8") as f:
            creds = json.load(f)
    except json.JSONDecodeError as exc:
        # Previously a malformed file produced a raw traceback; fail in the
        # same log-and-exit style as the other error paths.
        logger.error("Credentials-Datei enthält kein gültiges JSON: %s (%s)", path, exc)
        sys.exit(1)
    user = creds.get("username")
    pwd = creds.get("password")
    if not user or not pwd:
        logger.error("Credentials-Datei enthält keinen username/password.")
        sys.exit(1)
    return user, pwd
class DealfrontScraper:
    """Log into Dealfront and scrape company name/website pairs from a saved
    prospect search, walking every pagination page."""

    def __init__(self, driver, wait, username, password):
        # driver: Selenium WebDriver; wait: WebDriverWait bound to that driver.
        self.driver = driver
        self.wait = wait
        self.username = username
        self.password = password

    def login_and_find_list(self, search_name):
        """Log in, open the "Prospects finden" view, and select the saved
        search named *search_name*; returns once the first result row is
        visible."""
        # Open the login page
        self.driver.get(LOGIN_URL)
        # Fill in e-mail/username and password
        self.wait.until(EC.visibility_of_element_located(
            (By.CSS_SELECTOR, "input[type='email'], input[type='text']")
        ))
        self.driver.find_element(By.CSS_SELECTOR, "input[type='email'], input[type='text']")\
            .send_keys(self.username)
        self.driver.find_element(By.CSS_SELECTOR, "input[type='password']")\
            .send_keys(self.password)
        self.driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click()
        # Click the "Prospects finden" quick link
        btn = self.wait.until(EC.element_to_be_clickable((
            By.XPATH, "//a[normalize-space()='Prospects finden']"
        )))
        btn.click()
        # Select the desired saved search by its link text
        btn = self.wait.until(EC.element_to_be_clickable((
            By.XPATH, f"//a[normalize-space()='{search_name}']"
        )))
        btn.click()
        # Wait for the first data row to become visible
        first = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
        self.wait.until(EC.visibility_of_element_located(first))
        time.sleep(1)  # grace period so the table can finish populating

    def extract_current_page_results(self):
        """Return ``[{"name": ..., "website": ...}, ...]`` for all rows of
        the currently displayed result page."""
        # Temporarily lower the implicit wait so per-row find_elements calls
        # on absent cells don't stall for the full timeout.
        self.driver.implicitly_wait(1)
        # Require at least one table row in the DOM before scraping
        rows_sel = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]")
        self.wait.until(EC.presence_of_all_elements_located(rows_sel))
        rows = self.driver.find_elements(*rows_sel)
        results = []
        for row in rows:
            # Company name: prefer the title attribute, fall back to link text
            ne = row.find_elements(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
            if not ne:
                continue
            name = (ne[0].get_attribute("title") or ne[0].text).strip()
            # Website from the 3rd column: link text if present, else the
            # plain cell text, else empty string
            we = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3) a")
            if we:
                site = we[0].text.strip()
            else:
                td3 = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3)")
                site = td3[0].text.strip() if td3 else ""
            results.append({"name": name, "website": site})
        # Restore the implicit wait.
        # NOTE(review): this resets to 10 s, but main() never configures an
        # implicit wait, so the driver default was 0 — confirm 10 is intended.
        self.driver.implicitly_wait(10)
        return results

    def click_next_page(self):
        """Advance pagination; return True if a new page loaded, False when
        already on the last page (or no pagination exists)."""
        # Pagination buttons: prev, page numbers, next — next assumed last
        btns = self.driver.find_elements(By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button")
        if not btns:
            return False
        nxt = btns[-1]
        if (not nxt.is_enabled()) or ("disabled" in nxt.get_attribute("class")):
            return False
        # Remember the active page label so we can detect the page change
        current = self.driver.find_element(
            By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active"
        ).text
        nxt.click()
        # Wait until the active page label differs from the remembered one
        self.wait.until(lambda d: d.find_element(
            By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active"
        ).text != current)
        return True

    def run(self, search_name):
        """Scrape every page of the saved search *search_name* and return
        the combined list of result dicts."""
        self.login_and_find_list(search_name)
        all_results = []
        while True:
            all_results.extend(self.extract_current_page_results())
            if not self.click_next_page():
                break
        return all_results
def main():
    """Entry point: scrape TARGET_SEARCH_NAME from Dealfront and write the
    results to ``output/results.json``."""
    username, password = load_credentials(CREDENTIALS_FILE)
    # Initialize the Chrome WebDriver (headless + container-friendly flags)
    opts = Options()
    opts.add_argument("--headless")
    opts.add_argument("--no-sandbox")
    opts.add_argument("--disable-dev-shm-usage")
    service = Service(CHROMEDRIVER_PATH)
    driver = webdriver.Chrome(service=service, options=opts)
    try:
        wait = WebDriverWait(driver, 30)
        scraper = DealfrontScraper(driver, wait, username, password)
        results = scraper.run(TARGET_SEARCH_NAME)
        # Persist the results as pretty-printed UTF-8 JSON
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        outfile = os.path.join(OUTPUT_DIR, "results.json")
        with open(outfile, "w", encoding="utf-8") as f:
            json.dump(results, f, ensure_ascii=False, indent=2)
        print(f"✅ Fertig: {len(results)} Einträge in {outfile}")
    finally:
        # Always release the browser — the original skipped driver.quit()
        # whenever scraping or the file write raised, leaking the headless
        # Chrome process.
        driver.quit()


if __name__ == "__main__":
    main()