arek - in flumotion/trunk: . flumotion/test
flumotion-commit at lists.fluendo.com
flumotion-commit at lists.fluendo.com
Fri Jul 20 16:12:57 CEST 2007
Author: arek
Date: Fri Jul 20 16:12:54 2007
New Revision: 5333
Added:
flumotion/trunk/flumotion/test/pond.py
flumotion/trunk/flumotion/test/test_pond.py
Modified:
flumotion/trunk/ChangeLog
flumotion/trunk/flumotion/test/Makefile.am
Log:
* flumotion/test/pond.py:
* flumotion/test/test_pond.py:
* flumotion/test/Makefile.am:
Add a component test helper class.
Modified: flumotion/trunk/ChangeLog
==============================================================================
--- flumotion/trunk/ChangeLog (original)
+++ flumotion/trunk/ChangeLog Fri Jul 20 16:12:54 2007
@@ -1,3 +1,10 @@
+2007-07-20 Arek Korbik <arkadini at gmail.com>
+
+ * flumotion/test/pond.py:
+ * flumotion/test/test_pond.py:
+ * flumotion/test/Makefile.am:
+ Add a component test helper class.
+
2007-07-20 Pedro Gracia <pedro at flumotion.com>
* flumotion/wizard/steps.py:
Modified: flumotion/trunk/flumotion/test/Makefile.am
==============================================================================
--- flumotion/trunk/flumotion/test/Makefile.am (original)
+++ flumotion/trunk/flumotion/test/Makefile.am Fri Jul 20 16:12:54 2007
@@ -8,6 +8,7 @@
__init__.py \
common.py \
testclasses.py \
+ pond.y \
gtkunit.py \
realm.py
@@ -56,6 +57,7 @@
test_parts.py \
test_pb.py \
test_pbstream.py \
+ test_pond.py \
test_porter.py \
test_reflect.py \
test_registry.py \
Added: flumotion/trunk/flumotion/test/pond.py
==============================================================================
--- (empty file)
+++ flumotion/trunk/flumotion/test/pond.py Fri Jul 20 16:12:54 2007
@@ -0,0 +1,497 @@
+# -*- Mode: Python -*-
+# vi:si:et:sw=4:sts=4:ts=4
+#
+# Flumotion - a streaming media server
+# Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com).
+# All rights reserved.
+
+# This file may be distributed and/or modified under the terms of
+# the GNU General Public License version 2 as published by
+# the Free Software Foundation.
+# This file is distributed without any warranty; without even the implied
+# warranty of merchantability or fitness for a particular purpose.
+# See "LICENSE.GPL" in the source distribution for more information.
+
+# Licensees having purchased or holding a valid Flumotion Advanced
+# Streaming Server license may use this file in accordance with the
+# Flumotion Advanced Streaming Server Commercial License Agreement.
+# See "LICENSE.Flumotion" in the source distribution for more information.
+
+# Headers in this file shall remain intact.
+
+__all__ = ['Pond', 'PondUnitTestMixin', 'cleanup_reactor',
+ 'pipeline_src', 'pipeline_cnv']
+
+import common
+
+import os
+import tempfile
+import new
+import sys
+
+from twisted.internet import gtk2reactor
+HAVE_GTK2REACTOR = True
+try:
+ gtk2reactor.install(useGtk=False)
+except AssertionError:
+ if not isinstance(sys.modules['twisted.internet.reactor'],
+ gtk2reactor.Gtk2Reactor):
+ HAVE_GTK2REACTOR = False
+
+from twisted.python import failure
+from twisted.internet import reactor, defer, interfaces
+from twisted.web import client, error
+
+from flumotion.common import registry, log, errors, common
+from flumotion.component.producers.pipeline.pipeline import Producer
+from flumotion.component.converters.pipeline.pipeline import Converter
+from flumotion.twisted import flavors
+
+
+class PondException(Exception):
+ pass
+
+class WrongReactor(PondException):
+ pass
+
+class StartTimeout(PondException):
+ pass
+
+class FlowTimeout(PondException):
+ pass
+
+class StopTimeout(PondException):
+ pass
+
+
+class ComponentWrapper(object, log.Loggable):
+ logCategory = 'pond-compwrap'
+ _u_name_cnt = 0
+ _registry = None
+
+ def __init__(self, type_, class_, props=None, name=None, plugs=None,
+ cfg=None):
+ self.comp = None
+ self.comp_class = class_
+ if cfg is None:
+ cfg = {}
+ self.cfg = cfg
+ self.auto_link = True
+ self.debug_msgs = []
+
+ self.sync = None
+ self.sync_master = None
+
+ if ComponentWrapper._registry is None:
+ ComponentWrapper._registry = registry.getRegistry()
+
+ cfg['type'] = type_
+ reg = ComponentWrapper._registry.getComponent(type_)
+
+ if not cfg.has_key('source'):
+ cfg['source'] = []
+
+ if not cfg.has_key('eater'):
+ cfg['eater'] = dict([(e.getName(), []) for e in reg.getEaters()
+ if e.getRequired()])
+
+ if not cfg.has_key('feed'):
+ cfg['feed'] = reg.getFeeders()[:]
+
+ if plugs is not None:
+ cfg['plugs'] = plugs
+ if not cfg.has_key('plugs'):
+ cfg['plugs'] = dict([(s, []) for s in reg.getSockets()])
+
+ if name:
+ cfg['name'] = name
+ if not cfg.has_key('name'):
+ cfg['name'] = ComponentWrapper.get_unique_name()
+ self.name = cfg['name']
+
+ if not cfg.has_key('parent'):
+ cfg['parent'] = 'default'
+
+ if not cfg.has_key('avatarId'):
+ cfg['avatarId'] = common.componentId(cfg['parent'], self.name)
+
+ if props is not None:
+ cfg['properties'] = props
+ if not cfg.has_key('properties'):
+ cfg['properties'] = {}
+
+ if not cfg.has_key('clock-master'):
+ cfg['clock-master'] = None
+
+ self.sync_master = cfg['clock-master']
+
+ if reg.getNeedsSynchronization():
+ self.sync = reg.getClockPriority()
+
+ def __repr__(self):
+ return '%s(%r, %r)' % (self.__class__.__name__,
+ self.comp_class.__name__, self.cfg)
+
+ @classmethod
+ def get_unique_name(cls, prefix='cmp-'):
+ name, cls._u_name_cnt = ('%s%d' % (prefix, cls._u_name_cnt),
+ cls._u_name_cnt + 1)
+ return name
+
+ def instantiate(self):
+ self.comp = self.comp_class()
+ self.debug('instantiate:: %r' % self.comp.state)
+ def append(instance, key, value):
+ self.debug('append %r: %r' % (value.level, value))
+ if key == 'messages':
+ if value.debug:
+ self.debug('proxied state debug:: %r' % value.debug)
+ self.debug_msgs.append(value.debug)
+ flavors.StateCacheable.append(instance, key, value)
+ self.comp.state.append = new.instancemethod(append, self.comp.state)
+
+ def setup(self):
+ if self.comp is None:
+ self.instantiate()
+ return self.comp.setup(self.cfg)
+
+ def feed(self, sink_comp, links=None):
+ if links is None:
+ links = [('default', 'default')]
+ sink_name = sink_comp.name
+ for feeder, eater in links:
+ if feeder not in self.cfg['feed']:
+ raise PondException('Invalid feeder specified: %r' % feeder)
+ #self.cfg['feed'].append('%s:%s' % (sink_name, eater))
+ sink_comp.add_feeder(self, '%s:%s' % (self.name, feeder), eater)
+ #self.auto_link = False
+
+ def add_feeder(self, src_comp, feeder_name, eater):
+ self.cfg['source'].append(feeder_name)
+ self.cfg['eater'].setdefault(eater, []).append(feeder_name)
+ self.auto_link = False
+
+ def feedToFD(self, feedName, fd, eaterId=None):
+ self.debug('feedToFD(feedName=%s, %d (%s))' % (feedName, fd, eaterId))
+ return self.comp.feedToFD(feedName, fd, os.close, eaterId)
+
+ def eatFromFD(self, feedId, fd):
+ self.debug('eatFromFD(feedId=%s, %d)' % (feedId, fd))
+ return self.comp.eatFromFD(feedId, fd)
+
+ def start(self, *args, **kwargs):
+ self.debug('start(*%r, **%r)' % (args, kwargs))
+ d = self.comp.start(*args, **kwargs)
+ d.addCallback(lambda _: (self.debug('after start: %r' % _), _)[1])
+ return d
+
+ def stop(self, *args, **kwargs):
+ self.debug('stop(*%r, **%r)' % (args, kwargs))
+ if self.comp:
+ return self.comp.stop(*args, **kwargs)
+ return defer.succeed(None)
+
+
+class Pond(object, log.Loggable):
+ logCategory = 'pond'
+
+ guard_timeout = 60.0
+ guard_delay = 0.5
+ start_delay = None
+
+ def __init__(self):
+ self._comps = []
+ self._byname = {}
+ self._master = None
+ self._syncing = None
+
+ def set_flow(self, comp_chain, auto_link=True):
+ self._comps = []
+
+ if len(comp_chain) == 0:
+ return
+
+ # given a sequence (a0, a1, a2, ..., aN-1, aN)
+ # this generator produces: (a0, a1), (a1, a2), ..., (aN-1, aN)
+ def pair_chain(seq):
+ car = seq[0]
+ for cdr in seq[1:]:
+ yield car, cdr
+ car = cdr
+
+ if auto_link:
+ for c_src, c_sink in pair_chain(comp_chain):
+ if c_sink.auto_link:
+ c_src.feed(c_sink)
+
+ self._comps.extend(comp_chain)
+
+ masters = [c for c in self._comps if c.sync_master is not None]
+ need_sync = sorted([c for c in self._comps if c.sync is not None],
+ key=(lambda e: e.sync), reverse=True)
+
+ if need_sync:
+ if masters:
+ master = master[0]
+ else:
+ master = need_sync[0]
+
+ master.sync = None # ...? :/
+ self._master = master
+
+ master = master.cfg['avatarId']
+ for c in need_sync:
+ c.cfg['clock-master'] = master
+ elif masters:
+ for c in masters:
+ c.cfg['clock-master'] = None
+
+ for c in self._comps:
+ self._byname[c.name] = c
+ c.log('updated config for %r: %r' % (c, c.cfg))
+
+ def _make_pipes(self):
+ fds = {}
+ def feed_starter(c, feed_name, w_fd, feed_id):
+ def _feed_starter():
+ self.debug('_feed_starter: %r, %r' % (feed_name, feed_id))
+ return c.feedToFD(feed_name, w_fd, eaterId=feed_id)
+ return _feed_starter
+ for c in self._comps:
+ eaters = c.cfg['eater']
+ for eater_id in eaters:
+ for src in eaters[eater_id]:
+ e_name, e_feed = src.split(':')
+ self.debug('creating pipe: %r, %r, %r' %
+ (src, e_feed, eater_id))
+ r_fd, w_fd = os.pipe()
+ fds[src] = (r_fd, feed_starter(self._byname[e_name],
+ e_feed, w_fd, eater_id))
+ self._fds = fds
+
+ def start_flow(self):
+ delay = self.start_delay
+
+ def all_ready_p(results):
+ self.debug('** 1: all_ready_p: %r' % results)
+ pass
+ def setup_failed(failure):
+ self.info('*! 1: setup_failed: %r' % (failure,))
+ failure.trap(defer.FirstError)
+ return failure.value.subFailure
+ def start_master_clock(_):
+ self.debug('** 2: start_master_clock: %r (%r)' % (_, self._master))
+ if self._master is not None:
+ self.debug('About to ask to provide_master_clock()...')
+ d = self._master.comp.provide_master_clock(7600 - 1) # ...hack?
+ def _jd(_):
+ self.debug('After provide_master_clock() : %r' % (_,))
+ d.addCallback(_jd)
+ return d
+ return None
+ def add_delay(value):
+ self.debug('** 3: add_delay: %r' % value)
+ if delay:
+ return DeferredDelay(delay, value)
+ return defer.succeed(delay)
+ def do_start(clocking, c):
+ self.debug('** 4: do_start_cb: %r, %r' % (clocking, c))
+ for src in c.cfg['source']:
+ r_fd, feed_starter = self._fds[src]
+ c.eatFromFD(src, r_fd)
+ feed_starter()
+ if not c.sync:
+ clocking = None
+ d = c.start(clocking)
+ d.addCallback(lambda _: clocking)
+ return d
+ def do_stop(failure):
+ self.debug('** X: do_stop: %r' % failure)
+ for c in reversed(self._comps):
+ c.stop()
+ return failure
+
+ self._make_pipes()
+
+ self.debug('About to start the flow...')
+ # P(ossible)TODO: make it report setup failures in all the
+ # components, not only in the first to fail...?
+ d = defer.DeferredList([c.setup() for c in self._comps],
+ fireOnOneErrback=1, consumeErrors=1)
+ d.addCallbacks(all_ready_p, setup_failed)
+ d.addCallback(start_master_clock)
+ for c in self._comps:
+ d.addCallback(add_delay)
+ d.addCallback(do_start, c)
+ d.addErrback(do_stop)
+ return d
+
+ def stop_flow(self):
+ d = defer.DeferredList([c.stop() for c in reversed(self._comps)],
+ fireOnOneErrback=1, consumeErrors=1)
+ def stop_flow_report(results):
+ self.debug('stop_flow_report: %r' % (results,))
+ return results
+ def stop_flow_failed(failure):
+ self.info('stop_flow_failed: %r' % (failure,))
+ failure.trap(defer.FirstError)
+ self.info('stop_flow_failed! %r' % (failure.value.subFailure,))
+ return failure.value.subFailure
+ d.addCallbacks(stop_flow_report, stop_flow_failed)
+ return d
+
+ def run_flow(self, duration, tasks=None,
+ start_d=None, start_flow=True, stop_flow=True):
+ if not HAVE_GTK2REACTOR:
+ raise WrongReactor("gtk2reactor isn't available")
+
+ self.debug('run_flow: tasks: %r' % (tasks,))
+ flow_d = start_d
+
+ if tasks is None:
+ tasks = []
+
+ if flow_d is None:
+ if start_flow:
+ flow_d = self.start_flow()
+ else:
+ flow_d = defer.succeed(True)
+
+ flow_started_finished = [False, False]
+
+ guard_d = None
+ timeout_d = defer.Deferred()
+ stop_d = defer.Deferred()
+ stop_timeout_d = defer.Deferred()
+ chained_d = None
+ immediate_d = None
+
+ callids = [None, None, None] # callLater ids: stop_d,
+ # timeout_d, fire_chained
+
+ if tasks:
+ # if have tasks, run simultaneously with the main timer deferred
+ chained_d = defer.DeferredList([stop_d] + tasks,
+ fireOnOneErrback=1, consumeErrors=1)
+ def chained_failed(failure):
+ self.info('chained_failed: %r' % (failure,))
+ failure.trap(defer.FirstError)
+ return failure.value.subFailure
+ chained_d.addErrback(chained_failed)
+ else:
+ # otherwise, just idle...
+ chained_d = stop_d
+
+ def start_complete(result):
+ self.debug('start_complete: %r' % (result,))
+ flow_started_finished[0] = True
+ callids[0] = reactor.callLater(duration, stop_d.callback, None)
+ if tasks:
+ def _fire_chained():
+ callids[2] = None
+ for t in tasks:
+ try:
+ t.callback(result)
+ except defer.AlreadyCalledError:
+ pass
+ callids[2] = reactor.callLater(0, _fire_chained)
+ return chained_d
+
+ def flow_complete(result):
+ self.debug('flow_complete: %r' % (result,))
+ flow_started_finished[1] = True
+ return result
+
+ def flow_timed_out():
+ self.debug('flow_timed_out!')
+ if not flow_started_finished[0]:
+ timeout_d.errback(StartTimeout('flow start timed out'))
+ elif not flow_started_finished[1]:
+ timeout_d.errback(FlowTimeout('flow run timed out'))
+ else:
+ stop_timeout_d.errback(StopTimeout('flow stop timed out'))
+
+ def clean_calls(result):
+ self.debug('clean_calls: %r' % (result,))
+ for i, cid in enumerate(callids):
+ if cid is not None:
+ if cid.active():
+ cid.cancel()
+ callids[i] = None
+ return result
+
+ flow_d.addCallbacks(start_complete)
+ flow_d.addCallback(flow_complete)
+
+ guard_d = defer.DeferredList([flow_d, timeout_d], consumeErrors=1,
+ fireOnOneErrback=1, fireOnOneCallback=1)
+
+ def guard_failed(failure):
+ self.info('guard_failed: %r' % (failure,))
+ failure.trap(defer.FirstError)
+ return failure.value.subFailure
+ if stop_flow:
+ def _force_stop_flow(result):
+ self.debug('_force_stop_flow: %r' % (result,))
+ d = defer.DeferredList([self.stop_flow(), stop_timeout_d],
+ fireOnOneErrback=1, fireOnOneCallback=1,
+ consumeErrors=1)
+ def _return_orig_result(stop_result):
+ if isinstance(result, failure.Failure):
+ # always return the run's failure first
+ # what do I return if both the run and stop failed?
+ self.debug('_return_orig[R]: %r' % (result,))
+ return result
+ elif isinstance(stop_result, failure.Failure):
+ # return failure from stop
+ self.debug('_return_orig[S]: %r' % (stop_result,))
+ return stop_result
+ return result
+ def force_stop_failed(failure):
+ self.info('force_stop_failed: %r' % (failure,))
+ failure.trap(defer.FirstError)
+ return failure.value.subFailure
+ d.addCallbacks(lambda r: r[0][1], force_stop_failed)
+ d.addBoth(_return_orig_result)
+ return d
+ guard_d.addBoth(_force_stop_flow)
+
+ guard_d.addErrback(guard_failed)
+ guard_d.addBoth(clean_calls)
+
+ callids[1] = reactor.callLater(self.guard_timeout, flow_timed_out)
+ return guard_d
+
+class PondUnitTestMixin:
+ if not HAVE_GTK2REACTOR:
+ skip = 'gtk2reactor is required for this test case'
+
+def cleanup_reactor(force=False):
+ if not HAVE_GTK2REACTOR and not force:
+ return
+ log.debug('pond', 'running cleanup_reactor...')
+ delayed = reactor.getDelayedCalls()
+ for dc in delayed:
+ dc.cancel()
+ # the rest is taken from twisted trial...
+ sels = reactor.removeAll()
+ if sels:
+ log.info('pond', 'leftover selectables...: %r %r' %
+ (sels, reactor.waker))
+ for sel in sels:
+ if interfaces.IProcessTransport.providedBy(sel):
+ sel.signalProcess('KILL')
+
+def pipeline_src(pipelinestr='fakesrc datarate=1024 is-live=true ! '
+ 'video/x-raw-rgb,framerate=(fraction)8/1,width=32,height=24'):
+ fs_name = ComponentWrapper.get_unique_name('ppln-src-')
+
+ return ComponentWrapper('pipeline-producer', Producer, name=fs_name,
+ props={'pipeline': pipelinestr})
+
+def pipeline_cnv(pipelinestr='identity'):
+ fs_name = ComponentWrapper.get_unique_name('ppln-cnv-')
+
+ return ComponentWrapper('pipeline-converter', Converter, name=fs_name,
+ props={'pipeline': pipelinestr})
+
Added: flumotion/trunk/flumotion/test/test_pond.py
==============================================================================
--- (empty file)
+++ flumotion/trunk/flumotion/test/test_pond.py Fri Jul 20 16:12:54 2007
@@ -0,0 +1,468 @@
+# -*- Mode: Python -*-
+# vi:si:et:sw=4:sts=4:ts=4
+#
+# Flumotion - a streaming media server
+# Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com).
+# All rights reserved.
+
+# This file may be distributed and/or modified under the terms of
+# the GNU General Public License version 2 as published by
+# the Free Software Foundation.
+# This file is distributed without any warranty; without even the implied
+# warranty of merchantability or fitness for a particular purpose.
+# See "LICENSE.GPL" in the source distribution for more information.
+
+# Licensees having purchased or holding a valid Flumotion Advanced
+# Streaming Server license may use this file in accordance with the
+# Flumotion Advanced Streaming Server Commercial License Agreement.
+# See "LICENSE.Flumotion" in the source distribution for more information.
+
+# Headers in this file shall remain intact.
+
+from twisted.trial import unittest
+
+import common
+
+import sys, time, os
+
+from twisted.python import failure
+from twisted.internet import defer, interfaces, reactor, gtk2reactor
+from twisted.web import client, error
+
+from flumotion.common import log, errors
+from flumotion.common.planet import moods
+
+from flumotion.test import pond
+
+from flumotion.component.producers.pipeline.pipeline import Producer
+from flumotion.component.converters.pipeline.pipeline import Converter
+
+class PondTestCase(log.Loggable, unittest.TestCase, pond.PondUnitTestMixin):
+ logCategory = 'pond-test'
+
+class TestPondGtk2Reactorness(unittest.TestCase):
+ def test_mixin_class(self):
+ class TestPondUnitTestMixin(unittest.TestCase, pond.PondUnitTestMixin):
+ pass
+ if not isinstance(sys.modules['twisted.internet.reactor'],
+ gtk2reactor.Gtk2Reactor):
+ # not running with a gtk2reactor, TestPondUnitTestMixin
+ # class should have .skip attribute
+ self.failUnless(hasattr(TestPondUnitTestMixin, 'skip'),
+ "PondUnitTestMixin doesn't set .skip correctly.")
+ else:
+ self.failIf(hasattr(TestPondUnitTestMixin, 'skip'),
+ "PondUnitTestMixin sets .skip incorrectly.")
+
+ def test_have_gtk2reactor(self):
+ if not isinstance(sys.modules['twisted.internet.reactor'],
+ gtk2reactor.Gtk2Reactor):
+ # not running with a gtk2reactor, pond.HAVE_GTK2REACTOR
+ # should be False
+ self.failUnlessEquals(pond.HAVE_GTK2REACTOR, False)
+ else:
+ self.failIfEquals(pond.HAVE_GTK2REACTOR, False)
+
+class TestComponentWrapper(unittest.TestCase):
+ def test_get_unique_name(self):
+ self.failIfEquals(pond.ComponentWrapper.get_unique_name(),
+ pond.ComponentWrapper.get_unique_name())
+
+
+ def test_invalid_type(self):
+ self.failUnlessRaises(errors.UnknownComponentError,
+ pond.ComponentWrapper, 'invalid-comp-type',
+ None)
+
+ def test_valid_type(self):
+ cw = pond.ComponentWrapper('pipeline-producer', None, name='pp')
+ self.failUnlessEquals(cw.cfg,
+ {'feed': ['default'], 'name': 'pp',
+ 'parent': 'default', 'clock-master': None,
+ 'avatarId': '/default/pp', 'eater': {},
+ 'source': [], 'plugs': {}, 'properties': {},
+ 'type': 'pipeline-producer'})
+
+
+ def test_simple_link(self):
+ pp = pond.ComponentWrapper('pipeline-producer', None, name='pp')
+ pc = pond.ComponentWrapper('pipeline-converter', None)
+
+ pp.feed(pc)
+ self.failUnlessEquals(pc.cfg['source'], ['pp:default'])
+ self.failUnlessEquals(pc.cfg['eater'], {'default': ['pp:default']})
+
+ def test_non_default_link(self):
+ fwp = pond.ComponentWrapper('firewire-producer', None, name='fwp')
+ pc = pond.ComponentWrapper('pipeline-converter', None, name='pc')
+
+ # this should raise an exception - firewire-producer doesn't
+ # have a default feeder
+ self.failUnlessRaises(pond.PondException, fwp.feed, pc)
+
+ fwp.feed(pc, [('video', 'default')])
+ fwp.feed(pc, [('audio', 'default')])
+
+ self.failUnlessEquals(pc.cfg['source'], ['fwp:video', 'fwp:audio'])
+ self.failUnlessEquals(pc.cfg['eater'],
+ {'default': ['fwp:video', 'fwp:audio']})
+
+
+ def test_instantiate_and_setup_errors(self):
+ pp = pond.ComponentWrapper('pipeline-producer', None, name='pp')
+ self.failUnlessRaises(TypeError, pp.instantiate) # None()!?
+ from flumotion.component.producers.pipeline import pipeline
+ pp = pond.ComponentWrapper('pipeline-producer', Producer, name='pp')
+
+ pp.instantiate()
+ d = pp.setup()
+
+ # the deferred should fail (no mandatory pipeline property) -
+ # stop the component in any case (to clean the reactor) and
+ # passtrough the result/failure
+ d.addBoth(lambda rf: (pp.stop(), rf)[1])
+ return self.failUnlessFailure(d, errors.ComponentSetupHandledError)
+
+ def test_setup_pipeline_error(self):
+ from flumotion.component.producers.pipeline import pipeline
+ pp = pond.ComponentWrapper('pipeline-producer', Producer,
+ name='pp', props={'pipeline': 'fakesink'})
+
+ pp.instantiate()
+ # we're going to fail in gst - make sure the gst logger is silent
+ import gst
+ old_debug_level = gst.debug_get_default_threshold()
+ gst.debug_set_default_threshold(gst.LEVEL_NONE)
+
+ d = pp.setup()
+
+ # the deferred should fail (the only pipeline element doesn't
+ # have source pads) - stop the component in any case (to clean
+ # the reactor) and passtrough the result/failure
+
+ d.addBoth(lambda rf: (pp.stop(), rf)[1])
+ if old_debug_level != gst.LEVEL_NONE:
+ def _restore_gst_debug_level(rf):
+ gst.debug_set_default_threshold(old_debug_level)
+ return rf
+ d.addBoth(_restore_gst_debug_level)
+ return self.failUnlessFailure(d, errors.ComponentSetupHandledError)
+
+ def test_setup_and_stop(self):
+ from flumotion.component.producers.pipeline import pipeline
+ pp = pond.ComponentWrapper('pipeline-producer', Producer,
+ name='pp', props={'pipeline': 'fakesrc'})
+
+ pp.instantiate()
+ d = pp.setup()
+
+ d.addCallback(lambda _: pp.stop())
+ return d
+
+class TestPondSetup(PondTestCase):
+ def setUp(self):
+ self.prod = pond.pipeline_src()
+ self.cnv1 = pond.pipeline_cnv()
+ self.cnv2 = pond.pipeline_cnv()
+ self.components = [self.prod, self.cnv1, self.cnv2]
+
+ self.p = pond.Pond()
+
+ def tearDown(self):
+ return defer.DeferredList([c.stop() for c in self.components])
+
+
+ def test_auto_linking(self):
+ # the components should be linked automatically
+ # [prod:default] --> [default:cnv1:default] --> [default:cnv2]
+ self.p.set_flow([self.prod, self.cnv1, self.cnv2])
+
+ prod_feed = '%s:%s' % (self.prod.name, self.prod.cfg['feed'][0])
+ self.failUnlessEquals([prod_feed], self.cnv1.cfg['source'])
+ self.failUnlessEquals({'default': [prod_feed]}, self.cnv1.cfg['eater'])
+
+ cnv1_feed = '%s:%s' % (self.cnv1.name, self.cnv1.cfg['feed'][0])
+ self.failUnlessEquals([cnv1_feed], self.cnv2.cfg['source'])
+ self.failUnlessEquals({'default': [cnv1_feed]}, self.cnv2.cfg['eater'])
+
+ def test_dont_auto_link_linked(self):
+ p2 = pond.pipeline_src()
+ self.components.append(p2)
+
+ p2.feed(self.cnv1)
+ self.prod.auto_link = False
+
+ # [ p2:default] --> [default:cnv1], set explicitly
+ # no p2 --> prod, explicitly prohibited
+ # [prod:default] --> [default:cnv2]
+ self.p.set_flow([p2, self.prod, self.cnv2, self.cnv1])
+
+ prod_feed = '%s:%s' % (p2.name, p2.cfg['feed'][0])
+ self.failUnlessEquals([prod_feed], self.cnv1.cfg['source'])
+ self.failUnlessEquals({'default': [prod_feed]}, self.cnv1.cfg['eater'])
+
+ self.failUnlessEquals([], self.prod.cfg['source'])
+ self.failUnlessEquals({}, self.prod.cfg['eater'])
+
+ prod_feed = '%s:%s' % (self.prod.name, self.prod.cfg['feed'][0])
+ self.failUnlessEquals([prod_feed], self.cnv2.cfg['source'])
+ self.failUnlessEquals({'default': [prod_feed]}, self.cnv2.cfg['eater'])
+
+ def test_master_clock(self):
+ p2 = pond.pipeline_src()
+ self.components.append(p2)
+
+ p2.feed(self.cnv1)
+ self.prod.feed(self.cnv1)
+ self.cnv1.feed(self.cnv2)
+
+ self.p.set_flow([self.prod, p2, self.cnv1, self.cnv2], auto_link=False)
+
+ # both prod and p2 require a clock, only one should provide it
+ self.failUnlessEquals(self.prod.cfg['clock-master'],
+ p2.cfg['clock-master'])
+ self.failUnlessEquals(self.cnv1.cfg['clock-master'], None)
+ self.failUnlessEquals(self.cnv2.cfg['clock-master'], None)
+
+ master = self.prod
+ slave = p2
+ if master.cfg['clock-master'] != master.cfg['avatarId']:
+ slave, master = master, slave
+
+ # the master-clock component should provide a clock, and not
+ # require an external clock source, as opposed the the slave
+ self.failUnlessEquals(master.sync, None)
+ self.failIfEquals(slave.sync, None)
+
+class TestPondFlow(PondTestCase):
+ def setUp(self):
+ self.duration = 2.0
+
+ prod_pp = ('videotestsrc is-live=true ! '
+ 'video/x-raw-rgb,framerate=(fraction)8/1,'
+ 'width=32,height=24')
+ self.prod = pond.pipeline_src(prod_pp)
+
+ self.cnv1 = pond.pipeline_cnv()
+ self.cnv2 = pond.pipeline_cnv()
+
+ self.p = pond.Pond()
+
+ def tearDown(self):
+ d = self.p.stop_flow()
+
+ # add cleanup, otherwise components a.t.m. don't cleanup after
+ # themselves too well, remove when fixed
+ d.addBoth(lambda _: pond.cleanup_reactor())
+ return d
+
+
+ def test_setup_fail_gst_linking(self):
+ p2 = pond.pipeline_src('fakesink') # this just can't work!
+ c2 = pond.pipeline_cnv('fakesink') # and neither can this!
+
+ # we're going to fail in gst - make sure the gst logger is silent
+ import gst
+ old_debug_level = gst.debug_get_default_threshold()
+ gst.debug_set_default_threshold(gst.LEVEL_NONE)
+
+ self.p.set_flow([p2, c2, self.cnv1])
+ d = self.p.start_flow()
+
+ if old_debug_level != gst.LEVEL_NONE:
+ def _restore_gst_debug_level(rf):
+ gst.debug_set_default_threshold(old_debug_level)
+ return rf
+ d.addBoth(_restore_gst_debug_level)
+ return self.failUnlessFailure(d, errors.ComponentSetupHandledError)
+
+ def test_setup_started_and_happy(self):
+ self.p.set_flow([self.prod, self.cnv1, self.cnv2])
+ d = self.p.start_flow()
+ def check_happy(_):
+ for c in (self.prod, self.cnv1, self.cnv2):
+ self.assertEquals(moods.get(c.comp.getMood()), moods.happy)
+ return _
+ d.addCallback(check_happy)
+ return d
+
+ def test_run_fail_gst_linking(self):
+ p2 = pond.pipeline_src('fakesink') # this just can't work!
+ c2 = pond.pipeline_cnv('fakesink') # and neither can this!
+
+ # we're going to fail in gst - make sure the gst logger is silent
+ import gst
+ old_debug_level = gst.debug_get_default_threshold()
+ gst.debug_set_default_threshold(gst.LEVEL_NONE)
+
+ self.p.set_flow([p2, c2, self.cnv1])
+ d = self.p.run_flow(self.duration)
+
+ if old_debug_level != gst.LEVEL_NONE:
+ def _restore_gst_debug_level(rf):
+ gst.debug_set_default_threshold(old_debug_level)
+ return rf
+ d.addBoth(_restore_gst_debug_level)
+ return self.failUnlessFailure(d, errors.ComponentSetupHandledError)
+
+ def test_run_start_timeout(self):
+ start_delay_time = 5.0
+ self.p.guard_timeout = 2.0
+ class LingeringCompWrapper(pond.ComponentWrapper):
+ def start(self, *a, **kw):
+ d = pond.ComponentWrapper.start(self, *a, **kw)
+ def delay_start(result):
+ dd = defer.Deferred()
+ reactor.callLater(start_delay_time, dd.callback, result)
+ return dd
+ d.addCallback(delay_start)
+ return d
+ c2 = LingeringCompWrapper('pipeline-converter', Converter,
+ props={'pipeline': 'identity'})
+ self.p.set_flow([self.prod, c2])
+ d = self.p.run_flow(self.duration)
+ return self.failUnlessFailure(d, pond.StartTimeout)
+
+ def test_run_tasks_chained_and_fired(self):
+ self.tasks_fired = []
+ self.tasks = []
+ num_tasks = 5
+ def tasks_started(result, index):
+ self.tasks_fired[index] = True
+ return result
+ def tasks_check(result):
+ self.failIfIn(False, self.tasks_fired)
+ self.failIfIn(False, self.tasks_fired)
+ return result
+ for i in range(num_tasks):
+ self.tasks_fired.append(False)
+ d = defer.Deferred()
+ self.tasks.append(d)
+ d.addCallback(tasks_started, i)
+
+ self.p.set_flow([self.prod, self.cnv1, self.cnv2])
+ d = self.p.run_flow(self.duration, tasks=self.tasks)
+ d.addCallback(tasks_check)
+
+ return d
+
+ def test_run_started_happy_and_running_and_stopping(self):
+ self.time_tolerance = 0.5 # should suffice, no?
+ self.timer = 0.0
+
+ self.p.set_flow([self.prod, self.cnv1, self.cnv2])
+
+ def check_happy(_):
+ for c in (self.prod, self.cnv1, self.cnv2):
+ self.assertEquals(moods.get(c.comp.getMood()), moods.happy)
+ return _
+ def timer_start(result):
+ self.timer = time.time()
+ return result
+ def timer_check(result):
+ time_difference = abs(time.time() - self.timer - self.duration)
+ self.failUnless(time_difference < self.time_tolerance,
+ "Time difference too big: %r" % time_difference)
+ return result
+
+ task_d = defer.Deferred()
+ task_d.addCallback(check_happy)
+ task_d.addCallback(timer_start)
+
+ d = self.p.run_flow(self.duration, tasks=[task_d])
+ d.addCallback(timer_check)
+
+ return d
+
+ def test_run_tasks_timeout(self):
+ self.p.set_flow([self.prod, self.cnv1, self.cnv2])
+ self.p.guard_timeout = 4.0
+
+ def make_eternal_deferred(_):
+ # never going to fire this one...
+ eternal_d = defer.Deferred()
+ return eternal_d
+ task_d = defer.Deferred()
+ task_d.addCallback(make_eternal_deferred)
+
+ d = self.p.run_flow(self.duration, tasks=[task_d])
+
+ return self.failUnlessFailure(d, pond.FlowTimeout)
+
+ def test_run_stop_timeout(self):
+ stop_delay_time = 6.0
+ self.p.guard_timeout = 4.0
+ class DelayingCompWrapper(pond.ComponentWrapper):
+ do_delay = True
+ def stop(self, *a, **kw):
+ d = pond.ComponentWrapper.stop(self, *a, **kw)
+ def delay_stop(result):
+ if self.do_delay:
+ self.do_delay = False
+ dd = defer.Deferred()
+ reactor.callLater(stop_delay_time, dd.callback, result)
+ return dd
+ return result
+ d.addCallback(delay_stop)
+ return d
+ c2 = DelayingCompWrapper('pipeline-converter', Converter,
+ props={'pipeline': 'identity'})
+ self.p.set_flow([self.prod, c2])
+ d = self.p.run_flow(self.duration)
+ return self.failUnlessFailure(d, pond.StopTimeout)
+
+ def test_run_started_then_fails(self):
+ self.p.set_flow([self.prod, self.cnv1, self.cnv2])
+ wrench_timeout = 0.5
+
+ class CustomWrenchException(Exception):
+ pass
+ def insert_wrenches_into_cogs(_):
+ def insert_wrench(c):
+ raise CustomWrenchException("Wasn't that loose?")
+ d = defer.Deferred()
+ d.addCallback(insert_wrench)
+ reactor.callLater(wrench_timeout, d.callback, self.cnv1)
+ return d
+ task_d = defer.Deferred()
+ task_d.addCallback(insert_wrenches_into_cogs)
+
+ d = self.p.run_flow(self.duration, tasks=[task_d])
+ return self.failUnlessFailure(d, CustomWrenchException)
+
+ def test_run_started_then_flow_and_stop_fail(self):
+ flow_error_timeout = 0.5
+
+ class CustomFlowException(Exception):
+ pass
+ class CustomStopException(Exception):
+ pass
+
+ class BrokenCompWrapper(pond.ComponentWrapper):
+ do_break = True
+ def stop(self, *a, **kw):
+ d = pond.ComponentWrapper.stop(self, *a, **kw)
+ def delay_stop(result):
+ # breaking once should be enough
+ if self.do_break:
+ self.do_break = False
+ raise CustomStopException()
+ d.addCallback(delay_stop)
+ return d
+ c2 = BrokenCompWrapper('pipeline-converter', Converter,
+ props={'pipeline': 'identity'})
+ self.p.set_flow([self.prod, c2])
+
+ class CustomFlowException(Exception):
+ pass
+ def insert_flow_errors(_):
+ def insert_error(_ignore):
+ raise CustomFlowException("This pond is too small!")
+ d = defer.Deferred()
+ d.addCallback(insert_error)
+ reactor.callLater(flow_error_timeout, d.callback, None)
+ return d
+ task_d = defer.Deferred()
+ task_d.addCallback(insert_flow_errors)
+ d = self.p.run_flow(self.duration, tasks=[task_d])
+ return self.failUnlessFailure(d, CustomFlowException)
More information about the flumotion-commit
mailing list