3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
16 #from unittest.result import failfast
18 from common import test_utils
21 class TransportPCERendererTesting(unittest.TestCase):
27 cls.processes = test_utils.start_tpce()
28 cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
31 def tearDownClass(cls):
32 for process in cls.processes:
33 test_utils.shutdown_process(process)
34 print("all processes killed")
36 def test_01_rdm_device_connected(self):
37 response = test_utils.mount_device("ROADM-A1", 'roadma')
38 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
40 def test_02_xpdr_device_connected(self):
41 response = test_utils.mount_device("XPDR-A1", 'xpdra')
42 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
44 def test_03_rdm_portmapping(self):
45 response = test_utils.portmapping_request("ROADM-A1")
46 self.assertEqual(response.status_code, requests.codes.ok)
49 {'supporting-port': 'L1', 'supporting-circuit-pack-name': '1/0',
50 'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional'},
51 res['nodes'][0]['mapping'])
53 {'supporting-port': 'C3', 'supporting-circuit-pack-name': '3/0',
54 'logical-connection-point': 'SRG1-PP3-TXRX', 'port-direction': 'bidirectional'},
55 res['nodes'][0]['mapping'])
57 def test_04_xpdr_portmapping(self):
58 response = test_utils.portmapping_request("XPDR-A1")
59 self.assertEqual(response.status_code, requests.codes.ok)
62 {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
63 'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
64 'logical-connection-point': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-network',
65 'port-direction': 'bidirectional', 'connection-map-lcp': 'XPDR1-CLIENT1',
66 'lcp-hash-val': 'AMkDwQ7xTmRI'},
67 res['nodes'][0]['mapping'])
69 {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
70 'supporting-port': 'C1',
71 'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
72 'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
73 'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
74 'lcp-hash-val': 'AJUUr6I5fALj'},
75 res['nodes'][0]['mapping'])
77 def test_05_service_path_create(self):
78 response = test_utils.service_path_request("create", "service_test", "7",
79 [{"renderer:node-id": "ROADM-A1",
80 "renderer:src-tp": "SRG1-PP3-TXRX", "renderer:dest-tp": "DEG1-TTP-TXRX"},
81 {"renderer:node-id": "XPDR-A1",
82 "renderer:src-tp": "XPDR1-CLIENT1", "renderer:dest-tp": "XPDR1-NETWORK1"}])
83 self.assertEqual(response.status_code, requests.codes.ok)
85 self.assertIn('Roadm-connection successfully created for nodes: ROADM-A1', res["output"]["result"])
87 def test_06_service_path_create_rdm_check(self):
88 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-nmc-7")
89 self.assertEqual(response.status_code, requests.codes.ok)
91 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
94 'name': 'DEG1-TTP-TXRX-nmc-7',
95 'administrative-state': 'inService',
96 'supporting-circuit-pack-name': '1/0',
97 'type': 'org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint',
98 'supporting-port': 'L1'
99 }, **res['interface'][0]),
102 self.assertDictEqual(
103 {u'frequency': 195.8, u'width': 40},
104 res['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'])
106 def test_07_service_path_create_rdm_check(self):
107 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-mc-7")
108 self.assertEqual(response.status_code, requests.codes.ok)
109 res = response.json()
110 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
111 self.assertDictEqual(
113 'name': 'DEG1-TTP-TXRX-mc-7',
114 'administrative-state': 'inService',
115 'supporting-circuit-pack-name': '1/0',
116 'type': 'org-openroadm-interfaces:mediaChannelTrailTerminationPoint',
117 'supporting-port': 'L1'
118 }, **res['interface'][0]),
121 self.assertDictEqual(
122 {u'min-freq': 195.775, u'max-freq': 195.825},
123 res['interface'][0]['org-openroadm-media-channel-interfaces:mc-ttp'])
125 def test_08_service_path_create_rdm_check(self):
126 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-nmc-7")
127 self.assertEqual(response.status_code, requests.codes.ok)
128 res = response.json()
129 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
130 self.assertDictEqual(
132 'name': 'SRG1-PP3-TXRX-nmc-7',
133 'administrative-state': 'inService',
134 'supporting-circuit-pack-name': '3/0',
135 'type': 'org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint',
136 'supporting-port': 'C3'
137 }, **res['interface'][0]),
140 self.assertDictEqual(
141 {u'frequency': 195.8, u'width': 40},
142 res['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'])
144 # -mc supporting interfaces must not be created for SRG, only degrees
145 def test_09_service_path_create_rdm_check(self):
146 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-mc-7")
147 self.assertEqual(response.status_code, requests.codes.conflict)
148 res = response.json()
150 {"error-type": "application", "error-tag": "data-missing",
151 "error-message": "Request could not be completed because the relevant data model content does not exist"},
152 res['errors']['error'])
154 def test_10_service_path_create_rdm_check(self):
155 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP3-TXRX-DEG1-TTP-TXRX-7")
156 self.assertEqual(response.status_code, requests.codes.ok)
157 res = response.json()
158 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
159 self.assertDictEqual(
161 'connection-name': 'SRG1-PP3-TXRX-DEG1-TTP-TXRX-7',
162 'opticalControlMode': 'off'
163 }, **res['roadm-connections'][0]),
164 res['roadm-connections'][0]
166 self.assertDictEqual(
167 {'src-if': 'SRG1-PP3-TXRX-nmc-7'},
168 res['roadm-connections'][0]['source'])
169 self.assertDictEqual(
170 {'dst-if': 'DEG1-TTP-TXRX-nmc-7'},
171 res['roadm-connections'][0]['destination'])
173 def test_11_service_path_create_xpdr_check(self):
174 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-7")
175 self.assertEqual(response.status_code, requests.codes.ok)
176 res = response.json()
177 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
178 self.assertDictEqual(
180 'name': 'XPDR1-NETWORK1-7',
181 'administrative-state': 'inService',
182 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
183 'type': 'org-openroadm-interfaces:opticalChannel',
184 'supporting-port': '1'
185 }, **res['interface'][0]),
188 self.assertDictEqual(
189 {u'rate': u'org-openroadm-common-types:R100G',
190 u'transmit-power': -5,
191 u'modulation-format': 'dp-qpsk',
192 u'frequency': 195.8},
193 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
195 def test_12_service_path_create_xpdr_check(self):
196 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-OTU")
197 self.assertEqual(response.status_code, requests.codes.ok)
198 res = response.json()
199 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
200 self.assertDictEqual(
202 'name': 'XPDR1-NETWORK1-OTU',
203 'administrative-state': 'inService',
204 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
205 'type': 'org-openroadm-interfaces:otnOtu',
206 'supporting-port': '1',
207 'supporting-interface': 'XPDR1-NETWORK1-7'
208 }, **res['interface'][0]),
211 input_dict_2 = {'tx-sapi': 'AMkDwQ7xTmRI',
212 'expected-dapi': 'AMkDwQ7xTmRI',
213 'rate': 'org-openroadm-otn-common-types:OTU4',
215 self.assertDictEqual(input_dict_2,
216 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
218 def test_13_service_path_create_xpdr_check(self):
219 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-ODU")
220 self.assertEqual(response.status_code, requests.codes.ok)
221 res = response.json()
222 # the 2 following statements replace self.assertDictContainsSubset deprecated in python 3.2
223 self.assertDictEqual(
225 'name': 'XPDR1-NETWORK1-ODU',
226 'administrative-state': 'inService',
227 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
228 'type': 'org-openroadm-interfaces:otnOdu',
229 'supporting-port': '1',
230 'supporting-interface': 'XPDR1-NETWORK1-OTU'
231 }, **res['interface'][0]),
234 self.assertDictEqual(
236 'rate': 'org-openroadm-otn-common-types:ODU4',
237 u'monitoring-mode': u'terminated'
238 }, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
239 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
241 self.assertDictEqual({u'exp-payload-type': u'07', u'payload-type': u'07'},
242 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
244 def test_14_service_path_create_xpdr_check(self):
245 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-CLIENT1-ETHERNET")
246 self.assertEqual(response.status_code, requests.codes.ok)
247 res = response.json()
248 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
249 self.assertDictEqual(
251 'name': 'XPDR1-CLIENT1-ETHERNET',
252 'administrative-state': 'inService',
253 'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
254 'type': 'org-openroadm-interfaces:ethernetCsmacd',
255 'supporting-port': 'C1'
256 }, **res['interface'][0]),
259 self.assertDictEqual(
260 {u'fec': u'off', u'speed': 100000},
261 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
263 def test_15_service_path_create_xpdr_check(self):
264 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-NET")
265 self.assertEqual(response.status_code, requests.codes.ok)
266 res = response.json()
267 self.assertIn('not-reserved-inuse', res['circuit-packs'][0]["equipment-state"])
269 def test_16_service_path_create_xpdr_check(self):
270 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-CLIENT")
271 self.assertEqual(response.status_code, requests.codes.ok)
272 res = response.json()
273 self.assertIn('not-reserved-inuse', res['circuit-packs'][0]["equipment-state"])
275 def test_17_service_path_delete(self):
276 response = test_utils.service_path_request("delete", "service_test", "7",
277 [{"renderer:node-id": "ROADM-A1",
278 "renderer:src-tp": "SRG1-PP3-TXRX", "renderer:dest-tp": "DEG1-TTP-TXRX"},
279 {"renderer:node-id": "XPDR-A1",
280 "renderer:src-tp": "XPDR1-CLIENT1", "renderer:dest-tp": "XPDR1-NETWORK1"}])
281 self.assertEqual(response.status_code, requests.codes.ok)
282 self.assertEqual(response.json(), {
283 'output': {'result': 'Request processed', 'success': True}})
285 def test_18_service_path_delete_rdm_check(self):
286 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-mc-7")
287 self.assertEqual(response.status_code, requests.codes.conflict)
288 res = response.json()
290 {"error-type": "application", "error-tag": "data-missing",
291 "error-message": "Request could not be completed because the relevant data model content does not exist"},
292 res['errors']['error'])
294 def test_19_service_path_delete_rdm_check(self):
295 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-nmc-7")
296 self.assertEqual(response.status_code, requests.codes.conflict)
297 res = response.json()
299 {"error-type": "application", "error-tag": "data-missing",
300 "error-message": "Request could not be completed because the relevant data model content does not exist"},
301 res['errors']['error'])
303 def test_20_service_path_delete_rdm_check(self):
304 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-mc-7")
305 self.assertEqual(response.status_code, requests.codes.conflict)
306 res = response.json()
308 "error-type": "application",
309 "error-tag": "data-missing",
310 "error-message": "Request could not be completed because the relevant data model content does not exist"},
311 res['errors']['error'])
313 def test_21_service_path_delete_rdm_check(self):
314 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-nmc-7")
315 self.assertEqual(response.status_code, requests.codes.conflict)
316 res = response.json()
318 "error-type": "application",
319 "error-tag": "data-missing",
320 "error-message": "Request could not be completed because the relevant data model content does not exist"},
321 res['errors']['error'])
323 def test_22_service_path_delete_rdm_check(self):
324 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-DEG1-TTP-TXRX-7")
325 self.assertEqual(response.status_code, requests.codes.conflict)
326 res = response.json()
328 "error-type": "application",
329 "error-tag": "data-missing",
330 "error-message": "Request could not be completed because the relevant data model content does not exist"},
331 res['errors']['error'])
333 def test_23_service_path_delete_xpdr_check(self):
334 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-7")
335 self.assertEqual(response.status_code, requests.codes.conflict)
336 res = response.json()
338 "error-type": "application",
339 "error-tag": "data-missing",
340 "error-message": "Request could not be completed because the relevant data model content does not exist"},
341 res['errors']['error'])
343 def test_24_service_path_delete_xpdr_check(self):
344 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-OTU")
345 self.assertEqual(response.status_code, requests.codes.conflict)
346 res = response.json()
348 "error-type": "application",
349 "error-tag": "data-missing",
350 "error-message": "Request could not be completed because the relevant data model content does not exist"},
351 res['errors']['error'])
353 def test_25_service_path_delete_xpdr_check(self):
354 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-ODU")
355 self.assertEqual(response.status_code, requests.codes.conflict)
356 res = response.json()
358 "error-type": "application",
359 "error-tag": "data-missing",
360 "error-message": "Request could not be completed because the relevant data model content does not exist"},
361 res['errors']['error'])
363 def test_26_service_path_delete_xpdr_check(self):
364 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-CLIENT1-ETHERNET")
365 self.assertEqual(response.status_code, requests.codes.conflict)
366 res = response.json()
368 "error-type": "application",
369 "error-tag": "data-missing",
370 "error-message": "Request could not be completed because the relevant data model content does not exist"},
371 res['errors']['error'])
373 def test_27_service_path_delete_xpdr_check(self):
374 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-NET")
375 self.assertEqual(response.status_code, requests.codes.ok)
376 res = response.json()
377 self.assertEqual('not-reserved-available', res["circuit-packs"][0]['equipment-state'])
379 def test_28_service_path_delete_xpdr_check(self):
380 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-CLIENT")
381 self.assertEqual(response.status_code, requests.codes.ok)
382 res = response.json()
383 self.assertEqual('not-reserved-available', res["circuit-packs"][0]['equipment-state'])
385 def test_29_rdm_device_disconnected(self):
386 response = test_utils.unmount_device("ROADM-A1")
387 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
389 def test_30_xpdr_device_disconnected(self):
390 response = test_utils.unmount_device("XPDR-A1")
391 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
394 if __name__ == "__main__":
395 unittest.main(verbosity=2, failfast=False)