sebastien - in flumotion/branches/transcoder-1: . flumotion/common flumotion/component/combiners/switch flumotion/component/consumers/httpstreamer flumotion/manager flumotion/test

flumotion-commit at lists.fluendo.com flumotion-commit at lists.fluendo.com
Mon May 14 15:31:25 CEST 2007


Author: sebastien
Date: Mon May 14 15:31:17 2007
New Revision: 4937

Modified:
   flumotion/branches/transcoder-1/ChangeLog
   flumotion/branches/transcoder-1/flumotion/common/config.py
   flumotion/branches/transcoder-1/flumotion/component/combiners/switch/Makefile.am
   flumotion/branches/transcoder-1/flumotion/component/consumers/httpstreamer/http.py
   flumotion/branches/transcoder-1/flumotion/component/consumers/httpstreamer/http.xml
   flumotion/branches/transcoder-1/flumotion/component/consumers/httpstreamer/resources.py
   flumotion/branches/transcoder-1/flumotion/manager/admin.py
   flumotion/branches/transcoder-1/flumotion/manager/manager.py
   flumotion/branches/transcoder-1/flumotion/test/test_http.py
Log:
2007-05-14  Sebastien Merle  <sebastien at fluendo.com>

	* Merged flumotion trunk changesets up to [4936].

Modified: flumotion/branches/transcoder-1/ChangeLog
==============================================================================
--- flumotion/branches/transcoder-1/ChangeLog	(original)
+++ flumotion/branches/transcoder-1/ChangeLog	Mon May 14 15:31:17 2007
@@ -1,3 +1,7 @@
+2007-05-14  Sebastien Merle  <sebastien at fluendo.com>
+
+	* Merged flumotion trunk changesets up to [4936].
+
 2007-05-10  Sebastien Merle  <sebastien at fluendo.com>
 
 	* Merged flumotion trunk changesets up to [4932].
@@ -30,6 +34,25 @@
 	* flumotion/manager/admin.py: 
 	Added the loadComponent method.
 
+2007-05-11  Zaheer Abbas Merali  <zaheerabbas at merali dot org>
+
+	* flumotion/component/combiners/switch/Makefile.am:
+	Add admin ui to Makefile.
+
+2007-05-11  Michael Smith <msmith at fluendo.com>
+
+	* flumotion/test/test_http.py:
+	  Fix a test that used an internal method in resources.py.
+
+2007-05-11  Michael Smith <msmith at fluendo.com>
+
+	* flumotion/component/consumers/httpstreamer/http.py:
+	* flumotion/component/consumers/httpstreamer/http.xml:
+	* flumotion/component/consumers/httpstreamer/resources.py:
+	  Implement bandwidth-limit in http streamer.
+	  Implement redirecting clients to a different location when the
+	  server is full (hitting either the bandwidth limit or the client limit).
+
 2007-05-10  Sebastien Merle  <sebastien at fluendo.com>
 
 	* flumotion/common/enum.py: 

Modified: flumotion/branches/transcoder-1/flumotion/common/config.py
==============================================================================
--- flumotion/branches/transcoder-1/flumotion/common/config.py	(original)
+++ flumotion/branches/transcoder-1/flumotion/common/config.py	Mon May 14 15:31:17 2007
@@ -62,7 +62,7 @@
 
 def _buildComponentConfig(parentName, componentType, componentName, 
                           componentProperties, componentPlugs=None,
-                          eaters=None, isClockMaster=None, 
+                          eaters=None, isClockMaster=False, 
                           flumotionVersion=None):
     """
     Build a component configuration dictionary.
@@ -75,12 +75,15 @@
     else:
         version = configure.versionTuple
 
+    # clock-master should be either an avatar id or None.
+    # It can temporarily be set to True, and the flow parsing
+    # code will change it to the avatar id or None.
     config = { 'name': componentName,
                'parent': parentName,
                'type': componentType,
                'avatarId': common.componentId(parentName, componentName),
                'version': version,
-               'clock-master': isClockMaster}
+               'clock-master': isClockMaster or None}
 
     try:
         defs = registry.getRegistry().getComponent(componentType)
@@ -166,7 +169,7 @@
 
 def buildComponentConfig(parentName, componentType, componentName, 
                          componentProperties, componentPlugs,
-                         workerName, eaters=None, isClockMaster=None,
+                         workerName, eaters=None, isClockMaster=False,
                          flumotionVersion=None):
     config = _buildComponentConfig(parentName, componentType, componentName, 
                                    componentProperties, componentPlugs, 
@@ -845,7 +848,7 @@
         if bools and bools[0]:
             return True # will get changed to avatarId in parseFlow
         else:
-            return None
+            return False
             
     def _parsePlugs(self, node, sockets):
         plugs = {}

Modified: flumotion/branches/transcoder-1/flumotion/component/combiners/switch/Makefile.am
==============================================================================
--- flumotion/branches/transcoder-1/flumotion/component/combiners/switch/Makefile.am	(original)
+++ flumotion/branches/transcoder-1/flumotion/component/combiners/switch/Makefile.am	Mon May 14 15:31:17 2007
@@ -1,6 +1,6 @@
 include $(top_srcdir)/common/python.mk
 
-component_PYTHON = __init__.py switch.py
+component_PYTHON = __init__.py switch.py admin_gtk.py
 componentdir = $(libdir)/flumotion/python/flumotion/component/combiners/switch
 component_DATA = \
 	switch.xml	

Modified: flumotion/branches/transcoder-1/flumotion/component/consumers/httpstreamer/http.py
==============================================================================
--- flumotion/branches/transcoder-1/flumotion/component/consumers/httpstreamer/http.py	(original)
+++ flumotion/branches/transcoder-1/flumotion/component/consumers/httpstreamer/http.py	Mon May 14 15:31:17 2007
@@ -50,6 +50,9 @@
 T_ = messages.gettexter('flumotion')
 
 __all__ = ['HTTPMedium', 'MultifdSinkStreamer']
+
+
+STATS_POLL_INTERVAL = 10
     
 # FIXME: generalize this class and move it out here ?
 class Stats:
@@ -66,6 +69,8 @@
         self.load_deltas = [0, 0]
         self._load_deltas_period = 10 # seconds
         self._load_deltas_ongoing = [time.time(), 0, 0]
+        self._currentBitrate = -1 # not known yet
+        self._lastBytesReceived = -1 # not known yet
 
         # keep track of average clients by tracking last average and its time
         self.average_client_number = 0
@@ -123,9 +128,25 @@
         self.load_deltas = [(add-oldadd)/diff, (remove-oldremove)/diff]
         self._load_deltas_ongoing = [now, add, remove]
 
+        bytesReceived = self.getBytesReceived()
+        if self._lastBytesReceived >= 0:
+            self._currentBitrate = ((bytesReceived - self._lastBytesReceived) *
+                 8 / STATS_POLL_INTERVAL)
+            self._lastBytesReceived = bytesReceived
+            
+        self._currentBitrate = -1 # not known yet
+        self._lastBytesSent = -1 # not known yet
+
         self.update_ui_state()
 
-        self._updateCallLaterId = reactor.callLater(10, self._updateStats)
+        self._updateCallLaterId = reactor.callLater(STATS_POLL_INTERVAL, 
+            self._updateStats)
+
+    def getCurrentBitrate(self):
+        if self._currentBitrate >= 0:
+            return self._currentBitrate
+        else:
+            return self.getBytesReceived() * 8 / self.getUptime()
 
     def getBytesSent(self):
         return self.sink.get_property('bytes-served')
@@ -445,6 +466,13 @@
         if properties.has_key('client-limit'):
             self.resource.setUserLimit(int(properties['client-limit']))
 
+        if properties.has_key('bandwidth-limit'):
+            self.resource.setBandwidthLimit(int(properties['bandwidth-limit']))
+
+        if properties.has_key('redirect-on-overflow'):
+            self.resource.setRedirectionOnLimits(
+                properties['redirect-on-overflow'])
+
         if properties.has_key('bouncer'):
             self.resource.setBouncerName(properties['bouncer'])
 

Modified: flumotion/branches/transcoder-1/flumotion/component/consumers/httpstreamer/http.xml
==============================================================================
--- flumotion/branches/transcoder-1/flumotion/component/consumers/httpstreamer/http.xml	(original)
+++ flumotion/branches/transcoder-1/flumotion/component/consumers/httpstreamer/http.xml	Mon May 14 15:31:17 2007
@@ -49,10 +49,14 @@
                   description="Server's host name to display" />
         <property name="domain" type="string"
                   description="Domain of server for authentication" />
+
         <property name="client-limit" type="int"
                   description="Maximum number of clients allowed" />
         <property name="bandwidth-limit" type="int"
-                  description="Maximum bandwidth usage allowed (not implemented)" />
+                  description="Maximum bandwidth usage allowed in bits per second" />
+        <property name="redirect-on-overflow" type="string"
+                  description="A URL to redirect clients to if either of the above limits have been reached" />
+
         <property name="duration" type="float"
                   description="How long to keep clients connected for (in seconds) "/>
 

Modified: flumotion/branches/transcoder-1/flumotion/component/consumers/httpstreamer/resources.py
==============================================================================
--- flumotion/branches/transcoder-1/flumotion/component/consumers/httpstreamer/resources.py	(original)
+++ flumotion/branches/transcoder-1/flumotion/component/consumers/httpstreamer/resources.py	Mon May 14 15:31:17 2007
@@ -85,6 +85,10 @@
         self._requests = {}            # request fd -> Request
         
         self.maxclients = self.getMaxAllowedClients(-1)
+        self.maxbandwidth = -1 # not limited by default
+
+        # If set, a URL to redirect a user to when the limits above are reached
+        self._redirectOnFull = None
         
         self.loggers = \
             streamer.plugs['flumotion.component.plugs.loggers.Logger']
@@ -150,6 +154,13 @@
         # Log what we actually managed to set it to.
         self.info('set maxclients to %d' % self.maxclients)
 
+    def setBandwidthLimit(self, limit):
+        self.maxbandwidth = limit
+        self.info("set maxbandwidth to %d", self.maxbandwidth)
+
+    def setRedirectionOnLimits(self, url):
+        self._redirectOnFull = url
+
     # FIXME: rename to writeHeaders
     """
     Write out the HTTP headers for the incoming HTTP request.
@@ -249,8 +260,14 @@
         else:
             return softmax - self.__reserve_fds__
 
-    def reachedMaxClients(self):
-        return len(self._requests) >= self.maxclients and self.maxclients >= 0
+    def reachedServerLimits(self):
+        if self.maxclients >= 0 and len(self._requests) >= self.maxclients:
+            return True
+        elif self.maxbandwidth >= 0:
+            if (len(self._requests) * self.streamer.getCurrentBitrate() >= 
+                    self.maxbandwidth):
+                return True
+        return False
     
     def _addClient(self, request):
         """
@@ -338,8 +355,8 @@
 
         if not self.isReady():
             return self._handleNotReady(request)
-        elif self.reachedMaxClients():
-            return self._handleMaxClients(request)
+        elif self.reachedServerLimits():
+            return self._handleServerFull(request)
 
         self.debug('_render(): asked for (possible) authentication')
         d = self.startAuthentication(request)
@@ -358,14 +375,20 @@
         self.debug('Not sending data, it\'s not ready')
         return server.NOT_DONE_YET
         
-    def _handleMaxClients(self, request):
-        self.debug('Refusing clients, client limit %d reached' %
-            self.maxclients)
+    def _handleServerFull(self, request):
+        if self._redirectOnFull:
+            self.debug("Redirecting client, client limit %d reached", 
+                self.maxclients)
+            error_code = http.FOUND
+            request.setHeader('location', self._redirectOnFull)
+        else:
+            self.debug('Refusing clients, client limit %d reached' %
+                self.maxclients)
+            error_code = http.SERVICE_UNAVAILABLE
 
         request.setHeader('content-type', 'text/html')
-        request.setHeader('server', HTTP_VERSION)
         
-        error_code = http.SERVICE_UNAVAILABLE
+        request.setHeader('server', HTTP_VERSION)
         request.setResponseCode(error_code)
         
         return ERROR_TEMPLATE % {'code': error_code,

Modified: flumotion/branches/transcoder-1/flumotion/manager/admin.py
==============================================================================
--- flumotion/branches/transcoder-1/flumotion/manager/admin.py	(original)
+++ flumotion/branches/transcoder-1/flumotion/manager/admin.py	Mon May 14 15:31:17 2007
@@ -411,7 +411,7 @@
 
     def perspective_loadComponent(self, componentType, componentId,
                                   componentProperties, workerName,
-                                  eaters=None, isClockMaster=None):
+                                  eaters=None, isClockMaster=False):
         """
         Load a component into the manager configuration.
         Returns a deferred that will be called with the component state.

Modified: flumotion/branches/transcoder-1/flumotion/manager/manager.py
==============================================================================
--- flumotion/branches/transcoder-1/flumotion/manager/manager.py	(original)
+++ flumotion/branches/transcoder-1/flumotion/manager/manager.py	Mon May 14 15:31:17 2007
@@ -555,7 +555,7 @@
 
     def loadComponent(self, identity, componentType, componentId,
                       componentProperties, workerName, 
-                      eaters=None, isClockMaster=None):
+                      eaters=None, isClockMaster=False):
         """
         Load a component into the manager configuration.
         Returns a deferred that will be called with the component state.
@@ -585,12 +585,21 @@
             self.debug(message)
             raise errors.ComponentWorkerConfigError(message)
         
+        if isClockMaster:
+            raise NotImplementedError("Clock master components "
+                                      "are not yet supported")
+        
         state = self.state
         compState = None
         compConf = config.buildComponentConfig(parentName, componentType,
                                                compName, componentProperties,
                                                None, workerName, eaters,
                                                isClockMaster)
+        
+        if compConf.defs.getNeedsSynchronization():
+            raise NotImplementedError("Components that need synchronization "
+                                      "are not yet supported")
+            
         if parentName == 'atmosphere':
             atmosphere = state.get('atmosphere')
             components = [x.get('name') for x in atmosphere.get('components')]

Modified: flumotion/branches/transcoder-1/flumotion/test/test_http.py
==============================================================================
--- flumotion/branches/transcoder-1/flumotion/test/test_http.py	(original)
+++ flumotion/branches/transcoder-1/flumotion/test/test_http.py	Mon May 14 15:31:17 2007
@@ -178,7 +178,7 @@
         #assert resource.maxAllowedClients() == 974
         resource._requests = ' ' * (resource.maxclients + 1)
         
-        self.failUnless(resource.reachedMaxClients())
+        self.failUnless(resource.reachedServerLimits())
         
         request = FakeRequest(ip='127.0.0.1')
         data = resource.render(request)


More information about the flumotion-commit mailing list