3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
25 class TransportPCEtesting(unittest.TestCase):
32 restconf_baseurl = "http://localhost:8181/restconf"
34 # START_IGNORE_XTESTING
38 cls.odl_process = test_utils.start_tpce()
39 cls.sim_process1 = test_utils.start_sim('xpdra')
40 cls.sim_process2 = test_utils.start_sim('roadma')
41 cls.sim_process3 = test_utils.start_sim('roadmb')
42 cls.sim_process4 = test_utils.start_sim('roadmc')
def tearDownClass(cls):
    """Stop the controller and the four simulators started for this class.

    For each process in turn, SIGINT is sent to its direct children (as
    listed by psutil), then to the process itself, and the method blocks
    until that process has exited before handling the next one.
    """
    # NOTE(review): this takes ``cls``, so it is presumably decorated with
    # @classmethod on a line that is not visible in the reviewed text.
    # Attributes are looked up one at a time, inside the loop, so that a
    # missing later attribute does not prevent earlier processes from being
    # stopped first.
    process_attrs = (
        'odl_process',
        'sim_process1',
        'sim_process2',
        'sim_process3',
        'sim_process4',
    )
    for attr in process_attrs:
        process = getattr(cls, attr)
        for child in psutil.Process(process.pid).children():
            child.send_signal(signal.SIGINT)
        process.send_signal(signal.SIGINT)
        process.wait()
def test_01_connect_ROADM_A1(self):
    """Register ROADM-A1 as a NETCONF node and expect a 'created' reply.

    PUTs a ``topology-netconf`` node entry that points at the local
    'roadma' simulator port (taken from ``test_utils.sims``).
    """
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADM-A1"
           .format(self.restconf_baseurl))
    # NOTE(review): the line opening this literal was absent from the
    # reviewed text. The {"node": [{ ... }]} wrapper is inferred from the
    # closing "}]}" and from the target URL - confirm against the original.
    data = {"node": [{
        "node-id": "ROADM-A1",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": test_utils.sims['roadma']['port'],
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_02_getClliNetwork(self):
    """Read clli-network and check that its first node is NodeA."""
    endpoint = "{}/config/ietf-network:networks/network/clli-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET",
        endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    # Only the first node of the first network entry is inspected.
    self.assertEqual(body['network'][0]['node'][0]['node-id'], 'NodeA')
    first_node = body['network'][0]['node'][0]
    self.assertEqual(first_node['org-openroadm-clli-network:clli'], 'NodeA')
def test_03_getOpenRoadmNetwork(self):
    """Read openroadm-network and check the attributes of its first node.

    The first node must be ROADM-A1, supported by NodeA of clli-network,
    with node type ROADM and model 'model2'.
    """
    endpoint = "{}/config/ietf-network:networks/network/openroadm-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET",
        endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    self.assertEqual(body['network'][0]['node'][0]['node-id'], 'ROADM-A1')
    first_node = body['network'][0]['node'][0]
    self.assertEqual(first_node['supporting-node'][0]['network-ref'], 'clli-network')
    self.assertEqual(first_node['supporting-node'][0]['node-ref'], 'NodeA')
    self.assertEqual(first_node['org-openroadm-common-network:node-type'], 'ROADM')
    self.assertEqual(first_node['org-openroadm-network:model'], 'model2')
def test_04_getLinks_OpenroadmTopology(self):
    """Check that openroadm-topology holds exactly the ten ROADM-A1 links.

    Every returned link must have a known link type and an id listed for
    that type; each expected id must be seen exactly once.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 10)
    # Expected link ids per link type; ids are removed as they are matched
    # so that duplicates and leftovers are both detected.
    expected = {
        'EXPRESS-LINK': [
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
        ],
        'ADD-LINK': [
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
        ],
        'DROP-LINK': [
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
        ],
    }
    for link in links:
        link_id = link['link-id']
        link_type = link['org-openroadm-common-network:link-type']
        if link_type not in expected:
            # NOTE(review): the reviewed text had lost the ``else:`` that
            # guarded the unconditional failure; restored here as a guard.
            self.fail("unexpected link type {!r} for link {!r}"
                      .format(link_type, link_id))
        self.assertIn(link_id, expected[link_type])
        expected[link_type].remove(link_id)
    for link_type, missing in expected.items():
        self.assertEqual(missing, [],
                         "missing {} ids: {}".format(link_type, missing))
def test_05_getNodes_OpenRoadmTopology(self):
    """Check the four ROADM-A1 nodes (SRG1, SRG3, DEG1, DEG2) of openroadm-topology.

    Each node must reference ROADM-A1 (openroadm-network) and NodeA
    (clli-network) as supporting nodes, carry the expected node type and
    expose the expected termination points. SRG nodes must have exactly
    five termination points.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding so that an error reply is reported
    # as a status mismatch rather than as a JSON decoding error.
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 4)

    roadm_ref = {'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'}
    clli_ref = {'network-ref': 'clli-network', 'node-ref': 'NodeA'}
    # node-id -> (node type, exact TP count or None, required (tp-id, tp-type))
    expected = {
        'ROADM-A1-SRG1': ('SRG', 5, (('SRG1-CP-TXRX', 'SRG-TXRX-CP'),
                                     ('SRG1-PP1-TXRX', 'SRG-TXRX-PP'))),
        'ROADM-A1-SRG3': ('SRG', 5, (('SRG3-CP-TXRX', 'SRG-TXRX-CP'),
                                     ('SRG3-PP1-TXRX', 'SRG-TXRX-PP'))),
        'ROADM-A1-DEG1': ('DEGREE', None, (('DEG1-TTP-TXRX', 'DEGREE-TXRX-TTP'),
                                           ('DEG1-CTP-TXRX', 'DEGREE-TXRX-CTP'))),
        'ROADM-A1-DEG2': ('DEGREE', None, (('DEG2-TTP-TXRX', 'DEGREE-TXRX-TTP'),
                                           ('DEG2-CTP-TXRX', 'DEGREE-TXRX-CTP'))),
    }
    remaining = list(expected)
    for node in nodes:
        self.assertIn(roadm_ref, node['supporting-node'])
        node_type = node['org-openroadm-common-network:node-type']
        node_id = node['node-id']
        if node_id not in expected:
            # NOTE(review): restores the ``else:`` fallback that was absent
            # from the reviewed text.
            self.fail("unexpected node {!r}".format(node_id))
        want_type, want_tp_count, want_tps = expected[node_id]
        self.assertEqual(node_type, want_type)
        tps = node['ietf-network-topology:termination-point']
        if want_tp_count is not None:
            self.assertEqual(len(tps), want_tp_count)
        for tp_id, tp_type in want_tps:
            self.assertIn({'tp-id': tp_id,
                           'org-openroadm-common-network:tp-type': tp_type},
                          tps)
        self.assertIn(clli_ref, node['supporting-node'])
        # Guard against the same node being reported twice.
        self.assertIn(node_id, remaining)
        remaining.remove(node_id)
    self.assertEqual(remaining, [])
def test_06_connect_XPDRA(self):
    """Register XPDR-A1 as a NETCONF node and expect a 'created' reply.

    PUTs a ``topology-netconf`` node entry that points at the local
    'xpdra' simulator port (taken from ``test_utils.sims``).
    """
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/XPDR-A1"
           .format(self.restconf_baseurl))
    # NOTE(review): the line opening this literal was absent from the
    # reviewed text. The {"node": [{ ... }]} wrapper is inferred from the
    # closing "}]}" and from the target URL - confirm against the original.
    data = {"node": [{
        "node-id": "XPDR-A1",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_07_getClliNetwork(self):
    """Read clli-network again and check that its first node is still NodeA."""
    endpoint = "{}/config/ietf-network:networks/network/clli-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET",
        endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    # Only the first node of the first network entry is inspected.
    self.assertEqual(body['network'][0]['node'][0]['node-id'], 'NodeA')
    first_node = body['network'][0]['node'][0]
    self.assertEqual(first_node['org-openroadm-clli-network:clli'], 'NodeA')
def test_08_getOpenRoadmNetwork(self):
    """Check that openroadm-network now holds XPDR-A1 and ROADM-A1.

    Both nodes must be supported by NodeA of clli-network and report
    model 'model2'; the node type must match the node id.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 2)
    expected_types = {'XPDR-A1': 'XPONDER', 'ROADM-A1': 'ROADM'}
    for node in nodes:
        supporting = node['supporting-node'][0]
        self.assertEqual(supporting['network-ref'], 'clli-network')
        self.assertEqual(supporting['node-ref'], 'NodeA')
        node_id = node['node-id']
        if node_id not in expected_types:
            # NOTE(review): restores the ``else:`` fallback that was absent
            # from the reviewed text.
            self.fail("unexpected node {!r}".format(node_id))
        self.assertEqual(node['org-openroadm-common-network:node-type'],
                         expected_types[node_id])
        self.assertEqual(node['org-openroadm-network:model'], 'model2')
def test_09_getNodes_OpenRoadmTopology(self):
    """Check the five openroadm-topology nodes once XPDR-A1 is connected.

    XPDR-A1-XPDR1 must be an XPONDER supported by XPDR-A1 and NodeA, with
    two client and two network termination points, and with
    XPDR1-NETWORK2 / XPDR1-CLIENT2 associated to each other. The four
    ROADM-A1 nodes are checked for type, termination points and their
    ROADM-A1 supporting node.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding so that an error reply is reported
    # as a status mismatch rather than as a JSON decoding error.
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 5)

    roadm_ref = {'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'}
    assoc_key = 'transportpce-topology:associated-connection-map-port'
    # ROADM node-id -> (node type, exact TP count or None, required TPs)
    expected_roadm = {
        'ROADM-A1-SRG1': ('SRG', 5, (('SRG1-CP-TXRX', 'SRG-TXRX-CP'),
                                     ('SRG1-PP1-TXRX', 'SRG-TXRX-PP'))),
        'ROADM-A1-SRG3': ('SRG', 5, (('SRG3-CP-TXRX', 'SRG-TXRX-CP'),
                                     ('SRG3-PP1-TXRX', 'SRG-TXRX-PP'))),
        'ROADM-A1-DEG1': ('DEGREE', None, (('DEG1-TTP-TXRX', 'DEGREE-TXRX-TTP'),
                                           ('DEG1-CTP-TXRX', 'DEGREE-TXRX-CTP'))),
        'ROADM-A1-DEG2': ('DEGREE', None, (('DEG2-TTP-TXRX', 'DEGREE-TXRX-TTP'),
                                           ('DEG2-CTP-TXRX', 'DEGREE-TXRX-CTP'))),
    }
    remaining = ['XPDR-A1-XPDR1'] + list(expected_roadm)
    for node in nodes:
        node_type = node['org-openroadm-common-network:node-type']
        node_id = node['node-id']
        if node_id == 'XPDR-A1-XPDR1':
            self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDR-A1'},
                          node['supporting-node'])
            self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
                          node['supporting-node'])
            self.assertEqual(node_type, 'XPONDER')
            # NOTE(review): the initialisation and increments of these two
            # counters were absent from the reviewed text; they are
            # reconstructed from the branches and the final assertions.
            client = 0
            network = 0
            for tp in node['ietf-network-topology:termination-point']:
                tp_type = tp['org-openroadm-common-network:tp-type']
                tp_id = tp['tp-id']
                if tp_type == 'XPONDER-CLIENT':
                    client += 1
                elif tp_type == 'XPONDER-NETWORK':
                    network += 1
                if tp_id == 'XPDR1-NETWORK2':
                    self.assertEqual(tp[assoc_key], 'XPDR1-CLIENT2')
                if tp_id == 'XPDR1-CLIENT2':
                    self.assertEqual(tp[assoc_key], 'XPDR1-NETWORK2')
            self.assertEqual(client, 2)
            self.assertEqual(network, 2)
        elif node_id in expected_roadm:
            want_type, want_tp_count, want_tps = expected_roadm[node_id]
            self.assertEqual(node_type, want_type)
            tps = node['ietf-network-topology:termination-point']
            if want_tp_count is not None:
                self.assertEqual(len(tps), want_tp_count)
            for tp_id, tp_type in want_tps:
                self.assertIn({'tp-id': tp_id,
                               'org-openroadm-common-network:tp-type': tp_type},
                              tps)
            self.assertIn(roadm_ref, node['supporting-node'])
        else:
            # NOTE(review): restores the ``else:`` fallback that was absent
            # from the reviewed text.
            self.fail("unexpected node {!r}".format(node_id))
        # Guard against the same node being reported twice.
        self.assertIn(node_id, remaining)
        remaining.remove(node_id)
    self.assertEqual(remaining, [])
376 # Connect the tail XPDRA to ROADMA and vice versa
def test_10_connect_tail_xpdr_rdm(self):
    """Create the XPDR-A1 -> ROADM-A1 tail link and expect an 'ok' reply.

    POSTs to the ``init-xpdr-rdm-links`` operation, tying network port 1
    of transponder 1 on XPDR-A1 to SRG1-PP1-TXRX on ROADM-A1.
    """
    url = ("{}/operations/transportpce-networkutils:init-xpdr-rdm-links"
           .format(self.restconf_baseurl))
    # NOTE(review): the three closing braces of this literal were absent
    # from the reviewed text; they are implied by the three opened dicts.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDR-A1",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "1",
                "networkutils:rdm-node": "ROADM-A1",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP1-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_11_connect_tail_rdm_xpdr(self):
    """Create the ROADM-A1 -> XPDR-A1 tail link and expect an 'ok' reply.

    POSTs to the ``init-rdm-xpdr-links`` operation with the same endpoints
    as the XPDR -> ROADM direction: network port 1 of transponder 1 on
    XPDR-A1 and SRG1-PP1-TXRX on ROADM-A1.
    """
    url = ("{}/operations/transportpce-networkutils:init-rdm-xpdr-links"
           .format(self.restconf_baseurl))
    # NOTE(review): the three closing braces of this literal were absent
    # from the reviewed text; they are implied by the three opened dicts.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDR-A1",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "1",
                "networkutils:rdm-node": "ROADM-A1",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP1-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_12_getLinks_OpenRoadmTopology(self):
    """Check that openroadm-topology holds exactly twelve links.

    These are the ten internal ROADM-A1 links plus the XPONDER-INPUT and
    XPONDER-OUTPUT tail links between XPDR-A1 and ROADM-A1. Every returned
    link must have a known link type and an id listed for that type; each
    expected id must be seen exactly once.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 12)
    # Expected link ids per link type; ids are removed as they are matched
    # so that duplicates and leftovers are both detected.
    expected = {
        'EXPRESS-LINK': [
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
        ],
        'ADD-LINK': [
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
        ],
        'DROP-LINK': [
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
        ],
        'XPONDER-INPUT': [
            'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1',
        ],
        'XPONDER-OUTPUT': [
            'XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
        ],
    }
    for link in links:
        link_type = link['org-openroadm-common-network:link-type']
        link_id = link['link-id']
        if link_type not in expected:
            # NOTE(review): restores the ``else:`` fallback that was absent
            # from the reviewed text.
            self.fail("unexpected link type {!r} for link {!r}"
                      .format(link_type, link_id))
        self.assertIn(link_id, expected[link_type])
        expected[link_type].remove(link_id)
    for link_type, missing in expected.items():
        self.assertEqual(missing, [],
                         "missing {} ids: {}".format(link_type, missing))
def test_13_connect_ROADMC(self):
    """Register ROADM-C1 as a NETCONF node and expect a 'created' reply.

    PUTs a ``topology-netconf`` node entry that points at the local
    'roadmc' simulator port (taken from ``test_utils.sims``).
    """
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADM-C1"
           .format(self.restconf_baseurl))
    # NOTE(review): the line opening this literal was absent from the
    # reviewed text. The {"node": [{ ... }]} wrapper is inferred from the
    # closing "}]}" and from the target URL - confirm against the original.
    data = {"node": [{
        "node-id": "ROADM-C1",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": test_utils.sims['roadmc']['port'],
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
489 def test_14_omsAttributes_ROADMA_ROADMC(self):
# PUTs the OMS-attributes "span" of the ROADM-A1 DEG2 -> ROADM-C1 DEG1 link
# in openroadm-topology and expects the controller to answer 'created'.
490 # Config ROADMA-ROADMC oms-attributes
# The target link id is embedded in the URL path; only the base URL is
# substituted by format().
491 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
492 "link/ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
493 "OMS-attributes/span"
494 .format(self.restconf_baseurl))
# NOTE(review): the statement that opens the ``data`` payload is not present
# in this text, and the "link-concatenation" entry looks truncated (only
# "SRLG-length" is visible and the literal is never closed). The missing
# fields cannot be recovered from here - restore them from the original file.
496 "auto-spanloss": "true",
497 "engineered-spanloss": 12.2,
498 "link-concatenation": [{
501 "SRLG-length": 100000,
# The payload is serialised with json.dumps and sent with basic auth.
503 headers = {'content-type': 'application/json'}
504 response = requests.request(
505 "PUT", url, data=json.dumps(data), headers=headers,
506 auth=('admin', 'admin'))
507 self.assertEqual(response.status_code, requests.codes.created)
509 def test_15_omsAttributes_ROADMC_ROADMA(self):
# PUTs the OMS-attributes "span" of the ROADM-C1 DEG1 -> ROADM-A1 DEG2 link
# in openroadm-topology and expects the controller to answer 'created'.
510 # Config ROADM-C1-ROADM-A1 oms-attributes
# The target link id is embedded in the URL path; only the base URL is
# substituted by format().
511 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
512 "link/ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
513 "OMS-attributes/span"
514 .format(self.restconf_baseurl))
# NOTE(review): the statement that opens the ``data`` payload is not present
# in this text, and the "link-concatenation" entry looks truncated (only
# "SRLG-length" is visible and the literal is never closed). The missing
# fields cannot be recovered from here - restore them from the original file.
516 "auto-spanloss": "true",
517 "engineered-spanloss": 12.2,
518 "link-concatenation": [{
521 "SRLG-length": 100000,
# The payload is serialised with json.dumps and sent with basic auth.
524 headers = {'content-type': 'application/json'}
525 response = requests.request(
526 "PUT", url, data=json.dumps(data), headers=headers,
527 auth=('admin', 'admin'))
528 self.assertEqual(response.status_code, requests.codes.created)
def test_16_getClliNetwork(self):
    """Check that clli-network holds NodeA and NodeC, each exactly once.

    Each node's ``clli`` attribute must equal its node id.
    """
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    remaining = ['NodeA', 'NodeC']
    for node in nodes:
        node_id = node['node-id']
        self.assertIn(node_id, remaining)
        # Once the id is known to be NodeA or NodeC, "clli == node id" is
        # equivalent to the original per-id branches.
        # NOTE(review): the reviewed text had lost the ``else:`` between
        # those two branches.
        self.assertEqual(node['org-openroadm-clli-network:clli'], node_id)
        remaining.remove(node_id)
    self.assertEqual(remaining, [])
def test_17_getOpenRoadmNetwork(self):
    """Check that openroadm-network holds XPDR-A1, ROADM-A1 and ROADM-C1.

    Each node must be supported by the expected clli-network node, carry
    the expected node type and report model 'model2'; each must appear
    exactly once.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 3)
    # node-id -> (supporting clli node, node type)
    expected = {
        'XPDR-A1': ('NodeA', 'XPONDER'),
        'ROADM-A1': ('NodeA', 'ROADM'),
        'ROADM-C1': ('NodeC', 'ROADM'),
    }
    remaining = list(expected)
    for node in nodes:
        supporting = node['supporting-node'][0]
        self.assertEqual(supporting['network-ref'], 'clli-network')
        node_id = node['node-id']
        if node_id not in expected:
            # NOTE(review): restores the ``else:`` fallback that was absent
            # from the reviewed text.
            self.fail("unexpected node {!r}".format(node_id))
        clli_node, node_type = expected[node_id]
        self.assertEqual(supporting['node-ref'], clli_node)
        self.assertEqual(node['org-openroadm-common-network:node-type'], node_type)
        self.assertEqual(node['org-openroadm-network:model'], 'model2')
        # Guard against the same node being reported twice.
        self.assertIn(node_id, remaining)
        remaining.remove(node_id)
    self.assertEqual(remaining, [])
def test_18_getROADMLinkOpenRoadmTopology(self):
    """Check that openroadm-topology holds exactly twenty links.

    These are the internal links of ROADM-A1 and ROADM-C1, the two
    ROADM-TO-ROADM links between them, and the two XPDR-A1 tail links.
    Every returned link must have a known link type and an id listed for
    that type; each expected id must be seen exactly once.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 20)
    # Expected link ids per link type; ids are removed as they are matched
    # so that duplicates and leftovers are both detected.
    expected = {
        'EXPRESS-LINK': [
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-C1-DEG2-DEG2-CTP-TXRXtoROADM-C1-DEG1-DEG1-CTP-TXRX',
            'ROADM-C1-DEG1-DEG1-CTP-TXRXtoROADM-C1-DEG2-DEG2-CTP-TXRX',
        ],
        'ADD-LINK': [
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-C1-SRG1-SRG1-CP-TXRXtoROADM-C1-DEG2-DEG2-CTP-TXRX',
            'ROADM-C1-SRG1-SRG1-CP-TXRXtoROADM-C1-DEG1-DEG1-CTP-TXRX',
        ],
        'DROP-LINK': [
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
            'ROADM-C1-DEG1-DEG1-CTP-TXRXtoROADM-C1-SRG1-SRG1-CP-TXRX',
            'ROADM-C1-DEG2-DEG2-CTP-TXRXtoROADM-C1-SRG1-SRG1-CP-TXRX',
        ],
        'ROADM-TO-ROADM': [
            'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
            'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
        ],
        'XPONDER-INPUT': [
            'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1',
        ],
        'XPONDER-OUTPUT': [
            'XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
        ],
    }
    for link in links:
        link_type = link['org-openroadm-common-network:link-type']
        link_id = link['link-id']
        if link_type not in expected:
            # NOTE(review): restores the ``else:`` fallback that was absent
            # from the reviewed text.
            self.fail("unexpected link type {!r} for link {!r}"
                      .format(link_type, link_id))
        self.assertIn(link_id, expected[link_type])
        expected[link_type].remove(link_id)
    for link_type, missing in expected.items():
        self.assertEqual(missing, [],
                         "missing {} ids: {}".format(link_type, missing))
def test_19_getLinkOmsAttributesOpenRoadmTopology(self):
    """Check that both ROADM-A1 <-> ROADM-C1 links carry OMS span attributes.

    Among the twenty links of openroadm-topology, each of the two
    ROADM-to-ROADM links must be present and expose a non-null
    "engineered-spanloss" and a non-null "SRLG-length" on its first
    link-concatenation entry.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 20)
    pending = ['ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
               'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX']
    for link in links:
        link_id = link['link-id']
        if link_id not in pending:
            continue
        span = link['org-openroadm-network-topology:OMS-attributes']['span']
        # NOTE(review): the reviewed text asserted a ``find`` flag whose
        # assignments were not visible. Asserting each value directly
        # expresses the same "both are not None" condition without the flag
        # and without the bitwise ``&`` on ``!= None`` comparisons.
        self.assertIsNotNone(span["engineered-spanloss"])
        self.assertIsNotNone(span['link-concatenation'][0]['SRLG-length'])
        pending.remove(link_id)
    self.assertEqual(pending, [])
def test_20_getNodes_OpenRoadmTopology(self):
    # Check that openroadm-topology holds exactly the 8 expected nodes and
    # that each one has the expected node-type, supporting node and
    # termination points.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding so that an error reply is reported
    # as a status mismatch rather than as a JSON/KeyError failure.
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 8)

    tp_key = 'ietf-network-topology:termination-point'
    tp_type_key = 'org-openroadm-common-network:tp-type'
    # node-id -> (supporting node, node-type, exact TP count or None,
    #             (tp-id, tp-type) pairs that must be present)
    expected_roadm_nodes = {
        'ROADM-A1-SRG1': ('ROADM-A1', 'SRG', 5,
                          [('SRG1-CP-TXRX', 'SRG-TXRX-CP'), ('SRG1-PP1-TXRX', 'SRG-TXRX-PP')]),
        'ROADM-A1-SRG3': ('ROADM-A1', 'SRG', 5,
                          [('SRG3-CP-TXRX', 'SRG-TXRX-CP'), ('SRG3-PP1-TXRX', 'SRG-TXRX-PP')]),
        'ROADM-A1-DEG1': ('ROADM-A1', 'DEGREE', None,
                          [('DEG1-TTP-TXRX', 'DEGREE-TXRX-TTP'), ('DEG1-CTP-TXRX', 'DEGREE-TXRX-CTP')]),
        'ROADM-A1-DEG2': ('ROADM-A1', 'DEGREE', None,
                          [('DEG2-TTP-TXRX', 'DEGREE-TXRX-TTP'), ('DEG2-CTP-TXRX', 'DEGREE-TXRX-CTP')]),
        'ROADM-C1-SRG1': ('ROADM-C1', 'SRG', 5,
                          [('SRG1-CP-TXRX', 'SRG-TXRX-CP'), ('SRG1-PP1-TXRX', 'SRG-TXRX-PP')]),
        'ROADM-C1-DEG1': ('ROADM-C1', 'DEGREE', None,
                          [('DEG1-TTP-TXRX', 'DEGREE-TXRX-TTP'), ('DEG1-CTP-TXRX', 'DEGREE-TXRX-CTP')]),
        'ROADM-C1-DEG2': ('ROADM-C1', 'DEGREE', None,
                          [('DEG2-TTP-TXRX', 'DEGREE-TXRX-TTP'), ('DEG2-CTP-TXRX', 'DEGREE-TXRX-CTP')]),
    }
    # Every expected node-id must be seen exactly once.
    remaining = ['XPDR-A1-XPDR1'] + list(expected_roadm_nodes)

    for node in nodes:
        node_id = node['node-id']
        node_type = node['org-openroadm-common-network:node-type']
        if node_id == 'XPDR-A1-XPDR1':
            # ************************Tests related to XPDRA nodes
            self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDR-A1'},
                          node['supporting-node'])
            self.assertEqual(node_type, 'XPONDER')
            tp_types = [tp[tp_type_key] for tp in node[tp_key]]
            self.assertGreaterEqual(len(tp_types), 4)
            # Count client/network TPs directly instead of relying on
            # hand-maintained counters.
            self.assertEqual(tp_types.count('XPONDER-CLIENT'), 2)
            self.assertEqual(tp_types.count('XPONDER-NETWORK'), 2)
        elif node_id in expected_roadm_nodes:
            supporting, expected_type, tp_count, expected_tps = expected_roadm_nodes[node_id]
            tps = node[tp_key]
            if tp_count is not None:
                self.assertEqual(len(tps), tp_count)
            for tp_id, tp_type in expected_tps:
                self.assertIn({'tp-id': tp_id, tp_type_key: tp_type}, tps)
            self.assertIn({'network-ref': 'openroadm-network', 'node-ref': supporting},
                          node['supporting-node'])
            self.assertEqual(node_type, expected_type)
        else:
            # Any node outside the expected set is an error.
            self.fail("unexpected node in openroadm-topology: {}".format(node_id))
        remaining.remove(node_id)
    self.assertEqual(len(remaining), 0)
def test_21_connect_ROADMB(self):
    # Mount the 'roadmb' simulator as NETCONF node ROADM-B1 in
    # topology-netconf and expect the resource to be created.
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADM-B1"
           .format(self.restconf_baseurl))
    # The top-level key is the target resource of the URL ("node"), holding a
    # single-entry list.
    data = {"node": [{
        "node-id": "ROADM-B1",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": test_utils.sims['roadmb']['port'],
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
    # NOTE(review): the listing seems to drop one short statement right after
    # this assertion - confirm nothing (e.g. a settle delay) was lost.
def test_22_omsAttributes_ROADMA_ROADMB(self):
    # Config ROADM-A1-ROADM-B1 oms-attributes: PUT the span container of the
    # ROADM-A1 DEG1 -> ROADM-B1 DEG1 link and expect it to be created.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # NOTE(review): in the link-concatenation entry only "SRLG-length" is
    # certain; "SRLG-Id", "fiber-type" and "pmd" are reconstructed - confirm
    # the values against the span model before merging.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "spanloss-current": 12,
        "spanloss-base": 11.4,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_23_omsAttributes_ROADMB_ROADMA(self):
    # Config ROADM-B1-ROADM-A1 oms-attributes: PUT the span container of the
    # ROADM-B1 DEG1 -> ROADM-A1 DEG1 link and expect it to be created.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # NOTE(review): in the link-concatenation entry only "SRLG-length" is
    # certain; "SRLG-Id", "fiber-type" and "pmd" are reconstructed - confirm
    # the values against the span model before merging.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "spanloss-current": 12,
        "spanloss-base": 11.4,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_24_omsAttributes_ROADMB_ROADMC(self):
    # Config ROADM-B1-ROADM-C1 oms-attributes: PUT the span container of the
    # ROADM-B1 DEG2 -> ROADM-C1 DEG2 link and expect it to be created.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # NOTE(review): in the link-concatenation entry only "SRLG-length" is
    # certain; "SRLG-Id", "fiber-type" and "pmd" are reconstructed - confirm
    # the values against the span model before merging.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "spanloss-current": 12,
        "spanloss-base": 11.4,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_25_omsAttributes_ROADMC_ROADMB(self):
    # Config ROADM-C1-ROADM-B1 oms-attributes: PUT the span container of the
    # ROADM-C1 DEG2 -> ROADM-B1 DEG2 link and expect it to be created.
    # This payload carries no spanloss-current / spanloss-base leaves.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # NOTE(review): in the link-concatenation entry only "SRLG-length" is
    # certain; "SRLG-Id", "fiber-type" and "pmd" are reconstructed - confirm
    # the values against the span model before merging.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_26_getClliNetwork(self):
    # clli-network must contain NodeA, NodeB and NodeC, each exactly once and
    # each with a clli equal to its node-id.
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    remaining = ['NodeA', 'NodeB', 'NodeC']
    for node in res['network'][0]['node']:
        node_id = node['node-id']
        # assertIn reports the offending id, unlike comparing a boolean flag.
        self.assertIn(node_id, remaining)
        # For all three expected nodes the clli equals the node-id, so one
        # comparison replaces the per-node branches.
        self.assertEqual(node['org-openroadm-clli-network:clli'], node_id)
        remaining.remove(node_id)
    self.assertEqual(len(remaining), 0)
def test_27_verifyDegree(self):
    # Every ROADM-TO-ROADM link in openroadm-topology must be one of the six
    # expected A<->C, A<->B and B<->C links, and all six must be present.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Tests related to links
    remaining = ['ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
                 'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
                 'ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX',
                 'ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX',
                 'ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX',
                 'ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX']
    for link in res['network'][0]['ietf-network-topology:link']:
        if link['org-openroadm-common-network:link-type'] == 'ROADM-TO-ROADM':
            link_id = link['link-id']
            # assertIn names the unexpected link on failure.
            self.assertIn(link_id, remaining)
            remaining.remove(link_id)
    self.assertEqual(len(remaining), 0)
def test_28_verifyOppositeLinkTopology(self):
    # For each of the 26 links in openroadm-topology, fetch its opposite link
    # and check that it points back, has swapped end nodes and carries the
    # complementary link-type.
    topology_url = ("{}/config/ietf-network:networks/network/openroadm-topology"
                    .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    auth = ('admin', 'admin')
    response = requests.request("GET", topology_url, headers=headers, auth=auth)
    self.assertEqual(response.status_code, requests.codes.ok)
    # Tests related to links
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 26)
    # link-type -> link-type expected on the opposite link
    opposite_type = {
        'ADD-LINK': 'DROP-LINK',
        'DROP-LINK': 'ADD-LINK',
        'EXPRESS-LINK': 'EXPRESS-LINK',
        'ROADM-TO-ROADM': 'ROADM-TO-ROADM',
        'XPONDER-INPUT': 'XPONDER-OUTPUT',
        'XPONDER-OUTPUT': 'XPONDER-INPUT',
    }
    for link in links:
        link_id = link['link-id']
        link_type = link['org-openroadm-common-network:link-type']
        link_src = link['source']['source-node']
        link_dest = link['destination']['dest-node']
        opp_link_id = link['org-openroadm-common-network:opposite-link']
        # Find the opposite link. The id is appended to the already formatted
        # URL so that it can never be interpreted as a format field.
        opp_url = topology_url + "/ietf-network-topology:link/" + opp_link_id
        response_opp = requests.request("GET", opp_url, headers=headers, auth=auth)
        self.assertEqual(response_opp.status_code, requests.codes.ok)
        opp_link = response_opp.json()['ietf-network-topology:link'][0]
        self.assertEqual(opp_link['org-openroadm-common-network:opposite-link'], link_id)
        self.assertEqual(opp_link['source']['source-node'], link_dest)
        self.assertEqual(opp_link['destination']['dest-node'], link_src)
        opp_link_type = opp_link['org-openroadm-common-network:link-type']
        # Link types outside the table are deliberately left unchecked.
        if link_type in opposite_type:
            self.assertEqual(opp_link_type, opposite_type[link_type])
def test_29_getLinkOmsAttributesOpenRoadmTopology(self):
    # Each of the six expected ROADM-to-ROADM links must be present and
    # expose an engineered-spanloss and an SRLG-length in its OMS span.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    remaining = ['ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
                 'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
                 'ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX',
                 'ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX',
                 'ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX',
                 'ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX']
    for link in res['network'][0]['ietf-network-topology:link']:
        link_id = link['link-id']
        if link_id in remaining:
            span = link['org-openroadm-network-topology:OMS-attributes']['span']
            # Assert each value on its own (identity check against None)
            # instead of folding them into a boolean flag with bitwise '&'.
            self.assertIsNotNone(span['engineered-spanloss'])
            self.assertIsNotNone(span['link-concatenation'][0]['SRLG-length'])
            remaining.remove(link_id)
    self.assertEqual(len(remaining), 0)
def test_30_disconnect_ROADMB(self):
    # Remove ROADM-B1 from topology-netconf, then NodeB from clli-network;
    # both deletions must answer 200 OK.
    headers = {'content-type': 'application/json'}
    # Delete in the topology-netconf
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADM-B1"
           .format(self.restconf_baseurl))
    # A DELETE needs no message body, so none is sent.
    response = requests.request(
        "DELETE", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    # Delete in the clli-network
    url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeB"
           .format(self.restconf_baseurl))
    response = requests.request(
        "DELETE", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_31_disconnect_ROADMC(self):
    # Remove ROADM-C1 from topology-netconf, then NodeC from clli-network;
    # both deletions must answer 200 OK.
    headers = {'content-type': 'application/json'}
    # Delete in the topology-netconf
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADM-C1"
           .format(self.restconf_baseurl))
    # A DELETE needs no message body, so none is sent.
    response = requests.request(
        "DELETE", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    # Delete in the clli-network
    url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeC"
           .format(self.restconf_baseurl))
    response = requests.request(
        "DELETE", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
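# NOTE: the commented-out test below is disabled. It uses ROADMA/XPDRA/ROADMC
# node names, unlike the active tests (ROADM-A1, XPDR-A1, ROADM-C1), and its
# test_24 number is also used by an active test.
# def test_24_check_roadm2roadm_links_deletion(self):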
1047 # url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1048 # .format(self.restconf_baseurl))
1049 # headers = {'content-type': 'application/json'}
1050 # response = requests.request(
1051 # "GET", url, headers=headers, auth=('admin', 'admin'))
1052 # self.assertEqual(response.status_code, requests.codes.ok)
1053 # res = response.json()
1054 # #Write the response in the log
1055 # with open('./transportpce_tests/log/response.log', 'a') as outfile1:
1056 # outfile1.write(str(res))
1057 # #Tests related to links
1058 # nbLink=len(res['network'][0]['ietf-network-topology:link'])
1059 # self.assertEqual(nbLink,8)
1060 # expressLink=['ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX','ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX']
1061 # addLink=['ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX','ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX',]
1062 # dropLink=['ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX','ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX']
1063 # XPDR_IN=['ROADMA-SRG1-SRG1-PP1-TXRXtoXPDRA-XPDR1-XPDR1-NETWORK1']
1064 # XPDR_OUT=['XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX']
1065 # for i in range(0,nbLink):
1066 # nodeType=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
1067 # linkId=res['network'][0]['ietf-network-topology:link'][i]['link-id']
1068 # if(nodeType=='EXPRESS-LINK'):
1069 # find= linkId in expressLink
1070 # self.assertEqual(find, True)
1071 # expressLink.remove(linkId)
1072 # elif(nodeType=='ADD-LINK'):
1073 # find= linkId in addLink
1074 # self.assertEqual(find, True)
1075 # addLink.remove(linkId)
1076 # elif(nodeType=='DROP-LINK'):
1077 # find= linkId in dropLink
1078 # self.assertEqual(find, True)
1079 # dropLink.remove(linkId)
1080 # elif(nodeType=='XPONDER-INPUT'):
1081 # find= linkId in XPDR_IN
1082 # self.assertEqual(find, True)
1083 # XPDR_IN.remove(linkId)
1084 # elif(nodeType=='XPONDER-OUTPUT'):
1085 # find= linkId in XPDR_OUT
1086 # self.assertEqual(find, True)
1087 # XPDR_OUT.remove(linkId)
1089 # self.assertFalse(True)
1090 # self.assertEqual(len(expressLink),0)
1091 # self.assertEqual(len(addLink),0)
1092 # self.assertEqual(len(dropLink),0)
1093 # self.assertEqual(len(XPDR_IN),0)
1094 # self.assertEqual(len(XPDR_OUT),0)
1096 # for i in range(0,nbLink):
1097 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'],'ROADM-TO-ROADM')
1098 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX')
1099 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX')
1100 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG2-DEG1-CTP-TXRX')
1101 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG2-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX')
1102 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-DEG2-DEG2-CTP-TXRX')
1103 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG2-DEG2-CTP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX')
def test_32_getNodes_OpenRoadmTopology(self):
    # openroadm-topology must now hold exactly the XPDR-A1 and ROADM-A1
    # derived nodes (5 in total) and no ROADM-C1 derived node.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding so that an error reply is reported
    # as a status mismatch rather than as a JSON/KeyError failure.
    self.assertEqual(response.status_code, requests.codes.ok)
    # Tests related to nodes
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 5)

    tp_key = 'ietf-network-topology:termination-point'
    tp_type_key = 'org-openroadm-common-network:tp-type'
    # node-id -> (node-type, exact TP count or None,
    #             (tp-id, tp-type) pairs that must be present)
    expected_roadm_nodes = {
        'ROADM-A1-SRG1': ('SRG', 5,
                          [('SRG1-CP-TXRX', 'SRG-TXRX-CP'), ('SRG1-PP1-TXRX', 'SRG-TXRX-PP')]),
        'ROADM-A1-SRG3': ('SRG', 5,
                          [('SRG3-CP-TXRX', 'SRG-TXRX-CP'), ('SRG3-PP1-TXRX', 'SRG-TXRX-PP')]),
        'ROADM-A1-DEG1': ('DEGREE', None,
                          [('DEG1-TTP-TXRX', 'DEGREE-TXRX-TTP'), ('DEG1-CTP-TXRX', 'DEGREE-TXRX-CTP')]),
        'ROADM-A1-DEG2': ('DEGREE', None,
                          [('DEG2-TTP-TXRX', 'DEGREE-TXRX-TTP'), ('DEG2-CTP-TXRX', 'DEGREE-TXRX-CTP')]),
    }
    # Every expected node-id must be seen exactly once.
    remaining = ['XPDR-A1-XPDR1'] + list(expected_roadm_nodes)

    for node in nodes:
        node_id = node['node-id']
        node_type = node['org-openroadm-common-network:node-type']
        if node_id == 'XPDR-A1-XPDR1':
            # Tests related to XPDRA nodes
            for tp in node[tp_key]:
                tp_id = tp['tp-id']
                if tp_id == 'XPDR1-CLIENT1':
                    self.assertEqual(tp[tp_type_key], 'XPONDER-CLIENT')
                if tp_id == 'XPDR1-NETWORK1':
                    self.assertEqual(tp[tp_type_key], 'XPONDER-NETWORK')
                    self.assertEqual(
                        tp['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'],
                        'ROADM-A1-SRG1--SRG1-PP1-TXRX')
            self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDR-A1'},
                          node['supporting-node'])
        elif node_id in expected_roadm_nodes:
            expected_type, tp_count, expected_tps = expected_roadm_nodes[node_id]
            tps = node[tp_key]
            self.assertEqual(node_type, expected_type)
            if tp_count is not None:
                self.assertEqual(len(tps), tp_count)
            for tp_id, tp_type in expected_tps:
                self.assertIn({'tp-id': tp_id, tp_type_key: tp_type}, tps)
            self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
                          node['supporting-node'])
        else:
            # Any node outside the expected set is an error.
            self.fail("unexpected node in openroadm-topology: {}".format(node_id))
        remaining.remove(node_id)
    self.assertEqual(len(remaining), 0)
    # No ROADM-C1 derived node may be left in the topology.
    for node in nodes:
        self.assertNotIn(node['node-id'],
                         ('ROADM-C1-SRG1', 'ROADM-C1-DEG1', 'ROADM-C1-DEG2'))
def test_33_getOpenRoadmNetwork(self):
    # openroadm-network must hold exactly 2 nodes, neither of which is
    # ROADM-C1 or ROADM-B1.
    url = ("{}/config/ietf-network:networks/network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 2)
    # Iterate over every node: a range ending at len - 1 would leave the last
    # node unchecked.
    for node in nodes:
        self.assertNotEqual(node['node-id'], 'ROADM-C1')
        self.assertNotEqual(node['node-id'], 'ROADM-B1')
def test_34_getClliNetwork(self):
    # clli-network must hold exactly 1 node, and it must not be NodeC.
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 1)
    # Check each returned node itself; with a single node a range ending at
    # len - 1 is empty and a fixed index of 1 would be out of range.
    for node in nodes:
        self.assertNotEqual(node['org-openroadm-clli-network:clli'], 'NodeC')
def test_35_disconnect_XPDRA(self):
    # Remove XPDR-A1 from topology-netconf; the deletion must answer 200 OK.
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/XPDR-A1"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    # A DELETE needs no message body, so none is sent.
    response = requests.request(
        "DELETE", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_36_getClliNetwork(self):
    # clli-network must contain a single node whose clli is NodeA.
    clli_url = "{}/config/ietf-network:networks/network/clli-network".format(
        self.restconf_baseurl)
    response = requests.request(
        "GET", clli_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    clli_nodes = response.json()['network'][0]['node']
    self.assertEqual(len(clli_nodes), 1)
    self.assertEqual(clli_nodes[0]['org-openroadm-clli-network:clli'], 'NodeA')
def test_37_getOpenRoadmNetwork(self):
    # openroadm-network must contain a single node, and it is not XPDR-A1.
    network_url = "{}/config/ietf-network:networks/network/openroadm-network".format(
        self.restconf_baseurl)
    response = requests.request(
        "GET", network_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    network_nodes = response.json()['network'][0]['node']
    self.assertEqual(len(network_nodes), 1)
    for network_node in network_nodes:
        self.assertNotEqual(network_node['node-id'], 'XPDR-A1')
def test_38_getNodes_OpenRoadmTopology(self):
    # openroadm-topology must now hold exactly the 4 ROADM-A1 derived nodes,
    # each supported by ROADM-A1 and with the expected type and termination
    # points.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding so that an error reply is reported
    # as a status mismatch rather than as a JSON/KeyError failure.
    self.assertEqual(response.status_code, requests.codes.ok)
    # Tests related to nodes
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 4)

    tp_key = 'ietf-network-topology:termination-point'
    tp_type_key = 'org-openroadm-common-network:tp-type'
    # node-id -> (node-type, exact TP count or None,
    #             (tp-id, tp-type) pairs that must be present)
    expected_nodes = {
        'ROADM-A1-SRG1': ('SRG', 5,
                          [('SRG1-CP-TXRX', 'SRG-TXRX-CP'), ('SRG1-PP1-TXRX', 'SRG-TXRX-PP')]),
        'ROADM-A1-SRG3': ('SRG', 5,
                          [('SRG3-CP-TXRX', 'SRG-TXRX-CP'), ('SRG3-PP1-TXRX', 'SRG-TXRX-PP')]),
        'ROADM-A1-DEG1': ('DEGREE', None,
                          [('DEG1-TTP-TXRX', 'DEGREE-TXRX-TTP'), ('DEG1-CTP-TXRX', 'DEGREE-TXRX-CTP')]),
        'ROADM-A1-DEG2': ('DEGREE', None,
                          [('DEG2-TTP-TXRX', 'DEGREE-TXRX-TTP'), ('DEG2-CTP-TXRX', 'DEGREE-TXRX-CTP')]),
    }
    # Every expected node-id must be seen exactly once.
    remaining = list(expected_nodes)

    for node in nodes:
        self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
                      node['supporting-node'])
        node_type = node['org-openroadm-common-network:node-type']
        node_id = node['node-id']
        if node_id not in expected_nodes:
            # Any node outside the expected set is an error.
            self.fail("unexpected node in openroadm-topology: {}".format(node_id))
        expected_type, tp_count, expected_tps = expected_nodes[node_id]
        tps = node[tp_key]
        self.assertEqual(node_type, expected_type)
        if tp_count is not None:
            self.assertEqual(len(tps), tp_count)
        for tp_id, tp_type in expected_tps:
            self.assertIn({'tp-id': tp_id, tp_type_key: tp_type}, tps)
        remaining.remove(node_id)
    self.assertEqual(len(remaining), 0)
def test_39_disconnect_ROADM_XPDRA_link(self):
    # Delete both directions of the XPDR-A1 NETWORK1 <-> ROADM-A1 SRG1-PP1
    # link from openroadm-topology; each deletion must answer 200 OK.
    headers = {'content-type': 'application/json'}
    link_ids = ('XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
                'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1')
    for link_id in link_ids:
        url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
               "link/{}"
               .format(self.restconf_baseurl, link_id))
        # A DELETE needs no message body, so none is sent.
        response = requests.request(
            "DELETE", url, headers=headers, auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
def test_40_getLinks_OpenRoadmTopology(self):
    """Check the link inventory returned for openroadm-topology.

    Expects exactly 16 links: every EXPRESS-LINK, ADD-LINK and DROP-LINK
    must match one of the expected ids below (each id seen once), the
    6 remaining links may be of any other type, and no link may be of
    type XPONDER-OUTPUT or XPONDER-INPUT.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 16)

    express_links = ['ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
                     'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX']
    add_links = ['ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
                 'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
                 'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
                 'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX']
    drop_links = ['ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
                  'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
                  'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
                  'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX']
    # Link type -> ids still waiting to be seen for that type.
    pending_by_type = {
        'EXPRESS-LINK': express_links,
        'ADD-LINK': add_links,
        'DROP-LINK': drop_links,
    }

    roadm_to_roadm_count = 0
    for link in links:
        link_type = link['org-openroadm-common-network:link-type']
        pending = pending_by_type.get(link_type)
        if pending is None:
            # Any type other than express/add/drop is only counted.
            roadm_to_roadm_count += 1
            continue
        link_id = link['link-id']
        # assertIn reports the offending id instead of "False != True".
        self.assertIn(link_id, pending)
        # Removing the id makes a duplicate occurrence fail the check above.
        pending.remove(link_id)

    self.assertEqual(len(express_links), 0)
    self.assertEqual(len(add_links), 0)
    self.assertEqual(len(drop_links), 0)
    self.assertEqual(roadm_to_roadm_count, 6)

    # The counter above accepts any other type, so transponder links are
    # ruled out explicitly here.
    for link in links:
        self.assertNotIn(link['org-openroadm-common-network:link-type'],
                         ('XPONDER-OUTPUT', 'XPONDER-INPUT'))
def test_41_disconnect_ROADMA(self):
    """Delete ROADM-A1 from topology-netconf, then NodeA from clli-network.

    Two RESTCONF DELETE requests are sent in that order and each response
    must be HTTP 200.
    """
    # The request serialises ``data`` as its body, so the name has to be
    # bound inside this method; an empty JSON object is sent.
    data = {}
    headers = {'content-type': 'application/json'}
    url_templates = (
        # topology-netconf node entry for ROADM-A1
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/ROADM-A1",
        # Delete in the clli-network
        "{}/config/ietf-network:networks/network/clli-network/node/NodeA",
    )
    for template in url_templates:
        url = template.format(self.restconf_baseurl)
        response = requests.request(
            "DELETE", url, data=json.dumps(data), headers=headers,
            auth=('admin', 'admin'))
        # Stop at the first failing deletion, exactly as the sequential
        # version did.
        self.assertEqual(response.status_code, requests.codes.ok)
def test_42_getClliNetwork(self):
    """GET clli-network and assert that it contains no 'node' key."""
    clli_url = "{}/config/ietf-network:networks/network/clli-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET",
        clli_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first entry of the returned 'network' list is inspected.
    network = reply.json()['network'][0]
    self.assertNotIn('node', network)
def test_43_getOpenRoadmNetwork(self):
    """GET openroadm-network and assert that it contains no 'node' key."""
    network_url = "{}/config/ietf-network:networks/network/openroadm-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET",
        network_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first entry of the returned 'network' list is inspected.
    network = reply.json()['network'][0]
    self.assertNotIn('node', network)
def test_44_check_roadm2roadm_link_persistence(self):
    """GET openroadm-topology; expect no 'node' key and exactly 6 links."""
    topology_url = "{}/config/ietf-network:networks/network/openroadm-topology".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET",
        topology_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    topology = reply.json()['network'][0]
    # The link list is read before the node check, so a missing link list
    # surfaces as an error first.
    link_count = len(topology['ietf-network-topology:link'])
    self.assertNotIn('node', topology)
    self.assertEqual(link_count, 6)
# Script entry point: run every test in this module, reporting each test
# individually (verbosity level 2).
if __name__ == "__main__":
    unittest.main(verbosity=2)