##############################################################################
# Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
# Root of the controller RESTCONF API that every request in this module targets.
RESTCONF_BASE_URL = "http://localhost:8181/restconf"

# Failure messages shared by the assertions of the test cases.
CODE_SHOULD_BE_200 = 'Http status code should be 200'
CODE_SHOULD_BE_201 = 'Http status code should be 201'
CREATED_SUCCESSFULLY = 'Result message should contain Xponder Roadm Link created successfully'
class TransportTapitesting(unittest.TestCase):
    """Functional tests of the TAPI topology RPCs exposed over RESTCONF.

    The test methods are numbered so that unittest's alphabetical ordering
    runs them in sequence: the netconf devices are connected (00-04), the
    transponder/ROADM links are created (05-12), the TAPI topologies are
    queried (13-14) and the devices are disconnected again (15-19).
    """
    # NOTE(review): the reviewed copy of this class had lost its indentation
    # and some structural lines (decorators, the setUpClass header, if/else
    # lines, dict braces). They are restored here from context - confirm
    # against the reference file.

    # (RPC name, text expected in the RPC result) for each link direction.
    _XPDR_TO_RDM = ("init-xpdr-rdm-links", 'Xponder Roadm Link created successfully')
    _RDM_TO_XPDR = ("init-rdm-xpdr-links", 'Roadm Xponder links created successfully')

    # Fragment expected in the value-name of an edge point, per layer protocol.
    _VALUE_NAME_BY_LAYER = {
        'ODU': 'NodeEdgePoint_N',
        'PHOTONIC_MEDIA': 'iNodeEdgePoint_',
    }

    @classmethod
    def setUpClass(cls):
        """Start the controller, load the TAPI feature and start the simulators.

        ``cls.init_failed`` is set when the feature installation or the
        controller restart that follows it did not succeed.
        """
        cls.init_failed = False

        cls.processes = test_utils.start_tpce()
        # TAPI feature is not installed by default in Karaf
        if os.environ.get('USE_LIGHTY') != 'True':
            print("installing tapi feature...")
            result = test_utils.install_karaf_feature("odl-transportpce-tapi")
            installed = result.returncode == 0
            print("Restarting opendaylight...")
            test_utils.shutdown_process(cls.processes[0])
            cls.processes = test_utils.start_tpce()
            restarted = test_utils.wait_until_log_contains(
                test_utils.karaf_log, test_utils.KARAF_OK_START_MSG, time_to_wait=60)
            # A failed installation must stay recorded even if the restart works.
            cls.init_failed = not (installed and restarted)
        if cls.init_failed:
            print("tapi installation feature failed...")
            test_utils.shutdown_process(cls.processes[0])
            # NOTE(review): execution continues below; setUp then fails every
            # test. Confirm whether the run should be aborted here instead.
        # NOTE(review): this rebinds cls.processes, so tearDownClass only stops
        # what start_sims returns - confirm that this covers the controller.
        cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmc', 'xpdrc', 'spdrav2'])

    @classmethod
    def tearDownClass(cls):
        """Shut down every process recorded in ``cls.processes``."""
        for process in cls.processes:
            test_utils.shutdown_process(process)
        print("all processes killed")

    def setUp(self):  # instruction executed before each test method
        """Fail the test right away when the class-level setup did not succeed."""
        if self.init_failed:
            self.fail('Feature installation failed')
        print("execution of {}".format(self.id().split(".")[-1]))

    # --- helpers (names do not start with "test", so they are not collected) ---

    def _node_url(self, node_id):
        """Return the RESTCONF config URL of *node_id* in topology-netconf."""
        return ("{}/config/network-topology:"
                "network-topology/topology/topology-netconf/node/{}"
                .format(RESTCONF_BASE_URL, node_id))

    def _connect_node(self, node_id, sim_name):
        """PUT the connection data of simulator *sim_name* as *node_id*; expect 201."""
        data = test_utils.generate_connect_data(node_id, test_utils.sims[sim_name]['port'])
        response = test_utils.put_request(self._node_url(node_id), data, 'admin', 'admin')
        self.assertEqual(response.status_code, requests.codes.created, CODE_SHOULD_BE_201)  # pylint: disable=no-member

    def _disconnect_node(self, node_id):
        """DELETE the netconf node *node_id*; expect 200."""
        response = test_utils.delete_request(self._node_url(node_id), 'admin', 'admin')
        self.assertEqual(response.status_code, requests.codes.ok, CODE_SHOULD_BE_200)  # pylint: disable=no-member

    def _create_link(self, direction, *link_args):
        """Call a networkutils link RPC and check the text of its result.

        *direction* is ``_XPDR_TO_RDM`` or ``_RDM_TO_XPDR``; *link_args* are
        handed unchanged to ``test_utils.generate_link_data``.
        """
        rpc_name, expected = direction
        url = "{}/operations/transportpce-networkutils:{}".format(RESTCONF_BASE_URL, rpc_name)
        data = test_utils.generate_link_data(*link_args)
        response = test_utils.post_request(url, data, 'admin', 'admin')
        self.assertEqual(response.status_code, requests.codes.ok, CODE_SHOULD_BE_200)  # pylint: disable=no-member
        res = response.json()
        # The failure message names the text that was actually looked for.
        self.assertIn(expected, res["output"]["result"],
                      'Result message should contain ' + expected)

    def _get_topology(self, topology_name):
        """POST get-topology-details for *topology_name* and return its topology."""
        url = "{}/operations/tapi-topology:get-topology-details".format(RESTCONF_BASE_URL)
        data = {
            "tapi-topology:input": {
                "tapi-topology:topology-id-or-name": topology_name
            }
        }
        response = test_utils.post_request(url, data, 'admin', 'admin')
        self.assertEqual(response.status_code, requests.codes.ok, CODE_SHOULD_BE_200)  # pylint: disable=no-member
        return response.json()["output"]["topology"]

    def _check_edge_point_name(self, edge_point):
        """Check that the first name of *edge_point* matches its layer protocol."""
        layer = edge_point["layer-protocol-name"]
        if layer not in self._VALUE_NAME_BY_LAYER:
            self.fail('Wrong layer protocol name')
        expected = self._VALUE_NAME_BY_LAYER[layer]
        self.assertIn(expected, edge_point["name"][0]["value-name"],
                      'Value name should contain ' + expected)

    # connect netconf devices

    def test_00_connect_spdr_sa1(self):
        self._connect_node("SPDR-SA1", 'spdrav2')

    def test_01_connect_xpdra(self):
        self._connect_node("XPDR-A1", 'xpdra')

    def test_02_connect_xpdrc(self):
        self._connect_node("XPDR-C1", 'xpdrc')

    def test_03_connect_rdma(self):
        self._connect_node("ROADM-A1", 'roadma')

    def test_04_connect_rdmc(self):
        self._connect_node("ROADM-C1", 'roadmc')

    def test_05_connect_xprda_n1_to_roadma_pp1(self):
        self._create_link(self._XPDR_TO_RDM, "XPDR-A1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")

    def test_06_connect_roadma_pp1_to_xpdra_n1(self):
        self._create_link(self._RDM_TO_XPDR, "XPDR-A1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")

    def test_07_connect_xprdc_n1_to_roadmc_pp1(self):
        self._create_link(self._XPDR_TO_RDM, "XPDR-C1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")

    def test_08_connect_roadmc_pp1_to_xpdrc_n1(self):
        self._create_link(self._RDM_TO_XPDR, "XPDR-C1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")

    def test_09_connect_xprda_n2_to_roadma_pp2(self):
        self._create_link(self._XPDR_TO_RDM, "XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")

    def test_10_connect_roadma_pp2_to_xpdra_n2(self):
        self._create_link(self._RDM_TO_XPDR, "XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")

    def test_11_connect_xprdc_n2_to_roadmc_pp2(self):
        self._create_link(self._XPDR_TO_RDM, "XPDR-C1", "1", "2", "ROADM-C1", "1", "SRG1-PP2-TXRX")

    def test_12_connect_roadmc_pp2_to_xpdrc_n2(self):
        self._create_link(self._RDM_TO_XPDR, "XPDR-C1", "1", "2", "ROADM-C1", "1", "SRG1-PP2-TXRX")

    def test_13_get_tapi_openroadm_topology(self):
        topology = self._get_topology("openroadm-topology")
        self.assertEqual(len(topology["node"]), 1, 'There should be 1 node')
        self.assertEqual(len(topology["node"][0]["owned-node-edge-point"]), 4,
                         'There should be 4 owned-node-edge-points')

    def test_14_get_tapi_otn_topology(self):
        topology = self._get_topology("otn-topology")
        nodes = topology["node"]
        self.assertEqual(len(nodes), 4, 'There should be 4 nodes')
        self.assertEqual(len(topology["link"]), 5, 'There should be 5 links')
        # get info from first link to do deeper check
        link_ends = topology["link"][0]["node-edge-point"]
        edge_points = []
        for link_end in (link_ends[0], link_ends[1]):
            node_uid = link_end["node-uuid"]
            edge_point_uid = link_end["node-edge-point-uuid"]
            # get node associated to link info
            node = find_object_with_key(nodes, "uuid", node_uid)
            self.assertIsNotNone(node, 'Node with uuid ' + node_uid + ' should not be null')
            # get edge-point associated to node
            edge_point = find_object_with_key(node["owned-node-edge-point"], "uuid", edge_point_uid)
            self.assertIsNotNone(edge_point,
                                 'Node edge point with uuid ' + edge_point_uid + ' should not be null')
            edge_points.append(edge_point)
        for edge_point in edge_points:
            self.assertEqual(len(edge_point["name"]), 1, 'There should be 1 name')
        for edge_point in edge_points:
            self._check_edge_point_name(edge_point)

    def test_15_disconnect_xpdra(self):
        self._disconnect_node("XPDR-A1")

    def test_16_disconnect_xpdrc(self):
        self._disconnect_node("XPDR-C1")

    def test_17_disconnect_roadma(self):
        self._disconnect_node("ROADM-A1")

    def test_18_disconnect_roadmc(self):
        self._disconnect_node("ROADM-C1")

    def test_19_disconnect_spdr_sa1(self):
        self._disconnect_node("SPDR-SA1")
def find_object_with_key(list_dicts, key, value):
    """Return the first dict of *list_dicts* whose *key* entry equals *value*.

    Entries that do not contain *key* are skipped rather than raising
    ``KeyError``. ``None`` is returned when no entry matches, including when
    *list_dicts* is empty.
    """
    for candidate in list_dicts:
        if key in candidate and candidate[key] == value:
            return candidate
    return None
if __name__ == "__main__":
    # Script entry point: run the tests of this module with verbose output.
    unittest.main(verbosity=2)