File size: 20,584 Bytes
d631808
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
# This module contains all CLI-related things.
# Why does this file exist, and why not put this in `__main__`?
#
# We might be tempted to import things from `__main__` later,
# but that will cause problems; the code will get executed twice:
#
# - When we run `python -m griffe`, Python will execute
#   `__main__.py` as a script. That means there won't be any
#   `griffe.__main__` in `sys.modules`.
# - When you import `__main__` it will get executed again (as a module) because
#   there's no `griffe.__main__` in `sys.modules`.

from __future__ import annotations

import argparse
import json
import logging
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import IO, TYPE_CHECKING, Any, Callable

import colorama

from _griffe import debug
from _griffe.diff import find_breaking_changes
from _griffe.encoders import JSONEncoder
from _griffe.enumerations import ExplanationStyle, Parser
from _griffe.exceptions import ExtensionError, GitError
from _griffe.extensions.base import load_extensions
from _griffe.git import get_latest_tag, get_repo_root
from _griffe.loader import GriffeLoader, load, load_git
from _griffe.logger import logger

if TYPE_CHECKING:
    from collections.abc import Sequence

    from _griffe.extensions.base import Extension, Extensions


# Read once at import time and upper-cased, which is the spelling used by `_level_choices`.
DEFAULT_LOG_LEVEL = os.getenv("GRIFFE_LOG_LEVEL", "INFO").upper()
"""The default log level for the CLI.

This can be overridden by the `GRIFFE_LOG_LEVEL` environment variable.
"""


class _DebugInfo(argparse.Action):
    """Argparse action that prints debugging information, then exits with status 0.

    `nargs` defaults to `0`, so the option consumes no value and acts like a flag.
    """

    def __init__(self, nargs: int | str | None = 0, **kwargs: Any) -> None:
        # Hand everything over to `argparse.Action`, `nargs` included.
        kwargs["nargs"] = nargs
        super().__init__(**kwargs)

    def __call__(self, *args: Any, **kwargs: Any) -> None:  # noqa: ARG002
        # Whatever argparse passes in (parser, namespace, values...) is ignored.
        debug._print_debug_info()
        raise SystemExit(0)


def _print_data(data: str, output_file: str | IO | None) -> None:
    """Write `data`, followed by a newline, to a file path or to a stream.

    Parameters:
        data: The text to write.
        output_file: Where to write the text. A string is treated as a file path:
            the file is created or overwritten, and encoded as UTF-8.
            A file-like object is written to directly.
            `None` means standard output.
    """
    if isinstance(output_file, str):
        # Pin the encoding: the default one depends on the platform locale,
        # which would make the produced files differ between machines.
        with open(output_file, "w", encoding="utf-8") as fd:  # noqa: PTH123
            print(data, file=fd)
        return
    # `sys.stdout` is looked up at call time so that redirections are honored.
    print(data, file=sys.stdout if output_file is None else output_file)


def _load_packages(
    packages: Sequence[str],
    *,
    extensions: Extensions | None = None,
    search_paths: Sequence[str | Path] | None = None,
    docstring_parser: Parser | None = None,
    docstring_options: dict[str, Any] | None = None,
    resolve_aliases: bool = True,
    resolve_implicit: bool = False,
    resolve_external: bool | None = None,
    allow_inspection: bool = True,
    force_inspection: bool = False,
    store_source: bool = True,
    find_stubs_package: bool = False,
) -> GriffeLoader:
    """Load several packages with one shared loader, then optionally resolve aliases.

    Packages that cannot be found or imported are logged and skipped,
    they do not abort the loading of the remaining ones.

    Parameters:
        packages: The names (or relative paths) of the packages to load. Empty names are skipped.
        extensions: Passed to the `GriffeLoader` constructor.
        search_paths: Passed to the `GriffeLoader` constructor.
        docstring_parser: Passed to the `GriffeLoader` constructor.
        docstring_options: Passed to the `GriffeLoader` constructor.
        resolve_aliases: Whether to run alias resolution once all packages are loaded.
        resolve_implicit: Passed as `implicit` to the loader's alias resolution.
        resolve_external: Passed as `external` to the loader's alias resolution.
        allow_inspection: Passed to the `GriffeLoader` constructor.
        force_inspection: Passed to the `GriffeLoader` constructor.
        store_source: Passed to the `GriffeLoader` constructor.
        find_stubs_package: Passed to each `loader.load` call.

    Returns:
        The loader that was used, holding everything it loaded.
    """
    # One loader is shared by every package.
    loader = GriffeLoader(
        extensions=extensions,
        search_paths=search_paths,
        docstring_parser=docstring_parser,
        docstring_options=docstring_options,
        allow_inspection=allow_inspection,
        force_inspection=force_inspection,
        store_source=store_source,
    )

    for name in packages:
        if name:
            logger.info("Loading package %s", name)
            try:
                loader.load(name, try_relative_path=True, find_stubs_package=find_stubs_package)
            except ModuleNotFoundError as error:
                # A missing package is an expected user error: no traceback needed.
                logger.error("Could not find package %s: %s", name, error)  # noqa: TRY400
            except ImportError:
                logger.exception("Tried but could not import package %s", name)
        else:
            logger.debug("Empty package name, continuing")
    logger.info("Finished loading packages")

    if not resolve_aliases:
        return loader

    logger.info("Starting alias resolution")
    unresolved, iterations = loader.resolve_aliases(implicit=resolve_implicit, external=resolve_external)
    if unresolved:
        logger.info("%s aliases were still unresolved after %s iterations", len(unresolved), iterations)
    else:
        logger.info("All aliases were resolved after %s iterations", iterations)
    return loader


# Log level names accepted by the `-L/--log-level` option;
# `main` also lists them in its "invalid log level" error message.
_level_choices = ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")


def _extensions_type(value: str) -> Sequence[str | dict[str, Any]]:
    """Convert the value of the `--extensions` option into a list of extension specs.

    The value is first tried as JSON, and always normalized to a list:

    - a JSON array is returned as is;
    - a JSON object yields one single-entry dictionary per key/value pair;
    - a JSON string yields a one-item list;
    - anything else (invalid JSON, or a JSON number/boolean/null)
      is treated as a comma-separated list of names.

    Parameters:
        value: The raw command-line value.

    Returns:
        A list of extension specs (strings or dictionaries).
    """
    try:
        parsed = json.loads(value)
    except json.JSONDecodeError:
        return value.split(",")
    if isinstance(parsed, list):
        return parsed
    if isinstance(parsed, dict):
        # Callers unpack the result with `*`: returning the dictionary itself
        # would only yield its keys and silently drop the options.
        return [{path: options} for path, options in parsed.items()]
    if isinstance(parsed, str):
        # Same reason: a bare string would be unpacked character by character.
        return [parsed]
    # Numbers, booleans and null are not extension specs: keep the raw text.
    return value.split(",")


def get_parser() -> argparse.ArgumentParser:
    """Return the CLI argument parser.

    The parser has global options (`--help`, `--version`, `--debug-info`)
    and two required subcommands, `dump` and `check`. Both subcommands
    receive the same search, loading and debugging options.

    Returns:
        An argparse parser.
    """
    usage = "%(prog)s [GLOBAL_OPTS...] COMMAND [COMMAND_OPTS...]"
    # The parentheses matter: without them only the first literal is assigned,
    # and the following ones are no-op expression statements.
    description = (
        "Signatures for entire Python programs. "
        "Extract the structure, the frame, the skeleton of your project, "
        "to generate API documentation or find breaking changes in your API."
    )
    # Help options are added manually (`add_help=False`) to control their group and text.
    parser = argparse.ArgumentParser(add_help=False, usage=usage, description=description, prog="griffe")

    main_help = "Show this help message and exit. Commands also accept the -h/--help option."
    subcommand_help = "Show this help message and exit."

    global_options = parser.add_argument_group(title="Global options")
    global_options.add_argument("-h", "--help", action="help", help=main_help)
    global_options.add_argument("-V", "--version", action="version", version=f"%(prog)s {debug._get_version()}")
    global_options.add_argument("--debug-info", action=_DebugInfo, help="Print debug information.")

    def add_common_options(subparser: argparse.ArgumentParser) -> None:
        # Options shared by every subcommand: help, search, loading and debugging.
        common_options = subparser.add_argument_group(title="Common options")
        common_options.add_argument("-h", "--help", action="help", help=subcommand_help)
        search_options = subparser.add_argument_group(title="Search options")
        search_options.add_argument(
            "-s",
            "--search",
            dest="search_paths",
            action="append",
            type=Path,
            help="Paths to search packages into.",
        )
        search_options.add_argument(
            "-y",
            "--sys-path",
            dest="append_sys_path",
            action="store_true",
            help="Whether to append `sys.path` to search paths specified with `-s`.",
        )
        loading_options = subparser.add_argument_group(title="Loading options")
        loading_options.add_argument(
            "-B",
            "--find-stubs-packages",
            dest="find_stubs_package",
            action="store_true",
            default=False,
            help="Whether to look for stubs-only packages and merge them with concrete ones.",
        )
        loading_options.add_argument(
            "-e",
            "--extensions",
            default={},
            type=_extensions_type,
            help="A list of extensions to use.",
        )
        loading_options.add_argument(
            "-X",
            "--no-inspection",
            dest="allow_inspection",
            action="store_false",
            default=True,
            help="Disallow inspection of builtin/compiled/not found modules.",
        )
        loading_options.add_argument(
            "-x",
            "--force-inspection",
            dest="force_inspection",
            action="store_true",
            default=False,
            help="Force inspection of everything, even when sources are found.",
        )
        debug_options = subparser.add_argument_group(title="Debugging options")
        debug_options.add_argument(
            "-L",
            "--log-level",
            metavar="LEVEL",
            default=DEFAULT_LOG_LEVEL,
            choices=_level_choices,
            # Upper-case the value before it is checked against `choices`.
            type=str.upper,
            help="Set the log level: `DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL`.",
        )

    # ========= SUBPARSERS ========= #
    subparsers = parser.add_subparsers(
        dest="subcommand",
        title="Commands",
        metavar="COMMAND",
        prog="griffe",
        required=True,
    )

    def add_subparser(command: str, text: str, **kwargs: Any) -> argparse.ArgumentParser:
        # The same text serves as the short help and as the subcommand description.
        return subparsers.add_parser(command, add_help=False, help=text, description=text, **kwargs)

    # ========= DUMP PARSER ========= #
    dump_parser = add_subparser("dump", "Load package-signatures and dump them as JSON.")
    dump_options = dump_parser.add_argument_group(title="Dump options")
    dump_options.add_argument("packages", metavar="PACKAGE", nargs="+", help="Packages to find, load and dump.")
    dump_options.add_argument(
        "-f",
        "--full",
        action="store_true",
        default=False,
        help="Whether to dump full data in JSON.",
    )
    dump_options.add_argument(
        "-o",
        "--output",
        default=sys.stdout,
        help="Output file. Supports templating to output each package in its own file, with `{package}`.",
    )
    dump_options.add_argument(
        "-d",
        "--docstyle",
        dest="docstring_parser",
        default=None,
        type=Parser,
        help="The docstring style to parse.",
    )
    dump_options.add_argument(
        "-D",
        "--docopts",
        dest="docstring_options",
        default={},
        type=json.loads,
        help="The options for the docstring parser.",
    )
    dump_options.add_argument(
        "-r",
        "--resolve-aliases",
        action="store_true",
        help="Whether to resolve aliases.",
    )
    dump_options.add_argument(
        "-I",
        "--resolve-implicit",
        action="store_true",
        help="Whether to resolve implicitly exported aliases as well. "
        "Aliases are explicitly exported when defined in `__all__`.",
    )
    # The two following flags share one destination. `default=None` preserves the
    # third state described in their help text: argparse uses the default of the
    # first action registered for a destination, which would otherwise be `False`.
    dump_options.add_argument(
        "-U",
        "--resolve-external",
        dest="resolve_external",
        action="store_true",
        default=None,
        help="Always resolve aliases pointing to external/unknown modules (not loaded directly). "
        "Default is to resolve only from one module to its private sibling (`ast` -> `_ast`).",
    )
    dump_options.add_argument(
        "--no-resolve-external",
        dest="resolve_external",
        action="store_false",
        default=None,
        help="Never resolve aliases pointing to external/unknown modules (not loaded directly). "
        "Default is to resolve only from one module to its private sibling (`ast` -> `_ast`).",
    )
    dump_options.add_argument(
        "-S",
        "--stats",
        action="store_true",
        help="Show statistics at the end.",
    )
    add_common_options(dump_parser)

    # ========= CHECK PARSER ========= #
    check_parser = add_subparser("check", "Check for API breakages or possible improvements.")
    check_options = check_parser.add_argument_group(title="Check options")
    check_options.add_argument("package", metavar="PACKAGE", help="Package to find, load and check, as path.")
    check_options.add_argument(
        "-a",
        "--against",
        metavar="REF",
        help="Older Git reference (commit, branch, tag) to check against. Default: load latest tag.",
    )
    check_options.add_argument(
        "-b",
        "--base-ref",
        metavar="BASE_REF",
        help="Git reference (commit, branch, tag) to check. Default: load current code.",
    )
    # `--color` and `--no-color` share one destination; `None` means "not specified".
    check_options.add_argument(
        "--color",
        dest="color",
        action="store_true",
        default=None,
        help="Force enable colors in the output.",
    )
    check_options.add_argument(
        "--no-color",
        dest="color",
        action="store_false",
        default=None,
        help="Force disable colors in the output.",
    )
    check_options.add_argument("-v", "--verbose", action="store_true", help="Verbose output.")
    formats = [fmt.value for fmt in ExplanationStyle]
    check_options.add_argument("-f", "--format", dest="style", choices=formats, default=None, help="Output format.")
    add_common_options(check_parser)

    return parser


def dump(
    packages: Sequence[str],
    *,
    output: str | IO | None = None,
    full: bool = False,
    docstring_parser: Parser | None = None,
    docstring_options: dict[str, Any] | None = None,
    extensions: Sequence[str | dict[str, Any] | Extension | type[Extension]] | None = None,
    resolve_aliases: bool = False,
    resolve_implicit: bool = False,
    resolve_external: bool | None = None,
    search_paths: Sequence[str | Path] | None = None,
    find_stubs_package: bool = False,
    append_sys_path: bool = False,
    allow_inspection: bool = True,
    force_inspection: bool = False,
    stats: bool = False,
) -> int:
    """Load packages data and dump it as JSON.

    Parameters:
        packages: The packages to load and dump.
        output: Where to output the JSON-serialized data.
            When it is a string containing the `{package}` placeholder,
            each package is written to its own file.
        full: Whether to output full or minimal data.
        docstring_parser: The docstring parser to use. By default, no parsing is done.
        docstring_options: Additional docstring parsing options.
        resolve_aliases: Whether to resolve aliases (indirect objects references).
        resolve_implicit: Whether to resolve every alias or only the explicitly exported ones.
        resolve_external: Whether to load additional, unspecified modules to resolve aliases.
            Default is to resolve only from one module to its private sibling (`ast` -> `_ast`).
        extensions: The extensions to use.
        search_paths: The paths to search into.
        find_stubs_package: Whether to search for stubs-only packages.
            If both the package and its stubs are found, they'll be merged together.
            If only the stubs are found, they'll be used as the package itself.
        append_sys_path: Whether to append the contents of `sys.path` to the search paths.
        allow_inspection: Whether to allow inspecting modules when visiting them is not possible.
        force_inspection: Whether to force using dynamic analysis when loading data.
        stats: Whether to compute and log stats about loading.

    Returns:
        `0` for success, `1` for failure (extensions could not be loaded,
        or the number of loaded packages differs from the number of requested ones).
    """
    # Prepare options.
    # The output is per-package when formatting it with a package name changes it,
    # i.e. when it contains the `{package}` placeholder.
    per_package_output = False
    if isinstance(output, str) and output.format(package="package") != output:
        per_package_output = True

    # Copy the search paths so that the caller's sequence is never mutated.
    search_paths = list(search_paths) if search_paths else []
    if append_sys_path:
        search_paths.extend(sys.path)

    try:
        loaded_extensions = load_extensions(*(extensions or ()))
    except ExtensionError:
        logger.exception("Could not load extensions")
        return 1

    # Load packages.
    loader = _load_packages(
        packages,
        extensions=loaded_extensions,
        search_paths=search_paths,
        docstring_parser=docstring_parser,
        docstring_options=docstring_options,
        resolve_aliases=resolve_aliases,
        resolve_implicit=resolve_implicit,
        resolve_external=resolve_external,
        allow_inspection=allow_inspection,
        force_inspection=force_inspection,
        store_source=False,
        find_stubs_package=find_stubs_package,
    )
    data_packages = loader.modules_collection.members

    # Serialize and dump packages.
    started = datetime.now(tz=timezone.utc)
    if per_package_output:
        for package_name, data in data_packages.items():
            serialized = data.as_json(indent=2, full=full, sort_keys=True)
            _print_data(serialized, output.format(package=package_name))  # type: ignore[union-attr]
    else:
        serialized = json.dumps(data_packages, cls=JSONEncoder, indent=2, full=full, sort_keys=True)
        _print_data(serialized, output)
    elapsed = datetime.now(tz=timezone.utc) - started

    if stats:
        loader_stats = loader.stats()
        # `timedelta.microseconds` is only the sub-second component (0 to 999999):
        # the whole duration must be converted instead.
        loader_stats.time_spent_serializing = round(elapsed.total_seconds() * 1_000_000)
        logger.info(loader_stats.as_text())

    return 0 if len(data_packages) == len(packages) else 1


def check(
    package: str | Path,
    against: str | None = None,
    against_path: str | Path | None = None,
    *,
    base_ref: str | None = None,
    extensions: Sequence[str | dict[str, Any] | Extension | type[Extension]] | None = None,
    search_paths: Sequence[str | Path] | None = None,
    append_sys_path: bool = False,
    find_stubs_package: bool = False,
    allow_inspection: bool = True,
    force_inspection: bool = False,
    verbose: bool = False,
    color: bool | None = None,
    style: str | ExplanationStyle | None = None,
) -> int:
    """Check for API breaking changes in two versions of the same package.

    Breaking changes are printed on the standard error stream, one per breakage.

    Parameters:
        package: The package to load and check.
        against: Older Git reference (commit, branch, tag) to check against.
            Default is the latest tag.
        against_path: Path when the "against" reference is checked out.
            Default is `package`.
        base_ref: Git reference (commit, branch, tag) to check.
            Default is to load the current code.
        extensions: The extensions to use.
        search_paths: The paths to search into.
        append_sys_path: Whether to append the contents of `sys.path` to the search paths.
        find_stubs_package: Whether to search for stubs-only packages.
            It applies to both the old and the new version of the package.
        allow_inspection: Whether to allow inspecting modules when visiting them is not possible.
        force_inspection: Whether to force using dynamic analysis when loading data.
        verbose: Use a verbose output. Only used when `style` is not given.
        color: Whether to force enable (`True`) or disable (`False`) colors.
            When `None`, the `FORCE_COLOR` environment variable is used if it is set.
        style: The explanation style. When `None`, it is the verbose style
            if `verbose` is true, and the one-line style otherwise.

    Returns:
        `0` when no breaking change is found, `1` when breaking changes are found
        or extensions cannot be loaded, `2` when the latest tag cannot be determined.
    """
    # Prepare options.
    # Copy the search paths so that the caller's sequence is never mutated.
    search_paths = list(search_paths) if search_paths else []
    if append_sys_path:
        search_paths.extend(sys.path)

    try:
        against = against or get_latest_tag(package)
    except GitError as error:
        print(f"griffe: error: {error}", file=sys.stderr)
        return 2
    against_path = against_path or package
    repository = get_repo_root(against_path)

    try:
        loaded_extensions = load_extensions(*(extensions or ()))
    except ExtensionError:
        logger.exception("Could not load extensions")
        return 1

    # Load old and new version of the package.
    # Both versions are loaded with the same options, otherwise differences
    # in loading (stubs merged on one side only) could show up as breakages.
    old_package = load_git(
        against_path,
        ref=against,
        repo=repository,
        extensions=loaded_extensions,
        search_paths=search_paths,
        allow_inspection=allow_inspection,
        force_inspection=force_inspection,
        find_stubs_package=find_stubs_package,
        resolve_aliases=True,
        resolve_external=None,
    )
    if base_ref:
        new_package = load_git(
            package,
            ref=base_ref,
            repo=repository,
            extensions=loaded_extensions,
            search_paths=search_paths,
            allow_inspection=allow_inspection,
            force_inspection=force_inspection,
            find_stubs_package=find_stubs_package,
            resolve_aliases=True,
            resolve_external=None,
        )
    else:
        new_package = load(
            package,
            try_relative_path=True,
            extensions=loaded_extensions,
            search_paths=search_paths,
            allow_inspection=allow_inspection,
            force_inspection=force_inspection,
            find_stubs_package=find_stubs_package,
            resolve_aliases=True,
            resolve_external=None,
        )

    # Find and display API breakages.
    breakages = list(find_breaking_changes(old_package, new_package))

    # An explicit `color` argument takes precedence over the environment variable.
    if color is None and (force_color := os.getenv("FORCE_COLOR", None)) is not None:
        color = force_color.lower() in {"1", "true", "y", "yes", "on"}
    colorama.deinit()
    # `strip` is the opposite of `color`; `None` leaves the decision to colorama.
    colorama.init(strip=color if color is None else not color)

    if style is None:
        style = ExplanationStyle.VERBOSE if verbose else ExplanationStyle.ONE_LINE
    else:
        style = ExplanationStyle(style)
    for breakage in breakages:
        print(breakage.explain(style=style), file=sys.stderr)

    return 1 if breakages else 0


def main(args: list[str] | None = None) -> int:
    """Run the main program.

    This is the entry point behind `griffe` and `python -m griffe`:
    it parses the arguments, configures logging, and runs the selected subcommand.

    Parameters:
        args: Arguments passed from the command line.

    Returns:
        An exit code.
    """
    # Parse arguments, and set aside the options that are not subcommand parameters.
    namespace: argparse.Namespace = get_parser().parse_args(args)
    opts_dict = vars(namespace)
    del opts_dict["debug_info"]
    subcommand = opts_dict.pop("subcommand")
    log_level = opts_dict.pop("log_level", DEFAULT_LOG_LEVEL)

    # Initialize logging. The level name is looked up on the `logging` module.
    try:
        level = getattr(logging, log_level)
    except AttributeError:
        choices = "', '".join(_level_choices)
        message = f"griffe: error: invalid log level '{log_level}' (choose from '{choices}')"
        print(message, file=sys.stderr)
        return 1
    logging.basicConfig(format="%(levelname)-10s %(message)s", level=level)

    # Run subcommand: the remaining options are its keyword arguments.
    run: Callable[..., int] = {"check": check, "dump": dump}[subcommand]
    return run(**opts_dict)