zaheer - in flumotion/branches/platform-3-a3: .
flumotion/component/base
flumotion-commit at lists.fluendo.com
flumotion-commit at lists.fluendo.com
Thu Jun 7 19:20:17 CEST 2007
Author: zaheer
Date: Thu Jun 7 19:20:14 2007
New Revision: 5131
Added:
flumotion/branches/platform-3-a3/flumotion/component/base/scheduler.py
flumotion/branches/platform-3-a3/flumotion/component/base/watcher.py
Modified:
flumotion/branches/platform-3-a3/ChangeLog
flumotion/branches/platform-3-a3/flumotion/component/base/Makefile.am
flumotion/branches/platform-3-a3/flumotion/component/base/base.xml
Log:
* flumotion/component/base/Makefile.am:
* flumotion/component/base/base.xml:
* flumotion/component/base/scheduler.py (LocalTimezone,
LocalTimezone.utcoffset, LocalTimezone.dst, LocalTimezone.tzname,
LocalTimezone._isdst, now, Event, Event.__init__, Event.reschedule,
Event.toTuple, Event.__lt__, Event.__gt__, Event.__eq__, Scheduler,
Scheduler.__init__, Scheduler.addEvent, Scheduler.removeEvent,
Scheduler.getCurrentEvents, Scheduler.addEvents,
Scheduler.replaceEvents, Scheduler.subscribe, Scheduler.unsubscribe,
Scheduler._eventStarted, Scheduler._eventStopped,
Scheduler._reschedule, Scheduler._getNextStart,
Scheduler._getNextStop, Scheduler.doStart, Scheduler.doStop,
Scheduler.toSeconds, ICalScheduler, ICalScheduler.__init__,
ICalScheduler.parseCalendarFromFile, ICalScheduler.fileChanged,
ICalScheduler.parseCalendar):
* flumotion/component/base/watcher.py (BaseWatcher,
BaseWatcher.__init__, BaseWatcher._reset, BaseWatcher._subscribe,
BaseWatcher.subscribe, BaseWatcher.unsubscribe, BaseWatcher.event,
BaseWatcher.start, BaseWatcher.checkFiles, BaseWatcher.stop,
BaseWatcher.getFileData, BaseWatcher.getFilesToStat,
DirectoryWatcher, DirectoryWatcher.__init__,
DirectoryWatcher.getFilesToStat, FilesWatcher,
FilesWatcher.__init__):
Copied from trunk, scheduler and watcher code.
Modified: flumotion/branches/platform-3-a3/ChangeLog
==============================================================================
--- flumotion/branches/platform-3-a3/ChangeLog (original)
+++ flumotion/branches/platform-3-a3/ChangeLog Thu Jun 7 19:20:14 2007
@@ -1,3 +1,30 @@
+2007-06-07 Zaheer Abbas Merali <zaheerabbas at merali dot org>
+
+ * flumotion/component/base/Makefile.am:
+ * flumotion/component/base/base.xml:
+ * flumotion/component/base/scheduler.py (LocalTimezone,
+ LocalTimezone.utcoffset, LocalTimezone.dst, LocalTimezone.tzname,
+ LocalTimezone._isdst, now, Event, Event.__init__, Event.reschedule,
+ Event.toTuple, Event.__lt__, Event.__gt__, Event.__eq__, Scheduler,
+ Scheduler.__init__, Scheduler.addEvent, Scheduler.removeEvent,
+ Scheduler.getCurrentEvents, Scheduler.addEvents,
+ Scheduler.replaceEvents, Scheduler.subscribe, Scheduler.unsubscribe,
+ Scheduler._eventStarted, Scheduler._eventStopped,
+ Scheduler._reschedule, Scheduler._getNextStart,
+ Scheduler._getNextStop, Scheduler.doStart, Scheduler.doStop,
+ Scheduler.toSeconds, ICalScheduler, ICalScheduler.__init__,
+ ICalScheduler.parseCalendarFromFile, ICalScheduler.fileChanged,
+ ICalScheduler.parseCalendar):
+ * flumotion/component/base/watcher.py (BaseWatcher,
+ BaseWatcher.__init__, BaseWatcher._reset, BaseWatcher._subscribe,
+ BaseWatcher.subscribe, BaseWatcher.unsubscribe, BaseWatcher.event,
+ BaseWatcher.start, BaseWatcher.checkFiles, BaseWatcher.stop,
+ BaseWatcher.getFileData, BaseWatcher.getFilesToStat,
+ DirectoryWatcher, DirectoryWatcher.__init__,
+ DirectoryWatcher.getFilesToStat, FilesWatcher,
+ FilesWatcher.__init__):
+ Copied from trunk, scheduler and watcher code.
+
2007-06-06 Zaheer Abbas Merali <zaheerabbas at merali dot org>
* TODO:
Modified: flumotion/branches/platform-3-a3/flumotion/component/base/Makefile.am
==============================================================================
--- flumotion/branches/platform-3-a3/flumotion/component/base/Makefile.am (original)
+++ flumotion/branches/platform-3-a3/flumotion/component/base/Makefile.am Thu Jun 7 19:20:14 2007
@@ -1,6 +1,12 @@
include $(top_srcdir)/common/python.mk
-
-component_PYTHON = __init__.py admin_gtk.py admin_text.py http.py
+
+component_PYTHON = __init__.py \
+ admin_gtk.py \
+ admin_text.py \
+ http.py \
+ scheduler.py \
+ watcher.py
+
componentdir = $(libdir)/flumotion/python/flumotion/component/base
component_DATA = base.xml feeders.glade eaters.glade
Modified: flumotion/branches/platform-3-a3/flumotion/component/base/base.xml
==============================================================================
--- flumotion/branches/platform-3-a3/flumotion/component/base/base.xml (original)
+++ flumotion/branches/platform-3-a3/flumotion/component/base/base.xml Thu Jun 7 19:20:14 2007
@@ -35,5 +35,26 @@
</directory>
</directories>
</bundle>
+ <bundle name="base-scheduler">
+ <dependencies>
+ <dependency name="flumotion" />
+ <dependency name="base-watcher" />
+ </dependencies>
+ <directories>
+ <directory name="flumotion/component/base">
+ <filename location="scheduler.py" />
+ </directory>
+ </directories>
+ </bundle>
+ <bundle name="base-watcher">
+ <dependencies>
+ <dependency name="flumotion" />
+ </dependencies>
+ <directories>
+ <directory name="flumotion/component/base">
+ <filename location="watcher.py" />
+ </directory>
+ </directories>
+ </bundle>
</bundles>
</registry>
Added: flumotion/branches/platform-3-a3/flumotion/component/base/scheduler.py
==============================================================================
--- (empty file)
+++ flumotion/branches/platform-3-a3/flumotion/component/base/scheduler.py Thu Jun 7 19:20:14 2007
@@ -0,0 +1,367 @@
+# -*- Mode: Python -*-
+# vi:si:et:sw=4:sts=4:ts=4
+#
+# Flumotion - a streaming media server
+# Copyright (C) 2006,2007 Fluendo, S.L. (www.fluendo.com).
+# All rights reserved.
+
+# This file may be distributed and/or modified under the terms of
+# the GNU General Public License version 2 as published by
+# the Free Software Foundation.
+# This file is distributed without any warranty; without even the implied
+# warranty of merchantability or fitness for a particular purpose.
+# See "LICENSE.GPL" in the source distribution for more information.
+
+# Licensees having purchased or holding a valid Flumotion Advanced
+# Streaming Server license may use this file in accordance with the
+# Flumotion Advanced Streaming Server Commercial License Agreement.
+# See "LICENSE.Flumotion" in the source distribution for more information.
+
+# Headers in this file shall remain intact.
+
+
+import time
+from datetime import datetime, timedelta, tzinfo
+
+from twisted.internet import reactor
+
+from flumotion.common import log, avltree
+from flumotion.component.base import watcher
+
+
+# A class capturing the platform's idea of local time, from the
+# documentation of datetime.tzinfo.
+class LocalTimezone(tzinfo):
+ STDOFFSET = timedelta(seconds=-time.timezone)
+ if time.daylight:
+ DSTOFFSET = timedelta(seconds=-time.altzone)
+ else:
+ DSTOFFSET = STDOFFSET
+ DSTDIFF = DSTOFFSET - STDOFFSET
+ ZERO = timedelta(0)
+
+ def utcoffset(self, dt):
+ if self._isdst(dt):
+ return self.DSTOFFSET
+ else:
+ return self.STDOFFSET
+
+ def dst(self, dt):
+ if self._isdst(dt):
+ return self.DSTDIFF
+ else:
+ return self.ZERO
+
+ def tzname(self, dt):
+ return time.tzname[self._isdst(dt)]
+
+ def _isdst(self, dt):
+ tt = (dt.year, dt.month, dt.day,
+ dt.hour, dt.minute, dt.second,
+ dt.weekday(), 0, -1)
+ return time.localtime(time.mktime(tt)).tm_isdst > 0
+LOCAL = LocalTimezone()
+
+
+def now(tz=LOCAL):
+ return datetime.now(tz)
+
+
+class Event(log.Loggable):
+ """
+ I am an event. I have a start and stop time and a "content" that can
+ be anything. I can recur.
+ """
+
+ def __init__(self, start, end, content, recur=None, now=None):
+ self.debug('new event, content=%r, start=%r, end=%r', content,
+ start, end)
+
+ if recur:
+ from dateutil import rrule
+ startRecurRule = rrule.rrulestr(recur, dtstart=start)
+ endRecurRule = rrule.rrulestr(recur, dtstart=end)
+ if now is None:
+ now = datetime.now(LOCAL)
+ if end < now:
+ end = endRecurRule.after(now)
+ start = startRecurRule.before(end)
+ self.debug("adjusting start and end times to %r, %r",
+ start, end)
+
+ if not start.tzinfo:
+ self.info('event starting at %r does not have timezone '
+ 'info; using local time zone', start)
+ start = start.replace(tzinfo=LOCAL)
+ if not end.tzinfo:
+ self.info('event ending at %r does not have timezone '
+ 'info; using local time zone', end)
+ end = end.replace(tzinfo=LOCAL)
+
+ self.start = start
+ self.end = end
+ self.content = content
+ self.recur = recur
+
+ def reschedule(self, now=None):
+ if self.recur:
+ return Event(self.start, self.end, self.content, self.recur,
+ now)
+ else:
+ return None
+
+ def toTuple(self):
+ return self.start, self.end, self.content, self.recur
+
+ def __lt__(self, other):
+ return self.toTuple() < other.toTuple()
+
+ def __gt__(self, other):
+ return self.toTuple() > other.toTuple()
+
+ def __eq__(self, other):
+ return self.toTuple() == other.toTuple()
+
+
+class Scheduler(log.Loggable):
+ """
+ I keep track of upcoming events.
+
+ I can provide notifications when events stop and start, and maintain
+ a set of current events.
+ """
+
+ def __init__(self):
+ self.current = []
+ self._delayedCall = None
+ self._subscribeId = 0
+ self.subscribers = {}
+ self.replaceEvents([])
+
+ def addEvent(self, start, end, content, recur=None, now=None):
+ """Add a new event to the scheduler.
+
+ @param start: wall-clock time of event start
+ @type start: datetime
+ @param end: wall-clock time of event end
+ @type end: datetime
+ @param content: content of this event
+ @type content: str
+ @param recur: recurrence rule
+ @type recur: str
+
+ @returns: an Event that can later be passed to removeEvent, if
+ so desired. The event will be removed or rescheduled
+ automatically when it stops.
+ """
+ if now is None:
+ now = datetime.now(LOCAL)
+ event = Event(start, end, content, recur, now)
+ if event.end < now:
+ self.warning('attempted to schedule event in the past: %r',
+ event)
+ else:
+ self.events.insert(event)
+ if event.start < now:
+ self._eventStarted(event)
+ self._reschedule()
+ return event
+
+ def removeEvent(self, event):
+ """Remove an event from the scheduler.
+
+ @param event: an event, as returned from addEvent()
+ @type event: Event
+ """
+ currentEvent = event.reschedule() or event
+ self.events.delete(currentEvent)
+ if currentEvent in self.current:
+ self._eventStopped(currentEvent)
+ self._reschedule()
+
+ def getCurrentEvents(self):
+ return [e.content for e in self.current]
+
+ def addEvents(self, events):
+ """
+ Add a new list of events to the schedule.
+
+ @param events: the new events
+ @type events: a new set of events
+ """
+ now = datetime.now()
+ for event in events:
+ if event.end > now:
+ self.events.insert(event)
+ if event.start < now:
+ self._eventStarted(event)
+ if events:
+ self._reschedule()
+
+ def replaceEvents(self, events):
+ """Replace the set of events in the scheduler.
+
+ This function is different than simply removing all events then
+ adding new ones, because it tries to avoid spurious
+ stopped/start notifications.
+
+ @param events: the new events
+ @type events: a sequence of Event
+ """
+ now = datetime.now(LOCAL)
+ self.events = avltree.AVLTree(events)
+ current = []
+ for event in self.events:
+ if now < event.start:
+ break
+ elif event.end < now:
+ # yay functional trees: we don't modify the iterator
+ self.events.delete(event)
+ else:
+ current.append(event)
+ for event in self.current[:]:
+ if event not in current:
+ self._eventStopped(event)
+ for event in current:
+ if event not in self.current:
+ self._eventStarted(event)
+ assert self.current == current
+ self._reschedule()
+
+ def subscribe(self, eventStarted, eventStopped):
+ """Subscribe to event happenings in the scheduler.
+
+ @param eventStarted: Function that will be called when an event
+ starts.
+ @type eventStarted: Event -> None
+ @param eventStopped: Function that will be called when an event
+ stops.
+ @type eventStopped: Event -> None
+
+ @returns: A subscription ID that can later be passed to
+ unsubscribe().
+ """
+ sid = self._subscribeId
+ self._subscribeId += 1
+ self.subscribers[sid] = (eventStarted, eventStopped)
+ return sid
+
+ def unsubscribe(self, id):
+ """Unsubscribe from event happenings in the scheduler.
+
+ @param id: Subscription ID received from subscribe()
+ """
+ del self.subscribers[id]
+
+ def _eventStarted(self, event):
+ self.current.append(event)
+ for started, _ in self.subscribers.values():
+ started(event.content)
+
+ def _eventStopped(self, event):
+ self.current.remove(event)
+ for _, stopped in self.subscribers.values():
+ stopped(event.content)
+
+ def _reschedule(self):
+ def _getNextStart():
+ for event in self.events:
+ if event not in self.current:
+ return event
+ return None
+
+ def _getNextStop():
+ t = None
+ e = None
+ for event in self.current:
+ if not t or event.end < t:
+ t = event.end
+ e = event
+ return e
+
+ def doStart(e):
+ self._eventStarted(e)
+ self._reschedule()
+
+ def doStop(e):
+ self._eventStopped(e)
+ self.events.delete(e)
+ new = e.reschedule()
+ if new:
+ self.events.insert(new)
+ self._reschedule()
+
+ if self._delayedCall:
+ if self._delayedCall.active():
+ self._delayedCall.cancel()
+ self._delayedCall = None
+
+ start = _getNextStart()
+ stop = _getNextStop()
+ now = datetime.now(LOCAL)
+
+ def toSeconds(td):
+ return max(td.days*24*3600 + td.seconds + td.microseconds/1e6, 0)
+
+ if start and (not stop or start.start < stop.end):
+ dc = reactor.callLater(toSeconds(start.start - now),
+ doStart, start)
+ elif stop:
+ dc = reactor.callLater(toSeconds(stop.end - now),
+ doStop, stop)
+ else:
+ dc = None
+
+ self._delayedCall = dc
+
+
+class ICalScheduler(Scheduler):
+ """
+ I am a scheduler that takes its data from an ical file.
+ """
+
+ def __init__(self, fileObj):
+ from icalendar import Calendar
+
+ Scheduler.__init__(self)
+
+ def parseCalendarFromFile(f):
+ cal = Calendar.from_string(f.read())
+ events = self.parseCalendar(cal)
+ self.replaceEvents(events)
+ parseCalendarFromFile(fileObj)
+
+ if hasattr(fileObj, 'name'):
+ def fileChanged(f):
+ parseCalendarFromFile(open(f,'r'))
+ self.watcher = watcher.FilesWatcher([fileObj.name])
+ self.watcher.subscribe(fileChanged=fileChanged)
+ self.watcher.start()
+
+ def parseCalendar(self, cal):
+ """
+ Take a Calendar object and return a list of
+ Event objects.
+
+ @param cal: The calendar to "parse"
+ @type cal: icalendar.Calendar
+ @rtype List of {flumotion.component.base.scheduler.Event}
+ """
+ events = []
+ for event in cal.walk('vevent'):
+ start = event.decoded('dtstart', None)
+ end = event.decoded('dtend', None)
+ summary = event.decoded('summary', None)
+ recur = event.get('rrule', None)
+ if start and end:
+ self.debug("start %r tzname %s end %r recur %r", start,
+ start.tzname(), end, recur)
+ if recur:
+ e = Event(start, end, summary, recur.ical())
+ else:
+ e = Event(start, end, summary)
+ events.append(e)
+ else:
+ self.warning('ical has event without start or end: '
+ '%r', event)
+ return events
Added: flumotion/branches/platform-3-a3/flumotion/component/base/watcher.py
==============================================================================
--- (empty file)
+++ flumotion/branches/platform-3-a3/flumotion/component/base/watcher.py Thu Jun 7 19:20:14 2007
@@ -0,0 +1,199 @@
+# -*- Mode: Python -*-
+# vi:si:et:sw=4:sts=4:ts=4
+#
+# Flumotion - a streaming media server
+# Copyright (C) 2006,2007 Fluendo, S.L. (www.fluendo.com).
+# All rights reserved.
+
+# This file may be distributed and/or modified under the terms of
+# the GNU General Public License version 2 as published by
+# the Free Software Foundation.
+# This file is distributed without any warranty; without even the implied
+# warranty of merchantability or fitness for a particular purpose.
+# See "LICENSE.GPL" in the source distribution for more information.
+
+# Licensees having purchased or holding a valid Flumotion Advanced
+# Streaming Server license may use this file in accordance with the
+# Flumotion Advanced Streaming Server Commercial License Agreement.
+# See "LICENSE.Flumotion" in the source distribution for more information.
+
+# Headers in this file shall remain intact.
+
+
+import os
+
+from twisted.internet import reactor
+
+from flumotion.common import log
+
+class BaseWatcher(log.Loggable):
+ """I watch for file changes.
+
+ I am a base class for a file watcher. I can be specialized to watch
+ any set of files.
+ """
+
+ def __init__(self, timeout):
+ """Make a file watcher object.
+
+ @param timeout: timeout between checks, in seconds
+ @type timeout: int
+ """
+ self.timeout = timeout
+ self._reset()
+ self._subscribeId = 0
+ self.subscribers = {}
+
+ def _reset(self):
+ self._stableData = {}
+ self._changingData = {}
+ self._delayedCall = None
+
+ def _subscribe(self, **events):
+ """Subscribe to events.
+
+ @param kwargs: The events to subscribe to. Subclasses are
+ expected to formalize this dict, specifying which events they
+ support via declaring their kwargs explicitly.
+
+ @returns: A subscription ID that can later be passed to
+ unsubscribe().
+ """
+ sid = self._subscribeId
+ self._subscribeId += 1
+ self.subscribers[sid] = events
+ return sid
+
+ def subscribe(self, fileChanged=None, fileDeleted=None):
+ """Subscribe to events.
+
+ @param fileChanged: A function to call when a file changes. This
+ function will only be called if the file's details (size, mtime)
+ do not change during the timeout period.
+ @type fileChanged: filename -> None
+ @param fileDeleted: A function to call when a file is deleted.
+ @type fileDeleted: filename -> None
+
+ @returns: A subscription ID that can later be passed to
+ unsubscribe().
+ """
+ return self._subscribe(fileChanged=fileChanged,
+ fileDeleted=fileDeleted)
+
+ def unsubscribe(self, id):
+ """Unsubscribe from file change notifications.
+
+ @param id: Subscription ID received from subscribe()
+ """
+ del self.subscribers[id]
+
+ def event(self, event, *args, **kwargs):
+ """Fire an event.
+
+ This method is intended for use by object implementations.
+ """
+ for s in self.subscribers.values():
+ if s[event]:
+ s[event](*args, **kwargs)
+
+ # FIXME: this API has tripped up two people thus far, including its
+ # author. make subscribe() call start() if necessary?
+ def start(self):
+ """Start checking for file changes.
+
+ Subscribers will be notified asynchronously of changes to the
+ watched files.
+ """
+ def checkFiles():
+ self.log("checking for file changes")
+ new = self.getFileData()
+ changing = self._changingData
+ stable = self._stableData
+ for f in new:
+ if f not in changing:
+ if f in stable and new[f] == stable[f]:
+ # no change
+ pass
+ else:
+ self.debug('change start noted for %s', f)
+ changing[f] = new[f]
+ else:
+ if new[f] == changing[f]:
+ self.debug('change finished for %s', f)
+ del changing[f]
+ stable[f] = new[f]
+ self.event('fileChanged', f)
+ else:
+ self.log('change continues for %s', f)
+ changing[f] = new[f]
+ for f in stable.keys():
+ if f not in new:
+ # deletion
+ del stable[f]
+ self.debug('file %s has been deleted', f)
+ self.event('fileDeleted', f)
+ for f in changing.keys():
+ if f not in new:
+ self.debug('file %s has been deleted', f)
+ del changing[f]
+ self._delayedCall = reactor.callLater(self.timeout,
+ checkFiles)
+
+ assert self._delayedCall is None
+ checkFiles()
+
+ def stop(self):
+ """Stop checking for file changes.
+ """
+ self._delayedCall.cancel()
+ self._reset()
+
+ def getFileData(self):
+ """
+ @returns: a dict, {filename => DATA}
+ DATA can be anything. In the default implementation it is a pair
+ of (mtime, size).
+ """
+ ret = {}
+ for f in self.getFilesToStat():
+ try:
+ stat = os.stat(f)
+ ret[f] = (stat.st_mtime, stat.st_size)
+ except OSError, e:
+ self.debug('could not read file %f: %s', f,
+ log.getExceptionMessage(e))
+ return ret
+
+ def getFilesToStat(self):
+ """
+ @returns: sequence of filename
+ """
+ raise NotImplementedError
+
+class DirectoryWatcher(BaseWatcher):
+ """
+ Directory Watcher
+ Watches a directory for new files.
+ """
+
+ def __init__(self, path, ignorefiles=(), timeout=30):
+ BaseWatcher.__init__(self, timeout)
+ self.path = path
+ self._ignorefiles = ignorefiles
+
+ def getFilesToStat(self):
+ return [os.path.join(self.path, f)
+ for f in os.listdir(self.path)
+ if f not in self._ignorefiles]
+
+class FilesWatcher(BaseWatcher):
+ """
+ Watches a collection of files for modifications.
+ """
+
+ def __init__(self, files, timeout=30):
+ BaseWatcher.__init__(self, timeout)
+ self._files = files
+
+ def getFilesToStat(self):
+ return self._files
More information about the flumotion-commit
mailing list