dealfront_enrichment.py aktualisiert

This commit is contained in:
2025-07-08 19:11:58 +00:00
parent a5f1e04187
commit f0c13e3b5a

View File

@@ -1,156 +1,143 @@
#!/usr/bin/env python3
import os
import json
import time
import logging
import sys
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException
class TempConfig:
    """Temporary, self-contained configuration (replaces the external config.py)."""

    # Login page that the scraper opens first.
    DEALFRONT_LOGIN_URL = "https://app.dealfront.com/login"
    # Name of the saved search to open; can be adjusted.
    TARGET_SEARCH_NAME = "Facility Management"
    # JSON file expected to contain the "username" and "password" keys.
    DEALFRONT_CREDENTIALS_FILE = "/app/dealfront_credentials.json"
    # Location of the chromedriver binary handed to the Selenium Service.
    CHROMEDRIVER_PATH = "/usr/bin/chromedriver"
    # Timeout passed to WebDriverWait (explicit waits).
    DEFAULT_TIMEOUT = 30
    # Value passed to driver.implicitly_wait().
    IMPLICIT_WAIT = 10
    # Directory created at import time for result files.
    OUTPUT_DIR = "/app/output"
# Configure logging: INFO for this script, WARNING for the chatty selenium logger.
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
logging.getLogger("selenium").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)

# Make sure OUTPUT_DIR exists. (The former second call went through the
# undefined name `ios` and raised NameError at import time.)
os.makedirs(TempConfig.OUTPUT_DIR, exist_ok=True)
def load_credentials(path):
    """Load the credentials JSON document stored at *path*.

    Args:
        path: File system path of a UTF-8 encoded JSON file.

    Returns:
        The parsed JSON content (callers in this file treat it as a dict).

    Exits:
        Terminates the process with status 1 after printing a message to
        stderr when the file cannot be read or does not contain valid JSON.
    """
    try:
        with open(path, encoding='utf-8') as f:
            return json.load(f)
    # OSError: missing/unreadable file. ValueError: covers both
    # json.JSONDecodeError and UnicodeDecodeError.
    except (OSError, ValueError) as e:
        print(f"Fehler beim Laden der Credentials: {e}", file=sys.stderr)
        sys.exit(1)
class DealfrontScraper:
    """Scrape company names and websites from a saved Dealfront search.

    The scraper logs in, clicks the "Prospects finden" link, opens a saved
    search by its link text and walks the paginated result table.

    Args:
        driver: An existing Selenium WebDriver. When omitted, a headless
            Chrome driver is built from the ``TempConfig`` settings and is
            quit again at the end of :meth:`run`.
        wait: A ``WebDriverWait`` bound to *driver*. When omitted, one is
            created with ``TempConfig.DEFAULT_TIMEOUT``.
        username: Login name. When *username* or *password* is omitted, the
            missing value is read from ``TempConfig.DEALFRONT_CREDENTIALS_FILE``.
        password: Login password (see *username*).

    Raises:
        Exception: Whatever the credentials file access or the WebDriver
            start-up raises; both are logged and re-raised.
    """

    # CSS selectors shared by several methods.
    _NAME_LINK_CSS = ".sticky-column a.t-highlight-text"
    _ROWS_CSS = "table#t-result-table tbody tr[id]"
    _PAGE_BUTTONS_CSS = "nav.eb-pagination a.eb-pagination-button"
    _ACTIVE_PAGE_CSS = "nav.eb-pagination a.eb-pagination-button.active"
    _USER_INPUT_CSS = "input[type='email'], input[type='text']"

    def __init__(self, driver=None, wait=None, username=None, password=None):
        # Resolve credentials first so a missing file cannot leak a browser
        # process that was started just before.
        if username is None or password is None:
            file_user, file_password = self._load_credentials()
            username = file_user if username is None else username
            password = file_password if password is None else password

        # An injected driver belongs to the caller; only a driver built here
        # is quit by run().
        self._owns_driver = driver is None
        if driver is None:
            driver = self._build_driver()

        self.driver = driver
        if wait is None:
            wait = WebDriverWait(driver, TempConfig.DEFAULT_TIMEOUT)
        self.wait = wait
        self.username = username
        self.password = password

    @staticmethod
    def _build_driver():
        """Create the headless Chrome driver described by ``TempConfig``."""
        logger.info("Initialisiere den DealfrontScraper...")
        chrome_options = Options()
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("--window-size=1920,1080")
        # Content setting 2 for images, as in the original preferences.
        prefs = {"profile.managed_default_content_settings.images": 2}
        chrome_options.add_experimental_option("prefs", prefs)

        service = Service(executable_path=TempConfig.CHROMEDRIVER_PATH)
        try:
            driver = webdriver.Chrome(service=service, options=chrome_options)
        except Exception:
            logger.critical("WebDriver konnte nicht initialisiert werden.", exc_info=True)
            raise
        driver.implicitly_wait(TempConfig.IMPLICIT_WAIT)
        logger.info("WebDriver erfolgreich initialisiert.")
        return driver

    def _load_credentials(self):
        """Return ``(username, password)`` from the configured credentials file.

        Raises:
            Exception: Any error while opening, parsing or indexing the file
                is logged and re-raised unchanged.
        """
        try:
            with open(TempConfig.DEALFRONT_CREDENTIALS_FILE, 'r', encoding='utf-8') as f:
                creds = json.load(f)
            return creds['username'], creds['password']
        except Exception as e:
            logger.error(f"Credentials-Datei konnte nicht geladen werden: {e}")
            raise

    def login_and_find_list(self, search_name=None):
        """Log in and open the saved search named *search_name*.

        Args:
            search_name: Link text of the saved search. Defaults to
                ``TempConfig.TARGET_SEARCH_NAME``.
        """
        if search_name is None:
            search_name = TempConfig.TARGET_SEARCH_NAME

        # 1) Open the login page.
        logger.info(f"Navigiere zur Login-Seite: {TempConfig.DEALFRONT_LOGIN_URL}")
        self.driver.get(TempConfig.DEALFRONT_LOGIN_URL)

        # 2) Wait for the e-mail/username field, then fill in both fields.
        email_in = self.wait.until(EC.visibility_of_element_located(
            (By.CSS_SELECTOR, self._USER_INPUT_CSS)
        ))
        pwd_in = self.driver.find_element(By.CSS_SELECTOR, "input[type='password']")
        email_in.clear()
        email_in.send_keys(self.username)
        pwd_in.clear()
        pwd_in.send_keys(self.password)

        # 3) Submit the form.
        self.driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click()
        logger.info("Login gesendet.")

        # 4) Wait for the "Prospects finden" quick link and click it.
        self.wait.until(
            EC.element_to_be_clickable((By.LINK_TEXT, "Prospects finden"))
        ).click()
        logger.info("'Prospects finden' geklickt.")

        # 5) Wait for the list of searches and click the requested one.
        self.wait.until(
            EC.element_to_be_clickable((By.LINK_TEXT, search_name))
        ).click()
        logger.info(f"Suche '{search_name}' geladen.")

        # 6) Wait for the first data element so the table is known to be loaded.
        self.wait.until(EC.visibility_of_element_located(
            (By.CSS_SELECTOR, self._NAME_LINK_CSS)
        ))
        time.sleep(1)

    def extract_current_page_results(self):
        """Return the companies shown on the current result page.

        Returns:
            A list of ``{'name': ..., 'website': ...}`` dicts, one per table
            row that contains a name link. Rows without one are skipped.
        """
        self.wait.until(EC.visibility_of_element_located(
            (By.CSS_SELECTOR, self._NAME_LINK_CSS)
        ))
        logger.info("Extrahiere aktuelle Seite...")

        rows_sel = (By.CSS_SELECTOR, self._ROWS_CSS)
        results = []
        # Short implicit wait so that missing cells are skipped quickly.
        self.driver.implicitly_wait(1)
        try:
            # Make sure at least one row is present in the DOM.
            self.wait.until(EC.presence_of_all_elements_located(rows_sel))
            rows = self.driver.find_elements(*rows_sel)
            logger.info(f"{len(rows)} Zeilen gefunden.")

            for i, row in enumerate(rows, 1):
                # Name: prefer the title attribute, fall back to the text.
                name_elems = row.find_elements(By.CSS_SELECTOR, self._NAME_LINK_CSS)
                if not name_elems:
                    logger.warning(f"Zeile {i}: Kein Name gefunden. Überspringe.")
                    continue
                ne = name_elems[0]
                company_name = (ne.get_attribute("title") or ne.text).strip()

                # Website: link text in the third cell, else the cell text.
                web_elems = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3) a")
                if web_elems:
                    website = web_elems[0].text.strip()
                else:
                    td3 = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3)")
                    website = td3[0].text.strip() if td3 else ""

                results.append({'name': company_name, 'website': website})
        finally:
            # Always restore the regular implicit wait, even after an error.
            self.driver.implicitly_wait(TempConfig.IMPLICIT_WAIT)

        logger.info(f"Extraktion abgeschlossen: {len(results)} Firmen.")
        return results

    def click_next_page(self) -> bool:
        """Advance to the next result page.

        Returns:
            ``True`` after the active page label changed, ``False`` when no
            pagination exists or the last button is disabled.
        """
        # All pagination buttons: prev, page numbers, next.
        btns = self.driver.find_elements(By.CSS_SELECTOR, self._PAGE_BUTTONS_CSS)
        if not btns:
            return False

        nxt = btns[-1]
        # get_attribute may return None when the element has no class attribute.
        css_classes = nxt.get_attribute("class") or ""
        if (not nxt.is_enabled()) or ("disabled" in css_classes):
            return False

        # Remember the active page label, click, then wait until it changes.
        active_sel = (By.CSS_SELECTOR, self._ACTIVE_PAGE_CSS)
        current = self.driver.find_element(*active_sel).text
        nxt.click()
        self.wait.until(lambda d: d.find_element(*active_sel).text != current)
        return True

    def run(self, search_name=None):
        """Log in, open the search and collect the results of every page.

        Args:
            search_name: Link text of the saved search. Defaults to
                ``TempConfig.TARGET_SEARCH_NAME``.

        Returns:
            The concatenated result dicts of all pages.
        """
        try:
            # Login + navigation to the saved search.
            self.login_and_find_list(search_name)

            # Paginate and extract.
            all_results = []
            while True:
                all_results.extend(self.extract_current_page_results())
                if not self.click_next_page():
                    break
            return all_results
        finally:
            # Only quit a driver this instance created itself; an injected
            # driver is closed by whoever created it.
            if getattr(self, "_owns_driver", False):
                self.driver.quit()
def main():
    """Run the scraper for "Facility Management" and store the results as JSON.

    Reads ``credentials.json`` from the working directory, starts a headless
    Chrome, collects all result pages and writes them to
    ``output/results.json``.
    """
    creds = load_credentials("credentials.json")
    username = creds.get("username")
    password = creds.get("password")
    # Fail before starting a browser when the file lacks the required keys.
    if not username or not password:
        print("Fehler: 'username' oder 'password' fehlt in den Credentials.", file=sys.stderr)
        sys.exit(1)

    # Initialise the WebDriver.
    opts = Options()
    opts.add_argument("--headless")
    opts.add_argument("--no-sandbox")
    opts.add_argument("--disable-dev-shm-usage")
    driver = webdriver.Chrome(options=opts)
    try:
        wait = WebDriverWait(driver, 30)

        # Start the scraper.
        scraper = DealfrontScraper(driver, wait, username, password)
        results = scraper.run("Facility Management")
    finally:
        # Close the browser even when the scrape fails part-way.
        driver.quit()

    # Create the output folder and save the results as JSON.
    os.makedirs("output", exist_ok=True)
    out_file = os.path.join("output", "results.json")
    with open(out_file, "w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False, indent=2)

    print(f"✅ Fertig: {len(results)} Einträge in '{out_file}'")


if __name__ == "__main__":
    main()