3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
16 # from unittest.result import failfast
18 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils_rfc8040 # nopep8
class TransportPCERendererTesting(unittest.TestCase):
    """Functional checks of the device-renderer service-path request.

    The scenario is sequential: mount a ROADM and a transponder simulator,
    check their port mappings, create a service path, inspect the interfaces,
    connections and circuit packs it produced, delete the path, check that
    everything it produced is gone, then unmount both devices.  The numeric
    prefix of each test name fixes that order under unittest's alphabetical
    test sorting.
    """

    # Version string handed to the simulator helpers together with the node
    # type (presumably the OpenROADM device model release - confirm).
    NODE_VERSION = '2.2.1'

    @classmethod
    def setUpClass(cls):
        cls.processes = test_utils_rfc8040.start_tpce()
        # NOTE(review): this assignment replaces the value returned by
        # start_tpce(); tearDownClass only stops the controller as well if
        # start_sims() returns it in its list - confirm in test_utils_rfc8040.
        cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])

    @classmethod
    def tearDownClass(cls):
        # pylint: disable=not-an-iterable
        for process in cls.processes:
            test_utils_rfc8040.shutdown_process(process)
        print("all processes killed")

    def _assert_contains_subset(self, expected, actual):
        """Fail unless every key/value pair of *expected* is present in *actual*.

        Stand-in for assertDictContainsSubset (deprecated in python 3.2).
        *expected* is merged on top of *actual*: if that changes nothing, each
        expected key exists in *actual* with an equal value.  Merging the other
        way round would let *actual* overwrite the expected values and reduce
        the check to key presence only.
        """
        self.assertDictEqual(dict(actual, **expected), actual)

    @staticmethod
    def _service_path_payload(operation):
        """Return the service-path request body for *operation*.

        *operation* is the value of the 'operation' field: 'create' or
        'delete'.  Both requests share every other field so that the delete
        request always targets exactly what the create request built.
        """
        # NOTE(review): the listing this was reviewed from has gaps inside both
        # request literals, so the real file may carry more fields than the
        # ones below; merge them in here rather than dropping them.
        # 'center-freq' was only visible in the delete request; it is sent for
        # both operations to keep them symmetrical.
        return {
            'input': {
                'service-name': 'service_test',
                'modulation-format': 'dp-qpsk',
                'operation': operation,
                'nodes': [{'node-id': 'ROADM-A1', 'src-tp': 'SRG1-PP3-TXRX', 'dest-tp': 'DEG1-TTP-TXRX'},
                          {'node-id': 'XPDR-A1', 'src-tp': 'XPDR1-CLIENT1', 'dest-tp': 'XPDR1-NETWORK1'}],
                'center-freq': 195.8,
                'lower-spectral-slot-number': 713,
                'higher-spectral-slot-number': 720
            }
        }

    def test_01_rdm_device_connected(self):
        response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
        self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)

    def test_02_xpdr_device_connected(self):
        response = test_utils_rfc8040.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
        self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)

    def test_03_rdm_portmapping(self):
        response = test_utils_rfc8040.get_portmapping("ROADM-A1")
        self.assertEqual(response['status_code'], requests.codes.ok)
        mapping = response['nodes'][0]['mapping']
        # Degree side of the path created later on.
        self.assertIn(
            {'supporting-port': 'L1', 'supporting-circuit-pack-name': '1/0',
             'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional',
             'port-admin-state': 'InService', 'port-oper-state': 'InService'},
            mapping)
        # Add/drop side of the path created later on.
        self.assertIn(
            {'supporting-port': 'C3', 'supporting-circuit-pack-name': '3/0',
             'logical-connection-point': 'SRG1-PP3-TXRX', 'port-direction': 'bidirectional',
             'port-admin-state': 'InService', 'port-oper-state': 'InService'},
            mapping)

    def test_04_xpdr_portmapping(self):
        response = test_utils_rfc8040.get_portmapping("XPDR-A1")
        self.assertEqual(response['status_code'], requests.codes.ok)
        mapping = response['nodes'][0]['mapping']
        self.assertIn(
            {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
             'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
             'logical-connection-point': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-network',
             'port-direction': 'bidirectional', 'connection-map-lcp': 'XPDR1-CLIENT1',
             'lcp-hash-val': 'AMkDwQ7xTmRI', 'xponder-type': 'tpdr',
             'port-admin-state': 'InService', 'port-oper-state': 'InService'},
            mapping)
        self.assertIn(
            {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
             'supporting-port': 'C1',
             'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
             'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
             'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
             'lcp-hash-val': 'AJUUr6I5fALj', 'xponder-type': 'tpdr',
             'port-admin-state': 'InService', 'port-oper-state': 'InService'},
            mapping)

    def test_05_service_path_create(self):
        data = self._service_path_payload('create')
        response = test_utils_rfc8040.device_renderer_service_path_request(data)
        self.assertEqual(response.status_code, requests.codes.ok)

    def test_06_service_path_create_rdm_check(self):
        response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "DEG1-TTP-TXRX-nmc-713:720")
        self.assertEqual(response['status_code'], requests.codes.ok)
        interface = response['interface'][0]
        self._assert_contains_subset(
            {
                'name': 'DEG1-TTP-TXRX-nmc-713:720',
                'administrative-state': 'inService',
                'supporting-circuit-pack-name': '1/0',
                'type': 'org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint',
                'supporting-port': 'L1'
            },
            interface)
        # Both the string and the numeric rendering of the values are accepted.
        self.assertIn(interface['org-openroadm-network-media-channel-interfaces:nmc-ctp'],
                      [{'frequency': '195.8000', 'width': '40'},
                       {'frequency': 195.8, 'width': 40}])

    def test_07_service_path_create_rdm_check(self):
        response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "DEG1-TTP-TXRX-mc-713:720")
        self.assertEqual(response['status_code'], requests.codes.ok)
        interface = response['interface'][0]
        self._assert_contains_subset(
            {
                # The expected name used to read 'DEG1-TTP-TXRX-mc-7', which the
                # former key-only comparison never noticed; it now matches the
                # name the interface is fetched under.
                'name': 'DEG1-TTP-TXRX-mc-713:720',
                'administrative-state': 'inService',
                'supporting-circuit-pack-name': '1/0',
                'type': 'org-openroadm-interfaces:mediaChannelTrailTerminationPoint',
                'supporting-port': 'L1'
            },
            interface)
        self.assertIn(interface['org-openroadm-media-channel-interfaces:mc-ttp'],
                      [{'min-freq': '195.7750', 'max-freq': '195.8250'},
                       {'min-freq': 195.775, 'max-freq': 195.825}])

    def test_08_service_path_create_rdm_check(self):
        response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "SRG1-PP3-TXRX-nmc-713:720")
        self.assertEqual(response['status_code'], requests.codes.ok)
        interface = response['interface'][0]
        self._assert_contains_subset(
            {
                'name': 'SRG1-PP3-TXRX-nmc-713:720',
                'administrative-state': 'inService',
                'supporting-circuit-pack-name': '3/0',
                'type': 'org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint',
                'supporting-port': 'C3'
            },
            interface)
        self.assertIn(interface['org-openroadm-network-media-channel-interfaces:nmc-ctp'],
                      [{'frequency': '195.8000', 'width': '40'},
                       {'frequency': 195.8, 'width': 40}])

    # -mc supporting interfaces must not be created for SRG, only degrees
    def test_09_service_path_create_rdm_check(self):
        response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "SRG1-PP3-TXRX-mc-713:720")
        self.assertEqual(response['status_code'], requests.codes.conflict)

    def test_10_service_path_create_rdm_check(self):
        response = test_utils_rfc8040.check_node_attribute_request(
            "ROADM-A1", "roadm-connections", "SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720")
        self.assertEqual(response['status_code'], requests.codes.ok)
        connection = response['roadm-connections'][0]
        self._assert_contains_subset(
            {
                'connection-name': 'SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720',
                'opticalControlMode': 'off'
            },
            connection)
        # Source and destination are compared in full, not as subsets.
        self.assertDictEqual({'src-if': 'SRG1-PP3-TXRX-nmc-713:720'}, connection['source'])
        self.assertDictEqual({'dst-if': 'DEG1-TTP-TXRX-nmc-713:720'}, connection['destination'])

    def test_11_service_path_create_xpdr_check(self):
        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR1-NETWORK1-713:720")
        self.assertEqual(response['status_code'], requests.codes.ok)
        interface = response['interface'][0]
        self._assert_contains_subset(
            {
                'name': 'XPDR1-NETWORK1-713:720',
                'administrative-state': 'inService',
                'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
                'type': 'org-openroadm-interfaces:opticalChannel',
                'supporting-port': '1'
            },
            interface)
        self.assertIn(
            interface['org-openroadm-optical-channel-interfaces:och'],
            [{'rate': 'org-openroadm-common-types:R100G', 'transmit-power': '-5',
              'modulation-format': 'dp-qpsk', 'frequency': '195.8000'},
             {'rate': 'org-openroadm-common-types:R100G', 'transmit-power': -5,
              'modulation-format': 'dp-qpsk', 'frequency': 195.8}])

    def test_12_service_path_create_xpdr_check(self):
        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR1-NETWORK1-OTU")
        self.assertEqual(response['status_code'], requests.codes.ok)
        interface = response['interface'][0]
        self._assert_contains_subset(
            {
                'name': 'XPDR1-NETWORK1-OTU',
                'administrative-state': 'inService',
                'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
                'type': 'org-openroadm-interfaces:otnOtu',
                'supporting-port': '1',
                # Was 'XPDR1-NETWORK1-7', unnoticed by the former key-only
                # comparison.  NOTE(review): presumably the OTU sits on the
                # optical channel interface verified in test_11 - confirm.
                'supporting-interface': 'XPDR1-NETWORK1-713:720'
            },
            interface)
        self._assert_contains_subset(
            {'rate': 'org-openroadm-otn-common-types:OTU4', 'fec': 'scfec'},
            interface['org-openroadm-otn-otu-interfaces:otu'])

    def test_13_service_path_create_xpdr_check(self):
        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR1-NETWORK1-ODU4")
        self.assertEqual(response['status_code'], requests.codes.ok)
        interface = response['interface'][0]
        odu = interface['org-openroadm-otn-odu-interfaces:odu']
        self._assert_contains_subset(
            {
                'name': 'XPDR1-NETWORK1-ODU4',
                'administrative-state': 'inService',
                'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
                'type': 'org-openroadm-interfaces:otnOdu',
                'supporting-port': '1',
                'supporting-interface': 'XPDR1-NETWORK1-OTU'
            },
            interface)
        self._assert_contains_subset(
            {'rate': 'org-openroadm-otn-common-types:ODU4', 'monitoring-mode': 'terminated'},
            odu)
        # The opu container is compared in full, not as a subset.
        self.assertDictEqual({'exp-payload-type': '07', 'payload-type': '07'},
                             odu['opu'])

    def test_14_service_path_create_xpdr_check(self):
        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR1-CLIENT1-ETHERNET")
        self.assertEqual(response['status_code'], requests.codes.ok)
        interface = response['interface'][0]
        self._assert_contains_subset(
            {
                'name': 'XPDR1-CLIENT1-ETHERNET',
                'administrative-state': 'inService',
                'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
                'type': 'org-openroadm-interfaces:ethernetCsmacd',
                'supporting-port': 'C1'
            },
            interface)
        self._assert_contains_subset(
            {'fec': 'off', 'speed': 100000},
            interface['org-openroadm-ethernet-interfaces:ethernet'])

    def test_15_service_path_create_xpdr_check(self):
        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "circuit-packs", "1%2F0%2F1-PLUG-NET")
        self.assertEqual(response['status_code'], requests.codes.ok)
        self.assertIn('not-reserved-inuse', response['circuit-packs'][0]['equipment-state'])
        # FIXME: https://jira.opendaylight.org/browse/TRNSPRTPCE-591

    def test_16_service_path_create_xpdr_check(self):
        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "circuit-packs", "1%2F0%2F1-PLUG-CLIENT")
        self.assertEqual(response['status_code'], requests.codes.ok)
        self.assertIn('not-reserved-inuse', response['circuit-packs'][0]['equipment-state'])
        # FIXME: https://jira.opendaylight.org/browse/TRNSPRTPCE-591

    def test_17_service_path_delete(self):
        data = self._service_path_payload('delete')
        response = test_utils_rfc8040.device_renderer_service_path_request(data)
        self.assertEqual(response.status_code, requests.codes.ok)
        # The output container is accepted with or without its module prefix.
        self.assertIn(response.json(),
                      ({'output': {'result': 'Request processed', 'success': True}},
                       {'transportpce-device-renderer:output': {'result': 'Request processed', 'success': True}}))

    def test_18_service_path_delete_rdm_check(self):
        response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "DEG1-TTP-TXRX-mc-713:720")
        self.assertEqual(response['status_code'], requests.codes.conflict)

    def test_19_service_path_delete_rdm_check(self):
        response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "DEG1-TTP-TXRX-nmc-713:720")
        self.assertEqual(response['status_code'], requests.codes.conflict)

    def test_20_service_path_delete_rdm_check(self):
        response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "SRG1-PP3-TXRX-mc-713:720")
        self.assertEqual(response['status_code'], requests.codes.conflict)

    def test_21_service_path_delete_rdm_check(self):
        response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "SRG1-PP3-TXRX-nmc-713:720")
        self.assertEqual(response['status_code'], requests.codes.conflict)

    def test_22_service_path_delete_rdm_check(self):
        response = test_utils_rfc8040.check_node_attribute_request(
            "ROADM-A1", "interface", "SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720")
        self.assertEqual(response['status_code'], requests.codes.conflict)

    def test_23_service_path_delete_rdm_check(self):
        response = test_utils_rfc8040.check_node_attribute_request(
            "ROADM-A1", "roadm-connections", "SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720")
        self.assertEqual(response['status_code'], requests.codes.conflict)

    def test_24_service_path_delete_xpdr_check(self):
        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR1-NETWORK1-713:720")
        self.assertEqual(response['status_code'], requests.codes.conflict)

    def test_25_service_path_delete_xpdr_check(self):
        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR1-NETWORK1-OTU")
        self.assertEqual(response['status_code'], requests.codes.conflict)

    def test_26_service_path_delete_xpdr_check(self):
        # Probe the interface that test_13 found after creation
        # ('XPDR1-NETWORK1-ODU4'); the former 'XPDR1-NETWORK1-ODU' name was
        # never created, so its absence proved nothing about the deletion.
        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR1-NETWORK1-ODU4")
        self.assertEqual(response['status_code'], requests.codes.conflict)

    def test_27_service_path_delete_xpdr_check(self):
        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR1-CLIENT1-ETHERNET")
        self.assertEqual(response['status_code'], requests.codes.conflict)

    def test_28_service_path_delete_xpdr_check(self):
        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "circuit-packs", "1%2F0%2F1-PLUG-NET")
        self.assertEqual(response['status_code'], requests.codes.ok)
        self.assertEqual('not-reserved-available', response["circuit-packs"][0]['equipment-state'])

    def test_29_service_path_delete_xpdr_check(self):
        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "circuit-packs", "1%2F0%2F1-PLUG-CLIENT")
        self.assertEqual(response['status_code'], requests.codes.ok)
        self.assertEqual('not-reserved-available', response["circuit-packs"][0]['equipment-state'])

    def test_30_rdm_device_disconnected(self):
        response = test_utils_rfc8040.unmount_device("ROADM-A1")
        self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))

    def test_31_xpdr_device_disconnected(self):
        response = test_utils_rfc8040.unmount_device("XPDR-A1")
        self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
if __name__ == "__main__":
    # Script entry point: run every test with per-test output and keep going
    # after a failure, so that all steps of the scenario are reported.
    unittest.main(failfast=False, verbosity=2)