3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
17 #from unittest.result import failfast
19 from common import test_utils
22 class TransportPCERendererTesting(unittest.TestCase):
28 cls.processes = test_utils.start_tpce()
29 cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
32 def tearDownClass(cls):
33 for process in cls.processes:
34 test_utils.shutdown_process(process)
35 print("all processes killed")
37 def test_01_rdm_device_connected(self):
38 response = test_utils.mount_device("ROADM-A1", 'roadma')
39 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
41 def test_02_xpdr_device_connected(self):
42 response = test_utils.mount_device("XPDR-A1", 'xpdra')
43 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
45 def test_03_rdm_portmapping(self):
46 response = test_utils.portmapping_request("ROADM-A1")
47 self.assertEqual(response.status_code, requests.codes.ok)
50 {'supporting-port': 'L1', 'supporting-circuit-pack-name': '1/0',
51 'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional'},
52 res['nodes'][0]['mapping'])
54 {'supporting-port': 'C3', 'supporting-circuit-pack-name': '3/0',
55 'logical-connection-point': 'SRG1-PP3-TXRX', 'port-direction': 'bidirectional'},
56 res['nodes'][0]['mapping'])
58 def test_04_xpdr_portmapping(self):
59 response = test_utils.portmapping_request("XPDR-A1")
60 self.assertEqual(response.status_code, requests.codes.ok)
63 {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
64 'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
65 'logical-connection-point': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-network',
66 'port-direction': 'bidirectional', 'connection-map-lcp': 'XPDR1-CLIENT1',
67 'lcp-hash-val': 'AMkDwQ7xTmRI'},
68 res['nodes'][0]['mapping'])
70 {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
71 'supporting-port': 'C1',
72 'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
73 'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
74 'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
75 'lcp-hash-val': 'AJUUr6I5fALj'},
76 res['nodes'][0]['mapping'])
78 def test_05_service_path_create(self):
79 response = test_utils.service_path_request("create", "service_test", "7",
80 [{"renderer:node-id": "ROADM-A1",
81 "renderer:src-tp": "SRG1-PP3-TXRX", "renderer:dest-tp": "DEG1-TTP-TXRX"},
82 {"renderer:node-id": "XPDR-A1",
83 "renderer:src-tp": "XPDR1-CLIENT1", "renderer:dest-tp": "XPDR1-NETWORK1"}])
84 self.assertEqual(response.status_code, requests.codes.ok)
86 self.assertIn('Roadm-connection successfully created for nodes: ROADM-A1', res["output"]["result"])
88 def test_06_service_path_create_rdm_check(self):
89 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-nmc-7")
90 self.assertEqual(response.status_code, requests.codes.ok)
92 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
95 'name': 'DEG1-TTP-TXRX-nmc-7',
96 'administrative-state': 'inService',
97 'supporting-circuit-pack-name': '1/0',
98 'type': 'org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint',
99 'supporting-port': 'L1'
100 }, **res['interface'][0]),
103 self.assertDictEqual(
104 {u'frequency': 195.8, u'width': 40},
105 res['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'])
107 def test_07_service_path_create_rdm_check(self):
108 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-mc-7")
109 self.assertEqual(response.status_code, requests.codes.ok)
110 res = response.json()
111 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
112 self.assertDictEqual(
114 'name': 'DEG1-TTP-TXRX-mc-7',
115 'administrative-state': 'inService',
116 'supporting-circuit-pack-name': '1/0',
117 'type': 'org-openroadm-interfaces:mediaChannelTrailTerminationPoint',
118 'supporting-port': 'L1'
119 }, **res['interface'][0]),
122 self.assertDictEqual(
123 {u'min-freq': 195.775, u'max-freq': 195.825},
124 res['interface'][0]['org-openroadm-media-channel-interfaces:mc-ttp'])
126 def test_08_service_path_create_rdm_check(self):
127 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-nmc-7")
128 self.assertEqual(response.status_code, requests.codes.ok)
129 res = response.json()
130 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
131 self.assertDictEqual(
133 'name': 'SRG1-PP3-TXRX-nmc-7',
134 'administrative-state': 'inService',
135 'supporting-circuit-pack-name': '3/0',
136 'type': 'org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint',
137 'supporting-port': 'C3'
138 }, **res['interface'][0]),
141 self.assertDictEqual(
142 {u'frequency': 195.8, u'width': 40},
143 res['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'])
145 # -mc supporting interfaces must not be created for SRG, only degrees
146 def test_09_service_path_create_rdm_check(self):
147 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-mc-7")
148 self.assertEqual(response.status_code, requests.codes.conflict)
149 res = response.json()
151 {"error-type": "application", "error-tag": "data-missing",
152 "error-message": "Request could not be completed because the relevant data model content does not exist"},
153 res['errors']['error'])
155 def test_10_service_path_create_rdm_check(self):
156 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP3-TXRX-DEG1-TTP-TXRX-7")
157 self.assertEqual(response.status_code, requests.codes.ok)
158 res = response.json()
159 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
160 self.assertDictEqual(
162 'connection-name': 'SRG1-PP3-TXRX-DEG1-TTP-TXRX-7',
163 'opticalControlMode': 'off'
164 }, **res['roadm-connections'][0]),
165 res['roadm-connections'][0]
167 self.assertDictEqual(
168 {'src-if': 'SRG1-PP3-TXRX-nmc-7'},
169 res['roadm-connections'][0]['source'])
170 self.assertDictEqual(
171 {'dst-if': 'DEG1-TTP-TXRX-nmc-7'},
172 res['roadm-connections'][0]['destination'])
174 def test_11_service_path_create_xpdr_check(self):
175 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-7")
176 self.assertEqual(response.status_code, requests.codes.ok)
177 res = response.json()
178 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
179 self.assertDictEqual(
181 'name': 'XPDR1-NETWORK1-7',
182 'administrative-state': 'inService',
183 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
184 'type': 'org-openroadm-interfaces:opticalChannel',
185 'supporting-port': '1'
186 }, **res['interface'][0]),
189 self.assertDictEqual(
190 {u'rate': u'org-openroadm-common-types:R100G',
191 u'transmit-power': -5,
192 u'modulation-format': 'dp-qpsk',
193 u'frequency': 195.8},
194 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
196 def test_12_service_path_create_xpdr_check(self):
197 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-OTU")
198 self.assertEqual(response.status_code, requests.codes.ok)
199 res = response.json()
200 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
201 self.assertDictEqual(
203 'name': 'XPDR1-NETWORK1-OTU',
204 'administrative-state': 'inService',
205 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
206 'type': 'org-openroadm-interfaces:otnOtu',
207 'supporting-port': '1',
208 'supporting-interface': 'XPDR1-NETWORK1-7'
209 }, **res['interface'][0]),
212 input_dict_2 = {'tx-sapi': 'AMkDwQ7xTmRI',
213 'expected-dapi': 'AMkDwQ7xTmRI',
214 'rate': 'org-openroadm-otn-common-types:OTU4',
216 self.assertDictEqual(input_dict_2,
217 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
219 def test_13_service_path_create_xpdr_check(self):
220 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-ODU")
221 self.assertEqual(response.status_code, requests.codes.ok)
222 res = response.json()
223 # the 2 following statements replace self.assertDictContainsSubset deprecated in python 3.2
224 self.assertDictEqual(
226 'name': 'XPDR1-NETWORK1-ODU',
227 'administrative-state': 'inService',
228 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
229 'type': 'org-openroadm-interfaces:otnOdu',
230 'supporting-port': '1',
231 'supporting-interface': 'XPDR1-NETWORK1-OTU'
232 }, **res['interface'][0]),
235 self.assertDictEqual(
237 'rate': 'org-openroadm-otn-common-types:ODU4',
238 u'monitoring-mode': u'terminated'
239 }, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
240 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
242 self.assertDictEqual({u'exp-payload-type': u'07', u'payload-type': u'07'},
243 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
245 def test_14_service_path_create_xpdr_check(self):
246 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-CLIENT1-ETHERNET")
247 self.assertEqual(response.status_code, requests.codes.ok)
248 res = response.json()
249 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
250 self.assertDictEqual(
252 'name': 'XPDR1-CLIENT1-ETHERNET',
253 'administrative-state': 'inService',
254 'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
255 'type': 'org-openroadm-interfaces:ethernetCsmacd',
256 'supporting-port': 'C1'
257 }, **res['interface'][0]),
260 self.assertDictEqual(
261 {u'fec': u'off', u'speed': 100000},
262 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
264 def test_15_service_path_create_xpdr_check(self):
265 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-NET")
266 self.assertEqual(response.status_code, requests.codes.ok)
267 res = response.json()
268 self.assertIn('not-reserved-inuse', res['circuit-packs'][0]["equipment-state"])
270 def test_16_service_path_create_xpdr_check(self):
271 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-CLIENT")
272 self.assertEqual(response.status_code, requests.codes.ok)
273 res = response.json()
274 self.assertIn('not-reserved-inuse', res['circuit-packs'][0]["equipment-state"])
276 def test_17_service_path_delete(self):
277 response = test_utils.service_path_request("delete", "service_test", "7",
278 [{"renderer:node-id": "ROADM-A1",
279 "renderer:src-tp": "SRG1-PP3-TXRX", "renderer:dest-tp": "DEG1-TTP-TXRX"},
280 {"renderer:node-id": "XPDR-A1",
281 "renderer:src-tp": "XPDR1-CLIENT1", "renderer:dest-tp": "XPDR1-NETWORK1"}])
282 self.assertEqual(response.status_code, requests.codes.ok)
283 self.assertEqual(response.json(), {
284 'output': {'result': 'Request processed', 'success': True}})
286 def test_18_service_path_delete_rdm_check(self):
287 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-mc-7")
288 self.assertEqual(response.status_code, requests.codes.conflict)
289 res = response.json()
291 {"error-type": "application", "error-tag": "data-missing",
292 "error-message": "Request could not be completed because the relevant data model content does not exist"},
293 res['errors']['error'])
295 def test_19_service_path_delete_rdm_check(self):
296 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-nmc-7")
297 self.assertEqual(response.status_code, requests.codes.conflict)
298 res = response.json()
300 {"error-type": "application", "error-tag": "data-missing",
301 "error-message": "Request could not be completed because the relevant data model content does not exist"},
302 res['errors']['error'])
304 def test_20_service_path_delete_rdm_check(self):
305 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-mc-7")
306 self.assertEqual(response.status_code, requests.codes.conflict)
307 res = response.json()
309 "error-type": "application",
310 "error-tag": "data-missing",
311 "error-message": "Request could not be completed because the relevant data model content does not exist"},
312 res['errors']['error'])
314 def test_21_service_path_delete_rdm_check(self):
315 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-nmc-7")
316 self.assertEqual(response.status_code, requests.codes.conflict)
317 res = response.json()
319 "error-type": "application",
320 "error-tag": "data-missing",
321 "error-message": "Request could not be completed because the relevant data model content does not exist"},
322 res['errors']['error'])
324 def test_22_service_path_delete_rdm_check(self):
325 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-DEG1-TTP-TXRX-7")
326 self.assertEqual(response.status_code, requests.codes.conflict)
327 res = response.json()
329 "error-type": "application",
330 "error-tag": "data-missing",
331 "error-message": "Request could not be completed because the relevant data model content does not exist"},
332 res['errors']['error'])
334 def test_23_service_path_delete_xpdr_check(self):
335 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-7")
336 self.assertEqual(response.status_code, requests.codes.conflict)
337 res = response.json()
339 "error-type": "application",
340 "error-tag": "data-missing",
341 "error-message": "Request could not be completed because the relevant data model content does not exist"},
342 res['errors']['error'])
344 def test_24_service_path_delete_xpdr_check(self):
345 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-OTU")
346 self.assertEqual(response.status_code, requests.codes.conflict)
347 res = response.json()
349 "error-type": "application",
350 "error-tag": "data-missing",
351 "error-message": "Request could not be completed because the relevant data model content does not exist"},
352 res['errors']['error'])
354 def test_25_service_path_delete_xpdr_check(self):
355 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-ODU")
356 self.assertEqual(response.status_code, requests.codes.conflict)
357 res = response.json()
359 "error-type": "application",
360 "error-tag": "data-missing",
361 "error-message": "Request could not be completed because the relevant data model content does not exist"},
362 res['errors']['error'])
364 def test_26_service_path_delete_xpdr_check(self):
365 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-CLIENT1-ETHERNET")
366 self.assertEqual(response.status_code, requests.codes.conflict)
367 res = response.json()
369 "error-type": "application",
370 "error-tag": "data-missing",
371 "error-message": "Request could not be completed because the relevant data model content does not exist"},
372 res['errors']['error'])
374 def test_27_service_path_delete_xpdr_check(self):
375 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-NET")
376 self.assertEqual(response.status_code, requests.codes.ok)
377 res = response.json()
378 self.assertEqual('not-reserved-available', res["circuit-packs"][0]['equipment-state'])
380 def test_28_service_path_delete_xpdr_check(self):
381 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-CLIENT")
382 self.assertEqual(response.status_code, requests.codes.ok)
383 res = response.json()
384 self.assertEqual('not-reserved-available', res["circuit-packs"][0]['equipment-state'])
386 def test_29_rdm_device_disconnected(self):
387 response = test_utils.unmount_device("ROADM-A1")
388 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
390 def test_30_xpdr_device_disconnected(self):
391 response = test_utils.unmount_device("XPDR-A1")
392 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
395 if __name__ == "__main__":
396 unittest.main(verbosity=2, failfast=False)