VirtualBox

source: vbox/trunk/src/VBox/Runtime/common/ioqueue/ioqueue-stdfile-provider.cpp@ 79952

Last change on this file since 79952 was 79949, checked in by vboxsync, 6 years ago

Runtime: bugref:8231 Starting on defining and implementing a new RTIoQueue API to replace RTFileAio mid term.

The RTIoQueue API is meant to be more efficient as it doesn't require allocation
of request structures and is only meant as a thin layer around host dependent APIs.
It will support multiple providers for different handle types and the best suited provider
supported on a particular host can be selected. This allows multiple implementations
to coexist for the same host in an easy manner.
The providers currently being implemented are (in various stages of the implementation):

  • ioqueue-stdfile-provider.cpp:

A fallback provider if nothing else is available using the synchronous RTFile* APIs
emulating asynchronous behavior by using a dedicated worker thread.

  • ioqueue-aiofile-provider.cpp:

Uses the current RTFileAio* API, will get replaced by dedicated provider implementations for
each host later on.

  • ioqueue-iouringfile-provider.cpp:

Uses the recently added io_uring interface for Linux kernels 5.1 and newer. The new interface
is a great improvement over the old aio interface which is cumbersome to use and has various
restrictions. If not available on a host the code can fall back to one of the other providers.

The I/O queue interface is also meant to be suitable for sockets which can be implemented later.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 17.1 KB
Line 
1/* $Id: ioqueue-stdfile-provider.cpp 79949 2019-07-24 11:05:45Z vboxsync $ */
2/** @file
3 * IPRT - I/O queue, Standard file provider.
4 */
5
6/*
7 * Copyright (C) 2019 Oracle Corporation
8 *
9 * This file is part of VirtualBox Open Source Edition (OSE), as
10 * available from http://www.215389.xyz. This file is free software;
11 * you can redistribute it and/or modify it under the terms of the GNU
12 * General Public License (GPL) as published by the Free Software
13 * Foundation, in version 2 as it comes in the "COPYING" file of the
14 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
15 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
16 *
17 * The contents of this file may alternatively be used under the terms
18 * of the Common Development and Distribution License Version 1.0
19 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
20 * VirtualBox OSE distribution, in which case the provisions of the
21 * CDDL are applicable instead of those of the GPL.
22 *
23 * You may elect to license modified versions of this file under the
24 * terms and conditions of either the GPL or the CDDL or both.
25 */
26
27
28/*********************************************************************************************************************************
29* Header Files *
30*********************************************************************************************************************************/
31#define LOG_GROUP RTLOGGROUP_IOQUEUE
32#include <iprt/ioqueue.h>
33
34#include <iprt/asm.h>
35#include <iprt/errcore.h>
36#include <iprt/file.h>
37#include <iprt/log.h>
38#include <iprt/mem.h>
39#include <iprt/semaphore.h>
40#include <iprt/string.h>
41#include <iprt/thread.h>
42
43#include "internal/ioqueue.h"
44
45
46/*********************************************************************************************************************************
47* Defined Constants And Macros *
48*********************************************************************************************************************************/
49
/** Bit index of RTIOQUEUE_STDFILE_PROV_STATE_F_WORKER_NEED_WAKEUP inside RTIOQUEUEPROVINT::fState,
 * for the bit oriented atomic helpers which take an index rather than a mask. */
#define RTIOQUEUE_STDFILE_PROV_STATE_F_WORKER_NEED_WAKEUP_BIT   2

/** The I/O queue worker thread needs to wake up the waiting thread when requests completed
 * (the worker signals the completion event semaphore whenever it sees this flag). */
#define RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_NEED_WAKEUP      RT_BIT(0)
/** The waiting thread was interrupted by the external wakeup call. */
#define RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_INTR             RT_BIT(1)
/** The I/O queue worker thread needs to be woken up to process new requests
 * (set by the worker before it checks for work, cleared once it starts processing). */
#define RTIOQUEUE_STDFILE_PROV_STATE_F_WORKER_NEED_WAKEUP       RT_BIT(RTIOQUEUE_STDFILE_PROV_STATE_F_WORKER_NEED_WAKEUP_BIT)
57
58
59/*********************************************************************************************************************************
60* Structures and Typedefs *
61*********************************************************************************************************************************/
62
63
64/**
65 * Submission queue entry.
66 */
67typedef struct RTIOQUEUESSQENTRY
68{
69 /** The file to work on. */
70 RTFILE hFile;
71 /** I/O operation. */
72 RTIOQUEUEOP enmOp;
73 /** Start offset. */
74 uint64_t off;
75 /** Additional request flags. */
76 uint32_t fReqFlags;
77 /** Size of the request. */
78 size_t cbReq;
79 /** Opaque user data passed on completion. */
80 void *pvUser;
81 /** Flag whether this is a S/G or standard request. */
82 bool fSg;
83 /** Type dependent data. */
84 union
85 {
86 /** Pointer to buffer for non S/G requests. */
87 void *pvBuf;
88 /** Pointer to S/G buffer. */
89 PCRTSGBUF pSgBuf;
90 } u;
91} RTIOQUEUESSQENTRY;
92/** Pointer to a submission queue entry. */
93typedef RTIOQUEUESSQENTRY *PRTIOQUEUESSQENTRY;
94/** Pointer to a constant submission queue entry. */
95typedef const RTIOQUEUESSQENTRY *PCRTIOQUEUESSQENTRY;
96
97
98/**
99 * Internal I/O queue provider instance data.
100 */
101typedef struct RTIOQUEUEPROVINT
102{
103 /** Size of the submission queue in entries. */
104 size_t cSqEntries;
105 /** Size of the completion queue in entries. */
106 size_t cCqEntries;
107 /** Pointer to the submission queue base. */
108 PRTIOQUEUESSQENTRY paSqEntryBase;
109 /** Submission queue producer index. */
110 volatile uint32_t idxSqProd;
111 /** Submission queue consumer index. */
112 volatile uint32_t idxSqCons;
113 /** Pointer to the completion queue base. */
114 PRTIOQUEUECEVT paCqEntryBase;
115 /** Completion queue producer index. */
116 volatile uint32_t idxCqProd;
117 /** Completion queue consumer index. */
118 volatile uint32_t idxCqCons;
119 /** Various state flags for synchronizing the worker thread with other participants. */
120 volatile uint32_t fState;
121 /** The worker thread handle. */
122 RTTHREAD hThrdWork;
123 /** Event semaphore the worker thread waits on for work. */
124 RTSEMEVENT hSemEvtWorker;
125 /** Event semaphore the caller waits for completion events. */
126 RTSEMEVENT hSemEvtWaitEvts;
127 /** Flag whether to shutdown the worker thread. */
128 volatile bool fShutdown;
129} RTIOQUEUEPROVINT;
130/** Pointer to the internal I/O queue provider instance data. */
131typedef RTIOQUEUEPROVINT *PRTIOQUEUEPROVINT;
132
133
134/*********************************************************************************************************************************
135* Internal Functions *
136*********************************************************************************************************************************/
137
138
139/**
140 * Processes the given submission queue entry and reports back the result in the completion queue.
141 *
142 * @returns nothing.
143 * @param pSqEntry The submission queue entry to process.
144 * @param pCqEntry The comppletion queue entry to store the result in.
145 */
146static void rtIoQueueStdFileProv_SqEntryProcess(PCRTIOQUEUESSQENTRY pSqEntry, PRTIOQUEUECEVT pCqEntry)
147{
148 int rcReq = VINF_SUCCESS;
149
150 switch (pSqEntry->enmOp)
151 {
152 case RTIOQUEUEOP_READ:
153 if (!pSqEntry->fSg)
154 rcReq = RTFileReadAt(pSqEntry->hFile, pSqEntry->off, pSqEntry->u.pvBuf, pSqEntry->cbReq, NULL);
155 else
156 {
157 RTSGBUF SgBuf;
158 RTSgBufClone(&SgBuf, pSqEntry->u.pSgBuf);
159 rcReq = RTFileSgReadAt(pSqEntry->hFile, pSqEntry->off, &SgBuf, pSqEntry->cbReq, NULL);
160 }
161 break;
162 case RTIOQUEUEOP_WRITE:
163 if (!pSqEntry->fSg)
164 rcReq = RTFileWriteAt(pSqEntry->hFile, pSqEntry->off, pSqEntry->u.pvBuf, pSqEntry->cbReq, NULL);
165 else
166 {
167 RTSGBUF SgBuf;
168 RTSgBufClone(&SgBuf, pSqEntry->u.pSgBuf);
169 rcReq = RTFileSgWriteAt(pSqEntry->hFile, pSqEntry->off, &SgBuf, pSqEntry->cbReq, NULL);
170 }
171 break;
172 case RTIOQUEUEOP_SYNC:
173 rcReq = RTFileFlush(pSqEntry->hFile);
174 break;
175 default:
176 AssertMsgFailedReturnVoid(("Invalid I/O queue operation: %d\n", pSqEntry->enmOp));
177 }
178
179 /* Write the result back into the completion queue. */
180 pCqEntry->rcReq = rcReq;
181 pCqEntry->pvUser = pSqEntry->pvUser;
182}
183
184
185/**
186 * The main I/O queue worker loop which processes the incoming I/O requests.
187 */
188static DECLCALLBACK(int) rtIoQueueStdFileProv_WorkerLoop(RTTHREAD hThrdSelf, void *pvUser)
189{
190 PRTIOQUEUEPROVINT pThis = (PRTIOQUEUEPROVINT)pvUser;
191
192 /* Signal that we started up. */
193 int rc = RTThreadUserSignal(hThrdSelf);
194 AssertRC(rc);
195
196 while (!ASMAtomicReadBool(&pThis->fShutdown))
197 {
198 /* Wait for some work. */
199 ASMAtomicOrU32(&pThis->fState, RTIOQUEUE_STDFILE_PROV_STATE_F_WORKER_NEED_WAKEUP);
200 uint32_t idxSqProd = ASMAtomicReadU32(&pThis->idxSqProd);
201 uint32_t idxSqCons = ASMAtomicReadU32(&pThis->idxSqCons);
202 uint32_t idxCqCons = ASMAtomicReadU32(&pThis->idxCqCons);
203
204 if (idxSqCons == idxSqProd)
205 {
206 rc = RTSemEventWait(pThis->hSemEvtWorker, RT_INDEFINITE_WAIT);
207 AssertRC(rc);
208
209 idxSqProd = ASMAtomicReadU32(&pThis->idxSqProd);
210 idxSqCons = ASMAtomicReadU32(&pThis->idxSqCons);
211 idxCqCons = ASMAtomicReadU32(&pThis->idxCqCons);
212 }
213
214 ASMAtomicBitTestAndClear(&pThis->fState, RTIOQUEUE_STDFILE_PROV_STATE_F_WORKER_NEED_WAKEUP_BIT);
215
216 /* Process all requests. */
217 do
218 {
219 while ( idxSqCons != idxSqProd
220 && idxCqCons != pThis->idxCqProd)
221 {
222 PCRTIOQUEUESSQENTRY pSqEntry = &pThis->paSqEntryBase[idxSqCons];
223 PRTIOQUEUECEVT pCqEntry = &pThis->paCqEntryBase[pThis->idxCqProd];
224
225 rtIoQueueStdFileProv_SqEntryProcess(pSqEntry, pCqEntry);
226 ASMWriteFence();
227
228 idxSqCons = (idxSqCons + 1) % pThis->cSqEntries;
229 pThis->idxCqProd = (pThis->idxCqProd + 1) % pThis->cCqEntries;
230 ASMWriteFence();
231 if (ASMAtomicReadU32(&pThis->fState) & RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_NEED_WAKEUP)
232 {
233 rc = RTSemEventSignal(pThis->hSemEvtWaitEvts);
234 AssertRC(rc);
235 }
236 }
237
238 ASMWriteFence();
239 ASMAtomicWriteU32(&pThis->idxSqCons, idxSqCons);
240 idxSqProd = ASMAtomicReadU32(&pThis->idxSqProd);
241 idxSqCons = ASMAtomicReadU32(&pThis->idxSqCons);
242 } while (idxSqCons != idxSqProd);
243 }
244
245 return VINF_SUCCESS;
246}
247
248
249/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnIsSupported} */
250static DECLCALLBACK(bool) rtIoQueueStdFileProv_IsSupported(void)
251{
252 /* The common code/public API already checked for the proper handle type. */
253 return true;
254}
255
256
257/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnQueueInit} */
258static DECLCALLBACK(int) rtIoQueueStdFileProv_QueueInit(RTIOQUEUEPROV hIoQueueProv, uint32_t fFlags,
259 size_t cSqEntries, size_t cCqEntries)
260{
261 RT_NOREF(fFlags);
262
263 PRTIOQUEUEPROVINT pThis = hIoQueueProv;
264 int rc = VINF_SUCCESS;
265
266 pThis->cSqEntries = cSqEntries;
267 pThis->cCqEntries = cCqEntries;
268 pThis->idxSqProd = 0;
269 pThis->idxSqCons = 0;
270 pThis->idxCqProd = 0;
271 pThis->idxCqCons = 0;
272 pThis->fShutdown = false;
273 pThis->fState = 0;
274
275 pThis->paSqEntryBase = (PRTIOQUEUESSQENTRY)RTMemAllocZ(cSqEntries * sizeof(RTIOQUEUESSQENTRY));
276 if (RT_LIKELY(pThis->paSqEntryBase))
277 {
278 pThis->paCqEntryBase = (PRTIOQUEUECEVT)RTMemAllocZ(cCqEntries * sizeof(RTIOQUEUECEVT));
279 if (RT_LIKELY(pThis->paSqEntryBase))
280 {
281 rc = RTSemEventCreate(&pThis->hSemEvtWorker);
282 if (RT_SUCCESS(rc))
283 {
284 rc = RTSemEventCreate(&pThis->hSemEvtWaitEvts);
285 if (RT_SUCCESS(rc))
286 {
287 /* Spin up the worker thread. */
288 rc = RTThreadCreate(&pThis->hThrdWork, rtIoQueueStdFileProv_WorkerLoop, pThis, 0, RTTHREADTYPE_IO, RTTHREADFLAGS_WAITABLE,
289 "IoQ-StdFile");
290 if (RT_SUCCESS(rc))
291 {
292 rc = RTThreadUserWait(pThis->hThrdWork, 10 * RT_MS_1SEC);
293 AssertRC(rc);
294
295 return VINF_SUCCESS;
296 }
297
298 RTSemEventDestroy(pThis->hSemEvtWaitEvts);
299 }
300
301 RTSemEventDestroy(pThis->hSemEvtWorker);
302 }
303
304 RTMemFree(pThis->paCqEntryBase);
305 }
306 else
307 rc = VERR_NO_MEMORY;
308
309 RTMemFree(pThis->paSqEntryBase);
310 }
311 else
312 rc = VERR_NO_MEMORY;
313
314 return rc;
315}
316
317
318/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnQueueDestroy} */
319static DECLCALLBACK(void) rtIoQueueStdFileProv_QueueDestroy(RTIOQUEUEPROV hIoQueueProv)
320{
321 PRTIOQUEUEPROVINT pThis = hIoQueueProv;
322
323 ASMAtomicXchgBool(&pThis->fShutdown, true);
324 RTSemEventSignal(pThis->hSemEvtWorker);
325
326 int rc = RTThreadWait(pThis->hThrdWork, 60 * RT_MS_1SEC, NULL);
327 AssertRC(rc);
328
329 RTSemEventDestroy(pThis->hSemEvtWaitEvts);
330 RTSemEventDestroy(pThis->hSemEvtWorker);
331 RTMemFree(pThis->paCqEntryBase);
332 RTMemFree(pThis->paSqEntryBase);
333 RT_BZERO(pThis, sizeof(*pThis));
334}
335
336
337/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnHandleRegister} */
338static DECLCALLBACK(int) rtIoQueueStdFileProv_HandleRegister(RTIOQUEUEPROV hIoQueueProv, PCRTHANDLE pHandle)
339{
340 RT_NOREF(hIoQueueProv, pHandle);
341
342 /* Nothing to do here. */
343 return VINF_SUCCESS;
344}
345
346
347/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnHandleDeregister} */
348static DECLCALLBACK(int) rtIoQueueStdFileProv_HandleDeregister(RTIOQUEUEPROV hIoQueueProv, PCRTHANDLE pHandle)
349{
350 RT_NOREF(hIoQueueProv, pHandle);
351
352 /* Nothing to do here. */
353 return VINF_SUCCESS;
354}
355
356
357/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnReqPrepare} */
358static DECLCALLBACK(int) rtIoQueueStdFileProv_ReqPrepare(RTIOQUEUEPROV hIoQueueProv, PCRTHANDLE pHandle, RTIOQUEUEOP enmOp,
359 uint64_t off, void *pvBuf, size_t cbBuf, uint32_t fReqFlags,
360 void *pvUser)
361{
362 PRTIOQUEUEPROVINT pThis = hIoQueueProv;
363 uint32_t idxSqProd = ASMAtomicReadU32(&pThis->idxSqProd);
364 PRTIOQUEUESSQENTRY pSqEntry = &pThis->paSqEntryBase[idxSqProd];
365
366 pSqEntry->hFile = pHandle->u.hFile;
367 pSqEntry->enmOp = enmOp;
368 pSqEntry->off = off;
369 pSqEntry->fReqFlags = fReqFlags;
370 pSqEntry->cbReq = cbBuf;
371 pSqEntry->pvUser = pvUser;
372 pSqEntry->fSg = false;
373 pSqEntry->u.pvBuf = pvBuf;
374 ASMWriteFence();
375
376 idxSqProd = (idxSqProd + 1) % pThis->cSqEntries;
377 ASMAtomicWriteU32(&pThis->idxSqProd, idxSqProd);
378 return VINF_SUCCESS;
379}
380
381
382/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnReqPrepareSg} */
383static DECLCALLBACK(int) rtIoQueueStdFileProv_ReqPrepareSg(RTIOQUEUEPROV hIoQueueProv, PCRTHANDLE pHandle, RTIOQUEUEOP enmOp,
384 uint64_t off, PCRTSGBUF pSgBuf, size_t cbSg, uint32_t fReqFlags,
385 void *pvUser)
386{
387 PRTIOQUEUEPROVINT pThis = hIoQueueProv;
388 uint32_t idxSqProd = ASMAtomicReadU32(&pThis->idxSqProd);
389 PRTIOQUEUESSQENTRY pSqEntry = &pThis->paSqEntryBase[idxSqProd];
390
391 pSqEntry->hFile = pHandle->u.hFile;
392 pSqEntry->enmOp = enmOp;
393 pSqEntry->off = off;
394 pSqEntry->fReqFlags = fReqFlags;
395 pSqEntry->cbReq = cbSg;
396 pSqEntry->pvUser = pvUser;
397 pSqEntry->fSg = true;
398 pSqEntry->u.pSgBuf = pSgBuf;
399 ASMWriteFence();
400
401 idxSqProd = (idxSqProd + 1) % pThis->cSqEntries;
402 ASMAtomicWriteU32(&pThis->idxSqProd, idxSqProd);
403 return VINF_SUCCESS;
404}
405
406
407/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnCommit} */
408static DECLCALLBACK(int) rtIoQueueStdFileProv_Commit(RTIOQUEUEPROV hIoQueueProv, uint32_t *pcReqsCommitted)
409{
410 PRTIOQUEUEPROVINT pThis = hIoQueueProv;
411 RT_NOREF(pcReqsCommitted);
412
413 return RTSemEventSignal(pThis->hSemEvtWorker);
414}
415
416
417/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnEvtWait} */
418static DECLCALLBACK(int) rtIoQueueStdFileProv_EvtWait(RTIOQUEUEPROV hIoQueueProv, PRTIOQUEUECEVT paCEvt, uint32_t cCEvt,
419 uint32_t cMinWait, uint32_t *pcCEvt, uint32_t fFlags)
420{
421 PRTIOQUEUEPROVINT pThis = hIoQueueProv;
422 RT_NOREF(pThis, paCEvt, cCEvt, cMinWait, pcCEvt, fFlags);
423
424 return VERR_NOT_IMPLEMENTED;
425}
426
427
428/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnEvtWaitWakeup} */
429static DECLCALLBACK(int) rtIoQueueStdFileProv_EvtWaitWakeup(RTIOQUEUEPROV hIoQueueProv)
430{
431 PRTIOQUEUEPROVINT pThis = hIoQueueProv;
432
433 ASMAtomicOrU32(&pThis->fState, RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_INTR);
434 return RTSemEventSignal(pThis->hSemEvtWaitEvts);
435}
436
437
438/**
439 * Standard file I/O queue provider virtual method table.
440 */
441RT_DECL_DATA_CONST(RTIOQUEUEPROVVTABLE const) g_RTIoQueueStdFileProv =
442{
443 /** uVersion */
444 RTIOQUEUEPROVVTABLE_VERSION,
445 /** pszId */
446 "StdFile",
447 /** cbIoQueueProv */
448 sizeof(RTIOQUEUEPROVINT),
449 /** enmHnd */
450 RTHANDLETYPE_FILE,
451 /** fFlags */
452 0,
453 /** pfnIsSupported */
454 rtIoQueueStdFileProv_IsSupported,
455 /** pfnQueueInit */
456 rtIoQueueStdFileProv_QueueInit,
457 /** pfnQueueDestroy */
458 rtIoQueueStdFileProv_QueueDestroy,
459 /** pfnHandleRegister */
460 rtIoQueueStdFileProv_HandleRegister,
461 /** pfnHandleDeregister */
462 rtIoQueueStdFileProv_HandleDeregister,
463 /** pfnReqPrepare */
464 rtIoQueueStdFileProv_ReqPrepare,
465 /** pfnReqPrepareSg */
466 rtIoQueueStdFileProv_ReqPrepareSg,
467 /** pfnCommit */
468 rtIoQueueStdFileProv_Commit,
469 /** pfnEvtWait */
470 rtIoQueueStdFileProv_EvtWait,
471 /** pfnEvtWaitWakeup */
472 rtIoQueueStdFileProv_EvtWaitWakeup,
473 /** uEndMarker */
474 RTIOQUEUEPROVVTABLE_VERSION
475};
476
Note: See TracBrowser for help on using the repository browser.

© 2025 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette