##############################################################################
# Copyright (c) 2021 Orange, Inc. and others.  All rights reserved.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################

# pylint: disable=invalid-name
# pylint: disable=no-member
# pylint: disable=too-many-public-methods
# pylint: disable=too-many-lines

# pylint: disable=wrong-import-order
import sys
import time
import unittest

import requests

sys.path.append('transportpce_tests/common/')
# pylint: disable=wrong-import-position
# pylint: disable=import-error
import test_utils  # nopep8


class TransportPCEtesting(unittest.TestCase):
    # Ordered end-to-end scenario: mount the devices, wire the xponders to
    # the ROADMs, then create OTUC4, ODUC4 and 100GE services and inspect
    # the interfaces and topology after each step.

    # Handles of the processes started in setUpClass and stopped in
    # tearDownClass.
    # NOTE(review): this attribute is not visible in the extract; it is
    # declared here so the class-level fixtures always find it - confirm.
    processes = None

    # Seconds slept after each service-create request.
    WAITING = 20  # nominal value is 300
    # Device versions handed to the simulators / mount helper.
    NODE_VERSION_221 = '2.2.1'
    NODE_VERSION_71 = '7.1'

36 cr_serv_sample_data = {"input": {
37 "sdnc-request-header": {
38 "request-id": "request-1",
39 "rpc-action": "service-create",
40 "request-system-id": "appname"
42 "service-name": "service1-OTUC4",
43 "common-id": "commonId",
44 "connection-type": "infrastructure",
46 "service-rate": "400",
48 "service-format": "OTU",
49 "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
53 "committed-info-rate": "100000",
54 "committed-burst-size": "64"
59 "port-device-name": "XPDR-A2-XPDR2",
61 "port-name": "XPDR2-NETWORK1",
62 "port-rack": "000000.00",
63 "port-shelf": "Chassis#1"
66 "lgx-device-name": "Some lgx-device-name",
67 "lgx-port-name": "Some lgx-port-name",
68 "lgx-port-rack": "000000.00",
69 "lgx-port-shelf": "00"
74 "port-device-name": "XPDR-A2-XPDR2",
76 "port-name": "XPDR2-NETWORK1",
77 "port-rack": "000000.00",
78 "port-shelf": "Chassis#1"
81 "lgx-device-name": "Some lgx-device-name",
82 "lgx-port-name": "Some lgx-port-name",
83 "lgx-port-rack": "000000.00",
84 "lgx-port-shelf": "00"
90 "service-rate": "400",
92 "service-format": "OTU",
93 "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
97 "committed-info-rate": "100000",
98 "committed-burst-size": "64"
103 "port-device-name": "XPDR-C2-XPDR2",
104 "port-type": "fixed",
105 "port-name": "XPDR2-NETWORK1",
106 "port-rack": "000000.00",
107 "port-shelf": "Chassis#1"
110 "lgx-device-name": "Some lgx-device-name",
111 "lgx-port-name": "Some lgx-port-name",
112 "lgx-port-rack": "000000.00",
113 "lgx-port-shelf": "00"
118 "port-device-name": "XPDR-C2-XPDR2",
119 "port-type": "fixed",
120 "port-name": "XPDR2-NETWORK1",
121 "port-rack": "000000.00",
122 "port-shelf": "Chassis#1"
125 "lgx-device-name": "Some lgx-device-name",
126 "lgx-port-name": "Some lgx-port-name",
127 "lgx-port-rack": "000000.00",
128 "lgx-port-shelf": "00"
133 "due-date": "2018-06-15T00:00:01Z",
134 "operator-contact": "pw1234"
140 cls.processes = test_utils.start_tpce()
141 cls.processes = test_utils.start_sims([('xpdra2', cls.NODE_VERSION_71),
142 ('roadma', cls.NODE_VERSION_221),
143 ('roadmc', cls.NODE_VERSION_221),
144 ('xpdrc2', cls.NODE_VERSION_71)])
147 def tearDownClass(cls):
148 # pylint: disable=not-an-iterable
149 for process in cls.processes:
150 test_utils.shutdown_process(process)
151 print("all processes killed")
156 def test_01_connect_xpdra2(self):
157 response = test_utils.mount_device("XPDR-A2", ('xpdra2', self.NODE_VERSION_71))
158 self.assertEqual(response.status_code,
159 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
161 def test_02_connect_xpdrc2(self):
162 response = test_utils.mount_device("XPDR-C2", ('xpdrc2', self.NODE_VERSION_71))
163 self.assertEqual(response.status_code,
164 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
166 def test_03_connect_rdma(self):
167 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
168 self.assertEqual(response.status_code,
169 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
171 def test_04_connect_rdmc(self):
172 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
173 self.assertEqual(response.status_code,
174 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
176 def test_05_connect_xprda2_2_N1_to_roadma_PP2(self):
177 response = test_utils.connect_xpdr_to_rdm_request("XPDR-A2", "2", "1",
178 "ROADM-A1", "1", "SRG1-PP2-TXRX")
179 self.assertEqual(response.status_code, requests.codes.ok)
180 res = response.json()
181 self.assertIn('Xponder Roadm Link created successfully',
182 res["output"]["result"])
185 def test_06_connect_roadma_PP2_to_xpdra2_2_N1(self):
186 response = test_utils.connect_rdm_to_xpdr_request("XPDR-A2", "2", "1",
187 "ROADM-A1", "1", "SRG1-PP2-TXRX")
188 self.assertEqual(response.status_code, requests.codes.ok)
189 res = response.json()
190 self.assertIn('Roadm Xponder links created successfully',
191 res["output"]["result"])
194 def test_07_connect_xprdc2_2_N1_to_roadmc_PP2(self):
195 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C2", "2", "1",
196 "ROADM-C1", "1", "SRG1-PP2-TXRX")
197 self.assertEqual(response.status_code, requests.codes.ok)
198 res = response.json()
199 self.assertIn('Xponder Roadm Link created successfully',
200 res["output"]["result"])
203 def test_08_connect_roadmc_PP2_to_xpdrc2_2_N1(self):
204 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C2", "2", "1",
205 "ROADM-C1", "1", "SRG1-PP2-TXRX")
206 self.assertEqual(response.status_code, requests.codes.ok)
207 res = response.json()
208 self.assertIn('Roadm Xponder links created successfully',
209 res["output"]["result"])
212 def test_09_add_omsAttributes_roadma_roadmc(self):
213 # Config ROADMA-ROADMC oms-attributes
215 "auto-spanloss": "true",
216 "spanloss-base": 11.4,
217 "spanloss-current": 12,
218 "engineered-spanloss": 12.2,
219 "link-concatenation": [{
222 "SRLG-length": 100000,
224 response = test_utils.add_oms_attr_request(
225 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
226 self.assertEqual(response.status_code, requests.codes.created)
228 def test_10_add_omsAttributes_roadmc_roadma(self):
229 # Config ROADMC-ROADMA oms-attributes
231 "auto-spanloss": "true",
232 "spanloss-base": 11.4,
233 "spanloss-current": 12,
234 "engineered-spanloss": 12.2,
235 "link-concatenation": [{
238 "SRLG-length": 100000,
240 response = test_utils.add_oms_attr_request(
241 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
242 self.assertEqual(response.status_code, requests.codes.created)
    # test service-create for the OTUC4 service from xpdra2 to xpdrc2
245 def test_11_check_otn_topology(self):
246 response = test_utils.get_otn_topo_request()
247 self.assertEqual(response.status_code, requests.codes.ok)
248 res = response.json()
249 nbNode = len(res['network'][0]['node'])
250 self.assertEqual(nbNode, 4, 'There should be 4 nodes')
251 self.assertNotIn('ietf-network-topology:link', res['network'][0])
253 def test_12_create_OTUC4_service(self):
254 response = test_utils.service_create_request(self.cr_serv_sample_data)
255 self.assertEqual(response.status_code, requests.codes.ok)
256 res = response.json()
257 self.assertIn('PCE calculation in progress',
258 res['output']['configuration-response-common']['response-message'])
259 time.sleep(self.WAITING)
261 def test_13_get_OTUC4_service1(self):
262 response = test_utils.get_service_list_request(
263 "services/service1-OTUC4")
264 self.assertEqual(response.status_code, requests.codes.ok)
265 res = response.json()
267 res['services'][0]['administrative-state'], 'inService')
269 res['services'][0]['service-name'], 'service1-OTUC4')
271 res['services'][0]['connection-type'], 'infrastructure')
273 res['services'][0]['lifecycle-state'], 'planned')
275 res['services'][0]['service-layer'], 'wdm')
277 res['services'][0]['service-a-end']['service-rate'], 400)
279 res['services'][0]['service-a-end']['otu-service-rate'], 'org-openroadm-otn-common-types:OTUCn')
282 res['services'][0]['service-z-end']['otu-service-rate'], 'org-openroadm-otn-common-types:OTUCn')
    # Check correct configuration of devices
286 def test_14_check_interface_och_xpdra2(self):
287 response = test_utils.check_netconf_node_request(
288 "XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
289 self.assertEqual(response.status_code, requests.codes.ok)
290 res = response.json()
291 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
292 'administrative-state': 'inService',
293 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
294 'type': 'org-openroadm-interfaces:otsi',
295 'supporting-port': 'L1'
296 }, **res['interface'][0]),
299 self.assertDictEqual(
300 dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
301 'transmit-power': -5, 'modulation-format': 'dp-qam16'},
302 **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
303 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
305 def test_15_check_interface_OTSI_GROUP_xpdra2(self):
306 response = test_utils.check_netconf_node_request(
307 "XPDR-A2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
308 self.assertEqual(response.status_code, requests.codes.ok)
309 res = response.json()
310 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
311 'administrative-state': 'inService',
312 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
313 'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
314 'type': 'org-openroadm-interfaces:otsi-group',
315 'supporting-port': 'L1'
317 input_dict_2 = {"group-id": 1,
318 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
320 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
322 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]
323 ['org-openroadm-otsi-group-interfaces:otsi-group']),
325 ['org-openroadm-otsi-group-interfaces:otsi-group'])
327 def test_16_check_interface_OTUC4_xpdra2(self):
328 response = test_utils.check_netconf_node_request(
329 "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC4")
330 self.assertEqual(response.status_code, requests.codes.ok)
331 res = response.json()
332 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSI-GROUP',
333 'administrative-state': 'inService',
334 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
335 'supporting-interface-list': 'XPDR2-NETWORK1-OTSI-GROUP',
336 'type': 'org-openroadm-interfaces:otnOtu',
337 'supporting-port': 'L1'
339 input_dict_2 = {'rate': 'org-openroadm-otn-common-types:OTUCn',
340 'degthr-percentage': 100,
344 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
346 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
348 ['org-openroadm-otn-otu-interfaces:otu'])
350 def test_17_check_interface_och_xpdrc2(self):
351 response = test_utils.check_netconf_node_request(
352 "XPDR-C2", "interface/XPDR2-NETWORK1-755:768")
353 self.assertEqual(response.status_code, requests.codes.ok)
354 res = response.json()
355 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
356 'administrative-state': 'inService',
357 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
358 'type': 'org-openroadm-interfaces:otsi',
359 'supporting-port': 'L1'
360 }, **res['interface'][0]),
363 self.assertDictEqual(
364 dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
365 'transmit-power': -5, 'modulation-format': 'dp-qam16'},
366 **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
367 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
369 def test_18_check_interface_OTSI_GROUP_xpdrc2(self):
370 response = test_utils.check_netconf_node_request(
371 "XPDR-C2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
372 self.assertEqual(response.status_code, requests.codes.ok)
373 res = response.json()
374 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSI-GROUP',
375 'administrative-state': 'inService',
376 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
377 'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
378 'type': 'org-openroadm-interfaces:otsi-group',
379 'supporting-port': 'L1'
381 input_dict_2 = {"group-id": 1,
382 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
384 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
386 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]
387 ['org-openroadm-otsi-group-interfaces:otsi-group']),
389 ['org-openroadm-otsi-group-interfaces:otsi-group'])
391 def test_19_check_interface_OTUC4_xpdrc2(self):
392 response = test_utils.check_netconf_node_request(
393 "XPDR-C2", "interface/XPDR2-NETWORK1-OTUC4")
394 self.assertEqual(response.status_code, requests.codes.ok)
395 res = response.json()
396 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
397 'administrative-state': 'inService',
398 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
399 'supporting-interface-list': 'XPDR2-NETWORK1-OTSI-GROUP',
400 'type': 'org-openroadm-interfaces:otnOtu',
401 'supporting-port': 'L1'
403 input_dict_2 = {'rate': 'org-openroadm-otn-common-types:OTUCn',
404 'degthr-percentage': 100,
409 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
412 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
414 ['org-openroadm-otn-otu-interfaces:otu'])
416 def test_20_check_no_interface_ODUC4_xpdra2(self):
417 response = test_utils.check_netconf_node_request(
418 "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
419 self.assertEqual(response.status_code, requests.codes.conflict)
420 res = response.json()
422 {"error-type": "application", "error-tag": "data-missing",
423 "error-message": "Request could not be completed because the relevant data model content does not exist"},
424 res['errors']['error'])
426 def test_21_check_openroadm_topo_xpdra2(self):
427 response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR2")
428 self.assertEqual(response.status_code, requests.codes.ok)
429 res = response.json()
430 ele = res['node'][0]['ietf-network-topology:termination-point'][0]
431 self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
432 self.assertEqual({'frequency': 196.08125,
434 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
437 def test_22_check_otn_topo_OTUC4_links(self):
438 response = test_utils.get_otn_topo_request()
439 self.assertEqual(response.status_code, requests.codes.ok)
440 res = response.json()
441 nb_links = len(res['network'][0]['ietf-network-topology:link'])
442 self.assertEqual(nb_links, 2)
443 listLinkId = ['OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
444 'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1']
445 for link in res['network'][0]['ietf-network-topology:link']:
446 self.assertIn(link['link-id'], listLinkId)
448 link['transportpce-topology:otn-link-type'], 'OTUC4')
450 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
452 link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
454 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
456 link['org-openroadm-common-network:opposite-link'], listLinkId)
    # test service-create for the ODUC4 service from xpdra2 to xpdrc2
459 def test_23_create_ODUC4_service(self):
460 self.cr_serv_sample_data["input"]["service-name"] = "service1-ODUC4"
461 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
462 del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
463 self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
464 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
465 del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
466 self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
468 response = test_utils.service_create_request(self.cr_serv_sample_data)
469 self.assertEqual(response.status_code, requests.codes.ok)
470 res = response.json()
471 self.assertIn('PCE calculation in progress',
472 res['output']['configuration-response-common']['response-message'])
473 time.sleep(self.WAITING)
475 def test_24_get_ODUC4_service1(self):
476 response = test_utils.get_service_list_request(
477 "services/service1-ODUC4")
478 self.assertEqual(response.status_code, requests.codes.ok)
479 res = response.json()
481 res['services'][0]['administrative-state'], 'inService')
483 res['services'][0]['service-name'], 'service1-ODUC4')
485 res['services'][0]['connection-type'], 'infrastructure')
487 res['services'][0]['lifecycle-state'], 'planned')
489 res['services'][0]['service-layer'], 'wdm')
491 res['services'][0]['service-a-end']['service-rate'], 400)
493 res['services'][0]['service-a-end']['odu-service-rate'], 'org-openroadm-otn-common-types:ODUCn')
495 res['services'][0]['service-z-end']['odu-service-rate'], 'org-openroadm-otn-common-types:ODUCn')
498 def test_25_check_interface_ODUC4_xpdra2(self):
499 response = test_utils.check_netconf_node_request(
500 "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
501 self.assertEqual(response.status_code, requests.codes.ok)
502 res = response.json()
503 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
504 'administrative-state': 'inService',
505 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
506 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
507 'type': 'org-openroadm-interfaces:otnOdu',
508 'supporting-port': 'L1'}
509 # SAPI/DAPI are added in the Otu4 renderer
510 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
511 'rate': 'org-openroadm-otn-common-types:ODUCn',
512 'expected-dapi': 'Nmbu2MNHvc4=',
513 'expected-sapi': 'Nmbu2MNHvc4=',
514 'tx-dapi': 'Nmbu2MNHvc4=',
515 'tx-sapi': 'LY9PxYJqUbw='}
517 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
519 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
520 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
521 self.assertDictEqual(
522 {'payload-type': '22', 'exp-payload-type': '22'},
523 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
525 def test_26_check_interface_ODUC4_xpdrc2(self):
526 response = test_utils.check_netconf_node_request(
527 "XPDR-C2", "interface/XPDR2-NETWORK1-ODUC4")
528 self.assertEqual(response.status_code, requests.codes.ok)
529 res = response.json()
530 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
531 'administrative-state': 'inService',
532 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
533 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
534 'type': 'org-openroadm-interfaces:otnOdu',
535 'supporting-port': 'L1'}
536 # SAPI/DAPI are added in the Otu4 renderer
537 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
538 'rate': 'org-openroadm-otn-common-types:ODUCn',
539 'tx-sapi': 'Nmbu2MNHvc4=',
540 'tx-dapi': 'LY9PxYJqUbw=',
541 'expected-sapi': 'LY9PxYJqUbw=',
542 'expected-dapi': 'LY9PxYJqUbw='
544 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
546 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
547 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
548 self.assertDictEqual(
549 {'payload-type': '22', 'exp-payload-type': '22'},
550 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
552 def test_27_check_otn_topo_links(self):
553 response = test_utils.get_otn_topo_request()
554 self.assertEqual(response.status_code, requests.codes.ok)
555 res = response.json()
556 nb_links = len(res['network'][0]['ietf-network-topology:link'])
557 self.assertEqual(nb_links, 4)
558 for link in res['network'][0]['ietf-network-topology:link']:
559 linkId = link['link-id']
560 if (linkId in ('OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
561 'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
563 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
565 link['org-openroadm-otn-network-topology:used-bandwidth'], 400000)
566 elif (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
567 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
569 link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
571 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
573 link['transportpce-topology:otn-link-type'], 'ODUC4')
575 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
576 self.assertIn(link['org-openroadm-common-network:opposite-link'],
577 ['ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
578 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1'])
580 self.fail("this link should not exist")
582 def test_28_check_otn_topo_tp(self):
583 response = test_utils.get_otn_topo_request()
584 res = response.json()
585 for node in res['network'][0]['node']:
586 if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
587 tpList = node['ietf-network-topology:termination-point']
589 if tp['tp-id'] == 'XPDR2-NETWORK1':
590 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
591 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
593 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
594 self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
595 'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
    # test service-create for 100GE service 1 from xpdra2 to xpdrc2
598 def test_29_create_100GE_service_1(self):
599 self.cr_serv_sample_data["input"]["service-name"] = "service-100GE"
600 self.cr_serv_sample_data["input"]["connection-type"] = "service"
601 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
602 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
603 del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
604 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
605 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
606 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
607 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
608 del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
609 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
610 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
611 response = test_utils.service_create_request(self.cr_serv_sample_data)
612 self.assertEqual(response.status_code, requests.codes.ok)
613 res = response.json()
614 self.assertIn('PCE calculation in progress',
615 res['output']['configuration-response-common']['response-message'])
616 time.sleep(self.WAITING)
618 def test_30_get_100GE_service_1(self):
619 response = test_utils.get_service_list_request(
620 "services/service-100GE")
621 self.assertEqual(response.status_code, requests.codes.ok)
622 res = response.json()
624 res['services'][0]['administrative-state'], 'inService')
626 res['services'][0]['service-name'], 'service-100GE')
628 res['services'][0]['connection-type'], 'service')
630 res['services'][0]['lifecycle-state'], 'planned')
633 def test_31_check_interface_100GE_CLIENT_xpdra2(self):
634 response = test_utils.check_netconf_node_request(
635 "XPDR-A2", "interface/XPDR2-CLIENT1-ETHERNET100G")
636 self.assertEqual(response.status_code, requests.codes.ok)
637 res = response.json()
638 input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET100G',
639 'administrative-state': 'inService',
640 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
641 'type': 'org-openroadm-interfaces:ethernetCsmacd',
642 'supporting-port': 'C1'
644 input_dict_2 = {'speed': 100000}
645 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
647 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
648 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
650 def test_32_check_interface_ODU4_CLIENT_xpdra2(self):
651 response = test_utils.check_netconf_node_request(
652 "XPDR-A2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
653 self.assertEqual(response.status_code, requests.codes.ok)
654 res = response.json()
655 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service-100GE',
656 'administrative-state': 'inService',
657 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
658 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET100G',
659 'type': 'org-openroadm-interfaces:otnOdu',
660 'supporting-port': 'C1'}
662 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
663 'rate': 'org-openroadm-otn-common-types:ODU4',
664 'monitoring-mode': 'terminated'}
666 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
668 self.assertDictEqual(dict(input_dict_2,
669 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
670 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
671 self.assertDictEqual(
672 {'payload-type': '07', 'exp-payload-type': '07'},
673 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
675 def test_33_check_interface_ODU4_NETWORK_xpdra2(self):
676 response = test_utils.check_netconf_node_request(
677 "XPDR-A2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
678 self.assertEqual(response.status_code, requests.codes.ok)
679 res = response.json()
680 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service-100GE',
681 'administrative-state': 'inService',
682 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
683 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
684 'type': 'org-openroadm-interfaces:otnOdu',
685 'supporting-port': 'L1'}
687 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
688 'rate': 'org-openroadm-otn-common-types:ODU4',
689 'monitoring-mode': 'not-terminated'}
690 input_dict_3 = {'trib-port-number': 1}
692 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
694 self.assertDictEqual(dict(input_dict_2,
695 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
696 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
697 self.assertDictEqual(dict(input_dict_3,
698 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
699 'parent-odu-allocation']),
700 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
701 self.assertIn('1.1', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
702 ['opucn-trib-slots'])
703 self.assertIn('1.20', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
704 ['opucn-trib-slots'])
706 def test_34_check_ODU4_connection_xpdra2(self):
707 response = test_utils.check_netconf_node_request(
709 "odu-connection/XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE")
710 self.assertEqual(response.status_code, requests.codes.ok)
711 res = response.json()
714 'XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE',
715 'direction': 'bidirectional'
718 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
719 res['odu-connection'][0])
720 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service-100GE'},
721 res['odu-connection'][0]['destination'])
722 self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service-100GE'},
723 res['odu-connection'][0]['source'])
725 def test_35_check_interface_100GE_CLIENT_xpdrc2(self):
726 response = test_utils.check_netconf_node_request(
727 "XPDR-C2", "interface/XPDR2-CLIENT1-ETHERNET100G")
728 self.assertEqual(response.status_code, requests.codes.ok)
729 res = response.json()
730 input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET100G',
731 'administrative-state': 'inService',
732 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
733 'type': 'org-openroadm-interfaces:ethernetCsmacd',
734 'supporting-port': 'C1'
736 input_dict_2 = {'speed': 100000}
737 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
739 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
740 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
742 def test_36_check_interface_ODU4_CLIENT_xpdrc2(self):
743 response = test_utils.check_netconf_node_request(
744 "XPDR-C2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
745 self.assertEqual(response.status_code, requests.codes.ok)
746 res = response.json()
747 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service-100GE',
748 'administrative-state': 'inService',
749 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
750 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET100G',
751 'type': 'org-openroadm-interfaces:otnOdu',
752 'supporting-port': 'C1'}
754 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
755 'rate': 'org-openroadm-otn-common-types:ODU4',
756 'monitoring-mode': 'terminated'}
758 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
760 self.assertDictEqual(dict(input_dict_2,
761 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
762 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
763 self.assertDictEqual(
764 {'payload-type': '07', 'exp-payload-type': '07'},
765 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
767 def test_37_check_interface_ODU4_NETWORK_xpdrc2(self):
768 response = test_utils.check_netconf_node_request(
769 "XPDR-C2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
770 self.assertEqual(response.status_code, requests.codes.ok)
771 res = response.json()
772 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service-100GE',
773 'administrative-state': 'inService',
774 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
775 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
776 'type': 'org-openroadm-interfaces:otnOdu',
777 'supporting-port': 'C1'}
779 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
780 'rate': 'org-openroadm-otn-common-types:ODU4',
781 'monitoring-mode': 'not-terminated'}
783 input_dict_3 = {'trib-port-number': 1}
785 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
787 self.assertDictEqual(dict(input_dict_2,
788 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
789 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
790 self.assertDictEqual(dict(input_dict_3,
791 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
792 'parent-odu-allocation']),
793 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
794 'parent-odu-allocation'])
797 'org-openroadm-otn-odu-interfaces:odu'][
798 'parent-odu-allocation']['opucn-trib-slots'])
799 self.assertIn('1.20',
801 'org-openroadm-otn-odu-interfaces:odu'][
802 'parent-odu-allocation']['opucn-trib-slots'])
804 def test_38_check_ODU4_connection_xpdrc2(self):
805 response = test_utils.check_netconf_node_request(
807 "odu-connection/XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE")
808 self.assertEqual(response.status_code, requests.codes.ok)
809 res = response.json()
812 'XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE',
813 'direction': 'bidirectional'
816 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
817 res['odu-connection'][0])
818 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service-100GE'},
819 res['odu-connection'][0]['destination'])
820 self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service-100GE'},
821 res['odu-connection'][0]['source'])
def test_39_check_otn_topo_links(self):
    # The OTN topology must hold 4 links; both ODUC4 links must report
    # 300000 available and 100000 used bandwidth.
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    links = res['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 4)
    oduc4_link_ids = (
        'ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
        'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')
    for link in links:
        if link['link-id'] not in oduc4_link_ids:
            continue
        self.assertEqual(
            link['org-openroadm-otn-network-topology:available-bandwidth'], 300000)
        self.assertEqual(
            link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
def test_40_check_otn_topo_tp(self):
    # For XPDR2-NETWORK1 on both transponder nodes: the trib-slot pool must
    # hold 60 entries and the trib-port pool 3 entries, without port 1.
    response = test_utils.get_otn_topo_request()
    res = response.json()
    for node in res['network'][0]['node']:
        if node['node-id'] not in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
            continue
        tpList = node['ietf-network-topology:termination-point']
        for tp in tpList:
            if tp['tp-id'] != 'XPDR2-NETWORK1':
                continue
            xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 60)
            tsPoolList = list(range(1, 20))
            # NOTE(review): opener inferred as assertNotIn -- confirm. As
            # written this tests whether the list object itself is an element
            # of ts-pool, not whether each slot number is absent.
            self.assertNotIn(
                tsPoolList, xpdrTpPortConAt['ts-pool'])
            self.assertEqual(
                len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 3)
            # NOTE(review): opener inferred as assertNotIn -- confirm.
            self.assertNotIn(
                1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
856 # test service-create for 100GE service 2 from xpdra2 to xpdrc2
def test_41_create_100GE_service_2(self):
    # Rewrites the shared service-create payload in place for a second
    # 100GE Ethernet service on XPDR2-CLIENT2, submits it, and waits.
    request = self.cr_serv_sample_data["input"]
    request["service-name"] = "service-100GE2"
    request["connection-type"] = "service"
    for end_name in ("service-a-end", "service-z-end"):
        end = request[end_name]
        end["service-rate"] = "100"
        end["service-format"] = "Ethernet"
        for direction in ("tx-direction", "rx-direction"):
            end[direction]["port"]["port-name"] = "XPDR2-CLIENT2"
    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
def test_42_get_100GE_service_2(self):
    # Reads back "service-100GE2" and checks its state, name, connection
    # type and lifecycle state.
    response = test_utils.get_service_list_request(
        "services/service-100GE2")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertEqual(
        res['services'][0]['administrative-state'], 'inService')
    self.assertEqual(
        res['services'][0]['service-name'], 'service-100GE2')
    self.assertEqual(
        res['services'][0]['connection-type'], 'service')
    self.assertEqual(
        res['services'][0]['lifecycle-state'], 'planned')
def test_43_check_service_list(self):
    # The service list must contain exactly 4 services at this point.
    reply = test_utils.get_service_list_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    services = reply.json()['service-list']['services']
    self.assertEqual(len(services), 4)
def test_44_check_otn_topo_links(self):
    # The OTN topology must hold 4 links; both ODUC4 links must report
    # 200000 available and 200000 used bandwidth.
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    links = res['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 4)
    oduc4_link_ids = (
        'ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
        'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')
    for link in links:
        if link['link-id'] not in oduc4_link_ids:
            continue
        self.assertEqual(
            link['org-openroadm-otn-network-topology:available-bandwidth'], 200000)
        self.assertEqual(
            link['org-openroadm-otn-network-topology:used-bandwidth'], 200000)
def test_45_check_otn_topo_tp(self):
    # For XPDR2-NETWORK1 on both transponder nodes: the trib-slot pool must
    # hold 40 entries and the trib-port pool 2 entries, without port 2.
    response = test_utils.get_otn_topo_request()
    res = response.json()
    for node in res['network'][0]['node']:
        if node['node-id'] not in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
            continue
        tpList = node['ietf-network-topology:termination-point']
        for tp in tpList:
            if tp['tp-id'] != 'XPDR2-NETWORK1':
                continue
            xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 40)
            tsPoolList = list(range(1, 40))
            # NOTE(review): opener inferred as assertNotIn -- confirm. As
            # written this tests whether the list object itself is an element
            # of ts-pool, not whether each slot number is absent.
            self.assertNotIn(
                tsPoolList, xpdrTpPortConAt['ts-pool'])
            self.assertEqual(
                len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 2)
            # NOTE(review): opener inferred as assertNotIn -- confirm.
            self.assertNotIn(
                2, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
def test_46_delete_100GE_service_2(self):
    # Requests deletion of "service-100GE2", expects the in-progress
    # acknowledgement, then pauses for WAITING seconds.
    reply = test_utils.service_delete_request("service-100GE2")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
    time.sleep(self.WAITING)
def test_47_delete_100GE_service_1(self):
    # Requests deletion of "service-100GE", expects the in-progress
    # acknowledgement, then pauses for WAITING seconds.
    reply = test_utils.service_delete_request("service-100GE")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
    time.sleep(self.WAITING)
def test_48_check_service_list(self):
    # The service list must contain exactly 2 services at this point.
    reply = test_utils.get_service_list_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    services = reply.json()['service-list']['services']
    self.assertEqual(len(services), 2)
def test_49_check_no_ODU4_connection_xpdra2(self):
    # The XPDR-A2 device configuration must no longer carry any
    # 'odu-connection' entry.
    response = test_utils.check_netconf_node_request("XPDR-A2", "")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Plain key instead of the former ['odu-connection'][0], which evaluated
    # to the same string.
    self.assertNotIn('odu-connection', res['org-openroadm-device'])
def test_50_check_no_interface_ODU4_NETWORK_xpdra2(self):
    # Reading the network-side ODU4 interface on XPDR-A2 must return a
    # conflict status.
    node, resource = "XPDR-A2", "interface/XPDR2-NETWORK1-ODU4-service-100GE"
    reply = test_utils.check_netconf_node_request(node, resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_51_check_no_interface_ODU4_CLIENT_xpdra2(self):
    # Reading the client-side ODU4 interface on XPDR-A2 must return a
    # conflict status.
    node, resource = "XPDR-A2", "interface/XPDR2-CLIENT1-ODU4-service-100GE"
    reply = test_utils.check_netconf_node_request(node, resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_52_check_no_interface_100GE_CLIENT_xpdra2(self):
    # Reading the 100G Ethernet client interface on XPDR-A2 must return a
    # conflict status.
    node, resource = "XPDR-A2", "interface/XPDR2-CLIENT1-ETHERNET100G"
    reply = test_utils.check_netconf_node_request(node, resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_53_check_otn_topo_links(self):
    # The OTN topology must hold 4 links; both ODUC4 links must report
    # 400000 available and 0 used bandwidth.
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    links = res['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 4)
    oduc4_link_ids = (
        'ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
        'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')
    for link in links:
        if link['link-id'] not in oduc4_link_ids:
            continue
        self.assertEqual(
            link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
        self.assertEqual(
            link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
def test_54_check_otn_topo_tp(self):
    # For XPDR2-NETWORK1 on both transponder nodes: the trib-slot pool must
    # hold 80 entries and the trib-port pool 4 entries.
    response = test_utils.get_otn_topo_request()
    res = response.json()
    for node in res['network'][0]['node']:
        if node['node-id'] not in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
            continue
        tpList = node['ietf-network-topology:termination-point']
        for tp in tpList:
            if tp['tp-id'] != 'XPDR2-NETWORK1':
                continue
            xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
            self.assertEqual(
                len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
def test_55_delete_ODUC4_service(self):
    # Requests deletion of "service1-ODUC4", expects the in-progress
    # acknowledgement, then pauses for WAITING seconds.
    reply = test_utils.service_delete_request("service1-ODUC4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
    time.sleep(self.WAITING)
def test_56_check_service_list(self):
    # The service list must contain exactly 1 service at this point.
    reply = test_utils.get_service_list_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    services = reply.json()['service-list']['services']
    self.assertEqual(len(services), 1)
def test_57_check_no_interface_ODU4_xpdra2(self):
    # Reading interface XPDR2-NETWORK1-ODUC4 on XPDR-A2 must return a
    # conflict status (the method name says ODU4, the resource is ODUC4).
    node, resource = "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4"
    reply = test_utils.check_netconf_node_request(node, resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_58_check_otn_topo_links(self):
    # Delegates to test_22_check_otn_topo_OTUC4_links of this class, so the
    # same assertions are applied again at this step of the sequence.
    self.test_22_check_otn_topo_OTUC4_links()
def test_59_check_otn_topo_tp(self):
    # Inspects XPDR2-NETWORK1 on nodes XPDR-A2-XPDR2 and XPDR-C2-XPDR2 and
    # asserts that the xpdr-tp-port-connection-attributes key is absent.
    # NOTE(review): this block looks truncated as received: `tp` is never
    # bound (a loop over tpList is presumably missing) and the final
    # assertNotIn call is left open without its container argument.
    response = test_utils.get_otn_topo_request()
    res = response.json()
    for node in res['network'][0]['node']:
        if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
            tpList = node['ietf-network-topology:termination-point']
            if tp['tp-id'] == 'XPDR2-NETWORK1':
                self.assertNotIn('org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes',
def test_60_delete_OTUC4_service(self):
    # Requests deletion of "service1-OTUC4", expects the in-progress
    # acknowledgement, then pauses for WAITING seconds.
    reply = test_utils.service_delete_request("service1-OTUC4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
    time.sleep(self.WAITING)
def test_61_get_no_service(self):
    # With no service left, reading the service list must return a conflict
    # status whose error list contains the "data-missing" application error.
    response = test_utils.get_service_list_request("")
    self.assertEqual(response.status_code, requests.codes.conflict)
    res = response.json()
    # NOTE(review): opener restored as assertIn (expected error searched in
    # the returned error collection) -- confirm.
    self.assertIn(
        {"error-type": "application", "error-tag": "data-missing",
         "error-message": "Request could not be completed because the relevant data model content does not exist"},
        res['errors']['error'])
def test_62_check_no_interface_OTUC4_xpdra2(self):
    # Reading interface XPDR2-NETWORK1-OTUC4 on XPDR-A2 must return a
    # conflict status.
    node, resource = "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC4"
    reply = test_utils.check_netconf_node_request(node, resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_63_check_no_interface_OCH_xpdra2(self):
    # Reading interface XPDR2-NETWORK1-755:768 on XPDR-A2 must return a
    # conflict status.
    node, resource = "XPDR-A2", "interface/XPDR2-NETWORK1-755:768"
    reply = test_utils.check_netconf_node_request(node, resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_64_check_no_interface_OTSI_xpdra2(self):
    # Reading interface XPDR2-NETWORK1-OTSI-GROUP on XPDR-A2 must return a
    # conflict status.
    node, resource = "XPDR-A2", "interface/XPDR2-NETWORK1-OTSI-GROUP"
    reply = test_utils.check_netconf_node_request(node, resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_65_getLinks_OtnTopology(self):
    # The OTN topology network entry must no longer have a link list.
    reply = test_utils.get_otn_topo_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    otn_network = reply.json()['network'][0]
    self.assertNotIn('ietf-network-topology:link', otn_network)
def test_66_check_openroadm_topo_xpdra2(self):
    # The first termination point of node XPDR-A2-XPDR2 must be
    # XPDR2-NETWORK1 and must not carry a 'wavelength' attribute.
    reply = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR2")
    self.assertEqual(reply.status_code, requests.codes.ok)
    first_tp = reply.json()['node'][0]['ietf-network-topology:termination-point'][0]
    self.assertEqual('XPDR2-NETWORK1', first_tp['tp-id'])
    network_attrs = first_tp['org-openroadm-network-topology:xpdr-network-attributes']
    self.assertNotIn('wavelength', dict.keys(network_attrs))
def test_67_check_openroadm_topology(self):
    # The openroadm topology must expose exactly 22 links.
    reply = test_utils.get_ordm_topo_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    topo_links = reply.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(22, len(topo_links), 'Topology should contain 22 links')
def test_68_connect_xprda2_2_N1_to_roadma_PP2(self):
    # Creates the XPDR-A2 -> ROADM-A1 link. The arguments name SRG1-PP1-TXRX
    # even though the method name mentions PP2.
    link_args = ("XPDR-A2", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_69_connect_roadma_PP2_to_xpdra2_2_N1(self):
    # Creates the ROADM-A1 -> XPDR-A2 link. The arguments name SRG1-PP1-TXRX
    # even though the method name mentions PP2.
    link_args = ("XPDR-A2", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
def test_70_connect_xprdc2_2_N1_to_roadmc_PP2(self):
    # Creates the XPDR-C2 -> ROADM-C1 link. The arguments name SRG1-PP1-TXRX
    # even though the method name mentions PP2.
    link_args = ("XPDR-C2", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_71_connect_roadmc_PP2_to_xpdrc2_2_N1(self):
    # Creates the ROADM-C1 -> XPDR-C2 link. The arguments name SRG1-PP1-TXRX
    # even though the method name mentions PP2.
    link_args = ("XPDR-C2", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
1130 # test service-create for 400GE service from xpdra2 to xpdrc2
def test_72_create_400GE_service(self):
    # Renames the shared payload to "service-400GE", sets both ends to rate
    # "400", submits it and pauses for WAITING seconds. Other payload
    # fields keep whatever earlier tests left in them.
    request = self.cr_serv_sample_data["input"]
    request["service-name"] = "service-400GE"
    for end_name in ("service-a-end", "service-z-end"):
        request[end_name]["service-rate"] = "400"
    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
def test_73_get_400GE_service(self):
    # Reads back "service-400GE" and checks its state, name, connection
    # type and lifecycle state.
    response = test_utils.get_service_list_request(
        "services/service-400GE")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertEqual(
        res['services'][0]['administrative-state'], 'inService')
    self.assertEqual(
        res['services'][0]['service-name'], 'service-400GE')
    self.assertEqual(
        res['services'][0]['connection-type'], 'service')
    self.assertEqual(
        res['services'][0]['lifecycle-state'], 'planned')
def test_74_check_xc1_roadma(self):
    # Reads roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-755:768 on ROADM-A1
    # and checks its attributes, source and destination interfaces.
    response = test_utils.check_netconf_node_request(
        "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-755:768")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
    # NOTE(review): in dict(expected, **actual) the actual values override
    # the expected ones, so this only proves the expected keys are present.
    # The expected connection-name previously read '...761:768', which did
    # not match the resource requested above.
    self.assertDictEqual(
        dict({
            'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-755:768',
            'opticalControlMode': 'gainLoss',
            'target-output-power': -3.0
        }, **res['roadm-connections'][0]),
        res['roadm-connections'][0]
    )
    self.assertDictEqual(
        {'src-if': 'SRG1-PP1-TXRX-nmc-755:768'},
        res['roadm-connections'][0]['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG2-TTP-TXRX-nmc-755:768'},
        res['roadm-connections'][0]['destination'])
def test_75_check_topo_xpdra2(self):
    # Checks node XPDR-A2-XPDR1 in the openroadm topology: the wavelength
    # recorded on XPDR1-NETWORK1, and the absence of xpdr-client-attributes
    # on XPDR1-CLIENT1.
    response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR1")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    for ele in liste_tp:
        if ele['tp-id'] == 'XPDR1-NETWORK1':
            # NOTE(review): the expected dict literal below is unterminated
            # as received; its remaining entry/closing brace must be
            # restored before this block can run.
            self.assertEqual({'frequency': 196.08125,
                             ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
        if ele['tp-id'] == 'XPDR1-CLIENT1':
            self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
def test_76_check_topo_roadma_SRG1(self):
    # Byte 95 of the decoded availability map must be 0 for node
    # ROADM-A1-SRG1 and for SRG1-PP1-TXRX; SRG1-PP2-TXRX must have no
    # 'avail-freq-maps' key.
    reply = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
    self.assertEqual(reply.status_code, requests.codes.ok)
    srg_node = reply.json()['node'][0]
    node_map = list(base64.b64decode(
        srg_node['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map']))
    self.assertEqual(node_map[95], 0, "Index 1 should not be available")
    for tp in srg_node['ietf-network-topology:termination-point']:
        if tp['tp-id'] == 'SRG1-PP1-TXRX':
            pp_map = list(base64.b64decode(
                tp['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map']))
            self.assertEqual(pp_map[95], 0, "Index 1 should not be available")
        if tp['tp-id'] == 'SRG1-PP2-TXRX':
            self.assertNotIn('avail-freq-maps', dict.keys(tp))
def test_77_check_topo_roadma_DEG1(self):
    # Byte 95 of the decoded availability map must be 0 for node
    # ROADM-A1-DEG2 and its CTP/TTP termination points (the method name
    # says DEG1, the queried node is DEG2).
    reply = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
    self.assertEqual(reply.status_code, requests.codes.ok)
    degree_node = reply.json()['node'][0]
    node_map = list(base64.b64decode(
        degree_node['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map']))
    self.assertEqual(node_map[95], 0, "Index 1 should not be available")
    for tp in degree_node['ietf-network-topology:termination-point']:
        if tp['tp-id'] == 'DEG2-CTP-TXRX':
            ctp_map = list(base64.b64decode(
                tp['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map']))
            self.assertEqual(ctp_map[95], 0, "Index 1 should not be available")
        if tp['tp-id'] == 'DEG2-TTP-TXRX':
            ttp_map = list(base64.b64decode(
                tp['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map']))
            self.assertEqual(ttp_map[95], 0, "Index 1 should not be available")
def test_78_check_interface_100GE_CLIENT_xpdra2(self):
    # Reads interface XPDR1-CLIENT1-ETHERNET on XPDR-A2 and checks that the
    # expected top-level and ethernet attributes are present.
    response = test_utils.check_netconf_node_request(
        "XPDR-A2", "interface/XPDR1-CLIENT1-ETHERNET")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    input_dict_1 = {'name': 'XPDR1-CLIENT1-ETHERNET',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': '1/1/1-PLUG-CLIENT',
                    'type': 'org-openroadm-interfaces:ethernetCsmacd',
                    'supporting-port': 'C1'
                    }
    input_dict_2 = {'speed': 400000}
    # NOTE(review): in dict(expected, **actual) the actual values override
    # the expected ones, so these checks only prove the keys are present.
    self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
                         res['interface'][0])
    self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
                         res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
def test_79_check_interface_OTSI_xpdra2(self):
    # Reads interface XPDR1-NETWORK1-755:768 on XPDR-A2 and checks the
    # expected top-level and OTSI attributes plus the exact flexo container.
    response = test_utils.check_netconf_node_request(
        "XPDR-A2", "interface/XPDR1-NETWORK1-755:768")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    input_dict_1 = {'name': 'XPDR1-NETWORK1-755:768',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
                    'type': 'org-openroadm-interfaces:otsi',
                    'supporting-port': 'L1'}
    input_dict_2 = {
        "frequency": 196.0812,
        "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
        "fec": "org-openroadm-common-types:ofec",
        "transmit-power": -5,
        "provision-mode": "explicit",
        "modulation-format": "dp-qam16"}
    # NOTE(review): in dict(expected, **actual) the actual values override
    # the expected ones, so these two checks only prove the keys are present.
    self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
                         res['interface'][0])
    self.assertDictEqual(dict(input_dict_2,
                              **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
                         res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
    # The flexo container is compared exactly.
    self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
                         res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
def test_80_check_interface_OTSI_GROUP_xpdra2(self):
    # Reads interface XPDR1-NETWORK1-OTSI-GROUP on XPDR-A2 and checks that
    # the expected top-level and otsi-group attributes are present.
    response = test_utils.check_netconf_node_request(
        "XPDR-A2", "interface/XPDR1-NETWORK1-OTSI-GROUP")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # The expected name previously read 'XPDR1-CLIENT1-ODU2e-service1-10GE',
    # which is not the interface requested above. The key formerly written
    # ['supporting-interface-list'][0] is the same plain string.
    input_dict_1 = {'name': 'XPDR1-NETWORK1-OTSI-GROUP',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
                    'supporting-interface-list': 'XPDR1-NETWORK1-755:768',
                    'type': 'org-openroadm-interfaces:otsi-group',
                    'supporting-port': 'L1'}
    input_dict_2 = {"group-id": 1,
                    "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}
    # NOTE(review): in dict(expected, **actual) the actual values override
    # the expected ones, so these checks only prove the keys are present.
    self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
                         res['interface'][0])
    self.assertDictEqual(dict(input_dict_2,
                              **res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
                         res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
def test_81_check_interface_OTUC4_xpdra2(self):
    # Reads interface XPDR1-NETWORK1-OTUC4 on XPDR-A2 and checks that the
    # expected top-level and OTU attributes are present.
    response = test_utils.check_netconf_node_request(
        "XPDR-A2", "interface/XPDR1-NETWORK1-OTUC4")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # The key formerly written ['supporting-interface-list'][0] is the same
    # plain string.
    input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
                    'supporting-interface-list': 'XPDR1-NETWORK1-OTSI-GROUP',
                    'type': 'org-openroadm-interfaces:otnOtu',
                    'supporting-port': 'L1'}
    input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
                    "degthr-percentage": 100,
                    "tim-detect-mode": "Disabled",
                    "degm-intervals": 2}
    # NOTE(review): in dict(expected, **actual) the actual values override
    # the expected ones, so these checks only prove the keys are present.
    self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
                         res['interface'][0])
    self.assertDictEqual(dict(input_dict_2,
                              **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
                         res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
@unittest.skip("Temporary skipped waiting for the rest of the functionality in change 97834")
def test_82_check_interface_ODUC4_xpdra2(self):
    # Reads interface XPDR1-NETWORK1-ODUC4 on XPDR-A2 and checks that the
    # expected top-level, ODU and OPU attributes are present.
    response = test_utils.check_netconf_node_request(
        "XPDR-A2", "interface/XPDR1-NETWORK1-ODUC4")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # The key formerly written ['supporting-interface-list'][0] is the same
    # plain string.
    input_dict_1 = {'name': 'XPDR1-NETWORK1-ODUC4',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
                    'supporting-interface-list': 'XPDR1-NETWORK1-OTSI-GROUP',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'L1'}
    # NOTE(review): this literal was unterminated as received; the closing
    # brace is restored here, but a trailing entry may be missing -- confirm.
    input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP",
                    "tim-detect-mode": "Disabled",
                    "degm-intervals": 2,
                    "degthr-percentage": 100,
                    "monitoring-mode": "terminated",
                    "rate": "org-openroadm-otn-common-types:ODUCn",
                    }
    input_dict_3 = {"exp-payload-type": "22", "payload-type": "22"}
    # NOTE(review): in dict(expected, **actual) the actual values override
    # the expected ones, so these checks only prove the keys are present.
    self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
                         res['interface'][0])
    self.assertDictEqual(dict(input_dict_2,
                              **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
                         res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
    self.assertDictEqual(dict(input_dict_3,
                              **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']),
                         res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_83_delete_400GE_service(self):
    # Requests deletion of "service-400GE", expects the in-progress
    # acknowledgement, then pauses for WAITING seconds.
    reply = test_utils.service_delete_request("service-400GE")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
    time.sleep(self.WAITING)
def test_84_get_no_service(self):
    # With no service left, reading the service list must return a conflict
    # status whose error list contains the "data-missing" application error.
    response = test_utils.get_service_list_request("")
    self.assertEqual(response.status_code, requests.codes.conflict)
    res = response.json()
    # NOTE(review): opener restored as assertIn (expected error searched in
    # the returned error collection) -- confirm.
    self.assertIn(
        {"error-type": "application", "error-tag": "data-missing",
         "error-message": "Request could not be completed because the relevant data model content does not exist"},
        res['errors']['error'])
def test_85_check_no_interface_ODUC4_xpdra2(self):
    # Reading interface XPDR1-NETWORK1-ODUC4 on XPDR-A2 must return a
    # conflict status.
    node, resource = "XPDR-A2", "interface/XPDR1-NETWORK1-ODUC4"
    reply = test_utils.check_netconf_node_request(node, resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_86_check_no_interface_OTUC4_xpdra2(self):
    # Reading interface XPDR1-NETWORK1-OTUC4 on XPDR-A2 must return a
    # conflict status.
    node, resource = "XPDR-A2", "interface/XPDR1-NETWORK1-OTUC4"
    reply = test_utils.check_netconf_node_request(node, resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_87_check_no_interface_OTSI_GROUP_xpdra2(self):
    # Reading interface XPDR1-NETWORK1-OTSI-GROUP on XPDR-A2 must return a
    # conflict status.
    node, resource = "XPDR-A2", "interface/XPDR1-NETWORK1-OTSI-GROUP"
    reply = test_utils.check_netconf_node_request(node, resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_88_check_no_interface_OTSI_xpdra2(self):
    # Reading interface XPDR1-NETWORK1-755:768 on XPDR-A2 must return a
    # conflict status.
    node, resource = "XPDR-A2", "interface/XPDR1-NETWORK1-755:768"
    reply = test_utils.check_netconf_node_request(node, resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_89_check_no_interface_400GE_CLIENT_xpdra2(self):
    # Reading interface XPDR1-CLIENT1-ETHERNET on XPDR-A2 must return a
    # conflict status.
    node, resource = "XPDR-A2", "interface/XPDR1-CLIENT1-ETHERNET"
    reply = test_utils.check_netconf_node_request(node, resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_90_disconnect_xponders_from_roadm(self):
    # Deletes every XPONDER-OUTPUT / XPONDER-INPUT link found in the
    # openroadm topology and expects each deletion to succeed.
    # NOTE(review): the "{}" placeholder is passed through unformatted;
    # presumably test_utils.delete_request fills it in -- confirm.
    url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
    response = test_utils.get_ordm_topo_request("")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    links = res['network'][0]['ietf-network-topology:link']
    # The loop header was missing, leaving `link` unbound.
    for link in links:
        if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
            link_name = link["link-id"]
            response = test_utils.delete_request(url+link_name)
            self.assertEqual(response.status_code, requests.codes.ok)
def test_91_disconnect_xpdra2(self):
    # Unmounts device XPDR-A2 and expects an OK status.
    reply = test_utils.unmount_device("XPDR-A2")
    status = reply.status_code
    self.assertEqual(status, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_92_disconnect_xpdrc2(self):
    # Unmounts device XPDR-C2 and expects an OK status.
    reply = test_utils.unmount_device("XPDR-C2")
    status = reply.status_code
    self.assertEqual(status, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_93_disconnect_roadmA(self):
    # Unmounts device ROADM-A1 and expects an OK status.
    reply = test_utils.unmount_device("ROADM-A1")
    status = reply.status_code
    self.assertEqual(status, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_94_disconnect_roadmC(self):
    # Unmounts device ROADM-C1 and expects an OK status.
    reply = test_utils.unmount_device("ROADM-C1")
    status = reply.status_code
    self.assertEqual(status, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
# Run the test case with per-test verbose reporting when executed as a script.
if __name__ == "__main__":
    unittest.main(verbosity=2)