3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
25 class TransportPCEtesting(unittest.TestCase):
28 restconf_baseurl = "http://localhost:8181/restconf"
32 cls.processes = test_utils.start_tpce()
33 cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
36 def tearDownClass(cls):
37 for process in cls.processes:
38 test_utils.shutdown_process(process)
39 print("all processes killed")
45 def test_01_connect_rdm(self):
47 url = ("{}/config/network-topology:"
48 "network-topology/topology/topology-netconf/node/ROADM-A1"
49 .format(self.restconf_baseurl))
51 "node-id": "ROADM-A1",
52 "netconf-node-topology:username": "admin",
53 "netconf-node-topology:password": "admin",
54 "netconf-node-topology:host": "127.0.0.1",
55 "netconf-node-topology:port": test_utils.sims['roadma']['port'],
56 "netconf-node-topology:tcp-only": "false",
57 "netconf-node-topology:pass-through": {}}]}
58 headers = {'content-type': 'application/json'}
59 response = requests.request(
60 "PUT", url, data=json.dumps(data), headers=headers,
61 auth=('admin', 'admin'))
62 self.assertEqual(response.status_code, requests.codes.created)
65 # Verify the termination points of the ROADMA
66 def test_02_compareOpenroadmTopologyPortMapping_rdm(self):
67 urlTopo = ("{}/config/ietf-network:networks/network/openroadm-topology"
68 .format(self.restconf_baseurl))
69 headers = {'content-type': 'application/json'}
70 responseTopo = requests.request(
71 "GET", urlTopo, headers=headers, auth=('admin', 'admin'))
72 resTopo = responseTopo.json()
73 nbNode = len(resTopo['network'][0]['node'])
76 for i in range(0, nbNode):
77 nodeId = resTopo['network'][0]['node'][i]['node-id']
78 print("nodeId={}".format(nodeId))
79 nodeMapId = nodeId.split("-")[0] + "-" + nodeId.split("-")[1]
80 print("nodeMapId={}".format(nodeMapId))
81 urlMapList = "{}/config/transportpce-portmapping:network/nodes/" + nodeMapId
82 urlMapListFull = urlMapList.format(self.restconf_baseurl)
83 responseMapList = requests.request(
84 "GET", urlMapListFull, headers=headers, auth=('admin', 'admin'))
85 resMapList = responseMapList.json()
87 nbMappings = len(resMapList['nodes'][0]['mapping']) - nbMapCumul
88 nbTp = len(resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'])
90 for j in range(0, nbTp):
91 tpId = resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
92 if((not "CP" in tpId) and (not "CTP" in tpId)):
93 urlMap = "{}/config/transportpce-portmapping:network/nodes/" + nodeMapId + "/mapping/" + tpId
94 urlMapFull = urlMap.format(self.restconf_baseurl)
95 responseMap = requests.request(
96 "GET", urlMapFull, headers=headers, auth=('admin', 'admin'))
97 self.assertEqual(responseMap.status_code, requests.codes.ok)
98 if(responseMap.status_code == requests.codes.ok):
100 nbMapCumul += nbMapCurrent
101 nbMappings -= nbMapCurrent
102 self.assertEqual(nbMappings, 0)
104 # Disconnect the ROADMA
105 def test_03_disconnect_rdm(self):
106 url = ("{}/config/network-topology:"
107 "network-topology/topology/topology-netconf/node/ROADM-A1"
108 .format(self.restconf_baseurl))
110 headers = {'content-type': 'application/json'}
111 response = requests.request(
112 "DELETE", url, data=json.dumps(data), headers=headers,
113 auth=('admin', 'admin'))
114 self.assertEqual(response.status_code, requests.codes.ok)
117 def test_04_connect_xpdr(self):
119 url = ("{}/config/network-topology:"
120 "network-topology/topology/topology-netconf/node/XPDR-A1"
121 .format(self.restconf_baseurl))
123 "node-id": "XPDR-A1",
124 "netconf-node-topology:username": "admin",
125 "netconf-node-topology:password": "admin",
126 "netconf-node-topology:host": "127.0.0.1",
127 "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
128 "netconf-node-topology:tcp-only": "false",
129 "netconf-node-topology:pass-through": {}}]}
130 headers = {'content-type': 'application/json'}
131 response = requests.request(
132 "PUT", url, data=json.dumps(data), headers=headers,
133 auth=('admin', 'admin'))
134 self.assertEqual(response.status_code, requests.codes.created)
137 # #Verify the termination points related to XPDR
138 def test_05_compareOpenroadmTopologyPortMapping_xpdr(self):
139 self.test_02_compareOpenroadmTopologyPortMapping_rdm()
141 # Disconnect the XPDRA
142 def test_06_disconnect_device(self):
143 url = ("{}/config/network-topology:"
144 "network-topology/topology/topology-netconf/node/XPDR-A1"
145 .format(self.restconf_baseurl))
147 headers = {'content-type': 'application/json'}
148 response = requests.request(
149 "DELETE", url, data=json.dumps(data), headers=headers,
150 auth=('admin', 'admin'))
151 self.assertEqual(response.status_code, requests.codes.ok)
154 if __name__ == "__main__":
155 # logging.basicConfig(filename='./transportpce_tests/log/response.log',filemode='w',level=logging.DEBUG)
156 #logging.debug('I am there')
157 unittest.main(verbosity=2)