Merge "Refactor ConvertORTopoToTapiTopoTest"
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test14_otn_switch_end2end.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
15
16 import base64
17 import unittest
18 import time
19 import requests
20 # pylint: disable=wrong-import-order
21 import sys
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils  # nopep8
26
27
28 class TransportPCEtesting(unittest.TestCase):
29
    # Set by setUpClass to the launched processes; iterated by tearDownClass.
    processes = None
    # Seconds slept after the service-create request (see test_025).
    WAITING = 20  # nominal value is 300
    # Version string paired with each simulator name for start_sims/mount_device.
    NODE_VERSION = '2.2.1'
33
    # Input of the 'service-create' RPC sent by test_025: an OTU4
    # "infrastructure" service named service-OCH-OTU4-AB between SPDR-SA1 and
    # SPDR-SB1, using port XPDR2-NETWORK1 at both ends.
    # NOTE(review): this is a mutable class-level dict; presumably tests
    # outside this view modify it before reuse - confirm.
    cr_serv_input_data = {
        "sdnc-request-header": {
            "request-id": "request-1",
            "rpc-action": "service-create",
            "request-system-id": "appname"
        },
        "service-name": "service-OCH-OTU4-AB",
        "common-id": "commonId",
        "connection-type": "infrastructure",
        "service-a-end": {
            "service-rate": "100",
            "node-id": "SPDR-SA1",
            "service-format": "OTU",
            "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
            "clli": "NodeSA",
            "tx-direction": [{
                "port": {
                    "port-device-name": "SPDR-SA1-XPDR2",
                    "port-type": "fixed",
                    "port-name": "XPDR2-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                },
                "index": 0
            }],
            "rx-direction": [{
                "port": {
                    "port-device-name": "SPDR-SA1-XPDR2",
                    "port-type": "fixed",
                    "port-name": "XPDR2-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                },
                "index": 0
            }],
            "optic-type": "gray"
        },
        "service-z-end": {
            "service-rate": "100",
            "node-id": "SPDR-SB1",
            "service-format": "OTU",
            "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
            "clli": "NodeSB",
            "tx-direction": [{
                "port": {
                    "port-device-name": "SPDR-SB1-XPDR2",
                    "port-type": "fixed",
                    "port-name": "XPDR2-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                },
                "index": 0
            }],
            "rx-direction": [{
                "port": {
                    "port-device-name": "SPDR-SB1-XPDR2",
                    "port-type": "fixed",
                    "port-name": "XPDR2-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                },
                "index": 0
            }],
            "optic-type": "gray"
        },
        "due-date": "2018-06-15T00:00:01Z",
        "operator-contact": "pw1234"
    }
126
    # Input template whose rpc-action is 'service-delete'.
    # NOTE(review): "service-name" holds the placeholder "TBD"; presumably the
    # tests that use this dict (outside this view) set the real name - confirm.
    del_serv_input_data = {
        "sdnc-request-header": {
            "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
            "rpc-action": "service-delete",
            "request-system-id": "appname",
            "notification-url": "http://localhost:8585/NotificationServer/notify"},
        "service-delete-req-info": {
            "service-name": "TBD",
            "tail-retention": "no"}
    }
137
138     @classmethod
139     def setUpClass(cls):
140         cls.processes = test_utils.start_tpce()
141         cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
142                                                ('spdrb', cls.NODE_VERSION),
143                                                ('spdrc', cls.NODE_VERSION),
144                                                ('roadma', cls.NODE_VERSION),
145                                                ('roadmb', cls.NODE_VERSION),
146                                                ('roadmc', cls.NODE_VERSION)])
147
148     @classmethod
149     def tearDownClass(cls):
150         # pylint: disable=not-an-iterable
151         for process in cls.processes:
152             test_utils.shutdown_process(process)
153         print("all processes killed")
154
155     def setUp(self):
156         time.sleep(2)
157
158     def test_001_connect_spdrA(self):
159         response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
160         self.assertEqual(response.status_code,
161                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
162
163     def test_002_connect_spdrB(self):
164         response = test_utils.mount_device("SPDR-SB1", ('spdrb', self.NODE_VERSION))
165         self.assertEqual(response.status_code,
166                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
167
168     def test_003_connect_spdrC(self):
169         response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
170         self.assertEqual(response.status_code,
171                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
172
173     def test_004_connect_rdmA(self):
174         response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
175         self.assertEqual(response.status_code,
176                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
177
178     def test_005_connect_rdmB(self):
179         response = test_utils.mount_device("ROADM-B1", ('roadmb', self.NODE_VERSION))
180         self.assertEqual(response.status_code,
181                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
182
183     def test_006_connect_rdmC(self):
184         response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
185         self.assertEqual(response.status_code,
186                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
187
188     def test_007_connect_sprdA_2_N1_to_roadmA_PP3(self):
189         response = test_utils.transportpce_api_rpc_request(
190             'transportpce-networkutils', 'init-xpdr-rdm-links',
191             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '2', 'network-num': '1',
192                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP3-TXRX'}})
193         self.assertEqual(response['status_code'], requests.codes.ok)
194         self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
195         time.sleep(2)
196
197     def test_008_connect_roadmA_PP3_to_spdrA_2_N1(self):
198         response = test_utils.transportpce_api_rpc_request(
199             'transportpce-networkutils', 'init-rdm-xpdr-links',
200             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '2', 'network-num': '1',
201                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP3-TXRX'}})
202         self.assertEqual(response['status_code'], requests.codes.ok)
203         self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
204         time.sleep(2)
205
206     def test_009_connect_sprdC_2_N1_to_roadmC_PP3(self):
207         response = test_utils.transportpce_api_rpc_request(
208             'transportpce-networkutils', 'init-xpdr-rdm-links',
209             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '2', 'network-num': '1',
210                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP3-TXRX'}})
211         self.assertEqual(response['status_code'], requests.codes.ok)
212         self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
213         time.sleep(2)
214
215     def test_010_connect_roadmC_PP3_to_spdrC_2_N1(self):
216         response = test_utils.transportpce_api_rpc_request(
217             'transportpce-networkutils', 'init-rdm-xpdr-links',
218             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '2', 'network-num': '1',
219                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP3-TXRX'}})
220         self.assertEqual(response['status_code'], requests.codes.ok)
221         self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
222         time.sleep(2)
223
224     def test_011_connect_sprdB_2_N1_to_roadmB_PP1(self):
225         response = test_utils.transportpce_api_rpc_request(
226             'transportpce-networkutils', 'init-xpdr-rdm-links',
227             {'links-input': {'xpdr-node': 'SPDR-SB1', 'xpdr-num': '2', 'network-num': '1',
228                              'rdm-node': 'ROADM-B1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
229         self.assertEqual(response['status_code'], requests.codes.ok)
230         self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
231         time.sleep(2)
232
233     def test_012_connect_roadmB_PP1_to_spdrB_2_N1(self):
234         response = test_utils.transportpce_api_rpc_request(
235             'transportpce-networkutils', 'init-rdm-xpdr-links',
236             {'links-input': {'xpdr-node': 'SPDR-SB1', 'xpdr-num': '2', 'network-num': '1',
237                              'rdm-node': 'ROADM-B1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
238         self.assertEqual(response['status_code'], requests.codes.ok)
239         self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
240         time.sleep(2)
241
242     def test_013_connect_sprdB_2_N2_to_roadmB_PP2(self):
243         response = test_utils.transportpce_api_rpc_request(
244             'transportpce-networkutils', 'init-xpdr-rdm-links',
245             {'links-input': {'xpdr-node': 'SPDR-SB1', 'xpdr-num': '2', 'network-num': '2',
246                              'rdm-node': 'ROADM-B1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
247         self.assertEqual(response['status_code'], requests.codes.ok)
248         self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
249         time.sleep(2)
250
251     def test_014_connect_roadmB_PP2_to_spdrB_2_N2(self):
252         response = test_utils.transportpce_api_rpc_request(
253             'transportpce-networkutils', 'init-rdm-xpdr-links',
254             {'links-input': {'xpdr-node': 'SPDR-SB1', 'xpdr-num': '2', 'network-num': '2',
255                              'rdm-node': 'ROADM-B1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
256         self.assertEqual(response['status_code'], requests.codes.ok)
257         self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
258         time.sleep(2)
259
260     def test_015_add_omsAttributes_ROADMA_ROADMB(self):
261         # Config ROADMA-ROADMB oms-attributes
262         data = {"span": {
263             "auto-spanloss": "true",
264             "spanloss-base": 11.4,
265             "spanloss-current": 12,
266             "engineered-spanloss": 12.2,
267             "link-concatenation": [{
268                 "SRLG-Id": 0,
269                 "fiber-type": "smf",
270                 "SRLG-length": 100000,
271                 "pmd": 0.5}]}}
272         response = test_utils.add_oms_attr_request("ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX",
273                                                    data)
274         self.assertEqual(response.status_code, requests.codes.created)
275
276     def test_016_add_omsAttributes_ROADMB_ROADMA(self):
277         # Config ROADMB-ROADMA oms-attributes
278         data = {"span": {
279             "auto-spanloss": "true",
280             "spanloss-base": 11.4,
281             "spanloss-current": 12,
282             "engineered-spanloss": 12.2,
283             "link-concatenation": [{
284                 "SRLG-Id": 0,
285                 "fiber-type": "smf",
286                 "SRLG-length": 100000,
287                 "pmd": 0.5}]}}
288         response = test_utils.add_oms_attr_request("ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX",
289                                                    data)
290         self.assertEqual(response.status_code, requests.codes.created)
291
292     def test_017_add_omsAttributes_ROADMB_ROADMC(self):
293         # Config ROADMB-ROADMC oms-attributes
294         data = {"span": {
295             "auto-spanloss": "true",
296             "spanloss-base": 11.4,
297             "spanloss-current": 12,
298             "engineered-spanloss": 12.2,
299             "link-concatenation": [{
300                 "SRLG-Id": 0,
301                 "fiber-type": "smf",
302                 "SRLG-length": 100000,
303                 "pmd": 0.5}]}}
304         response = test_utils.add_oms_attr_request("ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX",
305                                                    data)
306         self.assertEqual(response.status_code, requests.codes.created)
307
308     def test_018_add_omsAttributes_ROADMC_ROADMB(self):
309         # Config ROADMC-ROADMB oms-attributes
310         data = {"span": {
311             "auto-spanloss": "true",
312             "spanloss-base": 11.4,
313             "spanloss-current": 12,
314             "engineered-spanloss": 12.2,
315             "link-concatenation": [{
316                 "SRLG-Id": 0,
317                 "fiber-type": "smf",
318                 "SRLG-length": 100000,
319                 "pmd": 0.5}]}}
320         response = test_utils.add_oms_attr_request("ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX",
321                                                    data)
322         self.assertEqual(response.status_code, requests.codes.created)
323
324     def test_019_create_OTS_ROADMA_DEG1(self):
325         response = test_utils.transportpce_api_rpc_request(
326             'transportpce-device-renderer', 'create-ots-oms',
327             {
328                 'node-id': 'ROADM-A1',
329                 'logical-connection-point': 'DEG1-TTP-TXRX'
330             })
331         # time.sleep(10)
332         self.assertEqual(response['status_code'], requests.codes.ok)
333         self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-A1',
334                       response["output"]["result"])
335
336     def test_020_create_OTS_ROADMB_DEG1(self):
337         response = test_utils.transportpce_api_rpc_request(
338             'transportpce-device-renderer', 'create-ots-oms',
339             {
340                 'node-id': 'ROADM-B1',
341                 'logical-connection-point': 'DEG1-TTP-TXRX'
342             })
343         # time.sleep(10)
344         self.assertEqual(response['status_code'], requests.codes.ok)
345         self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-B1',
346                       response["output"]["result"])
347
348     def test_021_create_OTS_ROADMB_DEG2(self):
349         response = test_utils.transportpce_api_rpc_request(
350             'transportpce-device-renderer', 'create-ots-oms',
351             {
352                 'node-id': 'ROADM-B1',
353                 'logical-connection-point': 'DEG2-TTP-TXRX'
354             })
355         # time.sleep(10)
356         self.assertEqual(response['status_code'], requests.codes.ok)
357         self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-B1',
358                       response["output"]["result"])
359
360     def test_022_create_OTS_ROADMC_DEG2(self):
361         response = test_utils.transportpce_api_rpc_request(
362             'transportpce-device-renderer', 'create-ots-oms',
363             {
364                 'node-id': 'ROADM-C1',
365                 'logical-connection-point': 'DEG2-TTP-TXRX'
366             })
367         # time.sleep(10)
368         self.assertEqual(response['status_code'], requests.codes.ok)
369         self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-C1',
370                       response["output"]["result"])
371
372     def test_023_calculate_span_loss_base_all(self):
373         response = test_utils.transportpce_api_rpc_request(
374             'transportpce-olm', 'calculate-spanloss-base',
375             {
376                 'src-type': 'all'
377             })
378         self.assertEqual(response['status_code'], requests.codes.ok)
379         self.assertIn('Success', response["output"]["result"])
380         self.assertIn({
381             "spanloss": "25.7",
382             "link-id": "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX"
383         }, response["output"]["spans"])
384         self.assertIn({
385             "spanloss": "17.6",
386             "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
387         }, response["output"]["spans"])
388         self.assertIn({
389             "spanloss": "23.6",
390             "link-id": "ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX"
391         }, response["output"]["spans"])
392         self.assertIn({
393             "spanloss": "23.6",
394             "link-id": "ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX"
395         }, response["output"]["spans"])
396         self.assertIn({
397             "spanloss": "25.7",
398             "link-id": "ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX"
399         }, response["output"]["spans"])
400         self.assertIn({
401             "spanloss": "17.6",
402             "link-id": "ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX"
403         }, response["output"]["spans"])
404         time.sleep(5)
405
406     def test_024_check_otn_topology(self):
407         response = test_utils.get_ietf_network_request('otn-topology', 'config')
408         self.assertEqual(response['status_code'], requests.codes.ok)
409         self.assertEqual(len(response['network'][0]['node']), 9, 'There should be 9 nodes')
410         self.assertNotIn('ietf-network-topology:link', response['network'][0],
411                          'otn-topology should have no link')
412
413 # test service-create for OCH-OTU4 service from spdrA to spdrB
414     def test_025_create_OCH_OTU4_service_AB(self):
415         response = test_utils.transportpce_api_rpc_request(
416             'org-openroadm-service', 'service-create',
417             self.cr_serv_input_data)
418         self.assertEqual(response['status_code'], requests.codes.ok)
419         self.assertIn('PCE calculation in progress',
420                       response['output']['configuration-response-common']['response-message'])
421         time.sleep(self.WAITING)
422
423     def test_026_get_OCH_OTU4_service_AB(self):
424         response = test_utils.get_ordm_serv_list_attr_request("services", "service-OCH-OTU4-AB")
425         self.assertEqual(response['status_code'], requests.codes.ok)
426         self.assertEqual(response['services'][0]['administrative-state'], 'inService')
427         self.assertEqual(response['services'][0]['service-name'], 'service-OCH-OTU4-AB')
428         self.assertEqual(response['services'][0]['connection-type'], 'infrastructure')
429         self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
430         time.sleep(1)
431
432 # Check correct configuration of devices
433     def test_027_check_interface_och_spdra(self):
434         response = test_utils.check_node_attribute_request(
435             'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-761:768')
436         self.assertEqual(response['status_code'], requests.codes.ok)
437         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
438                                    'administrative-state': 'inService',
439                                    'supporting-circuit-pack-name': 'CP5-CFP',
440                                    'type': 'org-openroadm-interfaces:opticalChannel',
441                                    'supporting-port': 'CP5-CFP-P1'
442                                    }, **response['interface'][0]),
443                              response['interface'][0])
444
445         self.assertEqual('org-openroadm-common-types:R100G',
446                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
447         self.assertEqual('dp-qpsk',
448                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
449         self.assertEqual(196.1,
450                          float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
451         self.assertEqual(
452             -5,
453             float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
454
455     def test_028_check_interface_OTU4_spdra(self):
456         response = test_utils.check_node_attribute_request(
457             'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-OTU')
458         self.assertEqual(response['status_code'], requests.codes.ok)
459         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
460                         'administrative-state': 'inService',
461                         'supporting-circuit-pack-name': 'CP5-CFP',
462                         'supporting-interface': 'XPDR2-NETWORK1-761:768',
463                         'type': 'org-openroadm-interfaces:otnOtu',
464                         'supporting-port': 'CP5-CFP-P1'
465                         }
466         input_dict_2 = {'tx-sapi': 'AOQxIv+6nCD+',
467                         'expected-dapi': 'AOQxIv+6nCD+',
468                         'tx-dapi': 'X+8cRNi+HbE=',
469                         'expected-sapi': 'X+8cRNi+HbE=',
470                         'rate': 'org-openroadm-otn-common-types:OTU4',
471                         'fec': 'scfec'
472                         }
473         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
474                              response['interface'][0])
475         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
476                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
477
478         response2 = test_utils.check_node_attribute2_request(
479             'SPDR-SB1', 'interface', 'XPDR2-NETWORK1-OTU', 'org-openroadm-otn-otu-interfaces:otu')
480         self.assertEqual(response2['status_code'], requests.codes.ok)
481         self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-dapi'])
482         self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-sapi'])
483         self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-sapi'])
484         self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-dapi'])
485
486     def test_029_check_interface_och_spdrB(self):
487         response = test_utils.check_node_attribute_request(
488             'SPDR-SB1', 'interface', 'XPDR2-NETWORK1-761:768')
489         self.assertEqual(response['status_code'], requests.codes.ok)
490         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
491                                    'administrative-state': 'inService',
492                                    'supporting-circuit-pack-name': 'CP5-CFP',
493                                    'type': 'org-openroadm-interfaces:opticalChannel',
494                                    'supporting-port': 'CP5-CFP-P1'
495                                    }, **response['interface'][0]),
496                              response['interface'][0])
497
498         self.assertEqual('org-openroadm-common-types:R100G',
499                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
500         self.assertEqual('dp-qpsk',
501                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
502         self.assertEqual(196.1,
503                          float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
504         self.assertEqual(
505             -5,
506             float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
507
508     def test_030_check_interface_OTU4_spdrB(self):
509         response = test_utils.check_node_attribute_request(
510             'SPDR-SB1', 'interface', 'XPDR2-NETWORK1-OTU')
511         self.assertEqual(response['status_code'], requests.codes.ok)
512         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
513                         'administrative-state': 'inService',
514                         'supporting-circuit-pack-name': 'CP5-CFP',
515                         'supporting-interface': 'XPDR2-NETWORK1-761:768',
516                         'type': 'org-openroadm-interfaces:otnOtu',
517                         'supporting-port': 'CP5-CFP-P1'
518                         }
519         input_dict_2 = {'tx-dapi': 'AOQxIv+6nCD+',
520                         'expected-sapi': 'AOQxIv+6nCD+',
521                         'tx-sapi': 'X+8cRNi+HbE=',
522                         'expected-dapi': 'X+8cRNi+HbE=',
523                         'rate': 'org-openroadm-otn-common-types:OTU4',
524                         'fec': 'scfec'
525                         }
526         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
527                              response['interface'][0])
528         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
529                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
530
531         response2 = test_utils.check_node_attribute2_request(
532             'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-OTU', 'org-openroadm-otn-otu-interfaces:otu')
533         self.assertEqual(response2['status_code'], requests.codes.ok)
534         self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-dapi'])
535         self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-sapi'])
536         self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-sapi'])
537         self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-dapi'])
538
539     def test_031_check_no_interface_ODU4_spdra(self):
540         response = test_utils.check_node_attribute_request(
541             'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-ODU4')
542         self.assertEqual(response['status_code'], requests.codes.conflict)
543         self.assertIn(response['interface'], (
544             {
545                 "error-type": "protocol",
546                 "error-tag": "data-missing",
547                 "error-message":
548                     "Request could not be completed because the relevant data "
549                     "model content does not exist"
550             }, {
551                 "error-type": "application",
552                 "error-tag": "data-missing",
553                 "error-message":
554                     "Request could not be completed because the relevant data "
555                     "model content does not exist"
556             }))
557
558     def test_032_check_openroadm_topo_spdra(self):
559         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR2', 'config')
560         self.assertEqual(response['status_code'], requests.codes.ok)
561         ele = response['node']['ietf-network-topology:termination-point'][0]
562         self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
563         self.assertEqual(
564             196.1,
565             float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
566         self.assertEqual(
567             40,
568             float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
569         self.assertEqual('ROADM-A1-SRG1--SRG1-PP3-TXRX',
570                          ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
571         time.sleep(1)
572
573     def test_033_check_openroadm_topo_ROADMA_SRG(self):
574         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
575         self.assertEqual(response['status_code'], requests.codes.ok)
576         freq_map = base64.b64decode(
577             response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
578         freq_map_array = [int(x) for x in freq_map]
579         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
580         liste_tp = response['node']['ietf-network-topology:termination-point']
581         for ele in liste_tp:
582             if ele['tp-id'] == 'SRG1-PP3-TXRX':
583                 freq_map = base64.b64decode(
584                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
585                 freq_map_array = [int(x) for x in freq_map]
586                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
587             if ele['tp-id'] == 'SRG1-PP2-TXRX':
588                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
589         time.sleep(1)
590
591     def test_034_check_openroadm_topo_ROADMA_DEG1(self):
592         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG1', 'config')
593         self.assertEqual(response['status_code'], requests.codes.ok)
594         freq_map = base64.b64decode(
595             response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
596         freq_map_array = [int(x) for x in freq_map]
597         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
598         liste_tp = response['node']['ietf-network-topology:termination-point']
599         for ele in liste_tp:
600             if ele['tp-id'] == 'DEG1-CTP-TXRX':
601                 freq_map = base64.b64decode(
602                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
603                 freq_map_array = [int(x) for x in freq_map]
604                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
605             if ele['tp-id'] == 'DEG1-TTP-TXRX':
606                 freq_map = base64.b64decode(
607                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
608                 freq_map_array = [int(x) for x in freq_map]
609                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
610         time.sleep(1)
611
612     def test_035_check_otn_topo_otu4_links(self):
613         response = test_utils.get_ietf_network_request('otn-topology', 'config')
614         self.assertEqual(response['status_code'], requests.codes.ok)
615         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 2)
616         listLinkId = ['OTU4-SPDR-SA1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
617                       'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR2-XPDR2-NETWORK1']
618         for link in response['network'][0]['ietf-network-topology:link']:
619             self.assertIn(link['link-id'], listLinkId)
620             self.assertEqual(
621                 link['transportpce-networkutils:otn-link-type'], 'OTU4')
622             self.assertEqual(
623                 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
624             self.assertEqual(
625                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
626             self.assertEqual(
627                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
628             self.assertIn(
629                 link['org-openroadm-common-network:opposite-link'], listLinkId)
630
631
632 # test service-create for OCH-OTU4 service from spdrB to spdrC
633
634     def test_036_create_OCH_OTU4_service_BC(self):
635         self.cr_serv_input_data["service-name"] = "service-OCH-OTU4-BC"
636         self.cr_serv_input_data["service-a-end"]["node-id"] = "SPDR-SB1"
637         self.cr_serv_input_data["service-a-end"]["clli"] = "NodeSB"
638         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SB1-XPDR2"
639         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-NETWORK2"
640         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SB1-XPDR2"
641         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-NETWORK2"
642         self.cr_serv_input_data["service-z-end"]["node-id"] = "SPDR-SC1"
643         self.cr_serv_input_data["service-z-end"]["clli"] = "NodeSC"
644         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
645         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-NETWORK1"
646         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
647         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-NETWORK1"
648         response = test_utils.transportpce_api_rpc_request(
649             'org-openroadm-service', 'service-create',
650             self.cr_serv_input_data)
651         self.assertEqual(response['status_code'], requests.codes.ok)
652         self.assertIn('PCE calculation in progress',
653                       response['output']['configuration-response-common']['response-message'])
654         time.sleep(self.WAITING)
655
656     def test_037_get_OCH_OTU4_service_BC(self):
657         response = test_utils.get_ordm_serv_list_attr_request("services", "service-OCH-OTU4-BC")
658         self.assertEqual(response['status_code'], requests.codes.ok)
659         self.assertEqual(response['services'][0]['administrative-state'], 'inService')
660         self.assertEqual(response['services'][0]['service-name'], 'service-OCH-OTU4-BC')
661         self.assertEqual(response['services'][0]['connection-type'], 'infrastructure')
662         self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
663         time.sleep(1)
664
665 # Check correct configuration of devices
666     def test_038_check_interface_och_spdrB(self):
667         response = test_utils.check_node_attribute_request(
668             'SPDR-SB1', 'interface', 'XPDR2-NETWORK2-753:760')
669         self.assertEqual(response['status_code'], requests.codes.ok)
670         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-753:760',
671                                    'administrative-state': 'inService',
672                                    'supporting-circuit-pack-name': 'CP6-CFP',
673                                    'type': 'org-openroadm-interfaces:opticalChannel',
674                                    'supporting-port': 'CP1-CFP0-P1'
675                                    }, **response['interface'][0]),
676                              response['interface'][0])
677
678         self.assertEqual('org-openroadm-common-types:R100G',
679                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
680         self.assertEqual('dp-qpsk',
681                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
682         self.assertEqual(196.05,
683                          float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
684         self.assertEqual(
685             -5,
686             float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
687
688     def test_039_check_interface_OTU4_spdrB(self):
689         response = test_utils.check_node_attribute_request(
690             'SPDR-SB1', 'interface', 'XPDR2-NETWORK2-OTU')
691         self.assertEqual(response['status_code'], requests.codes.ok)
692         input_dict_1 = {'name': 'XPDR2-NETWORK2-OTU',
693                         'administrative-state': 'inService',
694                         'supporting-circuit-pack-name': 'CP6-CFP',
695                         'supporting-interface': 'XPDR2-NETWORK1-753:760',
696                         'type': 'org-openroadm-interfaces:otnOtu',
697                         'supporting-port': 'CP6-CFP-P1'
698                         }
699         input_dict_2 = {'tx-sapi': 'X+8cRNi+HbI=',
700                         'expected-dapi': 'X+8cRNi+HbI=',
701                         'tx-dapi': 'ALvne1QI5jo4',
702                         'expected-sapi': 'ALvne1QI5jo4',
703                         'rate': 'org-openroadm-otn-common-types:OTU4',
704                         'fec': 'scfec'
705                         }
706         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
707                              response['interface'][0])
708         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
709                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
710
711         response2 = test_utils.check_node_attribute2_request(
712             'SPDR-SC1', 'interface', 'XPDR2-NETWORK1-OTU', 'org-openroadm-otn-otu-interfaces:otu')
713         self.assertEqual(response2['status_code'], requests.codes.ok)
714         self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-dapi'])
715         self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-sapi'])
716         self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-sapi'])
717         self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-dapi'])
718
719     def test_040_check_interface_och_spdrC(self):
720         response = test_utils.check_node_attribute_request(
721             'SPDR-SC1', 'interface', 'XPDR2-NETWORK1-753:760')
722         self.assertEqual(response['status_code'], requests.codes.ok)
723         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-753:760',
724                                    'administrative-state': 'inService',
725                                    'supporting-circuit-pack-name': 'CP5-CFP',
726                                    'type': 'org-openroadm-interfaces:opticalChannel',
727                                    'supporting-port': 'CP5-CFP-P1'
728                                    }, **response['interface'][0]),
729                              response['interface'][0])
730
731         self.assertEqual('org-openroadm-common-types:R100G',
732                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
733         self.assertEqual('dp-qpsk',
734                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
735         self.assertEqual(196.05,
736                          float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
737         self.assertEqual(
738             -5,
739             float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
740
741     def test_041_check_interface_OTU4_spdrC(self):
742         response = test_utils.check_node_attribute_request(
743             'SPDR-SC1', 'interface', 'XPDR2-NETWORK1-OTU')
744         self.assertEqual(response['status_code'], requests.codes.ok)
745         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
746                         'administrative-state': 'inService',
747                         'supporting-circuit-pack-name': 'CP5-CFP',
748                         'supporting-interface': 'XPDR2-NETWORK1-753:760',
749                         'type': 'org-openroadm-interfaces:otnOtu',
750                         'supporting-port': 'CP5-CFP-P1'
751                         }
752         input_dict_2 = {'tx-dapi': 'X+8cRNi+HbI=',
753                         'expected-sapi': 'X+8cRNi+HbI=',
754                         'tx-sapi': 'ALvne1QI5jo4',
755                         'expected-dapi': 'ALvne1QI5jo4',
756                         'rate': 'org-openroadm-otn-common-types:OTU4',
757                         'fec': 'scfec'
758                         }
759         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
760                              response['interface'][0])
761         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
762                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
763
764         response2 = test_utils.check_node_attribute2_request(
765             'SPDR-SB1', 'interface', 'XPDR2-NETWORK2-OTU', 'org-openroadm-otn-otu-interfaces:otu')
766         self.assertEqual(response2['status_code'], requests.codes.ok)
767         self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-dapi'])
768         self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-sapi'])
769         self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-sapi'])
770         self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-dapi'])
771
772     def test_042_check_no_interface_ODU4_spdrB(self):
773         response = test_utils.check_node_attribute_request(
774             'SPDR-SB1', 'interface', 'XPDR2-NETWORK1-ODU4')
775         self.assertEqual(response['status_code'], requests.codes.conflict)
776         self.assertIn(response['interface'], (
777             {
778                 "error-type": "protocol",
779                 "error-tag": "data-missing",
780                 "error-message":
781                     "Request could not be completed because the relevant data "
782                     "model content does not exist"
783             }, {
784                 "error-type": "application",
785                 "error-tag": "data-missing",
786                 "error-message":
787                     "Request could not be completed because the relevant data "
788                     "model content does not exist"
789             }))
790
791     def test_043_check_openroadm_topo_spdrB(self):
792         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'SPDR-SB1-XPDR2', 'config')
793         self.assertEqual(response['status_code'], requests.codes.ok)
794         liste_tp = response['node']['ietf-network-topology:termination-point']
795         # pylint: disable=consider-using-f-string
796         for ele in liste_tp:
797             if ele['tp-id'] == 'XPDR2-NETWORK1':
798                 self.assertEqual(
799                     196.1,
800                     float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
801                 self.assertEqual(
802                     40,
803                     float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
804                 self.assertEqual('ROADM-B1-SRG1--SRG1-PP1-TXRX',
805                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
806             elif ele['tp-id'] == 'XPDR2-NETWORK2':
807                 self.assertEqual(
808                     196.05,
809                     float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
810                 self.assertEqual(
811                     40,
812                     float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
813                 self.assertEqual('ROADM-B1-SRG1--SRG1-PP2-TXRX',
814                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
815             else:
816                 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
817         time.sleep(1)
818
819     def test_044_check_openroadm_topo_ROADMB_SRG1(self):
820         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-B1-SRG1', 'config')
821         self.assertEqual(response['status_code'], requests.codes.ok)
822         freq_map = base64.b64decode(
823             response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
824         freq_map_array = [int(x) for x in freq_map]
825         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
826         self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
827         self.assertEqual(freq_map_array[93], 255, "Lambda 3 should be available")
828         liste_tp = response['node']['ietf-network-topology:termination-point']
829         for ele in liste_tp:
830             if ele['tp-id'] == 'SRG1-PP1-TXRX':
831                 freq_map = base64.b64decode(
832                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
833                 freq_map_array = [int(x) for x in freq_map]
834                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
835             if ele['tp-id'] == 'SRG1-PP2-TXRX':
836                 freq_map = base64.b64decode(
837                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
838                 freq_map_array = [int(x) for x in freq_map]
839                 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
840             if ele['tp-id'] == 'SRG1-PP3-TXRX':
841                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
842         time.sleep(1)
843
844     def test_045_check_openroadm_topo_ROADMB_DEG2(self):
845         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-B1-DEG2', 'config')
846         self.assertEqual(response['status_code'], requests.codes.ok)
847         freq_map = base64.b64decode(
848             response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
849         freq_map_array = [int(x) for x in freq_map]
850         self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
851         liste_tp = response['node']['ietf-network-topology:termination-point']
852         for ele in liste_tp:
853             if ele['tp-id'] == 'DEG2-CTP-TXRX':
854                 freq_map = base64.b64decode(
855                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
856                 freq_map_array = [int(x) for x in freq_map]
857                 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
858             if ele['tp-id'] == 'DEG2-TTP-TXRX':
859                 freq_map = base64.b64decode(
860                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
861                 freq_map_array = [int(x) for x in freq_map]
862                 self.assertEqual(freq_map_array[94], 0, "Lambda 1 should not be available")
863         time.sleep(1)
864
865     def test_046_check_otn_topo_otu4_links(self):
866         response = test_utils.get_ietf_network_request('otn-topology', 'config')
867         self.assertEqual(response['status_code'], requests.codes.ok)
868         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
869         listLinkId = ['OTU4-SPDR-SA1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
870                       'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR2-XPDR2-NETWORK1',
871                       'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK2toSPDR-SC1-XPDR2-XPDR2-NETWORK1',
872                       'OTU4-SPDR-SC1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK2']
873         for link in response['network'][0]['ietf-network-topology:link']:
874             self.assertIn(link['link-id'], listLinkId)
875             self.assertEqual(
876                 link['transportpce-networkutils:otn-link-type'], 'OTU4')
877             self.assertEqual(
878                 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
879             self.assertEqual(
880                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
881             self.assertEqual(
882                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
883             self.assertIn(
884                 link['org-openroadm-common-network:opposite-link'], listLinkId)
885
886
887 # test service-create for 100GE service from spdrA to spdrC via spdrB
888
889
890     def test_047_create_100GE_service_ABC(self):
891         self.cr_serv_input_data["service-name"] = "service-100GE-ABC"
892         self.cr_serv_input_data["connection-type"] = "service"
893         self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
894         self.cr_serv_input_data["service-a-end"]["node-id"] = "SPDR-SA1"
895         self.cr_serv_input_data["service-a-end"]["clli"] = "NodeSA"
896         del self.cr_serv_input_data["service-a-end"]["otu-service-rate"]
897         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
898         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
899         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
900         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
901         self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
902         self.cr_serv_input_data["service-z-end"]["node-id"] = "SPDR-SC1"
903         self.cr_serv_input_data["service-z-end"]["clli"] = "NodeSC"
904         del self.cr_serv_input_data["service-z-end"]["otu-service-rate"]
905         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
906         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
907         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
908         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
909         response = test_utils.transportpce_api_rpc_request(
910             'org-openroadm-service', 'service-create',
911             self.cr_serv_input_data)
912         self.assertEqual(response['status_code'], requests.codes.ok)
913         self.assertIn('PCE calculation in progress',
914                       response['output']['configuration-response-common']['response-message'])
915         time.sleep(self.WAITING)
916
917     def test_048_get_100GE_service_ABC(self):
918         response = test_utils.get_ordm_serv_list_attr_request("services", "service-100GE-ABC")
919         self.assertEqual(response['status_code'], requests.codes.ok)
920         self.assertEqual(response['services'][0]['administrative-state'], 'inService')
921         self.assertEqual(response['services'][0]['service-name'], 'service-100GE-ABC')
922         self.assertEqual(response['services'][0]['connection-type'], 'service')
923         self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
924         time.sleep(1)
925
926     def test_049_check_interface_100GE_CLIENT_spdra(self):
927         response = test_utils.check_node_attribute_request(
928             'SPDR-SA1', 'interface', 'XPDR2-CLIENT1-ETHERNET')
929         self.assertEqual(response['status_code'], requests.codes.ok)
930         input_dict = {'name': 'XPDR2-CLIENT1-ETHERNET',
931                       'administrative-state': 'inService',
932                       'supporting-circuit-pack-name': 'CP2-QSFP1',
933                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
934                       'supporting-port': 'CP2-QSFP1-P1'
935                       }
936         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
937                              response['interface'][0])
938         self.assertEqual(100000, response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
939         self.assertEqual('off', response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['fec'])
940
941     def test_050_check_interface_ODU4_CLIENT_spdra(self):
942         response = test_utils.check_node_attribute_request(
943             'SPDR-SA1', 'interface', 'XPDR2-CLIENT1-ODU4')
944         self.assertEqual(response['status_code'], requests.codes.ok)
945         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
946                         'administrative-state': 'inService',
947                         'supporting-circuit-pack-name': 'CP2-QSFP1',
948                         'supporting-interface': 'XPDR2-CLIENT1-ETHERNET',
949                         'type': 'org-openroadm-interfaces:otnOdu',
950                         'supporting-port': 'CP2-QSFP1-P1'}
951         # SAPI/DAPI are added in the Otu4 renderer
952         input_dict_2 = {
953             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
954             'rate': 'org-openroadm-otn-common-types:ODU4',
955             'monitoring-mode': 'terminated',
956             'expected-dapi': 'AItaZ6nmyaKJ',
957             'expected-sapi': 'AKFnJJaijWiz',
958             'tx-dapi': 'AKFnJJaijWiz',
959             'tx-sapi': 'AItaZ6nmyaKJ'}
960
961         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
962                              response['interface'][0])
963         self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
964                                   **input_dict_2),
965                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
966                              )
967         self.assertDictEqual(
968             {'payload-type': '21', 'exp-payload-type': '21'},
969             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
970
971         response2 = test_utils.check_node_attribute_request(
972             'SPDR-SC1', 'interface', 'XPDR2-CLIENT1-ODU4')
973         self.assertEqual(response['status_code'], requests.codes.ok)
974         self.assertEqual(input_dict_2['tx-sapi'],
975                          response2['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['tx-dapi'])
976         self.assertEqual(input_dict_2['tx-sapi'],
977                          response2['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['expected-sapi'])
978         self.assertEqual(input_dict_2['tx-dapi'],
979                          response2['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['tx-sapi'])
980         self.assertEqual(input_dict_2['tx-dapi'],
981                          response2['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['expected-dapi'])
982
983     def test_051_check_interface_ODU4_NETWORK_spdra(self):
984         response = test_utils.check_node_attribute_request(
985             'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-ODU4')
986         self.assertEqual(response['status_code'], requests.codes.ok)
987         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
988                         'administrative-state': 'inService',
989                         'supporting-circuit-pack-name': 'CP5-CFP',
990                         'type': 'org-openroadm-interfaces:otnOdu',
991                         'supporting-port': 'CP5-CFP-P1',
992                         'circuit-id': 'TBD',
993                         'description': 'TBD'}
994         input_dict_2 = {
995             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
996             'rate': 'org-openroadm-otn-common-types:ODU4',
997             'monitoring-mode': 'monitored'}
998
999         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1000                              response['interface'][0])
1001         self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
1002                                   **input_dict_2),
1003                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
1004                              )
1005         self.assertNotIn('opu',
1006                          dict.keys(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
1007
1008     def test_052_check_ODU4_connection_spdra(self):
1009         response = test_utils.check_node_attribute_request(
1010             'SPDR-SA1', 'odu-connection', 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4')
1011         self.assertEqual(response['status_code'], requests.codes.ok)
1012         input_dict_1 = {
1013             'connection-name':
1014             'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
1015             'direction': 'bidirectional'
1016         }
1017
1018         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1019                              response['odu-connection'][0])
1020         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4'},
1021                              response['odu-connection'][0]['destination'])
1022         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4'},
1023                              response['odu-connection'][0]['source'])
1024
1025     def test_053_check_interface_100GE_CLIENT_spdrc(self):
1026         response = test_utils.check_node_attribute_request(
1027             'SPDR-SC1', 'interface', 'XPDR2-CLIENT1-ETHERNET')
1028         self.assertEqual(response['status_code'], requests.codes.ok)
1029         input_dict = {'name': 'XPDR2-CLIENT1-ETHERNET',
1030                       'administrative-state': 'inService',
1031                       'supporting-circuit-pack-name': 'CP2-QSFP1',
1032                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
1033                       'supporting-port': 'CP2-QSFP1-P1'
1034                       }
1035         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
1036                              response['interface'][0])
1037         self.assertEqual(100000, response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
1038         self.assertEqual('off', response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['fec'])
1039
1040     def test_054_check_interface_ODU4_CLIENT_spdrc(self):
1041         response = test_utils.check_node_attribute_request(
1042             'SPDR-SC1', 'interface', 'XPDR2-CLIENT1-ODU4')
1043         self.assertEqual(response['status_code'], requests.codes.ok)
1044         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
1045                         'administrative-state': 'inService',
1046                         'supporting-circuit-pack-name': 'CP2-QSFP1',
1047                         'supporting-interface': 'XPDR2-CLIENT1-ETHERNET',
1048                         'type': 'org-openroadm-interfaces:otnOdu',
1049                         'supporting-port': 'CP2-QSFP1-P1',
1050                         'circuit-id': 'TBD',
1051                         'description': 'TBD'}
1052         input_dict_2 = {
1053             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
1054             'rate': 'org-openroadm-otn-common-types:ODU4',
1055             'monitoring-mode': 'terminated',
1056             'expected-dapi': 'AKFnJJaijWiz',
1057             'expected-sapi': 'AItaZ6nmyaKJ',
1058             'tx-dapi': 'AItaZ6nmyaKJ',
1059             'tx-sapi': 'AKFnJJaijWiz'}
1060
1061         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1062                              response['interface'][0])
1063         self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
1064                                   **input_dict_2),
1065                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
1066                              )
1067         self.assertDictEqual(
1068             {'payload-type': '21', 'exp-payload-type': '21'},
1069             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1070
1071         response2 = test_utils.check_node_attribute_request(
1072             'SPDR-SA1', 'interface', 'XPDR2-CLIENT1-ODU4')
1073         self.assertEqual(response['status_code'], requests.codes.ok)
1074         self.assertEqual(input_dict_2['tx-sapi'],
1075                          response2['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['tx-dapi'])
1076         self.assertEqual(input_dict_2['tx-sapi'],
1077                          response2['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['expected-sapi'])
1078         self.assertEqual(input_dict_2['tx-dapi'],
1079                          response2['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['tx-sapi'])
1080         self.assertEqual(input_dict_2['tx-dapi'],
1081                          response2['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['expected-dapi'])
1082
1083     def test_055_check_interface_ODU4_NETWORK_spdrc(self):
1084         response = test_utils.check_node_attribute_request(
1085             'SPDR-SC1', 'interface', 'XPDR2-NETWORK1-ODU4')
1086         self.assertEqual(response['status_code'], requests.codes.ok)
1087         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
1088                         'administrative-state': 'inService',
1089                         'supporting-circuit-pack-name': 'CP5-CFP',
1090                         'type': 'org-openroadm-interfaces:otnOdu',
1091                         'supporting-port': 'CP5-CFP-P1',
1092                         'circuit-id': 'TBD',
1093                         'description': 'TBD'}
1094         input_dict_2 = {
1095             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1096             'rate': 'org-openroadm-otn-common-types:ODU4',
1097             'monitoring-mode': 'monitored'}
1098
1099         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1100                              response['interface'][0])
1101         self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
1102                                   **input_dict_2),
1103                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
1104                              )
1105         self.assertNotIn('opu',
1106                          dict.keys(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
1107
1108     def test_056_check_ODU4_connection_spdrc(self):
1109         response = test_utils.check_node_attribute_request(
1110             'SPDR-SC1', 'odu-connection', 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4')
1111         self.assertEqual(response['status_code'], requests.codes.ok)
1112         input_dict_1 = {
1113             'connection-name':
1114             'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
1115             'direction': 'bidirectional'
1116         }
1117
1118         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1119                              response['odu-connection'][0])
1120         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4'},
1121                              response['odu-connection'][0]['destination'])
1122         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4'},
1123                              response['odu-connection'][0]['source'])
1124
1125     def test_057_check_interface_ODU4_NETWORK1_spdrb(self):
1126         response = test_utils.check_node_attribute_request(
1127             'SPDR-SB1', 'interface', 'XPDR2-NETWORK1-ODU4')
1128         self.assertEqual(response['status_code'], requests.codes.ok)
1129         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
1130                         'administrative-state': 'inService',
1131                         'supporting-circuit-pack-name': 'CP5-CFP',
1132                         'type': 'org-openroadm-interfaces:otnOdu',
1133                         'supporting-port': 'CP5-CFP-P1'}
1134         input_dict_2 = {
1135             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1136             'rate': 'org-openroadm-otn-common-types:ODU4',
1137             'monitoring-mode': 'monitored'}
1138
1139         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1140                              response['interface'][0])
1141         self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
1142                                   **input_dict_2),
1143                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
1144                              )
1145         self.assertNotIn('opu',
1146                          dict.keys(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
1147
1148     def test_058_check_interface_ODU4_NETWORK2_spdrb(self):
1149         response = test_utils.check_node_attribute_request(
1150             'SPDR-SB1', 'interface', 'XPDR2-NETWORK2-ODU4')
1151         self.assertEqual(response['status_code'], requests.codes.ok)
1152         input_dict_1 = {'name': 'XPDR2-NETWORK2-ODU4',
1153                         'administrative-state': 'inService',
1154                         'supporting-circuit-pack-name': 'CP6-CFP',
1155                         'type': 'org-openroadm-interfaces:otnOdu',
1156                         'supporting-port': 'CP6-CFP-P1'}
1157         input_dict_2 = {
1158             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1159             'rate': 'org-openroadm-otn-common-types:ODU4',
1160             'monitoring-mode': 'monitored'}
1161
1162         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1163                              response['interface'][0])
1164         self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
1165                                   **input_dict_2),
1166                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
1167                              )
1168         self.assertNotIn('opu',
1169                          dict.keys(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
1170
1171     def test_059_check_ODU4_connection_spdrb(self):
1172         response = test_utils.check_node_attribute_request(
1173             'SPDR-SB1', 'odu-connection', 'XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4')
1174         self.assertEqual(response['status_code'], requests.codes.ok)
1175         input_dict_1 = {
1176             'connection-name':
1177             'XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4',
1178             'direction': 'bidirectional'
1179         }
1180
1181         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1182                              response['odu-connection'][0])
1183         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK2-ODU4'},
1184                              response['odu-connection'][0]['destination'])
1185         self.assertDictEqual({'src-if': 'XPDR2-NETWORK1-ODU4'},
1186                              response['odu-connection'][0]['source'])
1187
1188     def test_060_check_otn_topo_links(self):
1189         response = test_utils.get_ietf_network_request('otn-topology', 'config')
1190         self.assertEqual(response['status_code'], requests.codes.ok)
1191         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
1192         for link in response['network'][0]['ietf-network-topology:link']:
1193             self.assertEqual(
1194                 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
1195             self.assertEqual(
1196                 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
1197
1198     def test_061_delete_service_100GE_ABC(self):
1199         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-100GE-ABC"
1200         response = test_utils.transportpce_api_rpc_request(
1201             'org-openroadm-service', 'service-delete',
1202             self.del_serv_input_data)
1203         self.assertEqual(response['status_code'], requests.codes.ok)
1204         self.assertIn('Renderer service delete in progress',
1205                       response['output']['configuration-response-common']['response-message'])
1206         time.sleep(self.WAITING)
1207
1208     def test_062_check_service_list(self):
1209         response = test_utils.get_ordm_serv_list_request()
1210         self.assertEqual(response['status_code'], requests.codes.ok)
1211         self.assertEqual(len(response['service-list']['services']), 2)
1212         time.sleep(1)
1213
1214     def test_063_check_no_ODU4_connection_spdra(self):
1215         response = test_utils.check_node_request("SPDR-SA1")
1216         self.assertEqual(response['status_code'], requests.codes.ok)
1217         self.assertNotIn(['odu-connection'][0], response['org-openroadm-device'])
1218         time.sleep(1)
1219
1220     def test_064_check_no_interface_ODU4_NETWORK_spdra(self):
1221         response = test_utils.check_node_attribute_request(
1222             'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-ODU4')
1223         self.assertEqual(response['status_code'], requests.codes.conflict)
1224
1225     def test_065_check_no_interface_ODU4_CLIENT_spdra(self):
1226         response = test_utils.check_node_attribute_request(
1227             'SPDR-SA1', 'interface', 'XPDR2-CLIENT1-ODU4')
1228         self.assertEqual(response['status_code'], requests.codes.conflict)
1229
1230     def test_066_check_no_interface_100GE_CLIENT_spdra(self):
1231         response = test_utils.check_node_attribute_request(
1232             'SPDR-SA1', 'interface', 'XPDR2-CLIENT1-ETHERNET')
1233         self.assertEqual(response['status_code'], requests.codes.conflict)
1234
    def test_067_check_otn_topo_links(self):
        # Re-run the OTU4 link checks of test_046 against the current OTN
        # topology (this runs after the 100GE service deletion of test_061).
        self.test_046_check_otn_topo_otu4_links()
1237
1238     def test_068_delete_OCH_OTU4_service_AB(self):
1239         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-OCH-OTU4-AB"
1240         response = test_utils.transportpce_api_rpc_request(
1241             'org-openroadm-service', 'service-delete',
1242             self.del_serv_input_data)
1243         self.assertEqual(response['status_code'], requests.codes.ok)
1244         self.assertIn('Renderer service delete in progress',
1245                       response['output']['configuration-response-common']['response-message'])
1246         time.sleep(self.WAITING)
1247
1248     def test_069_delete_OCH_OTU4_service_BC(self):
1249         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-OCH-OTU4-BC"
1250         response = test_utils.transportpce_api_rpc_request(
1251             'org-openroadm-service', 'service-delete',
1252             self.del_serv_input_data)
1253         self.assertEqual(response['status_code'], requests.codes.ok)
1254         self.assertIn('Renderer service delete in progress',
1255                       response['output']['configuration-response-common']['response-message'])
1256         time.sleep(self.WAITING)
1257
1258     def test_070_get_no_service(self):
1259         response = test_utils.get_ordm_serv_list_request()
1260         self.assertEqual(response['status_code'], requests.codes.conflict)
1261         self.assertIn(response['service-list'], (
1262             {
1263                 "error-type": "protocol",
1264                 "error-tag": "data-missing",
1265                 "error-message":
1266                     "Request could not be completed because the relevant data "
1267                     "model content does not exist"
1268             }, {
1269                 "error-type": "application",
1270                 "error-tag": "data-missing",
1271                 "error-message":
1272                     "Request could not be completed because the relevant data "
1273                     "model content does not exist"
1274             }))
1275         time.sleep(1)
1276
1277     def test_071_check_no_interface_OTU4_spdra(self):
1278         response = test_utils.check_node_attribute_request(
1279             'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-OTU')
1280         self.assertEqual(response['status_code'], requests.codes.conflict)
1281
1282     def test_072_check_no_interface_OCH_spdra(self):
1283         response = test_utils.check_node_attribute_request(
1284             'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-761:768')
1285         self.assertEqual(response['status_code'], requests.codes.conflict)
1286
1287     def test_073_getLinks_OtnTopology(self):
1288         response = test_utils.get_ietf_network_request('otn-topology', 'config')
1289         self.assertEqual(response['status_code'], requests.codes.ok)
1290         self.assertNotIn('ietf-network-topology:link', response['network'][0])
1291
1292     def test_074_check_openroadm_topo_spdra(self):
1293         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR2', 'config')
1294         self.assertEqual(response['status_code'], requests.codes.ok)
1295         tp = response['node']['ietf-network-topology:termination-point'][0]
1296         self.assertEqual('XPDR2-NETWORK1', tp['tp-id'])
1297         self.assertNotIn('wavelength', dict.keys(
1298             tp['org-openroadm-network-topology:xpdr-network-attributes']))
1299         time.sleep(1)
1300
1301     def test_075_check_openroadm_topo_ROADMB_SRG1(self):
1302         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-B1-SRG1', 'config')
1303         self.assertEqual(response['status_code'], requests.codes.ok)
1304         freq_map = base64.b64decode(
1305             response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
1306         freq_map_array = [int(x) for x in freq_map]
1307         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1308         self.assertEqual(freq_map_array[94], 255, "Lambda 1 should be available")
1309         liste_tp = response['node']['ietf-network-topology:termination-point']
1310         for ele in liste_tp:
1311             if ele['tp-id'] == 'SRG1-PP1-TXRX':
1312                 freq_map = base64.b64decode(
1313                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1314                 freq_map_array = [int(x) for x in freq_map]
1315                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1316             if ele['tp-id'] == 'SRG1-PP2-TXRX':
1317                 freq_map = base64.b64decode(
1318                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1319                 freq_map_array = [int(x) for x in freq_map]
1320                 self.assertEqual(freq_map_array[94], 255, "Lambda 1 should be available")
1321         time.sleep(1)
1322
1323     def test_076_check_openroadm_topo_ROADMB_DEG1(self):
1324         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-B1-DEG1', 'config')
1325         self.assertEqual(response['status_code'], requests.codes.ok)
1326         freq_map = base64.b64decode(
1327             response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1328         freq_map_array = [int(x) for x in freq_map]
1329         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1330         liste_tp = response['node']['ietf-network-topology:termination-point']
1331         for ele in liste_tp:
1332             if ele['tp-id'] == 'DEG1-CTP-TXRX':
1333                 freq_map = base64.b64decode(
1334                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1335                 freq_map_array = [int(x) for x in freq_map]
1336                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1337             if ele['tp-id'] == 'DEG1-TTP-TXRX':
1338                 freq_map = base64.b64decode(
1339                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1340                 freq_map_array = [int(x) for x in freq_map]
1341                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1342         time.sleep(1)
1343
1344     def test_077_check_openroadm_topo_ROADMB_DEG2(self):
1345         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-B1-DEG2', 'config')
1346         self.assertEqual(response['status_code'], requests.codes.ok)
1347         freq_map = base64.b64decode(
1348             response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1349         freq_map_array = [int(x) for x in freq_map]
1350         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1351         liste_tp = response['node']['ietf-network-topology:termination-point']
1352         for ele in liste_tp:
1353             if ele['tp-id'] == 'DEG2-CTP-TXRX':
1354                 freq_map = base64.b64decode(
1355                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1356                 freq_map_array = [int(x) for x in freq_map]
1357                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1358             if ele['tp-id'] == 'DEG2-TTP-TXRX':
1359                 freq_map = base64.b64decode(
1360                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1361                 freq_map_array = [int(x) for x in freq_map]
1362                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1363         time.sleep(1)
1364
1365     def test_078_disconnect_xponders_from_roadm(self):
1366         response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
1367         self.assertEqual(response['status_code'], requests.codes.ok)
1368         links = response['network'][0]['ietf-network-topology:link']
1369         for link in links:
1370             if (link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT')
1371                     and ('SPDR-SB1' in link['link-id'] or 'ROADM-B1' in link['link-id'])):
1372                 response = test_utils.del_ietf_network_link_request(
1373                     'openroadm-topology', link['link-id'], 'config')
1374                 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1375
1376     def test_079_disconnect_spdrB(self):
1377         response = test_utils.unmount_device("SPDR-SB1")
1378         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1379
1380     def test_080_disconnect_roadmB(self):
1381         response = test_utils.unmount_device("ROADM-B1")
1382         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1383
1384     def test_081_remove_roadm_to_roadm_links(self):
1385         response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
1386         self.assertEqual(response['status_code'], requests.codes.ok)
1387         links = response['network'][0]['ietf-network-topology:link']
1388         for link in links:
1389             if (link["org-openroadm-common-network:link-type"] == "ROADM-TO-ROADM"
1390                     and 'ROADM-B1' in link['link-id']):
1391                 response = test_utils.del_ietf_network_link_request(
1392                     'openroadm-topology', link['link-id'], 'config')
1393                 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1394
1395     def test_082_add_omsAttributes_ROADMA_ROADMC(self):
1396         # Config ROADMA-ROADMC oms-attributes
1397         data = {"span": {
1398             "auto-spanloss": "true",
1399             "spanloss-base": 11.4,
1400             "spanloss-current": 12,
1401             "engineered-spanloss": 12.2,
1402             "link-concatenation": [{
1403                 "SRLG-Id": 0,
1404                 "fiber-type": "smf",
1405                 "SRLG-length": 100000,
1406                 "pmd": 0.5}]}}
1407         response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX",
1408                                                    data)
1409         self.assertEqual(response.status_code, requests.codes.created)
1410
1411     def test_083_add_omsAttributes_ROADMC_ROADMA(self):
1412         # Config ROADMC-ROADMA oms-attributes
1413         data = {"span": {
1414             "auto-spanloss": "true",
1415             "spanloss-base": 11.4,
1416             "spanloss-current": 12,
1417             "engineered-spanloss": 12.2,
1418             "link-concatenation": [{
1419                 "SRLG-Id": 0,
1420                 "fiber-type": "smf",
1421                 "SRLG-length": 100000,
1422                 "pmd": 0.5}]}}
1423         response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX",
1424                                                    data)
1425         self.assertEqual(response.status_code, requests.codes.created)
1426
1427     def test_084_create_OCH_OTU4_service_AC(self):
1428         self.cr_serv_input_data["service-name"] = "service-OCH-OTU4-AC"
1429         self.cr_serv_input_data["connection-type"] = "infrastructure"
1430         self.cr_serv_input_data["service-a-end"]["service-rate"] = "100"
1431         self.cr_serv_input_data["service-a-end"]["service-format"] = "OTU"
1432         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1433         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-NETWORK1"
1434         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1435         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-NETWORK1"
1436         self.cr_serv_input_data["service-a-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1437         self.cr_serv_input_data["service-z-end"]["service-rate"] = "100"
1438         self.cr_serv_input_data["service-z-end"]["service-format"] = "OTU"
1439         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1440         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-NETWORK1"
1441         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1442         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-NETWORK1"
1443         self.cr_serv_input_data["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1444         response = test_utils.transportpce_api_rpc_request(
1445             'org-openroadm-service', 'service-create',
1446             self.cr_serv_input_data)
1447         self.assertEqual(response['status_code'], requests.codes.ok)
1448         self.assertIn('PCE calculation in progress',
1449                       response['output']['configuration-response-common']['response-message'])
1450         time.sleep(self.WAITING)
1451
1452     def test_085_get_OCH_OTU4_service_AC(self):
1453         response = test_utils.get_ordm_serv_list_attr_request("services", "service-OCH-OTU4-AC")
1454         self.assertEqual(response['status_code'], requests.codes.ok)
1455         self.assertEqual(response['services'][0]['administrative-state'], 'inService')
1456         self.assertEqual(response['services'][0]['service-name'], 'service-OCH-OTU4-AC')
1457         self.assertEqual(response['services'][0]['connection-type'], 'infrastructure')
1458         self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
1459         time.sleep(1)
1460
# test service-create for 100GE service from spdrA to spdrC (spdrB was unmounted in test_079)
1462     def test_086_create_100GE_service_AC(self):
1463         self.cr_serv_input_data["service-name"] = "service-100GE-AC"
1464         self.cr_serv_input_data["connection-type"] = "service"
1465         self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
1466         self.cr_serv_input_data["service-a-end"]["node-id"] = "SPDR-SA1"
1467         self.cr_serv_input_data["service-a-end"]["clli"] = "NodeSA"
1468         del self.cr_serv_input_data["service-a-end"]["otu-service-rate"]
1469         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1470         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
1471         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1472         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
1473         self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
1474         self.cr_serv_input_data["service-z-end"]["node-id"] = "SPDR-SC1"
1475         self.cr_serv_input_data["service-z-end"]["clli"] = "NodeSC"
1476         del self.cr_serv_input_data["service-z-end"]["otu-service-rate"]
1477         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1478         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
1479         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1480         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
1481
1482         response = test_utils.transportpce_api_rpc_request(
1483             'org-openroadm-service', 'service-create',
1484             self.cr_serv_input_data)
1485         self.assertEqual(response['status_code'], requests.codes.ok)
1486         self.assertIn('PCE calculation in progress',
1487                       response['output']['configuration-response-common']['response-message'])
1488         time.sleep(self.WAITING)
1489
1490     def test_087_get_100GE_service_AC(self):
1491         response = test_utils.get_ordm_serv_list_attr_request("services", "service-100GE-AC")
1492         self.assertEqual(response['status_code'], requests.codes.ok)
1493         self.assertEqual(response['services'][0]['administrative-state'], 'inService')
1494         self.assertEqual(response['services'][0]['service-name'], 'service-100GE-AC')
1495         self.assertEqual(response['services'][0]['connection-type'], 'service')
1496         self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
1497         time.sleep(1)
1498
1499     def test_088_check_interface_OTU4_spdra(self):
1500         response = test_utils.check_node_attribute_request(
1501             'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-OTU')
1502         self.assertEqual(response['status_code'], requests.codes.ok)
1503         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
1504                         'administrative-state': 'inService',
1505                         'supporting-circuit-pack-name': 'CP5-CFP',
1506                         'supporting-interface': 'XPDR2-NETWORK1-761:768',
1507                         'type': 'org-openroadm-interfaces:otnOtu',
1508                         'supporting-port': 'CP5-CFP-P1'
1509                         }
1510         input_dict_2 = {'tx-sapi': 'AOQxIv+6nCD+',
1511                         'expected-dapi': 'AOQxIv+6nCD+',
1512                         'tx-dapi': 'ALvne1QI5jo4',
1513                         'expected-sapi': 'ALvne1QI5jo4',
1514                         'rate': 'org-openroadm-otn-common-types:OTU4',
1515                         'fec': 'scfec'
1516                         }
1517         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1518                              response['interface'][0])
1519         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
1520                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
1521
1522         response2 = test_utils.check_node_attribute2_request(
1523             'SPDR-SC1', 'interface', 'XPDR2-NETWORK1-OTU', 'org-openroadm-otn-otu-interfaces:otu')
1524         self.assertEqual(response2['status_code'], requests.codes.ok)
1525         self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-dapi'])
1526         self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-sapi'])
1527         self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-sapi'])
1528         self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-dapi'])
1529
1530     def test_089_check_interface_OTU4_spdrC(self):
1531         response = test_utils.check_node_attribute_request(
1532             'SPDR-SC1', 'interface', 'XPDR2-NETWORK1-OTU')
1533         self.assertEqual(response['status_code'], requests.codes.ok)
1534         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
1535                         'administrative-state': 'inService',
1536                         'supporting-circuit-pack-name': 'CP5-CFP',
1537                         'supporting-interface': 'XPDR2-NETWORK1-753:760',
1538                         'type': 'org-openroadm-interfaces:otnOtu',
1539                         'supporting-port': 'CP5-CFP-P1'
1540                         }
1541         input_dict_2 = {'tx-dapi': 'AOQxIv+6nCD+',
1542                         'expected-sapi': 'AOQxIv+6nCD+',
1543                         'tx-sapi': 'ALvne1QI5jo4',
1544                         'expected-dapi': 'ALvne1QI5jo4',
1545                         'rate': 'org-openroadm-otn-common-types:OTU4',
1546                         'fec': 'scfec'
1547                         }
1548         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1549                              response['interface'][0])
1550         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
1551                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
1552
1553         response2 = test_utils.check_node_attribute2_request(
1554             'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-OTU', 'org-openroadm-otn-otu-interfaces:otu')
1555         self.assertEqual(response2['status_code'], requests.codes.ok)
1556         self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-dapi'])
1557         self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-sapi'])
1558         self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-sapi'])
1559         self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-dapi'])
1560
1561     def test_090_check_configuration_spdra(self):
1562         self.test_049_check_interface_100GE_CLIENT_spdra()
1563         self.test_050_check_interface_ODU4_CLIENT_spdra()
1564         self.test_051_check_interface_ODU4_NETWORK_spdra()
1565         self.test_052_check_ODU4_connection_spdra()
1566
1567     def test_091_check_configuration_spdrc(self):
1568         self.test_053_check_interface_100GE_CLIENT_spdrc()
1569         self.test_054_check_interface_ODU4_CLIENT_spdrc()
1570         self.test_055_check_interface_ODU4_NETWORK_spdrc()
1571         self.test_056_check_ODU4_connection_spdrc()
1572
1573     def test_092_check_otn_topo_links(self):
1574         response = test_utils.get_ietf_network_request('otn-topology', 'config')
1575         self.assertEqual(response['status_code'], requests.codes.ok)
1576         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 2)
1577         for link in response['network'][0]['ietf-network-topology:link']:
1578             self.assertEqual(
1579                 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
1580             self.assertEqual(
1581                 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
1582
1583     def test_093_delete_100GE_service_AC(self):
1584         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-100GE-AC"
1585         response = test_utils.transportpce_api_rpc_request(
1586             'org-openroadm-service', 'service-delete',
1587             self.del_serv_input_data)
1588         self.assertEqual(response['status_code'], requests.codes.ok)
1589         self.assertIn('Renderer service delete in progress',
1590                       response['output']['configuration-response-common']['response-message'])
1591         time.sleep(self.WAITING)
1592
1593     def test_094_check_service_list(self):
1594         response = test_utils.get_ordm_serv_list_request()
1595         self.assertEqual(response['status_code'], requests.codes.ok)
1596         self.assertEqual(len(response['service-list']['services']), 1)
1597         time.sleep(1)
1598
1599     def test_095_check_configuration_spdra(self):
1600         self.test_063_check_no_ODU4_connection_spdra()
1601         self.test_064_check_no_interface_ODU4_NETWORK_spdra()
1602         self.test_065_check_no_interface_ODU4_CLIENT_spdra()
1603         self.test_066_check_no_interface_100GE_CLIENT_spdra()
1604
1605     def test_096_check_otn_topo_links(self):
1606         response = test_utils.get_ietf_network_request('otn-topology', 'config')
1607         self.assertEqual(response['status_code'], requests.codes.ok)
1608         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 2)
1609         for link in response['network'][0]['ietf-network-topology:link']:
1610             self.assertEqual(
1611                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
1612             self.assertEqual(
1613                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
1614
1615     def test_097_disconnect_xponders_from_roadm(self):
1616         response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
1617         self.assertEqual(response['status_code'], requests.codes.ok)
1618         links = response['network'][0]['ietf-network-topology:link']
1619         for link in links:
1620             if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
1621                 response = test_utils.del_ietf_network_link_request(
1622                     'openroadm-topology', link['link-id'], 'config')
1623                 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1624
1625     def test_098_disconnect_spdrA(self):
1626         response = test_utils.unmount_device("SPDR-SA1")
1627         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1628
1629     def test_099_disconnect_spdrC(self):
1630         response = test_utils.unmount_device("SPDR-SC1")
1631         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1632
1633     def test_100_disconnect_roadmA(self):
1634         response = test_utils.unmount_device("ROADM-A1")
1635         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1636
1637     def test_101_disconnect_roadmC(self):
1638         response = test_utils.unmount_device("ROADM-C1")
1639         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1640
1641
if __name__ == "__main__":
    # Script entry point: run every test in this module with verbose reporting.
    unittest.main(verbosity=2)