dealfront_enrichment.py updated

2025-07-08 19:35:05 +00:00
parent a135cbdced
commit 8840abe77e


@@ -8,18 +8,19 @@ import logging
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# ───────────────────────────────────────────────────────────────────────────────
# Constants
LOGIN_URL = "https://app.dealfront.com/login"
TARGET_SEARCH_NAME = "Facility Management"
CREDENTIALS_FILE = "dealfront_credentials.json"
OUTPUT_DIR = "output"
CHROMEDRIVER_PATH = "/usr/bin/chromedriver"
LOG_FORMAT = "%(asctime)s - %(levelname)-8s - %(message)s"
# ───────────────────────────────────────────────────────────────────────────────
# Configure logging
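# For context (not part of this hunk): load_credentials() is defined elsewhere
# in the file. A minimal sketch of the assumed behavior, where the credentials
# file is a flat JSON object with "username" and "password" keys (json is
# assumed to be imported at the top of the file, as json.dump() below implies):
def load_credentials(path):
    with open(path, encoding="utf-8") as f:
        data = json.load(f)
    return data["username"], data["password"]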
@@ -47,33 +48,34 @@ class DealfrontScraper:
        self.username = username
        self.password = password
    def login_and_find_list(self):
        # 1) Open the login page
        self.driver.get(LOGIN_URL)
        # 2) Enter credentials & submit
        self.wait.until(EC.visibility_of_element_located(
            (By.CSS_SELECTOR, "input[type='email'], input[type='text']")
        ))
        self.driver.find_element(By.CSS_SELECTOR, "input[type='email'], input[type='text']").send_keys(self.username)
        self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.password)
        self.driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click()
        # 3) Click the "Prospects finden" quick link (falls back to an href lookup if the link text is missing)
        try:
            btn = self.wait.until(EC.element_to_be_clickable((By.LINK_TEXT, "Prospects finden")))
        except TimeoutException:
            btn = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "a[href*='prospects']")))
        self.driver.execute_script("arguments[0].click();", btn)
        # 4) Click the desired predefined search
        try:
            btn2 = self.wait.until(EC.element_to_be_clickable((By.LINK_TEXT, TARGET_SEARCH_NAME)))
        except TimeoutException:
            xpath = f"//a[contains(normalize-space(.), '{TARGET_SEARCH_NAME}')]"
            btn2 = self.wait.until(EC.presence_of_element_located((By.XPATH, xpath)))
        self.driver.execute_script("arguments[0].click();", btn2)
        # 5) Wait for the first data element
        first = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
        self.wait.until(EC.visibility_of_element_located(first))
        time.sleep(1)
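
    # The two try/except clicks above share a pattern; a hypothetical helper
    # (not part of this commit) could factor it out. Sketch only:
    def _click_with_fallback(self, primary, fallback):
        try:
            el = self.wait.until(EC.element_to_be_clickable(primary))
        except TimeoutException:
            el = self.wait.until(EC.presence_of_element_located(fallback))
        self.driver.execute_script("arguments[0].click();", el)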
@@ -82,14 +84,14 @@ class DealfrontScraper:
        # Briefly lower the implicit wait so optional lookups return quickly
        self.driver.implicitly_wait(1)
        # Wait for at least one table row, then extract them all
        rows_sel = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]")
        self.wait.until(EC.presence_of_all_elements_located(rows_sel))
        rows = self.driver.find_elements(*rows_sel)
        results = []
        for row in rows:
            # Company name
            ne = row.find_elements(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
            if not ne:
                continue
@@ -105,17 +107,17 @@ class DealfrontScraper:
            results.append({"name": name, "website": site})
        # Reset the implicit wait to its default (10 s)
        self.driver.implicitly_wait(10)
        return results
    def click_next_page(self):
        # Grab the paginator buttons (prev, page numbers, next)
        btns = self.driver.find_elements(By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button")
        if not btns:
            return False
        nxt = btns[-1]
        if not nxt.is_enabled() or "disabled" in nxt.get_attribute("class"):
            return False
        current = self.driver.find_element(
@@ -127,8 +129,8 @@ class DealfrontScraper:
        ).text != current)
        return True
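        # Note: the lines elided at the hunk boundary above read the first
        # row's text, click "next", and wait until that text changes as the
        # signal that the next page has rendered (inferred from context).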
    def run(self):
        self.login_and_find_list()
        all_results = []
        while True:
            all_results.extend(self.extract_current_page_results())
@@ -139,7 +141,7 @@ class DealfrontScraper:
def main():
    username, password = load_credentials(CREDENTIALS_FILE)
    # Initialize the WebDriver
    opts = Options()
    opts.add_argument("--headless")
    opts.add_argument("--no-sandbox")
@@ -148,17 +150,19 @@ def main():
    driver = webdriver.Chrome(service=service, options=opts)
    wait = WebDriverWait(driver, 30)
    try:
        scraper = DealfrontScraper(driver, wait, username, password)
        results = scraper.run()
    finally:
        driver.quit()
    # Save the results
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    out = os.path.join(OUTPUT_DIR, "results.json")
    with open(out, "w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False, indent=2)
print(f"✅ Fertig: {len(results)} Einträge in {outfile}")
driver.quit()
print(f"✅ Fertig: {len(results)} Einträge in '{out}'")
if __name__ == "__main__":
    main()
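
# Expected shape of output/results.json (illustrative values, not real data):
# [
#   {"name": "Example GmbH", "website": "https://example.com"},
#   ...
# ]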