|
"""Beta versions of wandb CLI commands. |
|
|
|
These commands are experimental and may change or be removed in future versions. |
|
""" |
|
|
|
from __future__ import annotations |
|
|
|
import pathlib |
|
import sys |
|
|
|
import click |
|
|
|
import wandb |
|
from wandb.errors import UsageError, WandbCoreNotAvailableError |
|
from wandb.sdk.wandb_sync import _sync |
|
from wandb.util import get_core_path |
|
|
|
|
|
@click.group() |
|
def beta(): |
|
"""Beta versions of wandb CLI commands. Requires wandb-core.""" |
|
|
|
import wandb.env |
|
|
|
wandb._sentry.configure_scope(process_context="wandb_beta") |
|
|
|
if wandb.env.is_require_legacy_service(): |
|
raise UsageError( |
|
"wandb beta commands can only be used with wandb-core. " |
|
f"Please make sure that `{wandb.env._REQUIRE_LEGACY_SERVICE}` is not set." |
|
) |
|
|
|
try: |
|
get_core_path() |
|
except WandbCoreNotAvailableError as e: |
|
wandb._sentry.exception(f"using `wandb beta`. failed with {e}") |
|
click.secho( |
|
(e), |
|
fg="red", |
|
err=True, |
|
) |
|
|
|
|
|
@beta.command( |
|
name="sync", |
|
context_settings={"default_map": {}}, |
|
help="Upload a training run to W&B", |
|
) |
|
@click.pass_context |
|
@click.argument("wandb_dir", nargs=1, type=click.Path(exists=True)) |
|
@click.option("--id", "run_id", help="The run you want to upload to.") |
|
@click.option("--project", "-p", help="The project you want to upload to.") |
|
@click.option("--entity", "-e", help="The entity to scope to.") |
|
@click.option("--skip-console", is_flag=True, default=False, help="Skip console logs") |
|
@click.option("--append", is_flag=True, default=False, help="Append run") |
|
@click.option( |
|
"--include", |
|
"-i", |
|
help="Glob to include. Can be used multiple times.", |
|
multiple=True, |
|
) |
|
@click.option( |
|
"--exclude", |
|
"-e", |
|
help="Glob to exclude. Can be used multiple times.", |
|
multiple=True, |
|
) |
|
@click.option( |
|
"--mark-synced/--no-mark-synced", |
|
is_flag=True, |
|
default=True, |
|
help="Mark runs as synced", |
|
) |
|
@click.option( |
|
"--skip-synced/--no-skip-synced", |
|
is_flag=True, |
|
default=True, |
|
help="Skip synced runs", |
|
) |
|
@click.option( |
|
"--dry-run", is_flag=True, help="Perform a dry run without uploading anything." |
|
) |
|
def sync_beta( |
|
ctx, |
|
wandb_dir=None, |
|
run_id: str | None = None, |
|
project: str | None = None, |
|
entity: str | None = None, |
|
skip_console: bool = False, |
|
append: bool = False, |
|
include: str | None = None, |
|
exclude: str | None = None, |
|
skip_synced: bool = True, |
|
mark_synced: bool = True, |
|
dry_run: bool = False, |
|
) -> None: |
|
import concurrent.futures |
|
from multiprocessing import cpu_count |
|
|
|
paths = set() |
|
|
|
|
|
|
|
if include: |
|
for pattern in include: |
|
matching_dirs = list(pathlib.Path(wandb_dir).glob(pattern)) |
|
for d in matching_dirs: |
|
if not d.is_dir(): |
|
continue |
|
wandb_files = [p for p in d.glob("*.wandb") if p.is_file()] |
|
if len(wandb_files) > 1: |
|
wandb.termwarn( |
|
f"Multiple wandb files found in directory {d}, skipping" |
|
) |
|
elif len(wandb_files) == 1: |
|
paths.add(d) |
|
else: |
|
paths.update({p.parent for p in pathlib.Path(wandb_dir).glob("**/*.wandb")}) |
|
|
|
for pattern in exclude: |
|
matching_dirs = list(pathlib.Path(wandb_dir).glob(pattern)) |
|
for d in matching_dirs: |
|
if not d.is_dir(): |
|
continue |
|
if d in paths: |
|
paths.remove(d) |
|
|
|
|
|
if skip_synced: |
|
synced_paths = set() |
|
for path in paths: |
|
wandb_synced_files = [p for p in path.glob("*.wandb.synced") if p.is_file()] |
|
if len(wandb_synced_files) > 1: |
|
wandb.termwarn( |
|
f"Multiple wandb.synced files found in directory {path}, skipping" |
|
) |
|
elif len(wandb_synced_files) == 1: |
|
synced_paths.add(path) |
|
paths -= synced_paths |
|
|
|
if run_id and len(paths) > 1: |
|
|
|
click.echo("id can only be set for a single run.", err=True) |
|
sys.exit(1) |
|
|
|
if not paths: |
|
click.echo("No runs to sync.") |
|
return |
|
|
|
click.echo("Found runs:") |
|
for path in paths: |
|
click.echo(f" {path}") |
|
|
|
if dry_run: |
|
return |
|
|
|
wandb.setup() |
|
|
|
|
|
with concurrent.futures.ProcessPoolExecutor( |
|
max_workers=min(len(paths), cpu_count()) |
|
) as executor: |
|
futures = [] |
|
for path in paths: |
|
|
|
wandb_file = [p for p in path.glob("*.wandb") if p.is_file()][0] |
|
future = executor.submit( |
|
_sync, |
|
wandb_file, |
|
run_id=run_id, |
|
project=project, |
|
entity=entity, |
|
skip_console=skip_console, |
|
append=append, |
|
mark_synced=mark_synced, |
|
) |
|
futures.append(future) |
|
|
|
|
|
for _ in concurrent.futures.as_completed(futures): |
|
pass |
|
|