# Hugging Face upload metadata (commented out so the module parses):
# jamtur01's picture
# Upload folder using huggingface_hub
# 9c6594c verified
"""job builder."""
import json
import logging
import os
import re
import sys
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
Literal,
Optional,
Tuple,
TypedDict,
Union,
)
import wandb
from wandb.sdk.artifacts._internal_artifact import InternalArtifact
from wandb.sdk.artifacts.artifact import Artifact
from wandb.sdk.data_types._dtypes import TypeRegistry
from wandb.sdk.internal.internal_api import Api
from wandb.sdk.lib.filenames import DIFF_FNAME, METADATA_FNAME, REQUIREMENTS_FNAME
from wandb.util import make_artifact_name_safe
from .settings_static import SettingsStatic
_logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from wandb.proto.wandb_internal_pb2 import ArtifactRecord
# Filename for the pip-frozen requirements file stored inside the job artifact.
FROZEN_REQUIREMENTS_FNAME = "requirements.frozen.txt"
# Filename of the job spec JSON written into the artifact.
JOB_FNAME = "wandb-job.json"
# Artifact type string identifying job artifacts.
JOB_ARTIFACT_TYPE = "job"
# Severity levels accepted by JobBuilder._log_if_verbose.
LOG_LEVEL = Literal["log", "warn", "error"]
class Version:
    """An ordered, hashable major.minor.patch version triple.

    Supports ``<`` and ``==`` (and their reflections) so versions can be
    compared and used with ``min()``; hashable so instances can serve as
    set members or dict keys.
    """

    def __init__(self, major: int, minor: int, patch: int):
        self._major = major
        self._minor = minor
        self._patch = patch

    def _as_tuple(self) -> Tuple[int, int, int]:
        # Single source of truth for comparison and hashing.
        return (self._major, self._minor, self._patch)

    def __repr__(self) -> str:
        return f"{self._major}.{self._minor}.{self._patch}"

    def __lt__(self, other: "Version") -> bool:
        # Lexicographic tuple comparison is equivalent to the nested
        # major/minor/patch if-chain, with fewer places to get wrong.
        return self._as_tuple() < other._as_tuple()

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Version):
            return NotImplemented
        return self._as_tuple() == other._as_tuple()

    def __hash__(self) -> int:
        # Fix: defining __eq__ alone implicitly sets __hash__ = None,
        # making instances unhashable. Hash consistently with __eq__.
        return hash(self._as_tuple())
# Minimum supported wandb version for keys in the source dict of wandb-job.json.
# Keys absent from this mapping impose no version requirement.
SOURCE_KEYS_MIN_SUPPORTED_VERSION = {
    "dockerfile": Version(0, 17, 0),
    "build_context": Version(0, 17, 0),
}
class GitInfo(TypedDict):
    """Git remote URL and commit SHA identifying the source revision."""

    remote: str
    commit: str
class GitSourceDict(TypedDict):
    """Source section of wandb-job.json for a git-repo-sourced job."""

    git: GitInfo
    entrypoint: List[str]
    notebook: bool
    # Optional image-build settings; popped from the dict when None.
    build_context: Optional[str]
    dockerfile: Optional[str]
class ArtifactSourceDict(TypedDict):
    """Source section of wandb-job.json for a code-artifact-sourced job."""

    # "wandb-artifact://_id/<artifact id>" reference to the logged code.
    artifact: str
    entrypoint: List[str]
    notebook: bool
    # Optional image-build settings; popped from the dict when None.
    build_context: Optional[str]
    dockerfile: Optional[str]
class ImageSourceDict(TypedDict):
    """Source section of wandb-job.json for a docker-image-sourced job."""

    image: str
class JobSourceDict(TypedDict, total=False):
    """Top-level schema of the wandb-job.json file written into a job artifact."""

    _version: str
    source_type: str
    source: Union[GitSourceDict, ArtifactSourceDict, ImageSourceDict]
    input_types: Dict[str, Any]
    output_types: Dict[str, Any]
    # Python version string from run metadata, e.g. "3.10".
    runtime: Optional[str]
class ArtifactInfoForJob(TypedDict):
    """Id and name of a logged code artifact used to build an artifact job."""

    id: str
    name: str
def get_min_supported_for_source_dict(
    source: Union[GitSourceDict, ArtifactSourceDict, ImageSourceDict],
) -> Optional[Version]:
    """Get the minimum supported wandb version for the source dict of wandb-job.json.

    Returns None when none of the keys in ``source`` carry a version
    requirement.
    """
    tracked = [
        SOURCE_KEYS_MIN_SUPPORTED_VERSION[key]
        for key in source
        if key in SOURCE_KEYS_MIN_SUPPORTED_VERSION
    ]
    return min(tracked) if tracked else None
class JobBuilder:
    """Assembles a W&B Launch "job" artifact from the current run.

    State is accumulated during the run (config, summary, logged code
    artifact, server responses) and ``build()`` produces an artifact of type
    "job" containing a ``wandb-job.json`` spec plus frozen requirements.
    """

    _settings: SettingsStatic
    _metadatafile_path: Optional[str]
    _requirements_path: Optional[str]
    _config: Optional[Dict[str, Any]]
    _summary: Optional[Dict[str, Any]]
    _logged_code_artifact: Optional[ArtifactInfoForJob]
    _disable: bool
    _partial_source_id: Optional[str]  # Partial job source artifact id.
    _aliases: List[str]
    _job_seq_id: Optional[str]
    _job_version_alias: Optional[str]
    _is_notebook_run: bool
    _verbose: bool

    def __init__(self, settings: SettingsStatic, verbose: bool = False):
        self._settings = settings
        self._metadatafile_path = None
        self._requirements_path = None
        self._config = None
        self._summary = None
        self._logged_code_artifact = None
        self._job_seq_id = None
        self._job_version_alias = None
        # Job creation is off when explicitly disabled or when machine-info
        # collection is disabled.
        self._disable = settings.disable_job_creation or settings.x_disable_machine_info
        self._partial_source_id = None
        self._aliases = []
        self._source_type: Optional[Literal["repo", "artifact", "image"]] = (
            settings.job_source  # type: ignore[assignment]
        )
        self._is_notebook_run = self._get_is_notebook_run()
        self._verbose = verbose
        # True when building from a partial job created via the CLI.
        self._partial = False

    def set_config(self, config: Dict[str, Any]) -> None:
        """Store the run config; used later to derive input types."""
        self._config = config

    def set_summary(self, summary: Dict[str, Any]) -> None:
        """Store the run summary; used later to derive output types."""
        self._summary = summary

    @property
    def disable(self) -> bool:
        """Whether job artifact creation is disabled."""
        return self._disable

    @disable.setter
    def disable(self, val: bool) -> None:
        self._disable = val

    @property
    def input_types(self) -> Dict[str, Any]:
        """JSON type schema inferred from the stored run config."""
        return TypeRegistry.type_of(self._config).to_json()

    @property
    def output_types(self) -> Dict[str, Any]:
        """JSON type schema inferred from the stored run summary."""
        return TypeRegistry.type_of(self._summary).to_json()

    def set_partial_source_id(self, source_id: str) -> None:
        """Record the source artifact id of a partial job for this run."""
        self._partial_source_id = source_id

    def _handle_server_artifact(
        self, res: Optional[Dict], artifact: "ArtifactRecord"
    ) -> None:
        """Record state from the server response after an artifact save.

        For "job" artifacts: derive the version alias ("v0", "v1", ...) and
        remember the artifact sequence id. For "code" artifacts: remember
        the id/name so an artifact-sourced job can reference the logged code.
        """
        if artifact.type == "job" and res is not None:
            try:
                if res["artifactSequence"]["latestArtifact"] is None:
                    # First version in the sequence.
                    self._job_version_alias = "v0"
                elif res["artifactSequence"]["latestArtifact"]["id"] == res["id"]:
                    # This save is the latest artifact: reuse its index.
                    self._job_version_alias = (
                        f"v{res['artifactSequence']['latestArtifact']['versionIndex']}"
                    )
                else:
                    # A newer version exists: use the next index.
                    self._job_version_alias = f"v{res['artifactSequence']['latestArtifact']['versionIndex'] + 1}"
                self._job_seq_id = res["artifactSequence"]["id"]
            except KeyError as e:
                _logger.info(f"Malformed response from ArtifactSaver.save {e}")
        if artifact.type == "code" and res is not None:
            self._logged_code_artifact = ArtifactInfoForJob(
                {
                    "id": res["id"],
                    "name": artifact.name,
                }
            )

    def _build_repo_job_source(
        self,
        program_relpath: str,
        metadata: Dict[str, Any],
    ) -> Tuple[Optional[GitSourceDict], Optional[str]]:
        """Build a git-repo job source dict and name from run metadata.

        Returns (None, None) when a notebook run's target path cannot be
        resolved on disk.
        """
        git_info: Dict[str, str] = metadata.get("git", {})
        remote = git_info.get("remote")
        commit = git_info.get("commit")
        root = metadata.get("root")
        assert remote is not None
        assert commit is not None
        if self._is_notebook_run:
            if not os.path.exists(
                os.path.join(os.getcwd(), os.path.basename(program_relpath))
            ):
                return None, None
            if root is None or self._settings.x_jupyter_root is None:
                _logger.info("target path does not exist, exiting")
                return None, None
            assert self._settings.x_jupyter_root is not None
            # git notebooks set the root to the git root,
            # jupyter_root contains the path where the jupyter notebook was started
            # program_relpath contains the path from jupyter_root to the file
            # full program path here is actually the relpath from the program to the git root
            full_program_path = os.path.join(
                os.path.relpath(str(self._settings.x_jupyter_root), root),
                program_relpath,
            )
            full_program_path = os.path.normpath(full_program_path)
            # if the notebook server is started above the git repo need to clear all the ..s
            if full_program_path.startswith(".."):
                split_path = full_program_path.split("/")
                count_dots = 0
                for p in split_path:
                    if p == "..":
                        count_dots += 1
                full_program_path = "/".join(split_path[2 * count_dots :])
        else:
            full_program_path = program_relpath
        entrypoint = self._get_entrypoint(full_program_path, metadata)
        # TODO: update executable to a method that supports pex
        source: GitSourceDict = {
            "git": {"remote": remote, "commit": commit},
            "entrypoint": entrypoint,
            "notebook": self._is_notebook_run,
            "build_context": metadata.get("build_context"),
            "dockerfile": metadata.get("dockerfile"),
        }
        name = self._make_job_name(f"{remote}_{program_relpath}")
        return source, name

    def _log_if_verbose(self, message: str, level: LOG_LEVEL) -> None:
        """Log ``message`` at ``level``; also echo to the terminal when verbose.

        Args:
            message: Text to log.
            level: One of "log", "warn", or "error".
        """
        log_func: Optional[Union[Callable[[Any], None], Callable[[Any], None]]] = None
        if level == "log":
            _logger.info(message)
            log_func = wandb.termlog
        elif level == "warn":
            _logger.warning(message)
            log_func = wandb.termwarn
        elif level == "error":
            _logger.error(message)
            log_func = wandb.termerror
        if self._verbose and log_func is not None:
            log_func(message)

    def _build_artifact_job_source(
        self,
        program_relpath: str,
        metadata: Dict[str, Any],
    ) -> Tuple[Optional[ArtifactSourceDict], Optional[str]]:
        """Build a code-artifact job source dict and name.

        Requires that a code artifact was previously logged
        (``self._logged_code_artifact`` is set). Returns (None, None) when
        a non-colab notebook run's program path cannot be resolved.
        """
        assert isinstance(self._logged_code_artifact, dict)
        # TODO: should we just always exit early if the path doesn't exist?
        if self._is_notebook_run and not self._is_colab_run():
            full_program_relpath = os.path.relpath(program_relpath, os.getcwd())
            # if the resolved path doesn't exist, then we shouldn't make a job because it will fail
            if not os.path.exists(full_program_relpath):
                # when users call log code in a notebook the code artifact starts
                # at the directory the notebook is in instead of the jupyter core
                if not os.path.exists(os.path.basename(program_relpath)):
                    _logger.info("target path does not exist, exiting")
                    self._log_if_verbose(
                        "No program path found when generating artifact job source for a non-colab notebook run. See https://docs.wandb.ai/guides/launch/create-job",
                        "warn",
                    )
                    return None, None
                full_program_relpath = os.path.basename(program_relpath)
        else:
            full_program_relpath = program_relpath
        entrypoint = self._get_entrypoint(full_program_relpath, metadata)
        # TODO: update executable to a method that supports pex
        source: ArtifactSourceDict = {
            "entrypoint": entrypoint,
            "notebook": self._is_notebook_run,
            "artifact": f"wandb-artifact://_id/{self._logged_code_artifact['id']}",
            "build_context": metadata.get("build_context"),
            "dockerfile": metadata.get("dockerfile"),
        }
        # Artifact names look like "<basename>:<alias>"; use just the basename.
        artifact_basename, *_ = self._logged_code_artifact["name"].split(":")
        name = self._make_job_name(artifact_basename)
        return source, name

    def _build_image_job_source(
        self, metadata: Dict[str, Any]
    ) -> Tuple[ImageSourceDict, str]:
        """Build an image job source dict and name from the "docker" metadata."""
        image_name = metadata.get("docker")
        assert isinstance(image_name, str)
        raw_image_name = image_name
        if ":" in image_name:
            tag = image_name.split(":")[-1]
            # if tag looks properly formatted, assume its a tag
            # regex: alphanumeric and "_" "-" "."
            if re.fullmatch(r"([a-zA-Z0-9_\-\.]+)", tag):
                # Strip the tag from the job name and record it as an alias.
                raw_image_name = raw_image_name.replace(f":{tag}", "")
                self._aliases += [tag]
        source: ImageSourceDict = {
            "image": image_name,
        }
        name = self._make_job_name(raw_image_name)
        return source, name

    def _make_job_name(self, input_str: str) -> str:
        """Use job name from settings if provided, else use programmatic name."""
        if self._settings.job_name:
            return self._settings.job_name
        return make_artifact_name_safe(f"job-{input_str}")

    def _get_entrypoint(
        self,
        program_relpath: str,
        metadata: Dict[str, Any],
    ) -> List[str]:
        """Return the command list used to launch the job."""
        # if building a partial job from CLI, overwrite entrypoint and notebook
        # should already be in metadata from create_job
        if self._partial:
            if metadata.get("entrypoint"):
                entrypoint: List[str] = metadata["entrypoint"]
                return entrypoint
        # job is being built from a run
        entrypoint = [os.path.basename(sys.executable), program_relpath]
        return entrypoint

    def _get_is_notebook_run(self) -> bool:
        """True when settings indicate a Jupyter environment."""
        return hasattr(self._settings, "_jupyter") and bool(self._settings._jupyter)

    def _is_colab_run(self) -> bool:
        """True when settings indicate a Google Colab environment."""
        return hasattr(self._settings, "_colab") and bool(self._settings._colab)

    def _build_job_source(
        self,
        source_type: str,
        program_relpath: Optional[str],
        metadata: Dict[str, Any],
    ) -> Tuple[
        Union[GitSourceDict, ArtifactSourceDict, ImageSourceDict, None],
        Optional[str],
    ]:
        """Construct a job source dict and name from the current run.

        Args:
            source_type (str): The type of source to build the job from. One of
                "repo", "artifact", or "image".
        """
        source: Union[
            GitSourceDict,
            ArtifactSourceDict,
            ImageSourceDict,
            None,
        ] = None
        if source_type == "repo":
            source, name = self._build_repo_job_source(
                program_relpath or "",
                metadata,
            )
        elif source_type == "artifact":
            source, name = self._build_artifact_job_source(
                program_relpath or "",
                metadata,
            )
        elif source_type == "image" and self._has_image_job_ingredients(metadata):
            source, name = self._build_image_job_source(metadata)
        else:
            source = None
        if source is None:
            if source_type:
                self._log_if_verbose(
                    f"Source type is set to '{source_type}' but some required information is missing "
                    "from the environment. A job will not be created from this run. See "
                    "https://docs.wandb.ai/guides/launch/create-job",
                    "warn",
                )
            return None, None
        return source, name

    def build(
        self,
        api: Api,
        build_context: Optional[str] = None,
        dockerfile: Optional[str] = None,
        base_image: Optional[str] = None,
    ) -> Optional[Artifact]:
        """Build a job artifact from the current run.

        Args:
            api (Api): The API object to use to create the job artifact.
            build_context (Optional[str]): Path within the job source code to
                the image build context. Saved as part of the job for future
                builds.
            dockerfile (Optional[str]): Path within the build context the
                Dockerfile. Saved as part of the job for future builds.
            base_image (Optional[str]): The base image used to run the job code.

        Returns:
            Optional[Artifact]: The job artifact if it was successfully built,
            otherwise None.
        """
        _logger.info("Attempting to build job artifact")
        # If a partial job was used, write the input/output types to the metadata
        # rather than building a new job version.
        if self._partial_source_id is not None:
            new_metadata = {
                "input_types": {"@wandb.config": self.input_types},
                "output_types": self.output_types,
            }
            api.update_artifact_metadata(
                self._partial_source_id,
                new_metadata,
            )
            return None
        # A requirements file is mandatory: jobs must pin their dependencies.
        if not os.path.exists(
            os.path.join(self._settings.files_dir, REQUIREMENTS_FNAME)
        ):
            self._log_if_verbose(
                "No requirements.txt found, not creating job artifact. See https://docs.wandb.ai/guides/launch/create-job",
                "warn",
            )
            return None
        metadata = self._handle_metadata_file()
        if metadata is None:
            self._log_if_verbose(
                f"Ensure read and write access to run files dir: {self._settings.files_dir}, control this via the WANDB_DIR env var. See https://docs.wandb.ai/guides/track/environment-variables",
                "warn",
            )
            return None
        runtime: Optional[str] = metadata.get("python")
        # can't build a job without a python version
        if runtime is None:
            self._log_if_verbose(
                "No python version found in metadata, not creating job artifact. "
                "See https://docs.wandb.ai/guides/launch/create-job",
                "warn",
            )
            return None
        input_types = TypeRegistry.type_of(self._config).to_json()
        output_types = TypeRegistry.type_of(self._summary).to_json()
        name: Optional[str] = None
        source_info: Optional[JobSourceDict] = None
        # configure job from environment
        source_type = self._get_source_type(metadata)
        if not source_type:
            # if source_type is None, then we don't have enough information to build a job
            # if the user intended to create a job, warn.
            if (
                self._settings.job_name
                or self._settings.job_source
                or self._source_type
            ):
                self._log_if_verbose(
                    "No source type found, not creating job artifact", "warn"
                )
            return None
        program_relpath = self._get_program_relpath(source_type, metadata)
        # Non-image, non-partial jobs need a program path to build an entrypoint.
        if not self._partial and source_type != "image" and not program_relpath:
            self._log_if_verbose(
                "No program path found, not creating job artifact. "
                "See https://docs.wandb.ai/guides/launch/create-job",
                "warn",
            )
            return None
        source, name = self._build_job_source(
            source_type,
            program_relpath,
            metadata,
        )
        if source is None:
            return None
        # Explicit build() arguments override values taken from metadata.
        if build_context:
            source["build_context"] = build_context  # type: ignore[typeddict-item]
        if dockerfile:
            source["dockerfile"] = dockerfile  # type: ignore[typeddict-item]
        if base_image:
            source["base_image"] = base_image  # type: ignore[typeddict-item]
        # Pop any keys that are initialized to None. The current TypedDict
        # system for source dicts requires all keys to be present, but we
        # don't want to include keys that are None in the final dict.
        for key in list(source.keys()):
            if source[key] is None:  # type: ignore[literal-required]
                source.pop(key)  # type: ignore[literal-required,misc]
        source_info = {
            "_version": str(get_min_supported_for_source_dict(source) or "v0"),
            "source_type": source_type,
            "source": source,
            "input_types": input_types,
            "output_types": output_types,
            "runtime": runtime,
        }
        assert source_info is not None
        assert name is not None
        artifact = InternalArtifact(name, JOB_ARTIFACT_TYPE)
        _logger.info("adding wandb-job metadata file")
        with artifact.new_file("wandb-job.json") as f:
            f.write(json.dumps(source_info, indent=4))
        artifact.add_file(
            os.path.join(self._settings.files_dir, REQUIREMENTS_FNAME),
            name=FROZEN_REQUIREMENTS_FNAME,
        )
        if source_type == "repo":
            # add diff
            if os.path.exists(os.path.join(self._settings.files_dir, DIFF_FNAME)):
                artifact.add_file(
                    os.path.join(self._settings.files_dir, DIFF_FNAME),
                    name=DIFF_FNAME,
                )
        return artifact

    def _get_source_type(self, metadata: Dict[str, Any]) -> Optional[str]:
        """Pick the job source type: explicit setting first, then by ingredients."""
        if self._source_type:
            return self._source_type
        if self._has_git_job_ingredients(metadata):
            _logger.info("is repo sourced job")
            return "repo"
        if self._has_artifact_job_ingredients():
            _logger.info("is artifact sourced job")
            return "artifact"
        if self._has_image_job_ingredients(metadata):
            _logger.info("is image sourced job")
            return "image"
        _logger.info("no source found")
        return None

    def _get_program_relpath(
        self, source_type: str, metadata: Dict[str, Any]
    ) -> Optional[str]:
        """Resolve the program's relative path from run metadata, if any."""
        if self._is_notebook_run:
            _logger.info("run is notebook based run")
            program = metadata.get("program")
            if not program:
                self._log_if_verbose(
                    "Notebook 'program' path not found in metadata. See https://docs.wandb.ai/guides/launch/create-job",
                    "warn",
                )
            return program
        if source_type == "artifact" or self._settings.job_source == "artifact":
            # if the job is set to be an artifact, use relpath guaranteed
            # to be correct. 'codePath' uses the root path when in git repo
            # fallback to codePath if strictly local relpath not present
            return metadata.get("codePathLocal") or metadata.get("codePath")
        return metadata.get("codePath")

    def _handle_metadata_file(
        self,
    ) -> Optional[Dict]:
        """Load the run's metadata JSON from the files dir, or None if absent."""
        if os.path.exists(os.path.join(self._settings.files_dir, METADATA_FNAME)):
            with open(os.path.join(self._settings.files_dir, METADATA_FNAME)) as f:
                metadata: Dict = json.load(f)
            return metadata
        return None

    def _has_git_job_ingredients(self, metadata: Dict[str, Any]) -> bool:
        """True when metadata carries enough git info to build a repo job."""
        git_info: Dict[str, str] = metadata.get("git", {})
        # Notebook runs additionally need a resolvable repo root.
        if self._is_notebook_run and metadata.get("root") is None:
            return False
        return git_info.get("remote") is not None and git_info.get("commit") is not None

    def _has_artifact_job_ingredients(self) -> bool:
        """True when a code artifact was logged for this run."""
        return self._logged_code_artifact is not None

    def _has_image_job_ingredients(self, metadata: Dict[str, Any]) -> bool:
        """True when metadata names a docker image."""
        return metadata.get("docker") is not None