File size: 5,438 Bytes
9c6594c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
"""Beta versions of wandb CLI commands.

These commands are experimental and may change or be removed in future versions.
"""

from __future__ import annotations

import pathlib
import sys

import click

import wandb
from wandb.errors import UsageError, WandbCoreNotAvailableError
from wandb.sdk.wandb_sync import _sync
from wandb.util import get_core_path


@click.group()
def beta():
    """Beta versions of wandb CLI commands. Requires wandb-core."""
    # NOTE: the docstring above is kept short on purpose -- click uses a
    # group's docstring as its help text when no explicit ``help=`` is given,
    # so developer-facing notes live in comments instead.
    #
    # This callback runs before any `wandb beta <subcommand>` and:
    #   1. tags the Sentry scope so reports are attributed to `wandb beta`,
    #   2. raises UsageError if the legacy service has been requested,
    #   3. checks that the wandb-core binary can be located and, if not,
    #      reports the problem without aborting.

    # this is the future that requires wandb-core!
    import wandb.env

    wandb._sentry.configure_scope(process_context="wandb_beta")

    if wandb.env.is_require_legacy_service():
        raise UsageError(
            "wandb beta commands can only be used with wandb-core. "
            f"Please make sure that `{wandb.env._REQUIRE_LEGACY_SERVICE}` is not set."
        )

    try:
        get_core_path()
    except WandbCoreNotAvailableError as e:
        wandb._sentry.exception(f"using `wandb beta`. failed with {e}")
        # Pass the message text, not the exception object: older click
        # releases join the styled pieces with "".join(), which raises
        # TypeError for a non-str message.
        click.secho(
            str(e),
            fg="red",
            err=True,
        )


def _find_single_wandb_file(run_dir: pathlib.Path) -> pathlib.Path | None:
    """Return the only ``*.wandb`` file directly inside ``run_dir``.

    Returns None when the directory contains no ``*.wandb`` file. When it
    contains more than one, a warning is emitted and None is returned, since
    it is ambiguous which file represents the run.
    """
    wandb_files = [p for p in run_dir.glob("*.wandb") if p.is_file()]
    if len(wandb_files) > 1:
        wandb.termwarn(f"Multiple wandb files found in directory {run_dir}, skipping")
        return None
    return wandb_files[0] if wandb_files else None


@beta.command(
    name="sync",
    context_settings={"default_map": {}},
    help="Upload a training run to W&B",
)
@click.pass_context
@click.argument("wandb_dir", nargs=1, type=click.Path(exists=True))
@click.option("--id", "run_id", help="The run you want to upload to.")
@click.option("--project", "-p", help="The project you want to upload to.")
# No "-e" short flag here: it is also declared by --exclude below, which is
# registered later and therefore already owned "-e" at parse time.
@click.option("--entity", help="The entity to scope to.")
@click.option("--skip-console", is_flag=True, default=False, help="Skip console logs")
@click.option("--append", is_flag=True, default=False, help="Append run")
@click.option(
    "--include",
    "-i",
    help="Glob to include. Can be used multiple times.",
    multiple=True,
)
@click.option(
    "--exclude",
    "-e",
    help="Glob to exclude. Can be used multiple times.",
    multiple=True,
)
@click.option(
    "--mark-synced/--no-mark-synced",
    is_flag=True,
    default=True,
    help="Mark runs as synced",
)
@click.option(
    "--skip-synced/--no-skip-synced",
    is_flag=True,
    default=True,
    help="Skip synced runs",
)
@click.option(
    "--dry-run", is_flag=True, help="Perform a dry run without uploading anything."
)
def sync_beta(  # noqa: C901
    ctx,
    wandb_dir=None,
    run_id: str | None = None,
    project: str | None = None,
    entity: str | None = None,
    skip_console: bool = False,
    append: bool = False,
    include: tuple[str, ...] | None = None,
    exclude: tuple[str, ...] | None = None,
    skip_synced: bool = True,
    mark_synced: bool = True,
    dry_run: bool = False,
) -> None:
    """Discover run directories under ``wandb_dir`` and upload each one.

    A run directory is a directory holding exactly one ``*.wandb`` file.
    Candidates come from the ``include`` globs (evaluated relative to
    ``wandb_dir``) or, when none are given, from every directory below
    ``wandb_dir`` that contains a ``*.wandb`` entry. Directories matching an
    ``exclude`` glob are dropped, as are directories containing exactly one
    ``*.wandb.synced`` file when ``skip_synced`` is set.

    Args:
        ctx: The click context (injected by ``click.pass_context``; unused).
        wandb_dir: Base directory to search for runs.
        run_id: Forwarded to ``_sync``; only allowed when a single run is found.
        project: Forwarded to ``_sync``.
        entity: Forwarded to ``_sync``.
        skip_console: Forwarded to ``_sync``.
        append: Forwarded to ``_sync``.
        include: Glob patterns selecting candidate run directories.
        exclude: Glob patterns for run directories to drop.
        skip_synced: Drop directories that contain one ``*.wandb.synced`` file.
        mark_synced: Forwarded to ``_sync``.
        dry_run: List the discovered runs and return without uploading.

    Exits with status 1 when ``run_id`` is given but several runs were found,
    or when syncing any run raised an exception.
    """
    import concurrent.futures
    from multiprocessing import cpu_count

    base_dir = pathlib.Path(wandb_dir)

    # TODO: test file discovery logic
    # include and exclude globs are evaluated relative to the provided base_path
    if include:
        candidates = {
            d for pattern in include for d in base_dir.glob(pattern) if d.is_dir()
        }
    else:
        candidates = {p.parent for p in base_dir.glob("**/*.wandb")}

    # Map each run directory to its single .wandb file. Both discovery modes
    # go through the same check so that the upload step below can rely on
    # exactly one file per directory.
    runs: dict[pathlib.Path, pathlib.Path] = {}
    for run_dir in candidates:
        wandb_file = _find_single_wandb_file(run_dir)
        if wandb_file is not None:
            runs[run_dir] = wandb_file

    # `exclude` is a tuple when invoked through click, but may be None when
    # the function is called with its defaults.
    for pattern in exclude or ():
        for excluded in base_dir.glob(pattern):
            runs.pop(excluded, None)

    # remove paths that are already synced, if requested
    if skip_synced:
        for path in list(runs):
            synced_markers = [
                p for p in path.glob("*.wandb.synced") if p.is_file()
            ]
            if len(synced_markers) > 1:
                # NOTE(review): despite the message, the directory is kept
                # and will still be synced -- confirm the intended behavior.
                wandb.termwarn(
                    f"Multiple wandb.synced files found in directory {path}, skipping"
                )
            elif len(synced_markers) == 1:
                del runs[path]

    if run_id and len(runs) > 1:
        # TODO: handle this more gracefully
        click.echo("id can only be set for a single run.", err=True)
        sys.exit(1)

    if not runs:
        click.echo("No runs to sync.")
        return

    click.echo("Found runs:")
    # Sorted so the listing is stable across invocations.
    for path in sorted(runs):
        click.echo(f"  {path}")

    if dry_run:
        return

    wandb.setup()

    any_failed = False
    # TODO: make it thread-safe in the Rust code
    with concurrent.futures.ProcessPoolExecutor(
        max_workers=min(len(runs), cpu_count())
    ) as executor:
        futures = {
            executor.submit(
                _sync,
                wandb_file,
                run_id=run_id,
                project=project,
                entity=entity,
                skip_console=skip_console,
                append=append,
                mark_synced=mark_synced,
            ): path
            for path, wandb_file in runs.items()
        }

        # Wait for tasks to complete. Calling result() re-raises any exception
        # from the worker, so failures are reported instead of being dropped.
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as e:
                any_failed = True
                click.echo(f"Failed to sync {futures[future]}: {e}", err=True)

    if any_failed:
        sys.exit(1)