3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
25 class TransportPCEtesting(unittest.TestCase):
27 honeynode_process1 = None
28 honeynode_process2 = None
29 honeynode_process3 = None
30 honeynode_process4 = None
32 restconf_baseurl = "http://localhost:8181/restconf"
34 # START_IGNORE_XTESTING
@classmethod
def setUpClass(cls):
    """Start the four honeynode processes, then OpenDaylight, once per class.

    Every process handle is stored on the class so tearDownClass can stop it.
    """
    # NOTE(review): the numbered listing lost the decorator/def lines of this
    # fixture and skips lines after each start call; if a settling delay lived
    # in those gaps it has to be restored - confirm against the original file.
    launchers = (
        ("honeynode_process1", test_utils.start_xpdra_honeynode),
        ("honeynode_process2", test_utils.start_roadma_honeynode),
        ("honeynode_process3", test_utils.start_roadmb_honeynode),
        ("honeynode_process4", test_utils.start_roadmc_honeynode),
    )
    for index, (attribute, launch) in enumerate(launchers, start=1):
        print("starting honeynode{}...".format(index))
        setattr(cls, attribute, launch())
    print("all honeynodes started")

    print("starting opendaylight...")
    cls.odl_process = test_utils.start_tpce()
    print("opendaylight started")
@classmethod
def tearDownClass(cls):
    """Stop OpenDaylight and then the four honeynode processes.

    For each process, SIGINT is sent to the children reported by psutil, then
    to the process itself, and the call blocks until that process has exited.
    """
    # Same order as the original copy-pasted sequence: controller first,
    # then honeynodes 1 to 4.
    processes = (
        cls.odl_process,
        cls.honeynode_process1,
        cls.honeynode_process2,
        cls.honeynode_process3,
        cls.honeynode_process4,
    )
    for process in processes:
        for child in psutil.Process(process.pid).children():
            child.send_signal(signal.SIGINT)
        process.send_signal(signal.SIGINT)
        process.wait()
def test_01_connect_ROADM_A1(self):
    """PUT the ROADM-A1 netconf node into topology-netconf; expect 201."""
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADM-A1"
           .format(self.restconf_baseurl))
    node = {
        "node-id": "ROADM-A1",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": "17841",
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}
    # NOTE(review): the line opening this payload is missing from the
    # listing; the "node" wrapper key is inferred from the URL's list name
    # and the surviving "}]}" closers - confirm.
    data = {"node": [node]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
    # NOTE(review): the listing skips lines after this assert; restore any
    # wait that followed it in the original file.
def test_02_getClliNetwork(self):
    """GET clli-network and check that its first node is NodeA."""
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    response = requests.request(
        "GET", url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    first_node = response.json()['network'][0]['node'][0]
    self.assertEqual(first_node['node-id'], 'NodeA')
    self.assertEqual(first_node['org-openroadm-clli-network:clli'], 'NodeA')
def test_03_getOpenRoadmNetwork(self):
    """GET openroadm-network and check the first node describes ROADM-A1."""
    url = ("{}/config/ietf-network:networks/network/openroadm-network"
           .format(self.restconf_baseurl))
    response = requests.request(
        "GET", url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    first_node = response.json()['network'][0]['node'][0]
    self.assertEqual(first_node['node-id'], 'ROADM-A1')
    first_support = first_node['supporting-node'][0]
    self.assertEqual(first_support['network-ref'], 'clli-network')
    self.assertEqual(first_support['node-ref'], 'NodeA')
    self.assertEqual(
        first_node['org-openroadm-common-network:node-type'], 'ROADM')
    self.assertEqual(first_node['org-openroadm-network:model'], 'model2')
def test_04_getLinks_OpenroadmTopology(self):
    """Check openroadm-topology holds exactly the 10 ROADM-A1 internal links.

    Each returned link must be listed under its link type below; a link of
    any other type fails the test, and every expected link must be seen.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 10)
    # link type -> link ids still waiting to be seen
    expected = {
        'EXPRESS-LINK': [
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX'],
        'ADD-LINK': [
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX'],
        'DROP-LINK': [
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX'],
    }
    for link in links:
        link_id = link['link-id']
        link_type = link['org-openroadm-common-network:link-type']
        if link_type not in expected:
            # Restores the fall-through branch lost from the listing.
            self.fail("unexpected link type {} for link {}"
                      .format(link_type, link_id))
        self.assertIn(link_id, expected[link_type])
        # Removing the id makes a duplicate link fail the assertIn above.
        expected[link_type].remove(link_id)
    for link_type, missing in expected.items():
        self.assertEqual(len(missing), 0,
                         "missing {} ids: {}".format(link_type, missing))
def test_05_getNodes_OpenRoadmTopology(self):
    """Check the four ROADM-A1 nodes (SRG1, SRG3, DEG1, DEG2) of openroadm-topology.

    Every node must be supported by ROADM-A1 (openroadm-network) and NodeA
    (clli-network), have the expected node type and expose the listed
    termination points; SRG nodes must have exactly 5 termination points.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding so a failed request is reported as such.
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 4)

    def tp(tp_id, tp_type):
        # Exact termination-point entry expected in the response.
        return {'tp-id': tp_id,
                'org-openroadm-common-network:tp-type': tp_type}

    # node-id -> (node type, exact TP count or None, TPs that must be present)
    expected = {
        'ROADM-A1-SRG1': ('SRG', 5, [tp('SRG1-CP-TXRX', 'SRG-TXRX-CP'),
                                     tp('SRG1-PP1-TXRX', 'SRG-TXRX-PP')]),
        'ROADM-A1-SRG3': ('SRG', 5, [tp('SRG3-CP-TXRX', 'SRG-TXRX-CP'),
                                     tp('SRG3-PP1-TXRX', 'SRG-TXRX-PP')]),
        'ROADM-A1-DEG1': ('DEGREE', None,
                          [tp('DEG1-TTP-TXRX', 'DEGREE-TXRX-TTP'),
                           tp('DEG1-CTP-TXRX', 'DEGREE-TXRX-CTP')]),
        'ROADM-A1-DEG2': ('DEGREE', None,
                          [tp('DEG2-TTP-TXRX', 'DEGREE-TXRX-TTP'),
                           tp('DEG2-CTP-TXRX', 'DEGREE-TXRX-CTP')]),
    }
    for node in nodes:
        node_id = node['node-id']
        supporting = node['supporting-node']
        self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
                      supporting)
        if node_id not in expected:
            # Restores the fall-through branch lost from the listing; it
            # also catches a node id reported twice.
            self.fail("unexpected or duplicated node {}".format(node_id))
        node_type, tp_count, required_tps = expected.pop(node_id)
        self.assertEqual(
            node['org-openroadm-common-network:node-type'], node_type)
        tps = node['ietf-network-topology:termination-point']
        if tp_count is not None:
            self.assertEqual(len(tps), tp_count)
        for required in required_tps:
            self.assertIn(required, tps)
        self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
                      supporting)
    self.assertEqual(len(expected), 0)
def test_06_connect_XPDRA(self):
    """PUT the XPDR-A1 netconf node into topology-netconf; expect 201."""
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/XPDR-A1"
           .format(self.restconf_baseurl))
    node = {
        "node-id": "XPDR-A1",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": "17840",
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}
    # NOTE(review): the line opening this payload is missing from the
    # listing; the "node" wrapper key is inferred from the URL's list name
    # and the surviving "}]}" closers - confirm.
    data = {"node": [node]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
    # NOTE(review): the listing skips lines after this assert; restore any
    # wait that followed it in the original file.
def test_07_getClliNetwork(self):
    """GET clli-network and check that its first node is still NodeA."""
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    response = requests.request(
        "GET", url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    first_node = response.json()['network'][0]['node'][0]
    self.assertEqual(first_node['node-id'], 'NodeA')
    self.assertEqual(first_node['org-openroadm-clli-network:clli'], 'NodeA')
def test_08_getOpenRoadmNetwork(self):
    """Check openroadm-network holds XPDR-A1 and ROADM-A1, both on NodeA.

    Each of the two nodes must reference clli-network/NodeA as its first
    supporting node, carry the node type expected for its id and report
    model 'model2'. Any other node id fails the test.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 2)
    expected_types = {'XPDR-A1': 'XPONDER', 'ROADM-A1': 'ROADM'}
    for node in nodes:
        first_support = node['supporting-node'][0]
        self.assertEqual(first_support['network-ref'], 'clli-network')
        self.assertEqual(first_support['node-ref'], 'NodeA')
        node_id = node['node-id']
        if node_id not in expected_types:
            # Restores the fall-through branch lost from the listing.
            self.fail("unexpected node {}".format(node_id))
        self.assertEqual(node['org-openroadm-common-network:node-type'],
                         expected_types[node_id])
        self.assertEqual(node['org-openroadm-network:model'], 'model2')
def test_09_getNodes_OpenRoadmTopology(self):
    """Check the five openroadm-topology nodes present once XPDR-A1 is added.

    XPDR-A1-XPDR1 must be an XPONDER supported by XPDR-A1 and NodeA, with
    two client and two network termination points and a symmetric
    connection map between XPDR1-NETWORK2 and XPDR1-CLIENT2. The four
    ROADM-A1 nodes must keep their type, termination points and ROADM-A1
    support.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding so a failed request is reported as such.
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 5)

    def tp(tp_id, tp_type):
        # Exact termination-point entry expected in the response.
        return {'tp-id': tp_id,
                'org-openroadm-common-network:tp-type': tp_type}

    # node-id -> (node type, exact TP count or None, TPs that must be present)
    roadm_expectations = {
        'ROADM-A1-SRG1': ('SRG', 5, [tp('SRG1-CP-TXRX', 'SRG-TXRX-CP'),
                                     tp('SRG1-PP1-TXRX', 'SRG-TXRX-PP')]),
        'ROADM-A1-SRG3': ('SRG', 5, [tp('SRG3-CP-TXRX', 'SRG-TXRX-CP'),
                                     tp('SRG3-PP1-TXRX', 'SRG-TXRX-PP')]),
        'ROADM-A1-DEG1': ('DEGREE', None,
                          [tp('DEG1-TTP-TXRX', 'DEGREE-TXRX-TTP'),
                           tp('DEG1-CTP-TXRX', 'DEGREE-TXRX-CTP')]),
        'ROADM-A1-DEG2': ('DEGREE', None,
                          [tp('DEG2-TTP-TXRX', 'DEGREE-TXRX-TTP'),
                           tp('DEG2-CTP-TXRX', 'DEGREE-TXRX-CTP')]),
    }
    # tp-id -> tp-id it must name as its associated connection-map port
    connection_map = {'XPDR1-NETWORK2': 'XPDR1-CLIENT2',
                      'XPDR1-CLIENT2': 'XPDR1-NETWORK2'}
    remaining = ['XPDR-A1-XPDR1'] + sorted(roadm_expectations)
    for node in nodes:
        node_type = node['org-openroadm-common-network:node-type']
        node_id = node['node-id']
        if node_id == 'XPDR-A1-XPDR1':
            supporting = node['supporting-node']
            self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDR-A1'},
                          supporting)
            self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
                          supporting)
            self.assertEqual(node_type, 'XPONDER')
            tps = node['ietf-network-topology:termination-point']
            # The listing lost the counter initialisation and increments;
            # counting tp-types reproduces the two totals asserted below.
            tp_types = [entry['org-openroadm-common-network:tp-type']
                        for entry in tps]
            for entry in tps:
                peer = connection_map.get(entry['tp-id'])
                if peer is not None:
                    self.assertEqual(
                        entry['transportpce-topology:associated-connection-map-port'],
                        peer)
            self.assertEqual(tp_types.count('XPONDER-CLIENT'), 2)
            self.assertEqual(tp_types.count('XPONDER-NETWORK'), 2)
        elif node_id in roadm_expectations:
            expected_type, tp_count, required_tps = roadm_expectations[node_id]
            self.assertEqual(node_type, expected_type)
            tps = node['ietf-network-topology:termination-point']
            if tp_count is not None:
                self.assertEqual(len(tps), tp_count)
            for required in required_tps:
                self.assertIn(required, tps)
            self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
                          node['supporting-node'])
        else:
            # Restores the fall-through branch lost from the listing.
            self.fail("unexpected node {}".format(node_id))
        # A node id reported twice must not pass silently.
        self.assertIn(node_id, remaining)
        remaining.remove(node_id)
    self.assertEqual(len(remaining), 0)
392 # Connect the tail XPDRA to ROADMA and vice versa
def test_10_connect_tail_xpdr_rdm(self):
    """Call init-xpdr-rdm-links for XPDR-A1 network 1 -> ROADM-A1 SRG1-PP1-TXRX."""
    # Connect the tail: XPDRA to ROADMA
    url = ("{}/operations/transportpce-networkutils:init-xpdr-rdm-links"
           .format(self.restconf_baseurl))
    # The closing braces of this payload were lost from the listing and are
    # restored here; no other entry is added.
    data = {"networkutils:input": {
        "networkutils:links-input": {
            "networkutils:xpdr-node": "XPDR-A1",
            "networkutils:xpdr-num": "1",
            "networkutils:network-num": "1",
            "networkutils:rdm-node": "ROADM-A1",
            "networkutils:srg-num": "1",
            "networkutils:termination-point-num": "SRG1-PP1-TXRX"
        }
    }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_11_connect_tail_rdm_xpdr(self):
    """Call init-rdm-xpdr-links for ROADM-A1 SRG1-PP1-TXRX -> XPDR-A1 network 1."""
    # Connect the tail: ROADMA to XPDRA
    url = ("{}/operations/transportpce-networkutils:init-rdm-xpdr-links"
           .format(self.restconf_baseurl))
    # The closing braces of this payload were lost from the listing and are
    # restored here; no other entry is added.
    data = {"networkutils:input": {
        "networkutils:links-input": {
            "networkutils:xpdr-node": "XPDR-A1",
            "networkutils:xpdr-num": "1",
            "networkutils:network-num": "1",
            "networkutils:rdm-node": "ROADM-A1",
            "networkutils:srg-num": "1",
            "networkutils:termination-point-num": "SRG1-PP1-TXRX"
        }
    }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_12_getLinks_OpenRoadmTopology(self):
    """Check openroadm-topology holds 12 links once the XPDR tails exist.

    On top of the 10 ROADM-A1 internal links, one XPONDER-INPUT and one
    XPONDER-OUTPUT link are expected. A link of any other type fails the
    test, and every expected link must be seen.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 12)
    # link type -> link ids still waiting to be seen
    expected = {
        'EXPRESS-LINK': [
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX'],
        'ADD-LINK': [
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX'],
        'DROP-LINK': [
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX'],
        'XPONDER-INPUT': [
            'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1'],
        'XPONDER-OUTPUT': [
            'XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX'],
    }
    for link in links:
        link_type = link['org-openroadm-common-network:link-type']
        link_id = link['link-id']
        if link_type not in expected:
            # Restores the fall-through branch lost from the listing.
            self.fail("unexpected link type {} for link {}"
                      .format(link_type, link_id))
        self.assertIn(link_id, expected[link_type])
        # Removing the id makes a duplicate link fail the assertIn above.
        expected[link_type].remove(link_id)
    for link_type, missing in expected.items():
        self.assertEqual(len(missing), 0,
                         "missing {} ids: {}".format(link_type, missing))
def test_13_connect_ROADMC(self):
    """PUT the ROADM-C1 netconf node into topology-netconf; expect 201."""
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADM-C1"
           .format(self.restconf_baseurl))
    node = {
        "node-id": "ROADM-C1",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": "17843",
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}
    # NOTE(review): the line opening this payload is missing from the
    # listing; the "node" wrapper key is inferred from the URL's list name
    # and the surviving "}]}" closers - confirm.
    data = {"node": [node]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
    # NOTE(review): the listing skips lines after this assert; restore any
    # wait that followed it in the original file.
def test_14_omsAttributes_ROADMA_ROADMC(self):
    """PUT span OMS attributes on the ROADM-A1 DEG2 -> ROADM-C1 DEG1 link; expect 201."""
    # Config ROADMA-ROADMC oms-attributes
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # NOTE(review): the listing lost the line opening this payload, its
    # closers and several lines inside "link-concatenation" (numbering
    # skips before and after "SRLG-length"). The "span" wrapper is inferred
    # from the URL's last segment and only the visible entries are kept -
    # restore the missing entries from the original file before relying on
    # this request.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "link-concatenation": [{
            "SRLG-length": 100000,
        }]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_15_omsAttributes_ROADMC_ROADMA(self):
    """PUT span OMS attributes on the ROADM-C1 DEG1 -> ROADM-A1 DEG2 link; expect 201."""
    # Config ROADM-C1-ROADM-A1 oms-attributes
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # NOTE(review): the listing lost the line opening this payload, its
    # closers and several lines inside "link-concatenation" (numbering
    # skips before and after "SRLG-length"). The "span" wrapper is inferred
    # from the URL's last segment and only the visible entries are kept -
    # restore the missing entries from the original file before relying on
    # this request.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "link-concatenation": [{
            "SRLG-length": 100000,
        }]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_16_getClliNetwork(self):
    """Check clli-network holds NodeA and NodeC, each with a matching clli."""
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    pending = ['NodeA', 'NodeC']
    for node in nodes:
        node_id = node['node-id']
        self.assertIn(node_id, pending)
        # The original checks clli == 'NodeA' for NodeA and 'NodeC'
        # otherwise (the "else" line was lost from the listing). Since
        # node_id is one of those two here, that is clli == node_id.
        self.assertEqual(node['org-openroadm-clli-network:clli'], node_id)
        pending.remove(node_id)
    self.assertEqual(len(pending), 0)
def test_17_getOpenRoadmNetwork(self):
    """Check openroadm-network holds XPDR-A1, ROADM-A1 and ROADM-C1.

    Each node's first supporting node must point into clli-network at the
    expected CLLI node, and each node must have the expected node type and
    model 'model2'. Unknown or repeated node ids fail the test.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 3)
    # node-id -> (supporting clli node, node type)
    expected = {
        'XPDR-A1': ('NodeA', 'XPONDER'),
        'ROADM-A1': ('NodeA', 'ROADM'),
        'ROADM-C1': ('NodeC', 'ROADM'),
    }
    for node in nodes:
        first_support = node['supporting-node'][0]
        self.assertEqual(first_support['network-ref'], 'clli-network')
        node_id = node['node-id']
        if node_id not in expected:
            # Restores the fall-through branch lost from the listing; it
            # also catches a node id reported twice.
            self.fail("unexpected or duplicated node {}".format(node_id))
        clli_node, node_type = expected.pop(node_id)
        self.assertEqual(first_support['node-ref'], clli_node)
        self.assertEqual(
            node['org-openroadm-common-network:node-type'], node_type)
        self.assertEqual(node['org-openroadm-network:model'], 'model2')
    self.assertEqual(len(expected), 0)
def test_18_getROADMLinkOpenRoadmTopology(self):
    """Check openroadm-topology holds the 20 links expected with ROADM-C1.

    This covers the ROADM-A1 and ROADM-C1 internal links, the two
    ROADM-TO-ROADM links between them and the two XPDR-A1 tail links. A
    link of any other type fails the test, and every expected link must
    be seen.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 20)
    # link type -> link ids still waiting to be seen
    expected = {
        'EXPRESS-LINK': [
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-C1-DEG2-DEG2-CTP-TXRXtoROADM-C1-DEG1-DEG1-CTP-TXRX',
            'ROADM-C1-DEG1-DEG1-CTP-TXRXtoROADM-C1-DEG2-DEG2-CTP-TXRX'],
        'ADD-LINK': [
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-C1-SRG1-SRG1-CP-TXRXtoROADM-C1-DEG2-DEG2-CTP-TXRX',
            'ROADM-C1-SRG1-SRG1-CP-TXRXtoROADM-C1-DEG1-DEG1-CTP-TXRX'],
        'DROP-LINK': [
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
            'ROADM-C1-DEG1-DEG1-CTP-TXRXtoROADM-C1-SRG1-SRG1-CP-TXRX',
            'ROADM-C1-DEG2-DEG2-CTP-TXRXtoROADM-C1-SRG1-SRG1-CP-TXRX'],
        'ROADM-TO-ROADM': [
            'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
            'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX'],
        'XPONDER-INPUT': [
            'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1'],
        'XPONDER-OUTPUT': [
            'XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX'],
    }
    for link in links:
        link_type = link['org-openroadm-common-network:link-type']
        link_id = link['link-id']
        if link_type not in expected:
            # Restores the fall-through branch lost from the listing.
            self.fail("unexpected link type {} for link {}"
                      .format(link_type, link_id))
        self.assertIn(link_id, expected[link_type])
        # Removing the id makes a duplicate link fail the assertIn above.
        expected[link_type].remove(link_id)
    for link_type, missing in expected.items():
        self.assertEqual(len(missing), 0,
                         "missing {} ids: {}".format(link_type, missing))
def test_19_getLinkOmsAttributesOpenRoadmTopology(self):
    """Check both ROADM-A1 <-> ROADM-C1 links carry span OMS attributes.

    For each of the two links, "engineered-spanloss" and the first
    link-concatenation entry's "SRLG-length" must be present and not None.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 20)
    pending = ['ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
               'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX']
    for link in links:
        link_id = link['link-id']
        if link_id not in pending:
            continue
        span = link['org-openroadm-network-topology:OMS-attributes']['span']
        # The listing lost the assignments to the "find" flag; asserting
        # each value directly gives the same pass/fail outcome.
        self.assertIsNotNone(span["engineered-spanloss"])
        self.assertIsNotNone(span['link-concatenation'][0]['SRLG-length'])
        pending.remove(link_id)
    self.assertEqual(len(pending), 0)
def test_20_getNodes_OpenRoadmTopology(self):
    """Check the openroadm-topology nodes once XPDR-A1, ROADM-A1 and ROADM-C1 are mounted.

    Exactly 8 nodes are expected: the transponder plus the SRG/DEGREE
    sub-nodes of ROADM-A1 and ROADM-C1.  Each node is checked for its
    supporting node, its node type and its mandatory termination points.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding, so an error reply is reported as a
    # status mismatch rather than as a JSON decoding or KeyError failure.
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 8)
    # node-id -> (supporting node, node type, termination-point name prefix)
    roadm_nodes = {
        'ROADM-A1-SRG1': ('ROADM-A1', 'SRG', 'SRG1'),
        'ROADM-A1-SRG3': ('ROADM-A1', 'SRG', 'SRG3'),
        'ROADM-A1-DEG1': ('ROADM-A1', 'DEGREE', 'DEG1'),
        'ROADM-A1-DEG2': ('ROADM-A1', 'DEGREE', 'DEG2'),
        'ROADM-C1-SRG1': ('ROADM-C1', 'SRG', 'SRG1'),
        'ROADM-C1-DEG1': ('ROADM-C1', 'DEGREE', 'DEG1'),
        'ROADM-C1-DEG2': ('ROADM-C1', 'DEGREE', 'DEG2'),
    }
    # node type -> (tp-id suffix, tp-type) pairs that must be present
    mandatory_tps = {
        'SRG': (('CP-TXRX', 'SRG-TXRX-CP'), ('PP1-TXRX', 'SRG-TXRX-PP')),
        'DEGREE': (('TTP-TXRX', 'DEGREE-TXRX-TTP'), ('CTP-TXRX', 'DEGREE-TXRX-CTP')),
    }
    remaining = ['XPDR-A1-XPDR1'] + list(roadm_nodes)
    for node in nodes:
        node_type = node['org-openroadm-common-network:node-type']
        node_id = node['node-id']
        if node_id == 'XPDR-A1-XPDR1':
            # Transponder: at least 4 TPs, of which 2 client and 2 network.
            self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDR-A1'},
                          node['supporting-node'])
            self.assertEqual(node_type, 'XPONDER')
            tps = node['ietf-network-topology:termination-point']
            self.assertGreaterEqual(len(tps), 4)
            tp_types = [tp['org-openroadm-common-network:tp-type'] for tp in tps]
            self.assertEqual(tp_types.count('XPONDER-CLIENT'), 2)
            self.assertEqual(tp_types.count('XPONDER-NETWORK'), 2)
        elif node_id in roadm_nodes:
            supporting, expected_type, prefix = roadm_nodes[node_id]
            tps = node['ietf-network-topology:termination-point']
            if expected_type == 'SRG':
                # Only SRG nodes have their TP count pinned.
                self.assertEqual(len(tps), 5)
            for suffix, tp_type in mandatory_tps[expected_type]:
                self.assertIn({'tp-id': '{}-{}'.format(prefix, suffix),
                               'org-openroadm-common-network:tp-type': tp_type},
                              tps)
            self.assertIn({'network-ref': 'openroadm-network', 'node-ref': supporting},
                          node['supporting-node'])
            self.assertEqual(node_type, expected_type)
        else:
            self.fail("unexpected node in openroadm-topology: {}".format(node_id))
        remaining.remove(node_id)
    # Every expected node must have been seen exactly once.
    self.assertEqual(len(remaining), 0)
def test_21_connect_ROADMB(self):
    """Mount ROADM-B1 in topology-netconf and expect a 201 Created reply."""
    mount_url = ("{}/config/network-topology:"
                 "network-topology/topology/topology-netconf/node/ROADM-B1"
                 .format(self.restconf_baseurl))
    # NETCONF connection parameters of the ROADM-B1 simulator.
    netconf_node = {
        "node-id": "ROADM-B1",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": "17842",
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {},
    }
    payload = json.dumps({"node": [netconf_node]})
    response = requests.request(
        "PUT", mount_url, data=payload,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_22_omsAttributes_ROADMA_ROADMB(self):
    """Configure the OMS span attributes of the ROADM-A1 -> ROADM-B1 link."""
    span_url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
                "link/ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
                "OMS-attributes/span"
                .format(self.restconf_baseurl))
    # NOTE(review): "SRLG-Id", "fiber-type" and "pmd" were restored from
    # lines elided in the reviewed excerpt - confirm their values.
    fiber_section = {
        "SRLG-Id": 0,
        "fiber-type": "smf",
        "SRLG-length": 100000,
        "pmd": 0.5,
    }
    span = {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "spanloss-current": 12,
        "spanloss-base": 11.4,
        "link-concatenation": [fiber_section],
    }
    response = requests.request(
        "PUT", span_url, data=json.dumps({"span": span}),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_23_omsAttributes_ROADMB_ROADMA(self):
    """Configure the OMS span attributes of the ROADM-B1 -> ROADM-A1 link."""
    span_url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
                "link/ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
                "OMS-attributes/span"
                .format(self.restconf_baseurl))
    # NOTE(review): "SRLG-Id", "fiber-type" and "pmd" were restored from
    # lines elided in the reviewed excerpt - confirm their values.
    fiber_section = {
        "SRLG-Id": 0,
        "fiber-type": "smf",
        "SRLG-length": 100000,
        "pmd": 0.5,
    }
    span = {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "spanloss-current": 12,
        "spanloss-base": 11.4,
        "link-concatenation": [fiber_section],
    }
    response = requests.request(
        "PUT", span_url, data=json.dumps({"span": span}),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_24_omsAttributes_ROADMB_ROADMC(self):
    """Configure the OMS span attributes of the ROADM-B1 -> ROADM-C1 link."""
    span_url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
                "link/ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
                "OMS-attributes/span"
                .format(self.restconf_baseurl))
    # NOTE(review): "SRLG-Id", "fiber-type" and "pmd" were restored from
    # lines elided in the reviewed excerpt - confirm their values.
    fiber_section = {
        "SRLG-Id": 0,
        "fiber-type": "smf",
        "SRLG-length": 100000,
        "pmd": 0.5,
    }
    span = {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "spanloss-current": 12,
        "spanloss-base": 11.4,
        "link-concatenation": [fiber_section],
    }
    response = requests.request(
        "PUT", span_url, data=json.dumps({"span": span}),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_25_omsAttributes_ROADMC_ROADMB(self):
    """Configure the OMS span attributes of the ROADM-C1 -> ROADM-B1 link.

    Unlike the three sibling tests, this payload carries no
    "spanloss-current" / "spanloss-base" values.
    """
    span_url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
                "link/ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
                "OMS-attributes/span"
                .format(self.restconf_baseurl))
    # NOTE(review): "SRLG-Id", "fiber-type" and "pmd" were restored from
    # lines elided in the reviewed excerpt - confirm their values.
    fiber_section = {
        "SRLG-Id": 0,
        "fiber-type": "smf",
        "SRLG-length": 100000,
        "pmd": 0.5,
    }
    span = {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "link-concatenation": [fiber_section],
    }
    response = requests.request(
        "PUT", span_url, data=json.dumps({"span": span}),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_26_getClliNetwork(self):
    """Check that clli-network holds exactly NodeA, NodeB and NodeC, each with a matching clli."""
    clli_url = ("{}/config/ietf-network:networks/network/clli-network"
                .format(self.restconf_baseurl))
    response = requests.request(
        "GET", clli_url, headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    pending = ['NodeA', 'NodeB', 'NodeC']
    for node in response.json()['network'][0]['node']:
        node_id = node['node-id']
        self.assertIn(node_id, pending)
        # node_id is one of the three expected names at this point, so the
        # clli value must simply equal the node id.
        self.assertEqual(node['org-openroadm-clli-network:clli'], node_id)
        pending.remove(node_id)
    self.assertEqual(len(pending), 0)
def test_27_verifyDegree(self):
    """Check that the ROADM-TO-ROADM links are exactly the six A1/B1/C1 degree links."""
    topo_url = ("{}/config/ietf-network:networks/network/openroadm-topology"
                .format(self.restconf_baseurl))
    response = requests.request(
        "GET", topo_url, headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    # Inter-ROADM links still to be found, one per direction.
    pending = [
        'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
        'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
        'ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX',
        'ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX',
        'ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX',
        'ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX',
    ]
    for link in response.json()['network'][0]['ietf-network-topology:link']:
        if link['org-openroadm-common-network:link-type'] != 'ROADM-TO-ROADM':
            continue
        identifier = link['link-id']
        self.assertIn(identifier, pending)
        pending.remove(identifier)
    self.assertEqual(len(pending), 0)
def test_28_verifyOppositeLinkTopology(self):
    """Check that each of the 26 links has a consistent opposite link.

    The opposite link must point back to the link, swap source and
    destination nodes, and carry the mirrored link type.
    """
    topo_url = ("{}/config/ietf-network:networks/network/openroadm-topology"
                .format(self.restconf_baseurl))
    json_headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", topo_url, headers=json_headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 26)
    # link type -> type expected on the opposite link; other types are not checked
    mirrored_type = {
        'ADD-LINK': 'DROP-LINK',
        'DROP-LINK': 'ADD-LINK',
        'EXPRESS-LINK': 'EXPRESS-LINK',
        'ROADM-TO-ROADM': 'ROADM-TO-ROADM',
        'XPONDER-INPUT': 'XPONDER-OUTPUT',
        'XPONDER-OUTPUT': 'XPONDER-INPUT',
    }
    for link in links:
        this_id = link['link-id']
        this_type = link['org-openroadm-common-network:link-type']
        this_src = link['source']['source-node']
        this_dest = link['destination']['dest-node']
        opposite_id = link['org-openroadm-common-network:opposite-link']
        # Fetch the opposite link on its own.
        opposite_template = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"+opposite_id
        opposite_response = requests.request(
            "GET", opposite_template.format(self.restconf_baseurl),
            headers=json_headers, auth=('admin', 'admin'))
        self.assertEqual(opposite_response.status_code, requests.codes.ok)
        opposite = opposite_response.json()['ietf-network-topology:link'][0]
        self.assertEqual(opposite['org-openroadm-common-network:opposite-link'], this_id)
        self.assertEqual(opposite['source']['source-node'], this_dest)
        self.assertEqual(opposite['destination']['dest-node'], this_src)
        opposite_type = opposite['org-openroadm-common-network:link-type']
        if this_type in mirrored_type:
            self.assertEqual(opposite_type, mirrored_type[this_type])
def test_29_getLinkOmsAttributesOpenRoadmTopology(self):
    """Check that all six inter-ROADM links expose OMS span attributes.

    Each expected link must be present and carry a non-null
    "engineered-spanloss" and a non-null "SRLG-length" on its first
    link-concatenation entry.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    remaining = ['ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
                 'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
                 'ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX',
                 'ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX',
                 'ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX',
                 'ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX']
    for link in links:
        link_id = link['link-id']
        if link_id not in remaining:
            continue
        span = link['org-openroadm-network-topology:OMS-attributes']['span']
        # Identity checks against None replace the former
        # "(x != None) & (y != None)" bitwise test.
        self.assertIsNotNone(span["engineered-spanloss"])
        self.assertIsNotNone(span['link-concatenation'][0]['SRLG-length'])
        remaining.remove(link_id)
    # Every expected inter-ROADM link must have been found.
    self.assertEqual(len(remaining), 0)
def test_30_disconnect_ROADMB(self):
    """Remove ROADM-B1 from topology-netconf, then NodeB from clli-network."""
    targets = (
        # Delete in the topology-netconf
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/ROADM-B1",
        # Delete in the clli-network
        "{}/config/ietf-network:networks/network/clli-network/node/NodeB",
    )
    for target in targets:
        # NOTE(review): the empty body ({}) was restored from a line elided
        # in the reviewed excerpt - confirm.
        response = requests.request(
            "DELETE", target.format(self.restconf_baseurl),
            data=json.dumps({}),
            headers={'content-type': 'application/json'},
            auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
def test_31_disconnect_ROADMC(self):
    """Remove ROADM-C1 from topology-netconf, then NodeC from clli-network."""
    targets = (
        # Delete in the topology-netconf
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/ROADM-C1",
        # Delete in the clli-network
        "{}/config/ietf-network:networks/network/clli-network/node/NodeC",
    )
    for target in targets:
        # NOTE(review): the empty body ({}) was restored from a line elided
        # in the reviewed excerpt - confirm.
        response = requests.request(
            "DELETE", target.format(self.restconf_baseurl),
            data=json.dumps({}),
            headers={'content-type': 'application/json'},
            auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
1062 # def test_24_check_roadm2roadm_links_deletion(self):
1063 # url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1064 # .format(self.restconf_baseurl))
1065 # headers = {'content-type': 'application/json'}
1066 # response = requests.request(
1067 # "GET", url, headers=headers, auth=('admin', 'admin'))
1068 # self.assertEqual(response.status_code, requests.codes.ok)
1069 # res = response.json()
1070 # #Write the response in the log
1071 # with open('./transportpce_tests/log/response.log', 'a') as outfile1:
1072 # outfile1.write(str(res))
1073 # #Tests related to links
1074 # nbLink=len(res['network'][0]['ietf-network-topology:link'])
1075 # self.assertEqual(nbLink,8)
1076 # expressLink=['ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX','ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX']
1077 # addLink=['ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX','ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX',]
1078 # dropLink=['ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX','ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX']
1079 # XPDR_IN=['ROADMA-SRG1-SRG1-PP1-TXRXtoXPDRA-XPDR1-XPDR1-NETWORK1']
1080 # XPDR_OUT=['XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX']
1081 # for i in range(0,nbLink):
1082 # nodeType=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
1083 # linkId=res['network'][0]['ietf-network-topology:link'][i]['link-id']
1084 # if(nodeType=='EXPRESS-LINK'):
1085 # find= linkId in expressLink
1086 # self.assertEqual(find, True)
1087 # expressLink.remove(linkId)
1088 # elif(nodeType=='ADD-LINK'):
1089 # find= linkId in addLink
1090 # self.assertEqual(find, True)
1091 # addLink.remove(linkId)
1092 # elif(nodeType=='DROP-LINK'):
1093 # find= linkId in dropLink
1094 # self.assertEqual(find, True)
1095 # dropLink.remove(linkId)
1096 # elif(nodeType=='XPONDER-INPUT'):
1097 # find= linkId in XPDR_IN
1098 # self.assertEqual(find, True)
1099 # XPDR_IN.remove(linkId)
1100 # elif(nodeType=='XPONDER-OUTPUT'):
1101 # find= linkId in XPDR_OUT
1102 # self.assertEqual(find, True)
1103 # XPDR_OUT.remove(linkId)
1105 # self.assertFalse(True)
1106 # self.assertEqual(len(expressLink),0)
1107 # self.assertEqual(len(addLink),0)
1108 # self.assertEqual(len(dropLink),0)
1109 # self.assertEqual(len(XPDR_IN),0)
1110 # self.assertEqual(len(XPDR_OUT),0)
1112 # for i in range(0,nbLink):
1113 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'],'ROADM-TO-ROADM')
1114 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX')
1115 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX')
1116 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG2-DEG1-CTP-TXRX')
1117 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG2-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX')
1118 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-DEG2-DEG2-CTP-TXRX')
1119 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG2-DEG2-CTP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX')
def test_32_getNodes_OpenRoadmTopology(self):
    """Check openroadm-topology after ROADM-B1 and ROADM-C1 were disconnected.

    Only the transponder and the four ROADM-A1 sub-nodes must remain, and
    no ROADM-C1 sub-node may still be listed.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding, so an error reply is reported as a
    # status mismatch rather than as a JSON decoding or KeyError failure.
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 5)
    # node-id -> (node type, termination-point name prefix)
    roadm_nodes = {
        'ROADM-A1-SRG1': ('SRG', 'SRG1'),
        'ROADM-A1-SRG3': ('SRG', 'SRG3'),
        'ROADM-A1-DEG1': ('DEGREE', 'DEG1'),
        'ROADM-A1-DEG2': ('DEGREE', 'DEG2'),
    }
    # node type -> (tp-id suffix, tp-type) pairs that must be present
    mandatory_tps = {
        'SRG': (('CP-TXRX', 'SRG-TXRX-CP'), ('PP1-TXRX', 'SRG-TXRX-PP')),
        'DEGREE': (('TTP-TXRX', 'DEGREE-TXRX-TTP'), ('CTP-TXRX', 'DEGREE-TXRX-CTP')),
    }
    remaining = ['XPDR-A1-XPDR1'] + list(roadm_nodes)
    for node in nodes:
        node_type = node['org-openroadm-common-network:node-type']
        node_id = node['node-id']
        if node_id == 'XPDR-A1-XPDR1':
            # Transponder: check the client/network TP types and the
            # tail-equipment-id of the network port.
            for tp in node['ietf-network-topology:termination-point']:
                tp_id = tp['tp-id']
                if tp_id == 'XPDR1-CLIENT1':
                    self.assertEqual(tp['org-openroadm-common-network:tp-type'],
                                     'XPONDER-CLIENT')
                if tp_id == 'XPDR1-NETWORK1':
                    self.assertEqual(tp['org-openroadm-common-network:tp-type'],
                                     'XPONDER-NETWORK')
                    self.assertEqual(
                        tp['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'],
                        'ROADM-A1-SRG1--SRG1-PP1-TXRX')
            self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDR-A1'},
                          node['supporting-node'])
        elif node_id in roadm_nodes:
            expected_type, prefix = roadm_nodes[node_id]
            self.assertEqual(node_type, expected_type)
            tps = node['ietf-network-topology:termination-point']
            if expected_type == 'SRG':
                # Only SRG nodes have their TP count pinned.
                self.assertEqual(len(tps), 5)
            for suffix, tp_type in mandatory_tps[expected_type]:
                self.assertIn({'tp-id': '{}-{}'.format(prefix, suffix),
                               'org-openroadm-common-network:tp-type': tp_type},
                              tps)
            self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
                          node['supporting-node'])
        else:
            self.fail("unexpected node in openroadm-topology: {}".format(node_id))
        remaining.remove(node_id)
    # Every expected node must have been seen exactly once.
    self.assertEqual(len(remaining), 0)
    # The sub-nodes of the disconnected ROADM-C1 must be gone.
    for node in nodes:
        self.assertNotEqual(node['node-id'], 'ROADM-C1-SRG1')
        self.assertNotEqual(node['node-id'], 'ROADM-C1-DEG1')
        self.assertNotEqual(node['node-id'], 'ROADM-C1-DEG2')
def test_33_getOpenRoadmNetwork(self):
    """Check that openroadm-network holds two nodes, neither being ROADM-B1 nor ROADM-C1."""
    url = ("{}/config/ietf-network:networks/network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 2)
    # Inspect every node: the former range(0, nbNode-1) skipped the last one.
    for node in nodes:
        self.assertNotEqual(node['node-id'], 'ROADM-C1')
        self.assertNotEqual(node['node-id'], 'ROADM-B1')
def test_34_getClliNetwork(self):
    """Check that clli-network holds a single node whose clli is not NodeC."""
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 1)
    # Inspect every node: the former loop ran over range(0, nbNode-1), which
    # is empty for one node, and indexed the hard-coded position 1.
    for node in nodes:
        self.assertNotEqual(node['org-openroadm-clli-network:clli'], 'NodeC')
def test_35_disconnect_XPDRA(self):
    """Remove XPDR-A1 from topology-netconf and expect a 200 OK reply."""
    unmount_url = ("{}/config/network-topology:"
                   "network-topology/topology/topology-netconf/node/XPDR-A1"
                   .format(self.restconf_baseurl))
    # NOTE(review): the empty body ({}) was restored from a line elided in
    # the reviewed excerpt - confirm.
    response = requests.request(
        "DELETE", unmount_url, data=json.dumps({}),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_36_getClliNetwork(self):
    """Check that clli-network holds a single node whose clli is NodeA."""
    clli_url = ("{}/config/ietf-network:networks/network/clli-network"
                .format(self.restconf_baseurl))
    response = requests.request(
        "GET", clli_url, headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    body = response.json()
    node_count = len(body['network'][0]['node'])
    self.assertEqual(node_count, 1)
    only_node = body['network'][0]['node'][0]
    self.assertEqual(only_node['org-openroadm-clli-network:clli'], 'NodeA')
def test_37_getOpenRoadmNetwork(self):
    """Check that openroadm-network holds a single node that is not XPDR-A1."""
    network_url = ("{}/config/ietf-network:networks/network/openroadm-network"
                   .format(self.restconf_baseurl))
    response = requests.request(
        "GET", network_url, headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 1)
    for node in nodes:
        self.assertNotEqual(node['node-id'], 'XPDR-A1')
def test_38_getNodes_OpenRoadmTopology(self):
    """Check openroadm-topology after XPDR-A1 was disconnected.

    Only the four ROADM-A1 sub-nodes must remain, each supported by
    ROADM-A1 and exposing its mandatory termination points.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding, so an error reply is reported as a
    # status mismatch rather than as a JSON decoding or KeyError failure.
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 4)
    # node-id -> (node type, termination-point name prefix)
    roadm_nodes = {
        'ROADM-A1-SRG1': ('SRG', 'SRG1'),
        'ROADM-A1-SRG3': ('SRG', 'SRG3'),
        'ROADM-A1-DEG1': ('DEGREE', 'DEG1'),
        'ROADM-A1-DEG2': ('DEGREE', 'DEG2'),
    }
    # node type -> (tp-id suffix, tp-type) pairs that must be present
    mandatory_tps = {
        'SRG': (('CP-TXRX', 'SRG-TXRX-CP'), ('PP1-TXRX', 'SRG-TXRX-PP')),
        'DEGREE': (('TTP-TXRX', 'DEGREE-TXRX-TTP'), ('CTP-TXRX', 'DEGREE-TXRX-CTP')),
    }
    remaining = list(roadm_nodes)
    for node in nodes:
        self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
                      node['supporting-node'])
        node_type = node['org-openroadm-common-network:node-type']
        node_id = node['node-id']
        if node_id not in roadm_nodes:
            self.fail("unexpected node in openroadm-topology: {}".format(node_id))
        expected_type, prefix = roadm_nodes[node_id]
        self.assertEqual(node_type, expected_type)
        tps = node['ietf-network-topology:termination-point']
        if expected_type == 'SRG':
            # Only SRG nodes have their TP count pinned.
            self.assertEqual(len(tps), 5)
        for suffix, tp_type in mandatory_tps[expected_type]:
            self.assertIn({'tp-id': '{}-{}'.format(prefix, suffix),
                           'org-openroadm-common-network:tp-type': tp_type},
                          tps)
        remaining.remove(node_id)
    # Every expected node must have been seen exactly once.
    self.assertEqual(len(remaining), 0)
def test_39_disconnect_ROADM_XPDRA_link(self):
    """Delete both directions of the XPDR-A1 <-> ROADM-A1 SRG1 link from openroadm-topology."""
    link_prefix = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
                   "link/")
    link_ids = (
        "XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX",
        "ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1",
    )
    for link_id in link_ids:
        link_url = (link_prefix + link_id).format(self.restconf_baseurl)
        # NOTE(review): the empty body ({}) was restored from a line elided
        # in the reviewed excerpt - confirm.
        response = requests.request(
            "DELETE", link_url, data=json.dumps({}),
            headers={'content-type': 'application/json'},
            auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
def test_40_getLinks_OpenRoadmTopology(self):
    """Check the link inventory of openroadm-topology.

    Expects 16 links in total: exactly the listed EXPRESS, ADD and DROP
    link ids (each seen once), 6 links of any other type, and no link of
    type XPONDER-OUTPUT or XPONDER-INPUT.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    links = res['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 16)

    # Link ids still to be found, keyed by link type. Ids are removed as
    # they are met, so a duplicate fails the membership check below.
    expected_by_type = {
        'EXPRESS-LINK': [
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
        ],
        'ADD-LINK': [
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
        ],
        'DROP-LINK': [
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
        ],
    }
    roadmtoroadmLink = 0
    for link in links:
        link_type = link['org-openroadm-common-network:link-type']
        if link_type in expected_by_type:
            link_id = link['link-id']
            # assertIn reports the offending id on failure.
            self.assertIn(link_id, expected_by_type[link_type])
            expected_by_type[link_type].remove(link_id)
        else:
            # Only links that are neither express, add nor drop are counted here.
            roadmtoroadmLink += 1

    for link_type in ('EXPRESS-LINK', 'ADD-LINK', 'DROP-LINK'):
        # Anything left over is an expected link missing from the topology.
        self.assertEqual(expected_by_type[link_type], [])
    self.assertEqual(roadmtoroadmLink, 6)

    for link in links:
        link_type = link['org-openroadm-common-network:link-type']
        self.assertNotEqual(link_type, 'XPONDER-OUTPUT')
        self.assertNotEqual(link_type, 'XPONDER-INPUT')
def test_41_disconnect_ROADMA(self):
    """Delete ROADM-A1 from topology-netconf, then NodeA from clli-network.

    Both RESTCONF DELETE requests must be answered with HTTP 200
    (``requests.codes.ok``).
    """
    resource_paths = (
        # Delete the ROADM-A1 node entry of topology-netconf
        "network-topology:network-topology/topology/topology-netconf/node/ROADM-A1",
        # Delete in the clli-network
        "ietf-network:networks/network/clli-network/node/NodeA",
    )
    headers = {'content-type': 'application/json'}
    # Empty JSON object sent as the request body of every DELETE.
    data = {}
    for resource_path in resource_paths:
        url = "{}/config/{}".format(self.restconf_baseurl, resource_path)
        response = requests.request(
            "DELETE", url, data=json.dumps(data), headers=headers,
            auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
def test_42_getClliNetwork(self):
    """Fetch clli-network and check that it holds no 'node' entry."""
    endpoint = "{}/config/ietf-network:networks/network/clli-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET",
        endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    clli_network = reply.json()['network'][0]
    self.assertNotIn('node', clli_network)
def test_43_getOpenRoadmNetwork(self):
    """Fetch openroadm-network and check that it holds no 'node' entry."""
    endpoint = "{}/config/ietf-network:networks/network/openroadm-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET",
        endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    openroadm_network = reply.json()['network'][0]
    self.assertNotIn('node', openroadm_network)
def test_44_check_roadm2roadm_link_persistence(self):
    """Check that openroadm-topology holds no node but still exactly 6 links.

    NOTE(review): going by the test name, the 6 remaining links are
    presumably the ROADM-to-ROADM ones; only the count is verified here.
    """
    endpoint = "{}/config/ietf-network:networks/network/openroadm-topology".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET",
        endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    topology = reply.json()['network'][0]
    # The link list is read before the node check, so a missing link list
    # surfaces as a KeyError first.
    remaining_links = topology['ietf-network-topology:link']
    link_count = len(remaining_links)
    self.assertNotIn('node', topology)
    self.assertEqual(link_count, 6)
# Script entry point: run every test of this module with verbose
# (one line per test) reporting.
if __name__ == "__main__":
    unittest.main(verbosity=2)