2 ##############################################################################
3 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=invalid-name
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
17 # pylint: disable=wrong-import-order
21 sys.path.append('transportpce_tests/common/')
22 # pylint: disable=wrong-import-position
23 # pylint: disable=import-error
24 import test_utils # nopep8
class TransportPCE400Gtesting(unittest.TestCase):
    """Functional tests for 400G path computation requests.

    ``setUpClass`` reads the sample port-mapping and topology JSON files and
    starts the processes returned by ``test_utils.start_tpce()``.  The
    numbered tests then alternate between loading data with
    ``test_utils.put_jsonrequest`` and issuing a request with
    ``test_utils.path_computation_request``.  The numeric prefixes matter:
    unittest runs test methods in alphabetical order and each computation
    relies on what the preceding tests loaded.

    Test methods deliberately carry ``#`` comments rather than docstrings so
    that the verbose runner keeps printing the method names.
    """

    # NOTE(review): the excerpt this class was reviewed from carries embedded
    # line numbers that skip values, so some original lines may not have been
    # visible (between methods, in the setUpClass error handlers and inside
    # the endpoint dicts flagged below) -- confirm against the full file.

    # Not read anywhere in this class; kept so existing references still work.
    simple_topo_bi_dir_data = None
    # Raw text of the sample files, filled in by setUpClass().
    port_mapping_data = None
    topo_bi_dir_data = None
    otn_topo_bi_dir_data = None
    otuc4_otn_topo_bi_dir_data = None
    oduc4_otn_topo_bi_dir_data = None
    # Whatever test_utils.start_tpce() returned; None until setUpClass() ran.
    processes = None

    # Leaf values expected in each direction of the path description.  A
    # "{}" placeholder is replaced by the direction prefix (aToZ / zToA).
    _EXPECTED_400G_LEAVES = {
        '{}-wavelength-number': 1,
        'rate': 400,
        '{}-min-frequency': 196.0375,
        '{}-max-frequency': 196.125,
        'modulation-format': 'dp-qam16',
    }

    @staticmethod
    def _read_sample_config(filename):
        """Return the text of *filename* from the ``sample_configs`` directory.

        The directory is resolved two levels above the directory holding
        this module, exactly as the individual paths were built before.

        Raises:
            OSError: if the file cannot be opened or read.
        """
        path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                            "..", "..", "sample_configs", filename)
        with open(path, 'r', encoding='utf-8') as sample_file:
            return sample_file.read()

    @classmethod
    def setUpClass(cls):
        """Load every sample file, then start the processes under test.

        Raises:
            Exception: whatever reading a sample file raised.  The error is
                printed and re-raised so that the processes are never
                started (and no test runs) with missing input data.
        """
        try:
            cls.topo_bi_dir_data = cls._read_sample_config(
                "honeynode-topo400G.json")
            cls.otn_topo_bi_dir_data = cls._read_sample_config(
                "honeynode-otntopo400G.json")
            cls.otuc4_otn_topo_bi_dir_data = cls._read_sample_config(
                "honeynode-otntopo400GwithOTUC4.json")
            cls.oduc4_otn_topo_bi_dir_data = cls._read_sample_config(
                "honeynode-otntopo400GwithODUC4.json")
            cls.port_mapping_data = cls._read_sample_config(
                "pce_portmapping_71.json")
        except PermissionError as err:
            print("Permission Error when trying to read sample files\n", err)
            raise
        except FileNotFoundError as err:
            print("File Not found Error when trying to read sample files\n", err)
            raise
        except Exception as err:
            # Same output as printing sys.exc_info()[0]: the exception class.
            print("Unexpected error when trying to read sample files\n", type(err))
            raise
        print("sample files content loaded")
        cls.processes = test_utils.start_tpce()

    @classmethod
    def tearDownClass(cls):
        """Shut down every process recorded by setUpClass()."""
        # `or ()` keeps the loop safe while `processes` is still None.
        for process in cls.processes or ():
            test_utils.shutdown_process(process)
        print("all processes killed")

    def setUp(self):  # instruction executed before each test method
        """Print the name of the test method that is about to run."""
        # pylint: disable=consider-using-f-string
        print("execution of {}".format(self.id().split(".")[-1]))

    def _assert_path_calculated(self, response):
        """Check the common part of a path-computation response.

        Asserts an OK status code and a 'Path is calculated' response
        message, then returns the ``path-description`` container of the
        decoded JSON body.
        """
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Path is calculated',
                      res['output']['configuration-response-common']['response-message'])
        return res['output']['response-parameters']['path-description']

    def _assert_both_directions(self, path_description, expected):
        """Assert *expected* leaf values in the aToZ and zToA directions.

        *expected* maps a leaf name to its expected value; a ``{}`` in the
        leaf name is replaced by the direction prefix.  Leaves are checked
        in mapping order, aToZ first, as the inline assertions used to be.
        """
        for prefix in ('aToZ', 'zToA'):
            direction = path_description[prefix + '-direction']
            for leaf_template, value in expected.items():
                leaf = leaf_template.format(prefix)
                self.assertEqual(value, direction[leaf],
                                 prefix + '-direction/' + leaf)

    # Load port mapping
    def test_01_load_port_mapping(self):
        response = test_utils.put_jsonrequest(test_utils.URL_FULL_PORTMAPPING,
                                              self.port_mapping_data)
        self.assertIn(response.status_code, (requests.codes.ok, requests.codes.created))

    # Load openroadm topology
    def test_02_load_openroadm_topology_bi(self):
        response = test_utils.put_jsonrequest(test_utils.URL_CONFIG_ORDM_TOPO,
                                              self.topo_bi_dir_data)
        self.assertEqual(response.status_code, requests.codes.ok)

    # Path computation success: 400G Ethernet between XPDR-A2 and XPDR-C2
    def test_03_path_computation_400G_xpdr_bi(self):
        response = test_utils.path_computation_request(
            "request-1", "service-1",
            {"node-id": "XPDR-A2", "service-rate": "400",
             "service-format": "Ethernet", "clli": "nodeA"},
            {"node-id": "XPDR-C2", "service-rate": "400",
             "service-format": "Ethernet", "clli": "nodeC"})
        path_description = self._assert_path_calculated(response)
        self._assert_both_directions(path_description, self._EXPECTED_400G_LEAVES)

    # Load otn topology
    def test_04_load_otn_topology_bi(self):
        response = test_utils.put_jsonrequest(test_utils.URL_CONFIG_OTN_TOPO,
                                              self.otn_topo_bi_dir_data)
        self.assertEqual(response.status_code, requests.codes.ok)

    # Path computation success: 400G OTU between XPDR-A2 and XPDR-C2
    def test_05_path_computation_OTUC4_xpdr_bi(self):
        # NOTE(review): the embedded numbering skips one line after
        # "service-rate" in each endpoint (possibly a "clli" entry like the
        # one in test_03) -- confirm nothing was dropped.
        response = test_utils.path_computation_request(
            "request-1", "service-OTUC4",
            {"service-rate": "400",
             "service-format": "OTU",
             "node-id": "XPDR-A2",
             "rx-direction": {"port": {"port-device-name": "XPDR-A2-XPDR2"}}},
            {"service-rate": "400",
             "service-format": "OTU",
             "node-id": "XPDR-C2",
             "rx-direction": {"port": {"port-device-name": "XPDR-C2-XPDR2"}}})
        path_description = self._assert_path_calculated(response)
        self._assert_both_directions(path_description, self._EXPECTED_400G_LEAVES)

    # Load otn topology with OTUC4 links
    def test_06_load_otuc4_otn_topology_bi(self):
        response = test_utils.put_jsonrequest(test_utils.URL_CONFIG_OTN_TOPO,
                                              self.otuc4_otn_topo_bi_dir_data)
        self.assertEqual(response.status_code, requests.codes.ok)

    # Path computation success: 400G ODU between XPDR-A2 and XPDR-C2
    def test_07_path_computation_ODUC4_xpdr_bi(self):
        # NOTE(review): the embedded numbering skips one line after
        # "service-rate" in each endpoint (possibly a "clli" entry like the
        # one in test_03) -- confirm nothing was dropped.
        response = test_utils.path_computation_request(
            "request-1", "service-ODUC4",
            {"service-rate": "400",
             "service-format": "ODU",
             "node-id": "XPDR-A2",
             "tx-direction": {"port": {"port-device-name": "XPDR-A2-XPDR2"}}},
            {"service-rate": "400",
             "service-format": "ODU",
             "node-id": "XPDR-C2",
             "tx-direction": {"port": {"port-device-name": "XPDR-C2-XPDR2"}}})
        path_description = self._assert_path_calculated(response)
        self._assert_both_directions(path_description, {
            'rate': 400,
            'modulation-format': 'dp-qam16',
        })

    # Load otn topology with ODUC4 links
    def test_08_load_oduc4_otn_topology_bi(self):
        response = test_utils.put_jsonrequest(test_utils.URL_CONFIG_OTN_TOPO,
                                              self.oduc4_otn_topo_bi_dir_data)
        self.assertEqual(response.status_code, requests.codes.ok)

    # Path computation success: 100G Ethernet on client port XPDR2-CLIENT1
    def test_09_path_computation_100G_xpdr_bi(self):
        # NOTE(review): the embedded numbering skips one line after
        # "service-rate" in each endpoint (possibly a "clli" entry like the
        # one in test_03) -- confirm nothing was dropped.
        response = test_utils.path_computation_request(
            "request-1", "service-100GE",
            {"service-rate": "100",
             "service-format": "Ethernet",
             "node-id": "XPDR-A2",
             "tx-direction": {"port": {"port-device-name": "XPDR-A2-XPDR2",
                                       "port-name": "XPDR2-CLIENT1"}}},
            {"service-rate": "100",
             "service-format": "Ethernet",
             "node-id": "XPDR-C2",
             "tx-direction": {"port": {"port-device-name": "XPDR-C2-XPDR2",
                                       "port-name": "XPDR2-CLIENT1"}}})
        path_description = self._assert_path_calculated(response)
        self._assert_both_directions(path_description, {
            'rate': 100,
            'min-trib-slot': '1.1',
            'max-trib-slot': '1.20',
            'modulation-format': 'dp-qpsk',
        })
# Entry point when the file is executed directly: hand control to the
# unittest runner with per-test verbose output.
if __name__ == '__main__':
    unittest.main(verbosity=2)