3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
19 # pylint: disable=wrong-import-order
21 sys.path.append('transportpce_tests/common/')
22 # pylint: disable=wrong-import-position
23 # pylint: disable=import-error
24 import test_utils # nopep8
25 import test_utils_rfc8040 # nopep8
28 class TransportPCETopologyTesting(unittest.TestCase):
31 NODE_VERSION = '1.2.1'
35 'checks_tp': [({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
36 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP',
37 'org-openroadm-common-network:operational-state': 'inService'}),
38 ({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
39 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP',
40 'org-openroadm-common-network:operational-state': 'inService'})]
44 'checks_tp': [({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
45 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP',
46 'org-openroadm-common-network:operational-state': 'inService'}),
47 ({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
48 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP',
49 'org-openroadm-common-network:operational-state': 'inService'})]
52 'node_type': 'DEGREE',
53 'checks_tp': [({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
54 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP',
55 'org-openroadm-common-network:operational-state': 'inService'}),
56 ({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
57 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP',
58 'org-openroadm-common-network:operational-state': 'inService'})]
61 'node_type': 'DEGREE',
62 'checks_tp': [({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
63 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP',
64 'org-openroadm-common-network:operational-state': 'inService'}),
65 ({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
66 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP',
67 'org-openroadm-common-network:operational-state': 'inService'})]
73 'checks_tp': [({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
74 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP',
75 'org-openroadm-common-network:operational-state': 'inService'}),
76 ({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
77 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP',
78 'org-openroadm-common-network:operational-state': 'inService'})]
81 'node_type': 'DEGREE',
82 'checks_tp': [({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
83 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP',
84 'org-openroadm-common-network:operational-state': 'inService'}),
85 ({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
86 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP',
87 'org-openroadm-common-network:operational-state': 'inService'})]
90 'node_type': 'DEGREE',
91 'checks_tp': [({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
92 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP',
93 'org-openroadm-common-network:operational-state': 'inService'}),
94 ({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
95 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP',
96 'org-openroadm-common-network:operational-state': 'inService'})]
102 cls.processes = test_utils_rfc8040.start_tpce()
103 cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION),
104 ('roadmb', cls.NODE_VERSION), ('roadmc', cls.NODE_VERSION)])
107 def tearDownClass(cls):
108 # pylint: disable=not-an-iterable
109 for process in cls.processes:
110 test_utils_rfc8040.shutdown_process(process)
111 print("all processes killed")
116 def test_01_connect_ROADMA(self):
117 response = test_utils_rfc8040.mount_device("ROADMA01", ('roadma', self.NODE_VERSION))
118 self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
120 def test_02_getClliNetwork(self):
121 response = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
122 self.assertEqual(response['status_code'], requests.codes.ok)
123 self.assertEqual(response['network'][0]['node'][0]['node-id'], 'NodeA')
124 self.assertEqual(response['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeA')
126 def test_03_getOpenRoadmNetwork(self):
127 response = test_utils_rfc8040.get_ietf_network_request('openroadm-network', 'config')
128 self.assertEqual(response['status_code'], requests.codes.ok)
129 self.assertEqual(response['network'][0]['node'][0]['node-id'], 'ROADMA01')
130 self.assertEqual(response['network'][0]['node'][0]['supporting-node'][0]['network-ref'], 'clli-network')
131 self.assertEqual(response['network'][0]['node'][0]['supporting-node'][0]['node-ref'], 'NodeA')
132 self.assertEqual(response['network'][0]['node'][0]['org-openroadm-common-network:node-type'], 'ROADM')
133 self.assertEqual(response['network'][0]['node'][0]['org-openroadm-network:model'], '2')
135 def test_04_getLinks_OpenroadmTopology(self):
136 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
137 self.assertEqual(response['status_code'], requests.codes.ok)
138 # Tests related to links
139 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 10)
140 check_list = {'EXPRESS-LINK': ['ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
141 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX'],
142 'ADD-LINK': ['ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
143 'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
144 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
145 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX'],
146 'DROP-LINK': ['ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
147 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
148 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
149 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX']
151 for val in response['network'][0]['ietf-network-topology:link']:
152 linkId = val['link-id']
153 linkType = val['org-openroadm-common-network:link-type']
154 self.assertIn(linkType, check_list)
155 find = linkId in check_list[linkType]
156 self.assertEqual(find, True)
157 (check_list[linkType]).remove(linkId)
158 for val in check_list.values():
159 self.assertEqual(len(val), 0)
161 def test_05_getNodes_OpenRoadmTopology(self):
162 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
163 self.assertEqual(response['status_code'], requests.codes.ok)
164 self.assertEqual(len(response['network'][0]['node']), 4)
165 listNode = ['ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
166 for val in response['network'][0]['node']:
167 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'}, val['supporting-node'])
168 nodeType = val['org-openroadm-common-network:node-type']
169 nodeId = val['node-id']
170 self.assertIn(nodeId, self.CHECK_DICT1)
171 self.assertEqual(nodeType, self.CHECK_DICT1[nodeId]['node_type'])
172 if self.CHECK_DICT1[nodeId]['node_type'] == 'SRG':
173 self.assertEqual(len(val['ietf-network-topology:termination-point']), 17)
174 for item in self.CHECK_DICT1[nodeId]['checks_tp']:
175 self.assertIn(item, val['ietf-network-topology:termination-point'])
176 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'}, val['supporting-node'])
177 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'}, val['supporting-node'])
178 listNode.remove(nodeId)
180 self.assertEqual(len(listNode), 0)
182 def test_06_connect_XPDRA(self):
183 response = test_utils_rfc8040.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION))
184 self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
186 def test_07_getClliNetwork(self):
187 response = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
188 self.assertEqual(response['status_code'], requests.codes.ok)
189 self.assertEqual(response['network'][0]['node'][0]['node-id'], 'NodeA')
190 self.assertEqual(response['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeA')
192 def test_08_getOpenRoadmNetwork(self):
193 # pylint: disable=redundant-unittest-assert
194 response = test_utils_rfc8040.get_ietf_network_request('openroadm-network', 'config')
195 self.assertEqual(response['status_code'], requests.codes.ok)
196 self.assertEqual(len(response['network'][0]['node']), 2)
197 for val in response['network'][0]['node']:
198 self.assertEqual(val['supporting-node'][0]['network-ref'], 'clli-network')
199 self.assertEqual(val['supporting-node'][0]['node-ref'], 'NodeA')
200 nodeId = val['node-id']
201 if nodeId == 'XPDRA01':
202 self.assertEqual(val['org-openroadm-common-network:node-type'], 'XPONDER')
203 self.assertEqual(val['org-openroadm-network:model'], '1')
204 elif nodeId == 'ROADMA01':
205 self.assertEqual(val['org-openroadm-common-network:node-type'], 'ROADM')
206 self.assertEqual(val['org-openroadm-network:model'], '2')
208 self.assertFalse(True)
210 def test_09_getNodes_OpenRoadmTopology(self):
211 # pylint: disable=redundant-unittest-assert
212 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
213 self.assertEqual(response['status_code'], requests.codes.ok)
214 self.assertEqual(len(response['network'][0]['node']), 5)
215 listNode = ['XPDRA01-XPDR1', 'ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
216 for val in response['network'][0]['node']:
217 nodeType = val['org-openroadm-common-network:node-type']
218 nodeId = val['node-id']
219 # Tests related to XPDRA nodes
220 if nodeId == 'XPDRA01-XPDR1':
221 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA01'}, val['supporting-node'])
222 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'}, val['supporting-node'])
223 self.assertEqual(nodeType, 'XPONDER')
226 for val2 in val['ietf-network-topology:termination-point']:
227 tpType = val2['org-openroadm-common-network:tp-type']
229 if tpType == 'XPONDER-CLIENT':
231 elif tpType == 'XPONDER-NETWORK':
233 if tpId == 'XPDR1-NETWORK2':
234 self.assertEqual(val2['transportpce-topology:associated-connection-map-port'], 'XPDR1-CLIENT3')
235 elif tpId == 'XPDR1-CLIENT3':
236 self.assertEqual(val2['transportpce-topology:associated-connection-map-port'], 'XPDR1-NETWORK2')
237 self.assertTrue(client == 4)
238 self.assertTrue(network == 2)
239 listNode.remove(nodeId)
240 # Tests related to ROADMA nodes
241 elif nodeId in self.CHECK_DICT1:
242 self.assertEqual(nodeType, self.CHECK_DICT1[nodeId]['node_type'])
243 if self.CHECK_DICT1[nodeId]['node_type'] == 'SRG':
244 self.assertEqual(len(val['ietf-network-topology:termination-point']), 17)
245 for item in self.CHECK_DICT1[nodeId]['checks_tp']:
246 self.assertIn(item, val['ietf-network-topology:termination-point'])
247 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'}, val['supporting-node'])
248 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'}, val['supporting-node'])
249 listNode.remove(nodeId)
251 self.assertFalse(True)
252 self.assertEqual(len(listNode), 0)
254 # Connect the tail XPDRA to ROADMA and vice versa
255 def test_10_connect_tail_xpdr_rdm(self):
256 # Connect the tail: XPDRA to ROADMA
257 response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "1",
258 "ROADMA01", "1", "SRG1-PP1-TXRX")
259 self.assertEqual(response.status_code, requests.codes.ok)
261 def test_11_connect_tail_rdm_xpdr(self):
262 # Connect the tail: ROADMA to XPDRA
263 response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "1",
264 "ROADMA01", "1", "SRG1-PP1-TXRX")
265 self.assertEqual(response.status_code, requests.codes.ok)
267 def test_12_getLinks_OpenRoadmTopology(self):
268 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
269 self.assertEqual(response['status_code'], requests.codes.ok)
270 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 12)
271 check_list = {'EXPRESS-LINK': ['ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
272 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX'],
273 'ADD-LINK': ['ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
274 'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
275 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
276 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX'],
277 'DROP-LINK': ['ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
278 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
279 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
280 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX'],
281 'XPONDER-INPUT': ['ROADMA01-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1'],
282 'XPONDER-OUTPUT': ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX']
284 for val in response['network'][0]['ietf-network-topology:link']:
285 linkId = val['link-id']
286 linkType = val['org-openroadm-common-network:link-type']
287 self.assertIn(linkType, check_list)
288 find = linkId in check_list[linkType]
289 self.assertEqual(find, True)
290 (check_list[linkType]).remove(linkId)
291 for val in check_list.values():
292 self.assertEqual(len(val), 0)
294 def test_13_connect_ROADMC(self):
295 response = test_utils_rfc8040.mount_device("ROADMC01", ('roadmc', self.NODE_VERSION))
296 self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
298 def test_14_omsAttributes_ROADMA_ROADMC(self):
299 # Config ROADMA01-ROADMC01 oms-attributes
301 "auto-spanloss": "true",
302 "engineered-spanloss": 12.2,
303 "link-concatenation": [{
306 "SRLG-length": 100000,
308 response = test_utils_rfc8040.add_oms_attr_request(
309 "ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX", data)
310 self.assertEqual(response.status_code, requests.codes.created)
312 def test_15_omsAttributes_ROADMC_ROADMA(self):
313 # Config ROADMC01-ROADMA oms-attributes
315 "auto-spanloss": "true",
316 "engineered-spanloss": 12.2,
317 "link-concatenation": [{
320 "SRLG-length": 100000,
322 response = test_utils_rfc8040.add_oms_attr_request(
323 "ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX", data)
324 self.assertEqual(response.status_code, requests.codes.created)
326 def test_16_getClliNetwork(self):
327 response = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
328 self.assertEqual(response['status_code'], requests.codes.ok)
329 listNode = ['NodeA', 'NodeC']
330 for val in response['network'][0]['node']:
331 nodeId = val['node-id']
332 self.assertIn(nodeId, listNode)
333 self.assertEqual(val['org-openroadm-clli-network:clli'], nodeId)
334 listNode.remove(nodeId)
335 self.assertEqual(len(listNode), 0)
337 def test_17_getOpenRoadmNetwork(self):
338 response = test_utils_rfc8040.get_ietf_network_request('openroadm-network', 'config')
339 self.assertEqual(response['status_code'], requests.codes.ok)
340 self.assertEqual(len(response['network'][0]['node']), 3)
341 listNode = ['XPDRA01', 'ROADMA01', 'ROADMC01']
342 CHECK_LIST = {'XPDRA01': {'node-ref': 'NodeA', 'node-type': 'XPONDER', 'model': '1'},
343 'ROADMA01': {'node-ref': 'NodeA', 'node-type': 'ROADM', 'model': '2'},
344 'ROADMC01': {'node-ref': 'NodeC', 'node-type': 'ROADM', 'model': '2'}
346 for val in response['network'][0]['node']:
347 self.assertEqual(val['supporting-node'][0]['network-ref'], 'clli-network')
348 nodeId = val['node-id']
349 self.assertIn(nodeId, CHECK_LIST)
350 self.assertEqual(val['supporting-node'][0]['node-ref'],
351 CHECK_LIST[nodeId]['node-ref'])
352 self.assertEqual(val['org-openroadm-common-network:node-type'],
353 CHECK_LIST[nodeId]['node-type'])
354 self.assertEqual(val['org-openroadm-network:model'],
355 CHECK_LIST[nodeId]['model'])
356 listNode.remove(nodeId)
357 self.assertEqual(len(listNode), 0)
359 def test_18_getROADMLinkOpenRoadmTopology(self):
360 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
361 self.assertEqual(response['status_code'], requests.codes.ok)
362 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 20)
363 check_list = {'EXPRESS-LINK': ['ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
364 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
365 'ROADMC01-DEG2-DEG2-CTP-TXRXtoROADMC01-DEG1-DEG1-CTP-TXRX',
366 'ROADMC01-DEG1-DEG1-CTP-TXRXtoROADMC01-DEG2-DEG2-CTP-TXRX'],
367 'ADD-LINK': ['ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
368 'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
369 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
370 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
371 'ROADMC01-SRG1-SRG1-CP-TXRXtoROADMC01-DEG2-DEG2-CTP-TXRX',
372 'ROADMC01-SRG1-SRG1-CP-TXRXtoROADMC01-DEG1-DEG1-CTP-TXRX'],
373 'DROP-LINK': ['ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
374 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
375 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
376 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
377 'ROADMC01-DEG1-DEG1-CTP-TXRXtoROADMC01-SRG1-SRG1-CP-TXRX',
378 'ROADMC01-DEG2-DEG2-CTP-TXRXtoROADMC01-SRG1-SRG1-CP-TXRX'],
379 'ROADM-TO-ROADM': ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
380 'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX'],
381 'XPONDER-INPUT': ['ROADMA01-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1'],
382 'XPONDER-OUTPUT': ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX']
384 for val in response['network'][0]['ietf-network-topology:link']:
385 linkId = val['link-id']
386 linkType = val['org-openroadm-common-network:link-type']
387 self.assertIn(linkType, check_list)
388 find = linkId in check_list[linkType]
389 self.assertEqual(find, True)
390 (check_list[linkType]).remove(linkId)
391 for val in check_list.values():
392 self.assertEqual(len(val), 0)
394 def test_19_getLinkOmsAttributesOpenRoadmTopology(self):
395 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
396 self.assertEqual(response['status_code'], requests.codes.ok)
397 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 20)
398 R2RLink = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
399 'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX']
400 for val in response['network'][0]['ietf-network-topology:link']:
401 link_id = val['link-id']
402 if link_id in R2RLink:
404 spanLoss = val['org-openroadm-network-topology:OMS-attributes']['span']['engineered-spanloss']
405 # pylint: disable=line-too-long
406 length = val['org-openroadm-network-topology:OMS-attributes']['span']['link-concatenation'][0]['SRLG-length']
407 if (spanLoss is not None) & (length is not None):
409 self.assertTrue(find)
410 R2RLink.remove(link_id)
411 self.assertEqual(len(R2RLink), 0)
413 def test_20_getNodes_OpenRoadmTopology(self):
414 # pylint: disable=redundant-unittest-assert
415 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
416 self.assertEqual(response['status_code'], requests.codes.ok)
417 self.assertEqual(len(response['network'][0]['node']), 8)
418 listNode = ['XPDRA01-XPDR1',
419 'ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2',
420 'ROADMC01-SRG1', 'ROADMC01-DEG1', 'ROADMC01-DEG2']
421 for val in response['network'][0]['node']:
422 nodeType = val['org-openroadm-common-network:node-type']
423 nodeId = val['node-id']
424 # Tests related to XPDRA nodes
425 if nodeId == 'XPDRA01-XPDR1':
426 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA01'}, val['supporting-node'])
427 self.assertEqual(nodeType, 'XPONDER')
428 self.assertEqual(len(val['ietf-network-topology:termination-point']), 6)
431 for val2 in val['ietf-network-topology:termination-point']:
432 tpType = val2['org-openroadm-common-network:tp-type']
433 if tpType == 'XPONDER-CLIENT':
435 elif tpType == 'XPONDER-NETWORK':
437 self.assertTrue(client == 4)
438 self.assertTrue(network == 2)
439 listNode.remove(nodeId)
440 # Tests related to ROADMA nodes
441 elif nodeId in self.CHECK_DICT1:
442 self.assertEqual(nodeType, self.CHECK_DICT1[nodeId]['node_type'])
443 if self.CHECK_DICT1[nodeId]['node_type'] == 'SRG':
444 self.assertEqual(len(val['ietf-network-topology:termination-point']), 17)
445 for item in self.CHECK_DICT1[nodeId]['checks_tp']:
446 self.assertIn(item, val['ietf-network-topology:termination-point'])
447 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'}, val['supporting-node'])
448 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'}, val['supporting-node'])
449 self.assertEqual(val['org-openroadm-common-network:node-type'], self.CHECK_DICT1[nodeId]['node_type'])
450 listNode.remove(nodeId)
451 # Tests related to ROADMC nodes
452 elif nodeId in self.CHECK_DICT2:
453 self.assertEqual(nodeType, self.CHECK_DICT2[nodeId]['node_type'])
454 if self.CHECK_DICT2[nodeId]['node_type'] == 'SRG':
455 self.assertEqual(len(val['ietf-network-topology:termination-point']), 17)
456 for item in self.CHECK_DICT2[nodeId]['checks_tp']:
457 self.assertIn(item, val['ietf-network-topology:termination-point'])
458 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeC'}, val['supporting-node'])
459 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMC01'}, val['supporting-node'])
460 self.assertEqual(val['org-openroadm-common-network:node-type'], self.CHECK_DICT2[nodeId]['node_type'])
461 listNode.remove(nodeId)
463 self.assertFalse(True)
464 self.assertEqual(len(listNode), 0)
466 def test_21_connect_ROADMB(self):
467 response = test_utils_rfc8040.mount_device("ROADMB01", ('roadmb', self.NODE_VERSION))
468 self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
470 def test_22_omsAttributes_ROADMA_ROADMB(self):
471 # Config ROADMA01-ROADMB01 oms-attributes
473 "auto-spanloss": "true",
474 "engineered-spanloss": 12.2,
475 "link-concatenation": [{
478 "SRLG-length": 100000,
480 response = test_utils_rfc8040.add_oms_attr_request(
481 "ROADMA01-DEG2-DEG2-TTP-TXRXtoROADMB01-DEG1-DEG1-TTP-TXRX", data)
482 self.assertEqual(response.status_code, requests.codes.created)
484 def test_23_omsAttributes_ROADMB_ROADMA(self):
485 # Config ROADMB01-ROADMA01 oms-attributes
487 "auto-spanloss": "true",
488 "engineered-spanloss": 12.2,
489 "link-concatenation": [{
492 "SRLG-length": 100000,
494 response = test_utils_rfc8040.add_oms_attr_request(
495 "ROADMB01-DEG1-DEG1-TTP-TXRXtoROADMA01-DEG2-DEG2-TTP-TXRX", data)
496 self.assertEqual(response.status_code, requests.codes.created)
498 def test_24_omsAttributes_ROADMB_ROADMC(self):
499 # Config ROADMB01-ROADMC01 oms-attributes
501 "auto-spanloss": "true",
502 "engineered-spanloss": 12.2,
503 "link-concatenation": [{
506 "SRLG-length": 100000,
508 response = test_utils_rfc8040.add_oms_attr_request(
509 "ROADMB01-DEG2-DEG2-TTP-TXRXtoROADMC01-DEG1-DEG1-TTP-TXRX", data)
510 self.assertEqual(response.status_code, requests.codes.created)
512 def test_25_omsAttributes_ROADMC_ROADMB(self):
513 # Config ROADMC01-ROADMB01 oms-attributes
515 "auto-spanloss": "true",
516 "engineered-spanloss": 12.2,
517 "link-concatenation": [{
520 "SRLG-length": 100000,
522 response = test_utils_rfc8040.add_oms_attr_request(
523 "ROADMC01-DEG1-DEG1-TTP-TXRXtoROADMB01-DEG2-DEG2-TTP-TXRX", data)
524 self.assertEqual(response.status_code, requests.codes.created)
526 def test_26_getClliNetwork(self):
527 response = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
528 self.assertEqual(response['status_code'], requests.codes.ok)
529 listNode = ['NodeA', 'NodeB', 'NodeC']
530 for val in response['network'][0]['node']:
531 nodeId = val['node-id']
532 self.assertIn(nodeId, listNode)
533 self.assertEqual(val['org-openroadm-clli-network:clli'], nodeId)
534 listNode.remove(nodeId)
535 self.assertEqual(len(listNode), 0)
537 def test_27_verifyDegree(self):
538 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
539 self.assertEqual(response['status_code'], requests.codes.ok)
540 listR2RLink = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
541 'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX',
542 'ROADMA01-DEG2-DEG2-TTP-TXRXtoROADMB01-DEG1-DEG1-TTP-TXRX',
543 'ROADMC01-DEG1-DEG1-TTP-TXRXtoROADMB01-DEG2-DEG2-TTP-TXRX',
544 'ROADMB01-DEG1-DEG1-TTP-TXRXtoROADMA01-DEG2-DEG2-TTP-TXRX',
545 'ROADMB01-DEG2-DEG2-TTP-TXRXtoROADMC01-DEG1-DEG1-TTP-TXRX']
546 for val in response['network'][0]['ietf-network-topology:link']:
547 if val['org-openroadm-common-network:link-type'] == 'ROADM-TO-ROADM':
548 link_id = val['link-id']
549 find = link_id in listR2RLink
550 self.assertEqual(find, True)
551 listR2RLink.remove(link_id)
552 self.assertEqual(len(listR2RLink), 0)
554 def test_28_verifyOppositeLinkTopology(self):
555 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
556 self.assertEqual(response['status_code'], requests.codes.ok)
557 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 34)
558 for val in response['network'][0]['ietf-network-topology:link']:
559 link_id = val['link-id']
560 link_type = val['org-openroadm-common-network:link-type']
561 link_src = val['source']['source-node']
562 link_dest = val['destination']['dest-node']
563 oppLink_id = val['org-openroadm-common-network:opposite-link']
564 # Find the opposite link
565 res_oppLink = test_utils_rfc8040.get_ietf_network_link_request('openroadm-topology', oppLink_id, 'config')
566 self.assertEqual(res_oppLink['status_code'], requests.codes.ok)
567 self.assertEqual(res_oppLink['link']['org-openroadm-common-network:opposite-link'], link_id)
568 self.assertEqual(res_oppLink['link']['source']['source-node'], link_dest)
569 self.assertEqual(res_oppLink['link']['destination']['dest-node'], link_src)
570 oppLink_type = res_oppLink['link']['org-openroadm-common-network:link-type']
571 CHECK_DICT = {'ADD-LINK': 'DROP-LINK', 'DROP-LINK': 'ADD-LINK',
572 'EXPRESS-LINK': 'EXPRESS-LINK', 'ROADM-TO-ROADM': 'ROADM-TO-ROADM',
573 'XPONDER-INPUT': 'XPONDER-OUTPUT', 'XPONDER-OUTUT': 'XPONDER-INPUT'}
574 if link_type in CHECK_DICT:
575 self.assertEqual(oppLink_type, CHECK_DICT[link_type])
577 def test_29_getLinkOmsAttributesOpenRoadmTopology(self):
578 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
579 self.assertEqual(response['status_code'], requests.codes.ok)
580 R2RLink = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
581 'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX',
582 'ROADMA01-DEG2-DEG2-TTP-TXRXtoROADMB01-DEG1-DEG1-TTP-TXRX',
583 'ROADMC01-DEG1-DEG1-TTP-TXRXtoROADMB01-DEG2-DEG2-TTP-TXRX',
584 'ROADMB01-DEG1-DEG1-TTP-TXRXtoROADMA01-DEG2-DEG2-TTP-TXRX',
585 'ROADMB01-DEG2-DEG2-TTP-TXRXtoROADMC01-DEG1-DEG1-TTP-TXRX']
586 for val in response['network'][0]['ietf-network-topology:link']:
587 link_id = val['link-id']
588 if link_id in R2RLink:
590 spanLoss = val['org-openroadm-network-topology:OMS-attributes']['span']['engineered-spanloss']
591 # pylint: disable=line-too-long
592 length = val['org-openroadm-network-topology:OMS-attributes']['span']['link-concatenation'][0]['SRLG-length']
593 if (spanLoss is not None) & (length is not None):
595 self.assertTrue(find)
596 R2RLink.remove(link_id)
597 self.assertEqual(len(R2RLink), 0)
599 def test_30_disconnect_ROADMB(self):
600 # Delete in the topology-netconf
601 response = test_utils_rfc8040.unmount_device("ROADMB01")
602 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
603 # Delete in the clli-network
604 response = test_utils_rfc8040.del_ietf_network_node_request('clli-network', 'NodeB', 'config')
605 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
607 def test_31_disconnect_ROADMC(self):
608 response = test_utils_rfc8040.unmount_device("ROADMC01")
609 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
610 # Delete in the clli-network
611 response = test_utils_rfc8040.del_ietf_network_node_request('clli-network', 'NodeC', 'config')
612 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
614 def test_32_getNodes_OpenRoadmTopology(self):
615 # pylint: disable=redundant-unittest-assert
616 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
617 self.assertEqual(response['status_code'], requests.codes.ok)
618 self.assertEqual(len(response['network'][0]['node']), 5)
619 listNode = ['XPDRA01-XPDR1', 'ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
620 for val in response['network'][0]['node']:
621 nodeType = val['org-openroadm-common-network:node-type']
622 nodeId = val['node-id']
623 # Tests related to XPDRA nodes
624 if nodeId == 'XPDRA01-XPDR1':
625 for val2 in val['ietf-network-topology:termination-point']:
627 if tpid == 'XPDR1-CLIENT1':
628 self.assertEqual(val2['org-openroadm-common-network:tp-type'], 'XPONDER-CLIENT')
629 if tpid == 'XPDR1-NETWORK1':
630 self.assertEqual(val2['org-openroadm-common-network:tp-type'], 'XPONDER-NETWORK')
631 # pylint: disable=line-too-long
632 self.assertEqual(val2['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'],
633 'ROADMA01-SRG1--SRG1-PP1-TXRX')
634 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA01'}, val['supporting-node'])
635 listNode.remove(nodeId)
636 # Tests related to ROADMA nodes
637 elif nodeId in self.CHECK_DICT1:
638 self.assertEqual(nodeType, self.CHECK_DICT1[nodeId]['node_type'])
639 if self.CHECK_DICT1[nodeId]['node_type'] == 'SRG':
640 self.assertEqual(len(val['ietf-network-topology:termination-point']), 17)
641 for item in self.CHECK_DICT1[nodeId]['checks_tp']:
642 self.assertIn(item, val['ietf-network-topology:termination-point'])
643 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'}, val['supporting-node'])
644 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'}, val['supporting-node'])
645 listNode.remove(nodeId)
647 self.assertFalse(True)
648 self.assertEqual(len(listNode), 0)
649 # Test related to SRG1 of ROADMC
650 for val in response['network'][0]['node']:
651 self.assertNotEqual(val['node-id'], 'ROADMC01-SRG1')
652 self.assertNotEqual(val['node-id'], 'ROADMC01-DEG1')
653 self.assertNotEqual(val['node-id'], 'ROADMC01-DEG2')
655 def test_33_getOpenRoadmNetwork(self):
656 response = test_utils_rfc8040.get_ietf_network_request('openroadm-network', 'config')
657 self.assertEqual(response['status_code'], requests.codes.ok)
658 self.assertEqual(len(response['network'][0]['node']), 2)
659 for val in response['network'][0]['node']:
660 self.assertNotEqual(val['node-id'], 'ROADMC01')
661 self.assertNotEqual(val['node-id'], 'ROADMB01')
663 def test_34_getClliNetwork(self):
664 response = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
665 self.assertEqual(response['status_code'], requests.codes.ok)
666 self.assertEqual(len(response['network'][0]['node']), 1)
667 self.assertNotEqual(response['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeC')
669 def test_35_disconnect_XPDRA(self):
670 response = test_utils_rfc8040.unmount_device("XPDRA01")
671 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
673 def test_36_getClliNetwork(self):
674 response = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
675 self.assertEqual(response['status_code'], requests.codes.ok)
676 self.assertEqual(len(response['network'][0]['node']), 1)
677 self.assertEqual(response['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeA')
679 def test_37_getOpenRoadmNetwork(self):
680 response = test_utils_rfc8040.get_ietf_network_request('openroadm-network', 'config')
681 self.assertEqual(response['status_code'], requests.codes.ok)
682 self.assertEqual(len(response['network'][0]['node']), 1)
683 self.assertNotEqual(response['network'][0]['node'][0]['node-id'], 'XPDRA01')
685 def test_38_getNodes_OpenRoadmTopology(self):
686 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
687 self.assertEqual(response['status_code'], requests.codes.ok)
688 self.assertEqual(len(response['network'][0]['node']), 4)
689 listNode = ['ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
690 for val in response['network'][0]['node']:
691 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'}, val['supporting-node'])
692 nodeType = val['org-openroadm-common-network:node-type']
693 nodeId = val['node-id']
694 self.assertIn(nodeId, self.CHECK_DICT1)
695 self.assertEqual(nodeType, self.CHECK_DICT1[nodeId]['node_type'])
696 if self.CHECK_DICT1[nodeId]['node_type'] == 'SRG':
697 self.assertEqual(len(val['ietf-network-topology:termination-point']), 17)
698 for item in self.CHECK_DICT1[nodeId]['checks_tp']:
699 self.assertIn(item, val['ietf-network-topology:termination-point'])
700 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'}, val['supporting-node'])
701 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'}, val['supporting-node'])
702 listNode.remove(nodeId)
703 self.assertEqual(len(listNode), 0)
705 def test_39_disconnect_ROADM_XPDRA_link(self):
707 response = test_utils_rfc8040.del_ietf_network_link_request(
708 'openroadm-topology',
709 'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX',
712 response2 = test_utils_rfc8040.del_ietf_network_link_request(
713 'openroadm-topology',
714 'ROADMA01-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1',
716 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
717 self.assertIn(response2.status_code, (requests.codes.ok, requests.codes.no_content))
719 def test_40_getLinks_OpenRoadmTopology(self):
720 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
721 self.assertEqual(response['status_code'], requests.codes.ok)
722 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 16)
723 check_list = {'EXPRESS-LINK': ['ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
724 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX'],
725 'ADD-LINK': ['ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
726 'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
727 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
728 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX'],
729 'DROP-LINK': ['ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
730 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
731 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
732 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX']
735 for val in response['network'][0]['ietf-network-topology:link']:
736 linkId = val['link-id']
737 linkType = val['org-openroadm-common-network:link-type']
738 if linkType in check_list:
739 find = linkId in check_list[linkType]
740 self.assertEqual(find, True)
741 (check_list[linkType]).remove(linkId)
743 roadmtoroadmLink += 1
744 for val in check_list.values():
745 self.assertEqual(len(val), 0)
746 self.assertEqual(roadmtoroadmLink, 6)
747 for val in response['network'][0]['ietf-network-topology:link']:
748 self.assertNotEqual(val['org-openroadm-common-network:link-type'], 'XPONDER-OUTPUT')
749 self.assertNotEqual(val['org-openroadm-common-network:link-type'], 'XPONDER-INPUT')
751 def test_41_disconnect_ROADMA(self):
752 response = test_utils_rfc8040.unmount_device("ROADMA01")
753 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
754 # Delete in the clli-network
755 response = test_utils_rfc8040.del_ietf_network_node_request('clli-network', 'NodeA', 'config')
756 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
758 def test_42_getClliNetwork(self):
759 response = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
760 self.assertEqual(response['status_code'], requests.codes.ok)
761 self.assertNotIn('node', response['network'][0])
763 def test_43_getOpenRoadmNetwork(self):
764 response = test_utils_rfc8040.get_ietf_network_request('openroadm-network', 'config')
765 self.assertEqual(response['status_code'], requests.codes.ok)
766 self.assertNotIn('node', response['network'][0])
768 def test_44_check_roadm2roadm_link_persistence(self):
769 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
770 self.assertEqual(response['status_code'], requests.codes.ok)
771 self.assertNotIn('node', response['network'][0])
772 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 6)
775 if __name__ == "__main__":
776 unittest.main(verbosity=2)