2 ##############################################################################
3 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
19 sys.path.append('transportpce_tests/common/')
23 class TransportPCE400Gtesting(unittest.TestCase):
25 simple_topo_bi_dir_data = None
26 port_mapping_data = None
32 sample_files_parsed = False
33 TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
34 "..", "..", "sample_configs", "honeynode-topo400G.json")
35 with open(TOPO_BI_DIR_FILE, 'r') as topo_bi_dir:
36 cls.simple_topo_bi_dir_data = topo_bi_dir.read()
38 PORT_MAPPING_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
39 "..", "..", "sample_configs", "pce_portmapping_71.json")
40 with open(PORT_MAPPING_FILE, 'r') as port_mapping:
41 cls.port_mapping_data = port_mapping.read()
42 sample_files_parsed = True
43 except PermissionError as err:
44 print("Permission Error when trying to read sample files\n", err)
46 except FileNotFoundError as err:
47 print("File Not found Error when trying to read sample files\n", err)
50 print("Unexpected error when trying to read sample files\n", sys.exc_info()[0])
53 if sample_files_parsed:
54 print("sample files content loaded")
55 cls.processes = test_utils.start_tpce()
58 def tearDownClass(cls):
59 # pylint: disable=not-an-iterable
60 for process in cls.processes:
61 test_utils.shutdown_process(process)
62 print("all processes killed")
64 def setUp(self): # instruction executed before each test method
65 print("execution of {}".format(self.id().split(".")[-1]))
69 def test_01_load_port_mapping(self):
70 response = test_utils.put_jsonrequest(test_utils.URL_FULL_PORTMAPPING, self.port_mapping_data)
71 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.created))
74 # Load simple bidirectional topology
75 def test_02_load_simple_topology_bi(self):
76 response = test_utils.put_jsonrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.simple_topo_bi_dir_data)
77 self.assertEqual(response.status_code, requests.codes.ok)
80 # Path Computation success
81 def test_03_path_computation_xpdr_bi(self):
82 response = test_utils.path_computation_request("request-1", "service-1",
83 {"node-id": "XPDR-A2", "service-rate": "400",
84 "service-format": "Ethernet", "clli": "nodeA"},
85 {"node-id": "XPDR-C2", "service-rate": "400",
86 "service-format": "Ethernet", "clli": "nodeC"})
87 self.assertEqual(response.status_code, requests.codes.ok)
89 self.assertIn('Path is calculated',
90 res['output']['configuration-response-common']['response-message'])
92 self.assertEqual(1, res['output']['response-parameters']['path-description']
93 ['aToZ-direction']['aToZ-wavelength-number'])
94 self.assertEqual(400, res['output']['response-parameters']['path-description']
95 ['aToZ-direction']['rate'])
96 self.assertEqual(196.0375, res['output']['response-parameters']['path-description']
97 ['aToZ-direction']['aToZ-min-frequency'])
98 self.assertEqual(196.12500, res['output']['response-parameters']['path-description']
99 ['aToZ-direction']['aToZ-max-frequency'])
100 self.assertEqual('dp-qam16', res['output']['response-parameters']['path-description']
101 ['aToZ-direction']['modulation-format'])
103 self.assertEqual(1, res['output']['response-parameters']['path-description']
104 ['zToA-direction']['zToA-wavelength-number'])
105 self.assertEqual(400, res['output']['response-parameters']['path-description']
106 ['zToA-direction']['rate'])
107 self.assertEqual(196.0375, res['output']['response-parameters']['path-description']
108 ['zToA-direction']['zToA-min-frequency'])
109 self.assertEqual(196.12500, res['output']['response-parameters']['path-description']
110 ['zToA-direction']['zToA-max-frequency'])
111 self.assertEqual('dp-qam16', res['output']['response-parameters']['path-description']
112 ['zToA-direction']['modulation-format'])
115 # Test deleted complex topology
116 def test_04_test_topology_complex_deleted(self):
117 response = test_utils.get_ordm_topo_request("node/XPONDER-3-2")
118 self.assertEqual(response.status_code, requests.codes.conflict)
122 def test_05_delete_port_mapping(self):
123 response = test_utils.delete_request(test_utils.URL_FULL_PORTMAPPING)
124 self.assertEqual(response.status_code, requests.codes.ok)
128 if __name__ == "__main__":
129 unittest.main(verbosity=2)