msmith - in flumotion/trunk: . flumotion/worker
flumotion-commit at lists.fluendo.com
flumotion-commit at lists.fluendo.com
Thu Mar 8 16:05:22 CET 2007
Author: msmith
Date: Thu Mar 8 16:05:19 2007
New Revision: 4602
Modified:
flumotion/trunk/ChangeLog
flumotion/trunk/flumotion/worker/main.py
flumotion/trunk/flumotion/worker/worker.py
Log:
* flumotion/worker/worker.py:
WorkerBrain now only binds sockets in listen(), not in __init__,
which returns False if it fails.
Make sure various functions don't fail if we didn't call listen()
* flumotion/worker/main.py:
Create WorkerBrain and call listen() on it before daemonizing.
If listen() fails, print an error and exit; this is fatal.
Modified: flumotion/trunk/ChangeLog
==============================================================================
--- flumotion/trunk/ChangeLog (original)
+++ flumotion/trunk/ChangeLog Thu Mar 8 16:05:19 2007
@@ -1,3 +1,14 @@
+2007-03-08 Michael Smith <msmith at fluendo.com>
+
+ * flumotion/worker/worker.py:
+ WorkerBrain now only binds sockets in listen(), not in __init__,
+ which returns False if it fails.
+ Make sure various functions don't fail if we didn't call listen()
+
+ * flumotion/worker/main.py:
+ Create WorkerBrain and call listen() on it before daemonizing.
+ If listen() fails, print an error and exit; this is fatal.
+
2007-03-07 Michael Smith <msmith at fluendo.com>
* flumotion/worker/worker.py:
Modified: flumotion/trunk/flumotion/worker/main.py
==============================================================================
--- flumotion/trunk/flumotion/worker/main.py (original)
+++ flumotion/trunk/flumotion/worker/main.py Thu Mar 8 16:05:19 2007
@@ -232,6 +232,13 @@
'ERROR: --service-name can only be used with -D/--daemonize.\n')
return 1
+ brain = worker.WorkerBrain(options)
+
+ # Now bind and listen to our unix and tcp sockets
+ if not brain.listen():
+ sys.stderr.write('ERROR: Failed to listen on worker ports.\n')
+ return 1
+
name = options.name
if options.daemonize:
if options.serviceName:
@@ -239,11 +246,6 @@
common.startup("worker", name, options.daemonize, options.daemonizeTo)
- # register all package paths (FIXME: this should go away when
- # components come from manager)
- from flumotion.common import setup
- setup.setupPackagePath()
-
log.debug('worker', 'Running Flumotion version %s' %
configure.version)
import twisted.copyright
@@ -252,9 +254,12 @@
log.debug('worker', 'Running against GStreamer version %s' %
configure.gst_version)
- # create a brain and have it remember the manager to direct jobs to
- brain = worker.WorkerBrain(options)
+ # register all package paths (FIXME: this should go away when
+ # components come from manager)
+ from flumotion.common import setup
+ setup.setupPackagePath()
+ # create a brain and have it remember the manager to direct jobs to
# connect the brain to the manager
if options.transport == "tcp":
reactor.connectTCP(options.host, options.port,
@@ -278,8 +283,7 @@
)
brain.login(authenticator)
- reactor.addSystemEventTrigger('before', 'shutdown',
- brain.shutdownHandler)
+ reactor.addSystemEventTrigger('before', 'shutdown', brain.shutdownHandler)
# go into the reactor main loop
reactor.run()
Modified: flumotion/trunk/flumotion/worker/worker.py
==============================================================================
--- flumotion/trunk/flumotion/worker/worker.py (original)
+++ flumotion/trunk/flumotion/worker/worker.py Thu Mar 8 16:05:19 2007
@@ -35,7 +35,7 @@
from twisted.internet import defer, reactor
from twisted.spread import pb
import twisted.cred.error
-import twisted.internet.error
+from twisted.internet import error
from flumotion.common import errors, interfaces, log, bundleclient
from flumotion.common import common, medium, messages, worker
@@ -166,8 +166,8 @@
"""
Return the TCP port the Feed Server is listening on.
- @rtype: int
- @return: TCP port number
+ @rtype: int, or NoneType
+ @return: TCP port number, or None if there is no feed server
"""
port = self.brain.feedServerPort
return port
@@ -553,19 +553,40 @@
self._port = None # port for unix domain socket, set from _setup
- self._jobServerFactory, self._jobServerPort = self._setupJobServer()
+ self._jobServerFactory = None
+ self._jobServerPort = None
self._feedServerFactory = feed.feedServerFactory(self)
- # set up feed server if we have the feederports for it
self._feedServerPort = None # twisted port
self.feedServerPort = None # port number
- self._setupFeedServer()
self._createDeferreds = {} # avatarId => deferred that will fire
# when the job attaches
self._shutdownDeferreds = {} # avatarId => deferred for shutting
# down jobs; fires when job is reaped
+ def listen(self):
+ """
+ Start listening on FeedServer (incoming eater requests) and
+ JobServer (through which we communicate with our children) ports
+
+ @returns: True if we successfully listened on both ports
+ """
+ # set up feed server if we have the feederports for it
+ try:
+ self._setupFeedServer()
+ except error.CannotListenError, e:
+ self.warning("Failed to listen on feed server port: %r", e)
+ return False
+
+ try:
+ self._jobServerFactory, self._jobServerPort = self._setupJobServer()
+ except error.CannotListenError, e:
+ self.warning("Failed to listen on job server port: %r", e)
+ return False
+
+ return True
+
def login(self, authenticator):
self.authenticator = authenticator
self.workerClientFactory.startLogin(authenticator)
@@ -574,7 +595,6 @@
"""
@returns: (factory, port)
"""
- # called from __init__
dispatcher = JobDispatcher(self.jobHeaven)
# FIXME: we should hand a username and password to log in with to
# the job process instead of allowing anonymous
@@ -598,7 +618,6 @@
"""
@returns: (port, portNumber)
"""
- # called from __init__
try:
self.feedServerPort = self.options.feederports[-1]
except IndexError:
@@ -620,9 +639,13 @@
the teardown is completed
"""
self.debug("cleaning up port %r" % self._port)
- d = self._jobServerPort.stopListening()
- d.addCallback(lambda r: self._feedServerPort.stopListening())
- return d
+ dl = []
+ if self._jobServerPort:
+ dl.append(self._jobServerPort.stopListening())
+ if self._feedServerPort:
+ dl.append(self._feedServerPort.stopListening())
+
+ return defer.DeferredList(dl)
def callRemote(self, methodName, *args, **kwargs):
return self.medium.callRemote(methodName, *args, **kwargs)
More information about the flumotion-commit
mailing list