File size: 7,858 Bytes
9c6594c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 |
import logging
import os
from datetime import datetime
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union
import pytz
import wandb
from wandb.sdk.integration_utils.auto_logging import Response
from wandb.sdk.lib.runid import generate_id
# Module-level logger; all resolver failures/unsupported-task notices go here.
logger = logging.getLogger(__name__)
# Pipeline task names this resolver knows how to format into a wandb.Table.
# Translation tasks are named `translation_x_to_y`, so they are matched by a
# `startswith("translation")` prefix check instead of being enumerated here.
SUPPORTED_PIPELINE_TASKS = [
    "text-classification",
    "sentiment-analysis",
    "question-answering",
    "summarization",
    "translation",
    "text2text-generation",
    "text-generation",
    # "conversational",
]
# Tasks that accept a `top_k` argument and may return a list of candidates.
# Their single-element list responses are intentionally NOT unpacked, so table
# schemas stay consistent across calls with different `top_k` values.
PIPELINES_WITH_TOP_K = [
    "text-classification",
    "sentiment-analysis",
    "question-answering",
]
class HuggingFacePipelineRequestResponseResolver:
    """Resolver for HuggingFace's pipeline request and responses, providing necessary data transformations and formatting.

    This is based off (from wandb.sdk.integration_utils.auto_logging import RequestResponseResolver)
    """

    # ID shared by all rows of the most recently logged table; set in `_create_table`.
    autolog_id = None

    def __call__(
        self,
        args: Sequence[Any],
        kwargs: Dict[str, Any],
        response: Response,
        start_time: float,
        time_elapsed: float,
    ) -> Optional[Dict[str, Any]]:
        """Main call method for this class.

        :param args: list of arguments; args[0] is the pipeline, args[1] its input data
        :param kwargs: dictionary of keyword arguments
        :param response: the response from the request
        :param start_time: time when request started (unused here; part of the resolver interface)
        :param time_elapsed: time elapsed for the request
        :returns: packed data as a dictionary for logging to wandb,
            None if an exception occurred or the task is unsupported
        """
        try:
            pipe, input_data = args[:2]
            task = pipe.task
            # Translation tasks are in the form of `translation_x_to_y`
            if task in SUPPORTED_PIPELINE_TASKS or task.startswith("translation"):
                model = self._get_model(pipe)
                if model is None:
                    return None
                model_alias = model.name_or_path
                timestamp = datetime.now(pytz.utc)
                input_data, response = self._transform_task_specific_data(
                    task, input_data, response
                )
                formatted_data = self._format_data(task, input_data, response, kwargs)
                packed_data = self._create_table(
                    formatted_data, model_alias, timestamp, time_elapsed
                )
                # TODO: Let users decide the name in a way that does not use an environment variable
                table_name = os.environ.get("WANDB_AUTOLOG_TABLE_NAME", task)
                return {
                    table_name: wandb.Table(
                        columns=packed_data[0], data=packed_data[1:]
                    )
                }
            # Lazy %-style args: the message is only formatted if the record is emitted.
            logger.warning(
                "The task: `%s` is not yet supported.\nPlease contact `wandb` to notify us if you would like support for this task",
                task,
            )
        except Exception as e:
            # Autologging is best-effort: never propagate to the user's pipeline call,
            # but keep the traceback so failures remain debuggable.
            logger.warning(e, exc_info=True)
        return None

    # TODO: This should have a dependency on PreTrainedModel. i.e. isinstance(PreTrainedModel)
    # from transformers.modeling_utils import PreTrainedModel
    # We do not want this dependency explicitly in our codebase so we make a very general
    # assumption about the structure of the pipeline which may have unintended consequences
    def _get_model(self, pipe) -> Optional[Any]:
        """Extracts model from the pipeline.

        :param pipe: the HuggingFace pipeline
        :returns: Model if available, None otherwise
        """
        model = pipe.model
        try:
            # Some pipelines wrap the real model one level deeper (e.g. `pipe.model.model`).
            return model.model
        except AttributeError:
            logger.info(
                "Model does not have a `.model` attribute. Assuming `pipe.model` is the correct model."
            )
            return model

    @staticmethod
    def _transform_task_specific_data(
        task: str, input_data: Union[List[Any], Any], response: Union[List[Any], Any]
    ) -> Tuple[Union[List[Any], Any], Union[List[Any], Any]]:
        """Transform input and response data based on specific tasks.

        :param task: the task name
        :param input_data: the input data
        :param response: the response data
        :returns: tuple of transformed input_data and response
        """
        if task == "question-answering":
            # QA inputs are SquadExample-like objects; log their attribute dicts.
            input_data = input_data if isinstance(input_data, list) else [input_data]
            input_data = [data.__dict__ for data in input_data]
        elif task == "conversational":
            # We only grab the latest input/output pair from the conversation
            # Logging the whole conversation renders strangely.
            input_data = input_data if isinstance(input_data, list) else [input_data]
            input_data = [data.__dict__["past_user_inputs"][-1] for data in input_data]
            response = response if isinstance(response, list) else [response]
            response = [data.__dict__["generated_responses"][-1] for data in response]
        return input_data, response

    def _format_data(
        self,
        task: str,
        input_data: Union[List[Any], Any],
        response: Union[List[Any], Any],
        kwargs: Dict[str, Any],
    ) -> List[Dict[str, Any]]:
        """Formats input data, response, and kwargs into a list of dictionaries.

        :param task: the task name
        :param input_data: the input data
        :param response: the response data
        :param kwargs: dictionary of keyword arguments
        :returns: list of dictionaries containing formatted data

        NOTE(review): `zip` silently truncates when input and response lengths
        differ — presumably the pipeline guarantees they match; confirm upstream.
        """
        input_data = input_data if isinstance(input_data, list) else [input_data]
        response = response if isinstance(response, list) else [response]
        formatted_data = []
        for i_text, r_text in zip(input_data, response):
            # Unpack single element responses for better rendering in wandb UI when it is a task without top_k
            # top_k = 1 would unpack the response into a single element while top_k > 1 would be a list
            # this would cause the UI to not properly concatenate the tables of the same task by omitting the elements past the first
            if (
                (isinstance(r_text, list))
                and (len(r_text) == 1)
                and task not in PIPELINES_WITH_TOP_K
            ):
                r_text = r_text[0]
            formatted_data.append(
                {"input": i_text, "response": r_text, "kwargs": kwargs}
            )
        return formatted_data

    def _create_table(
        self,
        formatted_data: List[Dict[str, Any]],
        model_alias: str,
        timestamp: datetime,
        time_elapsed: float,
    ) -> List[List[Any]]:
        """Creates a table from formatted data, model alias, timestamp, and elapsed time.

        :param formatted_data: list of dictionaries containing formatted data
        :param model_alias: alias of the model
        :param timestamp: timezone-aware datetime at which the request was resolved
        :param time_elapsed: time elapsed from the beginning
        :returns: list of lists, representing a table of data. [0]th element = columns. [1]st element = data
        """
        header = [
            "ID",
            "Model Alias",
            "Timestamp",
            "Elapsed Time",
            "Input",
            "Response",
            "Kwargs",
        ]
        table = [header]
        # One ID per resolved call; every row from this call shares it so the
        # rows can be correlated later via `get_latest_id`.
        autolog_id = generate_id(length=16)
        for data in formatted_data:
            row = [
                autolog_id,
                model_alias,
                timestamp,
                time_elapsed,
                data["input"],
                data["response"],
                data["kwargs"],
            ]
            table.append(row)
        self.autolog_id = autolog_id
        return table

    def get_latest_id(self):
        """Return the autolog ID of the most recently created table, or None if none yet."""
        return self.autolog_id
|