3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
20 # pylint: disable=wrong-import-order
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils # nopep8
26 import test_utils_rfc8040 # nopep8
29 class TransportPCEtesting(unittest.TestCase):
35 'checks_tp': [({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
36 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP',
37 'org-openroadm-common-network:operational-state': 'inService'}),
38 ({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
39 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP',
40 'org-openroadm-common-network:operational-state': 'inService'})]
44 'checks_tp': [({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
45 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP',
46 'org-openroadm-common-network:operational-state': 'inService'}),
47 ({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
48 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP',
49 'org-openroadm-common-network:operational-state': 'inService'})]
52 'node_type': 'DEGREE',
53 'checks_tp': [({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
54 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP',
55 'org-openroadm-common-network:operational-state': 'inService'}),
56 ({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
57 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP',
58 'org-openroadm-common-network:operational-state': 'inService'})]
61 'node_type': 'DEGREE',
62 'checks_tp': [({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
63 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP',
64 'org-openroadm-common-network:operational-state': 'inService'}),
65 ({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
66 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP',
67 'org-openroadm-common-network:operational-state': 'inService'})]
73 'checks_tp': [({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
74 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP',
75 'org-openroadm-common-network:operational-state': 'inService'}),
76 ({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
77 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP',
78 'org-openroadm-common-network:operational-state': 'inService'})]
81 'node_type': 'DEGREE',
82 'checks_tp': [({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
83 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP',
84 'org-openroadm-common-network:operational-state': 'inService'}),
85 ({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
86 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP',
87 'org-openroadm-common-network:operational-state': 'inService'})]
90 'node_type': 'DEGREE',
91 'checks_tp': [({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
92 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP',
93 'org-openroadm-common-network:operational-state': 'inService'}),
94 ({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
95 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP',
96 'org-openroadm-common-network:operational-state': 'inService'})]
99 NODE_VERSION = '2.2.1'
103 cls.processes = test_utils_rfc8040.start_tpce()
104 cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION),
105 ('roadmb', cls.NODE_VERSION), ('roadmc', cls.NODE_VERSION)])
108 def tearDownClass(cls):
109 # pylint: disable=not-an-iterable
110 for process in cls.processes:
111 test_utils_rfc8040.shutdown_process(process)
112 print("all processes killed")
117 def test_01_connect_ROADM_A1(self):
118 response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
119 self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
121 def test_02_getClliNetwork(self):
122 response = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
123 self.assertEqual(response['status_code'], requests.codes.ok)
124 logging.info(response)
125 self.assertEqual(response['network'][0]['node'][0]['node-id'], 'NodeA')
126 self.assertEqual(response['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeA')
128 def test_03_getOpenRoadmNetwork(self):
129 response = test_utils_rfc8040.get_ietf_network_request('openroadm-network', 'config')
130 self.assertEqual(response['status_code'], requests.codes.ok)
131 self.assertEqual(response['network'][0]['node'][0]['node-id'], 'ROADM-A1')
132 self.assertEqual(response['network'][0]['node'][0]['supporting-node'][0]['network-ref'], 'clli-network')
133 self.assertEqual(response['network'][0]['node'][0]['supporting-node'][0]['node-ref'], 'NodeA')
134 self.assertEqual(response['network'][0]['node'][0]['org-openroadm-common-network:node-type'], 'ROADM')
135 self.assertEqual(response['network'][0]['node'][0]['org-openroadm-network:model'], 'model2')
137 def test_04_getLinks_OpenroadmTopology(self):
138 # pylint: disable=redundant-unittest-assert
139 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
140 self.assertEqual(response['status_code'], requests.codes.ok)
141 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 10)
142 check_list = {'EXPRESS-LINK': ['ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
143 'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX'],
144 'ADD-LINK': ['ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
145 'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
146 'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
147 'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX'],
148 'DROP-LINK': ['ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
149 'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
150 'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
151 'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX']
153 for val in response['network'][0]['ietf-network-topology:link']:
154 linkId = val['link-id']
155 linkType = val['org-openroadm-common-network:link-type']
156 self.assertIn(linkType, check_list)
157 find = linkId in check_list[linkType]
158 self.assertEqual(find, True)
159 (check_list[linkType]).remove(linkId)
160 for val in check_list.values():
161 self.assertEqual(len(val), 0)
163 def test_05_getNodes_OpenRoadmTopology(self):
164 # pylint: disable=redundant-unittest-assert
165 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
166 self.assertEqual(response['status_code'], requests.codes.ok)
167 self.assertEqual(len(response['network'][0]['node']), 4)
168 listNode = ['ROADM-A1-SRG1', 'ROADM-A1-SRG3', 'ROADM-A1-DEG1', 'ROADM-A1-DEG2']
169 for val in response['network'][0]['node']:
170 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'}, val['supporting-node'])
171 nodeType = val['org-openroadm-common-network:node-type']
172 nodeId = val['node-id']
173 self.assertIn(nodeId, self.CHECK_DICT1)
174 self.assertEqual(nodeType, self.CHECK_DICT1[nodeId]['node_type'])
175 if self.CHECK_DICT1[nodeId]['node_type'] == 'SRG':
176 self.assertEqual(len(val['ietf-network-topology:termination-point']), 5)
177 for item in self.CHECK_DICT1[nodeId]['checks_tp']:
178 self.assertIn(item, val['ietf-network-topology:termination-point'])
179 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'}, val['supporting-node'])
180 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'}, val['supporting-node'])
181 listNode.remove(nodeId)
182 self.assertEqual(len(listNode), 0)
184 def test_06_connect_XPDRA(self):
185 response = test_utils_rfc8040.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
186 self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
188 def test_07_getClliNetwork(self):
189 response = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
190 self.assertEqual(response['status_code'], requests.codes.ok)
191 self.assertEqual(response['network'][0]['node'][0]['node-id'], 'NodeA')
192 self.assertEqual(response['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeA')
194 def test_08_getOpenRoadmNetwork(self):
195 # pylint: disable=redundant-unittest-assert
196 response = test_utils_rfc8040.get_ietf_network_request('openroadm-network', 'config')
197 self.assertEqual(response['status_code'], requests.codes.ok)
198 nbNode = len(response['network'][0]['node'])
199 self.assertEqual(nbNode, 2)
200 for val in response['network'][0]['node']:
201 self.assertEqual(val['supporting-node'][0]['network-ref'], 'clli-network')
202 self.assertEqual(val['supporting-node'][0]['node-ref'], 'NodeA')
203 nodeId = val['node-id']
204 if nodeId == 'XPDR-A1':
205 self.assertEqual(val['org-openroadm-common-network:node-type'], 'XPONDER')
206 elif nodeId == 'ROADM-A1':
207 self.assertEqual(val['org-openroadm-common-network:node-type'], 'ROADM')
209 self.assertFalse(True)
211 self.assertEqual(val['org-openroadm-network:model'], 'model2')
213 def test_09_getNodes_OpenRoadmTopology(self):
214 # pylint: disable=redundant-unittest-assert
215 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
216 self.assertEqual(response['status_code'], requests.codes.ok)
217 self.assertEqual(len(response['network'][0]['node']), 5)
218 listNode = ['XPDR-A1-XPDR1', 'ROADM-A1-SRG1', 'ROADM-A1-SRG3', 'ROADM-A1-DEG1', 'ROADM-A1-DEG2']
219 for val in response['network'][0]['node']:
220 nodeType = val['org-openroadm-common-network:node-type']
221 nodeId = val['node-id']
222 # Tests related to XPDRA nodes
223 if nodeId == 'XPDR-A1-XPDR1':
224 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDR-A1'}, val['supporting-node'])
225 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'}, val['supporting-node'])
226 self.assertEqual(nodeType, 'XPONDER')
229 for val2 in val['ietf-network-topology:termination-point']:
230 tpType = (val2['org-openroadm-common-network:tp-type'])
232 if tpType == 'XPONDER-CLIENT':
234 elif tpType == 'XPONDER-NETWORK':
236 if tpId == 'XPDR1-NETWORK2':
237 self.assertEqual(val2['transportpce-topology:associated-connection-map-port'], 'XPDR1-CLIENT2')
238 if tpId == 'XPDR1-CLIENT2':
239 self.assertEqual(val2['transportpce-topology:associated-connection-map-port'], 'XPDR1-NETWORK2')
240 self.assertEqual(client, 2)
241 self.assertEqual(network, 2)
242 listNode.remove(nodeId)
243 elif nodeId in self.CHECK_DICT1:
244 self.assertEqual(nodeType, self.CHECK_DICT1[nodeId]['node_type'])
245 if self.CHECK_DICT1[nodeId]['node_type'] == 'SRG':
246 self.assertEqual(len(val['ietf-network-topology:termination-point']), 5)
247 for item in self.CHECK_DICT1[nodeId]['checks_tp']:
248 self.assertIn(item, val['ietf-network-topology:termination-point'])
249 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'}, val['supporting-node'])
250 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'}, val['supporting-node'])
251 listNode.remove(nodeId)
253 self.assertFalse(True)
254 self.assertEqual(len(listNode), 0)
256 # Connect the tail XPDRA to ROADMA and vice versa
257 def test_10_connect_tail_xpdr_rdm(self):
258 # Connect the tail: XPDRA to ROADMA
259 response = test_utils.connect_xpdr_to_rdm_request("XPDR-A1", "1", "1",
260 "ROADM-A1", "1", "SRG1-PP1-TXRX")
261 self.assertEqual(response.status_code, requests.codes.ok)
263 def test_11_connect_tail_rdm_xpdr(self):
264 response = test_utils.connect_rdm_to_xpdr_request("XPDR-A1", "1", "1",
265 "ROADM-A1", "1", "SRG1-PP1-TXRX")
266 self.assertEqual(response.status_code, requests.codes.ok)
268 def test_12_getLinks_OpenRoadmTopology(self):
269 # pylint: disable=redundant-unittest-assert
270 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
271 self.assertEqual(response['status_code'], requests.codes.ok)
272 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 12)
273 check_list = {'EXPRESS-LINK': ['ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
274 'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX'],
275 'ADD-LINK': ['ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
276 'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
277 'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
278 'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX'],
279 'DROP-LINK': ['ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
280 'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
281 'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
282 'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX'],
283 'XPONDER-INPUT': ['ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1'],
284 'XPONDER-OUTPUT': ['XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX']
286 for val in response['network'][0]['ietf-network-topology:link']:
287 linkId = val['link-id']
288 linkType = val['org-openroadm-common-network:link-type']
289 self.assertIn(linkType, check_list)
290 find = linkId in check_list[linkType]
291 self.assertEqual(find, True)
292 (check_list[linkType]).remove(linkId)
293 for val in check_list.values():
294 self.assertEqual(len(val), 0)
296 def test_13_connect_ROADMC(self):
297 response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
298 self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
300 def test_14_omsAttributes_ROADMA_ROADMC(self):
301 # Config ROADMA-ROADMC oms-attributes
303 "auto-spanloss": "true",
304 "engineered-spanloss": 12.2,
305 "link-concatenation": [{
308 "SRLG-length": 100000,
310 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
311 self.assertEqual(response.status_code, requests.codes.created)
313 def test_15_omsAttributes_ROADMC_ROADMA(self):
314 # Config ROADM-C1-ROADM-A1 oms-attributes
316 "auto-spanloss": "true",
317 "engineered-spanloss": 12.2,
318 "link-concatenation": [{
321 "SRLG-length": 100000,
324 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
325 self.assertEqual(response.status_code, requests.codes.created)
327 def test_16_getClliNetwork(self):
328 # pylint: disable=redundant-unittest-assert
329 response = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
330 self.assertEqual(response['status_code'], requests.codes.ok)
331 listNode = ['NodeA', 'NodeC']
332 for val in response['network'][0]['node']:
333 nodeId = val['node-id']
334 self.assertIn(nodeId, listNode)
335 self.assertEqual(val['org-openroadm-clli-network:clli'], nodeId)
336 listNode.remove(nodeId)
337 self.assertEqual(len(listNode), 0)
339 def test_17_getOpenRoadmNetwork(self):
340 # pylint: disable=redundant-unittest-assert
341 response = test_utils_rfc8040.get_ietf_network_request('openroadm-network', 'config')
342 self.assertEqual(response['status_code'], requests.codes.ok)
343 nbNode = len(response['network'][0]['node'])
344 self.assertEqual(nbNode, 3)
345 listNode = ['XPDR-A1', 'ROADM-A1', 'ROADM-C1']
346 CHECK_LIST = {'XPDR-A1': {'node-ref': 'NodeA', 'node-type': 'XPONDER'},
347 'ROADM-A1': {'node-ref': 'NodeA', 'node-type': 'ROADM'},
348 'ROADM-C1': {'node-ref': 'NodeC', 'node-type': 'ROADM'}
350 for val in response['network'][0]['node']:
351 self.assertEqual(val['supporting-node'][0]['network-ref'], 'clli-network')
352 nodeId = val['node-id']
353 if nodeId in CHECK_LIST:
354 self.assertEqual(val['supporting-node'][0]['node-ref'], CHECK_LIST[nodeId]['node-ref'])
355 self.assertEqual(val['org-openroadm-common-network:node-type'], CHECK_LIST[nodeId]['node-type'])
356 listNode.remove(nodeId)
358 self.assertFalse(True)
360 self.assertEqual(val['org-openroadm-network:model'], 'model2')
361 self.assertEqual(len(listNode), 0)
363 def test_18_getROADMLinkOpenRoadmTopology(self):
364 # pylint: disable=redundant-unittest-assert
365 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
366 self.assertEqual(response['status_code'], requests.codes.ok)
367 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 20)
368 check_list = {'EXPRESS-LINK': ['ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
369 'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
370 'ROADM-C1-DEG2-DEG2-CTP-TXRXtoROADM-C1-DEG1-DEG1-CTP-TXRX',
371 'ROADM-C1-DEG1-DEG1-CTP-TXRXtoROADM-C1-DEG2-DEG2-CTP-TXRX'],
372 'ADD-LINK': ['ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
373 'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
374 'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
375 'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
376 'ROADM-C1-SRG1-SRG1-CP-TXRXtoROADM-C1-DEG2-DEG2-CTP-TXRX',
377 'ROADM-C1-SRG1-SRG1-CP-TXRXtoROADM-C1-DEG1-DEG1-CTP-TXRX'],
378 'DROP-LINK': ['ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
379 'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
380 'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
381 'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
382 'ROADM-C1-DEG1-DEG1-CTP-TXRXtoROADM-C1-SRG1-SRG1-CP-TXRX',
383 'ROADM-C1-DEG2-DEG2-CTP-TXRXtoROADM-C1-SRG1-SRG1-CP-TXRX'],
384 'ROADM-TO-ROADM': ['ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
385 'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX'],
386 'XPONDER-INPUT': ['ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1'],
387 'XPONDER-OUTPUT': ['XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX']
389 for val in response['network'][0]['ietf-network-topology:link']:
390 linkId = val['link-id']
391 linkType = val['org-openroadm-common-network:link-type']
392 self.assertIn(linkType, check_list)
393 find = linkId in check_list[linkType]
394 self.assertEqual(find, True)
395 (check_list[linkType]).remove(linkId)
396 for val in check_list.values():
397 self.assertEqual(len(val), 0)
399 def test_19_getLinkOmsAttributesOpenRoadmTopology(self):
400 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
401 self.assertEqual(response['status_code'], requests.codes.ok)
402 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 20)
403 R2RLink = ['ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
404 'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX']
405 for val in response['network'][0]['ietf-network-topology:link']:
406 link_id = val['link-id']
407 if link_id in R2RLink:
409 spanLoss = val['org-openroadm-network-topology:OMS-attributes']['span']['engineered-spanloss']
410 # pylint: disable=line-too-long
411 length = val['org-openroadm-network-topology:OMS-attributes']['span']['link-concatenation'][0]['SRLG-length']
412 if (spanLoss is not None) & (length is not None):
414 self.assertTrue(find)
415 R2RLink.remove(link_id)
416 self.assertEqual(len(R2RLink), 0)
418 def test_20_getNodes_OpenRoadmTopology(self):
419 # pylint: disable=redundant-unittest-assert
420 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
421 self.assertEqual(response['status_code'], requests.codes.ok)
422 self.assertEqual(len(response['network'][0]['node']), 8)
423 listNode = ['XPDR-A1-XPDR1',
424 'ROADM-A1-SRG1', 'ROADM-A1-SRG3', 'ROADM-A1-DEG1', 'ROADM-A1-DEG2',
425 'ROADM-C1-SRG1', 'ROADM-C1-DEG1', 'ROADM-C1-DEG2']
426 # Tests related to XPDRA nodes
427 for val in response['network'][0]['node']:
428 nodeType = val['org-openroadm-common-network:node-type']
429 nodeId = val['node-id']
430 if nodeId == 'XPDR-A1-XPDR1':
431 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDR-A1'}, val['supporting-node'])
432 self.assertEqual(nodeType, 'XPONDER')
433 nbTps = len(val['ietf-network-topology:termination-point'])
434 self.assertTrue(nbTps >= 4)
437 for val2 in val['ietf-network-topology:termination-point']:
438 tpType = (val2['org-openroadm-common-network:tp-type'])
439 if tpType == 'XPONDER-CLIENT':
441 elif tpType == 'XPONDER-NETWORK':
443 self.assertTrue(client == 2)
444 self.assertTrue(network == 2)
445 listNode.remove(nodeId)
446 # Tests related to ROADMA nodes
447 elif nodeId in self.CHECK_DICT1:
448 self.assertEqual(nodeType, self.CHECK_DICT1[nodeId]['node_type'])
449 if self.CHECK_DICT1[nodeId]['node_type'] == 'SRG':
450 self.assertEqual(len(val['ietf-network-topology:termination-point']), 5)
451 for item in self.CHECK_DICT1[nodeId]['checks_tp']:
452 self.assertIn(item, val['ietf-network-topology:termination-point'])
453 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'}, val['supporting-node'])
454 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'}, val['supporting-node'])
455 self.assertEqual(val['org-openroadm-common-network:node-type'], self.CHECK_DICT1[nodeId]['node_type'])
456 listNode.remove(nodeId)
457 # Tests related to ROADMA nodes
458 elif nodeId in self.CHECK_DICT2:
459 self.assertEqual(nodeType, self.CHECK_DICT2[nodeId]['node_type'])
460 if self.CHECK_DICT2[nodeId]['node_type'] == 'SRG':
461 self.assertEqual(len(val['ietf-network-topology:termination-point']), 5)
462 for item in self.CHECK_DICT2[nodeId]['checks_tp']:
463 self.assertIn(item, val['ietf-network-topology:termination-point'])
464 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeC'}, val['supporting-node'])
465 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-C1'}, val['supporting-node'])
466 self.assertEqual(val['org-openroadm-common-network:node-type'], self.CHECK_DICT2[nodeId]['node_type'])
467 listNode.remove(nodeId)
468 self.assertEqual(len(listNode), 0)
470 def test_21_connect_ROADMB(self):
471 response = test_utils_rfc8040.mount_device("ROADM-B1", ('roadmb', self.NODE_VERSION))
472 self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
474 def test_22_omsAttributes_ROADMA_ROADMB(self):
475 # Config ROADM-A1-ROADM-B1 oms-attributes
477 "auto-spanloss": "true",
478 "engineered-spanloss": 12.2,
479 "spanloss-current": 12,
480 "spanloss-base": 11.4,
481 "link-concatenation": [{
484 "SRLG-length": 100000,
486 response = test_utils.add_oms_attr_request("ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX", data)
487 self.assertEqual(response.status_code, requests.codes.created)
489 def test_23_omsAttributes_ROADMB_ROADMA(self):
490 # Config ROADM-B1-ROADM-A1 oms-attributes
492 "auto-spanloss": "true",
493 "engineered-spanloss": 12.2,
494 "spanloss-current": 12,
495 "spanloss-base": 11.4,
496 "link-concatenation": [{
499 "SRLG-length": 100000,
501 response = test_utils.add_oms_attr_request("ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX", data)
502 self.assertEqual(response.status_code, requests.codes.created)
504 def test_24_omsAttributes_ROADMB_ROADMC(self):
505 # Config ROADM-B1-ROADM-C1 oms-attributes
507 "auto-spanloss": "true",
508 "engineered-spanloss": 12.2,
509 "spanloss-current": 12,
510 "spanloss-base": 11.4,
511 "link-concatenation": [{
514 "SRLG-length": 100000,
516 response = test_utils.add_oms_attr_request("ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX", data)
517 self.assertEqual(response.status_code, requests.codes.created)
519 def test_25_omsAttributes_ROADMC_ROADMB(self):
520 # Config ROADM-C1-ROADM-B1 oms-attributes
522 "auto-spanloss": "true",
523 "engineered-spanloss": 12.2,
524 "link-concatenation": [{
527 "SRLG-length": 100000,
529 response = test_utils.add_oms_attr_request("ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX", data)
530 self.assertEqual(response.status_code, requests.codes.created)
532 def test_26_getClliNetwork(self):
533 # pylint: disable=redundant-unittest-assert
534 response = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
535 self.assertEqual(response['status_code'], requests.codes.ok)
536 listNode = ['NodeA', 'NodeB', 'NodeC']
537 for val in response['network'][0]['node']:
538 nodeId = val['node-id']
539 self.assertIn(nodeId, listNode)
540 self.assertEqual(val['org-openroadm-clli-network:clli'], nodeId)
541 listNode.remove(nodeId)
542 self.assertEqual(len(listNode), 0)
544 def test_27_verifyDegree(self):
545 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
546 self.assertEqual(response['status_code'], requests.codes.ok)
547 listR2RLink = ['ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
548 'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
549 'ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX',
550 'ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX',
551 'ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX',
552 'ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX']
553 for val in response['network'][0]['ietf-network-topology:link']:
554 if val['org-openroadm-common-network:link-type'] == 'ROADM-TO-ROADM':
555 link_id = val['link-id']
556 find = link_id in listR2RLink
557 self.assertEqual(find, True)
558 listR2RLink.remove(link_id)
559 self.assertEqual(len(listR2RLink), 0)
561 def test_28_verifyOppositeLinkTopology(self):
562 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
563 self.assertEqual(response['status_code'], requests.codes.ok)
564 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 30)
565 for val in response['network'][0]['ietf-network-topology:link']:
566 link_id = val['link-id']
567 link_type = val['org-openroadm-common-network:link-type']
568 link_src = val['source']['source-node']
569 link_dest = val['destination']['dest-node']
570 oppLink_id = val['org-openroadm-common-network:opposite-link']
571 # Find the opposite link
572 response_oppLink = test_utils.get_ordm_topo_request("ietf-network-topology:link/"+oppLink_id)
573 self.assertEqual(response_oppLink.status_code, requests.codes.ok)
574 res_oppLink = response_oppLink.json()
575 self.assertEqual(res_oppLink['ietf-network-topology:link'][0]
576 ['org-openroadm-common-network:opposite-link'], link_id)
577 self.assertEqual(res_oppLink['ietf-network-topology:link'][0]['source']['source-node'], link_dest)
578 self.assertEqual(res_oppLink['ietf-network-topology:link'][0]['destination']['dest-node'], link_src)
579 oppLink_type = res_oppLink['ietf-network-topology:link'][0]['org-openroadm-common-network:link-type']
580 CHECK_DICT = {'ADD-LINK': 'DROP-LINK', 'DROP-LINK': 'ADD-LINK',
581 'EXPRESS-LINK': 'EXPRESS-LINK', 'ROADM-TO-ROADM': 'ROADM-TO-ROADM',
582 'XPONDER-INPUT': 'XPONDER-OUTPUT', 'XPONDER-OUTUT': 'XPONDER-INPUT'}
583 if link_type in CHECK_DICT:
584 self.assertEqual(oppLink_type, CHECK_DICT[link_type])
586 def test_29_getLinkOmsAttributesOpenRoadmTopology(self):
587 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
588 self.assertEqual(response['status_code'], requests.codes.ok)
589 R2RLink = ['ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
590 'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
591 'ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX',
592 'ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX',
593 'ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX',
594 'ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX']
595 for val in response['network'][0]['ietf-network-topology:link']:
596 link_id = val['link-id']
597 if link_id in R2RLink:
599 spanLoss = val['org-openroadm-network-topology:OMS-attributes']['span']["engineered-spanloss"]
600 # pylint: disable=line-too-long
601 length = val['org-openroadm-network-topology:OMS-attributes']['span']['link-concatenation'][0]['SRLG-length']
602 if (spanLoss is not None) & (length is not None):
604 self.assertTrue(find)
605 R2RLink.remove(link_id)
606 self.assertEqual(len(R2RLink), 0)
608 def test_30_disconnect_ROADMB(self):
609 # Delete in the topology-netconf
610 response = test_utils_rfc8040.unmount_device("ROADM-B1")
611 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
612 # Delete in the clli-network
613 response = test_utils.del_node_request("NodeB")
614 self.assertEqual(response.status_code, requests.codes.ok)
616 def test_31_disconnect_ROADMC(self):
617 # Delete in the topology-netconf
618 response = test_utils_rfc8040.unmount_device("ROADM-C1")
619 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
620 # Delete in the clli-network
621 response = test_utils.del_node_request("NodeC")
622 self.assertEqual(response.status_code, requests.codes.ok)
624 def test_32_getNodes_OpenRoadmTopology(self):
625 # pylint: disable=redundant-unittest-assert
626 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
627 self.assertEqual(response['status_code'], requests.codes.ok)
628 self.assertEqual(len(response['network'][0]['node']), 5)
629 listNode = ['XPDR-A1-XPDR1', 'ROADM-A1-SRG1', 'ROADM-A1-SRG3', 'ROADM-A1-DEG1', 'ROADM-A1-DEG2']
630 for val in response['network'][0]['node']:
631 nodeType = val['org-openroadm-common-network:node-type']
632 nodeId = val['node-id']
633 # Tests related to XPDRA nodes
634 if nodeId == 'XPDR-A1-XPDR1':
635 for val2 in val['ietf-network-topology:termination-point']:
637 if tpid == 'XPDR1-CLIENT1':
638 self.assertEqual((val2['org-openroadm-common-network:tp-type']), 'XPONDER-CLIENT')
639 elif tpid == 'XPDR1-NETWORK1':
640 self.assertEqual((val2['org-openroadm-common-network:tp-type']), 'XPONDER-NETWORK')
641 # pylint: disable=line-too-long
642 self.assertEqual((val2['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id']),
643 'ROADM-A1-SRG1--SRG1-PP1-TXRX')
644 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDR-A1'}, val['supporting-node'])
645 listNode.remove(nodeId)
646 # Tests related to ROADMA nodes
647 elif nodeId in self.CHECK_DICT1:
648 self.assertEqual(nodeType, self.CHECK_DICT1[nodeId]['node_type'])
649 if self.CHECK_DICT1[nodeId]['node_type'] == 'SRG':
650 self.assertEqual(len(val['ietf-network-topology:termination-point']), 5)
651 for item in self.CHECK_DICT1[nodeId]['checks_tp']:
652 self.assertIn(item, val['ietf-network-topology:termination-point'])
653 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'}, val['supporting-node'])
654 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'}, val['supporting-node'])
655 listNode.remove(nodeId)
657 self.assertFalse(True)
658 self.assertEqual(len(listNode), 0)
659 # Test related to SRG1 of ROADMC
660 for val in response['network'][0]['node']:
661 self.assertNotEqual(val['node-id'], 'ROADM-C1-SRG1')
662 self.assertNotEqual(val['node-id'], 'ROADM-C1-DEG1')
663 self.assertNotEqual(val['node-id'], 'ROADM-C1-DEG2')
665 def test_33_getOpenRoadmNetwork(self):
666 response = test_utils_rfc8040.get_ietf_network_request('openroadm-network', 'config')
667 self.assertEqual(response['status_code'], requests.codes.ok)
668 nbNode = len(response['network'][0]['node'])
669 self.assertEqual(nbNode, 2)
670 for val in response['network'][0]['node']:
671 self.assertNotEqual(val['node-id'], 'ROADM-C1')
672 self.assertNotEqual(val['node-id'], 'ROADM-B1')
674 def test_34_getClliNetwork(self):
675 response = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
676 self.assertEqual(response['status_code'], requests.codes.ok)
677 self.assertEqual(len(response['network'][0]['node']), 1)
678 self.assertNotEqual(response['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeC')
680 def test_35_disconnect_XPDRA(self):
681 response = test_utils_rfc8040.unmount_device("XPDR-A1")
682 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
684 def test_36_getClliNetwork(self):
685 response = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
686 self.assertEqual(response['status_code'], requests.codes.ok)
687 self.assertEqual(len(response['network'][0]['node']), 1)
688 self.assertEqual(response['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeA')
690 def test_37_getOpenRoadmNetwork(self):
691 response = test_utils_rfc8040.get_ietf_network_request('openroadm-network', 'config')
692 self.assertEqual(response['status_code'], requests.codes.ok)
693 self.assertEqual(len(response['network'][0]['node']), 1)
694 self.assertNotEqual(response['network'][0]['node'][0]['node-id'], 'XPDR-A1')
696 def test_38_getNodes_OpenRoadmTopology(self):
697 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
698 self.assertEqual(response['status_code'], requests.codes.ok)
699 self.assertEqual(len(response['network'][0]['node']), 4)
700 listNode = ['ROADM-A1-SRG1', 'ROADM-A1-SRG3', 'ROADM-A1-DEG1', 'ROADM-A1-DEG2']
701 for val in response['network'][0]['node']:
702 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'}, val['supporting-node'])
703 nodeType = val['org-openroadm-common-network:node-type']
704 nodeId = val['node-id']
705 self.assertIn(nodeId, self.CHECK_DICT1)
706 self.assertEqual(nodeType, self.CHECK_DICT1[nodeId]['node_type'])
707 if self.CHECK_DICT1[nodeId]['node_type'] == 'SRG':
708 self.assertEqual(len(val['ietf-network-topology:termination-point']), 5)
709 for item in self.CHECK_DICT1[nodeId]['checks_tp']:
710 self.assertIn(item, val['ietf-network-topology:termination-point'])
711 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'}, val['supporting-node'])
712 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'}, val['supporting-node'])
713 listNode.remove(nodeId)
714 self.assertEqual(len(listNode), 0)
716 def test_39_disconnect_ROADM_XPDRA_link(self):
718 response = test_utils.del_link_request("XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX")
719 self.assertEqual(response.status_code, requests.codes.ok)
721 response = test_utils.del_link_request("ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1")
722 self.assertEqual(response.status_code, requests.codes.ok)
724 def test_40_getLinks_OpenRoadmTopology(self):
725 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
726 self.assertEqual(response['status_code'], requests.codes.ok)
727 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 16)
728 check_list = {'EXPRESS-LINK': ['ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
729 'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX'],
730 'ADD-LINK': ['ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
731 'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
732 'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
733 'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX'],
734 'DROP-LINK': ['ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
735 'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
736 'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
737 'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX']
740 for val in response['network'][0]['ietf-network-topology:link']:
741 linkId = val['link-id']
742 linkType = val['org-openroadm-common-network:link-type']
743 if linkType in check_list:
744 find = linkId in check_list[linkType]
745 self.assertEqual(find, True)
746 (check_list[linkType]).remove(linkId)
748 roadmtoroadmLink += 1
749 for val in check_list.values():
750 self.assertEqual(len(val), 0)
751 self.assertEqual(roadmtoroadmLink, 6)
752 for val in response['network'][0]['ietf-network-topology:link']:
753 self.assertNotEqual(val['org-openroadm-common-network:link-type'], 'XPONDER-OUTPUT')
754 self.assertNotEqual(val['org-openroadm-common-network:link-type'], 'XPONDER-INPUT')
756 def test_41_disconnect_ROADMA(self):
757 response = test_utils_rfc8040.unmount_device("ROADM-A1")
758 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
759 # Delete in the clli-network
760 response = test_utils.del_node_request("NodeA")
761 self.assertEqual(response.status_code, requests.codes.ok)
763 def test_42_getClliNetwork(self):
764 response = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
765 self.assertEqual(response['status_code'], requests.codes.ok)
766 self.assertNotIn('node', response['network'][0])
768 def test_43_getOpenRoadmNetwork(self):
769 response = test_utils_rfc8040.get_ietf_network_request('openroadm-network', 'config')
770 self.assertEqual(response['status_code'], requests.codes.ok)
771 self.assertNotIn('node', response['network'][0])
773 def test_44_check_roadm2roadm_link_persistence(self):
774 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
775 self.assertEqual(response['status_code'], requests.codes.ok)
776 self.assertNotIn('node', response['network'][0])
777 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 6)
780 if __name__ == "__main__":
781 unittest.main(verbosity=2)