File size: 5,180 Bytes
e0be88b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
import argparse
import importlib.util
import logging
import os
import sys
from typing import Dict, Tuple
from psycopg2.extensions import register_adapter
from psycopg2.extras import Json
register_adapter(dict, Json)
class ImportModuleException(Exception):
pass
class MetricsRecorder:
def __init__(
self, connection, logger: logging.Logger, repository: str, branch: str, commit_id: str, commit_msg: str
):
self.conn = connection
self.conn.autocommit = True
self.logger = logger
self.repository = repository
self.branch = branch
self.commit_id = commit_id
self.commit_msg = commit_msg
def initialise_benchmark(self, metadata: Dict[str, str]) -> int:
"""
Creates a new benchmark, returns the benchmark id
"""
# gpu_name: str, model_id: str
with self.conn.cursor() as cur:
cur.execute(
"INSERT INTO benchmarks (repository, branch, commit_id, commit_message, metadata) VALUES (%s, %s, %s, %s, %s) RETURNING benchmark_id",
(self.repository, self.branch, self.commit_id, self.commit_msg, metadata),
)
benchmark_id = cur.fetchone()[0]
logger.debug(f"initialised benchmark #{benchmark_id}")
return benchmark_id
def collect_device_measurements(self, benchmark_id: int, cpu_util, mem_megabytes, gpu_util, gpu_mem_megabytes):
"""
Collect device metrics, such as CPU & GPU usage. These are "static", as in you cannot pass arbitrary arguments to the function.
"""
with self.conn.cursor() as cur:
cur.execute(
"INSERT INTO device_measurements (benchmark_id, cpu_util, mem_megabytes, gpu_util, gpu_mem_megabytes) VALUES (%s, %s, %s, %s, %s)",
(benchmark_id, cpu_util, mem_megabytes, gpu_util, gpu_mem_megabytes),
)
self.logger.debug(
f"inserted device measurements for benchmark #{benchmark_id} [CPU util: {cpu_util}, mem MBs: {mem_megabytes}, GPU util: {gpu_util}, GPU mem MBs: {gpu_mem_megabytes}]"
)
def collect_model_measurements(self, benchmark_id: int, measurements: Dict[str, float]):
with self.conn.cursor() as cur:
cur.execute(
"""
INSERT INTO model_measurements (
benchmark_id,
measurements
) VALUES (%s, %s)
""",
(
benchmark_id,
measurements,
),
)
self.logger.debug(f"inserted model measurements for benchmark #{benchmark_id}: {measurements}")
def close(self):
self.conn.close()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
formatter = logging.Formatter("[%(levelname)s - %(asctime)s] %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
def parse_arguments() -> Tuple[str, str, str, str]:
"""
Parse command line arguments for the benchmarking CLI.
"""
parser = argparse.ArgumentParser(description="CLI for benchmarking the huggingface/transformers.")
parser.add_argument(
"repository",
type=str,
help="The repository name on which the benchmarking is performed.",
)
parser.add_argument(
"branch",
type=str,
help="The branch name on which the benchmarking is performed.",
)
parser.add_argument(
"commit_id",
type=str,
help="The commit hash on which the benchmarking is performed.",
)
parser.add_argument(
"commit_msg",
type=str,
help="The commit message associated with the commit, truncated to 70 characters.",
)
args = parser.parse_args()
return args.repository, args.branch, args.commit_id, args.commit_msg
def import_from_path(module_name, file_path):
try:
spec = importlib.util.spec_from_file_location(module_name, file_path)
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
return module
except Exception as e:
raise ImportModuleException(f"failed to load python module: {e}")
if __name__ == "__main__":
benchmarks_folder_path = os.path.dirname(os.path.realpath(__file__))
repository, branch, commit_id, commit_msg = parse_arguments()
for entry in os.scandir(benchmarks_folder_path):
try:
if not entry.name.endswith(".py"):
continue
if entry.path == __file__:
continue
logger.debug(f"loading: {entry.name}")
module = import_from_path(entry.name.split(".")[0], entry.path)
logger.info(f"running benchmarks in: {entry.name}")
module.run_benchmark(logger, repository, branch, commit_id, commit_msg)
except ImportModuleException as e:
logger.error(e)
except Exception as e:
logger.error(f"error running benchmarks for {entry.name}: {e}")
|