""" Utility functions for - building and importing modules on test time, using a temporary location - detecting if compilers are present - determining paths to tests """ import atexit import concurrent.futures import contextlib import glob import os import shutil import subprocess import sys import tempfile from importlib import import_module from pathlib import Path import pytest import numpy from numpy._utils import asunicode from numpy.f2py._backends._meson import MesonBackend from numpy.testing import IS_WASM, temppath # # Check if compilers are available at all... # def check_language(lang, code_snippet=None): if sys.platform == "win32": pytest.skip("No Fortran tests on Windows (Issue #25134)", allow_module_level=True) tmpdir = tempfile.mkdtemp() try: meson_file = os.path.join(tmpdir, "meson.build") with open(meson_file, "w") as f: f.write("project('check_compilers')\n") f.write(f"add_languages('{lang}')\n") if code_snippet: f.write(f"{lang}_compiler = meson.get_compiler('{lang}')\n") f.write(f"{lang}_code = '''{code_snippet}'''\n") f.write( f"_have_{lang}_feature =" f"{lang}_compiler.compiles({lang}_code," f" name: '{lang} feature check')\n" ) try: runmeson = subprocess.run( ["meson", "setup", "btmp"], check=False, cwd=tmpdir, capture_output=True, ) except subprocess.CalledProcessError: pytest.skip("meson not present, skipping compiler dependent test", allow_module_level=True) return runmeson.returncode == 0 finally: shutil.rmtree(tmpdir) fortran77_code = ''' C Example Fortran 77 code PROGRAM HELLO PRINT *, 'Hello, Fortran 77!' END ''' fortran90_code = ''' ! Example Fortran 90 code program hello90 type :: greeting character(len=20) :: text end type greeting type(greeting) :: greet greet%text = 'hello, fortran 90!' print *, greet%text end program hello90 ''' # Dummy class for caching relevant checks class CompilerChecker: def __init__(self): self.compilers_checked = False self.has_c = False self.has_f77 = False self.has_f90 = False def check_compilers(self): if (not self.compilers_checked) and (not sys.platform == "cygwin"): with concurrent.futures.ThreadPoolExecutor() as executor: futures = [ executor.submit(check_language, "c"), executor.submit(check_language, "fortran", fortran77_code), executor.submit(check_language, "fortran", fortran90_code) ] self.has_c = futures[0].result() self.has_f77 = futures[1].result() self.has_f90 = futures[2].result() self.compilers_checked = True if not IS_WASM: checker = CompilerChecker() checker.check_compilers() def has_c_compiler(): return checker.has_c def has_f77_compiler(): return checker.has_f77 def has_f90_compiler(): return checker.has_f90 def has_fortran_compiler(): return (checker.has_f90 and checker.has_f77) # # Maintaining a temporary module directory # _module_dir = None _module_num = 5403 if sys.platform == "cygwin": NUMPY_INSTALL_ROOT = Path(__file__).parent.parent.parent _module_list = list(NUMPY_INSTALL_ROOT.glob("**/*.dll")) def _cleanup(): global _module_dir if _module_dir is not None: try: sys.path.remove(_module_dir) except ValueError: pass try: shutil.rmtree(_module_dir) except OSError: pass _module_dir = None def get_module_dir(): global _module_dir if _module_dir is None: _module_dir = tempfile.mkdtemp() atexit.register(_cleanup) if _module_dir not in sys.path: sys.path.insert(0, _module_dir) return _module_dir def get_temp_module_name(): # Assume single-threaded, and the module dir usable only by this thread global _module_num get_module_dir() name = "_test_ext_module_%d" % _module_num _module_num += 1 if name in sys.modules: # this should not be possible, but check anyway raise RuntimeError("Temporary module name already in use.") return name def _memoize(func): memo = {} def wrapper(*a, **kw): key = repr((a, kw)) if key not in memo: try: memo[key] = func(*a, **kw) except Exception as e: memo[key] = e raise ret = memo[key] if isinstance(ret, Exception): raise ret return ret wrapper.__name__ = func.__name__ return wrapper # # Building modules # @_memoize def build_module(source_files, options=[], skip=[], only=[], module_name=None): """ Compile and import a f2py module, built from the given files. """ code = f"import sys; sys.path = {sys.path!r}; import numpy.f2py; numpy.f2py.main()" d = get_module_dir() # gh-27045 : Skip if no compilers are found if not has_fortran_compiler(): pytest.skip("No Fortran compiler available") # Copy files dst_sources = [] f2py_sources = [] for fn in source_files: if not os.path.isfile(fn): raise RuntimeError(f"{fn} is not a file") dst = os.path.join(d, os.path.basename(fn)) shutil.copyfile(fn, dst) dst_sources.append(dst) base, ext = os.path.splitext(dst) if ext in (".f90", ".f95", ".f", ".c", ".pyf"): f2py_sources.append(dst) assert f2py_sources # Prepare options if module_name is None: module_name = get_temp_module_name() gil_options = [] if '--freethreading-compatible' not in options and '--no-freethreading-compatible' not in options: # default to disabling the GIL if unset in options gil_options = ['--freethreading-compatible'] f2py_opts = ["-c", "-m", module_name] + options + gil_options + f2py_sources f2py_opts += ["--backend", "meson"] if skip: f2py_opts += ["skip:"] + skip if only: f2py_opts += ["only:"] + only # Build cwd = os.getcwd() try: os.chdir(d) cmd = [sys.executable, "-c", code] + f2py_opts p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) out, err = p.communicate() if p.returncode != 0: raise RuntimeError(f"Running f2py failed: {cmd[4:]}\n{asunicode(out)}") finally: os.chdir(cwd) # Partial cleanup for fn in dst_sources: os.unlink(fn) # Rebase (Cygwin-only) if sys.platform == "cygwin": # If someone starts deleting modules after import, this will # need to change to record how big each module is, rather than # relying on rebase being able to find that from the files. _module_list.extend( glob.glob(os.path.join(d, f"{module_name:s}*")) ) subprocess.check_call( ["/usr/bin/rebase", "--database", "--oblivious", "--verbose"] + _module_list ) # Import return import_module(module_name) @_memoize def build_code(source_code, options=[], skip=[], only=[], suffix=None, module_name=None): """ Compile and import Fortran code using f2py. """ if suffix is None: suffix = ".f" with temppath(suffix=suffix) as path: with open(path, "w") as f: f.write(source_code) return build_module([path], options=options, skip=skip, only=only, module_name=module_name) # # Building with meson # class SimplifiedMesonBackend(MesonBackend): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) def compile(self): self.write_meson_build(self.build_dir) self.run_meson(self.build_dir) def build_meson(source_files, module_name=None, **kwargs): """ Build a module via Meson and import it. """ # gh-27045 : Skip if no compilers are found if not has_fortran_compiler(): pytest.skip("No Fortran compiler available") build_dir = get_module_dir() if module_name is None: module_name = get_temp_module_name() # Initialize the MesonBackend backend = SimplifiedMesonBackend( modulename=module_name, sources=source_files, extra_objects=kwargs.get("extra_objects", []), build_dir=build_dir, include_dirs=kwargs.get("include_dirs", []), library_dirs=kwargs.get("library_dirs", []), libraries=kwargs.get("libraries", []), define_macros=kwargs.get("define_macros", []), undef_macros=kwargs.get("undef_macros", []), f2py_flags=kwargs.get("f2py_flags", []), sysinfo_flags=kwargs.get("sysinfo_flags", []), fc_flags=kwargs.get("fc_flags", []), flib_flags=kwargs.get("flib_flags", []), setup_flags=kwargs.get("setup_flags", []), remove_build_dir=kwargs.get("remove_build_dir", False), extra_dat=kwargs.get("extra_dat", {}), ) backend.compile() # Import the compiled module sys.path.insert(0, f"{build_dir}/{backend.meson_build_dir}") return import_module(module_name) # # Unittest convenience # class F2PyTest: code = None sources = None options = [] skip = [] only = [] suffix = ".f" module = None _has_c_compiler = None _has_f77_compiler = None _has_f90_compiler = None @property def module_name(self): cls = type(self) return f'_{cls.__module__.rsplit(".", 1)[-1]}_{cls.__name__}_ext_module' @classmethod def setup_class(cls): if sys.platform == "win32": pytest.skip("Fails with MinGW64 Gfortran (Issue #9673)") F2PyTest._has_c_compiler = has_c_compiler() F2PyTest._has_f77_compiler = has_f77_compiler() F2PyTest._has_f90_compiler = has_f90_compiler() F2PyTest._has_fortran_compiler = has_fortran_compiler() def setup_method(self): if self.module is not None: return codes = self.sources or [] if self.code: codes.append(self.suffix) needs_f77 = any(str(fn).endswith(".f") for fn in codes) needs_f90 = any(str(fn).endswith(".f90") for fn in codes) needs_pyf = any(str(fn).endswith(".pyf") for fn in codes) if needs_f77 and not self._has_f77_compiler: pytest.skip("No Fortran 77 compiler available") if needs_f90 and not self._has_f90_compiler: pytest.skip("No Fortran 90 compiler available") if needs_pyf and not self._has_fortran_compiler: pytest.skip("No Fortran compiler available") # Build the module if self.code is not None: self.module = build_code( self.code, options=self.options, skip=self.skip, only=self.only, suffix=self.suffix, module_name=self.module_name, ) if self.sources is not None: self.module = build_module( self.sources, options=self.options, skip=self.skip, only=self.only, module_name=self.module_name, ) # # Helper functions # def getpath(*a): # Package root d = Path(numpy.f2py.__file__).parent.resolve() return d.joinpath(*a) @contextlib.contextmanager def switchdir(path): curpath = Path.cwd() os.chdir(path) try: yield finally: os.chdir(curpath)