wingo - in flumotion/trunk: . conf/managers/default/flows conf/rrdmon flumotion/admin/gtk flumotion/admin/rrdmon flumotion/component/base

flumotion-commit at lists.fluendo.com flumotion-commit at lists.fluendo.com
Mon Jul 2 18:30:59 CEST 2007


Author: wingo
Date: Mon Jul  2 18:30:54 2007
New Revision: 5282

Modified:
   flumotion/trunk/ChangeLog
   flumotion/trunk/conf/managers/default/flows/ogg-test-theora.xml
   flumotion/trunk/conf/rrdmon/default.xml
   flumotion/trunk/flumotion/admin/gtk/client.py
   flumotion/trunk/flumotion/admin/rrdmon/config.py
   flumotion/trunk/flumotion/admin/rrdmon/rrdmon.py
   flumotion/trunk/flumotion/component/base/scheduler.py
Log:
2007-07-02  Andy Wingo  <wingo at pobox.com>

	* flumotion/admin/gtk/client.py (Window._clearMessages): Fix bug
	when the manager goes away.

	* flumotion/component/base/scheduler.py (Event.__init__): Add some
	assertions, and the ability to specify recur as a timedelta.
	(Scheduler.addEvent): Document recur as timedelta.

	* flumotion/admin/rrdmon/config.py (ConfigParser._parseSource):
	Require absolute filenames for <rrd-file>. Add <component-id>,
	also required. Add docs.

	* flumotion/admin/rrdmon/rrdmon.py: Actually implement. Add docs.
	Fixes #626.

	* conf/managers/default/flows/ogg-test-theora.xml: Fix up for The
	Great Renaming.

	* conf/rrdmon/default.xml: Fix up; require <rrd-file>.



Modified: flumotion/trunk/ChangeLog
==============================================================================
--- flumotion/trunk/ChangeLog	(original)
+++ flumotion/trunk/ChangeLog	Mon Jul  2 18:30:54 2007
@@ -1,3 +1,24 @@
+2007-07-02  Andy Wingo  <wingo at pobox.com>
+
+	* flumotion/admin/gtk/client.py (Window._clearMessages): Fix bug
+	when the manager goes away.
+
+	* flumotion/component/base/scheduler.py (Event.__init__): Add some
+	assertions, and the ability to specify recur as a timedelta.
+	(Scheduler.addEvent): Document recur as timedelta.
+
+	* flumotion/admin/rrdmon/config.py (ConfigParser._parseSource):
+	Require absolute filenames for <rrd-file>. Add <component-id>,
+	also required. Add docs.
+
+	* flumotion/admin/rrdmon/rrdmon.py: Actually implement. Add docs.
+	Fixes #626.
+
+	* conf/managers/default/flows/ogg-test-theora.xml: Fix up for The
+	Great Renaming.
+
+	* conf/rrdmon/default.xml: Fix up; require <rrd-file>.
+
 2007-07-02  Zaheer Abbas Merali  <zaheerabbas at merali dot org>
 
 	* flumotion/admin/gtk/componentview.py

Modified: flumotion/trunk/conf/managers/default/flows/ogg-test-theora.xml
==============================================================================
--- flumotion/trunk/conf/managers/default/flows/ogg-test-theora.xml	(original)
+++ flumotion/trunk/conf/managers/default/flows/ogg-test-theora.xml	Mon Jul  2 18:30:54 2007
@@ -1,7 +1,7 @@
 <?xml version="1.0" ?>
 <planet>
   <flow name="default">
-    <component name="video-source" type="videotest" worker="localhost">
+    <component name="video-source" type="videotest-producer" worker="localhost">
       <!-- properties -->
       <property name="format">video/x-raw-yuv</property>
       <property name="framerate">50/10</property>
@@ -20,11 +20,9 @@
     <component name="http-video" type="http-streamer" worker="localhost">
       <source>muxer-video</source>
       <!-- properties -->
-      <property name="bandwidth_limit">10</property>
-      <property name="burst_on_connect">True</property>
-      <property name="mount_point">/</property>
+      <property name="burst-on-connect">True</property>
+      <property name="mount-point">/</property>
       <property name="port">8800</property>
-      <property name="user_limit">1024</property>
     </component>
   </flow>
-</planet>
\ No newline at end of file
+</planet>

Modified: flumotion/trunk/conf/rrdmon/default.xml
==============================================================================
--- flumotion/trunk/conf/rrdmon/default.xml	(original)
+++ flumotion/trunk/conf/rrdmon/default.xml	Mon Jul  2 18:30:54 2007
@@ -1,9 +1,11 @@
 <rrdmon>
  <source name="http-streamer">
   <manager>user:test at localhost:7531</manager>
-  <ui-state-key>stream-totalbytes</ui-state-key>
+  <component-id>/default/http-audio-video</component-id>
+  <ui-state-key>stream-totalbytes-raw</ui-state-key>
   <is-gauge>False</is-gauge>
-  <sample-frequency>300</sample-frequency>
+  <sample-frequency>30</sample-frequency>
+  <rrd-file>/tmp/stream-bitrate.rrd</rrd-file>
   <archive>
    <!-- This should be nicer in the future, but the meaning of this is that we
         sample every 1*stepsize=1*300s=5 minutes, for 1200 samples = 5

Modified: flumotion/trunk/flumotion/admin/gtk/client.py
==============================================================================
--- flumotion/trunk/flumotion/admin/gtk/client.py	(original)
+++ flumotion/trunk/flumotion/admin/gtk/client.py	Mon Jul  2 18:30:54 2007
@@ -346,7 +346,7 @@
     def _clearMessages(self):
         self._messages_view.clear()
         pstate = self._planetState
-        if pstate.hasKey('messages'):
+        if pstate and pstate.hasKey('messages'):
             for message in pstate.get('messages').values():
                 self._messages_view.add_message(message)
         

Modified: flumotion/trunk/flumotion/admin/rrdmon/config.py
==============================================================================
--- flumotion/trunk/flumotion/admin/rrdmon/config.py	(original)
+++ flumotion/trunk/flumotion/admin/rrdmon/config.py	Mon Jul  2 18:30:54 2007
@@ -28,6 +28,65 @@
 from flumotion.common.errors import ConfigError
 
 
+"""
+RRD monitor configuration
+
+The format of the configuration file is as follows. *, +, and ? have
+their normal meanings: 0 or more, 1 or more, and 0 or 1, respectively.
+
+<rrdmon>
+
+  <!-- normal -->
+  <debug>*:4</debug> ?
+
+  <!-- implementation note: the name of the source is used as the DS
+       name in the RRD file -->
+  <source name="http-streamer"> +
+
+    <!-- how we connect to the manager; parsed with
+         L{flumotion.common.connection.parsePBConnectionInfo} -->
+    <manager>user:test at localhost:7531</manager>
+
+    <!-- the L{flumotion.common.common.componentId} of the component we
+         will poll --> 
+    <component-id>/default/http-audio-video</component-id>
+
+    <!-- the key of the L{flumotion.common.componentui} UIState that we
+         will poll; should be numeric in value -->
+    <ui-state-key>stream-totalbytes-raw</ui-state-key>
+
+    <!-- boolean; examples of gauge values would be number of users,
+         temperature, signal strength, precomputed bitrate. The most
+         common non-gauge values are bitrate values, where you poll e.g.
+         the number of bytes sent, not the rate itself -->
+    <is-gauge>False</is-gauge> ?
+
+    <!-- sample frequency in seconds, defaults to 5 minutes -->
+    <sample-frequency>300</sample-frequency> ?
+
+    <!-- Normally we generate the RRD DS spec from the answers above,
+         but if you want to you can specify one directly here. The DS
+         name should be the source name -->
+    <rrd-ds-spec>DS-SPEC</rrd-ds-spec> ?
+
+    <!-- file will be created if necessary -->
+    <rrd-file>/tmp/stream-bitrate.rrd</rrd-file>
+
+    <!-- set of archives to store in the rrd file
+    <archive> +
+      <!-- Would be nice to break this down as we did above for the DS
+           spec, but for now you have to specify the RRA specs manually.
+           Bummer dude! In this example, the meaning is that we should
+           archive a sample every 1*stepsize=1*300s=5 minutes, for 1200
+           samples = 5 min*1200=100h.-->
+      <rra-spec>AVERAGE:0.5:1:1200</rra-spec>
+    </archive>
+  </source>
+
+</rrdmon>
+"""
+
+
 class ConfigParser(config.BaseConfigParser):
     """
     RRD monitor configuration file parser.
@@ -38,18 +97,11 @@
     """
     logCategory = 'rrdmon-config'
 
-    def __init__(self, file, rrdBaseDir=None):
+    def __init__(self, file):
         """
         @param file: The path to the config file to parse, or a file object
         @type  file: str or file
-        @param rrdBaseDir: The base directory for resolving filenames, or None to
-                        infer from the path passed
-        @type rrdBaseDir: str or None
         """
-        if rrdBaseDir is not None:
-            self.rrdBaseDir = rrdBaseDir
-        else:
-            self.rrdBaseDir = os.path.dirname(file)
         config.BaseConfigParser.__init__(self, file)
 
     def _parseArchive(self, node):
@@ -86,7 +138,11 @@
             def setter(v):
                 res[k] = v
             return setter
-
+        def filename(v):
+            if v[0] != os.sep:
+                raise config.ConfigError('rrdfile paths should be absolute')
+            return str(v)
+                
         name, = self.parseAttributes(node, ('name',))
 
         res = {'name': name}
@@ -94,11 +150,12 @@
 
         basicOptions = (('manager', True,
                          connection.parsePBConnectionInfo, None),
+                        ('component-id', True, str, None),
                         ('ui-state-key', True, str, None),
-                        ('sample-frequency', False, float, 300),
+                        ('sample-frequency', False, int, 300),
                         ('is-gauge', False, common.strToBool, True),
                         ('rrd-ds-spec', False, str, None),
-                        ('rrd-file', False, str, None))
+                        ('rrd-file', True, filename, None))
         for k, required, parser, default in basicOptions:
             table[k] = strparser(parser), ressetter(k)
             if not required:

Modified: flumotion/trunk/flumotion/admin/rrdmon/rrdmon.py
==============================================================================
--- flumotion/trunk/flumotion/admin/rrdmon/rrdmon.py	(original)
+++ flumotion/trunk/flumotion/admin/rrdmon/rrdmon.py	Mon Jul  2 18:30:54 2007
@@ -20,7 +20,74 @@
 # Headers in this file shall remain intact.
 
 
-from flumotion.common import log
+import os
+import random
+import rrdtool
+import datetime
+import time
+
+from dateutil import rrule
+
+from flumotion.admin import multi
+from flumotion.common import log, common
+from flumotion.component.base import scheduler
+
+# register the unjellyable
+from flumotion.common import componentui
+
+
+"""RRD resource poller daemon for Flumotion.
+
+Makes periodic observations on components' UI states, recording them to
+RRD files. One can then extract graphs using rrdtool graph. For example,
+to show a stream bandwidth graph for the last 30 minutes with the
+example configuration file, in the source tree as
+conf/rrdmon/default.xml, the following command makes a graph:
+
+  rrdtool graph --end now --start end-30min --width 400 out.png \
+     DEF:ds0=/tmp/stream-bitrate.rrd:http-streamer:AVERAGE \
+     AREA:ds0#0000FF:"Stream bandwidth (bytes/sec)"
+
+It would be possible to expose these graphs via HTTP, but I don't know
+how useful this might be.
+
+See L{flumotion.admin.rrdmon.config} for information on how to configure
+the RRD resource poller.
+"""
+
+
+def sourceGetFileName(source):
+    return source['rrd-file']
+    
+def sourceGetName(source):
+    return source['name']
+
+def sourceGetSampleFrequency(source):
+    return source['sample-frequency']
+
+def sourceGetDS(source):
+    def makeDS():
+        if source['is-gauge']:
+            return 'DS:%s:GAUGE:%d:U:U' % (source['name'],
+                                           2*source['sample-frequency'])
+        else:
+            return 'DS:%s:DERIVE:%d:0:U' % (source['name'],
+                                            2*source['sample-frequency'])
+    return source['rrd-ds-spec'] or makeDS()
+
+def sourceGetRRAList(source):
+    def archiveGetRRA(archive):
+        return 'RRA:' + archive['rra-spec']
+    return [archiveGetRRA(archive) for archive in source['archives']]
+
+def sourceGetConnectionInfo(source):
+    return source['manager']
+
+def sourceGetComponentId(source):
+    return source['component-id']
+
+def sourceGetUIStateKey(source):
+    return source['ui-state-key']
 
 
 class RRDMonitor(log.Loggable):
@@ -28,6 +95,111 @@
 
     def __init__(self, sources):
         self.debug('started rrd monitor')
-        self.sources = sources
-        # There is a bit of a TODO here :)
+        self.multi = multi.MultiAdminModel()
+        self.scheduler = scheduler.Scheduler()
+        self.ensureRRDFiles(sources)
+        self.connectToManagers(sources)
+        self.startScheduler(sources)
         
+    def ensureRRDFiles(self, sources):
+        for source in sources:
+            rrdfile = sourceGetFileName(source)
+            if not os.path.exists(rrdfile):
+                try:
+                    self.info('Creating RRD file %s', rrdfile)
+                    rrdtool.create(rrdfile,
+                                   "-s", str(sourceGetSampleFrequency(source)),
+                                   sourceGetDS(source),
+                                   *sourceGetRRAList(source))
+                except rrdtool.error, e:
+                    self.warning('Could not create RRD file %s',
+                                 rrdfile)
+                    self.debug('Failure reason: %s',
+                               log.getExceptionMessage(e))
+
+    def connectToManagers(self, sources):
+        for source in sources:
+            connectionInfo = sourceGetConnectionInfo(source)
+            self.multi.addManager(connectionInfo, tenacious=True)
+
+    def startScheduler(self, sources):
+        r = random.Random()
+        now = scheduler.now()
+
+        def eventStarted(event):
+            self.pollData(*event.content)
+        def eventStopped(event):
+            pass
+
+        self.scheduler.subscribe(eventStarted, eventStopped)
+
+        for source in sources:
+            freq = sourceGetSampleFrequency(source)
+
+            # randomly offset the polling
+            offset = datetime.timedelta(seconds=r.randint(0,freq))
+
+            data = (str(sourceGetConnectionInfo(source)),
+                    sourceGetComponentId(source),
+                    sourceGetUIStateKey(source),
+                    sourceGetName(source),
+                    sourceGetFileName(source))
+
+            self.scheduler.addEvent(now+offset,
+                                    now+offset+datetime.timedelta(seconds=1),
+                                    data,
+                                    datetime.timedelta(seconds=freq))
+
+    def pollData(self, managerId, componentId, uiStateKey, dsName,
+                 rrdFile):
+        def stateListToDict(l):
+            return dict([(x.get('name'), x) for x in l])
+
+        if managerId in self.multi.admins:
+            admin = self.multi.admins[managerId]
+
+            flowName, componentName = common.parseComponentId(componentId)
+
+            flows = stateListToDict(admin.planet.get('flows'))
+            if flowName not in flows:
+                self.warning('not polling %s%s:%s: no such flow %s',
+                             managerId, componentId, uiStateKey,
+                             flowName)
+                return
+
+            components = stateListToDict(flows[flowName].get('components'))
+            if componentName not in components:
+                self.warning('not polling %s%s:%s: no such component %s',
+                             managerId, componentId, uiStateKey,
+                             componentId)
+                return
+
+            state = components[componentName]
+
+            def gotUIState(uiState):
+                if not uiState.hasKey(uiStateKey):
+                    self.warning('while polling %s%s:%s: uiState has no '
+                                 'key %s', managerId, componentId,
+                                 uiStateKey, uiStateKey)
+                else:
+                    try:
+                        value = '%d:%s' % (int(time.time()),
+                                           uiState.get(uiStateKey))
+                        self.log("polled %s%s:%s, updating ds %s = %s",
+                                 managerId, componentId, uiStateKey,
+                                 dsName, value)
+                        rrdtool.update(rrdFile, "-t", dsName, value)
+                    except rrdtool.error, e:
+                        self.warning('error updating rrd file %s for '
+                                     '%s%s:%s', rrdFile, managerId,
+                                     componentId, uiStateKey)
+                        self.debug('error reason: %s',
+                                   log.getExceptionMessage(e))
+
+            def errback(failure):
+                self.warning('not polling %s%s:%s: failed to get ui '
+                             'state')
+                self.debug('reason: %s', log.getFailureMessage(failure))
+
+            d = admin.componentCallRemote(state, 'getUIState')
+            d.addCallbacks(gotUIState, errback)

Modified: flumotion/trunk/flumotion/component/base/scheduler.py
==============================================================================
--- flumotion/trunk/flumotion/component/base/scheduler.py	(original)
+++ flumotion/trunk/flumotion/component/base/scheduler.py	Mon Jul  2 18:30:54 2007
@@ -77,6 +77,8 @@
         self.debug('new event, content=%r, start=%r, end=%r', content,
                    start, end)
 
+        assert start < end
+
         if recur:
             from dateutil import rrule
             if now is None:
@@ -84,12 +86,22 @@
             if end.tzinfo is None:
                 end = datetime(end.year, end.month, end.day, end.hour, 
                     end.minute, end.second, end.microsecond, LOCAL)
-            endRecurRule = rrule.rrulestr(recur, dtstart=end) 
             if start.tzinfo is None:
                 start = datetime(start.year, start.month, start.day, 
                     start.hour, start.minute, start.second, 
                     start.microsecond, LOCAL)
-            startRecurRule = rrule.rrulestr(recur, dtstart=start)
+
+            if isinstance(recur, timedelta):
+                interval = recur.days*24*60*60 + recur.seconds
+                endRecurRule = rrule.rrule(rrule.SECONDLY,
+                                           interval=interval,
+                                           dtstart=end)
+                startRecurRule = rrule.rrule(rrule.SECONDLY,
+                                             interval=interval,
+                                             dtstart=start)
+            else:
+                endRecurRule = rrule.rrulestr(recur, dtstart=end) 
+                startRecurRule = rrule.rrulestr(recur, dtstart=start)
 
             if end < now:
                 end = endRecurRule.after(now)
@@ -174,8 +186,9 @@
         @type    end: datetime
         @param content: content of this event
         @type  content: str
-        @param recur: recurrence rule
-        @type  recur: str
+        @param recur: recurrence rule, either as a string parseable by
+        datetime.rrule.rrulestr or as a datetime.timedelta
+        @type  recur: None, str, or datetime.timedelta
 
         @returns: an Event that can later be passed to removeEvent, if
         so desired. The event will be removed or rescheduled


More information about the flumotion-commit mailing list