3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
25 class TransportPCEtesting(unittest.TestCase):
32 restconf_baseurl = "http://localhost:8181/restconf"
34 # START_IGNORE_XTESTING
38 cls.sim_process1 = test_utils.start_sim('xpdra')
40 cls.sim_process2 = test_utils.start_sim('roadma')
42 cls.sim_process3 = test_utils.start_sim('roadmb')
44 cls.sim_process4 = test_utils.start_sim('roadmc')
45 print("all sims started")
47 print("starting opendaylight...")
48 cls.odl_process = test_utils.start_tpce()
50 print("opendaylight started")
def tearDownClass(cls):
    """Stop OpenDaylight and the four simulators held on the class.

    Processes are handled in a fixed order (ODL first, then simulators 1-4).
    For each one: SIGINT is sent to its direct children, then to the process
    itself, and finally wait() is called on the process.
    """
    # NOTE(review): the embedded original numbering skips one line after each
    # child.send_signal() call. If a statement was lost there (for example a
    # wait on the child), restore it inside the inner loop below.
    managed = (cls.odl_process, cls.sim_process1, cls.sim_process2,
               cls.sim_process3, cls.sim_process4)
    for process in managed:
        for child in psutil.Process(process.pid).children():
            child.send_signal(signal.SIGINT)
        process.send_signal(signal.SIGINT)
        process.wait()
def test_01_connect_ROADM_A1(self):
    """PUT the netconf mount entry for ROADM-A1 and expect a 'created' status."""
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADM-A1"
           .format(self.restconf_baseurl))
    # NOTE(review): the line opening this payload is missing from the view.
    # The wrapper key "node" is inferred from the visible "}]}" closer and
    # from the target URL (.../node/ROADM-A1) - confirm against the original.
    data = {"node": [{
        "node-id": "ROADM-A1",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": test_utils.sims['roadma']['port'],
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
    # NOTE(review): the original numbering also skips a line right after this
    # assertion; check whether a statement (e.g. a settle delay) was lost.
def test_02_getClliNetwork(self):
    """GET clli-network and check its first node is NodeA with clli NodeA."""
    endpoint = "{}/config/ietf-network:networks/network/clli-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    first_node = reply.json()['network'][0]['node'][0]
    self.assertEqual(first_node['node-id'], 'NodeA')
    self.assertEqual(first_node['org-openroadm-clli-network:clli'], 'NodeA')
def test_03_getOpenRoadmNetwork(self):
    """GET openroadm-network and check the attributes of its first node (ROADM-A1)."""
    endpoint = "{}/config/ietf-network:networks/network/openroadm-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    node = reply.json()['network'][0]['node'][0]
    self.assertEqual(node['node-id'], 'ROADM-A1')
    # Only the first supporting-node entry is inspected, as before.
    self.assertEqual(node['supporting-node'][0]['network-ref'], 'clli-network')
    self.assertEqual(node['supporting-node'][0]['node-ref'], 'NodeA')
    self.assertEqual(node['org-openroadm-common-network:node-type'], 'ROADM')
    self.assertEqual(node['org-openroadm-network:model'], 'model2')
def test_04_getLinks_OpenroadmTopology(self):
    """Check that openroadm-topology holds exactly the 10 expected ROADM-A1 links.

    Every returned link must have a known link type and an id expected for
    that type; every expected id must be returned exactly once.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 10)
    # Expected link ids keyed by link type; ids are removed as they are seen.
    expected = {
        'EXPRESS-LINK': [
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX'],
        'ADD-LINK': [
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX'],
        'DROP-LINK': [
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX'],
    }
    for link in links:
        link_id = link['link-id']
        link_type = link['org-openroadm-common-network:link-type']
        if link_type not in expected:
            self.fail("unexpected link type {!r} for link {!r}"
                      .format(link_type, link_id))
        self.assertIn(link_id, expected[link_type])
        expected[link_type].remove(link_id)
    for link_type, remaining in expected.items():
        self.assertEqual(remaining, [],
                         "expected {} ids not returned".format(link_type))
def test_05_getNodes_OpenRoadmTopology(self):
    """Check the four ROADM-A1 nodes (SRG1, SRG3, DEG1, DEG2) of openroadm-topology.

    Each node must be supported by both openroadm-network/ROADM-A1 and
    clli-network/NodeA, carry the expected node type and expose the listed
    termination points. SRG nodes must have exactly 5 termination points.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding, so an error reply is reported as a
    # status mismatch instead of a JSON decoding error.
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 4)
    tp_type = 'org-openroadm-common-network:tp-type'
    # node-id -> (node type, exact TP count or None, TPs that must be present)
    expected = {
        'ROADM-A1-SRG1': ('SRG', 5, [
            {'tp-id': 'SRG1-CP-TXRX', tp_type: 'SRG-TXRX-CP'},
            {'tp-id': 'SRG1-PP1-TXRX', tp_type: 'SRG-TXRX-PP'}]),
        'ROADM-A1-SRG3': ('SRG', 5, [
            {'tp-id': 'SRG3-CP-TXRX', tp_type: 'SRG-TXRX-CP'},
            {'tp-id': 'SRG3-PP1-TXRX', tp_type: 'SRG-TXRX-PP'}]),
        'ROADM-A1-DEG1': ('DEGREE', None, [
            {'tp-id': 'DEG1-TTP-TXRX', tp_type: 'DEGREE-TXRX-TTP'},
            {'tp-id': 'DEG1-CTP-TXRX', tp_type: 'DEGREE-TXRX-CTP'}]),
        'ROADM-A1-DEG2': ('DEGREE', None, [
            {'tp-id': 'DEG2-TTP-TXRX', tp_type: 'DEGREE-TXRX-TTP'},
            {'tp-id': 'DEG2-CTP-TXRX', tp_type: 'DEGREE-TXRX-CTP'}]),
    }
    remaining = list(expected)
    for node in nodes:
        node_id = node['node-id']
        if node_id not in expected:
            self.fail("unexpected node {!r}".format(node_id))
        node_type, tp_count, required_tps = expected[node_id]
        supporting = node['supporting-node']
        self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
                      supporting)
        self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
                      supporting)
        self.assertEqual(node['org-openroadm-common-network:node-type'], node_type)
        tps = node['ietf-network-topology:termination-point']
        if tp_count is not None:
            self.assertEqual(len(tps), tp_count)
        for required_tp in required_tps:
            self.assertIn(required_tp, tps)
        # A node id returned twice fails here instead of raising ValueError.
        self.assertIn(node_id, remaining)
        remaining.remove(node_id)
    self.assertEqual(remaining, [])
def test_06_connect_XPDRA(self):
    """PUT the netconf mount entry for XPDR-A1 and expect a 'created' status."""
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/XPDR-A1"
           .format(self.restconf_baseurl))
    # NOTE(review): the line opening this payload is missing from the view.
    # The wrapper key "node" is inferred from the visible "}]}" closer and
    # from the target URL (.../node/XPDR-A1) - confirm against the original.
    data = {"node": [{
        "node-id": "XPDR-A1",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
    # NOTE(review): the original numbering also skips a line right after this
    # assertion; check whether a statement (e.g. a settle delay) was lost.
def test_07_getClliNetwork(self):
    """GET clli-network and check its first node is still NodeA with clli NodeA."""
    base = self.restconf_baseurl
    target = "{}/config/ietf-network:networks/network/clli-network".format(base)
    json_headers = {'content-type': 'application/json'}
    credentials = ('admin', 'admin')
    answer = requests.request("GET", target, headers=json_headers, auth=credentials)
    self.assertEqual(answer.status_code, requests.codes.ok)
    clli_node = answer.json()['network'][0]['node'][0]
    self.assertEqual(clli_node['node-id'], 'NodeA')
    self.assertEqual(clli_node['org-openroadm-clli-network:clli'], 'NodeA')
def test_08_getOpenRoadmNetwork(self):
    """Check that openroadm-network holds exactly XPDR-A1 and ROADM-A1.

    Both nodes must have clli-network/NodeA as their first supporting node,
    report model 'model2', and carry the node type expected for their id.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 2)
    # node-id -> expected node type; entries are popped as nodes are seen so
    # that each expected node must appear exactly once.
    expected_types = {'XPDR-A1': 'XPONDER', 'ROADM-A1': 'ROADM'}
    for node in nodes:
        supporting = node['supporting-node'][0]
        self.assertEqual(supporting['network-ref'], 'clli-network')
        self.assertEqual(supporting['node-ref'], 'NodeA')
        node_id = node['node-id']
        if node_id not in expected_types:
            self.fail("unexpected or duplicated node {!r}".format(node_id))
        self.assertEqual(node['org-openroadm-common-network:node-type'],
                         expected_types.pop(node_id))
        self.assertEqual(node['org-openroadm-network:model'], 'model2')
    self.assertEqual(expected_types, {})
def test_09_getNodes_OpenRoadmTopology(self):
    """Check the five openroadm-topology nodes once XPDR-A1 has been added.

    XPDR-A1-XPDR1 must be an XPONDER supported by openroadm-network/XPDR-A1
    and clli-network/NodeA, with 2 client and 2 network termination points,
    and with XPDR1-NETWORK2 / XPDR1-CLIENT2 associated to each other. The four
    ROADM-A1 nodes must be supported by openroadm-network/ROADM-A1 and expose
    the listed termination points (exactly 5 TPs for SRG nodes).
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding, so an error reply is reported as a
    # status mismatch instead of a JSON decoding error.
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 5)
    tp_type = 'org-openroadm-common-network:tp-type'
    # ROADM node-id -> (node type, exact TP count or None, TPs that must be present)
    expected_roadm = {
        'ROADM-A1-SRG1': ('SRG', 5, [
            {'tp-id': 'SRG1-CP-TXRX', tp_type: 'SRG-TXRX-CP'},
            {'tp-id': 'SRG1-PP1-TXRX', tp_type: 'SRG-TXRX-PP'}]),
        'ROADM-A1-SRG3': ('SRG', 5, [
            {'tp-id': 'SRG3-CP-TXRX', tp_type: 'SRG-TXRX-CP'},
            {'tp-id': 'SRG3-PP1-TXRX', tp_type: 'SRG-TXRX-PP'}]),
        'ROADM-A1-DEG1': ('DEGREE', None, [
            {'tp-id': 'DEG1-TTP-TXRX', tp_type: 'DEGREE-TXRX-TTP'},
            {'tp-id': 'DEG1-CTP-TXRX', tp_type: 'DEGREE-TXRX-CTP'}]),
        'ROADM-A1-DEG2': ('DEGREE', None, [
            {'tp-id': 'DEG2-TTP-TXRX', tp_type: 'DEGREE-TXRX-TTP'},
            {'tp-id': 'DEG2-CTP-TXRX', tp_type: 'DEGREE-TXRX-CTP'}]),
    }
    # tp-id -> tp-id it must be associated with through the connection map.
    associated_port = {'XPDR1-NETWORK2': 'XPDR1-CLIENT2',
                       'XPDR1-CLIENT2': 'XPDR1-NETWORK2'}
    remaining = ['XPDR-A1-XPDR1'] + list(expected_roadm)
    for node in nodes:
        node_type = node['org-openroadm-common-network:node-type']
        node_id = node['node-id']
        if node_id == 'XPDR-A1-XPDR1':
            supporting = node['supporting-node']
            self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDR-A1'},
                          supporting)
            self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
                          supporting)
            self.assertEqual(node_type, 'XPONDER')
            tps = node['ietf-network-topology:termination-point']
            # NOTE(review): the counter initialisation and increments are
            # missing from the view; counting TPs by tp-type is reconstructed
            # from the `client == 2` / `network == 2` assertions - confirm.
            tp_types = [tp[tp_type] for tp in tps]
            self.assertEqual(tp_types.count('XPONDER-CLIENT'), 2)
            self.assertEqual(tp_types.count('XPONDER-NETWORK'), 2)
            for tp in tps:
                if tp['tp-id'] in associated_port:
                    self.assertEqual(
                        tp['transportpce-topology:associated-connection-map-port'],
                        associated_port[tp['tp-id']])
        elif node_id in expected_roadm:
            expected_type, tp_count, required_tps = expected_roadm[node_id]
            self.assertEqual(node_type, expected_type)
            tps = node['ietf-network-topology:termination-point']
            if tp_count is not None:
                self.assertEqual(len(tps), tp_count)
            for required_tp in required_tps:
                self.assertIn(required_tp, tps)
            self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
                          node['supporting-node'])
        else:
            self.fail("unexpected node {!r}".format(node_id))
        # A node id returned twice fails here instead of raising ValueError.
        self.assertIn(node_id, remaining)
        remaining.remove(node_id)
    self.assertEqual(remaining, [])
384 # Connect the tail XPDRA to ROADMA and vice versa
def test_10_connect_tail_xpdr_rdm(self):
    """POST init-xpdr-rdm-links for XPDR-A1 towards ROADM-A1 and expect 'ok'.

    The request names XPDR-A1 (xpdr-num 1, network-num 1) and ROADM-A1
    (srg-num 1, termination point SRG1-PP1-TXRX).
    """
    # Connect the tail: XPDRA to ROADMA
    url = ("{}/operations/transportpce-networkutils:init-xpdr-rdm-links"
           .format(self.restconf_baseurl))
    # NOTE(review): the closing braces of this payload are missing from the
    # view and are restored here; confirm that no further key was lost.
    data = {"networkutils:input": {
        "networkutils:links-input": {
            "networkutils:xpdr-node": "XPDR-A1",
            "networkutils:xpdr-num": "1",
            "networkutils:network-num": "1",
            "networkutils:rdm-node": "ROADM-A1",
            "networkutils:srg-num": "1",
            "networkutils:termination-point-num": "SRG1-PP1-TXRX"
        }
    }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_11_connect_tail_rdm_xpdr(self):
    """POST init-rdm-xpdr-links for ROADM-A1 towards XPDR-A1 and expect 'ok'.

    The request names XPDR-A1 (xpdr-num 1, network-num 1) and ROADM-A1
    (srg-num 1, termination point SRG1-PP1-TXRX).
    """
    # Connect the tail: ROADMA to XPDRA
    url = ("{}/operations/transportpce-networkutils:init-rdm-xpdr-links"
           .format(self.restconf_baseurl))
    # NOTE(review): the closing braces of this payload are missing from the
    # view and are restored here; confirm that no further key was lost.
    data = {"networkutils:input": {
        "networkutils:links-input": {
            "networkutils:xpdr-node": "XPDR-A1",
            "networkutils:xpdr-num": "1",
            "networkutils:network-num": "1",
            "networkutils:rdm-node": "ROADM-A1",
            "networkutils:srg-num": "1",
            "networkutils:termination-point-num": "SRG1-PP1-TXRX"
        }
    }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_12_getLinks_OpenRoadmTopology(self):
    """Check that openroadm-topology holds exactly the 12 expected links.

    These are the ROADM-A1 express/add/drop links plus the XPONDER-INPUT and
    XPONDER-OUTPUT links between XPDR-A1 and ROADM-A1 SRG1. Every returned
    link must have a known type and an id expected for that type; every
    expected id must be returned exactly once.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 12)
    # Expected link ids keyed by link type; ids are removed as they are seen.
    expected = {
        'EXPRESS-LINK': [
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX'],
        'ADD-LINK': [
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX'],
        'DROP-LINK': [
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX'],
        'XPONDER-INPUT': [
            'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1'],
        'XPONDER-OUTPUT': [
            'XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX'],
    }
    for link in links:
        link_type = link['org-openroadm-common-network:link-type']
        link_id = link['link-id']
        if link_type not in expected:
            self.fail("unexpected link type {!r} for link {!r}"
                      .format(link_type, link_id))
        self.assertIn(link_id, expected[link_type])
        expected[link_type].remove(link_id)
    for link_type, remaining in expected.items():
        self.assertEqual(remaining, [],
                         "expected {} ids not returned".format(link_type))
def test_13_connect_ROADMC(self):
    """PUT the netconf mount entry for ROADM-C1 and expect a 'created' status."""
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADM-C1"
           .format(self.restconf_baseurl))
    # NOTE(review): the line opening this payload is missing from the view.
    # The wrapper key "node" is inferred from the visible "}]}" closer and
    # from the target URL (.../node/ROADM-C1) - confirm against the original.
    data = {"node": [{
        "node-id": "ROADM-C1",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": test_utils.sims['roadmc']['port'],
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
    # NOTE(review): the original numbering also skips a line right after this
    # assertion; check whether a statement (e.g. a settle delay) was lost.
# PUT the OMS-attributes "span" of the ROADM-A1 DEG2 -> ROADM-C1 DEG1 link in
# openroadm-topology and assert that the reply status equals
# requests.codes.created.
# NOTE(review): the payload is incomplete in this view. No `data = ...`
# assignment is visible although `data` is serialized below, and the
# "link-concatenation" list that is opened is never closed. Restore the
# missing lines from the original file before relying on this block.
497 def test_14_omsAttributes_ROADMA_ROADMC(self):
498 # Config ROADMA-ROADMC oms-attributes
499 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
500 "link/ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
501 "OMS-attributes/span"
502 .format(self.restconf_baseurl))
# Visible entries of the span payload; the opening line is not in view.
504 "auto-spanloss": "true",
505 "engineered-spanloss": 12.2,
506 "link-concatenation": [{
# Some entries of the link-concatenation element are not in view.
509 "SRLG-length": 100000,
511 headers = {'content-type': 'application/json'}
512 response = requests.request(
513 "PUT", url, data=json.dumps(data), headers=headers,
514 auth=('admin', 'admin'))
515 self.assertEqual(response.status_code, requests.codes.created)
# PUT the OMS-attributes "span" of the ROADM-C1 DEG1 -> ROADM-A1 DEG2 link in
# openroadm-topology and assert that the reply status equals
# requests.codes.created.
# NOTE(review): the payload is incomplete in this view. No `data = ...`
# assignment is visible although `data` is serialized below, and the
# "link-concatenation" list that is opened is never closed. Restore the
# missing lines from the original file before relying on this block.
517 def test_15_omsAttributes_ROADMC_ROADMA(self):
518 # Config ROADM-C1-ROADM-A1 oms-attributes
519 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
520 "link/ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
521 "OMS-attributes/span"
522 .format(self.restconf_baseurl))
# Visible entries of the span payload; the opening line is not in view.
524 "auto-spanloss": "true",
525 "engineered-spanloss": 12.2,
526 "link-concatenation": [{
# Some entries of the link-concatenation element are not in view.
529 "SRLG-length": 100000,
532 headers = {'content-type': 'application/json'}
533 response = requests.request(
534 "PUT", url, data=json.dumps(data), headers=headers,
535 auth=('admin', 'admin'))
536 self.assertEqual(response.status_code, requests.codes.created)
def test_16_getClliNetwork(self):
    """Check that clli-network holds exactly NodeA and NodeC.

    Each returned node must be one of the two expected ids, be returned only
    once, and have a clli attribute equal to its node-id.
    """
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    remaining = ['NodeA', 'NodeC']
    for node in nodes:
        node_id = node['node-id']
        self.assertIn(node_id, remaining)
        # After the membership check the id is NodeA or NodeC, and the
        # expected clli value is the id itself in both cases.
        self.assertEqual(node['org-openroadm-clli-network:clli'], node_id)
        remaining.remove(node_id)
    self.assertEqual(remaining, [])
def test_17_getOpenRoadmNetwork(self):
    """Check that openroadm-network holds exactly XPDR-A1, ROADM-A1 and ROADM-C1.

    For every node the first supporting node must reference clli-network and
    the expected CLLI node, and the node type and model ('model2') must match.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 3)
    # node-id -> (supporting CLLI node-ref, node type)
    expected = {
        'XPDR-A1': ('NodeA', 'XPONDER'),
        'ROADM-A1': ('NodeA', 'ROADM'),
        'ROADM-C1': ('NodeC', 'ROADM'),
    }
    remaining = list(expected)
    for node in nodes:
        supporting = node['supporting-node'][0]
        self.assertEqual(supporting['network-ref'], 'clli-network')
        node_id = node['node-id']
        if node_id not in expected:
            self.fail("unexpected node {!r}".format(node_id))
        clli_ref, node_type = expected[node_id]
        self.assertEqual(supporting['node-ref'], clli_ref)
        self.assertEqual(node['org-openroadm-common-network:node-type'], node_type)
        self.assertEqual(node['org-openroadm-network:model'], 'model2')
        # A node id returned twice fails here instead of raising ValueError.
        self.assertIn(node_id, remaining)
        remaining.remove(node_id)
    self.assertEqual(remaining, [])
def test_18_getROADMLinkOpenRoadmTopology(self):
    """Check that openroadm-topology holds exactly the 20 expected links.

    These are the ROADM-A1 and ROADM-C1 express/add/drop links, the two
    ROADM-TO-ROADM links between them, and the XPONDER-INPUT/OUTPUT links of
    XPDR-A1. Every returned link must have a known type and an id expected
    for that type; every expected id must be returned exactly once.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 20)
    # Expected link ids keyed by link type; ids are removed as they are seen.
    expected = {
        'EXPRESS-LINK': [
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-C1-DEG2-DEG2-CTP-TXRXtoROADM-C1-DEG1-DEG1-CTP-TXRX',
            'ROADM-C1-DEG1-DEG1-CTP-TXRXtoROADM-C1-DEG2-DEG2-CTP-TXRX'],
        'ADD-LINK': [
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-C1-SRG1-SRG1-CP-TXRXtoROADM-C1-DEG2-DEG2-CTP-TXRX',
            'ROADM-C1-SRG1-SRG1-CP-TXRXtoROADM-C1-DEG1-DEG1-CTP-TXRX'],
        'DROP-LINK': [
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
            'ROADM-C1-DEG1-DEG1-CTP-TXRXtoROADM-C1-SRG1-SRG1-CP-TXRX',
            'ROADM-C1-DEG2-DEG2-CTP-TXRXtoROADM-C1-SRG1-SRG1-CP-TXRX'],
        'ROADM-TO-ROADM': [
            'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
            'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX'],
        'XPONDER-INPUT': [
            'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1'],
        'XPONDER-OUTPUT': [
            'XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX'],
    }
    for link in links:
        link_type = link['org-openroadm-common-network:link-type']
        link_id = link['link-id']
        if link_type not in expected:
            self.fail("unexpected link type {!r} for link {!r}"
                      .format(link_type, link_id))
        self.assertIn(link_id, expected[link_type])
        expected[link_type].remove(link_id)
    for link_type, remaining in expected.items():
        self.assertEqual(remaining, [],
                         "expected {} ids not returned".format(link_type))
def test_19_getLinkOmsAttributesOpenRoadmTopology(self):
    """Check that both ROADM-A1 <-> ROADM-C1 links carry OMS span attributes.

    Among the 20 links of openroadm-topology, each of the two ROADM-to-ROADM
    links must be present and expose a non-null engineered-spanloss and a
    non-null SRLG-length in its first link-concatenation entry.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 20)
    remaining = ['ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
                 'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX']
    for link in links:
        link_id = link['link-id']
        if link_id not in remaining:
            continue
        span = link['org-openroadm-network-topology:OMS-attributes']['span']
        # NOTE(review): the lines assigning the `find` flag are missing from
        # the view; the intent is reconstructed from the None comparison and
        # the assertTrue(find) that followed it - confirm.
        self.assertIsNotNone(span["engineered-spanloss"])
        self.assertIsNotNone(span['link-concatenation'][0]['SRLG-length'])
        remaining.remove(link_id)
    self.assertEqual(remaining, [])
def test_20_getNodes_OpenRoadmTopology(self):
    # Check the node list of openroadm-topology: exactly 8 logical nodes
    # (XPDR-A1-XPDR1 plus the SRG/DEGREE nodes of ROADM-A1 and ROADM-C1),
    # each with the expected node type, supporting node and termination
    # points (TPs).
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding the body so that an error reply is
    # reported as a status mismatch rather than as a JSON decoding error.
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 8)
    # node-id -> (supporting node, node type, exact TP count or None when
    # the count is not checked).
    roadm_nodes = {
        'ROADM-A1-SRG1': ('ROADM-A1', 'SRG', 5),
        'ROADM-A1-SRG3': ('ROADM-A1', 'SRG', 5),
        'ROADM-A1-DEG1': ('ROADM-A1', 'DEGREE', None),
        'ROADM-A1-DEG2': ('ROADM-A1', 'DEGREE', None),
        'ROADM-C1-SRG1': ('ROADM-C1', 'SRG', 5),
        'ROADM-C1-DEG1': ('ROADM-C1', 'DEGREE', None),
        'ROADM-C1-DEG2': ('ROADM-C1', 'DEGREE', None),
    }
    # TPs every node of a given type must expose, as (tp-id suffix, tp-type);
    # the tp-id prefix is the last component of the node-id (e.g. "SRG1").
    required_tps = {
        'SRG': (('CP-TXRX', 'SRG-TXRX-CP'), ('PP1-TXRX', 'SRG-TXRX-PP')),
        'DEGREE': (('TTP-TXRX', 'DEGREE-TXRX-TTP'), ('CTP-TXRX', 'DEGREE-TXRX-CTP')),
    }
    remaining = ['XPDR-A1-XPDR1'] + sorted(roadm_nodes)
    for node in nodes:
        node_id = node['node-id']
        node_type = node['org-openroadm-common-network:node-type']
        # An unexpected or repeated node-id fails the test here.
        self.assertIn(node_id, remaining)
        tps = node['ietf-network-topology:termination-point']
        if node_id == 'XPDR-A1-XPDR1':
            # ************************Tests related to XPDRA nodes
            supporting_node, expected_type = 'XPDR-A1', 'XPONDER'
            self.assertGreaterEqual(len(tps), 4)
            tp_types = [tp['org-openroadm-common-network:tp-type'] for tp in tps]
            self.assertEqual(tp_types.count('XPONDER-CLIENT'), 2)
            self.assertEqual(tp_types.count('XPONDER-NETWORK'), 2)
        else:
            supporting_node, expected_type, tp_count = roadm_nodes[node_id]
            if tp_count is not None:
                self.assertEqual(len(tps), tp_count)
            tp_prefix = node_id.rsplit('-', 1)[1]
            for tp_suffix, tp_type in required_tps[expected_type]:
                self.assertIn({'tp-id': '{}-{}'.format(tp_prefix, tp_suffix),
                               'org-openroadm-common-network:tp-type': tp_type},
                              tps)
        self.assertIn({'network-ref': 'openroadm-network', 'node-ref': supporting_node},
                      node['supporting-node'])
        self.assertEqual(node_type, expected_type)
        remaining.remove(node_id)
    # Every expected node must have been seen exactly once.
    self.assertEqual(len(remaining), 0)
def test_21_connect_ROADMB(self):
    # Register ROADM-B1 in topology-netconf by PUTting its NETCONF
    # connection parameters; the controller must answer 201 Created.
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADM-B1"
           .format(self.restconf_baseurl))
    # NOTE(review): the opening of this literal was not legible in the
    # reviewed text; the "node" list wrapper is inferred from the target URL
    # and from the closing brackets of the last entry -- confirm.
    data = {"node": [{
        "node-id": "ROADM-B1",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        # Port of the 'roadmb' simulator as declared in test_utils.sims.
        "netconf-node-topology:port": test_utils.sims['roadmb']['port'],
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_22_omsAttributes_ROADMA_ROADMB(self):
    # Config ROADM-A1-ROADM-B1 oms-attributes: PUT the span container of the
    # ROADM-A1 DEG1 -> ROADM-B1 DEG1 link and expect 201 Created.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # NOTE(review): the literal below was truncated in the reviewed text. The
    # "span" wrapper is inferred from the target URL; the SRLG-Id, fiber-type
    # and pmd entries are presumed values -- confirm against the original
    # payload before merging.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "spanloss-current": 12,
        "spanloss-base": 11.4,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_23_omsAttributes_ROADMB_ROADMA(self):
    # Config ROADM-B1-ROADM-A1 oms-attributes: PUT the span container of the
    # ROADM-B1 DEG1 -> ROADM-A1 DEG1 link and expect 201 Created.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # NOTE(review): the literal below was truncated in the reviewed text. The
    # "span" wrapper is inferred from the target URL; the SRLG-Id, fiber-type
    # and pmd entries are presumed values -- confirm against the original
    # payload before merging.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "spanloss-current": 12,
        "spanloss-base": 11.4,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_24_omsAttributes_ROADMB_ROADMC(self):
    # Config ROADM-B1-ROADM-C1 oms-attributes: PUT the span container of the
    # ROADM-B1 DEG2 -> ROADM-C1 DEG2 link and expect 201 Created.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # NOTE(review): the literal below was truncated in the reviewed text. The
    # "span" wrapper is inferred from the target URL; the SRLG-Id, fiber-type
    # and pmd entries are presumed values -- confirm against the original
    # payload before merging.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "spanloss-current": 12,
        "spanloss-base": 11.4,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_25_omsAttributes_ROADMC_ROADMB(self):
    # Config ROADM-C1-ROADM-B1 oms-attributes: PUT the span container of the
    # ROADM-C1 DEG2 -> ROADM-B1 DEG2 link and expect 201 Created. Unlike the
    # three previous tests, no spanloss-current / spanloss-base is sent.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # NOTE(review): the literal below was truncated in the reviewed text. The
    # "span" wrapper is inferred from the target URL; the SRLG-Id, fiber-type
    # and pmd entries are presumed values -- confirm against the original
    # payload before merging.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_26_getClliNetwork(self):
    # Check that clli-network contains NodeA, NodeB and NodeC, each exactly
    # once, and that every node carries its own id as CLLI value.
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    remaining = ['NodeA', 'NodeB', 'NodeC']
    for node in nodes:
        node_id = node['node-id']
        # Rejects unknown ids as well as an id returned twice.
        self.assertIn(node_id, remaining)
        # The per-node CLLI check is applied to the matching node only: the
        # expected value is always the node-id itself.
        self.assertEqual(node['org-openroadm-clli-network:clli'], node_id)
        remaining.remove(node_id)
    self.assertEqual(len(remaining), 0)
def test_27_verifyDegree(self):
    # Every ROADM-TO-ROADM link reported by openroadm-topology must be one of
    # the six expected inter-ROADM links, and all six must be reported.
    endpoint = ("{}/config/ietf-network:networks/network/openroadm-topology"
                .format(self.restconf_baseurl))
    reply = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Tests related to links
    links = reply.json()['network'][0]['ietf-network-topology:link']
    pending = [
        'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
        'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
        'ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX',
        'ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX',
        'ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX',
        'ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX',
    ]
    for link in links:
        # Links of any other type are ignored by this test.
        if link['org-openroadm-common-network:link-type'] != 'ROADM-TO-ROADM':
            continue
        link_id = link['link-id']
        self.assertTrue(link_id in pending)
        pending.remove(link_id)
    self.assertEqual(len(pending), 0)
def test_28_verifyOppositeLinkTopology(self):
    # For each of the 26 links of openroadm-topology, fetch the link named by
    # its opposite-link attribute and check that it points back to the first
    # one, has swapped end nodes and carries the complementary link type.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    # Tests related to links
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 26)
    # link type -> type expected on the opposite link. Types absent from this
    # table are not checked.
    opposite_type = {
        'ADD-LINK': 'DROP-LINK',
        'DROP-LINK': 'ADD-LINK',
        'EXPRESS-LINK': 'EXPRESS-LINK',
        'ROADM-TO-ROADM': 'ROADM-TO-ROADM',
        'XPONDER-INPUT': 'XPONDER-OUTPUT',
        'XPONDER-OUTPUT': 'XPONDER-INPUT',
    }
    for link in links:
        link_id = link['link-id']
        link_type = link['org-openroadm-common-network:link-type']
        link_src = link['source']['source-node']
        link_dest = link['destination']['dest-node']
        opp_link_id = link['org-openroadm-common-network:opposite-link']
        # Find the opposite link. The id is passed as a format argument so
        # that it is never itself interpreted as part of the template.
        opp_url = ("{}/config/ietf-network:networks/network/openroadm-topology/"
                   "ietf-network-topology:link/{}"
                   .format(self.restconf_baseurl, opp_link_id))
        response_opp = requests.request(
            "GET", opp_url, headers=headers, auth=('admin', 'admin'))
        self.assertEqual(response_opp.status_code, requests.codes.ok)
        opp_link = response_opp.json()['ietf-network-topology:link'][0]
        self.assertEqual(opp_link['org-openroadm-common-network:opposite-link'], link_id)
        self.assertEqual(opp_link['source']['source-node'], link_dest)
        self.assertEqual(opp_link['destination']['dest-node'], link_src)
        opp_link_type = opp_link['org-openroadm-common-network:link-type']
        if link_type in opposite_type:
            self.assertEqual(opp_link_type, opposite_type[link_type])
def test_29_getLinkOmsAttributesOpenRoadmTopology(self):
    # Each of the six expected ROADM-to-ROADM links must be present in
    # openroadm-topology and expose both an engineered-spanloss and an
    # SRLG-length (first link-concatenation entry) in its OMS attributes.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    remaining = ['ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
                 'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
                 'ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX',
                 'ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX',
                 'ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX',
                 'ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX']
    for link in links:
        link_id = link['link-id']
        if link_id not in remaining:
            continue
        # A missing attribute raises KeyError and therefore fails the test.
        span = link['org-openroadm-network-topology:OMS-attributes']['span']
        self.assertIsNotNone(span['engineered-spanloss'])
        self.assertIsNotNone(span['link-concatenation'][0]['SRLG-length'])
        remaining.remove(link_id)
    self.assertEqual(len(remaining), 0)
def test_30_disconnect_ROADMB(self):
    # Remove ROADM-B1 from topology-netconf, then remove NodeB from
    # clli-network; both DELETE requests must return 200 OK.
    targets = (
        # Delete in the topology-netconf
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/ROADM-B1",
        # Delete in the clli-network
        "{}/config/ietf-network:networks/network/clli-network/node/NodeB",
    )
    headers = {'content-type': 'application/json'}
    # NOTE(review): the definition of `data` was absent from the reviewed
    # text; an empty payload is assumed -- confirm.
    data = {}
    for target in targets:
        response = requests.request(
            "DELETE", target.format(self.restconf_baseurl),
            data=json.dumps(data), headers=headers,
            auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
def test_31_disconnect_ROADMC(self):
    # Remove ROADM-C1 from topology-netconf, then remove NodeC from
    # clli-network; both DELETE requests must return 200 OK.
    targets = (
        # Delete in the topology-netconf
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/ROADM-C1",
        # Delete in the clli-network
        "{}/config/ietf-network:networks/network/clli-network/node/NodeC",
    )
    headers = {'content-type': 'application/json'}
    # NOTE(review): the definition of `data` was absent from the reviewed
    # text; an empty payload is assumed -- confirm.
    data = {}
    for target in targets:
        response = requests.request(
            "DELETE", target.format(self.restconf_baseurl),
            data=json.dumps(data), headers=headers,
            auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
1054 # def test_24_check_roadm2roadm_links_deletion(self):
1055 # url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1056 # .format(self.restconf_baseurl))
1057 # headers = {'content-type': 'application/json'}
1058 # response = requests.request(
1059 # "GET", url, headers=headers, auth=('admin', 'admin'))
1060 # self.assertEqual(response.status_code, requests.codes.ok)
1061 # res = response.json()
1062 # #Write the response in the log
1063 # with open('./transportpce_tests/log/response.log', 'a') as outfile1:
1064 # outfile1.write(str(res))
1065 # #Tests related to links
1066 # nbLink=len(res['network'][0]['ietf-network-topology:link'])
1067 # self.assertEqual(nbLink,8)
1068 # expressLink=['ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX','ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX']
1069 # addLink=['ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX','ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX',]
1070 # dropLink=['ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX','ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX']
1071 # XPDR_IN=['ROADMA-SRG1-SRG1-PP1-TXRXtoXPDRA-XPDR1-XPDR1-NETWORK1']
1072 # XPDR_OUT=['XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX']
1073 # for i in range(0,nbLink):
1074 # nodeType=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
1075 # linkId=res['network'][0]['ietf-network-topology:link'][i]['link-id']
1076 # if(nodeType=='EXPRESS-LINK'):
1077 # find= linkId in expressLink
1078 # self.assertEqual(find, True)
1079 # expressLink.remove(linkId)
1080 # elif(nodeType=='ADD-LINK'):
1081 # find= linkId in addLink
1082 # self.assertEqual(find, True)
1083 # addLink.remove(linkId)
1084 # elif(nodeType=='DROP-LINK'):
1085 # find= linkId in dropLink
1086 # self.assertEqual(find, True)
1087 # dropLink.remove(linkId)
1088 # elif(nodeType=='XPONDER-INPUT'):
1089 # find= linkId in XPDR_IN
1090 # self.assertEqual(find, True)
1091 # XPDR_IN.remove(linkId)
1092 # elif(nodeType=='XPONDER-OUTPUT'):
1093 # find= linkId in XPDR_OUT
1094 # self.assertEqual(find, True)
1095 # XPDR_OUT.remove(linkId)
1097 # self.assertFalse(True)
1098 # self.assertEqual(len(expressLink),0)
1099 # self.assertEqual(len(addLink),0)
1100 # self.assertEqual(len(dropLink),0)
1101 # self.assertEqual(len(XPDR_IN),0)
1102 # self.assertEqual(len(XPDR_OUT),0)
1104 # for i in range(0,nbLink):
1105 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'],'ROADM-TO-ROADM')
1106 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX')
1107 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX')
1108 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG2-DEG1-CTP-TXRX')
1109 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG2-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX')
1110 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-DEG2-DEG2-CTP-TXRX')
1111 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG2-DEG2-CTP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX')
def test_32_getNodes_OpenRoadmTopology(self):
    # Check the node list of openroadm-topology: exactly 5 logical nodes
    # (XPDR-A1-XPDR1 plus the SRG/DEGREE nodes of ROADM-A1) with the expected
    # attributes, and no ROADM-C1 node left.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding the body so that an error reply is
    # reported as a status mismatch rather than as a JSON decoding error.
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 5)
    # node-id -> (node type, exact TP count or None when not checked).
    roadm_nodes = {
        'ROADM-A1-SRG1': ('SRG', 5),
        'ROADM-A1-SRG3': ('SRG', 5),
        'ROADM-A1-DEG1': ('DEGREE', None),
        'ROADM-A1-DEG2': ('DEGREE', None),
    }
    # TPs every node of a given type must expose, as (tp-id suffix, tp-type);
    # the tp-id prefix is the last component of the node-id (e.g. "SRG1").
    required_tps = {
        'SRG': (('CP-TXRX', 'SRG-TXRX-CP'), ('PP1-TXRX', 'SRG-TXRX-PP')),
        'DEGREE': (('TTP-TXRX', 'DEGREE-TXRX-TTP'), ('CTP-TXRX', 'DEGREE-TXRX-CTP')),
    }
    tp_type_key = 'org-openroadm-common-network:tp-type'
    remaining = ['XPDR-A1-XPDR1'] + sorted(roadm_nodes)
    for node in nodes:
        node_id = node['node-id']
        node_type = node['org-openroadm-common-network:node-type']
        # An unexpected or repeated node-id fails the test here.
        self.assertIn(node_id, remaining)
        tps = node['ietf-network-topology:termination-point']
        if node_id == 'XPDR-A1-XPDR1':
            # Tests related to XPDRA nodes
            supporting_node = 'XPDR-A1'
            for tp in tps:
                if tp['tp-id'] == 'XPDR1-CLIENT1':
                    self.assertEqual(tp[tp_type_key], 'XPONDER-CLIENT')
                elif tp['tp-id'] == 'XPDR1-NETWORK1':
                    self.assertEqual(tp[tp_type_key], 'XPONDER-NETWORK')
                    self.assertEqual(
                        tp['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'],
                        'ROADM-A1-SRG1--SRG1-PP1-TXRX')
        else:
            supporting_node = 'ROADM-A1'
            expected_type, tp_count = roadm_nodes[node_id]
            self.assertEqual(node_type, expected_type)
            if tp_count is not None:
                self.assertEqual(len(tps), tp_count)
            tp_prefix = node_id.rsplit('-', 1)[1]
            for tp_suffix, tp_type in required_tps[expected_type]:
                self.assertIn({'tp-id': '{}-{}'.format(tp_prefix, tp_suffix),
                               tp_type_key: tp_type},
                              tps)
        self.assertIn({'network-ref': 'openroadm-network', 'node-ref': supporting_node},
                      node['supporting-node'])
        remaining.remove(node_id)
    # Every expected node must have been seen exactly once.
    self.assertEqual(len(remaining), 0)
    # Test related to SRG1 of ROADMC
    for node in nodes:
        self.assertNotIn(node['node-id'],
                         ('ROADM-C1-SRG1', 'ROADM-C1-DEG1', 'ROADM-C1-DEG2'))
def test_33_getOpenRoadmNetwork(self):
    # openroadm-network must hold exactly 2 nodes, none of which is ROADM-C1
    # or ROADM-B1.
    url = ("{}/config/ietf-network:networks/network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 2)
    # Inspect every node: iterating over range(0, nbNode - 1) would leave
    # the last entry of the list unchecked.
    for node in nodes:
        self.assertNotEqual(node['node-id'], 'ROADM-C1')
        self.assertNotEqual(node['node-id'], 'ROADM-B1')
def test_34_getClliNetwork(self):
    # clli-network must hold exactly 1 node, and it must not be NodeC.
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 1)
    # Inspect every returned node. With a single node, a loop over
    # range(0, nbNode - 1) never executes, and a fixed index of 1 would be
    # out of range anyway.
    for node in nodes:
        self.assertNotEqual(node['org-openroadm-clli-network:clli'], 'NodeC')
def test_35_disconnect_XPDRA(self):
    # Remove XPDR-A1 from topology-netconf; the DELETE must return 200 OK.
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/XPDR-A1"
           .format(self.restconf_baseurl))
    # NOTE(review): the definition of `data` was absent from the reviewed
    # text; an empty payload is assumed -- confirm.
    data = {}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "DELETE", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_36_getClliNetwork(self):
    # clli-network must hold a single node whose CLLI is NodeA.
    endpoint = ("{}/config/ietf-network:networks/network/clli-network"
                .format(self.restconf_baseurl))
    reply = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    clli_nodes = reply.json()['network'][0]['node']
    self.assertEqual(len(clli_nodes), 1)
    only_node = clli_nodes[0]
    self.assertEqual(only_node['org-openroadm-clli-network:clli'], 'NodeA')
def test_37_getOpenRoadmNetwork(self):
    # openroadm-network must hold a single node, and it must not be XPDR-A1.
    endpoint = ("{}/config/ietf-network:networks/network/openroadm-network"
                .format(self.restconf_baseurl))
    reply = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    network_nodes = reply.json()['network'][0]['node']
    self.assertEqual(len(network_nodes), 1)
    for node in network_nodes:
        self.assertNotEqual(node['node-id'], 'XPDR-A1')
def test_38_getNodes_OpenRoadmTopology(self):
    # Check the node list of openroadm-topology: exactly the 4 SRG/DEGREE
    # nodes of ROADM-A1, all supported by ROADM-A1, with the expected node
    # type and termination points (TPs).
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding the body so that an error reply is
    # reported as a status mismatch rather than as a JSON decoding error.
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 4)
    # node-id -> (node type, exact TP count or None when not checked).
    expected_nodes = {
        'ROADM-A1-SRG1': ('SRG', 5),
        'ROADM-A1-SRG3': ('SRG', 5),
        'ROADM-A1-DEG1': ('DEGREE', None),
        'ROADM-A1-DEG2': ('DEGREE', None),
    }
    # TPs every node of a given type must expose, as (tp-id suffix, tp-type);
    # the tp-id prefix is the last component of the node-id (e.g. "SRG1").
    required_tps = {
        'SRG': (('CP-TXRX', 'SRG-TXRX-CP'), ('PP1-TXRX', 'SRG-TXRX-PP')),
        'DEGREE': (('TTP-TXRX', 'DEGREE-TXRX-TTP'), ('CTP-TXRX', 'DEGREE-TXRX-CTP')),
    }
    remaining = sorted(expected_nodes)
    for node in nodes:
        self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
                      node['supporting-node'])
        node_type = node['org-openroadm-common-network:node-type']
        node_id = node['node-id']
        # An unexpected or repeated node-id fails the test here.
        self.assertIn(node_id, remaining)
        expected_type, tp_count = expected_nodes[node_id]
        self.assertEqual(node_type, expected_type)
        tps = node['ietf-network-topology:termination-point']
        if tp_count is not None:
            self.assertEqual(len(tps), tp_count)
        tp_prefix = node_id.rsplit('-', 1)[1]
        for tp_suffix, tp_type in required_tps[expected_type]:
            self.assertIn({'tp-id': '{}-{}'.format(tp_prefix, tp_suffix),
                           'org-openroadm-common-network:tp-type': tp_type},
                          tps)
        remaining.remove(node_id)
    # Every expected node must have been seen exactly once.
    self.assertEqual(len(remaining), 0)
def test_39_disconnect_ROADM_XPDRA_link(self):
    # Delete both directions of the link between XPDR-A1 (XPDR1-NETWORK1)
    # and ROADM-A1 (SRG1-PP1-TXRX) from openroadm-topology; each DELETE must
    # return 200 OK.
    link_url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
                "link/{}")
    link_ids = ('XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
                'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1')
    headers = {'content-type': 'application/json'}
    # NOTE(review): the definition of `data` was absent from the reviewed
    # text; an empty payload is assumed -- confirm.
    data = {}
    for link_id in link_ids:
        response = requests.request(
            "DELETE", link_url.format(self.restconf_baseurl, link_id),
            data=json.dumps(data), headers=headers,
            auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
def test_40_getLinks_OpenRoadmTopology(self):
    """Check the link inventory of the openroadm-topology network.

    The topology must hold 16 links: the 2 EXPRESS, 4 ADD and 4 DROP
    link-ids listed below (each seen exactly once), 6 links of any other
    type, and no link typed XPONDER-OUTPUT or XPONDER-INPUT.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    links = res['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 16)

    expressLink = ['ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
                   'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX']
    addLink = ['ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
               'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
               'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
               'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX']
    dropLink = ['ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
                'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
                'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
                'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX']
    # Link-ids still awaited, keyed by the link-type they must carry.
    pending_by_type = {
        'EXPRESS-LINK': expressLink,
        'ADD-LINK': addLink,
        'DROP-LINK': dropLink,
    }

    roadmtoroadmLink = 0
    for link in links:
        link_type = link['org-openroadm-common-network:link-type']
        pending = pending_by_type.get(link_type)
        if pending is None:
            # Catch-all branch: every link that is neither express, add
            # nor drop is counted here. The totals asserted in this test
            # (16 = 2 + 4 + 4 + 6) only hold with this branch in place.
            roadmtoroadmLink += 1
            continue
        link_id = link['link-id']
        # assertIn names the offending link-id on failure; removing it
        # afterwards makes a duplicate fail on its second occurrence.
        self.assertIn(link_id, pending)
        pending.remove(link_id)

    # Empty lists prove every expected link-id was found.
    self.assertEqual(len(expressLink), 0)
    self.assertEqual(len(addLink), 0)
    self.assertEqual(len(dropLink), 0)
    self.assertEqual(roadmtoroadmLink, 6)

    for link in links:
        link_type = link['org-openroadm-common-network:link-type']
        self.assertNotEqual(link_type, 'XPONDER-OUTPUT')
        self.assertNotEqual(link_type, 'XPONDER-INPUT')
def test_41_disconnect_ROADMA(self):
    """Delete ROADM-A1 from topology-netconf, then NodeA from clli-network.

    Both resources are removed with a RESTCONF DELETE on the config
    datastore, in that order; every request must answer with HTTP 200.
    """
    resource_paths = (
        # ROADM-A1 node entry under topology-netconf
        "network-topology:"
        "network-topology/topology/topology-netconf/node/ROADM-A1",
        # Delete in the clli-network
        "ietf-network:networks/network/clli-network/node/NodeA",
    )
    # The payload must be bound locally: it is serialised for every request
    # below, and an unbound name would abort the test with NameError.
    data = {}
    headers = {'content-type': 'application/json'}
    for path in resource_paths:
        url = "{}/config/{}".format(self.restconf_baseurl, path)
        response = requests.request(
            "DELETE", url, data=json.dumps(data), headers=headers,
            auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok,
                         "DELETE failed for {}".format(path))
def test_42_getClliNetwork(self):
    """Fetch the clli-network and assert it carries no 'node' entry."""
    clli_url = "{}/config/ietf-network:networks/network/clli-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET",
        clli_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first element of the returned 'network' list is inspected.
    network = reply.json()['network'][0]
    self.assertNotIn('node', network)
def test_43_getOpenRoadmNetwork(self):
    """Fetch the openroadm-network and assert it carries no 'node' entry."""
    network_url = "{}/config/ietf-network:networks/network/openroadm-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET",
        network_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first element of the returned 'network' list is inspected.
    network = reply.json()['network'][0]
    self.assertNotIn('node', network)
def test_44_check_roadm2roadm_link_persistence(self):
    """Assert openroadm-topology holds exactly 6 links and no 'node' entry."""
    topology_url = "{}/config/ietf-network:networks/network/openroadm-topology".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET",
        topology_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    network = reply.json()['network'][0]
    # Counted before the node check so that a missing link list is the
    # first thing reported.
    link_count = len(network['ietf-network-topology:link'])
    self.assertNotIn('node', network)
    self.assertEqual(link_count, 6)
# Script entry point: when this file is executed directly, hand control to
# the unittest runner with verbose (level 2) reporting.
if "__main__" == __name__:
    unittest.main(verbosity=2)