dealfront_enrichment.py aktualisiert

This commit is contained in:
2025-07-08 19:38:51 +00:00
parent 4e4268e78d
commit 66db573e38

View File

@@ -13,7 +13,7 @@ from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support import expected_conditions as EC
# ─────────────────────────────────────────────────────────────────────────────── # ────────────────────────────────────────────────────────────────────────────
# Konstanten # Konstanten
LOGIN_URL = "https://app.dealfront.com/login" LOGIN_URL = "https://app.dealfront.com/login"
TARGET_SEARCH_NAME = "Facility Management" TARGET_SEARCH_NAME = "Facility Management"
@@ -21,9 +21,9 @@ CREDENTIALS_FILE = "dealfront_credentials.json"
OUTPUT_DIR = "output" OUTPUT_DIR = "output"
CHROMEDRIVER_PATH = "/usr/bin/chromedriver" CHROMEDRIVER_PATH = "/usr/bin/chromedriver"
LOG_FORMAT = "%(asctime)s - %(levelname)-8s - %(message)s" LOG_FORMAT = "%(asctime)s - %(levelname)-8s - %(message)s"
# ─────────────────────────────────────────────────────────────────────────────── # ────────────────────────────────────────────────────────────────────────────
# Logging konfigurieren # Logging
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, force=True) logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, force=True)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logging.getLogger("selenium").setLevel(logging.WARNING) logging.getLogger("selenium").setLevel(logging.WARNING)
@@ -49,42 +49,37 @@ class DealfrontScraper:
self.password = password self.password = password
def login_and_find_list(self): def login_and_find_list(self):
# 1) Login-Seite öffnen # 1) Login
self.driver.get(LOGIN_URL) self.driver.get(LOGIN_URL)
# 2) Credentials eintragen & absenden
self.wait.until(EC.visibility_of_element_located( self.wait.until(EC.visibility_of_element_located(
(By.CSS_SELECTOR, "input[type='email'], input[type='text']") (By.CSS_SELECTOR, "input[type='email'], input[type='text']")
)) ))
self.driver.find_element(By.CSS_SELECTOR, "input[type='email'], input[type='text']").send_keys(self.username) self.driver.find_element(By.CSS_SELECTOR, "input[type='email'], input[type='text']")\
self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.password) .send_keys(self.username)
self.driver.find_element(By.CSS_SELECTOR, "input[type='password']")\
.send_keys(self.password)
self.driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click() self.driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click()
# 3) Quick-Link "Prospects finden" anklicken (fällt zurück auf href-Suche, falls Link-Text fehlt) # 2) Quick-Link "Prospects finden" per XPath-Text suchen
try: prospects_xpath = "//*[contains(normalize-space(.), 'Prospects finden')]"
btn = self.wait.until(EC.element_to_be_clickable((By.LINK_TEXT, "Prospects finden"))) btn = self.wait.until(EC.element_to_be_clickable((By.XPATH, prospects_xpath)))
except TimeoutException: btn.click()
btn = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "a[href*='prospects']")))
self.driver.execute_script("arguments[0].click();", btn)
# 4) Gewünschte vordefinierte Suche anklicken # 3) Vordefinierte Suche anklicken
try: search_xpath = f"//*[contains(normalize-space(.), '{TARGET_SEARCH_NAME}')]"
btn2 = self.wait.until(EC.element_to_be_clickable((By.LINK_TEXT, TARGET_SEARCH_NAME))) btn2 = self.wait.until(EC.element_to_be_clickable((By.XPATH, search_xpath)))
except TimeoutException: btn2.click()
xpath = f"//a[contains(normalize-space(.), '{TARGET_SEARCH_NAME}')]"
btn2 = self.wait.until(EC.presence_of_element_located((By.XPATH, xpath)))
self.driver.execute_script("arguments[0].click();", btn2)
# 5) Auf erstes Daten-Element warten # 4) Erstes Daten-Element abwarten
first = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text") first = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
self.wait.until(EC.visibility_of_element_located(first)) self.wait.until(EC.visibility_of_element_located(first))
time.sleep(1) time.sleep(1)
def extract_current_page_results(self): def extract_current_page_results(self):
# Implicit-Wait kurz absenken # kurz Implicit-Wait 1s
self.driver.implicitly_wait(1) self.driver.implicitly_wait(1)
# Auf mindestens eine Tabellenzeile warten, dann alle extrahieren # auf ≥1 Zeile warten
rows_sel = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]") rows_sel = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]")
self.wait.until(EC.presence_of_all_elements_located(rows_sel)) self.wait.until(EC.presence_of_all_elements_located(rows_sel))
rows = self.driver.find_elements(*rows_sel) rows = self.driver.find_elements(*rows_sel)
@@ -97,7 +92,7 @@ class DealfrontScraper:
continue continue
name = (ne[0].get_attribute("title") or ne[0].text).strip() name = (ne[0].get_attribute("title") or ne[0].text).strip()
# Website aus 3. Spalte # Website aus dritter Spalte
we = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3) a") we = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3) a")
if we: if we:
site = we[0].text.strip() site = we[0].text.strip()
@@ -107,23 +102,24 @@ class DealfrontScraper:
results.append({"name": name, "website": site}) results.append({"name": name, "website": site})
# Implicit-Wait wieder auf Standard (10s) setzen # Implicit-Wait zurück auf 10s
self.driver.implicitly_wait(10) self.driver.implicitly_wait(10)
return results return results
def click_next_page(self): def click_next_page(self):
# Paginator-Buttons greifen # Paginator-Buttons (Prev, 1…n, Next)
btns = self.driver.find_elements(By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button") btns = self.driver.find_elements(By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button")
if not btns: if not btns:
return False return False
nxt = btns[-1] nxt = btns[-1]
if not nxt.is_enabled() or "disabled" in nxt.get_attribute("class"): if (not nxt.is_enabled()) or ("disabled" in nxt.get_attribute("class")):
return False return False
current = self.driver.find_element( current = self.driver.find_element(
By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active" By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active"
).text ).text
nxt.click() nxt.click()
# auf neue aktive Seite warten
self.wait.until(lambda d: d.find_element( self.wait.until(lambda d: d.find_element(
By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active" By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active"
).text != current) ).text != current)
@@ -131,17 +127,17 @@ class DealfrontScraper:
def run(self): def run(self):
self.login_and_find_list() self.login_and_find_list()
all_results = [] all_res = []
while True: while True:
all_results.extend(self.extract_current_page_results()) all_res.extend(self.extract_current_page_results())
if not self.click_next_page(): if not self.click_next_page():
break break
return all_results return all_res
def main(): def main():
username, password = load_credentials(CREDENTIALS_FILE) user, pwd = load_credentials(CREDENTIALS_FILE)
# WebDriver initialisieren # ChromeDriver starten
opts = Options() opts = Options()
opts.add_argument("--headless") opts.add_argument("--headless")
opts.add_argument("--no-sandbox") opts.add_argument("--no-sandbox")
@@ -151,18 +147,18 @@ def main():
wait = WebDriverWait(driver, 30) wait = WebDriverWait(driver, 30)
try: try:
scraper = DealfrontScraper(driver, wait, username, password) scraper = DealfrontScraper(driver, wait, user, pwd)
results = scraper.run() data = scraper.run()
finally: finally:
driver.quit() driver.quit()
# Ergebnisse speichern # speichern
os.makedirs(OUTPUT_DIR, exist_ok=True) os.makedirs(OUTPUT_DIR, exist_ok=True)
out = os.path.join(OUTPUT_DIR, "results.json") out = os.path.join(OUTPUT_DIR, "results.json")
with open(out, "w", encoding="utf-8") as f: with open(out, "w", encoding="utf-8") as f:
json.dump(results, f, ensure_ascii=False, indent=2) json.dump(data, f, ensure_ascii=False, indent=2)
print(f"✅ Fertig: {len(results)} Einträge in '{out}'") print(f"✅ Fertig: {len(data)} Einträge in '{out}'")
if __name__ == "__main__": if __name__ == "__main__":
main() main()