msmith - in flumotion/trunk: . flumotion/component flumotion/job

flumotion-commit at lists.fluendo.com flumotion-commit at lists.fluendo.com
Wed Feb 28 15:57:44 CET 2007


Author: msmith
Date: Wed Feb 28 15:57:41 2007
New Revision: 4536

Modified:
   flumotion/trunk/ChangeLog
   flumotion/trunk/flumotion/component/component.py
   flumotion/trunk/flumotion/job/job.py
Log:
        * flumotion/component/component.py:
        * flumotion/job/job.py:
          Rewrite component/job shutdown to have a clean shutdown path going
          only through the job.
          This path ensures that the reactor is only ever stopped once, which
          should fix a variety of shutdown races triggered by calling
          reactor.stop() from a reactor shutdown hook.



Modified: flumotion/trunk/ChangeLog
==============================================================================
--- flumotion/trunk/ChangeLog	(original)
+++ flumotion/trunk/ChangeLog	Wed Feb 28 15:57:41 2007
@@ -1,5 +1,15 @@
 2007-02-28  Michael Smith  <msmith at fluendo.com>
 
+	* flumotion/component/component.py:
+	* flumotion/job/job.py:
+	  Rewrite component/job shutdown to have a clean shutdown path going
+	  only through the job.
+	  Ensure through this path that we only ever stop the reactor once.
+	  Should fix a variety of shutdown races triggered by calling
+	  reactor.stop() from a reactor shutdown hook.
+
+2007-02-28  Michael Smith  <msmith at fluendo.com>
+
 	* flumotion/component/feedcomponent010.py:
 	  client-removed has an extra argument; make sure we have it.
 

Modified: flumotion/trunk/flumotion/component/component.py
==============================================================================
--- flumotion/trunk/flumotion/component/component.py	(original)
+++ flumotion/trunk/flumotion/component/component.py	Wed Feb 28 15:57:41 2007
@@ -149,8 +149,6 @@
         self.comp = component
         self.authenticator = None
 
-        self.reactor_stopped = False
-        
     ### our methods
     def setup(self, config):
         pass
@@ -234,22 +232,8 @@
         return self.comp.start(*args, **kwargs)
        
     def remote_stop(self):
-        self.info('Stopping job')
-        d = self.comp.stop()
-        # We want to stop the process even if the component stop fails for some
-        # reason - otherwise we end up unstoppable. 
-        d.addBoth(self._destroyCallback)
-
-        return d
-
-    def _destroyCallback(self, result):
-        self.debug('_destroyCallback: scheduling destroy')
-        reactor.callLater(0, self._destroyCallLater)
-
-    def _destroyCallLater(self):
-        self.debug('_destroyCalllater: stopping reactor')
-        self.reactor_stopped = True
-        reactor.stop()
+        self.info('Stopping component')
+        return self.comp.stop()
 
     def remote_reloadComponent(self):
         """Reload modules in the component."""
@@ -355,6 +339,8 @@
         # Start the cpu-usage updating.
         self._cpuCallLater = reactor.callLater(5, self._updateCPUUsage)
 
+        self._shutdownHook = None
+
     def do_check(self):
         """
         Subclasses can implement me to run any checks before the component
@@ -531,6 +517,13 @@
             self.debug("Exception during component do_start: %s" % 
                 log.getExceptionMessage(e))
             return defer.fail(e)
+
+    def setShutdownHook(self, shutdownHook):
+        """
+        Set the shutdown hook for this component (replacing any previous hook).
+        When a component is stopped, then this hook will be fired.
+        """
+        self._shutdownHook = shutdownHook
         
     def stop(self):
         """
@@ -547,6 +540,11 @@
                     plug.stop(self)
             return ret
 
+        def fireShutdownHook(ret):
+            if self._shutdownHook:
+                self.debug('_stoppedCallback: firing shutdown hook')
+                self._shutdownHook()
+
         self.setMood(moods.waking)
         for message in self.state.get('messages'):
             self.state.remove('messages', message)
@@ -557,6 +555,7 @@
 
         d = self.do_stop()
         d.addCallback(stop_plugs)
+        d.addBoth(fireShutdownHook)
         return d
 
     ### GObject methods

Modified: flumotion/trunk/flumotion/job/job.py
==============================================================================
--- flumotion/trunk/flumotion/job/job.py	(original)
+++ flumotion/trunk/flumotion/job/job.py	Wed Feb 28 15:57:41 2007
@@ -74,6 +74,8 @@
         self._managerKeycard = None
         self._componentClientFactory = None # from component to manager
 
+        self._hasStoppedReactor = False
+
     ### pb.Referenceable remote methods called on by the WorkerBrain
     def remote_bootstrap(self, workerName, host, port, transport, authenticator,
             packagePaths):
@@ -137,12 +139,19 @@
 
         self.component = self._createComponent(avatarId, type, moduleName,
             methodName, nice)
+        self.component.setShutdownHook(self._componentStopped)
 
-    def remote_stop(self):
-        # stop reactor from a callLater so this remote method finishes
-        # nicely
+    def _componentStopped(self):
+        # stop reactor from a callLater so remote methods finish nicely
         reactor.callLater(0, self.shutdown)
 
+    def remote_stop(self):
+        if self.component:
+            self.debug('stopping component and shutting down')
+            self.component.stop()
+        else:
+            reactor.callLater(0, self.shutdown)
+
     def shutdownHandler(self):
         dlist = []
         if self.hasRemoteReference():
@@ -162,18 +171,12 @@
         Shut down the job process completely, cleaning up the component
         so the reactor can be left from.
         """
-        reactorAlreadyStopped = False
-        if self.component:
-            self.debug('stopping component')
-            self.component.stop()
-            if self.component.medium and self.component.medium.reactor_stopped:
-                reactorAlreadyStopped = True
-            self.debug('stopped component')
-
-        if not reactorAlreadyStopped:
-            self.debug('stopping reactor')
+        if self._hasStoppedReactor:
+            self.debug("Not stopping reactor again, already shutting down")
+        else:
+            self._hasStoppedReactor = True
+            self.info("Stopping reactor in job process")
             reactor.stop()
-            self.debug('reactor stopped, exiting process')
 
     def _setNice(self, nice):
         if not nice:
@@ -310,7 +313,7 @@
 class JobClientFactory(pb.PBClientFactory, log.Loggable):
     """
     I am a client factory that logs in to the WorkerBrain.
-    I live in the flumotion-worker job process.
+    I live in the flumotion-job process spawned by the worker.
 
     @cvar medium: the medium for the JobHeaven to access us through
     @type medium: L{JobMedium}
@@ -365,3 +368,4 @@
         self.debug('shutting down medium')
         self.medium.shutdown()
         self.debug('shut down medium')
+


More information about the flumotion-commit mailing list