|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import pytest |
|
from datetime import timedelta |
|
|
|
import pyarrow as pa |
|
try: |
|
import pyarrow.parquet as pq |
|
import pyarrow.parquet.encryption as pe |
|
except ImportError: |
|
pq = None |
|
pe = None |
|
else: |
|
from pyarrow.tests.parquet.encryption import ( |
|
InMemoryKmsClient, verify_file_encrypted) |
|
|
|
|
|
PARQUET_NAME = 'encrypted_table.in_mem.parquet' |
|
FOOTER_KEY = b"0123456789112345" |
|
FOOTER_KEY_NAME = "footer_key" |
|
COL_KEY = b"1234567890123450" |
|
COL_KEY_NAME = "col_key" |
|
|
|
|
|
|
|
|
|
|
|
pytestmark = [ |
|
pytest.mark.parquet_encryption, |
|
pytest.mark.parquet |
|
] |
|
|
|
|
|
@pytest.fixture(scope='module') |
|
def data_table(): |
|
data_table = pa.Table.from_pydict({ |
|
'a': pa.array([1, 2, 3]), |
|
'b': pa.array(['a', 'b', 'c']), |
|
'c': pa.array(['x', 'y', 'z']) |
|
}) |
|
return data_table |
|
|
|
|
|
@pytest.fixture(scope='module') |
|
def basic_encryption_config(): |
|
basic_encryption_config = pe.EncryptionConfiguration( |
|
footer_key=FOOTER_KEY_NAME, |
|
column_keys={ |
|
COL_KEY_NAME: ["a", "b"], |
|
}) |
|
return basic_encryption_config |
|
|
|
|
|
def setup_encryption_environment(custom_kms_conf): |
|
""" |
|
Sets up and returns the KMS connection configuration and crypto factory |
|
based on provided KMS configuration parameters. |
|
""" |
|
kms_connection_config = pe.KmsConnectionConfig(custom_kms_conf=custom_kms_conf) |
|
|
|
def kms_factory(kms_connection_configuration): |
|
return InMemoryKmsClient(kms_connection_configuration) |
|
|
|
|
|
crypto_factory = pe.CryptoFactory(kms_factory) |
|
|
|
return kms_connection_config, crypto_factory |
|
|
|
|
|
def write_encrypted_file(path, data_table, footer_key_name, col_key_name, |
|
footer_key, col_key, encryption_config): |
|
""" |
|
Writes an encrypted parquet file based on the provided parameters. |
|
""" |
|
|
|
custom_kms_conf = { |
|
footer_key_name: footer_key.decode("UTF-8"), |
|
col_key_name: col_key.decode("UTF-8"), |
|
} |
|
|
|
|
|
kms_connection_config, crypto_factory = setup_encryption_environment( |
|
custom_kms_conf) |
|
|
|
|
|
write_encrypted_parquet(path, data_table, encryption_config, |
|
kms_connection_config, crypto_factory) |
|
|
|
return kms_connection_config, crypto_factory |
|
|
|
|
|
def test_encrypted_parquet_write_read(tempdir, data_table): |
|
"""Write an encrypted parquet, verify it's encrypted, and then read it.""" |
|
path = tempdir / PARQUET_NAME |
|
|
|
|
|
|
|
|
|
encryption_config = pe.EncryptionConfiguration( |
|
footer_key=FOOTER_KEY_NAME, |
|
column_keys={ |
|
COL_KEY_NAME: ["a", "b"], |
|
}, |
|
encryption_algorithm="AES_GCM_V1", |
|
cache_lifetime=timedelta(minutes=5.0), |
|
data_key_length_bits=256) |
|
|
|
kms_connection_config, crypto_factory = write_encrypted_file( |
|
path, data_table, FOOTER_KEY_NAME, COL_KEY_NAME, FOOTER_KEY, COL_KEY, |
|
encryption_config) |
|
|
|
verify_file_encrypted(path) |
|
|
|
|
|
decryption_config = pe.DecryptionConfiguration( |
|
cache_lifetime=timedelta(minutes=5.0)) |
|
result_table = read_encrypted_parquet( |
|
path, decryption_config, kms_connection_config, crypto_factory) |
|
assert data_table.equals(result_table) |
|
|
|
|
|
def write_encrypted_parquet(path, table, encryption_config, |
|
kms_connection_config, crypto_factory): |
|
file_encryption_properties = crypto_factory.file_encryption_properties( |
|
kms_connection_config, encryption_config) |
|
assert file_encryption_properties is not None |
|
with pq.ParquetWriter( |
|
path, table.schema, |
|
encryption_properties=file_encryption_properties) as writer: |
|
writer.write_table(table) |
|
|
|
|
|
def read_encrypted_parquet(path, decryption_config, |
|
kms_connection_config, crypto_factory): |
|
file_decryption_properties = crypto_factory.file_decryption_properties( |
|
kms_connection_config, decryption_config) |
|
assert file_decryption_properties is not None |
|
meta = pq.read_metadata( |
|
path, decryption_properties=file_decryption_properties) |
|
assert meta.num_columns == 3 |
|
schema = pq.read_schema( |
|
path, decryption_properties=file_decryption_properties) |
|
assert len(schema.names) == 3 |
|
|
|
result = pq.ParquetFile( |
|
path, decryption_properties=file_decryption_properties) |
|
return result.read(use_threads=True) |
|
|
|
|
|
def test_encrypted_parquet_write_read_wrong_key(tempdir, data_table): |
|
"""Write an encrypted parquet, verify it's encrypted, |
|
and then read it using wrong keys.""" |
|
path = tempdir / PARQUET_NAME |
|
|
|
|
|
|
|
|
|
encryption_config = pe.EncryptionConfiguration( |
|
footer_key=FOOTER_KEY_NAME, |
|
column_keys={ |
|
COL_KEY_NAME: ["a", "b"], |
|
}, |
|
encryption_algorithm="AES_GCM_V1", |
|
cache_lifetime=timedelta(minutes=5.0), |
|
data_key_length_bits=256) |
|
|
|
write_encrypted_file(path, data_table, FOOTER_KEY_NAME, COL_KEY_NAME, |
|
FOOTER_KEY, COL_KEY, encryption_config) |
|
|
|
verify_file_encrypted(path) |
|
|
|
wrong_kms_connection_config, wrong_crypto_factory = setup_encryption_environment({ |
|
FOOTER_KEY_NAME: COL_KEY.decode("UTF-8"), |
|
COL_KEY_NAME: FOOTER_KEY.decode("UTF-8"), |
|
}) |
|
|
|
decryption_config = pe.DecryptionConfiguration( |
|
cache_lifetime=timedelta(minutes=5.0)) |
|
with pytest.raises(ValueError, match=r"Incorrect master key used"): |
|
read_encrypted_parquet( |
|
path, decryption_config, wrong_kms_connection_config, |
|
wrong_crypto_factory) |
|
|
|
|
|
def test_encrypted_parquet_read_no_decryption_config(tempdir, data_table): |
|
"""Write an encrypted parquet, verify it's encrypted, |
|
but then try to read it without decryption properties.""" |
|
test_encrypted_parquet_write_read(tempdir, data_table) |
|
|
|
with pytest.raises(IOError, match=r"no decryption"): |
|
pq.ParquetFile(tempdir / PARQUET_NAME).read() |
|
|
|
|
|
def test_encrypted_parquet_read_metadata_no_decryption_config( |
|
tempdir, data_table): |
|
"""Write an encrypted parquet, verify it's encrypted, |
|
but then try to read its metadata without decryption properties.""" |
|
test_encrypted_parquet_write_read(tempdir, data_table) |
|
|
|
with pytest.raises(IOError, match=r"no decryption"): |
|
pq.read_metadata(tempdir / PARQUET_NAME) |
|
|
|
|
|
def test_encrypted_parquet_read_schema_no_decryption_config( |
|
tempdir, data_table): |
|
"""Write an encrypted parquet, verify it's encrypted, |
|
but then try to read its schema without decryption properties.""" |
|
test_encrypted_parquet_write_read(tempdir, data_table) |
|
with pytest.raises(IOError, match=r"no decryption"): |
|
pq.read_schema(tempdir / PARQUET_NAME) |
|
|
|
|
|
def test_encrypted_parquet_write_no_col_key(tempdir, data_table): |
|
"""Write an encrypted parquet, but give only footer key, |
|
without column key.""" |
|
path = tempdir / 'encrypted_table_no_col_key.in_mem.parquet' |
|
|
|
|
|
encryption_config = pe.EncryptionConfiguration( |
|
footer_key=FOOTER_KEY_NAME) |
|
|
|
with pytest.raises(OSError, |
|
match="Either column_keys or uniform_encryption " |
|
"must be set"): |
|
|
|
write_encrypted_file(path, data_table, FOOTER_KEY_NAME, COL_KEY_NAME, |
|
FOOTER_KEY, b"", encryption_config) |
|
|
|
|
|
def test_encrypted_parquet_write_kms_error(tempdir, data_table, |
|
basic_encryption_config): |
|
"""Write an encrypted parquet, but raise KeyError in KmsClient.""" |
|
path = tempdir / 'encrypted_table_kms_error.in_mem.parquet' |
|
encryption_config = basic_encryption_config |
|
|
|
|
|
kms_connection_config = pe.KmsConnectionConfig() |
|
|
|
def kms_factory(kms_connection_configuration): |
|
|
|
|
|
return InMemoryKmsClient(kms_connection_configuration) |
|
|
|
crypto_factory = pe.CryptoFactory(kms_factory) |
|
with pytest.raises(KeyError, match="footer_key"): |
|
|
|
write_encrypted_parquet(path, data_table, encryption_config, |
|
kms_connection_config, crypto_factory) |
|
|
|
|
|
def test_encrypted_parquet_write_kms_specific_error(tempdir, data_table, |
|
basic_encryption_config): |
|
"""Write an encrypted parquet, but raise KeyError in KmsClient.""" |
|
path = tempdir / 'encrypted_table_kms_error.in_mem.parquet' |
|
encryption_config = basic_encryption_config |
|
|
|
|
|
kms_connection_config = pe.KmsConnectionConfig() |
|
|
|
class ThrowingKmsClient(pe.KmsClient): |
|
"""A KmsClient implementation that throws exception in |
|
wrap/unwrap calls |
|
""" |
|
|
|
def __init__(self, config): |
|
"""Create an InMemoryKmsClient instance.""" |
|
pe.KmsClient.__init__(self) |
|
self.config = config |
|
|
|
def wrap_key(self, key_bytes, master_key_identifier): |
|
raise ValueError("Cannot Wrap Key") |
|
|
|
def unwrap_key(self, wrapped_key, master_key_identifier): |
|
raise ValueError("Cannot Unwrap Key") |
|
|
|
def kms_factory(kms_connection_configuration): |
|
|
|
return ThrowingKmsClient(kms_connection_configuration) |
|
|
|
crypto_factory = pe.CryptoFactory(kms_factory) |
|
with pytest.raises(ValueError, match="Cannot Wrap Key"): |
|
|
|
write_encrypted_parquet(path, data_table, encryption_config, |
|
kms_connection_config, crypto_factory) |
|
|
|
|
|
def test_encrypted_parquet_write_kms_factory_error(tempdir, data_table, |
|
basic_encryption_config): |
|
"""Write an encrypted parquet, but raise ValueError in kms_factory.""" |
|
path = tempdir / 'encrypted_table_kms_factory_error.in_mem.parquet' |
|
encryption_config = basic_encryption_config |
|
|
|
|
|
kms_connection_config = pe.KmsConnectionConfig() |
|
|
|
def kms_factory(kms_connection_configuration): |
|
raise ValueError('Cannot create KmsClient') |
|
|
|
crypto_factory = pe.CryptoFactory(kms_factory) |
|
with pytest.raises(ValueError, |
|
match="Cannot create KmsClient"): |
|
|
|
write_encrypted_parquet(path, data_table, encryption_config, |
|
kms_connection_config, crypto_factory) |
|
|
|
|
|
def test_encrypted_parquet_write_kms_factory_type_error( |
|
tempdir, data_table, basic_encryption_config): |
|
"""Write an encrypted parquet, but use wrong KMS client type |
|
that doesn't implement KmsClient.""" |
|
path = tempdir / 'encrypted_table_kms_factory_error.in_mem.parquet' |
|
encryption_config = basic_encryption_config |
|
|
|
|
|
kms_connection_config = pe.KmsConnectionConfig() |
|
|
|
class WrongTypeKmsClient(): |
|
"""This is not an implementation of KmsClient. |
|
""" |
|
|
|
def __init__(self, config): |
|
self.master_keys_map = config.custom_kms_conf |
|
|
|
def wrap_key(self, key_bytes, master_key_identifier): |
|
return None |
|
|
|
def unwrap_key(self, wrapped_key, master_key_identifier): |
|
return None |
|
|
|
def kms_factory(kms_connection_configuration): |
|
return WrongTypeKmsClient(kms_connection_configuration) |
|
|
|
crypto_factory = pe.CryptoFactory(kms_factory) |
|
with pytest.raises(TypeError): |
|
|
|
write_encrypted_parquet(path, data_table, encryption_config, |
|
kms_connection_config, crypto_factory) |
|
|
|
|
|
def test_encrypted_parquet_encryption_configuration(): |
|
def validate_encryption_configuration(encryption_config): |
|
assert FOOTER_KEY_NAME == encryption_config.footer_key |
|
assert ["a", "b"] == encryption_config.column_keys[COL_KEY_NAME] |
|
assert "AES_GCM_CTR_V1" == encryption_config.encryption_algorithm |
|
assert encryption_config.plaintext_footer |
|
assert not encryption_config.double_wrapping |
|
assert timedelta(minutes=10.0) == encryption_config.cache_lifetime |
|
assert not encryption_config.internal_key_material |
|
assert 192 == encryption_config.data_key_length_bits |
|
|
|
encryption_config = pe.EncryptionConfiguration( |
|
footer_key=FOOTER_KEY_NAME, |
|
column_keys={COL_KEY_NAME: ["a", "b"], }, |
|
encryption_algorithm="AES_GCM_CTR_V1", |
|
plaintext_footer=True, |
|
double_wrapping=False, |
|
cache_lifetime=timedelta(minutes=10.0), |
|
internal_key_material=False, |
|
data_key_length_bits=192, |
|
) |
|
validate_encryption_configuration(encryption_config) |
|
|
|
encryption_config_1 = pe.EncryptionConfiguration( |
|
footer_key=FOOTER_KEY_NAME) |
|
encryption_config_1.column_keys = {COL_KEY_NAME: ["a", "b"], } |
|
encryption_config_1.encryption_algorithm = "AES_GCM_CTR_V1" |
|
encryption_config_1.plaintext_footer = True |
|
encryption_config_1.double_wrapping = False |
|
encryption_config_1.cache_lifetime = timedelta(minutes=10.0) |
|
encryption_config_1.internal_key_material = False |
|
encryption_config_1.data_key_length_bits = 192 |
|
validate_encryption_configuration(encryption_config_1) |
|
|
|
|
|
def test_encrypted_parquet_decryption_configuration(): |
|
decryption_config = pe.DecryptionConfiguration( |
|
cache_lifetime=timedelta(minutes=10.0)) |
|
assert timedelta(minutes=10.0) == decryption_config.cache_lifetime |
|
|
|
decryption_config_1 = pe.DecryptionConfiguration() |
|
decryption_config_1.cache_lifetime = timedelta(minutes=10.0) |
|
assert timedelta(minutes=10.0) == decryption_config_1.cache_lifetime |
|
|
|
|
|
def test_encrypted_parquet_kms_configuration(): |
|
def validate_kms_connection_config(kms_connection_config): |
|
assert "Instance1" == kms_connection_config.kms_instance_id |
|
assert "URL1" == kms_connection_config.kms_instance_url |
|
assert "MyToken" == kms_connection_config.key_access_token |
|
assert ({"key1": "key_material_1", "key2": "key_material_2"} == |
|
kms_connection_config.custom_kms_conf) |
|
|
|
kms_connection_config = pe.KmsConnectionConfig( |
|
kms_instance_id="Instance1", |
|
kms_instance_url="URL1", |
|
key_access_token="MyToken", |
|
custom_kms_conf={ |
|
"key1": "key_material_1", |
|
"key2": "key_material_2", |
|
}) |
|
validate_kms_connection_config(kms_connection_config) |
|
|
|
kms_connection_config_1 = pe.KmsConnectionConfig() |
|
kms_connection_config_1.kms_instance_id = "Instance1" |
|
kms_connection_config_1.kms_instance_url = "URL1" |
|
kms_connection_config_1.key_access_token = "MyToken" |
|
kms_connection_config_1.custom_kms_conf = { |
|
"key1": "key_material_1", |
|
"key2": "key_material_2", |
|
} |
|
validate_kms_connection_config(kms_connection_config_1) |
|
|
|
|
|
@pytest.mark.xfail(reason="Plaintext footer - reading plaintext column subset" |
|
" reads encrypted columns too") |
|
def test_encrypted_parquet_write_read_plain_footer_single_wrapping( |
|
tempdir, data_table): |
|
"""Write an encrypted parquet, with plaintext footer |
|
and with single wrapping, |
|
verify it's encrypted, and then read plaintext columns.""" |
|
path = tempdir / PARQUET_NAME |
|
|
|
|
|
|
|
|
|
encryption_config = pe.EncryptionConfiguration( |
|
footer_key=FOOTER_KEY_NAME, |
|
column_keys={ |
|
COL_KEY_NAME: ["a", "b"], |
|
}, |
|
plaintext_footer=True, |
|
double_wrapping=False) |
|
|
|
kms_connection_config = pe.KmsConnectionConfig( |
|
custom_kms_conf={ |
|
FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8"), |
|
COL_KEY_NAME: COL_KEY.decode("UTF-8"), |
|
} |
|
) |
|
|
|
def kms_factory(kms_connection_configuration): |
|
return InMemoryKmsClient(kms_connection_configuration) |
|
|
|
crypto_factory = pe.CryptoFactory(kms_factory) |
|
|
|
write_encrypted_parquet(path, data_table, encryption_config, |
|
kms_connection_config, crypto_factory) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.xfail(reason="External key material not supported yet") |
|
def test_encrypted_parquet_write_external(tempdir, data_table): |
|
"""Write an encrypted parquet, with external key |
|
material. |
|
Currently it's not implemented, so should throw |
|
an exception""" |
|
path = tempdir / PARQUET_NAME |
|
|
|
|
|
encryption_config = pe.EncryptionConfiguration( |
|
footer_key=FOOTER_KEY_NAME, |
|
column_keys={}, |
|
internal_key_material=False) |
|
|
|
kms_connection_config = pe.KmsConnectionConfig( |
|
custom_kms_conf={FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8")} |
|
) |
|
|
|
def kms_factory(kms_connection_configuration): |
|
return InMemoryKmsClient(kms_connection_configuration) |
|
|
|
crypto_factory = pe.CryptoFactory(kms_factory) |
|
|
|
write_encrypted_parquet(path, data_table, encryption_config, |
|
kms_connection_config, crypto_factory) |
|
|
|
|
|
def test_encrypted_parquet_loop(tempdir, data_table, basic_encryption_config): |
|
"""Write an encrypted parquet, verify it's encrypted, |
|
and then read it multithreaded in a loop.""" |
|
path = tempdir / PARQUET_NAME |
|
|
|
|
|
|
|
|
|
kms_connection_config, crypto_factory = write_encrypted_file( |
|
path, data_table, FOOTER_KEY_NAME, COL_KEY_NAME, FOOTER_KEY, COL_KEY, |
|
basic_encryption_config) |
|
|
|
verify_file_encrypted(path) |
|
|
|
decryption_config = pe.DecryptionConfiguration( |
|
cache_lifetime=timedelta(minutes=5.0)) |
|
|
|
for i in range(50): |
|
|
|
file_decryption_properties = crypto_factory.file_decryption_properties( |
|
kms_connection_config, decryption_config) |
|
assert file_decryption_properties is not None |
|
|
|
result = pq.ParquetFile( |
|
path, decryption_properties=file_decryption_properties) |
|
result_table = result.read(use_threads=True) |
|
assert data_table.equals(result_table) |
|
|
|
|
|
def test_read_with_deleted_crypto_factory(tempdir, data_table, basic_encryption_config): |
|
""" |
|
Test that decryption properties can be used if the crypto factory is no longer alive |
|
""" |
|
path = tempdir / PARQUET_NAME |
|
kms_connection_config, crypto_factory = write_encrypted_file( |
|
path, data_table, FOOTER_KEY_NAME, COL_KEY_NAME, FOOTER_KEY, COL_KEY, |
|
basic_encryption_config) |
|
verify_file_encrypted(path) |
|
|
|
|
|
|
|
decryption_config = pe.DecryptionConfiguration( |
|
cache_lifetime=timedelta(minutes=5.0)) |
|
file_decryption_properties = crypto_factory.file_decryption_properties( |
|
kms_connection_config, decryption_config) |
|
del crypto_factory |
|
|
|
result = pq.ParquetFile( |
|
path, decryption_properties=file_decryption_properties) |
|
result_table = result.read(use_threads=True) |
|
assert data_table.equals(result_table) |
|
|
|
|
|
def test_encrypted_parquet_read_table(tempdir, data_table, basic_encryption_config): |
|
"""Write an encrypted parquet then read it back using read_table.""" |
|
path = tempdir / PARQUET_NAME |
|
|
|
|
|
kms_connection_config, crypto_factory = write_encrypted_file( |
|
path, data_table, FOOTER_KEY_NAME, COL_KEY_NAME, FOOTER_KEY, COL_KEY, |
|
basic_encryption_config) |
|
|
|
decryption_config = pe.DecryptionConfiguration( |
|
cache_lifetime=timedelta(minutes=5.0)) |
|
file_decryption_properties = crypto_factory.file_decryption_properties( |
|
kms_connection_config, decryption_config) |
|
|
|
|
|
result_table = pq.read_table(path, decryption_properties=file_decryption_properties) |
|
|
|
|
|
assert data_table.equals(result_table) |
|
|
|
|
|
result_table = pq.read_table( |
|
tempdir, decryption_properties=file_decryption_properties) |
|
assert data_table.equals(result_table) |
|
|