"""Heatmap tool backend.

Accepts an uploaded ``.xlsx`` file, keeps it in memory, and serves per-PLZ
(postal code) row counts joined with coordinates for rendering a heatmap.
"""

import io
from typing import Dict, List

import pandas as pd
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

app = FastAPI()

# Configure CORS
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; restrict allow_origins to the real frontend origin(s) before
# deploying anywhere non-local.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allows all origins
    allow_credentials=True,
    allow_methods=["*"],  # Allows all methods
    allow_headers=["*"],  # Allows all headers
)

# --- In-memory Storage ---
# Both values are replaced together by a successful upload; they live in this
# process only.
df_storage = None
plz_column_name = None

# --- Dummy Geocoding Data (IMPORTANT: TO BE REPLACED) ---
# This is a tiny subset of German postal codes for demonstration purposes only.
# A real implementation MUST load a comprehensive PLZ dataset from a file
# (e.g., a CSV or GeoJSON file) for the application to be useful.
PLZ_COORDINATES = {
    "10115": {"lat": 52.53, "lon": 13.38},  # Berlin
    "20095": {"lat": 53.55, "lon": 9.99},  # Hamburg
    "80331": {"lat": 48.13, "lon": 11.57},  # Munich
    "50667": {"lat": 50.93, "lon": 6.95},  # Cologne
    "60311": {"lat": 50.11, "lon": 8.68},  # Frankfurt
}


# --- Pydantic Models ---
class FilterRequest(BaseModel):
    """Request body for ``/api/heatmap``.

    ``filters`` maps a column name to the list of accepted values; an empty
    list means "do not filter on this column".
    """

    filters: Dict[str, List[str]]


def _find_plz_column(df):
    """Return the first column whose name contains 'plz' (case-insensitive), or None."""
    for col in df.columns:
        if "plz" in col.lower():
            return col
    return None


def _collect_filters(df, plz_col):
    """Return ``{column: sorted unique values}`` for every column except *plz_col*."""
    return {
        col: sorted(df[col].unique().tolist())
        for col in df.columns
        if col != plz_col
    }


# --- API Endpoints ---
@app.get("/")
def read_root():
    """Return a static identification message for the service."""
    return {"message": "Heatmap Tool Backend"}


@app.post("/api/upload")
async def upload_file(file: UploadFile = File(...)):
    """Parse an uploaded .xlsx file and store it as the active dataset.

    Returns the file name, the detected PLZ column, and the filterable
    columns with their sorted unique values.

    Raises:
        HTTPException 400: the file is not an .xlsx file, or no column name
            contains 'plz'.
        HTTPException 500: any other failure while reading or processing.
    """
    global df_storage, plz_column_name
    print(f"--- Received request to /api/upload for file: {file.filename} ---")

    # filename can be missing; accept the extension regardless of case.
    if not file.filename or not file.filename.lower().endswith(".xlsx"):
        raise HTTPException(
            status_code=400,
            detail="Invalid file format. Please upload an .xlsx file.",
        )

    try:
        contents = await file.read()
        # Read all as string to be safe
        df = pd.read_excel(io.BytesIO(contents), dtype=str)
        df = df.fillna("N/A")
        # Header cells may be numbers; normalise labels so name matching and
        # the str-keyed filter requests work for every column.
        df.columns = df.columns.map(str)

        # --- PLZ Column Detection ---
        detected_plz_col = _find_plz_column(df)
        if detected_plz_col is None:
            raise HTTPException(
                status_code=400,
                detail="No column with 'PLZ' found in the file.",
            )

        # Normalize PLZ data
        df[detected_plz_col] = df[detected_plz_col].str.strip().str.zfill(5)

        # --- Dynamic Filter Detection ---
        filters = _collect_filters(df, detected_plz_col)
    except HTTPException:
        # Deliberate client errors must not be rewritten into a 500 below.
        raise
    except Exception as e:
        print(f"ERROR processing file: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"An error occurred while processing the file: {e}",
        ) from e

    # Commit both globals only after everything succeeded, so a failed upload
    # never leaves the previous dataframe paired with a new column name.
    df_storage = df
    plz_column_name = detected_plz_col

    print(
        f"Successfully processed file. Found PLZ column: '{plz_column_name}'. "
        f"Detected {len(filters)} filterable columns."
    )
    return {
        "filename": file.filename,
        "filters": filters,
        "plz_column": plz_column_name,
    }


@app.post("/api/heatmap")
async def get_heatmap_data(request: FilterRequest):
    """Return per-PLZ row counts with coordinates for the filtered dataset.

    Each result item is ``{"plz", "lat", "lon", "count"}``. PLZ values that
    have no entry in ``PLZ_COORDINATES`` are omitted. An empty list is
    returned when the filters match no rows.

    Raises:
        HTTPException 404: no file has been uploaded yet.
        HTTPException 400: a filter names a column that is not in the data.
        HTTPException 500: any other failure while building the result.
    """
    print(f"--- Received request to /api/heatmap with filters: {request.filters} ---")

    if df_storage is None:
        print("ERROR: No data in df_storage. File must be uploaded first.")
        raise HTTPException(
            status_code=404,
            detail="No data available. Please upload a file first.",
        )

    # Work on local references to the current dataset.
    source_df = df_storage
    plz_col = plz_column_name

    # Reject unknown columns up front instead of surfacing a KeyError as 500.
    unknown_columns = [col for col in request.filters if col not in source_df.columns]
    if unknown_columns:
        raise HTTPException(
            status_code=400,
            detail=f"Unknown filter column(s): {', '.join(unknown_columns)}",
        )

    try:
        filtered_df = source_df.copy()

        # Apply filters from the request
        for column, values in request.filters.items():
            if not values:  # Only filter if there are values selected
                continue
            filtered_df = filtered_df[filtered_df[column].isin(values)]
            if filtered_df.empty:
                return []

        # Aggregate data by PLZ
        plz_counts = filtered_df.groupby(plz_col).size()

        # --- Geocoding Step ---
        # In a real app, this would be a merge/join with a proper geo dataset
        heatmap_data = []
        for plz, count in plz_counts.items():
            coords = PLZ_COORDINATES.get(plz)
            if coords:
                heatmap_data.append(
                    {
                        "plz": plz,
                        "lat": coords["lat"],
                        "lon": coords["lon"],
                        # Plain int so the value is JSON-serialisable.
                        "count": int(count),
                    }
                )

        print(f"Generated heatmap data with {len(heatmap_data)} PLZ points.")
        return heatmap_data
    except HTTPException:
        raise
    except Exception as e:
        print(f"ERROR generating heatmap: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"An error occurred while generating heatmap data: {e}",
        ) from e