Windows 异步 IO

发布于 2022-02-28


Overlapped I/O 是 WIN32 的一项技术,可以异步的读写数据。

打开或者创建文件调用 CreateFile 的时候,

HANDLE CreateFileA(
  [in]           LPCSTR                lpFileName,
  [in]           DWORD                 dwDesiredAccess,
  [in]           DWORD                 dwShareMode,
  [in, optional] LPSECURITY_ATTRIBUTES lpSecurityAttributes,
  [in]           DWORD                 dwCreationDisposition,
  [in]           DWORD                 dwFlagsAndAttributes,
  [in, optional] HANDLE                hTemplateFile
);

dwFlagsAndAttributes 参数传递 FILE_FLAG_OVERLAPPED 标志,表示以异步 I/O 的方式操作文件,如下所示:

  HANDLE file = CreateFile(L"CMakeCache.txt", GENERIC_READ | GENERIC_WRITE, 0, NULL,
                           OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);
  if (file == NULL || file == INVALID_HANDLE_VALUE) {
    printf("CreateFile handle failed\n");
    return 0;
  }

调用 ReadFile 可以异步的读取数据。

BOOL ReadFile(
  [in]                HANDLE       hFile,
  [out]               LPVOID       lpBuffer,
  [in]                DWORD        nNumberOfBytesToRead,
  [out, optional]     LPDWORD      lpNumberOfBytesRead,
  [in, out, optional] LPOVERLAPPED lpOverlapped
);

注意:如果 CreateFile 指定了 FILE_FLAG_OVERLAPPED 标志,后续读写文件调用也必须有 OVERLAPPED 参数。同步方式 CreateFile ,则传递NULL即可。

OVERLAPPED 数据结构如下:

typedef struct _OVERLAPPED {
  ULONG_PTR Internal;
  ULONG_PTR InternalHigh;
  union {
    struct {
      DWORD Offset;
      DWORD OffsetHigh;
    } DUMMYSTRUCTNAME;
    PVOID Pointer;
  } DUMMYUNIONNAME;
  HANDLE    hEvent;
} OVERLAPPED, *LPOVERLAPPED;

具体字段:

  • Internal 表示执行异步 I/O 后的状态码。这个值为 STATUS_PENDING 时,表示异步操作还未完成。
  • InternalHigh 表示异步后传送的字节数。
  • DUMMYUNIONNAME.DUMMYSTRUCTNAME.Offset DUMMYUNIONNAME.DUMMYSTRUCTNAME.OffsetHigh 调用者设置的值,用于指定 I/O 的文件偏移位置
  • hEvent 事件句柄。一般使用 GetOverlappedResult 或者其他等待函数等待事件,它们会自动重置事件到无信号状态。需要使用手动重置类型的事件。
  const DWORD buffer_size = 30;
  char buffer[buffer_size] = {0};
  DWORD number_of_bytes_read = 0;
  OVERLAPPED over_lapped = {0};
  BOOL result =
      ReadFile(file, buffer, buffer_size, &number_of_bytes_read, &over_lapped);
  printf("number_of_bytes_read %d\n", number_of_bytes_read);

  auto read_function = [&]() {
    WaitForSingleObject(file, INFINITE);
    printf("over_lapped.InternalHigh %d\n", over_lapped.InternalHigh);
    PrintBuffer("", std::string(buffer, buffer + over_lapped.InternalHigh));
  };
  std::thread read_thread(read_function);
  read_thread.join();

输出结果如下:
number_of_bytes_read 0
over_lapped.InternalHigh 30
read:# This is the CMakeCache file.

我们通过设置 OVERLAPPEDOffset 来控制读取文件的起始偏移地址:

over_lapped.Offset = 10;

输出结果:
number_of_bytes_read 0
over_lapped.InternalHigh 30
read:the CMakeCache file.\x0d\x0a# For bu

我们是通过 WaitForSingleObject 文件句柄来等待异步操作的完成。如果我们同时发起了多个异步操作,则不能 wait 一个文件句柄了。可以通过 OVERLAPPEDhEvent 区分开来。

  const int max_op = 10;
  OVERLAPPED over_lapped[max_op];
  for (int index = 0; index < max_op; index++) {
    const DWORD buffer_size = 30;
    char *buffer = new char[buffer_size];
    memset(buffer, 0, buffer_size);
    DWORD number_of_bytes_read = 0;
    memset((void *)&over_lapped[index], 0, sizeof(OVERLAPPED));
    // 创建手动重置的事件
    over_lapped[index].hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);

    BOOL result = ReadFile(file, buffer, buffer_size, &number_of_bytes_read,
                           &over_lapped[index]);

    auto read_function = [](char *buffer, OVERLAPPED *over_lapped) {
      // 等待 OVERLAPPED 中的事件
      WaitForSingleObject(over_lapped->hEvent, INFINITE);
      PrintBuffer("read:",
                  std::string(buffer, buffer + over_lapped->InternalHigh));
      delete[] buffer;
    };
    std::thread read_thread(read_function, buffer, &over_lapped[index]);
    read_thread.detach();
  }

我们也可以通过 WaitForMultipleObjects 一次性等待多次异步操作的完成。

  const int max_op = 10;
  OVERLAPPED over_lapped[max_op];
  HANDLE events[max_op];
  std::vector<char *> buffers;
  const DWORD buffer_size = 30;
  for (int index = 0; index < max_op; index++) {
    char *buffer = new char[buffer_size];
    buffers.push_back(buffer);
    memset(buffer, 0, buffer_size);
    DWORD number_of_bytes_read = 0;
    memset((void *)&over_lapped[index], 0, sizeof(OVERLAPPED));
    // 创建手动重置的事件
    events[index] = CreateEvent(NULL, TRUE, FALSE, NULL);
    over_lapped[index].hEvent = events[index];
    over_lapped[index].Offset = index * buffer_size;

    BOOL result = ReadFile(file, buffer, buffer_size, &number_of_bytes_read,
                           &over_lapped[index]);
  }

  // 等待所有异步事件完成
  WaitForMultipleObjects(max_op, events, TRUE, INFINITE);
  for (auto b : buffers) {
    PrintBuffer("read:", std::string(b, b + buffer_size));
  }

我们还可以通过 GetOverlappedResult 来获取异步的状态:

BOOL GetOverlappedResult(
  [in]  HANDLE       hFile,
  [in]  LPOVERLAPPED lpOverlapped,
  [out] LPDWORD      lpNumberOfBytesTransferred,
  [in]  BOOL         bWait
);

代码如下:

  const DWORD buffer_size = 30;
  char buffer[buffer_size] = {0};
  DWORD number_of_bytes_read = 0;
  OVERLAPPED over_lapped = {0};
  BOOL result =
      ReadFile(file, buffer, buffer_size, &number_of_bytes_read, &over_lapped);
  if (result) {
    //表示已经读取到数据了
    PrintBuffer("read 1:", std::string(buffer, buffer + number_of_bytes_read));
  } else {
    if (GetLastError() == ERROR_IO_PENDING) {
      printf("ERROR_IO_PENDING\n");

      DWORD number_of_bytes_transferred = 0;
      // 最后参数 TRUE 表示同步等待结果
      result = GetOverlappedResult(file, &over_lapped,
                                   &number_of_bytes_transferred, TRUE);
      PrintBuffer("read 2:",
                  std::string(buffer, buffer + number_of_bytes_transferred));
    }
  }

也可以通过 GetOverlappedResult 不阻塞的轮询异步状态:

  const DWORD buffer_size = 1024 * 1024*20;
  char *buffer = new char[buffer_size];
  DWORD number_of_bytes_read = 0;
  OVERLAPPED over_lapped = {0};
  BOOL result =
      ReadFile(file, buffer, buffer_size, &number_of_bytes_read, &over_lapped);
  if (result) {
    //表示已经读取到数据了
    PrintBuffer("read 1:", std::string(buffer, buffer + number_of_bytes_read));
  } else {
    if (GetLastError() == ERROR_IO_PENDING) {
      printf("ERROR_IO_PENDING\n");

      auto read_function = [](HANDLE file, OVERLAPPED *over_lapped,
                              char *buffer) {
        DWORD number_of_bytes_transferred = 0;
        // 轮询异步的状态
        while (!GetOverlappedResult(file, over_lapped,
                                    &number_of_bytes_transferred, FALSE)) {
          printf("loop query\n");
        }
        printf("read ok\n");
      };
      std::thread read_thread(read_function, file, &over_lapped, buffer);
      read_thread.detach();
    }
  }

GetOverlappedResultEx 则是可以指定等待的超时值:

BOOL GetOverlappedResultEx(
  [in]  HANDLE       hFile,
  [in]  LPOVERLAPPED lpOverlapped,
  [out] LPDWORD      lpNumberOfBytesTransferred,
  [in]  DWORD        dwMilliseconds,
  [in]  BOOL         bAlertable
);

此外还可以使用 ReadFileEx 版本,通过回调函数来获取异步读取的结果:

BOOL ReadFileEx(
  [in]            HANDLE                          hFile,
  [out, optional] LPVOID                          lpBuffer,
  [in]            DWORD                           nNumberOfBytesToRead,
  [in, out]       LPOVERLAPPED                    lpOverlapped,
  [in]            LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);

void LpoverlappedCompletionRoutine(
  [in]      DWORD dwErrorCode,
  [in]      DWORD dwNumberOfBytesTransfered,
  [in, out] LPOVERLAPPED lpOverlapped
)

调用 CancelIoCancelIoEx 可以取消异步操作。

参考