3 ##############################################################################
4 #Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
23 class TransportPCEtesting(unittest.TestCase):
25 honeynode_process1 = None
26 honeynode_process2 = None
27 honeynode_process3 = None
28 honeynode_process4 = None
30 restconf_baseurl = "http://localhost:8181/restconf"
32 #START_IGNORE_XTESTING
35 def __start_honeynode1(cls):
36 executable = ("./honeynode/2.2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
37 "/honeynode-distribution-1.18.01/honeycomb-tpce")
38 if os.path.isfile(executable):
39 with open('honeynode1.log', 'w') as outfile:
40 cls.honeynode_process1 = subprocess.Popen(
41 [executable, "17840", "sample_configs/openroadm/2.2.1/oper-XPDRA.xml"],
45 def __start_honeynode2(cls):
46 executable = ("./honeynode/2.2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
47 "/honeynode-distribution-1.18.01/honeycomb-tpce")
48 if os.path.isfile(executable):
49 with open('honeynode2.log', 'w') as outfile:
50 cls.honeynode_process2 = subprocess.Popen(
51 [executable, "17841", "sample_configs/openroadm/2.2.1/oper-ROADMA.xml"],
55 def __start_honeynode3(cls):
56 executable = ("./honeynode/2.2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
57 "/honeynode-distribution-1.18.01/honeycomb-tpce")
58 if os.path.isfile(executable):
59 with open('honeynode1.log', 'w') as outfile:
60 cls.honeynode_process3 = subprocess.Popen(
61 [executable, "17842", "sample_configs/openroadm/2.2.1/oper-ROADMB.xml"],
65 def __start_honeynode4(cls):
66 executable = ("./honeynode/2.2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
67 "/honeynode-distribution-1.18.01/honeycomb-tpce")
68 if os.path.isfile(executable):
69 with open('honeynode2.log', 'w') as outfile:
70 cls.honeynode_process4 = subprocess.Popen(
71 [executable, "17843", "sample_configs/openroadm/2.2.1/oper-ROADMC.xml"],
76 executable = "../karaf/target/assembly/bin/karaf"
77 with open('odl.log', 'w') as outfile:
78 cls.odl_process = subprocess.Popen(
79 ["bash", executable, "server"], stdout=outfile,
80 stdin=open(os.devnull))
84 cls.__start_honeynode1()
86 cls.__start_honeynode2()
88 cls.__start_honeynode3()
90 cls.__start_honeynode4()
96 def tearDownClass(cls):
97 for child in psutil.Process(cls.odl_process.pid).children():
98 child.send_signal(signal.SIGINT)
100 cls.odl_process.send_signal(signal.SIGINT)
101 cls.odl_process.wait()
102 for child in psutil.Process(cls.honeynode_process1.pid).children():
103 child.send_signal(signal.SIGINT)
105 cls.honeynode_process1.send_signal(signal.SIGINT)
106 cls.honeynode_process1.wait()
107 for child in psutil.Process(cls.honeynode_process2.pid).children():
108 child.send_signal(signal.SIGINT)
110 cls.honeynode_process2.send_signal(signal.SIGINT)
111 cls.honeynode_process2.wait()
112 for child in psutil.Process(cls.honeynode_process3.pid).children():
113 child.send_signal(signal.SIGINT)
115 cls.honeynode_process3.send_signal(signal.SIGINT)
116 cls.honeynode_process3.wait()
117 for child in psutil.Process(cls.honeynode_process4.pid).children():
118 child.send_signal(signal.SIGINT)
120 cls.honeynode_process4.send_signal(signal.SIGINT)
121 cls.honeynode_process4.wait()
129 def test_01_connect_ROADM_A1(self):
131 url = ("{}/config/network-topology:"
132 "network-topology/topology/topology-netconf/node/ROADM-A1"
133 .format(self.restconf_baseurl))
135 "node-id": "ROADM-A1",
136 "netconf-node-topology:username": "admin",
137 "netconf-node-topology:password": "admin",
138 "netconf-node-topology:host": "127.0.0.1",
139 "netconf-node-topology:port": "17841",
140 "netconf-node-topology:tcp-only": "false",
141 "netconf-node-topology:pass-through": {}}]}
142 headers = {'content-type': 'application/json'}
143 response = requests.request(
144 "PUT", url, data=json.dumps(data), headers=headers,
145 auth=('admin', 'admin'))
146 self.assertEqual(response.status_code, requests.codes.created)
149 def test_02_getClliNetwork(self):
150 url = ("{}/config/ietf-network:networks/network/clli-network"
151 .format(self.restconf_baseurl))
152 headers = {'content-type': 'application/json'}
153 response = requests.request(
154 "GET", url, headers=headers, auth=('admin', 'admin'))
155 self.assertEqual(response.status_code, requests.codes.ok)
156 res = response.json()
158 self.assertEqual(res['network'][0]['node'][0]['node-id'],'NodeA')
159 self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'],'NodeA')
161 def test_03_getOpenRoadmNetwork(self):
162 url = ("{}/config/ietf-network:networks/network/openroadm-network"
163 .format(self.restconf_baseurl))
164 headers = {'content-type': 'application/json'}
165 response = requests.request(
166 "GET", url, headers=headers, auth=('admin', 'admin'))
167 self.assertEqual(response.status_code, requests.codes.ok)
168 res = response.json()
169 self.assertEqual(res['network'][0]['node'][0]['node-id'],'ROADM-A1')
170 self.assertEqual(res['network'][0]['node'][0]['supporting-node'][0]['network-ref'],'clli-network')
171 self.assertEqual(res['network'][0]['node'][0]['supporting-node'][0]['node-ref'],'NodeA')
172 self.assertEqual(res['network'][0]['node'][0]['org-openroadm-network:node-type'],'ROADM')
173 self.assertEqual(res['network'][0]['node'][0]['org-openroadm-network:model'],'')
175 def test_04_getLinks_OpenroadmTopology(self):
176 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
177 .format(self.restconf_baseurl))
178 headers = {'content-type': 'application/json'}
179 response = requests.request(
180 "GET", url, headers=headers, auth=('admin', 'admin'))
181 self.assertEqual(response.status_code, requests.codes.ok)
182 res = response.json()
183 #Tests related to links
184 nbLink=len(res['network'][0]['ietf-network-topology:link'])
185 self.assertEqual(nbLink,6)
186 expressLink=['ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
187 'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX']
188 addLink=['ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
189 'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX']
190 dropLink=['ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
191 'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX']
192 for i in range(0,nbLink):
193 linkId = res['network'][0]['ietf-network-topology:link'][i]['link-id']
194 if (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='EXPRESS-LINK'):
195 find= linkId in expressLink
196 self.assertEqual(find, True)
197 expressLink.remove(linkId)
198 elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='ADD-LINK'):
199 find= linkId in addLink
200 self.assertEqual(find, True)
201 addLink.remove(linkId)
202 elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='DROP-LINK'):
203 find= linkId in dropLink
204 self.assertEqual(find, True)
205 dropLink.remove(linkId)
207 self.assertFalse(True)
208 self.assertEqual(len(expressLink),0)
209 self.assertEqual(len(addLink),0)
210 self.assertEqual(len(dropLink),0)
212 def test_05_getNodes_OpenRoadmTopology(self):
213 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
214 .format(self.restconf_baseurl))
215 headers = {'content-type': 'application/json'}
216 response = requests.request(
217 "GET", url, headers=headers, auth=('admin', 'admin'))
218 res = response.json()
219 #Tests related to nodes
220 self.assertEqual(response.status_code, requests.codes.ok)
221 with open('./transportpce_tests/log/response.log', 'a') as outfile1:
222 outfile1.write(str(len(res['network'][0]['node'])))
223 nbNode=len(res['network'][0]['node'])
224 self.assertEqual(nbNode,3)
225 listNode=['ROADM-A1-SRG1','ROADM-A1-DEG1','ROADM-A1-DEG2']
226 for i in range(0,nbNode):
227 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
228 res['network'][0]['node'][i]['supporting-node'])
229 nodeType=res['network'][0]['node'][i]['org-openroadm-network-topology:node-type']
230 nodeId=res['network'][0]['node'][i]['node-id']
231 if(nodeId=='ROADM-A1-SRG1'):
232 #Test related to SRG1
233 self.assertEqual(nodeType,'SRG')
234 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),5)
235 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-CP'},
236 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
237 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-PP'},
238 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
239 listNode.remove(nodeId)
240 elif(nodeId=='ROADM-A1-DEG1'):
241 #Test related to DEG1
242 self.assertEqual(nodeType,'DEGREE')
243 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
244 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
245 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
246 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
247 listNode.remove(nodeId)
248 elif(nodeId=='ROADM-A1-DEG2'):
249 #Test related to DEG2
250 self.assertEqual(nodeType,'DEGREE')
251 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
252 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
253 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
254 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
255 listNode.remove(nodeId)
257 self.assertFalse(True)
258 self.assertEqual(len(listNode),0)
260 def test_06_connect_XPDRA(self):
261 url = ("{}/config/network-topology:"
262 "network-topology/topology/topology-netconf/node/XPDR-A1"
263 .format(self.restconf_baseurl))
265 "node-id": "XPDR-A1",
266 "netconf-node-topology:username": "admin",
267 "netconf-node-topology:password": "admin",
268 "netconf-node-topology:host": "127.0.0.1",
269 "netconf-node-topology:port": "17840",
270 "netconf-node-topology:tcp-only": "false",
271 "netconf-node-topology:pass-through": {}}]}
272 headers = {'content-type': 'application/json'}
273 response = requests.request(
274 "PUT", url, data=json.dumps(data), headers=headers,
275 auth=('admin', 'admin'))
276 self.assertEqual(response.status_code, requests.codes.created)
279 def test_07_getClliNetwork(self):
280 url = ("{}/config/ietf-network:networks/network/clli-network"
281 .format(self.restconf_baseurl))
282 headers = {'content-type': 'application/json'}
283 response = requests.request(
284 "GET", url, headers=headers, auth=('admin', 'admin'))
285 self.assertEqual(response.status_code, requests.codes.ok)
286 res = response.json()
287 self.assertEqual(res['network'][0]['node'][0]['node-id'],'NodeA')
288 self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'],'NodeA')
290 def test_08_getOpenRoadmNetwork(self):
291 url = ("{}/config/ietf-network:networks/network/openroadm-network"
292 .format(self.restconf_baseurl))
293 headers = {'content-type': 'application/json'}
294 response = requests.request(
295 "GET", url, headers=headers, auth=('admin', 'admin'))
296 self.assertEqual(response.status_code, requests.codes.ok)
297 res = response.json()
298 nbNode=len(res['network'][0]['node'])
299 self.assertEqual(nbNode,2)
300 for i in range(0,nbNode):
301 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['network-ref'],'clli-network')
302 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'],'NodeA')
303 nodeId=res['network'][0]['node'][i]['node-id']
304 if(nodeId=='XPDR-A1'):
305 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:node-type'],'XPONDER')
306 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'],'')
307 elif(nodeId=='ROADM-A1'):
308 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:node-type'],'ROADM')
309 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'],'')
311 self.assertFalse(True)
313 def test_09_getNodes_OpenRoadmTopology(self):
314 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
315 .format(self.restconf_baseurl))
316 headers = {'content-type': 'application/json'}
317 response = requests.request(
318 "GET", url, headers=headers, auth=('admin', 'admin'))
319 res = response.json()
320 #Tests related to nodes
321 self.assertEqual(response.status_code, requests.codes.ok)
322 with open('./transportpce_tests/log/response.log', 'a') as outfile1:
323 outfile1.write(str(len(res['network'][0]['node'])))
324 nbNode=len(res['network'][0]['node'])
325 self.assertEqual(nbNode,4)
326 listNode=['XPDR-A1-XPDR1','ROADM-A1-SRG1','ROADM-A1-DEG1','ROADM-A1-DEG2']
327 for i in range(0,nbNode):
328 nodeType=res['network'][0]['node'][i]['org-openroadm-network-topology:node-type']
329 nodeId=res['network'][0]['node'][i]['node-id']
330 #Tests related to XPDRA nodes
331 if(nodeId=='XPDR-A1-XPDR1'):
332 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDR-A1'},
333 res['network'][0]['node'][i]['supporting-node'])
334 self.assertEqual(nodeType,'XPONDER')
335 nbTps=len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
336 self.assertTrue(nbTps >= 4)
339 for j in range(0,nbTps):
340 tpType=res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['org-openroadm-network-topology:tp-type']
341 if (tpType=='XPONDER-CLIENT'):
343 elif (tpType=='XPONDER-NETWORK'):
345 self.assertTrue(client >= 2)
346 self.assertTrue(network >= 2)
347 listNode.remove(nodeId)
348 elif(nodeId=='ROADM-A1-SRG1'):
349 #Test related to SRG1
350 self.assertEqual(nodeType,'SRG')
351 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),5)
352 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-CP'},
353 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
354 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-PP'},
355 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
356 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
357 res['network'][0]['node'][i]['supporting-node'])
358 listNode.remove(nodeId)
359 elif(nodeId=='ROADM-A1-DEG1'):
360 #Test related to DEG1
361 self.assertEqual(nodeType,'DEGREE')
362 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
363 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
364 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
365 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
366 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
367 res['network'][0]['node'][i]['supporting-node'])
368 listNode.remove(nodeId)
369 elif(nodeId=='ROADM-A1-DEG2'):
370 #Test related to DEG2
371 self.assertEqual(nodeType,'DEGREE')
372 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
373 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
374 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
375 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
376 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
377 res['network'][0]['node'][i]['supporting-node'])
378 listNode.remove(nodeId)
380 self.assertFalse(True)
381 self.assertEqual(len(listNode),0)
383 #Connect the tail XPDRA to ROADMA and vice versa
384 def test_10_connect_tail_xpdr_rdm(self):
385 #Connect the tail: XPDRA to ROADMA
386 url = ("{}/operations/transportpce-networkutils:init-xpdr-rdm-links"
387 .format(self.restconf_baseurl))
388 data = {"networkutils:input": {
389 "networkutils:links-input": {
390 "networkutils:xpdr-node": "XPDR-A1",
391 "networkutils:xpdr-num": "1",
392 "networkutils:network-num": "1",
393 "networkutils:rdm-node": "ROADM-A1",
394 "networkutils:srg-num": "1",
395 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
399 headers = {'content-type': 'application/json'}
400 response = requests.request(
401 "POST", url, data=json.dumps(data), headers=headers,
402 auth=('admin', 'admin'))
403 self.assertEqual(response.status_code, requests.codes.ok)
406 def test_11_connect_tail_rdm_xpdr(self):
407 #Connect the tail: ROADMA to XPDRA
408 url = ("{}/operations/transportpce-networkutils:init-rdm-xpdr-links"
409 .format(self.restconf_baseurl))
410 data = {"networkutils:input": {
411 "networkutils:links-input": {
412 "networkutils:xpdr-node": "XPDR-A1",
413 "networkutils:xpdr-num": "1",
414 "networkutils:network-num": "1",
415 "networkutils:rdm-node": "ROADM-A1",
416 "networkutils:srg-num": "1",
417 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
421 headers = {'content-type': 'application/json'}
422 response = requests.request(
423 "POST", url, data=json.dumps(data), headers=headers,
424 auth=('admin', 'admin'))
425 self.assertEqual(response.status_code, requests.codes.ok)
428 def test_12_getLinks_OpenRoadmTopology(self):
429 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
430 .format(self.restconf_baseurl))
431 headers = {'content-type': 'application/json'}
432 response = requests.request(
433 "GET", url, headers=headers, auth=('admin', 'admin'))
434 self.assertEqual(response.status_code, requests.codes.ok)
435 res = response.json()
436 #Tests related to links
437 nbLink=len(res['network'][0]['ietf-network-topology:link'])
438 self.assertEqual(nbLink,8)
439 expressLink=['ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX','ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX']
440 addLink=['ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX','ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',]
441 dropLink=['ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX','ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX']
442 XPDR_IN=['ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1']
443 XPDR_OUT=['XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX']
444 for i in range(0,nbLink):
445 nodeType=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']
446 linkId=res['network'][0]['ietf-network-topology:link'][i]['link-id']
447 if(nodeType=='EXPRESS-LINK'):
448 find= linkId in expressLink
449 self.assertEqual(find, True)
450 expressLink.remove(linkId)
451 elif(nodeType=='ADD-LINK'):
452 find= linkId in addLink
453 self.assertEqual(find, True)
454 addLink.remove(linkId)
455 elif(nodeType=='DROP-LINK'):
456 find= linkId in dropLink
457 self.assertEqual(find, True)
458 dropLink.remove(linkId)
459 elif(nodeType=='XPONDER-INPUT'):
460 find= linkId in XPDR_IN
461 self.assertEqual(find, True)
462 XPDR_IN.remove(linkId)
463 elif(nodeType=='XPONDER-OUTPUT'):
464 find= linkId in XPDR_OUT
465 self.assertEqual(find, True)
466 XPDR_OUT.remove(linkId)
468 self.assertFalse(True)
469 self.assertEqual(len(expressLink),0)
470 self.assertEqual(len(addLink),0)
471 self.assertEqual(len(dropLink),0)
472 self.assertEqual(len(XPDR_IN),0)
473 self.assertEqual(len(XPDR_OUT),0)
475 def test_13_connect_ROADMC(self):
477 url = ("{}/config/network-topology:"
478 "network-topology/topology/topology-netconf/node/ROADM-C1"
479 .format(self.restconf_baseurl))
481 "node-id": "ROADM-C1",
482 "netconf-node-topology:username": "admin",
483 "netconf-node-topology:password": "admin",
484 "netconf-node-topology:host": "127.0.0.1",
485 "netconf-node-topology:port": "17843",
486 "netconf-node-topology:tcp-only": "false",
487 "netconf-node-topology:pass-through": {}}]}
488 headers = {'content-type': 'application/json'}
489 response = requests.request(
490 "PUT", url, data=json.dumps(data), headers=headers,
491 auth=('admin', 'admin'))
492 self.assertEqual(response.status_code, requests.codes.created)
495 def test_14_getClliNetwork(self):
496 url = ("{}/config/ietf-network:networks/network/clli-network"
497 .format(self.restconf_baseurl))
498 headers = {'content-type': 'application/json'}
499 response = requests.request(
500 "GET", url, headers=headers, auth=('admin', 'admin'))
501 self.assertEqual(response.status_code, requests.codes.ok)
502 res = response.json()
503 nbNode=len(res['network'][0]['node'])
504 listNode=['NodeA','NodeC']
505 for i in range(0,nbNode):
506 nodeId = res['network'][0]['node'][i]['node-id']
507 find= nodeId in listNode
508 self.assertEqual(find, True)
510 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'],'NodeA')
512 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'],'NodeC')
513 listNode.remove(nodeId)
515 self.assertEqual(len(listNode),0)
517 def test_15_getOpenRoadmNetwork(self):
518 url = ("{}/config/ietf-network:networks/network/openroadm-network"
519 .format(self.restconf_baseurl))
520 headers = {'content-type': 'application/json'}
521 response = requests.request(
522 "GET", url, headers=headers, auth=('admin', 'admin'))
523 self.assertEqual(response.status_code, requests.codes.ok)
524 res = response.json()
525 nbNode=len(res['network'][0]['node'])
526 self.assertEqual(nbNode,3)
527 listNode=['XPDR-A1','ROADM-A1','ROADM-C1']
528 for i in range(0,nbNode):
529 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['network-ref'],'clli-network')
530 nodeId=res['network'][0]['node'][i]['node-id']
531 if(nodeId=='XPDR-A1'):
532 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'],'NodeA')
533 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:node-type'],'XPONDER')
534 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'],'')
535 listNode.remove(nodeId)
536 elif(nodeId=='ROADM-A1'):
537 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'],'NodeA')
538 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:node-type'],'ROADM')
539 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'],'')
540 listNode.remove(nodeId)
541 elif(nodeId=='ROADM-C1'):
542 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'],'NodeC')
543 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:node-type'],'ROADM')
544 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'],'')
545 listNode.remove(nodeId)
547 self.assertFalse(True)
548 self.assertEqual(len(listNode),0)
550 def test_16_getROADMLinkOpenRoadmTopology(self):
551 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
552 .format(self.restconf_baseurl))
553 headers = {'content-type': 'application/json'}
554 response = requests.request(
555 "GET", url, headers=headers, auth=('admin', 'admin'))
556 self.assertEqual(response.status_code, requests.codes.ok)
557 res = response.json()
558 #Tests related to links
559 nbLink=len(res['network'][0]['ietf-network-topology:link'])
560 self.assertEqual(nbLink,16)
561 expressLink=['ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX','ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
562 'ROADM-C1-DEG2-DEG2-CTP-TXRXtoROADM-C1-DEG1-DEG1-CTP-TXRX','ROADM-C1-DEG1-DEG1-CTP-TXRXtoROADM-C1-DEG2-DEG2-CTP-TXRX']
563 addLink=['ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX','ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
564 'ROADM-C1-SRG1-SRG1-CP-TXRXtoROADM-C1-DEG2-DEG2-CTP-TXRX','ROADM-C1-SRG1-SRG1-CP-TXRXtoROADM-C1-DEG1-DEG1-CTP-TXRX']
565 dropLink=['ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX','ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
566 'ROADM-C1-DEG1-DEG1-CTP-TXRXtoROADM-C1-SRG1-SRG1-CP-TXRX','ROADM-C1-DEG2-DEG2-CTP-TXRXtoROADM-C1-SRG1-SRG1-CP-TXRX']
567 R2RLink=['ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX','ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX']
568 XPDR_IN=['ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1']
569 XPDR_OUT=['XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX']
570 for i in range(0,nbLink):
571 nodeType=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']
572 linkId=res['network'][0]['ietf-network-topology:link'][i]['link-id']
573 if(nodeType=='EXPRESS-LINK'):
574 find= linkId in expressLink
575 self.assertEqual(find, True)
576 expressLink.remove(linkId)
577 elif(nodeType=='ADD-LINK'):
578 find= linkId in addLink
579 self.assertEqual(find, True)
580 addLink.remove(linkId)
581 elif(nodeType=='DROP-LINK'):
582 find= linkId in dropLink
583 self.assertEqual(find, True)
584 dropLink.remove(linkId)
585 elif(nodeType=='ROADM-TO-ROADM'):
586 find= linkId in R2RLink
587 self.assertEqual(find, True)
588 R2RLink.remove(linkId)
589 elif(nodeType=='XPONDER-INPUT'):
590 find= linkId in XPDR_IN
591 self.assertEqual(find, True)
592 XPDR_IN.remove(linkId)
593 elif(nodeType=='XPONDER-OUTPUT'):
594 find= linkId in XPDR_OUT
595 self.assertEqual(find, True)
596 XPDR_OUT.remove(linkId)
598 self.assertFalse(True)
599 self.assertEqual(len(expressLink),0)
600 self.assertEqual(len(addLink),0)
601 self.assertEqual(len(dropLink),0)
602 self.assertEqual(len(R2RLink),0)
603 self.assertEqual(len(XPDR_IN),0)
604 self.assertEqual(len(XPDR_OUT),0)
606 def test_17_getNodes_OpenRoadmTopology(self):
607 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
608 .format(self.restconf_baseurl))
609 headers = {'content-type': 'application/json'}
610 response = requests.request(
611 "GET", url, headers=headers, auth=('admin', 'admin'))
612 res = response.json()
613 #Tests related to nodes
614 self.assertEqual(response.status_code, requests.codes.ok)
615 nbNode=len(res['network'][0]['node'])
616 self.assertEqual(nbNode,7)
617 listNode=['XPDR-A1-XPDR1',
618 'ROADM-A1-SRG1','ROADM-A1-DEG1','ROADM-A1-DEG2',
619 'ROADM-C1-SRG1','ROADM-C1-DEG1', 'ROADM-C1-DEG2']
620 #************************Tests related to XPDRA nodes
621 for i in range(0,nbNode):
622 nodeType=res['network'][0]['node'][i]['org-openroadm-network-topology:node-type']
623 nodeId=res['network'][0]['node'][i]['node-id']
624 if(nodeId=='XPDR-A1-XPDR1'):
625 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDR-A1'},
626 res['network'][0]['node'][i]['supporting-node'])
627 self.assertEqual(nodeType,'XPONDER')
628 nbTps=len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
629 self.assertTrue(nbTps >= 4)
632 for j in range(0,nbTps):
633 tpType=res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['org-openroadm-network-topology:tp-type']
634 if (tpType=='XPONDER-CLIENT'):
636 elif (tpType=='XPONDER-NETWORK'):
638 self.assertTrue(client > 2)
639 self.assertTrue(network > 2)
640 listNode.remove(nodeId)
641 elif(nodeId=='ROADM-A1-SRG1'):
642 #Test related to SRG1
643 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),5)
644 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-CP'},
645 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
646 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-PP'},
647 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
648 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
649 res['network'][0]['node'][i]['supporting-node'])
650 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network-topology:node-type'],'SRG')
651 listNode.remove(nodeId)
652 elif(nodeId=='ROADM-A1-DEG1'):
653 #Test related to DEG1
654 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
655 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
656 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
657 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
658 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
659 res['network'][0]['node'][i]['supporting-node'])
660 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network-topology:node-type'],'DEGREE')
661 listNode.remove(nodeId)
662 elif(nodeId=='ROADM-A1-DEG2'):
663 #Test related to DEG2
664 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
665 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
666 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
667 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
668 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
669 res['network'][0]['node'][i]['supporting-node'])
670 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network-topology:node-type'],'DEGREE')
671 listNode.remove(nodeId)
672 elif(nodeId=='ROADM-C1-SRG1'):
673 #Test related to SRG1
674 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),5)
675 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-CP'},
676 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
677 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-PP'},
678 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
679 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-C1'},
680 res['network'][0]['node'][i]['supporting-node'])
681 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network-topology:node-type'],'SRG')
682 listNode.remove(nodeId)
683 elif(nodeId=='ROADM-C1-DEG1'):
684 #Test related to DEG1
685 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
686 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
687 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
688 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
689 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-C1'},
690 res['network'][0]['node'][i]['supporting-node'])
691 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network-topology:node-type'],'DEGREE')
692 listNode.remove(nodeId)
693 elif(nodeId=='ROADM-C1-DEG2'):
694 #Test related to DEG1
695 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
696 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
697 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
698 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
699 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-C1'},
700 res['network'][0]['node'][i]['supporting-node'])
701 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network-topology:node-type'],'DEGREE')
702 listNode.remove(nodeId)
704 self.assertFalse(True)
705 self.assertEqual(len(listNode),0)
707 def test_18_connect_ROADMB(self):
708 url = ("{}/config/network-topology:"
709 "network-topology/topology/topology-netconf/node/ROADM-B1"
710 .format(self.restconf_baseurl))
712 "node-id": "ROADM-B1",
713 "netconf-node-topology:username": "admin",
714 "netconf-node-topology:password": "admin",
715 "netconf-node-topology:host": "127.0.0.1",
716 "netconf-node-topology:port": "17842",
717 "netconf-node-topology:tcp-only": "false",
718 "netconf-node-topology:pass-through": {}}]}
719 headers = {'content-type': 'application/json'}
720 response = requests.request(
721 "PUT", url, data=json.dumps(data), headers=headers,
722 auth=('admin', 'admin'))
723 self.assertEqual(response.status_code, requests.codes.created)
726 def test_19_getClliNetwork(self):
727 url = ("{}/config/ietf-network:networks/network/clli-network"
728 .format(self.restconf_baseurl))
729 headers = {'content-type': 'application/json'}
730 response = requests.request(
731 "GET", url, headers=headers, auth=('admin', 'admin'))
732 self.assertEqual(response.status_code, requests.codes.ok)
733 res = response.json()
734 nbNode=len(res['network'][0]['node'])
735 listNode=['NodeA','NodeB','NodeC']
736 for i in range(0,nbNode):
737 nodeId = res['network'][0]['node'][i]['node-id']
738 find= nodeId in listNode
739 self.assertEqual(find, True)
741 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'],'NodeA')
742 elif(nodeId=='NodeB'):
743 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'],'NodeB')
745 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'],'NodeC')
746 listNode.remove(nodeId)
748 self.assertEqual(len(listNode),0)
750 def test_20_verifyDegree(self):
751 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
752 .format(self.restconf_baseurl))
753 headers = {'content-type': 'application/json'}
754 response = requests.request(
755 "GET", url, headers=headers, auth=('admin', 'admin'))
756 self.assertEqual(response.status_code, requests.codes.ok)
757 res = response.json()
758 #Tests related to links
759 nbLink=len(res['network'][0]['ietf-network-topology:link'])
760 listR2RLink=['ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX','ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
761 'ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX','ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX',
762 'ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX','ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX']
763 for i in range(0,nbLink):
764 if res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type'] == 'ROADM-TO-ROADM':
765 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
766 find= link_id in listR2RLink
767 self.assertEqual(find, True)
768 listR2RLink.remove(link_id)
769 self.assertEqual(len(listR2RLink),0)
771 def test_21_verifyOppositeLinkTopology(self):
772 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
773 .format(self.restconf_baseurl))
774 headers = {'content-type': 'application/json'}
775 response = requests.request(
776 "GET", url, headers=headers, auth=('admin', 'admin'))
777 self.assertEqual(response.status_code, requests.codes.ok)
778 res = response.json()
779 #Write the response in the log
780 with open('./transportpce_tests/log/response.log', 'a') as outfile1:
781 outfile1.write(str(res))
782 #Tests related to links
783 nbLink=len(res['network'][0]['ietf-network-topology:link'])
784 self.assertEqual(nbLink,22)
785 for i in range(0,nbLink):
786 link_id=res['network'][0]['ietf-network-topology:link'][i]['link-id']
787 link_type=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']
788 link_src=res['network'][0]['ietf-network-topology:link'][i]['source']['source-node']
789 link_dest=res['network'][0]['ietf-network-topology:link'][i]['destination']['dest-node']
790 oppLink_id=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:opposite-link']
791 #Find the opposite link
792 url_oppLink="{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"+oppLink_id
793 url = (url_oppLink.format(self.restconf_baseurl))
794 headers = {'content-type': 'application/json'}
795 response_oppLink = requests.request("GET", url, headers=headers, auth=('admin', 'admin'))
796 self.assertEqual(response_oppLink.status_code, requests.codes.ok)
797 res_oppLink = response_oppLink.json()
798 self.assertEqual(res_oppLink['ietf-network-topology:link'][0]['org-openroadm-common-network:opposite-link'],link_id)
799 self.assertEqual(res_oppLink['ietf-network-topology:link'][0]['source']['source-node'],link_dest)
800 self.assertEqual(res_oppLink['ietf-network-topology:link'][0]['destination']['dest-node'],link_src)
801 oppLink_type=res_oppLink['ietf-network-topology:link'][0]['org-openroadm-network-topology:link-type']
802 if link_type=='ADD-LINK':
803 self.assertEqual(oppLink_type, 'DROP-LINK')
804 elif link_type=='DROP-LINK':
805 self.assertEqual(oppLink_type, 'ADD-LINK')
806 elif link_type=='EXPRESS-LINK':
807 self.assertEqual(oppLink_type, 'EXPRESS-LINK')
808 elif link_type=='ROADM-TO-ROADM':
809 self.assertEqual(oppLink_type, 'ROADM-TO-ROADM')
810 elif link_type=='XPONDER-INPUT':
811 self.assertEqual(oppLink_type, 'XPONDER-OUTPUT')
812 elif link_type=='XPONDER-OUTPUT':
813 self.assertEqual(oppLink_type, 'XPONDER-INPUT')
816 def test_22_disconnect_ROADMB(self):
817 #Delete in the topology-netconf
818 url = ("{}/config/network-topology:"
819 "network-topology/topology/topology-netconf/node/ROADM-B1"
820 .format(self.restconf_baseurl))
822 headers = {'content-type': 'application/json'}
823 response = requests.request(
824 "DELETE", url, data=json.dumps(data), headers=headers,
825 auth=('admin', 'admin'))
826 self.assertEqual(response.status_code, requests.codes.ok)
827 #Delete in the clli-network
828 url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeB"
829 .format(self.restconf_baseurl))
831 headers = {'content-type': 'application/json'}
832 response = requests.request(
833 "DELETE", url, data=json.dumps(data), headers=headers,
834 auth=('admin', 'admin'))
835 self.assertEqual(response.status_code, requests.codes.ok)
837 def test_23_disconnect_ROADMC(self):
838 #Delete in the topology-netconf
839 url = ("{}/config/network-topology:"
840 "network-topology/topology/topology-netconf/node/ROADM-C1"
841 .format(self.restconf_baseurl))
843 headers = {'content-type': 'application/json'}
844 response = requests.request(
845 "DELETE", url, data=json.dumps(data), headers=headers,
846 auth=('admin', 'admin'))
847 self.assertEqual(response.status_code, requests.codes.ok)
848 #Delete in the clli-network
849 url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeC"
850 .format(self.restconf_baseurl))
852 headers = {'content-type': 'application/json'}
853 response = requests.request(
854 "DELETE", url, data=json.dumps(data), headers=headers,
855 auth=('admin', 'admin'))
856 self.assertEqual(response.status_code, requests.codes.ok)
858 # def test_24_check_roadm2roadm_links_deletion(self):
859 # url = ("{}/config/ietf-network:networks/network/openroadm-topology"
860 # .format(self.restconf_baseurl))
861 # headers = {'content-type': 'application/json'}
862 # response = requests.request(
863 # "GET", url, headers=headers, auth=('admin', 'admin'))
864 # self.assertEqual(response.status_code, requests.codes.ok)
865 # res = response.json()
866 # #Write the response in the log
867 # with open('./transportpce_tests/log/response.log', 'a') as outfile1:
868 # outfile1.write(str(res))
869 # #Tests related to links
870 # nbLink=len(res['network'][0]['ietf-network-topology:link'])
871 # self.assertEqual(nbLink,8)
872 # expressLink=['ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX','ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX']
873 # addLink=['ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX','ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX',]
874 # dropLink=['ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX','ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX']
875 # XPDR_IN=['ROADMA-SRG1-SRG1-PP1-TXRXtoXPDRA-XPDR1-XPDR1-NETWORK1']
876 # XPDR_OUT=['XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX']
877 # for i in range(0,nbLink):
878 # nodeType=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']
879 # linkId=res['network'][0]['ietf-network-topology:link'][i]['link-id']
880 # if(nodeType=='EXPRESS-LINK'):
881 # find= linkId in expressLink
882 # self.assertEqual(find, True)
883 # expressLink.remove(linkId)
884 # elif(nodeType=='ADD-LINK'):
885 # find= linkId in addLink
886 # self.assertEqual(find, True)
887 # addLink.remove(linkId)
888 # elif(nodeType=='DROP-LINK'):
889 # find= linkId in dropLink
890 # self.assertEqual(find, True)
891 # dropLink.remove(linkId)
892 # elif(nodeType=='XPONDER-INPUT'):
893 # find= linkId in XPDR_IN
894 # self.assertEqual(find, True)
895 # XPDR_IN.remove(linkId)
896 # elif(nodeType=='XPONDER-OUTPUT'):
897 # find= linkId in XPDR_OUT
898 # self.assertEqual(find, True)
899 # XPDR_OUT.remove(linkId)
901 # self.assertFalse(True)
902 # self.assertEqual(len(expressLink),0)
903 # self.assertEqual(len(addLink),0)
904 # self.assertEqual(len(dropLink),0)
905 # self.assertEqual(len(XPDR_IN),0)
906 # self.assertEqual(len(XPDR_OUT),0)
908 # for i in range(0,nbLink):
909 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type'],'ROADM-TO-ROADM')
910 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX')
911 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX')
912 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG2-DEG1-CTP-TXRX')
913 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG2-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX')
914 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-DEG2-DEG2-CTP-TXRX')
915 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG2-DEG2-CTP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX')
917 def test_25_getNodes_OpenRoadmTopology(self):
918 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
919 .format(self.restconf_baseurl))
920 headers = {'content-type': 'application/json'}
921 response = requests.request(
922 "GET", url, headers=headers, auth=('admin', 'admin'))
923 res = response.json()
924 #Tests related to nodes
925 self.assertEqual(response.status_code, requests.codes.ok)
926 with open('./transportpce_tests/log/response.log', 'a') as outfile1:
927 outfile1.write(str(len(res['network'][0]['node'])))
928 nbNode=len(res['network'][0]['node'])
929 self.assertEqual(nbNode,4)
930 listNode=['XPDR-A1-XPDR1','ROADM-A1-SRG1','ROADM-A1-DEG1','ROADM-A1-DEG2']
931 for i in range(0,nbNode):
932 nodeType=res['network'][0]['node'][i]['org-openroadm-network-topology:node-type']
933 nodeId=res['network'][0]['node'][i]['node-id']
934 #Tests related to XPDRA nodes
935 if(nodeId=='XPDR-A1-XPDR1'):
936 nbTp = len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
937 for j in range(0, nbTp):
938 tpid = res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
939 if (tpid == 'XPDR1-CLIENT1'):
940 self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]
941 ['org-openroadm-network-topology:tp-type'], 'XPONDER-CLIENT')
942 self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]
943 ['org-openroadm-network-topology:xpdr-client-attributes']['tail-equipment-id'],
945 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDR-A1'},
946 res['network'][0]['node'][i]['supporting-node'])
947 listNode.remove(nodeId)
948 elif(nodeId=='ROADM-A1-SRG1'):
949 #Test related to SRG1
950 self.assertEqual(nodeType,'SRG')
951 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),5)
952 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-CP'},
953 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
954 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-PP'},
955 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
956 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
957 res['network'][0]['node'][i]['supporting-node'])
958 listNode.remove(nodeId)
959 elif(nodeId=='ROADM-A1-DEG1'):
960 #Test related to DEG1
961 self.assertEqual(nodeType,'DEGREE')
962 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
963 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
964 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
965 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
966 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
967 res['network'][0]['node'][i]['supporting-node'])
968 listNode.remove(nodeId)
969 elif(nodeId=='ROADM-A1-DEG2'):
970 #Test related to DEG2
971 self.assertEqual(nodeType,'DEGREE')
972 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
973 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
974 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
975 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
976 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
977 res['network'][0]['node'][i]['supporting-node'])
978 listNode.remove(nodeId)
980 self.assertFalse(True)
981 self.assertEqual(len(listNode),0)
982 #Test related to SRG1 of ROADMC
983 for i in range(0,nbNode):
984 self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'ROADM-C1-SRG1')
985 self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'ROADM-C1-DEG1')
986 self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'ROADM-C1-DEG2')
988 def test_26_getOpenRoadmNetwork(self):
989 url = ("{}/config/ietf-network:networks/network/openroadm-network"
990 .format(self.restconf_baseurl))
991 headers = {'content-type': 'application/json'}
992 response = requests.request(
993 "GET", url, headers=headers, auth=('admin', 'admin'))
994 self.assertEqual(response.status_code, requests.codes.ok)
995 res = response.json()
996 nbNode=len(res['network'][0]['node'])
997 self.assertEqual(nbNode,2)
998 for i in range(0,nbNode-1):
999 self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'ROADM-C1')
1001 def test_27_getClliNetwork(self):
1002 url = ("{}/config/ietf-network:networks/network/clli-network"
1003 .format(self.restconf_baseurl))
1004 headers = {'content-type': 'application/json'}
1005 response = requests.request(
1006 "GET", url, headers=headers, auth=('admin', 'admin'))
1007 self.assertEqual(response.status_code, requests.codes.ok)
1008 res = response.json()
1009 nbNode=len(res['network'][0]['node'])
1010 self.assertEqual(nbNode,1)
1011 for i in range(0,nbNode-1):
1012 self.assertNotEqual(res['network'][0]['node'][1]['org-openroadm-clli-network:clli'],'NodeC')
1014 def test_28_disconnect_XPDRA(self):
1015 url = ("{}/config/network-topology:"
1016 "network-topology/topology/topology-netconf/node/XPDR-A1"
1017 .format(self.restconf_baseurl))
1019 headers = {'content-type': 'application/json'}
1020 response = requests.request(
1021 "DELETE", url, data=json.dumps(data), headers=headers,
1022 auth=('admin', 'admin'))
1023 self.assertEqual(response.status_code, requests.codes.ok)
1025 def test_29_getClliNetwork(self):
1026 url = ("{}/config/ietf-network:networks/network/clli-network"
1027 .format(self.restconf_baseurl))
1028 headers = {'content-type': 'application/json'}
1029 response = requests.request(
1030 "GET", url, headers=headers, auth=('admin', 'admin'))
1031 self.assertEqual(response.status_code, requests.codes.ok)
1032 res = response.json()
1033 nbNode=len(res['network'][0]['node'])
1034 self.assertEqual(nbNode,1)
1035 self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'],'NodeA')
1037 def test_30_getOpenRoadmNetwork(self):
1038 url = ("{}/config/ietf-network:networks/network/openroadm-network"
1039 .format(self.restconf_baseurl))
1040 headers = {'content-type': 'application/json'}
1041 response = requests.request(
1042 "GET", url, headers=headers, auth=('admin', 'admin'))
1043 self.assertEqual(response.status_code, requests.codes.ok)
1044 res = response.json()
1045 nbNode=len(res['network'][0]['node'])
1046 self.assertEqual(nbNode,1)
1047 for i in range(0,nbNode):
1048 self.assertNotEqual(res['network'][0]['node'][i]['node-id'],'XPDR-A1')
1050 def test_31_getNodes_OpenRoadmTopology(self):
1051 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1052 .format(self.restconf_baseurl))
1053 headers = {'content-type': 'application/json'}
1054 response = requests.request(
1055 "GET", url, headers=headers, auth=('admin', 'admin'))
1056 res = response.json()
1057 #Tests related to nodes
1058 self.assertEqual(response.status_code, requests.codes.ok)
1059 nbNode=len(res['network'][0]['node'])
1060 self.assertEqual(nbNode,3)
1061 listNode=['ROADM-A1-SRG1','ROADM-A1-DEG1','ROADM-A1-DEG2']
1062 for i in range(0,nbNode):
1063 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
1064 res['network'][0]['node'][i]['supporting-node'])
1065 nodeType=res['network'][0]['node'][i]['org-openroadm-network-topology:node-type']
1066 nodeId=res['network'][0]['node'][i]['node-id']
1067 if(nodeId=='ROADM-A1-SRG1'):
1068 #Test related to SRG1
1069 self.assertEqual(nodeType,'SRG')
1070 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),5)
1071 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-CP'},
1072 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1073 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-network-topology:tp-type': 'SRG-TXRX-PP'},
1074 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1075 listNode.remove(nodeId)
1076 elif(nodeId=='ROADM-A1-DEG1'):
1077 #Test related to DEG1
1078 self.assertEqual(nodeType,'DEGREE')
1079 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
1080 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1081 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
1082 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1083 listNode.remove(nodeId)
1084 elif(nodeId=='ROADM-A1-DEG2'):
1085 #Test related to DEG2
1086 self.assertEqual(nodeType,'DEGREE')
1087 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-TTP'},
1088 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1089 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-network-topology:tp-type': 'DEGREE-TXRX-CTP'},
1090 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1091 listNode.remove(nodeId)
1093 self.assertFalse(True)
1094 self.assertEqual(len(listNode),0)
1096 def test_32_disconnect_ROADM_XPDRA_link(self):
1098 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
1099 "link/XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX"
1100 .format(self.restconf_baseurl))
1102 headers = {'content-type': 'application/json'}
1103 response = requests.request(
1104 "DELETE", url, data=json.dumps(data), headers=headers,
1105 auth=('admin', 'admin'))
1106 self.assertEqual(response.status_code, requests.codes.ok)
1108 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
1109 "link/ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1"
1110 .format(self.restconf_baseurl))
1112 headers = {'content-type': 'application/json'}
1113 response = requests.request(
1114 "DELETE", url, data=json.dumps(data), headers=headers,
1115 auth=('admin', 'admin'))
1116 self.assertEqual(response.status_code, requests.codes.ok)
1118 def test_33_getLinks_OpenRoadmTopology(self):
1119 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1120 .format(self.restconf_baseurl))
1121 headers = {'content-type': 'application/json'}
1122 response = requests.request(
1123 "GET", url, headers=headers, auth=('admin', 'admin'))
1124 self.assertEqual(response.status_code, requests.codes.ok)
1125 res = response.json()
1126 nbLink=len(res['network'][0]['ietf-network-topology:link'])
1127 self.assertEqual(nbLink, 12)
1128 expressLink=['ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX','ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX']
1129 addLink=['ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX','ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX']
1130 dropLink=['ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX','ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX']
1131 roadmtoroadmLink = 0
1132 for i in range(0,nbLink):
1133 if (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='EXPRESS-LINK'):
1134 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1135 find= link_id in expressLink
1136 self.assertEqual(find, True)
1137 expressLink.remove(link_id)
1138 elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='ADD-LINK'):
1139 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1140 find= link_id in addLink
1141 self.assertEqual(find, True)
1142 addLink.remove(link_id)
1143 elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type']=='DROP-LINK'):
1144 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1145 find= link_id in dropLink
1146 self.assertEqual(find, True)
1147 dropLink.remove(link_id)
1149 roadmtoroadmLink += 1
1150 self.assertEqual(len(expressLink),0)
1151 self.assertEqual(len(addLink),0)
1152 self.assertEqual(len(dropLink),0)
1153 self.assertEqual(roadmtoroadmLink, 6)
1154 for i in range(0,nbLink):
1155 self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type'],'XPONDER-OUTPUT')
1156 self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-network-topology:link-type'],'XPONDER-INPUT')
1158 def test_34_disconnect_ROADMA(self):
1159 url = ("{}/config/network-topology:"
1160 "network-topology/topology/topology-netconf/node/ROADM-A1"
1161 .format(self.restconf_baseurl))
1163 headers = {'content-type': 'application/json'}
1164 response = requests.request(
1165 "DELETE", url, data=json.dumps(data), headers=headers,
1166 auth=('admin', 'admin'))
1167 self.assertEqual(response.status_code, requests.codes.ok)
1168 #Delete in the clli-network
1169 url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeA"
1170 .format(self.restconf_baseurl))
1172 headers = {'content-type': 'application/json'}
1173 response = requests.request(
1174 "DELETE", url, data=json.dumps(data), headers=headers,
1175 auth=('admin', 'admin'))
1176 self.assertEqual(response.status_code, requests.codes.ok)
1178 def test_35_getClliNetwork(self):
1179 url = ("{}/config/ietf-network:networks/network/clli-network"
1180 .format(self.restconf_baseurl))
1181 headers = {'content-type': 'application/json'}
1182 response = requests.request(
1183 "GET", url, headers=headers, auth=('admin', 'admin'))
1184 self.assertEqual(response.status_code, requests.codes.ok)
1185 res = response.json()
1186 self.assertNotIn('node', res['network'][0])
1188 def test_36_getOpenRoadmNetwork(self):
1189 url = ("{}/config/ietf-network:networks/network/openroadm-network"
1190 .format(self.restconf_baseurl))
1191 headers = {'content-type': 'application/json'}
1192 response = requests.request(
1193 "GET", url, headers=headers, auth=('admin', 'admin'))
1194 self.assertEqual(response.status_code, requests.codes.ok)
1195 res = response.json()
1196 self.assertNotIn('node', res['network'][0])
1198 def test_37_check_roadm2roadm_link_persistence(self):
1199 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1200 .format(self.restconf_baseurl))
1201 headers = {'content-type': 'application/json'}
1202 response = requests.request(
1203 "GET", url, headers=headers, auth=('admin', 'admin'))
1204 self.assertEqual(response.status_code, requests.codes.ok)
1205 res = response.json()
1206 nbLink=len(res['network'][0]['ietf-network-topology:link'])
1207 self.assertNotIn('node', res['network'][0])
1208 self.assertEqual(nbLink, 6)
1210 if __name__ == "__main__":
1211 #logging.basicConfig(filename='./transportpce_tests/log/response.log',filemode='w',level=logging.DEBUG)
1212 #logging.debug('I am there')
1213 unittest.main(verbosity=2)