arek - in flumotion/trunk: . flumotion/test
flumotion-commit at lists.fluendo.com
flumotion-commit at lists.fluendo.com
Mon Jul 23 14:23:53 CEST 2007
Author: arek
Date: Mon Jul 23 14:23:51 2007
New Revision: 5338
Modified:
flumotion/trunk/ChangeLog
flumotion/trunk/flumotion/test/pond.py
flumotion/trunk/flumotion/test/test_pond.py
Log:
* flumotion/test/pond.py:
Fix component clock synchronization and startup delay code; turn
some callback functions into less implicit helper functions.
* flumotion/test/test_pond.py:
Add tests for providing clocking info and for startup delays,
remove redundant/useless tests, fix spelling, replace unclear
callbacks.
Modified: flumotion/trunk/ChangeLog
==============================================================================
--- flumotion/trunk/ChangeLog (original)
+++ flumotion/trunk/ChangeLog Mon Jul 23 14:23:51 2007
@@ -1,3 +1,14 @@
+2007-07-23 Arek Korbik <arkadini at gmail.com>
+
+ * flumotion/test/pond.py:
+ Fix component clock synchronization and startup delay code; turn
+ some callback functions into less implicit helper functions.
+
+ * flumotion/test/test_pond.py:
+ Add tests for providing clocking info and for startup delays,
+ remove redundant/useless tests, fix spelling, replace unclear
+ callbacks.
+
2007-07-20 Pedro Gracia <pedro at flumotion.com>
* flumotion/admin/gtk/client.py:
Modified: flumotion/trunk/flumotion/test/pond.py
==============================================================================
--- flumotion/trunk/flumotion/test/pond.py (original)
+++ flumotion/trunk/flumotion/test/pond.py Mon Jul 23 14:23:51 2007
@@ -64,6 +64,26 @@
pass
+def delayed_d(time, val):
+ """Insert some delay into callback chain."""
+
+ d = defer.Deferred()
+ reactor.callLater(time, d.callback, val)
+ return d
+
+def override_value_callback(_result, value):
+ """
+ Ignore the result returned from the deferred callback chain and
+ return the given value.
+ """
+ return value
+
+def call_and_passthru_callback(result, callable_, *args, **kwargs):
+ """Invoke the callable_ and passthrough the original result."""
+ callable_(*args, **kwargs)
+ return result
+
+
class ComponentWrapper(object, log.Loggable):
logCategory = 'pond-compwrap'
_u_name_cnt = 0
@@ -203,28 +223,25 @@
self._comps = []
self._byname = {}
self._master = None
- self._syncing = None
def set_flow(self, comp_chain, auto_link=True):
- self._comps = []
-
if len(comp_chain) == 0:
return
+ self._comps = comp_chain
+
if auto_link:
for c_src, c_sink in zip(comp_chain[:-1], comp_chain[1:]):
if c_sink.auto_link:
c_src.feed(c_sink)
- self._comps.extend(comp_chain)
-
masters = [c for c in self._comps if c.sync_master is not None]
need_sync = sorted([c for c in self._comps if c.sync is not None],
key=(lambda e: e.sync), reverse=True)
if need_sync:
if masters:
- master = master[0]
+ master = masters[0]
else:
master = need_sync[0]
@@ -267,36 +284,47 @@
def all_ready_p(results):
self.debug('** 1: all_ready_p: %r' % results)
pass
+
def setup_failed(failure):
self.info('*! 1: setup_failed: %r' % (failure,))
failure.trap(defer.FirstError)
return failure.value.subFailure
+
def start_master_clock(_):
self.debug('** 2: start_master_clock: %r (%r)' % (_, self._master))
if self._master is not None:
self.debug('About to ask to provide_master_clock()...')
d = self._master.comp.provide_master_clock(7600 - 1) # ...hack?
- def _jd(_):
- self.debug('After provide_master_clock() : %r' % (_,))
- d.addCallback(_jd)
+ def _passthrough_debug(_res):
+ self.debug('After provide_master_clock() : %r' % (_res,))
+ return _res
+ d.addCallback(_passthrough_debug)
return d
return None
+
def add_delay(value):
- self.debug('** 3: add_delay: %r' % value)
+ self.debug('** 3: add_delay: %r' % (value,))
if delay:
- return DeferredDelay(delay, value)
- return defer.succeed(delay)
+ return delayed_d(delay, value)
+ return defer.succeed(value)
+
def do_start(clocking, c):
self.debug('** 4: do_start_cb: %r, %r' % (clocking, c))
for src in c.cfg['source']:
r_fd, feed_starter = self._fds[src]
c.eatFromFD(src, r_fd)
feed_starter()
+ comp_clocking = clocking
if not c.sync:
- clocking = None
- d = c.start(clocking)
- d.addCallback(lambda _: clocking)
+ comp_clocking = None
+ self.debug('*_ 4: do_start_cb: %r, %r' % (comp_clocking, c))
+ d = c.start(comp_clocking)
+
+ # if component starts ok, repeat/pass clocking info to a
+ # subsequent component
+ d.addCallback(override_value_callback, clocking)
return d
+
def do_stop(failure):
self.debug('** X: do_stop: %r' % failure)
rcomps = self._comps[:]
Modified: flumotion/trunk/flumotion/test/test_pond.py
==============================================================================
--- flumotion/trunk/flumotion/test/test_pond.py (original)
+++ flumotion/trunk/flumotion/test/test_pond.py Mon Jul 23 14:23:51 2007
@@ -111,7 +111,6 @@
def test_instantiate_and_setup_errors(self):
pp = pond.ComponentWrapper('pipeline-producer', None, name='pp')
self.failUnlessRaises(TypeError, pp.instantiate) # None()!?
- from flumotion.component.producers.pipeline import pipeline
pp = pond.ComponentWrapper('pipeline-producer', Producer, name='pp')
pp.instantiate()
@@ -119,12 +118,11 @@
# the deferred should fail (no mandatory pipeline property) -
# stop the component in any case (to clean the reactor) and
- # passtrough the result/failure
- d.addBoth(lambda rf: (pp.stop(), rf)[1])
+ # passthrough the result/failure
+ d.addBoth(pond.call_and_passthru_callback, pp.stop)
return self.failUnlessFailure(d, errors.ComponentSetupHandledError)
def test_setup_pipeline_error(self):
- from flumotion.component.producers.pipeline import pipeline
pp = pond.ComponentWrapper('pipeline-producer', Producer,
name='pp', props={'pipeline': 'fakesink'})
@@ -138,9 +136,9 @@
# the deferred should fail (the only pipeline element doesn't
# have source pads) - stop the component in any case (to clean
- # the reactor) and passtrough the result/failure
+ # the reactor) and passthrough the result/failure
+ d.addBoth(pond.call_and_passthru_callback, pp.stop)
- d.addBoth(lambda rf: (pp.stop(), rf)[1])
if old_debug_level != gst.LEVEL_NONE:
def _restore_gst_debug_level(rf):
gst.debug_set_default_threshold(old_debug_level)
@@ -149,7 +147,6 @@
return self.failUnlessFailure(d, errors.ComponentSetupHandledError)
def test_setup_and_stop(self):
- from flumotion.component.producers.pipeline import pipeline
pp = pond.ComponentWrapper('pipeline-producer', Producer,
name='pp', props={'pipeline': 'fakesrc'})
@@ -323,6 +320,48 @@
d = self.p.run_flow(self.duration)
return self.failUnlessFailure(d, pond.StartTimeout)
+ def test_run_with_delays(self):
+ self.p.start_delay = 0.5
+
+ self.p.set_flow([self.prod, self.cnv1, self.cnv2])
+ d = self.p.run_flow(self.duration)
+ return d
+
+ def test_run_provides_clocking(self):
+ p2_pp = ('videotestsrc is-live=true ! '
+ 'video/x-raw-rgb,framerate=(fraction)8/1,'
+ 'width=32,height=24')
+ p2 = pond.pipeline_src(p2_pp)
+
+ from flumotion.component.muxers.multipart import Multipart
+ mux = pond.ComponentWrapper('multipart-muxer', Multipart, name='mux')
+
+ self.prod.feed(mux)
+ p2.feed(mux)
+ mux.feed(self.cnv1)
+
+ self.clock_slave = p2
+ def check_clocking(_):
+ self.warning('check_clocking: %s %r' %
+ (self.clock_slave.name,
+ self.clock_slave.comp.pipeline.get_clock()))
+ import gst
+ pp = self.clock_slave.comp.pipeline
+ # is there a better way to check if that component is
+ # using an external clock source?
+ self.failUnless(isinstance(pp.get_clock(), gst.NetClientClock),
+ "Didn't receive external clocking info.")
+ return _
+ task_d = defer.Deferred()
+ task_d.addCallback(check_clocking)
+
+ self.p.set_flow([self.prod, p2, mux, self.cnv1], auto_link=False)
+ if self.prod is not self.p._master:
+ # p2 (of [self.prod, p2]) seems to be the master this time
+ self.clock_slave = self.prod
+ d = self.p.run_flow(self.duration, tasks=[task_d])
+ return d
+
def test_run_tasks_chained_and_fired(self):
self.tasks_fired = []
self.tasks = []
@@ -346,34 +385,6 @@
return d
- def test_run_started_happy_and_running_and_stopping(self):
- self.time_tolerance = 0.5 # should suffice, no?
- self.timer = 0.0
-
- self.p.set_flow([self.prod, self.cnv1, self.cnv2])
-
- def check_happy(_):
- for c in (self.prod, self.cnv1, self.cnv2):
- self.assertEquals(moods.get(c.comp.getMood()), moods.happy)
- return _
- def timer_start(result):
- self.timer = time.time()
- return result
- def timer_check(result):
- time_difference = abs(time.time() - self.timer - self.duration)
- self.failUnless(time_difference < self.time_tolerance,
- "Time difference too big: %r" % time_difference)
- return result
-
- task_d = defer.Deferred()
- task_d.addCallback(check_happy)
- task_d.addCallback(timer_start)
-
- d = self.p.run_flow(self.duration, tasks=[task_d])
- d.addCallback(timer_check)
-
- return d
-
def test_run_tasks_timeout(self):
self.p.set_flow([self.prod, self.cnv1, self.cnv2])
self.p.guard_timeout = 4.0
More information about the flumotion-commit
mailing list