msmith - in flumotion/trunk: . flumotion/manager
flumotion-commit at lists.fluendo.com
Wed Jul 18 14:37:45 CEST 2007
Author: msmith
Date: Wed Jul 18 14:37:41 2007
New Revision: 5323
Modified:
flumotion/trunk/ChangeLog
flumotion/trunk/flumotion/manager/component.py
flumotion/trunk/flumotion/manager/depgraph.py
flumotion/trunk/flumotion/manager/manager.py
Log:
* flumotion/manager/depgraph.py:
Remove Feeder, Eater classes, which are unused.
Make state dict contain a callable as well as a boolean for each DAG
node.
All methods that add a node now get passed a callable as well.
Remove whatShouldBeStarted entirely.
On _setState(True), call appropriate down-graph callables, if we
can.
All actions needed by the depgraph are now directly driven from the
depgraph code.
* flumotion/manager/component.py:
Remove tryWhatCanBeStarted. Split the individual things out into
separate methods. These are used as callables in the depgraph now.
* flumotion/manager/manager.py:
Changes for depgraph API additions.
Fixes #691.
Modified: flumotion/trunk/ChangeLog
==============================================================================
--- flumotion/trunk/ChangeLog (original)
+++ flumotion/trunk/ChangeLog Wed Jul 18 14:37:41 2007
@@ -1,3 +1,23 @@
+2007-07-18 Michael Smith <msmith at fluendo.com>
+
+ * flumotion/manager/depgraph.py:
+ Remove Feeder, Eater classes, which are unused.
+ Make state dict contain a callable as well as a boolean for each DAG
+ node.
+ All methods that add a node now get passed a callable as well.
+ Remove whatShouldBeStarted entirely.
+ On _setState(True), call appropriate down-graph callables, if we
+ can.
+ All actions needed by the depgraph are now directly driven from the
+ depgraph code.
+ * flumotion/manager/component.py:
+ Remove tryWhatCanBeStarted. Split the individual things out into
+ separate methods. These are used as callables in the depgraph now.
+ * flumotion/manager/manager.py:
+ Changes for depgraph API additions.
+
+ Fixes #691.
+
2007-07-12 Thomas Vander Stichele <thomas at apestaart dot org>
* flumotion/component/feedcomponent010.py:
Modified: flumotion/trunk/flumotion/manager/component.py
==============================================================================
--- flumotion/trunk/flumotion/manager/component.py (original)
+++ flumotion/trunk/flumotion/manager/component.py Wed Jul 18 14:37:41 2007
@@ -835,156 +835,119 @@
d.addCallback(start)
return d
- def _tryWhatCanBeStarted(self, result=True):
- """
- I try to start nodes in the depgraph if they should be started. I am
- a recursive method, because the depgraph's list of what should be
- started may change when nodes start/stop.
-
- @param result: only needed because this method is added as a callback
- """
-
- # Generic failure handler for
- # synchronous and asynchronous errors
- def handleFailure(failure, avatar, message, id_template):
- log.warningFailure(failure, swallow=False)
- if failure.check(errors.HandledException):
- self.debug('failure %r already handled' % failure)
- return
- self.debug('showing error message for failure %r' % failure)
- m = messages.Error(message,
- id=id_template % componentAvatar.avatarId,
- debug=log.getFailureMessage(failure))
- avatar._addMessage(m)
- avatar._setMood(moods.sad)
-
- self.debug("tryWhatCanBeStarted")
- deplist = self.vishnu._depgraph.whatShouldBeStarted()
- self.debug("Listing deplist")
-
- if not deplist:
- self.debug("Nothing needs to be setup or started!")
- return
- for dep in deplist:
- self.debug("Deplist: %r,%r" % (dep[0], dep[1]))
-
- # we handle all direct dependencies;
- # an error for one of them shouldn't stop handling the others
- for dep, deptype in deplist:
- if dep:
- if deptype == "COMPONENTSETUP":
- self.debug("Component %s to be setup" % dep.get("name"))
- componentAvatar = self.getComponentAvatarForState(dep)
- if componentAvatar:
- if not componentAvatar._beingSetup:
- componentAvatar._beingSetup = True
- # specific setup failure handler
- def componentSetupFailed(failure):
- componentAvatar._beingSetup = False
- handleFailure(failure, componentAvatar,
- T_(N_("Could not setup component.")),
- "component-setup-%s")
- try:
- d = self.setupComponent(componentAvatar)
- except:
- # give feedback of synchronous failures
- # to the componentAvatar, and resume the loop
- componentSetupFailed(Failure())
- continue
- # add callback because nodes that can be
- # started as a result of this component being
- # setup may not be in the current list, and
- # add errback to be able to give feedback
- # of asynchronous failures to the componentAvatar.
- def setupErrback(failure):
- componentSetupFailed(failure)
- raise errors.ComponentSetupHandledError(failure)
- d.addCallbacks(self._tryWhatCanBeStarted,
- setupErrback)
- else:
- self.debug(
- "Component %s already on way to being setup",
- dep.get("name"))
- else:
- self.debug(
- "Component %s to be setup but has no avatar yet",
- dep.get("name"))
- elif deptype == "COMPONENTSTART":
- self.debug("Component %s to be started" % dep.get("name"))
- componentAvatar = self.getComponentAvatarForState(dep)
- if not componentAvatar._starting:
- componentAvatar._starting = True
- happyd = defer.Deferred()
- # since we've reached happy, we should clear the pending
- # mood - it is done transitioning
- happyd.addCallback(lambda r, s: s.set(
- 'moodPending', None),
- componentAvatar.componentState)
- # add callback because nodes that can be
- # started as a result of this component being
- # happy may not be in the current list.
- happyd.addCallback(self._tryWhatCanBeStarted)
- componentAvatar._happydefers.append(happyd)
-
- # specific startup failure handler
- def componentStartupFailed(failure):
- componentAvatar._starting = False
- handleFailure(failure, componentAvatar,
- T_(N_("Could not start component.")),
- "component-start-%s")
- try:
- d = self._startComponent(componentAvatar)
- except:
- # give feedback of synchronous failures
- # to the componentAvatar, and resume the loop
- componentStartupFailed(Failure())
- continue
- # add errback to be able to give feedback
- # of asynchronous failures to the componentAvatar.
- def startErrback(failure):
- componentStartupFailed(failure)
- raise errors.ComponentStartHandledError(failure)
- d.addErrback(startErrback)
- else:
- self.log("Component is already starting")
- elif deptype == "CLOCKMASTER":
- self.debug("Component %s to be clock master!",
- dep.get("name"))
- componentAvatar = self.getComponentAvatarForState(dep)
- if componentAvatar:
- if not componentAvatar._providingClock:
- componentAvatar._providingClock = True
- # specific master clock failure handler
- def componentMasterClockFailed(failure):
- componentAvatar._providingClock = False
- handleFailure(failure, componentAvatar,
- T_(N_("Could not setup component's master clock.")),
- "component-clock-%s")
- try:
- d = self.provideMasterClock(componentAvatar)
- except:
- # give feedback of synchronous failures
- # to the componentAvatar and resume the loop
- componentMasterClockFailed(Failure())
- continue
- # add callback because nodes that can be
- # started as a result of this component providing
- # master clock may not be in the current list, and
- # add errback to be able to give feedback
- # of asynchronous failures to the componentAvatar.
- def clockMasterErrback(failure):
- componentMasterClockFailed(failure)
- raise errors.ComponentStartHandledError(failure)
- d.addCallbacks(self._tryWhatCanBeStarted,
- clockMasterErrback)
- else:
- self.debug(
- "Component %s already on way to clock master",
- dep.get("name"))
- else:
- self.debug("Unknown dependency type")
+ # Generic failure handler for
+ # synchronous and asynchronous errors
+ def handleFailure(self, failure, avatar, message, id_template):
+ log.warningFailure(failure, swallow=False)
+ if failure.check(errors.HandledException):
+ self.debug('failure %r already handled' % failure)
+ return
+ self.debug('showing error message for failure %r' % failure)
+
+ m = messages.Error(message,
+ id=id_template % avatar.avatarId,
+ debug=log.getFailureMessage(failure))
+ avatar._addMessage(m)
+ avatar._setMood(moods.sad)
+
+ def setupClockMaster(self, componentState):
+ self.debug("Component %s to be clock master!",
+ componentState.get('name'))
+ componentAvatar = self.getComponentAvatarForState(componentState)
+ if componentAvatar:
+ if not componentAvatar._providingClock:
+ componentAvatar._providingClock = True
+ # specific master clock failure handler
+ def componentMasterClockFailed(failure):
+ componentAvatar._providingClock = False
+ self.handleFailure(failure, componentAvatar,
+ T_(N_("Could not setup component's master clock.")),
+ "component-clock-%s")
+ try:
+ d = self.provideMasterClock(componentAvatar)
+ except:
+ # give feedback of synchronous failures
+ # to the componentAvatar
+ componentMasterClockFailed(Failure())
+ return
+ # Add errback to be able to give feedback
+ # of asynchronous failures to the componentAvatar.
+ def clockMasterErrback(failure):
+ componentMasterClockFailed(failure)
+ raise errors.ComponentStartHandledError(failure)
+ d.addErrback(clockMasterErrback)
+ else:
+ self.debug("Component %s already on way to clock master",
+ componentState.get("name"))
+
+ def startComponent(self, componentState):
+ self.debug("Component %s to be started" % componentState.get("name"))
+ componentAvatar = self.getComponentAvatarForState(componentState)
+ if not componentAvatar._starting:
+ componentAvatar._starting = True
+ happyd = defer.Deferred()
+ # When we reach happy, we should clear the pending
+ # mood - it is done transitioning
+ happyd.addCallback(lambda r, s: s.set('moodPending', None),
+ componentState)
+ # TODO: Ugly poking in privates
+ componentAvatar._happydefers.append(happyd)
+
+ # specific startup failure handler
+ def componentStartupFailed(failure):
+ componentAvatar._starting = False
+ self.handleFailure(failure, componentAvatar,
+ T_(N_("Could not start component.")),
+ "component-start-%s")
+ try:
+ d = self._startComponent(componentAvatar)
+ except:
+ # give feedback of synchronous failures
+ # to the componentAvatar
+ componentStartupFailed(Failure())
+ return
+ # add errback to be able to give feedback
+ # of asynchronous failures to the componentAvatar.
+ def startErrback(failure):
+ componentStartupFailed(failure)
+ raise errors.ComponentStartHandledError(failure)
+ d.addErrback(startErrback)
+ else:
+ self.log("Component is already starting")
+
+ def setupComponent(self, componentState):
+ self.debug("Component %s to be setup" % componentState.get("name"))
+ componentAvatar = self.getComponentAvatarForState(componentState)
+ if componentAvatar:
+ if not componentAvatar._beingSetup:
+ componentAvatar._beingSetup = True
+ # specific setup failure handler
+ def componentSetupFailed(failure):
+ componentAvatar._beingSetup = False
+ self.handleFailure(failure, componentAvatar,
+ T_(N_("Could not setup component.")),
+ "component-setup-%s")
+ try:
+ d = self.doSetupComponent(componentAvatar)
+ except:
+ # give feedback of synchronous failures
+ # to the componentAvatar
+ componentSetupFailed(Failure())
+ return
+ # add errback to be able to give feedback
+ # of asynchronous failures to the componentAvatar.
+ def setupErrback(failure):
+ componentSetupFailed(failure)
+ raise errors.ComponentSetupHandledError(failure)
+ d.addErrback(setupErrback)
+ else:
+ self.debug("Component %s already on way to being setup",
+ componentState.get("name"))
+ else:
+ self.debug("Component %s to be setup but has no avatar yet",
+ componentState.get("name"))
- def _setupComponent(self, componentAvatar):
+ def _doSetupComponent(self, componentAvatar):
# set up the component
state = componentAvatar.componentState
conf = state.get('config')
@@ -1013,7 +976,7 @@
componentAvatar._setMood(moods.sad)
raise errors.FlumotionError('Could not set up component')
- setupComponent = defer_generator_method(_setupComponent)
+ doSetupComponent = defer_generator_method(_doSetupComponent)
def registerComponent(self, componentAvatar):
"""
Modified: flumotion/trunk/flumotion/manager/depgraph.py
==============================================================================
--- flumotion/trunk/flumotion/manager/depgraph.py (original)
+++ flumotion/trunk/flumotion/manager/depgraph.py Wed Jul 18 14:37:41 2007
@@ -22,26 +22,6 @@
from flumotion.common import dag, log, registry, errors, common
from flumotion.common.planet import moods
-class Feeder:
- """
- I am an object representing a feeder in the DepGraph
- """
- def __init__(self, feederName, component):
- self.feederName = feederName
- self.component = component
- self.feederData = None
-
-class Eater:
- """
- I am an object representing an eater in the DepGraph
- """
- def __init__(self, eaterName, component):
- # feeder attribute is a reference to the Feeder object
- # that this eater eats from
- self.eaterName = eaterName
- self.component = component
- self.feeder = None
-
class DepGraph(log.Loggable):
"""
I am a dependency graph for components. I also maintain boolean state
@@ -50,19 +30,24 @@
I contain a DAG to help with resolving dependencies.
"""
logCategory = "depgraph"
-
+
typeNames = ("WORKER", "JOB", "COMPONENTSETUP", "CLOCKMASTER",
"COMPONENTSTART")
-
+
def __init__(self):
+ # Each node in the DAG is an object (and has a given type, corresponding
+ # to some action that must be taken to progress through the DAG
self._dag = dag.DAG()
+
+ # (object,type) -> (callable, boolean) mapping. True if the object/type tuple
+ # corresponding to this action has been done (TODO: make this make sense!)
self._state = {}
- def _addNode(self, component, type):
+ def _addNode(self, component, type, callable):
# type: str
self.debug("Adding node %r of type %s" % (component, type))
self._dag.addNode(component, type)
- self._state[(component, type)] = False
+ self._state[(component, type)] = (callable, False)
def _removeNode(self, component, type):
self.debug("Removing node %r of type %s" % (component, type))
@@ -78,7 +63,7 @@
parent, parentType, child, childType))
self._dag.removeEdge(parent, child, parentType, childType)
- def addClockMaster(self, component):
+ def addClockMaster(self, component, setupClockMasterCallable):
"""
I set a component to be the clock master in the dependency
graph. This component must have already been added to the
@@ -88,7 +73,7 @@
@type component: L{flumotion.manager.component.ComponentAvatar}
"""
if self._dag.hasNode(component, "JOB"):
- self._addNode(component, "CLOCKMASTER")
+ self._addNode(component, "CLOCKMASTER", setupClockMasterCallable)
self._addEdge(component, component, "COMPONENTSETUP",
"CLOCKMASTER")
@@ -103,7 +88,7 @@
else:
raise KeyError("Component %r has not been added" % component)
- def addComponent(self, component):
+ def addComponent(self, component, setupCallable, startCallable):
"""
I add a component to the dependency graph.
This includes adding the worker (if not already added), the job,
@@ -120,9 +105,9 @@
return
self.debug('adding component %r to depgraph' % component)
- self._addNode(component, "JOB")
- self._addNode(component, "COMPONENTSTART")
- self._addNode(component, "COMPONENTSETUP")
+ self._addNode(component, "JOB", lambda x: None)
+ self._addNode(component, "COMPONENTSTART", startCallable)
+ self._addNode(component, "COMPONENTSETUP", setupCallable)
self._addEdge(component, component, "JOB", "COMPONENTSETUP")
workername = component.get('workerRequested')
if workername:
@@ -140,7 +125,7 @@
"""
self.debug('adding worker %s' % worker)
if not self._dag.hasNode(worker, "WORKER"):
- self._addNode(worker, "WORKER")
+ self._addNode(worker, "WORKER", lambda x: None)
def removeComponent(self, component):
"""
@@ -236,55 +221,12 @@
" %s on component %s" % (
feed, eater, eatingComponent))
- def whatShouldBeStarted(self):
- """
- I return a list of things that can and should be started now.
-
- @return: a list of nodes that should be started, in order
- @rtype: list of (object, str)
- """
- # A bit tricky because workers can't be started by manager,
- # and jobs are started automatically when worker is attached
- # So we get all the stuff sorted by depgraph,
- # then remove ones that are already have state of True,
- # then remove ones that are workers who are False, and their offspring,
- # then remove ones that are jobs who are False, and their offspring,
- # and also remove nodes that are offspring of nodes with state of False
- toBeStarted = self._dag.sort()
- # we want to loop over all objects, so we loop over a copy
- for obj in toBeStarted[:]:
- if obj in toBeStarted:
- self.log("toBeStarted: checking if (%r, %r) needs starting",
- obj[0], obj[1])
- if self._state[obj]:
- toBeStarted.remove(obj)
- elif obj[1] == "WORKER":
- # This is a worker not started.
- # Let's remove it and its offspring
- worker_offspring = self._dag.getOffspringTyped(
- obj[0], obj[1])
- for offspring in worker_offspring:
- if offspring in toBeStarted:
- toBeStarted.remove(offspring)
- toBeStarted.remove(obj)
- elif obj[1] == "JOB":
- job_offspring = self._dag.getOffspringTyped(obj[0], obj[1])
- for offspring in job_offspring:
- if offspring in toBeStarted:
- toBeStarted.remove(offspring)
- toBeStarted.remove(obj)
- else:
- offspring = self._dag.getOffspringTyped(obj[0], obj[1])
- for child in offspring:
- if child in toBeStarted:
- toBeStarted.remove(child)
-
- return toBeStarted
-
def _setState(self, object, type, value):
self.doLog(log.DEBUG, -2, "Setting state of (%r, %s) to %d" % (
object, type, value))
- self._state[(object,type)] = value
+ (callable, oldstate) = self._state[(object,type)]
+ self._state[(object,type)] = (callable, value)
+
# if making state False, should make its offspring False
# if the object is the same
if not value:
@@ -294,7 +236,25 @@
for kid in offspring:
self.debug("Setting state of offspring (%r) to %d", kid, value)
if kid[0] == object:
- self._state[kid] = False
+ (callable, oldstate) = self._state[kid]
+ self._state[kid] = (callable, False)
+ else:
+ # We set this to true. So perhaps we can progress!
+ kids = self._dag.getChildrenTyped(object, type)
+ for (kid, kidtype) in kids:
+ # For each of these we need to check that ALL the parents are
+ # now true before we can go further
+ # if reduce(lambda x,y: x and self._state[y][1], self._dag.getParentsTyped(kid, kidtype), True):
+ parents = self._dag.getParentsTyped(kid, kidtype)
+ ready = True
+ for parent in parents:
+ if not self._state[parent][1]:
+ ready = False
+ if ready:
+ self.debug("Calling callable %r",
+ self._state[(kid, kidtype)][0])
+ self._state[(kid,kidtype)][0](kid)
+
def setComponentStarted(self, component):
"""
Modified: flumotion/trunk/flumotion/manager/manager.py
==============================================================================
--- flumotion/trunk/flumotion/manager/manager.py (original)
+++ flumotion/trunk/flumotion/manager/manager.py Wed Jul 18 14:37:41 2007
@@ -473,7 +473,9 @@
def _updateFlowDependencies(self, state):
self.debug('registering dependencies of %r' % state)
- self._depgraph.addComponent(state)
+ self._depgraph.addComponent(state,
+ self.componentHeaven.setupComponent,
+ self.componentHeaven.startComponent)
conf = state.get('config')
@@ -484,7 +486,8 @@
self.debug("Using config: %r for component with avatarId %r", conf, componentAvatarId)
if componentAvatarId == conf['clock-master']:
- self._depgraph.addClockMaster(state)
+ self._depgraph.addClockMaster(state,
+ self.componentHeaven.setupClockMaster)
def _updateStateFromConf(self, _, conf, identity):
"""
@@ -1237,7 +1240,6 @@
self.debug('vishnu registering component %r' % componentAvatar)
state = componentAvatar.componentState
- self._depgraph.setJobStarted(state)
# If this is a reconnecting component, we might also need to set the
# component as started.
# If mood is happy or hungry, then the component is running.
@@ -1245,6 +1247,8 @@
if mood == moods.happy.value or mood == moods.hungry.value:
self.debug("Component %s is already in mood %s. Set depgraph "
"appropriately", componentAvatar.avatarId, moods.get(mood).name)
+ # TODO: Somehow freeze the depgraph so we don't follow all the links
+ # until these 2 or 3 things are all completed?
self._depgraph.setComponentSetup(state)
self._depgraph.setComponentStarted(state)
if self._depgraph.isAClockMaster(state):
@@ -1254,7 +1258,7 @@
self._depgraph.setClockMasterStarted(state)
self.debug('vishnu registered component %r' % componentAvatar)
- self.componentHeaven._tryWhatCanBeStarted()
+ self._depgraph.setJobStarted(state)
def unregisterComponent(self, componentAvatar):
# called when the component is logging out
More information about the flumotion-commit mailing list