3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
19 from common import test_utils
20 from common.test_utils import INDEX_1_USED_FREQ_MAP, INDEX_1_2_USED_FREQ_MAP, AVAILABLE_FREQ_MAP
23 class TransportPCEtesting(unittest.TestCase):
26 WAITING = 20 # nominal value is 300
28 cr_serv_sample_data = {"input": {
29 "sdnc-request-header": {
30 "request-id": "request-1",
31 "rpc-action": "service-create",
32 "request-system-id": "appname"
34 "service-name": "service1-OCH-OTU4",
35 "common-id": "commonId",
36 "connection-type": "infrastructure",
38 "service-rate": "100",
39 "node-id": "SPDR-SA1",
40 "service-format": "OTU",
41 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
45 "committed-info-rate": "100000",
46 "committed-burst-size": "64"
51 "port-device-name": "SPDR-SA1-XPDR1",
53 "port-name": "XPDR1-NETWORK1",
54 "port-rack": "000000.00",
55 "port-shelf": "Chassis#1"
58 "lgx-device-name": "Some lgx-device-name",
59 "lgx-port-name": "Some lgx-port-name",
60 "lgx-port-rack": "000000.00",
61 "lgx-port-shelf": "00"
66 "port-device-name": "SPDR-SA1-XPDR1",
68 "port-name": "XPDR1-NETWORK1",
69 "port-rack": "000000.00",
70 "port-shelf": "Chassis#1"
73 "lgx-device-name": "Some lgx-device-name",
74 "lgx-port-name": "Some lgx-port-name",
75 "lgx-port-rack": "000000.00",
76 "lgx-port-shelf": "00"
82 "service-rate": "100",
83 "node-id": "SPDR-SC1",
84 "service-format": "OTU",
85 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
89 "committed-info-rate": "100000",
90 "committed-burst-size": "64"
95 "port-device-name": "SPDR-SC1-XPDR1",
97 "port-name": "XPDR1-NETWORK1",
98 "port-rack": "000000.00",
99 "port-shelf": "Chassis#1"
102 "lgx-device-name": "Some lgx-device-name",
103 "lgx-port-name": "Some lgx-port-name",
104 "lgx-port-rack": "000000.00",
105 "lgx-port-shelf": "00"
110 "port-device-name": "SPDR-SC1-XPDR1",
111 "port-type": "fixed",
112 "port-name": "XPDR1-NETWORK1",
113 "port-rack": "000000.00",
114 "port-shelf": "Chassis#1"
117 "lgx-device-name": "Some lgx-device-name",
118 "lgx-port-name": "Some lgx-port-name",
119 "lgx-port-rack": "000000.00",
120 "lgx-port-shelf": "00"
125 "due-date": "2018-06-15T00:00:01Z",
126 "operator-contact": "pw1234"
132 cls.processes = test_utils.start_tpce()
133 cls.processes = test_utils.start_sims(['spdra', 'roadma', 'roadmc', 'spdrc'])
136 def tearDownClass(cls):
137 # pylint: disable=not-an-iterable
138 for process in cls.processes:
139 test_utils.shutdown_process(process)
140 print("all processes killed")
145 def test_01_connect_spdrA(self):
146 response = test_utils.mount_device("SPDR-SA1", 'spdra')
147 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
149 def test_02_connect_spdrC(self):
150 response = test_utils.mount_device("SPDR-SC1", 'spdrc')
151 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
153 def test_03_connect_rdmA(self):
154 response = test_utils.mount_device("ROADM-A1", 'roadma')
155 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
157 def test_04_connect_rdmC(self):
158 response = test_utils.mount_device("ROADM-C1", 'roadmc')
159 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
161 def test_05_connect_sprdA_N1_to_roadmA_PP1(self):
162 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "1", "1",
163 "ROADM-A1", "1", "SRG1-PP1-TXRX")
164 self.assertEqual(response.status_code, requests.codes.ok)
165 res = response.json()
166 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
169 def test_06_connect_roadmA_PP1_to_spdrA_N1(self):
170 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "1", "1",
171 "ROADM-A1", "1", "SRG1-PP1-TXRX")
172 self.assertEqual(response.status_code, requests.codes.ok)
173 res = response.json()
174 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
177 def test_07_connect_sprdC_N1_to_roadmC_PP1(self):
178 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "1", "1",
179 "ROADM-C1", "1", "SRG1-PP1-TXRX")
180 self.assertEqual(response.status_code, requests.codes.ok)
181 res = response.json()
182 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
185 def test_08_connect_roadmC_PP1_to_spdrC_N1(self):
186 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "1", "1",
187 "ROADM-C1", "1", "SRG1-PP1-TXRX")
188 self.assertEqual(response.status_code, requests.codes.ok)
189 res = response.json()
190 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
193 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
194 # Config ROADMA-ROADMC oms-attributes
196 "auto-spanloss": "true",
197 "spanloss-base": 11.4,
198 "spanloss-current": 12,
199 "engineered-spanloss": 12.2,
200 "link-concatenation": [{
203 "SRLG-length": 100000,
205 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
206 self.assertEqual(response.status_code, requests.codes.created)
208 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
209 # Config ROADMC-ROADMA oms-attributes
211 "auto-spanloss": "true",
212 "spanloss-base": 11.4,
213 "spanloss-current": 12,
214 "engineered-spanloss": 12.2,
215 "link-concatenation": [{
218 "SRLG-length": 100000,
220 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
221 self.assertEqual(response.status_code, requests.codes.created)
223 # test service-create for OCH-OTU4 service from spdr to spdr
224 def test_11_check_otn_topology(self):
225 response = test_utils.get_otn_topo_request()
226 self.assertEqual(response.status_code, requests.codes.ok)
227 res = response.json()
228 nbNode = len(res['network'][0]['node'])
229 self.assertEqual(nbNode, 4)
230 self.assertNotIn('ietf-network-topology:link', res['network'][0])
232 def test_12_create_OCH_OTU4_service(self):
233 response = test_utils.service_create_request(self.cr_serv_sample_data)
234 self.assertEqual(response.status_code, requests.codes.ok)
235 res = response.json()
236 self.assertIn('PCE calculation in progress',
237 res['output']['configuration-response-common']['response-message'])
238 time.sleep(self.WAITING)
240 def test_13_get_OCH_OTU4_service1(self):
241 response = test_utils.get_service_list_request("services/service1-OCH-OTU4")
242 self.assertEqual(response.status_code, requests.codes.ok)
243 res = response.json()
245 res['services'][0]['administrative-state'], 'inService')
247 res['services'][0]['service-name'], 'service1-OCH-OTU4')
249 res['services'][0]['connection-type'], 'infrastructure')
251 res['services'][0]['lifecycle-state'], 'planned')
254 # Check correct configuration of devices
255 def test_14_check_interface_och_spdra(self):
256 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
257 self.assertEqual(response.status_code, requests.codes.ok)
258 res = response.json()
259 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-1',
260 'administrative-state': 'inService',
261 'supporting-circuit-pack-name': 'CP1-CFP0',
262 'type': 'org-openroadm-interfaces:opticalChannel',
263 'supporting-port': 'CP1-CFP0-P1'
264 }, **res['interface'][0]),
267 self.assertDictEqual(
268 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
269 u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
270 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
272 def test_15_check_interface_OTU4_spdra(self):
273 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
274 self.assertEqual(response.status_code, requests.codes.ok)
275 res = response.json()
276 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
277 'administrative-state': 'inService',
278 'supporting-circuit-pack-name': 'CP1-CFP0',
279 'supporting-interface': 'XPDR1-NETWORK1-1',
280 'type': 'org-openroadm-interfaces:otnOtu',
281 'supporting-port': 'CP1-CFP0-P1'
283 input_dict_2 = {'tx-sapi': 'Swfw02qXGyI=',
284 'expected-dapi': 'Swfw02qXGyI=',
285 'rate': 'org-openroadm-otn-common-types:OTU4',
288 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
291 self.assertDictEqual(input_dict_2,
293 ['org-openroadm-otn-otu-interfaces:otu'])
295 def test_16_check_interface_och_spdrc(self):
296 response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-1")
297 self.assertEqual(response.status_code, requests.codes.ok)
298 res = response.json()
299 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-1',
300 'administrative-state': 'inService',
301 'supporting-circuit-pack-name': 'CP1-CFP0',
302 'type': 'org-openroadm-interfaces:opticalChannel',
303 'supporting-port': 'CP1-CFP0-P1'
304 }, **res['interface'][0]),
307 self.assertDictEqual(
308 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
309 u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
310 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
312 def test_17_check_interface_OTU4_spdrc(self):
313 response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-OTU")
314 self.assertEqual(response.status_code, requests.codes.ok)
315 res = response.json()
316 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
317 'administrative-state': 'inService',
318 'supporting-circuit-pack-name': 'CP1-CFP0',
319 'supporting-interface': 'XPDR1-NETWORK1-1',
320 'type': 'org-openroadm-interfaces:otnOtu',
321 'supporting-port': 'CP1-CFP0-P1'
323 input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
324 'expected-sapi': 'Swfw02qXGyI=',
325 'tx-sapi': 'fuYZwEO660g=',
326 'expected-dapi': 'fuYZwEO660g=',
327 'rate': 'org-openroadm-otn-common-types:OTU4',
331 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
334 self.assertDictEqual(input_dict_2,
336 ['org-openroadm-otn-otu-interfaces:otu'])
338 def test_18_check_no_interface_ODU4_spdra(self):
339 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
340 self.assertEqual(response.status_code, requests.codes.conflict)
341 res = response.json()
343 {"error-type": "application", "error-tag": "data-missing",
344 "error-message": "Request could not be completed because the relevant data model content does not exist"},
345 res['errors']['error'])
347 def test_19_check_openroadm_topo_spdra(self):
348 response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
349 self.assertEqual(response.status_code, requests.codes.ok)
350 res = response.json()
351 ele = res['node'][0]['ietf-network-topology:termination-point'][0]
352 self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
353 self.assertEqual({u'frequency': 196.1,
355 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
358 def test_20_check_openroadm_topo_ROADMA_SRG(self):
359 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
360 self.assertEqual(response.status_code, requests.codes.ok)
361 res = response.json()
362 freq_map = base64.b64decode(
363 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
364 freq_map_array = [int(x) for x in freq_map]
365 self.assertEqual(freq_map_array[0], 0, "Index 1 should not be available")
366 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
368 if ele['tp-id'] == 'SRG1-PP1-TXRX':
369 self.assertIn({u'index': 1, u'frequency': 196.1,
371 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
372 if ele['tp-id'] == 'SRG1-PP2-TXRX':
373 self.assertNotIn('used-wavelength', dict.keys(ele))
376 def test_21_check_openroadm_topo_ROADMA_DEG(self):
377 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
378 self.assertEqual(response.status_code, requests.codes.ok)
379 res = response.json()
380 freq_map = base64.b64decode(
381 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
382 freq_map_array = [int(x) for x in freq_map]
383 self.assertEqual(freq_map_array[0], 0, "Index 1 should not be available")
384 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
386 if ele['tp-id'] == 'DEG2-CTP-TXRX':
387 self.assertIn({u'map-name': 'cband', u'freq-map-granularity': 6.25, u'start-edge-freq': 191.325,
388 u'effective-bits': 8, u'freq-map': INDEX_1_USED_FREQ_MAP},
389 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'])
390 if ele['tp-id'] == 'DEG2-TTP-TXRX':
391 self.assertIn({u'index': 1, u'frequency': 196.1,
393 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
396 def test_22_check_otn_topo_otu4_links(self):
397 response = test_utils.get_otn_topo_request()
398 self.assertEqual(response.status_code, requests.codes.ok)
399 res = response.json()
400 nb_links = len(res['network'][0]['ietf-network-topology:link'])
401 self.assertEqual(nb_links, 2)
402 listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
403 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
404 for link in res['network'][0]['ietf-network-topology:link']:
405 self.assertIn(link['link-id'], listLinkId)
406 self.assertEqual(link['transportpce-topology:otn-link-type'], 'OTU4')
407 self.assertEqual(link['org-openroadm-common-network:link-type'], 'OTN-LINK')
408 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
409 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
410 self.assertIn(link['org-openroadm-common-network:opposite-link'], listLinkId)
412 # test service-create for ODU4 service from spdr to spdr
413 def test_23_create_ODU4_service(self):
414 self.cr_serv_sample_data["input"]["service-name"] = "service1-ODU4"
415 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
416 del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
417 self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
418 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
419 del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
420 self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
422 response = test_utils.service_create_request(self.cr_serv_sample_data)
423 self.assertEqual(response.status_code, requests.codes.ok)
424 res = response.json()
425 self.assertIn('PCE calculation in progress',
426 res['output']['configuration-response-common']['response-message'])
427 time.sleep(self.WAITING)
429 def test_24_get_ODU4_service1(self):
430 response = test_utils.get_service_list_request("services/service1-ODU4")
431 self.assertEqual(response.status_code, requests.codes.ok)
432 res = response.json()
434 res['services'][0]['administrative-state'], 'inService')
436 res['services'][0]['service-name'], 'service1-ODU4')
438 res['services'][0]['connection-type'], 'infrastructure')
440 res['services'][0]['lifecycle-state'], 'planned')
443 def test_25_check_interface_ODU4_spdra(self):
444 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
445 self.assertEqual(response.status_code, requests.codes.ok)
446 res = response.json()
447 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
448 'administrative-state': 'inService',
449 'supporting-circuit-pack-name': 'CP1-CFP0',
450 'supporting-interface': 'XPDR1-NETWORK1-OTU',
451 'type': 'org-openroadm-interfaces:otnOdu',
452 'supporting-port': 'CP1-CFP0-P1'}
453 # SAPI/DAPI are added in the Otu4 renderer
454 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
455 'rate': 'org-openroadm-otn-common-types:ODU4',
456 'expected-dapi': 'Swfw02qXGyI=',
457 'expected-sapi': 'fuYZwEO660g=',
458 'tx-dapi': 'fuYZwEO660g=',
459 'tx-sapi': 'Swfw02qXGyI='}
461 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
463 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
465 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
467 self.assertDictEqual(
468 {u'payload-type': u'21', u'exp-payload-type': u'21'},
469 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
471 def test_26_check_interface_ODU4_spdrc(self):
472 response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
473 self.assertEqual(response.status_code, requests.codes.ok)
474 res = response.json()
475 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
476 'administrative-state': 'inService',
477 'supporting-circuit-pack-name': 'CP1-CFP0',
478 'supporting-interface': 'XPDR1-NETWORK1-OTU',
479 'type': 'org-openroadm-interfaces:otnOdu',
480 'supporting-port': 'CP1-CFP0-P1'}
481 # SAPI/DAPI are added in the Otu4 renderer
482 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
483 'rate': 'org-openroadm-otn-common-types:ODU4',
484 'tx-sapi': 'fuYZwEO660g=',
485 'tx-dapi': 'Swfw02qXGyI=',
486 'expected-sapi': 'Swfw02qXGyI=',
487 'expected-dapi': 'fuYZwEO660g='
489 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
491 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
493 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
495 self.assertDictEqual(
496 {u'payload-type': u'21', u'exp-payload-type': u'21'},
497 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
499 def test_27_check_otn_topo_links(self):
500 response = test_utils.get_otn_topo_request()
501 self.assertEqual(response.status_code, requests.codes.ok)
502 res = response.json()
503 nb_links = len(res['network'][0]['ietf-network-topology:link'])
504 self.assertEqual(nb_links, 4)
505 for link in res['network'][0]['ietf-network-topology:link']:
506 linkId = link['link-id']
507 if (linkId in ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
508 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
509 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
510 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
511 elif (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
512 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
513 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
514 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
515 self.assertEqual(link['transportpce-topology:otn-link-type'], 'ODTU4')
516 self.assertEqual(link['org-openroadm-common-network:link-type'], 'OTN-LINK')
517 self.assertIn(link['org-openroadm-common-network:opposite-link'],
518 ['ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
519 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
521 self.fail("this link should not exist")
523 def test_28_check_otn_topo_tp(self):
524 response = test_utils.get_otn_topo_request()
525 res = response.json()
526 for node in res['network'][0]['node']:
527 if node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1':
528 tpList = node['ietf-network-topology:termination-point']
530 if tp['tp-id'] == 'XPDR1-NETWORK1':
531 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
532 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
533 self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
534 self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
535 'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
537 # test service-create for 10GE service from spdr to spdr
538 def test_29_create_10GE_service(self):
539 self.cr_serv_sample_data["input"]["service-name"] = "service1-10GE"
540 self.cr_serv_sample_data["input"]["connection-type"] = "service"
541 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "10"
542 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
543 del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
544 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
545 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
546 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "10"
547 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
548 del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
549 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
550 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
551 response = test_utils.service_create_request(self.cr_serv_sample_data)
552 self.assertEqual(response.status_code, requests.codes.ok)
553 res = response.json()
554 self.assertIn('PCE calculation in progress',
555 res['output']['configuration-response-common']['response-message'])
556 time.sleep(self.WAITING)
558 def test_30_get_10GE_service1(self):
559 response = test_utils.get_service_list_request("services/service1-10GE")
560 self.assertEqual(response.status_code, requests.codes.ok)
561 res = response.json()
563 res['services'][0]['administrative-state'], 'inService')
565 res['services'][0]['service-name'], 'service1-10GE')
567 res['services'][0]['connection-type'], 'service')
569 res['services'][0]['lifecycle-state'], 'planned')
572 def test_31_check_interface_10GE_CLIENT_spdra(self):
573 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
574 self.assertEqual(response.status_code, requests.codes.ok)
575 res = response.json()
576 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
577 'administrative-state': 'inService',
578 'supporting-circuit-pack-name': 'CP1-SFP4',
579 'type': 'org-openroadm-interfaces:ethernetCsmacd',
580 'supporting-port': 'CP1-SFP4-P1'
582 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
584 self.assertDictEqual(
586 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
588 def test_32_check_interface_ODU2E_CLIENT_spdra(self):
589 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
590 self.assertEqual(response.status_code, requests.codes.ok)
591 res = response.json()
592 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
593 'administrative-state': 'inService',
594 'supporting-circuit-pack-name': 'CP1-SFP4',
595 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
596 'type': 'org-openroadm-interfaces:otnOdu',
597 'supporting-port': 'CP1-SFP4-P1'}
599 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
600 'rate': 'org-openroadm-otn-common-types:ODU2e',
601 'monitoring-mode': 'terminated'}
603 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
605 self.assertDictEqual(dict(input_dict_2,
606 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
607 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
608 self.assertDictEqual(
609 {u'payload-type': u'03', u'exp-payload-type': u'03'},
610 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
612 def test_33_check_interface_ODU2E_NETWORK_spdra(self):
613 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
614 self.assertEqual(response.status_code, requests.codes.ok)
615 res = response.json()
616 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
617 'administrative-state': 'inService',
618 'supporting-circuit-pack-name': 'CP1-CFP0',
619 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
620 'type': 'org-openroadm-interfaces:otnOdu',
621 'supporting-port': 'CP1-CFP0-P1'}
623 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
624 'rate': 'org-openroadm-otn-common-types:ODU2e',
625 'monitoring-mode': 'monitored'}
626 input_dict_3 = {'trib-port-number': 1}
628 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
630 self.assertDictEqual(dict(input_dict_2,
631 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
632 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
633 self.assertDictEqual(dict(input_dict_3,
634 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
635 'parent-odu-allocation']),
636 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
637 self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
640 def test_34_check_ODU2E_connection_spdra(self):
641 response = test_utils.check_netconf_node_request(
643 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
644 self.assertEqual(response.status_code, requests.codes.ok)
645 res = response.json()
648 'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
649 'direction': 'bidirectional'
652 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
653 res['odu-connection'][0])
654 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1-10GE'},
655 res['odu-connection'][0]['destination'])
656 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1-10GE'},
657 res['odu-connection'][0]['source'])
659 def test_35_check_interface_10GE_CLIENT_spdrc(self):
660 response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-CLIENT1-ETHERNET10G")
661 self.assertEqual(response.status_code, requests.codes.ok)
662 res = response.json()
663 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
664 'administrative-state': 'inService',
665 'supporting-circuit-pack-name': 'CP1-SFP4',
666 'type': 'org-openroadm-interfaces:ethernetCsmacd',
667 'supporting-port': 'CP1-SFP4-P1'
669 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
671 self.assertDictEqual(
673 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
675 def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
676 response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
677 self.assertEqual(response.status_code, requests.codes.ok)
678 res = response.json()
679 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
680 'administrative-state': 'inService',
681 'supporting-circuit-pack-name': 'CP1-SFP4',
682 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
683 'type': 'org-openroadm-interfaces:otnOdu',
684 'supporting-port': 'CP1-SFP4-P1'}
686 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
687 'rate': 'org-openroadm-otn-common-types:ODU2e',
688 'monitoring-mode': 'terminated'}
690 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
692 self.assertDictEqual(dict(input_dict_2,
693 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
694 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
695 self.assertDictEqual(
696 {u'payload-type': u'03', u'exp-payload-type': u'03'},
697 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
699 def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
700 response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
701 self.assertEqual(response.status_code, requests.codes.ok)
702 res = response.json()
703 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
704 'administrative-state': 'inService',
705 'supporting-circuit-pack-name': 'CP1-CFP0',
706 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
707 'type': 'org-openroadm-interfaces:otnOdu',
708 'supporting-port': 'CP1-CFP0-P1'}
710 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
711 'rate': 'org-openroadm-otn-common-types:ODU2e',
712 'monitoring-mode': 'monitored'}
714 input_dict_3 = {'trib-port-number': 1}
716 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
718 self.assertDictEqual(dict(input_dict_2,
719 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
720 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
721 self.assertDictEqual(dict(input_dict_3,
722 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
723 'parent-odu-allocation']),
724 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
725 'parent-odu-allocation'])
728 'org-openroadm-otn-odu-interfaces:odu'][
729 'parent-odu-allocation']['trib-slots'])
731 def test_38_check_ODU2E_connection_spdrc(self):
732 response = test_utils.check_netconf_node_request(
734 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
735 self.assertEqual(response.status_code, requests.codes.ok)
736 res = response.json()
739 'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
740 'direction': 'bidirectional'
743 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
744 res['odu-connection'][0])
745 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1-10GE'},
746 res['odu-connection'][0]['destination'])
747 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1-10GE'},
748 res['odu-connection'][0]['source'])
750 def test_39_check_otn_topo_links(self):
751 response = test_utils.get_otn_topo_request()
752 self.assertEqual(response.status_code, requests.codes.ok)
753 res = response.json()
754 nb_links = len(res['network'][0]['ietf-network-topology:link'])
755 self.assertEqual(nb_links, 4)
756 for link in res['network'][0]['ietf-network-topology:link']:
757 linkId = link['link-id']
758 if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
759 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
760 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
761 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
763 def test_40_check_otn_topo_tp(self):
764 response = test_utils.get_otn_topo_request()
765 res = response.json()
766 for node in res['network'][0]['node']:
767 if node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1':
768 tpList = node['ietf-network-topology:termination-point']
770 if tp['tp-id'] == 'XPDR1-NETWORK1':
771 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
772 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
773 tsPoolList = list(range(1, 9))
774 self.assertNotIn(tsPoolList, xpdrTpPortConAt['ts-pool'])
775 self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
776 self.assertNotIn(1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
778 def test_41_delete_10GE_service(self):
779 response = test_utils.service_delete_request("service1-10GE")
780 self.assertEqual(response.status_code, requests.codes.ok)
781 res = response.json()
782 self.assertIn('Renderer service delete in progress',
783 res['output']['configuration-response-common']['response-message'])
786 def test_42_check_service_list(self):
787 response = test_utils.get_service_list_request("")
788 self.assertEqual(response.status_code, requests.codes.ok)
789 res = response.json()
790 self.assertEqual(len(res['service-list']['services']), 2)
793 def test_43_check_no_ODU2e_connection_spdra(self):
794 response = test_utils.check_netconf_node_request("SPDR-SA1", "")
795 self.assertEqual(response.status_code, requests.codes.ok)
796 res = response.json()
797 self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
800 def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
801 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
802 self.assertEqual(response.status_code, requests.codes.conflict)
804 def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
805 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
806 self.assertEqual(response.status_code, requests.codes.conflict)
808 def test_46_check_no_interface_10GE_CLIENT_spdra(self):
809 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
810 self.assertEqual(response.status_code, requests.codes.conflict)
812 def test_47_check_otn_topo_links(self):
813 response = test_utils.get_otn_topo_request()
814 self.assertEqual(response.status_code, requests.codes.ok)
815 res = response.json()
816 nb_links = len(res['network'][0]['ietf-network-topology:link'])
817 self.assertEqual(nb_links, 4)
818 for link in res['network'][0]['ietf-network-topology:link']:
819 linkId = link['link-id']
820 if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
821 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
822 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
823 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
825 def test_48_check_otn_topo_tp(self):
826 response = test_utils.get_otn_topo_request()
827 res = response.json()
828 for node in res['network'][0]['node']:
829 if (node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1'):
830 tpList = node['ietf-network-topology:termination-point']
832 if tp['tp-id'] == 'XPDR1-NETWORK1':
833 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
834 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
835 self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
837 def test_49_delete_ODU4_service(self):
838 response = test_utils.service_delete_request("service1-ODU4")
839 self.assertEqual(response.status_code, requests.codes.ok)
840 res = response.json()
841 self.assertIn('Renderer service delete in progress',
842 res['output']['configuration-response-common']['response-message'])
845 def test_50_check_service_list(self):
846 response = test_utils.get_service_list_request("")
847 self.assertEqual(response.status_code, requests.codes.ok)
848 res = response.json()
849 self.assertEqual(len(res['service-list']['services']), 1)
852 def test_51_check_no_interface_ODU4_spdra(self):
853 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
854 self.assertEqual(response.status_code, requests.codes.conflict)
856 def test_52_check_otn_topo_links(self):
857 self.test_22_check_otn_topo_otu4_links()
859 def test_53_check_otn_topo_tp(self):
860 response = test_utils.get_otn_topo_request()
861 res = response.json()
862 for node in res['network'][0]['node']:
863 if node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1':
864 tpList = node['ietf-network-topology:termination-point']
866 if tp['tp-id'] == 'XPDR1-NETWORK1':
867 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
868 self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
869 self.assertNotIn('odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
871 def test_54_delete_OCH_OTU4_service(self):
872 response = test_utils.service_delete_request("service1-OCH-OTU4")
873 self.assertEqual(response.status_code, requests.codes.ok)
874 res = response.json()
875 self.assertIn('Renderer service delete in progress',
876 res['output']['configuration-response-common']['response-message'])
879 def test_55_get_no_service(self):
880 response = test_utils.get_service_list_request("")
881 self.assertEqual(response.status_code, requests.codes.conflict)
882 res = response.json()
884 {"error-type": "application", "error-tag": "data-missing",
885 "error-message": "Request could not be completed because the relevant data model content does not exist"},
886 res['errors']['error'])
889 def test_56_check_no_interface_OTU4_spdra(self):
890 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
891 self.assertEqual(response.status_code, requests.codes.conflict)
893 def test_57_check_no_interface_OCH_spdra(self):
894 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
895 self.assertEqual(response.status_code, requests.codes.conflict)
897 def test_58_getLinks_OtnTopology(self):
898 response = test_utils.get_otn_topo_request()
899 self.assertEqual(response.status_code, requests.codes.ok)
900 res = response.json()
901 self.assertNotIn('ietf-network-topology:link', res['network'][0])
903 def test_59_check_openroadm_topo_spdra(self):
904 response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
905 self.assertEqual(response.status_code, requests.codes.ok)
906 res = response.json()
907 tp = res['node'][0]['ietf-network-topology:termination-point'][0]
908 self.assertEqual('XPDR1-NETWORK1', tp['tp-id'])
909 self.assertNotIn('wavelength', dict.keys(
910 tp[u'org-openroadm-network-topology:xpdr-network-attributes']))
913 def test_60_check_openroadm_topo_ROADMA_SRG(self):
914 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
915 self.assertEqual(response.status_code, requests.codes.ok)
916 res = response.json()
917 freq_map = base64.b64decode(
918 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
919 freq_map_array = [int(x) for x in freq_map]
920 self.assertEqual(freq_map_array[0], 255, "Index 1 should be available")
921 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
923 if ele['tp-id'] == 'SRG1-PP1-TXRX':
924 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
927 def test_61_check_openroadm_topo_ROADMA_DEG(self):
928 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
929 self.assertEqual(response.status_code, requests.codes.ok)
930 res = response.json()
932 freq_map = base64.b64decode(
933 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
934 freq_map_array = [int(x) for x in freq_map]
935 self.assertEqual(freq_map_array[0], 255, "Index 1 should be available")
936 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
938 if ele['tp-id'] == 'DEG2-CTP-TXRX':
939 self.assertEqual(ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'],
941 if ele['tp-id'] == 'DEG2-TTP-TXRX':
942 self.assertNotIn('org-openroadm-network-topology:tx-ttp-attributes', dict.keys(ele))
945 def test_62_disconnect_spdrA(self):
946 response = test_utils.unmount_device("SPDR-SA1")
947 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
949 def test_63_disconnect_spdrC(self):
950 response = test_utils.unmount_device("SPDR-SC1")
951 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
953 def test_64_disconnect_roadmA(self):
954 response = test_utils.unmount_device("ROADM-A1")
955 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
957 def test_65_disconnect_roadmC(self):
958 response = test_utils.unmount_device("ROADM-C1")
959 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
962 if __name__ == "__main__":
963 unittest.main(verbosity=2)