diff --git a/.gitignore b/.gitignore index 4882a7c..a18440c 100644 --- a/.gitignore +++ b/.gitignore @@ -76,6 +76,17 @@ Plugins/**/Intermediate/* # Cache files for the editor to use DerivedDataCache/* +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +*.pyc +*.pyo +*.pyd +.pytest_cache/ + # Folders and files used for local plugin development workflows/pipelines .env /.aider.tags.cache.v3 @@ -84,5 +95,13 @@ DerivedDataCache/* /Config /Development .aider* -CLAUDE.md +# Agent instructions (CLAUDE.md is local-only, AGENTS.md is tracked) +**/CLAUDE.md /.claude +/.idea +.mcp.json +/build.sh +*.ps1 +/BuildLogs +/Documentation +nul diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..2c633c7 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,93 @@ +# AGENTS.md + +This file provides guidance to AI coding agents when working with code in this repository. + +# NodeToCode Unreal Engine 5 Plugin + +# CRITICAL - START + +## Understanding NodeToCode's MCP Server Tools + +@Source/Private/MCP/Tools/AGENTS.md + +## Unreal Engine Source Path + +Run `Content/Scripts/detect-ue-path.sh` (Mac/Linux) or `Content\Scripts\detect-ue-path.bat` (Windows) to detect your UE installation and update this file. + +UE_SOURCE_PATH: + +This is where source files for Unreal Engine 5 can be found and searched through for research purposes. NEVER try to modify files here. + +## TESTING COMPILATION + +The build scripts support UE 5.4, 5.5, 5.6, and 5.7 with auto-detection of installations. + +**Windows (build.ps1):** +```powershell +.\build.ps1 -UEVersion 5.5 # Build for specific version +.\build.ps1 -All # Build all detected versions +.\build.ps1 -WhatIf # Preview detected UE installations +``` + +**Mac/Linux (build.sh):** +```bash +./build.sh -v 5.5 # Build for specific version +./build.sh --all # Build all detected versions +./build.sh --whatif # Preview detected UE installations +``` + +Logs are saved to `BuildLogs/Build__.log`. Running without arguments shows help and detected UE versions. + +# CRITICAL - END + +## Overview + +NodeToCode is an Unreal Engine 5 plugin for translating Blueprints into various programming languages (C++, Python, JavaScript, C#, Swift, Pseudocode) using Large Language Models (LLMs). It integrates into the Unreal Editor, providing tools to collect Blueprint nodes, manage LLM interactions, and display generated code with syntax highlighting. + +**Key Features:** +* **Blueprint-to-Code Translation:** Converts selected Blueprint graphs/entire Blueprints. +* **Multi-LLM Support:** Integrates with OpenAI, Anthropic, Google Gemini, DeepSeek, local Ollama instances, and LM Studio. +* **Editor Integration:** Adds a "Node to Code" toolbar button in the Blueprint Editor for translation, JSON export, and a dedicated plugin window. +* **Interactive Code Editor:** Displays generated code with syntax highlighting and theme customization. +* **Configuration:** Plugin settings for LLM provider selection, API keys (stored securely), target language, and logging. +* **Reasoning Model Support:** Prepended model commands for controlling reasoning models and feature toggles. + +## Context Navigation + +For detailed information on specific areas of the codebase, navigate to the relevant context file: + +``` +Context/ +├── architecture/ +│ └── AGENTS-architecture.md → Core plugin setup, editor integration, settings +│ +├── translation-pipeline/ +│ └── AGENTS-translation-pipeline.md → Blueprint node → code conversion, batch translation +│ +├── llm-providers/ +│ └── AGENTS-llm-providers.md → LLM services, adding/updating providers +│ +├── mcp-server/ +│ └── AGENTS-mcp-server.md → MCP server implementation, tools, resources +│ +├── python-scripting/ +│ └── AGENTS-python-scripting.md → Python bridge, script management system +│ +├── code-editor/ +│ └── AGENTS-code-editor.md → Syntax highlighting, themes, code display +│ +├── development-workflow/ +│ └── AGENTS-development-workflow.md → Git submodules, IDE setup, build process +``` + +### Quick Reference + +| Task | Context File | +|------|--------------| +| Plugin initialization, settings | `Context/architecture/` | +| Node collection, translation | `Context/translation-pipeline/` | +| Adding LLM providers/models | `Context/llm-providers/` | +| MCP tools development | `Context/mcp-server/` + `@Source/Private/MCP/Tools/AGENTS.md` | +| Python automation | `Context/python-scripting/` | +| Code editor styling | `Context/code-editor/` | +| Git workflow, IDE config | `Context/development-workflow/` | diff --git a/Content/Python/mcp_bridge/launch_bridge.bat b/Content/Python/mcp_bridge/launch_bridge.bat new file mode 100644 index 0000000..a9bb216 --- /dev/null +++ b/Content/Python/mcp_bridge/launch_bridge.bat @@ -0,0 +1,126 @@ +@echo off +REM NodeToCode MCP Bridge Launcher (Windows) +REM Finds Python using the same methods as UnrealVersionSelector: +REM 1. Windows Registry (Epic Games Launcher installations) +REM 2. Windows Registry (Custom/source builds) +REM 3. Common installation paths as fallback +REM 4. System Python as final fallback + +setlocal EnableDelayedExpansion + +REM Get the directory where this script is located +set "SCRIPT_DIR=%~dp0" +set "BRIDGE_SCRIPT=%SCRIPT_DIR%nodetocode_bridge.py" + +REM Check if bridge script exists +if not exist "%BRIDGE_SCRIPT%" ( + echo [NodeToCode-Bridge] ERROR: Bridge script not found at %BRIDGE_SCRIPT% 1>&2 + exit /b 1 +) + +set "PYTHON_EXE=" + +REM === Method 1: Check Registry for Launcher-installed UE versions === +REM These are registered at HKEY_LOCAL_MACHINE\SOFTWARE\EpicGames\Unreal Engine\ + +for %%V in (5.7 5.6 5.5 5.4 5.3) do ( + if "!PYTHON_EXE!"=="" ( + for /f "tokens=2*" %%A in ('reg query "HKEY_LOCAL_MACHINE\SOFTWARE\EpicGames\Unreal Engine\%%V" /v InstalledDirectory 2^>nul') do ( + set "UE_DIR=%%B" + if exist "!UE_DIR!\Engine\Binaries\ThirdParty\Python3\Win64\python.exe" ( + set "PYTHON_EXE=!UE_DIR!\Engine\Binaries\ThirdParty\Python3\Win64\python.exe" + echo [NodeToCode-Bridge] Found UE %%V Python via registry: !PYTHON_EXE! 1>&2 + ) + ) + ) +) + +REM === Method 2: Check Registry for Custom/Source builds === +REM These are registered at HKEY_CURRENT_USER\SOFTWARE\Epic Games\Unreal Engine\Builds + +if "!PYTHON_EXE!"=="" ( + for /f "tokens=1,2*" %%A in ('reg query "HKEY_CURRENT_USER\SOFTWARE\Epic Games\Unreal Engine\Builds" 2^>nul') do ( + if "!PYTHON_EXE!"=="" ( + REM %%A is the GUID/name, %%B is REG_SZ, %%C is the path + if "%%B"=="REG_SZ" ( + set "UE_DIR=%%C" + if exist "!UE_DIR!\Engine\Binaries\ThirdParty\Python3\Win64\python.exe" ( + set "PYTHON_EXE=!UE_DIR!\Engine\Binaries\ThirdParty\Python3\Win64\python.exe" + echo [NodeToCode-Bridge] Found custom UE build Python via registry: !PYTHON_EXE! 1>&2 + ) + ) + ) + ) +) + +REM === Method 3: Check LauncherInstalled.dat (Epic Games Launcher data file) === + +if "!PYTHON_EXE!"=="" ( + set "LAUNCHER_FILE=%PROGRAMDATA%\Epic\UnrealEngineLauncher\LauncherInstalled.dat" + if exist "!LAUNCHER_FILE!" ( + REM Parse JSON to find InstallLocation - basic parsing with findstr + for /f "tokens=2 delims=:," %%A in ('type "!LAUNCHER_FILE!" ^| findstr /C:"InstallLocation"') do ( + if "!PYTHON_EXE!"=="" ( + set "INSTALL_PATH=%%~A" + REM Remove quotes and whitespace + set "INSTALL_PATH=!INSTALL_PATH:"=!" + set "INSTALL_PATH=!INSTALL_PATH: =!" + if exist "!INSTALL_PATH!\Engine\Binaries\ThirdParty\Python3\Win64\python.exe" ( + set "PYTHON_EXE=!INSTALL_PATH!\Engine\Binaries\ThirdParty\Python3\Win64\python.exe" + echo [NodeToCode-Bridge] Found UE Python via LauncherInstalled.dat: !PYTHON_EXE! 1>&2 + ) + ) + ) + ) +) + +REM === Method 4: Common installation paths as fallback === + +if "!PYTHON_EXE!"=="" ( + set "UE_PATHS=C:\Program Files\Epic Games;D:\Program Files\Epic Games;E:\Program Files\Epic Games" + set "UE_PATHS=!UE_PATHS!;C:\Epic Games;D:\Epic Games;E:\Epic Games" + set "UE_PATHS=!UE_PATHS!;C:\UE;D:\UE;E:\UE;F:\UE;G:\UE" + + set "UE_VERSIONS=UE_5.7 UE_5.6 UE_5.5 UE_5.4 UE_5.3 UE_5.2 UE_5.1 UE_5.0" + + for %%P in (!UE_PATHS!) do ( + for %%V in (!UE_VERSIONS!) do ( + if "!PYTHON_EXE!"=="" ( + set "TEST_PATH=%%P\%%V\Engine\Binaries\ThirdParty\Python3\Win64\python.exe" + if exist "!TEST_PATH!" ( + set "PYTHON_EXE=!TEST_PATH!" + echo [NodeToCode-Bridge] Found UE Python at common path: !PYTHON_EXE! 1>&2 + ) + ) + ) + ) +) + +REM === Method 5: Fall back to system Python === + +if "!PYTHON_EXE!"=="" ( + where python >nul 2>&1 + if !ERRORLEVEL! EQU 0 ( + set "PYTHON_EXE=python" + echo [NodeToCode-Bridge] Using system Python 1>&2 + ) +) + +if "!PYTHON_EXE!"=="" ( + where python3 >nul 2>&1 + if !ERRORLEVEL! EQU 0 ( + set "PYTHON_EXE=python3" + echo [NodeToCode-Bridge] Using system Python3 1>&2 + ) +) + +REM === No Python found === + +if "!PYTHON_EXE!"=="" ( + echo [NodeToCode-Bridge] ERROR: Python not found. 1>&2 + echo [NodeToCode-Bridge] Please install Unreal Engine or Python. 1>&2 + exit /b 1 +) + +REM Run the bridge with all passed arguments +"!PYTHON_EXE!" "%BRIDGE_SCRIPT%" %* diff --git a/Content/Python/mcp_bridge/launch_bridge.sh b/Content/Python/mcp_bridge/launch_bridge.sh new file mode 100644 index 0000000..b1c4d9a --- /dev/null +++ b/Content/Python/mcp_bridge/launch_bridge.sh @@ -0,0 +1,55 @@ +#!/bin/bash +# NodeToCode MCP Bridge Launcher (macOS/Linux) +# Automatically finds Python - prefers UE's bundled Python, falls back to system Python + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +BRIDGE_SCRIPT="$SCRIPT_DIR/nodetocode_bridge.py" + +# Check if bridge script exists +if [ ! -f "$BRIDGE_SCRIPT" ]; then + echo "[NodeToCode-Bridge] ERROR: Bridge script not found at $BRIDGE_SCRIPT" >&2 + exit 1 +fi + +PYTHON_EXE="" + +# === Try to find UE's bundled Python === + +# Common UE installation paths on macOS +UE_PATHS=( + "/Users/Shared/Epic Games" + "$HOME/Epic Games" + "/Applications/Epic Games" +) + +# UE versions to check (newest first) +UE_VERSIONS=("UE_5.7" "UE_5.6" "UE_5.5" "UE_5.4" "UE_5.3" "UE_5.2" "UE_5.1" "UE_5.0") + +for ue_path in "${UE_PATHS[@]}"; do + for ue_version in "${UE_VERSIONS[@]}"; do + TEST_PATH="$ue_path/$ue_version/Engine/Binaries/ThirdParty/Python3/Mac/bin/python3" + if [ -f "$TEST_PATH" ]; then + PYTHON_EXE="$TEST_PATH" + echo "[NodeToCode-Bridge] Found UE Python: $PYTHON_EXE" >&2 + break 2 + fi + done +done + +# === Fall back to system Python === + +if [ -z "$PYTHON_EXE" ]; then + if command -v python3 &> /dev/null; then + PYTHON_EXE="python3" + echo "[NodeToCode-Bridge] Using system Python3" >&2 + elif command -v python &> /dev/null; then + PYTHON_EXE="python" + echo "[NodeToCode-Bridge] Using system Python" >&2 + else + echo "[NodeToCode-Bridge] ERROR: Python not found. Please install Python or specify UE Python path." >&2 + exit 1 + fi +fi + +# Run the bridge with all passed arguments +exec "$PYTHON_EXE" "$BRIDGE_SCRIPT" "$@" diff --git a/Content/Python/mcp_bridge/nodetocode_bridge.py b/Content/Python/mcp_bridge/nodetocode_bridge.py new file mode 100644 index 0000000..d96e2ea --- /dev/null +++ b/Content/Python/mcp_bridge/nodetocode_bridge.py @@ -0,0 +1,310 @@ +#!/usr/bin/env python3 +""" +NodeToCode MCP Bridge + +A lightweight stdio-to-HTTP bridge that enables MCP clients (like Claude Desktop) +to communicate with the NodeToCode UE5 plugin's HTTP MCP server. + +Zero external dependencies - uses only Python standard library. + +Usage: + python nodetocode_bridge.py [--port PORT] [--host HOST] [--debug] +""" + +import sys +import json +import urllib.request +import urllib.error +import argparse +import threading +from typing import Optional + +# Configuration +DEFAULT_HOST = "127.0.0.1" +DEFAULT_PORT = 27000 +DEFAULT_TIMEOUT = 300 # 5 minutes for long-running tools + +# Global debug flag +debug_mode = False + + +def log_debug(message: str) -> None: + """Log debug message to stderr.""" + if debug_mode: + print(f"[NodeToCode-Bridge] {message}", file=sys.stderr, flush=True) + + +def log_error(message: str) -> None: + """Log error message to stderr.""" + print(f"[NodeToCode-Bridge ERROR] {message}", file=sys.stderr, flush=True) + + +def check_ue5_health(host: str, port: int) -> bool: + """Check if UE5 MCP server is running.""" + url = f"http://{host}:{port}/mcp/health" + log_debug(f"Probing health at {url}") + + try: + req = urllib.request.Request(url, method='GET') + with urllib.request.urlopen(req, timeout=2) as resp: + if resp.status == 200: + log_debug(f"Health check passed: {resp.read().decode('utf-8')}") + return True + except urllib.error.URLError as e: + log_debug(f"Health check failed: {e}") + except Exception as e: + log_debug(f"Health check exception: {e}") + + return False + + +def find_ue5_server(host: str, port: int) -> Optional[tuple]: + """Find running UE5 MCP server, trying multiple hosts/ports.""" + # Try 127.0.0.1 first to avoid IPv6 issues + hosts_to_try = ["127.0.0.1", "localhost"] if host == "localhost" else [host] + + for h in hosts_to_try: + if check_ue5_health(h, port): + return (h, port) + + # Try port range + for h in hosts_to_try: + for p in range(27000, 27011): + if p == port and h == host: + continue # Already tried + if check_ue5_health(h, p): + return (h, p) + + return None + + +def forward_request(host: str, port: int, request_data: str, session_id: Optional[str] = None) -> tuple: + """Forward a request to UE5 and return (response_body, new_session_id).""" + url = f"http://{host}:{port}/mcp" + + headers = {'Content-Type': 'application/json'} + if session_id: + headers['Mcp-Session-Id'] = session_id + + log_debug(f"Forwarding to UE5: {request_data[:200]}...") + + try: + req = urllib.request.Request( + url, + data=request_data.encode('utf-8'), + headers=headers, + method='POST' + ) + + with urllib.request.urlopen(req, timeout=DEFAULT_TIMEOUT) as resp: + response_body = resp.read().decode('utf-8') + new_session_id = resp.headers.get('Mcp-Session-Id', session_id) + + log_debug(f"Response status: {resp.status}") + log_debug(f"Response body: {response_body[:200]}...") + + # Handle 202 Accepted (long-running tool with SSE) + if resp.status == 202: + return handle_sse_response(host, response_body, new_session_id) + + return (response_body, new_session_id) + + except urllib.error.HTTPError as e: + error_body = e.read().decode('utf-8') if e.fp else str(e) + log_error(f"HTTP error {e.code}: {error_body}") + return (error_body, session_id) + + except urllib.error.URLError as e: + log_error(f"URL error: {e.reason}") + error = { + "jsonrpc": "2.0", + "id": None, + "error": {"code": -32603, "message": f"Connection failed: {e.reason}"} + } + return (json.dumps(error), session_id) + + except Exception as e: + log_error(f"Unexpected error: {e}") + error = { + "jsonrpc": "2.0", + "id": None, + "error": {"code": -32603, "message": str(e)} + } + return (json.dumps(error), session_id) + + +def handle_sse_response(host: str, response_body: str, session_id: Optional[str]) -> tuple: + """Handle SSE streaming for long-running tools.""" + try: + sse_info = json.loads(response_body) + + if sse_info.get("status") != "accepted": + return (response_body, session_id) + + sse_url = sse_info.get("sseUrl") + task_id = sse_info.get("taskId") + + if not sse_url: + log_error("No SSE URL in accepted response") + return (response_body, session_id) + + log_debug(f"Starting SSE listener for task: {task_id}") + + # Connect to SSE stream + req = urllib.request.Request(sse_url, method='GET') + + with urllib.request.urlopen(req, timeout=DEFAULT_TIMEOUT) as resp: + final_response = None + buffer = "" + + for chunk in iter(lambda: resp.read(1024).decode('utf-8'), ''): + buffer += chunk + + # Process complete SSE events + while '\n\n' in buffer: + event_str, buffer = buffer.split('\n\n', 1) + event_type, event_data = parse_sse_event(event_str) + + if event_type in ('progress', 'notification'): + # Forward progress notifications immediately + if event_data: + print(event_data, flush=True) + log_debug(f"Forwarded {event_type}: {event_data[:100]}...") + + elif event_type in ('response', 'result'): + # Final response + final_response = event_data + break + + if final_response: + break + + if final_response: + return (final_response, session_id) + + # If we didn't get a final response, return the original + return (response_body, session_id) + + except Exception as e: + log_error(f"SSE handling error: {e}") + return (response_body, session_id) + + +def parse_sse_event(event_str: str) -> tuple: + """Parse an SSE event string into (event_type, data).""" + event_type = "message" + data = "" + + for line in event_str.split('\n'): + if line.startswith('event:'): + event_type = line[6:].strip() + elif line.startswith('data:'): + data = line[5:].strip() + + return (event_type, data) + + +def write_response(response: str) -> None: + """Write response to stdout as a single line (NDJSON format).""" + try: + # Parse and re-serialize to ensure single-line compact JSON + # This handles pretty-printed responses from UE5 + obj = json.loads(response) + compact = json.dumps(obj, separators=(',', ':')) + print(compact, flush=True) + log_debug(f"Sent: {compact[:200]}...") + except json.JSONDecodeError: + # If not valid JSON, send as-is (shouldn't happen) + print(response.replace('\n', ' ').replace('\r', ''), flush=True) + log_debug(f"Sent (raw): {response[:200]}...") + + +def write_error(request_id, code: int, message: str) -> None: + """Write an error response to stdout.""" + error = { + "jsonrpc": "2.0", + "id": request_id, + "error": {"code": code, "message": message} + } + # Use compact format directly since we're creating the JSON ourselves + print(json.dumps(error, separators=(',', ':')), flush=True) + log_debug(f"Sent error: {message}") + + +def main(): + global debug_mode + + # Parse arguments + parser = argparse.ArgumentParser(description='NodeToCode MCP Bridge') + parser.add_argument('--host', default=DEFAULT_HOST, help='UE5 server host') + parser.add_argument('--port', type=int, default=DEFAULT_PORT, help='UE5 server port') + parser.add_argument('--debug', action='store_true', help='Enable debug logging') + args = parser.parse_args() + + debug_mode = args.debug + + log_debug("NodeToCode MCP Bridge starting...") + log_debug(f"Configuration: host={args.host}, port={args.port}") + + # Find UE5 server + server = find_ue5_server(args.host, args.port) + + if not server: + log_error("Failed to connect to NodeToCode UE5 plugin. Is the Unreal Editor running with NodeToCode?") + sys.exit(1) + + host, port = server + log_debug(f"Connected to UE5 MCP server at {host}:{port}") + + # Session tracking + session_id = None + + # Process stdin + try: + for line in sys.stdin: + line = line.strip() + if not line: + continue + + log_debug(f"Received: {line[:200]}...") + + try: + # Validate JSON + request = json.loads(line) + request_id = request.get("id") + + # Forward to UE5 + response, session_id = forward_request(host, port, line, session_id) + + # Handle response + if response: + # Ensure response has correct ID + try: + resp_obj = json.loads(response) + if request_id is not None and resp_obj.get("id") is None: + resp_obj["id"] = request_id + response = json.dumps(resp_obj) + except: + pass + + write_response(response) + + except json.JSONDecodeError as e: + log_error(f"JSON parse error: {e}") + write_error(None, -32700, f"Parse error: {e}") + + except Exception as e: + log_error(f"Error processing request: {e}") + write_error(None, -32603, str(e)) + + except KeyboardInterrupt: + log_debug("Interrupted, shutting down...") + except Exception as e: + log_error(f"Fatal error: {e}") + sys.exit(1) + + log_debug("Bridge shutting down") + + +if __name__ == "__main__": + main() diff --git a/Content/Python/mcp_bridge/ue_python_finder.py b/Content/Python/mcp_bridge/ue_python_finder.py new file mode 100644 index 0000000..7ee33fa --- /dev/null +++ b/Content/Python/mcp_bridge/ue_python_finder.py @@ -0,0 +1,339 @@ +r""" +UE Python Finder + +Finds Unreal Engine's bundled Python installation using the same methods +as UnrealVersionSelector: +1. Windows Registry (HKEY_CURRENT_USER\SOFTWARE\Epic Games\Unreal Engine\Builds) +2. Epic Games Launcher's LauncherInstalled.dat file +3. Common installation paths as fallback + +This module uses only Python standard library. +""" + +import os +import sys +import json +import re +from typing import Optional, List, Tuple + +# Platform-specific imports +if sys.platform == 'win32': + import winreg + + +def log_debug(message: str, debug: bool = False) -> None: + """Log debug message to stderr.""" + if debug: + print(f"[UE-Python-Finder] {message}", file=sys.stderr, flush=True) + + +def get_ue_installs_from_registry(debug: bool = False) -> List[Tuple[str, str]]: + """ + Get UE installations from Windows Registry. + Returns list of (version_name, install_path) tuples. + """ + installations = [] + + if sys.platform != 'win32': + return installations + + # Check HKEY_CURRENT_USER\SOFTWARE\Epic Games\Unreal Engine\Builds + # This is where custom/source builds are registered + try: + key_path = r"SOFTWARE\Epic Games\Unreal Engine\Builds" + with winreg.OpenKey(winreg.HKEY_CURRENT_USER, key_path) as key: + i = 0 + while True: + try: + name, value, _ = winreg.EnumValue(key, i) + if os.path.isdir(value): + log_debug(f"Registry build found: {name} -> {value}", debug) + installations.append((name, value)) + i += 1 + except OSError: + break + except FileNotFoundError: + log_debug("Registry key for custom builds not found", debug) + except Exception as e: + log_debug(f"Error reading registry builds: {e}", debug) + + # Check HKEY_LOCAL_MACHINE\SOFTWARE\EpicGames\Unreal Engine + # This is where launcher-installed versions are registered + try: + key_path = r"SOFTWARE\EpicGames\Unreal Engine" + with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, key_path) as key: + i = 0 + while True: + try: + subkey_name = winreg.EnumKey(key, i) + try: + with winreg.OpenKey(key, subkey_name) as subkey: + install_dir, _ = winreg.QueryValueEx(subkey, "InstalledDirectory") + if os.path.isdir(install_dir): + log_debug(f"Registry installed found: {subkey_name} -> {install_dir}", debug) + installations.append((subkey_name, install_dir)) + except (FileNotFoundError, OSError): + pass + i += 1 + except OSError: + break + except FileNotFoundError: + log_debug("Registry key for installed versions not found", debug) + except Exception as e: + log_debug(f"Error reading registry installations: {e}", debug) + + return installations + + +def get_ue_installs_from_launcher(debug: bool = False) -> List[Tuple[str, str]]: + """ + Get UE installations from Epic Games Launcher's LauncherInstalled.dat file. + Returns list of (version_name, install_path) tuples. + """ + installations = [] + + # LauncherInstalled.dat location + if sys.platform == 'win32': + launcher_file = os.path.join( + os.environ.get('PROGRAMDATA', 'C:\\ProgramData'), + 'Epic', 'UnrealEngineLauncher', 'LauncherInstalled.dat' + ) + elif sys.platform == 'darwin': + launcher_file = os.path.expanduser( + '~/Library/Application Support/Epic/UnrealEngineLauncher/LauncherInstalled.dat' + ) + else: + return installations + + if not os.path.exists(launcher_file): + log_debug(f"Launcher file not found: {launcher_file}", debug) + return installations + + try: + with open(launcher_file, 'r', encoding='utf-8') as f: + data = json.load(f) + + for item in data.get('InstallationList', []): + install_path = item.get('InstallLocation', '') + app_name = item.get('AppName', '') + + # UE installations have AppName like "UE_5.4" + if install_path and os.path.isdir(install_path): + # Try to determine version from path or app name + version = app_name if app_name else os.path.basename(install_path) + log_debug(f"Launcher found: {version} -> {install_path}", debug) + installations.append((version, install_path)) + + except json.JSONDecodeError as e: + log_debug(f"Error parsing launcher file: {e}", debug) + except Exception as e: + log_debug(f"Error reading launcher file: {e}", debug) + + return installations + + +def get_python_path_for_ue_install(install_path: str, debug: bool = False) -> Optional[str]: + """ + Get the Python executable path for a given UE installation. + """ + if sys.platform == 'win32': + python_path = os.path.join( + install_path, 'Engine', 'Binaries', 'ThirdParty', 'Python3', 'Win64', 'python.exe' + ) + elif sys.platform == 'darwin': + python_path = os.path.join( + install_path, 'Engine', 'Binaries', 'ThirdParty', 'Python3', 'Mac', 'bin', 'python3' + ) + else: + python_path = os.path.join( + install_path, 'Engine', 'Binaries', 'ThirdParty', 'Python3', 'Linux', 'bin', 'python3' + ) + + if os.path.exists(python_path): + log_debug(f"Found Python at: {python_path}", debug) + return python_path + + log_debug(f"Python not found at: {python_path}", debug) + return None + + +def get_source_path_for_ue_install(install_path: str, debug: bool = False) -> Optional[str]: + """ + Get the Engine/Source path for a given UE installation. + """ + source_path = os.path.join(install_path, 'Engine', 'Source') + if os.path.isdir(source_path): + log_debug(f"Found Source at: {source_path}", debug) + return source_path + + log_debug(f"Source not found at: {source_path}", debug) + return None + + +def find_ue_source(prefer_newest: bool = True, debug: bool = False) -> Optional[str]: + """ + Find UE Engine/Source path. + + Args: + prefer_newest: If True, prefer the newest UE version. If False, prefer oldest. + debug: Enable debug logging. + + Returns: + Path to Engine/Source directory, or None if not found. + """ + all_installations = [] + + # Gather installations from all sources + all_installations.extend(get_ue_installs_from_registry(debug)) + all_installations.extend(get_ue_installs_from_launcher(debug)) + + # Remove duplicates (by install path) + seen_paths = set() + unique_installations = [] + for version, path in all_installations: + normalized_path = os.path.normpath(path).lower() + if normalized_path not in seen_paths: + seen_paths.add(normalized_path) + unique_installations.append((version, path)) + + if not unique_installations: + log_debug("No UE installations found via registry or launcher", debug) + return None + + # Sort by version + unique_installations.sort( + key=lambda x: parse_ue_version(x[0]), + reverse=prefer_newest + ) + + log_debug(f"Found {len(unique_installations)} UE installation(s)", debug) + + # Find first installation with valid Source directory + for version, install_path in unique_installations: + source_path = get_source_path_for_ue_install(install_path, debug) + if source_path: + log_debug(f"Selected: {version} ({install_path})", debug) + return source_path + + log_debug("No UE installation has valid Source directory", debug) + return None + + +def parse_ue_version(version_str: str) -> Tuple[int, int, int]: + """ + Parse a UE version string into a tuple for sorting. + Examples: "UE_5.4" -> (5, 4, 0), "5.3.2" -> (5, 3, 2) + """ + # Extract version numbers + match = re.search(r'(\d+)\.(\d+)(?:\.(\d+))?', version_str) + if match: + major = int(match.group(1)) + minor = int(match.group(2)) + patch = int(match.group(3)) if match.group(3) else 0 + return (major, minor, patch) + return (0, 0, 0) + + +def find_ue_python(prefer_newest: bool = True, debug: bool = False) -> Optional[str]: + """ + Find UE's bundled Python installation. + + Args: + prefer_newest: If True, prefer the newest UE version. If False, prefer oldest. + debug: Enable debug logging. + + Returns: + Path to Python executable, or None if not found. + """ + all_installations = [] + + # Gather installations from all sources + all_installations.extend(get_ue_installs_from_registry(debug)) + all_installations.extend(get_ue_installs_from_launcher(debug)) + + # Remove duplicates (by install path) + seen_paths = set() + unique_installations = [] + for version, path in all_installations: + normalized_path = os.path.normpath(path).lower() + if normalized_path not in seen_paths: + seen_paths.add(normalized_path) + unique_installations.append((version, path)) + + if not unique_installations: + log_debug("No UE installations found via registry or launcher", debug) + return None + + # Sort by version + unique_installations.sort( + key=lambda x: parse_ue_version(x[0]), + reverse=prefer_newest + ) + + log_debug(f"Found {len(unique_installations)} UE installation(s)", debug) + + # Find first installation with valid Python + for version, install_path in unique_installations: + python_path = get_python_path_for_ue_install(install_path, debug) + if python_path: + log_debug(f"Selected: {version} ({install_path})", debug) + return python_path + + log_debug("No UE installation has valid Python", debug) + return None + + +def find_system_python(debug: bool = False) -> Optional[str]: + """Find system Python installation.""" + import shutil + + for cmd in ['python3', 'python']: + path = shutil.which(cmd) + if path: + log_debug(f"Found system Python: {path}", debug) + return path + + log_debug("System Python not found", debug) + return None + + +def find_python(prefer_ue: bool = True, debug: bool = False) -> Optional[str]: + """ + Find Python installation, preferring UE's bundled Python by default. + + Args: + prefer_ue: If True, try UE Python first. If False, try system Python first. + debug: Enable debug logging. + + Returns: + Path to Python executable, or None if not found. + """ + if prefer_ue: + python_path = find_ue_python(debug=debug) + if python_path: + return python_path + return find_system_python(debug) + else: + python_path = find_system_python(debug) + if python_path: + return python_path + return find_ue_python(debug=debug) + + +if __name__ == '__main__': + # When run directly, print the found Python path + import argparse + + parser = argparse.ArgumentParser(description='Find UE Python installation') + parser.add_argument('--debug', action='store_true', help='Enable debug output') + parser.add_argument('--system-first', action='store_true', help='Prefer system Python') + args = parser.parse_args() + + python_path = find_python(prefer_ue=not args.system_first, debug=args.debug) + + if python_path: + print(python_path) + sys.exit(0) + else: + print("Python not found", file=sys.stderr) + sys.exit(1) diff --git a/Content/Python/nodetocode/__init__.py b/Content/Python/nodetocode/__init__.py new file mode 100644 index 0000000..3d44feb --- /dev/null +++ b/Content/Python/nodetocode/__init__.py @@ -0,0 +1,223 @@ +""" +NodeToCode Python Module for Unreal Engine + +Provides Blueprint manipulation and script management utilities accessible from +the run-python MCP tool. All functions return standardized dictionaries with +'success', 'data', and 'error' keys. + +Quick Start: + import nodetocode as n2c + + # Get the currently focused blueprint + bp = n2c.get_focused_blueprint() + if bp['success']: + print(f"Blueprint: {bp['data']['name']}") + print(f"Status: {bp['data']['status']}") + + # Compile the blueprint + result = n2c.compile_blueprint() + if result['success']: + print("Compilation successful!") + + # Save the blueprint + result = n2c.save_blueprint() + if result['data']['was_saved']: + print("Saved to disk") + +Script Management: + # Search for existing scripts + matches = n2c.search_scripts("health system") + + # Run a saved script + result = n2c.run_script("create_health_system", max_hp=100) + + # Save a new script for reuse + n2c.save_script("my_script", code, "Description", tags=["gameplay"]) + +Graph Editing: + # Search for and add nodes + results = n2c.search_blueprint_nodes("Print String") + node = results['data']['nodes'][0] + added = n2c.add_node_to_graph(node['displayName'], node['spawnMetadata']['actionIdentifier']) + + # Connect nodes + n2c.connect_pins([{"from": {...}, "to": {...}}]) + + # Find and delete nodes + found = n2c.find_nodes_in_graph(["Print"]) + n2c.delete_nodes([found['data']['nodes'][0]['nodeGuid']]) + +Function Pin Management: + # Add function parameters + n2c.add_function_input_pin("Target", "/Script/Engine.Actor") + n2c.add_function_return_pin("bSuccess", "bool") + +Return Format: + All functions return a dictionary with: + - 'success': bool - True if the operation succeeded + - 'data': dict or None - Operation-specific result data + - 'error': str or None - Error message if success is False + +Available Functions: + Blueprint Operations: + - get_focused_blueprint() - Get info about the focused Blueprint + - compile_blueprint(path=None) - Compile a Blueprint + - save_blueprint(path=None) - Save a Blueprint to disk + - load_blueprint(path) - Load a Blueprint by path + - open_blueprint(path, focus_graph) - Open Blueprint in editor + - open_blueprint_function(name) - Focus a function in the open Blueprint + + Script Management: + - list_scripts(category, limit) - List available scripts + - search_scripts(query, limit) - Search scripts by name/description/tags + - get_script(name) - Load script code and metadata + - get_script_functions(name) - Get function signatures (token-efficient) + - run_script(name, **kwargs) - Execute a saved script + - save_script(name, code, description, tags, category) - Save new script + - delete_script(name) - Remove a script + - get_script_stats() - Get script library statistics + + Graph Editing: + - search_blueprint_nodes(term, context_sensitive, max) - Search for nodes to add + - add_node_to_graph(name, action_id, x, y) - Add a node to focused graph + - connect_pins(connections, break_existing) - Connect pins between nodes + - set_pin_value(node_guid, pin_guid, value) - Set input pin default value + - delete_nodes(guids, preserve_connections, force) - Delete nodes + - find_nodes_in_graph(terms, type, case_sensitive) - Find nodes by keyword/GUID + - create_comment_node(guids, text, color, font_size) - Create comment around nodes + + Function Pin Management: + - add_function_input_pin(name, type, default, by_ref, tooltip) - Add input param + - add_function_return_pin(name, type, tooltip) - Add return value + - remove_function_entry_pin(name) - Remove input parameter + - remove_function_return_pin(name) - Remove return value + + NodeToCode-Specific (Tagging & LLM): + - tag_graph(tag, category, description) - Tag the focused graph + - list_tags(category, tag) - List all tags with optional filters + - remove_tag(graph_guid, tag) - Remove a tag from a graph + - get_llm_providers() - Get all available LLM providers + - get_active_provider() - Get current LLM provider info + + Utilities: + - get_editor_subsystem(class) - Get an editor subsystem + - is_blueprint_valid(obj) - Check if object is a Blueprint + - log_info/warning/error(msg) - Log messages +""" + +__version__ = "1.0.0" +__author__ = "Protospatial" + +# Blueprint operations +from .blueprint import ( + get_focused_blueprint, + compile_blueprint, + save_blueprint, + load_blueprint, +) + +# Script management +from .scripts import ( + list_scripts, + search_scripts, + get_script, + get_script_functions, + run_script, + save_script, + delete_script, + get_script_stats, +) + +# NodeToCode-specific features (tagging, LLM info, navigation) +from .bridge import ( + tag_graph, + list_tags, + remove_tag, + get_llm_providers, + get_active_provider, + open_blueprint, + open_blueprint_function, +) + +# Graph editing operations +from .graph import ( + search_blueprint_nodes, + add_node_to_graph, + connect_pins, + set_pin_value, + delete_nodes, + find_nodes_in_graph, + create_comment_node, +) + +# Function pin management +from .functions import ( + add_function_input_pin, + add_function_return_pin, + remove_function_entry_pin, + remove_function_return_pin, +) + +# Utility functions +from .utils import ( + get_editor_subsystem, + is_blueprint_valid, + get_project_content_dir, + get_project_dir, + log_info, + log_warning, + log_error, + make_success_result, + make_error_result, +) + +__all__ = [ + # Version info + '__version__', + '__author__', + # Blueprint operations + 'get_focused_blueprint', + 'compile_blueprint', + 'save_blueprint', + 'load_blueprint', + # Script management + 'list_scripts', + 'search_scripts', + 'get_script', + 'get_script_functions', + 'run_script', + 'save_script', + 'delete_script', + 'get_script_stats', + # Graph editing + 'search_blueprint_nodes', + 'add_node_to_graph', + 'connect_pins', + 'set_pin_value', + 'delete_nodes', + 'find_nodes_in_graph', + 'create_comment_node', + # Function pin management + 'add_function_input_pin', + 'add_function_return_pin', + 'remove_function_entry_pin', + 'remove_function_return_pin', + # NodeToCode-specific (tagging, LLM info, navigation) + 'tag_graph', + 'list_tags', + 'remove_tag', + 'get_llm_providers', + 'get_active_provider', + 'open_blueprint', + 'open_blueprint_function', + # Utilities + 'get_editor_subsystem', + 'is_blueprint_valid', + 'get_project_content_dir', + 'get_project_dir', + 'log_info', + 'log_warning', + 'log_error', + 'make_success_result', + 'make_error_result', +] diff --git a/Content/Python/nodetocode/blueprint.py b/Content/Python/nodetocode/blueprint.py new file mode 100644 index 0000000..93436b0 --- /dev/null +++ b/Content/Python/nodetocode/blueprint.py @@ -0,0 +1,200 @@ +""" +Blueprint manipulation functions for NodeToCode. + +These functions provide safe, error-handled wrappers around NodeToCode's C++ functionality +via the UN2CPythonBridge class. All functions return standardized result dictionaries +with 'success', 'data', and 'error' keys. +""" + +import unreal +import json +from typing import Dict, Any, Optional + +from .utils import ( + make_success_result, + make_error_result, + log_info, + log_error +) + + +def _parse_bridge_result(json_str: str) -> Dict[str, Any]: + """ + Parse the JSON result from UN2CPythonBridge functions. + + Args: + json_str: JSON string returned by the bridge function + + Returns: + Parsed dictionary with success, data, and error keys + """ + try: + return json.loads(json_str) + except json.JSONDecodeError as e: + return make_error_result(f"Failed to parse bridge response: {e}") + + +def get_focused_blueprint() -> Dict[str, Any]: + """ + Get information about the currently focused Blueprint in the editor. + + Uses NodeToCode's C++ infrastructure to collect and serialize the Blueprint + into N2CJSON format, which includes detailed node and connection information. + + Returns: + dict: { + 'success': bool, + 'data': { + 'name': str, # Blueprint asset name + 'path': str, # Full asset path + 'graph_name': str, # Name of the focused graph + 'node_count': int, # Number of nodes in the graph + 'n2c_json': dict, # Full N2CJSON representation + } or None, + 'error': str or None + } + + Example: + bp = get_focused_blueprint() + if bp['success']: + print(f"Editing: {bp['data']['name']}") + print(f"Nodes: {bp['data']['node_count']}") + """ + try: + # Call the C++ bridge function + result_json = unreal.N2CPythonBridge.get_focused_blueprint_json() + return _parse_bridge_result(result_json) + except Exception as e: + log_error(f"get_focused_blueprint failed: {e}") + return make_error_result(str(e)) + + +def compile_blueprint(blueprint_path: Optional[str] = None) -> Dict[str, Any]: + """ + Compile a Blueprint. If no path is provided, compiles the focused Blueprint. + + Note: Currently only supports compiling the focused Blueprint. + Path-based compilation will be added in a future update. + + Args: + blueprint_path: Optional asset path (currently ignored, uses focused Blueprint) + + Returns: + dict: { + 'success': bool, # True if compilation succeeded without errors + 'data': { + 'blueprint_name': str, + 'status': str, # Post-compilation status + 'had_errors': bool, + 'had_warnings': bool, + } or None, + 'error': str or None + } + + Example: + result = compile_blueprint() + if result['success']: + print("Compilation successful!") + elif result['data'] and result['data']['had_errors']: + print("Compilation failed with errors") + """ + try: + if blueprint_path: + log_info(f"Note: blueprint_path parameter not yet implemented, using focused Blueprint") + + # Call the C++ bridge function + result_json = unreal.N2CPythonBridge.compile_focused_blueprint() + return _parse_bridge_result(result_json) + except Exception as e: + log_error(f"compile_blueprint failed: {e}") + return make_error_result(str(e)) + + +def save_blueprint(blueprint_path: Optional[str] = None, + only_if_dirty: bool = True) -> Dict[str, Any]: + """ + Save a Blueprint to disk. If no path is provided, saves the focused Blueprint. + + Note: Currently only supports saving the focused Blueprint. + Path-based saving will be added in a future update. + + Args: + blueprint_path: Optional asset path (currently ignored, uses focused Blueprint) + only_if_dirty: If True, only saves if the Blueprint has unsaved changes. + Default is True to avoid unnecessary disk writes. + + Returns: + dict: { + 'success': bool, + 'data': { + 'blueprint_name': str, + 'was_dirty': bool, + 'was_saved': bool, + } or None, + 'error': str or None + } + + Example: + result = save_blueprint() + if result['success'] and result['data']['was_saved']: + print("Blueprint saved to disk") + """ + try: + if blueprint_path: + log_info(f"Note: blueprint_path parameter not yet implemented, using focused Blueprint") + + # Call the C++ bridge function + result_json = unreal.N2CPythonBridge.save_focused_blueprint(only_if_dirty) + return _parse_bridge_result(result_json) + except Exception as e: + log_error(f"save_blueprint failed: {e}") + return make_error_result(str(e)) + + +def load_blueprint(blueprint_path: str) -> Dict[str, Any]: + """ + Load a Blueprint asset by path. + + Args: + blueprint_path: Asset path to the Blueprint (e.g., '/Game/BP_MyActor') + + Returns: + dict: { + 'success': bool, + 'data': { + 'name': str, + 'path': str, + 'class_name': str, + 'parent_class': str, + } or None, + 'error': str or None + } + + Example: + bp = load_blueprint('/Game/Blueprints/BP_Character') + if bp['success']: + print(f"Loaded: {bp['data']['name']}") + """ + try: + if not blueprint_path: + return make_error_result('blueprint_path is required') + + blueprint = unreal.load_asset(blueprint_path) + if not blueprint or not isinstance(blueprint, unreal.Blueprint): + return make_error_result(f'Blueprint not found at path: {blueprint_path}') + + # Get parent class info + parent_class_name = '' + if blueprint.parent_class: + parent_class_name = blueprint.parent_class.get_name() + + return make_success_result({ + 'name': blueprint.get_name(), + 'path': blueprint.get_path_name(), + 'class_name': blueprint.get_class().get_name(), + 'parent_class': parent_class_name, + }) + + except Exception as e: + log_error(f"load_blueprint failed: {e}") + return make_error_result(str(e)) diff --git a/Content/Python/nodetocode/bridge.py b/Content/Python/nodetocode/bridge.py new file mode 100644 index 0000000..a875bbd --- /dev/null +++ b/Content/Python/nodetocode/bridge.py @@ -0,0 +1,279 @@ +""" +Python wrappers for NodeToCode-specific C++ bridge functions. + +These expose unique NodeToCode features not available in the standard UE Python API: +- Custom Blueprint graph tagging system +- LLM provider information and discovery + +Usage: + import nodetocode as n2c + + # Tag the focused graph + result = n2c.tag_graph("gameplay", category="Systems", description="Core gameplay logic") + + # List all tags + tags = n2c.list_tags() + + # Get LLM provider info + provider = n2c.get_active_provider() +""" + +import json +from typing import Dict, Any, Optional + +import unreal + +from .utils import make_success_result, make_error_result, log_info, log_warning, log_error + + +def _parse_bridge_result(json_str: str) -> Dict[str, Any]: + """ + Parse JSON result from C++ bridge functions. + + Args: + json_str: JSON string from bridge function + + Returns: + Parsed dictionary with success/data/error structure + """ + if not json_str: + return make_error_result("Empty response from bridge function") + + try: + return json.loads(json_str) + except json.JSONDecodeError as e: + log_error(f"Failed to parse bridge response: {e}") + return make_error_result(f"Failed to parse bridge response: {e}") + + +# ============== Tagging System ============== + +def tag_graph(tag: str, category: str = "Default", description: str = "") -> Dict[str, Any]: + """ + Tag the currently focused Blueprint graph. + + This uses NodeToCode's custom tagging system to organize and track Blueprint graphs. + Tags persist across editor sessions and can be used for batch operations. + + Args: + tag: Tag name to apply (required) + category: Category for organization (default "Default") + description: Optional description for the tag + + Returns: + {success, data: {tag, category, description, graph_guid, graph_name, blueprint_name}, error} + + Example: + result = tag_graph("player_controller", "Systems", "Main player input handling") + if result['success']: + print(f"Tagged graph: {result['data']['graph_name']}") + """ + if not tag or not tag.strip(): + return make_error_result("Tag name cannot be empty") + + try: + result = unreal.N2CPythonBridge.tag_focused_graph(tag.strip(), category.strip(), description.strip()) + return _parse_bridge_result(result) + except Exception as e: + log_error(f"tag_graph failed: {e}") + return make_error_result(str(e)) + + +def list_tags(category: Optional[str] = None, tag: Optional[str] = None) -> Dict[str, Any]: + """ + List tags with optional filters. + + Args: + category: Filter by category (None for all) + tag: Filter by tag name (None for all) + + Returns: + {success, data: {tags: [...], count, total_categories, total_unique_tags}, error} + + Each tag entry contains: + - tag: Tag name + - category: Tag category + - description: Optional description + - graph_guid: GUID of the tagged graph + - graph_name: Name of the graph + - blueprint_path: Path to the owning Blueprint + - timestamp: When the tag was applied + + Example: + # List all tags + all_tags = list_tags() + + # List tags in a category + system_tags = list_tags(category="Systems") + + # Find graphs with a specific tag + gameplay_graphs = list_tags(tag="gameplay") + """ + try: + result = unreal.N2CPythonBridge.list_tags(category or "", tag or "") + return _parse_bridge_result(result) + except Exception as e: + log_error(f"list_tags failed: {e}") + return make_error_result(str(e)) + + +def remove_tag(graph_guid: str, tag: str) -> Dict[str, Any]: + """ + Remove a tag from a graph. + + Args: + graph_guid: The graph's GUID (string format, from list_tags) + tag: Tag name to remove + + Returns: + {success, data: {removed: bool, removed_count, tag, graph_guid, remaining_tags}, error} + + Example: + # Get tags first to find GUIDs + tags = list_tags() + for t in tags['data']['tags']: + if t['tag'] == 'old_tag': + remove_tag(t['graph_guid'], 'old_tag') + """ + if not graph_guid or not graph_guid.strip(): + return make_error_result("graph_guid cannot be empty") + + if not tag or not tag.strip(): + return make_error_result("tag cannot be empty") + + try: + result = unreal.N2CPythonBridge.remove_tag(graph_guid.strip(), tag.strip()) + return _parse_bridge_result(result) + except Exception as e: + log_error(f"remove_tag failed: {e}") + return make_error_result(str(e)) + + +# ============== LLM Provider Info ============== + +def get_llm_providers() -> Dict[str, Any]: + """ + Get all available LLM providers and their configuration. + + This returns information about all LLM providers supported by NodeToCode, + including which one is currently active. + + Returns: + {success, data: {providers: [...], current_provider, provider_count}, error} + + Each provider entry contains: + - name: Provider identifier (OpenAI, Anthropic, etc.) + - display_name: Human-readable name + - is_local: Whether this is a local provider (Ollama, LM Studio) + - is_current: Whether this is the currently active provider + + Example: + providers = get_llm_providers() + if providers['success']: + for p in providers['data']['providers']: + status = " (active)" if p['is_current'] else "" + print(f"{p['display_name']}{status}") + """ + try: + result = unreal.N2CPythonBridge.get_llm_providers() + return _parse_bridge_result(result) + except Exception as e: + log_error(f"get_llm_providers failed: {e}") + return make_error_result(str(e)) + + +def get_active_provider() -> Dict[str, Any]: + """ + Get the currently active LLM provider. + + Returns detailed information about the currently configured LLM provider, + including the model and endpoint. + + Returns: + {success, data: {name, display_name, model, endpoint, is_local}, error} + + Example: + provider = get_active_provider() + if provider['success']: + print(f"Using {provider['data']['display_name']}") + print(f"Model: {provider['data']['model']}") + if provider['data']['is_local']: + print(f"Local endpoint: {provider['data']['endpoint']}") + """ + try: + result = unreal.N2CPythonBridge.get_active_provider() + return _parse_bridge_result(result) + except Exception as e: + log_error(f"get_active_provider failed: {e}") + return make_error_result(str(e)) + + +# ============== Blueprint Editor Navigation ============== + +def open_blueprint(blueprint_path: str, focus_graph: str = "EventGraph") -> Dict[str, Any]: + """ + Open a Blueprint asset in the editor and focus on a specific graph. + + IMPORTANT: A graph must be focused for other functions like get_focused_blueprint(), + add_node_to_graph(), etc. to work. Defaults to "EventGraph" if not specified. + + Args: + blueprint_path: Asset path of the Blueprint (e.g., "/Game/Blueprints/BP_MyActor") + focus_graph: Graph name to focus. Defaults to "EventGraph". Can also be + "ConstructionScript" or any function name in the Blueprint. + + Returns: + {success, data: {blueprintName, blueprintPath, focusedGraph, eventGraphCount, functionCount}, error} + + Example: + # Open a Blueprint (focuses EventGraph by default) + result = open_blueprint("/Game/Blueprints/BP_Player") + + # Open and focus on a specific function + result = open_blueprint("/Game/Blueprints/BP_Player", "HandleInput") + """ + if not blueprint_path or not blueprint_path.strip(): + return make_error_result("blueprint_path cannot be empty") + + try: + result = unreal.N2CPythonBridge.open_blueprint( + blueprint_path.strip(), + (focus_graph or "EventGraph").strip() + ) + return _parse_bridge_result(result) + except Exception as e: + log_error(f"open_blueprint failed: {e}") + return make_error_result(str(e)) + + +def open_blueprint_function(function_name: str) -> Dict[str, Any]: + """ + Open/focus a function graph in the currently open Blueprint. + + The Blueprint must already be open in the editor. Use open_blueprint() first + if needed. + + Args: + function_name: Name of the function to open + + Returns: + {success, data: {functionName, blueprintName, blueprintPath, graphGuid}, error} + + Example: + # First open the Blueprint + open_blueprint("/Game/Blueprints/BP_Player") + + # Then focus on a specific function + result = open_blueprint_function("HandleDamage") + if result['success']: + print(f"Opened function: {result['data']['functionName']}") + """ + if not function_name or not function_name.strip(): + return make_error_result("function_name cannot be empty") + + try: + result = unreal.N2CPythonBridge.open_blueprint_function(function_name.strip()) + return _parse_bridge_result(result) + except Exception as e: + log_error(f"open_blueprint_function failed: {e}") + return make_error_result(str(e)) diff --git a/Content/Python/nodetocode/functions.py b/Content/Python/nodetocode/functions.py new file mode 100644 index 0000000..3ff2feb --- /dev/null +++ b/Content/Python/nodetocode/functions.py @@ -0,0 +1,257 @@ +""" +Python wrappers for Blueprint function pin management. + +These functions enable programmatic manipulation of function parameters: +- Add input parameters to functions +- Add return values to functions +- Remove input parameters +- Remove return values + +Usage: + import nodetocode as n2c + + # Add an input parameter to the focused function + result = n2c.add_function_input_pin( + pin_name="TargetActor", + type_identifier="/Script/Engine.Actor", + is_pass_by_reference=True + ) + + # Add a return value + result = n2c.add_function_return_pin( + pin_name="bSuccess", + type_identifier="bool" + ) + + # Remove pins + n2c.remove_function_entry_pin("OldParameter") + n2c.remove_function_return_pin("UnusedReturn") +""" + +import json +from typing import Dict, Any, Optional + +import unreal + +from .utils import make_success_result, make_error_result, log_info, log_warning, log_error + + +def _parse_bridge_result(json_str: str) -> Dict[str, Any]: + """ + Parse JSON result from C++ bridge functions. + + Args: + json_str: JSON string from bridge function + + Returns: + Parsed dictionary with success/data/error structure + """ + if not json_str: + return make_error_result("Empty response from bridge function") + + try: + return json.loads(json_str) + except json.JSONDecodeError as e: + log_error(f"Failed to parse bridge response: {e}") + return make_error_result(f"Failed to parse bridge response: {e}") + + +# ============== Function Input Pins ============== + +def add_function_input_pin( + pin_name: str, + type_identifier: str, + default_value: str = "", + is_pass_by_reference: bool = False, + tooltip: str = "" +) -> Dict[str, Any]: + """ + Add an input parameter to the currently focused Blueprint function. + + The function graph must be focused in the Blueprint editor. + + Args: + pin_name: Name for the new input parameter + type_identifier: Type identifier, e.g.: + - Primitives: "bool", "int32", "float", "FString", "FName", "FText" + - Vectors: "FVector", "FVector2D", "FRotator", "FTransform" + - Objects: "/Script/Engine.Actor", "/Script/Engine.StaticMeshComponent" + default_value: Optional default value for the parameter + is_pass_by_reference: Whether the parameter is passed by reference + tooltip: Optional tooltip description for the parameter + + Returns: + { + success: bool, + data: { + pinName: str, # Requested name + actualName: str, # Actual name (may differ if made unique) + pinGuid: str, # Unique pin identifier + functionName: str, # Name of the function + typeIdentifier: str # Resolved type + }, + error: str or None + } + + Example: + # Add a simple boolean parameter + result = add_function_input_pin("bEnabled", "bool", default_value="true") + + # Add an Actor reference parameter + result = add_function_input_pin( + pin_name="TargetActor", + type_identifier="/Script/Engine.Actor", + is_pass_by_reference=True, + tooltip="The actor to affect" + ) + """ + if not pin_name or not pin_name.strip(): + return make_error_result("pin_name cannot be empty") + + if not type_identifier or not type_identifier.strip(): + return make_error_result("type_identifier cannot be empty") + + try: + result = unreal.N2CPythonBridge.add_function_input_pin( + pin_name.strip(), + type_identifier.strip(), + default_value, + is_pass_by_reference, + tooltip + ) + return _parse_bridge_result(result) + except Exception as e: + log_error(f"add_function_input_pin failed: {e}") + return make_error_result(str(e)) + + +def remove_function_entry_pin(pin_name: str) -> Dict[str, Any]: + """ + Remove an input parameter from the currently focused Blueprint function. + + Args: + pin_name: Name of the pin to remove (internal name or display name) + + Returns: + { + success: bool, + data: { + removedPin: str, + functionName: str + }, + error: str or None + } + + Example: + result = remove_function_entry_pin("OldParameter") + if result['success']: + print(f"Removed pin from {result['data']['functionName']}") + """ + if not pin_name or not pin_name.strip(): + return make_error_result("pin_name cannot be empty") + + try: + result = unreal.N2CPythonBridge.remove_function_entry_pin(pin_name.strip()) + return _parse_bridge_result(result) + except Exception as e: + log_error(f"remove_function_entry_pin failed: {e}") + return make_error_result(str(e)) + + +# ============== Function Return Pins ============== + +def add_function_return_pin( + pin_name: str, + type_identifier: str, + tooltip: str = "" +) -> Dict[str, Any]: + """ + Add a return value to the currently focused Blueprint function. + + Blueprint functions can have multiple return values (unlike C++). + The function graph must be focused in the Blueprint editor. + + Args: + pin_name: Name for the new return value + type_identifier: Type identifier, e.g.: + - Primitives: "bool", "int32", "float", "FString" + - Vectors: "FVector", "FRotator", "FTransform" + - Objects: "/Script/Engine.Actor" + tooltip: Optional tooltip description for the return value + + Returns: + { + success: bool, + data: { + pinName: str, # Requested name + actualName: str, # Actual name (may differ if made unique) + pinGuid: str, # Unique pin identifier + functionName: str, # Name of the function + typeIdentifier: str # Resolved type + }, + error: str or None + } + + Example: + # Add a boolean return value + result = add_function_return_pin( + pin_name="bSuccess", + type_identifier="bool", + tooltip="True if the operation succeeded" + ) + + # Add a found actor return value + result = add_function_return_pin( + pin_name="FoundActor", + type_identifier="/Script/Engine.Actor" + ) + """ + if not pin_name or not pin_name.strip(): + return make_error_result("pin_name cannot be empty") + + if not type_identifier or not type_identifier.strip(): + return make_error_result("type_identifier cannot be empty") + + try: + result = unreal.N2CPythonBridge.add_function_return_pin( + pin_name.strip(), + type_identifier.strip(), + tooltip + ) + return _parse_bridge_result(result) + except Exception as e: + log_error(f"add_function_return_pin failed: {e}") + return make_error_result(str(e)) + + +def remove_function_return_pin(pin_name: str) -> Dict[str, Any]: + """ + Remove a return value from the currently focused Blueprint function. + + Args: + pin_name: Name of the return pin to remove (internal name or display name) + + Returns: + { + success: bool, + data: { + removedPin: str, + functionName: str + }, + error: str or None + } + + Example: + result = remove_function_return_pin("UnusedReturn") + if result['success']: + print(f"Removed return pin from {result['data']['functionName']}") + """ + if not pin_name or not pin_name.strip(): + return make_error_result("pin_name cannot be empty") + + try: + result = unreal.N2CPythonBridge.remove_function_return_pin(pin_name.strip()) + return _parse_bridge_result(result) + except Exception as e: + log_error(f"remove_function_return_pin failed: {e}") + return make_error_result(str(e)) diff --git a/Content/Python/nodetocode/graph.py b/Content/Python/nodetocode/graph.py new file mode 100644 index 0000000..7430fda --- /dev/null +++ b/Content/Python/nodetocode/graph.py @@ -0,0 +1,534 @@ +""" +Python wrappers for Blueprint graph editing operations. + +These functions enable programmatic Blueprint graph manipulation from Python: +- Search for and add Blueprint nodes +- Connect pins between nodes +- Delete nodes from graphs +- Set input pin default values +- Find/enumerate nodes in a graph +- Create comment nodes for organization + +WORKFLOW - The typical pattern for building Blueprint logic: +1. search_blueprint_nodes() - Find available nodes matching your query +2. add_node_to_graph() - Add a node, returns nodeGuid and pinGuid values +3. connect_pins() - Wire nodes together using the GUIDs from step 2 + +Usage: + import nodetocode as n2c + + # Step 1: Search for nodes + results = n2c.search_blueprint_nodes("Print String") + + # Step 2: Add a node to the focused graph + node = n2c.add_node_to_graph( + results['data']['nodes'][0]['displayName'], + results['data']['nodes'][0]['spawnMetadata']['actionIdentifier'] + ) + + # Step 3: Connect pins between nodes (IMPORTANT: This is the next step after add_node_to_graph) + # Use the nodeGuid and pinGuid values returned from add_node_to_graph + n2c.connect_pins([{ + "from": {"nodeGuid": node['data']['nodeGuid'], "pinGuid": "..."}, + "to": {"nodeGuid": "...", "pinGuid": "..."} + }]) + + # Find existing nodes in the graph (use when you need GUIDs for existing nodes) + found = n2c.find_nodes_in_graph(["Print"]) +""" + +import json +from typing import Dict, Any, List, Optional, Union + +import unreal + +from .utils import make_success_result, make_error_result, log_info, log_warning, log_error + + +def _parse_bridge_result(json_str: str) -> Dict[str, Any]: + """ + Parse JSON result from C++ bridge functions. + + Args: + json_str: JSON string from bridge function + + Returns: + Parsed dictionary with success/data/error structure + """ + if not json_str: + return make_error_result("Empty response from bridge function") + + try: + return json.loads(json_str) + except json.JSONDecodeError as e: + log_error(f"Failed to parse bridge response: {e}") + return make_error_result(f"Failed to parse bridge response: {e}") + + +# ============== Node Search & Creation ============== + +def search_blueprint_nodes( + search_term: str, + context_sensitive: bool = True, + max_results: int = 20, + category: str = "", + exclude_vm_functions: bool = True +) -> Dict[str, Any]: + """ + Search for Blueprint nodes/actions matching a search term. + + This searches through available Blueprint actions that can be added to graphs. + Use the returned actionIdentifier with add_node_to_graph() to spawn nodes. + + Args: + search_term: Text query to search for (e.g., "Print String", "Branch") + context_sensitive: If True, filters results based on focused Blueprint context + max_results: Maximum number of results to return (1-100) + category: Optional category filter. Common categories: + - "flowcontrol" - Branch, Sequence, ForLoop, etc. + - "operators" - Math operators (+, -, *, /) + - "struct" - Make/Break struct nodes + - "casting" - Cast nodes + - "math" - Math functions + exclude_vm_functions: If True (default), excludes low-level VM math functions + like Multiply_FloatFloat that are rarely used directly + + Returns: + { + success: bool, + data: { + nodes: [ + { + name: str, + displayName: str, + spawnMetadata: { + actionIdentifier: str, + isContextSensitive: bool + } + }, + ... + ], + count: int + }, + error: str or None + } + + Example: + # Search for Branch in flow control category + results = search_blueprint_nodes("Branch", category="flowcontrol") + + # Search for all print nodes + results = search_blueprint_nodes("Print String") + if results['success'] and results['data']['count'] > 0: + node = results['data']['nodes'][0] + print(f"Found: {node['displayName']}") + print(f"ActionID: {node['spawnMetadata']['actionIdentifier']}") + """ + if not search_term or not search_term.strip(): + return make_error_result("search_term cannot be empty") + + try: + result = unreal.N2CPythonBridge.search_blueprint_nodes( + search_term.strip(), + context_sensitive, + max_results, + category, + exclude_vm_functions + ) + return _parse_bridge_result(result) + except Exception as e: + log_error(f"search_blueprint_nodes failed: {e}") + return make_error_result(str(e)) + + +def add_node_to_graph( + node_name: str, + action_identifier: str, + location_x: float = 0.0, + location_y: float = 0.0 +) -> Dict[str, Any]: + """ + Add a Blueprint node to the currently focused graph. + + Use search_blueprint_nodes() first to get the actionIdentifier for the node. + The response includes full pin information, allowing immediate connection + without needing to call find_nodes_in_graph(). + + NEXT STEP: After adding a node, use connect_pins() to wire the node's pins + to other nodes using the nodeGuid and pinGuid values returned in this response. + + Args: + node_name: Name of the node (used for action lookup) + action_identifier: Identifier from search results spawnMetadata + location_x: X position for node placement + location_y: Y position for node placement + + Returns: + { + success: bool, + data: { + nodeGuid: str, + nodeName: str, + graphName: str, + blueprintName: str, + location: {x: float, y: float}, + inputPins: [ + {pinGuid: str, pinName: str, displayName: str, type: str, defaultValue?: str}, + ... + ], + outputPins: [ + {pinGuid: str, pinName: str, displayName: str, type: str}, + ... + ], + nextStep: { + function: "connect_pins", + description: str # Guidance on using connect_pins() + } + }, + error: str or None + } + + Example: + # First search for the node + results = search_blueprint_nodes("Print String") + node_info = results['data']['nodes'][0] + + # Then add it + added = add_node_to_graph( + node_info['displayName'], + node_info['spawnMetadata']['actionIdentifier'], + location_x=400, + location_y=200 + ) + if added['success']: + print(f"Added node with GUID: {added['data']['nodeGuid']}") + # Pin GUIDs are immediately available for connecting + exec_pin = next(p for p in added['data']['outputPins'] if p['type'] == 'exec') + print(f"Exec output pin: {exec_pin['pinGuid']}") + + # NEXT STEP: Connect this node to others using connect_pins() + # connect_pins([{ + # "from": {"nodeGuid": added['data']['nodeGuid'], "pinGuid": exec_pin['pinGuid']}, + # "to": {"nodeGuid": "other_node_guid", "pinGuid": "other_pin_guid"} + # }]) + """ + if not node_name or not node_name.strip(): + return make_error_result("node_name cannot be empty") + + if not action_identifier or not action_identifier.strip(): + return make_error_result("action_identifier cannot be empty") + + try: + result = unreal.N2CPythonBridge.add_node_to_graph( + node_name.strip(), + action_identifier.strip(), + location_x, + location_y + ) + return _parse_bridge_result(result) + except Exception as e: + log_error(f"add_node_to_graph failed: {e}") + return make_error_result(str(e)) + + +# ============== Pin Operations ============== + +def connect_pins( + connections: List[Dict[str, Any]], + break_existing_links: bool = True +) -> Dict[str, Any]: + """ + Connect pins between Blueprint nodes. + + Each connection specifies a source pin and target pin using node/pin GUIDs. + + Args: + connections: List of connection specifications, each with: + { + "from": { + "nodeGuid": str, # Source node GUID + "pinGuid": str, # Source pin GUID + "pinName": str # Optional fallback name + }, + "to": { + "nodeGuid": str, # Target node GUID + "pinGuid": str, # Target pin GUID + "pinName": str # Optional fallback name + } + } + break_existing_links: If True, breaks existing connections before making new ones + + Returns: + { + success: bool, + data: { + succeeded: [{fromNode, fromPin, toNode, toPin}, ...], + failed: [{error: str}, ...], + summary: {succeeded: int, failed: int} + }, + error: str or None + } + + Example: + result = connect_pins([ + { + "from": {"nodeGuid": "ABC...", "pinGuid": "DEF..."}, + "to": {"nodeGuid": "GHI...", "pinGuid": "JKL..."} + } + ]) + if result['success']: + print(f"Connected {result['data']['summary']['succeeded']} pins") + """ + if not connections: + return make_error_result("connections list cannot be empty") + + try: + connections_json = json.dumps(connections) + result = unreal.N2CPythonBridge.connect_pins( + connections_json, + break_existing_links + ) + return _parse_bridge_result(result) + except Exception as e: + log_error(f"connect_pins failed: {e}") + return make_error_result(str(e)) + + +def set_pin_value( + node_guid: str, + pin_guid: str, + value: str, + pin_name: str = "" +) -> Dict[str, Any]: + """ + Set the default value of an input pin on a Blueprint node. + + Only works on input pins that are not connected and not execution pins. + + Args: + node_guid: GUID of the node containing the pin + pin_guid: GUID of the pin to modify + value: The value to set (as string, will be converted to appropriate type) + pin_name: Optional pin name for fallback lookup if GUID fails + + Returns: + { + success: bool, + data: { + nodeGuid: str, + pinGuid: str, + pinName: str, + oldValue: str, + newValue: str + }, + error: str or None + } + + Example: + result = set_pin_value( + node_guid="ABC123...", + pin_guid="DEF456...", + value="Hello, World!" + ) + if result['success']: + print(f"Changed from '{result['data']['oldValue']}' to '{result['data']['newValue']}'") + """ + if not node_guid or not node_guid.strip(): + return make_error_result("node_guid cannot be empty") + + if not pin_guid or not pin_guid.strip(): + return make_error_result("pin_guid cannot be empty") + + try: + result = unreal.N2CPythonBridge.set_pin_value( + node_guid.strip(), + pin_guid.strip(), + value, + pin_name.strip() if pin_name else "" + ) + return _parse_bridge_result(result) + except Exception as e: + log_error(f"set_pin_value failed: {e}") + return make_error_result(str(e)) + + +# ============== Node Operations ============== + +def delete_nodes( + node_guids: List[str], + preserve_connections: bool = False, + force: bool = False +) -> Dict[str, Any]: + """ + Delete nodes from the focused Blueprint graph. + + Args: + node_guids: List of node GUIDs to delete + preserve_connections: If True, attempts to bridge connections around deleted nodes + force: If True, bypasses protection checks for entry/result nodes + + Returns: + { + success: bool, + data: { + deletedNodes: [{guid: str, name: str}, ...], + deletedCount: int + }, + error: str or None + } + + Example: + result = delete_nodes(["ABC123...", "DEF456..."]) + if result['success']: + print(f"Deleted {result['data']['deletedCount']} nodes") + """ + if not node_guids: + return make_error_result("node_guids list cannot be empty") + + try: + guids_json = json.dumps(node_guids) + result = unreal.N2CPythonBridge.delete_nodes( + guids_json, + preserve_connections, + force + ) + return _parse_bridge_result(result) + except Exception as e: + log_error(f"delete_nodes failed: {e}") + return make_error_result(str(e)) + + +def find_nodes_in_graph( + search_terms: List[str], + search_type: str = "keyword", + case_sensitive: bool = False, + max_results: int = 50 +) -> Dict[str, Any]: + """ + Find nodes in the focused Blueprint graph by keywords or GUIDs. + + Args: + search_terms: List of keywords or GUIDs to search for + search_type: Either "keyword" or "guid" + case_sensitive: Whether keyword search is case-sensitive + max_results: Maximum nodes to return (1-200) + + Returns: + { + success: bool, + data: { + nodes: [ + { + nodeGuid: str, + nodeName: str, + nodeClass: str, + posX: float, + posY: float, + inputPins: [{pinGuid, pinName, displayName, type}, ...], + outputPins: [{pinGuid, pinName, displayName, type}, ...] + }, + ... + ], + metadata: { + blueprintName: str, + graphName: str, + totalFound: int, + totalInGraph: int + } + }, + error: str or None + } + + Example: + # Find all Print nodes + result = find_nodes_in_graph(["Print"]) + + # Find specific node by GUID + result = find_nodes_in_graph( + ["ABC123-DEF456-..."], + search_type="guid" + ) + """ + if not search_terms: + return make_error_result("search_terms list cannot be empty") + + if search_type not in ("keyword", "guid"): + return make_error_result("search_type must be 'keyword' or 'guid'") + + try: + terms_json = json.dumps(search_terms) + result = unreal.N2CPythonBridge.find_nodes_in_graph( + terms_json, + search_type, + case_sensitive, + max_results + ) + return _parse_bridge_result(result) + except Exception as e: + log_error(f"find_nodes_in_graph failed: {e}") + return make_error_result(str(e)) + + +def create_comment_node( + node_guids: List[str], + comment_text: str = "Comment", + color: Optional[Dict[str, float]] = None, + font_size: int = 18, + move_mode: str = "group", + padding: float = 50.0 +) -> Dict[str, Any]: + """ + Create a comment node around specified Blueprint nodes. + + Args: + node_guids: List of node GUIDs to encompass in the comment + comment_text: Text for the comment + color: RGB color dict with r, g, b values 0-1 (default: dark gray) + font_size: Font size (1-1000) + move_mode: "group" (nodes move with comment) or "none" + padding: Extra padding around nodes in pixels + + Returns: + { + success: bool, + data: { + commentGuid: str, + commentText: str, + includedNodeCount: int, + position: {x: float, y: float}, + size: {width: float, height: float} + }, + error: str or None + } + + Example: + # Create a green comment around nodes + result = create_comment_node( + node_guids=["ABC123...", "DEF456..."], + comment_text="Input Handling", + color={"r": 0.2, "g": 0.8, "b": 0.2}, + font_size=24 + ) + """ + if not node_guids: + return make_error_result("node_guids list cannot be empty") + + # Default color is dark gray + r = color.get("r", 0.075) if color else 0.075 + g = color.get("g", 0.075) if color else 0.075 + b = color.get("b", 0.075) if color else 0.075 + + try: + guids_json = json.dumps(node_guids) + result = unreal.N2CPythonBridge.create_comment_node( + guids_json, + comment_text, + r, g, b, + font_size, + move_mode, + padding + ) + return _parse_bridge_result(result) + except Exception as e: + log_error(f"create_comment_node failed: {e}") + return make_error_result(str(e)) diff --git a/Content/Python/nodetocode/scripts.py b/Content/Python/nodetocode/scripts.py new file mode 100644 index 0000000..d90567e --- /dev/null +++ b/Content/Python/nodetocode/scripts.py @@ -0,0 +1,946 @@ +""" +Script management for NodeToCode Python environment. + +Provides CRUD operations for Python scripts with dual-path search: +- Plugin bundled scripts: Plugins/NodeToCode/Content/Python/scripts/ (read-only) +- Project user scripts: Content/Python/scripts/ (read-write) + +Scripts are indexed in script_registry.json for fast search and discovery. +All list/search/get operations search BOTH locations, with project scripts +taking precedence over plugin scripts with the same name. +Save/delete operations only affect project scripts. + +Usage: + import nodetocode as n2c + + # List available scripts (includes both plugin and project scripts) + scripts = n2c.list_scripts() + + # Search for scripts + matches = n2c.search_scripts("health system") + + # Get script code and metadata + script = n2c.get_script("create_health_system") + # script["source"] will be "plugin" or "project" + + # Execute a saved script + result = n2c.run_script("create_health_system", initial_health=100.0) + + # Save a new script (always saves to project folder) + n2c.save_script( + "my_script", + "print('Hello!')", + "A simple hello script", + tags=["example", "hello"] + ) + + # Delete a script (only project scripts can be deleted) + n2c.delete_script("my_script") +""" + +import os +import json +import re +from datetime import datetime +from typing import Dict, Any, Optional, List + +import unreal + +from .utils import make_success_result, make_error_result, log_info, log_warning, log_error + + +# Script storage paths +def _get_project_scripts_dir() -> str: + """Get the project-level scripts directory (user scripts).""" + return os.path.join(unreal.Paths.project_content_dir(), "Python", "scripts") + + +def _get_plugin_scripts_dir() -> str: + """Get the plugin's bundled scripts directory.""" + # Find the NodeToCode plugin's Content/Python/scripts folder + plugin_dir = unreal.Paths.project_plugins_dir() + return os.path.join(plugin_dir, "NodeToCode", "Content", "Python", "scripts") + + +def _get_scripts_dir() -> str: + """Get the primary scripts storage directory (project-level for user scripts).""" + return _get_project_scripts_dir() + + +def _get_registry_path() -> str: + """Get the project script registry JSON file path.""" + return os.path.join(_get_project_scripts_dir(), "script_registry.json") + + +def _get_plugin_registry_path() -> str: + """Get the plugin's bundled script registry JSON file path.""" + return os.path.join(_get_plugin_scripts_dir(), "script_registry.json") + + +def _ensure_scripts_dir() -> bool: + """ + Ensure the scripts directory structure exists. + + Returns: + True if directory exists or was created, False on error + """ + scripts_dir = _get_scripts_dir() + try: + if not os.path.exists(scripts_dir): + os.makedirs(scripts_dir) + log_info(f"Created scripts directory: {scripts_dir}") + + # Ensure default category exists + general_dir = os.path.join(scripts_dir, "general") + if not os.path.exists(general_dir): + os.makedirs(general_dir) + + return True + except Exception as e: + log_error(f"Failed to create scripts directory: {e}") + return False + + +def _load_single_registry(registry_path: str) -> Dict[str, Any]: + """ + Load a single script registry from JSON. + + Args: + registry_path: Path to the registry JSON file + + Returns: + Registry dictionary or empty default structure + """ + if not os.path.exists(registry_path): + return { + "version": "1.0", + "scripts": {}, + "categories": ["general"], + "stats": { + "total_scripts": 0, + "last_updated": datetime.utcnow().isoformat() + "Z" + } + } + + try: + with open(registry_path, 'r', encoding='utf-8') as f: + return json.load(f) + except Exception as e: + log_error(f"Failed to load script registry from {registry_path}: {e}") + return { + "version": "1.0", + "scripts": {}, + "categories": ["general"], + "stats": { + "total_scripts": 0, + "last_updated": datetime.utcnow().isoformat() + "Z" + } + } + + +def _load_registry() -> Dict[str, Any]: + """ + Load and merge script registries from both plugin and project directories. + + Plugin scripts are loaded first, then project scripts are merged on top. + Project scripts with the same name will override plugin scripts. + + Returns: + Merged registry dictionary + """ + # Load plugin's bundled scripts first + plugin_registry = _load_single_registry(_get_plugin_registry_path()) + + # Mark plugin scripts with their source + for name, info in plugin_registry.get("scripts", {}).items(): + info["_source"] = "plugin" + # Store the base directory for resolving paths + info["_base_dir"] = _get_plugin_scripts_dir() + + # Load project's user scripts + project_registry = _load_single_registry(_get_registry_path()) + + # Mark project scripts with their source + for name, info in project_registry.get("scripts", {}).items(): + info["_source"] = "project" + info["_base_dir"] = _get_project_scripts_dir() + + # Merge: project scripts override plugin scripts with same name + merged_scripts = {} + merged_scripts.update(plugin_registry.get("scripts", {})) + merged_scripts.update(project_registry.get("scripts", {})) + + # Merge categories + merged_categories = list(set( + plugin_registry.get("categories", ["general"]) + + project_registry.get("categories", ["general"]) + )) + + return { + "version": "1.0", + "scripts": merged_scripts, + "categories": sorted(merged_categories), + "stats": { + "total_scripts": len(merged_scripts), + "last_updated": datetime.utcnow().isoformat() + "Z" + } + } + + +def _load_project_registry() -> Dict[str, Any]: + """ + Load only the project's script registry (not merged with plugin). + + Use this for save/delete operations that should only affect project scripts. + + Returns: + Project registry dictionary + """ + return _load_single_registry(_get_registry_path()) + + +def _save_registry(registry: Dict[str, Any]) -> bool: + """ + Save the script registry to JSON (project registry only). + + Args: + registry: Registry dictionary to save + + Returns: + True on success, False on error + """ + if not _ensure_scripts_dir(): + return False + + registry_path = _get_registry_path() + + # Remove internal metadata fields before saving + clean_registry = { + "version": registry.get("version", "1.0"), + "scripts": {}, + "categories": registry.get("categories", ["general"]), + "stats": registry.get("stats", {}) + } + + # Only save project scripts, exclude plugin scripts and internal fields + for name, info in registry.get("scripts", {}).items(): + if info.get("_source") != "plugin": + # Create a clean copy without internal fields + clean_info = {k: v for k, v in info.items() if not k.startswith("_")} + clean_registry["scripts"][name] = clean_info + + # Update stats + clean_registry["stats"]["total_scripts"] = len(clean_registry.get("scripts", {})) + clean_registry["stats"]["last_updated"] = datetime.utcnow().isoformat() + "Z" + + try: + with open(registry_path, 'w', encoding='utf-8') as f: + json.dump(clean_registry, f, indent=2) + return True + except Exception as e: + log_error(f"Failed to save script registry: {e}") + return False + + +def _sanitize_name(name: str) -> str: + """ + Sanitize a script name to be a valid filename. + + Args: + name: Script name to sanitize + + Returns: + Sanitized name safe for filesystem + """ + # Replace spaces and special chars with underscores + sanitized = re.sub(r'[^\w\-]', '_', name.lower()) + # Remove consecutive underscores + sanitized = re.sub(r'_+', '_', sanitized) + # Remove leading/trailing underscores + sanitized = sanitized.strip('_') + return sanitized or "unnamed_script" + + +def _match_query(text: str, query: str) -> float: + """ + Calculate a simple match score between text and query. + + Args: + text: Text to search in + query: Search query + + Returns: + Match score from 0.0 to 1.0 + """ + if not text or not query: + return 0.0 + + text_lower = text.lower() + query_lower = query.lower() + + # Exact match + if query_lower == text_lower: + return 1.0 + + # Contains full query + if query_lower in text_lower: + return 0.8 + + # Word match - check if all query words appear + query_words = query_lower.split() + text_words = text_lower.split() + + matching_words = sum(1 for qw in query_words if any(qw in tw for tw in text_words)) + if matching_words == len(query_words): + return 0.6 + elif matching_words > 0: + return 0.3 * (matching_words / len(query_words)) + + return 0.0 + + +def list_scripts(category: Optional[str] = None, limit: int = 20) -> Dict[str, Any]: + """ + List available scripts with optional category filter. + + Args: + category: Filter by category (None for all) + limit: Max scripts to return (default 20) + + Returns: + {success, data: {scripts: [...], total: int, categories: [...]}, error} + + Each script entry contains: name, description, category, tags, usage_count + """ + try: + registry = _load_registry() + scripts = registry.get("scripts", {}) + categories = registry.get("categories", ["general"]) + + # Filter by category if specified + if category: + filtered = { + name: info for name, info in scripts.items() + if info.get("category", "general") == category + } + else: + filtered = scripts + + # Sort by usage_count (most used first), then by name + sorted_scripts = sorted( + filtered.items(), + key=lambda x: (-x[1].get("usage_count", 0), x[0]) + ) + + # Limit results + limited = sorted_scripts[:limit] + + # Format output (exclude full code, just metadata) + script_list = [] + for name, info in limited: + script_list.append({ + "name": name, + "description": info.get("description", ""), + "category": info.get("category", "general"), + "tags": info.get("tags", []), + "source": info.get("_source", "project"), + "usage_count": info.get("usage_count", 0), + "last_used": info.get("last_used"), + }) + + return make_success_result({ + "scripts": script_list, + "total": len(filtered), + "returned": len(script_list), + "categories": categories + }) + + except Exception as e: + log_error(f"list_scripts failed: {e}") + return make_error_result(str(e)) + + +def search_scripts(query: str, limit: int = 10) -> Dict[str, Any]: + """ + Search scripts by name, description, or tags. + + Args: + query: Search text (case-insensitive) + limit: Max results (default 10) + + Returns: + {success, data: {matches: [...], query: str, total: int}, error} + + Each match contains: name, description, category, tags, relevance score + """ + if not query or not query.strip(): + return make_error_result("Search query cannot be empty") + + try: + registry = _load_registry() + scripts = registry.get("scripts", {}) + + # Score each script + scored = [] + for name, info in scripts.items(): + # Check name + name_score = _match_query(name, query) + + # Check description + desc_score = _match_query(info.get("description", ""), query) + + # Check tags + tags = info.get("tags", []) + tag_score = max((_match_query(tag, query) for tag in tags), default=0.0) + + # Combined score (name weighted highest) + combined = max(name_score * 1.0, desc_score * 0.8, tag_score * 0.9) + + if combined > 0.1: # Threshold for inclusion + scored.append({ + "name": name, + "description": info.get("description", ""), + "category": info.get("category", "general"), + "tags": tags, + "source": info.get("_source", "project"), + "relevance": round(combined, 2), + "usage_count": info.get("usage_count", 0), + }) + + # Sort by relevance, then usage + scored.sort(key=lambda x: (-x["relevance"], -x["usage_count"])) + + # Limit results + matches = scored[:limit] + + return make_success_result({ + "matches": matches, + "query": query, + "total": len(scored), + "returned": len(matches) + }) + + except Exception as e: + log_error(f"search_scripts failed: {e}") + return make_error_result(str(e)) + + +def get_script(name: str) -> Dict[str, Any]: + """ + Load full script code and metadata by name. + + Searches both plugin bundled scripts and project user scripts. + + Args: + name: Script name (without .py extension) + + Returns: + {success, data: {name, code, description, tags, category, parameters, source, ...}, error} + """ + if not name or not name.strip(): + return make_error_result("Script name cannot be empty") + + try: + registry = _load_registry() + scripts = registry.get("scripts", {}) + + # Find script (case-insensitive) + name_lower = name.lower() + script_info = None + actual_name = None + + for script_name, info in scripts.items(): + if script_name.lower() == name_lower: + script_info = info + actual_name = script_name + break + + if not script_info: + return make_error_result(f"Script '{name}' not found") + + # Get the base directory for this script (plugin or project) + base_dir = script_info.get("_base_dir", _get_scripts_dir()) + script_path = os.path.join(base_dir, script_info.get("path", "")) + + if not os.path.exists(script_path): + return make_error_result(f"Script file not found: {script_path}") + + with open(script_path, 'r', encoding='utf-8') as f: + code = f.read() + + return make_success_result({ + "name": actual_name, + "code": code, + "description": script_info.get("description", ""), + "tags": script_info.get("tags", []), + "category": script_info.get("category", "general"), + "parameters": script_info.get("parameters", []), + "path": script_info.get("path"), + "source": script_info.get("_source", "project"), + "created": script_info.get("created"), + "last_used": script_info.get("last_used"), + "usage_count": script_info.get("usage_count", 0), + }) + + except Exception as e: + log_error(f"get_script failed: {e}") + return make_error_result(str(e)) + + +def run_script(name: str, **kwargs) -> Dict[str, Any]: + """ + Execute a saved script with optional parameters. + + Parameters can be passed as keyword arguments and will be available + as local variables in the script execution context. + + Args: + name: Script name to execute + **kwargs: Parameters to pass to the script as local variables + + Returns: + {success, data: