3 ##############################################################################
4 #Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
24 class TransportPCEtesting(unittest.TestCase):
26 honeynode_process1 = None
27 honeynode_process2 = None
28 honeynode_process3 = None
29 honeynode_process4 = None
31 restconf_baseurl = "http://localhost:8181/restconf"
33 #START_IGNORE_XTESTING
37 print ("starting honeynode1...")
38 cls.honeynode_process1 = test_utils.start_xpdra_honeynode()
41 print ("starting honeynode2...")
42 cls.honeynode_process2 = test_utils.start_roadma_honeynode()
45 print ("starting honeynode3...")
46 cls.honeynode_process3 = test_utils.start_roadmb_honeynode()
49 print ("starting honeynode4...")
50 cls.honeynode_process4 = test_utils.start_roadmc_honeynode()
52 print ("all honeynodes started")
54 print ("starting opendaylight...")
55 cls.odl_process = test_utils.start_tpce()
57 print ("opendaylight started")
60 def tearDownClass(cls):
61 for child in psutil.Process(cls.odl_process.pid).children():
62 child.send_signal(signal.SIGINT)
64 cls.odl_process.send_signal(signal.SIGINT)
65 cls.odl_process.wait()
66 for child in psutil.Process(cls.honeynode_process1.pid).children():
67 child.send_signal(signal.SIGINT)
69 cls.honeynode_process1.send_signal(signal.SIGINT)
70 cls.honeynode_process1.wait()
71 for child in psutil.Process(cls.honeynode_process2.pid).children():
72 child.send_signal(signal.SIGINT)
74 cls.honeynode_process2.send_signal(signal.SIGINT)
75 cls.honeynode_process2.wait()
76 for child in psutil.Process(cls.honeynode_process3.pid).children():
77 child.send_signal(signal.SIGINT)
79 cls.honeynode_process3.send_signal(signal.SIGINT)
80 cls.honeynode_process3.wait()
81 for child in psutil.Process(cls.honeynode_process4.pid).children():
82 child.send_signal(signal.SIGINT)
84 cls.honeynode_process4.send_signal(signal.SIGINT)
85 cls.honeynode_process4.wait()
93 def test_01_connect_ROADM_A1(self):
95 url = ("{}/config/network-topology:"
96 "network-topology/topology/topology-netconf/node/ROADM-A1"
97 .format(self.restconf_baseurl))
99 "node-id": "ROADM-A1",
100 "netconf-node-topology:username": "admin",
101 "netconf-node-topology:password": "admin",
102 "netconf-node-topology:host": "127.0.0.1",
103 "netconf-node-topology:port": "17841",
104 "netconf-node-topology:tcp-only": "false",
105 "netconf-node-topology:pass-through": {}}]}
106 headers = {'content-type': 'application/json'}
107 response = requests.request(
108 "PUT", url, data=json.dumps(data), headers=headers,
109 auth=('admin', 'admin'))
110 self.assertEqual(response.status_code, requests.codes.created)
113 def test_02_getClliNetwork(self):
114 url = ("{}/config/ietf-network:networks/network/clli-network"
115 .format(self.restconf_baseurl))
116 headers = {'content-type': 'application/json'}
117 response = requests.request(
118 "GET", url, headers=headers, auth=('admin', 'admin'))
119 self.assertEqual(response.status_code, requests.codes.ok)
120 res = response.json()
122 self.assertEqual(res['network'][0]['node'][0]['node-id'],'NodeA')
123 self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'],'NodeA')
125 def test_03_getOpenRoadmNetwork(self):
126 url = ("{}/config/ietf-network:networks/network/openroadm-network"
127 .format(self.restconf_baseurl))
128 headers = {'content-type': 'application/json'}
129 response = requests.request(
130 "GET", url, headers=headers, auth=('admin', 'admin'))
131 self.assertEqual(response.status_code, requests.codes.ok)
132 res = response.json()
133 self.assertEqual(res['network'][0]['node'][0]['node-id'],'ROADM-A1')
134 self.assertEqual(res['network'][0]['node'][0]['supporting-node'][0]['network-ref'],'clli-network')
135 self.assertEqual(res['network'][0]['node'][0]['supporting-node'][0]['node-ref'],'NodeA')
136 self.assertEqual(res['network'][0]['node'][0]['org-openroadm-common-network:node-type'],'ROADM')
137 self.assertEqual(res['network'][0]['node'][0]['org-openroadm-network:model'],'model2')
139 def test_04_getLinks_OpenroadmTopology(self):
140 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
141 .format(self.restconf_baseurl))
142 headers = {'content-type': 'application/json'}
143 response = requests.request(
144 "GET", url, headers=headers, auth=('admin', 'admin'))
145 self.assertEqual(response.status_code, requests.codes.ok)
146 res = response.json()
147 #Tests related to links
148 nbLink=len(res['network'][0]['ietf-network-topology:link'])
149 self.assertEqual(nbLink,10)
150 expressLink=['ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
151 'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX']
152 addLink=['ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
153 'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
154 'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
155 'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX']
156 dropLink=['ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
157 'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
158 'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
159 'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX']
160 for i in range(0,nbLink):
161 linkId = res['network'][0]['ietf-network-topology:link'][i]['link-id']
162 if (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']=='EXPRESS-LINK'):
163 find= linkId in expressLink
164 self.assertEqual(find, True)
165 expressLink.remove(linkId)
166 elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']=='ADD-LINK'):
167 find= linkId in addLink
168 self.assertEqual(find, True)
169 addLink.remove(linkId)
170 elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']=='DROP-LINK'):
171 find= linkId in dropLink
172 self.assertEqual(find, True)
173 dropLink.remove(linkId)
175 self.assertFalse(True)
176 self.assertEqual(len(expressLink),0)
177 self.assertEqual(len(addLink),0)
178 self.assertEqual(len(dropLink),0)
180 def test_05_getNodes_OpenRoadmTopology(self):
181 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
182 .format(self.restconf_baseurl))
183 headers = {'content-type': 'application/json'}
184 response = requests.request(
185 "GET", url, headers=headers, auth=('admin', 'admin'))
186 res = response.json()
187 #Tests related to nodes
188 self.assertEqual(response.status_code, requests.codes.ok)
189 nbNode=len(res['network'][0]['node'])
190 self.assertEqual(nbNode,4)
191 listNode=['ROADM-A1-SRG1', 'ROADM-A1-SRG3','ROADM-A1-DEG1','ROADM-A1-DEG2']
192 for i in range(0,nbNode):
193 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
194 res['network'][0]['node'][i]['supporting-node'])
195 nodeType=res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
196 nodeId=res['network'][0]['node'][i]['node-id']
197 if(nodeId=='ROADM-A1-SRG1'):
198 #Test related to SRG1
199 self.assertEqual(nodeType,'SRG')
200 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),5)
201 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
202 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
203 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
204 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
205 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
206 res['network'][0]['node'][i]['supporting-node'])
207 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
208 res['network'][0]['node'][i]['supporting-node'])
209 listNode.remove(nodeId)
210 elif(nodeId=='ROADM-A1-SRG3'):
211 #Test related to SRG1
212 self.assertEqual(nodeType,'SRG')
213 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),5)
214 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
215 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
216 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
217 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
218 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
219 res['network'][0]['node'][i]['supporting-node'])
220 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
221 res['network'][0]['node'][i]['supporting-node'])
222 listNode.remove(nodeId)
223 elif(nodeId=='ROADM-A1-DEG1'):
224 #Test related to DEG1
225 self.assertEqual(nodeType,'DEGREE')
226 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
227 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
228 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
229 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
230 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
231 res['network'][0]['node'][i]['supporting-node'])
232 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
233 res['network'][0]['node'][i]['supporting-node'])
234 listNode.remove(nodeId)
235 elif(nodeId=='ROADM-A1-DEG2'):
236 #Test related to DEG2
237 self.assertEqual(nodeType,'DEGREE')
238 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
239 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
240 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
241 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
242 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
243 res['network'][0]['node'][i]['supporting-node'])
244 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
245 res['network'][0]['node'][i]['supporting-node'])
246 listNode.remove(nodeId)
248 self.assertFalse(True)
249 self.assertEqual(len(listNode),0)
251 def test_06_connect_XPDRA(self):
252 url = ("{}/config/network-topology:"
253 "network-topology/topology/topology-netconf/node/XPDR-A1"
254 .format(self.restconf_baseurl))
256 "node-id": "XPDR-A1",
257 "netconf-node-topology:username": "admin",
258 "netconf-node-topology:password": "admin",
259 "netconf-node-topology:host": "127.0.0.1",
260 "netconf-node-topology:port": "17840",
261 "netconf-node-topology:tcp-only": "false",
262 "netconf-node-topology:pass-through": {}}]}
263 headers = {'content-type': 'application/json'}
264 response = requests.request(
265 "PUT", url, data=json.dumps(data), headers=headers,
266 auth=('admin', 'admin'))
267 self.assertEqual(response.status_code, requests.codes.created)
270 def test_07_getClliNetwork(self):
271 url = ("{}/config/ietf-network:networks/network/clli-network"
272 .format(self.restconf_baseurl))
273 headers = {'content-type': 'application/json'}
274 response = requests.request(
275 "GET", url, headers=headers, auth=('admin', 'admin'))
276 self.assertEqual(response.status_code, requests.codes.ok)
277 res = response.json()
278 self.assertEqual(res['network'][0]['node'][0]['node-id'],'NodeA')
279 self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'],'NodeA')
281 def test_08_getOpenRoadmNetwork(self):
282 url = ("{}/config/ietf-network:networks/network/openroadm-network"
283 .format(self.restconf_baseurl))
284 headers = {'content-type': 'application/json'}
285 response = requests.request(
286 "GET", url, headers=headers, auth=('admin', 'admin'))
287 self.assertEqual(response.status_code, requests.codes.ok)
288 res = response.json()
289 nbNode=len(res['network'][0]['node'])
290 self.assertEqual(nbNode,2)
291 for i in range(0,nbNode):
292 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['network-ref'],'clli-network')
293 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'],'NodeA')
294 nodeId=res['network'][0]['node'][i]['node-id']
295 if(nodeId=='XPDR-A1'):
296 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'],'XPONDER')
297 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'],'model2')
298 elif(nodeId=='ROADM-A1'):
299 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'],'ROADM')
300 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'],'model2')
302 self.assertFalse(True)
304 def test_09_getNodes_OpenRoadmTopology(self):
305 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
306 .format(self.restconf_baseurl))
307 headers = {'content-type': 'application/json'}
308 response = requests.request(
309 "GET", url, headers=headers, auth=('admin', 'admin'))
310 res = response.json()
311 #Tests related to nodes
312 self.assertEqual(response.status_code, requests.codes.ok)
313 nbNode=len(res['network'][0]['node'])
314 self.assertEqual(nbNode,5)
315 listNode=['XPDR-A1-XPDR1','ROADM-A1-SRG1','ROADM-A1-SRG3','ROADM-A1-DEG1','ROADM-A1-DEG2']
316 for i in range(0,nbNode):
317 nodeType=res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
318 nodeId=res['network'][0]['node'][i]['node-id']
319 #Tests related to XPDRA nodes
320 if(nodeId=='XPDR-A1-XPDR1'):
321 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDR-A1'},
322 res['network'][0]['node'][i]['supporting-node'])
323 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
324 res['network'][0]['node'][i]['supporting-node'])
325 self.assertEqual(nodeType,'XPONDER')
326 nbTps=len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
329 for j in range(0,nbTps):
330 tpType=res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['org-openroadm-common-network:tp-type']
331 tpId=res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
332 if (tpType=='XPONDER-CLIENT'):
334 elif (tpType=='XPONDER-NETWORK'):
336 if (tpId == 'XPDR1-NETWORK2'):
337 self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]
338 ['transportpce-topology:associated-connection-map-port'],'XPDR1-CLIENT2')
339 if (tpId == 'XPDR1-CLIENT2'):
340 self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]
341 ['transportpce-topology:associated-connection-map-port'],'XPDR1-NETWORK2')
343 self.assertTrue(client == 2)
344 self.assertTrue(network == 2)
345 listNode.remove(nodeId)
346 elif(nodeId=='ROADM-A1-SRG1'):
347 #Test related to SRG1
348 self.assertEqual(nodeType,'SRG')
349 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),5)
350 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
351 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
352 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
353 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
354 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
355 res['network'][0]['node'][i]['supporting-node'])
356 listNode.remove(nodeId)
357 elif(nodeId=='ROADM-A1-SRG3'):
358 #Test related to SRG1
359 self.assertEqual(nodeType,'SRG')
360 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),5)
361 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
362 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
363 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
364 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
365 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
366 res['network'][0]['node'][i]['supporting-node'])
367 listNode.remove(nodeId)
368 elif(nodeId=='ROADM-A1-DEG1'):
369 #Test related to DEG1
370 self.assertEqual(nodeType,'DEGREE')
371 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
372 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
373 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
374 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
375 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
376 res['network'][0]['node'][i]['supporting-node'])
377 listNode.remove(nodeId)
378 elif(nodeId=='ROADM-A1-DEG2'):
379 #Test related to DEG2
380 self.assertEqual(nodeType,'DEGREE')
381 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
382 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
383 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
384 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
385 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
386 res['network'][0]['node'][i]['supporting-node'])
387 listNode.remove(nodeId)
389 self.assertFalse(True)
390 self.assertEqual(len(listNode),0)
392 #Connect the tail XPDRA to ROADMA and vice versa
393 def test_10_connect_tail_xpdr_rdm(self):
394 #Connect the tail: XPDRA to ROADMA
395 url = ("{}/operations/transportpce-networkutils:init-xpdr-rdm-links"
396 .format(self.restconf_baseurl))
397 data = {"networkutils:input": {
398 "networkutils:links-input": {
399 "networkutils:xpdr-node": "XPDR-A1",
400 "networkutils:xpdr-num": "1",
401 "networkutils:network-num": "1",
402 "networkutils:rdm-node": "ROADM-A1",
403 "networkutils:srg-num": "1",
404 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
408 headers = {'content-type': 'application/json'}
409 response = requests.request(
410 "POST", url, data=json.dumps(data), headers=headers,
411 auth=('admin', 'admin'))
412 self.assertEqual(response.status_code, requests.codes.ok)
414 def test_11_connect_tail_rdm_xpdr(self):
415 #Connect the tail: ROADMA to XPDRA
416 url = ("{}/operations/transportpce-networkutils:init-rdm-xpdr-links"
417 .format(self.restconf_baseurl))
418 data = {"networkutils:input": {
419 "networkutils:links-input": {
420 "networkutils:xpdr-node": "XPDR-A1",
421 "networkutils:xpdr-num": "1",
422 "networkutils:network-num": "1",
423 "networkutils:rdm-node": "ROADM-A1",
424 "networkutils:srg-num": "1",
425 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
429 headers = {'content-type': 'application/json'}
430 response = requests.request(
431 "POST", url, data=json.dumps(data), headers=headers,
432 auth=('admin', 'admin'))
433 self.assertEqual(response.status_code, requests.codes.ok)
435 def test_12_getLinks_OpenRoadmTopology(self):
436 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
437 .format(self.restconf_baseurl))
438 headers = {'content-type': 'application/json'}
439 response = requests.request(
440 "GET", url, headers=headers, auth=('admin', 'admin'))
441 self.assertEqual(response.status_code, requests.codes.ok)
442 res = response.json()
443 #Tests related to links
444 nbLink=len(res['network'][0]['ietf-network-topology:link'])
445 self.assertEqual(nbLink,12)
446 expressLink=['ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX','ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX']
447 addLink=['ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX','ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
448 'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX','ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX']
449 dropLink=['ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX','ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
450 'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX','ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX']
451 XPDR_IN=['ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1']
452 XPDR_OUT=['XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX']
453 for i in range(0,nbLink):
454 nodeType=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
455 linkId=res['network'][0]['ietf-network-topology:link'][i]['link-id']
456 if(nodeType=='EXPRESS-LINK'):
457 find= linkId in expressLink
458 self.assertEqual(find, True)
459 expressLink.remove(linkId)
460 elif(nodeType=='ADD-LINK'):
461 find= linkId in addLink
462 self.assertEqual(find, True)
463 addLink.remove(linkId)
464 elif(nodeType=='DROP-LINK'):
465 find= linkId in dropLink
466 self.assertEqual(find, True)
467 dropLink.remove(linkId)
468 elif(nodeType=='XPONDER-INPUT'):
469 find= linkId in XPDR_IN
470 self.assertEqual(find, True)
471 XPDR_IN.remove(linkId)
472 elif(nodeType=='XPONDER-OUTPUT'):
473 find= linkId in XPDR_OUT
474 self.assertEqual(find, True)
475 XPDR_OUT.remove(linkId)
477 self.assertFalse(True)
478 self.assertEqual(len(expressLink),0)
479 self.assertEqual(len(addLink),0)
480 self.assertEqual(len(dropLink),0)
481 self.assertEqual(len(XPDR_IN),0)
482 self.assertEqual(len(XPDR_OUT),0)
484 def test_13_connect_ROADMC(self):
486 url = ("{}/config/network-topology:"
487 "network-topology/topology/topology-netconf/node/ROADM-C1"
488 .format(self.restconf_baseurl))
490 "node-id": "ROADM-C1",
491 "netconf-node-topology:username": "admin",
492 "netconf-node-topology:password": "admin",
493 "netconf-node-topology:host": "127.0.0.1",
494 "netconf-node-topology:port": "17843",
495 "netconf-node-topology:tcp-only": "false",
496 "netconf-node-topology:pass-through": {}}]}
497 headers = {'content-type': 'application/json'}
498 response = requests.request(
499 "PUT", url, data=json.dumps(data), headers=headers,
500 auth=('admin', 'admin'))
501 self.assertEqual(response.status_code, requests.codes.created)
504 def test_14_omsAttributes_ROADMA_ROADMC(self):
505 # Config ROADMA-ROADMC oms-attributes
506 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
507 "link/ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
508 "OMS-attributes/span"
509 .format(self.restconf_baseurl))
511 "auto-spanloss": "true",
512 "engineered-spanloss": 12.2,
513 "link-concatenation": [{
516 "SRLG-length": 100000,
518 headers = {'content-type': 'application/json'}
519 response = requests.request(
520 "PUT", url, data=json.dumps(data), headers=headers,
521 auth=('admin', 'admin'))
522 self.assertEqual(response.status_code, requests.codes.created)
524 def test_15_omsAttributes_ROADMC_ROADMA(self):
525 # Config ROADM-C1-ROADM-A1 oms-attributes
526 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
527 "link/ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
528 "OMS-attributes/span"
529 .format(self.restconf_baseurl))
531 "auto-spanloss": "true",
532 "engineered-spanloss": 12.2,
533 "link-concatenation": [{
536 "SRLG-length": 100000,
539 headers = {'content-type': 'application/json'}
540 response = requests.request(
541 "PUT", url, data=json.dumps(data), headers=headers,
542 auth=('admin', 'admin'))
543 self.assertEqual(response.status_code, requests.codes.created)
545 def test_16_getClliNetwork(self):
546 url = ("{}/config/ietf-network:networks/network/clli-network"
547 .format(self.restconf_baseurl))
548 headers = {'content-type': 'application/json'}
549 response = requests.request(
550 "GET", url, headers=headers, auth=('admin', 'admin'))
551 self.assertEqual(response.status_code, requests.codes.ok)
552 res = response.json()
553 nbNode=len(res['network'][0]['node'])
554 listNode=['NodeA','NodeC']
555 for i in range(0,nbNode):
556 nodeId = res['network'][0]['node'][i]['node-id']
557 find= nodeId in listNode
558 self.assertEqual(find, True)
560 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'],'NodeA')
562 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'],'NodeC')
563 listNode.remove(nodeId)
564 self.assertEqual(len(listNode),0)
566 def test_17_getOpenRoadmNetwork(self):
567 url = ("{}/config/ietf-network:networks/network/openroadm-network"
568 .format(self.restconf_baseurl))
569 headers = {'content-type': 'application/json'}
570 response = requests.request(
571 "GET", url, headers=headers, auth=('admin', 'admin'))
572 self.assertEqual(response.status_code, requests.codes.ok)
573 res = response.json()
574 nbNode=len(res['network'][0]['node'])
575 self.assertEqual(nbNode,3)
576 listNode=['XPDR-A1','ROADM-A1','ROADM-C1']
577 for i in range(0,nbNode):
578 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['network-ref'],'clli-network')
579 nodeId=res['network'][0]['node'][i]['node-id']
580 if(nodeId=='XPDR-A1'):
581 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'],'NodeA')
582 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'],'XPONDER')
583 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'],'model2')
584 listNode.remove(nodeId)
585 elif(nodeId=='ROADM-A1'):
586 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'],'NodeA')
587 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'],'ROADM')
588 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'],'model2')
589 listNode.remove(nodeId)
590 elif(nodeId=='ROADM-C1'):
591 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'],'NodeC')
592 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'],'ROADM')
593 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'],'model2')
594 listNode.remove(nodeId)
596 self.assertFalse(True)
597 self.assertEqual(len(listNode),0)
599 def test_18_getROADMLinkOpenRoadmTopology(self):
600 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
601 .format(self.restconf_baseurl))
602 headers = {'content-type': 'application/json'}
603 response = requests.request(
604 "GET", url, headers=headers, auth=('admin', 'admin'))
605 self.assertEqual(response.status_code, requests.codes.ok)
606 res = response.json()
607 #Tests related to links
608 nbLink=len(res['network'][0]['ietf-network-topology:link'])
609 self.assertEqual(nbLink,20)
610 expressLink=['ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX','ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
611 'ROADM-C1-DEG2-DEG2-CTP-TXRXtoROADM-C1-DEG1-DEG1-CTP-TXRX','ROADM-C1-DEG1-DEG1-CTP-TXRXtoROADM-C1-DEG2-DEG2-CTP-TXRX']
612 addLink=['ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX','ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
613 'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX','ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
614 'ROADM-C1-SRG1-SRG1-CP-TXRXtoROADM-C1-DEG2-DEG2-CTP-TXRX','ROADM-C1-SRG1-SRG1-CP-TXRXtoROADM-C1-DEG1-DEG1-CTP-TXRX']
615 dropLink=['ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX','ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
616 'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX','ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
617 'ROADM-C1-DEG1-DEG1-CTP-TXRXtoROADM-C1-SRG1-SRG1-CP-TXRX','ROADM-C1-DEG2-DEG2-CTP-TXRXtoROADM-C1-SRG1-SRG1-CP-TXRX']
618 R2RLink=['ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX','ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX']
619 XPDR_IN=['ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1']
620 XPDR_OUT=['XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX']
621 for i in range(0,nbLink):
622 nodeType=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
623 linkId=res['network'][0]['ietf-network-topology:link'][i]['link-id']
624 if(nodeType=='EXPRESS-LINK'):
625 find= linkId in expressLink
626 self.assertEqual(find, True)
627 expressLink.remove(linkId)
628 elif(nodeType=='ADD-LINK'):
629 find= linkId in addLink
630 self.assertEqual(find, True)
631 addLink.remove(linkId)
632 elif(nodeType=='DROP-LINK'):
633 find= linkId in dropLink
634 self.assertEqual(find, True)
635 dropLink.remove(linkId)
636 elif(nodeType=='ROADM-TO-ROADM'):
637 find= linkId in R2RLink
638 self.assertEqual(find, True)
639 R2RLink.remove(linkId)
640 elif(nodeType=='XPONDER-INPUT'):
641 find= linkId in XPDR_IN
642 self.assertEqual(find, True)
643 XPDR_IN.remove(linkId)
644 elif(nodeType=='XPONDER-OUTPUT'):
645 find= linkId in XPDR_OUT
646 self.assertEqual(find, True)
647 XPDR_OUT.remove(linkId)
649 self.assertFalse(True)
650 self.assertEqual(len(expressLink),0)
651 self.assertEqual(len(addLink),0)
652 self.assertEqual(len(dropLink),0)
653 self.assertEqual(len(R2RLink),0)
654 self.assertEqual(len(XPDR_IN),0)
655 self.assertEqual(len(XPDR_OUT),0)
657 def test_19_getLinkOmsAttributesOpenRoadmTopology(self):
658 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
659 .format(self.restconf_baseurl))
660 headers = {'content-type': 'application/json'}
661 response = requests.request(
662 "GET", url, headers=headers, auth=('admin', 'admin'))
663 self.assertEqual(response.status_code, requests.codes.ok)
664 res = response.json()
665 #Tests related to links
666 nbLink=len(res['network'][0]['ietf-network-topology:link'])
667 self.assertEqual(nbLink,20)
668 R2RLink=['ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
669 'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX']
670 for i in range(0,nbLink):
671 nodeType = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
672 link_id=res['network'][0]['ietf-network-topology:link'][i]['link-id']
673 if(link_id in R2RLink):
675 spanLoss = res['network'][0]['ietf-network-topology:link'][i][
676 'org-openroadm-network-topology:OMS-attributes']['span']["engineered-spanloss"]
677 length = res['network'][0]['ietf-network-topology:link'][i][
678 'org-openroadm-network-topology:OMS-attributes']['span']['link-concatenation'][0]['SRLG-length']
679 if((spanLoss!=None)&(length!=None)):
681 self.assertTrue(find)
682 R2RLink.remove(link_id)
683 self.assertEqual(len(R2RLink),0)
685 def test_20_getNodes_OpenRoadmTopology(self):
686 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
687 .format(self.restconf_baseurl))
688 headers = {'content-type': 'application/json'}
689 response = requests.request(
690 "GET", url, headers=headers, auth=('admin', 'admin'))
691 res = response.json()
692 #Tests related to nodes
693 self.assertEqual(response.status_code, requests.codes.ok)
694 nbNode=len(res['network'][0]['node'])
695 self.assertEqual(nbNode,8)
696 listNode=['XPDR-A1-XPDR1',
697 'ROADM-A1-SRG1','ROADM-A1-SRG3','ROADM-A1-DEG1','ROADM-A1-DEG2',
698 'ROADM-C1-SRG1','ROADM-C1-DEG1', 'ROADM-C1-DEG2']
699 #************************Tests related to XPDRA nodes
700 for i in range(0,nbNode):
701 nodeType=res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
702 nodeId=res['network'][0]['node'][i]['node-id']
703 if(nodeId=='XPDR-A1-XPDR1'):
704 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDR-A1'},
705 res['network'][0]['node'][i]['supporting-node'])
706 self.assertEqual(nodeType,'XPONDER')
707 nbTps=len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
708 self.assertTrue(nbTps >= 4)
711 for j in range(0,nbTps):
712 tpType=res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['org-openroadm-common-network:tp-type']
713 if (tpType=='XPONDER-CLIENT'):
715 elif (tpType=='XPONDER-NETWORK'):
717 self.assertTrue(client == 2)
718 self.assertTrue(network == 2)
719 listNode.remove(nodeId)
720 elif(nodeId=='ROADM-A1-SRG1'):
721 #Test related to SRG1
722 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),5)
723 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
724 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
725 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
726 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
727 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
728 res['network'][0]['node'][i]['supporting-node'])
729 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'],'SRG')
730 listNode.remove(nodeId)
731 elif(nodeId=='ROADM-A1-SRG3'):
732 #Test related to SRG1
733 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),5)
734 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
735 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
736 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
737 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
738 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
739 res['network'][0]['node'][i]['supporting-node'])
740 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'],'SRG')
741 listNode.remove(nodeId)
742 elif(nodeId=='ROADM-A1-DEG1'):
743 #Test related to DEG1
744 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
745 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
746 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
747 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
748 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
749 res['network'][0]['node'][i]['supporting-node'])
750 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'],'DEGREE')
751 listNode.remove(nodeId)
752 elif(nodeId=='ROADM-A1-DEG2'):
753 #Test related to DEG2
754 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
755 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
756 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
757 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
758 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
759 res['network'][0]['node'][i]['supporting-node'])
760 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'],'DEGREE')
761 listNode.remove(nodeId)
762 elif(nodeId=='ROADM-C1-SRG1'):
763 #Test related to SRG1
764 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),5)
765 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
766 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
767 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
768 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
769 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-C1'},
770 res['network'][0]['node'][i]['supporting-node'])
771 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'],'SRG')
772 listNode.remove(nodeId)
773 elif(nodeId=='ROADM-C1-DEG1'):
774 #Test related to DEG1
775 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
776 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
777 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
778 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
779 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-C1'},
780 res['network'][0]['node'][i]['supporting-node'])
781 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'],'DEGREE')
782 listNode.remove(nodeId)
783 elif(nodeId=='ROADM-C1-DEG2'):
784 #Test related to DEG1
785 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
786 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
787 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
788 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
789 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-C1'},
790 res['network'][0]['node'][i]['supporting-node'])
791 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'],'DEGREE')
792 listNode.remove(nodeId)
794 self.assertFalse(True)
795 self.assertEqual(len(listNode),0)
797 def test_21_connect_ROADMB(self):
798 url = ("{}/config/network-topology:"
799 "network-topology/topology/topology-netconf/node/ROADM-B1"
800 .format(self.restconf_baseurl))
802 "node-id": "ROADM-B1",
803 "netconf-node-topology:username": "admin",
804 "netconf-node-topology:password": "admin",
805 "netconf-node-topology:host": "127.0.0.1",
806 "netconf-node-topology:port": "17842",
807 "netconf-node-topology:tcp-only": "false",
808 "netconf-node-topology:pass-through": {}}]}
809 headers = {'content-type': 'application/json'}
810 response = requests.request(
811 "PUT", url, data=json.dumps(data), headers=headers,
812 auth=('admin', 'admin'))
813 self.assertEqual(response.status_code, requests.codes.created)
816 def test_22_omsAttributes_ROADMA_ROADMB(self):
817 # Config ROADM-A1-ROADM-B1 oms-attributes
818 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
819 "link/ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
820 "OMS-attributes/span"
821 .format(self.restconf_baseurl))
823 "auto-spanloss": "true",
824 "engineered-spanloss": 12.2,
825 "spanloss-current": 12,
826 "spanloss-base": 11.4,
827 "link-concatenation": [{
830 "SRLG-length": 100000,
832 headers = {'content-type': 'application/json'}
833 response = requests.request(
834 "PUT", url, data=json.dumps(data), headers=headers,
835 auth=('admin', 'admin'))
836 self.assertEqual(response.status_code, requests.codes.created)
838 def test_23_omsAttributes_ROADMB_ROADMA(self):
839 # Config ROADM-B1-ROADM-A1 oms-attributes
840 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
841 "link/ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
842 "OMS-attributes/span"
843 .format(self.restconf_baseurl))
845 "auto-spanloss": "true",
846 "engineered-spanloss": 12.2,
847 "spanloss-current": 12,
848 "spanloss-base": 11.4,
849 "link-concatenation": [{
852 "SRLG-length": 100000,
854 headers = {'content-type': 'application/json'}
855 response = requests.request(
856 "PUT", url, data=json.dumps(data), headers=headers,
857 auth=('admin', 'admin'))
858 self.assertEqual(response.status_code, requests.codes.created)
860 def test_24_omsAttributes_ROADMB_ROADMC(self):
861 # Config ROADM-B1-ROADM-C1 oms-attributes
862 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
863 "link/ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
864 "OMS-attributes/span"
865 .format(self.restconf_baseurl))
867 "auto-spanloss": "true",
868 "engineered-spanloss": 12.2,
869 "spanloss-current": 12,
870 "spanloss-base": 11.4,
871 "link-concatenation": [{
874 "SRLG-length": 100000,
876 headers = {'content-type': 'application/json'}
877 response = requests.request(
878 "PUT", url, data=json.dumps(data), headers=headers,
879 auth=('admin', 'admin'))
880 self.assertEqual(response.status_code, requests.codes.created)
882 def test_25_omsAttributes_ROADMC_ROADMB(self):
883 # Config ROADM-C1-ROADM-B1 oms-attributes
884 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
885 "link/ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
886 "OMS-attributes/span"
887 .format(self.restconf_baseurl))
889 "auto-spanloss": "true",
890 "engineered-spanloss": 12.2,
891 "link-concatenation": [{
894 "SRLG-length": 100000,
896 headers = {'content-type': 'application/json'}
897 response = requests.request(
898 "PUT", url, data=json.dumps(data), headers=headers,
899 auth=('admin', 'admin'))
900 self.assertEqual(response.status_code, requests.codes.created)
902 def test_26_getClliNetwork(self):
903 url = ("{}/config/ietf-network:networks/network/clli-network"
904 .format(self.restconf_baseurl))
905 headers = {'content-type': 'application/json'}
906 response = requests.request(
907 "GET", url, headers=headers, auth=('admin', 'admin'))
908 self.assertEqual(response.status_code, requests.codes.ok)
909 res = response.json()
910 nbNode=len(res['network'][0]['node'])
911 listNode=['NodeA','NodeB','NodeC']
912 for i in range(0,nbNode):
913 nodeId = res['network'][0]['node'][i]['node-id']
914 find= nodeId in listNode
915 self.assertEqual(find, True)
917 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'],'NodeA')
918 elif(nodeId=='NodeB'):
919 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'],'NodeB')
921 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'],'NodeC')
922 listNode.remove(nodeId)
923 self.assertEqual(len(listNode),0)
925 def test_27_verifyDegree(self):
926 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
927 .format(self.restconf_baseurl))
928 headers = {'content-type': 'application/json'}
929 response = requests.request(
930 "GET", url, headers=headers, auth=('admin', 'admin'))
931 self.assertEqual(response.status_code, requests.codes.ok)
932 res = response.json()
933 #Tests related to links
934 nbLink=len(res['network'][0]['ietf-network-topology:link'])
935 listR2RLink=['ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX','ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
936 'ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX','ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX',
937 'ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX','ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX']
938 for i in range(0,nbLink):
939 if res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'ROADM-TO-ROADM':
940 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
941 find= link_id in listR2RLink
942 self.assertEqual(find, True)
943 listR2RLink.remove(link_id)
944 self.assertEqual(len(listR2RLink),0)
946 def test_28_verifyOppositeLinkTopology(self):
947 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
948 .format(self.restconf_baseurl))
949 headers = {'content-type': 'application/json'}
950 response = requests.request(
951 "GET", url, headers=headers, auth=('admin', 'admin'))
952 self.assertEqual(response.status_code, requests.codes.ok)
953 res = response.json()
954 #Tests related to links
955 nbLink=len(res['network'][0]['ietf-network-topology:link'])
956 self.assertEqual(nbLink,26)
957 for i in range(0,nbLink):
958 link_id=res['network'][0]['ietf-network-topology:link'][i]['link-id']
959 link_type=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
960 link_src=res['network'][0]['ietf-network-topology:link'][i]['source']['source-node']
961 link_dest=res['network'][0]['ietf-network-topology:link'][i]['destination']['dest-node']
962 oppLink_id=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:opposite-link']
963 #Find the opposite link
964 url_oppLink="{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"+oppLink_id
965 url = (url_oppLink.format(self.restconf_baseurl))
966 headers = {'content-type': 'application/json'}
967 response_oppLink = requests.request("GET", url, headers=headers, auth=('admin', 'admin'))
968 self.assertEqual(response_oppLink.status_code, requests.codes.ok)
969 res_oppLink = response_oppLink.json()
970 self.assertEqual(res_oppLink['ietf-network-topology:link'][0]['org-openroadm-common-network:opposite-link'],link_id)
971 self.assertEqual(res_oppLink['ietf-network-topology:link'][0]['source']['source-node'],link_dest)
972 self.assertEqual(res_oppLink['ietf-network-topology:link'][0]['destination']['dest-node'],link_src)
973 oppLink_type=res_oppLink['ietf-network-topology:link'][0]['org-openroadm-common-network:link-type']
974 if link_type=='ADD-LINK':
975 self.assertEqual(oppLink_type, 'DROP-LINK')
976 elif link_type=='DROP-LINK':
977 self.assertEqual(oppLink_type, 'ADD-LINK')
978 elif link_type=='EXPRESS-LINK':
979 self.assertEqual(oppLink_type, 'EXPRESS-LINK')
980 elif link_type=='ROADM-TO-ROADM':
981 self.assertEqual(oppLink_type, 'ROADM-TO-ROADM')
982 elif link_type=='XPONDER-INPUT':
983 self.assertEqual(oppLink_type, 'XPONDER-OUTPUT')
984 elif link_type=='XPONDER-OUTPUT':
985 self.assertEqual(oppLink_type, 'XPONDER-INPUT')
987 def test_29_getLinkOmsAttributesOpenRoadmTopology(self):
988 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
989 .format(self.restconf_baseurl))
990 headers = {'content-type': 'application/json'}
991 response = requests.request(
992 "GET", url, headers=headers, auth=('admin', 'admin'))
993 self.assertEqual(response.status_code, requests.codes.ok)
994 res = response.json()
995 nbLink=len(res['network'][0]['ietf-network-topology:link'])
996 R2RLink = ['ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
997 'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
998 'ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX',
999 'ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX',
1000 'ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX',
1001 'ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX']
1002 for i in range(0,nbLink):
1003 nodeType = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
1004 link_id=res['network'][0]['ietf-network-topology:link'][i]['link-id']
1005 if(link_id in R2RLink):
1007 spanLoss = res['network'][0]['ietf-network-topology:link'][i][
1008 'org-openroadm-network-topology:OMS-attributes']['span']["engineered-spanloss"]
1009 length = res['network'][0]['ietf-network-topology:link'][i][
1010 'org-openroadm-network-topology:OMS-attributes']['span']['link-concatenation'][0]['SRLG-length']
1011 if((spanLoss!=None)&(length!=None)):
1013 self.assertTrue(find)
1014 R2RLink.remove(link_id)
1015 self.assertEqual(len(R2RLink),0)
1017 def test_30_disconnect_ROADMB(self):
1018 #Delete in the topology-netconf
1019 url = ("{}/config/network-topology:"
1020 "network-topology/topology/topology-netconf/node/ROADM-B1"
1021 .format(self.restconf_baseurl))
1023 headers = {'content-type': 'application/json'}
1024 response = requests.request(
1025 "DELETE", url, data=json.dumps(data), headers=headers,
1026 auth=('admin', 'admin'))
1027 self.assertEqual(response.status_code, requests.codes.ok)
1028 #Delete in the clli-network
1029 url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeB"
1030 .format(self.restconf_baseurl))
1032 headers = {'content-type': 'application/json'}
1033 response = requests.request(
1034 "DELETE", url, data=json.dumps(data), headers=headers,
1035 auth=('admin', 'admin'))
1036 self.assertEqual(response.status_code, requests.codes.ok)
1038 def test_31_disconnect_ROADMC(self):
1039 #Delete in the topology-netconf
1040 url = ("{}/config/network-topology:"
1041 "network-topology/topology/topology-netconf/node/ROADM-C1"
1042 .format(self.restconf_baseurl))
1044 headers = {'content-type': 'application/json'}
1045 response = requests.request(
1046 "DELETE", url, data=json.dumps(data), headers=headers,
1047 auth=('admin', 'admin'))
1048 self.assertEqual(response.status_code, requests.codes.ok)
1049 #Delete in the clli-network
1050 url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeC"
1051 .format(self.restconf_baseurl))
1053 headers = {'content-type': 'application/json'}
1054 response = requests.request(
1055 "DELETE", url, data=json.dumps(data), headers=headers,
1056 auth=('admin', 'admin'))
1057 self.assertEqual(response.status_code, requests.codes.ok)
1059 # def test_24_check_roadm2roadm_links_deletion(self):
1060 # url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1061 # .format(self.restconf_baseurl))
1062 # headers = {'content-type': 'application/json'}
1063 # response = requests.request(
1064 # "GET", url, headers=headers, auth=('admin', 'admin'))
1065 # self.assertEqual(response.status_code, requests.codes.ok)
1066 # res = response.json()
1067 # #Write the response in the log
1068 # with open('./transportpce_tests/log/response.log', 'a') as outfile1:
1069 # outfile1.write(str(res))
1070 # #Tests related to links
1071 # nbLink=len(res['network'][0]['ietf-network-topology:link'])
1072 # self.assertEqual(nbLink,8)
1073 # expressLink=['ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX','ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX']
1074 # addLink=['ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX','ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX',]
1075 # dropLink=['ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX','ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX']
1076 # XPDR_IN=['ROADMA-SRG1-SRG1-PP1-TXRXtoXPDRA-XPDR1-XPDR1-NETWORK1']
1077 # XPDR_OUT=['XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX']
1078 # for i in range(0,nbLink):
1079 # nodeType=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
1080 # linkId=res['network'][0]['ietf-network-topology:link'][i]['link-id']
1081 # if(nodeType=='EXPRESS-LINK'):
1082 # find= linkId in expressLink
1083 # self.assertEqual(find, True)
1084 # expressLink.remove(linkId)
1085 # elif(nodeType=='ADD-LINK'):
1086 # find= linkId in addLink
1087 # self.assertEqual(find, True)
1088 # addLink.remove(linkId)
1089 # elif(nodeType=='DROP-LINK'):
1090 # find= linkId in dropLink
1091 # self.assertEqual(find, True)
1092 # dropLink.remove(linkId)
1093 # elif(nodeType=='XPONDER-INPUT'):
1094 # find= linkId in XPDR_IN
1095 # self.assertEqual(find, True)
1096 # XPDR_IN.remove(linkId)
1097 # elif(nodeType=='XPONDER-OUTPUT'):
1098 # find= linkId in XPDR_OUT
1099 # self.assertEqual(find, True)
1100 # XPDR_OUT.remove(linkId)
1102 # self.assertFalse(True)
1103 # self.assertEqual(len(expressLink),0)
1104 # self.assertEqual(len(addLink),0)
1105 # self.assertEqual(len(dropLink),0)
1106 # self.assertEqual(len(XPDR_IN),0)
1107 # self.assertEqual(len(XPDR_OUT),0)
1109 # for i in range(0,nbLink):
1110 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'],'ROADM-TO-ROADM')
1111 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX')
1112 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX')
1113 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG2-DEG1-CTP-TXRX')
1114 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG2-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX')
1115 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-DEG2-DEG2-CTP-TXRX')
1116 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG2-DEG2-CTP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX')
1118 def test_32_getNodes_OpenRoadmTopology(self):
1119 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1120 .format(self.restconf_baseurl))
1121 headers = {'content-type': 'application/json'}
1122 response = requests.request(
1123 "GET", url, headers=headers, auth=('admin', 'admin'))
1124 res = response.json()
1125 #Tests related to nodes
1126 self.assertEqual(response.status_code, requests.codes.ok)
1127 nbNode=len(res['network'][0]['node'])
1128 self.assertEqual(nbNode,5)
1129 listNode=['XPDR-A1-XPDR1','ROADM-A1-SRG1', 'ROADM-A1-SRG3','ROADM-A1-DEG1','ROADM-A1-DEG2']
1130 for i in range(0,nbNode):
1131 nodeType=res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
1132 nodeId=res['network'][0]['node'][i]['node-id']
1133 #Tests related to XPDRA nodes
1134 if(nodeId=='XPDR-A1-XPDR1'):
1135 nbTp = len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1136 for j in range(0, nbTp):
1137 tpid = res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
1138 if (tpid == 'XPDR1-CLIENT1'):
1139 self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]
1140 ['org-openroadm-common-network:tp-type'], 'XPONDER-CLIENT')
1141 if (tpid == 'XPDR1-NETWORK1'):
1142 self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]
1143 ['org-openroadm-common-network:tp-type'], 'XPONDER-NETWORK')
1144 self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]
1145 ['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'],
1146 'ROADM-A1-SRG1--SRG1-PP1-TXRX')
1147 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDR-A1'},
1148 res['network'][0]['node'][i]['supporting-node'])
1149 listNode.remove(nodeId)
1150 elif(nodeId=='ROADM-A1-SRG1'):
1151 #Test related to SRG1
1152 self.assertEqual(nodeType,'SRG')
1153 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),5)
1154 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
1155 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1156 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
1157 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1158 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
1159 res['network'][0]['node'][i]['supporting-node'])
1160 listNode.remove(nodeId)
1161 elif(nodeId=='ROADM-A1-SRG3'):
1162 #Test related to SRG1
1163 self.assertEqual(nodeType,'SRG')
1164 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),5)
1165 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
1166 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1167 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
1168 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1169 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
1170 res['network'][0]['node'][i]['supporting-node'])
1171 listNode.remove(nodeId)
1172 elif(nodeId=='ROADM-A1-DEG1'):
1173 #Test related to DEG1
1174 self.assertEqual(nodeType,'DEGREE')
1175 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
1176 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1177 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
1178 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1179 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
1180 res['network'][0]['node'][i]['supporting-node'])
1181 listNode.remove(nodeId)
1182 elif(nodeId=='ROADM-A1-DEG2'):
1183 #Test related to DEG2
1184 self.assertEqual(nodeType,'DEGREE')
1185 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
1186 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1187 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
1188 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1189 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
1190 res['network'][0]['node'][i]['supporting-node'])
1191 listNode.remove(nodeId)
1193 self.assertFalse(True)
1194 self.assertEqual(len(listNode),0)
1195 #Test related to SRG1 of ROADMC
1196 for i in range(0,nbNode):
1197 self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'ROADM-C1-SRG1')
1198 self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'ROADM-C1-DEG1')
1199 self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'ROADM-C1-DEG2')
1201 def test_33_getOpenRoadmNetwork(self):
1202 url = ("{}/config/ietf-network:networks/network/openroadm-network"
1203 .format(self.restconf_baseurl))
1204 headers = {'content-type': 'application/json'}
1205 response = requests.request(
1206 "GET", url, headers=headers, auth=('admin', 'admin'))
1207 self.assertEqual(response.status_code, requests.codes.ok)
1208 res = response.json()
1209 nbNode=len(res['network'][0]['node'])
1210 self.assertEqual(nbNode,2)
1211 for i in range(0,nbNode-1):
1212 self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'ROADM-C1')
1213 self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'ROADM-B1')
1215 def test_34_getClliNetwork(self):
1216 url = ("{}/config/ietf-network:networks/network/clli-network"
1217 .format(self.restconf_baseurl))
1218 headers = {'content-type': 'application/json'}
1219 response = requests.request(
1220 "GET", url, headers=headers, auth=('admin', 'admin'))
1221 self.assertEqual(response.status_code, requests.codes.ok)
1222 res = response.json()
1223 nbNode=len(res['network'][0]['node'])
1224 self.assertEqual(nbNode,1)
1225 for i in range(0,nbNode-1):
1226 self.assertNotEqual(res['network'][0]['node'][1]['org-openroadm-clli-network:clli'],'NodeC')
1228 def test_35_disconnect_XPDRA(self):
1229 url = ("{}/config/network-topology:"
1230 "network-topology/topology/topology-netconf/node/XPDR-A1"
1231 .format(self.restconf_baseurl))
1233 headers = {'content-type': 'application/json'}
1234 response = requests.request(
1235 "DELETE", url, data=json.dumps(data), headers=headers,
1236 auth=('admin', 'admin'))
1237 self.assertEqual(response.status_code, requests.codes.ok)
1239 def test_36_getClliNetwork(self):
1240 url = ("{}/config/ietf-network:networks/network/clli-network"
1241 .format(self.restconf_baseurl))
1242 headers = {'content-type': 'application/json'}
1243 response = requests.request(
1244 "GET", url, headers=headers, auth=('admin', 'admin'))
1245 self.assertEqual(response.status_code, requests.codes.ok)
1246 res = response.json()
1247 nbNode=len(res['network'][0]['node'])
1248 self.assertEqual(nbNode,1)
1249 self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'],'NodeA')
1251 def test_37_getOpenRoadmNetwork(self):
1252 url = ("{}/config/ietf-network:networks/network/openroadm-network"
1253 .format(self.restconf_baseurl))
1254 headers = {'content-type': 'application/json'}
1255 response = requests.request(
1256 "GET", url, headers=headers, auth=('admin', 'admin'))
1257 self.assertEqual(response.status_code, requests.codes.ok)
1258 res = response.json()
1259 nbNode=len(res['network'][0]['node'])
1260 self.assertEqual(nbNode,1)
1261 for i in range(0,nbNode):
1262 self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'XPDR-A1')
1264 def test_38_getNodes_OpenRoadmTopology(self):
1265 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1266 .format(self.restconf_baseurl))
1267 headers = {'content-type': 'application/json'}
1268 response = requests.request(
1269 "GET", url, headers=headers, auth=('admin', 'admin'))
1270 res = response.json()
1271 #Tests related to nodes
1272 self.assertEqual(response.status_code, requests.codes.ok)
1273 nbNode=len(res['network'][0]['node'])
1274 self.assertEqual(nbNode,4)
1275 listNode=['ROADM-A1-SRG1','ROADM-A1-SRG3','ROADM-A1-DEG1','ROADM-A1-DEG2']
1276 for i in range(0,nbNode):
1277 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
1278 res['network'][0]['node'][i]['supporting-node'])
1279 nodeType=res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
1280 nodeId=res['network'][0]['node'][i]['node-id']
1281 if(nodeId=='ROADM-A1-SRG1'):
1282 #Test related to SRG1
1283 self.assertEqual(nodeType,'SRG')
1284 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),5)
1285 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
1286 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1287 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
1288 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1289 listNode.remove(nodeId)
1290 elif(nodeId=='ROADM-A1-SRG3'):
1291 #Test related to SRG1
1292 self.assertEqual(nodeType,'SRG')
1293 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),5)
1294 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
1295 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1296 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
1297 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1298 listNode.remove(nodeId)
1299 elif(nodeId=='ROADM-A1-DEG1'):
1300 #Test related to DEG1
1301 self.assertEqual(nodeType,'DEGREE')
1302 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
1303 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1304 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
1305 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1306 listNode.remove(nodeId)
1307 elif(nodeId=='ROADM-A1-DEG2'):
1308 #Test related to DEG2
1309 self.assertEqual(nodeType,'DEGREE')
1310 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
1311 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1312 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
1313 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1314 listNode.remove(nodeId)
1316 self.assertFalse(True)
1317 self.assertEqual(len(listNode),0)
1319 def test_39_disconnect_ROADM_XPDRA_link(self):
1321 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
1322 "link/XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX"
1323 .format(self.restconf_baseurl))
1325 headers = {'content-type': 'application/json'}
1326 response = requests.request(
1327 "DELETE", url, data=json.dumps(data), headers=headers,
1328 auth=('admin', 'admin'))
1329 self.assertEqual(response.status_code, requests.codes.ok)
1331 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
1332 "link/ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1"
1333 .format(self.restconf_baseurl))
1335 headers = {'content-type': 'application/json'}
1336 response = requests.request(
1337 "DELETE", url, data=json.dumps(data), headers=headers,
1338 auth=('admin', 'admin'))
1339 self.assertEqual(response.status_code, requests.codes.ok)
1341 def test_40_getLinks_OpenRoadmTopology(self):
1342 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1343 .format(self.restconf_baseurl))
1344 headers = {'content-type': 'application/json'}
1345 response = requests.request(
1346 "GET", url, headers=headers, auth=('admin', 'admin'))
1347 self.assertEqual(response.status_code, requests.codes.ok)
1348 res = response.json()
1349 nbLink=len(res['network'][0]['ietf-network-topology:link'])
1350 self.assertEqual(nbLink, 16)
1351 expressLink=['ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX','ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX']
1352 addLink=['ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX','ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
1353 'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX','ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX']
1354 dropLink=['ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX','ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
1355 'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX','ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX']
1356 roadmtoroadmLink = 0
1357 for i in range(0,nbLink):
1358 if (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']=='EXPRESS-LINK'):
1359 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1360 find= link_id in expressLink
1361 self.assertEqual(find, True)
1362 expressLink.remove(link_id)
1363 elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']=='ADD-LINK'):
1364 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1365 find= link_id in addLink
1366 self.assertEqual(find, True)
1367 addLink.remove(link_id)
1368 elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']=='DROP-LINK'):
1369 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1370 find= link_id in dropLink
1371 self.assertEqual(find, True)
1372 dropLink.remove(link_id)
1374 roadmtoroadmLink += 1
1375 self.assertEqual(len(expressLink),0)
1376 self.assertEqual(len(addLink),0)
1377 self.assertEqual(len(dropLink),0)
1378 self.assertEqual(roadmtoroadmLink, 6)
1379 for i in range(0,nbLink):
1380 self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'],'XPONDER-OUTPUT')
1381 self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'],'XPONDER-INPUT')
1383 def test_41_disconnect_ROADMA(self):
1384 url = ("{}/config/network-topology:"
1385 "network-topology/topology/topology-netconf/node/ROADM-A1"
1386 .format(self.restconf_baseurl))
1388 headers = {'content-type': 'application/json'}
1389 response = requests.request(
1390 "DELETE", url, data=json.dumps(data), headers=headers,
1391 auth=('admin', 'admin'))
1392 self.assertEqual(response.status_code, requests.codes.ok)
1393 #Delete in the clli-network
1394 url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeA"
1395 .format(self.restconf_baseurl))
1397 headers = {'content-type': 'application/json'}
1398 response = requests.request(
1399 "DELETE", url, data=json.dumps(data), headers=headers,
1400 auth=('admin', 'admin'))
1401 self.assertEqual(response.status_code, requests.codes.ok)
1403 def test_42_getClliNetwork(self):
1404 url = ("{}/config/ietf-network:networks/network/clli-network"
1405 .format(self.restconf_baseurl))
1406 headers = {'content-type': 'application/json'}
1407 response = requests.request(
1408 "GET", url, headers=headers, auth=('admin', 'admin'))
1409 self.assertEqual(response.status_code, requests.codes.ok)
1410 res = response.json()
1411 self.assertNotIn('node', res['network'][0])
1413 def test_43_getOpenRoadmNetwork(self):
1414 url = ("{}/config/ietf-network:networks/network/openroadm-network"
1415 .format(self.restconf_baseurl))
1416 headers = {'content-type': 'application/json'}
1417 response = requests.request(
1418 "GET", url, headers=headers, auth=('admin', 'admin'))
1419 self.assertEqual(response.status_code, requests.codes.ok)
1420 res = response.json()
1421 self.assertNotIn('node', res['network'][0])
1423 def test_44_check_roadm2roadm_link_persistence(self):
1424 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1425 .format(self.restconf_baseurl))
1426 headers = {'content-type': 'application/json'}
1427 response = requests.request(
1428 "GET", url, headers=headers, auth=('admin', 'admin'))
1429 self.assertEqual(response.status_code, requests.codes.ok)
1430 res = response.json()
1431 nbLink=len(res['network'][0]['ietf-network-topology:link'])
1432 self.assertNotIn('node', res['network'][0])
1433 self.assertEqual(nbLink, 6)
1435 if __name__ == "__main__":
1436 unittest.main(verbosity=2)