2 ##############################################################################
3 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
19 from common import test_utils
class TransportPCE400Gtesting(unittest.TestCase):
    """Path-computation test sequence run against a 400G sample topology.

    The methods are numbered because they build on each other: the port
    mapping and the topology are loaded first, a path computation is then
    requested, and the port mapping is deleted at the end.
    """

    # Raw JSON payloads read from the sample files by setUpClass.
    simple_topo_bi_dir_data = None
    port_mapping_data = None
    # Handles returned by test_utils.start_tpce(); released by tearDownClass.
    processes = None

    @classmethod
    def setUpClass(cls):
        """Read the sample payloads, then start the processes under test.

        Raises:
            PermissionError: a sample file exists but cannot be read.
            FileNotFoundError: a sample file is missing.
            Exception: any other failure while reading the sample files.
        """
        sample_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                                  "..", "..", "sample_configs")
        try:
            with open(os.path.join(sample_dir, "honeynode-topo400G.json"),
                      'r', encoding="utf-8") as topo_bi_dir:
                cls.simple_topo_bi_dir_data = topo_bi_dir.read()
            with open(os.path.join(sample_dir, "pce_portmapping_71.json"),
                      'r', encoding="utf-8") as port_mapping:
                cls.port_mapping_data = port_mapping.read()
        # Every handler re-raises: without the payloads each request below
        # would be sent with None as its body, so the run must stop here
        # instead of starting the processes and failing later for an
        # unrelated-looking reason.
        except PermissionError as err:
            print("Permission Error when trying to read sample files\n", err)
            raise
        except FileNotFoundError as err:
            print("File Not found Error when trying to read sample files\n", err)
            raise
        except Exception:
            print("Unexpected error when trying to read sample files\n", sys.exc_info()[0])
            raise
        print("sample files content loaded")
        cls.processes = test_utils.start_tpce()

    @classmethod
    def tearDownClass(cls):
        """Shut down every process started by setUpClass."""
        # `processes` is still None when nothing was started.
        for process in cls.processes or ():
            test_utils.shutdown_process(process)
        print("all processes killed")

    def setUp(self):  # instruction executed before each test method
        """Print the name of the test method that is about to run."""
        print("execution of {}".format(self.id().split(".")[-1]))

    def _assert_direction(self, path_description, prefix):
        """Check the computed values of one direction of a path description.

        Args:
            path_description: the 'path-description' mapping of a path
                computation response.
            prefix: 'aToZ' or 'zToA'; selects the '<prefix>-direction' entry
                and the '<prefix>-...' keys inside it.
        """
        direction = path_description[prefix + '-direction']
        self.assertEqual(1, direction[prefix + '-wavelength-number'])
        self.assertEqual(400, direction['rate'])
        self.assertEqual(196.0375, direction[prefix + '-min-frequency'])
        self.assertEqual(196.12500, direction[prefix + '-max-frequency'])
        self.assertEqual('dp-qam16', direction['modulation-format'])

    # Load port mapping
    def test_01_load_port_mapping(self):
        """PUT the port-mapping payload; accept either OK or Created."""
        response = test_utils.put_jsonrequest(test_utils.URL_FULL_PORTMAPPING, self.port_mapping_data)
        self.assertIn(response.status_code, (requests.codes.ok, requests.codes.created))

    # Load simple bidirectional topology
    def test_02_load_simple_topology_bi(self):
        """PUT the bidirectional topology payload and expect an OK status."""
        response = test_utils.put_jsonrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.simple_topo_bi_dir_data)
        self.assertEqual(response.status_code, requests.codes.ok)

    # Path Computation success
    def test_03_path_computation_xpdr_bi(self):
        """Request a path between XPDR-A2 and XPDR-C2 and check both directions."""
        response = test_utils.path_computation_request("request-1", "service-1",
                                                       {"node-id": "XPDR-A2", "service-rate": "400",
                                                        "service-format": "Ethernet", "clli": "nodeA"},
                                                       {"node-id": "XPDR-C2", "service-rate": "400",
                                                        "service-format": "Ethernet", "clli": "nodeC"})
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Path is calculated',
                      res['output']['configuration-response-common']['response-message'])

        # Both directions are expected to carry exactly the same values.
        path_description = res['output']['response-parameters']['path-description']
        self._assert_direction(path_description, 'aToZ')
        self._assert_direction(path_description, 'zToA')

    # Test deleted complex topology
    def test_04_test_topology_complex_deleted(self):
        """Query node XPONDER-3-2 of the topology and expect a conflict status."""
        response = test_utils.get_ordm_topo_request("node/XPONDER-3-2")
        self.assertEqual(response.status_code, requests.codes.conflict)

    # Delete port mapping
    def test_05_delete_port_mapping(self):
        """DELETE the port mapping and expect an OK status."""
        response = test_utils.delete_request(test_utils.URL_FULL_PORTMAPPING)
        self.assertEqual(response.status_code, requests.codes.ok)
# Run this module's test cases with verbose output when it is executed
# directly as a script.
if __name__ == "__main__":
    unittest.main(verbosity=2)