3 ##############################################################################
4 #Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
23 class TransportPCEtesting(unittest.TestCase):
25 testtools_process1 = None
26 testtools_process2 = None
27 testtools_process3 = None
28 testtools_process4 = None
30 restconf_baseurl = "http://localhost:8181/restconf"
33 def __start_testtools(cls):
34 executable = ("./netconf/netconf/tools/netconf-testtool/target/"
35 "netconf-testtool-1.5.0-executable.jar")
36 if os.path.isfile(executable):
37 if not os.path.exists("transportpce_tests/log"):
38 os.makedirs("transportpce_tests/log")
39 if os.path.isfile("./transportpce_tests/log/response.log"):
40 os.remove("transportpce_tests/log/response.log")
41 with open('transportpce_tests/log/testtools_ROADMA.log', 'w') as outfile1:
42 cls.testtools_process1 = subprocess.Popen(
43 ["java", "-jar", executable, "--schemas-dir", "schemas",
44 "--initial-config-xml", "sample_configs/nodes_config/sample-config-ROADMA.xml","--starting-port","17831"],
46 with open('transportpce_tests/log/testtools_ROADMB.log', 'w') as outfile2:
47 cls.testtools_process2 = subprocess.Popen(
48 ["java", "-jar", executable, "--schemas-dir", "schemas",
49 "--initial-config-xml", "sample_configs/nodes_config/sample-config-ROADMB.xml","--starting-port","17832"],
51 with open('transportpce_tests/log/testtools_ROADMC.log', 'w') as outfile3:
52 cls.testtools_process3 = subprocess.Popen(
53 ["java", "-jar", executable, "--schemas-dir", "schemas",
54 "--initial-config-xml", "sample_configs/nodes_config/sample-config-ROADMC.xml","--starting-port","17833"],
56 with open('transportpce_tests/log/testtools_XPDRA.log', 'w') as outfile4:
57 cls.testtools_process4 = subprocess.Popen(
58 ["java", "-jar", executable, "--schemas-dir", "schemas",
59 "--initial-config-xml", "sample_configs/nodes_config/sample-config-XPDRA.xml","--starting-port","17830"],
64 executable = "../karaf/target/assembly/bin/karaf"
65 with open('transportpce_tests/log/odl.log', 'w') as outfile:
66 cls.odl_process = subprocess.Popen(
67 ["bash", executable, "server"], stdout=outfile,
68 stdin=open(os.devnull))
72 cls.__start_testtools()
77 def tearDownClass(cls):
78 cls.testtools_process1.send_signal(signal.SIGINT)
79 cls.testtools_process1.wait()
80 cls.testtools_process2.send_signal(signal.SIGINT)
81 cls.testtools_process2.wait()
82 cls.testtools_process3.send_signal(signal.SIGINT)
83 cls.testtools_process3.wait()
84 cls.testtools_process4.send_signal(signal.SIGINT)
85 cls.testtools_process4.wait()
86 for child in psutil.Process(cls.odl_process.pid).children():
87 child.send_signal(signal.SIGINT)
89 cls.odl_process.send_signal(signal.SIGINT)
90 cls.odl_process.wait()
91 print('End of the tear down class')
96 def test_01_connect_ROADMA(self):
98 url = ("{}/config/network-topology:"
99 "network-topology/topology/topology-netconf/node/ROADMA"
100 .format(self.restconf_baseurl))
103 "netconf-node-topology:username": "admin",
104 "netconf-node-topology:password": "admin",
105 "netconf-node-topology:host": "127.0.0.1",
106 "netconf-node-topology:port": "17831",
107 "netconf-node-topology:tcp-only": "false",
108 "netconf-node-topology:pass-through": {}}]}
109 headers = {'content-type': 'application/json'}
110 response = requests.request(
111 "PUT", url, data=json.dumps(data), headers=headers,
112 auth=('admin', 'admin'))
113 self.assertEqual(response.status_code, requests.codes.created)
116 def test_02_getClliNetwork(self):
117 url = ("{}/config/ietf-network:network/clli-network"
118 .format(self.restconf_baseurl))
119 headers = {'content-type': 'application/json'}
120 response = requests.request(
121 "GET", url, headers=headers, auth=('admin', 'admin'))
122 self.assertEqual(response.status_code, requests.codes.ok)
123 res = response.json()
124 self.assertEqual(res['network'][0]['node'][0]['node-id'],'NodeA')
125 self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'],'NodeA')
127 def test_03_getOpenRoadmNetwork(self):
128 url = ("{}/config/ietf-network:network/openroadm-network"
129 .format(self.restconf_baseurl))
130 headers = {'content-type': 'application/json'}
131 response = requests.request(
132 "GET", url, headers=headers, auth=('admin', 'admin'))
133 self.assertEqual(response.status_code, requests.codes.ok)
134 res = response.json()
135 self.assertEqual(res['network'][0]['node'][0]['node-id'],'ROADMA')
136 self.assertEqual(res['network'][0]['node'][0]['supporting-node'][0]['network-ref'],'clli-network')
137 self.assertEqual(res['network'][0]['node'][0]['supporting-node'][0]['node-ref'],'NodeA')
138 self.assertEqual(res['network'][0]['node'][0]['org-openroadm-network:node-type'],'ROADM')
139 self.assertEqual(res['network'][0]['node'][0]['org-openroadm-network:model'],'2')
141 def test_04_getLinks_OpenroadmTopology(self):
142 url = ("{}/config/ietf-network:network/openroadm-topology"
143 .format(self.restconf_baseurl))
144 headers = {'content-type': 'application/json'}
145 response = requests.request(
146 "GET", url, headers=headers, auth=('admin', 'admin'))
147 self.assertEqual(response.status_code, requests.codes.ok)
148 res = response.json()
149 #Tests related to links
150 nbLink=len(res['network'][0]['ietf-network-topology:link'])
151 self.assertEqual(nbLink,6)
152 expressLink=['ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX','ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX']
153 addLink=['ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX','ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX']
154 dropLink=['ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX','ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX']
155 for i in range(0,nbLink):
156 linkId = res['network'][0]['ietf-network-topology:link'][i]['link-id']
157 if (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='EXPRESS-LINK'):
158 find= linkId in expressLink
159 self.assertEqual(find, True)
160 expressLink.remove(linkId)
161 elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='ADD-LINK'):
162 find= linkId in addLink
163 self.assertEqual(find, True)
164 addLink.remove(linkId)
165 elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='DROP-LINK'):
166 find= linkId in dropLink
167 self.assertEqual(find, True)
168 dropLink.remove(linkId)
170 self.assertFalse(True)
171 self.assertEqual(len(expressLink),0)
172 self.assertEqual(len(addLink),0)
173 self.assertEqual(len(dropLink),0)
175 def test_05_getNodes_OpenRoadmTopology(self):
176 url = ("{}/config/ietf-network:network/openroadm-topology"
177 .format(self.restconf_baseurl))
178 headers = {'content-type': 'application/json'}
179 response = requests.request(
180 "GET", url, headers=headers, auth=('admin', 'admin'))
181 res = response.json()
182 #Tests related to nodes
183 self.assertEqual(response.status_code, requests.codes.ok)
184 with open('./transportpce_tests/log/response.log', 'a') as outfile1:
185 outfile1.write(str(len(res['network'][0]['node'])))
186 nbNode=len(res['network'][0]['node'])
187 self.assertEqual(nbNode,3)
188 listNode=['ROADMA-SRG1','ROADMA-DEG1','ROADMA-DEG2']
189 for i in range(0,nbNode):
190 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
191 res['network'][0]['node'][i]['supporting-node'])
192 nodeType=res['network'][0]['node'][i]['org-openroadm-network-topology:node-type']
193 nodeId=res['network'][0]['node'][i]['node-id']
194 if(nodeId=='ROADMA-SRG1'):
195 #Test related to SRG1
196 self.assertEqual(nodeType,'SRG')
197 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),17)
198 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-CP'},
199 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
200 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-PP'},
201 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
202 listNode.remove(nodeId)
203 elif(nodeId=='ROADMA-DEG1'):
204 #Test related to DEG1
205 self.assertEqual(nodeType,'DEGREE')
206 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
207 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
208 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
209 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
210 listNode.remove(nodeId)
211 elif(nodeId=='ROADMA-DEG2'):
212 #Test related to DEG2
213 self.assertEqual(nodeType,'DEGREE')
214 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
215 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
216 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
217 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
218 listNode.remove(nodeId)
220 self.assertFalse(True)
221 self.assertEqual(len(listNode),0)
223 def test_06_connect_XPDRA(self):
224 url = ("{}/config/network-topology:"
225 "network-topology/topology/topology-netconf/node/XPDRA"
226 .format(self.restconf_baseurl))
229 "netconf-node-topology:username": "admin",
230 "netconf-node-topology:password": "admin",
231 "netconf-node-topology:host": "127.0.0.1",
232 "netconf-node-topology:port": "17830",
233 "netconf-node-topology:tcp-only": "false",
234 "netconf-node-topology:pass-through": {}}]}
235 headers = {'content-type': 'application/json'}
236 response = requests.request(
237 "PUT", url, data=json.dumps(data), headers=headers,
238 auth=('admin', 'admin'))
239 self.assertEqual(response.status_code, requests.codes.created)
242 def test_07_getClliNetwork(self):
243 url = ("{}/config/ietf-network:network/clli-network"
244 .format(self.restconf_baseurl))
245 headers = {'content-type': 'application/json'}
246 response = requests.request(
247 "GET", url, headers=headers, auth=('admin', 'admin'))
248 self.assertEqual(response.status_code, requests.codes.ok)
249 res = response.json()
250 self.assertEqual(res['network'][0]['node'][0]['node-id'],'NodeA')
251 self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'],'NodeA')
253 def test_08_getOpenRoadmNetwork(self):
254 url = ("{}/config/ietf-network:network/openroadm-network"
255 .format(self.restconf_baseurl))
256 headers = {'content-type': 'application/json'}
257 response = requests.request(
258 "GET", url, headers=headers, auth=('admin', 'admin'))
259 self.assertEqual(response.status_code, requests.codes.ok)
260 res = response.json()
261 nbNode=len(res['network'][0]['node'])
262 self.assertEqual(nbNode,2)
263 for i in range(0,nbNode):
264 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['network-ref'],'clli-network')
265 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'],'NodeA')
266 nodeId=res['network'][0]['node'][i]['node-id']
268 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:node-type'],'XPONDER')
269 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'],'1')
270 elif(nodeId=='ROADMA'):
271 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:node-type'],'ROADM')
272 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'],'2')
274 self.assertFalse(True)
276 def test_09_getNodes_OpenRoadmTopology(self):
277 url = ("{}/config/ietf-network:network/openroadm-topology"
278 .format(self.restconf_baseurl))
279 headers = {'content-type': 'application/json'}
280 response = requests.request(
281 "GET", url, headers=headers, auth=('admin', 'admin'))
282 res = response.json()
283 #Tests related to nodes
284 self.assertEqual(response.status_code, requests.codes.ok)
285 with open('./transportpce_tests/log/response.log', 'a') as outfile1:
286 outfile1.write(str(len(res['network'][0]['node'])))
287 nbNode=len(res['network'][0]['node'])
288 self.assertEqual(nbNode,4)
289 listNode=['XPDRA-XPDR1','ROADMA-SRG1','ROADMA-DEG1','ROADMA-DEG2']
290 for i in range(0,nbNode):
291 nodeType=res['network'][0]['node'][i]['org-openroadm-network-topology:node-type']
292 nodeId=res['network'][0]['node'][i]['node-id']
293 #Tests related to XPDRA nodes
294 if(nodeId=='XPDRA-XPDR1'):
295 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA'},
296 res['network'][0]['node'][i]['supporting-node'])
297 self.assertEqual(nodeType,'XPONDER')
298 nbTps=len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
299 self.assertTrue(nbTps >= 2)
302 for j in range(0,nbTps):
303 tpType=res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['org-openroadm-network-topology:tp-type']
304 if (tpType=='XPONDER-CLIENT'):
306 elif (tpType=='XPONDER-NETWORK'):
308 self.assertTrue(client > 0)
309 self.assertTrue(network > 0)
310 listNode.remove(nodeId)
311 elif(nodeId=='ROADMA-SRG1'):
312 #Test related to SRG1
313 self.assertEqual(nodeType,'SRG')
314 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),17)
315 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-CP'},
316 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
317 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-PP'},
318 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
319 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
320 res['network'][0]['node'][i]['supporting-node'])
321 listNode.remove(nodeId)
322 elif(nodeId=='ROADMA-DEG1'):
323 #Test related to DEG1
324 self.assertEqual(nodeType,'DEGREE')
325 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
326 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
327 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
328 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
329 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
330 res['network'][0]['node'][i]['supporting-node'])
331 listNode.remove(nodeId)
332 elif(nodeId=='ROADMA-DEG2'):
333 #Test related to DEG2
334 self.assertEqual(nodeType,'DEGREE')
335 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
336 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
337 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
338 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
339 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
340 res['network'][0]['node'][i]['supporting-node'])
341 listNode.remove(nodeId)
343 self.assertFalse(True)
344 self.assertEqual(len(listNode),0)
346 #Connect the tail XPDRA to ROADMA and vice versa
347 def test_10_connect_tail_xpdr_rdm(self):
348 #Connect the tail: XPDRA to ROADMA
349 url = ("{}/operations/transportpce-networkutils:init-xpdr-rdm-links"
350 .format(self.restconf_baseurl))
351 data = {"networkutils:input": {
352 "networkutils:links-input": {
353 "networkutils:xpdr-node": "XPDRA",
354 "networkutils:xpdr-num": "1",
355 "networkutils:network-num": "1",
356 "networkutils:rdm-node": "ROADMA",
357 "networkutils:srg-num": "1",
358 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
362 headers = {'content-type': 'application/json'}
363 response = requests.request(
364 "POST", url, data=json.dumps(data), headers=headers,
365 auth=('admin', 'admin'))
366 self.assertEqual(response.status_code, requests.codes.ok)
369 def test_11_connect_tail_rdm_xpdr(self):
370 #Connect the tail: ROADMA to XPDRA
371 url = ("{}/operations/transportpce-networkutils:init-rdm-xpdr-links"
372 .format(self.restconf_baseurl))
373 data = {"networkutils:input": {
374 "networkutils:links-input": {
375 "networkutils:xpdr-node": "XPDRA",
376 "networkutils:xpdr-num": "1",
377 "networkutils:network-num": "1",
378 "networkutils:rdm-node": "ROADMA",
379 "networkutils:srg-num": "1",
380 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
384 headers = {'content-type': 'application/json'}
385 response = requests.request(
386 "POST", url, data=json.dumps(data), headers=headers,
387 auth=('admin', 'admin'))
388 self.assertEqual(response.status_code, requests.codes.ok)
391 def test_12_getLinks_OpenRoadmTopology(self):
392 url = ("{}/config/ietf-network:network/openroadm-topology"
393 .format(self.restconf_baseurl))
394 headers = {'content-type': 'application/json'}
395 response = requests.request(
396 "GET", url, headers=headers, auth=('admin', 'admin'))
397 self.assertEqual(response.status_code, requests.codes.ok)
398 res = response.json()
399 #Tests related to links
400 nbLink=len(res['network'][0]['ietf-network-topology:link'])
401 self.assertEqual(nbLink,8)
402 expressLink=['ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX','ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX']
403 addLink=['ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX','ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX',]
404 dropLink=['ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX','ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX']
405 XPDR_IN=['ROADMA-SRG1-SRG1-PP1-TXRXtoXPDRA-XPDR1-XPDR1-NETWORK1']
406 XPDR_OUT=['XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX']
407 for i in range(0,nbLink):
408 nodeType=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']
409 linkId=res['network'][0]['ietf-network-topology:link'][i]['link-id']
410 if(nodeType=='EXPRESS-LINK'):
411 find= linkId in expressLink
412 self.assertEqual(find, True)
413 expressLink.remove(linkId)
414 elif(nodeType=='ADD-LINK'):
415 find= linkId in addLink
416 self.assertEqual(find, True)
417 addLink.remove(linkId)
418 elif(nodeType=='DROP-LINK'):
419 find= linkId in dropLink
420 self.assertEqual(find, True)
421 dropLink.remove(linkId)
422 elif(nodeType=='XPONDER-INPUT'):
423 find= linkId in XPDR_IN
424 self.assertEqual(find, True)
425 XPDR_IN.remove(linkId)
426 elif(nodeType=='XPONDER-OUTPUT'):
427 find= linkId in XPDR_OUT
428 self.assertEqual(find, True)
429 XPDR_OUT.remove(linkId)
431 self.assertFalse(True)
432 self.assertEqual(len(expressLink),0)
433 self.assertEqual(len(addLink),0)
434 self.assertEqual(len(dropLink),0)
435 self.assertEqual(len(XPDR_IN),0)
436 self.assertEqual(len(XPDR_OUT),0)
438 def test_13_connect_ROADMC(self):
440 url = ("{}/config/network-topology:"
441 "network-topology/topology/topology-netconf/node/ROADMC"
442 .format(self.restconf_baseurl))
445 "netconf-node-topology:username": "admin",
446 "netconf-node-topology:password": "admin",
447 "netconf-node-topology:host": "127.0.0.1",
448 "netconf-node-topology:port": "17833",
449 "netconf-node-topology:tcp-only": "false",
450 "netconf-node-topology:pass-through": {}}]}
451 headers = {'content-type': 'application/json'}
452 response = requests.request(
453 "PUT", url, data=json.dumps(data), headers=headers,
454 auth=('admin', 'admin'))
455 self.assertEqual(response.status_code, requests.codes.created)
458 def test_14_getClliNetwork(self):
459 url = ("{}/config/ietf-network:network/clli-network"
460 .format(self.restconf_baseurl))
461 headers = {'content-type': 'application/json'}
462 response = requests.request(
463 "GET", url, headers=headers, auth=('admin', 'admin'))
464 self.assertEqual(response.status_code, requests.codes.ok)
465 res = response.json()
466 nbNode=len(res['network'][0]['node'])
467 listNode=['NodeA','NodeC']
468 for i in range(0,nbNode):
469 nodeId = res['network'][0]['node'][i]['node-id']
470 find= nodeId in listNode
471 self.assertEqual(find, True)
473 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'],'NodeA')
475 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'],'NodeC')
476 listNode.remove(nodeId)
478 self.assertEqual(len(listNode),0)
480 def test_15_getOpenRoadmNetwork(self):
481 url = ("{}/config/ietf-network:network/openroadm-network"
482 .format(self.restconf_baseurl))
483 headers = {'content-type': 'application/json'}
484 response = requests.request(
485 "GET", url, headers=headers, auth=('admin', 'admin'))
486 self.assertEqual(response.status_code, requests.codes.ok)
487 res = response.json()
488 nbNode=len(res['network'][0]['node'])
489 self.assertEqual(nbNode,3)
490 listNode=['XPDRA','ROADMA','ROADMC']
491 for i in range(0,nbNode):
492 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['network-ref'],'clli-network')
493 nodeId=res['network'][0]['node'][i]['node-id']
495 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'],'NodeA')
496 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:node-type'],'XPONDER')
497 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'],'1')
498 listNode.remove(nodeId)
499 elif(nodeId=='ROADMA'):
500 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'],'NodeA')
501 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:node-type'],'ROADM')
502 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'],'2')
503 listNode.remove(nodeId)
504 elif(nodeId=='ROADMC'):
505 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'],'NodeC')
506 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:node-type'],'ROADM')
507 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'],'2')
508 listNode.remove(nodeId)
510 self.assertFalse(True)
511 self.assertEqual(len(listNode),0)
513 def test_16_getROADMLinkOpenRoadmTopology(self):
514 url = ("{}/config/ietf-network:network/openroadm-topology"
515 .format(self.restconf_baseurl))
516 headers = {'content-type': 'application/json'}
517 response = requests.request(
518 "GET", url, headers=headers, auth=('admin', 'admin'))
519 self.assertEqual(response.status_code, requests.codes.ok)
520 res = response.json()
521 #Tests related to links
522 nbLink=len(res['network'][0]['ietf-network-topology:link'])
523 self.assertEqual(nbLink,16)
524 expressLink=['ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX','ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX',
525 'ROADMC-DEG2-DEG2-CTP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX','ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-DEG2-DEG2-CTP-TXRX']
526 addLink=['ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX','ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX',
527 'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG2-DEG2-CTP-TXRX','ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX']
528 dropLink=['ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX','ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX',
529 'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX','ROADMC-DEG2-DEG2-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX']
530 R2RLink=['ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX','ROADMC-DEG2-DEG2-TTP-TXRXtoROADMA-DEG1-DEG1-TTP-TXRX']
531 XPDR_IN=['ROADMA-SRG1-SRG1-PP1-TXRXtoXPDRA-XPDR1-XPDR1-NETWORK1']
532 XPDR_OUT=['XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX']
533 for i in range(0,nbLink):
534 nodeType=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']
535 linkId=res['network'][0]['ietf-network-topology:link'][i]['link-id']
536 if(nodeType=='EXPRESS-LINK'):
537 find= linkId in expressLink
538 self.assertEqual(find, True)
539 expressLink.remove(linkId)
540 elif(nodeType=='ADD-LINK'):
541 find= linkId in addLink
542 self.assertEqual(find, True)
543 addLink.remove(linkId)
544 elif(nodeType=='DROP-LINK'):
545 find= linkId in dropLink
546 self.assertEqual(find, True)
547 dropLink.remove(linkId)
548 elif(nodeType=='ROADM-TO-ROADM'):
549 find= linkId in R2RLink
550 self.assertEqual(find, True)
551 R2RLink.remove(linkId)
552 elif(nodeType=='XPONDER-INPUT'):
553 find= linkId in XPDR_IN
554 self.assertEqual(find, True)
555 XPDR_IN.remove(linkId)
556 elif(nodeType=='XPONDER-OUTPUT'):
557 find= linkId in XPDR_OUT
558 self.assertEqual(find, True)
559 XPDR_OUT.remove(linkId)
561 self.assertFalse(True)
562 self.assertEqual(len(expressLink),0)
563 self.assertEqual(len(addLink),0)
564 self.assertEqual(len(dropLink),0)
565 self.assertEqual(len(R2RLink),0)
566 self.assertEqual(len(XPDR_IN),0)
567 self.assertEqual(len(XPDR_OUT),0)
569 def test_17_getNodes_OpenRoadmTopology(self):
570 url = ("{}/config/ietf-network:network/openroadm-topology"
571 .format(self.restconf_baseurl))
572 headers = {'content-type': 'application/json'}
573 response = requests.request(
574 "GET", url, headers=headers, auth=('admin', 'admin'))
575 res = response.json()
576 #Tests related to nodes
577 self.assertEqual(response.status_code, requests.codes.ok)
578 nbNode=len(res['network'][0]['node'])
579 self.assertEqual(nbNode,7)
580 listNode=['XPDRA-XPDR1',
581 'ROADMA-SRG1','ROADMA-DEG1','ROADMA-DEG2',
582 'ROADMC-SRG1','ROADMC-DEG1','ROADMC-DEG2']
583 #************************Tests related to XPDRA nodes
584 for i in range(0,nbNode):
585 nodeType=res['network'][0]['node'][i]['org-openroadm-network-topology:node-type']
586 nodeId=res['network'][0]['node'][i]['node-id']
587 if(nodeId=='XPDRA-XPDR1'):
588 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA'},
589 res['network'][0]['node'][i]['supporting-node'])
590 self.assertEqual(nodeType,'XPONDER')
591 nbTps=len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
592 self.assertTrue(nbTps >= 2)
595 for j in range(0,nbTps):
596 tpType=res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['org-openroadm-network-topology:tp-type']
597 if (tpType=='XPONDER-CLIENT'):
599 elif (tpType=='XPONDER-NETWORK'):
601 self.assertTrue(client > 0)
602 self.assertTrue(network > 0)
603 listNode.remove(nodeId)
604 elif(nodeId=='ROADMA-SRG1'):
605 #Test related to SRG1
606 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),17)
607 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-CP'},
608 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
609 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-PP'},
610 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
611 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
612 res['network'][0]['node'][i]['supporting-node'])
613 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network-topology:node-type'],'SRG')
614 listNode.remove(nodeId)
615 elif(nodeId=='ROADMA-DEG1'):
616 #Test related to DEG1
617 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
618 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
619 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
620 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
621 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
622 res['network'][0]['node'][i]['supporting-node'])
623 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network-topology:node-type'],'DEGREE')
624 listNode.remove(nodeId)
625 elif(nodeId=='ROADMA-DEG2'):
626 #Test related to DEG2
627 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
628 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
629 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
630 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
631 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
632 res['network'][0]['node'][i]['supporting-node'])
633 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network-topology:node-type'],'DEGREE')
634 listNode.remove(nodeId)
635 elif(nodeId=='ROADMC-SRG1'):
636 #Test related to SRG1
637 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),17)
638 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-CP'},
639 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
640 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-PP'},
641 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
642 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMC'},
643 res['network'][0]['node'][i]['supporting-node'])
644 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network-topology:node-type'],'SRG')
645 listNode.remove(nodeId)
646 elif(nodeId=='ROADMC-DEG1'):
647 #Test related to DEG1
648 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
649 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
650 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
651 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
652 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMC'},
653 res['network'][0]['node'][i]['supporting-node'])
654 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network-topology:node-type'],'DEGREE')
655 listNode.remove(nodeId)
656 elif(nodeId=='ROADMC-DEG2'):
657 #Test related to DEG2
658 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
659 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
660 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
661 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
662 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMC'},
663 res['network'][0]['node'][i]['supporting-node'])
664 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network-topology:node-type'],'DEGREE')
665 listNode.remove(nodeId)
667 self.assertFalse(True)
668 self.assertEqual(len(listNode),0)
670 def test_18_connect_ROADMB(self):
671 url = ("{}/config/network-topology:"
672 "network-topology/topology/topology-netconf/node/ROADMB"
673 .format(self.restconf_baseurl))
676 "netconf-node-topology:username": "admin",
677 "netconf-node-topology:password": "admin",
678 "netconf-node-topology:host": "127.0.0.1",
679 "netconf-node-topology:port": "17832",
680 "netconf-node-topology:tcp-only": "false",
681 "netconf-node-topology:pass-through": {}}]}
682 headers = {'content-type': 'application/json'}
683 response = requests.request(
684 "PUT", url, data=json.dumps(data), headers=headers,
685 auth=('admin', 'admin'))
686 self.assertEqual(response.status_code, requests.codes.created)
689 def test_19_getClliNetwork(self):
690 url = ("{}/config/ietf-network:network/clli-network"
691 .format(self.restconf_baseurl))
692 headers = {'content-type': 'application/json'}
693 response = requests.request(
694 "GET", url, headers=headers, auth=('admin', 'admin'))
695 self.assertEqual(response.status_code, requests.codes.ok)
696 res = response.json()
697 nbNode=len(res['network'][0]['node'])
698 listNode=['NodeA','NodeB','NodeC']
699 for i in range(0,nbNode):
700 nodeId = res['network'][0]['node'][i]['node-id']
701 find= nodeId in listNode
702 self.assertEqual(find, True)
704 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'],'NodeA')
705 elif(nodeId=='NodeB'):
706 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'],'NodeB')
708 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'],'NodeC')
709 listNode.remove(nodeId)
711 self.assertEqual(len(listNode),0)
713 def test_20_verifyDegree(self):
714 url = ("{}/config/ietf-network:network/openroadm-topology"
715 .format(self.restconf_baseurl))
716 headers = {'content-type': 'application/json'}
717 response = requests.request(
718 "GET", url, headers=headers, auth=('admin', 'admin'))
719 self.assertEqual(response.status_code, requests.codes.ok)
720 res = response.json()
721 #Tests related to links
722 nbLink=len(res['network'][0]['ietf-network-topology:link'])
723 listR2RLink=['ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX','ROADMC-DEG2-DEG2-TTP-TXRXtoROADMA-DEG1-DEG1-TTP-TXRX',
724 'ROADMA-DEG2-DEG2-TTP-TXRXtoROADMB-DEG1-DEG1-TTP-TXRX','ROADMC-DEG1-DEG1-TTP-TXRXtoROADMB-DEG2-DEG2-TTP-TXRX',
725 'ROADMB-DEG1-DEG1-TTP-TXRXtoROADMA-DEG2-DEG2-TTP-TXRX','ROADMB-DEG2-DEG2-TTP-TXRXtoROADMC-DEG1-DEG1-TTP-TXRX']
726 for i in range(0,nbLink):
727 if res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type'] == 'ROADM-TO-ROADM':
728 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
729 find= link_id in listR2RLink
730 self.assertEqual(find, True)
731 listR2RLink.remove(link_id)
732 self.assertEqual(len(listR2RLink),0)
734 def test_21_verifyOppositeLinkTopology(self):
735 url = ("{}/config/ietf-network:network/openroadm-topology"
736 .format(self.restconf_baseurl))
737 headers = {'content-type': 'application/json'}
738 response = requests.request(
739 "GET", url, headers=headers, auth=('admin', 'admin'))
740 self.assertEqual(response.status_code, requests.codes.ok)
741 res = response.json()
742 #Write the response in the log
743 with open('./transportpce_tests/log/response.log', 'a') as outfile1:
744 outfile1.write(str(res))
745 #Tests related to links
746 nbLink=len(res['network'][0]['ietf-network-topology:link'])
747 self.assertEqual(nbLink,26)
748 for i in range(0,nbLink):
749 link_id=res['network'][0]['ietf-network-topology:link'][i]['link-id']
750 link_type=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']
751 link_src=res['network'][0]['ietf-network-topology:link'][i]['source']['source-node']
752 link_dest=res['network'][0]['ietf-network-topology:link'][i]['destination']['dest-node']
753 oppLink_id=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-opposite-links:opposite-link']
754 #Find the opposite link
755 url_oppLink="{}/config/ietf-network:network/openroadm-topology/ietf-network-topology:link/"+oppLink_id
756 url = (url_oppLink.format(self.restconf_baseurl))
757 headers = {'content-type': 'application/json'}
758 response_oppLink = requests.request("GET", url, headers=headers, auth=('admin', 'admin'))
759 self.assertEqual(response_oppLink.status_code, requests.codes.ok)
760 res_oppLink = response_oppLink.json()
761 self.assertEqual(res_oppLink['ietf-network-topology:link'][0]['org-openroadm-opposite-links:opposite-link'],link_id)
762 self.assertEqual(res_oppLink['ietf-network-topology:link'][0]['source']['source-node'],link_dest)
763 self.assertEqual(res_oppLink['ietf-network-topology:link'][0]['destination']['dest-node'],link_src)
764 oppLink_type=res_oppLink['ietf-network-topology:link'][0]['org-openroadm-network-topology:link-type']
765 if link_type=='ADD-LINK':
766 self.assertEqual(oppLink_type, 'DROP-LINK')
767 elif link_type=='DROP-LINK':
768 self.assertEqual(oppLink_type, 'ADD-LINK')
769 elif link_type=='EXPRESS-LINK':
770 self.assertEqual(oppLink_type, 'EXPRESS-LINK')
771 elif link_type=='ROADM-TO-ROADM':
772 self.assertEqual(oppLink_type, 'ROADM-TO-ROADM')
773 elif link_type=='XPONDER-INPUT':
774 self.assertEqual(oppLink_type, 'XPONDER-OUTPUT')
775 elif link_type=='XPONDER-OUTPUT':
776 self.assertEqual(oppLink_type, 'XPONDER-INPUT')
779 def test_22_disconnect_ROADMB(self):
780 #Delete in the topology-netconf
781 url = ("{}/config/network-topology:"
782 "network-topology/topology/topology-netconf/node/ROADMB"
783 .format(self.restconf_baseurl))
785 headers = {'content-type': 'application/json'}
786 response = requests.request(
787 "DELETE", url, data=json.dumps(data), headers=headers,
788 auth=('admin', 'admin'))
789 self.assertEqual(response.status_code, requests.codes.ok)
790 #Delete in the clli-network
791 url = ("{}/config/ietf-network:network/clli-network/node/NodeB"
792 .format(self.restconf_baseurl))
794 headers = {'content-type': 'application/json'}
795 response = requests.request(
796 "DELETE", url, data=json.dumps(data), headers=headers,
797 auth=('admin', 'admin'))
798 self.assertEqual(response.status_code, requests.codes.ok)
799 #Delete in the openroadm-network
800 url = ("{}/config/ietf-network:network/openroadm-network/node/ROADMB"
801 .format(self.restconf_baseurl))
803 headers = {'content-type': 'application/json'}
804 response = requests.request(
805 "DELETE", url, data=json.dumps(data), headers=headers,
806 auth=('admin', 'admin'))
807 self.assertEqual(response.status_code, requests.codes.ok)
809 def test_23_disconnect_ROADMC(self):
810 #Delete in the topology-netconf
811 url = ("{}/config/network-topology:"
812 "network-topology/topology/topology-netconf/node/ROADMC"
813 .format(self.restconf_baseurl))
815 headers = {'content-type': 'application/json'}
816 response = requests.request(
817 "DELETE", url, data=json.dumps(data), headers=headers,
818 auth=('admin', 'admin'))
819 self.assertEqual(response.status_code, requests.codes.ok)
820 #Delete in the clli-network
821 url = ("{}/config/ietf-network:network/clli-network/node/NodeC"
822 .format(self.restconf_baseurl))
824 headers = {'content-type': 'application/json'}
825 response = requests.request(
826 "DELETE", url, data=json.dumps(data), headers=headers,
827 auth=('admin', 'admin'))
828 self.assertEqual(response.status_code, requests.codes.ok)
829 #Delete in the openroadm-network
830 url = ("{}/config/ietf-network:network/openroadm-network/node/ROADMC"
831 .format(self.restconf_baseurl))
833 headers = {'content-type': 'application/json'}
834 response = requests.request(
835 "DELETE", url, data=json.dumps(data), headers=headers,
836 auth=('admin', 'admin'))
837 self.assertEqual(response.status_code, requests.codes.ok)
839 # def test_24_getLinks_OpenRoadmTopology(self):
840 # url = ("{}/config/ietf-network:network/openroadm-topology"
841 # .format(self.restconf_baseurl))
842 # headers = {'content-type': 'application/json'}
843 # response = requests.request(
844 # "GET", url, headers=headers, auth=('admin', 'admin'))
845 # self.assertEqual(response.status_code, requests.codes.ok)
846 # res = response.json()
847 # #Write the response in the log
848 # with open('./transportpce_tests/log/response.log', 'a') as outfile1:
849 # outfile1.write(str(res))
850 # #Tests related to links
851 # nbLink=len(res['network'][0]['ietf-network-topology:link'])
852 # self.assertEqual(nbLink,8)
853 # expressLink=['ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX','ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX']
854 # addLink=['ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX','ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX',]
855 # dropLink=['ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX','ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX']
856 # XPDR_IN=['ROADMA-SRG1-SRG1-PP1-TXRXtoXPDRA-XPDR1-XPDR1-NETWORK1']
857 # XPDR_OUT=['XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX']
858 # for i in range(0,nbLink):
859 # nodeType=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']
860 # linkId=res['network'][0]['ietf-network-topology:link'][i]['link-id']
861 # if(nodeType=='EXPRESS-LINK'):
862 # find= linkId in expressLink
863 # self.assertEqual(find, True)
864 # expressLink.remove(linkId)
865 # elif(nodeType=='ADD-LINK'):
866 # find= linkId in addLink
867 # self.assertEqual(find, True)
868 # addLink.remove(linkId)
869 # elif(nodeType=='DROP-LINK'):
870 # find= linkId in dropLink
871 # self.assertEqual(find, True)
872 # dropLink.remove(linkId)
873 # elif(nodeType=='XPONDER-INPUT'):
874 # find= linkId in XPDR_IN
875 # self.assertEqual(find, True)
876 # XPDR_IN.remove(linkId)
877 # elif(nodeType=='XPONDER-OUTPUT'):
878 # find= linkId in XPDR_OUT
879 # self.assertEqual(find, True)
880 # XPDR_OUT.remove(linkId)
882 # self.assertFalse(True)
883 # self.assertEqual(len(expressLink),0)
884 # self.assertEqual(len(addLink),0)
885 # self.assertEqual(len(dropLink),0)
886 # self.assertEqual(len(XPDR_IN),0)
887 # self.assertEqual(len(XPDR_OUT),0)
889 # for i in range(0,nbLink):
890 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type'],'ROADM-TO-ROADM')
891 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX')
892 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX')
893 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG2-DEG1-CTP-TXRX')
894 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG2-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX')
895 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-DEG2-DEG2-CTP-TXRX')
896 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG2-DEG2-CTP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX')
898 # def test_25_getNodes_OpenRoadmTopology(self):
899 # url = ("{}/config/ietf-network:network/openroadm-topology"
900 # .format(self.restconf_baseurl))
901 # headers = {'content-type': 'application/json'}
902 # response = requests.request(
903 # "GET", url, headers=headers, auth=('admin', 'admin'))
904 # res = response.json()
905 # #Tests related to nodes
906 # self.assertEqual(response.status_code, requests.codes.ok)
907 # with open('./transportpce_tests/log/response.log', 'a') as outfile1:
908 # outfile1.write(str(len(res['network'][0]['node'])))
909 # nbNode=len(res['network'][0]['node'])
910 # self.assertEqual(nbNode,4)
911 # listNode=['XPDRA-XPDR1','ROADMA-SRG1','ROADMA-DEG1','ROADMA-DEG2']
912 # for i in range(0,nbNode):
913 # nodeType=res['network'][0]['node'][i]['org-openroadm-network-topology:node-type']
914 # nodeId=res['network'][0]['node'][i]['node-id']
915 # #Tests related to XPDRA nodes
916 # if(nodeId=='XPDRA-XPDR1'):
917 # self.assertEqual(nodeType,'XPONDER')
918 # self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),2)
919 # self.assertEqual({'tp-id': 'XPDR1-CLIENT1', 'org-openroadm-network-topology:tp-type': 'XPONDER-CLIENT',
920 # 'org-openroadm-network-topology:xpdr-network-attributes': {
921 # 'tail-equipment-id': 'XPDR1-NETWORK1'}},
922 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'][0])
923 # self.assertEqual({'tp-id': 'XPDR1-NETWORK1', 'org-openroadm-network-topology:tp-type': 'XPONDER-NETWORK',
924 # 'org-openroadm-network-topology:xpdr-client-attributes': {'tail-equipment-id': 'XPDR1-CLIENT1'}},
925 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'][1])
926 # self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA'},
927 # res['network'][0]['node'][i]['supporting-node'])
928 # listNode.remove(nodeId)
929 # elif(nodeId=='ROADMA-SRG1'):
930 # #Test related to SRG1
931 # self.assertEqual(nodeType,'SRG')
932 # self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),17)
933 # self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-CP'},
934 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
935 # self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-PP'},
936 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
937 # self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
938 # res['network'][0]['node'][i]['supporting-node'])
939 # listNode.remove(nodeId)
940 # elif(nodeId=='ROADMA-DEG1'):
941 # #Test related to DEG1
942 # self.assertEqual(nodeType,'DEGREE')
943 # self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
944 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
945 # self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
946 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
947 # self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
948 # res['network'][0]['node'][i]['supporting-node'])
949 # listNode.remove(nodeId)
950 # elif(nodeId=='ROADMA-DEG2'):
951 # #Test related to DEG2
952 # self.assertEqual(nodeType,'DEGREE')
953 # self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
954 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
955 # self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
956 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
957 # self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
958 # res['network'][0]['node'][i]['supporting-node'])
959 # listNode.remove(nodeId)
961 # self.assertFalse(True)
962 # self.assertEqual(len(listNode),0)
963 # #Test related to SRG1 of ROADMC
964 # for i in range(0,nbNode):
965 # self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'ROADMC-SRG1')
966 # self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'ROADMC-DEG1')
967 # self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'ROADMC-DEG2')
969 def test_26_getOpenRoadmNetwork(self):
970 url = ("{}/config/ietf-network:network/openroadm-network"
971 .format(self.restconf_baseurl))
972 headers = {'content-type': 'application/json'}
973 response = requests.request(
974 "GET", url, headers=headers, auth=('admin', 'admin'))
975 self.assertEqual(response.status_code, requests.codes.ok)
976 res = response.json()
977 nbNode=len(res['network'][0]['node'])
978 self.assertEqual(nbNode,2)
979 for i in range(0,nbNode-1):
980 self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'ROADMC')
982 def test_27_getClliNetwork(self):
983 url = ("{}/config/ietf-network:network/clli-network"
984 .format(self.restconf_baseurl))
985 headers = {'content-type': 'application/json'}
986 response = requests.request(
987 "GET", url, headers=headers, auth=('admin', 'admin'))
988 self.assertEqual(response.status_code, requests.codes.ok)
989 res = response.json()
990 nbNode=len(res['network'][0]['node'])
991 self.assertEqual(nbNode,1)
992 for i in range(0,nbNode-1):
993 self.assertNotEqual(res['network'][0]['node'][1]['org-openroadm-clli-network:clli'],'NodeC')
995 def test_28_disconnect_XPDRA(self):
996 url = ("{}/config/network-topology:"
997 "network-topology/topology/topology-netconf/node/XPDRA"
998 .format(self.restconf_baseurl))
1000 headers = {'content-type': 'application/json'}
1001 response = requests.request(
1002 "DELETE", url, data=json.dumps(data), headers=headers,
1003 auth=('admin', 'admin'))
1004 self.assertEqual(response.status_code, requests.codes.ok)
1005 #Delete in the openroadm-network
1006 url = ("{}/config/ietf-network:network/openroadm-network/node/XPDRA"
1007 .format(self.restconf_baseurl))
1009 headers = {'content-type': 'application/json'}
1010 response = requests.request(
1011 "DELETE", url, data=json.dumps(data), headers=headers,
1012 auth=('admin', 'admin'))
1013 self.assertEqual(response.status_code, requests.codes.ok)
1015 def test_29_getClliNetwork(self):
1016 url = ("{}/config/ietf-network:network/clli-network"
1017 .format(self.restconf_baseurl))
1018 headers = {'content-type': 'application/json'}
1019 response = requests.request(
1020 "GET", url, headers=headers, auth=('admin', 'admin'))
1021 self.assertEqual(response.status_code, requests.codes.ok)
1022 res = response.json()
1023 nbNode=len(res['network'][0]['node'])
1024 self.assertEqual(nbNode,1)
1025 self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'],'NodeA')
1027 def test_30_getOpenRoadmNetwork(self):
1028 url = ("{}/config/ietf-network:network/openroadm-network"
1029 .format(self.restconf_baseurl))
1030 headers = {'content-type': 'application/json'}
1031 response = requests.request(
1032 "GET", url, headers=headers, auth=('admin', 'admin'))
1033 self.assertEqual(response.status_code, requests.codes.ok)
1034 res = response.json()
1035 nbNode=len(res['network'][0]['node'])
1036 self.assertEqual(nbNode,1)
1037 for i in range(0,nbNode):
1038 self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'XPDRA')
1040 def test_31_getNodes_OpenRoadmTopology(self):
1041 url = ("{}/config/ietf-network:network/openroadm-topology"
1042 .format(self.restconf_baseurl))
1043 headers = {'content-type': 'application/json'}
1044 response = requests.request(
1045 "GET", url, headers=headers, auth=('admin', 'admin'))
1046 res = response.json()
1047 #Tests related to nodes
1048 self.assertEqual(response.status_code, requests.codes.ok)
1049 nbNode=len(res['network'][0]['node'])
1050 self.assertEqual(nbNode,3)
1051 listNode=['ROADMA-SRG1','ROADMA-DEG1','ROADMA-DEG2']
1052 for i in range(0,nbNode):
1053 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
1054 res['network'][0]['node'][i]['supporting-node'])
1055 nodeType=res['network'][0]['node'][i]['org-openroadm-network-topology:node-type']
1056 nodeId=res['network'][0]['node'][i]['node-id']
1057 if(nodeId=='ROADMA-SRG1'):
1058 #Test related to SRG1
1059 self.assertEqual(nodeType,'SRG')
1060 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),17)
1061 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-CP'},
1062 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1063 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-PP'},
1064 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1065 listNode.remove(nodeId)
1066 elif(nodeId=='ROADMA-DEG1'):
1067 #Test related to DEG1
1068 self.assertEqual(nodeType,'DEGREE')
1069 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
1070 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1071 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
1072 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1073 listNode.remove(nodeId)
1074 elif(nodeId=='ROADMA-DEG2'):
1075 #Test related to DEG2
1076 self.assertEqual(nodeType,'DEGREE')
1077 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
1078 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1079 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
1080 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1081 listNode.remove(nodeId)
1083 self.assertFalse(True)
1084 self.assertEqual(len(listNode),0)
1086 def test_32_disconnect_ROADM_XPDRA_link(self):
1088 url = ("{}/config/ietf-network:network/openroadm-topology/ietf-network-topology:"
1089 "link/XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX"
1090 .format(self.restconf_baseurl))
1092 headers = {'content-type': 'application/json'}
1093 response = requests.request(
1094 "DELETE", url, data=json.dumps(data), headers=headers,
1095 auth=('admin', 'admin'))
1096 self.assertEqual(response.status_code, requests.codes.ok)
1098 url = ("{}/config/ietf-network:network/openroadm-topology/ietf-network-topology:"
1099 "link/ROADMA-SRG1-SRG1-PP1-TXRXtoXPDRA-XPDR1-XPDR1-NETWORK1"
1100 .format(self.restconf_baseurl))
1102 headers = {'content-type': 'application/json'}
1103 response = requests.request(
1104 "DELETE", url, data=json.dumps(data), headers=headers,
1105 auth=('admin', 'admin'))
1106 self.assertEqual(response.status_code, requests.codes.ok)
1108 # def test_33_getLinks_OpenRoadmTopology(self):
1109 # url = ("{}/config/ietf-network:network/openroadm-topology"
1110 # .format(self.restconf_baseurl))
1111 # headers = {'content-type': 'application/json'}
1112 # response = requests.request(
1113 # "GET", url, headers=headers, auth=('admin', 'admin'))
1114 # self.assertEqual(response.status_code, requests.codes.ok)
1115 # res = response.json()
1116 # nbLink=len(res['network'][0]['ietf-network-topology:link'])
1117 # self.assertEqual(nbLink,6)
1118 # expressLink=['ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX','ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX']
1119 # addLink=['ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX','ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX']
1120 # dropLink=['ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX','ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX']
1121 # for i in range(0,nbLink):
1122 # if (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='EXPRESS-LINK'):
1123 # link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1124 # find= link_id in expressLink
1125 # self.assertEqual(find, True)
1126 # expressLink.remove(link_id)
1127 # elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='ADD-LINK'):
1128 # link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1129 # find= link_id in addLink
1130 # self.assertEqual(find, True)
1131 # addLink.remove(link_id)
1132 # elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='DROP-LINK'):
1133 # link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1134 # find= link_id in dropLink
1135 # self.assertEqual(find, True)
1136 # dropLink.remove(link_id)
1138 # self.assertFalse(True)
1139 # self.assertEqual(len(expressLink),0)
1140 # self.assertEqual(len(addLink),0)
1141 # self.assertEqual(len(dropLink),0)
1142 # for i in range(0,nbLink):
1143 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type'],'XPONDER-OUTPUT')
1144 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type'],'XPONDER-INPUT')
1146 def test_34_disconnect_ROADMA(self):
1147 url = ("{}/config/network-topology:"
1148 "network-topology/topology/topology-netconf/node/ROADMA"
1149 .format(self.restconf_baseurl))
1151 headers = {'content-type': 'application/json'}
1152 response = requests.request(
1153 "DELETE", url, data=json.dumps(data), headers=headers,
1154 auth=('admin', 'admin'))
1155 self.assertEqual(response.status_code, requests.codes.ok)
1156 #Delete in the clli-network
1157 url = ("{}/config/ietf-network:network/clli-network/node/NodeA"
1158 .format(self.restconf_baseurl))
1160 headers = {'content-type': 'application/json'}
1161 response = requests.request(
1162 "DELETE", url, data=json.dumps(data), headers=headers,
1163 auth=('admin', 'admin'))
1164 self.assertEqual(response.status_code, requests.codes.ok)
1165 #Delete in the openroadm-network
1166 url = ("{}/config/ietf-network:network/openroadm-network/node/ROADMA"
1167 .format(self.restconf_baseurl))
1169 headers = {'content-type': 'application/json'}
1170 response = requests.request(
1171 "DELETE", url, data=json.dumps(data), headers=headers,
1172 auth=('admin', 'admin'))
1173 self.assertEqual(response.status_code, requests.codes.ok)
1176 def test_35_getClliNetwork(self):
1177 url = ("{}/config/ietf-network:network/clli-network"
1178 .format(self.restconf_baseurl))
1179 headers = {'content-type': 'application/json'}
1180 response = requests.request(
1181 "GET", url, headers=headers, auth=('admin', 'admin'))
1182 self.assertEqual(response.status_code, requests.codes.ok)
1183 res = response.json()
1184 self.assertNotIn('node', res['network'][0])
1186 def test_36_getOpenRoadmNetwork(self):
1187 url = ("{}/config/ietf-network:network/openroadm-network"
1188 .format(self.restconf_baseurl))
1189 headers = {'content-type': 'application/json'}
1190 response = requests.request(
1191 "GET", url, headers=headers, auth=('admin', 'admin'))
1192 self.assertEqual(response.status_code, requests.codes.ok)
1193 res = response.json()
1194 self.assertNotIn('node', res['network'][0])
1196 # def test_37_getOpenRoadmTopology(self):
1197 # url = ("{}/config/ietf-network:network/openroadm-topology"
1198 # .format(self.restconf_baseurl))
1199 # headers = {'content-type': 'application/json'}
1200 # response = requests.request(
1201 # "GET", url, headers=headers, auth=('admin', 'admin'))
1202 # self.assertEqual(response.status_code, requests.codes.ok)
1203 # res = response.json()
1204 # self.assertNotIn('node', res['network'][0])
1205 # self.assertNotIn('ietf-network-topology:link', res['network'][0])
1207 if __name__ == "__main__":
1208 #logging.basicConfig(filename='./transportpce_tests/log/response.log',filemode='w',level=logging.DEBUG)
1209 #logging.debug('I am there')
1210 unittest.main(verbosity=2)