Change-Id: I4a9ba3bd31276956692d65500a5ee926b01c9d48
Signed-off-by: Thanh Ha <thanh.ha@linuxfoundation.org>
from mininet.node import RemoteController
from mininet.node import OVSKernelSwitch
from mininet.node import RemoteController
from mininet.node import OVSKernelSwitch
def create_network(controller_ip, controller_port):
"""Create topology and mininet network."""
topo = mininet.topo.Topo()
def create_network(controller_ip, controller_port):
"""Create topology and mininet network."""
topo = mininet.topo.Topo()
topo.addLink('h1', 's1')
topo.addLink('h2', 's1')
topo.addLink('h1', 's1')
topo.addLink('h2', 's1')
- switch=mininet.util.customConstructor(
- {'ovsk':OVSKernelSwitch}, 'ovsk,protocols=OpenFlow13')
+ switch = mininet.util.customConstructor(
+ {'ovsk': OVSKernelSwitch}, 'ovsk,protocols=OpenFlow13')
- controller=mininet.util.customConstructor(
+ controller = mininet.util.customConstructor(
{'remote': RemoteController}, 'remote,ip=%s:%s' % (controller_ip,
controller_port))
{'remote': RemoteController}, 'remote,ip=%s:%s' % (controller_ip,
controller_port))
net = mininet.net.Mininet(topo=topo, switch=switch, controller=controller)
return net
net = mininet.net.Mininet(topo=topo, switch=switch, controller=controller)
return net
"""
log = logging.getLogger(__name__)
"""
log = logging.getLogger(__name__)
def parse_matches(flow, matches):
flow['matches'] = {}
def parse_matches(flow, matches):
flow['matches'] = {}
try:
key, value = word.split('=', 1)
except ValueError:
try:
key, value = word.split('=', 1)
except ValueError:
- #TODO: need to figure out what to do here?
+ # TODO: need to figure out what to do here?
continue
if key == 'priority':
continue
if key == 'priority':
return sorted(flows, key=lambda x: x['duration'].rstrip('s'))
return sorted(flows, key=lambda x: x['duration'].rstrip('s'))
for child in expected_match.childNodes:
if child.nodeType is expected_match.TEXT_NODE:
continue
for child in expected_match.childNodes:
if child.nodeType is expected_match.TEXT_NODE:
continue
comparator = comparators.get(child.nodeName, default)
comparator(child, actual_match, kw)
comparator = comparators.get(child.nodeName, default)
comparator(child, actual_match, kw)
integer_comparator(expected_match, actual_match, kw, 10)
def compare_vlan_id(expected_match, actual_match, kw):
integer_comparator(expected_match, actual_match, kw, 10)
def compare_vlan_id(expected_match, actual_match, kw):
- integer_comparator(expected_match.getElementsByTagName('vlan-id')[0], \
+ integer_comparator(expected_match.getElementsByTagName('vlan-id')[0],
actual_match, kw, 10)
VLAN_COMPARATORS = {
actual_match, kw, 10)
VLAN_COMPARATORS = {
- 'vlan-pcp': compare_vlan_pcp,
+ 'vlan-pcp': compare_vlan_pcp,
'vlan-id': compare_vlan_id,
'vlan-id': compare_vlan_id,
# print 'ethernet_match_comparator-expected_match:', expected_match.toxml()
# print 'ethernet_match_comparator-actual_match:', actual_match
# print 'ethernet_match_comparator-expected_match:', expected_match.toxml()
# print 'ethernet_match_comparator-actual_match:', actual_match
- compare_elements(expected_match, actual_match, kw, \
+ compare_elements(expected_match, actual_match, kw,
VLAN_COMPARATORS, fallback_comparator)
VLAN_COMPARATORS, fallback_comparator)
name = kw.get(child.nodeName)
data = child.toxml(), name, actual_match
name = kw.get(child.nodeName)
data = child.toxml(), name, actual_match
- if expected_etype == 2048: # IP
- assert ((actual_match.get('ip', 'IP Not-present') is None) or \
- (actual_match.get('tcp', 'TCP Not-present') is None) or \
- (actual_match.get('sctp', 'SCTP Not-present') is None) or \
- (actual_match.get('udp', 'UDP Not-present') is None)), \
- 'Expected etype %s && actual etype %s=%s' % data
-
- elif expected_etype == 2054: #ARP
+ if expected_etype == 2048: # IP
+ assert ((actual_match.get('ip', 'IP Not-present') is None) or
+ (actual_match.get('tcp', 'TCP Not-present') is None) or
+ (actual_match.get('sctp', 'SCTP Not-present') is None) or
+ (actual_match.get('udp', 'UDP Not-present') is None)), 'Expected etype %s && actual etype %s=%s' % data # noqa
+
+ elif expected_etype == 2054: # ARP
assert actual_match.get('arp', 'ARP Not-present') is None, \
assert actual_match.get('arp', 'ARP Not-present') is None, \
- 'Expected etype %s && actual etype %s=%s' % data
+ 'Expected etype %s && actual etype %s=%s' % data
else:
actual_etype = int(actual_match[name], 16)
else:
actual_etype = int(actual_match[name], 16)
- assert expected_etype == actual_etype, \
- 'xml etype: %s && actual etype %s=%s' % data
-
+ assert expected_etype == actual_etype, 'xml etype: %s && actual etype %s=%s' % data
- 'ethernet-type': compare_etype,
+ 'ethernet-type': compare_etype,
'ethernet-source': ethernet_address_comparator,
'ethernet-destination': ethernet_address_comparator,
'ethernet-source': ethernet_address_comparator,
'ethernet-destination': ethernet_address_comparator,
# print 'ethernet_match_comparator-expected_match:', expected_match.toxml()
# print 'ethernet_match_comparator-actual_match:', actual_match
# print 'ethernet_match_comparator-expected_match:', expected_match.toxml()
# print 'ethernet_match_comparator-actual_match:', actual_match
- compare_elements(expected_match, actual_match, kw, \
+ compare_elements(expected_match, actual_match, kw,
ETH_COMPARATORS, fallback_comparator)
ETH_COMPARATORS, fallback_comparator)
def ipv4_comparator(expected_match, actual_match, kw):
# print 'ip_v4_comparator:', expected_match.toxml(), actual_match
def ipv4_comparator(expected_match, actual_match, kw):
# print 'ip_v4_comparator:', expected_match.toxml(), actual_match
name = child.nodeName
data = expected_match.toxml(), name, actual_match
name = child.nodeName
data = expected_match.toxml(), name, actual_match
- if expected_proto == 6: # TCP
- assert actual_match.get('tcp', 'TCP Not-present') is None, \
- 'ip protocol type: expected %s, actual %s=%s' % data
+ if expected_proto == 6: # TCP
+ assert actual_match.get('tcp', 'TCP Not-present') is None, 'ip protocol type: expected %s, actual %s=%s' % data # noqa
- elif expected_proto == 17: #UDP
- assert actual_match.get('udp', 'UDP Not-present') is None, \
- 'ip protocol type: expected %s, actual %s=%s' % data
+ elif expected_proto == 17: # UDP
+ assert actual_match.get('udp', 'UDP Not-present') is None, 'ip protocol type: expected %s, actual %s=%s' % data # noqa
- elif expected_proto == 132: #SCTP
- assert actual_match.get('sctp', 'SCTP Not-present') is None, \
- 'ip protocol type: expected %s, actual %s=%s' % data
+ elif expected_proto == 132: # SCTP
+ assert actual_match.get('sctp', 'SCTP Not-present') is None, 'ip protocol type: expected %s, actual %s=%s' % data # noqa
else:
fallback_comparator(child, actual_match, kw)
else:
fallback_comparator(child, actual_match, kw)
def compare_dscp(child, actual_match, kw):
# print 'compare_dscp:', child.toxml(), actual_match
expected_dscp = int(child.childNodes[0].data)
name = kw.get(child.nodeName)
actual_dscp = int(actual_match[name])
def compare_dscp(child, actual_match, kw):
# print 'compare_dscp:', child.toxml(), actual_match
expected_dscp = int(child.childNodes[0].data)
name = kw.get(child.nodeName)
actual_dscp = int(actual_match[name])
data = child.toxml(), name, actual_match
assert (expected_dscp * 4) == actual_dscp, 'dscp: expected %s, actual %s=%s' % data
data = child.toxml(), name, actual_match
assert (expected_dscp * 4) == actual_dscp, 'dscp: expected %s, actual %s=%s' % data
- 'ip-protocol': compare_proto,
+ 'ip-protocol': compare_proto,
# print 'ip_match_comparator:', expected_match.toxml(), actual_match
# print 'ip_match_comparator:', expected_match.toxml(), actual_match
- compare_elements(expected_match, actual_match, kw, \
+ compare_elements(expected_match, actual_match, kw,
IP_MATCH_COMPARATORS, fallback_comparator)
IP_MATCH_COMPARATORS, fallback_comparator)
# print 'match_comparator-actual_match:', actual_match
# print 'match_comparator: keywords', keywords
# print 'match_comparator-actual_match:', actual_match
# print 'match_comparator: keywords', keywords
- compare_elements(expected_match, actual_match, match_keywords, \
+ compare_elements(expected_match, actual_match, match_keywords,
MATCH_COMPARATORS, fallback_comparator)
MATCH_COMPARATORS, fallback_comparator)
}
# print 'instructions_comparator:', instructions_element, switch_flow
}
# print 'instructions_comparator:', instructions_element, switch_flow
- instructions = instructions_element.childNodes
+ instructions = instructions_element.childNodes # noqa
for instruction in instructions_element.childNodes:
if instruction.nodeType is instructions_element.TEXT_NODE:
continue
for instruction in instructions_element.childNodes:
if instruction.nodeType is instructions_element.TEXT_NODE:
continue
for itype in instruction.childNodes:
if itype.nodeType is itype.TEXT_NODE:
continue
for itype in instruction.childNodes:
if itype.nodeType is itype.TEXT_NODE:
continue
'default': default_comparator,
}
'default': default_comparator,
}
def all_nodes(xml_root):
"""
Generates every non-text nodes.
def all_nodes(xml_root):
"""
Generates every non-text nodes.
log.info('received status code: {}'.format(rsp.status_code))
log.debug('received content: {}'.format(rsp.text))
assert rsp.status_code == 204 or rsp.status_code == 200, 'Status' \
log.info('received status code: {}'.format(rsp.status_code))
log.debug('received content: {}'.format(rsp.text))
assert rsp.status_code == 204 or rsp.status_code == 200, 'Status' \
- ' code returned %d' % rsp.status_code
+ ' code returned %d' % rsp.status_code
# check request content against restconf's datastore
response = requests.get(url, auth=('admin', 'admin'),
# check request content against restconf's datastore
response = requests.get(url, auth=('admin', 'admin'),
action_keywords = None
with open('action-keywords.csv') as f:
action_keywords = dict(line.strip().split(';') for line in f
action_keywords = None
with open('action-keywords.csv') as f:
action_keywords = dict(line.strip().split(';') for line in f
- if not line.startswith('#'))
+ if not line.startswith('#'))
# fix arguments for unittest
del sys.argv[1:]
# fix arguments for unittest
del sys.argv[1:]