Skip to content
291 changes: 287 additions & 4 deletions scripts/update_lib/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
- Test dependencies (auto-detected from 'from test import ...')
"""

import ast
import functools
import pathlib
from collections import deque

from update_lib.io_utils import read_python_files, safe_parse_ast, safe_read_text
from update_lib.path import construct_lib_path, resolve_module_path
Expand Down Expand Up @@ -226,8 +228,6 @@ def parse_test_imports(content: str) -> set[str]:
Returns:
Set of module names imported from test package
"""
import ast

tree = safe_parse_ast(content)
if tree is None:
return set()
Expand Down Expand Up @@ -262,8 +262,6 @@ def parse_lib_imports(content: str) -> set[str]:
Returns:
Set of imported module names (top-level only)
"""
import ast

tree = safe_parse_ast(content)
if tree is None:
return set()
Expand Down Expand Up @@ -504,3 +502,288 @@ def resolve_all_paths(
result["data"].append(data_path)

return result


@functools.cache
def _build_import_graph(lib_prefix: str = "Lib") -> dict[str, set[str]]:
    """Map each public top-level module under ``lib_prefix`` to its imports.

    Only direct children of the directory are inspected: ``name.py`` files
    and directories that contain an ``__init__.py``. Entries whose name
    starts with "_" or "." are skipped, and so is the ``test`` entry.

    Args:
        lib_prefix: RustPython Lib directory (default: "Lib")

    Returns:
        Dict mapping module_name -> set of modules it imports. A module's own
        name is never part of its set. Empty dict if lib_prefix is missing.
    """
    root = pathlib.Path(lib_prefix)
    if not root.exists():
        return {}

    graph: dict[str, set[str]] = {}
    for child in root.iterdir():
        name = child.name
        # Private/hidden entries and the test package are not graph nodes.
        if name.startswith(("_", ".")) or name == "test":
            continue

        if child.is_file() and child.suffix == ".py":
            module = child.stem
        elif child.is_dir() and (child / "__init__.py").exists():
            module = name
        else:
            # Neither a plain module nor a package.
            continue

        # Merge the imports of every source file belonging to this module.
        found: set[str] = set().union(
            *(parse_lib_imports(source) for _, source in read_python_files(child))
        )
        # A module importing itself is not a dependency edge.
        found.discard(module)
        graph[module] = found

    return graph


def _build_reverse_graph(import_graph: dict[str, set[str]]) -> dict[str, set[str]]:
    """Invert an import graph so each module maps to the modules importing it.

    Args:
        import_graph: Forward import graph (module -> imports)

    Returns:
        Reverse graph (module -> imported_by). Only names that appear in at
        least one import set become keys.
    """
    importers_of: dict[str, set[str]] = {}

    for importer, targets in import_graph.items():
        for target in targets:
            importers_of.setdefault(target, set()).add(importer)

    return importers_of


@functools.cache
def get_transitive_imports(
    module_name: str,
    lib_prefix: str = "Lib",
) -> frozenset[str]:
    """Collect every module that depends on ``module_name``, at any distance.

    The search walks the reverse import graph breadth-first, starting from
    the direct importers of ``module_name``. The start module is not seeded
    as visited, so it appears in the result when it is part of an import
    cycle.

    Args:
        module_name: Target module
        lib_prefix: RustPython Lib directory (default: "Lib")

    Returns:
        Frozenset of module names that import module_name (directly or indirectly)
    """
    importers_of = _build_reverse_graph(_build_import_graph(lib_prefix))

    seen: set[str] = set()
    pending = deque(importers_of.get(module_name, set()))

    while pending:
        module = pending.popleft()
        if module in seen:
            continue
        seen.add(module)
        # Enqueue whoever imports this module and has not been handled yet.
        pending.extend(
            parent for parent in importers_of.get(module, set()) if parent not in seen
        )

    return frozenset(seen)


def _parse_test_submodule_imports(content: str) -> dict[str, set[str]]:
    """Collect the names pulled in by ``from test.X import Y`` statements.

    Only the first component after ``test.`` is used as the key, so
    ``from test.a.b import c`` is recorded under ``"a"``. Imports from
    ``test.support`` are ignored.

    Args:
        content: Python file content

    Returns:
        Dict mapping submodule (e.g., "test_bar") -> set of imported names
        (e.g., {"helper"}). Empty dict if the content cannot be parsed.
    """
    tree = safe_parse_ast(content)
    if tree is None:
        return {}

    found: dict[str, set[str]] = {}
    for node in ast.walk(tree):
        if not isinstance(node, ast.ImportFrom):
            continue
        module = node.module
        if not module or not module.startswith("test."):
            continue

        # The "test." prefix guarantees a second dotted component exists.
        # from test.test_bar import helper -> test_bar: {helper}
        submodule = module.split(".")[1]
        if submodule in ("support", "__init__"):
            continue

        found.setdefault(submodule, set()).update(alias.name for alias in node.names)

    return found


def _build_test_import_graph(test_dir: pathlib.Path) -> dict[str, set[str]]:
    """Map each Python file under ``test_dir`` to the test modules it imports.

    Three kinds of imports are collected for every file:
      * whatever ``parse_test_imports`` reports for the file;
      * plain imports whose name matches a ``.py`` file next to the
        importing file or directly in ``test_dir``;
      * names from ``from test.X import Y`` where ``test_dir/X/Y.py`` exists.

    Files whose text cannot be read are left out of the graph.

    Args:
        test_dir: Path to Lib/test/ directory

    Returns:
        Dict mapping relative path (without .py) -> set of test modules it imports
    """
    graph: dict[str, set[str]] = {}

    # "**/*.py" descends into every subdirectory of test_dir.
    for source_path in test_dir.glob("**/*.py"):
        text = safe_read_text(source_path)
        if text is None:
            continue

        # "from test import X" style imports.
        deps: set[str] = set(parse_test_imports(text))

        # Plain imports count only when they resolve to a file that lives
        # beside the importing file or at the root of test_dir.
        sibling_dir = source_path.parent
        deps.update(
            name
            for name in parse_lib_imports(text)
            if (sibling_dir / f"{name}.py").exists()
            or (test_dir / f"{name}.py").exists()
        )

        # "from test.X import Y" where Y is a file inside test_dir/X/.
        for submodule, names in _parse_test_submodule_imports(text).items():
            package_dir = test_dir / submodule
            if not package_dir.is_dir():
                continue
            deps.update(
                name for name in names if (package_dir / f"{name}.py").exists()
            )

        # Key: path relative to test_dir with the ".py" suffix removed.
        graph[str(source_path.relative_to(test_dir).with_suffix(""))] = deps

    return graph


@functools.cache
def find_tests_importing_module(
    module_name: str,
    lib_prefix: str = "Lib",
    include_transitive: bool = True,
) -> frozenset[pathlib.Path]:
    """Find all test files that import the given module (directly or transitively).

    Only returns test_*.py files. Support files (like pickletester.py, string_tests.py)
    are used for transitive dependency calculation but not included in the result.

    Args:
        module_name: Module to search for (e.g., "datetime")
        lib_prefix: RustPython Lib directory (default: "Lib")
        include_transitive: Whether to also match Lib modules that
            transitively depend on module_name. Propagation through files
            inside the test directory is performed regardless of this flag.

    Returns:
        Frozenset of test_*.py file paths that depend on this module.
        Empty if ``<lib_prefix>/test`` does not exist.
    """
    test_dir = pathlib.Path(lib_prefix) / "test"
    if not test_dir.exists():
        return frozenset()

    # Lib/ modules whose import marks a test file as dependent.
    target_modules = {module_name}
    if include_transitive:
        target_modules.update(get_transitive_imports(module_name, lib_prefix))

    # Invert the test-directory graph once (imported name -> importing file
    # keys) so the BFS below does a dict lookup per dequeued file instead of
    # rescanning the whole graph each time.
    importers_by_name = _build_reverse_graph(_build_test_import_graph(test_dir))

    # First pass: files (keyed by relative path without ".py") that import
    # any target module directly.
    directly_importing: set[str] = set()
    for py_file in test_dir.glob("**/*.py"):
        content = safe_read_text(py_file)
        if content is None:
            continue
        if parse_lib_imports(content) & target_modules:
            directly_importing.add(str(py_file.relative_to(test_dir).with_suffix("")))

    # Second pass: BFS over files that import an already-collected file.
    all_importing = set(directly_importing)
    queue = deque(directly_importing)
    while queue:
        current_path = pathlib.Path(queue.popleft())
        # Files are matched by bare name; a package's __init__ is imported
        # under its directory name (e.g. "test_json/__init__" -> "test_json").
        import_name = current_path.name
        if import_name == "__init__":
            import_name = current_path.parent.name
        for file_key in importers_by_name.get(import_name, ()):
            if file_key not in all_importing:
                all_importing.add(file_key)
                queue.append(file_key)

    # Keep only test_*.py files; keys look like "test_foo" or "test_bar/test_sub".
    return frozenset(
        test_dir / f"{file_key}.py"
        for file_key in all_importing
        if pathlib.Path(file_key).name.startswith("test_")
    )


def consolidate_test_paths(
    test_paths: frozenset[pathlib.Path],
    test_dir: pathlib.Path,
) -> frozenset[str]:
    """Collapse test file paths into one name per top-level test entry.

    Files directly inside ``test_dir`` contribute their stem. Files in a
    subdirectory contribute the name of the first directory below
    ``test_dir``. Paths outside ``test_dir`` contribute their own stem.
    A path equal to ``test_dir`` itself has no components to name and
    raises IndexError.

    Args:
        test_paths: Frozenset of absolute paths to test files
        test_dir: Path to the test directory (e.g., Lib/test)

    Returns:
        Frozenset of consolidated test names:
        - "test_foo" for Lib/test/test_foo.py
        - "test_sqlite3" for any file in Lib/test/test_sqlite3/
    """
    names: set[str] = set()

    for test_path in test_paths:
        try:
            relative = test_path.relative_to(test_dir)
        except ValueError:
            # Not located under test_dir: fall back to the bare file stem.
            names.add(test_path.stem)
            continue

        segments = relative.parts
        # test_foo.py -> test_foo; test_sqlite3/test_dbapi.py -> test_sqlite3
        names.add(relative.stem if len(segments) == 1 else segments[0])

    return frozenset(names)
Loading