3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
20 sys.path.append('transportpce_tests/common/')
24 class TransportPCEtesting(unittest.TestCase):
27 WAITING = 20 # nominal value is 300
28 NODE_VERSION_221 = '2.2.1'
29 NODE_VERSION_71 = '7.1'
31 cr_serv_sample_data = {"input": {
32 "sdnc-request-header": {
33 "request-id": "request-1",
34 "rpc-action": "service-create",
35 "request-system-id": "appname"
37 "service-name": "service1-OTUC4",
38 "common-id": "commonId",
39 "connection-type": "infrastructure",
41 "service-rate": "400",
43 "service-format": "OTU",
44 "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
48 "committed-info-rate": "100000",
49 "committed-burst-size": "64"
54 "port-device-name": "XPDR-A2-XPDR2",
56 "port-name": "XPDR2-NETWORK1",
57 "port-rack": "000000.00",
58 "port-shelf": "Chassis#1"
61 "lgx-device-name": "Some lgx-device-name",
62 "lgx-port-name": "Some lgx-port-name",
63 "lgx-port-rack": "000000.00",
64 "lgx-port-shelf": "00"
69 "port-device-name": "XPDR-A2-XPDR2",
71 "port-name": "XPDR2-NETWORK1",
72 "port-rack": "000000.00",
73 "port-shelf": "Chassis#1"
76 "lgx-device-name": "Some lgx-device-name",
77 "lgx-port-name": "Some lgx-port-name",
78 "lgx-port-rack": "000000.00",
79 "lgx-port-shelf": "00"
85 "service-rate": "400",
87 "service-format": "OTU",
88 "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
92 "committed-info-rate": "100000",
93 "committed-burst-size": "64"
98 "port-device-name": "XPDR-C2-XPDR2",
100 "port-name": "XPDR2-NETWORK1",
101 "port-rack": "000000.00",
102 "port-shelf": "Chassis#1"
105 "lgx-device-name": "Some lgx-device-name",
106 "lgx-port-name": "Some lgx-port-name",
107 "lgx-port-rack": "000000.00",
108 "lgx-port-shelf": "00"
113 "port-device-name": "XPDR-C2-XPDR2",
114 "port-type": "fixed",
115 "port-name": "XPDR2-NETWORK1",
116 "port-rack": "000000.00",
117 "port-shelf": "Chassis#1"
120 "lgx-device-name": "Some lgx-device-name",
121 "lgx-port-name": "Some lgx-port-name",
122 "lgx-port-rack": "000000.00",
123 "lgx-port-shelf": "00"
128 "due-date": "2018-06-15T00:00:01Z",
129 "operator-contact": "pw1234"
135 cls.processes = test_utils.start_tpce()
136 cls.processes = test_utils.start_sims([('xpdra2', cls.NODE_VERSION_71),
137 ('roadma', cls.NODE_VERSION_221),
138 ('roadmc', cls.NODE_VERSION_221),
139 ('xpdrc2', cls.NODE_VERSION_71)])
def tearDownClass(cls):
    """Shut down every process recorded in ``cls.processes``, then report it."""
    # pylint: disable=not-an-iterable
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
def test_01_connect_xpdra2(self):
    """Mount device "XPDR-A2" from sim 'xpdra2' and expect the `created` code."""
    sim = ('xpdra2', self.NODE_VERSION_71)
    mount_reply = test_utils.mount_device("XPDR-A2", sim)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_02_connect_xpdrc2(self):
    """Mount device "XPDR-C2" from sim 'xpdrc2' and expect the `created` code."""
    sim = ('xpdrc2', self.NODE_VERSION_71)
    mount_reply = test_utils.mount_device("XPDR-C2", sim)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_03_connect_rdma(self):
    """Mount device "ROADM-A1" from sim 'roadma' and expect the `created` code."""
    sim = ('roadma', self.NODE_VERSION_221)
    mount_reply = test_utils.mount_device("ROADM-A1", sim)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_04_connect_rdmc(self):
    """Mount device "ROADM-C1" from sim 'roadmc' and expect the `created` code."""
    sim = ('roadmc', self.NODE_VERSION_221)
    mount_reply = test_utils.mount_device("ROADM-C1", sim)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_05_connect_xprda2_2_N1_to_roadma_PP2(self):
    """Request the XPDR-A2 to ROADM-A1 link and check the success message."""
    link_args = ("XPDR-A2", "2", "1", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_06_connect_roadma_PP2_to_xpdra2_2_N1(self):
    """Request the ROADM-A1 to XPDR-A2 link and check the success message."""
    link_args = ("XPDR-A2", "2", "1", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
def test_07_connect_xprdc2_2_N1_to_roadmc_PP2(self):
    """Request the XPDR-C2 to ROADM-C1 link and check the success message."""
    link_args = ("XPDR-C2", "2", "1", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_08_connect_roadmc_PP2_to_xpdrc2_2_N1(self):
    """Request the ROADM-C1 to XPDR-C2 link and check the success message."""
    link_args = ("XPDR-C2", "2", "1", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
207 def test_09_add_omsAttributes_roadma_roadmc(self):
208 # Config ROADMA-ROADMC oms-attributes
210 "auto-spanloss": "true",
211 "spanloss-base": 11.4,
212 "spanloss-current": 12,
213 "engineered-spanloss": 12.2,
214 "link-concatenation": [{
217 "SRLG-length": 100000,
219 response = test_utils.add_oms_attr_request(
220 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
221 self.assertEqual(response.status_code, requests.codes.created)
223 def test_10_add_omsAttributes_roadmc_roadma(self):
224 # Config ROADMC-ROADMA oms-attributes
226 "auto-spanloss": "true",
227 "spanloss-base": 11.4,
228 "spanloss-current": 12,
229 "engineered-spanloss": 12.2,
230 "link-concatenation": [{
233 "SRLG-length": 100000,
235 response = test_utils.add_oms_attr_request(
236 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
237 self.assertEqual(response.status_code, requests.codes.created)
# test service-create for OTUC4 service from xpdra2 to xpdrc2
def test_11_check_otn_topology(self):
    """Check the OTN topology holds 4 nodes and no link list yet."""
    reply = test_utils.get_otn_topo_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    otn_network = reply.json()['network'][0]
    node_count = len(otn_network['node'])
    self.assertEqual(node_count, 4, 'There should be 4 nodes')
    self.assertNotIn('ietf-network-topology:link', otn_network)
def test_12_create_OTUC4_service(self):
    """Submit the sample service-create payload and wait ``WAITING`` seconds.

    The reply must have the `ok` status code and its response message must
    contain 'PCE calculation in progress'.
    """
    creation = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(creation.status_code, requests.codes.ok)
    common = creation.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Pause before the follow-up tests run.
    time.sleep(self.WAITING)
256 def test_13_get_OTUC4_service1(self):
257 response = test_utils.get_service_list_request(
258 "services/service1-OTUC4")
259 self.assertEqual(response.status_code, requests.codes.ok)
260 res = response.json()
262 res['services'][0]['administrative-state'], 'inService')
264 res['services'][0]['service-name'], 'service1-OTUC4')
266 res['services'][0]['connection-type'], 'infrastructure')
268 res['services'][0]['lifecycle-state'], 'planned')
270 res['services'][0]['service-layer'], 'wdm')
272 res['services'][0]['service-a-end']['service-rate'], 400)
274 res['services'][0]['service-a-end']['otu-service-rate'], 'org-openroadm-otn-common-types:OTUCn')
277 res['services'][0]['service-z-end']['otu-service-rate'], 'org-openroadm-otn-common-types:OTUCn')
280 # Check correct configuration of devices
281 def test_14_check_interface_och_xpdra2(self):
282 response = test_utils.check_netconf_node_request(
283 "XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
284 self.assertEqual(response.status_code, requests.codes.ok)
285 res = response.json()
286 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
287 'administrative-state': 'inService',
288 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
289 'type': 'org-openroadm-interfaces:otsi',
290 'supporting-port': 'L1'
291 }, **res['interface'][0]),
294 self.assertDictEqual(
295 dict({u'frequency': 196.0812, u'otsi-rate': u'org-openroadm-common-optical-channel-types:R400G-otsi',
296 u'transmit-power': -5, u'modulation-format': 'dp-qam16'},
297 **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
298 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
300 def test_15_check_interface_OTSI_GROUP_xpdra2(self):
301 response = test_utils.check_netconf_node_request(
302 "XPDR-A2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
303 self.assertEqual(response.status_code, requests.codes.ok)
304 res = response.json()
305 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
306 'administrative-state': 'inService',
307 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
308 'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
309 'type': 'org-openroadm-interfaces:otsi-group',
310 'supporting-port': 'L1'
312 input_dict_2 = {"group-id": 1,
313 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
315 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
317 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]
318 ['org-openroadm-otsi-group-interfaces:otsi-group']),
320 ['org-openroadm-otsi-group-interfaces:otsi-group'])
322 def test_16_check_interface_OTUC4_xpdra2(self):
323 response = test_utils.check_netconf_node_request(
324 "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC4")
325 self.assertEqual(response.status_code, requests.codes.ok)
326 res = response.json()
327 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSI-GROUP',
328 'administrative-state': 'inService',
329 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
330 'supporting-interface-list': 'XPDR2-NETWORK1-OTSI-GROUP',
331 'type': 'org-openroadm-interfaces:otnOtu',
332 'supporting-port': 'L1'
334 input_dict_2 = {'tx-sapi': 'LY9PxYJqUbw=',
335 'expected-dapi': 'LY9PxYJqUbw=',
336 'rate': 'org-openroadm-otn-common-types:OTUCn',
337 'degthr-percentage': 100,
341 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
343 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
345 ['org-openroadm-otn-otu-interfaces:otu'])
347 def test_17_check_interface_och_xpdrc2(self):
348 response = test_utils.check_netconf_node_request(
349 "XPDR-C2", "interface/XPDR2-NETWORK1-755:768")
350 self.assertEqual(response.status_code, requests.codes.ok)
351 res = response.json()
352 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
353 'administrative-state': 'inService',
354 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
355 'type': 'org-openroadm-interfaces:otsi',
356 'supporting-port': 'L1'
357 }, **res['interface'][0]),
360 self.assertDictEqual(
361 dict({u'frequency': 196.0812, u'otsi-rate': u'org-openroadm-common-optical-channel-types:R400G-otsi',
362 u'transmit-power': -5, u'modulation-format': 'dp-qam16'},
363 **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
364 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
366 def test_18_check_interface_OTSI_GROUP_xpdrc2(self):
367 response = test_utils.check_netconf_node_request(
368 "XPDR-C2", "interface/XPDR2-NETWORK1-OTSI-GROUP")
369 self.assertEqual(response.status_code, requests.codes.ok)
370 res = response.json()
371 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSI-GROUP',
372 'administrative-state': 'inService',
373 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
374 'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
375 'type': 'org-openroadm-interfaces:otsi-group',
376 'supporting-port': 'L1'
378 input_dict_2 = {"group-id": 1,
379 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
381 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
383 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]
384 ['org-openroadm-otsi-group-interfaces:otsi-group']),
386 ['org-openroadm-otsi-group-interfaces:otsi-group'])
388 def test_19_check_interface_OTUC4_xpdrc2(self):
389 response = test_utils.check_netconf_node_request(
390 "XPDR-C2", "interface/XPDR2-NETWORK1-OTUC4")
391 self.assertEqual(response.status_code, requests.codes.ok)
392 res = response.json()
393 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
394 'administrative-state': 'inService',
395 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
396 'supporting-interface-list': 'XPDR2-NETWORK1-OTSI-GROUP',
397 'type': 'org-openroadm-interfaces:otnOtu',
398 'supporting-port': 'L1'
400 input_dict_2 = {'tx-dapi': 'LY9PxYJqUbw=',
401 'expected-sapi': 'LY9PxYJqUbw=',
402 'tx-sapi': 'Nmbu2MNHvc4=',
403 'expected-dapi': 'Nmbu2MNHvc4=',
404 'rate': 'org-openroadm-otn-common-types:OTUCn',
405 'degthr-percentage': 100,
410 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
413 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
415 ['org-openroadm-otn-otu-interfaces:otu'])
417 def test_20_check_no_interface_ODUC4_xpdra2(self):
418 response = test_utils.check_netconf_node_request(
419 "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
420 self.assertEqual(response.status_code, requests.codes.conflict)
421 res = response.json()
423 {"error-type": "application", "error-tag": "data-missing",
424 "error-message": "Request could not be completed because the relevant data model content does not exist"},
425 res['errors']['error'])
427 def test_21_check_openroadm_topo_xpdra2(self):
428 response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR2")
429 self.assertEqual(response.status_code, requests.codes.ok)
430 res = response.json()
431 ele = res['node'][0]['ietf-network-topology:termination-point'][0]
432 self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
433 self.assertEqual({u'frequency': 196.08125,
435 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
438 def test_22_check_otn_topo_OTUC4_links(self):
439 response = test_utils.get_otn_topo_request()
440 self.assertEqual(response.status_code, requests.codes.ok)
441 res = response.json()
442 nb_links = len(res['network'][0]['ietf-network-topology:link'])
443 self.assertEqual(nb_links, 2)
444 listLinkId = ['OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
445 'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1']
446 for link in res['network'][0]['ietf-network-topology:link']:
447 self.assertIn(link['link-id'], listLinkId)
449 link['transportpce-topology:otn-link-type'], 'OTUC4')
451 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
453 link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
455 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
457 link['org-openroadm-common-network:opposite-link'], listLinkId)
# test service-create for ODUC4 service from xpdra2 to xpdrc2
def test_23_create_ODUC4_service(self):
    """Rework the shared payload into an ODUC4 request and submit it.

    ``cr_serv_sample_data`` is edited in place: the service is renamed and, on
    both ends, the format becomes "ODU" and 'otu-service-rate' is replaced by
    'odu-service-rate'. The method then waits ``WAITING`` seconds.
    """
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service1-ODUC4"
    for end_name in ("service-a-end", "service-z-end"):
        endpoint = request_input[end_name]
        endpoint["service-format"] = "ODU"
        del endpoint["otu-service-rate"]
        endpoint["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"

    creation = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(creation.status_code, requests.codes.ok)
    common = creation.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
476 def test_24_get_ODUC4_service1(self):
477 response = test_utils.get_service_list_request(
478 "services/service1-ODUC4")
479 self.assertEqual(response.status_code, requests.codes.ok)
480 res = response.json()
482 res['services'][0]['administrative-state'], 'inService')
484 res['services'][0]['service-name'], 'service1-ODUC4')
486 res['services'][0]['connection-type'], 'infrastructure')
488 res['services'][0]['lifecycle-state'], 'planned')
490 res['services'][0]['service-layer'], 'wdm')
492 res['services'][0]['service-a-end']['service-rate'], 400)
494 res['services'][0]['service-a-end']['odu-service-rate'], 'org-openroadm-otn-common-types:ODUCn')
496 res['services'][0]['service-z-end']['odu-service-rate'], 'org-openroadm-otn-common-types:ODUCn')
499 def test_25_check_interface_ODUC4_xpdra2(self):
500 response = test_utils.check_netconf_node_request(
501 "XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
502 self.assertEqual(response.status_code, requests.codes.ok)
503 res = response.json()
504 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
505 'administrative-state': 'inService',
506 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
507 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
508 'type': 'org-openroadm-interfaces:otnOdu',
509 'supporting-port': 'L1'}
510 # SAPI/DAPI are added in the Otu4 renderer
511 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
512 'rate': 'org-openroadm-otn-common-types:ODUCn',
513 'expected-dapi': 'Nmbu2MNHvc4=',
514 'expected-sapi': 'Nmbu2MNHvc4=',
515 'tx-dapi': 'Nmbu2MNHvc4=',
516 'tx-sapi': 'LY9PxYJqUbw='}
518 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
520 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
521 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
522 self.assertDictEqual(
523 {u'payload-type': u'22', u'exp-payload-type': u'22'},
524 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
526 def test_26_check_interface_ODUC4_xpdrc2(self):
527 response = test_utils.check_netconf_node_request(
528 "XPDR-C2", "interface/XPDR2-NETWORK1-ODUC4")
529 self.assertEqual(response.status_code, requests.codes.ok)
530 res = response.json()
531 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
532 'administrative-state': 'inService',
533 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
534 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
535 'type': 'org-openroadm-interfaces:otnOdu',
536 'supporting-port': 'L1'}
537 # SAPI/DAPI are added in the Otu4 renderer
538 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
539 'rate': 'org-openroadm-otn-common-types:ODUCn',
540 'tx-sapi': 'Nmbu2MNHvc4=',
541 'tx-dapi': 'LY9PxYJqUbw=',
542 'expected-sapi': 'LY9PxYJqUbw=',
543 'expected-dapi': 'LY9PxYJqUbw='
545 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
547 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
548 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
549 self.assertDictEqual(
550 {u'payload-type': u'22', u'exp-payload-type': u'22'},
551 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
553 def test_27_check_otn_topo_links(self):
554 response = test_utils.get_otn_topo_request()
555 self.assertEqual(response.status_code, requests.codes.ok)
556 res = response.json()
557 nb_links = len(res['network'][0]['ietf-network-topology:link'])
558 self.assertEqual(nb_links, 4)
559 for link in res['network'][0]['ietf-network-topology:link']:
560 linkId = link['link-id']
561 if (linkId in ('OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
562 'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
564 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
566 link['org-openroadm-otn-network-topology:used-bandwidth'], 400000)
567 elif (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
568 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
570 link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
572 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
574 link['transportpce-topology:otn-link-type'], 'ODUC4')
576 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
577 self.assertIn(link['org-openroadm-common-network:opposite-link'],
578 ['ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
579 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1'])
581 self.fail("this link should not exist")
def test_28_check_otn_topo_tp(self):
    """Check the 'XPDR2-NETWORK1' pools on both XPDR2 nodes of the OTN topology.

    On nodes 'XPDR-A2-XPDR2' and 'XPDR-C2-XPDR2', the termination point
    'XPDR2-NETWORK1' must have 80 entries in 'ts-pool', 4 entries in the
    'tpn-pool' of its first 'odtu-tpn-pool' item, and the
    ODTU4.ts-Allocated 'odtu-type'.
    """
    response = test_utils.get_otn_topo_request()
    # Fail on the status code first rather than on a KeyError further down.
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    for node in res['network'][0]['node']:
        # Membership test: the former `== 'A' or 'B'` form was always truthy,
        # so every node of the topology was inspected.
        if node['node-id'] not in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR2-NETWORK1':
                continue
            xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
            self.assertEqual(
                len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
            self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
                             'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
598 # test service-create for 100GE service 1 from xpdra2 to xpdrc2
def test_29_create_100GE_service_1(self):
    """Rework the shared payload into a 100GE request on CLIENT1 and submit it.

    ``cr_serv_sample_data`` is edited in place: service name and connection
    type are changed and, on both ends, the rate becomes "100", the format
    "Ethernet", 'odu-service-rate' is removed and the tx/rx port name is set
    to "XPDR2-CLIENT1". The method then waits ``WAITING`` seconds.
    """
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service-100GE"
    request_input["connection-type"] = "service"
    for end_name in ("service-a-end", "service-z-end"):
        endpoint = request_input[end_name]
        endpoint["service-rate"] = "100"
        endpoint["service-format"] = "Ethernet"
        del endpoint["odu-service-rate"]
        for direction in ("tx-direction", "rx-direction"):
            endpoint[direction]["port"]["port-name"] = "XPDR2-CLIENT1"
    creation = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(creation.status_code, requests.codes.ok)
    common = creation.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
619 def test_30_get_100GE_service_1(self):
620 response = test_utils.get_service_list_request(
621 "services/service-100GE")
622 self.assertEqual(response.status_code, requests.codes.ok)
623 res = response.json()
625 res['services'][0]['administrative-state'], 'inService')
627 res['services'][0]['service-name'], 'service-100GE')
629 res['services'][0]['connection-type'], 'service')
631 res['services'][0]['lifecycle-state'], 'planned')
634 def test_31_check_interface_100GE_CLIENT_xpdra2(self):
635 response = test_utils.check_netconf_node_request(
636 "XPDR-A2", "interface/XPDR2-CLIENT1-ETHERNET100G")
637 self.assertEqual(response.status_code, requests.codes.ok)
638 res = response.json()
639 input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET100G',
640 'administrative-state': 'inService',
641 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
642 'type': 'org-openroadm-interfaces:ethernetCsmacd',
643 'supporting-port': 'C1'
645 input_dict_2 = {u'speed': 100000}
646 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
648 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
649 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
651 def test_32_check_interface_ODU4_CLIENT_xpdra2(self):
652 response = test_utils.check_netconf_node_request(
653 "XPDR-A2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
654 self.assertEqual(response.status_code, requests.codes.ok)
655 res = response.json()
656 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service-100GE',
657 'administrative-state': 'inService',
658 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
659 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET100G',
660 'type': 'org-openroadm-interfaces:otnOdu',
661 'supporting-port': 'C1'}
663 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
664 'rate': 'org-openroadm-otn-common-types:ODU4',
665 'monitoring-mode': 'terminated'}
667 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
669 self.assertDictEqual(dict(input_dict_2,
670 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
671 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
672 self.assertDictEqual(
673 {u'payload-type': u'07', u'exp-payload-type': u'07'},
674 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
676 def test_33_check_interface_ODU4_NETWORK_xpdra2(self):
677 response = test_utils.check_netconf_node_request(
678 "XPDR-A2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
679 self.assertEqual(response.status_code, requests.codes.ok)
680 res = response.json()
681 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service-100GE',
682 'administrative-state': 'inService',
683 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
684 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
685 'type': 'org-openroadm-interfaces:otnOdu',
686 'supporting-port': 'L1'}
688 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
689 'rate': 'org-openroadm-otn-common-types:ODU4',
690 'monitoring-mode': 'not-terminated'}
691 input_dict_3 = {u'trib-port-number': 1}
693 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
695 self.assertDictEqual(dict(input_dict_2,
696 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
697 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
698 self.assertDictEqual(dict(input_dict_3,
699 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
700 'parent-odu-allocation']),
701 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
702 self.assertIn('1.1', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
703 ['opucn-trib-slots'])
704 self.assertIn('1.20', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
705 ['opucn-trib-slots'])
707 def test_34_check_ODU4_connection_xpdra2(self):
708 response = test_utils.check_netconf_node_request(
710 "odu-connection/XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE")
711 self.assertEqual(response.status_code, requests.codes.ok)
712 res = response.json()
715 'XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE',
716 'direction': 'bidirectional'
719 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
720 res['odu-connection'][0])
721 self.assertDictEqual({u'dst-if': u'XPDR2-NETWORK1-ODU4-service-100GE'},
722 res['odu-connection'][0]['destination'])
723 self.assertDictEqual({u'src-if': u'XPDR2-CLIENT1-ODU4-service-100GE'},
724 res['odu-connection'][0]['source'])
726 def test_35_check_interface_100GE_CLIENT_xpdrc2(self):
727 response = test_utils.check_netconf_node_request(
728 "XPDR-C2", "interface/XPDR2-CLIENT1-ETHERNET100G")
729 self.assertEqual(response.status_code, requests.codes.ok)
730 res = response.json()
731 input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET100G',
732 'administrative-state': 'inService',
733 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
734 'type': 'org-openroadm-interfaces:ethernetCsmacd',
735 'supporting-port': 'C1'
737 input_dict_2 = {u'speed': 100000}
738 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
740 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
741 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
743 def test_36_check_interface_ODU4_CLIENT_xpdrc2(self):
744 response = test_utils.check_netconf_node_request(
745 "XPDR-C2", "interface/XPDR2-CLIENT1-ODU4-service-100GE")
746 self.assertEqual(response.status_code, requests.codes.ok)
747 res = response.json()
748 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service-100GE',
749 'administrative-state': 'inService',
750 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
751 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET100G',
752 'type': 'org-openroadm-interfaces:otnOdu',
753 'supporting-port': 'C1'}
755 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
756 'rate': 'org-openroadm-otn-common-types:ODU4',
757 'monitoring-mode': 'terminated'}
759 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
761 self.assertDictEqual(dict(input_dict_2,
762 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
763 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
764 self.assertDictEqual(
765 {u'payload-type': u'07', u'exp-payload-type': u'07'},
766 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
768 def test_37_check_interface_ODU4_NETWORK_xpdrc2(self):
769 response = test_utils.check_netconf_node_request(
770 "XPDR-C2", "interface/XPDR2-NETWORK1-ODU4-service-100GE")
771 self.assertEqual(response.status_code, requests.codes.ok)
772 res = response.json()
773 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service-100GE',
774 'administrative-state': 'inService',
775 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
776 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
777 'type': 'org-openroadm-interfaces:otnOdu',
778 'supporting-port': 'C1'}
780 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
781 'rate': 'org-openroadm-otn-common-types:ODU4',
782 'monitoring-mode': 'not-terminated'}
784 input_dict_3 = {'trib-port-number': 1}
786 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
788 self.assertDictEqual(dict(input_dict_2,
789 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
790 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
791 self.assertDictEqual(dict(input_dict_3,
792 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
793 'parent-odu-allocation']),
794 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
795 'parent-odu-allocation'])
798 'org-openroadm-otn-odu-interfaces:odu'][
799 'parent-odu-allocation']['opucn-trib-slots'])
800 self.assertIn('1.20',
802 'org-openroadm-otn-odu-interfaces:odu'][
803 'parent-odu-allocation']['opucn-trib-slots'])
805 def test_38_check_ODU4_connection_xpdrc2(self):
806 response = test_utils.check_netconf_node_request(
808 "odu-connection/XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE")
809 self.assertEqual(response.status_code, requests.codes.ok)
810 res = response.json()
813 'XPDR2-CLIENT1-ODU4-service-100GE-x-XPDR2-NETWORK1-ODU4-service-100GE',
814 'direction': 'bidirectional'
817 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
818 res['odu-connection'][0])
819 self.assertDictEqual({u'dst-if': u'XPDR2-NETWORK1-ODU4-service-100GE'},
820 res['odu-connection'][0]['destination'])
821 self.assertDictEqual({u'src-if': u'XPDR2-CLIENT1-ODU4-service-100GE'},
822 res['odu-connection'][0]['source'])
def test_39_check_otn_topo_links(self):
    """Check the link count and the ODUC4 link bandwidth in the OTN topology.

    The topology must hold 4 links; both directions of the ODUC4 link
    between XPDR-A2-XPDR2 and XPDR-C2-XPDR2 must report 300000 available
    and 100000 used bandwidth.
    """
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    links = res['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 4)
    oduc4_link_ids = (
        'ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
        'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')
    for link in links:
        # Links other than the two ODUC4 directions are not checked here.
        if link['link-id'] in oduc4_link_ids:
            self.assertEqual(
                link['org-openroadm-otn-network-topology:available-bandwidth'], 300000)
            self.assertEqual(
                link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
def test_40_check_otn_topo_tp(self):
    """Check the XPDR2-NETWORK1 pools of the two XPDR2 nodes in the OTN topology.

    On XPDR-A2-XPDR2 and XPDR-C2-XPDR2 the termination point XPDR2-NETWORK1
    must expose a ts-pool of 60 entries and a first tpn-pool of 3 entries
    that does not contain 1.
    """
    response = test_utils.get_otn_topo_request()
    res = response.json()
    for node in res['network'][0]['node']:
        # A bare `== 'XPDR-A2-XPDR2' or 'XPDR-C2-XPDR2'` is always true (a
        # non-empty string is truthy); use a membership test so that only
        # the two intended nodes are inspected.
        if node['node-id'] not in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR2-NETWORK1':
                continue
            xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 60)
            tsPoolList = list(range(1, 20))
            # NOTE(review): the assertion kind is assumed to be assertNotIn,
            # in line with the pools having shrunk - confirm. As written it
            # tests the whole list as a single element of the pool.
            self.assertNotIn(
                tsPoolList, xpdrTpPortConAt['ts-pool'])
            self.assertEqual(
                len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 3)
            self.assertNotIn(
                1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
857 # test service-create for 100GE service 2 from xpdra2 to xpdrc2
def test_41_create_100GE_service_2(self):
    """Request the creation of the 100GE service "service-100GE2".

    The shared service-create payload is updated in place (both ends set to
    rate 100, format Ethernet, port XPDR2-CLIENT2) before it is sent; the
    test then pauses for WAITING seconds.
    """
    payload = self.cr_serv_sample_data
    request_input = payload["input"]
    request_input["service-name"] = "service-100GE2"
    request_input["connection-type"] = "service"
    for end_key in ("service-a-end", "service-z-end"):
        endpoint = request_input[end_key]
        endpoint["service-rate"] = "100"
        endpoint["service-format"] = "Ethernet"
        for direction_key in ("tx-direction", "rx-direction"):
            endpoint[direction_key]["port"]["port-name"] = "XPDR2-CLIENT2"
    reply = test_utils.service_create_request(payload)
    self.assertEqual(reply.status_code, requests.codes.ok)
    message = reply.json()['output']['configuration-response-common']['response-message']
    self.assertIn('PCE calculation in progress', message)
    time.sleep(self.WAITING)
def test_42_get_100GE_service_2(self):
    """Read back service-100GE2 and check its main attributes.

    Checks administrative-state, service-name, connection-type and
    lifecycle-state of the first returned service entry.
    """
    response = test_utils.get_service_list_request(
        "services/service-100GE2")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    service = res['services'][0]
    self.assertEqual(
        service['administrative-state'], 'inService')
    self.assertEqual(
        service['service-name'], 'service-100GE2')
    self.assertEqual(
        service['connection-type'], 'service')
    self.assertEqual(
        service['lifecycle-state'], 'planned')
def test_43_check_service_list(self):
    """The service list must contain exactly 4 services."""
    reply = test_utils.get_service_list_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    services = reply.json()['service-list']['services']
    self.assertEqual(len(services), 4)
def test_44_check_otn_topo_links(self):
    """Check the link count and the ODUC4 link bandwidth in the OTN topology.

    The topology must hold 4 links; both directions of the ODUC4 link
    between XPDR-A2-XPDR2 and XPDR-C2-XPDR2 must report 200000 available
    and 200000 used bandwidth.
    """
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    links = res['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 4)
    oduc4_link_ids = (
        'ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
        'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')
    for link in links:
        # Links other than the two ODUC4 directions are not checked here.
        if link['link-id'] in oduc4_link_ids:
            self.assertEqual(
                link['org-openroadm-otn-network-topology:available-bandwidth'], 200000)
            self.assertEqual(
                link['org-openroadm-otn-network-topology:used-bandwidth'], 200000)
def test_45_check_otn_topo_tp(self):
    """Check the XPDR2-NETWORK1 pools of the two XPDR2 nodes in the OTN topology.

    On XPDR-A2-XPDR2 and XPDR-C2-XPDR2 the termination point XPDR2-NETWORK1
    must expose a ts-pool of 40 entries and a first tpn-pool of 2 entries
    that does not contain 2.
    """
    response = test_utils.get_otn_topo_request()
    res = response.json()
    for node in res['network'][0]['node']:
        # A bare `== 'XPDR-A2-XPDR2' or 'XPDR-C2-XPDR2'` is always true (a
        # non-empty string is truthy); use a membership test so that only
        # the two intended nodes are inspected.
        if node['node-id'] not in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR2-NETWORK1':
                continue
            xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 40)
            tsPoolList = list(range(1, 40))
            # NOTE(review): the assertion kind is assumed to be assertNotIn,
            # in line with the pools having shrunk - confirm. As written it
            # tests the whole list as a single element of the pool.
            self.assertNotIn(
                tsPoolList, xpdrTpPortConAt['ts-pool'])
            self.assertEqual(
                len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 2)
            self.assertNotIn(
                2, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
def test_46_delete_100GE_service_2(self):
    """Request the deletion of service-100GE2, then pause for WAITING seconds."""
    reply = test_utils.service_delete_request("service-100GE2")
    self.assertEqual(reply.status_code, requests.codes.ok)
    message = reply.json()['output']['configuration-response-common']['response-message']
    self.assertIn('Renderer service delete in progress', message)
    time.sleep(self.WAITING)
def test_47_delete_100GE_service_1(self):
    """Request the deletion of service-100GE, then pause for WAITING seconds."""
    reply = test_utils.service_delete_request("service-100GE")
    self.assertEqual(reply.status_code, requests.codes.ok)
    message = reply.json()['output']['configuration-response-common']['response-message']
    self.assertIn('Renderer service delete in progress', message)
    time.sleep(self.WAITING)
def test_48_check_service_list(self):
    """The service list must contain exactly 2 services."""
    reply = test_utils.get_service_list_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    services = reply.json()['service-list']['services']
    self.assertEqual(len(services), 2)
def test_49_check_no_ODU4_connection_xpdra2(self):
    """The XPDR-A2 device data must not hold any 'odu-connection' entry."""
    reply = test_utils.check_netconf_node_request("XPDR-A2", "")
    self.assertEqual(reply.status_code, requests.codes.ok)
    device = reply.json()['org-openroadm-device']
    # ['odu-connection'][0] is simply the string 'odu-connection'.
    self.assertNotIn('odu-connection', device)
def test_50_check_no_interface_ODU4_NETWORK_xpdra2(self):
    """Reading XPDR2-NETWORK1-ODU4-service-100GE on XPDR-A2 must answer conflict."""
    resource = "interface/XPDR2-NETWORK1-ODU4-service-100GE"
    reply = test_utils.check_netconf_node_request("XPDR-A2", resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_51_check_no_interface_ODU4_CLIENT_xpdra2(self):
    """Reading XPDR2-CLIENT1-ODU4-service-100GE on XPDR-A2 must answer conflict."""
    resource = "interface/XPDR2-CLIENT1-ODU4-service-100GE"
    reply = test_utils.check_netconf_node_request("XPDR-A2", resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_52_check_no_interface_100GE_CLIENT_xpdra2(self):
    """Reading XPDR2-CLIENT1-ETHERNET100G on XPDR-A2 must answer conflict."""
    resource = "interface/XPDR2-CLIENT1-ETHERNET100G"
    reply = test_utils.check_netconf_node_request("XPDR-A2", resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_53_check_otn_topo_links(self):
    """Check the link count and the ODUC4 link bandwidth in the OTN topology.

    The topology must hold 4 links; both directions of the ODUC4 link
    between XPDR-A2-XPDR2 and XPDR-C2-XPDR2 must report 400000 available
    and 0 used bandwidth.
    """
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    links = res['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 4)
    oduc4_link_ids = (
        'ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
        'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')
    for link in links:
        # Links other than the two ODUC4 directions are not checked here.
        if link['link-id'] in oduc4_link_ids:
            self.assertEqual(
                link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
            self.assertEqual(
                link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
def test_54_check_otn_topo_tp(self):
    """Check the XPDR2-NETWORK1 pools of the two XPDR2 nodes in the OTN topology.

    On XPDR-A2-XPDR2 and XPDR-C2-XPDR2 the termination point XPDR2-NETWORK1
    must expose a ts-pool of 80 entries and a first tpn-pool of 4 entries.
    """
    response = test_utils.get_otn_topo_request()
    res = response.json()
    for node in res['network'][0]['node']:
        # A bare `== 'XPDR-A2-XPDR2' or 'XPDR-C2-XPDR2'` is always true (a
        # non-empty string is truthy); use a membership test so that only
        # the two intended nodes are inspected.
        if node['node-id'] not in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR2-NETWORK1':
                continue
            xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
            self.assertEqual(
                len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
def test_55_delete_ODUC4_service(self):
    """Request the deletion of service1-ODUC4, then pause for WAITING seconds."""
    reply = test_utils.service_delete_request("service1-ODUC4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    message = reply.json()['output']['configuration-response-common']['response-message']
    self.assertIn('Renderer service delete in progress', message)
    time.sleep(self.WAITING)
def test_56_check_service_list(self):
    """The service list must contain exactly 1 service."""
    reply = test_utils.get_service_list_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    services = reply.json()['service-list']['services']
    self.assertEqual(len(services), 1)
def test_57_check_no_interface_ODU4_xpdra2(self):
    """Reading XPDR2-NETWORK1-ODUC4 on XPDR-A2 must answer conflict."""
    resource = "interface/XPDR2-NETWORK1-ODUC4"
    reply = test_utils.check_netconf_node_request("XPDR-A2", resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_58_check_otn_topo_links(self):
    """Re-run the checks of test_22_check_otn_topo_OTUC4_links."""
    # Pure delegation: the assertions live in the sibling test method.
    self.test_22_check_otn_topo_OTUC4_links()
def test_59_check_otn_topo_tp(self):
    """XPDR2-NETWORK1 must carry no xpdr-tp-port-connection-attributes.

    Checked on the termination points of XPDR-A2-XPDR2 and XPDR-C2-XPDR2 in
    the OTN topology.
    """
    response = test_utils.get_otn_topo_request()
    res = response.json()
    for node in res['network'][0]['node']:
        # A bare `== 'XPDR-A2-XPDR2' or 'XPDR-C2-XPDR2'` is always true (a
        # non-empty string is truthy); use a membership test so that only
        # the two intended nodes are inspected.
        if node['node-id'] not in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] == 'XPDR2-NETWORK1':
                # NOTE(review): the container argument is assumed to be the
                # keys of the termination point - confirm.
                self.assertNotIn('org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes',
                                 dict.keys(tp))
def test_60_delete_OTUC4_service(self):
    """Request the deletion of service1-OTUC4, then pause for WAITING seconds."""
    reply = test_utils.service_delete_request("service1-OTUC4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    message = reply.json()['output']['configuration-response-common']['response-message']
    self.assertIn('Renderer service delete in progress', message)
    time.sleep(self.WAITING)
def test_61_get_no_service(self):
    """Reading the service list must answer conflict with a data-missing error."""
    response = test_utils.get_service_list_request("")
    self.assertEqual(response.status_code, requests.codes.conflict)
    res = response.json()
    # The expected error entry must be one of the reported errors.
    self.assertIn(
        {"error-type": "application", "error-tag": "data-missing",
         "error-message": "Request could not be completed because the relevant data model content does not exist"},
        res['errors']['error'])
def test_62_check_no_interface_OTUC4_xpdra2(self):
    """Reading XPDR2-NETWORK1-OTUC4 on XPDR-A2 must answer conflict."""
    resource = "interface/XPDR2-NETWORK1-OTUC4"
    reply = test_utils.check_netconf_node_request("XPDR-A2", resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_63_check_no_interface_OCH_xpdra2(self):
    """Reading XPDR2-NETWORK1-755:768 on XPDR-A2 must answer conflict."""
    resource = "interface/XPDR2-NETWORK1-755:768"
    reply = test_utils.check_netconf_node_request("XPDR-A2", resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_64_check_no_interface_OTSI_xpdra2(self):
    """Reading XPDR2-NETWORK1-OTSI-GROUP on XPDR-A2 must answer conflict."""
    resource = "interface/XPDR2-NETWORK1-OTSI-GROUP"
    reply = test_utils.check_netconf_node_request("XPDR-A2", resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_65_getLinks_OtnTopology(self):
    """The OTN topology network must not hold any link entry."""
    reply = test_utils.get_otn_topo_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    otn_network = reply.json()['network'][0]
    self.assertNotIn('ietf-network-topology:link', otn_network)
def test_66_check_openroadm_topo_xpdra2(self):
    """The first termination point of XPDR-A2-XPDR2 must carry no wavelength.

    That first termination point must be XPDR2-NETWORK1, and its
    xpdr-network-attributes must not contain a 'wavelength' key.
    """
    reply = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR2")
    self.assertEqual(reply.status_code, requests.codes.ok)
    first_tp = reply.json()['node'][0]['ietf-network-topology:termination-point'][0]
    self.assertEqual('XPDR2-NETWORK1', first_tp['tp-id'])
    network_attributes = first_tp[u'org-openroadm-network-topology:xpdr-network-attributes']
    self.assertNotIn('wavelength', network_attributes.keys())
def test_67_check_openroadm_topology(self):
    """The openroadm topology must contain 22 links."""
    reply = test_utils.get_ordm_topo_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    topology_links = reply.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(22, len(topology_links), 'Topology should contain 22 links')
def test_68_connect_xprda2_2_N1_to_roadma_PP2(self):
    """Create the XPDR-A2 to ROADM-A1 (SRG1-PP1-TXRX) link and check the result text."""
    link_args = ("XPDR-A2", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_69_connect_roadma_PP2_to_xpdra2_2_N1(self):
    """Create the ROADM-A1 (SRG1-PP1-TXRX) to XPDR-A2 link and check the result text."""
    link_args = ("XPDR-A2", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
def test_70_connect_xprdc2_2_N1_to_roadmc_PP2(self):
    """Create the XPDR-C2 to ROADM-C1 (SRG1-PP1-TXRX) link and check the result text."""
    link_args = ("XPDR-C2", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_71_connect_roadmc_PP2_to_xpdrc2_2_N1(self):
    """Create the ROADM-C1 (SRG1-PP1-TXRX) to XPDR-C2 link and check the result text."""
    link_args = ("XPDR-C2", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
1130 # test service-create for 400GE service from xpdra2 to xpdrc2
def test_72_create_400GE_service(self):
    """Request the creation of the 400GE service "service-400GE".

    The shared service-create payload is updated in place (both ends set to
    rate 400 and port XPDR1-CLIENT1) before it is sent; every other field
    keeps whatever value the payload already holds. The test then pauses
    for WAITING seconds.
    """
    payload = self.cr_serv_sample_data
    request_input = payload["input"]
    request_input["service-name"] = "service-400GE"
    for end_key in ("service-a-end", "service-z-end"):
        endpoint = request_input[end_key]
        endpoint["service-rate"] = "400"
        for direction_key in ("tx-direction", "rx-direction"):
            endpoint[direction_key]["port"]["port-name"] = "XPDR1-CLIENT1"
    reply = test_utils.service_create_request(payload)
    self.assertEqual(reply.status_code, requests.codes.ok)
    message = reply.json()['output']['configuration-response-common']['response-message']
    self.assertIn('PCE calculation in progress', message)
    time.sleep(self.WAITING)
def test_73_get_400GE_service(self):
    """Read back service-400GE and check its main attributes.

    Checks administrative-state, service-name, connection-type and
    lifecycle-state of the first returned service entry.
    """
    response = test_utils.get_service_list_request(
        "services/service-400GE")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    service = res['services'][0]
    self.assertEqual(
        service['administrative-state'], 'inService')
    self.assertEqual(
        service['service-name'], 'service-400GE')
    self.assertEqual(
        service['connection-type'], 'service')
    self.assertEqual(
        service['lifecycle-state'], 'planned')
def test_74_check_xc1_roadma(self):
    """Check the SRG1-PP1 to DEG2-TTP roadm-connection (755:768) on ROADM-A1.

    The first returned roadm-connection is compared with the expected
    attributes, then its source and destination interfaces are checked.
    """
    response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-755:768")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    connection = res['roadm-connections'][0]
    # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
    # NOTE(review): dict(expected, **actual) lets the actual values override
    # the expected ones, so only the presence of the expected keys is
    # effectively verified here.
    self.assertDictEqual(
        dict({
            # The expected name matches the connection requested above.
            'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-755:768',
            'opticalControlMode': 'gainLoss',
            'target-output-power': -3.0
        }, **connection),
        connection
    )
    self.assertDictEqual(
        {'src-if': 'SRG1-PP1-TXRX-nmc-755:768'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG2-TTP-TXRX-nmc-755:768'},
        connection['destination'])
def test_75_check_topo_xpdra2(self):
    """Check the termination points of node XPDR-A2-XPDR1 in the openroadm topology.

    XPDR1-NETWORK1 must report the expected wavelength; XPDR1-CLIENT1 must
    not carry any xpdr-client-attributes.
    """
    response = test_utils.get_ordm_topo_request("node/XPDR-A2-XPDR1")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    for ele in liste_tp:
        if ele['tp-id'] == 'XPDR1-NETWORK1':
            # NOTE(review): the source numbering skips one line inside the
            # expected dict below (its remaining entries and closing brace
            # are not visible in this excerpt) - confirm against the file.
            self.assertEqual({u'frequency': 196.08125,
                             ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
        if ele['tp-id'] == 'XPDR1-CLIENT1':
            # The client port must not expose client attributes.
            self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
def test_76_check_topo_roadma_SRG1(self):
    """Check the frequency maps of ROADM-A1-SRG1 in the openroadm topology.

    Byte 95 of the decoded map must be 0 on the SRG itself and on
    SRG1-PP1-TXRX; SRG1-PP2-TXRX must not hold any 'avail-freq-maps' key.
    """
    def decoded_map(attributes):
        # base64-decode the first available frequency map into a list of ints
        return list(base64.b64decode(attributes['avail-freq-maps'][0]['freq-map']))

    reply = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
    self.assertEqual(reply.status_code, requests.codes.ok)
    srg_node = reply.json()['node'][0]
    srg_bytes = decoded_map(srg_node['org-openroadm-network-topology:srg-attributes'])
    self.assertEqual(srg_bytes[95], 0, "Index 1 should not be available")
    for tp in srg_node['ietf-network-topology:termination-point']:
        if tp['tp-id'] == 'SRG1-PP1-TXRX':
            pp_bytes = decoded_map(tp['org-openroadm-network-topology:pp-attributes'])
            self.assertEqual(pp_bytes[95], 0, "Index 1 should not be available")
        if tp['tp-id'] == 'SRG1-PP2-TXRX':
            self.assertNotIn('avail-freq-maps', tp.keys())
def test_77_check_topo_roadma_DEG1(self):
    """Check the frequency maps of ROADM-A1-DEG2 in the openroadm topology.

    Byte 95 of the decoded map must be 0 on the degree itself and on the
    DEG2-CTP-TXRX and DEG2-TTP-TXRX termination points.
    """
    def decoded_map(attributes):
        # base64-decode the first available frequency map into a list of ints
        return list(base64.b64decode(attributes['avail-freq-maps'][0]['freq-map']))

    reply = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
    self.assertEqual(reply.status_code, requests.codes.ok)
    degree_node = reply.json()['node'][0]
    degree_bytes = decoded_map(degree_node['org-openroadm-network-topology:degree-attributes'])
    self.assertEqual(degree_bytes[95], 0, "Index 1 should not be available")
    # Attribute container holding the frequency map, per termination point.
    attribute_key_by_tp = {
        'DEG2-CTP-TXRX': 'org-openroadm-network-topology:ctp-attributes',
        'DEG2-TTP-TXRX': 'org-openroadm-network-topology:tx-ttp-attributes',
    }
    for tp in degree_node['ietf-network-topology:termination-point']:
        attribute_key = attribute_key_by_tp.get(tp['tp-id'])
        if attribute_key is not None:
            tp_bytes = decoded_map(tp[attribute_key])
            self.assertEqual(tp_bytes[95], 0, "Index 1 should not be available")
def test_78_check_interface_100GE_CLIENT_xpdra2(self):
    """Check the XPDR1-CLIENT1-ETHERNET interface on XPDR-A2.

    The interface answer is compared with the expected generic attributes
    and with the expected ethernet speed (400000).
    """
    response = test_utils.check_netconf_node_request(
        "XPDR-A2", "interface/XPDR1-CLIENT1-ETHERNET")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    interface = res['interface'][0]
    input_dict_1 = {'name': 'XPDR1-CLIENT1-ETHERNET',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': '1/1/1-PLUG-CLIENT',
                    'type': 'org-openroadm-interfaces:ethernetCsmacd',
                    'supporting-port': 'C1'
                    }
    input_dict_2 = {u'speed': 400000}
    # NOTE(review): dict(expected, **actual) lets the actual values override
    # the expected ones, so only the presence of the expected keys is
    # effectively verified by the two comparisons below.
    self.assertDictEqual(dict(input_dict_1, **interface),
                         interface)
    self.assertDictEqual(dict(input_dict_2, **interface['org-openroadm-ethernet-interfaces:ethernet']),
                         interface['org-openroadm-ethernet-interfaces:ethernet'])
def test_79_check_interface_OTSI_xpdra2(self):
    """Check the XPDR1-NETWORK1-755:768 OTSI interface on XPDR-A2.

    The interface answer is compared with the expected generic attributes,
    the expected otsi attributes and the exact expected flexo container.
    """
    response = test_utils.check_netconf_node_request(
        "XPDR-A2", "interface/XPDR1-NETWORK1-755:768")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    interface = res['interface'][0]
    otsi = interface['org-openroadm-optical-tributary-signal-interfaces:otsi']
    input_dict_1 = {'name': 'XPDR1-NETWORK1-755:768',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
                    'type': 'org-openroadm-interfaces:otsi',
                    'supporting-port': 'L1'}
    input_dict_2 = {
        "frequency": 196.0812,
        "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
        "fec": "org-openroadm-common-types:ofec",
        "transmit-power": -5,
        "provision-mode": "explicit",
        "modulation-format": "dp-qam16"}

    # NOTE(review): dict(expected, **actual) lets the actual values override
    # the expected ones, so only the presence of the expected keys is
    # effectively verified by the two comparisons below.
    self.assertDictEqual(dict(input_dict_1, **interface),
                         interface)
    self.assertDictEqual(dict(input_dict_2, **otsi),
                         otsi)
    # The flexo container, in contrast, is compared for strict equality.
    self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
                         otsi['flexo'])
def test_80_check_interface_OTSI_GROUP_xpdra2(self):
    """Check the XPDR1-NETWORK1-OTSI-GROUP interface on XPDR-A2.

    The interface answer is compared with the expected generic attributes
    and with the expected otsi-group attributes.
    """
    response = test_utils.check_netconf_node_request(
        "XPDR-A2", "interface/XPDR1-NETWORK1-OTSI-GROUP")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    interface = res['interface'][0]
    # The expected name is the interface requested above, not a leftover
    # from another interface check.
    input_dict_1 = {'name': 'XPDR1-NETWORK1-OTSI-GROUP',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
                    'supporting-interface-list': 'XPDR1-NETWORK1-755:768',
                    'type': 'org-openroadm-interfaces:otsi-group',
                    'supporting-port': 'L1'}
    input_dict_2 = {"group-id": 1,
                    "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}

    # NOTE(review): dict(expected, **actual) lets the actual values override
    # the expected ones, so only the presence of the expected keys is
    # effectively verified by the two comparisons below.
    self.assertDictEqual(dict(input_dict_1, **interface),
                         interface)
    self.assertDictEqual(dict(input_dict_2,
                              **interface['org-openroadm-otsi-group-interfaces:otsi-group']),
                         interface['org-openroadm-otsi-group-interfaces:otsi-group'])
def test_81_check_interface_OTUC4_xpdra2(self):
    """Check the XPDR1-NETWORK1-OTUC4 interface on XPDR-A2.

    The interface answer is compared with the expected generic attributes
    and with the expected otu attributes.
    """
    response = test_utils.check_netconf_node_request(
        "XPDR-A2", "interface/XPDR1-NETWORK1-OTUC4")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # ['supporting-interface-list'][0] is simply the string key
    # 'supporting-interface-list'.
    input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
                    ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTSI-GROUP',
                    'type': 'org-openroadm-interfaces:otnOtu',
                    'supporting-port': 'L1'}
    # NOTE(review): the source numbering skips one line between the
    # "tim-detect-mode" and "degm-intervals" entries; an entry may be
    # missing from this excerpt - confirm against the full file.
    input_dict_2 = {"tx-sapi": "AIGiVAQ4gDil", "rate": "org-openroadm-otn-common-types:OTUCn",
                    "degthr-percentage": 100,
                    "tim-detect-mode": "Disabled",
                    "degm-intervals": 2,
                    "expected-dapi": "AIGiVAQ4gDil"}
    # dict(expected, **actual) lets the actual values override the expected
    # ones, so these comparisons only fail when an expected key is absent.
    self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
                         res['interface'][0])
    self.assertDictEqual(dict(input_dict_2,
                              **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
                         res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
def test_82_check_interface_ODUC4_xpdra2(self):
    """Check the XPDR1-NETWORK1-ODUC4 interface on XPDR-A2.

    The interface answer is compared with the expected generic attributes,
    the expected odu attributes and the expected opu payload types.
    """
    response = test_utils.check_netconf_node_request(
        "XPDR-A2", "interface/XPDR1-NETWORK1-ODUC4")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # ['supporting-interface-list'][0] is simply the string key
    # 'supporting-interface-list'.
    input_dict_1 = {'name': 'XPDR1-NETWORK1-ODUC4',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
                    ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTSI-GROUP',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'L1'}
    # NOTE(review): the end of this literal (the line after "rate", holding
    # at least the closing brace) is not visible in this excerpt - confirm
    # against the full file.
    input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP",
                    "tim-detect-mode": "Disabled",
                    "degm-intervals": 2,
                    "degthr-percentage": 100,
                    "monitoring-mode": "terminated",
                    "rate": "org-openroadm-otn-common-types:ODUCn",
    input_dict_3 = {"exp-payload-type": "22", "payload-type": "22"}
    # dict(expected, **actual) lets the actual values override the expected
    # ones, so these comparisons only fail when an expected key is absent.
    self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
                         res['interface'][0])
    self.assertDictEqual(dict(input_dict_2,
                              **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
                         res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
    self.assertDictEqual(dict(input_dict_3,
                              **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']),
                         res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_83_delete_400GE_service(self):
    """Request the deletion of service-400GE, then pause for WAITING seconds."""
    reply = test_utils.service_delete_request("service-400GE")
    self.assertEqual(reply.status_code, requests.codes.ok)
    message = reply.json()['output']['configuration-response-common']['response-message']
    self.assertIn('Renderer service delete in progress', message)
    time.sleep(self.WAITING)
def test_84_get_no_service(self):
    """Reading the service list must answer conflict with a data-missing error."""
    response = test_utils.get_service_list_request("")
    self.assertEqual(response.status_code, requests.codes.conflict)
    res = response.json()
    # The expected error entry must be one of the reported errors.
    self.assertIn(
        {"error-type": "application", "error-tag": "data-missing",
         "error-message": "Request could not be completed because the relevant data model content does not exist"},
        res['errors']['error'])
def test_85_check_no_interface_ODUC4_xpdra2(self):
    """Reading XPDR1-NETWORK1-ODUC4 on XPDR-A2 must answer conflict."""
    resource = "interface/XPDR1-NETWORK1-ODUC4"
    reply = test_utils.check_netconf_node_request("XPDR-A2", resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_86_check_no_interface_OTUC4_xpdra2(self):
    """Reading XPDR1-NETWORK1-OTUC4 on XPDR-A2 must answer conflict."""
    resource = "interface/XPDR1-NETWORK1-OTUC4"
    reply = test_utils.check_netconf_node_request("XPDR-A2", resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_87_check_no_interface_OTSI_GROUP_xpdra2(self):
    """Reading XPDR1-NETWORK1-OTSI-GROUP on XPDR-A2 must answer conflict."""
    resource = "interface/XPDR1-NETWORK1-OTSI-GROUP"
    reply = test_utils.check_netconf_node_request("XPDR-A2", resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_88_check_no_interface_OTSI_xpdra2(self):
    """Reading XPDR1-NETWORK1-755:768 on XPDR-A2 must answer conflict."""
    resource = "interface/XPDR1-NETWORK1-755:768"
    reply = test_utils.check_netconf_node_request("XPDR-A2", resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_89_check_no_interface_400GE_CLIENT_xpdra2(self):
    """Reading XPDR1-CLIENT1-ETHERNET on XPDR-A2 must answer conflict."""
    resource = "interface/XPDR1-CLIENT1-ETHERNET"
    reply = test_utils.check_netconf_node_request("XPDR-A2", resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_90_disconnect_xponders_from_roadm(self):
    """Delete every XPONDER-OUTPUT / XPONDER-INPUT link of the openroadm topology.

    Each deletion must answer with the OK status.
    """
    # NOTE(review): the "{}" placeholder is passed through unformatted;
    # presumably test_utils.delete_request fills it in - confirm.
    url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
    response = test_utils.get_ordm_topo_request("")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    links = res['network'][0]['ietf-network-topology:link']
    for link in links:
        if link["org-openroadm-common-network:link-type"] in ("XPONDER-OUTPUT", "XPONDER-INPUT"):
            link_name = link["link-id"]
            response = test_utils.delete_request(url+link_name)
            self.assertEqual(response.status_code, requests.codes.ok)
def test_91_disconnect_xpdra2(self):
    """Unmount XPDR-A2; the request must answer with the OK status."""
    outcome = test_utils.unmount_device("XPDR-A2")
    self.assertEqual(outcome.status_code,
                     requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_92_disconnect_xpdrc2(self):
    """Unmount XPDR-C2; the request must answer with the OK status."""
    outcome = test_utils.unmount_device("XPDR-C2")
    self.assertEqual(outcome.status_code,
                     requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_93_disconnect_roadmA(self):
    """Unmount ROADM-A1; the request must answer with the OK status."""
    outcome = test_utils.unmount_device("ROADM-A1")
    self.assertEqual(outcome.status_code,
                     requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_94_disconnect_roadmC(self):
    """Unmount ROADM-C1; the request must answer with the OK status."""
    outcome = test_utils.unmount_device("ROADM-C1")
    self.assertEqual(outcome.status_code,
                     requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
# Run the test suite with verbose output when the file is executed directly.
if __name__ == "__main__":
    unittest.main(verbosity=2)