dealfront_enrichment.py aktualisiert
This commit is contained in:
@@ -13,152 +13,140 @@ from selenium.webdriver.common.by import By
|
|||||||
from selenium.webdriver.support.ui import WebDriverWait
|
from selenium.webdriver.support.ui import WebDriverWait
|
||||||
from selenium.webdriver.support import expected_conditions as EC
|
from selenium.webdriver.support import expected_conditions as EC
|
||||||
|
|
||||||
# ────────────────────────────────────────────────────────────────────────────
|
# ────────────────────────────────────────────────────────────────
|
||||||
# Konstanten
|
# Konstanten
|
||||||
LOGIN_URL = "https://app.dealfront.com/login"
|
LOGIN_URL = "https://app.dealfront.com/login"
|
||||||
TARGET_SEARCH_NAME = "Facility Management"
|
SEARCH_NAME = "Facility Management"
|
||||||
CREDENTIALS_FILE = "dealfront_credentials.json"
|
CREDS_FILE = "dealfront_credentials.json"
|
||||||
OUTPUT_DIR = "output"
|
OUTPUT_DIR = "output"
|
||||||
CHROMEDRIVER_PATH = "/usr/bin/chromedriver"
|
CHROMEDRIVER_PATH = "/usr/bin/chromedriver"
|
||||||
LOG_FORMAT = "%(asctime)s - %(levelname)-8s - %(message)s"
|
LOG_FORMAT = "%(asctime)s %(levelname)-8s %(message)s"
|
||||||
# ────────────────────────────────────────────────────────────────────────────
|
# ────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
# Logging
|
|
||||||
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, force=True)
|
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, force=True)
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
logging.getLogger("selenium").setLevel(logging.WARNING)
|
logging.getLogger("selenium").setLevel(logging.WARNING)
|
||||||
|
|
||||||
def load_credentials(path):
|
def load_creds(path):
|
||||||
if not os.path.isfile(path):
|
if not os.path.exists(path):
|
||||||
logger.error(f"Credentials-Datei nicht gefunden: {path}")
|
logger.error("Credentials-Datei nicht gefunden: %s", path)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
with open(path, encoding="utf-8") as f:
|
with open(path, encoding="utf-8") as f:
|
||||||
creds = json.load(f)
|
creds = json.load(f)
|
||||||
user = creds.get("username")
|
u = creds.get("username")
|
||||||
pwd = creds.get("password")
|
p = creds.get("password")
|
||||||
if not user or not pwd:
|
if not u or not p:
|
||||||
logger.error("Credentials-Datei enthält keinen username/password.")
|
logger.error("Username oder Passwort fehlt in %s", path)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
return user, pwd
|
return u, p
|
||||||
|
|
||||||
class DealfrontScraper:
|
class DealfrontScraper:
|
||||||
def __init__(self, driver, wait, username, password):
|
def __init__(self, driver, wait, user, pwd):
|
||||||
self.driver = driver
|
self.driver = driver
|
||||||
self.wait = wait
|
self.wait = wait
|
||||||
self.username = username
|
self.user = user
|
||||||
self.password = password
|
self.pwd = pwd
|
||||||
|
|
||||||
def login_and_find_list(self):
|
def login_and_select_search(self):
|
||||||
# 1) Login
|
# 1) Login
|
||||||
self.driver.get(LOGIN_URL)
|
self.driver.get(LOGIN_URL)
|
||||||
self.wait.until(EC.visibility_of_element_located(
|
self.wait.until(EC.visibility_of_element_located(
|
||||||
(By.CSS_SELECTOR, "input[type='email'], input[type='text']")
|
(By.CSS_SELECTOR, "input[type='email'], input[type='text']")
|
||||||
))
|
))
|
||||||
self.driver.find_element(By.CSS_SELECTOR, "input[type='email'], input[type='text']")\
|
self.driver.find_element(By.CSS_SELECTOR, "input[type='email'], input[type='text']").send_keys(self.user)
|
||||||
.send_keys(self.username)
|
self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.pwd)
|
||||||
self.driver.find_element(By.CSS_SELECTOR, "input[type='password']")\
|
|
||||||
.send_keys(self.password)
|
|
||||||
self.driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click()
|
self.driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click()
|
||||||
|
|
||||||
# 2) Quick-Link "Prospects finden" per XPath-Text suchen
|
# 2) Auf Sidebar-Liste warten
|
||||||
prospects_xpath = "//*[contains(normalize-space(.), 'Prospects finden')]"
|
ul_selector = "ul[data-userpilot-id='sidebar-searches-list']"
|
||||||
btn = self.wait.until(EC.element_to_be_clickable((By.XPATH, prospects_xpath)))
|
self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, ul_selector)))
|
||||||
btn.click()
|
|
||||||
|
|
||||||
# 3) Vordefinierte Suche anklicken
|
# 3) Genaue Suche anklicken über das <div title="…">
|
||||||
search_xpath = f"//*[contains(normalize-space(.), '{TARGET_SEARCH_NAME}')]"
|
xpath = f"//ul[@data-userpilot-id='sidebar-searches-list']//div[@title='{SEARCH_NAME}']"
|
||||||
btn2 = self.wait.until(EC.element_to_be_clickable((By.XPATH, search_xpath)))
|
elem = self.wait.until(EC.element_to_be_clickable((By.XPATH, xpath)))
|
||||||
btn2.click()
|
# JS-Click, damit alle Listener feuern
|
||||||
|
self.driver.execute_script("arguments[0].click();", elem)
|
||||||
|
|
||||||
# 4) Erstes Daten-Element abwarten
|
# 4) Auf erste Datenzeile warten
|
||||||
first = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
|
first = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
|
||||||
self.wait.until(EC.visibility_of_element_located(first))
|
self.wait.until(EC.visibility_of_element_located(first))
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
def extract_current_page_results(self):
|
def extract_page(self):
|
||||||
# kurz Implicit-Wait 1 s
|
# kurz Implicit-Wait 1 s, damit find_elements nicht blocken
|
||||||
self.driver.implicitly_wait(1)
|
self.driver.implicitly_wait(1)
|
||||||
|
|
||||||
# auf ≥1 Zeile warten
|
|
||||||
rows_sel = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]")
|
rows_sel = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]")
|
||||||
self.wait.until(EC.presence_of_all_elements_located(rows_sel))
|
self.wait.until(EC.presence_of_all_elements_located(rows_sel))
|
||||||
rows = self.driver.find_elements(*rows_sel)
|
rows = self.driver.find_elements(*rows_sel)
|
||||||
|
|
||||||
results = []
|
out = []
|
||||||
for row in rows:
|
for row in rows:
|
||||||
# Name
|
|
||||||
ne = row.find_elements(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
|
ne = row.find_elements(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
|
||||||
if not ne:
|
if not ne:
|
||||||
continue
|
continue
|
||||||
name = (ne[0].get_attribute("title") or ne[0].text).strip()
|
name = (ne[0].get_attribute("title") or ne[0].text).strip()
|
||||||
|
|
||||||
# Website aus dritter Spalte
|
|
||||||
we = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3) a")
|
we = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3) a")
|
||||||
if we:
|
if we:
|
||||||
site = we[0].text.strip()
|
site = we[0].get_attribute("href").split("://")[-1].rstrip("/")
|
||||||
else:
|
else:
|
||||||
td3 = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3)")
|
td3 = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3)")
|
||||||
site = td3[0].text.strip() if td3 else ""
|
site = td3[0].text.strip() if td3 else ""
|
||||||
|
|
||||||
results.append({"name": name, "website": site})
|
out.append({"name": name, "website": site})
|
||||||
|
|
||||||
# Implicit-Wait zurück auf 10 s
|
# reset Implicit-Wait
|
||||||
self.driver.implicitly_wait(10)
|
self.driver.implicitly_wait(10)
|
||||||
return results
|
return out
|
||||||
|
|
||||||
def click_next_page(self):
|
def click_next(self):
|
||||||
# Paginator-Buttons (Prev, 1…n, Next)
|
|
||||||
btns = self.driver.find_elements(By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button")
|
btns = self.driver.find_elements(By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button")
|
||||||
if not btns:
|
if not btns:
|
||||||
return False
|
return False
|
||||||
nxt = btns[-1]
|
nxt = btns[-1]
|
||||||
if (not nxt.is_enabled()) or ("disabled" in nxt.get_attribute("class")):
|
if not nxt.is_enabled() or "disabled" in nxt.get_attribute("class"):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
current = self.driver.find_element(
|
current = self.driver.find_element(
|
||||||
By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active"
|
By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active"
|
||||||
).text
|
).text
|
||||||
nxt.click()
|
nxt.click()
|
||||||
# auf neue aktive Seite warten
|
|
||||||
self.wait.until(lambda d: d.find_element(
|
self.wait.until(lambda d: d.find_element(
|
||||||
By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active"
|
By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active"
|
||||||
).text != current)
|
).text != current)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.login_and_find_list()
|
self.login_and_select_search()
|
||||||
all_res = []
|
all_data = []
|
||||||
while True:
|
while True:
|
||||||
all_res.extend(self.extract_current_page_results())
|
all_data.extend(self.extract_page())
|
||||||
if not self.click_next_page():
|
if not self.click_next():
|
||||||
break
|
break
|
||||||
return all_res
|
return all_data
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
user, pwd = load_credentials(CREDENTIALS_FILE)
|
user, pwd = load_creds(CREDS_FILE)
|
||||||
|
|
||||||
# ChromeDriver starten
|
opts = Options()
|
||||||
opts = Options()
|
|
||||||
opts.add_argument("--headless")
|
opts.add_argument("--headless")
|
||||||
opts.add_argument("--no-sandbox")
|
opts.add_argument("--no-sandbox")
|
||||||
opts.add_argument("--disable-dev-shm-usage")
|
opts.add_argument("--disable-dev-shm-usage")
|
||||||
service = Service(CHROMEDRIVER_PATH)
|
service = Service(CHROMEDRIVER_PATH)
|
||||||
driver = webdriver.Chrome(service=service, options=opts)
|
driver = webdriver.Chrome(service=service, options=opts)
|
||||||
wait = WebDriverWait(driver, 30)
|
wait = WebDriverWait(driver, 30)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
scraper = DealfrontScraper(driver, wait, user, pwd)
|
scraper = DealfrontScraper(driver, wait, user, pwd)
|
||||||
data = scraper.run()
|
data = scraper.run()
|
||||||
finally:
|
finally:
|
||||||
driver.quit()
|
driver.quit()
|
||||||
|
|
||||||
# speichern
|
|
||||||
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
||||||
out = os.path.join(OUTPUT_DIR, "results.json")
|
out = os.path.join(OUTPUT_DIR, "results.json")
|
||||||
with open(out, "w", encoding="utf-8") as f:
|
with open(out, "w", encoding="utf-8") as f:
|
||||||
json.dump(data, f, ensure_ascii=False, indent=2)
|
json.dump(data, f, ensure_ascii=False, indent=2)
|
||||||
|
|
||||||
print(f"✅ Fertig: {len(data)} Einträge in '{out}'")
|
print(f"✅ Fertig: {len(data)} Einträge in {out}")
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
|||||||
Reference in New Issue
Block a user