3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
18 from common import test_utils
21 class TransportPCETopologyTesting(unittest.TestCase):
24 restconf_baseurl = "http://localhost:8181/restconf"
28 cls.processes = test_utils.start_tpce()
29 cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmb', 'roadmc'])
32 def tearDownClass(cls):
33 for process in cls.processes:
34 test_utils.shutdown_process(process)
35 print("all processes killed")
40 def test_01_connect_ROADMA(self):
42 url = ("{}/config/network-topology:"
43 "network-topology/topology/topology-netconf/node/ROADMA01"
44 .format(self.restconf_baseurl))
46 "node-id": "ROADMA01",
47 "netconf-node-topology:username": "admin",
48 "netconf-node-topology:password": "admin",
49 "netconf-node-topology:host": "127.0.0.1",
50 "netconf-node-topology:port": test_utils.sims['roadma']['port'],
51 "netconf-node-topology:tcp-only": "false",
52 "netconf-node-topology:pass-through": {}}]}
53 headers = {'content-type': 'application/json'}
54 response = requests.request(
55 "PUT", url, data=json.dumps(data), headers=headers,
56 auth=('admin', 'admin'))
57 self.assertEqual(response.status_code, requests.codes.created)
60 def test_02_getClliNetwork(self):
61 url = ("{}/config/ietf-network:networks/network/clli-network"
62 .format(self.restconf_baseurl))
63 headers = {'content-type': 'application/json'}
64 response = requests.request(
65 "GET", url, headers=headers, auth=('admin', 'admin'))
66 self.assertEqual(response.status_code, requests.codes.ok)
68 self.assertEqual(res['network'][0]['node'][0]['node-id'], 'NodeA')
69 self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeA')
71 def test_03_getOpenRoadmNetwork(self):
72 url = ("{}/config/ietf-network:networks/network/openroadm-network"
73 .format(self.restconf_baseurl))
74 headers = {'content-type': 'application/json'}
75 response = requests.request(
76 "GET", url, headers=headers, auth=('admin', 'admin'))
77 self.assertEqual(response.status_code, requests.codes.ok)
79 self.assertEqual(res['network'][0]['node'][0]['node-id'], 'ROADMA01')
80 self.assertEqual(res['network'][0]['node'][0]['supporting-node'][0]['network-ref'], 'clli-network')
81 self.assertEqual(res['network'][0]['node'][0]['supporting-node'][0]['node-ref'], 'NodeA')
82 self.assertEqual(res['network'][0]['node'][0]['org-openroadm-common-network:node-type'], 'ROADM')
83 self.assertEqual(res['network'][0]['node'][0]['org-openroadm-network:model'], '2')
85 def test_04_getLinks_OpenroadmTopology(self):
86 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
87 .format(self.restconf_baseurl))
88 headers = {'content-type': 'application/json'}
89 response = requests.request(
90 "GET", url, headers=headers, auth=('admin', 'admin'))
91 self.assertEqual(response.status_code, requests.codes.ok)
93 # Tests related to links
94 nbLink = len(res['network'][0]['ietf-network-topology:link'])
95 self.assertEqual(nbLink, 10)
96 expressLink = ['ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
97 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX']
98 addLink = ['ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX', 'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
99 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX', 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX']
100 dropLink = ['ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX', 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
101 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX', 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX']
102 for i in range(0, nbLink):
103 linkId = res['network'][0]['ietf-network-topology:link'][i]['link-id']
104 if res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'EXPRESS-LINK':
105 find = linkId in expressLink
106 self.assertEqual(find, True)
107 expressLink.remove(linkId)
108 elif res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'ADD-LINK':
109 find = linkId in addLink
110 self.assertEqual(find, True)
111 addLink.remove(linkId)
112 elif res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'DROP-LINK':
113 find = linkId in dropLink
114 self.assertEqual(find, True)
115 dropLink.remove(linkId)
117 self.assertFalse(True)
118 self.assertEqual(len(expressLink), 0)
119 self.assertEqual(len(addLink), 0)
120 self.assertEqual(len(dropLink), 0)
122 def test_05_getNodes_OpenRoadmTopology(self):
123 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
124 .format(self.restconf_baseurl))
125 headers = {'content-type': 'application/json'}
126 response = requests.request("GET", url, headers=headers, auth=('admin', 'admin'))
127 res = response.json()
128 # Tests related to nodes
129 self.assertEqual(response.status_code, requests.codes.ok)
130 nbNode = len(res['network'][0]['node'])
131 self.assertEqual(nbNode, 4)
132 listNode = ['ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
133 for i in range(0, nbNode):
134 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
135 res['network'][0]['node'][i]['supporting-node'])
136 nodeType = res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
137 nodeId = res['network'][0]['node'][i]['node-id']
138 if nodeId == 'ROADMA01-SRG1':
139 # Test related to SRG1
140 self.assertEqual(nodeType, 'SRG')
141 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
142 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
143 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
144 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
145 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
146 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
147 res['network'][0]['node'][i]['supporting-node'])
148 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
149 res['network'][0]['node'][i]['supporting-node'])
150 listNode.remove(nodeId)
151 elif nodeId == 'ROADMA01-SRG3':
152 # Test related to SRG1
153 self.assertEqual(nodeType, 'SRG')
154 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
155 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
156 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
157 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
158 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
159 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
160 res['network'][0]['node'][i]['supporting-node'])
161 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
162 res['network'][0]['node'][i]['supporting-node'])
163 listNode.remove(nodeId)
164 elif nodeId == 'ROADMA01-DEG1':
165 # Test related to DEG1
166 self.assertEqual(nodeType, 'DEGREE')
167 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
168 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
169 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
170 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
171 listNode.remove(nodeId)
172 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
173 res['network'][0]['node'][i]['supporting-node'])
174 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
175 res['network'][0]['node'][i]['supporting-node'])
176 elif nodeId == 'ROADMA01-DEG2':
177 # Test related to DEG2
178 self.assertEqual(nodeType, 'DEGREE')
179 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
180 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
181 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
182 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
183 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
184 res['network'][0]['node'][i]['supporting-node'])
185 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
186 res['network'][0]['node'][i]['supporting-node'])
187 listNode.remove(nodeId)
189 self.assertFalse(True)
190 self.assertEqual(len(listNode), 0)
192 def test_06_connect_XPDRA(self):
193 url = ("{}/config/network-topology:"
194 "network-topology/topology/topology-netconf/node/XPDRA01"
195 .format(self.restconf_baseurl))
197 "node-id": "XPDRA01",
198 "netconf-node-topology:username": "admin",
199 "netconf-node-topology:password": "admin",
200 "netconf-node-topology:host": "127.0.0.1",
201 "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
202 "netconf-node-topology:tcp-only": "false",
203 "netconf-node-topology:pass-through": {}}]}
204 headers = {'content-type': 'application/json'}
205 response = requests.request(
206 "PUT", url, data=json.dumps(data), headers=headers,
207 auth=('admin', 'admin'))
208 self.assertEqual(response.status_code, requests.codes.created)
211 def test_07_getClliNetwork(self):
212 url = ("{}/config/ietf-network:networks/network/clli-network"
213 .format(self.restconf_baseurl))
214 headers = {'content-type': 'application/json'}
215 response = requests.request("GET", url, headers=headers, auth=('admin', 'admin'))
216 self.assertEqual(response.status_code, requests.codes.ok)
217 res = response.json()
218 self.assertEqual(res['network'][0]['node'][0]['node-id'], 'NodeA')
219 self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeA')
221 def test_08_getOpenRoadmNetwork(self):
222 url = ("{}/config/ietf-network:networks/network/openroadm-network"
223 .format(self.restconf_baseurl))
224 headers = {'content-type': 'application/json'}
225 response = requests.request(
226 "GET", url, headers=headers, auth=('admin', 'admin'))
227 self.assertEqual(response.status_code, requests.codes.ok)
228 res = response.json()
229 nbNode = len(res['network'][0]['node'])
230 self.assertEqual(nbNode, 2)
231 for i in range(0, nbNode):
232 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['network-ref'], 'clli-network')
233 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'], 'NodeA')
234 nodeId = res['network'][0]['node'][i]['node-id']
235 if(nodeId == 'XPDRA01'):
236 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'XPONDER')
237 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'], '1')
238 elif(nodeId == 'ROADMA01'):
239 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'ROADM')
240 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'], '2')
242 self.assertFalse(True)
244 def test_09_getNodes_OpenRoadmTopology(self):
245 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
246 .format(self.restconf_baseurl))
247 headers = {'content-type': 'application/json'}
248 response = requests.request(
249 "GET", url, headers=headers, auth=('admin', 'admin'))
250 res = response.json()
251 # Tests related to nodes
252 self.assertEqual(response.status_code, requests.codes.ok)
253 nbNode = len(res['network'][0]['node'])
254 self.assertEqual(nbNode, 5)
255 listNode = ['XPDRA01-XPDR1', 'ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
256 for i in range(0, nbNode):
257 nodeType = res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
258 nodeId = res['network'][0]['node'][i]['node-id']
259 # Tests related to XPDRA nodes
260 if(nodeId == 'XPDRA01-XPDR1'):
261 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA01'},
262 res['network'][0]['node'][i]['supporting-node'])
263 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
264 res['network'][0]['node'][i]['supporting-node'])
265 self.assertEqual(nodeType, 'XPONDER')
266 nbTps = len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
269 for j in range(0, nbTps):
270 tpType = res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['org-openroadm-common-network:tp-type']
271 tpId = res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
272 if (tpType == 'XPONDER-CLIENT'):
274 elif (tpType == 'XPONDER-NETWORK'):
276 if (tpId == 'XPDR1-NETWORK2'):
277 self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point']
278 [j]['transportpce-topology:associated-connection-map-port'], 'XPDR1-CLIENT3')
279 if (tpId == 'XPDR1-CLIENT3'):
280 self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point']
281 [j]['transportpce-topology:associated-connection-map-port'], 'XPDR1-NETWORK2')
282 self.assertTrue(client == 4)
283 self.assertTrue(network == 2)
284 listNode.remove(nodeId)
285 elif(nodeId == 'ROADMA01-SRG1'):
286 # Test related to SRG1
287 self.assertEqual(nodeType, 'SRG')
288 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
289 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
290 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
291 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
292 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
293 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
294 res['network'][0]['node'][i]['supporting-node'])
295 listNode.remove(nodeId)
296 elif(nodeId == 'ROADMA01-SRG3'):
297 # Test related to SRG1
298 self.assertEqual(nodeType, 'SRG')
299 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
300 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
301 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
302 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
303 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
304 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
305 res['network'][0]['node'][i]['supporting-node'])
306 listNode.remove(nodeId)
307 elif(nodeId == 'ROADMA01-DEG1'):
308 # Test related to DEG1
309 self.assertEqual(nodeType, 'DEGREE')
310 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
311 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
312 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
313 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
314 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
315 res['network'][0]['node'][i]['supporting-node'])
316 listNode.remove(nodeId)
317 elif(nodeId == 'ROADMA01-DEG2'):
318 # Test related to DEG2
319 self.assertEqual(nodeType, 'DEGREE')
320 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
321 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
322 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
323 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
324 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
325 res['network'][0]['node'][i]['supporting-node'])
326 listNode.remove(nodeId)
328 self.assertFalse(True)
329 self.assertEqual(len(listNode), 0)
331 # Connect the tail XPDRA to ROADMA and vice versa
332 def test_10_connect_tail_xpdr_rdm(self):
333 # Connect the tail: XPDRA to ROADMA
334 url = ("{}/operations/transportpce-networkutils:init-xpdr-rdm-links"
335 .format(self.restconf_baseurl))
336 data = {"networkutils:input": {
337 "networkutils:links-input": {
338 "networkutils:xpdr-node": "XPDRA01",
339 "networkutils:xpdr-num": "1",
340 "networkutils:network-num": "1",
341 "networkutils:rdm-node": "ROADMA01",
342 "networkutils:srg-num": "1",
343 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
347 headers = {'content-type': 'application/json'}
348 response = requests.request(
349 "POST", url, data=json.dumps(data), headers=headers,
350 auth=('admin', 'admin'))
351 self.assertEqual(response.status_code, requests.codes.ok)
353 def test_11_connect_tail_rdm_xpdr(self):
354 # Connect the tail: ROADMA to XPDRA
355 url = ("{}/operations/transportpce-networkutils:init-rdm-xpdr-links"
356 .format(self.restconf_baseurl))
357 data = {"networkutils:input": {
358 "networkutils:links-input": {
359 "networkutils:xpdr-node": "XPDRA01",
360 "networkutils:xpdr-num": "1",
361 "networkutils:network-num": "1",
362 "networkutils:rdm-node": "ROADMA01",
363 "networkutils:srg-num": "1",
364 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
368 headers = {'content-type': 'application/json'}
369 response = requests.request(
370 "POST", url, data=json.dumps(data), headers=headers,
371 auth=('admin', 'admin'))
372 self.assertEqual(response.status_code, requests.codes.ok)
374 def test_12_getLinks_OpenRoadmTopology(self):
375 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
376 .format(self.restconf_baseurl))
377 headers = {'content-type': 'application/json'}
378 response = requests.request(
379 "GET", url, headers=headers, auth=('admin', 'admin'))
380 self.assertEqual(response.status_code, requests.codes.ok)
381 res = response.json()
382 # Tests related to links
383 nbLink = len(res['network'][0]['ietf-network-topology:link'])
384 self.assertEqual(nbLink, 12)
385 expressLink = ['ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
386 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX']
387 addLink = ['ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX', 'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
388 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX', 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX']
389 dropLink = ['ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX', 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
390 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX', 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX']
391 XPDR_IN = ['ROADMA01-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
392 XPDR_OUT = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX']
393 for i in range(0, nbLink):
394 nodeType = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
395 linkId = res['network'][0]['ietf-network-topology:link'][i]['link-id']
396 if(nodeType == 'EXPRESS-LINK'):
397 find = linkId in expressLink
398 self.assertEqual(find, True)
399 expressLink.remove(linkId)
400 elif(nodeType == 'ADD-LINK'):
401 find = linkId in addLink
402 self.assertEqual(find, True)
403 addLink.remove(linkId)
404 elif(nodeType == 'DROP-LINK'):
405 find = linkId in dropLink
406 self.assertEqual(find, True)
407 dropLink.remove(linkId)
408 elif(nodeType == 'XPONDER-INPUT'):
409 find = linkId in XPDR_IN
410 self.assertEqual(find, True)
411 XPDR_IN.remove(linkId)
412 elif(nodeType == 'XPONDER-OUTPUT'):
413 find = linkId in XPDR_OUT
414 self.assertEqual(find, True)
415 XPDR_OUT.remove(linkId)
417 self.assertFalse(True)
418 self.assertEqual(len(expressLink), 0)
419 self.assertEqual(len(addLink), 0)
420 self.assertEqual(len(dropLink), 0)
421 self.assertEqual(len(XPDR_IN), 0)
422 self.assertEqual(len(XPDR_OUT), 0)
424 def test_13_connect_ROADMC(self):
426 url = ("{}/config/network-topology:"
427 "network-topology/topology/topology-netconf/node/ROADMC01"
428 .format(self.restconf_baseurl))
430 "node-id": "ROADMC01",
431 "netconf-node-topology:username": "admin",
432 "netconf-node-topology:password": "admin",
433 "netconf-node-topology:host": "127.0.0.1",
434 "netconf-node-topology:port": test_utils.sims['roadmc']['port'],
435 "netconf-node-topology:tcp-only": "false",
436 "netconf-node-topology:pass-through": {}}]}
437 headers = {'content-type': 'application/json'}
438 response = requests.request(
439 "PUT", url, data=json.dumps(data), headers=headers,
440 auth=('admin', 'admin'))
441 self.assertEqual(response.status_code, requests.codes.created)
444 def test_14_omsAttributes_ROADMA_ROADMC(self):
445 # Config ROADMA01-ROADMC01 oms-attributes
446 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
447 "link/ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
448 "OMS-attributes/span"
449 .format(self.restconf_baseurl))
451 "auto-spanloss": "true",
452 "engineered-spanloss": 12.2,
453 "link-concatenation": [{
456 "SRLG-length": 100000,
458 headers = {'content-type': 'application/json'}
459 response = requests.request(
460 "PUT", url, data=json.dumps(data), headers=headers,
461 auth=('admin', 'admin'))
462 self.assertEqual(response.status_code, requests.codes.created)
464 def test_15_omsAttributes_ROADMC_ROADMA(self):
465 # Config ROADMC01-ROADMA oms-attributes
466 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
467 "link/ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
468 "OMS-attributes/span"
469 .format(self.restconf_baseurl))
471 "auto-spanloss": "true",
472 "engineered-spanloss": 12.2,
473 "link-concatenation": [{
476 "SRLG-length": 100000,
478 headers = {'content-type': 'application/json'}
479 response = requests.request(
480 "PUT", url, data=json.dumps(data), headers=headers,
481 auth=('admin', 'admin'))
482 self.assertEqual(response.status_code, requests.codes.created)
484 def test_16_getClliNetwork(self):
485 url = ("{}/config/ietf-network:networks/network/clli-network"
486 .format(self.restconf_baseurl))
487 headers = {'content-type': 'application/json'}
488 response = requests.request(
489 "GET", url, headers=headers, auth=('admin', 'admin'))
490 self.assertEqual(response.status_code, requests.codes.ok)
491 res = response.json()
492 nbNode = len(res['network'][0]['node'])
493 listNode = ['NodeA', 'NodeC']
494 for i in range(0, nbNode):
495 nodeId = res['network'][0]['node'][i]['node-id']
496 find = nodeId in listNode
497 self.assertEqual(find, True)
498 if(nodeId == 'NodeA'):
499 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'], 'NodeA')
501 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'], 'NodeC')
502 listNode.remove(nodeId)
504 self.assertEqual(len(listNode), 0)
506 def test_17_getOpenRoadmNetwork(self):
507 url = ("{}/config/ietf-network:networks/network/openroadm-network"
508 .format(self.restconf_baseurl))
509 headers = {'content-type': 'application/json'}
510 response = requests.request(
511 "GET", url, headers=headers, auth=('admin', 'admin'))
512 self.assertEqual(response.status_code, requests.codes.ok)
513 res = response.json()
514 nbNode = len(res['network'][0]['node'])
515 self.assertEqual(nbNode, 3)
516 listNode = ['XPDRA01', 'ROADMA01', 'ROADMC01']
517 for i in range(0, nbNode):
518 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['network-ref'], 'clli-network')
519 nodeId = res['network'][0]['node'][i]['node-id']
520 if(nodeId == 'XPDRA01'):
521 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'], 'NodeA')
522 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'XPONDER')
523 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'], '1')
524 listNode.remove(nodeId)
525 elif(nodeId == 'ROADMA01'):
526 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'], 'NodeA')
527 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'ROADM')
528 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'], '2')
529 listNode.remove(nodeId)
530 elif(nodeId == 'ROADMC01'):
531 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'], 'NodeC')
532 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'ROADM')
533 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'], '2')
534 listNode.remove(nodeId)
536 self.assertFalse(True)
537 self.assertEqual(len(listNode), 0)
539 def test_18_getROADMLinkOpenRoadmTopology(self):
540 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
541 .format(self.restconf_baseurl))
542 headers = {'content-type': 'application/json'}
543 response = requests.request(
544 "GET", url, headers=headers, auth=('admin', 'admin'))
545 self.assertEqual(response.status_code, requests.codes.ok)
546 res = response.json()
547 # Tests related to links
548 nbLink = len(res['network'][0]['ietf-network-topology:link'])
549 self.assertEqual(nbLink, 20)
550 expressLink = ['ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX', 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
551 'ROADMC01-DEG2-DEG2-CTP-TXRXtoROADMC01-DEG1-DEG1-CTP-TXRX', 'ROADMC01-DEG1-DEG1-CTP-TXRXtoROADMC01-DEG2-DEG2-CTP-TXRX']
552 addLink = ['ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX', 'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
553 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX', 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
554 'ROADMC01-SRG1-SRG1-CP-TXRXtoROADMC01-DEG2-DEG2-CTP-TXRX', 'ROADMC01-SRG1-SRG1-CP-TXRXtoROADMC01-DEG1-DEG1-CTP-TXRX']
555 dropLink = ['ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX', 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
556 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX', 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
557 'ROADMC01-DEG1-DEG1-CTP-TXRXtoROADMC01-SRG1-SRG1-CP-TXRX', 'ROADMC01-DEG2-DEG2-CTP-TXRXtoROADMC01-SRG1-SRG1-CP-TXRX']
558 R2RLink = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
559 'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX']
560 XPDR_IN = ['ROADMA01-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
561 XPDR_OUT = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX']
562 for i in range(0, nbLink):
563 nodeType = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
564 linkId = res['network'][0]['ietf-network-topology:link'][i]['link-id']
565 if(nodeType == 'EXPRESS-LINK'):
566 find = linkId in expressLink
567 self.assertEqual(find, True)
568 expressLink.remove(linkId)
569 elif(nodeType == 'ADD-LINK'):
570 find = linkId in addLink
571 self.assertEqual(find, True)
572 addLink.remove(linkId)
573 elif(nodeType == 'DROP-LINK'):
574 find = linkId in dropLink
575 self.assertEqual(find, True)
576 dropLink.remove(linkId)
577 elif(nodeType == 'ROADM-TO-ROADM'):
578 find = linkId in R2RLink
579 self.assertEqual(find, True)
580 R2RLink.remove(linkId)
581 elif(nodeType == 'XPONDER-INPUT'):
582 find = linkId in XPDR_IN
583 self.assertEqual(find, True)
584 XPDR_IN.remove(linkId)
585 elif(nodeType == 'XPONDER-OUTPUT'):
586 find = linkId in XPDR_OUT
587 self.assertEqual(find, True)
588 XPDR_OUT.remove(linkId)
590 self.assertFalse(True)
591 self.assertEqual(len(expressLink), 0)
592 self.assertEqual(len(addLink), 0)
593 self.assertEqual(len(dropLink), 0)
594 self.assertEqual(len(R2RLink), 0)
595 self.assertEqual(len(XPDR_IN), 0)
596 self.assertEqual(len(XPDR_OUT), 0)
598 def test_19_getLinkOmsAttributesOpenRoadmTopology(self):
599 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
600 .format(self.restconf_baseurl))
601 headers = {'content-type': 'application/json'}
602 response = requests.request(
603 "GET", url, headers=headers, auth=('admin', 'admin'))
604 self.assertEqual(response.status_code, requests.codes.ok)
605 res = response.json()
606 # Tests related to links
607 nbLink = len(res['network'][0]['ietf-network-topology:link'])
608 self.assertEqual(nbLink, 20)
609 R2RLink = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
610 'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX']
611 for i in range(0, nbLink):
612 nodeType = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
613 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
614 if(link_id in R2RLink):
616 spanLoss = res['network'][0]['ietf-network-topology:link'][i][
617 'org-openroadm-network-topology:OMS-attributes']['span']["engineered-spanloss"]
618 length = res['network'][0]['ietf-network-topology:link'][i][
619 'org-openroadm-network-topology:OMS-attributes']['span']['link-concatenation'][0]['SRLG-length']
620 if((spanLoss != None) & (length != None)):
622 self.assertTrue(find)
623 R2RLink.remove(link_id)
624 self.assertEqual(len(R2RLink), 0)
626 def test_20_getNodes_OpenRoadmTopology(self):
627 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
628 .format(self.restconf_baseurl))
629 headers = {'content-type': 'application/json'}
630 response = requests.request(
631 "GET", url, headers=headers, auth=('admin', 'admin'))
632 res = response.json()
633 # Tests related to nodes
634 self.assertEqual(response.status_code, requests.codes.ok)
635 nbNode = len(res['network'][0]['node'])
636 self.assertEqual(nbNode, 8)
637 listNode = ['XPDRA01-XPDR1',
638 'ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2',
639 'ROADMC01-SRG1', 'ROADMC01-DEG1', 'ROADMC01-DEG2']
640 # ************************Tests related to XPDRA nodes
641 for i in range(0, nbNode):
642 nodeType = res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
643 nodeId = res['network'][0]['node'][i]['node-id']
644 if(nodeId == 'XPDRA01-XPDR1'):
645 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA01'},
646 res['network'][0]['node'][i]['supporting-node'])
647 self.assertEqual(nodeType, 'XPONDER')
648 nbTps = len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
649 self.assertTrue(nbTps == 6)
652 for j in range(0, nbTps):
653 tpType = res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['org-openroadm-common-network:tp-type']
654 if (tpType == 'XPONDER-CLIENT'):
656 elif (tpType == 'XPONDER-NETWORK'):
658 self.assertTrue(client == 4)
659 self.assertTrue(network == 2)
660 listNode.remove(nodeId)
661 elif(nodeId == 'ROADMA01-SRG1'):
662 # Test related to SRG1
663 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
664 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
665 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
666 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
667 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
668 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
669 res['network'][0]['node'][i]['supporting-node'])
670 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'SRG')
671 listNode.remove(nodeId)
672 elif(nodeId == 'ROADMA01-SRG3'):
673 # Test related to SRG1
674 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
675 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
676 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
677 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
678 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
679 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
680 res['network'][0]['node'][i]['supporting-node'])
681 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'SRG')
682 listNode.remove(nodeId)
683 elif(nodeId == 'ROADMA01-DEG1'):
684 # Test related to DEG1
685 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
686 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
687 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
688 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
689 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
690 res['network'][0]['node'][i]['supporting-node'])
691 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'DEGREE')
692 listNode.remove(nodeId)
693 elif(nodeId == 'ROADMA01-DEG2'):
694 # Test related to DEG2
695 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
696 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
697 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
698 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
699 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
700 res['network'][0]['node'][i]['supporting-node'])
701 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'DEGREE')
702 listNode.remove(nodeId)
703 elif(nodeId == 'ROADMC01-SRG1'):
704 # Test related to SRG1
705 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
706 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
707 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
708 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
709 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
710 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMC01'},
711 res['network'][0]['node'][i]['supporting-node'])
712 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'SRG')
713 listNode.remove(nodeId)
714 elif(nodeId == 'ROADMC01-DEG1'):
715 # Test related to DEG1
716 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
717 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
718 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
719 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
720 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMC01'},
721 res['network'][0]['node'][i]['supporting-node'])
722 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'DEGREE')
723 listNode.remove(nodeId)
724 elif(nodeId == 'ROADMC01-DEG2'):
725 # Test related to DEG2
726 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
727 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
728 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
729 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
730 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMC01'},
731 res['network'][0]['node'][i]['supporting-node'])
732 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'DEGREE')
733 listNode.remove(nodeId)
735 self.assertFalse(True)
736 self.assertEqual(len(listNode), 0)
738 def test_21_connect_ROADMB(self):
739 url = ("{}/config/network-topology:"
740 "network-topology/topology/topology-netconf/node/ROADMB01"
741 .format(self.restconf_baseurl))
743 "node-id": "ROADMB01",
744 "netconf-node-topology:username": "admin",
745 "netconf-node-topology:password": "admin",
746 "netconf-node-topology:host": "127.0.0.1",
747 "netconf-node-topology:port": test_utils.sims['roadmb']['port'],
748 "netconf-node-topology:tcp-only": "false",
749 "netconf-node-topology:pass-through": {}}]}
750 headers = {'content-type': 'application/json'}
751 response = requests.request(
752 "PUT", url, data=json.dumps(data), headers=headers,
753 auth=('admin', 'admin'))
754 self.assertEqual(response.status_code, requests.codes.created)
757 def test_22_omsAttributes_ROADMA_ROADMB(self):
758 # Config ROADMA01-ROADMB01 oms-attributes
759 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
760 "link/ROADMA01-DEG2-DEG2-TTP-TXRXtoROADMB01-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
761 "OMS-attributes/span"
762 .format(self.restconf_baseurl))
764 "auto-spanloss": "true",
765 "engineered-spanloss": 12.2,
766 "link-concatenation": [{
769 "SRLG-length": 100000,
771 headers = {'content-type': 'application/json'}
772 response = requests.request(
773 "PUT", url, data=json.dumps(data), headers=headers,
774 auth=('admin', 'admin'))
775 self.assertEqual(response.status_code, requests.codes.created)
777 def test_23_omsAttributes_ROADMB_ROADMA(self):
778 # Config ROADMB01-ROADMA01 oms-attributes
779 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
780 "link/ROADMB01-DEG1-DEG1-TTP-TXRXtoROADMA01-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
781 "OMS-attributes/span"
782 .format(self.restconf_baseurl))
784 "auto-spanloss": "true",
785 "engineered-spanloss": 12.2,
786 "link-concatenation": [{
789 "SRLG-length": 100000,
791 headers = {'content-type': 'application/json'}
792 response = requests.request(
793 "PUT", url, data=json.dumps(data), headers=headers,
794 auth=('admin', 'admin'))
795 self.assertEqual(response.status_code, requests.codes.created)
797 def test_24_omsAttributes_ROADMB_ROADMC(self):
798 # Config ROADMB01-ROADMC01 oms-attributes
799 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
800 "link/ROADMB01-DEG2-DEG2-TTP-TXRXtoROADMC01-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
801 "OMS-attributes/span"
802 .format(self.restconf_baseurl))
804 "auto-spanloss": "true",
805 "engineered-spanloss": 12.2,
806 "link-concatenation": [{
809 "SRLG-length": 100000,
811 headers = {'content-type': 'application/json'}
812 response = requests.request(
813 "PUT", url, data=json.dumps(data), headers=headers,
814 auth=('admin', 'admin'))
815 self.assertEqual(response.status_code, requests.codes.created)
817 def test_25_omsAttributes_ROADMC_ROADMB(self):
818 # Config ROADMC01-ROADMB01 oms-attributes
819 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
820 "link/ROADMC01-DEG1-DEG1-TTP-TXRXtoROADMB01-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
821 "OMS-attributes/span"
822 .format(self.restconf_baseurl))
824 "auto-spanloss": "true",
825 "engineered-spanloss": 12.2,
826 "link-concatenation": [{
829 "SRLG-length": 100000,
831 headers = {'content-type': 'application/json'}
832 response = requests.request(
833 "PUT", url, data=json.dumps(data), headers=headers,
834 auth=('admin', 'admin'))
835 self.assertEqual(response.status_code, requests.codes.created)
837 def test_26_getClliNetwork(self):
838 url = ("{}/config/ietf-network:networks/network/clli-network"
839 .format(self.restconf_baseurl))
840 headers = {'content-type': 'application/json'}
841 response = requests.request(
842 "GET", url, headers=headers, auth=('admin', 'admin'))
843 self.assertEqual(response.status_code, requests.codes.ok)
844 res = response.json()
845 nbNode = len(res['network'][0]['node'])
846 listNode = ['NodeA', 'NodeB', 'NodeC']
847 for i in range(0, nbNode):
848 nodeId = res['network'][0]['node'][i]['node-id']
849 find = nodeId in listNode
850 self.assertEqual(find, True)
851 if(nodeId == 'NodeA'):
852 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'], 'NodeA')
853 elif(nodeId == 'NodeB'):
854 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'], 'NodeB')
856 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'], 'NodeC')
857 listNode.remove(nodeId)
858 self.assertEqual(len(listNode), 0)
860 def test_27_verifyDegree(self):
861 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
862 .format(self.restconf_baseurl))
863 headers = {'content-type': 'application/json'}
864 response = requests.request(
865 "GET", url, headers=headers, auth=('admin', 'admin'))
866 self.assertEqual(response.status_code, requests.codes.ok)
867 res = response.json()
868 # Tests related to links
869 nbLink = len(res['network'][0]['ietf-network-topology:link'])
870 listR2RLink = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX', 'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX',
871 'ROADMA01-DEG2-DEG2-TTP-TXRXtoROADMB01-DEG1-DEG1-TTP-TXRX', 'ROADMC01-DEG1-DEG1-TTP-TXRXtoROADMB01-DEG2-DEG2-TTP-TXRX',
872 'ROADMB01-DEG1-DEG1-TTP-TXRXtoROADMA01-DEG2-DEG2-TTP-TXRX', 'ROADMB01-DEG2-DEG2-TTP-TXRXtoROADMC01-DEG1-DEG1-TTP-TXRX']
873 for i in range(0, nbLink):
874 if res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'ROADM-TO-ROADM':
875 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
876 find = link_id in listR2RLink
877 self.assertEqual(find, True)
878 listR2RLink.remove(link_id)
879 self.assertEqual(len(listR2RLink), 0)
881 def test_28_verifyOppositeLinkTopology(self):
882 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
883 .format(self.restconf_baseurl))
884 headers = {'content-type': 'application/json'}
885 response = requests.request(
886 "GET", url, headers=headers, auth=('admin', 'admin'))
887 self.assertEqual(response.status_code, requests.codes.ok)
888 res = response.json()
889 # Tests related to links
890 nbLink = len(res['network'][0]['ietf-network-topology:link'])
891 self.assertEqual(nbLink, 34)
892 for i in range(0, nbLink):
893 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
894 link_type = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
895 link_src = res['network'][0]['ietf-network-topology:link'][i]['source']['source-node']
896 link_dest = res['network'][0]['ietf-network-topology:link'][i]['destination']['dest-node']
897 oppLink_id = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:opposite-link']
898 # Find the opposite link
899 url_oppLink = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"+oppLink_id
900 url = (url_oppLink.format(self.restconf_baseurl))
901 headers = {'content-type': 'application/json'}
902 response_oppLink = requests.request("GET", url, headers=headers, auth=('admin', 'admin'))
903 self.assertEqual(response_oppLink.status_code, requests.codes.ok)
904 res_oppLink = response_oppLink.json()
905 self.assertEqual(res_oppLink['ietf-network-topology:link'][0]
906 ['org-openroadm-common-network:opposite-link'], link_id)
907 self.assertEqual(res_oppLink['ietf-network-topology:link'][0]['source']['source-node'], link_dest)
908 self.assertEqual(res_oppLink['ietf-network-topology:link'][0]['destination']['dest-node'], link_src)
909 oppLink_type = res_oppLink['ietf-network-topology:link'][0]['org-openroadm-common-network:link-type']
910 if link_type == 'ADD-LINK':
911 self.assertEqual(oppLink_type, 'DROP-LINK')
912 elif link_type == 'DROP-LINK':
913 self.assertEqual(oppLink_type, 'ADD-LINK')
914 elif link_type == 'EXPRESS-LINK':
915 self.assertEqual(oppLink_type, 'EXPRESS-LINK')
916 elif link_type == 'ROADM-TO-ROADM':
917 self.assertEqual(oppLink_type, 'ROADM-TO-ROADM')
918 elif link_type == 'XPONDER-INPUT':
919 self.assertEqual(oppLink_type, 'XPONDER-OUTPUT')
920 elif link_type == 'XPONDER-OUTPUT':
921 self.assertEqual(oppLink_type, 'XPONDER-INPUT')
923 def test_29_getLinkOmsAttributesOpenRoadmTopology(self):
924 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
925 .format(self.restconf_baseurl))
926 headers = {'content-type': 'application/json'}
927 response = requests.request(
928 "GET", url, headers=headers, auth=('admin', 'admin'))
929 self.assertEqual(response.status_code, requests.codes.ok)
930 res = response.json()
931 nbLink = len(res['network'][0]['ietf-network-topology:link'])
932 R2RLink = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
933 'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX',
934 'ROADMA01-DEG2-DEG2-TTP-TXRXtoROADMB01-DEG1-DEG1-TTP-TXRX',
935 'ROADMC01-DEG1-DEG1-TTP-TXRXtoROADMB01-DEG2-DEG2-TTP-TXRX',
936 'ROADMB01-DEG1-DEG1-TTP-TXRXtoROADMA01-DEG2-DEG2-TTP-TXRX',
937 'ROADMB01-DEG2-DEG2-TTP-TXRXtoROADMC01-DEG1-DEG1-TTP-TXRX']
938 for i in range(0, nbLink):
939 nodeType = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
940 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
941 if(link_id in R2RLink):
943 spanLoss = res['network'][0]['ietf-network-topology:link'][i][
944 'org-openroadm-network-topology:OMS-attributes']['span']["engineered-spanloss"]
945 length = res['network'][0]['ietf-network-topology:link'][i][
946 'org-openroadm-network-topology:OMS-attributes']['span']['link-concatenation'][0]['SRLG-length']
947 if((spanLoss != None) & (length != None)):
949 self.assertTrue(find)
950 R2RLink.remove(link_id)
951 self.assertEqual(len(R2RLink), 0)
953 def test_30_disconnect_ROADMB(self):
954 # Delete in the topology-netconf
955 url = ("{}/config/network-topology:"
956 "network-topology/topology/topology-netconf/node/ROADMB01"
957 .format(self.restconf_baseurl))
959 headers = {'content-type': 'application/json'}
960 response = requests.request(
961 "DELETE", url, data=json.dumps(data), headers=headers,
962 auth=('admin', 'admin'))
963 self.assertEqual(response.status_code, requests.codes.ok)
964 # Delete in the clli-network
965 url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeB"
966 .format(self.restconf_baseurl))
968 headers = {'content-type': 'application/json'}
969 response = requests.request(
970 "DELETE", url, data=json.dumps(data), headers=headers,
971 auth=('admin', 'admin'))
972 self.assertEqual(response.status_code, requests.codes.ok)
974 def test_31_disconnect_ROADMC(self):
975 # Delete in the topology-netconf
976 url = ("{}/config/network-topology:"
977 "network-topology/topology/topology-netconf/node/ROADMC01"
978 .format(self.restconf_baseurl))
980 headers = {'content-type': 'application/json'}
981 response = requests.request(
982 "DELETE", url, data=json.dumps(data), headers=headers,
983 auth=('admin', 'admin'))
984 self.assertEqual(response.status_code, requests.codes.ok)
985 # Delete in the clli-network
986 url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeC"
987 .format(self.restconf_baseurl))
989 headers = {'content-type': 'application/json'}
990 response = requests.request(
991 "DELETE", url, data=json.dumps(data), headers=headers,
992 auth=('admin', 'admin'))
993 self.assertEqual(response.status_code, requests.codes.ok)
995 def test_32_getNodes_OpenRoadmTopology(self):
996 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
997 .format(self.restconf_baseurl))
998 headers = {'content-type': 'application/json'}
999 response = requests.request(
1000 "GET", url, headers=headers, auth=('admin', 'admin'))
1001 res = response.json()
1002 # Tests related to nodes
1003 self.assertEqual(response.status_code, requests.codes.ok)
1004 nbNode = len(res['network'][0]['node'])
1005 self.assertEqual(nbNode, 5)
1006 listNode = ['XPDRA01-XPDR1', 'ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
1007 for i in range(0, nbNode):
1008 nodeType = res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
1009 nodeId = res['network'][0]['node'][i]['node-id']
1010 # Tests related to XPDRA nodes
1011 if(nodeId == 'XPDRA01-XPDR1'):
1012 nbTp = len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1013 for j in range(0, nbTp):
1014 tpid = res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
1015 if (tpid == 'XPDR1-CLIENT1'):
1016 self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]
1017 ['org-openroadm-common-network:tp-type'], 'XPONDER-CLIENT')
1018 if (tpid == 'XPDR1-NETWORK1'):
1019 self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]
1020 ['org-openroadm-common-network:tp-type'], 'XPONDER-NETWORK')
1021 self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]
1022 ['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'],
1023 'ROADMA01-SRG1--SRG1-PP1-TXRX')
1024 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA01'},
1025 res['network'][0]['node'][i]['supporting-node'])
1026 listNode.remove(nodeId)
1027 elif(nodeId == 'ROADMA01-SRG1'):
1028 # Test related to SRG1
1029 self.assertEqual(nodeType, 'SRG')
1030 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
1031 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
1032 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1033 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
1034 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1035 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
1036 res['network'][0]['node'][i]['supporting-node'])
1037 listNode.remove(nodeId)
1038 elif(nodeId == 'ROADMA01-SRG3'):
1039 # Test related to SRG1
1040 self.assertEqual(nodeType, 'SRG')
1041 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
1042 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
1043 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1044 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
1045 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1046 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
1047 res['network'][0]['node'][i]['supporting-node'])
1048 listNode.remove(nodeId)
1049 elif(nodeId == 'ROADMA01-DEG1'):
1050 # Test related to DEG1
1051 self.assertEqual(nodeType, 'DEGREE')
1052 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
1053 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1054 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
1055 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1056 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
1057 res['network'][0]['node'][i]['supporting-node'])
1058 listNode.remove(nodeId)
1059 elif(nodeId == 'ROADMA01-DEG2'):
1060 # Test related to DEG2
1061 self.assertEqual(nodeType, 'DEGREE')
1062 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
1063 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1064 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
1065 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1066 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
1067 res['network'][0]['node'][i]['supporting-node'])
1068 listNode.remove(nodeId)
1070 self.assertFalse(True)
1071 self.assertEqual(len(listNode), 0)
1072 # Test related to SRG1 of ROADMC
1073 for i in range(0, nbNode):
1074 self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'ROADMC01-SRG1')
1075 self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'ROADMC01-DEG1')
1076 self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'ROADMC01-DEG2')
1078 def test_33_getOpenRoadmNetwork(self):
1079 url = ("{}/config/ietf-network:networks/network/openroadm-network"
1080 .format(self.restconf_baseurl))
1081 headers = {'content-type': 'application/json'}
1082 response = requests.request(
1083 "GET", url, headers=headers, auth=('admin', 'admin'))
1084 self.assertEqual(response.status_code, requests.codes.ok)
1085 res = response.json()
1086 nbNode = len(res['network'][0]['node'])
1087 self.assertEqual(nbNode, 2)
1088 for i in range(0, nbNode-1):
1089 self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'ROADMC01')
1090 self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'ROADMB01')
1092 def test_34_getClliNetwork(self):
1093 url = ("{}/config/ietf-network:networks/network/clli-network"
1094 .format(self.restconf_baseurl))
1095 headers = {'content-type': 'application/json'}
1096 response = requests.request(
1097 "GET", url, headers=headers, auth=('admin', 'admin'))
1098 self.assertEqual(response.status_code, requests.codes.ok)
1099 res = response.json()
1100 nbNode = len(res['network'][0]['node'])
1101 self.assertEqual(nbNode, 1)
1102 for i in range(0, nbNode-1):
1103 self.assertNotEqual(res['network'][0]['node'][1]['org-openroadm-clli-network:clli'], 'NodeC')
1105 def test_35_disconnect_XPDRA(self):
1106 url = ("{}/config/network-topology:"
1107 "network-topology/topology/topology-netconf/node/XPDRA01"
1108 .format(self.restconf_baseurl))
1110 headers = {'content-type': 'application/json'}
1111 response = requests.request(
1112 "DELETE", url, data=json.dumps(data), headers=headers,
1113 auth=('admin', 'admin'))
1114 self.assertEqual(response.status_code, requests.codes.ok)
1116 def test_36_getClliNetwork(self):
1117 url = ("{}/config/ietf-network:networks/network/clli-network"
1118 .format(self.restconf_baseurl))
1119 headers = {'content-type': 'application/json'}
1120 response = requests.request(
1121 "GET", url, headers=headers, auth=('admin', 'admin'))
1122 self.assertEqual(response.status_code, requests.codes.ok)
1123 res = response.json()
1124 nbNode = len(res['network'][0]['node'])
1125 self.assertEqual(nbNode, 1)
1126 self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeA')
1128 def test_37_getOpenRoadmNetwork(self):
1129 url = ("{}/config/ietf-network:networks/network/openroadm-network"
1130 .format(self.restconf_baseurl))
1131 headers = {'content-type': 'application/json'}
1132 response = requests.request(
1133 "GET", url, headers=headers, auth=('admin', 'admin'))
1134 self.assertEqual(response.status_code, requests.codes.ok)
1135 res = response.json()
1136 nbNode = len(res['network'][0]['node'])
1137 self.assertEqual(nbNode, 1)
1138 for i in range(0, nbNode):
1139 self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'XPDRA01')
1141 def test_38_getNodes_OpenRoadmTopology(self):
1142 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1143 .format(self.restconf_baseurl))
1144 headers = {'content-type': 'application/json'}
1145 response = requests.request(
1146 "GET", url, headers=headers, auth=('admin', 'admin'))
1147 res = response.json()
1148 # Tests related to nodes
1149 self.assertEqual(response.status_code, requests.codes.ok)
1150 nbNode = len(res['network'][0]['node'])
1151 self.assertEqual(nbNode, 4)
1152 listNode = ['ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
1153 for i in range(0, nbNode):
1154 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
1155 res['network'][0]['node'][i]['supporting-node'])
1156 nodeType = res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
1157 nodeId = res['network'][0]['node'][i]['node-id']
1158 if(nodeId == 'ROADMA01-SRG1'):
1159 # Test related to SRG1
1160 self.assertEqual(nodeType, 'SRG')
1161 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
1162 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
1163 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1164 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
1165 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1166 listNode.remove(nodeId)
1167 elif(nodeId == 'ROADMA01-SRG3'):
1168 # Test related to SRG1
1169 self.assertEqual(nodeType, 'SRG')
1170 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
1171 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
1172 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1173 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
1174 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1175 listNode.remove(nodeId)
1176 elif(nodeId == 'ROADMA01-DEG1'):
1177 # Test related to DEG1
1178 self.assertEqual(nodeType, 'DEGREE')
1179 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
1180 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1181 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
1182 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1183 listNode.remove(nodeId)
1184 elif(nodeId == 'ROADMA01-DEG2'):
1185 # Test related to DEG2
1186 self.assertEqual(nodeType, 'DEGREE')
1187 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
1188 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1189 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
1190 res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1191 listNode.remove(nodeId)
1193 self.assertFalse(True)
1194 self.assertEqual(len(listNode), 0)
1196 def test_39_disconnect_ROADM_XPDRA_link(self):
1198 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
1199 "link/XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX"
1200 .format(self.restconf_baseurl))
1202 headers = {'content-type': 'application/json'}
1203 response = requests.request(
1204 "DELETE", url, data=json.dumps(data), headers=headers,
1205 auth=('admin', 'admin'))
1206 self.assertEqual(response.status_code, requests.codes.ok)
1208 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
1209 "link/ROADMA01-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1"
1210 .format(self.restconf_baseurl))
1212 headers = {'content-type': 'application/json'}
1213 response = requests.request(
1214 "DELETE", url, data=json.dumps(data), headers=headers,
1215 auth=('admin', 'admin'))
1216 self.assertEqual(response.status_code, requests.codes.ok)
1218 def test_40_getLinks_OpenRoadmTopology(self):
1219 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1220 .format(self.restconf_baseurl))
1221 headers = {'content-type': 'application/json'}
1222 response = requests.request(
1223 "GET", url, headers=headers, auth=('admin', 'admin'))
1224 self.assertEqual(response.status_code, requests.codes.ok)
1225 res = response.json()
1226 nbLink = len(res['network'][0]['ietf-network-topology:link'])
1227 self.assertEqual(nbLink, 16)
1228 expressLink = ['ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
1229 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX']
1230 addLink = ['ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX', 'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
1231 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX', 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX']
1232 dropLink = ['ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX', 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
1233 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX', 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX']
1234 roadmtoroadmLink = 0
1235 for i in range(0, nbLink):
1236 if (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'EXPRESS-LINK'):
1237 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1238 find = link_id in expressLink
1239 self.assertEqual(find, True)
1240 expressLink.remove(link_id)
1241 elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'ADD-LINK'):
1242 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1243 find = link_id in addLink
1244 self.assertEqual(find, True)
1245 addLink.remove(link_id)
1246 elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'DROP-LINK'):
1247 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1248 find = link_id in dropLink
1249 self.assertEqual(find, True)
1250 dropLink.remove(link_id)
1252 roadmtoroadmLink += 1
1253 self.assertEqual(len(expressLink), 0)
1254 self.assertEqual(len(addLink), 0)
1255 self.assertEqual(len(dropLink), 0)
1256 self.assertEqual(roadmtoroadmLink, 6)
1257 for i in range(0, nbLink):
1258 self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]
1259 ['org-openroadm-common-network:link-type'], 'XPONDER-OUTPUT')
1260 self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]
1261 ['org-openroadm-common-network:link-type'], 'XPONDER-INPUT')
1263 def test_41_disconnect_ROADMA(self):
1264 url = ("{}/config/network-topology:"
1265 "network-topology/topology/topology-netconf/node/ROADMA01"
1266 .format(self.restconf_baseurl))
1268 headers = {'content-type': 'application/json'}
1269 response = requests.request(
1270 "DELETE", url, data=json.dumps(data), headers=headers,
1271 auth=('admin', 'admin'))
1272 self.assertEqual(response.status_code, requests.codes.ok)
1273 # Delete in the clli-network
1274 url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeA"
1275 .format(self.restconf_baseurl))
1277 headers = {'content-type': 'application/json'}
1278 response = requests.request(
1279 "DELETE", url, data=json.dumps(data), headers=headers,
1280 auth=('admin', 'admin'))
1281 self.assertEqual(response.status_code, requests.codes.ok)
1283 def test_42_getClliNetwork(self):
1284 url = ("{}/config/ietf-network:networks/network/clli-network"
1285 .format(self.restconf_baseurl))
1286 headers = {'content-type': 'application/json'}
1287 response = requests.request(
1288 "GET", url, headers=headers, auth=('admin', 'admin'))
1289 self.assertEqual(response.status_code, requests.codes.ok)
1290 res = response.json()
1291 self.assertNotIn('node', res['network'][0])
1293 def test_43_getOpenRoadmNetwork(self):
1294 url = ("{}/config/ietf-network:networks/network/openroadm-network"
1295 .format(self.restconf_baseurl))
1296 headers = {'content-type': 'application/json'}
1297 response = requests.request(
1298 "GET", url, headers=headers, auth=('admin', 'admin'))
1299 self.assertEqual(response.status_code, requests.codes.ok)
1300 res = response.json()
1301 self.assertNotIn('node', res['network'][0])
1303 def test_44_check_roadm2roadm_link_persistence(self):
1304 url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1305 .format(self.restconf_baseurl))
1306 headers = {'content-type': 'application/json'}
1307 response = requests.request(
1308 "GET", url, headers=headers, auth=('admin', 'admin'))
1309 self.assertEqual(response.status_code, requests.codes.ok)
1310 res = response.json()
1311 nbLink = len(res['network'][0]['ietf-network-topology:link'])
1312 self.assertNotIn('node', res['network'][0])
1313 self.assertEqual(nbLink, 6)
1316 if __name__ == "__main__":
1317 unittest.main(verbosity=2)