3 ##############################################################################
4 #Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
23 class TransportPCETopologyTesting(unittest.TestCase):
# Functional test case that drives a controller through its RESTCONF API
# (see restconf_baseurl) while honeynode subprocesses are running.
# Popen handles for the four honeynode subprocesses; they stay None until the
# matching __start_honeynodeN helper assigns them.
25 honeynode_process1 = None
26 honeynode_process2 = None
27 honeynode_process3 = None
28 honeynode_process4 = None
# Prefix used by the test methods to build every RESTCONF request URL.
30 restconf_baseurl = "http://localhost:8181/restconf"
32 #START_IGNORE_XTESTING
def __init_logfile(cls):
    """Prepare the log directory and discard any previous response.log.

    Creates transportpce_tests/log when it does not exist yet, then removes
    a response.log left there by an earlier run, if there is one.
    """
    log_dir = "transportpce_tests/log"
    log_dir_missing = not os.path.exists(log_dir)
    if log_dir_missing:
        os.makedirs(log_dir)
    # Start every run from an empty response log.
    stale_log_present = os.path.isfile("./transportpce_tests/log/response.log")
    if stale_log_present:
        os.remove("transportpce_tests/log/response.log")
42 def __start_honeynode1(cls):
# Launch the honeycomb-tpce executable with the arguments "17830" and the
# oper-XPDRA.xml sample configuration, keeping the Popen handle in
# cls.honeynode_process1. Nothing is launched when the executable is absent.
# honeynode1.log is opened for writing around the launch.
# NOTE(review): the Popen(...) call is not closed in this view; its remaining
# arguments (and how `outfile` is used) are not visible here.
43 executable = ("./honeynode/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
44 "/honeynode-distribution-1.18.01/honeycomb-tpce")
45 if os.path.isfile(executable):
46 with open('honeynode1.log', 'w') as outfile:
47 cls.honeynode_process1 = subprocess.Popen(
48 [executable, "17830", "sample_configs/openroadm/2.1/oper-XPDRA.xml"],
52 def __start_honeynode2(cls):
# Launch the honeycomb-tpce executable with the arguments "17831" and the
# oper-ROADMA.xml sample configuration, keeping the Popen handle in
# cls.honeynode_process2. Nothing is launched when the executable is absent.
# honeynode2.log is opened for writing around the launch.
# NOTE(review): the Popen(...) call is not closed in this view; its remaining
# arguments (and how `outfile` is used) are not visible here.
53 executable = ("./honeynode/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
54 "/honeynode-distribution-1.18.01/honeycomb-tpce")
55 if os.path.isfile(executable):
56 with open('honeynode2.log', 'w') as outfile:
57 cls.honeynode_process2 = subprocess.Popen(
58 [executable, "17831", "sample_configs/openroadm/2.1/oper-ROADMA.xml"],
62 def __start_honeynode3(cls):
# Launch the honeycomb-tpce executable with the arguments "17832" and the
# oper-ROADMB.xml sample configuration, keeping the Popen handle in
# cls.honeynode_process3. Nothing is launched when the executable is absent.
# honeynode3.log is opened for writing around the launch.
# NOTE(review): the Popen(...) call is not closed in this view; its remaining
# arguments (and how `outfile` is used) are not visible here.
63 executable = ("./honeynode/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
64 "/honeynode-distribution-1.18.01/honeycomb-tpce")
65 if os.path.isfile(executable):
66 with open('honeynode3.log', 'w') as outfile:
67 cls.honeynode_process3 = subprocess.Popen(
68 [executable, "17832", "sample_configs/openroadm/2.1/oper-ROADMB.xml"],
72 def __start_honeynode4(cls):
# Launch the honeycomb-tpce executable with the arguments "17833" and the
# oper-ROADMC.xml sample configuration, keeping the Popen handle in
# cls.honeynode_process4. Nothing is launched when the executable is absent.
# honeynode4.log is opened for writing around the launch.
# NOTE(review): the Popen(...) call is not closed in this view; its remaining
# arguments (and how `outfile` is used) are not visible here.
73 executable = ("./honeynode/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
74 "/honeynode-distribution-1.18.01/honeycomb-tpce")
75 if os.path.isfile(executable):
76 with open('honeynode4.log', 'w') as outfile:
77 cls.honeynode_process4 = subprocess.Popen(
78 [executable, "17833", "sample_configs/openroadm/2.1/oper-ROADMC.xml"],
# Run the Karaf launcher through bash with the "server" argument, sending its
# stdout to odl.log and reading stdin from os.devnull; the Popen handle is
# stored in cls.odl_process.
# NOTE(review): the enclosing `def` line is not visible in this view.
# NOTE(review): the file object opened on os.devnull for stdin is never
# explicitly closed.
83 executable = "../karaf/target/assembly/bin/karaf"
84 with open('odl.log', 'w') as outfile:
85 cls.odl_process = subprocess.Popen(
86 ["bash", executable, "server"], stdout=outfile,
87 stdin=open(os.devnull))
# Invoke the four honeynode start helpers in order, 1 through 4.
# NOTE(review): the enclosing `def` line and any statements between these
# calls are not visible in this view.
93 cls.__start_honeynode1()
95 cls.__start_honeynode2()
97 cls.__start_honeynode3()
99 cls.__start_honeynode4()
105 def tearDownClass(cls):
# Stop the ODL process and then the four honeynode processes. For each one,
# SIGINT is first sent to every direct child reported by psutil, then to the
# process itself, and the process is waited on before moving to the next.
# NOTE(review): if a start helper never ran Popen, the matching handle is
# still None and the `.pid` access here would raise AttributeError.
# NOTE(review): this view may be missing a line inside each `for` loop
# (the embedded numbering skips one after every child.send_signal call).
106 for child in psutil.Process(cls.odl_process.pid).children():
107 child.send_signal(signal.SIGINT)
109 cls.odl_process.send_signal(signal.SIGINT)
110 cls.odl_process.wait()
111 for child in psutil.Process(cls.honeynode_process1.pid).children():
112 child.send_signal(signal.SIGINT)
114 cls.honeynode_process1.send_signal(signal.SIGINT)
115 cls.honeynode_process1.wait()
116 for child in psutil.Process(cls.honeynode_process2.pid).children():
117 child.send_signal(signal.SIGINT)
119 cls.honeynode_process2.send_signal(signal.SIGINT)
120 cls.honeynode_process2.wait()
122 for child in psutil.Process(cls.honeynode_process3.pid).children():
123 child.send_signal(signal.SIGINT)
125 cls.honeynode_process3.send_signal(signal.SIGINT)
126 cls.honeynode_process3.wait()
128 for child in psutil.Process(cls.honeynode_process4.pid).children():
129 child.send_signal(signal.SIGINT)
131 cls.honeynode_process4.send_signal(signal.SIGINT)
132 cls.honeynode_process4.wait()
# Print the last dotted component of self.id(), i.e. the bare test method name.
# NOTE(review): the enclosing `def` line is not visible in this view.
136 print ("execution of {}".format(self.id().split(".")[-1]))
142 def test_01_connect_ROADMA(self):
# PUT a topology-netconf node entry for ROADMA (host 127.0.0.1, port 17831,
# admin/admin credentials) and expect HTTP 201 Created.
# NOTE(review): the opening lines of the `data` payload are not visible in
# this view.
144 url = ("{}/config/network-topology:"
145 "network-topology/topology/topology-netconf/node/ROADMA"
146 .format(self.restconf_baseurl))
149 "netconf-node-topology:username": "admin",
150 "netconf-node-topology:password": "admin",
151 "netconf-node-topology:host": "127.0.0.1",
152 "netconf-node-topology:port": "17831",
153 "netconf-node-topology:tcp-only": "false",
154 "netconf-node-topology:pass-through": {}}]}
155 headers = {'content-type': 'application/json'}
156 response = requests.request(
157 "PUT", url, data=json.dumps(data), headers=headers,
158 auth=('admin', 'admin'))
159 self.assertEqual(response.status_code, requests.codes.created)
def test_02_getClliNetwork(self):
    """Fetch clli-network and check that its first node is NodeA.

    The GET must answer 200 OK, and the first node of the first returned
    network must carry 'NodeA' both as node-id and as CLLI.
    """
    endpoint = "{}/config/ietf-network:networks/network/clli-network".format(
        self.restconf_baseurl)
    response = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    first_node = response.json()['network'][0]['node'][0]
    self.assertEqual(first_node['node-id'], 'NodeA')
    self.assertEqual(first_node['org-openroadm-clli-network:clli'], 'NodeA')
def test_03_getOpenRoadmNetwork(self):
    """Fetch openroadm-network and verify the attributes of its first node.

    The GET must answer 200 OK. The first node of the first returned network
    must be ROADMA, supported by NodeA of clli-network, with node-type
    'ROADM' and model '2'.
    """
    endpoint = "{}/config/ietf-network:networks/network/openroadm-network".format(
        self.restconf_baseurl)
    response = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    first_node = response.json()['network'][0]['node'][0]
    self.assertEqual(first_node['node-id'], 'ROADMA')
    # Only the first supporting-node entry is inspected.
    supporting = first_node['supporting-node'][0]
    self.assertEqual(supporting['network-ref'], 'clli-network')
    self.assertEqual(supporting['node-ref'], 'NodeA')
    self.assertEqual(first_node['org-openroadm-network:node-type'], 'ROADM')
    self.assertEqual(first_node['org-openroadm-network:model'], '2')
187 def test_04_getLinks_OpenroadmTopology(self):
# GET openroadm-topology and check that it holds exactly 6 links. Every link
# must be an EXPRESS-LINK, ADD-LINK or DROP-LINK whose link-id is in the
# matching expected list; ids are removed as they are seen, so all three
# lists must be empty at the end (each expected link seen exactly once).
188 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
189 .format(self.restconf_baseurl))
190 headers = {'content-type': 'application/json'}
191 response = requests.request(
192 "GET", url, headers=headers, auth=('admin', 'admin'))
193 self.assertEqual(response.status_code, requests.codes.ok)
194 res = response.json()
195 #Tests related to links
196 nbLink=len(res['network'][0]['ietf-network-topology:link'])
197 self.assertEqual(nbLink,6)
198 expressLink=['ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX','ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX']
199 addLink=['ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX','ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX']
200 dropLink=['ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX','ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX']
201 for i in range(0,nbLink):
202 linkId = res['network'][0]['ietf-network-topology:link'][i]['link-id']
203 if (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='EXPRESS-LINK'):
204 find= linkId in expressLink
205 self.assertEqual(find, True)
206 expressLink.remove(linkId)
207 elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='ADD-LINK'):
208 find= linkId in addLink
209 self.assertEqual(find, True)
210 addLink.remove(linkId)
211 elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='DROP-LINK'):
212 find= linkId in dropLink
213 self.assertEqual(find, True)
214 dropLink.remove(linkId)
# NOTE(review): presumably the fallback for an unexpected link type; the
# `else:` line that would guard this forced failure is not visible here.
216 self.assertFalse(True)
217 self.assertEqual(len(expressLink),0)
218 self.assertEqual(len(addLink),0)
219 self.assertEqual(len(dropLink),0)
221 def test_05_getNodes_OpenRoadmTopology(self):
# GET openroadm-topology and check that it holds exactly 3 nodes:
# ROADMA-SRG1, ROADMA-DEG1 and ROADMA-DEG2. Each node must be supported by
# ROADMA in openroadm-network and expose the expected node type and
# termination points. The node count is also appended to response.log.
222 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
223 .format(self.restconf_baseurl))
224 headers = {'content-type': 'application/json'}
225 response = requests.request(
226 "GET", url, headers=headers, auth=('admin', 'admin'))
# The body is decoded before the status code is asserted.
227 res = response.json()
228 #Tests related to nodes
229 self.assertEqual(response.status_code, requests.codes.ok)
230 with open('./transportpce_tests/log/response.log', 'a') as outfile1:
231 outfile1.write(str(len(res['network'][0]['node'])))
232 nbNode=len(res['network'][0]['node'])
233 self.assertEqual(nbNode,3)
# Node ids are removed from listNode as they are matched; it must end empty.
234 listNode=['ROADMA-SRG1','ROADMA-DEG1','ROADMA-DEG2']
235 for i in range(0,nbNode):
236 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
237 res['network'][0]['node'][i]['supporting-node'])
238 nodeType=res['network'][0]['node'][i]['org-openroadm-network-topology:node-type']
239 nodeId=res['network'][0]['node'][i]['node-id']
240 if(nodeId=='ROADMA-SRG1'):
241 #Test related to SRG1
242 self.assertEqual(nodeType,'SRG')
243 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),17)
244 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-CP'},
245 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
246 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-PP'},
247 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
248 listNode.remove(nodeId)
249 elif(nodeId=='ROADMA-DEG1'):
250 #Test related to DEG1
251 self.assertEqual(nodeType,'DEGREE')
252 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
253 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
254 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
255 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
256 listNode.remove(nodeId)
257 elif(nodeId=='ROADMA-DEG2'):
258 #Test related to DEG2
259 self.assertEqual(nodeType,'DEGREE')
260 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
261 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
262 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
263 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
264 listNode.remove(nodeId)
# NOTE(review): presumably the fallback for an unexpected node id; the
# `else:` line that would guard this forced failure is not visible here.
266 self.assertFalse(True)
267 self.assertEqual(len(listNode),0)
269 def test_06_connect_XPDRA(self):
# PUT a topology-netconf node entry for XPDRA (host 127.0.0.1, port 17830,
# admin/admin credentials) and expect HTTP 201 Created.
# NOTE(review): the opening lines of the `data` payload are not visible in
# this view.
270 url = ("{}/config/network-topology:"
271 "network-topology/topology/topology-netconf/node/XPDRA"
272 .format(self.restconf_baseurl))
275 "netconf-node-topology:username": "admin",
276 "netconf-node-topology:password": "admin",
277 "netconf-node-topology:host": "127.0.0.1",
278 "netconf-node-topology:port": "17830",
279 "netconf-node-topology:tcp-only": "false",
280 "netconf-node-topology:pass-through": {}}]}
281 headers = {'content-type': 'application/json'}
282 response = requests.request(
283 "PUT", url, data=json.dumps(data), headers=headers,
284 auth=('admin', 'admin'))
285 self.assertEqual(response.status_code, requests.codes.created)
def test_07_getClliNetwork(self):
    """Fetch clli-network and check that its first node is NodeA.

    The GET must answer 200 OK, and the first node of the first returned
    network must carry 'NodeA' both as node-id and as CLLI.
    """
    endpoint = "{}/config/ietf-network:networks/network/clli-network".format(
        self.restconf_baseurl)
    response = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    first_node = response.json()['network'][0]['node'][0]
    self.assertEqual(first_node['node-id'], 'NodeA')
    self.assertEqual(first_node['org-openroadm-clli-network:clli'], 'NodeA')
299 def test_08_getOpenRoadmNetwork(self):
# GET openroadm-network and check that it holds exactly 2 nodes, each
# supported by NodeA of clli-network. The node-type/model pairs checked are
# XPONDER/'1' and ROADM/'2' (the latter for node-id ROADMA).
# NOTE(review): the `if` line selecting the XPONDER branch and the `else:`
# line guarding the final forced failure are not visible in this view.
300 url = ("{}/config/ietf-network:networks/network/openroadm-network"
301 .format(self.restconf_baseurl))
302 headers = {'content-type': 'application/json'}
303 response = requests.request(
304 "GET", url, headers=headers, auth=('admin', 'admin'))
305 self.assertEqual(response.status_code, requests.codes.ok)
306 res = response.json()
307 nbNode=len(res['network'][0]['node'])
308 self.assertEqual(nbNode,2)
309 for i in range(0,nbNode):
310 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['network-ref'],'clli-network')
311 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'],'NodeA')
312 nodeId=res['network'][0]['node'][i]['node-id']
314 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:node-type'],'XPONDER')
315 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'],'1')
316 elif(nodeId=='ROADMA'):
317 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:node-type'],'ROADM')
318 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'],'2')
320 self.assertFalse(True)
322 def test_09_getNodes_OpenRoadmTopology(self):
# GET openroadm-topology and check that it holds exactly 4 nodes:
# XPDRA-XPDR1, ROADMA-SRG1, ROADMA-DEG1 and ROADMA-DEG2. Each node is checked
# for its supporting node, node type and termination points. The node count
# is also appended to response.log.
# NOTE(review): the lines that initialise and increment `client`/`network`
# and the `else:` guarding the final forced failure are not visible here.
323 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
324 .format(self.restconf_baseurl))
325 headers = {'content-type': 'application/json'}
326 response = requests.request(
327 "GET", url, headers=headers, auth=('admin', 'admin'))
# The body is decoded before the status code is asserted.
328 res = response.json()
329 #Tests related to nodes
330 self.assertEqual(response.status_code, requests.codes.ok)
331 with open('./transportpce_tests/log/response.log', 'a') as outfile1:
332 outfile1.write(str(len(res['network'][0]['node'])))
333 nbNode=len(res['network'][0]['node'])
334 self.assertEqual(nbNode,4)
# Node ids are removed from listNode as they are matched; it must end empty.
335 listNode=['XPDRA-XPDR1','ROADMA-SRG1','ROADMA-DEG1','ROADMA-DEG2']
336 for i in range(0,nbNode):
337 nodeType=res['network'][0]['node'][i]['org-openroadm-network-topology:node-type']
338 nodeId=res['network'][0]['node'][i]['node-id']
339 #Tests related to XPDRA nodes
340 if(nodeId=='XPDRA-XPDR1'):
341 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA'},
342 res['network'][0]['node'][i]['supporting-node'])
343 self.assertEqual(nodeType,'XPONDER')
344 nbTps=len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
345 self.assertTrue(nbTps >= 2)
348 for j in range(0,nbTps):
349 tpType=res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['org-openroadm-network-topology:tp-type']
350 if (tpType=='XPONDER-CLIENT'):
352 elif (tpType=='XPONDER-NETWORK'):
# The transponder must expose at least one client and one network TP.
354 self.assertTrue(client > 0)
355 self.assertTrue(network > 0)
356 listNode.remove(nodeId)
357 elif(nodeId=='ROADMA-SRG1'):
358 #Test related to SRG1
359 self.assertEqual(nodeType,'SRG')
360 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),17)
361 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-CP'},
362 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
363 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-PP'},
364 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
365 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
366 res['network'][0]['node'][i]['supporting-node'])
367 listNode.remove(nodeId)
368 elif(nodeId=='ROADMA-DEG1'):
369 #Test related to DEG1
370 self.assertEqual(nodeType,'DEGREE')
371 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
372 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
373 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
374 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
375 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
376 res['network'][0]['node'][i]['supporting-node'])
377 listNode.remove(nodeId)
378 elif(nodeId=='ROADMA-DEG2'):
379 #Test related to DEG2
380 self.assertEqual(nodeType,'DEGREE')
381 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
382 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
383 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
384 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
385 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
386 res['network'][0]['node'][i]['supporting-node'])
387 listNode.remove(nodeId)
389 self.assertFalse(True)
390 self.assertEqual(len(listNode),0)
393 #Connect the tail XPDRA to ROADMA and vice versa
394 def test_10_connect_tail_xpdr_rdm(self):
# POST to the init-xpdr-rdm-links operation with XPDRA (xpdr-num 1,
# network-num 1) and ROADMA (srg-num 1, termination point SRG1-PP1-TXRX) as
# input, and expect HTTP 200 OK.
# NOTE(review): the closing braces of the `data` payload are not visible in
# this view.
395 #Connect the tail: XPDRA to ROADMA
396 url = ("{}/operations/transportpce-networkutils:init-xpdr-rdm-links"
397 .format(self.restconf_baseurl))
398 data = {"networkutils:input": {
399 "networkutils:links-input": {
400 "networkutils:xpdr-node": "XPDRA",
401 "networkutils:xpdr-num": "1",
402 "networkutils:network-num": "1",
403 "networkutils:rdm-node": "ROADMA",
404 "networkutils:srg-num": "1",
405 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
409 headers = {'content-type': 'application/json'}
410 response = requests.request(
411 "POST", url, data=json.dumps(data), headers=headers,
412 auth=('admin', 'admin'))
413 self.assertEqual(response.status_code, requests.codes.ok)
416 def test_11_connect_tail_rdm_xpdr(self):
# POST to the init-rdm-xpdr-links operation with XPDRA (xpdr-num 1,
# network-num 1) and ROADMA (srg-num 1, termination point SRG1-PP1-TXRX) as
# input, and expect HTTP 200 OK.
# NOTE(review): the closing braces of the `data` payload are not visible in
# this view.
417 #Connect the tail: ROADMA to XPDRA
418 url = ("{}/operations/transportpce-networkutils:init-rdm-xpdr-links"
419 .format(self.restconf_baseurl))
420 data = {"networkutils:input": {
421 "networkutils:links-input": {
422 "networkutils:xpdr-node": "XPDRA",
423 "networkutils:xpdr-num": "1",
424 "networkutils:network-num": "1",
425 "networkutils:rdm-node": "ROADMA",
426 "networkutils:srg-num": "1",
427 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
431 headers = {'content-type': 'application/json'}
432 response = requests.request(
433 "POST", url, data=json.dumps(data), headers=headers,
434 auth=('admin', 'admin'))
435 self.assertEqual(response.status_code, requests.codes.ok)
438 def test_12_getLinks_OpenRoadmTopology(self):
# GET openroadm-topology and check that it holds exactly 8 links: the
# expected express, add and drop links of ROADMA plus one XPONDER-INPUT and
# one XPONDER-OUTPUT link between XPDRA and ROADMA. Ids are removed from the
# expected lists as they are seen, so every list must be empty at the end.
439 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
440 .format(self.restconf_baseurl))
441 headers = {'content-type': 'application/json'}
442 response = requests.request(
443 "GET", url, headers=headers, auth=('admin', 'admin'))
444 self.assertEqual(response.status_code, requests.codes.ok)
445 res = response.json()
446 #Tests related to links
447 nbLink=len(res['network'][0]['ietf-network-topology:link'])
448 self.assertEqual(nbLink,8)
449 expressLink=['ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX','ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX']
450 addLink=['ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX','ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX',]
451 dropLink=['ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX','ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX']
452 XPDR_IN=['ROADMA-SRG1-SRG1-PP1-TXRXtoXPDRA-XPDR1-XPDR1-NETWORK1']
453 XPDR_OUT=['XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX']
454 for i in range(0,nbLink):
# Despite its name, nodeType holds the link's link-type value.
455 nodeType=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']
456 linkId=res['network'][0]['ietf-network-topology:link'][i]['link-id']
457 if(nodeType=='EXPRESS-LINK'):
458 find= linkId in expressLink
459 self.assertEqual(find, True)
460 expressLink.remove(linkId)
461 elif(nodeType=='ADD-LINK'):
462 find= linkId in addLink
463 self.assertEqual(find, True)
464 addLink.remove(linkId)
465 elif(nodeType=='DROP-LINK'):
466 find= linkId in dropLink
467 self.assertEqual(find, True)
468 dropLink.remove(linkId)
469 elif(nodeType=='XPONDER-INPUT'):
470 find= linkId in XPDR_IN
471 self.assertEqual(find, True)
472 XPDR_IN.remove(linkId)
473 elif(nodeType=='XPONDER-OUTPUT'):
474 find= linkId in XPDR_OUT
475 self.assertEqual(find, True)
476 XPDR_OUT.remove(linkId)
# NOTE(review): presumably the fallback for an unexpected link type; the
# `else:` line that would guard this forced failure is not visible here.
478 self.assertFalse(True)
479 self.assertEqual(len(expressLink),0)
480 self.assertEqual(len(addLink),0)
481 self.assertEqual(len(dropLink),0)
482 self.assertEqual(len(XPDR_IN),0)
483 self.assertEqual(len(XPDR_OUT),0)
485 def test_13_connect_ROADMC(self):
# PUT a topology-netconf node entry for ROADMC (host 127.0.0.1, port 17833,
# admin/admin credentials) and expect HTTP 201 Created.
# NOTE(review): the opening lines of the `data` payload are not visible in
# this view.
487 url = ("{}/config/network-topology:"
488 "network-topology/topology/topology-netconf/node/ROADMC"
489 .format(self.restconf_baseurl))
492 "netconf-node-topology:username": "admin",
493 "netconf-node-topology:password": "admin",
494 "netconf-node-topology:host": "127.0.0.1",
495 "netconf-node-topology:port": "17833",
496 "netconf-node-topology:tcp-only": "false",
497 "netconf-node-topology:pass-through": {}}]}
498 headers = {'content-type': 'application/json'}
499 response = requests.request(
500 "PUT", url, data=json.dumps(data), headers=headers,
501 auth=('admin', 'admin'))
502 self.assertEqual(response.status_code, requests.codes.created)
505 def test_14_getClliNetwork(self):
# GET clli-network and check that its nodes are NodeA and NodeC, each with a
# CLLI checked against 'NodeA' or 'NodeC'. Ids are removed from listNode as
# they are seen, so the list must be empty at the end.
# NOTE(review): the `if`/`else` lines choosing between the two CLLI
# assertions are not visible in this view.
506 url = ("{}/config/ietf-network:networks/network/clli-network"
507 .format(self.restconf_baseurl))
508 headers = {'content-type': 'application/json'}
509 response = requests.request(
510 "GET", url, headers=headers, auth=('admin', 'admin'))
511 self.assertEqual(response.status_code, requests.codes.ok)
512 res = response.json()
513 nbNode=len(res['network'][0]['node'])
514 listNode=['NodeA','NodeC']
515 for i in range(0,nbNode):
516 nodeId = res['network'][0]['node'][i]['node-id']
517 find= nodeId in listNode
518 self.assertEqual(find, True)
520 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'],'NodeA')
522 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'],'NodeC')
523 listNode.remove(nodeId)
525 self.assertEqual(len(listNode),0)
527 def test_15_getOpenRoadmNetwork(self):
# GET openroadm-network and check that it holds exactly 3 nodes (XPDRA,
# ROADMA, ROADMC per listNode), each supported by clli-network. The branches
# check the supporting CLLI node, the node-type and the model per node.
# Ids are removed from listNode as they are seen; it must end empty.
# NOTE(review): the `if` line selecting the XPONDER branch and the `else:`
# line guarding the final forced failure are not visible in this view.
528 url = ("{}/config/ietf-network:networks/network/openroadm-network"
529 .format(self.restconf_baseurl))
530 headers = {'content-type': 'application/json'}
531 response = requests.request(
532 "GET", url, headers=headers, auth=('admin', 'admin'))
533 self.assertEqual(response.status_code, requests.codes.ok)
534 res = response.json()
535 nbNode=len(res['network'][0]['node'])
536 self.assertEqual(nbNode,3)
537 listNode=['XPDRA','ROADMA','ROADMC']
538 for i in range(0,nbNode):
539 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['network-ref'],'clli-network')
540 nodeId=res['network'][0]['node'][i]['node-id']
542 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'],'NodeA')
543 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:node-type'],'XPONDER')
544 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'],'1')
545 listNode.remove(nodeId)
546 elif(nodeId=='ROADMA'):
547 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'],'NodeA')
548 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:node-type'],'ROADM')
549 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'],'2')
550 listNode.remove(nodeId)
551 elif(nodeId=='ROADMC'):
552 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'],'NodeC')
553 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:node-type'],'ROADM')
554 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'],'2')
555 listNode.remove(nodeId)
557 self.assertFalse(True)
558 self.assertEqual(len(listNode),0)
560 def test_16_getROADMLinkOpenRoadmTopology(self):
# GET openroadm-topology and check that it holds exactly 16 links: the
# express, add and drop links of ROADMA and ROADMC, two ROADM-TO-ROADM links
# between ROADMA-DEG1 and ROADMC-DEG2, and the XPONDER-INPUT/XPONDER-OUTPUT
# pair between XPDRA and ROADMA. Ids are removed from the expected lists as
# they are seen, so every list must be empty at the end.
561 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
562 .format(self.restconf_baseurl))
563 headers = {'content-type': 'application/json'}
564 response = requests.request(
565 "GET", url, headers=headers, auth=('admin', 'admin'))
566 self.assertEqual(response.status_code, requests.codes.ok)
567 res = response.json()
568 #Tests related to links
569 nbLink=len(res['network'][0]['ietf-network-topology:link'])
570 self.assertEqual(nbLink,16)
571 expressLink=['ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX','ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX',
572 'ROADMC-DEG2-DEG2-CTP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX','ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-DEG2-DEG2-CTP-TXRX']
573 addLink=['ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX','ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX',
574 'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG2-DEG2-CTP-TXRX','ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX']
575 dropLink=['ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX','ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX',
576 'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX','ROADMC-DEG2-DEG2-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX']
577 R2RLink=['ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX','ROADMC-DEG2-DEG2-TTP-TXRXtoROADMA-DEG1-DEG1-TTP-TXRX']
578 XPDR_IN=['ROADMA-SRG1-SRG1-PP1-TXRXtoXPDRA-XPDR1-XPDR1-NETWORK1']
579 XPDR_OUT=['XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX']
580 for i in range(0,nbLink):
# Despite its name, nodeType holds the link's link-type value.
581 nodeType=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']
582 linkId=res['network'][0]['ietf-network-topology:link'][i]['link-id']
583 if(nodeType=='EXPRESS-LINK'):
584 find= linkId in expressLink
585 self.assertEqual(find, True)
586 expressLink.remove(linkId)
587 elif(nodeType=='ADD-LINK'):
588 find= linkId in addLink
589 self.assertEqual(find, True)
590 addLink.remove(linkId)
591 elif(nodeType=='DROP-LINK'):
592 find= linkId in dropLink
593 self.assertEqual(find, True)
594 dropLink.remove(linkId)
595 elif(nodeType=='ROADM-TO-ROADM'):
596 find= linkId in R2RLink
597 self.assertEqual(find, True)
598 R2RLink.remove(linkId)
599 elif(nodeType=='XPONDER-INPUT'):
600 find= linkId in XPDR_IN
601 self.assertEqual(find, True)
602 XPDR_IN.remove(linkId)
603 elif(nodeType=='XPONDER-OUTPUT'):
604 find= linkId in XPDR_OUT
605 self.assertEqual(find, True)
606 XPDR_OUT.remove(linkId)
# NOTE(review): presumably the fallback for an unexpected link type; the
# `else:` line that would guard this forced failure is not visible here.
608 self.assertFalse(True)
609 self.assertEqual(len(expressLink),0)
610 self.assertEqual(len(addLink),0)
611 self.assertEqual(len(dropLink),0)
612 self.assertEqual(len(R2RLink),0)
613 self.assertEqual(len(XPDR_IN),0)
614 self.assertEqual(len(XPDR_OUT),0)
616 def test_17_getNodes_OpenRoadmTopology(self):
# GET openroadm-topology and check that it holds exactly 7 nodes:
# XPDRA-XPDR1 plus the SRG1, DEG1 and DEG2 nodes of ROADMA and of ROADMC.
# Each node is checked for its supporting node, node type and termination
# points. Ids are removed from listNode as they are seen; it must end empty.
# NOTE(review): the lines that initialise and increment `client`/`network`
# and the `else:` guarding the final forced failure are not visible here.
617 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
618 .format(self.restconf_baseurl))
619 headers = {'content-type': 'application/json'}
620 response = requests.request(
621 "GET", url, headers=headers, auth=('admin', 'admin'))
# The body is decoded before the status code is asserted.
622 res = response.json()
623 #Tests related to nodes
624 self.assertEqual(response.status_code, requests.codes.ok)
625 nbNode=len(res['network'][0]['node'])
626 self.assertEqual(nbNode,7)
627 listNode=['XPDRA-XPDR1',
628 'ROADMA-SRG1','ROADMA-DEG1','ROADMA-DEG2',
629 'ROADMC-SRG1','ROADMC-DEG1','ROADMC-DEG2']
630 #************************Tests related to XPDRA nodes
631 for i in range(0,nbNode):
632 nodeType=res['network'][0]['node'][i]['org-openroadm-network-topology:node-type']
633 nodeId=res['network'][0]['node'][i]['node-id']
634 if(nodeId=='XPDRA-XPDR1'):
635 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA'},
636 res['network'][0]['node'][i]['supporting-node'])
637 self.assertEqual(nodeType,'XPONDER')
638 nbTps=len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
639 self.assertTrue(nbTps >= 2)
642 for j in range(0,nbTps):
643 tpType=res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['org-openroadm-network-topology:tp-type']
644 if (tpType=='XPONDER-CLIENT'):
646 elif (tpType=='XPONDER-NETWORK'):
# The transponder must expose at least one client and one network TP.
648 self.assertTrue(client > 0)
649 self.assertTrue(network > 0)
650 listNode.remove(nodeId)
651 elif(nodeId=='ROADMA-SRG1'):
652 #Test related to SRG1
653 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),17)
654 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-CP'},
655 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
656 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-PP'},
657 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
658 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
659 res['network'][0]['node'][i]['supporting-node'])
660 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network-topology:node-type'],'SRG')
661 listNode.remove(nodeId)
662 elif(nodeId=='ROADMA-DEG1'):
663 #Test related to DEG1
664 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
665 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
666 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
667 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
668 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
669 res['network'][0]['node'][i]['supporting-node'])
670 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network-topology:node-type'],'DEGREE')
671 listNode.remove(nodeId)
672 elif(nodeId=='ROADMA-DEG2'):
673 #Test related to DEG2
674 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
675 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
676 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
677 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
678 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
679 res['network'][0]['node'][i]['supporting-node'])
680 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network-topology:node-type'],'DEGREE')
681 listNode.remove(nodeId)
682 elif(nodeId=='ROADMC-SRG1'):
683 #Test related to SRG1
684 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),17)
685 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-CP'},
686 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
687 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-PP'},
688 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
689 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMC'},
690 res['network'][0]['node'][i]['supporting-node'])
691 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network-topology:node-type'],'SRG')
692 listNode.remove(nodeId)
693 elif(nodeId=='ROADMC-DEG1'):
694 #Test related to DEG1
695 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
696 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
697 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
698 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
699 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMC'},
700 res['network'][0]['node'][i]['supporting-node'])
701 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network-topology:node-type'],'DEGREE')
702 listNode.remove(nodeId)
703 elif(nodeId=='ROADMC-DEG2'):
704 #Test related to DEG2
705 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
706 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
707 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
708 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
709 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMC'},
710 res['network'][0]['node'][i]['supporting-node'])
711 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network-topology:node-type'],'DEGREE')
712 listNode.remove(nodeId)
714 self.assertFalse(True)
715 self.assertEqual(len(listNode),0)
717 def test_18_connect_ROADMB(self):
# PUT a topology-netconf node entry for ROADMB (host 127.0.0.1, port 17832,
# admin/admin credentials) and expect HTTP 201 Created.
# NOTE(review): the opening lines of the `data` payload are not visible in
# this view.
718 url = ("{}/config/network-topology:"
719 "network-topology/topology/topology-netconf/node/ROADMB"
720 .format(self.restconf_baseurl))
723 "netconf-node-topology:username": "admin",
724 "netconf-node-topology:password": "admin",
725 "netconf-node-topology:host": "127.0.0.1",
726 "netconf-node-topology:port": "17832",
727 "netconf-node-topology:tcp-only": "false",
728 "netconf-node-topology:pass-through": {}}]}
729 headers = {'content-type': 'application/json'}
730 response = requests.request(
731 "PUT", url, data=json.dumps(data), headers=headers,
732 auth=('admin', 'admin'))
733 self.assertEqual(response.status_code, requests.codes.created)
def test_19_getClliNetwork(self):
    """Check that clli-network contains exactly NodeA, NodeB and NodeC.

    Every returned node must be one of the expected ids, must carry a
    CLLI equal to its own node-id, and each expected id must appear.
    """
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    remaining = ['NodeA', 'NodeB', 'NodeC']
    for node in res['network'][0]['node']:
        node_id = node['node-id']
        # assertIn reports the offending id, unlike comparing a bool to True.
        self.assertIn(node_id, remaining)
        # Each expected node's CLLI is its own id, so one assertion covers
        # the NodeA / NodeB / NodeC cases.
        self.assertEqual(node['org-openroadm-clli-network:clli'], node_id)
        remaining.remove(node_id)
    # Nothing left over means every expected node was present.
    self.assertEqual(len(remaining), 0)
def test_20_verifyDegree(self):
    """Check the ROADM-TO-ROADM links present in openroadm-topology.

    Every link of type ROADM-TO-ROADM must be one of the six expected
    inter-degree links, and all six must be present.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Expected ROADM-to-ROADM links; entries are removed as they are seen.
    pending = [
        'ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX',
        'ROADMC-DEG2-DEG2-TTP-TXRXtoROADMA-DEG1-DEG1-TTP-TXRX',
        'ROADMA-DEG2-DEG2-TTP-TXRXtoROADMB-DEG1-DEG1-TTP-TXRX',
        'ROADMC-DEG1-DEG1-TTP-TXRXtoROADMB-DEG2-DEG2-TTP-TXRX',
        'ROADMB-DEG1-DEG1-TTP-TXRXtoROADMA-DEG2-DEG2-TTP-TXRX',
        'ROADMB-DEG2-DEG2-TTP-TXRXtoROADMC-DEG1-DEG1-TTP-TXRX',
    ]
    for link in res['network'][0]['ietf-network-topology:link']:
        if link['org-openroadm-network-topology:link-type'] != 'ROADM-TO-ROADM':
            continue
        link_id = link['link-id']
        # assertIn names the unexpected (or duplicated) link on failure.
        self.assertIn(link_id, pending)
        pending.remove(link_id)
    self.assertEqual(len(pending), 0)
def test_21_verifyOppositeLinkTopology(self):
    """Check that every topology link has a consistent opposite link.

    Fetches openroadm-topology, appends the raw response to the test log,
    expects 26 links, then for each link fetches its declared opposite
    link and asserts that it points back, has swapped source/destination
    nodes, and has the matching link type.
    """
    base_url = self.restconf_baseurl
    headers = {'content-type': 'application/json'}
    credentials = ('admin', 'admin')
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(base_url))
    response = requests.request(
        "GET", url, headers=headers, auth=credentials)
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Write the response in the log
    with open('./transportpce_tests/log/response.log', 'a') as outfile1:
        outfile1.write(str(res))
    # Tests related to links
    links = res['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 26)
    # Link type expected on the opposite link, keyed by this link's type.
    # Types missing from the table are not checked.
    opposite_type = {
        'ADD-LINK': 'DROP-LINK',
        'DROP-LINK': 'ADD-LINK',
        'EXPRESS-LINK': 'EXPRESS-LINK',
        'ROADM-TO-ROADM': 'ROADM-TO-ROADM',
        'XPONDER-INPUT': 'XPONDER-OUTPUT',
        'XPONDER-OUTPUT': 'XPONDER-INPUT',
    }
    for link in links:
        link_id = link['link-id']
        link_type = link['org-openroadm-network-topology:link-type']
        link_src = link['source']['source-node']
        link_dest = link['destination']['dest-node']
        opp_link_id = link['org-openroadm-common-network:opposite-link']
        # Find the opposite link. The id is passed as a format argument so
        # that its text is never interpreted as a format field.
        opp_url = ("{}/config/ietf-network:networks/network/openroadm-topology/"
                   "ietf-network-topology:link/{}"
                   .format(base_url, opp_link_id))
        response_opp = requests.request(
            "GET", opp_url, headers=headers, auth=credentials)
        self.assertEqual(response_opp.status_code, requests.codes.ok)
        opp_link = response_opp.json()['ietf-network-topology:link'][0]
        self.assertEqual(
            opp_link['org-openroadm-common-network:opposite-link'], link_id)
        self.assertEqual(opp_link['source']['source-node'], link_dest)
        self.assertEqual(opp_link['destination']['dest-node'], link_src)
        opp_type = opp_link['org-openroadm-network-topology:link-type']
        expected_type = opposite_type.get(link_type)
        if expected_type is not None:
            self.assertEqual(opp_type, expected_type)
def test_22_disconnect_ROADMB(self):
    """Delete ROADMB from topology-netconf, then NodeB from clli-network.

    Both DELETE requests must answer 200 OK; the second one is only sent
    once the first has been verified.
    """
    headers = {'content-type': 'application/json'}
    # The DELETE requests carry an empty JSON object as body.
    data = {}
    targets = (
        # Delete in the topology-netconf
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/ROADMB",
        # Delete in the clli-network
        "{}/config/ietf-network:networks/network/clli-network/node/NodeB",
    )
    for template in targets:
        response = requests.request(
            "DELETE", template.format(self.restconf_baseurl),
            data=json.dumps(data), headers=headers,
            auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
846 #Delete in the openroadm-network
847 # url = ("{}/config/ietf-network:networks/network/openroadm-network/node/ROADMB"
848 # .format(self.restconf_baseurl))
850 # headers = {'content-type': 'application/json'}
851 # response = requests.request(
852 # "DELETE", url, data=json.dumps(data), headers=headers,
853 # auth=('admin', 'admin'))
854 # self.assertEqual(response.status_code, requests.codes.ok)
def test_23_disconnect_ROADMC(self):
    """Delete ROADMC from topology-netconf, then NodeC from clli-network.

    Both DELETE requests must answer 200 OK; the second one is only sent
    once the first has been verified.
    """
    headers = {'content-type': 'application/json'}
    # The DELETE requests carry an empty JSON object as body.
    data = {}
    targets = (
        # Delete in the topology-netconf
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/ROADMC",
        # Delete in the clli-network
        "{}/config/ietf-network:networks/network/clli-network/node/NodeC",
    )
    for template in targets:
        response = requests.request(
            "DELETE", template.format(self.restconf_baseurl),
            data=json.dumps(data), headers=headers,
            auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
876 #Delete in the openroadm-network
877 # url = ("{}/config/ietf-network:networks/network/openroadm-network/node/ROADMC"
878 # .format(self.restconf_baseurl))
880 # headers = {'content-type': 'application/json'}
881 # response = requests.request(
882 # "DELETE", url, data=json.dumps(data), headers=headers,
883 # auth=('admin', 'admin'))
884 # self.assertEqual(response.status_code, requests.codes.ok)
886 # def test_24_getLinks_OpenRoadmTopology(self):
887 # url = ("{}/config/ietf-network:networks/network/openroadm-topology"
888 # .format(self.restconf_baseurl))
889 # headers = {'content-type': 'application/json'}
890 # response = requests.request(
891 # "GET", url, headers=headers, auth=('admin', 'admin'))
892 # self.assertEqual(response.status_code, requests.codes.ok)
893 # res = response.json()
894 # #Write the response in the log
895 # with open('./transportpce_tests/log/response.log', 'a') as outfile1:
896 # outfile1.write(str(res))
897 # #Tests related to links
898 # nbLink=len(res['network'][0]['ietf-network-topology:link'])
899 # self.assertEqual(nbLink,8)
900 # expressLink=['ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX','ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX']
901 # addLink=['ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX','ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX',]
902 # dropLink=['ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX','ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX']
903 # XPDR_IN=['ROADMA-SRG1-SRG1-PP1-TXRXtoXPDRA-XPDR1-XPDR1-NETWORK1']
904 # XPDR_OUT=['XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX']
905 # for i in range(0,nbLink):
906 # nodeType=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']
907 # linkId=res['network'][0]['ietf-network-topology:link'][i]['link-id']
908 # if(nodeType=='EXPRESS-LINK'):
909 # find= linkId in expressLink
910 # self.assertEqual(find, True)
911 # expressLink.remove(linkId)
912 # elif(nodeType=='ADD-LINK'):
913 # find= linkId in addLink
914 # self.assertEqual(find, True)
915 # addLink.remove(linkId)
916 # elif(nodeType=='DROP-LINK'):
917 # find= linkId in dropLink
918 # self.assertEqual(find, True)
919 # dropLink.remove(linkId)
920 # elif(nodeType=='XPONDER-INPUT'):
921 # find= linkId in XPDR_IN
922 # self.assertEqual(find, True)
923 # XPDR_IN.remove(linkId)
924 # elif(nodeType=='XPONDER-OUTPUT'):
925 # find= linkId in XPDR_OUT
926 # self.assertEqual(find, True)
927 # XPDR_OUT.remove(linkId)
929 # self.assertFalse(True)
930 # self.assertEqual(len(expressLink),0)
931 # self.assertEqual(len(addLink),0)
932 # self.assertEqual(len(dropLink),0)
933 # self.assertEqual(len(XPDR_IN),0)
934 # self.assertEqual(len(XPDR_OUT),0)
936 # for i in range(0,nbLink):
937 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type'],'ROADM-TO-ROADM')
938 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX')
939 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX')
940 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG2-DEG1-CTP-TXRX')
941 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG2-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX')
942 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-DEG2-DEG2-CTP-TXRX')
943 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG2-DEG2-CTP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX')
945 # def test_25_getNodes_OpenRoadmTopology(self):
946 # url = ("{}/config/ietf-network:networks/network/openroadm-topology"
947 # .format(self.restconf_baseurl))
948 # headers = {'content-type': 'application/json'}
949 # response = requests.request(
950 # "GET", url, headers=headers, auth=('admin', 'admin'))
951 # res = response.json()
952 # #Tests related to nodes
953 # self.assertEqual(response.status_code, requests.codes.ok)
954 # with open('./transportpce_tests/log/response.log', 'a') as outfile1:
955 # outfile1.write(str(len(res['network'][0]['node'])))
956 # nbNode=len(res['network'][0]['node'])
957 # self.assertEqual(nbNode,4)
958 # listNode=['XPDRA-XPDR1','ROADMA-SRG1','ROADMA-DEG1','ROADMA-DEG2']
959 # for i in range(0,nbNode):
960 # nodeType=res['network'][0]['node'][i]['org-openroadm-network-topology:node-type']
961 # nodeId=res['network'][0]['node'][i]['node-id']
962 # #Tests related to XPDRA nodes
963 # if(nodeId=='XPDRA-XPDR1'):
964 # self.assertEqual(nodeType,'XPONDER')
965 # self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),2)
966 # self.assertEqual({'tp-id': 'XPDR1-CLIENT1', 'org-openroadm-network-topology:tp-type': 'XPONDER-CLIENT',
967 # 'org-openroadm-network-topology:xpdr-network-attributes': {
968 # 'tail-equipment-id': 'XPDR1-NETWORK1'}},
969 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'][0])
970 # self.assertEqual({'tp-id': 'XPDR1-NETWORK1', 'org-openroadm-network-topology:tp-type': 'XPONDER-NETWORK',
971 # 'org-openroadm-network-topology:xpdr-client-attributes': {'tail-equipment-id': 'XPDR1-CLIENT1'}},
972 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'][1])
973 # self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA'},
974 # res['network'][0]['node'][i]['supporting-node'])
975 # listNode.remove(nodeId)
976 # elif(nodeId=='ROADMA-SRG1'):
977 # #Test related to SRG1
978 # self.assertEqual(nodeType,'SRG')
979 # self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),17)
980 # self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-CP'},
981 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
982 # self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-PP'},
983 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
984 # self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
985 # res['network'][0]['node'][i]['supporting-node'])
986 # listNode.remove(nodeId)
987 # elif(nodeId=='ROADMA-DEG1'):
988 # #Test related to DEG1
989 # self.assertEqual(nodeType,'DEGREE')
990 # self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
991 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
992 # self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
993 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
994 # self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
995 # res['network'][0]['node'][i]['supporting-node'])
996 # listNode.remove(nodeId)
997 # elif(nodeId=='ROADMA-DEG2'):
998 # #Test related to DEG2
999 # self.assertEqual(nodeType,'DEGREE')
1000 # self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
1001 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1002 # self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
1003 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1004 # self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
1005 # res['network'][0]['node'][i]['supporting-node'])
1006 # listNode.remove(nodeId)
1008 # self.assertFalse(True)
1009 # self.assertEqual(len(listNode),0)
1010 # #Test related to SRG1 of ROADMC
1011 # for i in range(0,nbNode):
1012 # self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'ROADMC-SRG1')
1013 # self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'ROADMC-DEG1')
1014 # self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'ROADMC-DEG2')
def test_26_getOpenRoadmNetwork(self):
    """Check that openroadm-network holds two nodes, none of them ROADMC."""
    url = ("{}/config/ietf-network:networks/network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    nodes = res['network'][0]['node']
    self.assertEqual(len(nodes), 2)
    # Inspect every node, including the last one.
    for node in nodes:
        self.assertNotEqual(node['node-id'], 'ROADMC')
def test_27_getClliNetwork(self):
    """Check that clli-network holds a single node whose CLLI is not NodeC."""
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    nodes = res['network'][0]['node']
    self.assertEqual(len(nodes), 1)
    # Check each returned node by iterating the list itself, so the
    # assertion runs for the single remaining node and never indexes
    # past the end of the list.
    for node in nodes:
        self.assertNotEqual(node['org-openroadm-clli-network:clli'], 'NodeC')
def test_28_disconnect_XPDRA(self):
    """Delete XPDRA from topology-netconf and expect 200 OK."""
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/XPDRA"
           .format(self.restconf_baseurl))
    # The DELETE request carries an empty JSON object as body.
    data = {}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "DELETE", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
1052 #Delete in the openroadm-network
1053 # url = ("{}/config/ietf-network:networks/network/openroadm-network/node/XPDRA"
1054 # .format(self.restconf_baseurl))
1056 # headers = {'content-type': 'application/json'}
1057 # response = requests.request(
1058 # "DELETE", url, data=json.dumps(data), headers=headers,
1059 # auth=('admin', 'admin'))
1060 # self.assertEqual(response.status_code, requests.codes.ok)
def test_29_getClliNetwork(self):
    """Check that clli-network holds exactly one node, with CLLI NodeA."""
    endpoint = "{}/config/ietf-network:networks/network/clli-network".format(
        self.restconf_baseurl)
    response = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 1)
    self.assertEqual(nodes[0]['org-openroadm-clli-network:clli'], 'NodeA')
def test_30_getOpenRoadmNetwork(self):
    """Check that openroadm-network holds one node and that it is not XPDRA."""
    endpoint = "{}/config/ietf-network:networks/network/openroadm-network".format(
        self.restconf_baseurl)
    response = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 1)
    for node in nodes:
        self.assertNotEqual(node['node-id'], 'XPDRA')
def test_31_getNodes_OpenRoadmTopology(self):
    """Check that openroadm-topology holds only the three ROADMA nodes.

    Each node must be supported by ROADMA in openroadm-network, have the
    expected node type and expose the expected termination points. The
    SRG1 node must additionally expose exactly 17 termination points.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before parsing the body, so an HTTP error is reported
    # as a status mismatch rather than as a JSON decoding problem.
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    # Tests related to nodes
    self.assertEqual(len(nodes), 3)
    # node-id -> (node type, ((tp-id, tp-type), ...))
    expected = {
        'ROADMA-SRG1': ('SRG', (('SRG1-CP-TXRX', 'SRG-TXRX-CP'),
                                ('SRG1-PP1-TXRX', 'SRG-TXRX-PP'))),
        'ROADMA-DEG1': ('DEGREE', (('DEG1-TTP-TXRX', 'DEGREE-TXRX-TTP'),
                                   ('DEG1-CTP-TXRX', 'DEGREE-TXRX-CTP'))),
        'ROADMA-DEG2': ('DEGREE', (('DEG2-TTP-TXRX', 'DEGREE-TXRX-TTP'),
                                   ('DEG2-CTP-TXRX', 'DEGREE-TXRX-CTP'))),
    }
    remaining = list(expected)
    for node in nodes:
        self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
                      node['supporting-node'])
        node_type = node['org-openroadm-network-topology:node-type']
        node_id = node['node-id']
        if node_id not in remaining:
            # Unknown node, or a known node reported twice.
            self.fail("unexpected node in openroadm-topology: {}".format(node_id))
        expected_type, expected_tps = expected[node_id]
        self.assertEqual(node_type, expected_type)
        termination_points = node['ietf-network-topology:termination-point']
        if node_id == 'ROADMA-SRG1':
            # Only the SRG has its termination points counted.
            self.assertEqual(len(termination_points), 17)
        for tp_id, tp_type in expected_tps:
            self.assertIn(
                {'tp-id': tp_id, 'org-openroadm-network-topology:tp-type': tp_type},
                termination_points)
        remaining.remove(node_id)
    self.assertEqual(len(remaining), 0)
def test_32_disconnect_ROADM_XPDRA_link(self):
    """Delete both directions of the XPDRA <-> ROADMA-SRG1 link.

    Removes the XPDRA-to-ROADMA link first and then the ROADMA-to-XPDRA
    link from openroadm-topology; each DELETE must answer 200 OK.
    """
    headers = {'content-type': 'application/json'}
    # The DELETE requests carry an empty JSON object as body.
    data = {}
    link_ids = (
        "XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX",
        "ROADMA-SRG1-SRG1-PP1-TXRXtoXPDRA-XPDR1-XPDR1-NETWORK1",
    )
    for link_id in link_ids:
        url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
               "link/{}"
               .format(self.restconf_baseurl, link_id))
        response = requests.request(
            "DELETE", url, data=json.dumps(data), headers=headers,
            auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
1155 # def test_33_getLinks_OpenRoadmTopology(self):
1156 # url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1157 # .format(self.restconf_baseurl))
1158 # headers = {'content-type': 'application/json'}
1159 # response = requests.request(
1160 # "GET", url, headers=headers, auth=('admin', 'admin'))
1161 # self.assertEqual(response.status_code, requests.codes.ok)
1162 # res = response.json()
1163 # nbLink=len(res['network'][0]['ietf-network-topology:link'])
1164 # self.assertEqual(nbLink,6)
1165 # expressLink=['ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX','ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX']
1166 # addLink=['ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX','ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX']
1167 # dropLink=['ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX','ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX']
1168 # for i in range(0,nbLink):
1169 # if (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='EXPRESS-LINK'):
1170 # link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1171 # find= link_id in expressLink
1172 # self.assertEqual(find, True)
1173 # expressLink.remove(link_id)
1174 # elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='ADD-LINK'):
1175 # link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1176 # find= link_id in addLink
1177 # self.assertEqual(find, True)
1178 # addLink.remove(link_id)
1179 # elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='DROP-LINK'):
1180 # link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1181 # find= link_id in dropLink
1182 # self.assertEqual(find, True)
1183 # dropLink.remove(link_id)
1185 # self.assertFalse(True)
1186 # self.assertEqual(len(expressLink),0)
1187 # self.assertEqual(len(addLink),0)
1188 # self.assertEqual(len(dropLink),0)
1189 # for i in range(0,nbLink):
1190 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type'],'XPONDER-OUTPUT')
1191 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type'],'XPONDER-INPUT')
def test_34_disconnect_ROADMA(self):
    """Delete ROADMA from topology-netconf, then NodeA from clli-network.

    Both DELETE requests must answer 200 OK; the second one is only sent
    once the first has been verified.
    """
    headers = {'content-type': 'application/json'}
    # The DELETE requests carry an empty JSON object as body.
    data = {}
    targets = (
        # Delete in the topology-netconf
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/ROADMA",
        # Delete in the clli-network
        "{}/config/ietf-network:networks/network/clli-network/node/NodeA",
    )
    for template in targets:
        response = requests.request(
            "DELETE", template.format(self.restconf_baseurl),
            data=json.dumps(data), headers=headers,
            auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
1212 #Delete in the openroadm-network
1213 # url = ("{}/config/ietf-network:networks/network/openroadm-network/node/ROADMA"
1214 # .format(self.restconf_baseurl))
1216 # headers = {'content-type': 'application/json'}
1217 # response = requests.request(
1218 # "DELETE", url, data=json.dumps(data), headers=headers,
1219 # auth=('admin', 'admin'))
1220 # self.assertEqual(response.status_code, requests.codes.ok)
def test_35_getClliNetwork(self):
    """Check that clli-network no longer contains a 'node' entry."""
    endpoint = "{}/config/ietf-network:networks/network/clli-network".format(
        self.restconf_baseurl)
    response = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    network = response.json()['network'][0]
    self.assertNotIn('node', network)
def test_36_getOpenRoadmNetwork(self):
    """Check that openroadm-network no longer contains a 'node' entry."""
    endpoint = "{}/config/ietf-network:networks/network/openroadm-network".format(
        self.restconf_baseurl)
    response = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    network = response.json()['network'][0]
    self.assertNotIn('node', network)
1243 # def test_37_getOpenRoadmTopology(self):
1244 # url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1245 # .format(self.restconf_baseurl))
1246 # headers = {'content-type': 'application/json'}
1247 # response = requests.request(
1248 # "GET", url, headers=headers, auth=('admin', 'admin'))
1249 # self.assertEqual(response.status_code, requests.codes.ok)
1250 # res = response.json()
1251 # self.assertNotIn('node', res['network'][0])
1252 # self.assertNotIn('ietf-network-topology:link', res['network'][0])
# Script entry point: run the test cases of this module with verbose output.
if __name__ == "__main__":
    # Optional debug logging, currently disabled:
    # logging.basicConfig(filename='./transportpce_tests/log/response.log',filemode='w',level=logging.DEBUG)
    # logging.debug('I am there')
    unittest.main(verbosity=2)