import os import requests import json import re from typing import List, Dict, Optional, Tuple from getpass import getpass from dotenv import load_dotenv import argparse import shutil load_dotenv() # --- API Helper Functions --- def find_database_by_title(token: str, title: str) -> Optional[str]: """Sucht nach einer Datenbank mit einem bestimmten Titel und gibt deren ID zurück.""" print(f"Suche nach Datenbank '{title}' in Notion...") url = "https://api.notion.com/v1/search" headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json", "Notion-Version": "2022-06-28" } payload = { "query": title, "filter": {"value": "database", "property": "object"} } try: response = requests.post(url, headers=headers, json=payload) response.raise_for_status() results = response.json().get("results", []) for db in results: db_title_parts = db.get("title", []) if db_title_parts: db_title = db_title_parts[0].get("plain_text", "") if db_title.lower() == title.lower(): db_id = db["id"] print(f"Datenbank '{title}' gefunden mit ID: {db_id}") return db_id print(f"Fehler: Keine Datenbank mit dem exakten Titel '{title}' gefunden.") return None except requests.exceptions.RequestException as e: print(f"Fehler bei der Suche nach der Notion-Datenbank '{title}': {e}") if e.response is not None: try: print(f"Antwort des Servers: {json.dumps(e.response.json(), indent=2)}") except: print(f"Antwort des Servers: {e.response.text}") return None def query_notion_database(token: str, database_id: str, filter_payload: Dict = None) -> List[Dict]: """Fragt eine Notion-Datenbank ab und gibt eine Liste der Seiten zurück.""" url = f"https://api.notion.com/v1/databases/{database_id}/query" headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json", "Notion-Version": "2022-06-28" } payload = {} if filter_payload: payload["filter"] = filter_payload try: response = requests.post(url, headers=headers, json=payload) response.raise_for_status() return response.json().get("results", []) except requests.exceptions.RequestException as e: print(f"Fehler bei der Abfrage der Notion-Datenbank {database_id}: {e}") if e.response is not None: try: print(f"Antwort des Servers: {json.dumps(e.response.json(), indent=2)}") except: print(f"Antwort des Servers: {e.response.text}") return [] def get_page_title(page: Dict) -> str: """Extrahiert den Titel einer Notion-Seite.""" for prop_value in page.get("properties", {}).values(): if prop_value.get("type") == "title": title_parts = prop_value.get("title", []) if title_parts: return title_parts[0].get("plain_text", "Unbenannt") return "Unbenannt" def get_page_property(page: Dict, prop_name: str, prop_type: str = "rich_text") -> Optional[str]: """Extrahiert den Inhalt einer bestimmten Eigenschaft (Property) von einer Seite.""" prop = page.get("properties", {}).get(prop_name) if not prop: return None if prop_type == "rich_text" and prop.get("type") == "rich_text": text_parts = prop.get("rich_text", []) if text_parts: return text_parts[0].get("plain_text") # Hier könnten weitere Typen wie 'select', 'number' etc. behandelt werden return None def get_page_number_property(page: Dict, prop_name: str) -> Optional[float]: """Extrahiert den Wert einer 'number'-Eigenschaft von einer Seite.""" prop = page.get("properties", {}).get(prop_name) if not prop or prop.get("type") != "number": return None return prop.get("number") def decimal_hours_to_hhmm(decimal_hours: float) -> str: """Wandelt Dezimalstunden in das Format 'HH:MM' um.""" if decimal_hours is None: return "00:00" hours = int(decimal_hours) minutes = int((decimal_hours * 60) % 60) return f"{hours:02d}:{minutes:02d}" def get_page_content(token: str, page_id: str) -> str: """Ruft den gesamten Textinhalt einer Notion-Seite ab, indem es die Blöcke zusammenfügt, mit Paginierung.""" url = f"https://api.notion.com/v1/blocks/{page_id}/children" headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json", "Notion-Version": "2022-06-28" } full_text = [] next_cursor = None has_more = True try: while has_more: params = {"page_size": 100} # Max page size if next_cursor: params["start_cursor"] = next_cursor response = requests.get(url, headers=headers, params=params) response.raise_for_status() data = response.json() blocks = data.get("results", []) for block in blocks: block_type = block["type"] text_content = "" if block_type in ["paragraph", "heading_1", "heading_2", "heading_3", "bulleted_list_item", "numbered_list_item", "to_do", "callout"]: rich_text_array = block[block_type].get("rich_text", []) for rich_text in rich_text_array: text_content += rich_text.get("plain_text", "") elif block_type == "code": rich_text_array = block["code"].get("rich_text", []) for rich_text in rich_text_array: text_content += rich_text.get("plain_text", "") text_content = f"```\n{text_content}\n```" # Markdown für Codeblöcke elif block_type == "unsupported": text_content = "[Unsupported Block Type]" if text_content: # Füge grundlegende Formatierung für bessere Lesbarkeit hinzu if block_type == "heading_1": full_text.append(f"# {text_content}") elif block_type == "heading_2": full_text.append(f"## {text_content}") elif block_type == "heading_3": full_text.append(f"### {text_content}") elif block_type == "bulleted_list_item": full_text.append(f"- {text_content}") elif block_type == "numbered_list_item": full_text.append(f"1. {text_content}") # Einfache Nummerierung elif block_type == "to_do": checked = "[x]" if block["to_do"].get("checked") else "[ ]" full_text.append(f"{checked} {text_content}") elif block_type == "callout": # Extrahiere Icon und Text icon = block["callout"].get("icon", {}).get("emoji", "") full_text.append(f"> {icon} {text_content}") else: # paragraph und andere Standardtexte full_text.append(text_content) next_cursor = data.get("next_cursor") has_more = data.get("has_more", False) and next_cursor return "\n".join(full_text) except requests.exceptions.RequestException as e: print(f"Fehler beim Abrufen des Seiteninhalts für Page-ID {page_id}: {e}") if e.response is not None: try: if e.response: # Wenn eine Antwort vorhanden ist error_details = e.response.json() print(f"Notion API Fehlerdetails: {json.dumps(error_details, indent=2)}") except json.JSONDecodeError: print(f"Notion API Rohantwort: {e.response.text}") return "" def get_database_status_options(token: str, db_id: str) -> List[str]: """Ruft die verfügbaren Status-Optionen für eine Datenbank-Eigenschaft ab.""" url = f"https://api.notion.com/v1/databases/{db_id}" headers = { "Authorization": f"Bearer {token}", "Notion-Version": "2022-06-28" } try: response = requests.get(url, headers=headers) response.raise_for_status() properties = response.json().get("properties", {}) status_property = properties.get("Status") if status_property and status_property["type"] == "status": return [option["name"] for option in status_property["status"]["options"]] except requests.exceptions.RequestException as e: print(f"Fehler beim Abrufen der Datenbank-Eigenschaften: {e}") return [] def update_notion_task_property(token: str, task_id: str, payload: Dict) -> bool: """Aktualisiert eine beliebige Eigenschaft eines Notion-Tasks.""" print(f"\n--- Aktualisiere Eigenschaft von Task '{task_id}'... ---") url = f"https://api.notion.com/v1/pages/{task_id}" headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json", "Notion-Version": "2022-06-28" } update_payload = {"properties": payload} try: response = requests.patch(url, headers=headers, json=update_payload) response.raise_for_status() print(f"✅ Task-Eigenschaft erfolgreich aktualisiert.") return True except requests.exceptions.RequestException as e: print(f"❌ FEHLER beim Aktualisieren der Task-Eigenschaft: {e}") if e.response is not None: try: print(f"Antwort des Servers: {json.dumps(e.response.json(), indent=2)}") except: print(f"Antwort des Servers: {e.response.text}") return False def create_new_notion_task(token: str, project_id: str, tasks_db_id: str) -> Optional[Dict]: """Erstellt einen neuen Task in Notion und verknüpft ihn mit dem Projekt.""" print("\n--- Neuen Task erstellen ---") task_title = input("Bitte gib den Titel für den neuen Task ein: ") if not task_title: print("Kein Titel angegeben. Abbruch.") return None status_options = get_database_status_options(token, tasks_db_id) if not status_options: print("Fehler: Konnte keine Status-Optionen für die Task-Datenbank finden.") return None initial_status = status_options[0] # Nimm den ersten verfügbaren Status url = "https://api.notion.com/v1/pages" headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json", "Notion-Version": "2022-06-28" } payload = { "parent": {"database_id": tasks_db_id}, "properties": { "Name": { "title": [{"text": {"content": task_title}}] }, "Project": { "relation": [{"id": project_id}] }, "Status": { "status": {"name": initial_status} } } } try: response = requests.post(url, headers=headers, json=payload) response.raise_for_status() new_task = response.json() print(f"✅ Task '{task_title}' erfolgreich erstellt.") return new_task except requests.exceptions.RequestException as e: print(f"❌ FEHLER beim Erstellen des Tasks: {e}") if e.response is not None: try: print(f"Antwort des Servers: {json.dumps(e.response.json(), indent=2)}") except: print(f"Antwort des Servers: {e.response.text}") return None def add_comment_to_notion_task(token: str, task_id: str, comment: str) -> bool: """Fügt einen Kommentar zu einer Notion-Seite (Task) hinzu.""" url = "https://api.notion.com/v1/comments" headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json", "Notion-Version": "2022-06-28" } payload = { "parent": {"page_id": task_id}, "rich_text": [{ "text": { "content": comment } }] } try: response = requests.post(url, headers=headers, json=payload) response.raise_for_status() print(f"✅ Kommentar erfolgreich zum Notion-Task hinzugefügt.") return True except requests.exceptions.RequestException as e: print(f"❌ FEHLER beim Hinzufügen des Kommentars zum Notion-Task: {e}") if e.response is not None: try: print(f"Antwort des Servers: {json.dumps(e.response.json(), indent=2)}") except: print(f"Antwort des Servers: {e.response.text}") return False def append_blocks_to_notion_page(token: str, page_id: str, blocks: List[Dict]) -> bool: """Hängt Inhaltsblöcke an eine Notion-Seite an.""" url = f"https://api.notion.com/v1/blocks/{page_id}/children" headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json", "Notion-Version": "2022-06-28" } payload = {"children": blocks} try: response = requests.patch(url, headers=headers, json=payload) response.raise_for_status() print(f"✅ Statusbericht erfolgreich an die Notion-Task-Seite angehängt.") return True except requests.exceptions.RequestException as e: print(f"❌ FEHLER beim Anhängen des Statusberichts an die Notion-Seite: {e}") if e.response is not None: try: print(f"Antwort des Servers: {json.dumps(e.response.json(), indent=2)}") except: print(f"Antwort des Servers: {e.response.text}") return False # --- Session Management --- SESSION_DIR = ".dev_session" SESSION_FILE_PATH = os.path.join(SESSION_DIR, "SESSION_INFO") def install_git_hook(): """Installiert das notion_commit_hook.py Skript als post-commit Git-Hook.""" git_hooks_dir = os.path.join(".git", "hooks") post_commit_hook_path = os.path.join(git_hooks_dir, "post-commit") source_hook_script = "notion_commit_hook.py" if not os.path.exists(git_hooks_dir): # Wahrscheinlich kein Git-Repository, also nichts tun return if not os.path.exists(source_hook_script): print(f"Warnung: Hook-Skript {source_hook_script} nicht gefunden. Hook wird nicht installiert.") return try: # Lese den Inhalt des Quellskripts with open(source_hook_script, "r") as f: hook_content = f.read() # Schreibe den Inhalt in den post-commit Hook with open(post_commit_hook_path, "w") as f: f.write(hook_content) # Mache den Hook ausführbar os.chmod(post_commit_hook_path, 0o755) print("✅ Git-Hook für Notion-Kommentare erfolgreich installiert.") except (IOError, OSError) as e: print(f"❌ FEHLER beim Installieren des Git-Hooks: {e}") # --- UI Functions --- def select_project(token: str) -> Optional[Tuple[Dict, Optional[str]]]: """Zeigt eine Liste der Projekte an und lässt den Benutzer eines auswählen.""" print("--- Lade Projekte aus Notion... ---") projects_db_id = find_database_by_title(token, "Projects [UT]") if not projects_db_id: return None, None projects = query_notion_database(token, projects_db_id) if not projects: print("Keine Projekte in der Datenbank gefunden.") return None, None print("\nAn welchem Projekt möchtest du arbeiten?") for i, project in enumerate(projects): print(f"[{i+1}] {get_page_title(project)}") while True: try: choice = int(input("Bitte wähle eine Nummer: ")) if 1 <= choice <= len(projects): selected_project = projects[choice - 1] readme_path = get_page_property(selected_project, "Readme Path") return selected_project, readme_path else: print("Ungültige Auswahl.") except ValueError: print("Ungültige Eingabe. Bitte eine Zahl eingeben.") def select_task(token: str, project_id: str, tasks_db_id: str) -> Optional[Dict]: """Zeigt eine Liste der Tasks für ein Projekt an und lässt den Benutzer auswählen.""" print("\n--- Lade Tasks für das ausgewählte Projekt... ---") filter_payload = { "property": "Project", "relation": {"contains": project_id} } tasks = query_notion_database(token, tasks_db_id, filter_payload=filter_payload) if not tasks: print("Keine offenen Tasks für dieses Projekt gefunden.") else: print("\nWelchen Task möchtest du bearbeiten?") for i, task in enumerate(tasks): print(f"[{i+1}] {get_page_title(task)}") print(f"[{len(tasks)+1}] Neuen Task für dieses Projekt erstellen") while True: try: choice = int(input("Bitte wähle eine Nummer: ")) if 1 <= choice <= len(tasks): return tasks[choice - 1] elif choice == len(tasks) + 1: return {"id": "new_task"} # Signal else: print("Ungültige Auswahl.") except ValueError: print("Ungültige Eingabe. Bitte eine Zahl eingeben.") import subprocess from datetime import datetime from zoneinfo import ZoneInfo # Für Zeitzonen-Handling # Definiere die Zeitzone für Berlin BERLIN_TZ = ZoneInfo("Europe/Berlin") # --- Git Summary Generation --- def generate_git_summary() -> Tuple[str, str]: """Generiert eine Zusammenfassung der Git-Änderungen und Commit-Nachrichten seit dem letzten Push zum Main-Branch.""" try: # Finde den aktuellen Main-Branch Namen (master oder main) try: main_branch = subprocess.check_output(["git", "rev-parse", "--abbrev-ref", "HEAD"]).decode("utf-8").strip() if main_branch not in ["main", "master"]: # Versuche, den Remote-Tracking-Branch für main/master zu finden result = subprocess.run(["git", "branch", "-r"], capture_output=True, text=True) if "origin/main" in result.stdout: main_branch = "origin/main" elif "origin/master" in result.stdout: main_branch = "origin/master" else: print("Warnung: Konnte keinen 'main' oder 'master' Branch finden. Git-Zusammenfassung wird möglicherweise unvollständig sein.") main_branch = "HEAD~1" # Fallback zum letzten Commit, falls kein Main-Branch gefunden wird except subprocess.CalledProcessError: main_branch = "HEAD~1" # Fallback, falls gar kein Branch gefunden wird # Git diff --stat diff_stat_cmd = ["git", "diff", "--stat", f"{main_branch}...HEAD"] diff_stat = subprocess.check_output(diff_stat_cmd).decode("utf-8").strip() # Git log --pretty commit_log_cmd = ["git", "log", "--pretty=format:- %s", f"{main_branch}...HEAD"] commit_messages = subprocess.check_output(commit_log_cmd).decode("utf-8").strip() return diff_stat, commit_messages except subprocess.CalledProcessError as e: print(f"❌ FEHLER beim Generieren der Git-Zusammenfassung: {e}") return "", "" def git_push_with_retry() -> bool: """Versucht, Änderungen zu pushen, und führt bei einem non-fast-forward-Fehler einen Rebase und erneuten Push durch.""" print("\n--- Führe git push aus ---") try: subprocess.run(["git", "push"], check=True) print("✅ Git push erfolgreich.") return True except subprocess.CalledProcessError as e: if "non-fast-forward" in e.stderr.decode("utf-8"): print("⚠️ Git push abgelehnt (non-fast-forward). Versuche git pull --rebase und erneuten Push...") try: subprocess.run(["git", "pull", "--rebase"], check=True) print("✅ Git pull --rebase erfolgreich. Versuche erneuten Push...") subprocess.run(["git", "push"], check=True) print("✅ Git push nach Rebase erfolgreich.") return True except subprocess.CalledProcessError as pull_e: print(f"❌ FEHLER bei git pull --rebase oder erneutem Push: {pull_e}") print("Bitte löse Konflikte manuell und pushe dann.") return False else: print(f"❌ FEHLER bei git push: {e}") return False except Exception as e: print(f"❌ Unerwarteter Fehler bei git push: {e}") return False # --- Report Status to Notion --- def report_status_to_notion( status_override: Optional[str], todos_override: Optional[str], git_changes_override: Optional[str], commit_messages_override: Optional[str], summary_override: Optional[str] ) -> None: """ Erstellt einen Statusbericht für den Notion-Task, entweder interaktiv oder mit überschriebenen Werten. """ if not os.path.exists(SESSION_FILE_PATH): print("❌ FEHLER: Keine aktive Session gefunden. Kann keinen Statusbericht erstellen.") return try: with open(SESSION_FILE_PATH, "r") as f: session_data = json.load(f) task_id = session_data.get("task_id") token = session_data.get("token") if not (task_id and token): print("❌ FEHLER: Session-Daten unvollständig. Kann keinen Statusbericht erstellen.") return # Time tracking logic session_start_time_str = session_data.get("session_start_time") if session_start_time_str: session_start_time = datetime.fromisoformat(session_start_time_str) elapsed_time = datetime.now() - session_start_time elapsed_hours = elapsed_time.total_seconds() / 3600 # Get current task page to read existing duration # Note: This is a simplified way. A more robust solution might query the DB # to get the page object without a separate API call if we already have it. # For now, a direct API call is clear and ensures we have the latest data. task_page_url = f"https://api.notion.com/v1/pages/{task_id}" headers = { "Authorization": f"Bearer {token}", "Notion-Version": "2022-06-28" } try: page_response = requests.get(task_page_url, headers=headers) page_response.raise_for_status() task_page = page_response.json() current_duration = get_page_number_property(task_page, "Total Duration (h)") or 0.0 new_total_duration = current_duration + elapsed_hours duration_payload = { "Total Duration (h)": { "number": new_total_duration } } update_notion_task_property(token, task_id, duration_payload) print(f"✅ Zeiterfassung: {elapsed_hours:.2f} Stunden zum Task hinzugefügt. Neue Gesamtdauer: {new_total_duration:.2f} Stunden.") # Reset session start time for the next interval save_session_info(task_id, token) except requests.exceptions.RequestException as e: print(f"❌ FEHLER beim Abrufen der Task-Details für die Zeiterfassung: {e}") print(f"--- Erstelle Statusbericht für Task {task_id} ---") # Git-Zusammenfassung generieren (immer, wenn nicht explizit überschrieben) actual_git_changes = git_changes_override actual_commit_messages = commit_messages_override if not git_changes_override or not commit_messages_override: print("Generiere Git-Zusammenfassung...") diff_stat, commit_log = generate_git_summary() if not git_changes_override: actual_git_changes = diff_stat if not commit_messages_override: actual_commit_messages = commit_log # Status abfragen oder übernehmen actual_status = status_override if not actual_status: tasks_db_id = find_database_by_title(token, "Tasks [UT]") if tasks_db_id: status_options = get_database_status_options(token, tasks_db_id) if status_options: print("\nBitte wähle den neuen Status des Tasks:") for i, option in enumerate(status_options): print(f"[{i+1}] {option}") while True: try: choice = int(input("Wähle eine Nummer: ")) if 1 <= choice <= len(status_options): actual_status = status_options[choice - 1] break else: print("Ungültige Auswahl.") except ValueError: print("Ungültige Eingabe. Bitte eine Zahl eingeben.") else: print("❌ FEHLER: Konnte Status-Optionen nicht abrufen. Abbruch des Berichts.") return if not actual_status: print("❌ FEHLER: Kein Status festgelegt. Abbruch des Berichts.") return # Detaillierte Zusammenfassung abfragen oder übernehmen actual_summary = summary_override if not actual_summary: print("\nBitte gib eine Zusammenfassung der Arbeit ein (was wurde getan, Ergebnisse, Probleme etc.).") user_summary_lines = [] while True: line = input() if not line: break user_summary_lines.append(line) actual_summary = "\n".join(user_summary_lines) # To-Dos abfragen oder übernehmen actual_todos = todos_override if not actual_todos: user_todos = input("\nGibt es offene To-Dos oder nächste Schritte? (Leer lassen zum Überspringen): ") actual_todos = user_todos.strip() # Kommentar zusammenstellen report_lines = [] # Diese Zeilen werden jetzt innerhalb des Code-Blocks formatiert # Add invested time to the report if available if 'elapsed_hours' in locals(): elapsed_hhmm = decimal_hours_to_hhmm(elapsed_hours) report_lines.append(f"Investierte Zeit in dieser Session: {elapsed_hhmm}") report_lines.append(f"Neuer Status: {actual_status}") if actual_summary: report_lines.append("\nArbeitszusammenfassung:") report_lines.append(actual_summary) if actual_git_changes or actual_commit_messages: report_lines.append("\nTechnische Änderungen (Git):") if actual_git_changes: report_lines.append(f"```{actual_git_changes}```") if actual_commit_messages: report_lines.append("\nCommit Nachrichten:") report_lines.append(f"```{actual_commit_messages}```") if actual_todos: report_lines.append("\nOffene To-Dos / Nächste Schritte:") for todo_item in actual_todos.split('\n'): report_lines.append(f"- {todo_item.strip()}") report_content = "\n".join(report_lines) # Notion Blöcke für die API erstellen timestamp = datetime.now(BERLIN_TZ).strftime('%Y-%m-%d %H:%M') notion_blocks = [ { "object": "block", "type": "heading_2", "heading_2": { "rich_text": [{"type": "text", "text": {"content": f"🤖 Status-Update ({timestamp} Berlin Time)"}}] } }, { "object": "block", "type": "code", "code": { "rich_text": [{"type": "text", "text": {"content": report_content}}], "language": "yaml" } } ] # Notion aktualisieren append_blocks_to_notion_page(token, task_id, notion_blocks) status_payload = {"Status": {"status": {"name": actual_status}}} update_notion_task_property(token, task_id, status_payload) # --- Git Operationen --- print("\n--- Führe Git-Operationen aus ---") try: subprocess.run(["git", "add", "."], check=True) print("✅ Alle Änderungen gestaged (git add .).") # Prüfen, ob es Änderungen zum Committen gibt git_status_output = subprocess.run(["git", "status", "--porcelain"], capture_output=True, text=True, check=True).stdout.strip() if not git_status_output: print("⚠️ Keine Änderungen zum Committen gefunden. Überspringe git commit.") return # Beende die Funktion, da nichts zu tun ist # Commit-Nachricht erstellen commit_subject = actual_summary.splitlines()[0] if actual_summary else "Notion Status Update" commit_message = f"[{task_id.split('-')[0]}] {commit_subject}\n\n{actual_summary}" subprocess.run(["git", "commit", "-m", commit_message], check=True) print("✅ Git commit erfolgreich.") # Interaktive Abfrage für git push push_choice = input("\n✅ Commit erfolgreich erstellt. Sollen die Änderungen jetzt gepusht werden? (j/n): ").lower() if push_choice == 'j': git_push_with_retry() except subprocess.CalledProcessError as e: print(f"❌ FEHLER bei Git-Operationen: {e}") except Exception as e: print(f"❌ Unerwarteter Fehler bei Git-Operationen: {e}") except (FileNotFoundError, json.JSONDecodeError) as e: print(f"❌ FEHLER beim Lesen der Session-Informationen für Statusbericht: {e}") except Exception as e: print(f"❌ Unerwarteter Fehler beim Erstellen des Statusberichts: {e}") # --- Context Generation --- def generate_cli_context(project_title: str, task_title: str, task_id: str, readme_path: Optional[str], task_description: Optional[str], total_duration: float) -> str: """Erstellt den reinen Kontext-String für die Gemini CLI.""" # Fallback, falls kein Pfad in Notion gesetzt ist if not readme_path: readme_path = "readme.md" description_part = "" if task_description: description_part = ( f"\n**Aufgabenbeschreibung:**\n" f"```\n{task_description}\n```\n" ) duration_hhmm = decimal_hours_to_hhmm(total_duration) duration_part = f"Bisher erfasster Zeitaufwand (Notion): {duration_hhmm} Stunden.\n" context = ( f"Ich arbeite jetzt am Projekt '{project_title}'. Der Fokus liegt auf dem Task '{task_title}'.\n" f"{description_part}\n" f"{duration_part}" "Die relevanten Dateien für dieses Projekt sind wahrscheinlich:\n" "- Die primäre Projektdokumentation: @readme.md\n" f"- Die spezifische Dokumentation für dieses Modul: @{readme_path}\n\n" f"Mein Ziel ist es, den Task '{task_title}' umzusetzen. Alle Commits für diesen Task sollen die Kennung `[{task_id.split('-')[0]}]` enthalten.\n\n" "**WICHTIGER BEFEHL:** Bevor du mit der Implementierung oder einer Code-Änderung beginnst, fasse die Aufgabe in deinen eigenen Worten zusammen, erstelle einen detaillierten, schrittweisen Plan zur Lösung und **warte auf meine explizite Bestätigung**, bevor du den ersten Schritt ausführst." ) return context # --- CLI Execution --- # Die start_gemini_cli Funktion wird entfernt, da das aufrufende Skript jetzt die Gemini CLI startet. def save_session_info(task_id: str, token: str): """Speichert die Task-ID, den Token und den Startzeitpunkt für den Git-Hook.""" os.makedirs(SESSION_DIR, exist_ok=True) session_data = { "task_id": task_id, "token": token, "session_start_time": datetime.now().isoformat() } with open(SESSION_FILE_PATH, "w") as f: json.dump(session_data, f) def install_git_hook(): """Installiert das notion_commit_hook.py Skript als post-commit Git-Hook.""" pass def cleanup_session(): """Bereinigt die Session-Datei und den Git-Hook.""" if os.path.exists(SESSION_FILE_PATH): os.remove(SESSION_FILE_PATH) post_commit_hook_path = os.path.join(".git", "hooks", "post-commit") if os.path.exists(post_commit_hook_path): os.remove(post_commit_hook_path) print("🧹 Session-Daten und Git-Hook wurden bereinigt.") def complete_session(): """Schließt eine aktive Session ab.""" print("Schließe aktive Entwicklungs-Session ab...") if not os.path.exists(SESSION_FILE_PATH): print("Keine aktive Session gefunden.") return try: with open(SESSION_FILE_PATH, "r") as f: session_data = json.load(f) task_id = session_data.get("task_id") token = session_data.get("token") if task_id and token: # Annahme: Der letzte Status in der Liste ist der "Done"-Status tasks_db_id = find_database_by_title(token, "Tasks [UT]") if tasks_db_id: status_options = get_database_status_options(token, tasks_db_id) if status_options: done_status = status_options[-1] status_payload = {"Status": {"status": {"name": done_status}}} update_notion_task_property(token, task_id, status_payload) except (FileNotFoundError, json.JSONDecodeError): print("Fehler beim Lesen der Session-Informationen.") finally: cleanup_session() user_input = input("\nMöchtest du eine neue Session starten? (j/n): ").lower() if user_input == 'j': main() # Starte den interaktiven Prozess von vorne else: print("Auf Wiedersehen!") def start_interactive_session(): """Startet den Prozess zur Auswahl von Projekt/Task und startet Gemini.""" print("Starte interaktive Entwicklungs-Session...") token = os.environ.get('NOTION_API_KEY') if not token: token = getpass("Bitte gib deinen Notion API Key ein (Eingabe wird nicht angezeigt): ") if not token: print("Kein Token angegeben. Abbruch.") return selected_project, readme_path = select_project(token) if not selected_project: return project_title = get_page_title(selected_project) print(f"\nProjekt '{project_title}' ausgewählt.") tasks_db_id = find_database_by_title(token, "Tasks [UT]") if not tasks_db_id: return user_choice = select_task(token, selected_project["id"], tasks_db_id) if not user_choice: print("Kein Task ausgewählt. Abbruch.") return selected_task = None if user_choice.get("id") == "new_task": selected_task = create_new_notion_task(token, selected_project["id"], tasks_db_id) if not selected_task: return else: selected_task = user_choice task_title = get_page_title(selected_task) task_id = selected_task["id"] print(f"\nTask '{task_title}' ausgewählt.") # NEU: Lade die Task-Beschreibung und die bisherige Dauer task_description = get_page_content(token, task_id) total_duration_decimal = get_page_number_property(selected_task, "Total Duration (h)") or 0.0 total_duration_hhmm = decimal_hours_to_hhmm(total_duration_decimal) print(f"> Bisher für diesen Task erfasst: {total_duration_hhmm} Stunden.") # Session-Informationen für den Git-Hook speichern save_session_info(task_id, token) # Git-Hook installieren, der die Session-Infos nutzt install_git_hook() title_slug = re.sub(r'[^a-z0-9\s-]', '', task_title.lower()) title_slug = re.sub(r'\s+', '-', title_slug) title_slug = re.sub(r'-+', '-', title_slug).strip('-') suggested_branch_name = f"feature/task-{task_id.split('-')[0]}-{title_slug}" status_payload = {"Status": {"status": {"name": "Doing"}}} status_updated = update_notion_task_property(token, task_id, status_payload) if not status_updated: print("Warnung: Notion-Task-Status konnte nicht aktualisiert werden.") # Finale Setup-Informationen ausgeben print("\n------------------------------------------------------------------") print("✅ Setup abgeschlossen!") print(f"\nDer Notion-Task '{task_title}' wurde auf 'Doing' gesetzt.") print(f"\nBitte erstellen Sie jetzt manuell den Git-Branch in einem separaten Terminal:") print(f"git checkout -b {suggested_branch_name}") print("------------------------------------------------------------------") # CLI-Kontext generieren und an stdout ausgeben, damit das Startskript ihn aufgreifen kann cli_context = generate_cli_context(project_title, task_title, task_id, readme_path, task_description, total_duration_decimal) print("\n---GEMINI_CLI_CONTEXT_START---") print(cli_context) print("---GEMINI_CLI_CONTEXT_END---") # Das Skript beenden, damit das aufrufende Shell-Skript die Gemini CLI starten kann exit(0) # --- Main Execution --- def main(): """Hauptfunktion des Skripts.""" parser = argparse.ArgumentParser(description="Interaktiver Session-Manager für die Gemini-Entwicklung mit Notion-Integration.") parser.add_argument("--done", action="store_true", help="Schließt die aktuelle Entwicklungs-Session ab.") parser.add_argument("--add-comment", type=str, help="Fügt einen Kommentar zum aktuellen Notion-Task hinzu.") parser.add_argument("--report-status", action="store_true", help="Erstellt einen Statusbericht für den Notion-Task.") parser.add_argument("--status", type=str, help="Status, der im Notion-Task gesetzt werden soll (z.B. 'In Bearbeitung', 'Bereit für Review').") parser.add_argument("--todos", type=str, help="Eine durch '\n' getrennte Liste offener To-Dos.") parser.add_argument("--git-changes", type=str, help="Zusammenfassung der Git-Änderungen (git diff --stat).") parser.add_argument("--commit-messages", type=str, help="Eine durch '\n' getrennte Liste der Commit-Nachrichten.") parser.add_argument("--summary", type=str, help="Eine detaillierte textuelle Zusammenfassung der erledigten Arbeit.") args = parser.parse_args() if args.done: complete_session() elif args.add_comment: if not os.path.exists(SESSION_FILE_PATH): print("❌ FEHLER: Keine aktive Session gefunden. Kann keinen Kommentar hinzufügen.") return try: with open(SESSION_FILE_PATH, "r") as f: session_data = json.load(f) task_id = session_data.get("task_id") token = session_data.get("token") if task_id and token: add_comment_to_notion_task(token, task_id, args.add_comment) else: print("❌ FEHLER: Session-Daten unvollständig. Kann keinen Kommentar hinzufügen.") except (FileNotFoundError, json.JSONDecodeError): print("❌ FEHLER: Fehler beim Lesen der Session-Informationen. Kann keinen Kommentar hinzufügen.") elif args.report_status: report_status_to_notion( status_override=args.status, todos_override=args.todos, git_changes_override=args.git_changes, commit_messages_override=args.commit_messages, summary_override=args.summary ) else: start_interactive_session() if __name__ == "__main__": main()