#!/usr/bin/env python3
|
|
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import logging
|
|
|
|
from selenium import webdriver
|
|
from selenium.webdriver.chrome.service import Service
|
|
from selenium.webdriver.chrome.options import Options
|
|
from selenium.common.exceptions import TimeoutException
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
|
|
# ────────────────────────────────────────────────────────────────
# Constants
LOGIN_URL = "https://app.dealfront.com/login"          # page opened first to sign in
TARGET_TAB = "Target"                                   # link text of the tab clicked after login
SEARCH_NAME = "Facility Management"                     # title of the saved search to open
CREDS_FILE = "dealfront_credentials.json"               # JSON file read by load_creds()
OUTPUT_DIR = "output"                                   # directory that receives results.json
CHROMEDRIVER_PATH = "/usr/bin/chromedriver"             # chromedriver binary handed to Service()
LOG_FORMAT = "%(asctime)s %(levelname)-8s %(message)s"  # format passed to logging.basicConfig
# ────────────────────────────────────────────────────────────────

# force=True replaces any handlers already attached to the root logger, so
# this format applies even if logging was configured before this module ran.
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, force=True)
logger = logging.getLogger(__name__)
# Only let Selenium's own log records through from WARNING upwards.
logging.getLogger("selenium").setLevel(logging.WARNING)
|
|
|
|
def load_creds(path):
    """Read the login credentials from a JSON file.

    Args:
        path: Path of a UTF-8 encoded JSON file that is expected to contain an
            object with non-empty ``username`` and ``password`` entries.

    Returns:
        A ``(username, password)`` tuple.

    Raises:
        SystemExit: With exit code 1, after logging an error, when the file
            does not exist or when either entry is missing or empty.
        json.JSONDecodeError: When the file content is not valid JSON.
    """
    try:
        # The context manager closes the handle even if parsing fails.
        with open(path, encoding="utf-8") as fh:
            creds = json.load(fh)
    except FileNotFoundError:
        logger.error("Credentials-Datei nicht gefunden: %s", path)
        sys.exit(1)

    # A JSON root that is not an object (list, string, ...) cannot carry the
    # two entries; treat it like an object that lacks them.
    if not isinstance(creds, dict):
        creds = {}

    username = creds.get("username")
    password = creds.get("password")
    if not username or not password:
        logger.error("username/password fehlen in %s", path)
        sys.exit(1)
    return username, password
|
|
|
|
class DealfrontScraper:
    """Log in to Dealfront, open a saved Target search and scrape its results.

    The scraper signs in with the given credentials, opens the saved search
    named by ``SEARCH_NAME`` and walks the paginated result table, collecting
    one ``{'name': ..., 'website': ...}`` dict per company row.
    """

    # CSS selectors used in more than one place.
    _NAME_LINK_SEL = ".sticky-column a.t-highlight-text"
    _WEBSITE_LINK_SEL = "a.text-gray-400.t-highlight-text"
    _ROWS_SEL = "table#t-result-table tbody tr[id]"
    _PAGE_BUTTON_SEL = "nav.eb-pagination a.eb-pagination-button"
    _ACTIVE_PAGE_SEL = "nav.eb-pagination a.eb-pagination-button.active"

    def __init__(self, driver, wait, user, pwd):
        """Store the collaborators; no browser interaction happens here.

        Args:
            driver: Selenium WebDriver used for all page interaction.
            wait: ``WebDriverWait`` bound to ``driver``, used for explicit waits.
            user: Login name typed into the login form.
            pwd: Password typed into the login form.
        """
        self.driver = driver
        self.wait = wait
        self.user = user
        self.pwd = pwd

    def login_and_select_search(self):
        """Sign in, switch to the Target tab and open the saved search.

        Returns once the first result link of the search is visible.

        Raises:
            TimeoutException: If any awaited element does not show up within
                the timeout of ``self.wait``.
        """
        # 1) Open the login page and wait for the user-name field.
        self.driver.get(LOGIN_URL)
        user_field = self.wait.until(
            EC.visibility_of_element_located(
                (By.CSS_SELECTOR, "input[type='email'],input[type='text']")
            )
        )

        # 2) Fill in the credentials and submit the form.
        user_field.send_keys(self.user)
        self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.pwd)
        self.driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click()

        # 3) Click the Target tab.
        self.wait.until(EC.element_to_be_clickable((By.LINK_TEXT, TARGET_TAB))).click()

        # 4) Wait for the sidebar that lists the saved searches.
        sidebar_sel = "ul[data-userpilot-id='sidebar-searches-list']"
        self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, sidebar_sel)))

        # 5) Click the saved search (div[title=...]). The click is issued via
        #    JavaScript rather than WebElement.click().
        search_sel = f"div[title='{SEARCH_NAME}']"
        search_entry = self.wait.until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, search_sel))
        )
        self.driver.execute_script("arguments[0].click()", search_entry)

        # 6) Wait until the first result link is visible, plus a short buffer.
        self.wait.until(
            EC.visibility_of_element_located(
                (By.CSS_SELECTOR, "a.t-highlight-text.t-highlight-text-snippet")
            )
        )
        time.sleep(1)

    def extract_current_page_results(self):
        """Scrape every company row of the result page currently shown.

        Returns:
            A list of ``{'name': str, 'website': str}`` dicts, one per row that
            has a name link. An empty list is returned when reading the rows
            fails; the error is logged and debug artifacts are saved.

        Raises:
            TimeoutException: If the first name link never becomes visible.
        """
        # 1) Lower the implicit wait so rows lacking an element are skipped
        #    quickly instead of blocking for the full implicit timeout.
        self.driver.implicitly_wait(1)
        try:
            # 2) Wait for the first data element, plus a short buffer. This
            #    sits inside the try/finally so a timeout here still restores
            #    the implicit wait below.
            self.wait.until(
                EC.visibility_of_element_located((By.CSS_SELECTOR, self._NAME_LINK_SEL))
            )
            time.sleep(1)

            try:
                return self._collect_rows()
            except Exception as err:
                logger.exception(
                    "Schwerwiegender Fehler bei der Extraktion: %s", type(err).__name__
                )
                self._save_debug_artifacts()
                return []
        finally:
            # 5) Put the implicit wait back to the value used outside of
            #    extraction (10 s).
            # NOTE(review): nothing visible in this file sets an implicit wait
            # before the first call, so from here on the session runs with
            # 10 s; Selenium's docs advise against mixing implicit and
            # explicit waits — confirm this is intended.
            self.driver.implicitly_wait(10)

    def _collect_rows(self):
        """Read name and website from every table row of the current page."""
        logger.info("Extrahiere Ergebnisse von der aktuellen Seite...")
        results = []

        # 3) Wait for at least one table row.
        rows_locator = (By.CSS_SELECTOR, self._ROWS_SEL)
        self.wait.until(EC.presence_of_all_elements_located(rows_locator))
        rows = self.driver.find_elements(*rows_locator)
        logger.info("%d Firmen-Zeilen gefunden.", len(rows))

        # 4) Walk the rows without any further sleeps.
        for index, row in enumerate(rows, 1):
            name_links = row.find_elements(By.CSS_SELECTOR, self._NAME_LINK_SEL)
            if not name_links:
                logger.warning("Zeile %d: Kein Name-Element gefunden. Überspringe.", index)
                continue
            name_link = name_links[0]
            # Use the title attribute when it is set, else the link text.
            company_name = (name_link.get_attribute("title") or name_link.text).strip()

            site_links = row.find_elements(By.CSS_SELECTOR, self._WEBSITE_LINK_SEL)
            website = site_links[0].text.strip() if site_links else ""

            results.append({'name': company_name, 'website': website})

        logger.info("Extraktion abgeschlossen: %d Firmen.", len(results))
        return results

    def _save_debug_artifacts(self):
        """Best-effort dump of a screenshot and the page HTML into OUTPUT_DIR.

        Called from an error handler, so it must never raise: any failure
        while saving is logged as a warning and swallowed.
        """
        try:
            os.makedirs(OUTPUT_DIR, exist_ok=True)
            stamp = time.strftime("%Y%m%d-%H%M%S")
            shot_path = os.path.join(OUTPUT_DIR, f"debug_{stamp}.png")
            html_path = os.path.join(OUTPUT_DIR, f"debug_{stamp}.html")
            self.driver.save_screenshot(shot_path)
            with open(html_path, "w", encoding="utf-8") as fh:
                fh.write(self.driver.page_source)
            logger.info("Debug-Artefakte gespeichert: %s, %s", shot_path, html_path)
        except Exception:
            # Deliberately broad: a failing diagnostic must not mask the
            # extraction error that triggered it.
            logger.warning("Debug-Artefakte konnten nicht gespeichert werden.", exc_info=True)

    def click_next_page(self):
        """Advance the result table to the next page.

        Returns:
            True when the next page became active; False when there is no
            paginator, the last paginator button is disabled, or the active
            page did not change within the timeout of ``self.wait``.
        """
        # Grab the paginator buttons; the last one is treated as "next".
        buttons = self.driver.find_elements(By.CSS_SELECTOR, self._PAGE_BUTTON_SEL)
        if not buttons:
            return False
        next_btn = buttons[-1]

        # End reached? get_attribute() returns None when the element has no
        # class attribute, hence the "or ''" guard.
        if not next_btn.is_enabled() or "disabled" in (next_btn.get_attribute("class") or ""):
            return False

        current = self.driver.find_element(By.CSS_SELECTOR, self._ACTIVE_PAGE_SEL).text
        next_btn.click()

        # Wait for the page change, i.e. for the active button's label to differ.
        try:
            self.wait.until(
                lambda d: d.find_element(By.CSS_SELECTOR, self._ACTIVE_PAGE_SEL).text != current
            )
        except TimeoutException:
            # Stop paginating instead of aborting the run, so results that
            # were already collected are still returned by run().
            logger.warning("Seitenwechsel nach Klick nicht erkannt – Paginierung wird beendet.")
            return False
        return True

    def run(self):
        """Log in, open the search and scrape all result pages.

        Returns:
            The concatenated per-page result lists.
        """
        logger.info("Starte Login und Sucheauswahl…")
        self.login_and_select_search()

        collected = []
        page = 1
        while True:
            logger.info("Seite %d: Extrahiere Daten…", page)
            collected.extend(self.extract_current_page_results())
            if not self.click_next_page():
                break
            page += 1
        return collected
|
|
|
|
def main():
    """Run the scraper end to end and write the results to a JSON file.

    Loads the credentials from ``CREDS_FILE``, starts a headless Chrome via
    the chromedriver at ``CHROMEDRIVER_PATH``, scrapes all result pages and
    writes them to ``results.json`` inside ``OUTPUT_DIR``. The browser is
    always shut down, even when scraping raises; in that case the exception
    propagates and no file is written.
    """
    user, pwd = load_creds(CREDS_FILE)

    opts = Options()
    for flag in ("--headless", "--no-sandbox", "--disable-dev-shm-usage"):
        opts.add_argument(flag)
    service = Service(CHROMEDRIVER_PATH)
    driver = webdriver.Chrome(service=service, options=opts)
    wait = WebDriverWait(driver, 30)  # 30 s timeout for every explicit wait

    try:
        scraper = DealfrontScraper(driver, wait, user, pwd)
        results = scraper.run()
    finally:
        driver.quit()

    os.makedirs(OUTPUT_DIR, exist_ok=True)
    path = os.path.join(OUTPUT_DIR, "results.json")
    with open(path, "w", encoding="utf-8") as f:
        # ensure_ascii=False keeps non-ASCII characters readable in the file.
        json.dump(results, f, ensure_ascii=False, indent=2)

    logger.info("✅ Fertig: %d Einträge in %s", len(results), path)
|
|
|
|
# Run the scraper only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|