wingo - in flumotion/trunk: . flumotion/common

flumotion-commit at lists.fluendo.com flumotion-commit at lists.fluendo.com
Wed May 2 13:57:14 CEST 2007


Author: wingo
Date: Wed May  2 13:57:10 2007
New Revision: 4866

Added:
   flumotion/trunk/flumotion/common/startset.py
      - copied, changed from r4858, flumotion/trunk/flumotion/worker/job.py
Modified:
   flumotion/trunk/ChangeLog
   flumotion/trunk/flumotion/common/Makefile.am
Log:
2007-05-02  Andy Wingo  <wingo at pobox.com>

	* flumotion/common/Makefile.am (flumotion_PYTHON): 
	* flumotion/common/startset.py (StartSet): New data structure,
	factored out of flumotion.worker.job.DeferredStartSet. Still need
	to port the jobheaven to use this though.



Modified: flumotion/trunk/ChangeLog
==============================================================================
--- flumotion/trunk/ChangeLog	(original)
+++ flumotion/trunk/ChangeLog	Wed May  2 13:57:10 2007
@@ -1,3 +1,10 @@
+2007-05-02  Andy Wingo  <wingo at pobox.com>
+
+	* flumotion/common/Makefile.am (flumotion_PYTHON): 
+	* flumotion/common/startset.py (StartSet): New data structure,
+	factored out of flumotion.worker.job.DeferredStartSet. Still need
+	to port the jobheaven to use this though.
+
 2007-04-30  Andy Wingo  <wingo at pobox.com>
 
 	* flumotion/twisted/pb.py

Modified: flumotion/trunk/flumotion/common/Makefile.am
==============================================================================
--- flumotion/trunk/flumotion/common/Makefile.am	(original)
+++ flumotion/trunk/flumotion/common/Makefile.am	Wed May  2 13:57:10 2007
@@ -33,6 +33,7 @@
 	reload.py \
 	server.py \
 	setup.py \
+	startset.py \
 	watched.py \
 	worker.py
 

Copied: flumotion/trunk/flumotion/common/startset.py (from r4858, flumotion/trunk/flumotion/worker/job.py)
==============================================================================
--- flumotion/trunk/flumotion/worker/job.py	(original)
+++ flumotion/trunk/flumotion/common/startset.py	Wed May  2 13:57:10 2007
@@ -1,4 +1,4 @@
-# -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*-
+# -*- Mode: Python -*-
 # vi:si:et:sw=4:sts=4:ts=4
 #
 # Flumotion - a streaming media server
@@ -20,123 +20,54 @@
 # Headers in this file shall remain intact.
 
 """
-worker-side objects to handle worker clients
+a data structure to manage asynchronous avatar starts and shutdowns
 """
 
-import os
-import sys
+from twisted.internet import defer
 
-from twisted.cred import portal
-from twisted.internet import defer, reactor
-from twisted.spread import pb
-
-from flumotion.common import errors, log
-from flumotion.common import common, worker
-from flumotion.twisted import checkers, fdserver, compat
-from flumotion.twisted import pb as fpb
-
-JOB_SHUTDOWN_TIMEOUT = 5
-
-def _getSocketPath():
-    # FIXME: there is mkstemp for sockets, so we have a small window
-    # here in which the socket could be created by something else
-    # I didn't succeed in preparing a socket file with that name either
-
-    # caller needs to delete name before using
-    import tempfile
-    fd, name = tempfile.mkstemp('.%d' % os.getpid(), 'flumotion.worker.')
-    os.close(fd)
-    
-    return name
-
-class JobInfo(object):
-    """
-    I hold information about a job.
-
-    @cvar  pid:        PID of the child process
-    @type  pid:        int
-    @cvar  avatarId:   avatar identification string
-    @type  avatarId:   str
-    @cvar  type:       type of the component to create
-    @type  type:       str
-    @cvar  moduleName: name of the module to create the component from
-    @type  moduleName: str
-    @cvar  methodName: the factory method to use to create the component
-    @type  methodName: str
-    @cvar  nice:       the nice level to run the job as
-    @type  nice:       int
-    @cvar  bundles:    ordered list of (bundleName, bundlePath) needed to
-                       create the component
-    @type  bundles:    list of (str, str)
-    """
-    __slots__ = ('pid', 'avatarId', 'type', 'moduleName', 'methodName',
-                 'nice', 'bundles')
-    def __init__(self, pid, avatarId, type, moduleName, methodName, nice,
-                 bundles):
-        self.pid = pid 
-        self.avatarId = avatarId
-        self.type = type
-        self.moduleName = moduleName
-        self.methodName = methodName
-        self.nice = nice
-        self.bundles = bundles
-
-class JobProcessProtocol(worker.ProcessProtocol):
-    def __init__(self, heaven, avatarId, deferredStarts):
-        self._deferredStarts = deferredStarts
-        worker.ProcessProtocol.__init__(self, heaven, avatarId,
-                                        'component',
-                                        heaven.brain.workerName)
-
-    def sendMessage(self, message):
-        heaven = self.loggable
-        heaven.brain.callRemote('componentAddMessage', self.avatarId,
-                                message)
-
-    def processEnded(self, status):
-        heaven = self.loggable
-        dstarts = self._deferredStarts
-        signum = status.value.signal
-
-        # we need to trigger a failure on the create deferred 
-        # if the job failed before logging in to the worker;
-        # otherwise the manager still thinks it's starting up when it's
-        # dead.  If the job already attached to the worker however,
-        # the create deferred will already have callbacked.
-        if dstarts.createRegistered(self.avatarId):
-            if signum:
-                reason = "received signal %d" % signum
-            else:
-                reason = "unknown reason"
-            text = "Component '%s' has exited early (%s).  " \
-                   "This is sometimes triggered by a corrupt " \
-                   "GStreamer registry." % (self.avatarId, reason)
-            dstarts.createFailed(self.avatarId, 
-                                 errors.ComponentCreateError(text))
-
-        if dstarts.shutdownRegistered(self.avatarId):
-            dstarts.shutdownTrigger(self.avatarId)
-
-        heaven.jobStopped(self.avatarId)
-
-        # chain up
-        worker.ProcessProtocol.processEnded(self, status)
-        
-class DeferredStartSet(log.Loggable):
-    def __init__(self, getAvatars):
-        self._getAvatars = getAvatars # function of no arguments,
-                                      # returns {avatarId=>avatar}
-
-        self._createDeferreds = {} # avatarId => deferred that will fire
-                                   # when the job attaches
-        self._shutdownDeferreds = {} # avatarId => deferred for shutting
-                                   # down jobs; fires when job is reaped
-
-    def create(self, avatarId):
-        """
-        Create and register a deferred for creating the given component.
-        This deferred will be fired when the JobAvatar has instructed the
-        job to create the component.
+from flumotion.common import log
+
+
+# This class was factored out of the worker's jobheaven, so sometimes
+# the comments talk about jobs, but they refer to any asynchronous
+# process. For example the multiadmin uses this to manage its
+# connections to remote managers.
+
+class StartSet(log.Loggable):
+    def __init__(self, avatarLoggedIn, alreadyStartingError,
+                 alreadyRunningError):
+        """Create a StartSet, a data structure for managing starts and
+        stops of remote processes, for example jobs in a jobheaven.
+
+        @param avatarLoggedIn: a procedure of type avatarId->boolean;
+        should return True if the avatarId is logged in and "ready", and
+        False otherwise. An avatarId is ready if avatarStarted() could
+        have been called on it. This interface is made this way because
+        it is assumed that whatever code instantiates a StartSet keeps
+        track of "ready" remote processes, and this way we prevent data
+        duplication.
+        @param alreadyStartingError: An exception class to raise if
+        createStart() is called, but there is already a create deferred
+        registered for that avatarId.
+        @param alreadyRunningError: An exception class to raise if
+        createStart() is called, but there is already a "ready" process
+        with that avatarId.
+        """
+        self._avatarLoggedIn = avatarLoggedIn
+        self._alreadyStartingError = alreadyStartingError
+        self._alreadyRunningError = alreadyRunningError
+
+        self._createDeferreds = {} # avatarId => deferred
+        self._shutdownDeferreds = {} # avatarId => deferred
+
+    def createStart(self, avatarId):
+        """
+        Create and register a deferred for starting a given process.
+        The deferred will be fired when the process is ready, as
+        triggered by a call to createSuccess().
+
+        @param avatarId: the id of the remote process, for example the
+        avatarId of the job
 
         @rtype: L{twisted.internet.defer.Deferred}
         """
@@ -145,28 +76,29 @@
         d = defer.Deferred()
 
         # the question of "what jobs do we know about" is answered in
-        # three places: the create deferreds hash, the avatar list in
-        # the jobheaven, and the shutdown deferreds hash. there are four
+        # three places: the create deferreds hash, the set of logged in
+        # avatars, and the shutdown deferreds hash. there are four
         # possible answers:
         if avatarId in self._createDeferreds:
             # (1) a job is already starting: it is in the
             # createdeferreds hash
             self.info('already have a create deferred for %s', avatarId)
-            raise errors.ComponentAlreadyStartingError(avatarId)
+            raise self._alreadyStartingError(avatarId)
         elif avatarId in self._shutdownDeferreds:
             # (2) a job is shutting down; note it is also in
             # heaven.avatars
             self.debug('waiting for previous %s to shut down like it '
                        'said it would', avatarId)
+            # fixme: i don't understand this code
             def ensureShutdown(res,
                                shutdown=self._shutdownDeferreds[avatarId]):
                 shutdown.addCallback(lambda _: res)
                 return shutdown
             d.addCallback(ensureShutdown)
-        elif avatarId in self._getAvatars():
+        elif self._avatarLoggedIn(avatarId):
             # (3) a job is running fine
             self.info('avatar named %s already running', avatarId)
-            raise errors.ComponentAlreadyRunningError(avatarId)
+            raise self._alreadyRunningError(avatarId)
         else:
             # (4) it's new; we know of nothing with this avatarId
             pass
@@ -175,10 +107,14 @@
         self._createDeferreds[avatarId] = d
         return d
 
-    def createTrigger(self, avatarId):
+    def createSuccess(self, avatarId):
         """
-        Trigger a previously registered deferred for creating up the given
-        component.
+        Trigger a deferred start previously registerd via createStart().
+        For example, a JobHeaven might call this method when a job has
+        logged in and been told to start a component.
+
+        @param avatarId: the id of the remote process, for example the
+        avatarId of the job
         """
         self.debug('triggering create deferred for %s', avatarId)
         if not avatarId in self._createDeferreds:
@@ -194,6 +130,11 @@
         """
         Notify the caller that a create has failed, and remove the create
         from the list of pending creates.
+
+        @param avatarId: the id of the remote process, for example the
+        avatarId of the job
+        @param exception: either an exception or a failure describing
+        why the create failed.
         """
         self.debug('create deferred failed for %s', avatarId)
         if not avatarId in self._createDeferreds:
@@ -207,14 +148,22 @@
     def createRegistered(self, avatarId):
         """
         Check if a deferred create has been registered for the given avatarId.
+
+        @param avatarId: the id of the remote process, for example the
+        avatarId of the job
+
+        @returns: True if a deferred create has been registered for this
+        object, False otherwise
         """
         return avatarId in self._createDeferreds
 
-    def shutdown(self, avatarId):
+    def shutdownStart(self, avatarId):
         """
-        Create and register a deferred for notifying the worker of a
-        clean job shutdown. This deferred will be fired when the job is
-        reaped.
+        Create and register a deferred that will be fired when a process
+        has shut down cleanly.
+
+        @param avatarId: the id of the remote process, for example the
+        avatarId of the job
 
         @rtype: L{twisted.internet.defer.Deferred}
         """
@@ -230,10 +179,14 @@
             self._shutdownDeferreds[avatarId] = d
             return d
 
-    def shutdownTrigger(self, avatarId):
+    def shutdownSuccess(self, avatarId):
         """
-        Trigger a previously registered deferred for creating up the given
-        component.
+        Trigger a callback on a deferred previously registered via
+        shutdownStart(). For example, a JobHeaven would call this when a
+        job for which shutdownStart() was called is reaped.
+
+        @param avatarId: the id of the remote process, for example the
+        avatarId of the job
         """
         self.debug('triggering shutdown deferred for %s', avatarId)
         if not avatarId in self._shutdownDeferreds:
@@ -245,371 +198,51 @@
 
     def shutdownRegistered(self, avatarId):
         """
-        Check if a deferred shutdown has been registered for the given avatarId.
-        """
-        return avatarId in self._shutdownDeferreds
+        Check if a deferred shutdown has been registered for the given
+        avatarId.
 
-class JobHeaven(pb.Root, log.Loggable):
-    """
-    I am similar to but not quite the same as a manager-side Heaven.
-    I manage avatars inside the worker for job processes spawned by the worker.
-
-    @ivar avatars: dict of avatarId -> avatar
-    @type avatars: dict of str -> L{JobAvatar}
-    @ivar brain:   the worker brain
-    @type brain:   L{WorkerBrain}
-    """
-
-    logCategory = "job-heaven"
-    compat.implements(portal.IRealm)
-
-    def __init__(self, brain):
-        """
-        @param brain:       a reference to the worker brain
-        @type  brain:       L{WorkerBrain}
-        @param socketPath:  the path of the Unix domain socket for PB
-        @type  socketPath:  str
-        """
-        self.avatars = {} # componentId -> avatar
-        self.brain = brain
-        self._socketPath = _getSocketPath()
-        self._port = None
-        self._onShutdown = None # If set, a deferred to fire when our last child
-                                # process exits
-
-        self._jobInfos = {} # avatarId -> JobInfo
-
-        self._deferredStarts = DeferredStartSet(lambda: self.avatars)
-
-    def listen(self):
-        assert self._port is None
-        # FIXME: we should hand a username and password to log in with to
-        # the job process instead of allowing anonymous
-        checker = checkers.FlexibleCredentialsChecker()
-        checker.allowPasswordless(True)
-        p = portal.Portal(self, [checker])
-        f = pb.PBServerFactory(p)
-        try:
-            os.unlink(self._socketPath)
-        except:
-            pass
-
-        # Rather than a listenUNIX(), we use listenWith so that we can specify
-        # our particular Port, which creates Transports that we know how to
-        # pass FDs over.
-        port = reactor.listenWith(fdserver.FDPort, self._socketPath, f)
-        self._port = port
-        
-    ### portal.IRealm method
-    def requestAvatar(self, avatarId, mind, *interfaces):
-        if pb.IPerspective in interfaces:
-            avatar = JobAvatar(self, avatarId, mind)
-            self.avatars[avatarId] = avatar
-            return pb.IPerspective, avatar, avatar.logout
-        else:
-            raise NotImplementedError("no interface")
+        @param avatarId: the id of the remote process, for example the
+        avatarId of the job
 
-    def getManagerConnectionInfo(self):
+        @returns: True if a deferred shutdown has been registered for
+        this object, False otherwise
         """
-        Gets the L{flumotion.common.connection.PBConnectionInfo}
-        describing how to connect to the manager.
+        return avatarId in self._shutdownDeferreds
 
-        @rtype: L{flumotion.common.connection.PBConnectionInfo}
+    def avatarStarted(self, avatarId):
         """
-        return self.brain.managerConnectionInfo
-
-    def getWorkerName(self):
-        """
-        Gets the name of the worker that spawns the process.
-
-        @rtype: str
-        """
-        return self.brain.workerName
-
-    def spawn(self, avatarId, type, moduleName, methodName, nice, bundles):
-        """
-        Spawn a new job.
-
-        This will spawn a new flumotion-job process, running under the
-        requested nice level. When the job logs in, it will be told to
-        load bundles and run a function, which is expected to return a
-        component.
-
-        @param avatarId:   avatarId the component should use to log in
-        @type  avatarId:   str
-        @param type:       type of component to start
-        @type  type:       str
-        @param moduleName: name of the module to create the component from
-        @type  moduleName: str
-        @param methodName: the factory method to use to create the component
-        @type  methodName: str
-        @param nice:       nice level
-        @type  nice:       int
-        @param bundles:    ordered list of (bundleName, bundlePath) for this
-                           component
-        @type  bundles:    list of (str, str)
-        """
-        # can raise ComponentAlreadyStartingError
-        d = self._deferredStarts.create(avatarId)
-
-        p = JobProcessProtocol(self, avatarId, self._deferredStarts)
-        executable = os.path.join(os.path.dirname(sys.argv[0]), 'flumotion-job')
-        if not os.path.exists(executable):
-            self.error("Trying to spawn job process, but '%s' does not "
-                       "exist", executable)
-        argv = [executable, avatarId, self._socketPath]
-
-        realexecutable = executable
-
-        # Run some jobs under valgrind, optionally. Would be nice to have the
-        # arguments to run it with configurable, but this'll do for now.
-        # FLU_VALGRIND_JOB takes a comma-seperated list of full component
-        # avatar IDs.
-        if os.environ.has_key('FLU_VALGRIND_JOB'):
-            jobnames = os.environ['FLU_VALGRIND_JOB'].split(',')
-            if avatarId in jobnames:
-                realexecutable = 'valgrind'
-                # We can't just valgrind flumotion-job, we have to valgrind
-                # python running flumotion-job, otherwise we'd need 
-                # --trace-children (not quite sure why), which we don't want
-                argv = ['valgrind', '--leak-check=full', '--num-callers=24', 
-                    '--leak-resolution=high', '--show-reachable=yes', 
-                    'python'] + argv
-
-        childFDs = {0: 0, 1: 1, 2: 2}
-        env = {}
-        env.update(os.environ)
-        env['FLU_DEBUG'] = log.getDebug()
-        process = reactor.spawnProcess(p, realexecutable, env=env, args=argv,
-            childFDs=childFDs)
-
-        p.setPid(process.pid)
-
-        jobInfo = JobInfo(process.pid, avatarId, type, moduleName,
-                          methodName, nice, bundles)
-        self._jobInfos[avatarId] = jobInfo
-
-        return d
+        Notify the startset that an avatar has started. If there was a
+        create deferred registered for this avatar, this will cause
+        createSuccess() to be called.
 
-    def getJobInfo(self, avatarId):
-        return self._jobInfos[avatarId]
-    
-    def getJobInfos(self):
-        return self._jobInfos.values()
-
-    def getJobAvatarIds(self):
-        return self._jobInfos.keys()
-
-    def rotateChildLogFDs(self):
-        self.debug('telling kids about new log file descriptors')
-        for avatar in self.avatars.values():
-            avatar.logTo(sys.stdout.fileno(), sys.stderr.fileno())
-
-    def jobStopped(self, avatarId):
-        if avatarId in self.avatars:
-            del self.avatars[avatarId]
-        else:
-            self.warning("some programmer is telling me about an avatar "
-                         "I have no idea about: %s", avatarId)
-
-        if avatarId in self._jobInfos:
-            self.debug('Removing job info for %s', avatarId)
-            del self._jobInfos[avatarId]
-
-            if not self._jobInfos and self._onShutdown:
-                self.debug("Last child exited")
-                self._onShutdown.callback(None)
-        else:
-            self.warning("some programmer is telling me about an avatar "
-                         "I have no idea about: %s", avatarId)
-
-    def shutdown(self):
-        self.debug('Shutting down JobHeaven')
-        self.debug('Stopping all jobs')
-        for avatar in self.avatars.values():
-            avatar.stop()
-
-        if self.avatars:
-            # If our jobs fail to shut down nicely within some period of
-            # time, shut them down less nicely
-            dc = reactor.callLater(JOB_SHUTDOWN_TIMEOUT, self.kill)
-            def cancelDelayedCall(res, dc):
-                # be nice to unit tests
-                if dc.active():
-                    dc.cancel()
-                return res
-
-            self._onShutdown = defer.Deferred()
-            self._onShutdown.addCallback(cancelDelayedCall, dc)
-            ret = self._onShutdown
-        else:
-            # everything's gone already, return success
-            ret = defer.succeed(None)
-
-        def stopListening(_):
-            # possible for it to be None, if we haven't been told to
-            # listen yet, as in some test cases
-            if self._port:
-                port = self._port
-                self._port = None
-                return port.stopListening()
-        ret.addCallback(stopListening)
-        return ret
-
-    def kill(self):
-        self.warning("Killing all children immediately")
-        for jobInfo in self.getJobInfos():
-            self.debug("Sending SIGKILL to pid %d", jobInfo.pid)
-            common.killPid(jobInfo.pid)
-
-class JobAvatar(fpb.Avatar, log.Loggable):
-    """
-    I am an avatar for the job living in the worker.
-    """
-    logCategory = 'job-avatar'
-
-    def __init__(self, heaven, avatarId, mind):
-        """
-        @type  heaven:   L{flumotion.worker.worker.JobHeaven}
-        @type  avatarId: str
-        """
-        fpb.Avatar.__init__(self, avatarId)
-        self._heaven = heaven
-        self.setMind(mind)
-            
-    def setMind(self, mind):
-        """
-        @param mind: reference to the job's JobMedium on which we can call
-        @type  mind: L{twisted.spread.pb.RemoteReference}
-        """
-        fpb.Avatar.setMind(self, mind)
-
-        def bootstrap(*args):
-            return self.mindCallRemote('bootstrap', *args)
-
-        def create(_, job):
-            self.debug("asking job to create component with avatarId %s,"
-                       " type %s", job.avatarId, job.type)
-            return self.mindCallRemote('create', job.avatarId, job.type,
-                                       job.moduleName, job.methodName,
-                                       job.nice)
-
-        def success(_, avatarId):
-            self.debug('job started component with avatarId %s',
-                       avatarId)
-            # FIXME: drills down too much?
-            self._heaven._deferredStarts.createTrigger(avatarId)
-
-        def error(failure, job):
-            msg = log.getFailureMessage(failure)
-            if failure.check(errors.ComponentCreateError):
-                self.warning('could not create component %s of type %s:',
-                             ' %s', job.avatarId, job.type, msg)
-            else:
-                self.warning('unhandled error creating component %s: %s',
-                             job.avatarId, msg)
-            # FIXME: drills down too much?
-            self._heaven._deferredStarts.createFailed(job.avatarId,
-                                                      failure)
-            
-        info = self._heaven.getManagerConnectionInfo()
-        if info.use_ssl:
-            transport = 'ssl'
-        else:
-            transport = 'tcp'
-        workerName = self._heaven.getWorkerName()
-        job = self._heaven.getJobInfo(self.avatarId)
-
-        d = bootstrap(workerName, info.host, info.port, transport,
-                      info.authenticator, job.bundles)
-        d.addCallback(create, job)
-        d.addCallback(success, job.avatarId)
-        d.addErrback(error, job)
-        return d
-
-    def logout(self):
-        self.log('logout called, %s disconnected', self.avatarId)
-        # don't need to do anything much here, we get notified via
-        # sigchld
-        
-    def stop(self):
-        """
-        returns: a deferred marking completed stop.
-        """
-        if not self.mind:
-            self.debug('already logged out')
-            return defer.succeed(None)
-        else:
-            self.debug('stopping')
-            return self.mindCallRemote('stop')
-
-    def _sendFileDescriptor(self, fd, message):
-        try:
-            # FIXME: pay attention to the return value of
-            # sendFileDescriptor; is the same as the return value of
-            # sendmsg(2)
-            self.mind.broker.transport.sendFileDescriptor(fd, message)
-            return True
-        except RuntimeError, e:
-            # RuntimeError is what is thrown by the C code doing this
-            # when there are issues
-            self.warning("RuntimeError %s sending file descriptors",
-                         log.getExceptionMessage(e))
-            return False
-
-    def logTo(self, stdout, stderr):
-        """
-        Tell the job to log to the given file descriptors.
-        """
-        self.debug('Giving job new stdout and stderr')
-        if self.mind:
-            self._sendFileDescriptor(stdout, "redirectStdout")
-            self._sendFileDescriptor(stdout, "redirectStderr")
-
-    def sendFeed(self, feedName, fd, eaterId):
-        """
-        Tell the feeder to send the given feed to the given fd.
-
-        @returns: whether the fd was successfully handed off to the component.
-        """
-        self.debug('Sending FD %d to component job to feed %s to fd',
-                   fd, feedName)
-
-        # it is possible that the component has logged out, in which
-        # case we don't have a mind. Trying to check for this earlier
-        # only introduces a race, so we handle it here by triggering a
-        # disconnect on the fd.
-        if self.mind:
-            message = "sendFeed %s %s" % (feedName, eaterId)
-            return self._sendFileDescriptor(fd, message)
+        @param avatarId: the id of the remote process, for example the
+        avatarId of the job
+        """
+        if avatarId in self._createDeferreds:
+            self.createSuccess(avatarId)
         else:
-            self.debug('my mind is gone, trigger disconnect')
-            return False
+            self.log('avatar %s started, but we were not waiting for'
+                     ' it', avatarId)
 
-    def receiveFeed(self, feedId, fd):
+    def avatarStopped(self, avatarId, getFailure):
         """
-        Tell the feeder to receive the given feed from the given fd.
+        Notify the startset that an avatar has stopped. If there was a
+        shutdown deferred registered for this avatar, this will cause
+        shutdownSuccess() to be called.
 
-        @returns: whether the fd was successfully handed off to the component.
-        """
-        self.debug('Sending FD %d to component job to eat %s from fd',
-                   fd, feedId)
+        On the other hand, if there was a create deferred still pending,
+        we will call createFailed with the result of calling getFailure.
 
-        # same note as in sendFeed
-        if self.mind:
-            message = "receiveFeed %s" % (feedId,)
-            return self._sendFileDescriptor(fd, message)
-        else:
-            self.debug('my mind is gone, trigger disconnect')
-            return False
+        If no start or create was registered, we do nothing.
 
-    def perspective_cleanShutdown(self):
+        @param avatarId: the id of the remote process, for example the
+        avatarId of the job
+        @param getFailure: procedure of type avatarId -> Failure. The
+        returned failure should describe the reason that the job failed.
         """
-        This notification from the job process will be fired when it is
-        shutting down, so that although the process might still be
-        around, we know it's OK to accept new start requests for this
-        avatar ID.
-        """
-        self.info("component %s shutting down cleanly", self.avatarId)
-        # FIXME: drills down too much?
-        self._heaven._deferredStarts.shutdown(self.avatarId)
+        if avatarId in self._createDeferreds:
+            self.createFailed(avatarId, getFailure(avatarId))
+        elif avatarId in self._shutdownDeferreds:
+            self.shutdownSuccess(avatarId)
+        else:
+            self.debug('unknown avatar %s logged out', avatarId)


More information about the flumotion-commit mailing list