|
import io |
|
import sys |
|
import typing |
|
import warnings |
|
from abc import ABC, abstractmethod |
|
from collections import deque |
|
from dataclasses import dataclass, field |
|
from datetime import timedelta |
|
from io import RawIOBase, UnsupportedOperation |
|
from math import ceil |
|
from mmap import mmap |
|
from operator import length_hint |
|
from os import PathLike, stat |
|
from threading import Event, RLock, Thread |
|
from types import TracebackType |
|
from typing import ( |
|
Any, |
|
BinaryIO, |
|
Callable, |
|
ContextManager, |
|
Deque, |
|
Dict, |
|
Generic, |
|
Iterable, |
|
List, |
|
NamedTuple, |
|
NewType, |
|
Optional, |
|
Sequence, |
|
TextIO, |
|
Tuple, |
|
Type, |
|
TypeVar, |
|
Union, |
|
) |
|
|
|
if sys.version_info >= (3, 8): |
|
from typing import Literal |
|
else: |
|
from typing_extensions import Literal |
|
|
|
if sys.version_info >= (3, 11): |
|
from typing import Self |
|
else: |
|
from typing_extensions import Self |
|
|
|
from . import filesize, get_console |
|
from .console import Console, Group, JustifyMethod, RenderableType |
|
from .highlighter import Highlighter |
|
from .jupyter import JupyterMixin |
|
from .live import Live |
|
from .progress_bar import ProgressBar |
|
from .spinner import Spinner |
|
from .style import StyleType |
|
from .table import Column, Table |
|
from .text import Text, TextType |
|
|
|
TaskID = NewType("TaskID", int) |
|
|
|
ProgressType = TypeVar("ProgressType") |
|
|
|
GetTimeCallable = Callable[[], float] |
|
|
|
|
|
_I = typing.TypeVar("_I", TextIO, BinaryIO) |
|
|
|
|
|
class _TrackThread(Thread): |
|
"""A thread to periodically update progress.""" |
|
|
|
def __init__(self, progress: "Progress", task_id: "TaskID", update_period: float): |
|
self.progress = progress |
|
self.task_id = task_id |
|
self.update_period = update_period |
|
self.done = Event() |
|
|
|
self.completed = 0 |
|
super().__init__(daemon=True) |
|
|
|
def run(self) -> None: |
|
task_id = self.task_id |
|
advance = self.progress.advance |
|
update_period = self.update_period |
|
last_completed = 0 |
|
wait = self.done.wait |
|
while not wait(update_period) and self.progress.live.is_started: |
|
completed = self.completed |
|
if last_completed != completed: |
|
advance(task_id, completed - last_completed) |
|
last_completed = completed |
|
|
|
self.progress.update(self.task_id, completed=self.completed, refresh=True) |
|
|
|
def __enter__(self) -> "_TrackThread": |
|
self.start() |
|
return self |
|
|
|
def __exit__( |
|
self, |
|
exc_type: Optional[Type[BaseException]], |
|
exc_val: Optional[BaseException], |
|
exc_tb: Optional[TracebackType], |
|
) -> None: |
|
self.done.set() |
|
self.join() |
|
|
|
|
|
def track( |
|
sequence: Union[Sequence[ProgressType], Iterable[ProgressType]], |
|
description: str = "Working...", |
|
total: Optional[float] = None, |
|
completed: int = 0, |
|
auto_refresh: bool = True, |
|
console: Optional[Console] = None, |
|
transient: bool = False, |
|
get_time: Optional[Callable[[], float]] = None, |
|
refresh_per_second: float = 10, |
|
style: StyleType = "bar.back", |
|
complete_style: StyleType = "bar.complete", |
|
finished_style: StyleType = "bar.finished", |
|
pulse_style: StyleType = "bar.pulse", |
|
update_period: float = 0.1, |
|
disable: bool = False, |
|
show_speed: bool = True, |
|
) -> Iterable[ProgressType]: |
|
"""Track progress by iterating over a sequence. |
|
|
|
Args: |
|
sequence (Iterable[ProgressType]): A sequence (must support "len") you wish to iterate over. |
|
description (str, optional): Description of task show next to progress bar. Defaults to "Working". |
|
total: (float, optional): Total number of steps. Default is len(sequence). |
|
completed (int, optional): Number of steps completed so far. Defaults to 0. |
|
auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True. |
|
transient: (bool, optional): Clear the progress on exit. Defaults to False. |
|
console (Console, optional): Console to write to. Default creates internal Console instance. |
|
refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10. |
|
style (StyleType, optional): Style for the bar background. Defaults to "bar.back". |
|
complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete". |
|
finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished". |
|
pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse". |
|
update_period (float, optional): Minimum time (in seconds) between calls to update(). Defaults to 0.1. |
|
disable (bool, optional): Disable display of progress. |
|
show_speed (bool, optional): Show speed if total isn't known. Defaults to True. |
|
Returns: |
|
Iterable[ProgressType]: An iterable of the values in the sequence. |
|
|
|
""" |
|
|
|
columns: List["ProgressColumn"] = ( |
|
[TextColumn("[progress.description]{task.description}")] if description else [] |
|
) |
|
columns.extend( |
|
( |
|
BarColumn( |
|
style=style, |
|
complete_style=complete_style, |
|
finished_style=finished_style, |
|
pulse_style=pulse_style, |
|
), |
|
TaskProgressColumn(show_speed=show_speed), |
|
TimeRemainingColumn(elapsed_when_finished=True), |
|
) |
|
) |
|
progress = Progress( |
|
*columns, |
|
auto_refresh=auto_refresh, |
|
console=console, |
|
transient=transient, |
|
get_time=get_time, |
|
refresh_per_second=refresh_per_second or 10, |
|
disable=disable, |
|
) |
|
|
|
with progress: |
|
yield from progress.track( |
|
sequence, |
|
total=total, |
|
completed=completed, |
|
description=description, |
|
update_period=update_period, |
|
) |
|
|
|
|
|
class _Reader(RawIOBase, BinaryIO): |
|
"""A reader that tracks progress while it's being read from.""" |
|
|
|
def __init__( |
|
self, |
|
handle: BinaryIO, |
|
progress: "Progress", |
|
task: TaskID, |
|
close_handle: bool = True, |
|
) -> None: |
|
self.handle = handle |
|
self.progress = progress |
|
self.task = task |
|
self.close_handle = close_handle |
|
self._closed = False |
|
|
|
def __enter__(self) -> "_Reader": |
|
self.handle.__enter__() |
|
return self |
|
|
|
def __exit__( |
|
self, |
|
exc_type: Optional[Type[BaseException]], |
|
exc_val: Optional[BaseException], |
|
exc_tb: Optional[TracebackType], |
|
) -> None: |
|
self.close() |
|
|
|
def __iter__(self) -> BinaryIO: |
|
return self |
|
|
|
def __next__(self) -> bytes: |
|
line = next(self.handle) |
|
self.progress.advance(self.task, advance=len(line)) |
|
return line |
|
|
|
@property |
|
def closed(self) -> bool: |
|
return self._closed |
|
|
|
def fileno(self) -> int: |
|
return self.handle.fileno() |
|
|
|
def isatty(self) -> bool: |
|
return self.handle.isatty() |
|
|
|
@property |
|
def mode(self) -> str: |
|
return self.handle.mode |
|
|
|
@property |
|
def name(self) -> str: |
|
return self.handle.name |
|
|
|
def readable(self) -> bool: |
|
return self.handle.readable() |
|
|
|
def seekable(self) -> bool: |
|
return self.handle.seekable() |
|
|
|
def writable(self) -> bool: |
|
return False |
|
|
|
def read(self, size: int = -1) -> bytes: |
|
block = self.handle.read(size) |
|
self.progress.advance(self.task, advance=len(block)) |
|
return block |
|
|
|
def readinto(self, b: Union[bytearray, memoryview, mmap]): |
|
n = self.handle.readinto(b) |
|
self.progress.advance(self.task, advance=n) |
|
return n |
|
|
|
def readline(self, size: int = -1) -> bytes: |
|
line = self.handle.readline(size) |
|
self.progress.advance(self.task, advance=len(line)) |
|
return line |
|
|
|
def readlines(self, hint: int = -1) -> List[bytes]: |
|
lines = self.handle.readlines(hint) |
|
self.progress.advance(self.task, advance=sum(map(len, lines))) |
|
return lines |
|
|
|
def close(self) -> None: |
|
if self.close_handle: |
|
self.handle.close() |
|
self._closed = True |
|
|
|
def seek(self, offset: int, whence: int = 0) -> int: |
|
pos = self.handle.seek(offset, whence) |
|
self.progress.update(self.task, completed=pos) |
|
return pos |
|
|
|
def tell(self) -> int: |
|
return self.handle.tell() |
|
|
|
def write(self, s: Any) -> int: |
|
raise UnsupportedOperation("write") |
|
|
|
def writelines(self, lines: Iterable[Any]) -> None: |
|
raise UnsupportedOperation("writelines") |
|
|
|
|
|
class _ReadContext(ContextManager[_I], Generic[_I]): |
|
"""A utility class to handle a context for both a reader and a progress.""" |
|
|
|
def __init__(self, progress: "Progress", reader: _I) -> None: |
|
self.progress = progress |
|
self.reader: _I = reader |
|
|
|
def __enter__(self) -> _I: |
|
self.progress.start() |
|
return self.reader.__enter__() |
|
|
|
def __exit__( |
|
self, |
|
exc_type: Optional[Type[BaseException]], |
|
exc_val: Optional[BaseException], |
|
exc_tb: Optional[TracebackType], |
|
) -> None: |
|
self.progress.stop() |
|
self.reader.__exit__(exc_type, exc_val, exc_tb) |
|
|
|
|
|
def wrap_file( |
|
file: BinaryIO, |
|
total: int, |
|
*, |
|
description: str = "Reading...", |
|
auto_refresh: bool = True, |
|
console: Optional[Console] = None, |
|
transient: bool = False, |
|
get_time: Optional[Callable[[], float]] = None, |
|
refresh_per_second: float = 10, |
|
style: StyleType = "bar.back", |
|
complete_style: StyleType = "bar.complete", |
|
finished_style: StyleType = "bar.finished", |
|
pulse_style: StyleType = "bar.pulse", |
|
disable: bool = False, |
|
) -> ContextManager[BinaryIO]: |
|
"""Read bytes from a file while tracking progress. |
|
|
|
Args: |
|
file (Union[str, PathLike[str], BinaryIO]): The path to the file to read, or a file-like object in binary mode. |
|
total (int): Total number of bytes to read. |
|
description (str, optional): Description of task show next to progress bar. Defaults to "Reading". |
|
auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True. |
|
transient: (bool, optional): Clear the progress on exit. Defaults to False. |
|
console (Console, optional): Console to write to. Default creates internal Console instance. |
|
refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10. |
|
style (StyleType, optional): Style for the bar background. Defaults to "bar.back". |
|
complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete". |
|
finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished". |
|
pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse". |
|
disable (bool, optional): Disable display of progress. |
|
Returns: |
|
ContextManager[BinaryIO]: A context manager yielding a progress reader. |
|
|
|
""" |
|
|
|
columns: List["ProgressColumn"] = ( |
|
[TextColumn("[progress.description]{task.description}")] if description else [] |
|
) |
|
columns.extend( |
|
( |
|
BarColumn( |
|
style=style, |
|
complete_style=complete_style, |
|
finished_style=finished_style, |
|
pulse_style=pulse_style, |
|
), |
|
DownloadColumn(), |
|
TimeRemainingColumn(), |
|
) |
|
) |
|
progress = Progress( |
|
*columns, |
|
auto_refresh=auto_refresh, |
|
console=console, |
|
transient=transient, |
|
get_time=get_time, |
|
refresh_per_second=refresh_per_second or 10, |
|
disable=disable, |
|
) |
|
|
|
reader = progress.wrap_file(file, total=total, description=description) |
|
return _ReadContext(progress, reader) |
|
|
|
|
|
@typing.overload |
|
def open( |
|
file: Union[str, "PathLike[str]", bytes], |
|
mode: Union[Literal["rt"], Literal["r"]], |
|
buffering: int = -1, |
|
encoding: Optional[str] = None, |
|
errors: Optional[str] = None, |
|
newline: Optional[str] = None, |
|
*, |
|
total: Optional[int] = None, |
|
description: str = "Reading...", |
|
auto_refresh: bool = True, |
|
console: Optional[Console] = None, |
|
transient: bool = False, |
|
get_time: Optional[Callable[[], float]] = None, |
|
refresh_per_second: float = 10, |
|
style: StyleType = "bar.back", |
|
complete_style: StyleType = "bar.complete", |
|
finished_style: StyleType = "bar.finished", |
|
pulse_style: StyleType = "bar.pulse", |
|
disable: bool = False, |
|
) -> ContextManager[TextIO]: |
|
pass |
|
|
|
|
|
@typing.overload |
|
def open( |
|
file: Union[str, "PathLike[str]", bytes], |
|
mode: Literal["rb"], |
|
buffering: int = -1, |
|
encoding: Optional[str] = None, |
|
errors: Optional[str] = None, |
|
newline: Optional[str] = None, |
|
*, |
|
total: Optional[int] = None, |
|
description: str = "Reading...", |
|
auto_refresh: bool = True, |
|
console: Optional[Console] = None, |
|
transient: bool = False, |
|
get_time: Optional[Callable[[], float]] = None, |
|
refresh_per_second: float = 10, |
|
style: StyleType = "bar.back", |
|
complete_style: StyleType = "bar.complete", |
|
finished_style: StyleType = "bar.finished", |
|
pulse_style: StyleType = "bar.pulse", |
|
disable: bool = False, |
|
) -> ContextManager[BinaryIO]: |
|
pass |
|
|
|
|
|
def open( |
|
file: Union[str, "PathLike[str]", bytes], |
|
mode: Union[Literal["rb"], Literal["rt"], Literal["r"]] = "r", |
|
buffering: int = -1, |
|
encoding: Optional[str] = None, |
|
errors: Optional[str] = None, |
|
newline: Optional[str] = None, |
|
*, |
|
total: Optional[int] = None, |
|
description: str = "Reading...", |
|
auto_refresh: bool = True, |
|
console: Optional[Console] = None, |
|
transient: bool = False, |
|
get_time: Optional[Callable[[], float]] = None, |
|
refresh_per_second: float = 10, |
|
style: StyleType = "bar.back", |
|
complete_style: StyleType = "bar.complete", |
|
finished_style: StyleType = "bar.finished", |
|
pulse_style: StyleType = "bar.pulse", |
|
disable: bool = False, |
|
) -> Union[ContextManager[BinaryIO], ContextManager[TextIO]]: |
|
"""Read bytes from a file while tracking progress. |
|
|
|
Args: |
|
path (Union[str, PathLike[str], BinaryIO]): The path to the file to read, or a file-like object in binary mode. |
|
mode (str): The mode to use to open the file. Only supports "r", "rb" or "rt". |
|
buffering (int): The buffering strategy to use, see :func:`io.open`. |
|
encoding (str, optional): The encoding to use when reading in text mode, see :func:`io.open`. |
|
errors (str, optional): The error handling strategy for decoding errors, see :func:`io.open`. |
|
newline (str, optional): The strategy for handling newlines in text mode, see :func:`io.open` |
|
total: (int, optional): Total number of bytes to read. Must be provided if reading from a file handle. Default for a path is os.stat(file).st_size. |
|
description (str, optional): Description of task show next to progress bar. Defaults to "Reading". |
|
auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True. |
|
transient: (bool, optional): Clear the progress on exit. Defaults to False. |
|
console (Console, optional): Console to write to. Default creates internal Console instance. |
|
refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10. |
|
style (StyleType, optional): Style for the bar background. Defaults to "bar.back". |
|
complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete". |
|
finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished". |
|
pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse". |
|
disable (bool, optional): Disable display of progress. |
|
encoding (str, optional): The encoding to use when reading in text mode. |
|
|
|
Returns: |
|
ContextManager[BinaryIO]: A context manager yielding a progress reader. |
|
|
|
""" |
|
|
|
columns: List["ProgressColumn"] = ( |
|
[TextColumn("[progress.description]{task.description}")] if description else [] |
|
) |
|
columns.extend( |
|
( |
|
BarColumn( |
|
style=style, |
|
complete_style=complete_style, |
|
finished_style=finished_style, |
|
pulse_style=pulse_style, |
|
), |
|
DownloadColumn(), |
|
TimeRemainingColumn(), |
|
) |
|
) |
|
progress = Progress( |
|
*columns, |
|
auto_refresh=auto_refresh, |
|
console=console, |
|
transient=transient, |
|
get_time=get_time, |
|
refresh_per_second=refresh_per_second or 10, |
|
disable=disable, |
|
) |
|
|
|
reader = progress.open( |
|
file, |
|
mode=mode, |
|
buffering=buffering, |
|
encoding=encoding, |
|
errors=errors, |
|
newline=newline, |
|
total=total, |
|
description=description, |
|
) |
|
return _ReadContext(progress, reader) |
|
|
|
|
|
class ProgressColumn(ABC): |
|
"""Base class for a widget to use in progress display.""" |
|
|
|
max_refresh: Optional[float] = None |
|
|
|
def __init__(self, table_column: Optional[Column] = None) -> None: |
|
self._table_column = table_column |
|
self._renderable_cache: Dict[TaskID, Tuple[float, RenderableType]] = {} |
|
self._update_time: Optional[float] = None |
|
|
|
def get_table_column(self) -> Column: |
|
"""Get a table column, used to build tasks table.""" |
|
return self._table_column or Column() |
|
|
|
def __call__(self, task: "Task") -> RenderableType: |
|
"""Called by the Progress object to return a renderable for the given task. |
|
|
|
Args: |
|
task (Task): An object containing information regarding the task. |
|
|
|
Returns: |
|
RenderableType: Anything renderable (including str). |
|
""" |
|
current_time = task.get_time() |
|
if self.max_refresh is not None and not task.completed: |
|
try: |
|
timestamp, renderable = self._renderable_cache[task.id] |
|
except KeyError: |
|
pass |
|
else: |
|
if timestamp + self.max_refresh > current_time: |
|
return renderable |
|
|
|
renderable = self.render(task) |
|
self._renderable_cache[task.id] = (current_time, renderable) |
|
return renderable |
|
|
|
@abstractmethod |
|
def render(self, task: "Task") -> RenderableType: |
|
"""Should return a renderable object.""" |
|
|
|
|
|
class RenderableColumn(ProgressColumn): |
|
"""A column to insert an arbitrary column. |
|
|
|
Args: |
|
renderable (RenderableType, optional): Any renderable. Defaults to empty string. |
|
""" |
|
|
|
def __init__( |
|
self, renderable: RenderableType = "", *, table_column: Optional[Column] = None |
|
): |
|
self.renderable = renderable |
|
super().__init__(table_column=table_column) |
|
|
|
def render(self, task: "Task") -> RenderableType: |
|
return self.renderable |
|
|
|
|
|
class SpinnerColumn(ProgressColumn): |
|
"""A column with a 'spinner' animation. |
|
|
|
Args: |
|
spinner_name (str, optional): Name of spinner animation. Defaults to "dots". |
|
style (StyleType, optional): Style of spinner. Defaults to "progress.spinner". |
|
speed (float, optional): Speed factor of spinner. Defaults to 1.0. |
|
finished_text (TextType, optional): Text used when task is finished. Defaults to " ". |
|
""" |
|
|
|
def __init__( |
|
self, |
|
spinner_name: str = "dots", |
|
style: Optional[StyleType] = "progress.spinner", |
|
speed: float = 1.0, |
|
finished_text: TextType = " ", |
|
table_column: Optional[Column] = None, |
|
): |
|
self.spinner = Spinner(spinner_name, style=style, speed=speed) |
|
self.finished_text = ( |
|
Text.from_markup(finished_text) |
|
if isinstance(finished_text, str) |
|
else finished_text |
|
) |
|
super().__init__(table_column=table_column) |
|
|
|
def set_spinner( |
|
self, |
|
spinner_name: str, |
|
spinner_style: Optional[StyleType] = "progress.spinner", |
|
speed: float = 1.0, |
|
) -> None: |
|
"""Set a new spinner. |
|
|
|
Args: |
|
spinner_name (str): Spinner name, see python -m rich.spinner. |
|
spinner_style (Optional[StyleType], optional): Spinner style. Defaults to "progress.spinner". |
|
speed (float, optional): Speed factor of spinner. Defaults to 1.0. |
|
""" |
|
self.spinner = Spinner(spinner_name, style=spinner_style, speed=speed) |
|
|
|
def render(self, task: "Task") -> RenderableType: |
|
text = ( |
|
self.finished_text |
|
if task.finished |
|
else self.spinner.render(task.get_time()) |
|
) |
|
return text |
|
|
|
|
|
class TextColumn(ProgressColumn): |
|
"""A column containing text.""" |
|
|
|
def __init__( |
|
self, |
|
text_format: str, |
|
style: StyleType = "none", |
|
justify: JustifyMethod = "left", |
|
markup: bool = True, |
|
highlighter: Optional[Highlighter] = None, |
|
table_column: Optional[Column] = None, |
|
) -> None: |
|
self.text_format = text_format |
|
self.justify: JustifyMethod = justify |
|
self.style = style |
|
self.markup = markup |
|
self.highlighter = highlighter |
|
super().__init__(table_column=table_column or Column(no_wrap=True)) |
|
|
|
def render(self, task: "Task") -> Text: |
|
_text = self.text_format.format(task=task) |
|
if self.markup: |
|
text = Text.from_markup(_text, style=self.style, justify=self.justify) |
|
else: |
|
text = Text(_text, style=self.style, justify=self.justify) |
|
if self.highlighter: |
|
self.highlighter.highlight(text) |
|
return text |
|
|
|
|
|
class BarColumn(ProgressColumn): |
|
"""Renders a visual progress bar. |
|
|
|
Args: |
|
bar_width (Optional[int], optional): Width of bar or None for full width. Defaults to 40. |
|
style (StyleType, optional): Style for the bar background. Defaults to "bar.back". |
|
complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete". |
|
finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished". |
|
pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse". |
|
""" |
|
|
|
def __init__( |
|
self, |
|
bar_width: Optional[int] = 40, |
|
style: StyleType = "bar.back", |
|
complete_style: StyleType = "bar.complete", |
|
finished_style: StyleType = "bar.finished", |
|
pulse_style: StyleType = "bar.pulse", |
|
table_column: Optional[Column] = None, |
|
) -> None: |
|
self.bar_width = bar_width |
|
self.style = style |
|
self.complete_style = complete_style |
|
self.finished_style = finished_style |
|
self.pulse_style = pulse_style |
|
super().__init__(table_column=table_column) |
|
|
|
def render(self, task: "Task") -> ProgressBar: |
|
"""Gets a progress bar widget for a task.""" |
|
return ProgressBar( |
|
total=max(0, task.total) if task.total is not None else None, |
|
completed=max(0, task.completed), |
|
width=None if self.bar_width is None else max(1, self.bar_width), |
|
pulse=not task.started, |
|
animation_time=task.get_time(), |
|
style=self.style, |
|
complete_style=self.complete_style, |
|
finished_style=self.finished_style, |
|
pulse_style=self.pulse_style, |
|
) |
|
|
|
|
|
class TimeElapsedColumn(ProgressColumn): |
|
"""Renders time elapsed.""" |
|
|
|
def render(self, task: "Task") -> Text: |
|
"""Show time elapsed.""" |
|
elapsed = task.finished_time if task.finished else task.elapsed |
|
if elapsed is None: |
|
return Text("-:--:--", style="progress.elapsed") |
|
delta = timedelta(seconds=max(0, int(elapsed))) |
|
return Text(str(delta), style="progress.elapsed") |
|
|
|
|
|
class TaskProgressColumn(TextColumn): |
|
"""Show task progress as a percentage. |
|
|
|
Args: |
|
text_format (str, optional): Format for percentage display. Defaults to "[progress.percentage]{task.percentage:>3.0f}%". |
|
text_format_no_percentage (str, optional): Format if percentage is unknown. Defaults to "". |
|
style (StyleType, optional): Style of output. Defaults to "none". |
|
justify (JustifyMethod, optional): Text justification. Defaults to "left". |
|
markup (bool, optional): Enable markup. Defaults to True. |
|
highlighter (Optional[Highlighter], optional): Highlighter to apply to output. Defaults to None. |
|
table_column (Optional[Column], optional): Table Column to use. Defaults to None. |
|
show_speed (bool, optional): Show speed if total is unknown. Defaults to False. |
|
""" |
|
|
|
def __init__( |
|
self, |
|
text_format: str = "[progress.percentage]{task.percentage:>3.0f}%", |
|
text_format_no_percentage: str = "", |
|
style: StyleType = "none", |
|
justify: JustifyMethod = "left", |
|
markup: bool = True, |
|
highlighter: Optional[Highlighter] = None, |
|
table_column: Optional[Column] = None, |
|
show_speed: bool = False, |
|
) -> None: |
|
self.text_format_no_percentage = text_format_no_percentage |
|
self.show_speed = show_speed |
|
super().__init__( |
|
text_format=text_format, |
|
style=style, |
|
justify=justify, |
|
markup=markup, |
|
highlighter=highlighter, |
|
table_column=table_column, |
|
) |
|
|
|
@classmethod |
|
def render_speed(cls, speed: Optional[float]) -> Text: |
|
"""Render the speed in iterations per second. |
|
|
|
Args: |
|
task (Task): A Task object. |
|
|
|
Returns: |
|
Text: Text object containing the task speed. |
|
""" |
|
if speed is None: |
|
return Text("", style="progress.percentage") |
|
unit, suffix = filesize.pick_unit_and_suffix( |
|
int(speed), |
|
["", "Γ10Β³", "Γ10βΆ", "Γ10βΉ", "Γ10ΒΉΒ²"], |
|
1000, |
|
) |
|
data_speed = speed / unit |
|
return Text(f"{data_speed:.1f}{suffix} it/s", style="progress.percentage") |
|
|
|
def render(self, task: "Task") -> Text: |
|
if task.total is None and self.show_speed: |
|
return self.render_speed(task.finished_speed or task.speed) |
|
text_format = ( |
|
self.text_format_no_percentage if task.total is None else self.text_format |
|
) |
|
_text = text_format.format(task=task) |
|
if self.markup: |
|
text = Text.from_markup(_text, style=self.style, justify=self.justify) |
|
else: |
|
text = Text(_text, style=self.style, justify=self.justify) |
|
if self.highlighter: |
|
self.highlighter.highlight(text) |
|
return text |
|
|
|
|
|
class TimeRemainingColumn(ProgressColumn): |
|
"""Renders estimated time remaining. |
|
|
|
Args: |
|
compact (bool, optional): Render MM:SS when time remaining is less than an hour. Defaults to False. |
|
elapsed_when_finished (bool, optional): Render time elapsed when the task is finished. Defaults to False. |
|
""" |
|
|
|
|
|
max_refresh = 0.5 |
|
|
|
def __init__( |
|
self, |
|
compact: bool = False, |
|
elapsed_when_finished: bool = False, |
|
table_column: Optional[Column] = None, |
|
): |
|
self.compact = compact |
|
self.elapsed_when_finished = elapsed_when_finished |
|
super().__init__(table_column=table_column) |
|
|
|
def render(self, task: "Task") -> Text: |
|
"""Show time remaining.""" |
|
if self.elapsed_when_finished and task.finished: |
|
task_time = task.finished_time |
|
style = "progress.elapsed" |
|
else: |
|
task_time = task.time_remaining |
|
style = "progress.remaining" |
|
|
|
if task.total is None: |
|
return Text("", style=style) |
|
|
|
if task_time is None: |
|
return Text("--:--" if self.compact else "-:--:--", style=style) |
|
|
|
|
|
minutes, seconds = divmod(int(task_time), 60) |
|
hours, minutes = divmod(minutes, 60) |
|
|
|
if self.compact and not hours: |
|
formatted = f"{minutes:02d}:{seconds:02d}" |
|
else: |
|
formatted = f"{hours:d}:{minutes:02d}:{seconds:02d}" |
|
|
|
return Text(formatted, style=style) |
|
|
|
|
|
class FileSizeColumn(ProgressColumn): |
|
"""Renders completed filesize.""" |
|
|
|
def render(self, task: "Task") -> Text: |
|
"""Show data completed.""" |
|
data_size = filesize.decimal(int(task.completed)) |
|
return Text(data_size, style="progress.filesize") |
|
|
|
|
|
class TotalFileSizeColumn(ProgressColumn): |
|
"""Renders total filesize.""" |
|
|
|
def render(self, task: "Task") -> Text: |
|
"""Show data completed.""" |
|
data_size = filesize.decimal(int(task.total)) if task.total is not None else "" |
|
return Text(data_size, style="progress.filesize.total") |
|
|
|
|
|
class MofNCompleteColumn(ProgressColumn): |
|
"""Renders completed count/total, e.g. ' 10/1000'. |
|
|
|
Best for bounded tasks with int quantities. |
|
|
|
Space pads the completed count so that progress length does not change as task progresses |
|
past powers of 10. |
|
|
|
Args: |
|
separator (str, optional): Text to separate completed and total values. Defaults to "/". |
|
""" |
|
|
|
def __init__(self, separator: str = "/", table_column: Optional[Column] = None): |
|
self.separator = separator |
|
super().__init__(table_column=table_column) |
|
|
|
def render(self, task: "Task") -> Text: |
|
"""Show completed/total.""" |
|
completed = int(task.completed) |
|
total = int(task.total) if task.total is not None else "?" |
|
total_width = len(str(total)) |
|
return Text( |
|
f"{completed:{total_width}d}{self.separator}{total}", |
|
style="progress.download", |
|
) |
|
|
|
|
|
class DownloadColumn(ProgressColumn): |
|
"""Renders file size downloaded and total, e.g. '0.5/2.3 GB'. |
|
|
|
Args: |
|
binary_units (bool, optional): Use binary units, KiB, MiB etc. Defaults to False. |
|
""" |
|
|
|
def __init__( |
|
self, binary_units: bool = False, table_column: Optional[Column] = None |
|
) -> None: |
|
self.binary_units = binary_units |
|
super().__init__(table_column=table_column) |
|
|
|
def render(self, task: "Task") -> Text: |
|
"""Calculate common unit for completed and total.""" |
|
completed = int(task.completed) |
|
|
|
unit_and_suffix_calculation_base = ( |
|
int(task.total) if task.total is not None else completed |
|
) |
|
if self.binary_units: |
|
unit, suffix = filesize.pick_unit_and_suffix( |
|
unit_and_suffix_calculation_base, |
|
["bytes", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"], |
|
1024, |
|
) |
|
else: |
|
unit, suffix = filesize.pick_unit_and_suffix( |
|
unit_and_suffix_calculation_base, |
|
["bytes", "kB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"], |
|
1000, |
|
) |
|
precision = 0 if unit == 1 else 1 |
|
|
|
completed_ratio = completed / unit |
|
completed_str = f"{completed_ratio:,.{precision}f}" |
|
|
|
if task.total is not None: |
|
total = int(task.total) |
|
total_ratio = total / unit |
|
total_str = f"{total_ratio:,.{precision}f}" |
|
else: |
|
total_str = "?" |
|
|
|
download_status = f"{completed_str}/{total_str} {suffix}" |
|
download_text = Text(download_status, style="progress.download") |
|
return download_text |
|
|
|
|
|
class TransferSpeedColumn(ProgressColumn): |
|
"""Renders human readable transfer speed.""" |
|
|
|
def render(self, task: "Task") -> Text: |
|
"""Show data transfer speed.""" |
|
speed = task.finished_speed or task.speed |
|
if speed is None: |
|
return Text("?", style="progress.data.speed") |
|
data_speed = filesize.decimal(int(speed)) |
|
return Text(f"{data_speed}/s", style="progress.data.speed") |
|
|
|
|
|
class ProgressSample(NamedTuple): |
|
"""Sample of progress for a given time.""" |
|
|
|
timestamp: float |
|
"""Timestamp of sample.""" |
|
completed: float |
|
"""Number of steps completed.""" |
|
|
|
|
|
@dataclass |
|
class Task: |
|
"""Information regarding a progress task. |
|
|
|
This object should be considered read-only outside of the :class:`~Progress` class. |
|
|
|
""" |
|
|
|
id: TaskID |
|
"""Task ID associated with this task (used in Progress methods).""" |
|
|
|
description: str |
|
"""str: Description of the task.""" |
|
|
|
total: Optional[float] |
|
"""Optional[float]: Total number of steps in this task.""" |
|
|
|
completed: float |
|
"""float: Number of steps completed""" |
|
|
|
_get_time: GetTimeCallable |
|
"""Callable to get the current time.""" |
|
|
|
finished_time: Optional[float] = None |
|
"""float: Time task was finished.""" |
|
|
|
visible: bool = True |
|
"""bool: Indicates if this task is visible in the progress display.""" |
|
|
|
fields: Dict[str, Any] = field(default_factory=dict) |
|
"""dict: Arbitrary fields passed in via Progress.update.""" |
|
|
|
start_time: Optional[float] = field(default=None, init=False, repr=False) |
|
"""Optional[float]: Time this task was started, or None if not started.""" |
|
|
|
stop_time: Optional[float] = field(default=None, init=False, repr=False) |
|
"""Optional[float]: Time this task was stopped, or None if not stopped.""" |
|
|
|
finished_speed: Optional[float] = None |
|
"""Optional[float]: The last speed for a finished task.""" |
|
|
|
_progress: Deque[ProgressSample] = field( |
|
default_factory=lambda: deque(maxlen=1000), init=False, repr=False |
|
) |
|
|
|
_lock: RLock = field(repr=False, default_factory=RLock) |
|
"""Thread lock.""" |
|
|
|
def get_time(self) -> float: |
|
"""float: Get the current time, in seconds.""" |
|
return self._get_time() |
|
|
|
@property |
|
def started(self) -> bool: |
|
"""bool: Check if the task as started.""" |
|
return self.start_time is not None |
|
|
|
@property |
|
def remaining(self) -> Optional[float]: |
|
"""Optional[float]: Get the number of steps remaining, if a non-None total was set.""" |
|
if self.total is None: |
|
return None |
|
return self.total - self.completed |
|
|
|
@property |
|
def elapsed(self) -> Optional[float]: |
|
"""Optional[float]: Time elapsed since task was started, or ``None`` if the task hasn't started.""" |
|
if self.start_time is None: |
|
return None |
|
if self.stop_time is not None: |
|
return self.stop_time - self.start_time |
|
return self.get_time() - self.start_time |
|
|
|
@property |
|
def finished(self) -> bool: |
|
"""Check if the task has finished.""" |
|
return self.finished_time is not None |
|
|
|
@property |
|
def percentage(self) -> float: |
|
"""float: Get progress of task as a percentage. If a None total was set, returns 0""" |
|
if not self.total: |
|
return 0.0 |
|
completed = (self.completed / self.total) * 100.0 |
|
completed = min(100.0, max(0.0, completed)) |
|
return completed |
|
|
|
@property |
|
def speed(self) -> Optional[float]: |
|
"""Optional[float]: Get the estimated speed in steps per second.""" |
|
if self.start_time is None: |
|
return None |
|
with self._lock: |
|
progress = self._progress |
|
if not progress: |
|
return None |
|
total_time = progress[-1].timestamp - progress[0].timestamp |
|
if total_time == 0: |
|
return None |
|
iter_progress = iter(progress) |
|
next(iter_progress) |
|
total_completed = sum(sample.completed for sample in iter_progress) |
|
speed = total_completed / total_time |
|
return speed |
|
|
|
@property |
|
def time_remaining(self) -> Optional[float]: |
|
"""Optional[float]: Get estimated time to completion, or ``None`` if no data.""" |
|
if self.finished: |
|
return 0.0 |
|
speed = self.speed |
|
if not speed: |
|
return None |
|
remaining = self.remaining |
|
if remaining is None: |
|
return None |
|
estimate = ceil(remaining / speed) |
|
return estimate |
|
|
|
def _reset(self) -> None: |
|
"""Reset progress.""" |
|
self._progress.clear() |
|
self.finished_time = None |
|
self.finished_speed = None |
|
|
|
|
|
class Progress(JupyterMixin): |
|
"""Renders an auto-updating progress bar(s). |
|
|
|
Args: |
|
console (Console, optional): Optional Console instance. Defaults to an internal Console instance writing to stdout. |
|
auto_refresh (bool, optional): Enable auto refresh. If disabled, you will need to call `refresh()`. |
|
refresh_per_second (Optional[float], optional): Number of times per second to refresh the progress information or None to use default (10). Defaults to None. |
|
speed_estimate_period: (float, optional): Period (in seconds) used to calculate the speed estimate. Defaults to 30. |
|
transient: (bool, optional): Clear the progress on exit. Defaults to False. |
|
redirect_stdout: (bool, optional): Enable redirection of stdout, so ``print`` may be used. Defaults to True. |
|
redirect_stderr: (bool, optional): Enable redirection of stderr. Defaults to True. |
|
get_time: (Callable, optional): A callable that gets the current time, or None to use Console.get_time. Defaults to None. |
|
disable (bool, optional): Disable progress display. Defaults to False |
|
expand (bool, optional): Expand tasks table to fit width. Defaults to False. |
|
""" |
|
|
|
def __init__( |
|
self, |
|
*columns: Union[str, ProgressColumn], |
|
console: Optional[Console] = None, |
|
auto_refresh: bool = True, |
|
refresh_per_second: float = 10, |
|
speed_estimate_period: float = 30.0, |
|
transient: bool = False, |
|
redirect_stdout: bool = True, |
|
redirect_stderr: bool = True, |
|
get_time: Optional[GetTimeCallable] = None, |
|
disable: bool = False, |
|
expand: bool = False, |
|
) -> None: |
|
assert refresh_per_second > 0, "refresh_per_second must be > 0" |
|
self._lock = RLock() |
|
self.columns = columns or self.get_default_columns() |
|
self.speed_estimate_period = speed_estimate_period |
|
|
|
self.disable = disable |
|
self.expand = expand |
|
self._tasks: Dict[TaskID, Task] = {} |
|
self._task_index: TaskID = TaskID(0) |
|
self.live = Live( |
|
console=console or get_console(), |
|
auto_refresh=auto_refresh, |
|
refresh_per_second=refresh_per_second, |
|
transient=transient, |
|
redirect_stdout=redirect_stdout, |
|
redirect_stderr=redirect_stderr, |
|
get_renderable=self.get_renderable, |
|
) |
|
self.get_time = get_time or self.console.get_time |
|
self.print = self.console.print |
|
self.log = self.console.log |
|
|
|
@classmethod |
|
def get_default_columns(cls) -> Tuple[ProgressColumn, ...]: |
|
"""Get the default columns used for a new Progress instance: |
|
- a text column for the description (TextColumn) |
|
- the bar itself (BarColumn) |
|
- a text column showing completion percentage (TextColumn) |
|
- an estimated-time-remaining column (TimeRemainingColumn) |
|
If the Progress instance is created without passing a columns argument, |
|
the default columns defined here will be used. |
|
|
|
You can also create a Progress instance using custom columns before |
|
and/or after the defaults, as in this example: |
|
|
|
progress = Progress( |
|
SpinnerColumn(), |
|
*Progress.get_default_columns(), |
|
"Elapsed:", |
|
TimeElapsedColumn(), |
|
) |
|
|
|
This code shows the creation of a Progress display, containing |
|
a spinner to the left, the default columns, and a labeled elapsed |
|
time column. |
|
""" |
|
return ( |
|
TextColumn("[progress.description]{task.description}"), |
|
BarColumn(), |
|
TaskProgressColumn(), |
|
TimeRemainingColumn(), |
|
) |
|
|
|
@property |
|
def console(self) -> Console: |
|
return self.live.console |
|
|
|
@property |
|
def tasks(self) -> List[Task]: |
|
"""Get a list of Task instances.""" |
|
with self._lock: |
|
return list(self._tasks.values()) |
|
|
|
@property |
|
def task_ids(self) -> List[TaskID]: |
|
"""A list of task IDs.""" |
|
with self._lock: |
|
return list(self._tasks.keys()) |
|
|
|
@property |
|
def finished(self) -> bool: |
|
"""Check if all tasks have been completed.""" |
|
with self._lock: |
|
if not self._tasks: |
|
return True |
|
return all(task.finished for task in self._tasks.values()) |
|
|
|
def start(self) -> None: |
|
"""Start the progress display.""" |
|
if not self.disable: |
|
self.live.start(refresh=True) |
|
|
|
def stop(self) -> None: |
|
"""Stop the progress display.""" |
|
self.live.stop() |
|
if not self.console.is_interactive and not self.console.is_jupyter: |
|
self.console.print() |
|
|
|
def __enter__(self) -> Self: |
|
self.start() |
|
return self |
|
|
|
def __exit__( |
|
self, |
|
exc_type: Optional[Type[BaseException]], |
|
exc_val: Optional[BaseException], |
|
exc_tb: Optional[TracebackType], |
|
) -> None: |
|
self.stop() |
|
|
|
def track( |
|
self, |
|
sequence: Union[Iterable[ProgressType], Sequence[ProgressType]], |
|
total: Optional[float] = None, |
|
completed: int = 0, |
|
task_id: Optional[TaskID] = None, |
|
description: str = "Working...", |
|
update_period: float = 0.1, |
|
) -> Iterable[ProgressType]: |
|
"""Track progress by iterating over a sequence. |
|
|
|
Args: |
|
sequence (Sequence[ProgressType]): A sequence of values you want to iterate over and track progress. |
|
total: (float, optional): Total number of steps. Default is len(sequence). |
|
completed (int, optional): Number of steps completed so far. Defaults to 0. |
|
task_id: (TaskID): Task to track. Default is new task. |
|
description: (str, optional): Description of task, if new task is created. |
|
update_period (float, optional): Minimum time (in seconds) between calls to update(). Defaults to 0.1. |
|
|
|
Returns: |
|
Iterable[ProgressType]: An iterable of values taken from the provided sequence. |
|
""" |
|
if total is None: |
|
total = float(length_hint(sequence)) or None |
|
|
|
if task_id is None: |
|
task_id = self.add_task(description, total=total, completed=completed) |
|
else: |
|
self.update(task_id, total=total, completed=completed) |
|
|
|
if self.live.auto_refresh: |
|
with _TrackThread(self, task_id, update_period) as track_thread: |
|
for value in sequence: |
|
yield value |
|
track_thread.completed += 1 |
|
else: |
|
advance = self.advance |
|
refresh = self.refresh |
|
for value in sequence: |
|
yield value |
|
advance(task_id, 1) |
|
refresh() |
|
|
|
def wrap_file( |
|
self, |
|
file: BinaryIO, |
|
total: Optional[int] = None, |
|
*, |
|
task_id: Optional[TaskID] = None, |
|
description: str = "Reading...", |
|
) -> BinaryIO: |
|
"""Track progress file reading from a binary file. |
|
|
|
Args: |
|
file (BinaryIO): A file-like object opened in binary mode. |
|
total (int, optional): Total number of bytes to read. This must be provided unless a task with a total is also given. |
|
task_id (TaskID): Task to track. Default is new task. |
|
description (str, optional): Description of task, if new task is created. |
|
|
|
Returns: |
|
BinaryIO: A readable file-like object in binary mode. |
|
|
|
Raises: |
|
ValueError: When no total value can be extracted from the arguments or the task. |
|
""" |
|
|
|
total_bytes: Optional[float] = None |
|
if total is not None: |
|
total_bytes = total |
|
elif task_id is not None: |
|
with self._lock: |
|
total_bytes = self._tasks[task_id].total |
|
if total_bytes is None: |
|
raise ValueError( |
|
f"unable to get the total number of bytes, please specify 'total'" |
|
) |
|
|
|
|
|
if task_id is None: |
|
task_id = self.add_task(description, total=total_bytes) |
|
else: |
|
self.update(task_id, total=total_bytes) |
|
|
|
return _Reader(file, self, task_id, close_handle=False) |
|
|
|
@typing.overload |
|
def open( |
|
self, |
|
file: Union[str, "PathLike[str]", bytes], |
|
mode: Literal["rb"], |
|
buffering: int = -1, |
|
encoding: Optional[str] = None, |
|
errors: Optional[str] = None, |
|
newline: Optional[str] = None, |
|
*, |
|
total: Optional[int] = None, |
|
task_id: Optional[TaskID] = None, |
|
description: str = "Reading...", |
|
) -> BinaryIO: |
|
pass |
|
|
|
@typing.overload |
|
def open( |
|
self, |
|
file: Union[str, "PathLike[str]", bytes], |
|
mode: Union[Literal["r"], Literal["rt"]], |
|
buffering: int = -1, |
|
encoding: Optional[str] = None, |
|
errors: Optional[str] = None, |
|
newline: Optional[str] = None, |
|
*, |
|
total: Optional[int] = None, |
|
task_id: Optional[TaskID] = None, |
|
description: str = "Reading...", |
|
) -> TextIO: |
|
pass |
|
|
|
def open( |
|
self, |
|
file: Union[str, "PathLike[str]", bytes], |
|
mode: Union[Literal["rb"], Literal["rt"], Literal["r"]] = "r", |
|
buffering: int = -1, |
|
encoding: Optional[str] = None, |
|
errors: Optional[str] = None, |
|
newline: Optional[str] = None, |
|
*, |
|
total: Optional[int] = None, |
|
task_id: Optional[TaskID] = None, |
|
description: str = "Reading...", |
|
) -> Union[BinaryIO, TextIO]: |
|
"""Track progress while reading from a binary file. |
|
|
|
Args: |
|
path (Union[str, PathLike[str]]): The path to the file to read. |
|
mode (str): The mode to use to open the file. Only supports "r", "rb" or "rt". |
|
buffering (int): The buffering strategy to use, see :func:`io.open`. |
|
encoding (str, optional): The encoding to use when reading in text mode, see :func:`io.open`. |
|
errors (str, optional): The error handling strategy for decoding errors, see :func:`io.open`. |
|
newline (str, optional): The strategy for handling newlines in text mode, see :func:`io.open`. |
|
total (int, optional): Total number of bytes to read. If none given, os.stat(path).st_size is used. |
|
task_id (TaskID): Task to track. Default is new task. |
|
description (str, optional): Description of task, if new task is created. |
|
|
|
Returns: |
|
BinaryIO: A readable file-like object in binary mode. |
|
|
|
Raises: |
|
ValueError: When an invalid mode is given. |
|
""" |
|
|
|
_mode = "".join(sorted(mode, reverse=False)) |
|
if _mode not in ("br", "rt", "r"): |
|
raise ValueError(f"invalid mode {mode!r}") |
|
|
|
|
|
line_buffering = buffering == 1 |
|
if _mode == "br" and buffering == 1: |
|
warnings.warn( |
|
"line buffering (buffering=1) isn't supported in binary mode, the default buffer size will be used", |
|
RuntimeWarning, |
|
) |
|
buffering = -1 |
|
elif _mode in ("rt", "r"): |
|
if buffering == 0: |
|
raise ValueError("can't have unbuffered text I/O") |
|
elif buffering == 1: |
|
buffering = -1 |
|
|
|
|
|
if total is None: |
|
total = stat(file).st_size |
|
|
|
|
|
if task_id is None: |
|
task_id = self.add_task(description, total=total) |
|
else: |
|
self.update(task_id, total=total) |
|
|
|
|
|
handle = io.open(file, "rb", buffering=buffering) |
|
reader = _Reader(handle, self, task_id, close_handle=True) |
|
|
|
|
|
if mode in ("r", "rt"): |
|
return io.TextIOWrapper( |
|
reader, |
|
encoding=encoding, |
|
errors=errors, |
|
newline=newline, |
|
line_buffering=line_buffering, |
|
) |
|
|
|
return reader |
|
|
|
def start_task(self, task_id: TaskID) -> None: |
|
"""Start a task. |
|
|
|
Starts a task (used when calculating elapsed time). You may need to call this manually, |
|
if you called ``add_task`` with ``start=False``. |
|
|
|
Args: |
|
task_id (TaskID): ID of task. |
|
""" |
|
with self._lock: |
|
task = self._tasks[task_id] |
|
if task.start_time is None: |
|
task.start_time = self.get_time() |
|
|
|
def stop_task(self, task_id: TaskID) -> None: |
|
"""Stop a task. |
|
|
|
This will freeze the elapsed time on the task. |
|
|
|
Args: |
|
task_id (TaskID): ID of task. |
|
""" |
|
with self._lock: |
|
task = self._tasks[task_id] |
|
current_time = self.get_time() |
|
if task.start_time is None: |
|
task.start_time = current_time |
|
task.stop_time = current_time |
|
|
|
def update( |
|
self, |
|
task_id: TaskID, |
|
*, |
|
total: Optional[float] = None, |
|
completed: Optional[float] = None, |
|
advance: Optional[float] = None, |
|
description: Optional[str] = None, |
|
visible: Optional[bool] = None, |
|
refresh: bool = False, |
|
**fields: Any, |
|
) -> None: |
|
"""Update information associated with a task. |
|
|
|
Args: |
|
task_id (TaskID): Task id (returned by add_task). |
|
total (float, optional): Updates task.total if not None. |
|
completed (float, optional): Updates task.completed if not None. |
|
advance (float, optional): Add a value to task.completed if not None. |
|
description (str, optional): Change task description if not None. |
|
visible (bool, optional): Set visible flag if not None. |
|
refresh (bool): Force a refresh of progress information. Default is False. |
|
**fields (Any): Additional data fields required for rendering. |
|
""" |
|
with self._lock: |
|
task = self._tasks[task_id] |
|
completed_start = task.completed |
|
|
|
if total is not None and total != task.total: |
|
task.total = total |
|
task._reset() |
|
if advance is not None: |
|
task.completed += advance |
|
if completed is not None: |
|
task.completed = completed |
|
if description is not None: |
|
task.description = description |
|
if visible is not None: |
|
task.visible = visible |
|
task.fields.update(fields) |
|
update_completed = task.completed - completed_start |
|
|
|
current_time = self.get_time() |
|
old_sample_time = current_time - self.speed_estimate_period |
|
_progress = task._progress |
|
|
|
popleft = _progress.popleft |
|
while _progress and _progress[0].timestamp < old_sample_time: |
|
popleft() |
|
if update_completed > 0: |
|
_progress.append(ProgressSample(current_time, update_completed)) |
|
if ( |
|
task.total is not None |
|
and task.completed >= task.total |
|
and task.finished_time is None |
|
): |
|
task.finished_time = task.elapsed |
|
|
|
if refresh: |
|
self.refresh() |
|
|
|
def reset( |
|
self, |
|
task_id: TaskID, |
|
*, |
|
start: bool = True, |
|
total: Optional[float] = None, |
|
completed: int = 0, |
|
visible: Optional[bool] = None, |
|
description: Optional[str] = None, |
|
**fields: Any, |
|
) -> None: |
|
"""Reset a task so completed is 0 and the clock is reset. |
|
|
|
Args: |
|
task_id (TaskID): ID of task. |
|
start (bool, optional): Start the task after reset. Defaults to True. |
|
total (float, optional): New total steps in task, or None to use current total. Defaults to None. |
|
completed (int, optional): Number of steps completed. Defaults to 0. |
|
visible (bool, optional): Enable display of the task. Defaults to True. |
|
description (str, optional): Change task description if not None. Defaults to None. |
|
**fields (str): Additional data fields required for rendering. |
|
""" |
|
current_time = self.get_time() |
|
with self._lock: |
|
task = self._tasks[task_id] |
|
task._reset() |
|
task.start_time = current_time if start else None |
|
if total is not None: |
|
task.total = total |
|
task.completed = completed |
|
if visible is not None: |
|
task.visible = visible |
|
if fields: |
|
task.fields = fields |
|
if description is not None: |
|
task.description = description |
|
task.finished_time = None |
|
self.refresh() |
|
|
|
def advance(self, task_id: TaskID, advance: float = 1) -> None: |
|
"""Advance task by a number of steps. |
|
|
|
Args: |
|
task_id (TaskID): ID of task. |
|
advance (float): Number of steps to advance. Default is 1. |
|
""" |
|
current_time = self.get_time() |
|
with self._lock: |
|
task = self._tasks[task_id] |
|
completed_start = task.completed |
|
task.completed += advance |
|
update_completed = task.completed - completed_start |
|
old_sample_time = current_time - self.speed_estimate_period |
|
_progress = task._progress |
|
|
|
popleft = _progress.popleft |
|
while _progress and _progress[0].timestamp < old_sample_time: |
|
popleft() |
|
while len(_progress) > 1000: |
|
popleft() |
|
_progress.append(ProgressSample(current_time, update_completed)) |
|
if ( |
|
task.total is not None |
|
and task.completed >= task.total |
|
and task.finished_time is None |
|
): |
|
task.finished_time = task.elapsed |
|
task.finished_speed = task.speed |
|
|
|
def refresh(self) -> None: |
|
"""Refresh (render) the progress information.""" |
|
if not self.disable and self.live.is_started: |
|
self.live.refresh() |
|
|
|
def get_renderable(self) -> RenderableType: |
|
"""Get a renderable for the progress display.""" |
|
renderable = Group(*self.get_renderables()) |
|
return renderable |
|
|
|
def get_renderables(self) -> Iterable[RenderableType]: |
|
"""Get a number of renderables for the progress display.""" |
|
table = self.make_tasks_table(self.tasks) |
|
yield table |
|
|
|
def make_tasks_table(self, tasks: Iterable[Task]) -> Table: |
|
"""Get a table to render the Progress display. |
|
|
|
Args: |
|
tasks (Iterable[Task]): An iterable of Task instances, one per row of the table. |
|
|
|
Returns: |
|
Table: A table instance. |
|
""" |
|
table_columns = ( |
|
( |
|
Column(no_wrap=True) |
|
if isinstance(_column, str) |
|
else _column.get_table_column().copy() |
|
) |
|
for _column in self.columns |
|
) |
|
table = Table.grid(*table_columns, padding=(0, 1), expand=self.expand) |
|
|
|
for task in tasks: |
|
if task.visible: |
|
table.add_row( |
|
*( |
|
( |
|
column.format(task=task) |
|
if isinstance(column, str) |
|
else column(task) |
|
) |
|
for column in self.columns |
|
) |
|
) |
|
return table |
|
|
|
def __rich__(self) -> RenderableType: |
|
"""Makes the Progress class itself renderable.""" |
|
with self._lock: |
|
return self.get_renderable() |
|
|
|
def add_task( |
|
self, |
|
description: str, |
|
start: bool = True, |
|
total: Optional[float] = 100.0, |
|
completed: int = 0, |
|
visible: bool = True, |
|
**fields: Any, |
|
) -> TaskID: |
|
"""Add a new 'task' to the Progress display. |
|
|
|
Args: |
|
description (str): A description of the task. |
|
start (bool, optional): Start the task immediately (to calculate elapsed time). If set to False, |
|
you will need to call `start` manually. Defaults to True. |
|
total (float, optional): Number of total steps in the progress if known. |
|
Set to None to render a pulsing animation. Defaults to 100. |
|
completed (int, optional): Number of steps completed so far. Defaults to 0. |
|
visible (bool, optional): Enable display of the task. Defaults to True. |
|
**fields (str): Additional data fields required for rendering. |
|
|
|
Returns: |
|
TaskID: An ID you can use when calling `update`. |
|
""" |
|
with self._lock: |
|
task = Task( |
|
self._task_index, |
|
description, |
|
total, |
|
completed, |
|
visible=visible, |
|
fields=fields, |
|
_get_time=self.get_time, |
|
_lock=self._lock, |
|
) |
|
self._tasks[self._task_index] = task |
|
if start: |
|
self.start_task(self._task_index) |
|
new_task_index = self._task_index |
|
self._task_index = TaskID(int(self._task_index) + 1) |
|
self.refresh() |
|
return new_task_index |
|
|
|
def remove_task(self, task_id: TaskID) -> None: |
|
"""Delete a task if it exists. |
|
|
|
Args: |
|
task_id (TaskID): A task ID. |
|
|
|
""" |
|
with self._lock: |
|
del self._tasks[task_id] |
|
|
|
|
|
if __name__ == "__main__": |
|
import random |
|
import time |
|
|
|
from .panel import Panel |
|
from .rule import Rule |
|
from .syntax import Syntax |
|
from .table import Table |
|
|
|
syntax = Syntax( |
|
'''def loop_last(values: Iterable[T]) -> Iterable[Tuple[bool, T]]: |
|
"""Iterate and generate a tuple with a flag for last value.""" |
|
iter_values = iter(values) |
|
try: |
|
previous_value = next(iter_values) |
|
except StopIteration: |
|
return |
|
for value in iter_values: |
|
yield False, previous_value |
|
previous_value = value |
|
yield True, previous_value''', |
|
"python", |
|
line_numbers=True, |
|
) |
|
|
|
table = Table("foo", "bar", "baz") |
|
table.add_row("1", "2", "3") |
|
|
|
progress_renderables = [ |
|
"Text may be printed while the progress bars are rendering.", |
|
Panel("In fact, [i]any[/i] renderable will work"), |
|
"Such as [magenta]tables[/]...", |
|
table, |
|
"Pretty printed structures...", |
|
{"type": "example", "text": "Pretty printed"}, |
|
"Syntax...", |
|
syntax, |
|
Rule("Give it a try!"), |
|
] |
|
|
|
from itertools import cycle |
|
|
|
examples = cycle(progress_renderables) |
|
|
|
console = Console(record=True) |
|
|
|
with Progress( |
|
SpinnerColumn(), |
|
*Progress.get_default_columns(), |
|
TimeElapsedColumn(), |
|
console=console, |
|
transient=False, |
|
) as progress: |
|
task1 = progress.add_task("[red]Downloading", total=1000) |
|
task2 = progress.add_task("[green]Processing", total=1000) |
|
task3 = progress.add_task("[yellow]Thinking", total=None) |
|
|
|
while not progress.finished: |
|
progress.update(task1, advance=0.5) |
|
progress.update(task2, advance=0.3) |
|
time.sleep(0.01) |
|
if random.randint(0, 100) < 1: |
|
progress.log(next(examples)) |
|
|