# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import abc
from collections import OrderedDict
from decimal import Decimal
import io
import itertools
import json
import string
import unittest

try:
    import numpy as np
except ImportError:
    np = None

import pytest

import pyarrow as pa
from pyarrow.json import read_json, open_json, ReadOptions, ParseOptions


def generate_col_names():
    # 'a', 'b'... 'z', then 'aa', 'ab'...
    letters = string.ascii_lowercase
    yield from letters
    for first in letters:
        for second in letters:
            yield first + second


def make_random_json(num_cols=2, num_rows=10, linesep='\r\n'):
    arr = np.random.RandomState(42).randint(0, 1000,
                                            size=(num_cols, num_rows))
    col_names = list(itertools.islice(generate_col_names(), num_cols))
    lines = []
    for row in arr.T:
        json_obj = OrderedDict([(k, int(v))
                                for (k, v) in zip(col_names, row)])
        lines.append(json.dumps(json_obj))
    data = linesep.join(lines).encode()
    columns = [pa.array(col, type=pa.int64()) for col in arr]
    expected = pa.Table.from_arrays(columns, col_names)
    return data, expected


def check_options_class_pickling(cls, pickler, **attr_values):
    opts = cls(**attr_values)
    new_opts = pickler.loads(pickler.dumps(opts,
                                           protocol=pickler.HIGHEST_PROTOCOL))
    for name, value in attr_values.items():
        assert getattr(new_opts, name) == value


def test_read_options(pickle_module):
    cls = ReadOptions
    opts = cls()

    assert opts.block_size > 0
    opts.block_size = 12345
    assert opts.block_size == 12345

    assert opts.use_threads is True
    opts.use_threads = False
    assert opts.use_threads is False

    opts = cls(block_size=1234, use_threads=False)
    assert opts.block_size == 1234
    assert opts.use_threads is False

    check_options_class_pickling(cls, pickler=pickle_module,
                                 block_size=1234,
                                 use_threads=False)


def test_parse_options(pickle_module):
    cls = ParseOptions
    opts = cls()
    assert opts.newlines_in_values is False
    assert opts.explicit_schema is None

    opts.newlines_in_values = True
    assert opts.newlines_in_values is True

    schema = pa.schema([pa.field('foo', pa.int32())])
    opts.explicit_schema = schema
    assert opts.explicit_schema == schema

    assert opts.unexpected_field_behavior == "infer"
    for value in ["ignore", "error", "infer"]:
        opts.unexpected_field_behavior = value
        assert opts.unexpected_field_behavior == value
    with pytest.raises(ValueError):
        opts.unexpected_field_behavior = "invalid-value"

    check_options_class_pickling(cls, pickler=pickle_module,
                                 explicit_schema=schema,
                                 newlines_in_values=False,
                                 unexpected_field_behavior="ignore")
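

# A minimal end-to-end sketch of how the two options classes above
# combine in a read_json call. This example is illustrative only: the
# payload and the column names 'x' and 'extra' are invented here, not
# fixtures used by the suite below.
def test_options_combined_example():
    data = b'{"x": 1, "extra": "a"}\n{"x": 2, "extra": "b"}\n'
    schema = pa.schema([('x', pa.int64())])
    # "ignore" drops fields that are absent from the explicit schema.
    parse_options = ParseOptions(explicit_schema=schema,
                                 unexpected_field_behavior="ignore")
    read_options = ReadOptions(use_threads=False)
    table = read_json(pa.py_buffer(data),
                      read_options=read_options,
                      parse_options=parse_options)
    assert table.schema == schema
    assert table.to_pydict() == {'x': [1, 2]}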


class BaseTestJSON(abc.ABC):
    @abc.abstractmethod
    def read_bytes(self, b, **kwargs):
        """
        :param b: bytes to be parsed
        :param kwargs: arguments passed on to open the json file
        :return: b parsed as a single Table
        """
        raise NotImplementedError

    def check_names(self, table, names):
        assert table.num_columns == len(names)
        assert [c.name for c in table.columns] == names

    def test_block_sizes(self):
        rows = b'{"a": 1}\n{"a": 2}\n{"a": 3}'
        read_options = ReadOptions()
        parse_options = ParseOptions()

        for data in [rows, rows + b'\n']:
            for newlines_in_values in [False, True]:
                parse_options.newlines_in_values = newlines_in_values
                read_options.block_size = 4
                with pytest.raises(ValueError,
                                   match="try to increase block size"):
                    self.read_bytes(data, read_options=read_options,
                                    parse_options=parse_options)

                # Validate reader behavior with various block sizes.
                # There used to be bugs in this area.
                for block_size in range(9, 20):
                    read_options.block_size = block_size
                    table = self.read_bytes(data,
                                            read_options=read_options,
                                            parse_options=parse_options)
                    assert table.to_pydict() == {'a': [1, 2, 3]}

    def test_no_newline_at_end(self):
        rows = b'{"a": 1,"b": 2, "c": 3}\n{"a": 4,"b": 5, "c": 6}'
        table = self.read_bytes(rows)
        assert table.to_pydict() == {
            'a': [1, 4],
            'b': [2, 5],
            'c': [3, 6],
        }

    def test_simple_ints(self):
        # Infer integer columns
        rows = b'{"a": 1,"b": 2, "c": 3}\n{"a": 4,"b": 5, "c": 6}\n'
        table = self.read_bytes(rows)
        schema = pa.schema([('a', pa.int64()),
                            ('b', pa.int64()),
                            ('c', pa.int64())])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': [1, 4],
            'b': [2, 5],
            'c': [3, 6],
        }

    def test_simple_varied(self):
        # Infer various kinds of data
        rows = (b'{"a": 1,"b": 2, "c": "3", "d": false}\n'
                b'{"a": 4.0, "b": -5, "c": "foo", "d": true}\n')
        table = self.read_bytes(rows)
        schema = pa.schema([('a', pa.float64()),
                            ('b', pa.int64()),
                            ('c', pa.string()),
                            ('d', pa.bool_())])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': [1.0, 4.0],
            'b': [2, -5],
            'c': ["3", "foo"],
            'd': [False, True],
        }

    def test_simple_nulls(self):
        # Infer various kinds of data, with nulls
        rows = (b'{"a": 1, "b": 2, "c": null, "d": null, "e": null}\n'
                b'{"a": null, "b": -5, "c": "foo", "d": null, "e": true}\n'
                b'{"a": 4.5, "b": null, "c": "nan", "d": null,"e": false}\n')
        table = self.read_bytes(rows)
        schema = pa.schema([('a', pa.float64()),
                            ('b', pa.int64()),
                            ('c', pa.string()),
                            ('d', pa.null()),
                            ('e', pa.bool_())])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': [1.0, None, 4.5],
            'b': [2, -5, None],
            'c': [None, "foo", "nan"],
            'd': [None, None, None],
            'e': [None, True, False],
        }

    def test_empty_lists(self):
        # ARROW-10955: Infer list(null)
        rows = b'{"a": []}'
        table = self.read_bytes(rows)
        schema = pa.schema([('a', pa.list_(pa.null()))])
        assert table.schema == schema
        assert table.to_pydict() == {'a': [[]]}

    def test_empty_rows(self):
        rows = b'{}\n{}\n'
        table = self.read_bytes(rows)
        schema = pa.schema([])
        assert table.schema == schema
        assert table.num_columns == 0
        assert table.num_rows == 2

    def test_explicit_schema_decimal(self):
        rows = (b'{"a": 1}\n'
                b'{"a": 1.45}\n'
                b'{"a": -23.456}\n'
                b'{}\n')
        expected = {
            'a': [Decimal("1"), Decimal("1.45"), Decimal("-23.456"), None],
        }
        decimal_types = (pa.decimal32, pa.decimal64, pa.decimal128,
                         pa.decimal256)
        for type_factory in decimal_types:
            schema = pa.schema([('a', type_factory(9, 4))])
            opts = ParseOptions(explicit_schema=schema)
            table = self.read_bytes(rows, parse_options=opts)
            assert table.schema == schema
            assert table.to_pydict() == expected
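
    # The next test pins down all three unexpected_field_behavior modes:
    # "infer" (the default) appends inferred columns for fields missing
    # from the explicit schema, "ignore" silently drops them, and
    # "error" raises ArrowInvalid.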

    def test_explicit_schema_with_unexpected_behaviour(self):
        # infer by default
        rows = (b'{"foo": "bar", "num": 0}\n'
                b'{"foo": "baz", "num": 1}\n')
        schema = pa.schema([
            ('foo', pa.binary())
        ])

        opts = ParseOptions(explicit_schema=schema)
        table = self.read_bytes(rows, parse_options=opts)
        assert table.schema == pa.schema([
            ('foo', pa.binary()),
            ('num', pa.int64())
        ])
        assert table.to_pydict() == {
            'foo': [b'bar', b'baz'],
            'num': [0, 1],
        }

        # ignore the unexpected fields
        opts = ParseOptions(explicit_schema=schema,
                            unexpected_field_behavior="ignore")
        table = self.read_bytes(rows, parse_options=opts)
        assert table.schema == pa.schema([
            ('foo', pa.binary()),
        ])
        assert table.to_pydict() == {
            'foo': [b'bar', b'baz'],
        }

        # raise error
        opts = ParseOptions(explicit_schema=schema,
                            unexpected_field_behavior="error")
        with pytest.raises(pa.ArrowInvalid,
                           match="JSON parse error: unexpected field"):
            self.read_bytes(rows, parse_options=opts)

    @pytest.mark.numpy
    def test_small_random_json(self):
        data, expected = make_random_json(num_cols=2, num_rows=10)
        table = self.read_bytes(data)
        assert table.schema == expected.schema
        assert table.equals(expected)
        assert table.to_pydict() == expected.to_pydict()

    @pytest.mark.numpy
    def test_load_large_json(self):
        data, expected = make_random_json(num_cols=2, num_rows=100100)
        # Set the block size to 10 MB
        read_options = ReadOptions(block_size=1024*1024*10)
        table = self.read_bytes(data, read_options=read_options)
        assert table.num_rows == 100100
        assert expected.num_rows == 100100

    @pytest.mark.numpy
    def test_stress_block_sizes(self):
        # Test a number of small block sizes to stress block stitching
        data_base, expected = make_random_json(num_cols=2, num_rows=100)
        read_options = ReadOptions()
        parse_options = ParseOptions()

        for data in [data_base, data_base.rstrip(b'\r\n')]:
            for newlines_in_values in [False, True]:
                parse_options.newlines_in_values = newlines_in_values
                for block_size in [22, 23, 37]:
                    read_options.block_size = block_size
                    table = self.read_bytes(data,
                                            read_options=read_options,
                                            parse_options=parse_options)
                    assert table.schema == expected.schema
                    if not table.equals(expected):
                        # Better error output
                        assert table.to_pydict() == expected.to_pydict()


class BaseTestJSONRead(BaseTestJSON):

    def read_bytes(self, b, **kwargs):
        return self.read_json(pa.py_buffer(b), **kwargs)

    def test_file_object(self):
        data = b'{"a": 1, "b": 2}\n'
        expected_data = {'a': [1], 'b': [2]}
        bio = io.BytesIO(data)
        table = self.read_json(bio)
        assert table.to_pydict() == expected_data

        # Text files not allowed
        sio = io.StringIO(data.decode())
        with pytest.raises(TypeError):
            self.read_json(sio)

    def test_reconcile_across_blocks(self):
        # ARROW-12065: reconciling inferred types across blocks
        first_row = b'{ }\n'
        read_options = ReadOptions(block_size=len(first_row))
        for next_rows, expected_pylist in [
                (b'{"a": 0}', [None, 0]),
                (b'{"a": []}', [None, []]),
                (b'{"a": []}\n{"a": [[1]]}', [None, [], [[1]]]),
                (b'{"a": {}}', [None, {}]),
                (b'{"a": {}}\n{"a": {"b": {"c": 1}}}',
                 [None, {"b": None}, {"b": {"c": 1}}]),
        ]:
            table = self.read_bytes(first_row + next_rows,
                                    read_options=read_options)
            expected = {"a": expected_pylist}
            assert table.to_pydict() == expected
            # Check that the issue was exercised
            assert table.column("a").num_chunks > 1
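

# The streaming suites below build on open_json, which yields record
# batches incrementally instead of materializing a single Table. A
# minimal sketch of that API (the sample payload and the column name
# 'v' are invented here for illustration):
def test_open_json_streaming_example():
    data = b'{"v": 1}\n{"v": 2}\n'
    reader = open_json(pa.py_buffer(data),
                       read_options=ReadOptions(use_threads=False))
    # read_all() drains the remaining batches into a single Table.
    table = reader.read_all()
    assert table.to_pydict() == {'v': [1, 2]}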


class BaseTestStreamingJSONRead(BaseTestJSON):
    def open_json(self, json, *args, **kwargs):
        """
        Reads the JSON file into memory using pyarrow's open_json.

        json
            The JSON bytes
        args
            Positional arguments to be forwarded to pyarrow's open_json
        kwargs
            Keyword arguments to be forwarded to pyarrow's open_json
        """
        read_options = kwargs.setdefault('read_options', ReadOptions())
        read_options.use_threads = self.use_threads
        return open_json(json, *args, **kwargs)

    def open_bytes(self, b, **kwargs):
        return self.open_json(pa.py_buffer(b), **kwargs)

    def check_reader(self, reader, expected_schema, expected_data):
        assert reader.schema == expected_schema
        batches = list(reader)
        assert len(batches) == len(expected_data)
        for batch, expected_batch in zip(batches, expected_data):
            batch.validate(full=True)
            assert batch.schema == expected_schema
            assert batch.to_pydict() == expected_batch

    def read_bytes(self, b, **kwargs):
        return self.open_bytes(b, **kwargs).read_all()

    def test_file_object(self):
        data = b'{"a": 1, "b": 2}\n'
        expected_data = {'a': [1], 'b': [2]}
        bio = io.BytesIO(data)
        reader = self.open_json(bio)
        expected_schema = pa.schema([('a', pa.int64()),
                                     ('b', pa.int64())])
        self.check_reader(reader, expected_schema, [expected_data])

    def test_bad_first_chunk(self):
        bad_first_chunk = b'{"i": 0 }\n{"i": 1}'
        read_options = ReadOptions()
        read_options.block_size = 3
        with pytest.raises(
                pa.ArrowInvalid,
                match="straddling object straddles two block boundaries*"):
            self.open_bytes(bad_first_chunk, read_options=read_options)

    def test_bad_middle_chunk(self):
        bad_middle_chunk = b'{"i": 0}\n{"i": 1}\n{"i": 2}'
        read_options = ReadOptions()
        read_options.block_size = 10
        expected_schema = pa.schema([('i', pa.int64())])

        reader = self.open_bytes(bad_middle_chunk,
                                 read_options=read_options)
        assert reader.schema == expected_schema
        assert reader.read_next_batch().to_pydict() == {
            'i': [0]
        }
        with pytest.raises(
                pa.ArrowInvalid,
                match="straddling object straddles two block boundaries*"):
            reader.read_next_batch()
        with pytest.raises(StopIteration):
            reader.read_next_batch()

    def test_bad_first_parse(self):
        bad_first_block = b'{"n": }\n{"n": 10000}'
        read_options = ReadOptions()
        read_options.block_size = 16
        with pytest.raises(pa.ArrowInvalid,
                           match="JSON parse error: Invalid value.*"):
            self.open_bytes(bad_first_block, read_options=read_options)

    def test_bad_middle_parse_after_empty(self):
        bad_first_block = b'{ }{"n": }\n{"n": 10000}'
        read_options = ReadOptions()
        read_options.block_size = 16
        with pytest.raises(pa.ArrowInvalid,
                           match="JSON parse error: Invalid value.*"):
            self.open_bytes(bad_first_block, read_options=read_options)

    def test_bad_middle_parse(self):
        bad_middle_chunk = b'{"n": 1000}\n{"n": 200 00}\n{"n": 3000}'
        read_options = ReadOptions()
        read_options.block_size = 10
        expected_schema = pa.schema([('n', pa.int64())])

        reader = self.open_bytes(bad_middle_chunk,
                                 read_options=read_options)
        assert reader.schema == expected_schema
        assert reader.read_next_batch().to_pydict() == {
            'n': [1000]
        }
        with pytest.raises(
                pa.ArrowInvalid,
                match="JSON parse error: Missing a comma or '}' "
                      "after an object member*"):
            reader.read_next_batch()
        with pytest.raises(StopIteration):
            reader.read_next_batch()

    def test_non_linewise_chunker_first_block(self):
        bad_middle_chunk = b'{"n": 0}{1}\n{"n": 2}'
        read_options = ReadOptions(block_size=10)
        parse_options = ParseOptions(newlines_in_values=True)
        expected_schema = pa.schema([('n', pa.int64())])

        reader = self.open_bytes(bad_middle_chunk,
                                 read_options=read_options,
                                 parse_options=parse_options)
        assert reader.schema == expected_schema
        assert reader.read_next_batch().to_pydict() == {
            'n': [0]
        }
        with pytest.raises(pa.ArrowInvalid,
                           match="JSON parse error *"):
            reader.read_next_batch()
        with pytest.raises(StopIteration):
            reader.read_next_batch()
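
    # With newlines_in_values=True the reader cannot split blocks on
    # newline boundaries alone, so the chunker must complete objects
    # that straddle blocks. The next test checks that a malformed object
    # in a middle block still fails cleanly after the earlier batches
    # have been yielded.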

    def test_non_linewise_chunker_bad_middle_block(self):
        bad_middle_chunk = b'{"n": 0}\n{"n": 1}\n{}"n":2}\n{"n": 3}'
        read_options = ReadOptions(block_size=10)
        parse_options = ParseOptions(newlines_in_values=True)
        expected_schema = pa.schema([('n', pa.int64())])

        reader = self.open_bytes(bad_middle_chunk,
                                 read_options=read_options,
                                 parse_options=parse_options)
        assert reader.schema == expected_schema
        assert reader.read_next_batch().to_pydict() == {
            'n': [0]
        }
        assert reader.read_next_batch().to_pydict() == {
            'n': [1]
        }
        with pytest.raises(pa.ArrowInvalid,
                           match="JSON parse error *"):
            reader.read_next_batch()
        with pytest.raises(StopIteration):
            reader.read_next_batch()

    def test_ignore_leading_empty_blocks(self):
        leading_empty_chunk = b' \n{"b": true, "s": "foo"}'
        explicit_schema = pa.schema([
            ('b', pa.bool_()),
            ('s', pa.utf8())
        ])
        read_options = ReadOptions(block_size=24)
        parse_options = ParseOptions(explicit_schema=explicit_schema)
        expected_data = {
            'b': [True], 's': ["foo"]
        }

        reader = self.open_bytes(leading_empty_chunk,
                                 read_options=read_options,
                                 parse_options=parse_options)
        self.check_reader(reader, explicit_schema, [expected_data])

    def test_inference(self):
        rows = (b'{"a": 0, "b": "foo" }\n'
                b'{"a": 1, "c": true }\n'
                b'{"a": 2, "d": 4.0}')
        expected_schema = pa.schema([
            ('a', pa.int64()),
            ('b', pa.utf8())
        ])
        expected_data = {'a': [0], 'b': ["foo"]}

        read_options = ReadOptions(block_size=32)
        parse_options = ParseOptions(unexpected_field_behavior="infer")
        reader = self.open_bytes(rows,
                                 read_options=read_options,
                                 parse_options=parse_options)
        assert reader.schema == expected_schema
        assert reader.read_next_batch().to_pydict() == expected_data
        with pytest.raises(pa.ArrowInvalid,
                           match="JSON parse error: unexpected field"):
            reader.read_next_batch()

        expected_schema = pa.schema([
            ('a', pa.int64()),
            ('b', pa.utf8()),
            ('c', pa.bool_()),
        ])
        expected_data = {'a': [0, 1],
                         'b': ["foo", None],
                         'c': [None, True]}

        read_options = ReadOptions(block_size=64)
        reader = self.open_bytes(rows,
                                 read_options=read_options,
                                 parse_options=parse_options)
        assert reader.schema == expected_schema
        assert reader.read_next_batch().to_pydict() == expected_data
        with pytest.raises(pa.ArrowInvalid,
                           match="JSON parse error: unexpected field"):
            reader.read_next_batch()

        expected_schema = pa.schema([
            ('a', pa.int64()),
            ('b', pa.utf8()),
            ('c', pa.bool_()),
            ('d', pa.float64()),
        ])
        expected_data = {'a': [0, 1, 2],
                         'b': ["foo", None, None],
                         'c': [None, True, None],
                         'd': [None, None, 4.0]}

        read_options = ReadOptions(block_size=96)
        reader = self.open_bytes(rows,
                                 read_options=read_options,
                                 parse_options=parse_options)
        assert reader.schema == expected_schema
        assert reader.read_next_batch().to_pydict() == expected_data
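

# The concrete test classes below run the shared suites above in four
# configurations: one-shot reads via read_json (serial and threaded)
# and streaming reads via open_json (serial and threaded).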


class TestSerialJSONRead(BaseTestJSONRead, unittest.TestCase):
    def read_json(self, *args, **kwargs):
        read_options = kwargs.setdefault('read_options', ReadOptions())
        read_options.use_threads = False
        table = read_json(*args, **kwargs)
        table.validate(full=True)
        return table


class TestParallelJSONRead(BaseTestJSONRead, unittest.TestCase):
    def read_json(self, *args, **kwargs):
        read_options = kwargs.setdefault('read_options', ReadOptions())
        read_options.use_threads = True
        table = read_json(*args, **kwargs)
        table.validate(full=True)
        return table


class TestSerialStreamingJSONRead(BaseTestStreamingJSONRead,
                                  unittest.TestCase):
    use_threads = False


@pytest.mark.threading
class TestThreadedStreamingJSONRead(BaseTestStreamingJSONRead,
                                    unittest.TestCase):
    use_threads = True