3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
16 # from unittest.result import failfast
18 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils # nopep8
26 class TransportPCERendererTesting(unittest.TestCase):
29 NODE_VERSION = '2.2.1'
33 cls.processes = test_utils.start_tpce()
34 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])
37 def tearDownClass(cls):
38 # pylint: disable=not-an-iterable
39 for process in cls.processes:
40 test_utils.shutdown_process(process)
41 print("all processes killed")
43 def test_01_rdm_device_connected(self):
44 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
45 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
47 def test_02_xpdr_device_connected(self):
48 response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
49 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
51 def test_03_rdm_portmapping(self):
52 response = test_utils.portmapping_request("ROADM-A1")
53 self.assertEqual(response.status_code, requests.codes.ok)
56 {'supporting-port': 'L1', 'supporting-circuit-pack-name': '1/0',
57 'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional',
58 'port-admin-state': 'InService', 'port-oper-state': 'InService'},
59 res['nodes'][0]['mapping'])
61 {'supporting-port': 'C3', 'supporting-circuit-pack-name': '3/0',
62 'logical-connection-point': 'SRG1-PP3-TXRX', 'port-direction': 'bidirectional',
63 'port-admin-state': 'InService', 'port-oper-state': 'InService'},
64 res['nodes'][0]['mapping'])
66 def test_04_xpdr_portmapping(self):
67 response = test_utils.portmapping_request("XPDR-A1")
68 self.assertEqual(response.status_code, requests.codes.ok)
71 {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
72 'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
73 'logical-connection-point': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-network',
74 'port-direction': 'bidirectional', 'connection-map-lcp': 'XPDR1-CLIENT1',
75 'lcp-hash-val': 'AMkDwQ7xTmRI', 'xponder-type': 'tpdr',
76 'port-admin-state': 'InService', 'port-oper-state': 'InService'},
77 res['nodes'][0]['mapping'])
79 {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
80 'supporting-port': 'C1',
81 'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
82 'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
83 'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
84 'lcp-hash-val': 'AJUUr6I5fALj', 'xponder-type': 'tpdr',
85 'port-admin-state': 'InService', 'port-oper-state': 'InService'},
86 res['nodes'][0]['mapping'])
88 def test_05_service_path_create(self):
89 response = test_utils.service_path_request("create", "service_test", "7",
90 [{"renderer:node-id": "ROADM-A1",
91 "renderer:src-tp": "SRG1-PP3-TXRX",
92 "renderer:dest-tp": "DEG1-TTP-TXRX"},
93 {"renderer:node-id": "XPDR-A1",
94 "renderer:src-tp": "XPDR1-CLIENT1",
95 "renderer:dest-tp": "XPDR1-NETWORK1"}],
96 195.8, 40, 195.775, 195.825, 713,
98 self.assertEqual(response.status_code, requests.codes.ok)
100 self.assertIn('Roadm-connection successfully created for nodes: ROADM-A1', res["output"]["result"])
102 def test_06_service_path_create_rdm_check(self):
103 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-nmc-713:720")
104 self.assertEqual(response.status_code, requests.codes.ok)
105 res = response.json()
106 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
107 self.assertDictEqual(
109 'name': 'DEG1-TTP-TXRX-nmc-713:720',
110 'administrative-state': 'inService',
111 'supporting-circuit-pack-name': '1/0',
112 'type': 'org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint',
113 'supporting-port': 'L1'
114 }, **res['interface'][0]),
117 self.assertDictEqual(
118 {'frequency': 195.8, 'width': 40},
119 res['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'])
121 def test_07_service_path_create_rdm_check(self):
122 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-mc-713:720")
123 self.assertEqual(response.status_code, requests.codes.ok)
124 res = response.json()
125 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
126 self.assertDictEqual(
128 'name': 'DEG1-TTP-TXRX-mc-7',
129 'administrative-state': 'inService',
130 'supporting-circuit-pack-name': '1/0',
131 'type': 'org-openroadm-interfaces:mediaChannelTrailTerminationPoint',
132 'supporting-port': 'L1'
133 }, **res['interface'][0]),
136 self.assertDictEqual(
137 {'min-freq': 195.775, 'max-freq': 195.825},
138 res['interface'][0]['org-openroadm-media-channel-interfaces:mc-ttp'])
140 def test_08_service_path_create_rdm_check(self):
141 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-nmc-713:720")
142 self.assertEqual(response.status_code, requests.codes.ok)
143 res = response.json()
144 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
145 self.assertDictEqual(
147 'name': 'SRG1-PP3-TXRX-nmc-713:720',
148 'administrative-state': 'inService',
149 'supporting-circuit-pack-name': '3/0',
150 'type': 'org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint',
151 'supporting-port': 'C3'
152 }, **res['interface'][0]),
155 self.assertDictEqual(
156 {'frequency': 195.8, 'width': 40},
157 res['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'])
159 # -mc supporting interfaces must not be created for SRG, only degrees
160 def test_09_service_path_create_rdm_check(self):
161 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-mc-713:720")
162 self.assertEqual(response.status_code, requests.codes.conflict)
163 res = response.json()
165 {"error-type": "application", "error-tag": "data-missing",
166 "error-message": "Request could not be completed because the relevant data model content does not exist"},
167 res['errors']['error'])
169 def test_10_service_path_create_rdm_check(self):
170 response = test_utils.check_netconf_node_request(
171 "ROADM-A1", "roadm-connections/SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720")
172 self.assertEqual(response.status_code, requests.codes.ok)
173 res = response.json()
174 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
175 self.assertDictEqual(
177 'connection-name': 'SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720',
178 'opticalControlMode': 'off'
179 }, **res['roadm-connections'][0]),
180 res['roadm-connections'][0]
182 self.assertDictEqual(
183 {'src-if': 'SRG1-PP3-TXRX-nmc-713:720'},
184 res['roadm-connections'][0]['source'])
185 self.assertDictEqual(
186 {'dst-if': 'DEG1-TTP-TXRX-nmc-713:720'},
187 res['roadm-connections'][0]['destination'])
189 def test_11_service_path_create_xpdr_check(self):
190 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-713:720")
191 self.assertEqual(response.status_code, requests.codes.ok)
192 res = response.json()
193 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
194 self.assertDictEqual(
196 'name': 'XPDR1-NETWORK1-713:720',
197 'administrative-state': 'inService',
198 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
199 'type': 'org-openroadm-interfaces:opticalChannel',
200 'supporting-port': '1'
201 }, **res['interface'][0]),
204 self.assertDictEqual(
205 {'rate': 'org-openroadm-common-types:R100G',
206 'transmit-power': -5,
207 'modulation-format': 'dp-qpsk',
209 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
211 def test_12_service_path_create_xpdr_check(self):
212 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-OTU")
213 self.assertEqual(response.status_code, requests.codes.ok)
214 res = response.json()
215 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
216 self.assertDictEqual(
218 'name': 'XPDR1-NETWORK1-OTU',
219 'administrative-state': 'inService',
220 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
221 'type': 'org-openroadm-interfaces:otnOtu',
222 'supporting-port': '1',
223 'supporting-interface': 'XPDR1-NETWORK1-7'
224 }, **res['interface'][0]),
227 input_dict_2 = {'tx-sapi': 'AMkDwQ7xTmRI',
228 'expected-dapi': 'AMkDwQ7xTmRI',
229 'rate': 'org-openroadm-otn-common-types:OTU4',
231 self.assertDictEqual(input_dict_2,
232 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
234 def test_13_service_path_create_xpdr_check(self):
235 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-ODU")
236 self.assertEqual(response.status_code, requests.codes.ok)
237 res = response.json()
238 # the 2 following statements replace self.assertDictContainsSubset deprecated in python 3.2
239 self.assertDictEqual(
241 'name': 'XPDR1-NETWORK1-ODU',
242 'administrative-state': 'inService',
243 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
244 'type': 'org-openroadm-interfaces:otnOdu',
245 'supporting-port': '1',
246 'supporting-interface': 'XPDR1-NETWORK1-OTU'
247 }, **res['interface'][0]),
250 self.assertDictEqual(
252 'rate': 'org-openroadm-otn-common-types:ODU4',
253 'monitoring-mode': 'terminated'
254 }, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
255 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
257 self.assertDictEqual({'exp-payload-type': '07', 'payload-type': '07'},
258 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
260 def test_14_service_path_create_xpdr_check(self):
261 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-CLIENT1-ETHERNET")
262 self.assertEqual(response.status_code, requests.codes.ok)
263 res = response.json()
264 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
265 self.assertDictEqual(
267 'name': 'XPDR1-CLIENT1-ETHERNET',
268 'administrative-state': 'inService',
269 'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
270 'type': 'org-openroadm-interfaces:ethernetCsmacd',
271 'supporting-port': 'C1'
272 }, **res['interface'][0]),
275 self.assertDictEqual(
276 {'fec': 'off', 'speed': 100000},
277 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
279 def test_15_service_path_create_xpdr_check(self):
280 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-NET")
281 self.assertEqual(response.status_code, requests.codes.ok)
282 res = response.json()
283 self.assertIn('not-reserved-inuse', res['circuit-packs'][0]["equipment-state"])
285 def test_16_service_path_create_xpdr_check(self):
286 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-CLIENT")
287 self.assertEqual(response.status_code, requests.codes.ok)
288 res = response.json()
289 self.assertIn('not-reserved-inuse', res['circuit-packs'][0]["equipment-state"])
291 def test_17_service_path_delete(self):
292 response = test_utils.service_path_request("delete", "service_test", "7",
293 [{"renderer:node-id": "ROADM-A1",
294 "renderer:src-tp": "SRG1-PP3-TXRX",
295 "renderer:dest-tp": "DEG1-TTP-TXRX"},
296 {"renderer:node-id": "XPDR-A1",
297 "renderer:src-tp": "XPDR1-CLIENT1",
298 "renderer:dest-tp": "XPDR1-NETWORK1"}],
299 195.8, 40, 195.775, 195.825, 713,
301 self.assertEqual(response.status_code, requests.codes.ok)
302 self.assertEqual(response.json(), {
303 'output': {'result': 'Request processed', 'success': True}})
305 def test_18_service_path_delete_rdm_check(self):
306 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-mc-713:720")
307 self.assertEqual(response.status_code, requests.codes.conflict)
308 res = response.json()
310 {"error-type": "application", "error-tag": "data-missing",
311 "error-message": "Request could not be completed because the relevant data model content does not exist"},
312 res['errors']['error'])
314 def test_19_service_path_delete_rdm_check(self):
315 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-nmc-713:720")
316 self.assertEqual(response.status_code, requests.codes.conflict)
317 res = response.json()
319 {"error-type": "application", "error-tag": "data-missing",
320 "error-message": "Request could not be completed because the relevant data model content does not exist"},
321 res['errors']['error'])
323 def test_20_service_path_delete_rdm_check(self):
324 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-mc-713:720")
325 self.assertEqual(response.status_code, requests.codes.conflict)
326 res = response.json()
328 "error-type": "application",
329 "error-tag": "data-missing",
330 "error-message": "Request could not be completed because the relevant data model content does not exist"},
331 res['errors']['error'])
333 def test_21_service_path_delete_rdm_check(self):
334 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-nmc-713:720")
335 self.assertEqual(response.status_code, requests.codes.conflict)
336 res = response.json()
338 "error-type": "application",
339 "error-tag": "data-missing",
340 "error-message": "Request could not be completed because the relevant data model content does not exist"},
341 res['errors']['error'])
343 def test_22_service_path_delete_rdm_check(self):
344 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720")
345 self.assertEqual(response.status_code, requests.codes.conflict)
346 res = response.json()
348 "error-type": "application",
349 "error-tag": "data-missing",
350 "error-message": "Request could not be completed because the relevant data model content does not exist"},
351 res['errors']['error'])
353 def test_23_service_path_delete_xpdr_check(self):
354 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-713:720")
355 self.assertEqual(response.status_code, requests.codes.conflict)
356 res = response.json()
358 "error-type": "application",
359 "error-tag": "data-missing",
360 "error-message": "Request could not be completed because the relevant data model content does not exist"},
361 res['errors']['error'])
363 def test_24_service_path_delete_xpdr_check(self):
364 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-OTU")
365 self.assertEqual(response.status_code, requests.codes.conflict)
366 res = response.json()
368 "error-type": "application",
369 "error-tag": "data-missing",
370 "error-message": "Request could not be completed because the relevant data model content does not exist"},
371 res['errors']['error'])
373 def test_25_service_path_delete_xpdr_check(self):
374 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-ODU")
375 self.assertEqual(response.status_code, requests.codes.conflict)
376 res = response.json()
378 "error-type": "application",
379 "error-tag": "data-missing",
380 "error-message": "Request could not be completed because the relevant data model content does not exist"},
381 res['errors']['error'])
383 def test_26_service_path_delete_xpdr_check(self):
384 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-CLIENT1-ETHERNET")
385 self.assertEqual(response.status_code, requests.codes.conflict)
386 res = response.json()
388 "error-type": "application",
389 "error-tag": "data-missing",
390 "error-message": "Request could not be completed because the relevant data model content does not exist"},
391 res['errors']['error'])
393 def test_27_service_path_delete_xpdr_check(self):
394 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-NET")
395 self.assertEqual(response.status_code, requests.codes.ok)
396 res = response.json()
397 self.assertEqual('not-reserved-available', res["circuit-packs"][0]['equipment-state'])
399 def test_28_service_path_delete_xpdr_check(self):
400 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-CLIENT")
401 self.assertEqual(response.status_code, requests.codes.ok)
402 res = response.json()
403 self.assertEqual('not-reserved-available', res["circuit-packs"][0]['equipment-state'])
405 def test_29_rdm_device_disconnected(self):
406 response = test_utils.unmount_device("ROADM-A1")
407 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
409 def test_30_xpdr_device_disconnected(self):
410 response = test_utils.unmount_device("XPDR-A1")
411 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
414 if __name__ == "__main__":
415 unittest.main(verbosity=2, failfast=False)