class TransportPCEtesting(unittest.TestCase):
processes = None
- restconf_baseurl = "http://localhost:8181/restconf"
@classmethod
def setUpClass(cls):
# Connect the ROADMA
def test_01_connect_rdm(self):
- # Config ROADMA
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADM-A1"
- .format(self.restconf_baseurl))
- data = {"node": [{
- "node-id": "ROADM-A1",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": test_utils.sims['roadma']['port'],
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(20)
+ response = test_utils.mount_device("ROADM-A1", 'roadma')
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
# Verify the termination points of the ROADMA
def test_02_compareOpenroadmTopologyPortMapping_rdm(self):
urlTopo = ("{}/config/ietf-network:networks/network/openroadm-topology"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ .format(test_utils.RESTCONF_BASE_URL))
responseTopo = requests.request(
- "GET", urlTopo, headers=headers, auth=('admin', 'admin'))
+ "GET", urlTopo, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
resTopo = responseTopo.json()
nbNode = len(resTopo['network'][0]['node'])
nbMapCumul = 0
nodeMapId = nodeId.split("-")[0] + "-" + nodeId.split("-")[1]
print("nodeMapId={}".format(nodeMapId))
urlMapList = "{}/config/transportpce-portmapping:network/nodes/" + nodeMapId
- urlMapListFull = urlMapList.format(self.restconf_baseurl)
+ urlMapListFull = urlMapList.format(test_utils.RESTCONF_BASE_URL)
responseMapList = requests.request(
- "GET", urlMapListFull, headers=headers, auth=('admin', 'admin'))
+ "GET", urlMapListFull, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
resMapList = responseMapList.json()
nbMappings = len(resMapList['nodes'][0]['mapping']) - nbMapCumul
tpId = resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
if((not "CP" in tpId) and (not "CTP" in tpId)):
urlMap = "{}/config/transportpce-portmapping:network/nodes/" + nodeMapId + "/mapping/" + tpId
- urlMapFull = urlMap.format(self.restconf_baseurl)
+ urlMapFull = urlMap.format(test_utils.RESTCONF_BASE_URL)
responseMap = requests.request(
- "GET", urlMapFull, headers=headers, auth=('admin', 'admin'))
+ "GET", urlMapFull, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(responseMap.status_code, requests.codes.ok)
if(responseMap.status_code == requests.codes.ok):
nbMapCurrent += 1
def test_03_disconnect_rdm(self):
url = ("{}/config/network-topology:"
"network-topology/topology/topology-netconf/node/ROADM-A1"
- .format(self.restconf_baseurl))
+ .format(test_utils.RESTCONF_BASE_URL))
data = {}
- headers = {'content-type': 'application/json'}
response = requests.request(
- "DELETE", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ "DELETE", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
# #Connect the XPDRA
def test_04_connect_xpdr(self):
- # Config XPDRA
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/XPDR-A1"
- .format(self.restconf_baseurl))
- data = {"node": [{
- "node-id": "XPDR-A1",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(20)
+ response = test_utils.mount_device("XPDR-A1", 'xpdra')
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
# #Verify the termination points related to XPDR
def test_05_compareOpenroadmTopologyPortMapping_xpdr(self):
def test_06_disconnect_device(self):
url = ("{}/config/network-topology:"
"network-topology/topology/topology-netconf/node/XPDR-A1"
- .format(self.restconf_baseurl))
+ .format(test_utils.RESTCONF_BASE_URL))
data = {}
- headers = {'content-type': 'application/json'}
response = requests.request(
- "DELETE", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ "DELETE", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)