wingo - in flumotion/trunk: . flumotion/common flumotion/test

flumotion-commit at lists.fluendo.com flumotion-commit at lists.fluendo.com
Mon Feb 12 16:36:21 CET 2007


Author: wingo
Date: Mon Feb 12 16:36:13 2007
New Revision: 4477

Added:
   flumotion/trunk/flumotion/test/test_common_netutils.py
Modified:
   flumotion/trunk/ChangeLog
   flumotion/trunk/flumotion/common/netutils.py
   flumotion/trunk/flumotion/test/Makefile.am
Log:
2007-02-12  Andy Wingo  <wingo at pobox.com>

	* flumotion/common/netutils.py (ipv4StringToInt, ipv4IntToString):
	New functions. We could use inet_pton and the like from socket,
	but the interface seems more error-prone.
	(Network): New object, a named set of subnets. Has a match()
	method to test if an ipv4 address falls within the set.

	* flumotion/test/Makefile.am (EXTRA_DIST): 
	* flumotion/test/test_common_netutils.py: New tests, test ipv4
	parsing and the Network subnet set object.



Modified: flumotion/trunk/ChangeLog
==============================================================================
--- flumotion/trunk/ChangeLog	(original)
+++ flumotion/trunk/ChangeLog	Mon Feb 12 16:36:13 2007
@@ -1,3 +1,15 @@
+2007-02-12  Andy Wingo  <wingo at pobox.com>
+
+	* flumotion/common/netutils.py (ipv4StringToInt, ipv4IntToString):
+	New functions. We could use inet_pton and the like from socket,
+	but the interface seems more error-prone.
+	(Network): New object, a named set of subnets. Has a match()
+	method to test if an ipv4 address falls within the set.
+
+	* flumotion/test/Makefile.am (EXTRA_DIST): 
+	* flumotion/test/test_common_netutils.py: New tests, test ipv4
+	parsing and the Network subnet set object.
+
 2007-02-09  Michael Smith  <msmith at fluendo.com>
 
 	* data/upgrade-to-0.4.1.xsl:

Modified: flumotion/trunk/flumotion/common/netutils.py
==============================================================================
--- flumotion/trunk/flumotion/common/netutils.py	(original)
+++ flumotion/trunk/flumotion/common/netutils.py	Mon Feb 12 16:36:13 2007
@@ -89,3 +89,50 @@
     except:
         return ip
 
+def ipv4StringToInt(s):
+    ret = 0
+    for n in map(int, s.split('.')):
+        ret <<= 8
+        ret += n
+    return ret
+
+def ipv4IntToString(n):
+    l = []
+    for i in range(4):
+        l.append(n % 256)
+        n >>= 8
+    l.reverse()
+    return '.'.join(map(str, l))
+
+class Network(set):
+    def __init__(self, name=None):
+        self.name = name
+
+    def _parseSubnet(self, ipv4String, prefixLen, netmask):
+        if netmask is not None:
+            netmask = ipv4StringToInt(netmask)
+        else:
+            netmask = ~((1 << (32 - prefixLen)) - 1)
+            if netmask < 0:
+                # so that netmasks made from this function are the same
+                # as those made by ipv4StringToInt
+                netmask += 1<<32
+        
+        ip = ipv4StringToInt(ipv4String)
+
+        return ip, netmask
+
+    def addSubnet(self, ipv4String, prefixLen=32, netmask=None):
+        self.add(self._parseSubnet(ipv4String, prefixLen, netmask))
+
+    def removeSubnet(self, ipv4String, prefixLen=32, netmask=None):
+        self.remove(self._parseSubnet(ipv4String, prefixLen, netmask))
+
+    def match(self, ipv4String):
+        ip = ipv4StringToInt(ipv4String)
+
+        for net, netmask in self:
+            if ip & netmask == net:
+                return True
+
+        return False

Modified: flumotion/trunk/flumotion/test/Makefile.am
==============================================================================
--- flumotion/trunk/flumotion/test/Makefile.am	(original)
+++ flumotion/trunk/flumotion/test/Makefile.am	Mon Feb 12 16:36:13 2007
@@ -19,6 +19,7 @@
 	test_common_connection.py 	\
 	test_common_gstreamer.py 	\
 	test_common_messages.py 	\
+	test_common_netutils.py 	\
 	test_common_package.py	 	\
 	test_common_planet.py	 	\
 	test_common_pygobject.py 	\

Added: flumotion/trunk/flumotion/test/test_common_netutils.py
==============================================================================
--- (empty file)
+++ flumotion/trunk/flumotion/test/test_common_netutils.py	Mon Feb 12 16:36:13 2007
@@ -0,0 +1,103 @@
+# -*- Mode: Python; test-case-name:flumotion.test.test_common_netutils -*-
+# vi:si:et:sw=4:sts=4:ts=4
+#
+# Flumotion - a streaming media server
+# Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com).
+# All rights reserved.
+
+# This file may be distributed and/or modified under the terms of
+# the GNU General Public License version 2 as published by
+# the Free Software Foundation.
+# This file is distributed without any warranty; without even the implied
+# warranty of merchantability or fitness for a particular purpose.
+# See "LICENSE.GPL" in the source distribution for more information.
+
+# Licensees having purchased or holding a valid Flumotion Advanced
+# Streaming Server license may use this file in accordance with the
+# Flumotion Advanced Streaming Server Commercial License Agreement.
+# See "LICENSE.Flumotion" in the source distribution for more information.
+
+# Headers in this file shall remain intact.
+
+import common
+
+from twisted.trial import unittest
+
+from flumotion.common.netutils import ipv4StringToInt, ipv4IntToString
+from flumotion.common.netutils import Network
+
+
+class TestIpv4Parse(unittest.TestCase):
+    def assertParseInvariant(self, ipv4String):
+        self.assertEquals(ipv4IntToString(ipv4StringToInt(ipv4String)),
+                          ipv4String)
+
+    def testIpv4Parse(self):
+        self.assertParseInvariant('0.0.0.1')
+        self.assertParseInvariant('0.0.1.0')
+        self.assertParseInvariant('0.1.0.0')
+        self.assertParseInvariant('1.0.0.0')
+        self.assertParseInvariant('0.10.0.10')
+        self.assertParseInvariant('10.0.10.0')
+        self.assertParseInvariant('192.168.10.1')
+        self.assertParseInvariant('195.10.6.237')
+        
+    def testIpv4ParseString(self):
+        self.assertEquals(ipv4StringToInt('0.0.0.1'), 1<<0)
+        self.assertEquals(ipv4StringToInt('0.0.1.0'), 1<<8)
+        self.assertEquals(ipv4StringToInt('0.1.0.0'), 1<<16)
+        self.assertEquals(ipv4StringToInt('1.0.0.0'), 1<<24)
+
+class TestNetwork(unittest.TestCase):
+    def testName(self):
+        self.assertEquals(Network().name, None)
+        self.assertEquals(Network('foo').name, 'foo')
+
+    def testAddRemove(self):
+        net = Network()
+        net.addSubnet('192.168.0.0', 24)
+        net.addSubnet('192.168.1.0', 24)
+        self.assertEquals(len(net), 2)
+        net.removeSubnet('192.168.0.0', 24)
+        net.removeSubnet('192.168.1.0', 24)
+        self.assertEquals(len(net), 0)
+        
+        net.addSubnet('192.168.0.0', netmask='255.255.255.0')
+        self.assertEquals(len(net), 1)
+        net.removeSubnet('192.168.0.0', netmask='255.255.255.0')
+        self.assertEquals(len(net), 0)
+
+        net.addSubnet('192.168.0.0', netmask='255.255.255.0')
+        self.assertEquals(len(net), 1)
+        net.removeSubnet('192.168.0.0', 24)
+        self.assertEquals(len(net), 0)
+
+    def testMatch(self):
+        net = Network()
+
+        self.failIf(net.match('192.168.1.0'))
+        
+        net.addSubnet('192.168.1.0', 24)
+
+        self.failUnless(net.match('192.168.1.0'))
+        self.failUnless(net.match('192.168.1.10'))
+        self.failUnless(net.match('192.168.1.255'))
+        
+        self.failIf(net.match('192.168.0.255'))
+        self.failIf(net.match('192.168.2.0'))
+
+        net.addSubnet('192.168.2.0', 24)
+
+        self.failIf(net.match('192.168.0.255'))
+        self.failUnless(net.match('192.168.1.255'))
+        self.failUnless(net.match('192.168.2.0'))
+
+        net.removeSubnet('192.168.1.0', 24)
+        net.removeSubnet('192.168.2.0', 24)
+
+        self.failIf(net.match('192.168.1.0'))
+        self.failIf(net.match('192.168.1.10'))
+        self.failIf(net.match('192.168.1.255'))
+        
+        self.failIf(net.match('192.168.0.255'))
+        self.failIf(net.match('192.168.2.0'))


More information about the flumotion-commit mailing list