3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
16 #from unittest.result import failfast
18 from common import test_utils
21 class TransportPCERendererTesting(unittest.TestCase):
27 cls.processes = test_utils.start_tpce()
28 cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
31 def tearDownClass(cls):
32 # pylint: disable=not-an-iterable
33 for process in cls.processes:
34 test_utils.shutdown_process(process)
35 print("all processes killed")
37 def test_01_rdm_device_connected(self):
38 response = test_utils.mount_device("ROADM-A1", 'roadma')
39 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
41 def test_02_xpdr_device_connected(self):
42 response = test_utils.mount_device("XPDR-A1", 'xpdra')
43 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
45 def test_03_rdm_portmapping(self):
46 response = test_utils.portmapping_request("ROADM-A1")
47 self.assertEqual(response.status_code, requests.codes.ok)
50 {'supporting-port': 'L1', 'supporting-circuit-pack-name': '1/0',
51 'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional'},
52 res['nodes'][0]['mapping'])
54 {'supporting-port': 'C3', 'supporting-circuit-pack-name': '3/0',
55 'logical-connection-point': 'SRG1-PP3-TXRX', 'port-direction': 'bidirectional'},
56 res['nodes'][0]['mapping'])
58 def test_04_xpdr_portmapping(self):
59 response = test_utils.portmapping_request("XPDR-A1")
60 self.assertEqual(response.status_code, requests.codes.ok)
63 {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
64 'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
65 'logical-connection-point': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-network',
66 'port-direction': 'bidirectional', 'connection-map-lcp': 'XPDR1-CLIENT1',
67 'lcp-hash-val': 'AMkDwQ7xTmRI'},
68 res['nodes'][0]['mapping'])
70 {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
71 'supporting-port': 'C1',
72 'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
73 'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
74 'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
75 'lcp-hash-val': 'AJUUr6I5fALj'},
76 res['nodes'][0]['mapping'])
78 def test_05_service_path_create(self):
79 response = test_utils.service_path_request("create", "service_test", "7",
80 [{"renderer:node-id": "ROADM-A1",
81 "renderer:src-tp": "SRG1-PP3-TXRX",
82 "renderer:dest-tp": "DEG1-TTP-TXRX"},
83 {"renderer:node-id": "XPDR-A1",
84 "renderer:src-tp": "XPDR1-CLIENT1",
85 "renderer:dest-tp": "XPDR1-NETWORK1"}],
86 195.8, 40, 195.775, 195.825, 713,
88 self.assertEqual(response.status_code, requests.codes.ok)
90 self.assertIn('Roadm-connection successfully created for nodes: ROADM-A1', res["output"]["result"])
92 def test_06_service_path_create_rdm_check(self):
93 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-nmc-713:720")
94 self.assertEqual(response.status_code, requests.codes.ok)
96 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
99 'name': 'DEG1-TTP-TXRX-nmc-713:720',
100 'administrative-state': 'inService',
101 'supporting-circuit-pack-name': '1/0',
102 'type': 'org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint',
103 'supporting-port': 'L1'
104 }, **res['interface'][0]),
107 self.assertDictEqual(
108 {u'frequency': 195.8, u'width': 40},
109 res['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'])
111 def test_07_service_path_create_rdm_check(self):
112 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-mc-713:720")
113 self.assertEqual(response.status_code, requests.codes.ok)
114 res = response.json()
115 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
116 self.assertDictEqual(
118 'name': 'DEG1-TTP-TXRX-mc-7',
119 'administrative-state': 'inService',
120 'supporting-circuit-pack-name': '1/0',
121 'type': 'org-openroadm-interfaces:mediaChannelTrailTerminationPoint',
122 'supporting-port': 'L1'
123 }, **res['interface'][0]),
126 self.assertDictEqual(
127 {u'min-freq': 195.775, u'max-freq': 195.825},
128 res['interface'][0]['org-openroadm-media-channel-interfaces:mc-ttp'])
130 def test_08_service_path_create_rdm_check(self):
131 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-nmc-713:720")
132 self.assertEqual(response.status_code, requests.codes.ok)
133 res = response.json()
134 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
135 self.assertDictEqual(
137 'name': 'SRG1-PP3-TXRX-nmc-713:720',
138 'administrative-state': 'inService',
139 'supporting-circuit-pack-name': '3/0',
140 'type': 'org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint',
141 'supporting-port': 'C3'
142 }, **res['interface'][0]),
145 self.assertDictEqual(
146 {u'frequency': 195.8, u'width': 40},
147 res['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'])
149 # -mc supporting interfaces must not be created for SRG, only degrees
150 def test_09_service_path_create_rdm_check(self):
151 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-mc-713:720")
152 self.assertEqual(response.status_code, requests.codes.conflict)
153 res = response.json()
155 {"error-type": "application", "error-tag": "data-missing",
156 "error-message": "Request could not be completed because the relevant data model content does not exist"},
157 res['errors']['error'])
159 def test_10_service_path_create_rdm_check(self):
160 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720")
161 self.assertEqual(response.status_code, requests.codes.ok)
162 res = response.json()
163 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
164 self.assertDictEqual(
166 'connection-name': 'SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720',
167 'opticalControlMode': 'off'
168 }, **res['roadm-connections'][0]),
169 res['roadm-connections'][0]
171 self.assertDictEqual(
172 {'src-if': 'SRG1-PP3-TXRX-nmc-713:720'},
173 res['roadm-connections'][0]['source'])
174 self.assertDictEqual(
175 {'dst-if': 'DEG1-TTP-TXRX-nmc-713:720'},
176 res['roadm-connections'][0]['destination'])
178 def test_11_service_path_create_xpdr_check(self):
179 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-713:720")
180 self.assertEqual(response.status_code, requests.codes.ok)
181 res = response.json()
182 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
183 self.assertDictEqual(
185 'name': 'XPDR1-NETWORK1-713:720',
186 'administrative-state': 'inService',
187 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
188 'type': 'org-openroadm-interfaces:opticalChannel',
189 'supporting-port': '1'
190 }, **res['interface'][0]),
193 self.assertDictEqual(
194 {u'rate': u'org-openroadm-common-types:R100G',
195 u'transmit-power': -5,
196 u'modulation-format': 'dp-qpsk',
197 u'frequency': 195.8},
198 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
200 def test_12_service_path_create_xpdr_check(self):
201 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-OTU")
202 self.assertEqual(response.status_code, requests.codes.ok)
203 res = response.json()
204 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
205 self.assertDictEqual(
207 'name': 'XPDR1-NETWORK1-OTU',
208 'administrative-state': 'inService',
209 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
210 'type': 'org-openroadm-interfaces:otnOtu',
211 'supporting-port': '1',
212 'supporting-interface': 'XPDR1-NETWORK1-7'
213 }, **res['interface'][0]),
216 input_dict_2 = {'tx-sapi': 'AMkDwQ7xTmRI',
217 'expected-dapi': 'AMkDwQ7xTmRI',
218 'rate': 'org-openroadm-otn-common-types:OTU4',
220 self.assertDictEqual(input_dict_2,
221 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
223 def test_13_service_path_create_xpdr_check(self):
224 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-ODU")
225 self.assertEqual(response.status_code, requests.codes.ok)
226 res = response.json()
227 # the 2 following statements replace self.assertDictContainsSubset deprecated in python 3.2
228 self.assertDictEqual(
230 'name': 'XPDR1-NETWORK1-ODU',
231 'administrative-state': 'inService',
232 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
233 'type': 'org-openroadm-interfaces:otnOdu',
234 'supporting-port': '1',
235 'supporting-interface': 'XPDR1-NETWORK1-OTU'
236 }, **res['interface'][0]),
239 self.assertDictEqual(
241 'rate': 'org-openroadm-otn-common-types:ODU4',
242 u'monitoring-mode': u'terminated'
243 }, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
244 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
246 self.assertDictEqual({u'exp-payload-type': u'07', u'payload-type': u'07'},
247 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
249 def test_14_service_path_create_xpdr_check(self):
250 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-CLIENT1-ETHERNET")
251 self.assertEqual(response.status_code, requests.codes.ok)
252 res = response.json()
253 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
254 self.assertDictEqual(
256 'name': 'XPDR1-CLIENT1-ETHERNET',
257 'administrative-state': 'inService',
258 'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
259 'type': 'org-openroadm-interfaces:ethernetCsmacd',
260 'supporting-port': 'C1'
261 }, **res['interface'][0]),
264 self.assertDictEqual(
265 {u'fec': u'off', u'speed': 100000},
266 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
268 def test_15_service_path_create_xpdr_check(self):
269 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-NET")
270 self.assertEqual(response.status_code, requests.codes.ok)
271 res = response.json()
272 self.assertIn('not-reserved-inuse', res['circuit-packs'][0]["equipment-state"])
274 def test_16_service_path_create_xpdr_check(self):
275 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-CLIENT")
276 self.assertEqual(response.status_code, requests.codes.ok)
277 res = response.json()
278 self.assertIn('not-reserved-inuse', res['circuit-packs'][0]["equipment-state"])
280 def test_17_service_path_delete(self):
281 response = test_utils.service_path_request("delete", "service_test", "7",
282 [{"renderer:node-id": "ROADM-A1",
283 "renderer:src-tp": "SRG1-PP3-TXRX",
284 "renderer:dest-tp": "DEG1-TTP-TXRX"},
285 {"renderer:node-id": "XPDR-A1",
286 "renderer:src-tp": "XPDR1-CLIENT1",
287 "renderer:dest-tp": "XPDR1-NETWORK1"}],
288 195.8, 40, 195.775, 195.825, 713,
290 self.assertEqual(response.status_code, requests.codes.ok)
291 self.assertEqual(response.json(), {
292 'output': {'result': 'Request processed', 'success': True}})
294 def test_18_service_path_delete_rdm_check(self):
295 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-mc-713:720")
296 self.assertEqual(response.status_code, requests.codes.conflict)
297 res = response.json()
299 {"error-type": "application", "error-tag": "data-missing",
300 "error-message": "Request could not be completed because the relevant data model content does not exist"},
301 res['errors']['error'])
303 def test_19_service_path_delete_rdm_check(self):
304 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/DEG1-TTP-TXRX-nmc-713:720")
305 self.assertEqual(response.status_code, requests.codes.conflict)
306 res = response.json()
308 {"error-type": "application", "error-tag": "data-missing",
309 "error-message": "Request could not be completed because the relevant data model content does not exist"},
310 res['errors']['error'])
312 def test_20_service_path_delete_rdm_check(self):
313 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-mc-713:720")
314 self.assertEqual(response.status_code, requests.codes.conflict)
315 res = response.json()
317 "error-type": "application",
318 "error-tag": "data-missing",
319 "error-message": "Request could not be completed because the relevant data model content does not exist"},
320 res['errors']['error'])
322 def test_21_service_path_delete_rdm_check(self):
323 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-nmc-713:720")
324 self.assertEqual(response.status_code, requests.codes.conflict)
325 res = response.json()
327 "error-type": "application",
328 "error-tag": "data-missing",
329 "error-message": "Request could not be completed because the relevant data model content does not exist"},
330 res['errors']['error'])
332 def test_22_service_path_delete_rdm_check(self):
333 response = test_utils.check_netconf_node_request("ROADM-A1", "interface/SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720")
334 self.assertEqual(response.status_code, requests.codes.conflict)
335 res = response.json()
337 "error-type": "application",
338 "error-tag": "data-missing",
339 "error-message": "Request could not be completed because the relevant data model content does not exist"},
340 res['errors']['error'])
342 def test_23_service_path_delete_xpdr_check(self):
343 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-713:720")
344 self.assertEqual(response.status_code, requests.codes.conflict)
345 res = response.json()
347 "error-type": "application",
348 "error-tag": "data-missing",
349 "error-message": "Request could not be completed because the relevant data model content does not exist"},
350 res['errors']['error'])
352 def test_24_service_path_delete_xpdr_check(self):
353 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-OTU")
354 self.assertEqual(response.status_code, requests.codes.conflict)
355 res = response.json()
357 "error-type": "application",
358 "error-tag": "data-missing",
359 "error-message": "Request could not be completed because the relevant data model content does not exist"},
360 res['errors']['error'])
362 def test_25_service_path_delete_xpdr_check(self):
363 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-NETWORK1-ODU")
364 self.assertEqual(response.status_code, requests.codes.conflict)
365 res = response.json()
367 "error-type": "application",
368 "error-tag": "data-missing",
369 "error-message": "Request could not be completed because the relevant data model content does not exist"},
370 res['errors']['error'])
372 def test_26_service_path_delete_xpdr_check(self):
373 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR1-CLIENT1-ETHERNET")
374 self.assertEqual(response.status_code, requests.codes.conflict)
375 res = response.json()
377 "error-type": "application",
378 "error-tag": "data-missing",
379 "error-message": "Request could not be completed because the relevant data model content does not exist"},
380 res['errors']['error'])
382 def test_27_service_path_delete_xpdr_check(self):
383 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-NET")
384 self.assertEqual(response.status_code, requests.codes.ok)
385 res = response.json()
386 self.assertEqual('not-reserved-available', res["circuit-packs"][0]['equipment-state'])
388 def test_28_service_path_delete_xpdr_check(self):
389 response = test_utils.check_netconf_node_request("XPDR-A1", "circuit-packs/1%2F0%2F1-PLUG-CLIENT")
390 self.assertEqual(response.status_code, requests.codes.ok)
391 res = response.json()
392 self.assertEqual('not-reserved-available', res["circuit-packs"][0]['equipment-state'])
394 def test_29_rdm_device_disconnected(self):
395 response = test_utils.unmount_device("ROADM-A1")
396 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
398 def test_30_xpdr_device_disconnected(self):
399 response = test_utils.unmount_device("XPDR-A1")
400 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
if __name__ == '__main__':
    # Direct execution: verbose runner that keeps going after a failure,
    # so every check in the scenario gets reported.
    unittest.main(failfast=False, verbosity=2)