3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
20 sys.path.append('transportpce_tests/common/')
23 class TransportPCEtesting(unittest.TestCase):
26 WAITING = 20 # nominal value is 300
27 NODE_VERSION_221 = '2.2.1'
28 NODE_VERSION_71 = '7.1'
30 cr_serv_sample_data = {"input": {
31 "sdnc-request-header": {
32 "request-id": "request-1",
33 "rpc-action": "service-create",
34 "request-system-id": "appname"
36 "service-name": "service1-OTUC4",
37 "common-id": "commonId",
38 "connection-type": "infrastructure",
40 "service-rate": "400",
42 "service-format": "OTU",
43 "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
47 "committed-info-rate": "100000",
48 "committed-burst-size": "64"
53 "port-device-name": "XPDR-A2-XPDR2",
55 "port-name": "XPDR2-NETWORK1",
56 "port-rack": "000000.00",
57 "port-shelf": "Chassis#1"
60 "lgx-device-name": "Some lgx-device-name",
61 "lgx-port-name": "Some lgx-port-name",
62 "lgx-port-rack": "000000.00",
63 "lgx-port-shelf": "00"
68 "port-device-name": "XPDR-A2-XPDR2",
70 "port-name": "XPDR2-NETWORK1",
71 "port-rack": "000000.00",
72 "port-shelf": "Chassis#1"
75 "lgx-device-name": "Some lgx-device-name",
76 "lgx-port-name": "Some lgx-port-name",
77 "lgx-port-rack": "000000.00",
78 "lgx-port-shelf": "00"
84 "service-rate": "400",
86 "service-format": "OTU",
87 "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
91 "committed-info-rate": "100000",
92 "committed-burst-size": "64"
97 "port-device-name": "XPDR-C2-XPDR2",
99 "port-name": "XPDR2-NETWORK1",
100 "port-rack": "000000.00",
101 "port-shelf": "Chassis#1"
104 "lgx-device-name": "Some lgx-device-name",
105 "lgx-port-name": "Some lgx-port-name",
106 "lgx-port-rack": "000000.00",
107 "lgx-port-shelf": "00"
112 "port-device-name": "XPDR-C2-XPDR2",
113 "port-type": "fixed",
114 "port-name": "XPDR2-NETWORK1",
115 "port-rack": "000000.00",
116 "port-shelf": "Chassis#1"
119 "lgx-device-name": "Some lgx-device-name",
120 "lgx-port-name": "Some lgx-port-name",
121 "lgx-port-rack": "000000.00",
122 "lgx-port-shelf": "00"
127 "due-date": "2018-06-15T00:00:01Z",
128 "operator-contact": "pw1234"
134 cls.processes = test_utils.start_tpce()
135 cls.processes = test_utils.start_sims([('xpdra2', cls.NODE_VERSION_71),
136 ('roadma', cls.NODE_VERSION_221),
137 ('roadmc', cls.NODE_VERSION_221),
138 ('xpdrc2', cls.NODE_VERSION_71)])
141 def tearDownClass(cls):
142 # pylint: disable=not-an-iterable
143 for process in cls.processes:
144 test_utils.shutdown_process(process)
145 print("all processes killed")
150 def test_01_connect_xpdra2(self):
151 response = test_utils.mount_device("XPDR-A2", ('xpdra2', self.NODE_VERSION_71))
152 self.assertEqual(response.status_code,
153 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
155 def test_02_connect_xpdrc2(self):
156 response = test_utils.mount_device("XPDR-C2", ('xpdrc2', self.NODE_VERSION_71))
157 self.assertEqual(response.status_code,
158 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
160 def test_03_connect_rdma(self):
161 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
162 self.assertEqual(response.status_code,
163 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
165 def test_04_connect_rdmc(self):
166 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
167 self.assertEqual(response.status_code,
168 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
170 def test_05_connect_xprda2_2_N1_to_roadma_PP2(self):
171 response = test_utils.connect_xpdr_to_rdm_request("XPDR-A2", "2", "1",
172 "ROADM-A1", "1", "SRG1-PP2-TXRX")
173 self.assertEqual(response.status_code, requests.codes.ok)
174 res = response.json()
175 self.assertIn('Xponder Roadm Link created successfully',
176 res["output"]["result"])
179 def test_06_connect_roadma_PP2_to_xpdra2_2_N1(self):
180 response = test_utils.connect_rdm_to_xpdr_request("XPDR-A2", "2", "1",
181 "ROADM-A1", "1", "SRG1-PP2-TXRX")
182 self.assertEqual(response.status_code, requests.codes.ok)
183 res = response.json()
184 self.assertIn('Roadm Xponder links created successfully',
185 res["output"]["result"])
188 def test_07_connect_xprdc2_2_N1_to_roadmc_PP2(self):
189 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C2", "2", "1",
190 "ROADM-C1", "1", "SRG1-PP2-TXRX")
191 self.assertEqual(response.status_code, requests.codes.ok)
192 res = response.json()
193 self.assertIn('Xponder Roadm Link created successfully',
194 res["output"]["result"])
197 def test_08_connect_roadmc_PP2_to_xpdrc2_2_N1(self):
198 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C2", "2", "1",
199 "ROADM-C1", "1", "SRG1-PP2-TXRX")
200 self.assertEqual(response.status_code, requests.codes.ok)
201 res = response.json()
202 self.assertIn('Roadm Xponder links created successfully',
203 res["output"]["result"])
206 def test_09_add_omsAttributes_roadma_roadmc(self):
207 # Config ROADMA-ROADMC oms-attributes
209 "auto-spanloss": "true",
210 "spanloss-base": 11.4,
211 "spanloss-current": 12,
212 "engineered-spanloss": 12.2,
213 "link-concatenation": [{
216 "SRLG-length": 100000,
218 response = test_utils.add_oms_attr_request(
219 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
220 self.assertEqual(response.status_code, requests.codes.created)
222 def test_10_add_omsAttributes_roadmc_roadma(self):
223 # Config ROADMC-ROADMA oms-attributes
225 "auto-spanloss": "true",
226 "spanloss-base": 11.4,
227 "spanloss-current": 12,
228 "engineered-spanloss": 12.2,
229 "link-concatenation": [{
232 "SRLG-length": 100000,
234 response = test_utils.add_oms_attr_request(
235 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
236 self.assertEqual(response.status_code, requests.codes.created)
238 # test service-create for OCH-OTU4 service from xpdra2 to xpdrc2
239 def test_11_check_otn_topology(self):
240 response = test_utils.get_otn_topo_request()
241 self.assertEqual(response.status_code, requests.codes.ok)
242 res = response.json()
243 nbNode = len(res['network'][0]['node'])
244 self.assertEqual(nbNode, 4, 'There should be 4 nodes')
245 self.assertNotIn('ietf-network-topology:link', res['network'][0])
247 def test_12_create_OTUC4_service(self):
248 response = test_utils.service_create_request(self.cr_serv_sample_data)
249 self.assertEqual(response.status_code, requests.codes.ok)
250 res = response.json()
251 self.assertIn('PCE calculation in progress',
252 res['output']['configuration-response-common']['response-message'])
253 time.sleep(self.WAITING)
255 def test_13_get_OTUC4_service1(self):
256 response = test_utils.get_service_list_request(
257 "services/service1-OTUC4")
258 self.assertEqual(response.status_code, requests.codes.ok)
259 res = response.json()
261 res['services'][0]['administrative-state'], 'inService')
263 res['services'][0]['service-name'], 'service1-OTUC4')
265 res['services'][0]['connection-type'], 'infrastructure')
267 res['services'][0]['lifecycle-state'], 'planned')
269 res['services'][0]['service-layer'], 'wdm')
271 res['services'][0]['service-a-end']['service-rate'], 400)
273 res['services'][0]['service-a-end']['otu-service-rate'], 'org-openroadm-otn-common-types:OTUCn')
276 res['services'][0]['service-z-end']['otu-service-rate'], 'org-openroadm-otn-common-types:OTUCn')
279 # Check correct configuration of devices
280 def test_14_check_interface_och_xpdra2(self):
281 response = test_utils.check_netconf_node_request(
282 "XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
283 self.assertEqual(response.status_code, requests.codes.ok)
284 res = response.json()
285 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
286 'administrative-state': 'inService',
287 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
288 'type': 'org-openroadm-interfaces:otsi',
289 'supporting-port': 'L1'
290 }, **res['interface'][0]),
293 self.assertDictEqual(
294 dict({u'frequency': 196.0812, u'otsi-rate': u'org-openroadm-common-optical-channel-types:R400G-otsi',
295 u'transmit-power': -5, u'modulation-format': 'dp-qam16'},
296 **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
297 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
299 def test_15_check_interface_OTSI_GROUP_xpdra2(self):
300 response = test_utils.check_netconf_node_request(
301 "XPDR-A2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
302 self.assertEqual(response.status_code, requests.codes.ok)
303 res = response.json()
304 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
305 'administrative-state': 'inService',
306 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
307 'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
308 'type': 'org-openroadm-interfaces:otsi-group',
309 'supporting-port': 'L1'
311 input_dict_2 = {"group-id": 1,
312 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
314 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
316 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]
317 ['org-openroadm-otsi-group-interfaces:otsi-group']),
319 ['org-openroadm-otsi-group-interfaces:otsi-group'])
321 def test_16_check_interface_OTUC4_xpdra2(self):
322 response = test_utils.check_netconf_node_request(
323 "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC4")
324 self.assertEqual(response.status_code, requests.codes.ok)
325 res = response.json()
326 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSI-GROUP',
327 'administrative-state': 'inService',
328 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
329 'supporting-interface-list': 'XPDR2-NETWORK1-OTSI-GROUP',
330 'type': 'org-openroadm-interfaces:otnOtu',
331 'supporting-port': 'L1'
333 input_dict_2 = {'tx-sapi': 'LY9PxYJqUbw=',
334 'expected-dapi': 'LY9PxYJqUbw=',
335 'rate': 'org-openroadm-otn-common-types:OTUCn',
336 'degthr-percentage': 100,
340 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
342 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
344 ['org-openroadm-otn-otu-interfaces:otu'])
346 def test_17_check_interface_och_xpdrc2(self):
347 response = test_utils.check_netconf_node_request(
348 "XPDR-C2", "interface/XPDR2-NETWORK1-755:768")
349 self.assertEqual(response.status_code, requests.codes.ok)
350 res = response.json()
351 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
352 'administrative-state': 'inService',
353 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
354 'type': 'org-openroadm-interfaces:otsi',
355 'supporting-port': 'L1'
356 }, **res['interface'][0]),
359 self.assertDictEqual(
360 dict({u'frequency': 196.0812, u'otsi-rate': u'org-openroadm-common-optical-channel-types:R400G-otsi',
361 u'transmit-power': -5, u'modulation-format': 'dp-qam16'},
362 **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
363 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
365 def test_18_check_interface_OTSI_GROUP_xpdrc2(self):
366 response = test_utils.check_netconf_node_request(
367 "XPDR-C2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
368 self.assertEqual(response.status_code, requests.codes.ok)
369 res = response.json()
370 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSI-GROUP',
371 'administrative-state': 'inService',
372 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
373 'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
374 'type': 'org-openroadm-interfaces:otsi-group',
375 'supporting-port': 'L1'
377 input_dict_2 = {"group-id": 1,
378 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
380 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
382 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]
383 ['org-openroadm-otsi-group-interfaces:otsi-group']),
385 ['org-openroadm-otsi-group-interfaces:otsi-group'])
387 def test_19_check_interface_OTUC4_xpdrc2(self):
388 response = test_utils.check_netconf_node_request(
389 "XPDR-C2", "interface/XPDR2-NETWORK1-OTUC4")
390 self.assertEqual(response.status_code, requests.codes.ok)
391 res = response.json()
392 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
393 'administrative-state': 'inService',
394 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
395 'supporting-interface-list': 'XPDR2-NETWORK1-OTSI-GROUP',
396 'type': 'org-openroadm-interfaces:otnOtu',
397 'supporting-port': 'L1'
399 input_dict_2 = {'tx-dapi': 'LY9PxYJqUbw=',
400 'expected-sapi': 'LY9PxYJqUbw=',
401 'tx-sapi': 'Nmbu2MNHvc4=',
402 'expected-dapi': 'Nmbu2MNHvc4=',
403 'rate': 'org-openroadm-otn-common-types:OTUCn',
404 'degthr-percentage': 100,
409 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
412 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
414 ['org-openroadm-otn-otu-interfaces:otu'])
416 def test_20_check_no_interface_ODUC4_xpdra2(self):
417 response = test_utils.check_netconf_node_request(
418 "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
419 self.assertEqual(response.status_code, requests.codes.conflict)
420 res = response.json()
422 {"error-type": "application", "error-tag": "data-missing",
423 "error-message": "Request could not be completed because the relevant data model content does not exist"},
424 res['errors']['error'])
426 def test_21_check_openroadm_topo_xpdra2(self):
427 response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR2")
428 self.assertEqual(response.status_code, requests.codes.ok)
429 res = response.json()
430 ele = res['node'][0]['ietf-network-topology:termination-point'][0]
431 self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
432 self.assertEqual({u'frequency': 196.08125,
434 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
437 def test_22_check_otn_topo_OTUC4_links(self):
438 response = test_utils.get_otn_topo_request()
439 self.assertEqual(response.status_code, requests.codes.ok)
440 res = response.json()
441 nb_links = len(res['network'][0]['ietf-network-topology:link'])
442 self.assertEqual(nb_links, 2)
443 listLinkId = ['OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
444 'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1']
445 for link in res['network'][0]['ietf-network-topology:link']:
446 self.assertIn(link['link-id'], listLinkId)
448 link['transportpce-topology:otn-link-type'], 'OTUC4')
450 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
452 link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
454 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
456 link['org-openroadm-common-network:opposite-link'], listLinkId)
458 # test service-create for ODU4 service from xpdra2 to xpdrc2
459 def test_23_create_ODUC4_service(self):
460 self.cr_serv_sample_data["input"]["service-name"] = "service1-ODUC4"
461 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
462 del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
463 self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
464 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
465 del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
466 self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
468 response = test_utils.service_create_request(self.cr_serv_sample_data)
469 self.assertEqual(response.status_code, requests.codes.ok)
470 res = response.json()
471 self.assertIn('PCE calculation in progress',
472 res['output']['configuration-response-common']['response-message'])
473 time.sleep(self.WAITING)
475 def test_24_get_ODUC4_service1(self):
476 response = test_utils.get_service_list_request(
477 "services/service1-ODUC4")
478 self.assertEqual(response.status_code, requests.codes.ok)
479 res = response.json()
481 res['services'][0]['administrative-state'], 'inService')
483 res['services'][0]['service-name'], 'service1-ODUC4')
485 res['services'][0]['connection-type'], 'infrastructure')
487 res['services'][0]['lifecycle-state'], 'planned')
489 res['services'][0]['service-layer'], 'wdm')
491 res['services'][0]['service-a-end']['service-rate'], 400)
493 res['services'][0]['service-a-end']['odu-service-rate'], 'org-openroadm-otn-common-types:ODUCn')
495 res['services'][0]['service-z-end']['odu-service-rate'], 'org-openroadm-otn-common-types:ODUCn')
498 def test_25_check_interface_ODUC4_xpdra2(self):
499 response = test_utils.check_netconf_node_request(
500 "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
501 self.assertEqual(response.status_code, requests.codes.ok)
502 res = response.json()
503 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
504 'administrative-state': 'inService',
505 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
506 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
507 'type': 'org-openroadm-interfaces:otnOdu',
508 'supporting-port': 'L1'}
509 # SAPI/DAPI are added in the Otu4 renderer
510 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
511 'rate': 'org-openroadm-otn-common-types:ODUCn',
512 'expected-dapi': 'Nmbu2MNHvc4=',
513 'expected-sapi': 'Nmbu2MNHvc4=',
514 'tx-dapi': 'Nmbu2MNHvc4=',
515 'tx-sapi': 'LY9PxYJqUbw='}
517 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
519 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
520 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
521 self.assertDictEqual(
522 {u'payload-type': u'22', u'exp-payload-type': u'22'},
523 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
525 def test_26_check_interface_ODUC4_xpdrc2(self):
526 response = test_utils.check_netconf_node_request(
527 "XPDR-C2", "interface/XPDR2-NETWORK1-ODUC4")
528 self.assertEqual(response.status_code, requests.codes.ok)
529 res = response.json()
530 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
531 'administrative-state': 'inService',
532 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
533 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
534 'type': 'org-openroadm-interfaces:otnOdu',
535 'supporting-port': 'L1'}
536 # SAPI/DAPI are added in the Otu4 renderer
537 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
538 'rate': 'org-openroadm-otn-common-types:ODUCn',
539 'tx-sapi': 'Nmbu2MNHvc4=',
540 'tx-dapi': 'LY9PxYJqUbw=',
541 'expected-sapi': 'LY9PxYJqUbw=',
542 'expected-dapi': 'LY9PxYJqUbw='
544 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
546 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
547 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
548 self.assertDictEqual(
549 {u'payload-type': u'22', u'exp-payload-type': u'22'},
550 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
552 def test_27_check_otn_topo_links(self):
553 response = test_utils.get_otn_topo_request()
554 self.assertEqual(response.status_code, requests.codes.ok)
555 res = response.json()
556 nb_links = len(res['network'][0]['ietf-network-topology:link'])
557 self.assertEqual(nb_links, 4)
558 for link in res['network'][0]['ietf-network-topology:link']:
559 linkId = link['link-id']
560 if (linkId in ('OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
561 'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
563 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
565 link['org-openroadm-otn-network-topology:used-bandwidth'], 400000)
566 elif (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
567 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
569 link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
571 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
573 link['transportpce-topology:otn-link-type'], 'ODUC4')
575 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
576 self.assertIn(link['org-openroadm-common-network:opposite-link'],
577 ['ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
578 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1'])
580 self.fail("this link should not exist")
582 def test_28_check_otn_topo_tp(self):
583 response = test_utils.get_otn_topo_request()
584 res = response.json()
585 for node in res['network'][0]['node']:
586 if node['node-id'] == 'XPDR-A2-XPDR2' or 'XPDR-C2-XPDR2':
587 tpList = node['ietf-network-topology:termination-point']
589 if tp['tp-id'] == 'XPDR2-NETWORK1':
590 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
591 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
593 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
594 self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
595 'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
597 # test service-create for 100GE service 1 from xpdra2 to xpdrc2
598 def test_29_create_100GE_service_1(self):
599 self.cr_serv_sample_data["input"]["service-name"] = "service-100GE"
600 self.cr_serv_sample_data["input"]["connection-type"] = "service"
601 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
602 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
603 del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
604 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
605 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
606 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
607 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
608 del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
609 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
610 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
611 response = test_utils.service_create_request(self.cr_serv_sample_data)
612 self.assertEqual(response.status_code, requests.codes.ok)
613 res = response.json()
614 self.assertIn('PCE calculation in progress',
615 res['output']['configuration-response-common']['response-message'])
616 time.sleep(self.WAITING)
618 def test_30_get_100GE_service_1(self):
619 response = test_utils.get_service_list_request(
620 "services/service-100GE")
621 self.assertEqual(response.status_code, requests.codes.ok)
622 res = response.json()
624 res['services'][0]['administrative-state'], 'inService')
626 res['services'][0]['service-name'], 'service-100GE')
628 res['services'][0]['connection-type'], 'service')
630 res['services'][0]['lifecycle-state'], 'planned')
633 def test_31_check_interface_100GE_CLIENT_xpdra2(self):
634 response = test_utils.check_netconf_node_request(
635 "XPDR-A2", "interface/XPDR2-CLIENT1-ETHERNET100G")
636 self.assertEqual(response.status_code, requests.codes.ok)
637 res = response.json()
638 input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET100G',
639 'administrative-state': 'inService',
640 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
641 'type': 'org-openroadm-interfaces:ethernetCsmacd',
642 'supporting-port': 'C1'
644 input_dict_2 = {u'speed': 100000}
645 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
647 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
648 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
650 def test_32_check_interface_ODU4_CLIENT_xpdra2(self):
651 response = test_utils.check_netconf_node_request(
652 "XPDR-A2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
653 self.assertEqual(response.status_code, requests.codes.ok)
654 res = response.json()
655 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service-100GE',
656 'administrative-state': 'inService',
657 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
658 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET100G',
659 'type': 'org-openroadm-interfaces:otnOdu',
660 'supporting-port': 'C1'}
662 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
663 'rate': 'org-openroadm-otn-common-types:ODU4',
664 'monitoring-mode': 'terminated'}
666 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
668 self.assertDictEqual(dict(input_dict_2,
669 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
670 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
671 self.assertDictEqual(
672 {u'payload-type': u'07', u'exp-payload-type': u'07'},
673 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
675 def test_33_check_interface_ODU4_NETWORK_xpdra2(self):
676 response = test_utils.check_netconf_node_request(
677 "XPDR-A2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
678 self.assertEqual(response.status_code, requests.codes.ok)
679 res = response.json()
680 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service-100GE',
681 'administrative-state': 'inService',
682 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
683 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
684 'type': 'org-openroadm-interfaces:otnOdu',
685 'supporting-port': 'L1'}
687 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
688 'rate': 'org-openroadm-otn-common-types:ODU4',
689 'monitoring-mode': 'not-terminated'}
690 input_dict_3 = {u'trib-port-number': 1}
692 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
694 self.assertDictEqual(dict(input_dict_2,
695 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
696 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
697 self.assertDictEqual(dict(input_dict_3,
698 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
699 'parent-odu-allocation']),
700 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
701 self.assertIn('1.1', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
702 ['opucn-trib-slots'])
703 self.assertIn('1.20', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
704 ['opucn-trib-slots'])
706 def test_34_check_ODU4_connection_xpdra2(self):
707 response = test_utils.check_netconf_node_request(
709 "odu-connection/XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE")
710 self.assertEqual(response.status_code, requests.codes.ok)
711 res = response.json()
714 'XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE',
715 'direction': 'bidirectional'
718 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
719 res['odu-connection'][0])
720 self.assertDictEqual({u'dst-if': u'XPDR2-NETWORK1-ODU4-service-100GE'},
721 res['odu-connection'][0]['destination'])
722 self.assertDictEqual({u'src-if': u'XPDR2-CLIENT1-ODU4-service-100GE'},
723 res['odu-connection'][0]['source'])
725 def test_35_check_interface_100GE_CLIENT_xpdrc2(self):
726 response = test_utils.check_netconf_node_request(
727 "XPDR-C2", "interface/XPDR2-CLIENT1-ETHERNET100G")
728 self.assertEqual(response.status_code, requests.codes.ok)
729 res = response.json()
730 input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET100G',
731 'administrative-state': 'inService',
732 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
733 'type': 'org-openroadm-interfaces:ethernetCsmacd',
734 'supporting-port': 'C1'
736 input_dict_2 = {u'speed': 100000}
737 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
739 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
740 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
742 def test_36_check_interface_ODU4_CLIENT_xpdrc2(self):
743 response = test_utils.check_netconf_node_request(
744 "XPDR-C2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
745 self.assertEqual(response.status_code, requests.codes.ok)
746 res = response.json()
747 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service-100GE',
748 'administrative-state': 'inService',
749 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
750 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET100G',
751 'type': 'org-openroadm-interfaces:otnOdu',
752 'supporting-port': 'C1'}
754 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
755 'rate': 'org-openroadm-otn-common-types:ODU4',
756 'monitoring-mode': 'terminated'}
758 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
760 self.assertDictEqual(dict(input_dict_2,
761 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
762 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
763 self.assertDictEqual(
764 {u'payload-type': u'07', u'exp-payload-type': u'07'},
765 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
767 def test_37_check_interface_ODU4_NETWORK_xpdrc2(self):
768 response = test_utils.check_netconf_node_request(
769 "XPDR-C2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
770 self.assertEqual(response.status_code, requests.codes.ok)
771 res = response.json()
772 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service-100GE',
773 'administrative-state': 'inService',
774 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
775 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
776 'type': 'org-openroadm-interfaces:otnOdu',
777 'supporting-port': 'C1'}
779 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
780 'rate': 'org-openroadm-otn-common-types:ODU4',
781 'monitoring-mode': 'not-terminated'}
783 input_dict_3 = {'trib-port-number': 1}
785 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
787 self.assertDictEqual(dict(input_dict_2,
788 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
789 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
790 self.assertDictEqual(dict(input_dict_3,
791 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
792 'parent-odu-allocation']),
793 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
794 'parent-odu-allocation'])
797 'org-openroadm-otn-odu-interfaces:odu'][
798 'parent-odu-allocation']['opucn-trib-slots'])
799 self.assertIn('1.20',
801 'org-openroadm-otn-odu-interfaces:odu'][
802 'parent-odu-allocation']['opucn-trib-slots'])
804 def test_38_check_ODU4_connection_xpdrc2(self):
805 response = test_utils.check_netconf_node_request(
807 "odu-connection/XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE")
808 self.assertEqual(response.status_code, requests.codes.ok)
809 res = response.json()
812 'XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE',
813 'direction': 'bidirectional'
816 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
817 res['odu-connection'][0])
818 self.assertDictEqual({u'dst-if': u'XPDR2-NETWORK1-ODU4-service-100GE'},
819 res['odu-connection'][0]['destination'])
820 self.assertDictEqual({u'src-if': u'XPDR2-CLIENT1-ODU4-service-100GE'},
821 res['odu-connection'][0]['source'])
823 def test_39_check_otn_topo_links(self):
824 response = test_utils.get_otn_topo_request()
825 self.assertEqual(response.status_code, requests.codes.ok)
826 res = response.json()
827 nb_links = len(res['network'][0]['ietf-network-topology:link'])
828 self.assertEqual(nb_links, 4)
829 for link in res['network'][0]['ietf-network-topology:link']:
830 linkId = link['link-id']
831 if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
832 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
834 link['org-openroadm-otn-network-topology:available-bandwidth'], 300000)
836 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
838 def test_40_check_otn_topo_tp(self):
839 response = test_utils.get_otn_topo_request()
840 res = response.json()
841 for node in res['network'][0]['node']:
842 if node['node-id'] == 'XPDR-A2-XPDR2' or 'XPDR-C2-XPDR2':
843 tpList = node['ietf-network-topology:termination-point']
845 if tp['tp-id'] == 'XPDR2-NETWORK1':
846 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
847 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 60)
848 tsPoolList = list(range(1, 20))
850 tsPoolList, xpdrTpPortConAt['ts-pool'])
852 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 3)
854 1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
856 # test service-create for 100GE service 2 from xpdra2 to xpdrc2
857 def test_41_create_100GE_service_2(self):
858 self.cr_serv_sample_data["input"]["service-name"] = "service-100GE2"
859 self.cr_serv_sample_data["input"]["connection-type"] = "service"
860 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
861 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
862 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
863 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
864 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
865 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
866 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
867 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
868 response = test_utils.service_create_request(self.cr_serv_sample_data)
869 self.assertEqual(response.status_code, requests.codes.ok)
870 res = response.json()
871 self.assertIn('PCE calculation in progress',
872 res['output']['configuration-response-common']['response-message'])
873 time.sleep(self.WAITING)
875 def test_42_get_100GE_service_2(self):
876 response = test_utils.get_service_list_request(
877 "services/service-100GE2")
878 self.assertEqual(response.status_code, requests.codes.ok)
879 res = response.json()
881 res['services'][0]['administrative-state'], 'inService')
883 res['services'][0]['service-name'], 'service-100GE2')
885 res['services'][0]['connection-type'], 'service')
887 res['services'][0]['lifecycle-state'], 'planned')
890 def test_43_check_service_list(self):
891 response = test_utils.get_service_list_request("")
892 self.assertEqual(response.status_code, requests.codes.ok)
893 res = response.json()
894 self.assertEqual(len(res['service-list']['services']), 4)
897 def test_44_check_otn_topo_links(self):
898 response = test_utils.get_otn_topo_request()
899 self.assertEqual(response.status_code, requests.codes.ok)
900 res = response.json()
901 nb_links = len(res['network'][0]['ietf-network-topology:link'])
902 self.assertEqual(nb_links, 4)
903 for link in res['network'][0]['ietf-network-topology:link']:
904 linkId = link['link-id']
905 if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
906 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
908 link['org-openroadm-otn-network-topology:available-bandwidth'], 200000)
910 link['org-openroadm-otn-network-topology:used-bandwidth'], 200000)
912 def test_45_check_otn_topo_tp(self):
913 response = test_utils.get_otn_topo_request()
914 res = response.json()
915 for node in res['network'][0]['node']:
916 if node['node-id'] == 'XPDR-A2-XPDR2' or 'XPDR-C2-XPDR2':
917 tpList = node['ietf-network-topology:termination-point']
919 if tp['tp-id'] == 'XPDR2-NETWORK1':
920 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
921 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 40)
922 tsPoolList = list(range(1, 40))
924 tsPoolList, xpdrTpPortConAt['ts-pool'])
926 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 2)
928 2, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
930 def test_46_delete_100GE_service_2(self):
931 response = test_utils.service_delete_request("service-100GE2")
932 self.assertEqual(response.status_code, requests.codes.ok)
933 res = response.json()
934 self.assertIn('Renderer service delete in progress',
935 res['output']['configuration-response-common']['response-message'])
936 time.sleep(self.WAITING)
938 def test_47_delete_100GE_service_1(self):
939 response = test_utils.service_delete_request("service-100GE")
940 self.assertEqual(response.status_code, requests.codes.ok)
941 res = response.json()
942 self.assertIn('Renderer service delete in progress',
943 res['output']['configuration-response-common']['response-message'])
944 time.sleep(self.WAITING)
946 def test_48_check_service_list(self):
947 response = test_utils.get_service_list_request("")
948 self.assertEqual(response.status_code, requests.codes.ok)
949 res = response.json()
950 self.assertEqual(len(res['service-list']['services']), 2)
953 def test_49_check_no_ODU4_connection_xpdra2(self):
954 response = test_utils.check_netconf_node_request("XPDR-A2", "")
955 self.assertEqual(response.status_code, requests.codes.ok)
956 res = response.json()
957 self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
960 def test_50_check_no_interface_ODU4_NETWORK_xpdra2(self):
961 response = test_utils.check_netconf_node_request(
962 "XPDR-A2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
963 self.assertEqual(response.status_code, requests.codes.conflict)
965 def test_51_check_no_interface_ODU4_CLIENT_xpdra2(self):
966 response = test_utils.check_netconf_node_request(
967 "XPDR-A2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
968 self.assertEqual(response.status_code, requests.codes.conflict)
970 def test_52_check_no_interface_100GE_CLIENT_xpdra2(self):
971 response = test_utils.check_netconf_node_request(
972 "XPDR-A2", "interface/XPDR2-CLIENT1-ETHERNET100G")
973 self.assertEqual(response.status_code, requests.codes.conflict)
975 def test_53_check_otn_topo_links(self):
976 response = test_utils.get_otn_topo_request()
977 self.assertEqual(response.status_code, requests.codes.ok)
978 res = response.json()
979 nb_links = len(res['network'][0]['ietf-network-topology:link'])
980 self.assertEqual(nb_links, 4)
981 for link in res['network'][0]['ietf-network-topology:link']:
982 linkId = link['link-id']
983 if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
984 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
986 link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
988 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
990 def test_54_check_otn_topo_tp(self):
991 response = test_utils.get_otn_topo_request()
992 res = response.json()
993 for node in res['network'][0]['node']:
994 if (node['node-id'] == 'XPDR-A2-XPDR2' or 'XPDR-C2-XPDR2'):
995 tpList = node['ietf-network-topology:termination-point']
997 if tp['tp-id'] == 'XPDR2-NETWORK1':
998 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
999 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
1001 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
1003 def test_55_delete_ODUC4_service(self):
1004 response = test_utils.service_delete_request("service1-ODUC4")
1005 self.assertEqual(response.status_code, requests.codes.ok)
1006 res = response.json()
1007 self.assertIn('Renderer service delete in progress',
1008 res['output']['configuration-response-common']['response-message'])
1009 time.sleep(self.WAITING)
1011 def test_56_check_service_list(self):
1012 response = test_utils.get_service_list_request("")
1013 self.assertEqual(response.status_code, requests.codes.ok)
1014 res = response.json()
1015 self.assertEqual(len(res['service-list']['services']), 1)
1018 def test_57_check_no_interface_ODU4_xpdra2(self):
1019 response = test_utils.check_netconf_node_request(
1020 "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
1021 self.assertEqual(response.status_code, requests.codes.conflict)
1023 def test_58_check_otn_topo_links(self):
1024 self.test_22_check_otn_topo_OTUC4_links()
1026 def test_59_check_otn_topo_tp(self):
1027 response = test_utils.get_otn_topo_request()
1028 res = response.json()
1029 for node in res['network'][0]['node']:
1030 if node['node-id'] == 'XPDR-A2-XPDR2' or 'XPDR-C2-XPDR2':
1031 tpList = node['ietf-network-topology:termination-point']
1033 if tp['tp-id'] == 'XPDR2-NETWORK1':
1034 self.assertNotIn('org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes',
1037 def test_60_delete_OTUC4_service(self):
1038 response = test_utils.service_delete_request("service1-OTUC4")
1039 self.assertEqual(response.status_code, requests.codes.ok)
1040 res = response.json()
1041 self.assertIn('Renderer service delete in progress',
1042 res['output']['configuration-response-common']['response-message'])
1043 time.sleep(self.WAITING)
1045 def test_61_get_no_service(self):
1046 response = test_utils.get_service_list_request("")
1047 self.assertEqual(response.status_code, requests.codes.conflict)
1048 res = response.json()
1050 {"error-type": "application", "error-tag": "data-missing",
1051 "error-message": "Request could not be completed because the relevant data model content does not exist"},
1052 res['errors']['error'])
1055 def test_62_check_no_interface_OTUC4_xpdra2(self):
1056 response = test_utils.check_netconf_node_request(
1057 "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC4")
1058 self.assertEqual(response.status_code, requests.codes.conflict)
1060 def test_63_check_no_interface_OCH_xpdra2(self):
1061 response = test_utils.check_netconf_node_request(
1062 "XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
1063 self.assertEqual(response.status_code, requests.codes.conflict)
1065 def test_64_check_no_interface_OTSI_xpdra2(self):
1066 response = test_utils.check_netconf_node_request(
1067 "XPDR-A2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
1068 self.assertEqual(response.status_code, requests.codes.conflict)
1070 def test_65_getLinks_OtnTopology(self):
1071 response = test_utils.get_otn_topo_request()
1072 self.assertEqual(response.status_code, requests.codes.ok)
1073 res = response.json()
1074 self.assertNotIn('ietf-network-topology:link', res['network'][0])
1076 def test_66_check_openroadm_topo_xpdra2(self):
1077 response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR2")
1078 self.assertEqual(response.status_code, requests.codes.ok)
1079 res = response.json()
1080 tp = res['node'][0]['ietf-network-topology:termination-point'][0]
1081 self.assertEqual('XPDR2-NETWORK1', tp['tp-id'])
1082 self.assertNotIn('wavelength', dict.keys(
1083 tp[u'org-openroadm-network-topology:xpdr-network-attributes']))
1086 def test_67_check_openroadm_topology(self):
1087 response = test_utils.get_ordm_topo_request("")
1088 self.assertEqual(response.status_code, requests.codes.ok)
1089 res = response.json()
1090 links = res['network'][0]['ietf-network-topology:link']
1091 self.assertEqual(22, len(links), 'Topology should contain 22 links')
1093 def test_68_connect_xprda2_2_N1_to_roadma_PP2(self):
1094 response = test_utils.connect_xpdr_to_rdm_request("XPDR-A2", "1", "1",
1095 "ROADM-A1", "1", "SRG1-PP1-TXRX")
1096 self.assertEqual(response.status_code, requests.codes.ok)
1097 res = response.json()
1098 self.assertIn('Xponder Roadm Link created successfully',
1099 res["output"]["result"])
1102 def test_69_connect_roadma_PP2_to_xpdra2_2_N1(self):
1103 response = test_utils.connect_rdm_to_xpdr_request("XPDR-A2", "1", "1",
1104 "ROADM-A1", "1", "SRG1-PP1-TXRX")
1105 self.assertEqual(response.status_code, requests.codes.ok)
1106 res = response.json()
1107 self.assertIn('Roadm Xponder links created successfully',
1108 res["output"]["result"])
1111 def test_70_connect_xprdc2_2_N1_to_roadmc_PP2(self):
1112 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C2", "1", "1",
1113 "ROADM-C1", "1", "SRG1-PP1-TXRX")
1114 self.assertEqual(response.status_code, requests.codes.ok)
1115 res = response.json()
1116 self.assertIn('Xponder Roadm Link created successfully',
1117 res["output"]["result"])
1120 def test_71_connect_roadmc_PP2_to_xpdrc2_2_N1(self):
1121 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C2", "1", "1",
1122 "ROADM-C1", "1", "SRG1-PP1-TXRX")
1123 self.assertEqual(response.status_code, requests.codes.ok)
1124 res = response.json()
1125 self.assertIn('Roadm Xponder links created successfully',
1126 res["output"]["result"])
1129 # test service-create for 400GE service from xpdra2 to xpdrc2
1130 def test_72_create_400GE_service(self):
1131 self.cr_serv_sample_data["input"]["service-name"] = "service-400GE"
1132 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "400"
1133 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
1134 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
1135 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "400"
1136 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
1137 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
1138 response = test_utils.service_create_request(self.cr_serv_sample_data)
1139 self.assertEqual(response.status_code, requests.codes.ok)
1140 res = response.json()
1141 self.assertIn('PCE calculation in progress',
1142 res['output']['configuration-response-common']['response-message'])
1143 time.sleep(self.WAITING)
1145 def test_73_get_400GE_service(self):
1146 response = test_utils.get_service_list_request(
1147 "services/service-400GE")
1148 self.assertEqual(response.status_code, requests.codes.ok)
1149 res = response.json()
1151 res['services'][0]['administrative-state'], 'inService')
1153 res['services'][0]['service-name'], 'service-400GE')
1155 res['services'][0]['connection-type'], 'service')
1157 res['services'][0]['lifecycle-state'], 'planned')
1160 def test_74_check_xc1_roadma(self):
1161 response = test_utils.check_netconf_node_request(
1162 "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-755:768")
1163 self.assertEqual(response.status_code, requests.codes.ok)
1164 res = response.json()
1165 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1166 self.assertDictEqual(
1168 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
1169 'opticalControlMode': 'gainLoss',
1170 'target-output-power': -3.0
1171 }, **res['roadm-connections'][0]),
1172 res['roadm-connections'][0]
1174 self.assertDictEqual(
1175 {'src-if': 'SRG1-PP1-TXRX-nmc-755:768'},
1176 res['roadm-connections'][0]['source'])
1177 self.assertDictEqual(
1178 {'dst-if': 'DEG2-TTP-TXRX-nmc-755:768'},
1179 res['roadm-connections'][0]['destination'])
1182 def test_75_check_topo_xpdra2(self):
1183 response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR1")
1184 self.assertEqual(response.status_code, requests.codes.ok)
1185 res = response.json()
1186 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1187 for ele in liste_tp:
1188 if ele['tp-id'] == 'XPDR1-NETWORK1':
1189 self.assertEqual({u'frequency': 196.08125,
1191 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
1192 if ele['tp-id'] == 'XPDR1-CLIENT1':
1193 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
1196 def test_76_check_topo_roadma_SRG1(self):
1197 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
1198 self.assertEqual(response.status_code, requests.codes.ok)
1199 res = response.json()
1200 freq_map = base64.b64decode(
1201 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
1202 freq_map_array = [int(x) for x in freq_map]
1203 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1204 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1205 for ele in liste_tp:
1206 if ele['tp-id'] == 'SRG1-PP1-TXRX':
1207 freq_map = base64.b64decode(
1208 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1209 freq_map_array = [int(x) for x in freq_map]
1210 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1211 if ele['tp-id'] == 'SRG1-PP2-TXRX':
1212 self.assertNotIn('avail-freq-maps', dict.keys(ele))
1215 def test_77_check_topo_roadma_DEG1(self):
1216 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
1217 self.assertEqual(response.status_code, requests.codes.ok)
1218 res = response.json()
1219 freq_map = base64.b64decode(
1220 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1221 freq_map_array = [int(x) for x in freq_map]
1222 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1223 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1224 for ele in liste_tp:
1225 if ele['tp-id'] == 'DEG2-CTP-TXRX':
1226 freq_map = base64.b64decode(
1227 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1228 freq_map_array = [int(x) for x in freq_map]
1229 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1230 if ele['tp-id'] == 'DEG2-TTP-TXRX':
1231 freq_map = base64.b64decode(
1232 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1233 freq_map_array = [int(x) for x in freq_map]
1234 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1237 def test_78_check_interface_100GE_CLIENT_xpdra2(self):
1238 response = test_utils.check_netconf_node_request(
1239 "XPDR-A2", "interface/XPDR1-CLIENT1-ETHERNET")
1240 self.assertEqual(response.status_code, requests.codes.ok)
1241 res = response.json()
1242 input_dict_1 = {'name': 'XPDR1-CLIENT1-ETHERNET',
1243 'administrative-state': 'inService',
1244 'supporting-circuit-pack-name': '1/1/1-PLUG-CLIENT',
1245 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1246 'supporting-port': 'C1'
1248 input_dict_2 = {u'speed': 400000}
1249 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1250 res['interface'][0])
1251 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
1252 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1254 def test_79_check_interface_OTSI_xpdra2(self):
1255 response = test_utils.check_netconf_node_request(
1256 "XPDR-A2", "interface/XPDR1-NETWORK1-755:768")
1257 self.assertEqual(response.status_code, requests.codes.ok)
1258 res = response.json()
1259 input_dict_1 = {'name': 'XPDR1-NETWORK1-755:768',
1260 'administrative-state': 'inService',
1261 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1262 'type': 'org-openroadm-interfaces:otsi',
1263 'supporting-port': 'L1'}
1265 "frequency": 196.0812,
1266 "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
1267 "fec": "org-openroadm-common-types:ofec",
1268 "transmit-power": -5,
1269 "provision-mode": "explicit",
1270 "modulation-format": "dp-qam16"}
1272 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1273 res['interface'][0])
1274 self.assertDictEqual(dict(input_dict_2,
1275 **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
1276 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
1277 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
1278 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
1280 def test_80_check_interface_OTSI_GROUP_xpdra2(self):
1281 response = test_utils.check_netconf_node_request(
1282 "XPDR-A2", "interface/XPDR1-NETWORK1-OTSI-GROUP")
1283 self.assertEqual(response.status_code, requests.codes.ok)
1284 res = response.json()
1285 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
1286 'administrative-state': 'inService',
1287 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1288 ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-755:768',
1289 'type': 'org-openroadm-interfaces:otsi-group',
1290 'supporting-port': 'L1'}
1291 input_dict_2 = {"group-id": 1,
1292 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}
1294 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1295 res['interface'][0])
1296 self.assertDictEqual(dict(input_dict_2,
1297 **res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
1298 res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
1300 def test_81_check_interface_OTUC4_xpdra2(self):
1301 response = test_utils.check_netconf_node_request(
1302 "XPDR-A2", "interface/XPDR1-NETWORK1-OTUC4")
1303 self.assertEqual(response.status_code, requests.codes.ok)
1304 res = response.json()
1305 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
1306 'administrative-state': 'inService',
1307 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1308 ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTSI-GROUP',
1309 'type': 'org-openroadm-interfaces:otnOtu',
1310 'supporting-port': 'L1'}
1311 input_dict_2 = {"tx-sapi": "AIGiVAQ4gDil", "rate": "org-openroadm-otn-common-types:OTUCn",
1312 "degthr-percentage": 100,
1313 "tim-detect-mode": "Disabled",
1315 "degm-intervals": 2,
1316 "expected-dapi": "AIGiVAQ4gDil"}
1318 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1319 res['interface'][0])
1320 self.assertDictEqual(dict(input_dict_2,
1321 **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
1322 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
1324 def test_82_check_interface_ODUC4_xpdra2(self):
1325 response = test_utils.check_netconf_node_request(
1326 "XPDR-A2", "interface/XPDR1-NETWORK1-ODUC4")
1327 self.assertEqual(response.status_code, requests.codes.ok)
1328 res = response.json()
1329 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODUC4',
1330 'administrative-state': 'inService',
1331 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1332 ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTSI-GROUP',
1333 'type': 'org-openroadm-interfaces:otnOdu',
1334 'supporting-port': 'L1'}
1335 input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP",
1336 "tim-detect-mode": "Disabled",
1337 "degm-intervals": 2,
1338 "degthr-percentage": 100,
1339 "monitoring-mode": "terminated",
1340 "rate": "org-openroadm-otn-common-types:ODUCn",
1342 input_dict_3 = {"exp-payload-type": "22", "payload-type": "22"}
1344 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1345 res['interface'][0])
1346 self.assertDictEqual(dict(input_dict_2,
1347 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1348 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1349 self.assertDictEqual(dict(input_dict_3,
1350 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']),
1351 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1353 def test_83_delete_400GE_service(self):
1354 response = test_utils.service_delete_request("service-400GE")
1355 self.assertEqual(response.status_code, requests.codes.ok)
1356 res = response.json()
1357 self.assertIn('Renderer service delete in progress',
1358 res['output']['configuration-response-common']['response-message'])
1359 time.sleep(self.WAITING)
1361 def test_84_get_no_service(self):
1362 response = test_utils.get_service_list_request("")
1363 self.assertEqual(response.status_code, requests.codes.conflict)
1364 res = response.json()
1366 {"error-type": "application", "error-tag": "data-missing",
1367 "error-message": "Request could not be completed because the relevant data model content does not exist"},
1368 res['errors']['error'])
1371 def test_85_check_no_interface_ODUC4_xpdra2(self):
1372 response = test_utils.check_netconf_node_request(
1373 "XPDR-A2", "interface/XPDR1-NETWORK1-ODUC4")
1374 self.assertEqual(response.status_code, requests.codes.conflict)
1376 def test_86_check_no_interface_OTUC4_xpdra2(self):
1377 response = test_utils.check_netconf_node_request(
1378 "XPDR-A2", "interface/XPDR1-NETWORK1-OTUC4")
1379 self.assertEqual(response.status_code, requests.codes.conflict)
1381 def test_87_check_no_interface_OTSI_GROUP_xpdra2(self):
1382 response = test_utils.check_netconf_node_request(
1383 "XPDR-A2", "interface/XPDR1-NETWORK1-OTSI-GROUP")
1384 self.assertEqual(response.status_code, requests.codes.conflict)
1386 def test_88_check_no_interface_OTSI_xpdra2(self):
1387 response = test_utils.check_netconf_node_request(
1388 "XPDR-A2", "interface/XPDR1-NETWORK1-755:768")
1389 self.assertEqual(response.status_code, requests.codes.conflict)
1391 def test_89_check_no_interface_400GE_CLIENT_xpdra2(self):
1392 response = test_utils.check_netconf_node_request(
1393 "XPDR-A2", "interface/XPDR1-CLIENT1-ETHERNET")
1394 self.assertEqual(response.status_code, requests.codes.conflict)
1396 def test_90_disconnect_xponders_from_roadm(self):
1397 url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1398 response = test_utils.get_ordm_topo_request("")
1399 self.assertEqual(response.status_code, requests.codes.ok)
1400 res = response.json()
1401 links = res['network'][0]['ietf-network-topology:link']
1403 if (link["org-openroadm-common-network:link-type"] == "XPONDER-OUTPUT" or
1404 link["org-openroadm-common-network:link-type"] == "XPONDER-INPUT"):
1405 link_name = link["link-id"]
1406 response = test_utils.delete_request(url+link_name)
1407 self.assertEqual(response.status_code, requests.codes.ok)
1409 def test_91_disconnect_xpdra2(self):
1410 response = test_utils.unmount_device("XPDR-A2")
1411 self.assertEqual(response.status_code, requests.codes.ok,
1412 test_utils.CODE_SHOULD_BE_200)
1414 def test_92_disconnect_xpdrc2(self):
1415 response = test_utils.unmount_device("XPDR-C2")
1416 self.assertEqual(response.status_code, requests.codes.ok,
1417 test_utils.CODE_SHOULD_BE_200)
1419 def test_93_disconnect_roadmA(self):
1420 response = test_utils.unmount_device("ROADM-A1")
1421 self.assertEqual(response.status_code, requests.codes.ok,
1422 test_utils.CODE_SHOULD_BE_200)
1424 def test_94_disconnect_roadmC(self):
1425 response = test_utils.unmount_device("ROADM-C1")
1426 self.assertEqual(response.status_code, requests.codes.ok,
1427 test_utils.CODE_SHOULD_BE_200)
1430 if __name__ == "__main__":
1431 unittest.main(verbosity=2)