VirtualBox

source: vbox/trunk/src/VBox/Runtime/common/misc/reqpool.cpp@ 39616

Last change on this file since 39616 was 39616, checked in by vboxsync, 13 years ago

reqpool.cpp: Some more code.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 22.0 KB
Line 
1/* $Id: reqpool.cpp 39616 2011-12-14 16:35:38Z vboxsync $ */
2/** @file
3 * IPRT - Request Pool.
4 */
5
6/*
7 * Copyright (C) 2006-2011 Oracle Corporation
8 *
9 * This file is part of VirtualBox Open Source Edition (OSE), as
10 * available from http://www.215389.xyz. This file is free software;
11 * you can redistribute it and/or modify it under the terms of the GNU
12 * General Public License (GPL) as published by the Free Software
13 * Foundation, in version 2 as it comes in the "COPYING" file of the
14 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
15 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
16 *
17 * The contents of this file may alternatively be used under the terms
18 * of the Common Development and Distribution License Version 1.0
19 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
20 * VirtualBox OSE distribution, in which case the provisions of the
21 * CDDL are applicable instead of those of the GPL.
22 *
23 * You may elect to license modified versions of this file under the
24 * terms and conditions of either the GPL or the CDDL or both.
25 */
26
27
28/*******************************************************************************
29* Header Files *
30*******************************************************************************/
31#include <iprt/req.h>
32#include "internal/iprt.h"
33
34#include <iprt/assert.h>
35#include <iprt/asm.h>
36#include <iprt/critsect.h>
37#include <iprt/list.h>
38#include <iprt/log.h>
39#include <iprt/mem.h>
40#include <iprt/string.h>
41#include <iprt/time.h>
42#include <iprt/semaphore.h>
43#include <iprt/thread.h>
44
45#include "internal/req.h"
46#include "internal/magics.h"
47
48
49/*******************************************************************************
50* Structures and Typedefs *
51*******************************************************************************/
52typedef struct RTREQPOOLTHREAD
53{
54 /** Node in the RTREQPOOLINT::IdleThreads list. */
55 RTLISTNODE IdleNode;
56 /** Node in the RTREQPOOLINT::WorkerThreads list. */
57 RTLISTNODE ListNode;
58
59 /** The submit timestamp of the pending request. */
60 uint64_t uPendingNanoTs;
61 /** The submit timestamp of the request processing. */
62 uint64_t uProcessingNanoTs;
63 /** When this CPU went idle the last time. */
64 uint64_t uIdleNanoTs;
65 /** The number of requests processed by this thread. */
66 uint64_t cReqProcessed;
67 /** Total time the requests processed by this thread took to process. */
68 uint64_t cNsTotalReqProcessing;
69 /** Total time the requests processed by this thread had to wait in
70 * the queue before being scheduled. */
71 uint64_t cNsTotalReqQueued;
72 /** The CPU this was scheduled last time we checked. */
73 RTCPUID idLastCpu;
74
75 /** The submitter will put an incoming request here when scheduling an idle
76 * thread. */
77 PRTREQINT volatile pTodoReq;
78 /** The request the thread is currently processing. */
79 PRTREQINT volatile pPendingReq;
80
81 /** The thread handle. */
82 RTTHREAD hThread;
83 /** Nano seconds timestamp representing the birth time of the thread. */
84 uint64_t uBirthNanoTs;
85 /** Pointer to the request thread pool instance the thread is associated
86 * with. */
87 struct RTREQPOOLINT *pPool;
88} RTREQPOOLTHREAD;
89/** Pointer to a worker thread. */
90typedef RTREQPOOLTHREAD *PRTREQPOOLTHREAD;
91
92/**
93 * Request thread pool instance data.
94 */
95typedef struct RTREQPOOLINT
96{
97 /** Magic value (RTREQPOOL_MAGIC). */
98 uint32_t u32Magic;
99
100 /** @name Config
101 * @{ */
102 /** The worker thread type. */
103 RTTHREADTYPE enmThreadType;
104 /** The maximum number of worker threads. */
105 uint32_t cMaxThreads;
106 /** The number of threads which should be spawned before throttling kicks
107 * in. */
108 uint32_t cThreadsThreshold;
109 /** The minimum number of worker threads. */
110 uint32_t cMinThreads;
111 /** The number of milliseconds a thread needs to be idle before it is
112 * considered for retirement. */
113 uint32_t cMsMinIdle;
114 /** cMsMinIdle in nano seconds. */
115 uint64_t cNsMinIdle;
116 /** The idle thread sleep interval in milliseconds. */
117 uint32_t cMsIdleSleep;
118 /** The max number of milliseconds to push back a submitter before creating
119 * a new worker thread once the threshold has been reached. */
120 uint32_t cMsMaxPushBack;
121 /** The minimum number of milliseconds to push back a submitter before
122 * creating a new worker thread once the threshold has been reached. */
123 uint32_t cMsMinPushBack;
124 /** The max number of free requests in the recycle LIFO. */
125 uint32_t cMaxFreeRequests;
126 /** @} */
127
128 /** Signaled by terminating worker threads. */
129 RTSEMEVENTMULTI hThreadTermEvt;
130
131 /** Destruction indicator. The worker threads checks in their loop. */
132 bool volatile fDestructing;
133
134 /** The current submitter push back in milliseconds.
135 * This is recalculated when worker threads come and go. */
136 uint32_t cMsCurPushBack;
137 /** The current number of worker threads. */
138 uint32_t cCurThreads;
139 /** Statistics: The total number of threads created. */
140 uint32_t cThreadsCreated;
141 /** Statistics: The timestamp when the last thread was created. */
142 uint64_t uLastThreadCreateNanoTs;
143 /** Linked list of worker threads. */
144 RTLISTANCHOR WorkerThreads;
145
146 /** Reference counter. */
147 uint32_t volatile cRefs;
148 /** The number of idle thread or threads in the process of becoming
149 * idle. This is increased before the to-be-idle thread tries to enter
150 * the critical section and add itself to the list. */
151 uint32_t volatile cIdleThreads;
152 /** Linked list of idle threads. */
153 RTLISTANCHOR IdleThreads;
154
155 /** Head of the request FIFO. */
156 PRTREQINT pPendingRequests;
157 /** Where to insert the next request. */
158 PRTREQINT *ppPendingRequests;
159
160 /** Head of the request recycling LIFO. */
161 PRTREQINT pFreeRequests;
162 /** The number of requests in the recycling LIFO. This is read without
163 * entering the critical section, thus volatile. */
164 uint32_t volatile cCurFreeRequests;
165
166 /** Critical section serializing access to members of this structure. */
167 RTCRITSECT CritSect;
168
169} RTREQPOOLINT;
170
171
172/**
173 * Used by exiting thread and the pool destruction code to cancel unexpected
174 * requests.
175 *
176 * @param pReq The request.
177 */
178static void rtReqPoolCancelReq(PRTREQINT pReq)
179{
180 pReq->uOwner.hPool = NIL_RTREQPOOL; /* force free */
181 pReq->enmState = RTREQSTATE_COMPLETED;
182 ASMAtomicWriteS32(&pReq->iStatusX, VERR_CANCELLED);
183 if (pReq->hPushBackEvt != NIL_RTSEMEVENTMULTI)
184 RTSemEventMultiSignal(pReq->hPushBackEvt);
185 RTSemEventSignal(pReq->EventSem);
186
187 RTReqRelease(pReq);
188}
189
190
191/**
192 * Recalculate the max pushback interval when adding or removing worker threads.
193 *
194 * @param pPool The pool. cMsCurPushBack will be changed.
195 */
196static void rtReqPoolRecalcPushBack(PRTREQPOOLINT pPool)
197{
198 uint32_t const cMsRange = pPool->cMsMaxPushBack - pPool->cMsMinPushBack;
199 uint32_t const cSteps = pPool->cMaxThreads - pPool->cThreadsThreshold;
200 uint32_t const iStep = pPool->cCurThreads - pPool->cThreadsThreshold;
201
202 uint32_t cMsCurPushBack;
203 if ((cMsRange >> 2) >= cSteps)
204 cMsCurPushBack = cMsRange / cSteps * iStep;
205 else
206 cMsCurPushBack = (uint32_t)( (uint64_t)cMsRange * RT_NS_1MS / cSteps * iStep / RT_NS_1MS );
207 cMsCurPushBack += pPool->cMsMinPushBack;
208
209 pPool->cMsCurPushBack = cMsCurPushBack;
210}
211
212
213
214/**
215 * Performs thread exit.
216 *
217 * @returns Thread termination status code (VINF_SUCCESS).
218 * @param pPool The pool.
219 * @param pThread The thread.
220 * @param fLocked Whether we are inside the critical section
221 * already.
222 */
223static int rtReqPoolThreadExit(PRTREQPOOLINT pPool, PRTREQPOOLTHREAD pThread, bool fLocked)
224{
225 if (!fLocked)
226 RTCritSectEnter(&pPool->CritSect);
227
228 /* Get out of the idle list. */
229 if (!RTListIsEmpty(&pThread->IdleNode))
230 {
231 RTListNodeRemove(&pThread->IdleNode);
232 Assert(pPool->cIdleThreads > 0);
233 ASMAtomicDecU32(&pPool->cIdleThreads);
234 }
235
236 /* Get out of the thread list. */
237 RTListNodeRemove(&pThread->ListNode);
238 Assert(pPool->cCurThreads > 0);
239 pPool->cCurThreads--;
240 rtReqPoolRecalcPushBack(pPool);
241
242 /* This shouldn't happen... */
243 PRTREQINT pReq = pThread->pTodoReq;
244 if (pReq)
245 {
246 AssertFailed();
247 pThread->pTodoReq = NULL;
248 rtReqPoolCancelReq(pReq);
249 }
250
251 /* If we're the last thread terminating, ping the destruction thread before
252 we leave the critical section. */
253 if ( RTListIsEmpty(&pPool->WorkerThreads)
254 && pPool->hThreadTermEvt != NIL_RTSEMEVENT)
255 RTSemEventMultiSignal(pPool->hThreadTermEvt);
256
257 RTCritSectLeave(&pPool->CritSect);
258
259 return VINF_SUCCESS;
260}
261
262
263
264static void rtReqPoolThreadProcessRequest(PRTREQPOOLTHREAD pThread, PRTREQINT pReq)
265{
266 /*
267 * Update thread state.
268 */
269 pThread->uProcessingNanoTs = RTTimeNanoTS();
270 pThread->uPendingNanoTs = pReq->uSubmitNanoTs;
271 pThread->pPendingReq = pReq;
272 Assert(pReq->u32Magic == RTREQ_MAGIC);
273
274 /*
275 * Do the actual processing.
276 */
277 /** @todo */
278
279 /*
280 * Update thread statistics and state.
281 */
282 uint64_t const uNsTsEnd = RTTimeNanoTS();
283 pThread->cNsTotalReqProcessing += uNsTsEnd - pThread->uProcessingNanoTs;
284 pThread->cNsTotalReqQueued += uNsTsEnd - pThread->uPendingNanoTs;
285 pThread->cReqProcessed++;
286}
287
288
289
290/**
291 * The Worker Thread Procedure.
292 *
293 * @returns VINF_SUCCESS.
294 * @param hThreadSelf The thread handle (unused).
295 * @param pvArg Pointer to the thread data.
296 */
297static DECLCALLBACK(int) rtReqPoolThreadProc(RTTHREAD hThreadSelf, void *pvArg)
298{
299 PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)pvArg;
300 PRTREQPOOLINT pPool = pThread->pPool;
301
302 /*
303 * The work loop.
304 */
305 uint64_t cPrevReqProcessed = UINT64_MAX;
306 while (!pPool->fDestructing)
307 {
308 /*
309 * Process pending work.
310 */
311
312 /* Check if anything is scheduled directly to us. */
313 PRTREQINT pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
314 if (pReq)
315 {
316 Assert(RTListIsEmpty(&pThread->IdleNode)); /* Must not be in the idle list. */
317 rtReqPoolThreadProcessRequest(pThread, pReq);
318 continue;
319 }
320
321 ASMAtomicIncU32(&pPool->cIdleThreads);
322 RTCritSectEnter(&pPool->CritSect);
323
324 /* Recheck the todo request pointer after entering the critsect. */
325 pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
326 if (pReq)
327 {
328 Assert(RTListIsEmpty(&pThread->IdleNode)); /* Must not be in the idle list. */
329 RTCritSectLeave(&pPool->CritSect);
330
331 rtReqPoolThreadProcessRequest(pThread, pReq);
332 continue;
333 }
334
335 /* Any pending requests in the queue? */
336 pReq = pPool->pPendingRequests;
337 if (pReq)
338 {
339 pPool->pPendingRequests = pReq->pNext;
340 if (pReq->pNext == NULL)
341 pPool->ppPendingRequests = &pPool->pPendingRequests;
342
343 /* Un-idle ourselves and process the request. */
344 if (!RTListIsEmpty(&pThread->IdleNode))
345 {
346 RTListNodeRemove(&pThread->IdleNode);
347 RTListInit(&pThread->IdleNode);
348 ASMAtomicDecU32(&pPool->cIdleThreads);
349 }
350 ASMAtomicDecU32(&pPool->cIdleThreads);
351 RTCritSectLeave(&pPool->CritSect);
352
353 rtReqPoolThreadProcessRequest(pThread, pReq);
354 continue;
355 }
356
357 /*
358 * Nothing to do, go idle.
359 */
360 if (cPrevReqProcessed != pThread->cReqProcessed)
361 {
362 pThread->cReqProcessed = cPrevReqProcessed;
363 pThread->uIdleNanoTs = RTTimeNanoTS();
364 }
365 else if (pPool->cCurThreads > pPool->cMinThreads)
366 {
367 uint64_t cNsIdle = RTTimeNanoTS() - pThread->uIdleNanoTs;
368 if (cNsIdle >= pPool->cNsMinIdle)
369 return rtReqPoolThreadExit(pPool, pThread, true /*fLocked*/);
370 }
371
372 if (RTListIsEmpty(&pThread->IdleNode))
373 RTListPrepend(&pPool->IdleThreads, &pThread->IdleNode);
374 else
375 ASMAtomicDecU32(&pPool->cIdleThreads);
376 RTThreadUserReset(hThreadSelf);
377 uint32_t const cMsSleep = pPool->cMsIdleSleep;
378
379 RTCritSectLeave(&pPool->CritSect);
380
381 RTThreadUserWait(hThreadSelf, cMsSleep);
382 }
383
384 return rtReqPoolThreadExit(pPool, pThread, false /*fLocked*/);
385}
386
387
388/**
389 * Create a new worker thread.
390 *
391 * @param pPool The pool needing new worker thread.
392 * @remarks Caller owns the critical section
393 */
394static void rtReqPoolCreateNewWorker(RTREQPOOL pPool)
395{
396 PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)RTMemAllocZ(sizeof(RTREQPOOLTHREAD));
397 if (!pThread)
398 return;
399
400 pThread->uBirthNanoTs = RTTimeNanoTS();
401 pThread->pPool = pPool;
402 pThread->idLastCpu = NIL_RTCPUID;
403 pThread->hThread = NIL_RTTHREAD;
404 RTListInit(&pThread->IdleNode);
405 RTListAppend(&pPool->WorkerThreads, &pThread->ListNode);
406 pPool->cCurThreads++;
407 pPool->cThreadsCreated++;
408
409 static uint32_t s_idThread = 0;
410 int rc = RTThreadCreateF(&pThread->hThread, rtReqPoolThreadProc, pThread, 0 /*default stack size*/,
411 pPool->enmThreadType, 0 /*fFlags*/, "REQPT%02u", ++s_idThread);
412 if (RT_SUCCESS(rc))
413 pPool->uLastThreadCreateNanoTs = pThread->uBirthNanoTs;
414 else
415 {
416 pPool->cCurThreads--;
417 RTListNodeRemove(&pThread->ListNode);
418 RTMemFree(pThread);
419 }
420}
421
422
423/**
424 * Repel the submitter, giving the worker threads a chance to process the
425 * incoming request.
426 *
427 * @returns Success if a worker picked up the request, failure if not. The
428 * critical section has been left on success, while we'll be inside it
429 * on failure.
430 * @param pPool The pool.
431 * @param pReq The incoming request.
432 */
433static int rtReqPoolPushBack(PRTREQPOOLINT pPool, PRTREQINT pReq)
434{
435 /*
436 * Lazily create the push back semaphore that we'll be blociing on.
437 */
438 int rc;
439 RTSEMEVENTMULTI hEvt = pReq->hPushBackEvt;
440 if (hEvt == NIL_RTSEMEVENTMULTI)
441 {
442 rc = RTSemEventMultiCreate(&hEvt);
443 if (RT_FAILURE(rc))
444 return rc;
445 pReq->hPushBackEvt = hEvt;
446 }
447
448 /*
449 * Prepare the request and semaphore.
450 */
451 uint32_t const cMsTimeout = pPool->cMsCurPushBack;
452 pReq->fSignalPushBack = true;
453 RTReqRetain(pReq);
454 RTSemEventMultiReset(hEvt);
455
456 RTCritSectLeave(&pPool->CritSect);
457
458 /*
459 * Block.
460 */
461 rc = RTSemEventMultiWait(hEvt, cMsTimeout);
462 if (RT_FAILURE(rc))
463 {
464 AssertMsg(rc == VERR_TIMEOUT, ("%Rrc\n", rc));
465 RTCritSectEnter(&pPool->CritSect);
466 }
467 RTReqRelease(pReq);
468 return rc;
469}
470
471
472
473DECLHIDDEN(void) rtReqPoolSubmit(PRTREQPOOLINT pPool, PRTREQINT pReq)
474{
475 RTCritSectEnter(&pPool->CritSect);
476
477 /*
478 * Try schedule the request to a thread that's currently idle.
479 */
480 PRTREQPOOLTHREAD pThread = RTListGetFirst(&pPool->IdleThreads, RTREQPOOLTHREAD, IdleNode);
481 if (pThread)
482 {
483 /** @todo CPU affinity... */
484 ASMAtomicWritePtr(&pThread->pTodoReq, pReq);
485
486 RTListNodeRemove(&pThread->IdleNode);
487 RTListInit(&pThread->IdleNode);
488 ASMAtomicDecU32(&pPool->cIdleThreads);
489
490 RTThreadUserSignal(pThread->hThread);
491
492 RTCritSectLeave(&pPool->CritSect);
493 return;
494 }
495 Assert(RTListIsEmpty(&pPool->IdleThreads));
496
497 /*
498 * Put the request in the pending queue.
499 */
500 pReq->pNext = NULL;
501 *pPool->ppPendingRequests = pReq;
502 pPool->ppPendingRequests = (PRTREQINT*)&pReq->pNext;
503
504 /*
505 * If there is an incoming worker thread already or we've reached the
506 * maximum number of worker threads, we're done.
507 */
508 if ( pPool->cIdleThreads > 0
509 || pPool->cCurThreads >= pPool->cMaxThreads)
510 {
511 RTCritSectLeave(&pPool->CritSect);
512 return;
513 }
514
515 /*
516 * Push back before creating a new worker thread.
517 */
518 if ( pPool->cCurThreads > pPool->cThreadsThreshold
519 && (RTTimeNanoTS() - pReq->uSubmitNanoTs) / RT_NS_1MS >= pPool->cMsCurPushBack )
520 {
521 int rc = rtReqPoolPushBack(pPool, pReq);
522 if (RT_SUCCESS(rc))
523 return;
524 }
525
526 /*
527 * Create a new thread for processing the request.
528 * For simplicity, we don't bother leaving the critical section while doing so.
529 */
530 rtReqPoolCreateNewWorker(pPool);
531
532 RTCritSectLeave(&pPool->CritSect);
533 return;
534}
535
536
537/**
538 * Frees a requst.
539 *
540 * @returns true if recycled, false if not.
541 * @param pPool The request thread pool.
542 * @param pReq The request.
543 */
544DECLHIDDEN(bool) rtReqPoolRecycle(PRTREQPOOLINT pPool, PRTREQINT pReq)
545{
546 if ( pPool
547 && ASMAtomicReadU32(&pPool->cCurFreeRequests) < pPool->cMaxFreeRequests)
548 {
549 RTCritSectEnter(&pPool->CritSect);
550 if (pPool->cCurFreeRequests < pPool->cMaxFreeRequests)
551 {
552 pReq->pNext = pPool->pFreeRequests;
553 pPool->pFreeRequests = pReq;
554 ASMAtomicIncU32(&pPool->cCurFreeRequests);
555
556 RTCritSectLeave(&pPool->CritSect);
557 return true;
558 }
559
560 RTCritSectLeave(&pPool->CritSect);
561 }
562 return false;
563}
564
565typedef enum RTREQPOOLCFGVAR
566{
567 RTREQPOOLCFGVAR_INVALID = 0,
568 RTREQPOOLCFGVAR_END,
569 RTREQPOOLCFGVAR_32BIT_HACK = 0x7fffffff
570} RTREQPOOLCFGVAR;
571
572
573RTDECL(int) RTReqPoolSetCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar, uint64_t uValue)
574{
575 return VERR_NOT_SUPPORTED;
576}
577
578
579RTDECL(int) RTReqPoolQueryCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar, uint64_t *puValue)
580{
581 return VERR_NOT_SUPPORTED;
582}
583
584
585typedef enum RTREQPOOLSTAT
586{
587 RTREQPOOLSTAT_INVALID = 0,
588 RTREQPOOLSTAT_END,
589 RTREQPOOLSTAT_32BIT_HACK = 0x7fffffff
590} RTREQPOOLSTAT;
591
592
593RTDECL(uint64_t) RTReqPoolGetStat(RTREQPOOL hPool, RTREQPOOLSTAT enmStat)
594{
595 return UINT64_MAX;
596}
597
598
599RTDECL(uint32_t) RTReqPoolRetain(RTREQPOOL hPool)
600{
601 PRTREQPOOLINT pPool = hPool;
602 AssertPtrReturn(pPool, UINT32_MAX);
603 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);
604
605 return ASMAtomicIncU32(&pPool->cRefs);
606}
607RT_EXPORT_SYMBOL(RTReqPoolRetain);
608
609
610RTDECL(uint32_t) RTReqPoolRelease(RTREQPOOL hPool)
611{
612 /*
613 * Ignore NULL and validate the request.
614 */
615 if (!hPool)
616 return 0;
617 PRTREQPOOLINT pPool = hPool;
618 AssertPtrReturn(pPool, UINT32_MAX);
619 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);
620
621 /*
622 * Drop a reference, free it when it reaches zero.
623 */
624 uint32_t cRefs = ASMAtomicDecU32(&pPool->cRefs);
625 if (cRefs == 0)
626 {
627 AssertReturn(ASMAtomicCmpXchgU32(&pPool->u32Magic, RTREQPOOL_MAGIC_DEAD, RTREQPOOL_MAGIC), UINT32_MAX);
628
629 RTCritSectEnter(&pPool->CritSect);
630#ifdef RT_STRICT
631 RTTHREAD const hSelf = RTThreadSelf();
632#endif
633
634 /* Indicate to the worker threads that we're shutting down. */
635 ASMAtomicWriteBool(&pPool->fDestructing, true);
636 PRTREQPOOLTHREAD pThread;
637 RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode)
638 {
639 Assert(pThread->hThread != hSelf);
640 RTThreadUserSignal(pThread->hThread);
641 }
642
643 /* Cancel pending requests. */
644 Assert(!pPool->pPendingRequests);
645 while (pPool->pPendingRequests)
646 {
647 PRTREQINT pReq = pPool->pPendingRequests;
648 pPool->pPendingRequests = pReq->pNext;
649 rtReqPoolCancelReq(pReq);
650 }
651 pPool->ppPendingRequests = NULL;
652
653 /* Wait for the workers to shut down. */
654 while (!RTListIsEmpty(&pPool->WorkerThreads))
655 {
656 RTCritSectLeave(&pPool->CritSect);
657 RTSemEventMultiWait(pPool->hThreadTermEvt, RT_MS_1MIN);
658 RTCritSectEnter(&pPool->CritSect);
659 /** @todo should we wait forever here? */
660 }
661
662 /* Free recycled requests. */
663 for (;;)
664 {
665 PRTREQINT pReq = pPool->pFreeRequests;
666 if (!pReq)
667 break;
668 pPool->pFreeRequests = pReq->pNext;
669 pPool->cCurFreeRequests--;
670 rtReqFreeIt(pReq);
671 }
672
673 /* Finally, free the handle. */
674 RTMemFree(pPool);
675 }
676
677 return cRefs;
678}
679RT_EXPORT_SYMBOL(RTReqPoolRelease);
680
681
682RTDECL(int) RTReqPoolAlloc(RTREQPOOL hPool, RTREQTYPE enmType, PRTREQ *phReq)
683{
684 PRTREQPOOLINT pPool = hPool;
685 AssertPtrReturn(pPool, VERR_INVALID_HANDLE);
686 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);
687
688 /*
689 * Try recycle old requests.
690 */
691 if (ASMAtomicReadU32(&pPool->cCurFreeRequests) > 0)
692 {
693 RTCritSectEnter(&pPool->CritSect);
694 PRTREQINT pReq = pPool->pFreeRequests;
695 if (pReq)
696 {
697 ASMAtomicDecU32(&pPool->cCurFreeRequests);
698 pPool->pFreeRequests = pReq->pNext;
699
700 RTCritSectLeave(&pPool->CritSect);
701
702 Assert(pReq->fPoolOrQueue);
703 Assert(pReq->uOwner.hPool == pPool);
704
705 int rc = rtReqReInit(pReq, enmType);
706 if (RT_SUCCESS(rc))
707 {
708 *phReq = pReq;
709 LogFlow(("RTReqPoolAlloc: returns VINF_SUCCESS *phReq=%p recycled\n", pReq));
710 return rc;
711 }
712 }
713 else
714 RTCritSectLeave(&pPool->CritSect);
715 }
716
717 /*
718 * Allocate a new request.
719 */
720 int rc = rtReqAlloc(enmType, true /*fPoolOrQueue*/, pPool, phReq);
721 LogFlow(("RTReqPoolAlloc: returns %Rrc *phReq=%p\n", rc, *phReq));
722 return VINF_SUCCESS;
723}
724RT_EXPORT_SYMBOL(RTReqPoolAlloc);
725
Note: See TracBrowser for help on using the repository browser.

© 2025 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette