3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
19 # pylint: disable=wrong-import-order
21 sys.path.append('transportpce_tests/common/')
22 # pylint: disable=wrong-import-position
23 # pylint: disable=import-error
24 import test_utils_rfc8040 # nopep8
27 class TransportPCETopologyTesting(unittest.TestCase):
30 NODE_VERSION = '1.2.1'
34 'checks_tp': [({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
35 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP',
36 'org-openroadm-common-network:operational-state': 'inService'}),
37 ({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
38 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP',
39 'org-openroadm-common-network:operational-state': 'inService'})]
43 'checks_tp': [({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
44 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP',
45 'org-openroadm-common-network:operational-state': 'inService'}),
46 ({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
47 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP',
48 'org-openroadm-common-network:operational-state': 'inService'})]
51 'node_type': 'DEGREE',
52 'checks_tp': [({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
53 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP',
54 'org-openroadm-common-network:operational-state': 'inService'}),
55 ({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
56 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP',
57 'org-openroadm-common-network:operational-state': 'inService'})]
60 'node_type': 'DEGREE',
61 'checks_tp': [({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
62 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP',
63 'org-openroadm-common-network:operational-state': 'inService'}),
64 ({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
65 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP',
66 'org-openroadm-common-network:operational-state': 'inService'})]
72 'checks_tp': [({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
73 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP',
74 'org-openroadm-common-network:operational-state': 'inService'}),
75 ({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
76 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP',
77 'org-openroadm-common-network:operational-state': 'inService'})]
80 'node_type': 'DEGREE',
81 'checks_tp': [({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
82 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP',
83 'org-openroadm-common-network:operational-state': 'inService'}),
84 ({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
85 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP',
86 'org-openroadm-common-network:operational-state': 'inService'})]
89 'node_type': 'DEGREE',
90 'checks_tp': [({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
91 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP',
92 'org-openroadm-common-network:operational-state': 'inService'}),
93 ({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:administrative-state': 'inService',
94 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP',
95 'org-openroadm-common-network:operational-state': 'inService'})]
101 cls.processes = test_utils_rfc8040.start_tpce()
102 cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION),
103 ('roadmb', cls.NODE_VERSION), ('roadmc', cls.NODE_VERSION)])
106 def tearDownClass(cls):
107 # pylint: disable=not-an-iterable
108 for process in cls.processes:
109 test_utils_rfc8040.shutdown_process(process)
110 print("all processes killed")
115 def test_01_connect_ROADMA(self):
116 response = test_utils_rfc8040.mount_device("ROADMA01", ('roadma', self.NODE_VERSION))
117 self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
119 def test_02_getClliNetwork(self):
120 response = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
121 self.assertEqual(response['status_code'], requests.codes.ok)
122 self.assertEqual(response['network'][0]['node'][0]['node-id'], 'NodeA')
123 self.assertEqual(response['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeA')
125 def test_03_getOpenRoadmNetwork(self):
126 response = test_utils_rfc8040.get_ietf_network_request('openroadm-network', 'config')
127 self.assertEqual(response['status_code'], requests.codes.ok)
128 self.assertEqual(response['network'][0]['node'][0]['node-id'], 'ROADMA01')
129 self.assertEqual(response['network'][0]['node'][0]['supporting-node'][0]['network-ref'], 'clli-network')
130 self.assertEqual(response['network'][0]['node'][0]['supporting-node'][0]['node-ref'], 'NodeA')
131 self.assertEqual(response['network'][0]['node'][0]['org-openroadm-common-network:node-type'], 'ROADM')
132 self.assertEqual(response['network'][0]['node'][0]['org-openroadm-network:model'], '2')
134 def test_04_getLinks_OpenroadmTopology(self):
135 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
136 self.assertEqual(response['status_code'], requests.codes.ok)
137 # Tests related to links
138 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 10)
139 check_list = {'EXPRESS-LINK': ['ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
140 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX'],
141 'ADD-LINK': ['ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
142 'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
143 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
144 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX'],
145 'DROP-LINK': ['ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
146 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
147 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
148 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX']
150 for link in response['network'][0]['ietf-network-topology:link']:
151 linkId = link['link-id']
152 linkType = link['org-openroadm-common-network:link-type']
153 self.assertIn(linkType, check_list)
154 find = linkId in check_list[linkType]
155 self.assertEqual(find, True)
156 (check_list[linkType]).remove(linkId)
157 for link in check_list.values():
158 self.assertEqual(len(link), 0)
160 def test_05_getNodes_OpenRoadmTopology(self):
161 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
162 self.assertEqual(response['status_code'], requests.codes.ok)
163 self.assertEqual(len(response['network'][0]['node']), 4)
164 listNode = ['ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
165 for node in response['network'][0]['node']:
166 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'}, node['supporting-node'])
167 nodeType = node['org-openroadm-common-network:node-type']
168 nodeId = node['node-id']
169 self.assertIn(nodeId, self.CHECK_DICT1)
170 self.assertEqual(nodeType, self.CHECK_DICT1[nodeId]['node_type'])
171 if self.CHECK_DICT1[nodeId]['node_type'] == 'SRG':
172 self.assertEqual(len(node['ietf-network-topology:termination-point']), 17)
173 for tp in self.CHECK_DICT1[nodeId]['checks_tp']:
174 self.assertIn(tp, node['ietf-network-topology:termination-point'])
175 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'}, node['supporting-node'])
176 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'}, node['supporting-node'])
177 listNode.remove(nodeId)
179 self.assertEqual(len(listNode), 0)
181 def test_06_connect_XPDRA(self):
182 response = test_utils_rfc8040.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION))
183 self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
185 def test_07_getClliNetwork(self):
186 response = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
187 self.assertEqual(response['status_code'], requests.codes.ok)
188 self.assertEqual(response['network'][0]['node'][0]['node-id'], 'NodeA')
189 self.assertEqual(response['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeA')
191 def test_08_getOpenRoadmNetwork(self):
192 # pylint: disable=redundant-unittest-assert
193 response = test_utils_rfc8040.get_ietf_network_request('openroadm-network', 'config')
194 self.assertEqual(response['status_code'], requests.codes.ok)
195 self.assertEqual(len(response['network'][0]['node']), 2)
196 for node in response['network'][0]['node']:
197 self.assertEqual(node['supporting-node'][0]['network-ref'], 'clli-network')
198 self.assertEqual(node['supporting-node'][0]['node-ref'], 'NodeA')
199 nodeId = node['node-id']
200 if nodeId == 'XPDRA01':
201 self.assertEqual(node['org-openroadm-common-network:node-type'], 'XPONDER')
202 self.assertEqual(node['org-openroadm-network:model'], '1')
203 elif nodeId == 'ROADMA01':
204 self.assertEqual(node['org-openroadm-common-network:node-type'], 'ROADM')
205 self.assertEqual(node['org-openroadm-network:model'], '2')
207 self.assertFalse(True)
209 def test_09_getNodes_OpenRoadmTopology(self):
210 # pylint: disable=redundant-unittest-assert
211 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
212 self.assertEqual(response['status_code'], requests.codes.ok)
213 self.assertEqual(len(response['network'][0]['node']), 5)
214 listNode = ['XPDRA01-XPDR1', 'ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
215 for node in response['network'][0]['node']:
216 nodeType = node['org-openroadm-common-network:node-type']
217 nodeId = node['node-id']
218 # Tests related to XPDRA nodes
219 if nodeId == 'XPDRA01-XPDR1':
220 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA01'}, node['supporting-node'])
221 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'}, node['supporting-node'])
222 self.assertEqual(nodeType, 'XPONDER')
225 for tp in node['ietf-network-topology:termination-point']:
226 tpType = tp['org-openroadm-common-network:tp-type']
228 if tpType == 'XPONDER-CLIENT':
230 elif tpType == 'XPONDER-NETWORK':
232 if tpId == 'XPDR1-NETWORK2':
233 self.assertEqual(tp['transportpce-topology:associated-connection-map-port'], 'XPDR1-CLIENT3')
234 elif tpId == 'XPDR1-CLIENT3':
235 self.assertEqual(tp['transportpce-topology:associated-connection-map-port'], 'XPDR1-NETWORK2')
236 self.assertTrue(client == 4)
237 self.assertTrue(network == 2)
238 listNode.remove(nodeId)
239 # Tests related to ROADMA nodes
240 elif nodeId in self.CHECK_DICT1:
241 self.assertEqual(nodeType, self.CHECK_DICT1[nodeId]['node_type'])
242 if self.CHECK_DICT1[nodeId]['node_type'] == 'SRG':
243 self.assertEqual(len(node['ietf-network-topology:termination-point']), 17)
244 for tp in self.CHECK_DICT1[nodeId]['checks_tp']:
245 self.assertIn(tp, node['ietf-network-topology:termination-point'])
246 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'}, node['supporting-node'])
247 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'}, node['supporting-node'])
248 listNode.remove(nodeId)
250 self.assertFalse(True)
251 self.assertEqual(len(listNode), 0)
253 # Connect the tail XPDRA to ROADMA and vice versa
254 def test_10_connect_tail_xpdr_rdm(self):
255 # Connect the tail: XPDRA to ROADMA
256 response = test_utils_rfc8040.transportpce_api_rpc_request(
257 'transportpce-networkutils', 'init-xpdr-rdm-links',
258 {'links-input': {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '1',
259 'rdm-node': 'ROADMA01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
260 self.assertEqual(response['status_code'], requests.codes.ok)
262 def test_11_connect_tail_rdm_xpdr(self):
263 # Connect the tail: ROADMA to XPDRA
264 response = test_utils_rfc8040.transportpce_api_rpc_request(
265 'transportpce-networkutils', 'init-rdm-xpdr-links',
266 {'links-input': {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '1',
267 'rdm-node': 'ROADMA01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
268 self.assertEqual(response['status_code'], requests.codes.ok)
270 def test_12_getLinks_OpenRoadmTopology(self):
271 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
272 self.assertEqual(response['status_code'], requests.codes.ok)
273 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 12)
274 check_list = {'EXPRESS-LINK': ['ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
275 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX'],
276 'ADD-LINK': ['ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
277 'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
278 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
279 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX'],
280 'DROP-LINK': ['ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
281 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
282 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
283 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX'],
284 'XPONDER-INPUT': ['ROADMA01-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1'],
285 'XPONDER-OUTPUT': ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX']
287 for link in response['network'][0]['ietf-network-topology:link']:
288 linkId = link['link-id']
289 linkType = link['org-openroadm-common-network:link-type']
290 self.assertIn(linkType, check_list)
291 find = linkId in check_list[linkType]
292 self.assertEqual(find, True)
293 (check_list[linkType]).remove(linkId)
294 for link in check_list.values():
295 self.assertEqual(len(link), 0)
297 def test_13_connect_ROADMC(self):
298 response = test_utils_rfc8040.mount_device("ROADMC01", ('roadmc', self.NODE_VERSION))
299 self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
301 def test_14_omsAttributes_ROADMA_ROADMC(self):
302 # Config ROADMA01-ROADMC01 oms-attributes
304 "auto-spanloss": "true",
305 "engineered-spanloss": 12.2,
306 "link-concatenation": [{
309 "SRLG-length": 100000,
311 response = test_utils_rfc8040.add_oms_attr_request(
312 "ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX", data)
313 self.assertEqual(response.status_code, requests.codes.created)
315 def test_15_omsAttributes_ROADMC_ROADMA(self):
316 # Config ROADMC01-ROADMA oms-attributes
318 "auto-spanloss": "true",
319 "engineered-spanloss": 12.2,
320 "link-concatenation": [{
323 "SRLG-length": 100000,
325 response = test_utils_rfc8040.add_oms_attr_request(
326 "ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX", data)
327 self.assertEqual(response.status_code, requests.codes.created)
329 def test_16_getClliNetwork(self):
330 response = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
331 self.assertEqual(response['status_code'], requests.codes.ok)
332 listNode = ['NodeA', 'NodeC']
333 for node in response['network'][0]['node']:
334 nodeId = node['node-id']
335 self.assertIn(nodeId, listNode)
336 self.assertEqual(node['org-openroadm-clli-network:clli'], nodeId)
337 listNode.remove(nodeId)
338 self.assertEqual(len(listNode), 0)
340 def test_17_getOpenRoadmNetwork(self):
341 response = test_utils_rfc8040.get_ietf_network_request('openroadm-network', 'config')
342 self.assertEqual(response['status_code'], requests.codes.ok)
343 self.assertEqual(len(response['network'][0]['node']), 3)
344 listNode = ['XPDRA01', 'ROADMA01', 'ROADMC01']
345 CHECK_LIST = {'XPDRA01': {'node-ref': 'NodeA', 'node-type': 'XPONDER', 'model': '1'},
346 'ROADMA01': {'node-ref': 'NodeA', 'node-type': 'ROADM', 'model': '2'},
347 'ROADMC01': {'node-ref': 'NodeC', 'node-type': 'ROADM', 'model': '2'}
349 for node in response['network'][0]['node']:
350 self.assertEqual(node['supporting-node'][0]['network-ref'], 'clli-network')
351 nodeId = node['node-id']
352 self.assertIn(nodeId, CHECK_LIST)
353 self.assertEqual(node['supporting-node'][0]['node-ref'],
354 CHECK_LIST[nodeId]['node-ref'])
355 self.assertEqual(node['org-openroadm-common-network:node-type'],
356 CHECK_LIST[nodeId]['node-type'])
357 self.assertEqual(node['org-openroadm-network:model'],
358 CHECK_LIST[nodeId]['model'])
359 listNode.remove(nodeId)
360 self.assertEqual(len(listNode), 0)
362 def test_18_getROADMLinkOpenRoadmTopology(self):
363 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
364 self.assertEqual(response['status_code'], requests.codes.ok)
365 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 20)
366 check_list = {'EXPRESS-LINK': ['ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
367 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
368 'ROADMC01-DEG2-DEG2-CTP-TXRXtoROADMC01-DEG1-DEG1-CTP-TXRX',
369 'ROADMC01-DEG1-DEG1-CTP-TXRXtoROADMC01-DEG2-DEG2-CTP-TXRX'],
370 'ADD-LINK': ['ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
371 'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
372 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
373 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
374 'ROADMC01-SRG1-SRG1-CP-TXRXtoROADMC01-DEG2-DEG2-CTP-TXRX',
375 'ROADMC01-SRG1-SRG1-CP-TXRXtoROADMC01-DEG1-DEG1-CTP-TXRX'],
376 'DROP-LINK': ['ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
377 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
378 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
379 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
380 'ROADMC01-DEG1-DEG1-CTP-TXRXtoROADMC01-SRG1-SRG1-CP-TXRX',
381 'ROADMC01-DEG2-DEG2-CTP-TXRXtoROADMC01-SRG1-SRG1-CP-TXRX'],
382 'ROADM-TO-ROADM': ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
383 'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX'],
384 'XPONDER-INPUT': ['ROADMA01-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1'],
385 'XPONDER-OUTPUT': ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX']
387 for link in response['network'][0]['ietf-network-topology:link']:
388 linkId = link['link-id']
389 linkType = link['org-openroadm-common-network:link-type']
390 self.assertIn(linkType, check_list)
391 find = linkId in check_list[linkType]
392 self.assertEqual(find, True)
393 (check_list[linkType]).remove(linkId)
394 for link in check_list.values():
395 self.assertEqual(len(link), 0)
397 def test_19_getLinkOmsAttributesOpenRoadmTopology(self):
398 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
399 self.assertEqual(response['status_code'], requests.codes.ok)
400 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 20)
401 R2RLink = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
402 'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX']
403 for link in response['network'][0]['ietf-network-topology:link']:
404 link_id = link['link-id']
405 if link_id in R2RLink:
407 spanLoss = link['org-openroadm-network-topology:OMS-attributes']['span']['engineered-spanloss']
408 # pylint: disable=line-too-long
409 length = link['org-openroadm-network-topology:OMS-attributes']['span']['link-concatenation'][0]['SRLG-length']
410 if (spanLoss is not None) & (length is not None):
412 self.assertTrue(find)
413 R2RLink.remove(link_id)
414 self.assertEqual(len(R2RLink), 0)
416 def test_20_getNodes_OpenRoadmTopology(self):
417 # pylint: disable=redundant-unittest-assert
418 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
419 self.assertEqual(response['status_code'], requests.codes.ok)
420 self.assertEqual(len(response['network'][0]['node']), 8)
421 listNode = ['XPDRA01-XPDR1',
422 'ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2',
423 'ROADMC01-SRG1', 'ROADMC01-DEG1', 'ROADMC01-DEG2']
424 for node in response['network'][0]['node']:
425 nodeType = node['org-openroadm-common-network:node-type']
426 nodeId = node['node-id']
427 # Tests related to XPDRA nodes
428 if nodeId == 'XPDRA01-XPDR1':
429 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA01'}, node['supporting-node'])
430 self.assertEqual(nodeType, 'XPONDER')
431 self.assertEqual(len(node['ietf-network-topology:termination-point']), 6)
434 for tp in node['ietf-network-topology:termination-point']:
435 tpType = tp['org-openroadm-common-network:tp-type']
436 if tpType == 'XPONDER-CLIENT':
438 elif tpType == 'XPONDER-NETWORK':
440 self.assertTrue(client == 4)
441 self.assertTrue(network == 2)
442 listNode.remove(nodeId)
443 # Tests related to ROADMA nodes
444 elif nodeId in self.CHECK_DICT1:
445 self.assertEqual(nodeType, self.CHECK_DICT1[nodeId]['node_type'])
446 if self.CHECK_DICT1[nodeId]['node_type'] == 'SRG':
447 self.assertEqual(len(node['ietf-network-topology:termination-point']), 17)
448 for tp in self.CHECK_DICT1[nodeId]['checks_tp']:
449 self.assertIn(tp, node['ietf-network-topology:termination-point'])
450 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'}, node['supporting-node'])
451 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'}, node['supporting-node'])
452 self.assertEqual(node['org-openroadm-common-network:node-type'], self.CHECK_DICT1[nodeId]['node_type'])
453 listNode.remove(nodeId)
454 # Tests related to ROADMC nodes
455 elif nodeId in self.CHECK_DICT2:
456 self.assertEqual(nodeType, self.CHECK_DICT2[nodeId]['node_type'])
457 if self.CHECK_DICT2[nodeId]['node_type'] == 'SRG':
458 self.assertEqual(len(node['ietf-network-topology:termination-point']), 17)
459 for tp in self.CHECK_DICT2[nodeId]['checks_tp']:
460 self.assertIn(tp, node['ietf-network-topology:termination-point'])
461 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeC'}, node['supporting-node'])
462 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMC01'}, node['supporting-node'])
463 self.assertEqual(node['org-openroadm-common-network:node-type'], self.CHECK_DICT2[nodeId]['node_type'])
464 listNode.remove(nodeId)
466 self.assertFalse(True)
467 self.assertEqual(len(listNode), 0)
469 def test_21_connect_ROADMB(self):
470 response = test_utils_rfc8040.mount_device("ROADMB01", ('roadmb', self.NODE_VERSION))
471 self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
473 def test_22_omsAttributes_ROADMA_ROADMB(self):
474 # Config ROADMA01-ROADMB01 oms-attributes
476 "auto-spanloss": "true",
477 "engineered-spanloss": 12.2,
478 "link-concatenation": [{
481 "SRLG-length": 100000,
483 response = test_utils_rfc8040.add_oms_attr_request(
484 "ROADMA01-DEG2-DEG2-TTP-TXRXtoROADMB01-DEG1-DEG1-TTP-TXRX", data)
485 self.assertEqual(response.status_code, requests.codes.created)
487 def test_23_omsAttributes_ROADMB_ROADMA(self):
488 # Config ROADMB01-ROADMA01 oms-attributes
490 "auto-spanloss": "true",
491 "engineered-spanloss": 12.2,
492 "link-concatenation": [{
495 "SRLG-length": 100000,
497 response = test_utils_rfc8040.add_oms_attr_request(
498 "ROADMB01-DEG1-DEG1-TTP-TXRXtoROADMA01-DEG2-DEG2-TTP-TXRX", data)
499 self.assertEqual(response.status_code, requests.codes.created)
501 def test_24_omsAttributes_ROADMB_ROADMC(self):
502 # Config ROADMB01-ROADMC01 oms-attributes
504 "auto-spanloss": "true",
505 "engineered-spanloss": 12.2,
506 "link-concatenation": [{
509 "SRLG-length": 100000,
511 response = test_utils_rfc8040.add_oms_attr_request(
512 "ROADMB01-DEG2-DEG2-TTP-TXRXtoROADMC01-DEG1-DEG1-TTP-TXRX", data)
513 self.assertEqual(response.status_code, requests.codes.created)
515 def test_25_omsAttributes_ROADMC_ROADMB(self):
516 # Config ROADMC01-ROADMB01 oms-attributes
518 "auto-spanloss": "true",
519 "engineered-spanloss": 12.2,
520 "link-concatenation": [{
523 "SRLG-length": 100000,
525 response = test_utils_rfc8040.add_oms_attr_request(
526 "ROADMC01-DEG1-DEG1-TTP-TXRXtoROADMB01-DEG2-DEG2-TTP-TXRX", data)
527 self.assertEqual(response.status_code, requests.codes.created)
529 def test_26_getClliNetwork(self):
530 response = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
531 self.assertEqual(response['status_code'], requests.codes.ok)
532 listNode = ['NodeA', 'NodeB', 'NodeC']
533 for node in response['network'][0]['node']:
534 nodeId = node['node-id']
535 self.assertIn(nodeId, listNode)
536 self.assertEqual(node['org-openroadm-clli-network:clli'], nodeId)
537 listNode.remove(nodeId)
538 self.assertEqual(len(listNode), 0)
540 def test_27_verifyDegree(self):
541 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
542 self.assertEqual(response['status_code'], requests.codes.ok)
543 listR2RLink = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
544 'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX',
545 'ROADMA01-DEG2-DEG2-TTP-TXRXtoROADMB01-DEG1-DEG1-TTP-TXRX',
546 'ROADMC01-DEG1-DEG1-TTP-TXRXtoROADMB01-DEG2-DEG2-TTP-TXRX',
547 'ROADMB01-DEG1-DEG1-TTP-TXRXtoROADMA01-DEG2-DEG2-TTP-TXRX',
548 'ROADMB01-DEG2-DEG2-TTP-TXRXtoROADMC01-DEG1-DEG1-TTP-TXRX']
549 for link in response['network'][0]['ietf-network-topology:link']:
550 if link['org-openroadm-common-network:link-type'] == 'ROADM-TO-ROADM':
551 link_id = link['link-id']
552 find = link_id in listR2RLink
553 self.assertEqual(find, True)
554 listR2RLink.remove(link_id)
555 self.assertEqual(len(listR2RLink), 0)
557 def test_28_verifyOppositeLinkTopology(self):
558 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
559 self.assertEqual(response['status_code'], requests.codes.ok)
560 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 34)
561 for link in response['network'][0]['ietf-network-topology:link']:
562 link_id = link['link-id']
563 link_type = link['org-openroadm-common-network:link-type']
564 link_src = link['source']['source-node']
565 link_dest = link['destination']['dest-node']
566 oppLink_id = link['org-openroadm-common-network:opposite-link']
567 # Find the opposite link
568 res_oppLink = test_utils_rfc8040.get_ietf_network_link_request('openroadm-topology', oppLink_id, 'config')
569 self.assertEqual(res_oppLink['status_code'], requests.codes.ok)
570 self.assertEqual(res_oppLink['link']['org-openroadm-common-network:opposite-link'], link_id)
571 self.assertEqual(res_oppLink['link']['source']['source-node'], link_dest)
572 self.assertEqual(res_oppLink['link']['destination']['dest-node'], link_src)
573 oppLink_type = res_oppLink['link']['org-openroadm-common-network:link-type']
574 CHECK_DICT = {'ADD-LINK': 'DROP-LINK', 'DROP-LINK': 'ADD-LINK',
575 'EXPRESS-LINK': 'EXPRESS-LINK', 'ROADM-TO-ROADM': 'ROADM-TO-ROADM',
576 'XPONDER-INPUT': 'XPONDER-OUTPUT', 'XPONDER-OUTUT': 'XPONDER-INPUT'}
577 if link_type in CHECK_DICT:
578 self.assertEqual(oppLink_type, CHECK_DICT[link_type])
580 def test_29_getLinkOmsAttributesOpenRoadmTopology(self):
581 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
582 self.assertEqual(response['status_code'], requests.codes.ok)
583 R2RLink = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
584 'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX',
585 'ROADMA01-DEG2-DEG2-TTP-TXRXtoROADMB01-DEG1-DEG1-TTP-TXRX',
586 'ROADMC01-DEG1-DEG1-TTP-TXRXtoROADMB01-DEG2-DEG2-TTP-TXRX',
587 'ROADMB01-DEG1-DEG1-TTP-TXRXtoROADMA01-DEG2-DEG2-TTP-TXRX',
588 'ROADMB01-DEG2-DEG2-TTP-TXRXtoROADMC01-DEG1-DEG1-TTP-TXRX']
589 for link in response['network'][0]['ietf-network-topology:link']:
590 link_id = link['link-id']
591 if link_id in R2RLink:
593 spanLoss = link['org-openroadm-network-topology:OMS-attributes']['span']['engineered-spanloss']
594 # pylint: disable=line-too-long
595 length = link['org-openroadm-network-topology:OMS-attributes']['span']['link-concatenation'][0]['SRLG-length']
596 if (spanLoss is not None) & (length is not None):
598 self.assertTrue(find)
599 R2RLink.remove(link_id)
600 self.assertEqual(len(R2RLink), 0)
602 def test_30_disconnect_ROADMB(self):
603 # Delete in the topology-netconf
604 response = test_utils_rfc8040.unmount_device("ROADMB01")
605 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
606 # Delete in the clli-network
607 response = test_utils_rfc8040.del_ietf_network_node_request('clli-network', 'NodeB', 'config')
608 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
610 def test_31_disconnect_ROADMC(self):
611 response = test_utils_rfc8040.unmount_device("ROADMC01")
612 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
613 # Delete in the clli-network
614 response = test_utils_rfc8040.del_ietf_network_node_request('clli-network', 'NodeC', 'config')
615 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
617 def test_32_getNodes_OpenRoadmTopology(self):
618 # pylint: disable=redundant-unittest-assert
619 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
620 self.assertEqual(response['status_code'], requests.codes.ok)
621 self.assertEqual(len(response['network'][0]['node']), 5)
622 listNode = ['XPDRA01-XPDR1', 'ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
623 for node in response['network'][0]['node']:
624 nodeType = node['org-openroadm-common-network:node-type']
625 nodeId = node['node-id']
626 # Tests related to XPDRA nodes
627 if nodeId == 'XPDRA01-XPDR1':
628 for tp in node['ietf-network-topology:termination-point']:
630 if tpid == 'XPDR1-CLIENT1':
631 self.assertEqual(tp['org-openroadm-common-network:tp-type'], 'XPONDER-CLIENT')
632 if tpid == 'XPDR1-NETWORK1':
633 self.assertEqual(tp['org-openroadm-common-network:tp-type'], 'XPONDER-NETWORK')
634 # pylint: disable=line-too-long
635 self.assertEqual(tp['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'],
636 'ROADMA01-SRG1--SRG1-PP1-TXRX')
637 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA01'}, node['supporting-node'])
638 listNode.remove(nodeId)
639 # Tests related to ROADMA nodes
640 elif nodeId in self.CHECK_DICT1:
641 self.assertEqual(nodeType, self.CHECK_DICT1[nodeId]['node_type'])
642 if self.CHECK_DICT1[nodeId]['node_type'] == 'SRG':
643 self.assertEqual(len(node['ietf-network-topology:termination-point']), 17)
644 for tp in self.CHECK_DICT1[nodeId]['checks_tp']:
645 self.assertIn(tp, node['ietf-network-topology:termination-point'])
646 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'}, node['supporting-node'])
647 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'}, node['supporting-node'])
648 listNode.remove(nodeId)
650 self.assertFalse(True)
651 self.assertEqual(len(listNode), 0)
652 # Test related to SRG1 of ROADMC
653 for node in response['network'][0]['node']:
654 self.assertNotEqual(node['node-id'], 'ROADMC01-SRG1')
655 self.assertNotEqual(node['node-id'], 'ROADMC01-DEG1')
656 self.assertNotEqual(node['node-id'], 'ROADMC01-DEG2')
658 def test_33_getOpenRoadmNetwork(self):
659 response = test_utils_rfc8040.get_ietf_network_request('openroadm-network', 'config')
660 self.assertEqual(response['status_code'], requests.codes.ok)
661 self.assertEqual(len(response['network'][0]['node']), 2)
662 for node in response['network'][0]['node']:
663 self.assertNotEqual(node['node-id'], 'ROADMC01')
664 self.assertNotEqual(node['node-id'], 'ROADMB01')
666 def test_34_getClliNetwork(self):
667 response = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
668 self.assertEqual(response['status_code'], requests.codes.ok)
669 self.assertEqual(len(response['network'][0]['node']), 1)
670 self.assertNotEqual(response['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeC')
672 def test_35_disconnect_XPDRA(self):
673 response = test_utils_rfc8040.unmount_device("XPDRA01")
674 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
676 def test_36_getClliNetwork(self):
677 response = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
678 self.assertEqual(response['status_code'], requests.codes.ok)
679 self.assertEqual(len(response['network'][0]['node']), 1)
680 self.assertEqual(response['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeA')
682 def test_37_getOpenRoadmNetwork(self):
683 response = test_utils_rfc8040.get_ietf_network_request('openroadm-network', 'config')
684 self.assertEqual(response['status_code'], requests.codes.ok)
685 self.assertEqual(len(response['network'][0]['node']), 1)
686 self.assertNotEqual(response['network'][0]['node'][0]['node-id'], 'XPDRA01')
688 def test_38_getNodes_OpenRoadmTopology(self):
689 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
690 self.assertEqual(response['status_code'], requests.codes.ok)
691 self.assertEqual(len(response['network'][0]['node']), 4)
692 listNode = ['ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
693 for node in response['network'][0]['node']:
694 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'}, node['supporting-node'])
695 nodeType = node['org-openroadm-common-network:node-type']
696 nodeId = node['node-id']
697 self.assertIn(nodeId, self.CHECK_DICT1)
698 self.assertEqual(nodeType, self.CHECK_DICT1[nodeId]['node_type'])
699 if self.CHECK_DICT1[nodeId]['node_type'] == 'SRG':
700 self.assertEqual(len(node['ietf-network-topology:termination-point']), 17)
701 for tp in self.CHECK_DICT1[nodeId]['checks_tp']:
702 self.assertIn(tp, node['ietf-network-topology:termination-point'])
703 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'}, node['supporting-node'])
704 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'}, node['supporting-node'])
705 listNode.remove(nodeId)
706 self.assertEqual(len(listNode), 0)
708 def test_39_disconnect_ROADM_XPDRA_link(self):
710 response = test_utils_rfc8040.del_ietf_network_link_request(
711 'openroadm-topology',
712 'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX',
715 response2 = test_utils_rfc8040.del_ietf_network_link_request(
716 'openroadm-topology',
717 'ROADMA01-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1',
719 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
720 self.assertIn(response2.status_code, (requests.codes.ok, requests.codes.no_content))
722 def test_40_getLinks_OpenRoadmTopology(self):
723 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
724 self.assertEqual(response['status_code'], requests.codes.ok)
725 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 16)
726 check_list = {'EXPRESS-LINK': ['ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
727 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX'],
728 'ADD-LINK': ['ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
729 'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
730 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
731 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX'],
732 'DROP-LINK': ['ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
733 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
734 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
735 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX']
738 for link in response['network'][0]['ietf-network-topology:link']:
739 linkId = link['link-id']
740 linkType = link['org-openroadm-common-network:link-type']
741 if linkType in check_list:
742 find = linkId in check_list[linkType]
743 self.assertEqual(find, True)
744 (check_list[linkType]).remove(linkId)
746 roadmtoroadmLink += 1
747 for link in check_list.values():
748 self.assertEqual(len(link), 0)
749 self.assertEqual(roadmtoroadmLink, 6)
750 for link in response['network'][0]['ietf-network-topology:link']:
751 self.assertNotEqual(link['org-openroadm-common-network:link-type'], 'XPONDER-OUTPUT')
752 self.assertNotEqual(link['org-openroadm-common-network:link-type'], 'XPONDER-INPUT')
754 def test_41_disconnect_ROADMA(self):
755 response = test_utils_rfc8040.unmount_device("ROADMA01")
756 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
757 # Delete in the clli-network
758 response = test_utils_rfc8040.del_ietf_network_node_request('clli-network', 'NodeA', 'config')
759 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
761 def test_42_getClliNetwork(self):
762 response = test_utils_rfc8040.get_ietf_network_request('clli-network', 'config')
763 self.assertEqual(response['status_code'], requests.codes.ok)
764 self.assertNotIn('node', response['network'][0])
766 def test_43_getOpenRoadmNetwork(self):
767 response = test_utils_rfc8040.get_ietf_network_request('openroadm-network', 'config')
768 self.assertEqual(response['status_code'], requests.codes.ok)
769 self.assertNotIn('node', response['network'][0])
771 def test_44_check_roadm2roadm_link_persistence(self):
772 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
773 self.assertEqual(response['status_code'], requests.codes.ok)
774 self.assertNotIn('node', response['network'][0])
775 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 6)
778 if __name__ == "__main__":
779 unittest.main(verbosity=2)