126 lines
5.4 KiB
Python
126 lines
5.4 KiB
Python
import os
|
|
import csv
|
|
import logging
|
|
import requests
|
|
import json
|
|
from dotenv import load_dotenv
|
|
|
|
# --- Configuration ---
# Root logging setup: INFO level, timestamped "time - level - message" lines.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger("customer_product_report")

# Path of the CSV report written by main().
OUTPUT_FILE = 'product_report.csv'

# Upper bound (OData $top) on the number of sales requested per run.
SALE_LIMIT = 1000

# Terms looked up case-insensitively in each sale heading.
PRODUCT_KEYWORDS = [
    'OMNIE',
    'CD-01',
    'RR-02-017',
    'Service',
    'Dienstleistung',
    'Wartung',
    'Support',
    'Installation',
    'Beratung',
]
|
|
|
|
# --- Auth & API Client Classes (from previous scripts) ---
|
|
class AuthHandler:
    """Read SuperOffice OAuth settings from the environment and fetch access tokens.

    Settings are loaded from a ``.env`` file (overriding already-set
    environment variables) when the handler is constructed.

    Raises:
        ValueError: if the client id, client secret or refresh token is missing.
    """

    # Seconds to wait for the token endpoint before giving up. Without a
    # timeout, requests waits indefinitely on an unresponsive server.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        load_dotenv(override=True)
        # SO_SOD is accepted as a fallback name for the client id.
        self.client_id = os.getenv("SO_CLIENT_ID") or os.getenv("SO_SOD")
        self.client_secret = os.getenv("SO_CLIENT_SECRET")
        self.refresh_token = os.getenv("SO_REFRESH_TOKEN")
        self.redirect_uri = os.getenv("SO_REDIRECT_URI", "http://localhost")
        self.env = os.getenv("SO_ENVIRONMENT", "sod")
        self.cust_id = os.getenv("SO_CONTEXT_IDENTIFIER", "Cust55774")

        if not all([self.client_id, self.client_secret, self.refresh_token]):
            raise ValueError("SuperOffice credentials missing in .env file.")

    def get_access_token(self):
        """Return a fresh access token, or None if the refresh request failed."""
        return self._refresh_access_token()

    def _refresh_access_token(self):
        """Exchange the refresh token for an access token.

        Returns:
            The ``access_token`` value from the token response, or None when
            the request fails (the error is logged) or the field is absent.
        """
        # Any environment name containing "online" uses the production login
        # host; everything else falls back to the "sod" host.
        token_domain = "online.superoffice.com" if "online" in self.env.lower() else "sod.superoffice.com"
        url = f"https://{token_domain}/login/common/oauth/tokens"
        data = {
            "grant_type": "refresh_token",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "refresh_token": self.refresh_token,
            "redirect_uri": self.redirect_uri,
        }
        try:
            # A timeout surfaces as requests.Timeout, which is a
            # RequestException and is therefore handled below.
            resp = requests.post(url, data=data, timeout=self.REQUEST_TIMEOUT)
            resp.raise_for_status()
            return resp.json().get("access_token")
        except requests.RequestException as e:
            logger.error(f"❌ Connection Error during token refresh: {e}")
            return None
|
|
|
|
class SuperOfficeClient:
    """Minimal GET client for the SuperOffice REST API (``/api/v1``).

    Args:
        auth_handler: object exposing ``env``, ``cust_id`` and
            ``get_access_token()``; an access token is requested once, at
            construction time.

    Raises:
        RuntimeError: if no access token could be obtained.
    """

    # Seconds to wait for an API response before giving up. Without a
    # timeout, requests waits indefinitely on an unresponsive server.
    REQUEST_TIMEOUT = 30

    def __init__(self, auth_handler):
        self.auth_handler = auth_handler
        self.base_url = f"https://{self.auth_handler.env}.superoffice.com/{self.auth_handler.cust_id}/api/v1"
        self.access_token = self.auth_handler.get_access_token()
        if not self.access_token:
            # RuntimeError is still caught by callers handling Exception.
            raise RuntimeError("Failed to obtain access token.")
        self.headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        }

    def _get(self, endpoint):
        """GET ``endpoint`` relative to the base URL and return the decoded JSON.

        Returns:
            The parsed JSON body, or None when the server answers 204.

        Raises:
            requests.HTTPError: for 4xx/5xx responses.
            requests.Timeout: if the server does not answer within
                ``REQUEST_TIMEOUT`` seconds.
        """
        url = f"{self.base_url}/{endpoint}"
        logger.debug(f"GET: {url}")
        resp = requests.get(url, headers=self.headers, timeout=self.REQUEST_TIMEOUT)
        # 204 No Content carries no body to decode.
        if resp.status_code == 204:
            return None
        resp.raise_for_status()
        return resp.json()
|
|
|
|
def find_keywords(text, keywords=None):
    """Return the keywords that occur in ``text``, ignoring case.

    Args:
        text: string to search; a falsy value (None, "") yields an empty list.
        keywords: iterable of keyword strings to look for. Defaults to the
            module-level PRODUCT_KEYWORDS when omitted.

    Returns:
        A list of the matching keywords, in the order they appear in
        ``keywords`` and with their original spelling. Matching is by
        substring, so a keyword also matches inside a longer word.
    """
    if not text:
        return []
    if keywords is None:
        keywords = PRODUCT_KEYWORDS
    haystack = text.lower()
    return [keyword for keyword in keywords if keyword.lower() in haystack]
|
|
|
|
def main():
    """Fetch sales from SuperOffice and write a keyword report to OUTPUT_FILE.

    Each sale that has a linked contact becomes one CSV row holding the sale
    id, the contact name, the sale heading and the PRODUCT_KEYWORDS detected
    in the heading. HTTP errors and any other exception are logged rather
    than propagated.
    """
    logger.info("--- Starting Customer Product Report Generation ---")

    try:
        auth = AuthHandler()
        client = SuperOfficeClient(auth)

        # 1. Fetch the sales to report on.
        logger.info(f"Fetching the last {SALE_LIMIT} updated sales...")
        # OData query for the top N sales that have a contact associated.
        # NOTE(review): rows are ordered by saleId descending, i.e. highest
        # ids first, while the log message speaks of "updated" sales --
        # confirm which ordering is intended.
        sales_endpoint = (
            "Sale?$filter=Contact ne null"
            "&$orderby=saleId desc"
            f"&$top={SALE_LIMIT}"
            "&$select=SaleId,Heading,Contact"
        )
        sales_response = client._get(sales_endpoint)

        if not sales_response or 'value' not in sales_response:
            logger.warning("No sales with associated contacts found.")
            return

        sales = sales_response['value']
        logger.info(f"Found {len(sales)} sales with associated contacts to process.")

        # 2. Process each sale and write it to the CSV report.
        with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as csvfile:
            fieldnames = ['SaleID', 'CustomerName', 'SaleHeading', 'DetectedKeywords']
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()

            for sale in sales:
                contact = sale.get('Contact')
                if not contact or not contact.get('ContactId'):
                    logger.warning(f"Skipping Sale {sale.get('SaleId')} because it has no linked Contact.")
                    continue

                # `or` rather than a .get() default: the default only covers a
                # missing key, not a key that is present with a null/empty value.
                heading = sale.get('Heading') or 'N/A'
                customer_name = contact.get('Name') or 'N/A'

                # Find keywords in the heading.
                keywords_found = find_keywords(heading)

                writer.writerow({
                    'SaleID': sale.get('SaleId'),
                    'CustomerName': customer_name,
                    'SaleHeading': heading,
                    'DetectedKeywords': ', '.join(keywords_found) if keywords_found else 'None',
                })

        logger.info("--- ✅ Report generation complete. ---")
        logger.info(f"Results saved to '{OUTPUT_FILE}'.")

    except requests.exceptions.HTTPError as e:
        logger.error(f"❌ API Error: {e.response.status_code} - {e.response.text}")
    except Exception as e:
        # Top-level boundary: log with traceback instead of crashing.
        logger.error(f"An unexpected error occurred: {e}", exc_info=True)
|
|
|
|
# Run the report only when executed as a script, not when imported.
if __name__ == "__main__":
    main()