File size: 5,438 Bytes
9c6594c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
"""Beta versions of wandb CLI commands.
These commands are experimental and may change or be removed in future versions.
"""
from __future__ import annotations
import pathlib
import sys
import click
import wandb
from wandb.errors import UsageError, WandbCoreNotAvailableError
from wandb.sdk.wandb_sync import _sync
from wandb.util import get_core_path
@click.group()
def beta():
    """Beta versions of wandb CLI commands. Requires wandb-core."""
    # this is the future that requires wandb-core!
    import wandb.env

    wandb._sentry.configure_scope(process_context="wandb_beta")

    # Refuse to run when the legacy service has been explicitly requested.
    legacy_requested = wandb.env.is_require_legacy_service()
    if legacy_requested:
        message = (
            "wandb beta commands can only be used with wandb-core. "
            f"Please make sure that `{wandb.env._REQUIRE_LEGACY_SERVICE}` is not set."
        )
        raise UsageError(message)

    try:
        get_core_path()
    except WandbCoreNotAvailableError as exc:
        # Record the failure in sentry and print it in red on stderr.
        # The exception is not re-raised, so the group callback returns
        # normally afterwards.
        wandb._sentry.exception(f"using `wandb beta`. failed with {exc}")
        click.secho(exc, fg="red", err=True)
def _single_wandb_file(directory: pathlib.Path) -> pathlib.Path | None:
    """Return the only ``*.wandb`` file directly inside ``directory``.

    Returns None when there is no such file. When there is more than one,
    a warning is emitted and None is returned, so the directory is skipped.
    """
    wandb_files = [p for p in directory.glob("*.wandb") if p.is_file()]
    if len(wandb_files) > 1:
        wandb.termwarn(
            f"Multiple wandb files found in directory {directory}, skipping"
        )
        return None
    return wandb_files[0] if wandb_files else None


def _is_already_synced(directory: pathlib.Path) -> bool:
    """Return True when ``directory`` holds exactly one ``*.wandb.synced`` file.

    A directory with several such files triggers a warning and is reported
    as not synced, which matches the previous behaviour of this command.
    """
    synced_files = [p for p in directory.glob("*.wandb.synced") if p.is_file()]
    if len(synced_files) > 1:
        wandb.termwarn(
            f"Multiple wandb.synced files found in directory {directory}, skipping"
        )
        return False
    return len(synced_files) == 1


def _discover_runs(
    base_dir: pathlib.Path,
    include: tuple[str, ...] | None,
    exclude: tuple[str, ...] | None,
) -> dict[pathlib.Path, pathlib.Path]:
    """Map each run directory under ``base_dir`` to its single ``.wandb`` file.

    Include and exclude globs are evaluated relative to ``base_dir``. Without
    include globs, every directory that contains a ``*.wandb`` entry (at any
    depth) is a candidate. Candidates that do not hold exactly one ``*.wandb``
    file are dropped, in both modes.
    """
    if include:
        candidates = {
            d for pattern in include for d in base_dir.glob(pattern) if d.is_dir()
        }
    else:
        candidates = {p.parent for p in base_dir.glob("**/*.wandb")}

    runs = {}
    # Sorted so that any warnings are emitted in a stable order.
    for directory in sorted(candidates):
        wandb_file = _single_wandb_file(directory)
        if wandb_file is not None:
            runs[directory] = wandb_file

    for pattern in exclude or ():
        for excluded in base_dir.glob(pattern):
            runs.pop(excluded, None)
    return runs


@beta.command(
    name="sync",
    context_settings={"default_map": {}},
    help="Upload a training run to W&B",
)
@click.pass_context
@click.argument("wandb_dir", nargs=1, type=click.Path(exists=True))
@click.option("--id", "run_id", help="The run you want to upload to.")
@click.option("--project", "-p", help="The project you want to upload to.")
# NOTE: `--entity` has no `-e` alias on purpose. `-e` used to be declared on
# both `--entity` and `--exclude`; click resolved the duplicate to the option
# declared last (`--exclude`), so `-e` never actually set the entity.
@click.option("--entity", help="The entity to scope to.")
@click.option("--skip-console", is_flag=True, default=False, help="Skip console logs")
@click.option("--append", is_flag=True, default=False, help="Append run")
@click.option(
    "--include",
    "-i",
    help="Glob to include. Can be used multiple times.",
    multiple=True,
)
@click.option(
    "--exclude",
    "-e",
    help="Glob to exclude. Can be used multiple times.",
    multiple=True,
)
@click.option(
    "--mark-synced/--no-mark-synced",
    is_flag=True,
    default=True,
    help="Mark runs as synced",
)
@click.option(
    "--skip-synced/--no-skip-synced",
    is_flag=True,
    default=True,
    help="Skip synced runs",
)
@click.option(
    "--dry-run", is_flag=True, help="Perform a dry run without uploading anything."
)
def sync_beta(  # noqa: C901
    ctx,
    wandb_dir=None,
    run_id: str | None = None,
    project: str | None = None,
    entity: str | None = None,
    skip_console: bool = False,
    append: bool = False,
    include: tuple[str, ...] | None = None,
    exclude: tuple[str, ...] | None = None,
    skip_synced: bool = True,
    mark_synced: bool = True,
    dry_run: bool = False,
) -> None:
    """Find run directories under ``wandb_dir`` and upload them in parallel.

    Args:
        ctx: Click context injected by ``@click.pass_context`` (unused).
        wandb_dir: Base directory to search; globs are relative to it.
        run_id: Run id forwarded to ``_sync``. Only allowed when a single
            run is selected.
        project: Project forwarded to ``_sync``.
        entity: Entity forwarded to ``_sync``.
        skip_console: Forwarded to ``_sync``.
        append: Forwarded to ``_sync``.
        include: Globs selecting candidate run directories. When empty,
            every directory containing a ``*.wandb`` file is a candidate.
        exclude: Globs whose matches are removed from the selection.
        skip_synced: Drop directories that contain a ``*.wandb.synced`` file.
        mark_synced: Forwarded to ``_sync``.
        dry_run: List the selected runs and return without uploading.

    Exits with status 1 when ``run_id`` is given for more than one run, or
    when at least one upload task raised an exception.
    """
    import concurrent.futures
    from multiprocessing import cpu_count

    # TODO: test file discovery logic
    runs = _discover_runs(pathlib.Path(wandb_dir), include, exclude)

    # remove paths that are already synced, if requested
    if skip_synced:
        runs = {
            directory: wandb_file
            for directory, wandb_file in runs.items()
            if not _is_already_synced(directory)
        }

    if run_id and len(runs) > 1:
        # TODO: handle this more gracefully
        click.echo("id can only be set for a single run.", err=True)
        sys.exit(1)

    if not runs:
        click.echo("No runs to sync.")
        return

    click.echo("Found runs:")
    for directory in sorted(runs):
        click.echo(f" {directory}")

    if dry_run:
        return

    wandb.setup()

    failed = False
    # TODO: make it thread-safe in the Rust code
    with concurrent.futures.ProcessPoolExecutor(
        max_workers=min(len(runs), cpu_count())
    ) as executor:
        # Keep the file next to its future so a failure can name its run.
        futures = {
            executor.submit(
                _sync,
                wandb_file,
                run_id=run_id,
                project=project,
                entity=entity,
                skip_console=skip_console,
                append=append,
                mark_synced=mark_synced,
            ): wandb_file
            for wandb_file in runs.values()
        }
        # Wait for every task and surface failures instead of dropping them.
        for future in concurrent.futures.as_completed(futures):
            error = future.exception()
            if error is not None:
                failed = True
                click.echo(f"Failed to sync {futures[future]}: {error}", err=True)

    if failed:
        sys.exit(1)
|