# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Tests for PyArrow memory pools: allocation statistics, backend selection
via environment variables, debug-mode overflow detection, and stats printing.

Several tests spawn Python subprocesses so that pool-selecting environment
variables (ARROW_DEFAULT_MEMORY_POOL, ARROW_DEBUG_MEMORY_POOL) are read at
interpreter startup, and so that deliberate out-of-bounds writes can crash
the child instead of this process.
"""

import contextlib
import os
import signal
import subprocess
import sys
import weakref

import pyarrow as pa
from pyarrow.tests import util

import pytest

# All tests in this module may spawn subprocesses.
pytestmark = pytest.mark.processes

# Every allocator backend a PyArrow build may expose.
possible_backends = ["system", "jemalloc", "mimalloc"]

# Backends which are expected to be present in all builds of PyArrow,
# except if the user manually recompiled Arrow C++.
mandatory_backends = ["system", "mimalloc"]


def backend_factory(backend_name):
    """Return the pa.<backend_name>_memory_pool factory for *backend_name*."""
    return getattr(pa, f"{backend_name}_memory_pool")


def supported_factories():
    """Yield the default pool factory plus one factory per supported backend."""
    yield pa.default_memory_pool
    for backend_name in pa.supported_memory_backends():
        yield backend_factory(backend_name)


@contextlib.contextmanager
def allocate_bytes(pool, nbytes):
    """
    Temporarily allocate *nbytes* from the given *pool*.
    """
    arr = pa.array([b"x" * nbytes], type=pa.binary(), memory_pool=pool)
    # Fetch the values buffer from the varbinary array and release the rest,
    # to get the desired allocation amount
    buf = arr.buffers()[2]
    arr = None
    assert len(buf) == nbytes
    try:
        yield
    finally:
        # Dropping the last reference returns the memory to the pool.
        buf = None


def check_allocated_bytes(pool):
    """
    Check allocation stats on *pool*.

    Asserts that bytes_allocated/max_memory/num_allocations move as expected
    while 512 bytes are temporarily allocated, and revert (except the
    high-water mark and the allocation count) once they are released.
    """
    allocated_before = pool.bytes_allocated()
    max_mem_before = pool.max_memory()
    num_allocations_before = pool.num_allocations()
    with allocate_bytes(pool, 512):
        assert pool.bytes_allocated() == allocated_before + 512
        new_max_memory = pool.max_memory()
        assert pool.max_memory() >= max_mem_before
        num_allocations_after = pool.num_allocations()
        # The allocation above should count as at least one but only a few
        # pool allocations (the array build may allocate more than one buffer).
        assert num_allocations_after > num_allocations_before
        assert num_allocations_after < num_allocations_before + 5
    # Deallocation restores bytes_allocated; max_memory and num_allocations
    # are monotonic and must keep their in-context values.
    assert pool.bytes_allocated() == allocated_before
    assert pool.max_memory() == new_max_memory
    assert pool.num_allocations() == num_allocations_after


def test_default_allocated_bytes():
    """Default pool stats behave correctly and match the global byte counter."""
    pool = pa.default_memory_pool()
    with allocate_bytes(pool, 1024):
        check_allocated_bytes(pool)
        assert pool.bytes_allocated() == pa.total_allocated_bytes()


def test_proxy_memory_pool():
    """A proxy pool tracks stats and is garbage-collectable once dropped."""
    pool = pa.proxy_memory_pool(pa.default_memory_pool())
    check_allocated_bytes(pool)
    wr = weakref.ref(pool)
    assert wr() is not None
    del pool
    assert wr() is None


def test_logging_memory_pool(capfd):
    """A logging pool writes balanced Allocate:/Free: lines to stdout."""
    pool = pa.logging_memory_pool(pa.default_memory_pool())
    check_allocated_bytes(pool)
    out, err = capfd.readouterr()
    assert err == ""
    assert out.count("Allocate:") > 0
    assert out.count("Allocate:") == out.count("Free:")


def test_set_memory_pool():
    """pa.set_memory_pool routes default allocations through the given pool."""
    old_pool = pa.default_memory_pool()
    pool = pa.proxy_memory_pool(old_pool)
    pa.set_memory_pool(pool)
    try:
        allocated_before = pool.bytes_allocated()
        # Passing memory_pool=None makes allocate_bytes use the default pool,
        # which we just overrode.
        with allocate_bytes(None, 512):
            assert pool.bytes_allocated() == allocated_before + 512
        assert pool.bytes_allocated() == allocated_before
    finally:
        # Always restore the original default pool for other tests.
        pa.set_memory_pool(old_pool)


def test_default_backend_name():
    pool = pa.default_memory_pool()
    assert pool.backend_name in possible_backends


def test_release_unused():
    # Smoke test: release_unused() must be callable on the default pool.
    pool = pa.default_memory_pool()
    pool.release_unused()


def check_env_var(name, expected, *, expect_warning=False):
    """
    Start a subprocess with ARROW_DEFAULT_MEMORY_POOL=*name* and check that
    the default pool's backend_name is one of *expected*.

    If *expect_warning* is true, the child is expected to emit an
    "Unsupported backend" warning on stderr (one or two lines, depending on
    whether Arrow C++ was built with glog); otherwise stderr must be empty.
    """
    code = f"""if 1:
        import pyarrow as pa

        pool = pa.default_memory_pool()
        assert pool.backend_name in {expected!r}, pool.backend_name
        """
    env = dict(os.environ)
    env['ARROW_DEFAULT_MEMORY_POOL'] = name
    res = subprocess.run([sys.executable, "-c", code], env=env,
                         universal_newlines=True, stderr=subprocess.PIPE)
    if res.returncode != 0:
        # Surface the child's stderr before failing for easier debugging.
        print(res.stderr, file=sys.stderr)
        res.check_returncode()  # fail

    errlines = res.stderr.splitlines()
    if expect_warning:
        assert len(errlines) in (1, 2)
        if len(errlines) == 1:
            # ARROW_USE_GLOG=OFF
            assert f"Unsupported backend '{name}'" in errlines[0]
        else:
            # ARROW_USE_GLOG=ON
            assert "InitGoogleLogging()" in errlines[0]
            assert f"Unsupported backend '{name}'" in errlines[1]
    else:
        assert len(errlines) == 0


def test_env_var():
    """ARROW_DEFAULT_MEMORY_POOL selects the backend; bad values warn."""
    for backend_name in mandatory_backends:
        check_env_var(backend_name, [backend_name])
    check_env_var("nonexistent", possible_backends, expect_warning=True)


def test_memory_pool_factories():
    """Each backend factory produces a pool reporting the matching name."""
    def check(factory, name, *, can_fail=False):
        if can_fail:
            # Optional backends (e.g. jemalloc) may be absent from this build.
            try:
                pool = factory()
            except NotImplementedError:
                return
        else:
            pool = factory()
        assert pool.backend_name == name

    for backend_name in possible_backends:
        check(backend_factory(backend_name), backend_name,
              can_fail=backend_name not in mandatory_backends)


def test_supported_memory_backends():
    backends = pa.supported_memory_backends()
    # Must include all mandatory backends and no unknown ones.
    assert set(backends) >= set(mandatory_backends)
    assert set(backends) <= set(possible_backends)


def run_debug_memory_pool(pool_factory, env_value):
    """
    Run a piece of code making an invalid memory write with the
    ARROW_DEBUG_MEMORY_POOL environment variable set to a specific value.
    """
    code = f"""if 1:
        import ctypes
        import pyarrow as pa

        # ARROW-16873: some Python installs enable faulthandler by default,
        # which could dump a spurious stack trace if the following crashes
        import faulthandler
        faulthandler.disable()

        pool = pa.{pool_factory}()
        buf = pa.allocate_buffer(64, memory_pool=pool)

        # Write memory out of bounds
        ptr = ctypes.cast(buf.address, ctypes.POINTER(ctypes.c_ubyte))
        ptr[64] = 0

        del buf
        """
    env = dict(os.environ)
    env['ARROW_DEBUG_MEMORY_POOL'] = env_value
    res = subprocess.run([sys.executable, "-c", code], env=env,
                         universal_newlines=True, stderr=subprocess.PIPE)
    # Echo the child's stderr so failures are diagnosable in test logs.
    print(res.stderr, file=sys.stderr)
    return res


@pytest.mark.parametrize('pool_factory', supported_factories())
def test_debug_memory_pool_abort(pool_factory):
    """ARROW_DEBUG_MEMORY_POOL=abort kills the process on buffer overflow."""
    res = run_debug_memory_pool(pool_factory.__name__, "abort")
    if os.name == "posix":
        assert res.returncode == -signal.SIGABRT
    else:
        assert res.returncode != 0
    assert "Wrong size on deallocation" in res.stderr


@pytest.mark.parametrize('pool_factory', supported_factories())
def test_debug_memory_pool_trap(pool_factory):
    """ARROW_DEBUG_MEMORY_POOL=trap raises SIGTRAP on buffer overflow."""
    res = run_debug_memory_pool(pool_factory.__name__, "trap")
    if os.name == "posix":
        assert res.returncode == -signal.SIGTRAP
    else:
        assert res.returncode != 0
    assert "Wrong size on deallocation" in res.stderr


@pytest.mark.parametrize('pool_factory', supported_factories())
def test_debug_memory_pool_warn(pool_factory):
    """ARROW_DEBUG_MEMORY_POOL=warn only prints a warning on overflow."""
    res = run_debug_memory_pool(pool_factory.__name__, "warn")
    res.check_returncode()
    assert "Wrong size on deallocation" in res.stderr


def check_debug_memory_pool_disabled(pool_factory, env_value, msg):
    """
    Check that overflow detection is disabled for the given
    ARROW_DEBUG_MEMORY_POOL value; *msg* is the expected stderr content
    ("" for no output at all).
    """
    if sys.maxsize < 2**32:
        # GH-45011: mimalloc may print warnings in this test on 32-bit Linux, ignore.
        pytest.skip("Test may fail on 32-bit platforms")

    res = run_debug_memory_pool(pool_factory.__name__, env_value)
    # The subprocess either returned successfully or was killed by a signal
    # (due to writing out of bounds), depending on the underlying allocator.
    if os.name == "posix":
        assert res.returncode <= 0
    else:
        res.check_returncode()
    if msg == "":
        assert res.stderr == ""
    else:
        assert msg in res.stderr


@pytest.mark.parametrize('pool_factory', supported_factories())
def test_debug_memory_pool_none(pool_factory):
    check_debug_memory_pool_disabled(pool_factory, "none", "")


@pytest.mark.parametrize('pool_factory', supported_factories())
def test_debug_memory_pool_empty(pool_factory):
    check_debug_memory_pool_disabled(pool_factory, "", "")


@pytest.mark.parametrize('pool_factory', supported_factories())
def test_debug_memory_pool_unknown(pool_factory):
    """An unrecognized value disables checks but warns about itself."""
    env_value = "some_arbitrary_value"
    msg = (
        f"Invalid value for ARROW_DEBUG_MEMORY_POOL: '{env_value}'. "
        "Valid values are 'abort', 'trap', 'warn', 'none'."
    )
    check_debug_memory_pool_disabled(pool_factory, env_value, msg)


@pytest.mark.parametrize('pool_factory', supported_factories())
def test_print_stats(pool_factory):
    """MemoryPool.print_stats() runs without error on every pool."""
    code = f"""if 1:
        import pyarrow as pa

        pool = pa.{pool_factory.__name__}()
        buf = pa.allocate_buffer(64, memory_pool=pool)
        pool.print_stats()
        """
    res = subprocess.run([sys.executable, "-c", code], check=True,
                         universal_newlines=True, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    if sys.platform == "linux" and not util.running_on_musllinux():
        # On Linux with glibc at least, all memory pools should emit statistics
        assert res.stderr.strip() != ""