This commit introduces a containerized workflow for the development session manager (dev_session.py). - Dockerization: Added gemini.Dockerfile to create a self-contained environment with all Python dependencies, removing the need for manual host setup. - Start Script: Updated start-gemini.sh to build and run the container, executing dev_session.py as the entrypoint. - Session Management: Implemented --done flag to properly terminate a session, update the Notion task status, and clean up the git hook. - Notion Integration: Created and integrated notion_commit_hook.py which is automatically installed/uninstalled to post commit messages to the active Notion task. - Documentation: Updated README_dev_session.md to reflect the new container-based setup, usage, and features.
550 lines
20 KiB
Python
550 lines
20 KiB
Python
import os
|
|
import requests
|
|
import json
|
|
import re
|
|
from typing import List, Dict, Optional, Tuple
|
|
from getpass import getpass
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv()
|
|
|
|
# --- API Helper Functions ---
|
|
|
|
def find_database_by_title(token: str, title: str) -> Optional[str]:
|
|
"""Sucht nach einer Datenbank mit einem bestimmten Titel und gibt deren ID zurück."""
|
|
print(f"Suche nach Datenbank '{title}' in Notion...")
|
|
url = "https://api.notion.com/v1/search"
|
|
headers = {
|
|
"Authorization": f"Bearer {token}",
|
|
"Content-Type": "application/json",
|
|
"Notion-Version": "2022-06-28"
|
|
}
|
|
payload = {
|
|
"query": title,
|
|
"filter": {"value": "database", "property": "object"}
|
|
}
|
|
|
|
try:
|
|
response = requests.post(url, headers=headers, json=payload)
|
|
response.raise_for_status()
|
|
results = response.json().get("results", [])
|
|
|
|
for db in results:
|
|
db_title_parts = db.get("title", [])
|
|
if db_title_parts:
|
|
db_title = db_title_parts[0].get("plain_text", "")
|
|
if db_title.lower() == title.lower():
|
|
db_id = db["id"]
|
|
print(f"Datenbank '{title}' gefunden mit ID: {db_id}")
|
|
return db_id
|
|
|
|
print(f"Fehler: Keine Datenbank mit dem exakten Titel '{title}' gefunden.")
|
|
return None
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"Fehler bei der Suche nach der Notion-Datenbank '{title}': {e}")
|
|
try:
|
|
print(f"Antwort des Servers: {json.dumps(e.response.json(), indent=2)}")
|
|
except:
|
|
print(f"Antwort des Servers: {e.response.text}")
|
|
return None
|
|
|
|
def query_notion_database(token: str, database_id: str, filter_payload: Dict = None) -> List[Dict]:
|
|
"""Fragt eine Notion-Datenbank ab und gibt eine Liste der Seiten zurück."""
|
|
url = f"https://api.notion.com/v1/databases/{database_id}/query"
|
|
headers = {
|
|
"Authorization": f"Bearer {token}",
|
|
"Content-Type": "application/json",
|
|
"Notion-Version": "2022-06-28"
|
|
}
|
|
|
|
payload = {}
|
|
if filter_payload:
|
|
payload["filter"] = filter_payload
|
|
|
|
try:
|
|
response = requests.post(url, headers=headers, json=payload)
|
|
response.raise_for_status()
|
|
return response.json().get("results", [])
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"Fehler bei der Abfrage der Notion-Datenbank {database_id}: {e}")
|
|
try:
|
|
print(f"Antwort des Servers: {json.dumps(e.response.json(), indent=2)}")
|
|
except:
|
|
print(f"Antwort des Servers: {e.response.text}")
|
|
return []
|
|
|
|
def get_page_title(page: Dict) -> str:
|
|
"""Extrahiert den Titel einer Notion-Seite."""
|
|
for prop_value in page.get("properties", {}).values():
|
|
if prop_value.get("type") == "title":
|
|
title_parts = prop_value.get("title", [])
|
|
if title_parts:
|
|
return title_parts[0].get("plain_text", "Unbenannt")
|
|
return "Unbenannt"
|
|
|
|
def get_page_property(page: Dict, prop_name: str, prop_type: str = "rich_text") -> Optional[str]:
|
|
"""Extrahiert den Inhalt einer bestimmten Eigenschaft (Property) von einer Seite."""
|
|
prop = page.get("properties", {}).get(prop_name)
|
|
if not prop:
|
|
return None
|
|
|
|
if prop_type == "rich_text" and prop.get("type") == "rich_text":
|
|
text_parts = prop.get("rich_text", [])
|
|
if text_parts:
|
|
return text_parts[0].get("plain_text")
|
|
|
|
# Hier könnten weitere Typen wie 'select', 'number' etc. behandelt werden
|
|
return None
|
|
|
|
def get_database_status_options(token: str, db_id: str) -> List[str]:
|
|
"""Ruft die verfügbaren Status-Optionen für eine Datenbank-Eigenschaft ab."""
|
|
url = f"https://api.notion.com/v1/databases/{db_id}"
|
|
headers = {
|
|
"Authorization": f"Bearer {token}",
|
|
"Notion-Version": "2022-06-28"
|
|
}
|
|
try:
|
|
response = requests.get(url, headers=headers)
|
|
response.raise_for_status()
|
|
properties = response.json().get("properties", {})
|
|
status_property = properties.get("Status")
|
|
if status_property and status_property["type"] == "status":
|
|
return [option["name"] for option in status_property["status"]["options"]]
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"Fehler beim Abrufen der Datenbank-Eigenschaften: {e}")
|
|
return []
|
|
|
|
def update_notion_task_status(token: str, task_id: str, status_value: str = "Doing") -> bool:
|
|
"""Aktualisiert den Status eines Notion-Tasks."""
|
|
print(f"\n--- Aktualisiere Status von Task '{task_id}' auf '{status_value}'... ---")
|
|
url = f"https://api.notion.com/v1/pages/{task_id}"
|
|
headers = {
|
|
"Authorization": f"Bearer {token}",
|
|
"Content-Type": "application/json",
|
|
"Notion-Version": "2022-06-28"
|
|
}
|
|
payload = {
|
|
"properties": {
|
|
"Status": {
|
|
"status": {
|
|
"name": status_value
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
try:
|
|
response = requests.patch(url, headers=headers, json=payload)
|
|
response.raise_for_status()
|
|
print(f"✅ Task-Status erfolgreich auf '{status_value}' aktualisiert.")
|
|
return True
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"❌ FEHLER beim Aktualisieren des Task-Status: {e}")
|
|
try:
|
|
print(f"Antwort des Servers: {json.dumps(e.response.json(), indent=2)}")
|
|
except:
|
|
print(f"Antwort des Servers: {e.response.text}")
|
|
return False
|
|
|
|
def create_new_notion_task(token: str, project_id: str, tasks_db_id: str) -> Optional[Dict]:
|
|
"""Erstellt einen neuen Task in Notion und verknüpft ihn mit dem Projekt."""
|
|
print("\n--- Neuen Task erstellen ---")
|
|
task_title = input("Bitte gib den Titel für den neuen Task ein: ")
|
|
if not task_title:
|
|
print("Kein Titel angegeben. Abbruch.")
|
|
return None
|
|
|
|
status_options = get_database_status_options(token, tasks_db_id)
|
|
if not status_options:
|
|
print("Fehler: Konnte keine Status-Optionen für die Task-Datenbank finden.")
|
|
return None
|
|
|
|
initial_status = status_options[0] # Nimm den ersten verfügbaren Status
|
|
|
|
url = "https://api.notion.com/v1/pages"
|
|
headers = {
|
|
"Authorization": f"Bearer {token}",
|
|
"Content-Type": "application/json",
|
|
"Notion-Version": "2022-06-28"
|
|
}
|
|
payload = {
|
|
"parent": {"database_id": tasks_db_id},
|
|
"properties": {
|
|
"Name": {
|
|
"title": [{"text": {"content": task_title}}]
|
|
},
|
|
"Project": {
|
|
"relation": [{"id": project_id}]
|
|
},
|
|
"Status": {
|
|
"status": {"name": initial_status}
|
|
}
|
|
}
|
|
}
|
|
|
|
try:
|
|
response = requests.post(url, headers=headers, json=payload)
|
|
response.raise_for_status()
|
|
new_task = response.json()
|
|
print(f"✅ Task '{task_title}' erfolgreich erstellt.")
|
|
return new_task
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"❌ FEHLER beim Erstellen des Tasks: {e}")
|
|
try:
|
|
print(f"Antwort des Servers: {json.dumps(e.response.json(), indent=2)}")
|
|
except:
|
|
print(f"Antwort des Servers: {e.response.text}")
|
|
return None
|
|
|
|
def add_comment_to_notion_task(token: str, task_id: str, comment: str) -> bool:
|
|
"""Fügt einen Kommentar zu einer Notion-Seite (Task) hinzu."""
|
|
url = "https://api.notion.com/v1/comments"
|
|
headers = {
|
|
"Authorization": f"Bearer {token}",
|
|
"Content-Type": "application/json",
|
|
"Notion-Version": "2022-06-28"
|
|
}
|
|
payload = {
|
|
"parent": {"page_id": task_id},
|
|
"rich_text": [{
|
|
"text": {
|
|
"content": comment
|
|
}
|
|
}]
|
|
}
|
|
try:
|
|
response = requests.post(url, headers=headers, json=payload)
|
|
response.raise_for_status()
|
|
# Kein print, da dies vom Git-Hook im Hintergrund aufgerufen wird
|
|
return True
|
|
except requests.exceptions.RequestException:
|
|
# Fehler unterdrücken, um den Commit-Prozess nicht zu blockieren
|
|
return False
|
|
|
|
# --- Session Management ---
|
|
|
|
SESSION_DIR = ".dev_session"
|
|
SESSION_FILE_PATH = os.path.join(SESSION_DIR, "SESSION_INFO")
|
|
|
|
def save_session_info(task_id: str, token: str):
|
|
"""Speichert die Task-ID und den Token für den Git-Hook."""
|
|
os.makedirs(SESSION_DIR, exist_ok=True)
|
|
session_data = {
|
|
"task_id": task_id,
|
|
"token": token
|
|
}
|
|
with open(SESSION_FILE_PATH, "w") as f:
|
|
json.dump(session_data, f)
|
|
|
|
def install_git_hook():
|
|
"""Installiert das notion_commit_hook.py Skript als post-commit Git-Hook."""
|
|
git_hooks_dir = os.path.join(".git", "hooks")
|
|
post_commit_hook_path = os.path.join(git_hooks_dir, "post-commit")
|
|
source_hook_script = "notion_commit_hook.py"
|
|
|
|
if not os.path.exists(git_hooks_dir):
|
|
# Wahrscheinlich kein Git-Repository, also nichts tun
|
|
return
|
|
|
|
if not os.path.exists(source_hook_script):
|
|
print(f"Warnung: Hook-Skript {source_hook_script} nicht gefunden. Hook wird nicht installiert.")
|
|
return
|
|
|
|
try:
|
|
# Lese den Inhalt des Quellskripts
|
|
with open(source_hook_script, "r") as f:
|
|
hook_content = f.read()
|
|
|
|
# Schreibe den Inhalt in den post-commit Hook
|
|
with open(post_commit_hook_path, "w") as f:
|
|
f.write(hook_content)
|
|
|
|
# Mache den Hook ausführbar
|
|
os.chmod(post_commit_hook_path, 0o755)
|
|
print("✅ Git-Hook für Notion-Kommentare erfolgreich installiert.")
|
|
|
|
except (IOError, OSError) as e:
|
|
print(f"❌ FEHLER beim Installieren des Git-Hooks: {e}")
|
|
|
|
# --- UI Functions ---
|
|
|
|
def select_project(token: str) -> Optional[Tuple[Dict, Optional[str]]]:
|
|
"""Zeigt eine Liste der Projekte an und lässt den Benutzer eines auswählen."""
|
|
print("--- Lade Projekte aus Notion... ---")
|
|
|
|
projects_db_id = find_database_by_title(token, "Projects [UT]")
|
|
if not projects_db_id:
|
|
return None, None
|
|
|
|
projects = query_notion_database(token, projects_db_id)
|
|
if not projects:
|
|
print("Keine Projekte in der Datenbank gefunden.")
|
|
return None, None
|
|
|
|
print("\nAn welchem Projekt möchtest du arbeiten?")
|
|
for i, project in enumerate(projects):
|
|
print(f"[{i+1}] {get_page_title(project)}")
|
|
|
|
while True:
|
|
try:
|
|
choice = int(input("Bitte wähle eine Nummer: "))
|
|
if 1 <= choice <= len(projects):
|
|
selected_project = projects[choice - 1]
|
|
readme_path = get_page_property(selected_project, "Readme Path")
|
|
return selected_project, readme_path
|
|
else:
|
|
print("Ungültige Auswahl.")
|
|
except ValueError:
|
|
print("Ungültige Eingabe. Bitte eine Zahl eingeben.")
|
|
|
|
def select_task(token: str, project_id: str, tasks_db_id: str) -> Optional[Dict]:
|
|
"""Zeigt eine Liste der Tasks für ein Projekt an und lässt den Benutzer auswählen."""
|
|
print("\n--- Lade Tasks für das ausgewählte Projekt... ---")
|
|
|
|
filter_payload = {
|
|
"property": "Project",
|
|
"relation": {"contains": project_id}
|
|
}
|
|
tasks = query_notion_database(token, tasks_db_id, filter_payload=filter_payload)
|
|
|
|
if not tasks:
|
|
print("Keine offenen Tasks für dieses Projekt gefunden.")
|
|
else:
|
|
print("\nWelchen Task möchtest du bearbeiten?")
|
|
for i, task in enumerate(tasks):
|
|
print(f"[{i+1}] {get_page_title(task)}")
|
|
|
|
print(f"[{len(tasks)+1}] Neuen Task für dieses Projekt erstellen")
|
|
|
|
while True:
|
|
try:
|
|
choice = int(input("Bitte wähle eine Nummer: "))
|
|
if 1 <= choice <= len(tasks):
|
|
return tasks[choice - 1]
|
|
elif choice == len(tasks) + 1:
|
|
return {"id": "new_task"} # Signal
|
|
else:
|
|
print("Ungültige Auswahl.")
|
|
except ValueError:
|
|
print("Ungültige Eingabe. Bitte eine Zahl eingeben.")
|
|
|
|
import subprocess
|
|
|
|
# --- Context Generation ---
|
|
|
|
def generate_cli_context(project_title: str, task_title: str, task_id: str, readme_path: Optional[str]) -> str:
|
|
"""Erstellt den reinen Kontext-String für die Gemini CLI."""
|
|
|
|
# Fallback, falls kein Pfad in Notion gesetzt ist
|
|
if not readme_path:
|
|
readme_path = "readme.md"
|
|
|
|
context = (
|
|
f"Ich arbeite jetzt am Projekt '{project_title}'. Der Fokus liegt auf dem Task '{task_title}'.\n\n"
|
|
"Die relevanten Dateien für dieses Projekt sind wahrscheinlich:\n"
|
|
"- Die primäre Projektdokumentation: @readme.md\n"
|
|
f"- Die spezifische Dokumentation für dieses Modul: @{readme_path}\n"
|
|
f"- Der Haupt-Code befindet sich wahrscheinlich in: @dev_session.py\n\n"
|
|
f"Mein Ziel ist es, den Task '{task_title}' umzusetzen. Alle Commits für diesen Task sollen die Kennung `[{task_id.split('-')[0]}]` enthalten."
|
|
)
|
|
return context
|
|
|
|
# --- CLI Execution ---
|
|
|
|
def start_gemini_cli(initial_context: str):
|
|
"""Startet die Gemini CLI als interaktiven Subprozess und übergibt den Startkontext."""
|
|
print("\n--- Übergebe an Gemini CLI... ---")
|
|
print("Die Gemini CLI wird jetzt gestartet. Sie können direkt mit der Arbeit beginnen.")
|
|
|
|
try:
|
|
# Der Befehl, um die Gemini CLI zu starten
|
|
command = ["gemini"]
|
|
|
|
# Starte den Prozess
|
|
# stdin=subprocess.PIPE, um den Kontext zu senden
|
|
# text=True für String-basierte Ein-/Ausgabe
|
|
process = subprocess.Popen(command, stdin=subprocess.PIPE, text=True)
|
|
|
|
# Schreibe den initialen Kontext in die Standard-Eingabe der CLI
|
|
# Ein Zeilenumbruch am Ende simuliert das Drücken der Enter-Taste.
|
|
process.communicate(input=initial_context + '\n')
|
|
|
|
except FileNotFoundError:
|
|
print("\n❌ FEHLER: Der 'gemini'-Befehl wurde nicht gefunden.")
|
|
print("Stellen Sie sicher, dass die Gemini CLI korrekt im Container installiert und im PATH verfügbar ist.")
|
|
except Exception as e:
|
|
print(f"\n❌ Ein unerwarteter Fehler ist beim Starten der Gemini CLI aufgetreten: {e}")
|
|
|
|
import shutil
|
|
import argparse
|
|
|
|
# --- Session Management ---
|
|
|
|
SESSION_DIR = ".dev_session"
|
|
SESSION_FILE_PATH = os.path.join(SESSION_DIR, "SESSION_INFO")
|
|
|
|
def save_session_info(task_id: str, token: str):
|
|
"""Speichert die Task-ID und den Token für den Git-Hook."""
|
|
os.makedirs(SESSION_DIR, exist_ok=True)
|
|
session_data = {
|
|
"task_id": task_id,
|
|
"token": token
|
|
}
|
|
with open(SESSION_FILE_PATH, "w") as f:
|
|
json.dump(session_data, f)
|
|
|
|
def install_git_hook():
|
|
"""Installiert das notion_commit_hook.py Skript als post-commit Git-Hook."""
|
|
git_hooks_dir = os.path.join(".git", "hooks")
|
|
post_commit_hook_path = os.path.join(git_hooks_dir, "post-commit")
|
|
source_hook_script = "notion_commit_hook.py"
|
|
|
|
if not os.path.exists(git_hooks_dir):
|
|
# Wahrscheinlich kein Git-Repository, also nichts tun
|
|
return
|
|
|
|
if not os.path.exists(source_hook_script):
|
|
print(f"Warnung: Hook-Skript {source_hook_script} nicht gefunden. Hook wird nicht installiert.")
|
|
return
|
|
|
|
try:
|
|
# Kopiere das Skript und mache es ausführbar
|
|
shutil.copy(source_hook_script, post_commit_hook_path)
|
|
os.chmod(post_commit_hook_path, 0o755)
|
|
print("✅ Git-Hook für Notion-Kommentare erfolgreich installiert.")
|
|
|
|
except (IOError, OSError) as e:
|
|
print(f"❌ FEHLER beim Installieren des Git-Hooks: {e}")
|
|
|
|
def cleanup_session():
|
|
"""Bereinigt die Session-Datei und den Git-Hook."""
|
|
if os.path.exists(SESSION_FILE_PATH):
|
|
os.remove(SESSION_FILE_PATH)
|
|
|
|
post_commit_hook_path = os.path.join(".git", "hooks", "post-commit")
|
|
if os.path.exists(post_commit_hook_path):
|
|
os.remove(post_commit_hook_path)
|
|
|
|
print("🧹 Session-Daten und Git-Hook wurden bereinigt.")
|
|
|
|
def complete_session():
|
|
"""Schließt eine aktive Session ab."""
|
|
print("Schließe aktive Entwicklungs-Session ab...")
|
|
if not os.path.exists(SESSION_FILE_PATH):
|
|
print("Keine aktive Session gefunden.")
|
|
return
|
|
|
|
try:
|
|
with open(SESSION_FILE_PATH, "r") as f:
|
|
session_data = json.load(f)
|
|
task_id = session_data.get("task_id")
|
|
token = session_data.get("token")
|
|
|
|
if task_id and token:
|
|
# Annahme: Der letzte Status in der Liste ist der "Done"-Status
|
|
tasks_db_id = find_database_by_title(token, "Tasks [UT]")
|
|
if tasks_db_id:
|
|
status_options = get_database_status_options(token, tasks_db_id)
|
|
if status_options:
|
|
done_status = status_options[-1]
|
|
update_notion_task_status(token, task_id, done_status)
|
|
|
|
except (FileNotFoundError, json.JSONDecodeError):
|
|
print("Fehler beim Lesen der Session-Informationen.")
|
|
|
|
finally:
|
|
cleanup_session()
|
|
|
|
user_input = input("\nMöchtest du eine neue Session starten? (j/n): ").lower()
|
|
if user_input == 'j':
|
|
main() # Starte den interaktiven Prozess von vorne
|
|
else:
|
|
print("Auf Wiedersehen!")
|
|
|
|
def start_interactive_session():
|
|
"""Startet den Prozess zur Auswahl von Projekt/Task und startet Gemini."""
|
|
print("Starte interaktive Entwicklungs-Session...")
|
|
|
|
token = os.environ.get('NOTION_API_KEY')
|
|
if not token:
|
|
token = getpass("Bitte gib deinen Notion API Key ein (Eingabe wird nicht angezeigt): ")
|
|
if not token:
|
|
print("Kein Token angegeben. Abbruch.")
|
|
return
|
|
|
|
selected_project, readme_path = select_project(token)
|
|
if not selected_project:
|
|
return
|
|
|
|
project_title = get_page_title(selected_project)
|
|
print(f"\nProjekt '{project_title}' ausgewählt.")
|
|
|
|
tasks_db_id = find_database_by_title(token, "Tasks [UT]")
|
|
if not tasks_db_id:
|
|
return
|
|
|
|
user_choice = select_task(token, selected_project["id"], tasks_db_id)
|
|
|
|
if not user_choice:
|
|
print("Kein Task ausgewählt. Abbruch.")
|
|
return
|
|
|
|
selected_task = None
|
|
if user_choice.get("id") == "new_task":
|
|
selected_task = create_new_notion_task(token, selected_project["id"], tasks_db_id)
|
|
if not selected_task:
|
|
return
|
|
else:
|
|
selected_task = user_choice
|
|
|
|
task_title = get_page_title(selected_task)
|
|
task_id = selected_task["id"]
|
|
print(f"\nTask '{task_title}' ausgewählt.")
|
|
|
|
# Session-Informationen für den Git-Hook speichern
|
|
save_session_info(task_id, token)
|
|
|
|
# Git-Hook installieren, der die Session-Infos nutzt
|
|
install_git_hook()
|
|
|
|
title_slug = re.sub(r'[^a-z0-9\s-]', '', task_title.lower())
|
|
title_slug = re.sub(r'\s+', '-', title_slug)
|
|
title_slug = re.sub(r'-+', '-', title_slug).strip('-')
|
|
|
|
suggested_branch_name = f"feature/task-{task_id.split('-')[0]}-{title_slug}"
|
|
|
|
status_updated = update_notion_task_status(token, task_id, "Doing")
|
|
if not status_updated:
|
|
print("Warnung: Notion-Task-Status konnte nicht aktualisiert werden.")
|
|
|
|
# Finale Setup-Informationen ausgeben
|
|
print("\n------------------------------------------------------------------")
|
|
print("✅ Setup abgeschlossen!")
|
|
print(f"\nDer Notion-Task '{task_title}' wurde auf 'Doing' gesetzt.")
|
|
print(f"\nBitte erstellen Sie jetzt manuell den Git-Branch in einem separaten Terminal:")
|
|
print(f"git checkout -b {suggested_branch_name}")
|
|
print("------------------------------------------------------------------")
|
|
|
|
# CLI-Kontext generieren und die interaktive Session starten
|
|
cli_context = generate_cli_context(project_title, task_title, task_id, readme_path)
|
|
start_gemini_cli(cli_context)
|
|
|
|
|
|
# --- Main Execution ---
|
|
|
|
def main():
|
|
"""Hauptfunktion des Skripts."""
|
|
parser = argparse.ArgumentParser(description="Interaktiver Session-Manager für die Gemini-Entwicklung mit Notion-Integration.")
|
|
parser.add_argument("--done", action="store_true", help="Schließt die aktuelle Entwicklungs-Session ab.")
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.done:
|
|
complete_session()
|
|
else:
|
|
start_interactive_session()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |