3 ##############################################################################
4 #Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
class TransportPCEtesting(unittest.TestCase):
    """Functional checks of the OpenROADM topology exposed through RESTCONF.

    The class-level attributes hold the handles of the external processes
    the tests rely on, plus the base URL every request is built from.
    """

    # Handles of the simulated devices; filled in by the class set-up hook.
    honeynode_process1 = None
    honeynode_process2 = None
    honeynode_process3 = None
    honeynode_process4 = None
    # Handle of the controller process, declared here like the others so the
    # attribute always exists on the class.
    odl_process = None

    # Root of every RESTCONF URL used by the test methods.
    restconf_baseurl = "http://localhost:8181/restconf"
33 #START_IGNORE_XTESTING
@classmethod
def setUpClass(cls):
    """Start the four honeynode simulators, then the OpenDaylight controller.

    Each process handle is stored on the class so that the class tear-down
    can stop it again.
    """
    # NOTE(review): the decorator and ``def`` line were missing from this
    # copy of the file and are reconstructed from the ``cls.`` usage below
    # and from the matching tearDownClass -- confirm against upstream.
    # NOTE(review): no wait for the processes to become ready is visible
    # here; confirm whether the test_utils starters block until startup
    # has completed.
    print("starting honeynode1...")
    cls.honeynode_process1 = test_utils.start_xpdra_honeynode()

    print("starting honeynode2...")
    cls.honeynode_process2 = test_utils.start_roadma_honeynode()

    print("starting honeynode3...")
    cls.honeynode_process3 = test_utils.start_roadmb_honeynode()

    print("starting honeynode4...")
    cls.honeynode_process4 = test_utils.start_roadmc_honeynode()

    print("all honeynodes started")

    print("starting opendaylight...")
    cls.odl_process = test_utils.start_tpce()

    print("opendaylight started")
@classmethod
def tearDownClass(cls):
    """Stop the controller and the four honeynodes started for the class.

    For every process, SIGINT is first sent to its direct children, then to
    the process itself, and the process is waited for before moving on.
    The controller is stopped first, then honeynode 1 to 4.
    """
    processes = (
        cls.odl_process,
        cls.honeynode_process1,
        cls.honeynode_process2,
        cls.honeynode_process3,
        cls.honeynode_process4,
    )
    for process in processes:
        # Children are signalled before their parent, as in every stanza
        # of the original hand-unrolled version.
        for child in psutil.Process(process.pid).children():
            child.send_signal(signal.SIGINT)
        process.send_signal(signal.SIGINT)
        process.wait()
def test_01_connect_ROADM_A1(self):
    """Mount ROADM-A1 on the controller via the topology-netconf node.

    PUTs the NETCONF connection parameters (127.0.0.1, port 17841) and
    expects the ``created`` status code.
    """
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADM-A1"
           .format(self.restconf_baseurl))
    # NOTE(review): the opening line of this payload was missing from this
    # copy of the file; it is reconstructed from the closing ``}]}``.
    data = {"node": [{
        "node-id": "ROADM-A1",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": "17841",
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_02_getClliNetwork(self):
    """Check that the first clli-network node is NodeA with CLLI NodeA."""
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    # Only the first node of the first network is inspected.
    node = response.json()['network'][0]['node'][0]
    self.assertEqual(node['node-id'], 'NodeA')
    self.assertEqual(node['org-openroadm-clli-network:clli'], 'NodeA')
def test_03_getOpenRoadmNetwork(self):
    """Check the first openroadm-network node describes ROADM-A1.

    Verifies its id, its first supporting node (clli-network / NodeA), its
    node type (ROADM) and its model (model2).
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    node = response.json()['network'][0]['node'][0]
    support = node['supporting-node'][0]
    self.assertEqual(node['node-id'], 'ROADM-A1')
    self.assertEqual(support['network-ref'], 'clli-network')
    self.assertEqual(support['node-ref'], 'NodeA')
    self.assertEqual(node['org-openroadm-common-network:node-type'], 'ROADM')
    self.assertEqual(node['org-openroadm-network:model'], 'model2')
def test_04_getLinks_OpenroadmTopology(self):
    """Check the ten internal links of ROADM-A1 in openroadm-topology.

    Every returned link must have a known link type and an id listed for
    that type; every listed id must be returned exactly once.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 10)
    # Expected link ids, keyed by link type; ids are removed as they are seen.
    expected = {
        'EXPRESS-LINK': [
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX'],
        'ADD-LINK': [
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX'],
        'DROP-LINK': [
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX'],
    }
    for link in links:
        link_id = link['link-id']
        link_type = link['org-openroadm-common-network:link-type']
        # An unknown link type is a failure (the original's final branch).
        self.assertIn(link_type, expected,
                      "unexpected link type {!r}".format(link_type))
        self.assertIn(link_id, expected[link_type])
        expected[link_type].remove(link_id)
    for link_type, leftover in expected.items():
        self.assertEqual(leftover, [],
                         "missing {} links".format(link_type))
def test_05_getNodes_OpenRoadmTopology(self):
    """Check the four ROADM-A1 nodes (SRG1, SRG3, DEG1, DEG2) of the topology.

    Each node must be supported by openroadm-network/ROADM-A1 and
    clli-network/NodeA, have the expected node type and expose the expected
    termination points; the SRG nodes must have exactly five of them.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before parsing so an error page fails clearly.
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 4)

    tp_type_key = 'org-openroadm-common-network:tp-type'
    # node-id -> (node type, exact TP count or None, required (tp-id, tp-type))
    expected_nodes = {
        'ROADM-A1-SRG1': ('SRG', 5, [('SRG1-CP-TXRX', 'SRG-TXRX-CP'),
                                     ('SRG1-PP1-TXRX', 'SRG-TXRX-PP')]),
        'ROADM-A1-SRG3': ('SRG', 5, [('SRG3-CP-TXRX', 'SRG-TXRX-CP'),
                                     ('SRG3-PP1-TXRX', 'SRG-TXRX-PP')]),
        'ROADM-A1-DEG1': ('DEGREE', None, [('DEG1-TTP-TXRX', 'DEGREE-TXRX-TTP'),
                                           ('DEG1-CTP-TXRX', 'DEGREE-TXRX-CTP')]),
        'ROADM-A1-DEG2': ('DEGREE', None, [('DEG2-TTP-TXRX', 'DEGREE-TXRX-TTP'),
                                           ('DEG2-CTP-TXRX', 'DEGREE-TXRX-CTP')]),
    }
    pending = list(expected_nodes)
    for node in nodes:
        supporting = node['supporting-node']
        self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
                      supporting)
        node_type = node['org-openroadm-common-network:node-type']
        node_id = node['node-id']
        # An unknown node id is a failure (the original's final branch).
        self.assertIn(node_id, expected_nodes,
                      "unexpected node {!r}".format(node_id))
        wanted_type, wanted_tp_count, wanted_tps = expected_nodes[node_id]
        tps = node['ietf-network-topology:termination-point']
        self.assertEqual(node_type, wanted_type)
        if wanted_tp_count is not None:
            self.assertEqual(len(tps), wanted_tp_count)
        for tp_id, tp_type in wanted_tps:
            self.assertIn({'tp-id': tp_id, tp_type_key: tp_type}, tps)
        self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
                      supporting)
        pending.remove(node_id)
    self.assertEqual(pending, [])
def test_06_connect_XPDRA(self):
    """Mount XPDR-A1 on the controller via the topology-netconf node.

    PUTs the NETCONF connection parameters (127.0.0.1, port 17840) and
    expects the ``created`` status code.
    """
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/XPDR-A1"
           .format(self.restconf_baseurl))
    # NOTE(review): the opening line of this payload was missing from this
    # copy of the file; it is reconstructed from the closing ``}]}``.
    data = {"node": [{
        "node-id": "XPDR-A1",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": "17840",
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_07_getClliNetwork(self):
    """Check that the first clli-network node is still NodeA / CLLI NodeA."""
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    # Only the first node of the first network is inspected.
    node = response.json()['network'][0]['node'][0]
    self.assertEqual(node['node-id'], 'NodeA')
    self.assertEqual(node['org-openroadm-clli-network:clli'], 'NodeA')
def test_08_getOpenRoadmNetwork(self):
    """Check openroadm-network holds exactly XPDR-A1 and ROADM-A1.

    Both nodes must be supported first by clli-network/NodeA and use model
    ``model2``; XPDR-A1 must be an XPONDER and ROADM-A1 a ROADM. Any other
    node id fails the test.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 2)
    # node-id -> expected node type
    expected_types = {'XPDR-A1': 'XPONDER', 'ROADM-A1': 'ROADM'}
    for node in nodes:
        support = node['supporting-node'][0]
        self.assertEqual(support['network-ref'], 'clli-network')
        self.assertEqual(support['node-ref'], 'NodeA')
        node_id = node['node-id']
        # An unknown node id is a failure (the original's final branch).
        self.assertIn(node_id, expected_types,
                      "unexpected node {!r}".format(node_id))
        self.assertEqual(node['org-openroadm-common-network:node-type'],
                         expected_types[node_id])
        self.assertEqual(node['org-openroadm-network:model'], 'model2')
def test_09_getNodes_OpenRoadmTopology(self):
    """Check the five openroadm-topology nodes once XPDR-A1 is connected.

    XPDR-A1-XPDR1 must be an XPONDER supported by openroadm-network/XPDR-A1
    and clli-network/NodeA, with two client and two network termination
    points and a symmetric connection map between XPDR1-NETWORK2 and
    XPDR1-CLIENT2. The four ROADM-A1 nodes must keep their type, their
    termination points and their openroadm-network/ROADM-A1 support.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before parsing so an error page fails clearly.
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 5)

    tp_list_key = 'ietf-network-topology:termination-point'
    tp_type_key = 'org-openroadm-common-network:tp-type'
    port_map_key = 'transportpce-topology:associated-connection-map-port'
    # node-id -> (node type, exact TP count or None, required (tp-id, tp-type))
    expected_roadm_nodes = {
        'ROADM-A1-SRG1': ('SRG', 5, [('SRG1-CP-TXRX', 'SRG-TXRX-CP'),
                                     ('SRG1-PP1-TXRX', 'SRG-TXRX-PP')]),
        'ROADM-A1-SRG3': ('SRG', 5, [('SRG3-CP-TXRX', 'SRG-TXRX-CP'),
                                     ('SRG3-PP1-TXRX', 'SRG-TXRX-PP')]),
        'ROADM-A1-DEG1': ('DEGREE', None, [('DEG1-TTP-TXRX', 'DEGREE-TXRX-TTP'),
                                           ('DEG1-CTP-TXRX', 'DEGREE-TXRX-CTP')]),
        'ROADM-A1-DEG2': ('DEGREE', None, [('DEG2-TTP-TXRX', 'DEGREE-TXRX-TTP'),
                                           ('DEG2-CTP-TXRX', 'DEGREE-TXRX-CTP')]),
    }
    # tp-id -> port its connection map must point to
    expected_port_map = {'XPDR1-NETWORK2': 'XPDR1-CLIENT2',
                         'XPDR1-CLIENT2': 'XPDR1-NETWORK2'}
    pending = ['XPDR-A1-XPDR1'] + list(expected_roadm_nodes)
    for node in nodes:
        node_type = node['org-openroadm-common-network:node-type']
        node_id = node['node-id']
        if node_id == 'XPDR-A1-XPDR1':
            supporting = node['supporting-node']
            self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDR-A1'},
                          supporting)
            self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
                          supporting)
            self.assertEqual(node_type, 'XPONDER')
            # NOTE(review): the counter initialisation and increments were
            # missing from this copy of the file; counting is reconstructed
            # from the two final ``== 2`` checks.
            seen_tp_types = []
            for tp in node[tp_list_key]:
                seen_tp_types.append(tp[tp_type_key])
                wanted_peer = expected_port_map.get(tp['tp-id'])
                if wanted_peer is not None:
                    self.assertEqual(tp[port_map_key], wanted_peer)
            self.assertEqual(seen_tp_types.count('XPONDER-CLIENT'), 2)
            self.assertEqual(seen_tp_types.count('XPONDER-NETWORK'), 2)
        elif node_id in expected_roadm_nodes:
            wanted_type, wanted_tp_count, wanted_tps = expected_roadm_nodes[node_id]
            tps = node[tp_list_key]
            self.assertEqual(node_type, wanted_type)
            if wanted_tp_count is not None:
                self.assertEqual(len(tps), wanted_tp_count)
            for tp_id, tp_type in wanted_tps:
                self.assertIn({'tp-id': tp_id, tp_type_key: tp_type}, tps)
            self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
                          node['supporting-node'])
        else:
            self.fail("unexpected node {!r}".format(node_id))
        pending.remove(node_id)
    self.assertEqual(pending, [])
392 #Connect the tail XPDRA to ROADMA and vice versa
def test_10_connect_tail_xpdr_rdm(self):
    """Create the XPDR-A1 to ROADM-A1 tail link with init-xpdr-rdm-links.

    POSTs the link description (XPDR 1 / network 1 towards SRG 1, port
    SRG1-PP1-TXRX) and expects the ``ok`` status code.
    """
    url = ("{}/operations/transportpce-networkutils:init-xpdr-rdm-links"
           .format(self.restconf_baseurl))
    # NOTE(review): the closing braces of this payload were missing from
    # this copy of the file; they are reconstructed from the three openers.
    data = {"networkutils:input": {
        "networkutils:links-input": {
            "networkutils:xpdr-node": "XPDR-A1",
            "networkutils:xpdr-num": "1",
            "networkutils:network-num": "1",
            "networkutils:rdm-node": "ROADM-A1",
            "networkutils:srg-num": "1",
            "networkutils:termination-point-num": "SRG1-PP1-TXRX"
        }
    }}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_11_connect_tail_rdm_xpdr(self):
    """Create the ROADM-A1 to XPDR-A1 tail link with init-rdm-xpdr-links.

    POSTs the link description (XPDR 1 / network 1 and SRG 1, port
    SRG1-PP1-TXRX) and expects the ``ok`` status code.
    """
    url = ("{}/operations/transportpce-networkutils:init-rdm-xpdr-links"
           .format(self.restconf_baseurl))
    # NOTE(review): the closing braces of this payload were missing from
    # this copy of the file; they are reconstructed from the three openers.
    data = {"networkutils:input": {
        "networkutils:links-input": {
            "networkutils:xpdr-node": "XPDR-A1",
            "networkutils:xpdr-num": "1",
            "networkutils:network-num": "1",
            "networkutils:rdm-node": "ROADM-A1",
            "networkutils:srg-num": "1",
            "networkutils:termination-point-num": "SRG1-PP1-TXRX"
        }
    }}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_12_getLinks_OpenRoadmTopology(self):
    """Check the twelve topology links once the XPDR tail links exist.

    On top of the ten internal ROADM-A1 links, one XPONDER-INPUT and one
    XPONDER-OUTPUT link are expected. Every returned link must have a known
    type and a listed id; every listed id must be returned exactly once.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 12)
    # Expected link ids, keyed by link type; ids are removed as they are seen.
    expected = {
        'EXPRESS-LINK': [
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX'],
        'ADD-LINK': [
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX'],
        'DROP-LINK': [
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX'],
        'XPONDER-INPUT': [
            'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1'],
        'XPONDER-OUTPUT': [
            'XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX'],
    }
    for link in links:
        link_type = link['org-openroadm-common-network:link-type']
        link_id = link['link-id']
        # An unknown link type is a failure (the original's final branch).
        self.assertIn(link_type, expected,
                      "unexpected link type {!r}".format(link_type))
        self.assertIn(link_id, expected[link_type])
        expected[link_type].remove(link_id)
    for link_type, leftover in expected.items():
        self.assertEqual(leftover, [],
                         "missing {} links".format(link_type))
def test_13_connect_ROADMC(self):
    """Mount ROADM-C1 on the controller via the topology-netconf node.

    PUTs the NETCONF connection parameters (127.0.0.1, port 17843) and
    expects the ``created`` status code.
    """
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADM-C1"
           .format(self.restconf_baseurl))
    # NOTE(review): the opening line of this payload was missing from this
    # copy of the file; it is reconstructed from the closing ``}]}``.
    data = {"node": [{
        "node-id": "ROADM-C1",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": "17843",
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_14_getClliNetwork(self):
    """Check clli-network holds exactly NodeA and NodeC, each with its CLLI."""
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    # Node ids still to be seen; each is removed once checked.
    pending = ['NodeA', 'NodeC']
    for node in nodes:
        node_id = node['node-id']
        self.assertIn(node_id, pending)
        # NOTE(review): the branch headers were missing from this copy of
        # the file; the NodeA / otherwise split is reconstructed from the
        # two CLLI assertions that remained.
        if node_id == 'NodeA':
            wanted_clli = 'NodeA'
        else:
            wanted_clli = 'NodeC'
        self.assertEqual(node['org-openroadm-clli-network:clli'], wanted_clli)
        pending.remove(node_id)
    self.assertEqual(pending, [])
def test_15_getOpenRoadmNetwork(self):
    """Check openroadm-network holds exactly XPDR-A1, ROADM-A1 and ROADM-C1.

    Every node's first supporting node must reference clli-network and the
    expected CLLI node (NodeA for the A1 devices, NodeC for ROADM-C1); node
    type and model (``model2``) are verified as well.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    nodes = response.json()['network'][0]['node']
    self.assertEqual(len(nodes), 3)
    # node-id -> (supporting CLLI node, node type)
    expected = {
        'XPDR-A1': ('NodeA', 'XPONDER'),
        'ROADM-A1': ('NodeA', 'ROADM'),
        'ROADM-C1': ('NodeC', 'ROADM'),
    }
    pending = list(expected)
    for node in nodes:
        support = node['supporting-node'][0]
        self.assertEqual(support['network-ref'], 'clli-network')
        node_id = node['node-id']
        # An unknown node id is a failure (the original's final branch).
        self.assertIn(node_id, expected,
                      "unexpected node {!r}".format(node_id))
        wanted_clli, wanted_type = expected[node_id]
        self.assertEqual(support['node-ref'], wanted_clli)
        self.assertEqual(node['org-openroadm-common-network:node-type'],
                         wanted_type)
        self.assertEqual(node['org-openroadm-network:model'], 'model2')
        pending.remove(node_id)
    self.assertEqual(pending, [])
def test_16_getROADMLinkOpenRoadmTopology(self):
    """Check the twenty topology links once ROADM-C1 is connected.

    Expected are the internal links of ROADM-A1 and ROADM-C1, the two
    ROADM-TO-ROADM links between them and the two XPDR-A1 tail links. Every
    returned link must have a known type and a listed id; every listed id
    must be returned exactly once.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 20)
    # Expected link ids, keyed by link type; ids are removed as they are seen.
    expected = {
        'EXPRESS-LINK': [
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-C1-DEG2-DEG2-CTP-TXRXtoROADM-C1-DEG1-DEG1-CTP-TXRX',
            'ROADM-C1-DEG1-DEG1-CTP-TXRXtoROADM-C1-DEG2-DEG2-CTP-TXRX'],
        'ADD-LINK': [
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
            'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
            'ROADM-C1-SRG1-SRG1-CP-TXRXtoROADM-C1-DEG2-DEG2-CTP-TXRX',
            'ROADM-C1-SRG1-SRG1-CP-TXRXtoROADM-C1-DEG1-DEG1-CTP-TXRX'],
        'DROP-LINK': [
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
            'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
            'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
            'ROADM-C1-DEG1-DEG1-CTP-TXRXtoROADM-C1-SRG1-SRG1-CP-TXRX',
            'ROADM-C1-DEG2-DEG2-CTP-TXRXtoROADM-C1-SRG1-SRG1-CP-TXRX'],
        'ROADM-TO-ROADM': [
            'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
            'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX'],
        'XPONDER-INPUT': [
            'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1'],
        'XPONDER-OUTPUT': [
            'XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX'],
    }
    for link in links:
        link_type = link['org-openroadm-common-network:link-type']
        link_id = link['link-id']
        # An unknown link type is a failure (the original's final branch).
        self.assertIn(link_type, expected,
                      "unexpected link type {!r}".format(link_type))
        self.assertIn(link_id, expected[link_type])
        expected[link_type].remove(link_id)
    for link_type, leftover in expected.items():
        self.assertEqual(leftover, [],
                         "missing {} links".format(link_type))
616 def test_17_getNodes_OpenRoadmTopology(self):
617 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
618 .format(self.restconf_baseurl))
619 headers = {'content-type': 'application/json'}
620 response = requests.request(
621 "GET", url, headers=headers, auth=('admin', 'admin'))
622 res = response.json()
623 #Tests related to nodes
624 self.assertEqual(response.status_code, requests.codes.ok)
625 nbNode=len(res['network'][0]['node'])
626 self.assertEqual(nbNode,8)
627 listNode=['XPDR-A1-XPDR1',
628 'ROADM-A1-SRG1','ROADM-A1-SRG3','ROADM-A1-DEG1','ROADM-A1-DEG2',
629 'ROADM-C1-SRG1','ROADM-C1-DEG1', 'ROADM-C1-DEG2']
630 #************************Tests related to XPDRA nodes
631 for i in range(0,nbNode):
632 nodeType=res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
633 nodeId=res['network'][0]['node'][i]['node-id']
634 if(nodeId=='XPDR-A1-XPDR1'):
635 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDR-A1'},
636 res['network'][0]['node'][i]['supporting-node'])
637 self.assertEqual(nodeType,'XPONDER')
638 nbTps=len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
639 self.assertTrue(nbTps >= 4)
642 for j in range(0,nbTps):
643 tpType=res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['org-openroadm-common-network:tp-type']
644 if (tpType=='XPONDER-CLIENT'):
646 elif (tpType=='XPONDER-NETWORK'):
648 self.assertTrue(client == 2)
649 self.assertTrue(network == 2)
650 listNode.remove(nodeId)
651 elif(nodeId=='ROADM-A1-SRG1'):
652 #Test related to SRG1
653 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),5)
654 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
655 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
656 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
657 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
658 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
659 res['network'][0]['node'][i]['supporting-node'])
660 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'],'SRG')
661 listNode.remove(nodeId)
662 elif(nodeId=='ROADM-A1-SRG3'):
663 #Test related to SRG1
664 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),5)
665 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
666 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
667 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
668 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
669 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
670 res['network'][0]['node'][i]['supporting-node'])
671 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'],'SRG')
672 listNode.remove(nodeId)
673 elif(nodeId=='ROADM-A1-DEG1'):
674 #Test related to DEG1
675 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
676 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
677 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
678 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
679 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
680 res['network'][0]['node'][i]['supporting-node'])
681 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'],'DEGREE')
682 listNode.remove(nodeId)
683 elif(nodeId=='ROADM-A1-DEG2'):
684 #Test related to DEG2
685 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
686 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
687 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
688 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
689 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
690 res['network'][0]['node'][i]['supporting-node'])
691 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'],'DEGREE')
692 listNode.remove(nodeId)
693 elif(nodeId=='ROADM-C1-SRG1'):
694 #Test related to SRG1
695 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']),5)
696 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
697 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
698 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
699 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
700 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-C1'},
701 res['network'][0]['node'][i]['supporting-node'])
702 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'],'SRG')
703 listNode.remove(nodeId)
704 elif(nodeId=='ROADM-C1-DEG1'):
705 #Test related to DEG1
706 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
707 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
708 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
709 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
710 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-C1'},
711 res['network'][0]['node'][i]['supporting-node'])
712 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'],'DEGREE')
713 listNode.remove(nodeId)
714 elif(nodeId=='ROADM-C1-DEG2'):
715 #Test related to DEG1
716 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
717 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
718 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
719 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
720 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-C1'},
721 res['network'][0]['node'][i]['supporting-node'])
722 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'],'DEGREE')
723 listNode.remove(nodeId)
725 self.assertFalse(True)
726 self.assertEqual(len(listNode),0)
def test_18_connect_ROADMB(self):
    # Create the ROADM-B1 node entry in the topology-netconf topology with a
    # RESTCONF PUT and expect a "created" status code in return.
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADM-B1"
           .format(self.restconf_baseurl))
    # Connection settings sent as the body of the PUT request.
    data = {"node": [{
        "node-id": "ROADM-B1",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": "17842",
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_19_getClliNetwork(self):
    # Read the clli-network and check that every node it lists is one of
    # NodeA / NodeB / NodeC, that its CLLI matches, and that all three
    # expected nodes are present.
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Names are removed as they are seen, so duplicates fail the membership
    # check and missing nodes leave the list non-empty.
    listNode = ['NodeA', 'NodeB', 'NodeC']
    for node in res['network'][0]['node']:
        nodeId = node['node-id']
        self.assertIn(nodeId, listNode)
        if nodeId == 'NodeA':
            self.assertEqual(node['org-openroadm-clli-network:clli'], 'NodeA')
        elif nodeId == 'NodeB':
            self.assertEqual(node['org-openroadm-clli-network:clli'], 'NodeB')
        else:
            self.assertEqual(node['org-openroadm-clli-network:clli'], 'NodeC')
        listNode.remove(nodeId)
    self.assertEqual(len(listNode), 0)
def test_20_verifyDegree(self):
    # Fetch openroadm-topology and make sure its ROADM-TO-ROADM links are
    # exactly the six expected inter-ROADM link ids (each seen once).
    topology_url = ("{}/config/ietf-network:networks/network/openroadm-topology"
                    .format(self.restconf_baseurl))
    reply = requests.request(
        "GET", topology_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    links = reply.json()['network'][0]['ietf-network-topology:link']
    # Ids still waiting to be found; an id is dropped once it is matched.
    pending = [
        'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
        'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
        'ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX',
        'ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX',
        'ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX',
        'ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX',
    ]
    for link in links:
        if link['org-openroadm-common-network:link-type'] != 'ROADM-TO-ROADM':
            continue
        link_id = link['link-id']
        self.assertIn(link_id, pending)
        pending.remove(link_id)
    self.assertEqual(len(pending), 0)
def test_21_verifyOppositeLinkTopology(self):
    # For each of the 26 links of openroadm-topology, fetch the link named
    # in its 'opposite-link' attribute and check that it points back, has
    # swapped source/destination nodes and carries the mirrored link type.
    headers = {'content-type': 'application/json'}
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Tests related to links
    links = res['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 26)
    # Link type -> type its opposite link must have. Types that are not
    # listed are not checked.
    mirrored_type = {
        'ADD-LINK': 'DROP-LINK',
        'DROP-LINK': 'ADD-LINK',
        'EXPRESS-LINK': 'EXPRESS-LINK',
        'ROADM-TO-ROADM': 'ROADM-TO-ROADM',
        'XPONDER-INPUT': 'XPONDER-OUTPUT',
        'XPONDER-OUTPUT': 'XPONDER-INPUT',
    }
    for link in links:
        link_id = link['link-id']
        link_type = link['org-openroadm-common-network:link-type']
        link_src = link['source']['source-node']
        link_dest = link['destination']['dest-node']
        oppLink_id = link['org-openroadm-common-network:opposite-link']
        # Find the opposite link. The id is passed as a format argument
        # rather than spliced into the template, so braces in an id cannot
        # be mistaken for replacement fields.
        url_oppLink = ("{}/config/ietf-network:networks/network/openroadm-topology/"
                       "ietf-network-topology:link/{}"
                       .format(self.restconf_baseurl, oppLink_id))
        response_oppLink = requests.request(
            "GET", url_oppLink, headers=headers, auth=('admin', 'admin'))
        self.assertEqual(response_oppLink.status_code, requests.codes.ok)
        oppLink = response_oppLink.json()['ietf-network-topology:link'][0]
        self.assertEqual(oppLink['org-openroadm-common-network:opposite-link'], link_id)
        self.assertEqual(oppLink['source']['source-node'], link_dest)
        self.assertEqual(oppLink['destination']['dest-node'], link_src)
        oppLink_type = oppLink['org-openroadm-common-network:link-type']
        if link_type in mirrored_type:
            self.assertEqual(oppLink_type, mirrored_type[link_type])
def test_22_disconnect_ROADMB(self):
    # Remove ROADM-B1 from topology-netconf, then NodeB from clli-network.
    # Each DELETE must answer with an OK status code.
    headers = {'content-type': 'application/json'}
    # The DELETE requests carry an empty JSON body.
    data = {}
    # Delete in the topology-netconf
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADM-B1"
           .format(self.restconf_baseurl))
    response = requests.request(
        "DELETE", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    # Delete in the clli-network
    url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeB"
           .format(self.restconf_baseurl))
    response = requests.request(
        "DELETE", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_23_disconnect_ROADMC(self):
    # Remove ROADM-C1 from topology-netconf, then NodeC from clli-network.
    # Each DELETE must answer with an OK status code.
    headers = {'content-type': 'application/json'}
    # The DELETE requests carry an empty JSON body.
    data = {}
    # Delete in the topology-netconf
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADM-C1"
           .format(self.restconf_baseurl))
    response = requests.request(
        "DELETE", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    # Delete in the clli-network
    url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeC"
           .format(self.restconf_baseurl))
    response = requests.request(
        "DELETE", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
874 # def test_24_check_roadm2roadm_links_deletion(self):
875 # url = ("{}/config/ietf-network:networks/network/openroadm-topology"
876 # .format(self.restconf_baseurl))
877 # headers = {'content-type': 'application/json'}
878 # response = requests.request(
879 # "GET", url, headers=headers, auth=('admin', 'admin'))
880 # self.assertEqual(response.status_code, requests.codes.ok)
881 # res = response.json()
882 # #Write the response in the log
883 # with open('./transportpce_tests/log/response.log', 'a') as outfile1:
884 # outfile1.write(str(res))
885 # #Tests related to links
886 # nbLink=len(res['network'][0]['ietf-network-topology:link'])
887 # self.assertEqual(nbLink,8)
888 # expressLink=['ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX','ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX']
889 # addLink=['ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX','ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX',]
890 # dropLink=['ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX','ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX']
891 # XPDR_IN=['ROADMA-SRG1-SRG1-PP1-TXRXtoXPDRA-XPDR1-XPDR1-NETWORK1']
892 # XPDR_OUT=['XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX']
893 # for i in range(0,nbLink):
894 # nodeType=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
895 # linkId=res['network'][0]['ietf-network-topology:link'][i]['link-id']
896 # if(nodeType=='EXPRESS-LINK'):
897 # find= linkId in expressLink
898 # self.assertEqual(find, True)
899 # expressLink.remove(linkId)
900 # elif(nodeType=='ADD-LINK'):
901 # find= linkId in addLink
902 # self.assertEqual(find, True)
903 # addLink.remove(linkId)
904 # elif(nodeType=='DROP-LINK'):
905 # find= linkId in dropLink
906 # self.assertEqual(find, True)
907 # dropLink.remove(linkId)
908 # elif(nodeType=='XPONDER-INPUT'):
909 # find= linkId in XPDR_IN
910 # self.assertEqual(find, True)
911 # XPDR_IN.remove(linkId)
912 # elif(nodeType=='XPONDER-OUTPUT'):
913 # find= linkId in XPDR_OUT
914 # self.assertEqual(find, True)
915 # XPDR_OUT.remove(linkId)
917 # self.assertFalse(True)
918 # self.assertEqual(len(expressLink),0)
919 # self.assertEqual(len(addLink),0)
920 # self.assertEqual(len(dropLink),0)
921 # self.assertEqual(len(XPDR_IN),0)
922 # self.assertEqual(len(XPDR_OUT),0)
924 # for i in range(0,nbLink):
925 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'],'ROADM-TO-ROADM')
926 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX')
927 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX')
928 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG2-DEG1-CTP-TXRX')
929 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG2-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX')
930 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-DEG2-DEG2-CTP-TXRX')
931 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG2-DEG2-CTP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX')
def test_25_getNodes_OpenRoadmTopology(self):
    # Check the node list of openroadm-topology: exactly five nodes are
    # expected (XPDR-A1-XPDR1 plus the four ROADM-A1 SRG/DEG nodes), each
    # with the expected termination points and supporting node, and none of
    # the ROADM-C1 nodes may be listed.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status first so an HTTP error is reported as a status
    # mismatch instead of surfacing as a JSON decoding error.
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Tests related to nodes
    nodes = res['network'][0]['node']
    self.assertEqual(len(nodes), 5)
    listNode = ['XPDR-A1-XPDR1', 'ROADM-A1-SRG1', 'ROADM-A1-SRG3',
                'ROADM-A1-DEG1', 'ROADM-A1-DEG2']
    # node-id -> (node type, expected termination-point count or None when
    # the count is not checked, termination points that must be present)
    roadm_checks = {
        'ROADM-A1-SRG1': ('SRG', 5, [('SRG1-CP-TXRX', 'SRG-TXRX-CP'),
                                     ('SRG1-PP1-TXRX', 'SRG-TXRX-PP')]),
        'ROADM-A1-SRG3': ('SRG', 5, [('SRG3-CP-TXRX', 'SRG-TXRX-CP'),
                                     ('SRG3-PP1-TXRX', 'SRG-TXRX-PP')]),
        'ROADM-A1-DEG1': ('DEGREE', None, [('DEG1-TTP-TXRX', 'DEGREE-TXRX-TTP'),
                                           ('DEG1-CTP-TXRX', 'DEGREE-TXRX-CTP')]),
        'ROADM-A1-DEG2': ('DEGREE', None, [('DEG2-TTP-TXRX', 'DEGREE-TXRX-TTP'),
                                           ('DEG2-CTP-TXRX', 'DEGREE-TXRX-CTP')]),
    }
    for node in nodes:
        nodeType = node['org-openroadm-common-network:node-type']
        nodeId = node['node-id']
        if nodeId == 'XPDR-A1-XPDR1':
            # Tests related to XPDRA nodes
            for tp in node['ietf-network-topology:termination-point']:
                tpid = tp['tp-id']
                if tpid == 'XPDR1-CLIENT1':
                    self.assertEqual(tp['org-openroadm-common-network:tp-type'],
                                     'XPONDER-CLIENT')
                if tpid == 'XPDR1-NETWORK1':
                    self.assertEqual(tp['org-openroadm-common-network:tp-type'],
                                     'XPONDER-NETWORK')
                    self.assertEqual(
                        tp['org-openroadm-network-topology:xpdr-network-attributes']
                        ['tail-equipment-id'],
                        'ROADM-A1-SRG1--SRG1-PP1-TXRX')
            self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDR-A1'},
                          node['supporting-node'])
        elif nodeId in roadm_checks:
            # Tests related to the SRG / DEG nodes of ROADM-A1
            expected_type, tp_count, required_tps = roadm_checks[nodeId]
            self.assertEqual(nodeType, expected_type)
            tps = node['ietf-network-topology:termination-point']
            if tp_count is not None:
                self.assertEqual(len(tps), tp_count)
            for tp_id, tp_type in required_tps:
                self.assertIn({'tp-id': tp_id,
                               'org-openroadm-common-network:tp-type': tp_type},
                              tps)
            self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
                          node['supporting-node'])
        else:
            self.fail("unexpected node in openroadm-topology: {}".format(nodeId))
        listNode.remove(nodeId)
    self.assertEqual(len(listNode), 0)
    # Test related to SRG1 of ROADMC: no ROADM-C1 node may be listed
    for node in nodes:
        self.assertNotEqual(node['node-id'], 'ROADM-C1-SRG1')
        self.assertNotEqual(node['node-id'], 'ROADM-C1-DEG1')
        self.assertNotEqual(node['node-id'], 'ROADM-C1-DEG2')
def test_26_getOpenRoadmNetwork(self):
    # openroadm-network must hold exactly two nodes, and neither of them may
    # be ROADM-C1 or ROADM-B1.
    url = ("{}/config/ietf-network:networks/network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    nodes = res['network'][0]['node']
    self.assertEqual(len(nodes), 2)
    # Walk every node: stopping at len - 1 would leave the last entry
    # unchecked.
    for node in nodes:
        self.assertNotEqual(node['node-id'], 'ROADM-C1')
        self.assertNotEqual(node['node-id'], 'ROADM-B1')
def test_27_getClliNetwork(self):
    # clli-network must hold exactly one node, and that node's CLLI must not
    # be NodeC.
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    nodes = res['network'][0]['node']
    self.assertEqual(len(nodes), 1)
    # Check each listed node itself; a loop bounded by len - 1 never runs
    # for a single-node list, and a fixed index 1 would be out of range.
    for node in nodes:
        self.assertNotEqual(node['org-openroadm-clli-network:clli'], 'NodeC')
def test_28_disconnect_XPDRA(self):
    # Remove XPDR-A1 from topology-netconf; the DELETE must answer with an
    # OK status code.
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/XPDR-A1"
           .format(self.restconf_baseurl))
    # The DELETE request carries an empty JSON body.
    data = {}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "DELETE", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_29_getClliNetwork(self):
    # clli-network is expected to contain a single node whose CLLI is NodeA.
    clli_url = ("{}/config/ietf-network:networks/network/clli-network"
                .format(self.restconf_baseurl))
    reply = requests.request(
        "GET", clli_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    clli_nodes = reply.json()['network'][0]['node']
    self.assertEqual(len(clli_nodes), 1)
    self.assertEqual(clli_nodes[0]['org-openroadm-clli-network:clli'], 'NodeA')
def test_30_getOpenRoadmNetwork(self):
    # openroadm-network is expected to contain a single node, and that node
    # must not be XPDR-A1.
    network_url = ("{}/config/ietf-network:networks/network/openroadm-network"
                   .format(self.restconf_baseurl))
    reply = requests.request(
        "GET", network_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    remaining_nodes = reply.json()['network'][0]['node']
    self.assertEqual(len(remaining_nodes), 1)
    for entry in remaining_nodes:
        self.assertNotEqual(entry['node-id'], 'XPDR-A1')
def test_31_getNodes_OpenRoadmTopology(self):
    # Check the node list of openroadm-topology: exactly the four ROADM-A1
    # SRG/DEG nodes are expected, each supported by ROADM-A1 and exposing
    # the expected termination points.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status first so an HTTP error is reported as a status
    # mismatch instead of surfacing as a JSON decoding error.
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Tests related to nodes
    nodes = res['network'][0]['node']
    self.assertEqual(len(nodes), 4)
    listNode = ['ROADM-A1-SRG1', 'ROADM-A1-SRG3', 'ROADM-A1-DEG1', 'ROADM-A1-DEG2']
    # node-id -> (node type, expected termination-point count or None when
    # the count is not checked, termination points that must be present)
    roadm_checks = {
        'ROADM-A1-SRG1': ('SRG', 5, [('SRG1-CP-TXRX', 'SRG-TXRX-CP'),
                                     ('SRG1-PP1-TXRX', 'SRG-TXRX-PP')]),
        'ROADM-A1-SRG3': ('SRG', 5, [('SRG3-CP-TXRX', 'SRG-TXRX-CP'),
                                     ('SRG3-PP1-TXRX', 'SRG-TXRX-PP')]),
        'ROADM-A1-DEG1': ('DEGREE', None, [('DEG1-TTP-TXRX', 'DEGREE-TXRX-TTP'),
                                           ('DEG1-CTP-TXRX', 'DEGREE-TXRX-CTP')]),
        'ROADM-A1-DEG2': ('DEGREE', None, [('DEG2-TTP-TXRX', 'DEGREE-TXRX-TTP'),
                                           ('DEG2-CTP-TXRX', 'DEGREE-TXRX-CTP')]),
    }
    for node in nodes:
        self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
                      node['supporting-node'])
        nodeType = node['org-openroadm-common-network:node-type']
        nodeId = node['node-id']
        if nodeId not in roadm_checks:
            self.fail("unexpected node in openroadm-topology: {}".format(nodeId))
        expected_type, tp_count, required_tps = roadm_checks[nodeId]
        self.assertEqual(nodeType, expected_type)
        tps = node['ietf-network-topology:termination-point']
        if tp_count is not None:
            self.assertEqual(len(tps), tp_count)
        for tp_id, tp_type in required_tps:
            self.assertIn({'tp-id': tp_id,
                           'org-openroadm-common-network:tp-type': tp_type},
                          tps)
        listNode.remove(nodeId)
    self.assertEqual(len(listNode), 0)
def test_32_disconnect_ROADM_XPDRA_link(self):
    # Delete both directions of the link between XPDR-A1 (XPDR1-NETWORK1)
    # and ROADM-A1 (SRG1-PP1-TXRX) from openroadm-topology. Each DELETE must
    # answer with an OK status code.
    headers = {'content-type': 'application/json'}
    # The DELETE requests carry an empty JSON body.
    data = {}
    # Link from the transponder to the ROADM
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX"
           .format(self.restconf_baseurl))
    response = requests.request(
        "DELETE", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    # Link from the ROADM to the transponder
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1"
           .format(self.restconf_baseurl))
    response = requests.request(
        "DELETE", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_33_getLinks_OpenRoadmTopology(self):
    # Check the 16 links of openroadm-topology: the EXPRESS / ADD / DROP
    # links must be exactly the expected ROADM-A1 ones, six links of any
    # other type must remain, and no XPONDER-INPUT / XPONDER-OUTPUT link
    # may be listed.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    links = res['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 16)
    expressLink = ['ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
                   'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX']
    addLink = ['ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
               'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
               'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
               'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX']
    dropLink = ['ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
                'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
                'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
                'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX']
    # Link type -> ids of that type still waiting to be found.
    pending = {'EXPRESS-LINK': expressLink,
               'ADD-LINK': addLink,
               'DROP-LINK': dropLink}
    roadmtoroadmLink = 0
    for link in links:
        link_type = link['org-openroadm-common-network:link-type']
        if link_type in pending:
            link_id = link['link-id']
            self.assertIn(link_id, pending[link_type])
            pending[link_type].remove(link_id)
        else:
            # Any link that is not express/add/drop is counted here.
            roadmtoroadmLink += 1
    self.assertEqual(len(expressLink), 0)
    self.assertEqual(len(addLink), 0)
    self.assertEqual(len(dropLink), 0)
    self.assertEqual(roadmtoroadmLink, 6)
    for link in links:
        self.assertNotEqual(link['org-openroadm-common-network:link-type'], 'XPONDER-OUTPUT')
        self.assertNotEqual(link['org-openroadm-common-network:link-type'], 'XPONDER-INPUT')
def test_34_disconnect_ROADMA(self):
    # Remove ROADM-A1 from topology-netconf, then NodeA from clli-network.
    # Each DELETE must answer with an OK status code.
    headers = {'content-type': 'application/json'}
    # The DELETE requests carry an empty JSON body.
    data = {}
    # Delete in the topology-netconf
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADM-A1"
           .format(self.restconf_baseurl))
    response = requests.request(
        "DELETE", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    # Delete in the clli-network
    url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeA"
           .format(self.restconf_baseurl))
    response = requests.request(
        "DELETE", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_35_getClliNetwork(self):
    # clli-network must still be readable but no longer carry a 'node' list.
    clli_url = ("{}/config/ietf-network:networks/network/clli-network"
                .format(self.restconf_baseurl))
    reply = requests.request(
        "GET", clli_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    network = reply.json()['network'][0]
    self.assertNotIn('node', network)
def test_36_getOpenRoadmNetwork(self):
    # openroadm-network must still be readable but no longer carry a 'node'
    # list.
    network_url = ("{}/config/ietf-network:networks/network/openroadm-network"
                   .format(self.restconf_baseurl))
    reply = requests.request(
        "GET", network_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    network = reply.json()['network'][0]
    self.assertNotIn('node', network)
def test_37_check_roadm2roadm_link_persistence(self):
    # openroadm-topology must carry no 'node' list any more while still
    # holding six links.
    topology_url = ("{}/config/ietf-network:networks/network/openroadm-topology"
                    .format(self.restconf_baseurl))
    reply = requests.request(
        "GET", topology_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    topology = reply.json()['network'][0]
    # The link list is read before the node check, as a missing link list
    # is reported first.
    link_count = len(topology['ietf-network-topology:link'])
    self.assertNotIn('node', topology)
    self.assertEqual(link_count, 6)
# When the file is executed as a script, run every test case it defines with
# verbose (per-test) reporting.
if __name__ == "__main__":
    unittest.main(verbosity=2)