Files
Brancheneinstufung2/dealfront_enrichment.py

165 lines
6.1 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import os
import sys
import json
import time
import logging
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# ────────────────────────────────────────────────────────────────────────────
# Constants
# Page opened first by DealfrontScraper.login_and_find_list().
LOGIN_URL = "https://app.dealfront.com/login"
# Visible label of the predefined search that is clicked after login.
TARGET_SEARCH_NAME = "Facility Management"
# JSON file read by load_credentials(); must provide "username" and "password".
CREDENTIALS_FILE = "dealfront_credentials.json"
# Directory that main() creates (if needed) and writes results.json into.
OUTPUT_DIR = "output"
# ChromeDriver binary handed to selenium's Service in main().
CHROMEDRIVER_PATH = "/usr/bin/chromedriver"
# Timestamp, left-aligned level name padded to 8 characters, then the message.
LOG_FORMAT = "%(asctime)s - %(levelname)-8s - %(message)s"
# ────────────────────────────────────────────────────────────────────────────
# Logging
# force=True removes any handlers already attached to the root logger so this
# configuration always takes effect.
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, force=True)
logger = logging.getLogger(__name__)
# Only let warnings and errors from the "selenium" logger through.
logging.getLogger("selenium").setLevel(logging.WARNING)
def load_credentials(path):
    """Read the Dealfront login from a JSON credentials file.

    The file must contain a JSON object with non-empty ``username`` and
    ``password`` entries.

    Args:
        path: Location of the credentials file.

    Returns:
        A ``(username, password)`` tuple.

    Raises:
        SystemExit: With exit code 1, after logging the reason, when the file
            is missing, cannot be read or parsed, or lacks either value.
    """
    if not os.path.isfile(path):
        logger.error("Credentials-Datei nicht gefunden: %s", path)
        sys.exit(1)

    try:
        with open(path, encoding="utf-8") as f:
            creds = json.load(f)
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses;
        # report them like every other failure here instead of a raw traceback.
        logger.error(
            "Credentials-Datei konnte nicht gelesen werden: %s (%s)", path, exc
        )
        sys.exit(1)

    # A JSON root that is not an object (e.g. a list) has no .get(); treat it
    # the same as an object without the expected keys.
    if isinstance(creds, dict):
        user, pwd = creds.get("username"), creds.get("password")
    else:
        user = pwd = None

    if not user or not pwd:
        logger.error("Credentials-Datei enthält keinen username/password.")
        sys.exit(1)
    return user, pwd
class DealfrontScraper:
    """Selenium-driven scraper for a predefined Dealfront search.

    Logs in, opens the search named by ``TARGET_SEARCH_NAME`` and collects the
    company name and website from every result page.
    """

    # Implicit-wait values (seconds) applied around row extraction.
    _ROW_IMPLICIT_WAIT = 1
    _DEFAULT_IMPLICIT_WAIT = 10

    _ACTIVE_PAGE_SELECTOR = "nav.eb-pagination a.eb-pagination-button.active"

    def __init__(self, driver, wait, username, password):
        """Store the WebDriver, its explicit wait helper and the login data."""
        self.driver = driver
        self.wait = wait
        self.username = username
        self.password = password

    @staticmethod
    def _xpath_for_text(text):
        """Return an XPath for elements whose *own* text node contains ``text``.

        Testing ``contains(normalize-space(.), ...)`` on the element itself
        also matches every ancestor (``<html>`` first, in document order), so
        a click would land on the page centre rather than on the link. The
        match is therefore made against direct text nodes, and ``script`` /
        ``style`` elements are skipped. The literal is quoted so that labels
        containing ``'`` and/or ``"`` still yield a valid expression.
        """
        if "'" not in text:
            literal = f"'{text}'"
        elif '"' not in text:
            literal = f'"{text}"'
        else:
            # XPath 1.0 has no escape character: glue the pieces with concat().
            pieces = (f"'{part}'" for part in text.split("'"))
            literal = "concat(" + ", \"'\", ".join(pieces) + ")"
        return (
            "//body//*[not(self::script or self::style)]"
            f"[text()[contains(normalize-space(.), {literal})]]"
        )

    def login_and_find_list(self):
        """Log in and navigate to the first page of the target search.

        Returns once the first result link is visible. Raises whatever
        ``self.wait.until`` raises when an expected element never shows up.
        """
        # 1) Login
        self.driver.get(LOGIN_URL)
        user_input = self.wait.until(EC.visibility_of_element_located(
            (By.CSS_SELECTOR, "input[type='email'], input[type='text']")
        ))
        user_input.send_keys(self.username)
        self.driver.find_element(By.CSS_SELECTOR, "input[type='password']") \
            .send_keys(self.password)
        self.driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click()

        # 2) Locate the quick link "Prospects finden" by its text
        prospects_xpath = self._xpath_for_text("Prospects finden")
        self.wait.until(
            EC.element_to_be_clickable((By.XPATH, prospects_xpath))
        ).click()

        # 3) Open the predefined search
        search_xpath = self._xpath_for_text(TARGET_SEARCH_NAME)
        self.wait.until(
            EC.element_to_be_clickable((By.XPATH, search_xpath))
        ).click()

        # 4) Wait for the first data element
        first = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
        self.wait.until(EC.visibility_of_element_located(first))
        time.sleep(1)  # short pause after the first result became visible

    def extract_current_page_results(self):
        """Return ``{"name", "website"}`` dicts for the rows currently shown.

        Rows without a name link are skipped. The website is taken from the
        link in the third column, falling back to that cell's text, or ``""``
        when the row has no third cell.
        """
        rows_sel = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]")
        # Short implicit wait so that rows lacking an element do not stall the
        # per-row lookups below.
        self.driver.implicitly_wait(self._ROW_IMPLICIT_WAIT)
        try:
            # Wait for at least one row; the condition returns the rows found.
            rows = self.wait.until(EC.presence_of_all_elements_located(rows_sel))
            results = []
            for row in rows:
                # Name
                name_links = row.find_elements(
                    By.CSS_SELECTOR, ".sticky-column a.t-highlight-text"
                )
                if not name_links:
                    continue
                link = name_links[0]
                name = (link.get_attribute("title") or link.text).strip()

                # Website from the third column
                site_links = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3) a")
                if site_links:
                    site = site_links[0].text.strip()
                else:
                    cells = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3)")
                    site = cells[0].text.strip() if cells else ""
                results.append({"name": name, "website": site})
            return results
        finally:
            # Always restore the longer implicit wait, even when the row wait
            # timed out or a lookup raised.
            self.driver.implicitly_wait(self._DEFAULT_IMPLICIT_WAIT)

    def click_next_page(self):
        """Advance to the next result page.

        Returns:
            ``True`` after the active page indicator changed, ``False`` when
            there is no paginator or its last button is disabled.
        """
        # Paginator buttons (Prev, 1…n, Next); the last one is "Next"
        btns = self.driver.find_elements(
            By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button"
        )
        if not btns:
            return False
        nxt = btns[-1]
        # get_attribute() yields None when the attribute is absent.
        css_classes = nxt.get_attribute("class") or ""
        if not nxt.is_enabled() or "disabled" in css_classes:
            return False

        current = self.driver.find_element(
            By.CSS_SELECTOR, self._ACTIVE_PAGE_SELECTOR
        ).text
        nxt.click()
        # Wait until a different page is marked active
        self.wait.until(lambda d: d.find_element(
            By.CSS_SELECTOR, self._ACTIVE_PAGE_SELECTOR
        ).text != current)
        return True

    def run(self):
        """Log in, walk through all result pages and return the combined rows."""
        self.login_and_find_list()
        all_res = []
        while True:
            all_res.extend(self.extract_current_page_results())
            if not self.click_next_page():
                break
        return all_res
def main():
    """Scrape the target search with headless Chrome and save it as JSON.

    Credentials come from ``CREDENTIALS_FILE``; the collected rows are written
    to ``results.json`` inside ``OUTPUT_DIR`` and a summary line is printed.
    The browser is closed even when scraping fails.
    """
    username, password = load_credentials(CREDENTIALS_FILE)

    # Launch ChromeDriver
    chrome_options = Options()
    for flag in ("--headless", "--no-sandbox", "--disable-dev-shm-usage"):
        chrome_options.add_argument(flag)
    browser = webdriver.Chrome(
        service=Service(CHROMEDRIVER_PATH),
        options=chrome_options,
    )
    explicit_wait = WebDriverWait(browser, 30)

    try:
        data = DealfrontScraper(browser, explicit_wait, username, password).run()
    finally:
        browser.quit()

    # Persist the results
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    out = os.path.join(OUTPUT_DIR, "results.json")
    with open(out, "w", encoding="utf-8") as handle:
        json.dump(data, handle, ensure_ascii=False, indent=2)
    print(f"✅ Fertig: {len(data)} Einträge in '{out}'")


# Run the scraper only when executed as a script, not on import.
if __name__ == "__main__":
    main()