3 ##############################################################################
4 # Copyright (c) 2019 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
24 class TransportPCEPortMappingTesting(unittest.TestCase):
29 restconf_baseurl = "http://localhost:8181/restconf"
33 cls.sim_process1 = test_utils.start_sim('xpdra')
35 cls.sim_process2 = test_utils.start_sim('roadma')
36 print("all sims started")
38 cls.odl_process = test_utils.start_tpce()
40 print("opendaylight started")
43 def tearDownClass(cls):
44 for child in psutil.Process(cls.odl_process.pid).children():
45 child.send_signal(signal.SIGINT)
47 cls.odl_process.send_signal(signal.SIGINT)
48 cls.odl_process.wait()
49 for child in psutil.Process(cls.sim_process1.pid).children():
50 child.send_signal(signal.SIGINT)
52 cls.sim_process1.send_signal(signal.SIGINT)
53 cls.sim_process1.wait()
54 for child in psutil.Process(cls.sim_process2.pid).children():
55 child.send_signal(signal.SIGINT)
57 cls.sim_process2.send_signal(signal.SIGINT)
58 cls.sim_process2.wait()
61 print("execution of {}".format(self.id().split(".")[-1]))
64 def test_01_rdm_device_connected(self):
65 url = ("{}/config/network-topology:"
66 "network-topology/topology/topology-netconf/node/ROADM-A1"
67 .format(self.restconf_baseurl))
69 "node-id": "ROADM-A1",
70 "netconf-node-topology:username": "admin",
71 "netconf-node-topology:password": "admin",
72 "netconf-node-topology:host": "127.0.0.1",
73 "netconf-node-topology:port": test_utils.sims['roadma']['port'],
74 "netconf-node-topology:tcp-only": "false",
75 "netconf-node-topology:pass-through": {}}]}
76 headers = {'content-type': 'application/json'}
77 response = requests.request(
78 "PUT", url, data=json.dumps(data), headers=headers,
79 auth=('admin', 'admin'))
80 self.assertEqual(response.status_code, requests.codes.created)
83 def test_02_rdm_device_connected(self):
84 url = ("{}/operational/network-topology:"
85 "network-topology/topology/topology-netconf/node/ROADM-A1"
86 .format(self.restconf_baseurl))
87 headers = {'content-type': 'application/json'}
88 response = requests.request(
89 "GET", url, headers=headers, auth=('admin', 'admin'))
90 self.assertEqual(response.status_code, requests.codes.ok)
93 res['node'][0]['netconf-node-topology:connection-status'],
97 def test_03_rdm_portmapping_info(self):
98 url = ("{}/config/transportpce-portmapping:network/"
99 "nodes/ROADM-A1/node-info"
100 .format(self.restconf_baseurl))
101 headers = {'content-type': 'application/json'}
102 response = requests.request(
103 "GET", url, headers=headers, auth=('admin', 'admin'))
104 self.assertEqual(response.status_code, requests.codes.ok)
105 res = response.json()
107 {u'node-info': {u'node-type': u'rdm',
108 u'node-ip-address': u'127.0.0.11',
109 u'node-clli': u'NodeA',
110 u'openroadm-version': u'2.2.1', u'node-vendor': u'vendorA',
111 u'node-model': u'model2'}},
115 def test_04_rdm_portmapping_DEG1_TTP_TXRX(self):
116 url = ("{}/config/transportpce-portmapping:network/"
117 "nodes/ROADM-A1/mapping/DEG1-TTP-TXRX"
118 .format(self.restconf_baseurl))
119 headers = {'content-type': 'application/json'}
120 response = requests.request(
121 "GET", url, headers=headers, auth=('admin', 'admin'))
122 self.assertEqual(response.status_code, requests.codes.ok)
123 res = response.json()
125 {'supporting-port': 'L1', 'supporting-circuit-pack-name': '1/0',
126 'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional'},
129 def test_05_rdm_portmapping_DEG2_TTP_TXRX_with_ots_oms(self):
130 url = ("{}/config/transportpce-portmapping:network/"
131 "nodes/ROADM-A1/mapping/DEG2-TTP-TXRX"
132 .format(self.restconf_baseurl))
133 headers = {'content-type': 'application/json'}
134 response = requests.request(
135 "GET", url, headers=headers, auth=('admin', 'admin'))
136 self.assertEqual(response.status_code, requests.codes.ok)
137 res = response.json()
139 {'supporting-port': 'L1', 'supporting-circuit-pack-name': '2/0',
140 'logical-connection-point': 'DEG2-TTP-TXRX',
141 'supporting-oms': 'OMS-DEG2-TTP-TXRX', 'supporting-ots': 'OTS-DEG2-TTP-TXRX',
142 'port-direction': 'bidirectional'},
145 def test_06_rdm_portmapping_SRG1_PP3_TXRX(self):
146 url = ("{}/config/transportpce-portmapping:network/"
147 "nodes/ROADM-A1/mapping/SRG1-PP3-TXRX"
148 .format(self.restconf_baseurl))
149 headers = {'content-type': 'application/json'}
150 response = requests.request(
151 "GET", url, headers=headers, auth=('admin', 'admin'))
152 self.assertEqual(response.status_code, requests.codes.ok)
153 res = response.json()
155 {'supporting-port': 'C3', 'supporting-circuit-pack-name': '3/0',
156 'logical-connection-point': 'SRG1-PP3-TXRX', 'port-direction': 'bidirectional'},
159 def test_07_rdm_portmapping_SRG3_PP1_TXRX(self):
160 url = ("{}/config/transportpce-portmapping:network/"
161 "nodes/ROADM-A1/mapping/SRG3-PP1-TXRX"
162 .format(self.restconf_baseurl))
163 headers = {'content-type': 'application/json'}
164 response = requests.request(
165 "GET", url, headers=headers, auth=('admin', 'admin'))
166 self.assertEqual(response.status_code, requests.codes.ok)
167 res = response.json()
169 {'supporting-port': 'C1', 'supporting-circuit-pack-name': '5/0',
170 'logical-connection-point': 'SRG3-PP1-TXRX', 'port-direction': 'bidirectional'},
173 def test_08_xpdr_device_connected(self):
174 url = ("{}/config/network-topology:"
175 "network-topology/topology/topology-netconf/node/XPDR-A1"
176 .format(self.restconf_baseurl))
178 "node-id": "XPDR-A1",
179 "netconf-node-topology:username": "admin",
180 "netconf-node-topology:password": "admin",
181 "netconf-node-topology:host": "127.0.0.1",
182 "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
183 "netconf-node-topology:tcp-only": "false",
184 "netconf-node-topology:pass-through": {}}]}
185 headers = {'content-type': 'application/json'}
186 response = requests.request(
187 "PUT", url, data=json.dumps(data), headers=headers,
188 auth=('admin', 'admin'))
189 self.assertEqual(response.status_code, requests.codes.created)
192 def test_09_xpdr_device_connected(self):
193 url = ("{}/operational/network-topology:"
194 "network-topology/topology/topology-netconf/node/XPDR-A1"
195 .format(self.restconf_baseurl))
196 headers = {'content-type': 'application/json'}
197 response = requests.request(
198 "GET", url, headers=headers, auth=('admin', 'admin'))
199 self.assertEqual(response.status_code, requests.codes.ok)
200 res = response.json()
202 res['node'][0]['netconf-node-topology:connection-status'],
206 def test_10_xpdr_portmapping_info(self):
207 url = ("{}/config/transportpce-portmapping:network/"
208 "nodes/XPDR-A1/node-info"
209 .format(self.restconf_baseurl))
210 headers = {'content-type': 'application/json'}
211 response = requests.request(
212 "GET", url, headers=headers, auth=('admin', 'admin'))
213 self.assertEqual(response.status_code, requests.codes.ok)
214 res = response.json()
216 {u'node-info': {u'node-type': u'xpdr',
217 u'node-ip-address': u'1.2.3.4',
218 u'node-clli': u'NodeA',
219 u'openroadm-version': u'2.2.1', u'node-vendor': u'vendorA',
220 u'node-model': u'model2'}},
224 def test_11_xpdr_portmapping_NETWORK1(self):
225 url = ("{}/config/transportpce-portmapping:network/"
226 "nodes/XPDR-A1/mapping/XPDR1-NETWORK1"
227 .format(self.restconf_baseurl))
228 headers = {'content-type': 'application/json'}
229 response = requests.request(
230 "GET", url, headers=headers, auth=('admin', 'admin'))
231 self.assertEqual(response.status_code, requests.codes.ok)
232 res = response.json()
234 {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
235 'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
236 'logical-connection-point': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-network',
237 'port-direction': 'bidirectional', 'connection-map-lcp': 'XPDR1-CLIENT1',
238 'lcp-hash-val': '8e128ba57560403cfd4ffafae38cd941'},
241 def test_12_xpdr_portmapping_NETWORK2(self):
242 url = ("{}/config/transportpce-portmapping:network/"
243 "nodes/XPDR-A1/mapping/XPDR1-NETWORK2"
244 .format(self.restconf_baseurl))
245 headers = {'content-type': 'application/json'}
246 response = requests.request(
247 "GET", url, headers=headers, auth=('admin', 'admin'))
248 self.assertEqual(response.status_code, requests.codes.ok)
249 res = response.json()
251 {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
252 'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/2-PLUG-NET',
253 'logical-connection-point': 'XPDR1-NETWORK2', 'port-direction': 'bidirectional',
254 'connection-map-lcp': 'XPDR1-CLIENT2', 'port-qual': 'xpdr-network',
255 'lcp-hash-val': '8e128ba57560403cfd4ffafae38cd942'},
258 def test_13_xpdr_portmapping_CLIENT1(self):
259 url = ("{}/config/transportpce-portmapping:network/"
260 "nodes/XPDR-A1/mapping/XPDR1-CLIENT1"
261 .format(self.restconf_baseurl))
262 headers = {'content-type': 'application/json'}
263 response = requests.request(
264 "GET", url, headers=headers, auth=('admin', 'admin'))
265 self.assertEqual(response.status_code, requests.codes.ok)
266 res = response.json()
268 {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
269 'supporting-port': 'C1',
270 'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
271 'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
272 'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
273 'lcp-hash-val': '3ed8ed1336784ac7c2f66c22f2f03d8'},
276 def test_14_xpdr_portmapping_CLIENT2(self):
277 url = ("{}/config/transportpce-portmapping:network/"
278 "nodes/XPDR-A1/mapping/XPDR1-CLIENT2"
279 .format(self.restconf_baseurl))
280 headers = {'content-type': 'application/json'}
281 response = requests.request(
282 "GET", url, headers=headers, auth=('admin', 'admin'))
283 self.assertEqual(response.status_code, requests.codes.ok)
284 res = response.json()
286 {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
287 'supporting-port': 'C1',
288 'supporting-circuit-pack-name': '1/0/2-PLUG-CLIENT',
289 'logical-connection-point': 'XPDR1-CLIENT2', 'port-direction': 'bidirectional',
290 'connection-map-lcp': 'XPDR1-NETWORK2', 'port-qual': 'xpdr-client',
291 'lcp-hash-val': '3ed8ed1336784ac7c2f66c22f2f03db'},
294 def test_15_xpdr_device_disconnected(self):
295 url = ("{}/config/network-topology:"
296 "network-topology/topology/topology-netconf/node/XPDR-A1"
297 .format(self.restconf_baseurl))
298 headers = {'content-type': 'application/json'}
299 response = requests.request(
300 "DELETE", url, headers=headers,
301 auth=('admin', 'admin'))
302 self.assertEqual(response.status_code, requests.codes.ok)
305 def test_16_xpdr_device_disconnected(self):
306 url = ("{}/operational/network-topology:network-topology/topology/"
307 "topology-netconf/node/XPDR-A1".format(self.restconf_baseurl))
308 headers = {'content-type': 'application/json'}
309 response = requests.request(
310 "GET", url, headers=headers, auth=('admin', 'admin'))
311 self.assertEqual(response.status_code, requests.codes.not_found)
312 res = response.json()
314 {"error-type": "application", "error-tag": "data-missing",
315 "error-message": "Request could not be completed because the relevant data model content does not exist"},
316 res['errors']['error'])
318 def test_17_xpdr_device_disconnected(self):
319 url = ("{}/config/transportpce-portmapping:network/nodes/XPDR-A1".format(self.restconf_baseurl))
320 headers = {'content-type': 'application/json'}
321 response = requests.request(
322 "GET", url, headers=headers, auth=('admin', 'admin'))
323 self.assertEqual(response.status_code, requests.codes.not_found)
324 res = response.json()
326 {"error-type": "application", "error-tag": "data-missing",
327 "error-message": "Request could not be completed because the relevant data model content does not exist"},
328 res['errors']['error'])
330 def test_18_rdm_device_disconnected(self):
331 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-A1"
332 .format(self.restconf_baseurl))
333 headers = {'content-type': 'application/json'}
334 response = requests.request(
335 "DELETE", url, headers=headers,
336 auth=('admin', 'admin'))
337 self.assertEqual(response.status_code, requests.codes.ok)
340 def test_19_rdm_device_disconnected(self):
341 url = ("{}/operational/network-topology:network-topology/topology/topology-netconf/node/ROADM-A1"
342 .format(self.restconf_baseurl))
343 headers = {'content-type': 'application/json'}
344 response = requests.request(
345 "GET", url, headers=headers, auth=('admin', 'admin'))
346 self.assertEqual(response.status_code, requests.codes.not_found)
347 res = response.json()
349 {"error-type": "application", "error-tag": "data-missing",
350 "error-message": "Request could not be completed because the relevant data model content does not exist"},
351 res['errors']['error'])
353 def test_20_rdm_device_disconnected(self):
354 url = ("{}/config/transportpce-portmapping:network/nodes/ROADM-A1".format(self.restconf_baseurl))
355 headers = {'content-type': 'application/json'}
356 response = requests.request(
357 "GET", url, headers=headers, auth=('admin', 'admin'))
358 self.assertEqual(response.status_code, requests.codes.not_found)
359 res = response.json()
361 {"error-type": "application", "error-tag": "data-missing",
362 "error-message": "Request could not be completed because the relevant data model content does not exist"},
363 res['errors']['error'])
366 if __name__ == "__main__":
367 unittest.main(verbosity=2)