##############################################################################
# Copyright (c) 2021 Orange, Inc. and others.  All rights reserved.
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
# pylint: disable=no-member
# pylint: disable=too-many-public-methods
# Add the common test directory to the module search path (presumably where
# the test_utils helper module lives -- confirm).  The entry is a relative
# path, so it is resolved against the directory the tests are launched from.
sys.path.append('transportpce_tests/common/')
class TransportPCE400Gtesting(unittest.TestCase):
    """Functional tests of TransportPCE path computation on a 400G topology.

    Test methods are numbered so that unittest's alphabetical ordering runs
    the topology-loading steps and the path computations in the order in
    which they are written below.
    """

    # Kept for backward compatibility; nothing in this class assigns it.
    simple_topo_bi_dir_data = None
    # Raw JSON payloads, filled in by setUpClass from the sample_configs files.
    port_mapping_data = None
    topo_bi_dir_data = None
    otn_topo_bi_dir_data = None
    otuc4_otn_topo_bi_dir_data = None
    oduc4_otn_topo_bi_dir_data = None
    # Value returned by test_utils.start_tpce(); stays None if setup fails.
    processes = None

    # (class attribute, sample file name) pairs loaded by setUpClass.
    _SAMPLE_FILES = (
        ("topo_bi_dir_data", "honeynode-topo400G.json"),
        ("otn_topo_bi_dir_data", "honeynode-otntopo400G.json"),
        ("otuc4_otn_topo_bi_dir_data", "honeynode-otntopo400GwithOTUC4.json"),
        ("oduc4_otn_topo_bi_dir_data", "honeynode-otntopo400GwithODUC4.json"),
        ("port_mapping_data", "pce_portmapping_71.json"),
    )

    # Leaves expected in both directions of a 400G wavelength path.
    # A "{}" in the leaf name is replaced by the direction prefix.
    _EXPECTED_400G_WAVELENGTH = (
        ('{}-wavelength-number', 1),
        ('rate', 400),
        ('{}-min-frequency', 196.0375),
        ('{}-max-frequency', 196.125),
        ('modulation-format', 'dp-qam16'),
    )

    @classmethod
    def setUpClass(cls):
        """Load the sample payloads, then start TransportPCE.

        Raises:
            OSError: a sample file is missing or cannot be read.
        """
        sample_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                                  "..", "..", "sample_configs")
        try:
            for attribute, file_name in cls._SAMPLE_FILES:
                with open(os.path.join(sample_dir, file_name), 'r',
                          encoding='utf-8') as sample_file:
                    setattr(cls, attribute, sample_file.read())
        # Every handler re-raises: starting the controller and running the
        # tests with missing payloads would only produce misleading failures.
        except PermissionError as err:
            print("Permission Error when trying to read sample files\n", err)
            raise
        except FileNotFoundError as err:
            print("File Not found Error when trying to read sample files\n", err)
            raise
        except Exception:
            print("Unexpected error when trying to read sample files\n", sys.exc_info()[0])
            raise
        else:
            print("sample files content loaded")

        cls.processes = test_utils.start_tpce()

    @classmethod
    def tearDownClass(cls):
        """Shut down every process recorded by setUpClass."""
        # pylint: disable=not-an-iterable
        # "or ()" keeps teardown safe when start_tpce() never ran or returned None.
        for process in cls.processes or ():
            test_utils.shutdown_process(process)
        print("all processes killed")

    def setUp(self):  # instruction executed before each test method
        print("execution of {}".format(self.id().split(".")[-1]))

    def _assert_path_computed(self, response, expected_leaves):
        """Check a path-computation reply and its path description.

        Args:
            response: reply returned by test_utils.path_computation_request.
            expected_leaves: iterable of (leaf, value) pairs that must hold in
                both the aToZ and the zToA direction.  A "{}" in the leaf name
                is replaced by the direction prefix ("aToZ" or "zToA").
        """
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Path is calculated',
                      res['output']['configuration-response-common']['response-message'])
        path_description = res['output']['response-parameters']['path-description']
        for prefix in ('aToZ', 'zToA'):
            direction = path_description[prefix + '-direction']
            for leaf_template, expected in expected_leaves:
                leaf = leaf_template.format(prefix)
                self.assertEqual(expected, direction[leaf],
                                 "unexpected {} in {}-direction".format(leaf, prefix))

    # Load port mapping
    def test_01_load_port_mapping(self):
        response = test_utils.put_jsonrequest(test_utils.URL_FULL_PORTMAPPING, self.port_mapping_data)
        self.assertIn(response.status_code, (requests.codes.ok, requests.codes.created))

    # Load openroadm topology
    def test_02_load_openroadm_topology_bi(self):
        response = test_utils.put_jsonrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.topo_bi_dir_data)
        self.assertEqual(response.status_code, requests.codes.ok)

    # Path Computation success
    def test_03_path_computation_400G_xpdr_bi(self):
        response = test_utils.path_computation_request(
            "request-1", "service-1",
            {"node-id": "XPDR-A2", "service-rate": "400",
             "service-format": "Ethernet", "clli": "nodeA"},
            {"node-id": "XPDR-C2", "service-rate": "400",
             "service-format": "Ethernet", "clli": "nodeC"})
        self._assert_path_computed(response, self._EXPECTED_400G_WAVELENGTH)

    # Load otn topology
    def test_04_load_otn_topology_bi(self):
        response = test_utils.put_jsonrequest(test_utils.URL_CONFIG_OTN_TOPO, self.otn_topo_bi_dir_data)
        self.assertEqual(response.status_code, requests.codes.ok)

    # Path Computation success
    def test_05_path_computation_OTUC4_xpdr_bi(self):
        response = test_utils.path_computation_request(
            "request-1", "service-OTUC4",
            {"service-rate": "400", "clli": "NodeA",
             "service-format": "OTU", "node-id": "XPDR-A2",
             "rx-direction": {"port": {"port-device-name": "XPDR-A2-XPDR2"}}},
            {"service-rate": "400", "clli": "NodeC",
             "service-format": "OTU", "node-id": "XPDR-C2",
             "rx-direction": {"port": {"port-device-name": "XPDR-C2-XPDR2"}}})
        self._assert_path_computed(response, self._EXPECTED_400G_WAVELENGTH)

    # Load otn topology with OTUC4 links
    def test_06_load_otuc4_otn_topology_bi(self):
        response = test_utils.put_jsonrequest(test_utils.URL_CONFIG_OTN_TOPO, self.otuc4_otn_topo_bi_dir_data)
        self.assertEqual(response.status_code, requests.codes.ok)

    # Path Computation success
    def test_07_path_computation_ODUC4_xpdr_bi(self):
        response = test_utils.path_computation_request(
            "request-1", "service-ODUC4",
            {"service-rate": "400", "clli": "NodeA", "service-format": "ODU",
             "node-id": "XPDR-A2",
             "tx-direction": {"port": {"port-device-name": "XPDR-A2-XPDR2"}}},
            {"service-rate": "400", "clli": "NodeC", "service-format": "ODU",
             "node-id": "XPDR-C2",
             "tx-direction": {"port": {"port-device-name": "XPDR-C2-XPDR2"}}})
        self._assert_path_computed(response, (
            ('rate', 400),
            ('modulation-format', 'dp-qam16'),
        ))

    # Load otn topology with ODUC4 links
    def test_08_load_oduc4_otn_topology_bi(self):
        response = test_utils.put_jsonrequest(test_utils.URL_CONFIG_OTN_TOPO, self.oduc4_otn_topo_bi_dir_data)
        self.assertEqual(response.status_code, requests.codes.ok)

    # Path Computation success
    def test_09_path_computation_100G_xpdr_bi(self):
        response = test_utils.path_computation_request(
            "request-1", "service-100GE",
            {"service-rate": "100", "clli": "NodeA", "service-format": "Ethernet",
             "node-id": "XPDR-A2",
             "tx-direction": {"port": {"port-device-name": "XPDR-A2-XPDR2",
                                       "port-name": "XPDR2-CLIENT1"}}},
            {"service-rate": "100", "clli": "NodeC", "service-format": "Ethernet",
             "node-id": "XPDR-C2",
             "tx-direction": {"port": {"port-device-name": "XPDR-C2-XPDR2",
                                       "port-name": "XPDR2-CLIENT1"}}})
        self._assert_path_computed(response, (
            ('rate', 100),
            ('min-trib-slot', '1.1'),
            ('max-trib-slot', '1.20'),
            ('modulation-format', 'dp-qpsk'),
        ))
# Allow the suite to be launched directly as a script, reporting each test
# individually (verbosity level 2).
if __name__ == "__main__":
    unittest.main(verbosity=2)