File size: 5,806 Bytes
9c6594c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
import os
import sys
from typing import Callable, Optional
import torch
from torch.types import Storage
# Names exported by a star-import of this module.
__all__: list[str] = [
    "gds_register_buffer",
    "gds_deregister_buffer",
    "GdsFile",
]
def _dummy_fn(name: str) -> Callable:
def fn(*args, **kwargs): # type: ignore[no-untyped-def]
raise RuntimeError(f"torch._C.{name} is not supported on this platform")
return fn
# When the extension module was built without the GDS bindings, install stubs
# under the same names so that calls fail with a clear RuntimeError.
if not hasattr(torch._C, "_gds_register_buffer"):
    _gds_stub_names = (
        "_gds_register_buffer",
        "_gds_deregister_buffer",
        "_gds_register_handle",
        "_gds_deregister_handle",
        "_gds_load_storage",
        "_gds_save_storage",
    )
    # The first binding is known to be missing; the remaining ones must be
    # missing as well.
    assert not any(hasattr(torch._C, _stub) for _stub in _gds_stub_names[1:])
    # Define functions
    for _stub in _gds_stub_names:
        torch._C.__dict__[_stub] = _dummy_fn(_stub)
    # Keep the module namespace free of the temporary helper names.
    del _gds_stub_names, _stub
def gds_register_buffer(s: Storage) -> None:
    """Register a storage on a CUDA device as a cufile buffer.

    The storage is forwarded unchanged to ``torch._C._gds_register_buffer``.

    Args:
        s (Storage): Buffer to register.

    Example::

        >>> # xdoctest: +SKIP("gds filesystem requirements")
        >>> src = torch.randn(1024, device="cuda")
        >>> s = src.untyped_storage()
        >>> gds_register_buffer(s)
    """
    torch._C._gds_register_buffer(s)
def gds_deregister_buffer(s: Storage) -> None:
    """Deregisters a previously registered storage on a CUDA device as a cufile buffer.

    The storage is forwarded unchanged to ``torch._C._gds_deregister_buffer``.

    Example::

        >>> # xdoctest: +SKIP("gds filesystem requirements")
        >>> src = torch.randn(1024, device="cuda")
        >>> s = src.untyped_storage()
        >>> gds_register_buffer(s)
        >>> gds_deregister_buffer(s)

    Args:
        s (Storage): Previously registered buffer to deregister.
    """
    torch._C._gds_deregister_buffer(s)
class GdsFile:
    r"""Wrapper around cuFile.

    cuFile is a file-like interface to the GPUDirect Storage (GDS) API.

    See the `cufile docs <https://docs.nvidia.com/gpudirect-storage/api-reference-guide/index.html#cufile-io-api>`_
    for more details.

    The constructor opens the file and registers the resulting descriptor;
    the finalizer deregisters the handle (if any) and closes the descriptor.

    Args:
        filename (str): Name of the file to open.
        flags (int): Flags to pass to ``os.open`` when opening the file. ``os.O_DIRECT`` will
            be added automatically.

    Raises:
        RuntimeError: If ``sys.platform`` is ``"win32"``.

    Example::

        >>> # xdoctest: +SKIP("gds filesystem requirements")
        >>> src1 = torch.randn(1024, device="cuda")
        >>> src2 = torch.randn(2, 1024, device="cuda")
        >>> file = torch.cuda.gds.GdsFile(f, os.O_CREAT | os.O_RDWR)
        >>> file.save_storage(src1.untyped_storage(), offset=0)
        >>> file.save_storage(src2.untyped_storage(), offset=src1.nbytes)
        >>> dest1 = torch.empty(1024, device="cuda")
        >>> dest2 = torch.empty(2, 1024, device="cuda")
        >>> file.load_storage(dest1.untyped_storage(), offset=0)
        >>> file.load_storage(dest2.untyped_storage(), offset=src1.nbytes)
        >>> torch.equal(src1, dest1)
        True
        >>> torch.equal(src2, dest2)
        True

    """

    def __init__(self, filename: str, flags: int):
        if sys.platform == "win32":
            raise RuntimeError("GdsFile is not supported on this platform.")
        self.filename = filename
        self.flags = flags
        self.fd = os.open(filename, flags | os.O_DIRECT)  # type: ignore[attr-defined]
        self.handle: Optional[int] = None
        self.register_handle()

    def __del__(self) -> None:
        # The finalizer also runs for objects whose ``__init__`` raised before
        # ``fd``/``handle`` were assigned (platform guard, ``os.open`` error),
        # so neither attribute can be assumed to exist.
        try:
            if getattr(self, "handle", None) is not None:
                self.deregister_handle()
        finally:
            # Close the descriptor even when deregistration fails, otherwise
            # it would leak.
            fd = getattr(self, "fd", None)
            if fd is not None:
                os.close(fd)

    def register_handle(self) -> None:
        """Registers file descriptor to cuFile Driver.

        This is a wrapper around ``cuFileHandleRegister``. The value returned
        by ``torch._C._gds_register_handle`` is stored in ``self.handle``.

        Raises:
            AssertionError: If a handle is already registered.
        """
        assert (
            self.handle is None
        ), "Cannot register a handle that is already registered."
        self.handle = torch._C._gds_register_handle(self.fd)

    def deregister_handle(self) -> None:
        """Deregisters file descriptor from cuFile Driver.

        This is a wrapper around ``cuFileHandleDeregister``. ``self.handle``
        is reset to ``None`` afterwards.

        Raises:
            AssertionError: If no handle is currently registered.
        """
        assert (
            self.handle is not None
        ), "Cannot deregister a handle that is not registered."
        torch._C._gds_deregister_handle(self.handle)
        self.handle = None

    def load_storage(self, storage: Storage, offset: int = 0) -> None:
        """Loads data from the file into the storage.

        This is a wrapper around ``cuFileRead``. ``storage.nbytes()`` of data
        will be loaded from the file at ``offset`` into the storage.

        Args:
            storage (Storage): Storage to load data into.
            offset (int, optional): Offset into the file to start loading from. (Default: 0)

        Raises:
            AssertionError: If no handle is currently registered.
        """
        assert (
            self.handle is not None
        ), "Cannot load data from a file that is not registered."
        torch._C._gds_load_storage(self.handle, storage, offset)

    def save_storage(self, storage: Storage, offset: int = 0) -> None:
        """Saves data from the storage into the file.

        This is a wrapper around ``cuFileWrite``. All bytes of the storage
        will be written to the file at ``offset``.

        Args:
            storage (Storage): Storage to save data from.
            offset (int, optional): Offset into the file to start saving to. (Default: 0)

        Raises:
            AssertionError: If no handle is currently registered.
        """
        assert (
            self.handle is not None
        ), "Cannot save data to a file that is not registered."
        torch._C._gds_save_storage(self.handle, storage, offset)
|