# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-import json
-import signal
+# pylint: disable=no-member
+# pylint: disable=too-many-public-methods
+# a pylint false positive due to unittest
+# pylint: disable=no-self-use
+
import time
import unittest
import requests
-import psutil
-import test_utils
+from common import test_utils
class TransportPCEtesting(unittest.TestCase):
- honeynode_process1 = None
- honeynode_process2 = None
- odl_process = None
- restconf_baseurl = "http://localhost:8181/restconf"
-
-# START_IGNORE_XTESTING
+ processes = None
@classmethod
def setUpClass(cls):
- print("starting honeynode1...")
- cls.honeynode_process1 = test_utils.start_xpdra_honeynode()
- time.sleep(20)
-
- print("starting honeynode2...")
- cls.honeynode_process2 = test_utils.start_roadma_honeynode()
- time.sleep(20)
-
- print("starting opendaylight...")
- cls.odl_process = test_utils.start_tpce()
- time.sleep(60)
- print("opendaylight started")
+ cls.processes = test_utils.start_tpce()
+ cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
@classmethod
def tearDownClass(cls):
- for child in psutil.Process(cls.odl_process.pid).children():
- child.send_signal(signal.SIGINT)
- child.wait()
- cls.odl_process.send_signal(signal.SIGINT)
- cls.odl_process.wait()
- for child in psutil.Process(cls.honeynode_process1.pid).children():
- child.send_signal(signal.SIGINT)
- child.wait()
- cls.honeynode_process1.send_signal(signal.SIGINT)
- cls.honeynode_process1.wait()
- for child in psutil.Process(cls.honeynode_process2.pid).children():
- child.send_signal(signal.SIGINT)
- child.wait()
- cls.honeynode_process2.send_signal(signal.SIGINT)
- cls.honeynode_process2.wait()
+ # pylint: disable=not-an-iterable
+ for process in cls.processes:
+ test_utils.shutdown_process(process)
+ print("all processes killed")
def setUp(self):
time.sleep(10)
-# END_IGNORE_XTESTING
-
# Connect the ROADMA
def test_01_connect_rdm(self):
- # Config ROADMA
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADMA01"
- .format(self.restconf_baseurl))
- data = {"node": [{
- "node-id": "ROADMA01",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": "17831",
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(20)
+ response = test_utils.mount_device("ROADMA01", 'roadma')
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
# Verify the termination points of the ROADMA
- def test_02_compareOpenroadmTopologyPortMapping_rdm(self):
- urlTopo = ("{}/config/ietf-network:networks/network/openroadm-topology"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- responseTopo = requests.request("GET", urlTopo, headers=headers, auth=('admin', 'admin'))
+ def test_02_compare_Openroadm_topology_portmapping_rdm(self):
+ responseTopo = test_utils.get_ordm_topo_request("")
resTopo = responseTopo.json()
nbNode = len(resTopo['network'][0]['node'])
- nbMapCumul = 0
- nbMappings = 0
for i in range(0, nbNode):
nodeId = resTopo['network'][0]['node'][i]['node-id']
nodeMapId = nodeId.split("-")[0]
- urlMapList = "{}/config/transportpce-portmapping:network/nodes/" + nodeMapId
- urlMapListFull = urlMapList.format(self.restconf_baseurl)
- responseMapList = requests.request("GET", urlMapListFull, headers=headers, auth=('admin', 'admin'))
- resMapList = responseMapList.json()
-
- nbMappings = len(resMapList['nodes'][0]['mapping']) - nbMapCumul
+ test_utils.portmapping_request(nodeMapId)
nbTp = len(resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'])
- nbMapCurrent = 0
for j in range(0, nbTp):
tpId = resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
if((not "CP" in tpId) and (not "CTP" in tpId)):
- urlMap = "{}/config/transportpce-portmapping:network/nodes/" + nodeMapId + "/mapping/" + tpId
- urlMapFull = urlMap.format(self.restconf_baseurl)
- responseMap = requests.request("GET", urlMapFull, headers=headers, auth=('admin', 'admin'))
- self.assertEqual(responseMap.status_code, requests.codes.ok)
- if responseMap.status_code == requests.codes.ok:
- nbMapCurrent += 1
- nbMapCumul += nbMapCurrent
- nbMappings -= nbMapCurrent
- self.assertEqual(nbMappings, 0)
+ test_utils.portmapping_request(nodeMapId+"/mapping/"+tpId)
# Disconnect the ROADMA
def test_03_disconnect_rdm(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADMA01"
- .format(self.restconf_baseurl))
- data = {}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "DELETE", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils.unmount_device("ROADMA01")
+ self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
# #Connect the XPDRA
def test_04_connect_xpdr(self):
- # Config XPDRA
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/XPDRA01"
- .format(self.restconf_baseurl))
- data = {"node": [{
- "node-id": "XPDRA01",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": "17830",
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(20)
+ response = test_utils.mount_device("XPDRA01", 'xpdra')
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
# #Verify the termination points related to XPDR
- def test_05_compareOpenroadmTopologyPortMapping_xpdr(self):
- self.test_02_compareOpenroadmTopologyPortMapping_rdm()
+ def test_05_compare_Openroadm_topology_portmapping_xpdr(self):
+ self.test_02_compare_Openroadm_topology_portmapping_rdm()
# Disconnect the XPDRA
def test_06_disconnect_device(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/XPDRA01"
- .format(self.restconf_baseurl))
- data = {}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "DELETE", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils.unmount_device("XPDRA01")
+ self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
if __name__ == "__main__":