dealfront_enrichment.py aktualisiert

This commit is contained in:
2025-07-09 12:36:38 +00:00
parent 66db573e38
commit 5fa4e559b7

View File

@@ -13,152 +13,140 @@ from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support import expected_conditions as EC
# ──────────────────────────────────────────────────────────────────────────── # ────────────────────────────────────────────────────────────────
# Konstanten # Konstanten
LOGIN_URL = "https://app.dealfront.com/login" LOGIN_URL = "https://app.dealfront.com/login"
TARGET_SEARCH_NAME = "Facility Management" SEARCH_NAME = "Facility Management"
CREDENTIALS_FILE = "dealfront_credentials.json" CREDS_FILE = "dealfront_credentials.json"
OUTPUT_DIR = "output" OUTPUT_DIR = "output"
CHROMEDRIVER_PATH = "/usr/bin/chromedriver" CHROMEDRIVER_PATH = "/usr/bin/chromedriver"
LOG_FORMAT = "%(asctime)s - %(levelname)-8s - %(message)s" LOG_FORMAT = "%(asctime)s %(levelname)-8s %(message)s"
# ──────────────────────────────────────────────────────────────────────────── # ────────────────────────────────────────────────────────────────
# Logging
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, force=True) logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, force=True)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logging.getLogger("selenium").setLevel(logging.WARNING) logging.getLogger("selenium").setLevel(logging.WARNING)
def load_credentials(path): def load_creds(path):
if not os.path.isfile(path): if not os.path.exists(path):
logger.error(f"Credentials-Datei nicht gefunden: {path}") logger.error("Credentials-Datei nicht gefunden: %s", path)
sys.exit(1) sys.exit(1)
with open(path, encoding="utf-8") as f: with open(path, encoding="utf-8") as f:
creds = json.load(f) creds = json.load(f)
user = creds.get("username") u = creds.get("username")
pwd = creds.get("password") p = creds.get("password")
if not user or not pwd: if not u or not p:
logger.error("Credentials-Datei enthält keinen username/password.") logger.error("Username oder Passwort fehlt in %s", path)
sys.exit(1) sys.exit(1)
return user, pwd return u, p
class DealfrontScraper: class DealfrontScraper:
def __init__(self, driver, wait, username, password): def __init__(self, driver, wait, user, pwd):
self.driver = driver self.driver = driver
self.wait = wait self.wait = wait
self.username = username self.user = user
self.password = password self.pwd = pwd
def login_and_find_list(self): def login_and_select_search(self):
# 1) Login # 1) Login
self.driver.get(LOGIN_URL) self.driver.get(LOGIN_URL)
self.wait.until(EC.visibility_of_element_located( self.wait.until(EC.visibility_of_element_located(
(By.CSS_SELECTOR, "input[type='email'], input[type='text']") (By.CSS_SELECTOR, "input[type='email'], input[type='text']")
)) ))
self.driver.find_element(By.CSS_SELECTOR, "input[type='email'], input[type='text']")\ self.driver.find_element(By.CSS_SELECTOR, "input[type='email'], input[type='text']").send_keys(self.user)
.send_keys(self.username) self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.pwd)
self.driver.find_element(By.CSS_SELECTOR, "input[type='password']")\
.send_keys(self.password)
self.driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click() self.driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click()
# 2) Quick-Link "Prospects finden" per XPath-Text suchen # 2) Auf Sidebar-Liste warten
prospects_xpath = "//*[contains(normalize-space(.), 'Prospects finden')]" ul_selector = "ul[data-userpilot-id='sidebar-searches-list']"
btn = self.wait.until(EC.element_to_be_clickable((By.XPATH, prospects_xpath))) self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, ul_selector)))
btn.click()
# 3) Vordefinierte Suche anklicken # 3) Genaue Suche anklicken über das <div title="…">
search_xpath = f"//*[contains(normalize-space(.), '{TARGET_SEARCH_NAME}')]" xpath = f"//ul[@data-userpilot-id='sidebar-searches-list']//div[@title='{SEARCH_NAME}']"
btn2 = self.wait.until(EC.element_to_be_clickable((By.XPATH, search_xpath))) elem = self.wait.until(EC.element_to_be_clickable((By.XPATH, xpath)))
btn2.click() # JS-Click, damit alle Listener feuern
self.driver.execute_script("arguments[0].click();", elem)
# 4) Erstes Daten-Element abwarten # 4) Auf erste Datenzeile warten
first = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text") first = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
self.wait.until(EC.visibility_of_element_located(first)) self.wait.until(EC.visibility_of_element_located(first))
time.sleep(1) time.sleep(1)
def extract_current_page_results(self): def extract_page(self):
# kurz Implicit-Wait 1s # kurz Implicit-Wait 1 s, damit find_elements nicht blocken
self.driver.implicitly_wait(1) self.driver.implicitly_wait(1)
# auf ≥1 Zeile warten
rows_sel = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]") rows_sel = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]")
self.wait.until(EC.presence_of_all_elements_located(rows_sel)) self.wait.until(EC.presence_of_all_elements_located(rows_sel))
rows = self.driver.find_elements(*rows_sel) rows = self.driver.find_elements(*rows_sel)
results = [] out = []
for row in rows: for row in rows:
# Name
ne = row.find_elements(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text") ne = row.find_elements(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
if not ne: if not ne:
continue continue
name = (ne[0].get_attribute("title") or ne[0].text).strip() name = (ne[0].get_attribute("title") or ne[0].text).strip()
# Website aus dritter Spalte
we = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3) a") we = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3) a")
if we: if we:
site = we[0].text.strip() site = we[0].get_attribute("href").split("://")[-1].rstrip("/")
else: else:
td3 = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3)") td3 = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3)")
site = td3[0].text.strip() if td3 else "" site = td3[0].text.strip() if td3 else ""
results.append({"name": name, "website": site}) out.append({"name": name, "website": site})
# Implicit-Wait zurück auf 10s # reset Implicit-Wait
self.driver.implicitly_wait(10) self.driver.implicitly_wait(10)
return results return out
def click_next_page(self): def click_next(self):
# Paginator-Buttons (Prev, 1…n, Next)
btns = self.driver.find_elements(By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button") btns = self.driver.find_elements(By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button")
if not btns: if not btns:
return False return False
nxt = btns[-1] nxt = btns[-1]
if (not nxt.is_enabled()) or ("disabled" in nxt.get_attribute("class")): if not nxt.is_enabled() or "disabled" in nxt.get_attribute("class"):
return False return False
current = self.driver.find_element( current = self.driver.find_element(
By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active" By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active"
).text ).text
nxt.click() nxt.click()
# auf neue aktive Seite warten
self.wait.until(lambda d: d.find_element( self.wait.until(lambda d: d.find_element(
By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active" By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active"
).text != current) ).text != current)
return True return True
def run(self): def run(self):
self.login_and_find_list() self.login_and_select_search()
all_res = [] all_data = []
while True: while True:
all_res.extend(self.extract_current_page_results()) all_data.extend(self.extract_page())
if not self.click_next_page(): if not self.click_next():
break break
return all_res return all_data
def main(): def main():
user, pwd = load_credentials(CREDENTIALS_FILE) user, pwd = load_creds(CREDS_FILE)
# ChromeDriver starten opts = Options()
opts = Options()
opts.add_argument("--headless") opts.add_argument("--headless")
opts.add_argument("--no-sandbox") opts.add_argument("--no-sandbox")
opts.add_argument("--disable-dev-shm-usage") opts.add_argument("--disable-dev-shm-usage")
service = Service(CHROMEDRIVER_PATH) service = Service(CHROMEDRIVER_PATH)
driver = webdriver.Chrome(service=service, options=opts) driver = webdriver.Chrome(service=service, options=opts)
wait = WebDriverWait(driver, 30) wait = WebDriverWait(driver, 30)
try: try:
scraper = DealfrontScraper(driver, wait, user, pwd) scraper = DealfrontScraper(driver, wait, user, pwd)
data = scraper.run() data = scraper.run()
finally: finally:
driver.quit() driver.quit()
# speichern
os.makedirs(OUTPUT_DIR, exist_ok=True) os.makedirs(OUTPUT_DIR, exist_ok=True)
out = os.path.join(OUTPUT_DIR, "results.json") out = os.path.join(OUTPUT_DIR, "results.json")
with open(out, "w", encoding="utf-8") as f: with open(out, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2) json.dump(data, f, ensure_ascii=False, indent=2)
print(f"✅ Fertig: {len(data)} Einträge in '{out}'") print(f"✅ Fertig: {len(data)} Einträge in {out}")
if __name__ == "__main__": if __name__ == "__main__":
main() main()