|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from pyarrow.includes.common cimport * |
|
from pyarrow.includes.libarrow cimport * |
|
|
|
from pyarrow.lib cimport (_Weakrefable, Schema, |
|
RecordBatchReader, MemoryPool, |
|
maybe_unbox_memory_pool, |
|
get_input_stream, pyarrow_wrap_table, |
|
pyarrow_wrap_schema, pyarrow_unwrap_schema) |
|
|
|
|
|
cdef class ReadOptions(_Weakrefable):
    """
    Options for reading JSON files.

    Parameters
    ----------
    use_threads : bool, optional (default True)
        Whether to use multiple threads to accelerate reading.
    block_size : int, optional
        How many bytes to process at a time from the input stream.
        This will determine multi-threading granularity as well as
        the size of individual chunks in the Table.
    """

    # Empty __slots__: instances are not meant to carry ad-hoc attributes.
    __slots__ = ()

    def __init__(self, use_threads=None, block_size=None):
        # Start from the C++ defaults and override only what was supplied,
        # going through the property setters below.
        self.options = CJSONReadOptions.Defaults()
        if use_threads is not None:
            self.use_threads = use_threads
        if block_size is not None:
            self.block_size = block_size

    @property
    def use_threads(self):
        """
        Whether to use multiple threads to accelerate reading.
        """
        return self.options.use_threads

    @use_threads.setter
    def use_threads(self, value):
        self.options.use_threads = value

    @property
    def block_size(self):
        """
        How many bytes to process at a time from the input stream.

        This will determine multi-threading granularity as well as the size of
        individual chunks in the Table.
        """
        return self.options.block_size

    @block_size.setter
    def block_size(self, value):
        self.options.block_size = value

    def __reduce__(self):
        # Pickle as "call the constructor again with the current values".
        return ReadOptions, (self.use_threads, self.block_size)

    def equals(self, ReadOptions other):
        """
        Check whether two ReadOptions hold the same settings.

        Parameters
        ----------
        other : pyarrow.json.ReadOptions

        Returns
        -------
        bool
        """
        return (
            self.use_threads == other.use_threads and
            self.block_size == other.block_size
        )

    def __eq__(self, other):
        # A typed Cython argument still accepts None, so delegating straight
        # to equals() would raise AttributeError for `opts == None` instead
        # of a TypeError. Check the type up front so any foreign object
        # (None included) simply compares unequal.
        if not isinstance(other, ReadOptions):
            return False
        return self.equals(other)

    @staticmethod
    cdef ReadOptions wrap(CJSONReadOptions options):
        # Build a default instance, then replace its C++ options struct
        # with the one handed in.
        cdef ReadOptions wrapped = ReadOptions()
        wrapped.options = options
        return wrapped
|
|
|
|
|
cdef class ParseOptions(_Weakrefable):
    """
    Options for parsing JSON files.

    Parameters
    ----------
    explicit_schema : Schema, optional (default None)
        Optional explicit schema (no type inference, ignores other fields).
    newlines_in_values : bool, optional (default False)
        Whether objects may be printed across multiple lines (for example
        pretty printed). If false, input must end with an empty line.
    unexpected_field_behavior : str, default "infer"
        How JSON fields outside of explicit_schema (if given) are treated.

        Possible behaviors:

        - "ignore": unexpected JSON fields are ignored
        - "error": error out on unexpected JSON fields
        - "infer": unexpected JSON fields are type-inferred and included in
          the output
    """

    # Empty __slots__: instances are not meant to carry ad-hoc attributes.
    __slots__ = ()

    def __init__(self, explicit_schema=None, newlines_in_values=None,
                 unexpected_field_behavior=None):
        # Start from the C++ defaults and override only what was supplied,
        # going through the property setters below (which also validate
        # `unexpected_field_behavior`).
        self.options = CJSONParseOptions.Defaults()
        if explicit_schema is not None:
            self.explicit_schema = explicit_schema
        if newlines_in_values is not None:
            self.newlines_in_values = newlines_in_values
        if unexpected_field_behavior is not None:
            self.unexpected_field_behavior = unexpected_field_behavior

    def __reduce__(self):
        # Pickle as "call the constructor again with the current values".
        return ParseOptions, (
            self.explicit_schema,
            self.newlines_in_values,
            self.unexpected_field_behavior,
        )

    @property
    def explicit_schema(self):
        """
        Optional explicit schema (no type inference, ignores other fields)
        """
        # A null shared_ptr means no schema has been set.
        if self.options.explicit_schema.get() == NULL:
            return None
        return pyarrow_wrap_schema(self.options.explicit_schema)

    @explicit_schema.setter
    def explicit_schema(self, value):
        self.options.explicit_schema = pyarrow_unwrap_schema(value)

    @property
    def newlines_in_values(self):
        """
        Whether newline characters are allowed in JSON values.
        Setting this to True reduces the performance of multi-threaded
        JSON reading.
        """
        return self.options.newlines_in_values

    @newlines_in_values.setter
    def newlines_in_values(self, value):
        self.options.newlines_in_values = value

    @property
    def unexpected_field_behavior(self):
        """
        How JSON fields outside of explicit_schema (if given) are treated.

        Possible behaviors:

        - "ignore": unexpected JSON fields are ignored
        - "error": error out on unexpected JSON fields
        - "infer": unexpected JSON fields are type-inferred and included in
          the output

        Set to "infer" by default.
        """
        # Translate the C++ enum value into its Python string spelling.
        behavior = self.options.unexpected_field_behavior
        if behavior == CUnexpectedFieldBehavior_Ignore:
            return "ignore"
        if behavior == CUnexpectedFieldBehavior_Error:
            return "error"
        if behavior == CUnexpectedFieldBehavior_InferType:
            return "infer"
        raise ValueError('Unexpected value for unexpected_field_behavior')

    @unexpected_field_behavior.setter
    def unexpected_field_behavior(self, value):
        cdef CUnexpectedFieldBehavior v

        # Translate the Python string into the C++ enum value; anything
        # else is rejected before the options struct is touched.
        if value == "ignore":
            v = CUnexpectedFieldBehavior_Ignore
        elif value == "error":
            v = CUnexpectedFieldBehavior_Error
        elif value == "infer":
            v = CUnexpectedFieldBehavior_InferType
        else:
            raise ValueError(
                "Unexpected value `{}` for `unexpected_field_behavior`, pass "
                "either `ignore`, `error` or `infer`.".format(value)
            )

        self.options.unexpected_field_behavior = v

    def equals(self, ParseOptions other):
        """
        Check whether two ParseOptions hold the same settings.

        Parameters
        ----------
        other : pyarrow.json.ParseOptions

        Returns
        -------
        bool
        """
        return (
            self.explicit_schema == other.explicit_schema and
            self.newlines_in_values == other.newlines_in_values and
            self.unexpected_field_behavior == other.unexpected_field_behavior
        )

    def __eq__(self, other):
        # A typed Cython argument still accepts None, so delegating straight
        # to equals() would raise AttributeError for `opts == None` instead
        # of a TypeError. Check the type up front so any foreign object
        # (None included) simply compares unequal.
        if not isinstance(other, ParseOptions):
            return False
        return self.equals(other)

    @staticmethod
    cdef ParseOptions wrap(CJSONParseOptions options):
        # Build a default instance, then replace its C++ options struct
        # with the one handed in.
        cdef ParseOptions wrapped = ParseOptions()
        wrapped.options = options
        return wrapped
|
|
|
|
|
cdef _get_reader(input_file, shared_ptr[CInputStream]* out):
    # Resolve `input_file` into an input stream stored in `out`.
    # Memory mapping is always requested off here.
    get_input_stream(input_file, False, out)
|
|
|
cdef _get_read_options(ReadOptions read_options, CJSONReadOptions* out):
    # Write the C++ read options into `out`: the caller's struct when an
    # options object was given, the library defaults otherwise.
    if read_options is not None:
        out[0] = read_options.options
    else:
        out[0] = CJSONReadOptions.Defaults()
|
|
|
cdef _get_parse_options(ParseOptions parse_options, CJSONParseOptions* out):
    # Write the C++ parse options into `out`: the caller's struct when an
    # options object was given, the library defaults otherwise.
    if parse_options is not None:
        out[0] = parse_options.options
    else:
        out[0] = CJSONParseOptions.Defaults()
|
|
|
|
|
cdef class JSONStreamingReader(RecordBatchReader):
    """An object that reads record batches incrementally from a JSON file.

    Should not be instantiated directly by user code.
    """
    cdef readonly:
        Schema schema

    def __init__(self):
        # Direct construction is refused; open_json() builds instances via
        # __new__ followed by _open().
        cls_name = self.__class__.__name__
        raise TypeError(f"Do not call {cls_name}'s "
                        "constructor directly, "
                        "use pyarrow.json.open_json() instead.")

    cdef _open(self, shared_ptr[CInputStream] stream,
               CJSONReadOptions c_read_options,
               CJSONParseOptions c_parse_options,
               MemoryPool memory_pool):
        # Create the underlying C++ streaming reader on `stream` and expose
        # its schema through the `schema` attribute.
        cdef:
            CIOContext io_ctx
            shared_ptr[CSchema] reader_schema

        io_ctx = CIOContext(maybe_unbox_memory_pool(memory_pool))

        # The reader is created and its schema fetched with the GIL released.
        with nogil:
            self.reader = <shared_ptr[CRecordBatchReader]> GetResultValue(
                CJSONStreamingReader.Make(stream, move(c_read_options),
                                          move(c_parse_options), io_ctx))
            reader_schema = self.reader.get().schema()

        self.schema = pyarrow_wrap_schema(reader_schema)
|
|
|
|
|
def read_json(input_file, read_options=None, parse_options=None,
              MemoryPool memory_pool=None):
    """
    Read a Table from a stream of JSON data.

    Parameters
    ----------
    input_file : str, path or file-like object
        The location of JSON data. Currently only the line-delimited JSON
        format is supported.
    read_options : pyarrow.json.ReadOptions, optional
        Options for the JSON reader (see ReadOptions constructor for defaults).
    parse_options : pyarrow.json.ParseOptions, optional
        Options for the JSON parser
        (see ParseOptions constructor for defaults).
    memory_pool : MemoryPool, optional
        Pool to allocate Table memory from.

    Returns
    -------
    :class:`pyarrow.Table`
        Contents of the JSON file as an in-memory table.
    """
    cdef:
        shared_ptr[CInputStream] stream
        CJSONReadOptions c_read_options
        CJSONParseOptions c_parse_options
        shared_ptr[CJSONReader] json_reader
        shared_ptr[CTable] c_table

    # Turn the Python-level arguments into their C++ counterparts.
    _get_reader(input_file, &stream)
    _get_read_options(read_options, &c_read_options)
    _get_parse_options(parse_options, &c_parse_options)

    json_reader = GetResultValue(
        CJSONReader.Make(maybe_unbox_memory_pool(memory_pool),
                         stream, c_read_options, c_parse_options))

    # The read itself runs with the GIL released.
    with nogil:
        c_table = GetResultValue(json_reader.get().Read())

    return pyarrow_wrap_table(c_table)
|
|
|
|
|
def open_json(input_file, read_options=None, parse_options=None,
              MemoryPool memory_pool=None):
    """
    Open a streaming reader of JSON data.

    Reading using this function is always single-threaded.

    Parameters
    ----------
    input_file : string, path or file-like object
        The location of JSON data. If a string or path, and if it ends
        with a recognized compressed file extension (e.g. ".gz" or ".bz2"),
        the data is automatically decompressed when reading.
    read_options : pyarrow.json.ReadOptions, optional
        Options for the JSON reader (see pyarrow.json.ReadOptions constructor
        for defaults)
    parse_options : pyarrow.json.ParseOptions, optional
        Options for the JSON parser
        (see pyarrow.json.ParseOptions constructor for defaults)
    memory_pool : MemoryPool, optional
        Pool to allocate RecordBatch memory from

    Returns
    -------
    :class:`pyarrow.json.JSONStreamingReader`
    """
    cdef:
        shared_ptr[CInputStream] stream
        CJSONReadOptions c_read_options
        CJSONParseOptions c_parse_options
        JSONStreamingReader streaming_reader

    # Turn the Python-level arguments into their C++ counterparts.
    _get_reader(input_file, &stream)
    _get_read_options(read_options, &c_read_options)
    _get_parse_options(parse_options, &c_parse_options)

    # __new__ is used because JSONStreamingReader.__init__ always raises;
    # _open() then performs the actual initialisation.
    streaming_reader = JSONStreamingReader.__new__(JSONStreamingReader)
    streaming_reader._open(stream, move(c_read_options),
                           move(c_parse_options), memory_pool)
    return streaming_reader
|
|