Windows 异步 IO
发布于 2022-02-28
Overlapped I/O 是 WIN32 的一项技术,可以异步的读写数据。
打开或者创建文件调用 CreateFile
的时候,
HANDLE CreateFileA(
[in] LPCSTR lpFileName,
[in] DWORD dwDesiredAccess,
[in] DWORD dwShareMode,
[in, optional] LPSECURITY_ATTRIBUTES lpSecurityAttributes,
[in] DWORD dwCreationDisposition,
[in] DWORD dwFlagsAndAttributes,
[in, optional] HANDLE hTemplateFile
);
dwFlagsAndAttributes
参数传递 FILE_FLAG_OVERLAPPED
标志,表示以异步 I/O 的方式操作文件,如下所示:
HANDLE file = CreateFile(L"CMakeCache.txt", GENERIC_READ | GENERIC_WRITE, 0, NULL,
OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);
if (file == NULL || file == INVALID_HANDLE_VALUE) {
printf("CreateFile handle failed\n");
return 0;
}
调用 ReadFile
可以异步的读取数据。
BOOL ReadFile(
[in] HANDLE hFile,
[out] LPVOID lpBuffer,
[in] DWORD nNumberOfBytesToRead,
[out, optional] LPDWORD lpNumberOfBytesRead,
[in, out, optional] LPOVERLAPPED lpOverlapped
);
注意:如果 CreateFile
指定了 FILE_FLAG_OVERLAPPED
标志,后续读写文件调用也必须有 OVERLAPPED
参数。同步方式 CreateFile
,则传递NULL即可。
OVERLAPPED
数据结构如下:
typedef struct _OVERLAPPED {
ULONG_PTR Internal;
ULONG_PTR InternalHigh;
union {
struct {
DWORD Offset;
DWORD OffsetHigh;
} DUMMYSTRUCTNAME;
PVOID Pointer;
} DUMMYUNIONNAME;
HANDLE hEvent;
} OVERLAPPED, *LPOVERLAPPED;
具体字段:
Internal
表示执行异步 I/O 后的状态码。这个值为STATUS_PENDING
时,表示异步操作还未完成。InternalHigh
表示异步后传送的字节数。DUMMYUNIONNAME.DUMMYSTRUCTNAME.Offset
DUMMYUNIONNAME.DUMMYSTRUCTNAME.OffsetHigh
调用者设置的值,用于指定 I/O 的文件偏移位置hEvent
事件句柄。一般使用GetOverlappedResult
或者其他等待函数等待事件,它们会自动重置事件到无信号状态。需要使用手动重置类型的事件。
const DWORD buffer_size = 30;
char buffer[buffer_size] = {0};
DWORD number_of_bytes_read = 0;
OVERLAPPED over_lapped = {0};
BOOL result =
ReadFile(file, buffer, buffer_size, &number_of_bytes_read, &over_lapped);
printf("number_of_bytes_read %d\n", number_of_bytes_read);
auto read_function = [&]() {
WaitForSingleObject(file, INFINITE);
printf("over_lapped.InternalHigh %d\n", over_lapped.InternalHigh);
PrintBuffer("", std::string(buffer, buffer + over_lapped.InternalHigh));
};
std::thread read_thread(read_function);
read_thread.join();
输出结果如下:
number_of_bytes_read 0
over_lapped.InternalHigh 30
read:# This is the CMakeCache file.
我们通过设置 OVERLAPPED
的 Offset
来控制读取文件的起始偏移地址:
over_lapped.Offset = 10;
输出结果:
number_of_bytes_read 0
over_lapped.InternalHigh 30
read:the CMakeCache file.\x0d\x0a# For bu
我们是通过 WaitForSingleObject
文件句柄来等待异步操作的完成。如果我们同时发起了多个异步操作,则不能 wait 一个文件句柄了。可以通过 OVERLAPPED
的 hEvent
区分开来。
const int max_op = 10;
OVERLAPPED over_lapped[max_op];
for (int index = 0; index < max_op; index++) {
const DWORD buffer_size = 30;
char *buffer = new char[buffer_size];
memset(buffer, 0, buffer_size);
DWORD number_of_bytes_read = 0;
memset((void *)&over_lapped[index], 0, sizeof(OVERLAPPED));
// 创建手动重置的事件
over_lapped[index].hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
BOOL result = ReadFile(file, buffer, buffer_size, &number_of_bytes_read,
&over_lapped[index]);
auto read_function = [](char *buffer, OVERLAPPED *over_lapped) {
// 等待 OVERLAPPED 中的事件
WaitForSingleObject(over_lapped->hEvent, INFINITE);
PrintBuffer("read:",
std::string(buffer, buffer + over_lapped->InternalHigh));
delete[] buffer;
};
std::thread read_thread(read_function, buffer, &over_lapped[index]);
read_thread.detach();
}
我们也可以通过 WaitForMultipleObjects
一次性等待多次异步操作的完成。
const int max_op = 10;
OVERLAPPED over_lapped[max_op];
HANDLE events[max_op];
std::vector<char *> buffers;
const DWORD buffer_size = 30;
for (int index = 0; index < max_op; index++) {
char *buffer = new char[buffer_size];
buffers.push_back(buffer);
memset(buffer, 0, buffer_size);
DWORD number_of_bytes_read = 0;
memset((void *)&over_lapped[index], 0, sizeof(OVERLAPPED));
// 创建手动重置的事件
events[index] = CreateEvent(NULL, TRUE, FALSE, NULL);
over_lapped[index].hEvent = events[index];
over_lapped[index].Offset = index * buffer_size;
BOOL result = ReadFile(file, buffer, buffer_size, &number_of_bytes_read,
&over_lapped[index]);
}
// 等待所有异步事件完成
WaitForMultipleObjects(max_op, events, TRUE, INFINITE);
for (auto b : buffers) {
PrintBuffer("read:", std::string(b, b + buffer_size));
}
我们还可以通过 GetOverlappedResult
来获取异步的状态:
BOOL GetOverlappedResult(
[in] HANDLE hFile,
[in] LPOVERLAPPED lpOverlapped,
[out] LPDWORD lpNumberOfBytesTransferred,
[in] BOOL bWait
);
代码如下:
const DWORD buffer_size = 30;
char buffer[buffer_size] = {0};
DWORD number_of_bytes_read = 0;
OVERLAPPED over_lapped = {0};
BOOL result =
ReadFile(file, buffer, buffer_size, &number_of_bytes_read, &over_lapped);
if (result) {
//表示已经读取到数据了
PrintBuffer("read 1:", std::string(buffer, buffer + number_of_bytes_read));
} else {
if (GetLastError() == ERROR_IO_PENDING) {
printf("ERROR_IO_PENDING\n");
DWORD number_of_bytes_transferred = 0;
// 最后参数 TRUE 表示同步等待结果
result = GetOverlappedResult(file, &over_lapped,
&number_of_bytes_transferred, TRUE);
PrintBuffer("read 2:",
std::string(buffer, buffer + number_of_bytes_transferred));
}
}
也可以通过 GetOverlappedResult
不阻塞的轮询异步状态:
const DWORD buffer_size = 1024 * 1024*20;
char *buffer = new char[buffer_size];
DWORD number_of_bytes_read = 0;
OVERLAPPED over_lapped = {0};
BOOL result =
ReadFile(file, buffer, buffer_size, &number_of_bytes_read, &over_lapped);
if (result) {
//表示已经读取到数据了
PrintBuffer("read 1:", std::string(buffer, buffer + number_of_bytes_read));
} else {
if (GetLastError() == ERROR_IO_PENDING) {
printf("ERROR_IO_PENDING\n");
auto read_function = [](HANDLE file, OVERLAPPED *over_lapped,
char *buffer) {
DWORD number_of_bytes_transferred = 0;
// 轮询异步的状态
while (!GetOverlappedResult(file, over_lapped,
&number_of_bytes_transferred, FALSE)) {
printf("loop query\n");
}
printf("read ok\n");
};
std::thread read_thread(read_function, file, &over_lapped, buffer);
read_thread.detach();
}
}
GetOverlappedResultEx
则是可以指定等待的超时值:
BOOL GetOverlappedResultEx(
[in] HANDLE hFile,
[in] LPOVERLAPPED lpOverlapped,
[out] LPDWORD lpNumberOfBytesTransferred,
[in] DWORD dwMilliseconds,
[in] BOOL bAlertable
);
此外还可以使用 ReadFileEx
版本,通过回调函数来获取异步读取的结果:
BOOL ReadFileEx(
[in] HANDLE hFile,
[out, optional] LPVOID lpBuffer,
[in] DWORD nNumberOfBytesToRead,
[in, out] LPOVERLAPPED lpOverlapped,
[in] LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);
void LpoverlappedCompletionRoutine(
[in] DWORD dwErrorCode,
[in] DWORD dwNumberOfBytesTransfered,
[in, out] LPOVERLAPPED lpOverlapped
)
调用 CancelIo
、CancelIoEx
可以取消异步操作。