3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
15 from common import test_utils
18 class TransportPCEPortMappingTesting(unittest.TestCase):
24 cls.processes = test_utils.start_tpce()
25 cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
def tearDownClass(cls):
    """Stop every process stored in ``cls.processes``.

    Each entry is handed to ``test_utils.shutdown_process`` in turn; a
    confirmation line is printed once the whole list has been processed.
    """
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
34 print("execution of {}".format(self.id().split(".")[-1]))
37 # def test_01_restconfAPI(self):
38 # url = ("{}/operational/network-topology:network-topology/topology/"
39 # "topology-netconf/node/controller-config".format(test_utils.RESTCONF_BASE_URL))
40 # response = requests.request("GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
41 # self.assertEqual(response.status_code, requests.codes.ok)
42 # res = response.json()
43 # self.assertEqual(res['node'] [0] ['netconf-node-topology:connection-status'],
46 # def test_02_restconfAPI(self):
47 # url = ("{}/config/transportpce-portmapping:network/nodes/controller-config".format(test_utils.RESTCONF_BASE_URL))
48 # response = requests.request(
49 # "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
50 # self.assertEqual(response.status_code, requests.codes.not_found)
51 # res = response.json()
53 # {"error-type":"application", "error-tag":"data-missing",
54 # "error-message":"Request could not be completed because the relevant data model content does not exist "},
55 # res['errors']['error'])
def test_01_rdm_device_connection(self):
    """Mount node ROADMA01 and expect a "created" HTTP status.

    Calls ``test_utils.mount_device`` with node id ``ROADMA01`` and the
    ``roadma`` name (the same name given to ``start_sims`` during class
    set-up), then asserts the reply status equals
    ``requests.codes.created``.
    """
    mount_reply = test_utils.mount_device("ROADMA01", 'roadma')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
61 def test_02_rdm_device_connected(self):
62 url = ("{}/operational/network-topology:"
63 "network-topology/topology/topology-netconf/node/ROADMA01"
64 .format(test_utils.RESTCONF_BASE_URL))
65 response = requests.request(
66 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
67 self.assertEqual(response.status_code, requests.codes.ok)
70 res['node'][0]['netconf-node-topology:connection-status'],
74 def test_03_rdm_portmapping_info(self):
75 url = ("{}/config/transportpce-portmapping:network/"
76 "nodes/ROADMA01/node-info"
77 .format(test_utils.RESTCONF_BASE_URL))
78 response = requests.request(
79 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
80 self.assertEqual(response.status_code, requests.codes.ok)
83 {u'node-info': {u'node-type': u'rdm',
84 u'node-ip-address': u'127.0.0.12',
85 u'node-clli': u'NodeA',
86 u'openroadm-version': u'1.2.1', u'node-vendor': u'vendorA',
87 u'node-model': u'2'}},
91 def test_04_rdm_portmapping_DEG1_TTP_TXRX(self):
92 url = ("{}/config/transportpce-portmapping:network/"
93 "nodes/ROADMA01/mapping/DEG1-TTP-TXRX"
94 .format(test_utils.RESTCONF_BASE_URL))
95 response = requests.request(
96 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
97 self.assertEqual(response.status_code, requests.codes.ok)
100 {'supporting-port': 'L1', 'supporting-circuit-pack-name': '2/0',
101 'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional'},
104 def test_05_rdm_portmapping_SRG1_PP7_TXRX(self):
105 url = ("{}/config/transportpce-portmapping:network/"
106 "nodes/ROADMA01/mapping/SRG1-PP7-TXRX"
107 .format(test_utils.RESTCONF_BASE_URL))
108 response = requests.request(
109 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
110 self.assertEqual(response.status_code, requests.codes.ok)
111 res = response.json()
113 {'supporting-port': 'C7', 'supporting-circuit-pack-name': '4/0',
114 'logical-connection-point': 'SRG1-PP7-TXRX', 'port-direction': 'bidirectional'},
117 def test_06_rdm_portmapping_SRG3_PP1_TXRX(self):
118 url = ("{}/config/transportpce-portmapping:network/"
119 "nodes/ROADMA01/mapping/SRG3-PP1-TXRX"
120 .format(test_utils.RESTCONF_BASE_URL))
121 response = requests.request(
122 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
123 self.assertEqual(response.status_code, requests.codes.ok)
124 res = response.json()
126 {'supporting-port': 'C1', 'supporting-circuit-pack-name': '5/0',
127 'logical-connection-point': 'SRG3-PP1-TXRX', 'port-direction': 'bidirectional'},
def test_07_xpdr_device_connection(self):
    """Mount node XPDRA01 and expect a "created" HTTP status.

    Calls ``test_utils.mount_device`` with node id ``XPDRA01`` and the
    ``xpdra`` name (the same name given to ``start_sims`` during class
    set-up), then asserts the reply status equals
    ``requests.codes.created``.
    """
    mount_reply = test_utils.mount_device("XPDRA01", 'xpdra')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
134 def test_08_xpdr_device_connected(self):
135 url = ("{}/operational/network-topology:"
136 "network-topology/topology/topology-netconf/node/XPDRA01"
137 .format(test_utils.RESTCONF_BASE_URL))
138 response = requests.request(
139 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
140 self.assertEqual(response.status_code, requests.codes.ok)
141 res = response.json()
143 res['node'][0]['netconf-node-topology:connection-status'],
147 def test_09_xpdr_portmapping_info(self):
148 url = ("{}/config/transportpce-portmapping:network/"
149 "nodes/XPDRA01/node-info"
150 .format(test_utils.RESTCONF_BASE_URL))
151 response = requests.request(
152 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
153 self.assertEqual(response.status_code, requests.codes.ok)
154 res = response.json()
156 {u'node-info': {u'node-type': u'xpdr',
157 u'node-ip-address': u'127.0.0.10',
158 u'node-clli': u'NodeA',
159 u'openroadm-version': u'1.2.1', u'node-vendor': u'vendorA',
160 u'node-model': u'1'}},
164 def test_10_xpdr_portmapping_NETWORK1(self):
165 url = ("{}/config/transportpce-portmapping:network/"
166 "nodes/XPDRA01/mapping/XPDR1-NETWORK1"
167 .format(test_utils.RESTCONF_BASE_URL))
168 response = requests.request(
169 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
170 self.assertEqual(response.status_code, requests.codes.ok)
171 res = response.json()
173 {'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
174 'logical-connection-point': 'XPDR1-NETWORK1', 'port-direction': 'bidirectional',
175 'connection-map-lcp': 'XPDR1-CLIENT1', 'port-qual': 'xpdr-network',
176 'lcp-hash-val': 'OSvMgUyP+mE='},
179 def test_11_xpdr_portmapping_NETWORK2(self):
180 url = ("{}/config/transportpce-portmapping:network/"
181 "nodes/XPDRA01/mapping/XPDR1-NETWORK2"
182 .format(test_utils.RESTCONF_BASE_URL))
183 response = requests.request(
184 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
185 self.assertEqual(response.status_code, requests.codes.ok)
186 res = response.json()
188 {'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/2-PLUG-NET',
189 'logical-connection-point': 'XPDR1-NETWORK2', 'port-direction': 'bidirectional',
190 'connection-map-lcp': 'XPDR1-CLIENT3', 'port-qual': 'xpdr-network',
191 'lcp-hash-val': 'OSvMgUyP+mI='},
194 def test_12_xpdr_portmapping_CLIENT1(self):
195 url = ("{}/config/transportpce-portmapping:network/"
196 "nodes/XPDRA01/mapping/XPDR1-CLIENT1"
197 .format(test_utils.RESTCONF_BASE_URL))
198 response = requests.request(
199 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
200 self.assertEqual(response.status_code, requests.codes.ok)
201 res = response.json()
203 {'supporting-port': 'C1',
204 'supporting-circuit-pack-name': '1/0/C1-PLUG-CLIENT',
205 'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
206 'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
207 'lcp-hash-val': 'AO9UFkY/TLYw'},
210 def test_13_xpdr_portmapping_CLIENT2(self):
211 url = ("{}/config/transportpce-portmapping:network/"
212 "nodes/XPDRA01/mapping/XPDR1-CLIENT2"
213 .format(test_utils.RESTCONF_BASE_URL))
214 response = requests.request(
215 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
216 self.assertEqual(response.status_code, requests.codes.ok)
217 res = response.json()
219 {'supporting-port': 'C2',
220 'supporting-circuit-pack-name': '1/0/C2-PLUG-CLIENT',
221 'logical-connection-point': 'XPDR1-CLIENT2', 'port-direction': 'bidirectional',
222 'port-qual': 'xpdr-client',
223 'lcp-hash-val': 'AO9UFkY/TLYz'},
226 def test_14_xpdr_portmapping_CLIENT3(self):
227 url = ("{}/config/transportpce-portmapping:network/"
228 "nodes/XPDRA01/mapping/XPDR1-CLIENT3"
229 .format(test_utils.RESTCONF_BASE_URL))
230 response = requests.request(
231 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
232 self.assertEqual(response.status_code, requests.codes.ok)
233 res = response.json()
235 {'supporting-port': 'C3',
236 'supporting-circuit-pack-name': '1/0/C3-PLUG-CLIENT',
237 'logical-connection-point': 'XPDR1-CLIENT3',
238 'connection-map-lcp': 'XPDR1-NETWORK2', 'port-direction': 'bidirectional',
239 'port-qual': 'xpdr-client', 'lcp-hash-val': 'AO9UFkY/TLYy'},
242 def test_15_xpdr_portmapping_CLIENT4(self):
243 url = ("{}/config/transportpce-portmapping:network/"
244 "nodes/XPDRA01/mapping/XPDR1-CLIENT4"
245 .format(test_utils.RESTCONF_BASE_URL))
246 response = requests.request(
247 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
248 self.assertEqual(response.status_code, requests.codes.ok)
249 res = response.json()
251 {'supporting-port': 'C4',
252 'supporting-circuit-pack-name': '1/0/C4-PLUG-CLIENT',
253 'logical-connection-point': 'XPDR1-CLIENT4', 'port-direction': 'bidirectional',
254 'port-qual': 'xpdr-client', 'lcp-hash-val': 'AO9UFkY/TLY1'},
def test_16_xpdr_device_disconnection(self):
    """Unmount node XPDRA01 and expect an "ok" HTTP status.

    Calls ``test_utils.unmount_device`` for ``XPDRA01`` and asserts the
    reply status equals ``requests.codes.ok``.
    """
    unmount_reply = test_utils.unmount_device("XPDRA01")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
261 def test_17_xpdr_device_disconnected(self):
262 url = ("{}/operational/network-topology:network-topology/topology/"
263 "topology-netconf/node/XPDRA01".format(test_utils.RESTCONF_BASE_URL))
264 response = requests.request(
265 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
266 self.assertEqual(response.status_code, requests.codes.not_found)
267 res = response.json()
269 {"error-type": "application", "error-tag": "data-missing",
270 "error-message": "Request could not be completed because the relevant data model content does not exist"},
271 res['errors']['error'])
273 def test_18_xpdr_device_not_connected(self):
274 url = ("{}/config/transportpce-portmapping:network/nodes/XPDRA01".format(test_utils.RESTCONF_BASE_URL))
275 response = requests.request(
276 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
277 self.assertEqual(response.status_code, requests.codes.not_found)
278 res = response.json()
280 {"error-type": "application", "error-tag": "data-missing",
281 "error-message": "Request could not be completed because the relevant data model content does not exist"},
282 res['errors']['error'])
def test_19_rdm_device_disconnection(self):
    """Unmount node ROADMA01 and expect an "ok" HTTP status.

    Calls ``test_utils.unmount_device`` for ``ROADMA01`` and asserts the
    reply status equals ``requests.codes.ok``.
    """
    unmount_reply = test_utils.unmount_device("ROADMA01")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
288 def test_20_rdm_device_disconnected(self):
289 url = ("{}/operational/network-topology:network-topology/topology/topology-netconf/node/ROADMA01"
290 .format(test_utils.RESTCONF_BASE_URL))
291 response = requests.request(
292 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
293 self.assertEqual(response.status_code, requests.codes.not_found)
294 res = response.json()
296 {"error-type": "application", "error-tag": "data-missing",
297 "error-message": "Request could not be completed because the relevant data model content does not exist"},
298 res['errors']['error'])
300 def test_21_rdm_device_not_connected(self):
301 url = ("{}/config/transportpce-portmapping:network/nodes/ROADMA01".format(test_utils.RESTCONF_BASE_URL))
302 response = requests.request(
303 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
304 self.assertEqual(response.status_code, requests.codes.not_found)
305 res = response.json()
307 {"error-type": "application", "error-tag": "data-missing",
308 "error-message": "Request could not be completed because the relevant data model content does not exist"},
309 res['errors']['error'])
# Script entry point: run this module's tests when it is executed directly.
if __name__ == "__main__":
    # verbosity=2 makes unittest report each test individually.
    unittest.main(verbosity=2)