#!/usr/bin/env python3
"""Small clangd navigation helper for agent-friendly C++ code lookup.

The script starts a clangd subprocess, speaks JSON-RPC (LSP) to it over
stdio, runs a single navigation query and prints the result as JSON.

Examples:
  python scripts/dev/clangd_nav.py symbols --file src/app_core/brush_ui.h
  python scripts/dev/clangd_nav.py definition --file src/node_panel_brush.cpp --line 410 --column 30
  python scripts/dev/clangd_nav.py references --file src/app_core/brush_ui.h --line 192 --column 43
"""

from __future__ import annotations

import argparse
import json
import os
from pathlib import Path
import queue
import subprocess
import sys
import threading
import time
from typing import Any
from urllib.parse import unquote

# Build directories (relative to the repo root) probed, in order, for a
# compile_commands.json when none is requested explicitly.
DEFAULT_BUILD_DIRS = (
    "out/build/windows-clangcl-asan",
    "out/build/android-arm64",
)


def _repo_root() -> Path:
    """Return the repository root, taken as two directories above this file."""
    return Path(__file__).resolve().parents[2]


def _find_compile_commands_dir(repo_root: Path, requested: str | None) -> Path:
    """Locate the directory that contains ``compile_commands.json``.

    Lookup order: the explicit ``requested`` path (relative paths are resolved
    against ``repo_root``; a file path is replaced by its parent directory),
    then the ``PP_CLANGD_COMPILE_COMMANDS_DIR`` environment variable, then
    ``DEFAULT_BUILD_DIRS``, then the first (sorted) match under ``out/build``.

    Raises:
        SystemExit: if no directory with a ``compile_commands.json`` is found.
    """
    if requested:
        path = Path(requested).expanduser()
        if not path.is_absolute():
            path = repo_root / path
        if path.is_file():
            path = path.parent
        if not (path / "compile_commands.json").exists():
            raise SystemExit(f"compile_commands.json not found in {path}")
        return path.resolve()

    env_dir = os.environ.get("PP_CLANGD_COMPILE_COMMANDS_DIR")
    if env_dir:
        # Re-enter with the environment value so it gets the same validation
        # as an explicitly requested directory.
        return _find_compile_commands_dir(repo_root, env_dir)

    for candidate in DEFAULT_BUILD_DIRS:
        path = repo_root / candidate
        if (path / "compile_commands.json").exists():
            return path.resolve()

    matches = sorted((repo_root / "out" / "build").glob("*/compile_commands.json"))
    if matches:
        return matches[0].parent.resolve()

    raise SystemExit(
        "No compile_commands.json found. Configure a Ninja CMake preset first, "
        "or pass --compile-commands-dir."
    )


def _resolve_file(repo_root: Path, file_arg: str) -> Path:
    """Resolve ``file_arg`` (absolute or repo-relative) to an existing path.

    Raises:
        SystemExit: if the file does not exist.
    """
    path = Path(file_arg).expanduser()
    if not path.is_absolute():
        path = repo_root / path
    if not path.exists():
        raise SystemExit(f"file not found: {path}")
    return path.resolve()


def _read_lsp_message(stream: Any) -> dict[str, Any] | None:
    """Read one ``Content-Length``-framed JSON message from a binary stream.

    Returns the decoded JSON payload, or ``None`` when the stream ends, the
    header block carries no ``Content-Length``, or the payload is empty.
    Malformed headers or JSON raise (``ValueError`` and friends).
    """
    content_length: int | None = None
    while True:
        line = stream.readline()
        if not line:
            return None
        if line in (b"\r\n", b"\n"):
            # A blank line terminates the header block.
            break
        name, _, value = line.decode("ascii", errors="replace").partition(":")
        if name.lower() == "content-length":
            content_length = int(value.strip())

    if content_length is None:
        return None
    payload = stream.read(content_length)
    if not payload:
        return None
    return json.loads(payload.decode("utf-8"))


def _write_lsp_message(stream: Any, message: dict[str, Any]) -> None:
    """Serialize ``message`` as compact JSON, frame it and flush it to ``stream``."""
    payload = json.dumps(message, separators=(",", ":")).encode("utf-8")
    header = f"Content-Length: {len(payload)}\r\n\r\n".encode("ascii")
    stream.write(header + payload)
    stream.flush()


class ClangdClient:
    """Minimal JSON-RPC client for a clangd subprocess talking over stdio.

    A daemon thread reads clangd's stdout; responses are matched to requests
    by id, every other message is parked in an internal queue.
    """

    def __init__(
            self,
            clangd: str,
            compile_commands_dir: Path,
            timeout_seconds: float,
            background_index: bool) -> None:
        """Spawn clangd and start the reader thread.

        Args:
            clangd: clangd executable name or path.
            compile_commands_dir: passed as ``--compile-commands-dir``.
            timeout_seconds: how long ``request`` waits for each response.
            background_index: when false, ``--background-index=false`` is
                passed to clangd.

        Raises:
            RuntimeError: if the stdio pipes could not be opened.
        """
        self._timeout_seconds = timeout_seconds
        self._next_id = 1
        self._responses: dict[int, dict[str, Any]] = {}
        self._condition = threading.Condition()
        self._messages: "queue.Queue[dict[str, Any]]" = queue.Queue()
        # Set (under ``_condition``) once the reader thread has stopped, so
        # that pending requests can fail fast instead of timing out.
        self._reader_failure: str | None = None

        clangd_args = [
            clangd,
            f"--compile-commands-dir={compile_commands_dir}",
            "--log=error",
        ]
        if not background_index:
            clangd_args.append("--background-index=false")

        self._process = subprocess.Popen(
            clangd_args,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
        )
        if self._process.stdin is None or self._process.stdout is None:
            raise RuntimeError("failed to open clangd stdio pipes")
        self._stdin = self._process.stdin
        self._stdout = self._process.stdout

        self._reader = threading.Thread(target=self._reader_loop, daemon=True)
        self._reader.start()

    def close(self) -> None:
        """Ask clangd to exit, then terminate it (kill as a last resort).

        Teardown is deliberately best-effort: it runs from a ``finally`` block
        and must not mask the error that triggered it.
        """
        try:
            self.notify("exit", {})
        except Exception:
            # clangd may already be gone (broken pipe); nothing left to tell it.
            pass
        try:
            self._process.terminate()
            self._process.wait(timeout=2)
        except Exception:
            self._process.kill()

    def _reader_loop(self) -> None:
        """Read messages from clangd until the stream ends or a read fails.

        Messages without a ``method`` and with an integer ``id`` are responses
        and are stored for ``request``. Everything else (notifications and
        server-initiated requests, which carry both ``id`` and ``method``) is
        queued, so a server request id can never be mistaken for a response.

        When the loop stops, the reason is recorded and waiters are woken.
        """
        failure = "clangd closed its output stream"
        try:
            while True:
                message = _read_lsp_message(self._stdout)
                if message is None:
                    break
                is_response = (
                    isinstance(message, dict)
                    and "method" not in message
                    and isinstance(message.get("id"), int)
                )
                if is_response:
                    with self._condition:
                        self._responses[message["id"]] = message
                        self._condition.notify_all()
                else:
                    self._messages.put(message)
        except Exception as exc:
            # A framing/decoding/stream error leaves the channel unusable;
            # stop reading instead of spinning on a broken stream.
            failure = f"failed to read clangd response: {exc}"
        finally:
            with self._condition:
                self._reader_failure = failure
                self._condition.notify_all()

    def request(self, method: str, params: dict[str, Any]) -> Any:
        """Send a request and block until its response arrives.

        Returns:
            The ``result`` member of the response (may be ``None``).

        Raises:
            TimeoutError: no response within the configured timeout.
            RuntimeError: clangd returned an error, or the reader thread
                stopped before a response arrived.
        """
        request_id = self._next_id
        self._next_id += 1
        _write_lsp_message(
            self._stdin,
            {
                "jsonrpc": "2.0",
                "id": request_id,
                "method": method,
                "params": params,
            },
        )

        deadline = time.monotonic() + self._timeout_seconds
        with self._condition:
            while request_id not in self._responses:
                # Checked after the response lookup so that a response read
                # just before the stream ended is still delivered.
                if self._reader_failure is not None:
                    raise RuntimeError(
                        f"clangd request failed ({method}): {self._reader_failure}")
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise TimeoutError(f"clangd request timed out: {method}")
                self._condition.wait(remaining)
            response = self._responses.pop(request_id)

        if "error" in response:
            raise RuntimeError(response["error"].get("message", "clangd request failed"))
        return response.get("result")

    def notify(self, method: str, params: dict[str, Any]) -> None:
        """Send a notification (a message without an id; no response expected)."""
        _write_lsp_message(
            self._stdin,
            {
                "jsonrpc": "2.0",
                "method": method,
                "params": params,
            },
        )


def _position_params(file_path: Path, line: int, column: int) -> dict[str, Any]:
    """Build text-document position params from 1-based ``line``/``column``.

    Raises:
        SystemExit: if ``line`` or ``column`` is smaller than 1.
    """
    if line < 1 or column < 1:
        raise SystemExit("--line and --column are 1-based and must be positive")
    return {
        "textDocument": {"uri": file_path.as_uri()},
        # The protocol positions sent here are zero-based.
        "position": {"line": line - 1, "character": column - 1},
    }


def _range_to_json(range_value: dict[str, Any]) -> dict[str, Any]:
    """Convert a zero-based LSP range into 1-based ``line``/``column`` pairs."""
    start = range_value["start"]
    end = range_value["end"]
    return {
        "start": {"line": start["line"] + 1, "column": start["character"] + 1},
        "end": {"line": end["line"] + 1, "column": end["character"] + 1},
    }


def _location_to_json(value: dict[str, Any]) -> dict[str, Any]:
    """Normalize a location (``uri``/``range``) or a location link
    (``targetUri``/``targetRange``) into ``uri``, ``path`` and 1-based ``range``.
    """
    if "targetUri" in value:
        uri = value["targetUri"]
        range_value = value.get("targetRange", value.get("targetSelectionRange"))
    else:
        uri = value["uri"]
        range_value = value["range"]
    return {
        "uri": uri,
        "path": _uri_to_path(uri),
        "range": _range_to_json(range_value),
    }


def _uri_to_path(uri: str) -> str:
    """Convert a ``file:///`` URI into a filesystem path string.

    Percent-escapes are decoded first (``%20`` -> space, ``%3A`` -> ``:``), so
    paths with spaces or an encoded drive colon come out usable. A path whose
    second character is ``:`` is treated as a Windows drive path and gets
    backslashes; anything else becomes an absolute POSIX path. URIs that do
    not start with ``file:///`` are returned unchanged.
    """
    prefix = "file:///"
    if not uri.startswith(prefix):
        return uri
    raw = unquote(uri[len(prefix):])
    if len(raw) >= 3 and raw[1] == ":":
        return raw.replace("/", "\\")
    return "/" + raw


def _locations_to_json(result: Any) -> list[dict[str, Any]]:
    """Normalize a location result (``None``, a single dict, or a list)."""
    if result is None:
        return []
    if isinstance(result, dict):
        return [_location_to_json(result)]
    return [_location_to_json(item) for item in result]


def _symbols_to_json(symbols: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Convert document symbols into the JSON shape printed by this tool.

    Hierarchical symbols keep their ``children``. Flat symbol-information
    entries, which carry ``location.range`` instead of ``range``, are accepted
    too. ``None`` or an empty list yields ``[]``.
    """

    def convert(symbol: dict[str, Any]) -> dict[str, Any]:
        # Flat symbol information has no top-level "range"; use its location.
        if "range" in symbol:
            range_value = symbol["range"]
        else:
            range_value = symbol["location"]["range"]
        item = {
            "name": symbol.get("name", ""),
            "detail": symbol.get("detail", ""),
            "kind": symbol.get("kind", 0),
            "range": _range_to_json(range_value),
            "selectionRange": _range_to_json(symbol.get("selectionRange", range_value)),
        }
        children = symbol.get("children")
        if children:
            item["children"] = [convert(child) for child in children]
        return item

    return [convert(symbol) for symbol in symbols or []]


def _flatten_symbols(symbols: list[dict[str, Any]], parent: str = "") -> list[dict[str, Any]]:
    """Flatten converted symbols depth-first, adding a ``::``-joined
    ``qualifiedName`` built from the chain of parent names.
    """
    flattened: list[dict[str, Any]] = []
    for symbol in symbols:
        qualified_name = f"{parent}::{symbol['name']}" if parent else symbol["name"]
        flattened.append({
            "name": symbol["name"],
            "qualifiedName": qualified_name,
            "detail": symbol.get("detail", ""),
            "kind": symbol.get("kind", 0),
            "range": symbol["range"],
            "selectionRange": symbol["selectionRange"],
        })
        flattened.extend(_flatten_symbols(symbol.get("children", []), qualified_name))
    return flattened


def _limit_results(values: list[dict[str, Any]], max_results: int) -> tuple[list[dict[str, Any]], bool]:
    """Return at most ``max_results`` items plus a "was truncated" flag.

    A ``max_results`` below 1 disables the limit.
    """
    if max_results < 1:
        return values, False
    return values[:max_results], len(values) > max_results


def _hover_to_json(result: Any) -> dict[str, Any] | None:
    """Reduce a hover result to ``{"contents": str}`` plus an optional 1-based
    ``range``; a falsy result yields ``None``.
    """
    if not result:
        return None
    contents = result.get("contents")
    if isinstance(contents, dict):
        value = contents.get("value", "")
    elif isinstance(contents, list):
        # List entries may be plain strings or dicts with a "value" member.
        value = "\n".join(
            str(item.get("value", item)) if isinstance(item, dict) else str(item)
            for item in contents
        )
    else:
        value = str(contents)
    output = {"contents": value}
    if "range" in result:
        output["range"] = _range_to_json(result["range"])
    return output


def _open_document(client: ClangdClient, file_path: Path) -> None:
    """Send ``textDocument/didOpen`` for ``file_path`` with its current text.

    Only ``.c`` files are announced as ``c``; every other suffix (headers
    included) is announced as ``cpp``.
    """
    language_id = "c" if file_path.suffix.lower() == ".c" else "cpp"
    client.notify(
        "textDocument/didOpen",
        {
            "textDocument": {
                "uri": file_path.as_uri(),
                "languageId": language_id,
                "version": 1,
                "text": file_path.read_text(encoding="utf-8", errors="replace"),
            }
        },
    )


def run(args: argparse.Namespace) -> int:
    """Execute the parsed command against clangd and print a JSON report.

    Returns:
        ``0`` on success.

    Raises:
        SystemExit: for invalid input (missing file, missing
            compile_commands.json, ``references`` without an explicit
            completeness choice).
        TimeoutError, RuntimeError: propagated from the clangd client.
    """
    repo_root = _repo_root()
    compile_commands_dir = _find_compile_commands_dir(repo_root, args.compile_commands_dir)
    file_path = _resolve_file(repo_root, args.file)

    if args.command == "references" and not args.background_index and not args.allow_incomplete_references:
        raise SystemExit(
            "references may be incomplete without clangd background indexing. "
            "Pass --background-index for a broader best-effort query or "
            "--allow-incomplete-references for current-translation-unit lookup."
        )

    client = ClangdClient(args.clangd, compile_commands_dir, args.timeout, args.background_index)
    try:
        client.request(
            "initialize",
            {
                "processId": None,
                "rootUri": repo_root.as_uri(),
                "capabilities": {
                    "textDocument": {
                        "definition": {"linkSupport": True},
                        "declaration": {"linkSupport": True},
                        "implementation": {"linkSupport": True},
                        "references": {},
                        "hover": {},
                        "documentSymbol": {"hierarchicalDocumentSymbolSupport": True},
                    }
                },
            },
        )
        client.notify("initialized", {})
        _open_document(client, file_path)

        command = args.command
        result: Any
        result_count: int | None = None
        truncated = False

        if command == "symbols":
            symbols = _symbols_to_json(
                client.request(
                    "textDocument/documentSymbol",
                    {"textDocument": {"uri": file_path.as_uri()}},
                )
            )
            if args.hierarchical:
                # The nested view is printed whole: no name filter, no limit.
                result = symbols
                result_count = len(symbols)
            else:
                flattened = _flatten_symbols(symbols)
                if args.name:
                    needle = args.name.lower()
                    flattened = [
                        symbol for symbol in flattened
                        if needle in symbol["qualifiedName"].lower()
                    ]
                result_count = len(flattened)
                result, truncated = _limit_results(flattened, args.max_results)
        elif command == "hover":
            result = _hover_to_json(
                client.request(
                    "textDocument/hover",
                    _position_params(file_path, args.line, args.column),
                )
            )
        elif command == "references":
            params = _position_params(file_path, args.line, args.column)
            params["context"] = {"includeDeclaration": args.include_declaration}
            locations = _locations_to_json(client.request("textDocument/references", params))
            result_count = len(locations)
            result, truncated = _limit_results(locations, args.max_results)
        else:
            method = {
                "definition": "textDocument/definition",
                "declaration": "textDocument/declaration",
                "implementation": "textDocument/implementation",
            }[command]
            locations = _locations_to_json(
                client.request(method, _position_params(file_path, args.line, args.column))
            )
            result_count = len(locations)
            result, truncated = _limit_results(locations, args.max_results)

        print(json.dumps(
            {
                "ok": True,
                "command": command,
                "file": str(file_path),
                "compileCommandsDir": str(compile_commands_dir),
                "backgroundIndex": args.background_index,
                "referenceCompleteness": (
                    "not-applicable" if command != "references"
                    else ("best-effort-background-index" if args.background_index
                          else "current-translation-unit-only")
                ),
                "resultCount": result_count,
                "truncated": truncated,
                "result": result,
            },
            indent=2,
        ))
        return 0
    finally:
        client.close()


def main(argv: list[str]) -> int:
    """Parse ``argv`` (without the program name) and run the chosen command."""
    parser = argparse.ArgumentParser(description="Navigate C++ symbols through clangd JSON-RPC.")
    parser.add_argument(
        "command",
        choices=("definition", "declaration", "implementation", "references", "hover", "symbols"),
    )
    parser.add_argument("--file", required=True, help="Source/header file to open.")
    parser.add_argument("--line", type=int, default=1, help="1-based line for position commands.")
    parser.add_argument("--column", type=int, default=1, help="1-based column for position commands.")
    parser.add_argument(
        "--compile-commands-dir",
        help="Directory containing compile_commands.json. Defaults to PP_CLANGD_COMPILE_COMMANDS_DIR or known build dirs.",
    )
    parser.add_argument("--clangd", default="clangd", help="clangd executable path.")
    parser.add_argument("--timeout", type=float, default=20.0, help="Request timeout in seconds.")
    parser.add_argument("--name", help="Case-insensitive symbol-name filter for symbols command.")
    parser.add_argument("--max-results", type=int, default=100, help="Maximum locations/symbols to print; <=0 disables.")
    parser.add_argument(
        "--background-index",
        action="store_true",
        help="Allow clangd to build/use its background index for broader cross-translation-unit references.",
    )
    parser.add_argument(
        "--allow-incomplete-references",
        action="store_true",
        help="Permit current-translation-unit-only references when --background-index is not enabled.",
    )
    parser.add_argument(
        "--hierarchical",
        action="store_true",
        help="Print nested document symbols instead of the compact flat symbol list.",
    )
    parser.add_argument(
        "--include-declaration",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="Include declaration in references results.",
    )
    return run(parser.parse_args(argv))


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))