wingo - in flumotion/trunk: . flumotion/worker

flumotion-commit at lists.fluendo.com
Fri Jul 27 11:35:56 CEST 2007


Author: wingo
Date: Fri Jul 27 11:35:47 2007
New Revision: 5364

Modified:
   flumotion/trunk/ChangeLog
   flumotion/trunk/flumotion/worker/job.py
Log:
2007-07-27  Andy Wingo  <wingo at pobox.com>

	* flumotion/worker/job.py (CheckJobHeaven.__init__): Keep a pool
	of idle jobs for running checks.
	(CheckJobHeaven.getCheckJobFromPool): New function, tries to reuse
	an idle job instead of starting a new one every time.
	(CheckJobHeaven.runCheck.haveJob.haveResult): Don't kill the job
	right when it finishes; keep it around for a while in case we want
	to run another check on it. Makes the wizard fast again.


Modified: flumotion/trunk/ChangeLog
==============================================================================
--- flumotion/trunk/ChangeLog	(original)
+++ flumotion/trunk/ChangeLog	Fri Jul 27 11:35:47 2007
@@ -1,3 +1,13 @@
+2007-07-27  Andy Wingo  <wingo at pobox.com>
+
+	* flumotion/worker/job.py (CheckJobHeaven.__init__): Keep a pool
+	of idle jobs for running checks.
+	(CheckJobHeaven.getCheckJobFromPool): New function, tries to reuse
+	an idle job instead of starting a new one every time.
+	(CheckJobHeaven.runCheck.haveJob.haveResult): Don't kill the job
+	right when it finishes; keep it around for a while in case we want
+	to run another check on it. Makes the wizard fast again.
+
 2007-07-27  Michael Smith <msmith at fluendo.com>
 
 	* flumotion/extern/log/log.py:

Modified: flumotion/trunk/flumotion/worker/job.py
==============================================================================
--- flumotion/trunk/flumotion/worker/job.py	(original)
+++ flumotion/trunk/flumotion/worker/job.py	Fri Jul 27 11:35:47 2007
@@ -253,10 +253,25 @@
     _checkCount = 0
     _timeout = 45
 
-    def runCheck(self, bundles, moduleName, methodName, *args, **kwargs):
-        avatarId = '%s-%d' % (methodName, self._checkCount)
+    def __init__(self, brain):
+        base.BaseJobHeaven.__init__(self, brain)
+
+        # job processes that are available to do work (i.e. not actively
+        # running checks)
+        self.jobPool = []
+
+    def getCheckJobFromPool(self):
+        if self.jobPool:
+            job, expireDC = self.jobPool.pop(0)
+            expireDC.cancel()
+            self.debug('running check in already-running job %s',
+                       job.avatarId)
+            return defer.succeed(job)
+
+        avatarId = 'check-%d' % (self._checkCount,)
         self._checkCount += 1
 
+        self.debug('spawning new job %s to run a check', avatarId)
         d = self._startSet.createStart(avatarId)
 
         p = base.JobProcessProtocol(self, avatarId, self._startSet)
@@ -271,50 +286,62 @@
                                        childFDs=childFDs)
 
         p.setPid(process.pid)
+        jobInfo = base.JobInfo(process.pid, avatarId, type, None, None,
+                               None, [])
+        self._jobInfos[process.pid] = jobInfo
 
         def haveMind(_):
-            # we have a mind, in theory
-            job = self.avatars[avatarId]
-            return job.mindCallRemote('bootstrap', self.getWorkerName(),
-                                      None, None, None, None, bundles)
-
-        def callProc(_):
-            job = self.avatars[avatarId]
-            return job.mindCallRemote('runFunction', moduleName,
-                                      methodName, *args, **kwargs)
-
-        def timeout(sig):
-            self.killJobByPid(process.pid, sig)
-
-        def haveResult(res):
-            if not termtimeout.active():
-                self.info("Discarding error %s", res)
-                res = messages.Result()
-                res.add(messages.Error(T_(N_("Check timed out.")),
-                                       debug=("Timed out running %s."%methodName)))
-            else:
-                self.killJobByPid(process.pid, signal.SIGTERM)
-                # so, we're just assuming that this process goes away...
-                # fixme?
-
-            if termtimeout.active():
-                termtimeout.cancel()
-            if killtimeout.active():
-                killtimeout.cancel()
-            return res
-
-        # add callbacks and errbacks that kill the job
-
-        termtimeout = reactor.callLater(self._timeout, timeout,
-                                        signal.SIGTERM)
-        killtimeout = reactor.callLater(self._timeout, timeout,
-                                        signal.SIGKILL)
+            # we have a mind, in theory; return the job avatar
+            return self.avatars[avatarId]
 
         d.addCallback(haveMind)
-        d.addCallback(callProc)
-        d.addCallbacks(haveResult, haveResult)
+        return d
+
+    def runCheck(self, bundles, moduleName, methodName, *args, **kwargs):
+        def haveJob(job):
+            def callProc(_):
+                return job.mindCallRemote('runFunction', moduleName,
+                                          methodName, *args, **kwargs)
+
+            def timeout(sig):
+                self.killJobByPid(process.pid, sig)
+
+            def haveResult(res):
+                if not termtimeout.active():
+                    self.info("Discarding error %s", res)
+                    res = messages.Result()
+                    res.add(messages.Error(T_(N_("Check timed out.")),
+                                           debug=("Timed out running %s."%methodName)))
+                else:
+                    def expire():
+                        if (job, expireDC) in self.jobPool:
+                            self.debug('stopping idle check job process %s', 
+                                       job.avatarId)
+                            self.jobPool.remove((job, expireDC))
+                            job.mindCallRemote('stop')
+                    expireDC = reactor.callLater(self._timeout, expire)
+                    self.jobPool.append((job, expireDC))
+
+                if termtimeout.active():
+                    termtimeout.cancel()
+                if killtimeout.active():
+                    killtimeout.cancel()
+                return res
+
+            # add callbacks and errbacks that kill the job
+
+            termtimeout = reactor.callLater(self._timeout, timeout,
+                                            signal.SIGTERM)
+            killtimeout = reactor.callLater(self._timeout, timeout,
+                                            signal.SIGKILL)
+
+            d = job.mindCallRemote('bootstrap', self.getWorkerName(),
+                                   None, None, None, None, bundles)
+            d.addCallback(callProc)
+            d.addCallbacks(haveResult, haveResult)
+            return d
+
+        d = self.getCheckJobFromPool()
+        d.addCallback(haveJob)
 
-        jobInfo = base.JobInfo(process.pid, avatarId, type, moduleName,
-                               methodName, None, bundles)
-        self._jobInfos[process.pid] = jobInfo
         return d


More information about the flumotion-commit mailing list