msmith - in flumotion/trunk: . flumotion/worker

flumotion-commit at lists.fluendo.com flumotion-commit at lists.fluendo.com
Thu Mar 8 16:05:22 CET 2007


Author: msmith
Date: Thu Mar  8 16:05:19 2007
New Revision: 4602

Modified:
   flumotion/trunk/ChangeLog
   flumotion/trunk/flumotion/worker/main.py
   flumotion/trunk/flumotion/worker/worker.py
Log:
        * flumotion/worker/worker.py:
          WorkerBrain now only binds sockets in listen(), not in __init__,
          which returns False if it fails.
          Make sure various functions don't fail if we didn't call listen()

        * flumotion/worker/main.py:
          Create WorkerBrain and call listen() on it before daemonizing.
          If listen() fails, print an error and exit; this is fatal.



Modified: flumotion/trunk/ChangeLog
==============================================================================
--- flumotion/trunk/ChangeLog	(original)
+++ flumotion/trunk/ChangeLog	Thu Mar  8 16:05:19 2007
@@ -1,3 +1,14 @@
+2007-03-08  Michael Smith  <msmith at fluendo.com>
+
+	* flumotion/worker/worker.py:
+	  WorkerBrain now only binds sockets in listen(), not in __init__,
+	  which returns False if it fails.
+	  Make sure various functions don't fail if we didn't call listen()
+
+	* flumotion/worker/main.py:
+	  Create WorkerBrain and call listen() on it before daemonizing.
+	  If listen() fails, print an error and exit; this is fatal.
+
 2007-03-07  Michael Smith  <msmith at fluendo.com>
 
 	* flumotion/worker/worker.py:

Modified: flumotion/trunk/flumotion/worker/main.py
==============================================================================
--- flumotion/trunk/flumotion/worker/main.py	(original)
+++ flumotion/trunk/flumotion/worker/main.py	Thu Mar  8 16:05:19 2007
@@ -232,6 +232,13 @@
             'ERROR: --service-name can only be used with -D/--daemonize.\n')
         return 1
 
+    brain = worker.WorkerBrain(options)
+
+    # Now bind and listen to our unix and tcp sockets
+    if not brain.listen():
+        sys.stderr.write('ERROR: Failed to listen on worker ports.\n')
+        return 1
+
     name = options.name
     if options.daemonize:
         if options.serviceName:
@@ -239,11 +246,6 @@
 
     common.startup("worker", name, options.daemonize, options.daemonizeTo)
 
-    # register all package paths (FIXME: this should go away when
-    # components come from manager)
-    from flumotion.common import setup
-    setup.setupPackagePath()
-
     log.debug('worker', 'Running Flumotion version %s' %
         configure.version)
     import twisted.copyright
@@ -252,9 +254,12 @@
     log.debug('worker', 'Running against GStreamer version %s' %
         configure.gst_version)
 
-    # create a brain and have it remember the manager to direct jobs to
-    brain = worker.WorkerBrain(options)
+    # register all package paths (FIXME: this should go away when
+    # components come from manager)
+    from flumotion.common import setup
+    setup.setupPackagePath()
 
+    # create a brain and have it remember the manager to direct jobs to
     # connect the brain to the manager
     if options.transport == "tcp":
         reactor.connectTCP(options.host, options.port,
@@ -278,8 +283,7 @@
     )
     brain.login(authenticator)
 
-    reactor.addSystemEventTrigger('before', 'shutdown',
-        brain.shutdownHandler)
+    reactor.addSystemEventTrigger('before', 'shutdown', brain.shutdownHandler)
 
     # go into the reactor main loop
     reactor.run()

Modified: flumotion/trunk/flumotion/worker/worker.py
==============================================================================
--- flumotion/trunk/flumotion/worker/worker.py	(original)
+++ flumotion/trunk/flumotion/worker/worker.py	Thu Mar  8 16:05:19 2007
@@ -35,7 +35,7 @@
 from twisted.internet import defer, reactor
 from twisted.spread import pb
 import twisted.cred.error
-import twisted.internet.error
+from twisted.internet import error
 
 from flumotion.common import errors, interfaces, log, bundleclient
 from flumotion.common import common, medium, messages, worker
@@ -166,8 +166,8 @@
         """
         Return the TCP port the Feed Server is listening on.
 
-        @rtype:  int
-        @return: TCP port number
+        @rtype:  int, or NoneType
+        @return: TCP port number, or None if there is no feed server
         """
         port = self.brain.feedServerPort
         return port
@@ -553,19 +553,40 @@
 
         self._port = None # port for unix domain socket, set from _setup
 
-        self._jobServerFactory, self._jobServerPort = self._setupJobServer()
+        self._jobServerFactory = None
+        self._jobServerPort = None
         self._feedServerFactory = feed.feedServerFactory(self)
 
-        # set up feed server if we have the feederports for it
         self._feedServerPort = None # twisted port
         self.feedServerPort = None # port number
-        self._setupFeedServer()
 
         self._createDeferreds = {} # avatarId => deferred that will fire
                                    # when the job attaches
         self._shutdownDeferreds = {} # avatarId => deferred for shutting
                                    # down jobs; fires when job is reaped
 
+    def listen(self):
+        """
+        Start listening on FeedServer (incoming eater requests) and 
+        JobServer (through which we communicate with our children) ports
+
+        @returns: True if we successfully listened on both ports
+        """
+        # set up feed server if we have the feederports for it
+        try:
+            self._setupFeedServer()
+        except error.CannotListenError, e:
+            self.warning("Failed to listen on feed server port: %r", e)
+            return False
+
+        try:
+            self._jobServerFactory, self._jobServerPort = self._setupJobServer()
+        except error.CannotListenError, e:
+            self.warning("Failed to listen on job server port: %r", e)
+            return False
+
+        return True
+
     def login(self, authenticator):
         self.authenticator = authenticator
         self.workerClientFactory.startLogin(authenticator)
@@ -574,7 +595,6 @@
         """
         @returns: (factory, port)
         """
-        # called from __init__
         dispatcher = JobDispatcher(self.jobHeaven)
         # FIXME: we should hand a username and password to log in with to
         # the job process instead of allowing anonymous
@@ -598,7 +618,6 @@
         """
         @returns: (port, portNumber)
         """
-        # called from __init__
         try:
             self.feedServerPort = self.options.feederports[-1]
         except IndexError:
@@ -620,9 +639,13 @@
                   the teardown is completed
         """
         self.debug("cleaning up port %r" % self._port)
-        d = self._jobServerPort.stopListening()
-        d.addCallback(lambda r: self._feedServerPort.stopListening())
-        return d
+        dl = []
+        if self._jobServerPort:
+            dl.append(self._jobServerPort.stopListening())
+        if self._feedServerPort:
+            dl.append(self._feedServerPort.stopListening())
+
+        return defer.DeferredList(dl)
 
     def callRemote(self, methodName, *args, **kwargs):
         return self.medium.callRemote(methodName, *args, **kwargs)


More information about the flumotion-commit mailing list