from cython.operator cimport dereference as deref |
|
|
|
from collections import namedtuple |
|
from collections.abc import Mapping |
|
|
|
from pyarrow.includes.common cimport * |
|
from pyarrow.includes.libarrow cimport * |
|
from pyarrow.includes.libarrow_python cimport * |
|
from pyarrow.lib cimport (check_status, Field, MemoryPool, Schema, |
|
RecordBatchReader, ensure_type, |
|
maybe_unbox_memory_pool, get_input_stream, |
|
get_writer, native_transcoding_input_stream, |
|
pyarrow_unwrap_batch, pyarrow_unwrap_schema, |
|
pyarrow_unwrap_table, pyarrow_wrap_schema, |
|
pyarrow_wrap_table, pyarrow_wrap_data_type, |
|
pyarrow_unwrap_data_type, Table, RecordBatch, |
|
StopToken, _CRecordBatchWriter) |
|
from pyarrow.lib import frombytes, tobytes, SignalStopHandler |
|
|
|
|
|
cdef unsigned char _single_char(s) except 0: |
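    # Validate a 1-character string and return its ASCII code point;
    # NUL and non-ASCII characters cannot be stored in the single-byte
    # C++ option fields, so reject them.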
|
val = ord(s) |
|
if val == 0 or val > 127: |
|
raise ValueError("Expecting an ASCII character") |
|
return <unsigned char> val |
|
|
|
|
|
_InvalidRow = namedtuple( |
|
"_InvalidRow", ("expected_columns", "actual_columns", "number", "text"), |
|
module=__name__) |
|
|
|
|
|
class InvalidRow(_InvalidRow): |
|
""" |
|
Description of an invalid row in a CSV file. |
|
|
|
Parameters |
|
---------- |
|
expected_columns : int |
|
The expected number of columns in the row. |
|
actual_columns : int |
|
The actual number of columns in the row. |
|
number : int or None |
|
The physical row number if known, otherwise None. |
|
text : str |
|
The contents of the row. |
|
""" |
|
__slots__ = () |
|
|
|
|
|
cdef CInvalidRowResult _handle_invalid_row( |
|
handler, const CCSVInvalidRow& c_row) except CInvalidRowResult_Error: |
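    # Bridge the C++ invalid-row callback to the Python handler: build an
    # InvalidRow namedtuple, call the handler, and map its 'error'/'skip'
    # return value back to the C++ result enum.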
|
|
|
row_number = c_row.number if c_row.number >= 0 else None |
|
row = InvalidRow(c_row.expected_columns, c_row.actual_columns, |
|
row_number, frombytes(<c_string> c_row.text)) |
|
result = handler(row) |
|
if result == 'error': |
|
return CInvalidRowResult_Error |
|
elif result == 'skip': |
|
return CInvalidRowResult_Skip |
|
else: |
|
raise ValueError("Invalid return value for invalid row handler: " |
|
f"expected 'error' or 'skip', got {result!r}") |
|
|
|
|
|
cdef class ReadOptions(_Weakrefable): |
|
""" |
|
Options for reading CSV files. |
|
|
|
Parameters |
|
---------- |
|
use_threads : bool, optional (default True) |
|
        Whether to use multiple threads to accelerate reading.
|
block_size : int, optional |
|
        How many bytes to process at a time from the input stream.
|
This will determine multi-threading granularity as well as |
|
the size of individual record batches or table chunks. |
|
        The minimum valid value for block size is 1.
|
skip_rows : int, optional (default 0) |
|
The number of rows to skip before the column names (if any) |
|
and the CSV data. |
|
skip_rows_after_names : int, optional (default 0) |
|
The number of rows to skip after the column names. |
|
This number can be larger than the number of rows in one |
|
block, and empty rows are counted. |
|
The order of application is as follows: |
|
- `skip_rows` is applied (if non-zero); |
|
- column names are read (unless `column_names` is set); |
|
- `skip_rows_after_names` is applied (if non-zero). |
|
column_names : list, optional |
|
The column names of the target table. If empty, fall back on |
|
`autogenerate_column_names`. |
|
autogenerate_column_names : bool, optional (default False) |
|
Whether to autogenerate column names if `column_names` is empty. |
|
If true, column names will be of the form "f0", "f1"... |
|
If false, column names will be read from the first CSV row |
|
after `skip_rows`. |
|
encoding : str, optional (default 'utf8') |
|
The character encoding of the CSV data. Columns that cannot |
|
decode using this encoding can still be read as Binary. |
|
|
|
Examples |
|
-------- |
|
|
|
    Defining example data:
|
|
|
>>> import io |
|
>>> s = "1,2,3\\nFlamingo,2,2022-03-01\\nHorse,4,2022-03-02\\nBrittle stars,5,2022-03-03\\nCentipede,100,2022-03-04" |
|
>>> print(s) |
|
1,2,3 |
|
Flamingo,2,2022-03-01 |
|
Horse,4,2022-03-02 |
|
Brittle stars,5,2022-03-03 |
|
Centipede,100,2022-03-04 |
|
|
|
Ignore the first numbered row and substitute it with defined |
|
or autogenerated column names: |
|
|
|
>>> from pyarrow import csv |
|
>>> read_options = csv.ReadOptions( |
|
... column_names=["animals", "n_legs", "entry"], |
|
... skip_rows=1) |
|
>>> csv.read_csv(io.BytesIO(s.encode()), read_options=read_options) |
|
pyarrow.Table |
|
animals: string |
|
n_legs: int64 |
|
entry: date32[day] |
|
---- |
|
animals: [["Flamingo","Horse","Brittle stars","Centipede"]] |
|
n_legs: [[2,4,5,100]] |
|
entry: [[2022-03-01,2022-03-02,2022-03-03,2022-03-04]] |
|
|
|
>>> read_options = csv.ReadOptions(autogenerate_column_names=True, |
|
... skip_rows=1) |
|
>>> csv.read_csv(io.BytesIO(s.encode()), read_options=read_options) |
|
pyarrow.Table |
|
f0: string |
|
f1: int64 |
|
f2: date32[day] |
|
---- |
|
f0: [["Flamingo","Horse","Brittle stars","Centipede"]] |
|
f1: [[2,4,5,100]] |
|
f2: [[2022-03-01,2022-03-02,2022-03-03,2022-03-04]] |
|
|
|
Remove the first 2 rows of the data: |
|
|
|
>>> read_options = csv.ReadOptions(skip_rows_after_names=2) |
|
>>> csv.read_csv(io.BytesIO(s.encode()), read_options=read_options) |
|
pyarrow.Table |
|
1: string |
|
2: int64 |
|
3: date32[day] |
|
---- |
|
1: [["Brittle stars","Centipede"]] |
|
2: [[5,100]] |
|
3: [[2022-03-03,2022-03-04]] |
|
""" |
|
|
|
|
|
__slots__ = () |
|
|
|
|
|
    def __cinit__(self, *args, **kwargs):
|
self.options.reset(new CCSVReadOptions(CCSVReadOptions.Defaults())) |
|
|
|
def __init__(self, *, use_threads=None, block_size=None, skip_rows=None, |
|
skip_rows_after_names=None, column_names=None, |
|
autogenerate_column_names=None, encoding='utf8'): |
|
if use_threads is not None: |
|
self.use_threads = use_threads |
|
if block_size is not None: |
|
self.block_size = block_size |
|
if skip_rows is not None: |
|
self.skip_rows = skip_rows |
|
if skip_rows_after_names is not None: |
|
self.skip_rows_after_names = skip_rows_after_names |
|
if column_names is not None: |
|
self.column_names = column_names |
|
if autogenerate_column_names is not None: |
|
            self.autogenerate_column_names = autogenerate_column_names
|
|
|
self.encoding = encoding |
|
|
|
@property |
|
def use_threads(self): |
|
""" |
|
Whether to use multiple threads to accelerate reading. |
|
""" |
|
return deref(self.options).use_threads |
|
|
|
@use_threads.setter |
|
def use_threads(self, value): |
|
deref(self.options).use_threads = value |
|
|
|
@property |
|
def block_size(self): |
|
""" |
|
        How many bytes to process at a time from the input stream.
|
This will determine multi-threading granularity as well as |
|
the size of individual record batches or table chunks. |
|
""" |
|
return deref(self.options).block_size |
|
|
|
@block_size.setter |
|
def block_size(self, value): |
|
deref(self.options).block_size = value |
|
|
|
@property |
|
def skip_rows(self): |
|
""" |
|
The number of rows to skip before the column names (if any) |
|
and the CSV data. |
|
        See `skip_rows_after_names` for how the two options interact.
|
""" |
|
return deref(self.options).skip_rows |
|
|
|
@skip_rows.setter |
|
def skip_rows(self, value): |
|
deref(self.options).skip_rows = value |
|
|
|
@property |
|
def skip_rows_after_names(self): |
|
""" |
|
The number of rows to skip after the column names. |
|
This number can be larger than the number of rows in one |
|
block, and empty rows are counted. |
|
The order of application is as follows: |
|
- `skip_rows` is applied (if non-zero); |
|
- column names are read (unless `column_names` is set); |
|
- `skip_rows_after_names` is applied (if non-zero). |
|
""" |
|
return deref(self.options).skip_rows_after_names |
|
|
|
@skip_rows_after_names.setter |
|
def skip_rows_after_names(self, value): |
|
deref(self.options).skip_rows_after_names = value |
|
|
|
@property |
|
def column_names(self): |
|
""" |
|
The column names of the target table. If empty, fall back on |
|
`autogenerate_column_names`. |
|
""" |
|
return [frombytes(s) for s in deref(self.options).column_names] |
|
|
|
@column_names.setter |
|
def column_names(self, value): |
|
deref(self.options).column_names.clear() |
|
for item in value: |
|
deref(self.options).column_names.push_back(tobytes(item)) |
|
|
|
@property |
|
def autogenerate_column_names(self): |
|
""" |
|
Whether to autogenerate column names if `column_names` is empty. |
|
If true, column names will be of the form "f0", "f1"... |
|
If false, column names will be read from the first CSV row |
|
after `skip_rows`. |
|
""" |
|
return deref(self.options).autogenerate_column_names |
|
|
|
@autogenerate_column_names.setter |
|
def autogenerate_column_names(self, value): |
|
deref(self.options).autogenerate_column_names = value |
|
|
|
def validate(self): |
|
check_status(deref(self.options).Validate()) |
|
|
|
def equals(self, ReadOptions other): |
|
""" |
|
Parameters |
|
---------- |
|
other : pyarrow.csv.ReadOptions |
|
|
|
Returns |
|
------- |
|
bool |
|
""" |
|
return ( |
|
self.use_threads == other.use_threads and |
|
self.block_size == other.block_size and |
|
self.skip_rows == other.skip_rows and |
|
self.skip_rows_after_names == other.skip_rows_after_names and |
|
self.column_names == other.column_names and |
|
self.autogenerate_column_names == |
|
other.autogenerate_column_names and |
|
self.encoding == other.encoding |
|
) |
|
|
|
@staticmethod |
|
cdef ReadOptions wrap(CCSVReadOptions options): |
|
out = ReadOptions() |
|
out.options.reset(new CCSVReadOptions(move(options))) |
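        # The C++ options struct does not carry the encoding (transcoding
        # happens before parsing), so assume the default UTF-8 here.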
|
out.encoding = 'utf8' |
|
return out |
|
|
|
def __getstate__(self): |
|
return (self.use_threads, self.block_size, self.skip_rows, |
|
self.column_names, self.autogenerate_column_names, |
|
self.encoding, self.skip_rows_after_names) |
|
|
|
def __setstate__(self, state): |
|
(self.use_threads, self.block_size, self.skip_rows, |
|
self.column_names, self.autogenerate_column_names, |
|
self.encoding, self.skip_rows_after_names) = state |
|
|
|
def __eq__(self, other): |
|
try: |
|
return self.equals(other) |
|
except TypeError: |
|
return False |
|
|
|
|
|
cdef class ParseOptions(_Weakrefable): |
|
""" |
|
Options for parsing CSV files. |
|
|
|
Parameters |
|
---------- |
|
delimiter : 1-character string, optional (default ',') |
|
The character delimiting individual cells in the CSV data. |
|
quote_char : 1-character string or False, optional (default '"') |
|
The character used optionally for quoting CSV values |
|
(False if quoting is not allowed). |
|
double_quote : bool, optional (default True) |
|
Whether two quotes in a quoted CSV value denote a single quote |
|
in the data. |
|
escape_char : 1-character string or False, optional (default False) |
|
The character used optionally for escaping special characters |
|
(False if escaping is not allowed). |
|
newlines_in_values : bool, optional (default False) |
|
Whether newline characters are allowed in CSV values. |
|
Setting this to True reduces the performance of multi-threaded |
|
CSV reading. |
|
ignore_empty_lines : bool, optional (default True) |
|
Whether empty lines are ignored in CSV input. |
|
If False, an empty line is interpreted as containing a single empty |
|
value (assuming a one-column CSV file). |
|
invalid_row_handler : callable, optional (default None) |
|
If not None, this object is called for each CSV row that fails |
|
parsing (because of a mismatching number of columns). |
|
It should accept a single InvalidRow argument and return either |
|
"skip" or "error" depending on the desired outcome. |
|
|
|
Examples |
|
-------- |
|
|
|
    Defining an example file from a bytes object:
|
|
|
>>> import io |
|
>>> s = ( |
|
... "animals;n_legs;entry\\n" |
|
... "Flamingo;2;2022-03-01\\n" |
|
... "# Comment here:\\n" |
|
... "Horse;4;2022-03-02\\n" |
|
... "Brittle stars;5;2022-03-03\\n" |
|
... "Centipede;100;2022-03-04" |
|
... ) |
|
>>> print(s) |
|
animals;n_legs;entry |
|
Flamingo;2;2022-03-01 |
|
# Comment here: |
|
Horse;4;2022-03-02 |
|
Brittle stars;5;2022-03-03 |
|
Centipede;100;2022-03-04 |
|
>>> source = io.BytesIO(s.encode()) |
|
|
|
Read the data from a file skipping rows with comments |
|
and defining the delimiter: |
|
|
|
>>> from pyarrow import csv |
|
>>> def skip_comment(row): |
|
... if row.text.startswith("# "): |
|
... return 'skip' |
|
... else: |
|
... return 'error' |
|
... |
|
>>> parse_options = csv.ParseOptions(delimiter=";", invalid_row_handler=skip_comment) |
|
>>> csv.read_csv(source, parse_options=parse_options) |
|
pyarrow.Table |
|
animals: string |
|
n_legs: int64 |
|
entry: date32[day] |
|
---- |
|
animals: [["Flamingo","Horse","Brittle stars","Centipede"]] |
|
n_legs: [[2,4,5,100]] |
|
entry: [[2022-03-01,2022-03-02,2022-03-03,2022-03-04]] |
|
""" |
|
__slots__ = () |
|
|
|
    def __cinit__(self, *args, **kwargs):
|
self._invalid_row_handler = None |
|
self.options.reset(new CCSVParseOptions(CCSVParseOptions.Defaults())) |
|
|
|
def __init__(self, *, delimiter=None, quote_char=None, double_quote=None, |
|
escape_char=None, newlines_in_values=None, |
|
ignore_empty_lines=None, invalid_row_handler=None): |
|
if delimiter is not None: |
|
self.delimiter = delimiter |
|
if quote_char is not None: |
|
self.quote_char = quote_char |
|
if double_quote is not None: |
|
self.double_quote = double_quote |
|
if escape_char is not None: |
|
self.escape_char = escape_char |
|
if newlines_in_values is not None: |
|
self.newlines_in_values = newlines_in_values |
|
if ignore_empty_lines is not None: |
|
self.ignore_empty_lines = ignore_empty_lines |
|
if invalid_row_handler is not None: |
|
self.invalid_row_handler = invalid_row_handler |
|
|
|
@property |
|
def delimiter(self): |
|
""" |
|
The character delimiting individual cells in the CSV data. |
|
""" |
|
return chr(deref(self.options).delimiter) |
|
|
|
@delimiter.setter |
|
def delimiter(self, value): |
|
deref(self.options).delimiter = _single_char(value) |
|
|
|
@property |
|
def quote_char(self): |
|
""" |
|
The character used optionally for quoting CSV values |
|
(False if quoting is not allowed). |
|
""" |
|
if deref(self.options).quoting: |
|
return chr(deref(self.options).quote_char) |
|
else: |
|
return False |
|
|
|
@quote_char.setter |
|
def quote_char(self, value): |
|
if value is False: |
|
deref(self.options).quoting = False |
|
else: |
|
deref(self.options).quote_char = _single_char(value) |
|
deref(self.options).quoting = True |
|
|
|
@property |
|
def double_quote(self): |
|
""" |
|
Whether two quotes in a quoted CSV value denote a single quote |
|
in the data. |
|
""" |
|
return deref(self.options).double_quote |
|
|
|
@double_quote.setter |
|
def double_quote(self, value): |
|
deref(self.options).double_quote = value |
|
|
|
@property |
|
def escape_char(self): |
|
""" |
|
The character used optionally for escaping special characters |
|
(False if escaping is not allowed). |
|
""" |
|
if deref(self.options).escaping: |
|
return chr(deref(self.options).escape_char) |
|
else: |
|
return False |
|
|
|
@escape_char.setter |
|
def escape_char(self, value): |
|
if value is False: |
|
deref(self.options).escaping = False |
|
else: |
|
deref(self.options).escape_char = _single_char(value) |
|
deref(self.options).escaping = True |
|
|
|
@property |
|
def newlines_in_values(self): |
|
""" |
|
Whether newline characters are allowed in CSV values. |
|
Setting this to True reduces the performance of multi-threaded |
|
CSV reading. |
|
""" |
|
return deref(self.options).newlines_in_values |
|
|
|
@newlines_in_values.setter |
|
def newlines_in_values(self, value): |
|
deref(self.options).newlines_in_values = value |
|
|
|
@property |
|
def ignore_empty_lines(self): |
|
""" |
|
Whether empty lines are ignored in CSV input. |
|
If False, an empty line is interpreted as containing a single empty |
|
value (assuming a one-column CSV file). |
|
""" |
|
return deref(self.options).ignore_empty_lines |
|
|
|
    @ignore_empty_lines.setter
    def ignore_empty_lines(self, value):
        deref(self.options).ignore_empty_lines = value

    @property
    def invalid_row_handler(self):
        """
        Optional handler for invalid rows.

        If not None, this object is called for each CSV row that fails
        parsing (because of a mismatching number of columns).
        It should accept a single InvalidRow argument and return either
        "skip" or "error" depending on the desired outcome.
        """
        return self._invalid_row_handler

    @invalid_row_handler.setter
    def invalid_row_handler(self, value):
        if value is not None and not callable(value):
            raise TypeError("Expected callable or None, "
                            f"got instance of {type(value)!r}")
        self._invalid_row_handler = value
        deref(self.options).invalid_row_handler = MakeInvalidRowHandler(
            <function[PyInvalidRowCallback]> &_handle_invalid_row, value)
|
|
|
def validate(self): |
|
check_status(deref(self.options).Validate()) |
|
|
|
def equals(self, ParseOptions other): |
|
""" |
|
Parameters |
|
---------- |
|
other : pyarrow.csv.ParseOptions |
|
|
|
Returns |
|
------- |
|
bool |
|
""" |
|
return ( |
|
self.delimiter == other.delimiter and |
|
self.quote_char == other.quote_char and |
|
self.double_quote == other.double_quote and |
|
self.escape_char == other.escape_char and |
|
self.newlines_in_values == other.newlines_in_values and |
|
self.ignore_empty_lines == other.ignore_empty_lines and |
|
self._invalid_row_handler == other._invalid_row_handler |
|
) |
|
|
|
@staticmethod |
|
cdef ParseOptions wrap(CCSVParseOptions options): |
|
out = ParseOptions() |
|
out.options.reset(new CCSVParseOptions(move(options))) |
|
return out |
|
|
|
def __getstate__(self): |
|
return (self.delimiter, self.quote_char, self.double_quote, |
|
self.escape_char, self.newlines_in_values, |
|
self.ignore_empty_lines, self.invalid_row_handler) |
|
|
|
def __setstate__(self, state): |
|
(self.delimiter, self.quote_char, self.double_quote, |
|
self.escape_char, self.newlines_in_values, |
|
self.ignore_empty_lines, self.invalid_row_handler) = state |
|
|
|
def __eq__(self, other): |
|
try: |
|
return self.equals(other) |
|
except TypeError: |
|
return False |
|
|
|
|
|
cdef class _ISO8601(_Weakrefable): |
|
""" |
|
A special object indicating ISO-8601 parsing. |
|
""" |
|
__slots__ = () |
|
|
|
def __str__(self): |
|
return 'ISO8601' |
|
|
|
def __eq__(self, other): |
|
return isinstance(other, _ISO8601) |
|
|
|
|
|
ISO8601 = _ISO8601() |
|
|
|
|
|
cdef class ConvertOptions(_Weakrefable): |
|
""" |
|
Options for converting CSV data. |
|
|
|
Parameters |
|
---------- |
|
check_utf8 : bool, optional (default True) |
|
Whether to check UTF8 validity of string columns. |
|
column_types : pyarrow.Schema or dict, optional |
|
Explicitly map column names to column types. Passing this argument |
|
disables type inference on the defined columns. |
|
null_values : list, optional |
|
A sequence of strings that denote nulls in the data |
|
(defaults are appropriate in most cases). Note that by default, |
|
string columns are not checked for null values. To enable |
|
null checking for those, specify ``strings_can_be_null=True``. |
|
true_values : list, optional |
|
A sequence of strings that denote true booleans in the data |
|
(defaults are appropriate in most cases). |
|
false_values : list, optional |
|
A sequence of strings that denote false booleans in the data |
|
(defaults are appropriate in most cases). |
|
decimal_point : 1-character string, optional (default '.') |
|
The character used as decimal point in floating-point and decimal |
|
data. |
|
strings_can_be_null : bool, optional (default False) |
|
Whether string / binary columns can have null values. |
|
If true, then strings in null_values are considered null for |
|
string columns. |
|
If false, then all strings are valid string values. |
|
quoted_strings_can_be_null : bool, optional (default True) |
|
Whether quoted values can be null. |
|
If true, then strings in "null_values" are also considered null |
|
when they appear quoted in the CSV file. Otherwise, quoted values |
|
are never considered null. |
|
include_columns : list, optional |
|
The names of columns to include in the Table. |
|
If empty, the Table will include all columns from the CSV file. |
|
If not empty, only these columns will be included, in this order. |
|
include_missing_columns : bool, optional (default False) |
|
If false, columns in `include_columns` but not in the CSV file will |
|
error out. |
|
If true, columns in `include_columns` but not in the CSV file will |
|
produce a column of nulls (whose type is selected using |
|
`column_types`, or null by default). |
|
This option is ignored if `include_columns` is empty. |
|
auto_dict_encode : bool, optional (default False) |
|
Whether to try to automatically dict-encode string / binary data. |
|
If true, then when type inference detects a string or binary column, |
|
        it is dict-encoded up to `auto_dict_max_cardinality` distinct values
|
(per chunk), after which it switches to regular encoding. |
|
This setting is ignored for non-inferred columns (those in |
|
`column_types`). |
|
auto_dict_max_cardinality : int, optional |
|
The maximum dictionary cardinality for `auto_dict_encode`. |
|
This value is per chunk. |
|
timestamp_parsers : list, optional |
|
A sequence of strptime()-compatible format strings, tried in order |
|
when attempting to infer or convert timestamp values (the special |
|
value ISO8601() can also be given). By default, a fast built-in |
|
ISO-8601 parser is used. |
|
|
|
Examples |
|
-------- |
|
|
|
    Defining example data:
|
|
|
>>> import io |
|
>>> s = ( |
|
... "animals,n_legs,entry,fast\\n" |
|
... "Flamingo,2,01/03/2022,Yes\\n" |
|
... "Horse,4,02/03/2022,Yes\\n" |
|
... "Brittle stars,5,03/03/2022,No\\n" |
|
... "Centipede,100,04/03/2022,No\\n" |
|
... ",6,05/03/2022," |
|
... ) |
|
>>> print(s) |
|
animals,n_legs,entry,fast |
|
Flamingo,2,01/03/2022,Yes |
|
Horse,4,02/03/2022,Yes |
|
Brittle stars,5,03/03/2022,No |
|
Centipede,100,04/03/2022,No |
|
,6,05/03/2022, |
|
|
|
Change the type of a column: |
|
|
|
>>> import pyarrow as pa |
|
>>> from pyarrow import csv |
|
>>> convert_options = csv.ConvertOptions(column_types={"n_legs": pa.float64()}) |
|
>>> csv.read_csv(io.BytesIO(s.encode()), convert_options=convert_options) |
|
pyarrow.Table |
|
animals: string |
|
n_legs: double |
|
entry: string |
|
fast: string |
|
---- |
|
animals: [["Flamingo","Horse","Brittle stars","Centipede",""]] |
|
n_legs: [[2,4,5,100,6]] |
|
entry: [["01/03/2022","02/03/2022","03/03/2022","04/03/2022","05/03/2022"]] |
|
fast: [["Yes","Yes","No","No",""]] |
|
|
|
Define a date parsing format to get a timestamp type column |
|
(in case dates are not in ISO format and not converted by default): |
|
|
|
>>> convert_options = csv.ConvertOptions( |
|
... timestamp_parsers=["%m/%d/%Y", "%m-%d-%Y"]) |
|
>>> csv.read_csv(io.BytesIO(s.encode()), convert_options=convert_options) |
|
pyarrow.Table |
|
animals: string |
|
n_legs: int64 |
|
entry: timestamp[s] |
|
fast: string |
|
---- |
|
animals: [["Flamingo","Horse","Brittle stars","Centipede",""]] |
|
n_legs: [[2,4,5,100,6]] |
|
entry: [[2022-01-03 00:00:00,2022-02-03 00:00:00,2022-03-03 00:00:00,2022-04-03 00:00:00,2022-05-03 00:00:00]] |
|
fast: [["Yes","Yes","No","No",""]] |
|
|
|
Specify a subset of columns to be read: |
|
|
|
>>> convert_options = csv.ConvertOptions( |
|
... include_columns=["animals", "n_legs"]) |
|
>>> csv.read_csv(io.BytesIO(s.encode()), convert_options=convert_options) |
|
pyarrow.Table |
|
animals: string |
|
n_legs: int64 |
|
---- |
|
animals: [["Flamingo","Horse","Brittle stars","Centipede",""]] |
|
n_legs: [[2,4,5,100,6]] |
|
|
|
    List an additional column to be included as a null-typed column:
|
|
|
>>> convert_options = csv.ConvertOptions( |
|
... include_columns=["animals", "n_legs", "location"], |
|
... include_missing_columns=True) |
|
>>> csv.read_csv(io.BytesIO(s.encode()), convert_options=convert_options) |
|
pyarrow.Table |
|
animals: string |
|
n_legs: int64 |
|
location: null |
|
---- |
|
animals: [["Flamingo","Horse","Brittle stars","Centipede",""]] |
|
n_legs: [[2,4,5,100,6]] |
|
location: [5 nulls] |
|
|
|
Define columns as dictionary type (by default only the |
|
string/binary columns are dictionary encoded): |
|
|
|
>>> convert_options = csv.ConvertOptions( |
|
... timestamp_parsers=["%m/%d/%Y", "%m-%d-%Y"], |
|
... auto_dict_encode=True) |
|
>>> csv.read_csv(io.BytesIO(s.encode()), convert_options=convert_options) |
|
pyarrow.Table |
|
animals: dictionary<values=string, indices=int32, ordered=0> |
|
n_legs: int64 |
|
entry: timestamp[s] |
|
fast: dictionary<values=string, indices=int32, ordered=0> |
|
---- |
|
animals: [ -- dictionary: |
|
["Flamingo","Horse","Brittle stars","Centipede",""] -- indices: |
|
[0,1,2,3,4]] |
|
n_legs: [[2,4,5,100,6]] |
|
entry: [[2022-01-03 00:00:00,2022-02-03 00:00:00,2022-03-03 00:00:00,2022-04-03 00:00:00,2022-05-03 00:00:00]] |
|
fast: [ -- dictionary: |
|
["Yes","No",""] -- indices: |
|
[0,0,1,1,2]] |
|
|
|
    Set an upper limit for the number of categories. If the number of
    categories exceeds the limit, the conversion to dictionary will not
    happen:
|
|
|
>>> convert_options = csv.ConvertOptions( |
|
... include_columns=["animals"], |
|
... auto_dict_encode=True, |
|
... auto_dict_max_cardinality=2) |
|
>>> csv.read_csv(io.BytesIO(s.encode()), convert_options=convert_options) |
|
pyarrow.Table |
|
animals: string |
|
---- |
|
animals: [["Flamingo","Horse","Brittle stars","Centipede",""]] |
|
|
|
Set empty strings to missing values: |
|
|
|
>>> convert_options = csv.ConvertOptions(include_columns=["animals", "n_legs"], |
|
... strings_can_be_null=True) |
|
>>> csv.read_csv(io.BytesIO(s.encode()), convert_options=convert_options) |
|
pyarrow.Table |
|
animals: string |
|
n_legs: int64 |
|
---- |
|
animals: [["Flamingo","Horse","Brittle stars","Centipede",null]] |
|
n_legs: [[2,4,5,100,6]] |
|
|
|
Define values to be True and False when converting a column |
|
into a bool type: |
|
|
|
>>> convert_options = csv.ConvertOptions( |
|
... include_columns=["fast"], |
|
... false_values=["No"], |
|
... true_values=["Yes"]) |
|
>>> csv.read_csv(io.BytesIO(s.encode()), convert_options=convert_options) |
|
pyarrow.Table |
|
fast: bool |
|
---- |
|
fast: [[true,true,false,false,null]] |
|
""" |
|
|
|
|
|
__slots__ = () |
|
|
|
    def __cinit__(self, *args, **kwargs):
|
self.options.reset( |
|
new CCSVConvertOptions(CCSVConvertOptions.Defaults())) |
|
|
|
def __init__(self, *, check_utf8=None, column_types=None, null_values=None, |
|
true_values=None, false_values=None, decimal_point=None, |
|
strings_can_be_null=None, quoted_strings_can_be_null=None, |
|
include_columns=None, include_missing_columns=None, |
|
auto_dict_encode=None, auto_dict_max_cardinality=None, |
|
timestamp_parsers=None): |
|
if check_utf8 is not None: |
|
self.check_utf8 = check_utf8 |
|
if column_types is not None: |
|
self.column_types = column_types |
|
if null_values is not None: |
|
self.null_values = null_values |
|
if true_values is not None: |
|
self.true_values = true_values |
|
if false_values is not None: |
|
self.false_values = false_values |
|
if decimal_point is not None: |
|
self.decimal_point = decimal_point |
|
if strings_can_be_null is not None: |
|
self.strings_can_be_null = strings_can_be_null |
|
if quoted_strings_can_be_null is not None: |
|
self.quoted_strings_can_be_null = quoted_strings_can_be_null |
|
if include_columns is not None: |
|
self.include_columns = include_columns |
|
if include_missing_columns is not None: |
|
self.include_missing_columns = include_missing_columns |
|
if auto_dict_encode is not None: |
|
self.auto_dict_encode = auto_dict_encode |
|
if auto_dict_max_cardinality is not None: |
|
self.auto_dict_max_cardinality = auto_dict_max_cardinality |
|
if timestamp_parsers is not None: |
|
self.timestamp_parsers = timestamp_parsers |
|
|
|
@property |
|
def check_utf8(self): |
|
""" |
|
Whether to check UTF8 validity of string columns. |
|
""" |
|
return deref(self.options).check_utf8 |
|
|
|
@check_utf8.setter |
|
def check_utf8(self, value): |
|
deref(self.options).check_utf8 = value |
|
|
|
@property |
|
def strings_can_be_null(self): |
|
""" |
|
Whether string / binary columns can have null values. |
|
""" |
|
return deref(self.options).strings_can_be_null |
|
|
|
@strings_can_be_null.setter |
|
def strings_can_be_null(self, value): |
|
deref(self.options).strings_can_be_null = value |
|
|
|
@property |
|
def quoted_strings_can_be_null(self): |
|
""" |
|
Whether quoted values can be null. |
|
""" |
|
return deref(self.options).quoted_strings_can_be_null |
|
|
|
@quoted_strings_can_be_null.setter |
|
def quoted_strings_can_be_null(self, value): |
|
deref(self.options).quoted_strings_can_be_null = value |
|
|
|
@property |
|
def column_types(self): |
|
""" |
|
Explicitly map column names to column types. |
|
""" |
|
d = {frombytes(item.first): pyarrow_wrap_data_type(item.second) |
|
for item in deref(self.options).column_types} |
|
return d |
|
|
|
@column_types.setter |
|
def column_types(self, value): |
|
cdef: |
|
shared_ptr[CDataType] typ |
|
|
|
if isinstance(value, Mapping): |
|
value = value.items() |
|
|
|
deref(self.options).column_types.clear() |
|
for item in value: |
|
if isinstance(item, Field): |
|
k = item.name |
|
v = item.type |
|
else: |
|
k, v = item |
|
typ = pyarrow_unwrap_data_type(ensure_type(v)) |
|
assert typ != NULL |
|
deref(self.options).column_types[tobytes(k)] = typ |
|
|
|
@property |
|
def null_values(self): |
|
""" |
|
A sequence of strings that denote nulls in the data. |
|
""" |
|
return [frombytes(x) for x in deref(self.options).null_values] |
|
|
|
@null_values.setter |
|
def null_values(self, value): |
|
deref(self.options).null_values = [tobytes(x) for x in value] |
|
|
|
@property |
|
def true_values(self): |
|
""" |
|
A sequence of strings that denote true booleans in the data. |
|
""" |
|
return [frombytes(x) for x in deref(self.options).true_values] |
|
|
|
@true_values.setter |
|
def true_values(self, value): |
|
deref(self.options).true_values = [tobytes(x) for x in value] |
|
|
|
@property |
|
def false_values(self): |
|
""" |
|
A sequence of strings that denote false booleans in the data. |
|
""" |
|
return [frombytes(x) for x in deref(self.options).false_values] |
|
|
|
@false_values.setter |
|
def false_values(self, value): |
|
deref(self.options).false_values = [tobytes(x) for x in value] |
|
|
|
@property |
|
def decimal_point(self): |
|
""" |
|
The character used as decimal point in floating-point and decimal |
|
data. |
|
""" |
|
return chr(deref(self.options).decimal_point) |
|
|
|
@decimal_point.setter |
|
def decimal_point(self, value): |
|
deref(self.options).decimal_point = _single_char(value) |
|
|
|
@property |
|
def auto_dict_encode(self): |
|
""" |
|
Whether to try to automatically dict-encode string / binary data. |
|
""" |
|
return deref(self.options).auto_dict_encode |
|
|
|
@auto_dict_encode.setter |
|
def auto_dict_encode(self, value): |
|
deref(self.options).auto_dict_encode = value |
|
|
|
@property |
|
def auto_dict_max_cardinality(self): |
|
""" |
|
The maximum dictionary cardinality for `auto_dict_encode`. |
|
|
|
This value is per chunk. |
|
""" |
|
return deref(self.options).auto_dict_max_cardinality |
|
|
|
@auto_dict_max_cardinality.setter |
|
def auto_dict_max_cardinality(self, value): |
|
deref(self.options).auto_dict_max_cardinality = value |
|
|
|
@property |
|
def include_columns(self): |
|
""" |
|
The names of columns to include in the Table. |
|
|
|
If empty, the Table will include all columns from the CSV file. |
|
If not empty, only these columns will be included, in this order. |
|
""" |
|
return [frombytes(s) for s in deref(self.options).include_columns] |
|
|
|
@include_columns.setter |
|
def include_columns(self, value): |
|
deref(self.options).include_columns.clear() |
|
for item in value: |
|
deref(self.options).include_columns.push_back(tobytes(item)) |
|
|
|
@property |
|
def include_missing_columns(self): |
|
""" |
|
If false, columns in `include_columns` but not in the CSV file will |
|
error out. |
|
If true, columns in `include_columns` but not in the CSV file will |
|
produce a null column (whose type is selected using `column_types`, |
|
or null by default). |
|
This option is ignored if `include_columns` is empty. |
|
""" |
|
return deref(self.options).include_missing_columns |
|
|
|
@include_missing_columns.setter |
|
def include_missing_columns(self, value): |
|
deref(self.options).include_missing_columns = value |
|
|
|
@property |
|
def timestamp_parsers(self): |
|
""" |
|
A sequence of strptime()-compatible format strings, tried in order |
|
when attempting to infer or convert timestamp values (the special |
|
value ISO8601() can also be given). By default, a fast built-in |
|
ISO-8601 parser is used. |
|
""" |
|
cdef: |
|
shared_ptr[CTimestampParser] c_parser |
|
c_string kind |
|
|
|
parsers = [] |
|
for c_parser in deref(self.options).timestamp_parsers: |
|
kind = deref(c_parser).kind() |
|
if kind == b'strptime': |
|
parsers.append(frombytes(deref(c_parser).format())) |
|
else: |
|
assert kind == b'iso8601' |
|
parsers.append(ISO8601) |
|
|
|
return parsers |
|
|
|
@timestamp_parsers.setter |
|
def timestamp_parsers(self, value): |
|
cdef: |
|
vector[shared_ptr[CTimestampParser]] c_parsers |
|
|
|
for v in value: |
|
if isinstance(v, str): |
|
c_parsers.push_back(CTimestampParser.MakeStrptime(tobytes(v))) |
|
elif v == ISO8601: |
|
c_parsers.push_back(CTimestampParser.MakeISO8601()) |
|
else: |
|
raise TypeError("Expected list of str or ISO8601 objects") |
|
|
|
deref(self.options).timestamp_parsers = move(c_parsers) |
|
|
|
@staticmethod |
|
cdef ConvertOptions wrap(CCSVConvertOptions options): |
|
out = ConvertOptions() |
|
out.options.reset(new CCSVConvertOptions(move(options))) |
|
return out |
|
|
|
def validate(self): |
|
check_status(deref(self.options).Validate()) |
|
|
|
def equals(self, ConvertOptions other): |
|
""" |
|
Parameters |
|
---------- |
|
other : pyarrow.csv.ConvertOptions |
|
|
|
Returns |
|
------- |
|
bool |
|
""" |
|
return ( |
|
self.check_utf8 == other.check_utf8 and |
|
self.column_types == other.column_types and |
|
self.null_values == other.null_values and |
|
self.true_values == other.true_values and |
|
self.false_values == other.false_values and |
|
self.decimal_point == other.decimal_point and |
|
self.timestamp_parsers == other.timestamp_parsers and |
|
self.strings_can_be_null == other.strings_can_be_null and |
|
self.quoted_strings_can_be_null == |
|
other.quoted_strings_can_be_null and |
|
self.auto_dict_encode == other.auto_dict_encode and |
|
self.auto_dict_max_cardinality == |
|
other.auto_dict_max_cardinality and |
|
self.include_columns == other.include_columns and |
|
self.include_missing_columns == other.include_missing_columns |
|
) |
|
|
|
def __getstate__(self): |
|
return (self.check_utf8, self.column_types, self.null_values, |
|
self.true_values, self.false_values, self.decimal_point, |
|
self.timestamp_parsers, self.strings_can_be_null, |
|
self.quoted_strings_can_be_null, self.auto_dict_encode, |
|
self.auto_dict_max_cardinality, self.include_columns, |
|
self.include_missing_columns) |
|
|
|
def __setstate__(self, state): |
|
(self.check_utf8, self.column_types, self.null_values, |
|
self.true_values, self.false_values, self.decimal_point, |
|
self.timestamp_parsers, self.strings_can_be_null, |
|
self.quoted_strings_can_be_null, self.auto_dict_encode, |
|
self.auto_dict_max_cardinality, self.include_columns, |
|
self.include_missing_columns) = state |
|
|
|
def __eq__(self, other): |
|
try: |
|
return self.equals(other) |
|
except TypeError: |
|
return False |
|
|
|
|
|
cdef _get_reader(input_file, ReadOptions read_options, |
|
shared_ptr[CInputStream]* out): |
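    # Wrap the input as a C++ InputStream; when a non-default encoding is
    # requested, layer a transcoding stream on top so the parser always
    # consumes UTF-8.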
|
use_memory_map = False |
|
get_input_stream(input_file, use_memory_map, out) |
|
if read_options is not None: |
|
out[0] = native_transcoding_input_stream(out[0], |
|
read_options.encoding, |
|
'utf8') |
|
|
|
|
|
cdef _get_read_options(ReadOptions read_options, CCSVReadOptions* out): |
|
if read_options is None: |
|
out[0] = CCSVReadOptions.Defaults() |
|
else: |
|
out[0] = deref(read_options.options) |
|
|
|
|
|
cdef _get_parse_options(ParseOptions parse_options, CCSVParseOptions* out): |
|
if parse_options is None: |
|
out[0] = CCSVParseOptions.Defaults() |
|
else: |
|
out[0] = deref(parse_options.options) |
|
|
|
|
|
cdef _get_convert_options(ConvertOptions convert_options, |
|
CCSVConvertOptions* out): |
|
if convert_options is None: |
|
out[0] = CCSVConvertOptions.Defaults() |
|
else: |
|
out[0] = deref(convert_options.options) |
|
|
|
|
|
cdef class CSVStreamingReader(RecordBatchReader): |
|
"""An object that reads record batches incrementally from a CSV file. |
|
|
|
Should not be instantiated directly by user code. |
|
""" |
|
cdef readonly: |
|
Schema schema |
|
|
|
def __init__(self): |
|
raise TypeError("Do not call {}'s constructor directly, " |
|
"use pyarrow.csv.open_csv() instead." |
|
.format(self.__class__.__name__)) |
|
|
cdef _open(self, shared_ptr[CInputStream] stream, |
|
CCSVReadOptions c_read_options, |
|
CCSVParseOptions c_parse_options, |
|
CCSVConvertOptions c_convert_options, |
|
MemoryPool memory_pool): |
|
cdef: |
|
shared_ptr[CSchema] c_schema |
|
CIOContext io_context |
|
|
|
io_context = CIOContext(maybe_unbox_memory_pool(memory_pool)) |
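        # Build the C++ streaming reader without holding the GIL;
        # Make() infers the schema from the beginning of the stream.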
|
|
|
with nogil: |
|
self.reader = <shared_ptr[CRecordBatchReader]> GetResultValue( |
|
CCSVStreamingReader.Make( |
|
io_context, stream, |
|
move(c_read_options), move(c_parse_options), |
|
move(c_convert_options))) |
|
c_schema = self.reader.get().schema() |
|
|
|
self.schema = pyarrow_wrap_schema(c_schema) |
|
|
|
|
|
def read_csv(input_file, read_options=None, parse_options=None, |
|
convert_options=None, MemoryPool memory_pool=None): |
|
""" |
|
Read a Table from a stream of CSV data. |
|
|
|
Parameters |
|
---------- |
|
input_file : string, path or file-like object |
|
The location of CSV data. If a string or path, and if it ends |
|
with a recognized compressed file extension (e.g. ".gz" or ".bz2"), |
|
the data is automatically decompressed when reading. |
|
read_options : pyarrow.csv.ReadOptions, optional |
|
Options for the CSV reader (see pyarrow.csv.ReadOptions constructor |
|
for defaults) |
|
parse_options : pyarrow.csv.ParseOptions, optional |
|
Options for the CSV parser |
|
(see pyarrow.csv.ParseOptions constructor for defaults) |
|
convert_options : pyarrow.csv.ConvertOptions, optional |
|
Options for converting CSV data |
|
(see pyarrow.csv.ConvertOptions constructor for defaults) |
|
memory_pool : MemoryPool, optional |
|
Pool to allocate Table memory from |
|
|
|
Returns |
|
------- |
|
:class:`pyarrow.Table` |
|
        Contents of the CSV file as an in-memory table.
|
|
|
Examples |
|
-------- |
|
|
|
    Defining an example file from a bytes object:
|
|
|
>>> import io |
|
>>> s = ( |
|
... "animals,n_legs,entry\\n" |
|
... "Flamingo,2,2022-03-01\\n" |
|
... "Horse,4,2022-03-02\\n" |
|
... "Brittle stars,5,2022-03-03\\n" |
|
... "Centipede,100,2022-03-04" |
|
... ) |
|
>>> print(s) |
|
animals,n_legs,entry |
|
Flamingo,2,2022-03-01 |
|
Horse,4,2022-03-02 |
|
Brittle stars,5,2022-03-03 |
|
Centipede,100,2022-03-04 |
|
>>> source = io.BytesIO(s.encode()) |
|
|
|
    Reading from the file:
|
|
|
>>> from pyarrow import csv |
|
>>> csv.read_csv(source) |
|
pyarrow.Table |
|
animals: string |
|
n_legs: int64 |
|
entry: date32[day] |
|
---- |
|
animals: [["Flamingo","Horse","Brittle stars","Centipede"]] |
|
n_legs: [[2,4,5,100]] |
|
entry: [[2022-03-01,2022-03-02,2022-03-03,2022-03-04]] |
|
""" |
|
cdef: |
|
shared_ptr[CInputStream] stream |
|
CCSVReadOptions c_read_options |
|
CCSVParseOptions c_parse_options |
|
CCSVConvertOptions c_convert_options |
|
CIOContext io_context |
|
SharedPtrNoGIL[CCSVReader] reader |
|
shared_ptr[CTable] table |
|
|
|
_get_reader(input_file, read_options, &stream) |
|
_get_read_options(read_options, &c_read_options) |
|
_get_parse_options(parse_options, &c_parse_options) |
|
_get_convert_options(convert_options, &c_convert_options) |
|
|
|
with SignalStopHandler() as stop_handler: |
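        # Wire the stop token into the IO context so Ctrl-C can interrupt
        # the (possibly multi-threaded) read promptly.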
|
io_context = CIOContext( |
|
maybe_unbox_memory_pool(memory_pool), |
|
(<StopToken> stop_handler.stop_token).stop_token) |
|
reader = GetResultValue(CCSVReader.Make( |
|
io_context, stream, |
|
c_read_options, c_parse_options, c_convert_options)) |
|
|
|
with nogil: |
|
table = GetResultValue(reader.get().Read()) |
|
|
|
return pyarrow_wrap_table(table) |
|
|
|
|
|
def open_csv(input_file, read_options=None, parse_options=None, |
|
convert_options=None, MemoryPool memory_pool=None): |
|
""" |
|
Open a streaming reader of CSV data. |
|
|
|
Reading using this function is always single-threaded. |
|
|
|
Parameters |
|
---------- |
|
input_file : string, path or file-like object |
|
The location of CSV data. If a string or path, and if it ends |
|
with a recognized compressed file extension (e.g. ".gz" or ".bz2"), |
|
the data is automatically decompressed when reading. |
|
read_options : pyarrow.csv.ReadOptions, optional |
|
Options for the CSV reader (see pyarrow.csv.ReadOptions constructor |
|
for defaults) |
|
parse_options : pyarrow.csv.ParseOptions, optional |
|
Options for the CSV parser |
|
(see pyarrow.csv.ParseOptions constructor for defaults) |
|
convert_options : pyarrow.csv.ConvertOptions, optional |
|
Options for converting CSV data |
|
(see pyarrow.csv.ConvertOptions constructor for defaults) |
|
memory_pool : MemoryPool, optional |
|
Pool to allocate RecordBatch memory from |
|
|
|
Returns |
|
------- |
|
:class:`pyarrow.csv.CSVStreamingReader` |
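
    Examples
    --------

    A minimal sketch of streaming a small in-memory CSV; with the default
    `block_size` this input arrives as a single chunk:

    >>> import io
    >>> from pyarrow import csv
    >>> s = "animals,n_legs\\nFlamingo,2\\nHorse,4"
    >>> reader = csv.open_csv(io.BytesIO(s.encode()))
    >>> reader.read_all()
    pyarrow.Table
    animals: string
    n_legs: int64
    ----
    animals: [["Flamingo","Horse"]]
    n_legs: [[2,4]]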
|
""" |
|
cdef: |
|
shared_ptr[CInputStream] stream |
|
CCSVReadOptions c_read_options |
|
CCSVParseOptions c_parse_options |
|
CCSVConvertOptions c_convert_options |
|
CSVStreamingReader reader |
|
|
|
_get_reader(input_file, read_options, &stream) |
|
_get_read_options(read_options, &c_read_options) |
|
_get_parse_options(parse_options, &c_parse_options) |
|
_get_convert_options(convert_options, &c_convert_options) |
|
|
|
reader = CSVStreamingReader.__new__(CSVStreamingReader) |
|
reader._open(stream, move(c_read_options), move(c_parse_options), |
|
move(c_convert_options), memory_pool) |
|
return reader |
|
|
|
|
|
def _raise_invalid_function_option(value, description, *, |
|
exception_class=ValueError): |
|
raise exception_class(f"\"{value}\" is not a valid {description}") |
|
|
|
|
|
cdef CQuotingStyle unwrap_quoting_style(quoting_style) except *: |
|
if quoting_style == "needed": |
|
return CQuotingStyle_Needed |
|
elif quoting_style == "all_valid": |
|
return CQuotingStyle_AllValid |
|
elif quoting_style == "none": |
|
return CQuotingStyle_None |
|
_raise_invalid_function_option(quoting_style, "quoting style") |
|
|
|
|
|
cdef wrap_quoting_style(quoting_style): |
|
if quoting_style == CQuotingStyle_Needed: |
|
return 'needed' |
|
elif quoting_style == CQuotingStyle_AllValid: |
|
return 'all_valid' |
|
elif quoting_style == CQuotingStyle_None: |
|
return 'none' |
|
|
|
|
|
cdef class WriteOptions(_Weakrefable): |
|
""" |
|
Options for writing CSV files. |
|
|
|
Parameters |
|
---------- |
|
include_header : bool, optional (default True) |
|
        Whether to write an initial header line with column names.
|
batch_size : int, optional (default 1024) |
|
How many rows to process together when converting and writing |
|
        CSV data.
|
delimiter : 1-character string, optional (default ",") |
|
The character delimiting individual cells in the CSV data. |
|
quoting_style : str, optional (default "needed") |
|
Whether to quote values, and if so, which quoting style to use. |
|
The following values are accepted: |
|
|
|
- "needed" (default): only enclose values in quotes when needed. |
|
- "all_valid": enclose all valid values in quotes; nulls are not quoted. |
|
- "none": do not enclose any values in quotes; values containing |
|
special characters (such as quotes, cell delimiters or line endings) |
|
will raise an error. |
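
    Examples
    --------

    A minimal sketch of customizing CSV writing, targeting a hypothetical
    "example.csv" (see `write_csv` for fuller examples):

    >>> import pyarrow as pa
    >>> from pyarrow import csv
    >>> table = pa.table({"col": [1, 2, 3]})
    >>> options = csv.WriteOptions(include_header=False, delimiter=";")
    >>> csv.write_csv(table, "example.csv", write_options=options)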
|
""" |
|
|
|
|
|
__slots__ = () |
|
|
|
def __init__(self, *, include_header=None, batch_size=None, |
|
delimiter=None, quoting_style=None): |
|
self.options.reset(new CCSVWriteOptions(CCSVWriteOptions.Defaults())) |
|
if include_header is not None: |
|
self.include_header = include_header |
|
if batch_size is not None: |
|
self.batch_size = batch_size |
|
if delimiter is not None: |
|
self.delimiter = delimiter |
|
if quoting_style is not None: |
|
self.quoting_style = quoting_style |
|
|
|
@property |
|
def include_header(self): |
|
""" |
|
Whether to write an initial header line with column names. |
|
""" |
|
return deref(self.options).include_header |
|
|
|
@include_header.setter |
|
def include_header(self, value): |
|
deref(self.options).include_header = value |
|
|
|
@property |
|
def batch_size(self): |
|
""" |
|
How many rows to process together when converting and writing |
|
CSV data. |
|
""" |
|
return deref(self.options).batch_size |
|
|
|
@batch_size.setter |
|
def batch_size(self, value): |
|
deref(self.options).batch_size = value |
|
|
|
@property |
|
def delimiter(self): |
|
""" |
|
The character delimiting individual cells in the CSV data. |
|
""" |
|
return chr(deref(self.options).delimiter) |
|
|
|
@delimiter.setter |
|
def delimiter(self, value): |
|
deref(self.options).delimiter = _single_char(value) |
|
|
|
@property |
|
def quoting_style(self): |
|
""" |
|
Whether to quote values, and if so, which quoting style to use. |
|
The following values are accepted: |
|
|
|
- "needed" (default): only enclose values in quotes when needed. |
|
- "all_valid": enclose all valid values in quotes; nulls are not quoted. |
|
- "none": do not enclose any values in quotes; values containing |
|
special characters (such as quotes, cell delimiters or line endings) |
|
will raise an error. |
|
""" |
|
return wrap_quoting_style(deref(self.options).quoting_style) |
|
|
|
@quoting_style.setter |
|
def quoting_style(self, value): |
|
deref(self.options).quoting_style = unwrap_quoting_style(value) |
|
|
|
@staticmethod |
|
cdef WriteOptions wrap(CCSVWriteOptions options): |
|
out = WriteOptions() |
|
out.options.reset(new CCSVWriteOptions(move(options))) |
|
return out |
|
|
|
def validate(self): |
|
check_status(self.options.get().Validate()) |
|
|
|
|
|
cdef _get_write_options(WriteOptions write_options, CCSVWriteOptions* out): |
|
if write_options is None: |
|
out[0] = CCSVWriteOptions.Defaults() |
|
else: |
|
out[0] = deref(write_options.options) |
|
|
|
|
|
def write_csv(data, output_file, write_options=None, |
|
MemoryPool memory_pool=None): |
|
""" |
|
Write record batch or table to a CSV file. |
|
|
|
Parameters |
|
---------- |
|
data : pyarrow.RecordBatch or pyarrow.Table |
|
The data to write. |
|
output_file : string, path, pyarrow.NativeFile, or file-like object |
|
The location where to write the CSV data. |
|
write_options : pyarrow.csv.WriteOptions |
|
Options to configure writing the CSV data. |
|
memory_pool : MemoryPool, optional |
|
Pool for temporary allocations. |
|
|
|
Examples |
|
-------- |
|
|
|
>>> import pyarrow as pa |
|
>>> from pyarrow import csv |
|
|
|
>>> legs = pa.array([2, 4, 5, 100]) |
|
>>> animals = pa.array(["Flamingo", "Horse", "Brittle stars", "Centipede"]) |
|
>>> entry_date = pa.array(["01/03/2022", "02/03/2022", |
|
... "03/03/2022", "04/03/2022"]) |
|
>>> table = pa.table([animals, legs, entry_date], |
|
... names=["animals", "n_legs", "entry"]) |
|
|
|
>>> csv.write_csv(table, "animals.csv") |
|
|
|
>>> write_options = csv.WriteOptions(include_header=False) |
|
>>> csv.write_csv(table, "animals.csv", write_options=write_options) |
|
|
|
>>> write_options = csv.WriteOptions(delimiter=";") |
|
>>> csv.write_csv(table, "animals.csv", write_options=write_options) |
|
""" |
|
cdef: |
|
shared_ptr[COutputStream] stream |
|
CCSVWriteOptions c_write_options |
|
CMemoryPool* c_memory_pool |
|
CRecordBatch* batch |
|
CTable* table |
|
_get_write_options(write_options, &c_write_options) |
|
|
|
get_writer(output_file, &stream) |
|
c_memory_pool = maybe_unbox_memory_pool(memory_pool) |
|
c_write_options.io_context = CIOContext(c_memory_pool) |
|
if isinstance(data, RecordBatch): |
|
batch = pyarrow_unwrap_batch(data).get() |
|
with nogil: |
|
check_status(WriteCSV(deref(batch), c_write_options, stream.get())) |
|
elif isinstance(data, Table): |
|
table = pyarrow_unwrap_table(data).get() |
|
with nogil: |
|
check_status(WriteCSV(deref(table), c_write_options, stream.get())) |
|
else: |
|
raise TypeError(f"Expected Table or RecordBatch, got '{type(data)}'") |
|
|
|
|
|
cdef class CSVWriter(_CRecordBatchWriter): |
|
""" |
|
Writer to create a CSV file. |
|
|
|
Parameters |
|
---------- |
|
sink : str, path, pyarrow.OutputStream or file-like object |
|
The location where to write the CSV data. |
|
schema : pyarrow.Schema |
|
The schema of the data to be written. |
|
write_options : pyarrow.csv.WriteOptions |
|
Options to configure writing the CSV data. |
|
memory_pool : MemoryPool, optional |
|
Pool for temporary allocations. |
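
    Examples
    --------

    A minimal usage sketch, writing incrementally to a hypothetical
    "animals.csv":

    >>> import pyarrow as pa
    >>> from pyarrow import csv
    >>> schema = pa.schema([("animals", pa.string()), ("n_legs", pa.int64())])
    >>> with csv.CSVWriter("animals.csv", schema) as writer:
    ...     writer.write_table(pa.table([["Flamingo"], [2]], schema=schema))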
|
""" |
|
|
|
def __init__(self, sink, Schema schema, *, |
|
WriteOptions write_options=None, MemoryPool memory_pool=None): |
|
cdef: |
|
shared_ptr[COutputStream] c_stream |
|
shared_ptr[CSchema] c_schema = pyarrow_unwrap_schema(schema) |
|
CCSVWriteOptions c_write_options |
|
CMemoryPool* c_memory_pool = maybe_unbox_memory_pool(memory_pool) |
|
_get_write_options(write_options, &c_write_options) |
|
c_write_options.io_context = CIOContext(c_memory_pool) |
|
get_writer(sink, &c_stream) |
|
with nogil: |
|
self.writer = GetResultValue(MakeCSVWriter( |
|
c_stream, c_schema, c_write_options)) |
|
|