VirtualBox

source: vbox/trunk/src/VBox/Runtime/common/misc/reqpool.cpp@39545

Last change on this file since 39545 was 39545, checked in by vboxsync on 2011-12-07:

laptop -> workstation.

/* $Id: reqpool.cpp 39545 2011-12-07 10:27:29Z vboxsync $ */
/** @file
 * IPRT - Request Pool.
 */

/*
 * Copyright (C) 2006-2011 Oracle Corporation
 *
 * This file is part of VirtualBox Open Source Edition (OSE), as
 * available from http://www.virtualbox.org. This file is free software;
 * you can redistribute it and/or modify it under the terms of the GNU
 * General Public License (GPL) as published by the Free Software
 * Foundation, in version 2 as it comes in the "COPYING" file of the
 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
 *
 * The contents of this file may alternatively be used under the terms
 * of the Common Development and Distribution License Version 1.0
 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
 * VirtualBox OSE distribution, in which case the provisions of the
 * CDDL are applicable instead of those of the GPL.
 *
 * You may elect to license modified versions of this file under the
 * terms and conditions of either the GPL or the CDDL or both.
 */


/*******************************************************************************
*   Header Files                                                               *
*******************************************************************************/
#include <iprt/req.h>
#include "internal/iprt.h"

#include <iprt/assert.h>
#include <iprt/asm.h>
#include <iprt/critsect.h>
#include <iprt/list.h>
#include <iprt/log.h>
#include <iprt/mem.h>
#include <iprt/string.h>
#include <iprt/time.h>
#include <iprt/semaphore.h>
#include <iprt/thread.h>

#include "internal/req.h"
#include "internal/magics.h"


/*******************************************************************************
*   Structures and Typedefs                                                    *
*******************************************************************************/
/**
 * A request pool worker thread.
 */
typedef struct RTREQPOOLTHREAD
{
    /** Node in the RTREQPOOLINT::IdleThreads list. */
    RTLISTNODE              IdleNode;
    /** Node in the RTREQPOOLINT::WorkerThreads list. */
    RTLISTNODE              ListNode;

    /** The submit timestamp of the pending request. */
    uint64_t                uPendingNanoTs;
    /** The timestamp at which processing of the current request started. */
    uint64_t                uProcessingNanoTs;
    /** When this thread last went idle. */
    uint64_t                uIdleNanoTs;
    /** The number of requests processed by this thread. */
    uint64_t                cReqProcessed;
    /** Total time the requests processed by this thread took to process. */
    uint64_t                cNsTotalReqProcessing;
    /** Total time the requests processed by this thread had to wait in
     * the queue before being scheduled. */
    uint64_t                cNsTotalReqQueued;
    /** The CPU this thread was running on the last time we checked. */
    RTCPUID                 idLastCpu;

    /** The submitter will put an incoming request here when scheduling an idle
     * thread. */
    PRTREQINT volatile      pTodoReq;
    /** The request the thread is currently processing. */
    PRTREQINT volatile      pPendingReq;

    /** The thread handle. */
    RTTHREAD                hThread;
    /** Nanosecond timestamp of when the thread was born. */
    uint64_t                uBirthNanoTs;
    /** Pointer to the request thread pool instance the thread is associated
     * with. */
    struct RTREQPOOLINT    *pPool;
} RTREQPOOLTHREAD;
/** Pointer to a worker thread. */
typedef RTREQPOOLTHREAD *PRTREQPOOLTHREAD;

/**
 * Request thread pool instance data.
 */
typedef struct RTREQPOOLINT
{
    /** Magic value (RTREQPOOL_MAGIC). */
    uint32_t                u32Magic;

    /** The worker thread type. */
    RTTHREADTYPE            enmThreadType;
    /** The current number of worker threads. */
    uint32_t                cCurThreads;
    /** The maximum number of worker threads. */
    uint32_t                cMaxThreads;
    /** The number of threads which should be spawned before throttling kicks
     * in. */
    uint32_t                cThreadsThreshold;
    /** The minimum number of worker threads. */
    uint32_t                cMinThreads;
    /** The number of milliseconds a thread needs to be idle before it is
     * considered for retirement. */
    uint32_t                cMsMinIdle;
    /** The maximum number of milliseconds to push back a submitter before
     * creating a new worker thread once the threshold has been reached. */
    uint32_t                cMsMaxPushBack;
    /** The minimum number of milliseconds to push back a submitter before
     * creating a new worker thread once the threshold has been reached. */
    uint32_t                cMsMinPushBack;
    /** The current submitter push back in milliseconds.
     * This is recalculated when worker threads come and go. */
    uint32_t                cMsCurPushBack;

    /** Statistics: The total number of threads created. */
    uint32_t                cThreadsCreated;
    /** Statistics: The timestamp when the last thread was created. */
    uint64_t                uLastThreadCreateNanoTs;
    /** Linked list of worker threads. */
    RTLISTANCHOR            WorkerThreads;

    /** Event semaphore that submitters block on when pushing back. */
    RTSEMEVENT              hPushBackEvt;

    /** Critical section serializing access to members of this structure. */
    RTCRITSECT              CritSect;

    /** Destruction indicator. The worker threads check this in their loop. */
    bool volatile           fDestructing;

    /** Reference counter. */
    uint32_t volatile       cRefs;
    /** Number of threads pushing back. */
    uint32_t volatile       cPushingBack;
    /** The number of idle threads, including threads in the process of
     * becoming idle.  This is increased before the to-be-idle thread tries
     * to enter the critical section and add itself to the list. */
    uint32_t volatile       cIdleThreads;
    /** Linked list of idle threads. */
    RTLISTANCHOR            IdleThreads;

    /** Head of the request FIFO. */
    PRTREQINT               pPendingRequests;
    /** Where to insert the next request (points at the tail pNext pointer,
     * or at pPendingRequests itself when the queue is empty). */
    PRTREQINT              *ppPendingRequests;
} RTREQPOOLINT;
/** Pointer to a request thread pool instance. */
typedef RTREQPOOLINT *PRTREQPOOLINT;
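
/*
 * Implementation note: pPendingRequests and ppPendingRequests form the
 * classic singly linked FIFO with an indirect tail pointer, so neither
 * insertion nor removal needs head/tail special casing.  A minimal sketch
 * of the two operations, as used by rtReqPoolSubmit and the worker loop:
 *
 *      Append at the tail:
 *          pReq->pNext = NULL;
 *          *pPool->ppPendingRequests = pReq;
 *          pPool->ppPendingRequests  = &pReq->pNext;
 *
 *      Pop at the head:
 *          pReq = pPool->pPendingRequests;
 *          pPool->pPendingRequests = pReq->pNext;
 *          if (!pReq->pNext)
 *              pPool->ppPendingRequests = &pPool->pPendingRequests;
 */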


/**
 * Recalculates the current push back time (cMsCurPushBack), linearly
 * interpolating between cMsMinPushBack and cMsMaxPushBack based on how far
 * the current thread count has advanced from the threshold towards the
 * maximum.
 */
static void rtReqPoolRecalcPushBack(PRTREQPOOLINT pPool)
{
    uint32_t const cMsRange = pPool->cMsMaxPushBack - pPool->cMsMinPushBack;
    uint32_t const cSteps   = pPool->cMaxThreads - pPool->cThreadsThreshold;
    uint32_t const iStep    = pPool->cCurThreads - pPool->cThreadsThreshold;

    uint32_t cMsCurPushBack;
    if (cSteps == 0) /* Guard against division by zero when the threshold equals the maximum. */
        cMsCurPushBack = 0;
    else if ((cMsRange >> 2) >= cSteps)
        cMsCurPushBack = cMsRange / cSteps * iStep;
    else
        cMsCurPushBack = (uint32_t)( (uint64_t)cMsRange * RT_NS_1MS / cSteps * iStep / RT_NS_1MS );
    cMsCurPushBack += pPool->cMsMinPushBack;

    pPool->cMsCurPushBack = cMsCurPushBack;
}
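
/*
 * Worked example of the interpolation above, using illustrative values (not
 * defaults from this file): cMsMinPushBack = 5, cMsMaxPushBack = 250,
 * cThreadsThreshold = 10, cMaxThreads = 20 and cCurThreads = 15 give
 *      cMsRange = 245, cSteps = 10, iStep = 5
 * and since (245 >> 2) = 61 >= 10 the cheap 32-bit path is taken:
 *      cMsCurPushBack = 245 / 10 * 5 + 5 = 125 ms.
 * The 64-bit path exists for the case where the 32-bit division
 * cMsRange / cSteps would round away too much of the range.
 */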


/**
 * Processes one request on behalf of a worker thread, updating the thread
 * state and statistics around the actual work.
 */
static void rtReqPoolThreadProcessRequest(PRTREQPOOLTHREAD pThread, PRTREQINT pReq)
{
    /*
     * Update thread state.
     */
    pThread->uProcessingNanoTs = RTTimeNanoTS();
    pThread->uPendingNanoTs    = pReq->uSubmitNanoTs;
    pThread->pPendingReq       = pReq;
    Assert(pReq->u32Magic == RTREQ_MAGIC);

    /*
     * Do the actual processing.
     */
    /** @todo */

    /*
     * Update thread statistics and state.
     */
    uint64_t const uNsTsEnd = RTTimeNanoTS();
    pThread->cNsTotalReqProcessing += uNsTsEnd - pThread->uProcessingNanoTs;
    pThread->cNsTotalReqQueued     += pThread->uProcessingNanoTs - pThread->uPendingNanoTs;
    pThread->cReqProcessed++;
    pThread->pPendingReq = NULL;
}


/**
 * The worker thread procedure.
 */
static DECLCALLBACK(int) rtReqPoolThreadProc(RTTHREAD hThreadSelf, void *pvArg)
{
    PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)pvArg;
    PRTREQPOOLINT    pPool   = pThread->pPool;

    /*
     * The work loop.
     */
    uint64_t cPrevReqProcessed = 0;
    while (!pPool->fDestructing)
    {
        /*
         * Pending work?
         */
        PRTREQINT pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
        if (pReq)
            rtReqPoolThreadProcessRequest(pThread, pReq);
        else
        {
            ASMAtomicIncU32(&pPool->cIdleThreads);
            RTCritSectEnter(&pPool->CritSect);

            /* Recheck the todo request pointer after entering the critsect. */
            pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
            if (!pReq)
            {
                /* Any pending requests in the queue? */
                pReq = pPool->pPendingRequests;
                if (pReq)
                {
                    pPool->pPendingRequests = pReq->pNext;
                    if (pReq->pNext == NULL)
                        pPool->ppPendingRequests = &pPool->pPendingRequests;
                }
            }

            if (pReq)
            {
                /*
                 * Un-idle ourselves and process the request.
                 */
                if (!RTListIsEmpty(&pThread->IdleNode))
                {
                    RTListNodeRemove(&pThread->IdleNode);
                    RTListInit(&pThread->IdleNode);
                }
                ASMAtomicDecU32(&pPool->cIdleThreads);
                RTCritSectLeave(&pPool->CritSect);

                rtReqPoolThreadProcessRequest(pThread, pReq);
            }
            else
            {
                /*
                 * Nothing to do, go idle.
                 */
                if (cPrevReqProcessed != pThread->cReqProcessed)
                {
                    cPrevReqProcessed    = pThread->cReqProcessed;
                    pThread->uIdleNanoTs = RTTimeNanoTS();
                }

                if (RTListIsEmpty(&pThread->IdleNode))
                    RTListPrepend(&pPool->IdleThreads, &pThread->IdleNode);
                RTThreadUserReset(hThreadSelf);

                RTCritSectLeave(&pPool->CritSect);

                /* Block until a submitter signals us. */
                RTThreadUserWait(hThreadSelf, RT_INDEFINITE_WAIT);
            }
        }
    }

    /*
     * Clean up on the way out.
     */
    RTCritSectEnter(&pPool->CritSect);

    /** @todo .... */

    rtReqPoolRecalcPushBack(pPool);

    RTCritSectLeave(&pPool->CritSect);

    return VINF_SUCCESS;
}
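
/*
 * Note on the hand-off protocol: rtReqPoolSubmit gives work directly to an
 * idle thread by writing pThread->pTodoReq and then waking the thread with
 * RTThreadUserSignal.  The worker consumes the request with an atomic
 * exchange and rechecks pTodoReq after entering the critical section, so a
 * request scheduled while the thread is still on its way to the idle list
 * cannot be lost.
 */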

/**
 * Creates a new worker thread for the pool.  The caller owns the critical
 * section.
 */
static void rtReqPoolCreateNewWorker(RTREQPOOL pPool)
{
    PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)RTMemAllocZ(sizeof(RTREQPOOLTHREAD));
    if (!pThread)
        return;

    pThread->uBirthNanoTs = RTTimeNanoTS();
    pThread->pPool        = pPool;
    pThread->idLastCpu    = NIL_RTCPUID;
    pThread->hThread      = NIL_RTTHREAD;
    RTListInit(&pThread->IdleNode);
    RTListAppend(&pPool->WorkerThreads, &pThread->ListNode);
    pPool->cCurThreads++;
    pPool->cThreadsCreated++;

    static uint32_t s_idThread = 0;
    int rc = RTThreadCreateF(&pThread->hThread, rtReqPoolThreadProc, pThread, 0 /*default stack size*/,
                             pPool->enmThreadType, RTTHREADFLAGS_WAITABLE, "REQPT%02u", ++s_idThread);
    if (RT_SUCCESS(rc))
        pPool->uLastThreadCreateNanoTs = pThread->uBirthNanoTs;
    else
    {
        pPool->cCurThreads--;
        RTListNodeRemove(&pThread->ListNode);
        RTMemFree(pThread);
    }
}


/**
 * Submits a request to the pool.
 */
DECLHIDDEN(int) rtReqPoolSubmit(PRTREQPOOLINT pPool, PRTREQINT pReq)
{
    /*
     * Prepare the request.
     */
    pReq->uSubmitNanoTs = RTTimeNanoTS();

    RTCritSectEnter(&pPool->CritSect);

    /*
     * Try to schedule the request on a currently idle thread.
     */
    PRTREQPOOLTHREAD pThread = RTListGetFirst(&pPool->IdleThreads, RTREQPOOLTHREAD, IdleNode);
    if (pThread)
    {
        /** @todo CPU affinity... */
        ASMAtomicWritePtr(&pThread->pTodoReq, pReq);

        RTListNodeRemove(&pThread->IdleNode);
        RTListInit(&pThread->IdleNode);
        ASMAtomicDecU32(&pPool->cIdleThreads);

        RTThreadUserSignal(pThread->hThread);

        RTCritSectLeave(&pPool->CritSect);
        return VINF_SUCCESS;
    }
    Assert(RTListIsEmpty(&pPool->IdleThreads));

    /*
     * Put the request in the pending queue.
     */
    pReq->pNext = NULL;
    *pPool->ppPendingRequests = pReq;
    pPool->ppPendingRequests  = (PRTREQINT *)&pReq->pNext;

    /*
     * If a worker thread is on its way to the idle state (it will recheck
     * the queue) or we have already reached the maximum number of worker
     * threads, we're done.
     */
    if (   pPool->cIdleThreads > 0
        || pPool->cCurThreads >= pPool->cMaxThreads)
    {
        RTCritSectLeave(&pPool->CritSect);
        return VINF_SUCCESS;
    }

    /*
     * Push back before creating a new worker thread.
     */
    if (   pPool->cCurThreads > pPool->cThreadsThreshold
        && (RTTimeNanoTS() - pReq->uSubmitNanoTs) / RT_NS_1MS >= pPool->cMsCurPushBack)
    {
        RTSEMEVENTMULTI hEvt = pReq->hPushBackEvt;
        if (hEvt == NIL_RTSEMEVENTMULTI)
        {
            int rc = RTSemEventMultiCreate(&hEvt);
            if (RT_SUCCESS(rc))
                pReq->hPushBackEvt = hEvt;
            else
                hEvt = NIL_RTSEMEVENTMULTI;
        }
        if (hEvt != NIL_RTSEMEVENTMULTI)
        {
            uint32_t const cMsTimeout = pPool->cMsCurPushBack;
            pPool->cPushingBack++;
            RTCritSectLeave(&pPool->CritSect);

            /** @todo this is everything but perfect... it makes wake up order
             *        assumptions. A better solution would be having a lazily
             *        allocated push back event on each request. */
            RTSemEventWait(pPool->hPushBackEvt, cMsTimeout);

            RTCritSectEnter(&pPool->CritSect);
            pPool->cPushingBack--;
            /** @todo check if it's still on the list before going on. */
        }
    }

    /*
     * Create a new thread for processing the request.
     * For simplicity, we don't bother leaving the critical section while doing so.
     */
    rtReqPoolCreateNewWorker(pPool);

    RTCritSectLeave(&pPool->CritSect);
    return VINF_SUCCESS;
}

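/*
 * Counterpart sketch (hypothetical; the wake-up side is not implemented in
 * this revision): whatever code completes a request or idles a worker would
 * be expected to release submitters blocked in the push back wait above,
 * along these lines:
 *
 *      if (ASMAtomicReadU32(&pPool->cPushingBack) > 0)
 *          RTSemEventSignal(pPool->hPushBackEvt);
 */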