3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
18 from common import test_utils
21 class TransportPCEtesting(unittest.TestCase):
# Pause handed to time.sleep() after each service-create request in this class.
24 WAITING = 20 # nominal value is 300
# Template payload for test_utils.service_create_request(). It is shared at
# class level and mutated in place by test_23 and test_29, so later tests
# depend on the edits made by earlier ones.
# NOTE(review): several structural lines of this literal (nested keys and
# closing braces) are not present in this view - confirm against the full file.
26 cr_serv_sample_data = {"input": {
27 "sdnc-request-header": {
28 "request-id": "request-1",
29 "rpc-action": "service-create",
30 "request-system-id": "appname"
32 "service-name": "service1-OCH-OTU4",
33 "common-id": "commonId",
34 "connection-type": "infrastructure",
36 "service-rate": "100",
37 "node-id": "SPDR-SA1",
38 "service-format": "OTU",
39 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
43 "committed-info-rate": "100000",
44 "committed-burst-size": "64"
49 "port-device-name": "SPDR-SA1-XPDR1",
51 "port-name": "XPDR1-NETWORK1",
52 "port-rack": "000000.00",
53 "port-shelf": "Chassis#1"
56 "lgx-device-name": "Some lgx-device-name",
57 "lgx-port-name": "Some lgx-port-name",
58 "lgx-port-rack": "000000.00",
59 "lgx-port-shelf": "00"
64 "port-device-name": "SPDR-SA1-XPDR1",
66 "port-name": "XPDR1-NETWORK1",
67 "port-rack": "000000.00",
68 "port-shelf": "Chassis#1"
71 "lgx-device-name": "Some lgx-device-name",
72 "lgx-port-name": "Some lgx-port-name",
73 "lgx-port-rack": "000000.00",
74 "lgx-port-shelf": "00"
80 "service-rate": "100",
81 "node-id": "SPDR-SC1",
82 "service-format": "OTU",
83 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
87 "committed-info-rate": "100000",
88 "committed-burst-size": "64"
93 "port-device-name": "SPDR-SC1-XPDR1",
95 "port-name": "XPDR1-NETWORK1",
96 "port-rack": "000000.00",
97 "port-shelf": "Chassis#1"
100 "lgx-device-name": "Some lgx-device-name",
101 "lgx-port-name": "Some lgx-port-name",
102 "lgx-port-rack": "000000.00",
103 "lgx-port-shelf": "00"
108 "port-device-name": "SPDR-SC1-XPDR1",
109 "port-type": "fixed",
110 "port-name": "XPDR1-NETWORK1",
111 "port-rack": "000000.00",
112 "port-shelf": "Chassis#1"
115 "lgx-device-name": "Some lgx-device-name",
116 "lgx-port-name": "Some lgx-port-name",
117 "lgx-port-rack": "000000.00",
118 "lgx-port-shelf": "00"
123 "due-date": "2018-06-15T00:00:01Z",
124 "operator-contact": "pw1234"
130 cls.processes = test_utils.start_tpce()
131 cls.processes = test_utils.start_sims(['spdra', 'roadma', 'roadmc', 'spdrc'])
def tearDownClass(cls):
    """Shut down every process recorded in ``cls.processes``."""
    # pylint: disable=not-an-iterable
    for proc in cls.processes:
        test_utils.shutdown_process(proc)
    print("all processes killed")
def test_01_connect_spdrA(self):
    """Mount device SPDR-SA1 (simulator 'spdra') and expect HTTP 201."""
    mount_reply = test_utils.mount_device("SPDR-SA1", 'spdra')
    self.assertEqual(mount_reply.status_code,
                     requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_02_connect_spdrC(self):
    """Mount device SPDR-SC1 (simulator 'spdrc') and expect HTTP 201."""
    mount_reply = test_utils.mount_device("SPDR-SC1", 'spdrc')
    self.assertEqual(mount_reply.status_code,
                     requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
    """Mount device ROADM-A1 (simulator 'roadma') and expect HTTP 201."""
    mount_reply = test_utils.mount_device("ROADM-A1", 'roadma')
    self.assertEqual(mount_reply.status_code,
                     requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
    """Mount device ROADM-C1 (simulator 'roadmc') and expect HTTP 201."""
    mount_reply = test_utils.mount_device("ROADM-C1", 'roadmc')
    self.assertEqual(mount_reply.status_code,
                     requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_05_connect_sprdA_N1_to_roadmA_PP1(self):
    """Request the xponder-to-ROADM link from SPDR-SA1 to ROADM-A1 SRG1-PP1-TXRX."""
    link_reply = test_utils.connect_xpdr_to_rdm_request(
        "SPDR-SA1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    outcome = link_reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_06_connect_roadmA_PP1_to_spdrA_N1(self):
    """Request the ROADM-to-xponder link from ROADM-A1 SRG1-PP1-TXRX to SPDR-SA1."""
    link_reply = test_utils.connect_rdm_to_xpdr_request(
        "SPDR-SA1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    outcome = link_reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
def test_07_connect_sprdC_N1_to_roadmC_PP1(self):
    """Request the xponder-to-ROADM link from SPDR-SC1 to ROADM-C1 SRG1-PP1-TXRX."""
    link_reply = test_utils.connect_xpdr_to_rdm_request(
        "SPDR-SC1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    outcome = link_reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_08_connect_roadmC_PP1_to_spdrC_N1(self):
    """Request the ROADM-to-xponder link from ROADM-C1 SRG1-PP1-TXRX to SPDR-SC1."""
    link_reply = test_utils.connect_rdm_to_xpdr_request(
        "SPDR-SC1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    outcome = link_reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
191 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
# Sends span attributes for the ROADM-A1 DEG2 -> ROADM-C1 DEG1 link through
# test_utils.add_oms_attr_request() and expects HTTP 201 (created).
# NOTE(review): the line that binds `data` and parts of the
# "link-concatenation" entry are not present in this view - confirm against
# the full file.
192 # Config ROADMA-ROADMC oms-attributes
194 "auto-spanloss": "true",
195 "spanloss-base": 11.4,
196 "spanloss-current": 12,
197 "engineered-spanloss": 12.2,
198 "link-concatenation": [{
201 "SRLG-length": 100000,
203 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
204 self.assertEqual(response.status_code, requests.codes.created)
206 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
# Sends span attributes for the ROADM-C1 DEG1 -> ROADM-A1 DEG2 link through
# test_utils.add_oms_attr_request() and expects HTTP 201 (created).
# NOTE(review): the line that binds `data` and parts of the
# "link-concatenation" entry are not present in this view - confirm against
# the full file.
207 # Config ROADMC-ROADMA oms-attributes
209 "auto-spanloss": "true",
210 "spanloss-base": 11.4,
211 "spanloss-current": 12,
212 "engineered-spanloss": 12.2,
213 "link-concatenation": [{
216 "SRLG-length": 100000,
218 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
219 self.assertEqual(response.status_code, requests.codes.created)
221 # test service-create for OCH-OTU4 service from spdr to spdr
def test_11_check_otn_topology(self):
    """Expect four nodes and no link list in the first OTN topology network."""
    topo_reply = test_utils.get_otn_topo_request()
    self.assertEqual(topo_reply.status_code, requests.codes.ok)
    otn_network = topo_reply.json()['network'][0]
    node_count = len(otn_network['node'])
    self.assertEqual(node_count, 4)
    self.assertNotIn('ietf-network-topology:link', otn_network)
def test_12_create_OCH_OTU4_service(self):
    """Submit the sample service-create payload, then pause for WAITING seconds."""
    create_reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(create_reply.status_code, requests.codes.ok)
    common = create_reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Wait before the following tests inspect the result of the request.
    time.sleep(self.WAITING)
def test_13_get_OCH_OTU4_service1(self):
    """Read back service1-OCH-OTU4 from the service list and check its state.

    Expects administrative-state 'inService', connection-type
    'infrastructure' and lifecycle-state 'planned'.
    """
    response = test_utils.get_service_list_request("services/service1-OCH-OTU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    service = response.json()['services'][0]
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service1-OCH-OTU4')
    self.assertEqual(service['connection-type'], 'infrastructure')
    self.assertEqual(service['lifecycle-state'], 'planned')
252 # Check correct configuration of devices
253 def test_14_check_interface_och_spdra(self):
# Fetches interface XPDR1-NETWORK1-1 from SPDR-SA1 and checks its general
# attributes, then compares the optical-channel container exactly.
# NOTE(review): the second argument of the first assertDictEqual is not
# present in this view. If it is res['interface'][0], as in the
# odu-connection checks of this file, dict(expected, **actual) lets the
# device values override the expected ones, so only key presence is verified.
254 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
255 self.assertEqual(response.status_code, requests.codes.ok)
256 res = response.json()
257 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-1',
258 'administrative-state': 'inService',
259 'supporting-circuit-pack-name': 'CP1-CFP0',
260 'type': 'org-openroadm-interfaces:opticalChannel',
261 'supporting-port': 'CP1-CFP0-P1'
262 }, **res['interface'][0]),
265 self.assertDictEqual(
266 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
267 u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
268 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
270 def test_15_check_interface_OTU4_spdra(self):
# Fetches interface XPDR1-NETWORK1-OTU from SPDR-SA1; input_dict_1 holds the
# expected general attributes, input_dict_2 the expected OTU container.
# NOTE(review): closing lines of both literals and the trailing arguments of
# the two assertDictEqual calls are not present in this view.
271 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
272 self.assertEqual(response.status_code, requests.codes.ok)
273 res = response.json()
274 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
275 'administrative-state': 'inService',
276 'supporting-circuit-pack-name': 'CP1-CFP0',
277 'supporting-interface': 'XPDR1-NETWORK1-1',
278 'type': 'org-openroadm-interfaces:otnOtu',
279 'supporting-port': 'CP1-CFP0-P1'
281 input_dict_2 = {'tx-sapi': 'Swfw02qXGyI=',
282 'expected-dapi': 'Swfw02qXGyI=',
283 'rate': 'org-openroadm-otn-common-types:OTU4',
286 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
289 self.assertDictEqual(input_dict_2,
291 ['org-openroadm-otn-otu-interfaces:otu'])
293 def test_16_check_interface_och_spdrc(self):
# Fetches interface XPDR1-NETWORK1-1 from SPDR-SC1 and checks its general
# attributes, then compares the optical-channel container exactly.
# NOTE(review): the second argument of the first assertDictEqual is not
# present in this view. If it is res['interface'][0], dict(expected, **actual)
# lets the device values win, so only key presence is verified.
294 response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-1")
295 self.assertEqual(response.status_code, requests.codes.ok)
296 res = response.json()
297 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-1',
298 'administrative-state': 'inService',
299 'supporting-circuit-pack-name': 'CP1-CFP0',
300 'type': 'org-openroadm-interfaces:opticalChannel',
301 'supporting-port': 'CP1-CFP0-P1'
302 }, **res['interface'][0]),
305 self.assertDictEqual(
306 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
307 u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
308 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
310 def test_17_check_interface_OTU4_spdrc(self):
# Fetches interface XPDR1-NETWORK1-OTU from SPDR-SC1; input_dict_1 holds the
# expected general attributes, input_dict_2 the expected OTU container
# including tx/expected SAPI and DAPI values.
# NOTE(review): closing lines of both literals and the trailing arguments of
# the two assertDictEqual calls are not present in this view.
311 response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-OTU")
312 self.assertEqual(response.status_code, requests.codes.ok)
313 res = response.json()
314 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
315 'administrative-state': 'inService',
316 'supporting-circuit-pack-name': 'CP1-CFP0',
317 'supporting-interface': 'XPDR1-NETWORK1-1',
318 'type': 'org-openroadm-interfaces:otnOtu',
319 'supporting-port': 'CP1-CFP0-P1'
321 input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
322 'expected-sapi': 'Swfw02qXGyI=',
323 'tx-sapi': 'fuYZwEO660g=',
324 'expected-dapi': 'fuYZwEO660g=',
325 'rate': 'org-openroadm-otn-common-types:OTU4',
329 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
332 self.assertDictEqual(input_dict_2,
334 ['org-openroadm-otn-otu-interfaces:otu'])
336 def test_18_check_no_interface_ODU4_spdra(self):
# Requests interface XPDR1-NETWORK1-ODU4 on SPDR-SA1 and expects HTTP 409
# (requests.codes.conflict) with a "data-missing" application error.
# NOTE(review): the opening of the assertion that receives the error dict and
# res['errors']['error'] is not present in this view.
337 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
338 self.assertEqual(response.status_code, requests.codes.conflict)
339 res = response.json()
341 {"error-type": "application", "error-tag": "data-missing",
342 "error-message": "Request could not be completed because the relevant data model content does not exist"},
343 res['errors']['error'])
345 def test_19_check_openroadm_topo_spdra(self):
# Reads node SPDR-SA1-XPDR1 from the OpenROADM topology and checks that its
# first termination point is XPDR1-NETWORK1 with the expected wavelength.
# NOTE(review): part of the expected wavelength dict is not present in this
# view.
346 response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
347 self.assertEqual(response.status_code, requests.codes.ok)
348 res = response.json()
# Only the first termination point in the list is inspected.
349 ele = res['node'][0]['ietf-network-topology:termination-point'][0]
350 self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
351 self.assertEqual({u'frequency': 196.1,
353 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
356 def test_20_check_openroadm_topo_ROADMA_SRG(self):
# Reads node ROADM-A1-SRG1 from the OpenROADM topology: wavelength index 1
# must be gone from the SRG's available wavelengths, SRG1-PP1-TXRX must list
# it as used, and SRG1-PP2-TXRX must carry no 'used-wavelength' key.
# NOTE(review): the loop header binding `ele` and part of the expected
# used-wavelength dict are not present in this view.
357 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
358 self.assertEqual(response.status_code, requests.codes.ok)
359 res = response.json()
360 self.assertNotIn({u'index': 1},
361 res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
362 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
364 if ele['tp-id'] == 'SRG1-PP1-TXRX':
365 self.assertIn({u'index': 1, u'frequency': 196.1,
367 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
368 if ele['tp-id'] == 'SRG1-PP2-TXRX':
# dict.keys(ele) is the unbound-method spelling of ele.keys().
369 self.assertNotIn('used-wavelength', dict.keys(ele))
372 def test_21_check_openroadm_topo_ROADMA_DEG(self):
# Reads node ROADM-A1-DEG2 from the OpenROADM topology: wavelength index 1
# must be gone from the degree's available wavelengths and be listed as used
# on the DEG2-CTP-TXRX and DEG2-TTP-TXRX termination points.
# NOTE(review): the loop header binding `ele` and parts of the expected
# used-wavelengths dicts are not present in this view.
373 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
374 self.assertEqual(response.status_code, requests.codes.ok)
375 res = response.json()
376 self.assertNotIn({u'index': 1},
377 res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
378 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
380 if ele['tp-id'] == 'DEG2-CTP-TXRX':
381 self.assertIn({u'index': 1, u'frequency': 196.1,
383 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
384 if ele['tp-id'] == 'DEG2-TTP-TXRX':
385 self.assertIn({u'index': 1, u'frequency': 196.1,
387 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
def test_22_check_otn_topo_otu4_links(self):
    """Expect exactly two OTU4 OTN links, unused and pointing at each other."""
    topo_reply = test_utils.get_otn_topo_request()
    self.assertEqual(topo_reply.status_code, requests.codes.ok)
    otn_links = topo_reply.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(otn_links), 2)
    expected_ids = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
                    'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
    for otn_link in otn_links:
        self.assertIn(otn_link['link-id'], expected_ids)
        self.assertEqual(otn_link['transportpce-topology:otn-link-type'], 'OTU4')
        self.assertEqual(otn_link['org-openroadm-common-network:link-type'], 'OTN-LINK')
        # No ODU4 service exists yet, so the whole bandwidth is still free.
        self.assertEqual(otn_link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
        self.assertEqual(otn_link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
        self.assertIn(otn_link['org-openroadm-common-network:opposite-link'], expected_ids)
406 # test service-create for ODU4 service from spdr to spdr
def test_23_create_ODU4_service(self):
    """Rework the shared payload into an ODU4 request and submit it.

    The class-level ``cr_serv_sample_data`` is edited in place: the service
    is renamed to service1-ODU4 and, on both ends, the OTU rate is replaced
    by an ODU4 rate.
    """
    payload = self.cr_serv_sample_data["input"]
    payload["service-name"] = "service1-ODU4"
    for end_key in ("service-a-end", "service-z-end"):
        endpoint = payload[end_key]
        endpoint["service-format"] = "ODU"
        del endpoint["otu-service-rate"]
        endpoint["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"

    create_reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(create_reply.status_code, requests.codes.ok)
    common = create_reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
def test_24_get_ODU4_service1(self):
    """Read back service1-ODU4 from the service list and check its state.

    Expects administrative-state 'inService', connection-type
    'infrastructure' and lifecycle-state 'planned'.
    """
    response = test_utils.get_service_list_request("services/service1-ODU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    service = response.json()['services'][0]
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service1-ODU4')
    self.assertEqual(service['connection-type'], 'infrastructure')
    self.assertEqual(service['lifecycle-state'], 'planned')
437 def test_25_check_interface_ODU4_spdra(self):
# Fetches interface XPDR1-NETWORK1-ODU4 from SPDR-SA1 and checks its general
# attributes, its ODU container and the OPU payload types ('21').
# NOTE(review): the trailing arguments of the first two assertDictEqual calls
# are not fully present in this view.
438 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
439 self.assertEqual(response.status_code, requests.codes.ok)
440 res = response.json()
441 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
442 'administrative-state': 'inService',
443 'supporting-circuit-pack-name': 'CP1-CFP0',
444 'supporting-interface': 'XPDR1-NETWORK1-OTU',
445 'type': 'org-openroadm-interfaces:otnOdu',
446 'supporting-port': 'CP1-CFP0-P1'}
447 # SAPI/DAPI are added in the Otu4 renderer
448 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
449 'rate': 'org-openroadm-otn-common-types:ODU4',
450 'expected-dapi': 'Swfw02qXGyI=',
451 'expected-sapi': 'fuYZwEO660g=',
452 'tx-dapi': 'fuYZwEO660g=',
453 'tx-sapi': 'Swfw02qXGyI='}
455 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
# Here the device's ODU container is the first dict() argument, the reverse
# of the input_dict_1 call above.
457 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
459 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
461 self.assertDictEqual(
462 {u'payload-type': u'21', u'exp-payload-type': u'21'},
463 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
465 def test_26_check_interface_ODU4_spdrc(self):
# Fetches interface XPDR1-NETWORK1-ODU4 from SPDR-SC1 and checks its general
# attributes, its ODU container and the OPU payload types ('21').
# NOTE(review): the closing of input_dict_2 and the trailing arguments of the
# first two assertDictEqual calls are not fully present in this view.
466 response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
467 self.assertEqual(response.status_code, requests.codes.ok)
468 res = response.json()
469 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
470 'administrative-state': 'inService',
471 'supporting-circuit-pack-name': 'CP1-CFP0',
472 'supporting-interface': 'XPDR1-NETWORK1-OTU',
473 'type': 'org-openroadm-interfaces:otnOdu',
474 'supporting-port': 'CP1-CFP0-P1'}
475 # SAPI/DAPI are added in the Otu4 renderer
476 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
477 'rate': 'org-openroadm-otn-common-types:ODU4',
478 'tx-sapi': 'fuYZwEO660g=',
479 'tx-dapi': 'Swfw02qXGyI=',
480 'expected-sapi': 'Swfw02qXGyI=',
481 'expected-dapi': 'fuYZwEO660g='
483 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
# Here the device's ODU container is the first dict() argument, the reverse
# of the input_dict_1 call above.
485 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
487 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
489 self.assertDictEqual(
490 {u'payload-type': u'21', u'exp-payload-type': u'21'},
491 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_27_check_otn_topo_links(self):
    """Check the four OTN links once the ODU4 service exists.

    The two OTU4 links must be fully consumed, the two ODU4 links must be
    fully available ODTU4 links pointing at each other, and any other
    link id fails the test.
    """
    otu4_ids = ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
                'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')
    odu4_ids = ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
                'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 4)
    for link in links:
        link_id = link['link-id']
        if link_id in otu4_ids:
            self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
            self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
        elif link_id in odu4_ids:
            self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
            self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
            self.assertEqual(link['transportpce-topology:otn-link-type'], 'ODTU4')
            self.assertEqual(link['org-openroadm-common-network:link-type'], 'OTN-LINK')
            self.assertIn(link['org-openroadm-common-network:opposite-link'], odu4_ids)
        else:
            # Only the four links named above are expected in the topology.
            self.fail("this link should not exist")
def test_28_check_otn_topo_tp(self):
    """Check the XPDR1-NETWORK1 pools of both SPDR XPDR1 nodes.

    With only the ODU4 service in place, each node is expected to offer 80
    entries in 'ts-pool' and 80 in 'tpn-pool', with an ODTU4.ts-Allocated
    odtu-type.
    """
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    for node in res['network'][0]['node']:
        # A membership test is required here: `x == 'a' or 'b'` is always
        # truthy and would let every node through.
        if node['node-id'] not in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR1-NETWORK1':
                continue
            port_attrs = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            self.assertEqual(len(port_attrs['ts-pool']), 80)
            self.assertEqual(len(port_attrs['odtu-tpn-pool'][0]['tpn-pool']), 80)
            self.assertEqual(port_attrs['odtu-tpn-pool'][0]['odtu-type'],
                             'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
531 # test service-create for 10GE service from spdr to spdr
def test_29_create_10GE_service(self):
    """Rework the shared payload into a 10GE request and submit it.

    The class-level ``cr_serv_sample_data`` is edited in place: the service
    becomes service1-10GE of connection-type 'service' and both ends switch
    to a rate-10 Ethernet format on port XPDR1-CLIENT1, dropping the ODU
    rate.
    """
    payload = self.cr_serv_sample_data["input"]
    payload["service-name"] = "service1-10GE"
    payload["connection-type"] = "service"
    for end_key in ("service-a-end", "service-z-end"):
        endpoint = payload[end_key]
        endpoint["service-rate"] = "10"
        endpoint["service-format"] = "Ethernet"
        del endpoint["odu-service-rate"]
        endpoint["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
        endpoint["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
    create_reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(create_reply.status_code, requests.codes.ok)
    common = create_reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
def test_30_get_10GE_service1(self):
    """Read back service1-10GE from the service list and check its state.

    Expects administrative-state 'inService', connection-type 'service'
    and lifecycle-state 'planned'.
    """
    response = test_utils.get_service_list_request("services/service1-10GE")
    self.assertEqual(response.status_code, requests.codes.ok)
    service = response.json()['services'][0]
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service1-10GE')
    self.assertEqual(service['connection-type'], 'service')
    self.assertEqual(service['lifecycle-state'], 'planned')
566 def test_31_check_interface_10GE_CLIENT_spdra(self):
# Fetches interface XPDR1-CLIENT1-ETHERNET10G from SPDR-SA1 and checks its
# general attributes and its ethernet container.
# NOTE(review): the closing of input_dict, the second argument of the first
# assertDictEqual and the expected ethernet dict are not present in this view.
567 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
568 self.assertEqual(response.status_code, requests.codes.ok)
569 res = response.json()
570 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
571 'administrative-state': 'inService',
572 'supporting-circuit-pack-name': 'CP1-SFP4',
573 'type': 'org-openroadm-interfaces:ethernetCsmacd',
574 'supporting-port': 'CP1-SFP4-P1'
576 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
578 self.assertDictEqual(
580 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
582 def test_32_check_interface_ODU2E_CLIENT_spdra(self):
# Fetches interface XPDR1-CLIENT1-ODU2e-service1-10GE from SPDR-SA1 and checks
# its general attributes, its ODU container and the OPU payload types ('03').
# NOTE(review): the line that binds input_dict_2 and the second argument of
# the first assertDictEqual are not present in this view.
583 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
584 self.assertEqual(response.status_code, requests.codes.ok)
585 res = response.json()
586 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
587 'administrative-state': 'inService',
588 'supporting-circuit-pack-name': 'CP1-SFP4',
589 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
590 'type': 'org-openroadm-interfaces:otnOdu',
591 'supporting-port': 'CP1-SFP4-P1'}
593 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
594 'rate': 'org-openroadm-otn-common-types:ODU2e',
595 'monitoring-mode': 'terminated'}
597 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
# NOTE(review): dict(expected, **actual) lets the device values override the
# expected ones, so this comparison only fails when an expected key is absent.
599 self.assertDictEqual(dict(input_dict_2,
600 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
601 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
602 self.assertDictEqual(
603 {u'payload-type': u'03', u'exp-payload-type': u'03'},
604 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
606 def test_33_check_interface_ODU2E_NETWORK_spdra(self):
# Fetches interface XPDR1-NETWORK1-ODU2e-service1-10GE from SPDR-SA1 and
# checks its general attributes, its ODU container and the
# parent-odu-allocation (trib-port-number 1).
# NOTE(review): the line that binds input_dict_2, the second argument of the
# first assertDictEqual and the end of the final assertIn are not present in
# this view.
607 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
608 self.assertEqual(response.status_code, requests.codes.ok)
609 res = response.json()
610 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
611 'administrative-state': 'inService',
612 'supporting-circuit-pack-name': 'CP1-CFP0',
613 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
614 'type': 'org-openroadm-interfaces:otnOdu',
615 'supporting-port': 'CP1-CFP0-P1'}
617 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
618 'rate': 'org-openroadm-otn-common-types:ODU2e',
619 'monitoring-mode': 'monitored'}
620 input_dict_3 = {'trib-port-number': 1}
622 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
# NOTE(review): dict(expected, **actual) lets the device values override the
# expected ones, so these comparisons only fail when an expected key is absent.
624 self.assertDictEqual(dict(input_dict_2,
625 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
626 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
627 self.assertDictEqual(dict(input_dict_3,
628 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
629 'parent-odu-allocation']),
630 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
631 self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
634 def test_34_check_ODU2E_connection_spdra(self):
# Fetches the client-to-network ODU2e odu-connection and checks that it is
# bidirectional, sourced from the CLIENT1 ODU2e interface and destined to the
# NETWORK1 ODU2e interface.
# NOTE(review): the node argument of the request and the opening of
# input_dict_1 are not present in this view.
635 response = test_utils.check_netconf_node_request(
637 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
638 self.assertEqual(response.status_code, requests.codes.ok)
639 res = response.json()
642 'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
643 'direction': 'bidirectional'
# NOTE(review): dict(expected, **actual) lets the device values override the
# expected ones, so this comparison only fails when an expected key is absent.
646 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
647 res['odu-connection'][0])
648 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1-10GE'},
649 res['odu-connection'][0]['destination'])
650 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1-10GE'},
651 res['odu-connection'][0]['source'])
653 def test_35_check_interface_10GE_CLIENT_spdrc(self):
# Fetches interface XPDR1-CLIENT1-ETHERNET10G from SPDR-SC1 and checks its
# general attributes and its ethernet container.
# NOTE(review): the closing of input_dict, the second argument of the first
# assertDictEqual and the expected ethernet dict are not present in this view.
654 response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-CLIENT1-ETHERNET10G")
655 self.assertEqual(response.status_code, requests.codes.ok)
656 res = response.json()
657 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
658 'administrative-state': 'inService',
659 'supporting-circuit-pack-name': 'CP1-SFP4',
660 'type': 'org-openroadm-interfaces:ethernetCsmacd',
661 'supporting-port': 'CP1-SFP4-P1'
663 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
665 self.assertDictEqual(
667 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
669 def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
# Fetches interface XPDR1-CLIENT1-ODU2e-service1-10GE from SPDR-SC1 and checks
# its general attributes, its ODU container and the OPU payload types ('03').
# NOTE(review): the line that binds input_dict_2 and the second argument of
# the first assertDictEqual are not present in this view.
670 response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
671 self.assertEqual(response.status_code, requests.codes.ok)
672 res = response.json()
673 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
674 'administrative-state': 'inService',
675 'supporting-circuit-pack-name': 'CP1-SFP4',
676 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
677 'type': 'org-openroadm-interfaces:otnOdu',
678 'supporting-port': 'CP1-SFP4-P1'}
680 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
681 'rate': 'org-openroadm-otn-common-types:ODU2e',
682 'monitoring-mode': 'terminated'}
684 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
# NOTE(review): dict(expected, **actual) lets the device values override the
# expected ones, so this comparison only fails when an expected key is absent.
686 self.assertDictEqual(dict(input_dict_2,
687 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
688 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
689 self.assertDictEqual(
690 {u'payload-type': u'03', u'exp-payload-type': u'03'},
691 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
693 def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
# Fetches interface XPDR1-NETWORK1-ODU2e-service1-10GE from SPDR-SC1 and
# checks its general attributes, its ODU container and the
# parent-odu-allocation (trib-port-number 1, trib-slots).
# NOTE(review): the line that binds input_dict_2, the second argument of the
# first assertDictEqual and the opening of the final trib-slots assertion are
# not present in this view.
694 response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
695 self.assertEqual(response.status_code, requests.codes.ok)
696 res = response.json()
697 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
698 'administrative-state': 'inService',
699 'supporting-circuit-pack-name': 'CP1-CFP0',
700 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
701 'type': 'org-openroadm-interfaces:otnOdu',
702 'supporting-port': 'CP1-CFP0-P1'}
704 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
705 'rate': 'org-openroadm-otn-common-types:ODU2e',
706 'monitoring-mode': 'monitored'}
708 input_dict_3 = {'trib-port-number': 1}
710 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
# NOTE(review): dict(expected, **actual) lets the device values override the
# expected ones, so these comparisons only fail when an expected key is absent.
712 self.assertDictEqual(dict(input_dict_2,
713 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
714 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
715 self.assertDictEqual(dict(input_dict_3,
716 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
717 'parent-odu-allocation']),
718 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
719 'parent-odu-allocation'])
722 'org-openroadm-otn-odu-interfaces:odu'][
723 'parent-odu-allocation']['trib-slots'])
725 def test_38_check_ODU2E_connection_spdrc(self):
# Fetches the client-to-network ODU2e odu-connection and checks that it is
# bidirectional, sourced from the CLIENT1 ODU2e interface and destined to the
# NETWORK1 ODU2e interface.
# NOTE(review): the node argument of the request and the opening of
# input_dict_1 are not present in this view.
726 response = test_utils.check_netconf_node_request(
728 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
729 self.assertEqual(response.status_code, requests.codes.ok)
730 res = response.json()
733 'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
734 'direction': 'bidirectional'
# NOTE(review): dict(expected, **actual) lets the device values override the
# expected ones, so this comparison only fails when an expected key is absent.
737 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
738 res['odu-connection'][0])
739 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1-10GE'},
740 res['odu-connection'][0]['destination'])
741 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1-10GE'},
742 res['odu-connection'][0]['source'])
def test_39_check_otn_topo_links(self):
    """Expect 10000 of the ODU4 links' bandwidth used while service1-10GE exists."""
    odu4_ids = ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
                'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')
    topo_reply = test_utils.get_otn_topo_request()
    self.assertEqual(topo_reply.status_code, requests.codes.ok)
    otn_links = topo_reply.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(otn_links), 4)
    for otn_link in otn_links:
        if otn_link['link-id'] not in odu4_ids:
            continue
        self.assertEqual(otn_link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
        self.assertEqual(otn_link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
def test_40_check_otn_topo_tp(self):
    """Check the XPDR1-NETWORK1 pools while service1-10GE is in place.

    Each SPDR XPDR1 node is expected to have 72 entries left in 'ts-pool',
    none of them in 1..8, and 79 entries left in 'tpn-pool', without 1.
    """
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    for node in res['network'][0]['node']:
        # A membership test is required here: `x == 'a' or 'b'` is always
        # truthy and would let every node through.
        if node['node-id'] not in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR1-NETWORK1':
                continue
            port_attrs = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            self.assertEqual(len(port_attrs['ts-pool']), 72)
            # Check each slot on its own: asserting that the list [1..8] is
            # not an element of the pool can never fail.
            for slot in range(1, 9):
                self.assertNotIn(slot, port_attrs['ts-pool'])
            self.assertEqual(len(port_attrs['odtu-tpn-pool'][0]['tpn-pool']), 79)
            self.assertNotIn(1, port_attrs['odtu-tpn-pool'][0]['tpn-pool'])
def test_41_delete_10GE_service(self):
    """Request deletion of service1-10GE and expect the in-progress message."""
    delete_reply = test_utils.service_delete_request("service1-10GE")
    self.assertEqual(delete_reply.status_code, requests.codes.ok)
    common = delete_reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
def test_42_check_service_list(self):
    """Expect two services to remain in the service list."""
    list_reply = test_utils.get_service_list_request("")
    self.assertEqual(list_reply.status_code, requests.codes.ok)
    remaining = list_reply.json()['service-list']['services']
    self.assertEqual(len(remaining), 2)
def test_43_check_no_ODU2e_connection_spdra(self):
    """Expect no 'odu-connection' key in the SPDR-SA1 device data."""
    device_reply = test_utils.check_netconf_node_request("SPDR-SA1", "")
    self.assertEqual(device_reply.status_code, requests.codes.ok)
    device_data = device_reply.json()['org-openroadm-device']
    self.assertNotIn('odu-connection', device_data)
def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
    """Expect the network-side ODU2e interface of service1-10GE to be gone.

    The name queried is the one test_33 found on the device
    ('XPDR1-NETWORK1-ODU2e-service1-10GE'). Querying the shorter
    '...-service1' name would answer 409 whether or not the delete worked.
    """
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
    self.assertEqual(response.status_code, requests.codes.conflict)
def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
    """Expect the client-side ODU2e interface of service1-10GE to be gone.

    The name queried is the one test_32 found on the device
    ('XPDR1-CLIENT1-ODU2e-service1-10GE'). Querying the shorter
    '...-service1' name would answer 409 whether or not the delete worked.
    """
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
    self.assertEqual(response.status_code, requests.codes.conflict)
def test_46_check_no_interface_10GE_CLIENT_spdra(self):
    """Check that reading the client 10GE Ethernet interface on SPDR-SA1 returns a conflict status."""
    node_id = "SPDR-SA1"
    resource = "interface/XPDR1-CLIENT1-ETHERNET10G"
    reply = test_utils.check_netconf_node_request(node_id, resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_47_check_otn_topo_links(self):
    """Check the OTN topology link count and the bandwidth of both ODU4 links.

    The topology must expose 4 links; each of the two ODU4 links between
    SPDR-SA1-XPDR1 and SPDR-SC1-XPDR1 must report 100000 available and
    0 used bandwidth.
    """
    reply = test_utils.get_otn_topo_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    links = reply.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 4)
    odu4_link_ids = (
        'ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
        'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1',
    )
    for link in links:
        if link['link-id'] not in odu4_link_ids:
            continue
        self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
        self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
def test_48_check_otn_topo_tp(self):
    """Check the XPDR1-NETWORK1 pools on both SPDR XPDR1 nodes of the OTN topology.

    On nodes SPDR-SA1-XPDR1 and SPDR-SC1-XPDR1, the XPDR1-NETWORK1
    termination point must hold 80 entries in 'ts-pool' and 80 entries in
    the 'tpn-pool' of its first 'odtu-tpn-pool' element.
    """
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    for node in response.json()['network'][0]['node']:
        # Membership test: `x == 'A' or 'B'` is always true because the
        # non-empty string 'B' is truthy on its own.
        if node['node-id'] not in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR1-NETWORK1':
                continue
            port_attrs = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            self.assertEqual(len(port_attrs['ts-pool']), 80)
            self.assertEqual(len(port_attrs['odtu-tpn-pool'][0]['tpn-pool']), 80)
def test_49_delete_ODU4_service(self):
    """Request deletion of service1-ODU4 and check the renderer acknowledges it."""
    reply = test_utils.service_delete_request("service1-ODU4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    # NOTE(review): the response only reports the delete as "in progress";
    # confirm the follow-up checks leave enough time for it to complete.
    self.assertIn('Renderer service delete in progress', common['response-message'])
def test_50_check_service_list(self):
    """Check that the service list now holds exactly one service."""
    reply = test_utils.get_service_list_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    services = reply.json()['service-list']['services']
    self.assertEqual(len(services), 1)
def test_51_check_no_interface_ODU4_spdra(self):
    """Check that reading the XPDR1-NETWORK1-ODU4 interface on SPDR-SA1 returns a conflict status."""
    node_id = "SPDR-SA1"
    resource = "interface/XPDR1-NETWORK1-ODU4"
    reply = test_utils.check_netconf_node_request(node_id, resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_52_check_otn_topo_links(self):
    """Re-run the link checks implemented by test_22_check_otn_topo_otu4_links."""
    self.test_22_check_otn_topo_otu4_links()
def test_53_check_otn_topo_tp(self):
    """Check XPDR1-NETWORK1 exposes no pools on both SPDR XPDR1 nodes.

    On nodes SPDR-SA1-XPDR1 and SPDR-SC1-XPDR1 of the OTN topology, the
    XPDR1-NETWORK1 termination point attributes must contain neither
    'ts-pool' nor 'odtu-tpn-pool'.
    """
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    for node in response.json()['network'][0]['node']:
        # Membership test: `x == 'A' or 'B'` is always true because the
        # non-empty string 'B' is truthy on its own.
        if node['node-id'] not in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR1-NETWORK1':
                continue
            port_attrs = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            self.assertNotIn('ts-pool', port_attrs)
            self.assertNotIn('odtu-tpn-pool', port_attrs)
def test_54_delete_OCH_OTU4_service(self):
    """Request deletion of service1-OCH-OTU4 and check the renderer acknowledges it."""
    reply = test_utils.service_delete_request("service1-OCH-OTU4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    # NOTE(review): the response only reports the delete as "in progress";
    # confirm the follow-up checks leave enough time for it to complete.
    self.assertIn('Renderer service delete in progress', common['response-message'])
def test_55_get_no_service(self):
    """Check that listing services now returns a conflict status with a 'data-missing' error."""
    response = test_utils.get_service_list_request("")
    self.assertEqual(response.status_code, requests.codes.conflict)
    expected_error = {
        "error-type": "application",
        "error-tag": "data-missing",
        "error-message": "Request could not be completed because the relevant data model content does not exist",
    }
    # The error payload is a list of error entries; the expected one must be among them.
    self.assertIn(expected_error, response.json()['errors']['error'])
def test_56_check_no_interface_OTU4_spdra(self):
    """Check that reading the XPDR1-NETWORK1-OTU interface on SPDR-SA1 returns a conflict status."""
    node_id = "SPDR-SA1"
    resource = "interface/XPDR1-NETWORK1-OTU"
    reply = test_utils.check_netconf_node_request(node_id, resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_57_check_no_interface_OCH_spdra(self):
    """Check that reading the XPDR1-NETWORK1-1 interface on SPDR-SA1 returns a conflict status."""
    node_id = "SPDR-SA1"
    resource = "interface/XPDR1-NETWORK1-1"
    reply = test_utils.check_netconf_node_request(node_id, resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_58_getLinks_OtnTopology(self):
    """Check that the OTN topology carries no 'ietf-network-topology:link' entry."""
    reply = test_utils.get_otn_topo_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    otn_network = reply.json()['network'][0]
    self.assertNotIn('ietf-network-topology:link', otn_network)
def test_59_check_openroadm_topo_spdra(self):
    """Check that XPDR1-NETWORK1 of SPDR-SA1-XPDR1 carries no 'wavelength' attribute.

    The first termination point of the node in the OpenROADM topology must
    be XPDR1-NETWORK1, and its xpdr-network-attributes must not contain a
    'wavelength' key.
    """
    response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
    self.assertEqual(response.status_code, requests.codes.ok)
    tp = response.json()['node'][0]['ietf-network-topology:termination-point'][0]
    self.assertEqual('XPDR1-NETWORK1', tp['tp-id'])
    self.assertNotIn('wavelength', tp['org-openroadm-network-topology:xpdr-network-attributes'])
def test_60_check_openroadm_topo_ROADMA_SRG(self):
    """Check wavelength index 1 is available on ROADM-A1-SRG1 and SRG1-PP1-TXRX has no pp-attributes."""
    response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
    self.assertEqual(response.status_code, requests.codes.ok)
    node = response.json()['node'][0]
    self.assertIn({'index': 1},
                  node['org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
    for tp in node['ietf-network-topology:termination-point']:
        if tp['tp-id'] == 'SRG1-PP1-TXRX':
            self.assertNotIn('org-openroadm-network-topology:pp-attributes', tp)
def test_61_check_openroadm_topo_ROADMA_DEG(self):
    """Check wavelength index 1 is available on ROADM-A1-DEG2 and its CTP/TTP carry no attributes.

    DEG2-CTP-TXRX must not contain ctp-attributes and DEG2-TTP-TXRX must
    not contain tx-ttp-attributes.
    """
    response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
    self.assertEqual(response.status_code, requests.codes.ok)
    node = response.json()['node'][0]
    self.assertIn({'index': 1},
                  node['org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
    for tp in node['ietf-network-topology:termination-point']:
        if tp['tp-id'] == 'DEG2-CTP-TXRX':
            self.assertNotIn('org-openroadm-network-topology:ctp-attributes', tp)
        if tp['tp-id'] == 'DEG2-TTP-TXRX':
            self.assertNotIn('org-openroadm-network-topology:tx-ttp-attributes', tp)
def test_62_disconnect_spdrA(self):
    """Unmount device SPDR-SA1 and check the request succeeds."""
    reply = test_utils.unmount_device("SPDR-SA1")
    self.assertEqual(reply.status_code, requests.codes.ok,
                     msg=test_utils.CODE_SHOULD_BE_200)
def test_63_disconnect_spdrC(self):
    """Unmount device SPDR-SC1 and check the request succeeds."""
    reply = test_utils.unmount_device("SPDR-SC1")
    self.assertEqual(reply.status_code, requests.codes.ok,
                     msg=test_utils.CODE_SHOULD_BE_200)
def test_64_disconnect_roadmA(self):
    """Unmount device ROADM-A1 and check the request succeeds."""
    reply = test_utils.unmount_device("ROADM-A1")
    self.assertEqual(reply.status_code, requests.codes.ok,
                     msg=test_utils.CODE_SHOULD_BE_200)
def test_65_disconnect_roadmC(self):
    """Unmount device ROADM-C1 and check the request succeeds."""
    reply = test_utils.unmount_device("ROADM-C1")
    self.assertEqual(reply.status_code, requests.codes.ok,
                     msg=test_utils.CODE_SHOULD_BE_200)
# Script entry point: run every test in this module with verbose output.
if __name__ == "__main__":
    unittest.main(verbosity=2)