3 ##############################################################################
4 #Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
23 class TransportPCETopologyTesting(unittest.TestCase):
25 honeynode_process1 = None
26 honeynode_process2 = None
27 honeynode_process3 = None
28 honeynode_process4 = None
30 restconf_baseurl = "http://localhost:8181/restconf"
32 #START_IGNORE_XTESTING
35 def __init_logfile(cls):
36 if not os.path.exists("transportpce_tests/log"):
37 os.makedirs("transportpce_tests/log")
38 if os.path.isfile("./transportpce_tests/log/response.log"):
39 os.remove("transportpce_tests/log/response.log")
42 def __start_honeynode1(cls):
43 executable = ("./honeynode/2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
44 "/honeynode-distribution-1.18.01/honeycomb-tpce")
45 if os.path.isfile(executable):
46 with open('honeynode1.log', 'w') as outfile:
47 cls.honeynode_process1 = subprocess.Popen(
48 [executable, "17830", "sample_configs/openroadm/2.1/oper-XPDRA.xml"],
52 def __start_honeynode2(cls):
53 executable = ("./honeynode/2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
54 "/honeynode-distribution-1.18.01/honeycomb-tpce")
55 if os.path.isfile(executable):
56 with open('honeynode2.log', 'w') as outfile:
57 cls.honeynode_process2 = subprocess.Popen(
58 [executable, "17831", "sample_configs/openroadm/2.1/oper-ROADMA.xml"],
62 def __start_honeynode3(cls):
63 executable = ("./honeynode/2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
64 "/honeynode-distribution-1.18.01/honeycomb-tpce")
65 if os.path.isfile(executable):
66 with open('honeynode3.log', 'w') as outfile:
67 cls.honeynode_process3 = subprocess.Popen(
68 [executable, "17832", "sample_configs/openroadm/2.1/oper-ROADMB.xml"],
72 def __start_honeynode4(cls):
73 executable = ("./honeynode/2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
74 "/honeynode-distribution-1.18.01/honeycomb-tpce")
75 if os.path.isfile(executable):
76 with open('honeynode4.log', 'w') as outfile:
77 cls.honeynode_process4 = subprocess.Popen(
78 [executable, "17833", "sample_configs/openroadm/2.1/oper-ROADMC.xml"],
83 executable = "../karaf/target/assembly/bin/karaf"
84 with open('odl.log', 'w') as outfile:
85 cls.odl_process = subprocess.Popen(
86 ["bash", executable, "server"], stdout=outfile,
87 stdin=open(os.devnull))
93 cls.__start_honeynode1()
95 cls.__start_honeynode2()
97 cls.__start_honeynode3()
99 cls.__start_honeynode4()
105 def tearDownClass(cls):
106 for child in psutil.Process(cls.odl_process.pid).children():
107 child.send_signal(signal.SIGINT)
109 cls.odl_process.send_signal(signal.SIGINT)
110 cls.odl_process.wait()
111 for child in psutil.Process(cls.honeynode_process1.pid).children():
112 child.send_signal(signal.SIGINT)
114 cls.honeynode_process1.send_signal(signal.SIGINT)
115 cls.honeynode_process1.wait()
116 for child in psutil.Process(cls.honeynode_process2.pid).children():
117 child.send_signal(signal.SIGINT)
119 cls.honeynode_process2.send_signal(signal.SIGINT)
120 cls.honeynode_process2.wait()
122 for child in psutil.Process(cls.honeynode_process3.pid).children():
123 child.send_signal(signal.SIGINT)
125 cls.honeynode_process3.send_signal(signal.SIGINT)
126 cls.honeynode_process3.wait()
128 for child in psutil.Process(cls.honeynode_process4.pid).children():
129 child.send_signal(signal.SIGINT)
131 cls.honeynode_process4.send_signal(signal.SIGINT)
132 cls.honeynode_process4.wait()
135 print ("execution of {}".format(self.id().split(".")[-1]))
140 def test_01_connect_ROADMA(self):
142 url = ("{}/config/network-topology:"
143 "network-topology/topology/topology-netconf/node/ROADMA"
144 .format(self.restconf_baseurl))
147 "netconf-node-topology:username": "admin",
148 "netconf-node-topology:password": "admin",
149 "netconf-node-topology:host": "127.0.0.1",
150 "netconf-node-topology:port": "17831",
151 "netconf-node-topology:tcp-only": "false",
152 "netconf-node-topology:pass-through": {}}]}
153 headers = {'content-type': 'application/json'}
154 response = requests.request(
155 "PUT", url, data=json.dumps(data), headers=headers,
156 auth=('admin', 'admin'))
157 self.assertEqual(response.status_code, requests.codes.created)
160 def test_02_getClliNetwork(self):
161 url = ("{}/config/ietf-network:networks/network/clli-network"
162 .format(self.restconf_baseurl))
163 headers = {'content-type': 'application/json'}
164 response = requests.request(
165 "GET", url, headers=headers, auth=('admin', 'admin'))
166 self.assertEqual(response.status_code, requests.codes.ok)
167 res = response.json()
168 self.assertEqual(res['network'][0]['node'][0]['node-id'],'NodeA')
169 self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'],'NodeA')
171 def test_03_getOpenRoadmNetwork(self):
172 url = ("{}/config/ietf-network:networks/network/openroadm-network"
173 .format(self.restconf_baseurl))
174 headers = {'content-type': 'application/json'}
175 response = requests.request(
176 "GET", url, headers=headers, auth=('admin', 'admin'))
177 self.assertEqual(response.status_code, requests.codes.ok)
178 res = response.json()
179 self.assertEqual(res['network'][0]['node'][0]['node-id'],'ROADMA')
180 self.assertEqual(res['network'][0]['node'][0]['supporting-node'][0]['network-ref'],'clli-network')
181 self.assertEqual(res['network'][0]['node'][0]['supporting-node'][0]['node-ref'],'NodeA')
182 self.assertEqual(res['network'][0]['node'][0]['org-openroadm-network:node-type'],'ROADM')
183 self.assertEqual(res['network'][0]['node'][0]['org-openroadm-network:model'],'2')
185 def test_04_getLinks_OpenroadmTopology(self):
186 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
187 .format(self.restconf_baseurl))
188 headers = {'content-type': 'application/json'}
189 response = requests.request(
190 "GET", url, headers=headers, auth=('admin', 'admin'))
191 self.assertEqual(response.status_code, requests.codes.ok)
192 res = response.json()
193 #Tests related to links
194 nbLink=len(res['network'][0]['ietf-network-topology:link'])
195 self.assertEqual(nbLink,6)
196 expressLink=['ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX','ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX']
197 addLink=['ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX','ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX']
198 dropLink=['ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX','ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX']
199 for i in range(0,nbLink):
200 linkId = res['network'][0]['ietf-network-topology:link'][i]['link-id']
201 if (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='EXPRESS-LINK'):
202 find= linkId in expressLink
203 self.assertEqual(find, True)
204 expressLink.remove(linkId)
205 elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='ADD-LINK'):
206 find= linkId in addLink
207 self.assertEqual(find, True)
208 addLink.remove(linkId)
209 elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='DROP-LINK'):
210 find= linkId in dropLink
211 self.assertEqual(find, True)
212 dropLink.remove(linkId)
214 self.assertFalse(True)
215 self.assertEqual(len(expressLink),0)
216 self.assertEqual(len(addLink),0)
217 self.assertEqual(len(dropLink),0)
219 def test_05_getNodes_OpenRoadmTopology(self):
220 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
221 .format(self.restconf_baseurl))
222 headers = {'content-type': 'application/json'}
223 response = requests.request(
224 "GET", url, headers=headers, auth=('admin', 'admin'))
225 res = response.json()
226 #Tests related to nodes
227 self.assertEqual(response.status_code, requests.codes.ok)
228 with open('./transportpce_tests/log/response.log', 'a') as outfile1:
229 outfile1.write(str(len(res['network'][0]['node'])))
230 nbNode=len(res['network'][0]['node'])
231 self.assertEqual(nbNode,3)
232 listNode=['ROADMA-SRG1','ROADMA-DEG1','ROADMA-DEG2']
233 for i in range(0,nbNode):
234 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
235 res['network'][0]['node'][i]['supporting-node'])
236 nodeType=res['network'][0]['node'][i]['org-openroadm-network-topology:node-type']
237 nodeId=res['network'][0]['node'][i]['node-id']
238 if(nodeId=='ROADMA-SRG1'):
239 #Test related to SRG1
240 self.assertEqual(nodeType,'SRG')
241 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),17)
242 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-CP'},
243 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
244 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-PP'},
245 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
246 listNode.remove(nodeId)
247 elif(nodeId=='ROADMA-DEG1'):
248 #Test related to DEG1
249 self.assertEqual(nodeType,'DEGREE')
250 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
251 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
252 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
253 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
254 listNode.remove(nodeId)
255 elif(nodeId=='ROADMA-DEG2'):
256 #Test related to DEG2
257 self.assertEqual(nodeType,'DEGREE')
258 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
259 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
260 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
261 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
262 listNode.remove(nodeId)
264 self.assertFalse(True)
265 self.assertEqual(len(listNode),0)
267 def test_06_connect_XPDRA(self):
268 url = ("{}/config/network-topology:"
269 "network-topology/topology/topology-netconf/node/XPDRA"
270 .format(self.restconf_baseurl))
273 "netconf-node-topology:username": "admin",
274 "netconf-node-topology:password": "admin",
275 "netconf-node-topology:host": "127.0.0.1",
276 "netconf-node-topology:port": "17830",
277 "netconf-node-topology:tcp-only": "false",
278 "netconf-node-topology:pass-through": {}}]}
279 headers = {'content-type': 'application/json'}
280 response = requests.request(
281 "PUT", url, data=json.dumps(data), headers=headers,
282 auth=('admin', 'admin'))
283 self.assertEqual(response.status_code, requests.codes.created)
286 def test_07_getClliNetwork(self):
287 url = ("{}/config/ietf-network:networks/network/clli-network"
288 .format(self.restconf_baseurl))
289 headers = {'content-type': 'application/json'}
290 response = requests.request(
291 "GET", url, headers=headers, auth=('admin', 'admin'))
292 self.assertEqual(response.status_code, requests.codes.ok)
293 res = response.json()
294 self.assertEqual(res['network'][0]['node'][0]['node-id'],'NodeA')
295 self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'],'NodeA')
297 def test_08_getOpenRoadmNetwork(self):
298 url = ("{}/config/ietf-network:networks/network/openroadm-network"
299 .format(self.restconf_baseurl))
300 headers = {'content-type': 'application/json'}
301 response = requests.request(
302 "GET", url, headers=headers, auth=('admin', 'admin'))
303 self.assertEqual(response.status_code, requests.codes.ok)
304 res = response.json()
305 nbNode=len(res['network'][0]['node'])
306 self.assertEqual(nbNode,2)
307 for i in range(0,nbNode):
308 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['network-ref'],'clli-network')
309 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'],'NodeA')
310 nodeId=res['network'][0]['node'][i]['node-id']
312 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:node-type'],'XPONDER')
313 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'],'1')
314 elif(nodeId=='ROADMA'):
315 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:node-type'],'ROADM')
316 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'],'2')
318 self.assertFalse(True)
320 def test_09_getNodes_OpenRoadmTopology(self):
321 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
322 .format(self.restconf_baseurl))
323 headers = {'content-type': 'application/json'}
324 response = requests.request(
325 "GET", url, headers=headers, auth=('admin', 'admin'))
326 res = response.json()
327 #Tests related to nodes
328 self.assertEqual(response.status_code, requests.codes.ok)
329 with open('./transportpce_tests/log/response.log', 'a') as outfile1:
330 outfile1.write(str(len(res['network'][0]['node'])))
331 nbNode=len(res['network'][0]['node'])
332 self.assertEqual(nbNode,4)
333 listNode=['XPDRA-XPDR1','ROADMA-SRG1','ROADMA-DEG1','ROADMA-DEG2']
334 for i in range(0,nbNode):
335 nodeType=res['network'][0]['node'][i]['org-openroadm-network-topology:node-type']
336 nodeId=res['network'][0]['node'][i]['node-id']
337 #Tests related to XPDRA nodes
338 if(nodeId=='XPDRA-XPDR1'):
339 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA'},
340 res['network'][0]['node'][i]['supporting-node'])
341 self.assertEqual(nodeType,'XPONDER')
342 nbTps=len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
343 self.assertTrue(nbTps >= 2)
346 for j in range(0,nbTps):
347 tpType=res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['org-openroadm-network-topology:tp-type']
348 if (tpType=='XPONDER-CLIENT'):
350 elif (tpType=='XPONDER-NETWORK'):
352 self.assertTrue(client > 0)
353 self.assertTrue(network > 0)
354 listNode.remove(nodeId)
355 elif(nodeId=='ROADMA-SRG1'):
356 #Test related to SRG1
357 self.assertEqual(nodeType,'SRG')
358 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),17)
359 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-CP'},
360 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
361 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-PP'},
362 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
363 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
364 res['network'][0]['node'][i]['supporting-node'])
365 listNode.remove(nodeId)
366 elif(nodeId=='ROADMA-DEG1'):
367 #Test related to DEG1
368 self.assertEqual(nodeType,'DEGREE')
369 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
370 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
371 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
372 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
373 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
374 res['network'][0]['node'][i]['supporting-node'])
375 listNode.remove(nodeId)
376 elif(nodeId=='ROADMA-DEG2'):
377 #Test related to DEG2
378 self.assertEqual(nodeType,'DEGREE')
379 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
380 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
381 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
382 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
383 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
384 res['network'][0]['node'][i]['supporting-node'])
385 listNode.remove(nodeId)
387 self.assertFalse(True)
388 self.assertEqual(len(listNode),0)
390 #Connect the tail XPDRA to ROADMA and vice versa
391 def test_10_connect_tail_xpdr_rdm(self):
392 #Connect the tail: XPDRA to ROADMA
393 url = ("{}/operations/transportpce-networkutils:init-xpdr-rdm-links"
394 .format(self.restconf_baseurl))
395 data = {"networkutils:input": {
396 "networkutils:links-input": {
397 "networkutils:xpdr-node": "XPDRA",
398 "networkutils:xpdr-num": "1",
399 "networkutils:network-num": "1",
400 "networkutils:rdm-node": "ROADMA",
401 "networkutils:srg-num": "1",
402 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
406 headers = {'content-type': 'application/json'}
407 response = requests.request(
408 "POST", url, data=json.dumps(data), headers=headers,
409 auth=('admin', 'admin'))
410 self.assertEqual(response.status_code, requests.codes.ok)
413 def test_11_connect_tail_rdm_xpdr(self):
414 #Connect the tail: ROADMA to XPDRA
415 url = ("{}/operations/transportpce-networkutils:init-rdm-xpdr-links"
416 .format(self.restconf_baseurl))
417 data = {"networkutils:input": {
418 "networkutils:links-input": {
419 "networkutils:xpdr-node": "XPDRA",
420 "networkutils:xpdr-num": "1",
421 "networkutils:network-num": "1",
422 "networkutils:rdm-node": "ROADMA",
423 "networkutils:srg-num": "1",
424 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
428 headers = {'content-type': 'application/json'}
429 response = requests.request(
430 "POST", url, data=json.dumps(data), headers=headers,
431 auth=('admin', 'admin'))
432 self.assertEqual(response.status_code, requests.codes.ok)
435 def test_12_getLinks_OpenRoadmTopology(self):
436 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
437 .format(self.restconf_baseurl))
438 headers = {'content-type': 'application/json'}
439 response = requests.request(
440 "GET", url, headers=headers, auth=('admin', 'admin'))
441 self.assertEqual(response.status_code, requests.codes.ok)
442 res = response.json()
443 #Tests related to links
444 nbLink=len(res['network'][0]['ietf-network-topology:link'])
445 self.assertEqual(nbLink,8)
446 expressLink=['ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX','ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX']
447 addLink=['ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX','ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX',]
448 dropLink=['ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX','ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX']
449 XPDR_IN=['ROADMA-SRG1-SRG1-PP1-TXRXtoXPDRA-XPDR1-XPDR1-NETWORK1']
450 XPDR_OUT=['XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX']
451 for i in range(0,nbLink):
452 nodeType=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']
453 linkId=res['network'][0]['ietf-network-topology:link'][i]['link-id']
454 if(nodeType=='EXPRESS-LINK'):
455 find= linkId in expressLink
456 self.assertEqual(find, True)
457 expressLink.remove(linkId)
458 elif(nodeType=='ADD-LINK'):
459 find= linkId in addLink
460 self.assertEqual(find, True)
461 addLink.remove(linkId)
462 elif(nodeType=='DROP-LINK'):
463 find= linkId in dropLink
464 self.assertEqual(find, True)
465 dropLink.remove(linkId)
466 elif(nodeType=='XPONDER-INPUT'):
467 find= linkId in XPDR_IN
468 self.assertEqual(find, True)
469 XPDR_IN.remove(linkId)
470 elif(nodeType=='XPONDER-OUTPUT'):
471 find= linkId in XPDR_OUT
472 self.assertEqual(find, True)
473 XPDR_OUT.remove(linkId)
475 self.assertFalse(True)
476 self.assertEqual(len(expressLink),0)
477 self.assertEqual(len(addLink),0)
478 self.assertEqual(len(dropLink),0)
479 self.assertEqual(len(XPDR_IN),0)
480 self.assertEqual(len(XPDR_OUT),0)
482 def test_13_connect_ROADMC(self):
484 url = ("{}/config/network-topology:"
485 "network-topology/topology/topology-netconf/node/ROADMC"
486 .format(self.restconf_baseurl))
489 "netconf-node-topology:username": "admin",
490 "netconf-node-topology:password": "admin",
491 "netconf-node-topology:host": "127.0.0.1",
492 "netconf-node-topology:port": "17833",
493 "netconf-node-topology:tcp-only": "false",
494 "netconf-node-topology:pass-through": {}}]}
495 headers = {'content-type': 'application/json'}
496 response = requests.request(
497 "PUT", url, data=json.dumps(data), headers=headers,
498 auth=('admin', 'admin'))
499 self.assertEqual(response.status_code, requests.codes.created)
502 def test_14_omsAttributes_ROADMA_ROADMC(self):
503 # Config ROADMA-ROADMC oms-attributes
504 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
505 "link/ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
506 "OMS-attributes/span"
507 .format(self.restconf_baseurl))
510 "auto-spanloss": "true",
511 "engineered-spanloss": 12.2,
512 "link-concatenation": [{
515 "SRLG-length": 100000,
517 headers = {'content-type': 'application/json'}
518 response = requests.request(
519 "PUT", url, data=json.dumps(data), headers=headers,
520 auth=('admin', 'admin'))
521 self.assertEqual(response.status_code, requests.codes.created)
523 def test_15_omsAttributes_ROADMC_ROADMA(self):
524 # Config ROADMC-ROADMA oms-attributes
525 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
526 "link/ROADMC-DEG2-DEG2-TTP-TXRXtoROADMA-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
527 "OMS-attributes/span"
528 .format(self.restconf_baseurl))
531 "auto-spanloss": "true",
532 "engineered-spanloss": 12.2,
533 "link-concatenation": [{
536 "SRLG-length": 100000,
538 headers = {'content-type': 'application/json'}
539 response = requests.request(
540 "PUT", url, data=json.dumps(data), headers=headers,
541 auth=('admin', 'admin'))
542 self.assertEqual(response.status_code, requests.codes.created)
544 def test_16_getClliNetwork(self):
545 url = ("{}/config/ietf-network:networks/network/clli-network"
546 .format(self.restconf_baseurl))
547 headers = {'content-type': 'application/json'}
548 response = requests.request(
549 "GET", url, headers=headers, auth=('admin', 'admin'))
550 self.assertEqual(response.status_code, requests.codes.ok)
551 res = response.json()
552 nbNode=len(res['network'][0]['node'])
553 listNode=['NodeA','NodeC']
554 for i in range(0,nbNode):
555 nodeId = res['network'][0]['node'][i]['node-id']
556 find= nodeId in listNode
557 self.assertEqual(find, True)
559 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'],'NodeA')
561 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'],'NodeC')
562 listNode.remove(nodeId)
564 self.assertEqual(len(listNode),0)
566 def test_17_getOpenRoadmNetwork(self):
567 url = ("{}/config/ietf-network:networks/network/openroadm-network"
568 .format(self.restconf_baseurl))
569 headers = {'content-type': 'application/json'}
570 response = requests.request(
571 "GET", url, headers=headers, auth=('admin', 'admin'))
572 self.assertEqual(response.status_code, requests.codes.ok)
573 res = response.json()
574 nbNode=len(res['network'][0]['node'])
575 self.assertEqual(nbNode,3)
576 listNode=['XPDRA','ROADMA','ROADMC']
577 for i in range(0,nbNode):
578 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['network-ref'],'clli-network')
579 nodeId=res['network'][0]['node'][i]['node-id']
581 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'],'NodeA')
582 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:node-type'],'XPONDER')
583 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'],'1')
584 listNode.remove(nodeId)
585 elif(nodeId=='ROADMA'):
586 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'],'NodeA')
587 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:node-type'],'ROADM')
588 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'],'2')
589 listNode.remove(nodeId)
590 elif(nodeId=='ROADMC'):
591 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'],'NodeC')
592 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:node-type'],'ROADM')
593 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'],'2')
594 listNode.remove(nodeId)
596 self.assertFalse(True)
597 self.assertEqual(len(listNode),0)
599 def test_18_getROADMLinkOpenRoadmTopology(self):
600 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
601 .format(self.restconf_baseurl))
602 headers = {'content-type': 'application/json'}
603 response = requests.request(
604 "GET", url, headers=headers, auth=('admin', 'admin'))
605 self.assertEqual(response.status_code, requests.codes.ok)
606 res = response.json()
607 #Tests related to links
608 nbLink=len(res['network'][0]['ietf-network-topology:link'])
609 self.assertEqual(nbLink,16)
610 expressLink=['ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX','ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX',
611 'ROADMC-DEG2-DEG2-CTP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX','ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-DEG2-DEG2-CTP-TXRX']
612 addLink=['ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX','ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX',
613 'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG2-DEG2-CTP-TXRX','ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX']
614 dropLink=['ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX','ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX',
615 'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX','ROADMC-DEG2-DEG2-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX']
616 R2RLink=['ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX','ROADMC-DEG2-DEG2-TTP-TXRXtoROADMA-DEG1-DEG1-TTP-TXRX']
617 XPDR_IN=['ROADMA-SRG1-SRG1-PP1-TXRXtoXPDRA-XPDR1-XPDR1-NETWORK1']
618 XPDR_OUT=['XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX']
619 for i in range(0,nbLink):
620 nodeType=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']
621 linkId=res['network'][0]['ietf-network-topology:link'][i]['link-id']
622 if(nodeType=='EXPRESS-LINK'):
623 find= linkId in expressLink
624 self.assertEqual(find, True)
625 expressLink.remove(linkId)
626 elif(nodeType=='ADD-LINK'):
627 find= linkId in addLink
628 self.assertEqual(find, True)
629 addLink.remove(linkId)
630 elif(nodeType=='DROP-LINK'):
631 find= linkId in dropLink
632 self.assertEqual(find, True)
633 dropLink.remove(linkId)
634 elif(nodeType=='ROADM-TO-ROADM'):
635 find= linkId in R2RLink
636 self.assertEqual(find, True)
637 R2RLink.remove(linkId)
638 elif(nodeType=='XPONDER-INPUT'):
639 find= linkId in XPDR_IN
640 self.assertEqual(find, True)
641 XPDR_IN.remove(linkId)
642 elif(nodeType=='XPONDER-OUTPUT'):
643 find= linkId in XPDR_OUT
644 self.assertEqual(find, True)
645 XPDR_OUT.remove(linkId)
647 self.assertFalse(True)
648 self.assertEqual(len(expressLink),0)
649 self.assertEqual(len(addLink),0)
650 self.assertEqual(len(dropLink),0)
651 self.assertEqual(len(R2RLink),0)
652 self.assertEqual(len(XPDR_IN),0)
653 self.assertEqual(len(XPDR_OUT),0)
655 def test_19_getLinkOmsAttributesOpenRoadmTopology(self):
656 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
657 .format(self.restconf_baseurl))
658 headers = {'content-type': 'application/json'}
659 response = requests.request(
660 "GET", url, headers=headers, auth=('admin', 'admin'))
661 self.assertEqual(response.status_code, requests.codes.ok)
662 res = response.json()
663 #Tests related to links
664 nbLink=len(res['network'][0]['ietf-network-topology:link'])
665 self.assertEqual(nbLink,16)
666 R2RLink=['ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX',
667 'ROADMC-DEG2-DEG2-TTP-TXRXtoROADMA-DEG1-DEG1-TTP-TXRX']
668 for i in range(0,nbLink):
669 nodeType = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']
670 link_id=res['network'][0]['ietf-network-topology:link'][i]['link-id']
671 if(link_id in R2RLink):
673 spanLoss = res['network'][0]['ietf-network-topology:link'][i][
674 'org-openroadm-network-topology:OMS-attributes']['span']["engineered-spanloss"]
675 length = res['network'][0]['ietf-network-topology:link'][i][
676 'org-openroadm-network-topology:OMS-attributes']['span']['link-concatenation'][0]['SRLG-length']
677 if((spanLoss!=None)&(length!=None)):
679 self.assertTrue(find)
680 R2RLink.remove(link_id)
681 self.assertEqual(len(R2RLink),0)
683 def test_20_getNodes_OpenRoadmTopology(self):
684 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
685 .format(self.restconf_baseurl))
686 headers = {'content-type': 'application/json'}
687 response = requests.request(
688 "GET", url, headers=headers, auth=('admin', 'admin'))
689 res = response.json()
690 #Tests related to nodes
691 self.assertEqual(response.status_code, requests.codes.ok)
692 nbNode=len(res['network'][0]['node'])
693 self.assertEqual(nbNode,7)
694 listNode=['XPDRA-XPDR1',
695 'ROADMA-SRG1','ROADMA-DEG1','ROADMA-DEG2',
696 'ROADMC-SRG1','ROADMC-DEG1','ROADMC-DEG2']
697 #************************Tests related to XPDRA nodes
698 for i in range(0,nbNode):
699 nodeType=res['network'][0]['node'][i]['org-openroadm-network-topology:node-type']
700 nodeId=res['network'][0]['node'][i]['node-id']
701 if(nodeId=='XPDRA-XPDR1'):
702 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA'},
703 res['network'][0]['node'][i]['supporting-node'])
704 self.assertEqual(nodeType,'XPONDER')
705 nbTps=len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
706 self.assertTrue(nbTps >= 2)
709 for j in range(0,nbTps):
710 tpType=res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['org-openroadm-network-topology:tp-type']
711 if (tpType=='XPONDER-CLIENT'):
713 elif (tpType=='XPONDER-NETWORK'):
715 self.assertTrue(client > 0)
716 self.assertTrue(network > 0)
717 listNode.remove(nodeId)
718 elif(nodeId=='ROADMA-SRG1'):
719 #Test related to SRG1
720 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),17)
721 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-CP'},
722 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
723 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-PP'},
724 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
725 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
726 res['network'][0]['node'][i]['supporting-node'])
727 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network-topology:node-type'],'SRG')
728 listNode.remove(nodeId)
729 elif(nodeId=='ROADMA-DEG1'):
730 #Test related to DEG1
731 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
732 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
733 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
734 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
735 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
736 res['network'][0]['node'][i]['supporting-node'])
737 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network-topology:node-type'],'DEGREE')
738 listNode.remove(nodeId)
739 elif(nodeId=='ROADMA-DEG2'):
740 #Test related to DEG2
741 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
742 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
743 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
744 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
745 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
746 res['network'][0]['node'][i]['supporting-node'])
747 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network-topology:node-type'],'DEGREE')
748 listNode.remove(nodeId)
749 elif(nodeId=='ROADMC-SRG1'):
750 #Test related to SRG1
751 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),17)
752 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-CP'},
753 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
754 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-PP'},
755 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
756 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMC'},
757 res['network'][0]['node'][i]['supporting-node'])
758 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network-topology:node-type'],'SRG')
759 listNode.remove(nodeId)
760 elif(nodeId=='ROADMC-DEG1'):
761 #Test related to DEG1
762 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
763 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
764 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
765 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
766 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMC'},
767 res['network'][0]['node'][i]['supporting-node'])
768 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network-topology:node-type'],'DEGREE')
769 listNode.remove(nodeId)
770 elif(nodeId=='ROADMC-DEG2'):
771 #Test related to DEG2
772 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
773 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
774 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
775 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
776 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMC'},
777 res['network'][0]['node'][i]['supporting-node'])
778 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network-topology:node-type'],'DEGREE')
779 listNode.remove(nodeId)
781 self.assertFalse(True)
782 self.assertEqual(len(listNode),0)
784 def test_21_connect_ROADMB(self):
785 url = ("{}/config/network-topology:"
786 "network-topology/topology/topology-netconf/node/ROADMB"
787 .format(self.restconf_baseurl))
790 "netconf-node-topology:username": "admin",
791 "netconf-node-topology:password": "admin",
792 "netconf-node-topology:host": "127.0.0.1",
793 "netconf-node-topology:port": "17832",
794 "netconf-node-topology:tcp-only": "false",
795 "netconf-node-topology:pass-through": {}}]}
796 headers = {'content-type': 'application/json'}
797 response = requests.request(
798 "PUT", url, data=json.dumps(data), headers=headers,
799 auth=('admin', 'admin'))
800 self.assertEqual(response.status_code, requests.codes.created)
803 def test_22_omsAttributes_ROADMA_ROADMB(self):
804 # Config ROADMA-ROADMB oms-attributes
805 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
806 "link/ROADMA-DEG2-DEG2-TTP-TXRXtoROADMB-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
807 "OMS-attributes/span"
808 .format(self.restconf_baseurl))
811 "auto-spanloss": "true",
812 "engineered-spanloss": 12.2,
813 "link-concatenation": [{
816 "SRLG-length": 100000,
818 headers = {'content-type': 'application/json'}
819 response = requests.request(
820 "PUT", url, data=json.dumps(data), headers=headers,
821 auth=('admin', 'admin'))
822 self.assertEqual(response.status_code, requests.codes.created)
824 def test_23_omsAttributes_ROADMB_ROADMA(self):
825 # Config ROADMB-ROADMA oms-attributes
826 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
827 "link/ROADMB-DEG1-DEG1-TTP-TXRXtoROADMA-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
828 "OMS-attributes/span"
829 .format(self.restconf_baseurl))
832 "auto-spanloss": "true",
833 "engineered-spanloss": 12.2,
834 "link-concatenation": [{
837 "SRLG-length": 100000,
839 headers = {'content-type': 'application/json'}
840 response = requests.request(
841 "PUT", url, data=json.dumps(data), headers=headers,
842 auth=('admin', 'admin'))
843 self.assertEqual(response.status_code, requests.codes.created)
845 def test_24_omsAttributes_ROADMB_ROADMC(self):
846 # Config ROADMB-ROADMC oms-attributes
847 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
848 "link/ROADMB-DEG2-DEG2-TTP-TXRXtoROADMC-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
849 "OMS-attributes/span"
850 .format(self.restconf_baseurl))
853 "auto-spanloss": "true",
854 "engineered-spanloss": 12.2,
855 "link-concatenation": [{
858 "SRLG-length": 100000,
860 headers = {'content-type': 'application/json'}
861 response = requests.request(
862 "PUT", url, data=json.dumps(data), headers=headers,
863 auth=('admin', 'admin'))
864 self.assertEqual(response.status_code, requests.codes.created)
866 def test_25_omsAttributes_ROADMC_ROADMB(self):
867 # Config ROADMC-ROADMB oms-attributes
868 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
869 "link/ROADMC-DEG1-DEG1-TTP-TXRXtoROADMB-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
870 "OMS-attributes/span"
871 .format(self.restconf_baseurl))
874 "auto-spanloss": "true",
875 "engineered-spanloss": 12.2,
876 "link-concatenation": [{
879 "SRLG-length": 100000,
881 headers = {'content-type': 'application/json'}
882 response = requests.request(
883 "PUT", url, data=json.dumps(data), headers=headers,
884 auth=('admin', 'admin'))
885 self.assertEqual(response.status_code, requests.codes.created)
887 def test_26_getClliNetwork(self):
888 url = ("{}/config/ietf-network:networks/network/clli-network"
889 .format(self.restconf_baseurl))
890 headers = {'content-type': 'application/json'}
891 response = requests.request(
892 "GET", url, headers=headers, auth=('admin', 'admin'))
893 self.assertEqual(response.status_code, requests.codes.ok)
894 res = response.json()
895 nbNode=len(res['network'][0]['node'])
896 listNode=['NodeA','NodeB','NodeC']
897 for i in range(0,nbNode):
898 nodeId = res['network'][0]['node'][i]['node-id']
899 find= nodeId in listNode
900 self.assertEqual(find, True)
902 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'],'NodeA')
903 elif(nodeId=='NodeB'):
904 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'],'NodeB')
906 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'],'NodeC')
907 listNode.remove(nodeId)
909 self.assertEqual(len(listNode),0)
911 def test_27_verifyDegree(self):
912 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
913 .format(self.restconf_baseurl))
914 headers = {'content-type': 'application/json'}
915 response = requests.request(
916 "GET", url, headers=headers, auth=('admin', 'admin'))
917 self.assertEqual(response.status_code, requests.codes.ok)
918 res = response.json()
919 #Tests related to links
920 nbLink=len(res['network'][0]['ietf-network-topology:link'])
921 listR2RLink=['ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX','ROADMC-DEG2-DEG2-TTP-TXRXtoROADMA-DEG1-DEG1-TTP-TXRX',
922 'ROADMA-DEG2-DEG2-TTP-TXRXtoROADMB-DEG1-DEG1-TTP-TXRX','ROADMC-DEG1-DEG1-TTP-TXRXtoROADMB-DEG2-DEG2-TTP-TXRX',
923 'ROADMB-DEG1-DEG1-TTP-TXRXtoROADMA-DEG2-DEG2-TTP-TXRX','ROADMB-DEG2-DEG2-TTP-TXRXtoROADMC-DEG1-DEG1-TTP-TXRX']
924 for i in range(0,nbLink):
925 if res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type'] == 'ROADM-TO-ROADM':
926 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
927 find= link_id in listR2RLink
928 self.assertEqual(find, True)
929 listR2RLink.remove(link_id)
930 self.assertEqual(len(listR2RLink),0)
932 def test_28_verifyOppositeLinkTopology(self):
933 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
934 .format(self.restconf_baseurl))
935 headers = {'content-type': 'application/json'}
936 response = requests.request(
937 "GET", url, headers=headers, auth=('admin', 'admin'))
938 self.assertEqual(response.status_code, requests.codes.ok)
939 res = response.json()
940 #Write the response in the log
941 with open('./transportpce_tests/log/response.log', 'a') as outfile1:
942 outfile1.write(str(res))
943 #Tests related to links
944 nbLink=len(res['network'][0]['ietf-network-topology:link'])
945 self.assertEqual(nbLink,26)
946 for i in range(0,nbLink):
947 link_id=res['network'][0]['ietf-network-topology:link'][i]['link-id']
948 link_type=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']
949 link_src=res['network'][0]['ietf-network-topology:link'][i]['source']['source-node']
950 link_dest=res['network'][0]['ietf-network-topology:link'][i]['destination']['dest-node']
951 oppLink_id=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:opposite-link']
952 #Find the opposite link
953 url_oppLink="{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"+oppLink_id
954 url = (url_oppLink.format(self.restconf_baseurl))
955 headers = {'content-type': 'application/json'}
956 response_oppLink = requests.request("GET", url, headers=headers, auth=('admin', 'admin'))
957 self.assertEqual(response_oppLink.status_code, requests.codes.ok)
958 res_oppLink = response_oppLink.json()
959 self.assertEqual(res_oppLink['ietf-network-topology:link'][0]['org-openroadm-common-network:opposite-link'],link_id)
960 self.assertEqual(res_oppLink['ietf-network-topology:link'][0]['source']['source-node'],link_dest)
961 self.assertEqual(res_oppLink['ietf-network-topology:link'][0]['destination']['dest-node'],link_src)
962 oppLink_type=res_oppLink['ietf-network-topology:link'][0]['org-openroadm-network-topology:link-type']
963 if link_type=='ADD-LINK':
964 self.assertEqual(oppLink_type, 'DROP-LINK')
965 elif link_type=='DROP-LINK':
966 self.assertEqual(oppLink_type, 'ADD-LINK')
967 elif link_type=='EXPRESS-LINK':
968 self.assertEqual(oppLink_type, 'EXPRESS-LINK')
969 elif link_type=='ROADM-TO-ROADM':
970 self.assertEqual(oppLink_type, 'ROADM-TO-ROADM')
971 elif link_type=='XPONDER-INPUT':
972 self.assertEqual(oppLink_type, 'XPONDER-OUTPUT')
973 elif link_type=='XPONDER-OUTPUT':
974 self.assertEqual(oppLink_type, 'XPONDER-INPUT')
977 def test_29_getLinkOmsAttributesOpenRoadmTopology(self):
978 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
979 .format(self.restconf_baseurl))
980 headers = {'content-type': 'application/json'}
981 response = requests.request(
982 "GET", url, headers=headers, auth=('admin', 'admin'))
983 self.assertEqual(response.status_code, requests.codes.ok)
984 res = response.json()
985 nbLink=len(res['network'][0]['ietf-network-topology:link'])
986 R2RLink = ['ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX',
987 'ROADMC-DEG2-DEG2-TTP-TXRXtoROADMA-DEG1-DEG1-TTP-TXRX',
988 'ROADMA-DEG2-DEG2-TTP-TXRXtoROADMB-DEG1-DEG1-TTP-TXRX',
989 'ROADMC-DEG1-DEG1-TTP-TXRXtoROADMB-DEG2-DEG2-TTP-TXRX',
990 'ROADMB-DEG1-DEG1-TTP-TXRXtoROADMA-DEG2-DEG2-TTP-TXRX',
991 'ROADMB-DEG2-DEG2-TTP-TXRXtoROADMC-DEG1-DEG1-TTP-TXRX']
992 for i in range(0,nbLink):
993 nodeType = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']
994 link_id=res['network'][0]['ietf-network-topology:link'][i]['link-id']
995 if(link_id in R2RLink):
997 spanLoss = res['network'][0]['ietf-network-topology:link'][i][
998 'org-openroadm-network-topology:OMS-attributes']['span']["engineered-spanloss"]
999 length = res['network'][0]['ietf-network-topology:link'][i][
1000 'org-openroadm-network-topology:OMS-attributes']['span']['link-concatenation'][0]['SRLG-length']
1001 if((spanLoss!=None)&(length!=None)):
1003 self.assertTrue(find)
1004 R2RLink.remove(link_id)
1005 self.assertEqual(len(R2RLink),0)
1007 def test_30_disconnect_ROADMB(self):
1008 #Delete in the topology-netconf
1009 url = ("{}/config/network-topology:"
1010 "network-topology/topology/topology-netconf/node/ROADMB"
1011 .format(self.restconf_baseurl))
1013 headers = {'content-type': 'application/json'}
1014 response = requests.request(
1015 "DELETE", url, data=json.dumps(data), headers=headers,
1016 auth=('admin', 'admin'))
1017 self.assertEqual(response.status_code, requests.codes.ok)
1018 #Delete in the clli-network
1019 url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeB"
1020 .format(self.restconf_baseurl))
1022 headers = {'content-type': 'application/json'}
1023 response = requests.request(
1024 "DELETE", url, data=json.dumps(data), headers=headers,
1025 auth=('admin', 'admin'))
1026 self.assertEqual(response.status_code, requests.codes.ok)
1027 #Delete in the openroadm-network
1028 # url = ("{}/config/ietf-network:networks/network/openroadm-network/node/ROADMB"
1029 # .format(self.restconf_baseurl))
1031 # headers = {'content-type': 'application/json'}
1032 # response = requests.request(
1033 # "DELETE", url, data=json.dumps(data), headers=headers,
1034 # auth=('admin', 'admin'))
1035 # self.assertEqual(response.status_code, requests.codes.ok)
1037 def test_31_delete_ROADMA_ROADMB(self):
1038 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
1039 "link/ROADMA-DEG2-DEG2-TTP-TXRXtoROADMB-DEG1-DEG1-TTP-TXRX"
1040 .format(self.restconf_baseurl))
1042 headers = {'content-type': 'application/json'}
1043 response = requests.request(
1044 "DELETE", url, data=json.dumps(data), headers=headers,
1045 auth=('admin', 'admin'))
1046 self.assertEqual(response.status_code, requests.codes.ok)
1048 def test_32_delete_ROADMB_ROADMA(self):
1049 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
1050 "link/ROADMB-DEG1-DEG1-TTP-TXRXtoROADMA-DEG2-DEG2-TTP-TXRX"
1051 .format(self.restconf_baseurl))
1053 headers = {'content-type': 'application/json'}
1054 response = requests.request(
1055 "DELETE", url, data=json.dumps(data), headers=headers,
1056 auth=('admin', 'admin'))
1057 self.assertEqual(response.status_code, requests.codes.ok)
1059 def test_33_delete_ROADMB_ROADMC(self):
1060 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
1061 "link/ROADMB-DEG2-DEG2-TTP-TXRXtoROADMC-DEG1-DEG1-TTP-TXRX"
1062 .format(self.restconf_baseurl))
1064 headers = {'content-type': 'application/json'}
1065 response = requests.request(
1066 "DELETE", url, data=json.dumps(data), headers=headers,
1067 auth=('admin', 'admin'))
1068 self.assertEqual(response.status_code, requests.codes.ok)
1070 def test_34_delete_ROADMC_ROADMB(self):
1071 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
1072 "link/ROADMC-DEG1-DEG1-TTP-TXRXtoROADMB-DEG2-DEG2-TTP-TXRX"
1073 .format(self.restconf_baseurl))
1075 headers = {'content-type': 'application/json'}
1076 response = requests.request(
1077 "DELETE", url, data=json.dumps(data), headers=headers,
1078 auth=('admin', 'admin'))
1079 self.assertEqual(response.status_code, requests.codes.ok)
1081 def test_35_disconnect_ROADMC(self):
1082 #Delete in the topology-netconf
1083 url = ("{}/config/network-topology:"
1084 "network-topology/topology/topology-netconf/node/ROADMC"
1085 .format(self.restconf_baseurl))
1087 headers = {'content-type': 'application/json'}
1088 response = requests.request(
1089 "DELETE", url, data=json.dumps(data), headers=headers,
1090 auth=('admin', 'admin'))
1091 self.assertEqual(response.status_code, requests.codes.ok)
1092 #Delete in the clli-network
1093 url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeC"
1094 .format(self.restconf_baseurl))
1096 headers = {'content-type': 'application/json'}
1097 response = requests.request(
1098 "DELETE", url, data=json.dumps(data), headers=headers,
1099 auth=('admin', 'admin'))
1100 self.assertEqual(response.status_code, requests.codes.ok)
1101 #Delete in the openroadm-network
1102 # url = ("{}/config/ietf-network:networks/network/openroadm-network/node/ROADMC"
1103 # .format(self.restconf_baseurl))
1105 # headers = {'content-type': 'application/json'}
1106 # response = requests.request(
1107 # "DELETE", url, data=json.dumps(data), headers=headers,
1108 # auth=('admin', 'admin'))
1109 # self.assertEqual(response.status_code, requests.codes.ok)
1111 # def test_24_getLinks_OpenRoadmTopology(self):
1112 # url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1113 # .format(self.restconf_baseurl))
1114 # headers = {'content-type': 'application/json'}
1115 # response = requests.request(
1116 # "GET", url, headers=headers, auth=('admin', 'admin'))
1117 # self.assertEqual(response.status_code, requests.codes.ok)
1118 # res = response.json()
1119 # #Write the response in the log
1120 # with open('./transportpce_tests/log/response.log', 'a') as outfile1:
1121 # outfile1.write(str(res))
1122 # #Tests related to links
1123 # nbLink=len(res['network'][0]['ietf-network-topology:link'])
1124 # self.assertEqual(nbLink,8)
1125 # expressLink=['ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX','ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX']
1126 # addLink=['ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX','ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX',]
1127 # dropLink=['ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX','ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX']
1128 # XPDR_IN=['ROADMA-SRG1-SRG1-PP1-TXRXtoXPDRA-XPDR1-XPDR1-NETWORK1']
1129 # XPDR_OUT=['XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX']
1130 # for i in range(0,nbLink):
1131 # nodeType=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']
1132 # linkId=res['network'][0]['ietf-network-topology:link'][i]['link-id']
1133 # if(nodeType=='EXPRESS-LINK'):
1134 # find= linkId in expressLink
1135 # self.assertEqual(find, True)
1136 # expressLink.remove(linkId)
1137 # elif(nodeType=='ADD-LINK'):
1138 # find= linkId in addLink
1139 # self.assertEqual(find, True)
1140 # addLink.remove(linkId)
1141 # elif(nodeType=='DROP-LINK'):
1142 # find= linkId in dropLink
1143 # self.assertEqual(find, True)
1144 # dropLink.remove(linkId)
1145 # elif(nodeType=='XPONDER-INPUT'):
1146 # find= linkId in XPDR_IN
1147 # self.assertEqual(find, True)
1148 # XPDR_IN.remove(linkId)
1149 # elif(nodeType=='XPONDER-OUTPUT'):
1150 # find= linkId in XPDR_OUT
1151 # self.assertEqual(find, True)
1152 # XPDR_OUT.remove(linkId)
1154 # self.assertFalse(True)
1155 # self.assertEqual(len(expressLink),0)
1156 # self.assertEqual(len(addLink),0)
1157 # self.assertEqual(len(dropLink),0)
1158 # self.assertEqual(len(XPDR_IN),0)
1159 # self.assertEqual(len(XPDR_OUT),0)
1161 # for i in range(0,nbLink):
1162 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type'],'ROADM-TO-ROADM')
1163 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX')
1164 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX')
1165 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG2-DEG1-CTP-TXRX')
1166 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG2-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX')
1167 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-DEG2-DEG2-CTP-TXRX')
1168 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG2-DEG2-CTP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX')
1170 # def test_25_getNodes_OpenRoadmTopology(self):
1171 # url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1172 # .format(self.restconf_baseurl))
1173 # headers = {'content-type': 'application/json'}
1174 # response = requests.request(
1175 # "GET", url, headers=headers, auth=('admin', 'admin'))
1176 # res = response.json()
1177 # #Tests related to nodes
1178 # self.assertEqual(response.status_code, requests.codes.ok)
1179 # with open('./transportpce_tests/log/response.log', 'a') as outfile1:
1180 # outfile1.write(str(len(res['network'][0]['node'])))
1181 # nbNode=len(res['network'][0]['node'])
1182 # self.assertEqual(nbNode,4)
1183 # listNode=['XPDRA-XPDR1','ROADMA-SRG1','ROADMA-DEG1','ROADMA-DEG2']
1184 # for i in range(0,nbNode):
1185 # nodeType=res['network'][0]['node'][i]['org-openroadm-network-topology:node-type']
1186 # nodeId=res['network'][0]['node'][i]['node-id']
1187 # #Tests related to XPDRA nodes
1188 # if(nodeId=='XPDRA-XPDR1'):
1189 # self.assertEqual(nodeType,'XPONDER')
1190 # self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),2)
1191 # self.assertEqual({'tp-id': 'XPDR1-CLIENT1', 'org-openroadm-network-topology:tp-type': 'XPONDER-CLIENT',
1192 # 'org-openroadm-network-topology:xpdr-network-attributes': {
1193 # 'tail-equipment-id': 'XPDR1-NETWORK1'}},
1194 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'][0])
1195 # self.assertEqual({'tp-id': 'XPDR1-NETWORK1', 'org-openroadm-network-topology:tp-type': 'XPONDER-NETWORK',
1196 # 'org-openroadm-network-topology:xpdr-client-attributes': {'tail-equipment-id': 'XPDR1-CLIENT1'}},
1197 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'][1])
1198 # self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA'},
1199 # res['network'][0]['node'][i]['supporting-node'])
1200 # listNode.remove(nodeId)
1201 # elif(nodeId=='ROADMA-SRG1'):
1202 # #Test related to SRG1
1203 # self.assertEqual(nodeType,'SRG')
1204 # self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),17)
1205 # self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-CP'},
1206 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1207 # self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-PP'},
1208 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1209 # self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
1210 # res['network'][0]['node'][i]['supporting-node'])
1211 # listNode.remove(nodeId)
1212 # elif(nodeId=='ROADMA-DEG1'):
1213 # #Test related to DEG1
1214 # self.assertEqual(nodeType,'DEGREE')
1215 # self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
1216 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1217 # self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
1218 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1219 # self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
1220 # res['network'][0]['node'][i]['supporting-node'])
1221 # listNode.remove(nodeId)
1222 # elif(nodeId=='ROADMA-DEG2'):
1223 # #Test related to DEG2
1224 # self.assertEqual(nodeType,'DEGREE')
1225 # self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
1226 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1227 # self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
1228 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1229 # self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
1230 # res['network'][0]['node'][i]['supporting-node'])
1231 # listNode.remove(nodeId)
1233 # self.assertFalse(True)
1234 # self.assertEqual(len(listNode),0)
1235 # #Test related to SRG1 of ROADMC
1236 # for i in range(0,nbNode):
1237 # self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'ROADMC-SRG1')
1238 # self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'ROADMC-DEG1')
1239 # self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'ROADMC-DEG2')
1241 def test_36_delete_ROADMA_ROADMC(self):
1242 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
1243 "link/ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX"
1244 .format(self.restconf_baseurl))
1246 headers = {'content-type': 'application/json'}
1247 response = requests.request(
1248 "DELETE", url, data=json.dumps(data), headers=headers,
1249 auth=('admin', 'admin'))
1250 self.assertEqual(response.status_code, requests.codes.ok)
1252 def test_37_delete_ROADMC_ROADMA(self):
1253 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
1254 "link/ROADMC-DEG2-DEG2-TTP-TXRXtoROADMA-DEG1-DEG1-TTP-TXRX"
1255 .format(self.restconf_baseurl))
1257 headers = {'content-type': 'application/json'}
1258 response = requests.request(
1259 "DELETE", url, data=json.dumps(data), headers=headers,
1260 auth=('admin', 'admin'))
1261 self.assertEqual(response.status_code, requests.codes.ok)
1263 def test_38_getOpenRoadmNetwork(self):
1264 url = ("{}/config/ietf-network:networks/network/openroadm-network"
1265 .format(self.restconf_baseurl))
1266 headers = {'content-type': 'application/json'}
1267 response = requests.request(
1268 "GET", url, headers=headers, auth=('admin', 'admin'))
1269 self.assertEqual(response.status_code, requests.codes.ok)
1270 res = response.json()
1271 nbNode=len(res['network'][0]['node'])
1272 self.assertEqual(nbNode,2)
1273 for i in range(0,nbNode-1):
1274 self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'ROADMC')
1276 def test_39_getClliNetwork(self):
1277 url = ("{}/config/ietf-network:networks/network/clli-network"
1278 .format(self.restconf_baseurl))
1279 headers = {'content-type': 'application/json'}
1280 response = requests.request(
1281 "GET", url, headers=headers, auth=('admin', 'admin'))
1282 self.assertEqual(response.status_code, requests.codes.ok)
1283 res = response.json()
1284 nbNode=len(res['network'][0]['node'])
1285 self.assertEqual(nbNode,1)
1286 for i in range(0,nbNode-1):
1287 self.assertNotEqual(res['network'][0]['node'][1]['org-openroadm-clli-network:clli'],'NodeC')
1289 def test_40_disconnect_XPDRA(self):
1290 url = ("{}/config/network-topology:"
1291 "network-topology/topology/topology-netconf/node/XPDRA"
1292 .format(self.restconf_baseurl))
1294 headers = {'content-type': 'application/json'}
1295 response = requests.request(
1296 "DELETE", url, data=json.dumps(data), headers=headers,
1297 auth=('admin', 'admin'))
1298 self.assertEqual(response.status_code, requests.codes.ok)
1299 #Delete in the openroadm-network
1300 # url = ("{}/config/ietf-network:networks/network/openroadm-network/node/XPDRA"
1301 # .format(self.restconf_baseurl))
1303 # headers = {'content-type': 'application/json'}
1304 # response = requests.request(
1305 # "DELETE", url, data=json.dumps(data), headers=headers,
1306 # auth=('admin', 'admin'))
1307 # self.assertEqual(response.status_code, requests.codes.ok)
1309 def test_41_getClliNetwork(self):
1310 url = ("{}/config/ietf-network:networks/network/clli-network"
1311 .format(self.restconf_baseurl))
1312 headers = {'content-type': 'application/json'}
1313 response = requests.request(
1314 "GET", url, headers=headers, auth=('admin', 'admin'))
1315 self.assertEqual(response.status_code, requests.codes.ok)
1316 res = response.json()
1317 nbNode=len(res['network'][0]['node'])
1318 self.assertEqual(nbNode,1)
1319 self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'],'NodeA')
1321 def test_42_getOpenRoadmNetwork(self):
1322 url = ("{}/config/ietf-network:networks/network/openroadm-network"
1323 .format(self.restconf_baseurl))
1324 headers = {'content-type': 'application/json'}
1325 response = requests.request(
1326 "GET", url, headers=headers, auth=('admin', 'admin'))
1327 self.assertEqual(response.status_code, requests.codes.ok)
1328 res = response.json()
1329 nbNode=len(res['network'][0]['node'])
1330 self.assertEqual(nbNode,1)
1331 for i in range(0,nbNode):
1332 self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'XPDRA')
1334 def test_43_getNodes_OpenRoadmTopology(self):
1335 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1336 .format(self.restconf_baseurl))
1337 headers = {'content-type': 'application/json'}
1338 response = requests.request(
1339 "GET", url, headers=headers, auth=('admin', 'admin'))
1340 res = response.json()
1341 #Tests related to nodes
1342 self.assertEqual(response.status_code, requests.codes.ok)
1343 nbNode=len(res['network'][0]['node'])
1344 self.assertEqual(nbNode,3)
1345 listNode=['ROADMA-SRG1','ROADMA-DEG1','ROADMA-DEG2']
1346 for i in range(0,nbNode):
1347 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
1348 res['network'][0]['node'][i]['supporting-node'])
1349 nodeType=res['network'][0]['node'][i]['org-openroadm-network-topology:node-type']
1350 nodeId=res['network'][0]['node'][i]['node-id']
1351 if(nodeId=='ROADMA-SRG1'):
1352 #Test related to SRG1
1353 self.assertEqual(nodeType,'SRG')
1354 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),17)
1355 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-CP'},
1356 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1357 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-PP'},
1358 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1359 listNode.remove(nodeId)
1360 elif(nodeId=='ROADMA-DEG1'):
1361 #Test related to DEG1
1362 self.assertEqual(nodeType,'DEGREE')
1363 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
1364 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1365 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
1366 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1367 listNode.remove(nodeId)
1368 elif(nodeId=='ROADMA-DEG2'):
1369 #Test related to DEG2
1370 self.assertEqual(nodeType,'DEGREE')
1371 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
1372 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1373 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
1374 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1375 listNode.remove(nodeId)
1377 self.assertFalse(True)
1378 self.assertEqual(len(listNode),0)
1380 def test_44_disconnect_ROADM_XPDRA_link(self):
1382 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
1383 "link/XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX"
1384 .format(self.restconf_baseurl))
1386 headers = {'content-type': 'application/json'}
1387 response = requests.request(
1388 "DELETE", url, data=json.dumps(data), headers=headers,
1389 auth=('admin', 'admin'))
1390 self.assertEqual(response.status_code, requests.codes.ok)
1392 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
1393 "link/ROADMA-SRG1-SRG1-PP1-TXRXtoXPDRA-XPDR1-XPDR1-NETWORK1"
1394 .format(self.restconf_baseurl))
1396 headers = {'content-type': 'application/json'}
1397 response = requests.request(
1398 "DELETE", url, data=json.dumps(data), headers=headers,
1399 auth=('admin', 'admin'))
1400 self.assertEqual(response.status_code, requests.codes.ok)
1402 # def test_33_getLinks_OpenRoadmTopology(self):
1403 # url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1404 # .format(self.restconf_baseurl))
1405 # headers = {'content-type': 'application/json'}
1406 # response = requests.request(
1407 # "GET", url, headers=headers, auth=('admin', 'admin'))
1408 # self.assertEqual(response.status_code, requests.codes.ok)
1409 # res = response.json()
1410 # nbLink=len(res['network'][0]['ietf-network-topology:link'])
1411 # self.assertEqual(nbLink,6)
1412 # expressLink=['ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX','ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX']
1413 # addLink=['ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX','ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX']
1414 # dropLink=['ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX','ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX']
1415 # for i in range(0,nbLink):
1416 # if (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='EXPRESS-LINK'):
1417 # link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1418 # find= link_id in expressLink
1419 # self.assertEqual(find, True)
1420 # expressLink.remove(link_id)
1421 # elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='ADD-LINK'):
1422 # link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1423 # find= link_id in addLink
1424 # self.assertEqual(find, True)
1425 # addLink.remove(link_id)
1426 # elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='DROP-LINK'):
1427 # link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1428 # find= link_id in dropLink
1429 # self.assertEqual(find, True)
1430 # dropLink.remove(link_id)
1432 # self.assertFalse(True)
1433 # self.assertEqual(len(expressLink),0)
1434 # self.assertEqual(len(addLink),0)
1435 # self.assertEqual(len(dropLink),0)
1436 # for i in range(0,nbLink):
1437 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type'],'XPONDER-OUTPUT')
1438 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type'],'XPONDER-INPUT')
1440 def test_45_disconnect_ROADMA(self):
1441 url = ("{}/config/network-topology:"
1442 "network-topology/topology/topology-netconf/node/ROADMA"
1443 .format(self.restconf_baseurl))
1445 headers = {'content-type': 'application/json'}
1446 response = requests.request(
1447 "DELETE", url, data=json.dumps(data), headers=headers,
1448 auth=('admin', 'admin'))
1449 self.assertEqual(response.status_code, requests.codes.ok)
1450 #Delete in the clli-network
1451 url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeA"
1452 .format(self.restconf_baseurl))
1454 headers = {'content-type': 'application/json'}
1455 response = requests.request(
1456 "DELETE", url, data=json.dumps(data), headers=headers,
1457 auth=('admin', 'admin'))
1458 self.assertEqual(response.status_code, requests.codes.ok)
1459 #Delete in the openroadm-network
1460 # url = ("{}/config/ietf-network:networks/network/openroadm-network/node/ROADMA"
1461 # .format(self.restconf_baseurl))
1463 # headers = {'content-type': 'application/json'}
1464 # response = requests.request(
1465 # "DELETE", url, data=json.dumps(data), headers=headers,
1466 # auth=('admin', 'admin'))
1467 # self.assertEqual(response.status_code, requests.codes.ok)
1470 def test_46_getClliNetwork(self):
1471 url = ("{}/config/ietf-network:networks/network/clli-network"
1472 .format(self.restconf_baseurl))
1473 headers = {'content-type': 'application/json'}
1474 response = requests.request(
1475 "GET", url, headers=headers, auth=('admin', 'admin'))
1476 self.assertEqual(response.status_code, requests.codes.ok)
1477 res = response.json()
1478 self.assertNotIn('node', res['network'][0])
1480 def test_47_getOpenRoadmNetwork(self):
1481 url = ("{}/config/ietf-network:networks/network/openroadm-network"
1482 .format(self.restconf_baseurl))
1483 headers = {'content-type': 'application/json'}
1484 response = requests.request(
1485 "GET", url, headers=headers, auth=('admin', 'admin'))
1486 self.assertEqual(response.status_code, requests.codes.ok)
1487 res = response.json()
1488 self.assertNotIn('node', res['network'][0])
1490 # def test_37_getOpenRoadmTopology(self):
1491 # url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1492 # .format(self.restconf_baseurl))
1493 # headers = {'content-type': 'application/json'}
1494 # response = requests.request(
1495 # "GET", url, headers=headers, auth=('admin', 'admin'))
1496 # self.assertEqual(response.status_code, requests.codes.ok)
1497 # res = response.json()
1498 # self.assertNotIn('node', res['network'][0])
1499 # self.assertNotIn('ietf-network-topology:link', res['network'][0])
1501 if __name__ == "__main__":
1502 #logging.basicConfig(filename='./transportpce_tests/log/response.log',filemode='w',level=logging.DEBUG)
1503 #logging.debug('I am there')
1504 unittest.main(verbosity=2)