wingo - in flumotion/trunk: . flumotion/admin flumotion/test

flumotion-commit at lists.fluendo.com flumotion-commit at lists.fluendo.com
Wed Jun 27 16:37:37 CEST 2007


Author: wingo
Date: Wed Jun 27 16:37:33 2007
New Revision: 5272

Modified:
   flumotion/trunk/ChangeLog
   flumotion/trunk/flumotion/admin/multi.py
   flumotion/trunk/flumotion/test/test_admin_multi.py
Log:
2007-06-27  Andy Wingo  <wingo at pobox.com>

	* flumotion/admin/multi.py (MultiAdminModel.removeListener): New
	method.
	(MultiAdminModel._managerConnected)
	(MultiAdminModel._managerDisconnected): New internal methods.
	Listen for reconnections. Fixes #664.
	(MultiAdminModel.addManager): Refactor to use the admin's deferred
	connection interface. This shows that the interface is sufficient.
	Fixes #621.
	(MultiAdminModel.removeManager): Remove reconnection signals, and
	treat the manager as disconnected immediately. No need to use
	deferred stops.

	* flumotion/test/test_admin_multi.py (MultiAdminTest): Use
	TestCaseWithManager.
	(MultiAdminTest.testConnectSuccess)
	(MultiAdminTest.testConnectFailure)
	(MultiAdminTest.testReconnect): Add tests.



Modified: flumotion/trunk/ChangeLog
==============================================================================
--- flumotion/trunk/ChangeLog	(original)
+++ flumotion/trunk/ChangeLog	Wed Jun 27 16:37:33 2007
@@ -1,5 +1,23 @@
 2007-06-27  Andy Wingo  <wingo at pobox.com>
 
+	* flumotion/admin/multi.py (MultiAdminModel.removeListener): New
+	method.
+	(MultiAdminModel._managerConnected)
+	(MultiAdminModel._managerDisconnected): New internal methods.
+	Listen for reconnections. Fixes #664.
+	(MultiAdminModel.addManager): Refactor to use the admin's deferred
+	connection interface. This shows that the interface is sufficient.
+	Fixes #621.
+	(MultiAdminModel.removeManager): Remove reconnection signals, and
+	treat the manager as disconnected immediately. No need to use
+	deferred stops.
+
+	* flumotion/test/test_admin_multi.py (MultiAdminTest): Use
+	TestCaseWithManager.
+	(MultiAdminTest.testConnectSuccess)
+	(MultiAdminTest.testConnectFailure)
+	(MultiAdminTest.testReconnect): Add tests.
+
 	* flumotion/test/common.py: 
 	* flumotion/test/test_admin_admin.py: Move the TestCaseWithManager
 	to common.py.

Modified: flumotion/trunk/flumotion/admin/multi.py
==============================================================================
--- flumotion/trunk/flumotion/admin/multi.py	(original)
+++ flumotion/trunk/flumotion/admin/multi.py	Wed Jun 27 16:37:33 2007
@@ -77,6 +77,7 @@
         self.admins = WatchedDict() # {managerId: AdminModel}
         # private
         self.listeners = []
+        self._reconnectHandlerIds = {} # managerId => [disconnect, id..]
         self._startSet = startset.StartSet(self.admins.has_key,
                                            errors.AlreadyConnectingError,
                                            errors.AlreadyConnectedError)
@@ -98,101 +99,92 @@
         assert not obj in self.listeners
         self.listeners.append(obj)
 
-    def addManager(self, connectionInfo, tenacious=False):
-        def connected_cb(admin):
-            self._startSet.avatarStarted(managerId)
-
-        def disconnected_cb(admin):
-            self.info('Disconnected from manager')
-            if admin.managerId in self.admins:
-                self.emit('removePlanet', admin, admin.planet)
-                del self.admins[admin.managerId]
-            else:
-                self.warning('Could not find admin model %r', admin)
-            if self._startSet.shutdownRegistered(managerId):
-                self._startSet.shutdownSuccess(managerId)
-
-        def connection_refused_cb(admin):
-            msg = 'Connection to %s:%d refused.' % (i.host, i.port)
-            self.info('%s', msg)
-            if not tenacious:
-                self._startSet.avatarStopped(managerId,
-                    errors.ConnectionRefusedError(msg))
-
-        def connection_failed_cb(admin, string):
-            msg = 'Connection to %s:%d failed: %s' % (i.host, i.port,
-                                                      string)
-            self.info('%s', msg)
-            if not tenacious:
-                self._startSet.avatarStopped(managerId,
-                    lambda _: errors.ConnectionFailedError(msg))
-
-        def connection_error_cb(admin, obj):
-            msg = 'Error connecting to %s:%d: %r' % (i.host, i.port,
-                                                     obj)
-            self.warning('%s', msg)
-            if not tenacious:
-                self._startSet.avatarStopped(managerId,
-                    lambda _: errors.ConnectionFailedError(msg))
+    def removeListener(self, obj):
+        self.listeners.remove(obj)
 
+    def _managerConnected(self, admin):
+        if admin.managerId not in self._reconnectHandlerIds:
+            # the first time a manager is connected to, start listening
+            # for reconnections; intertwingled with removeManager()
+            ids = [admin.disconnect]
+            ids.append(admin.connect('connected',
+                                     self._managerConnected))
+            ids.append(admin.connect('disconnected',
+                                     self._managerDisconnected))
+            self._reconnectHandlerIds[admin.managerId] = ids
+
+        planet = admin.planet
+        self.info('Connected to manager %s (planet %s)',
+                  admin.managerId, planet.get('name'))
+        assert admin.managerId not in self.admins
+        self.admins[admin.managerId] = admin
+        self.emit('addPlanet', admin, planet)
+
+    def _managerDisconnected(self, admin):
+        if admin.managerId in self.admins:
+            self.emit('removePlanet', admin, admin.planet)
+            del self.admins[admin.managerId]
+        else:
+            self.warning('Could not find admin model %r', admin)
+        
+    def addManager(self, connectionInfo, tenacious=False):
         i = connectionInfo
         managerId = str(i)
 
+        # This dance of deferreds is here so as to make sure that
+        # removeManager can cancel a pending connection.
+
         # can raise errors.AlreadyConnectingError or
         # errors.AlreadyConnectedError
         try:
-            d = self._startSet.createStart(managerId)
+            startD = self._startSet.createStart(managerId)
         except Exception, e:
             return defer.fail(e)
 
         a = admin.AdminModel()
-        a.connectToManager(i, tenacious)
+        connectD = a.connectToManager(i, tenacious)
         assert a.managerId == managerId
 
-        a.connect('connected', connected_cb)
-        a.connect('disconnected', disconnected_cb)
-        a.connect('connection-refused', connection_refused_cb)
-        a.connect('connection-failed', connection_failed_cb)
-        a.connect('connection-error', connection_error_cb)
-
-        # the admin should offer a decent deferred-connect interface;
-        # instead here we conflate the startset and the
-        # signal->deferred adaptations in one function
-
-        def emit_add_planet(_):
-            planet = a.planet
-            self.info('Connected to manager %s (planet %s)',
-                      a.managerId, planet.get('name'))
-            self.admins[a.managerId] = a
-            self.emit('addPlanet', a, planet)
-            return a
-
-        def disconnect_on_error(failure):
+        def connect_callback(_):
+            self._startSet.avatarStarted(managerId)
+            
+        def connect_errback(failure):
+            self._startSet.avatarStopped(managerId, lambda _: failure)
+
+        connectD.addCallbacks(connect_callback, connect_errback)
+
+        def start_callback(_):
+            self._managerConnected(a)
+            
+        def start_errback(failure):
             a.shutdown()
             return failure
 
-        d.addCallbacks(emit_add_planet, disconnect_on_error)
+        startD.addCallbacks(start_callback, start_errback)
 
-        return d
+        return startD
 
     def removeManager(self, managerId):
         self.info('disconnecting from %s', managerId)
+
+        # stop listening to admin's signals, if the manager had actually
+        # connected at some point
+        if managerId in self._reconnectHandlerIds:
+            disconnect = self._reconnectHandlerIds[managerId][0]
+            ids = self._reconnectHandlerIds[managerId][1:]
+            map(disconnect, ids)
+
         if managerId in self.admins:
-            self.admins[managerId].shutdown()
-            return self._startSet.shutdownStart(managerId)
-        elif self._startSet.createRegistered(managerId):
-            # this admin has not yet connected; let us assume that in
-            # this window, it will not connect. Firing this makes the
-            # admin shutdown, see disconnect_on_error above.
-            self._startSet.shutdownSuccess(admin.managerId)
-            return defer.succeed(managerId)
-        elif self._startSet.shutdownRegistered(managerId):
-            # some caller is overzealous?
-            return self._startSet.shutdownStart(managerId)
-        else:
-            self.warning('told to remove an unknown manager: %s',
-                         managerId)
-            return defer.succeed(managerId)
+            admin = self.admins[managerId]
+            admin.shutdown()
+            self._managerDisconnected(admin)
+
+        # Firing this has the side effect of errbacking on any pending
+        # start, calling start_errback above if appropriate.
+        self._startSet.avatarStopped(
+            managerId, lambda _: errors.ConnectionCancelledError())
+
+        return defer.succeed(managerId)
 
     def for_each_component(self, object, proc):
         '''Call a procedure on each component that is a child of OBJECT'''

Modified: flumotion/trunk/flumotion/test/test_admin_multi.py
==============================================================================
--- flumotion/trunk/flumotion/test/test_admin_multi.py	(original)
+++ flumotion/trunk/flumotion/test/test_admin_multi.py	Wed Jun 27 16:37:33 2007
@@ -19,11 +19,88 @@
 
 # Headers in this file shall remain intact.
 
+import common
+
 from twisted.trial import unittest
-from twisted.internet import reactor
+from twisted.internet import defer
+
+from flumotion.admin import multi
+from flumotion.common import connection
+from flumotion.twisted import pb
 
-class MultiAdminTest(unittest.TestCase):
-    def testConstructor(self):
-        from flumotion.admin import multi
 
+class MultiAdminTest(common.TestCaseWithManager):
+    def testConstructor(self):
         model = multi.MultiAdminModel()
+
+    def testConnectSuccess(self):
+        def connected(_):
+            self.assertEqual(len(self.vishnu.adminHeaven.avatars),
+                             1)
+            return m.removeManager(str(self.connectionInfo))
+
+        m = multi.MultiAdminModel()
+        d = m.addManager(self.connectionInfo)
+        d.addCallback(connected)
+        return d
+
+    def testConnectFailure(self):
+        def connected(_):
+            self.fail('should not have connected')
+
+        def failure(f):
+            # ok!
+            self.assertEqual(len(self.vishnu.adminHeaven.avatars), 0)
+            self.assertEqual(m.admins, {})
+            self.assertEqual(m._reconnectHandlerIds, {})
+
+        m = multi.MultiAdminModel()
+        i = connection.PBConnectionInfo(self.connectionInfo.host,
+                                        self.connectionInfo.port,
+                                        self.connectionInfo.use_ssl,
+                                        pb.Authenticator(username='user',
+                                                         password='pest'))
+        d = m.addManager(i)
+        d.addCallbacks(connected, failure)
+        return d
+
+    def testReconnect(self):
+        class Listener:
+            def __init__(self):
+                self.disconnectDeferred = defer.Deferred()
+                self.reconnectDeferred = defer.Deferred()
+                
+            def model_addPlanet(self, admin, planet):
+                self.reconnectDeferred.callback(admin)
+                self.reconnectDeferred = None
+
+            def model_removePlanet(self, admin, planet):
+                self.disconnectDeferred.callback(admin)
+                self.disconnectDeferred = None
+        Listener = Listener()
+
+        def connected(_):
+            self.assertEqual(len(self.vishnu.adminHeaven.avatars),
+                             1)
+            a = m.admins[str(self.connectionInfo)]
+
+            m.addListener(Listener)
+
+            a.clientFactory.disconnect()
+
+            return Listener.disconnectDeferred
+
+        def disconnected(_):
+            return Listener.reconnectDeferred
+
+        def reconnected(_):
+            m.removeListener(Listener)
+            return m.removeManager(str(self.connectionInfo))
+
+        m = multi.MultiAdminModel()
+        d = m.addManager(self.connectionInfo)
+        d.addCallback(connected)
+        d.addCallback(disconnected)
+        d.addCallback(reconnected)
+        return d
+


More information about the flumotion-commit mailing list