3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
25 class TransportPCEtesting(unittest.TestCase):
28 restconf_baseurl = "http://localhost:8181/restconf"
@classmethod
def setUpClass(cls):
    """Start the TransportPCE controller and the device simulators for the class.

    The two start-up statements reference ``cls``, so they are placed inside
    the ``setUpClass`` hook that unittest runs once before the tests.
    """
    # NOTE(review): the second assignment replaces the value returned by
    # start_tpce(); anything iterating cls.processes later only sees what
    # start_sims() returned. Presumably both helpers return the same shared
    # process list - confirm against test_utils.
    cls.processes = test_utils.start_tpce()
    cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmb', 'roadmc'])
@classmethod
def tearDownClass(cls):
    """Shut down every process recorded in ``cls.processes``.

    Declared as a classmethod because unittest invokes this hook on the
    class itself, without an instance.
    """
    for process in cls.processes:
        test_utils.shutdown_process(process)
    print("all processes killed")
def test_01_connect_ROADM_A1(self):
    """PUT the ROADM-A1 NETCONF mount point and expect a ``created`` status."""
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADM-A1"
           .format(self.restconf_baseurl))
    # The node entry is wrapped in a one-element list, matching the closing
    # "}]}" of the payload literal.
    # NOTE(review): the "node" list name is inferred from the request URL - confirm.
    data = {"node": [{
        "node-id": "ROADM-A1",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": test_utils.sims['roadma']['port'],
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_02_getClliNetwork(self):
    """GET the clli-network and check that its first node is NodeA."""
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    # Decode the body before inspecting it; ``res`` was otherwise never bound.
    res = response.json()
    first_node = res['network'][0]['node'][0]
    self.assertEqual(first_node['node-id'], 'NodeA')
    self.assertEqual(first_node['org-openroadm-clli-network:clli'], 'NodeA')
def test_03_getOpenRoadmNetwork(self):
    """GET the openroadm-network and check the attributes of its first node.

    The first node must be ROADM-A1, supported by NodeA in the clli-network,
    with node type ROADM and model ``model2``.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    # Decode the body before inspecting it; ``res`` was otherwise never bound.
    res = response.json()
    node = res['network'][0]['node'][0]
    supporting = node['supporting-node'][0]
    self.assertEqual(node['node-id'], 'ROADM-A1')
    self.assertEqual(supporting['network-ref'], 'clli-network')
    self.assertEqual(supporting['node-ref'], 'NodeA')
    self.assertEqual(node['org-openroadm-common-network:node-type'], 'ROADM')
    self.assertEqual(node['org-openroadm-network:model'], 'model2')
def test_04_getLinks_OpenroadmTopology(self):
    """GET the openroadm-topology and check its 10 ROADM-A1 internal links.

    Every returned link must have a known link type and a link-id expected
    for that type; each expected link-id must be seen exactly once.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    # Decode the body before inspecting it; ``res`` was otherwise never bound.
    res = response.json()
    links = res['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 10)
    # link type -> link-ids still waiting to be matched
    pending = {
        'EXPRESS-LINK': ['ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
                         'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX'],
        'ADD-LINK': ['ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
                     'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
                     'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
                     'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX'],
        'DROP-LINK': ['ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
                      'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
                      'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
                      'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX'],
    }
    for link in links:
        link_id = link['link-id']
        link_type = link['org-openroadm-common-network:link-type']
        # Only an unknown link type is a failure; known types fall through
        # to the membership check below.
        if link_type not in pending:
            self.fail("unexpected link type {!r} for link {!r}".format(link_type, link_id))
        self.assertIn(link_id, pending[link_type])
        pending[link_type].remove(link_id)
    for link_type, remaining in pending.items():
        self.assertEqual(remaining, [], "missing {} links".format(link_type))
def test_05_getNodes_OpenRoadmTopology(self):
    """GET the openroadm-topology and check the four ROADM-A1 sub-nodes.

    Each node must be one of SRG1, SRG3, DEG1 or DEG2 of ROADM-A1, appear
    exactly once, carry the expected node type and termination points, and
    be supported by both NodeA (clli-network) and ROADM-A1
    (openroadm-network).
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding so an error reply is reported as such.
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    nodes = res['network'][0]['node']
    self.assertEqual(len(nodes), 4)
    tp_type = 'org-openroadm-common-network:tp-type'
    # node-id -> (node type, exact termination-point count or None, required TPs)
    expected = {
        'ROADM-A1-SRG1': ('SRG', 5, [
            {'tp-id': 'SRG1-CP-TXRX', tp_type: 'SRG-TXRX-CP'},
            {'tp-id': 'SRG1-PP1-TXRX', tp_type: 'SRG-TXRX-PP'}]),
        'ROADM-A1-SRG3': ('SRG', 5, [
            {'tp-id': 'SRG3-CP-TXRX', tp_type: 'SRG-TXRX-CP'},
            {'tp-id': 'SRG3-PP1-TXRX', tp_type: 'SRG-TXRX-PP'}]),
        'ROADM-A1-DEG1': ('DEGREE', None, [
            {'tp-id': 'DEG1-TTP-TXRX', tp_type: 'DEGREE-TXRX-TTP'},
            {'tp-id': 'DEG1-CTP-TXRX', tp_type: 'DEGREE-TXRX-CTP'}]),
        'ROADM-A1-DEG2': ('DEGREE', None, [
            {'tp-id': 'DEG2-TTP-TXRX', tp_type: 'DEGREE-TXRX-TTP'},
            {'tp-id': 'DEG2-CTP-TXRX', tp_type: 'DEGREE-TXRX-CTP'}]),
    }
    for node in nodes:
        node_id = node['node-id']
        if node_id not in expected:
            self.fail("unexpected or duplicated node-id {!r}".format(node_id))
        # Popping the entry makes a second occurrence of the node fail above.
        node_type, tp_count, required_tps = expected.pop(node_id)
        self.assertEqual(node['org-openroadm-common-network:node-type'], node_type)
        tps = node['ietf-network-topology:termination-point']
        if tp_count is not None:
            self.assertEqual(len(tps), tp_count)
        for required in required_tps:
            self.assertIn(required, tps)
        self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
                      node['supporting-node'])
        self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
                      node['supporting-node'])
    self.assertEqual(expected, {})
def test_06_connect_XPDRA(self):
    """PUT the XPDR-A1 NETCONF mount point and expect a ``created`` status."""
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/XPDR-A1"
           .format(self.restconf_baseurl))
    # The node entry is wrapped in a one-element list, matching the closing
    # "}]}" of the payload literal.
    # NOTE(review): the "node" list name is inferred from the request URL - confirm.
    data = {"node": [{
        "node-id": "XPDR-A1",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_07_getClliNetwork(self):
    """GET the clli-network and check that its first node is still NodeA."""
    clli_url = "{}/config/ietf-network:networks/network/clli-network".format(
        self.restconf_baseurl)
    response = requests.request(
        "GET", clli_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    first_node = response.json()['network'][0]['node'][0]
    self.assertEqual(first_node['node-id'], 'NodeA')
    self.assertEqual(first_node['org-openroadm-clli-network:clli'], 'NodeA')
def test_08_getOpenRoadmNetwork(self):
    """GET the openroadm-network and check its two nodes.

    Both nodes must be supported by NodeA in the clli-network and use model
    ``model2``; XPDR-A1 must be an XPONDER and ROADM-A1 a ROADM. Any other
    node-id fails the test.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    nodes = res['network'][0]['node']
    self.assertEqual(len(nodes), 2)
    expected_type = {'XPDR-A1': 'XPONDER', 'ROADM-A1': 'ROADM'}
    for node in nodes:
        supporting = node['supporting-node'][0]
        self.assertEqual(supporting['network-ref'], 'clli-network')
        self.assertEqual(supporting['node-ref'], 'NodeA')
        node_id = node['node-id']
        # Fail only for node-ids outside the expected set.
        if node_id not in expected_type:
            self.fail("unexpected node-id {!r}".format(node_id))
        self.assertEqual(node['org-openroadm-common-network:node-type'],
                         expected_type[node_id])
        self.assertEqual(node['org-openroadm-network:model'], 'model2')
def test_09_getNodes_OpenRoadmTopology(self):
    """GET the openroadm-topology and check its five nodes.

    XPDR-A1-XPDR1 must be an XPONDER with exactly two client and two
    network termination points, and XPDR1-NETWORK2 / XPDR1-CLIENT2 must
    reference each other in their connection map. The four ROADM-A1
    sub-nodes are checked for node type, termination points and their
    openroadm-network supporting node. Every expected node must appear
    exactly once.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding so an error reply is reported as such.
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    nodes = res['network'][0]['node']
    self.assertEqual(len(nodes), 5)
    tp_type = 'org-openroadm-common-network:tp-type'
    map_port = 'transportpce-topology:associated-connection-map-port'
    # node-id -> (node type, exact termination-point count or None, required TPs)
    expected_roadm = {
        'ROADM-A1-SRG1': ('SRG', 5, [
            {'tp-id': 'SRG1-CP-TXRX', tp_type: 'SRG-TXRX-CP'},
            {'tp-id': 'SRG1-PP1-TXRX', tp_type: 'SRG-TXRX-PP'}]),
        'ROADM-A1-SRG3': ('SRG', 5, [
            {'tp-id': 'SRG3-CP-TXRX', tp_type: 'SRG-TXRX-CP'},
            {'tp-id': 'SRG3-PP1-TXRX', tp_type: 'SRG-TXRX-PP'}]),
        'ROADM-A1-DEG1': ('DEGREE', None, [
            {'tp-id': 'DEG1-TTP-TXRX', tp_type: 'DEGREE-TXRX-TTP'},
            {'tp-id': 'DEG1-CTP-TXRX', tp_type: 'DEGREE-TXRX-CTP'}]),
        'ROADM-A1-DEG2': ('DEGREE', None, [
            {'tp-id': 'DEG2-TTP-TXRX', tp_type: 'DEGREE-TXRX-TTP'},
            {'tp-id': 'DEG2-CTP-TXRX', tp_type: 'DEGREE-TXRX-CTP'}]),
    }
    pending = ['XPDR-A1-XPDR1'] + list(expected_roadm)
    for node in nodes:
        node_id = node['node-id']
        node_type = node['org-openroadm-common-network:node-type']
        if node_id == 'XPDR-A1-XPDR1':
            self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDR-A1'},
                          node['supporting-node'])
            self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
                          node['supporting-node'])
            self.assertEqual(node_type, 'XPONDER')
            # Count termination points per type; both counters start at zero.
            client = 0
            network = 0
            for tp in node['ietf-network-topology:termination-point']:
                if tp[tp_type] == 'XPONDER-CLIENT':
                    client += 1
                elif tp[tp_type] == 'XPONDER-NETWORK':
                    network += 1
                if tp['tp-id'] == 'XPDR1-NETWORK2':
                    self.assertEqual(tp[map_port], 'XPDR1-CLIENT2')
                if tp['tp-id'] == 'XPDR1-CLIENT2':
                    self.assertEqual(tp[map_port], 'XPDR1-NETWORK2')
            self.assertEqual(client, 2)
            self.assertEqual(network, 2)
        elif node_id in expected_roadm:
            wanted_type, tp_count, required_tps = expected_roadm[node_id]
            self.assertEqual(node_type, wanted_type)
            tps = node['ietf-network-topology:termination-point']
            if tp_count is not None:
                self.assertEqual(len(tps), tp_count)
            for required in required_tps:
                self.assertIn(required, tps)
            self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
                          node['supporting-node'])
        else:
            self.fail("unexpected node-id {!r}".format(node_id))
        # A node reported twice is no longer pending and fails here.
        self.assertIn(node_id, pending)
        pending.remove(node_id)
    self.assertEqual(pending, [])
343 # Connect the tail XPDRA to ROADMA and vice versa
def test_10_connect_tail_xpdr_rdm(self):
    """POST init-xpdr-rdm-links to link XPDR-A1 towards ROADM-A1; expect OK.

    The link is requested from network port 1 of xponder 1 to termination
    point SRG1-PP1-TXRX of SRG 1.
    """
    url = ("{}/operations/transportpce-networkutils:init-xpdr-rdm-links"
           .format(self.restconf_baseurl))
    # The nested payload literal is closed explicitly at all three levels.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDR-A1",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "1",
                "networkutils:rdm-node": "ROADM-A1",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP1-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_11_connect_tail_rdm_xpdr(self):
    """POST init-rdm-xpdr-links to link ROADM-A1 towards XPDR-A1; expect OK.

    The link is requested from termination point SRG1-PP1-TXRX of SRG 1 to
    network port 1 of xponder 1.
    """
    url = ("{}/operations/transportpce-networkutils:init-rdm-xpdr-links"
           .format(self.restconf_baseurl))
    # The nested payload literal is closed explicitly at all three levels.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDR-A1",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "1",
                "networkutils:rdm-node": "ROADM-A1",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP1-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_12_getLinks_OpenRoadmTopology(self):
    """GET the openroadm-topology and check its 12 links.

    On top of the ROADM-A1 express/add/drop links, one XPONDER-INPUT and
    one XPONDER-OUTPUT link between XPDR-A1 and ROADM-A1 are expected.
    Every returned link must have a known type and an expected link-id, and
    each expected link-id must be seen exactly once.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    links = res['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 12)
    # link type -> link-ids still waiting to be matched
    pending = {
        'EXPRESS-LINK': ['ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
                         'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX'],
        'ADD-LINK': ['ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
                     'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
                     'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
                     'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX'],
        'DROP-LINK': ['ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
                      'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
                      'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
                      'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX'],
        'XPONDER-INPUT': ['ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1'],
        'XPONDER-OUTPUT': ['XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX'],
    }
    for link in links:
        link_type = link['org-openroadm-common-network:link-type']
        link_id = link['link-id']
        # Only an unknown link type is a failure.
        if link_type not in pending:
            self.fail("unexpected link type {!r} for link {!r}".format(link_type, link_id))
        self.assertIn(link_id, pending[link_type])
        pending[link_type].remove(link_id)
    for link_type, remaining in pending.items():
        self.assertEqual(remaining, [], "missing {} links".format(link_type))
def test_13_connect_ROADMC(self):
    """PUT the ROADM-C1 NETCONF mount point and expect a ``created`` status."""
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADM-C1"
           .format(self.restconf_baseurl))
    # The node entry is wrapped in a one-element list, matching the closing
    # "}]}" of the payload literal.
    # NOTE(review): the "node" list name is inferred from the request URL - confirm.
    data = {"node": [{
        "node-id": "ROADM-C1",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": test_utils.sims['roadmc']['port'],
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_14_omsAttributes_ROADMA_ROADMC(self):
    """PUT span OMS attributes on the ROADM-A1 -> ROADM-C1 link; expect created."""
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # NOTE(review): only "auto-spanloss", "engineered-spanloss" and
    # "SRLG-length" were present in the payload. The enclosing "span"
    # container is inferred from the URL, and "SRLG-Id", "fiber-type" and
    # "pmd" (with their values) are assumed from the OpenROADM link model -
    # confirm before relying on them.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_15_omsAttributes_ROADMC_ROADMA(self):
    """PUT span OMS attributes on the ROADM-C1 -> ROADM-A1 link; expect created."""
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # NOTE(review): only "auto-spanloss", "engineered-spanloss" and
    # "SRLG-length" were present in the payload. The enclosing "span"
    # container is inferred from the URL, and "SRLG-Id", "fiber-type" and
    # "pmd" (with their values) are assumed from the OpenROADM link model -
    # confirm before relying on them.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_16_getClliNetwork(self):
    """GET the clli-network and check that it holds exactly NodeA and NodeC.

    Each node's clli attribute must equal its node-id, and both expected
    nodes must be seen.
    """
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    pending = ['NodeA', 'NodeC']
    for node in res['network'][0]['node']:
        node_id = node['node-id']
        self.assertIn(node_id, pending)
        # For both expected nodes the clli value is the node-id itself, so a
        # single comparison covers the NodeA and NodeC cases.
        self.assertEqual(node['org-openroadm-clli-network:clli'], node_id)
        pending.remove(node_id)
    self.assertEqual(pending, [])
def test_17_getOpenRoadmNetwork(self):
    """GET the openroadm-network and check its three nodes.

    XPDR-A1 and ROADM-A1 must be supported by NodeA, ROADM-C1 by NodeC, all
    through the clli-network and all with model ``model2``. Every expected
    node must appear exactly once; any other node-id fails the test.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    nodes = res['network'][0]['node']
    self.assertEqual(len(nodes), 3)
    # node-id -> (supporting clli node, node type)
    expected = {
        'XPDR-A1': ('NodeA', 'XPONDER'),
        'ROADM-A1': ('NodeA', 'ROADM'),
        'ROADM-C1': ('NodeC', 'ROADM'),
    }
    for node in nodes:
        supporting = node['supporting-node'][0]
        self.assertEqual(supporting['network-ref'], 'clli-network')
        node_id = node['node-id']
        if node_id not in expected:
            self.fail("unexpected or duplicated node-id {!r}".format(node_id))
        # Popping the entry makes a second occurrence of the node fail above.
        clli_node, node_type = expected.pop(node_id)
        self.assertEqual(supporting['node-ref'], clli_node)
        self.assertEqual(node['org-openroadm-common-network:node-type'], node_type)
        self.assertEqual(node['org-openroadm-network:model'], 'model2')
    self.assertEqual(expected, {})
def test_18_getROADMLinkOpenRoadmTopology(self):
    """GET the openroadm-topology and check its 20 links.

    Expected are the internal express/add/drop links of ROADM-A1 and
    ROADM-C1, the two ROADM-TO-ROADM links between them, and the
    XPONDER-INPUT / XPONDER-OUTPUT links of XPDR-A1. Every returned link
    must have a known type and an expected link-id, and each expected
    link-id must be seen exactly once.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    links = res['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 20)
    # link type -> link-ids still waiting to be matched
    pending = {
        'EXPRESS-LINK': ['ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
                         'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
                         'ROADM-C1-DEG2-DEG2-CTP-TXRXtoROADM-C1-DEG1-DEG1-CTP-TXRX',
                         'ROADM-C1-DEG1-DEG1-CTP-TXRXtoROADM-C1-DEG2-DEG2-CTP-TXRX'],
        'ADD-LINK': ['ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
                     'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
                     'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
                     'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
                     'ROADM-C1-SRG1-SRG1-CP-TXRXtoROADM-C1-DEG2-DEG2-CTP-TXRX',
                     'ROADM-C1-SRG1-SRG1-CP-TXRXtoROADM-C1-DEG1-DEG1-CTP-TXRX'],
        'DROP-LINK': ['ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
                      'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
                      'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
                      'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
                      'ROADM-C1-DEG1-DEG1-CTP-TXRXtoROADM-C1-SRG1-SRG1-CP-TXRX',
                      'ROADM-C1-DEG2-DEG2-CTP-TXRXtoROADM-C1-SRG1-SRG1-CP-TXRX'],
        'ROADM-TO-ROADM': ['ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
                           'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX'],
        'XPONDER-INPUT': ['ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1'],
        'XPONDER-OUTPUT': ['XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX'],
    }
    for link in links:
        link_type = link['org-openroadm-common-network:link-type']
        link_id = link['link-id']
        # Only an unknown link type is a failure.
        if link_type not in pending:
            self.fail("unexpected link type {!r} for link {!r}".format(link_type, link_id))
        self.assertIn(link_id, pending[link_type])
        pending[link_type].remove(link_id)
    for link_type, remaining in pending.items():
        self.assertEqual(remaining, [], "missing {} links".format(link_type))
def test_19_getLinkOmsAttributesOpenRoadmTopology(self):
    """GET the openroadm-topology and check OMS attributes on both R2R links.

    Of the 20 links, the two ROADM-A1 <-> ROADM-C1 links must each expose a
    non-null ``engineered-spanloss`` and a non-null ``SRLG-length`` on their
    first link-concatenation entry.
    """
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    links = res['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 20)
    pending = ['ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
               'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX']
    for link in links:
        link_id = link['link-id']
        if link_id not in pending:
            continue
        span = link['org-openroadm-network-topology:OMS-attributes']['span']
        # Assert each value directly instead of combining "!= None" tests
        # with a bitwise "&" into a flag.
        self.assertIsNotNone(span["engineered-spanloss"])
        self.assertIsNotNone(span['link-concatenation'][0]['SRLG-length'])
        pending.remove(link_id)
    self.assertEqual(pending, [])
638 def test_20_getNodes_OpenRoadmTopology(self):
639 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
640 .format(self.restconf_baseurl))
641 headers = {'content-type': 'application/json'}
642 response = requests.request(
643 "GET", url, headers=headers, auth=('admin', 'admin'))
644 res = response.json()
645 # Tests related to nodes
646 self.assertEqual(response.status_code, requests.codes.ok)
647 nbNode = len(res['network'][0]['node'])
648 self.assertEqual(nbNode, 8)
649 listNode = ['XPDR-A1-XPDR1',
650 'ROADM-A1-SRG1', 'ROADM-A1-SRG3', 'ROADM-A1-DEG1', 'ROADM-A1-DEG2',
651 'ROADM-C1-SRG1', 'ROADM-C1-DEG1', 'ROADM-C1-DEG2']
652 # ************************Tests related to XPDRA nodes
653 for i in range(0, nbNode):
654 nodeType = res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
655 nodeId = res['network'][0]['node'][i]['node-id']
656 if(nodeId == 'XPDR-A1-XPDR1'):
657 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDR-A1'},
658 res['network'][0]['node'][i]['supporting-node'])
659 self.assertEqual(nodeType, 'XPONDER')
660 nbTps = len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
661 self.assertTrue(nbTps >= 4)
664 for j in range(0, nbTps):
665 tpType = res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['org-openroadm-common-network:tp-type']
666 if (tpType == 'XPONDER-CLIENT'):
668 elif (tpType == 'XPONDER-NETWORK'):
670 self.assertTrue(client == 2)
671 self.assertTrue(network == 2)
672 listNode.remove(nodeId)
673 elif(nodeId == 'ROADM-A1-SRG1'):
674 # Test related to SRG1
675 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 5)
676 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
677 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
678 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
679 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
680 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
681 res['network'][0]['node'][i]['supporting-node'])
682 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'SRG')
683 listNode.remove(nodeId)
684 elif(nodeId == 'ROADM-A1-SRG3'):
685 # Test related to SRG1
686 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 5)
687 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
688 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
689 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
690 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
691 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
692 res['network'][0]['node'][i]['supporting-node'])
693 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'SRG')
694 listNode.remove(nodeId)
695 elif(nodeId == 'ROADM-A1-DEG1'):
696 # Test related to DEG1
697 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
698 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
699 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
700 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
701 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
702 res['network'][0]['node'][i]['supporting-node'])
703 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'DEGREE')
704 listNode.remove(nodeId)
705 elif(nodeId == 'ROADM-A1-DEG2'):
706 # Test related to DEG2
707 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
708 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
709 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
710 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
711 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
712 res['network'][0]['node'][i]['supporting-node'])
713 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'DEGREE')
714 listNode.remove(nodeId)
715 elif(nodeId == 'ROADM-C1-SRG1'):
716 # Test related to SRG1
717 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 5)
718 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
719 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
720 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
721 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
722 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-C1'},
723 res['network'][0]['node'][i]['supporting-node'])
724 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'SRG')
725 listNode.remove(nodeId)
726 elif(nodeId == 'ROADM-C1-DEG1'):
727 # Test related to DEG1
728 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
729 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
730 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
731 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
732 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-C1'},
733 res['network'][0]['node'][i]['supporting-node'])
734 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'DEGREE')
735 listNode.remove(nodeId)
736 elif(nodeId == 'ROADM-C1-DEG2'):
737 # Test related to DEG1
738 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
739 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
740 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
741 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
742 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-C1'},
743 res['network'][0]['node'][i]['supporting-node'])
744 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'DEGREE')
745 listNode.remove(nodeId)
747 self.assertFalse(True)
748 self.assertEqual(len(listNode), 0)
def test_21_connect_ROADMB(self):
    # Mount the ROADM-B1 simulator in topology-netconf and expect the node
    # resource to be created (HTTP 201).
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADM-B1"
           .format(self.restconf_baseurl))
    # The request targets a single entry of the "node" list, so the payload
    # wraps that one entry in {"node": [ ... ]}.
    data = {"node": [{
        "node-id": "ROADM-B1",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": test_utils.sims['roadmb']['port'],
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_22_omsAttributes_ROADMA_ROADMB(self):
    # Config ROADM-A1-ROADM-B1 oms-attributes: PUT the span container of the
    # ROADM-A1 DEG1 -> ROADM-B1 DEG1 link and expect it to be created.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # The URL ends on the "span" container, so the payload is wrapped in it.
    # NOTE(review): "SRLG-Id", "fiber-type" and "pmd" complete the
    # link-concatenation entry -- confirm these values against the
    # OpenROADM network-topology model before relying on them.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "spanloss-current": 12,
        "spanloss-base": 11.4,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_23_omsAttributes_ROADMB_ROADMA(self):
    # Config ROADM-B1-ROADM-A1 oms-attributes: PUT the span container of the
    # ROADM-B1 DEG1 -> ROADM-A1 DEG1 link and expect it to be created.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # The URL ends on the "span" container, so the payload is wrapped in it.
    # NOTE(review): "SRLG-Id", "fiber-type" and "pmd" complete the
    # link-concatenation entry -- confirm these values against the
    # OpenROADM network-topology model before relying on them.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "spanloss-current": 12,
        "spanloss-base": 11.4,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_24_omsAttributes_ROADMB_ROADMC(self):
    # Config ROADM-B1-ROADM-C1 oms-attributes: PUT the span container of the
    # ROADM-B1 DEG2 -> ROADM-C1 DEG2 link and expect it to be created.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # The URL ends on the "span" container, so the payload is wrapped in it.
    # NOTE(review): "SRLG-Id", "fiber-type" and "pmd" complete the
    # link-concatenation entry -- confirm these values against the
    # OpenROADM network-topology model before relying on them.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "spanloss-current": 12,
        "spanloss-base": 11.4,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_25_omsAttributes_ROADMC_ROADMB(self):
    # Config ROADM-C1-ROADM-B1 oms-attributes: PUT the span container of the
    # ROADM-C1 DEG2 -> ROADM-B1 DEG2 link and expect it to be created.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # The URL ends on the "span" container, so the payload is wrapped in it.
    # Unlike the sibling OMS tests, no spanloss-current / spanloss-base
    # values are sent for this direction.
    # NOTE(review): "SRLG-Id", "fiber-type" and "pmd" complete the
    # link-concatenation entry -- confirm these values against the
    # OpenROADM network-topology model before relying on them.
    data = {"span": {
        "auto-spanloss": "true",
        "engineered-spanloss": 12.2,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_26_getClliNetwork(self):
    # Read clli-network and check that it holds exactly NodeA, NodeB and
    # NodeC, each one carrying a clli attribute equal to its node-id.
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    listNode = ['NodeA', 'NodeB', 'NodeC']
    for node in res['network'][0]['node']:
        nodeId = node['node-id']
        # Fails on an unknown node and on a node reported twice, since each
        # id is removed from the list once it has been seen.
        self.assertIn(nodeId, listNode)
        # For the three accepted ids the expected clli is the id itself.
        self.assertEqual(node['org-openroadm-clli-network:clli'], nodeId)
        listNode.remove(nodeId)
    # Every expected node must have been reported.
    self.assertEqual(len(listNode), 0)
def test_27_verifyDegree(self):
    # Fetch openroadm-topology and check that its ROADM-TO-ROADM links are
    # exactly the six expected inter-ROADM links, each reported once.
    topology_url = "{}/config/ietf-network:networks/network/openroadm-topology".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET", topology_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    topology = reply.json()
    # Tests related to links
    links = topology['network'][0]['ietf-network-topology:link']
    pending = [
        'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
        'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
        'ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX',
        'ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX',
        'ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX',
        'ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX',
    ]
    for link in links:
        # Only inter-ROADM links are inspected here.
        if link['org-openroadm-common-network:link-type'] != 'ROADM-TO-ROADM':
            continue
        current_id = link['link-id']
        self.assertEqual(current_id in pending, True)
        pending.remove(current_id)
    # Nothing expected may be left over.
    self.assertEqual(len(pending), 0)
def test_28_verifyOppositeLinkTopology(self):
    # For each of the 26 links of openroadm-topology, fetch the link named
    # by its opposite-link attribute and check that it points back to the
    # first link, swaps source and destination nodes, and has the matching
    # link type.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Tests related to links
    links = res['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 26)
    # Link type expected on the opposite link, keyed by the link's own type.
    opposite_type_of = {
        'ADD-LINK': 'DROP-LINK',
        'DROP-LINK': 'ADD-LINK',
        'EXPRESS-LINK': 'EXPRESS-LINK',
        'ROADM-TO-ROADM': 'ROADM-TO-ROADM',
        'XPONDER-INPUT': 'XPONDER-OUTPUT',
        'XPONDER-OUTPUT': 'XPONDER-INPUT',
    }
    for link in links:
        link_id = link['link-id']
        link_type = link['org-openroadm-common-network:link-type']
        link_src = link['source']['source-node']
        link_dest = link['destination']['dest-node']
        oppLink_id = link['org-openroadm-common-network:opposite-link']
        # Find the opposite link. Base URL and link id are substituted in a
        # single format() call so that the id is never parsed as part of
        # the format template.
        url_oppLink = ("{}/config/ietf-network:networks/network/openroadm-topology/"
                       "ietf-network-topology:link/{}"
                       .format(self.restconf_baseurl, oppLink_id))
        response_oppLink = requests.request(
            "GET", url_oppLink, headers=headers, auth=('admin', 'admin'))
        self.assertEqual(response_oppLink.status_code, requests.codes.ok)
        opp_link = response_oppLink.json()['ietf-network-topology:link'][0]
        self.assertEqual(opp_link['org-openroadm-common-network:opposite-link'], link_id)
        self.assertEqual(opp_link['source']['source-node'], link_dest)
        self.assertEqual(opp_link['destination']['dest-node'], link_src)
        oppLink_type = opp_link['org-openroadm-common-network:link-type']
        # Link types that are not in the table are deliberately not checked.
        if link_type in opposite_type_of:
            self.assertEqual(oppLink_type, opposite_type_of[link_type])
def test_29_getLinkOmsAttributesOpenRoadmTopology(self):
    # Check that every expected inter-ROADM link of openroadm-topology is
    # present and exposes an engineered-spanloss and an SRLG-length in its
    # OMS-attributes span.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    R2RLink = ['ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
               'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
               'ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX',
               'ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX',
               'ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX',
               'ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX']
    for link in res['network'][0]['ietf-network-topology:link']:
        link_id = link['link-id']
        if link_id not in R2RLink:
            continue
        # A missing key raises KeyError, which also fails the test.
        span = link['org-openroadm-network-topology:OMS-attributes']['span']
        spanLoss = span["engineered-spanloss"]
        length = span['link-concatenation'][0]['SRLG-length']
        # Both attributes must carry a value (JSON null maps to None).
        self.assertIsNotNone(spanLoss)
        self.assertIsNotNone(length)
        R2RLink.remove(link_id)
    # Every expected link must have been found in the topology.
    self.assertEqual(len(R2RLink), 0)
def test_30_disconnect_ROADMB(self):
    # Remove ROADM-B1 from topology-netconf, then NodeB from clli-network;
    # each DELETE must answer 200 OK.
    headers = {'content-type': 'application/json'}
    # Both requests carry an empty JSON object as their body.
    data = {}
    # Delete in the topology-netconf
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADM-B1"
           .format(self.restconf_baseurl))
    response = requests.request(
        "DELETE", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    # Delete in the clli-network
    url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeB"
           .format(self.restconf_baseurl))
    response = requests.request(
        "DELETE", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_31_disconnect_ROADMC(self):
    # Remove ROADM-C1 from topology-netconf, then NodeC from clli-network;
    # each DELETE must answer 200 OK.
    headers = {'content-type': 'application/json'}
    # Both requests carry an empty JSON object as their body.
    data = {}
    # Delete in the topology-netconf
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADM-C1"
           .format(self.restconf_baseurl))
    response = requests.request(
        "DELETE", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    # Delete in the clli-network
    url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeC"
           .format(self.restconf_baseurl))
    response = requests.request(
        "DELETE", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
1013 # def test_24_check_roadm2roadm_links_deletion(self):
1014 # url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1015 # .format(self.restconf_baseurl))
1016 # headers = {'content-type': 'application/json'}
1017 # response = requests.request(
1018 # "GET", url, headers=headers, auth=('admin', 'admin'))
1019 # self.assertEqual(response.status_code, requests.codes.ok)
1020 # res = response.json()
1021 # #Write the response in the log
1022 # with open('./transportpce_tests/log/response.log', 'a') as outfile1:
1023 # outfile1.write(str(res))
1024 # #Tests related to links
1025 # nbLink=len(res['network'][0]['ietf-network-topology:link'])
1026 # self.assertEqual(nbLink,8)
1027 # expressLink=['ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX','ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX']
1028 # addLink=['ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX','ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX',]
1029 # dropLink=['ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX','ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX']
1030 # XPDR_IN=['ROADMA-SRG1-SRG1-PP1-TXRXtoXPDRA-XPDR1-XPDR1-NETWORK1']
1031 # XPDR_OUT=['XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX']
1032 # for i in range(0,nbLink):
1033 # nodeType=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
1034 # linkId=res['network'][0]['ietf-network-topology:link'][i]['link-id']
1035 # if(nodeType=='EXPRESS-LINK'):
1036 # find= linkId in expressLink
1037 # self.assertEqual(find, True)
1038 # expressLink.remove(linkId)
1039 # elif(nodeType=='ADD-LINK'):
1040 # find= linkId in addLink
1041 # self.assertEqual(find, True)
1042 # addLink.remove(linkId)
1043 # elif(nodeType=='DROP-LINK'):
1044 # find= linkId in dropLink
1045 # self.assertEqual(find, True)
1046 # dropLink.remove(linkId)
1047 # elif(nodeType=='XPONDER-INPUT'):
1048 # find= linkId in XPDR_IN
1049 # self.assertEqual(find, True)
1050 # XPDR_IN.remove(linkId)
1051 # elif(nodeType=='XPONDER-OUTPUT'):
1052 # find= linkId in XPDR_OUT
1053 # self.assertEqual(find, True)
1054 # XPDR_OUT.remove(linkId)
1056 # self.assertFalse(True)
1057 # self.assertEqual(len(expressLink),0)
1058 # self.assertEqual(len(addLink),0)
1059 # self.assertEqual(len(dropLink),0)
1060 # self.assertEqual(len(XPDR_IN),0)
1061 # self.assertEqual(len(XPDR_OUT),0)
1063 # for i in range(0,nbLink):
1064 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'],'ROADM-TO-ROADM')
1065 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX')
1066 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX')
1067 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG2-DEG1-CTP-TXRX')
1068 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG2-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX')
1069 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-DEG2-DEG2-CTP-TXRX')
1070 # self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG2-DEG2-CTP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX')
def test_32_getNodes_OpenRoadmTopology(self):
    # Check that openroadm-topology holds exactly the five XPDR-A1 / ROADM-A1
    # nodes with their expected termination points and supporting nodes, and
    # that no ROADM-C1 node is left in it.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding, so an error reply is reported as a
    # status mismatch rather than as a JSON decoding problem.
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Tests related to nodes
    nodes = res['network'][0]['node']
    self.assertEqual(len(nodes), 5)
    tp_key = 'ietf-network-topology:termination-point'
    tp_type_key = 'org-openroadm-common-network:tp-type'
    # node-id -> (node type, exact number of termination points or None
    # when the count is not checked, (tp-id, tp-type) pairs that must exist)
    roadm_expectations = {
        'ROADM-A1-SRG1': ('SRG', 5, [('SRG1-CP-TXRX', 'SRG-TXRX-CP'),
                                     ('SRG1-PP1-TXRX', 'SRG-TXRX-PP')]),
        'ROADM-A1-SRG3': ('SRG', 5, [('SRG3-CP-TXRX', 'SRG-TXRX-CP'),
                                     ('SRG3-PP1-TXRX', 'SRG-TXRX-PP')]),
        'ROADM-A1-DEG1': ('DEGREE', None, [('DEG1-TTP-TXRX', 'DEGREE-TXRX-TTP'),
                                           ('DEG1-CTP-TXRX', 'DEGREE-TXRX-CTP')]),
        'ROADM-A1-DEG2': ('DEGREE', None, [('DEG2-TTP-TXRX', 'DEGREE-TXRX-TTP'),
                                           ('DEG2-CTP-TXRX', 'DEGREE-TXRX-CTP')]),
    }
    listNode = ['XPDR-A1-XPDR1', 'ROADM-A1-SRG1', 'ROADM-A1-SRG3', 'ROADM-A1-DEG1', 'ROADM-A1-DEG2']
    for node in nodes:
        nodeType = node['org-openroadm-common-network:node-type']
        nodeId = node['node-id']
        if nodeId == 'XPDR-A1-XPDR1':
            # Tests related to XPDRA nodes
            for tp in node[tp_key]:
                tpid = tp['tp-id']
                if tpid == 'XPDR1-CLIENT1':
                    self.assertEqual(tp[tp_type_key], 'XPONDER-CLIENT')
                if tpid == 'XPDR1-NETWORK1':
                    self.assertEqual(tp[tp_type_key], 'XPONDER-NETWORK')
                    self.assertEqual(
                        tp['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'],
                        'ROADM-A1-SRG1--SRG1-PP1-TXRX')
            self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDR-A1'},
                          node['supporting-node'])
        elif nodeId in roadm_expectations:
            # Tests related to the SRG / DEGREE nodes of ROADM-A1
            expected_type, expected_tp_count, expected_tps = roadm_expectations[nodeId]
            self.assertEqual(nodeType, expected_type)
            if expected_tp_count is not None:
                self.assertEqual(len(node[tp_key]), expected_tp_count)
            for tp_id, tp_type in expected_tps:
                self.assertIn({'tp-id': tp_id, tp_type_key: tp_type}, node[tp_key])
            self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
                          node['supporting-node'])
        else:
            # Any other node id is an error.
            self.fail("unexpected node in openroadm-topology: {}".format(nodeId))
        # Raises ValueError, failing the test, if a node is reported twice.
        listNode.remove(nodeId)
    self.assertEqual(len(listNode), 0)
    # Test related to the nodes of ROADMC, which must be gone
    for node in nodes:
        self.assertNotEqual(node['node-id'], 'ROADM-C1-SRG1')
        self.assertNotEqual(node['node-id'], 'ROADM-C1-DEG1')
        self.assertNotEqual(node['node-id'], 'ROADM-C1-DEG2')
def test_33_getOpenRoadmNetwork(self):
    # Check that openroadm-network holds two nodes and that neither of them
    # is ROADM-C1 or ROADM-B1.
    url = ("{}/config/ietf-network:networks/network/openroadm-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    nodes = res['network'][0]['node']
    self.assertEqual(len(nodes), 2)
    # Inspect every node, including the last one.
    for node in nodes:
        self.assertNotEqual(node['node-id'], 'ROADM-C1')
        self.assertNotEqual(node['node-id'], 'ROADM-B1')
def test_34_getClliNetwork(self):
    # Check that clli-network holds a single node and that its clli is not
    # NodeC.
    url = ("{}/config/ietf-network:networks/network/clli-network"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    nodes = res['network'][0]['node']
    self.assertEqual(len(nodes), 1)
    # Check each reported node itself, not a fixed list position.
    for node in nodes:
        self.assertNotEqual(node['org-openroadm-clli-network:clli'], 'NodeC')
def test_35_disconnect_XPDRA(self):
    # Remove XPDR-A1 from topology-netconf; the DELETE must answer 200 OK.
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/XPDR-A1"
           .format(self.restconf_baseurl))
    # The request carries an empty JSON object as its body.
    data = {}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "DELETE", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_36_getClliNetwork(self):
    # clli-network must hold one single node whose clli is NodeA.
    clli_url = "{}/config/ietf-network:networks/network/clli-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET", clli_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    clli_nodes = payload['network'][0]['node']
    self.assertEqual(len(clli_nodes), 1)
    self.assertEqual(payload['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeA')
def test_37_getOpenRoadmNetwork(self):
    # openroadm-network must hold one single node, and it must not be
    # XPDR-A1.
    network_url = "{}/config/ietf-network:networks/network/openroadm-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET", network_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    node_count = len(payload['network'][0]['node'])
    self.assertEqual(node_count, 1)
    for index in range(node_count):
        current_id = payload['network'][0]['node'][index]['node-id']
        self.assertNotEqual(current_id, 'XPDR-A1')
def test_38_getNodes_OpenRoadmTopology(self):
    # Check that openroadm-topology holds exactly the four SRG / DEGREE
    # nodes of ROADM-A1, each supported by ROADM-A1 and exposing its
    # expected termination points.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    # Check the status before decoding, so an error reply is reported as a
    # status mismatch rather than as a JSON decoding problem.
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Tests related to nodes
    nodes = res['network'][0]['node']
    self.assertEqual(len(nodes), 4)
    tp_key = 'ietf-network-topology:termination-point'
    tp_type_key = 'org-openroadm-common-network:tp-type'
    # node-id -> (node type, exact number of termination points or None
    # when the count is not checked, (tp-id, tp-type) pairs that must exist)
    expectations = {
        'ROADM-A1-SRG1': ('SRG', 5, [('SRG1-CP-TXRX', 'SRG-TXRX-CP'),
                                     ('SRG1-PP1-TXRX', 'SRG-TXRX-PP')]),
        'ROADM-A1-SRG3': ('SRG', 5, [('SRG3-CP-TXRX', 'SRG-TXRX-CP'),
                                     ('SRG3-PP1-TXRX', 'SRG-TXRX-PP')]),
        'ROADM-A1-DEG1': ('DEGREE', None, [('DEG1-TTP-TXRX', 'DEGREE-TXRX-TTP'),
                                           ('DEG1-CTP-TXRX', 'DEGREE-TXRX-CTP')]),
        'ROADM-A1-DEG2': ('DEGREE', None, [('DEG2-TTP-TXRX', 'DEGREE-TXRX-TTP'),
                                           ('DEG2-CTP-TXRX', 'DEGREE-TXRX-CTP')]),
    }
    listNode = ['ROADM-A1-SRG1', 'ROADM-A1-SRG3', 'ROADM-A1-DEG1', 'ROADM-A1-DEG2']
    for node in nodes:
        self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
                      node['supporting-node'])
        nodeType = node['org-openroadm-common-network:node-type']
        nodeId = node['node-id']
        if nodeId not in expectations:
            # Any other node id is an error.
            self.fail("unexpected node in openroadm-topology: {}".format(nodeId))
        expected_type, expected_tp_count, expected_tps = expectations[nodeId]
        self.assertEqual(nodeType, expected_type)
        if expected_tp_count is not None:
            self.assertEqual(len(node[tp_key]), expected_tp_count)
        for tp_id, tp_type in expected_tps:
            self.assertIn({'tp-id': tp_id, tp_type_key: tp_type}, node[tp_key])
        # Raises ValueError, failing the test, if a node is reported twice.
        listNode.remove(nodeId)
    self.assertEqual(len(listNode), 0)
def test_39_disconnect_ROADM_XPDRA_link(self):
    # Delete both directions of the XPDR-A1 <-> ROADM-A1 SRG1 link from
    # openroadm-topology; each DELETE must answer 200 OK.
    headers = {'content-type': 'application/json'}
    # Both requests carry an empty JSON object as their body.
    data = {}
    # Direction XPDR-A1 -> ROADM-A1
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX"
           .format(self.restconf_baseurl))
    response = requests.request(
        "DELETE", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    # Direction ROADM-A1 -> XPDR-A1
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1"
           .format(self.restconf_baseurl))
    response = requests.request(
        "DELETE", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_40_getLinks_OpenRoadmTopology(self):
    # Check the 16 links of openroadm-topology: the expected express, add
    # and drop links of ROADM-A1 are each present once, six links are of
    # another type, and no XPONDER-INPUT / XPONDER-OUTPUT link is left.
    url = ("{}/config/ietf-network:networks/network/openroadm-topology"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    links = res['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 16)
    link_type_key = 'org-openroadm-common-network:link-type'
    # Expected link ids per link type; an id is removed once it is seen.
    expected_by_type = {
        'EXPRESS-LINK': ['ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
                         'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX'],
        'ADD-LINK': ['ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
                     'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
                     'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
                     'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX'],
        'DROP-LINK': ['ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
                      'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
                      'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
                      'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX'],
    }
    roadmtoroadmLink = 0
    for link in links:
        link_type = link[link_type_key]
        if link_type in expected_by_type:
            link_id = link['link-id']
            self.assertIn(link_id, expected_by_type[link_type])
            expected_by_type[link_type].remove(link_id)
        else:
            # Only links of the remaining types are counted here.
            roadmtoroadmLink += 1
    for remaining in expected_by_type.values():
        self.assertEqual(len(remaining), 0)
    self.assertEqual(roadmtoroadmLink, 6)
    # No link towards or from a transponder may remain.
    for link in links:
        self.assertNotEqual(link[link_type_key], 'XPONDER-OUTPUT')
        self.assertNotEqual(link[link_type_key], 'XPONDER-INPUT')
def test_41_disconnect_ROADMA(self):
    """Remove ROADM-A1 from topology-netconf and NodeA from the clli-network.

    Sends a RESTCONF DELETE for the ROADM-A1 node of topology-netconf,
    then a second DELETE for node NodeA of the clli-network; each request
    must return HTTP 200.
    """
    # Empty JSON object sent as the body of both DELETE requests; it has to
    # be defined here because json.dumps(data) is evaluated below.
    data = {}
    headers = {'content-type': 'application/json'}
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADM-A1"
           .format(self.restconf_baseurl))
    response = requests.request(
        "DELETE", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    # Delete in the clli-network
    url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeA"
           .format(self.restconf_baseurl))
    response = requests.request(
        "DELETE", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
def test_42_getClliNetwork(self):
    """Expect the clli-network to be readable and to hold no 'node' entry."""
    clli_url = "{}/config/ietf-network:networks/network/clli-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET", clli_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # The first (and only inspected) network of the reply must be node-free.
    clli_network = reply.json()['network'][0]
    self.assertNotIn('node', clli_network)
def test_43_getOpenRoadmNetwork(self):
    """Expect the openroadm-network to be readable and to hold no 'node' entry."""
    network_url = "{}/config/ietf-network:networks/network/openroadm-network".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET", network_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # The first (and only inspected) network of the reply must be node-free.
    openroadm_network = reply.json()['network'][0]
    self.assertNotIn('node', openroadm_network)
def test_44_check_roadm2roadm_link_persistence(self):
    """Expect the openroadm-topology to hold no 'node' entry but still 6 links."""
    topology_url = "{}/config/ietf-network:networks/network/openroadm-topology".format(
        self.restconf_baseurl)
    reply = requests.request(
        "GET", topology_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    topology = reply.json()['network'][0]
    # The link list is read before the node check, so a missing link list
    # surfaces first.
    remaining_links = len(topology['ietf-network-topology:link'])
    self.assertNotIn('node', topology)
    self.assertEqual(remaining_links, 6)
# Script entry point: run every test of this module with verbosity level 2.
if __name__ == "__main__":
    unittest.main(verbosity=2)