# Heatmap Tool Backend: FastAPI service that ingests an .xlsx spreadsheet,
# detects a postal-code (PLZ) column, and serves filterable heatmap data.
import io
from typing import Dict, List, Optional

import pandas as pd
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
app = FastAPI()

# Configure CORS so a browser frontend served from a different origin can
# reach this API. Wildcard settings are suitable for local development only.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],      # Allows all origins
    allow_credentials=True,
    allow_methods=["*"],      # Allows all methods
    allow_headers=["*"],      # Allows all headers
)
# --- In-memory Storage ---
# Process-local, single-user state: the most recently uploaded spreadsheet
# and the name of the column holding postal codes. Reset on restart.
df_storage: Optional[pd.DataFrame] = None
plz_column_name: Optional[str] = None
# --- Dummy Geocoding Data (IMPORTANT: TO BE REPLACED) ---
# A tiny, hard-coded sample of German postal codes — just enough to demo the
# pipeline end-to-end. A real implementation MUST load a comprehensive PLZ
# dataset from a file (e.g. CSV or GeoJSON) for the application to be useful.
PLZ_COORDINATES = {
    "10115": {"lat": 52.53, "lon": 13.38},  # Berlin
    "20095": {"lat": 53.55, "lon": 9.99},   # Hamburg
    "80331": {"lat": 48.13, "lon": 11.57},  # Munich
    "50667": {"lat": 50.93, "lon": 6.95},   # Cologne
    "60311": {"lat": 50.11, "lon": 8.68},   # Frankfurt
}
# --- Pydantic Models ---
class FilterRequest(BaseModel):
    """Request body for /api/heatmap: maps a column name to the values selected for it."""

    filters: Dict[str, List[str]]
# --- API Endpoints ---
@app.get("/")
def read_root():
    """Landing / health-check endpoint."""
    return {"message": "Heatmap Tool Backend"}
@app.post("/api/upload")
async def upload_file(file: UploadFile = File(...)):
    """Ingest an .xlsx file, detect its PLZ column, and cache the data.

    Parses the spreadsheet into the module-level ``df_storage`` and returns
    the detected filter columns (with their sorted unique values) plus the
    name of the PLZ column.

    Raises:
        HTTPException 400: wrong/missing file extension, or no PLZ column.
        HTTPException 500: any other parsing/processing failure.
    """
    global df_storage, plz_column_name
    # Also guard against a missing filename, not just a wrong extension.
    if not file.filename or not file.filename.endswith('.xlsx'):
        raise HTTPException(status_code=400, detail="Invalid file format. Please upload an .xlsx file.")

    try:
        contents = await file.read()
        # Read everything as strings so mixed-type columns (and leading
        # zeros in postal codes) survive the Excel parse intact.
        df = pd.read_excel(io.BytesIO(contents), dtype=str)
        df.fillna('N/A', inplace=True)

        # --- PLZ Column Detection ---
        # First column whose name contains "plz" (case-insensitive).
        temp_plz_col = next((col for col in df.columns if 'plz' in col.lower()), None)
        if not temp_plz_col:
            raise HTTPException(status_code=400, detail="No column with 'PLZ' found in the file.")

        plz_column_name = temp_plz_col
        # Normalize PLZ data: strip whitespace, left-pad to the 5-digit
        # German postal-code format.
        df[plz_column_name] = df[plz_column_name].str.strip().str.zfill(5)

        # --- Dynamic Filter Detection ---
        # Every non-PLZ column becomes a filter with its sorted unique values.
        filters = {
            col: sorted(df[col].unique().tolist())
            for col in df.columns
            if col != plz_column_name
        }

        df_storage = df
        return {"filename": file.filename, "filters": filters, "plz_column": plz_column_name}

    except HTTPException:
        # BUG FIX: the 400 raised above used to be caught by the broad
        # handler below and re-reported as a 500. Let it propagate as-is.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred while processing the file: {e}")
@app.post("/api/heatmap")
async def get_heatmap_data(request: FilterRequest):
    """Aggregate the cached data by PLZ after applying the requested filters.

    Returns a list of ``{"plz", "lat", "lon", "count"}`` points. PLZs with no
    entry in PLZ_COORDINATES are silently dropped (see the dummy-data caveat
    above).

    Raises:
        HTTPException 404: no file has been uploaded yet.
        HTTPException 400: a filter references a column that does not exist.
        HTTPException 500: any other processing failure.
    """
    global df_storage, plz_column_name
    if df_storage is None:
        raise HTTPException(status_code=404, detail="No data available. Please upload a file first.")

    try:
        # Boolean-mask selection below already yields new frames, so an
        # upfront full .copy() of the dataset is unnecessary.
        filtered_df = df_storage

        # Apply filters from the request.
        for column, values in request.filters.items():
            if not values:  # Empty selection means "no filtering on this column".
                continue
            # BUG FIX: an unknown column used to surface as a KeyError-backed
            # 500; report it as a client error instead.
            if column not in filtered_df.columns:
                raise HTTPException(status_code=400, detail=f"Unknown filter column: {column}")
            filtered_df = filtered_df[filtered_df[column].isin(values)]

        if filtered_df.empty:
            return []

        # Aggregate data by PLZ.
        plz_counts = filtered_df.groupby(plz_column_name).size().reset_index(name='count')

        # --- Geocoding Step ---
        # In a real app, this would be a merge/join with a proper geo dataset.
        heatmap_data = []
        for _, row in plz_counts.iterrows():
            plz = row[plz_column_name]
            coords = PLZ_COORDINATES.get(plz)
            if coords:
                heatmap_data.append({
                    "plz": plz,
                    "lat": coords["lat"],
                    "lon": coords["lon"],
                    # BUG FIX: groupby().size() yields numpy.int64, which the
                    # stock JSON encoder cannot serialize — cast to int.
                    "count": int(row["count"]),
                })

        return heatmap_data

    except HTTPException:
        # Let deliberate client errors (the 400 above) propagate unchanged.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred while generating heatmap data: {e}")