msmith - in flumotion/trunk: . flumotion/component flumotion/job
flumotion-commit at lists.fluendo.com
flumotion-commit at lists.fluendo.com
Wed Feb 28 15:57:44 CET 2007
Author: msmith
Date: Wed Feb 28 15:57:41 2007
New Revision: 4536
Modified:
flumotion/trunk/ChangeLog
flumotion/trunk/flumotion/component/component.py
flumotion/trunk/flumotion/job/job.py
Log:
* flumotion/component/component.py:
* flumotion/job/job.py:
Rewrite component/job shutdown to have a clean shutdown path going
only through the job.
Ensure through this path that we only ever stop the reactor once.
Should fix a variety of shutdown races triggered by calling
reactor.stop() from a reactor shutdown hook.
Modified: flumotion/trunk/ChangeLog
==============================================================================
--- flumotion/trunk/ChangeLog (original)
+++ flumotion/trunk/ChangeLog Wed Feb 28 15:57:41 2007
@@ -1,5 +1,15 @@
2007-02-28 Michael Smith <msmith at fluendo.com>
+ * flumotion/component/component.py:
+ * flumotion/job/job.py:
+ Rewrite component/job shutdown to have a clean shutdown path going
+ only through the job.
+ Ensure through this path that we only ever stop the reactor once.
+ Should fix a variety of shutdown races triggered by calling
+ reactor.stop() from a reactor shutdown hook.
+
+2007-02-28 Michael Smith <msmith at fluendo.com>
+
* flumotion/component/feedcomponent010.py:
client-removed has an extra argument; make sure we have it.
Modified: flumotion/trunk/flumotion/component/component.py
==============================================================================
--- flumotion/trunk/flumotion/component/component.py (original)
+++ flumotion/trunk/flumotion/component/component.py Wed Feb 28 15:57:41 2007
@@ -149,8 +149,6 @@
self.comp = component
self.authenticator = None
- self.reactor_stopped = False
-
### our methods
def setup(self, config):
pass
@@ -234,22 +232,8 @@
return self.comp.start(*args, **kwargs)
def remote_stop(self):
- self.info('Stopping job')
- d = self.comp.stop()
- # We want to stop the process even if the component stop fails for some
- # reason - otherwise we end up unstoppable.
- d.addBoth(self._destroyCallback)
-
- return d
-
- def _destroyCallback(self, result):
- self.debug('_destroyCallback: scheduling destroy')
- reactor.callLater(0, self._destroyCallLater)
-
- def _destroyCallLater(self):
- self.debug('_destroyCalllater: stopping reactor')
- self.reactor_stopped = True
- reactor.stop()
+ self.info('Stopping component')
+ return self.comp.stop()
def remote_reloadComponent(self):
"""Reload modules in the component."""
@@ -355,6 +339,8 @@
# Start the cpu-usage updating.
self._cpuCallLater = reactor.callLater(5, self._updateCPUUsage)
+ self._shutdownHook = None
+
def do_check(self):
"""
Subclasses can implement me to run any checks before the component
@@ -531,6 +517,13 @@
self.debug("Exception during component do_start: %s" %
log.getExceptionMessage(e))
return defer.fail(e)
+
+ def setShutdownHook(self, shutdownHook):
+ """
+ Set the shutdown hook for this component (replacing any previous hook).
+ When a component is stopped, then this hook will be fired.
+ """
+ self._shutdownHook = shutdownHook
def stop(self):
"""
@@ -547,6 +540,11 @@
plug.stop(self)
return ret
+ def fireShutdownHook(ret):
+ if self._shutdownHook:
+ self.debug('_stoppedCallback: firing shutdown hook')
+ self._shutdownHook()
+
self.setMood(moods.waking)
for message in self.state.get('messages'):
self.state.remove('messages', message)
@@ -557,6 +555,7 @@
d = self.do_stop()
d.addCallback(stop_plugs)
+ d.addBoth(fireShutdownHook)
return d
### GObject methods
Modified: flumotion/trunk/flumotion/job/job.py
==============================================================================
--- flumotion/trunk/flumotion/job/job.py (original)
+++ flumotion/trunk/flumotion/job/job.py Wed Feb 28 15:57:41 2007
@@ -74,6 +74,8 @@
self._managerKeycard = None
self._componentClientFactory = None # from component to manager
+ self._hasStoppedReactor = False
+
### pb.Referenceable remote methods called on by the WorkerBrain
def remote_bootstrap(self, workerName, host, port, transport, authenticator,
packagePaths):
@@ -137,12 +139,19 @@
self.component = self._createComponent(avatarId, type, moduleName,
methodName, nice)
+ self.component.setShutdownHook(self._componentStopped)
- def remote_stop(self):
- # stop reactor from a callLater so this remote method finishes
- # nicely
+ def _componentStopped(self):
+ # stop reactor from a callLater so remote methods finish nicely
reactor.callLater(0, self.shutdown)
+ def remote_stop(self):
+ if self.component:
+ self.debug('stopping component and shutting down')
+ self.component.stop()
+ else:
+ reactor.callLater(0, self.shutdown)
+
def shutdownHandler(self):
dlist = []
if self.hasRemoteReference():
@@ -162,18 +171,12 @@
Shut down the job process completely, cleaning up the component
so the reactor can be left from.
"""
- reactorAlreadyStopped = False
- if self.component:
- self.debug('stopping component')
- self.component.stop()
- if self.component.medium and self.component.medium.reactor_stopped:
- reactorAlreadyStopped = True
- self.debug('stopped component')
-
- if not reactorAlreadyStopped:
- self.debug('stopping reactor')
+ if self._hasStoppedReactor:
+ self.debug("Not stopping reactor again, already shutting down")
+ else:
+ self._hasStoppedReactor = True
+ self.info("Stopping reactor in job process")
reactor.stop()
- self.debug('reactor stopped, exiting process')
def _setNice(self, nice):
if not nice:
@@ -310,7 +313,7 @@
class JobClientFactory(pb.PBClientFactory, log.Loggable):
"""
I am a client factory that logs in to the WorkerBrain.
- I live in the flumotion-worker job process.
+ I live in the flumotion-job process spawned by the worker.
@cvar medium: the medium for the JobHeaven to access us through
@type medium: L{JobMedium}
@@ -365,3 +368,4 @@
self.debug('shutting down medium')
self.medium.shutdown()
self.debug('shut down medium')
+
More information about the flumotion-commit
mailing list