helpers.py aktualisiert

This commit is contained in:
2025-07-20 05:44:56 +00:00
parent 018b8acbdc
commit b1efeb3d8c

View File

@@ -1343,110 +1343,102 @@ def search_linkedin_contacts(company_name, website, position_query, crm_kurzform
# 11. WEBSITE SCRAPING & VALIDATION UTILITIES # 11. WEBSITE SCRAPING & VALIDATION UTILITIES
# ============================================================================== # ==============================================================================
def get_website_raw(url, max_length=20000): def get_website_raw(url, max_length=30000, verify_cert=False):
""" """
Holt Textinhalt von einer Website, versucht Cookie-Banner zu umgehen. Holt Textinhalt von einer Website mit intelligenter Inhalts- und Banner-Erkennung.
Versucht zuerst eine sichere Verbindung, bei SSL-Fehler einen unsicheren Fallback. Basiert auf der bewährten Logik aus v1.7.9.
GEHÄRTETE VERSION: Gibt unter allen Umständen einen String zurück.
""" """
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
if not url or not isinstance(url, str) or url.strip().lower() in ["k.a.", "kein artikel gefunden", "fehler bei suche", "http:"]:
# --- START: Allumfassender Try-Block --- logger.debug(f"get_website_raw übersprungen: Ungültige URL '{url}'.")
return "k.A."
if not url.lower().startswith(("http://", "https://")):
url = "https://" + url
headers = {"User-Agent": random.choice(USER_AGENTS)}
response = None
error_reason = "Unbekannter Fehler"
return_marker = False
try: try:
if not url or not isinstance(url, str) or url.strip().lower() in ["k.a.", "kein artikel gefunden", "fehler bei suche", "http:"]: logger.debug(f"Versuche Website abzurufen: {url[:100]}... (verify={verify_cert})")
logger.debug(f"get_website_raw skipped: Ungueltige oder leere URL '{url}'.") response = requests.get(
return "k.A. (Ungueltige URL)" url, timeout=getattr(Config, 'REQUEST_TIMEOUT', 20), headers=headers,
verify=verify_cert, allow_redirects=True, stream=False
if not url.lower().startswith(("http://", "https://")): )
url = "https://" + url response.raise_for_status()
headers = {"User-Agent": random.choice(USER_AGENTS)}
response = None
error_reason = None error_reason = None
return_marker = False except requests.exceptions.SSLError as e_ssl:
error_reason = f"SSL Fehler: {str(e_ssl)[:100]}..."
logger.warning(f"SSL Fehler für {url[:100]}...: {e_ssl}")
except requests.exceptions.Timeout:
error_reason = f"Timeout ({getattr(Config, 'REQUEST_TIMEOUT', 20)}s)"
logger.warning(f"{error_reason} für {url[:100]}...")
except requests.exceptions.ConnectionError as e_conn:
error_reason = f"Connection Error: {str(e_conn)[:100]}..."
logger.warning(f"{error_reason} für {url[:100]}...")
if any(err_str in str(e_conn) for err_str in ["[Errno -2]", "[Errno -3]", "[Errno 111]", "[Errno 113]", "Failed to establish"]):
return_marker = True
except requests.exceptions.HTTPError as e_http:
error_reason = f"HTTP Error {e_http.response.status_code}"
logger.warning(f"{error_reason} für {url[:100]}...")
if e_http.response.status_code == 404:
return_marker = True
except Exception as e_gen:
error_reason = f"Allg. Fehler: {type(e_gen).__name__}"
logger.error(f"Allgemeiner Fehler beim Abrufen von {url[:100]}...: {e_gen}", exc_info=True)
# --- IHR KOMPLETTER, BEWÄHRTER REQUEST-BLOCK --- if return_marker:
try: logger.warning(f"Markiere URL {url[:100]}... zur erneuten Prüfung (Grund: {error_reason}).")
logger.debug(f"Versuche Website sicher abzurufen: {url[:100]}... (verify=True)") return URL_CHECK_MARKER
response = requests.get(url, timeout=getattr(Config, 'REQUEST_TIMEOUT', 20), headers=headers, verify=True, allow_redirects=True, stream=False) elif response is None or error_reason:
response.raise_for_status() return f"k.A. ({error_reason})"
except requests.exceptions.SSLError:
logger.warning(f"SSL-Fehler fuer {url[:100]}... Versuche erneut mit verify=False.")
response = requests.get(url, timeout=getattr(Config, 'REQUEST_TIMEOUT', 20), headers=headers, verify=False, allow_redirects=True, stream=False)
response.raise_for_status()
# Der restliche Teil dieses Blocks wird nun vom äußeren except gefangen
except requests.exceptions.Timeout as e_timeout:
raise e_timeout # Wirft den Fehler, damit der äußere Block ihn fängt
except requests.exceptions.ConnectionError as e_conn:
raise e_conn
except requests.exceptions.HTTPError as e_http:
raise e_http
except Exception as e_gen:
raise e_gen
# --- IHR KOMPLETTE, BEWÄHRTE HTML-PARSING-LOGIK --- try:
# Erfolgreiche Antwort verarbeiten
response.encoding = response.apparent_encoding response.encoding = response.apparent_encoding
soup = BeautifulSoup(response.text, getattr(Config, 'HTML_PARSER', 'html.parser')) soup = BeautifulSoup(response.text, getattr(Config, 'HTML_PARSER', 'html.parser'))
content_selectors = ['main', 'article', '#content', '#main-content', '.main-content', '.content', 'div[role="main"]', 'div.page-content', 'div.container']
content_area = None # Intelligente Inhalts-Selektion
content_selectors = ['main', 'article', '#content', '.content', 'div[role="main"]']
content_area = soup.find('body') # Fallback auf body
for selector in content_selectors: for selector in content_selectors:
content_area = soup.select_one(selector) selected_area = soup.select_one(selector)
if content_area: break if selected_area:
content_area = selected_area
if not content_area: logger.debug(f"Haupt-Inhaltsbereich mit '{selector}' gefunden.")
content_area = soup.find('body') break
# KORREKTE EINRÜCKUNG if not content_area:
if content_area: logger.warning(f"Kein <body> oder spezifischer Inhaltsbereich in {url[:100]}... gefunden.")
banner_selectors = ['[id*="cookie"]', '[class*="cookie"]', '[id*="consent"]', '[class*="consent"]', '.cookie-banner', '.consent-banner', '.modal', '#modal', '.popup', '#popup', '[role="dialog"]', '[aria-modal="true"]']
banners_removed_count = 0
for selector in banner_selectors:
try:
potential_banners = content_area.select(selector)
for banner in potential_banners:
banner_text = banner.get_text(" ", strip=True).lower()
keywords = ["cookie", "zustimm", "ablehnen", "einverstanden", "datenschutz", "privacy", "akzeptier", "einstellung", "partner", "analyse", "marketing"]
element_id_class = (banner.get('id', '') + ' ' + ' '.join(banner.get('class', []))).lower()
if any(keyword in banner_text for keyword in keywords) or any(keyword in element_id_class for keyword in keywords):
banner.decompose()
banners_removed_count += 1
except Exception as e_select:
logger.debug(f"Fehler beim Versuch Banner mit Selektor '{selector}' zu entfernen: {e_select}")
if banners_removed_count > 0:
logger.debug(f"{banners_removed_count} potenzielle Banner-Elemente fuer {url[:100]}... entfernt.")
if content_area:
banner_selectors = ['[id*="cookie"]', '[class*="cookie"]', '[id*="consent"]', '[class*="consent"]', '.cookie-banner', '.consent-banner', '.modal', '#modal', '.popup', '#popup', '[role="dialog"]', '[aria-modal="true"]']
for selector in banner_selectors:
for banner in content_area.select(selector):
banner.decompose()
for script_or_style in content_area(["script", "style"]):
script_or_style.decompose()
text = content_area.get_text(separator=' ', strip=True)
text = re.sub(r'\s+', ' ', text).strip()
banner_keywords_strict = ["cookie", "zustimmen", "ablehnen", "einverstanden", "datenschutz", "privacy", "akzeptier", "einstellung", "partner", "marketing"]
if len(text) < 500 and sum(1 for keyword in banner_keywords_strict if keyword in text.lower()) >= 3:
logger.warning(f"WARNUNG: Extrahierter Text fuer {url[:100]}... scheint nur Cookie-Banner zu sein. Verwerfe Text.")
return "k.A. (Nur Cookie-Banner erkannt)"
result = text[:max_length]
logger.debug(f"Website {url[:100]}... erfolgreich gescrapt. Extrahierter Text (Laenge {len(result)}).")
return result if result else "k.A. (Extraktion leer)"
else:
logger.warning(f"Kein <body> oder spezifischer Inhaltsbereich gefunden in {url[:100]}...")
return "k.A. (Kein Body gefunden)" return "k.A. (Kein Body gefunden)"
# --- ENDE: Allumfassender Catch-Block, der JEDEN Fehler abfängt --- # Intelligente Banner-Entfernung
except requests.exceptions.HTTPError as e: banner_selectors = ['[id*="cookie"]', '[class*="cookie"]', '[id*="consent"]', '[class*="consent"]', '[role="dialog"]']
return f"k.A. (HTTP Fehler {e.response.status_code})" for selector in banner_selectors:
except requests.exceptions.RequestException as e: for banner in content_area.select(selector):
return f"k.A. (Netzwerkfehler: {type(e).__name__})" banner.decompose()
except Exception as e:
logger.error(f"Unerwarteter Parsing-Fehler in get_website_raw fuer {url[:100]}: {e}", exc_info=False) # Text extrahieren und säubern
return f"k.A. (Allg. Fehler: {type(e).__name__})" text = content_area.get_text(separator=' ', strip=True)
text = re.sub(r'\s+', ' ', text).strip()
# Finale Prüfung auf reinen Banner-Inhalt
banner_keywords_strict = ["cookie", "zustimmen", "ablehnen", "datenschutz", "privacy", "akzeptieren"]
text_lower_sample = text[:1000].lower() # Prüfe nur den Anfang des Textes
keyword_hits = sum(1 for keyword in banner_keywords_strict if keyword in text_lower_sample)
if len(text) < 500 and keyword_hits >= 2:
logger.warning(f"Text für {url[:100]}... scheint nur Cookie-Banner zu sein. Verwerfe Text.")
return "k.A. (Nur Cookie-Banner erkannt)"
logger.debug(f"Website {url[:100]}... erfolgreich gescrapt. Extrahierter Text (Länge {len(text)}).")
return text[:max_length] if text else "k.A. (Extraktion leer)"
except Exception as e_parse:
logger.error(f"Fehler beim Parsen von HTML von {url[:100]}...: {e_parse}", exc_info=True)
return f"k.A. (Fehler Parsing)"
def scrape_website_details(url): def scrape_website_details(url):
"""Extrahiert Meta-Details (Titel, Beschreibung, H1s) von einer URL. Ist gehärtet.""" """Extrahiert Meta-Details (Titel, Beschreibung, H1s) von einer URL. Ist gehärtet."""