VirtualBox

source: vbox/trunk/src/VBox/Runtime/common/misc/reqpool.cpp@ 39517

Last change on this file since 39517 was 39517, checked in by vboxsync, 13 years ago

Some request thread pool musings.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 13.0 KB
Line 
1/* $Id: reqpool.cpp 39517 2011-12-02 15:58:27Z vboxsync $ */
2/** @file
3 * IPRT - Request Pool.
4 */
5
6/*
7 * Copyright (C) 2006-2011 Oracle Corporation
8 *
9 * This file is part of VirtualBox Open Source Edition (OSE), as
10 * available from http://www.virtualbox.org. This file is free software;
11 * you can redistribute it and/or modify it under the terms of the GNU
12 * General Public License (GPL) as published by the Free Software
13 * Foundation, in version 2 as it comes in the "COPYING" file of the
14 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
15 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
16 *
17 * The contents of this file may alternatively be used under the terms
18 * of the Common Development and Distribution License Version 1.0
19 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
20 * VirtualBox OSE distribution, in which case the provisions of the
21 * CDDL are applicable instead of those of the GPL.
22 *
23 * You may elect to license modified versions of this file under the
24 * terms and conditions of either the GPL or the CDDL or both.
25 */
26
27
28/*******************************************************************************
29* Header Files *
30*******************************************************************************/
31#include <iprt/req.h>
32#include "internal/iprt.h"
33
34#include <iprt/assert.h>
35#include <iprt/asm.h>
36#include <iprt/critsect.h>
37#include <iprt/list.h>
38#include <iprt/log.h>
39#include <iprt/mem.h>
40#include <iprt/string.h>
41#include <iprt/time.h>
42#include <iprt/semaphore.h>
43#include <iprt/thread.h>
44
45#include "internal/req.h"
46#include "internal/magics.h"
47
48
49/*******************************************************************************
50* Structures and Typedefs *
51*******************************************************************************/
/**
 * A request pool worker thread.
 */
typedef struct RTREQPOOLTHREAD
{
    /** Node in the RTREQPOOLINT::IdleThreads list (linked only while idle). */
    RTLISTNODE              IdleNode;
    /** Node in the RTREQPOOLINT::WorkerThreads list. */
    RTLISTNODE              ListNode;

    /** The submit timestamp of the pending request. */
    uint64_t                uPendingNanoTs;
    /** The timestamp at which processing of the current request started. */
    uint64_t                uProcessingNanoTs;
    /** When this thread went idle the last time. */
    uint64_t                uIdleNanoTs;
    /** The number of requests processed by this thread. */
    uint64_t                cReqProcessed;
    /** Total time the requests processed by this thread took to process. */
    uint64_t                cNsTotalReqProcessing;
    /** Total time the requests processed by this thread had to wait in
     * the queue before being scheduled. */
    uint64_t                cNsTotalReqQueued;
    /** The CPU this thread was scheduled on the last time we checked. */
    RTCPUID                 idLastCpu;

    /** The submitter will put an incoming request here when scheduling an idle
     * thread.  Read/written atomically (see rtReqPoolThreadProc). */
    PRTREQINT volatile      pTodoReq;
    /** The request the thread is currently processing. */
    PRTREQINT volatile      pPendingReq;

    /** The thread handle. */
    RTTHREAD                hThread;
    /** Nano seconds timestamp representing the birth time of the thread. */
    uint64_t                uBirthNanoTs;
    /** Pointer to the request thread pool instance the thread is associated
     *  with. */
    struct RTREQPOOLINT    *pPool;
} RTREQPOOLTHREAD;
/** Pointer to a worker thread. */
typedef RTREQPOOLTHREAD *PRTREQPOOLTHREAD;
91
/**
 * Request thread pool instance data.
 */
typedef struct RTREQPOOLINT
{
    /** Magic value (RTREQPOOL_MAGIC). */
    uint32_t                u32Magic;

    /** The worker thread type. */
    RTTHREADTYPE            enmThreadType;
    /** The current number of worker threads. */
    uint32_t                cCurThreads;
    /** The maximum number of worker threads. */
    uint32_t                cMaxThreads;
    /** The number of threads which should be spawned before throttling kicks
     * in. */
    uint32_t                cThreadsThreshold;
    /** The minimum number of worker threads. */
    uint32_t                cMinThreads;
    /** The number of milliseconds a thread needs to be idle before it is
     * considered for retirement. */
    uint32_t                cMsMinIdle;
    /** The max number of milliseconds to push back a submitter before creating
     * a new worker thread once the threshold has been reached. */
    uint32_t                cMsMaxPushBack;
    /** The minimum number of milliseconds to push back a submitter before
     * creating a new worker thread once the threshold has been reached. */
    uint32_t                cMsMinPushBack;
    /** The current submitter push back in milliseconds.
     * This is recalculated when worker threads come and go. */
    uint32_t                cMsCurPushBack;

    /** Statistics: The total number of threads created. */
    uint32_t                cThreadsCreated;
    /** Statistics: The timestamp when the last thread was created. */
    uint64_t                uLastThreadCreateNanoTs;
    /** Linked list of worker threads. */
    RTLISTANCHOR            WorkerThreads;

    /** Event semaphore that submitters block on when pushing back. */
    RTSEMEVENT              hPushBackEvt;

    /** Critical section serializing access to members of this structure.  */
    RTCRITSECT              CritSect;

    /** Destruction indicator.  The worker threads check this in their loop. */
    bool volatile           fDestructing;

    /** Reference counter. */
    uint32_t volatile       cRefs;
    /** Number of threads pushing back. */
    uint32_t volatile       cPushingBack;
    /** The number of idle threads or threads in the process of becoming
     * idle.  This is increased before the to-be-idle thread tries to enter
     * the critical section and add itself to the list. */
    uint32_t volatile       cIdleThreads;
    /** Linked list of idle threads. */
    RTLISTANCHOR            IdleThreads;

    /** Head of the request FIFO. */
    PRTREQINT               pPendingRequests;
    /** Where to insert the next request (points at the head pointer or the
     * pNext member of the last queued request). */
    PRTREQINT              *ppPendingRequests;

} RTREQPOOLINT;
/** Pointer to a request thread pool instance. */
typedef RTREQPOOLINT *PRTREQPOOLINT;
159
160
161static void rtReqPoolRecalcPushBack(PRTREQPOOLINT pPool)
162{
163 uint32_t const cMsRange = pPool->cMsMaxPushBack - pPool->cMsMinPushBack;
164 uint32_t const cSteps = pPool->cMaxThreads - pPool->cThreadsThreshold;
165 uint32_t const iStep = pPool->cCurThreads - pPool->cThreadsThreshold;
166
167 uint32_t cMsCurPushBack;
168 if ((cMsRange >> 2) >= cSteps)
169 cMsCurPushBack = cMsRange / cSteps * iStep;
170 else
171 cMsCurPushBack = (uint32_t)( (uint64_t)cMsRange * RT_NS_1MS / cSteps * iStep / RT_NS_1MS );
172 cMsCurPushBack += pPool->cMsMinPushBack;
173
174 pPool->cMsCurPushBack = cMsCurPushBack;
175}
176
177
178
179static void rtReqPoolThreadProcessRequest(PRTREQPOOLTHREAD pThread, PRTREQINT pReq)
180{
181 /*
182 * Update thread state.
183 */
184 pThread->uProcessingNanoTs = RTTimeNanoTS();
185 pThread->uPendingNanoTs = pReq->uSubmitNanoTs;
186 pThread->pPendingReq = pReq;
187 Assert(pReq->u32Magic == RTREQ_MAGIC);
188
189 /*
190 * Do the actual processing.
191 */
192 /** @todo */
193
194 /*
195 * Update thread statistics and state.
196 */
197 uint64_t const uNsTsEnd = RTTimeNanoTS();
198 pThread->cNsTotalReqProcessing += uNsTsEnd - pThread->uProcessingNanoTs;
199 pThread->cNsTotalReqQueued += uNsTsEnd - pThread->uPendingNanoTs;
200 pThread->cReqProcessed++;
201}
202
203
204
205static DECLCALLBACK(int) rtReqPoolThreadProc(RTTHREAD hThreadSelf, void *pvArg)
206{
207 PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)pvArg;
208 PRTREQPOOLINT pPool = pThread->pPool;
209
210 /*
211 * The work loop.
212 */
213 uint64_t cPrevReqProcessed = 0;
214 while (pPool->fDestructing)
215 {
216 /*
217 * Pending work?
218 */
219 PRTREQINT pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
220 if (pReq)
221 rtReqPoolThreadProcessRequest(pThread, pReq);
222 else
223 {
224 ASMAtomicIncU32(&pPool->cIdleThreads);
225 RTCritSectEnter(&pPool->CritSect);
226
227 /* Recheck the todo request pointer after entering the critsect. */
228 pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
229 if (!pReq)
230 {
231 /* Any pending requests in the queue? */
232 pReq = pPool->pPendingRequests;
233 if (pReq)
234 {
235 pPool->pPendingRequests = pReq->pNext;
236 if (pReq->pNext == NULL)
237 pPool->ppPendingRequests = &pPool->pPendingRequests;
238 }
239 }
240
241 if (pReq)
242 {
243 /*
244 * Un-idle ourselves and process the request.
245 */
246 if (!RTListIsEmpty(&pThread->IdleNode))
247 {
248 RTListNodeRemove(&pThread->IdleNode);
249 RTListInit(&pThread->IdleNode);
250 }
251 ASMAtomicDecU32(&pPool->cIdleThreads);
252 RTCritSectLeave(&pPool->CritSect);
253
254 rtReqPoolThreadProcessRequest(pThread, pReq);
255 }
256 else
257 {
258 /*
259 * Nothing to do, go idle.
260 */
261 if (cPrevReqProcessed != pThread->cReqProcessed)
262 {
263 pThread->cReqProcessed = cPrevReqProcessed;
264 pThread->uIdleNanoTs = RTTimeNanoTS();
265 }
266
267 if (RTListIsEmpty(&pThread->IdleNode))
268 RTListPrepend(&pPool->IdleThreads, &pThread->IdleNode);
269 RTThreadUserReset(hThreadSelf);
270
271 RTCritSectLeave(&pPool->CritSect);
272
273 RTThreadUserWait(hThreadSelf, 0);
274
275
276
277 }
278 }
279 }
280
281 /*
282 * Clean up on the way out.
283 */
284 RTCritSectEnter(&pPool->CritSect);
285
286 /** @todo .... */
287
288 rtReqPoolRecalcPushBack(pPool);
289
290 RTCritSectLeave(&pPool->CritSect);
291
292 return VINF_SUCCESS;
293}
294
295
296DECLHIDDEN(int) rtReqPoolSubmit(PRTREQPOOLINT pPool, PRTREQINT pReq)
297{
298 /*
299 * Prepare the request.
300 */
301 pReq->uSubmitNanoTs = RTTimeNanoTS();
302
303
304 RTCritSectEnter(&pPool->CritSect);
305
306 /*
307 * Try schedule the request to any currently idle thread.
308 */
309 PRTREQPOOLTHREAD pThread = RTListGetFirst(&pPool->IdleThreads, RTREQPOOLTHREAD, IdleNode);
310 if (pThread)
311 {
312 /** @todo CPU affinity... */
313 ASMAtomicWritePtr(&pThread->pTodoReq, pReq);
314
315 RTListNodeRemove(&pThread->IdleNode);
316 RTListInit(&pThread->IdleNode);
317 ASMAtomicDecU32(&pPool->cIdleThreads);
318
319 RTThreadUserSignal(pThread->hThread);
320
321 RTCritSectLeave(&pPool->CritSect);
322 return VINF_SUCCESS;
323 }
324 Assert(RTListIsEmpty(&pPool->IdleThreads));
325
326 /*
327 * Put the request in the pending queue.
328 */
329 pReq->pNext = NULL;
330 *pPool->ppPendingRequests = pReq;
331 pPool->ppPendingRequests = (PRTREQINT*)&pReq->pNext;
332
333 /*
334 * If there is an incoming worker thread already or we've reached the
335 * maximum number of worker threads, we're done.
336 */
337 if ( pPool->cIdleThreads > 0
338 || pPool->cCurThreads >= pPool->cMaxThreads)
339 {
340 RTCritSectLeave(&pPool->CritSect);
341 return VINF_SUCCESS;
342 }
343
344 /*
345 * Push back before creating a new worker thread.
346 */
347 if ( pPool->cCurThreads > pPool->cThreadsThreshold
348 && (RTTimeNanoTS() - pReq->uSubmitNanoTs) / RT_NS_1MS >= pPool->cMsCurPushBack )
349 {
350 uint32_t const cMsTimeout = pPool->cMsCurPushBack;
351 pPool->cPushingBack++;
352 RTCritSectLeave(&pPool->CritSect);
353
354 /** @todo this is everything but perfect... it makes wake up order
355 * assumptions. A better solution would be having a lazily
356 * allocated push back event on each request. */
357 int rc = RTSemEventWait(pPool->hPushBackEvt, cMsTimeout);
358
359 RTCritSectEnter(&pPool->CritSect);
360 pPool->cPushingBack--;
361 }
362
363 /*
364 * Create a new thread for processing the request, or should we wait?
365 */
366 pThread = (PRTREQPOOLTHREAD)RTMemAllocZ(sizeof(RTREQPOOLTHREAD));
367 if (pThread)
368 {
369 pThread->uBirthNanoTs = RTTimeNanoTS();
370 pThread->pPool = pPool;
371 pThread->idLastCpu = NIL_RTCPUID;
372 pThread->hThread = NIL_RTTHREAD;
373 RTListInit(&pThread->IdleNode);
374 RTListAppend(&pPool->WorkerThreads, &pThread->ListNode);
375 pPool->cCurThreads++;
376 pPool->cThreadsCreated++;
377
378 static uint32_t s_idThread = 0;
379 int rc = RTThreadCreateF(&pThread->hThread, rtReqPoolThreadProc, pThread, 0 /*default stack size*/,
380 pPool->enmThreadType, RTTHREADFLAGS_WAITABLE, "REQPT%02u", ++s_idThread);
381 if (RT_SUCCESS(rc))
382 pPool->uLastThreadCreateNanoTs = pThread->uBirthNanoTs;
383 else
384 {
385 pPool->cCurThreads--;
386 RTListNodeRemove(&pThread->ListNode);
387 RTMemFree(pThread);
388 }
389 }
390 RTCritSectLeave(&pPool->CritSect);
391
392 return VINF_SUCCESS;
393}
394
Note: See TracBrowser for help on using the repository browser.

© 2025 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette