File size: 21,711 Bytes
d631808
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
# This module contains the code allowing to find modules.
#
# Note: It might be possible to replace a good part of this module's logic
# with utilities from `importlib` (however the util in question is private):
#
# ```pycon
# >>> from importlib.util import _find_spec
# >>> _find_spec("griffe.agents", _find_spec("griffe", None).submodule_search_locations)
# ModuleSpec(
#     name='griffe.agents',
#     loader=<_frozen_importlib_external.SourceFileLoader object at 0x7fa5f34e8110>,
#     origin='/media/data/dev/griffe/src/griffe/agents/__init__.py',
#     submodule_search_locations=['/media/data/dev/griffe/src/griffe/agents'],
# )
# ```

from __future__ import annotations

import ast
import os
import re
import sys
from collections import defaultdict
from contextlib import suppress
from dataclasses import dataclass
from itertools import chain
from pathlib import Path
from typing import TYPE_CHECKING, ClassVar

from _griffe.exceptions import UnhandledEditableModuleError
from _griffe.logger import logger

if TYPE_CHECKING:
    from collections.abc import Iterator, Sequence
    from re import Pattern

    from _griffe.models import Module


_editable_editables_patterns = [re.compile(pat) for pat in (r"^__editables_\w+\.py$", r"^_editable_impl_\w+\.py$")]
_editable_setuptools_patterns = [re.compile(pat) for pat in (r"^__editable__\w+\.py$",)]
_editable_scikit_build_core_patterns = [re.compile(pat) for pat in (r"^_\w+_editable.py$",)]
_editable_meson_python_patterns = [re.compile(pat) for pat in (r"^_\w+_editable_loader.py$",)]

NamePartsType = tuple[str, ...]
"""Type alias for the parts of a module name."""
NamePartsAndPathType = tuple[NamePartsType, Path]
"""Type alias for the parts of a module name and its path."""


def _match_pattern(string: str, patterns: Sequence[Pattern]) -> bool:
    return any(pattern.match(string) for pattern in patterns)


@dataclass
class Package:
    """This class is a simple placeholder used during the process of finding packages.

    Parameters:
        name: The package name.
        path: The package path(s).
        stubs: An optional path to the related stubs file (.pyi).
    """

    name: str
    """Package name."""
    path: Path
    """Package folder path."""
    stubs: Path | None = None
    """Package stubs file."""


@dataclass
class NamespacePackage:
    """This class is a simple placeholder used during the process of finding packages.

    Parameters:
        name: The package name.
        path: The package paths.
    """

    name: str
    """Namespace package name."""
    path: list[Path]
    """Namespace package folder paths."""


class ModuleFinder:
    """The Griffe finder, allowing to find modules on the file system.

    The module finder is generally not used directly.
    Each [`GriffeLoader`][griffe.GriffeLoader] instance creates its own module finder instance.
    The finder can be configured when instantiating the loader
    thanks to the [loader][griffe.GriffeLoader]'s `search_paths` parameter.
    """

    accepted_py_module_extensions: ClassVar[list[str]] = [".py", ".pyc", ".pyo", ".pyd", ".pyi", ".so"]
    """List of extensions supported by the finder."""
    extensions_set: ClassVar[set[str]] = set(accepted_py_module_extensions)
    """Set of extensions supported by the finder."""

    def __init__(self, search_paths: Sequence[str | Path] | None = None) -> None:
        """Initialize the finder.

        Parameters:
            search_paths: Optional paths to search into.
        """
        self._paths_contents: dict[Path, list[Path]] = {}
        self.search_paths: list[Path] = []
        """The finder search paths."""

        # Optimization: pre-compute Paths to relieve CPU when joining paths.
        for path in search_paths or sys.path:
            self.append_search_path(Path(path))

        self._always_scan_for: dict[str, list[Path]] = defaultdict(list)
        self._extend_from_pth_files()

    def append_search_path(self, path: Path) -> None:
        """Append a search path.

        The path will be resolved (absolute, normalized).
        The path won't be appended if it is already in the search paths list.

        Parameters:
            path: The path to append.
        """
        path = path.resolve()
        if path not in self.search_paths:
            self.search_paths.append(path)

    def insert_search_path(self, position: int, path: Path) -> None:
        """Insert a search path at the given position.

        The path will be resolved (absolute, normalized).
        The path won't be inserted if it is already in the search paths list.

        Parameters:
            position: The insert position in the list.
            path: The path to insert.
        """
        path = path.resolve()
        if path not in self.search_paths:
            self.search_paths.insert(position, path)

    def find_spec(
        self,
        module: str | Path,
        *,
        try_relative_path: bool = True,
        find_stubs_package: bool = False,
    ) -> tuple[str, Package | NamespacePackage]:
        """Find the top-level parent module of a module.

        If a Path is passed, only try to find the module as a file path.
        If a string is passed, first try to find the module as a file path,
        then look into the search paths.

        Parameters:
            module: The module name or path.
            try_relative_path: Whether to try finding the module as a relative path,
                when the given module is not already a path.
            find_stubs_package: Whether to search for stubs-only package.
                If both the package and its stubs are found, they'll be merged together.
                If only the stubs are found, they'll be used as the package itself.

        Raises:
            FileNotFoundError: When a Path was passed and the module could not be found:

                - the directory has no `__init__.py` file in it
                - the path does not exist

            ModuleNotFoundError: When a string was passed and the module could not be found:

                - no `module/__init__.py`
                - no `module.py`
                - no `module.pth`
                - no `module` directory (namespace packages)
                - or unsupported .pth file

        Returns:
            The name of the module, and an instance representing its (namespace) package.
        """
        module_path: Path | list[Path]
        if isinstance(module, Path):
            module_name, module_path = self._module_name_path(module)
            top_module_name = self._top_module_name(module_path)
        elif try_relative_path:
            try:
                module_name, module_path = self._module_name_path(Path(module))
            except FileNotFoundError:
                module_name = module
                top_module_name = module.split(".", 1)[0]
            else:
                top_module_name = self._top_module_name(module_path)
        else:
            module_name = module
            top_module_name = module.split(".", 1)[0]

        # Only search for actual package, let exceptions bubble up.
        if not find_stubs_package:
            return module_name, self.find_package(top_module_name)

        # Search for both package and stubs-only package.
        try:
            package = self.find_package(top_module_name)
        except ModuleNotFoundError:
            package = None
        try:
            stubs = self.find_package(top_module_name + "-stubs")
        except ModuleNotFoundError:
            stubs = None

        # None found, raise error.
        if package is None and stubs is None:
            raise ModuleNotFoundError(top_module_name)

        # Both found, assemble them to be merged later.
        if package and stubs:
            if isinstance(package, Package) and isinstance(stubs, Package):
                package.stubs = stubs.path
            elif isinstance(package, NamespacePackage) and isinstance(stubs, NamespacePackage):
                package.path += stubs.path
            return module_name, package

        # Return either one.
        return module_name, package or stubs  # type: ignore[return-value]

    def find_package(self, module_name: str) -> Package | NamespacePackage:
        """Find a package or namespace package.

        Parameters:
            module_name: The module name.

        Raises:
            ModuleNotFoundError: When the module cannot be found.

        Returns:
            A package or namespace package wrapper.
        """
        filepaths = [
            Path(module_name),
            # TODO: Handle .py[cod] and .so files?
            # This would be needed for package that are composed
            # solely of a file with such an extension.
            Path(f"{module_name}.py"),
        ]

        real_module_name = module_name
        real_module_name = real_module_name.removesuffix("-stubs")
        namespace_dirs = []
        for path in self.search_paths:
            path_contents = self._contents(path)
            if path_contents:
                for choice in filepaths:
                    abs_path = path / choice
                    if abs_path in path_contents:
                        if abs_path.suffix:
                            stubs = abs_path.with_suffix(".pyi")
                            return Package(real_module_name, abs_path, stubs if stubs.exists() else None)
                        init_module = abs_path / "__init__.py"
                        if init_module.exists() and not _is_pkg_style_namespace(init_module):
                            stubs = init_module.with_suffix(".pyi")
                            return Package(real_module_name, init_module, stubs if stubs.exists() else None)
                        init_module = abs_path / "__init__.pyi"
                        if init_module.exists():
                            # Stubs package.
                            return Package(real_module_name, init_module, None)
                        namespace_dirs.append(abs_path)

        if namespace_dirs:
            return NamespacePackage(module_name, namespace_dirs)

        raise ModuleNotFoundError(module_name)

    def iter_submodules(
        self,
        path: Path | list[Path],
        seen: set | None = None,
    ) -> Iterator[NamePartsAndPathType]:
        """Iterate on a module's submodules, if any.

        Parameters:
            path: The module path.
            seen: If not none, this set is used to skip some files.
                The goal is to replicate the behavior of Python by
                only using the first packages (with `__init__` modules)
                of the same name found in different namespace packages.
                As soon as we find an `__init__` module, we add its parent
                path to the `seen` set, which will be reused when scanning
                the next namespace packages.

        Yields:
            name_parts (tuple[str, ...]): The parts of a submodule name.
            filepath (Path): A submodule filepath.
        """
        if isinstance(path, list):
            # We never enter this condition again in recursive calls,
            # so we just have to set `seen` once regardless of its value.
            seen = set()
            for path_elem in path:
                yield from self.iter_submodules(path_elem, seen)
            return

        if path.stem == "__init__":
            path = path.parent
        # Optimization: just check if the file name ends with .py[icod]/.so
        # (to distinguish it from a directory), not if it's an actual file.
        elif path.suffix in self.extensions_set:
            return

        # `seen` is only set when we scan a list of paths (namespace package).
        # `skip` is used to prevent yielding modules
        # of a regular subpackage that we already yielded
        # from another part of the namespace.
        skip = set(seen or ())

        for subpath in self._filter_py_modules(path):
            rel_subpath = subpath.relative_to(path)
            if rel_subpath.parent in skip:
                logger.debug("Skip %s, another module took precedence", subpath)
                continue
            py_file = rel_subpath.suffix == ".py"
            stem = rel_subpath.stem
            if not py_file:
                # `.py[cod]` and `.so` files look like `name.cpython-38-x86_64-linux-gnu.ext`.
                stem = stem.split(".", 1)[0]
            if stem == "__init__":
                # Optimization: since it's a relative path, if it has only one part
                # and is named __init__, it means it's the starting path
                # (no need to compare it against starting path).
                if len(rel_subpath.parts) == 1:
                    continue
                yield rel_subpath.parts[:-1], subpath
                if seen is not None:
                    seen.add(rel_subpath.parent)
            elif py_file:
                yield rel_subpath.with_suffix("").parts, subpath
            else:
                yield rel_subpath.with_name(stem).parts, subpath

    def submodules(self, module: Module) -> list[NamePartsAndPathType]:
        """Return the list of a module's submodules.

        Parameters:
            module: The parent module.

        Returns:
            A list of tuples containing the parts of the submodule name and its path.
        """
        return sorted(
            chain(
                self.iter_submodules(module.filepath),
                self.iter_submodules(self._always_scan_for[module.name]),
            ),
            key=_module_depth,
        )

    def _module_name_path(self, path: Path) -> tuple[str, Path]:
        # Always return absolute paths to avoid working-directory-dependent issues.
        path = path.absolute()
        if path.is_dir():
            for ext in self.accepted_py_module_extensions:
                module_path = path / f"__init__{ext}"
                if module_path.exists():
                    return path.name, module_path
            return path.name, path
        if path.exists():
            if path.stem == "__init__":
                return path.parent.name, path
            return path.stem, path
        raise FileNotFoundError

    def _contents(self, path: Path) -> list[Path]:
        if path not in self._paths_contents:
            try:
                self._paths_contents[path] = list(path.iterdir())
            except (FileNotFoundError, NotADirectoryError):
                self._paths_contents[path] = []
        return self._paths_contents[path]

    def _append_search_path(self, path: Path) -> None:
        if path not in self.search_paths:
            self.search_paths.append(path)

    def _extend_from_pth_files(self) -> None:
        for path in self.search_paths:
            for item in self._contents(path):
                if item.suffix == ".pth":
                    for directory in _handle_pth_file(item):
                        if scan := directory.always_scan_for:
                            self._always_scan_for[scan].append(directory.path.joinpath(scan))
                        self.append_search_path(directory.path)

    def _filter_py_modules(self, path: Path) -> Iterator[Path]:
        for root, dirs, files in os.walk(path, topdown=True, followlinks=True):
            # Optimization: modify dirs in-place to exclude `__pycache__` directories.
            dirs[:] = [dir for dir in dirs if dir != "__pycache__"]
            for relfile in files:
                if os.path.splitext(relfile)[1] in self.extensions_set:  # noqa: PTH122
                    yield Path(root, relfile)

    def _top_module_name(self, path: Path) -> str:
        # First find if a parent is in search paths.
        parent_path = path if path.is_dir() else path.parent
        # Always resolve parent path to compare for relativeness against resolved search paths.
        parent_path = parent_path.resolve()
        for search_path in self.search_paths:
            with suppress(ValueError, IndexError):
                rel_path = parent_path.relative_to(search_path.resolve())
                return rel_path.parts[0]
        # If not, get the highest directory with an `__init__` module,
        # add its parent to search paths and return it.
        while parent_path.parent != parent_path and (parent_path.parent / "__init__.py").exists():
            parent_path = parent_path.parent
        self.insert_search_path(0, parent_path.parent)
        return parent_path.name


_re_pkgresources = re.compile(r"(?:__import__\([\"']pkg_resources[\"']\).declare_namespace\(__name__\))")
_re_pkgutil = re.compile(r"(?:__path__ = __import__\([\"']pkgutil[\"']\).extend_path\(__path__, __name__\))")
_re_import_line = re.compile(r"^import[ \t]+\w+$")


# TODO: For more robustness, we should load and minify the AST
# to search for particular call statements.
def _is_pkg_style_namespace(init_module: Path) -> bool:
    code = init_module.read_text(encoding="utf8")
    return bool(_re_pkgresources.search(code) or _re_pkgutil.search(code))


def _module_depth(name_parts_and_path: NamePartsAndPathType) -> int:
    return len(name_parts_and_path[0])


@dataclass
class _SP:
    path: Path
    always_scan_for: str = ""


def _handle_pth_file(path: Path) -> list[_SP]:
    # Support for .pth files pointing to directories.
    # From https://docs.python.org/3/library/site.html:
    # A path configuration file is a file whose name has the form name.pth
    # and exists in one of the four directories mentioned above;
    # its contents are additional items (one per line) to be added to sys.path.
    # Non-existing items are never added to sys.path,
    # and no check is made that the item refers to a directory rather than a file.
    # No item is added to sys.path more than once.
    # Blank lines and lines beginning with # are skipped.
    # Lines starting with import (followed by space or tab) are executed.
    directories: list[_SP] = []
    try:
        # It turns out PyTorch recommends its users to use `.pth` as the extension
        # when saving models on the disk. These model files are not encoded in UTF8.
        # If UTF8 decoding fails, we skip the .pth file.
        text = path.read_text(encoding="utf8")
    except UnicodeDecodeError:
        return directories
    for line in text.strip().replace(";", "\n").splitlines(keepends=False):
        line = line.strip()  # noqa: PLW2901
        if _re_import_line.match(line):
            editable_module = path.parent / f"{line[len('import') :].lstrip()}.py"
            with suppress(UnhandledEditableModuleError):
                return _handle_editable_module(editable_module)
        if line and not line.startswith("#") and os.path.exists(line):  # noqa: PTH110
            directories.append(_SP(Path(line)))
    return directories


def _handle_editable_module(path: Path) -> list[_SP]:
    if _match_pattern(path.name, (*_editable_editables_patterns, *_editable_scikit_build_core_patterns)):
        # Support for how 'editables' write these files:
        # example line: `F.map_module('griffe', '/media/data/dev/griffe/src/griffe/__init__.py')`.
        # And how 'scikit-build-core' writes these files:
        # example line: `install({'griffe': '/media/data/dev/griffe/src/griffe/__init__.py'}, {'cmake_example': ...}, None, False, True)`.
        try:
            editable_lines = path.read_text(encoding="utf8").strip().splitlines(keepends=False)
        except FileNotFoundError as error:
            raise UnhandledEditableModuleError(path) from error
        new_path = Path(editable_lines[-1].split("'")[3])
        if new_path.name.startswith("__init__"):
            return [_SP(new_path.parent.parent)]
        return [_SP(new_path)]
    if _match_pattern(path.name, _editable_setuptools_patterns):
        # Support for how 'setuptools' writes these files:
        # example line: `MAPPING = {'griffe': '/media/data/dev/griffe/src/griffe', 'briffe': '/media/data/dev/griffe/src/briffe'}`.
        # with annotation: `MAPPING: dict[str, str] = {...}`.
        parsed_module = ast.parse(path.read_text())
        for node in parsed_module.body:
            if isinstance(node, ast.Assign):
                target = node.targets[0]
            elif isinstance(node, ast.AnnAssign):
                target = node.target
            else:
                continue
            if isinstance(target, ast.Name) and target.id == "MAPPING" and isinstance(node.value, ast.Dict):  # type: ignore[attr-defined]
                return [_SP(Path(cst.value).parent) for cst in node.value.values if isinstance(cst, ast.Constant)]  # type: ignore[attr-defined]
    if _match_pattern(path.name, _editable_meson_python_patterns):
        # Support for how 'meson-python' writes these files:
        # example line: `install({'package', 'module1'}, '/media/data/dev/griffe/build/cp311', ["path"], False)`.
        # Compiled modules then found in the cp311 folder, under src/package.
        parsed_module = ast.parse(path.read_text())
        for node in parsed_module.body:
            if (
                isinstance(node, ast.Expr)
                and isinstance(node.value, ast.Call)
                and isinstance(node.value.func, ast.Name)
                and node.value.func.id == "install"
                and isinstance(node.value.args[1], ast.Constant)
            ):
                build_path = Path(node.value.args[1].value, "src")
                # NOTE: What if there are multiple packages?
                pkg_name = next(build_path.iterdir()).name
                return [_SP(build_path, always_scan_for=pkg_name)]
    raise UnhandledEditableModuleError(path)