"""Prodigy integration for W&B.

Users can upload Prodigy-annotated datasets directly
from the local database to W&B in Tables format.

Example usage:

```python
import wandb
from wandb.integration.prodigy import upload_dataset

run = wandb.init(project="prodigy")
upload_dataset("name_of_dataset")
wandb.finish()
```
"""

import base64
import collections.abc
import io
import urllib.error
import urllib.parse
import urllib.request
from copy import deepcopy

import pandas as pd
from PIL import Image

import wandb
from wandb import util
from wandb.plot.utils import test_missing
from wandb.sdk.lib import telemetry as wb_telemetry


def named_entity(docs):
    """Create a named entity visualization.

    Taken from https://github.com/wandb/wandb/blob/main/wandb/plots/named_entity.py.
    """
    spacy = util.get_module(
        "spacy",
        required="named_entity requires the spacy library, install with `pip install spacy`",
    )

    # Check that the English model is available before rendering.
    util.get_module(
        "en_core_web_md",
        required="named_entity requires the `en_core_web_md` model, install with `python -m spacy download en_core_web_md`",
    )

    # Render the entity spans with displaCy and wrap the resulting HTML so it
    # can be logged as a cell in a wandb.Table.
    if test_missing(docs=docs):
        html = spacy.displacy.render(
            docs, style="ent", page=True, minify=True, jupyter=False
        )
        wandb_html = wandb.Html(html)
        return wandb_html
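
# Rough usage sketch (illustrative only; assumes spaCy plus the
# `en_core_web_md` model are installed and a W&B run is active):
#
#     import spacy
#     nlp = spacy.load("en_core_web_md")
#     doc = nlp("W&B is based in San Francisco.")
#     wandb.log({"ner": named_entity(docs=doc)})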


def merge(dict1, dict2):
    """Return a new dictionary by merging two dictionaries recursively."""
    result = deepcopy(dict1)

    for key, value in dict2.items():
        if isinstance(value, collections.abc.Mapping):
            # Recurse into nested mappings so keys from both sides are kept.
            result[key] = merge(result.get(key, {}), value)
        else:
            # Non-mapping values from dict2 take precedence.
            result[key] = deepcopy(value)

    return result
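
# Illustrative example (hypothetical values): nested keys from both inputs are
# preserved, and values from dict2 win on conflicts:
#
#     merge({"meta": {"source": "a", "score": 1}}, {"meta": {"score": 2}})
#     # -> {"meta": {"source": "a", "score": 2}}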


def get_schema(list_data_dict, struct, array_dict_types):
    """Get a schema of the dataset's structure and data types.

    `struct` maps each key to a Python type or to a nested schema dict, and
    `array_dict_types` collects the names of keys whose values are lists of dicts.
    """
    for item in list_data_dict:
        for k, v in item.items():
            if k not in struct.keys():
                # First time this key is seen: record its type.
                if isinstance(v, list):
                    if len(v) > 0 and isinstance(v[0], list):
                        # List of lists: keep the list type itself.
                        struct[k] = type(v)
                    elif len(v) > 0 and not (
                        isinstance(v[0], list) or isinstance(v[0], dict)
                    ):
                        # List of primitives: keep the list type itself.
                        struct[k] = type(v)
                    else:
                        # Empty list or list of dicts: recurse to build the
                        # nested schema.
                        array_dict_types.append(k)
                        struct[k] = {}
                        struct[k] = get_schema(v, struct[k], array_dict_types)
                elif isinstance(v, dict):
                    struct[k] = {}
                    struct[k] = get_schema([v], struct[k], array_dict_types)
                else:
                    struct[k] = type(v)
            else:
                # The key was seen in an earlier row: refine the recorded type,
                # merging nested schemas so no sub-key is lost.
                cur_struct = struct[k]
                if isinstance(v, list):
                    if len(v) > 0 and isinstance(v[0], list):
                        if v is not None:
                            struct[k] = type(v)
                    elif len(v) > 0 and not (
                        isinstance(v[0], list) or isinstance(v[0], dict)
                    ):
                        if v is not None:
                            struct[k] = type(v)
                    else:
                        array_dict_types.append(k)
                        struct[k] = {}
                        struct[k] = get_schema(v, struct[k], array_dict_types)
                        struct[k] = merge(struct[k], cur_struct)
                elif isinstance(v, dict):
                    struct[k] = {}
                    struct[k] = get_schema([v], struct[k], array_dict_types)
                    struct[k] = merge(struct[k], cur_struct)
                else:
                    if v is not None:
                        struct[k] = type(v)

    return struct
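
# Illustrative example (hypothetical Prodigy-style rows): the schema maps each
# key to its Python type, recursing into dicts and lists of dicts, while
# `array_dict_types` collects the keys whose values are lists of dicts:
#
#     rows = [
#         {"text": "hi", "spans": [{"start": 0, "end": 2, "label": "X"}]},
#         {"text": "yo", "meta": {"score": 0.5}},
#     ]
#     kinds = []
#     get_schema(rows, {}, kinds)
#     # -> {"text": str, "spans": {"start": int, "end": int, "label": str},
#     #     "meta": {"score": float}}
#     # kinds == ["spans"]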


def standardize(item, structure, array_dict_types):
    """Standardize all rows/entries in the dataset to fit the schema.

    Looks for missing values and fills them in so all rows have
    the same items and structure.
    """
    for k, v in structure.items():
        if k not in item:
            # The row is missing this key entirely; fill it in.
            if isinstance(v, dict) and (k not in array_dict_types):
                # Nested dict schema: create the dict and fill it recursively.
                item[k] = {}
                standardize(item[k], v, array_dict_types)
            elif isinstance(v, dict) and (k in array_dict_types):
                # The schema is a dict but the field is a list of dicts, so
                # there is no single default to fill in; use None.
                item[k] = None
            else:
                # Primitive type: fill with the type's default value
                # (e.g. "" for str, 0 for int).
                item[k] = v()
        else:
            # The key exists; recurse into nested structures.
            if isinstance(item[k], list):
                # Only recurse when this is a list of dicts, i.e. neither a
                # list of lists nor a list of primitives.
                condition = (
                    not (len(item[k]) > 0 and isinstance(item[k][0], list))
                ) and (
                    not (
                        len(item[k]) > 0
                        and not (
                            isinstance(item[k][0], list)
                            or isinstance(item[k][0], dict)
                        )
                    )
                )
                if condition:
                    for sub_item in item[k]:
                        standardize(sub_item, v, array_dict_types)
            elif isinstance(item[k], dict):
                standardize(item[k], v, array_dict_types)
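
# Illustrative example (hypothetical values): rows missing a key get a default
# for that key, so every row ends up with the same fields:
#
#     schema = {"text": str, "meta": {"score": float}, "spans": {"label": str}}
#     kinds = ["spans"]
#     row = {"text": "hi"}
#     standardize(row, schema, kinds)
#     # row -> {"text": "hi", "meta": {"score": 0.0}, "spans": None}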


def create_table(data):
    """Create a W&B Table from a list of standardized Prodigy records.

    - Creates/decodes images from a URL, a Base64 string, or a local path.
    - Uses spaCy to translate NER span data into displaCy visualizations.
    """
    table_df = pd.DataFrame(data)
    columns = list(table_df.columns)
    if ("spans" in table_df.columns) and ("text" in table_df.columns):
        columns.append("spans_visual")
    if "image" in columns:
        columns.append("image_visual")
    main_table = wandb.Table(columns=columns)

    # Convert the dataframe back into a list of row dicts.
    matrix = table_df.to_dict(orient="records")

    # Load the spaCy English model with NER disabled; the entity spans come
    # from the Prodigy annotations, not from the model.
    en_core_web_md = util.get_module(
        "en_core_web_md",
        required="create_table requires the `en_core_web_md` model, install with `python -m spacy download en_core_web_md`",
    )
    nlp = en_core_web_md.load(disable=["ner"])

    for document in matrix:
        # Build the NER visualization from the Prodigy span annotations.
        if ("spans_visual" in columns) and ("text" in columns):
            document["spans_visual"] = None
            doc = nlp(document["text"])
            ents = []
            if ("spans" in document) and (document["spans"] is not None):
                for span in document["spans"]:
                    if ("start" in span) and ("end" in span) and ("label" in span):
                        charspan = doc.char_span(
                            span["start"], span["end"], span["label"]
                        )
                        # char_span returns None when the offsets do not align
                        # with token boundaries; skip those spans.
                        if charspan is not None:
                            ents.append(charspan)
                doc.ents = ents
                document["spans_visual"] = named_entity(docs=doc)

        # Decode the image into a wandb.Image, whether it is given as a URL,
        # a Base64-encoded string, or a local file path.
        if "image" in columns:
            document["image_visual"] = None
            if ("image" in document) and (document["image"] is not None):
                isurl = urllib.parse.urlparse(document["image"]).scheme in (
                    "http",
                    "https",
                )
                isbase64 = ("data:" in document["image"]) and (
                    ";base64" in document["image"]
                )
                if isurl:
                    # Download the image from the URL.
                    try:
                        im = Image.open(urllib.request.urlopen(document["image"]))
                        document["image_visual"] = wandb.Image(im)
                    except urllib.error.URLError:
                        wandb.termwarn(f"Image URL {document['image']} is invalid.")
                        document["image_visual"] = None
                elif isbase64:
                    # Decode the payload after the "base64," marker.
                    imgb64 = document["image"].split("base64,")[1]
                    try:
                        msg = base64.b64decode(imgb64)
                        buf = io.BytesIO(msg)
                        im = Image.open(buf)
                        document["image_visual"] = wandb.Image(im)
                    except base64.binascii.Error:
                        wandb.termwarn(f"Base64 string {document['image']} is invalid.")
                        document["image_visual"] = None
                else:
                    # Assume the value is a local file path.
                    document["image_visual"] = wandb.Image(document["image"])

        # Row dict insertion order matches the `columns` list, so the values
        # can be added positionally.
        values_list = list(document.values())
        main_table.add_data(*values_list)

    return main_table
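
# Rough usage sketch (illustrative only; assumes the rows have already been
# passed through get_schema/standardize so every row has the same keys):
#
#     table = create_table(standardized_rows)
#     wandb.log({"my_dataset": table})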


def upload_dataset(dataset_name):
    """Upload a dataset from the local Prodigy database to Weights & Biases.

    Args:
        dataset_name: The name of the dataset in the Prodigy database.
    """
    # An active run is required to log the table.
    if wandb.run is None:
        raise ValueError("You must call wandb.init() before upload_dataset()")

    with wb_telemetry.context(run=wandb.run) as tel:
        tel.feature.prodigy = True

    prodigy_db = util.get_module(
        "prodigy.components.db",
        required="`prodigy` library is required but not installed. Please see https://prodi.gy/docs/install",
    )

    # Retrieve the annotated examples from Prodigy's local database.
    database = prodigy_db.connect()
    data = database.get_dataset(dataset_name)

    # Infer a common schema, standardize every record to match it, and log
    # the result as a single W&B Table.
    array_dict_types = []
    schema = get_schema(data, {}, array_dict_types)

    for record in data:
        standardize(record, schema, array_dict_types)
    table = create_table(data)
    wandb.log({dataset_name: table})
    wandb.termlog(f"Prodigy dataset `{dataset_name}` uploaded.")