msmith - in flumotion/trunk: . flumotion/component flumotion/test
flumotion-commit at lists.fluendo.com
flumotion-commit at lists.fluendo.com
Tue Jul 24 13:21:48 CEST 2007
Author: msmith
Date: Tue Jul 24 13:21:43 2007
New Revision: 5348
Modified:
flumotion/trunk/ChangeLog
flumotion/trunk/flumotion/component/feedcomponent010.py
flumotion/trunk/flumotion/test/test_feedcomponent010.py
Log:
* flumotion/component/feedcomponent010.py:
Some fixes to PadMonitor code, including making it threadsafe with
nasty trickery.
* flumotion/test/test_feedcomponent010.py:
Unit tests for PadMonitors.
Modified: flumotion/trunk/ChangeLog
==============================================================================
--- flumotion/trunk/ChangeLog (original)
+++ flumotion/trunk/ChangeLog Tue Jul 24 13:21:43 2007
@@ -1,3 +1,11 @@
+2007-07-24 Michael Smith <msmith at fluendo.com>
+
+ * flumotion/component/feedcomponent010.py:
+ Some fixes to PadMonitor code, including making it threadsafe with
+ nasty trickery.
+ * flumotion/test/test_feedcomponent010.py:
+ Unit tests for PadMonitors.
+
2007-07-24 Andy Wingo <wingo at pobox.com>
* flumotion/component/feed.py (FeedMedium.stopConnecting): Drop
Modified: flumotion/trunk/flumotion/component/feedcomponent010.py
==============================================================================
--- flumotion/trunk/flumotion/component/feedcomponent010.py (original)
+++ flumotion/trunk/flumotion/component/feedcomponent010.py Tue Jul 24 13:21:43 2007
@@ -357,26 +357,26 @@
self._component = component
self._pad = pad
self._name = name
- self._active = True
+ self._active = False
- # Note: be very careful touching anything here: most of these functions
- # are called from multiple threads
+ # This dict sillyness is because python's dict operations are atomic
+ # w.r.t. the GIL.
+ self._probe_id = {}
self._add_probe_dc = None
- self._probe_id = None
self._add_flow_probe()
- self._check_flow_dc = reactor.callLater(self.PAD_MONITOR_PROBE_TIMEOUT,
+ self._check_flow_dc = reactor.callLater(self.PAD_MONITOR_TIMEOUT,
self._check_flow_timeout)
def isActive(self):
return self._active
def detach(self):
- # TODO: This doesn't look very threadsafe
- if self._probe_id:
- self._pad.remove_buffer_probe(self._probe_id)
- self._probe_id = None
+ probe_id = self._probe_id.pop("id", None)
+ if probe_id:
+ self._pad.remove_buffer_probe(probe_id)
+
if self._add_probe_dc:
self._add_probe_dc.cancel()
self._add_probe_dc = None
@@ -386,7 +386,8 @@
self._check_flow_dc = None
def _add_flow_probe(self):
- self._probe_id = self._pad.add_buffer_probe(self._probe_cb)
+ self._probe_id['id'] = self._pad.add_buffer_probe(
+ self._flow_watch_probe_cb)
self._add_probe_dc = None
def _add_flow_probe_later(self):
@@ -396,14 +397,15 @@
def _flow_watch_probe_cb(self, pad, buffer):
self._last_data_time = time.time()
- pad.remove_buffer_probe(self._probe_id)
- self._probe_id = None
+ id = self._probe_id.pop("id", None)
+ if id:
+ # This will be None only if detach() has been called.
+ self._pad.remove_buffer_probe(id)
- reactor.callFromThread(self._add_flow_probe_later, pad)
+ reactor.callFromThread(self._add_flow_probe_later)
- # Data received! Return to happy ASAP:
- self._check_flow_dc.cancel()
- reactor.callFromThread(self._check_flow_timeout_now)
+ # Data received! Return to happy ASAP:
+ reactor.callFromThread(self._check_flow_timeout_now)
return True
@@ -412,6 +414,8 @@
self._check_flow_timeout()
def _check_flow_timeout(self):
+ self._check_flow_dc = None
+
now = time.time()
if self._last_data_time > 0:
@@ -421,11 +425,11 @@
self.info("No data received on pad for > %r seconds, setting "
"to hungry", self.PAD_MONITOR_TIMEOUT)
- self._component.setFlowInactive(self._name)
+ self._component.setPadMonitorInactive(self._name)
self._active = False
elif not self._active and delta < self.PAD_MONITOR_TIMEOUT:
self.info("Receiving data again, flow active")
- self._component.setFlowActive(self._name)
+ self._component.setPadMonitorActive(self._name)
self._active = True
self._check_flow_dc = reactor.callLater(self.PAD_MONITOR_TIMEOUT,
Modified: flumotion/trunk/flumotion/test/test_feedcomponent010.py
==============================================================================
--- flumotion/trunk/flumotion/test/test_feedcomponent010.py (original)
+++ flumotion/trunk/flumotion/test/test_feedcomponent010.py Tue Jul 24 13:21:43 2007
@@ -20,6 +20,11 @@
# Headers in this file shall remain intact.
import time
+import gobject
+gobject.threads_init()
+import pygst
+pygst.require('0.10')
+import gst
from twisted.trial import unittest
@@ -88,5 +93,77 @@
self.clientAssertEquals(client, 'buffersDroppedTotal', bdt)
self.clientAssertEquals(client, 'reconnects', reconnects)
+class FakeComponent(object):
+ def setPadMonitorActive(self, name):
+ pass
+
+ def setPadMonitorInactive(self, name):
+ pass
+
+class TestPadMonitor(unittest.TestCase):
+
+ def _run_pipeline(self, pipeline):
+ pipeline.set_state(gst.STATE_PLAYING)
+ pipeline.get_bus().poll(gst.MESSAGE_EOS, -1)
+ pipeline.set_state(gst.STATE_NULL)
+
+ def testPadMonitorActivation(self):
+ component = FakeComponent()
+ pipeline = gst.parse_launch(
+ 'fakesrc num-buffers=1 ! identity name=id ! fakesink')
+ identity = pipeline.get_by_name('id')
+
+ srcpad = identity.get_pad('src')
+ monitor = fc.PadMonitor(component, srcpad,
+ "identity-source")
+ self.assertEquals(monitor.isActive(), False)
+
+ self._run_pipeline(pipeline)
+ # Now give the reactor a chance to process the callFromThread()
+ d = defer.Deferred()
+ def finishTest():
+ self.assertEquals(monitor.isActive(), True)
+ monitor.detach()
+ d.callback(True)
+ reactor.callLater(0.1, finishTest)
+
+ return d
+
+ def testPadMonitorTimeout(self):
+ fc.PadMonitor.PAD_MONITOR_PROBE_FREQUENCY = 0.2
+ fc.PadMonitor.PAD_MONITOR_TIMEOUT = 0.5
+
+ component = FakeComponent()
+ pipeline = gst.parse_launch(
+ 'fakesrc num-buffers=1 ! identity name=id ! fakesink')
+ identity = pipeline.get_by_name('id')
+
+ srcpad = identity.get_pad('src')
+ monitor = fc.PadMonitor(component, srcpad,
+ "identity-source")
+ self.assertEquals(monitor.isActive(), False)
+
+ self._run_pipeline(pipeline)
+ # Now give the reactor a chance to process the callFromThread()
+ d = defer.Deferred()
+ def finished():
+ monitor.detach()
+ d.callback(True)
+
+ def hasInactivated(name):
+ # We can't detach the monitor from this callback safely, so do
+ # it from a reactor.callLater()
+ reactor.callLater(0, finished)
+
+ def hasActivated():
+ self.assertEquals(monitor.isActive(), True)
+ # Now, we don't send any more data, and after our 0.5 second timeout
+ # we should go inactive. Pass our test if that happens. Otherwise
+ # trial will time out.
+ component.setPadMonitorInactive = hasInactivated
+ reactor.callLater(0.1, hasActivated)
+
+ return d
+
if __name__ == '__main__':
unittest.main()
More information about the flumotion-commit
mailing list