"""Prodigy integration for W&B.

User can upload Prodigy annotated datasets directly
from the local database to W&B in Tables format.

Example usage:

```python
import wandb
from wandb.integration.prodigy import upload_dataset

run = wandb.init(project="prodigy")
upload_dataset("name_of_dataset")
wandb.finish()
```
"""

import base64
import binascii
import collections.abc
import io
import urllib.error
import urllib.parse
import urllib.request
from copy import deepcopy

import pandas as pd
from PIL import Image

import wandb
from wandb import util
from wandb.plot.utils import test_missing
from wandb.sdk.lib import telemetry as wb_telemetry


def named_entity(docs):
    """Create a named entity visualization.

    Taken from https://github.com/wandb/wandb/blob/main/wandb/plots/named_entity.py.
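
    A usage sketch (assumes spaCy is installed and ``doc`` is a spaCy
    ``Doc`` whose ``ents`` are already set):

        html = named_entity(docs=doc)
        wandb.log({"ner": html})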
    """
    spacy = util.get_module(
        "spacy",
        required="named_entity requires the spacy library, install with `pip install spacy`",
    )

    util.get_module(
        "en_core_web_md",
        required="named_entity requires the `en_core_web_md` model, install with `python -m spacy download en_core_web_md`",
    )

    # Check for missing values in the docs data
    if test_missing(docs=docs):
        html = spacy.displacy.render(
            docs, style="ent", page=True, minify=True, jupyter=False
        )
        wandb_html = wandb.Html(html)
        return wandb_html


def merge(dict1, dict2):
    """Return a new dictionary by merging two dictionaries recursively."""
    result = deepcopy(dict1)

    for key, value in dict2.items():
        if isinstance(value, collections.abc.Mapping):
            result[key] = merge(result.get(key, {}), value)
        else:
            result[key] = deepcopy(value)

    return result


def get_schema(list_data_dict, struct, array_dict_types):
    """Get a schema of the dataset's structure and data types."""
    # Get the structure of the JSON objects in the database
    # This is similar to getting a JSON schema but with slightly different format
    for item in list_data_dict:
        # If the list contains dict objects
        for k, v in item.items():
            # Check if key already exists in template
            if k not in struct:
                if isinstance(v, list):
                    if len(v) > 0 and isinstance(v[0], list):
                        # nested list structure
                        struct[k] = type(v)  # type list
                    elif len(v) > 0 and not (
                        isinstance(v[0], list) or isinstance(v[0], dict)
                    ):
                        # list of singular values
                        struct[k] = type(v)  # type list
                    else:
                        # list of dicts
                        array_dict_types.append(
                            k
                        )  # keep track of keys that are type list[dict]
                        struct[k] = {}
                        struct[k] = get_schema(v, struct[k], array_dict_types)
                elif isinstance(v, dict):
                    struct[k] = {}
                    struct[k] = get_schema([v], struct[k], array_dict_types)
                else:
                    struct[k] = type(v)
            else:
                # Get the value of struct[k] which is the current template
                # Find new keys and then merge the two templates together
                cur_struct = struct[k]
                if isinstance(v, list):
                    if len(v) > 0 and isinstance(v[0], list):
                        # nested list coordinate structure
                        struct[k] = type(v)  # type list
                    elif len(v) > 0 and not (
                        isinstance(v[0], list) or isinstance(v[0], dict)
                    ):
                        # list of singular values
                        struct[k] = type(v)  # type list
                    else:
                        array_dict_types.append(
                            k
                        )  # keep track of keys that are type list[dict]
                        struct[k] = {}
                        struct[k] = get_schema(v, struct[k], array_dict_types)
                        # merge cur_struct and struct[k], remove duplicates
                        struct[k] = merge(struct[k], cur_struct)
                elif isinstance(v, dict):
                    struct[k] = {}
                    struct[k] = get_schema([v], struct[k], array_dict_types)
                    # merge cur_struct and struct[k], remove duplicates
                    struct[k] = merge(struct[k], cur_struct)
                else:
                    # only overwrite when the new value is not None, so a
                    # real type replaces a previously recorded NoneType
                    if v is not None:
                        struct[k] = type(v)

    return struct


def standardize(item, structure, array_dict_types):
    """Standardize all rows/entries in dataset to fit the schema.

    Will look for missing values and fill it in so all rows have
    the same items and structure.
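
    A minimal illustration (hypothetical schema):

        >>> item = {"text": "hi"}
        >>> standardize(item, {"text": str, "label": str}, [])
        >>> item
        {'text': 'hi', 'label': ''}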
    """
    for k, v in structure.items():
        if k not in item:
            # If the structure/field does not exist
            if isinstance(v, dict) and (k not in array_dict_types):
                # If key k is of type dict and is not of type list[dict]
                item[k] = {}
                standardize(item[k], v, array_dict_types)
            elif isinstance(v, dict) and (k in array_dict_types):
                # If key k is of type dict, and is actually of type list[dict],
                # just treat as a list and set to None by default
                item[k] = None
            else:
                # Fill in the type's default value (e.g. "" for str, 0 for int)
                item[k] = v()
        else:
            # If the structure/field already exists and is a list or dict
            if isinstance(item[k], list):
                # Recurse only into lists of dicts; skip nested lists and
                # lists of scalar values.
                is_nested_list = len(item[k]) > 0 and isinstance(item[k][0], list)
                is_scalar_list = len(item[k]) > 0 and not isinstance(
                    item[k][0], (list, dict)
                )
                if not (is_nested_list or is_scalar_list):
                    for sub_item in item[k]:
                        standardize(sub_item, v, array_dict_types)
            elif isinstance(item[k], dict):
                standardize(item[k], v, array_dict_types)


def create_table(data):
    """Create a W&B Table.

    - Create/decode images from URL/Base64
    - Uses spacy to translate NER span data to visualizations.
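
    A usage sketch (assumes spaCy and the ``en_core_web_md`` model are
    installed, and that ``data`` has already been run through
    ``standardize``):

        table = create_table(data)
        wandb.log({"my_dataset": table})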
    """
    # create table object from columns
    table_df = pd.DataFrame(data)
    columns = list(table_df.columns)
    if ("spans" in table_df.columns) and ("text" in table_df.columns):
        columns.append("spans_visual")
    if "image" in columns:
        columns.append("image_visual")
    main_table = wandb.Table(columns=columns)

    # Convert to dictionary format to maintain order during processing
    matrix = table_df.to_dict(orient="records")

    # Import en_core_web_md if it exists
    en_core_web_md = util.get_module(
        "en_core_web_md",
        required="create_table requires the `en_core_web_md` model, install with `python -m spacy download en_core_web_md`",
    )
    nlp = en_core_web_md.load(disable=["ner"])

    # Go through each individual row
    for document in matrix:
        # Text NER span visualizations
        if ("spans_visual" in columns) and ("text" in columns):
            # Add visuals for spans
            document["spans_visual"] = None
            doc = nlp(document["text"])
            ents = []
            if ("spans" in document) and (document["spans"] is not None):
                for span in document["spans"]:
                    if ("start" in span) and ("end" in span) and ("label" in span):
                        charspan = doc.char_span(
                            span["start"], span["end"], span["label"]
                        )
                        # char_span returns None when the span does not align
                        # with token boundaries; skip those to avoid an error
                        # when assigning doc.ents
                        if charspan is not None:
                            ents.append(charspan)
                doc.ents = ents
                document["spans_visual"] = named_entity(docs=doc)

        # Convert image link to wandb Image
        if "image" in columns:
            # Turn into wandb image
            document["image_visual"] = None
            if ("image" in document) and (document["image"] is not None):
                isurl = urllib.parse.urlparse(document["image"]).scheme in (
                    "http",
                    "https",
                )
                isbase64 = ("data:" in document["image"]) and (
                    ";base64" in document["image"]
                )
                if isurl:
                    # is url
                    try:
                        im = Image.open(urllib.request.urlopen(document["image"]))
                        document["image_visual"] = wandb.Image(im)
                    except urllib.error.URLError:
                        wandb.termwarn(f"Image URL {document['image']} is invalid.")
                        document["image_visual"] = None
                elif isbase64:
                    # is base64 uri
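                    # A Base64 data URI looks like
                    # "data:image/png;base64,<payload>"; the payload after
                    # "base64," is the encoded image bytes.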
                    imgb64 = document["image"].split("base64,")[1]
                    try:
                        msg = base64.b64decode(imgb64)
                        buf = io.BytesIO(msg)
                        im = Image.open(buf)
                        document["image_visual"] = wandb.Image(im)
                    except binascii.Error:
                        wandb.termwarn(f"Base64 string {document['image']} is invalid.")
                        document["image_visual"] = None
                else:
                    # is data path
                    document["image_visual"] = wandb.Image(document["image"])

        # Create row and append to table
        values_list = list(document.values())
        main_table.add_data(*values_list)
    return main_table


def upload_dataset(dataset_name):
    """Upload dataset from local database to Weights & Biases.

    Args:
        dataset_name: The name of the dataset in the Prodigy database.
    """
    # Check if wandb.init has been called
    if wandb.run is None:
        raise ValueError("You must call wandb.init() before upload_dataset()")

    with wb_telemetry.context(run=wandb.run) as tel:
        tel.feature.prodigy = True

    prodigy_db = util.get_module(
        "prodigy.components.db",
        required="`prodigy` library is required but not installed. Please see https://prodi.gy/docs/install",
    )
    # Retrieve and upload prodigy dataset
    database = prodigy_db.connect()
    data = database.get_dataset(dataset_name)

    array_dict_types = []
    schema = get_schema(data, {}, array_dict_types)

    # standardize mutates each record in place
    for record in data:
        standardize(record, schema, array_dict_types)
    table = create_table(data)
    wandb.log({dataset_name: table})
    wandb.termlog(f"Prodigy dataset `{dataset_name}` uploaded.")