2 ##############################################################################
3 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=invalid-name
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
17 # pylint: disable=wrong-import-order
21 sys.path.append('transportpce_tests/common/')
22 # pylint: disable=wrong-import-position
23 # pylint: disable=import-error
24 import test_utils # nopep8
# Test case whose methods push port-mapping and topology sample data to the
# controller through test_utils and then check path-computation replies.
27 class TransportPCE400Gtesting(unittest.TestCase):
# Class-level placeholder; nothing in the visible code assigns or reads it.
29 simple_topo_bi_dir_data = None
# Raw text of the port-mapping sample; filled in by the class-level setup code
# and sent by test_01_load_port_mapping.
30 port_mapping_data = None
# NOTE(review): the `def` header and the opening `try:` of this setup code are
# not visible in this listing. It assigns through `cls`, so it presumably
# belongs to the class-level setup hook — TODO confirm against the full file.
# Flag that is switched to True only after all five sample files were read.
36 sample_files_parsed = False
# Every sample file is resolved relative to this test file, two directories up
# and then under "sample_configs"; its raw text is stored on the class.
37 TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
38 "..", "..", "sample_configs",
39 "honeynode-topo400G.json")
40 with open(TOPO_BI_DIR_FILE, 'r', encoding='utf-8') as topo_bi_dir:
41 cls.topo_bi_dir_data = topo_bi_dir.read()
# Sample sent by test_04_load_otn_topology_bi.
43 OTN_TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
44 "..", "..", "sample_configs",
45 "honeynode-otntopo400G.json")
46 with open(OTN_TOPO_BI_DIR_FILE, 'r', encoding='utf-8') as otn_topo_bi_dir:
47 cls.otn_topo_bi_dir_data = otn_topo_bi_dir.read()
# Sample sent by test_06_load_otuc4_otn_topology_bi.
49 OTUC4_OTN_TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
50 "..", "..", "sample_configs",
51 "honeynode-otntopo400GwithOTUC4.json")
52 with open(OTUC4_OTN_TOPO_BI_DIR_FILE, 'r', encoding='utf-8') as otuc4_otn_topo_bi_dir:
53 cls.otuc4_otn_topo_bi_dir_data = otuc4_otn_topo_bi_dir.read()
# Sample sent by test_08_load_oduc4_otn_topology_bi.
55 ODUC4_OTN_TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
56 "..", "..", "sample_configs",
57 "honeynode-otntopo400GwithODUC4.json")
58 with open(ODUC4_OTN_TOPO_BI_DIR_FILE, 'r', encoding='utf-8') as oduc4_otn_topo_bi_dir:
59 cls.oduc4_otn_topo_bi_dir_data = oduc4_otn_topo_bi_dir.read()
# Sample sent by test_01_load_port_mapping.
61 PORT_MAPPING_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
62 "..", "..", "sample_configs",
63 "pce_portmapping_71.json")
64 with open(PORT_MAPPING_FILE, 'r', encoding='utf-8') as port_mapping:
65 cls.port_mapping_data = port_mapping.read()
66 sample_files_parsed = True
# Read failures are reported on stdout; the visible lines show no other action
# after each message.
67 except PermissionError as err:
68 print("Permission Error when trying to read sample files\n", err)
70 except FileNotFoundError as err:
71 print("File Not found Error when trying to read sample files\n", err)
# NOTE(review): the handler header for this catch-all message is not visible
# in this listing — TODO confirm which exceptions it covers.
74 print("Unexpected error when trying to read sample files\n", sys.exc_info()[0])
77 if sample_files_parsed:
78 print("sample files content loaded")
# Keep whatever start_tpce() returns so tearDownClass can iterate over it and
# shut each entry down.
79 cls.processes = test_utils.start_tpce()
@classmethod
def tearDownClass(cls):
    """Shut down every process recorded in ``cls.processes``.

    unittest invokes this hook on the class itself without arguments, so it
    must be a classmethod for ``cls`` to be bound; without the decorator the
    call fails with a missing positional argument.
    """
    # pylint: disable=not-an-iterable
    for process in cls.processes:
        test_utils.shutdown_process(process)
    print("all processes killed")
def setUp(self):  # instruction executed before each test method
    """Print the bare name of the test method that is about to run."""
    # pylint: disable=consider-using-f-string
    # self.id() is dotted; only the part after the last dot is shown.
    method_name = self.id().rsplit(".", 1)[-1]
    print("execution of {}".format(method_name))
def test_01_load_port_mapping(self):
    """Send the port-mapping sample and accept either an ok or a created status."""
    accepted_codes = (requests.codes.ok, requests.codes.created)
    reply = test_utils.put_jsonrequest(test_utils.URL_FULL_PORTMAPPING,
                                       self.port_mapping_data)
    self.assertIn(reply.status_code, accepted_codes)
99 # Load openroadm topology
def test_02_load_openroadm_topology_bi(self):
    """Send the openroadm topology sample and expect an ok status."""
    target_url = test_utils.URL_CONFIG_ORDM_TOPO
    reply = test_utils.put_jsonrequest(target_url, self.topo_bi_dir_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
105 # Path Computation success
def test_03_path_computation_400G_xpdr_bi(self):
    """Request a 400G Ethernet path between XPDR-A2 and XPDR-C2.

    The reply must report a calculated path, and both the aToZ and the zToA
    direction must carry the expected wavelength number, rate, frequency
    bounds and modulation format.
    """
    a_end = {"node-id": "XPDR-A2", "service-rate": "400",
             "service-format": "Ethernet", "clli": "nodeA"}
    z_end = {"node-id": "XPDR-C2", "service-rate": "400",
             "service-format": "Ethernet", "clli": "nodeC"}
    reply = test_utils.path_computation_request("request-1", "service-1", a_end, z_end)
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    self.assertIn('Path is calculated',
                  payload['output']['configuration-response-common']['response-message'])

    # Same checks, in the same order, for each direction of the path.
    for prefix in ('aToZ', 'zToA'):
        checks = (
            (1, prefix + '-wavelength-number'),
            (400, 'rate'),
            (196.0375, prefix + '-min-frequency'),
            (196.12500, prefix + '-max-frequency'),
            ('dp-qam16', 'modulation-format'),
        )
        for expected, key in checks:
            self.assertEqual(expected,
                             payload['output']['response-parameters']['path-description']
                             [prefix + '-direction'][key])
def test_04_load_otn_topology_bi(self):
    """Send the OTN topology sample and expect an ok status."""
    target_url = test_utils.URL_CONFIG_OTN_TOPO
    reply = test_utils.put_jsonrequest(target_url, self.otn_topo_bi_dir_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
146 # Path Computation success
def test_05_path_computation_OTUC4_xpdr_bi(self):
    """Request a 400G OTU path between XPDR-A2 and XPDR-C2.

    Each endpoint names its device through ``rx-direction``. The reply must
    report a calculated path, and both the aToZ and the zToA direction must
    carry the expected wavelength number, rate, frequency bounds and
    modulation format.
    """
    # Each endpoint dict is closed explicitly so the call is well-formed;
    # only the keys visible in the listing are sent.
    a_end = {"service-rate": "400",
             "service-format": "OTU",
             "node-id": "XPDR-A2",
             "rx-direction": {"port": {"port-device-name": "XPDR-A2-XPDR2"}}
             }
    z_end = {"service-rate": "400",
             "service-format": "OTU",
             "node-id": "XPDR-C2",
             "rx-direction": {"port": {"port-device-name": "XPDR-C2-XPDR2"}}
             }
    response = test_utils.path_computation_request("request-1", "service-OTUC4", a_end, z_end)
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Path is calculated',
                  res['output']['configuration-response-common']['response-message'])

    # Same checks, in the same order, for each direction of the path.
    for prefix in ('aToZ', 'zToA'):
        checks = (
            (1, prefix + '-wavelength-number'),
            (400, 'rate'),
            (196.0375, prefix + '-min-frequency'),
            (196.12500, prefix + '-max-frequency'),
            ('dp-qam16', 'modulation-format'),
        )
        for expected, key in checks:
            self.assertEqual(expected,
                             res['output']['response-parameters']['path-description']
                             [prefix + '-direction'][key])
189 # Load otn topology with OTUC4 links
def test_06_load_otuc4_otn_topology_bi(self):
    """Send the OTN topology sample with OTUC4 links and expect an ok status."""
    target_url = test_utils.URL_CONFIG_OTN_TOPO
    reply = test_utils.put_jsonrequest(target_url, self.otuc4_otn_topo_bi_dir_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
195 # Path Computation success
def test_07_path_computation_ODUC4_xpdr_bi(self):
    """Request a 400G ODU path between XPDR-A2 and XPDR-C2.

    Each endpoint names its device through ``tx-direction``. The reply must
    report a calculated path, and both directions must show rate 400 and
    modulation format 'dp-qam16'.
    """
    # Each endpoint dict is closed explicitly so the call is well-formed;
    # only the keys visible in the listing are sent.
    a_end = {"service-rate": "400",
             "service-format": "ODU",
             "node-id": "XPDR-A2",
             "tx-direction": {"port": {"port-device-name": "XPDR-A2-XPDR2"}}
             }
    z_end = {"service-rate": "400",
             "service-format": "ODU",
             "node-id": "XPDR-C2",
             "tx-direction": {"port": {"port-device-name": "XPDR-C2-XPDR2"}}
             }
    response = test_utils.path_computation_request("request-1", "service-ODUC4", a_end, z_end)
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Path is calculated',
                  res['output']['configuration-response-common']['response-message'])

    # Same checks, in the same order, for each direction of the path.
    for direction in ('aToZ-direction', 'zToA-direction'):
        for expected, key in ((400, 'rate'), ('dp-qam16', 'modulation-format')):
            self.assertEqual(expected,
                             res['output']['response-parameters']['path-description']
                             [direction][key])
226 # Load otn topology with ODUC4 links
def test_08_load_oduc4_otn_topology_bi(self):
    """Send the OTN topology sample with ODUC4 links and expect an ok status."""
    target_url = test_utils.URL_CONFIG_OTN_TOPO
    reply = test_utils.put_jsonrequest(target_url, self.oduc4_otn_topo_bi_dir_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
232 # Path Computation success
def test_09_path_computation_100G_xpdr_bi(self):
    """Request a 100G Ethernet path between the XPDR2-CLIENT1 ports of XPDR-A2 and XPDR-C2.

    The reply must report a calculated path, and both directions must show
    rate 100, trib slots '1.1' to '1.20' and modulation format 'dp-qpsk'.
    """
    a_end = {"service-rate": "100",
             "service-format": "Ethernet",
             "node-id": "XPDR-A2",
             "tx-direction": {"port": {"port-device-name": "XPDR-A2-XPDR2",
                                       "port-name": "XPDR2-CLIENT1"}}}
    z_end = {"service-rate": "100",
             "service-format": "Ethernet",
             "node-id": "XPDR-C2",
             "tx-direction": {"port": {"port-device-name": "XPDR-C2-XPDR2",
                                       "port-name": "XPDR2-CLIENT1"}}}
    reply = test_utils.path_computation_request("request-1", "service-100GE", a_end, z_end)

    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    self.assertIn('Path is calculated',
                  payload['output']['configuration-response-common']['response-message'])

    # Same checks, in the same order, for each direction of the path.
    checks = (
        (100, 'rate'),
        ('1.1', 'min-trib-slot'),
        ('1.20', 'max-trib-slot'),
        ('dp-qpsk', 'modulation-format'),
    )
    for direction in ('aToZ-direction', 'zToA-direction'):
        for expected, key in checks:
            self.assertEqual(expected,
                             payload['output']['response-parameters']['path-description']
                             [direction][key])
# Script entry point: run this module's tests with verbose output when the
# file is executed directly rather than imported.
if __name__ == "__main__":
    unittest.main(verbosity=2)