-#END_IGNORE_XTESTING
-
- #Connect the ROADMA
- def test_01_connect_roadma(self):
- #Config ROADMA
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADMA"
- .format(self.restconf_baseurl))
- data = {"node": [{
- "node-id": "ROADMA",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": "17831",
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.created)
- #seems sometimes to return 200 instead of 201
- #self.assertEqual(response.status_code, requests.codes.ok)
- time.sleep(10)
-
- #Verify the termination points of the ROADMA
- def test_02_compareOpenroadmTopologyPortMapping(self):
- #Verify the termination points related to the SRGs
- nbSrg=1
- for s in range(1,nbSrg+1):
- url_topo="{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA-SRG"+`s`
- with open('./transportpce_tests/log/topoPortMap.log', 'a') as outfile1:
- outfile1.write('Config: '+`s`+' : '+url_topo+'\n')
- url = (url_topo.format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response_topo = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
- self.assertEqual(response_topo.status_code, requests.codes.ok)
- res_topo = response_topo.json()
- nbTP=len(res_topo['node'][0]['ietf-network-topology:termination-point'])
- for i in range(0,nbTP):
- tp_id=res_topo['node'][0]['ietf-network-topology:termination-point'][i]['tp-id']
- if(not "CP" in tp_id):
- url_map="{}/config/transportpce-portmapping:network/nodes/ROADMA/mapping/"+tp_id
- with open('./transportpce_tests/log/topoPortMap.log', 'a') as outfile1:
- outfile1.write('Config: '+`i`+'/'+ `nbTP`+' : '+url_map+'\n')
- url = (url_map.format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response_portMap = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
- self.assertEqual(response_portMap.status_code, requests.codes.ok)
-
- #Verify the termination points related to the degrees
- nbDeg=2
- for d in range(1,nbDeg+1):
- url_topo="{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA-DEG"+`d`
- with open('./transportpce_tests/log/topoPortMap.log', 'a') as outfile1:
- outfile1.write(url_topo+'\n')
- url = (url_topo.format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response_topo = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
- self.assertEqual(response_topo.status_code, requests.codes.ok)
- res_topo = response_topo.json()
- nbTP=len(res_topo['node'][0]['ietf-network-topology:termination-point'])
- for i in range(0,nbTP):
- tp_id=res_topo['node'][0]['ietf-network-topology:termination-point'][i]['tp-id']
- if(not "CTP" in tp_id):
- url_map ="{}/config/transportpce-portmapping:network/nodes/ROADMA/mapping/"+tp_id
- with open('./transportpce_tests/log/topoPortMap.log', 'a') as outfile1:
- outfile1.write('Config: '+`i`+'/'+ `nbTP`+' : '+url_map+'\n')
- url = (url_map.format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response_portMap = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
- self.assertEqual(response_portMap.status_code, requests.codes.ok)
- time.sleep(1)
-
- #Disconnect the ROADMA
- def test_03_disconnect_device(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADMA"
- .format(self.restconf_baseurl))
- data = {}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "DELETE", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.ok)
- #Delete in the openroadm-network
-# url = ("{}/config/ietf-network:networks/network/openroadm-network/node/ROADMA"
-# .format(self.restconf_baseurl))
-# data = {}
-# headers = {'content-type': 'application/json'}
-# response = requests.request(
-# "DELETE", url, data=json.dumps(data), headers=headers,
-# auth=('admin', 'admin'))
-# self.assertEqual(response.status_code, requests.codes.ok)
-# time.sleep(5)
-
- #Connect the XPDRA
+ # Connect the ROADMA
+ def test_01_connect_rdm(self):
+ response = test_utils.mount_device("ROADMA01", 'roadma')
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+
+ # Verify the termination points of the ROADMA
+ def test_02_compare_Openroadm_topology_portmapping_rdm(self):
+ responseTopo = test_utils.get_ordm_topo_request("")
+ resTopo = responseTopo.json()
+ nbNode = len(resTopo['network'][0]['node'])
+ for i in range(0, nbNode):
+ nodeId = resTopo['network'][0]['node'][i]['node-id']
+ nodeMapId = nodeId.split("-")[0]
+ test_utils.portmapping_request(nodeMapId)
+ nbTp = len(resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'])
+ for j in range(0, nbTp):
+ tpId = resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
+ if((not "CP" in tpId) and (not "CTP" in tpId)):
+ test_utils.portmapping_request(nodeMapId+"/mapping/"+tpId)
+
+ # Disconnect the ROADMA
+ def test_03_disconnect_rdm(self):
+ response = test_utils.unmount_device("ROADMA01")
+ self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
+
+# #Connect the XPDRA