arek - in flumotion/trunk: . flumotion/test

flumotion-commit at lists.fluendo.com flumotion-commit at lists.fluendo.com
Mon Jul 23 14:23:53 CEST 2007


Author: arek
Date: Mon Jul 23 14:23:51 2007
New Revision: 5338

Modified:
   flumotion/trunk/ChangeLog
   flumotion/trunk/flumotion/test/pond.py
   flumotion/trunk/flumotion/test/test_pond.py
Log:
	* flumotion/test/pond.py:
	Fix component clock synchronization and startup dealy code; turn
	some callback functions into less implicit helper functions.
	* flumotion/test/test_pond.py:
	Add tests for providing clocking info and for startup delays,
	remove redundant/useless tests, fix spelling, replace unclear
	callbacks.



Modified: flumotion/trunk/ChangeLog
==============================================================================
--- flumotion/trunk/ChangeLog	(original)
+++ flumotion/trunk/ChangeLog	Mon Jul 23 14:23:51 2007
@@ -1,3 +1,14 @@
+2007-07-23  Arek Korbik  <arkadini at gmail.com>
+
+	* flumotion/test/pond.py:
+	Fix component clock synchronization and startup dealy code; turn
+	some callback functions into less implicit helper functions.
+
+	* flumotion/test/test_pond.py:
+	Add tests for providing clocking info and for startup delays,
+	remove redundant/useless tests, fix spelling, replace unclear
+	callbacks.
+
 2007-07-20 Pedro Gracia <pedro at flumotion.com>
 
 	* flumotion/admin/gtk/client.py:

Modified: flumotion/trunk/flumotion/test/pond.py
==============================================================================
--- flumotion/trunk/flumotion/test/pond.py	(original)
+++ flumotion/trunk/flumotion/test/pond.py	Mon Jul 23 14:23:51 2007
@@ -64,6 +64,26 @@
     pass
 
 
+def delayed_d(time, val):
+    """Insert some delay into callback chain."""
+
+    d = defer.Deferred()
+    reactor.callLater(time, d.callback, val)
+    return d
+
+def override_value_callback(_result, value):
+    """
+    Ignore the result returned from the deferred callback chain and
+    return the given value.
+    """
+    return value
+
+def call_and_passthru_callback(result, callable_, *args, **kwargs):
+    """Invoke the callable_ and passthrough the original result."""
+    callable_(*args, **kwargs)
+    return result
+
+
 class ComponentWrapper(object, log.Loggable):
     logCategory = 'pond-compwrap'
     _u_name_cnt = 0
@@ -203,28 +223,25 @@
         self._comps = []
         self._byname = {}
         self._master = None
-        self._syncing = None
 
     def set_flow(self, comp_chain, auto_link=True):
-        self._comps = []
-
         if len(comp_chain) == 0:
             return
 
+        self._comps = comp_chain
+
         if auto_link:
             for c_src, c_sink in zip(comp_chain[:-1], comp_chain[1:]):
                 if c_sink.auto_link:
                     c_src.feed(c_sink)
 
-        self._comps.extend(comp_chain)
-
         masters = [c for c in self._comps if c.sync_master is not None]
         need_sync = sorted([c for c in self._comps if c.sync is not None],
                            key=(lambda e: e.sync), reverse=True)
 
         if need_sync:
             if masters:
-                master = master[0]
+                master = masters[0]
             else:
                 master = need_sync[0]
 
@@ -267,36 +284,47 @@
         def all_ready_p(results):
             self.debug('** 1: all_ready_p: %r' % results)
             pass
+
         def setup_failed(failure):
             self.info('*! 1: setup_failed: %r' % (failure,))
             failure.trap(defer.FirstError)
             return failure.value.subFailure
+
         def start_master_clock(_):
             self.debug('** 2: start_master_clock: %r (%r)' % (_, self._master))
             if self._master is not None:
                 self.debug('About to ask to provide_master_clock()...')
                 d = self._master.comp.provide_master_clock(7600 - 1) # ...hack?
-                def _jd(_):
-                    self.debug('After provide_master_clock() : %r' % (_,))
-                d.addCallback(_jd)
+                def _passthrough_debug(_res):
+                    self.debug('After provide_master_clock() : %r' % (_res,))
+                    return _res
+                d.addCallback(_passthrough_debug)
                 return d
             return None
+
         def add_delay(value):
-            self.debug('** 3: add_delay: %r' % value)
+            self.debug('** 3: add_delay: %r' % (value,))
             if delay:
-                return DeferredDelay(delay, value)
-            return defer.succeed(delay)
+                return delayed_d(delay, value)
+            return defer.succeed(value)
+
         def do_start(clocking, c):
             self.debug('** 4: do_start_cb: %r, %r' % (clocking, c))
             for src in c.cfg['source']:
                 r_fd, feed_starter = self._fds[src]
                 c.eatFromFD(src, r_fd)
                 feed_starter()
+            comp_clocking = clocking
             if not c.sync:
-                clocking = None
-            d = c.start(clocking)
-            d.addCallback(lambda _: clocking)
+                comp_clocking = None
+            self.debug('*_ 4: do_start_cb: %r, %r' % (comp_clocking, c))
+            d = c.start(comp_clocking)
+
+            # if component starts ok, repeat/pass clocking info to a
+            # subsequent component
+            d.addCallback(override_value_callback, clocking)
             return d
+
         def do_stop(failure):
             self.debug('** X: do_stop: %r' % failure)
             rcomps = self._comps[:]

Modified: flumotion/trunk/flumotion/test/test_pond.py
==============================================================================
--- flumotion/trunk/flumotion/test/test_pond.py	(original)
+++ flumotion/trunk/flumotion/test/test_pond.py	Mon Jul 23 14:23:51 2007
@@ -111,7 +111,6 @@
     def test_instantiate_and_setup_errors(self):
         pp = pond.ComponentWrapper('pipeline-producer', None, name='pp')
         self.failUnlessRaises(TypeError, pp.instantiate) # None()!?
-        from flumotion.component.producers.pipeline import pipeline
         pp = pond.ComponentWrapper('pipeline-producer', Producer, name='pp')
 
         pp.instantiate()
@@ -119,12 +118,11 @@
 
         # the deferred should fail (no mandatory pipeline property) -
         # stop the component in any case (to clean the reactor) and
-        # passtrough the result/failure
-        d.addBoth(lambda rf: (pp.stop(), rf)[1])
+        # passthrough the result/failure
+        d.addBoth(pond.call_and_passthru_callback, pp.stop)
         return self.failUnlessFailure(d, errors.ComponentSetupHandledError)
 
     def test_setup_pipeline_error(self):
-        from flumotion.component.producers.pipeline import pipeline
         pp = pond.ComponentWrapper('pipeline-producer', Producer,
                                    name='pp', props={'pipeline': 'fakesink'})
 
@@ -138,9 +136,9 @@
 
         # the deferred should fail (the only pipeline element doesn't
         # have source pads) - stop the component in any case (to clean
-        # the reactor) and passtrough the result/failure
+        # the reactor) and passthrough the result/failure
+        d.addBoth(pond.call_and_passthru_callback, pp.stop)
 
-        d.addBoth(lambda rf: (pp.stop(), rf)[1])
         if old_debug_level != gst.LEVEL_NONE:
             def _restore_gst_debug_level(rf):
                 gst.debug_set_default_threshold(old_debug_level)
@@ -149,7 +147,6 @@
         return self.failUnlessFailure(d, errors.ComponentSetupHandledError)
 
     def test_setup_and_stop(self):
-        from flumotion.component.producers.pipeline import pipeline
         pp = pond.ComponentWrapper('pipeline-producer', Producer,
                                    name='pp', props={'pipeline': 'fakesrc'})
 
@@ -323,6 +320,48 @@
         d = self.p.run_flow(self.duration)
         return self.failUnlessFailure(d, pond.StartTimeout)
 
+    def test_run_with_delays(self):
+        self.p.start_delay = 0.5
+
+        self.p.set_flow([self.prod, self.cnv1, self.cnv2])
+        d = self.p.run_flow(self.duration)
+        return d
+
+    def test_run_provides_clocking(self):
+        p2_pp = ('videotestsrc is-live=true ! '
+                 'video/x-raw-rgb,framerate=(fraction)8/1,'
+                 'width=32,height=24')
+        p2 = pond.pipeline_src(p2_pp)
+
+        from flumotion.component.muxers.multipart import Multipart
+        mux = pond.ComponentWrapper('multipart-muxer', Multipart, name='mux')
+
+        self.prod.feed(mux)
+        p2.feed(mux)
+        mux.feed(self.cnv1)
+
+        self.clock_slave = p2
+        def check_clocking(_):
+            self.warning('check_clocking: %s %r' %
+                         (self.clock_slave.name,
+                          self.clock_slave.comp.pipeline.get_clock()))
+            import gst
+            pp = self.clock_slave.comp.pipeline
+            # is there a better way to check if that component is
+            # using an external clock source?
+            self.failUnless(isinstance(pp.get_clock(), gst.NetClientClock),
+                            "Didn't receive external clocking info.")
+            return _
+        task_d = defer.Deferred()
+        task_d.addCallback(check_clocking)
+
+        self.p.set_flow([self.prod, p2, mux, self.cnv1], auto_link=False)
+        if self.prod is not self.p._master:
+            # p2 (of [self.prod, p2]) seems to be the master this time
+            self.clock_slave = self.prod
+        d = self.p.run_flow(self.duration, tasks=[task_d])
+        return d
+
     def test_run_tasks_chained_and_fired(self):
         self.tasks_fired = []
         self.tasks = []
@@ -346,34 +385,6 @@
 
         return d
 
-    def test_run_started_happy_and_running_and_stopping(self):
-        self.time_tolerance = 0.5 # should suffice, no?
-        self.timer = 0.0
-
-        self.p.set_flow([self.prod, self.cnv1, self.cnv2])
-
-        def check_happy(_):
-            for c in (self.prod, self.cnv1, self.cnv2):
-                self.assertEquals(moods.get(c.comp.getMood()), moods.happy)
-            return _
-        def timer_start(result):
-            self.timer = time.time()
-            return result
-        def timer_check(result):
-            time_difference = abs(time.time() - self.timer - self.duration)
-            self.failUnless(time_difference < self.time_tolerance,
-                            "Time difference too big: %r" % time_difference)
-            return result
-
-        task_d = defer.Deferred()
-        task_d.addCallback(check_happy)
-        task_d.addCallback(timer_start)
-
-        d = self.p.run_flow(self.duration, tasks=[task_d])
-        d.addCallback(timer_check)
-
-        return d
-
     def test_run_tasks_timeout(self):
         self.p.set_flow([self.prod, self.cnv1, self.cnv2])
         self.p.guard_timeout = 4.0


More information about the flumotion-commit mailing list