import concurrent.futures
import logging
import os
import queue
import tempfile
import threading
import time
from typing import TYPE_CHECKING, Optional, Tuple

import wandb
import wandb.util
from wandb.filesync import stats, step_checksum, step_upload
from wandb.sdk.lib.paths import LogicalPath

if TYPE_CHECKING:
    from wandb.sdk.artifacts.artifact_manifest import ArtifactManifest
    from wandb.sdk.artifacts.artifact_saver import SaveFn
    from wandb.sdk.internal import file_stream, internal_api
    from wandb.sdk.internal.settings_static import SettingsStatic


logger = logging.getLogger(__name__)


class FilePusher:
    """Parallel file upload class.

    This manages uploading multiple files in parallel. It will restart a given
    file's upload job if it receives a notification that that file has been
    modified.

    Call finish() to request shutdown (it only enqueues the request and
    returns immediately), then join() to block until all events have been
    processed and all uploads are complete.
    """

    # Passed to StepUpload as its job limit.
    MAX_UPLOAD_JOBS = 64

    def __init__(
        self,
        api: "internal_api.Api",
        file_stream: "file_stream.FileStreamApi",
        settings: Optional["SettingsStatic"] = None,
    ) -> None:
        """Create the checksum and upload steps and start them.

        Args:
            api: API client handed to both the checksum and the upload step.
            file_stream: file stream API handed to the upload step.
            settings: optional settings handed to the upload step.
        """
        self._api = api

        # Temporary directory for copies we make of some file types to
        # reduce the probability that the file gets changed while we're
        # uploading it.
        # NOTE: the first positional argument of TemporaryDirectory is the
        # *suffix*, so it is spelled out here; the resulting name is unchanged.
        self._tempdir = tempfile.TemporaryDirectory(suffix="wandb")

        self._stats = stats.Stats()

        # Requests from callers flow into the checksum step ...
        self._incoming_queue: queue.Queue[step_checksum.Event] = queue.Queue()
        # ... which feeds the upload step through this queue.
        self._event_queue: queue.Queue[step_upload.Event] = queue.Queue()

        self._step_checksum = step_checksum.StepChecksum(
            self._api,
            self._tempdir,
            self._incoming_queue,
            self._event_queue,
            self._stats,
        )
        self._step_checksum.start()

        self._step_upload = step_upload.StepUpload(
            self._api,
            self._stats,
            self._event_queue,
            self.MAX_UPLOAD_JOBS,
            file_stream=file_stream,
            settings=settings,
        )
        self._step_upload.start()

        self._stats_thread_stop = threading.Event()
        if os.environ.get("WANDB_DEBUG"):
            # debug thread to monitor and report file pusher stats
            self._stats_thread = threading.Thread(
                target=self._file_pusher_stats,
                daemon=True,
                name="FPStatsThread",
            )
            self._stats_thread.start()

    def _file_pusher_stats(self) -> None:
        """Log the raw stats roughly once per second until asked to stop."""
        while not self._stats_thread_stop.is_set():
            logger.info("FilePusher stats: %s", self._stats._stats)
            # Wait on the stop event rather than sleeping so the thread exits
            # as soon as finish() sets it.
            self._stats_thread_stop.wait(1)

    def get_status(self) -> Tuple[bool, stats.Summary]:
        """Return whether any step is still running and the stats summary."""
        running = self.is_alive()
        summary = self._stats.summary()
        return running, summary

    def print_status(self, prefix: bool = True) -> None:
        """Print a spinner/progress line until both steps have stopped.

        Args:
            prefix: forwarded to every wandb.termlog call.
        """
        step = 0
        spinner_states = ["-", "\\", "|", "/"]
        while True:
            # Check liveness *before* reading the summary so that the last
            # line printed reflects the state after everything finished.
            stop = not self.is_alive()
            summary = self._stats.summary()
            line = f" {summary.uploaded_bytes / 1048576.0:.2f}MB of {summary.total_bytes / 1048576.0:.2f}MB uploaded ({summary.deduped_bytes / 1048576.0:.2f}MB deduped)\r"
            line = spinner_states[step % len(spinner_states)] + line
            step += 1
            wandb.termlog(line, newline=False, prefix=prefix)
            if stop:
                break
            time.sleep(0.25)

        dedupe_fraction = (
            summary.deduped_bytes / float(summary.total_bytes)
            if summary.total_bytes > 0
            else 0
        )
        # Only mention deduplication when it saved more than 1%.
        if dedupe_fraction > 0.01:
            wandb.termlog(
                f"W&B sync reduced upload amount by {dedupe_fraction * 100:.1f}% ",
                prefix=prefix,
            )
        # clear progress line.
        wandb.termlog(" " * 79, prefix=prefix)

    def file_counts_by_category(self) -> stats.FileCountsByCategory:
        """Return the per-category file counts tracked by the stats object."""
        return self._stats.file_counts_by_category()

    def file_changed(
        self, save_name: LogicalPath, path: str, copy: bool = True
    ) -> None:
        """Tell the file pusher that a file's changed and should be uploaded.

        Paths that are missing, are not regular files, or are empty are
        silently ignored.

        Args:
            save_name: string logical location of the file relative to the run
                directory.
            path: actual string path of the file to upload on the filesystem.
            copy: forwarded to step_checksum.RequestUpload; presumably controls
                whether the file is first copied into the temporary directory
                (confirm against StepChecksum).
        """
        # Tests in linux were failing because wandb-events.jsonl didn't exist.
        # isfile() is False for nonexistent paths, so it covers both cases.
        if not os.path.isfile(path):
            return
        try:
            size = os.path.getsize(path)
        except FileNotFoundError:
            # The file vanished between the check above and the size lookup;
            # treat it like any other missing file.
            return
        if size == 0:
            return

        event = step_checksum.RequestUpload(path, save_name, copy)
        self._incoming_queue.put(event)

    def store_manifest_files(
        self,
        manifest: "ArtifactManifest",
        artifact_id: str,
        save_fn: "SaveFn",
    ) -> None:
        """Queue a RequestStoreManifestFiles event for the checksum step.

        Args:
            manifest: artifact manifest passed through to the event.
            artifact_id: id of the artifact passed through to the event.
            save_fn: save callable passed through to the event.
        """
        event = step_checksum.RequestStoreManifestFiles(manifest, artifact_id, save_fn)
        self._incoming_queue.put(event)

    def commit_artifact(
        self,
        artifact_id: str,
        *,
        finalize: bool = True,
        before_commit: step_upload.PreCommitFn,
        result_future: "concurrent.futures.Future[None]",
    ) -> None:
        """Queue a RequestCommitArtifact event for the checksum step.

        Args:
            artifact_id: id of the artifact to commit.
            finalize: passed through to the event unchanged.
            before_commit: pre-commit callable passed through to the event.
            result_future: future passed through to the event; presumably
                resolved by the upload step once the commit completes
                (confirm against StepUpload).
        """
        event = step_checksum.RequestCommitArtifact(
            artifact_id, finalize, before_commit, result_future
        )
        self._incoming_queue.put(event)

    def finish(
        self, callback: Optional[step_upload.OnRequestFinishFn] = None
    ) -> None:
        """Request shutdown; does not wait for it (use join() for that).

        Args:
            callback: optional callable attached to the RequestFinish event.
        """
        logger.info("shutting down file pusher")
        self._incoming_queue.put(step_checksum.RequestFinish(callback))
        self._stats_thread_stop.set()

    def join(self) -> None:
        """Block until both steps have stopped, then remove the temp dir."""
        # NOTE: must have called finish before join
        logger.info("waiting for file pusher")
        while self.is_alive():
            time.sleep(0.5)
        self._tempdir.cleanup()

    def is_alive(self) -> bool:
        """Return True while either the checksum or the upload step is alive."""
        return self._step_checksum.is_alive() or self._step_upload.is_alive()