3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
16 #from unittest.result import failfast
18 from common import test_utils
21 class TransportPCERendererTesting(unittest.TestCase):
27 cls.processes = test_utils.start_tpce()
28 cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
31 def tearDownClass(cls):
32 for process in cls.processes:
33 test_utils.shutdown_process(process)
34 print("all processes killed")
36 def test_01_rdm_device_connected(self):
37 response = test_utils.mount_device("ROADM-A1", 'roadma')
38 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
40 def test_02_xpdr_device_connected(self):
41 response = test_utils.mount_device("XPDR-A1", 'xpdra')
42 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
44 def test_03_rdm_portmapping(self):
45 response = test_utils.portmapping_request("ROADM-A1")
46 self.assertEqual(response.status_code, requests.codes.ok)
49 {'supporting-port': 'L1', 'supporting-circuit-pack-name': '1/0',
50 'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional'},
51 res['nodes'][0]['mapping'])
53 {'supporting-port': 'C3', 'supporting-circuit-pack-name': '3/0',
54 'logical-connection-point': 'SRG1-PP3-TXRX', 'port-direction': 'bidirectional'},
55 res['nodes'][0]['mapping'])
57 def test_04_xpdr_portmapping(self):
58 response = test_utils.portmapping_request("XPDR-A1")
59 self.assertEqual(response.status_code, requests.codes.ok)
62 {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
63 'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
64 'logical-connection-point': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-network',
65 'port-direction': 'bidirectional', 'connection-map-lcp': 'XPDR1-CLIENT1',
66 'lcp-hash-val': 'AMkDwQ7xTmRI'},
67 res['nodes'][0]['mapping'])
69 {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
70 'supporting-port': 'C1',
71 'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
72 'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
73 'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
74 'lcp-hash-val': 'AJUUr6I5fALj'},
75 res['nodes'][0]['mapping'])
77 def test_05_service_path_create(self):
78 response = test_utils.service_path_request("create", "service_test", "7",
79 [{"renderer:node-id": "ROADM-A1",
80 "renderer:src-tp": "SRG1-PP3-TXRX",
81 "renderer:dest-tp": "DEG1-TTP-TXRX"},
82 {"renderer:node-id": "XPDR-A1",
83 "renderer:src-tp": "XPDR1-CLIENT1",
84 "renderer:dest-tp": "XPDR1-NETWORK1"}])
85 self.assertEqual(response.status_code, requests.codes.ok)
87 self.assertIn('Roadm-connection successfully created for nodes: ROADM-A1', res["output"]["result"])
89 def test_06_service_path_create_rdm_check(self):
90 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-nmc-7")
91 self.assertEqual(response.status_code, requests.codes.ok)
93 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
96 'name': 'DEG1-TTP-TXRX-nmc-7',
97 'administrative-state': 'inService',
98 'supporting-circuit-pack-name': '1/0',
99 'type': 'org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint',
100 'supporting-port': 'L1'
101 }, **res['interface'][0]),
104 self.assertDictEqual(
105 {u'frequency': 195.8, u'width': 40},
106 res['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'])
108 def test_07_service_path_create_rdm_check(self):
109 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-mc-7")
110 self.assertEqual(response.status_code, requests.codes.ok)
111 res = response.json()
112 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
113 self.assertDictEqual(
115 'name': 'DEG1-TTP-TXRX-mc-7',
116 'administrative-state': 'inService',
117 'supporting-circuit-pack-name': '1/0',
118 'type': 'org-openroadm-interfaces:mediaChannelTrailTerminationPoint',
119 'supporting-port': 'L1'
120 }, **res['interface'][0]),
123 self.assertDictEqual(
124 {u'min-freq': 195.775, u'max-freq': 195.825},
125 res['interface'][0]['org-openroadm-media-channel-interfaces:mc-ttp'])
127 def test_08_service_path_create_rdm_check(self):
128 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-nmc-7")
129 self.assertEqual(response.status_code, requests.codes.ok)
130 res = response.json()
131 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
132 self.assertDictEqual(
134 'name': 'SRG1-PP3-TXRX-nmc-7',
135 'administrative-state': 'inService',
136 'supporting-circuit-pack-name': '3/0',
137 'type': 'org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint',
138 'supporting-port': 'C3'
139 }, **res['interface'][0]),
142 self.assertDictEqual(
143 {u'frequency': 195.8, u'width': 40},
144 res['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'])
146 # -mc supporting interfaces must not be created for SRG, only degrees
147 def test_09_service_path_create_rdm_check(self):
148 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-mc-7")
149 self.assertEqual(response.status_code, requests.codes.conflict)
150 res = response.json()
152 {"error-type": "application", "error-tag": "data-missing",
153 "error-message": "Request could not be completed because the relevant data model content does not exist"},
154 res['errors']['error'])
156 def test_10_service_path_create_rdm_check(self):
157 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP3-TXRX-DEG1-TTP-TXRX-7")
158 self.assertEqual(response.status_code, requests.codes.ok)
159 res = response.json()
160 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
161 self.assertDictEqual(
163 'connection-name': 'SRG1-PP3-TXRX-DEG1-TTP-TXRX-7',
164 'opticalControlMode': 'off'
165 }, **res['roadm-connections'][0]),
166 res['roadm-connections'][0]
168 self.assertDictEqual(
169 {'src-if': 'SRG1-PP3-TXRX-nmc-7'},
170 res['roadm-connections'][0]['source'])
171 self.assertDictEqual(
172 {'dst-if': 'DEG1-TTP-TXRX-nmc-7'},
173 res['roadm-connections'][0]['destination'])
175 def test_11_service_path_create_xpdr_check(self):
176 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-7")
177 self.assertEqual(response.status_code, requests.codes.ok)
178 res = response.json()
179 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
180 self.assertDictEqual(
182 'name': 'XPDR1-NETWORK1-7',
183 'administrative-state': 'inService',
184 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
185 'type': 'org-openroadm-interfaces:opticalChannel',
186 'supporting-port': '1'
187 }, **res['interface'][0]),
190 self.assertDictEqual(
191 {u'rate': u'org-openroadm-common-types:R100G',
192 u'transmit-power': -5,
193 u'modulation-format': 'dp-qpsk',
194 u'frequency': 195.8},
195 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
197 def test_12_service_path_create_xpdr_check(self):
198 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-OTU")
199 self.assertEqual(response.status_code, requests.codes.ok)
200 res = response.json()
201 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
202 self.assertDictEqual(
204 'name': 'XPDR1-NETWORK1-OTU',
205 'administrative-state': 'inService',
206 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
207 'type': 'org-openroadm-interfaces:otnOtu',
208 'supporting-port': '1',
209 'supporting-interface': 'XPDR1-NETWORK1-7'
210 }, **res['interface'][0]),
213 input_dict_2 = {'tx-sapi': 'AMkDwQ7xTmRI',
214 'expected-dapi': 'AMkDwQ7xTmRI',
215 'rate': 'org-openroadm-otn-common-types:OTU4',
217 self.assertDictEqual(input_dict_2,
218 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
220 def test_13_service_path_create_xpdr_check(self):
221 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-ODU")
222 self.assertEqual(response.status_code, requests.codes.ok)
223 res = response.json()
224 # the 2 following statements replace self.assertDictContainsSubset deprecated in python 3.2
225 self.assertDictEqual(
227 'name': 'XPDR1-NETWORK1-ODU',
228 'administrative-state': 'inService',
229 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
230 'type': 'org-openroadm-interfaces:otnOdu',
231 'supporting-port': '1',
232 'supporting-interface': 'XPDR1-NETWORK1-OTU'
233 }, **res['interface'][0]),
236 self.assertDictEqual(
238 'rate': 'org-openroadm-otn-common-types:ODU4',
239 u'monitoring-mode': u'terminated'
240 }, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
241 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
243 self.assertDictEqual({u'exp-payload-type': u'07', u'payload-type': u'07'},
244 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
246 def test_14_service_path_create_xpdr_check(self):
247 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-CLIENT1-ETHERNET")
248 self.assertEqual(response.status_code, requests.codes.ok)
249 res = response.json()
250 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
251 self.assertDictEqual(
253 'name': 'XPDR1-CLIENT1-ETHERNET',
254 'administrative-state': 'inService',
255 'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
256 'type': 'org-openroadm-interfaces:ethernetCsmacd',
257 'supporting-port': 'C1'
258 }, **res['interface'][0]),
261 self.assertDictEqual(
262 {u'fec': u'off', u'speed': 100000},
263 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
265 def test_15_service_path_create_xpdr_check(self):
266 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-NET")
267 self.assertEqual(response.status_code, requests.codes.ok)
268 res = response.json()
269 self.assertIn('not-reserved-inuse', res['circuit-packs'][0]["equipment-state"])
271 def test_16_service_path_create_xpdr_check(self):
272 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-CLIENT")
273 self.assertEqual(response.status_code, requests.codes.ok)
274 res = response.json()
275 self.assertIn('not-reserved-inuse', res['circuit-packs'][0]["equipment-state"])
277 def test_17_service_path_delete(self):
278 response = test_utils.service_path_request("delete", "service_test", "7",
279 [{"renderer:node-id": "ROADM-A1",
280 "renderer:src-tp": "SRG1-PP3-TXRX",
281 "renderer:dest-tp": "DEG1-TTP-TXRX"},
282 {"renderer:node-id": "XPDR-A1",
283 "renderer:src-tp": "XPDR1-CLIENT1",
284 "renderer:dest-tp": "XPDR1-NETWORK1"}])
285 self.assertEqual(response.status_code, requests.codes.ok)
286 self.assertEqual(response.json(), {
287 'output': {'result': 'Request processed', 'success': True}})
289 def test_18_service_path_delete_rdm_check(self):
290 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-mc-7")
291 self.assertEqual(response.status_code, requests.codes.conflict)
292 res = response.json()
294 {"error-type": "application", "error-tag": "data-missing",
295 "error-message": "Request could not be completed because the relevant data model content does not exist"},
296 res['errors']['error'])
298 def test_19_service_path_delete_rdm_check(self):
299 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-nmc-7")
300 self.assertEqual(response.status_code, requests.codes.conflict)
301 res = response.json()
303 {"error-type": "application", "error-tag": "data-missing",
304 "error-message": "Request could not be completed because the relevant data model content does not exist"},
305 res['errors']['error'])
307 def test_20_service_path_delete_rdm_check(self):
308 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-mc-7")
309 self.assertEqual(response.status_code, requests.codes.conflict)
310 res = response.json()
312 "error-type": "application",
313 "error-tag": "data-missing",
314 "error-message": "Request could not be completed because the relevant data model content does not exist"},
315 res['errors']['error'])
317 def test_21_service_path_delete_rdm_check(self):
318 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-nmc-7")
319 self.assertEqual(response.status_code, requests.codes.conflict)
320 res = response.json()
322 "error-type": "application",
323 "error-tag": "data-missing",
324 "error-message": "Request could not be completed because the relevant data model content does not exist"},
325 res['errors']['error'])
327 def test_22_service_path_delete_rdm_check(self):
328 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-DEG1-TTP-TXRX-7")
329 self.assertEqual(response.status_code, requests.codes.conflict)
330 res = response.json()
332 "error-type": "application",
333 "error-tag": "data-missing",
334 "error-message": "Request could not be completed because the relevant data model content does not exist"},
335 res['errors']['error'])
337 def test_23_service_path_delete_xpdr_check(self):
338 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-7")
339 self.assertEqual(response.status_code, requests.codes.conflict)
340 res = response.json()
342 "error-type": "application",
343 "error-tag": "data-missing",
344 "error-message": "Request could not be completed because the relevant data model content does not exist"},
345 res['errors']['error'])
347 def test_24_service_path_delete_xpdr_check(self):
348 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-OTU")
349 self.assertEqual(response.status_code, requests.codes.conflict)
350 res = response.json()
352 "error-type": "application",
353 "error-tag": "data-missing",
354 "error-message": "Request could not be completed because the relevant data model content does not exist"},
355 res['errors']['error'])
357 def test_25_service_path_delete_xpdr_check(self):
358 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-ODU")
359 self.assertEqual(response.status_code, requests.codes.conflict)
360 res = response.json()
362 "error-type": "application",
363 "error-tag": "data-missing",
364 "error-message": "Request could not be completed because the relevant data model content does not exist"},
365 res['errors']['error'])
367 def test_26_service_path_delete_xpdr_check(self):
368 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-CLIENT1-ETHERNET")
369 self.assertEqual(response.status_code, requests.codes.conflict)
370 res = response.json()
372 "error-type": "application",
373 "error-tag": "data-missing",
374 "error-message": "Request could not be completed because the relevant data model content does not exist"},
375 res['errors']['error'])
377 def test_27_service_path_delete_xpdr_check(self):
378 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-NET")
379 self.assertEqual(response.status_code, requests.codes.ok)
380 res = response.json()
381 self.assertEqual('not-reserved-available', res["circuit-packs"][0]['equipment-state'])
383 def test_28_service_path_delete_xpdr_check(self):
384 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-CLIENT")
385 self.assertEqual(response.status_code, requests.codes.ok)
386 res = response.json()
387 self.assertEqual('not-reserved-available', res["circuit-packs"][0]['equipment-state'])
389 def test_29_rdm_device_disconnected(self):
390 response = test_utils.unmount_device("ROADM-A1")
391 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
393 def test_30_xpdr_device_disconnected(self):
394 response = test_utils.unmount_device("XPDR-A1")
395 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
398 if __name__ == "__main__":
399 unittest.main(verbosity=2, failfast=False)