VirtualBox

source: vbox/trunk/src/VBox/Runtime/common/misc/reqpool.cpp @ 39617

Last change on this file since 39617 was 39617, checked in by vboxsync, 13 years ago

later.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 22.0 KB
/* $Id: reqpool.cpp 39617 2011-12-14 16:45:01Z vboxsync $ */
/** @file
 * IPRT - Request Pool.
 */

/*
 * Copyright (C) 2006-2011 Oracle Corporation
 *
 * This file is part of VirtualBox Open Source Edition (OSE), as
 * available from http://www.virtualbox.org. This file is free software;
 * you can redistribute it and/or modify it under the terms of the GNU
 * General Public License (GPL) as published by the Free Software
 * Foundation, in version 2 as it comes in the "COPYING" file of the
 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
 *
 * The contents of this file may alternatively be used under the terms
 * of the Common Development and Distribution License Version 1.0
 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
 * VirtualBox OSE distribution, in which case the provisions of the
 * CDDL are applicable instead of those of the GPL.
 *
 * You may elect to license modified versions of this file under the
 * terms and conditions of either the GPL or the CDDL or both.
 */


/*******************************************************************************
*   Header Files                                                               *
*******************************************************************************/
#include <iprt/req.h>
#include "internal/iprt.h"

#include <iprt/assert.h>
#include <iprt/asm.h>
#include <iprt/critsect.h>
#include <iprt/list.h>
#include <iprt/log.h>
#include <iprt/mem.h>
#include <iprt/string.h>
#include <iprt/time.h>
#include <iprt/semaphore.h>
#include <iprt/thread.h>

#include "internal/req.h"
#include "internal/magics.h"


/*******************************************************************************
*   Structures and Typedefs                                                    *
*******************************************************************************/
/**
 * A request pool worker thread.
 */
typedef struct RTREQPOOLTHREAD
{
    /** Node in the RTREQPOOLINT::IdleThreads list. */
    RTLISTNODE              IdleNode;
    /** Node in the RTREQPOOLINT::WorkerThreads list. */
    RTLISTNODE              ListNode;

    /** The submit timestamp of the pending request. */
    uint64_t                uPendingNanoTs;
    /** The submit timestamp of the request processing. */
    uint64_t                uProcessingNanoTs;
    /** When this thread last went idle. */
    uint64_t                uIdleNanoTs;
    /** The number of requests processed by this thread. */
    uint64_t                cReqProcessed;
    /** Total time the requests processed by this thread took to process. */
    uint64_t                cNsTotalReqProcessing;
    /** Total time the requests processed by this thread had to wait in
     * the queue before being scheduled. */
    uint64_t                cNsTotalReqQueued;
    /** The CPU this thread was scheduled on last time we checked. */
    RTCPUID                 idLastCpu;

    /** The submitter will put an incoming request here when scheduling an idle
     * thread. */
    PRTREQINT volatile      pTodoReq;
    /** The request the thread is currently processing. */
    PRTREQINT volatile      pPendingReq;

    /** The thread handle. */
    RTTHREAD                hThread;
    /** Nanosecond timestamp representing the birth time of the thread. */
    uint64_t                uBirthNanoTs;
    /** Pointer to the request thread pool instance the thread is associated
     * with. */
    struct RTREQPOOLINT    *pPool;
} RTREQPOOLTHREAD;
/** Pointer to a worker thread. */
typedef RTREQPOOLTHREAD *PRTREQPOOLTHREAD;

/**
 * Request thread pool instance data.
 */
typedef struct RTREQPOOLINT
{
    /** Magic value (RTREQPOOL_MAGIC). */
    uint32_t                u32Magic;

    /** @name Config
     * @{ */
    /** The worker thread type. */
    RTTHREADTYPE            enmThreadType;
    /** The maximum number of worker threads. */
    uint32_t                cMaxThreads;
    /** The number of threads which should be spawned before throttling kicks
     * in. */
    uint32_t                cThreadsThreshold;
    /** The minimum number of worker threads. */
    uint32_t                cMinThreads;
    /** The number of milliseconds a thread needs to be idle before it is
     * considered for retirement. */
    uint32_t                cMsMinIdle;
    /** cMsMinIdle in nanoseconds. */
    uint64_t                cNsMinIdle;
    /** The idle thread sleep interval in milliseconds. */
    uint32_t                cMsIdleSleep;
    /** The max number of milliseconds to push back a submitter before creating
     * a new worker thread once the threshold has been reached. */
    uint32_t                cMsMaxPushBack;
    /** The minimum number of milliseconds to push back a submitter before
     * creating a new worker thread once the threshold has been reached. */
    uint32_t                cMsMinPushBack;
    /** The max number of free requests in the recycle LIFO. */
    uint32_t                cMaxFreeRequests;
    /** @} */

    /** Signaled by terminating worker threads. */
    RTSEMEVENTMULTI         hThreadTermEvt;

    /** Destruction indicator.  The worker threads check it in their loop. */
    bool volatile           fDestructing;

    /** The current submitter push back in milliseconds.
     * This is recalculated when worker threads come and go. */
    uint32_t                cMsCurPushBack;
    /** The current number of worker threads. */
    uint32_t                cCurThreads;
    /** Statistics: The total number of threads created. */
    uint32_t                cThreadsCreated;
    /** Statistics: The timestamp when the last thread was created. */
    uint64_t                uLastThreadCreateNanoTs;
    /** Linked list of worker threads. */
    RTLISTANCHOR            WorkerThreads;

    /** Reference counter. */
    uint32_t volatile       cRefs;
    /** The number of idle threads or threads in the process of becoming
     * idle.  This is increased before the to-be-idle thread tries to enter
     * the critical section and add itself to the list. */
    uint32_t volatile       cIdleThreads;
    /** Linked list of idle threads. */
    RTLISTANCHOR            IdleThreads;

    /** Head of the request FIFO. */
    PRTREQINT               pPendingRequests;
    /** Where to insert the next request. */
    PRTREQINT              *ppPendingRequests;

    /** Head of the request recycling LIFO. */
    PRTREQINT               pFreeRequests;
    /** The number of requests in the recycling LIFO.  This is read without
     * entering the critical section, thus volatile. */
    uint32_t volatile       cCurFreeRequests;

    /** Critical section serializing access to members of this structure. */
    RTCRITSECT              CritSect;

} RTREQPOOLINT;
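
/* An illustration of the pending request FIFO members above (request names
   hypothetical): with requests A and B queued, pPendingRequests = A,
   A->pNext = B, B->pNext = NULL and ppPendingRequests = &B->pNext; with the
   queue empty, ppPendingRequests = &pPendingRequests, which is where
   rtReqPoolSubmit() links in the next arrival. */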


/**
 * Used by an exiting thread and by the pool destruction code to cancel
 * unexpected requests.
 *
 * @param   pReq                The request.
 */
static void rtReqPoolCancelReq(PRTREQINT pReq)
{
    pReq->uOwner.hPool = NIL_RTREQPOOL; /* force free */
    pReq->enmState     = RTREQSTATE_COMPLETED;
    ASMAtomicWriteS32(&pReq->iStatusX, VERR_CANCELLED);
    if (pReq->hPushBackEvt != NIL_RTSEMEVENTMULTI)
        RTSemEventMultiSignal(pReq->hPushBackEvt);
    RTSemEventSignal(pReq->EventSem);

    RTReqRelease(pReq);
}


/**
 * Recalculate the max pushback interval when adding or removing worker threads.
 *
 * @param   pPool               The pool.  cMsCurPushBack will be changed.
 */
static void rtReqPoolRecalcPushBack(PRTREQPOOLINT pPool)
{
    uint32_t const cMsRange = pPool->cMsMaxPushBack - pPool->cMsMinPushBack;
    uint32_t const cSteps   = pPool->cMaxThreads - pPool->cThreadsThreshold;
    uint32_t const iStep    = pPool->cCurThreads - pPool->cThreadsThreshold;

    uint32_t cMsCurPushBack;
    if (cSteps == 0) /* no head room between threshold and max; avoid dividing by zero below */
        cMsCurPushBack = 0;
    else if ((cMsRange >> 2) >= cSteps)
        cMsCurPushBack = cMsRange / cSteps * iStep;
    else
        cMsCurPushBack = (uint32_t)( (uint64_t)cMsRange * RT_NS_1MS / cSteps * iStep / RT_NS_1MS );
    cMsCurPushBack += pPool->cMsMinPushBack;

    pPool->cMsCurPushBack = cMsCurPushBack;
}
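
/* Worked example (hypothetical configuration, not values from this file):
   with cMsMinPushBack = 1, cMsMaxPushBack = 1000, cThreadsThreshold = 32 and
   cMaxThreads = 64 we get cMsRange = 999 and cSteps = 32, so the
   (cMsRange >> 2) >= cSteps branch is taken; at cCurThreads = 48 (iStep = 16)
   the push back works out to 999 / 32 * 16 + 1 = 497 ms, i.e. roughly half
   way up the linear ramp between the two configured limits. */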



/**
 * Performs thread exit.
 *
 * @returns Thread termination status code (VINF_SUCCESS).
 * @param   pPool               The pool.
 * @param   pThread             The thread.
 * @param   fLocked             Whether we are inside the critical section
 *                              already.
 */
static int rtReqPoolThreadExit(PRTREQPOOLINT pPool, PRTREQPOOLTHREAD pThread, bool fLocked)
{
    if (!fLocked)
        RTCritSectEnter(&pPool->CritSect);

    /* Get out of the idle list. */
    if (!RTListIsEmpty(&pThread->IdleNode))
    {
        RTListNodeRemove(&pThread->IdleNode);
        Assert(pPool->cIdleThreads > 0);
        ASMAtomicDecU32(&pPool->cIdleThreads);
    }

    /* Get out of the thread list. */
    RTListNodeRemove(&pThread->ListNode);
    Assert(pPool->cCurThreads > 0);
    pPool->cCurThreads--;
    rtReqPoolRecalcPushBack(pPool);

    /* This shouldn't happen... */
    PRTREQINT pReq = pThread->pTodoReq;
    if (pReq)
    {
        AssertFailed();
        pThread->pTodoReq = NULL;
        rtReqPoolCancelReq(pReq);
    }

    /* If we're the last thread terminating, ping the destruction thread before
       we leave the critical section. */
    if (   RTListIsEmpty(&pPool->WorkerThreads)
        && pPool->hThreadTermEvt != NIL_RTSEMEVENTMULTI)
        RTSemEventMultiSignal(pPool->hThreadTermEvt);

    RTCritSectLeave(&pPool->CritSect);

    /* Nobody references the thread structure once it is off the lists, so
       free it here to avoid leaking it on every retirement. */
    RTMemFree(pThread);

    return VINF_SUCCESS;
}



/**
 * Processes one request.
 *
 * @param   pThread             The worker thread doing the processing.
 * @param   pReq                The request to process.
 */
static void rtReqPoolThreadProcessRequest(PRTREQPOOLTHREAD pThread, PRTREQINT pReq)
{
    /*
     * Update thread state.
     */
    pThread->uProcessingNanoTs = RTTimeNanoTS();
    pThread->uPendingNanoTs    = pReq->uSubmitNanoTs;
    pThread->pPendingReq       = pReq;
    Assert(pReq->u32Magic == RTREQ_MAGIC);

    /*
     * Do the actual processing.
     */
    /** @todo */
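#if 0 /* A sketch of what the @todo above presumably stands for: hand the
         request to the common request execution code.  Both the helper name
         rtReqProcessOne() and the state transition are assumptions here, not
         something this revision declares. */
    pReq->enmState = RTREQSTATE_PROCESSING;
    rtReqProcessOne(pReq);
#endif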

    /*
     * Update thread statistics and state.
     */
    uint64_t const uNsTsEnd = RTTimeNanoTS();
    pThread->cNsTotalReqProcessing += uNsTsEnd - pThread->uProcessingNanoTs;
    pThread->cNsTotalReqQueued     += uNsTsEnd - pThread->uPendingNanoTs;
    pThread->cReqProcessed++;
    pThread->pPendingReq = NULL; /* done with it; don't leave a stale pointer behind */
}



/**
 * The Worker Thread Procedure.
 *
 * @returns VINF_SUCCESS.
 * @param   hThreadSelf         The thread handle.
 * @param   pvArg               Pointer to the thread data.
 */
static DECLCALLBACK(int) rtReqPoolThreadProc(RTTHREAD hThreadSelf, void *pvArg)
{
    PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)pvArg;
    PRTREQPOOLINT    pPool   = pThread->pPool;

    /*
     * The work loop.
     */
    uint64_t cPrevReqProcessed = UINT64_MAX;
    while (!pPool->fDestructing)
    {
        /*
         * Process pending work.
         */

        /* Check if anything is scheduled directly to us. */
        PRTREQINT pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
        if (pReq)
        {
            Assert(RTListIsEmpty(&pThread->IdleNode)); /* Must not be in the idle list. */
            rtReqPoolThreadProcessRequest(pThread, pReq);
            continue;
        }

        ASMAtomicIncU32(&pPool->cIdleThreads);
        RTCritSectEnter(&pPool->CritSect);

        /* Recheck the todo request pointer after entering the critsect. */
        pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
        if (pReq)
        {
            Assert(RTListIsEmpty(&pThread->IdleNode)); /* Must not be in the idle list. */
            ASMAtomicDecU32(&pPool->cIdleThreads);     /* undo the increment above, we're not going idle */
            RTCritSectLeave(&pPool->CritSect);

            rtReqPoolThreadProcessRequest(pThread, pReq);
            continue;
        }

        /* Any pending requests in the queue? */
        pReq = pPool->pPendingRequests;
        if (pReq)
        {
            pPool->pPendingRequests = pReq->pNext;
            if (pReq->pNext == NULL)
                pPool->ppPendingRequests = &pPool->pPendingRequests;

            /* Un-idle ourselves and process the request. */
            if (!RTListIsEmpty(&pThread->IdleNode))
            {
                RTListNodeRemove(&pThread->IdleNode);
                RTListInit(&pThread->IdleNode);
                ASMAtomicDecU32(&pPool->cIdleThreads); /* accounts for the idle list membership */
            }
            ASMAtomicDecU32(&pPool->cIdleThreads);     /* undoes the increment at the top of the loop */
            RTCritSectLeave(&pPool->CritSect);

            rtReqPoolThreadProcessRequest(pThread, pReq);
            continue;
        }

        /*
         * Nothing to do, go idle.
         */
        if (cPrevReqProcessed != pThread->cReqProcessed)
        {
            cPrevReqProcessed    = pThread->cReqProcessed; /* update the snapshot, not the counter */
            pThread->uIdleNanoTs = RTTimeNanoTS();
        }
        else if (pPool->cCurThreads > pPool->cMinThreads)
        {
            uint64_t cNsIdle = RTTimeNanoTS() - pThread->uIdleNanoTs;
            if (cNsIdle >= pPool->cNsMinIdle)
                return rtReqPoolThreadExit(pPool, pThread, true /*fLocked*/);
        }

        if (RTListIsEmpty(&pThread->IdleNode))
            RTListPrepend(&pPool->IdleThreads, &pThread->IdleNode);
        else
            ASMAtomicDecU32(&pPool->cIdleThreads);
        RTThreadUserReset(hThreadSelf);
        uint32_t const cMsSleep = pPool->cMsIdleSleep;

        RTCritSectLeave(&pPool->CritSect);

        RTThreadUserWait(hThreadSelf, cMsSleep);
    }

    return rtReqPoolThreadExit(pPool, pThread, false /*fLocked*/);
}


/**
 * Create a new worker thread.
 *
 * @param   pPool               The pool needing a new worker thread.
 * @remarks The caller owns the critical section.
 */
static void rtReqPoolCreateNewWorker(RTREQPOOL pPool)
{
    PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)RTMemAllocZ(sizeof(RTREQPOOLTHREAD));
    if (!pThread)
        return;

    pThread->uBirthNanoTs = RTTimeNanoTS();
    pThread->pPool        = pPool;
    pThread->idLastCpu    = NIL_RTCPUID;
    pThread->hThread      = NIL_RTTHREAD;
    RTListInit(&pThread->IdleNode);
    RTListAppend(&pPool->WorkerThreads, &pThread->ListNode);
    pPool->cCurThreads++;
    pPool->cThreadsCreated++;

    static uint32_t s_idThread = 0;
    int rc = RTThreadCreateF(&pThread->hThread, rtReqPoolThreadProc, pThread, 0 /*default stack size*/,
                             pPool->enmThreadType, 0 /*fFlags*/, "REQPT%02u", ++s_idThread);
    if (RT_SUCCESS(rc))
        pPool->uLastThreadCreateNanoTs = pThread->uBirthNanoTs;
    else
    {
        pPool->cCurThreads--;
        RTListNodeRemove(&pThread->ListNode);
        RTMemFree(pThread);
    }
}


/**
 * Repel the submitter, giving the worker threads a chance to process the
 * incoming request.
 *
 * @returns Success if a worker picked up the request, failure if not.  The
 *          critical section has been left on success, while we'll be inside it
 *          on failure.
 * @param   pPool               The pool.
 * @param   pReq                The incoming request.
 */
static int rtReqPoolPushBack(PRTREQPOOLINT pPool, PRTREQINT pReq)
{
    /*
     * Lazily create the push back semaphore that we'll be blocking on.
     */
    int rc;
    RTSEMEVENTMULTI hEvt = pReq->hPushBackEvt;
    if (hEvt == NIL_RTSEMEVENTMULTI)
    {
        rc = RTSemEventMultiCreate(&hEvt);
        if (RT_FAILURE(rc))
            return rc;
        pReq->hPushBackEvt = hEvt;
    }

    /*
     * Prepare the request and semaphore.
     */
    uint32_t const cMsTimeout = pPool->cMsCurPushBack;
    pReq->fSignalPushBack = true;
    RTReqRetain(pReq);
    RTSemEventMultiReset(hEvt);

    RTCritSectLeave(&pPool->CritSect);

    /*
     * Block.
     */
    rc = RTSemEventMultiWait(hEvt, cMsTimeout);
    if (RT_FAILURE(rc))
    {
        AssertMsg(rc == VERR_TIMEOUT, ("%Rrc\n", rc));
        RTCritSectEnter(&pPool->CritSect);
    }
    RTReqRelease(pReq);
    return rc;
}
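
/* A note on the handshake above: setting fSignalPushBack asks whichever code
   completes the request (outside this file) to signal hPushBackEvt, waking
   the submitter early; otherwise the wait simply times out after
   cMsCurPushBack milliseconds and the submitter goes on to create a new
   worker thread.  This reading of fSignalPushBack is inferred from its name
   and use here, as the completing side is not part of this file. */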



DECLHIDDEN(void) rtReqPoolSubmit(PRTREQPOOLINT pPool, PRTREQINT pReq)
{
    RTCritSectEnter(&pPool->CritSect);

    /*
     * Try to schedule the request on a thread that's currently idle.
     */
    PRTREQPOOLTHREAD pThread = RTListGetFirst(&pPool->IdleThreads, RTREQPOOLTHREAD, IdleNode);
    if (pThread)
    {
        /** @todo CPU affinity... */
        ASMAtomicWritePtr(&pThread->pTodoReq, pReq);

        RTListNodeRemove(&pThread->IdleNode);
        RTListInit(&pThread->IdleNode);
        ASMAtomicDecU32(&pPool->cIdleThreads);

        RTThreadUserSignal(pThread->hThread);

        RTCritSectLeave(&pPool->CritSect);
        return;
    }
    Assert(RTListIsEmpty(&pPool->IdleThreads));

    /*
     * Put the request in the pending queue.
     */
    pReq->pNext = NULL;
    *pPool->ppPendingRequests = pReq;
    pPool->ppPendingRequests  = (PRTREQINT *)&pReq->pNext;

    /*
     * If a worker thread is already on its way in, or if we've reached the
     * maximum number of worker threads, we're done.
     */
    if (   pPool->cIdleThreads > 0
        || pPool->cCurThreads >= pPool->cMaxThreads)
    {
        RTCritSectLeave(&pPool->CritSect);
        return;
    }

    /*
     * Push back before creating a new worker thread.
     */
    if (   pPool->cCurThreads > pPool->cThreadsThreshold
        && (RTTimeNanoTS() - pReq->uSubmitNanoTs) / RT_NS_1MS >= pPool->cMsCurPushBack)
    {
        int rc = rtReqPoolPushBack(pPool, pReq);
        if (RT_SUCCESS(rc))
            return;
    }

    /*
     * Create a new thread for processing the request.
     * For simplicity, we don't bother leaving the critical section while doing so.
     */
    rtReqPoolCreateNewWorker(pPool);

    RTCritSectLeave(&pPool->CritSect);
    return;
}


/**
 * Recycles a request.
 *
 * @returns true if recycled, false if not.
 * @param   pPool               The request thread pool.
 * @param   pReq                The request.
 */
DECLHIDDEN(bool) rtReqPoolRecycle(PRTREQPOOLINT pPool, PRTREQINT pReq)
{
    if (   pPool
        && ASMAtomicReadU32(&pPool->cCurFreeRequests) < pPool->cMaxFreeRequests)
    {
        RTCritSectEnter(&pPool->CritSect);
        if (pPool->cCurFreeRequests < pPool->cMaxFreeRequests)
        {
            pReq->pNext = pPool->pFreeRequests;
            pPool->pFreeRequests = pReq;
            ASMAtomicIncU32(&pPool->cCurFreeRequests);

            RTCritSectLeave(&pPool->CritSect);
            return true;
        }

        RTCritSectLeave(&pPool->CritSect);
    }
    return false;
}

#if 0 /* later */

typedef enum RTREQPOOLCFGVAR
{
    RTREQPOOLCFGVAR_INVALID = 0,
    RTREQPOOLCFGVAR_END,
    RTREQPOOLCFGVAR_32BIT_HACK = 0x7fffffff
} RTREQPOOLCFGVAR;


RTDECL(int) RTReqPoolSetCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar, uint64_t uValue)
{
    return VERR_NOT_SUPPORTED;
}


RTDECL(int) RTReqPoolQueryCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar, uint64_t *puValue)
{
    return VERR_NOT_SUPPORTED;
}


typedef enum RTREQPOOLSTAT
{
    RTREQPOOLSTAT_INVALID = 0,
    RTREQPOOLSTAT_END,
    RTREQPOOLSTAT_32BIT_HACK = 0x7fffffff
} RTREQPOOLSTAT;


RTDECL(uint64_t) RTReqPoolGetStat(RTREQPOOL hPool, RTREQPOOLSTAT enmStat)
{
    return UINT64_MAX;
}

#endif /* later */

RTDECL(uint32_t) RTReqPoolRetain(RTREQPOOL hPool)
{
    PRTREQPOOLINT pPool = hPool;
    AssertPtrReturn(pPool, UINT32_MAX);
    AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);

    return ASMAtomicIncU32(&pPool->cRefs);
}
RT_EXPORT_SYMBOL(RTReqPoolRetain);


RTDECL(uint32_t) RTReqPoolRelease(RTREQPOOL hPool)
{
    /*
     * Ignore NULL and validate the handle.
     */
    if (!hPool)
        return 0;
    PRTREQPOOLINT pPool = hPool;
    AssertPtrReturn(pPool, UINT32_MAX);
    AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);

    /*
     * Drop a reference, free it when it reaches zero.
     */
    uint32_t cRefs = ASMAtomicDecU32(&pPool->cRefs);
    if (cRefs == 0)
    {
        AssertReturn(ASMAtomicCmpXchgU32(&pPool->u32Magic, RTREQPOOL_MAGIC_DEAD, RTREQPOOL_MAGIC), UINT32_MAX);

        RTCritSectEnter(&pPool->CritSect);
#ifdef RT_STRICT
        RTTHREAD const hSelf = RTThreadSelf();
#endif

        /* Indicate to the worker threads that we're shutting down. */
        ASMAtomicWriteBool(&pPool->fDestructing, true);
        PRTREQPOOLTHREAD pThread;
        RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode)
        {
            Assert(pThread->hThread != hSelf);
            RTThreadUserSignal(pThread->hThread);
        }

        /* Cancel pending requests.  (There shouldn't be any, but drain the
           queue defensively anyway.) */
        Assert(!pPool->pPendingRequests);
        while (pPool->pPendingRequests)
        {
            PRTREQINT pReq = pPool->pPendingRequests;
            pPool->pPendingRequests = pReq->pNext;
            rtReqPoolCancelReq(pReq);
        }
        pPool->ppPendingRequests = NULL;

        /* Wait for the workers to shut down. */
        while (!RTListIsEmpty(&pPool->WorkerThreads))
        {
            RTCritSectLeave(&pPool->CritSect);
            RTSemEventMultiWait(pPool->hThreadTermEvt, RT_MS_1MIN);
            RTCritSectEnter(&pPool->CritSect);
            /** @todo should we wait forever here? */
        }

        /* Free recycled requests. */
        for (;;)
        {
            PRTREQINT pReq = pPool->pFreeRequests;
            if (!pReq)
                break;
            pPool->pFreeRequests = pReq->pNext;
            pPool->cCurFreeRequests--;
            rtReqFreeIt(pReq);
        }

        /* Finally, clean up the remaining resources and free the handle. */
        RTCritSectLeave(&pPool->CritSect);
        RTCritSectDelete(&pPool->CritSect);
        RTSemEventMultiDestroy(pPool->hThreadTermEvt);
        pPool->hThreadTermEvt = NIL_RTSEMEVENTMULTI;
        RTMemFree(pPool);
    }

    return cRefs;
}
RT_EXPORT_SYMBOL(RTReqPoolRelease);


RTDECL(int) RTReqPoolAlloc(RTREQPOOL hPool, RTREQTYPE enmType, PRTREQ *phReq)
{
    PRTREQPOOLINT pPool = hPool;
    AssertPtrReturn(pPool, VERR_INVALID_HANDLE);
    AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);

    /*
     * Try to recycle an old request.
     */
    if (ASMAtomicReadU32(&pPool->cCurFreeRequests) > 0)
    {
        RTCritSectEnter(&pPool->CritSect);
        PRTREQINT pReq = pPool->pFreeRequests;
        if (pReq)
        {
            ASMAtomicDecU32(&pPool->cCurFreeRequests);
            pPool->pFreeRequests = pReq->pNext;

            RTCritSectLeave(&pPool->CritSect);

            Assert(pReq->fPoolOrQueue);
            Assert(pReq->uOwner.hPool == pPool);

            int rc = rtReqReInit(pReq, enmType);
            if (RT_SUCCESS(rc))
            {
                *phReq = pReq;
                LogFlow(("RTReqPoolAlloc: returns VINF_SUCCESS *phReq=%p recycled\n", pReq));
                return rc;
            }
        }
        else
            RTCritSectLeave(&pPool->CritSect);
    }

    /*
     * Allocate a new request.
     */
    int rc = rtReqAlloc(enmType, true /*fPoolOrQueue*/, pPool, phReq);
    LogFlow(("RTReqPoolAlloc: returns %Rrc *phReq=%p\n", rc, *phReq));
    return rc; /* don't swallow rtReqAlloc failures */
}
RT_EXPORT_SYMBOL(RTReqPoolAlloc);
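
#if 0 /* Usage sketch, not part of this file: allocate a request from a pool
         and release it again.  Releasing is expected to feed rtReqPoolRecycle()
         above, so a subsequent RTReqPoolAlloc() will typically be served from
         the recycling LIFO.  The hPool handle is assumed to come from whatever
         pool creation API accompanies this code. */
    PRTREQ hReq;
    int rc = RTReqPoolAlloc(hPool, RTREQTYPE_INTERNAL, &hReq);
    if (RT_SUCCESS(rc))
        RTReqRelease(hReq);
#endif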
728