-#END_IGNORE_XTESTING
-
- #Connect the ROADMA
- def test_01_connect_roadma(self):
- #Config ROADMA
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADMA"
- .format(self.restconf_baseurl))
- data = {"node": [{
- "node-id": "ROADMA",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": "17831",
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.created)
- #seems sometimes to return 200 instead of 201
- #self.assertEqual(response.status_code, requests.codes.ok)
- time.sleep(10)
-
- #Verify the termination points of the ROADMA
- def test_02_compareOpenroadmTopologyPortMapping(self):
- #Verify the termination points related to the SRGs
- nbSrg=1
- for s in range(1,nbSrg+1):
- url_topo="{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA-SRG"+`s`
- with open('./transportpce_tests/log/topoPortMap.log', 'a') as outfile1:
- outfile1.write('Config: '+`s`+' : '+url_topo+'\n')
- url = (url_topo.format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response_topo = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
- self.assertEqual(response_topo.status_code, requests.codes.ok)
- res_topo = response_topo.json()
- nbTP=len(res_topo['node'][0]['ietf-network-topology:termination-point'])
- for i in range(0,nbTP):
- tp_id=res_topo['node'][0]['ietf-network-topology:termination-point'][i]['tp-id']
- if(not "CP" in tp_id):
- url_map="{}/config/transportpce-portmapping:network/nodes/ROADMA/mapping/"+tp_id
- with open('./transportpce_tests/log/topoPortMap.log', 'a') as outfile1:
- outfile1.write('Config: '+`i`+'/'+ `nbTP`+' : '+url_map+'\n')
- url = (url_map.format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response_portMap = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
- self.assertEqual(response_portMap.status_code, requests.codes.ok)
-
- #Verify the termination points related to the degrees
- nbDeg=2
- for d in range(1,nbDeg+1):
- url_topo="{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA-DEG"+`d`
- with open('./transportpce_tests/log/topoPortMap.log', 'a') as outfile1:
- outfile1.write(url_topo+'\n')
- url = (url_topo.format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response_topo = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
- self.assertEqual(response_topo.status_code, requests.codes.ok)
- res_topo = response_topo.json()
- nbTP=len(res_topo['node'][0]['ietf-network-topology:termination-point'])
- for i in range(0,nbTP):
- tp_id=res_topo['node'][0]['ietf-network-topology:termination-point'][i]['tp-id']
- if(not "CTP" in tp_id):
- url_map ="{}/config/transportpce-portmapping:network/nodes/ROADMA/mapping/"+tp_id
- with open('./transportpce_tests/log/topoPortMap.log', 'a') as outfile1:
- outfile1.write('Config: '+`i`+'/'+ `nbTP`+' : '+url_map+'\n')
- url = (url_map.format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response_portMap = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
- self.assertEqual(response_portMap.status_code, requests.codes.ok)
- time.sleep(1)
-
- #Disconnect the ROADMA
- def test_03_disconnect_device(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADMA"
- .format(self.restconf_baseurl))
- data = {}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "DELETE", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.ok)
- #Delete in the openroadm-network
-# url = ("{}/config/ietf-network:networks/network/openroadm-network/node/ROADMA"
-# .format(self.restconf_baseurl))
-# data = {}
-# headers = {'content-type': 'application/json'}
-# response = requests.request(
-# "DELETE", url, data=json.dumps(data), headers=headers,
-# auth=('admin', 'admin'))
-# self.assertEqual(response.status_code, requests.codes.ok)
-# time.sleep(5)
-
- #Connect the XPDRA
+ # Connect the ROADMA
+ def test_01_connect_rdm(self):
+ response = test_utils.mount_device("ROADMA01", 'roadma')
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+
+ # Verify the termination points of the ROADMA
+ def test_02_compareOpenroadmTopologyPortMapping_rdm(self):
+ urlTopo = ("{}/config/ietf-network:networks/network/openroadm-topology"
+ .format(test_utils.RESTCONF_BASE_URL))
+ responseTopo = requests.request("GET", urlTopo, headers=test_utils.TYPE_APPLICATION_JSON,
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ resTopo = responseTopo.json()
+ nbNode = len(resTopo['network'][0]['node'])
+ nbMapCumul = 0
+ nbMappings = 0
+ for i in range(0, nbNode):
+ nodeId = resTopo['network'][0]['node'][i]['node-id']
+ nodeMapId = nodeId.split("-")[0]
+ urlMapList = "{}/config/transportpce-portmapping:network/nodes/" + nodeMapId
+ urlMapListFull = urlMapList.format(test_utils.RESTCONF_BASE_URL)
+ responseMapList = requests.request("GET", urlMapListFull, headers=test_utils.TYPE_APPLICATION_JSON,
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ resMapList = responseMapList.json()
+
+ nbMappings = len(resMapList['nodes'][0]['mapping']) - nbMapCumul
+ nbTp = len(resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'])
+ nbMapCurrent = 0
+ for j in range(0, nbTp):
+ tpId = resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
+ if((not "CP" in tpId) and (not "CTP" in tpId)):
+ urlMap = "{}/config/transportpce-portmapping:network/nodes/" + nodeMapId + "/mapping/" + tpId
+ urlMapFull = urlMap.format(test_utils.RESTCONF_BASE_URL)
+ responseMap = requests.request("GET", urlMapFull, headers=test_utils.TYPE_APPLICATION_JSON,
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ self.assertEqual(responseMap.status_code, requests.codes.ok)
+ if responseMap.status_code == requests.codes.ok:
+ nbMapCurrent += 1
+ nbMapCumul += nbMapCurrent
+ nbMappings -= nbMapCurrent
+ self.assertEqual(nbMappings, 0)
+
+ # Disconnect the ROADMA
+ def test_03_disconnect_rdm(self):
+ response = test_utils.unmount_device("ROADMA01")
+ self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
+
+# #Connect the XPDRA