helpers.py aktualisiert

This commit is contained in:
2025-07-18 14:37:52 +00:00
parent 6b2be48470
commit 47d8a85e46

View File

@@ -1334,122 +1334,135 @@ def search_linkedin_contacts(company_name, website, position_query, crm_kurzform
# 11. WEBSITE SCRAPING & VALIDATION UTILITIES # 11. WEBSITE SCRAPING & VALIDATION UTILITIES
# ============================================================================== # ==============================================================================
@retry_on_failure
def get_website_raw(url, max_length=20000): # verify_cert wird entfernt def get_website_raw(url, max_length=20000): # verify_cert wird entfernt
""" """
Holt Textinhalt von einer Website, versucht Cookie-Banner zu umgehen. Holt Textinhalt von einer Website, versucht Cookie-Banner zu umgehen.
Versucht zuerst eine sichere Verbindung, bei SSL-Fehler einen unsicheren Fallback. Versucht zuerst eine sichere Verbindung, bei SSL-Fehler einen unsicheren Fallback.
""" """
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
if not url or not isinstance(url, str) or url.strip().lower() in ["k.a.", "kein artikel gefunden", "fehler bei suche", "http:"]: # KORREKTUR: Start des allumfassenden try-Blocks
logger.debug(f"get_website_raw skipped: Ungueltige oder leere URL '{url}'.")
return "k.A."
if not url.lower().startswith(("http://", "https://")):
url = "https://" + url
headers = {"User-Agent": random.choice(USER_AGENTS)}
response = None
error_reason = "Unbekannter Fehler"
return_marker = False
try: try:
# Erster Versuch: Immer mit Zertifikatsprüfung (sicher) if not url or not isinstance(url, str) or url.strip().lower() in ["k.a.", "kein artikel gefunden", "fehler bei suche", "http:"]:
logger.debug(f"Versuche Website sicher abzurufen: {url[:100]}... (verify=True)") logger.debug(f"get_website_raw skipped: Ungueltige oder leere URL '{url}'.")
response = requests.get(url, timeout=getattr(Config, 'REQUEST_TIMEOUT', 20), headers=headers, verify=True, allow_redirects=True, stream=False) return "k.A. (Ungueltige URL)" # Etwas informativerer Fehler
response.raise_for_status()
error_reason = None if not url.lower().startswith(("http://", "https://")):
except requests.exceptions.SSLError: url = "https://" + url
# Zweiter Versuch bei SSL-Fehler: Ohne Zertifikatsprüfung
logger.warning(f"SSL-Fehler fuer {url[:100]}... Versuche erneut mit verify=False.") headers = {"User-Agent": random.choice(USER_AGENTS)}
response = None
error_reason = "Unbekannter Fehler"
return_marker = False
try: try:
response = requests.get(url, timeout=getattr(Config, 'REQUEST_TIMEOUT', 20), headers=headers, verify=False, allow_redirects=True, stream=False) # Erster Versuch: Immer mit Zertifikatsprüfung (sicher)
logger.debug(f"Versuche Website sicher abzurufen: {url[:100]}... (verify=True)")
response = requests.get(url, timeout=getattr(Config, 'REQUEST_TIMEOUT', 20), headers=headers, verify=True, allow_redirects=True, stream=False)
response.raise_for_status() response.raise_for_status()
error_reason = None # Fehler wurde behoben error_reason = None
except Exception as e_fallback: except requests.exceptions.SSLError:
# Wenn auch der Fallback fehlschlägt, ist es ein anderer Fehler # Zweiter Versuch bei SSL-Fehler: Ohne Zertifikatsprüfung
error_reason = f"Fallback-Request fehlgeschlagen nach SSLError: {type(e_fallback).__name__}" logger.warning(f"SSL-Fehler fuer {url[:100]}... Versuche erneut mit verify=False.")
logger.error(f"{error_reason} fuer {url[:100]}") try:
except requests.exceptions.Timeout as e_timeout: response = requests.get(url, timeout=getattr(Config, 'REQUEST_TIMEOUT', 20), headers=headers, verify=False, allow_redirects=True, stream=False)
error_reason = f"Timeout ({getattr(Config, 'REQUEST_TIMEOUT', 20)}s)" response.raise_for_status()
logger.warning(f"{error_reason} fuer {url[:100]}...") error_reason = None # Fehler wurde behoben
except requests.exceptions.ConnectionError as e_conn: except Exception as e_fallback:
error_reason = f"Connection Error: {str(e_conn)[:100]}..." # Wenn auch der Fallback fehlschlägt, ist es ein anderer Fehler
logger.warning(f"{error_reason} fuer {url[:100]}...") error_reason = f"Fallback-Request fehlgeschlagen nach SSLError: {type(e_fallback).__name__}"
if "[Errno -2]" in str(e_conn) or "[Errno -3]" in str(e_conn) or "[Errno 111]" in str(e_conn) or "[Errno 113]" in str(e_conn) or "Failed to establish" in str(e_conn): logger.error(f"{error_reason} fuer {url[:100]}")
return_marker = True except requests.exceptions.Timeout as e_timeout:
except requests.exceptions.HTTPError as e_http: error_reason = f"Timeout ({getattr(Config, 'REQUEST_TIMEOUT', 20)}s)"
status_code = e_http.response.status_code logger.warning(f"{error_reason} fuer {url[:100]}...")
error_reason = f"HTTP Error {status_code} ({e_http.response.reason})" except requests.exceptions.ConnectionError as e_conn:
logger.warning(f"{error_reason} fuer {url[:100]}...") error_reason = f"Connection Error: {str(e_conn)[:100]}..."
if status_code == 404: logger.warning(f"{error_reason} fuer {url[:100]}...")
return_marker = True if "[Errno -2]" in str(e_conn) or "[Errno -3]" in str(e_conn) or "[Errno 111]" in str(e_conn) or "[Errno 113]" in str(e_conn) or "Failed to establish" in str(e_conn):
except Exception as e_gen: return_marker = True
error_reason = f"Allg. Fehler: {type(e_gen).__name__} - {str(e_gen)[:100]}..." except requests.exceptions.HTTPError as e_http:
logger.error(f"Allgemeiner Fehler beim Abrufen von {url[:100]}...: {e_gen}") status_code = e_http.response.status_code
logger.debug(traceback.format_exc()) error_reason = f"HTTP Error {status_code} ({e_http.response.reason})"
logger.warning(f"{error_reason} fuer {url[:100]}...")
if status_code == 404:
return_marker = True
except Exception as e_gen:
error_reason = f"Allg. Fehler: {type(e_gen).__name__} - {str(e_gen)[:100]}..."
logger.error(f"Allgemeiner Fehler beim Abrufen von {url[:100]}...: {e_gen}")
logger.debug(traceback.format_exc())
if return_marker: if return_marker:
logger.warning(f"Markiere URL {url[:100]}... zur erneuten Prüfung (Grund: {error_reason}).") logger.warning(f"Markiere URL {url[:100]}... zur erneuten Prüfung (Grund: {error_reason}).")
return URL_CHECK_MARKER return URL_CHECK_MARKER
elif response is None or error_reason: elif response is None or error_reason:
return f"k.A. ({error_reason})" return f"k.A. ({error_reason})"
try: try:
response.encoding = response.apparent_encoding response.encoding = response.apparent_encoding
soup = BeautifulSoup(response.text, getattr(Config, 'HTML_PARSER', 'html.parser')) soup = BeautifulSoup(response.text, getattr(Config, 'HTML_PARSER', 'html.parser'))
content_selectors = ['main', 'article', '#content', '#main-content', '.main-content', '.content', 'div[role="main"]', 'div.page-content', 'div.container'] content_selectors = ['main', 'article', '#content', '#main-content', '.main-content', '.content', 'div[role="main"]', 'div.page-content', 'div.container']
content_area = None content_area = None
for selector in content_selectors: for selector in content_selectors:
content_area = soup.select_one(selector) content_area = soup.select_one(selector)
if content_area: break if content_area: break
if not content_area:
content_area = soup.find('body')
if content_area:
banner_selectors = ['[id*="cookie"]', '[class*="cookie"]', '[id*="consent"]', '[class*="consent"]', '.cookie-banner', '.consent-banner', '.modal', '#modal', '.popup', '#popup', '[role="dialog"]', '[aria-modal="true"]']
banners_removed_count = 0
for selector in banner_selectors:
try:
potential_banners = content_area.select(selector)
for banner in potential_banners:
banner_text = banner.get_text(" ", strip=True).lower()
keywords = ["cookie", "zustimm", "ablehnen", "einverstanden", "datenschutz", "privacy", "akzeptier", "einstellung", "partner", "analyse", "marketing"]
element_id_class = (banner.get('id', '') + ' ' + ' '.join(banner.get('class', []))).lower()
if any(keyword in banner_text for keyword in keywords) or any(keyword in element_id_class for keyword in keywords):
banner.decompose()
banners_removed_count += 1
except Exception as e_select:
logger.debug(f"Fehler beim Versuch Banner mit Selektor '{selector}' zu entfernen: {e_select}")
if banners_removed_count > 0:
logger.debug(f"{banners_removed_count} potenzielle Banner-Elemente fuer {url[:100]}... entfernt.")
if not content_area:
content_area = soup.find('body')
if content_area: if content_area:
banner_selectors = ['[id*="cookie"]', '[class*="cookie"]', '[id*="consent"]', '[class*="consent"]', '.cookie-banner', '.consent-banner', '.modal', '#modal', '.popup', '#popup', '[role="dialog"]', '[aria-modal="true"]'] for script_or_style in content_area(["script", "style"]):
banners_removed_count = 0 script_or_style.decompose()
for selector in banner_selectors:
try:
potential_banners = content_area.select(selector)
for banner in potential_banners:
banner_text = banner.get_text(" ", strip=True).lower()
keywords = ["cookie", "zustimm", "ablehnen", "einverstanden", "datenschutz", "privacy", "akzeptier", "einstellung", "partner", "analyse", "marketing"]
element_id_class = (banner.get('id', '') + ' ' + ' '.join(banner.get('class', []))).lower()
if any(keyword in banner_text for keyword in keywords) or any(keyword in element_id_class for keyword in keywords):
banner.decompose()
banners_removed_count += 1
except Exception as e_select:
logger.debug(f"Fehler beim Versuch Banner mit Selektor '{selector}' zu entfernen: {e_select}")
if banners_removed_count > 0:
logger.debug(f"{banners_removed_count} potenzielle Banner-Elemente fuer {url[:100]}... entfernt.")
if content_area: text = content_area.get_text(separator=' ', strip=True)
for script_or_style in content_area(["script", "style"]): text = re.sub(r'\s+', ' ', text).strip()
script_or_style.decompose()
text = content_area.get_text(separator=' ', strip=True) banner_keywords_strict = ["cookie", "zustimmen", "ablehnen", "einverstanden", "datenschutz", "privacy", "akzeptier", "einstellung", "partner", "marketing"]
text = re.sub(r'\s+', ' ', text).strip() text_lower = text.lower()
keyword_hits = sum(1 for keyword in banner_keywords_strict if keyword in text_lower)
banner_keywords_strict = ["cookie", "zustimmen", "ablehnen", "einverstanden", "datenschutz", "privacy", "akzeptier", "einstellung", "partner", "marketing"] if len(text) < 500 and keyword_hits >= 3:
text_lower = text.lower() logger.warning(f"WARNUNG: Extrahierter Text fuer {url[:100]}... scheint nur Cookie-Banner zu sein (Laenge {len(text)}, {keyword_hits} Keywords). Verwerfe Text.")
keyword_hits = sum(1 for keyword in banner_keywords_strict if keyword in text_lower) return "k.A. (Nur Cookie-Banner erkannt)"
if len(text) < 500 and keyword_hits >= 3: result = text[:max_length]
logger.warning(f"WARNUNG: Extrahierter Text fuer {url[:100]}... scheint nur Cookie-Banner zu sein (Laenge {len(text)}, {keyword_hits} Keywords). Verwerfe Text.") logger.debug(f"Website {url[:100]}... erfolgreich gescrapt. Extrahierter Text (Laenge {len(result)}).")
return "k.A. (Nur Cookie-Banner erkannt)" return result if result else "k.A. (Extraktion leer)"
else:
logger.warning(f"Kein <body> oder spezifischer Inhaltsbereich gefunden in {url[:100]}...")
return "k.A. (Kein Body gefunden)"
result = text[:max_length] except Exception as e_parse:
logger.debug(f"Website {url[:100]}... erfolgreich gescrapt. Extrahierter Text (Laenge {len(result)}).") logger.error(f"Fehler beim Parsen von HTML von {url[:100]}...: {type(e_parse).__name__} - {e_parse}")
return result if result else "k.A. (Extraktion leer)" logger.debug(traceback.format_exc())
else: return f"k.A. (Fehler Parsing: {str(e_parse)[:50]}...)"
logger.warning(f"Kein <body> oder spezifischer Inhaltsbereich gefunden in {url[:100]}...")
return "k.A. (Kein Body gefunden)"
except Exception as e_parse: # KORREKTUR: Allumfassender Catch-Block für alle restlichen Fehler
logger.error(f"Fehler beim Parsen von HTML von {url[:100]}...: {type(e_parse).__name__} - {e_parse}") except requests.exceptions.RequestException as e:
logger.debug(traceback.format_exc()) # Extrahiere Status-Code, falls vorhanden
return f"k.A. (Fehler Parsing: {str(e_parse)[:50]}...)" status_code = e.response.status_code if e.response is not None else "N/A"
error_msg = f"k.A. (Netzwerkfehler: {type(e).__name__}, Status: {status_code})"
logger.warning(f"{error_msg} für URL {url[:100]}")
return error_msg
except Exception as e:
error_msg = f"k.A. (Allg. Fehler: {type(e).__name__})"
logger.error(f"{error_msg} für URL {url[:100]}", exc_info=False)
return error_msg
def scrape_website_details(url): def scrape_website_details(url):