3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
class TransportPCETopologyTesting(unittest.TestCase):
    """Functional tests that query the controller over RESTCONF.

    The tests register devices and then read back the clli-network,
    openroadm-network and openroadm-topology resources to check their
    nodes and links.
    """

    # Handles of the four honeynode processes. They are assigned on the
    # class when the simulators are started and read again at shutdown.
    honeynode_process1 = None
    honeynode_process2 = None
    honeynode_process3 = None
    honeynode_process4 = None
    # Prefix used to build the URL of every RESTCONF request in the tests.
    restconf_baseurl = "http://localhost:8181/restconf"
# START_IGNORE_XTESTING
@classmethod
def setUpClass(cls):
    """Start the four honeynode simulators, then OpenDaylight.

    Each process handle returned by test_utils is stored on the class so
    that it can be stopped once the tests have run.
    """
    # NOTE(review): the method header was missing from the listing and has
    # been restored; the name is inferred from the cls usage -- confirm.
    # NOTE(review): no pause between launches is visible here; if the
    # test_utils starters do not block until a process is ready, a wait
    # may be required -- confirm against test_utils.
    launchers = (
        ("honeynode_process1", test_utils.start_xpdra_honeynode),
        ("honeynode_process2", test_utils.start_roadma_honeynode),
        ("honeynode_process3", test_utils.start_roadmb_honeynode),
        ("honeynode_process4", test_utils.start_roadmc_honeynode),
    )
    for index, (attribute, launch) in enumerate(launchers, start=1):
        print("starting honeynode{}...".format(index))
        setattr(cls, attribute, launch())
    print("all honeynodes started")

    print("starting opendaylight...")
    cls.odl_process = test_utils.start_tpce()
    print("opendaylight started")
@classmethod
def tearDownClass(cls):
    """Stop OpenDaylight, then each honeynode, with SIGINT.

    For every process, SIGINT is first sent to its direct children and
    then to the process itself; the call then blocks until that process
    has exited before moving on to the next one.
    """
    processes = (
        cls.odl_process,
        cls.honeynode_process1,
        cls.honeynode_process2,
        cls.honeynode_process3,
        cls.honeynode_process4,
    )
    for process in processes:
        for child in psutil.Process(process.pid).children():
            child.send_signal(signal.SIGINT)
        process.send_signal(signal.SIGINT)
        process.wait()
def test_01_connect_ROADMA(self):
    """PUT the NETCONF connection settings of ROADMA01; expect 201."""
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADMA01"
           .format(self.restconf_baseurl))
    # NOTE(review): the opening of this literal was missing from the
    # listing. The "node" list wrapper is inferred from the closing
    # brackets and from the resource targeted by the URL -- confirm.
    data = {"node": [{
        "node-id": "ROADMA01",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": "17831",
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_02_getClliNetwork(self):
    """GET clli-network and check that its first node is NodeA."""
    endpoint = "{}/config/ietf-network:networks/network/clli-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first node of the first returned network is inspected.
    first_node = reply.json()['network'][0]['node'][0]
    self.assertEqual(first_node['node-id'], 'NodeA')
    self.assertEqual(first_node['org-openroadm-clli-network:clli'], 'NodeA')
def test_03_getOpenRoadmNetwork(self):
    """GET openroadm-network and check its first node, ROADMA01."""
    endpoint = "{}/config/ietf-network:networks/network/openroadm-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first node of the first returned network is inspected.
    first_node = reply.json()['network'][0]['node'][0]
    self.assertEqual(first_node['node-id'], 'ROADMA01')
    first_support = first_node['supporting-node'][0]
    self.assertEqual(first_support['network-ref'], 'clli-network')
    self.assertEqual(first_support['node-ref'], 'NodeA')
    self.assertEqual(
        first_node['org-openroadm-common-network:node-type'], 'ROADM')
    self.assertEqual(first_node['org-openroadm-network:model'], '2')
def test_04_getLinks_OpenroadmTopology(self):
    """Check the ten ROADMA01 links listed in openroadm-topology.

    Every returned link must carry one of the expected link types and an
    id listed for that type, and no expected id may be left unreturned.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 10)
    # Expected link ids grouped by link type. An id is removed once it has
    # been seen, so duplicates and omissions are both detected.
    expected = {
        'EXPRESS-LINK': [
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX'],
        'ADD-LINK': [
            'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
            'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
            'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
            'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX'],
        'DROP-LINK': [
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX'],
    }
    for link in links:
        link_id = link['link-id']
        link_type = link['org-openroadm-common-network:link-type']
        if link_type not in expected:
            self.fail("unexpected link type {!r} for link {!r}"
                      .format(link_type, link_id))
        self.assertIn(link_id, expected[link_type])
        expected[link_type].remove(link_id)
    for link_type, not_returned in expected.items():
        self.assertEqual(
            not_returned, [],
            "expected {} ids were not returned".format(link_type))
def test_05_getNodes_OpenRoadmTopology(self):
    """Check the four ROADMA01 nodes listed in openroadm-topology.

    Each node must be one of the expected SRG/DEGREE nodes, have the
    expected node type and termination points, and be supported by both
    openroadm-network/ROADMA01 and clli-network/NodeA.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding, so that an error reply is reported
    # as a status mismatch rather than as a decoding problem.
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 4)

    tp_key = 'ietf-network-topology:termination-point'
    type_key = 'org-openroadm-common-network:tp-type'
    # node-id -> (node type, termination-point count or None when the
    # count is not checked, termination points that must be present).
    expected = {
        'ROADMA01-SRG1': ('SRG', 17, [
            {'tp-id': 'SRG1-CP-TXRX', type_key: 'SRG-TXRX-CP'},
            {'tp-id': 'SRG1-PP1-TXRX', type_key: 'SRG-TXRX-PP'}]),
        'ROADMA01-SRG3': ('SRG', 17, [
            {'tp-id': 'SRG3-CP-TXRX', type_key: 'SRG-TXRX-CP'},
            {'tp-id': 'SRG3-PP1-TXRX', type_key: 'SRG-TXRX-PP'}]),
        'ROADMA01-DEG1': ('DEGREE', None, [
            {'tp-id': 'DEG1-TTP-TXRX', type_key: 'DEGREE-TXRX-TTP'},
            {'tp-id': 'DEG1-CTP-TXRX', type_key: 'DEGREE-TXRX-CTP'}]),
        'ROADMA01-DEG2': ('DEGREE', None, [
            {'tp-id': 'DEG2-TTP-TXRX', type_key: 'DEGREE-TXRX-TTP'},
            {'tp-id': 'DEG2-CTP-TXRX', type_key: 'DEGREE-TXRX-CTP'}]),
    }
    required_supports = (
        {'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
        {'network-ref': 'clli-network', 'node-ref': 'NodeA'},
    )
    # Ids still to be seen; an id is removed once its node is checked.
    remaining = list(expected)
    for node in nodes:
        node_id = node['node-id']
        if node_id not in expected:
            self.fail("unexpected node {!r}".format(node_id))
        node_type, tp_count, required_tps = expected[node_id]
        self.assertEqual(
            node['org-openroadm-common-network:node-type'], node_type)
        if tp_count is not None:
            self.assertEqual(len(node[tp_key]), tp_count)
        for termination_point in required_tps:
            self.assertIn(termination_point, node[tp_key])
        for support in required_supports:
            self.assertIn(support, node['supporting-node'])
        self.assertIn(node_id, remaining)
        remaining.remove(node_id)
    self.assertEqual(remaining, [])
def test_06_connect_XPDRA(self):
    """PUT the NETCONF connection settings of XPDRA01; expect 201."""
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/XPDRA01"
           .format(self.restconf_baseurl))
    # NOTE(review): the opening of this literal was missing from the
    # listing. The "node" list wrapper is inferred from the closing
    # brackets and from the resource targeted by the URL -- confirm.
    data = {"node": [{
        "node-id": "XPDRA01",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": "17830",
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_07_getClliNetwork(self):
    """GET clli-network again and check that its first node is NodeA."""
    target = "{}/config/ietf-network:networks/network/clli-network".format(
        self.restconf_baseurl)
    json_header = {'content-type': 'application/json'}
    answer = requests.request(
        "GET", target, headers=json_header, auth=('admin', 'admin'))
    self.assertEqual(answer.status_code, requests.codes.ok)
    # Only the first node of the first returned network is inspected.
    node = answer.json()['network'][0]['node'][0]
    self.assertEqual(node['node-id'], 'NodeA')
    self.assertEqual(node['org-openroadm-clli-network:clli'], 'NodeA')
def test_08_getOpenRoadmNetwork(self):
    """Check that openroadm-network holds XPDRA01 and ROADMA01.

    Both nodes must be supported by clli-network/NodeA and carry the
    expected node type and model; any other node id fails the test.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 2)
    # node-id -> (node type, model)
    expected = {
        'XPDRA01': ('XPONDER', '1'),
        'ROADMA01': ('ROADM', '2'),
    }
    for node in nodes:
        first_support = node['supporting-node'][0]
        self.assertEqual(first_support['network-ref'], 'clli-network')
        self.assertEqual(first_support['node-ref'], 'NodeA')
        node_id = node['node-id']
        if node_id not in expected:
            self.fail("unexpected node {!r}".format(node_id))
        node_type, model = expected[node_id]
        self.assertEqual(
            node['org-openroadm-common-network:node-type'], node_type)
        self.assertEqual(node['org-openroadm-network:model'], model)
def test_09_getNodes_OpenRoadmTopology(self):
    """Check the five nodes of openroadm-topology once XPDRA01 is added.

    XPDRA01-XPDR1 must expose four client and two network termination
    points, with XPDR1-NETWORK2 and XPDR1-CLIENT3 referencing each other.
    The four ROADMA01 nodes must have the expected type, termination
    points and supporting node.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding, so that an error reply is reported
    # as a status mismatch rather than as a decoding problem.
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 5)

    tp_key = 'ietf-network-topology:termination-point'
    type_key = 'org-openroadm-common-network:tp-type'
    map_key = 'transportpce-topology:associated-connection-map-port'
    # node-id -> (node type, termination-point count or None when the
    # count is not checked, termination points that must be present).
    roadm_expected = {
        'ROADMA01-SRG1': ('SRG', 17, [
            {'tp-id': 'SRG1-CP-TXRX', type_key: 'SRG-TXRX-CP'},
            {'tp-id': 'SRG1-PP1-TXRX', type_key: 'SRG-TXRX-PP'}]),
        'ROADMA01-SRG3': ('SRG', 17, [
            {'tp-id': 'SRG3-CP-TXRX', type_key: 'SRG-TXRX-CP'},
            {'tp-id': 'SRG3-PP1-TXRX', type_key: 'SRG-TXRX-PP'}]),
        'ROADMA01-DEG1': ('DEGREE', None, [
            {'tp-id': 'DEG1-TTP-TXRX', type_key: 'DEGREE-TXRX-TTP'},
            {'tp-id': 'DEG1-CTP-TXRX', type_key: 'DEGREE-TXRX-CTP'}]),
        'ROADMA01-DEG2': ('DEGREE', None, [
            {'tp-id': 'DEG2-TTP-TXRX', type_key: 'DEGREE-TXRX-TTP'},
            {'tp-id': 'DEG2-CTP-TXRX', type_key: 'DEGREE-TXRX-CTP'}]),
    }
    # tp-id -> port that its connection-map entry must point to.
    connection_map = {
        'XPDR1-NETWORK2': 'XPDR1-CLIENT3',
        'XPDR1-CLIENT3': 'XPDR1-NETWORK2',
    }
    # Ids still to be seen; an id is removed once its node is checked.
    remaining = ['XPDRA01-XPDR1'] + list(roadm_expected)
    for node in nodes:
        node_type = node['org-openroadm-common-network:node-type']
        node_id = node['node-id']
        if node_id == 'XPDRA01-XPDR1':
            self.assertIn(
                {'network-ref': 'openroadm-network', 'node-ref': 'XPDRA01'},
                node['supporting-node'])
            self.assertIn(
                {'network-ref': 'clli-network', 'node-ref': 'NodeA'},
                node['supporting-node'])
            self.assertEqual(node_type, 'XPONDER')
            # NOTE(review): the counter initialisation and increments were
            # missing from the listing; the counts below are rebuilt from
            # the two final assertions on client and network -- confirm.
            tp_types = []
            for termination_point in node[tp_key]:
                tp_types.append(termination_point[type_key])
                peer = connection_map.get(termination_point['tp-id'])
                if peer is not None:
                    self.assertEqual(termination_point[map_key], peer)
            self.assertEqual(tp_types.count('XPONDER-CLIENT'), 4)
            self.assertEqual(tp_types.count('XPONDER-NETWORK'), 2)
        elif node_id in roadm_expected:
            expected_type, tp_count, required_tps = roadm_expected[node_id]
            self.assertEqual(node_type, expected_type)
            if tp_count is not None:
                self.assertEqual(len(node[tp_key]), tp_count)
            for termination_point in required_tps:
                self.assertIn(termination_point, node[tp_key])
            self.assertIn(
                {'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
                node['supporting-node'])
        else:
            self.fail("unexpected node {!r}".format(node_id))
        self.assertIn(node_id, remaining)
        remaining.remove(node_id)
    self.assertEqual(remaining, [])
382 # Connect the tail XPDRA to ROADMA and vice versa
def test_10_connect_tail_xpdr_rdm(self):
    """POST init-xpdr-rdm-links for XPDRA01 -> ROADMA01; expect 200."""
    # Connect the tail: XPDRA to ROADMA
    url = ("{}/operations/transportpce-networkutils:init-xpdr-rdm-links"
           .format(self.restconf_baseurl))
    # The closing braces of this literal were missing from the listing;
    # three are restored to match the three dicts opened below.
    data = {"networkutils:input": {
        "networkutils:links-input": {
            "networkutils:xpdr-node": "XPDRA01",
            "networkutils:xpdr-num": "1",
            "networkutils:network-num": "1",
            "networkutils:rdm-node": "ROADMA01",
            "networkutils:srg-num": "1",
            "networkutils:termination-point-num": "SRG1-PP1-TXRX"
        }
    }}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_11_connect_tail_rdm_xpdr(self):
    """POST init-rdm-xpdr-links for ROADMA01 -> XPDRA01; expect 200."""
    # Connect the tail: ROADMA to XPDRA
    url = ("{}/operations/transportpce-networkutils:init-rdm-xpdr-links"
           .format(self.restconf_baseurl))
    # The closing braces of this literal were missing from the listing;
    # three are restored to match the three dicts opened below.
    data = {"networkutils:input": {
        "networkutils:links-input": {
            "networkutils:xpdr-node": "XPDRA01",
            "networkutils:xpdr-num": "1",
            "networkutils:network-num": "1",
            "networkutils:rdm-node": "ROADMA01",
            "networkutils:srg-num": "1",
            "networkutils:termination-point-num": "SRG1-PP1-TXRX"
        }
    }}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_12_getLinks_OpenRoadmTopology(self):
    """Check the twelve links of openroadm-topology after the tail links.

    On top of the ten ROADMA01 links, one XPONDER-INPUT and one
    XPONDER-OUTPUT link between XPDRA01 and ROADMA01 are expected. Every
    returned link must match an expected id of its type, and no expected
    id may be left unreturned.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 12)
    # Expected link ids grouped by link type. An id is removed once it has
    # been seen, so duplicates and omissions are both detected.
    expected = {
        'EXPRESS-LINK': [
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX'],
        'ADD-LINK': [
            'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
            'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
            'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
            'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX'],
        'DROP-LINK': [
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX'],
        'XPONDER-INPUT': [
            'ROADMA01-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1'],
        'XPONDER-OUTPUT': [
            'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX'],
    }
    for link in links:
        link_type = link['org-openroadm-common-network:link-type']
        link_id = link['link-id']
        if link_type not in expected:
            self.fail("unexpected link type {!r} for link {!r}"
                      .format(link_type, link_id))
        self.assertIn(link_id, expected[link_type])
        expected[link_type].remove(link_id)
    for link_type, not_returned in expected.items():
        self.assertEqual(
            not_returned, [],
            "expected {} ids were not returned".format(link_type))
def test_13_connect_ROADMC(self):
    """PUT the NETCONF connection settings of ROADMC01; expect 201."""
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADMC01"
           .format(self.restconf_baseurl))
    # NOTE(review): the opening of this literal was missing from the
    # listing. The "node" list wrapper is inferred from the closing
    # brackets and from the resource targeted by the URL -- confirm.
    data = {"node": [{
        "node-id": "ROADMC01",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": "17833",
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
495 def test_14_omsAttributes_ROADMA_ROADMC(self):
496 # Config ROADMA01-ROADMC01 oms-attributes
497 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
498 "link/ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
499 "OMS-attributes/span"
500 .format(self.restconf_baseurl))
502 "auto-spanloss": "true",
503 "engineered-spanloss": 12.2,
504 "link-concatenation": [{
507 "SRLG-length": 100000,
509 headers = {'content-type': 'application/json'}
510 response = requests.request(
511 "PUT", url, data=json.dumps(data), headers=headers,
512 auth=('admin', 'admin'))
513 self.assertEqual(response.status_code, requests.codes.created)
515 def test_15_omsAttributes_ROADMC_ROADMA(self):
516 # Config ROADMC01-ROADMA oms-attributes
517 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
518 "link/ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
519 "OMS-attributes/span"
520 .format(self.restconf_baseurl))
522 "auto-spanloss": "true",
523 "engineered-spanloss": 12.2,
524 "link-concatenation": [{
527 "SRLG-length": 100000,
529 headers = {'content-type': 'application/json'}
530 response = requests.request(
531 "PUT", url, data=json.dumps(data), headers=headers,
532 auth=('admin', 'admin'))
533 self.assertEqual(response.status_code, requests.codes.created)
def test_16_getClliNetwork(self):
    """Check that clli-network holds exactly NodeA and NodeC.

    Each node's clli attribute must equal its own node id.
    """
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    # Ids still to be seen; an id is removed once its node is checked, so
    # a duplicate or an unknown id fails the membership assertion.
    remaining = ['NodeA', 'NodeC']
    for node in nodes:
        node_id = node['node-id']
        self.assertIn(node_id, remaining)
        # Past the membership check the id is NodeA or NodeC, and the clli
        # value is expected to match it in both cases.
        self.assertEqual(node['org-openroadm-clli-network:clli'], node_id)
        remaining.remove(node_id)
    self.assertEqual(remaining, [])
def test_17_getOpenRoadmNetwork(self):
    """Check the three nodes of openroadm-network after ROADMC01 is added.

    Each node's first supporting node must be in clli-network and point
    to the expected CLLI node; node type and model are checked as well.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 3)
    # node-id -> (supporting CLLI node, node type, model)
    expected = {
        'XPDRA01': ('NodeA', 'XPONDER', '1'),
        'ROADMA01': ('NodeA', 'ROADM', '2'),
        'ROADMC01': ('NodeC', 'ROADM', '2'),
    }
    # Ids still to be seen; an id is removed once its node is checked.
    remaining = list(expected)
    for node in nodes:
        first_support = node['supporting-node'][0]
        self.assertEqual(first_support['network-ref'], 'clli-network')
        node_id = node['node-id']
        if node_id not in expected:
            self.fail("unexpected node {!r}".format(node_id))
        clli_node, node_type, model = expected[node_id]
        self.assertEqual(first_support['node-ref'], clli_node)
        self.assertEqual(
            node['org-openroadm-common-network:node-type'], node_type)
        self.assertEqual(node['org-openroadm-network:model'], model)
        self.assertIn(node_id, remaining)
        remaining.remove(node_id)
    self.assertEqual(remaining, [])
def test_18_getROADMLinkOpenRoadmTopology(self):
    """Check the twenty links of openroadm-topology with ROADMC01 present.

    Expected are the express/add/drop links of ROADMA01 and ROADMC01, the
    two ROADM-TO-ROADM links between them and the two transponder tail
    links. Every returned link must match an expected id of its type, and
    no expected id may be left unreturned.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 20)
    # Expected link ids grouped by link type. An id is removed once it has
    # been seen, so duplicates and omissions are both detected.
    expected = {
        'EXPRESS-LINK': [
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
            'ROADMC01-DEG2-DEG2-CTP-TXRXtoROADMC01-DEG1-DEG1-CTP-TXRX',
            'ROADMC01-DEG1-DEG1-CTP-TXRXtoROADMC01-DEG2-DEG2-CTP-TXRX'],
        'ADD-LINK': [
            'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
            'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
            'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
            'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
            'ROADMC01-SRG1-SRG1-CP-TXRXtoROADMC01-DEG2-DEG2-CTP-TXRX',
            'ROADMC01-SRG1-SRG1-CP-TXRXtoROADMC01-DEG1-DEG1-CTP-TXRX'],
        'DROP-LINK': [
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
            'ROADMC01-DEG1-DEG1-CTP-TXRXtoROADMC01-SRG1-SRG1-CP-TXRX',
            'ROADMC01-DEG2-DEG2-CTP-TXRXtoROADMC01-SRG1-SRG1-CP-TXRX'],
        'ROADM-TO-ROADM': [
            'ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
            'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX'],
        'XPONDER-INPUT': [
            'ROADMA01-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1'],
        'XPONDER-OUTPUT': [
            'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX'],
    }
    for link in links:
        link_type = link['org-openroadm-common-network:link-type']
        link_id = link['link-id']
        if link_type not in expected:
            self.fail("unexpected link type {!r} for link {!r}"
                      .format(link_type, link_id))
        self.assertIn(link_id, expected[link_type])
        expected[link_type].remove(link_id)
    for link_type, not_returned in expected.items():
        self.assertEqual(
            not_returned, [],
            "expected {} ids were not returned".format(link_type))
def test_19_getLinkOmsAttributesOpenRoadmTopology(self):
    """Check the OMS span attributes of the two ROADMA01/ROADMC01 links.

    Both ROADM-to-ROADM links must be present among the twenty links and
    carry a non-null engineered-spanloss and a non-null SRLG-length in
    their first link-concatenation entry.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 20)
    # Ids still to be seen; an id is removed once its link is checked.
    remaining = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
                 'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX']
    for link in links:
        link_id = link['link-id']
        if link_id not in remaining:
            continue
        span = link['org-openroadm-network-topology:OMS-attributes']['span']
        span_loss = span["engineered-spanloss"]
        length = span['link-concatenation'][0]['SRLG-length']
        # Identity checks against None replace the former "!= None"
        # comparisons that were combined with a bitwise "&".
        self.assertIsNotNone(span_loss)
        self.assertIsNotNone(length)
        remaining.remove(link_id)
    self.assertEqual(remaining, [])
def test_20_getNodes_OpenRoadmTopology(self):
    """Check the node list of openroadm-topology.

    Expects exactly eight nodes (XPDRA01-XPDR1 plus the SRG/DEG nodes of
    ROADMA01 and ROADMC01).  For each one the supporting node, the node
    type and the expected termination points are verified.  An unexpected,
    duplicated or missing node fails the test.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding so that an error reply is reported
    # as a status mismatch rather than as a JSON/KeyError failure.
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 8)
    pending = ['XPDRA01-XPDR1',
               'ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2',
               'ROADMC01-SRG1', 'ROADMC01-DEG1', 'ROADMC01-DEG2']
    tp_type_key = 'org-openroadm-common-network:tp-type'
    for node in nodes:
        node_id = node['node-id']
        node_type = node['org-openroadm-common-network:node-type']
        if node_id not in pending:
            self.fail("unexpected or duplicated node: {}".format(node_id))
        pending.remove(node_id)
        # Node ids are "<device>-<element>", e.g. "ROADMA01-SRG1".
        device, _, element = node_id.partition('-')
        tps = node['ietf-network-topology:termination-point']
        self.assertIn({'network-ref': 'openroadm-network', 'node-ref': device},
                      node['supporting-node'])
        if element == 'XPDR1':
            # Tests related to XPDRA nodes
            self.assertEqual(node_type, 'XPONDER')
            self.assertEqual(len(tps), 6)
            tp_types = [tp[tp_type_key] for tp in tps]
            self.assertEqual(tp_types.count('XPONDER-CLIENT'), 4)
            self.assertEqual(tp_types.count('XPONDER-NETWORK'), 2)
        elif element.startswith('SRG'):
            # Tests related to SRG nodes
            self.assertEqual(node_type, 'SRG')
            self.assertEqual(len(tps), 17)
            self.assertIn({'tp-id': element + '-CP-TXRX', tp_type_key: 'SRG-TXRX-CP'}, tps)
            self.assertIn({'tp-id': element + '-PP1-TXRX', tp_type_key: 'SRG-TXRX-PP'}, tps)
        else:
            # Tests related to DEG nodes
            self.assertEqual(node_type, 'DEGREE')
            self.assertIn({'tp-id': element + '-TTP-TXRX', tp_type_key: 'DEGREE-TXRX-TTP'}, tps)
            self.assertIn({'tp-id': element + '-CTP-TXRX', tp_type_key: 'DEGREE-TXRX-CTP'}, tps)
    # Every expected node must have been seen exactly once.
    self.assertEqual(pending, [])
def test_21_connect_ROADMB(self):
    """Mount ROADMB01 in topology-netconf and expect 201 Created."""
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADMB01"
           .format(self.restconf_baseurl))
    # NOTE(review): the opening of this literal was not visible in the
    # reviewed listing; "node" is the list name assumed here - confirm.
    data = {"node": [{
        "node-id": "ROADMB01",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": "17832",
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
    # NOTE(review): nothing here waits for the mount to complete before the
    # next tests query the topology - confirm whether a delay is required.
def test_22_omsAttributes_ROADMA_ROADMB(self):
    """PUT the OMS span attributes on the ROADMA01 DEG2 -> ROADMB01 DEG1 link.

    Expects RESTCONF to answer 201 Created.
    """
    # Config ROADMA01-ROADMB01 oms-attributes
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADMA01-DEG2-DEG2-TTP-TXRXtoROADMB01-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # NOTE(review): the "SRLG-Id", "fiber-type" and "pmd" entries and the
    # enclosing "span" wrapper were not visible in the reviewed listing -
    # confirm them against the intended payload.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_23_omsAttributes_ROADMB_ROADMA(self):
    """PUT the OMS span attributes on the ROADMB01 DEG1 -> ROADMA01 DEG2 link.

    Expects RESTCONF to answer 201 Created.
    """
    # Config ROADMB01-ROADMA01 oms-attributes
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADMB01-DEG1-DEG1-TTP-TXRXtoROADMA01-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # NOTE(review): the "SRLG-Id", "fiber-type" and "pmd" entries and the
    # enclosing "span" wrapper were not visible in the reviewed listing -
    # confirm them against the intended payload.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_24_omsAttributes_ROADMB_ROADMC(self):
    """PUT the OMS span attributes on the ROADMB01 DEG2 -> ROADMC01 DEG1 link.

    Expects RESTCONF to answer 201 Created.
    """
    # Config ROADMB01-ROADMC01 oms-attributes
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADMB01-DEG2-DEG2-TTP-TXRXtoROADMC01-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # NOTE(review): the "SRLG-Id", "fiber-type" and "pmd" entries and the
    # enclosing "span" wrapper were not visible in the reviewed listing -
    # confirm them against the intended payload.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_25_omsAttributes_ROADMC_ROADMB(self):
    """PUT the OMS span attributes on the ROADMC01 DEG1 -> ROADMB01 DEG2 link.

    Expects RESTCONF to answer 201 Created.
    """
    # Config ROADMC01-ROADMB01 oms-attributes
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADMC01-DEG1-DEG1-TTP-TXRXtoROADMB01-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # NOTE(review): the "SRLG-Id", "fiber-type" and "pmd" entries and the
    # enclosing "span" wrapper were not visible in the reviewed listing -
    # confirm them against the intended payload.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_26_getClliNetwork(self):
    """Check that clli-network holds exactly NodeA, NodeB and NodeC.

    Each node must appear once and its clli leaf must equal its node-id.
    """
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    pending = ['NodeA', 'NodeB', 'NodeC']
    for node in nodes:
        node_id = node['node-id']
        # assertIn names the offending id on failure, unlike comparing a
        # precomputed boolean with True.
        self.assertIn(node_id, pending)
        # Once membership is established, the per-node branches all reduce
        # to "clli equals node-id".
        self.assertEqual(node['org-openroadm-clli-network:clli'], node_id)
        pending.remove(node_id)
    self.assertEqual(pending, [])
def test_27_verifyDegree(self):
    """Check that the ROADM-TO-ROADM links are exactly the six expected ones.

    Every link of type ROADM-TO-ROADM must be in the expected list, and
    every expected link must be present once.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    # Tests related to links
    links = response.json()['network'][0]['ietf-network-topology:link']
    pending = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
               'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX',
               'ROADMA01-DEG2-DEG2-TTP-TXRXtoROADMB01-DEG1-DEG1-TTP-TXRX',
               'ROADMC01-DEG1-DEG1-TTP-TXRXtoROADMB01-DEG2-DEG2-TTP-TXRX',
               'ROADMB01-DEG1-DEG1-TTP-TXRXtoROADMA01-DEG2-DEG2-TTP-TXRX',
               'ROADMB01-DEG2-DEG2-TTP-TXRXtoROADMC01-DEG1-DEG1-TTP-TXRX']
    for link in links:
        if link['org-openroadm-common-network:link-type'] == 'ROADM-TO-ROADM':
            link_id = link['link-id']
            # Reports the unexpected link id on failure.
            self.assertIn(link_id, pending)
            pending.remove(link_id)
    self.assertEqual(pending, [])
def test_28_verifyOppositeLinkTopology(self):
    """Check that every link of openroadm-topology has a consistent opposite link.

    For each of the 34 expected links the opposite link is fetched and must
    point back to the original link, have swapped source/destination nodes
    and carry the matching link type.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    # Tests related to links
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 34)
    # Link type expected on the opposite link, keyed by the link's own type.
    # Types not listed here are not checked.
    opposite_type = {
        'ADD-LINK': 'DROP-LINK',
        'DROP-LINK': 'ADD-LINK',
        'EXPRESS-LINK': 'EXPRESS-LINK',
        'ROADM-TO-ROADM': 'ROADM-TO-ROADM',
        'XPONDER-INPUT': 'XPONDER-OUTPUT',
        'XPONDER-OUTPUT': 'XPONDER-INPUT',
    }
    # The link id is passed as a format argument instead of being
    # concatenated into the template, so it is never parsed as format syntax.
    link_url = ("{}/config/ietf-network:networks/network/openroadm-topology/"
                "ietf-network-topology:link/{}")
    for link in links:
        link_id = link['link-id']
        link_type = link['org-openroadm-common-network:link-type']
        link_src = link['source']['source-node']
        link_dest = link['destination']['dest-node']
        opp_link_id = link['org-openroadm-common-network:opposite-link']
        # Find the opposite link
        response_opp = requests.request(
            "GET", link_url.format(self.restconf_baseurl, opp_link_id),
            headers=headers, auth=('admin', 'admin'))
        self.assertEqual(response_opp.status_code, requests.codes.ok)
        opp_link = response_opp.json()['ietf-network-topology:link'][0]
        self.assertEqual(opp_link['org-openroadm-common-network:opposite-link'], link_id)
        self.assertEqual(opp_link['source']['source-node'], link_dest)
        self.assertEqual(opp_link['destination']['dest-node'], link_src)
        opp_link_type = opp_link['org-openroadm-common-network:link-type']
        if link_type in opposite_type:
            self.assertEqual(opp_link_type, opposite_type[link_type])
def test_29_getLinkOmsAttributesOpenRoadmTopology(self):
    """Check that every ROADM-to-ROADM link carries OMS span attributes.

    Each of the six expected links must be present once and expose a
    non-null engineered-spanloss and SRLG-length.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    pending = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
               'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX',
               'ROADMA01-DEG2-DEG2-TTP-TXRXtoROADMB01-DEG1-DEG1-TTP-TXRX',
               'ROADMC01-DEG1-DEG1-TTP-TXRXtoROADMB01-DEG2-DEG2-TTP-TXRX',
               'ROADMB01-DEG1-DEG1-TTP-TXRXtoROADMA01-DEG2-DEG2-TTP-TXRX',
               'ROADMB01-DEG2-DEG2-TTP-TXRXtoROADMC01-DEG1-DEG1-TTP-TXRX']
    for link in links:
        link_id = link['link-id']
        if link_id not in pending:
            continue
        span = link['org-openroadm-network-topology:OMS-attributes']['span']
        # Identity tests against None (instead of "!= None" joined with a
        # bitwise "&") and one assertion per attribute, so a failure names
        # the attribute that is missing.
        self.assertIsNotNone(span["engineered-spanloss"])
        self.assertIsNotNone(span['link-concatenation'][0]['SRLG-length'])
        pending.remove(link_id)
    self.assertEqual(pending, [])
def test_30_disconnect_ROADMB(self):
    """Delete ROADMB01 from topology-netconf, then NodeB from clli-network.

    Both DELETE requests must answer 200 OK.
    """
    targets = (
        # Delete in the topology-netconf
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/ROADMB01",
        # Delete in the clli-network
        "{}/config/ietf-network:networks/network/clli-network/node/NodeB",
    )
    # NOTE(review): the assignment of `data` was not visible in the reviewed
    # listing; an empty JSON object is assumed - confirm.
    data = {}
    headers = {'content-type': 'application/json'}
    for target in targets:
        response = requests.request(
            "DELETE", target.format(self.restconf_baseurl),
            data=json.dumps(data), headers=headers,
            auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
def test_31_disconnect_ROADMC(self):
    """Delete ROADMC01 from topology-netconf, then NodeC from clli-network.

    Both DELETE requests must answer 200 OK.
    """
    targets = (
        # Delete in the topology-netconf
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/ROADMC01",
        # Delete in the clli-network
        "{}/config/ietf-network:networks/network/clli-network/node/NodeC",
    )
    # NOTE(review): the assignment of `data` was not visible in the reviewed
    # listing; an empty JSON object is assumed - confirm.
    data = {}
    headers = {'content-type': 'application/json'}
    for target in targets:
        response = requests.request(
            "DELETE", target.format(self.restconf_baseurl),
            data=json.dumps(data), headers=headers,
            auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
def test_32_getNodes_OpenRoadmTopology(self):
    """Check the node list of openroadm-topology when only XPDRA01 and ROADMA01 remain.

    Expects exactly five nodes (XPDRA01-XPDR1 and the SRG/DEG nodes of
    ROADMA01), verifies their supporting node and termination points, and
    checks that no ROADMC01 node is left.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding so that an error reply is reported
    # as a status mismatch rather than as a JSON/KeyError failure.
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 5)
    pending = ['XPDRA01-XPDR1', 'ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
    tp_type_key = 'org-openroadm-common-network:tp-type'
    for node in nodes:
        node_id = node['node-id']
        node_type = node['org-openroadm-common-network:node-type']
        if node_id not in pending:
            self.fail("unexpected or duplicated node: {}".format(node_id))
        pending.remove(node_id)
        # Node ids are "<device>-<element>", e.g. "ROADMA01-SRG1".
        device, _, element = node_id.partition('-')
        tps = node['ietf-network-topology:termination-point']
        self.assertIn({'network-ref': 'openroadm-network', 'node-ref': device},
                      node['supporting-node'])
        if element == 'XPDR1':
            # Tests related to XPDRA nodes
            for tp in tps:
                if tp['tp-id'] == 'XPDR1-CLIENT1':
                    self.assertEqual(tp[tp_type_key], 'XPONDER-CLIENT')
                if tp['tp-id'] == 'XPDR1-NETWORK1':
                    self.assertEqual(tp[tp_type_key], 'XPONDER-NETWORK')
                    self.assertEqual(
                        tp['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'],
                        'ROADMA01-SRG1--SRG1-PP1-TXRX')
        elif element.startswith('SRG'):
            # Tests related to SRG nodes
            self.assertEqual(node_type, 'SRG')
            self.assertEqual(len(tps), 17)
            self.assertIn({'tp-id': element + '-CP-TXRX', tp_type_key: 'SRG-TXRX-CP'}, tps)
            self.assertIn({'tp-id': element + '-PP1-TXRX', tp_type_key: 'SRG-TXRX-PP'}, tps)
        else:
            # Tests related to DEG nodes
            self.assertEqual(node_type, 'DEGREE')
            self.assertIn({'tp-id': element + '-TTP-TXRX', tp_type_key: 'DEGREE-TXRX-TTP'}, tps)
            self.assertIn({'tp-id': element + '-CTP-TXRX', tp_type_key: 'DEGREE-TXRX-CTP'}, tps)
    self.assertEqual(pending, [])
    # Tests related to the nodes of ROADMC: none may remain.
    for node in nodes:
        self.assertNotIn(node['node-id'],
                         ('ROADMC01-SRG1', 'ROADMC01-DEG1', 'ROADMC01-DEG2'))
def test_33_getOpenRoadmNetwork(self):
    """Check that openroadm-network holds two nodes, none of them ROADMC01 or ROADMB01."""
    url = ("{}/config/ietf-network:networks/network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 2)
    # Iterate over every node: the former range(0, nbNode-1) never
    # inspected the last entry.
    for node in nodes:
        self.assertNotEqual(node['node-id'], 'ROADMC01')
        self.assertNotEqual(node['node-id'], 'ROADMB01')
def test_34_getClliNetwork(self):
    """Check that clli-network holds a single node whose clli is not NodeC."""
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 1)
    # Inspect each node actually returned: the former loop ran over
    # range(0, nbNode-1), which is empty for a single node, and indexed
    # the hard-coded position 1, which does not exist in a one-node list.
    for node in nodes:
        self.assertNotEqual(node['org-openroadm-clli-network:clli'], 'NodeC')
def test_35_disconnect_XPDRA(self):
    """Delete XPDRA01 from topology-netconf and expect 200 OK."""
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/XPDRA01"
           .format(self.restconf_baseurl))
    # NOTE(review): the assignment of `data` was not visible in the reviewed
    # listing; an empty JSON object is assumed - confirm.
    data = {}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "DELETE", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_36_getClliNetwork(self):
    """Check that clli-network holds exactly one node and that its clli is NodeA."""
    endpoint = "{}/config/ietf-network:networks/network/clli-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    clli_nodes = reply.json()['network'][0]['node']
    self.assertEqual(len(clli_nodes), 1)
    self.assertEqual(clli_nodes[0]['org-openroadm-clli-network:clli'], 'NodeA')
def test_37_getOpenRoadmNetwork(self):
    """Check that openroadm-network holds exactly one node and that it is not XPDRA01."""
    endpoint = "{}/config/ietf-network:networks/network/openroadm-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    remaining = reply.json()['network'][0]['node']
    self.assertEqual(len(remaining), 1)
    for entry in remaining:
        self.assertNotEqual(entry['node-id'], 'XPDRA01')
def test_38_getNodes_OpenRoadmTopology(self):
    """Check the node list of openroadm-topology when only ROADMA01 remains.

    Expects exactly the four SRG/DEG nodes of ROADMA01, each supported by
    ROADMA01 and exposing the expected termination points.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding so that an error reply is reported
    # as a status mismatch rather than as a JSON/KeyError failure.
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 4)
    pending = ['ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
    tp_type_key = 'org-openroadm-common-network:tp-type'
    for node in nodes:
        self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
                      node['supporting-node'])
        node_type = node['org-openroadm-common-network:node-type']
        node_id = node['node-id']
        if node_id not in pending:
            self.fail("unexpected or duplicated node: {}".format(node_id))
        pending.remove(node_id)
        # Node ids are "ROADMA01-<element>", e.g. "ROADMA01-SRG1".
        element = node_id.partition('-')[2]
        tps = node['ietf-network-topology:termination-point']
        if element.startswith('SRG'):
            # Tests related to SRG nodes
            self.assertEqual(node_type, 'SRG')
            self.assertEqual(len(tps), 17)
            self.assertIn({'tp-id': element + '-CP-TXRX', tp_type_key: 'SRG-TXRX-CP'}, tps)
            self.assertIn({'tp-id': element + '-PP1-TXRX', tp_type_key: 'SRG-TXRX-PP'}, tps)
        else:
            # Tests related to DEG nodes
            self.assertEqual(node_type, 'DEGREE')
            self.assertIn({'tp-id': element + '-TTP-TXRX', tp_type_key: 'DEGREE-TXRX-TTP'}, tps)
            self.assertIn({'tp-id': element + '-CTP-TXRX', tp_type_key: 'DEGREE-TXRX-CTP'}, tps)
    self.assertEqual(pending, [])
def test_39_disconnect_ROADM_XPDRA_link(self):
    """Delete both directions of the XPDRA01 <-> ROADMA01-SRG1 link.

    Both DELETE requests must answer 200 OK.
    """
    link_ids = (
        # XPDRA01 network port towards ROADMA01 SRG1
        "XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX",
        # ROADMA01 SRG1 towards XPDRA01 network port
        "ROADMA01-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1",
    )
    # NOTE(review): the assignment of `data` was not visible in the reviewed
    # listing; an empty JSON object is assumed - confirm.
    data = {}
    headers = {'content-type': 'application/json'}
    for link_id in link_ids:
        url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
               "link/{}"
               .format(self.restconf_baseurl, link_id))
        response = requests.request(
            "DELETE", url, data=json.dumps(data), headers=headers,
            auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
def test_40_getLinks_OpenRoadmTopology(self):
    """Check the links of openroadm-topology once the XPDRA01 links are removed.

    Expects 16 links: the listed express, add and drop links of ROADMA01
    (each present exactly once), six links of any other type, and no
    XPONDER-INPUT / XPONDER-OUTPUT link.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 16)
    link_type_key = 'org-openroadm-common-network:link-type'
    # Link ids still to be seen, grouped by link type.
    pending = {
        'EXPRESS-LINK': ['ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
                         'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX'],
        'ADD-LINK': ['ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
                     'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
                     'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
                     'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX'],
        'DROP-LINK': ['ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
                      'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
                      'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
                      'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX'],
    }
    # Any link that is not express/add/drop is counted here.
    roadmtoroadmLink = 0
    for link in links:
        link_type = link[link_type_key]
        if link_type in pending:
            link_id = link['link-id']
            # Reports the unexpected link id on failure.
            self.assertIn(link_id, pending[link_type])
            pending[link_type].remove(link_id)
        else:
            roadmtoroadmLink += 1
    self.assertEqual(pending['EXPRESS-LINK'], [])
    self.assertEqual(pending['ADD-LINK'], [])
    self.assertEqual(pending['DROP-LINK'], [])
    self.assertEqual(roadmtoroadmLink, 6)
    for link in links:
        self.assertNotEqual(link[link_type_key], 'XPONDER-OUTPUT')
        self.assertNotEqual(link[link_type_key], 'XPONDER-INPUT')
def test_41_disconnect_ROADMA(self):
    """Delete ROADMA01 from topology-netconf, then NodeA from clli-network.

    Both DELETE requests must answer 200 OK.
    """
    targets = (
        # Delete in the topology-netconf
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/ROADMA01",
        # Delete in the clli-network
        "{}/config/ietf-network:networks/network/clli-network/node/NodeA",
    )
    # NOTE(review): the assignment of `data` was not visible in the reviewed
    # listing; an empty JSON object is assumed - confirm.
    data = {}
    headers = {'content-type': 'application/json'}
    for target in targets:
        response = requests.request(
            "DELETE", target.format(self.restconf_baseurl),
            data=json.dumps(data), headers=headers,
            auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
def test_42_getClliNetwork(self):
    """Check that clli-network is still readable and no longer contains a node list."""
    endpoint = "{}/config/ietf-network:networks/network/clli-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    clli_network = reply.json()['network'][0]
    self.assertNotIn('node', clli_network)
def test_43_getOpenRoadmNetwork(self):
    """Check that openroadm-network is still readable and no longer contains a node list."""
    endpoint = "{}/config/ietf-network:networks/network/openroadm-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    openroadm_network = reply.json()['network'][0]
    self.assertNotIn('node', openroadm_network)
def test_44_check_roadm2roadm_link_persistence(self):
    """Check that openroadm-topology has no node list left but still holds six links."""
    endpoint = "{}/config/ietf-network:networks/network/openroadm-topology".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    topology = reply.json()['network'][0]
    # The link list is read before the node check, as before, so a topology
    # without any link entry errors out at this lookup.
    link_count = len(topology['ietf-network-topology:link'])
    self.assertNotIn('node', topology)
    self.assertEqual(link_count, 6)
# Script entry point: run the test cases of this module through unittest
# with verbosity level 2 when the file is executed directly.
if __name__ == "__main__":
    unittest.main(verbosity=2)