2 ##############################################################################
3 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
16 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils # nopep8
# Functional test case for 400G path computation: the visible test methods PUT
# port-mapping and topology samples, then issue path-computation requests and
# check the answers.
26 class TransportPCE400Gtesting(unittest.TestCase):
# Class-level placeholders, None until the class-level setup code fills them in.
# NOTE(review): nothing in the visible code assigns or reads
# `simple_topo_bi_dir_data`; the setup code stores the topology sample as
# `topo_bi_dir_data` instead — confirm whether this name is a leftover.
28 simple_topo_bi_dir_data = None
# Raw text of the port-mapping sample; sent by test_01_load_port_mapping.
29 port_mapping_data = None
# Fragment of the class-level fixture. The enclosing `def` line and the `try:`
# that the `except` clauses below belong to are not visible in this excerpt.
#
# Stays False unless every sample file below has been read.
35 sample_files_parsed = False
# Each stanza builds <directory of this file>/../../sample_configs/<name>,
# opens it in text mode (no explicit encoding, so the platform default applies)
# and stores the raw text on the class.
# Sample sent by test_02_load_openroadm_topology_bi.
36 TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
37 "..", "..", "sample_configs",
38 "honeynode-topo400G.json")
39 with open(TOPO_BI_DIR_FILE, 'r') as topo_bi_dir:
40 cls.topo_bi_dir_data = topo_bi_dir.read()
# Sample sent by test_04_load_otn_topology_bi.
42 OTN_TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
43 "..", "..", "sample_configs",
44 "honeynode-otntopo400G.json")
45 with open(OTN_TOPO_BI_DIR_FILE, 'r') as otn_topo_bi_dir:
46 cls.otn_topo_bi_dir_data = otn_topo_bi_dir.read()
# Sample sent by test_06_load_otuc4_otn_topology_bi.
48 OTUC4_OTN_TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
49 "..", "..", "sample_configs",
50 "honeynode-otntopo400GwithOTUC4.json")
51 with open(OTUC4_OTN_TOPO_BI_DIR_FILE, 'r') as otuc4_otn_topo_bi_dir:
52 cls.otuc4_otn_topo_bi_dir_data = otuc4_otn_topo_bi_dir.read()
# Sample sent by test_08_load_oduc4_otn_topology_bi.
54 ODUC4_OTN_TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
55 "..", "..", "sample_configs",
56 "honeynode-otntopo400GwithODUC4.json")
57 with open(ODUC4_OTN_TOPO_BI_DIR_FILE, 'r') as oduc4_otn_topo_bi_dir:
58 cls.oduc4_otn_topo_bi_dir_data = oduc4_otn_topo_bi_dir.read()
# Sample sent by test_01_load_port_mapping.
60 PORT_MAPPING_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
61 "..", "..", "sample_configs",
62 "pce_portmapping_71.json")
63 with open(PORT_MAPPING_FILE, 'r') as port_mapping:
64 cls.port_mapping_data = port_mapping.read()
# Reached only when all five reads above succeeded.
65 sample_files_parsed = True
# The handlers report the problem on stdout.
# NOTE(review): whatever follows each print (for example an exit call) is not
# visible in this excerpt — confirm how a failed read stops the run.
66 except PermissionError as err:
67 print("Permission Error when trying to read sample files\n", err)
69 except FileNotFoundError as err:
70 print("File Not found Error when trying to read sample files\n", err)
# Prints the class of the exception being handled; the clause header that
# introduces this handler is not visible here.
73 print("Unexpected error when trying to read sample files\n", sys.exc_info()[0])
# Confirmation message, printed only after a fully successful load.
76 if sample_files_parsed:
77 print("sample files content loaded")
# Keeps what test_utils.start_tpce() returns; tearDownClass iterates over it.
# NOTE(review): indentation is lost in this excerpt, so it cannot be told
# whether this call sits inside the `if` above — confirm.
78 cls.processes = test_utils.start_tpce()
def tearDownClass(cls):
    """Shut down every entry of ``cls.processes`` and report completion."""
    # pylint: disable=not-an-iterable
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
def setUp(self):
    """Print the name of the test method about to run (executed before each test)."""
    # The unittest id is dotted (module.Class.method); keep only the last part.
    test_name = self.id().rpartition(".")[2]
    print("execution of {}".format(test_name))
def test_01_load_port_mapping(self):
    # PUT the port-mapping sample; either an "ok" or a "created" status passes.
    reply = test_utils.put_jsonrequest(test_utils.URL_FULL_PORTMAPPING,
                                       self.port_mapping_data)
    accepted_codes = (requests.codes.ok, requests.codes.created)
    self.assertIn(reply.status_code, accepted_codes)
97 # Load openroadm topology
def test_02_load_openroadm_topology_bi(self):
    # PUT the openroadm topology sample and require an "ok" status.
    reply = test_utils.put_jsonrequest(test_utils.URL_CONFIG_ORDM_TOPO,
                                       self.topo_bi_dir_data)
    status = reply.status_code
    self.assertEqual(status, requests.codes.ok)
103 # Path Computation success
def test_03_path_computation_400G_xpdr_bi(self):
    # Request a 400G Ethernet path between XPDR-A2 and XPDR-C2, then check the
    # status code, the response message and the values reported per direction.
    a_end = {"node-id": "XPDR-A2", "service-rate": "400",
             "service-format": "Ethernet", "clli": "nodeA"}
    z_end = {"node-id": "XPDR-C2", "service-rate": "400",
             "service-format": "Ethernet", "clli": "nodeC"}
    reply = test_utils.path_computation_request("request-1", "service-1", a_end, z_end)
    self.assertEqual(reply.status_code, requests.codes.ok)
    res = reply.json()
    self.assertIn('Path is calculated',
                  res['output']['configuration-response-common']['response-message'])

    path = res['output']['response-parameters']['path-description']
    # (direction container, leaf, expected value), checked in this order.
    expectations = (
        ('aToZ-direction', 'aToZ-wavelength-number', 1),
        ('aToZ-direction', 'rate', 400),
        ('aToZ-direction', 'aToZ-min-frequency', 196.0375),
        ('aToZ-direction', 'aToZ-max-frequency', 196.12500),
        ('aToZ-direction', 'modulation-format', 'dp-qam16'),
        ('zToA-direction', 'zToA-wavelength-number', 1),
        ('zToA-direction', 'rate', 400),
        ('zToA-direction', 'zToA-min-frequency', 196.0375),
        ('zToA-direction', 'zToA-max-frequency', 196.12500),
        ('zToA-direction', 'modulation-format', 'dp-qam16'),
    )
    for direction, leaf, wanted in expectations:
        self.assertEqual(wanted, path[direction][leaf])
def test_04_load_otn_topology_bi(self):
    # PUT the OTN topology sample and require an "ok" status.
    reply = test_utils.put_jsonrequest(test_utils.URL_CONFIG_OTN_TOPO,
                                       self.otn_topo_bi_dir_data)
    status = reply.status_code
    self.assertEqual(status, requests.codes.ok)
144 # Path Computation success
def test_05_path_computation_OTUC4_xpdr_bi(self):
    # Request a 400G OTU path between the XPDR2 devices of XPDR-A2 and XPDR-C2,
    # then check the status code, the response message and the values reported
    # for each direction.
    # NOTE(review): the closing braces and parenthesis of this request were
    # missing and have been restored. test_03 also passes a "clli" entry for
    # each endpoint — confirm whether these endpoints need one as well.
    a_end = {"service-rate": "400",
             "service-format": "OTU",
             "node-id": "XPDR-A2",
             "rx-direction": {"port": {"port-device-name": "XPDR-A2-XPDR2"}}}
    z_end = {"service-rate": "400",
             "service-format": "OTU",
             "node-id": "XPDR-C2",
             "rx-direction": {"port": {"port-device-name": "XPDR-C2-XPDR2"}}}
    response = test_utils.path_computation_request("request-1", "service-OTUC4",
                                                   a_end, z_end)
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Path is calculated',
                  res['output']['configuration-response-common']['response-message'])

    path = res['output']['response-parameters']['path-description']
    # (direction container, leaf, expected value), checked in this order.
    expectations = (
        ('aToZ-direction', 'aToZ-wavelength-number', 1),
        ('aToZ-direction', 'rate', 400),
        ('aToZ-direction', 'aToZ-min-frequency', 196.0375),
        ('aToZ-direction', 'aToZ-max-frequency', 196.12500),
        ('aToZ-direction', 'modulation-format', 'dp-qam16'),
        ('zToA-direction', 'zToA-wavelength-number', 1),
        ('zToA-direction', 'rate', 400),
        ('zToA-direction', 'zToA-min-frequency', 196.0375),
        ('zToA-direction', 'zToA-max-frequency', 196.12500),
        ('zToA-direction', 'modulation-format', 'dp-qam16'),
    )
    for direction, leaf, wanted in expectations:
        self.assertEqual(wanted, path[direction][leaf])
187 # Load otn topology with OTUC4 links
def test_06_load_otuc4_otn_topology_bi(self):
    # PUT the OTN topology sample that contains OTUC4 links; require "ok".
    reply = test_utils.put_jsonrequest(test_utils.URL_CONFIG_OTN_TOPO,
                                       self.otuc4_otn_topo_bi_dir_data)
    status = reply.status_code
    self.assertEqual(status, requests.codes.ok)
193 # Path Computation success
def test_07_path_computation_ODUC4_xpdr_bi(self):
    # Request a 400G ODU path between the XPDR2 devices of XPDR-A2 and XPDR-C2,
    # then check the status code, the response message, and the rate and
    # modulation format reported for each direction.
    # NOTE(review): the closing braces and parenthesis of this request were
    # missing and have been restored. test_03 also passes a "clli" entry for
    # each endpoint — confirm whether these endpoints need one as well.
    a_end = {"service-rate": "400",
             "service-format": "ODU",
             "node-id": "XPDR-A2",
             "tx-direction": {"port": {"port-device-name": "XPDR-A2-XPDR2"}}}
    z_end = {"service-rate": "400",
             "service-format": "ODU",
             "node-id": "XPDR-C2",
             "tx-direction": {"port": {"port-device-name": "XPDR-C2-XPDR2"}}}
    response = test_utils.path_computation_request("request-1", "service-ODUC4",
                                                   a_end, z_end)
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Path is calculated',
                  res['output']['configuration-response-common']['response-message'])

    path = res['output']['response-parameters']['path-description']
    # (direction container, leaf, expected value), checked in this order.
    expectations = (
        ('aToZ-direction', 'rate', 400),
        ('aToZ-direction', 'modulation-format', 'dp-qam16'),
        ('zToA-direction', 'rate', 400),
        ('zToA-direction', 'modulation-format', 'dp-qam16'),
    )
    for direction, leaf, wanted in expectations:
        self.assertEqual(wanted, path[direction][leaf])
224 # Load otn topology with ODUC4 links
def test_08_load_oduc4_otn_topology_bi(self):
    # PUT the OTN topology sample that contains ODUC4 links; require "ok".
    reply = test_utils.put_jsonrequest(test_utils.URL_CONFIG_OTN_TOPO,
                                       self.oduc4_otn_topo_bi_dir_data)
    status = reply.status_code
    self.assertEqual(status, requests.codes.ok)
230 # Path Computation success
def test_09_path_computation_100G_xpdr_bi(self):
    # Request a 100G Ethernet path between the XPDR2-CLIENT1 ports of XPDR-A2
    # and XPDR-C2, then check the status code, the response message, and the
    # rate, tributary slots and modulation format reported for each direction.
    a_end = {"service-rate": "100",
             "service-format": "Ethernet",
             "node-id": "XPDR-A2",
             "tx-direction": {"port": {"port-device-name": "XPDR-A2-XPDR2",
                                       "port-name": "XPDR2-CLIENT1"}}}
    z_end = {"service-rate": "100",
             "service-format": "Ethernet",
             "node-id": "XPDR-C2",
             "tx-direction": {"port": {"port-device-name": "XPDR-C2-XPDR2",
                                       "port-name": "XPDR2-CLIENT1"}}}
    reply = test_utils.path_computation_request("request-1", "service-100GE",
                                                a_end, z_end)

    self.assertEqual(reply.status_code, requests.codes.ok)
    res = reply.json()
    self.assertIn('Path is calculated',
                  res['output']['configuration-response-common']['response-message'])

    path = res['output']['response-parameters']['path-description']
    # (direction container, leaf, expected value), checked in this order.
    expectations = (
        ('aToZ-direction', 'rate', 100),
        ('aToZ-direction', 'min-trib-slot', '1.1'),
        ('aToZ-direction', 'max-trib-slot', '1.20'),
        ('aToZ-direction', 'modulation-format', 'dp-qpsk'),
        ('zToA-direction', 'rate', 100),
        ('zToA-direction', 'min-trib-slot', '1.1'),
        ('zToA-direction', 'max-trib-slot', '1.20'),
        ('zToA-direction', 'modulation-format', 'dp-qpsk'),
    )
    for direction, leaf, wanted in expectations:
        self.assertEqual(wanted, path[direction][leaf])
# Script entry point: run every test in this module with per-test output.
if __name__ == "__main__":
    unittest.main(
        verbosity=2,
    )