3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
16 #from unittest.result import failfast
18 from common import test_utils
21 class TransportPCERendererTesting(unittest.TestCase):
27 cls.processes = test_utils.start_tpce()
28 cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
31 def tearDownClass(cls):
32 # pylint: disable=not-an-iterable
33 for process in cls.processes:
34 test_utils.shutdown_process(process)
35 print("all processes killed")
37 def test_01_rdm_device_connected(self):
38 response = test_utils.mount_device("ROADM-A1", 'roadma')
39 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
41 def test_02_xpdr_device_connected(self):
42 response = test_utils.mount_device("XPDR-A1", 'xpdra')
43 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
45 def test_03_rdm_portmapping(self):
46 response = test_utils.portmapping_request("ROADM-A1")
47 self.assertEqual(response.status_code, requests.codes.ok)
50 {'supporting-port': 'L1', 'supporting-circuit-pack-name': '1/0',
51 'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional'},
52 res['nodes'][0]['mapping'])
54 {'supporting-port': 'C3', 'supporting-circuit-pack-name': '3/0',
55 'logical-connection-point': 'SRG1-PP3-TXRX', 'port-direction': 'bidirectional'},
56 res['nodes'][0]['mapping'])
58 def test_04_xpdr_portmapping(self):
59 response = test_utils.portmapping_request("XPDR-A1")
60 self.assertEqual(response.status_code, requests.codes.ok)
63 {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
64 'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
65 'logical-connection-point': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-network',
66 'port-direction': 'bidirectional', 'connection-map-lcp': 'XPDR1-CLIENT1',
67 'lcp-hash-val': 'AMkDwQ7xTmRI'},
68 res['nodes'][0]['mapping'])
70 {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
71 'supporting-port': 'C1',
72 'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
73 'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
74 'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
75 'lcp-hash-val': 'AJUUr6I5fALj'},
76 res['nodes'][0]['mapping'])
78 def test_05_service_path_create(self):
79 response = test_utils.service_path_request("create", "service_test", "7",
80 [{"renderer:node-id": "ROADM-A1",
81 "renderer:src-tp": "SRG1-PP3-TXRX",
82 "renderer:dest-tp": "DEG1-TTP-TXRX"},
83 {"renderer:node-id": "XPDR-A1",
84 "renderer:src-tp": "XPDR1-CLIENT1",
85 "renderer:dest-tp": "XPDR1-NETWORK1"}])
86 self.assertEqual(response.status_code, requests.codes.ok)
88 self.assertIn('Roadm-connection successfully created for nodes: ROADM-A1', res["output"]["result"])
90 def test_06_service_path_create_rdm_check(self):
91 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-nmc-7")
92 self.assertEqual(response.status_code, requests.codes.ok)
94 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
97 'name': 'DEG1-TTP-TXRX-nmc-7',
98 'administrative-state': 'inService',
99 'supporting-circuit-pack-name': '1/0',
100 'type': 'org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint',
101 'supporting-port': 'L1'
102 }, **res['interface'][0]),
105 self.assertDictEqual(
106 {u'frequency': 195.8, u'width': 40},
107 res['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'])
109 def test_07_service_path_create_rdm_check(self):
110 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-mc-7")
111 self.assertEqual(response.status_code, requests.codes.ok)
112 res = response.json()
113 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
114 self.assertDictEqual(
116 'name': 'DEG1-TTP-TXRX-mc-7',
117 'administrative-state': 'inService',
118 'supporting-circuit-pack-name': '1/0',
119 'type': 'org-openroadm-interfaces:mediaChannelTrailTerminationPoint',
120 'supporting-port': 'L1'
121 }, **res['interface'][0]),
124 self.assertDictEqual(
125 {u'min-freq': 195.775, u'max-freq': 195.825},
126 res['interface'][0]['org-openroadm-media-channel-interfaces:mc-ttp'])
128 def test_08_service_path_create_rdm_check(self):
129 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-nmc-7")
130 self.assertEqual(response.status_code, requests.codes.ok)
131 res = response.json()
132 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
133 self.assertDictEqual(
135 'name': 'SRG1-PP3-TXRX-nmc-7',
136 'administrative-state': 'inService',
137 'supporting-circuit-pack-name': '3/0',
138 'type': 'org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint',
139 'supporting-port': 'C3'
140 }, **res['interface'][0]),
143 self.assertDictEqual(
144 {u'frequency': 195.8, u'width': 40},
145 res['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'])
147 # -mc supporting interfaces must not be created for SRG, only degrees
148 def test_09_service_path_create_rdm_check(self):
149 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-mc-7")
150 self.assertEqual(response.status_code, requests.codes.conflict)
151 res = response.json()
153 {"error-type": "application", "error-tag": "data-missing",
154 "error-message": "Request could not be completed because the relevant data model content does not exist"},
155 res['errors']['error'])
157 def test_10_service_path_create_rdm_check(self):
158 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP3-TXRX-DEG1-TTP-TXRX-7")
159 self.assertEqual(response.status_code, requests.codes.ok)
160 res = response.json()
161 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
162 self.assertDictEqual(
164 'connection-name': 'SRG1-PP3-TXRX-DEG1-TTP-TXRX-7',
165 'opticalControlMode': 'off'
166 }, **res['roadm-connections'][0]),
167 res['roadm-connections'][0]
169 self.assertDictEqual(
170 {'src-if': 'SRG1-PP3-TXRX-nmc-7'},
171 res['roadm-connections'][0]['source'])
172 self.assertDictEqual(
173 {'dst-if': 'DEG1-TTP-TXRX-nmc-7'},
174 res['roadm-connections'][0]['destination'])
176 def test_11_service_path_create_xpdr_check(self):
177 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-7")
178 self.assertEqual(response.status_code, requests.codes.ok)
179 res = response.json()
180 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
181 self.assertDictEqual(
183 'name': 'XPDR1-NETWORK1-7',
184 'administrative-state': 'inService',
185 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
186 'type': 'org-openroadm-interfaces:opticalChannel',
187 'supporting-port': '1'
188 }, **res['interface'][0]),
191 self.assertDictEqual(
192 {u'rate': u'org-openroadm-common-types:R100G',
193 u'transmit-power': -5,
194 u'modulation-format': 'dp-qpsk',
195 u'frequency': 195.8},
196 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
198 def test_12_service_path_create_xpdr_check(self):
199 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-OTU")
200 self.assertEqual(response.status_code, requests.codes.ok)
201 res = response.json()
202 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
203 self.assertDictEqual(
205 'name': 'XPDR1-NETWORK1-OTU',
206 'administrative-state': 'inService',
207 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
208 'type': 'org-openroadm-interfaces:otnOtu',
209 'supporting-port': '1',
210 'supporting-interface': 'XPDR1-NETWORK1-7'
211 }, **res['interface'][0]),
214 input_dict_2 = {'tx-sapi': 'AMkDwQ7xTmRI',
215 'expected-dapi': 'AMkDwQ7xTmRI',
216 'rate': 'org-openroadm-otn-common-types:OTU4',
218 self.assertDictEqual(input_dict_2,
219 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
221 def test_13_service_path_create_xpdr_check(self):
222 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-ODU")
223 self.assertEqual(response.status_code, requests.codes.ok)
224 res = response.json()
225 # the 2 following statements replace self.assertDictContainsSubset deprecated in python 3.2
226 self.assertDictEqual(
228 'name': 'XPDR1-NETWORK1-ODU',
229 'administrative-state': 'inService',
230 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
231 'type': 'org-openroadm-interfaces:otnOdu',
232 'supporting-port': '1',
233 'supporting-interface': 'XPDR1-NETWORK1-OTU'
234 }, **res['interface'][0]),
237 self.assertDictEqual(
239 'rate': 'org-openroadm-otn-common-types:ODU4',
240 u'monitoring-mode': u'terminated'
241 }, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
242 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
244 self.assertDictEqual({u'exp-payload-type': u'07', u'payload-type': u'07'},
245 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
247 def test_14_service_path_create_xpdr_check(self):
248 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-CLIENT1-ETHERNET")
249 self.assertEqual(response.status_code, requests.codes.ok)
250 res = response.json()
251 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
252 self.assertDictEqual(
254 'name': 'XPDR1-CLIENT1-ETHERNET',
255 'administrative-state': 'inService',
256 'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
257 'type': 'org-openroadm-interfaces:ethernetCsmacd',
258 'supporting-port': 'C1'
259 }, **res['interface'][0]),
262 self.assertDictEqual(
263 {u'fec': u'off', u'speed': 100000},
264 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
266 def test_15_service_path_create_xpdr_check(self):
267 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-NET")
268 self.assertEqual(response.status_code, requests.codes.ok)
269 res = response.json()
270 self.assertIn('not-reserved-inuse', res['circuit-packs'][0]["equipment-state"])
272 def test_16_service_path_create_xpdr_check(self):
273 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-CLIENT")
274 self.assertEqual(response.status_code, requests.codes.ok)
275 res = response.json()
276 self.assertIn('not-reserved-inuse', res['circuit-packs'][0]["equipment-state"])
278 def test_17_service_path_delete(self):
279 response = test_utils.service_path_request("delete", "service_test", "7",
280 [{"renderer:node-id": "ROADM-A1",
281 "renderer:src-tp": "SRG1-PP3-TXRX",
282 "renderer:dest-tp": "DEG1-TTP-TXRX"},
283 {"renderer:node-id": "XPDR-A1",
284 "renderer:src-tp": "XPDR1-CLIENT1",
285 "renderer:dest-tp": "XPDR1-NETWORK1"}])
286 self.assertEqual(response.status_code, requests.codes.ok)
287 self.assertEqual(response.json(), {
288 'output': {'result': 'Request processed', 'success': True}})
290 def test_18_service_path_delete_rdm_check(self):
291 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-mc-7")
292 self.assertEqual(response.status_code, requests.codes.conflict)
293 res = response.json()
295 {"error-type": "application", "error-tag": "data-missing",
296 "error-message": "Request could not be completed because the relevant data model content does not exist"},
297 res['errors']['error'])
299 def test_19_service_path_delete_rdm_check(self):
300 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-nmc-7")
301 self.assertEqual(response.status_code, requests.codes.conflict)
302 res = response.json()
304 {"error-type": "application", "error-tag": "data-missing",
305 "error-message": "Request could not be completed because the relevant data model content does not exist"},
306 res['errors']['error'])
308 def test_20_service_path_delete_rdm_check(self):
309 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-mc-7")
310 self.assertEqual(response.status_code, requests.codes.conflict)
311 res = response.json()
313 "error-type": "application",
314 "error-tag": "data-missing",
315 "error-message": "Request could not be completed because the relevant data model content does not exist"},
316 res['errors']['error'])
318 def test_21_service_path_delete_rdm_check(self):
319 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-nmc-7")
320 self.assertEqual(response.status_code, requests.codes.conflict)
321 res = response.json()
323 "error-type": "application",
324 "error-tag": "data-missing",
325 "error-message": "Request could not be completed because the relevant data model content does not exist"},
326 res['errors']['error'])
328 def test_22_service_path_delete_rdm_check(self):
329 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-DEG1-TTP-TXRX-7")
330 self.assertEqual(response.status_code, requests.codes.conflict)
331 res = response.json()
333 "error-type": "application",
334 "error-tag": "data-missing",
335 "error-message": "Request could not be completed because the relevant data model content does not exist"},
336 res['errors']['error'])
338 def test_23_service_path_delete_xpdr_check(self):
339 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-7")
340 self.assertEqual(response.status_code, requests.codes.conflict)
341 res = response.json()
343 "error-type": "application",
344 "error-tag": "data-missing",
345 "error-message": "Request could not be completed because the relevant data model content does not exist"},
346 res['errors']['error'])
348 def test_24_service_path_delete_xpdr_check(self):
349 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-OTU")
350 self.assertEqual(response.status_code, requests.codes.conflict)
351 res = response.json()
353 "error-type": "application",
354 "error-tag": "data-missing",
355 "error-message": "Request could not be completed because the relevant data model content does not exist"},
356 res['errors']['error'])
358 def test_25_service_path_delete_xpdr_check(self):
359 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-ODU")
360 self.assertEqual(response.status_code, requests.codes.conflict)
361 res = response.json()
363 "error-type": "application",
364 "error-tag": "data-missing",
365 "error-message": "Request could not be completed because the relevant data model content does not exist"},
366 res['errors']['error'])
368 def test_26_service_path_delete_xpdr_check(self):
369 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-CLIENT1-ETHERNET")
370 self.assertEqual(response.status_code, requests.codes.conflict)
371 res = response.json()
373 "error-type": "application",
374 "error-tag": "data-missing",
375 "error-message": "Request could not be completed because the relevant data model content does not exist"},
376 res['errors']['error'])
378 def test_27_service_path_delete_xpdr_check(self):
379 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-NET")
380 self.assertEqual(response.status_code, requests.codes.ok)
381 res = response.json()
382 self.assertEqual('not-reserved-available', res["circuit-packs"][0]['equipment-state'])
384 def test_28_service_path_delete_xpdr_check(self):
385 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-CLIENT")
386 self.assertEqual(response.status_code, requests.codes.ok)
387 res = response.json()
388 self.assertEqual('not-reserved-available', res["circuit-packs"][0]['equipment-state'])
390 def test_29_rdm_device_disconnected(self):
391 response = test_utils.unmount_device("ROADM-A1")
392 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
394 def test_30_xpdr_device_disconnected(self):
395 response = test_utils.unmount_device("XPDR-A1")
396 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
399 if __name__ == "__main__":
400 unittest.main(verbosity=2, failfast=False)