3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
19 # pylint: disable=wrong-import-order
21 sys.path.append('transportpce_tests/common/')
22 # pylint: disable=wrong-import-position
23 # pylint: disable=import-error
24 import test_utils # nopep8
25 import test_utils_rfc8040 # nopep8
28 class TransportPCETopologyTesting(unittest.TestCase):
31 NODE_VERSION = '1.2.1'
35 'checks_tp': [({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
36 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP',
37 'org-openroadm-common-network:operational-state': 'inService'}),
38 ({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
39 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP',
40 'org-openroadm-common-network:operational-state': 'inService'})]
44 'checks_tp': [({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
45 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP',
46 'org-openroadm-common-network:operational-state': 'inService'}),
47 ({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
48 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP',
49 'org-openroadm-common-network:operational-state': 'inService'})]
52 'node_type': 'DEGREE',
53 'checks_tp': [({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
54 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP',
55 'org-openroadm-common-network:operational-state': 'inService'}),
56 ({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
57 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP',
58 'org-openroadm-common-network:operational-state': 'inService'})]
61 'node_type': 'DEGREE',
62 'checks_tp': [({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
63 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP',
64 'org-openroadm-common-network:operational-state': 'inService'}),
65 ({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
66 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP',
67 'org-openroadm-common-network:operational-state': 'inService'})]
73 'checks_tp': [({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
74 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP',
75 'org-openroadm-common-network:operational-state': 'inService'}),
76 ({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
77 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP',
78 'org-openroadm-common-network:operational-state': 'inService'})]
81 'node_type': 'DEGREE',
82 'checks_tp': [({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
83 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP',
84 'org-openroadm-common-network:operational-state': 'inService'}),
85 ({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
86 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP',
87 'org-openroadm-common-network:operational-state': 'inService'})]
90 'node_type': 'DEGREE',
91 'checks_tp': [({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
92 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP',
93 'org-openroadm-common-network:operational-state': 'inService'}),
94 ({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
95 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP',
96 'org-openroadm-common-network:operational-state': 'inService'})]
102 cls.processes = test_utils_rfc8040.start_tpce()
103 cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION),
104 ('roadmb', cls.NODE_VERSION), ('roadmc', cls.NODE_VERSION)])
def tearDownClass(cls):
    # Stop every process recorded in cls.processes, then print a completion notice.
    # NOTE(review): no decorator line is visible in this listing - presumably a
    # @classmethod sits just above; confirm against the full file.
    # pylint: disable=not-an-iterable
    for running in cls.processes:
        test_utils_rfc8040.shutdown_process(running)
    print("all processes killed")
def test_01_connect_ROADMA(self):
    # Mount the 'roadma' simulator as device ROADMA01; the reply must be HTTP 201 (created).
    simulator = ('roadma', self.NODE_VERSION)
    mount_reply = test_utils_rfc8040.mount_device("ROADMA01", simulator)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201,
    )
def test_02_getClliNetwork(self):
    # The first node of the clli-network must be NodeA, with a matching clli attribute.
    reply = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    first_node = reply['network'][0]['node'][0]
    self.assertEqual(first_node['node-id'], 'NodeA')
    self.assertEqual(first_node['org-openroadm-clli-network:clli'], 'NodeA')
def test_03_getOpenRoadmNetwork(self):
    # The first openroadm-network node must be ROADMA01: a ROADM of model '2'
    # whose first supporting node is NodeA in the clli-network.
    reply = test_utils_rfc8040.get_ietf_network_request('openroadm-network', 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    roadm = reply['network'][0]['node'][0]
    self.assertEqual(roadm['node-id'], 'ROADMA01')
    support = roadm['supporting-node'][0]
    self.assertEqual(support['network-ref'], 'clli-network')
    self.assertEqual(support['node-ref'], 'NodeA')
    self.assertEqual(roadm['org-openroadm-common-network:node-type'], 'ROADM')
    self.assertEqual(roadm['org-openroadm-network:model'], '2')
def test_04_getLinks_OpenroadmTopology(self):
    # Checks that openroadm-topology exposes exactly the 10 links listed in
    # check_list (2 express, 4 add, 4 drop), each under the expected link type.
    response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
    self.assertEqual(response['status_code'], requests.codes.ok)
    links = response['network'][0]['ietf-network-topology:link']
    # Tests related to links
    self.assertEqual(len(links), 10)
    check_list = {
        'EXPRESS-LINK': [
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
        ],
        'ADD-LINK': [
            'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
            'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
            'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
            'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
        ],
        'DROP-LINK': [
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
        ],
    }
    for link in links:
        link_id = link['link-id']
        link_type = link['org-openroadm-common-network:link-type']
        self.assertIn(link_type, check_list)
        # assertIn names the offending link id on failure, unlike comparing a flag to True
        self.assertIn(link_id, check_list[link_type])
        # consume the entry so that a link reported twice is rejected on its second occurrence
        check_list[link_type].remove(link_id)
    # whatever is left was expected but never reported; comparing with [] prints the leftovers
    for remaining in check_list.values():
        self.assertEqual(remaining, [])
def test_05_getNodes_OpenRoadmTopology(self):
    # openroadm-topology must hold 4 nodes; each must match its CHECK_DICT1 entry and
    # be supported by ROADMA01 (openroadm-network) and NodeA (clli-network).
    reply = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    nodes = reply['network'][0]['node']
    self.assertEqual(len(nodes), 4)
    pending = ['ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
    roadm_ref = {'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'}
    clli_ref = {'network-ref': 'clli-network', 'node-ref': 'NodeA'}
    for node in nodes:
        self.assertIn(roadm_ref, node['supporting-node'])
        node_type = node['org-openroadm-common-network:node-type']
        node_id = node['node-id']
        self.assertIn(node_id, self.CHECK_DICT1)
        expected = self.CHECK_DICT1[node_id]
        self.assertEqual(node_type, expected['node_type'])
        # only SRG nodes have their termination-point count pinned
        if expected['node_type'] == 'SRG':
            self.assertEqual(len(node['ietf-network-topology:termination-point']), 17)
        for wanted_tp in expected['checks_tp']:
            self.assertIn(wanted_tp, node['ietf-network-topology:termination-point'])
        for ref in (clli_ref, roadm_ref):
            self.assertIn(ref, node['supporting-node'])
        # raises ValueError if a node id is reported twice
        pending.remove(node_id)

    self.assertEqual(len(pending), 0)
def test_06_connect_XPDRA(self):
    # Mount the 'xpdra' simulator as device XPDRA01; the reply must be HTTP 201 (created).
    simulator = ('xpdra', self.NODE_VERSION)
    mount_reply = test_utils_rfc8040.mount_device("XPDRA01", simulator)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201,
    )
def test_07_getClliNetwork(self):
    # The first node of the clli-network must still be NodeA, with a matching clli attribute.
    reply = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    first_node = reply['network'][0]['node'][0]
    self.assertEqual(first_node['node-id'], 'NodeA')
    self.assertEqual(first_node['org-openroadm-clli-network:clli'], 'NodeA')
def test_08_getOpenRoadmNetwork(self):
    # openroadm-network must hold 2 nodes, each being either XPDRA01 (XPONDER, model '1')
    # or ROADMA01 (ROADM, model '2'), and each supported first by NodeA in the clli-network.
    response = test_utils_rfc8040.get_ietf_network_request('openroadm-network', 'config')
    self.assertEqual(response['status_code'], requests.codes.ok)
    nodes = response['network'][0]['node']
    self.assertEqual(len(nodes), 2)
    for node in nodes:
        support = node['supporting-node'][0]
        self.assertEqual(support['network-ref'], 'clli-network')
        self.assertEqual(support['node-ref'], 'NodeA')
        node_id = node['node-id']
        if node_id == 'XPDRA01':
            self.assertEqual(node['org-openroadm-common-network:node-type'], 'XPONDER')
            self.assertEqual(node['org-openroadm-network:model'], '1')
        elif node_id == 'ROADMA01':
            self.assertEqual(node['org-openroadm-common-network:node-type'], 'ROADM')
            self.assertEqual(node['org-openroadm-network:model'], '2')
        else:
            # explicit failure that names the culprit, instead of assertFalse(True)
            self.fail('unexpected node in openroadm-network: ' + str(node_id))
210 def test_09_getNodes_OpenRoadmTopology(self):
# Fetches openroadm-topology and expects 5 nodes: XPDRA01-XPDR1 plus the ROADMA01
# nodes described by CHECK_DICT1. For the transponder node it checks supporting nodes,
# node type, client/network termination-point counts and the connection-map pairing
# of XPDR1-NETWORK2 and XPDR1-CLIENT3.
# NOTE(review): the statements that initialise/increment `client` and `network`,
# assign `tpId`, and introduce the branch ending in assertFalse(True) are not
# present in this listing - confirm against the full file before editing.
211 # pylint: disable=redundant-unittest-assert
212 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
213 self.assertEqual(response['status_code'], requests.codes.ok)
214 self.assertEqual(len(response['network'][0]['node']), 5)
215 listNode = ['XPDRA01-XPDR1', 'ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
216 for val in response['network'][0]['node']:
217 nodeType = val['org-openroadm-common-network:node-type']
218 nodeId = val['node-id']
219 # Tests related to XPDRA nodes
220 if nodeId == 'XPDRA01-XPDR1':
221 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA01'}, val['supporting-node'])
222 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'}, val['supporting-node'])
223 self.assertEqual(nodeType, 'XPONDER')
226 for val2 in val['ietf-network-topology:termination-point']:
227 tpType = val2['org-openroadm-common-network:tp-type']
229 if tpType == 'XPONDER-CLIENT':
231 elif tpType == 'XPONDER-NETWORK':
233 if tpId == 'XPDR1-NETWORK2':
234 self.assertEqual(val2['transportpce-topology:associated-connection-map-port'], 'XPDR1-CLIENT3')
235 elif tpId == 'XPDR1-CLIENT3':
236 self.assertEqual(val2['transportpce-topology:associated-connection-map-port'], 'XPDR1-NETWORK2')
237 self.assertTrue(client == 4)
238 self.assertTrue(network == 2)
239 listNode.remove(nodeId)
240 # Tests related to ROADMA nodes
241 elif nodeId in self.CHECK_DICT1:
242 self.assertEqual(nodeType, self.CHECK_DICT1[nodeId]['node_type'])
243 if self.CHECK_DICT1[nodeId]['node_type'] == 'SRG':
244 self.assertEqual(len(val['ietf-network-topology:termination-point']), 17)
245 for item in self.CHECK_DICT1[nodeId]['checks_tp']:
246 self.assertIn(item, val['ietf-network-topology:termination-point'])
247 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'}, val['supporting-node'])
248 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'}, val['supporting-node'])
249 listNode.remove(nodeId)
251 self.assertFalse(True)
252 self.assertEqual(len(listNode), 0)
254 # Connect the tail XPDRA to ROADMA and vice versa
def test_10_connect_tail_xpdr_rdm(self):
    # Connect the tail: XPDRA to ROADMA (ROADMA01 side is SRG1-PP1-TXRX); expects HTTP 200.
    tail_args = ("XPDRA01", "1", "1", "ROADMA01", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*tail_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_11_connect_tail_rdm_xpdr(self):
    # Connect the tail: ROADMA to XPDRA (ROADMA01 side is SRG1-PP1-TXRX); expects HTTP 200.
    tail_args = ("XPDRA01", "1", "1", "ROADMA01", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*tail_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_12_getLinks_OpenRoadmTopology(self):
    # Checks that openroadm-topology exposes exactly the 12 links listed in check_list:
    # the ROADMA01 express/add/drop links plus the two XPDRA01 <-> ROADMA01 tail links.
    response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
    self.assertEqual(response['status_code'], requests.codes.ok)
    links = response['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 12)
    check_list = {
        'EXPRESS-LINK': [
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
        ],
        'ADD-LINK': [
            'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
            'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
            'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
            'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
        ],
        'DROP-LINK': [
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
        ],
        'XPONDER-INPUT': ['ROADMA01-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1'],
        'XPONDER-OUTPUT': ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX'],
    }
    for link in links:
        link_id = link['link-id']
        link_type = link['org-openroadm-common-network:link-type']
        self.assertIn(link_type, check_list)
        # assertIn names the offending link id on failure, unlike comparing a flag to True
        self.assertIn(link_id, check_list[link_type])
        # consume the entry so that a link reported twice is rejected on its second occurrence
        check_list[link_type].remove(link_id)
    # whatever is left was expected but never reported; comparing with [] prints the leftovers
    for remaining in check_list.values():
        self.assertEqual(remaining, [])
def test_13_connect_ROADMC(self):
    # Mount the 'roadmc' simulator as device ROADMC01; the reply must be HTTP 201 (created).
    simulator = ('roadmc', self.NODE_VERSION)
    mount_reply = test_utils_rfc8040.mount_device("ROADMC01", simulator)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201,
    )
298 def test_14_omsAttributes_ROADMA_ROADMC(self):
# Sends an OMS-attributes payload for link
# ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX through
# test_utils.add_oms_attr_request and expects HTTP 201 (created).
# NOTE(review): the statement opening the `data` payload and some of its
# entries are not present in this listing - confirm against the full file.
299 # Config ROADMA01-ROADMC01 oms-attributes
301 "auto-spanloss": "true",
302 "engineered-spanloss": 12.2,
303 "link-concatenation": [{
306 "SRLG-length": 100000,
308 response = test_utils.add_oms_attr_request("ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX", data)
309 self.assertEqual(response.status_code, requests.codes.created)
311 def test_15_omsAttributes_ROADMC_ROADMA(self):
# Sends an OMS-attributes payload for link
# ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX through
# test_utils.add_oms_attr_request and expects HTTP 201 (created).
# NOTE(review): the statement opening the `data` payload and some of its
# entries are not present in this listing - confirm against the full file.
312 # Config ROADMC01-ROADMA oms-attributes
314 "auto-spanloss": "true",
315 "engineered-spanloss": 12.2,
316 "link-concatenation": [{
319 "SRLG-length": 100000,
321 response = test_utils.add_oms_attr_request("ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX", data)
322 self.assertEqual(response.status_code, requests.codes.created)
def test_16_getClliNetwork(self):
    # Every clli-network node must be NodeA or NodeC, carry a clli equal to its id,
    # and both names must have been seen by the end.
    reply = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    outstanding = ['NodeA', 'NodeC']
    for clli_node in reply['network'][0]['node']:
        name = clli_node['node-id']
        self.assertIn(name, outstanding)
        self.assertEqual(clli_node['org-openroadm-clli-network:clli'], name)
        # consumed so that a repeated node fails the membership check above
        outstanding.remove(name)
    self.assertEqual(len(outstanding), 0)
def test_17_getOpenRoadmNetwork(self):
    # openroadm-network must hold 3 nodes (XPDRA01, ROADMA01, ROADMC01), each with the
    # supporting clli node, node type and model given in the table below.
    reply = test_utils_rfc8040.get_ietf_network_request('openroadm-network', 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    nodes = reply['network'][0]['node']
    self.assertEqual(len(nodes), 3)
    remaining = ['XPDRA01', 'ROADMA01', 'ROADMC01']
    expected = {
        'XPDRA01': {'node-ref': 'NodeA', 'node-type': 'XPONDER', 'model': '1'},
        'ROADMA01': {'node-ref': 'NodeA', 'node-type': 'ROADM', 'model': '2'},
        'ROADMC01': {'node-ref': 'NodeC', 'node-type': 'ROADM', 'model': '2'},
    }
    for node in nodes:
        support = node['supporting-node'][0]
        self.assertEqual(support['network-ref'], 'clli-network')
        node_id = node['node-id']
        self.assertIn(node_id, expected)
        wanted = expected[node_id]
        self.assertEqual(support['node-ref'], wanted['node-ref'])
        self.assertEqual(node['org-openroadm-common-network:node-type'], wanted['node-type'])
        self.assertEqual(node['org-openroadm-network:model'], wanted['model'])
        remaining.remove(node_id)
    self.assertEqual(len(remaining), 0)
def test_18_getROADMLinkOpenRoadmTopology(self):
    # Checks that openroadm-topology exposes exactly the 20 links listed in check_list:
    # internal links of ROADMA01 and ROADMC01, the two ROADM-TO-ROADM links between
    # them, and the two XPDRA01 <-> ROADMA01 tail links.
    response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
    self.assertEqual(response['status_code'], requests.codes.ok)
    links = response['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 20)
    check_list = {
        'EXPRESS-LINK': [
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
            'ROADMC01-DEG2-DEG2-CTP-TXRXtoROADMC01-DEG1-DEG1-CTP-TXRX',
            'ROADMC01-DEG1-DEG1-CTP-TXRXtoROADMC01-DEG2-DEG2-CTP-TXRX',
        ],
        'ADD-LINK': [
            'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
            'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
            'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
            'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
            'ROADMC01-SRG1-SRG1-CP-TXRXtoROADMC01-DEG2-DEG2-CTP-TXRX',
            'ROADMC01-SRG1-SRG1-CP-TXRXtoROADMC01-DEG1-DEG1-CTP-TXRX',
        ],
        'DROP-LINK': [
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
            'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
            'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
            'ROADMC01-DEG1-DEG1-CTP-TXRXtoROADMC01-SRG1-SRG1-CP-TXRX',
            'ROADMC01-DEG2-DEG2-CTP-TXRXtoROADMC01-SRG1-SRG1-CP-TXRX',
        ],
        'ROADM-TO-ROADM': [
            'ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
            'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX',
        ],
        'XPONDER-INPUT': ['ROADMA01-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1'],
        'XPONDER-OUTPUT': ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX'],
    }
    for link in links:
        link_id = link['link-id']
        link_type = link['org-openroadm-common-network:link-type']
        self.assertIn(link_type, check_list)
        # assertIn names the offending link id on failure, unlike comparing a flag to True
        self.assertIn(link_id, check_list[link_type])
        # consume the entry so that a link reported twice is rejected on its second occurrence
        check_list[link_type].remove(link_id)
    # whatever is left was expected but never reported; comparing with [] prints the leftovers
    for remaining in check_list.values():
        self.assertEqual(remaining, [])
392 def test_19_getLinkOmsAttributesOpenRoadmTopology(self):
# Fetches openroadm-topology (20 links expected) and, for the two ROADMA01 <-> ROADMC01
# links in R2RLink, reads 'engineered-spanloss' and the first link-concatenation
# 'SRLG-length' from the OMS attributes; each handled link is removed from R2RLink,
# which must be empty at the end.
# NOTE(review): the statements assigning `find` are not present in this listing.
# NOTE(review): `&` between the two `is not None` tests is a bitwise operator applied
# to booleans; a logical `and` is presumably intended - confirm.
393 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
394 self.assertEqual(response['status_code'], requests.codes.ok)
395 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 20)
396 R2RLink = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
397 'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX']
398 for val in response['network'][0]['ietf-network-topology:link']:
399 link_id = val['link-id']
400 if link_id in R2RLink:
402 spanLoss = val['org-openroadm-network-topology:OMS-attributes']['span']['engineered-spanloss']
403 # pylint: disable=line-too-long
404 length = val['org-openroadm-network-topology:OMS-attributes']['span']['link-concatenation'][0]['SRLG-length']
405 if (spanLoss is not None) & (length is not None):
407 self.assertTrue(find)
408 R2RLink.remove(link_id)
409 self.assertEqual(len(R2RLink), 0)
411 def test_20_getNodes_OpenRoadmTopology(self):
# Fetches openroadm-topology and expects 8 nodes: XPDRA01-XPDR1, the ROADMA01 nodes
# described by CHECK_DICT1 and the ROADMC01 nodes described by CHECK_DICT2. ROADM
# nodes are checked for node type, expected termination points and supporting
# nodes; every handled node id is removed from listNode, which must end up empty.
# NOTE(review): the statements that initialise/increment `client` and `network`
# and the branch header before assertFalse(True) are not present in this listing -
# confirm against the full file before editing.
412 # pylint: disable=redundant-unittest-assert
413 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
414 self.assertEqual(response['status_code'], requests.codes.ok)
415 self.assertEqual(len(response['network'][0]['node']), 8)
416 listNode = ['XPDRA01-XPDR1',
417 'ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2',
418 'ROADMC01-SRG1', 'ROADMC01-DEG1', 'ROADMC01-DEG2']
419 for val in response['network'][0]['node']:
420 nodeType = val['org-openroadm-common-network:node-type']
421 nodeId = val['node-id']
422 # Tests related to XPDRA nodes
423 if nodeId == 'XPDRA01-XPDR1':
424 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA01'}, val['supporting-node'])
425 self.assertEqual(nodeType, 'XPONDER')
426 self.assertEqual(len(val['ietf-network-topology:termination-point']), 6)
429 for val2 in val['ietf-network-topology:termination-point']:
430 tpType = val2['org-openroadm-common-network:tp-type']
431 if tpType == 'XPONDER-CLIENT':
433 elif tpType == 'XPONDER-NETWORK':
435 self.assertTrue(client == 4)
436 self.assertTrue(network == 2)
437 listNode.remove(nodeId)
438 # Tests related to ROADMA nodes
439 elif nodeId in self.CHECK_DICT1:
440 self.assertEqual(nodeType, self.CHECK_DICT1[nodeId]['node_type'])
441 if self.CHECK_DICT1[nodeId]['node_type'] == 'SRG':
442 self.assertEqual(len(val['ietf-network-topology:termination-point']), 17)
443 for item in self.CHECK_DICT1[nodeId]['checks_tp']:
444 self.assertIn(item, val['ietf-network-topology:termination-point'])
445 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'}, val['supporting-node'])
446 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'}, val['supporting-node'])
447 self.assertEqual(val['org-openroadm-common-network:node-type'], self.CHECK_DICT1[nodeId]['node_type'])
448 listNode.remove(nodeId)
449 # Tests related to ROADMC nodes
450 elif nodeId in self.CHECK_DICT2:
451 self.assertEqual(nodeType, self.CHECK_DICT2[nodeId]['node_type'])
452 if self.CHECK_DICT2[nodeId]['node_type'] == 'SRG':
453 self.assertEqual(len(val['ietf-network-topology:termination-point']), 17)
454 for item in self.CHECK_DICT2[nodeId]['checks_tp']:
455 self.assertIn(item, val['ietf-network-topology:termination-point'])
456 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeC'}, val['supporting-node'])
457 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMC01'}, val['supporting-node'])
458 self.assertEqual(val['org-openroadm-common-network:node-type'], self.CHECK_DICT2[nodeId]['node_type'])
459 listNode.remove(nodeId)
461 self.assertFalse(True)
462 self.assertEqual(len(listNode), 0)
def test_21_connect_ROADMB(self):
    # Mount the 'roadmb' simulator as device ROADMB01; the reply must be HTTP 201 (created).
    simulator = ('roadmb', self.NODE_VERSION)
    mount_reply = test_utils_rfc8040.mount_device("ROADMB01", simulator)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201,
    )
468 def test_22_omsAttributes_ROADMA_ROADMB(self):
# Sends an OMS-attributes payload for link
# ROADMA01-DEG2-DEG2-TTP-TXRXtoROADMB01-DEG1-DEG1-TTP-TXRX through
# test_utils.add_oms_attr_request and expects HTTP 201 (created).
# NOTE(review): the statement opening the `data` payload and some of its
# entries are not present in this listing - confirm against the full file.
469 # Config ROADMA01-ROADMB01 oms-attributes
471 "auto-spanloss": "true",
472 "engineered-spanloss": 12.2,
473 "link-concatenation": [{
476 "SRLG-length": 100000,
478 response = test_utils.add_oms_attr_request("ROADMA01-DEG2-DEG2-TTP-TXRXtoROADMB01-DEG1-DEG1-TTP-TXRX", data)
479 self.assertEqual(response.status_code, requests.codes.created)
481 def test_23_omsAttributes_ROADMB_ROADMA(self):
# Sends an OMS-attributes payload for link
# ROADMB01-DEG1-DEG1-TTP-TXRXtoROADMA01-DEG2-DEG2-TTP-TXRX through
# test_utils.add_oms_attr_request and expects HTTP 201 (created).
# NOTE(review): the statement opening the `data` payload and some of its
# entries are not present in this listing - confirm against the full file.
482 # Config ROADMB01-ROADMA01 oms-attributes
484 "auto-spanloss": "true",
485 "engineered-spanloss": 12.2,
486 "link-concatenation": [{
489 "SRLG-length": 100000,
491 response = test_utils.add_oms_attr_request("ROADMB01-DEG1-DEG1-TTP-TXRXtoROADMA01-DEG2-DEG2-TTP-TXRX", data)
492 self.assertEqual(response.status_code, requests.codes.created)
494 def test_24_omsAttributes_ROADMB_ROADMC(self):
# Sends an OMS-attributes payload for link
# ROADMB01-DEG2-DEG2-TTP-TXRXtoROADMC01-DEG1-DEG1-TTP-TXRX through
# test_utils.add_oms_attr_request and expects HTTP 201 (created).
# NOTE(review): the statement opening the `data` payload and some of its
# entries are not present in this listing - confirm against the full file.
495 # Config ROADMB01-ROADMC01 oms-attributes
497 "auto-spanloss": "true",
498 "engineered-spanloss": 12.2,
499 "link-concatenation": [{
502 "SRLG-length": 100000,
504 response = test_utils.add_oms_attr_request("ROADMB01-DEG2-DEG2-TTP-TXRXtoROADMC01-DEG1-DEG1-TTP-TXRX", data)
505 self.assertEqual(response.status_code, requests.codes.created)
507 def test_25_omsAttributes_ROADMC_ROADMB(self):
# Sends an OMS-attributes payload for link
# ROADMC01-DEG1-DEG1-TTP-TXRXtoROADMB01-DEG2-DEG2-TTP-TXRX through
# test_utils.add_oms_attr_request and expects HTTP 201 (created).
# NOTE(review): the statement opening the `data` payload and some of its
# entries are not present in this listing - confirm against the full file.
508 # Config ROADMC01-ROADMB01 oms-attributes
510 "auto-spanloss": "true",
511 "engineered-spanloss": 12.2,
512 "link-concatenation": [{
515 "SRLG-length": 100000,
517 response = test_utils.add_oms_attr_request("ROADMC01-DEG1-DEG1-TTP-TXRXtoROADMB01-DEG2-DEG2-TTP-TXRX", data)
518 self.assertEqual(response.status_code, requests.codes.created)
def test_26_getClliNetwork(self):
    # Every clli-network node must be NodeA, NodeB or NodeC, carry a clli equal to
    # its id, and all three names must have been seen by the end.
    reply = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    outstanding = ['NodeA', 'NodeB', 'NodeC']
    for clli_node in reply['network'][0]['node']:
        name = clli_node['node-id']
        self.assertIn(name, outstanding)
        self.assertEqual(clli_node['org-openroadm-clli-network:clli'], name)
        # consumed so that a repeated node fails the membership check above
        outstanding.remove(name)
    self.assertEqual(len(outstanding), 0)
def test_27_verifyDegree(self):
    # Every ROADM-TO-ROADM link reported by openroadm-topology must be one of the six
    # inter-ROADM links listed below, and all six must be reported exactly once.
    response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
    self.assertEqual(response['status_code'], requests.codes.ok)
    expected_links = [
        'ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
        'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX',
        'ROADMA01-DEG2-DEG2-TTP-TXRXtoROADMB01-DEG1-DEG1-TTP-TXRX',
        'ROADMC01-DEG1-DEG1-TTP-TXRXtoROADMB01-DEG2-DEG2-TTP-TXRX',
        'ROADMB01-DEG1-DEG1-TTP-TXRXtoROADMA01-DEG2-DEG2-TTP-TXRX',
        'ROADMB01-DEG2-DEG2-TTP-TXRXtoROADMC01-DEG1-DEG1-TTP-TXRX',
    ]
    for link in response['network'][0]['ietf-network-topology:link']:
        if link['org-openroadm-common-network:link-type'] != 'ROADM-TO-ROADM':
            continue
        link_id = link['link-id']
        # assertIn names the unexpected link on failure, unlike comparing a flag to True
        self.assertIn(link_id, expected_links)
        expected_links.remove(link_id)
    # comparing with [] prints the links that were expected but never reported
    self.assertEqual(expected_links, [])
def test_28_verifyOppositeLinkTopology(self):
    # For each of the 34 links of openroadm-topology, fetch the link named by its
    # 'opposite-link' attribute and check that it points back to the original link,
    # has swapped end nodes, and carries the complementary link type.
    #
    # Maps a link type to the type its opposite link must have. The XPONDER-OUTPUT key
    # used to be misspelt 'XPONDER-OUTUT', so those links were silently never checked.
    # The table is loop-invariant, hence built once here.
    opposite_type = {'ADD-LINK': 'DROP-LINK', 'DROP-LINK': 'ADD-LINK',
                     'EXPRESS-LINK': 'EXPRESS-LINK', 'ROADM-TO-ROADM': 'ROADM-TO-ROADM',
                     'XPONDER-INPUT': 'XPONDER-OUTPUT', 'XPONDER-OUTPUT': 'XPONDER-INPUT'}
    response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
    self.assertEqual(response['status_code'], requests.codes.ok)
    links = response['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 34)
    for link in links:
        link_id = link['link-id']
        link_type = link['org-openroadm-common-network:link-type']
        link_src = link['source']['source-node']
        link_dest = link['destination']['dest-node']
        opp_link_id = link['org-openroadm-common-network:opposite-link']
        # Find the opposite link
        response_opp_link = test_utils.get_ordm_topo_request("ietf-network-topology:link/" + opp_link_id)
        self.assertEqual(response_opp_link.status_code, requests.codes.ok)
        opp_link = response_opp_link.json()['ietf-network-topology:link'][0]
        self.assertEqual(opp_link['org-openroadm-common-network:opposite-link'], link_id)
        self.assertEqual(opp_link['source']['source-node'], link_dest)
        self.assertEqual(opp_link['destination']['dest-node'], link_src)
        opp_link_type = opp_link['org-openroadm-common-network:link-type']
        # link types absent from the table are deliberately left unchecked
        if link_type in opposite_type:
            self.assertEqual(opp_link_type, opposite_type[link_type])
573 def test_29_getLinkOmsAttributesOpenRoadmTopology(self):
# Fetches openroadm-topology and, for the six inter-ROADM links in R2RLink, reads
# 'engineered-spanloss' and the first link-concatenation 'SRLG-length' from the OMS
# attributes; each handled link is removed from R2RLink, which must be empty at the end.
# NOTE(review): the statements assigning `find` are not present in this listing.
# NOTE(review): `&` between the two `is not None` tests is a bitwise operator applied
# to booleans; a logical `and` is presumably intended - confirm.
574 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
575 self.assertEqual(response['status_code'], requests.codes.ok)
576 R2RLink = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
577 'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX',
578 'ROADMA01-DEG2-DEG2-TTP-TXRXtoROADMB01-DEG1-DEG1-TTP-TXRX',
579 'ROADMC01-DEG1-DEG1-TTP-TXRXtoROADMB01-DEG2-DEG2-TTP-TXRX',
580 'ROADMB01-DEG1-DEG1-TTP-TXRXtoROADMA01-DEG2-DEG2-TTP-TXRX',
581 'ROADMB01-DEG2-DEG2-TTP-TXRXtoROADMC01-DEG1-DEG1-TTP-TXRX']
582 for val in response['network'][0]['ietf-network-topology:link']:
583 link_id = val['link-id']
584 if link_id in R2RLink:
586 spanLoss = val['org-openroadm-network-topology:OMS-attributes']['span']['engineered-spanloss']
587 # pylint: disable=line-too-long
588 length = val['org-openroadm-network-topology:OMS-attributes']['span']['link-concatenation'][0]['SRLG-length']
589 if (spanLoss is not None) & (length is not None):
591 self.assertTrue(find)
592 R2RLink.remove(link_id)
593 self.assertEqual(len(R2RLink), 0)
def test_30_disconnect_ROADMB(self):
    # Unmount ROADMB01 (HTTP 200 or 204 accepted), then delete NodeB (HTTP 200 expected).
    accepted = (requests.codes.ok, requests.codes.no_content)
    # Delete in the topology-netconf
    unmount_reply = test_utils_rfc8040.unmount_device("ROADMB01")
    self.assertIn(unmount_reply.status_code, accepted)
    # Delete in the clli-network
    delete_reply = test_utils.del_node_request("NodeB")
    self.assertEqual(delete_reply.status_code, requests.codes.ok)
def test_31_disconnect_ROADMC(self):
    # Unmount ROADMC01 (HTTP 200 or 204 accepted), then delete NodeC (HTTP 200 expected).
    accepted = (requests.codes.ok, requests.codes.no_content)
    unmount_reply = test_utils_rfc8040.unmount_device("ROADMC01")
    self.assertIn(unmount_reply.status_code, accepted)
    # Delete in the clli-network
    delete_reply = test_utils.del_node_request("NodeC")
    self.assertEqual(delete_reply.status_code, requests.codes.ok)
610 def test_32_getNodes_OpenRoadmTopology(self):
# Fetches openroadm-topology and expects 5 nodes: XPDRA01-XPDR1 and the ROADMA01
# nodes described by CHECK_DICT1. For the transponder it checks the tp-type of
# XPDR1-CLIENT1 and XPDR1-NETWORK1 and the latter's tail-equipment-id; finally it
# asserts that no ROADMC01 node id is present.
# NOTE(review): the statement assigning `tpid` and the branch header before
# assertFalse(True) are not present in this listing - confirm against the full file.
611 # pylint: disable=redundant-unittest-assert
612 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
613 self.assertEqual(response['status_code'], requests.codes.ok)
614 self.assertEqual(len(response['network'][0]['node']), 5)
615 listNode = ['XPDRA01-XPDR1', 'ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
616 for val in response['network'][0]['node']:
617 nodeType = val['org-openroadm-common-network:node-type']
618 nodeId = val['node-id']
619 # Tests related to XPDRA nodes
620 if nodeId == 'XPDRA01-XPDR1':
621 for val2 in val['ietf-network-topology:termination-point']:
623 if tpid == 'XPDR1-CLIENT1':
624 self.assertEqual(val2['org-openroadm-common-network:tp-type'], 'XPONDER-CLIENT')
625 if tpid == 'XPDR1-NETWORK1':
626 self.assertEqual(val2['org-openroadm-common-network:tp-type'], 'XPONDER-NETWORK')
627 # pylint: disable=line-too-long
628 self.assertEqual(val2['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'],
629 'ROADMA01-SRG1--SRG1-PP1-TXRX')
630 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA01'}, val['supporting-node'])
631 listNode.remove(nodeId)
632 # Tests related to ROADMA nodes
633 elif nodeId in self.CHECK_DICT1:
634 self.assertEqual(nodeType, self.CHECK_DICT1[nodeId]['node_type'])
635 if self.CHECK_DICT1[nodeId]['node_type'] == 'SRG':
636 self.assertEqual(len(val['ietf-network-topology:termination-point']), 17)
637 for item in self.CHECK_DICT1[nodeId]['checks_tp']:
638 self.assertIn(item, val['ietf-network-topology:termination-point'])
639 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'}, val['supporting-node'])
640 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'}, val['supporting-node'])
641 listNode.remove(nodeId)
643 self.assertFalse(True)
644 self.assertEqual(len(listNode), 0)
645 # Test related to SRG1 of ROADMC
646 for val in response['network'][0]['node']:
647 self.assertNotEqual(val['node-id'], 'ROADMC01-SRG1')
648 self.assertNotEqual(val['node-id'], 'ROADMC01-DEG1')
649 self.assertNotEqual(val['node-id'], 'ROADMC01-DEG2')
def test_33_getOpenRoadmNetwork(self):
    # openroadm-network must hold 2 nodes, none of them named ROADMC01 or ROADMB01.
    reply = test_utils_rfc8040.get_ietf_network_request('openroadm-network', 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    nodes = reply['network'][0]['node']
    self.assertEqual(len(nodes), 2)
    for node in nodes:
        for removed_id in ('ROADMC01', 'ROADMB01'):
            self.assertNotEqual(node['node-id'], removed_id)
def test_34_getClliNetwork(self):
    # clli-network must hold a single node whose clli is not NodeC.
    reply = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    clli_nodes = reply['network'][0]['node']
    self.assertEqual(len(clli_nodes), 1)
    self.assertNotEqual(clli_nodes[0]['org-openroadm-clli-network:clli'], 'NodeC')
def test_35_disconnect_XPDRA(self):
    # Unmount the XPDRA01 device; either 'ok' or 'no_content' is an
    # acceptable status for the reply.
    reply = test_utils_rfc8040.unmount_device("XPDRA01")
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(reply.status_code, accepted_codes)
def test_36_getClliNetwork(self):
    # The 'clli-network' layer must still contain exactly one node, whose
    # CLLI is 'NodeA'.
    reply = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    clli_nodes = reply['network'][0]['node']
    self.assertEqual(len(clli_nodes), 1)
    self.assertEqual(clli_nodes[0]['org-openroadm-clli-network:clli'], 'NodeA')
def test_37_getOpenRoadmNetwork(self):
    # The 'openroadm-network' layer must be down to a single node, and that
    # node must not be XPDRA01.
    reply = test_utils_rfc8040.get_ietf_network_request('openroadm-network', 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    nodes = reply['network'][0]['node']
    self.assertEqual(len(nodes), 1)
    self.assertNotEqual(nodes[0]['node-id'], 'XPDRA01')
def test_38_getNodes_OpenRoadmTopology(self):
    # The 'openroadm-topology' layer must contain exactly the four ROADMA01
    # sub-nodes listed below, each matching its CHECK_DICT1 entry.
    reply = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    topo_nodes = reply['network'][0]['node']
    self.assertEqual(len(topo_nodes), 4)
    # Every expected id is struck off as it is met; the list must end up empty.
    pending = ['ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
    roadm_ref = {'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'}
    clli_ref = {'network-ref': 'clli-network', 'node-ref': 'NodeA'}
    for node in topo_nodes:
        self.assertIn(roadm_ref, node['supporting-node'])
        node_type = node['org-openroadm-common-network:node-type']
        node_id = node['node-id']
        self.assertIn(node_id, self.CHECK_DICT1)
        expected = self.CHECK_DICT1[node_id]
        self.assertEqual(node_type, expected['node_type'])
        # Only SRG nodes have their termination-point count checked.
        if expected['node_type'] == 'SRG':
            self.assertEqual(len(node['ietf-network-topology:termination-point']), 17)
        for tp_check in expected['checks_tp']:
            self.assertIn(tp_check, node['ietf-network-topology:termination-point'])
        self.assertIn(clli_ref, node['supporting-node'])
        self.assertIn(roadm_ref, node['supporting-node'])
        pending.remove(node_id)
    self.assertEqual(len(pending), 0)
def test_39_disconnect_ROADM_XPDRA_link(self):
    # Delete the XPDRA01 -> ROADMA01-SRG1 link first, then the link in the
    # opposite direction; each deletion must answer with status 'ok'.
    link_ids = (
        "XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX",
        "ROADMA01-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1",
    )
    for link_id in link_ids:
        reply = test_utils.del_link_request(link_id)
        self.assertEqual(reply.status_code, requests.codes.ok)
def test_40_getLinks_OpenRoadmTopology(self):
    # The 'openroadm-topology' layer must expose 16 links: the 10 express /
    # add / drop links enumerated in check_list, plus 6 links of any other
    # type (presumably the ROADM-TO-ROADM links -- confirm against the
    # topology model). No transponder link may be left.
    response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
    self.assertEqual(response['status_code'], requests.codes.ok)
    links = response['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 16)
    check_list = {'EXPRESS-LINK': ['ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
                                   'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX'],
                  'ADD-LINK': ['ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
                               'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
                               'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
                               'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX'],
                  'DROP-LINK': ['ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
                                'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
                                'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
                                'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX']
                  }
    # Counter for the links whose type is not a key of check_list.
    roadmtoroadmLink = 0
    for val in links:
        linkId = val['link-id']
        linkType = val['org-openroadm-common-network:link-type']
        if linkType in check_list:
            # assertIn reports the unexpected link id on failure, which a
            # bare boolean comparison would not.
            self.assertIn(linkId, check_list[linkType])
            # Strike the link off so that duplicates and omissions are
            # both caught by the emptiness check below.
            check_list[linkType].remove(linkId)
        else:
            roadmtoroadmLink += 1
    # Every enumerated link must have been seen exactly once.
    for val in check_list.values():
        self.assertEqual(len(val), 0)
    self.assertEqual(roadmtoroadmLink, 6)
    # No transponder input/output link may remain in the topology.
    for val in links:
        self.assertNotEqual(val['org-openroadm-common-network:link-type'], 'XPONDER-OUTPUT')
        self.assertNotEqual(val['org-openroadm-common-network:link-type'], 'XPONDER-INPUT')
def test_41_disconnect_ROADMA(self):
    # Unmount the ROADMA01 device; 'ok' and 'no_content' are both accepted.
    unmount_reply = test_utils_rfc8040.unmount_device("ROADMA01")
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(unmount_reply.status_code, accepted_codes)
    # Delete in the clli-network
    delete_reply = test_utils.del_node_request("NodeA")
    self.assertEqual(delete_reply.status_code, requests.codes.ok)
def test_42_getClliNetwork(self):
    # The 'clli-network' layer must no longer carry any 'node' entry.
    reply = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    clli_network = reply['network'][0]
    self.assertNotIn('node', clli_network)
def test_43_getOpenRoadmNetwork(self):
    # The 'openroadm-network' layer must no longer carry any 'node' entry.
    reply = test_utils_rfc8040.get_ietf_network_request('openroadm-network', 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    openroadm_network = reply['network'][0]
    self.assertNotIn('node', openroadm_network)
def test_44_check_roadm2roadm_link_persistence(self):
    # The 'openroadm-topology' layer must hold no 'node' entry any more,
    # yet still list six links.
    reply = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    topology = reply['network'][0]
    self.assertNotIn('node', topology)
    self.assertEqual(len(topology['ietf-network-topology:link']), 6)
# Run the test cases with verbose output when this file is executed as a script.
if __name__ == "__main__":
    unittest.main(verbosity=2)