3 ##############################################################################
4 #Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
23 class TransportPCEtesting(unittest.TestCase):
25 testtools_process1 = None
26 testtools_process2 = None
27 testtools_process3 = None
28 testtools_process4 = None
30 restconf_baseurl = "http://localhost:8181/restconf"
def __start_testtools(cls):
    """Start the four netconf-testtool device simulators.

    One simulator is launched per sample device configuration (ROADMA,
    ROADMB, ROADMC, XPDRA); each writes its stdout to its own log file
    under ``transportpce_tests/log`` and the resulting ``Popen`` handle
    is stored on ``cls.testtools_process1`` .. ``cls.testtools_process4``.
    A stale ``response.log`` left by a previous run is removed first.

    Nothing is started (and the attributes are left untouched) when the
    testtool jar is not present.
    """
    # NOTE(review): this function receives ``cls`` and is presumably
    # wrapped by @classmethod; the decorator line is not part of the
    # reviewed excerpt, so it is intentionally not repeated here.
    # NOTE(review): the ``stdout=`` argument of each Popen call was not
    # visible in the excerpt; it is restored from the otherwise unused
    # ``with open(...) as outfileN`` handles - confirm.
    executable = ("./netconf/netconf/tools/netconf-testtool/target/"
                  "netconf-testtool-1.5.0-SNAPSHOT-executable.jar")
    if not os.path.isfile(executable):
        return
    if not os.path.exists("transportpce_tests/log"):
        os.makedirs("transportpce_tests/log")
    if os.path.isfile("./transportpce_tests/log/response.log"):
        os.remove("transportpce_tests/log/response.log")
    # (class attribute receiving the handle, device name, first port)
    simulators = (
        ("testtools_process1", "ROADMA", "17831"),
        ("testtools_process2", "ROADMB", "17832"),
        ("testtools_process3", "ROADMC", "17833"),
        ("testtools_process4", "XPDRA", "17830"),
    )
    for attribute, device, port in simulators:
        log_path = "transportpce_tests/log/testtools_{}.log".format(device)
        config = ("sample_configs/nodes_config/sample-config-{}.xml"
                  .format(device))
        with open(log_path, 'w') as outfile:
            process = subprocess.Popen(
                ["java", "-jar", executable, "--schemas-dir", "schemas",
                 "--initial-config-xml", config,
                 "--starting-port", port],
                stdout=outfile)
        setattr(cls, attribute, process)
64 executable = "../karaf/target/assembly/bin/karaf"
65 with open('transportpce_tests/log/odl.log', 'w') as outfile:
66 cls.odl_process = subprocess.Popen(
67 ["bash", executable, "server"], stdout=outfile,
68 stdin=open(os.devnull))
72 cls.__start_testtools()
def tearDownClass(cls):
    """Stop the device simulators and the controller process.

    Each simulator that was actually started receives SIGINT and is
    waited for, in the order process1..process4. The children of the
    controller process are then interrupted, followed by the controller
    itself, which is waited for.

    Processes that were never started (attribute still ``None``, e.g.
    because the testtool jar was missing) are skipped instead of raising
    ``AttributeError`` during teardown.
    """
    # NOTE(review): presumably decorated with @classmethod; the
    # decorator line is not part of the reviewed excerpt.
    simulators = (cls.testtools_process1, cls.testtools_process2,
                  cls.testtools_process3, cls.testtools_process4)
    for simulator in simulators:
        if simulator is None:
            continue
        simulator.send_signal(signal.SIGINT)
        simulator.wait()
    odl_process = getattr(cls, 'odl_process', None)
    if odl_process is not None:
        # Interrupt the children first: the controller is launched
        # through a bash wrapper script.
        for child in psutil.Process(odl_process.pid).children():
            child.send_signal(signal.SIGINT)
        odl_process.send_signal(signal.SIGINT)
        odl_process.wait()
    print('End of the tear down class')
def test_01_connect_ROADMA(self):
    """Mount the ROADMA simulator in topology-netconf (expects 201).

    The node is declared with a PUT on the netconf topology, pointing
    at 127.0.0.1:17831, the port the ROADMA simulator is started on.
    """
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADMA"
           .format(self.restconf_baseurl))
    # NOTE(review): the opening of this payload (the "node" list wrapper
    # and its "node-id") was not visible in the reviewed excerpt; it is
    # reconstructed from the URL and the closing "}]}" - confirm.
    data = {"node": [{
        "node-id": "ROADMA",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": "17831",
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_02_getClliNetwork(self):
    """The clli-network must expose NodeA as its first node."""
    endpoint = "{}/config/ietf-network:network/clli-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    first_node = reply.json()['network'][0]['node'][0]
    # Both the node id and the CLLI attribute are expected to be NodeA.
    for field in ('node-id', 'org-openroadm-clli-network:clli'):
        self.assertEqual(first_node[field], 'NodeA')
def test_03_getOpenRoadmNetwork(self):
    """The openroadm-network must expose ROADMA, supported by NodeA."""
    endpoint = "{}/config/ietf-network:network/openroadm-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    first_node = reply.json()['network'][0]['node'][0]
    self.assertEqual(first_node['node-id'], 'ROADMA')
    # The first supporting-node entry links ROADMA to its CLLI node.
    supporting = first_node['supporting-node'][0]
    self.assertEqual(supporting['network-ref'], 'clli-network')
    self.assertEqual(supporting['node-ref'], 'NodeA')
    self.assertEqual(first_node['org-openroadm-network:node-type'], 'ROADM')
    self.assertEqual(first_node['org-openroadm-network:model'], '2')
def test_04_getLinks_OpenroadmTopology(self):
    """Check the six internal ROADMA links of openroadm-topology.

    Every returned link must carry one of the expected link types and
    one of the link ids expected for that type, and every expected link
    id must be returned exactly once.
    """
    url = ("{}/config/ietf-network:network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 6)
    # link-type -> link ids that still have to show up in the response
    pending = {
        'EXPRESS-LINK': [
            'ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX',
            'ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX'],
        'ADD-LINK': [
            'ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX',
            'ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX'],
        'DROP-LINK': [
            'ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX',
            'ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX'],
    }
    for link in links:
        link_id = link['link-id']
        link_type = link['org-openroadm-network-topology:link-type']
        # NOTE(review): the excerpt showed an unconditional
        # assertFalse(True) here; it is treated as the body of a lost
        # "else:" branch, i.e. only unknown link types fail.
        if link_type not in pending:
            self.fail("unexpected link type {!r} on link {!r}"
                      .format(link_type, link_id))
        self.assertIn(link_id, pending[link_type])
        pending[link_type].remove(link_id)
    for link_type in sorted(pending):
        self.assertEqual(pending[link_type], [],
                         "missing {} link(s)".format(link_type))
def test_05_getNodes_OpenRoadmTopology(self):
    """Check the three ROADMA nodes (SRG1, DEG1, DEG2) of the topology.

    The HTTP status is verified before the body is decoded, so that an
    error reply is reported as a status mismatch rather than as a JSON
    decoding error. The number of returned nodes is appended to
    ``response.log``.
    """
    url = ("{}/config/ietf-network:network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    with open('./transportpce_tests/log/response.log', 'a') as outfile1:
        outfile1.write(str(len(nodes)))
    self.assertEqual(len(nodes), 3)
    tp_key = 'ietf-network-topology:termination-point'
    tp_type = 'org-openroadm-network-topology:tp-type'
    # node-id -> (node type, exact TP count or None, TPs that must exist)
    expected = {
        'ROADMA-SRG1': ('SRG', 17, [
            {'tp-id': 'SRG1-CP-TXRX', tp_type: 'SRG-TXRX-CP'},
            {'tp-id': 'SRG1-PP1-TXRX', tp_type: 'SRG-TXRX-PP'}]),
        'ROADMA-DEG1': ('DEGREE', None, [
            {'tp-id': 'DEG1-TTP-TXRX', tp_type: 'DEGREE-TXRX-TTP'},
            {'tp-id': 'DEG1-CTP-TXRX', tp_type: 'DEGREE-TXRX-CTP'}]),
        'ROADMA-DEG2': ('DEGREE', None, [
            {'tp-id': 'DEG2-TTP-TXRX', tp_type: 'DEGREE-TXRX-TTP'},
            {'tp-id': 'DEG2-CTP-TXRX', tp_type: 'DEGREE-TXRX-CTP'}]),
    }
    for node in nodes:
        self.assertIn({'network-ref': 'openroadm-network',
                       'node-ref': 'ROADMA'},
                      node['supporting-node'])
        node_id = node['node-id']
        # NOTE(review): the excerpt showed an unconditional
        # assertFalse(True); it is treated as the body of a lost
        # "else:" branch, i.e. only unknown node ids fail.
        if node_id not in expected:
            self.fail("unexpected or duplicated node {!r}".format(node_id))
        node_type, tp_count, required_tps = expected.pop(node_id)
        self.assertEqual(
            node['org-openroadm-network-topology:node-type'], node_type)
        if tp_count is not None:
            self.assertEqual(len(node[tp_key]), tp_count)
        for termination_point in required_tps:
            self.assertIn(termination_point, node[tp_key])
    self.assertEqual(sorted(expected), [], "nodes missing from topology")
def test_06_connect_XPDRA(self):
    """Mount the XPDRA simulator in topology-netconf (expects 201).

    The node is declared with a PUT on the netconf topology, pointing
    at 127.0.0.1:17830, the port the XPDRA simulator is started on.
    """
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/XPDRA"
           .format(self.restconf_baseurl))
    # NOTE(review): the opening of this payload (the "node" list wrapper
    # and its "node-id") was not visible in the reviewed excerpt; it is
    # reconstructed from the URL and the closing "}]}" - confirm.
    data = {"node": [{
        "node-id": "XPDRA",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": "17830",
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_07_getClliNetwork(self):
    """After mounting XPDRA the first clli-network node is still NodeA."""
    endpoint = "{}/config/ietf-network:network/clli-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    first_node = reply.json()['network'][0]['node'][0]
    # Both the node id and the CLLI attribute are expected to be NodeA.
    for field in ('node-id', 'org-openroadm-clli-network:clli'):
        self.assertEqual(first_node[field], 'NodeA')
def test_08_getOpenRoadmNetwork(self):
    """Check that openroadm-network holds exactly XPDRA and ROADMA.

    Both nodes must be supported by NodeA in clli-network and carry the
    node type and model expected for their kind.
    """
    url = ("{}/config/ietf-network:network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 2)
    # node-id -> (node type, model)
    # NOTE(review): the condition guarding the XPONDER assertions was
    # not visible in the excerpt; 'XPDRA' is taken from the node mounted
    # just before this test - confirm.
    expected = {
        'XPDRA': ('XPONDER', '1'),
        'ROADMA': ('ROADM', '2'),
    }
    for node in nodes:
        supporting = node['supporting-node'][0]
        self.assertEqual(supporting['network-ref'], 'clli-network')
        self.assertEqual(supporting['node-ref'], 'NodeA')
        node_id = node['node-id']
        if node_id not in expected:
            self.fail("unexpected node {!r}".format(node_id))
        node_type, model = expected[node_id]
        self.assertEqual(node['org-openroadm-network:node-type'], node_type)
        self.assertEqual(node['org-openroadm-network:model'], model)
def test_09_getNodes_OpenRoadmTopology(self):
    """Check the XPDRA-XPDR1 node and the three ROADMA topology nodes.

    The HTTP status is verified before the body is decoded, so that an
    error reply is reported as a status mismatch rather than as a JSON
    decoding error. The number of returned nodes is appended to
    ``response.log``.
    """
    url = ("{}/config/ietf-network:network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    with open('./transportpce_tests/log/response.log', 'a') as outfile1:
        outfile1.write(str(len(nodes)))
    self.assertEqual(len(nodes), 4)
    tp_key = 'ietf-network-topology:termination-point'
    tp_type = 'org-openroadm-network-topology:tp-type'
    xpdr_tps = [
        {'tp-id': 'XPDR1-CLIENT1', tp_type: 'XPONDER-CLIENT',
         'org-openroadm-network-topology:xpdr-network-attributes': {
             'tail-equipment-id': 'XPDR1-NETWORK1'}},
        {'tp-id': 'XPDR1-NETWORK1', tp_type: 'XPONDER-NETWORK',
         'org-openroadm-network-topology:xpdr-client-attributes': {
             'tail-equipment-id': 'XPDR1-CLIENT1'}},
    ]
    # node-id -> (node type, supporting node, exact TP count or None,
    #             expected TPs, whether the TP list must match exactly)
    expected = {
        'XPDRA-XPDR1': ('XPONDER', 'XPDRA', 2, xpdr_tps, True),
        'ROADMA-SRG1': ('SRG', 'ROADMA', 17, [
            {'tp-id': 'SRG1-CP-TXRX', tp_type: 'SRG-TXRX-CP'},
            {'tp-id': 'SRG1-PP1-TXRX', tp_type: 'SRG-TXRX-PP'}], False),
        'ROADMA-DEG1': ('DEGREE', 'ROADMA', None, [
            {'tp-id': 'DEG1-TTP-TXRX', tp_type: 'DEGREE-TXRX-TTP'},
            {'tp-id': 'DEG1-CTP-TXRX', tp_type: 'DEGREE-TXRX-CTP'}], False),
        'ROADMA-DEG2': ('DEGREE', 'ROADMA', None, [
            {'tp-id': 'DEG2-TTP-TXRX', tp_type: 'DEGREE-TXRX-TTP'},
            {'tp-id': 'DEG2-CTP-TXRX', tp_type: 'DEGREE-TXRX-CTP'}], False),
    }
    for node in nodes:
        node_id = node['node-id']
        # NOTE(review): the excerpt showed an unconditional
        # assertFalse(True); it is treated as the body of a lost
        # "else:" branch, i.e. only unknown node ids fail.
        if node_id not in expected:
            self.fail("unexpected or duplicated node {!r}".format(node_id))
        node_type, parent, tp_count, tps, exact = expected.pop(node_id)
        self.assertEqual(
            node['org-openroadm-network-topology:node-type'], node_type)
        if tp_count is not None:
            self.assertEqual(len(node[tp_key]), tp_count)
        if exact:
            # Same content in the same positions.
            self.assertEqual(tps, node[tp_key])
        else:
            for termination_point in tps:
                self.assertIn(termination_point, node[tp_key])
        self.assertIn({'network-ref': 'openroadm-network',
                       'node-ref': parent},
                      node['supporting-node'])
    self.assertEqual(sorted(expected), [], "nodes missing from topology")
341 #Connect the tail XPDRA to ROADMA and vice versa
def test_10_connect_tail_xpdr_rdm(self):
    """Create the XPDRA -> ROADMA tail link through the networkutils RPC.

    Posts to ``init-xpdr-rdm-links`` for XPDRA (xpdr 1, network 1) and
    ROADMA (SRG 1, termination point SRG1-PP1-TXRX) and expects 200.
    """
    url = ("{}/operations/networkutils:init-xpdr-rdm-links"
           .format(self.restconf_baseurl))
    # NOTE(review): the closing braces of this payload were not visible
    # in the reviewed excerpt; they are restored to balance the two
    # nested containers opened below.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDRA",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "1",
                "networkutils:rdm-node": "ROADMA",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP1-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_11_connect_tail_rdm_xpdr(self):
    """Create the ROADMA -> XPDRA tail link through the networkutils RPC.

    Posts to ``init-rdm-xpdr-links`` for XPDRA (xpdr 1, network 1) and
    ROADMA (SRG 1, termination point SRG1-PP1-TXRX) and expects 200.
    """
    url = ("{}/operations/networkutils:init-rdm-xpdr-links"
           .format(self.restconf_baseurl))
    # NOTE(review): the closing braces of this payload were not visible
    # in the reviewed excerpt; they are restored to balance the two
    # nested containers opened below.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDRA",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "1",
                "networkutils:rdm-node": "ROADMA",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP1-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_12_getLinks_OpenRoadmTopology(self):
    """Check the eight links present once the XPDRA tails are created.

    Besides the six internal ROADMA links, one XPONDER-INPUT and one
    XPONDER-OUTPUT link are expected. Every returned link must match an
    expected (type, id) pair and every expected id must be returned
    exactly once.
    """
    url = ("{}/config/ietf-network:network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 8)
    # link-type -> link ids that still have to show up in the response
    pending = {
        'EXPRESS-LINK': [
            'ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX',
            'ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX'],
        'ADD-LINK': [
            'ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX',
            'ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX'],
        'DROP-LINK': [
            'ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX',
            'ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX'],
        'XPONDER-INPUT': [
            'ROADMA-SRG1-SRG1-PP1-TXRXtoXPDRA-XPDR1-XPDR1-NETWORK1'],
        'XPONDER-OUTPUT': [
            'XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX'],
    }
    for link in links:
        link_type = link['org-openroadm-network-topology:link-type']
        link_id = link['link-id']
        # NOTE(review): the excerpt showed an unconditional
        # assertFalse(True) here; it is treated as the body of a lost
        # "else:" branch, i.e. only unknown link types fail.
        if link_type not in pending:
            self.fail("unexpected link type {!r} on link {!r}"
                      .format(link_type, link_id))
        self.assertIn(link_id, pending[link_type])
        pending[link_type].remove(link_id)
    for link_type in sorted(pending):
        self.assertEqual(pending[link_type], [],
                         "missing {} link(s)".format(link_type))
def test_13_connect_ROADMC(self):
    """Mount the ROADMC simulator in topology-netconf (expects 201).

    The node is declared with a PUT on the netconf topology, pointing
    at 127.0.0.1:17833, the port the ROADMC simulator is started on.
    """
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADMC"
           .format(self.restconf_baseurl))
    # NOTE(review): the opening of this payload (the "node" list wrapper
    # and its "node-id") was not visible in the reviewed excerpt; it is
    # reconstructed from the URL and the closing "}]}" - confirm.
    data = {"node": [{
        "node-id": "ROADMC",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": "17833",
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_14_getClliNetwork(self):
    """Check that clli-network holds exactly NodeA and NodeC.

    Each returned node must be one of the expected ids, must advertise
    that same id as its CLLI, and each expected id must be returned.
    """
    url = ("{}/config/ietf-network:network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    pending = ['NodeA', 'NodeC']
    for node in nodes:
        node_id = node['node-id']
        self.assertIn(node_id, pending)
        # NOTE(review): the if/else selecting the expected CLLI was not
        # visible in the excerpt; both visible assertions expect the
        # CLLI to equal the node id, which is what is checked here.
        self.assertEqual(node['org-openroadm-clli-network:clli'], node_id)
        pending.remove(node_id)
    self.assertEqual(pending, [], "CLLI nodes missing from clli-network")
def test_15_getOpenRoadmNetwork(self):
    """Check that openroadm-network holds XPDRA, ROADMA and ROADMC.

    Each node must be supported by the expected clli-network node and
    carry the node type and model expected for its kind; every expected
    node must be returned exactly once.
    """
    url = ("{}/config/ietf-network:network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 3)
    # node-id -> (supporting CLLI node, node type, model)
    # NOTE(review): the condition guarding the XPONDER assertions was
    # not visible in the excerpt; 'XPDRA' is taken from the expected
    # node list of the original test - confirm.
    expected = {
        'XPDRA': ('NodeA', 'XPONDER', '1'),
        'ROADMA': ('NodeA', 'ROADM', '2'),
        'ROADMC': ('NodeC', 'ROADM', '2'),
    }
    for node in nodes:
        supporting = node['supporting-node'][0]
        self.assertEqual(supporting['network-ref'], 'clli-network')
        node_id = node['node-id']
        if node_id not in expected:
            self.fail("unexpected or duplicated node {!r}".format(node_id))
        clli_node, node_type, model = expected.pop(node_id)
        self.assertEqual(supporting['node-ref'], clli_node)
        self.assertEqual(node['org-openroadm-network:node-type'], node_type)
        self.assertEqual(node['org-openroadm-network:model'], model)
    self.assertEqual(sorted(expected), [], "nodes missing from network")
def test_16_getROADMLinkOpenRoadmTopology(self):
    """Check the sixteen links present once ROADMC is mounted.

    Expected are the internal express/add/drop links of ROADMA and
    ROADMC, the two ROADM-TO-ROADM links between them and the two
    XPDRA tail links. Every returned link must match an expected
    (type, id) pair and every expected id must be returned exactly once.
    """
    url = ("{}/config/ietf-network:network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 16)
    # link-type -> link ids that still have to show up in the response
    pending = {
        'EXPRESS-LINK': [
            'ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX',
            'ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX',
            'ROADMC-DEG2-DEG2-CTP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX',
            'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-DEG2-DEG2-CTP-TXRX'],
        'ADD-LINK': [
            'ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX',
            'ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX',
            'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG2-DEG2-CTP-TXRX',
            'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX'],
        'DROP-LINK': [
            'ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX',
            'ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX',
            'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX',
            'ROADMC-DEG2-DEG2-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX'],
        'ROADM-TO-ROADM': [
            'ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX',
            'ROADMC-DEG2-DEG2-TTP-TXRXtoROADMA-DEG1-DEG1-TTP-TXRX'],
        'XPONDER-INPUT': [
            'ROADMA-SRG1-SRG1-PP1-TXRXtoXPDRA-XPDR1-XPDR1-NETWORK1'],
        'XPONDER-OUTPUT': [
            'XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX'],
    }
    for link in links:
        link_type = link['org-openroadm-network-topology:link-type']
        link_id = link['link-id']
        # NOTE(review): the excerpt showed an unconditional
        # assertFalse(True) here; it is treated as the body of a lost
        # "else:" branch, i.e. only unknown link types fail.
        if link_type not in pending:
            self.fail("unexpected link type {!r} on link {!r}"
                      .format(link_type, link_id))
        self.assertIn(link_id, pending[link_type])
        pending[link_type].remove(link_id)
    for link_type in sorted(pending):
        self.assertEqual(pending[link_type], [],
                         "missing {} link(s)".format(link_type))
def test_17_getNodes_OpenRoadmTopology(self):
    """Check the seven topology nodes of XPDRA, ROADMA and ROADMC.

    For every node the node type, the supporting openroadm-network
    node and the expected termination points are verified; each
    expected node must be returned exactly once. The HTTP status is
    verified before the body is decoded, so that an error reply is
    reported as a status mismatch rather than a JSON decoding error.
    """
    url = ("{}/config/ietf-network:network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 7)
    tp_key = 'ietf-network-topology:termination-point'
    tp_type = 'org-openroadm-network-topology:tp-type'
    xpdr_tps = [
        {'tp-id': 'XPDR1-CLIENT1', tp_type: 'XPONDER-CLIENT',
         'org-openroadm-network-topology:xpdr-network-attributes': {
             'tail-equipment-id': 'XPDR1-NETWORK1'}},
        {'tp-id': 'XPDR1-NETWORK1', tp_type: 'XPONDER-NETWORK',
         'org-openroadm-network-topology:xpdr-client-attributes': {
             'tail-equipment-id': 'XPDR1-CLIENT1'}},
    ]
    srg1_tps = [
        {'tp-id': 'SRG1-CP-TXRX', tp_type: 'SRG-TXRX-CP'},
        {'tp-id': 'SRG1-PP1-TXRX', tp_type: 'SRG-TXRX-PP'}]
    deg1_tps = [
        {'tp-id': 'DEG1-TTP-TXRX', tp_type: 'DEGREE-TXRX-TTP'},
        {'tp-id': 'DEG1-CTP-TXRX', tp_type: 'DEGREE-TXRX-CTP'}]
    deg2_tps = [
        {'tp-id': 'DEG2-TTP-TXRX', tp_type: 'DEGREE-TXRX-TTP'},
        {'tp-id': 'DEG2-CTP-TXRX', tp_type: 'DEGREE-TXRX-CTP'}]
    # node-id -> (node type, supporting node, exact TP count or None,
    #             expected TPs, whether the TP list must match exactly)
    expected = {'XPDRA-XPDR1': ('XPONDER', 'XPDRA', 2, xpdr_tps, True)}
    # Both ROADMs are expected to expose the same SRG1/DEG1/DEG2 layout.
    for roadm in ('ROADMA', 'ROADMC'):
        expected[roadm + '-SRG1'] = ('SRG', roadm, 17, srg1_tps, False)
        expected[roadm + '-DEG1'] = ('DEGREE', roadm, None, deg1_tps, False)
        expected[roadm + '-DEG2'] = ('DEGREE', roadm, None, deg2_tps, False)
    for node in nodes:
        node_id = node['node-id']
        # NOTE(review): the excerpt showed an unconditional
        # assertFalse(True); it is treated as the body of a lost
        # "else:" branch, i.e. only unknown node ids fail.
        if node_id not in expected:
            self.fail("unexpected or duplicated node {!r}".format(node_id))
        node_type, parent, tp_count, tps, exact = expected.pop(node_id)
        if tp_count is not None:
            self.assertEqual(len(node[tp_key]), tp_count)
        if exact:
            # Same content in the same positions.
            self.assertEqual(tps, node[tp_key])
        else:
            for termination_point in tps:
                self.assertIn(termination_point, node[tp_key])
        self.assertIn({'network-ref': 'openroadm-network',
                       'node-ref': parent},
                      node['supporting-node'])
        self.assertEqual(
            node['org-openroadm-network-topology:node-type'], node_type)
    self.assertEqual(sorted(expected), [], "nodes missing from topology")
def test_18_connect_ROADMB(self):
    """Mount the ROADMB simulator in topology-netconf (expects 201).

    The node is declared with a PUT on the netconf topology, pointing
    at 127.0.0.1:17832, the port the ROADMB simulator is started on.
    """
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADMB"
           .format(self.restconf_baseurl))
    # NOTE(review): the opening of this payload (the "node" list wrapper
    # and its "node-id") was not visible in the reviewed excerpt; it is
    # reconstructed from the URL and the closing "}]}" - confirm.
    data = {"node": [{
        "node-id": "ROADMB",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": "17832",
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_19_getClliNetwork(self):
    """Check that clli-network holds exactly NodeA, NodeB and NodeC.

    Each returned node must be one of the expected ids, must advertise
    that same id as its CLLI, and each expected id must be returned.
    """
    url = ("{}/config/ietf-network:network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    pending = ['NodeA', 'NodeB', 'NodeC']
    for node in nodes:
        node_id = node['node-id']
        self.assertIn(node_id, pending)
        # NOTE(review): parts of the if/elif/else selecting the expected
        # CLLI were not visible in the excerpt; every visible assertion
        # expects the CLLI to equal the node id, which is checked here.
        self.assertEqual(node['org-openroadm-clli-network:clli'], node_id)
        pending.remove(node_id)
    self.assertEqual(pending, [], "CLLI nodes missing from clli-network")
def test_20_verifyDegree(self):
    """Verify the ROADM-TO-ROADM links present in openroadm-topology.

    Each link of type ROADM-TO-ROADM must be one of the six expected
    A<->C, A<->B and B<->C degree links, and all six must be present.
    """
    topology_url = ("{}/config/ietf-network:network/openroadm-topology"
                    .format(self.restconf_baseurl))
    reply = requests.request(
        "GET", topology_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    topology = reply.json()
    # Tests related to links
    links = topology['network'][0]['ietf-network-topology:link']
    pending = [
        'ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX',
        'ROADMC-DEG2-DEG2-TTP-TXRXtoROADMA-DEG1-DEG1-TTP-TXRX',
        'ROADMA-DEG2-DEG2-TTP-TXRXtoROADMB-DEG1-DEG1-TTP-TXRX',
        'ROADMC-DEG1-DEG1-TTP-TXRXtoROADMB-DEG2-DEG2-TTP-TXRX',
        'ROADMB-DEG1-DEG1-TTP-TXRXtoROADMA-DEG2-DEG2-TTP-TXRX',
        'ROADMB-DEG2-DEG2-TTP-TXRXtoROADMC-DEG1-DEG1-TTP-TXRX',
    ]
    for link in links:
        if link['org-openroadm-network-topology:link-type'] != 'ROADM-TO-ROADM':
            continue
        link_id = link['link-id']
        self.assertTrue(link_id in pending)
        pending.remove(link_id)
    self.assertEqual(len(pending), 0)
def test_21_verifyOppositeLinkTopology(self):
    """Check that every topology link has a consistent opposite link.

    Fetches openroadm-topology, appends the decoded response to the test
    log, expects 26 links, and for each link retrieves the link named by
    its ``opposite-link`` leaf. That opposite link must point back to the
    original link, have source and destination nodes swapped, and carry
    the complementary link type (ADD<->DROP, XPONDER-INPUT<->OUTPUT,
    EXPRESS and ROADM-TO-ROADM map to themselves). Link types outside that
    table are not type-checked, as before.
    """
    topology_url = ("{}/config/ietf-network:network/openroadm-topology"
                    .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    auth = ('admin', 'admin')
    response = requests.request(
        "GET", topology_url, headers=headers, auth=auth)
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Write the response in the log
    with open('./transportpce_tests/log/response.log', 'a') as outfile1:
        outfile1.write(str(res))
    # Tests related to links
    links = res['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 26)
    expected_opposite_type = {
        'ADD-LINK': 'DROP-LINK',
        'DROP-LINK': 'ADD-LINK',
        'EXPRESS-LINK': 'EXPRESS-LINK',
        'ROADM-TO-ROADM': 'ROADM-TO-ROADM',
        'XPONDER-INPUT': 'XPONDER-OUTPUT',
        'XPONDER-OUTPUT': 'XPONDER-INPUT',
    }
    for link in links:
        link_id = link['link-id']
        link_type = link['org-openroadm-network-topology:link-type']
        link_src = link['source']['source-node']
        link_dest = link['destination']['dest-node']
        opp_link_id = link['org-openroadm-opposite-links:opposite-link']
        # Find the opposite link. The id is appended to the already
        # formatted base URL so it is never interpreted as a format
        # template (the old code concatenated first, then called format()).
        opp_url = topology_url + "/ietf-network-topology:link/" + opp_link_id
        response_opp = requests.request(
            "GET", opp_url, headers=headers, auth=auth)
        self.assertEqual(response_opp.status_code, requests.codes.ok)
        opp_link = response_opp.json()['ietf-network-topology:link'][0]
        self.assertEqual(
            opp_link['org-openroadm-opposite-links:opposite-link'], link_id)
        self.assertEqual(opp_link['source']['source-node'], link_dest)
        self.assertEqual(opp_link['destination']['dest-node'], link_src)
        opp_link_type = opp_link['org-openroadm-network-topology:link-type']
        if link_type in expected_opposite_type:
            self.assertEqual(opp_link_type,
                             expected_opposite_type[link_type])
def test_22_disconnect_ROADMB(self):
    """Delete ROADMB from the controller and expect HTTP 200 each time.

    Three RESTCONF DELETE requests are sent in order: the ROADMB mount in
    topology-netconf, node NodeB in clli-network, and node ROADMB in
    openroadm-network.
    """
    resources = (
        # Delete in the topology-netconf
        "network-topology:network-topology/topology/topology-netconf/node/ROADMB",
        # Delete in the clli-network
        "ietf-network:network/clli-network/node/NodeB",
        # Delete in the openroadm-network
        "ietf-network:network/openroadm-network/node/ROADMB",
    )
    # NOTE(review): `data` was used without a visible definition in the
    # reviewed text; an empty JSON object is assumed as the request body.
    data = {}
    headers = {'content-type': 'application/json'}
    for resource in resources:
        url = "{}/config/{}".format(self.restconf_baseurl, resource)
        response = requests.request(
            "DELETE", url, data=json.dumps(data), headers=headers,
            auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
def test_23_disconnect_ROADMC(self):
    """Delete ROADMC from the controller and expect HTTP 200 each time.

    Three RESTCONF DELETE requests are sent in order: the ROADMC mount in
    topology-netconf, node NodeC in clli-network, and node ROADMC in
    openroadm-network.
    """
    resources = (
        # Delete in the topology-netconf
        "network-topology:network-topology/topology/topology-netconf/node/ROADMC",
        # Delete in the clli-network
        "ietf-network:network/clli-network/node/NodeC",
        # Delete in the openroadm-network
        "ietf-network:network/openroadm-network/node/ROADMC",
    )
    # NOTE(review): `data` was used without a visible definition in the
    # reviewed text; an empty JSON object is assumed as the request body.
    data = {}
    headers = {'content-type': 'application/json'}
    for resource in resources:
        url = "{}/config/{}".format(self.restconf_baseurl, resource)
        response = requests.request(
            "DELETE", url, data=json.dumps(data), headers=headers,
            auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
829 # def test_24_getLinks_OpenRoadmTopology(self):
830 # url = ("{}/config/ietf-network:network/openroadm-topology"
831 # .format(self.restconf_baseurl))
832 # headers = {'content-type': 'application/json'}
833 # response = requests.request(
834 # "GET", url, headers=headers, auth=('admin', 'admin'))
835 # self.assertEqual(response.status_code, requests.codes.ok)
836 # res = response.json()
837 # #Write the response in the log
838 # with open('./transportpce_tests/log/response.log', 'a') as outfile1:
839 # outfile1.write(str(res))
840 # #Tests related to links
841 # nbLink=len(res['network'][0]['ietf-network-topology:link'])
842 # self.assertEqual(nbLink,8)
843 # expressLink=['ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX','ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX']
844 # addLink=['ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX','ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX',]
845 # dropLink=['ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX','ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX']
846 # XPDR_IN=['ROADMA-SRG1-SRG1-PP1-TXRXtoXPDRA-XPDR1-XPDR1-NETWORK1']
847 # XPDR_OUT=['XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX']
848 # for i in range(0,nbLink):
849 # nodeType=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']
850 # linkId=res['network'][0]['ietf-network-topology:link'][i]['link-id']
851 # if(nodeType=='EXPRESS-LINK'):
852 # find= linkId in expressLink
853 # self.assertEqual(find, True)
854 # expressLink.remove(linkId)
855 # elif(nodeType=='ADD-LINK'):
856 # find= linkId in addLink
857 # self.assertEqual(find, True)
858 # addLink.remove(linkId)
859 # elif(nodeType=='DROP-LINK'):
860 # find= linkId in dropLink
861 # self.assertEqual(find, True)
862 # dropLink.remove(linkId)
863 # elif(nodeType=='XPONDER-INPUT'):
864 # find= linkId in XPDR_IN
865 # self.assertEqual(find, True)
866 # XPDR_IN.remove(linkId)
867 # elif(nodeType=='XPONDER-OUTPUT'):
868 # find= linkId in XPDR_OUT
869 # self.assertEqual(find, True)
870 # XPDR_OUT.remove(linkId)
872 # self.assertFalse(True)
873 # self.assertEqual(len(expressLink),0)
874 # self.assertEqual(len(addLink),0)
875 # self.assertEqual(len(dropLink),0)
876 # self.assertEqual(len(XPDR_IN),0)
877 # self.assertEqual(len(XPDR_OUT),0)
879 # for i in range(0,nbLink):
880 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type'],'ROADM-TO-ROADM')
881 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX')
882 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX')
883 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG2-DEG1-CTP-TXRX')
884 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG2-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX')
885 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-DEG2-DEG2-CTP-TXRX')
886 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG2-DEG2-CTP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX')
888 # def test_25_getNodes_OpenRoadmTopology(self):
889 # url = ("{}/config/ietf-network:network/openroadm-topology"
890 # .format(self.restconf_baseurl))
891 # headers = {'content-type': 'application/json'}
892 # response = requests.request(
893 # "GET", url, headers=headers, auth=('admin', 'admin'))
894 # res = response.json()
895 # #Tests related to nodes
896 # self.assertEqual(response.status_code, requests.codes.ok)
897 # with open('./transportpce_tests/log/response.log', 'a') as outfile1:
898 # outfile1.write(str(len(res['network'][0]['node'])))
899 # nbNode=len(res['network'][0]['node'])
900 # self.assertEqual(nbNode,4)
901 # listNode=['XPDRA-XPDR1','ROADMA-SRG1','ROADMA-DEG1','ROADMA-DEG2']
902 # for i in range(0,nbNode):
903 # nodeType=res['network'][0]['node'][i]['org-openroadm-network-topology:node-type']
904 # nodeId=res['network'][0]['node'][i]['node-id']
905 # #Tests related to XPDRA nodes
906 # if(nodeId=='XPDRA-XPDR1'):
907 # self.assertEqual(nodeType,'XPONDER')
908 # self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),2)
909 # self.assertEqual({'tp-id': 'XPDR1-CLIENT1', 'org-openroadm-network-topology:tp-type': 'XPONDER-CLIENT',
910 # 'org-openroadm-network-topology:xpdr-network-attributes': {
911 # 'tail-equipment-id': 'XPDR1-NETWORK1'}},
912 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'][0])
913 # self.assertEqual({'tp-id': 'XPDR1-NETWORK1', 'org-openroadm-network-topology:tp-type': 'XPONDER-NETWORK',
914 # 'org-openroadm-network-topology:xpdr-client-attributes': {'tail-equipment-id': 'XPDR1-CLIENT1'}},
915 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'][1])
916 # self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA'},
917 # res['network'][0]['node'][i]['supporting-node'])
918 # listNode.remove(nodeId)
919 # elif(nodeId=='ROADMA-SRG1'):
920 # #Test related to SRG1
921 # self.assertEqual(nodeType,'SRG')
922 # self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),17)
923 # self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-CP'},
924 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
925 # self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-PP'},
926 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
927 # self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
928 # res['network'][0]['node'][i]['supporting-node'])
929 # listNode.remove(nodeId)
930 # elif(nodeId=='ROADMA-DEG1'):
931 # #Test related to DEG1
932 # self.assertEqual(nodeType,'DEGREE')
933 # self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
934 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
935 # self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
936 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
937 # self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
938 # res['network'][0]['node'][i]['supporting-node'])
939 # listNode.remove(nodeId)
940 # elif(nodeId=='ROADMA-DEG2'):
941 # #Test related to DEG2
942 # self.assertEqual(nodeType,'DEGREE')
943 # self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
944 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
945 # self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
946 # res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
947 # self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
948 # res['network'][0]['node'][i]['supporting-node'])
949 # listNode.remove(nodeId)
951 # self.assertFalse(True)
952 # self.assertEqual(len(listNode),0)
953 # #Test related to SRG1 of ROADMC
954 # for i in range(0,nbNode):
955 # self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'ROADMC-SRG1')
956 # self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'ROADMC-DEG1')
957 # self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'ROADMC-DEG2')
def test_26_getOpenRoadmNetwork(self):
    """Check that openroadm-network holds two nodes, none of them ROADMC."""
    url = ("{}/config/ietf-network:network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    nodes = res['network'][0]['node']
    self.assertEqual(len(nodes), 2)
    # Inspect every node: the previous range(0, nbNode-1) stopped one short
    # and never looked at the last entry.
    for node in nodes:
        self.assertNotEqual(node['node-id'], 'ROADMC')
def test_27_getClliNetwork(self):
    """Check that clli-network holds a single node whose CLLI is not NodeC."""
    url = ("{}/config/ietf-network:network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    nodes = res['network'][0]['node']
    self.assertEqual(len(nodes), 1)
    # The previous loop ran over range(0, nbNode-1), i.e. zero iterations
    # for one node, and indexed a hard-coded [1]; the NodeC check therefore
    # never executed. Check each returned node instead.
    for node in nodes:
        self.assertNotEqual(node['org-openroadm-clli-network:clli'], 'NodeC')
def test_28_disconnect_XPDRA(self):
    """Delete XPDRA from the controller and expect HTTP 200 each time.

    Two RESTCONF DELETE requests are sent in order: the XPDRA mount in
    topology-netconf, then node XPDRA in openroadm-network.
    """
    resources = (
        # Delete in the topology-netconf
        "network-topology:network-topology/topology/topology-netconf/node/XPDRA",
        # Delete in the openroadm-network
        "ietf-network:network/openroadm-network/node/XPDRA",
    )
    # NOTE(review): `data` was used without a visible definition in the
    # reviewed text; an empty JSON object is assumed as the request body.
    data = {}
    headers = {'content-type': 'application/json'}
    for resource in resources:
        url = "{}/config/{}".format(self.restconf_baseurl, resource)
        response = requests.request(
            "DELETE", url, data=json.dumps(data), headers=headers,
            auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
def test_29_getClliNetwork(self):
    """Check that clli-network contains exactly one node with CLLI NodeA."""
    clli_url = "{}/config/ietf-network:network/clli-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET", clli_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    nodes = reply.json()['network'][0]['node']
    self.assertEqual(len(nodes), 1)
    self.assertEqual(nodes[0]['org-openroadm-clli-network:clli'], 'NodeA')
def test_30_getOpenRoadmNetwork(self):
    """Check that openroadm-network holds one node and that it is not XPDRA."""
    network_url = "{}/config/ietf-network:network/openroadm-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET", network_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    nodes = reply.json()['network'][0]['node']
    self.assertEqual(len(nodes), 1)
    for node in nodes:
        self.assertNotEqual(node['node-id'], 'XPDRA')
def test_31_getNodes_OpenRoadmTopology(self):
    """Check that openroadm-topology holds only the three ROADMA nodes.

    Expects exactly ROADMA-SRG1, ROADMA-DEG1 and ROADMA-DEG2, each
    supported by node ROADMA of openroadm-network, with the expected node
    type and termination points. ROADMA-SRG1 must additionally expose 17
    termination points. Any other (or repeated) node id fails the test.
    """
    url = ("{}/config/ietf-network:network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding so that an error reply is reported
    # as a status mismatch rather than as a JSON decoding problem.
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Tests related to nodes
    nodes = res['network'][0]['node']
    self.assertEqual(len(nodes), 3)
    tp_type_key = 'org-openroadm-network-topology:tp-type'
    # node-id -> (expected node type, expected (tp-id, tp-type) pairs)
    expected = {
        'ROADMA-SRG1': ('SRG', (('SRG1-CP-TXRX', 'SRG-TXRX-CP'),
                                ('SRG1-PP1-TXRX', 'SRG-TXRX-PP'))),
        'ROADMA-DEG1': ('DEGREE', (('DEG1-TTP-TXRX', 'DEGREE-TXRX-TTP'),
                                   ('DEG1-CTP-TXRX', 'DEGREE-TXRX-CTP'))),
        'ROADMA-DEG2': ('DEGREE', (('DEG2-TTP-TXRX', 'DEGREE-TXRX-TTP'),
                                   ('DEG2-CTP-TXRX', 'DEGREE-TXRX-CTP'))),
    }
    for node in nodes:
        self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA'},
                      node['supporting-node'])
        node_type = node['org-openroadm-network-topology:node-type']
        node_id = node['node-id']
        if node_id not in expected:
            # Replaces the bare assertFalse(True) fallback, whose guarding
            # "else:" was missing from the reviewed text.
            self.fail("unexpected or repeated node in openroadm-topology: "
                      "{!r}".format(node_id))
        expected_type, expected_tps = expected.pop(node_id)
        self.assertEqual(node_type, expected_type)
        termination_points = node['ietf-network-topology:termination-point']
        if node_id == 'ROADMA-SRG1':
            # Test related to SRG1
            self.assertEqual(len(termination_points), 17)
        for tp_id, tp_type in expected_tps:
            self.assertIn({'tp-id': tp_id, tp_type_key: tp_type},
                          termination_points)
    self.assertEqual(len(expected), 0)
def test_32_disconnect_ROADM_XPDRA_link(self):
    """Delete both XPDRA<->ROADMA-SRG1 links and expect HTTP 200 each time.

    Removes the XPDRA-to-ROADMA link first, then the ROADMA-to-XPDRA link,
    from the openroadm-topology link list.
    """
    link_ids = (
        "XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX",
        "ROADMA-SRG1-SRG1-PP1-TXRXtoXPDRA-XPDR1-XPDR1-NETWORK1",
    )
    # NOTE(review): `data` was used without a visible definition in the
    # reviewed text; an empty JSON object is assumed as the request body.
    data = {}
    headers = {'content-type': 'application/json'}
    for link_id in link_ids:
        url = ("{}/config/ietf-network:network/openroadm-topology/"
               "ietf-network-topology:link/{}"
               .format(self.restconf_baseurl, link_id))
        response = requests.request(
            "DELETE", url, data=json.dumps(data), headers=headers,
            auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
1098 # def test_33_getLinks_OpenRoadmTopology(self):
1099 # url = ("{}/config/ietf-network:network/openroadm-topology"
1100 # .format(self.restconf_baseurl))
1101 # headers = {'content-type': 'application/json'}
1102 # response = requests.request(
1103 # "GET", url, headers=headers, auth=('admin', 'admin'))
1104 # self.assertEqual(response.status_code, requests.codes.ok)
1105 # res = response.json()
1106 # nbLink=len(res['network'][0]['ietf-network-topology:link'])
1107 # self.assertEqual(nbLink,6)
1108 # expressLink=['ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX','ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX']
1109 # addLink=['ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX','ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX']
1110 # dropLink=['ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX','ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX']
1111 # for i in range(0,nbLink):
1112 # if (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='EXPRESS-LINK'):
1113 # link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1114 # find= link_id in expressLink
1115 # self.assertEqual(find, True)
1116 # expressLink.remove(link_id)
1117 # elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='ADD-LINK'):
1118 # link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1119 # find= link_id in addLink
1120 # self.assertEqual(find, True)
1121 # addLink.remove(link_id)
1122 # elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='DROP-LINK'):
1123 # link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1124 # find= link_id in dropLink
1125 # self.assertEqual(find, True)
1126 # dropLink.remove(link_id)
1128 # self.assertFalse(True)
1129 # self.assertEqual(len(expressLink),0)
1130 # self.assertEqual(len(addLink),0)
1131 # self.assertEqual(len(dropLink),0)
1132 # for i in range(0,nbLink):
1133 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type'],'XPONDER-OUTPUT')
1134 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type'],'XPONDER-INPUT')
def test_34_disconnect_ROADMA(self):
    """Delete ROADMA from the controller and expect HTTP 200 each time.

    Three RESTCONF DELETE requests are sent in order: the ROADMA mount in
    topology-netconf, node NodeA in clli-network, and node ROADMA in
    openroadm-network.
    """
    resources = (
        # Delete in the topology-netconf
        "network-topology:network-topology/topology/topology-netconf/node/ROADMA",
        # Delete in the clli-network
        "ietf-network:network/clli-network/node/NodeA",
        # Delete in the openroadm-network
        "ietf-network:network/openroadm-network/node/ROADMA",
    )
    # NOTE(review): `data` was used without a visible definition in the
    # reviewed text; an empty JSON object is assumed as the request body.
    data = {}
    headers = {'content-type': 'application/json'}
    for resource in resources:
        url = "{}/config/{}".format(self.restconf_baseurl, resource)
        response = requests.request(
            "DELETE", url, data=json.dumps(data), headers=headers,
            auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
def test_35_getClliNetwork(self):
    """Check that clli-network no longer carries any 'node' entry."""
    clli_url = "{}/config/ietf-network:network/clli-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET", clli_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    network = reply.json()['network'][0]
    self.assertNotIn('node', network)
def test_36_getOpenRoadmNetwork(self):
    """Check that openroadm-network no longer carries any 'node' entry."""
    network_url = "{}/config/ietf-network:network/openroadm-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET", network_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    network = reply.json()['network'][0]
    self.assertNotIn('node', network)
1186 # def test_37_getOpenRoadmTopology(self):
1187 # url = ("{}/config/ietf-network:network/openroadm-topology"
1188 # .format(self.restconf_baseurl))
1189 # headers = {'content-type': 'application/json'}
1190 # response = requests.request(
1191 # "GET", url, headers=headers, auth=('admin', 'admin'))
1192 # self.assertEqual(response.status_code, requests.codes.ok)
1193 # res = response.json()
1194 # self.assertNotIn('node', res['network'][0])
1195 # self.assertNotIn('ietf-network-topology:link', res['network'][0])
if __name__ == "__main__":
    # Debug logging to the shared response log, kept disabled:
    #logging.basicConfig(filename='./transportpce_tests/log/response.log',filemode='w',level=logging.DEBUG)
    #logging.debug('I am there')
    # Run the whole test case with per-test output.
    unittest.main(verbosity=2)