2 ##############################################################################
3 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
19 sys.path.append('transportpce_tests/common/')
class TransportPCE400Gtesting(unittest.TestCase):
    """Functional tests of path computation requests for 400G services.

    The class fixture reads the sample topologies and the port mapping from
    the ``sample_configs`` directory and starts the controller through
    ``test_utils.start_tpce()``. The numbered tests then push that data with
    ``test_utils.put_jsonrequest`` and check the answers returned by
    ``test_utils.path_computation_request``. Each path computation test
    relies on the data loaded by the tests numbered before it.
    """

    # Kept for backward compatibility; the tests read topo_bi_dir_data.
    simple_topo_bi_dir_data = None
    # Raw JSON text of each sample file, filled in by setUpClass.
    port_mapping_data = None
    topo_bi_dir_data = None
    otn_topo_bi_dir_data = None
    otuc4_otn_topo_bi_dir_data = None
    oduc4_otn_topo_bi_dir_data = None
    # Whatever test_utils.start_tpce() returned; iterated by tearDownClass.
    processes = None

    # Class attribute name -> sample file holding its content.
    _SAMPLE_FILES = (
        ("topo_bi_dir_data", "honeynode-topo400G.json"),
        ("otn_topo_bi_dir_data", "honeynode-otntopo400G.json"),
        ("otuc4_otn_topo_bi_dir_data", "honeynode-otntopo400GwithOTUC4.json"),
        ("oduc4_otn_topo_bi_dir_data", "honeynode-otntopo400GwithODUC4.json"),
        ("port_mapping_data", "pce_portmapping_71.json"),
    )

    # Values expected in both directions by test_03 and test_05. "{}" is
    # replaced with the direction prefix ("aToZ" or "zToA").
    _EXPECTED_400G_OPTICAL = {
        '{}-wavelength-number': 1,
        'rate': 400,
        '{}-min-frequency': 196.0375,
        '{}-max-frequency': 196.125,
        'modulation-format': 'dp-qam16',
    }

    @classmethod
    def setUpClass(cls):
        """Load every sample file, then start the controller.

        Raises:
            Exception: any error met while reading a sample file is reported
                on stdout and re-raised, so that unittest flags the whole
                class as errored instead of starting the controller and
                running every test against missing fixtures.
        """
        sample_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                                  "..", "..", "sample_configs")
        try:
            for attribute, file_name in cls._SAMPLE_FILES:
                # The samples are JSON documents: do not depend on the
                # platform default encoding to decode them.
                with open(os.path.join(sample_dir, file_name), 'r', encoding='utf-8') as sample_file:
                    setattr(cls, attribute, sample_file.read())
        except PermissionError as err:
            print("Permission Error when trying to read sample files\n", err)
            raise
        except FileNotFoundError as err:
            print("File Not found Error when trying to read sample files\n", err)
            raise
        except Exception:
            print("Unexpected error when trying to read sample files\n", sys.exc_info()[0])
            raise
        print("sample files content loaded")
        cls.processes = test_utils.start_tpce()

    @classmethod
    def tearDownClass(cls):
        """Shut down every process started by setUpClass."""
        # "or ()" keeps the teardown harmless when nothing was started.
        for process in cls.processes or ():
            test_utils.shutdown_process(process)
        print("all processes killed")

    def setUp(self):  # instruction executed before each test method
        """Print the name of the test method about to run."""
        print("execution of {}".format(self.id().split(".")[-1]))

    def _assert_path_computed(self, response, expected):
        """Check a path computation answer in the aToZ and zToA directions.

        Args:
            response: HTTP response returned by
                ``test_utils.path_computation_request``.
            expected: mapping of a path-description leaf name to the value
                it must hold in each direction. A ``{}`` placeholder in the
                leaf name is replaced with the direction prefix.
        """
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Path is calculated',
                      res['output']['configuration-response-common']['response-message'])
        path_description = res['output']['response-parameters']['path-description']
        for prefix in ('aToZ', 'zToA'):
            direction = path_description[prefix + '-direction']
            for leaf, value in expected.items():
                self.assertEqual(value, direction[leaf.format(prefix)])

    # Load port mapping
    def test_01_load_port_mapping(self):
        response = test_utils.put_jsonrequest(test_utils.URL_FULL_PORTMAPPING, self.port_mapping_data)
        self.assertIn(response.status_code, (requests.codes.ok, requests.codes.created))

    # Load openroadm topology
    def test_02_load_openroadm_topology_bi(self):
        response = test_utils.put_jsonrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.topo_bi_dir_data)
        self.assertEqual(response.status_code, requests.codes.ok)

    # Path Computation success
    def test_03_path_computation_400G_xpdr_bi(self):
        response = test_utils.path_computation_request(
            "request-1", "service-1",
            {"node-id": "XPDR-A2", "service-rate": "400",
             "service-format": "Ethernet", "clli": "nodeA"},
            {"node-id": "XPDR-C2", "service-rate": "400",
             "service-format": "Ethernet", "clli": "nodeC"})
        self._assert_path_computed(response, self._EXPECTED_400G_OPTICAL)

    # Load otn topology
    def test_04_load_otn_topology_bi(self):
        response = test_utils.put_jsonrequest(test_utils.URL_CONFIG_OTN_TOPO, self.otn_topo_bi_dir_data)
        self.assertEqual(response.status_code, requests.codes.ok)

    # Path Computation success
    def test_05_path_computation_OTUC4_xpdr_bi(self):
        response = test_utils.path_computation_request(
            "request-1", "service-OTUC4",
            {"service-rate": "400", "clli": "NodeA",
             "service-format": "OTU", "node-id": "XPDR-A2",
             "rx-direction": {"port": {"port-device-name": "XPDR-A2-XPDR2"}}
             },
            {"service-rate": "400", "clli": "NodeC",
             "service-format": "OTU", "node-id": "XPDR-C2",
             "rx-direction": {"port": {"port-device-name": "XPDR-C2-XPDR2"}}
             })
        self._assert_path_computed(response, self._EXPECTED_400G_OPTICAL)

    # Load otn topology with OTUC4 links
    def test_06_load_otuc4_otn_topology_bi(self):
        response = test_utils.put_jsonrequest(test_utils.URL_CONFIG_OTN_TOPO, self.otuc4_otn_topo_bi_dir_data)
        self.assertEqual(response.status_code, requests.codes.ok)

    # Path Computation success
    def test_07_path_computation_ODUC4_xpdr_bi(self):
        response = test_utils.path_computation_request(
            "request-1", "service-ODUC4",
            {"service-rate": "400", "clli": "NodeA", "service-format": "ODU",
             "node-id": "XPDR-A2",
             "tx-direction": {"port": {"port-device-name": "XPDR-A2-XPDR2"}}
             },
            {"service-rate": "400", "clli": "NodeC", "service-format": "ODU",
             "node-id": "XPDR-C2",
             "tx-direction": {"port": {"port-device-name": "XPDR-C2-XPDR2"}}
             })
        self._assert_path_computed(response, {'rate': 400, 'modulation-format': 'dp-qam16'})

    # Load otn topology with ODUC4 links
    def test_08_load_oduc4_otn_topology_bi(self):
        response = test_utils.put_jsonrequest(test_utils.URL_CONFIG_OTN_TOPO, self.oduc4_otn_topo_bi_dir_data)
        self.assertEqual(response.status_code, requests.codes.ok)

    # Path Computation success
    def test_09_path_computation_100G_xpdr_bi(self):
        response = test_utils.path_computation_request(
            "request-1", "service-100GE",
            {"service-rate": "100", "clli": "NodeA", "service-format": "Ethernet",
             "node-id": "XPDR-A2",
             "tx-direction": {"port": {"port-device-name": "XPDR-A2-XPDR2",
                                       "port-name": "XPDR2-CLIENT1"}}},
            {"service-rate": "100", "clli": "NodeC", "service-format": "Ethernet",
             "node-id": "XPDR-C2",
             "tx-direction": {"port": {"port-device-name": "XPDR-C2-XPDR2",
                                       "port-name": "XPDR2-CLIENT1"}}})
        self._assert_path_computed(response, {
            'rate': 100,
            'min-trib-slot': '1.1',
            'max-trib-slot': '1.20',
            'modulation-format': 'dp-qpsk',
        })
# Run the test case with per-test output when the file is executed directly.
if __name__ == "__main__":
    unittest.main(verbosity=2)