arek - in flumotion/trunk: . flumotion/test

flumotion-commit at lists.fluendo.com flumotion-commit at lists.fluendo.com
Fri Jul 20 16:12:57 CEST 2007


Author: arek
Date: Fri Jul 20 16:12:54 2007
New Revision: 5333

Added:
   flumotion/trunk/flumotion/test/pond.py
   flumotion/trunk/flumotion/test/test_pond.py
Modified:
   flumotion/trunk/ChangeLog
   flumotion/trunk/flumotion/test/Makefile.am
Log:
	* flumotion/test/pond.py:
	* flumotion/test/test_pond.py:
	* flumotion/test/Makefile.am:
	Add a component test helper class.



Modified: flumotion/trunk/ChangeLog
==============================================================================
--- flumotion/trunk/ChangeLog	(original)
+++ flumotion/trunk/ChangeLog	Fri Jul 20 16:12:54 2007
@@ -1,3 +1,10 @@
+2007-07-20  Arek Korbik  <arkadini at gmail.com>
+
+	* flumotion/test/pond.py:
+	* flumotion/test/test_pond.py:
+	* flumotion/test/Makefile.am:
+	Add a component test helper class.
+
 2007-07-20 Pedro Gracia <pedro at flumotion.com>
 
 	* flumotion/wizard/steps.py:

Modified: flumotion/trunk/flumotion/test/Makefile.am
==============================================================================
--- flumotion/trunk/flumotion/test/Makefile.am	(original)
+++ flumotion/trunk/flumotion/test/Makefile.am	Fri Jul 20 16:12:54 2007
@@ -8,6 +8,7 @@
         __init__.py			\
 	common.py			\
 	testclasses.py			\
+	pond.py				\
 	gtkunit.py			\
 	realm.py
 
@@ -56,6 +57,7 @@
 	test_parts.py	 		\
 	test_pb.py	 		\
 	test_pbstream.py		\
+	test_pond.py			\
 	test_porter.py	 		\
 	test_reflect.py 		\
 	test_registry.py 		\

Added: flumotion/trunk/flumotion/test/pond.py
==============================================================================
--- (empty file)
+++ flumotion/trunk/flumotion/test/pond.py	Fri Jul 20 16:12:54 2007
@@ -0,0 +1,497 @@
+# -*- Mode: Python -*-
+# vi:si:et:sw=4:sts=4:ts=4
+#
+# Flumotion - a streaming media server
+# Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com).
+# All rights reserved.
+
+# This file may be distributed and/or modified under the terms of
+# the GNU General Public License version 2 as published by
+# the Free Software Foundation.
+# This file is distributed without any warranty; without even the implied
+# warranty of merchantability or fitness for a particular purpose.
+# See "LICENSE.GPL" in the source distribution for more information.
+
+# Licensees having purchased or holding a valid Flumotion Advanced
+# Streaming Server license may use this file in accordance with the
+# Flumotion Advanced Streaming Server Commercial License Agreement.
+# See "LICENSE.Flumotion" in the source distribution for more information.
+
+# Headers in this file shall remain intact.
+
+__all__ = ['Pond', 'PondUnitTestMixin', 'cleanup_reactor',
+           'pipeline_src', 'pipeline_cnv']
+
+import common
+
+import os
+import tempfile
+import new
+import sys
+
+from twisted.internet import gtk2reactor
+# Try to install the gtk2reactor.  When install() raises AssertionError
+# (presumably because a reactor is already installed - TODO confirm),
+# the flag stays True only if the installed reactor is a Gtk2Reactor.
+HAVE_GTK2REACTOR = True
+try:
+    gtk2reactor.install(useGtk=False)
+except AssertionError:
+    if not isinstance(sys.modules['twisted.internet.reactor'],
+                      gtk2reactor.Gtk2Reactor):
+        HAVE_GTK2REACTOR = False
+
+from twisted.python import failure
+from twisted.internet import reactor, defer, interfaces
+from twisted.web import client, error
+
+from flumotion.common import registry, log, errors, common
+from flumotion.component.producers.pipeline.pipeline import Producer
+from flumotion.component.converters.pipeline.pipeline import Converter
+from flumotion.twisted import flavors
+
+
+class PondException(Exception):
+    """Base class of the errors raised by the pond test helpers."""
+
+class WrongReactor(PondException):
+    """Raised by Pond.run_flow() when the gtk2reactor isn't available."""
+
+class StartTimeout(PondException):
+    """The guard timeout expired before the flow start completed."""
+
+class FlowTimeout(PondException):
+    """The guard timeout expired before the started flow completed."""
+
+class StopTimeout(PondException):
+    """The guard timeout expired after the flow run had completed."""
+
+
+class ComponentWrapper(object, log.Loggable):
+    logCategory = 'pond-compwrap'
+    _u_name_cnt = 0
+    _registry = None
+
+    def __init__(self, type_, class_, props=None, name=None, plugs=None,
+                 cfg=None):
+        """
+        Prepare the configuration dictionary of a component under test.
+
+        Keys already present in cfg are kept; missing keys are filled in
+        from the registry entry of type_ or with defaults.  The props,
+        name and plugs arguments, when given, replace the corresponding
+        cfg entries.  A cfg dictionary passed in is modified in place.
+
+        @param type_:  component type, looked up in the registry
+        @param class_: callable used by instantiate() to create the
+                       component
+        @param props:  if not None, stored as cfg['properties']
+        @param name:   if true, stored as cfg['name']
+        @param plugs:  if not None, stored as cfg['plugs']
+        @param cfg:    configuration dictionary to start from, or None
+        """
+        if cfg is None:
+            cfg = {}
+
+        self.comp = None
+        self.comp_class = class_
+        self.cfg = cfg
+        self.auto_link = True
+        self.debug_msgs = []
+        self.sync = None
+        self.sync_master = None
+
+        if ComponentWrapper._registry is None:
+            ComponentWrapper._registry = registry.getRegistry()
+
+        cfg['type'] = type_
+        reg = ComponentWrapper._registry.getComponent(type_)
+
+        cfg.setdefault('source', [])
+
+        if 'eater' not in cfg:
+            # only the required eaters get an (initially empty) entry
+            required_eaters = {}
+            for eater in reg.getEaters():
+                if eater.getRequired():
+                    required_eaters[eater.getName()] = []
+            cfg['eater'] = required_eaters
+
+        if 'feed' not in cfg:
+            cfg['feed'] = reg.getFeeders()[:]
+
+        if plugs is not None:
+            cfg['plugs'] = plugs
+        elif 'plugs' not in cfg:
+            socket_plugs = {}
+            for sock_name in reg.getSockets():
+                socket_plugs[sock_name] = []
+            cfg['plugs'] = socket_plugs
+
+        if name:
+            cfg['name'] = name
+        elif 'name' not in cfg:
+            cfg['name'] = ComponentWrapper.get_unique_name()
+        self.name = cfg['name']
+
+        cfg.setdefault('parent', 'default')
+
+        if 'avatarId' not in cfg:
+            cfg['avatarId'] = common.componentId(cfg['parent'], self.name)
+
+        if props is not None:
+            cfg['properties'] = props
+        elif 'properties' not in cfg:
+            cfg['properties'] = {}
+
+        self.sync_master = cfg.setdefault('clock-master', None)
+
+        # components needing synchronization remember their clock priority
+        if reg.getNeedsSynchronization():
+            self.sync = reg.getClockPriority()
+
+    def __repr__(self):
+        """
+        Return a debugging representation: the wrapper class name, the
+        component class name and the configuration dictionary.
+
+        A wrapper may be created without a real component class (class_
+        set to None); the object itself is shown then, instead of raising
+        AttributeError on the missing __name__.
+        """
+        comp_class_name = getattr(self.comp_class, '__name__',
+                                  self.comp_class)
+        return '%s(%r, %r)' % (self.__class__.__name__,
+                               comp_class_name, self.cfg)
+
+    @classmethod
+    def get_unique_name(cls, prefix='cmp-'):
+        """
+        Return prefix followed by the current value of the class counter
+        and increase the counter by one.
+        """
+        count = cls._u_name_cnt
+        name = '%s%d' % (prefix, count)
+        cls._u_name_cnt = count + 1
+        return name
+
+    def instantiate(self):
+        """
+        Create the component and hook into the append() of its state.
+
+        The replacement append() logs every appended value, records the
+        debug text of values appended under the 'messages' key in
+        self.debug_msgs and then calls flavors.StateCacheable.append().
+        """
+        comp = self.comp_class()
+        self.comp = comp
+        self.debug('instantiate:: %r' % comp.state)
+
+        def append(state, key, value):
+            self.debug('append %r: %r' % (value.level, value))
+            if key == 'messages' and value.debug:
+                self.debug('proxied state debug:: %r' % value.debug)
+                self.debug_msgs.append(value.debug)
+            flavors.StateCacheable.append(state, key, value)
+
+        # bind the function to the state object only, not to its class
+        comp.state.append = new.instancemethod(append, comp.state)
+
+    def setup(self):
+        """
+        Call setup() of the component with the wrapper's configuration,
+        instantiating the component first if that has not happened yet.
+
+        @returns: whatever the component's setup() returns
+        """
+        if self.comp is None:
+            self.instantiate()
+        component = self.comp
+        return component.setup(self.cfg)
+
+    def feed(self, sink_comp, links=None):
+        """
+        Link feeders of this component to eaters of sink_comp.
+
+        @param sink_comp: the wrapper of the component to be fed
+        @param links:     list of (feeder name, eater name) pairs;
+                          defaults to [('default', 'default')]
+
+        @raises PondException: if a feeder name is not listed in this
+                               component's cfg['feed']
+        """
+        if links is None:
+            links = [('default', 'default')]
+        for feeder, eater in links:
+            if feeder not in self.cfg['feed']:
+                raise PondException('Invalid feeder specified: %r' % feeder)
+            # the link is recorded in the config of the sink only
+            sink_comp.add_feeder(self, '%s:%s' % (self.name, feeder), eater)
+
+    def add_feeder(self, src_comp, feeder_name, eater):
+        """
+        Record feeder_name as a source of this component and as a feed
+        of the given eater, and switch automatic linking off.
+
+        The src_comp argument is not used here.
+        """
+        cfg = self.cfg
+        cfg['source'].append(feeder_name)
+        eater_feeds = cfg['eater'].setdefault(eater, [])
+        eater_feeds.append(feeder_name)
+        self.auto_link = False
+
+    def feedToFD(self, feedName, fd, eaterId=None):
+        """
+        Log the call and forward it to feedToFD() of the component,
+        passing os.close in addition to the given arguments.
+        """
+        message = 'feedToFD(feedName=%s, %d (%s))' % (feedName, fd, eaterId)
+        self.debug(message)
+        return self.comp.feedToFD(feedName, fd, os.close, eaterId)
+
+    def eatFromFD(self, feedId, fd):
+        """Log the call and forward it to eatFromFD() of the component."""
+        message = 'eatFromFD(feedId=%s, %d)' % (feedId, fd)
+        self.debug(message)
+        return self.comp.eatFromFD(feedId, fd)
+
+    def start(self, *args, **kwargs):
+        """
+        Call start() of the component with the given arguments.
+
+        @returns: the deferred returned by the component's start(), with
+                  a callback added that logs the result and passes it on
+        """
+        self.debug('start(*%r, **%r)' % (args, kwargs))
+
+        def log_and_pass(result):
+            self.debug('after start: %r' % result)
+            return result
+
+        d = self.comp.start(*args, **kwargs)
+        d.addCallback(log_and_pass)
+        return d
+
+    def stop(self, *args, **kwargs):
+        """
+        Call stop() of the component with the given arguments.
+
+        @returns: the result of the component's stop(), or an already
+                  fired deferred if there is no component to stop
+        """
+        self.debug('stop(*%r, **%r)' % (args, kwargs))
+        if not self.comp:
+            return defer.succeed(None)
+        return self.comp.stop(*args, **kwargs)
+
+
+class Pond(object, log.Loggable):
+    logCategory = 'pond'
+
+    guard_timeout = 60.0
+    guard_delay = 0.5
+    start_delay = None
+
+    def __init__(self):
+        """Create a pond with no components and no master clock set."""
+        # the component wrappers, in flow order, and indexed by name
+        self._comps = []
+        self._byname = {}
+        # the wrapper of the component providing the master clock
+        self._master = None
+        self._syncing = None
+
+    def set_flow(self, comp_chain, auto_link=True):
+        """
+        Define the flow as the given sequence of component wrappers.
+
+        With auto_link set, each component is fed by its predecessor in
+        the sequence, unless its own auto_link attribute is false (which
+        is also the case once it has been linked explicitly).
+
+        Afterwards the 'clock-master' config entries are updated.  If any
+        component needs synchronization, one master is chosen - the first
+        component configured with a clock master, or else the one with
+        the highest clock priority - and its avatarId is set as the clock
+        master of every component needing synchronization.
+
+        @param comp_chain: sequence of ComponentWrapper instances
+        @param auto_link:  whether to link consecutive components
+        """
+        self._comps = []
+
+        if not comp_chain:
+            return
+
+        if auto_link:
+            # walk the consecutive pairs: (a0, a1), (a1, a2), ...
+            for c_src, c_sink in zip(comp_chain, comp_chain[1:]):
+                if c_sink.auto_link:
+                    c_src.feed(c_sink)
+
+        self._comps.extend(comp_chain)
+
+        masters = [c for c in self._comps if c.sync_master is not None]
+        need_sync = sorted([c for c in self._comps if c.sync is not None],
+                           key=(lambda e: e.sync), reverse=True)
+
+        if need_sync:
+            # an explicitly configured master wins over clock priority
+            if masters:
+                master = masters[0]
+            else:
+                master = need_sync[0]
+
+            master.sync = None # ...? :/
+            self._master = master
+
+            master_id = master.cfg['avatarId']
+            for c in need_sync:
+                c.cfg['clock-master'] = master_id
+        elif masters:
+            # nothing needs a clock, so nothing has to provide one
+            for c in masters:
+                c.cfg['clock-master'] = None
+
+        for c in self._comps:
+            self._byname[c.name] = c
+            c.log('updated config for %r: %r' % (c, c.cfg))
+
+    def _make_pipes(self):
+        """
+        Create an OS pipe for every feed listed in the eater configs of
+        the components.
+
+        self._fds maps each 'name:feed' source string to a tuple of the
+        read end of the pipe and a callable which, when called, passes
+        the write end to feedToFD() of the feeding component.
+        """
+        def make_feed_starter(feeder_comp, feed_name, w_fd, feed_id):
+            def _feed_starter():
+                self.debug('_feed_starter: %r, %r' % (feed_name, feed_id))
+                return feeder_comp.feedToFD(feed_name, w_fd, eaterId=feed_id)
+            return _feed_starter
+
+        pipes = {}
+        for comp in self._comps:
+            for eater_id, sources in comp.cfg['eater'].items():
+                for src in sources:
+                    feeder_name, feed_name = src.split(':')
+                    self.debug('creating pipe: %r, %r, %r' %
+                               (src, feed_name, eater_id))
+                    r_fd, w_fd = os.pipe()
+                    starter = make_feed_starter(self._byname[feeder_name],
+                                                feed_name, w_fd, eater_id)
+                    pipes[src] = (r_fd, starter)
+        self._fds = pipes
+
+    def start_flow(self):
+        """
+        Set up all the components and start them one after another.
+
+        The pipes between the components are created first.  Once the
+        setup() of every component has succeeded, the master clock
+        component (if any) is asked to provide the master clock and the
+        components are started in flow order, each start being preceded
+        by a delay of self.start_delay seconds when that is set.  If
+        anything fails, stop() is called on all the components in reverse
+        order and the failure is passed on.
+
+        @returns: a deferred firing after the last component has been
+                  started; a setup failure is reported as the failure of
+                  the first component that failed to set up
+        """
+        delay = self.start_delay
+
+        def all_ready_p(results):
+            self.debug('** 1: all_ready_p: %r' % results)
+        def setup_failed(failure):
+            self.info('*! 1: setup_failed: %r' % (failure,))
+            # unwrap the failure of the component from the FirstError
+            failure.trap(defer.FirstError)
+            return failure.value.subFailure
+        def start_master_clock(_):
+            self.debug('** 2: start_master_clock: %r (%r)' % (_, self._master))
+            if self._master is not None:
+                self.debug('About to ask to provide_master_clock()...')
+                d = self._master.comp.provide_master_clock(7600 - 1) # ...hack?
+                # NOTE(review): _jd returns None, so the chain goes on with
+                # None rather than with the result of
+                # provide_master_clock() - confirm this is intended
+                def _jd(_):
+                    self.debug('After provide_master_clock() : %r' % (_,))
+                d.addCallback(_jd)
+                return d
+            return None
+        def add_delay(value):
+            self.debug('** 3: add_delay: %r' % (value,))
+            if delay:
+                # pass the unchanged value on once the delay has elapsed
+                delayed_d = defer.Deferred()
+                reactor.callLater(delay, delayed_d.callback, value)
+                return delayed_d
+            return defer.succeed(value)
+        def do_start(clocking, c):
+            self.debug('** 4: do_start_cb: %r, %r' % (clocking, c))
+            # connect both ends of every pipe feeding this component
+            for src in c.cfg['source']:
+                r_fd, feed_starter = self._fds[src]
+                c.eatFromFD(src, r_fd)
+                feed_starter()
+            if not c.sync:
+                clocking = None
+            d = c.start(clocking)
+            d.addCallback(lambda _: clocking)
+            return d
+        def do_stop(failure):
+            self.debug('** X: do_stop: %r' % failure)
+            for c in reversed(self._comps):
+                c.stop()
+            return failure
+
+        self._make_pipes()
+
+        self.debug('About to start the flow...')
+        # P(ossible)TODO: make it report setup failures in all the
+        # components, not only in the first to fail...?
+        d = defer.DeferredList([c.setup() for c in self._comps],
+                               fireOnOneErrback=1, consumeErrors=1)
+        d.addCallbacks(all_ready_p, setup_failed)
+        d.addCallback(start_master_clock)
+        for c in self._comps:
+            d.addCallback(add_delay)
+            d.addCallback(do_start, c)
+        d.addErrback(do_stop)
+        return d
+
+    def stop_flow(self):
+        """
+        Call stop() on all the components, in reverse flow order.
+
+        @returns: a deferred firing with the DeferredList results of the
+                  stop() calls, or failing with the first stop failure
+        """
+        def report(results):
+            self.debug('stop_flow_report: %r' % (results,))
+            return results
+
+        def unwrap_first_error(fail):
+            self.info('stop_flow_failed: %r' % (fail,))
+            fail.trap(defer.FirstError)
+            sub_failure = fail.value.subFailure
+            self.info('stop_flow_failed! %r' % (sub_failure,))
+            return sub_failure
+
+        stopping = [c.stop() for c in reversed(self._comps)]
+        d = defer.DeferredList(stopping, fireOnOneErrback=1, consumeErrors=1)
+        d.addCallbacks(report, unwrap_first_error)
+        return d
+
+    def run_flow(self, duration, tasks=None,
+                 start_d=None, start_flow=True, stop_flow=True):
+        """
+        Run the flow for the given time, guarded by self.guard_timeout.
+
+        Unless start_d is given or start_flow is false, the flow is
+        started with start_flow().  After the start has completed the
+        flow is left running for duration seconds; the tasks (if any) are
+        fired with the result of the start and are waited for together
+        with the duration timer.  With stop_flow set, stop_flow() is
+        called at the end, whether the run has succeeded or failed.
+
+        If all that does not complete within self.guard_timeout seconds
+        of this call, the result is a StartTimeout, FlowTimeout or
+        StopTimeout failure, depending on the phase reached.
+
+        @param duration:   time to keep the flow running, in seconds
+        @param tasks:      list of deferreds to fire once the flow has
+                           started
+        @param start_d:    deferred to use as the start of the flow, in
+                           place of calling start_flow()
+        @param start_flow: whether to call start_flow(); only used when
+                           start_d is None
+        @param stop_flow:  whether to call stop_flow() at the end
+
+        @raises WrongReactor: if the gtk2reactor isn't available
+
+        @returns: a deferred firing when the run has completed
+        """
+        if not HAVE_GTK2REACTOR:
+            raise WrongReactor("gtk2reactor isn't available")
+
+        self.debug('run_flow: tasks: %r' % (tasks,))
+        flow_d = start_d
+
+        if tasks is None:
+            tasks = []
+
+        if flow_d is None:
+            if start_flow:
+                flow_d = self.start_flow()
+            else:
+                flow_d = defer.succeed(True)
+
+        # mutable flags, set from the nested callbacks below
+        flow_started_finished = [False, False]
+
+        guard_d = None
+        timeout_d = defer.Deferred()
+        stop_d = defer.Deferred()
+        stop_timeout_d = defer.Deferred()
+        chained_d = None
+
+        callids = [None, None, None] # callLater ids: stop_d,
+                                     # timeout_d, fire_chained
+
+        if tasks:
+            # if have tasks, run simultaneously with the main timer deferred
+            chained_d = defer.DeferredList([stop_d] + tasks,
+                                           fireOnOneErrback=1, consumeErrors=1)
+            def chained_failed(failure):
+                self.info('chained_failed: %r' % (failure,))
+                failure.trap(defer.FirstError)
+                return failure.value.subFailure
+            chained_d.addErrback(chained_failed)
+        else:
+            # otherwise, just idle...
+            chained_d = stop_d
+
+        def start_complete(result):
+            self.debug('start_complete: %r' % (result,))
+            flow_started_finished[0] = True
+            callids[0] = reactor.callLater(duration, stop_d.callback, None)
+            if tasks:
+                def _fire_chained():
+                    callids[2] = None
+                    for t in tasks:
+                        try:
+                            t.callback(result)
+                        except defer.AlreadyCalledError:
+                            pass
+                callids[2] = reactor.callLater(0, _fire_chained)
+            return chained_d
+
+        def flow_complete(result):
+            self.debug('flow_complete: %r' % (result,))
+            flow_started_finished[1] = True
+            return result
+
+        def flow_timed_out():
+            self.debug('flow_timed_out!')
+            # report the phase that was in progress when time ran out
+            if not flow_started_finished[0]:
+                timeout_d.errback(StartTimeout('flow start timed out'))
+            elif not flow_started_finished[1]:
+                timeout_d.errback(FlowTimeout('flow run timed out'))
+            else:
+                stop_timeout_d.errback(StopTimeout('flow stop timed out'))
+
+        def clean_calls(result):
+            self.debug('clean_calls: %r' % (result,))
+            for i, cid in enumerate(callids):
+                if cid is not None:
+                    if cid.active():
+                        cid.cancel()
+                    callids[i] = None
+            return result
+
+        flow_d.addCallbacks(start_complete)
+        flow_d.addCallback(flow_complete)
+
+        guard_d = defer.DeferredList([flow_d, timeout_d], consumeErrors=1,
+                                     fireOnOneErrback=1, fireOnOneCallback=1)
+
+        def guard_failed(failure):
+            self.info('guard_failed: %r' % (failure,))
+            failure.trap(defer.FirstError)
+            return failure.value.subFailure
+        if stop_flow:
+            def _force_stop_flow(result):
+                self.debug('_force_stop_flow: %r' % (result,))
+                d = defer.DeferredList([self.stop_flow(), stop_timeout_d],
+                                       fireOnOneErrback=1, fireOnOneCallback=1,
+                                       consumeErrors=1)
+                def _return_orig_result(stop_result):
+                    if isinstance(result, failure.Failure):
+                        # always return the run's failure first
+                        # what do I return if both the run and stop failed?
+                        self.debug('_return_orig[R]: %r' % (result,))
+                        return result
+                    elif isinstance(stop_result, failure.Failure):
+                        # return failure from stop
+                        self.debug('_return_orig[S]: %r' % (stop_result,))
+                        return stop_result
+                    return result
+                def force_stop_failed(failure):
+                    self.info('force_stop_failed: %r' % (failure,))
+                    failure.trap(defer.FirstError)
+                    return failure.value.subFailure
+                # with fireOnOneCallback the list fires with a
+                # (result, index) tuple; pass on the result of stop_flow()
+                d.addCallbacks(lambda r: r[0], force_stop_failed)
+                d.addBoth(_return_orig_result)
+                return d
+            guard_d.addBoth(_force_stop_flow)
+
+        guard_d.addErrback(guard_failed)
+        guard_d.addBoth(clean_calls)
+
+        callids[1] = reactor.callLater(self.guard_timeout, flow_timed_out)
+        return guard_d
+
+class PondUnitTestMixin:
+    # Mixin for test case classes: the 'skip' attribute (holding the
+    # reason) is defined only when the gtk2reactor is not available.
+    if not HAVE_GTK2REACTOR:
+        skip = 'gtk2reactor is required for this test case'
+
+def cleanup_reactor(force=False):
+    """
+    Cancel the pending delayed calls of the reactor and remove all of its
+    selectables, sending KILL to leftover process transports.
+
+    Nothing is done when the gtk2reactor is not available, unless force
+    is set.
+    """
+    if not (HAVE_GTK2REACTOR or force):
+        return
+    log.debug('pond', 'running cleanup_reactor...')
+    for delayed_call in reactor.getDelayedCalls():
+        delayed_call.cancel()
+    # the rest is taken from twisted trial...
+    leftovers = reactor.removeAll()
+    if not leftovers:
+        return
+    log.info('pond', 'leftover selectables...: %r %r' %
+             (leftovers, reactor.waker))
+    for selectable in leftovers:
+        if interfaces.IProcessTransport.providedBy(selectable):
+            selectable.signalProcess('KILL')
+
+def pipeline_src(pipelinestr='fakesrc datarate=1024 is-live=true ! '
+                 'video/x-raw-rgb,framerate=(fraction)8/1,width=32,height=24'):
+    """
+    Return a uniquely named wrapper of a pipeline-producer component
+    configured with the given pipeline string.
+    """
+    properties = {'pipeline': pipelinestr}
+    unique_name = ComponentWrapper.get_unique_name('ppln-src-')
+    return ComponentWrapper('pipeline-producer', Producer,
+                            name=unique_name, props=properties)
+
+def pipeline_cnv(pipelinestr='identity'):
+    """
+    Return a uniquely named wrapper of a pipeline-converter component
+    configured with the given pipeline string.
+    """
+    properties = {'pipeline': pipelinestr}
+    unique_name = ComponentWrapper.get_unique_name('ppln-cnv-')
+    return ComponentWrapper('pipeline-converter', Converter,
+                            name=unique_name, props=properties)
+

Added: flumotion/trunk/flumotion/test/test_pond.py
==============================================================================
--- (empty file)
+++ flumotion/trunk/flumotion/test/test_pond.py	Fri Jul 20 16:12:54 2007
@@ -0,0 +1,468 @@
+# -*- Mode: Python -*-
+# vi:si:et:sw=4:sts=4:ts=4
+#
+# Flumotion - a streaming media server
+# Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com).
+# All rights reserved.
+
+# This file may be distributed and/or modified under the terms of
+# the GNU General Public License version 2 as published by
+# the Free Software Foundation.
+# This file is distributed without any warranty; without even the implied
+# warranty of merchantability or fitness for a particular purpose.
+# See "LICENSE.GPL" in the source distribution for more information.
+
+# Licensees having purchased or holding a valid Flumotion Advanced
+# Streaming Server license may use this file in accordance with the
+# Flumotion Advanced Streaming Server Commercial License Agreement.
+# See "LICENSE.Flumotion" in the source distribution for more information.
+
+# Headers in this file shall remain intact.
+
+from twisted.trial import unittest
+
+import common
+
+import sys, time, os
+
+from twisted.python import failure
+from twisted.internet import defer, interfaces, reactor, gtk2reactor
+from twisted.web import client, error
+
+from flumotion.common import log, errors
+from flumotion.common.planet import moods
+
+from flumotion.test import pond
+
+from flumotion.component.producers.pipeline.pipeline import Producer
+from flumotion.component.converters.pipeline.pipeline import Converter
+
+class PondTestCase(log.Loggable, unittest.TestCase, pond.PondUnitTestMixin):
+    # Base class of the pond test cases: a loggable trial test case that
+    # also inherits the 'skip' handling of pond.PondUnitTestMixin.
+    logCategory = 'pond-test'
+
+class TestPondGtk2Reactorness(unittest.TestCase):
+    # Checks that pond reflects the type of the installed reactor.
+
+    def test_mixin_class(self):
+        class TestPondUnitTestMixin(unittest.TestCase, pond.PondUnitTestMixin):
+            pass
+        installed = sys.modules['twisted.internet.reactor']
+        if isinstance(installed, gtk2reactor.Gtk2Reactor):
+            self.failIf(hasattr(TestPondUnitTestMixin, 'skip'),
+                        "PondUnitTestMixin sets .skip incorrectly.")
+        else:
+            # not running with a gtk2reactor, TestPondUnitTestMixin
+            # class should have .skip attribute
+            self.failUnless(hasattr(TestPondUnitTestMixin, 'skip'),
+                            "PondUnitTestMixin doesn't set .skip correctly.")
+
+    def test_have_gtk2reactor(self):
+        installed = sys.modules['twisted.internet.reactor']
+        if isinstance(installed, gtk2reactor.Gtk2Reactor):
+            self.failIfEquals(pond.HAVE_GTK2REACTOR, False)
+        else:
+            # not running with a gtk2reactor, pond.HAVE_GTK2REACTOR
+            # should be False
+            self.failUnlessEquals(pond.HAVE_GTK2REACTOR, False)
+
+class TestComponentWrapper(unittest.TestCase):
+    # Tests of pond.ComponentWrapper: config creation, linking and the
+    # instantiate/setup/stop cycle of a single component.
+
+    def test_get_unique_name(self):
+        self.failIfEquals(pond.ComponentWrapper.get_unique_name(),
+                          pond.ComponentWrapper.get_unique_name())
+
+
+    def test_invalid_type(self):
+        self.failUnlessRaises(errors.UnknownComponentError,
+                              pond.ComponentWrapper, 'invalid-comp-type',
+                              None)
+
+    def test_valid_type(self):
+        cw = pond.ComponentWrapper('pipeline-producer', None, name='pp')
+        self.failUnlessEquals(cw.cfg,
+                              {'feed': ['default'], 'name': 'pp',
+                               'parent': 'default', 'clock-master': None,
+                               'avatarId': '/default/pp', 'eater': {},
+                               'source': [], 'plugs': {}, 'properties': {},
+                               'type': 'pipeline-producer'})
+
+
+    def test_simple_link(self):
+        pp = pond.ComponentWrapper('pipeline-producer', None, name='pp')
+        pc = pond.ComponentWrapper('pipeline-converter', None)
+
+        pp.feed(pc)
+        self.failUnlessEquals(pc.cfg['source'], ['pp:default'])
+        self.failUnlessEquals(pc.cfg['eater'], {'default': ['pp:default']})
+
+    def test_non_default_link(self):
+        fwp = pond.ComponentWrapper('firewire-producer', None, name='fwp')
+        pc = pond.ComponentWrapper('pipeline-converter', None, name='pc')
+
+        # this should raise an exception - firewire-producer doesn't
+        # have a default feeder
+        self.failUnlessRaises(pond.PondException, fwp.feed, pc)
+
+        fwp.feed(pc, [('video', 'default')])
+        fwp.feed(pc, [('audio', 'default')])
+
+        self.failUnlessEquals(pc.cfg['source'], ['fwp:video', 'fwp:audio'])
+        self.failUnlessEquals(pc.cfg['eater'],
+                              {'default': ['fwp:video', 'fwp:audio']})
+
+
+    def test_instantiate_and_setup_errors(self):
+        pp = pond.ComponentWrapper('pipeline-producer', None, name='pp')
+        self.failUnlessRaises(TypeError, pp.instantiate) # None()!?
+        pp = pond.ComponentWrapper('pipeline-producer', Producer, name='pp')
+
+        pp.instantiate()
+        d = pp.setup()
+
+        # the deferred should fail (no mandatory pipeline property) -
+        # stop the component in any case (to clean the reactor) and
+        # pass through the result/failure
+        d.addBoth(lambda rf: (pp.stop(), rf)[1])
+        return self.failUnlessFailure(d, errors.ComponentSetupHandledError)
+
+    def test_setup_pipeline_error(self):
+        pp = pond.ComponentWrapper('pipeline-producer', Producer,
+                                   name='pp', props={'pipeline': 'fakesink'})
+
+        pp.instantiate()
+        # we're going to fail in gst - make sure the gst logger is silent
+        import gst
+        old_debug_level = gst.debug_get_default_threshold()
+        gst.debug_set_default_threshold(gst.LEVEL_NONE)
+
+        d = pp.setup()
+
+        # the deferred should fail (the only pipeline element doesn't
+        # have source pads) - stop the component in any case (to clean
+        # the reactor) and pass through the result/failure
+
+        d.addBoth(lambda rf: (pp.stop(), rf)[1])
+        if old_debug_level != gst.LEVEL_NONE:
+            def _restore_gst_debug_level(rf):
+                gst.debug_set_default_threshold(old_debug_level)
+                return rf
+            d.addBoth(_restore_gst_debug_level)
+        return self.failUnlessFailure(d, errors.ComponentSetupHandledError)
+
+    def test_setup_and_stop(self):
+        pp = pond.ComponentWrapper('pipeline-producer', Producer,
+                                   name='pp', props={'pipeline': 'fakesrc'})
+
+        pp.instantiate()
+        d = pp.setup()
+
+        d.addCallback(lambda _: pp.stop())
+        return d
+
+class TestPondSetup(PondTestCase):
+    # Tests of the config changes made by Pond.set_flow(): linking of
+    # the components and the choice of the master clock.
+
+    def setUp(self):
+        self.p = pond.Pond()
+
+        self.prod = pond.pipeline_src()
+        self.cnv1 = pond.pipeline_cnv()
+        self.cnv2 = pond.pipeline_cnv()
+        self.components = [self.prod, self.cnv1, self.cnv2]
+
+    def tearDown(self):
+        stopping = [c.stop() for c in self.components]
+        return defer.DeferredList(stopping)
+
+
+    def test_auto_linking(self):
+        # the components should be linked automatically
+        # [prod:default] --> [default:cnv1:default] --> [default:cnv2]
+        self.p.set_flow([self.prod, self.cnv1, self.cnv2])
+
+        prod_feed = '%s:%s' % (self.prod.name, self.prod.cfg['feed'][0])
+        self.failUnlessEquals([prod_feed], self.cnv1.cfg['source'])
+        self.failUnlessEquals({'default': [prod_feed]}, self.cnv1.cfg['eater'])
+
+        cnv1_feed = '%s:%s' % (self.cnv1.name, self.cnv1.cfg['feed'][0])
+        self.failUnlessEquals([cnv1_feed], self.cnv2.cfg['source'])
+        self.failUnlessEquals({'default': [cnv1_feed]}, self.cnv2.cfg['eater'])
+
+    def test_dont_auto_link_linked(self):
+        p2 = pond.pipeline_src()
+        self.components.append(p2)
+
+        p2.feed(self.cnv1)
+        self.prod.auto_link = False
+
+        # [  p2:default] --> [default:cnv1], set explicitly
+        # no p2 --> prod, explicitly prohibited
+        # [prod:default] --> [default:cnv2]
+        self.p.set_flow([p2, self.prod, self.cnv2, self.cnv1])
+
+        p2_feed = '%s:%s' % (p2.name, p2.cfg['feed'][0])
+        self.failUnlessEquals([p2_feed], self.cnv1.cfg['source'])
+        self.failUnlessEquals({'default': [p2_feed]}, self.cnv1.cfg['eater'])
+
+        self.failUnlessEquals([], self.prod.cfg['source'])
+        self.failUnlessEquals({}, self.prod.cfg['eater'])
+
+        prod_feed = '%s:%s' % (self.prod.name, self.prod.cfg['feed'][0])
+        self.failUnlessEquals([prod_feed], self.cnv2.cfg['source'])
+        self.failUnlessEquals({'default': [prod_feed]}, self.cnv2.cfg['eater'])
+
+    def test_master_clock(self):
+        p2 = pond.pipeline_src()
+        self.components.append(p2)
+
+        p2.feed(self.cnv1)
+        self.prod.feed(self.cnv1)
+        self.cnv1.feed(self.cnv2)
+
+        self.p.set_flow([self.prod, p2, self.cnv1, self.cnv2], auto_link=False)
+
+        # both prod and p2 require a clock, only one should provide it
+        self.failUnlessEquals(self.prod.cfg['clock-master'],
+                              p2.cfg['clock-master'])
+        self.failUnlessEquals(self.cnv1.cfg['clock-master'], None)
+        self.failUnlessEquals(self.cnv2.cfg['clock-master'], None)
+
+        # find out which of the two has been chosen as the master
+        master, slave = self.prod, p2
+        if master.cfg['clock-master'] != master.cfg['avatarId']:
+            master, slave = slave, master
+
+        # the master-clock component should provide a clock, and not
+        # require an external clock source, as opposed to the slave
+        self.failUnlessEquals(master.sync, None)
+        self.failIfEquals(slave.sync, None)
+
+class TestPondFlow(PondTestCase):
+    """
+    Tests for setting up, starting, running and stopping flows in a Pond.
+
+    setUp() gives every test a fresh Pond (self.p), a live videotestsrc
+    producer (self.prod) and two default converters (self.cnv1,
+    self.cnv2); tearDown() stops the flow and cleans up the reactor.
+    """
+
+    def setUp(self):
+        # duration handed to run_flow() by the run tests; compared
+        # against time.time() differences, hence in seconds
+        self.duration = 2.0
+
+        # small and slow on purpose: 32x24 RGB frames at 8 fps
+        prod_pp = ('videotestsrc is-live=true ! '
+                   'video/x-raw-rgb,framerate=(fraction)8/1,'
+                   'width=32,height=24')
+        self.prod = pond.pipeline_src(prod_pp)
+
+        self.cnv1 = pond.pipeline_cnv()
+        self.cnv2 = pond.pipeline_cnv()
+
+        self.p = pond.Pond()
+
+    def tearDown(self):
+        d = self.p.stop_flow()
+
+        # add cleanup, otherwise components a.t.m. don't cleanup after
+        # themselves too well, remove when fixed
+        d.addBoth(lambda _: pond.cleanup_reactor())
+        return d
+
+    def _check_happy(self, result):
+        """
+        Deferred callback asserting that the producer and both default
+        converters are in the happy mood.
+
+        @param result: the callback result, passed through unchanged
+        @returns:      result
+        """
+        for c in (self.prod, self.cnv1, self.cnv2):
+            self.assertEquals(moods.get(c.comp.getMood()), moods.happy)
+        return result
+
+    def _fail_gst_linking(self, launch, *args):
+        """
+        Launch a flow whose pipelines cannot link in gst and expect the
+        launch to fail with ComponentSetupHandledError.
+
+        The default gst debug threshold is set to LEVEL_NONE while the
+        flow is launched and put back once the launch deferred fires.
+
+        @param launch: callable launching the flow set on self.p,
+                       e.g. self.p.start_flow or self.p.run_flow
+        @param args:   positional arguments passed on to launch
+        @returns:      a deferred as returned by failUnlessFailure()
+        """
+        p2 = pond.pipeline_src('fakesink') # this just can't work!
+        c2 = pond.pipeline_cnv('fakesink') # and neither can this!
+
+        # we're going to fail in gst - make sure the gst logger is silent
+        import gst
+        old_debug_level = gst.debug_get_default_threshold()
+        gst.debug_set_default_threshold(gst.LEVEL_NONE)
+
+        def _restore_gst_debug_level(rf):
+            gst.debug_set_default_threshold(old_debug_level)
+            return rf
+
+        def _launch():
+            self.p.set_flow([p2, c2, self.cnv1])
+            return launch(*args)
+
+        # maybeDeferred turns a synchronous exception into a failure, so
+        # the debug level is restored no matter how the launch fails
+        d = defer.maybeDeferred(_launch)
+        d.addBoth(_restore_gst_debug_level)
+        return self.failUnlessFailure(d, errors.ComponentSetupHandledError)
+
+    def test_setup_fail_gst_linking(self):
+        # start_flow() on unlinkable pipelines must fail with
+        # ComponentSetupHandledError
+        return self._fail_gst_linking(self.p.start_flow)
+
+    def test_setup_started_and_happy(self):
+        # after start_flow() fires, all three components must be happy
+        self.p.set_flow([self.prod, self.cnv1, self.cnv2])
+        d = self.p.start_flow()
+        d.addCallback(self._check_happy)
+        return d
+
+    def test_run_fail_gst_linking(self):
+        # run_flow() on unlinkable pipelines must fail with
+        # ComponentSetupHandledError
+        return self._fail_gst_linking(self.p.run_flow, self.duration)
+
+    def test_run_start_timeout(self):
+        # the wrapped component delays the result of start() for longer
+        # than the guard timeout; run_flow() must fail with StartTimeout
+        start_delay_time = 5.0
+        self.p.guard_timeout = 2.0
+        class LingeringCompWrapper(pond.ComponentWrapper):
+            def start(self, *a, **kw):
+                d = pond.ComponentWrapper.start(self, *a, **kw)
+                def delay_start(result):
+                    dd = defer.Deferred()
+                    reactor.callLater(start_delay_time, dd.callback, result)
+                    return dd
+                d.addCallback(delay_start)
+                return d
+        c2 = LingeringCompWrapper('pipeline-converter', Converter,
+                                  props={'pipeline': 'identity'})
+        self.p.set_flow([self.prod, c2])
+        d = self.p.run_flow(self.duration)
+        return self.failUnlessFailure(d, pond.StartTimeout)
+
+    def test_run_tasks_chained_and_fired(self):
+        # every task deferred handed to run_flow() must have been fired
+        # by the time the run_flow() deferred fires
+        num_tasks = 5
+        self.tasks_fired = [False] * num_tasks
+        self.tasks = []
+        def tasks_started(result, index):
+            self.tasks_fired[index] = True
+            return result
+        def tasks_check(result):
+            self.failIfIn(False, self.tasks_fired)
+            return result
+        for i in range(num_tasks):
+            d = defer.Deferred()
+            d.addCallback(tasks_started, i)
+            self.tasks.append(d)
+
+        self.p.set_flow([self.prod, self.cnv1, self.cnv2])
+        d = self.p.run_flow(self.duration, tasks=self.tasks)
+        d.addCallback(tasks_check)
+
+        return d
+
+    def test_run_started_happy_and_running_and_stopping(self):
+        # the components must be happy when the task fires, and the time
+        # between the task firing and run_flow() finishing must match
+        # self.duration within the tolerance
+        self.time_tolerance = 0.5 # should suffice, no?
+        self.timer = 0.0
+
+        self.p.set_flow([self.prod, self.cnv1, self.cnv2])
+
+        def timer_start(result):
+            self.timer = time.time()
+            return result
+        def timer_check(result):
+            time_difference = abs(time.time() - self.timer - self.duration)
+            self.failUnless(time_difference < self.time_tolerance,
+                            "Time difference too big: %r" % time_difference)
+            return result
+
+        task_d = defer.Deferred()
+        task_d.addCallback(self._check_happy)
+        task_d.addCallback(timer_start)
+
+        d = self.p.run_flow(self.duration, tasks=[task_d])
+        d.addCallback(timer_check)
+
+        return d
+
+    def test_run_tasks_timeout(self):
+        # a task that never finishes must make run_flow() fail with
+        # FlowTimeout
+        self.p.set_flow([self.prod, self.cnv1, self.cnv2])
+        self.p.guard_timeout = 4.0
+
+        def make_eternal_deferred(_):
+            # never going to fire this one...
+            eternal_d = defer.Deferred()
+            return eternal_d
+        task_d = defer.Deferred()
+        task_d.addCallback(make_eternal_deferred)
+
+        d = self.p.run_flow(self.duration, tasks=[task_d])
+
+        return self.failUnlessFailure(d, pond.FlowTimeout)
+
+    def test_run_stop_timeout(self):
+        # the wrapped component delays the result of its first stop() for
+        # longer than the guard timeout; run_flow() must fail with
+        # StopTimeout
+        stop_delay_time = 6.0
+        self.p.guard_timeout = 4.0
+        class DelayingCompWrapper(pond.ComponentWrapper):
+            # delay only once, so that later stop() calls are not held up
+            do_delay = True
+            def stop(self, *a, **kw):
+                d = pond.ComponentWrapper.stop(self, *a, **kw)
+                def delay_stop(result):
+                    if self.do_delay:
+                        self.do_delay = False
+                        dd = defer.Deferred()
+                        reactor.callLater(stop_delay_time, dd.callback, result)
+                        return dd
+                    return result
+                d.addCallback(delay_stop)
+                return d
+        c2 = DelayingCompWrapper('pipeline-converter', Converter,
+                                 props={'pipeline': 'identity'})
+        self.p.set_flow([self.prod, c2])
+        d = self.p.run_flow(self.duration)
+        return self.failUnlessFailure(d, pond.StopTimeout)
+
+    def test_run_started_then_fails(self):
+        # an exception raised from within a task must be what run_flow()
+        # fails with
+        self.p.set_flow([self.prod, self.cnv1, self.cnv2])
+        wrench_timeout = 0.5
+
+        class CustomWrenchException(Exception):
+            pass
+        def insert_wrenches_into_cogs(_):
+            def insert_wrench(c):
+                raise CustomWrenchException("Wasn't that loose?")
+            d = defer.Deferred()
+            d.addCallback(insert_wrench)
+            reactor.callLater(wrench_timeout, d.callback, self.cnv1)
+            return d
+        task_d = defer.Deferred()
+        task_d.addCallback(insert_wrenches_into_cogs)
+
+        d = self.p.run_flow(self.duration, tasks=[task_d])
+        return self.failUnlessFailure(d, CustomWrenchException)
+
+    def test_run_started_then_flow_and_stop_fail(self):
+        # both a task and a component's stop() raise; run_flow() must
+        # fail with the task's exception
+        flow_error_timeout = 0.5
+
+        class CustomFlowException(Exception):
+            pass
+        class CustomStopException(Exception):
+            pass
+
+        class BrokenCompWrapper(pond.ComponentWrapper):
+            do_break = True
+            def stop(self, *a, **kw):
+                d = pond.ComponentWrapper.stop(self, *a, **kw)
+                def break_stop(result):
+                    # breaking once should be enough
+                    if self.do_break:
+                        self.do_break = False
+                        raise CustomStopException()
+                    # pass the result on, like DelayingCompWrapper does
+                    return result
+                d.addCallback(break_stop)
+                return d
+        c2 = BrokenCompWrapper('pipeline-converter', Converter,
+                               props={'pipeline': 'identity'})
+        self.p.set_flow([self.prod, c2])
+
+        def insert_flow_errors(_):
+            def insert_error(_ignore):
+                raise CustomFlowException("This pond is too small!")
+            d = defer.Deferred()
+            d.addCallback(insert_error)
+            reactor.callLater(flow_error_timeout, d.callback, None)
+            return d
+        task_d = defer.Deferred()
+        task_d.addCallback(insert_flow_errors)
+        d = self.p.run_flow(self.duration, tasks=[task_d])
+        return self.failUnlessFailure(d, CustomFlowException)


More information about the flumotion-commit mailing list