msmith - in flumotion/trunk: . flumotion/component
flumotion-commit at lists.fluendo.com
flumotion-commit at lists.fluendo.com
Tue Jul 24 12:19:23 CEST 2007
Author: msmith
Date: Tue Jul 24 12:19:17 2007
New Revision: 5345
Modified:
flumotion/trunk/ChangeLog
flumotion/trunk/flumotion/component/feedcomponent010.py
Log:
* flumotion/component/feedcomponent010.py:
Generalise the eater probes/data flow checks as a PadMonitor class,
so that other components can use it on other parts of a pipeline.
Partially (not yet fully) move the eater checks over to using it.
Modified: flumotion/trunk/ChangeLog
==============================================================================
--- flumotion/trunk/ChangeLog (original)
+++ flumotion/trunk/ChangeLog Tue Jul 24 12:19:17 2007
@@ -1,3 +1,10 @@
+2007-07-24 Michael Smith <msmith at fluendo.com>
+
+ * flumotion/component/feedcomponent010.py:
+ Generalise the eater probes/data flow checks as a PadMonitor class,
+ so that other components can use it on other parts of a pipeline.
+ Partially (not yet fully) move the eater checks over to using it.
+
2007-07-23 Andy Wingo <wingo at pobox.com>
* flumotion/common/debug.py (AllocMonitor): New object, uses
Modified: flumotion/trunk/flumotion/component/feedcomponent010.py
==============================================================================
--- flumotion/trunk/flumotion/component/feedcomponent010.py (original)
+++ flumotion/trunk/flumotion/component/feedcomponent010.py Tue Jul 24 12:19:17 2007
@@ -28,7 +28,7 @@
from twisted.internet import reactor, defer
from flumotion.component import component as basecomponent
-from flumotion.common import common, errors, pygobject, messages
+from flumotion.common import common, errors, pygobject, messages, log
from flumotion.common import gstreamer, componentui
from flumotion.component import feed
@@ -348,6 +348,85 @@
uiState.set('totalOffsetDiscont',
uiState.get('totalOffsetDiscont', 0) + units)
+class PadMonitor(log.Loggable):
+ PAD_MONITOR_PROBE_FREQUENCY = 5.0
+ PAD_MONITOR_TIMEOUT = PAD_MONITOR_PROBE_FREQUENCY * 2.5
+
+ def __init__(self, component, pad, name):
+ self._last_data_time = 0
+ self._component = component
+ self._pad = pad
+ self._name = name
+ self._active = True
+
+ # Note: be very careful touching anything here: most of these functions
+ # are called from multiple threads
+ self._add_probe_dc = None
+ self._probe_id = None
+
+ self._add_flow_probe()
+
+ self._check_flow_dc = reactor.callLater(self.PAD_MONITOR_PROBE_TIMEOUT,
+ self._check_flow_timeout)
+
+ def isActive(self):
+ return self._active
+
+ def detach(self):
+ # TODO: This doesn't look very threadsafe
+ if self._probe_id:
+ self._pad.remove_buffer_probe(self._probe_id)
+ self._probe_id = None
+ if self._add_probe_dc:
+ self._add_probe_dc.cancel()
+ self._add_probe_dc = None
+
+ if self._check_flow_dc:
+ self._check_flow_dc.cancel()
+ self._check_flow_dc = None
+
+ def _add_flow_probe(self):
+ self._probe_id = self._pad.add_buffer_probe(self._probe_cb)
+ self._add_probe_dc = None
+
+ def _add_flow_probe_later(self):
+ self._add_probe_dc = reactor.callLater(self.PAD_MONITOR_PROBE_FREQUENCY,
+ self._add_flow_probe)
+
+ def _flow_watch_probe_cb(self, pad, buffer):
+ self._last_data_time = time.time()
+
+ pad.remove_buffer_probe(self._probe_id)
+ self._probe_id = None
+
+ reactor.callFromThread(self._add_flow_probe_later, pad)
+
+ # Data received! Return to happy ASAP:
+ self._check_flow_dc.cancel()
+ self._check_flow_timeout()
+
+ return True
+
+ def _check_flow_timeout(self):
+ now = time.time()
+
+ if self._last_data_time > 0:
+ delta = now - self._last_data_time
+
+ if self._active and delta > self.PAD_MONITOR_TIMEOUT:
+ self.info("No data received on pad for > %r seconds, setting "
+ "to hungry", self.PAD_MONITOR_TIMEOUT)
+
+ self._component.setFlowInactive(self._name)
+ self._active = False
+ elif not self._active and delta < self.PAD_MONITOR_TIMEOUT:
+ self.info("Receiving data again, flow active")
+ self._component.setFlowActive(self._name)
+ self._active = True
+
+ self._check_flow_dc = reactor.callLater(self.PAD_MONITOR_TIMEOUT,
+ self._check_flow_timeout)
+
class FeedComponent(basecomponent.BaseComponent):
"""
I am a base class for all Flumotion feed components.
@@ -397,6 +476,9 @@
self._probe_ids = {} # eater name -> probe handler id
self._feeder_probe_cl = None
+ self._pad_monitors = {}
+ self._pad_monitors_inactive = 0
+
self.clock_provider = None
self.eater_names = [] # componentName:feedName list
@@ -511,35 +593,70 @@
self.cleanup()
self.pipeline = pipeline
self.setup_pipeline()
-
+
+ def attachPadMonitor(self, pad, name):
+ """
+ Watch for data flow through this pad periodically.
+ If data flow ceases for too long, we turn hungry. If data flow resumes,
+ we return to happy.
+ """
+ self._pad_monitors[name] = PadMonitor(self, pad, name)
+ self._pad_monitors_inactive += 1
+ self.info("Added pad monitor %s", name)
+
+ def removePadMonitor(self, name):
+ if name not in self._pad_monitors:
+ self.warning("No pad monitor with name %s", name)
+ return
+
+ self._pad_monitors[name].detach()
+
+ if not self._pad_monitors[name].isActive():
+ self._pad_monitors_inactive -= 1
+ del self._pad_monitors[name]
+
+ def setPadMonitorActive(self, name):
+ """
+ Inform the component that data flow through the given pad monitor is
+ currently working.
+ """
+ self.info('Pad data flow at %s is active', name)
+ self._pad_monitors_inactive -= 1
+
+ if self._pad_monitors_inactive == 0 and self._inactivated:
+ # We never go happy initially because of this; only if we went
+ # hungry because of an eater being inactive.
+ self.setMood(moods.happy)
+ self._inactivated = False
+
+ def setPadMonitorInactive(self, name):
+ """
+ Inform the component that data flow through the given pad monitor has
+ ceased.
+ """
+ self.info('Pad data flow at %s is inactive', name)
+ self._pad_monitors_inactive += 1
+
+ self.setMood(moods.hungry)
+ self._inactivated = True
+
def eaterSetInactive(self, feedId):
"""
The eater for the given feedId is no longer active
By default, the component will go hungry.
"""
- self.info('Eater of %s is inactive' % feedId)
- if feedId in self._inactiveEaters:
- self.warning('Eater of %s was already inactive' % feedId)
- else:
- self._inactiveEaters.append(feedId)
- self.setMood(moods.hungry)
- self._inactivated = True
+ self.setPadMonitorInactive(feedId)
+ self._inactiveEaters.append(feedId)
def eaterSetActive(self, feedId):
"""
The eater for the given feedId is now active and producing data.
By default, the component will go happy if all eaters are active.
"""
- self.info('Eater of %s is active' % feedId)
- if feedId not in self._inactiveEaters:
- self.warning('Eater of %s was already active' % feedId)
- else:
+ self.setPadMonitorActive(feedId)
+ if feedId in self._inactiveEaters:
self._inactiveEaters.remove(feedId)
- if not self._inactiveEaters and self._inactivated:
- # We never go happy initially because of this; only if we went
- # hungry because of an eater being inactive.
- self.setMood(moods.happy)
- self._inactivated = False
+
# FIXME: it may make sense to have an updateMood method, that can be used
# by the two previous methods, but also in other places, and then
# overridden. That would make us have to publicize inactiveEaters
@@ -813,6 +930,7 @@
# start checking eaters
for feedId in self.eater_names:
status = self._eaterStatus[feedId]
+ self._pad_monitors_inactive += 1
status['checkEaterDC'] = reactor.callLater(
self.BUFFER_CHECK_FREQUENCY, self._checkEater, feedId)
More information about the flumotion-commit
mailing list