dealfront_enrichment.py aktualisiert
This commit is contained in:
@@ -1,156 +1,143 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
import os
|
import os
|
||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
import logging
|
import sys
|
||||||
from selenium import webdriver
|
from selenium import webdriver
|
||||||
from selenium.webdriver.common.by import By
|
|
||||||
from selenium.webdriver.chrome.service import Service
|
|
||||||
from selenium.webdriver.chrome.options import Options
|
from selenium.webdriver.chrome.options import Options
|
||||||
|
from selenium.webdriver.common.by import By
|
||||||
from selenium.webdriver.support.ui import WebDriverWait
|
from selenium.webdriver.support.ui import WebDriverWait
|
||||||
from selenium.webdriver.support import expected_conditions as EC
|
from selenium.webdriver.support import expected_conditions as EC
|
||||||
from selenium.common.exceptions import NoSuchElementException
|
|
||||||
|
|
||||||
# Temporäre, autarke Konfiguration (ersetzt externes config.py)
|
def load_credentials(path):
|
||||||
class TempConfig:
|
try:
|
||||||
DEALFRONT_LOGIN_URL = "https://app.dealfront.com/login"
|
with open(path, encoding='utf-8') as f:
|
||||||
TARGET_SEARCH_NAME = "Facility Management" # Kann angepasst werden
|
return json.load(f)
|
||||||
DEALFRONT_CREDENTIALS_FILE = "/app/dealfront_credentials.json"
|
except Exception as e:
|
||||||
CHROMEDRIVER_PATH = "/usr/bin/chromedriver"
|
print(f"Fehler beim Laden der Credentials: {e}", file=sys.stderr)
|
||||||
DEFAULT_TIMEOUT = 30
|
sys.exit(1)
|
||||||
IMPLICIT_WAIT = 10
|
|
||||||
OUTPUT_DIR = "/app/output"
|
|
||||||
|
|
||||||
# Logging konfigurieren
|
|
||||||
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
|
|
||||||
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
|
|
||||||
logging.getLogger("selenium").setLevel(logging.WARNING)
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
# Sicherstellen, dass OUTPUT_DIR existiert
|
|
||||||
os.makedirs(TempConfig.OUTPUT_DIR, exist_ok=True)
|
|
||||||
|
|
||||||
ios.makedirs(TempConfig.OUTPUT_DIR, exist_ok=True)
|
|
||||||
|
|
||||||
class DealfrontScraper:
|
class DealfrontScraper:
|
||||||
def __init__(self):
|
def __init__(self, driver, wait, username, password):
|
||||||
logger.info("Initialisiere den DealfrontScraper...")
|
self.driver = driver
|
||||||
# Chrome-Optionen
|
self.wait = wait
|
||||||
chrome_options = Options()
|
self.username = username
|
||||||
chrome_options.add_argument("--headless")
|
self.password = password
|
||||||
chrome_options.add_argument("--no-sandbox")
|
|
||||||
chrome_options.add_argument("--disable-dev-shm-usage")
|
|
||||||
chrome_options.add_argument("--window-size=1920,1080")
|
|
||||||
prefs = {"profile.managed_default_content_settings.images": 2}
|
|
||||||
chrome_options.add_experimental_option("prefs", prefs)
|
|
||||||
|
|
||||||
# WebDriver
|
def login_and_find_list(self, search_name):
|
||||||
service = Service(executable_path=TempConfig.CHROMEDRIVER_PATH)
|
# 1) Login-Seite aufrufen
|
||||||
try:
|
self.driver.get("https://app.dealfront.com/login")
|
||||||
self.driver = webdriver.Chrome(service=service, options=chrome_options)
|
# 2) Auf E-Mail-/Username-Feld warten, dann befüllen
|
||||||
except Exception:
|
self.wait.until(EC.visibility_of_element_located(
|
||||||
logger.critical("WebDriver konnte nicht initialisiert werden.", exc_info=True)
|
(By.CSS_SELECTOR, "input[type='email'], input[type='text']")
|
||||||
raise
|
))
|
||||||
self.wait = WebDriverWait(self.driver, TempConfig.DEFAULT_TIMEOUT)
|
email_in = self.driver.find_element(By.CSS_SELECTOR, "input[type='email'], input[type='text']")
|
||||||
self.driver.implicitly_wait(TempConfig.IMPLICIT_WAIT)
|
pwd_in = self.driver.find_element(By.CSS_SELECTOR, "input[type='password']")
|
||||||
|
email_in.clear(); email_in.send_keys(self.username)
|
||||||
# Credentials laden
|
pwd_in.clear(); pwd_in.send_keys(self.password)
|
||||||
self.username, self.password = self._load_credentials()
|
# 3) Absenden
|
||||||
logger.info("WebDriver erfolgreich initialisiert.")
|
self.driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click()
|
||||||
|
# 4) Auf den Quick-Link "Prospects finden" warten und klicken
|
||||||
def _load_credentials(self):
|
self.wait.until(EC.element_to_be_clickable((By.LINK_TEXT, "Prospects finden")))
|
||||||
try:
|
self.driver.find_element(By.LINK_TEXT, "Prospects finden").click()
|
||||||
with open(TempConfig.DEALFRONT_CREDENTIALS_FILE, 'r', encoding='utf-8') as f:
|
# 5) Auf die Liste der Suchen warten und dort die gewünschte anklicken
|
||||||
creds = json.load(f)
|
self.wait.until(EC.element_to_be_clickable((By.LINK_TEXT, search_name)))
|
||||||
return creds['username'], creds['password']
|
self.driver.find_element(By.LINK_TEXT, search_name).click()
|
||||||
except Exception as e:
|
# 6) Auf das erste Daten-Element warten, damit die Tabelle geladen ist
|
||||||
logger.error(f"Credentials-Datei konnte nicht geladen werden: {e}")
|
first_locator = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
|
||||||
raise
|
self.wait.until(EC.visibility_of_element_located(first_locator))
|
||||||
|
time.sleep(1)
|
||||||
def login_and_find_list(self):
|
|
||||||
# Login
|
|
||||||
logger.info(f"Navigiere zur Login-Seite: {TempConfig.DEALFRONT_LOGIN_URL}")
|
|
||||||
self.driver.get(TempConfig.DEALFRONT_LOGIN_URL)
|
|
||||||
self.wait.until(EC.visibility_of_element_located((By.NAME, 'email'))).send_keys(self.username)
|
|
||||||
self.driver.find_element(By.NAME, 'password').send_keys(self.password)
|
|
||||||
self.driver.find_element(By.XPATH, "//button[normalize-space()='Log in']").click()
|
|
||||||
logger.info("Login gesendet.")
|
|
||||||
|
|
||||||
# 'Prospects finden' anklicken
|
|
||||||
tile = self.wait.until(
|
|
||||||
EC.element_to_be_clickable((By.CSS_SELECTOR, "a[data-test-target-product-tile='Prospects finden']"))
|
|
||||||
)
|
|
||||||
tile.click()
|
|
||||||
logger.info("'Prospects finden' geklickt.")
|
|
||||||
|
|
||||||
# Vordefinierte Suche auswählen
|
|
||||||
sel = (By.XPATH, f"//div[contains(@class,'truncate') and normalize-space()='{TempConfig.TARGET_SEARCH_NAME}']")
|
|
||||||
item = self.wait.until(EC.element_to_be_clickable(sel))
|
|
||||||
item.click()
|
|
||||||
logger.info(f"Suche '{TempConfig.TARGET_SEARCH_NAME}' geladen.")
|
|
||||||
|
|
||||||
def extract_current_page_results(self):
|
def extract_current_page_results(self):
|
||||||
# Warte auf erstes Daten-Element
|
# kurz Implicit-Wait = 1 s, damit fehlende Elemente schnell übersprungen werden
|
||||||
first = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
|
self.driver.implicitly_wait(1)
|
||||||
self.wait.until(EC.visibility_of_element_located(first))
|
|
||||||
|
|
||||||
logger.info("Extrahiere aktuelle Seite...")
|
# sicherstellen, dass mindestens eine Zeile im DOM ist
|
||||||
results = []
|
|
||||||
|
|
||||||
# Warten auf Zeilen
|
|
||||||
rows_sel = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]")
|
rows_sel = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]")
|
||||||
self.wait.until(EC.presence_of_all_elements_located(rows_sel))
|
self.wait.until(EC.presence_of_all_elements_located(rows_sel))
|
||||||
rows = self.driver.find_elements(*rows_sel)
|
rows = self.driver.find_elements(*rows_sel)
|
||||||
logger.info(f"{len(rows)} Zeilen gefunden.")
|
|
||||||
|
|
||||||
|
results = []
|
||||||
for i, row in enumerate(rows, 1):
|
for i, row in enumerate(rows, 1):
|
||||||
# Name
|
# Name
|
||||||
name_el = row.find_elements(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
|
name_elems = row.find_elements(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
|
||||||
if not name_el:
|
if not name_elems:
|
||||||
logger.warning(f"Zeile {i}: Kein Name gefunden. Überspringe.")
|
# kein Name-Element gefunden
|
||||||
continue
|
continue
|
||||||
name = (name_el[0].get_attribute('title') or name_el[0].text).strip()
|
ne = name_elems[0]
|
||||||
|
company_name = (ne.get_attribute("title") or ne.text).strip()
|
||||||
|
|
||||||
# Website
|
# Website
|
||||||
web_el = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3) a")
|
web_elems = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3) a")
|
||||||
if web_el:
|
if web_elems:
|
||||||
web = web_el[0].text.strip()
|
website = web_elems[0].text.strip()
|
||||||
else:
|
else:
|
||||||
cell = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3)")
|
td3 = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3)")
|
||||||
web = cell[0].text.strip() if cell else ''
|
website = td3[0].text.strip() if td3 else ""
|
||||||
|
|
||||||
results.append({'name': name, 'website': web})
|
results.append({'name': company_name, 'website': website})
|
||||||
|
|
||||||
logger.info(f"Extraktion abgeschlossen: {len(results)} Firmen.")
|
# Implicit-Wait wieder zurücksetzen (Standard 10 s)
|
||||||
|
self.driver.implicitly_wait(10)
|
||||||
return results
|
return results
|
||||||
|
|
||||||
def click_next_page(self) -> bool:
|
def click_next_page(self) -> bool:
|
||||||
|
# alle Pagination-Buttons: Prev, Zahlen, Next
|
||||||
btns = self.driver.find_elements(By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button")
|
btns = self.driver.find_elements(By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button")
|
||||||
if not btns:
|
if not btns:
|
||||||
return False
|
return False
|
||||||
nxt = btns[-1]
|
nxt = btns[-1]
|
||||||
if not nxt.is_enabled() or 'disabled' in nxt.get_attribute('class'):
|
# falls disabled oder nicht klickbar, Schluss
|
||||||
|
if (not nxt.is_enabled()) or ("disabled" in nxt.get_attribute("class")):
|
||||||
return False
|
return False
|
||||||
curr = self.driver.find_element(By.CSS_SELECTOR,
|
|
||||||
"nav.eb-pagination a.eb-pagination-button.active").text
|
# aktuelle Seite merken, Klick ausführen
|
||||||
|
current = self.driver.find_element(
|
||||||
|
By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active"
|
||||||
|
).text
|
||||||
nxt.click()
|
nxt.click()
|
||||||
self.wait.until(lambda d: d.find_element(By.CSS_SELECTOR,
|
# warten, bis sich die aktive Seitenzahl ändert
|
||||||
"nav.eb-pagination a.eb-pagination-button.active").text != curr)
|
self.wait.until(lambda d: d.find_element(
|
||||||
|
By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active"
|
||||||
|
).text != current)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def run(self):
|
def run(self, search_name):
|
||||||
try:
|
# Login + Navigation zur Search-List
|
||||||
self.login_and_find_list()
|
self.login_and_find_list(search_name)
|
||||||
all_data = []
|
|
||||||
while True:
|
|
||||||
all_data.extend(self.extract_current_page_results())
|
|
||||||
if not self.click_next_page():
|
|
||||||
break
|
|
||||||
return all_data
|
|
||||||
finally:
|
|
||||||
self.driver.quit()
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
# Paginieren & extrahieren
|
||||||
scraper = DealfrontScraper()
|
all_results = []
|
||||||
data = scraper.run()
|
while True:
|
||||||
for entry in data:
|
all_results.extend(self.extract_current_page_results())
|
||||||
print(entry)
|
if not self.click_next_page():
|
||||||
|
break
|
||||||
|
return all_results
|
||||||
|
|
||||||
|
def main():
|
||||||
|
creds = load_credentials("credentials.json")
|
||||||
|
username = creds.get("username")
|
||||||
|
password = creds.get("password")
|
||||||
|
# WebDriver initialisieren
|
||||||
|
opts = Options()
|
||||||
|
opts.add_argument("--headless")
|
||||||
|
opts.add_argument("--no-sandbox")
|
||||||
|
opts.add_argument("--disable-dev-shm-usage")
|
||||||
|
driver = webdriver.Chrome(options=opts)
|
||||||
|
wait = WebDriverWait(driver, 30)
|
||||||
|
|
||||||
|
# Scraper starten
|
||||||
|
scraper = DealfrontScraper(driver, wait, username, password)
|
||||||
|
results = scraper.run("Facility Management")
|
||||||
|
|
||||||
|
# Output-Ordner anlegen und als JSON speichern
|
||||||
|
os.makedirs("output", exist_ok=True)
|
||||||
|
out_file = os.path.join("output", "results.json")
|
||||||
|
with open(out_file, "w", encoding="utf-8") as f:
|
||||||
|
json.dump(results, f, ensure_ascii=False, indent=2)
|
||||||
|
|
||||||
|
print(f"✅ Fertig: {len(results)} Einträge in '{out_file}'")
|
||||||
|
driver.quit()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|||||||
Reference in New Issue
Block a user