3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
18 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils_rfc8040 # nopep8
# unittest.TestCase subclass grouping the numbered test_NN_* checks that follow.
26 class TransportPCERendererTesting(unittest.TestCase):
# Version string paired with the 'xpdra' / 'roadma' names when calling
# test_utils_rfc8040.start_sims() and test_utils_rfc8040.mount_device().
29 NODE_VERSION = '1.2.1'
@classmethod
def setUpClass(cls):
    """Start the processes the tests rely on, once for the whole class.

    Calls ``test_utils_rfc8040.start_tpce()`` and then
    ``test_utils_rfc8040.start_sims()`` for the 'xpdra' and 'roadma' entries
    at ``NODE_VERSION``; the result is kept in ``cls.processes``.
    """
    cls.processes = test_utils_rfc8040.start_tpce()
    # NOTE(review): this rebinds cls.processes, so only what start_sims()
    # returns is kept. Presumably both helpers return the same shared
    # process list -- confirm in test_utils_rfc8040 before changing this.
    cls.processes = test_utils_rfc8040.start_sims(
        [('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
    """Shut down every process recorded in ``cls.processes``.

    Declared as a classmethod because unittest invokes this hook on the
    class itself, with no instance.
    """
    # pylint: disable=not-an-iterable
    for process in cls.processes:
        test_utils_rfc8040.shutdown_process(process)
    print("all processes killed")
def setUp(self):
    """Print the short name of the test that is about to run.

    The short name is the last dot-separated component of ``self.id()``.
    """
    # str.format is kept on purpose; the pragma below silences the f-string hint.
    # pylint: disable=consider-using-f-string
    print("execution of {}".format(self.id().split(".")[-1]))
def test_01_rdm_device_connected(self):
    """Mount ROADMA01 and expect the reply status ``requests.codes.created``."""
    roadm_node = ('roadma', self.NODE_VERSION)
    mount_reply = test_utils_rfc8040.mount_device("ROADMA01", roadm_node)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_02_xpdr_device_connected(self):
    """Mount XPDRA01 and expect the reply status ``requests.codes.created``."""
    xpdr_node = ('xpdra', self.NODE_VERSION)
    mount_reply = test_utils_rfc8040.mount_device("XPDRA01", xpdr_node)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_03_rdm_portmapping(self):
    """Check that the ROADMA01 port mapping lists the degree and SRG entries.

    The mapping list of the first returned node must contain the exact
    DEG1-TTP-TXRX and SRG1-PP7-TXRX dictionaries given below.
    """
    response = test_utils_rfc8040.get_portmapping("ROADMA01")
    self.assertEqual(response['status_code'], requests.codes.ok)
    self.assertIn(
        {'supporting-port': 'L1', 'supporting-circuit-pack-name': '2/0',
         'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional',
         'port-admin-state': 'InService', 'port-oper-state': 'InService'},
        response['nodes'][0]['mapping'])
    self.assertIn(
        {'supporting-port': 'C7', 'supporting-circuit-pack-name': '4/0',
         'logical-connection-point': 'SRG1-PP7-TXRX', 'port-direction': 'bidirectional',
         'port-admin-state': 'InService', 'port-oper-state': 'InService'},
        response['nodes'][0]['mapping'])
def test_04_xpdr_portmapping(self):
    """Check that the XPDRA01 port mapping lists the network and client ports.

    The mapping list of the first returned node must contain the exact
    XPDR1-NETWORK1 and XPDR1-CLIENT1 dictionaries given below; each names
    the other in its 'connection-map-lcp' field.
    """
    response = test_utils_rfc8040.get_portmapping("XPDRA01")
    self.assertEqual(response['status_code'], requests.codes.ok)
    self.assertIn(
        {'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
         'logical-connection-point': 'XPDR1-NETWORK1', 'port-direction': 'bidirectional',
         'connection-map-lcp': 'XPDR1-CLIENT1', 'port-qual': 'xpdr-network',
         'lcp-hash-val': 'OSvMgUyP+mE=', 'xponder-type': 'tpdr',
         'port-admin-state': 'InService', 'port-oper-state': 'InService'},
        response['nodes'][0]['mapping'])
    self.assertIn(
        {'supporting-port': 'C1',
         'supporting-circuit-pack-name': '1/0/C1-PLUG-CLIENT',
         'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
         'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
         'lcp-hash-val': 'AO9UFkY/TLYw', 'xponder-type': 'tpdr',
         'port-admin-state': 'InService', 'port-oper-state': 'InService'},
        response['nodes'][0]['mapping'])
# test_05: sends `data` through test_utils_rfc8040.device_renderer_service_path_request()
# and expects the reply status requests.codes.ok.
# The visible payload entries are: service 'service_test', modulation 'dp-qpsk',
# operation 'create', ROADMA01 from SRG1-PP7-TXRX to DEG1-TTP-TXRX, XPDRA01 from
# XPDR1-CLIENT1 to XPDR1-NETWORK1, and spectral slots 713 to 720.
# NOTE(review): the statement that binds `data` and opens the payload is not among
# the lines shown here, nor is its closing -- confirm the full payload against
# the complete file.
89 def test_05_service_path_create(self):
92 'service-name': 'service_test',
94 'modulation-format': 'dp-qpsk',
95 'operation': 'create',
96 'nodes': [{'node-id': 'ROADMA01', 'src-tp': 'SRG1-PP7-TXRX', 'dest-tp': 'DEG1-TTP-TXRX'},
97 {'node-id': 'XPDRA01', 'src-tp': 'XPDR1-CLIENT1', 'dest-tp': 'XPDR1-NETWORK1'}],
102 'lower-spectral-slot-number': 713,
103 'higher-spectral-slot-number': 720
106 response = test_utils_rfc8040.device_renderer_service_path_request(data)
107 self.assertEqual(response.status_code, requests.codes.ok)
def test_06_service_path_create_rdm_check(self):
    """Check interface DEG1-TTP-TXRX-713:720 on ROADMA01.

    The first returned interface must carry the expected generic fields and
    an optical-channel container equal to ``{'wavelength-number': 7}``.
    """
    response = test_utils_rfc8040.check_node_attribute_request("ROADMA01", "interface", "DEG1-TTP-TXRX-713:720")
    self.assertEqual(response['status_code'], requests.codes.ok)
    # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
    # (overlaying the actual interface on the expected fields must give the
    # actual interface back, i.e. every expected field is present and equal)
    self.assertDictEqual(
        dict({
            'name': 'DEG1-TTP-TXRX-713:720',
            'administrative-state': 'inService',
            'supporting-circuit-pack-name': '2/0',
            'type': 'org-openroadm-interfaces:opticalChannel',
            'supporting-port': 'L1'
        }, **response['interface'][0]), response['interface'][0])
    self.assertDictEqual(
        {'wavelength-number': 7},
        response['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
def test_07_service_path_create_rdm_check(self):
    """Check interface SRG1-PP7-TXRX-713:720 on ROADMA01.

    The first returned interface must carry the expected generic fields and
    an optical-channel container equal to ``{'wavelength-number': 7}``.
    """
    response = test_utils_rfc8040.check_node_attribute_request("ROADMA01", "interface", "SRG1-PP7-TXRX-713:720")
    self.assertEqual(response['status_code'], requests.codes.ok)
    # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
    # (overlaying the actual interface on the expected fields must give the
    # actual interface back, i.e. every expected field is present and equal)
    self.assertDictEqual(
        dict({
            'name': 'SRG1-PP7-TXRX-713:720',
            'administrative-state': 'inService',
            'supporting-circuit-pack-name': '4/0',
            'type': 'org-openroadm-interfaces:opticalChannel',
            'supporting-port': 'C7'
        }, **response['interface'][0]), response['interface'][0])
    self.assertDictEqual(
        {'wavelength-number': 7},
        response['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
def test_08_service_path_create_rdm_check(self):
    """Check roadm-connection SRG1-PP7-TXRX-DEG1-TTP-TXRX-713:720 on ROADMA01.

    The first returned connection must carry the expected connection number,
    wavelength number and optical control mode, and its source/destination
    must reference the SRG and DEG 713:720 interfaces respectively.
    """
    response = test_utils_rfc8040.check_node_attribute_request(
        "ROADMA01", "roadm-connections", "SRG1-PP7-TXRX-DEG1-TTP-TXRX-713:720")
    self.assertEqual(response['status_code'], requests.codes.ok)
    # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
    # (overlaying the actual connection on the expected fields must give the
    # actual connection back, i.e. every expected field is present and equal)
    self.assertDictEqual(
        dict({
            'connection-number': 'SRG1-PP7-TXRX-DEG1-TTP-TXRX-713:720',
            'wavelength-number': 7,
            'opticalControlMode': 'off'
        }, **response['roadm-connections'][0]), response['roadm-connections'][0])
    self.assertDictEqual({'src-if': 'SRG1-PP7-TXRX-713:720'}, response['roadm-connections'][0]['source'])
    self.assertDictEqual({'dst-if': 'DEG1-TTP-TXRX-713:720'}, response['roadm-connections'][0]['destination'])
def test_09_service_path_create_xpdr_check(self):
    """Check interface XPDR1-NETWORK1-713:720 on XPDRA01.

    The first returned interface must carry the expected generic fields, and
    its optical-channel container must equal one of two accepted variants
    that differ only in how 'transmit-power' is typed ('-5' or -5).
    """
    response = test_utils_rfc8040.check_node_attribute_request("XPDRA01", "interface", "XPDR1-NETWORK1-713:720")
    self.assertEqual(response['status_code'], requests.codes.ok)
    # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
    # (overlaying the actual interface on the expected fields must give the
    # actual interface back, i.e. every expected field is present and equal)
    self.assertDictEqual(
        dict({
            'name': 'XPDR1-NETWORK1-713:720',
            'administrative-state': 'inService',
            'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
            'type': 'org-openroadm-interfaces:opticalChannel',
            'supporting-port': '1'
        }, **response['interface'][0]), response['interface'][0])
    self.assertIn(
        response['interface'][0]['org-openroadm-optical-channel-interfaces:och'],
        [{'rate': 'org-openroadm-optical-channel-interfaces:R100G',
          'transmit-power': '-5',
          'wavelength-number': 7,
          'modulation-format': 'dp-qpsk'},
         {'rate': 'org-openroadm-optical-channel-interfaces:R100G',
          'transmit-power': -5,
          'wavelength-number': 7,
          'modulation-format': 'dp-qpsk'}])
def test_10_service_path_create_xpdr_check(self):
    """Check interface XPDR1-NETWORK1-OTU on XPDRA01.

    The first returned interface must carry the expected generic fields
    (including its supporting interface XPDR1-NETWORK1-713:720) and an OTU
    container with rate OTU4 and fec 'scfec'.
    """
    response = test_utils_rfc8040.check_node_attribute_request("XPDRA01", "interface", "XPDR1-NETWORK1-OTU")
    self.assertEqual(response['status_code'], requests.codes.ok)
    # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
    # (overlaying the actual interface on the expected fields must give the
    # actual interface back, i.e. every expected field is present and equal)
    self.assertDictEqual(
        dict({
            'name': 'XPDR1-NETWORK1-OTU',
            'administrative-state': 'inService',
            'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
            'type': 'org-openroadm-interfaces:otnOtu',
            'supporting-port': '1',
            'supporting-interface': 'XPDR1-NETWORK1-713:720'
        }, **response['interface'][0]), response['interface'][0])
    self.assertDictEqual(
        {'rate': 'org-openroadm-otn-otu-interfaces:OTU4', 'fec': 'scfec'},
        response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
def test_11_service_path_create_xpdr_check(self):
    """Check interface XPDR1-NETWORK1-ODU on XPDRA01.

    The first returned interface must carry the expected generic fields
    (including its supporting interface XPDR1-NETWORK1-OTU), its ODU
    container must include rate ODU4 and monitoring mode 'terminated', and
    the nested 'opu' entry must equal the expected payload types.
    """
    response = test_utils_rfc8040.check_node_attribute_request("XPDRA01", "interface", "XPDR1-NETWORK1-ODU")
    self.assertEqual(response['status_code'], requests.codes.ok)
    # the 2 following statements replace self.assertDictContainsSubset deprecated in python 3.2
    # (overlaying the actual dict on the expected fields must give the actual
    # dict back, i.e. every expected field is present and equal)
    self.assertDictEqual(
        dict({
            'name': 'XPDR1-NETWORK1-ODU',
            'administrative-state': 'inService',
            'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
            'type': 'org-openroadm-interfaces:otnOdu',
            'supporting-port': '1',
            'supporting-interface': 'XPDR1-NETWORK1-OTU'
        }, **response['interface'][0]), response['interface'][0])
    self.assertDictEqual(
        dict({
            'rate': 'org-openroadm-otn-odu-interfaces:ODU4',
            'monitoring-mode': 'terminated'
        }, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
        response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
    )
    self.assertDictEqual({'exp-payload-type': '07', 'payload-type': '07'},
                         response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
# test_12: queries interface XPDR1-CLIENT1-ETHERNET on XPDRA01, expects
# requests.codes.ok, checks the generic interface fields listed below, then
# compares the 'org-openroadm-ethernet-interfaces:ethernet' container with
# assertDictEqual.
# NOTE(review): only the 'auto-negotiation' entry of the expected ethernet
# dictionary is present in the lines shown here, and the opener of the first
# expected dictionary is absent too -- confirm both against the complete file.
218 def test_12_service_path_create_xpdr_check(self):
219 response = test_utils_rfc8040.check_node_attribute_request("XPDRA01", "interface", "XPDR1-CLIENT1-ETHERNET")
220 self.assertEqual(response['status_code'], requests.codes.ok)
221 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
222 self.assertDictEqual(
224 'name': 'XPDR1-CLIENT1-ETHERNET',
225 'administrative-state': 'inService',
226 'supporting-circuit-pack-name': '1/0/C1-PLUG-CLIENT',
227 'type': 'org-openroadm-interfaces:ethernetCsmacd',
228 'supporting-port': 'C1'
229 }, **response['interface'][0]), response['interface'][0])
230 self.assertDictEqual(
233 'auto-negotiation': 'enabled',
236 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
def test_13_service_path_create_xpdr_check(self):
    """Check the equipment state of circuit pack 1/0/1-PLUG-NET on XPDRA01.

    The pack name is passed URL-encoded ('%2F' for '/'); the reported
    'equipment-state' must contain 'not-reserved-inuse'.
    """
    pack_reply = test_utils_rfc8040.check_node_attribute_request(
        "XPDRA01", "circuit-packs", "1%2F0%2F1-PLUG-NET")
    # FIXME: https://jira.opendaylight.org/browse/TRNSPRTPCE-591
    self.assertEqual(pack_reply['status_code'], requests.codes.ok)
    equipment_state = pack_reply['circuit-packs'][0]["equipment-state"]
    self.assertIn('not-reserved-inuse', equipment_state)
# test_14: sends `data` through test_utils_rfc8040.device_renderer_service_path_request(),
# expects requests.codes.ok, and requires the JSON body to equal one of two
# accepted shapes (top-level key 'output' or 'transportpce-device-renderer:output'),
# each reporting result 'Request processed' with success True.
# The visible payload entries are: service 'service_test', modulation 'dp-qpsk',
# operation 'delete', the ROADMA01 and XPDRA01 termination points, center-freq
# 195.8, and spectral slots 713 to 720.
# NOTE(review): the statement that binds `data` and opens the payload is not among
# the lines shown here, nor is its closing -- confirm the full payload against
# the complete file.
244 def test_14_service_path_delete(self):
247 'service-name': 'service_test',
249 'modulation-format': 'dp-qpsk',
250 'operation': 'delete',
251 'nodes': [{'node-id': 'ROADMA01', 'src-tp': 'SRG1-PP7-TXRX', 'dest-tp': 'DEG1-TTP-TXRX'},
252 {'node-id': 'XPDRA01', 'src-tp': 'XPDR1-CLIENT1', 'dest-tp': 'XPDR1-NETWORK1'}],
253 'center-freq': 195.8,
257 'lower-spectral-slot-number': 713,
258 'higher-spectral-slot-number': 720
261 response = test_utils_rfc8040.device_renderer_service_path_request(data)
262 self.assertEqual(response.status_code, requests.codes.ok)
263 self.assertIn(response.json(),
264 ({'output': {'result': 'Request processed', 'success': True}},
265 {'transportpce-device-renderer:output': {'result': 'Request processed', 'success': True}}))
def test_15_service_path_delete_rdm_check(self):
    """Expect ``requests.codes.conflict`` for interface DEG1-TTP-TXRX-713:720 on ROADMA01."""
    lookup = test_utils_rfc8040.check_node_attribute_request(
        "ROADMA01", "interface", "DEG1-TTP-TXRX-713:720")
    self.assertEqual(lookup['status_code'], requests.codes.conflict)
def test_16_service_path_delete_rdm_check(self):
    """Expect ``requests.codes.conflict`` for interface SRG1-PP7-TXRX-713:720 on ROADMA01."""
    lookup = test_utils_rfc8040.check_node_attribute_request(
        "ROADMA01", "interface", "SRG1-PP7-TXRX-713:720")
    self.assertEqual(lookup['status_code'], requests.codes.conflict)
def test_17_service_path_delete_rdm_check(self):
    """Expect ``requests.codes.conflict`` for roadm-connection SRG1-PP7-TXRX-DEG1-TTP-TXRX-713:720 on ROADMA01."""
    connection_name = "SRG1-PP7-TXRX-DEG1-TTP-TXRX-713:720"
    lookup = test_utils_rfc8040.check_node_attribute_request(
        "ROADMA01", "roadm-connections", connection_name)
    self.assertEqual(lookup['status_code'], requests.codes.conflict)
def test_18_service_path_delete_xpdr_check(self):
    """Expect ``requests.codes.conflict`` for interface XPDR1-NETWORK1-713:720 on XPDRA01."""
    lookup = test_utils_rfc8040.check_node_attribute_request(
        "XPDRA01", "interface", "XPDR1-NETWORK1-713:720")
    self.assertEqual(lookup['status_code'], requests.codes.conflict)
def test_19_service_path_delete_xpdr_check(self):
    """Expect ``requests.codes.conflict`` for interface XPDR1-NETWORK1-OTU on XPDRA01."""
    lookup = test_utils_rfc8040.check_node_attribute_request(
        "XPDRA01", "interface", "XPDR1-NETWORK1-OTU")
    self.assertEqual(lookup['status_code'], requests.codes.conflict)
def test_20_service_path_delete_xpdr_check(self):
    """Expect ``requests.codes.conflict`` for interface XPDR1-NETWORK1-ODU on XPDRA01."""
    lookup = test_utils_rfc8040.check_node_attribute_request(
        "XPDRA01", "interface", "XPDR1-NETWORK1-ODU")
    self.assertEqual(lookup['status_code'], requests.codes.conflict)
def test_21_service_path_delete_xpdr_check(self):
    """Expect ``requests.codes.conflict`` for interface XPDR1-CLIENT1-ETHERNET on XPDRA01."""
    lookup = test_utils_rfc8040.check_node_attribute_request(
        "XPDRA01", "interface", "XPDR1-CLIENT1-ETHERNET")
    self.assertEqual(lookup['status_code'], requests.codes.conflict)
def test_22_service_path_delete_xpdr_check(self):
    """Check that circuit pack 1/0/1-PLUG-NET on XPDRA01 reports 'not-reserved-available'.

    The pack name is passed URL-encoded ('%2F' for '/').
    """
    pack_reply = test_utils_rfc8040.check_node_attribute_request(
        "XPDRA01", "circuit-packs", "1%2F0%2F1-PLUG-NET")
    self.assertEqual(pack_reply['status_code'], requests.codes.ok)
    equipment_state = pack_reply["circuit-packs"][0]['equipment-state']
    self.assertEqual('not-reserved-available', equipment_state)
def test_23_rdm_device_disconnected(self):
    """Unmount ROADMA01; either ``ok`` or ``no_content`` is an accepted status."""
    unmount_reply = test_utils_rfc8040.unmount_device("ROADMA01")
    self.assertIn(
        unmount_reply.status_code,
        (requests.codes.ok, requests.codes.no_content))
def test_24_xpdr_device_disconnected(self):
    """Unmount XPDRA01; either ``ok`` or ``no_content`` is an accepted status."""
    unmount_reply = test_utils_rfc8040.unmount_device("XPDRA01")
    self.assertIn(
        unmount_reply.status_code,
        (requests.codes.ok, requests.codes.no_content))
if __name__ == "__main__":
    # Script entry point: run the tests verbosely and keep going after a failure.
    unittest.main(failfast=False, verbosity=2)