msmith - in flumotion/trunk: . flumotion/job flumotion/worker

flumotion-commit at lists.fluendo.com flumotion-commit at lists.fluendo.com
Tue Jun 19 18:20:03 CEST 2007


Author: msmith
Date: Tue Jun 19 18:19:58 2007
New Revision: 5207

Modified:
   flumotion/trunk/ChangeLog
   flumotion/trunk/flumotion/job/job.py
   flumotion/trunk/flumotion/worker/job.py
   flumotion/trunk/flumotion/worker/worker.py
Log:
        * flumotion/job/job.py:
        * flumotion/worker/job.py:
        * flumotion/worker/worker.py:
          Make the worker track jobs by PID, not avatarId, because avatarId
          isn't unique when restarting components.
          Fixes #683.



Modified: flumotion/trunk/ChangeLog
==============================================================================
--- flumotion/trunk/ChangeLog	(original)
+++ flumotion/trunk/ChangeLog	Tue Jun 19 18:19:58 2007
@@ -1,3 +1,12 @@
+2007-06-19  Michael Smith <msmith at fluendo.com>
+
+	* flumotion/job/job.py:
+	* flumotion/worker/job.py:
+	* flumotion/worker/worker.py:
+	  Make the worker track jobs by PID, not avatarId, because avatarId
+	  isn't unique when restarting components.
+	  Fixes #683.
+
 2007-06-19  Andy Wingo  <wingo at pobox.com>
 
 	* configure.ac: 

Modified: flumotion/trunk/flumotion/job/job.py
==============================================================================
--- flumotion/trunk/flumotion/job/job.py	(original)
+++ flumotion/trunk/flumotion/job/job.py	Tue Jun 19 18:19:58 2007
@@ -120,6 +120,9 @@
             self.log('... from path %s' % path)
             packager.registerPackagePath(path, name)
 
+    def remote_getPid(self):
+        return os.getpid()
+
     def remote_create(self, avatarId, type, moduleName, methodName, nice=0):
         """
         I am called on by the worker's JobAvatar to create a component.

Modified: flumotion/trunk/flumotion/worker/job.py
==============================================================================
--- flumotion/trunk/flumotion/worker/job.py	(original)
+++ flumotion/trunk/flumotion/worker/job.py	Tue Jun 19 18:19:58 2007
@@ -105,7 +105,7 @@
         # otherwise the manager still thinks it's starting up when it's
         # dead.  If the job already attached to the worker however,
         # the create deferred will already have callbacked.
-        if dstarts.createRegistered(self.avatarId):
+        if dstarts.createRegistered(self.pid):
             if signum:
                 reason = "received signal %d" % signum
             else:
@@ -113,13 +113,13 @@
             text = "Component '%s' has exited early (%s).  " \
                    "This is sometimes triggered by a corrupt " \
                    "GStreamer registry." % (self.avatarId, reason)
-            dstarts.createFailed(self.avatarId, 
+            dstarts.createFailed(self.pid, 
                                  errors.ComponentCreateError(text))
 
-        if dstarts.shutdownRegistered(self.avatarId):
-            dstarts.shutdownTrigger(self.avatarId)
+        if dstarts.shutdownRegistered(self.pid):
+            dstarts.shutdownTrigger(self.pid)
 
-        heaven.jobStopped(self.avatarId)
+        heaven.jobStopped(self.pid)
 
         # chain up
         worker.ProcessProtocol.processEnded(self, status)
@@ -129,12 +129,12 @@
         self._getAvatars = getAvatars # function of no arguments,
                                       # returns {avatarId=>avatar}
 
-        self._createDeferreds = {} # avatarId => deferred that will fire
+        self._createDeferreds = {} # processId => deferred that will fire
                                    # when the job attaches
-        self._shutdownDeferreds = {} # avatarId => deferred for shutting
+        self._shutdownDeferreds = {} # processId => deferred for shutting
                                    # down jobs; fires when job is reaped
 
-    def create(self, avatarId):
+    def create(self, avatarId, processId):
         """
         Create and register a deferred for creating the given component.
         This deferred will be fired when the JobAvatar has instructed the
@@ -150,18 +150,18 @@
         # three places: the create deferreds hash, the avatar list in
         # the jobheaven, and the shutdown deferreds hash. there are four
         # possible answers:
-        if avatarId in self._createDeferreds:
+        if processId in self._createDeferreds:
             # (1) a job is already starting: it is in the
             # createdeferreds hash
-            self.info('already have a create deferred for %s', avatarId)
+            self.info('already have a create deferred for %d', processId)
             raise errors.ComponentAlreadyStartingError(avatarId)
-        elif avatarId in self._shutdownDeferreds:
+        elif processId in self._shutdownDeferreds:
             # (2) a job is shutting down; note it is also in
             # heaven.avatars
             self.debug('waiting for previous %s to shut down like it '
                        'said it would', avatarId)
             def ensureShutdown(res,
-                               shutdown=self._shutdownDeferreds[avatarId]):
+                               shutdown=self._shutdownDeferreds[processId]):
                 shutdown.addCallback(lambda _: res)
                 return shutdown
             d.addCallback(ensureShutdown)
@@ -174,45 +174,43 @@
             pass
 
         self.debug('registering deferredCreate for %s', avatarId)
-        self._createDeferreds[avatarId] = d
+        self._createDeferreds[processId] = d
         return d
 
-    def createTrigger(self, avatarId):
+    def createTrigger(self, avatarId, processId):
         """
         Trigger a previously registered deferred for creating up the given
         component.
         """
         self.debug('triggering create deferred for %s', avatarId)
-        if not avatarId in self._createDeferreds:
-            self.warning('No create deferred registered for %s', avatarId)
+        if not processId in self._createDeferreds:
+            self.warning('No create deferred registered for %d', processId)
             return
 
-        d = self._createDeferreds[avatarId]
-        del self._createDeferreds[avatarId]
+        d = self._createDeferreds.pop(processId)
         # return the avatarId the component will use to the original caller
         d.callback(avatarId)
  
-    def createFailed(self, avatarId, exception):
+    def createFailed(self, processId, exception):
         """
         Notify the caller that a create has failed, and remove the create
         from the list of pending creates.
         """
-        self.debug('create deferred failed for %s', avatarId)
-        if not avatarId in self._createDeferreds:
-            self.warning('No create deferred registered for %s', avatarId)
+        self.debug('create deferred failed for %d', processId)
+        if not processId in self._createDeferreds:
+            self.warning('No create deferred registered for %d', processId)
             return
 
-        d = self._createDeferreds[avatarId]
-        del self._createDeferreds[avatarId]
+        d = self._createDeferreds.pop(processId)
         d.errback(exception)
 
-    def createRegistered(self, avatarId):
+    def createRegistered(self, processId):
         """
-        Check if a deferred create has been registered for the given avatarId.
+        Check if a deferred create has been registered for the given processId
         """
-        return avatarId in self._createDeferreds
+        return processId in self._createDeferreds
 
-    def shutdown(self, avatarId):
+    def shutdown(self, processId):
         """
         Create and register a deferred for notifying the worker of a
         clean job shutdown. This deferred will be fired when the job is
@@ -220,36 +218,35 @@
 
         @rtype: L{twisted.internet.defer.Deferred}
         """
-        self.debug('making shutdown deferred for %s', avatarId)
+        self.debug('making shutdown deferred for %d', processId)
 
-        if avatarId in self._shutdownDeferreds:
-            self.warning('already have a shutdown deferred for %s',
-                         avatarId)
-            return self._shutdownDeferreds[avatarId]
+        if processId in self._shutdownDeferreds:
+            self.warning('already have a shutdown deferred for %d',
+                         processId)
+            return self._shutdownDeferreds[processId]
         else:
-            self.debug('registering shutdown for %s', avatarId)
+            self.debug('registering shutdown for %d', processId)
             d = defer.Deferred()
-            self._shutdownDeferreds[avatarId] = d
+            self._shutdownDeferreds[processId] = d
             return d
 
-    def shutdownTrigger(self, avatarId):
+    def shutdownTrigger(self, processId):
         """
-        Trigger a previously registered deferred for creating up the given
-        component.
+        Trigger a previously registered deferred for shutting down a process.
         """
-        self.debug('triggering shutdown deferred for %s', avatarId)
-        if not avatarId in self._shutdownDeferreds:
-            self.warning('No shutdown deferred registered for %s', avatarId)
+        self.debug('triggering shutdown deferred for %d', processId)
+        if not processId in self._shutdownDeferreds:
+            self.warning('No shutdown deferred registered for %d', processId)
             return
 
-        d = self._shutdownDeferreds.pop(avatarId)
-        d.callback(avatarId)
+        d = self._shutdownDeferreds.pop(processId)
+        d.callback(processId)
 
-    def shutdownRegistered(self, avatarId):
+    def shutdownRegistered(self, processId):
         """
-        Check if a deferred shutdown has been registered for the given avatarId.
+        Check if a deferred shutdown has been registered for the given processId
         """
-        return avatarId in self._shutdownDeferreds
+        return processId in self._shutdownDeferreds
 
 class JobHeaven(pb.Root, log.Loggable):
     """
@@ -279,7 +276,7 @@
         self._onShutdown = None # If set, a deferred to fire when our last child
                                 # process exits
 
-        self._jobInfos = {} # avatarId -> JobInfo
+        self._jobInfos = {} # processid -> JobInfo
 
         self._deferredStarts = DeferredStartSet(lambda: self.avatars)
 
@@ -311,6 +308,13 @@
         else:
             raise NotImplementedError("no interface")
 
+    def removeAvatar(self, avatarId):
+        if avatarId in self.avatars:
+            del self.avatars[avatarId]
+        else:
+            self.warning("some programmer is telling me about an avatar "
+                         "I have no idea about: %s", avatarId)
+
     def getManagerConnectionInfo(self):
         """
         Gets the L{flumotion.common.connection.PBConnectionInfo}
@@ -351,9 +355,6 @@
                            component
         @type  bundles:    list of (str, str)
         """
-        # can raise ComponentAlreadyStartingError
-        d = self._deferredStarts.create(avatarId)
-
         p = JobProcessProtocol(self, avatarId, self._deferredStarts)
         executable = os.path.join(os.path.dirname(sys.argv[0]), 'flumotion-job')
         if not os.path.exists(executable):
@@ -389,17 +390,18 @@
 
         jobInfo = JobInfo(process.pid, avatarId, type, moduleName,
                           methodName, nice, bundles)
-        self._jobInfos[avatarId] = jobInfo
+        self._jobInfos[process.pid] = jobInfo
 
+        d = self._deferredStarts.create(avatarId, process.pid)
         return d
 
-    def getJobInfo(self, avatarId):
-        return self._jobInfos[avatarId]
-    
+    def getJobInfo(self, processId):
+        return self._jobInfos[processId]
+
     def getJobInfos(self):
         return self._jobInfos.values()
-
-    def getJobAvatarIds(self):
+    
+    def getJobPids(self):
         return self._jobInfos.keys()
 
     def rotateChildLogFDs(self):
@@ -407,23 +409,17 @@
         for avatar in self.avatars.values():
             avatar.logTo(sys.stdout.fileno(), sys.stderr.fileno())
 
-    def jobStopped(self, avatarId):
-        if avatarId in self.avatars:
-            del self.avatars[avatarId]
-        else:
-            self.warning("some programmer is telling me about an avatar "
-                         "I have no idea about: %s", avatarId)
-
-        if avatarId in self._jobInfos:
-            self.debug('Removing job info for %s', avatarId)
-            del self._jobInfos[avatarId]
+    def jobStopped(self, pid):
+        if pid in self._jobInfos:
+            self.debug('Removing job info for %d', pid)
+            del self._jobInfos[pid]
 
             if not self._jobInfos and self._onShutdown:
                 self.debug("Last child exited")
                 self._onShutdown.callback(None)
         else:
-            self.warning("some programmer is telling me about an avatar "
-                         "I have no idea about: %s", avatarId)
+            self.warning("some programmer is telling me about a pid "
+                         "I have no idea about: %d", pid)
 
     def shutdown(self):
         self.debug('Shutting down JobHeaven')
@@ -460,17 +456,23 @@
 
     def kill(self, signum=signal.SIGKILL):
         self.warning("Killing all children immediately")
-        for avatarId in self.getJobAvatarIds():
-            self.killJob(avatarId, signum)
+        for pid in self.getJobPids():
+            self.killJobByPid(pid, signum)
 
-    def killJob(self, avatarId, signum):
-        if avatarId not in self._jobInfos:
-            raise errors.UnknownComponentError(avatarId)
-        jobInfo = self._jobInfos[avatarId]
+    def killJobByPid(self, pid, signum):
+        if pid not in self._jobInfos:
+            raise errors.UnknownComponentError(pid)
+
+        jobInfo = self._jobInfos[pid]
         self.debug("Sending signal %d to job %s at pid %d", signum,
-                   avatarId, jobInfo.pid)
+                   jobInfo.avatarId, jobInfo.pid)
         common.signalPid(jobInfo.pid, signum)
 
+    def killJob(self, avatarId, signum):
+        for job in self.jobInfos.values():
+            if job.avatarId == avatarId:
+                self.killJobByPid(self, job.pid, signum)
+
 class JobAvatar(fpb.Avatar, log.Loggable):
     """
     I am an avatar for the job living in the worker.
@@ -485,6 +487,7 @@
         fpb.Avatar.__init__(self, avatarId)
         self._heaven = heaven
         self.setMind(mind)
+        self.pid = None
             
     def setMind(self, mind):
         """
@@ -503,13 +506,13 @@
                                        job.moduleName, job.methodName,
                                        job.nice)
 
-        def success(_, avatarId):
+        def success(_, avatarId, pid):
             self.debug('job started component with avatarId %s',
                        avatarId)
             # FIXME: drills down too much?
-            self._heaven._deferredStarts.createTrigger(avatarId)
+            self._heaven._deferredStarts.createTrigger(avatarId, pid)
 
-        def error(failure, job):
+        def error(failure, job, pid):
             msg = log.getFailureMessage(failure)
             if failure.check(errors.ComponentCreateError):
                 self.warning('could not create component %s of type %s:'
@@ -518,28 +521,33 @@
                 self.warning('unhandled error creating component %s: %s',
                              job.avatarId, msg)
             # FIXME: drills down too much?
-            self._heaven._deferredStarts.createFailed(job.avatarId,
+            self._heaven._deferredStarts.createFailed(pid,
                                                       failure)
-            
-        info = self._heaven.getManagerConnectionInfo()
-        if info.use_ssl:
-            transport = 'ssl'
-        else:
-            transport = 'tcp'
-        workerName = self._heaven.getWorkerName()
-        job = self._heaven.getJobInfo(self.avatarId)
-
-        d = bootstrap(workerName, info.host, info.port, transport,
-                      info.authenticator, job.bundles)
-        d.addCallback(create, job)
-        d.addCallback(success, job.avatarId)
-        d.addErrback(error, job)
+
+        def gotPid(pid):
+            self.pid = pid
+            info = self._heaven.getManagerConnectionInfo()
+            if info.use_ssl:
+                transport = 'ssl'
+            else:
+                transport = 'tcp'
+            job = self._heaven.getJobInfo(pid)
+            workerName = self._heaven.getWorkerName()
+
+            d = bootstrap(workerName, info.host, info.port, transport,
+                          info.authenticator, job.bundles)
+            d.addCallback(create, job)
+            d.addCallback(success, job.avatarId, pid)
+            d.addErrback(error, job, pid)
+            return d
+        d = self.mindCallRemote("getPid")
+        d.addCallback(gotPid)
         return d
 
     def logout(self):
         self.log('logout called, %s disconnected', self.avatarId)
-        # don't need to do anything much here, we get notified via
-        # sigchld
+
+        self._heaven.removeAvatar(self.avatarId)
         
     def stop(self):
         """
@@ -621,4 +629,4 @@
         """
         self.info("component %s shutting down cleanly", self.avatarId)
         # FIXME: drills down too much?
-        self._heaven._deferredStarts.shutdown(self.avatarId)
+        self._heaven._deferredStarts.shutdown(self.pid)

Modified: flumotion/trunk/flumotion/worker/worker.py
==============================================================================
--- flumotion/trunk/flumotion/worker/worker.py	(original)
+++ flumotion/trunk/flumotion/worker/worker.py	Tue Jun 19 18:19:58 2007
@@ -280,7 +280,8 @@
         __import__(moduleName) 
 
     def getComponents(self):
-        return self.jobHeaven.getJobAvatarIds()
+        return [job.avatarId for job in self.jobHeaven.getJobInfos()]
 
     def killJob(self, avatarId, signum):
         self.jobHeaven.killJob(avatarId, signum)
+


More information about the flumotion-commit mailing list