msmith - in flumotion/trunk: . flumotion/component flumotion/test

flumotion-commit at lists.fluendo.com
Tue Jul 24 13:21:48 CEST 2007


Author: msmith
Date: Tue Jul 24 13:21:43 2007
New Revision: 5348

Modified:
   flumotion/trunk/ChangeLog
   flumotion/trunk/flumotion/component/feedcomponent010.py
   flumotion/trunk/flumotion/test/test_feedcomponent010.py
Log:
        * flumotion/component/feedcomponent010.py:
          Some fixes to PadMonitor code, including making it threadsafe with
          nasty trickery.
        * flumotion/test/test_feedcomponent010.py:
          Unit tests for PadMonitors.



Modified: flumotion/trunk/ChangeLog
==============================================================================
--- flumotion/trunk/ChangeLog	(original)
+++ flumotion/trunk/ChangeLog	Tue Jul 24 13:21:43 2007
@@ -1,3 +1,11 @@
+2007-07-24  Michael Smith <msmith at fluendo.com>
+
+	* flumotion/component/feedcomponent010.py:
+	  Some fixes to PadMonitor code. Including making it threadsafe with
+	  nasty trickery.
+	* flumotion/test/test_feedcomponent010.py:
+	  Unit tests for PadMonitors.
+
 2007-07-24  Andy Wingo  <wingo at pobox.com>
 
 	* flumotion/component/feed.py (FeedMedium.stopConnecting): Drop

Modified: flumotion/trunk/flumotion/component/feedcomponent010.py
==============================================================================
--- flumotion/trunk/flumotion/component/feedcomponent010.py	(original)
+++ flumotion/trunk/flumotion/component/feedcomponent010.py	Tue Jul 24 13:21:43 2007
@@ -357,26 +357,26 @@
         self._component = component
         self._pad = pad
         self._name = name
-        self._active = True
+        self._active = False
 
-        # Note: be very careful touching anything here: most of these functions
-        # are called from multiple threads
+        # This dict sillyness is because python's dict operations are atomic
+        # w.r.t. the GIL.
+        self._probe_id = {}
         self._add_probe_dc = None
-        self._probe_id = None
 
         self._add_flow_probe()
 
-        self._check_flow_dc = reactor.callLater(self.PAD_MONITOR_PROBE_TIMEOUT,
+        self._check_flow_dc = reactor.callLater(self.PAD_MONITOR_TIMEOUT,
             self._check_flow_timeout)
 
     def isActive(self):
         return self._active
 
     def detach(self):
-        # TODO: This doesn't look very threadsafe
-        if self._probe_id:
-            self._pad.remove_buffer_probe(self._probe_id)
-            self._probe_id = None
+        probe_id = self._probe_id.pop("id", None)
+        if probe_id:
+            self._pad.remove_buffer_probe(probe_id)
+
         if self._add_probe_dc:
             self._add_probe_dc.cancel()
             self._add_probe_dc = None
@@ -386,7 +386,8 @@
             self._check_flow_dc = None
         
     def _add_flow_probe(self):
-        self._probe_id = self._pad.add_buffer_probe(self._probe_cb)
+        self._probe_id['id'] = self._pad.add_buffer_probe(
+            self._flow_watch_probe_cb)
         self._add_probe_dc = None
 
     def _add_flow_probe_later(self):
@@ -396,14 +397,15 @@
     def _flow_watch_probe_cb(self, pad, buffer):
         self._last_data_time = time.time()
 
-        pad.remove_buffer_probe(self._probe_id)
-        self._probe_id = None
+        id = self._probe_id.pop("id", None)
+        if id:
+            # This will be None only if detach() has been called.
+            self._pad.remove_buffer_probe(id)
 
-        reactor.callFromThread(self._add_flow_probe_later, pad)
+            reactor.callFromThread(self._add_flow_probe_later)
 
-        # Data received! Return to happy ASAP:
-        self._check_flow_dc.cancel()
-        reactor.callFromThread(self._check_flow_timeout_now)
+            # Data received! Return to happy ASAP:
+            reactor.callFromThread(self._check_flow_timeout_now)
 
         return True
 
@@ -412,6 +414,8 @@
         self._check_flow_timeout()
         
     def _check_flow_timeout(self):
+        self._check_flow_dc = None
+
         now = time.time()
 
         if self._last_data_time > 0:
@@ -421,11 +425,11 @@
                 self.info("No data received on pad for > %r seconds, setting "
                     "to hungry", self.PAD_MONITOR_TIMEOUT)
 
-                self._component.setFlowInactive(self._name)
+                self._component.setPadMonitorInactive(self._name)
                 self._active = False
             elif not self._active and delta < self.PAD_MONITOR_TIMEOUT:
                 self.info("Receiving data again, flow active")
-                self._component.setFlowActive(self._name)
+                self._component.setPadMonitorActive(self._name)
                 self._active = True
 
         self._check_flow_dc = reactor.callLater(self.PAD_MONITOR_TIMEOUT,

Modified: flumotion/trunk/flumotion/test/test_feedcomponent010.py
==============================================================================
--- flumotion/trunk/flumotion/test/test_feedcomponent010.py	(original)
+++ flumotion/trunk/flumotion/test/test_feedcomponent010.py	Tue Jul 24 13:21:43 2007
@@ -20,6 +20,11 @@
 # Headers in this file shall remain intact.
 
 import time
+import gobject
+gobject.threads_init()
+import pygst
+pygst.require('0.10')
+import gst
 
 from twisted.trial import unittest
 
@@ -88,5 +93,77 @@
         self.clientAssertEquals(client, 'buffersDroppedTotal', bdt)
         self.clientAssertEquals(client, 'reconnects', reconnects)
 
+class FakeComponent(object):
+    def setPadMonitorActive(self, name):
+        pass
+
+    def setPadMonitorInactive(self, name):
+        pass
+
+class TestPadMonitor(unittest.TestCase):
+
+    def _run_pipeline(self, pipeline):
+        pipeline.set_state(gst.STATE_PLAYING)
+        pipeline.get_bus().poll(gst.MESSAGE_EOS, -1)
+        pipeline.set_state(gst.STATE_NULL)
+        
+    def testPadMonitorActivation(self):
+        component = FakeComponent()
+        pipeline = gst.parse_launch(
+            'fakesrc num-buffers=1 ! identity name=id ! fakesink')
+        identity = pipeline.get_by_name('id')
+
+        srcpad = identity.get_pad('src')
+        monitor = fc.PadMonitor(component, srcpad, 
+            "identity-source")
+        self.assertEquals(monitor.isActive(), False)
+
+        self._run_pipeline(pipeline)
+        # Now give the reactor a chance to process the callFromThread()
+        d = defer.Deferred()
+        def finishTest():
+            self.assertEquals(monitor.isActive(), True)
+            monitor.detach()
+            d.callback(True)
+        reactor.callLater(0.1, finishTest)
+
+        return d
+
+    def testPadMonitorTimeout(self):
+        fc.PadMonitor.PAD_MONITOR_PROBE_FREQUENCY = 0.2
+        fc.PadMonitor.PAD_MONITOR_TIMEOUT = 0.5
+
+        component = FakeComponent()
+        pipeline = gst.parse_launch(
+            'fakesrc num-buffers=1 ! identity name=id ! fakesink')
+        identity = pipeline.get_by_name('id')
+
+        srcpad = identity.get_pad('src')
+        monitor = fc.PadMonitor(component, srcpad, 
+            "identity-source")
+        self.assertEquals(monitor.isActive(), False)
+
+        self._run_pipeline(pipeline)
+        # Now give the reactor a chance to process the callFromThread()
+        d = defer.Deferred()
+        def finished():
+            monitor.detach()
+            d.callback(True)
+
+        def hasInactivated(name):
+            # We can't detach the monitor from this callback safely, so do
+            # it from a reactor.callLater()
+            reactor.callLater(0, finished)
+            
+        def hasActivated():
+            self.assertEquals(monitor.isActive(), True)
+            # Now, we don't send any more data, and after our 0.5 second timeout
+            # we should go inactive. Pass our test if that happens. Otherwise
+            # trial will time out.
+            component.setPadMonitorInactive = hasInactivated
+        reactor.callLater(0.1, hasActivated)
+
+        return d
+
 if __name__ == '__main__':
     unittest.main()


More information about the flumotion-commit mailing list