3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
15 from common import test_utils
18 class TransportPCEtesting(unittest.TestCase):
21 WAITING = 20 # nominal value is 300
23 cr_serv_sample_data = {"input": {
24 "sdnc-request-header": {
25 "request-id": "request-1",
26 "rpc-action": "service-create",
27 "request-system-id": "appname"
29 "service-name": "service1-OCH-OTU4",
30 "common-id": "commonId",
31 "connection-type": "infrastructure",
33 "service-rate": "100",
34 "node-id": "SPDR-SA1",
35 "service-format": "OTU",
36 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
40 "committed-info-rate": "100000",
41 "committed-burst-size": "64"
46 "port-device-name": "SPDR-SA1-XPDR1",
48 "port-name": "XPDR1-NETWORK1",
49 "port-rack": "000000.00",
50 "port-shelf": "Chassis#1"
53 "lgx-device-name": "Some lgx-device-name",
54 "lgx-port-name": "Some lgx-port-name",
55 "lgx-port-rack": "000000.00",
56 "lgx-port-shelf": "00"
61 "port-device-name": "SPDR-SA1-XPDR1",
63 "port-name": "XPDR1-NETWORK1",
64 "port-rack": "000000.00",
65 "port-shelf": "Chassis#1"
68 "lgx-device-name": "Some lgx-device-name",
69 "lgx-port-name": "Some lgx-port-name",
70 "lgx-port-rack": "000000.00",
71 "lgx-port-shelf": "00"
77 "service-rate": "100",
78 "node-id": "SPDR-SC1",
79 "service-format": "OTU",
80 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
84 "committed-info-rate": "100000",
85 "committed-burst-size": "64"
90 "port-device-name": "SPDR-SC1-XPDR1",
92 "port-name": "XPDR1-NETWORK1",
93 "port-rack": "000000.00",
94 "port-shelf": "Chassis#1"
97 "lgx-device-name": "Some lgx-device-name",
98 "lgx-port-name": "Some lgx-port-name",
99 "lgx-port-rack": "000000.00",
100 "lgx-port-shelf": "00"
105 "port-device-name": "SPDR-SC1-XPDR1",
106 "port-type": "fixed",
107 "port-name": "XPDR1-NETWORK1",
108 "port-rack": "000000.00",
109 "port-shelf": "Chassis#1"
112 "lgx-device-name": "Some lgx-device-name",
113 "lgx-port-name": "Some lgx-port-name",
114 "lgx-port-rack": "000000.00",
115 "lgx-port-shelf": "00"
120 "due-date": "2018-06-15T00:00:01Z",
121 "operator-contact": "pw1234"
129 cls.processes = test_utils.start_tpce()
130 cls.processes = test_utils.start_sims(['spdra', 'roadma', 'roadmc', 'spdrc'])
def tearDownClass(cls):
    """Shut down every process held in ``cls.processes`` and report completion."""
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
def test_01_connect_spdrA(self):
    """Mount device SPDR-SA1 (sim key 'spdra') and expect the `created` status."""
    mount_reply = test_utils.mount_device("SPDR-SA1", 'spdra')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_02_connect_spdrC(self):
    """Mount device SPDR-SC1 (sim key 'spdrc') and expect the `created` status."""
    mount_reply = test_utils.mount_device("SPDR-SC1", 'spdrc')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_03_connect_rdmA(self):
    """Mount device ROADM-A1 (sim key 'roadma') and expect the `created` status."""
    mount_reply = test_utils.mount_device("ROADM-A1", 'roadma')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_04_connect_rdmC(self):
    """Mount device ROADM-C1 (sim key 'roadmc') and expect the `created` status."""
    mount_reply = test_utils.mount_device("ROADM-C1", 'roadmc')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_05_connect_sprdA_N1_to_roadmA_PP1(self):
    """Request the SPDR-SA1 to ROADM-A1 (SRG1-PP1-TXRX) link and check the result text."""
    link_reply = test_utils.connect_xpdr_to_rdm_request(
        "SPDR-SA1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    payload = link_reply.json()
    result_text = payload["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_06_connect_roadmA_PP1_to_spdrA_N1(self):
    """Request the ROADM-A1 (SRG1-PP1-TXRX) to SPDR-SA1 link and check the result text."""
    link_reply = test_utils.connect_rdm_to_xpdr_request(
        "SPDR-SA1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    payload = link_reply.json()
    result_text = payload["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
def test_07_connect_sprdC_N1_to_roadmC_PP1(self):
    """Request the SPDR-SC1 to ROADM-C1 (SRG1-PP1-TXRX) link and check the result text."""
    link_reply = test_utils.connect_xpdr_to_rdm_request(
        "SPDR-SC1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    payload = link_reply.json()
    result_text = payload["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_08_connect_roadmC_PP1_to_spdrC_N1(self):
    """Request the ROADM-C1 (SRG1-PP1-TXRX) to SPDR-SC1 link and check the result text."""
    link_reply = test_utils.connect_rdm_to_xpdr_request(
        "SPDR-SC1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    payload = link_reply.json()
    result_text = payload["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
189 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
190 # Config ROADMA-ROADMC oms-attributes
192 "auto-spanloss": "true",
193 "spanloss-base": 11.4,
194 "spanloss-current": 12,
195 "engineered-spanloss": 12.2,
196 "link-concatenation": [{
199 "SRLG-length": 100000,
201 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
202 self.assertEqual(response.status_code, requests.codes.created)
204 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
205 # Config ROADMC-ROADMA oms-attributes
207 "auto-spanloss": "true",
208 "spanloss-base": 11.4,
209 "spanloss-current": 12,
210 "engineered-spanloss": 12.2,
211 "link-concatenation": [{
214 "SRLG-length": 100000,
216 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
217 self.assertEqual(response.status_code, requests.codes.created)
219 # test service-create for OCH-OTU4 service from spdr to spdr
def test_11_check_otn_topology(self):
    """Check that the OTN topology lists 4 nodes and carries no link entry."""
    topo_reply = test_utils.get_otn_topo_request()
    self.assertEqual(topo_reply.status_code, requests.codes.ok)
    otn_network = topo_reply.json()['network'][0]
    node_count = len(otn_network['node'])
    self.assertEqual(node_count, 4)
    self.assertNotIn('ietf-network-topology:link', otn_network)
def test_12_create_OCH_OTU4_service(self):
    """Submit the service-create request built from ``cr_serv_sample_data``.

    The reply must announce that PCE calculation is in progress; the test
    then sleeps ``WAITING`` seconds before returning.
    """
    create_reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(create_reply.status_code, requests.codes.ok)
    body = create_reply.json()
    common = body['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
237 def test_13_get_OCH_OTU4_service1(self):
238 response = test_utils.get_service_list_request("services/service1-OCH-OTU4")
239 self.assertEqual(response.status_code, requests.codes.ok)
240 res = response.json()
242 res['services'][0]['administrative-state'], 'inService')
244 res['services'][0]['service-name'], 'service1-OCH-OTU4')
246 res['services'][0]['connection-type'], 'infrastructure')
248 res['services'][0]['lifecycle-state'], 'planned')
251 # Check correct configuration of devices
252 def test_14_check_interface_och_spdra(self):
253 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
254 self.assertEqual(response.status_code, requests.codes.ok)
255 res = response.json()
256 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-1',
257 'administrative-state': 'inService',
258 'supporting-circuit-pack-name': 'CP1-CFP0',
259 'type': 'org-openroadm-interfaces:opticalChannel',
260 'supporting-port': 'CP1-CFP0-P1'
261 }, **res['interface'][0]),
264 self.assertDictEqual(
265 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
266 u'transmit-power': -5},
267 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
269 def test_15_check_interface_OTU4_spdra(self):
270 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
271 self.assertEqual(response.status_code, requests.codes.ok)
272 res = response.json()
273 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
274 'administrative-state': 'inService',
275 'supporting-circuit-pack-name': 'CP1-CFP0',
276 'supporting-interface': 'XPDR1-NETWORK1-1',
277 'type': 'org-openroadm-interfaces:otnOtu',
278 'supporting-port': 'CP1-CFP0-P1'
280 input_dict_2 = {'tx-sapi': 'Swfw02qXGyI=',
281 'expected-dapi': 'Swfw02qXGyI=',
282 'rate': 'org-openroadm-otn-common-types:OTU4',
285 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
288 self.assertDictEqual(input_dict_2,
290 ['org-openroadm-otn-otu-interfaces:otu'])
292 def test_16_check_interface_och_spdrc(self):
293 response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-1")
294 self.assertEqual(response.status_code, requests.codes.ok)
295 res = response.json()
296 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-1',
297 'administrative-state': 'inService',
298 'supporting-circuit-pack-name': 'CP1-CFP0',
299 'type': 'org-openroadm-interfaces:opticalChannel',
300 'supporting-port': 'CP1-CFP0-P1'
301 }, **res['interface'][0]),
304 self.assertDictEqual(
305 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
306 u'transmit-power': -5},
307 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
309 def test_17_check_interface_OTU4_spdrc(self):
310 response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-OTU")
311 self.assertEqual(response.status_code, requests.codes.ok)
312 res = response.json()
313 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
314 'administrative-state': 'inService',
315 'supporting-circuit-pack-name': 'CP1-CFP0',
316 'supporting-interface': 'XPDR1-NETWORK1-1',
317 'type': 'org-openroadm-interfaces:otnOtu',
318 'supporting-port': 'CP1-CFP0-P1'
320 input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
321 'expected-sapi': 'Swfw02qXGyI=',
322 'tx-sapi': 'fuYZwEO660g=',
323 'expected-dapi': 'fuYZwEO660g=',
324 'rate': 'org-openroadm-otn-common-types:OTU4',
328 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
331 self.assertDictEqual(input_dict_2,
333 ['org-openroadm-otn-otu-interfaces:otu'])
335 def test_18_check_no_interface_ODU4_spdra(self):
336 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
337 self.assertEqual(response.status_code, requests.codes.not_found)
338 res = response.json()
340 {"error-type": "application", "error-tag": "data-missing",
341 "error-message": "Request could not be completed because the relevant data model content does not exist"},
342 res['errors']['error'])
344 def test_19_check_openroadm_topo_spdra(self):
345 response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
346 self.assertEqual(response.status_code, requests.codes.ok)
347 res = response.json()
348 ele = res['node'][0]['ietf-network-topology:termination-point'][0]
349 self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
350 self.assertEqual({u'frequency': 196.1,
352 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
355 def test_20_check_openroadm_topo_ROADMA_SRG(self):
356 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
357 self.assertEqual(response.status_code, requests.codes.ok)
358 res = response.json()
359 self.assertNotIn({u'index': 1},
360 res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
361 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
363 if ele['tp-id'] == 'SRG1-PP1-TXRX':
364 self.assertIn({u'index': 1, u'frequency': 196.1,
366 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
367 if ele['tp-id'] == 'SRG1-PP2-TXRX':
368 self.assertNotIn('used-wavelength', dict.keys(ele))
371 def test_21_check_openroadm_topo_ROADMA_DEG(self):
372 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
373 self.assertEqual(response.status_code, requests.codes.ok)
374 res = response.json()
375 self.assertNotIn({u'index': 1},
376 res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
377 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
379 if ele['tp-id'] == 'DEG2-CTP-TXRX':
380 self.assertIn({u'index': 1, u'frequency': 196.1,
382 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
383 if ele['tp-id'] == 'DEG2-TTP-TXRX':
384 self.assertIn({u'index': 1, u'frequency': 196.1,
386 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
def test_22_check_otn_topo_otu4_links(self):
    """Check that the OTN topology holds exactly the two OTU4 links, both unused.

    Each link must be one of the two expected SPDR-SA1 <-> SPDR-SC1 ids, be an
    OTU4 OTN-LINK with 100000 available / 0 used bandwidth, and point to an
    opposite link from the same pair.
    """
    topo_reply = test_utils.get_otn_topo_request()
    self.assertEqual(topo_reply.status_code, requests.codes.ok)
    topo = topo_reply.json()
    self.assertEqual(len(topo['network'][0]['ietf-network-topology:link']), 2)
    expected_ids = [
        'OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
        'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1',
    ]
    for otn_link in topo['network'][0]['ietf-network-topology:link']:
        self.assertIn(otn_link['link-id'], expected_ids)
        self.assertEqual(otn_link['transportpce-topology:otn-link-type'], 'OTU4')
        self.assertEqual(otn_link['org-openroadm-common-network:link-type'], 'OTN-LINK')
        self.assertEqual(
            otn_link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
        self.assertEqual(
            otn_link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
        self.assertIn(otn_link['org-openroadm-common-network:opposite-link'], expected_ids)
405 # test service-create for ODU4 service from spdr to spdr
def test_23_create_ODU4_service(self):
    """Turn the shared request into an ODU4 service-create and submit it.

    NOTE: ``cr_serv_sample_data`` is edited in place, so the changes remain
    visible to whatever reads it afterwards.
    """
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service1-ODU4"
    # Apply the same OTU -> ODU switch to the A end, then the Z end.
    for end_key in ("service-a-end", "service-z-end"):
        end_point = request_input[end_key]
        end_point["service-format"] = "ODU"
        del end_point["otu-service-rate"]
        end_point["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"

    create_reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(create_reply.status_code, requests.codes.ok)
    common = create_reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
422 def test_24_get_ODU4_service1(self):
423 response = test_utils.get_service_list_request("services/service1-ODU4")
424 self.assertEqual(response.status_code, requests.codes.ok)
425 res = response.json()
427 res['services'][0]['administrative-state'], 'inService')
429 res['services'][0]['service-name'], 'service1-ODU4')
431 res['services'][0]['connection-type'], 'infrastructure')
433 res['services'][0]['lifecycle-state'], 'planned')
436 def test_25_check_interface_ODU4_spdra(self):
437 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
438 self.assertEqual(response.status_code, requests.codes.ok)
439 res = response.json()
440 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
441 'administrative-state': 'inService',
442 'supporting-circuit-pack-name': 'CP1-CFP0',
443 'supporting-interface': 'XPDR1-NETWORK1-OTU',
444 'type': 'org-openroadm-interfaces:otnOdu',
445 'supporting-port': 'CP1-CFP0-P1'}
446 # SAPI/DAPI are added in the Otu4 renderer
447 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
448 'rate': 'org-openroadm-otn-common-types:ODU4',
449 'expected-dapi': 'Swfw02qXGyI=',
450 'expected-sapi': 'fuYZwEO660g=',
451 'tx-dapi': 'fuYZwEO660g=',
452 'tx-sapi': 'Swfw02qXGyI='}
454 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
456 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
458 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
460 self.assertDictEqual(
461 {u'payload-type': u'21', u'exp-payload-type': u'21'},
462 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
464 def test_26_check_interface_ODU4_spdrc(self):
465 response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
466 self.assertEqual(response.status_code, requests.codes.ok)
467 res = response.json()
468 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
469 'administrative-state': 'inService',
470 'supporting-circuit-pack-name': 'CP1-CFP0',
471 'supporting-interface': 'XPDR1-NETWORK1-OTU',
472 'type': 'org-openroadm-interfaces:otnOdu',
473 'supporting-port': 'CP1-CFP0-P1'}
474 # SAPI/DAPI are added in the Otu4 renderer
475 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
476 'rate': 'org-openroadm-otn-common-types:ODU4',
477 'tx-sapi': 'fuYZwEO660g=',
478 'tx-dapi': 'Swfw02qXGyI=',
479 'expected-sapi': 'Swfw02qXGyI=',
480 'expected-dapi': 'fuYZwEO660g='
482 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
484 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
486 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
488 self.assertDictEqual(
489 {u'payload-type': u'21', u'exp-payload-type': u'21'},
490 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
492 def test_27_check_otn_topo_links(self):
493 response = test_utils.get_otn_topo_request()
494 self.assertEqual(response.status_code, requests.codes.ok)
495 res = response.json()
496 nb_links = len(res['network'][0]['ietf-network-topology:link'])
497 self.assertEqual(nb_links, 4)
498 for link in res['network'][0]['ietf-network-topology:link']:
499 linkId = link['link-id']
500 if (linkId == 'OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1' or
501 linkId == 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'):
502 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
503 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
504 elif (linkId == 'ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1' or
505 linkId == 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'):
506 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
507 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
508 self.assertEqual(link['transportpce-topology:otn-link-type'], 'ODTU4')
509 self.assertEqual(link['org-openroadm-common-network:link-type'], 'OTN-LINK')
510 self.assertIn(link['org-openroadm-common-network:opposite-link'],
511 ['ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
512 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
514 self.fail("this link should not exist")
def test_28_check_otn_topo_tp(self):
    """Check the XPDR1-NETWORK1 pools on the SPDR-SA1-XPDR1 / SPDR-SC1-XPDR1 nodes.

    For that termination point, ``ts-pool`` and the first ``odtu-tpn-pool``
    entry's ``tpn-pool`` must both hold 80 items, and the odtu-type must be
    ODTU4.ts-Allocated.
    """
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    for node in res['network'][0]['node']:
        # Membership test: the former `x == a or b` form was always truthy.
        if node['node-id'] not in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR1-NETWORK1':
                continue
            xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
            self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
            self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
                             'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
530 # test service-create for 10GE service from spdr to spdr
def test_29_create_10GE_service(self):
    """Turn the shared request into a 10GE Ethernet service-create and submit it.

    NOTE: ``cr_serv_sample_data`` is edited in place, so the changes remain
    visible to whatever reads it afterwards.
    """
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service1-10GE"
    request_input["connection-type"] = "service"
    # Apply the same ODU -> Ethernet switch to the A end, then the Z end.
    for end_key in ("service-a-end", "service-z-end"):
        end_point = request_input[end_key]
        end_point["service-rate"] = "10"
        end_point["service-format"] = "Ethernet"
        del end_point["odu-service-rate"]
        end_point["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
        end_point["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
    create_reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(create_reply.status_code, requests.codes.ok)
    common = create_reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
551 def test_30_get_10GE_service1(self):
552 response = test_utils.get_service_list_request("services/service1-10GE")
553 self.assertEqual(response.status_code, requests.codes.ok)
554 res = response.json()
556 res['services'][0]['administrative-state'], 'inService')
558 res['services'][0]['service-name'], 'service1-10GE')
560 res['services'][0]['connection-type'], 'service')
562 res['services'][0]['lifecycle-state'], 'planned')
565 def test_31_check_interface_10GE_CLIENT_spdra(self):
566 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
567 self.assertEqual(response.status_code, requests.codes.ok)
568 res = response.json()
569 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
570 'administrative-state': 'inService',
571 'supporting-circuit-pack-name': 'CP1-SFP4',
572 'type': 'org-openroadm-interfaces:ethernetCsmacd',
573 'supporting-port': 'CP1-SFP4-P1'
575 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
577 self.assertDictEqual(
579 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
581 def test_32_check_interface_ODU2E_CLIENT_spdra(self):
582 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
583 self.assertEqual(response.status_code, requests.codes.ok)
584 res = response.json()
585 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
586 'administrative-state': 'inService',
587 'supporting-circuit-pack-name': 'CP1-SFP4',
588 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
589 'type': 'org-openroadm-interfaces:otnOdu',
590 'supporting-port': 'CP1-SFP4-P1'}
592 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
593 'rate': 'org-openroadm-otn-common-types:ODU2e',
594 'monitoring-mode': 'terminated'}
596 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
598 self.assertDictEqual(dict(input_dict_2,
599 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
600 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
601 self.assertDictEqual(
602 {u'payload-type': u'03', u'exp-payload-type': u'03'},
603 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
605 def test_33_check_interface_ODU2E_NETWORK_spdra(self):
606 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
607 self.assertEqual(response.status_code, requests.codes.ok)
608 res = response.json()
609 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
610 'administrative-state': 'inService',
611 'supporting-circuit-pack-name': 'CP1-CFP0',
612 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
613 'type': 'org-openroadm-interfaces:otnOdu',
614 'supporting-port': 'CP1-CFP0-P1'}
616 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
617 'rate': 'org-openroadm-otn-common-types:ODU2e',
618 'monitoring-mode': 'monitored'}
619 input_dict_3 = {'trib-port-number': 1}
621 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
623 self.assertDictEqual(dict(input_dict_2,
624 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
625 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
626 self.assertDictEqual(dict(input_dict_3,
627 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
628 'parent-odu-allocation']),
629 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
630 self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
633 def test_34_check_ODU2E_connection_spdra(self):
634 response = test_utils.check_netconf_node_request(
636 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
637 self.assertEqual(response.status_code, requests.codes.ok)
638 res = response.json()
641 'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
642 'direction': 'bidirectional'
645 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
646 res['odu-connection'][0])
647 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1-10GE'},
648 res['odu-connection'][0]['destination'])
649 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1-10GE'},
650 res['odu-connection'][0]['source'])
652 def test_35_check_interface_10GE_CLIENT_spdrc(self):
653 response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-CLIENT1-ETHERNET10G")
654 self.assertEqual(response.status_code, requests.codes.ok)
655 res = response.json()
656 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
657 'administrative-state': 'inService',
658 'supporting-circuit-pack-name': 'CP1-SFP4',
659 'type': 'org-openroadm-interfaces:ethernetCsmacd',
660 'supporting-port': 'CP1-SFP4-P1'
662 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
664 self.assertDictEqual(
666 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
668 def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
669 response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
670 self.assertEqual(response.status_code, requests.codes.ok)
671 res = response.json()
672 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
673 'administrative-state': 'inService',
674 'supporting-circuit-pack-name': 'CP1-SFP4',
675 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
676 'type': 'org-openroadm-interfaces:otnOdu',
677 'supporting-port': 'CP1-SFP4-P1'}
679 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
680 'rate': 'org-openroadm-otn-common-types:ODU2e',
681 'monitoring-mode': 'terminated'}
683 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
685 self.assertDictEqual(dict(input_dict_2,
686 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
687 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
688 self.assertDictEqual(
689 {u'payload-type': u'03', u'exp-payload-type': u'03'},
690 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
692 def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
693 response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
694 self.assertEqual(response.status_code, requests.codes.ok)
695 res = response.json()
696 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
697 'administrative-state': 'inService',
698 'supporting-circuit-pack-name': 'CP1-CFP0',
699 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
700 'type': 'org-openroadm-interfaces:otnOdu',
701 'supporting-port': 'CP1-CFP0-P1'}
703 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
704 'rate': 'org-openroadm-otn-common-types:ODU2e',
705 'monitoring-mode': 'monitored'}
707 input_dict_3 = {'trib-port-number': 1}
709 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
711 self.assertDictEqual(dict(input_dict_2,
712 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
713 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
714 self.assertDictEqual(dict(input_dict_3,
715 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
716 'parent-odu-allocation']),
717 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
718 'parent-odu-allocation'])
721 'org-openroadm-otn-odu-interfaces:odu'][
722 'parent-odu-allocation']['trib-slots'])
724 def test_38_check_ODU2E_connection_spdrc(self):
725 response = test_utils.check_netconf_node_request(
727 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
728 self.assertEqual(response.status_code, requests.codes.ok)
729 res = response.json()
732 'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
733 'direction': 'bidirectional'
736 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
737 res['odu-connection'][0])
738 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1-10GE'},
739 res['odu-connection'][0]['destination'])
740 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1-10GE'},
741 res['odu-connection'][0]['source'])
def test_39_check_otn_topo_links(self):
    """Check there are 4 OTN links and the ODU4 pair shows 90000 available / 10000 used."""
    topo_reply = test_utils.get_otn_topo_request()
    self.assertEqual(topo_reply.status_code, requests.codes.ok)
    topo = topo_reply.json()
    self.assertEqual(len(topo['network'][0]['ietf-network-topology:link']), 4)
    odu4_ids = (
        'ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
        'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1',
    )
    for otn_link in topo['network'][0]['ietf-network-topology:link']:
        if otn_link['link-id'] not in odu4_ids:
            continue
        self.assertEqual(
            otn_link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
        self.assertEqual(
            otn_link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
def test_40_check_otn_topo_tp(self):
    """Check the XPDR1-NETWORK1 pools on the SPDR-SA1-XPDR1 / SPDR-SC1-XPDR1 nodes.

    For that termination point, ``ts-pool`` must hold 72 items with none of
    the slots 1..8, and the first ``tpn-pool`` must hold 79 items without 1.
    """
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    for node in res['network'][0]['node']:
        # Membership test: the former `x == a or b` form was always truthy.
        if node['node-id'] not in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR1-NETWORK1':
                continue
            xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
            # Check every slot individually: asserting that the whole list
            # [1..8] is not an *element* of ts-pool could never fail.
            for trib_slot in range(1, 9):
                self.assertNotIn(trib_slot, xpdrTpPortConAt['ts-pool'])
            self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
            self.assertNotIn(1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
771 def test_41_delete_10GE_service(self):
772 url = "{}/operations/org-openroadm-service:service-delete"
774 "sdnc-request-header": {
775 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
776 "rpc-action": "service-delete",
777 "request-system-id": "appname",
778 "notification-url": "http://localhost:8585/NotificationServer/notify"
780 "service-delete-req-info": {
781 "service-name": "service1-10GE",
782 "tail-retention": "no"
786 response = test_utils.post_request(url, data)
787 self.assertEqual(response.status_code, requests.codes.ok)
788 res = response.json()
789 self.assertIn('Renderer service delete in progress',
790 res['output']['configuration-response-common']['response-message'])
def test_42_check_service_list(self):
    """Check that the service list holds exactly 2 services."""
    list_reply = test_utils.get_service_list_request("")
    self.assertEqual(list_reply.status_code, requests.codes.ok)
    remaining = list_reply.json()['service-list']['services']
    self.assertEqual(len(remaining), 2)
def test_43_check_no_ODU2e_connection_spdra(self):
    """Check that SPDR-SA1's device data has no 'odu-connection' entry."""
    device_reply = test_utils.check_netconf_node_request("SPDR-SA1", "")
    self.assertEqual(device_reply.status_code, requests.codes.ok)
    device_data = device_reply.json()['org-openroadm-device']
    # The key is looked up as a plain string.
    self.assertNotIn('odu-connection', device_data)
def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
    """Check that the network-side ODU2e interface of service1-10GE is gone on SPDR-SA1.

    The full name 'XPDR1-NETWORK1-ODU2e-service1-10GE' is queried: it is the
    one the earlier interface check fetches. The shorter '...-service1' name
    never matches an interface, so a 404 on it proves nothing.
    """
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
    self.assertEqual(response.status_code, requests.codes.not_found)
def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
    # The XPDR1-CLIENT1 ODU2e interface is requested from SPDR-SA1; the
    # expected status is "not found".
    lookup = test_utils.check_netconf_node_request(
        "SPDR-SA1",
        "interface/XPDR1-CLIENT1-ODU2e-service1",
    )
    self.assertEqual(lookup.status_code, requests.codes.not_found)
def test_46_check_no_interface_10GE_CLIENT_spdra(self):
    # Requesting the XPDR1-CLIENT1 10G Ethernet interface of SPDR-SA1 must
    # yield a "not found" status.
    node_id, sub_path = "SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G"
    answer = test_utils.check_netconf_node_request(node_id, sub_path)
    self.assertEqual(answer.status_code, requests.codes.not_found)
def test_47_check_otn_topo_links(self):
    # The OTN topology must list four links; the two ODU4 links between the
    # XPDR1-NETWORK1 ports of SPDR-SA1-XPDR1 and SPDR-SC1-XPDR1 must report
    # 100000 as available bandwidth and 0 as used bandwidth.
    odu4_link_ids = (
        'ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
        'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1',
    )
    reply = test_utils.get_otn_topo_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    links = reply.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 4)
    for link in links:
        # Links other than the two ODU4 ones are not inspected.
        if link['link-id'] not in odu4_link_ids:
            continue
        self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
        self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
def test_48_check_otn_topo_tp(self):
    # On SPDR-SA1-XPDR1 and SPDR-SC1-XPDR1, the XPDR1-NETWORK1 termination
    # point must expose 80 entries in 'ts-pool' and 80 entries in the
    # 'tpn-pool' of its first 'odtu-tpn-pool' element.
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    for node in res['network'][0]['node']:
        # Real membership test: the former `== 'SPDR-SA1-XPDR1' or
        # 'SPDR-SC1-XPDR1'` was always true because a non-empty string is
        # truthy, so the node filter never filtered anything.
        if node['node-id'] not in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
            continue
        tpList = node['ietf-network-topology:termination-point']
        # NOTE(review): the loop header binding `tp` is absent from the text
        # as received; iterating tpList is the reading the body implies.
        for tp in tpList:
            if tp['tp-id'] == 'XPDR1-NETWORK1':
                xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
                self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
                self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
def test_49_delete_ODU4_service(self):
    # Post a service-delete RPC for "service1-ODU4"; the reply must be OK and
    # its response message must mention that the renderer delete is in
    # progress.
    url = "{}/operations/org-openroadm-service:service-delete"
    # NOTE(review): the line opening this payload and its closing braces are
    # missing from the text as received. The "input" wrapper mirrors the
    # create-service sample at the top of the class, and the name `data` is
    # taken from the post_request call below -- confirm.
    data = {"input": {
        "sdnc-request-header": {
            "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
            "rpc-action": "service-delete",
            "request-system-id": "appname",
            "notification-url": "http://localhost:8585/NotificationServer/notify"
        },
        "service-delete-req-info": {
            "service-name": "service1-ODU4",
            "tail-retention": "no"
        }
    }}
    response = test_utils.post_request(url, data)
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Renderer service delete in progress',
                  res['output']['configuration-response-common']['response-message'])
    # NOTE(review): the visible text shows no wait after the request; add one
    # here (the class defines WAITING) if the following checks need the
    # deletion to have completed.
def test_50_check_service_list(self):
    # Exactly one service is expected in the service list at this point.
    listing = test_utils.get_service_list_request("")
    self.assertEqual(listing.status_code, requests.codes.ok)
    payload = listing.json()
    remaining = payload['service-list']['services']
    self.assertEqual(len(remaining), 1)
def test_51_check_no_interface_ODU4_spdra(self):
    # The XPDR1-NETWORK1 ODU4 interface must not be found on SPDR-SA1.
    odu4_interface = "interface/XPDR1-NETWORK1-ODU4"
    result = test_utils.check_netconf_node_request("SPDR-SA1", odu4_interface)
    self.assertEqual(result.status_code, requests.codes.not_found)
def test_52_check_otn_topo_links(self):
    # Delegates to test_22_check_otn_topo_otu4_links so that the same link
    # checks run again at this point of the sequence.
    recheck_otu4_links = self.test_22_check_otn_topo_otu4_links
    recheck_otu4_links()
def test_53_check_otn_topo_tp(self):
    # On SPDR-SA1-XPDR1 and SPDR-SC1-XPDR1, the connection attributes of the
    # XPDR1-NETWORK1 termination point must contain neither 'ts-pool' nor
    # 'odtu-tpn-pool'.
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    wanted_nodes = ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1')
    for node in res['network'][0]['node']:
        # The former `node-id == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1'` always
        # evaluated truthy (the right operand is a non-empty string); a
        # membership test expresses the intended filter.
        if node['node-id'] not in wanted_nodes:
            continue
        tpList = node['ietf-network-topology:termination-point']
        # NOTE(review): the loop header that binds `tp` is missing from the
        # text as received; iterating tpList is what the body implies.
        for tp in tpList:
            if tp['tp-id'] == 'XPDR1-NETWORK1':
                xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
                # Key membership is tested on the mapping itself instead of
                # going through the unbound dict.keys() call.
                self.assertNotIn('ts-pool', xpdrTpPortConAt)
                self.assertNotIn('odtu-tpn-pool', xpdrTpPortConAt)
def test_54_delete_OCH_OTU4_service(self):
    # Post a service-delete RPC for "service1-OCH-OTU4" and verify both the
    # OK status and the "Renderer service delete in progress" message.
    url = "{}/operations/org-openroadm-service:service-delete"
    # NOTE(review): the text as received lacks the payload's opening
    # assignment and its closing braces. `data` is the name used by the
    # post_request call below and the "input" wrapper follows the
    # create-service sample at the top of the class -- confirm.
    data = {"input": {
        "sdnc-request-header": {
            "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
            "rpc-action": "service-delete",
            "request-system-id": "appname",
            "notification-url": "http://localhost:8585/NotificationServer/notify"
        },
        "service-delete-req-info": {
            "service-name": "service1-OCH-OTU4",
            "tail-retention": "no"
        }
    }}
    response = test_utils.post_request(url, data)
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Renderer service delete in progress',
                  res['output']['configuration-response-common']['response-message'])
    # NOTE(review): nothing in the visible text waits for the deletion to
    # finish; pause here (the class defines WAITING) if later checks depend
    # on it.
def test_55_get_no_service(self):
    # The service list request must now answer "not found", and the error
    # list of the reply must contain the application-level "data-missing"
    # entry below.
    response = test_utils.get_service_list_request("")
    self.assertEqual(response.status_code, requests.codes.not_found)
    res = response.json()
    # NOTE(review): the line opening this assertion is absent from the text
    # as received; assertIn is assumed because the arguments are one error
    # entry followed by the reply's error collection -- confirm.
    self.assertIn(
        {"error-type": "application", "error-tag": "data-missing",
         "error-message": "Request could not be completed because the relevant data model content does not exist"},
        res['errors']['error'])
def test_56_check_no_interface_OTU4_spdra(self):
    # SPDR-SA1 must answer "not found" for the XPDR1-NETWORK1 OTU interface.
    otu_interface = "interface/XPDR1-NETWORK1-OTU"
    probe = test_utils.check_netconf_node_request("SPDR-SA1", otu_interface)
    self.assertEqual(probe.status_code, requests.codes.not_found)
def test_57_check_no_interface_OCH_spdra(self):
    # The interface named XPDR1-NETWORK1-1 is requested from SPDR-SA1 and is
    # expected to be reported as "not found".
    status = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-NETWORK1-1"
    ).status_code
    self.assertEqual(status, requests.codes.not_found)
def test_58_getLinks_OtnTopology(self):
    # The first network of the OTN topology reply must carry no
    # 'ietf-network-topology:link' member.
    reply = test_utils.get_otn_topo_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    otn_network = reply.json()['network'][0]
    self.assertNotIn('ietf-network-topology:link', otn_network)
def test_59_check_openroadm_topo_spdra(self):
    # In the openroadm topology, the first termination point of node
    # SPDR-SA1-XPDR1 must be XPDR1-NETWORK1 and its xpdr-network-attributes
    # must not contain a 'wavelength' key.
    response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    tp = res['node'][0]['ietf-network-topology:termination-point'][0]
    self.assertEqual('XPDR1-NETWORK1', tp['tp-id'])
    # Membership is tested on the mapping directly; the unbound dict.keys()
    # call and the Python 2 u'' prefix added nothing.
    self.assertNotIn('wavelength',
                     tp['org-openroadm-network-topology:xpdr-network-attributes'])
def test_60_check_openroadm_topo_ROADMA_SRG(self):
    # For node ROADM-A1-SRG1, {'index': 1} must be among the SRG's available
    # wavelengths, and termination point SRG1-PP1-TXRX must carry no
    # pp-attributes.
    response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn({'index': 1},
                  res['node'][0]['org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    # NOTE(review): the loop header binding `ele` is absent from the text as
    # received; iterating liste_tp is the reading the body implies.
    for ele in liste_tp:
        if ele['tp-id'] == 'SRG1-PP1-TXRX':
            # Direct key membership replaces the unbound dict.keys() call.
            self.assertNotIn('org-openroadm-network-topology:pp-attributes', ele)
def test_61_check_openroadm_topo_ROADMA_DEG(self):
    # For node ROADM-A1-DEG2, {'index': 1} must be among the degree's
    # available wavelengths; DEG2-CTP-TXRX must carry no ctp-attributes and
    # DEG2-TTP-TXRX no tx-ttp-attributes.
    response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn({'index': 1},
                  res['node'][0]['org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    # NOTE(review): the loop header binding `ele` is missing from the text as
    # received; both checks are placed side by side inside a loop over
    # liste_tp, since one termination point cannot match both ids.
    for ele in liste_tp:
        # Direct key membership replaces the unbound dict.keys() calls.
        if ele['tp-id'] == 'DEG2-CTP-TXRX':
            self.assertNotIn('org-openroadm-network-topology:ctp-attributes', ele)
        if ele['tp-id'] == 'DEG2-TTP-TXRX':
            self.assertNotIn('org-openroadm-network-topology:tx-ttp-attributes', ele)
def test_62_disconnect_spdrA(self):
    # Unmounting SPDR-SA1 must be answered with an OK status.
    outcome = test_utils.unmount_device("SPDR-SA1")
    self.assertEqual(
        outcome.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
def test_63_disconnect_spdrC(self):
    # SPDR-SC1 is unmounted; any status other than OK fails the test with the
    # shared CODE_SHOULD_BE_200 message.
    device = "SPDR-SC1"
    reply = test_utils.unmount_device(device)
    self.assertEqual(reply.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_64_disconnect_roadmA(self):
    # Unmount ROADM-A1 and require an OK status in return.
    unmount_reply = test_utils.unmount_device("ROADM-A1")
    self.assertEqual(
        unmount_reply.status_code, requests.codes.ok,
        msg=test_utils.CODE_SHOULD_BE_200)
def test_65_disconnect_roadmC(self):
    # ROADM-C1 is unmounted; the reply status must be OK.
    status = test_utils.unmount_device("ROADM-C1").status_code
    self.assertEqual(status, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
# Script entry point: run the test cases of this module with verbose
# (verbosity=2) reporting when the file is executed directly.
if __name__ == "__main__":
    unittest.main(verbosity=2)