dealfront_enrichment.py aktualisiert
This commit is contained in:
@@ -4,37 +4,43 @@ import json
|
||||
import logging
|
||||
|
||||
from selenium import webdriver
|
||||
from selenium.webdriver.chrome.options import Options, ChromeOptions
|
||||
from selenium.webdriver.chrome.options import Options
|
||||
from selenium.webdriver.chrome.service import Service
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
|
||||
from config import TempConfig # Import deiner Konfigurationsklasse
|
||||
from config import TempConfig # Deine Konfigurationsklasse mit Pfaden und URLs
|
||||
|
||||
# Logging konfigurieren
|
||||
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
|
||||
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
|
||||
template = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
|
||||
logging.basicConfig(level=logging.INFO, format=template)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DealfrontScraper:
|
||||
def __init__(self):
|
||||
logger.info("Initialisiere den DealfrontScraper...")
|
||||
chrome_options = ChromeOptions()
|
||||
# Chrome-Optionen
|
||||
chrome_options = Options()
|
||||
prefs = {"profile.managed_default_content_settings.images": 2}
|
||||
chrome_options.add_experimental_option("prefs", prefs)
|
||||
chrome_options.add_argument("--headless")
|
||||
chrome_options.add_argument("--no-sandbox")
|
||||
chrome_options.add_argument("--disable-dev-shm-usage")
|
||||
chrome_options.add_argument("--window-size=1920,1080")
|
||||
|
||||
# WebDriver-Service
|
||||
service = Service(executable_path=TempConfig.CHROMEDRIVER_PATH)
|
||||
try:
|
||||
self.driver = webdriver.Chrome(service=service, options=chrome_options)
|
||||
except Exception:
|
||||
logger.critical("WebDriver konnte nicht initialisiert werden.", exc_info=True)
|
||||
raise
|
||||
|
||||
# Explicit Wait
|
||||
self.wait = WebDriverWait(self.driver, TempConfig.DEFAULT_TIMEOUT)
|
||||
# Credentials laden
|
||||
self.username, self.password = self._load_credentials()
|
||||
logger.info("WebDriver erfolgreich initialisiert.")
|
||||
|
||||
@@ -42,76 +48,64 @@ class DealfrontScraper:
|
||||
try:
|
||||
with open(TempConfig.DEALFRONT_CREDENTIALS_FILE, 'r', encoding='utf-8') as f:
|
||||
creds = json.load(f)
|
||||
return creds.get('username'), creds.get('password')
|
||||
return creds['username'], creds['password']
|
||||
except Exception as e:
|
||||
logger.error(f"Credentials-Datei konnte nicht geladen werden: {e}")
|
||||
raise
|
||||
|
||||
def _save_debug_artifacts(self):
|
||||
try:
|
||||
os.makedirs(TempConfig.OUTPUT_DIR, exist_ok=True)
|
||||
ts = time.strftime("%Y%m%d-%H%M%S")
|
||||
png = os.path.join(TempConfig.OUTPUT_DIR, f"error_{ts}.png")
|
||||
html = os.path.join(TempConfig.OUTPUT_DIR, f"error_{ts}.html")
|
||||
self.driver.save_screenshot(png)
|
||||
logger.error(f"Screenshot '{png}' gespeichert.")
|
||||
with open(html, 'w', encoding='utf-8') as f:
|
||||
f.write(self.driver.page_source)
|
||||
logger.error(f"HTML-Source '{html}' gespeichert.")
|
||||
except Exception as e:
|
||||
logger.error(f"Debug-Artefakte konnten nicht gespeichert werden: {e}")
|
||||
|
||||
def login_and_find_list(self, search_name):
|
||||
# Login
|
||||
# Login-Flow
|
||||
logger.info(f"Navigiere zur Login-Seite: {TempConfig.DEALFRONT_LOGIN_URL}")
|
||||
self.driver.get(TempConfig.DEALFRONT_LOGIN_URL)
|
||||
self.wait.until(EC.visibility_of_element_located((By.NAME, 'email'))).send_keys(self.username)
|
||||
self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.password)
|
||||
self.driver.find_element(By.NAME, 'password').send_keys(self.password)
|
||||
self.driver.find_element(By.XPATH, "//button[normalize-space()='Log in']").click()
|
||||
logger.info("Login gesendet.")
|
||||
|
||||
# 'Prospects finden'
|
||||
# Klicken auf 'Prospects finden'
|
||||
tile = self.wait.until(
|
||||
EC.element_to_be_clickable((By.CSS_SELECTOR, "a[data-test-target-product-tile]"))
|
||||
EC.element_to_be_clickable((By.CSS_SELECTOR, "a[data-test-target-product-tile='Prospects finden']"))
|
||||
)
|
||||
tile.click()
|
||||
logger.info("'Prospects finden' geklickt.")
|
||||
|
||||
# Vordefinierte Suche auswählen
|
||||
sel = (By.XPATH, f"//div[contains(@class,'truncate') and normalize-space()='{search_name}']")
|
||||
item = self.wait.until(EC.element_to_be_clickable(sel))
|
||||
selector = (By.XPATH, f"//div[contains(@class,'truncate') and normalize-space()='{search_name}']")
|
||||
item = self.wait.until(EC.element_to_be_clickable(selector))
|
||||
item.click()
|
||||
logger.info(f"Suche '{search_name}' geladen.")
|
||||
|
||||
def extract_current_page_results(self):
|
||||
# 1) Kurzer Implicit-Wait
|
||||
# Kurz Implicit-Wait
|
||||
self.driver.implicitly_wait(1)
|
||||
|
||||
# 2) Warten auf erstes Daten-Element
|
||||
first_locator = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
|
||||
self.wait.until(EC.visibility_of_element_located(first_locator))
|
||||
# Warten auf erstes Daten-Element
|
||||
first = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
|
||||
self.wait.until(EC.visibility_of_element_located(first))
|
||||
|
||||
logger.info("Extrahiere aktuelle Seite...")
|
||||
results = []
|
||||
|
||||
# 3) Auf mindestens eine Zeile warten
|
||||
# Warten auf mindestens eine Tabellen-Zeile
|
||||
rows_sel = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]")
|
||||
self.wait.until(EC.presence_of_all_elements_located(rows_sel))
|
||||
rows = self.driver.find_elements(*rows_sel)
|
||||
logger.info(f"{len(rows)} Zeilen gefunden.")
|
||||
|
||||
# 4) Namen & Websites extrahieren
|
||||
# Extraktion Namen & Website
|
||||
for i, row in enumerate(rows, 1):
|
||||
names = row.find_elements(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
|
||||
if not names:
|
||||
logger.warning(f"Zeile {i}: Kein Name gefunden.")
|
||||
# Name
|
||||
name_el = row.find_elements(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
|
||||
if not name_el:
|
||||
logger.warning(f"Zeile {i}: Kein Name gefunden. Überspringe.")
|
||||
continue
|
||||
name_elem = names[0]
|
||||
name = (name_elem.get_attribute('title') or name_elem.text).strip()
|
||||
elem = name_el[0]
|
||||
name = (elem.get_attribute('title') or elem.text).strip()
|
||||
|
||||
webs = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3) a")
|
||||
if webs:
|
||||
web = webs[0].text.strip()
|
||||
# Website
|
||||
web_el = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3) a")
|
||||
if web_el:
|
||||
web = web_el[0].text.strip()
|
||||
else:
|
||||
cell = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3)")
|
||||
web = cell[0].text.strip() if cell else ''
|
||||
@@ -119,38 +113,34 @@ class DealfrontScraper:
|
||||
results.append({'name': name, 'website': web})
|
||||
|
||||
logger.info(f"Extraktion abgeschlossen: {len(results)} Firmen.")
|
||||
# Implicit-Wait reset
|
||||
# Implicit-Wait zurücksetzen
|
||||
self.driver.implicitly_wait(TempConfig.IMPLICIT_WAIT)
|
||||
return results
|
||||
|
||||
def click_next_page(self) -> bool:
|
||||
btns = self.driver.find_elements(By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button")
|
||||
if not btns:
|
||||
buttons = self.driver.find_elements(By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button")
|
||||
if not buttons:
|
||||
return False
|
||||
nxt = btns[-1]
|
||||
nxt = buttons[-1]
|
||||
if not nxt.is_enabled() or 'disabled' in nxt.get_attribute('class'):
|
||||
return False
|
||||
curr = self.driver.find_element(
|
||||
By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active"
|
||||
).text
|
||||
current = self.driver.find_element(By.CSS_SELECTOR,
|
||||
"nav.eb-pagination a.eb-pagination-button.active").text
|
||||
nxt.click()
|
||||
self.wait.until(
|
||||
lambda d: d.find_element(
|
||||
By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active"
|
||||
).text != curr
|
||||
)
|
||||
self.wait.until(lambda d: d.find_element(By.CSS_SELECTOR,
|
||||
"nav.eb-pagination a.eb-pagination-button.active").text != current)
|
||||
return True
|
||||
|
||||
def run(self, search_name):
|
||||
try:
|
||||
self.login_and_find_list(search_name)
|
||||
all_res = []
|
||||
all_data = []
|
||||
while True:
|
||||
page_res = self.extract_current_page_results()
|
||||
all_res.extend(page_res)
|
||||
page = self.extract_current_page_results()
|
||||
all_data.extend(page)
|
||||
if not self.click_next_page():
|
||||
break
|
||||
return all_res
|
||||
return all_data
|
||||
finally:
|
||||
self.driver.quit()
|
||||
|
||||
@@ -158,5 +148,5 @@ class DealfrontScraper:
|
||||
if __name__ == '__main__':
|
||||
scraper = DealfrontScraper()
|
||||
data = scraper.run('Facility Management')
|
||||
for d in data:
|
||||
print(d)
|
||||
for entry in data:
|
||||
print(entry)
|
||||
|
||||
Reference in New Issue
Block a user