20 #include <sys/types.h>
28 #include <system_error>
31 #include <kvikio/buffer.hpp>
32 #include <kvikio/defaults.hpp>
33 #include <kvikio/error.hpp>
34 #include <kvikio/parallel_operation.hpp>
35 #include <kvikio/posix_io.hpp>
36 #include <kvikio/shim/cufile.hpp>
37 #include <kvikio/stream.hpp>
38 #include <kvikio/utils.hpp>
50 inline int open_fd_parse_flags(
const std::string& flags,
bool o_direct)
53 if (flags.empty()) {
throw std::invalid_argument(
"Unknown file open flag"); }
56 file_flags = O_RDONLY;
57 if (flags[1] ==
'+') { file_flags = O_RDWR; }
60 file_flags = O_WRONLY;
61 if (flags[1] ==
'+') { file_flags = O_RDWR; }
62 file_flags |= O_CREAT | O_TRUNC;
64 case 'a':
throw std::invalid_argument(
"Open flag 'a' isn't supported");
65 default:
throw std::invalid_argument(
"Unknown file open flag");
67 file_flags |= O_CLOEXEC;
68 if (o_direct) { file_flags |= O_DIRECT; }
80 inline int open_fd(
const std::string& file_path,
81 const std::string& flags,
86 int fd = ::open(file_path.c_str(), open_fd_parse_flags(flags, o_direct), mode);
87 if (fd == -1) {
throw std::system_error(errno, std::generic_category(),
"Unable to open file"); }
96 [[nodiscard]]
inline int open_flags(
int fd)
98 int ret = fcntl(fd, F_GETFL);
100 throw std::system_error(errno, std::generic_category(),
"Unable to retrieve open flags");
111 [[nodiscard]]
inline std::size_t get_file_size(
int file_descriptor)
114 int ret = fstat(file_descriptor, &st);
116 throw std::system_error(errno, std::generic_category(),
"Unable to query file size");
118 return static_cast<std::size_t
>(st.st_size);
131 int _fd_direct_on{-1};
132 int _fd_direct_off{-1};
133 bool _initialized{
false};
134 bool _compat_mode{
false};
135 mutable std::size_t _nbytes{0};
136 CUfileHandle_t _handle{};
// Default permission bits used when a file is created (FileHandle's `mode` default is m644).
// NOTE(review): despite its name, this mask includes S_IWGRP (group write), so the
// permission bits are 0664 (rw-rw-r--), not 0644 (rw-r--r--). Either drop S_IWGRP
// or rename the constant -- confirm which was intended.
139 static constexpr mode_t m644 = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
159 const std::string& flags =
"r",
162 : _fd_direct_off{detail::open_fd(file_path, flags, false, mode)},
164 _compat_mode{compat_mode}
167 _fd_direct_on = detail::open_fd(file_path, flags,
true, mode);
168 }
catch (
const std::system_error&) {
172 if (_compat_mode) {
return; }
173 #ifdef KVIKIO_CUFILE_FOUND
174 CUfileDescr_t desc{};
175 desc.type = CU_FILE_HANDLE_TYPE_OPAQUE_FD;
177 desc.handle.fd = _fd_direct_on;
178 CUFILE_TRY(cuFileAPI::instance().HandleRegister(&_handle, &desc));
188 : _fd_direct_on{std::exchange(o._fd_direct_on, -1)},
189 _fd_direct_off{std::exchange(o._fd_direct_off, -1)},
190 _initialized{std::exchange(o._initialized,
false)},
191 _compat_mode{std::exchange(o._compat_mode,
false)},
192 _nbytes{std::exchange(o._nbytes, 0)},
193 _handle{std::exchange(o._handle, CUfileHandle_t{})}
196 FileHandle& operator=(FileHandle&& o) noexcept
198 _fd_direct_on = std::exchange(o._fd_direct_on, -1);
199 _fd_direct_off = std::exchange(o._fd_direct_off, -1);
200 _initialized = std::exchange(o._initialized,
false);
201 _compat_mode = std::exchange(o._compat_mode,
false);
202 _nbytes = std::exchange(o._nbytes, 0);
203 _handle = std::exchange(o._handle, CUfileHandle_t{});
// Destructor: releases everything owned by the handle by delegating to close().
// close() returns immediately on an already-closed (or moved-from) handle.
206 ~FileHandle() noexcept {
close(); }
// Returns true when the handle is not (or no longer) initialized, e.g. after close()
// has reset `_initialized` or after the handle has been moved from.
208 [[nodiscard]]
bool closed() const noexcept {
return !_initialized; }
215 if (closed()) {
return; }
218 #ifdef KVIKIO_CUFILE_FOUND
219 cuFileAPI::instance().HandleDeregister(_handle);
223 if (_fd_direct_on != -1) {
::close(_fd_direct_on); }
226 _initialized =
false;
241 throw CUfileException(
"The underlying cuFile handle isn't available in compatibility mode");
// Returns the descriptor that was opened without O_DIRECT (`_fd_direct_off`).
// This is the descriptor used by the POSIX host/device read and write paths.
255 [[nodiscard]]
int fd() const noexcept {
return _fd_direct_off; }
// Returns the file status flags of the non-O_DIRECT descriptor, as reported by
// fcntl(F_GETFL) through detail::open_flags. That helper throws std::system_error
// ("Unable to retrieve open flags") if the query fails.
266 [[nodiscard]]
int fd_open_flags()
const {
return detail::open_flags(_fd_direct_off); }
275 [[nodiscard]]
inline std::size_t
nbytes()
const
277 if (closed()) {
return 0; }
278 if (_nbytes == 0) { _nbytes = detail::get_file_size(_fd_direct_off); }
305 std::size_t
read(
void* devPtr_base,
307 std::size_t file_offset,
308 std::size_t devPtr_offset)
311 return posix_device_read(_fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
313 #ifdef KVIKIO_CUFILE_FOUND
314 ssize_t ret = cuFileAPI::instance().Read(
315 _handle, devPtr_base, size, convert_size2off(file_offset), convert_size2off(devPtr_offset));
317 throw std::system_error(errno, std::generic_category(),
"Unable to read file");
320 throw CUfileException(std::string{
"cuFile error at: "} + __FILE__ +
":" +
321 KVIKIO_STRINGIFY(__LINE__) +
": " + CUFILE_ERRSTR(ret));
353 std::size_t
write(
const void* devPtr_base,
355 std::size_t file_offset,
356 std::size_t devPtr_offset)
361 return posix_device_write(_fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
363 #ifdef KVIKIO_CUFILE_FOUND
364 ssize_t ret = cuFileAPI::instance().Write(
365 _handle, devPtr_base, size, convert_size2off(file_offset), convert_size2off(devPtr_offset));
367 throw std::system_error(errno, std::generic_category(),
"Unable to write file");
370 throw CUfileException(std::string{
"cuFile error at: "} + __FILE__ +
":" +
371 KVIKIO_STRINGIFY(__LINE__) +
": " + CUFILE_ERRSTR(ret));
399 std::future<std::size_t>
pread(
void* buf,
401 std::size_t file_offset = 0,
405 if (is_host_memory(buf)) {
406 auto op = [
this](
void* hostPtr_base,
408 std::size_t file_offset,
409 std::size_t hostPtr_offset) -> std::size_t {
410 char* buf =
static_cast<char*
>(hostPtr_base) + hostPtr_offset;
411 return posix_host_read(_fd_direct_off, buf, size, file_offset,
false);
414 return parallel_io(op, buf, size, file_offset, task_size, 0);
417 CUcontext ctx = get_context_from_pointer(buf);
420 if (size < gds_threshold) {
421 auto task = [
this, ctx, buf, size, file_offset]() -> std::size_t {
423 return posix_device_read(_fd_direct_off, buf, size, file_offset, 0);
425 return std::async(std::launch::deferred, task);
429 auto task = [
this, ctx](
void* devPtr_base,
431 std::size_t file_offset,
432 std::size_t devPtr_offset) -> std::size_t {
434 return read(devPtr_base, size, file_offset, devPtr_offset);
436 auto [devPtr_base, base_size, devPtr_offset] = get_alloc_info(buf, &ctx);
437 return parallel_io(task, devPtr_base, size, file_offset, task_size, devPtr_offset);
460 std::future<std::size_t>
pwrite(
const void* buf,
462 std::size_t file_offset = 0,
466 if (is_host_memory(buf)) {
467 auto op = [
this](
const void* hostPtr_base,
469 std::size_t file_offset,
470 std::size_t hostPtr_offset) -> std::size_t {
471 const char* buf =
static_cast<const char*
>(hostPtr_base) + hostPtr_offset;
472 return posix_host_write(_fd_direct_off, buf, size, file_offset,
false);
475 return parallel_io(op, buf, size, file_offset, task_size, 0);
478 CUcontext ctx = get_context_from_pointer(buf);
481 if (size < gds_threshold) {
482 auto task = [
this, ctx, buf, size, file_offset]() -> std::size_t {
484 return posix_device_write(_fd_direct_off, buf, size, file_offset, 0);
486 return std::async(std::launch::deferred, task);
490 auto op = [
this, ctx](
const void* devPtr_base,
492 std::size_t file_offset,
493 std::size_t devPtr_offset) -> std::size_t {
495 return write(devPtr_base, size, file_offset, devPtr_offset);
497 auto [devPtr_base, base_size, devPtr_offset] = get_alloc_info(buf, &ctx);
498 return parallel_io(op, devPtr_base, size, file_offset, task_size, devPtr_offset);
537 off_t* file_offset_p,
538 off_t* devPtr_offset_p,
539 ssize_t* bytes_read_p,
542 #ifdef KVIKIO_CUFILE_STREAM_API_FOUND
543 if (kvikio::is_batch_and_stream_available() && !_compat_mode) {
544 CUFILE_TRY(cuFileAPI::instance().ReadAsync(
545 _handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream));
550 CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
552 static_cast<ssize_t
>(
read(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
582 off_t file_offset = 0,
583 off_t devPtr_offset = 0,
584 CUstream stream =
nullptr)
586 StreamFuture ret(devPtr_base, size, file_offset, devPtr_offset, stream);
587 auto [devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream_] =
589 read_async(devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream_);
630 off_t* file_offset_p,
631 off_t* devPtr_offset_p,
632 ssize_t* bytes_written_p,
635 #ifdef KVIKIO_CUFILE_STREAM_API_FOUND
636 if (kvikio::is_batch_and_stream_available() && !_compat_mode) {
637 CUFILE_TRY(cuFileAPI::instance().WriteAsync(
638 _handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream));
643 CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
645 static_cast<ssize_t
>(
write(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
675 off_t file_offset = 0,
676 off_t devPtr_offset = 0,
677 CUstream stream =
nullptr)
679 StreamFuture ret(devPtr_base, size, file_offset, devPtr_offset, stream);
680 auto [devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream_] =
682 write_async(devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream_);
Handle of an open file registered with cuFile.
void close() noexcept
Deregister the file from cuFile and close both underlying file descriptors.
FileHandle(const FileHandle &)=delete
FileHandle supports move semantics but isn't copyable.
void read_async(void *devPtr_base, std::size_t *size_p, off_t *file_offset_p, off_t *devPtr_offset_p, ssize_t *bytes_read_p, CUstream stream)
Reads specified bytes from the file into the device memory asynchronously.
std::future< std::size_t > pread(void *buf, std::size_t size, std::size_t file_offset=0, std::size_t task_size=defaults::task_size(), std::size_t gds_threshold=defaults::gds_threshold())
Reads specified bytes from the file into the device or host memory in parallel.
CUfileHandle_t handle()
Get the underlying cuFile file handle.
int fd_open_flags() const
Get the flags of one of the file descriptors (see open(2))
FileHandle(const std::string &file_path, const std::string &flags="r", mode_t mode=m644, bool compat_mode=defaults::compat_mode())
Construct a file handle from a file path.
bool is_compat_mode_on() const noexcept
Returns true if the compatibility mode has been enabled for this file.
StreamFuture write_async(void *devPtr_base, std::size_t size, off_t file_offset=0, off_t devPtr_offset=0, CUstream stream=nullptr)
Writes specified bytes from the device memory into the file asynchronously.
std::size_t read(void *devPtr_base, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset)
Reads specified bytes from the file into the device memory.
StreamFuture read_async(void *devPtr_base, std::size_t size, off_t file_offset=0, off_t devPtr_offset=0, CUstream stream=nullptr)
Reads specified bytes from the file into the device memory asynchronously.
int fd() const noexcept
Get one of the file descriptors.
void write_async(void *devPtr_base, std::size_t *size_p, off_t *file_offset_p, off_t *devPtr_offset_p, ssize_t *bytes_written_p, CUstream stream)
Writes specified bytes from the device memory into the file asynchronously.
std::size_t write(const void *devPtr_base, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset)
Writes specified bytes from the device memory into the file.
std::size_t nbytes() const
Get the file size.
std::future< std::size_t > pwrite(const void *buf, std::size_t size, std::size_t file_offset=0, std::size_t task_size=defaults::task_size(), std::size_t gds_threshold=defaults::gds_threshold())
Writes specified bytes from device or host memory into the file in parallel.
Push CUDA context on creation and pop it on destruction.
Future of an asynchronous IO operation.
std::tuple< void *, std::size_t *, off_t *, off_t *, ssize_t *, CUstream > get_args() const
Return the arguments of the future call.
static std::size_t task_size()
Get the default task size used for parallel IO operations.
static bool compat_mode()
Return whether the KvikIO library is running in compatibility mode or not.
static std::size_t gds_threshold()
Get the default GDS threshold, which is the minimum size to use GDS (in bytes).