from datetime import datetime, timezone, timedelta |
|
import gzip |
|
import os |
|
import pathlib |
|
from urllib.request import urlopen |
|
import subprocess |
|
import sys |
|
import time |
|
|
|
import pytest |
|
import weakref |
|
|
|
import pyarrow as pa |
|
from pyarrow.tests.test_io import assert_file_not_found |
|
from pyarrow.tests.util import (_filesystem_uri, ProxyHandler, |
|
_configure_s3_limited_user, |
|
running_on_musllinux) |
|
|
|
from pyarrow.fs import (FileType, FileInfo, FileSelector, FileSystem, |
|
LocalFileSystem, SubTreeFileSystem, _MockFileSystem, |
|
FileSystemHandler, PyFileSystem, FSSpecHandler, |
|
copy_files) |
|
from pyarrow.util import find_free_port |
|
|
|
|
|
here = os.path.dirname(os.path.abspath(__file__)) |
|
|
|
|
|
class DummyHandler(FileSystemHandler): |
|
def __init__(self, value=42): |
|
self._value = value |
|
|
|
def __eq__(self, other): |
|
if isinstance(other, FileSystemHandler): |
|
return self._value == other._value |
|
return NotImplemented |
|
|
|
def __ne__(self, other): |
|
if isinstance(other, FileSystemHandler): |
|
return self._value != other._value |
|
return NotImplemented |
|
|
|
def get_type_name(self): |
|
return "dummy" |
|
|
|
def normalize_path(self, path): |
|
return path |
|
|
|
def get_file_info(self, paths): |
|
info = [] |
|
for path in paths: |
|
if "file" in path: |
|
info.append(FileInfo(path, FileType.File)) |
|
elif "dir" in path: |
|
info.append(FileInfo(path, FileType.Directory)) |
|
elif "notfound" in path: |
|
info.append(FileInfo(path, FileType.NotFound)) |
|
elif "badtype" in path: |
|
|
|
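# not a FileInfo: the conversion layer is expected to raise TypeError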
info.append(object()) |
|
else: |
|
raise IOError |
|
return info |
|
|
|
def get_file_info_selector(self, selector): |
|
if selector.base_dir != "somedir": |
|
if selector.allow_not_found: |
|
return [] |
|
else: |
|
raise FileNotFoundError(selector.base_dir) |
|
infos = [ |
|
FileInfo("somedir/file1", FileType.File, size=123), |
|
FileInfo("somedir/subdir1", FileType.Directory), |
|
] |
|
if selector.recursive: |
|
infos += [ |
|
FileInfo("somedir/subdir1/file2", FileType.File, size=456), |
|
] |
|
return infos |
|
|
|
def create_dir(self, path, recursive): |
|
if path == "recursive": |
|
assert recursive is True |
|
elif path == "non-recursive": |
|
assert recursive is False |
|
else: |
|
raise IOError |
|
|
|
def delete_dir(self, path): |
|
assert path == "delete_dir" |
|
|
|
def delete_dir_contents(self, path, missing_dir_ok): |
|
if not path.strip("/"): |
|
raise ValueError |
|
assert path == "delete_dir_contents" |
|
|
|
def delete_root_dir_contents(self): |
|
pass |
|
|
|
def delete_file(self, path): |
|
assert path == "delete_file" |
|
|
|
def move(self, src, dest): |
|
assert src == "move_from" |
|
assert dest == "move_to" |
|
|
|
def copy_file(self, src, dest): |
|
assert src == "copy_file_from" |
|
assert dest == "copy_file_to" |
|
|
|
def open_input_stream(self, path): |
|
if "notfound" in path: |
|
raise FileNotFoundError(path) |
|
data = "{0}:input_stream".format(path).encode('utf8') |
|
return pa.BufferReader(data) |
|
|
|
def open_input_file(self, path): |
|
if "notfound" in path: |
|
raise FileNotFoundError(path) |
|
data = "{0}:input_file".format(path).encode('utf8') |
|
return pa.BufferReader(data) |
|
|
|
def open_output_stream(self, path, metadata): |
|
if "notfound" in path: |
|
raise FileNotFoundError(path) |
|
return pa.BufferOutputStream() |
|
|
|
def open_append_stream(self, path, metadata): |
|
if "notfound" in path: |
|
raise FileNotFoundError(path) |
|
return pa.BufferOutputStream() |
|
|
|
|
|
@pytest.fixture |
|
def localfs(request, tempdir): |
|
return dict( |
|
fs=LocalFileSystem(), |
|
pathfn=lambda p: (tempdir / p).as_posix(), |
|
allow_move_dir=True, |
|
allow_append_to_file=True, |
|
) |
|
|
|
|
|
@pytest.fixture |
|
def py_localfs(request, tempdir): |
|
return dict( |
|
fs=PyFileSystem(ProxyHandler(LocalFileSystem())), |
|
pathfn=lambda p: (tempdir / p).as_posix(), |
|
allow_move_dir=True, |
|
allow_append_to_file=True, |
|
) |
|
|
|
|
|
@pytest.fixture |
|
def mockfs(request): |
|
return dict( |
|
fs=_MockFileSystem(), |
|
pathfn=lambda p: p, |
|
allow_move_dir=True, |
|
allow_append_to_file=True, |
|
) |
|
|
|
|
|
@pytest.fixture |
|
def py_mockfs(request): |
|
return dict( |
|
fs=PyFileSystem(ProxyHandler(_MockFileSystem())), |
|
pathfn=lambda p: p, |
|
allow_move_dir=True, |
|
allow_append_to_file=True, |
|
) |
|
|
|
|
|
@pytest.fixture |
|
def localfs_with_mmap(request, tempdir): |
|
return dict( |
|
fs=LocalFileSystem(use_mmap=True), |
|
pathfn=lambda p: (tempdir / p).as_posix(), |
|
allow_move_dir=True, |
|
allow_append_to_file=True, |
|
) |
|
|
|
|
|
@pytest.fixture |
|
def subtree_localfs(request, tempdir, localfs): |
|
return dict( |
|
fs=SubTreeFileSystem(str(tempdir), localfs['fs']), |
|
pathfn=lambda p: p, |
|
allow_move_dir=True, |
|
allow_append_to_file=True, |
|
) |
|
|
|
|
|
@pytest.fixture |
|
def gcsfs(request, gcs_server): |
|
request.config.pyarrow.requires('gcs') |
|
from pyarrow.fs import GcsFileSystem |
|
|
|
host, port = gcs_server['connection'] |
|
bucket = 'pyarrow-filesystem/' |
|
|
|
fs = GcsFileSystem( |
|
endpoint_override=f'{host}:{port}', |
|
scheme='http', |
|
|
|
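# the storage testbench does not check credentials, so connect anonymously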
anonymous=True, |
|
retry_time_limit=timedelta(seconds=45), |
|
project_id='test-project-id' |
|
) |
|
try: |
|
fs.create_dir(bucket) |
|
except OSError as e: |
|
pytest.skip(f"Could not create directory in {fs}: {e}") |
|
|
|
yield dict( |
|
fs=fs, |
|
pathfn=bucket.__add__, |
|
allow_move_dir=False, |
|
allow_append_to_file=False, |
|
) |
|
fs.delete_dir(bucket) |
|
|
|
|
|
@pytest.fixture |
|
def s3fs(request, s3_server): |
|
request.config.pyarrow.requires('s3') |
|
from pyarrow.fs import S3FileSystem |
|
|
|
host, port, access_key, secret_key = s3_server['connection'] |
|
bucket = 'pyarrow-filesystem/' |
|
|
|
fs = S3FileSystem( |
|
access_key=access_key, |
|
secret_key=secret_key, |
|
endpoint_override='{}:{}'.format(host, port), |
|
scheme='http', |
|
allow_bucket_creation=True, |
|
allow_bucket_deletion=True |
|
) |
|
fs.create_dir(bucket) |
|
|
|
yield dict( |
|
fs=fs, |
|
pathfn=bucket.__add__, |
|
allow_move_dir=False, |
|
allow_append_to_file=False, |
|
) |
|
fs.delete_dir(bucket) |
|
|
|
|
|
@pytest.fixture |
|
def subtree_s3fs(request, s3fs): |
|
prefix = 'pyarrow-filesystem/prefix/' |
|
return dict( |
|
fs=SubTreeFileSystem(prefix, s3fs['fs']), |
|
pathfn=prefix.__add__, |
|
allow_move_dir=False, |
|
allow_append_to_file=False, |
|
) |
|
|
|
|
|
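# minimal MinIO policy: object-level access only, no bucket creation or deletion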
_minio_limited_policy = """{ |
|
"Version": "2012-10-17", |
|
"Statement": [ |
|
{ |
|
"Effect": "Allow", |
|
"Action": [ |
|
"s3:ListAllMyBuckets", |
|
"s3:PutObject", |
|
"s3:GetObject", |
|
"s3:ListBucket", |
|
"s3:PutObjectTagging", |
|
"s3:DeleteObject", |
|
"s3:GetObjectVersion" |
|
], |
|
"Resource": [ |
|
"arn:aws:s3:::*" |
|
] |
|
} |
|
] |
|
}""" |
|
|
|
|
|
@pytest.fixture |
|
def azurefs(request, azure_server): |
|
request.config.pyarrow.requires('azure') |
|
from pyarrow.fs import AzureFileSystem |
|
|
|
host, port, account_name, account_key = azure_server['connection'] |
|
azurite_authority = f"{host}:{port}" |
|
azurite_scheme = "http" |
|
|
|
container = 'pyarrow-filesystem/' |
|
|
|
fs = AzureFileSystem(account_name=account_name, |
|
account_key=account_key, |
|
blob_storage_authority=azurite_authority, |
|
dfs_storage_authority=azurite_authority, |
|
blob_storage_scheme=azurite_scheme, |
|
dfs_storage_scheme=azurite_scheme) |
|
|
|
fs.create_dir(container) |
|
|
|
yield dict( |
|
fs=fs, |
|
pathfn=container.__add__, |
|
allow_move_dir=True, |
|
allow_append_to_file=True, |
|
) |
|
fs.delete_dir(container) |
|
|
|
|
|
@pytest.fixture |
|
def hdfs(request, hdfs_connection): |
|
request.config.pyarrow.requires('hdfs') |
|
if not pa.have_libhdfs(): |
|
pytest.skip('Cannot locate libhdfs') |
|
|
|
from pyarrow.fs import HadoopFileSystem |
|
|
|
host, port, user = hdfs_connection |
|
fs = HadoopFileSystem(host, port=port, user=user) |
|
|
|
return dict( |
|
fs=fs, |
|
pathfn=lambda p: p, |
|
allow_move_dir=True, |
|
allow_append_to_file=True, |
|
) |
|
|
|
|
|
@pytest.fixture |
|
def py_fsspec_localfs(request, tempdir): |
|
fsspec = pytest.importorskip("fsspec") |
|
fs = fsspec.filesystem('file') |
|
return dict( |
|
fs=PyFileSystem(FSSpecHandler(fs)), |
|
pathfn=lambda p: (tempdir / p).as_posix(), |
|
allow_move_dir=True, |
|
allow_append_to_file=True, |
|
) |
|
|
|
|
|
@pytest.fixture |
|
def py_fsspec_memoryfs(request, tempdir): |
|
fsspec = pytest.importorskip("fsspec", minversion="0.7.5") |
|
if fsspec.__version__ == "0.8.5": |
|
|
|
pytest.skip("Bug in fsspec 0.8.5 for in-memory filesystem") |
|
fs = fsspec.filesystem('memory') |
|
return dict( |
|
fs=PyFileSystem(FSSpecHandler(fs)), |
|
pathfn=lambda p: p, |
|
allow_move_dir=True, |
|
allow_append_to_file=True, |
|
) |
|
|
|
|
|
@pytest.fixture |
|
def py_fsspec_s3fs(request, s3_server): |
|
s3fs = pytest.importorskip("s3fs") |
|
host, port, access_key, secret_key = s3_server['connection'] |
|
bucket = 'pyarrow-filesystem/' |
|
|
|
fs = s3fs.S3FileSystem( |
|
key=access_key, |
|
secret=secret_key, |
|
client_kwargs=dict(endpoint_url='http://{}:{}'.format(host, port)) |
|
) |
|
fs = PyFileSystem(FSSpecHandler(fs)) |
|
fs.create_dir(bucket) |
|
|
|
yield dict( |
|
fs=fs, |
|
pathfn=bucket.__add__, |
|
allow_move_dir=False, |
|
allow_append_to_file=True, |
|
) |
|
fs.delete_dir(bucket) |
|
|
|
|
|
@pytest.fixture(params=[ |
|
pytest.param( |
|
'localfs', |
|
id='LocalFileSystem()' |
|
), |
|
pytest.param( |
|
'localfs_with_mmap', |
|
id='LocalFileSystem(use_mmap=True)' |
|
), |
|
pytest.param( |
|
'subtree_localfs', |
|
id='SubTreeFileSystem(LocalFileSystem())' |
|
), |
|
pytest.param( |
|
's3fs', |
|
id='S3FileSystem', |
|
marks=pytest.mark.s3 |
|
), |
|
pytest.param( |
|
'gcsfs', |
|
id='GcsFileSystem', |
|
marks=pytest.mark.gcs |
|
), |
|
pytest.param( |
|
'azurefs', |
|
id='AzureFileSystem', |
|
marks=pytest.mark.azure |
|
), |
|
pytest.param( |
|
'hdfs', |
|
id='HadoopFileSystem', |
|
marks=pytest.mark.hdfs |
|
), |
|
pytest.param( |
|
'mockfs', |
|
id='_MockFileSystem()' |
|
), |
|
pytest.param( |
|
'py_localfs', |
|
id='PyFileSystem(ProxyHandler(LocalFileSystem()))' |
|
), |
|
pytest.param( |
|
'py_mockfs', |
|
id='PyFileSystem(ProxyHandler(_MockFileSystem()))' |
|
), |
|
pytest.param( |
|
'py_fsspec_localfs', |
|
id='PyFileSystem(FSSpecHandler(fsspec.LocalFileSystem()))' |
|
), |
|
pytest.param( |
|
'py_fsspec_memoryfs', |
|
id='PyFileSystem(FSSpecHandler(fsspec.filesystem("memory")))' |
|
), |
|
pytest.param( |
|
'py_fsspec_s3fs', |
|
id='PyFileSystem(FSSpecHandler(s3fs.S3FileSystem()))', |
|
marks=pytest.mark.s3 |
|
), |
|
]) |
|
def filesystem_config(request): |
|
return request.getfixturevalue(request.param) |
|
|
|
|
|
@pytest.fixture |
|
def fs(filesystem_config): |
|
return filesystem_config['fs'] |
|
|
|
|
|
@pytest.fixture |
|
def pathfn(filesystem_config): |
|
return filesystem_config['pathfn'] |
|
|
|
|
|
@pytest.fixture |
|
def allow_move_dir(filesystem_config): |
|
return filesystem_config['allow_move_dir'] |
|
|
|
|
|
@pytest.fixture |
|
def allow_append_to_file(filesystem_config): |
|
return filesystem_config['allow_append_to_file'] |
|
|
|
|
|
def check_mtime(file_info): |
|
assert isinstance(file_info.mtime, datetime) |
|
assert isinstance(file_info.mtime_ns, int) |
|
assert file_info.mtime_ns >= 0 |
|
assert file_info.mtime_ns == pytest.approx( |
|
file_info.mtime.timestamp() * 1e9) |
|
|
|
tzinfo = file_info.mtime.tzinfo |
|
assert tzinfo is not None |
|
assert tzinfo.utcoffset(None) == timedelta(0) |
|
|
|
|
|
def check_mtime_absent(file_info): |
|
assert file_info.mtime is None |
|
assert file_info.mtime_ns is None |
|
|
|
|
|
def check_mtime_or_absent(file_info): |
|
if file_info.mtime is None: |
|
check_mtime_absent(file_info) |
|
else: |
|
check_mtime(file_info) |
|
|
|
|
|
def skip_fsspec_s3fs(fs): |
|
if fs.type_name == "py::fsspec+('s3', 's3a')": |
|
pytest.xfail(reason="Not working with fsspec's s3fs") |
|
|
|
|
|
def skip_azure(fs, reason): |
|
if fs.type_name == "abfs": |
|
pytest.skip(reason=reason) |
|
|
|
|
|
@pytest.mark.s3 |
|
def test_s3fs_limited_permissions_create_bucket(s3_server): |
|
from pyarrow.fs import S3FileSystem |
|
_configure_s3_limited_user(s3_server, _minio_limited_policy, |
|
'test_fs_limited_user', 'limited123') |
|
host, port, _, _ = s3_server['connection'] |
|
|
|
fs = S3FileSystem( |
|
access_key='test_fs_limited_user', |
|
secret_key='limited123', |
|
endpoint_override='{}:{}'.format(host, port), |
|
scheme='http' |
|
) |
|
fs.create_dir('existing-bucket/test') |
|
|
|
with pytest.raises(pa.ArrowIOError, match="Bucket 'new-bucket' not found"): |
|
fs.create_dir('new-bucket') |
|
|
|
with pytest.raises(pa.ArrowIOError, match="Would delete bucket"): |
|
fs.delete_dir('existing-bucket') |
|
|
|
|
|
def test_file_info_constructor(): |
|
dt = datetime.fromtimestamp(1568799826, timezone.utc) |
|
|
|
info = FileInfo("foo/bar") |
|
assert info.path == "foo/bar" |
|
assert info.base_name == "bar" |
|
assert info.type == FileType.Unknown |
|
assert info.size is None |
|
check_mtime_absent(info) |
|
|
|
info = FileInfo("foo/baz.txt", type=FileType.File, size=123, |
|
mtime=1568799826.5) |
|
assert info.path == "foo/baz.txt" |
|
assert info.base_name == "baz.txt" |
|
assert info.type == FileType.File |
|
assert info.size == 123 |
|
assert info.mtime_ns == 1568799826500000000 |
|
check_mtime(info) |
|
|
|
info = FileInfo("foo", type=FileType.Directory, mtime=dt) |
|
assert info.path == "foo" |
|
assert info.base_name == "foo" |
|
assert info.type == FileType.Directory |
|
assert info.size is None |
|
assert info.mtime == dt |
|
assert info.mtime_ns == 1568799826000000000 |
|
check_mtime(info) |
|
|
|
|
|
def test_cannot_instantiate_base_filesystem(): |
|
with pytest.raises(TypeError): |
|
FileSystem() |
|
|
|
|
|
def test_filesystem_equals(): |
|
fs0 = LocalFileSystem() |
|
fs1 = LocalFileSystem() |
|
fs2 = _MockFileSystem() |
|
|
|
assert fs0.equals(fs0) |
|
assert fs0.equals(fs1) |
|
with pytest.raises(TypeError): |
|
fs0.equals('string') |
|
assert fs0 == fs0 == fs1 |
|
assert fs0 != 4 |
|
|
|
assert fs2 == fs2 |
|
assert fs2 != _MockFileSystem() |
|
|
|
assert SubTreeFileSystem('/base', fs0) == SubTreeFileSystem('/base', fs0) |
|
assert SubTreeFileSystem('/base', fs0) != SubTreeFileSystem('/base', fs2) |
|
assert SubTreeFileSystem('/base', fs0) != SubTreeFileSystem('/other', fs0) |
|
|
|
|
|
def test_filesystem_equals_none(fs): |
|
with pytest.raises(TypeError, match="got NoneType"): |
|
fs.equals(None) |
|
|
|
assert fs is not None |
|
|
|
|
|
def test_subtree_filesystem(): |
|
localfs = LocalFileSystem() |
|
|
|
subfs = SubTreeFileSystem('/base', localfs) |
|
assert subfs.base_path == '/base/' |
|
assert subfs.base_fs == localfs |
|
assert repr(subfs).startswith('SubTreeFileSystem(base_path=/base/, ' |
|
'base_fs=<pyarrow._fs.LocalFileSystem') |
|
|
|
subfs = SubTreeFileSystem('/another/base/', LocalFileSystem()) |
|
assert subfs.base_path == '/another/base/' |
|
assert subfs.base_fs == localfs |
|
assert repr(subfs).startswith('SubTreeFileSystem(base_path=/another/base/,' |
|
' base_fs=<pyarrow._fs.LocalFileSystem') |
|
|
|
|
|
def test_filesystem_pickling(fs, pickle_module): |
|
if fs.type_name.split('::')[-1] == 'mock': |
|
pytest.xfail(reason='MockFileSystem is not serializable') |
|
|
|
serialized = pickle_module.dumps(fs) |
|
restored = pickle_module.loads(serialized) |
|
assert isinstance(restored, FileSystem) |
|
assert restored.equals(fs) |
|
|
|
|
|
def test_filesystem_is_functional_after_pickling(fs, pathfn, pickle_module): |
|
if fs.type_name.split('::')[-1] == 'mock': |
|
pytest.xfail(reason='MockFileSystem is not serializable') |
|
skip_fsspec_s3fs(fs) |
|
|
|
aaa = pathfn('a/aa/aaa/') |
|
bb = pathfn('a/bb') |
|
c = pathfn('c.txt') |
|
|
|
fs.create_dir(aaa) |
|
with fs.open_output_stream(bb): |
|
pass |
|
with fs.open_output_stream(c) as fp: |
|
fp.write(b'test') |
|
|
|
restored = pickle_module.loads(pickle_module.dumps(fs)) |
|
aaa_info, bb_info, c_info = restored.get_file_info([aaa, bb, c]) |
|
assert aaa_info.type == FileType.Directory |
|
assert bb_info.type == FileType.File |
|
assert c_info.type == FileType.File |
|
|
|
|
|
def test_type_name(): |
|
fs = LocalFileSystem() |
|
assert fs.type_name == "local" |
|
fs = _MockFileSystem() |
|
assert fs.type_name == "mock" |
|
|
|
|
|
def test_normalize_path(fs): |
|
|
|
|
|
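# a trivial path name (no separators) should come back unchanged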
assert fs.normalize_path("foo") == "foo" |
|
|
|
|
|
def test_non_path_like_input_raises(fs): |
|
class Path: |
|
pass |
|
|
|
invalid_paths = [1, 1.1, Path(), tuple(), {}, [], lambda: 1, |
|
pathlib.Path()] |
|
for path in invalid_paths: |
|
with pytest.raises(TypeError): |
|
fs.create_dir(path) |
|
|
|
|
|
def test_get_file_info(fs, pathfn): |
|
aaa = pathfn('a/aa/aaa/') |
|
bb = pathfn('a/bb') |
|
c = pathfn('c.txt') |
|
zzz = pathfn('zzz') |
|
|
|
fs.create_dir(aaa) |
|
with fs.open_output_stream(bb): |
|
pass |
|
with fs.open_output_stream(c) as fp: |
|
fp.write(b'test') |
|
|
|
aaa_info, bb_info, c_info, zzz_info = fs.get_file_info([aaa, bb, c, zzz]) |
|
|
|
assert aaa_info.path == aaa |
|
assert 'aaa' in repr(aaa_info) |
|
assert aaa_info.extension == '' |
|
if fs.type_name == "py::fsspec+('s3', 's3a')": |
|
|
|
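# fsspec's s3fs does not create empty directories, so the path reports as not found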
assert aaa_info.type == FileType.NotFound |
|
else: |
|
assert aaa_info.type == FileType.Directory |
|
assert 'FileType.Directory' in repr(aaa_info) |
|
assert aaa_info.size is None |
|
check_mtime_or_absent(aaa_info) |
|
|
|
assert bb_info.path == str(bb) |
|
assert bb_info.base_name == 'bb' |
|
assert bb_info.extension == '' |
|
assert bb_info.type == FileType.File |
|
assert 'FileType.File' in repr(bb_info) |
|
assert bb_info.size == 0 |
|
if fs.type_name not in ["py::fsspec+memory", "py::fsspec+('s3', 's3a')"]: |
|
check_mtime(bb_info) |
|
|
|
assert c_info.path == str(c) |
|
assert c_info.base_name == 'c.txt' |
|
assert c_info.extension == 'txt' |
|
assert c_info.type == FileType.File |
|
assert 'FileType.File' in repr(c_info) |
|
assert c_info.size == 4 |
|
if fs.type_name not in ["py::fsspec+memory", "py::fsspec+('s3', 's3a')"]: |
|
check_mtime(c_info) |
|
|
|
assert zzz_info.path == str(zzz) |
|
assert zzz_info.base_name == 'zzz' |
|
assert zzz_info.extension == '' |
|
assert zzz_info.type == FileType.NotFound |
|
assert zzz_info.size is None |
|
assert zzz_info.mtime is None |
|
assert 'FileType.NotFound' in repr(zzz_info) |
|
check_mtime_absent(zzz_info) |
|
|
|
|
|
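# get_file_info also accepts a single path and returns a single FileInfo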
aaa_info2 = fs.get_file_info(aaa) |
|
assert aaa_info.path == aaa_info2.path |
|
assert aaa_info.type == aaa_info2.type |
|
|
|
|
|
def test_get_file_info_with_selector(fs, pathfn): |
|
base_dir = pathfn('selector-dir/') |
|
file_a = pathfn('selector-dir/test_file_a') |
|
file_b = pathfn('selector-dir/test_file_b') |
|
dir_a = pathfn('selector-dir/test_dir_a') |
|
file_c = pathfn('selector-dir/test_dir_a/test_file_c') |
|
dir_b = pathfn('selector-dir/test_dir_b') |
|
|
|
try: |
|
fs.create_dir(base_dir) |
|
with fs.open_output_stream(file_a): |
|
pass |
|
with fs.open_output_stream(file_b): |
|
pass |
|
fs.create_dir(dir_a) |
|
with fs.open_output_stream(file_c): |
|
pass |
|
fs.create_dir(dir_b) |
|
|
|
|
|
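# recursive selector: should see the files and directories at every depth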
selector = FileSelector(base_dir, allow_not_found=False, |
|
recursive=True) |
|
assert selector.base_dir == base_dir |
|
|
|
infos = fs.get_file_info(selector) |
|
if fs.type_name == "py::fsspec+('s3', 's3a')": |
|
|
|
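# one entry fewer, presumably because s3fs hides empty directories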
assert len(infos) == 4
|
else: |
|
assert len(infos) == 5 |
|
|
|
for info in infos: |
|
if (info.path.endswith(file_a) or info.path.endswith(file_b) or |
|
info.path.endswith(file_c)): |
|
assert info.type == FileType.File |
|
elif (info.path.rstrip("/").endswith(dir_a) or |
|
info.path.rstrip("/").endswith(dir_b)): |
|
assert info.type == FileType.Directory |
|
else: |
|
raise ValueError('unexpected path {}'.format(info.path)) |
|
check_mtime_or_absent(info) |
|
|
|
|
|
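# non-recursive selector: listing stops at the first level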
selector = FileSelector(base_dir, recursive=False) |
|
|
|
infos = fs.get_file_info(selector) |
|
if fs.type_name == "py::fsspec+('s3', 's3a')": |
|
|
|
assert len(infos) == 3 |
|
else: |
|
assert len(infos) == 4 |
|
|
|
finally: |
|
fs.delete_dir(base_dir) |
|
|
|
|
|
def test_create_dir(fs, pathfn): |
|
|
|
|
|
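# skipped for fsspec's s3fs, which has no real notion of empty directories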
skip_fsspec_s3fs(fs) |
|
d = pathfn('test-directory/') |
|
|
|
with pytest.raises(pa.ArrowIOError): |
|
fs.delete_dir(d) |
|
|
|
fs.create_dir(d) |
|
fs.delete_dir(d) |
|
|
|
d = pathfn('deeply/nested/test-directory/') |
|
fs.create_dir(d, recursive=True) |
|
fs.delete_dir(d) |
|
|
|
|
|
def test_delete_dir(fs, pathfn): |
|
skip_fsspec_s3fs(fs) |
|
|
|
d = pathfn('directory/') |
|
nd = pathfn('directory/nested/') |
|
|
|
fs.create_dir(nd) |
|
fs.delete_dir(d) |
|
with pytest.raises(pa.ArrowIOError): |
|
fs.delete_dir(nd) |
|
with pytest.raises(pa.ArrowIOError): |
|
fs.delete_dir(d) |
|
|
|
|
|
def test_delete_dir_with_explicit_subdir(fs, pathfn): |
|
|
|
|
|
|
|
|
|
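# deleting a directory should also remove subdirectories that were
# created explicitly, whether or not they contain files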
skip_fsspec_s3fs(fs) |
|
|
|
d = pathfn('directory/') |
|
nd = pathfn('directory/nested/') |
|
|
|
|
|
fs.create_dir(d) |
|
fs.create_dir(nd) |
|
fs.delete_dir(d) |
|
dir_info = fs.get_file_info(d) |
|
assert dir_info.type == FileType.NotFound |
|
|
|
|
|
d = pathfn('directory2') |
|
nd = pathfn('directory2/nested') |
|
f = pathfn('directory2/nested/target-file') |
|
|
|
fs.create_dir(d) |
|
fs.create_dir(nd) |
|
with fs.open_output_stream(f) as s: |
|
s.write(b'data') |
|
|
|
fs.delete_dir(d) |
|
dir_info = fs.get_file_info(d) |
|
assert dir_info.type == FileType.NotFound |
|
|
|
|
|
def test_delete_dir_contents(fs, pathfn): |
|
skip_fsspec_s3fs(fs) |
|
|
|
d = pathfn('directory/') |
|
nd = pathfn('directory/nested/') |
|
|
|
fs.create_dir(nd) |
|
fs.delete_dir_contents(d) |
|
with pytest.raises(pa.ArrowIOError): |
|
fs.delete_dir(nd) |
|
fs.delete_dir_contents(nd, missing_dir_ok=True) |
|
with pytest.raises(pa.ArrowIOError): |
|
fs.delete_dir_contents(nd) |
|
fs.delete_dir(d) |
|
with pytest.raises(pa.ArrowIOError): |
|
fs.delete_dir(d) |
|
|
|
|
|
def _check_root_dir_contents(config): |
|
fs = config['fs'] |
|
pathfn = config['pathfn'] |
|
|
|
d = pathfn('directory/') |
|
nd = pathfn('directory/nested/') |
|
|
|
fs.create_dir(nd) |
|
with pytest.raises(pa.ArrowInvalid): |
|
fs.delete_dir_contents("") |
|
with pytest.raises(pa.ArrowInvalid): |
|
fs.delete_dir_contents("/") |
|
with pytest.raises(pa.ArrowInvalid): |
|
fs.delete_dir_contents("//") |
|
|
|
fs.delete_dir_contents("", accept_root_dir=True) |
|
fs.delete_dir_contents("/", accept_root_dir=True) |
|
fs.delete_dir_contents("//", accept_root_dir=True) |
|
with pytest.raises(pa.ArrowIOError): |
|
fs.delete_dir(d) |
|
|
|
|
|
def test_delete_root_dir_contents(mockfs, py_mockfs): |
|
_check_root_dir_contents(mockfs) |
|
_check_root_dir_contents(py_mockfs) |
|
|
|
|
|
def test_copy_file(fs, pathfn): |
|
s = pathfn('test-copy-source-file') |
|
t = pathfn('test-copy-target-file') |
|
|
|
with fs.open_output_stream(s): |
|
pass |
|
|
|
fs.copy_file(s, t) |
|
fs.delete_file(s) |
|
fs.delete_file(t) |
|
|
|
|
|
def test_move_directory(fs, pathfn, allow_move_dir): |
|
|
|
skip_azure(fs, "Not implemented yet for Azure. See GH-40025")
|
|
|
|
|
s = pathfn('source-dir/') |
|
t = pathfn('target-dir/') |
|
|
|
fs.create_dir(s) |
|
|
|
if allow_move_dir: |
|
fs.move(s, t) |
|
with pytest.raises(pa.ArrowIOError): |
|
fs.delete_dir(s) |
|
fs.delete_dir(t) |
|
else: |
|
with pytest.raises(pa.ArrowIOError): |
|
fs.move(s, t) |
|
|
|
|
|
def test_move_file(fs, pathfn): |
|
|
|
|
|
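# moving single files is known to misbehave with fsspec's s3fs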
skip_fsspec_s3fs(fs) |
|
|
|
|
|
skip_azure(fs, "Not implemented yet for Azure. See GH-40025")
|
|
|
s = pathfn('test-move-source-file') |
|
t = pathfn('test-move-target-file') |
|
|
|
with fs.open_output_stream(s): |
|
pass |
|
|
|
fs.move(s, t) |
|
with pytest.raises(pa.ArrowIOError): |
|
fs.delete_file(s) |
|
fs.delete_file(t) |
|
|
|
|
|
def test_delete_file(fs, pathfn): |
|
p = pathfn('test-delete-target-file') |
|
with fs.open_output_stream(p): |
|
pass |
|
|
|
fs.delete_file(p) |
|
with pytest.raises(pa.ArrowIOError): |
|
fs.delete_file(p) |
|
|
|
d = pathfn('test-delete-nested') |
|
fs.create_dir(d) |
|
f = pathfn('test-delete-nested/target-file') |
|
with fs.open_output_stream(f) as s: |
|
s.write(b'data') |
|
|
|
fs.delete_dir(d) |
|
|
|
|
|
def identity(v): |
|
return v |
|
|
|
|
|
@pytest.mark.gzip |
|
@pytest.mark.parametrize( |
|
('compression', 'buffer_size', 'compressor'), |
|
[ |
|
(None, None, identity), |
|
(None, 64, identity), |
|
('gzip', None, gzip.compress), |
|
('gzip', 256, gzip.compress), |
|
] |
|
) |
|
def test_open_input_stream(fs, pathfn, compression, buffer_size, compressor): |
|
p = pathfn('open-input-stream') |
|
|
|
data = b'some data for reading\n' * 512 |
|
with fs.open_output_stream(p) as s: |
|
s.write(compressor(data)) |
|
|
|
with fs.open_input_stream(p, compression, buffer_size) as s: |
|
result = s.read() |
|
|
|
assert result == data |
|
|
|
|
|
def test_open_input_file(fs, pathfn): |
|
p = pathfn('open-input-file') |
|
|
|
data = b'some data' * 1024 |
|
with fs.open_output_stream(p) as s: |
|
s.write(data) |
|
|
|
read_from = len(b'some data') * 512 |
|
with fs.open_input_file(p) as f: |
|
result = f.read() |
|
assert result == data |
|
|
|
with fs.open_input_file(p) as f: |
|
f.seek(read_from) |
|
result = f.read() |
|
|
|
assert result == data[read_from:] |
|
|
|
|
|
def test_open_input_stream_not_found(fs, pathfn): |
|
|
|
p = pathfn('open-input-stream-not-found') |
|
with pytest.raises(FileNotFoundError): |
|
fs.open_input_stream(p) |
|
|
|
|
|
@pytest.mark.gzip |
|
@pytest.mark.parametrize( |
|
('compression', 'buffer_size', 'decompressor'), |
|
[ |
|
(None, None, identity), |
|
(None, 64, identity), |
|
('gzip', None, gzip.decompress), |
|
('gzip', 256, gzip.decompress), |
|
] |
|
) |
|
def test_open_output_stream(fs, pathfn, compression, buffer_size, |
|
decompressor): |
|
p = pathfn('open-output-stream') |
|
|
|
data = b'some data for writing' * 1024 |
|
with fs.open_output_stream(p, compression, buffer_size) as f: |
|
f.write(data) |
|
|
|
with fs.open_input_stream(p, compression, buffer_size) as f: |
|
assert f.read(len(data)) == data |
|
|
|
|
|
@pytest.mark.gzip |
|
@pytest.mark.parametrize( |
|
('compression', 'buffer_size', 'compressor', 'decompressor'), |
|
[ |
|
(None, None, identity, identity), |
|
(None, 64, identity, identity), |
|
('gzip', None, gzip.compress, gzip.decompress), |
|
('gzip', 256, gzip.compress, gzip.decompress), |
|
] |
|
) |
|
def test_open_append_stream(fs, pathfn, compression, buffer_size, compressor, |
|
decompressor, allow_append_to_file): |
|
p = pathfn('open-append-stream') |
|
|
|
initial = compressor(b'already existing') |
|
with fs.open_output_stream(p) as s: |
|
s.write(initial) |
|
|
|
if allow_append_to_file: |
|
with fs.open_append_stream(p, compression=compression, |
|
buffer_size=buffer_size) as f: |
|
f.write(b'\nnewly added') |
|
|
|
with fs.open_input_stream(p) as f: |
|
result = f.read() |
|
|
|
result = decompressor(result) |
|
assert result == b'already existing\nnewly added' |
|
else: |
|
with pytest.raises(pa.ArrowNotImplementedError): |
|
fs.open_append_stream(p, compression=compression, |
|
buffer_size=buffer_size) |
|
|
|
|
|
def test_open_output_stream_metadata(fs, pathfn): |
|
p = pathfn('open-output-stream-metadata') |
|
metadata = {'Content-Type': 'x-pyarrow/test'} |
|
|
|
data = b'some data' |
|
with fs.open_output_stream(p, metadata=metadata) as f: |
|
f.write(data) |
|
|
|
with fs.open_input_stream(p) as f: |
|
assert f.read() == data |
|
got_metadata = f.metadata() |
|
|
|
if fs.type_name in ['s3', 'gcs', 'abfs'] or 'mock' in fs.type_name: |
|
|
|
skip_azure( |
|
fs, "Azure filesystem currently only returns system metadata not user " |
|
"metadata. See GH-40026") |
|
for k, v in metadata.items(): |
|
assert got_metadata[k] == v.encode() |
|
else: |
|
assert got_metadata == {} |
|
|
|
|
|
def test_localfs_options(): |
|
|
|
LocalFileSystem(use_mmap=False) |
|
|
|
with pytest.raises(TypeError): |
|
LocalFileSystem(xxx=False) |
|
|
|
|
|
def test_localfs_errors(localfs): |
|
|
|
|
|
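# local filesystem errors should map to the usual Python OSError subclasses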
fs = localfs['fs'] |
|
with assert_file_not_found(): |
|
fs.open_input_stream('/non/existent/file') |
|
with assert_file_not_found(): |
|
fs.open_output_stream('/non/existent/file') |
|
with assert_file_not_found(): |
|
fs.create_dir('/non/existent/dir', recursive=False) |
|
with assert_file_not_found(): |
|
fs.delete_dir('/non/existent/dir') |
|
with assert_file_not_found(): |
|
fs.delete_file('/non/existent/dir') |
|
with assert_file_not_found(): |
|
fs.move('/non/existent', '/xxx') |
|
with assert_file_not_found(): |
|
fs.copy_file('/non/existent', '/xxx') |
|
|
|
|
|
def test_localfs_file_info(localfs): |
|
fs = localfs['fs'] |
|
|
|
file_path = pathlib.Path(__file__) |
|
dir_path = file_path.parent |
|
[file_info, dir_info] = fs.get_file_info([file_path.as_posix(), |
|
dir_path.as_posix()]) |
|
assert file_info.size == file_path.stat().st_size |
|
assert file_info.mtime_ns == file_path.stat().st_mtime_ns |
|
check_mtime(file_info) |
|
assert dir_info.mtime_ns == dir_path.stat().st_mtime_ns |
|
check_mtime(dir_info) |
|
|
|
|
|
def test_mockfs_mtime_roundtrip(mockfs): |
|
dt = datetime.fromtimestamp(1568799826, timezone.utc) |
|
fs = _MockFileSystem(dt) |
|
|
|
with fs.open_output_stream('foo'): |
|
pass |
|
[info] = fs.get_file_info(['foo']) |
|
assert info.mtime == dt |
|
|
|
|
|
@pytest.mark.gcs |
|
def test_gcs_options(pickle_module): |
|
from pyarrow.fs import GcsFileSystem |
|
dt = datetime.now() |
|
fs = GcsFileSystem(access_token='abc', |
|
target_service_account='service_account@apache', |
|
credential_token_expiration=dt, |
|
default_bucket_location='us-west2', |
|
scheme='https', endpoint_override='localhost:8999', |
|
project_id='test-project-id') |
|
assert isinstance(fs, GcsFileSystem) |
|
assert fs.default_bucket_location == 'us-west2' |
|
assert fs.project_id == 'test-project-id' |
|
assert pickle_module.loads(pickle_module.dumps(fs)) == fs |
|
|
|
fs = GcsFileSystem() |
|
assert isinstance(fs, GcsFileSystem) |
|
assert pickle_module.loads(pickle_module.dumps(fs)) == fs |
|
|
|
fs = GcsFileSystem(anonymous=True) |
|
assert isinstance(fs, GcsFileSystem) |
|
assert pickle_module.loads(pickle_module.dumps(fs)) == fs |
|
|
|
fs = GcsFileSystem(default_metadata={"ACL": "authenticated-read", |
|
"Content-Type": "text/plain"}) |
|
assert isinstance(fs, GcsFileSystem) |
|
assert pickle_module.loads(pickle_module.dumps(fs)) == fs |
|
|
|
with pytest.raises(ValueError): |
|
GcsFileSystem(access_token='access') |
|
with pytest.raises(ValueError): |
|
GcsFileSystem(anonymous=True, access_token='secret') |
|
with pytest.raises(ValueError): |
|
GcsFileSystem(anonymous=True, target_service_account='acct') |
|
with pytest.raises(ValueError): |
|
GcsFileSystem(credential_token_expiration=datetime.now()) |
|
|
|
|
|
@pytest.mark.s3 |
|
def test_s3_options(pickle_module): |
|
from pyarrow.fs import (AwsDefaultS3RetryStrategy, |
|
AwsStandardS3RetryStrategy, S3FileSystem, |
|
S3RetryStrategy) |
|
|
|
fs = S3FileSystem(access_key='access', secret_key='secret', |
|
session_token='token', region='us-east-2', |
|
scheme='https', endpoint_override='localhost:8999') |
|
assert isinstance(fs, S3FileSystem) |
|
assert fs.region == 'us-east-2' |
|
assert pickle_module.loads(pickle_module.dumps(fs)) == fs |
|
|
|
fs = S3FileSystem(role_arn='role', session_name='session', |
|
external_id='id', load_frequency=100) |
|
assert isinstance(fs, S3FileSystem) |
|
assert pickle_module.loads(pickle_module.dumps(fs)) == fs |
|
|
|
|
|
fs = S3FileSystem( |
|
retry_strategy=AwsStandardS3RetryStrategy(max_attempts=5)) |
|
assert isinstance(fs, S3FileSystem) |
|
|
|
fs = S3FileSystem( |
|
retry_strategy=AwsDefaultS3RetryStrategy(max_attempts=5)) |
|
assert isinstance(fs, S3FileSystem) |
|
|
|
fs2 = S3FileSystem(role_arn='role') |
|
assert isinstance(fs2, S3FileSystem) |
|
assert pickle_module.loads(pickle_module.dumps(fs2)) == fs2 |
|
assert fs2 != fs |
|
|
|
fs = S3FileSystem(anonymous=True) |
|
assert isinstance(fs, S3FileSystem) |
|
assert pickle_module.loads(pickle_module.dumps(fs)) == fs |
|
|
|
fs = S3FileSystem(background_writes=True) |
|
assert isinstance(fs, S3FileSystem) |
|
assert pickle_module.loads(pickle_module.dumps(fs)) == fs |
|
|
|
fs2 = S3FileSystem(background_writes=True, |
|
default_metadata={"ACL": "authenticated-read", |
|
"Content-Type": "text/plain"}) |
|
assert isinstance(fs2, S3FileSystem) |
|
assert pickle_module.loads(pickle_module.dumps(fs2)) == fs2 |
|
assert fs2 != fs |
|
|
|
fs = S3FileSystem(allow_bucket_creation=True, allow_bucket_deletion=True) |
|
assert isinstance(fs, S3FileSystem) |
|
assert pickle_module.loads(pickle_module.dumps(fs)) == fs |
|
|
|
fs = S3FileSystem(allow_bucket_creation=True, allow_bucket_deletion=True, |
|
check_directory_existence_before_creation=True) |
|
assert isinstance(fs, S3FileSystem) |
|
assert pickle_module.loads(pickle_module.dumps(fs)) == fs |
|
|
|
fs = S3FileSystem(request_timeout=0.5, connect_timeout=0.25) |
|
assert isinstance(fs, S3FileSystem) |
|
assert pickle_module.loads(pickle_module.dumps(fs)) == fs |
|
|
|
fs2 = S3FileSystem(request_timeout=0.25, connect_timeout=0.5) |
|
assert isinstance(fs2, S3FileSystem) |
|
assert pickle_module.loads(pickle_module.dumps(fs2)) == fs2 |
|
assert fs2 != fs |
|
|
|
fs = S3FileSystem(endpoint_override='localhost:8999', force_virtual_addressing=True) |
|
assert isinstance(fs, S3FileSystem) |
|
assert pickle_module.loads(pickle_module.dumps(fs)) == fs |
|
|
|
with pytest.raises(ValueError): |
|
S3FileSystem(access_key='access') |
|
with pytest.raises(ValueError): |
|
S3FileSystem(secret_key='secret') |
|
with pytest.raises(ValueError): |
|
S3FileSystem(access_key='access', session_token='token') |
|
with pytest.raises(ValueError): |
|
S3FileSystem(secret_key='secret', session_token='token') |
|
with pytest.raises(ValueError): |
|
S3FileSystem( |
|
access_key='access', secret_key='secret', role_arn='arn' |
|
) |
|
with pytest.raises(ValueError): |
|
S3FileSystem( |
|
access_key='access', secret_key='secret', anonymous=True |
|
) |
|
with pytest.raises(ValueError): |
|
S3FileSystem(role_arn="arn", anonymous=True) |
|
with pytest.raises(ValueError): |
|
S3FileSystem(default_metadata=["foo", "bar"]) |
|
with pytest.raises(ValueError): |
|
S3FileSystem(retry_strategy=S3RetryStrategy()) |
|
|
|
|
|
@pytest.mark.s3 |
|
def test_s3_proxy_options(monkeypatch, pickle_module): |
|
from pyarrow.fs import S3FileSystem |
|
|
|
|
|
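# equivalent proxy configurations, given once as a dict and once as a URI string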
proxy_opts_1_dict = {'scheme': 'http', 'host': 'localhost', 'port': 8999} |
|
proxy_opts_1_str = 'http://localhost:8999' |
|
|
|
proxy_opts_2_dict = {'scheme': 'https', 'host': 'localhost', 'port': 8080} |
|
proxy_opts_2_str = 'https://localhost:8080' |
|
|
|
|
|
fs = S3FileSystem(proxy_options=proxy_opts_1_dict) |
|
assert isinstance(fs, S3FileSystem) |
|
assert pickle_module.loads(pickle_module.dumps(fs)) == fs |
|
|
|
fs = S3FileSystem(proxy_options=proxy_opts_2_dict) |
|
assert isinstance(fs, S3FileSystem) |
|
assert pickle_module.loads(pickle_module.dumps(fs)) == fs |
|
|
|
|
|
fs = S3FileSystem(proxy_options=proxy_opts_1_str) |
|
assert isinstance(fs, S3FileSystem) |
|
assert pickle_module.loads(pickle_module.dumps(fs)) == fs |
|
|
|
fs = S3FileSystem(proxy_options=proxy_opts_2_str) |
|
assert isinstance(fs, S3FileSystem) |
|
assert pickle_module.loads(pickle_module.dumps(fs)) == fs |
|
|
|
|
|
fs1 = S3FileSystem(proxy_options=proxy_opts_1_dict) |
|
fs2 = S3FileSystem(proxy_options=proxy_opts_1_dict) |
|
assert fs1 == fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs1)) == fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs2)) == fs1 |
|
|
|
fs1 = S3FileSystem(proxy_options=proxy_opts_2_dict) |
|
fs2 = S3FileSystem(proxy_options=proxy_opts_2_dict) |
|
assert fs1 == fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs1)) == fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs2)) == fs1 |
|
|
|
|
|
fs1 = S3FileSystem(proxy_options=proxy_opts_1_str) |
|
fs2 = S3FileSystem(proxy_options=proxy_opts_1_str) |
|
assert fs1 == fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs1)) == fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs2)) == fs1 |
|
|
|
fs1 = S3FileSystem(proxy_options=proxy_opts_2_str) |
|
fs2 = S3FileSystem(proxy_options=proxy_opts_2_str) |
|
assert fs1 == fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs1)) == fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs2)) == fs1 |
|
|
|
|
|
|
|
fs1 = S3FileSystem(proxy_options=proxy_opts_1_dict) |
|
fs2 = S3FileSystem(proxy_options=proxy_opts_1_str) |
|
assert fs1 == fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs1)) == fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs2)) == fs1 |
|
|
|
fs1 = S3FileSystem(proxy_options=proxy_opts_2_dict) |
|
fs2 = S3FileSystem(proxy_options=proxy_opts_2_str) |
|
assert fs1 == fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs1)) == fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs2)) == fs1 |
|
|
|
|
|
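# differing proxy configurations must compare unequal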
fs1 = S3FileSystem(proxy_options=proxy_opts_1_dict) |
|
fs2 = S3FileSystem(proxy_options=proxy_opts_2_dict) |
|
assert fs1 != fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs1)) != fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs2)) != fs1 |
|
|
|
fs1 = S3FileSystem(proxy_options=proxy_opts_1_dict) |
|
fs2 = S3FileSystem(proxy_options=proxy_opts_2_str) |
|
assert fs1 != fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs1)) != fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs2)) != fs1 |
|
|
|
fs1 = S3FileSystem(proxy_options=proxy_opts_1_str) |
|
fs2 = S3FileSystem(proxy_options=proxy_opts_2_dict) |
|
assert fs1 != fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs1)) != fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs2)) != fs1 |
|
|
|
fs1 = S3FileSystem(proxy_options=proxy_opts_1_str) |
|
fs2 = S3FileSystem(proxy_options=proxy_opts_2_str) |
|
assert fs1 != fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs1)) != fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs2)) != fs1 |
|
|
|
|
|
|
|
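# a proxied filesystem never equals an unproxied one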
fs1 = S3FileSystem(proxy_options=proxy_opts_1_dict) |
|
fs2 = S3FileSystem() |
|
assert fs1 != fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs1)) != fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs2)) != fs1 |
|
|
|
fs1 = S3FileSystem(proxy_options=proxy_opts_1_str) |
|
fs2 = S3FileSystem() |
|
assert fs1 != fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs1)) != fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs2)) != fs1 |
|
|
|
fs1 = S3FileSystem(proxy_options=proxy_opts_2_dict) |
|
fs2 = S3FileSystem() |
|
assert fs1 != fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs1)) != fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs2)) != fs1 |
|
|
|
fs1 = S3FileSystem(proxy_options=proxy_opts_2_str) |
|
fs2 = S3FileSystem() |
|
assert fs1 != fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs1)) != fs2 |
|
assert pickle_module.loads(pickle_module.dumps(fs2)) != fs1 |
|
|
|
|
|
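# only dict and str forms are accepted, and malformed values are rejected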
with pytest.raises(TypeError): |
|
S3FileSystem(proxy_options=('http', 'localhost', 9090)) |
|
|
|
with pytest.raises(KeyError): |
|
S3FileSystem(proxy_options={'host': 'localhost', 'port': 9090}) |
|
|
|
with pytest.raises(KeyError): |
|
S3FileSystem(proxy_options={'scheme': 'https', 'port': 9090}) |
|
|
|
with pytest.raises(KeyError): |
|
S3FileSystem(proxy_options={'scheme': 'http', 'host': 'localhost'}) |
|
|
|
with pytest.raises(pa.ArrowInvalid): |
|
S3FileSystem(proxy_options='httpsB://localhost:9000') |
|
|
|
with pytest.raises(pa.ArrowInvalid): |
|
S3FileSystem(proxy_options={'scheme': 'httpA', 'host': 'localhost', |
|
'port': 8999}) |
|
|
|
|
|
@pytest.mark.s3 |
|
def test_s3fs_wrong_region(): |
|
from pyarrow.fs import S3FileSystem |
|
|
|
|
|
|
|
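# deliberately wrong region for this bucket; anonymous=True in case the
# environment carries invalid credentials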
fs = S3FileSystem(region='eu-north-1', anonymous=True) |
|
|
|
msg = ("When getting information for bucket 'voltrondata-labs-datasets': " |
|
r"AWS Error UNKNOWN \(HTTP status 301\) during HeadBucket " |
|
"operation: No response body. Looks like the configured region is " |
|
"'eu-north-1' while the bucket is located in 'us-east-2'." |
|
"|NETWORK_CONNECTION") |
|
with pytest.raises(OSError, match=msg) as exc: |
|
fs.get_file_info("voltrondata-labs-datasets") |
|
|
|
|
|
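# a transient network failure can surface instead; nothing more to verify then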
if 'NETWORK_CONNECTION' in str(exc.value): |
|
return |
|
|
|
fs = S3FileSystem(region='us-east-2', anonymous=True) |
|
fs.get_file_info("voltrondata-labs-datasets") |
|
|
|
|
|
@pytest.mark.azure |
|
def test_azurefs_options(pickle_module): |
|
from pyarrow.fs import AzureFileSystem |
|
|
|
fs1 = AzureFileSystem(account_name='fake-account-name') |
|
assert isinstance(fs1, AzureFileSystem) |
|
assert pickle_module.loads(pickle_module.dumps(fs1)) == fs1 |
|
|
|
fs2 = AzureFileSystem(account_name='fake-account-name', |
|
account_key='fakeaccountkey') |
|
assert isinstance(fs2, AzureFileSystem) |
|
assert pickle_module.loads(pickle_module.dumps(fs2)) == fs2 |
|
assert fs2 != fs1 |
|
|
|
fs3 = AzureFileSystem(account_name='fake-account', account_key='fakeaccount', |
|
blob_storage_authority='fake-blob-authority', |
|
dfs_storage_authority='fake-dfs-authority', |
|
blob_storage_scheme='https', |
|
dfs_storage_scheme='https') |
|
assert isinstance(fs3, AzureFileSystem) |
|
assert pickle_module.loads(pickle_module.dumps(fs3)) == fs3 |
|
assert fs3 != fs2 |
|
|
|
fs4 = AzureFileSystem(account_name='fake-account-name', |
|
sas_token='fakesastoken') |
|
assert isinstance(fs4, AzureFileSystem) |
|
assert pickle_module.loads(pickle_module.dumps(fs4)) == fs4 |
|
assert fs4 != fs3 |
|
|
|
with pytest.raises(ValueError): |
|
AzureFileSystem(account_name='fake-account-name', account_key='fakeaccount', |
|
sas_token='fakesastoken') |
|
|
|
with pytest.raises(TypeError): |
|
AzureFileSystem() |
|
|
|
|
|
@pytest.mark.hdfs |
|
def test_hdfs_options(hdfs_connection, pickle_module): |
|
from pyarrow.fs import HadoopFileSystem |
|
if not pa.have_libhdfs(): |
|
pytest.skip('Cannot locate libhdfs') |
|
|
|
host, port, user = hdfs_connection |
|
|
|
replication = 2 |
|
buffer_size = 64*1024 |
|
default_block_size = 128*1024**2 |
|
uri = ('hdfs://{}:{}/?user={}&replication={}&buffer_size={}' |
|
'&default_block_size={}') |
|
|
|
hdfs1 = HadoopFileSystem(host, port, user='libhdfs', |
|
replication=replication, buffer_size=buffer_size, |
|
default_block_size=default_block_size) |
|
hdfs2 = HadoopFileSystem.from_uri(uri.format( |
|
host, port, 'libhdfs', replication, buffer_size, default_block_size |
|
)) |
|
hdfs3 = HadoopFileSystem.from_uri(uri.format( |
|
host, port, 'me', replication, buffer_size, default_block_size |
|
)) |
|
hdfs4 = HadoopFileSystem.from_uri(uri.format( |
|
host, port, 'me', replication + 1, buffer_size, default_block_size |
|
)) |
|
hdfs5 = HadoopFileSystem(host, port) |
|
hdfs6 = HadoopFileSystem.from_uri('hdfs://{}:{}'.format(host, port)) |
|
hdfs7 = HadoopFileSystem(host, port, user='localuser') |
|
hdfs8 = HadoopFileSystem(host, port, user='localuser', |
|
kerb_ticket="cache_path") |
|
hdfs9 = HadoopFileSystem(host, port, user='localuser', |
|
kerb_ticket=pathlib.Path("cache_path")) |
|
hdfs10 = HadoopFileSystem(host, port, user='localuser', |
|
kerb_ticket="cache_path2") |
|
hdfs11 = HadoopFileSystem(host, port, user='localuser', |
|
kerb_ticket="cache_path", |
|
extra_conf={'hdfs_token': 'abcd'}) |
|
|
|
assert hdfs1 == hdfs2 |
|
assert hdfs5 == hdfs6 |
|
assert hdfs6 != hdfs7 |
|
assert hdfs2 != hdfs3 |
|
assert hdfs3 != hdfs4 |
|
assert hdfs7 != hdfs5 |
|
assert hdfs7 != hdfs8 |
|
assert hdfs8 == hdfs9 |
|
assert hdfs10 != hdfs9 |
|
assert hdfs11 != hdfs8 |
|
|
|
with pytest.raises(TypeError): |
|
HadoopFileSystem() |
|
with pytest.raises(TypeError): |
|
HadoopFileSystem.from_uri(3) |
|
|
|
for fs in [hdfs1, hdfs2, hdfs3, hdfs4, hdfs5, hdfs6, hdfs7, hdfs8, |
|
hdfs9, hdfs10, hdfs11]: |
|
assert pickle_module.loads(pickle_module.dumps(fs)) == fs |
|
|
|
host, port, user = hdfs_connection |
|
|
|
hdfs = HadoopFileSystem(host, port, user=user) |
|
assert hdfs.get_file_info(FileSelector('/')) |
|
|
|
hdfs = HadoopFileSystem.from_uri( |
|
"hdfs://{}:{}/?user={}".format(host, port, user) |
|
) |
|
assert hdfs.get_file_info(FileSelector('/')) |
|
|
|
|
|
@pytest.mark.parametrize(('uri', 'expected_klass', 'expected_path'), [ |
|
|
|
|
|
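# mock paths: leading slashes are stripped, as the mock filesystem has no absolute paths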
('mock:', _MockFileSystem, ''), |
|
('mock:foo/bar', _MockFileSystem, 'foo/bar'), |
|
('mock:/foo/bar', _MockFileSystem, 'foo/bar'), |
|
('mock:///foo/bar', _MockFileSystem, 'foo/bar'), |
|
('mock:///some%20path/%C3%A9', _MockFileSystem, 'some path/é'),
|
('file:/', LocalFileSystem, '/'), |
|
('file:///', LocalFileSystem, '/'), |
|
('file:/foo/bar', LocalFileSystem, '/foo/bar'), |
|
('file:///foo/bar', LocalFileSystem, '/foo/bar'), |
|
('file:///some%20path/%C3%A9', LocalFileSystem, '/some path/é'),
|
|
|
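# bare paths (no URI scheme) are passed through without %-decoding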
('/', LocalFileSystem, '/'), |
|
('/foo/bar', LocalFileSystem, '/foo/bar'), |
|
('/some path/%20é', LocalFileSystem, '/some path/%20é'),
|
]) |
|
def test_filesystem_from_uri(uri, expected_klass, expected_path): |
|
fs, path = FileSystem.from_uri(uri) |
|
assert isinstance(fs, expected_klass) |
|
assert path == expected_path |
|
|
|
|
|
@pytest.mark.parametrize( |
|
'path', |
|
['', '/', 'foo/bar', '/foo/bar', __file__] |
|
) |
|
def test_filesystem_from_path_object(path): |
|
p = pathlib.Path(path) |
|
fs, path = FileSystem.from_uri(p) |
|
assert isinstance(fs, LocalFileSystem) |
|
assert path == p.resolve().absolute().as_posix() |
|
|
|
|
|
@pytest.mark.s3 |
|
def test_filesystem_from_uri_s3(s3_server): |
|
from pyarrow.fs import S3FileSystem |
|
|
|
host, port, access_key, secret_key = s3_server['connection'] |
|
|
|
uri = "s3://{}:{}@mybucket/foo/bar?scheme=http&endpoint_override={}:{}"\ |
|
"&allow_bucket_creation=True" \ |
|
.format(access_key, secret_key, host, port) |
|
|
|
fs, path = FileSystem.from_uri(uri) |
|
assert isinstance(fs, S3FileSystem) |
|
assert path == "mybucket/foo/bar" |
|
|
|
fs.create_dir(path) |
|
[info] = fs.get_file_info([path]) |
|
assert info.path == path |
|
assert info.type == FileType.Directory |
|
|
|
|
|
@pytest.mark.gcs |
|
def test_filesystem_from_uri_gcs(gcs_server): |
|
from pyarrow.fs import GcsFileSystem |
|
|
|
host, port = gcs_server['connection'] |
|
|
|
uri = ("gs://anonymous@" + |
|
f"mybucket/foo/bar?scheme=http&endpoint_override={host}:{port}&" + |
|
"retry_limit_seconds=5&project_id=test-project-id") |
|
|
|
fs, path = FileSystem.from_uri(uri) |
|
assert isinstance(fs, GcsFileSystem) |
|
assert path == "mybucket/foo/bar" |
|
|
|
fs.create_dir(path) |
|
[info] = fs.get_file_info([path]) |
|
assert info.path == path |
|
assert info.type == FileType.Directory |
|
|
|
|
|
def test_py_filesystem(): |
|
handler = DummyHandler() |
|
fs = PyFileSystem(handler) |
|
assert isinstance(fs, PyFileSystem) |
|
assert fs.type_name == "py::dummy" |
|
assert fs.handler is handler |
|
|
|
with pytest.raises(TypeError): |
|
PyFileSystem(None) |
|
|
|
|
|
def test_py_filesystem_equality(): |
|
handler1 = DummyHandler(1) |
|
handler2 = DummyHandler(2) |
|
handler3 = DummyHandler(2) |
|
fs1 = PyFileSystem(handler1) |
|
fs2 = PyFileSystem(handler1) |
|
fs3 = PyFileSystem(handler2) |
|
fs4 = PyFileSystem(handler3) |
|
|
|
assert fs2 is not fs1 |
|
assert fs3 is not fs2 |
|
assert fs4 is not fs3 |
|
assert fs2 == fs1 |
|
assert fs3 != fs2 |
|
assert fs4 == fs3 |
|
|
|
assert fs1 != LocalFileSystem() |
|
assert fs1 != object() |
|
|
|
|
|
def test_py_filesystem_pickling(pickle_module): |
|
handler = DummyHandler() |
|
fs = PyFileSystem(handler) |
|
|
|
serialized = pickle_module.dumps(fs) |
|
restored = pickle_module.loads(serialized) |
|
assert isinstance(restored, FileSystem) |
|
assert restored == fs |
|
assert restored.handler == handler |
|
assert restored.type_name == "py::dummy" |
|
|
|
|
|
def test_py_filesystem_lifetime(): |
|
handler = DummyHandler() |
|
fs = PyFileSystem(handler) |
|
assert isinstance(fs, PyFileSystem) |
|
wr = weakref.ref(handler) |
|
handler = None |
|
assert wr() is not None |
|
fs = None |
|
assert wr() is None |
|
|
|
|
|
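# accessing fs.handler should not, by itself, keep the handler alive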
handler = DummyHandler() |
|
fs = PyFileSystem(handler) |
|
wr = weakref.ref(handler) |
|
handler = None |
|
assert wr() is fs.handler |
|
assert wr() is not None |
|
fs = None |
|
assert wr() is None |
|
|
|
|
|
def test_py_filesystem_get_file_info(): |
|
handler = DummyHandler() |
|
fs = PyFileSystem(handler) |
|
|
|
[info] = fs.get_file_info(['some/dir']) |
|
assert info.path == 'some/dir' |
|
assert info.type == FileType.Directory |
|
|
|
[info] = fs.get_file_info(['some/file']) |
|
assert info.path == 'some/file' |
|
assert info.type == FileType.File |
|
|
|
[info] = fs.get_file_info(['notfound']) |
|
assert info.path == 'notfound' |
|
assert info.type == FileType.NotFound |
|
|
|
with pytest.raises(TypeError): |
|
fs.get_file_info(['badtype']) |
|
|
|
with pytest.raises(IOError): |
|
fs.get_file_info(['xxx']) |
|
|
|
|
|
def test_py_filesystem_get_file_info_selector(): |
|
handler = DummyHandler() |
|
fs = PyFileSystem(handler) |
|
|
|
selector = FileSelector(base_dir="somedir") |
|
infos = fs.get_file_info(selector) |
|
assert len(infos) == 2 |
|
assert infos[0].path == "somedir/file1" |
|
assert infos[0].type == FileType.File |
|
assert infos[0].size == 123 |
|
assert infos[1].path == "somedir/subdir1" |
|
assert infos[1].type == FileType.Directory |
|
assert infos[1].size is None |
|
|
|
selector = FileSelector(base_dir="somedir", recursive=True) |
|
infos = fs.get_file_info(selector) |
|
assert len(infos) == 3 |
|
assert infos[0].path == "somedir/file1" |
|
assert infos[1].path == "somedir/subdir1" |
|
assert infos[2].path == "somedir/subdir1/file2" |
|
|
|
selector = FileSelector(base_dir="notfound") |
|
with pytest.raises(FileNotFoundError): |
|
fs.get_file_info(selector) |
|
|
|
selector = FileSelector(base_dir="notfound", allow_not_found=True) |
|
assert fs.get_file_info(selector) == [] |
|
|
|
|
|
def test_py_filesystem_ops(): |
|
handler = DummyHandler() |
|
fs = PyFileSystem(handler) |
|
|
|
fs.create_dir("recursive", recursive=True) |
|
fs.create_dir("non-recursive", recursive=False) |
|
with pytest.raises(IOError): |
|
fs.create_dir("foobar") |
|
|
|
fs.delete_dir("delete_dir") |
|
fs.delete_dir_contents("delete_dir_contents") |
|
for path in ("", "/", "//"): |
|
with pytest.raises(ValueError): |
|
fs.delete_dir_contents(path) |
|
fs.delete_dir_contents(path, accept_root_dir=True) |
|
fs.delete_file("delete_file") |
|
fs.move("move_from", "move_to") |
|
fs.copy_file("copy_file_from", "copy_file_to") |
|
|
|
|
|
def test_py_open_input_stream(): |
|
fs = PyFileSystem(DummyHandler()) |
|
|
|
with fs.open_input_stream("somefile") as f: |
|
assert f.read() == b"somefile:input_stream" |
|
with pytest.raises(FileNotFoundError): |
|
fs.open_input_stream("notfound") |
|
|
|
|
|
def test_py_open_input_file(): |
|
fs = PyFileSystem(DummyHandler()) |
|
|
|
with fs.open_input_file("somefile") as f: |
|
assert f.read() == b"somefile:input_file" |
|
with pytest.raises(FileNotFoundError): |
|
fs.open_input_file("notfound") |
|
|
|
|
|
def test_py_open_output_stream(): |
|
fs = PyFileSystem(DummyHandler()) |
|
|
|
with fs.open_output_stream("somefile") as f: |
|
f.write(b"data") |
|
|
|
|
|
def test_py_open_append_stream(): |
|
fs = PyFileSystem(DummyHandler()) |
|
|
|
with fs.open_append_stream("somefile") as f: |
|
f.write(b"data") |
|
|
|
|
|
@pytest.mark.s3 |
|
def test_s3_real_aws(): |
|
|
|
|
|
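# exercises a real AWS-backed bucket and therefore needs outbound network access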
from pyarrow.fs import S3FileSystem |
|
default_region = (os.environ.get('PYARROW_TEST_S3_REGION') or |
|
'us-east-1') |
|
fs = S3FileSystem(anonymous=True) |
|
assert fs.region == default_region |
|
|
|
fs = S3FileSystem(anonymous=True, region='us-east-2') |
|
entries = fs.get_file_info(FileSelector( |
|
'voltrondata-labs-datasets/nyc-taxi')) |
|
assert len(entries) > 0 |
|
key = 'voltrondata-labs-datasets/nyc-taxi/year=2019/month=6/part-0.parquet' |
|
with fs.open_input_stream(key) as f: |
|
md = f.metadata() |
|
assert 'Content-Type' in md |
|
assert md['Last-Modified'] == b'2022-07-12T23:32:00Z' |
|
|
|
|
|
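# the ETag header value arrives quoted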
assert md['ETag'] == b'"4c6a76826a695c6ac61592bc30cda3df-16"' |
|
|
|
|
|
@pytest.mark.s3 |
|
def test_s3_real_aws_region_selection(): |
|
|
|
|
|
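# the bucket region should be resolved automatically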
fs, path = FileSystem.from_uri('s3://mf-nwp-models/README.txt') |
|
assert fs.region == 'eu-west-1' |
|
with fs.open_input_stream(path) as f: |
|
assert b"Meteo-France Atmospheric models on AWS" in f.read(50) |
|
|
|
|
|
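# an explicit region query parameter overrides auto-detection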
fs, path = FileSystem.from_uri( |
|
's3://mf-nwp-models/README.txt?region=us-east-2') |
|
assert fs.region == 'us-east-2' |
|
|
|
|
|
|
|
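# a (hopefully) nonexistent bucket cannot be resolved; with an explicit
# region, no resolution is attempted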
with pytest.raises(IOError, match="Bucket '.*' not found"): |
|
FileSystem.from_uri('s3://x-arrow-nonexistent-bucket') |
|
fs, path = FileSystem.from_uri( |
|
's3://x-arrow-nonexistent-bucket?region=us-east-3') |
|
assert fs.region == 'us-east-3' |
|
|
|
|
|
@pytest.mark.s3 |
|
def test_resolve_s3_region(): |
|
from pyarrow.fs import resolve_s3_region |
|
assert resolve_s3_region('voltrondata-labs-datasets') == 'us-east-2' |
|
assert resolve_s3_region('mf-nwp-models') == 'eu-west-1' |
|
|
|
with pytest.raises(ValueError, match="Not a valid bucket name"): |
|
resolve_s3_region('foo/bar') |
|
with pytest.raises(ValueError, match="Not a valid bucket name"): |
|
resolve_s3_region('s3:bucket') |
|
|
|
|
|
@pytest.mark.s3 |
|
def test_copy_files(s3_connection, s3fs, tempdir): |
|
fs = s3fs["fs"] |
|
pathfn = s3fs["pathfn"] |
|
|
|
|
|
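# create a small file on S3 to copy from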
path = pathfn('c.txt') |
|
with fs.open_output_stream(path) as f: |
|
f.write(b'test') |
|
|
|
|
|
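# copy from an S3 URI to a local path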
host, port, access_key, secret_key = s3_connection |
|
source_uri = ( |
|
f"s3://{access_key}:{secret_key}@{path}" |
|
f"?scheme=http&endpoint_override={host}:{port}" |
|
) |
|
|
|
local_path1 = str(tempdir / "c_copied1.txt") |
|
copy_files(source_uri, local_path1) |
|
|
|
localfs = LocalFileSystem() |
|
with localfs.open_input_stream(local_path1) as f: |
|
assert f.read() == b"test" |
|
|
|
|
|
local_path2 = str(tempdir / "c_copied2.txt") |
|
copy_files(path, local_path2, source_filesystem=fs) |
|
with localfs.open_input_stream(local_path2) as f: |
|
assert f.read() == b"test" |
|
|
|
|
|
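# copy between a source URI and a destination URI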
local_path3 = str(tempdir / "c_copied3.txt") |
|
destination_uri = _filesystem_uri(local_path3) |
|
copy_files(source_uri, destination_uri) |
|
|
|
with localfs.open_input_stream(local_path3) as f: |
|
assert f.read() == b"test" |
|
|
|
|
|
local_path4 = str(tempdir / "c_copied4.txt") |
|
copy_files(source_uri, local_path4, destination_filesystem=localfs) |
|
|
|
with localfs.open_input_stream(local_path4) as f: |
|
assert f.read() == b"test" |
|
|
|
|
|
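# exercise the chunked, single-threaded copy path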
local_path5 = str(tempdir / "c_copied5.txt") |
|
copy_files(source_uri, local_path5, chunk_size=1, use_threads=False) |
|
|
|
with localfs.open_input_stream(local_path5) as f: |
|
assert f.read() == b"test" |
|
|
|
|
|
def test_copy_files_directory(tempdir): |
|
localfs = LocalFileSystem() |
|
|
|
|
|
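# populate a source directory with two files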
source_dir = tempdir / "source" |
|
source_dir.mkdir() |
|
with localfs.open_output_stream(str(source_dir / "file1")) as f: |
|
f.write(b'test1') |
|
with localfs.open_output_stream(str(source_dir / "file2")) as f: |
|
f.write(b'test2') |
|
|
|
def check_copied_files(destination_dir): |
|
with localfs.open_input_stream(str(destination_dir / "file1")) as f: |
|
assert f.read() == b"test1" |
|
with localfs.open_input_stream(str(destination_dir / "file2")) as f: |
|
assert f.read() == b"test2" |
|
|
|
|
|
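# copy directory contents between plain path strings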
destination_dir1 = tempdir / "destination1" |
|
|
|
destination_dir1.mkdir() |
|
copy_files(str(source_dir), str(destination_dir1)) |
|
check_copied_files(destination_dir1) |
|
|
|
|
|
destination_dir2 = tempdir / "destination2" |
|
destination_dir2.mkdir() |
|
copy_files(str(source_dir), str(destination_dir2), |
|
source_filesystem=localfs, destination_filesystem=localfs) |
|
check_copied_files(destination_dir2) |
|
|
|
|
|
destination_dir3 = tempdir / "destination3" |
|
destination_dir3.mkdir() |
|
source_uri = _filesystem_uri(str(source_dir)) |
|
destination_uri = _filesystem_uri(str(destination_dir3)) |
|
copy_files(source_uri, destination_uri) |
|
check_copied_files(destination_dir3) |
|
|
|
|
|
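# pathlib.Path arguments are accepted as well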
destination_dir4 = tempdir / "destination4" |
|
destination_dir4.mkdir() |
|
copy_files(source_dir, destination_dir4) |
|
check_copied_files(destination_dir4) |
|
|
|
|
|
destination_dir5 = tempdir / "destination5" |
|
destination_dir5.mkdir() |
|
copy_files(source_dir, destination_dir5, chunk_size=1, use_threads=False) |
|
check_copied_files(destination_dir5) |
|
|
|
|
|
@pytest.mark.s3 |
|
def test_s3_finalize(): |
|
|
|
|
|
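# run in a subprocess: once finalize_s3() has been called, any further
# S3 use is expected to raise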
code = """if 1: |
|
import pytest |
|
from pyarrow.fs import (FileSystem, S3FileSystem, |
|
ensure_s3_initialized, finalize_s3) |
|
|
|
fs, path = FileSystem.from_uri('s3://mf-nwp-models/README.txt') |
|
assert fs.region == 'eu-west-1' |
|
f = fs.open_input_stream(path) |
|
f.read(50) |
|
|
|
finalize_s3() |
|
|
|
with pytest.raises(ValueError, match="S3 .* finalized"): |
|
f.read(50) |
|
with pytest.raises(ValueError, match="S3 .* finalized"): |
|
fs.open_input_stream(path) |
|
with pytest.raises(ValueError, match="S3 .* finalized"): |
|
S3FileSystem(anonymous=True) |
|
with pytest.raises(ValueError, match="S3 .* finalized"): |
|
FileSystem.from_uri('s3://mf-nwp-models/README.txt') |
|
""" |
|
subprocess.check_call([sys.executable, "-c", code]) |
|
|
|
|
|
@pytest.mark.s3 |
|
def test_s3_finalize_region_resolver(): |
|
|
|
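# run in a subprocess: the region resolver must also refuse to work
# after finalize_s3()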
code = """if 1: |
|
import pytest |
|
from pyarrow.fs import resolve_s3_region, ensure_s3_initialized, finalize_s3 |
|
|
|
resolve_s3_region('mf-nwp-models') |
|
|
|
finalize_s3() |
|
|
|
# Testing both cached and uncached accesses |
|
with pytest.raises(ValueError, match="S3 .* finalized"): |
|
resolve_s3_region('mf-nwp-models') |
|
with pytest.raises(ValueError, match="S3 .* finalized"): |
|
resolve_s3_region('voltrondata-labs-datasets') |
|
""" |
|
subprocess.check_call([sys.executable, "-c", code]) |
|
|
|
|
|
@pytest.mark.processes |
|
@pytest.mark.threading |
|
@pytest.mark.s3 |
|
def test_concurrent_s3fs_init(): |
|
|
|
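# concurrent first-time initialization of the S3 subsystem should be thread-safe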
code = """if 1: |
|
import threading |
|
import pytest |
|
from pyarrow.fs import (FileSystem, S3FileSystem, |
|
ensure_s3_initialized, finalize_s3) |
|
threads = [] |
|
fn = lambda: FileSystem.from_uri('s3://mf-nwp-models/README.txt') |
|
for i in range(4): |
|
thread = threading.Thread(target = fn) |
|
threads.append(thread) |
|
thread.start() |
|
|
|
for thread in threads: |
|
thread.join() |
|
|
|
finalize_s3() |
|
""" |
|
subprocess.check_call([sys.executable, "-c", code]) |
|
|
|
|
|
@pytest.mark.s3 |
|
@pytest.mark.skipif(running_on_musllinux(), reason="Leaking S3ClientFinalizer causes " |
|
"segfault on musl based systems") |
|
def test_uwsgi_integration(): |
|
|
|
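# exercise S3 inside a uwsgi worker; mainly checks for a clean shutdown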
try: |
|
subprocess.check_call(["uwsgi", "--version"]) |
|
except FileNotFoundError: |
|
pytest.skip("uwsgi not installed on this Python") |
|
|
|
port = find_free_port() |
|
args = ["uwsgi", "-i", "--http", f"127.0.0.1:{port}", |
|
"--wsgi-file", os.path.join(here, "wsgi_examples.py")] |
|
proc = subprocess.Popen(args, stdin=subprocess.DEVNULL) |
|
|
|
try: |
|
url = f"http://127.0.0.1:{port}/s3/" |
|
start_time = time.time() |
|
error = None |
|
while time.time() < start_time + 5: |
|
try: |
|
with urlopen(url) as resp: |
|
assert resp.status == 200 |
|
break |
|
except OSError as e: |
|
error = e |
|
time.sleep(0.1) |
|
else: |
|
pytest.fail(f"Could not fetch {url!r}: {error}") |
|
finally: |
|
proc.terminate() |
|
|
|
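# uwsgi is expected to exit with status 30 when shut down via SIGTERM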
assert proc.wait() == 30 |
|
|