3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
20 # pylint: disable=wrong-import-order
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils # nopep8
28 class TransportPCEtesting(unittest.TestCase):
31 WAITING = 20 # nominal value is 300
32 NODE_VERSION_221 = '2.2.1'
33 NODE_VERSION_71 = '7.1'
35 cr_serv_sample_data = {"input": {
36 "sdnc-request-header": {
37 "request-id": "request-1",
38 "rpc-action": "service-create",
39 "request-system-id": "appname"
41 "service-name": "service1-OTUC4",
42 "common-id": "commonId",
43 "connection-type": "infrastructure",
45 "service-rate": "400",
47 "service-format": "OTU",
48 "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
52 "committed-info-rate": "100000",
53 "committed-burst-size": "64"
58 "port-device-name": "XPDR-A2-XPDR2",
60 "port-name": "XPDR2-NETWORK1",
61 "port-rack": "000000.00",
62 "port-shelf": "Chassis#1"
65 "lgx-device-name": "Some lgx-device-name",
66 "lgx-port-name": "Some lgx-port-name",
67 "lgx-port-rack": "000000.00",
68 "lgx-port-shelf": "00"
73 "port-device-name": "XPDR-A2-XPDR2",
75 "port-name": "XPDR2-NETWORK1",
76 "port-rack": "000000.00",
77 "port-shelf": "Chassis#1"
80 "lgx-device-name": "Some lgx-device-name",
81 "lgx-port-name": "Some lgx-port-name",
82 "lgx-port-rack": "000000.00",
83 "lgx-port-shelf": "00"
89 "service-rate": "400",
91 "service-format": "OTU",
92 "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
96 "committed-info-rate": "100000",
97 "committed-burst-size": "64"
102 "port-device-name": "XPDR-C2-XPDR2",
103 "port-type": "fixed",
104 "port-name": "XPDR2-NETWORK1",
105 "port-rack": "000000.00",
106 "port-shelf": "Chassis#1"
109 "lgx-device-name": "Some lgx-device-name",
110 "lgx-port-name": "Some lgx-port-name",
111 "lgx-port-rack": "000000.00",
112 "lgx-port-shelf": "00"
117 "port-device-name": "XPDR-C2-XPDR2",
118 "port-type": "fixed",
119 "port-name": "XPDR2-NETWORK1",
120 "port-rack": "000000.00",
121 "port-shelf": "Chassis#1"
124 "lgx-device-name": "Some lgx-device-name",
125 "lgx-port-name": "Some lgx-port-name",
126 "lgx-port-rack": "000000.00",
127 "lgx-port-shelf": "00"
132 "due-date": "2018-06-15T00:00:01Z",
133 "operator-contact": "pw1234"
139 cls.processes = test_utils.start_tpce()
140 cls.processes = test_utils.start_sims([('xpdra2', cls.NODE_VERSION_71),
141 ('roadma', cls.NODE_VERSION_221),
142 ('roadmc', cls.NODE_VERSION_221),
143 ('xpdrc2', cls.NODE_VERSION_71)])
146 def tearDownClass(cls):
147 # pylint: disable=not-an-iterable
148 for process in cls.processes:
149 test_utils.shutdown_process(process)
150 print("all processes killed")
155 def test_01_connect_xpdra2(self):
156 response = test_utils.mount_device("XPDR-A2", ('xpdra2', self.NODE_VERSION_71))
157 self.assertEqual(response.status_code,
158 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
160 def test_02_connect_xpdrc2(self):
161 response = test_utils.mount_device("XPDR-C2", ('xpdrc2', self.NODE_VERSION_71))
162 self.assertEqual(response.status_code,
163 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
165 def test_03_connect_rdma(self):
166 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
167 self.assertEqual(response.status_code,
168 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
170 def test_04_connect_rdmc(self):
171 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
172 self.assertEqual(response.status_code,
173 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
175 def test_05_connect_xprda2_2_N1_to_roadma_PP2(self):
176 response = test_utils.connect_xpdr_to_rdm_request("XPDR-A2", "2", "1",
177 "ROADM-A1", "1", "SRG1-PP2-TXRX")
178 self.assertEqual(response.status_code, requests.codes.ok)
179 res = response.json()
180 self.assertIn('Xponder Roadm Link created successfully',
181 res["output"]["result"])
184 def test_06_connect_roadma_PP2_to_xpdra2_2_N1(self):
185 response = test_utils.connect_rdm_to_xpdr_request("XPDR-A2", "2", "1",
186 "ROADM-A1", "1", "SRG1-PP2-TXRX")
187 self.assertEqual(response.status_code, requests.codes.ok)
188 res = response.json()
189 self.assertIn('Roadm Xponder links created successfully',
190 res["output"]["result"])
193 def test_07_connect_xprdc2_2_N1_to_roadmc_PP2(self):
194 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C2", "2", "1",
195 "ROADM-C1", "1", "SRG1-PP2-TXRX")
196 self.assertEqual(response.status_code, requests.codes.ok)
197 res = response.json()
198 self.assertIn('Xponder Roadm Link created successfully',
199 res["output"]["result"])
202 def test_08_connect_roadmc_PP2_to_xpdrc2_2_N1(self):
203 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C2", "2", "1",
204 "ROADM-C1", "1", "SRG1-PP2-TXRX")
205 self.assertEqual(response.status_code, requests.codes.ok)
206 res = response.json()
207 self.assertIn('Roadm Xponder links created successfully',
208 res["output"]["result"])
211 def test_09_add_omsAttributes_roadma_roadmc(self):
212 # Config ROADMA-ROADMC oms-attributes
214 "auto-spanloss": "true",
215 "spanloss-base": 11.4,
216 "spanloss-current": 12,
217 "engineered-spanloss": 12.2,
218 "link-concatenation": [{
221 "SRLG-length": 100000,
223 response = test_utils.add_oms_attr_request(
224 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
225 self.assertEqual(response.status_code, requests.codes.created)
227 def test_10_add_omsAttributes_roadmc_roadma(self):
228 # Config ROADMC-ROADMA oms-attributes
230 "auto-spanloss": "true",
231 "spanloss-base": 11.4,
232 "spanloss-current": 12,
233 "engineered-spanloss": 12.2,
234 "link-concatenation": [{
237 "SRLG-length": 100000,
239 response = test_utils.add_oms_attr_request(
240 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
241 self.assertEqual(response.status_code, requests.codes.created)
243 # test service-create for OCH-OTU4 service from xpdra2 to xpdrc2
244 def test_11_check_otn_topology(self):
245 response = test_utils.get_otn_topo_request()
246 self.assertEqual(response.status_code, requests.codes.ok)
247 res = response.json()
248 nbNode = len(res['network'][0]['node'])
249 self.assertEqual(nbNode, 4, 'There should be 4 nodes')
250 self.assertNotIn('ietf-network-topology:link', res['network'][0])
252 def test_12_create_OTUC4_service(self):
253 response = test_utils.service_create_request(self.cr_serv_sample_data)
254 self.assertEqual(response.status_code, requests.codes.ok)
255 res = response.json()
256 self.assertIn('PCE calculation in progress',
257 res['output']['configuration-response-common']['response-message'])
258 time.sleep(self.WAITING)
260 def test_13_get_OTUC4_service1(self):
261 response = test_utils.get_service_list_request(
262 "services/service1-OTUC4")
263 self.assertEqual(response.status_code, requests.codes.ok)
264 res = response.json()
266 res['services'][0]['administrative-state'], 'inService')
268 res['services'][0]['service-name'], 'service1-OTUC4')
270 res['services'][0]['connection-type'], 'infrastructure')
272 res['services'][0]['lifecycle-state'], 'planned')
274 res['services'][0]['service-layer'], 'wdm')
276 res['services'][0]['service-a-end']['service-rate'], 400)
278 res['services'][0]['service-a-end']['otu-service-rate'], 'org-openroadm-otn-common-types:OTUCn')
281 res['services'][0]['service-z-end']['otu-service-rate'], 'org-openroadm-otn-common-types:OTUCn')
284 # Check correct configuration of devices
285 def test_14_check_interface_och_xpdra2(self):
286 response = test_utils.check_netconf_node_request(
287 "XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
288 self.assertEqual(response.status_code, requests.codes.ok)
289 res = response.json()
290 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
291 'administrative-state': 'inService',
292 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
293 'type': 'org-openroadm-interfaces:otsi',
294 'supporting-port': 'L1'
295 }, **res['interface'][0]),
298 self.assertDictEqual(
299 dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
300 'transmit-power': -5, 'modulation-format': 'dp-qam16'},
301 **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
302 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
304 def test_15_check_interface_OTSI_GROUP_xpdra2(self):
305 response = test_utils.check_netconf_node_request(
306 "XPDR-A2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
307 self.assertEqual(response.status_code, requests.codes.ok)
308 res = response.json()
309 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
310 'administrative-state': 'inService',
311 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
312 'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
313 'type': 'org-openroadm-interfaces:otsi-group',
314 'supporting-port': 'L1'
316 input_dict_2 = {"group-id": 1,
317 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
319 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
321 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]
322 ['org-openroadm-otsi-group-interfaces:otsi-group']),
324 ['org-openroadm-otsi-group-interfaces:otsi-group'])
326 def test_16_check_interface_OTUC4_xpdra2(self):
327 response = test_utils.check_netconf_node_request(
328 "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC4")
329 self.assertEqual(response.status_code, requests.codes.ok)
330 res = response.json()
331 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSI-GROUP',
332 'administrative-state': 'inService',
333 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
334 'supporting-interface-list': 'XPDR2-NETWORK1-OTSI-GROUP',
335 'type': 'org-openroadm-interfaces:otnOtu',
336 'supporting-port': 'L1'
338 input_dict_2 = {'tx-sapi': 'LY9PxYJqUbw=',
339 'expected-dapi': 'LY9PxYJqUbw=',
340 'rate': 'org-openroadm-otn-common-types:OTUCn',
341 'degthr-percentage': 100,
345 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
347 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
349 ['org-openroadm-otn-otu-interfaces:otu'])
351 def test_17_check_interface_och_xpdrc2(self):
352 response = test_utils.check_netconf_node_request(
353 "XPDR-C2", "interface/XPDR2-NETWORK1-755:768")
354 self.assertEqual(response.status_code, requests.codes.ok)
355 res = response.json()
356 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
357 'administrative-state': 'inService',
358 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
359 'type': 'org-openroadm-interfaces:otsi',
360 'supporting-port': 'L1'
361 }, **res['interface'][0]),
364 self.assertDictEqual(
365 dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
366 'transmit-power': -5, 'modulation-format': 'dp-qam16'},
367 **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
368 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
370 def test_18_check_interface_OTSI_GROUP_xpdrc2(self):
371 response = test_utils.check_netconf_node_request(
372 "XPDR-C2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
373 self.assertEqual(response.status_code, requests.codes.ok)
374 res = response.json()
375 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSI-GROUP',
376 'administrative-state': 'inService',
377 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
378 'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
379 'type': 'org-openroadm-interfaces:otsi-group',
380 'supporting-port': 'L1'
382 input_dict_2 = {"group-id": 1,
383 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
385 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
387 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]
388 ['org-openroadm-otsi-group-interfaces:otsi-group']),
390 ['org-openroadm-otsi-group-interfaces:otsi-group'])
392 def test_19_check_interface_OTUC4_xpdrc2(self):
393 response = test_utils.check_netconf_node_request(
394 "XPDR-C2", "interface/XPDR2-NETWORK1-OTUC4")
395 self.assertEqual(response.status_code, requests.codes.ok)
396 res = response.json()
397 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
398 'administrative-state': 'inService',
399 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
400 'supporting-interface-list': 'XPDR2-NETWORK1-OTSI-GROUP',
401 'type': 'org-openroadm-interfaces:otnOtu',
402 'supporting-port': 'L1'
404 input_dict_2 = {'tx-dapi': 'LY9PxYJqUbw=',
405 'expected-sapi': 'LY9PxYJqUbw=',
406 'tx-sapi': 'Nmbu2MNHvc4=',
407 'expected-dapi': 'Nmbu2MNHvc4=',
408 'rate': 'org-openroadm-otn-common-types:OTUCn',
409 'degthr-percentage': 100,
414 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
417 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
419 ['org-openroadm-otn-otu-interfaces:otu'])
421 def test_20_check_no_interface_ODUC4_xpdra2(self):
422 response = test_utils.check_netconf_node_request(
423 "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
424 self.assertEqual(response.status_code, requests.codes.conflict)
425 res = response.json()
427 {"error-type": "application", "error-tag": "data-missing",
428 "error-message": "Request could not be completed because the relevant data model content does not exist"},
429 res['errors']['error'])
431 def test_21_check_openroadm_topo_xpdra2(self):
432 response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR2")
433 self.assertEqual(response.status_code, requests.codes.ok)
434 res = response.json()
435 ele = res['node'][0]['ietf-network-topology:termination-point'][0]
436 self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
437 self.assertEqual({'frequency': 196.08125,
439 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
442 def test_22_check_otn_topo_OTUC4_links(self):
443 response = test_utils.get_otn_topo_request()
444 self.assertEqual(response.status_code, requests.codes.ok)
445 res = response.json()
446 nb_links = len(res['network'][0]['ietf-network-topology:link'])
447 self.assertEqual(nb_links, 2)
448 listLinkId = ['OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
449 'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1']
450 for link in res['network'][0]['ietf-network-topology:link']:
451 self.assertIn(link['link-id'], listLinkId)
453 link['transportpce-topology:otn-link-type'], 'OTUC4')
455 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
457 link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
459 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
461 link['org-openroadm-common-network:opposite-link'], listLinkId)
463 # test service-create for ODU4 service from xpdra2 to xpdrc2
464 def test_23_create_ODUC4_service(self):
465 self.cr_serv_sample_data["input"]["service-name"] = "service1-ODUC4"
466 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
467 del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
468 self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
469 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
470 del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
471 self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
473 response = test_utils.service_create_request(self.cr_serv_sample_data)
474 self.assertEqual(response.status_code, requests.codes.ok)
475 res = response.json()
476 self.assertIn('PCE calculation in progress',
477 res['output']['configuration-response-common']['response-message'])
478 time.sleep(self.WAITING)
480 def test_24_get_ODUC4_service1(self):
481 response = test_utils.get_service_list_request(
482 "services/service1-ODUC4")
483 self.assertEqual(response.status_code, requests.codes.ok)
484 res = response.json()
486 res['services'][0]['administrative-state'], 'inService')
488 res['services'][0]['service-name'], 'service1-ODUC4')
490 res['services'][0]['connection-type'], 'infrastructure')
492 res['services'][0]['lifecycle-state'], 'planned')
494 res['services'][0]['service-layer'], 'wdm')
496 res['services'][0]['service-a-end']['service-rate'], 400)
498 res['services'][0]['service-a-end']['odu-service-rate'], 'org-openroadm-otn-common-types:ODUCn')
500 res['services'][0]['service-z-end']['odu-service-rate'], 'org-openroadm-otn-common-types:ODUCn')
503 def test_25_check_interface_ODUC4_xpdra2(self):
504 response = test_utils.check_netconf_node_request(
505 "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
506 self.assertEqual(response.status_code, requests.codes.ok)
507 res = response.json()
508 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
509 'administrative-state': 'inService',
510 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
511 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
512 'type': 'org-openroadm-interfaces:otnOdu',
513 'supporting-port': 'L1'}
514 # SAPI/DAPI are added in the Otu4 renderer
515 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
516 'rate': 'org-openroadm-otn-common-types:ODUCn',
517 'expected-dapi': 'Nmbu2MNHvc4=',
518 'expected-sapi': 'Nmbu2MNHvc4=',
519 'tx-dapi': 'Nmbu2MNHvc4=',
520 'tx-sapi': 'LY9PxYJqUbw='}
522 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
524 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
525 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
526 self.assertDictEqual(
527 {'payload-type': '22', 'exp-payload-type': '22'},
528 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
530 def test_26_check_interface_ODUC4_xpdrc2(self):
531 response = test_utils.check_netconf_node_request(
532 "XPDR-C2", "interface/XPDR2-NETWORK1-ODUC4")
533 self.assertEqual(response.status_code, requests.codes.ok)
534 res = response.json()
535 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
536 'administrative-state': 'inService',
537 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
538 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
539 'type': 'org-openroadm-interfaces:otnOdu',
540 'supporting-port': 'L1'}
541 # SAPI/DAPI are added in the Otu4 renderer
542 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
543 'rate': 'org-openroadm-otn-common-types:ODUCn',
544 'tx-sapi': 'Nmbu2MNHvc4=',
545 'tx-dapi': 'LY9PxYJqUbw=',
546 'expected-sapi': 'LY9PxYJqUbw=',
547 'expected-dapi': 'LY9PxYJqUbw='
549 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
551 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
552 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
553 self.assertDictEqual(
554 {'payload-type': '22', 'exp-payload-type': '22'},
555 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
557 def test_27_check_otn_topo_links(self):
558 response = test_utils.get_otn_topo_request()
559 self.assertEqual(response.status_code, requests.codes.ok)
560 res = response.json()
561 nb_links = len(res['network'][0]['ietf-network-topology:link'])
562 self.assertEqual(nb_links, 4)
563 for link in res['network'][0]['ietf-network-topology:link']:
564 linkId = link['link-id']
565 if (linkId in ('OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
566 'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
568 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
570 link['org-openroadm-otn-network-topology:used-bandwidth'], 400000)
571 elif (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
572 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
574 link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
576 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
578 link['transportpce-topology:otn-link-type'], 'ODUC4')
580 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
581 self.assertIn(link['org-openroadm-common-network:opposite-link'],
582 ['ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
583 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1'])
585 self.fail("this link should not exist")
587 def test_28_check_otn_topo_tp(self):
588 response = test_utils.get_otn_topo_request()
589 res = response.json()
590 for node in res['network'][0]['node']:
591 if node['node-id'] == 'XPDR-A2-XPDR2' or 'XPDR-C2-XPDR2':
592 tpList = node['ietf-network-topology:termination-point']
594 if tp['tp-id'] == 'XPDR2-NETWORK1':
595 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
596 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
598 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
599 self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
600 'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
602 # test service-create for 100GE service 1 from xpdra2 to xpdrc2
603 def test_29_create_100GE_service_1(self):
604 self.cr_serv_sample_data["input"]["service-name"] = "service-100GE"
605 self.cr_serv_sample_data["input"]["connection-type"] = "service"
606 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
607 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
608 del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
609 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
610 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
611 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
612 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
613 del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
614 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
615 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
616 response = test_utils.service_create_request(self.cr_serv_sample_data)
617 self.assertEqual(response.status_code, requests.codes.ok)
618 res = response.json()
619 self.assertIn('PCE calculation in progress',
620 res['output']['configuration-response-common']['response-message'])
621 time.sleep(self.WAITING)
623 def test_30_get_100GE_service_1(self):
624 response = test_utils.get_service_list_request(
625 "services/service-100GE")
626 self.assertEqual(response.status_code, requests.codes.ok)
627 res = response.json()
629 res['services'][0]['administrative-state'], 'inService')
631 res['services'][0]['service-name'], 'service-100GE')
633 res['services'][0]['connection-type'], 'service')
635 res['services'][0]['lifecycle-state'], 'planned')
638 def test_31_check_interface_100GE_CLIENT_xpdra2(self):
639 response = test_utils.check_netconf_node_request(
640 "XPDR-A2", "interface/XPDR2-CLIENT1-ETHERNET100G")
641 self.assertEqual(response.status_code, requests.codes.ok)
642 res = response.json()
643 input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET100G',
644 'administrative-state': 'inService',
645 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
646 'type': 'org-openroadm-interfaces:ethernetCsmacd',
647 'supporting-port': 'C1'
649 input_dict_2 = {'speed': 100000}
650 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
652 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
653 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
655 def test_32_check_interface_ODU4_CLIENT_xpdra2(self):
656 response = test_utils.check_netconf_node_request(
657 "XPDR-A2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
658 self.assertEqual(response.status_code, requests.codes.ok)
659 res = response.json()
660 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service-100GE',
661 'administrative-state': 'inService',
662 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
663 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET100G',
664 'type': 'org-openroadm-interfaces:otnOdu',
665 'supporting-port': 'C1'}
667 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
668 'rate': 'org-openroadm-otn-common-types:ODU4',
669 'monitoring-mode': 'terminated'}
671 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
673 self.assertDictEqual(dict(input_dict_2,
674 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
675 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
676 self.assertDictEqual(
677 {'payload-type': '07', 'exp-payload-type': '07'},
678 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
680 def test_33_check_interface_ODU4_NETWORK_xpdra2(self):
681 response = test_utils.check_netconf_node_request(
682 "XPDR-A2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
683 self.assertEqual(response.status_code, requests.codes.ok)
684 res = response.json()
685 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service-100GE',
686 'administrative-state': 'inService',
687 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
688 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
689 'type': 'org-openroadm-interfaces:otnOdu',
690 'supporting-port': 'L1'}
692 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
693 'rate': 'org-openroadm-otn-common-types:ODU4',
694 'monitoring-mode': 'not-terminated'}
695 input_dict_3 = {'trib-port-number': 1}
697 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
699 self.assertDictEqual(dict(input_dict_2,
700 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
701 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
702 self.assertDictEqual(dict(input_dict_3,
703 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
704 'parent-odu-allocation']),
705 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
706 self.assertIn('1.1', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
707 ['opucn-trib-slots'])
708 self.assertIn('1.20', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
709 ['opucn-trib-slots'])
711 def test_34_check_ODU4_connection_xpdra2(self):
712 response = test_utils.check_netconf_node_request(
714 "odu-connection/XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE")
715 self.assertEqual(response.status_code, requests.codes.ok)
716 res = response.json()
719 'XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE',
720 'direction': 'bidirectional'
723 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
724 res['odu-connection'][0])
725 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service-100GE'},
726 res['odu-connection'][0]['destination'])
727 self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service-100GE'},
728 res['odu-connection'][0]['source'])
730 def test_35_check_interface_100GE_CLIENT_xpdrc2(self):
731 response = test_utils.check_netconf_node_request(
732 "XPDR-C2", "interface/XPDR2-CLIENT1-ETHERNET100G")
733 self.assertEqual(response.status_code, requests.codes.ok)
734 res = response.json()
735 input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET100G',
736 'administrative-state': 'inService',
737 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
738 'type': 'org-openroadm-interfaces:ethernetCsmacd',
739 'supporting-port': 'C1'
741 input_dict_2 = {'speed': 100000}
742 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
744 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
745 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
747 def test_36_check_interface_ODU4_CLIENT_xpdrc2(self):
748 response = test_utils.check_netconf_node_request(
749 "XPDR-C2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
750 self.assertEqual(response.status_code, requests.codes.ok)
751 res = response.json()
752 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service-100GE',
753 'administrative-state': 'inService',
754 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
755 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET100G',
756 'type': 'org-openroadm-interfaces:otnOdu',
757 'supporting-port': 'C1'}
759 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
760 'rate': 'org-openroadm-otn-common-types:ODU4',
761 'monitoring-mode': 'terminated'}
763 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
765 self.assertDictEqual(dict(input_dict_2,
766 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
767 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
768 self.assertDictEqual(
769 {'payload-type': '07', 'exp-payload-type': '07'},
770 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
772 def test_37_check_interface_ODU4_NETWORK_xpdrc2(self):
773 response = test_utils.check_netconf_node_request(
774 "XPDR-C2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
775 self.assertEqual(response.status_code, requests.codes.ok)
776 res = response.json()
777 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service-100GE',
778 'administrative-state': 'inService',
779 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
780 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
781 'type': 'org-openroadm-interfaces:otnOdu',
782 'supporting-port': 'C1'}
784 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
785 'rate': 'org-openroadm-otn-common-types:ODU4',
786 'monitoring-mode': 'not-terminated'}
788 input_dict_3 = {'trib-port-number': 1}
790 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
792 self.assertDictEqual(dict(input_dict_2,
793 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
794 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
795 self.assertDictEqual(dict(input_dict_3,
796 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
797 'parent-odu-allocation']),
798 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
799 'parent-odu-allocation'])
802 'org-openroadm-otn-odu-interfaces:odu'][
803 'parent-odu-allocation']['opucn-trib-slots'])
804 self.assertIn('1.20',
806 'org-openroadm-otn-odu-interfaces:odu'][
807 'parent-odu-allocation']['opucn-trib-slots'])
809 def test_38_check_ODU4_connection_xpdrc2(self):
810 response = test_utils.check_netconf_node_request(
812 "odu-connection/XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE")
813 self.assertEqual(response.status_code, requests.codes.ok)
814 res = response.json()
817 'XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE',
818 'direction': 'bidirectional'
821 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
822 res['odu-connection'][0])
823 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service-100GE'},
824 res['odu-connection'][0]['destination'])
825 self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service-100GE'},
826 res['odu-connection'][0]['source'])
828 def test_39_check_otn_topo_links(self):
829 response = test_utils.get_otn_topo_request()
830 self.assertEqual(response.status_code, requests.codes.ok)
831 res = response.json()
832 nb_links = len(res['network'][0]['ietf-network-topology:link'])
833 self.assertEqual(nb_links, 4)
834 for link in res['network'][0]['ietf-network-topology:link']:
835 linkId = link['link-id']
836 if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
837 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
839 link['org-openroadm-otn-network-topology:available-bandwidth'], 300000)
841 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
843 def test_40_check_otn_topo_tp(self):
844 response = test_utils.get_otn_topo_request()
845 res = response.json()
846 for node in res['network'][0]['node']:
847 if node['node-id'] == 'XPDR-A2-XPDR2' or 'XPDR-C2-XPDR2':
848 tpList = node['ietf-network-topology:termination-point']
850 if tp['tp-id'] == 'XPDR2-NETWORK1':
851 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
852 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 60)
853 tsPoolList = list(range(1, 20))
855 tsPoolList, xpdrTpPortConAt['ts-pool'])
857 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 3)
859 1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
861 # test service-create for 100GE service 2 from xpdra2 to xpdrc2
862 def test_41_create_100GE_service_2(self):
863 self.cr_serv_sample_data["input"]["service-name"] = "service-100GE2"
864 self.cr_serv_sample_data["input"]["connection-type"] = "service"
865 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
866 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
867 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
868 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
869 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
870 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
871 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
872 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT2"
873 response = test_utils.service_create_request(self.cr_serv_sample_data)
874 self.assertEqual(response.status_code, requests.codes.ok)
875 res = response.json()
876 self.assertIn('PCE calculation in progress',
877 res['output']['configuration-response-common']['response-message'])
878 time.sleep(self.WAITING)
880 def test_42_get_100GE_service_2(self):
881 response = test_utils.get_service_list_request(
882 "services/service-100GE2")
883 self.assertEqual(response.status_code, requests.codes.ok)
884 res = response.json()
886 res['services'][0]['administrative-state'], 'inService')
888 res['services'][0]['service-name'], 'service-100GE2')
890 res['services'][0]['connection-type'], 'service')
892 res['services'][0]['lifecycle-state'], 'planned')
895 def test_43_check_service_list(self):
896 response = test_utils.get_service_list_request("")
897 self.assertEqual(response.status_code, requests.codes.ok)
898 res = response.json()
899 self.assertEqual(len(res['service-list']['services']), 4)
902 def test_44_check_otn_topo_links(self):
903 response = test_utils.get_otn_topo_request()
904 self.assertEqual(response.status_code, requests.codes.ok)
905 res = response.json()
906 nb_links = len(res['network'][0]['ietf-network-topology:link'])
907 self.assertEqual(nb_links, 4)
908 for link in res['network'][0]['ietf-network-topology:link']:
909 linkId = link['link-id']
910 if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
911 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
913 link['org-openroadm-otn-network-topology:available-bandwidth'], 200000)
915 link['org-openroadm-otn-network-topology:used-bandwidth'], 200000)
917 def test_45_check_otn_topo_tp(self):
918 response = test_utils.get_otn_topo_request()
919 res = response.json()
920 for node in res['network'][0]['node']:
921 if node['node-id'] == 'XPDR-A2-XPDR2' or 'XPDR-C2-XPDR2':
922 tpList = node['ietf-network-topology:termination-point']
924 if tp['tp-id'] == 'XPDR2-NETWORK1':
925 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
926 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 40)
927 tsPoolList = list(range(1, 40))
929 tsPoolList, xpdrTpPortConAt['ts-pool'])
931 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 2)
933 2, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
935 def test_46_delete_100GE_service_2(self):
936 response = test_utils.service_delete_request("service-100GE2")
937 self.assertEqual(response.status_code, requests.codes.ok)
938 res = response.json()
939 self.assertIn('Renderer service delete in progress',
940 res['output']['configuration-response-common']['response-message'])
941 time.sleep(self.WAITING)
943 def test_47_delete_100GE_service_1(self):
944 response = test_utils.service_delete_request("service-100GE")
945 self.assertEqual(response.status_code, requests.codes.ok)
946 res = response.json()
947 self.assertIn('Renderer service delete in progress',
948 res['output']['configuration-response-common']['response-message'])
949 time.sleep(self.WAITING)
951 def test_48_check_service_list(self):
952 response = test_utils.get_service_list_request("")
953 self.assertEqual(response.status_code, requests.codes.ok)
954 res = response.json()
955 self.assertEqual(len(res['service-list']['services']), 2)
958 def test_49_check_no_ODU4_connection_xpdra2(self):
959 response = test_utils.check_netconf_node_request("XPDR-A2", "")
960 self.assertEqual(response.status_code, requests.codes.ok)
961 res = response.json()
962 self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
965 def test_50_check_no_interface_ODU4_NETWORK_xpdra2(self):
966 response = test_utils.check_netconf_node_request(
967 "XPDR-A2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
968 self.assertEqual(response.status_code, requests.codes.conflict)
970 def test_51_check_no_interface_ODU4_CLIENT_xpdra2(self):
971 response = test_utils.check_netconf_node_request(
972 "XPDR-A2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
973 self.assertEqual(response.status_code, requests.codes.conflict)
975 def test_52_check_no_interface_100GE_CLIENT_xpdra2(self):
976 response = test_utils.check_netconf_node_request(
977 "XPDR-A2", "interface/XPDR2-CLIENT1-ETHERNET100G")
978 self.assertEqual(response.status_code, requests.codes.conflict)
980 def test_53_check_otn_topo_links(self):
981 response = test_utils.get_otn_topo_request()
982 self.assertEqual(response.status_code, requests.codes.ok)
983 res = response.json()
984 nb_links = len(res['network'][0]['ietf-network-topology:link'])
985 self.assertEqual(nb_links, 4)
986 for link in res['network'][0]['ietf-network-topology:link']:
987 linkId = link['link-id']
988 if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
989 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
991 link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
993 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
995 def test_54_check_otn_topo_tp(self):
996 response = test_utils.get_otn_topo_request()
997 res = response.json()
998 for node in res['network'][0]['node']:
999 if (node['node-id'] == 'XPDR-A2-XPDR2' or 'XPDR-C2-XPDR2'):
1000 tpList = node['ietf-network-topology:termination-point']
1002 if tp['tp-id'] == 'XPDR2-NETWORK1':
1003 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1004 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
1006 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
1008 def test_55_delete_ODUC4_service(self):
1009 response = test_utils.service_delete_request("service1-ODUC4")
1010 self.assertEqual(response.status_code, requests.codes.ok)
1011 res = response.json()
1012 self.assertIn('Renderer service delete in progress',
1013 res['output']['configuration-response-common']['response-message'])
1014 time.sleep(self.WAITING)
1016 def test_56_check_service_list(self):
1017 response = test_utils.get_service_list_request("")
1018 self.assertEqual(response.status_code, requests.codes.ok)
1019 res = response.json()
1020 self.assertEqual(len(res['service-list']['services']), 1)
1023 def test_57_check_no_interface_ODU4_xpdra2(self):
1024 response = test_utils.check_netconf_node_request(
1025 "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
1026 self.assertEqual(response.status_code, requests.codes.conflict)
1028 def test_58_check_otn_topo_links(self):
1029 self.test_22_check_otn_topo_OTUC4_links()
1031 def test_59_check_otn_topo_tp(self):
1032 response = test_utils.get_otn_topo_request()
1033 res = response.json()
1034 for node in res['network'][0]['node']:
1035 if node['node-id'] == 'XPDR-A2-XPDR2' or 'XPDR-C2-XPDR2':
1036 tpList = node['ietf-network-topology:termination-point']
1038 if tp['tp-id'] == 'XPDR2-NETWORK1':
1039 self.assertNotIn('org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes',
1042 def test_60_delete_OTUC4_service(self):
1043 response = test_utils.service_delete_request("service1-OTUC4")
1044 self.assertEqual(response.status_code, requests.codes.ok)
1045 res = response.json()
1046 self.assertIn('Renderer service delete in progress',
1047 res['output']['configuration-response-common']['response-message'])
1048 time.sleep(self.WAITING)
1050 def test_61_get_no_service(self):
1051 response = test_utils.get_service_list_request("")
1052 self.assertEqual(response.status_code, requests.codes.conflict)
1053 res = response.json()
1055 {"error-type": "application", "error-tag": "data-missing",
1056 "error-message": "Request could not be completed because the relevant data model content does not exist"},
1057 res['errors']['error'])
1060 def test_62_check_no_interface_OTUC4_xpdra2(self):
1061 response = test_utils.check_netconf_node_request(
1062 "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC4")
1063 self.assertEqual(response.status_code, requests.codes.conflict)
1065 def test_63_check_no_interface_OCH_xpdra2(self):
1066 response = test_utils.check_netconf_node_request(
1067 "XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
1068 self.assertEqual(response.status_code, requests.codes.conflict)
1070 def test_64_check_no_interface_OTSI_xpdra2(self):
1071 response = test_utils.check_netconf_node_request(
1072 "XPDR-A2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
1073 self.assertEqual(response.status_code, requests.codes.conflict)
1075 def test_65_getLinks_OtnTopology(self):
1076 response = test_utils.get_otn_topo_request()
1077 self.assertEqual(response.status_code, requests.codes.ok)
1078 res = response.json()
1079 self.assertNotIn('ietf-network-topology:link', res['network'][0])
1081 def test_66_check_openroadm_topo_xpdra2(self):
1082 response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR2")
1083 self.assertEqual(response.status_code, requests.codes.ok)
1084 res = response.json()
1085 tp = res['node'][0]['ietf-network-topology:termination-point'][0]
1086 self.assertEqual('XPDR2-NETWORK1', tp['tp-id'])
1087 self.assertNotIn('wavelength', dict.keys(
1088 tp['org-openroadm-network-topology:xpdr-network-attributes']))
1091 def test_67_check_openroadm_topology(self):
1092 response = test_utils.get_ordm_topo_request("")
1093 self.assertEqual(response.status_code, requests.codes.ok)
1094 res = response.json()
1095 links = res['network'][0]['ietf-network-topology:link']
1096 self.assertEqual(22, len(links), 'Topology should contain 22 links')
1098 def test_68_connect_xprda2_2_N1_to_roadma_PP2(self):
1099 response = test_utils.connect_xpdr_to_rdm_request("XPDR-A2", "1", "1",
1100 "ROADM-A1", "1", "SRG1-PP1-TXRX")
1101 self.assertEqual(response.status_code, requests.codes.ok)
1102 res = response.json()
1103 self.assertIn('Xponder Roadm Link created successfully',
1104 res["output"]["result"])
1107 def test_69_connect_roadma_PP2_to_xpdra2_2_N1(self):
1108 response = test_utils.connect_rdm_to_xpdr_request("XPDR-A2", "1", "1",
1109 "ROADM-A1", "1", "SRG1-PP1-TXRX")
1110 self.assertEqual(response.status_code, requests.codes.ok)
1111 res = response.json()
1112 self.assertIn('Roadm Xponder links created successfully',
1113 res["output"]["result"])
1116 def test_70_connect_xprdc2_2_N1_to_roadmc_PP2(self):
1117 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C2", "1", "1",
1118 "ROADM-C1", "1", "SRG1-PP1-TXRX")
1119 self.assertEqual(response.status_code, requests.codes.ok)
1120 res = response.json()
1121 self.assertIn('Xponder Roadm Link created successfully',
1122 res["output"]["result"])
1125 def test_71_connect_roadmc_PP2_to_xpdrc2_2_N1(self):
1126 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C2", "1", "1",
1127 "ROADM-C1", "1", "SRG1-PP1-TXRX")
1128 self.assertEqual(response.status_code, requests.codes.ok)
1129 res = response.json()
1130 self.assertIn('Roadm Xponder links created successfully',
1131 res["output"]["result"])
1134 # test service-create for 400GE service from xpdra2 to xpdrc2
1135 def test_72_create_400GE_service(self):
1136 self.cr_serv_sample_data["input"]["service-name"] = "service-400GE"
1137 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "400"
1138 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
1139 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
1140 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "400"
1141 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
1142 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
1143 response = test_utils.service_create_request(self.cr_serv_sample_data)
1144 self.assertEqual(response.status_code, requests.codes.ok)
1145 res = response.json()
1146 self.assertIn('PCE calculation in progress',
1147 res['output']['configuration-response-common']['response-message'])
1148 time.sleep(self.WAITING)
1150 def test_73_get_400GE_service(self):
1151 response = test_utils.get_service_list_request(
1152 "services/service-400GE")
1153 self.assertEqual(response.status_code, requests.codes.ok)
1154 res = response.json()
1156 res['services'][0]['administrative-state'], 'inService')
1158 res['services'][0]['service-name'], 'service-400GE')
1160 res['services'][0]['connection-type'], 'service')
1162 res['services'][0]['lifecycle-state'], 'planned')
1165 def test_74_check_xc1_roadma(self):
1166 response = test_utils.check_netconf_node_request(
1167 "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-755:768")
1168 self.assertEqual(response.status_code, requests.codes.ok)
1169 res = response.json()
1170 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1171 self.assertDictEqual(
1173 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
1174 'opticalControlMode': 'gainLoss',
1175 'target-output-power': -3.0
1176 }, **res['roadm-connections'][0]),
1177 res['roadm-connections'][0]
1179 self.assertDictEqual(
1180 {'src-if': 'SRG1-PP1-TXRX-nmc-755:768'},
1181 res['roadm-connections'][0]['source'])
1182 self.assertDictEqual(
1183 {'dst-if': 'DEG2-TTP-TXRX-nmc-755:768'},
1184 res['roadm-connections'][0]['destination'])
1187 def test_75_check_topo_xpdra2(self):
1188 response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR1")
1189 self.assertEqual(response.status_code, requests.codes.ok)
1190 res = response.json()
1191 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1192 for ele in liste_tp:
1193 if ele['tp-id'] == 'XPDR1-NETWORK1':
1194 self.assertEqual({'frequency': 196.08125,
1196 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
1197 if ele['tp-id'] == 'XPDR1-CLIENT1':
1198 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
1201 def test_76_check_topo_roadma_SRG1(self):
1202 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
1203 self.assertEqual(response.status_code, requests.codes.ok)
1204 res = response.json()
1205 freq_map = base64.b64decode(
1206 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
1207 freq_map_array = [int(x) for x in freq_map]
1208 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1209 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1210 for ele in liste_tp:
1211 if ele['tp-id'] == 'SRG1-PP1-TXRX':
1212 freq_map = base64.b64decode(
1213 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1214 freq_map_array = [int(x) for x in freq_map]
1215 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1216 if ele['tp-id'] == 'SRG1-PP2-TXRX':
1217 self.assertNotIn('avail-freq-maps', dict.keys(ele))
1220 def test_77_check_topo_roadma_DEG1(self):
1221 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
1222 self.assertEqual(response.status_code, requests.codes.ok)
1223 res = response.json()
1224 freq_map = base64.b64decode(
1225 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1226 freq_map_array = [int(x) for x in freq_map]
1227 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1228 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1229 for ele in liste_tp:
1230 if ele['tp-id'] == 'DEG2-CTP-TXRX':
1231 freq_map = base64.b64decode(
1232 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1233 freq_map_array = [int(x) for x in freq_map]
1234 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1235 if ele['tp-id'] == 'DEG2-TTP-TXRX':
1236 freq_map = base64.b64decode(
1237 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1238 freq_map_array = [int(x) for x in freq_map]
1239 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1242 def test_78_check_interface_100GE_CLIENT_xpdra2(self):
1243 response = test_utils.check_netconf_node_request(
1244 "XPDR-A2", "interface/XPDR1-CLIENT1-ETHERNET")
1245 self.assertEqual(response.status_code, requests.codes.ok)
1246 res = response.json()
1247 input_dict_1 = {'name': 'XPDR1-CLIENT1-ETHERNET',
1248 'administrative-state': 'inService',
1249 'supporting-circuit-pack-name': '1/1/1-PLUG-CLIENT',
1250 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1251 'supporting-port': 'C1'
1253 input_dict_2 = {'speed': 400000}
1254 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1255 res['interface'][0])
1256 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
1257 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1259 def test_79_check_interface_OTSI_xpdra2(self):
1260 response = test_utils.check_netconf_node_request(
1261 "XPDR-A2", "interface/XPDR1-NETWORK1-755:768")
1262 self.assertEqual(response.status_code, requests.codes.ok)
1263 res = response.json()
1264 input_dict_1 = {'name': 'XPDR1-NETWORK1-755:768',
1265 'administrative-state': 'inService',
1266 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1267 'type': 'org-openroadm-interfaces:otsi',
1268 'supporting-port': 'L1'}
1270 "frequency": 196.0812,
1271 "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
1272 "fec": "org-openroadm-common-types:ofec",
1273 "transmit-power": -5,
1274 "provision-mode": "explicit",
1275 "modulation-format": "dp-qam16"}
1277 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1278 res['interface'][0])
1279 self.assertDictEqual(dict(input_dict_2,
1280 **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
1281 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
1282 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
1283 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
1285 def test_80_check_interface_OTSI_GROUP_xpdra2(self):
1286 response = test_utils.check_netconf_node_request(
1287 "XPDR-A2", "interface/XPDR1-NETWORK1-OTSI-GROUP")
1288 self.assertEqual(response.status_code, requests.codes.ok)
1289 res = response.json()
1290 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
1291 'administrative-state': 'inService',
1292 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1293 ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-755:768',
1294 'type': 'org-openroadm-interfaces:otsi-group',
1295 'supporting-port': 'L1'}
1296 input_dict_2 = {"group-id": 1,
1297 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}
1299 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1300 res['interface'][0])
1301 self.assertDictEqual(dict(input_dict_2,
1302 **res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
1303 res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
1305 def test_81_check_interface_OTUC4_xpdra2(self):
1306 response = test_utils.check_netconf_node_request(
1307 "XPDR-A2", "interface/XPDR1-NETWORK1-OTUC4")
1308 self.assertEqual(response.status_code, requests.codes.ok)
1309 res = response.json()
1310 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
1311 'administrative-state': 'inService',
1312 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1313 ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTSI-GROUP',
1314 'type': 'org-openroadm-interfaces:otnOtu',
1315 'supporting-port': 'L1'}
1316 input_dict_2 = {"tx-sapi": "AIGiVAQ4gDil", "rate": "org-openroadm-otn-common-types:OTUCn",
1317 "degthr-percentage": 100,
1318 "tim-detect-mode": "Disabled",
1320 "degm-intervals": 2,
1321 "expected-dapi": "AIGiVAQ4gDil"}
1323 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1324 res['interface'][0])
1325 self.assertDictEqual(dict(input_dict_2,
1326 **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
1327 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
1329 def test_82_check_interface_ODUC4_xpdra2(self):
1330 response = test_utils.check_netconf_node_request(
1331 "XPDR-A2", "interface/XPDR1-NETWORK1-ODUC4")
1332 self.assertEqual(response.status_code, requests.codes.ok)
1333 res = response.json()
1334 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODUC4',
1335 'administrative-state': 'inService',
1336 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1337 ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTSI-GROUP',
1338 'type': 'org-openroadm-interfaces:otnOdu',
1339 'supporting-port': 'L1'}
1340 input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP",
1341 "tim-detect-mode": "Disabled",
1342 "degm-intervals": 2,
1343 "degthr-percentage": 100,
1344 "monitoring-mode": "terminated",
1345 "rate": "org-openroadm-otn-common-types:ODUCn",
1347 input_dict_3 = {"exp-payload-type": "22", "payload-type": "22"}
1349 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1350 res['interface'][0])
1351 self.assertDictEqual(dict(input_dict_2,
1352 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1353 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1354 self.assertDictEqual(dict(input_dict_3,
1355 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']),
1356 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1358 def test_83_delete_400GE_service(self):
1359 response = test_utils.service_delete_request("service-400GE")
1360 self.assertEqual(response.status_code, requests.codes.ok)
1361 res = response.json()
1362 self.assertIn('Renderer service delete in progress',
1363 res['output']['configuration-response-common']['response-message'])
1364 time.sleep(self.WAITING)
1366 def test_84_get_no_service(self):
1367 response = test_utils.get_service_list_request("")
1368 self.assertEqual(response.status_code, requests.codes.conflict)
1369 res = response.json()
1371 {"error-type": "application", "error-tag": "data-missing",
1372 "error-message": "Request could not be completed because the relevant data model content does not exist"},
1373 res['errors']['error'])
1376 def test_85_check_no_interface_ODUC4_xpdra2(self):
1377 response = test_utils.check_netconf_node_request(
1378 "XPDR-A2", "interface/XPDR1-NETWORK1-ODUC4")
1379 self.assertEqual(response.status_code, requests.codes.conflict)
1381 def test_86_check_no_interface_OTUC4_xpdra2(self):
1382 response = test_utils.check_netconf_node_request(
1383 "XPDR-A2", "interface/XPDR1-NETWORK1-OTUC4")
1384 self.assertEqual(response.status_code, requests.codes.conflict)
1386 def test_87_check_no_interface_OTSI_GROUP_xpdra2(self):
1387 response = test_utils.check_netconf_node_request(
1388 "XPDR-A2", "interface/XPDR1-NETWORK1-OTSI-GROUP")
1389 self.assertEqual(response.status_code, requests.codes.conflict)
1391 def test_88_check_no_interface_OTSI_xpdra2(self):
1392 response = test_utils.check_netconf_node_request(
1393 "XPDR-A2", "interface/XPDR1-NETWORK1-755:768")
1394 self.assertEqual(response.status_code, requests.codes.conflict)
1396 def test_89_check_no_interface_400GE_CLIENT_xpdra2(self):
1397 response = test_utils.check_netconf_node_request(
1398 "XPDR-A2", "interface/XPDR1-CLIENT1-ETHERNET")
1399 self.assertEqual(response.status_code, requests.codes.conflict)
1401 def test_90_disconnect_xponders_from_roadm(self):
1402 url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1403 response = test_utils.get_ordm_topo_request("")
1404 self.assertEqual(response.status_code, requests.codes.ok)
1405 res = response.json()
1406 links = res['network'][0]['ietf-network-topology:link']
1408 if (link["org-openroadm-common-network:link-type"] == "XPONDER-OUTPUT" or
1409 link["org-openroadm-common-network:link-type"] == "XPONDER-INPUT"):
1410 link_name = link["link-id"]
1411 response = test_utils.delete_request(url+link_name)
1412 self.assertEqual(response.status_code, requests.codes.ok)
1414 def test_91_disconnect_xpdra2(self):
1415 response = test_utils.unmount_device("XPDR-A2")
1416 self.assertEqual(response.status_code, requests.codes.ok,
1417 test_utils.CODE_SHOULD_BE_200)
1419 def test_92_disconnect_xpdrc2(self):
1420 response = test_utils.unmount_device("XPDR-C2")
1421 self.assertEqual(response.status_code, requests.codes.ok,
1422 test_utils.CODE_SHOULD_BE_200)
1424 def test_93_disconnect_roadmA(self):
1425 response = test_utils.unmount_device("ROADM-A1")
1426 self.assertEqual(response.status_code, requests.codes.ok,
1427 test_utils.CODE_SHOULD_BE_200)
1429 def test_94_disconnect_roadmC(self):
1430 response = test_utils.unmount_device("ROADM-C1")
1431 self.assertEqual(response.status_code, requests.codes.ok,
1432 test_utils.CODE_SHOULD_BE_200)
1435 if __name__ == "__main__":
1436 unittest.main(verbosity=2)