Implemented a full time tracking feature. The system now displays the previously recorded time in hh:mm format when a session starts. When a work unit is completed, the invested time is automatically calculated, added to the total in Notion, and included in the status report. Various bugs were fixed during this process.
918 lines
37 KiB
Python
918 lines
37 KiB
Python
import os
|
|
import requests
|
|
import json
|
|
import re
|
|
from typing import List, Dict, Optional, Tuple
|
|
from getpass import getpass
|
|
from dotenv import load_dotenv
|
|
import argparse
|
|
import shutil
|
|
|
|
load_dotenv()
|
|
|
|
# --- API Helper Functions ---
|
|
|
|
def find_database_by_title(token: str, title: str) -> Optional[str]:
|
|
"""Sucht nach einer Datenbank mit einem bestimmten Titel und gibt deren ID zurück."""
|
|
print(f"Suche nach Datenbank '{title}' in Notion...")
|
|
url = "https://api.notion.com/v1/search"
|
|
headers = {
|
|
"Authorization": f"Bearer {token}",
|
|
"Content-Type": "application/json",
|
|
"Notion-Version": "2022-06-28"
|
|
}
|
|
payload = {
|
|
"query": title,
|
|
"filter": {"value": "database", "property": "object"}
|
|
}
|
|
|
|
try:
|
|
response = requests.post(url, headers=headers, json=payload)
|
|
response.raise_for_status()
|
|
results = response.json().get("results", [])
|
|
|
|
for db in results:
|
|
db_title_parts = db.get("title", [])
|
|
if db_title_parts:
|
|
db_title = db_title_parts[0].get("plain_text", "")
|
|
if db_title.lower() == title.lower():
|
|
db_id = db["id"]
|
|
print(f"Datenbank '{title}' gefunden mit ID: {db_id}")
|
|
return db_id
|
|
|
|
print(f"Fehler: Keine Datenbank mit dem exakten Titel '{title}' gefunden.")
|
|
return None
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"Fehler bei der Suche nach der Notion-Datenbank '{title}': {e}")
|
|
if e.response is not None:
|
|
try:
|
|
print(f"Antwort des Servers: {json.dumps(e.response.json(), indent=2)}")
|
|
except:
|
|
print(f"Antwort des Servers: {e.response.text}")
|
|
return None
|
|
|
|
def query_notion_database(token: str, database_id: str, filter_payload: Dict = None) -> List[Dict]:
|
|
"""Fragt eine Notion-Datenbank ab und gibt eine Liste der Seiten zurück."""
|
|
url = f"https://api.notion.com/v1/databases/{database_id}/query"
|
|
headers = {
|
|
"Authorization": f"Bearer {token}",
|
|
"Content-Type": "application/json",
|
|
"Notion-Version": "2022-06-28"
|
|
}
|
|
|
|
payload = {}
|
|
if filter_payload:
|
|
payload["filter"] = filter_payload
|
|
|
|
try:
|
|
response = requests.post(url, headers=headers, json=payload)
|
|
response.raise_for_status()
|
|
return response.json().get("results", [])
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"Fehler bei der Abfrage der Notion-Datenbank {database_id}: {e}")
|
|
if e.response is not None:
|
|
try:
|
|
print(f"Antwort des Servers: {json.dumps(e.response.json(), indent=2)}")
|
|
except:
|
|
print(f"Antwort des Servers: {e.response.text}")
|
|
return []
|
|
|
|
def get_page_title(page: Dict) -> str:
|
|
"""Extrahiert den Titel einer Notion-Seite."""
|
|
for prop_value in page.get("properties", {}).values():
|
|
if prop_value.get("type") == "title":
|
|
title_parts = prop_value.get("title", [])
|
|
if title_parts:
|
|
return title_parts[0].get("plain_text", "Unbenannt")
|
|
return "Unbenannt"
|
|
|
|
def get_page_property(page: Dict, prop_name: str, prop_type: str = "rich_text") -> Optional[str]:
|
|
"""Extrahiert den Inhalt einer bestimmten Eigenschaft (Property) von einer Seite."""
|
|
prop = page.get("properties", {}).get(prop_name)
|
|
if not prop:
|
|
return None
|
|
|
|
if prop_type == "rich_text" and prop.get("type") == "rich_text":
|
|
text_parts = prop.get("rich_text", [])
|
|
if text_parts:
|
|
return text_parts[0].get("plain_text")
|
|
|
|
# Hier könnten weitere Typen wie 'select', 'number' etc. behandelt werden
|
|
return None
|
|
|
|
def get_page_number_property(page: Dict, prop_name: str) -> Optional[float]:
|
|
"""Extrahiert den Wert einer 'number'-Eigenschaft von einer Seite."""
|
|
prop = page.get("properties", {}).get(prop_name)
|
|
if not prop or prop.get("type") != "number":
|
|
return None
|
|
return prop.get("number")
|
|
|
|
def decimal_hours_to_hhmm(decimal_hours: float) -> str:
|
|
"""Wandelt Dezimalstunden in das Format 'HH:MM' um."""
|
|
if decimal_hours is None:
|
|
return "00:00"
|
|
hours = int(decimal_hours)
|
|
minutes = int((decimal_hours * 60) % 60)
|
|
return f"{hours:02d}:{minutes:02d}"
|
|
|
|
|
|
def get_page_content(token: str, page_id: str) -> str:
|
|
"""Ruft den gesamten Textinhalt einer Notion-Seite ab, indem es die Blöcke zusammenfügt, mit Paginierung."""
|
|
url = f"https://api.notion.com/v1/blocks/{page_id}/children"
|
|
headers = {
|
|
"Authorization": f"Bearer {token}",
|
|
"Content-Type": "application/json",
|
|
"Notion-Version": "2022-06-28"
|
|
}
|
|
full_text = []
|
|
next_cursor = None
|
|
has_more = True
|
|
|
|
try:
|
|
while has_more:
|
|
params = {"page_size": 100} # Max page size
|
|
if next_cursor:
|
|
params["start_cursor"] = next_cursor
|
|
|
|
response = requests.get(url, headers=headers, params=params)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
blocks = data.get("results", [])
|
|
|
|
for block in blocks:
|
|
block_type = block["type"]
|
|
text_content = ""
|
|
|
|
if block_type in ["paragraph", "heading_1", "heading_2", "heading_3",
|
|
"bulleted_list_item", "numbered_list_item", "to_do", "callout"]:
|
|
rich_text_array = block[block_type].get("rich_text", [])
|
|
for rich_text in rich_text_array:
|
|
text_content += rich_text.get("plain_text", "")
|
|
elif block_type == "code":
|
|
rich_text_array = block["code"].get("rich_text", [])
|
|
for rich_text in rich_text_array:
|
|
text_content += rich_text.get("plain_text", "")
|
|
text_content = f"```\n{text_content}\n```" # Markdown für Codeblöcke
|
|
elif block_type == "unsupported":
|
|
text_content = "[Unsupported Block Type]"
|
|
|
|
if text_content:
|
|
# Füge grundlegende Formatierung für bessere Lesbarkeit hinzu
|
|
if block_type == "heading_1":
|
|
full_text.append(f"# {text_content}")
|
|
elif block_type == "heading_2":
|
|
full_text.append(f"## {text_content}")
|
|
elif block_type == "heading_3":
|
|
full_text.append(f"### {text_content}")
|
|
elif block_type == "bulleted_list_item":
|
|
full_text.append(f"- {text_content}")
|
|
elif block_type == "numbered_list_item":
|
|
full_text.append(f"1. {text_content}") # Einfache Nummerierung
|
|
elif block_type == "to_do":
|
|
checked = "[x]" if block["to_do"].get("checked") else "[ ]"
|
|
full_text.append(f"{checked} {text_content}")
|
|
elif block_type == "callout":
|
|
# Extrahiere Icon und Text
|
|
icon = block["callout"].get("icon", {}).get("emoji", "")
|
|
full_text.append(f"> {icon} {text_content}")
|
|
else: # paragraph und andere Standardtexte
|
|
full_text.append(text_content)
|
|
|
|
next_cursor = data.get("next_cursor")
|
|
has_more = data.get("has_more", False) and next_cursor
|
|
|
|
return "\n".join(full_text)
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"Fehler beim Abrufen des Seiteninhalts für Page-ID {page_id}: {e}")
|
|
if e.response is not None:
|
|
try:
|
|
if e.response: # Wenn eine Antwort vorhanden ist
|
|
error_details = e.response.json()
|
|
print(f"Notion API Fehlerdetails: {json.dumps(error_details, indent=2)}")
|
|
except json.JSONDecodeError:
|
|
print(f"Notion API Rohantwort: {e.response.text}")
|
|
return ""
|
|
|
|
|
|
|
|
def get_database_status_options(token: str, db_id: str) -> List[str]:
|
|
"""Ruft die verfügbaren Status-Optionen für eine Datenbank-Eigenschaft ab."""
|
|
url = f"https://api.notion.com/v1/databases/{db_id}"
|
|
headers = {
|
|
"Authorization": f"Bearer {token}",
|
|
"Notion-Version": "2022-06-28"
|
|
}
|
|
try:
|
|
response = requests.get(url, headers=headers)
|
|
response.raise_for_status()
|
|
properties = response.json().get("properties", {})
|
|
status_property = properties.get("Status")
|
|
if status_property and status_property["type"] == "status":
|
|
return [option["name"] for option in status_property["status"]["options"]]
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"Fehler beim Abrufen der Datenbank-Eigenschaften: {e}")
|
|
return []
|
|
|
|
def update_notion_task_property(token: str, task_id: str, payload: Dict) -> bool:
|
|
"""Aktualisiert eine beliebige Eigenschaft eines Notion-Tasks."""
|
|
print(f"\n--- Aktualisiere Eigenschaft von Task '{task_id}'... ---")
|
|
url = f"https://api.notion.com/v1/pages/{task_id}"
|
|
headers = {
|
|
"Authorization": f"Bearer {token}",
|
|
"Content-Type": "application/json",
|
|
"Notion-Version": "2022-06-28"
|
|
}
|
|
|
|
update_payload = {"properties": payload}
|
|
|
|
try:
|
|
response = requests.patch(url, headers=headers, json=update_payload)
|
|
response.raise_for_status()
|
|
print(f"✅ Task-Eigenschaft erfolgreich aktualisiert.")
|
|
return True
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"❌ FEHLER beim Aktualisieren der Task-Eigenschaft: {e}")
|
|
if e.response is not None:
|
|
try:
|
|
print(f"Antwort des Servers: {json.dumps(e.response.json(), indent=2)}")
|
|
except:
|
|
print(f"Antwort des Servers: {e.response.text}")
|
|
return False
|
|
|
|
def create_new_notion_task(token: str, project_id: str, tasks_db_id: str) -> Optional[Dict]:
|
|
"""Erstellt einen neuen Task in Notion und verknüpft ihn mit dem Projekt."""
|
|
print("\n--- Neuen Task erstellen ---")
|
|
task_title = input("Bitte gib den Titel für den neuen Task ein: ")
|
|
if not task_title:
|
|
print("Kein Titel angegeben. Abbruch.")
|
|
return None
|
|
|
|
status_options = get_database_status_options(token, tasks_db_id)
|
|
if not status_options:
|
|
print("Fehler: Konnte keine Status-Optionen für die Task-Datenbank finden.")
|
|
return None
|
|
|
|
initial_status = status_options[0] # Nimm den ersten verfügbaren Status
|
|
|
|
url = "https://api.notion.com/v1/pages"
|
|
headers = {
|
|
"Authorization": f"Bearer {token}",
|
|
"Content-Type": "application/json",
|
|
"Notion-Version": "2022-06-28"
|
|
}
|
|
payload = {
|
|
"parent": {"database_id": tasks_db_id},
|
|
"properties": {
|
|
"Name": {
|
|
"title": [{"text": {"content": task_title}}]
|
|
},
|
|
"Project": {
|
|
"relation": [{"id": project_id}]
|
|
},
|
|
"Status": {
|
|
"status": {"name": initial_status}
|
|
}
|
|
}
|
|
}
|
|
|
|
try:
|
|
response = requests.post(url, headers=headers, json=payload)
|
|
response.raise_for_status()
|
|
new_task = response.json()
|
|
print(f"✅ Task '{task_title}' erfolgreich erstellt.")
|
|
return new_task
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"❌ FEHLER beim Erstellen des Tasks: {e}")
|
|
if e.response is not None:
|
|
try:
|
|
print(f"Antwort des Servers: {json.dumps(e.response.json(), indent=2)}")
|
|
except:
|
|
print(f"Antwort des Servers: {e.response.text}")
|
|
return None
|
|
|
|
def add_comment_to_notion_task(token: str, task_id: str, comment: str) -> bool:
|
|
"""Fügt einen Kommentar zu einer Notion-Seite (Task) hinzu."""
|
|
url = "https://api.notion.com/v1/comments"
|
|
headers = {
|
|
"Authorization": f"Bearer {token}",
|
|
"Content-Type": "application/json",
|
|
"Notion-Version": "2022-06-28"
|
|
}
|
|
payload = {
|
|
"parent": {"page_id": task_id},
|
|
"rich_text": [{
|
|
"text": {
|
|
"content": comment
|
|
}
|
|
}]
|
|
}
|
|
try:
|
|
response = requests.post(url, headers=headers, json=payload)
|
|
response.raise_for_status()
|
|
print(f"✅ Kommentar erfolgreich zum Notion-Task hinzugefügt.")
|
|
return True
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"❌ FEHLER beim Hinzufügen des Kommentars zum Notion-Task: {e}")
|
|
if e.response is not None:
|
|
try:
|
|
print(f"Antwort des Servers: {json.dumps(e.response.json(), indent=2)}")
|
|
except:
|
|
print(f"Antwort des Servers: {e.response.text}")
|
|
return False
|
|
|
|
def append_blocks_to_notion_page(token: str, page_id: str, blocks: List[Dict]) -> bool:
|
|
"""Hängt Inhaltsblöcke an eine Notion-Seite an."""
|
|
url = f"https://api.notion.com/v1/blocks/{page_id}/children"
|
|
headers = {
|
|
"Authorization": f"Bearer {token}",
|
|
"Content-Type": "application/json",
|
|
"Notion-Version": "2022-06-28"
|
|
}
|
|
payload = {"children": blocks}
|
|
try:
|
|
response = requests.patch(url, headers=headers, json=payload)
|
|
response.raise_for_status()
|
|
print(f"✅ Statusbericht erfolgreich an die Notion-Task-Seite angehängt.")
|
|
return True
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"❌ FEHLER beim Anhängen des Statusberichts an die Notion-Seite: {e}")
|
|
if e.response is not None:
|
|
try:
|
|
print(f"Antwort des Servers: {json.dumps(e.response.json(), indent=2)}")
|
|
except:
|
|
print(f"Antwort des Servers: {e.response.text}")
|
|
return False
|
|
|
|
# --- Session Management ---
|
|
|
|
SESSION_DIR = ".dev_session"
|
|
SESSION_FILE_PATH = os.path.join(SESSION_DIR, "SESSION_INFO")
|
|
|
|
def save_session_info(task_id: str, token: str):
|
|
"""Speichert die Task-ID und den Token für den Git-Hook."""
|
|
os.makedirs(SESSION_DIR, exist_ok=True)
|
|
session_data = {
|
|
"task_id": task_id,
|
|
"token": token
|
|
}
|
|
with open(SESSION_FILE_PATH, "w") as f:
|
|
json.dump(session_data, f)
|
|
|
|
def install_git_hook():
|
|
"""Installiert das notion_commit_hook.py Skript als post-commit Git-Hook."""
|
|
git_hooks_dir = os.path.join(".git", "hooks")
|
|
post_commit_hook_path = os.path.join(git_hooks_dir, "post-commit")
|
|
source_hook_script = "notion_commit_hook.py"
|
|
|
|
if not os.path.exists(git_hooks_dir):
|
|
# Wahrscheinlich kein Git-Repository, also nichts tun
|
|
return
|
|
|
|
if not os.path.exists(source_hook_script):
|
|
print(f"Warnung: Hook-Skript {source_hook_script} nicht gefunden. Hook wird nicht installiert.")
|
|
return
|
|
|
|
try:
|
|
# Lese den Inhalt des Quellskripts
|
|
with open(source_hook_script, "r") as f:
|
|
hook_content = f.read()
|
|
|
|
# Schreibe den Inhalt in den post-commit Hook
|
|
with open(post_commit_hook_path, "w") as f:
|
|
f.write(hook_content)
|
|
|
|
# Mache den Hook ausführbar
|
|
os.chmod(post_commit_hook_path, 0o755)
|
|
print("✅ Git-Hook für Notion-Kommentare erfolgreich installiert.")
|
|
|
|
except (IOError, OSError) as e:
|
|
print(f"❌ FEHLER beim Installieren des Git-Hooks: {e}")
|
|
|
|
# --- UI Functions ---
|
|
|
|
def select_project(token: str) -> Optional[Tuple[Dict, Optional[str]]]:
|
|
"""Zeigt eine Liste der Projekte an und lässt den Benutzer eines auswählen."""
|
|
print("--- Lade Projekte aus Notion... ---")
|
|
|
|
projects_db_id = find_database_by_title(token, "Projects [UT]")
|
|
if not projects_db_id:
|
|
return None, None
|
|
|
|
projects = query_notion_database(token, projects_db_id)
|
|
if not projects:
|
|
print("Keine Projekte in der Datenbank gefunden.")
|
|
return None, None
|
|
|
|
print("\nAn welchem Projekt möchtest du arbeiten?")
|
|
for i, project in enumerate(projects):
|
|
print(f"[{i+1}] {get_page_title(project)}")
|
|
|
|
while True:
|
|
try:
|
|
choice = int(input("Bitte wähle eine Nummer: "))
|
|
if 1 <= choice <= len(projects):
|
|
selected_project = projects[choice - 1]
|
|
readme_path = get_page_property(selected_project, "Readme Path")
|
|
return selected_project, readme_path
|
|
else:
|
|
print("Ungültige Auswahl.")
|
|
except ValueError:
|
|
print("Ungültige Eingabe. Bitte eine Zahl eingeben.")
|
|
|
|
def select_task(token: str, project_id: str, tasks_db_id: str) -> Optional[Dict]:
|
|
"""Zeigt eine Liste der Tasks für ein Projekt an und lässt den Benutzer auswählen."""
|
|
print("\n--- Lade Tasks für das ausgewählte Projekt... ---")
|
|
|
|
filter_payload = {
|
|
"property": "Project",
|
|
"relation": {"contains": project_id}
|
|
}
|
|
tasks = query_notion_database(token, tasks_db_id, filter_payload=filter_payload)
|
|
|
|
if not tasks:
|
|
print("Keine offenen Tasks für dieses Projekt gefunden.")
|
|
else:
|
|
print("\nWelchen Task möchtest du bearbeiten?")
|
|
for i, task in enumerate(tasks):
|
|
print(f"[{i+1}] {get_page_title(task)}")
|
|
|
|
print(f"[{len(tasks)+1}] Neuen Task für dieses Projekt erstellen")
|
|
|
|
while True:
|
|
try:
|
|
choice = int(input("Bitte wähle eine Nummer: "))
|
|
if 1 <= choice <= len(tasks):
|
|
return tasks[choice - 1]
|
|
elif choice == len(tasks) + 1:
|
|
return {"id": "new_task"} # Signal
|
|
else:
|
|
print("Ungültige Auswahl.")
|
|
except ValueError:
|
|
print("Ungültige Eingabe. Bitte eine Zahl eingeben.")
|
|
|
|
import subprocess
|
|
from datetime import datetime
|
|
|
|
# --- Git Summary Generation ---
|
|
|
|
def generate_git_summary() -> Tuple[str, str]:
|
|
"""Generiert eine Zusammenfassung der Git-Änderungen und Commit-Nachrichten seit dem letzten Push zum Main-Branch."""
|
|
try:
|
|
# Finde den aktuellen Main-Branch Namen (master oder main)
|
|
try:
|
|
main_branch = subprocess.check_output(["git", "rev-parse", "--abbrev-ref", "HEAD"]).decode("utf-8").strip()
|
|
if main_branch not in ["main", "master"]:
|
|
# Versuche, den Remote-Tracking-Branch für main/master zu finden
|
|
result = subprocess.run(["git", "branch", "-r"], capture_output=True, text=True)
|
|
if "origin/main" in result.stdout:
|
|
main_branch = "origin/main"
|
|
elif "origin/master" in result.stdout:
|
|
main_branch = "origin/master"
|
|
else:
|
|
print("Warnung: Konnte keinen 'main' oder 'master' Branch finden. Git-Zusammenfassung wird möglicherweise unvollständig sein.")
|
|
main_branch = "HEAD~1" # Fallback zum letzten Commit, falls kein Main-Branch gefunden wird
|
|
except subprocess.CalledProcessError:
|
|
main_branch = "HEAD~1" # Fallback, falls gar kein Branch gefunden wird
|
|
|
|
# Git diff --stat
|
|
diff_stat_cmd = ["git", "diff", "--stat", f"{main_branch}...HEAD"]
|
|
diff_stat = subprocess.check_output(diff_stat_cmd).decode("utf-8").strip()
|
|
|
|
# Git log --pretty
|
|
commit_log_cmd = ["git", "log", "--pretty=format:- %s", f"{main_branch}...HEAD"]
|
|
commit_messages = subprocess.check_output(commit_log_cmd).decode("utf-8").strip()
|
|
|
|
return diff_stat, commit_messages
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"❌ FEHLER beim Generieren der Git-Zusammenfassung: {e}")
|
|
return "", ""
|
|
|
|
# --- Report Status to Notion ---
|
|
|
|
def report_status_to_notion(
|
|
status_override: Optional[str],
|
|
todos_override: Optional[str],
|
|
git_changes_override: Optional[str],
|
|
commit_messages_override: Optional[str],
|
|
summary_override: Optional[str]
|
|
) -> None:
|
|
"""
|
|
Erstellt einen Statusbericht für den Notion-Task, entweder interaktiv oder mit überschriebenen Werten.
|
|
"""
|
|
if not os.path.exists(SESSION_FILE_PATH):
|
|
print("❌ FEHLER: Keine aktive Session gefunden. Kann keinen Statusbericht erstellen.")
|
|
return
|
|
|
|
try:
|
|
with open(SESSION_FILE_PATH, "r") as f:
|
|
session_data = json.load(f)
|
|
task_id = session_data.get("task_id")
|
|
token = session_data.get("token")
|
|
|
|
if not (task_id and token):
|
|
print("❌ FEHLER: Session-Daten unvollständig. Kann keinen Statusbericht erstellen.")
|
|
return
|
|
|
|
# Time tracking logic
|
|
session_start_time_str = session_data.get("session_start_time")
|
|
if session_start_time_str:
|
|
session_start_time = datetime.fromisoformat(session_start_time_str)
|
|
elapsed_time = datetime.now() - session_start_time
|
|
elapsed_hours = elapsed_time.total_seconds() / 3600
|
|
|
|
# Get current task page to read existing duration
|
|
# Note: This is a simplified way. A more robust solution might query the DB
|
|
# to get the page object without a separate API call if we already have it.
|
|
# For now, a direct API call is clear and ensures we have the latest data.
|
|
task_page_url = f"https://api.notion.com/v1/pages/{task_id}"
|
|
headers = {
|
|
"Authorization": f"Bearer {token}",
|
|
"Notion-Version": "2022-06-28"
|
|
}
|
|
try:
|
|
page_response = requests.get(task_page_url, headers=headers)
|
|
page_response.raise_for_status()
|
|
task_page = page_response.json()
|
|
|
|
current_duration = get_page_number_property(task_page, "Total Duration (h)") or 0.0
|
|
new_total_duration = current_duration + elapsed_hours
|
|
|
|
duration_payload = {
|
|
"Total Duration (h)": {
|
|
"number": new_total_duration
|
|
}
|
|
}
|
|
update_notion_task_property(token, task_id, duration_payload)
|
|
print(f"✅ Zeiterfassung: {elapsed_hours:.2f} Stunden zum Task hinzugefügt. Neue Gesamtdauer: {new_total_duration:.2f} Stunden.")
|
|
|
|
# Reset session start time for the next interval
|
|
save_session_info(task_id, token)
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"❌ FEHLER beim Abrufen der Task-Details für die Zeiterfassung: {e}")
|
|
|
|
print(f"--- Erstelle Statusbericht für Task {task_id} ---")
|
|
|
|
# Git-Zusammenfassung generieren (immer, wenn nicht explizit überschrieben)
|
|
actual_git_changes = git_changes_override
|
|
actual_commit_messages = commit_messages_override
|
|
if not git_changes_override or not commit_messages_override:
|
|
print("Generiere Git-Zusammenfassung...")
|
|
diff_stat, commit_log = generate_git_summary()
|
|
if not git_changes_override:
|
|
actual_git_changes = diff_stat
|
|
if not commit_messages_override:
|
|
actual_commit_messages = commit_log
|
|
|
|
# Status abfragen oder übernehmen
|
|
actual_status = status_override
|
|
if not actual_status:
|
|
tasks_db_id = find_database_by_title(token, "Tasks [UT]")
|
|
if tasks_db_id:
|
|
status_options = get_database_status_options(token, tasks_db_id)
|
|
if status_options:
|
|
print("\nBitte wähle den neuen Status des Tasks:")
|
|
for i, option in enumerate(status_options):
|
|
print(f"[{i+1}] {option}")
|
|
while True:
|
|
try:
|
|
choice = int(input("Wähle eine Nummer: "))
|
|
if 1 <= choice <= len(status_options):
|
|
actual_status = status_options[choice - 1]
|
|
break
|
|
else:
|
|
print("Ungültige Auswahl.")
|
|
except ValueError:
|
|
print("Ungültige Eingabe. Bitte eine Zahl eingeben.")
|
|
else:
|
|
print("Warnung: Konnte Status-Optionen nicht abrufen. Bitte Status manuell eingeben.")
|
|
actual_status = input("Bitte gib den neuen Status manuell ein: ")
|
|
|
|
if not actual_status:
|
|
print("❌ FEHLER: Kein Status festgelegt. Abbruch des Berichts.")
|
|
return
|
|
|
|
# Detaillierte Zusammenfassung abfragen oder übernehmen
|
|
actual_summary = summary_override
|
|
if not actual_summary:
|
|
print("\nBitte gib eine Zusammenfassung der Arbeit ein (was wurde getan, Ergebnisse, Probleme etc.).")
|
|
user_summary_lines = []
|
|
while True:
|
|
line = input()
|
|
if not line:
|
|
break
|
|
user_summary_lines.append(line)
|
|
actual_summary = "\n".join(user_summary_lines)
|
|
|
|
# To-Dos abfragen oder übernehmen
|
|
actual_todos = todos_override
|
|
if not actual_todos:
|
|
user_todos = input("\nGibt es offene To-Dos oder nächste Schritte? (Leer lassen zum Überspringen): ")
|
|
actual_todos = user_todos.strip()
|
|
|
|
# Kommentar zusammenstellen
|
|
report_lines = []
|
|
# Diese Zeilen werden jetzt innerhalb des Code-Blocks formatiert
|
|
|
|
# Add invested time to the report if available
|
|
if 'elapsed_hours' in locals():
|
|
elapsed_hhmm = decimal_hours_to_hhmm(elapsed_hours)
|
|
report_lines.append(f"Investierte Zeit in dieser Session: {elapsed_hhmm}")
|
|
|
|
report_lines.append(f"Neuer Status: {actual_status}")
|
|
|
|
if actual_summary:
|
|
report_lines.append("\nArbeitszusammenfassung:")
|
|
report_lines.append(actual_summary)
|
|
|
|
if actual_git_changes or actual_commit_messages:
|
|
report_lines.append("\nTechnische Änderungen (Git):")
|
|
if actual_git_changes:
|
|
report_lines.append(f"```{actual_git_changes}```")
|
|
if actual_commit_messages:
|
|
report_lines.append("\nCommit Nachrichten:")
|
|
report_lines.append(f"```{actual_commit_messages}```")
|
|
|
|
if actual_todos:
|
|
report_lines.append("\nOffene To-Dos / Nächste Schritte:")
|
|
for todo_item in actual_todos.split('\n'):
|
|
report_lines.append(f"- {todo_item.strip()}")
|
|
|
|
report_content = "\n".join(report_lines)
|
|
|
|
# Notion Blöcke für die API erstellen
|
|
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M')
|
|
notion_blocks = [
|
|
{
|
|
"object": "block",
|
|
"type": "heading_2",
|
|
"heading_2": {
|
|
"rich_text": [{"type": "text", "text": {"content": f"🤖 Status-Update ({timestamp})"}}]
|
|
}
|
|
},
|
|
{
|
|
"object": "block",
|
|
"type": "code",
|
|
"code": {
|
|
"rich_text": [{"type": "text", "text": {"content": report_content}}],
|
|
"language": "yaml"
|
|
}
|
|
}
|
|
]
|
|
|
|
# Notion aktualisieren
|
|
append_blocks_to_notion_page(token, task_id, notion_blocks)
|
|
status_payload = {"Status": {"status": {"name": actual_status}}}
|
|
update_notion_task_property(token, task_id, status_payload)
|
|
|
|
except (FileNotFoundError, json.JSONDecodeError) as e:
|
|
print(f"❌ FEHLER beim Lesen der Session-Informationen für Statusbericht: {e}")
|
|
except Exception as e:
|
|
print(f"❌ Unerwarteter Fehler beim Erstellen des Statusberichts: {e}")
|
|
|
|
|
|
# --- Context Generation ---
|
|
|
|
|
|
def generate_cli_context(project_title: str, task_title: str, task_id: str, readme_path: Optional[str], task_description: Optional[str], total_duration: float) -> str:
|
|
"""Erstellt den reinen Kontext-String für die Gemini CLI."""
|
|
|
|
# Fallback, falls kein Pfad in Notion gesetzt ist
|
|
if not readme_path:
|
|
readme_path = "readme.md"
|
|
|
|
description_part = ""
|
|
if task_description:
|
|
description_part = (
|
|
f"\n**Aufgabenbeschreibung:**\n"
|
|
f"```\n{task_description}\n```\n"
|
|
)
|
|
|
|
duration_hhmm = decimal_hours_to_hhmm(total_duration)
|
|
duration_part = f"Bisher erfasster Zeitaufwand (Notion): {duration_hhmm} Stunden.\n"
|
|
|
|
context = (
|
|
f"Ich arbeite jetzt am Projekt '{project_title}'. Der Fokus liegt auf dem Task '{task_title}'.\n"
|
|
f"{description_part}\n"
|
|
f"{duration_part}"
|
|
"Die relevanten Dateien für dieses Projekt sind wahrscheinlich:\n"
|
|
"- Die primäre Projektdokumentation: @readme.md\n"
|
|
f"- Die spezifische Dokumentation für dieses Modul: @{readme_path}\n\n"
|
|
f"Mein Ziel ist es, den Task '{task_title}' umzusetzen. Alle Commits für diesen Task sollen die Kennung `[{task_id.split('-')[0]}]` enthalten.\n\n"
|
|
"**WICHTIGER BEFEHL:** Bevor du mit der Implementierung oder einer Code-Änderung beginnst, fasse die Aufgabe in deinen eigenen Worten zusammen, erstelle einen detaillierten, schrittweisen Plan zur Lösung und **warte auf meine explizite Bestätigung**, bevor du den ersten Schritt ausführst."
|
|
)
|
|
return context
|
|
|
|
# --- CLI Execution ---
|
|
|
|
# Die start_gemini_cli Funktion wird entfernt, da das aufrufende Skript jetzt die Gemini CLI startet.
|
|
|
|
def save_session_info(task_id: str, token: str):
|
|
"""Speichert die Task-ID, den Token und den Startzeitpunkt für den Git-Hook."""
|
|
os.makedirs(SESSION_DIR, exist_ok=True)
|
|
session_data = {
|
|
"task_id": task_id,
|
|
"token": token,
|
|
"session_start_time": datetime.now().isoformat()
|
|
}
|
|
with open(SESSION_FILE_PATH, "w") as f:
|
|
json.dump(session_data, f)
|
|
|
|
def install_git_hook():
|
|
"""Installiert das notion_commit_hook.py Skript als post-commit Git-Hook."""
|
|
git_hooks_dir = os.path.join(".git", "hooks")
|
|
post_commit_hook_path = os.path.join(git_hooks_dir, "post-commit")
|
|
source_hook_script = "notion_commit_hook.py"
|
|
|
|
if not os.path.exists(git_hooks_dir):
|
|
# Wahrscheinlich kein Git-Repository, also nichts tun
|
|
return
|
|
|
|
if not os.path.exists(source_hook_script):
|
|
print(f"Warnung: Hook-Skript {source_hook_script} nicht gefunden. Hook wird nicht installiert.")
|
|
return
|
|
|
|
try:
|
|
# Kopiere das Skript und mache es ausführbar
|
|
shutil.copy(source_hook_script, post_commit_hook_path)
|
|
os.chmod(post_commit_hook_path, 0o755)
|
|
print("✅ Git-Hook für Notion-Kommentare erfolgreich installiert.")
|
|
|
|
except (IOError, OSError) as e:
|
|
print(f"❌ FEHLER beim Installieren des Git-Hooks: {e}")
|
|
|
|
def cleanup_session():
|
|
"""Bereinigt die Session-Datei und den Git-Hook."""
|
|
if os.path.exists(SESSION_FILE_PATH):
|
|
os.remove(SESSION_FILE_PATH)
|
|
|
|
post_commit_hook_path = os.path.join(".git", "hooks", "post-commit")
|
|
if os.path.exists(post_commit_hook_path):
|
|
os.remove(post_commit_hook_path)
|
|
|
|
print("🧹 Session-Daten und Git-Hook wurden bereinigt.")
|
|
|
|
def complete_session():
|
|
"""Schließt eine aktive Session ab."""
|
|
print("Schließe aktive Entwicklungs-Session ab...")
|
|
if not os.path.exists(SESSION_FILE_PATH):
|
|
print("Keine aktive Session gefunden.")
|
|
return
|
|
|
|
try:
|
|
with open(SESSION_FILE_PATH, "r") as f:
|
|
session_data = json.load(f)
|
|
task_id = session_data.get("task_id")
|
|
token = session_data.get("token")
|
|
|
|
if task_id and token:
|
|
# Annahme: Der letzte Status in der Liste ist der "Done"-Status
|
|
tasks_db_id = find_database_by_title(token, "Tasks [UT]")
|
|
if tasks_db_id:
|
|
status_options = get_database_status_options(token, tasks_db_id)
|
|
if status_options:
|
|
done_status = status_options[-1]
|
|
status_payload = {"Status": {"status": {"name": done_status}}}
|
|
update_notion_task_property(token, task_id, status_payload)
|
|
|
|
except (FileNotFoundError, json.JSONDecodeError):
|
|
print("Fehler beim Lesen der Session-Informationen.")
|
|
|
|
finally:
|
|
cleanup_session()
|
|
|
|
user_input = input("\nMöchtest du eine neue Session starten? (j/n): ").lower()
|
|
if user_input == 'j':
|
|
main() # Starte den interaktiven Prozess von vorne
|
|
else:
|
|
print("Auf Wiedersehen!")
|
|
|
|
def start_interactive_session():
|
|
"""Startet den Prozess zur Auswahl von Projekt/Task und startet Gemini."""
|
|
print("Starte interaktive Entwicklungs-Session...")
|
|
|
|
token = os.environ.get('NOTION_API_KEY')
|
|
if not token:
|
|
token = getpass("Bitte gib deinen Notion API Key ein (Eingabe wird nicht angezeigt): ")
|
|
if not token:
|
|
print("Kein Token angegeben. Abbruch.")
|
|
return
|
|
|
|
selected_project, readme_path = select_project(token)
|
|
if not selected_project:
|
|
return
|
|
|
|
project_title = get_page_title(selected_project)
|
|
print(f"\nProjekt '{project_title}' ausgewählt.")
|
|
|
|
tasks_db_id = find_database_by_title(token, "Tasks [UT]")
|
|
if not tasks_db_id:
|
|
return
|
|
|
|
user_choice = select_task(token, selected_project["id"], tasks_db_id)
|
|
|
|
if not user_choice:
|
|
print("Kein Task ausgewählt. Abbruch.")
|
|
return
|
|
|
|
selected_task = None
|
|
if user_choice.get("id") == "new_task":
|
|
selected_task = create_new_notion_task(token, selected_project["id"], tasks_db_id)
|
|
if not selected_task:
|
|
return
|
|
else:
|
|
selected_task = user_choice
|
|
|
|
task_title = get_page_title(selected_task)
|
|
task_id = selected_task["id"]
|
|
print(f"\nTask '{task_title}' ausgewählt.")
|
|
|
|
# NEU: Lade die Task-Beschreibung und die bisherige Dauer
|
|
task_description = get_page_content(token, task_id)
|
|
total_duration_decimal = get_page_number_property(selected_task, "Total Duration (h)") or 0.0
|
|
total_duration_hhmm = decimal_hours_to_hhmm(total_duration_decimal)
|
|
print(f"> Bisher für diesen Task erfasst: {total_duration_hhmm} Stunden.")
|
|
|
|
# Session-Informationen für den Git-Hook speichern
|
|
save_session_info(task_id, token)
|
|
|
|
# Git-Hook installieren, der die Session-Infos nutzt
|
|
install_git_hook()
|
|
|
|
title_slug = re.sub(r'[^a-z0-9\s-]', '', task_title.lower())
|
|
title_slug = re.sub(r'\s+', '-', title_slug)
|
|
title_slug = re.sub(r'-+', '-', title_slug).strip('-')
|
|
|
|
suggested_branch_name = f"feature/task-{task_id.split('-')[0]}-{title_slug}"
|
|
|
|
status_payload = {"Status": {"status": {"name": "Doing"}}}
|
|
status_updated = update_notion_task_property(token, task_id, status_payload)
|
|
if not status_updated:
|
|
print("Warnung: Notion-Task-Status konnte nicht aktualisiert werden.")
|
|
|
|
# Finale Setup-Informationen ausgeben
|
|
print("\n------------------------------------------------------------------")
|
|
print("✅ Setup abgeschlossen!")
|
|
print(f"\nDer Notion-Task '{task_title}' wurde auf 'Doing' gesetzt.")
|
|
print(f"\nBitte erstellen Sie jetzt manuell den Git-Branch in einem separaten Terminal:")
|
|
print(f"git checkout -b {suggested_branch_name}")
|
|
print("------------------------------------------------------------------")
|
|
|
|
# CLI-Kontext generieren und an stdout ausgeben, damit das Startskript ihn aufgreifen kann
|
|
cli_context = generate_cli_context(project_title, task_title, task_id, readme_path, task_description, total_duration_decimal)
|
|
print("\n---GEMINI_CLI_CONTEXT_START---")
|
|
print(cli_context)
|
|
print("---GEMINI_CLI_CONTEXT_END---")
|
|
# Das Skript beenden, damit das aufrufende Shell-Skript die Gemini CLI starten kann
|
|
exit(0)
|
|
|
|
|
|
# --- Main Execution ---
|
|
|
|
def main():
|
|
"""Hauptfunktion des Skripts."""
|
|
parser = argparse.ArgumentParser(description="Interaktiver Session-Manager für die Gemini-Entwicklung mit Notion-Integration.")
|
|
parser.add_argument("--done", action="store_true", help="Schließt die aktuelle Entwicklungs-Session ab.")
|
|
parser.add_argument("--add-comment", type=str, help="Fügt einen Kommentar zum aktuellen Notion-Task hinzu.")
|
|
parser.add_argument("--report-status", action="store_true", help="Erstellt einen Statusbericht für den Notion-Task.")
|
|
parser.add_argument("--status", type=str, help="Status, der im Notion-Task gesetzt werden soll (z.B. 'In Bearbeitung', 'Bereit für Review').")
|
|
parser.add_argument("--todos", type=str, help="Eine durch '\n' getrennte Liste offener To-Dos.")
|
|
parser.add_argument("--git-changes", type=str, help="Zusammenfassung der Git-Änderungen (git diff --stat).")
|
|
parser.add_argument("--commit-messages", type=str, help="Eine durch '\n' getrennte Liste der Commit-Nachrichten.")
|
|
parser.add_argument("--summary", type=str, help="Eine detaillierte textuelle Zusammenfassung der erledigten Arbeit.")
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.done:
|
|
complete_session()
|
|
elif args.add_comment:
|
|
if not os.path.exists(SESSION_FILE_PATH):
|
|
print("❌ FEHLER: Keine aktive Session gefunden. Kann keinen Kommentar hinzufügen.")
|
|
return
|
|
try:
|
|
with open(SESSION_FILE_PATH, "r") as f:
|
|
session_data = json.load(f)
|
|
task_id = session_data.get("task_id")
|
|
token = session_data.get("token")
|
|
|
|
if task_id and token:
|
|
add_comment_to_notion_task(token, task_id, args.add_comment)
|
|
else:
|
|
print("❌ FEHLER: Session-Daten unvollständig. Kann keinen Kommentar hinzufügen.")
|
|
except (FileNotFoundError, json.JSONDecodeError):
|
|
print("❌ FEHLER: Fehler beim Lesen der Session-Informationen. Kann keinen Kommentar hinzufügen.")
|
|
elif args.report_status:
|
|
report_status_to_notion(
|
|
status_override=args.status,
|
|
todos_override=args.todos,
|
|
git_changes_override=args.git_changes,
|
|
commit_messages_override=args.commit_messages,
|
|
summary_override=args.summary
|
|
)
|
|
else:
|
|
start_interactive_session()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |