from __future__ import annotations import collections.abc as cabc import contextlib import io import os import shlex import shutil import sys import tempfile import typing as t from types import TracebackType from . import _compat from . import formatting from . import termui from . import utils from ._compat import _find_binary_reader if t.TYPE_CHECKING: from _typeshed import ReadableBuffer from .core import Command class EchoingStdin: def __init__(self, input: t.BinaryIO, output: t.BinaryIO) -> None: self._input = input self._output = output self._paused = False def __getattr__(self, x: str) -> t.Any: return getattr(self._input, x) def _echo(self, rv: bytes) -> bytes: if not self._paused: self._output.write(rv) return rv def read(self, n: int = -1) -> bytes: return self._echo(self._input.read(n)) def read1(self, n: int = -1) -> bytes: return self._echo(self._input.read1(n)) # type: ignore def readline(self, n: int = -1) -> bytes: return self._echo(self._input.readline(n)) def readlines(self) -> list[bytes]: return [self._echo(x) for x in self._input.readlines()] def __iter__(self) -> cabc.Iterator[bytes]: return iter(self._echo(x) for x in self._input) def __repr__(self) -> str: return repr(self._input) @contextlib.contextmanager def _pause_echo(stream: EchoingStdin | None) -> cabc.Iterator[None]: if stream is None: yield else: stream._paused = True yield stream._paused = False class BytesIOCopy(io.BytesIO): """Patch ``io.BytesIO`` to let the written stream be copied to another. .. versionadded:: 8.2 """ def __init__(self, copy_to: io.BytesIO) -> None: super().__init__() self.copy_to = copy_to def flush(self) -> None: super().flush() self.copy_to.flush() def write(self, b: ReadableBuffer) -> int: self.copy_to.write(b) return super().write(b) class StreamMixer: """Mixes `` and `` streams. The result is available in the ``output`` attribute. .. versionadded:: 8.2 """ def __init__(self) -> None: self.output: io.BytesIO = io.BytesIO() self.stdout: io.BytesIO = BytesIOCopy(copy_to=self.output) self.stderr: io.BytesIO = BytesIOCopy(copy_to=self.output) class _NamedTextIOWrapper(io.TextIOWrapper): def __init__( self, buffer: t.BinaryIO, name: str, mode: str, **kwargs: t.Any ) -> None: super().__init__(buffer, **kwargs) self._name = name self._mode = mode @property def name(self) -> str: return self._name @property def mode(self) -> str: return self._mode def __next__(self) -> str: # type: ignore try: line = super().__next__() except StopIteration as e: raise EOFError() from e return line def make_input_stream( input: str | bytes | t.IO[t.Any] | None, charset: str ) -> t.BinaryIO: # Is already an input stream. if hasattr(input, "read"): rv = _find_binary_reader(t.cast("t.IO[t.Any]", input)) if rv is not None: return rv raise TypeError("Could not find binary reader for input stream.") if input is None: input = b"" elif isinstance(input, str): input = input.encode(charset) return io.BytesIO(input) class Result: """Holds the captured result of an invoked CLI script. :param runner: The runner that created the result :param stdout_bytes: The standard output as bytes. :param stderr_bytes: The standard error as bytes. :param output_bytes: A mix of ``stdout_bytes`` and ``stderr_bytes``, as the user would see it in its terminal. :param return_value: The value returned from the invoked command. :param exit_code: The exit code as integer. :param exception: The exception that happened if one did. :param exc_info: Exception information (exception type, exception instance, traceback type). .. versionchanged:: 8.2 ``stderr_bytes`` no longer optional, ``output_bytes`` introduced and ``mix_stderr`` has been removed. .. versionadded:: 8.0 Added ``return_value``. """ def __init__( self, runner: CliRunner, stdout_bytes: bytes, stderr_bytes: bytes, output_bytes: bytes, return_value: t.Any, exit_code: int, exception: BaseException | None, exc_info: tuple[type[BaseException], BaseException, TracebackType] | None = None, ): self.runner = runner self.stdout_bytes = stdout_bytes self.stderr_bytes = stderr_bytes self.output_bytes = output_bytes self.return_value = return_value self.exit_code = exit_code self.exception = exception self.exc_info = exc_info @property def output(self) -> str: """The terminal output as unicode string, as the user would see it. .. versionchanged:: 8.2 No longer a proxy for ``self.stdout``. Now has its own independent stream that is mixing `` and ``, in the order they were written. """ return self.output_bytes.decode(self.runner.charset, "replace").replace( "\r\n", "\n" ) @property def stdout(self) -> str: """The standard output as unicode string.""" return self.stdout_bytes.decode(self.runner.charset, "replace").replace( "\r\n", "\n" ) @property def stderr(self) -> str: """The standard error as unicode string. .. versionchanged:: 8.2 No longer raise an exception, always returns the `` string. """ return self.stderr_bytes.decode(self.runner.charset, "replace").replace( "\r\n", "\n" ) def __repr__(self) -> str: exc_str = repr(self.exception) if self.exception else "okay" return f"<{type(self).__name__} {exc_str}>" class CliRunner: """The CLI runner provides functionality to invoke a Click command line script for unittesting purposes in a isolated environment. This only works in single-threaded systems without any concurrency as it changes the global interpreter state. :param charset: the character set for the input and output data. :param env: a dictionary with environment variables for overriding. :param echo_stdin: if this is set to `True`, then reading from `` writes to ``. This is useful for showing examples in some circumstances. Note that regular prompts will automatically echo the input. :param catch_exceptions: Whether to catch any exceptions other than ``SystemExit`` when running :meth:`~CliRunner.invoke`. .. versionchanged:: 8.2 Added the ``catch_exceptions`` parameter. .. versionchanged:: 8.2 ``mix_stderr`` parameter has been removed. """ def __init__( self, charset: str = "utf-8", env: cabc.Mapping[str, str | None] | None = None, echo_stdin: bool = False, catch_exceptions: bool = True, ) -> None: self.charset = charset self.env: cabc.Mapping[str, str | None] = env or {} self.echo_stdin = echo_stdin self.catch_exceptions = catch_exceptions def get_default_prog_name(self, cli: Command) -> str: """Given a command object it will return the default program name for it. The default is the `name` attribute or ``"root"`` if not set. """ return cli.name or "root" def make_env( self, overrides: cabc.Mapping[str, str | None] | None = None ) -> cabc.Mapping[str, str | None]: """Returns the environment overrides for invoking a script.""" rv = dict(self.env) if overrides: rv.update(overrides) return rv @contextlib.contextmanager def isolation( self, input: str | bytes | t.IO[t.Any] | None = None, env: cabc.Mapping[str, str | None] | None = None, color: bool = False, ) -> cabc.Iterator[tuple[io.BytesIO, io.BytesIO, io.BytesIO]]: """A context manager that sets up the isolation for invoking of a command line tool. This sets up `` with the given input data and `os.environ` with the overrides from the given dictionary. This also rebinds some internals in Click to be mocked (like the prompt functionality). This is automatically done in the :meth:`invoke` method. :param input: the input stream to put into `sys.stdin`. :param env: the environment overrides as dictionary. :param color: whether the output should contain color codes. The application can still override this explicitly. .. versionadded:: 8.2 An additional output stream is returned, which is a mix of `` and `` streams. .. versionchanged:: 8.2 Always returns the `` stream. .. versionchanged:: 8.0 `` is opened with ``errors="backslashreplace"`` instead of the default ``"strict"``. .. versionchanged:: 4.0 Added the ``color`` parameter. """ bytes_input = make_input_stream(input, self.charset) echo_input = None old_stdin = sys.stdin old_stdout = sys.stdout old_stderr = sys.stderr old_forced_width = formatting.FORCED_WIDTH formatting.FORCED_WIDTH = 80 env = self.make_env(env) stream_mixer = StreamMixer() if self.echo_stdin: bytes_input = echo_input = t.cast( t.BinaryIO, EchoingStdin(bytes_input, stream_mixer.stdout) ) sys.stdin = text_input = _NamedTextIOWrapper( bytes_input, encoding=self.charset, name="", mode="r" ) if self.echo_stdin: # Force unbuffered reads, otherwise TextIOWrapper reads a # large chunk which is echoed early. text_input._CHUNK_SIZE = 1 # type: ignore sys.stdout = _NamedTextIOWrapper( stream_mixer.stdout, encoding=self.charset, name="", mode="w" ) sys.stderr = _NamedTextIOWrapper( stream_mixer.stderr, encoding=self.charset, name="", mode="w", errors="backslashreplace", ) @_pause_echo(echo_input) # type: ignore def visible_input(prompt: str | None = None) -> str: sys.stdout.write(prompt or "") val = next(text_input).rstrip("\r\n") sys.stdout.write(f"{val}\n") sys.stdout.flush() return val @_pause_echo(echo_input) # type: ignore def hidden_input(prompt: str | None = None) -> str: sys.stdout.write(f"{prompt or ''}\n") sys.stdout.flush() return next(text_input).rstrip("\r\n") @_pause_echo(echo_input) # type: ignore def _getchar(echo: bool) -> str: char = sys.stdin.read(1) if echo: sys.stdout.write(char) sys.stdout.flush() return char default_color = color def should_strip_ansi( stream: t.IO[t.Any] | None = None, color: bool | None = None ) -> bool: if color is None: return not default_color return not color old_visible_prompt_func = termui.visible_prompt_func old_hidden_prompt_func = termui.hidden_prompt_func old__getchar_func = termui._getchar old_should_strip_ansi = utils.should_strip_ansi # type: ignore old__compat_should_strip_ansi = _compat.should_strip_ansi termui.visible_prompt_func = visible_input termui.hidden_prompt_func = hidden_input termui._getchar = _getchar utils.should_strip_ansi = should_strip_ansi # type: ignore _compat.should_strip_ansi = should_strip_ansi old_env = {} try: for key, value in env.items(): old_env[key] = os.environ.get(key) if value is None: try: del os.environ[key] except Exception: pass else: os.environ[key] = value yield (stream_mixer.stdout, stream_mixer.stderr, stream_mixer.output) finally: for key, value in old_env.items(): if value is None: try: del os.environ[key] except Exception: pass else: os.environ[key] = value sys.stdout = old_stdout sys.stderr = old_stderr sys.stdin = old_stdin termui.visible_prompt_func = old_visible_prompt_func termui.hidden_prompt_func = old_hidden_prompt_func termui._getchar = old__getchar_func utils.should_strip_ansi = old_should_strip_ansi # type: ignore _compat.should_strip_ansi = old__compat_should_strip_ansi formatting.FORCED_WIDTH = old_forced_width def invoke( self, cli: Command, args: str | cabc.Sequence[str] | None = None, input: str | bytes | t.IO[t.Any] | None = None, env: cabc.Mapping[str, str | None] | None = None, catch_exceptions: bool | None = None, color: bool = False, **extra: t.Any, ) -> Result: """Invokes a command in an isolated environment. The arguments are forwarded directly to the command line script, the `extra` keyword arguments are passed to the :meth:`~clickpkg.Command.main` function of the command. This returns a :class:`Result` object. :param cli: the command to invoke :param args: the arguments to invoke. It may be given as an iterable or a string. When given as string it will be interpreted as a Unix shell command. More details at :func:`shlex.split`. :param input: the input data for `sys.stdin`. :param env: the environment overrides. :param catch_exceptions: Whether to catch any other exceptions than ``SystemExit``. If :data:`None`, the value from :class:`CliRunner` is used. :param extra: the keyword arguments to pass to :meth:`main`. :param color: whether the output should contain color codes. The application can still override this explicitly. .. versionadded:: 8.2 The result object has the ``output_bytes`` attribute with the mix of ``stdout_bytes`` and ``stderr_bytes``, as the user would see it in its terminal. .. versionchanged:: 8.2 The result object always returns the ``stderr_bytes`` stream. .. versionchanged:: 8.0 The result object has the ``return_value`` attribute with the value returned from the invoked command. .. versionchanged:: 4.0 Added the ``color`` parameter. .. versionchanged:: 3.0 Added the ``catch_exceptions`` parameter. .. versionchanged:: 3.0 The result object has the ``exc_info`` attribute with the traceback if available. """ exc_info = None if catch_exceptions is None: catch_exceptions = self.catch_exceptions with self.isolation(input=input, env=env, color=color) as outstreams: return_value = None exception: BaseException | None = None exit_code = 0 if isinstance(args, str): args = shlex.split(args) try: prog_name = extra.pop("prog_name") except KeyError: prog_name = self.get_default_prog_name(cli) try: return_value = cli.main(args=args or (), prog_name=prog_name, **extra) except SystemExit as e: exc_info = sys.exc_info() e_code = t.cast("int | t.Any | None", e.code) if e_code is None: e_code = 0 if e_code != 0: exception = e if not isinstance(e_code, int): sys.stdout.write(str(e_code)) sys.stdout.write("\n") e_code = 1 exit_code = e_code except Exception as e: if not catch_exceptions: raise exception = e exit_code = 1 exc_info = sys.exc_info() finally: sys.stdout.flush() sys.stderr.flush() stdout = outstreams[0].getvalue() stderr = outstreams[1].getvalue() output = outstreams[2].getvalue() return Result( runner=self, stdout_bytes=stdout, stderr_bytes=stderr, output_bytes=output, return_value=return_value, exit_code=exit_code, exception=exception, exc_info=exc_info, # type: ignore ) @contextlib.contextmanager def isolated_filesystem( self, temp_dir: str | os.PathLike[str] | None = None ) -> cabc.Iterator[str]: """A context manager that creates a temporary directory and changes the current working directory to it. This isolates tests that affect the contents of the CWD to prevent them from interfering with each other. :param temp_dir: Create the temporary directory under this directory. If given, the created directory is not removed when exiting. .. versionchanged:: 8.0 Added the ``temp_dir`` parameter. """ cwd = os.getcwd() dt = tempfile.mkdtemp(dir=temp_dir) os.chdir(dt) try: yield dt finally: os.chdir(cwd) if temp_dir is None: try: shutil.rmtree(dt) except OSError: pass