3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
14 #from unittest.result import failfast
16 from common import test_utils
class TransportPCERendererTesting(unittest.TestCase):
    """Functional test of the device-renderer ``service-path`` RPC.

    The numbered tests rely on unittest's alphabetical ordering and build on
    each other: mount ROADM-A1 and XPDR-A1, check their port mapping, create
    a service path on wavelength 7, verify what was configured on both
    devices, delete the service path, verify the clean-up and finally
    unmount the devices.

    Test methods are documented with ``#`` comments rather than docstrings
    so that the verbose unittest report keeps showing the method names.
    """

    # The "{}" placeholder is left for test_utils.post_request to fill in.
    _SERVICE_PATH_URL = "{}/operations/transportpce-device-renderer:service-path"

    # Error entry expected in the reply when the requested resource is absent.
    _DATA_MISSING_ERROR = {
        "error-type": "application",
        "error-tag": "data-missing",
        "error-message": "Request could not be completed because the relevant data model content does not exist",
    }

    @classmethod
    def setUpClass(cls):
        # NOTE(review): the second assignment replaces the first, so
        # tearDownClass only iterates over what start_sims() returns.
        # Presumably both helpers return the same accumulated process
        # list -- confirm in test_utils.
        cls.processes = test_utils.start_tpce()
        cls.processes = test_utils.start_sims(['xpdra', 'roadma'])

    @classmethod
    def tearDownClass(cls):
        for process in cls.processes:
            test_utils.shutdown_process(process)
        print("all processes killed")

    # ------------------------------------------------------------------
    # Private helpers (not collected: their names do not start with "test")
    # ------------------------------------------------------------------

    def _get_mapping(self, node_id):
        """Return the port-mapping list of ``node_id`` after asserting HTTP 200."""
        response = test_utils.portmapping_request(node_id)
        self.assertEqual(response.status_code, requests.codes.ok)
        return response.json()['nodes'][0]['mapping']

    def _get_device_config(self, node_id, resource):
        """Return the decoded JSON body of ``resource`` on ``node_id``.

        Fails the test unless the request is answered with HTTP 200.
        """
        response = test_utils.check_netconf_node_request(node_id, resource)
        self.assertEqual(response.status_code, requests.codes.ok)
        return response.json()

    def _assert_data_missing(self, node_id, resource):
        """Assert that ``resource`` on ``node_id`` yields a 404 "data-missing" error."""
        response = test_utils.check_netconf_node_request(node_id, resource)
        self.assertEqual(response.status_code, requests.codes.not_found)
        res = response.json()
        self.assertIn(self._DATA_MISSING_ERROR, res['errors']['error'])

    def _assert_contains_subset(self, expected, actual):
        """Assert that every key/value pair of ``expected`` is present in ``actual``.

        Replaces ``assertDictContainsSubset`` (deprecated in Python 3.2):
        overlaying ``actual`` on ``expected`` only equals ``actual`` when
        ``expected`` adds no key and contradicts no value.
        """
        self.assertDictEqual(dict(expected, **actual), actual)

    # ------------------------------------------------------------------
    # Device mounting and port mapping
    # ------------------------------------------------------------------

    def test_01_rdm_device_connected(self):
        response = test_utils.mount_device("ROADM-A1", 'roadma')
        self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)

    def test_02_xpdr_device_connected(self):
        response = test_utils.mount_device("XPDR-A1", 'xpdra')
        self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)

    def test_03_rdm_portmapping(self):
        mapping = self._get_mapping("ROADM-A1")
        self.assertIn(
            {'supporting-port': 'L1', 'supporting-circuit-pack-name': '1/0',
             'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional'},
            mapping)
        self.assertIn(
            {'supporting-port': 'C3', 'supporting-circuit-pack-name': '3/0',
             'logical-connection-point': 'SRG1-PP3-TXRX', 'port-direction': 'bidirectional'},
            mapping)

    def test_04_xpdr_portmapping(self):
        mapping = self._get_mapping("XPDR-A1")
        self.assertIn(
            {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
             'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
             'logical-connection-point': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-network',
             'port-direction': 'bidirectional', 'connection-map-lcp': 'XPDR1-CLIENT1',
             'lcp-hash-val': 'AMkDwQ7xTmRI'},
            mapping)
        self.assertIn(
            {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
             'supporting-port': 'C1',
             'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
             'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
             'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
             'lcp-hash-val': 'AJUUr6I5fALj'},
            mapping)

    # ------------------------------------------------------------------
    # Service path creation and its effects on the devices
    # ------------------------------------------------------------------

    def test_05_service_path_create(self):
        # NOTE(review): the key of the node list was not visible in the
        # reviewed source; "renderer:nodes" follows the naming of the other
        # input leaves -- confirm against the service-path RPC model.
        data = {"renderer:input": {
            "renderer:service-name": "service_test",
            "renderer:wave-number": "7",
            "renderer:modulation-format": "qpsk",
            "renderer:operation": "create",
            "renderer:nodes": [
                {"renderer:node-id": "ROADM-A1",
                 "renderer:src-tp": "SRG1-PP3-TXRX",
                 "renderer:dest-tp": "DEG1-TTP-TXRX"},
                {"renderer:node-id": "XPDR-A1",
                 "renderer:src-tp": "XPDR1-CLIENT1",
                 "renderer:dest-tp": "XPDR1-NETWORK1"}]}}
        response = test_utils.post_request(self._SERVICE_PATH_URL, data)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Roadm-connection successfully created for nodes: ROADM-A1', res["output"]["result"])

    def test_06_service_path_create_rdm_check(self):
        interface = self._get_device_config("ROADM-A1", "interface/DEG1-TTP-TXRX-nmc-7")['interface'][0]
        self._assert_contains_subset(
            {
                'name': 'DEG1-TTP-TXRX-nmc-7',
                'administrative-state': 'inService',
                'supporting-circuit-pack-name': '1/0',
                'type': 'org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint',
                'supporting-port': 'L1'
            },
            interface)
        self.assertDictEqual(
            {'frequency': 195.8, 'width': 40},
            interface['org-openroadm-network-media-channel-interfaces:nmc-ctp'])

    def test_07_service_path_create_rdm_check(self):
        interface = self._get_device_config("ROADM-A1", "interface/DEG1-TTP-TXRX-mc-7")['interface'][0]
        self._assert_contains_subset(
            {
                'name': 'DEG1-TTP-TXRX-mc-7',
                'administrative-state': 'inService',
                'supporting-circuit-pack-name': '1/0',
                'type': 'org-openroadm-interfaces:mediaChannelTrailTerminationPoint',
                'supporting-port': 'L1'
            },
            interface)
        self.assertDictEqual(
            {'min-freq': 195.775, 'max-freq': 195.825},
            interface['org-openroadm-media-channel-interfaces:mc-ttp'])

    def test_08_service_path_create_rdm_check(self):
        interface = self._get_device_config("ROADM-A1", "interface/SRG1-PP3-TXRX-nmc-7")['interface'][0]
        self._assert_contains_subset(
            {
                'name': 'SRG1-PP3-TXRX-nmc-7',
                'administrative-state': 'inService',
                'supporting-circuit-pack-name': '3/0',
                'type': 'org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint',
                'supporting-port': 'C3'
            },
            interface)
        self.assertDictEqual(
            {'frequency': 195.8, 'width': 40},
            interface['org-openroadm-network-media-channel-interfaces:nmc-ctp'])

    # -mc supporting interfaces must not be created for SRG, only degrees
    def test_09_service_path_create_rdm_check(self):
        self._assert_data_missing("ROADM-A1", "interface/SRG1-PP3-TXRX-mc-7")

    def test_10_service_path_create_rdm_check(self):
        connection = self._get_device_config(
            "ROADM-A1", "roadm-connections/SRG1-PP3-TXRX-DEG1-TTP-TXRX-7")['roadm-connections'][0]
        self._assert_contains_subset(
            {
                'connection-name': 'SRG1-PP3-TXRX-DEG1-TTP-TXRX-7',
                'opticalControlMode': 'off'
            },
            connection)
        self.assertDictEqual(
            {'src-if': 'SRG1-PP3-TXRX-nmc-7'},
            connection['source'])
        self.assertDictEqual(
            {'dst-if': 'DEG1-TTP-TXRX-nmc-7'},
            connection['destination'])

    def test_11_service_path_create_xpdr_check(self):
        interface = self._get_device_config("XPDR-A1", "interface/XPDR1-NETWORK1-7")['interface'][0]
        self._assert_contains_subset(
            {
                'name': 'XPDR1-NETWORK1-7',
                'administrative-state': 'inService',
                'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
                'type': 'org-openroadm-interfaces:opticalChannel',
                'supporting-port': '1'
            },
            interface)
        self.assertDictEqual(
            {'rate': 'org-openroadm-common-types:R100G',
             'transmit-power': -5,
             'modulation-format': 'dp-qpsk',
             'frequency': 195.8},
            interface['org-openroadm-optical-channel-interfaces:och'])

    def test_12_service_path_create_xpdr_check(self):
        interface = self._get_device_config("XPDR-A1", "interface/XPDR1-NETWORK1-OTU")['interface'][0]
        self._assert_contains_subset(
            {
                'name': 'XPDR1-NETWORK1-OTU',
                'administrative-state': 'inService',
                'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
                'type': 'org-openroadm-interfaces:otnOtu',
                'supporting-port': '1',
                'supporting-interface': 'XPDR1-NETWORK1-7'
            },
            interface)
        # NOTE(review): the last entry of this expected dict was not visible
        # in the reviewed source; 'fec': 'scfec' is assumed -- confirm before
        # merging, since the comparison below is an exact match.
        input_dict_2 = {'tx-sapi': 'AMkDwQ7xTmRI',
                        'expected-dapi': 'AMkDwQ7xTmRI',
                        'rate': 'org-openroadm-otn-common-types:OTU4',
                        'fec': 'scfec'}
        self.assertDictEqual(input_dict_2,
                             interface['org-openroadm-otn-otu-interfaces:otu'])

    def test_13_service_path_create_xpdr_check(self):
        interface = self._get_device_config("XPDR-A1", "interface/XPDR1-NETWORK1-ODU")['interface'][0]
        odu = interface['org-openroadm-otn-odu-interfaces:odu']
        self._assert_contains_subset(
            {
                'name': 'XPDR1-NETWORK1-ODU',
                'administrative-state': 'inService',
                'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
                'type': 'org-openroadm-interfaces:otnOdu',
                'supporting-port': '1',
                'supporting-interface': 'XPDR1-NETWORK1-OTU'
            },
            interface)
        self._assert_contains_subset(
            {
                'rate': 'org-openroadm-otn-common-types:ODU4',
                'monitoring-mode': 'terminated'
            },
            odu)
        self.assertDictEqual({'exp-payload-type': '07', 'payload-type': '07'},
                             odu['opu'])

    def test_14_service_path_create_xpdr_check(self):
        interface = self._get_device_config("XPDR-A1", "interface/XPDR1-CLIENT1-ETHERNET")['interface'][0]
        self._assert_contains_subset(
            {
                'name': 'XPDR1-CLIENT1-ETHERNET',
                'administrative-state': 'inService',
                'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
                'type': 'org-openroadm-interfaces:ethernetCsmacd',
                'supporting-port': 'C1'
            },
            interface)
        self.assertDictEqual(
            {'fec': 'off', 'speed': 100000},
            interface['org-openroadm-ethernet-interfaces:ethernet'])

    def test_15_service_path_create_xpdr_check(self):
        res = self._get_device_config("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-NET")
        # Exact comparison (not a substring match), consistent with the
        # post-delete checks in tests 27 and 28.
        self.assertEqual('not-reserved-inuse', res['circuit-packs'][0]["equipment-state"])

    def test_16_service_path_create_xpdr_check(self):
        res = self._get_device_config("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-CLIENT")
        # Exact comparison (not a substring match), consistent with the
        # post-delete checks in tests 27 and 28.
        self.assertEqual('not-reserved-inuse', res['circuit-packs'][0]["equipment-state"])

    # ------------------------------------------------------------------
    # Service path deletion and clean-up checks
    # ------------------------------------------------------------------

    def test_17_service_path_delete(self):
        # NOTE(review): same assumption on the "renderer:nodes" key as in
        # test_05 -- confirm against the service-path RPC model.
        data = {"renderer:input": {
            "renderer:service-name": "service_test",
            "renderer:wave-number": "7",
            "renderer:operation": "delete",
            "renderer:nodes": [
                {"renderer:node-id": "ROADM-A1",
                 "renderer:src-tp": "SRG1-PP3-TXRX",
                 "renderer:dest-tp": "DEG1-TTP-TXRX"},
                {"renderer:node-id": "XPDR-A1",
                 "renderer:src-tp": "XPDR1-CLIENT1",
                 "renderer:dest-tp": "XPDR1-NETWORK1"}]}}
        response = test_utils.post_request(self._SERVICE_PATH_URL, data)
        self.assertEqual(response.status_code, requests.codes.ok)
        self.assertEqual(response.json(), {
            'output': {'result': 'Request processed', 'success': True}})

    def test_18_service_path_delete_rdm_check(self):
        self._assert_data_missing("ROADM-A1", "interface/DEG1-TTP-TXRX-mc-7")

    def test_19_service_path_delete_rdm_check(self):
        self._assert_data_missing("ROADM-A1", "interface/DEG1-TTP-TXRX-nmc-7")

    def test_20_service_path_delete_rdm_check(self):
        self._assert_data_missing("ROADM-A1", "interface/SRG1-PP3-TXRX-mc-7")

    def test_21_service_path_delete_rdm_check(self):
        self._assert_data_missing("ROADM-A1", "interface/SRG1-PP3-TXRX-nmc-7")

    def test_22_service_path_delete_rdm_check(self):
        # The cross-connect checked in test_10 lives under roadm-connections;
        # querying it under "interface/" would return 404 whether or not the
        # delete worked, so the check must target the real resource.
        self._assert_data_missing("ROADM-A1", "roadm-connections/SRG1-PP3-TXRX-DEG1-TTP-TXRX-7")

    def test_23_service_path_delete_xpdr_check(self):
        self._assert_data_missing("XPDR-A1", "interface/XPDR1-NETWORK1-7")

    def test_24_service_path_delete_xpdr_check(self):
        self._assert_data_missing("XPDR-A1", "interface/XPDR1-NETWORK1-OTU")

    def test_25_service_path_delete_xpdr_check(self):
        self._assert_data_missing("XPDR-A1", "interface/XPDR1-NETWORK1-ODU")

    def test_26_service_path_delete_xpdr_check(self):
        self._assert_data_missing("XPDR-A1", "interface/XPDR1-CLIENT1-ETHERNET")

    def test_27_service_path_delete_xpdr_check(self):
        res = self._get_device_config("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-NET")
        self.assertEqual('not-reserved-available', res["circuit-packs"][0]['equipment-state'])

    def test_28_service_path_delete_xpdr_check(self):
        res = self._get_device_config("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-CLIENT")
        self.assertEqual('not-reserved-available', res["circuit-packs"][0]['equipment-state'])

    # ------------------------------------------------------------------
    # Device unmounting
    # ------------------------------------------------------------------

    def test_29_rdm_device_disconnected(self):
        response = test_utils.unmount_device("ROADM-A1")
        self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)

    def test_30_xpdr_device_disconnected(self):
        response = test_utils.unmount_device("XPDR-A1")
        self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
if __name__ == "__main__":
    # Script entry point: report each test by name and keep running after
    # a failure instead of stopping at the first one.
    unittest.main(failfast=False, verbosity=2)