zaheer - in flumotion/branches/platform-3-a3: . flumotion/component/base

flumotion-commit at lists.fluendo.com flumotion-commit at lists.fluendo.com
Thu Jun 7 19:20:17 CEST 2007


Author: zaheer
Date: Thu Jun  7 19:20:14 2007
New Revision: 5131

Added:
   flumotion/branches/platform-3-a3/flumotion/component/base/scheduler.py
   flumotion/branches/platform-3-a3/flumotion/component/base/watcher.py
Modified:
   flumotion/branches/platform-3-a3/ChangeLog
   flumotion/branches/platform-3-a3/flumotion/component/base/Makefile.am
   flumotion/branches/platform-3-a3/flumotion/component/base/base.xml
Log:
	* flumotion/component/base/Makefile.am:
	* flumotion/component/base/base.xml:
	* flumotion/component/base/scheduler.py (LocalTimezone,
	  LocalTimezone.utcoffset, LocalTimezone.dst, LocalTimezone.tzname,
	  LocalTimezone._isdst, now, Event, Event.__init__, Event.reschedule,
	  Event.toTuple, Event.__lt__, Event.__gt__, Event.__eq__, Scheduler,
	  Scheduler.__init__, Scheduler.addEvent, Scheduler.removeEvent,
	  Scheduler.getCurrentEvents, Scheduler.addEvents,
	  Scheduler.replaceEvents, Scheduler.subscribe, Scheduler.unsubscribe,
	  Scheduler._eventStarted, Scheduler._eventStopped,
	  Scheduler._reschedule, Scheduler._getNextStart,
	  Scheduler._getNextStop, Scheduler.doStart, Scheduler.doStop,
	  Scheduler.toSeconds, ICalScheduler, ICalScheduler.__init__,
	  ICalScheduler.parseCalendarFromFile, ICalScheduler.fileChanged,
	  ICalScheduler.parseCalendar):
	* flumotion/component/base/watcher.py (BaseWatcher,
	  BaseWatcher.__init__, BaseWatcher._reset, BaseWatcher._subscribe,
	  BaseWatcher.subscribe, BaseWatcher.unsubscribe, BaseWatcher.event,
	  BaseWatcher.start, BaseWatcher.checkFiles, BaseWatcher.stop,
	  BaseWatcher.getFileData, BaseWatcher.getFilesToStat,
	  DirectoryWatcher, DirectoryWatcher.__init__,
	  DirectoryWatcher.getFilesToStat, FilesWatcher,
	  FilesWatcher.__init__):
	Copied from trunk, scheduler and watcher code.



Modified: flumotion/branches/platform-3-a3/ChangeLog
==============================================================================
--- flumotion/branches/platform-3-a3/ChangeLog	(original)
+++ flumotion/branches/platform-3-a3/ChangeLog	Thu Jun  7 19:20:14 2007
@@ -1,3 +1,30 @@
+2007-06-07  Zaheer Abbas Merali  <zaheerabbas at merali dot org>
+
+	* flumotion/component/base/Makefile.am:
+	* flumotion/component/base/base.xml:
+	* flumotion/component/base/scheduler.py (LocalTimezone,
+	  LocalTimezone.utcoffset, LocalTimezone.dst, LocalTimezone.tzname,
+	  LocalTimezone._isdst, now, Event, Event.__init__, Event.reschedule,
+	  Event.toTuple, Event.__lt__, Event.__gt__, Event.__eq__, Scheduler,
+	  Scheduler.__init__, Scheduler.addEvent, Scheduler.removeEvent,
+	  Scheduler.getCurrentEvents, Scheduler.addEvents,
+	  Scheduler.replaceEvents, Scheduler.subscribe, Scheduler.unsubscribe,
+	  Scheduler._eventStarted, Scheduler._eventStopped,
+	  Scheduler._reschedule, Scheduler._getNextStart,
+	  Scheduler._getNextStop, Scheduler.doStart, Scheduler.doStop,
+	  Scheduler.toSeconds, ICalScheduler, ICalScheduler.__init__,
+	  ICalScheduler.parseCalendarFromFile, ICalScheduler.fileChanged,
+	  ICalScheduler.parseCalendar):
+	* flumotion/component/base/watcher.py (BaseWatcher,
+	  BaseWatcher.__init__, BaseWatcher._reset, BaseWatcher._subscribe,
+	  BaseWatcher.subscribe, BaseWatcher.unsubscribe, BaseWatcher.event,
+	  BaseWatcher.start, BaseWatcher.checkFiles, BaseWatcher.stop,
+	  BaseWatcher.getFileData, BaseWatcher.getFilesToStat,
+	  DirectoryWatcher, DirectoryWatcher.__init__,
+	  DirectoryWatcher.getFilesToStat, FilesWatcher,
+	  FilesWatcher.__init__):
+	Copied from trunk, scheduler and watcher code.
+
 2007-06-06  Zaheer Abbas Merali  <zaheerabbas at merali dot org>
 
 	* TODO:

Modified: flumotion/branches/platform-3-a3/flumotion/component/base/Makefile.am
==============================================================================
--- flumotion/branches/platform-3-a3/flumotion/component/base/Makefile.am	(original)
+++ flumotion/branches/platform-3-a3/flumotion/component/base/Makefile.am	Thu Jun  7 19:20:14 2007
@@ -1,6 +1,12 @@
 include $(top_srcdir)/common/python.mk
-                                                                                
-component_PYTHON = __init__.py admin_gtk.py admin_text.py http.py
+
+component_PYTHON = __init__.py \
+	admin_gtk.py \
+	admin_text.py \
+	http.py \
+	scheduler.py \
+	watcher.py
+
 componentdir = $(libdir)/flumotion/python/flumotion/component/base
 component_DATA = base.xml feeders.glade eaters.glade
 

Modified: flumotion/branches/platform-3-a3/flumotion/component/base/base.xml
==============================================================================
--- flumotion/branches/platform-3-a3/flumotion/component/base/base.xml	(original)
+++ flumotion/branches/platform-3-a3/flumotion/component/base/base.xml	Thu Jun  7 19:20:14 2007
@@ -35,5 +35,26 @@
         </directory>
       </directories>
     </bundle>
+    <bundle name="base-scheduler">
+      <dependencies>
+        <dependency name="flumotion" />
+        <dependency name="base-watcher" />
+      </dependencies>
+      <directories>
+        <directory name="flumotion/component/base">
+          <filename location="scheduler.py" />
+        </directory>
+      </directories>
+    </bundle>
+    <bundle name="base-watcher">
+      <dependencies>
+        <dependency name="flumotion" />
+      </dependencies>
+      <directories>
+        <directory name="flumotion/component/base">
+          <filename location="watcher.py" />
+        </directory>
+      </directories>
+    </bundle>
   </bundles>
 </registry> 

Added: flumotion/branches/platform-3-a3/flumotion/component/base/scheduler.py
==============================================================================
--- (empty file)
+++ flumotion/branches/platform-3-a3/flumotion/component/base/scheduler.py	Thu Jun  7 19:20:14 2007
@@ -0,0 +1,367 @@
+# -*- Mode: Python -*-
+# vi:si:et:sw=4:sts=4:ts=4
+#
+# Flumotion - a streaming media server
+# Copyright (C) 2006,2007 Fluendo, S.L. (www.fluendo.com).
+# All rights reserved.
+
+# This file may be distributed and/or modified under the terms of
+# the GNU General Public License version 2 as published by
+# the Free Software Foundation.
+# This file is distributed without any warranty; without even the implied
+# warranty of merchantability or fitness for a particular purpose.
+# See "LICENSE.GPL" in the source distribution for more information.
+
+# Licensees having purchased or holding a valid Flumotion Advanced
+# Streaming Server license may use this file in accordance with the
+# Flumotion Advanced Streaming Server Commercial License Agreement.
+# See "LICENSE.Flumotion" in the source distribution for more information.
+
+# Headers in this file shall remain intact.
+
+
+import time
+from datetime import datetime, timedelta, tzinfo
+
+from twisted.internet import reactor
+
+from flumotion.common import log, avltree
+from flumotion.component.base import watcher
+
+
+# A class capturing the platform's idea of local time, from the
+# documentation of datetime.tzinfo.
+class LocalTimezone(tzinfo):
+    STDOFFSET = timedelta(seconds=-time.timezone)
+    if time.daylight:
+        DSTOFFSET = timedelta(seconds=-time.altzone)
+    else:
+        DSTOFFSET = STDOFFSET
+    DSTDIFF = DSTOFFSET - STDOFFSET
+    ZERO = timedelta(0)
+
+    def utcoffset(self, dt):
+        if self._isdst(dt):
+            return self.DSTOFFSET
+        else:
+            return self.STDOFFSET
+
+    def dst(self, dt):
+        if self._isdst(dt):
+            return self.DSTDIFF
+        else:
+            return self.ZERO
+
+    def tzname(self, dt):
+        return time.tzname[self._isdst(dt)]
+
+    def _isdst(self, dt):
+        tt = (dt.year, dt.month, dt.day,
+              dt.hour, dt.minute, dt.second,
+              dt.weekday(), 0, -1)
+        return time.localtime(time.mktime(tt)).tm_isdst > 0
+LOCAL = LocalTimezone()
+
+
+def now(tz=LOCAL):
+    return datetime.now(tz)
+
+
+class Event(log.Loggable):
+    """
+    I am an event. I have a start and stop time and a "content" that can
+    be anything. I can recur.
+    """
+
+    def __init__(self, start, end, content, recur=None, now=None):
+        self.debug('new event, content=%r, start=%r, end=%r', content,
+                   start, end)
+
+        if recur:
+            from dateutil import rrule
+            startRecurRule = rrule.rrulestr(recur, dtstart=start)
+            endRecurRule = rrule.rrulestr(recur, dtstart=end) 
+            if now is None:
+                now = datetime.now(LOCAL)
+            if end < now:
+                end = endRecurRule.after(now)
+                start = startRecurRule.before(end)
+                self.debug("adjusting start and end times to %r, %r",
+                           start, end)
+
+        if not start.tzinfo:
+            self.info('event starting at %r does not have timezone '
+                      'info; using local time zone', start)
+            start = start.replace(tzinfo=LOCAL)
+        if not end.tzinfo:
+            self.info('event ending at %r does not have timezone '
+                      'info; using local time zone', end)
+            end = end.replace(tzinfo=LOCAL)
+
+        self.start = start
+        self.end = end
+        self.content = content
+        self.recur = recur
+
+    def reschedule(self, now=None):
+        if self.recur:
+            return Event(self.start, self.end, self.content, self.recur,
+                         now)
+        else:
+            return None
+
+    def toTuple(self):
+        return self.start, self.end, self.content, self.recur
+
+    def __lt__(self, other):
+        return self.toTuple() < other.toTuple()
+
+    def __gt__(self, other):
+        return self.toTuple() > other.toTuple()
+
+    def __eq__(self, other):
+        return self.toTuple() == other.toTuple()
+
+
+class Scheduler(log.Loggable):
+    """
+    I keep track of upcoming events.
+    
+    I can provide notifications when events stop and start, and maintain
+    a set of current events.
+    """
+
+    def __init__(self):
+        self.current = []
+        self._delayedCall = None
+        self._subscribeId = 0
+        self.subscribers = {}
+        self.replaceEvents([])
+
+    def addEvent(self, start, end, content, recur=None, now=None):
+        """Add a new event to the scheduler.
+
+        @param start: wall-clock time of event start
+        @type  start: datetime
+        @param   end: wall-clock time of event end
+        @type    end: datetime
+        @param content: content of this event
+        @type  content: str
+        @param recur: recurrence rule
+        @type  recur: str
+
+        @returns: an Event that can later be passed to removeEvent, if
+        so desired. The event will be removed or rescheduled
+        automatically when it stops.
+        """
+        if now is None:
+            now = datetime.now(LOCAL)
+        event = Event(start, end, content, recur, now)
+        if event.end < now:
+            self.warning('attempted to schedule event in the past: %r',
+                         event)
+        else:
+            self.events.insert(event)
+            if event.start < now:
+                self._eventStarted(event)
+        self._reschedule()
+        return event
+
+    def removeEvent(self, event):
+        """Remove an event from the scheduler.
+
+        @param event: an event, as returned from addEvent()
+        @type  event: Event
+        """
+        currentEvent = event.reschedule() or event
+        self.events.delete(currentEvent)
+        if currentEvent in self.current:
+            self._eventStopped(currentEvent)
+        self._reschedule()
+
+    def getCurrentEvents(self):
+        return [e.content for e in self.current]
+
+    def addEvents(self, events):
+        """
+        Add a new list of events to the schedule.
+
+        @param events: the new events
+        @type  events: a new set of events
+        """
+        now = datetime.now()
+        for event in events:
+            if event.end > now:
+                self.events.insert(event)
+                if event.start < now:
+                    self._eventStarted(event)
+        if events:
+            self._reschedule()
+
+    def replaceEvents(self, events):
+        """Replace the set of events in the scheduler.
+
+        This function is different than simply removing all events then
+        adding new ones, because it tries to avoid spurious
+        stopped/start notifications.
+
+        @param events: the new events
+        @type  events: a sequence of Event
+        """
+        now = datetime.now(LOCAL)
+        self.events = avltree.AVLTree(events)
+        current = []
+        for event in self.events:
+            if now < event.start:
+                break
+            elif event.end < now:
+                # yay functional trees: we don't modify the iterator
+                self.events.delete(event)
+            else:
+                current.append(event)
+        for event in self.current[:]:
+            if event not in current:
+                self._eventStopped(event)
+        for event in current:
+            if event not in self.current:
+                self._eventStarted(event)
+        assert self.current == current
+        self._reschedule()
+        
+    def subscribe(self, eventStarted, eventStopped):
+        """Subscribe to event happenings in the scheduler.
+
+        @param eventStarted: Function that will be called when an event
+        starts.
+        @type eventStarted: Event -> None
+        @param eventStopped: Function that will be called when an event
+        stops.
+        @type eventStopped: Event -> None
+
+        @returns: A subscription ID that can later be passed to
+        unsubscribe().
+        """
+        sid = self._subscribeId
+        self._subscribeId += 1
+        self.subscribers[sid] = (eventStarted, eventStopped)
+        return sid
+
+    def unsubscribe(self, id):
+        """Unsubscribe from event happenings in the scheduler.
+
+        @param id: Subscription ID received from subscribe()
+        """
+        del self.subscribers[id]
+
+    def _eventStarted(self, event):
+        self.current.append(event)
+        for started, _ in self.subscribers.values():
+            started(event.content)
+
+    def _eventStopped(self, event):
+        self.current.remove(event)
+        for _, stopped in self.subscribers.values():
+            stopped(event.content)
+
+    def _reschedule(self):
+        def _getNextStart():
+            for event in self.events:
+                if event not in self.current:
+                    return event
+            return None
+
+        def _getNextStop():
+            t = None
+            e = None
+            for event in self.current:
+                if not t or event.end < t:
+                    t = event.end
+                    e = event
+            return e
+
+        def doStart(e):
+            self._eventStarted(e)
+            self._reschedule()
+            
+        def doStop(e):
+            self._eventStopped(e)
+            self.events.delete(e)
+            new = e.reschedule()
+            if new:
+                self.events.insert(new)
+            self._reschedule()
+            
+        if self._delayedCall:
+            if self._delayedCall.active():
+                self._delayedCall.cancel()
+            self._delayedCall = None
+
+        start = _getNextStart()
+        stop = _getNextStop()
+        now = datetime.now(LOCAL)
+
+        def toSeconds(td):
+            return max(td.days*24*3600 + td.seconds + td.microseconds/1e6, 0)
+
+        if start and (not stop or start.start < stop.end):
+            dc = reactor.callLater(toSeconds(start.start - now),
+                                   doStart, start)
+        elif stop:
+            dc = reactor.callLater(toSeconds(stop.end - now),
+                                   doStop, stop)
+        else:
+            dc = None
+
+        self._delayedCall = dc
+
+
+class ICalScheduler(Scheduler):
+    """
+    I am a scheduler that takes its data from an ical file.
+    """
+
+    def __init__(self, fileObj):
+        from icalendar import Calendar
+
+        Scheduler.__init__(self)
+
+        def parseCalendarFromFile(f):
+            cal = Calendar.from_string(f.read())
+            events = self.parseCalendar(cal)
+            self.replaceEvents(events)
+        parseCalendarFromFile(fileObj)
+
+        if hasattr(fileObj, 'name'):
+            def fileChanged(f):
+                parseCalendarFromFile(open(f,'r'))
+            self.watcher = watcher.FilesWatcher([fileObj.name])
+            self.watcher.subscribe(fileChanged=fileChanged)
+            self.watcher.start()
+
+    def parseCalendar(self, cal):
+        """
+        Take a Calendar object and return a list of
+        Event objects.
+
+        @param cal: The calendar to "parse"
+        @type  cal: icalendar.Calendar
+        @rtype List of {flumotion.component.base.scheduler.Event}
+        """
+        events = []
+        for event in cal.walk('vevent'):
+            start = event.decoded('dtstart', None)
+            end = event.decoded('dtend', None)
+            summary = event.decoded('summary', None)
+            recur = event.get('rrule', None)
+            if start and end:
+                self.debug("start %r tzname %s end %r recur %r", start, 
+                    start.tzname(), end, recur)
+                if recur:
+                    e = Event(start, end, summary, recur.ical())
+                else:
+                    e = Event(start, end, summary)
+                events.append(e)
+            else:
+                self.warning('ical has event without start or end: '
+                             '%r', event)
+        return events

Added: flumotion/branches/platform-3-a3/flumotion/component/base/watcher.py
==============================================================================
--- (empty file)
+++ flumotion/branches/platform-3-a3/flumotion/component/base/watcher.py	Thu Jun  7 19:20:14 2007
@@ -0,0 +1,199 @@
+# -*- Mode: Python -*-
+# vi:si:et:sw=4:sts=4:ts=4
+#
+# Flumotion - a streaming media server
+# Copyright (C) 2006,2007 Fluendo, S.L. (www.fluendo.com).
+# All rights reserved.
+
+# This file may be distributed and/or modified under the terms of
+# the GNU General Public License version 2 as published by
+# the Free Software Foundation.
+# This file is distributed without any warranty; without even the implied
+# warranty of merchantability or fitness for a particular purpose.
+# See "LICENSE.GPL" in the source distribution for more information.
+
+# Licensees having purchased or holding a valid Flumotion Advanced
+# Streaming Server license may use this file in accordance with the
+# Flumotion Advanced Streaming Server Commercial License Agreement.
+# See "LICENSE.Flumotion" in the source distribution for more information.
+
+# Headers in this file shall remain intact.
+
+
+import os
+
+from twisted.internet import reactor
+
+from flumotion.common import log
+
+class BaseWatcher(log.Loggable):
+    """I watch for file changes.
+
+    I am a base class for a file watcher. I can be specialized to watch
+    any set of files.
+    """
+
+    def __init__(self, timeout):
+        """Make a file watcher object.
+        
+        @param timeout: timeout between checks, in seconds
+        @type timeout: int
+        """
+        self.timeout = timeout
+        self._reset()
+        self._subscribeId = 0
+        self.subscribers = {}
+
+    def _reset(self):
+        self._stableData = {}
+        self._changingData = {}
+        self._delayedCall = None
+
+    def _subscribe(self, **events):
+        """Subscribe to events.
+
+        @param kwargs: The events to subscribe to. Subclasses are
+        expected to formalize this dict, specifying which events they
+        support via declaring their kwargs explicitly.
+
+        @returns: A subscription ID that can later be passed to
+        unsubscribe().
+        """
+        sid = self._subscribeId
+        self._subscribeId += 1
+        self.subscribers[sid] = events
+        return sid
+
+    def subscribe(self, fileChanged=None, fileDeleted=None):
+        """Subscribe to events.
+
+        @param fileChanged: A function to call when a file changes. This
+        function will only be called if the file's details (size, mtime)
+        do not change during the timeout period.
+        @type fileChanged: filename -> None
+        @param fileDeleted: A function to call when a file is deleted.
+        @type fileDeleted: filename -> None
+        
+        @returns: A subscription ID that can later be passed to
+        unsubscribe().
+        """
+        return self._subscribe(fileChanged=fileChanged,
+                               fileDeleted=fileDeleted)
+
+    def unsubscribe(self, id):
+        """Unsubscribe from file change notifications.
+
+        @param id: Subscription ID received from subscribe()
+        """
+        del self.subscribers[id]
+
+    def event(self, event, *args, **kwargs):
+        """Fire an event.
+
+        This method is intended for use by object implementations.
+        """
+        for s in self.subscribers.values():
+            if s[event]:
+                s[event](*args, **kwargs)
+
+    # FIXME: this API has tripped up two people thus far, including its
+    # author. make subscribe() call start() if necessary?
+    def start(self):
+        """Start checking for file changes.
+        
+        Subscribers will be notified asynchronously of changes to the
+        watched files.
+        """
+        def checkFiles():
+            self.log("checking for file changes")
+            new = self.getFileData()
+            changing = self._changingData
+            stable = self._stableData
+            for f in new:
+                if f not in changing:
+                    if f in stable and new[f] == stable[f]:
+                        # no change
+                        pass
+                    else:
+                        self.debug('change start noted for %s', f)
+                        changing[f] = new[f]
+                else:
+                    if new[f] == changing[f]:
+                        self.debug('change finished for %s', f)
+                        del changing[f]
+                        stable[f] = new[f]
+                        self.event('fileChanged', f)
+                    else:
+                        self.log('change continues for %s', f)
+                        changing[f] = new[f]
+            for f in stable.keys():
+                if f not in new:
+                    # deletion
+                    del stable[f]
+                    self.debug('file %s has been deleted', f)
+                    self.event('fileDeleted', f)
+            for f in changing.keys():
+                if f not in new:
+                    self.debug('file %s has been deleted', f)
+                    del changing[f]
+            self._delayedCall = reactor.callLater(self.timeout,
+                                                  checkFiles)
+
+        assert self._delayedCall is None
+        checkFiles()
+
+    def stop(self):
+        """Stop checking for file changes.
+        """
+        self._delayedCall.cancel()
+        self._reset()
+
+    def getFileData(self):
+        """
+        @returns: a dict, {filename => DATA}
+        DATA can be anything. In the default implementation it is a pair
+        of (mtime, size).
+        """
+        ret = {}
+        for f in self.getFilesToStat():
+            try:
+                stat = os.stat(f)
+                ret[f] = (stat.st_mtime, stat.st_size)
+            except OSError, e:
+                self.debug('could not read file %f: %s', f,
+                           log.getExceptionMessage(e))
+        return ret
+
+    def getFilesToStat(self):
+        """
+        @returns: sequence of filename
+        """
+        raise NotImplementedError
+
+class DirectoryWatcher(BaseWatcher):
+    """
+    Directory Watcher
+    Watches a directory for new files.
+    """
+    
+    def __init__(self, path, ignorefiles=(), timeout=30):
+        BaseWatcher.__init__(self, timeout)
+        self.path = path
+        self._ignorefiles = ignorefiles
+
+    def getFilesToStat(self):
+        return [os.path.join(self.path, f)
+                for f in os.listdir(self.path)
+                if f not in self._ignorefiles]
+        
+class FilesWatcher(BaseWatcher):
+    """
+    Watches a collection of files for modifications.
+    """
+
+    def __init__(self, files, timeout=30):
+        BaseWatcher.__init__(self, timeout)
+        self._files = files
+
+    def getFilesToStat(self):
+        return self._files


More information about the flumotion-commit mailing list