msmith - in flumotion/branches/platform-3: . flumotion/component/producers/playlist

flumotion-commit at lists.fluendo.com flumotion-commit at lists.fluendo.com
Mon May 21 18:38:29 CEST 2007


Author: msmith
Date: Mon May 21 18:38:26 2007
New Revision: 4989

Modified:
   flumotion/branches/platform-3/ChangeLog
   flumotion/branches/platform-3/flumotion/component/producers/playlist/design
   flumotion/branches/platform-3/flumotion/component/producers/playlist/playlist.py
   flumotion/branches/platform-3/flumotion/component/producers/playlist/playlistparser.py
Log:
        * flumotion/component/producers/playlist/design:
        * flumotion/component/producers/playlist/playlist.py:
        * flumotion/component/producers/playlist/playlistparser.py:
          Merge recent changes to this component.



Modified: flumotion/branches/platform-3/ChangeLog
==============================================================================
--- flumotion/branches/platform-3/ChangeLog	(original)
+++ flumotion/branches/platform-3/ChangeLog	Mon May 21 18:38:26 2007
@@ -1,3 +1,10 @@
+2007-05-21  Michael Smith <msmith at fluendo.com>
+
+	* flumotion/component/producers/playlist/design:
+	* flumotion/component/producers/playlist/playlist.py:
+	* flumotion/component/producers/playlist/playlistparser.py:
+	  Merge recent changes to this component.
+
 2007-05-16  Thomas Vander Stichele  <thomas at apestaart dot org>
 
 	merged from: TRUNK, 4967

Modified: flumotion/branches/platform-3/flumotion/component/producers/playlist/design
==============================================================================
--- flumotion/branches/platform-3/flumotion/component/producers/playlist/design	(original)
+++ flumotion/branches/platform-3/flumotion/component/producers/playlist/design	Mon May 21 18:38:26 2007
@@ -1,3 +1,19 @@
+TODO:
+-----
+
+- Read all xml files in a directory on startup
+- Monitor the directory for new files. Read any new files that turn up
+- Fix bug with returning to scheduled stuff from default sources
+- write tests for scheduler
+- test platform-3 backport
+- test playlist updating
+- test with customer playlist files
+
+Later:
+- test with additional formats
+- clean up finished entries in compositions
+
+
 Low-level playlist operations:
 ------------------------------
 

Modified: flumotion/branches/platform-3/flumotion/component/producers/playlist/playlist.py
==============================================================================
--- flumotion/branches/platform-3/flumotion/component/producers/playlist/playlist.py	(original)
+++ flumotion/branches/platform-3/flumotion/component/producers/playlist/playlist.py	Mon May 21 18:38:26 2007
@@ -23,7 +23,7 @@
 import gobject
 import time
 
-from twisted.internet import defer
+from twisted.internet import defer, reactor
 
 from flumotion.common import errors, messages, log, fxml
 from flumotion.component import feedcomponent
@@ -89,7 +89,6 @@
 
     def init(self):
         self.basetime = -1
-        self.pipeline = None
 
         self._hasAudio = True
         self._hasVideo = True
@@ -101,6 +100,9 @@
         self.videocaps = gst.Caps("video/x-raw-yuv;video/x-raw-rgb")
         self.audiocaps = gst.Caps("audio/x-raw-int;audio/x-raw-float")
 
+        self._vsrcs = {} # { PlaylistItem -> gnlsource }
+        self._asrcs = {} # { PlaylistItem -> gnlsource }
+
     def _buildAudioPipeline(self, pipeline, queue):
         audiorate = gst.element_factory_make("audiorate")
         audioconvert = gst.element_factory_make('audioconvert')
@@ -215,6 +217,9 @@
         self.debug("Setting basetime of %d", self.basetime)
         pipeline.set_base_time(self.basetime)
 
+    def getCurrentPosition(self):
+        return self.pipeline.query_position(gst.FORMAT_TIME)[0]
+
     def scheduleItem(self, item):
         """
         Schedule a given playlist item in our playback compositions.
@@ -228,43 +233,66 @@
         # thus we're out of sync.
         # So, always start slightly in the future... 5 seconds seems to work
         # fine in practice.
-        now = self.pipeline.query_position(gst.FORMAT_TIME)[0] + 5 * gst.SECOND
+        now = self.getCurrentPosition()
+        neareststarttime = now + 5 * gst.SECOND
 
-        if start < now:
-            if start + item.duration < now:
+        if start < neareststarttime:
+            if start + item.duration < neareststarttime:
                 self.debug("Item too late; skipping entirely")
                 return
             else:
-                change = now - start
+                change = neareststarttime - start
                 self.debug("Starting item with offset %d", change)
                 item.duration -= change
                 item.offset += change
-                start = now
+                start = neareststarttime
+
+        end = start + item.duration
+        timeuntilend = end - now
+        # After the end time, remove this item from the composition, otherwise
+        # it will continue to use huge gobs of memory and lots of threads.
+        reactor.callLater(timeuntilend/gst.SECOND + 5, 
+            self.unscheduleItem, item)
 
         if self._hasVideo and item.hasVideo:
             self.debug("Adding video source with start %d, duration %d, "
                 "offset %d", start, item.duration, item.offset)
-            item.vsrc = file_gnl_src(None, item.uri, self.videocaps,
+            vsrc = file_gnl_src(None, item.uri, self.videocaps,
                 start, item.duration, item.offset, 0)
-            self.videocomp.add(item.vsrc)
-            self.debug("Added")
+            self.videocomp.add(vsrc)
+            self._vsrcs[item] = vsrc
         if self._hasAudio and item.hasAudio:
             self.debug("Adding audio source with start %d, duration %d, "
                 "offset %d", start, item.duration, item.offset)
-            item.asrc = file_gnl_src(None, item.uri, self.audiocaps,
+            asrc = file_gnl_src(None, item.uri, self.audiocaps,
                 start, item.duration, item.offset, 0)
-            self.audiocomp.add(item.asrc)
+            self.audiocomp.add(asrc)
+            self._asrcs[item] = asrc
         self.debug("Done scheduling")
 
     def unscheduleItem(self, item):
         self.debug("Unscheduling item at uri %s", item.uri)
         if self._hasVideo and item.hasVideo:
-            self.videocomp.remove(item.vsrc)
+            vsrc = self._vsrcs.pop(item)
+            self.videocomp.remove(vsrc)
         if self._hasAudio and item.hasAudio: 
-            self.audiocomp.remove(item.asrc)
+            asrc = self._asrcs.pop(item)
+            self.audiocomp.remove(asrc)
+
+    def adjustItemScheduling(self, item):
+        if self._hasVideo and item.hasVideo:
+            vsrc = self._vsrcs[item]
+            vsrc.props.start = item.timestamp
+            vsrc.props.duration = item.duration
+            vsrc.props.media_duration = item.duration
+        if self._hasAudio and item.hasAudio:
+            asrc = self._asrcs[item]
+            asrc.props.start = item.timestamp
+            asrc.props.duration = item.duration
+            asrc.props.media_duration = item.duration
 
     def addPlaylist(self, data):
-        self.playlist.parseData(data)
+        self.playlistparser.parseData(data)
 
     def create_pipeline(self):
         props = self.config['properties'];
@@ -304,10 +332,11 @@
     def do_start(self, clocking):
         self.link()
 
-        self.playlist = playlistparser.Playlist(self)
+        playlist = playlistparser.Playlist(self)
+        self.playlistparser = playlistparser.PlaylistXMLParser(playlist)
         try:
             if self._playlistfile:
-                self.playlist.parseFile(self._playlistfile)
+                self.playlistparser.parseFile(self._playlistfile)
         except fxml.ParserError, e:
             self.warning("Failed to parse playlist file: %r", e)
 

Modified: flumotion/branches/platform-3/flumotion/component/producers/playlist/playlistparser.py
==============================================================================
--- flumotion/branches/platform-3/flumotion/component/producers/playlist/playlistparser.py	(original)
+++ flumotion/branches/platform-3/flumotion/component/producers/playlist/playlistparser.py	Mon May 21 18:38:26 2007
@@ -23,6 +23,7 @@
 from gst.extend import discoverer
 
 import time
+import calendar
 from StringIO import StringIO
 
 from xml.dom import Node
@@ -31,20 +32,6 @@
 
 from flumotion.common import log, fxml
 
-import singledecodebin
-
-def file_gnl_src(name, uri, caps, start, duration, offset, priority):
-    src = singledecodebin.SingleDecodeBin(caps, uri)
-    gnlsrc = gst.element_factory_make('gnlsource', name)
-    gnlsrc.props.start = start
-    gnlsrc.props.duration = duration
-    gnlsrc.props.media_start = offset
-    gnlsrc.props.media_duration = duration
-    gnlsrc.props.priority = priority
-    gnlsrc.add(src)
-
-    return gnlsrc
-
 class PlaylistItem(object, log.Loggable):
     def __init__(self, id, timestamp, uri, offset, duration):
         self.id = id
@@ -53,34 +40,12 @@
         self.offset = offset
         self.duration = duration
 
-        # Currently always set to true; later this should come from what the
-        # discoverer says.
         self.hasAudio = True
         self.hasVideo = True
 
-        # Audio and video gnlsource objects
-        self.vsrc = None
-        self.asrc = None
-
         self.next = None
         self.prev = None
 
-    def setDuration(self, duration):
-        self.duration = duration
-        if self.asrc:
-            self.asrc.props.duration = duration
-            self.asrc.props.media_duration = duration
-        if self.vsrc:
-            self.vsrc.props.duration = duration
-            self.vsrc.props.media_duration = duration
-
-    def setTimestamp(self, timestamp):
-        self.timestamp = timestamp
-        if self.asrc:
-            self.asrc.props.start = timestamp
-        if self.vsrc:
-            self.vsrc.props.start = timestamp
-
 class Playlist(object, log.Loggable):
     def __init__(self, producer):
         """
@@ -91,12 +56,24 @@
 
         self.producer = producer
 
-        self._pending_items = []
-        self._discovering = False
+    def _findItem(self, position):
+        # Get the item that corresponds to position, or None
+        cur = self.items
+        while cur:
+            if cur.timestamp < position and \
+                    cur.timestamp + cur.duration > position:
+                return cur
+            if cur.timestamp > position:
+                return None # fail without having to iterate over everything
+            cur = cur.next
+        return None
 
     def _getCurrentItem(self):
-        # TODO: improve this!
-        return None
+        position = self.producer.getCurrentPosition()
+        item = self._findItem(position)
+        self.debug("Item %r found as current for playback position %d", 
+            item, position)
+        return item
 
     def removeItems(self, id):
         current = self._getCurrentItem()
@@ -109,23 +86,30 @@
                 continue
             if item.prev:
                 item.prev.next = item.next
+            else:
+                self.items = item.next
+
             if item.next:
                 item.next.prev = item.prev
             self.producer.unscheduleItem(item)
 
         del self._itemsById[id]
         
-
     def addItem(self, id, timestamp, uri, offset, duration, hasAudio, hasVideo):
         """
         Add an item to the playlist.
-        The duration of previous and this entry may be adjusted to make it fit.
+
+        This may remove overlapping entries, or adjust timestamps/durations of
+        entries to make the new one fit.
         """
         current = self._getCurrentItem()
         if current and timestamp < current.timestamp + current.duration:
             self.warning("New object at uri %s starts during current object, "
                 "cannot add")
-            return
+            return None
+        # We don't care about anything older than now; drop references to them
+        if current:
+            self.items = current
 
         newitem = PlaylistItem(id, timestamp, uri, offset, duration)
         newitem.hasAudio = hasAudio
@@ -160,6 +144,9 @@
             # deleted. Do so.
             cur = prev.next
             while cur != next:
+                self._itemsById[cur.id].remove(cur)
+                if not self._itemsById[cur.id]:
+                    del self._itemsById[cur.id]
                 self.producer.unscheduleItem(cur)
                 cur = cur.next
 
@@ -178,61 +165,29 @@
         if prev and prev.timestamp + prev.duration > newitem.timestamp:
             self.debug("Changing duration of previous item from %d to %d", 
                 prev.duration, newitem.timestamp - prev.timestamp)
-            prev.setDuration(newitem.timestamp - prev.timestamp)
+            prev.duration = newitem.timestamp - prev.timestamp
+            self.producer.adjustItemScheduling(prev)
 
         if next and newitem.timestamp + newitem.duration > next.timestamp:
             self.debug("Changing timestamp of next item from %d to %d to fit", 
                 newitem.timestamp, newitem.timestamp + newitem.duration)
             ts = newitem.timestamp + newitem.duration
             duration = next.duration - (ts - next.timestamp)
-            next.setTimestamp(ts)
-            next.setDuration(duration)
+            next.duration = duration
+            next.timestamp = ts
+            self.producer.adjustItemScheduling(next)
 
         # Then we need to actually add newitem into the gnonlin timeline
         self.producer.scheduleItem(newitem)
 
-    def expireOldEntries(self):
-        """
-        Delete references to old playlist entries that have passed.
-        TODO: is this needed? It's to save memory, but probably not very much 
-        memory...
-        """
-        pass
+        return newitem
 
-    def parseData(self, data):
-        """
-        Parse playlist XML document data
-        """
-        file = StringIO(data)
-        self.parseFile(file)
+class PlaylistParser(object, log.Loggable):
+    def __init__(self, playlist):
+        self.playlist = playlist
 
-    def replaceFile(self, file, id):
-        self.removeItems(id)
-        self.parseFile(file, id)
-
-    def parseFile(self, file, id=None):
-        """
-        Parse a playlist file. Adds the contents of the file to the existing 
-        playlist, overwriting any existing entries for the same time period.
-        """
-        parser = fxml.Parser()
-
-        root = parser.getRoot(file)
-
-        node = root.documentElement
-        self.debug("Parsing playlist from file %s", file)
-        if node.nodeName != 'playlist':
-            raise fxml.ParserError("Root node is not 'playlist'")
-
-        for child in node.childNodes:
-            if child.nodeType == Node.ELEMENT_NODE and \
-                    child.nodeName == 'entry':
-                self.debug("Parsing entry")
-                self._parsePlaylistEntry(parser, child, id)
-
-        # Now launch the discoverer for any pending items
-        if not self._discovering:
-            self._discoverPending()
+        self._pending_items = []
+        self._discovering = False
 
     def _discoverPending(self):
         def _discovered(disc, is_media):
@@ -259,15 +214,17 @@
                     offset = 0
 
                 if duration > 0:
-                    self.addItem(id, timestamp, uri, offset, duration, 
+                    self.playlist.addItem(id, timestamp, uri, offset, duration, 
                         hasA, hasV)
                 else:
                     self.warning("Duration of item is zero, not adding")
             else:
                 self.warning("Discover failed to find media in %s", item[0])
     
-            self.debug("Continuing on to next file")
-            self._discoverPending()
+            # We don't want to burn too much cpu discovering all the files;
+            # this throttles the discovery rate to a reasonable level
+            self.debug("Continuing on to next file in one second")
+            reactor.callLater(1, self._discoverPending)
 
         if not self._pending_items:
             self.debug("No more files to discover")
@@ -284,6 +241,54 @@
         disc.connect('discovered', _discovered)
         disc.discover()
 
+    def addItemToPlaylist(self, filename, timestamp, duration, offset, id):
+        # We only want to add it if it's plausibly schedulable.
+        end = timestamp
+        if duration is not None:
+            end += duration
+        if end < time.time() * gst.SECOND:
+            self.debug("Early-out: ignoring add for item in past")
+            return
+
+        self._pending_items.append((filename, timestamp, duration, offset, id))
+
+        # Now launch the discoverer for any pending items
+        if not self._discovering:
+            self._discoverPending()
+
+class PlaylistXMLParser(PlaylistParser):
+
+    def parseData(self, data):
+        """
+        Parse playlist XML document data
+        """
+        file = StringIO(data)
+        self.parseFile(file)
+
+    def replaceFile(self, file, id):
+        self.playlist.removeItems(id)
+        self.parseFile(file, id)
+
+    def parseFile(self, file, id=None):
+        """
+        Parse a playlist file. Adds the contents of the file to the existing 
+        playlist, overwriting any existing entries for the same time period.
+        """
+        parser = fxml.Parser()
+
+        root = parser.getRoot(file)
+
+        node = root.documentElement
+        self.debug("Parsing playlist from file %s", file)
+        if node.nodeName != 'playlist':
+            raise fxml.ParserError("Root node is not 'playlist'")
+
+        for child in node.childNodes:
+            if child.nodeType == Node.ELEMENT_NODE and \
+                    child.nodeName == 'entry':
+                self.debug("Parsing entry")
+                self._parsePlaylistEntry(parser, child, id)
+
     def _parsePlaylistEntry(self, parser, entry, id):
         mandatory = ['filename', 'time']
         optional = ['duration', 'offset']
@@ -299,18 +304,24 @@
 
         timestamp = self._parseTimestamp(timestamp)
 
-        self._pending_items.append((filename, timestamp, duration, offset, id))
+        self.addItemToPlaylist(filename, timestamp, duration, offset, id)
 
     def _parseTimestamp(self, ts):
-        # Take TS in YYYY-MM-DDThh:mm:ssZ format, return timestamp in 
+        # Take TS in YYYY-MM-DDThh:mm:ss.ssZ format, return timestamp in 
         # nanoseconds since the epoch
-        format = "%Y-%m-%dT%H:%M:%SZ"
 
-        try:
-            timestruct = time.strptime(ts, format)
+        # time.strptime() doesn't handle the fractional seconds part. We ignore
+        # it entirely, after verifying that it has the right format.
+        tsmain, trailing = ts[:-4], ts[-4:]
+        if trailing[0] != '.' or trailing[3] != 'Z' or \
+                not trailing[1].isdigit() or not trailing[2].isdigit():
+            raise fxml.ParserError("Invalid timestamp %s" % ts)
+        format = "%Y-%m-%dT%H:%M:%S"
 
-            return int(time.mktime(timestruct) * gst.SECOND)
+        try:
+            timestruct = time.strptime(tsmain, format)
+            return int(calendar.timegm(timestruct) * gst.SECOND)
         except ValueError:
-            raise fxml.ParserError("Invalid timestamp %s", ts)
+            raise fxml.ParserError("Invalid timestamp %s" % ts)
 
 


More information about the flumotion-commit mailing list