Files
Brancheneinstufung2/dev_session.py
Floke a841fac255 [2f488f42] Diesen Text sollte ich nicht selbst schreiben müssen.
Diesen Text sollte ich nicht selbst schreiben müssen.
2026-01-27 11:48:42 +00:00

951 lines
39 KiB
Python

import os
import requests
import json
import re
from typing import List, Dict, Optional, Tuple
from getpass import getpass
from dotenv import load_dotenv
import argparse
import shutil
load_dotenv()
# --- API Helper Functions ---
def find_database_by_title(token: str, title: str) -> Optional[str]:
"""Sucht nach einer Datenbank mit einem bestimmten Titel und gibt deren ID zurück."""
print(f"Suche nach Datenbank '{title}' in Notion...")
url = "https://api.notion.com/v1/search"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"Notion-Version": "2022-06-28"
}
payload = {
"query": title,
"filter": {"value": "database", "property": "object"}
}
try:
response = requests.post(url, headers=headers, json=payload)
response.raise_for_status()
results = response.json().get("results", [])
for db in results:
db_title_parts = db.get("title", [])
if db_title_parts:
db_title = db_title_parts[0].get("plain_text", "")
if db_title.lower() == title.lower():
db_id = db["id"]
print(f"Datenbank '{title}' gefunden mit ID: {db_id}")
return db_id
print(f"Fehler: Keine Datenbank mit dem exakten Titel '{title}' gefunden.")
return None
except requests.exceptions.RequestException as e:
print(f"Fehler bei der Suche nach der Notion-Datenbank '{title}': {e}")
if e.response is not None:
try:
print(f"Antwort des Servers: {json.dumps(e.response.json(), indent=2)}")
except:
print(f"Antwort des Servers: {e.response.text}")
return None
def query_notion_database(token: str, database_id: str, filter_payload: Dict = None) -> List[Dict]:
"""Fragt eine Notion-Datenbank ab und gibt eine Liste der Seiten zurück."""
url = f"https://api.notion.com/v1/databases/{database_id}/query"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"Notion-Version": "2022-06-28"
}
payload = {}
if filter_payload:
payload["filter"] = filter_payload
try:
response = requests.post(url, headers=headers, json=payload)
response.raise_for_status()
return response.json().get("results", [])
except requests.exceptions.RequestException as e:
print(f"Fehler bei der Abfrage der Notion-Datenbank {database_id}: {e}")
if e.response is not None:
try:
print(f"Antwort des Servers: {json.dumps(e.response.json(), indent=2)}")
except:
print(f"Antwort des Servers: {e.response.text}")
return []
def get_page_title(page: Dict) -> str:
"""Extrahiert den Titel einer Notion-Seite."""
for prop_value in page.get("properties", {}).values():
if prop_value.get("type") == "title":
title_parts = prop_value.get("title", [])
if title_parts:
return title_parts[0].get("plain_text", "Unbenannt")
return "Unbenannt"
def get_page_property(page: Dict, prop_name: str, prop_type: str = "rich_text") -> Optional[str]:
"""Extrahiert den Inhalt einer bestimmten Eigenschaft (Property) von einer Seite."""
prop = page.get("properties", {}).get(prop_name)
if not prop:
return None
if prop_type == "rich_text" and prop.get("type") == "rich_text":
text_parts = prop.get("rich_text", [])
if text_parts:
return text_parts[0].get("plain_text")
# Hier könnten weitere Typen wie 'select', 'number' etc. behandelt werden
return None
def get_page_number_property(page: Dict, prop_name: str) -> Optional[float]:
"""Extrahiert den Wert einer 'number'-Eigenschaft von einer Seite."""
prop = page.get("properties", {}).get(prop_name)
if not prop or prop.get("type") != "number":
return None
return prop.get("number")
def decimal_hours_to_hhmm(decimal_hours: float) -> str:
"""Wandelt Dezimalstunden in das Format 'HH:MM' um."""
if decimal_hours is None:
return "00:00"
hours = int(decimal_hours)
minutes = int((decimal_hours * 60) % 60)
return f"{hours:02d}:{minutes:02d}"
def get_page_content(token: str, page_id: str) -> str:
"""Ruft den gesamten Textinhalt einer Notion-Seite ab, indem es die Blöcke zusammenfügt, mit Paginierung."""
url = f"https://api.notion.com/v1/blocks/{page_id}/children"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"Notion-Version": "2022-06-28"
}
full_text = []
next_cursor = None
has_more = True
try:
while has_more:
params = {"page_size": 100} # Max page size
if next_cursor:
params["start_cursor"] = next_cursor
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
data = response.json()
blocks = data.get("results", [])
for block in blocks:
block_type = block["type"]
text_content = ""
if block_type in ["paragraph", "heading_1", "heading_2", "heading_3",
"bulleted_list_item", "numbered_list_item", "to_do", "callout"]:
rich_text_array = block[block_type].get("rich_text", [])
for rich_text in rich_text_array:
text_content += rich_text.get("plain_text", "")
elif block_type == "code":
rich_text_array = block["code"].get("rich_text", [])
for rich_text in rich_text_array:
text_content += rich_text.get("plain_text", "")
text_content = f"```\n{text_content}\n```" # Markdown für Codeblöcke
elif block_type == "unsupported":
text_content = "[Unsupported Block Type]"
if text_content:
# Füge grundlegende Formatierung für bessere Lesbarkeit hinzu
if block_type == "heading_1":
full_text.append(f"# {text_content}")
elif block_type == "heading_2":
full_text.append(f"## {text_content}")
elif block_type == "heading_3":
full_text.append(f"### {text_content}")
elif block_type == "bulleted_list_item":
full_text.append(f"- {text_content}")
elif block_type == "numbered_list_item":
full_text.append(f"1. {text_content}") # Einfache Nummerierung
elif block_type == "to_do":
checked = "[x]" if block["to_do"].get("checked") else "[ ]"
full_text.append(f"{checked} {text_content}")
elif block_type == "callout":
# Extrahiere Icon und Text
icon = block["callout"].get("icon", {}).get("emoji", "")
full_text.append(f"> {icon} {text_content}")
else: # paragraph und andere Standardtexte
full_text.append(text_content)
next_cursor = data.get("next_cursor")
has_more = data.get("has_more", False) and next_cursor
return "\n".join(full_text)
except requests.exceptions.RequestException as e:
print(f"Fehler beim Abrufen des Seiteninhalts für Page-ID {page_id}: {e}")
if e.response is not None:
try:
if e.response: # Wenn eine Antwort vorhanden ist
error_details = e.response.json()
print(f"Notion API Fehlerdetails: {json.dumps(error_details, indent=2)}")
except json.JSONDecodeError:
print(f"Notion API Rohantwort: {e.response.text}")
return ""
def get_database_status_options(token: str, db_id: str) -> List[str]:
"""Ruft die verfügbaren Status-Optionen für eine Datenbank-Eigenschaft ab."""
url = f"https://api.notion.com/v1/databases/{db_id}"
headers = {
"Authorization": f"Bearer {token}",
"Notion-Version": "2022-06-28"
}
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
properties = response.json().get("properties", {})
status_property = properties.get("Status")
if status_property and status_property["type"] == "status":
return [option["name"] for option in status_property["status"]["options"]]
except requests.exceptions.RequestException as e:
print(f"Fehler beim Abrufen der Datenbank-Eigenschaften: {e}")
return []
def update_notion_task_property(token: str, task_id: str, payload: Dict) -> bool:
"""Aktualisiert eine beliebige Eigenschaft eines Notion-Tasks."""
print(f"\n--- Aktualisiere Eigenschaft von Task '{task_id}'... ---")
url = f"https://api.notion.com/v1/pages/{task_id}"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"Notion-Version": "2022-06-28"
}
update_payload = {"properties": payload}
try:
response = requests.patch(url, headers=headers, json=update_payload)
response.raise_for_status()
print(f"✅ Task-Eigenschaft erfolgreich aktualisiert.")
return True
except requests.exceptions.RequestException as e:
print(f"❌ FEHLER beim Aktualisieren der Task-Eigenschaft: {e}")
if e.response is not None:
try:
print(f"Antwort des Servers: {json.dumps(e.response.json(), indent=2)}")
except:
print(f"Antwort des Servers: {e.response.text}")
return False
def create_new_notion_task(token: str, project_id: str, tasks_db_id: str) -> Optional[Dict]:
"""Erstellt einen neuen Task in Notion und verknüpft ihn mit dem Projekt."""
print("\n--- Neuen Task erstellen ---")
task_title = input("Bitte gib den Titel für den neuen Task ein: ")
if not task_title:
print("Kein Titel angegeben. Abbruch.")
return None
status_options = get_database_status_options(token, tasks_db_id)
if not status_options:
print("Fehler: Konnte keine Status-Optionen für die Task-Datenbank finden.")
return None
initial_status = status_options[0] # Nimm den ersten verfügbaren Status
url = "https://api.notion.com/v1/pages"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"Notion-Version": "2022-06-28"
}
payload = {
"parent": {"database_id": tasks_db_id},
"properties": {
"Name": {
"title": [{"text": {"content": task_title}}]
},
"Project": {
"relation": [{"id": project_id}]
},
"Status": {
"status": {"name": initial_status}
}
}
}
try:
response = requests.post(url, headers=headers, json=payload)
response.raise_for_status()
new_task = response.json()
print(f"✅ Task '{task_title}' erfolgreich erstellt.")
return new_task
except requests.exceptions.RequestException as e:
print(f"❌ FEHLER beim Erstellen des Tasks: {e}")
if e.response is not None:
try:
print(f"Antwort des Servers: {json.dumps(e.response.json(), indent=2)}")
except:
print(f"Antwort des Servers: {e.response.text}")
return None
def add_comment_to_notion_task(token: str, task_id: str, comment: str) -> bool:
"""Fügt einen Kommentar zu einer Notion-Seite (Task) hinzu."""
url = "https://api.notion.com/v1/comments"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"Notion-Version": "2022-06-28"
}
payload = {
"parent": {"page_id": task_id},
"rich_text": [{
"text": {
"content": comment
}
}]
}
try:
response = requests.post(url, headers=headers, json=payload)
response.raise_for_status()
print(f"✅ Kommentar erfolgreich zum Notion-Task hinzugefügt.")
return True
except requests.exceptions.RequestException as e:
print(f"❌ FEHLER beim Hinzufügen des Kommentars zum Notion-Task: {e}")
if e.response is not None:
try:
print(f"Antwort des Servers: {json.dumps(e.response.json(), indent=2)}")
except:
print(f"Antwort des Servers: {e.response.text}")
return False
def append_blocks_to_notion_page(token: str, page_id: str, blocks: List[Dict]) -> bool:
"""Hängt Inhaltsblöcke an eine Notion-Seite an."""
url = f"https://api.notion.com/v1/blocks/{page_id}/children"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"Notion-Version": "2022-06-28"
}
payload = {"children": blocks}
try:
response = requests.patch(url, headers=headers, json=payload)
response.raise_for_status()
print(f"✅ Statusbericht erfolgreich an die Notion-Task-Seite angehängt.")
return True
except requests.exceptions.RequestException as e:
print(f"❌ FEHLER beim Anhängen des Statusberichts an die Notion-Seite: {e}")
if e.response is not None:
try:
print(f"Antwort des Servers: {json.dumps(e.response.json(), indent=2)}")
except:
print(f"Antwort des Servers: {e.response.text}")
return False
# --- Session Management ---
SESSION_DIR = ".dev_session"
SESSION_FILE_PATH = os.path.join(SESSION_DIR, "SESSION_INFO")
def install_git_hook():
"""Installiert das notion_commit_hook.py Skript als post-commit Git-Hook."""
git_hooks_dir = os.path.join(".git", "hooks")
post_commit_hook_path = os.path.join(git_hooks_dir, "post-commit")
source_hook_script = "notion_commit_hook.py"
if not os.path.exists(git_hooks_dir):
# Wahrscheinlich kein Git-Repository, also nichts tun
return
if not os.path.exists(source_hook_script):
print(f"Warnung: Hook-Skript {source_hook_script} nicht gefunden. Hook wird nicht installiert.")
return
try:
# Lese den Inhalt des Quellskripts
with open(source_hook_script, "r") as f:
hook_content = f.read()
# Schreibe den Inhalt in den post-commit Hook
with open(post_commit_hook_path, "w") as f:
f.write(hook_content)
# Mache den Hook ausführbar
os.chmod(post_commit_hook_path, 0o755)
print("✅ Git-Hook für Notion-Kommentare erfolgreich installiert.")
except (IOError, OSError) as e:
print(f"❌ FEHLER beim Installieren des Git-Hooks: {e}")
# --- UI Functions ---
def select_project(token: str) -> Optional[Tuple[Dict, Optional[str]]]:
"""Zeigt eine Liste der Projekte an und lässt den Benutzer eines auswählen."""
print("--- Lade Projekte aus Notion... ---")
projects_db_id = find_database_by_title(token, "Projects [UT]")
if not projects_db_id:
return None, None
projects = query_notion_database(token, projects_db_id)
if not projects:
print("Keine Projekte in der Datenbank gefunden.")
return None, None
print("\nAn welchem Projekt möchtest du arbeiten?")
for i, project in enumerate(projects):
print(f"[{i+1}] {get_page_title(project)}")
while True:
try:
choice = int(input("Bitte wähle eine Nummer: "))
if 1 <= choice <= len(projects):
selected_project = projects[choice - 1]
readme_path = get_page_property(selected_project, "Readme Path")
return selected_project, readme_path
else:
print("Ungültige Auswahl.")
except ValueError:
print("Ungültige Eingabe. Bitte eine Zahl eingeben.")
def select_task(token: str, project_id: str, tasks_db_id: str) -> Optional[Dict]:
"""Zeigt eine Liste der Tasks für ein Projekt an und lässt den Benutzer auswählen."""
print("\n--- Lade Tasks für das ausgewählte Projekt... ---")
filter_payload = {
"property": "Project",
"relation": {"contains": project_id}
}
tasks = query_notion_database(token, tasks_db_id, filter_payload=filter_payload)
if not tasks:
print("Keine offenen Tasks für dieses Projekt gefunden.")
else:
print("\nWelchen Task möchtest du bearbeiten?")
for i, task in enumerate(tasks):
print(f"[{i+1}] {get_page_title(task)}")
print(f"[{len(tasks)+1}] Neuen Task für dieses Projekt erstellen")
while True:
try:
choice = int(input("Bitte wähle eine Nummer: "))
if 1 <= choice <= len(tasks):
return tasks[choice - 1]
elif choice == len(tasks) + 1:
return {"id": "new_task"} # Signal
else:
print("Ungültige Auswahl.")
except ValueError:
print("Ungültige Eingabe. Bitte eine Zahl eingeben.")
import subprocess
from datetime import datetime
from zoneinfo import ZoneInfo # Für Zeitzonen-Handling
# Definiere die Zeitzone für Berlin
BERLIN_TZ = ZoneInfo("Europe/Berlin")
# --- Git Summary Generation ---
def generate_git_summary() -> Tuple[str, str]:
"""Generiert eine Zusammenfassung der Git-Änderungen und Commit-Nachrichten seit dem letzten Push zum Main-Branch."""
try:
# Finde den aktuellen Main-Branch Namen (master oder main)
try:
main_branch = subprocess.check_output(["git", "rev-parse", "--abbrev-ref", "HEAD"]).decode("utf-8").strip()
if main_branch not in ["main", "master"]:
# Versuche, den Remote-Tracking-Branch für main/master zu finden
result = subprocess.run(["git", "branch", "-r"], capture_output=True, text=True)
if "origin/main" in result.stdout:
main_branch = "origin/main"
elif "origin/master" in result.stdout:
main_branch = "origin/master"
else:
print("Warnung: Konnte keinen 'main' oder 'master' Branch finden. Git-Zusammenfassung wird möglicherweise unvollständig sein.")
main_branch = "HEAD~1" # Fallback zum letzten Commit, falls kein Main-Branch gefunden wird
except subprocess.CalledProcessError:
main_branch = "HEAD~1" # Fallback, falls gar kein Branch gefunden wird
# Git diff --stat
diff_stat_cmd = ["git", "diff", "--stat", f"{main_branch}...HEAD"]
diff_stat = subprocess.check_output(diff_stat_cmd).decode("utf-8").strip()
# Git log --pretty
commit_log_cmd = ["git", "log", "--pretty=format:- %s", f"{main_branch}...HEAD"]
commit_messages = subprocess.check_output(commit_log_cmd).decode("utf-8").strip()
return diff_stat, commit_messages
except subprocess.CalledProcessError as e:
print(f"❌ FEHLER beim Generieren der Git-Zusammenfassung: {e}")
return "", ""
def git_push_with_retry() -> bool:
"""Versucht, Änderungen zu pushen, und führt bei einem non-fast-forward-Fehler einen Rebase und erneuten Push durch."""
print("\n--- Führe git push aus ---")
try:
subprocess.run(["git", "push"], check=True)
print("✅ Git push erfolgreich.")
return True
except subprocess.CalledProcessError as e:
if "non-fast-forward" in e.stderr.decode("utf-8"):
print("⚠️ Git push abgelehnt (non-fast-forward). Versuche git pull --rebase und erneuten Push...")
try:
subprocess.run(["git", "pull", "--rebase"], check=True)
print("✅ Git pull --rebase erfolgreich. Versuche erneuten Push...")
subprocess.run(["git", "push"], check=True)
print("✅ Git push nach Rebase erfolgreich.")
return True
except subprocess.CalledProcessError as pull_e:
print(f"❌ FEHLER bei git pull --rebase oder erneutem Push: {pull_e}")
print("Bitte löse Konflikte manuell und pushe dann.")
return False
else:
print(f"❌ FEHLER bei git push: {e}")
return False
except Exception as e:
print(f"❌ Unerwarteter Fehler bei git push: {e}")
return False
# --- Report Status to Notion ---
def report_status_to_notion(
status_override: Optional[str],
todos_override: Optional[str],
git_changes_override: Optional[str],
commit_messages_override: Optional[str],
summary_override: Optional[str]
) -> None:
"""
Erstellt einen Statusbericht für den Notion-Task, entweder interaktiv oder mit überschriebenen Werten.
"""
if not os.path.exists(SESSION_FILE_PATH):
print("❌ FEHLER: Keine aktive Session gefunden. Kann keinen Statusbericht erstellen.")
return
try:
with open(SESSION_FILE_PATH, "r") as f:
session_data = json.load(f)
task_id = session_data.get("task_id")
token = session_data.get("token")
if not (task_id and token):
print("❌ FEHLER: Session-Daten unvollständig. Kann keinen Statusbericht erstellen.")
return
# Time tracking logic
session_start_time_str = session_data.get("session_start_time")
if session_start_time_str:
session_start_time = datetime.fromisoformat(session_start_time_str)
elapsed_time = datetime.now() - session_start_time
elapsed_hours = elapsed_time.total_seconds() / 3600
# Get current task page to read existing duration
# Note: This is a simplified way. A more robust solution might query the DB
# to get the page object without a separate API call if we already have it.
# For now, a direct API call is clear and ensures we have the latest data.
task_page_url = f"https://api.notion.com/v1/pages/{task_id}"
headers = {
"Authorization": f"Bearer {token}",
"Notion-Version": "2022-06-28"
}
try:
page_response = requests.get(task_page_url, headers=headers)
page_response.raise_for_status()
task_page = page_response.json()
current_duration = get_page_number_property(task_page, "Total Duration (h)") or 0.0
new_total_duration = current_duration + elapsed_hours
duration_payload = {
"Total Duration (h)": {
"number": new_total_duration
}
}
update_notion_task_property(token, task_id, duration_payload)
print(f"✅ Zeiterfassung: {elapsed_hours:.2f} Stunden zum Task hinzugefügt. Neue Gesamtdauer: {new_total_duration:.2f} Stunden.")
# Reset session start time for the next interval
save_session_info(task_id, token)
except requests.exceptions.RequestException as e:
print(f"❌ FEHLER beim Abrufen der Task-Details für die Zeiterfassung: {e}")
print(f"--- Erstelle Statusbericht für Task {task_id} ---")
# Git-Zusammenfassung generieren (immer, wenn nicht explizit überschrieben)
actual_git_changes = git_changes_override
actual_commit_messages = commit_messages_override
if not git_changes_override or not commit_messages_override:
print("Generiere Git-Zusammenfassung...")
diff_stat, commit_log = generate_git_summary()
if not git_changes_override:
actual_git_changes = diff_stat
if not commit_messages_override:
actual_commit_messages = commit_log
# Status abfragen oder übernehmen
actual_status = status_override
if not actual_status:
tasks_db_id = find_database_by_title(token, "Tasks [UT]")
if tasks_db_id:
status_options = get_database_status_options(token, tasks_db_id)
if status_options:
print("\nBitte wähle den neuen Status des Tasks:")
for i, option in enumerate(status_options):
print(f"[{i+1}] {option}")
while True:
try:
choice = int(input("Wähle eine Nummer: "))
if 1 <= choice <= len(status_options):
actual_status = status_options[choice - 1]
break
else:
print("Ungültige Auswahl.")
except ValueError:
print("Ungültige Eingabe. Bitte eine Zahl eingeben.")
else:
print("❌ FEHLER: Konnte Status-Optionen nicht abrufen. Abbruch des Berichts.")
return
if not actual_status:
print("❌ FEHLER: Kein Status festgelegt. Abbruch des Berichts.")
return
# Detaillierte Zusammenfassung abfragen oder übernehmen
actual_summary = summary_override
if not actual_summary:
print("\nBitte gib eine Zusammenfassung der Arbeit ein (was wurde getan, Ergebnisse, Probleme etc.).")
user_summary_lines = []
while True:
line = input()
if not line:
break
user_summary_lines.append(line)
actual_summary = "\n".join(user_summary_lines)
# To-Dos abfragen oder übernehmen
actual_todos = todos_override
if not actual_todos:
user_todos = input("\nGibt es offene To-Dos oder nächste Schritte? (Leer lassen zum Überspringen): ")
actual_todos = user_todos.strip()
# Kommentar zusammenstellen
report_lines = []
# Diese Zeilen werden jetzt innerhalb des Code-Blocks formatiert
# Add invested time to the report if available
if 'elapsed_hours' in locals():
elapsed_hhmm = decimal_hours_to_hhmm(elapsed_hours)
report_lines.append(f"Investierte Zeit in dieser Session: {elapsed_hhmm}")
report_lines.append(f"Neuer Status: {actual_status}")
if actual_summary:
report_lines.append("\nArbeitszusammenfassung:")
report_lines.append(actual_summary)
if actual_git_changes or actual_commit_messages:
report_lines.append("\nTechnische Änderungen (Git):")
if actual_git_changes:
report_lines.append(f"```{actual_git_changes}```")
if actual_commit_messages:
report_lines.append("\nCommit Nachrichten:")
report_lines.append(f"```{actual_commit_messages}```")
if actual_todos:
report_lines.append("\nOffene To-Dos / Nächste Schritte:")
for todo_item in actual_todos.split('\n'):
report_lines.append(f"- {todo_item.strip()}")
report_content = "\n".join(report_lines)
# Notion Blöcke für die API erstellen
timestamp = datetime.now(BERLIN_TZ).strftime('%Y-%m-%d %H:%M')
notion_blocks = [
{
"object": "block",
"type": "heading_2",
"heading_2": {
"rich_text": [{"type": "text", "text": {"content": f"🤖 Status-Update ({timestamp} Berlin Time)"}}]
}
},
{
"object": "block",
"type": "code",
"code": {
"rich_text": [{"type": "text", "text": {"content": report_content}}],
"language": "yaml"
}
}
]
# Notion aktualisieren
append_blocks_to_notion_page(token, task_id, notion_blocks)
status_payload = {"Status": {"status": {"name": actual_status}}}
update_notion_task_property(token, task_id, status_payload)
# --- Git Operationen ---
print("\n--- Führe Git-Operationen aus ---")
try:
subprocess.run(["git", "add", "."], check=True)
print("✅ Alle Änderungen gestaged (git add .).")
# Prüfen, ob es Änderungen zum Committen gibt
git_status_output = subprocess.run(["git", "status", "--porcelain"], capture_output=True, text=True, check=True).stdout.strip()
if not git_status_output:
print("⚠️ Keine Änderungen zum Committen gefunden. Überspringe git commit.")
return # Beende die Funktion, da nichts zu tun ist
# Commit-Nachricht erstellen
commit_subject = actual_summary.splitlines()[0] if actual_summary else "Notion Status Update"
commit_message = f"[{task_id.split('-')[0]}] {commit_subject}\n\n{actual_summary}"
subprocess.run(["git", "commit", "-m", commit_message], check=True)
print("✅ Git commit erfolgreich.")
# Interaktive Abfrage für git push
push_choice = input("\n✅ Commit erfolgreich erstellt. Sollen die Änderungen jetzt gepusht werden? (j/n): ").lower()
if push_choice == 'j':
git_push_with_retry()
except subprocess.CalledProcessError as e:
print(f"❌ FEHLER bei Git-Operationen: {e}")
except Exception as e:
print(f"❌ Unerwarteter Fehler bei Git-Operationen: {e}")
except (FileNotFoundError, json.JSONDecodeError) as e:
print(f"❌ FEHLER beim Lesen der Session-Informationen für Statusbericht: {e}")
except Exception as e:
print(f"❌ Unerwarteter Fehler beim Erstellen des Statusberichts: {e}")
# --- Context Generation ---
def generate_cli_context(project_title: str, task_title: str, task_id: str, readme_path: Optional[str], task_description: Optional[str], total_duration: float) -> str:
"""Erstellt den reinen Kontext-String für die Gemini CLI."""
# Fallback, falls kein Pfad in Notion gesetzt ist
if not readme_path:
readme_path = "readme.md"
description_part = ""
if task_description:
description_part = (
f"\n**Aufgabenbeschreibung:**\n"
f"```\n{task_description}\n```\n"
)
duration_hhmm = decimal_hours_to_hhmm(total_duration)
duration_part = f"Bisher erfasster Zeitaufwand (Notion): {duration_hhmm} Stunden.\n"
context = (
f"Ich arbeite jetzt am Projekt '{project_title}'. Der Fokus liegt auf dem Task '{task_title}'.\n"
f"{description_part}\n"
f"{duration_part}"
"Die relevanten Dateien für dieses Projekt sind wahrscheinlich:\n"
"- Die primäre Projektdokumentation: @readme.md\n"
f"- Die spezifische Dokumentation für dieses Modul: @{readme_path}\n\n"
f"Mein Ziel ist es, den Task '{task_title}' umzusetzen. Alle Commits für diesen Task sollen die Kennung `[{task_id.split('-')[0]}]` enthalten.\n\n"
"**WICHTIGER BEFEHL:** Bevor du mit der Implementierung oder einer Code-Änderung beginnst, fasse die Aufgabe in deinen eigenen Worten zusammen, erstelle einen detaillierten, schrittweisen Plan zur Lösung und **warte auf meine explizite Bestätigung**, bevor du den ersten Schritt ausführst."
)
return context
# --- CLI Execution ---
# Die start_gemini_cli Funktion wird entfernt, da das aufrufende Skript jetzt die Gemini CLI startet.
def save_session_info(task_id: str, token: str):
"""Speichert die Task-ID, den Token und den Startzeitpunkt für den Git-Hook."""
os.makedirs(SESSION_DIR, exist_ok=True)
session_data = {
"task_id": task_id,
"token": token,
"session_start_time": datetime.now().isoformat()
}
with open(SESSION_FILE_PATH, "w") as f:
json.dump(session_data, f)
def install_git_hook():
"""Installiert das notion_commit_hook.py Skript als post-commit Git-Hook."""
pass
def cleanup_session():
"""Bereinigt die Session-Datei und den Git-Hook."""
if os.path.exists(SESSION_FILE_PATH):
os.remove(SESSION_FILE_PATH)
post_commit_hook_path = os.path.join(".git", "hooks", "post-commit")
if os.path.exists(post_commit_hook_path):
os.remove(post_commit_hook_path)
print("🧹 Session-Daten und Git-Hook wurden bereinigt.")
def complete_session():
"""Schließt eine aktive Session ab."""
print("Schließe aktive Entwicklungs-Session ab...")
if not os.path.exists(SESSION_FILE_PATH):
print("Keine aktive Session gefunden.")
return
try:
with open(SESSION_FILE_PATH, "r") as f:
session_data = json.load(f)
task_id = session_data.get("task_id")
token = session_data.get("token")
if task_id and token:
# Annahme: Der letzte Status in der Liste ist der "Done"-Status
tasks_db_id = find_database_by_title(token, "Tasks [UT]")
if tasks_db_id:
status_options = get_database_status_options(token, tasks_db_id)
if status_options:
done_status = status_options[-1]
status_payload = {"Status": {"status": {"name": done_status}}}
update_notion_task_property(token, task_id, status_payload)
except (FileNotFoundError, json.JSONDecodeError):
print("Fehler beim Lesen der Session-Informationen.")
finally:
cleanup_session()
user_input = input("\nMöchtest du eine neue Session starten? (j/n): ").lower()
if user_input == 'j':
main() # Starte den interaktiven Prozess von vorne
else:
print("Auf Wiedersehen!")
def start_interactive_session():
"""Startet den Prozess zur Auswahl von Projekt/Task und startet Gemini."""
print("Starte interaktive Entwicklungs-Session...")
token = os.environ.get('NOTION_API_KEY')
if not token:
token = getpass("Bitte gib deinen Notion API Key ein (Eingabe wird nicht angezeigt): ")
if not token:
print("Kein Token angegeben. Abbruch.")
return
selected_project, readme_path = select_project(token)
if not selected_project:
return
project_title = get_page_title(selected_project)
print(f"\nProjekt '{project_title}' ausgewählt.")
tasks_db_id = find_database_by_title(token, "Tasks [UT]")
if not tasks_db_id:
return
user_choice = select_task(token, selected_project["id"], tasks_db_id)
if not user_choice:
print("Kein Task ausgewählt. Abbruch.")
return
selected_task = None
if user_choice.get("id") == "new_task":
selected_task = create_new_notion_task(token, selected_project["id"], tasks_db_id)
if not selected_task:
return
else:
selected_task = user_choice
task_title = get_page_title(selected_task)
task_id = selected_task["id"]
print(f"\nTask '{task_title}' ausgewählt.")
# NEU: Lade die Task-Beschreibung und die bisherige Dauer
task_description = get_page_content(token, task_id)
total_duration_decimal = get_page_number_property(selected_task, "Total Duration (h)") or 0.0
total_duration_hhmm = decimal_hours_to_hhmm(total_duration_decimal)
print(f"> Bisher für diesen Task erfasst: {total_duration_hhmm} Stunden.")
# Session-Informationen für den Git-Hook speichern
save_session_info(task_id, token)
# Git-Hook installieren, der die Session-Infos nutzt
install_git_hook()
title_slug = re.sub(r'[^a-z0-9\s-]', '', task_title.lower())
title_slug = re.sub(r'\s+', '-', title_slug)
title_slug = re.sub(r'-+', '-', title_slug).strip('-')
suggested_branch_name = f"feature/task-{task_id.split('-')[0]}-{title_slug}"
status_payload = {"Status": {"status": {"name": "Doing"}}}
status_updated = update_notion_task_property(token, task_id, status_payload)
if not status_updated:
print("Warnung: Notion-Task-Status konnte nicht aktualisiert werden.")
# Finale Setup-Informationen ausgeben
print("\n------------------------------------------------------------------")
print("✅ Setup abgeschlossen!")
print(f"\nDer Notion-Task '{task_title}' wurde auf 'Doing' gesetzt.")
print(f"\nBitte erstellen Sie jetzt manuell den Git-Branch in einem separaten Terminal:")
print(f"git checkout -b {suggested_branch_name}")
print("------------------------------------------------------------------")
# CLI-Kontext generieren und an stdout ausgeben, damit das Startskript ihn aufgreifen kann
cli_context = generate_cli_context(project_title, task_title, task_id, readme_path, task_description, total_duration_decimal)
print("\n---GEMINI_CLI_CONTEXT_START---")
print(cli_context)
print("---GEMINI_CLI_CONTEXT_END---")
# Das Skript beenden, damit das aufrufende Shell-Skript die Gemini CLI starten kann
exit(0)
# --- Main Execution ---
def main():
"""Hauptfunktion des Skripts."""
# Test-Kommentar für den Workflow-Test
parser = argparse.ArgumentParser(description="Interaktiver Session-Manager für die Gemini-Entwicklung mit Notion-Integration.")
parser.add_argument("--done", action="store_true", help="Schließt die aktuelle Entwicklungs-Session ab.")
parser.add_argument("--add-comment", type=str, help="Fügt einen Kommentar zum aktuellen Notion-Task hinzu.")
parser.add_argument("--report-status", action="store_true", help="Erstellt einen Statusbericht für den Notion-Task.")
parser.add_argument("--status", type=str, help="Status, der im Notion-Task gesetzt werden soll (z.B. 'In Bearbeitung', 'Bereit für Review').")
parser.add_argument("--todos", type=str, help="Eine durch '\n' getrennte Liste offener To-Dos.")
parser.add_argument("--git-changes", type=str, help="Zusammenfassung der Git-Änderungen (git diff --stat).")
parser.add_argument("--commit-messages", type=str, help="Eine durch '\n' getrennte Liste der Commit-Nachrichten.")
parser.add_argument("--summary", type=str, help="Eine detaillierte textuelle Zusammenfassung der erledigten Arbeit.")
args = parser.parse_args()
if args.done:
complete_session()
elif args.add_comment:
if not os.path.exists(SESSION_FILE_PATH):
print("❌ FEHLER: Keine aktive Session gefunden. Kann keinen Kommentar hinzufügen.")
return
try:
with open(SESSION_FILE_PATH, "r") as f:
session_data = json.load(f)
task_id = session_data.get("task_id")
token = session_data.get("token")
if task_id and token:
add_comment_to_notion_task(token, task_id, args.add_comment)
else:
print("❌ FEHLER: Session-Daten unvollständig. Kann keinen Kommentar hinzufügen.")
except (FileNotFoundError, json.JSONDecodeError):
print("❌ FEHLER: Fehler beim Lesen der Session-Informationen. Kann keinen Kommentar hinzufügen.")
elif args.report_status:
report_status_to_notion(
status_override=args.status,
todos_override=args.todos,
git_changes_override=args.git_changes,
commit_messages_override=args.commit_messages,
summary_override=args.summary
)
else:
start_interactive_session()
if __name__ == "__main__":
main()