# Source: Brancheneinstufung2/dealfront_enrichment.py
# 144 lines, 5.6 KiB, Python
#!/usr/bin/env python3
import os
import json
import time
import sys
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
def load_credentials(path):
    """Load the Dealfront login credentials from a JSON file.

    Args:
        path: Path to a JSON file expected to contain at least the keys
            ``username`` and ``password`` (only read via ``.get`` by callers).

    Returns:
        The parsed JSON object (normally a dict).

    Exits:
        Terminates the process with status 1 if the file cannot be read or
        does not contain valid JSON.
    """
    try:
        with open(path, encoding='utf-8') as f:
            return json.load(f)
    # Narrowed from a bare `except Exception`: only file-system errors and
    # JSON parse errors (json.JSONDecodeError is a ValueError) are expected
    # here; anything else would be a programming error and should surface.
    except (OSError, ValueError) as e:
        print(f"Fehler beim Laden der Credentials: {e}", file=sys.stderr)
        sys.exit(1)
class DealfrontScraper:
    """Scrape company name/website pairs from a saved Dealfront prospect search.

    Drives a logged-in Dealfront session via Selenium: logs in, opens a saved
    search, then walks every result page and collects name + website per row.
    """

    def __init__(self, driver, wait, username, password):
        self.driver = driver      # selenium WebDriver instance
        self.wait = wait          # WebDriverWait bound to that driver
        self.username = username  # Dealfront login e-mail / username
        self.password = password  # Dealfront login password

    def login_and_find_list(self, search_name):
        """Log in to Dealfront and open the saved search named ``search_name``."""
        # 1) open the login page
        self.driver.get("https://app.dealfront.com/login")
        # 2) wait for the e-mail/username field, then fill in both inputs
        self.wait.until(EC.visibility_of_element_located(
            (By.CSS_SELECTOR, "input[type='email'], input[type='text']")
        ))
        email_in = self.driver.find_element(
            By.CSS_SELECTOR, "input[type='email'], input[type='text']")
        pwd_in = self.driver.find_element(By.CSS_SELECTOR, "input[type='password']")
        email_in.clear()
        email_in.send_keys(self.username)
        pwd_in.clear()
        pwd_in.send_keys(self.password)
        # 3) submit the form
        self.driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click()
        # 4) wait for the "Prospects finden" quick link and click it
        self.wait.until(EC.element_to_be_clickable((By.LINK_TEXT, "Prospects finden")))
        self.driver.find_element(By.LINK_TEXT, "Prospects finden").click()
        # 5) wait for the list of saved searches and open the requested one
        self.wait.until(EC.element_to_be_clickable((By.LINK_TEXT, search_name)))
        self.driver.find_element(By.LINK_TEXT, search_name).click()
        # 6) wait for the first data element so the result table is loaded
        first_locator = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
        self.wait.until(EC.visibility_of_element_located(first_locator))
        time.sleep(1)

    def extract_current_page_results(self):
        """Return ``[{'name': ..., 'website': ...}, ...]`` for the current page.

        Rows without a company-name cell are skipped; a missing website link
        falls back to the third cell's plain text (empty string if absent).
        """
        # Temporarily drop the implicit wait to 1 s so missing per-row elements
        # are skipped quickly. Restored in `finally` — the original left the
        # driver stuck at 1 s if the `wait.until` below timed out.
        self.driver.implicitly_wait(1)
        try:
            # make sure at least one result row is present in the DOM
            rows_sel = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]")
            self.wait.until(EC.presence_of_all_elements_located(rows_sel))
            rows = self.driver.find_elements(*rows_sel)
            results = []
            for row in rows:
                # company name: prefer the `title` attribute, fall back to text
                name_elems = row.find_elements(
                    By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
                if not name_elems:
                    continue  # row without a name cell — not a data row
                ne = name_elems[0]
                company_name = (ne.get_attribute("title") or ne.text).strip()
                # website: anchor in the 3rd cell, else the cell's raw text
                web_elems = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3) a")
                if web_elems:
                    website = web_elems[0].text.strip()
                else:
                    td3 = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3)")
                    website = td3[0].text.strip() if td3 else ""
                results.append({'name': company_name, 'website': website})
        finally:
            # Restore the implicit wait. NOTE(review): the original restores to
            # 10 s although main() never configures one (Selenium's default is
            # 0) — confirm the intended baseline value.
            self.driver.implicitly_wait(10)
        return results

    def click_next_page(self) -> bool:
        """Click the "next page" pagination button.

        Returns True after the active page number has changed, False when the
        last pagination button is missing, disabled, or not clickable.
        """
        # all pagination buttons: Prev, page numbers, Next (Next is last)
        btns = self.driver.find_elements(
            By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button")
        if not btns:
            return False
        nxt = btns[-1]
        # stop if the Next button is disabled or not clickable
        if (not nxt.is_enabled()) or ("disabled" in nxt.get_attribute("class")):
            return False
        # remember the current page number, then click Next
        current = self.driver.find_element(
            By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active"
        ).text
        nxt.click()
        # wait until the active page number actually changes
        self.wait.until(lambda d: d.find_element(
            By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active"
        ).text != current)
        return True

    def run(self, search_name):
        """Log in, open ``search_name`` and return the results of all pages."""
        self.login_and_find_list(search_name)
        # paginate and collect until there is no next page
        all_results = []
        while True:
            all_results.extend(self.extract_current_page_results())
            if not self.click_next_page():
                break
        return all_results
def main():
    """Entry point: scrape the "Facility Management" search and write JSON.

    Reads credentials from ``dealfront_credentials.json``, runs the scraper in
    headless Chrome and stores the results in ``output/results.json``.
    """
    creds = load_credentials("dealfront_credentials.json")
    username = creds.get("username")
    password = creds.get("password")
    # initialise a headless Chrome WebDriver (container-friendly flags)
    opts = Options()
    opts.add_argument("--headless")
    opts.add_argument("--no-sandbox")
    opts.add_argument("--disable-dev-shm-usage")
    driver = webdriver.Chrome(options=opts)
    wait = WebDriverWait(driver, 30)
    try:
        # run the scraper
        scraper = DealfrontScraper(driver, wait, username, password)
        results = scraper.run("Facility Management")
        # create the output folder and save the results as JSON
        os.makedirs("output", exist_ok=True)
        out_file = os.path.join("output", "results.json")
        with open(out_file, "w", encoding="utf-8") as f:
            json.dump(results, f, ensure_ascii=False, indent=2)
        print(f"✅ Fertig: {len(results)} Einträge in '{out_file}'")
    finally:
        # bug fix: always shut the browser down — the original leaked the
        # Chrome process whenever scraping raised before driver.quit()
        driver.quit()


if __name__ == "__main__":
    main()