sebastien - in flumotion/branches/transcoder-1: . flumotion/common
flumotion/manager flumotion/twisted
flumotion-commit at lists.fluendo.com
flumotion-commit at lists.fluendo.com
Tue May 8 11:12:04 CEST 2007
Author: sebastien
Date: Tue May 8 11:12:00 2007
New Revision: 4881
Modified:
flumotion/branches/transcoder-1/ChangeLog
flumotion/branches/transcoder-1/flumotion/common/config.py
flumotion/branches/transcoder-1/flumotion/common/errors.py
flumotion/branches/transcoder-1/flumotion/manager/admin.py
flumotion/branches/transcoder-1/flumotion/manager/manager.py
flumotion/branches/transcoder-1/flumotion/twisted/defer.py
Log:
2007-05-08 Sebastien Merle <sebastien at fluendo.com>
* flumotion/twisted/defer.py:
Merged from trunk.
* flumotion/common/errors.py:
* flumotion/common/config.py:
* flumotion/manager/manager.py:
* flumotion/manager/admin.py:
Added the loadComponent method.
Modified: flumotion/branches/transcoder-1/ChangeLog
==============================================================================
--- flumotion/branches/transcoder-1/ChangeLog (original)
+++ flumotion/branches/transcoder-1/ChangeLog Tue May 8 11:12:00 2007
@@ -1,3 +1,13 @@
+2007-05-08 Sebastien Merle <sebastien at fluendo.com>
+
+ * flumotion/twisted/defer.py:
+ Merged from trunk.
+ * flumotion/common/errors.py:
+ * flumotion/common/config.py:
+ * flumotion/manager/manager.py:
+ * flumotion/manager/admin.py:
+ Added the loadComponent method.
+
2007-03-30 Sebastien Merle <sebastien at fluendo.com>
* flumotion/common/enum.py:
Modified: flumotion/branches/transcoder-1/flumotion/common/config.py
==============================================================================
--- flumotion/branches/transcoder-1/flumotion/common/config.py (original)
+++ flumotion/branches/transcoder-1/flumotion/common/config.py Tue May 8 11:12:00 2007
@@ -41,6 +41,137 @@
# all these string values should result in True
BOOL_TRUE_VALUES = ['True', 'true', '1', 'Yes', 'yes']
+def _check_fraction(value):
+ return (isinstance(value, tuple)
+ and (len(value) == 2)
+ and (isinstance(value[0], int) or isinstance(value[0], long))
+ and (isinstance(value[1], int) or isinstance(value[1], long))
+ and (value[1] > 0))
+
+def _make_type_checker(propertyType):
+ return lambda value: type(value) == propertyType
+
+_property_checkers = {'string': _make_type_checker(str),
+ 'rawstring': _make_type_checker(str),
+ 'int': _make_type_checker(int),
+ 'long': _make_type_checker(long),
+ 'bool': _make_type_checker(bool),
+ 'float': _make_type_checker(float),
+ 'fraction': _check_fraction}
+
+
+def _buildComponentConfig(parentName, componentType, componentName,
+ componentProperties, componentPlugs=None,
+ sources=None, isClockMaster=None,
+ flumotionVersion=None):
+ """
+ Build a component configuration dictionary.
+ Note that the parent entry is set to None,
+ """
+ # If we don't have a version at all, use the current version
+ if flumotionVersion:
+ version = flumotionVersion
+ assert isinstance(version, tuple) and (len(version) == 4)
+ else:
+ version = configure.versionTuple
+
+ config = { 'name': componentName,
+ 'parent': parentName,
+ 'type': componentType,
+ 'avatarId': common.componentId(parentName, componentName),
+ 'version': version,
+ 'clock-master': isClockMaster}
+
+ try:
+ defs = registry.getRegistry().getComponent(componentType)
+ except KeyError:
+ raise errors.UnknownComponentError(
+ "unknown component type: %s" % componentType)
+
+ # let the component know what its feeds should be called
+ config['feed'] = defs.getFeeders()
+
+ eaters = dict([(x.getName(), x) for x in defs.getEaters()])
+ # at this point we don't support assigning certain sources to
+ # certain eaters -- a problem to fix later. for now take the
+ # union of the properties.
+ required = [x for x in eaters.values() if x.getRequired()]
+ multiple = [x for x in eaters.values() if x.getMultiple()]
+
+ if sources == None:
+ sources = list()
+ if len(sources) == 0 and required:
+ raise ConfigError("Component %s wants to eat on %s, but no "
+ "source specified"
+ % (componentName, eaters.keys()[0]))
+ elif len(sources) > 1 and not multiple:
+ raise ConfigError("Component %s does not support multiple "
+ "sources feeding %s (%r)"
+ % (componentName, eaters.keys()[0], sources))
+ if sources:
+ config['source'] = sources
+
+ sockets = defs.getSockets()
+ if componentPlugs == None:
+ componentPlugs = list()
+ plugs = dict()
+ for socket in sockets:
+ plugs[socket] = []
+ for plug in componentPlugs:
+ if not plug.socket in sockets:
+ raise ConfigError("Component %s does not support "
+ "sockets of type %s"
+ % (componentName, plug['socket']))
+ plugs[plug.socket].append(plug)
+ config['plugs'] = plugs
+
+ properties = dict()
+ property_specs = dict([(x.name, x) for x in defs.getProperties()])
+ for name, value in componentProperties.items():
+ if not name in property_specs:
+ raise ConfigError('%s: unknown property %s' % (componentName, name))
+ definition = property_specs[name]
+ if not (definition.type in _property_checkers):
+ raise ConfigError("%s: unknown type '%s' for property %s"
+ % (componentName, definition.type, name))
+ checker = _property_checkers.get(definition.type)
+ if value and (isinstance(value, tuple) or isinstance(value, list)):
+ if not definition.multiple and len(nodes) > 1:
+ raise ConfigError("%s: multiple value specified but not "
+ "allowed for property %s"
+ % (componentName, name))
+ values = value
+ properties[name] = list(value)
+ else:
+ values = [value]
+ properties[name] = value
+ for item in values:
+ if not checker(item):
+ raise ConfigError("%s: invalid value '%s' for property %s"
+ % (componentName, str(item), name))
+
+ for name, definition in property_specs.items():
+ if definition.isRequired() and not name in componentProperties:
+ raise error("%s: required but unspecified property %s"
+ % (componentName, name))
+
+ config['properties'] = properties
+ return config
+
+
+def buildComponentConfig(parentName, componentType, componentName,
+ componentProperties, componentPlugs=None,
+ workerName=None, sources=None, isClockMaster=None,
+ flumotionVersion=None):
+ config = _buildComponentConfig(parentName, componentType, componentName,
+ componentProperties, componentPlugs,
+ sources, isClockMaster, flumotionVersion)
+ defs = registry.getRegistry().getComponent(componentType)
+ return ConfigEntryComponent(componentName, parentName, componentType,
+ config, defs, workerName)
+
+
+
class ConfigEntryComponent(log.Loggable):
"I represent a <component> entry in a planet config file"
nice = 0
@@ -383,7 +514,7 @@
ret[component.name] = component
return ret
-
+
def _parseComponent(self, node, parent, forManager=False):
"""
Parse a <component></component> block.
@@ -394,7 +525,7 @@
# <source>*
# <property name="name">value</property>*
# </component>
-
+
if not node.hasAttribute('name'):
raise ConfigError("<component> must have a name attribute")
if not node.hasAttribute('type'):
@@ -422,10 +553,6 @@
raise ComponentWorkerConfigError("<component> version not"
" parseable")
- # If we don't have a version at all, use the current version
- if not version:
- version = configure.versionTuple
-
type = str(node.getAttribute('type'))
name = str(node.getAttribute('name'))
if forManager:
@@ -433,23 +560,12 @@
else:
worker = str(node.getAttribute('worker'))
- # FIXME: flumotion-launch does not define parent, type, or
- # avatarId. Thus they don't appear to be necessary, like they're
- # just extra info for the manager or so. Figure out what's going
- # on with that. Also, -launch treats clock-master differently.
- config = { 'name': name,
- 'parent': parent,
- 'type': type,
- 'avatarId': common.componentId(parent, name),
- 'version': version
- }
-
try:
defs = registry.getRegistry().getComponent(type)
except KeyError:
raise errors.UnknownComponentError(
"unknown component type: %s" % type)
-
+
possible_node_names = ['source', 'clock-master', 'property',
'plugs']
for subnode in node.childNodes:
@@ -463,22 +579,19 @@
raise ConfigError("Invalid subnode of <component>: %s"
% subnode.nodeName)
- # let the component know what its feeds should be called
- config['feed'] = defs.getFeeders()
-
sources = self._parseSources(node, defs)
- if sources:
- config['source'] = sources
-
- config['clock-master'] = self._parseClockMaster(node)
- config['plugs'] = self._parsePlugs(node, defs.getSockets())
+ isClockMaster = self._parseClockMaster(node)
+ plugs = []
+ plugParsers = {'plug': (self.parsePlug, plugs.append)}
+ for subnode in node.childNodes:
+ if subnode.nodeName == 'plugs':
+ self.parseFromTable(subnode, plugParser)
- properties = defs.getProperties()
+ properties = self.parseProperties(node, defs.getProperties(),
+ lambda msg: ConfigError('%s: %s' % (name, str)))
- self.debug('Parsing component: %s' % name)
- def err(str):
- return ConfigError('%s: %s' % (name, str))
- config['properties'] = self.parseProperties(node, properties, err)
+ config = _buildComponentConfig(parent, type, name, properties, plugs,
+ sources, isClockMaster, version)
# fixme: all of the information except the worker is in the
# config dict: why?
Modified: flumotion/branches/transcoder-1/flumotion/common/errors.py
==============================================================================
--- flumotion/branches/transcoder-1/flumotion/common/errors.py (original)
+++ flumotion/branches/transcoder-1/flumotion/common/errors.py Tue May 8 11:12:00 2007
@@ -162,6 +162,11 @@
args[1]: str
"""
+class ComponentDuplicatedError(ComponentError):
+ """
+ A component name is already used.
+ """
+
class ComponentCreateError(ComponentError):
"""
An error during creation of a component. Can be raised during a
Modified: flumotion/branches/transcoder-1/flumotion/manager/admin.py
==============================================================================
--- flumotion/branches/transcoder-1/flumotion/manager/admin.py (original)
+++ flumotion/branches/transcoder-1/flumotion/manager/admin.py Tue May 8 11:12:00 2007
@@ -411,6 +411,32 @@
return res
+ def perspective_loadComponent(self, componentType, componentId,
+ componentProperties, workerName,
+ sources=None, isClockMaster=None):
+ """
+ Load a component into the manager configuration.
+ Returns a deferred that will be called with a LoadingOutcome value.
+
+ @param componentType: the registered type of the component to be added
+ @type componentType: str
+ @param componentId: the identifier of the component to add, should be
+ created by the function flumotion.common.componentId.
+ @type componentId: str
+ @param componentProperties: the properties of the component to be added
+ @type componentProperties: dict of str => str
+ @param workerName: the compName of the worker where the added
+ component should run.
+ @type workerName: str
+ @param sources: the stream sources of the component to be added.
+ @type sources: list of str
+ @param isClockMaster: if the component to be added is clock master.
+ @type isClockMaster: bool
+ """
+ return self.vishnu.loadComponent(self.remoteIdentity, componentType,
+ componentId, componentProperties,
+ workerName, sources, isClockMaster)
+
def perspective_deleteFlow(self, flowName):
return self.vishnu.deleteFlow(flowName)
Modified: flumotion/branches/transcoder-1/flumotion/manager/manager.py
==============================================================================
--- flumotion/branches/transcoder-1/flumotion/manager/manager.py (original)
+++ flumotion/branches/transcoder-1/flumotion/manager/manager.py Tue May 8 11:12:00 2007
@@ -502,7 +502,7 @@
return added
- def _startComponents(self, components, conf, identity):
+ def _startComponents(self, components, identity):
# now start all components that need starting -- collecting into
# an temporary dict of the form {workerId => [components]}
componentsToStart = {}
@@ -521,7 +521,7 @@
d = self._makeBouncer(conf, identity)
d.addCallback(self._addManagerPlugs, conf, identity)
d.addCallback(self._updateStateFromConf, conf, identity)
- d.addCallback(self._startComponents, conf, identity)
+ d.addCallback(self._startComponents, identity)
return d
def loadConfigurationXML(self, file, identity):
@@ -542,6 +542,96 @@
conf.parse()
return self._loadConfiguration(conf, identity)
+ def loadComponent(self, identity, componentType, componentId,
+ componentProperties, workerName,
+ sources=None, isClockMaster=None):
+ """
+ Load a component into the manager configuration.
+ Returns a deferred that will be called with a LoadingOutcome value.
+
+ @param identity: The identity making this request. This is used by the
+ adminaction logging mechanism in order to say who is
+ performing the action.
+ @type identity: L{flumotion.common.identity.Identity}
+ @param componentType: the registered type of the component to be added
+ @type componentType: str
+ @param componentId: the identifier of the component to add, should be
+ created by the function flumotion.common.componentId.
+ @type componentId: str
+ @param componentProperties: the properties of the component to be added
+ @type componentProperties: dict of str => str
+ @param workerName: the name of the worker where the added
+ component should run.
+ @type workerName: str
+ @param sources: the stream sources of the component to be added.
+ @type sources: list of str
+ @param isClockMaster: if the component to be added is clock master.
+ @type isClockMaster: bool
+ @return a deferred
+ """
+ self.debug('loading %s component %s on %s',
+ componentType, componentId, workerName)
+ parentName, compName = common.parseComponentId(componentId)
+
+ if not worker:
+ message = ("A worker name should be specified "
+ "to load component %s" % compName)
+ self.debug(message)
+ raise ComponentWorkerConfigError(message)
+
+ state = self.state
+ compState = None
+ compConf = config.buildComponentConfig(parentName, componentType,
+ compName, componentProperties,
+ None, workerName, sources,
+ isClockMaster)
+ if parentName == 'atmosphere':
+ atmosphere = state.get('atmosphere')
+ components = [x.get('name') for x in atmosphere.get('components')]
+ if compName in components:
+ message = 'Atmosphere already has component %s' % compName
+ self.debug(message)
+ raise errors.ComponentDuplicatedError(message)
+ else:
+ compState = self._addComponent(compConf, atmosphere, identity)
+ else:
+ flows = dict([(x.get('name'), x) for x in state.get('flows')])
+ try:
+ flow = flows[parentName]
+ self.debug('Checking existing flow %s' % parentName)
+ except KeyError:
+ self.info('Creating flow "%s"' % parentName)
+ flow = planet.ManagerFlowState(name=parentName, parent=state)
+ state.append('flows', flow)
+
+ components = [x.get('name') for x in flow.get('components')]
+ if compName in components:
+ message = ('Component %s already in flow %s'
+ % (compName, parentName))
+ self.debug(message)
+ raise errors.ComponentDuplicatedError(message)
+ else:
+ compState = self._addComponent(compConf, flow, identity)
+
+ assert compState != None
+ self._updateFlowDependencies(compState)
+
+ try:
+ self._depgraph.mapEatersToFeeders()
+ except errors.ComponentConfigError, e:
+ state = e.args[0]
+ debug = e.args[1]
+ message = messages.Error(T_(
+ N_("The component is misconfigured.")),
+ debug=debug)
+ state.append('messages', message)
+ state.setMood(moods.sad.value)
+ raise e
+
+ self._startComponents([compState], identity)
+
+ return defer.succeed(compState)
+
def _createHeaven(self, interface, klass):
"""
Create a heaven of the given klass that will send avatars to clients
Modified: flumotion/branches/transcoder-1/flumotion/twisted/defer.py
==============================================================================
--- flumotion/branches/transcoder-1/flumotion/twisted/defer.py (original)
+++ flumotion/branches/transcoder-1/flumotion/twisted/defer.py Tue May 8 11:12:00 2007
@@ -85,7 +85,7 @@
def errback(failure, d):
def raise_error():
- # failure.type will be the exception class for local
+ # failure.parents[-1] will be the exception class for local
# failures and the string name of the exception class
# for remote failures (which might not exist in our
# namespace)
@@ -99,10 +99,10 @@
# exception class is in our namespace, and it only takes
# one string argument. if either condition is not true,
# we wrap the strings in a default Exception.
- k, v = failure.type, failure.value
- if isinstance(k, str):
- k = reflect.namedClass(k)
+ k, v = failure.parents[-1], failure.value
try:
+ if isinstance(k, str):
+ k = reflect.namedClass(k)
if isinstance(v, tuple):
e = k(*v)
else:
More information about the flumotion-commit
mailing list