"""Prodigy integration for W&B.
User can upload Prodigy annotated datasets directly
from the local database to W&B in Tables format.
Example usage:
```python
import wandb
from wandb.integration.prodigy import upload_dataset
run = wandb.init(project="prodigy")
upload_dataset("name_of_dataset")
wandb.finish()
```
"""
import base64
import collections.abc
import io
import urllib.error
import urllib.parse
import urllib.request
from copy import deepcopy

import pandas as pd
from PIL import Image

import wandb
from wandb import util
from wandb.plot.utils import test_missing
from wandb.sdk.lib import telemetry as wb_telemetry


def named_entity(docs):
    """Create a named entity visualization.

    Taken from https://github.com/wandb/wandb/blob/main/wandb/plots/named_entity.py.
    """
    spacy = util.get_module(
        "spacy",
        required="named_entity requires the spacy library, install with `pip install spacy`",
    )

    util.get_module(
        "en_core_web_md",
        required="named_entity requires the `en_core_web_md` model, install with `python -m spacy download en_core_web_md`",
    )
# Test for required packages and missing & non-integer values in docs data
if test_missing(docs=docs):
html = spacy.displacy.render(
docs, style="ent", page=True, minify=True, jupyter=False
)
wandb_html = wandb.Html(html)
        return wandb_html


def merge(dict1, dict2):
    """Return a new dictionary by merging two dictionaries recursively."""
result = deepcopy(dict1)
for key, value in dict2.items():
if isinstance(value, collections.abc.Mapping):
result[key] = merge(result.get(key, {}), value)
else:
result[key] = deepcopy(dict2[key])
    return result


def get_schema(list_data_dict, struct, array_dict_types):
    """Get a schema of the dataset's structure and data types."""
# Get the structure of the JSON objects in the database
# This is similar to getting a JSON schema but with slightly different format
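    # For example, a record like {"text": "hi", "spans": [{"start": 0, "end": 2,
    # "label": "X"}]} would yield {"text": str, "spans": {"start": int, "end": int,
    # "label": str}}, with "spans" tracked in array_dict_types as a list[dict] key.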
    for item in list_data_dict:
# If the list contains dict objects
for k, v in item.items():
# Check if key already exists in template
if k not in struct.keys():
if isinstance(v, list):
if len(v) > 0 and isinstance(v[0], list):
# nested list structure
struct[k] = type(v) # type list
elif len(v) > 0 and not (
isinstance(v[0], list) or isinstance(v[0], dict)
):
# list of singular values
struct[k] = type(v) # type list
else:
# list of dicts
array_dict_types.append(
k
) # keep track of keys that are type list[dict]
struct[k] = {}
struct[k] = get_schema(v, struct[k], array_dict_types)
elif isinstance(v, dict):
struct[k] = {}
struct[k] = get_schema([v], struct[k], array_dict_types)
else:
struct[k] = type(v)
else:
# Get the value of struct[k] which is the current template
# Find new keys and then merge the two templates together
cur_struct = struct[k]
if isinstance(v, list):
if len(v) > 0 and isinstance(v[0], list):
# nested list coordinate structure
                    # v cannot be None here; just record the list type
                    struct[k] = type(v)  # type list
elif len(v) > 0 and not (
isinstance(v[0], list) or isinstance(v[0], dict)
):
# single list with values
                    # v cannot be None here; just record the list type
                    struct[k] = type(v)  # type list
else:
array_dict_types.append(
k
) # keep track of keys that are type list[dict]
struct[k] = {}
struct[k] = get_schema(v, struct[k], array_dict_types)
# merge cur_struct and struct[k], remove duplicates
struct[k] = merge(struct[k], cur_struct)
elif isinstance(v, dict):
struct[k] = {}
struct[k] = get_schema([v], struct[k], array_dict_types)
# merge cur_struct and struct[k], remove duplicates
struct[k] = merge(struct[k], cur_struct)
else:
                    # Only overwrite the recorded type when v is not None,
                    # so a real type is not replaced with NoneType
if v is not None:
struct[k] = type(v)
    return struct


def standardize(item, structure, array_dict_types):
    """Standardize all rows/entries in dataset to fit the schema.

    Looks for missing values and fills them in so all rows have
    the same items and structure.
    """
for k, v in structure.items():
if k not in item:
# If the structure/field does not exist
if isinstance(v, dict) and (k not in array_dict_types):
                # If key k is of type dict and is not a list[dict] key
item[k] = {}
standardize(item[k], v, array_dict_types)
elif isinstance(v, dict) and (k in array_dict_types):
# If key k is of type dict, and is actually of type list[dict],
# just treat as a list and set to None by default
item[k] = None
else:
                # Assign the type's default value (e.g. str() -> "", int() -> 0)
item[k] = v()
else:
# If the structure/field already exists and is a list or dict
if isinstance(item[k], list):
                # Only recurse into lists of dicts (or empty lists); skip nested
                # list structures and lists of plain values.
                if len(item[k]) == 0 or isinstance(item[k][0], dict):
for sub_item in item[k]:
standardize(sub_item, v, array_dict_types)
elif isinstance(item[k], dict):
                standardize(item[k], v, array_dict_types)


def create_table(data):
    """Create a W&B Table.

    - Creates/decodes images from URL/Base64 data.
    - Uses spaCy to translate NER span data into visualizations.
    """
# create table object from columns
table_df = pd.DataFrame(data)
columns = list(table_df.columns)
if ("spans" in table_df.columns) and ("text" in table_df.columns):
columns.append("spans_visual")
if "image" in columns:
columns.append("image_visual")
main_table = wandb.Table(columns=columns)
# Convert to dictionary format to maintain order during processing
matrix = table_df.to_dict(orient="records")
    # Import en_core_web_md if it is installed
    en_core_web_md = util.get_module(
        "en_core_web_md",
        required="create_table requires the `en_core_web_md` model, install with `python -m spacy download en_core_web_md`",
    )
nlp = en_core_web_md.load(disable=["ner"])
# Go through each individual row
    for document in matrix:
# Text NER span visualizations
if ("spans_visual" in columns) and ("text" in columns):
# Add visuals for spans
document["spans_visual"] = None
doc = nlp(document["text"])
ents = []
if ("spans" in document) and (document["spans"] is not None):
for span in document["spans"]:
if ("start" in span) and ("end" in span) and ("label" in span):
                        charspan = doc.char_span(
                            span["start"], span["end"], span["label"]
                        )
                        # char_span returns None when the span does not align to
                        # token boundaries; skip those to avoid invalid entities
                        if charspan is not None:
                            ents.append(charspan)
doc.ents = ents
document["spans_visual"] = named_entity(docs=doc)
# Convert image link to wandb Image
if "image" in columns:
# Turn into wandb image
document["image_visual"] = None
if ("image" in document) and (document["image"] is not None):
isurl = urllib.parse.urlparse(document["image"]).scheme in (
"http",
"https",
)
isbase64 = ("data:" in document["image"]) and (
";base64" in document["image"]
)
if isurl:
# is url
try:
im = Image.open(urllib.request.urlopen(document["image"]))
document["image_visual"] = wandb.Image(im)
except urllib.error.URLError:
wandb.termwarn(f"Image URL {document['image']} is invalid.")
document["image_visual"] = None
elif isbase64:
# is base64 uri
imgb64 = document["image"].split("base64,")[1]
try:
msg = base64.b64decode(imgb64)
buf = io.BytesIO(msg)
im = Image.open(buf)
document["image_visual"] = wandb.Image(im)
except base64.binascii.Error:
wandb.termwarn(f"Base64 string {document['image']} is invalid.")
document["image_visual"] = None
else:
# is data path
document["image_visual"] = wandb.Image(document["image"])
# Create row and append to table
values_list = list(document.values())
main_table.add_data(*values_list)
    return main_table


def upload_dataset(dataset_name):
    """Upload dataset from local database to Weights & Biases.

    Args:
        dataset_name: The name of the dataset in the Prodigy database.
    """
# Check if wandb.init has been called
if wandb.run is None:
raise ValueError("You must call wandb.init() before upload_dataset()")
with wb_telemetry.context(run=wandb.run) as tel:
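        # Record that the Prodigy integration feature was used for this run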
tel.feature.prodigy = True
prodigy_db = util.get_module(
"prodigy.components.db",
required="`prodigy` library is required but not installed. Please see https://prodi.gy/docs/install",
)
# Retrieve and upload prodigy dataset
database = prodigy_db.connect()
data = database.get_dataset(dataset_name)
array_dict_types = []
schema = get_schema(data, {}, array_dict_types)
for i, _d in enumerate(data):
standardize(data[i], schema, array_dict_types)
table = create_table(data)
wandb.log({dataset_name: table})
wandb.termlog(f"Prodigy dataset `{dataset_name}` uploaded.")