msmith - in flumotion/trunk: . flumotion/manager

flumotion-commit at lists.fluendo.com flumotion-commit at lists.fluendo.com
Wed Jul 18 14:37:45 CEST 2007


Author: msmith
Date: Wed Jul 18 14:37:41 2007
New Revision: 5323

Modified:
   flumotion/trunk/ChangeLog
   flumotion/trunk/flumotion/manager/component.py
   flumotion/trunk/flumotion/manager/depgraph.py
   flumotion/trunk/flumotion/manager/manager.py
Log:
        * flumotion/manager/depgraph.py:
          Remove Feeder, Eater classes, which are unused.
          Make state dict contain a callable as well as a boolean for each DAG
          node.
          All methods that add a node now get passed a callable as well.
          Remove whatShouldBeStarted entirely.
          On _setState(True), call appropriate down-graph callables, if we
          can.
          All actions needed by the depgraph are now directly driven from the
          depgraph code.
        * flumotion/manager/component.py:
          Remove tryWhatCanBeStarted. Split the individual things out into
          separate methods. These are used as callables in the depgraph now.
        * flumotion/manager/manager.py:
          Changes for depgraph API additions.

          Fixes #691.



Modified: flumotion/trunk/ChangeLog
==============================================================================
--- flumotion/trunk/ChangeLog	(original)
+++ flumotion/trunk/ChangeLog	Wed Jul 18 14:37:41 2007
@@ -1,3 +1,23 @@
+2007-07-18  Michael Smith <msmith at fluendo.com>
+
+	* flumotion/manager/depgraph.py:
+	  Remove Feeder, Eater classes, which are unused.
+	  Make state dict contain a callable as well as a boolean for each DAG
+	  node. 
+	  All methods that add a node now get passed a callable as well.
+	  Remove whatShouldBeStarted entirely.
+	  On _setState(True), call appropriate down-graph callables, if we
+	  can.
+	  All actions needed by the depgraph are now directly driven from the
+	  depgraph code.
+	* flumotion/manager/component.py:
+	  Remove tryWhatCanBeStarted. Split the individual things out into
+	  separate methods. These are used as callables in the depgraph now.
+	* flumotion/manager/manager.py:
+	  Changes for depgraph API additions.
+
+	  Fixes #691.
+
 2007-07-12  Thomas Vander Stichele  <thomas at apestaart dot org>
 
 	* flumotion/component/feedcomponent010.py:

Modified: flumotion/trunk/flumotion/manager/component.py
==============================================================================
--- flumotion/trunk/flumotion/manager/component.py	(original)
+++ flumotion/trunk/flumotion/manager/component.py	Wed Jul 18 14:37:41 2007
@@ -835,156 +835,119 @@
         d.addCallback(start)
         return d
 
-    def _tryWhatCanBeStarted(self, result=True):
-        """
-        I try to start nodes in the depgraph if they should be started.  I am
-        a recursive method, because the depgraph's list of what should be
-        started may change when nodes start/stop.
-        
-        @param result: only needed because this method is added as a callback
-        """
-        
-        # Generic failure handler for 
-        # synchronous and asynchronous errors
-        def handleFailure(failure, avatar, message, id_template):
-            log.warningFailure(failure, swallow=False)
-            if failure.check(errors.HandledException):
-                self.debug('failure %r already handled' % failure)
-                return                                        
-            self.debug('showing error message for failure %r' % failure)            
-            m = messages.Error(message, 
-               id=id_template % componentAvatar.avatarId,
-               debug=log.getFailureMessage(failure))
-            avatar._addMessage(m)
-            avatar._setMood(moods.sad)
-
-        self.debug("tryWhatCanBeStarted")
-        deplist = self.vishnu._depgraph.whatShouldBeStarted()
-        self.debug("Listing deplist")
-
-        if not deplist:
-            self.debug("Nothing needs to be setup or started!")
-            return
-        for dep in deplist:
-            self.debug("Deplist: %r,%r" % (dep[0], dep[1]))
-
-        # we handle all direct dependencies;
-        # an error for one of them shouldn't stop handling the others
-        for dep, deptype in deplist:
-            if dep:
-                if deptype == "COMPONENTSETUP":
-                    self.debug("Component %s to be setup" % dep.get("name"))
-                    componentAvatar = self.getComponentAvatarForState(dep)
-                    if componentAvatar:
-                        if not componentAvatar._beingSetup:
-                            componentAvatar._beingSetup = True
-                            # specific setup failure handler
-                            def componentSetupFailed(failure):
-                                componentAvatar._beingSetup = False                            
-                                handleFailure(failure, componentAvatar,
-                                         T_(N_("Could not setup component.")),
-                                         "component-setup-%s")
-                            try:
-                                d = self.setupComponent(componentAvatar)
-                            except:
-                                # give feedback of synchronous failures 
-                                # to the componentAvatar, and resume the loop
-                                componentSetupFailed(Failure())
-                                continue
-                            # add callback because nodes that can be
-                            # started as a result of this component being
-                            # setup may not be in the current list, and
-                            # add errback to be able to give feedback
-                            # of asynchronous failures to the componentAvatar.
-                            def setupErrback(failure):
-                                componentSetupFailed(failure)
-                                raise errors.ComponentSetupHandledError(failure)                            
-                            d.addCallbacks(self._tryWhatCanBeStarted, 
-                                setupErrback)
-                        else:
-                            self.debug(
-                                "Component %s already on way to being setup",
-                                dep.get("name"))
-                    else:
-                        self.debug(
-                            "Component %s to be setup but has no avatar yet",
-                                dep.get("name"))
-                elif deptype == "COMPONENTSTART":
-                    self.debug("Component %s to be started" % dep.get("name"))
-                    componentAvatar = self.getComponentAvatarForState(dep)
-                    if not componentAvatar._starting:
-                        componentAvatar._starting = True
-                        happyd = defer.Deferred()
-                        # since we've reached happy, we should clear the pending
-                        # mood - it is done transitioning
-                        happyd.addCallback(lambda r, s: s.set(
-                            'moodPending', None), 
-                            componentAvatar.componentState)
-                        # add callback because nodes that can be
-                        # started as a result of this component being
-                        # happy may not be in the current list.
-                        happyd.addCallback(self._tryWhatCanBeStarted)
-                        componentAvatar._happydefers.append(happyd)
-
-                        # specific startup failure handler
-                        def componentStartupFailed(failure):
-                            componentAvatar._starting = False
-                            handleFailure(failure, componentAvatar,
-                                         T_(N_("Could not start component.")),
-                                         "component-start-%s")
-                        try:
-                            d = self._startComponent(componentAvatar)
-                        except:
-                            # give feedback of synchronous failures 
-                            # to the componentAvatar, and resume the loop
-                            componentStartupFailed(Failure())
-                            continue                        
-                        # add errback to be able to give feedback
-                        # of asynchronous failures to the componentAvatar.
-                        def startErrback(failure):
-                            componentStartupFailed(failure)
-                            raise errors.ComponentStartHandledError(failure)
-                        d.addErrback(startErrback)
-                    else:
-                        self.log("Component is already starting")
-                elif deptype == "CLOCKMASTER":
-                    self.debug("Component %s to be clock master!",
-                        dep.get("name"))
-                    componentAvatar = self.getComponentAvatarForState(dep)
-                    if componentAvatar:
-                        if not componentAvatar._providingClock:
-                            componentAvatar._providingClock = True
-                            # specific master clock failure handler
-                            def componentMasterClockFailed(failure):
-                                componentAvatar._providingClock = False
-                                handleFailure(failure, componentAvatar,
-                                      T_(N_("Could not setup component's master clock.")),
-                                      "component-clock-%s")
-                            try:
-                                d = self.provideMasterClock(componentAvatar)
-                            except:
-                                # give feedback of synchronous failures 
-                                # to the componentAvatar and resume the loop
-                                componentMasterClockFailed(Failure())
-                                continue                                
-                            # add callback because nodes that can be
-                            # started as a result of this component providing
-                            # master clock may not be in the current list, and
-                            # add errback to be able to give feedback
-                            # of asynchronous failures to the componentAvatar.
-                            def clockMasterErrback(failure):
-                                componentMasterClockFailed(failure)
-                                raise errors.ComponentStartHandledError(failure)                            
-                            d.addCallbacks(self._tryWhatCanBeStarted, 
-                                clockMasterErrback)
-                        else:
-                            self.debug(
-                                "Component %s already on way to clock master", 
-                                dep.get("name"))
-                else:
-                    self.debug("Unknown dependency type")
+    # Generic failure handler for 
+    # synchronous and asynchronous errors
+    def handleFailure(self, failure, avatar, message, id_template):
+        log.warningFailure(failure, swallow=False)
+        if failure.check(errors.HandledException):
+            self.debug('failure %r already handled' % failure)
+            return                                        
+        self.debug('showing error message for failure %r' % failure)            
+
+        m = messages.Error(message, 
+           id=id_template % avatar.avatarId,
+           debug=log.getFailureMessage(failure))
+        avatar._addMessage(m)
+        avatar._setMood(moods.sad)
+
+    def setupClockMaster(self, componentState):
+        self.debug("Component %s to be clock master!", 
+            componentState.get('name'))
+        componentAvatar = self.getComponentAvatarForState(componentState)
+        if componentAvatar:
+            if not componentAvatar._providingClock:
+                componentAvatar._providingClock = True
+                # specific master clock failure handler
+                def componentMasterClockFailed(failure):
+                    componentAvatar._providingClock = False
+                    self.handleFailure(failure, componentAvatar, 
+                        T_(N_("Could not setup component's master clock.")),
+                        "component-clock-%s")
+                try:
+                    d = self.provideMasterClock(componentAvatar)
+                except:
+                    # give feedback of synchronous failures 
+                    # to the componentAvatar
+                    componentMasterClockFailed(Failure())
+                    return
+                # Add errback to be able to give feedback
+                # of asynchronous failures to the componentAvatar.
+                def clockMasterErrback(failure):
+                    componentMasterClockFailed(failure)
+                    raise errors.ComponentStartHandledError(failure)                            
+                d.addErrback(clockMasterErrback)
+            else:
+                self.debug("Component %s already on way to clock master", 
+                    componentState.get("name"))
+
+    def startComponent(self, componentState):
+        self.debug("Component %s to be started" % componentState.get("name"))
+        componentAvatar = self.getComponentAvatarForState(componentState)
+        if not componentAvatar._starting:
+            componentAvatar._starting = True
+            happyd = defer.Deferred()
+            # When we reach happy, we should clear the pending
+            # mood - it is done transitioning
+            happyd.addCallback(lambda r, s: s.set('moodPending', None), 
+                componentState)
+            # TODO: Ugly poking in privates
+            componentAvatar._happydefers.append(happyd)
+
+            # specific startup failure handler
+            def componentStartupFailed(failure):
+                componentAvatar._starting = False
+                self.handleFailure(failure, componentAvatar,
+                    T_(N_("Could not start component.")),
+                    "component-start-%s")
+            try:
+                d = self._startComponent(componentAvatar)
+            except:
+                # give feedback of synchronous failures 
+                # to the componentAvatar
+                componentStartupFailed(Failure())
+                return
+            # add errback to be able to give feedback
+            # of asynchronous failures to the componentAvatar.
+            def startErrback(failure):
+                componentStartupFailed(failure)
+                raise errors.ComponentStartHandledError(failure)
+            d.addErrback(startErrback)
+        else:
+            self.log("Component is already starting")
+
+    def setupComponent(self, componentState):
+        self.debug("Component %s to be setup" % componentState.get("name"))
+        componentAvatar = self.getComponentAvatarForState(componentState)
+        if componentAvatar:
+            if not componentAvatar._beingSetup:
+                componentAvatar._beingSetup = True
+                # specific setup failure handler
+                def componentSetupFailed(failure):
+                    componentAvatar._beingSetup = False
+                    self.handleFailure(failure, componentAvatar,
+                        T_(N_("Could not setup component.")),
+                        "component-setup-%s")
+                try:
+                    d = self.doSetupComponent(componentAvatar)
+                except:
+                    # give feedback of synchronous failures 
+                    # to the componentAvatar
+                    componentSetupFailed(Failure())
+                    return
+                # add errback to be able to give feedback
+                # of asynchronous failures to the componentAvatar.
+                def setupErrback(failure):
+                    componentSetupFailed(failure)
+                    raise errors.ComponentSetupHandledError(failure)                            
+                d.addErrback(setupErrback)
+            else:
+                self.debug("Component %s already on way to being setup", 
+                    componentState.get("name"))
+        else:
+            self.debug("Component %s to be setup but has no avatar yet", 
+                componentState.get("name"))
 
-    def _setupComponent(self, componentAvatar):
+    def _doSetupComponent(self, componentAvatar):
         # set up the component
         state = componentAvatar.componentState
         conf = state.get('config')
@@ -1013,7 +976,7 @@
             componentAvatar._setMood(moods.sad)
             raise errors.FlumotionError('Could not set up component')
  
-    setupComponent = defer_generator_method(_setupComponent)
+    doSetupComponent = defer_generator_method(_doSetupComponent)
         
     def registerComponent(self, componentAvatar):
         """

Modified: flumotion/trunk/flumotion/manager/depgraph.py
==============================================================================
--- flumotion/trunk/flumotion/manager/depgraph.py	(original)
+++ flumotion/trunk/flumotion/manager/depgraph.py	Wed Jul 18 14:37:41 2007
@@ -22,26 +22,6 @@
 from flumotion.common import dag, log, registry, errors, common
 from flumotion.common.planet import moods
 
-class Feeder:
-    """
-    I am an object representing a feeder in the DepGraph
-    """
-    def __init__(self, feederName, component):
-        self.feederName = feederName
-        self.component = component
-        self.feederData = None
-        
-class Eater:
-    """
-    I am an object representing an eater in the DepGraph
-    """
-    def __init__(self, eaterName, component):
-        # feeder attribute is a reference to the Feeder object
-        # that this eater eats from
-        self.eaterName = eaterName
-        self.component = component
-        self.feeder = None
-
 class DepGraph(log.Loggable):
     """
     I am a dependency graph for components.  I also maintain boolean state
@@ -50,19 +30,24 @@
     I contain a DAG to help with resolving dependencies.
     """
     logCategory = "depgraph"
-
+    
     typeNames = ("WORKER", "JOB", "COMPONENTSETUP", "CLOCKMASTER",
         "COMPONENTSTART")
-    
+
     def __init__(self):
+        # Each node in the DAG is an object (and has a given type, corresponding
+        # to some action that must be taken to progress through the DAG
         self._dag = dag.DAG()
+
+        # (object,type) -> (callable, boolean) mapping. True if the object/type tuple 
+        # corresponding to this action has been done (TODO: make this make sense!)
         self._state = {}
 
-    def _addNode(self, component, type):
+    def _addNode(self, component, type, callable):
         # type: str
         self.debug("Adding node %r of type %s" % (component, type))
         self._dag.addNode(component, type)
-        self._state[(component, type)] = False
+        self._state[(component, type)] = (callable, False)
 
     def _removeNode(self, component, type):
         self.debug("Removing node %r of type %s" % (component, type))
@@ -78,7 +63,7 @@
             parent, parentType, child, childType))
         self._dag.removeEdge(parent, child, parentType, childType)
 
-    def addClockMaster(self, component):
+    def addClockMaster(self, component, setupClockMasterCallable):
         """
         I set a component to be the clock master in the dependency
         graph.  This component must have already been added to the
@@ -88,7 +73,7 @@
         @type  component: L{flumotion.manager.component.ComponentAvatar}
         """
         if self._dag.hasNode(component, "JOB"):
-            self._addNode(component, "CLOCKMASTER")
+            self._addNode(component, "CLOCKMASTER", setupClockMasterCallable)
             self._addEdge(component, component, "COMPONENTSETUP",
                 "CLOCKMASTER")
         
@@ -103,7 +88,7 @@
         else:
             raise KeyError("Component %r has not been added" % component)
 
-    def addComponent(self, component):
+    def addComponent(self, component, setupCallable, startCallable):
         """
         I add a component to the dependency graph.
         This includes adding the worker (if not already added), the job,
@@ -120,9 +105,9 @@
             return
         
         self.debug('adding component %r to depgraph' % component)
-        self._addNode(component, "JOB")
-        self._addNode(component, "COMPONENTSTART")
-        self._addNode(component, "COMPONENTSETUP")
+        self._addNode(component, "JOB", lambda x: None)
+        self._addNode(component, "COMPONENTSTART", startCallable)
+        self._addNode(component, "COMPONENTSETUP", setupCallable)
         self._addEdge(component, component, "JOB", "COMPONENTSETUP")
         workername = component.get('workerRequested')
         if workername:
@@ -140,7 +125,7 @@
         """
         self.debug('adding worker %s' % worker)
         if not self._dag.hasNode(worker, "WORKER"):
-            self._addNode(worker, "WORKER")
+            self._addNode(worker, "WORKER", lambda x: None)
 
     def removeComponent(self, component):
         """
@@ -236,55 +221,12 @@
                                 " %s on component %s" % (
                                 feed, eater, eatingComponent))
 
-    def whatShouldBeStarted(self):
-        """
-        I return a list of things that can and should be started now.
-
-        @return: a list of nodes that should be started, in order
-        @rtype:  list of (object, str)
-        """
-        # A bit tricky because workers can't be started by manager,
-        # and jobs are started automatically when worker is attached
-        # So we get all the stuff sorted by depgraph,
-        # then remove ones that are already have state of True,
-        # then remove ones that are workers who are False, and their offspring,
-        # then remove ones that are jobs who are False, and their offspring,
-        # and also remove nodes that are offspring of nodes with state of False
-        toBeStarted = self._dag.sort()
-        # we want to loop over all objects, so we loop over a copy
-        for obj in toBeStarted[:]:
-            if obj in toBeStarted:
-                self.log("toBeStarted: checking if (%r, %r) needs starting",
-                    obj[0], obj[1])
-                if self._state[obj]:
-                    toBeStarted.remove(obj)
-                elif obj[1] == "WORKER":
-                    # This is a worker not started.
-                    # Let's remove it and its offspring
-                    worker_offspring = self._dag.getOffspringTyped(
-                        obj[0], obj[1])
-                    for offspring in worker_offspring:
-                        if offspring in toBeStarted:
-                            toBeStarted.remove(offspring)
-                    toBeStarted.remove(obj)
-                elif obj[1] == "JOB":
-                    job_offspring = self._dag.getOffspringTyped(obj[0], obj[1])
-                    for offspring in job_offspring:
-                        if offspring in toBeStarted:
-                            toBeStarted.remove(offspring)
-                    toBeStarted.remove(obj)
-                else:
-                    offspring = self._dag.getOffspringTyped(obj[0], obj[1])
-                    for child in offspring:
-                        if child in toBeStarted:
-                            toBeStarted.remove(child)
-        
-        return toBeStarted
-
     def _setState(self, object, type, value):
         self.doLog(log.DEBUG, -2, "Setting state of (%r, %s) to %d" % (
             object, type, value))
-        self._state[(object,type)] = value
+        (callable, oldstate) = self._state[(object,type)]
+        self._state[(object,type)] = (callable, value)
+
         # if making state False, should make its offspring False
         # if the object is the same
         if not value:
@@ -294,7 +236,25 @@
             for kid in offspring:
                 self.debug("Setting state of offspring (%r) to %d", kid, value)
                 if kid[0] == object:
-                    self._state[kid] = False
+                    (callable, oldstate) = self._state[kid]
+                    self._state[kid] = (callable, False)
+        else:
+            # We set this to true. So perhaps we can progress!
+            kids = self._dag.getChildrenTyped(object, type)
+            for (kid, kidtype) in kids:
+                # For each of these we need to check that ALL the parents are
+                # now true before we can go further
+                # if reduce(lambda x,y: x and self._state[y][1], self._dag.getParentsTyped(kid, kidtype), True):
+                parents = self._dag.getParentsTyped(kid, kidtype)
+                ready = True
+                for parent in parents:
+                    if not self._state[parent][1]:
+                        ready = False
+                if ready:
+                    self.debug("Calling callable %r", 
+                        self._state[(kid, kidtype)][0])
+                    self._state[(kid,kidtype)][0](kid)
+
 
     def setComponentStarted(self, component):
         """

Modified: flumotion/trunk/flumotion/manager/manager.py
==============================================================================
--- flumotion/trunk/flumotion/manager/manager.py	(original)
+++ flumotion/trunk/flumotion/manager/manager.py	Wed Jul 18 14:37:41 2007
@@ -473,7 +473,9 @@
     def _updateFlowDependencies(self, state):
         self.debug('registering dependencies of %r' % state)
 
-        self._depgraph.addComponent(state)
+        self._depgraph.addComponent(state, 
+            self.componentHeaven.setupComponent, 
+            self.componentHeaven.startComponent)
 
         conf = state.get('config')
 
@@ -484,7 +486,8 @@
 
         self.debug("Using config: %r for component with avatarId %r", conf, componentAvatarId)
         if componentAvatarId == conf['clock-master']:
-            self._depgraph.addClockMaster(state)
+            self._depgraph.addClockMaster(state, 
+                self.componentHeaven.setupClockMaster)
 
     def _updateStateFromConf(self, _, conf, identity):
         """
@@ -1237,7 +1240,6 @@
         self.debug('vishnu registering component %r' % componentAvatar)
 
         state = componentAvatar.componentState
-        self._depgraph.setJobStarted(state)
         # If this is a reconnecting component, we might also need to set the
         # component as started.
         # If mood is happy or hungry, then the component is running.
@@ -1245,6 +1247,8 @@
         if mood == moods.happy.value or mood == moods.hungry.value:
             self.debug("Component %s is already in mood %s.  Set depgraph "
                 "appropriately", componentAvatar.avatarId, moods.get(mood).name)
+            # TODO: Somehow freeze the depgraph so we don't follow all the links
+            # until these 2 or 3 things are all completed?
             self._depgraph.setComponentSetup(state)
             self._depgraph.setComponentStarted(state)
             if self._depgraph.isAClockMaster(state):
@@ -1254,7 +1258,7 @@
                 self._depgraph.setClockMasterStarted(state)
 
         self.debug('vishnu registered component %r' % componentAvatar)
-        self.componentHeaven._tryWhatCanBeStarted()
+        self._depgraph.setJobStarted(state)
 
     def unregisterComponent(self, componentAvatar):
         # called when the component is logging out


More information about the flumotion-commit mailing list