# Source file: Brancheneinstufung2/connector-superoffice/generate_customer_product_report.py
# (Repository viewer metadata: 126 lines, 5.4 KiB, Python)

import os
import csv
import logging
import requests
import json
from dotenv import load_dotenv
# --- Configuration ---
# Root logger: INFO level, "<timestamp> - <level> - <message>" line format.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("customer_product_report")

# CSV file the report is written to (relative to the working directory).
OUTPUT_FILE = 'product_report.csv'

# Maximum number of sales requested from the API for one report run.
SALE_LIMIT = 1000

# Substrings looked for (case-insensitively) in each sale heading.
PRODUCT_KEYWORDS = [
    'OMNIE',
    'CD-01',
    'RR-02-017',
    'Service',
    'Dienstleistung',
    'Wartung',
    'Support',
    'Installation',
    'Beratung',
]
# --- Auth & API Client Classes (from previous scripts) ---
class AuthHandler:
    """Hold SuperOffice OAuth settings and trade the refresh token for an access token.

    Settings are read from the environment after loading the ``.env`` file:

    * ``SO_CLIENT_ID`` (falls back to ``SO_SOD``), ``SO_CLIENT_SECRET`` and
      ``SO_REFRESH_TOKEN`` -- required.
    * ``SO_REDIRECT_URI`` -- defaults to ``http://localhost``.
    * ``SO_ENVIRONMENT`` -- defaults to ``sod``.
    * ``SO_CONTEXT_IDENTIFIER`` -- defaults to ``Cust55774``.

    Raises:
        ValueError: if the client id, client secret or refresh token is missing.
    """

    # Seconds to wait for the token endpoint. Without a timeout ``requests``
    # waits indefinitely, so a stalled connection would hang the whole script.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        load_dotenv(override=True)
        self.client_id = os.getenv("SO_CLIENT_ID") or os.getenv("SO_SOD")
        self.client_secret = os.getenv("SO_CLIENT_SECRET")
        self.refresh_token = os.getenv("SO_REFRESH_TOKEN")
        self.redirect_uri = os.getenv("SO_REDIRECT_URI", "http://localhost")
        self.env = os.getenv("SO_ENVIRONMENT", "sod")
        self.cust_id = os.getenv("SO_CONTEXT_IDENTIFIER", "Cust55774")
        if not all([self.client_id, self.client_secret, self.refresh_token]):
            raise ValueError("SuperOffice credentials missing in .env file.")

    def get_access_token(self):
        """Return a fresh access token, or ``None`` if the refresh request failed."""
        return self._refresh_access_token()

    def _refresh_access_token(self):
        """POST the refresh-token grant to the token endpoint.

        Returns:
            The ``access_token`` value of the JSON response (``None`` when the
            key is absent), or ``None`` when the request raised a
            ``requests.RequestException`` (including a timeout); the error is
            logged in that case.
        """
        # Any environment name containing "online" uses the production login
        # host; everything else goes to the "sod" login host.
        token_domain = "online.superoffice.com" if "online" in self.env.lower() else "sod.superoffice.com"
        url = f"https://{token_domain}/login/common/oauth/tokens"
        data = {
            "grant_type": "refresh_token",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "refresh_token": self.refresh_token,
            "redirect_uri": self.redirect_uri,
        }
        try:
            resp = requests.post(url, data=data, timeout=self.REQUEST_TIMEOUT)
            resp.raise_for_status()
            return resp.json().get("access_token")
        except requests.RequestException as e:
            logger.error(f"❌ Connection Error during token refresh: {e}")
            return None
class SuperOfficeClient:
    """Minimal read-only client for the SuperOffice REST API (``/api/v1``).

    An access token is requested from ``auth_handler`` once, at construction
    time, and sent as a Bearer token on every request.

    Args:
        auth_handler: object exposing ``env``, ``cust_id`` and
            ``get_access_token()``.

    Raises:
        RuntimeError: if ``auth_handler.get_access_token()`` returns nothing.
    """

    # Seconds to wait for an API response. Without a timeout ``requests``
    # waits indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 30

    def __init__(self, auth_handler):
        self.auth_handler = auth_handler
        self.base_url = f"https://{self.auth_handler.env}.superoffice.com/{self.auth_handler.cust_id}/api/v1"
        self.access_token = self.auth_handler.get_access_token()
        if not self.access_token:
            # RuntimeError is a subclass of Exception, so existing
            # ``except Exception`` handlers keep working.
            raise RuntimeError("Failed to obtain access token.")
        self.headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        }

    def _get(self, endpoint):
        """GET ``<base_url>/<endpoint>`` and return the decoded JSON body.

        Args:
            endpoint: path (and optional query string) relative to ``base_url``.

        Returns:
            The parsed JSON response, or ``None`` for HTTP 204 (No Content).

        Raises:
            requests.exceptions.HTTPError: for 4xx/5xx responses.
            requests.exceptions.RequestException: for connection errors or
                when the timeout expires.
        """
        url = f"{self.base_url}/{endpoint}"
        logger.debug("GET: %s", url)
        resp = requests.get(url, headers=self.headers, timeout=self.REQUEST_TIMEOUT)
        if resp.status_code == 204:
            return None
        resp.raise_for_status()
        return resp.json()
def find_keywords(text, keywords=None):
    """Return the keywords that occur in ``text``, compared case-insensitively.

    Args:
        text: string to search; ``None`` or an empty string yields ``[]``.
        keywords: iterable of keyword strings to look for. Defaults to the
            module-level ``PRODUCT_KEYWORDS`` when omitted or ``None``.

    Returns:
        A new list of the matching keywords, in the order they appear in
        ``keywords`` and with their original spelling.
    """
    if not text:
        return []
    if keywords is None:
        # Resolved at call time so later changes to the module constant are seen.
        keywords = PRODUCT_KEYWORDS
    text_lower = text.lower()
    return [keyword for keyword in keywords if keyword.lower() in text_lower]
def main():
    """Fetch sales from SuperOffice and write a keyword report to ``OUTPUT_FILE``.

    Steps:
      1. Authenticate and request up to ``SALE_LIMIT`` sales that have a contact.
      2. For every sale with a linked contact, write one CSV row containing the
         sale id, customer name, heading and the keywords found in the heading.

    HTTP errors and any other exception are logged and swallowed, so the
    function always returns ``None``.
    """
    logger.info("--- Starting Customer Product Report Generation ---")
    try:
        auth = AuthHandler()
        client = SuperOfficeClient(auth)

        # 1. Fetch the sales.
        logger.info(f"Fetching the last {SALE_LIMIT} updated sales...")
        # NOTE(review): the query sorts by saleId descending, not by an update
        # timestamp -- confirm this matches the "last updated" wording above.
        sales_endpoint = f"Sale?$filter=Contact ne null&$orderby=saleId desc&$top={SALE_LIMIT}&$select=SaleId,Heading,Contact"
        sales_response = client._get(sales_endpoint)
        if not sales_response or 'value' not in sales_response:
            logger.warning("No sales with associated contacts found.")
            return
        sales = sales_response['value']
        logger.info(f"Found {len(sales)} sales with associated contacts to process.")

        # 2. Process each sale and write it to the CSV file.
        rows_written = 0
        with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as csvfile:
            fieldnames = ['SaleID', 'CustomerName', 'SaleHeading', 'DetectedKeywords']
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for sale in sales:
                contact = sale.get('Contact') or {}
                if not contact.get('ContactId'):
                    logger.warning(f"Skipping Sale {sale.get('SaleId')} because it has no linked Contact.")
                    continue
                # ``dict.get(key, default)`` only applies the default when the
                # key is absent; ``or`` also covers JSON null / empty values so
                # the CSV never ends up with a blank cell.
                heading = sale.get('Heading') or 'N/A'
                customer_name = contact.get('Name') or 'N/A'
                keywords_found = find_keywords(heading)
                writer.writerow({
                    'SaleID': sale.get('SaleId'),
                    'CustomerName': customer_name,
                    'SaleHeading': heading,
                    'DetectedKeywords': ', '.join(keywords_found) if keywords_found else 'None',
                })
                rows_written += 1

        logger.info("--- ✅ Report generation complete. ---")
        logger.info("Wrote %d of %d sales to the report.", rows_written, len(sales))
        logger.info(f"Results saved to '{OUTPUT_FILE}'.")
    except requests.exceptions.HTTPError as e:
        response = e.response
        if response is not None:
            logger.error(f"❌ API Error: {response.status_code} - {response.text}")
        else:
            # HTTPError can be constructed without a response attached.
            logger.error(f"❌ API Error: {e}")
    except Exception as e:
        # Top-level boundary: log with traceback instead of crashing the script.
        logger.error(f"An unexpected error occurred: {e}", exc_info=True)
# Run the report only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()