msmith - in flumotion/trunk: . flumotion/component

flumotion-commit at lists.fluendo.com flumotion-commit at lists.fluendo.com
Tue Jul 24 12:19:23 CEST 2007


Author: msmith
Date: Tue Jul 24 12:19:17 2007
New Revision: 5345

Modified:
   flumotion/trunk/ChangeLog
   flumotion/trunk/flumotion/component/feedcomponent010.py
Log:
        * flumotion/component/feedcomponent010.py:
          Generalise the eater probes/data flow checks as a PadMonitor class,
          so that other components can use it on other parts of a pipeline.
          Partially (not yet fully) move the eater checks over to using it.



Modified: flumotion/trunk/ChangeLog
==============================================================================
--- flumotion/trunk/ChangeLog	(original)
+++ flumotion/trunk/ChangeLog	Tue Jul 24 12:19:17 2007
@@ -1,3 +1,10 @@
+2007-07-24  Michael Smith <msmith at fluendo.com>
+
+	* flumotion/component/feedcomponent010.py:
+	  Generalise the eater probes/data flow checks as a PadMonitor class,
+	  so that other components can use it on other parts of a pipeline.
+	  Partially (not yet fully) move the eater checks over to using it.
+
 2007-07-23  Andy Wingo  <wingo at pobox.com>
 
 	* flumotion/common/debug.py (AllocMonitor): New object, uses

Modified: flumotion/trunk/flumotion/component/feedcomponent010.py
==============================================================================
--- flumotion/trunk/flumotion/component/feedcomponent010.py	(original)
+++ flumotion/trunk/flumotion/component/feedcomponent010.py	Tue Jul 24 12:19:17 2007
@@ -28,7 +28,7 @@
 from twisted.internet import reactor, defer
 
 from flumotion.component import component as basecomponent
-from flumotion.common import common, errors, pygobject, messages
+from flumotion.common import common, errors, pygobject, messages, log
 from flumotion.common import gstreamer, componentui
 from flumotion.component import feed
 
@@ -348,6 +348,85 @@
         uiState.set('totalOffsetDiscont',
             uiState.get('totalOffsetDiscont', 0) + units)
 
+class PadMonitor(log.Loggable):
+    PAD_MONITOR_PROBE_FREQUENCY = 5.0
+    PAD_MONITOR_TIMEOUT = PAD_MONITOR_PROBE_FREQUENCY * 2.5
+
+    def __init__(self, component, pad, name):
+        self._last_data_time = 0
+        self._component = component
+        self._pad = pad
+        self._name = name
+        self._active = True
+
+        # Note: be very careful touching anything here: most of these functions
+        # are called from multiple threads
+        self._add_probe_dc = None
+        self._probe_id = None
+
+        self._add_flow_probe()
+
+        self._check_flow_dc = reactor.callLater(self.PAD_MONITOR_PROBE_TIMEOUT,
+            self._check_flow_timeout)
+
+    def isActive(self):
+        return self._active
+
+    def detach(self):
+        # TODO: This doesn't look very threadsafe
+        if self._probe_id:
+            self._pad.remove_buffer_probe(self._probe_id)
+            self._probe_id = None
+        if self._add_probe_dc:
+            self._add_probe_dc.cancel()
+            self._add_probe_dc = None
+
+        if self._check_flow_dc:
+            self._check_flow_dc.cancel()
+            self._check_flow_dc = None
+        
+    def _add_flow_probe(self):
+        self._probe_id = self._pad.add_buffer_probe(self._probe_cb)
+        self._add_probe_dc = None
+
+    def _add_flow_probe_later(self):
+        self._add_probe_dc = reactor.callLater(self.PAD_MONITOR_PROBE_FREQUENCY,
+            self._add_flow_probe)
+
+    def _flow_watch_probe_cb(self, pad, buffer):
+        self._last_data_time = time.time()
+
+        pad.remove_buffer_probe(self._probe_id)
+        self._probe_id = None
+
+        reactor.callFromThread(self._add_flow_probe_later, pad)
+
+        # Data received! Return to happy ASAP:
+        self._check_flow_dc.cancel()
+        self._check_flow_timeout()
+
+        return True
+
+    def _check_flow_timeout(self):
+        now = time.time()
+
+        if self._last_data_time > 0:
+            delta = now - self._last_data_time
+
+            if self._active and delta > self.PAD_MONITOR_TIMEOUT:
+                self.info("No data received on pad for > %r seconds, setting "
+                    "to hungry", self.PAD_MONITOR_TIMEOUT)
+
+                self._component.setFlowInactive(self._name)
+                self._active = False
+            elif not self._active and delta < self.PAD_MONITOR_TIMEOUT:
+                self.info("Receiving data again, flow active")
+                self._component.setFlowActive(self._name)
+                self._active = True
+
+        self._check_flow_dc = reactor.callLater(self.PAD_MONITOR_TIMEOUT,
+            self._check_flow_timeout)
+
 class FeedComponent(basecomponent.BaseComponent):
     """
     I am a base class for all Flumotion feed components.
@@ -397,6 +476,9 @@
         self._probe_ids = {} # eater name -> probe handler id
         self._feeder_probe_cl = None
 
+        self._pad_monitors = {}
+        self._pad_monitors_inactive = 0
+
         self.clock_provider = None
 
         self.eater_names = [] # componentName:feedName list
@@ -511,35 +593,70 @@
             self.cleanup()
         self.pipeline = pipeline
         self.setup_pipeline()
- 
+
+    def attachPadMonitor(self, pad, name):
+        """
+        Watch for data flow through this pad periodically.
+        If data flow ceases for too long, we turn hungry. If data flow resumes,
+        we return to happy.
+        """
+        self._pad_monitors[name] = PadMonitor(self, pad, name)
+        self._pad_monitors_inactive += 1
+        self.info("Added pad monitor %s", name)
+
+    def removePadMonitor(self, name):
+        if name not in self._pad_monitors:
+            self.warning("No pad monitor with name %s", name)
+            return
+
+        self._pad_monitors[name].detach()
+
+        if not self._pad_monitors[name].isActive():
+            self._pad_monitors_inactive -= 1
+        del self._pad_monitors[name]
+
+    def setPadMonitorActive(self, name):
+        """
+        Inform the component that data flow through the given pad monitor is
+        currently working. 
+        """
+        self.info('Pad data flow at %s is active', name)
+        self._pad_monitors_inactive -= 1
+
+        if self._pad_monitors_inactive == 0 and self._inactivated:
+            # We never go happy initially because of this; only if we went
+            # hungry because of an eater being inactive.
+            self.setMood(moods.happy)
+            self._inactivated = False
+
+    def setPadMonitorInactive(self, name):
+        """
+        Inform the component that data flow through the given pad monitor has
+        ceased.
+        """
+        self.info('Pad data flow at %s is inactive', name)
+        self._pad_monitors_inactive += 1
+
+        self.setMood(moods.hungry)
+        self._inactivated = True
+
     def eaterSetInactive(self, feedId):
         """
         The eater for the given feedId is no longer active
         By default, the component will go hungry.
         """
-        self.info('Eater of %s is inactive' % feedId)
-        if feedId in self._inactiveEaters:
-            self.warning('Eater of %s was already inactive' % feedId)
-        else:
-            self._inactiveEaters.append(feedId)
-        self.setMood(moods.hungry)
-        self._inactivated = True
+        self.setPadMonitorInactive(feedId)
+        self._inactiveEaters.append(feedId)
 
     def eaterSetActive(self, feedId):
         """
         The eater for the given feedId is now active and producing data.
         By default, the component will go happy if all eaters are active.
         """
-        self.info('Eater of %s is active' % feedId)
-        if feedId not in self._inactiveEaters:
-            self.warning('Eater of %s was already active' % feedId)
-        else:
+        self.setPadMonitorActive(feedId)
+        if feedId in self._inactiveEaters:
             self._inactiveEaters.remove(feedId)
-        if not self._inactiveEaters and self._inactivated:
-            # We never go happy initially because of this; only if we went
-            # hungry because of an eater being inactive.
-            self.setMood(moods.happy)
-            self._inactivated = False
+
     # FIXME: it may make sense to have an updateMood method, that can be used
     # by the two previous methods, but also in other places, and then
     # overridden.  That would make us have to publicize inactiveEaters
@@ -813,6 +930,7 @@
         # start checking eaters
         for feedId in self.eater_names:
             status = self._eaterStatus[feedId]
+            self._pad_monitors_inactive += 1
             status['checkEaterDC'] = reactor.callLater(
                 self.BUFFER_CHECK_FREQUENCY, self._checkEater, feedId)
 


More information about the flumotion-commit mailing list