3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
21 class TransportPCETopologyTesting(unittest.TestCase):
28 restconf_baseurl = "http://localhost:8181/restconf"
30 # START_IGNORE_XTESTING
@classmethod
def setUpClass(cls):
    """Start the four simulators and then the controller.

    Each handle returned by ``test_utils`` is stored on the class so that
    ``tearDownClass`` can signal the processes and wait for them.
    """
    # NOTE(review): the method header was absent from this block as received.
    # ``setUpClass`` is assumed because the body assigns to ``cls`` and
    # ``tearDownClass`` consumes exactly these attributes -- confirm.
    cls.sim_process1 = test_utils.start_sim('xpdra')
    cls.sim_process2 = test_utils.start_sim('roadma')
    cls.sim_process3 = test_utils.start_sim('roadmb')
    cls.sim_process4 = test_utils.start_sim('roadmc')
    print("all sims started")
    cls.odl_process = test_utils.start_tpce()
    print("opendaylight started")
@classmethod
def tearDownClass(cls):
    """Stop the controller and the four simulators.

    For each process, in start-up-independent fixed order (controller first,
    then simulators 1 to 4): send SIGINT to every direct child reported by
    psutil, send SIGINT to the process itself, then wait for it to exit.
    """
    # unittest invokes this hook on the class, so it has to be a classmethod.
    process_attributes = ('odl_process', 'sim_process1', 'sim_process2',
                          'sim_process3', 'sim_process4')
    for attribute in process_attributes:
        # Looked up lazily so earlier processes are still stopped even if a
        # later attribute turns out to be missing.
        process = getattr(cls, attribute)
        for child in psutil.Process(process.pid).children():
            child.send_signal(signal.SIGINT)
        process.send_signal(signal.SIGINT)
        process.wait()
def test_01_connect_ROADMA(self):
    """PUT the netconf connection settings for node ROADMA01.

    The request targets the ``topology-netconf`` node ``ROADMA01`` under the
    RESTCONF config datastore and must be answered with 201 Created.
    """
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADMA01"
           .format(self.restconf_baseurl))
    # NOTE(review): the opening of this literal was missing from the block as
    # received; the top-level key "node" is assumed from the target URL and
    # the one-element list from the closing brackets -- confirm.
    data = {"node": [{
        "node-id": "ROADMA01",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": test_utils.sims['roadma']['port'],
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_02_getClliNetwork(self):
    """GET clli-network and check that its first node is NodeA.

    Both the ``node-id`` and the ``org-openroadm-clli-network:clli`` leaf of
    the first node of the first returned network must equal ``NodeA``.
    """
    endpoint = "{}/config/ietf-network:networks/network/clli-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET",
        endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    first_node = reply.json()['network'][0]['node'][0]
    for leaf in ('node-id', 'org-openroadm-clli-network:clli'):
        self.assertEqual(first_node[leaf], 'NodeA')
def test_03_getOpenRoadmNetwork(self):
    """GET openroadm-network and check the single ROADMA01 entry.

    The first node must be ROADMA01, supported by NodeA in clli-network,
    with node type ROADM and model '2'.
    """
    endpoint = "{}/config/ietf-network:networks/network/openroadm-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET",
        endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    first_node = reply.json()['network'][0]['node'][0]
    self.assertEqual(first_node['node-id'], 'ROADMA01')
    supporting = first_node['supporting-node'][0]
    self.assertEqual(supporting['network-ref'], 'clli-network')
    self.assertEqual(supporting['node-ref'], 'NodeA')
    self.assertEqual(
        first_node['org-openroadm-common-network:node-type'], 'ROADM')
    self.assertEqual(first_node['org-openroadm-network:model'], '2')
def test_04_getLinks_OpenroadmTopology(self):
    """Check the ten internal ROADMA01 links of openroadm-topology.

    Every returned link must have one of the expected link types and an id
    listed for that type; each expected id must be returned exactly once.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 10)
    # Expected link ids per link type; an id is removed once it has been seen
    # so that duplicates and omissions are both detected.
    expected = {
        'EXPRESS-LINK': [
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX'],
        'ADD-LINK': [
            'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
            'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
            'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
            'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX'],
        'DROP-LINK': [
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX'],
    }
    for link in links:
        link_id = link['link-id']
        link_type = link['org-openroadm-common-network:link-type']
        # Any other link type is a test failure.
        self.assertIn(link_type, expected,
                      "unexpected link type for {}".format(link_id))
        self.assertIn(link_id, expected[link_type])
        expected[link_type].remove(link_id)
    for link_type, not_seen in expected.items():
        self.assertEqual(not_seen, [],
                         "{} ids not returned".format(link_type))
def test_05_getNodes_OpenRoadmTopology(self):
    """Check the four ROADMA01 nodes of openroadm-topology.

    Each node must be one of SRG1, SRG3, DEG1 or DEG2 of ROADMA01, appear
    once, carry the expected node type and termination points, and be
    supported by ROADMA01 (openroadm-network) and NodeA (clli-network).
    Both SRG nodes must expose exactly 17 termination points.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding so an error reply fails clearly.
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 4)
    tp_type = 'org-openroadm-common-network:tp-type'
    # node-id -> (node type, exact termination-point count or None when the
    # count is not checked, termination points that must be present).
    expected_nodes = {
        'ROADMA01-SRG1': ('SRG', 17, [
            {'tp-id': 'SRG1-CP-TXRX', tp_type: 'SRG-TXRX-CP'},
            {'tp-id': 'SRG1-PP1-TXRX', tp_type: 'SRG-TXRX-PP'}]),
        'ROADMA01-SRG3': ('SRG', 17, [
            {'tp-id': 'SRG3-CP-TXRX', tp_type: 'SRG-TXRX-CP'},
            {'tp-id': 'SRG3-PP1-TXRX', tp_type: 'SRG-TXRX-PP'}]),
        'ROADMA01-DEG1': ('DEGREE', None, [
            {'tp-id': 'DEG1-TTP-TXRX', tp_type: 'DEGREE-TXRX-TTP'},
            {'tp-id': 'DEG1-CTP-TXRX', tp_type: 'DEGREE-TXRX-CTP'}]),
        'ROADMA01-DEG2': ('DEGREE', None, [
            {'tp-id': 'DEG2-TTP-TXRX', tp_type: 'DEGREE-TXRX-TTP'},
            {'tp-id': 'DEG2-CTP-TXRX', tp_type: 'DEGREE-TXRX-CTP'}]),
    }
    supporting_nodes = (
        {'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
        {'network-ref': 'clli-network', 'node-ref': 'NodeA'},
    )
    for node in nodes:
        node_id = node['node-id']
        # Entries are popped as they are matched, so an unknown id and a
        # repeated id both fail here.
        self.assertIn(node_id, expected_nodes,
                      "unexpected or duplicate node")
        node_type, tp_count, required_tps = expected_nodes.pop(node_id)
        self.assertEqual(
            node['org-openroadm-common-network:node-type'], node_type)
        tps = node['ietf-network-topology:termination-point']
        if tp_count is not None:
            self.assertEqual(len(tps), tp_count)
        for required_tp in required_tps:
            self.assertIn(required_tp, tps)
        for supporting_node in supporting_nodes:
            self.assertIn(supporting_node, node['supporting-node'])
    self.assertEqual(expected_nodes, {})
def test_06_connect_XPDRA(self):
    """PUT the netconf connection settings for node XPDRA01.

    The request targets the ``topology-netconf`` node ``XPDRA01`` under the
    RESTCONF config datastore and must be answered with 201 Created.
    """
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/XPDRA01"
           .format(self.restconf_baseurl))
    # NOTE(review): the opening of this literal was missing from the block as
    # received; the top-level key "node" is assumed from the target URL and
    # the one-element list from the closing brackets -- confirm.
    data = {"node": [{
        "node-id": "XPDRA01",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_07_getClliNetwork(self):
    """GET clli-network again and check that its first node is NodeA.

    Both the ``node-id`` and the ``org-openroadm-clli-network:clli`` leaf of
    the first node of the first returned network must equal ``NodeA``.
    """
    endpoint = "{}/config/ietf-network:networks/network/clli-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET",
        endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    first_node = reply.json()['network'][0]['node'][0]
    for leaf in ('node-id', 'org-openroadm-clli-network:clli'):
        self.assertEqual(first_node[leaf], 'NodeA')
def test_08_getOpenRoadmNetwork(self):
    """Check the two nodes of openroadm-network (XPDRA01 and ROADMA01).

    Every node must be supported by NodeA in clli-network and carry the node
    type and model expected for its id; any other node id fails the test.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 2)
    # node-id -> (node type, model)
    expected = {
        'XPDRA01': ('XPONDER', '1'),
        'ROADMA01': ('ROADM', '2'),
    }
    for node in nodes:
        supporting = node['supporting-node'][0]
        self.assertEqual(supporting['network-ref'], 'clli-network')
        self.assertEqual(supporting['node-ref'], 'NodeA')
        node_id = node['node-id']
        self.assertIn(node_id, expected, "unexpected node")
        node_type, model = expected[node_id]
        self.assertEqual(
            node['org-openroadm-common-network:node-type'], node_type)
        self.assertEqual(node['org-openroadm-network:model'], model)
def test_09_getNodes_OpenRoadmTopology(self):
    """Check the five openroadm-topology nodes once XPDRA01 is connected.

    XPDRA01-XPDR1 must be an XPONDER supported by XPDRA01 and NodeA, with
    four client and two network termination points, and with
    XPDR1-NETWORK2 and XPDR1-CLIENT3 associated to each other.  The four
    ROADMA01 nodes must have the expected type and termination points and
    be supported by ROADMA01; both SRG nodes must expose exactly 17
    termination points.  Every expected node must appear exactly once.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding so an error reply fails clearly.
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 5)
    tp_type_key = 'org-openroadm-common-network:tp-type'
    map_port_key = 'transportpce-topology:associated-connection-map-port'
    # node-id -> (node type, exact termination-point count or None when the
    # count is not checked, termination points that must be present).
    roadm_nodes = {
        'ROADMA01-SRG1': ('SRG', 17, [
            {'tp-id': 'SRG1-CP-TXRX', tp_type_key: 'SRG-TXRX-CP'},
            {'tp-id': 'SRG1-PP1-TXRX', tp_type_key: 'SRG-TXRX-PP'}]),
        'ROADMA01-SRG3': ('SRG', 17, [
            {'tp-id': 'SRG3-CP-TXRX', tp_type_key: 'SRG-TXRX-CP'},
            {'tp-id': 'SRG3-PP1-TXRX', tp_type_key: 'SRG-TXRX-PP'}]),
        'ROADMA01-DEG1': ('DEGREE', None, [
            {'tp-id': 'DEG1-TTP-TXRX', tp_type_key: 'DEGREE-TXRX-TTP'},
            {'tp-id': 'DEG1-CTP-TXRX', tp_type_key: 'DEGREE-TXRX-CTP'}]),
        'ROADMA01-DEG2': ('DEGREE', None, [
            {'tp-id': 'DEG2-TTP-TXRX', tp_type_key: 'DEGREE-TXRX-TTP'},
            {'tp-id': 'DEG2-CTP-TXRX', tp_type_key: 'DEGREE-TXRX-CTP'}]),
    }
    not_seen = set(roadm_nodes)
    not_seen.add('XPDRA01-XPDR1')
    for node in nodes:
        node_id = node['node-id']
        # An unknown id and a repeated id both fail here.
        self.assertIn(node_id, not_seen, "unexpected or duplicate node")
        not_seen.remove(node_id)
        node_type = node['org-openroadm-common-network:node-type']
        supporting = node['supporting-node']
        tps = node['ietf-network-topology:termination-point']
        if node_id == 'XPDRA01-XPDR1':
            self.assertIn(
                {'network-ref': 'openroadm-network', 'node-ref': 'XPDRA01'},
                supporting)
            self.assertIn(
                {'network-ref': 'clli-network', 'node-ref': 'NodeA'},
                supporting)
            self.assertEqual(node_type, 'XPONDER')
            # Count the termination points per type and verify the
            # NETWORK2 <-> CLIENT3 connection-map association.
            client = 0
            network = 0
            for tp in tps:
                if tp[tp_type_key] == 'XPONDER-CLIENT':
                    client += 1
                elif tp[tp_type_key] == 'XPONDER-NETWORK':
                    network += 1
                if tp['tp-id'] == 'XPDR1-NETWORK2':
                    self.assertEqual(tp[map_port_key], 'XPDR1-CLIENT3')
                if tp['tp-id'] == 'XPDR1-CLIENT3':
                    self.assertEqual(tp[map_port_key], 'XPDR1-NETWORK2')
            self.assertEqual(client, 4)
            self.assertEqual(network, 2)
        else:
            expected_type, tp_count, required_tps = roadm_nodes[node_id]
            self.assertEqual(node_type, expected_type)
            if tp_count is not None:
                self.assertEqual(len(tps), tp_count)
            for required_tp in required_tps:
                self.assertIn(required_tp, tps)
            self.assertIn(
                {'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
                supporting)
    self.assertEqual(not_seen, set())
377 # Connect the tail XPDRA to ROADMA and vice versa
def test_10_connect_tail_xpdr_rdm(self):
    """POST init-xpdr-rdm-links to connect the tail XPDRA01 -> ROADMA01.

    The RPC input names network port 1 of transponder 1 on XPDRA01 and
    SRG1-PP1-TXRX of SRG 1 on ROADMA01; the reply must be 200 OK.
    """
    url = ("{}/operations/transportpce-networkutils:init-xpdr-rdm-links"
           .format(self.restconf_baseurl))
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDRA01",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "1",
                "networkutils:rdm-node": "ROADMA01",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP1-TXRX",
            },
        },
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_11_connect_tail_rdm_xpdr(self):
    """POST init-rdm-xpdr-links to connect the tail ROADMA01 -> XPDRA01.

    The RPC input names network port 1 of transponder 1 on XPDRA01 and
    SRG1-PP1-TXRX of SRG 1 on ROADMA01; the reply must be 200 OK.
    """
    url = ("{}/operations/transportpce-networkutils:init-rdm-xpdr-links"
           .format(self.restconf_baseurl))
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDRA01",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "1",
                "networkutils:rdm-node": "ROADMA01",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP1-TXRX",
            },
        },
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_12_getLinks_OpenRoadmTopology(self):
    """Check the twelve openroadm-topology links after the tails are set.

    On top of the ten internal ROADMA01 links, one XPONDER-INPUT and one
    XPONDER-OUTPUT link between XPDRA01 and ROADMA01 SRG1 are expected.
    Every returned link must have a known type and an id listed for that
    type; each expected id must be returned exactly once.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 12)
    # Expected link ids per link type; an id is removed once it has been seen
    # so that duplicates and omissions are both detected.
    expected = {
        'EXPRESS-LINK': [
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX'],
        'ADD-LINK': [
            'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
            'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
            'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
            'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX'],
        'DROP-LINK': [
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX'],
        'XPONDER-INPUT': [
            'ROADMA01-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1'],
        'XPONDER-OUTPUT': [
            'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX'],
    }
    for link in links:
        link_type = link['org-openroadm-common-network:link-type']
        link_id = link['link-id']
        # Any other link type is a test failure.
        self.assertIn(link_type, expected,
                      "unexpected link type for {}".format(link_id))
        self.assertIn(link_id, expected[link_type])
        expected[link_type].remove(link_id)
    for link_type, not_seen in expected.items():
        self.assertEqual(not_seen, [],
                         "{} ids not returned".format(link_type))
def test_13_connect_ROADMC(self):
    """PUT the netconf connection settings for node ROADMC01.

    The request targets the ``topology-netconf`` node ``ROADMC01`` under the
    RESTCONF config datastore and must be answered with 201 Created.
    """
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADMC01"
           .format(self.restconf_baseurl))
    # NOTE(review): the opening of this literal was missing from the block as
    # received; the top-level key "node" is assumed from the target URL and
    # the one-element list from the closing brackets -- confirm.
    data = {"node": [{
        "node-id": "ROADMC01",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": test_utils.sims['roadmc']['port'],
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_14_omsAttributes_ROADMA_ROADMC(self):
    """PUT the OMS span attributes of the ROADMA01 -> ROADMC01 link.

    Targets ``OMS-attributes/span`` of link
    ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX in
    openroadm-topology and expects 201 Created.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # NOTE(review): the brackets of this literal were missing from the block
    # as received.  The top-level key "span" is assumed from the target URL,
    # and further link-concatenation entries appear to have been lost as
    # well -- restore them from the project history before relying on it.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "link-concatenation": [{
            "SRLG-length": 100000,
        }],
    }}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_15_omsAttributes_ROADMC_ROADMA(self):
    """PUT the OMS span attributes of the ROADMC01 -> ROADMA01 link.

    Targets ``OMS-attributes/span`` of link
    ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX in
    openroadm-topology and expects 201 Created.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # NOTE(review): the brackets of this literal were missing from the block
    # as received.  The top-level key "span" is assumed from the target URL,
    # and further link-concatenation entries appear to have been lost as
    # well -- restore them from the project history before relying on it.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "link-concatenation": [{
            "SRLG-length": 100000,
        }],
    }}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_16_getClliNetwork(self):
    """Check that clli-network holds exactly NodeA and NodeC.

    Each returned node must be one of the two expected ids, appear once,
    and carry a ``clli`` leaf equal to its own node id.
    """
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    not_seen = ['NodeA', 'NodeC']
    for node in nodes:
        node_id = node['node-id']
        self.assertIn(node_id, not_seen)
        # NodeA is expected to carry clli 'NodeA' and NodeC clli 'NodeC',
        # i.e. the clli always equals the (already validated) node id.
        self.assertEqual(node['org-openroadm-clli-network:clli'], node_id)
        not_seen.remove(node_id)
    self.assertEqual(not_seen, [])
def test_17_getOpenRoadmNetwork(self):
    """Check the three openroadm-network nodes once ROADMC01 is connected.

    XPDRA01 and ROADMA01 must be supported by NodeA and ROADMC01 by NodeC,
    all within clli-network, each with the expected node type and model.
    Every expected node must appear exactly once.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 3)
    # node-id -> (supporting clli node, node type, model)
    expected = {
        'XPDRA01': ('NodeA', 'XPONDER', '1'),
        'ROADMA01': ('NodeA', 'ROADM', '2'),
        'ROADMC01': ('NodeC', 'ROADM', '2'),
    }
    for node in nodes:
        supporting = node['supporting-node'][0]
        self.assertEqual(supporting['network-ref'], 'clli-network')
        node_id = node['node-id']
        # Entries are popped as they are matched, so an unknown id and a
        # repeated id both fail here.
        self.assertIn(node_id, expected, "unexpected or duplicate node")
        clli_node, node_type, model = expected.pop(node_id)
        self.assertEqual(supporting['node-ref'], clli_node)
        self.assertEqual(
            node['org-openroadm-common-network:node-type'], node_type)
        self.assertEqual(node['org-openroadm-network:model'], model)
    self.assertEqual(expected, {})
def test_18_getROADMLinkOpenRoadmTopology(self):
    """Check the twenty openroadm-topology links once ROADMC01 is connected.

    Expected are the internal express/add/drop links of ROADMA01 and
    ROADMC01, the two ROADM-TO-ROADM links between them, and the two
    transponder links of XPDRA01.  Every returned link must have a known
    type and an id listed for that type; each expected id must be returned
    exactly once.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 20)
    # Expected link ids per link type; an id is removed once it has been seen
    # so that duplicates and omissions are both detected.
    expected = {
        'EXPRESS-LINK': [
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
            'ROADMC01-DEG2-DEG2-CTP-TXRXtoROADMC01-DEG1-DEG1-CTP-TXRX',
            'ROADMC01-DEG1-DEG1-CTP-TXRXtoROADMC01-DEG2-DEG2-CTP-TXRX'],
        'ADD-LINK': [
            'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
            'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
            'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
            'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
            'ROADMC01-SRG1-SRG1-CP-TXRXtoROADMC01-DEG2-DEG2-CTP-TXRX',
            'ROADMC01-SRG1-SRG1-CP-TXRXtoROADMC01-DEG1-DEG1-CTP-TXRX'],
        'DROP-LINK': [
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
            'ROADMC01-DEG1-DEG1-CTP-TXRXtoROADMC01-SRG1-SRG1-CP-TXRX',
            'ROADMC01-DEG2-DEG2-CTP-TXRXtoROADMC01-SRG1-SRG1-CP-TXRX'],
        'ROADM-TO-ROADM': [
            'ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
            'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX'],
        'XPONDER-INPUT': [
            'ROADMA01-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1'],
        'XPONDER-OUTPUT': [
            'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX'],
    }
    for link in links:
        link_type = link['org-openroadm-common-network:link-type']
        link_id = link['link-id']
        # Any other link type is a test failure.
        self.assertIn(link_type, expected,
                      "unexpected link type for {}".format(link_id))
        self.assertIn(link_id, expected[link_type])
        expected[link_type].remove(link_id)
    for link_type, not_seen in expected.items():
        self.assertEqual(not_seen, [],
                         "{} ids not returned".format(link_type))
def test_19_getLinkOmsAttributesOpenRoadmTopology(self):
    """Check that both ROADM-to-ROADM links carry OMS span attributes.

    For each of the two links between ROADMA01 and ROADMC01 the span's
    ``engineered-spanloss`` and the ``SRLG-length`` of its first
    link-concatenation entry must be present and not null.  Both links
    must be found among the twenty links of openroadm-topology.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 20)
    not_seen = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
                'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX']
    for link in links:
        link_id = link['link-id']
        if link_id not in not_seen:
            continue
        span = link['org-openroadm-network-topology:OMS-attributes']['span']
        # Identity checks against None, one assertion per attribute, so a
        # failure names the attribute that is missing.
        self.assertIsNotNone(span["engineered-spanloss"])
        self.assertIsNotNone(span['link-concatenation'][0]['SRLG-length'])
        not_seen.remove(link_id)
    self.assertEqual(not_seen, [])
def test_20_getNodes_OpenRoadmTopology(self):
    """Check the nodes of openroadm-topology for XPDRA01, ROADMA01 and ROADMC01.

    Exactly eight nodes are expected.  Each one must be a known node-id,
    carry the expected node type, reference its supporting
    openroadm-network node and expose the expected termination points.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status first: decoding an error page as JSON would hide
    # the real cause of the failure behind a parsing exception.
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Tests related to nodes
    nodes = res['network'][0]['node']
    self.assertEqual(len(nodes), 8)
    # ROADM node-id -> (expected node type, supporting openroadm-network node)
    roadmNodes = {
        'ROADMA01-SRG1': ('SRG', 'ROADMA01'),
        'ROADMA01-SRG3': ('SRG', 'ROADMA01'),
        'ROADMA01-DEG1': ('DEGREE', 'ROADMA01'),
        'ROADMA01-DEG2': ('DEGREE', 'ROADMA01'),
        'ROADMC01-SRG1': ('SRG', 'ROADMC01'),
        'ROADMC01-DEG1': ('DEGREE', 'ROADMC01'),
        'ROADMC01-DEG2': ('DEGREE', 'ROADMC01'),
    }
    # Ids still to be found; every id must show up exactly once.
    listNode = ['XPDRA01-XPDR1'] + list(roadmNodes)
    for node in nodes:
        nodeType = node['org-openroadm-common-network:node-type']
        nodeId = node['node-id']
        if nodeId == 'XPDRA01-XPDR1':
            # ************************Tests related to XPDRA nodes
            self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA01'},
                          node['supporting-node'])
            self.assertEqual(nodeType, 'XPONDER')
            tps = node['ietf-network-topology:termination-point']
            self.assertEqual(len(tps), 6)
            tpTypes = [tp['org-openroadm-common-network:tp-type'] for tp in tps]
            self.assertEqual(tpTypes.count('XPONDER-CLIENT'), 4)
            self.assertEqual(tpTypes.count('XPONDER-NETWORK'), 2)
        elif nodeId in roadmNodes:
            expectedType, supportingNode = roadmNodes[nodeId]
            # 'ROADMA01-SRG1' -> 'SRG1': the tp-ids are prefixed with it.
            element = nodeId.split('-', 1)[1]
            tps = node['ietf-network-topology:termination-point']
            if expectedType == 'SRG':
                self.assertEqual(len(tps), 17)
                expectedTps = (('CP', 'SRG-TXRX-CP'), ('PP1', 'SRG-TXRX-PP'))
            else:
                expectedTps = (('TTP', 'DEGREE-TXRX-TTP'), ('CTP', 'DEGREE-TXRX-CTP'))
            for suffix, tpType in expectedTps:
                self.assertIn({'tp-id': '{}-{}-TXRX'.format(element, suffix),
                               'org-openroadm-common-network:tp-type': tpType},
                              tps)
            self.assertIn({'network-ref': 'openroadm-network', 'node-ref': supportingNode},
                          node['supporting-node'])
            self.assertEqual(nodeType, expectedType)
        else:
            self.fail("Unexpected node in openroadm-topology: {}".format(nodeId))
        listNode.remove(nodeId)
    self.assertEqual(len(listNode), 0)
def test_21_connect_ROADMB(self):
    """Mount the ROADMB01 simulator in topology-netconf and expect 201 Created."""
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADMB01"
           .format(self.restconf_baseurl))
    # NOTE(review): the opening line of this payload was not present in the
    # block; "node" is inferred from the target list in the URL and from the
    # closing "}]}" of the literal - confirm.
    data = {"node": [{
        "node-id": "ROADMB01",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": test_utils.sims['roadmb']['port'],
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_22_omsAttributes_ROADMA_ROADMB(self):
    """Configure the OMS span attributes of the ROADMA01 to ROADMB01 link."""
    # Config ROADMA01-ROADMB01 oms-attributes
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADMA01-DEG2-DEG2-TTP-TXRXtoROADMB01-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # NOTE(review): only "auto-spanloss", "engineered-spanloss",
    # "link-concatenation" and "SRLG-length" were visible in the block.
    # The "span" wrapper (taken from the URL target) and the "SRLG-Id",
    # "fiber-type" and "pmd" entries are reconstructed - confirm their values.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_23_omsAttributes_ROADMB_ROADMA(self):
    """Configure the OMS span attributes of the ROADMB01 to ROADMA01 link."""
    # Config ROADMB01-ROADMA01 oms-attributes
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADMB01-DEG1-DEG1-TTP-TXRXtoROADMA01-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # NOTE(review): only "auto-spanloss", "engineered-spanloss",
    # "link-concatenation" and "SRLG-length" were visible in the block.
    # The "span" wrapper (taken from the URL target) and the "SRLG-Id",
    # "fiber-type" and "pmd" entries are reconstructed - confirm their values.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_24_omsAttributes_ROADMB_ROADMC(self):
    """Configure the OMS span attributes of the ROADMB01 to ROADMC01 link."""
    # Config ROADMB01-ROADMC01 oms-attributes
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADMB01-DEG2-DEG2-TTP-TXRXtoROADMC01-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # NOTE(review): only "auto-spanloss", "engineered-spanloss",
    # "link-concatenation" and "SRLG-length" were visible in the block.
    # The "span" wrapper (taken from the URL target) and the "SRLG-Id",
    # "fiber-type" and "pmd" entries are reconstructed - confirm their values.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_25_omsAttributes_ROADMC_ROADMB(self):
    """Configure the OMS span attributes of the ROADMC01 to ROADMB01 link."""
    # Config ROADMC01-ROADMB01 oms-attributes
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADMC01-DEG1-DEG1-TTP-TXRXtoROADMB01-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # NOTE(review): only "auto-spanloss", "engineered-spanloss",
    # "link-concatenation" and "SRLG-length" were visible in the block.
    # The "span" wrapper (taken from the URL target) and the "SRLG-Id",
    # "fiber-type" and "pmd" entries are reconstructed - confirm their values.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_26_getClliNetwork(self):
    """Check that clli-network holds NodeA, NodeB and NodeC, each exactly once."""
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    listNode = ['NodeA', 'NodeB', 'NodeC']
    for node in res['network'][0]['node']:
        nodeId = node['node-id']
        # assertIn names the offending id on failure, unlike comparing a
        # boolean flag with True.
        self.assertIn(nodeId, listNode)
        # Every expected node carries its own id as clli value, so the
        # per-node branches reduce to a single comparison.
        self.assertEqual(node['org-openroadm-clli-network:clli'], nodeId)
        listNode.remove(nodeId)
    self.assertEqual(len(listNode), 0)
def test_27_verifyDegree(self):
    """Check that the ROADM-TO-ROADM links are exactly the six expected ones."""
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Tests related to links
    listR2RLink = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
                   'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX',
                   'ROADMA01-DEG2-DEG2-TTP-TXRXtoROADMB01-DEG1-DEG1-TTP-TXRX',
                   'ROADMC01-DEG1-DEG1-TTP-TXRXtoROADMB01-DEG2-DEG2-TTP-TXRX',
                   'ROADMB01-DEG1-DEG1-TTP-TXRXtoROADMA01-DEG2-DEG2-TTP-TXRX',
                   'ROADMB01-DEG2-DEG2-TTP-TXRXtoROADMC01-DEG1-DEG1-TTP-TXRX']
    for link in res['network'][0]['ietf-network-topology:link']:
        if link['org-openroadm-common-network:link-type'] == 'ROADM-TO-ROADM':
            link_id = link['link-id']
            # Reports the unexpected link id on failure.
            self.assertIn(link_id, listR2RLink)
            listR2RLink.remove(link_id)
    # Every expected link must have been seen.
    self.assertEqual(len(listR2RLink), 0)
def test_28_verifyOppositeLinkTopology(self):
    """Check that every link of openroadm-topology has a consistent opposite link.

    For each of the 34 links the opposite link is fetched on its own and
    must point back to the link, swap source and destination nodes, and
    carry the matching link type.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Link type -> type expected on the opposite link.  Types absent from
    # this table are not checked.
    oppositeType = {
        'ADD-LINK': 'DROP-LINK',
        'DROP-LINK': 'ADD-LINK',
        'EXPRESS-LINK': 'EXPRESS-LINK',
        'ROADM-TO-ROADM': 'ROADM-TO-ROADM',
        'XPONDER-INPUT': 'XPONDER-OUTPUT',
        'XPONDER-OUTPUT': 'XPONDER-INPUT',
    }
    # Tests related to links
    links = res['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 34)
    for link in links:
        link_id = link['link-id']
        link_type = link['org-openroadm-common-network:link-type']
        link_src = link['source']['source-node']
        link_dest = link['destination']['dest-node']
        oppLink_id = link['org-openroadm-common-network:opposite-link']
        # Find the opposite link.  The id is passed as a format argument so
        # that it is never interpreted as part of the template.
        url_oppLink = ("{}/config/ietf-network:networks/network/openroadm-topology/"
                       "ietf-network-topology:link/{}"
                       .format(self.restconf_baseurl, oppLink_id))
        response_oppLink = requests.request(
            "GET", url_oppLink, headers=headers, auth=('admin', 'admin'))
        self.assertEqual(response_oppLink.status_code, requests.codes.ok)
        oppLink = response_oppLink.json()['ietf-network-topology:link'][0]
        self.assertEqual(oppLink['org-openroadm-common-network:opposite-link'], link_id)
        self.assertEqual(oppLink['source']['source-node'], link_dest)
        self.assertEqual(oppLink['destination']['dest-node'], link_src)
        oppLink_type = oppLink['org-openroadm-common-network:link-type']
        if link_type in oppositeType:
            self.assertEqual(oppLink_type, oppositeType[link_type])
def test_29_getLinkOmsAttributesOpenRoadmTopology(self):
    """Check that each expected ROADM-to-ROADM link exposes its OMS span attributes.

    The engineered span loss and the SRLG length of the first
    link-concatenation entry must both be set on each of the six links.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    R2RLink = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
               'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX',
               'ROADMA01-DEG2-DEG2-TTP-TXRXtoROADMB01-DEG1-DEG1-TTP-TXRX',
               'ROADMC01-DEG1-DEG1-TTP-TXRXtoROADMB01-DEG2-DEG2-TTP-TXRX',
               'ROADMB01-DEG1-DEG1-TTP-TXRXtoROADMA01-DEG2-DEG2-TTP-TXRX',
               'ROADMB01-DEG2-DEG2-TTP-TXRXtoROADMC01-DEG1-DEG1-TTP-TXRX']
    for link in res['network'][0]['ietf-network-topology:link']:
        link_id = link['link-id']
        if link_id in R2RLink:
            span = link['org-openroadm-network-topology:OMS-attributes']['span']
            spanLoss = span["engineered-spanloss"]
            length = span['link-concatenation'][0]['SRLG-length']
            # Identity checks against None replace the former
            # "(x != None) & (y != None)" flag and name the missing value.
            self.assertIsNotNone(spanLoss)
            self.assertIsNotNone(length)
            R2RLink.remove(link_id)
    # Every expected link must have been seen.
    self.assertEqual(len(R2RLink), 0)
def test_30_disconnect_ROADMB(self):
    """Unmount ROADMB01 and delete its CLLI node NodeB; both must return 200."""
    headers = {'content-type': 'application/json'}
    targets = (
        # Delete in the topology-netconf
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/ROADMB01",
        # Delete in the clli-network
        "{}/config/ietf-network:networks/network/clli-network/node/NodeB",
    )
    for target in targets:
        # No request body: the former "data" argument referred to a name
        # that is not assigned in this method, and DELETE needs no payload.
        response = requests.request(
            "DELETE", target.format(self.restconf_baseurl),
            headers=headers, auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
def test_31_disconnect_ROADMC(self):
    """Unmount ROADMC01 and delete its CLLI node NodeC; both must return 200."""
    headers = {'content-type': 'application/json'}
    targets = (
        # Delete in the topology-netconf
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/ROADMC01",
        # Delete in the clli-network
        "{}/config/ietf-network:networks/network/clli-network/node/NodeC",
    )
    for target in targets:
        # No request body: the former "data" argument referred to a name
        # that is not assigned in this method, and DELETE needs no payload.
        response = requests.request(
            "DELETE", target.format(self.restconf_baseurl),
            headers=headers, auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
def test_32_getNodes_OpenRoadmTopology(self):
    """Check openroadm-topology nodes when only XPDRA01 and ROADMA01 nodes remain.

    Exactly five nodes are expected and none of the ROADMC01 nodes may
    still be present.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status first: decoding an error page as JSON would hide
    # the real cause of the failure behind a parsing exception.
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Tests related to nodes
    nodes = res['network'][0]['node']
    self.assertEqual(len(nodes), 5)
    # ROADM node-id -> expected node type
    roadmNodes = {
        'ROADMA01-SRG1': 'SRG',
        'ROADMA01-SRG3': 'SRG',
        'ROADMA01-DEG1': 'DEGREE',
        'ROADMA01-DEG2': 'DEGREE',
    }
    # Ids still to be found; every id must show up exactly once.
    listNode = ['XPDRA01-XPDR1'] + list(roadmNodes)
    for node in nodes:
        nodeType = node['org-openroadm-common-network:node-type']
        nodeId = node['node-id']
        if nodeId == 'XPDRA01-XPDR1':
            # Tests related to XPDRA nodes
            for tp in node['ietf-network-topology:termination-point']:
                tpid = tp['tp-id']
                if tpid == 'XPDR1-CLIENT1':
                    self.assertEqual(tp['org-openroadm-common-network:tp-type'],
                                     'XPONDER-CLIENT')
                if tpid == 'XPDR1-NETWORK1':
                    self.assertEqual(tp['org-openroadm-common-network:tp-type'],
                                     'XPONDER-NETWORK')
                    self.assertEqual(tp['org-openroadm-network-topology:xpdr-network-attributes']
                                     ['tail-equipment-id'],
                                     'ROADMA01-SRG1--SRG1-PP1-TXRX')
            self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA01'},
                          node['supporting-node'])
        elif nodeId in roadmNodes:
            expectedType = roadmNodes[nodeId]
            # 'ROADMA01-SRG1' -> 'SRG1': the tp-ids are prefixed with it.
            element = nodeId.split('-', 1)[1]
            self.assertEqual(nodeType, expectedType)
            tps = node['ietf-network-topology:termination-point']
            if expectedType == 'SRG':
                self.assertEqual(len(tps), 17)
                expectedTps = (('CP', 'SRG-TXRX-CP'), ('PP1', 'SRG-TXRX-PP'))
            else:
                expectedTps = (('TTP', 'DEGREE-TXRX-TTP'), ('CTP', 'DEGREE-TXRX-CTP'))
            for suffix, tpType in expectedTps:
                self.assertIn({'tp-id': '{}-{}-TXRX'.format(element, suffix),
                               'org-openroadm-common-network:tp-type': tpType},
                              tps)
            self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
                          node['supporting-node'])
        else:
            self.fail("Unexpected node in openroadm-topology: {}".format(nodeId))
        listNode.remove(nodeId)
    self.assertEqual(len(listNode), 0)
    # Test related to SRG1 of ROADMC
    for node in nodes:
        self.assertNotIn(node['node-id'],
                         ('ROADMC01-SRG1', 'ROADMC01-DEG1', 'ROADMC01-DEG2'))
def test_33_getOpenRoadmNetwork(self):
    """Check that openroadm-network holds two nodes, neither ROADMC01 nor ROADMB01."""
    url = ("{}/config/ietf-network:networks/network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    nodes = res['network'][0]['node']
    self.assertEqual(len(nodes), 2)
    # Inspect every node: the former range(0, nbNode-1) stopped one short
    # and never looked at the last entry.
    for node in nodes:
        self.assertNotIn(node['node-id'], ('ROADMC01', 'ROADMB01'))
def test_34_getClliNetwork(self):
    """Check that clli-network holds a single node and that it is not NodeC."""
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    nodes = res['network'][0]['node']
    self.assertEqual(len(nodes), 1)
    # Inspect every node.  The former loop ran over range(0, nbNode-1),
    # i.e. zero times for a single node, and its body read the fixed
    # index 1, which is out of range for a one-element list.
    for node in nodes:
        self.assertNotEqual(node['org-openroadm-clli-network:clli'], 'NodeC')
def test_35_disconnect_XPDRA(self):
    """Unmount XPDRA01 from topology-netconf and expect 200 OK."""
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/XPDRA01"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    # No request body: the former "data" argument referred to a name that
    # is not assigned in this method, and DELETE needs no payload.
    response = requests.request(
        "DELETE", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_36_getClliNetwork(self):
    """Check that NodeA is the only node of clli-network."""
    response = requests.request(
        "GET",
        "{}/config/ietf-network:networks/network/clli-network"
        .format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    clli_nodes = response.json()['network'][0]['node']
    self.assertEqual(len(clli_nodes), 1)
    self.assertEqual(clli_nodes[0]['org-openroadm-clli-network:clli'], 'NodeA')
def test_37_getOpenRoadmNetwork(self):
    """Check that openroadm-network holds one node and that it is not XPDRA01."""
    response = requests.request(
        "GET",
        "{}/config/ietf-network:networks/network/openroadm-network"
        .format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    network_nodes = response.json()['network'][0]['node']
    self.assertEqual(len(network_nodes), 1)
    for entry in network_nodes:
        self.assertNotEqual(entry['node-id'], 'XPDRA01')
def test_38_getNodes_OpenRoadmTopology(self):
    """Check openroadm-topology nodes when only the four ROADMA01 nodes remain."""
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status first: decoding an error page as JSON would hide
    # the real cause of the failure behind a parsing exception.
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Tests related to nodes
    nodes = res['network'][0]['node']
    self.assertEqual(len(nodes), 4)
    # node-id -> expected node type
    expectedNodes = {
        'ROADMA01-SRG1': 'SRG',
        'ROADMA01-SRG3': 'SRG',
        'ROADMA01-DEG1': 'DEGREE',
        'ROADMA01-DEG2': 'DEGREE',
    }
    # Ids still to be found; every id must show up exactly once.
    listNode = list(expectedNodes)
    for node in nodes:
        self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
                      node['supporting-node'])
        nodeType = node['org-openroadm-common-network:node-type']
        nodeId = node['node-id']
        if nodeId not in expectedNodes:
            self.fail("Unexpected node in openroadm-topology: {}".format(nodeId))
        expectedType = expectedNodes[nodeId]
        # 'ROADMA01-SRG1' -> 'SRG1': the tp-ids are prefixed with it.
        element = nodeId.split('-', 1)[1]
        self.assertEqual(nodeType, expectedType)
        tps = node['ietf-network-topology:termination-point']
        if expectedType == 'SRG':
            self.assertEqual(len(tps), 17)
            expectedTps = (('CP', 'SRG-TXRX-CP'), ('PP1', 'SRG-TXRX-PP'))
        else:
            expectedTps = (('TTP', 'DEGREE-TXRX-TTP'), ('CTP', 'DEGREE-TXRX-CTP'))
        for suffix, tpType in expectedTps:
            self.assertIn({'tp-id': '{}-{}-TXRX'.format(element, suffix),
                           'org-openroadm-common-network:tp-type': tpType},
                          tps)
        listNode.remove(nodeId)
    self.assertEqual(len(listNode), 0)
def test_39_disconnect_ROADM_XPDRA_link(self):
    """Delete both directions of the XPDRA01 / ROADMA01-SRG1 link; each must return 200."""
    headers = {'content-type': 'application/json'}
    link_ids = (
        'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX',
        'ROADMA01-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1',
    )
    for link_id in link_ids:
        url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
               "link/{}"
               .format(self.restconf_baseurl, link_id))
        # No request body: the former "data" argument referred to a name
        # that is not assigned in this method, and DELETE needs no payload.
        response = requests.request(
            "DELETE", url, headers=headers, auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
def test_40_getLinks_OpenRoadmTopology(self):
    """Check the 16 links of openroadm-topology.

    Express, add and drop links must be exactly the expected ROADMA01
    ones, six links of any other type must remain, and no
    XPONDER-INPUT / XPONDER-OUTPUT link may be left.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    links = res['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 16)
    # Link type -> ids still to be found for that type.
    expectedLinks = {
        'EXPRESS-LINK': ['ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
                         'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX'],
        'ADD-LINK': ['ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
                     'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
                     'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
                     'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX'],
        'DROP-LINK': ['ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
                      'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
                      'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
                      'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX'],
    }
    roadmtoroadmLink = 0
    for link in links:
        linkType = link['org-openroadm-common-network:link-type']
        if linkType in expectedLinks:
            link_id = link['link-id']
            # Reports the unexpected link id on failure.
            self.assertIn(link_id, expectedLinks[linkType])
            expectedLinks[linkType].remove(link_id)
        else:
            # Any other type is counted as a ROADM-to-ROADM link.
            roadmtoroadmLink += 1
    for remaining in expectedLinks.values():
        self.assertEqual(len(remaining), 0)
    self.assertEqual(roadmtoroadmLink, 6)
    for link in links:
        self.assertNotIn(link['org-openroadm-common-network:link-type'],
                         ('XPONDER-OUTPUT', 'XPONDER-INPUT'))
def test_41_disconnect_ROADMA(self):
    """Unmount ROADMA01 and delete its CLLI node NodeA; both must return 200."""
    headers = {'content-type': 'application/json'}
    targets = (
        # Delete in the topology-netconf
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/ROADMA01",
        # Delete in the clli-network
        "{}/config/ietf-network:networks/network/clli-network/node/NodeA",
    )
    for target in targets:
        # No request body: the former "data" argument referred to a name
        # that is not assigned in this method, and DELETE needs no payload.
        response = requests.request(
            "DELETE", target.format(self.restconf_baseurl),
            headers=headers, auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
def test_42_getClliNetwork(self):
    """Check that clli-network no longer contains any node."""
    response = requests.request(
        "GET",
        "{}/config/ietf-network:networks/network/clli-network"
        .format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    clli_network = response.json()['network'][0]
    self.assertNotIn('node', clli_network)
def test_43_getOpenRoadmNetwork(self):
    """Check that openroadm-network no longer contains any node."""
    response = requests.request(
        "GET",
        "{}/config/ietf-network:networks/network/openroadm-network"
        .format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    openroadm_network = response.json()['network'][0]
    self.assertNotIn('node', openroadm_network)
def test_44_check_roadm2roadm_link_persistence(self):
    """Check that openroadm-topology has no node left but still holds six links."""
    response = requests.request(
        "GET",
        "{}/config/ietf-network:networks/network/openroadm-topology"
        .format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    topology = response.json()['network'][0]
    # The link list is read before the node check, as in the original order.
    remaining_links = topology['ietf-network-topology:link']
    link_count = len(remaining_links)
    self.assertNotIn('node', topology)
    self.assertEqual(link_count, 6)
# Run the whole suite with per-test output when executed as a script.
if "__main__" == __name__:
    unittest.main(verbosity=2)