logg.Logger()
cli.main()
+
if __name__ == "__main__":
main()
import argparse
-import mdsal.cli
-import netvirt.cli
-import odltools
+import odltools.mdsal.cli
+import odltools.netvirt.cli
import odltools.csit.cli
parser.add_argument("-d", "--dump", action="store_true",
help="dump extra debugging, e.g. ovs metadata")
parser.set_defaults(func=robotfiles.run)
-
text = element.text
attribs = element.attrib
if logger.isEnabledFor(logging.DEBUG) and text is not None and attribs:
- logger.debug("process_element: %s - %s - %s - %s - %s - %s", state.state, state.command, event, tag, (text is not None), attribs)
+ logger.debug("process_element: %s - %s - %s - %s - %s - %s",
+ state.state, state.command, event, tag, (text is not None), attribs)
if event == "end":
if element.tag == "test":
state.pdata['nodes'] = state.nodes
self.robotfile.write_pdata()
self.robotfile.write_debug_pdata()
+
if __name__ == '__main__':
unittest.main()
"""
DSMAP = {
- 'acls': ['acls.json', 'config','ietf-access-control-list:access-lists',
+ 'acls': ['acls.json', 'config', 'ietf-access-control-list:access-lists',
'access-lists', 'acls'],
'bindings': ['service-bindings.json', 'config', 'interface-service-bindings:service-bindings',
'service-bindings', 'services-info'],
VIF_TYPE_TO_PREFIX = {
- 'ovs':'tap',
- 'vhost_user':'vhu'
+ 'ovs': 'tap',
+ 'vhost_user': 'vhu'
}
VIF_TYPE = 'neutron-binding:vif-type'
def get_list(self, data, container_key, lst):
c = data and data.get(container_key, {})
- l = c.get(lst, [])
- return l
+ lst = c.get(lst, [])
+ return lst
def get_clist(self):
return self.get_list(self.data, self.container, self.clist)
self.id_manager_id_pools = id_manager.id_pools(Model.CONFIG, args)
self.ietf_interfaces_interfaces = ietf_interfaces.interfaces(Model.CONFIG, args)
self.ietf_interfaces_interfaces_state = ietf_interfaces.interfaces_state(Model.OPERATIONAL, args)
- self.interface_service_bindings_service_bindings = interface_service_bindings.service_bindings(Model.CONFIG, args)
+ self.interface_service_bindings_service_bindings = \
+ interface_service_bindings.service_bindings(Model.CONFIG, args)
self.itm_state_tunnels_state = itm_state.tunnels_state(Model.OPERATIONAL, args)
self.l3vpn_vpn_interfaces = l3vpn.vpn_instance_to_vpn_id(Model.CONFIG, args)
self.mip_mac = mip.mac(Model.CONFIG, args)
self.network_topology_network_topology_operational = network_topology.network_topology(Model.CONFIG, args)
self.neutron_neutron = neutron.neutron(Model.CONFIG, args)
self.odl_fib_fib_entries = odl_fib.fib_entries(Model.CONFIG, args)
- self.odl_interface_meta_if_index_interface_map = odl_interface_meta.if_indexes_interface_map(Model.OPERATIONAL, args)
+ self.odl_interface_meta_if_index_interface_map = \
+ odl_interface_meta.if_indexes_interface_map(Model.OPERATIONAL, args)
self.odl_inventory_nodes_config = opendaylight_inventory.nodes(Model.CONFIG, args)
self.odl_inventory_nodes_operational = opendaylight_inventory.nodes(Model.OPERATIONAL, args)
self.odl_l3vpn_vpn_instance_to_vpn_id = odl_l3vpn.vpn_instance_to_vpn_id(Model.CONFIG, args)
if "ietf_interfaces_interfaces_state" in models:
self.ietf_interfaces_interfaces_state = ietf_interfaces.interfaces_state(Model.OPERATIONAL, args)
if "interface_service_bindings_service_bindings" in models:
- self.interface_service_bindings_service_bindings = interface_service_bindings.service_bindings(Model.CONFIG, args)
+ self.interface_service_bindings_service_bindings = \
+ interface_service_bindings.service_bindings(Model.CONFIG, args)
if "itm_state_tunnels_state" in models:
self.itm_state_tunnels_state = itm_state.tunnels_state(Model.OPERATIONAL, args)
if "l3vpn_vpn_interfaces" in models:
if "odl_fib_fib_entries" in models:
self.odl_fib_fib_entries = odl_fib.fib_entries(Model.CONFIG, args)
if "odl_interface_meta_if_index_interface_map" in models:
- self.odl_interface_meta_if_index_interface_map = odl_interface_meta.if_indexes_interface_map(Model.OPERATIONAL, args)
+ self.odl_interface_meta_if_index_interface_map = \
+ odl_interface_meta.if_indexes_interface_map(Model.OPERATIONAL, args)
if "odl_inventory_nodes_config" in models:
self.odl_inventory_nodes_config = opendaylight_inventory.nodes(Model.CONFIG, args)
if "odl_inventory_nodes_operational" in models:
def get_ccl(self, parent, child, item):
c = self.data and self.data.get(parent, {})
- l = self.get_list(c, child, item)
- return l
+ lst = self.get_list(c, child, item)
+ return lst
def get_ccl_by_key(self, parent, child, item, key="uuid"):
d = {}
class IfIndexesInterfaceMap(Model):
CONTAINER = "if-indexes-interface-map"
CLIST = "if-index-interface"
- CLIST_KEY= "if-index"
+ CLIST_KEY = "if-index"
dpnid = self.get_dpn_from_ofnodeid(node['id'])
nodes_dict[dpnid] = node.get('flow-node-inventory:description', '')
return nodes_dict
-
d = self.interfaces_state.get_clist_by_key()
self.assertIsNotNone(d.get('tap67eb9b7f-db'))
+
if __name__ == '__main__':
unittest.main()
self.tunnels_state = tunnels_state(Model.OPERATIONAL, args)
def test_read_file(self):
- logger.debug("dpn-endpoints: %s",self.dpn_endpoints.data)
- logger.debug("dpn-endpoints: \n%s",self.dpn_endpoints.pretty_format(self.dpn_endpoints.data))
+ logger.debug("dpn-endpoints: %s", self.dpn_endpoints.data)
+ logger.debug("dpn-endpoints: \n%s", self.dpn_endpoints.pretty_format(self.dpn_endpoints.data))
def test_get_ip_address(self):
dpn_ids = self.dpn_endpoints.get_dpn_ids()
d = self.tunnels_state.get_clist_by_key()
self.assertIsNotNone(d and d['tun428ee8c4fe7'])
+
if __name__ == '__main__':
unittest.main()
d = self.network_topology.get_nodes_by_tid_and_key()
self.assertIsNotNone(d.get('ovsdb://uuid/8eabb815-5570-42fc-9635-89c880ebc4ac/bridge/br-int'))
+
if __name__ == '__main__':
unittest.main()
d = self.neutron.get_trunks_by_key()
self.assertIsNotNone(d.get('8e3c262e-7b45-4222-ac4e-528db75e5516'))
+
if __name__ == '__main__':
unittest.main()
d = self.fib_entries.get_vrf_entries_by_key()
self.assertIsNotNone(d.get(100011))
+
if __name__ == '__main__':
unittest.main()
d = self.nodes.get_dpn_host_mapping()
self.assertIsNotNone(d.get('132319289050514'))
+
if __name__ == '__main__':
unittest.main()
data = request.read_file(self.filename)
self.assertEquals(len(data), 1)
+
if __name__ == '__main__':
unittest.main()
print "Tunnel: \n{}".format(utils.format_json(args, tunnel))
if tunState:
print "TunState: \n{}".format(utils.format_json(args, tunState))
- if ifstate:
- ncId = ifstate.get('lower-layer-if')[0]
- nodeid = ncId[:ncId.rindex(':')]
- # analyze_inventory(nodeid, True, ncId, ifname)
- # analyze_inventory(nodeid, False, ncId, ifname)
+ # if ifstate:
+ # ncid = ifstate.get('lower-layer-if')[0]
+ # nodeid = ncid[:ncid.rindex(':')]
+ # analyze_inventory(nodeid, True, ncid, ifname)
+ # analyze_inventory(nodeid, False, ncid, ifname)
def analyze_trunks(args):
ietf_interfaces_interfaces = ietf_interfaces.interfaces(Model.CONFIG, args)
- ietf_interfaces_interfaces_state = ietf_interfaces.interfaces_state(Model.OPERATIONAL, args)
+ # ietf_interfaces_interfaces_state = ietf_interfaces.interfaces_state(Model.OPERATIONAL, args)
l3vpn_vpn_interfaces = l3vpn.vpn_instance_to_vpn_id(Model.CONFIG, args)
neutron_neutron = neutron.neutron(Model.CONFIG, args)
ntrunks = neutron_neutron.get_trunks_by_key()
vpninterfaces = l3vpn_vpn_interfaces.get_clist_by_key()
ifaces = ietf_interfaces_interfaces.get_clist_by_key()
- ifstates = ietf_interfaces_interfaces_state.get_clist_by_key()
+ # ifstates = ietf_interfaces_interfaces_state.get_clist_by_key()
subport_dict = {}
for v in ntrunks.itervalues():
nport = nports.get(v.get('port-id'))
snport = nports.get(sport_id)
svpniface = vpninterfaces.get(sport_id)
siface = ifaces.get(sport_id)
- sifstate = ifstates.get(sport_id)
+ # sifstate = ifstates.get(sport_id)
subport['SubNeutronPort'] = 'Correct' if snport else 'Wrong'
subport['SubVpnInterface'] = 'Correct' if svpniface else 'Wrong'
subport['ofport'] = Model.get_ofport_from_ncid()
if siface:
vlan_mode = siface.get('odl-interface:l2vlan-mode')
parent_iface_id = siface.get('odl-interface:parent-interface')
- if vlan_mode !='trunk-member':
+ if vlan_mode != 'trunk-member':
subport['SubIface'] = 'WrongMode'
- elif parent_iface_id !=v.get('port-id'):
+ elif parent_iface_id != v.get('port-id'):
subport['SubIface'] = 'WrongParent'
- elif siface.get('odl-interface:vlan-id') !=subport.get('segmentation-id'):
+ elif siface.get('odl-interface:vlan-id') != subport.get('segmentation-id'):
subport['SubIface'] = 'WrongVlanId'
else:
subport['SubIface'] = 'Correct'
else:
subport['SubIface'] = 'Wrong'
- s_subport = 'SegId:{}, PortId:{}, SubNeutronPort:{}, SubIface:{}, SubVpnIface:{}'.format(
- subport.get('segmentation-id'), subport.get('port-id'),
- subport.get('SubNeutronPort'),
- subport.get('SubIface'),
- subport.get('SubVpnInterface'))
+ # s_subport = 'SegId:{}, PortId:{}, SubNeutronPort:{}, SubIface:{}, SubVpnIface:{}'.format(
+ # subport.get('segmentation-id'), subport.get('port-id'),
+ # subport.get('SubNeutronPort'),
+ # subport.get('SubIface'),
+ # subport.get('SubVpnInterface'))
s_subports.append(subport)
subport_dict[subport['port-id']] = subport
- s_trunk = 'TrunkName:{}, TrunkId:{}, PortId:{}, NeutronPort:{}, SubPorts:{}'.format(
- v.get('name'), v.get('uuid'), v.get('port-id'),
- 'Correct' if nport else 'Wrong', utils.format_json(args, s_subports))
- print s_trunk
- print '\n------------------------------------'
- print 'Analyzing Flow status for SubPorts'
- print '------------------------------------'
- for flow in utils.sort(flows.get_all_flows(['ifm'], ['vlanid']), 'ifname'):
- subport = subport_dict.get(flow.get('ifname')) or None
- vlanid = subport.get('segmentation-id') if subport else None
- ofport = subport.get('ofport') if subport else None
- flow_status = 'Okay'
- if flow.get('ofport') and flow.get('ofport') != ofport:
- flow_status = 'OfPort mismatch for SubPort:{} and Flow:{}'.format(subport, flow.get('flow'))
- if flow.get('vlanid') and flow.get('vlanid') != vlanid:
- flow_status = 'VlanId mismatch for SubPort:{} and Flow:{}'.format(subport, flow.get('flow'))
- if subport:
- print 'SubPort:{},Table:{},FlowStatus:{}'.format(
- subport.get('port-id'), flow.get('table'), flow_status)
+ s_trunk = 'TrunkName:{}, TrunkId:{}, PortId:{}, NeutronPort:{}, SubPorts:{}'.format(
+ v.get('name'), v.get('uuid'), v.get('port-id'),
+ 'Correct' if nport else 'Wrong', utils.format_json(args, s_subports))
+ print s_trunk
+ print '\n------------------------------------'
+ print 'Analyzing Flow status for SubPorts'
+ print '------------------------------------'
+ for flow in utils.sort(flows.get_all_flows(['ifm'], ['vlanid']), 'ifname'):
+ subport = subport_dict.get(flow.get('ifname')) or None
+ vlanid = subport.get('segmentation-id') if subport else None
+ ofport = subport.get('ofport') if subport else None
+ flow_status = 'Okay'
+ if flow.get('ofport') and flow.get('ofport') != ofport:
+ flow_status = 'OfPort mismatch for SubPort:{} and Flow:{}'.format(subport, flow.get('flow'))
+ if flow.get('vlanid') and flow.get('vlanid') != vlanid:
+ flow_status = 'VlanId mismatch for SubPort:{} and Flow:{}'.format(subport, flow.get('flow'))
+ if subport:
+ print 'SubPort:{},Table:{},FlowStatus:{}'.format(
+ subport.get('port-id'), flow.get('table'), flow_status)
def analyze_neutron_port(port, iface, ifstate):
print "node: {} was not found".format("openflow:" + args.nodeid)
return
tables = node.get(Nodes.NODE_TABLE)
- groups = node.get(Nodes.NODE_GROUP)
+ # groups = node.get(Nodes.NODE_GROUP)
flow_list = []
print "Flows:"
for table in tables:
global gmodels
gmodels = Models()
gmodels.get_models(args, models)
-
-
def parse_flow(flow):
- #parse flow fields
- #hex(int(mask, 16) & int(data, 16))
+ # parse flow fields
+ # hex(int(mask, 16) & int(data, 16))
if flow['cookie']:
utils.to_hex(flow, 'cookie')
# parse instructions
for instruction in flow['instructions'].get('instruction', []):
if 'write-metadata' in instruction:
- utils.to_hex(instruction['write-metadata'],'metadata')
- utils.to_hex(instruction['write-metadata'],'metadata-mask')
+ utils.to_hex(instruction['write-metadata'], 'metadata')
+ utils.to_hex(instruction['write-metadata'], 'metadata-mask')
if 'apply-actions' in instruction:
for action in instruction['apply-actions'].get('action', []):
if 'openflowplugin-extension-nicira-action:nx-reg-load' in action:
# parse matches
if 'metadata' in flow['match']:
metadata = flow['match']['metadata']
- utils.to_hex(metadata,'metadata')
- utils.to_hex(metadata,'metadata-mask')
+ utils.to_hex(metadata, 'metadata')
+ utils.to_hex(metadata, 'metadata-mask')
for ofex in flow['match'].get('openflowplugin-extension-general:extension-list', []):
if ofex['extension-key'] == 'openflowplugin-extension-nicira-match:nxm-nx-reg6-key':
import collections
import logging
-from odltools.mdsal.models import ietf_interfaces
-from odltools.mdsal.models import odl_fib
-from odltools.mdsal.models import odl_interface_meta
-from odltools.mdsal.models import odl_l3vpn
-from odltools.mdsal.models import opendaylight_inventory
from odltools.mdsal.models.model import Model
-from odltools.mdsal.models.models import Models
from odltools.mdsal.models.opendaylight_inventory import Nodes
from odltools.netvirt import utils
import config
ifaces = {}
ifstates = {}
ifindexes = {}
- bindings = {}
+ # bindings = {}
einsts = {}
eifaces = {}
fibentries = {}
if not is_elantag_valid(eltag, eifaces, einsts, iface):
flow_info['reason'] = 'Lport Elantag mismatch'
return create_flow_dict(flow_info, flow)
- #return create_flow_dict(flow_info, flow)
+ # return create_flow_dict(flow_info, flow)
return None
# print einst_tag, flow_etag, mac_host
if flow_etag and einst_tag and flow_etag == einst_tag:
if mac_host.startswith(flow_host):
- act_resubmit = get_act_resubmit(flow)
- if (act_resubmit and act_resubmit.get('table') == 220):
+ act_resubmit = flow_parser.get_act_resubmit(flow)
+ if act_resubmit and act_resubmit.get('table') == 220:
return 'Correct'
else:
- act_tunnel = get_act_set_tunnel(flow)
+ act_tunnel = flow_parser.get_act_set_tunnel(flow)
if act_tunnel:
return 'Correct'
return 'Wrong'
ifaces = {}
ifstates = {}
ifindexes = {}
- bindings = {}
+ # bindings = {}
einsts = {}
eifaces = {}
fibentries = {}
vpninterfaces = {}
groups = {}
table_list = list(set([table for module in modules for table in TABLE_MAP[module]]))
- ##table_list = [214, 244]
+ # table_list = [214, 244]
of_nodes = config.gmodels.odl_inventory_nodes_config.get_clist_by_key()
if 'ifm' in modules:
if 'ifm' in modules and table['id'] in TABLE_MAP['ifm']:
flow_dict = stale_ifm_flow(flow, flow_info, ifaces, ifstates)
if 'l3vpn' in modules and table['id'] in TABLE_MAP['l3vpn']:
- flow_dict = stale_l3vpn_flow(flow, flow_info, groups, ifaces, ifindexes, vpnids, vpninterfaces, fibentries)
+ flow_dict = stale_l3vpn_flow(flow, flow_info, groups, ifaces, ifindexes, vpnids,
+ vpninterfaces, fibentries)
if 'elan' in modules and table['id'] in TABLE_MAP['elan']:
flow_dict = stale_elan_flow(flow, flow_info, ifaces, ifindexes, einsts, eifaces)
if 'acl' in modules and table['id'] in TABLE_MAP['acl']:
flow['table'], host, flow['id'],
utils.show_optionals(flow))
print result
- ##path = get_data_path('flows', flow)
- #print('http://192.168.2.32:8383/restconf/config/{}'.format(path))
- #print 'Flow:', utils.format_json(args, flow_parser.parse_flow(flow['flow']))
+ # path = get_data_path('flows', flow)
+ # print('http://192.168.2.32:8383/restconf/config/{}'.format(path))
+ # print 'Flow:', utils.format_json(args, flow_parser.parse_flow(flow['flow']))
def show_elan_flows(args):
host = compute_map.get(flow.get('dpnid'), flow.get('dpnid'))
result = 'MacHost:{}{}, Table:{}, FlowId:{}, {}, Flow:{}'.format(
flow['id'][-17:], host, flow['table'], flow['id'], utils.show_optionals(flow),
- utils.format_json(args, flow_parser.parse_flow(flow['flow'])))
+ utils.format_json(args, flow_parser.parse_flow(flow['flow'])))
print result
- #print 'Flow:', utils.format_json(args, flow_parser.parse_flow(flow['flow']))
+ # print 'Flow:', utils.format_json(args, flow_parser.parse_flow(flow['flow']))
def get_matchstr(args, flow):
flow = flow_info.get('flow')
dpnid = flow_info.get('dpnid')
host = compute_map.get(dpnid, dpnid)
- if ((flow_info.get('table') == 50 and
- flow.get('idle-timeout') == 300 and not
- nports.get(flow_info.get('src-mac'))) or
- (flow_info.get('table') == 51 and
- not nports.get(flow_info.get('dst-mac')))):
+ if ((flow_info.get('table') == 50 and flow.get('idle-timeout') == 300
+ and not nports.get(flow_info.get('src-mac')))
+ or (flow_info.get('table') == 51 and not nports.get(flow_info.get('dst-mac')))): # NOQA
+
result = 'Table:{}, Host:{}, FlowId:{}{}'.format(
flow_info.get('table'), host, flow.get('id'),
utils.show_optionals(flow_info))
compute_map = config.gmodels.odl_inventory_nodes_operational.get_dpn_host_mapping()
# neutron_neutron = neutron.neutron(Model.CONFIG, args)
nports = config.gmodels.neutron_neutron.get_ports_by_key()
- logger.info("dump_flows: %s", args)
for flow in utils.sort(get_all_flows(modules, filter_by), sort_by):
host = compute_map.get(flow.get('dpnid'), flow.get('dpnid'))
ip_list = get_ips_for_iface(nports, flow.get('ifname'))
if dpn and dpn != flow_info['dpnid']:
flow_info['reason'] = 'DpnId mismatch for flow and Interface'
return create_flow_dict(flow_info, flow)
- if (flow_info.get('lport') and ifstate.get('if-index')
- and flow_info['lport'] != ifstate['if-index']):
+ if flow_info.get('lport') and ifstate.get('if-index') and flow_info['lport'] != ifstate['if-index']:
flow_info['reason'] = 'Lport and IfIndex mismatch'
return create_flow_dict(flow_info, flow)
if (flow_info.get('ofport') and ifstate.get('lower-layer-if')
- and flow_info['ofport'] != Model.get_ofport_from_ncid(ifstate.get('lower-layer-if')[0])):
+ and flow_info['ofport'] != Model.get_ofport_from_ncid(ifstate.get('lower-layer-if')[0])): # NOQA
flow_info['reason'] = 'OfPort mismatch'
if (flow_info.get('vlanid') and iface.get('odl-interface:vlan-id')
- and flow_info['vlanid'] != iface.get('odl-interface:vlan-id')):
+ and flow_info['vlanid'] != iface.get('odl-interface:vlan-id')): # NOQA
flow_info['reason'] = 'VlanId mismatch'
return None
# return create_flow_dict(flow_info, flow)
logger.error("init: data is not a supported type")
return
self.start = 0
- logger.info("init: Copied %d lines", len(self.data))
+ logger.debug("init: Copied %d lines", len(self.data))
self.process_data()
self.format_data()
- logger.info("init: data has been processed and formatted")
+ logger.debug("init: data has been processed and formatted")
def pretty_print(self, data):
return "{}".format(pformat(data))
# Create a dictionary of all tokens in that flow.
# Append this flow dictionary to a list of flows.
for line in self.data[self.start:end]:
- pline = {}
- pline[Flows.IDLE_TIMEOUT] = "---"
- pline[Flows.SEND_FLOW_REMOVED] = "-"
+ pline = {Flows.IDLE_TIMEOUT: "---", Flows.SEND_FLOW_REMOVED: "-"}
tokens = line.split(" ")
for token in tokens:
# most lines are key=value so look for that pattern
self.pdata.append(pline)
logger.debug("process_data: Processed line %d into: \n%s",
self.start + len(self.pdata), pformat(pline))
- logger.info("process_data: Processed %d lines, skipped %d", len(self.pdata),
- self.start + len(self.data) - end)
+ logger.debug("process_data: Processed %d lines, skipped %d", len(self.pdata),
+ self.start + len(self.data) - end)
return self.pdata
for i, line in enumerate(self.pdata):
logger.debug("format_data: processing line %d: %s", self.start + i + 1, line)
- #if Flows.SEND_FLOW_REMOVED in line:
- # send_flow_rem = " {} ".format(line[Flows.SEND_FLOW_REMOVED])
- #else:
- # send_flow_rem = ""
-
- #if Flows.IDLE_TIMEOUT in line:
- # idle_timeo = " {}={}".format(Flows.IDLE_TIMEOUT, line[Flows.IDLE_TIMEOUT])
- #else:
- # idle_timeo = ""
-
if Flows.ACTIONS in line:
nactions = re_gt.sub(self.re_table, line[Flows.ACTIONS])
else:
stale_ids, bindings = flows.get_stale_bindings(args)
for iface_id in sorted(stale_ids):
for binding in bindings[iface_id].itervalues():
- #if binding.get('bound-services'):
+ # if binding.get('bound-services'):
path = get_data_path('bindings', binding)
print utils.format_json(bindings[iface_id])
print('http://{}:{}/restconf/config/{}'.format(args.ip, args.port, path))
def test_analyze_interface(self):
analyze.analyze_interface(self.args)
+
if __name__ == '__main__':
unittest.main()
self.flows = ovs_flows.Flows(self.data)
def test_process_data(self):
- print "pretty_print:\n{}".format(self.flows.pretty_print(self.flows.pdata))
+ # print "pretty_print:\n{}".format(self.flows.pretty_print(self.flows.pdata))
+ self.assertIsNotNone(self.flows.data)
def test_format_data(self):
- print "pretty_print:\n{}".format(self.flows.pretty_print(self.flows.fdata))
+ # print "pretty_print:\n{}".format(self.flows.pretty_print(self.flows.fdata))
+ self.assertIsNotNone(self.flows.fdata)
def test_write_file(self):
filename = "/tmp/flow_dumps.3.out.txt"
self.flows.write_fdata(filename)
self.assertTrue(os.path.exists(filename))
+
if __name__ == '__main__':
unittest.main()
request.write_file(self.outpath, data)
self.assertTrue(os.path.exists(self.outpath))
+
if __name__ == '__main__':
unittest.main()
def test_show_tables(self):
show.show_tables(self.args)
+
if __name__ == '__main__':
unittest.main()
def test_get_table_name(self):
self.assertEqual(tables.get_table_name(17), "DISPATCHER")
+
if __name__ == '__main__':
unittest.main()
args = parser.parse_args(['csit', self.DATAPATH, self.OUTPATH, '-g', '-d'])
robotfiles.run(args)
+
if __name__ == '__main__':
unittest.main()
def run(self):
os.system('rm -vrf ./build ./dist ./*.pyc ./*.tgz ./*.egg-info')
+
setup(
name='odltools',
version=__version__,
[tox]
-envlist = py27
+envlist = py27, flake8
[testenv]
deps = discover
[testenv:flake8]
deps = flake8
-basepython = python2.7
-commands = flake8 .
\ No newline at end of file
+commands = flake8
+
+[flake8]
+max-line-length = 119