""" |
|
UNTESTED: |
|
read_message |
|
""" |
|
|
|
import sys |
|
import sysconfig |
|
|
|
import pytest |
|
|
|
import pyarrow as pa |
|
try: |
|
import numpy as np |
|
except ImportError: |
|
pytestmark = pytest.mark.numpy |
|
|
|
|
|
cuda = pytest.importorskip("pyarrow.cuda") |
|
|
|
platform = sysconfig.get_platform() |
|
|
|
has_ipc_support = platform == 'linux-x86_64' |
|
|
|
cuda_ipc = pytest.mark.skipif( |
|
not has_ipc_support, |
|
    reason='CUDA IPC not supported in platform `%s`' % platform)
|
|
|
global_context = None |
|
global_context1 = None |
|
|
|
|
|
def setup_module(module): |
|
module.global_context = cuda.Context(0) |
|
module.global_context1 = cuda.Context(cuda.Context.get_num_devices() - 1) |
|
|
|
|
|
def teardown_module(module): |
|
    del module.global_context
    del module.global_context1
|
|
|
|
|
def test_Context(): |
|
assert cuda.Context.get_num_devices() > 0 |
|
assert global_context.device_number == 0 |
|
assert global_context1.device_number == cuda.Context.get_num_devices() - 1 |
|
|
|
mm = global_context.memory_manager |
|
assert not mm.is_cpu |
|
assert "<pyarrow.MemoryManager device: CudaDevice" in repr(mm) |
|
|
|
dev = global_context.device |
|
assert dev == mm.device |
|
|
|
assert not dev.is_cpu |
|
assert dev.device_id == 0 |
|
assert dev.device_type == pa.DeviceAllocationType.CUDA |
|
|
|
with pytest.raises(ValueError, |
|
match=("device_number argument must " |
|
"be non-negative less than")): |
|
cuda.Context(cuda.Context.get_num_devices()) |
|
|
|
|
|
@pytest.mark.parametrize("size", [0, 1, 1000]) |
|
def test_manage_allocate_free_host(size): |
|
buf = cuda.new_host_buffer(size) |
|
arr = np.frombuffer(buf, dtype=np.uint8) |
|
arr[size//4:3*size//4] = 1 |
|
arr_cp = arr.copy() |
|
arr2 = np.frombuffer(buf, dtype=np.uint8) |
|
np.testing.assert_equal(arr2, arr_cp) |
|
assert buf.size == size |
|
|
|
|
|
def test_context_allocate_del(): |
|
bytes_allocated = global_context.bytes_allocated |
|
cudabuf = global_context.new_buffer(128) |
|
assert global_context.bytes_allocated == bytes_allocated + 128 |
|
del cudabuf |
|
assert global_context.bytes_allocated == bytes_allocated |
|
|
|
|
|
def make_random_buffer(size, target='host'): |
|
"""Return a host or device buffer with random data. |
|
""" |
|
if target == 'host': |
|
assert size >= 0 |
|
buf = pa.allocate_buffer(size) |
|
assert buf.size == size |
|
arr = np.frombuffer(buf, dtype=np.uint8) |
|
assert arr.size == size |
|
arr[:] = np.random.randint(low=1, high=255, size=size, dtype=np.uint8) |
|
assert arr.sum() > 0 or size == 0 |
|
arr_ = np.frombuffer(buf, dtype=np.uint8) |
|
np.testing.assert_equal(arr, arr_) |
|
return arr, buf |
|
elif target == 'device': |
|
arr, buf = make_random_buffer(size, target='host') |
|
dbuf = global_context.new_buffer(size) |
|
assert dbuf.size == size |
|
dbuf.copy_from_host(buf, position=0, nbytes=size) |
|
return arr, dbuf |
|
raise ValueError('invalid target value') |
|
|
|
|
|
@pytest.mark.parametrize("size", [0, 1, 1000]) |
|
def test_context_device_buffer(size): |
|
|
|
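    # Create a device buffer from a host buffer: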
arr, buf = make_random_buffer(size) |
|
cudabuf = global_context.buffer_from_data(buf) |
|
assert cudabuf.size == size |
|
arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8) |
|
np.testing.assert_equal(arr, arr2) |
|
|
|
|
|
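    # CudaBuffer does not support the CPU buffer protocol: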
with pytest.raises(BufferError): |
|
memoryview(cudabuf) |
|
|
|
|
|
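    # Create a device buffer from a numpy array: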
cudabuf = global_context.buffer_from_data(arr) |
|
assert cudabuf.size == size |
|
arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8) |
|
np.testing.assert_equal(arr, arr2) |
|
|
|
|
|
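    # Create a device buffer from a bytes object: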
cudabuf = global_context.buffer_from_data(arr.tobytes()) |
|
assert cudabuf.size == size |
|
arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8) |
|
np.testing.assert_equal(arr, arr2) |
|
|
|
|
|
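    # Create a view of a device buffer through slice():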
cudabuf2 = cudabuf.slice(0, cudabuf.size) |
|
assert cudabuf2.size == size |
|
arr2 = np.frombuffer(cudabuf2.copy_to_host(), dtype=np.uint8) |
|
np.testing.assert_equal(arr, arr2) |
|
|
|
if size > 1: |
|
cudabuf2.copy_from_host(arr[size//2:]) |
|
arr3 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8) |
|
np.testing.assert_equal(np.concatenate((arr[size//2:], arr[size//2:])), |
|
arr3) |
|
cudabuf2.copy_from_host(arr[:size//2]) |
|
|
|
|
|
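    # Create a device buffer as a copy of another device buffer: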
cudabuf2 = global_context.buffer_from_data(cudabuf) |
|
assert cudabuf2.size == size |
|
arr2 = np.frombuffer(cudabuf2.copy_to_host(), dtype=np.uint8) |
|
np.testing.assert_equal(arr, arr2) |
|
|
|
cudabuf2.copy_from_host(arr[size//2:]) |
|
arr3 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8) |
|
np.testing.assert_equal(arr, arr3) |
|
|
|
|
|
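    # Slice length is clamped to the parent buffer's size: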
cudabuf2 = cudabuf.slice(0, cudabuf.size+10) |
|
assert cudabuf2.size == size |
|
arr2 = np.frombuffer(cudabuf2.copy_to_host(), dtype=np.uint8) |
|
np.testing.assert_equal(arr, arr2) |
|
|
|
cudabuf2 = cudabuf.slice(size//4, size+10) |
|
assert cudabuf2.size == size - size//4 |
|
arr2 = np.frombuffer(cudabuf2.copy_to_host(), dtype=np.uint8) |
|
np.testing.assert_equal(arr[size//4:], arr2) |
|
|
|
|
|
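    # Create a device buffer from a slice of a host buffer: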
soffset = size//4 |
|
ssize = 2*size//4 |
|
cudabuf = global_context.buffer_from_data(buf, offset=soffset, |
|
size=ssize) |
|
assert cudabuf.size == ssize |
|
arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8) |
|
np.testing.assert_equal(arr[soffset:soffset + ssize], arr2) |
|
|
|
cudabuf = global_context.buffer_from_data(buf.slice(offset=soffset, |
|
length=ssize)) |
|
assert cudabuf.size == ssize |
|
arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8) |
|
np.testing.assert_equal(arr[soffset:soffset + ssize], arr2) |
|
|
|
|
|
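    # Create a device buffer from a slice of a numpy array: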
cudabuf = global_context.buffer_from_data(arr, offset=soffset, size=ssize) |
|
assert cudabuf.size == ssize |
|
arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8) |
|
np.testing.assert_equal(arr[soffset:soffset + ssize], arr2) |
|
|
|
cudabuf = global_context.buffer_from_data(arr[soffset:soffset+ssize]) |
|
assert cudabuf.size == ssize |
|
arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8) |
|
np.testing.assert_equal(arr[soffset:soffset + ssize], arr2) |
|
|
|
|
|
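    # Create a device buffer from a slice of a bytes object: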
cudabuf = global_context.buffer_from_data(arr.tobytes(), |
|
offset=soffset, |
|
size=ssize) |
|
assert cudabuf.size == ssize |
|
arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8) |
|
np.testing.assert_equal(arr[soffset:soffset + ssize], arr2) |
|
|
|
|
|
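    # Allocate an uninitialized device buffer of a given size: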
cudabuf = global_context.new_buffer(size) |
|
assert cudabuf.size == size |
|
|
|
|
|
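    # Create a device buffer from a slice of another device buffer: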
cudabuf = global_context.buffer_from_data(arr) |
|
cudabuf2 = cudabuf.slice(soffset, ssize) |
|
assert cudabuf2.size == ssize |
|
arr2 = np.frombuffer(cudabuf2.copy_to_host(), dtype=np.uint8) |
|
np.testing.assert_equal(arr[soffset:soffset+ssize], arr2) |
|
|
|
|
|
|
|
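    # Create a device buffer from a CUDA host (pinned) buffer: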
buf = cuda.new_host_buffer(size) |
|
arr_ = np.frombuffer(buf, dtype=np.uint8) |
|
arr_[:] = arr |
|
cudabuf = global_context.buffer_from_data(buf) |
|
assert cudabuf.size == size |
|
arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8) |
|
np.testing.assert_equal(arr, arr2) |
|
|
|
|
|
|
|
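    # Create a device buffer from a slice of a CUDA host buffer: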
cudabuf = global_context.buffer_from_data(buf, offset=soffset, size=ssize) |
|
assert cudabuf.size == ssize |
|
arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8) |
|
np.testing.assert_equal(arr[soffset:soffset+ssize], arr2) |
|
|
|
cudabuf = global_context.buffer_from_data( |
|
buf.slice(offset=soffset, length=ssize)) |
|
assert cudabuf.size == ssize |
|
arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8) |
|
np.testing.assert_equal(arr[soffset:soffset+ssize], arr2) |
|
|
|
|
|
@pytest.mark.parametrize("size", [0, 1, 1000]) |
|
def test_context_from_object(size): |
|
ctx = global_context |
|
arr, cbuf = make_random_buffer(size, target='device') |
|
dtype = arr.dtype |
|
|
|
|
|
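    # Create a device buffer from a CUDA host buffer: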
hbuf = cuda.new_host_buffer(size * arr.dtype.itemsize) |
|
np.frombuffer(hbuf, dtype=dtype)[:] = arr |
|
cbuf2 = ctx.buffer_from_object(hbuf) |
|
assert cbuf2.size == cbuf.size |
|
arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype) |
|
np.testing.assert_equal(arr, arr2) |
|
|
|
|
|
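    # Create a device buffer from another device buffer: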
cbuf2 = ctx.buffer_from_object(cbuf2) |
|
assert cbuf2.size == cbuf.size |
|
arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype) |
|
np.testing.assert_equal(arr, arr2) |
|
|
|
|
|
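    # A CPU-backed pyarrow.Buffer is rejected: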
with pytest.raises(pa.ArrowTypeError, |
|
match=('buffer is not backed by a CudaBuffer')): |
|
ctx.buffer_from_object(pa.py_buffer(b"123")) |
|
|
|
|
|
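    # Objects exposing only the CPU buffer protocol are rejected: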
with pytest.raises(pa.ArrowTypeError, |
|
match=("cannot create device buffer view from " |
|
".* \'numpy.ndarray\'")): |
|
ctx.buffer_from_object(np.array([1, 2, 3])) |
|
|
|
|
|
def test_foreign_buffer(): |
|
ctx = global_context |
|
dtype = np.dtype(np.uint8) |
|
size = 10 |
|
hbuf = cuda.new_host_buffer(size * dtype.itemsize) |
|
|
|
|
|
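    # The base argument keeps the owning buffer referenced: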
rc = sys.getrefcount(hbuf) |
|
fbuf = ctx.foreign_buffer(hbuf.address, hbuf.size, hbuf) |
|
assert sys.getrefcount(hbuf) == rc + 1 |
|
del fbuf |
|
assert sys.getrefcount(hbuf) == rc |
|
|
|
|
|
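    # The memory stays valid while the foreign buffer holds its base: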
fbuf = ctx.foreign_buffer(hbuf.address, hbuf.size, hbuf) |
|
del hbuf |
|
fbuf.copy_to_host() |
|
|
|
|
|
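    # Without a base, the memory is freed with its owner and access fails: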
hbuf = cuda.new_host_buffer(size * dtype.itemsize) |
|
fbuf = ctx.foreign_buffer(hbuf.address, hbuf.size) |
|
del hbuf |
|
with pytest.raises(pa.ArrowIOError, |
|
match=('Cuda error ')): |
|
fbuf.copy_to_host() |
|
|
|
|
|
@pytest.mark.parametrize("size", [0, 1, 1000]) |
|
def test_CudaBuffer(size): |
|
arr, buf = make_random_buffer(size) |
|
assert arr.tobytes() == buf.to_pybytes() |
|
cbuf = global_context.buffer_from_data(buf) |
|
assert cbuf.size == size |
|
assert not cbuf.is_cpu |
|
assert arr.tobytes() == cbuf.to_pybytes() |
|
if size > 0: |
|
assert cbuf.address > 0 |
|
|
|
for i in range(size): |
|
assert cbuf[i] == arr[i] |
|
|
|
for s in [ |
|
slice(None), |
|
slice(size//4, size//2), |
|
]: |
|
assert cbuf[s].to_pybytes() == arr[s].tobytes() |
|
|
|
sbuf = cbuf.slice(size//4, size//2) |
|
assert sbuf.parent == cbuf |
|
|
|
with pytest.raises(TypeError, |
|
match="Do not call CudaBuffer's constructor directly"): |
|
cuda.CudaBuffer() |
|
|
|
|
|
@pytest.mark.parametrize("size", [0, 1, 1000]) |
|
def test_HostBuffer(size): |
|
arr, buf = make_random_buffer(size) |
|
assert arr.tobytes() == buf.to_pybytes() |
|
hbuf = cuda.new_host_buffer(size) |
|
np.frombuffer(hbuf, dtype=np.uint8)[:] = arr |
|
assert hbuf.size == size |
|
assert hbuf.is_cpu |
|
assert arr.tobytes() == hbuf.to_pybytes() |
|
for i in range(size): |
|
assert hbuf[i] == arr[i] |
|
for s in [ |
|
slice(None), |
|
slice(size//4, size//2), |
|
]: |
|
assert hbuf[s].to_pybytes() == arr[s].tobytes() |
|
|
|
sbuf = hbuf.slice(size//4, size//2) |
|
assert sbuf.parent == hbuf |
|
|
|
del hbuf |
|
|
|
with pytest.raises(TypeError, |
|
match="Do not call HostBuffer's constructor directly"): |
|
cuda.HostBuffer() |
|
|
|
|
|
@pytest.mark.parametrize("size", [0, 1, 1000]) |
|
def test_copy_from_to_host(size): |
|
|
|
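    # Fill a resizable host buffer with range(size) as uint16: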
dt = np.dtype('uint16') |
|
nbytes = size * dt.itemsize |
|
buf = pa.allocate_buffer(nbytes, resizable=True) |
|
assert isinstance(buf, pa.Buffer) |
|
assert not isinstance(buf, cuda.CudaBuffer) |
|
arr = np.frombuffer(buf, dtype=dt) |
|
assert arr.size == size |
|
arr[:] = range(size) |
|
arr_ = np.frombuffer(buf, dtype=dt) |
|
np.testing.assert_equal(arr, arr_) |
|
|
|
|
|
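    # Allocate a device buffer and copy the host data into it: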
device_buffer = global_context.new_buffer(nbytes) |
|
assert isinstance(device_buffer, cuda.CudaBuffer) |
|
assert isinstance(device_buffer, pa.Buffer) |
|
assert device_buffer.size == nbytes |
|
assert not device_buffer.is_cpu |
|
device_buffer.copy_from_host(buf, position=0, nbytes=nbytes) |
|
|
|
|
|
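    # Copy the data back to host and verify the round trip: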
buf2 = device_buffer.copy_to_host(position=0, nbytes=nbytes) |
|
arr2 = np.frombuffer(buf2, dtype=dt) |
|
np.testing.assert_equal(arr, arr2) |
|
|
|
|
|
@pytest.mark.parametrize("size", [0, 1, 1000]) |
|
def test_copy_to_host(size): |
|
arr, dbuf = make_random_buffer(size, target='device') |
|
|
|
buf = dbuf.copy_to_host() |
|
assert buf.is_cpu |
|
np.testing.assert_equal(arr, np.frombuffer(buf, dtype=np.uint8)) |
|
|
|
buf = dbuf.copy_to_host(position=size//4) |
|
assert buf.is_cpu |
|
np.testing.assert_equal(arr[size//4:], np.frombuffer(buf, dtype=np.uint8)) |
|
|
|
buf = dbuf.copy_to_host(position=size//4, nbytes=size//8) |
|
assert buf.is_cpu |
|
np.testing.assert_equal(arr[size//4:size//4+size//8], |
|
np.frombuffer(buf, dtype=np.uint8)) |
|
|
|
buf = dbuf.copy_to_host(position=size//4, nbytes=0) |
|
assert buf.is_cpu |
|
assert buf.size == 0 |
|
|
|
for (position, nbytes) in [ |
|
(size+2, -1), (-2, -1), (size+1, 0), (-3, 0), |
|
]: |
|
with pytest.raises(ValueError, |
|
match='position argument is out-of-range'): |
|
dbuf.copy_to_host(position=position, nbytes=nbytes) |
|
|
|
for (position, nbytes) in [ |
|
(0, size+1), (size//2, (size+1)//2+1), (size, 1) |
|
]: |
|
with pytest.raises(ValueError, |
|
match=('requested more to copy than' |
|
' available from device buffer')): |
|
dbuf.copy_to_host(position=position, nbytes=nbytes) |
|
|
|
buf = pa.allocate_buffer(size//4) |
|
dbuf.copy_to_host(buf=buf) |
|
np.testing.assert_equal(arr[:size//4], np.frombuffer(buf, dtype=np.uint8)) |
|
|
|
if size < 12: |
|
return |
|
|
|
dbuf.copy_to_host(buf=buf, position=12) |
|
np.testing.assert_equal(arr[12:12+size//4], |
|
np.frombuffer(buf, dtype=np.uint8)) |
|
|
|
dbuf.copy_to_host(buf=buf, nbytes=12) |
|
np.testing.assert_equal(arr[:12], np.frombuffer(buf, dtype=np.uint8)[:12]) |
|
|
|
dbuf.copy_to_host(buf=buf, nbytes=12, position=6) |
|
np.testing.assert_equal(arr[6:6+12], |
|
np.frombuffer(buf, dtype=np.uint8)[:12]) |
|
|
|
for (position, nbytes) in [ |
|
(0, size+10), (10, size-5), |
|
(0, size//2), (size//4, size//4+1) |
|
]: |
|
with pytest.raises(ValueError, |
|
match=('requested copy does not ' |
|
'fit into host buffer')): |
|
dbuf.copy_to_host(buf=buf, position=position, nbytes=nbytes) |
|
|
|
|
|
@pytest.mark.parametrize("dest_ctx", ['same', 'another']) |
|
@pytest.mark.parametrize("size", [0, 1, 1000]) |
|
def test_copy_from_device(dest_ctx, size): |
|
arr, buf = make_random_buffer(size=size, target='device') |
|
lst = arr.tolist() |
|
if dest_ctx == 'another': |
|
dest_ctx = global_context1 |
|
if buf.context.device_number == dest_ctx.device_number: |
|
pytest.skip("not a multi-GPU system") |
|
else: |
|
dest_ctx = buf.context |
|
dbuf = dest_ctx.new_buffer(size) |
|
|
|
def put(*args, **kwargs): |
|
dbuf.copy_from_device(buf, *args, **kwargs) |
|
rbuf = dbuf.copy_to_host() |
|
return np.frombuffer(rbuf, dtype=np.uint8).tolist() |
|
assert put() == lst |
|
if size > 4: |
|
assert put(position=size//4) == lst[:size//4]+lst[:-size//4] |
|
assert put() == lst |
|
        assert put(position=1, nbytes=size//2) == \
            lst[:1] + lst[:size//2] + lst[-(size-size//2-1):]
|
|
|
for (position, nbytes) in [ |
|
(size+2, -1), (-2, -1), (size+1, 0), (-3, 0), |
|
]: |
|
with pytest.raises(ValueError, |
|
match='position argument is out-of-range'): |
|
put(position=position, nbytes=nbytes) |
|
|
|
for (position, nbytes) in [ |
|
(0, size+1), |
|
]: |
|
with pytest.raises(ValueError, |
|
match=('requested more to copy than' |
|
' available from device buffer')): |
|
put(position=position, nbytes=nbytes) |
|
|
|
if size < 4: |
|
return |
|
|
|
for (position, nbytes) in [ |
|
(size//2, (size+1)//2+1) |
|
]: |
|
with pytest.raises(ValueError, |
|
match=('requested more to copy than' |
|
' available in device buffer')): |
|
put(position=position, nbytes=nbytes) |
|
|
|
|
|
@pytest.mark.parametrize("size", [0, 1, 1000]) |
|
def test_copy_from_host(size): |
|
arr, buf = make_random_buffer(size=size, target='host') |
|
lst = arr.tolist() |
|
dbuf = global_context.new_buffer(size) |
|
|
|
def put(*args, **kwargs): |
|
dbuf.copy_from_host(buf, *args, **kwargs) |
|
rbuf = dbuf.copy_to_host() |
|
return np.frombuffer(rbuf, dtype=np.uint8).tolist() |
|
assert put() == lst |
|
if size > 4: |
|
assert put(position=size//4) == lst[:size//4]+lst[:-size//4] |
|
assert put() == lst |
|
        assert put(position=1, nbytes=size//2) == \
            lst[:1] + lst[:size//2] + lst[-(size-size//2-1):]
|
|
|
for (position, nbytes) in [ |
|
(size+2, -1), (-2, -1), (size+1, 0), (-3, 0), |
|
]: |
|
with pytest.raises(ValueError, |
|
match='position argument is out-of-range'): |
|
put(position=position, nbytes=nbytes) |
|
|
|
for (position, nbytes) in [ |
|
(0, size+1), |
|
]: |
|
with pytest.raises(ValueError, |
|
match=('requested more to copy than' |
|
' available from host buffer')): |
|
put(position=position, nbytes=nbytes) |
|
|
|
if size < 4: |
|
return |
|
|
|
for (position, nbytes) in [ |
|
(size//2, (size+1)//2+1) |
|
]: |
|
with pytest.raises(ValueError, |
|
match=('requested more to copy than' |
|
' available in device buffer')): |
|
put(position=position, nbytes=nbytes) |
|
|
|
|
|
def test_buffer_device(): |
|
buf = cuda.new_host_buffer(10) |
|
assert buf.device_type == pa.DeviceAllocationType.CUDA_HOST |
|
assert isinstance(buf.device, pa.Device) |
|
assert isinstance(buf.memory_manager, pa.MemoryManager) |
|
assert buf.is_cpu |
|
assert buf.device.is_cpu |
|
assert buf.device == pa.default_cpu_memory_manager().device |
|
|
|
|
|
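    # Pinned host memory is CPU-accessible: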
assert buf.memory_manager.is_cpu |
|
|
|
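    # A device buffer reports a non-CPU device and memory manager: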
_, buf = make_random_buffer(size=10, target='device') |
|
assert buf.device_type == pa.DeviceAllocationType.CUDA |
|
assert isinstance(buf.device, pa.Device) |
|
assert buf.device == global_context.memory_manager.device |
|
assert isinstance(buf.memory_manager, pa.MemoryManager) |
|
assert not buf.is_cpu |
|
assert not buf.device.is_cpu |
|
assert not buf.memory_manager.is_cpu |
|
|
|
|
|
def test_BufferWriter(): |
|
def allocate(size): |
|
cbuf = global_context.new_buffer(size) |
|
writer = cuda.BufferWriter(cbuf) |
|
return cbuf, writer |
|
|
|
def test_writes(total_size, chunksize, buffer_size=0): |
|
cbuf, writer = allocate(total_size) |
|
arr, buf = make_random_buffer(size=total_size, target='host') |
|
|
|
if buffer_size > 0: |
|
writer.buffer_size = buffer_size |
|
|
|
position = writer.tell() |
|
assert position == 0 |
|
writer.write(buf.slice(length=chunksize)) |
|
assert writer.tell() == chunksize |
|
writer.seek(0) |
|
position = writer.tell() |
|
assert position == 0 |
|
|
|
while position < total_size: |
|
bytes_to_write = min(chunksize, total_size - position) |
|
writer.write(buf.slice(offset=position, length=bytes_to_write)) |
|
position += bytes_to_write |
|
|
|
writer.flush() |
|
assert cbuf.size == total_size |
|
cbuf.context.synchronize() |
|
buf2 = cbuf.copy_to_host() |
|
cbuf.context.synchronize() |
|
assert buf2.size == total_size |
|
arr2 = np.frombuffer(buf2, dtype=np.uint8) |
|
np.testing.assert_equal(arr, arr2) |
|
|
|
total_size, chunk_size = 1 << 16, 1000 |
|
test_writes(total_size, chunk_size) |
|
test_writes(total_size, chunk_size, total_size // 16) |
|
|
|
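    # writeat(position, data) seeks and writes; the next write() continues
    # from where it left off: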
cbuf, writer = allocate(100) |
|
writer.write(np.arange(100, dtype=np.uint8)) |
|
writer.writeat(50, np.arange(25, dtype=np.uint8)) |
|
writer.write(np.arange(25, dtype=np.uint8)) |
|
writer.flush() |
|
|
|
arr = np.frombuffer(cbuf.copy_to_host(), np.uint8) |
|
np.testing.assert_equal(arr[:50], np.arange(50, dtype=np.uint8)) |
|
np.testing.assert_equal(arr[50:75], np.arange(25, dtype=np.uint8)) |
|
np.testing.assert_equal(arr[75:], np.arange(25, dtype=np.uint8)) |
|
|
|
|
|
def test_BufferWriter_edge_cases(): |
|
|
|
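    # Exercise buffering edge cases: zero-length writes and buffer_size
    # changes between writes: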
size = 1000 |
|
cbuf = global_context.new_buffer(size) |
|
writer = cuda.BufferWriter(cbuf) |
|
arr, buf = make_random_buffer(size=size, target='host') |
|
|
|
assert writer.buffer_size == 0 |
|
writer.buffer_size = 100 |
|
assert writer.buffer_size == 100 |
|
|
|
writer.write(buf.slice(length=0)) |
|
assert writer.tell() == 0 |
|
|
|
writer.write(buf.slice(length=10)) |
|
writer.buffer_size = 200 |
|
assert writer.buffer_size == 200 |
|
assert writer.num_bytes_buffered == 0 |
|
|
|
writer.write(buf.slice(offset=10, length=300)) |
|
assert writer.num_bytes_buffered == 0 |
|
|
|
writer.write(buf.slice(offset=310, length=200)) |
|
assert writer.num_bytes_buffered == 0 |
|
|
|
writer.write(buf.slice(offset=510, length=390)) |
|
writer.write(buf.slice(offset=900, length=100)) |
|
|
|
writer.flush() |
|
|
|
buf2 = cbuf.copy_to_host() |
|
assert buf2.size == size |
|
arr2 = np.frombuffer(buf2, dtype=np.uint8) |
|
np.testing.assert_equal(arr, arr2) |
|
|
|
|
|
def test_BufferReader(): |
|
size = 1000 |
|
arr, cbuf = make_random_buffer(size=size, target='device') |
|
|
|
reader = cuda.BufferReader(cbuf) |
|
reader.seek(950) |
|
assert reader.tell() == 950 |
|
|
|
data = reader.read(100) |
|
assert len(data) == 50 |
|
assert reader.tell() == 1000 |
|
|
|
reader.seek(925) |
|
arr2 = np.zeros(100, dtype=np.uint8) |
|
n = reader.readinto(arr2) |
|
assert n == 75 |
|
assert reader.tell() == 1000 |
|
np.testing.assert_equal(arr[925:], arr2[:75]) |
|
|
|
reader.seek(0) |
|
assert reader.tell() == 0 |
|
buf2 = reader.read_buffer() |
|
arr2 = np.frombuffer(buf2.copy_to_host(), dtype=np.uint8) |
|
np.testing.assert_equal(arr, arr2) |
|
|
|
|
|
def test_BufferReader_zero_size(): |
|
arr, cbuf = make_random_buffer(size=0, target='device') |
|
reader = cuda.BufferReader(cbuf) |
|
reader.seek(0) |
|
data = reader.read() |
|
assert len(data) == 0 |
|
assert reader.tell() == 0 |
|
buf2 = reader.read_buffer() |
|
arr2 = np.frombuffer(buf2.copy_to_host(), dtype=np.uint8) |
|
np.testing.assert_equal(arr, arr2) |
|
|
|
|
|
def make_recordbatch(length): |
|
schema = pa.schema([pa.field('f0', pa.int16()), |
|
pa.field('f1', pa.int16())]) |
|
a0 = pa.array(np.random.randint(0, 255, size=length, dtype=np.int16)) |
|
a1 = pa.array(np.random.randint(0, 255, size=length, dtype=np.int16)) |
|
batch = pa.record_batch([a0, a1], schema=schema) |
|
return batch |
|
|
|
|
|
def test_batch_serialize(): |
|
batch = make_recordbatch(10) |
|
hbuf = batch.serialize() |
|
cbuf = cuda.serialize_record_batch(batch, global_context) |
|
|
|
|
|
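    # Read the record batch back from the device buffer: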
cbatch = cuda.read_record_batch(cbuf, batch.schema) |
|
assert isinstance(cbatch, pa.RecordBatch) |
|
assert batch.schema == cbatch.schema |
|
assert batch.num_columns == cbatch.num_columns |
|
assert batch.num_rows == cbatch.num_rows |
|
|
|
|
|
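    # Copy to host and compare with the host-side serialization: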
buf = cbuf.copy_to_host() |
|
assert hbuf.equals(buf) |
|
batch2 = pa.ipc.read_record_batch(buf, batch.schema) |
|
assert hbuf.equals(batch2.serialize()) |
|
|
|
assert batch.num_columns == batch2.num_columns |
|
assert batch.num_rows == batch2.num_rows |
|
assert batch.column(0).equals(batch2.column(0)) |
|
assert batch.equals(batch2) |
|
|
|
|
|
def make_table(): |
|
a0 = pa.array([0, 1, 42, None], type=pa.int16()) |
|
a1 = pa.array([[0, 1], [2], [], None], type=pa.list_(pa.int32())) |
|
a2 = pa.array([("ab", True), ("cde", False), (None, None), None], |
|
type=pa.struct([("strs", pa.utf8()), |
|
("bools", pa.bool_())])) |
|
|
|
|
|
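    # Dictionary arrays, including nested value types: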
a3 = pa.DictionaryArray.from_arrays( |
|
indices=[0, 1, 1, None], |
|
dictionary=pa.array(['foo', 'bar'])) |
|
a4 = pa.DictionaryArray.from_arrays( |
|
indices=[2, 1, 2, None], |
|
dictionary=a1) |
|
a5 = pa.DictionaryArray.from_arrays( |
|
indices=[2, 1, 0, None], |
|
dictionary=a2) |
|
|
|
arrays = [a0, a1, a2, a3, a4, a5] |
|
schema = pa.schema([('f{}'.format(i), arr.type) |
|
for i, arr in enumerate(arrays)]) |
|
batch = pa.record_batch(arrays, schema=schema) |
|
table = pa.Table.from_batches([batch]) |
|
return table |
|
|
|
|
|
def make_table_cuda(): |
|
htable = make_table() |
|
|
|
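    # Serialize the table into an IPC stream held in a host buffer: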
sink = pa.BufferOutputStream() |
|
with pa.ipc.new_stream(sink, htable.schema) as out: |
|
out.write_table(htable) |
|
hbuf = pa.py_buffer(sink.getvalue().to_pybytes()) |
|
|
|
|
|
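    # Copy the serialized stream into device memory: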
dbuf = global_context.new_buffer(len(hbuf)) |
|
dbuf.copy_from_host(hbuf, nbytes=len(hbuf)) |
|
|
|
dtable = pa.ipc.open_stream(cuda.BufferReader(dbuf)).read_all() |
|
return hbuf, htable, dbuf, dtable |
|
|
|
|
|
def test_table_deserialize(): |
|
|
|
|
|
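    # make_table_cuda() reads the table back from an IPC stream on the device: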
hbuf, htable, dbuf, dtable = make_table_cuda() |
|
|
|
assert htable.schema == dtable.schema |
|
assert htable.num_rows == dtable.num_rows |
|
assert htable.num_columns == dtable.num_columns |
|
|
|
assert hbuf.equals(dbuf.copy_to_host()) |
|
|
|
assert htable.equals(pa.ipc.open_stream( |
|
dbuf.copy_to_host() |
|
).read_all()) |
|
|
|
|
|
def test_create_table_with_device_buffers(): |
|
|
|
|
|
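    # Rebuild a table from device-backed columns: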
hbuf, htable, dbuf, dtable = make_table_cuda() |
|
|
|
dtable2 = pa.Table.from_arrays(dtable.columns, dtable.column_names) |
|
|
|
assert htable.schema == dtable2.schema |
|
assert htable.num_rows == dtable2.num_rows |
|
assert htable.num_columns == dtable2.num_columns |
|
|
|
assert hbuf.equals(dbuf.copy_to_host()) |
|
|
|
assert htable.equals(pa.ipc.open_stream( |
|
dbuf.copy_to_host() |
|
).read_all()) |
|
|
|
|
|
def other_process_for_test_IPC(handle_buffer, expected_arr): |
|
other_context = pa.cuda.Context(0) |
|
ipc_handle = pa.cuda.IpcMemHandle.from_buffer(handle_buffer) |
|
ipc_buf = other_context.open_ipc_buffer(ipc_handle) |
|
ipc_buf.context.synchronize() |
|
buf = ipc_buf.copy_to_host() |
|
assert buf.size == expected_arr.size, repr((buf.size, expected_arr.size)) |
|
arr = np.frombuffer(buf, dtype=expected_arr.dtype) |
|
np.testing.assert_equal(arr, expected_arr) |
|
|
|
|
|
@cuda_ipc |
|
@pytest.mark.parametrize("size", [0, 1, 1000]) |
|
def test_IPC(size): |
|
import multiprocessing |
|
ctx = multiprocessing.get_context('spawn') |
|
arr, cbuf = make_random_buffer(size=size, target='device') |
|
ipc_handle = cbuf.export_for_ipc() |
|
handle_buffer = ipc_handle.serialize() |
|
p = ctx.Process(target=other_process_for_test_IPC, |
|
args=(handle_buffer, arr)) |
|
p.start() |
|
p.join() |
|
assert p.exitcode == 0 |
|
|
|
|
|
def test_copy_to(): |
|
_, buf = make_random_buffer(size=10, target='device') |
|
mm_cuda = buf.memory_manager |
|
|
|
for dest in [mm_cuda, mm_cuda.device]: |
|
arr = pa.array([0, 1, 2]) |
|
arr_cuda = arr.copy_to(dest) |
|
assert not arr_cuda.buffers()[1].is_cpu |
|
assert arr_cuda.buffers()[1].device_type == pa.DeviceAllocationType.CUDA |
|
assert arr_cuda.buffers()[1].device == mm_cuda.device |
|
|
|
arr_roundtrip = arr_cuda.copy_to(pa.default_cpu_memory_manager()) |
|
assert arr_roundtrip.equals(arr) |
|
|
|
batch = pa.record_batch({"col": arr}) |
|
batch_cuda = batch.copy_to(dest) |
|
buf_cuda = batch_cuda["col"].buffers()[1] |
|
assert not buf_cuda.is_cpu |
|
assert buf_cuda.device_type == pa.DeviceAllocationType.CUDA |
|
assert buf_cuda.device == mm_cuda.device |
|
|
|
batch_roundtrip = batch_cuda.copy_to(pa.default_cpu_memory_manager()) |
|
assert batch_roundtrip.equals(batch) |
|
|
|
|
|
def test_device_interface_array(): |
|
cffi = pytest.importorskip("pyarrow.cffi") |
|
ffi = cffi.ffi |
|
|
|
c_schema = ffi.new("struct ArrowSchema*") |
|
ptr_schema = int(ffi.cast("uintptr_t", c_schema)) |
|
c_array = ffi.new("struct ArrowDeviceArray*") |
|
ptr_array = int(ffi.cast("uintptr_t", c_array)) |
|
|
|
typ = pa.list_(pa.int32()) |
|
arr = pa.array([[1], [2, 42]], type=typ) |
|
|
|
|
|
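    # Copy the array to the CUDA device: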
_, buf = make_random_buffer(size=10, target='device') |
|
mm_cuda = buf.memory_manager |
|
carr = arr.copy_to(mm_cuda) |
|
|
|
|
|
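    # Export only the array; the type is supplied at import time: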
carr._export_to_c_device(ptr_array) |
|
|
|
|
|
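    # device_type 2 is ARROW_DEVICE_CUDA: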
assert c_array.device_type == 2 |
|
assert c_array.device_id == global_context.device_number |
|
assert c_array.array.length == 2 |
|
|
|
|
|
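    # Drop the exporter and re-import the array from the C struct: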
del carr |
|
carr_new = pa.Array._import_from_c_device(ptr_array, typ) |
|
assert carr_new.type == pa.list_(pa.int32()) |
|
arr_new = carr_new.copy_to(pa.default_cpu_memory_manager()) |
|
assert arr_new.equals(arr) |
|
|
|
del carr_new |
|
|
|
with pytest.raises(ValueError, match="Cannot import released ArrowArray"): |
|
pa.Array._import_from_c_device(ptr_array, typ) |
|
|
|
|
|
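    # Export the array and its schema together: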
carr = arr.copy_to(mm_cuda) |
|
carr._export_to_c_device(ptr_array, ptr_schema) |
|
|
|
del carr |
|
carr_new = pa.Array._import_from_c_device(ptr_array, ptr_schema) |
|
assert carr_new.type == pa.list_(pa.int32()) |
|
arr_new = carr_new.copy_to(pa.default_cpu_memory_manager()) |
|
assert arr_new.equals(arr) |
|
|
|
del carr_new |
|
|
|
with pytest.raises(ValueError, match="Cannot import released ArrowSchema"): |
|
pa.Array._import_from_c_device(ptr_array, ptr_schema) |
|
|
|
|
|
def test_device_interface_batch_array(): |
|
cffi = pytest.importorskip("pyarrow.cffi") |
|
ffi = cffi.ffi |
|
|
|
c_schema = ffi.new("struct ArrowSchema*") |
|
ptr_schema = int(ffi.cast("uintptr_t", c_schema)) |
|
c_array = ffi.new("struct ArrowDeviceArray*") |
|
ptr_array = int(ffi.cast("uintptr_t", c_array)) |
|
|
|
batch = make_recordbatch(10) |
|
schema = batch.schema |
|
cbuf = cuda.serialize_record_batch(batch, global_context) |
|
cbatch = cuda.read_record_batch(cbuf, schema) |
|
|
|
|
|
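    # Export only the batch; the schema is supplied at import time: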
cbatch._export_to_c_device(ptr_array) |
|
|
|
|
|
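    # device_type 2 is ARROW_DEVICE_CUDA: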
assert c_array.device_type == 2 |
|
assert c_array.device_id == global_context.device_number |
|
assert c_array.array.length == 10 |
|
|
|
|
|
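    # Drop the exporter and re-import the batch from the C struct: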
del cbatch |
|
cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, schema) |
|
assert cbatch_new.schema == schema |
|
batch_new = cbatch_new.copy_to(pa.default_cpu_memory_manager()) |
|
assert batch_new.equals(batch) |
|
|
|
del cbatch_new |
|
|
|
with pytest.raises(ValueError, match="Cannot import released ArrowArray"): |
|
pa.RecordBatch._import_from_c_device(ptr_array, schema) |
|
|
|
|
|
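    # Export the batch and its schema together: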
cbatch = cuda.read_record_batch(cbuf, schema) |
|
cbatch._export_to_c_device(ptr_array, ptr_schema) |
|
|
|
del cbatch |
|
cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema) |
|
assert cbatch_new.schema == schema |
|
batch_new = cbatch_new.copy_to(pa.default_cpu_memory_manager()) |
|
assert batch_new.equals(batch) |
|
|
|
del cbatch_new |
|
|
|
with pytest.raises(ValueError, match="Cannot import released ArrowSchema"): |
|
pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema) |
|
|
|
|
|
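    # Importing with a non-struct schema must fail: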
pa.int32()._export_to_c(ptr_schema) |
|
with pytest.raises(ValueError, |
|
match="ArrowSchema describes non-struct type"): |
|
pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema) |
|
|
|
|
|
def test_print_array(): |
|
batch = make_recordbatch(10) |
|
cbuf = cuda.serialize_record_batch(batch, global_context) |
|
cbatch = cuda.read_record_batch(cbuf, batch.schema) |
|
arr = batch["f0"] |
|
carr = cbatch["f0"] |
|
assert str(carr) == str(arr) |
|
|
|
batch = make_recordbatch(100) |
|
cbuf = cuda.serialize_record_batch(batch, global_context) |
|
cbatch = cuda.read_record_batch(cbuf, batch.schema) |
|
arr = batch["f0"] |
|
carr = cbatch["f0"] |
|
assert str(carr) == str(arr) |
|
|
|
|
|
@pytest.mark.parametrize("size", [10, 100]) |
|
def test_print_array_host(size): |
|
buf = cuda.new_host_buffer(size*8) |
|
np_arr = np.frombuffer(buf, dtype=np.int64) |
|
np_arr[:] = range(size) |
|
|
|
arr = pa.array(range(size), pa.int64()) |
|
carr = pa.Array.from_buffers(pa.int64(), size, [None, buf]) |
|
assert str(carr) == str(arr) |
|
|
|
|
|
def make_chunked_array(n_elements_per_chunk, n_chunks): |
|
arrs = [] |
|
carrs = [] |
|
for _ in range(n_chunks): |
|
batch = make_recordbatch(n_elements_per_chunk) |
|
cbuf = cuda.serialize_record_batch(batch, global_context) |
|
cbatch = cuda.read_record_batch(cbuf, batch.schema) |
|
arrs.append(batch["f0"]) |
|
carrs.append(cbatch["f0"]) |
|
|
|
return pa.chunked_array(arrs), pa.chunked_array(carrs) |
|
|
|
|
|
def test_print_chunked_array(): |
|
arr, carr = make_chunked_array(10, 3) |
|
assert str(carr) == str(arr) |
|
|
|
arr, carr = make_chunked_array(100, 20) |
|
assert str(carr) == str(arr) |
|
|
|
|
|
def test_print_record_batch(): |
|
batch = make_recordbatch(10) |
|
cbuf = cuda.serialize_record_batch(batch, global_context) |
|
cbatch = cuda.read_record_batch(cbuf, batch.schema) |
|
assert str(cbatch) == str(batch) |
|
|
|
batch = make_recordbatch(100) |
|
cbuf = cuda.serialize_record_batch(batch, global_context) |
|
cbatch = cuda.read_record_batch(cbuf, batch.schema) |
|
assert str(cbatch) == str(batch) |
|
|
|
|
|
def test_print_table(): |
|
_, table, _, ctable = make_table_cuda() |
|
assert str(ctable) == str(table) |
|
|