3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
import unittest
#from unittest.result import failfast

import requests

from common import test_utils
21 class TransportPCERendererTesting(unittest.TestCase):
27 cls.processes = test_utils.start_tpce()
28 cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
31 def tearDownClass(cls):
32 # pylint: disable=not-an-iterable
33 for process in cls.processes:
34 test_utils.shutdown_process(process)
35 print("all processes killed")
37 def test_01_rdm_device_connected(self):
38 response = test_utils.mount_device("ROADM-A1", 'roadma')
39 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
41 def test_02_xpdr_device_connected(self):
42 response = test_utils.mount_device("XPDR-A1", 'xpdra')
43 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
45 def test_03_rdm_portmapping(self):
46 response = test_utils.portmapping_request("ROADM-A1")
47 self.assertEqual(response.status_code, requests.codes.ok)
50 {'supporting-port': 'L1', 'supporting-circuit-pack-name': '1/0',
51 'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional',
52 'port-admin-state': 'InService', 'port-oper-state': 'InService'},
53 res['nodes'][0]['mapping'])
55 {'supporting-port': 'C3', 'supporting-circuit-pack-name': '3/0',
56 'logical-connection-point': 'SRG1-PP3-TXRX', 'port-direction': 'bidirectional',
57 'port-admin-state': 'InService', 'port-oper-state': 'InService'},
58 res['nodes'][0]['mapping'])
60 def test_04_xpdr_portmapping(self):
61 response = test_utils.portmapping_request("XPDR-A1")
62 self.assertEqual(response.status_code, requests.codes.ok)
65 {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
66 'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
67 'logical-connection-point': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-network',
68 'port-direction': 'bidirectional', 'connection-map-lcp': 'XPDR1-CLIENT1',
69 'lcp-hash-val': 'AMkDwQ7xTmRI',
70 'port-admin-state': 'InService', 'port-oper-state': 'InService'},
71 res['nodes'][0]['mapping'])
73 {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
74 'supporting-port': 'C1',
75 'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
76 'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
77 'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
78 'lcp-hash-val': 'AJUUr6I5fALj',
79 'port-admin-state': 'InService', 'port-oper-state': 'InService'},
80 res['nodes'][0]['mapping'])
82 def test_05_service_path_create(self):
83 response = test_utils.service_path_request("create", "service_test", "7",
84 [{"renderer:node-id": "ROADM-A1",
85 "renderer:src-tp": "SRG1-PP3-TXRX",
86 "renderer:dest-tp": "DEG1-TTP-TXRX"},
87 {"renderer:node-id": "XPDR-A1",
88 "renderer:src-tp": "XPDR1-CLIENT1",
89 "renderer:dest-tp": "XPDR1-NETWORK1"}],
90 195.8, 40, 195.775, 195.825, 713,
92 self.assertEqual(response.status_code, requests.codes.ok)
94 self.assertIn('Roadm-connection successfully created for nodes: ROADM-A1', res["output"]["result"])
96 def test_06_service_path_create_rdm_check(self):
97 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-nmc-713:720")
98 self.assertEqual(response.status_code, requests.codes.ok)
100 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
101 self.assertDictEqual(
103 'name': 'DEG1-TTP-TXRX-nmc-713:720',
104 'administrative-state': 'inService',
105 'supporting-circuit-pack-name': '1/0',
106 'type': 'org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint',
107 'supporting-port': 'L1'
108 }, **res['interface'][0]),
111 self.assertDictEqual(
112 {u'frequency': 195.8, u'width': 40},
113 res['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'])
115 def test_07_service_path_create_rdm_check(self):
116 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-mc-713:720")
117 self.assertEqual(response.status_code, requests.codes.ok)
118 res = response.json()
119 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
120 self.assertDictEqual(
122 'name': 'DEG1-TTP-TXRX-mc-7',
123 'administrative-state': 'inService',
124 'supporting-circuit-pack-name': '1/0',
125 'type': 'org-openroadm-interfaces:mediaChannelTrailTerminationPoint',
126 'supporting-port': 'L1'
127 }, **res['interface'][0]),
130 self.assertDictEqual(
131 {u'min-freq': 195.775, u'max-freq': 195.825},
132 res['interface'][0]['org-openroadm-media-channel-interfaces:mc-ttp'])
134 def test_08_service_path_create_rdm_check(self):
135 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-nmc-713:720")
136 self.assertEqual(response.status_code, requests.codes.ok)
137 res = response.json()
138 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
139 self.assertDictEqual(
141 'name': 'SRG1-PP3-TXRX-nmc-713:720',
142 'administrative-state': 'inService',
143 'supporting-circuit-pack-name': '3/0',
144 'type': 'org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint',
145 'supporting-port': 'C3'
146 }, **res['interface'][0]),
149 self.assertDictEqual(
150 {u'frequency': 195.8, u'width': 40},
151 res['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'])
153 # -mc supporting interfaces must not be created for SRG, only degrees
154 def test_09_service_path_create_rdm_check(self):
155 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-mc-713:720")
156 self.assertEqual(response.status_code, requests.codes.conflict)
157 res = response.json()
159 {"error-type": "application", "error-tag": "data-missing",
160 "error-message": "Request could not be completed because the relevant data model content does not exist"},
161 res['errors']['error'])
163 def test_10_service_path_create_rdm_check(self):
164 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720")
165 self.assertEqual(response.status_code, requests.codes.ok)
166 res = response.json()
167 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
168 self.assertDictEqual(
170 'connection-name': 'SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720',
171 'opticalControlMode': 'off'
172 }, **res['roadm-connections'][0]),
173 res['roadm-connections'][0]
175 self.assertDictEqual(
176 {'src-if': 'SRG1-PP3-TXRX-nmc-713:720'},
177 res['roadm-connections'][0]['source'])
178 self.assertDictEqual(
179 {'dst-if': 'DEG1-TTP-TXRX-nmc-713:720'},
180 res['roadm-connections'][0]['destination'])
182 def test_11_service_path_create_xpdr_check(self):
183 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-713:720")
184 self.assertEqual(response.status_code, requests.codes.ok)
185 res = response.json()
186 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
187 self.assertDictEqual(
189 'name': 'XPDR1-NETWORK1-713:720',
190 'administrative-state': 'inService',
191 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
192 'type': 'org-openroadm-interfaces:opticalChannel',
193 'supporting-port': '1'
194 }, **res['interface'][0]),
197 self.assertDictEqual(
198 {u'rate': u'org-openroadm-common-types:R100G',
199 u'transmit-power': -5,
200 u'modulation-format': 'dp-qpsk',
201 u'frequency': 195.8},
202 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
204 def test_12_service_path_create_xpdr_check(self):
205 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-OTU")
206 self.assertEqual(response.status_code, requests.codes.ok)
207 res = response.json()
208 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
209 self.assertDictEqual(
211 'name': 'XPDR1-NETWORK1-OTU',
212 'administrative-state': 'inService',
213 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
214 'type': 'org-openroadm-interfaces:otnOtu',
215 'supporting-port': '1',
216 'supporting-interface': 'XPDR1-NETWORK1-7'
217 }, **res['interface'][0]),
220 input_dict_2 = {'tx-sapi': 'AMkDwQ7xTmRI',
221 'expected-dapi': 'AMkDwQ7xTmRI',
222 'rate': 'org-openroadm-otn-common-types:OTU4',
224 self.assertDictEqual(input_dict_2,
225 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
227 def test_13_service_path_create_xpdr_check(self):
228 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-ODU")
229 self.assertEqual(response.status_code, requests.codes.ok)
230 res = response.json()
231 # the 2 following statements replace self.assertDictContainsSubset deprecated in python 3.2
232 self.assertDictEqual(
234 'name': 'XPDR1-NETWORK1-ODU',
235 'administrative-state': 'inService',
236 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
237 'type': 'org-openroadm-interfaces:otnOdu',
238 'supporting-port': '1',
239 'supporting-interface': 'XPDR1-NETWORK1-OTU'
240 }, **res['interface'][0]),
243 self.assertDictEqual(
245 'rate': 'org-openroadm-otn-common-types:ODU4',
246 u'monitoring-mode': u'terminated'
247 }, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
248 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
250 self.assertDictEqual({u'exp-payload-type': u'07', u'payload-type': u'07'},
251 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
253 def test_14_service_path_create_xpdr_check(self):
254 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-CLIENT1-ETHERNET")
255 self.assertEqual(response.status_code, requests.codes.ok)
256 res = response.json()
257 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
258 self.assertDictEqual(
260 'name': 'XPDR1-CLIENT1-ETHERNET',
261 'administrative-state': 'inService',
262 'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
263 'type': 'org-openroadm-interfaces:ethernetCsmacd',
264 'supporting-port': 'C1'
265 }, **res['interface'][0]),
268 self.assertDictEqual(
269 {u'fec': u'off', u'speed': 100000},
270 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
272 def test_15_service_path_create_xpdr_check(self):
273 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-NET")
274 self.assertEqual(response.status_code, requests.codes.ok)
275 res = response.json()
276 self.assertIn('not-reserved-inuse', res['circuit-packs'][0]["equipment-state"])
278 def test_16_service_path_create_xpdr_check(self):
279 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-CLIENT")
280 self.assertEqual(response.status_code, requests.codes.ok)
281 res = response.json()
282 self.assertIn('not-reserved-inuse', res['circuit-packs'][0]["equipment-state"])
284 def test_17_service_path_delete(self):
285 response = test_utils.service_path_request("delete", "service_test", "7",
286 [{"renderer:node-id": "ROADM-A1",
287 "renderer:src-tp": "SRG1-PP3-TXRX",
288 "renderer:dest-tp": "DEG1-TTP-TXRX"},
289 {"renderer:node-id": "XPDR-A1",
290 "renderer:src-tp": "XPDR1-CLIENT1",
291 "renderer:dest-tp": "XPDR1-NETWORK1"}],
292 195.8, 40, 195.775, 195.825, 713,
294 self.assertEqual(response.status_code, requests.codes.ok)
295 self.assertEqual(response.json(), {
296 'output': {'result': 'Request processed', 'success': True}})
298 def test_18_service_path_delete_rdm_check(self):
299 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-mc-713:720")
300 self.assertEqual(response.status_code, requests.codes.conflict)
301 res = response.json()
303 {"error-type": "application", "error-tag": "data-missing",
304 "error-message": "Request could not be completed because the relevant data model content does not exist"},
305 res['errors']['error'])
307 def test_19_service_path_delete_rdm_check(self):
308 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-nmc-713:720")
309 self.assertEqual(response.status_code, requests.codes.conflict)
310 res = response.json()
312 {"error-type": "application", "error-tag": "data-missing",
313 "error-message": "Request could not be completed because the relevant data model content does not exist"},
314 res['errors']['error'])
316 def test_20_service_path_delete_rdm_check(self):
317 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-mc-713:720")
318 self.assertEqual(response.status_code, requests.codes.conflict)
319 res = response.json()
321 "error-type": "application",
322 "error-tag": "data-missing",
323 "error-message": "Request could not be completed because the relevant data model content does not exist"},
324 res['errors']['error'])
326 def test_21_service_path_delete_rdm_check(self):
327 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-nmc-713:720")
328 self.assertEqual(response.status_code, requests.codes.conflict)
329 res = response.json()
331 "error-type": "application",
332 "error-tag": "data-missing",
333 "error-message": "Request could not be completed because the relevant data model content does not exist"},
334 res['errors']['error'])
336 def test_22_service_path_delete_rdm_check(self):
337 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720")
338 self.assertEqual(response.status_code, requests.codes.conflict)
339 res = response.json()
341 "error-type": "application",
342 "error-tag": "data-missing",
343 "error-message": "Request could not be completed because the relevant data model content does not exist"},
344 res['errors']['error'])
346 def test_23_service_path_delete_xpdr_check(self):
347 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-713:720")
348 self.assertEqual(response.status_code, requests.codes.conflict)
349 res = response.json()
351 "error-type": "application",
352 "error-tag": "data-missing",
353 "error-message": "Request could not be completed because the relevant data model content does not exist"},
354 res['errors']['error'])
356 def test_24_service_path_delete_xpdr_check(self):
357 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-OTU")
358 self.assertEqual(response.status_code, requests.codes.conflict)
359 res = response.json()
361 "error-type": "application",
362 "error-tag": "data-missing",
363 "error-message": "Request could not be completed because the relevant data model content does not exist"},
364 res['errors']['error'])
366 def test_25_service_path_delete_xpdr_check(self):
367 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-ODU")
368 self.assertEqual(response.status_code, requests.codes.conflict)
369 res = response.json()
371 "error-type": "application",
372 "error-tag": "data-missing",
373 "error-message": "Request could not be completed because the relevant data model content does not exist"},
374 res['errors']['error'])
376 def test_26_service_path_delete_xpdr_check(self):
377 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-CLIENT1-ETHERNET")
378 self.assertEqual(response.status_code, requests.codes.conflict)
379 res = response.json()
381 "error-type": "application",
382 "error-tag": "data-missing",
383 "error-message": "Request could not be completed because the relevant data model content does not exist"},
384 res['errors']['error'])
386 def test_27_service_path_delete_xpdr_check(self):
387 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-NET")
388 self.assertEqual(response.status_code, requests.codes.ok)
389 res = response.json()
390 self.assertEqual('not-reserved-available', res["circuit-packs"][0]['equipment-state'])
392 def test_28_service_path_delete_xpdr_check(self):
393 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-CLIENT")
394 self.assertEqual(response.status_code, requests.codes.ok)
395 res = response.json()
396 self.assertEqual('not-reserved-available', res["circuit-packs"][0]['equipment-state'])
398 def test_29_rdm_device_disconnected(self):
399 response = test_utils.unmount_device("ROADM-A1")
400 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
402 def test_30_xpdr_device_disconnected(self):
403 response = test_utils.unmount_device("XPDR-A1")
404 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
# Run the whole scenario when the file is executed directly; failfast is
# left disabled so every remaining test is still attempted after a failure.
if __name__ == "__main__":
    unittest.main(verbosity=2, failfast=False)