dealfront_enrichment.py aktualisiert

This commit is contained in:
2025-07-08 19:24:41 +00:00
parent 34a97f8016
commit 043b597f23

View File

@@ -1,32 +1,58 @@
```python
#!/usr/bin/env python3
"""Dealfront enrichment scraper.

Logs into Dealfront, opens a predefined prospect search and exports the
company name/website pairs of every result page to a JSON file.
"""
import os
import sys
import json
import time
import logging

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# ───────────────────────────────────────────────────────────────────────────────
# Constants
LOGIN_URL = "https://app.dealfront.com/login"
TARGET_SEARCH_NAME = "Facility Management"
CREDENTIALS_FILE = "dealfront_credentials.json"
OUTPUT_DIR = "output"
CHROMEDRIVER_PATH = "/usr/bin/chromedriver"
LOG_FORMAT = "%(asctime)s - %(levelname)-8s - %(message)s"

# ───────────────────────────────────────────────────────────────────────────────
# Configure logging; keep selenium's own (very chatty) logger quiet.
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, force=True)
logger = logging.getLogger(__name__)
logging.getLogger("selenium").setLevel(logging.WARNING)
def load_credentials(path):
    """Load Dealfront login credentials from a JSON file.

    The file must be a JSON object containing non-empty ``username`` and
    ``password`` keys. On any problem (missing file, missing/empty keys)
    an error is logged and the process exits with status 1.

    Args:
        path: Filesystem path of the credentials JSON file.

    Returns:
        tuple: ``(username, password)`` as read from the file.
    """
    if not os.path.isfile(path):
        logger.error(f"Credentials-Datei nicht gefunden: {path}")
        sys.exit(1)
    with open(path, encoding="utf-8") as f:
        creds = json.load(f)
    user = creds.get("username")
    pwd = creds.get("password")
    if not user or not pwd:
        logger.error("Credentials-Datei enthält keinen username/password.")
        sys.exit(1)
    return user, pwd
class DealfrontScraper:
    """Scrapes company name/website pairs from a predefined Dealfront search."""

    def __init__(self, driver, wait, username, password):
        """Store the shared WebDriver/WebDriverWait and the login data.

        Args:
            driver: Selenium WebDriver instance used for all page interaction.
            wait: WebDriverWait bound to *driver* for explicit waits.
            username: Dealfront account user name (or e-mail).
            password: Dealfront account password.
        """
        self.driver = driver
        self.wait = wait
        self.username = username
        self.password = password
def login_and_find_list(self, search_name): def login_and_find_list(self, search_name):
# 1) Login-Seite öffnen & Credentials absenden # Login-Seite öffnen
self.driver.get(LOGIN_URL) self.driver.get(LOGIN_URL)
# E-Mail/Username und Passwort eintragen
self.wait.until(EC.visibility_of_element_located( self.wait.until(EC.visibility_of_element_located(
(By.CSS_SELECTOR, "input[type='email'], input[type='text']") (By.CSS_SELECTOR, "input[type='email'], input[type='text']")
)) ))
@@ -36,84 +62,74 @@ class DealfrontScraper:
.send_keys(self.password) .send_keys(self.password)
self.driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click() self.driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click()
# 2) Quick-Link "Prospects finden" über XPath anklicken # Quick-Link "Prospects finden" anklicken
prospects_btn = self.wait.until(EC.element_to_be_clickable(( btn = self.wait.until(EC.element_to_be_clickable((
By.XPATH, By.XPATH, "//a[normalize-space()='Prospects finden']"
"//a[normalize-space()='Prospects finden']"
))) )))
prospects_btn.click() btn.click()
# 3) Gesuchte vordefinierte Suche anklicken # Gewünschte Suche auswählen
search_btn = self.wait.until(EC.element_to_be_clickable(( btn = self.wait.until(EC.element_to_be_clickable((
By.XPATH, By.XPATH, f"//a[normalize-space()='{search_name}']"
f"//a[normalize-space()='{search_name}']"
))) )))
search_btn.click() btn.click()
# 4) Auf erstes Daten-Element warten # Auf erste Datenzeile warten
first = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text") first = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
self.wait.until(EC.visibility_of_element_located(first)) self.wait.until(EC.visibility_of_element_located(first))
time.sleep(1) time.sleep(1)
def extract_current_page_results(self): def extract_current_page_results(self):
# kurz Implicit-Wait = 1 s, damit fehlende Elemente schnell übersprungen werden # Implicit-Wait kurz absenken
self.driver.implicitly_wait(1) self.driver.implicitly_wait(1)
# sicherstellen, dass mindestens eine Zeile im DOM ist # Mindestens eine Tabellenzeile im DOM
rows_sel = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]") rows_sel = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]")
self.wait.until(EC.presence_of_all_elements_located(rows_sel)) self.wait.until(EC.presence_of_all_elements_located(rows_sel))
rows = self.driver.find_elements(*rows_sel) rows = self.driver.find_elements(*rows_sel)
results = [] results = []
for i, row in enumerate(rows, 1): for row in rows:
# Name # Firmenname
name_elems = row.find_elements(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text") ne = row.find_elements(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
if not name_elems: if not ne:
# kein Name-Element gefunden
continue continue
ne = name_elems[0] name = (ne[0].get_attribute("title") or ne[0].text).strip()
company_name = (ne.get_attribute("title") or ne.text).strip()
# Website # Website aus 3. Spalte
web_elems = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3) a") we = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3) a")
if web_elems: if we:
website = web_elems[0].text.strip() site = we[0].text.strip()
else: else:
td3 = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3)") td3 = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3)")
website = td3[0].text.strip() if td3 else "" site = td3[0].text.strip() if td3 else ""
results.append({'name': company_name, 'website': website}) results.append({"name": name, "website": site})
# Implicit-Wait wieder zurücksetzen (Standard 10 s) # Implicit-Wait zurücksetzen
self.driver.implicitly_wait(10) self.driver.implicitly_wait(10)
return results return results
def click_next_page(self) -> bool: def click_next_page(self):
# alle Pagination-Buttons: Prev, Zahlen, Next # Prev, Seiten, Next Buttons
btns = self.driver.find_elements(By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button") btns = self.driver.find_elements(By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button")
if not btns: if not btns:
return False return False
nxt = btns[-1] nxt = btns[-1]
# falls disabled oder nicht klickbar, Schluss
if (not nxt.is_enabled()) or ("disabled" in nxt.get_attribute("class")): if (not nxt.is_enabled()) or ("disabled" in nxt.get_attribute("class")):
return False return False
# aktuelle Seite merken, Klick ausführen
current = self.driver.find_element( current = self.driver.find_element(
By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active" By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active"
).text ).text
nxt.click() nxt.click()
# warten, bis sich die aktive Seitenzahl ändert
self.wait.until(lambda d: d.find_element( self.wait.until(lambda d: d.find_element(
By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active" By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active"
).text != current) ).text != current)
return True return True
def run(self, search_name): def run(self, search_name):
# Login + Navigation zur Search-List
self.login_and_find_list(search_name) self.login_and_find_list(search_name)
# Paginieren & extrahieren
all_results = [] all_results = []
while True: while True:
all_results.extend(self.extract_current_page_results()) all_results.extend(self.extract_current_page_results())
@@ -122,29 +138,29 @@ class DealfrontScraper:
return all_results return all_results
def main():
    """Entry point: scrape the configured search and write results to JSON.

    Loads credentials, starts a headless Chrome, runs the scraper for
    TARGET_SEARCH_NAME and dumps the results to OUTPUT_DIR/results.json.
    """
    username, password = load_credentials(CREDENTIALS_FILE)

    # Initialise a headless Chrome WebDriver.
    opts = Options()
    opts.add_argument("--headless")
    opts.add_argument("--no-sandbox")
    opts.add_argument("--disable-dev-shm-usage")
    service = Service(CHROMEDRIVER_PATH)
    driver = webdriver.Chrome(service=service, options=opts)
    wait = WebDriverWait(driver, 30)

    try:
        scraper = DealfrontScraper(driver, wait, username, password)
        results = scraper.run(TARGET_SEARCH_NAME)

        # Persist the results as pretty-printed UTF-8 JSON.
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        outfile = os.path.join(OUTPUT_DIR, "results.json")
        with open(outfile, "w", encoding="utf-8") as f:
            json.dump(results, f, ensure_ascii=False, indent=2)
        print(f"✅ Fertig: {len(results)} Einträge in {outfile}")
    finally:
        # Always release the browser, even if scraping fails mid-way.
        driver.quit()


if __name__ == "__main__":
    main()
```