VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testmanager/core/schedulerbase.py@ 66998

Last change on this file since 66998 was 66998, checked in by vboxsync, 8 years ago

ReCreateQueueData:deepTestGroupSort: more error diagnotics

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 66.5 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: schedulerbase.py 66998 2017-05-22 09:28:07Z vboxsync $
3# pylint: disable=C0302
4
5
6"""
7Test Manager - Base class and utilities for the schedulers.
8"""
9
10__copyright__ = \
11"""
12Copyright (C) 2012-2016 Oracle Corporation
13
14This file is part of VirtualBox Open Source Edition (OSE), as
15available from http://www.215389.xyz. This file is free software;
16you can redistribute it and/or modify it under the terms of the GNU
17General Public License (GPL) as published by the Free Software
18Foundation, in version 2 as it comes in the "COPYING" file of the
19VirtualBox OSE distribution. VirtualBox OSE is distributed in the
20hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
21
22The contents of this file may alternatively be used under the terms
23of the Common Development and Distribution License Version 1.0
24(CDDL) only, as it comes in the "COPYING.CDDL" file of the
25VirtualBox OSE distribution, in which case the provisions of the
26CDDL are applicable instead of those of the GPL.
27
28You may elect to license modified versions of this file under the
29terms and conditions of either the GPL or the CDDL or both.
30"""
31__version__ = "$Revision: 66998 $"
32
33
34# Standard python imports.
35import unittest;
36
37# Validation Kit imports.
38from common import utils, constants;
39from testmanager import config;
40from testmanager.core.build import BuildDataEx, BuildLogic;
41from testmanager.core.base import ModelDataBase, ModelDataBaseTestCase, TMExceptionBase;
42from testmanager.core.buildsource import BuildSourceData, BuildSourceLogic;
43from testmanager.core.globalresource import GlobalResourceLogic;
44from testmanager.core.schedgroup import SchedGroupData, SchedGroupLogic;
45from testmanager.core.systemlog import SystemLogData, SystemLogLogic;
46from testmanager.core.testbox import TestBoxData, TestBoxDataEx;
47from testmanager.core.testboxstatus import TestBoxStatusData, TestBoxStatusLogic;
48from testmanager.core.testcase import TestCaseLogic;
49from testmanager.core.testcaseargs import TestCaseArgsDataEx, TestCaseArgsLogic;
50from testmanager.core.testset import TestSetData, TestSetLogic;
51
52
53class ReCreateQueueData(object):
54 """
55 Data object for recreating a scheduling queue.
56
57 It's mostly a storage object, but has a few data checking operation
58 associated with it.
59 """
60
61 def __init__(self, oDb, idSchedGroup):
62 #
63 # Load data from the database.
64 #
65
66 # Will extend the entries with aoTestCases and dTestCases members
67 # further down. checkForGroupDepCycles will add aidTestGroupPreReqs.
68 self.aoTestGroups = SchedGroupLogic(oDb).getMembers(idSchedGroup);
69
70 # aoTestCases entries are TestCaseData instance with iSchedPriority
71 # and idTestGroup added for our purposes.
72 # We will add oTestGroup and aoArgsVariations members to each further down.
73 self.aoTestCases = SchedGroupLogic(oDb).getTestCasesForGroup(idSchedGroup, cMax = 4096);
74
75 # Load dependencies.
76 oTestCaseLogic = TestCaseLogic(oDb)
77 for oTestCase in self.aoTestCases:
78 oTestCase.aidPreReqs = oTestCaseLogic.getTestCasePreReqIds(oTestCase.idTestCase, cMax = 4096);
79
80 # aoTestCases entries are TestCaseArgsData instance with iSchedPriority
81 # and idTestGroup added for our purposes.
82 # We will add oTestGroup and oTestCase members to each further down.
83 self.aoArgsVariations = SchedGroupLogic(oDb).getTestCaseArgsForGroup(idSchedGroup, cMax = 65536);
84
85 #
86 # Generate global lookups.
87 #
88
89 # Generate a testcase lookup dictionary for use when working on
90 # argument variations.
91 self.dTestCases = dict();
92 for oTestCase in self.aoTestCases:
93 self.dTestCases[oTestCase.idTestCase] = oTestCase;
94 assert len(self.dTestCases) <= len(self.aoTestCases); # Note! Can be shorter!
95
96 # Generate a testgroup lookup dictionary.
97 self.dTestGroups = dict();
98 for oTestGroup in self.aoTestGroups:
99 self.dTestGroups[oTestGroup.idTestGroup] = oTestGroup;
100 assert len(self.dTestGroups) == len(self.aoTestGroups);
101
102 #
103 # Associate extra members with the base data.
104 #
105 if self.aoTestGroups:
106 # Prep the test groups.
107 for oTestGroup in self.aoTestGroups:
108 oTestGroup.aoTestCases = list();
109 oTestGroup.dTestCases = dict();
110
111 # Link testcases to their group, both directions. Prep testcases for
112 # argument varation association.
113 oTestGroup = self.aoTestGroups[0];
114 for oTestCase in self.aoTestCases:
115 if oTestGroup.idTestGroup != oTestCase.idTestGroup:
116 oTestGroup = self.dTestGroups[oTestCase.idTestGroup];
117
118 assert oTestCase.idTestCase not in oTestGroup.dTestCases;
119 oTestGroup.dTestCases[oTestCase.idTestCase] = oTestCase;
120 oTestGroup.aoTestCases.append(oTestCase);
121 oTestCase.oTestGroup = oTestGroup;
122 oTestCase.aoArgsVariations = list();
123
124 # Associate testcase argument variations with their testcases (group)
125 # in both directions.
126 oTestGroup = self.aoTestGroups[0];
127 oTestCase = self.aoTestCases[0] if self.aoTestCases else None;
128 for oArgVariation in self.aoArgsVariations:
129 if oTestGroup.idTestGroup != oArgVariation.idTestGroup:
130 oTestGroup = self.dTestGroups[oArgVariation.idTestGroup];
131 if oTestCase.idTestCase != oArgVariation.idTestCase or oTestCase.idTestGroup != oArgVariation.idTestGroup:
132 oTestCase = oTestGroup.dTestCases[oArgVariation.idTestCase];
133
134 oTestCase.aoArgsVariations.append(oArgVariation);
135 oArgVariation.oTestCase = oTestCase;
136 oArgVariation.oTestGroup = oTestGroup;
137
138 else:
139 assert not self.aoTestCases;
140 assert not self.aoArgsVariations;
141 # done.
142
143 @staticmethod
144 def _addPreReqError(aoErrors, aidChain, oObj, sMsg):
145 """ Returns a chain of IDs error entry. """
146
147 sMsg += ' Dependency chain: %s' % (aidChain[0],);
148 for i in range(1, len(aidChain)):
149 sMsg += ' -> %s' % (aidChain[i],);
150
151 aoErrors.append([sMsg, oObj]);
152 return aoErrors;
153
154 def checkForGroupDepCycles(self):
155 """
156 Checks for testgroup depencency cycles and any missing testgroup
157 dependencies.
158 Returns array of errors (see SchedulderBase.recreateQueue()).
159 """
160 aoErrors = list();
161 for oTestGroup in self.aoTestGroups:
162 idPreReq = oTestGroup.idTestGroupPreReq;
163 if idPreReq is None:
164 oTestGroup.aidTestGroupPreReqs = list();
165 continue;
166
167 aidChain = [oTestGroup.idTestGroup,];
168 while idPreReq is not None:
169 aidChain.append(idPreReq);
170 if len(aidChain) >= 10:
171 self._addPreReqError(aoErrors, aidChain, oTestGroup,
172 'TestGroup #%s prerequisite chain is too long!'
173 % (oTestGroup.idTestGroup,));
174 break;
175
176 oDep = self.dTestGroups.get(idPreReq, None);
177 if oDep is None:
178 self._addPreReqError(aoErrors, aidChain, oTestGroup,
179 'TestGroup #%s prerequisite #%s is not in the scheduling group!'
180 % (oTestGroup.idTestGroup, idPreReq,));
181 break;
182
183 idPreReq = oDep.idTestGroupPreReq;
184 oTestGroup.aidTestGroupPreReqs = aidChain[1:];
185
186 return aoErrors;
187
188
189 def checkForMissingTestCaseDeps(self):
190 """
191 Checks that testcase dependencies stays within bounds. We do not allow
192 dependencies outside a testgroup, no dependency cycles or even remotely
193 long dependency chains.
194
195 Returns array of errors (see SchedulderBase.recreateQueue()).
196 """
197 aoErrors = list();
198 for oTestGroup in self.aoTestGroups:
199 for oTestCase in oTestGroup.aoTestCases:
200 if not oTestCase.aidPreReqs:
201 continue;
202
203 # Stupid recursion code using special stack(s).
204 aiIndexes = [[oTestCase, 0], ];
205 aidChain = [oTestCase.idTestGroup,];
206 while aiIndexes:
207 (oCur, i) = aiIndexes[-1];
208 if i >= len(oCur.aidPreReqs):
209 aiIndexes.pop();
210 aidChain.pop();
211 else:
212 aiIndexes[-1][1] = i + 1; # whatever happens, we'll advance on the current level.
213
214 idPreReq = oTestCase.aidPreReqs[i];
215 oDep = oTestGroup.dTestCases.get(idPreReq, None);
216 if oDep is None:
217 self._addPreReqError(aoErrors, aidChain, oTestCase,
218 'TestCase #%s prerequisite #%s is not in the scheduling group!'
219 % (oTestCase.idTestCase, idPreReq));
220 elif idPreReq in aidChain:
221 self._addPreReqError(aoErrors, aidChain, oTestCase,
222 'TestCase #%s prerequisite #%s creates a cycle!'
223 % (oTestCase.idTestCase, idPreReq));
224 elif not oDep.aiPreReqs:
225 pass;
226 elif len(aidChain) >= 10:
227 self._addPreReqError(aoErrors, aidChain, oTestCase,
228 'TestCase #%s prerequisite chain is too long!' % (oTestCase.idTestCase,));
229 else:
230 aiIndexes.append([oDep, 0]);
231 aidChain.append(idPreReq);
232
233 return aoErrors;
234
235 def deepTestGroupSort(self):
236 """
237 Sorts the testgroups and their testcases by priority and dependencies.
238 Note! Don't call this before checking for dependency cycles!
239 """
240 if not self.aoTestGroups:
241 return;
242
243 #
244 # ASSUMES groups as well as testcases are sorted by priority by the
245 # database. So we only have to concern ourselves with the dependency
246 # sorting.
247 #
248 iGrpPrio = self.aoTestGroups[0].iSchedPriority;
249 for oTestGroup in self.aoTestGroups:
250 if oTestGroup.iSchedPriority > iGrpPrio:
251 raise TMExceptionBase('Incorrectly sorted testgroups returned by database.');
252 iGrpPrio = oTestGroup.iSchedPriority;
253
254 if oTestGroup.aoTestCases:
255 iTstPrio = oTestGroup.aoTestCases[0];
256 for iTestCase, oTestCase in enumerate(oTestGroup.aoTestCases):
257 if oTestCase.iSchedPriority > iTstPrio:
258 raise TMExceptionBase('Incorrectly sorted testcases returned by database: i=%s prio=%s idGrp=%s %s'
259 % ( iTestCase, iTstPrio, oTestGroup.idTestGroup,
260 ','.join(['(%s: %s)' % (oCur.idTestCase, oCur.iSchedPriority)
261 for oCur in oTestGroup.aoTestCases]),));
262
263 #
264 # Sort the testgroups by dependencies.
265 #
266 i = 0;
267 while i < len(self.aoTestGroups):
268 oTestGroup = self.aoTestGroups[i];
269 if oTestGroup.idTestGroupPreReq is not None:
270 iPreReq = self.aoTestGroups.index(self.dTestGroups[oTestGroup.idTestGroupPreReq]);
271 if iPreReq > i:
272 # The prerequisite is after the current entry. Move the
273 # current entry so that it's following it's prereq entry.
274 self.aoTestGroups.insert(iPreReq + 1, oTestGroup);
275 self.aoTestGroups.pop(i);
276 continue;
277 assert iPreReq < i;
278 i += 1; # Advance.
279
280 #
281 # Sort the testcases by dependencies.
282 # Same algorithm as above, just more prerequisites.
283 #
284 for oTestGroup in self.aoTestGroups:
285 i = 0;
286 while i < len(oTestGroup.aoTestCases):
287 oTestCase = oTestGroup.aoTestCases[i];
288 if oTestCase.aidPreReqs:
289 for idPreReq in oTestCase.aidPreReqs:
290 iPreReq = oTestGroup.aoTestCases.index(oTestGroup.dTestCases[idPreReq]);
291 if iPreReq > i:
292 # The prerequisite is after the current entry. Move the
293 # current entry so that it's following it's prereq entry.
294 oTestGroup.aoTestGroups.insert(iPreReq + 1, oTestCase);
295 oTestGroup.aoTestGroups.pop(i);
296 i -= 1; # Don't advance.
297 break;
298 assert iPreReq < i;
299 i += 1; # Advance.
300
301
302 return True;
303
304
305
306class SchedQueueData(ModelDataBase):
307 """
308 Scheduling queue data item.
309 """
310
311 ksIdAttr = 'idSchedGroup';
312
313 ksParam_idSchedGroup = 'SchedQueueData_idSchedGroup';
314 ksParam_idItem = 'SchedQueueData_idItem';
315 ksParam_offQueue = 'SchedQueueData_offQueue';
316 ksParam_idGenTestCaseArgs = 'SchedQueueData_idGenTestCaseArgs';
317 ksParam_idTestGroup = 'SchedQueueData_idTestGroup';
318 ksParam_aidTestGroupPreReqs = 'SchedQueueData_aidTestGroupPreReqs';
319 ksParam_bmHourlySchedule = 'SchedQueueData_bmHourlySchedule';
320 ksParam_tsConfig = 'SchedQueueData_tsConfig';
321 ksParam_tsLastScheduled = 'SchedQueueData_tsLastScheduled';
322 ksParam_idTestSetGangLeader = 'SchedQueueData_idTestSetGangLeader';
323 ksParam_cMissingGangMembers = 'SchedQueueData_cMissingGangMembers';
324
325 kasAllowNullAttributes = [ 'idItem', 'offQueue', 'aidTestGroupPreReqs', 'bmHourlySchedule', 'idTestSetGangLeader',
326 'tsConfig', 'tsLastScheduled' ];
327
328
329 def __init__(self):
330 ModelDataBase.__init__(self);
331
332 #
333 # Initialize with defaults.
334 # See the database for explanations of each of these fields.
335 #
336 self.idSchedGroup = None;
337 self.idItem = None;
338 self.offQueue = None;
339 self.idGenTestCaseArgs = None;
340 self.idTestGroup = None;
341 self.aidTestGroupPreReqs = None;
342 self.bmHourlySchedule = None;
343 self.tsConfig = None;
344 self.tsLastScheduled = None;
345 self.idTestSetGangLeader = None;
346 self.cMissingGangMembers = 1;
347
348 def initFromValues(self, idSchedGroup, idGenTestCaseArgs, idTestGroup, aidTestGroupPreReqs, # pylint: disable=R0913
349 bmHourlySchedule, cMissingGangMembers,
350 idItem = None, offQueue = None, tsConfig = None, tsLastScheduled = None, idTestSetGangLeader = None):
351 """
352 Reinitialize with all attributes potentially given as inputs.
353 Return self.
354 """
355 self.idSchedGroup = idSchedGroup;
356 self.idItem = idItem;
357 self.offQueue = offQueue;
358 self.idGenTestCaseArgs = idGenTestCaseArgs;
359 self.idTestGroup = idTestGroup;
360 self.aidTestGroupPreReqs = aidTestGroupPreReqs;
361 self.bmHourlySchedule = bmHourlySchedule;
362 self.tsConfig = tsConfig;
363 self.tsLastScheduled = tsLastScheduled;
364 self.idTestSetGangLeader = idTestSetGangLeader;
365 self.cMissingGangMembers = cMissingGangMembers;
366 return self;
367
368 def initFromDbRow(self, aoRow):
369 """
370 Initialize from database row (SELECT * FROM SchedQueues).
371 Returns self.
372 Raises exception if no row is specfied.
373 """
374 if aoRow is None:
375 raise TMExceptionBase('SchedQueueData not found.');
376
377 self.idSchedGroup = aoRow[0];
378 self.idItem = aoRow[1];
379 self.offQueue = aoRow[2];
380 self.idGenTestCaseArgs = aoRow[3];
381 self.idTestGroup = aoRow[4];
382 self.aidTestGroupPreReqs = aoRow[5];
383 self.bmHourlySchedule = aoRow[6];
384 self.tsConfig = aoRow[7];
385 self.tsLastScheduled = aoRow[8];
386 self.idTestSetGangLeader = aoRow[9];
387 self.cMissingGangMembers = aoRow[10];
388 return self;
389
390
391
392
393
394
395class SchedulerBase(object):
396 """
397 The scheduler base class.
398
399 The scheduler classes have two functions:
400 1. Recreate the scheduling queue.
401 2. Pick the next task from the queue.
402
403 The first is scheduler specific, the latter isn't.
404 """
405
406 class BuildCache(object):
407 """ Build cache. """
408
409 class BuildCacheIterator(object):
410 """ Build class iterator. """
411 def __init__(self, oCache):
412 self.oCache = oCache;
413 self.iCur = 0;
414
415 def __iter__(self):
416 """Returns self, required by the language."""
417 return self;
418
419 def next(self):
420 """Returns the next build, raises StopIteration when the end has been reached."""
421 while True:
422 if self.iCur >= len(self.oCache.aoEntries):
423 oEntry = self.oCache.fetchFromCursor();
424 if oEntry is None:
425 raise StopIteration;
426 else:
427 oEntry = self.oCache.aoEntries[self.iCur];
428 self.iCur += 1;
429 if not oEntry.fRemoved:
430 return oEntry;
431 # end
432
433 class BuildCacheEntry(object):
434 """ Build cache entry. """
435
436 def __init__(self, oBuild, fMaybeBlacklisted):
437 self.oBuild = oBuild;
438 self._fBlacklisted = None if fMaybeBlacklisted is True else False;
439 self.fRemoved = False;
440 self._dPreReqDecisions = dict();
441
442 def remove(self):
443 """
444 Marks the cache entry as removed.
445 This doesn't actually remove it from the cache array, only marks
446 it as removed. It has no effect on open iterators.
447 """
448 self.fRemoved = True;
449
450 def getPreReqDecision(self, sPreReqSet):
451 """
452 Retrieves a cached prerequisite decision.
453 Returns boolean if found, None if not.
454 """
455 return self._dPreReqDecisions.get(sPreReqSet);
456
457 def setPreReqDecision(self, sPreReqSet, fDecision):
458 """
459 Caches a prerequistie decision.
460 """
461 self._dPreReqDecisions[sPreReqSet] = fDecision;
462 return fDecision;
463
464 def isBlacklisted(self, oDb):
465 """ Checks if the build is blacklisted. """
466 if self._fBlacklisted is None:
467 self._fBlacklisted = BuildLogic(oDb).isBuildBlacklisted(self.oBuild);
468 return self._fBlacklisted;
469
470
471 def __init__(self):
472 self.aoEntries = [];
473 self.oCursor = None;
474
475 def setupSource(self, oDb, idBuildSrc, sOs, sCpuArch, tsNow):
476 """ Configures the build cursor for the cache. """
477 if not self.aoEntries and self.oCursor is None:
478 oBuildSource = BuildSourceData().initFromDbWithId(oDb, idBuildSrc, tsNow);
479 self.oCursor = BuildSourceLogic(oDb).openBuildCursor(oBuildSource, sOs, sCpuArch, tsNow);
480 return True;
481
482 def __iter__(self):
483 """Return an iterator."""
484 return self.BuildCacheIterator(self);
485
486 def fetchFromCursor(self):
487 """ Fetches a build from the cursor and adds it to the cache."""
488 if self.oCursor is None:
489 return None;
490
491 try:
492 aoRow = self.oCursor.fetchOne();
493 except:
494 return None;
495 if aoRow is None:
496 return None;
497
498 oBuild = BuildDataEx().initFromDbRow(aoRow);
499 oEntry = self.BuildCacheEntry(oBuild, aoRow[-1]);
500 self.aoEntries.append(oEntry);
501 return oEntry;
502
503 def __init__(self, oDb, oSchedGrpData, iVerbosity = 0, tsSecStart = None):
504 self._oDb = oDb;
505 self._oSchedGrpData = oSchedGrpData;
506 self._iVerbosity = iVerbosity;
507 self._asMessages = [];
508 self._tsSecStart = tsSecStart if tsSecStart is not None else utils.timestampSecond();
509 self.oBuildCache = self.BuildCache();
510 self.dTestGroupMembers = dict();
511
512 @staticmethod
513 def _instantiate(oDb, oSchedGrpData, iVerbosity = 0, tsSecStart = None):
514 """
515 Instantiate the scheduler specified by the scheduling group.
516 Returns scheduler child class instance. May raise exception if
517 the input is invalid.
518 """
519 if oSchedGrpData.enmScheduler == SchedGroupData.ksScheduler_BestEffortContinuousIntegration:
520 from testmanager.core.schedulerbeci import SchdulerBeci;
521 oScheduler = SchdulerBeci(oDb, oSchedGrpData, iVerbosity, tsSecStart);
522 else:
523 raise oDb.integrityException('Invalid scheduler "%s", idSchedGroup=%d' \
524 % (oSchedGrpData.enmScheduler, oSchedGrpData.idSchedGroup));
525 return oScheduler;
526
527
528 #
529 # Misc.
530 #
531
532 def msgDebug(self, sText):
533 """Debug printing."""
534 if self._iVerbosity > 1:
535 self._asMessages.append('debug:' + sText);
536 return None;
537
538 def msgInfo(self, sText):
539 """Info printing."""
540 if self._iVerbosity > 1:
541 self._asMessages.append('info: ' + sText);
542 return None;
543
544 def dprint(self, sMsg):
545 """Prints a debug message to the srv glue log (see config.py). """
546 if config.g_kfSrvGlueDebugScheduler:
547 self._oDb.dprint(sMsg);
548 return None;
549
550 def getElapsedSecs(self):
551 """ Returns the number of seconds this scheduling task has been running. """
552 tsSecNow = utils.timestampSecond();
553 if tsSecNow < self._tsSecStart: # paranoia
554 self._tsSecStart = tsSecNow;
555 return tsSecNow - self._tsSecStart;
556
557
558 #
559 # Create schedule.
560 #
561
562 def _recreateQueueCancelGatherings(self):
563 """
564 Cancels all pending gang gatherings on the current queue.
565 """
566 self._oDb.execute('SELECT idTestSetGangLeader\n'
567 'FROM SchedQueues\n'
568 'WHERE idSchedGroup = %s\n'
569 ' AND idTestSetGangLeader is not NULL\n'
570 , (self._oSchedGrpData.idSchedGroup,));
571 if self._oDb.getRowCount() > 0:
572 oTBStatusLogic = TestBoxStatusLogic(self._oDb);
573 for aoRow in self._oDb.fetchAll():
574 idTestSetGangLeader = aoRow[0];
575 oTBStatusLogic.updateGangStatus(idTestSetGangLeader,
576 TestBoxStatusData.ksTestBoxState_GangGatheringTimedOut,
577 fCommit = False);
578 return True;
579
580 def _recreateQueueItems(self, oData):
581 """
582 Returns an array of queue items (SchedQueueData).
583 Child classes must override this.
584 """
585 _ = oData;
586 return [];
587
588 def recreateQueueWorker(self):
589 """
590 Worker for recreateQueue.
591 """
592
593 #
594 # Collect the necessary data and validate it.
595 #
596 oData = ReCreateQueueData(self._oDb, self._oSchedGrpData.idSchedGroup);
597 aoErrors = oData.checkForGroupDepCycles();
598 aoErrors.extend(oData.checkForMissingTestCaseDeps());
599 if not aoErrors:
600 oData.deepTestGroupSort();
601
602 #
603 # The creation of the scheduling queue is done by the child class.
604 #
605 # We will try guess where in queue we're currently at and rotate
606 # the items such that we will resume execution in the approximately
607 # same position. The goal of the scheduler is to provide a 100%
608 # deterministic result so that if we regenerate the queue when there
609 # are no changes to the testcases, testgroups or scheduling groups
610 # involved, test execution will be unchanged (save for maybe just a
611 # little for gang gathering).
612 #
613 aoItems = list();
614 if oData.aoArgsVariations:
615 aoItems = self._recreateQueueItems(oData);
616 self.msgDebug('len(aoItems)=%s' % (len(aoItems),));
617 #for i in range(len(aoItems)):
618 # self.msgDebug('aoItems[%2d]=%s' % (i, aoItems[i]));
619 if aoItems:
620 self._oDb.execute('SELECT offQueue FROM SchedQueues WHERE idSchedGroup = %s ORDER BY idItem LIMIT 1'
621 , (self._oSchedGrpData.idSchedGroup,));
622 if self._oDb.getRowCount() > 0:
623 offQueue = self._oDb.fetchOne()[0];
624 self._oDb.execute('SELECT COUNT(*) FROM SchedQueues WHERE idSchedGroup = %s'
625 , (self._oSchedGrpData.idSchedGroup,));
626 cItems = self._oDb.fetchOne()[0];
627 offQueueNew = (offQueue * cItems) / len(aoItems);
628 if offQueueNew != 0:
629 aoItems = aoItems[offQueueNew:] + aoItems[:offQueueNew];
630
631 #
632 # Replace the scheduling queue.
633 # Care need to be take to first timeout/abort any gangs in the
634 # gathering state since these use the queue to set up the date.
635 #
636 self._recreateQueueCancelGatherings();
637 self._oDb.execute('DELETE FROM SchedQueues WHERE idSchedGroup = %s\n', (self._oSchedGrpData.idSchedGroup,));
638 self._oDb.insertList('INSERT INTO SchedQueues (\n'
639 ' idSchedGroup,\n'
640 ' offQueue,\n'
641 ' idGenTestCaseArgs,\n'
642 ' idTestGroup,\n'
643 ' aidTestGroupPreReqs,\n'
644 ' bmHourlySchedule,\n'
645 ' cMissingGangMembers )\n',
646 aoItems, self._formatItemForInsert);
647 return (aoErrors, self._asMessages);
648
649 def _formatItemForInsert(self, oItem):
650 """
651 Used by recreateQueueWorker together with TMDatabaseConnect::insertList
652 """
653 return self._oDb.formatBindArgs('(%s,%s,%s,%s,%s,%s,%s)'
654 , ( oItem.idSchedGroup,
655 oItem.offQueue,
656 oItem.idGenTestCaseArgs,
657 oItem.idTestGroup,
658 oItem.aidTestGroupPreReqs if oItem.aidTestGroupPreReqs else None,
659 oItem.bmHourlySchedule,
660 oItem.cMissingGangMembers
661 ));
662
663 @staticmethod
664 def recreateQueue(oDb, uidAuthor, idSchedGroup, iVerbosity = 1):
665 """
666 (Re-)creates the scheduling queue for the given group.
667
668 Returns (asMessages, asMessages). On success the array with the error
669 will be empty, on failure it will contain (sError, oRelatedObject)
670 entries. The messages is for debugging and are simple strings.
671
672 Raises exception database error.
673 """
674
675 aoExtraMsgs = [];
676 if oDb.debugIsExplainEnabled():
677 aoExtraMsgs += ['Warning! Disabling SQL explain to avoid deadlocking against locked tables.'];
678 oDb.debugDisableExplain();
679
680 aoErrors = [];
681 asMessages = [];
682 try:
683 #
684 # To avoid concurrency issues (SchedQueues) and inconsistent data (*),
685 # we lock quite a few tables while doing this work. We access more
686 # data than scheduleNewTask so we lock some additional tables.
687 #
688 oDb.rollback();
689 oDb.begin();
690 oDb.execute('LOCK TABLE SchedGroups, SchedGroupMembers, TestGroups, TestGroupMembers IN SHARE MODE');
691 oDb.execute('LOCK TABLE TestBoxes, TestCaseArgs, TestCases IN SHARE MODE');
692 oDb.execute('LOCK TABLE TestBoxStatuses, SchedQueues IN EXCLUSIVE MODE');
693
694 #
695 # Instantiate the scheduler and call the worker function.
696 #
697 oSchedGrpData = SchedGroupData().initFromDbWithId(oDb, idSchedGroup);
698 oScheduler = SchedulerBase._instantiate(oDb, oSchedGrpData, iVerbosity);
699
700 (aoErrors, asMessages) = oScheduler.recreateQueueWorker();
701 if not aoErrors:
702 SystemLogLogic(oDb).addEntry(SystemLogData.ksEvent_SchedQueueRecreate,
703 'User #%d recreated sched queue #%d.' % (uidAuthor, idSchedGroup,));
704 oDb.commit();
705 else:
706 oDb.rollback();
707
708 except:
709 oDb.rollback();
710 raise;
711
712 return (aoErrors, aoExtraMsgs + asMessages);
713
714
715
716 #
717 # Schedule Task.
718 #
719
720 def _composeGangArguments(self, idTestSet):
721 """
722 Composes the gang specific testdriver arguments.
723 Returns command line string, including a leading space.
724 """
725
726 oTestSet = TestSetData().initFromDbWithId(self._oDb, idTestSet);
727 aoGangMembers = TestSetLogic(self._oDb).getGang(oTestSet.idTestSetGangLeader);
728
729 sArgs = ' --gang-member-no %s --gang-members %s' % (oTestSet.iGangMemberNo, len(aoGangMembers));
730 for i, _ in enumerate(aoGangMembers):
731 sArgs = ' --gang-ipv4-%s %s' % (i, aoGangMembers[i].ip); ## @todo IPv6
732
733 return sArgs;
734
735
736 def composeExecResponseWorker(self, idTestSet, oTestEx, oTestBox, oBuild, oValidationKitBuild, sBaseUrl):
737 """
738 Given all the bits of data, compose an EXEC command response to the testbox.
739 """
740 sScriptZips = oTestEx.oTestCase.sValidationKitZips;
741 if sScriptZips is None or sScriptZips.find('@VALIDATIONKIT_ZIP@') >= 0:
742 assert oValidationKitBuild;
743 if sScriptZips is None:
744 sScriptZips = oValidationKitBuild.sBinaries;
745 else:
746 sScriptZips = sScriptZips.replace('@VALIDATIONKIT_ZIP@', oValidationKitBuild.sBinaries);
747 sScriptZips = sScriptZips.replace('@DOWNLOAD_BASE_URL@', sBaseUrl + config.g_ksTmDownloadBaseUrlRel);
748
749 sCmdLine = oTestEx.oTestCase.sBaseCmd + ' ' + oTestEx.sArgs;
750 sCmdLine = sCmdLine.replace('@BUILD_BINARIES@', oBuild.sBinaries);
751 sCmdLine = sCmdLine.strip();
752 if oTestEx.cGangMembers > 1:
753 sCmdLine += ' ' + self._composeGangArguments(idTestSet);
754
755 cSecTimeout = oTestEx.cSecTimeout if oTestEx.cSecTimeout is not None else oTestEx.oTestCase.cSecTimeout;
756 cSecTimeout = cSecTimeout * oTestBox.pctScaleTimeout / 100;
757
758 dResponse = \
759 {
760 constants.tbresp.ALL_PARAM_RESULT: constants.tbresp.CMD_EXEC,
761 constants.tbresp.EXEC_PARAM_RESULT_ID: idTestSet,
762 constants.tbresp.EXEC_PARAM_SCRIPT_ZIPS: sScriptZips,
763 constants.tbresp.EXEC_PARAM_SCRIPT_CMD_LINE: sCmdLine,
764 constants.tbresp.EXEC_PARAM_TIMEOUT: cSecTimeout,
765 };
766 return dResponse;
767
768 @staticmethod
769 def composeExecResponse(oDb, idTestSet, sBaseUrl, iVerbosity = 0):
770 """
771 Composes an EXEC response for a gang member (other than the last).
772 Returns a EXEC response or raises an exception (DB/input error).
773 """
774 #
775 # Gather the necessary data.
776 #
777 oTestSet = TestSetData().initFromDbWithId(oDb, idTestSet);
778 oTestBox = TestBoxData().initFromDbWithGenId(oDb, oTestSet.idGenTestBox);
779 oTestEx = TestCaseArgsDataEx().initFromDbWithGenId(oDb, oTestSet.idGenTestCaseArgs);
780 oBuild = BuildDataEx().initFromDbWithId(oDb, oTestSet.idBuild);
781 oValidationKitBuild = None;
782 if oTestSet.idBuildTestSuite is not None:
783 oValidationKitBuild = BuildDataEx().initFromDbWithId(oDb, oTestSet.idBuildTestSuite);
784
785 #
786 # Instantiate the specified scheduler and let it do the rest.
787 #
788 oSchedGrpData = SchedGroupData().initFromDbWithId(oDb, oTestSet.idSchedGroup, oTestSet.tsCreated);
789 assert oSchedGrpData.fEnabled is True;
790 assert oSchedGrpData.idBuildSrc is not None;
791 oScheduler = SchedulerBase._instantiate(oDb, oSchedGrpData, iVerbosity);
792
793 return oScheduler.composeExecResponseWorker(idTestSet, oTestEx, oTestBox, oBuild, oValidationKitBuild, sBaseUrl);
794
795
796 def _updateTask(self, oTask, tsNow):
797 """
798 Updates a gang schedule task.
799 """
800 assert oTask.cMissingGangMembers >= 1;
801 assert oTask.idTestSetGangLeader is not None;
802 assert oTask.idTestSetGangLeader >= 1;
803 if tsNow is not None:
804 self._oDb.execute('UPDATE SchedQueues\n'
805 ' SET idTestSetGangLeader = %s,\n'
806 ' cMissingGangMembers = %s,\n'
807 ' tsLastScheduled = %s\n'
808 'WHERE idItem = %s\n'
809 , (oTask.idTestSetGangLeader, oTask.cMissingGangMembers, tsNow, oTask.idItem,) );
810 else:
811 self._oDb.execute('UPDATE SchedQueues\n'
812 ' SET cMissingGangMembers = %s\n'
813 'WHERE idItem = %s\n'
814 , (oTask.cMissingGangMembers, oTask.idItem,) );
815 return True;
816
817 def _moveTaskToEndOfQueue(self, oTask, cGangMembers, tsNow):
818 """
819 The task has been scheduled successfully, reset it's data move it to
820 the end of the queue.
821 """
822 if cGangMembers > 1:
823 self._oDb.execute('UPDATE SchedQueues\n'
824 ' SET idItem = NEXTVAL(\'SchedQueueItemIdSeq\'),\n'
825 ' idTestSetGangLeader = NULL,\n'
826 ' cMissingGangMembers = %s\n'
827 'WHERE idItem = %s\n'
828 , (cGangMembers, oTask.idItem,) );
829 else:
830 self._oDb.execute('UPDATE SchedQueues\n'
831 ' SET idItem = NEXTVAL(\'SchedQueueItemIdSeq\'),\n'
832 ' idTestSetGangLeader = NULL,\n'
833 ' cMissingGangMembers = 1,\n'
834 ' tsLastScheduled = %s\n'
835 'WHERE idItem = %s\n'
836 , (tsNow, oTask.idItem,) );
837 return True;
838
839
840
841
842 def _createTestSet(self, oTask, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, tsNow):
843 # type: (SchedQueueData, TestCaseArgsDataEx, TestBoxData, BuildDataEx, BuildDataEx, datetime.datetime) -> int
844 """
845 Creates a test set for using the given data.
846 Will not commit, someone up the callstack will that later on.
847
848 Returns the test set ID, may raise an exception on database error.
849 """
850 # Lazy bird doesn't want to write testset.py and does it all here.
851
852 #
853 # We're getting the TestSet ID first in order to include it in the base
854 # file name (that way we can directly relate files on the disk to the
855 # test set when doing batch work), and also for idTesetSetGangLeader.
856 #
857 self._oDb.execute('SELECT NEXTVAL(\'TestSetIdSeq\')');
858 idTestSet = self._oDb.fetchOne()[0];
859
860 sBaseFilename = '%04d/%02d/%02d/%02d/TestSet-%s' \
861 % (tsNow.year, tsNow.month, tsNow.day, (tsNow.hour / 6) * 6, idTestSet);
862
863 #
864 # Gang scheduling parameters. Changes the oTask data for updating by caller.
865 #
866 iGangMemberNo = 0;
867
868 if oTestEx.cGangMembers <= 1:
869 assert oTask.idTestSetGangLeader is None;
870 assert oTask.cMissingGangMembers <= 1;
871 elif oTask.idTestSetGangLeader is None:
872 assert oTask.cMissingGangMembers == oTestEx.cGangMembers;
873 oTask.cMissingGangMembers = oTestEx.cGangMembers - 1;
874 oTask.idTestSetGangLeader = idTestSet;
875 else:
876 assert oTask.cMissingGangMembers > 0 and oTask.cMissingGangMembers < oTestEx.cGangMembers;
877 oTask.cMissingGangMembers -= 1;
878
879 #
880 # Do the database stuff.
881 #
882 self._oDb.execute('INSERT INTO TestSets (\n'
883 ' idTestSet,\n'
884 ' tsConfig,\n'
885 ' tsCreated,\n'
886 ' idBuild,\n'
887 ' idBuildCategory,\n'
888 ' idBuildTestSuite,\n'
889 ' idGenTestBox,\n'
890 ' idTestBox,\n'
891 ' idSchedGroup,\n'
892 ' idTestGroup,\n'
893 ' idGenTestCase,\n'
894 ' idTestCase,\n'
895 ' idGenTestCaseArgs,\n'
896 ' idTestCaseArgs,\n'
897 ' sBaseFilename,\n'
898 ' iGangMemberNo,\n'
899 ' idTestSetGangLeader )\n'
900 'VALUES ( %s,\n' # idTestSet
901 ' %s,\n' # tsConfig
902 ' %s,\n' # tsCreated
903 ' %s,\n' # idBuild
904 ' %s,\n' # idBuildCategory
905 ' %s,\n' # idBuildTestSuite
906 ' %s,\n' # idGenTestBox
907 ' %s,\n' # idTestBox
908 ' %s,\n' # idSchedGroup
909 ' %s,\n' # idTestGroup
910 ' %s,\n' # idGenTestCase
911 ' %s,\n' # idTestCase
912 ' %s,\n' # idGenTestCaseArgs
913 ' %s,\n' # idTestCaseArgs
914 ' %s,\n' # sBaseFilename
915 ' %s,\n' # iGangMemberNo
916 ' %s)\n' # idTestSetGangLeader
917 , ( idTestSet,
918 oTask.tsConfig,
919 tsNow,
920 oBuild.idBuild,
921 oBuild.idBuildCategory,
922 oValidationKitBuild.idBuild if oValidationKitBuild is not None else None,
923 oTestBoxData.idGenTestBox,
924 oTestBoxData.idTestBox,
925 oTask.idSchedGroup,
926 oTask.idTestGroup,
927 oTestEx.oTestCase.idGenTestCase,
928 oTestEx.oTestCase.idTestCase,
929 oTestEx.idGenTestCaseArgs,
930 oTestEx.idTestCaseArgs,
931 sBaseFilename,
932 iGangMemberNo,
933 oTask.idTestSetGangLeader,
934 ));
935
936 self._oDb.execute('INSERT INTO TestResults (\n'
937 ' idTestResultParent,\n'
938 ' idTestSet,\n'
939 ' tsCreated,\n'
940 ' idStrName,\n'
941 ' cErrors,\n'
942 ' enmStatus,\n'
943 ' iNestingDepth)\n'
944 'VALUES ( NULL,\n' # idTestResultParent
945 ' %s,\n' # idTestSet
946 ' %s,\n' # tsCreated
947 ' 0,\n' # idStrName
948 ' 0,\n' # cErrors
949 ' \'running\'::TestStatus_T,\n'
950 ' 0)\n' # iNestingDepth
951 'RETURNING idTestResult'
952 , ( idTestSet, tsNow, ));
953 idTestResult = self._oDb.fetchOne()[0];
954
955 self._oDb.execute('UPDATE TestSets\n'
956 ' SET idTestResult = %s\n'
957 'WHERE idTestSet = %s\n'
958 , (idTestResult, idTestSet, ));
959
960 return idTestSet;
961
962 def _tryFindValidationKitBit(self, oTestBoxData, tsNow):
963 """
964 Tries to find the most recent validation kit build suitable for the given testbox.
965 Returns BuildDataEx or None. Raise exception on database error.
966
967 Can be overridden by child classes to change the default build requirements.
968 """
969 oBuildLogic = BuildLogic(self._oDb);
970 oBuildSource = BuildSourceData().initFromDbWithId(self._oDb, self._oSchedGrpData.idBuildSrcTestSuite, tsNow);
971 oCursor = BuildSourceLogic(self._oDb).openBuildCursor(oBuildSource, oTestBoxData.sOs, oTestBoxData.sCpuArch, tsNow);
972 for _ in range(oCursor.getRowCount()):
973 oBuild = BuildDataEx().initFromDbRow(oCursor.fetchOne());
974 if not oBuildLogic.isBuildBlacklisted(oBuild):
975 return oBuild;
976 return None;
977
978 def _tryFindBuild(self, oTask, oTestEx, oTestBoxData, tsNow):
979 """
980 Tries to find a fitting build.
981 Returns BuildDataEx or None. Raise exception on database error.
982
983 Can be overridden by child classes to change the default build requirements.
984 """
985
986 #
987 # Gather the set of prerequisites we have and turn them into a value
988 # set for use in the loop below.
989 #
990 # Note! We're scheduling on testcase level and ignoring argument variation
991 # selections in TestGroupMembers is intentional.
992 #
993 dPreReqs = {};
994
995 # Direct prerequisites. We assume they're all enabled as this can be
996 # checked at queue creation time.
997 for oPreReq in oTestEx.aoTestCasePreReqs:
998 dPreReqs[oPreReq.idTestCase] = 1;
999
1000 # Testgroup dependencies from the scheduling group config.
1001 if oTask.aidTestGroupPreReqs is not None:
1002 for iTestGroup in oTask.aidTestGroupPreReqs:
1003 # Make sure the _active_ test group members are in the cache.
1004 if iTestGroup not in self.dTestGroupMembers:
1005 self._oDb.execute('SELECT DISTINCT TestGroupMembers.idTestCase\n'
1006 'FROM TestGroupMembers, TestCases\n'
1007 'WHERE TestGroupMembers.idTestGroup = %s\n'
1008 ' AND TestGroupMembers.tsExpire > %s\n'
1009 ' AND TestGroupMembers.tsEffective <= %s\n'
1010 ' AND TestCases.idTestCase = TestGroupMembers.idTestCase\n'
1011 ' AND TestCases.tsExpire > %s\n'
1012 ' AND TestCases.tsEffective <= %s\n'
1013 ' AND TestCases.fEnabled is TRUE\n'
1014 , (iTestGroup, oTask.tsConfig, oTask.tsConfig, oTask.tsConfig, oTask.tsConfig,));
1015 aidTestCases = [];
1016 for aoRow in self._oDb.fetchAll():
1017 aidTestCases.append(aoRow[0]);
1018 self.dTestGroupMembers[iTestGroup] = aidTestCases;
1019
1020 # Add the testgroup members to the prerequisites.
1021 for idTestCase in self.dTestGroupMembers[iTestGroup]:
1022 dPreReqs[idTestCase] = 1;
1023
1024 # Create a SQL values table out of them.
1025 sPreReqSet = ''
1026 if dPreReqs:
1027 for idPreReq in sorted(dPreReqs):
1028 sPreReqSet += ', (' + str(idPreReq) + ')';
1029 sPreReqSet = sPreReqSet[2:]; # drop the leading ', '.
1030
1031 #
1032 # Try the builds.
1033 #
1034 self.oBuildCache.setupSource(self._oDb, self._oSchedGrpData.idBuildSrc, oTestBoxData.sOs, oTestBoxData.sCpuArch, tsNow);
1035 for oEntry in self.oBuildCache:
1036 #
1037 # Check build requirements set by the test.
1038 #
1039 if not oTestEx.matchesBuildProps(oEntry.oBuild):
1040 continue;
1041
1042 if oEntry.isBlacklisted(self._oDb):
1043 oEntry.remove();
1044 continue;
1045
1046 #
1047 # Check prerequisites. The default scheduler is satisfied if one
1048 # argument variation has been executed successfully. It is not
1049 # satisfied if there are any failure runs.
1050 #
1051 if sPreReqSet:
1052 fDecision = oEntry.getPreReqDecision(sPreReqSet);
1053 if fDecision is None:
1054 # Check for missing prereqs.
1055 self._oDb.execute('SELECT COUNT(*)\n'
1056 'FROM (VALUES ' + sPreReqSet + ') AS PreReqs(idTestCase)\n'
1057 'LEFT OUTER JOIN (SELECT idTestSet\n'
1058 ' FROM TestSets\n'
1059 ' WHERE enmStatus IN (%s, %s)\n'
1060 ' AND idBuild = %s\n'
1061 ' ) AS TestSets\n'
1062 ' ON (PreReqs.idTestCase = TestSets.idTestCase)\n'
1063 'WHERE TestSets.idTestSet is NULL\n'
1064 , ( TestSetData.ksTestStatus_Success, TestSetData.ksTestStatus_Skipped,
1065 oEntry.oBuild.idBuild, ));
1066 cMissingPreReqs = self._oDb.fetchOne()[0];
1067 if cMissingPreReqs > 0:
1068 self.dprint('build %s is missing %u prerequisites (out of %s)'
1069 % (oEntry.oBuild.idBuild, cMissingPreReqs, sPreReqSet,));
1070 oEntry.setPreReqDecision(sPreReqSet, False);
1071 continue;
1072
1073 # Check for failed prereq runs.
1074 self._oDb.execute('SELECT COUNT(*)\n'
1075 'FROM (VALUES ' + sPreReqSet + ') AS PreReqs(idTestCase),\n'
1076 ' TestSets\n'
1077 'WHERE PreReqs.idTestCase = TestSets.idTestCase\n'
1078 ' AND TestSets.idBuild = %s\n'
1079 ' AND TestSets.enmStatus IN (%s, %s, %s)\n'
1080 , ( oEntry.oBuild.idBuild,
1081 TestSetData.ksTestStatus_Failure,
1082 TestSetData.ksTestStatus_TimedOut,
1083 TestSetData.ksTestStatus_Rebooted,
1084 )
1085 );
1086 cFailedPreReqs = self._oDb.fetchOne()[0];
1087 if cFailedPreReqs > 0:
1088 self.dprint('build %s is has %u prerequisite failures (out of %s)'
1089 % (oEntry.oBuild.idBuild, cFailedPreReqs, sPreReqSet,));
1090 oEntry.setPreReqDecision(sPreReqSet, False);
1091 continue;
1092
1093 oEntry.setPreReqDecision(sPreReqSet, True);
1094 elif not fDecision:
1095 continue;
1096
1097 #
1098 # If we can, check if the build files still exist.
1099 #
1100 if oEntry.oBuild.areFilesStillThere() is False:
1101 self.dprint('build %s no longer exists' % (oEntry.oBuild.idBuild,));
1102 oEntry.remove();
1103 continue;
1104
1105 self.dprint('found oBuild=%s' % (oEntry.oBuild,));
1106 return oEntry.oBuild;
1107 return None;
1108
1109 def _tryFindMatchingBuild(self, oLeaderBuild, oTestBoxData, idBuildSrc):
1110 """
1111 Tries to find a matching build for gang scheduling.
1112 Returns BuildDataEx or None. Raise exception on database error.
1113
1114 Can be overridden by child classes to change the default build requirements.
1115 """
1116 #
1117 # Note! Should probably check build prerequisites if we get a different
1118 # build back, so that we don't use a build which hasn't passed
1119 # the smoke test.
1120 #
1121 _ = idBuildSrc;
1122 return BuildLogic(self._oDb).tryFindSameBuildForOsArch(oLeaderBuild, oTestBoxData.sOs, oTestBoxData.sCpuArch);
1123
1124
1125 def _tryAsLeader(self, oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl):
1126 """
1127 Try schedule the task as a gang leader (can be a gang of one).
1128 Returns response or None. May raise exception on DB error.
1129 """
1130
1131 # We don't wait for busy resources, we just try the next test.
1132 oTestArgsLogic = TestCaseArgsLogic(self._oDb);
1133 if not oTestArgsLogic.areResourcesFree(oTestEx):
1134 self.dprint('Cannot get global test resources!');
1135 return None;
1136
1137 #
1138 # Find a matching build (this is the difficult bit).
1139 #
1140 oBuild = self._tryFindBuild(oTask, oTestEx, oTestBoxData, tsNow);
1141 if oBuild is None:
1142 self.dprint('No build!');
1143 return None;
1144 if oTestEx.oTestCase.needValidationKitBit():
1145 oValidationKitBuild = self._tryFindValidationKitBit(oTestBoxData, tsNow);
1146 if oValidationKitBuild is None:
1147 self.dprint('No validation kit build!');
1148 return None;
1149 else:
1150 oValidationKitBuild = None;
1151
1152 #
1153 # Create a testset, allocate the resources and update the state.
1154 # Note! Since resource allocation may still fail, we create a nested
1155 # transaction so we can roll back. (Heed lock warning in docs!)
1156 #
1157 self._oDb.execute('SAVEPOINT tryAsLeader');
1158 idTestSet = self._createTestSet(oTask, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, tsNow);
1159
1160 if GlobalResourceLogic(self._oDb).allocateResources(oTestBoxData.idTestBox, oTestEx.aoGlobalRsrc, fCommit = False) \
1161 is not True:
1162 self._oDb.execute('ROLLBACK TO SAVEPOINT tryAsLeader');
1163 self.dprint('Failed to allocate global resources!');
1164 return False;
1165
1166 if oTestEx.cGangMembers <= 1:
1167 # We're alone, put the task back at the end of the queue and issue EXEC cmd.
1168 self._moveTaskToEndOfQueue(oTask, oTestEx.cGangMembers, tsNow);
1169 dResponse = self.composeExecResponseWorker(idTestSet, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, sBaseUrl);
1170 sTBState = TestBoxStatusData.ksTestBoxState_Testing;
1171 else:
1172 # We're missing gang members, issue WAIT cmd.
1173 self._updateTask(oTask, tsNow if idTestSet == oTask.idTestSetGangLeader else None);
1174 dResponse = { constants.tbresp.ALL_PARAM_RESULT: constants.tbresp.CMD_WAIT, };
1175 sTBState = TestBoxStatusData.ksTestBoxState_GangGathering;
1176
1177 TestBoxStatusLogic(self._oDb).updateState(oTestBoxData.idTestBox, sTBState, idTestSet, fCommit = False);
1178 self._oDb.execute('RELEASE SAVEPOINT tryAsLeader');
1179 return dResponse;
1180
1181 def _tryAsGangMember(self, oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl):
1182 """
1183 Try schedule the task as a gang member.
1184 Returns response or None. May raise exception on DB error.
1185 """
1186
1187 #
1188 # The leader has choosen a build, we need to find a matching one for our platform.
1189 # (It's up to the scheduler decide upon how strict dependencies are to be enforced
1190 # upon subordinate group members.)
1191 #
1192 oLeaderTestSet = TestSetData().initFromDbWithId(self._oDb, oTestBoxData.idTestSetGangLeader);
1193
1194 oLeaderBuild = BuildDataEx().initFromDbWithId(self._oDb, oLeaderTestSet.idBuild);
1195 oBuild = self._tryFindMatchingBuild(oLeaderBuild, oTestBoxData, self._oSchedGrpData.idBuildSrc);
1196 if oBuild is None:
1197 return None;
1198
1199 oValidationKitBuild = None;
1200 if oLeaderTestSet.idBuildTestSuite is not None:
1201 oLeaderValidationKitBit = BuildDataEx().initFromDbWithId(self._oDb, oLeaderTestSet.idBuildTestSuite);
1202 oValidationKitBuild = self._tryFindMatchingBuild(oLeaderValidationKitBit, oTestBoxData,
1203 self._oSchedGrpData.idBuildSrcTestSuite);
1204
1205 #
1206 # Create a testset and update the state(s).
1207 #
1208 idTestSet = self._createTestSet(oTask, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, tsNow);
1209
1210 oTBStatusLogic = TestBoxStatusLogic(self._oDb);
1211 if oTask.cMissingGangMembers < 1:
1212 # The whole gang is there, move the task to the end of the queue
1213 # and update the status on the other gang members.
1214 self._moveTaskToEndOfQueue(oTask, oTestEx.cGangMembers, tsNow);
1215 dResponse = self.composeExecResponseWorker(idTestSet, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, sBaseUrl);
1216 sTBState = TestBoxStatusData.ksTestBoxState_GangTesting;
1217 oTBStatusLogic.updateGangStatus(oTask.idTestSetGangLeader, sTBState, fCommit = False);
1218 else:
1219 # We're still missing some gang members, issue WAIT cmd.
1220 self._updateTask(oTask, tsNow if idTestSet == oTask.idTestSetGangLeader else None);
1221 dResponse = { constants.tbresp.ALL_PARAM_RESULT: constants.tbresp.CMD_WAIT, };
1222 sTBState = TestBoxStatusData.ksTestBoxState_GangGathering;
1223
1224 oTBStatusLogic.updateState(oTestBoxData.idTestBox, sTBState, idTestSet, fCommit = False);
1225 return dResponse;
1226
1227
1228 def scheduleNewTaskWorker(self, oTestBoxData, tsNow, sBaseUrl):
1229 """
1230 Worker for schduling a new task.
1231 """
1232
1233 #
1234 # Iterate the scheduler queue (fetch all to avoid having to concurrent
1235 # queries), trying out each task to see if the testbox can execute it.
1236 #
1237 dRejected = {}; # variations we've already checked out and rejected.
1238 self._oDb.execute('SELECT *\n'
1239 'FROM SchedQueues\n'
1240 'WHERE idSchedGroup = %s\n'
1241 ' AND ( bmHourlySchedule IS NULL\n'
1242 ' OR get_bit(bmHourlySchedule, %s) = 1 )\n'
1243 'ORDER BY idItem ASC\n'
1244 , (self._oSchedGrpData.idSchedGroup, utils.getLocalHourOfWeek()) );
1245 aaoRows = self._oDb.fetchAll();
1246 for aoRow in aaoRows:
1247 # Don't loop forever.
1248 if self.getElapsedSecs() >= config.g_kcSecMaxNewTask:
1249 break;
1250
1251 # Unpack the data and check if we've rejected the testcasevar/group variation already (they repeat).
1252 oTask = SchedQueueData().initFromDbRow(aoRow);
1253 if config.g_kfSrvGlueDebugScheduler:
1254 self.dprint('** Considering: idItem=%s idGenTestCaseArgs=%s idTestGroup=%s Deps=%s last=%s cfg=%s\n'
1255 % ( oTask.idItem, oTask.idGenTestCaseArgs, oTask.idTestGroup, oTask.aidTestGroupPreReqs,
1256 oTask.tsLastScheduled, oTask.tsConfig,));
1257
1258 sRejectNm = '%s:%s' % (oTask.idGenTestCaseArgs, oTask.idTestGroup,);
1259 if sRejectNm in dRejected:
1260 self.dprint('Duplicate, already rejected! (%s)' % (sRejectNm,));
1261 continue;
1262 dRejected[sRejectNm] = 1;
1263
1264 # Fetch all the test case info (too much, but who cares right now).
1265 oTestEx = TestCaseArgsDataEx().initFromDbWithGenIdEx(self._oDb, oTask.idGenTestCaseArgs,
1266 tsConfigEff = oTask.tsConfig,
1267 tsRsrcEff = oTask.tsConfig);
1268 if config.g_kfSrvGlueDebugScheduler:
1269 self.dprint('TestCase "%s": %s %s' % (oTestEx.oTestCase.sName, oTestEx.oTestCase.sBaseCmd, oTestEx.sArgs,));
1270
1271 # This shouldn't happen, but just in case it does...
1272 if oTestEx.oTestCase.fEnabled is not True:
1273 self.dprint('Testcase is not enabled!!');
1274 continue;
1275
1276 # Check if the testbox properties matches the test.
1277 if not oTestEx.matchesTestBoxProps(oTestBoxData):
1278 self.dprint('Testbox mismatch!');
1279 continue;
1280
1281 # Try schedule it.
1282 if oTask.idTestSetGangLeader is None or oTestEx.cGangMembers <= 1:
1283 dResponse = self._tryAsLeader(oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl);
1284 elif oTask.cMissingGangMembers > 1:
1285 dResponse = self._tryAsGangMember(oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl);
1286 else:
1287 dResponse = None; # Shouldn't happen!
1288 if dResponse is not None:
1289 self.dprint('Found a task! dResponse=%s' % (dResponse,));
1290 return dResponse;
1291
1292 # Found no suitable task.
1293 return None;
1294
1295 @staticmethod
1296 def _pickSchedGroup(oTestBoxDataEx, iWorkItem, dIgnoreSchedGroupIds):
1297 """
1298 Picks the next scheduling group for the given testbox.
1299 """
1300 if len(oTestBoxDataEx.aoInSchedGroups) == 1:
1301 oSchedGroup = oTestBoxDataEx.aoInSchedGroups[0].oSchedGroup;
1302 if oSchedGroup.fEnabled \
1303 and oSchedGroup.idBuildSrc is not None \
1304 and oSchedGroup.idSchedGroup not in dIgnoreSchedGroupIds:
1305 return (oSchedGroup, 0);
1306 iWorkItem = 0;
1307
1308 elif oTestBoxDataEx.aoInSchedGroups:
1309 # Construct priority table of currently enabled scheduling groups.
1310 aaoList1 = [];
1311 for oInGroup in oTestBoxDataEx.aoInSchedGroups:
1312 oSchedGroup = oInGroup.oSchedGroup;
1313 if oSchedGroup.fEnabled and oSchedGroup.idBuildSrc is not None:
1314 iSchedPriority = oInGroup.iSchedPriority;
1315 if iSchedPriority > 31: # paranoia
1316 iSchedPriority = 31;
1317 elif iSchedPriority < 0: # paranoia
1318 iSchedPriority = 0;
1319
1320 for iSchedPriority in xrange(min(iSchedPriority, len(aaoList1))):
1321 aaoList1[iSchedPriority].append(oSchedGroup);
1322 while len(aaoList1) <= iSchedPriority:
1323 aaoList1.append([oSchedGroup,]);
1324
1325 # Flatten it into a single list, mixing the priorities a little so it doesn't
1326 # take forever before low priority stuff is executed.
1327 aoFlat = [];
1328 iLo = 0;
1329 iHi = len(aaoList1) - 1;
1330 while iHi >= iLo:
1331 aoFlat += aaoList1[iHi];
1332 if iLo < iHi:
1333 aoFlat += aaoList1[iLo];
1334 iLo += 1;
1335 iHi -= 1;
1336
1337 # Pick the next one.
1338 cLeft = len(aoFlat);
1339 while cLeft > 0:
1340 cLeft -= 1;
1341 iWorkItem += 1;
1342 if iWorkItem >= len(aoFlat) or iWorkItem < 0:
1343 iWorkItem = 0;
1344 if aoFlat[iWorkItem].idSchedGroup not in dIgnoreSchedGroupIds:
1345 return (aoFlat[iWorkItem], iWorkItem);
1346 else:
1347 iWorkItem = 0;
1348
1349 # No active group.
1350 return (None, iWorkItem);
1351
1352 @staticmethod
1353 def scheduleNewTask(oDb, oTestBoxData, iWorkItem, sBaseUrl, iVerbosity = 0):
1354 # type: (TMDatabaseConnection, TestBoxData, int, str, int) -> None
1355 """
1356 Schedules a new task for a testbox.
1357 """
1358 oTBStatusLogic = TestBoxStatusLogic(oDb);
1359
1360 try:
1361 #
1362 # To avoid concurrency issues in SchedQueues we lock all the rows
1363 # related to our scheduling queue. Also, since this is a very
1364 # expensive operation we lock the testbox status row to fend of
1365 # repeated retires by faulty testbox scripts.
1366 #
1367 tsSecStart = utils.timestampSecond();
1368 oDb.rollback();
1369 oDb.begin();
1370 oDb.execute('SELECT idTestBox FROM TestBoxStatuses WHERE idTestBox = %s FOR UPDATE NOWAIT'
1371 % (oTestBoxData.idTestBox,));
1372 oDb.execute('SELECT SchedQueues.idSchedGroup\n'
1373 ' FROM SchedQueues, TestBoxesInSchedGroups\n'
1374 'WHERE TestBoxesInSchedGroups.idTestBox = %s\n'
1375 ' AND TestBoxesInSchedGroups.tsExpire = \'infinity\'::TIMESTAMP\n'
1376 ' AND TestBoxesInSchedGroups.idSchedGroup = SchedQueues.idSchedGroup\n'
1377 ' FOR UPDATE'
1378 % (oTestBoxData.idTestBox,));
1379
1380 # We need the current timestamp.
1381 tsNow = oDb.getCurrentTimestamp();
1382
1383 # Re-read the testbox data with scheduling group relations.
1384 oTestBoxDataEx = TestBoxDataEx().initFromDbWithId(oDb, oTestBoxData.idTestBox, tsNow);
1385 if oTestBoxDataEx.fEnabled \
1386 and oTestBoxDataEx.idGenTestBox == oTestBoxData.idGenTestBox:
1387
1388 # We may have to skip scheduling groups that are out of work (e.g. 'No build').
1389 iInitialWorkItem = iWorkItem;
1390 dIgnoreSchedGroupIds = {};
1391 while True:
1392 # Now, pick the scheduling group.
1393 (oSchedGroup, iWorkItem) = SchedulerBase._pickSchedGroup(oTestBoxDataEx, iWorkItem, dIgnoreSchedGroupIds);
1394 if oSchedGroup is None:
1395 break;
1396 assert oSchedGroup.fEnabled and oSchedGroup.idBuildSrc is not None;
1397
1398 # Instantiate the specified scheduler and let it do the rest.
1399 oScheduler = SchedulerBase._instantiate(oDb, oSchedGroup, iVerbosity, tsSecStart);
1400 dResponse = oScheduler.scheduleNewTaskWorker(oTestBoxDataEx, tsNow, sBaseUrl);
1401 if dResponse is not None:
1402 oTBStatusLogic.updateWorkItem(oTestBoxDataEx.idTestBox, iWorkItem);
1403 oDb.commit();
1404 return dResponse;
1405
1406 # Check out the next work item?
1407 if oScheduler.getElapsedSecs() > config.g_kcSecMaxNewTask:
1408 break;
1409 dIgnoreSchedGroupIds[oSchedGroup.idSchedGroup] = oSchedGroup;
1410
1411 # No luck, but best if we update the work item if we've made progress.
1412 # Note! In case of a config.g_kcSecMaxNewTask timeout, this may accidentally skip
1413 # a work item with actually work to do. But that's a small price to pay.
1414 if iWorkItem != iInitialWorkItem:
1415 oTBStatusLogic.updateWorkItem(oTestBoxDataEx.idTestBox, iWorkItem);
1416 oDb.commit();
1417 return None;
1418 except:
1419 oDb.rollback();
1420 raise;
1421
1422 # Not enabled, rollback and return no task.
1423 oDb.rollback();
1424 return None;
1425
1426 @staticmethod
1427 def tryCancelGangGathering(oDb, oStatusData):
1428 """
1429 Try canceling a gang gathering.
1430
1431 Returns True if successfully cancelled.
1432 Returns False if not (someone raced us to the SchedQueue table).
1433
1434 Note! oStatusData is re-initialized.
1435 """
1436 assert oStatusData.enmState == TestBoxStatusData.ksTestBoxState_GangGathering;
1437 try:
1438 #
1439 # Lock the tables we're updating so we don't run into concurrency
1440 # issues (we're racing both scheduleNewTask and other callers of
1441 # this method).
1442 #
1443 oDb.rollback();
1444 oDb.begin();
1445 oDb.execute('LOCK TABLE TestBoxStatuses, SchedQueues IN EXCLUSIVE MODE');
1446
1447 #
1448 # Re-read the testbox data and check that we're still in the same state.
1449 #
1450 oStatusData.initFromDbWithId(oDb, oStatusData.idTestBox);
1451 if oStatusData.enmState == TestBoxStatusData.ksTestBoxState_GangGathering:
1452 #
1453 # Get the leader thru the test set and change the state of the whole gang.
1454 #
1455 oTestSetData = TestSetData().initFromDbWithId(oDb, oStatusData.idTestSet);
1456
1457 oTBStatusLogic = TestBoxStatusLogic(oDb);
1458 oTBStatusLogic.updateGangStatus(oTestSetData.idTestSetGangLeader,
1459 TestBoxStatusData.ksTestBoxState_GangGatheringTimedOut,
1460 fCommit = False);
1461
1462 #
1463 # Move the scheduling queue item to the end.
1464 #
1465 oDb.execute('SELECT *\n'
1466 'FROM SchedQueues\n'
1467 'WHERE idTestSetGangLeader = %s\n'
1468 , (oTestSetData.idTestSetGangLeader,) );
1469 oTask = SchedQueueData().initFromDbRow(oDb.fetchOne());
1470 oTestEx = TestCaseArgsDataEx().initFromDbWithGenId(oDb, oTask.idGenTestCaseArgs);
1471
1472 oDb.execute('UPDATE SchedQueues\n'
1473 ' SET idItem = NEXTVAL(\'SchedQueueItemIdSeq\'),\n'
1474 ' idTestSetGangLeader = NULL,\n'
1475 ' cMissingGangMembers = %s\n'
1476 'WHERE idItem = %s\n'
1477 , (oTestEx.cGangMembers, oTask.idItem,) );
1478
1479 oDb.commit();
1480 return True;
1481
1482 elif oStatusData.enmState == TestBoxStatusData.ksTestBoxState_GangGatheringTimedOut:
1483 oDb.rollback();
1484 return True;
1485 except:
1486 oDb.rollback();
1487 raise;
1488
1489 # Not enabled, rollback and return no task.
1490 oDb.rollback();
1491 return False;
1492
1493
1494#
1495# Unit testing.
1496#
1497
1498# pylint: disable=C0111
1499class SchedQueueDataTestCase(ModelDataBaseTestCase):
1500 def setUp(self):
1501 self.aoSamples = [SchedQueueData(),];
1502
1503if __name__ == '__main__':
1504 unittest.main();
1505 # not reached.
1506
Note: See TracBrowser for help on using the repository browser.

© 2025 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette