|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from datetime import timedelta |
|
|
|
from cython.operator cimport dereference as deref |
|
|
|
from pyarrow.includes.common cimport * |
|
from pyarrow.includes.libarrow cimport * |
|
from pyarrow.lib cimport _Weakrefable |
|
from pyarrow.lib import tobytes, frombytes |
|
|
|
|
|
cdef ParquetCipher cipher_from_name(name):
    """Translate a cipher name into the matching ParquetCipher value.

    The comparison is case-insensitive: the name is upper-cased before it
    is matched against 'AES_GCM_V1' and 'AES_GCM_CTR_V1'.

    Raises ValueError, quoting the upper-cased name, for any other input.
    """
    normalized = name.upper()
    if normalized == 'AES_GCM_V1':
        return ParquetCipher_AES_GCM_V1
    if normalized == 'AES_GCM_CTR_V1':
        return ParquetCipher_AES_GCM_CTR_V1
    raise ValueError(f'Invalid cipher name: {normalized!r}')
|
|
|
|
|
cdef cipher_to_name(ParquetCipher cipher):
    """Return the string name of a ParquetCipher value.

    Inverse of cipher_from_name: yields 'AES_GCM_V1' or 'AES_GCM_CTR_V1'
    and raises ValueError for any other cipher value.
    """
    if cipher == ParquetCipher_AES_GCM_V1:
        return 'AES_GCM_V1'
    if cipher == ParquetCipher_AES_GCM_CTR_V1:
        return 'AES_GCM_CTR_V1'
    raise ValueError('Invalid cipher value: {0}'.format(cipher))
|
|
|
cdef class EncryptionConfiguration(_Weakrefable):
    """Configuration of the encryption, such as which columns to encrypt"""
    # Empty __slots__: avoid mistakenly creating attributes on instances.
    __slots__ = ()

    def __init__(self, footer_key, *, column_keys=None,
                 encryption_algorithm=None,
                 plaintext_footer=None, double_wrapping=None,
                 cache_lifetime=None, internal_key_material=None,
                 data_key_length_bits=None):
        """Create the underlying C++ configuration and apply overrides.

        Parameters
        ----------
        footer_key : str or bytes
            Passed through ``tobytes`` to the C++ configuration constructor.
        column_keys, encryption_algorithm, plaintext_footer, \
double_wrapping, cache_lifetime, internal_key_material, \
data_key_length_bits : optional
            Each value that is not None is assigned through the property of
            the same name; values left as None keep whatever default the
            C++ configuration object was constructed with.
        """
        self.configuration.reset(
            new CEncryptionConfiguration(tobytes(footer_key)))
        if column_keys is not None:
            self.column_keys = column_keys
        if encryption_algorithm is not None:
            self.encryption_algorithm = encryption_algorithm
        if plaintext_footer is not None:
            self.plaintext_footer = plaintext_footer
        if double_wrapping is not None:
            self.double_wrapping = double_wrapping
        if cache_lifetime is not None:
            self.cache_lifetime = cache_lifetime
        if internal_key_material is not None:
            self.internal_key_material = internal_key_material
        if data_key_length_bits is not None:
            self.data_key_length_bits = data_key_length_bits

    @property
    def footer_key(self):
        """ID of the master key for footer encryption/signing"""
        return frombytes(self.configuration.get().footer_key)

    @property
    def column_keys(self):
        """
        List of columns to encrypt, with master key IDs.

        Returned as a dictionary mapping each key to a list of names,
        decoded from the stored string form
        ``"key: name, name; key: name"``. An empty dictionary is returned
        when no column keys have been configured.
        """
        column_keys_str = frombytes(self.configuration.get().column_keys)

        column_keys_dict = {}
        for entry in column_keys_str.split(";"):
            # Spaces are only separators' padding in the stored form.
            entry = entry.replace(" ", "")
            if not entry:
                # An unset (empty) value or a stray ';' yields a blank
                # segment; skipping it avoids a spurious ValueError.
                continue
            key_id, names = entry.split(":")
            column_keys_dict[key_id] = names.split(",")
        return column_keys_dict

    @column_keys.setter
    def column_keys(self, dict value):
        if value is not None:
            # Serialize {"key1": ["a", "b"], "key2": ["c"]} to the string
            # "key1: a, b; key2: c" stored on the C++ configuration.
            column_keys = "; ".join(
                ["{}: {}".format(k, ", ".join(v)) for k, v in value.items()])
            self.configuration.get().column_keys = tobytes(column_keys)

    @property
    def encryption_algorithm(self):
        """Parquet encryption algorithm.
        Can be "AES_GCM_V1" (default), or "AES_GCM_CTR_V1"."""
        return cipher_to_name(self.configuration.get().encryption_algorithm)

    @encryption_algorithm.setter
    def encryption_algorithm(self, value):
        # cipher_from_name raises ValueError for unknown names.
        cipher = cipher_from_name(value)
        self.configuration.get().encryption_algorithm = cipher

    @property
    def plaintext_footer(self):
        """Write files with plaintext footer."""
        return self.configuration.get().plaintext_footer

    @plaintext_footer.setter
    def plaintext_footer(self, value):
        self.configuration.get().plaintext_footer = value

    @property
    def double_wrapping(self):
        """Use double wrapping - where data encryption keys (DEKs) are
        encrypted with key encryption keys (KEKs), which in turn are
        encrypted with master keys.
        If set to false, use single wrapping - where DEKs are
        encrypted directly with master keys."""
        return self.configuration.get().double_wrapping

    @double_wrapping.setter
    def double_wrapping(self, value):
        self.configuration.get().double_wrapping = value

    @property
    def cache_lifetime(self):
        """Lifetime of cached entities (key encryption keys,
        local wrapping keys, KMS client objects)."""
        return timedelta(
            seconds=self.configuration.get().cache_lifetime_seconds)

    @cache_lifetime.setter
    def cache_lifetime(self, value):
        if not isinstance(value, timedelta):
            raise TypeError("cache_lifetime should be a timedelta")
        # Stored on the C++ side as a number of seconds.
        self.configuration.get().cache_lifetime_seconds = value.total_seconds()

    @property
    def internal_key_material(self):
        """Store key material inside Parquet file footers; this mode doesn't
        produce additional files. If set to false, key material is stored in
        separate files in the same folder, which enables key rotation for
        immutable Parquet files."""
        return self.configuration.get().internal_key_material

    @internal_key_material.setter
    def internal_key_material(self, value):
        self.configuration.get().internal_key_material = value

    @property
    def data_key_length_bits(self):
        """Length of data encryption keys (DEKs), randomly generated by parquet key
        management tools. Can be 128, 192 or 256 bits."""
        return self.configuration.get().data_key_length_bits

    @data_key_length_bits.setter
    def data_key_length_bits(self, value):
        self.configuration.get().data_key_length_bits = value

    # Expose the shared pointer to the underlying C++ configuration.
    cdef inline shared_ptr[CEncryptionConfiguration] unwrap(self) nogil:
        return self.configuration
|
|
|
|
|
cdef class DecryptionConfiguration(_Weakrefable):
    """Configuration of the decryption, such as cache timeout."""
    # Empty __slots__: avoid mistakenly creating attributes on instances.
    __slots__ = ()

    def __init__(self, *, cache_lifetime=None):
        """Create the underlying C++ configuration.

        Parameters
        ----------
        cache_lifetime : datetime.timedelta, optional
            When given, assigned through the ``cache_lifetime`` property;
            otherwise the C++ default is kept.
        """
        self.configuration.reset(new CDecryptionConfiguration())
        # Apply the caller's value instead of silently discarding it.
        if cache_lifetime is not None:
            self.cache_lifetime = cache_lifetime

    @property
    def cache_lifetime(self):
        """Lifetime of cached entities (key encryption keys,
        local wrapping keys, KMS client objects)."""
        return timedelta(
            seconds=self.configuration.get().cache_lifetime_seconds)

    @cache_lifetime.setter
    def cache_lifetime(self, value):
        # Same validation as EncryptionConfiguration.cache_lifetime, so a
        # wrong type fails with a clear TypeError.
        if not isinstance(value, timedelta):
            raise TypeError("cache_lifetime should be a timedelta")
        # Stored on the C++ side as a number of seconds.
        self.configuration.get().cache_lifetime_seconds = value.total_seconds()

    # Expose the shared pointer to the underlying C++ configuration.
    cdef inline shared_ptr[CDecryptionConfiguration] unwrap(self) nogil:
        return self.configuration
|
|
|
|
|
cdef class KmsConnectionConfig(_Weakrefable):
    """Configuration of the connection to the Key Management Service (KMS)"""
    # Empty __slots__: avoid mistakenly creating attributes on instances.
    __slots__ = ()

    def __init__(self, *, kms_instance_id=None, kms_instance_url=None,
                 key_access_token=None, custom_kms_conf=None):
        """Create the underlying C++ configuration and apply overrides.

        Parameters
        ----------
        kms_instance_id, kms_instance_url, custom_kms_conf : optional
            Each value that is not None is assigned through the property of
            the same name.
        key_access_token : str or bytes, optional
            Assigned through the ``key_access_token`` property; when None,
            the token is set to ``b'DEFAULT'``.
        """
        self.configuration.reset(new CKmsConnectionConfig())
        if kms_instance_id is not None:
            self.kms_instance_id = kms_instance_id
        if kms_instance_url is not None:
            self.kms_instance_url = kms_instance_url
        if key_access_token is None:
            self.key_access_token = b'DEFAULT'
        else:
            self.key_access_token = key_access_token
        if custom_kms_conf is not None:
            self.custom_kms_conf = custom_kms_conf

    @property
    def kms_instance_id(self):
        """ID of the KMS instance that will be used for encryption
        (if multiple KMS instances are available)."""
        return frombytes(self.configuration.get().kms_instance_id)

    @kms_instance_id.setter
    def kms_instance_id(self, value):
        self.configuration.get().kms_instance_id = tobytes(value)

    @property
    def kms_instance_url(self):
        """URL of the KMS instance."""
        return frombytes(self.configuration.get().kms_instance_url)

    @kms_instance_url.setter
    def kms_instance_url(self, value):
        self.configuration.get().kms_instance_url = tobytes(value)

    @property
    def key_access_token(self):
        """Authorization token that will be passed to KMS."""
        return frombytes(self.configuration.get()
                         .refreshable_key_access_token.get().value())

    @key_access_token.setter
    def key_access_token(self, value):
        # Assignment goes through the refresh path, which updates the
        # existing token object rather than replacing it.
        self.refresh_key_access_token(value)

    @property
    def custom_kms_conf(self):
        """A dictionary with KMS-type-specific configuration"""
        custom_kms_conf = {
            frombytes(k): frombytes(v)
            for k, v in self.configuration.get().custom_kms_conf
        }
        return custom_kms_conf

    @custom_kms_conf.setter
    def custom_kms_conf(self, dict value):
        if value is None:
            return
        # Validate every entry before touching the C++ map, so that a bad
        # entry cannot leave the configuration partially updated.
        for k, v in value.items():
            if not (isinstance(k, str) and isinstance(v, str)):
                raise TypeError("Expected custom_kms_conf to be " +
                                "a dictionary of strings")
        # Entries are inserted/overwritten; existing keys absent from
        # `value` are left in place.
        for k, v in value.items():
            self.configuration.get().custom_kms_conf[tobytes(k)] = \
                tobytes(v)

    def refresh_key_access_token(self, value):
        """Replace the value held by the refreshable key access token.

        Parameters
        ----------
        value : str or bytes
            New token, converted with ``tobytes`` before being passed to
            the C++ token's ``Refresh`` method.
        """
        cdef:
            shared_ptr[CKeyAccessToken] c_key_access_token = \
                self.configuration.get().refreshable_key_access_token

        c_key_access_token.get().Refresh(tobytes(value))

    # Expose the shared pointer to the underlying C++ configuration.
    cdef inline shared_ptr[CKmsConnectionConfig] unwrap(self) nogil:
        return self.configuration

    @staticmethod
    cdef wrap(const CKmsConnectionConfig& config):
        # Build a default Python wrapper, then point it at a new shared
        # C++ object constructed from `config`.
        result = KmsConnectionConfig()
        result.configuration = make_shared[CKmsConnectionConfig](move(config))
        return result
|
|
|
|
|
|
|
cdef void _cb_wrap_key(
        handler, const c_string& key_bytes,
        const c_string& master_key_identifier, c_string* out) except *:
    """Callback forwarding a wrap request to ``handler.wrap_key``.

    The master key identifier is decoded with ``frombytes``; the key bytes
    are handed over as-is. The handler's return value is converted with
    ``tobytes`` and written to ``out``. Exceptions propagate to the caller.
    """
    master_key_id = frombytes(master_key_identifier)
    wrapped = handler.wrap_key(key_bytes, master_key_id)
    out[0] = tobytes(wrapped)
|
|
|
|
|
cdef void _cb_unwrap_key(
        handler, const c_string& wrapped_key,
        const c_string& master_key_identifier, c_string* out) except *:
    """Callback forwarding an unwrap request to ``handler.unwrap_key``.

    Both the wrapped key and the master key identifier are decoded with
    ``frombytes`` before the call. The handler's return value is converted
    with ``tobytes`` and written to ``out``. Exceptions propagate.
    """
    master_key_id = frombytes(master_key_identifier)
    wrapped_key_str = frombytes(wrapped_key)
    unwrapped = handler.unwrap_key(wrapped_key_str, master_key_id)
    out[0] = tobytes(unwrapped)
|
|
|
|
|
cdef class KmsClient(_Weakrefable):
    """The abstract base class for KmsClient implementations."""
    cdef shared_ptr[CKmsClient] client

    def __init__(self):
        self.init()

    cdef init(self):
        # Build the callback table that routes the C++ client's wrap and
        # unwrap requests to the Python methods of this instance.
        cdef CPyKmsClientVtable vtable = CPyKmsClientVtable()

        vtable.wrap_key = _cb_wrap_key
        vtable.unwrap_key = _cb_unwrap_key

        self.client.reset(new CPyKmsClient(self, vtable))

    def wrap_key(self, key_bytes, master_key_identifier):
        """Wrap a key - encrypt it with the master key.

        Subclasses must override this; the base implementation raises
        NotImplementedError.
        """
        raise NotImplementedError()

    def unwrap_key(self, wrapped_key, master_key_identifier):
        """Unwrap a key - decrypt it with the master key.

        Subclasses must override this; the base implementation raises
        NotImplementedError.
        """
        raise NotImplementedError()

    # Expose the shared pointer to the underlying C++ client.
    cdef inline shared_ptr[CKmsClient] unwrap(self) nogil:
        return self.client
|
|
|
|
|
|
|
cdef void _cb_create_kms_client(
        handler,
        const CKmsConnectionConfig& kms_connection_config,
        shared_ptr[CKmsClient]* out) except *:
    """Callback that asks the Python callable ``handler`` for a KmsClient.

    The C++ connection config is wrapped into a KmsConnectionConfig and
    passed to ``handler``. The returned client's shared pointer is written
    to ``out``; a TypeError is raised if the result is not a KmsClient.
    """
    py_config = KmsConnectionConfig.wrap(kms_connection_config)
    client = handler(py_config)

    if isinstance(client, KmsClient):
        out[0] = (<KmsClient> client).unwrap()
        return

    raise TypeError(
        "callable must return KmsClient instances, but got {}".format(
            type(client)))
|
|
|
|
|
cdef class CryptoFactory(_Weakrefable):
    """ A factory that produces the low-level FileEncryptionProperties and
    FileDecryptionProperties objects, from the high-level parameters."""
    # Empty __slots__: avoid mistakenly creating attributes on instances.
    __slots__ = ()

    def __init__(self, kms_client_factory):
        """Create CryptoFactory.

        Parameters
        ----------
        kms_client_factory : a callable that accepts KmsConnectionConfig
            and returns a KmsClient

        Raises
        ------
        TypeError
            If ``kms_client_factory`` is not callable.
        """
        self.factory.reset(new CPyCryptoFactory())

        if not callable(kms_client_factory):
            raise TypeError("Parameter kms_client_factory must be a callable")
        self.init(kms_client_factory)

    cdef init(self, callable_client_factory):
        cdef CPyKmsClientFactoryVtable vtable
        cdef shared_ptr[CPyKmsClientFactory] py_client_factory

        # Route client-creation requests from C++ to the Python callable.
        vtable.create_kms_client = _cb_create_kms_client
        py_client_factory.reset(
            new CPyKmsClientFactory(callable_client_factory, vtable))

        # Register the Python-backed factory with the C++ crypto factory,
        # upcast to the base CKmsClientFactory pointer type.
        self.factory.get().RegisterKmsClientFactory(
            static_pointer_cast[CKmsClientFactory, CPyKmsClientFactory](
                py_client_factory))

    def file_encryption_properties(self,
                                   KmsConnectionConfig kms_connection_config,
                                   EncryptionConfiguration encryption_config):
        """Create file encryption properties.

        Parameters
        ----------
        kms_connection_config : KmsConnectionConfig
            Configuration of connection to KMS

        encryption_config : EncryptionConfiguration
            Configuration of the encryption, such as which columns to encrypt

        Returns
        -------
        file_encryption_properties : FileEncryptionProperties
            File encryption properties.
        """
        cdef CResult[shared_ptr[CFileEncryptionProperties]] c_result

        # The C++ call runs with the GIL released.
        with nogil:
            c_result = self.factory.get().SafeGetFileEncryptionProperties(
                deref(kms_connection_config.unwrap().get()),
                deref(encryption_config.unwrap().get()))
        c_properties = GetResultValue(c_result)
        return FileEncryptionProperties.wrap(c_properties)

    def file_decryption_properties(
            self,
            KmsConnectionConfig kms_connection_config,
            DecryptionConfiguration decryption_config=None):
        """Create file decryption properties.

        Parameters
        ----------
        kms_connection_config : KmsConnectionConfig
            Configuration of connection to KMS

        decryption_config : DecryptionConfiguration, default None
            Configuration of the decryption, such as cache timeout.
            Can be None.

        Returns
        -------
        file_decryption_properties : FileDecryptionProperties
            File decryption properties.
        """
        cdef CDecryptionConfiguration c_config
        cdef CResult[shared_ptr[CFileDecryptionProperties]] c_result

        if decryption_config is not None:
            c_config = deref(decryption_config.unwrap().get())
        else:
            # No configuration supplied: use a default-constructed one.
            c_config = CDecryptionConfiguration()

        # The C++ call runs with the GIL released.
        with nogil:
            c_result = self.factory.get().SafeGetFileDecryptionProperties(
                deref(kms_connection_config.unwrap().get()),
                c_config)
        c_properties = GetResultValue(c_result)
        return FileDecryptionProperties.wrap(c_properties)

    def remove_cache_entries_for_token(self, access_token):
        """Forward to the C++ factory's RemoveCacheEntriesForToken.

        Parameters
        ----------
        access_token : str or bytes
            Token whose entries are removed; converted with ``tobytes``.
        """
        self.factory.get().RemoveCacheEntriesForToken(tobytes(access_token))

    def remove_cache_entries_for_all_tokens(self):
        """Forward to the C++ factory's RemoveCacheEntriesForAllTokens."""
        self.factory.get().RemoveCacheEntriesForAllTokens()

    # Expose the shared pointer to the underlying C++ factory.
    cdef inline shared_ptr[CPyCryptoFactory] unwrap(self):
        return self.factory
|
|
|
|
|
cdef shared_ptr[CCryptoFactory] pyarrow_unwrap_cryptofactory(object crypto_factory) except *:
    """Return the C++ factory held by a CryptoFactory, as a base pointer.

    Raises TypeError if ``crypto_factory`` is not a CryptoFactory.
    """
    if not isinstance(crypto_factory, CryptoFactory):
        raise TypeError("Expected CryptoFactory, got %s" % type(crypto_factory))
    py_factory = (<CryptoFactory> crypto_factory).unwrap()
    # Upcast from the Python-aware subclass pointer to the base type.
    return static_pointer_cast[CCryptoFactory, CPyCryptoFactory](py_factory)
|
|
|
|
|
cdef shared_ptr[CKmsConnectionConfig] pyarrow_unwrap_kmsconnectionconfig(object kmsconnectionconfig) except *:
    """Return the C++ configuration held by a KmsConnectionConfig.

    Raises TypeError if the argument is not a KmsConnectionConfig.
    """
    if not isinstance(kmsconnectionconfig, KmsConnectionConfig):
        raise TypeError("Expected KmsConnectionConfig, got %s" % type(kmsconnectionconfig))
    return (<KmsConnectionConfig> kmsconnectionconfig).unwrap()
|
|
|
|
|
cdef shared_ptr[CEncryptionConfiguration] pyarrow_unwrap_encryptionconfig(object encryptionconfig) except *:
    """Return the C++ configuration held by an EncryptionConfiguration.

    Raises TypeError if the argument is not an EncryptionConfiguration.
    """
    if not isinstance(encryptionconfig, EncryptionConfiguration):
        raise TypeError("Expected EncryptionConfiguration, got %s" % type(encryptionconfig))
    return (<EncryptionConfiguration> encryptionconfig).unwrap()
|
|
|
|
|
cdef shared_ptr[CDecryptionConfiguration] pyarrow_unwrap_decryptionconfig(object decryptionconfig) except *:
    """Return the C++ configuration held by a DecryptionConfiguration.

    Raises TypeError if the argument is not a DecryptionConfiguration.
    """
    if not isinstance(decryptionconfig, DecryptionConfiguration):
        raise TypeError("Expected DecryptionConfiguration, got %s" % type(decryptionconfig))
    return (<DecryptionConfiguration> decryptionconfig).unwrap()
|
|