3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
15 from common import test_utils
18 class TransportPCEtesting(unittest.TestCase):
21 WAITING = 20 # nominal value is 300
23 cr_serv_sample_data = {"input": {
24 "sdnc-request-header": {
25 "request-id": "request-1",
26 "rpc-action": "service-create",
27 "request-system-id": "appname"
29 "service-name": "service1-OCH-OTU4",
30 "common-id": "commonId",
31 "connection-type": "infrastructure",
33 "service-rate": "100",
34 "node-id": "SPDR-SA1",
35 "service-format": "OTU",
36 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
40 "committed-info-rate": "100000",
41 "committed-burst-size": "64"
46 "port-device-name": "SPDR-SA1-XPDR1",
48 "port-name": "XPDR1-NETWORK1",
49 "port-rack": "000000.00",
50 "port-shelf": "Chassis#1"
53 "lgx-device-name": "Some lgx-device-name",
54 "lgx-port-name": "Some lgx-port-name",
55 "lgx-port-rack": "000000.00",
56 "lgx-port-shelf": "00"
61 "port-device-name": "SPDR-SA1-XPDR1",
63 "port-name": "XPDR1-NETWORK1",
64 "port-rack": "000000.00",
65 "port-shelf": "Chassis#1"
68 "lgx-device-name": "Some lgx-device-name",
69 "lgx-port-name": "Some lgx-port-name",
70 "lgx-port-rack": "000000.00",
71 "lgx-port-shelf": "00"
77 "service-rate": "100",
78 "node-id": "SPDR-SC1",
79 "service-format": "OTU",
80 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
84 "committed-info-rate": "100000",
85 "committed-burst-size": "64"
90 "port-device-name": "SPDR-SC1-XPDR1",
92 "port-name": "XPDR1-NETWORK1",
93 "port-rack": "000000.00",
94 "port-shelf": "Chassis#1"
97 "lgx-device-name": "Some lgx-device-name",
98 "lgx-port-name": "Some lgx-port-name",
99 "lgx-port-rack": "000000.00",
100 "lgx-port-shelf": "00"
105 "port-device-name": "SPDR-SC1-XPDR1",
106 "port-type": "fixed",
107 "port-name": "XPDR1-NETWORK1",
108 "port-rack": "000000.00",
109 "port-shelf": "Chassis#1"
112 "lgx-device-name": "Some lgx-device-name",
113 "lgx-port-name": "Some lgx-port-name",
114 "lgx-port-rack": "000000.00",
115 "lgx-port-shelf": "00"
120 "due-date": "2018-06-15T00:00:01Z",
121 "operator-contact": "pw1234"
127 cls.processes = test_utils.start_tpce()
128 cls.processes = test_utils.start_sims(['spdra', 'roadma', 'roadmc', 'spdrc'])
131 def tearDownClass(cls):
132 for process in cls.processes:
133 test_utils.shutdown_process(process)
134 print("all processes killed")
139 def test_01_connect_spdrA(self):
140 response = test_utils.mount_device("SPDR-SA1", 'spdra')
141 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
143 def test_02_connect_spdrC(self):
144 response = test_utils.mount_device("SPDR-SC1", 'spdrc')
145 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
147 def test_03_connect_rdmA(self):
148 response = test_utils.mount_device("ROADM-A1", 'roadma')
149 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
151 def test_04_connect_rdmC(self):
152 response = test_utils.mount_device("ROADM-C1", 'roadmc')
153 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
155 def test_05_connect_sprdA_N1_to_roadmA_PP1(self):
156 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "1", "1",
157 "ROADM-A1", "1", "SRG1-PP1-TXRX")
158 self.assertEqual(response.status_code, requests.codes.ok)
159 res = response.json()
160 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
163 def test_06_connect_roadmA_PP1_to_spdrA_N1(self):
164 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "1", "1",
165 "ROADM-A1", "1", "SRG1-PP1-TXRX")
166 self.assertEqual(response.status_code, requests.codes.ok)
167 res = response.json()
168 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
171 def test_07_connect_sprdC_N1_to_roadmC_PP1(self):
172 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "1", "1",
173 "ROADM-C1", "1", "SRG1-PP1-TXRX")
174 self.assertEqual(response.status_code, requests.codes.ok)
175 res = response.json()
176 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
179 def test_08_connect_roadmC_PP1_to_spdrC_N1(self):
180 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "1", "1",
181 "ROADM-C1", "1", "SRG1-PP1-TXRX")
182 self.assertEqual(response.status_code, requests.codes.ok)
183 res = response.json()
184 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
187 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
188 # Config ROADMA-ROADMC oms-attributes
190 "auto-spanloss": "true",
191 "spanloss-base": 11.4,
192 "spanloss-current": 12,
193 "engineered-spanloss": 12.2,
194 "link-concatenation": [{
197 "SRLG-length": 100000,
199 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
200 self.assertEqual(response.status_code, requests.codes.created)
202 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
203 # Config ROADMC-ROADMA oms-attributes
205 "auto-spanloss": "true",
206 "spanloss-base": 11.4,
207 "spanloss-current": 12,
208 "engineered-spanloss": 12.2,
209 "link-concatenation": [{
212 "SRLG-length": 100000,
214 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
215 self.assertEqual(response.status_code, requests.codes.created)
217 # test service-create for OCH-OTU4 service from spdr to spdr
218 def test_11_check_otn_topology(self):
219 response = test_utils.get_otn_topo_request()
220 self.assertEqual(response.status_code, requests.codes.ok)
221 res = response.json()
222 nbNode = len(res['network'][0]['node'])
223 self.assertEqual(nbNode, 4)
224 self.assertNotIn('ietf-network-topology:link', res['network'][0])
226 def test_12_create_OCH_OTU4_service(self):
227 response = test_utils.service_create_request(self.cr_serv_sample_data)
228 self.assertEqual(response.status_code, requests.codes.ok)
229 res = response.json()
230 self.assertIn('PCE calculation in progress',
231 res['output']['configuration-response-common']['response-message'])
232 time.sleep(self.WAITING)
234 def test_13_get_OCH_OTU4_service1(self):
235 response = test_utils.get_service_list_request("services/service1-OCH-OTU4")
236 self.assertEqual(response.status_code, requests.codes.ok)
237 res = response.json()
239 res['services'][0]['administrative-state'], 'inService')
241 res['services'][0]['service-name'], 'service1-OCH-OTU4')
243 res['services'][0]['connection-type'], 'infrastructure')
245 res['services'][0]['lifecycle-state'], 'planned')
248 # Check correct configuration of devices
249 def test_14_check_interface_och_spdra(self):
250 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
251 self.assertEqual(response.status_code, requests.codes.ok)
252 res = response.json()
253 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-1',
254 'administrative-state': 'inService',
255 'supporting-circuit-pack-name': 'CP1-CFP0',
256 'type': 'org-openroadm-interfaces:opticalChannel',
257 'supporting-port': 'CP1-CFP0-P1'
258 }, **res['interface'][0]),
261 self.assertDictEqual(
262 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
263 u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
264 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
266 def test_15_check_interface_OTU4_spdra(self):
267 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
268 self.assertEqual(response.status_code, requests.codes.ok)
269 res = response.json()
270 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
271 'administrative-state': 'inService',
272 'supporting-circuit-pack-name': 'CP1-CFP0',
273 'supporting-interface': 'XPDR1-NETWORK1-1',
274 'type': 'org-openroadm-interfaces:otnOtu',
275 'supporting-port': 'CP1-CFP0-P1'
277 input_dict_2 = {'tx-sapi': 'Swfw02qXGyI=',
278 'expected-dapi': 'Swfw02qXGyI=',
279 'rate': 'org-openroadm-otn-common-types:OTU4',
282 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
285 self.assertDictEqual(input_dict_2,
287 ['org-openroadm-otn-otu-interfaces:otu'])
289 def test_16_check_interface_och_spdrc(self):
290 response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-1")
291 self.assertEqual(response.status_code, requests.codes.ok)
292 res = response.json()
293 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-1',
294 'administrative-state': 'inService',
295 'supporting-circuit-pack-name': 'CP1-CFP0',
296 'type': 'org-openroadm-interfaces:opticalChannel',
297 'supporting-port': 'CP1-CFP0-P1'
298 }, **res['interface'][0]),
301 self.assertDictEqual(
302 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
303 u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
304 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
306 def test_17_check_interface_OTU4_spdrc(self):
307 response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-OTU")
308 self.assertEqual(response.status_code, requests.codes.ok)
309 res = response.json()
310 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
311 'administrative-state': 'inService',
312 'supporting-circuit-pack-name': 'CP1-CFP0',
313 'supporting-interface': 'XPDR1-NETWORK1-1',
314 'type': 'org-openroadm-interfaces:otnOtu',
315 'supporting-port': 'CP1-CFP0-P1'
317 input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
318 'expected-sapi': 'Swfw02qXGyI=',
319 'tx-sapi': 'fuYZwEO660g=',
320 'expected-dapi': 'fuYZwEO660g=',
321 'rate': 'org-openroadm-otn-common-types:OTU4',
325 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
328 self.assertDictEqual(input_dict_2,
330 ['org-openroadm-otn-otu-interfaces:otu'])
332 def test_18_check_no_interface_ODU4_spdra(self):
333 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
334 self.assertEqual(response.status_code, requests.codes.not_found)
335 res = response.json()
337 {"error-type": "application", "error-tag": "data-missing",
338 "error-message": "Request could not be completed because the relevant data model content does not exist"},
339 res['errors']['error'])
341 def test_19_check_openroadm_topo_spdra(self):
342 response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
343 self.assertEqual(response.status_code, requests.codes.ok)
344 res = response.json()
345 ele = res['node'][0]['ietf-network-topology:termination-point'][0]
346 self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
347 self.assertEqual({u'frequency': 196.1,
349 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
352 def test_20_check_openroadm_topo_ROADMA_SRG(self):
353 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
354 self.assertEqual(response.status_code, requests.codes.ok)
355 res = response.json()
356 self.assertNotIn({u'index': 1},
357 res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
358 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
360 if ele['tp-id'] == 'SRG1-PP1-TXRX':
361 self.assertIn({u'index': 1, u'frequency': 196.1,
363 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
364 if ele['tp-id'] == 'SRG1-PP2-TXRX':
365 self.assertNotIn('used-wavelength', dict.keys(ele))
368 def test_21_check_openroadm_topo_ROADMA_DEG(self):
369 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
370 self.assertEqual(response.status_code, requests.codes.ok)
371 res = response.json()
372 self.assertNotIn({u'index': 1},
373 res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
374 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
376 if ele['tp-id'] == 'DEG2-CTP-TXRX':
377 self.assertIn({u'index': 1, u'frequency': 196.1,
379 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
380 if ele['tp-id'] == 'DEG2-TTP-TXRX':
381 self.assertIn({u'index': 1, u'frequency': 196.1,
383 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
386 def test_22_check_otn_topo_otu4_links(self):
387 response = test_utils.get_otn_topo_request()
388 self.assertEqual(response.status_code, requests.codes.ok)
389 res = response.json()
390 nb_links = len(res['network'][0]['ietf-network-topology:link'])
391 self.assertEqual(nb_links, 2)
392 listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
393 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
394 for link in res['network'][0]['ietf-network-topology:link']:
395 self.assertIn(link['link-id'], listLinkId)
396 self.assertEqual(link['transportpce-topology:otn-link-type'], 'OTU4')
397 self.assertEqual(link['org-openroadm-common-network:link-type'], 'OTN-LINK')
398 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
399 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
400 self.assertIn(link['org-openroadm-common-network:opposite-link'], listLinkId)
402 # test service-create for ODU4 service from spdr to spdr
403 def test_23_create_ODU4_service(self):
404 self.cr_serv_sample_data["input"]["service-name"] = "service1-ODU4"
405 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
406 del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
407 self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
408 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
409 del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
410 self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
412 response = test_utils.service_create_request(self.cr_serv_sample_data)
413 self.assertEqual(response.status_code, requests.codes.ok)
414 res = response.json()
415 self.assertIn('PCE calculation in progress',
416 res['output']['configuration-response-common']['response-message'])
417 time.sleep(self.WAITING)
419 def test_24_get_ODU4_service1(self):
420 response = test_utils.get_service_list_request("services/service1-ODU4")
421 self.assertEqual(response.status_code, requests.codes.ok)
422 res = response.json()
424 res['services'][0]['administrative-state'], 'inService')
426 res['services'][0]['service-name'], 'service1-ODU4')
428 res['services'][0]['connection-type'], 'infrastructure')
430 res['services'][0]['lifecycle-state'], 'planned')
433 def test_25_check_interface_ODU4_spdra(self):
434 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
435 self.assertEqual(response.status_code, requests.codes.ok)
436 res = response.json()
437 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
438 'administrative-state': 'inService',
439 'supporting-circuit-pack-name': 'CP1-CFP0',
440 'supporting-interface': 'XPDR1-NETWORK1-OTU',
441 'type': 'org-openroadm-interfaces:otnOdu',
442 'supporting-port': 'CP1-CFP0-P1'}
443 # SAPI/DAPI are added in the Otu4 renderer
444 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
445 'rate': 'org-openroadm-otn-common-types:ODU4',
446 'expected-dapi': 'Swfw02qXGyI=',
447 'expected-sapi': 'fuYZwEO660g=',
448 'tx-dapi': 'fuYZwEO660g=',
449 'tx-sapi': 'Swfw02qXGyI='}
451 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
453 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
455 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
457 self.assertDictEqual(
458 {u'payload-type': u'21', u'exp-payload-type': u'21'},
459 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
461 def test_26_check_interface_ODU4_spdrc(self):
462 response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
463 self.assertEqual(response.status_code, requests.codes.ok)
464 res = response.json()
465 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
466 'administrative-state': 'inService',
467 'supporting-circuit-pack-name': 'CP1-CFP0',
468 'supporting-interface': 'XPDR1-NETWORK1-OTU',
469 'type': 'org-openroadm-interfaces:otnOdu',
470 'supporting-port': 'CP1-CFP0-P1'}
471 # SAPI/DAPI are added in the Otu4 renderer
472 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
473 'rate': 'org-openroadm-otn-common-types:ODU4',
474 'tx-sapi': 'fuYZwEO660g=',
475 'tx-dapi': 'Swfw02qXGyI=',
476 'expected-sapi': 'Swfw02qXGyI=',
477 'expected-dapi': 'fuYZwEO660g='
479 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
481 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
483 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
485 self.assertDictEqual(
486 {u'payload-type': u'21', u'exp-payload-type': u'21'},
487 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
489 def test_27_check_otn_topo_links(self):
490 response = test_utils.get_otn_topo_request()
491 self.assertEqual(response.status_code, requests.codes.ok)
492 res = response.json()
493 nb_links = len(res['network'][0]['ietf-network-topology:link'])
494 self.assertEqual(nb_links, 4)
495 for link in res['network'][0]['ietf-network-topology:link']:
496 linkId = link['link-id']
497 if (linkId == 'OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1' or
498 linkId == 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'):
499 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
500 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
501 elif (linkId == 'ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1' or
502 linkId == 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'):
503 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
504 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
505 self.assertEqual(link['transportpce-topology:otn-link-type'], 'ODTU4')
506 self.assertEqual(link['org-openroadm-common-network:link-type'], 'OTN-LINK')
507 self.assertIn(link['org-openroadm-common-network:opposite-link'],
508 ['ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
509 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
511 self.fail("this link should not exist")
513 def test_28_check_otn_topo_tp(self):
514 response = test_utils.get_otn_topo_request()
515 res = response.json()
516 for node in res['network'][0]['node']:
517 if (node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1'):
518 tpList = node['ietf-network-topology:termination-point']
520 if (tp['tp-id'] == 'XPDR1-NETWORK1'):
521 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
522 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
523 self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
524 self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
525 'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
527 # test service-create for 10GE service from spdr to spdr
528 def test_29_create_10GE_service(self):
529 self.cr_serv_sample_data["input"]["service-name"] = "service1-10GE"
530 self.cr_serv_sample_data["input"]["connection-type"] = "service"
531 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "10"
532 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
533 del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
534 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
535 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
536 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "10"
537 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
538 del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
539 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
540 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
541 response = test_utils.service_create_request(self.cr_serv_sample_data)
542 self.assertEqual(response.status_code, requests.codes.ok)
543 res = response.json()
544 self.assertIn('PCE calculation in progress',
545 res['output']['configuration-response-common']['response-message'])
546 time.sleep(self.WAITING)
548 def test_30_get_10GE_service1(self):
549 response = test_utils.get_service_list_request("services/service1-10GE")
550 self.assertEqual(response.status_code, requests.codes.ok)
551 res = response.json()
553 res['services'][0]['administrative-state'], 'inService')
555 res['services'][0]['service-name'], 'service1-10GE')
557 res['services'][0]['connection-type'], 'service')
559 res['services'][0]['lifecycle-state'], 'planned')
562 def test_31_check_interface_10GE_CLIENT_spdra(self):
563 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
564 self.assertEqual(response.status_code, requests.codes.ok)
565 res = response.json()
566 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
567 'administrative-state': 'inService',
568 'supporting-circuit-pack-name': 'CP1-SFP4',
569 'type': 'org-openroadm-interfaces:ethernetCsmacd',
570 'supporting-port': 'CP1-SFP4-P1'
572 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
574 self.assertDictEqual(
576 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
578 def test_32_check_interface_ODU2E_CLIENT_spdra(self):
579 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
580 self.assertEqual(response.status_code, requests.codes.ok)
581 res = response.json()
582 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
583 'administrative-state': 'inService',
584 'supporting-circuit-pack-name': 'CP1-SFP4',
585 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
586 'type': 'org-openroadm-interfaces:otnOdu',
587 'supporting-port': 'CP1-SFP4-P1'}
589 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
590 'rate': 'org-openroadm-otn-common-types:ODU2e',
591 'monitoring-mode': 'terminated'}
593 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
595 self.assertDictEqual(dict(input_dict_2,
596 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
597 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
598 self.assertDictEqual(
599 {u'payload-type': u'03', u'exp-payload-type': u'03'},
600 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
602 def test_33_check_interface_ODU2E_NETWORK_spdra(self):
603 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
604 self.assertEqual(response.status_code, requests.codes.ok)
605 res = response.json()
606 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
607 'administrative-state': 'inService',
608 'supporting-circuit-pack-name': 'CP1-CFP0',
609 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
610 'type': 'org-openroadm-interfaces:otnOdu',
611 'supporting-port': 'CP1-CFP0-P1'}
613 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
614 'rate': 'org-openroadm-otn-common-types:ODU2e',
615 'monitoring-mode': 'monitored'}
616 input_dict_3 = {'trib-port-number': 1}
618 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
620 self.assertDictEqual(dict(input_dict_2,
621 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
622 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
623 self.assertDictEqual(dict(input_dict_3,
624 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
625 'parent-odu-allocation']),
626 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
627 self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
630 def test_34_check_ODU2E_connection_spdra(self):
631 response = test_utils.check_netconf_node_request(
633 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
634 self.assertEqual(response.status_code, requests.codes.ok)
635 res = response.json()
638 'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
639 'direction': 'bidirectional'
642 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
643 res['odu-connection'][0])
644 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1-10GE'},
645 res['odu-connection'][0]['destination'])
646 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1-10GE'},
647 res['odu-connection'][0]['source'])
649 def test_35_check_interface_10GE_CLIENT_spdrc(self):
650 response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-CLIENT1-ETHERNET10G")
651 self.assertEqual(response.status_code, requests.codes.ok)
652 res = response.json()
653 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
654 'administrative-state': 'inService',
655 'supporting-circuit-pack-name': 'CP1-SFP4',
656 'type': 'org-openroadm-interfaces:ethernetCsmacd',
657 'supporting-port': 'CP1-SFP4-P1'
659 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
661 self.assertDictEqual(
663 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
665 def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
666 response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
667 self.assertEqual(response.status_code, requests.codes.ok)
668 res = response.json()
669 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
670 'administrative-state': 'inService',
671 'supporting-circuit-pack-name': 'CP1-SFP4',
672 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
673 'type': 'org-openroadm-interfaces:otnOdu',
674 'supporting-port': 'CP1-SFP4-P1'}
676 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
677 'rate': 'org-openroadm-otn-common-types:ODU2e',
678 'monitoring-mode': 'terminated'}
680 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
682 self.assertDictEqual(dict(input_dict_2,
683 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
684 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
685 self.assertDictEqual(
686 {u'payload-type': u'03', u'exp-payload-type': u'03'},
687 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
689 def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
690 response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
691 self.assertEqual(response.status_code, requests.codes.ok)
692 res = response.json()
693 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
694 'administrative-state': 'inService',
695 'supporting-circuit-pack-name': 'CP1-CFP0',
696 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
697 'type': 'org-openroadm-interfaces:otnOdu',
698 'supporting-port': 'CP1-CFP0-P1'}
700 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
701 'rate': 'org-openroadm-otn-common-types:ODU2e',
702 'monitoring-mode': 'monitored'}
704 input_dict_3 = {'trib-port-number': 1}
706 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
708 self.assertDictEqual(dict(input_dict_2,
709 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
710 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
711 self.assertDictEqual(dict(input_dict_3,
712 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
713 'parent-odu-allocation']),
714 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
715 'parent-odu-allocation'])
718 'org-openroadm-otn-odu-interfaces:odu'][
719 'parent-odu-allocation']['trib-slots'])
721 def test_38_check_ODU2E_connection_spdrc(self):
722 response = test_utils.check_netconf_node_request(
724 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
725 self.assertEqual(response.status_code, requests.codes.ok)
726 res = response.json()
729 'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
730 'direction': 'bidirectional'
733 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
734 res['odu-connection'][0])
735 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1-10GE'},
736 res['odu-connection'][0]['destination'])
737 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1-10GE'},
738 res['odu-connection'][0]['source'])
740 def test_39_check_otn_topo_links(self):
741 response = test_utils.get_otn_topo_request()
742 self.assertEqual(response.status_code, requests.codes.ok)
743 res = response.json()
744 nb_links = len(res['network'][0]['ietf-network-topology:link'])
745 self.assertEqual(nb_links, 4)
746 for link in res['network'][0]['ietf-network-topology:link']:
747 linkId = link['link-id']
748 if (linkId == 'ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1' or
749 linkId == 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'):
750 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
751 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
753 def test_40_check_otn_topo_tp(self):
754 response = test_utils.get_otn_topo_request()
755 res = response.json()
756 for node in res['network'][0]['node']:
757 if (node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1'):
758 tpList = node['ietf-network-topology:termination-point']
760 if (tp['tp-id'] == 'XPDR1-NETWORK1'):
761 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
762 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
763 tsPoolList = [i for i in range(1, 9)]
764 self.assertNotIn(tsPoolList, xpdrTpPortConAt['ts-pool'])
765 self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
766 self.assertNotIn(1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
768 def test_41_delete_10GE_service(self):
769 url = "{}/operations/org-openroadm-service:service-delete"
771 "sdnc-request-header": {
772 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
773 "rpc-action": "service-delete",
774 "request-system-id": "appname",
775 "notification-url": "http://localhost:8585/NotificationServer/notify"
777 "service-delete-req-info": {
778 "service-name": "service1-10GE",
779 "tail-retention": "no"
783 response = test_utils.post_request(url, data)
784 self.assertEqual(response.status_code, requests.codes.ok)
785 res = response.json()
786 self.assertIn('Renderer service delete in progress',
787 res['output']['configuration-response-common']['response-message'])
790 def test_42_check_service_list(self):
791 response = test_utils.get_service_list_request("")
792 self.assertEqual(response.status_code, requests.codes.ok)
793 res = response.json()
794 self.assertEqual(len(res['service-list']['services']), 2)
797 def test_43_check_no_ODU2e_connection_spdra(self):
798 response = test_utils.check_netconf_node_request("SPDR-SA1", "")
799 self.assertEqual(response.status_code, requests.codes.ok)
800 res = response.json()
801 self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
804 def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
805 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
806 self.assertEqual(response.status_code, requests.codes.not_found)
808 def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
809 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
810 self.assertEqual(response.status_code, requests.codes.not_found)
812 def test_46_check_no_interface_10GE_CLIENT_spdra(self):
813 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
814 self.assertEqual(response.status_code, requests.codes.not_found)
816 def test_47_check_otn_topo_links(self):
817 response = test_utils.get_otn_topo_request()
818 self.assertEqual(response.status_code, requests.codes.ok)
819 res = response.json()
820 nb_links = len(res['network'][0]['ietf-network-topology:link'])
821 self.assertEqual(nb_links, 4)
822 for link in res['network'][0]['ietf-network-topology:link']:
823 linkId = link['link-id']
824 if (linkId == 'ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1' or
825 linkId == 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'):
826 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
827 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
829 def test_48_check_otn_topo_tp(self):
830 response = test_utils.get_otn_topo_request()
831 res = response.json()
832 for node in res['network'][0]['node']:
833 if (node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1'):
834 tpList = node['ietf-network-topology:termination-point']
836 if (tp['tp-id'] == 'XPDR1-NETWORK1'):
837 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
838 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
839 self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
841 def test_49_delete_ODU4_service(self):
842 url = "{}/operations/org-openroadm-service:service-delete"
844 "sdnc-request-header": {
845 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
846 "rpc-action": "service-delete",
847 "request-system-id": "appname",
848 "notification-url": "http://localhost:8585/NotificationServer/notify"
850 "service-delete-req-info": {
851 "service-name": "service1-ODU4",
852 "tail-retention": "no"
856 response = test_utils.post_request(url, data)
857 self.assertEqual(response.status_code, requests.codes.ok)
858 res = response.json()
859 self.assertIn('Renderer service delete in progress',
860 res['output']['configuration-response-common']['response-message'])
863 def test_50_check_service_list(self):
864 response = test_utils.get_service_list_request("")
865 self.assertEqual(response.status_code, requests.codes.ok)
866 res = response.json()
867 self.assertEqual(len(res['service-list']['services']), 1)
870 def test_51_check_no_interface_ODU4_spdra(self):
871 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
872 self.assertEqual(response.status_code, requests.codes.not_found)
874 def test_52_check_otn_topo_links(self):
875 self.test_22_check_otn_topo_otu4_links()
877 def test_53_check_otn_topo_tp(self):
878 response = test_utils.get_otn_topo_request()
879 res = response.json()
880 for node in res['network'][0]['node']:
881 if (node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1'):
882 tpList = node['ietf-network-topology:termination-point']
884 if (tp['tp-id'] == 'XPDR1-NETWORK1'):
885 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
886 self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
887 self.assertNotIn('odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
889 def test_54_delete_OCH_OTU4_service(self):
890 url = "{}/operations/org-openroadm-service:service-delete"
892 "sdnc-request-header": {
893 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
894 "rpc-action": "service-delete",
895 "request-system-id": "appname",
896 "notification-url": "http://localhost:8585/NotificationServer/notify"
898 "service-delete-req-info": {
899 "service-name": "service1-OCH-OTU4",
900 "tail-retention": "no"
904 response = test_utils.post_request(url, data)
905 self.assertEqual(response.status_code, requests.codes.ok)
906 res = response.json()
907 self.assertIn('Renderer service delete in progress',
908 res['output']['configuration-response-common']['response-message'])
911 def test_55_get_no_service(self):
912 response = test_utils.get_service_list_request("")
913 self.assertEqual(response.status_code, requests.codes.not_found)
914 res = response.json()
916 {"error-type": "application", "error-tag": "data-missing",
917 "error-message": "Request could not be completed because the relevant data model content does not exist"},
918 res['errors']['error'])
921 def test_56_check_no_interface_OTU4_spdra(self):
922 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
923 self.assertEqual(response.status_code, requests.codes.not_found)
925 def test_57_check_no_interface_OCH_spdra(self):
926 response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
927 self.assertEqual(response.status_code, requests.codes.not_found)
929 def test_58_getLinks_OtnTopology(self):
930 response = test_utils.get_otn_topo_request()
931 self.assertEqual(response.status_code, requests.codes.ok)
932 res = response.json()
933 self.assertNotIn('ietf-network-topology:link', res['network'][0])
935 def test_59_check_openroadm_topo_spdra(self):
936 response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
937 self.assertEqual(response.status_code, requests.codes.ok)
938 res = response.json()
939 tp = res['node'][0]['ietf-network-topology:termination-point'][0]
940 self.assertEqual('XPDR1-NETWORK1', tp['tp-id'])
941 self.assertNotIn('wavelength', dict.keys(
942 tp[u'org-openroadm-network-topology:xpdr-network-attributes']))
945 def test_60_check_openroadm_topo_ROADMA_SRG(self):
946 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
947 self.assertEqual(response.status_code, requests.codes.ok)
948 res = response.json()
949 self.assertIn({u'index': 1},
950 res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
951 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
953 if ele['tp-id'] == 'SRG1-PP1-TXRX':
954 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
957 def test_61_check_openroadm_topo_ROADMA_DEG(self):
958 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
959 self.assertEqual(response.status_code, requests.codes.ok)
960 res = response.json()
961 self.assertIn({u'index': 1},
962 res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
963 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
965 if ele['tp-id'] == 'DEG2-CTP-TXRX':
966 self.assertNotIn('org-openroadm-network-topology:ctp-attributes', dict.keys(ele))
967 if ele['tp-id'] == 'DEG2-TTP-TXRX':
968 self.assertNotIn('org-openroadm-network-topology:tx-ttp-attributes', dict.keys(ele))
971 def test_62_disconnect_spdrA(self):
972 response = test_utils.unmount_device("SPDR-SA1")
973 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
975 def test_63_disconnect_spdrC(self):
976 response = test_utils.unmount_device("SPDR-SC1")
977 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
979 def test_64_disconnect_roadmA(self):
980 response = test_utils.unmount_device("ROADM-A1")
981 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
983 def test_65_disconnect_roadmC(self):
984 response = test_utils.unmount_device("ROADM-C1")
985 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
988 if __name__ == "__main__":
989 unittest.main(verbosity=2)