wingo - in flumotion/trunk: . flumotion/worker
flumotion-commit at lists.fluendo.com
flumotion-commit at lists.fluendo.com
Fri Jul 27 11:35:56 CEST 2007
Author: wingo
Date: Fri Jul 27 11:35:47 2007
New Revision: 5364
Modified:
flumotion/trunk/ChangeLog
flumotion/trunk/flumotion/worker/job.py
Log:
2007-07-27 Andy Wingo <wingo at pobox.com>
* flumotion/worker/job.py (CheckJobHeaven.__init__): Keep a pool
of idle jobs for running checks.
(CheckJobHeaven.getCheckJobFromPool): New function, tries to reuse
an idle job instead of starting a new one every time.
(CheckJobHeaven.runCheck.haveJob.haveResult): Don't kill the job
right when it finishes; keep it around for a while in case we want
to run another check on it. Makes the wizard fast again.
Modified: flumotion/trunk/ChangeLog
==============================================================================
--- flumotion/trunk/ChangeLog (original)
+++ flumotion/trunk/ChangeLog Fri Jul 27 11:35:47 2007
@@ -1,3 +1,13 @@
+2007-07-27 Andy Wingo <wingo at pobox.com>
+
+ * flumotion/worker/job.py (CheckJobHeaven.__init__): Keep a pool
+ of idle jobs for running checks.
+ (CheckJobHeaven.getCheckJobFromPool): New function, tries to reuse
+ an idle job instead of starting a new one every time.
+ (CheckJobHeaven.runCheck.haveJob.haveResult): Don't kill the job
+ right when it finishes; keep it around for a while in case we want
+ to run another check on it. Makes the wizard fast again.
+
2007-07-27 Michael Smith <msmith at fluendo.com>
* flumotion/extern/log/log.py:
Modified: flumotion/trunk/flumotion/worker/job.py
==============================================================================
--- flumotion/trunk/flumotion/worker/job.py (original)
+++ flumotion/trunk/flumotion/worker/job.py Fri Jul 27 11:35:47 2007
@@ -253,10 +253,25 @@
_checkCount = 0
_timeout = 45
- def runCheck(self, bundles, moduleName, methodName, *args, **kwargs):
- avatarId = '%s-%d' % (methodName, self._checkCount)
+ def __init__(self, brain):
+ base.BaseJobHeaven.__init__(self, brain)
+
+ # job processes that are available to do work (i.e. not actively
+ # running checks)
+ self.jobPool = []
+
+ def getCheckJobFromPool(self):
+ if self.jobPool:
+ job, expireDC = self.jobPool.pop(0)
+ expireDC.cancel()
+ self.debug('running check in already-running job %s',
+ job.avatarId)
+ return defer.succeed(job)
+
+ avatarId = 'check-%d' % (self._checkCount,)
self._checkCount += 1
+ self.debug('spawning new job %s to run a check', avatarId)
d = self._startSet.createStart(avatarId)
p = base.JobProcessProtocol(self, avatarId, self._startSet)
@@ -271,50 +286,62 @@
childFDs=childFDs)
p.setPid(process.pid)
+ jobInfo = base.JobInfo(process.pid, avatarId, type, None, None,
+ None, [])
+ self._jobInfos[process.pid] = jobInfo
def haveMind(_):
- # we have a mind, in theory
- job = self.avatars[avatarId]
- return job.mindCallRemote('bootstrap', self.getWorkerName(),
- None, None, None, None, bundles)
-
- def callProc(_):
- job = self.avatars[avatarId]
- return job.mindCallRemote('runFunction', moduleName,
- methodName, *args, **kwargs)
-
- def timeout(sig):
- self.killJobByPid(process.pid, sig)
-
- def haveResult(res):
- if not termtimeout.active():
- self.info("Discarding error %s", res)
- res = messages.Result()
- res.add(messages.Error(T_(N_("Check timed out.")),
- debug=("Timed out running %s."%methodName)))
- else:
- self.killJobByPid(process.pid, signal.SIGTERM)
- # so, we're just assuming that this process goes away...
- # fixme?
-
- if termtimeout.active():
- termtimeout.cancel()
- if killtimeout.active():
- killtimeout.cancel()
- return res
-
- # add callbacks and errbacks that kill the job
-
- termtimeout = reactor.callLater(self._timeout, timeout,
- signal.SIGTERM)
- killtimeout = reactor.callLater(self._timeout, timeout,
- signal.SIGKILL)
+ # we have a mind, in theory; return the job avatar
+ return self.avatars[avatarId]
d.addCallback(haveMind)
- d.addCallback(callProc)
- d.addCallbacks(haveResult, haveResult)
+ return d
+
+ def runCheck(self, bundles, moduleName, methodName, *args, **kwargs):
+ def haveJob(job):
+ def callProc(_):
+ return job.mindCallRemote('runFunction', moduleName,
+ methodName, *args, **kwargs)
+
+ def timeout(sig):
+ self.killJobByPid(process.pid, sig)
+
+ def haveResult(res):
+ if not termtimeout.active():
+ self.info("Discarding error %s", res)
+ res = messages.Result()
+ res.add(messages.Error(T_(N_("Check timed out.")),
+ debug=("Timed out running %s."%methodName)))
+ else:
+ def expire():
+ if (job, expireDC) in self.jobPool:
+ self.debug('stopping idle check job process %s',
+ job.avatarId)
+ self.jobPool.remove((job, expireDC))
+ job.mindCallRemote('stop')
+ expireDC = reactor.callLater(self._timeout, expire)
+ self.jobPool.append((job, expireDC))
+
+ if termtimeout.active():
+ termtimeout.cancel()
+ if killtimeout.active():
+ killtimeout.cancel()
+ return res
+
+ # add callbacks and errbacks that kill the job
+
+ termtimeout = reactor.callLater(self._timeout, timeout,
+ signal.SIGTERM)
+ killtimeout = reactor.callLater(self._timeout, timeout,
+ signal.SIGKILL)
+
+ d = job.mindCallRemote('bootstrap', self.getWorkerName(),
+ None, None, None, None, bundles)
+ d.addCallback(callProc)
+ d.addCallbacks(haveResult, haveResult)
+ return d
+
+ d = self.getCheckJobFromPool()
+ d.addCallback(haveJob)
- jobInfo = base.JobInfo(process.pid, avatarId, type, moduleName,
- methodName, None, bundles)
- self._jobInfos[process.pid] = jobInfo
return d
More information about the flumotion-commit mailing list