msmith - in flumotion/trunk: . flumotion/job flumotion/worker
flumotion-commit at lists.fluendo.com
flumotion-commit at lists.fluendo.com
Tue Jun 19 18:20:03 CEST 2007
Author: msmith
Date: Tue Jun 19 18:19:58 2007
New Revision: 5207
Modified:
flumotion/trunk/ChangeLog
flumotion/trunk/flumotion/job/job.py
flumotion/trunk/flumotion/worker/job.py
flumotion/trunk/flumotion/worker/worker.py
Log:
* flumotion/job/job.py:
* flumotion/worker/job.py:
* flumotion/worker/worker.py:
Make the worker track jobs by PID, not avatarId, because avatarId
isn't unique when restarting components.
Fixes #683.
Modified: flumotion/trunk/ChangeLog
==============================================================================
--- flumotion/trunk/ChangeLog (original)
+++ flumotion/trunk/ChangeLog Tue Jun 19 18:19:58 2007
@@ -1,3 +1,12 @@
+2007-06-19 Michael Smith <msmith at fluendo.com>
+
+ * flumotion/job/job.py:
+ * flumotion/worker/job.py:
+ * flumotion/worker/worker.py:
+ Make the worker track jobs by PID, not avatarId, because avatarId
+ isn't unique when restarting components.
+ Fixes #683.
+
2007-06-19 Andy Wingo <wingo at pobox.com>
* configure.ac:
Modified: flumotion/trunk/flumotion/job/job.py
==============================================================================
--- flumotion/trunk/flumotion/job/job.py (original)
+++ flumotion/trunk/flumotion/job/job.py Tue Jun 19 18:19:58 2007
@@ -120,6 +120,9 @@
self.log('... from path %s' % path)
packager.registerPackagePath(path, name)
+ def remote_getPid(self):
+ return os.getpid()
+
def remote_create(self, avatarId, type, moduleName, methodName, nice=0):
"""
I am called on by the worker's JobAvatar to create a component.
Modified: flumotion/trunk/flumotion/worker/job.py
==============================================================================
--- flumotion/trunk/flumotion/worker/job.py (original)
+++ flumotion/trunk/flumotion/worker/job.py Tue Jun 19 18:19:58 2007
@@ -105,7 +105,7 @@
# otherwise the manager still thinks it's starting up when it's
# dead. If the job already attached to the worker however,
# the create deferred will already have callbacked.
- if dstarts.createRegistered(self.avatarId):
+ if dstarts.createRegistered(self.pid):
if signum:
reason = "received signal %d" % signum
else:
@@ -113,13 +113,13 @@
text = "Component '%s' has exited early (%s). " \
"This is sometimes triggered by a corrupt " \
"GStreamer registry." % (self.avatarId, reason)
- dstarts.createFailed(self.avatarId,
+ dstarts.createFailed(self.pid,
errors.ComponentCreateError(text))
- if dstarts.shutdownRegistered(self.avatarId):
- dstarts.shutdownTrigger(self.avatarId)
+ if dstarts.shutdownRegistered(self.pid):
+ dstarts.shutdownTrigger(self.pid)
- heaven.jobStopped(self.avatarId)
+ heaven.jobStopped(self.pid)
# chain up
worker.ProcessProtocol.processEnded(self, status)
@@ -129,12 +129,12 @@
self._getAvatars = getAvatars # function of no arguments,
# returns {avatarId=>avatar}
- self._createDeferreds = {} # avatarId => deferred that will fire
+ self._createDeferreds = {} # processId => deferred that will fire
# when the job attaches
- self._shutdownDeferreds = {} # avatarId => deferred for shutting
+ self._shutdownDeferreds = {} # processId => deferred for shutting
# down jobs; fires when job is reaped
- def create(self, avatarId):
+ def create(self, avatarId, processId):
"""
Create and register a deferred for creating the given component.
This deferred will be fired when the JobAvatar has instructed the
@@ -150,18 +150,18 @@
# three places: the create deferreds hash, the avatar list in
# the jobheaven, and the shutdown deferreds hash. there are four
# possible answers:
- if avatarId in self._createDeferreds:
+ if processId in self._createDeferreds:
# (1) a job is already starting: it is in the
# createdeferreds hash
- self.info('already have a create deferred for %s', avatarId)
+ self.info('already have a create deferred for %d', processId)
raise errors.ComponentAlreadyStartingError(avatarId)
- elif avatarId in self._shutdownDeferreds:
+ elif processId in self._shutdownDeferreds:
# (2) a job is shutting down; note it is also in
# heaven.avatars
self.debug('waiting for previous %s to shut down like it '
'said it would', avatarId)
def ensureShutdown(res,
- shutdown=self._shutdownDeferreds[avatarId]):
+ shutdown=self._shutdownDeferreds[processId]):
shutdown.addCallback(lambda _: res)
return shutdown
d.addCallback(ensureShutdown)
@@ -174,45 +174,43 @@
pass
self.debug('registering deferredCreate for %s', avatarId)
- self._createDeferreds[avatarId] = d
+ self._createDeferreds[processId] = d
return d
- def createTrigger(self, avatarId):
+ def createTrigger(self, avatarId, processId):
"""
Trigger a previously registered deferred for creating up the given
component.
"""
self.debug('triggering create deferred for %s', avatarId)
- if not avatarId in self._createDeferreds:
- self.warning('No create deferred registered for %s', avatarId)
+ if not processId in self._createDeferreds:
+ self.warning('No create deferred registered for %d', processId)
return
- d = self._createDeferreds[avatarId]
- del self._createDeferreds[avatarId]
+ d = self._createDeferreds.pop(processId)
# return the avatarId the component will use to the original caller
d.callback(avatarId)
- def createFailed(self, avatarId, exception):
+ def createFailed(self, processId, exception):
"""
Notify the caller that a create has failed, and remove the create
from the list of pending creates.
"""
- self.debug('create deferred failed for %s', avatarId)
- if not avatarId in self._createDeferreds:
- self.warning('No create deferred registered for %s', avatarId)
+ self.debug('create deferred failed for %d', processId)
+ if not processId in self._createDeferreds:
+ self.warning('No create deferred registered for %d', processId)
return
- d = self._createDeferreds[avatarId]
- del self._createDeferreds[avatarId]
+ d = self._createDeferreds.pop(processId)
d.errback(exception)
- def createRegistered(self, avatarId):
+ def createRegistered(self, processId):
"""
- Check if a deferred create has been registered for the given avatarId.
+ Check if a deferred create has been registered for the given processId
"""
- return avatarId in self._createDeferreds
+ return processId in self._createDeferreds
- def shutdown(self, avatarId):
+ def shutdown(self, processId):
"""
Create and register a deferred for notifying the worker of a
clean job shutdown. This deferred will be fired when the job is
@@ -220,36 +218,35 @@
@rtype: L{twisted.internet.defer.Deferred}
"""
- self.debug('making shutdown deferred for %s', avatarId)
+ self.debug('making shutdown deferred for %d', processId)
- if avatarId in self._shutdownDeferreds:
- self.warning('already have a shutdown deferred for %s',
- avatarId)
- return self._shutdownDeferreds[avatarId]
+ if processId in self._shutdownDeferreds:
+ self.warning('already have a shutdown deferred for %d',
+ processId)
+ return self._shutdownDeferreds[processId]
else:
- self.debug('registering shutdown for %s', avatarId)
+ self.debug('registering shutdown for %d', processId)
d = defer.Deferred()
- self._shutdownDeferreds[avatarId] = d
+ self._shutdownDeferreds[processId] = d
return d
- def shutdownTrigger(self, avatarId):
+ def shutdownTrigger(self, processId):
"""
- Trigger a previously registered deferred for creating up the given
- component.
+ Trigger a previously registered deferred for shutting down a process.
"""
- self.debug('triggering shutdown deferred for %s', avatarId)
- if not avatarId in self._shutdownDeferreds:
- self.warning('No shutdown deferred registered for %s', avatarId)
+ self.debug('triggering shutdown deferred for %d', processId)
+ if not processId in self._shutdownDeferreds:
+ self.warning('No shutdown deferred registered for %d', processId)
return
- d = self._shutdownDeferreds.pop(avatarId)
- d.callback(avatarId)
+ d = self._shutdownDeferreds.pop(processId)
+ d.callback(processId)
- def shutdownRegistered(self, avatarId):
+ def shutdownRegistered(self, processId):
"""
- Check if a deferred shutdown has been registered for the given avatarId.
+ Check if a deferred shutdown has been registered for the given processId
"""
- return avatarId in self._shutdownDeferreds
+ return processId in self._shutdownDeferreds
class JobHeaven(pb.Root, log.Loggable):
"""
@@ -279,7 +276,7 @@
self._onShutdown = None # If set, a deferred to fire when our last child
# process exits
- self._jobInfos = {} # avatarId -> JobInfo
+ self._jobInfos = {} # processid -> JobInfo
self._deferredStarts = DeferredStartSet(lambda: self.avatars)
@@ -311,6 +308,13 @@
else:
raise NotImplementedError("no interface")
+ def removeAvatar(self, avatarId):
+ if avatarId in self.avatars:
+ del self.avatars[avatarId]
+ else:
+ self.warning("some programmer is telling me about an avatar "
+ "I have no idea about: %s", avatarId)
+
def getManagerConnectionInfo(self):
"""
Gets the L{flumotion.common.connection.PBConnectionInfo}
@@ -351,9 +355,6 @@
component
@type bundles: list of (str, str)
"""
- # can raise ComponentAlreadyStartingError
- d = self._deferredStarts.create(avatarId)
-
p = JobProcessProtocol(self, avatarId, self._deferredStarts)
executable = os.path.join(os.path.dirname(sys.argv[0]), 'flumotion-job')
if not os.path.exists(executable):
@@ -389,17 +390,18 @@
jobInfo = JobInfo(process.pid, avatarId, type, moduleName,
methodName, nice, bundles)
- self._jobInfos[avatarId] = jobInfo
+ self._jobInfos[process.pid] = jobInfo
+ d = self._deferredStarts.create(avatarId, process.pid)
return d
- def getJobInfo(self, avatarId):
- return self._jobInfos[avatarId]
-
+ def getJobInfo(self, processId):
+ return self._jobInfos[processId]
+
def getJobInfos(self):
return self._jobInfos.values()
-
- def getJobAvatarIds(self):
+
+ def getJobPids(self):
return self._jobInfos.keys()
def rotateChildLogFDs(self):
@@ -407,23 +409,17 @@
for avatar in self.avatars.values():
avatar.logTo(sys.stdout.fileno(), sys.stderr.fileno())
- def jobStopped(self, avatarId):
- if avatarId in self.avatars:
- del self.avatars[avatarId]
- else:
- self.warning("some programmer is telling me about an avatar "
- "I have no idea about: %s", avatarId)
-
- if avatarId in self._jobInfos:
- self.debug('Removing job info for %s', avatarId)
- del self._jobInfos[avatarId]
+ def jobStopped(self, pid):
+ if pid in self._jobInfos:
+ self.debug('Removing job info for %d', pid)
+ del self._jobInfos[pid]
if not self._jobInfos and self._onShutdown:
self.debug("Last child exited")
self._onShutdown.callback(None)
else:
- self.warning("some programmer is telling me about an avatar "
- "I have no idea about: %s", avatarId)
+ self.warning("some programmer is telling me about a pid "
+ "I have no idea about: %d", pid)
def shutdown(self):
self.debug('Shutting down JobHeaven')
@@ -460,17 +456,23 @@
def kill(self, signum=signal.SIGKILL):
self.warning("Killing all children immediately")
- for avatarId in self.getJobAvatarIds():
- self.killJob(avatarId, signum)
+ for pid in self.getJobPids():
+ self.killJobByPid(pid, signum)
- def killJob(self, avatarId, signum):
- if avatarId not in self._jobInfos:
- raise errors.UnknownComponentError(avatarId)
- jobInfo = self._jobInfos[avatarId]
+ def killJobByPid(self, pid, signum):
+ if pid not in self._jobInfos:
+ raise errors.UnknownComponentError(pid)
+
+ jobInfo = self._jobInfos[pid]
self.debug("Sending signal %d to job %s at pid %d", signum,
- avatarId, jobInfo.pid)
+ jobInfo.avatarId, jobInfo.pid)
common.signalPid(jobInfo.pid, signum)
+ def killJob(self, avatarId, signum):
+ for job in self.jobInfos.values():
+ if job.avatarId == avatarId:
+ self.killJobByPid(self, job.pid, signum)
+
class JobAvatar(fpb.Avatar, log.Loggable):
"""
I am an avatar for the job living in the worker.
@@ -485,6 +487,7 @@
fpb.Avatar.__init__(self, avatarId)
self._heaven = heaven
self.setMind(mind)
+ self.pid = None
def setMind(self, mind):
"""
@@ -503,13 +506,13 @@
job.moduleName, job.methodName,
job.nice)
- def success(_, avatarId):
+ def success(_, avatarId, pid):
self.debug('job started component with avatarId %s',
avatarId)
# FIXME: drills down too much?
- self._heaven._deferredStarts.createTrigger(avatarId)
+ self._heaven._deferredStarts.createTrigger(avatarId, pid)
- def error(failure, job):
+ def error(failure, job, pid):
msg = log.getFailureMessage(failure)
if failure.check(errors.ComponentCreateError):
self.warning('could not create component %s of type %s:'
@@ -518,28 +521,33 @@
self.warning('unhandled error creating component %s: %s',
job.avatarId, msg)
# FIXME: drills down too much?
- self._heaven._deferredStarts.createFailed(job.avatarId,
+ self._heaven._deferredStarts.createFailed(pid,
failure)
-
- info = self._heaven.getManagerConnectionInfo()
- if info.use_ssl:
- transport = 'ssl'
- else:
- transport = 'tcp'
- workerName = self._heaven.getWorkerName()
- job = self._heaven.getJobInfo(self.avatarId)
-
- d = bootstrap(workerName, info.host, info.port, transport,
- info.authenticator, job.bundles)
- d.addCallback(create, job)
- d.addCallback(success, job.avatarId)
- d.addErrback(error, job)
+
+ def gotPid(pid):
+ self.pid = pid
+ info = self._heaven.getManagerConnectionInfo()
+ if info.use_ssl:
+ transport = 'ssl'
+ else:
+ transport = 'tcp'
+ job = self._heaven.getJobInfo(pid)
+ workerName = self._heaven.getWorkerName()
+
+ d = bootstrap(workerName, info.host, info.port, transport,
+ info.authenticator, job.bundles)
+ d.addCallback(create, job)
+ d.addCallback(success, job.avatarId, pid)
+ d.addErrback(error, job, pid)
+ return d
+ d = self.mindCallRemote("getPid")
+ d.addCallback(gotPid)
return d
def logout(self):
self.log('logout called, %s disconnected', self.avatarId)
- # don't need to do anything much here, we get notified via
- # sigchld
+
+ self._heaven.removeAvatar(self.avatarId)
def stop(self):
"""
@@ -621,4 +629,4 @@
"""
self.info("component %s shutting down cleanly", self.avatarId)
# FIXME: drills down too much?
- self._heaven._deferredStarts.shutdown(self.avatarId)
+ self._heaven._deferredStarts.shutdown(self.pid)
Modified: flumotion/trunk/flumotion/worker/worker.py
==============================================================================
--- flumotion/trunk/flumotion/worker/worker.py (original)
+++ flumotion/trunk/flumotion/worker/worker.py Tue Jun 19 18:19:58 2007
@@ -280,7 +280,8 @@
__import__(moduleName)
def getComponents(self):
- return self.jobHeaven.getJobAvatarIds()
+ return [job.avatarId for job in self.jobHeaven.getJobInfos()]
def killJob(self, avatarId, signum):
self.jobHeaven.killJob(avatarId, signum)
+
More information about the flumotion-commit
mailing list