File size: 6,023 Bytes
9c6594c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import concurrent.futures
import logging
import os
import queue
import tempfile
import threading
import time
from typing import TYPE_CHECKING, Optional, Tuple

import wandb
import wandb.util
from wandb.filesync import stats, step_checksum, step_upload
from wandb.sdk.lib.paths import LogicalPath

if TYPE_CHECKING:
    from wandb.sdk.artifacts.artifact_manifest import ArtifactManifest
    from wandb.sdk.artifacts.artifact_saver import SaveFn
    from wandb.sdk.internal import file_stream, internal_api
    from wandb.sdk.internal.settings_static import SettingsStatic


logger = logging.getLogger(__name__)


class FilePusher:
    """Parallel file upload class.

    Requests (file changes, manifest storage, artifact commits, shutdown) are
    put on an incoming queue that is handed to a ``StepChecksum`` stage. That
    stage shares a second queue with a ``StepUpload`` stage, which performs the
    uploads with a job limit of ``MAX_UPLOAD_JOBS``. Notifying the pusher again
    about a modified file enqueues a fresh upload request for it (the upload
    stage is expected to restart that file's job).

    Shutdown is two-phase: ``finish()`` only enqueues a finish request and
    returns immediately; ``join()`` then blocks until both stages have stopped
    and removes the temporary directory.
    """

    # Passed to StepUpload as its job limit (presumably the maximum number of
    # concurrent upload jobs -- confirm in step_upload).
    MAX_UPLOAD_JOBS = 64

    def __init__(
        self,
        api: "internal_api.Api",
        file_stream: "file_stream.FileStreamApi",
        settings: Optional["SettingsStatic"] = None,
    ) -> None:
        """Create the pipeline queues and start both processing stages.

        Args:
            api: internal API client shared by the checksum and upload stages.
            file_stream: file-stream API forwarded to the upload stage.
            settings: optional static settings forwarded to the upload stage.

        If the ``WANDB_DEBUG`` environment variable is set, a daemon thread is
        also started that logs the raw stats roughly once per second.
        """
        self._api = api

        # Temporary directory for copies we make of some file types to
        # reduce the probability that the file gets changed while we're
        # uploading it. Removed in join().
        self._tempdir = tempfile.TemporaryDirectory("wandb")

        self._stats = stats.Stats()

        # _incoming_queue: requests from the public methods below, handed to
        # StepChecksum. _event_queue: shared by StepChecksum and StepUpload.
        self._incoming_queue: queue.Queue[step_checksum.Event] = queue.Queue()
        self._event_queue: queue.Queue[step_upload.Event] = queue.Queue()

        self._step_checksum = step_checksum.StepChecksum(
            self._api,
            self._tempdir,
            self._incoming_queue,
            self._event_queue,
            self._stats,
        )
        self._step_checksum.start()

        self._step_upload = step_upload.StepUpload(
            self._api,
            self._stats,
            self._event_queue,
            self.MAX_UPLOAD_JOBS,
            file_stream=file_stream,
            settings=settings,
        )
        self._step_upload.start()

        # Always created so finish() can set it unconditionally, even when the
        # debug stats thread is not running.
        self._stats_thread_stop = threading.Event()
        if os.environ.get("WANDB_DEBUG"):
            # debug thread to monitor and report file pusher stats
            self._stats_thread = threading.Thread(
                target=self._file_pusher_stats,
                daemon=True,
                name="FPStatsThread",
            )
            self._stats_thread.start()

    def _file_pusher_stats(self) -> None:
        """Log the raw stats about once per second until finish() is called."""
        while not self._stats_thread_stop.is_set():
            logger.info("FilePusher stats: %s", self._stats._stats)
            # Event.wait instead of time.sleep: finish() sets the event, which
            # wakes this loop immediately instead of after up to one second.
            self._stats_thread_stop.wait(1)

    def get_status(self) -> Tuple[bool, stats.Summary]:
        """Return ``(running, summary)``.

        ``running`` is True while either pipeline stage is alive, and
        ``summary`` is the current upload stats summary.
        """
        running = self.is_alive()
        summary = self._stats.summary()
        return running, summary

    def print_status(self, prefix: bool = True) -> None:
        """Render a spinner progress line until both stages have stopped.

        The line is refreshed every 0.25s with uploaded/total/deduped megabytes.
        After the stages stop, a dedupe summary is printed if more than 1% of
        bytes were deduped, and the progress line is then cleared.

        Args:
            prefix: forwarded to ``wandb.termlog`` on every call.
        """
        step = 0
        spinner_states = ["-", "\\", "|", "/"]
        stop = False
        while True:
            # Liveness is checked before reading the summary so the final
            # iteration reports stats taken after the stages have stopped.
            if not self.is_alive():
                stop = True
            summary = self._stats.summary()
            line = f" {summary.uploaded_bytes / 1048576.0:.2f}MB of {summary.total_bytes / 1048576.0:.2f}MB uploaded ({summary.deduped_bytes / 1048576.0:.2f}MB deduped)\r"
            line = spinner_states[step % 4] + line
            step += 1
            wandb.termlog(line, newline=False, prefix=prefix)
            if stop:
                break
            time.sleep(0.25)
        # Guard against division by zero when nothing was uploaded.
        dedupe_fraction = (
            summary.deduped_bytes / float(summary.total_bytes)
            if summary.total_bytes > 0
            else 0
        )
        if dedupe_fraction > 0.01:
            wandb.termlog(
                "W&B sync reduced upload amount by %.1f%%             "
                % (dedupe_fraction * 100),
                prefix=prefix,
            )
        # clear progress line.
        wandb.termlog(" " * 79, prefix=prefix)

    def file_counts_by_category(self) -> stats.FileCountsByCategory:
        """Return per-category file counts from the shared stats object."""
        return self._stats.file_counts_by_category()

    def file_changed(
        self, save_name: LogicalPath, path: str, copy: bool = True
    ) -> None:
        """Tell the file pusher that a file's changed and should be uploaded.

        Paths that are missing, are not regular files, or are empty are
        silently ignored.

        Args:
            save_name: string logical location of the file relative to the run
                directory.
            path: actual string path of the file to upload on the filesystem.
            copy: forwarded unchanged to the ``RequestUpload`` event
                (presumably asks the checksum stage to upload from a private
                copy in the temp directory -- confirm in step_checksum).
        """
        # Tests in linux were failing because wandb-events.jsonl didn't exist.
        # isfile() returns False for missing paths, so no separate exists().
        if not os.path.isfile(path):
            return
        try:
            size = os.path.getsize(path)
        except OSError:
            # The file can be removed or become unreadable between the
            # isfile() check and the stat; treat that like a missing file
            # rather than raising into the caller.
            return
        if size == 0:
            return

        event = step_checksum.RequestUpload(path, save_name, copy)
        self._incoming_queue.put(event)

    def store_manifest_files(
        self,
        manifest: "ArtifactManifest",
        artifact_id: str,
        save_fn: "SaveFn",
    ) -> None:
        """Enqueue a request to store the files listed in an artifact manifest."""
        event = step_checksum.RequestStoreManifestFiles(manifest, artifact_id, save_fn)
        self._incoming_queue.put(event)

    def commit_artifact(
        self,
        artifact_id: str,
        *,
        finalize: bool = True,
        before_commit: step_upload.PreCommitFn,
        result_future: "concurrent.futures.Future[None]",
    ) -> None:
        """Enqueue a request to commit an artifact.

        The request goes on the same queue as uploads, so it is handled after
        earlier requests reach the checksum stage. ``result_future`` is handed
        to the request unchanged (presumably resolved by the pipeline when the
        commit completes -- confirm in step_upload).
        """
        event = step_checksum.RequestCommitArtifact(
            artifact_id, finalize, before_commit, result_future
        )
        self._incoming_queue.put(event)

    def finish(
        self, callback: Optional[step_upload.OnRequestFinishFn] = None
    ) -> None:
        """Request shutdown without blocking; call join() afterwards to wait.

        Args:
            callback: forwarded to the ``RequestFinish`` event.
        """
        logger.info("shutting down file pusher")
        self._incoming_queue.put(step_checksum.RequestFinish(callback))
        # Stops the optional debug stats thread (no-op if it never started).
        self._stats_thread_stop.set()

    def join(self) -> None:
        """Block until both stages have stopped, then remove the temp dir."""
        # NOTE: must have called finish before join, otherwise the stages
        # never stop and this loops forever.
        logger.info("waiting for file pusher")
        while self.is_alive():
            time.sleep(0.5)
        self._tempdir.cleanup()

    def is_alive(self) -> bool:
        """Return True while either the checksum or the upload stage is alive."""
        return self._step_checksum.is_alive() or self._step_upload.is_alive()