Speed up functional tests execution
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test11_otn_end2end.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
15
16 import base64
17 import unittest
18 import time
19 import requests
20 # pylint: disable=wrong-import-order
21 import sys
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils  # nopep8
26
27
28 class TransportPCEtesting(unittest.TestCase):
29
    # Handles of the processes started in setUpClass; iterated and shut down in tearDownClass.
    processes = None
    # Seconds slept after each service-create RPC in this class.
    WAITING = 20  # nominal value is 300
    # Device model version handed to test_utils.start_sims() and test_utils.mount_device().
    NODE_VERSION = '2.2.1'

    # Payload for the 'service-create' RPC. It initially describes the
    # "service1-OCH-OTU4" service between SPDR-SA1 and SPDR-SC1.
    # NOTE: this is a single class-level dict that test_23 and test_29 modify
    # in place, so its content depends on which tests have already run.
    cr_serv_input_data = {
        "sdnc-request-header": {
            "request-id": "request-1",
            "rpc-action": "service-create",
            "request-system-id": "appname"
        },
        "service-name": "service1-OCH-OTU4",
        "common-id": "commonId",
        "connection-type": "infrastructure",
        "service-a-end": {
            "service-rate": "100",
            "node-id": "SPDR-SA1",
            "service-format": "OTU",
            "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
            "clli": "NodeSA",
            "tx-direction": [{
                "port": {
                    "port-device-name": "SPDR-SA1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                },
                "index": 0
            }],
            "rx-direction": [{
                "port": {
                    "port-device-name": "SPDR-SA1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                },
                "index": 0
            }],
            "optic-type": "gray"
        },
        "service-z-end": {
            "service-rate": "100",
            "node-id": "SPDR-SC1",
            "service-format": "OTU",
            "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
            "clli": "NodeSC",
            "tx-direction": [{
                "port": {
                    "port-device-name": "SPDR-SC1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                },
                "index": 0
            }],
            "rx-direction": [{
                "port": {
                    "port-device-name": "SPDR-SC1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                },
                "index": 0
            }],
            "optic-type": "gray"
        },
        "due-date": "2018-06-15T00:00:01Z",
        "operator-contact": "pw1234"
    }

    # Payload template whose rpc-action is 'service-delete'. It is not used by
    # any test visible in this part of the file.
    # NOTE(review): "service-name" holds the placeholder "TBD"; presumably the
    # deletion tests overwrite it before sending - confirm against those tests.
    del_serv_input_data = {
        "sdnc-request-header": {
            "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
            "rpc-action": "service-delete",
            "request-system-id": "appname",
            "notification-url": "http://localhost:8585/NotificationServer/notify"},
        "service-delete-req-info": {
            "service-name": "TBD",
            "tail-retention": "no"}
    }
137
138     @classmethod
139     def setUpClass(cls):
140         cls.processes = test_utils.start_tpce()
141         cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
142                                                ('roadma', cls.NODE_VERSION),
143                                                ('roadmc', cls.NODE_VERSION),
144                                                ('spdrc', cls.NODE_VERSION)])
145
146     @classmethod
147     def tearDownClass(cls):
148         # pylint: disable=not-an-iterable
149         for process in cls.processes:
150             test_utils.shutdown_process(process)
151         print("all processes killed")
152
153     def setUp(self):
154         time.sleep(2)
155
156     def test_01_connect_spdrA(self):
157         response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
158         self.assertEqual(response.status_code,
159                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
160
161     def test_02_connect_spdrC(self):
162         response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
163         self.assertEqual(response.status_code,
164                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
165
166     def test_03_connect_rdmA(self):
167         response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
168         self.assertEqual(response.status_code,
169                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
170
171     def test_04_connect_rdmC(self):
172         response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
173         self.assertEqual(response.status_code,
174                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
175
176     def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
177         response = test_utils.transportpce_api_rpc_request(
178             'transportpce-networkutils', 'init-xpdr-rdm-links',
179             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
180                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
181         self.assertEqual(response['status_code'], requests.codes.ok)
182
183     def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
184         response = test_utils.transportpce_api_rpc_request(
185             'transportpce-networkutils', 'init-rdm-xpdr-links',
186             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
187                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
188         self.assertEqual(response['status_code'], requests.codes.ok)
189
190     def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
191         response = test_utils.transportpce_api_rpc_request(
192             'transportpce-networkutils', 'init-xpdr-rdm-links',
193             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
194                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
195         self.assertEqual(response['status_code'], requests.codes.ok)
196
197     def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
198         response = test_utils.transportpce_api_rpc_request(
199             'transportpce-networkutils', 'init-rdm-xpdr-links',
200             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
201                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
202         self.assertEqual(response['status_code'], requests.codes.ok)
203
204     def test_09_add_omsAttributes_ROADMA_ROADMC(self):
205         # Config ROADMA-ROADMC oms-attributes
206         data = {"span": {
207             "auto-spanloss": "true",
208             "spanloss-base": 11.4,
209             "spanloss-current": 12,
210             "engineered-spanloss": 12.2,
211             "link-concatenation": [{
212                 "SRLG-Id": 0,
213                 "fiber-type": "smf",
214                 "SRLG-length": 100000,
215                 "pmd": 0.5}]}}
216         response = test_utils.add_oms_attr_request(
217             "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
218         self.assertEqual(response.status_code, requests.codes.created)
219
220     def test_10_add_omsAttributes_ROADMC_ROADMA(self):
221         # Config ROADMC-ROADMA oms-attributes
222         data = {"span": {
223             "auto-spanloss": "true",
224             "spanloss-base": 11.4,
225             "spanloss-current": 12,
226             "engineered-spanloss": 12.2,
227             "link-concatenation": [{
228                 "SRLG-Id": 0,
229                 "fiber-type": "smf",
230                 "SRLG-length": 100000,
231                 "pmd": 0.5}]}}
232         response = test_utils.add_oms_attr_request(
233             "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
234         self.assertEqual(response.status_code, requests.codes.created)
235
236 # test service-create for OCH-OTU4 service from spdr to spdr
237     def test_11_check_otn_topology(self):
238         response = test_utils.get_ietf_network_request('otn-topology', 'config')
239         self.assertEqual(response['status_code'], requests.codes.ok)
240         self.assertEqual(len(response['network'][0]['node']), 6)
241         self.assertNotIn('ietf-network-topology:link', response['network'][0])
242
243     def test_12_create_OCH_OTU4_service(self):
244         response = test_utils.transportpce_api_rpc_request(
245             'org-openroadm-service', 'service-create',
246             self.cr_serv_input_data)
247         self.assertEqual(response['status_code'], requests.codes.ok)
248         time.sleep(self.WAITING)
249
250     def test_13_get_OCH_OTU4_service1(self):
251         response = test_utils.get_ordm_serv_list_attr_request(
252             "services", "service1-OCH-OTU4")
253         self.assertEqual(response['status_code'], requests.codes.ok)
254         self.assertEqual(
255             response['services'][0]['administrative-state'], 'inService')
256         self.assertEqual(
257             response['services'][0]['service-name'], 'service1-OCH-OTU4')
258         self.assertEqual(
259             response['services'][0]['connection-type'], 'infrastructure')
260         self.assertEqual(
261             response['services'][0]['lifecycle-state'], 'planned')
262
263     # Check correct configuration of devices
264     def test_14_check_interface_och_spdra(self):
265         response = test_utils.check_node_attribute_request(
266             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-761:768')
267         self.assertEqual(response['status_code'], requests.codes.ok)
268         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
269                                    'administrative-state': 'inService',
270                                    'supporting-circuit-pack-name': 'CP1-CFP0',
271                                    'type': 'org-openroadm-interfaces:opticalChannel',
272                                    'supporting-port': 'CP1-CFP0-P1'
273                                    }, **response['interface'][0]),
274                              response['interface'][0])
275         self.assertEqual('org-openroadm-common-types:R100G',
276                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
277         self.assertEqual('dp-qpsk',
278                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
279         self.assertEqual(196.1,
280                          float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
281         self.assertEqual(
282             -5,
283             float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
284
285     def test_15_check_interface_OTU4_spdra(self):
286         response = test_utils.check_node_attribute_request(
287             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
288         self.assertEqual(response['status_code'], requests.codes.ok)
289         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
290                         'administrative-state': 'inService',
291                         'supporting-circuit-pack-name': 'CP1-CFP0',
292                         'supporting-interface': 'XPDR1-NETWORK1-761:768',
293                         'type': 'org-openroadm-interfaces:otnOtu',
294                         'supporting-port': 'CP1-CFP0-P1'
295                         }
296         input_dict_2 = {'tx-sapi': 'H/OelLynehI=',
297                         'expected-dapi': 'H/OelLynehI=',
298                         'tx-dapi': 'AMf1n5hK6Xkk',
299                         'expected-sapi': 'AMf1n5hK6Xkk',
300                         'rate': 'org-openroadm-otn-common-types:OTU4',
301                         'fec': 'scfec'
302                         }
303         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
304                              response['interface'][0])
305         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
306                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
307
308     def test_16_check_interface_och_spdrc(self):
309         response = test_utils.check_node_attribute_request(
310             'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-761:768')
311         self.assertEqual(response['status_code'], requests.codes.ok)
312         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
313                                    'administrative-state': 'inService',
314                                    'supporting-circuit-pack-name': 'CP1-CFP0',
315                                    'type': 'org-openroadm-interfaces:opticalChannel',
316                                    'supporting-port': 'CP1-CFP0-P1'
317                                    }, **response['interface'][0]),
318                              response['interface'][0])
319         self.assertEqual('org-openroadm-common-types:R100G',
320                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
321         self.assertEqual('dp-qpsk',
322                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
323         self.assertEqual(196.1,
324                          float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
325         self.assertEqual(
326             -5,
327             float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
328
329     def test_17_check_interface_OTU4_spdrc(self):
330         response = test_utils.check_node_attribute_request(
331             'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-OTU')
332         self.assertEqual(response['status_code'], requests.codes.ok)
333         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
334                         'administrative-state': 'inService',
335                         'supporting-circuit-pack-name': 'CP1-CFP0',
336                         'supporting-interface': 'XPDR1-NETWORK1-761:768',
337                         'type': 'org-openroadm-interfaces:otnOtu',
338                         'supporting-port': 'CP1-CFP0-P1'
339                         }
340         input_dict_2 = {'tx-dapi': 'H/OelLynehI=',
341                         'expected-sapi': 'H/OelLynehI=',
342                         'tx-sapi': 'AMf1n5hK6Xkk',
343                         'expected-dapi': 'AMf1n5hK6Xkk',
344                         'rate': 'org-openroadm-otn-common-types:OTU4',
345                         'fec': 'scfec'
346                         }
347         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
348                              response['interface'][0])
349         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
350                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
351
352     def test_18_check_no_interface_ODU4_spdra(self):
353         response = test_utils.check_node_attribute_request(
354             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
355         self.assertEqual(response['status_code'], requests.codes.conflict)
356
357     def test_19_check_openroadm_topo_spdra(self):
358         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR1', 'config')
359         self.assertEqual(response['status_code'], requests.codes.ok)
360         ele = response['node']['ietf-network-topology:termination-point'][0]
361         self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
362         self.assertEqual(
363             196.1,
364             float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
365         self.assertEqual(
366             40,
367             float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
368
369     def test_20_check_openroadm_topo_ROADMA_SRG(self):
370         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
371         self.assertEqual(response['status_code'], requests.codes.ok)
372         freq_map = base64.b64decode(
373             response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
374         freq_map_array = [int(x) for x in freq_map]
375         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
376         liste_tp = response['node']['ietf-network-topology:termination-point']
377         for ele in liste_tp:
378             if ele['tp-id'] == 'SRG1-PP1-TXRX':
379                 freq_map = base64.b64decode(
380                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
381                 freq_map_array = [int(x) for x in freq_map]
382                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
383             if ele['tp-id'] == 'SRG1-PP2-TXRX':
384                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
385
386     def test_21_check_openroadm_topo_ROADMA_DEG(self):
387         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
388         self.assertEqual(response['status_code'], requests.codes.ok)
389         freq_map = base64.b64decode(
390             response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
391         freq_map_array = [int(x) for x in freq_map]
392         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
393         liste_tp = response['node']['ietf-network-topology:termination-point']
394         for ele in liste_tp:
395             if ele['tp-id'] == 'DEG2-CTP-TXRX':
396                 freq_map = base64.b64decode(
397                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
398                 freq_map_array = [int(x) for x in freq_map]
399                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
400             if ele['tp-id'] == 'DEG2-TTP-TXRX':
401                 freq_map = base64.b64decode(
402                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
403                 freq_map_array = [int(x) for x in freq_map]
404                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
405
406     def test_22_check_otn_topo_otu4_links(self):
407         response = test_utils.get_ietf_network_request('otn-topology', 'config')
408         self.assertEqual(response['status_code'], requests.codes.ok)
409         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 2)
410         listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
411                       'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
412         for link in response['network'][0]['ietf-network-topology:link']:
413             self.assertIn(link['link-id'], listLinkId)
414             self.assertEqual(
415                 link['transportpce-networkutils:otn-link-type'], 'OTU4')
416             self.assertEqual(
417                 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
418             self.assertEqual(
419                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
420             self.assertEqual(
421                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
422             self.assertIn(
423                 link['org-openroadm-common-network:opposite-link'], listLinkId)
424
425 # test service-create for ODU4 service from spdr to spdr
426     def test_23_create_ODU4_service(self):
427         self.cr_serv_input_data["service-name"] = "service1-ODU4"
428         self.cr_serv_input_data["service-a-end"]["service-format"] = "ODU"
429         del self.cr_serv_input_data["service-a-end"]["otu-service-rate"]
430         self.cr_serv_input_data["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
431         self.cr_serv_input_data["service-z-end"]["service-format"] = "ODU"
432         del self.cr_serv_input_data["service-z-end"]["otu-service-rate"]
433         self.cr_serv_input_data["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
434
435         response = test_utils.transportpce_api_rpc_request(
436             'org-openroadm-service', 'service-create',
437             self.cr_serv_input_data)
438         self.assertEqual(response['status_code'], requests.codes.ok)
439         time.sleep(self.WAITING)
440
441     def test_24_get_ODU4_service1(self):
442         response = test_utils.get_ordm_serv_list_attr_request(
443             "services", "service1-ODU4")
444         self.assertEqual(response['status_code'], requests.codes.ok)
445         self.assertEqual(
446             response['services'][0]['administrative-state'], 'inService')
447         self.assertEqual(
448             response['services'][0]['service-name'], 'service1-ODU4')
449         self.assertEqual(
450             response['services'][0]['connection-type'], 'infrastructure')
451         self.assertEqual(
452             response['services'][0]['lifecycle-state'], 'planned')
453
454     def test_25_check_interface_ODU4_spdra(self):
455         response = test_utils.check_node_attribute_request(
456             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
457         self.assertEqual(response['status_code'], requests.codes.ok)
458         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
459                         'administrative-state': 'inService',
460                         'supporting-circuit-pack-name': 'CP1-CFP0',
461                         'supporting-interface': 'XPDR1-NETWORK1-OTU',
462                         'type': 'org-openroadm-interfaces:otnOdu',
463                         'supporting-port': 'CP1-CFP0-P1',
464                         'circuit-id': 'TBD',
465                         'description': 'TBD'}
466         # SAPI/DAPI are added in the Otu4 renderer
467         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
468                         'rate': 'org-openroadm-otn-common-types:ODU4',
469                         'monitoring-mode': 'terminated',
470                         'expected-dapi': 'H/OelLynehI=',
471                         'expected-sapi': 'AMf1n5hK6Xkk',
472                         'tx-dapi': 'AMf1n5hK6Xkk',
473                         'tx-sapi': 'H/OelLynehI='}
474
475         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
476                              response['interface'][0])
477         self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
478                                   **input_dict_2),
479                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
480                              )
481         self.assertDictEqual(
482             {'payload-type': '21', 'exp-payload-type': '21'},
483             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
484
485     def test_26_check_interface_ODU4_spdrc(self):
486         response = test_utils.check_node_attribute_request(
487             'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU4')
488         self.assertEqual(response['status_code'], requests.codes.ok)
489         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
490                         'administrative-state': 'inService',
491                         'supporting-circuit-pack-name': 'CP1-CFP0',
492                         'supporting-interface': 'XPDR1-NETWORK1-OTU',
493                         'type': 'org-openroadm-interfaces:otnOdu',
494                         'supporting-port': 'CP1-CFP0-P1',
495                         'circuit-id': 'TBD',
496                         'description': 'TBD'}
497         # SAPI/DAPI are added in the Otu4 renderer
498         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
499                         'rate': 'org-openroadm-otn-common-types:ODU4',
500                         'monitoring-mode': 'terminated',
501                         'tx-sapi': 'AMf1n5hK6Xkk',
502                         'tx-dapi': 'H/OelLynehI=',
503                         'expected-sapi': 'H/OelLynehI=',
504                         'expected-dapi': 'AMf1n5hK6Xkk'
505                         }
506         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
507                              response['interface'][0])
508         self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
509                                   **input_dict_2),
510                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
511                              )
512         self.assertDictEqual(
513             {'payload-type': '21', 'exp-payload-type': '21'},
514             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
515
516     def test_27_check_otn_topo_links(self):
517         response = test_utils.get_ietf_network_request('otn-topology', 'config')
518         self.assertEqual(response['status_code'], requests.codes.ok)
519         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
520         for link in response['network'][0]['ietf-network-topology:link']:
521             linkId = link['link-id']
522             if (linkId in ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
523                            'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
524                 self.assertEqual(
525                     link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
526                 self.assertEqual(
527                     link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
528             elif (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
529                              'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
530                 self.assertEqual(
531                     link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
532                 self.assertEqual(
533                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
534                 self.assertEqual(
535                     link['transportpce-networkutils:otn-link-type'], 'ODTU4')
536                 self.assertEqual(
537                     link['org-openroadm-common-network:link-type'], 'OTN-LINK')
538                 self.assertIn(link['org-openroadm-common-network:opposite-link'],
539                               ['ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
540                                'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
541             else:
542                 self.fail("this link should not exist")
543
544     def test_28_check_otn_topo_tp(self):
545         response = test_utils.get_ietf_network_request('otn-topology', 'config')
546         self.assertEqual(response['status_code'], requests.codes.ok)
547         for node in response['network'][0]['node']:
548             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
549                 tpList = node['ietf-network-topology:termination-point']
550                 for tp in tpList:
551                     if tp['tp-id'] == 'XPDR1-NETWORK1':
552                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
553                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
554                         self.assertEqual(
555                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
556                         self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
557                                          'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
558
559 # test service-create for 10GE service from spdr to spdr
560     def test_29_create_10GE_service(self):
561         self.cr_serv_input_data["service-name"] = "service1-10GE"
562         self.cr_serv_input_data["connection-type"] = "service"
563         self.cr_serv_input_data["service-a-end"]["service-rate"] = "10"
564         self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
565         del self.cr_serv_input_data["service-a-end"]["odu-service-rate"]
566         self.cr_serv_input_data["service-a-end"]["ethernet-encoding"] = "10GBASE-R"
567         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
568         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
569         self.cr_serv_input_data["service-z-end"]["service-rate"] = "10"
570         self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
571         del self.cr_serv_input_data["service-z-end"]["odu-service-rate"]
572         self.cr_serv_input_data["service-z-end"]["ethernet-encoding"] = "10GBASE-R"
573         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
574         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
575         response = test_utils.transportpce_api_rpc_request(
576             'org-openroadm-service', 'service-create',
577             self.cr_serv_input_data)
578         self.assertEqual(response['status_code'], requests.codes.ok)
579         time.sleep(self.WAITING)
580
581     def test_30_get_10GE_service1(self):
582         response = test_utils.get_ordm_serv_list_attr_request(
583             "services", "service1-10GE")
584         self.assertEqual(response['status_code'], requests.codes.ok)
585         self.assertEqual(
586             response['services'][0]['administrative-state'], 'inService')
587         self.assertEqual(
588             response['services'][0]['service-name'], 'service1-10GE')
589         self.assertEqual(
590             response['services'][0]['connection-type'], 'service')
591         self.assertEqual(
592             response['services'][0]['lifecycle-state'], 'planned')
593
594     def test_31_check_interface_10GE_CLIENT_spdra(self):
595         response = test_utils.check_node_attribute_request(
596             'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
597         self.assertEqual(response['status_code'], requests.codes.ok)
598         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
599                       'administrative-state': 'inService',
600                       'supporting-circuit-pack-name': 'CP1-SFP4',
601                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
602                       'supporting-port': 'CP1-SFP4-P1'
603                       }
604         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
605                              response['interface'][0])
606         self.assertEqual(
607             10000,
608             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
609
610     def test_32_check_interface_ODU2E_CLIENT_spdra(self):
611         response = test_utils.check_node_attribute_request(
612             'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e:service1-10GE')
613         self.assertEqual(response['status_code'], requests.codes.ok)
614         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e:service1-10GE',
615                         'administrative-state': 'inService',
616                         'supporting-circuit-pack-name': 'CP1-SFP4',
617                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
618                         'type': 'org-openroadm-interfaces:otnOdu',
619                         'supporting-port': 'CP1-SFP4-P1'}
620         input_dict_2 = {
621             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
622             'rate': 'org-openroadm-otn-common-types:ODU2e',
623             'monitoring-mode': 'terminated'}
624
625         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
626                              response['interface'][0])
627         self.assertDictEqual(dict(input_dict_2,
628                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
629                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
630         self.assertDictEqual(
631             {'payload-type': '03', 'exp-payload-type': '03'},
632             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
633
634     def test_33_check_interface_ODU2E_NETWORK_spdra(self):
635         response = test_utils.check_node_attribute_request(
636             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e:service1-10GE')
637         self.assertEqual(response['status_code'], requests.codes.ok)
638         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e:service1-10GE',
639                         'administrative-state': 'inService',
640                         'supporting-circuit-pack-name': 'CP1-CFP0',
641                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
642                         'type': 'org-openroadm-interfaces:otnOdu',
643                         'supporting-port': 'CP1-CFP0-P1'}
644         input_dict_2 = {
645             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
646             'rate': 'org-openroadm-otn-common-types:ODU2e',
647             'monitoring-mode': 'monitored'}
648         input_dict_3 = {'trib-port-number': 1}
649
650         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
651                              response['interface'][0])
652         self.assertDictEqual(dict(input_dict_2,
653                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
654                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
655         self.assertDictEqual(dict(input_dict_3,
656                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
657                                       'parent-odu-allocation']),
658                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
659         self.assertIn(1, response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
660                       ['trib-slots'])
661
662     def test_34_check_ODU2E_connection_spdra(self):
663         response = test_utils.check_node_attribute_request(
664             'SPDR-SA1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
665         self.assertEqual(response['status_code'], requests.codes.ok)
666         input_dict_1 = {
667             'connection-name':
668             'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
669             'direction': 'bidirectional'
670         }
671
672         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
673                              response['odu-connection'][0])
674         self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e:service1-10GE'},
675                              response['odu-connection'][0]['destination'])
676         self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e:service1-10GE'},
677                              response['odu-connection'][0]['source'])
678
679     def test_35_check_interface_10GE_CLIENT_spdrc(self):
680         response = test_utils.check_node_attribute_request(
681             'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
682         self.assertEqual(response['status_code'], requests.codes.ok)
683         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
684                       'administrative-state': 'inService',
685                       'supporting-circuit-pack-name': 'CP1-SFP4',
686                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
687                       'supporting-port': 'CP1-SFP4-P1'
688                       }
689         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
690                              response['interface'][0])
691         self.assertEqual(
692             10000,
693             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
694
695     def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
696         response = test_utils.check_node_attribute_request(
697             'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ODU2e:service1-10GE')
698         self.assertEqual(response['status_code'], requests.codes.ok)
699         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e:service1-10GE',
700                         'administrative-state': 'inService',
701                         'supporting-circuit-pack-name': 'CP1-SFP4',
702                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
703                         'type': 'org-openroadm-interfaces:otnOdu',
704                         'supporting-port': 'CP1-SFP4-P1'}
705         input_dict_2 = {
706             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
707             'rate': 'org-openroadm-otn-common-types:ODU2e',
708             'monitoring-mode': 'terminated'}
709
710         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
711                              response['interface'][0])
712         self.assertDictEqual(dict(input_dict_2,
713                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
714                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
715         self.assertDictEqual(
716             {'payload-type': '03', 'exp-payload-type': '03'},
717             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
718
719     def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
720         response = test_utils.check_node_attribute_request(
721             'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU2e:service1-10GE')
722         self.assertEqual(response['status_code'], requests.codes.ok)
723         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e:service1-10GE',
724                         'administrative-state': 'inService',
725                         'supporting-circuit-pack-name': 'CP1-CFP0',
726                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
727                         'type': 'org-openroadm-interfaces:otnOdu',
728                         'supporting-port': 'CP1-CFP0-P1'}
729         input_dict_2 = {
730             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
731             'rate': 'org-openroadm-otn-common-types:ODU2e',
732             'monitoring-mode': 'monitored'}
733
734         input_dict_3 = {'trib-port-number': 1}
735
736         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
737                              response['interface'][0])
738         self.assertDictEqual(dict(input_dict_2,
739                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
740                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
741         self.assertDictEqual(dict(input_dict_3,
742                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
743                                       'parent-odu-allocation']),
744                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
745             'parent-odu-allocation'])
746         self.assertIn(1,
747                       response['interface'][0][
748                           'org-openroadm-otn-odu-interfaces:odu'][
749                           'parent-odu-allocation']['trib-slots'])
750
751     def test_38_check_ODU2E_connection_spdrc(self):
752         response = test_utils.check_node_attribute_request(
753             'SPDR-SC1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
754         self.assertEqual(response['status_code'], requests.codes.ok)
755         input_dict_1 = {
756             'connection-name':
757             'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
758             'direction': 'bidirectional'
759         }
760
761         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
762                              response['odu-connection'][0])
763         self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e:service1-10GE'},
764                              response['odu-connection'][0]['destination'])
765         self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e:service1-10GE'},
766                              response['odu-connection'][0]['source'])
767
768     def test_39_check_otn_topo_links(self):
769         response = test_utils.get_ietf_network_request('otn-topology', 'config')
770         self.assertEqual(response['status_code'], requests.codes.ok)
771         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
772         for link in response['network'][0]['ietf-network-topology:link']:
773             linkId = link['link-id']
774             if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
775                            'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
776                 self.assertEqual(
777                     link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
778                 self.assertEqual(
779                     link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
780
781     def test_40_check_otn_topo_tp(self):
782         response = test_utils.get_ietf_network_request('otn-topology', 'config')
783         self.assertEqual(response['status_code'], requests.codes.ok)
784         for node in response['network'][0]['node']:
785             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
786                 tpList = node['ietf-network-topology:termination-point']
787                 for tp in tpList:
788                     if tp['tp-id'] == 'XPDR1-NETWORK1':
789                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
790                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
791                         tsPoolList = list(range(1, 9))
792                         self.assertNotIn(
793                             tsPoolList, xpdrTpPortConAt['ts-pool'])
794                         self.assertEqual(
795                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
796                         self.assertNotIn(
797                             1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
798
799     def test_41_delete_10GE_service(self):
800         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-10GE"
801         response = test_utils.transportpce_api_rpc_request(
802             'org-openroadm-service', 'service-delete',
803             self.del_serv_input_data)
804         self.assertEqual(response['status_code'], requests.codes.ok)
805         time.sleep(self.WAITING)
806
807     def test_42_check_service_list(self):
808         response = test_utils.get_ordm_serv_list_request()
809         self.assertEqual(response['status_code'], requests.codes.ok)
810         self.assertEqual(len(response['service-list']['services']), 2)
811
812     def test_43_check_no_ODU2e_connection_spdra(self):
813         response = test_utils.check_node_request("SPDR-SA1")
814         self.assertEqual(response['status_code'], requests.codes.ok)
815         self.assertNotIn(['odu-connection'][0], response['org-openroadm-device'])
816
817     def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
818         response = test_utils.check_node_attribute_request(
819             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e-service1')
820         self.assertEqual(response['status_code'], requests.codes.conflict)
821
822     def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
823         response = test_utils.check_node_attribute_request(
824             'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e-service1')
825         self.assertEqual(response['status_code'], requests.codes.conflict)
826
827     def test_46_check_no_interface_10GE_CLIENT_spdra(self):
828         response = test_utils.check_node_attribute_request(
829             'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
830         self.assertEqual(response['status_code'], requests.codes.conflict)
831
832     def test_47_check_otn_topo_links(self):
833         response = test_utils.get_ietf_network_request('otn-topology', 'config')
834         self.assertEqual(response['status_code'], requests.codes.ok)
835         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
836         for link in response['network'][0]['ietf-network-topology:link']:
837             linkId = link['link-id']
838             if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
839                            'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
840                 self.assertEqual(
841                     link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
842                 self.assertEqual(
843                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
844
845     def test_48_check_otn_topo_tp(self):
846         response = test_utils.get_ietf_network_request('otn-topology', 'config')
847         self.assertEqual(response['status_code'], requests.codes.ok)
848         for node in response['network'][0]['node']:
849             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
850                 tpList = node['ietf-network-topology:termination-point']
851                 for tp in tpList:
852                     if tp['tp-id'] == 'XPDR1-NETWORK1':
853                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
854                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
855                         self.assertEqual(
856                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
857
858     def test_49_delete_ODU4_service(self):
859         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-ODU4"
860         response = test_utils.transportpce_api_rpc_request(
861             'org-openroadm-service', 'service-delete',
862             self.del_serv_input_data)
863         self.assertEqual(response['status_code'], requests.codes.ok)
864         time.sleep(self.WAITING)
865
866     def test_50_check_service_list(self):
867         response = test_utils.get_ordm_serv_list_request()
868         self.assertEqual(response['status_code'], requests.codes.ok)
869         self.assertEqual(len(response['service-list']['services']), 1)
870
871     def test_51_check_no_interface_ODU4_spdra(self):
872         response = test_utils.check_node_attribute_request(
873             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
874         self.assertEqual(response['status_code'], requests.codes.conflict)
875
876     def test_52_check_otn_topo_links(self):
877         self.test_22_check_otn_topo_otu4_links()
878
879     def test_53_check_otn_topo_tp(self):
880         response = test_utils.get_ietf_network_request('otn-topology', 'config')
881         self.assertEqual(response['status_code'], requests.codes.ok)
882         for node in response['network'][0]['node']:
883             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
884                 tpList = node['ietf-network-topology:termination-point']
885                 for tp in tpList:
886                     if tp['tp-id'] == 'XPDR1-NETWORK1':
887                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
888                         self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
889                         self.assertNotIn(
890                             'odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
891
892     def test_54_delete_OCH_OTU4_service(self):
893         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-OCH-OTU4"
894         response = test_utils.transportpce_api_rpc_request(
895             'org-openroadm-service', 'service-delete',
896             self.del_serv_input_data)
897         self.assertEqual(response['status_code'], requests.codes.ok)
898         time.sleep(self.WAITING)
899
900     def test_55_get_no_service(self):
901         response = test_utils.get_ordm_serv_list_request()
902         self.assertEqual(response['status_code'], requests.codes.conflict)
903
904     def test_56_check_no_interface_OTU4_spdra(self):
905         response = test_utils.check_node_attribute_request(
906             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
907         self.assertEqual(response['status_code'], requests.codes.conflict)
908
909     def test_57_check_no_interface_OCH_spdra(self):
910         response = test_utils.check_node_attribute_request(
911             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-1')
912         self.assertEqual(response['status_code'], requests.codes.conflict)
913
914     def test_58_getLinks_OtnTopology(self):
915         response = test_utils.get_ietf_network_request('otn-topology', 'config')
916         self.assertEqual(response['status_code'], requests.codes.ok)
917         self.assertNotIn('ietf-network-topology:link', response['network'][0])
918
919     def test_59_check_openroadm_topo_spdra(self):
920         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR1', 'config')
921         self.assertEqual(response['status_code'], requests.codes.ok)
922         tp = response['node']['ietf-network-topology:termination-point'][0]
923         self.assertEqual('XPDR1-NETWORK1', tp['tp-id'])
924         self.assertNotIn('wavelength', dict.keys(
925             tp['org-openroadm-network-topology:xpdr-network-attributes']))
926
927     def test_60_check_openroadm_topo_ROADMA_SRG(self):
928         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
929         self.assertEqual(response['status_code'], requests.codes.ok)
930         freq_map = base64.b64decode(
931             response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
932         freq_map_array = [int(x) for x in freq_map]
933         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
934         liste_tp = response['node']['ietf-network-topology:termination-point']
935         for ele in liste_tp:
936             if ele['tp-id'] == 'SRG1-PP1-TXRX':
937                 freq_map = base64.b64decode(
938                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
939                 freq_map_array = [int(x) for x in freq_map]
940                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
941         time.sleep(3)
942
943     def test_61_check_openroadm_topo_ROADMA_DEG(self):
944         response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
945         self.assertEqual(response['status_code'], requests.codes.ok)
946         freq_map = base64.b64decode(
947             response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
948         freq_map_array = [int(x) for x in freq_map]
949         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
950         liste_tp = response['node']['ietf-network-topology:termination-point']
951         for ele in liste_tp:
952             if ele['tp-id'] == 'DEG2-CTP-TXRX':
953                 freq_map = base64.b64decode(
954                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
955                 freq_map_array = [int(x) for x in freq_map]
956                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
957             if ele['tp-id'] == 'DEG2-TTP-TXRX':
958                 freq_map = base64.b64decode(
959                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
960                 freq_map_array = [int(x) for x in freq_map]
961                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
962         time.sleep(3)
963
964     def test_62_connect_sprdA_3_N1_to_roadmA_PP2(self):
965         response = test_utils.transportpce_api_rpc_request(
966             'transportpce-networkutils', 'init-xpdr-rdm-links',
967             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '3', 'network-num': '1',
968                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
969         self.assertEqual(response['status_code'], requests.codes.ok)
970
971     def test_63_connect_roadmA_PP2_to_spdrA_3_N1(self):
972         response = test_utils.transportpce_api_rpc_request(
973             'transportpce-networkutils', 'init-rdm-xpdr-links',
974             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '3', 'network-num': '1',
975                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
976         self.assertEqual(response['status_code'], requests.codes.ok)
977
978     def test_64_connect_sprdC_3_N1_to_roadmC_PP2(self):
979         response = test_utils.transportpce_api_rpc_request(
980             'transportpce-networkutils', 'init-xpdr-rdm-links',
981             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '3', 'network-num': '1',
982                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
983         self.assertEqual(response['status_code'], requests.codes.ok)
984
985     def test_65_connect_roadmC_PP2_to_spdrC_3_N1(self):
986         response = test_utils.transportpce_api_rpc_request(
987             'transportpce-networkutils', 'init-rdm-xpdr-links',
988             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '3', 'network-num': '1',
989                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
990         self.assertEqual(response['status_code'], requests.codes.ok)
991
992     def test_66_create_OCH_OTU4_service_2(self):
993         # pylint: disable=line-too-long
994         self.cr_serv_input_data["service-name"] = "service2-OCH-OTU4"
995         self.cr_serv_input_data["connection-type"] = "infrastructure"
996         self.cr_serv_input_data["service-a-end"]["service-rate"] = "100"
997         self.cr_serv_input_data["service-a-end"]["service-format"] = "OTU"
998         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
999         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1000         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1001         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1002         self.cr_serv_input_data["service-a-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1003         self.cr_serv_input_data["service-z-end"]["service-rate"] = "100"
1004         self.cr_serv_input_data["service-z-end"]["service-format"] = "OTU"
1005         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1006         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1007         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1008         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1009         self.cr_serv_input_data["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1010         response = test_utils.transportpce_api_rpc_request(
1011             'org-openroadm-service', 'service-create',
1012             self.cr_serv_input_data)
1013         self.assertEqual(response['status_code'], requests.codes.ok)
1014         time.sleep(self.WAITING)
1015
1016     def test_67_get_OCH_OTU4_service2(self):
1017         response = test_utils.get_ordm_serv_list_attr_request(
1018             "services", "service2-OCH-OTU4")
1019         self.assertEqual(response['status_code'], requests.codes.ok)
1020         self.assertEqual(
1021             response['services'][0]['administrative-state'], 'inService')
1022         self.assertEqual(
1023             response['services'][0]['service-name'], 'service2-OCH-OTU4')
1024         self.assertEqual(
1025             response['services'][0]['connection-type'], 'infrastructure')
1026         self.assertEqual(
1027             response['services'][0]['lifecycle-state'], 'planned')
1028
1029     def test_68_create_ODU4_service_2(self):
1030         # pylint: disable=line-too-long
1031         self.cr_serv_input_data["service-name"] = "service2-ODU4"
1032         self.cr_serv_input_data["service-a-end"]["service-format"] = "ODU"
1033         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1034         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1035         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1036         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1037         self.cr_serv_input_data["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1038         del self.cr_serv_input_data["service-a-end"]["otu-service-rate"]
1039         self.cr_serv_input_data["service-z-end"]["service-format"] = "ODU"
1040         self.cr_serv_input_data["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1041         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1042         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1043         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1044         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1045         del self.cr_serv_input_data["service-z-end"]["otu-service-rate"]
1046         response = test_utils.transportpce_api_rpc_request(
1047             'org-openroadm-service', 'service-create',
1048             self.cr_serv_input_data)
1049         self.assertEqual(response['status_code'], requests.codes.ok)
1050         time.sleep(self.WAITING)
1051
1052     def test_69_get_ODU4_service2(self):
1053         response = test_utils.get_ordm_serv_list_attr_request(
1054             "services", "service2-ODU4")
1055         self.assertEqual(response['status_code'], requests.codes.ok)
1056         self.assertEqual(
1057             response['services'][0]['administrative-state'], 'inService')
1058         self.assertEqual(
1059             response['services'][0]['service-name'], 'service2-ODU4')
1060         self.assertEqual(
1061             response['services'][0]['connection-type'], 'infrastructure')
1062         self.assertEqual(
1063             response['services'][0]['lifecycle-state'], 'planned')
1064
1065     def test_70_create_1GE_service(self):
1066         # pylint: disable=line-too-long
1067         self.cr_serv_input_data["service-name"] = "service1-1GE"
1068         self.cr_serv_input_data["connection-type"] = "service"
1069         self.cr_serv_input_data["service-a-end"]["service-rate"] = "1"
1070         self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
1071         self.cr_serv_input_data["service-z-end"]["service-rate"] = "1"
1072         self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
1073         del self.cr_serv_input_data["service-a-end"]["odu-service-rate"]
1074         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1075         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1076         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1077         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1078         del self.cr_serv_input_data["service-z-end"]["odu-service-rate"]
1079         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1080         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1081         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1082         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1083         response = test_utils.transportpce_api_rpc_request(
1084             'org-openroadm-service', 'service-create',
1085             self.cr_serv_input_data)
1086         self.assertEqual(response['status_code'], requests.codes.ok)
1087         time.sleep(self.WAITING)
1088
1089     def test_71_get_1GE_service1(self):
1090         response = test_utils.get_ordm_serv_list_attr_request(
1091             "services", "service1-1GE")
1092         self.assertEqual(response['status_code'], requests.codes.ok)
1093         self.assertEqual(
1094             response['services'][0]['administrative-state'], 'inService')
1095         self.assertEqual(
1096             response['services'][0]['service-name'], 'service1-1GE')
1097         self.assertEqual(
1098             response['services'][0]['connection-type'], 'service')
1099         self.assertEqual(
1100             response['services'][0]['lifecycle-state'], 'planned')
1101
1102     def test_72_check_interface_1GE_CLIENT_spdra(self):
1103         response = test_utils.check_node_attribute_request(
1104             'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1105         self.assertEqual(response['status_code'], requests.codes.ok)
1106         input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1107                       'administrative-state': 'inService',
1108                       'supporting-circuit-pack-name': 'CP1-SFP4',
1109                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
1110                       'supporting-port': 'CP1-SFP4-P1'
1111                       }
1112         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
1113                              response['interface'][0])
1114         self.assertEqual(
1115             1000,
1116             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
1117
1118     def test_73_check_interface_ODU0_CLIENT_spdra(self):
1119         response = test_utils.check_node_attribute_request(
1120             'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0:service1-1GE')
1121         self.assertEqual(response['status_code'], requests.codes.ok)
1122         input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0:service1-1GE',
1123                         'administrative-state': 'inService',
1124                         'supporting-circuit-pack-name': 'CP3-SFP1',
1125                         'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1126                         'type': 'org-openroadm-interfaces:otnOdu',
1127                         'supporting-port': 'CP3-SFP1-P1'}
1128         input_dict_2 = {
1129             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1130             'rate': 'org-openroadm-otn-common-types:ODU0',
1131             'monitoring-mode': 'terminated'}
1132         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1133                              response['interface'][0])
1134         self.assertDictEqual(dict(input_dict_2,
1135                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1136                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1137         self.assertDictEqual(
1138             {'payload-type': '07', 'exp-payload-type': '07'},
1139             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1140
1141     def test_74_check_interface_ODU0_NETWORK_spdra(self):
1142         response = test_utils.check_node_attribute_request(
1143             'SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0:service1-1GE')
1144         self.assertEqual(response['status_code'], requests.codes.ok)
1145         input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0:service1-1GE',
1146                         'administrative-state': 'inService',
1147                         'supporting-circuit-pack-name': 'CP3-CFP0',
1148                         'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1149                         'type': 'org-openroadm-interfaces:otnOdu',
1150                         'supporting-port': 'CP3-CFP0-P1'}
1151         input_dict_2 = {
1152             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1153             'rate': 'org-openroadm-otn-common-types:ODU0',
1154             'monitoring-mode': 'monitored'}
1155         input_dict_3 = {'trib-port-number': 1}
1156         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1157                              response['interface'][0])
1158         self.assertDictEqual(dict(input_dict_2,
1159                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1160                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1161         self.assertDictEqual(dict(input_dict_3,
1162                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1163                                       'parent-odu-allocation']),
1164                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
1165         self.assertIn(1, response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1166                       ['trib-slots'])
1167
1168     def test_75_check_ODU0_connection_spdra(self):
1169         response = test_utils.check_node_attribute_request(
1170             'SPDR-SA1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
1171         self.assertEqual(response['status_code'], requests.codes.ok)
1172         input_dict_1 = {
1173             'connection-name':
1174             'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
1175             'direction': 'bidirectional'
1176         }
1177         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1178                              response['odu-connection'][0])
1179         self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0:service1-1GE'},
1180                              response['odu-connection'][0]['destination'])
1181         self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0:service1-1GE'},
1182                              response['odu-connection'][0]['source'])
1183
1184     def test_76_check_interface_1GE_CLIENT_spdrc(self):
1185         response = test_utils.check_node_attribute_request(
1186             'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1187         self.assertEqual(response['status_code'], requests.codes.ok)
1188         input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1189                       'administrative-state': 'inService',
1190                       'supporting-circuit-pack-name': 'CP3-SFP1',
1191                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
1192                       'supporting-port': 'CP3-SFP1-P1'
1193                       }
1194         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
1195                              response['interface'][0])
1196         self.assertEqual(
1197             1000,
1198             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
1199
1200     def test_77_check_interface_ODU0_CLIENT_spdrc(self):
1201         response = test_utils.check_node_attribute_request(
1202             'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ODU0:service1-1GE')
1203         self.assertEqual(response['status_code'], requests.codes.ok)
1204         input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0:service1-1GE',
1205                         'administrative-state': 'inService',
1206                         'supporting-circuit-pack-name': 'CP3-SFP1',
1207                         'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1208                         'type': 'org-openroadm-interfaces:otnOdu',
1209                         'supporting-port': 'CP3-SFP1-P1'}
1210         input_dict_2 = {
1211             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1212             'rate': 'org-openroadm-otn-common-types:ODU0',
1213             'monitoring-mode': 'terminated'}
1214         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1215                              response['interface'][0])
1216         self.assertDictEqual(dict(input_dict_2,
1217                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1218                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1219         self.assertDictEqual(
1220             {'payload-type': '07', 'exp-payload-type': '07'},
1221             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1222
1223     def test_78_check_interface_ODU0_NETWORK_spdrc(self):
1224         response = test_utils.check_node_attribute_request(
1225             'SPDR-SC1', 'interface', 'XPDR3-NETWORK1-ODU0:service1-1GE')
1226         self.assertEqual(response['status_code'], requests.codes.ok)
1227         input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0:service1-1GE',
1228                         'administrative-state': 'inService',
1229                         'supporting-circuit-pack-name': 'CP3-CFP0',
1230                         'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1231                         'type': 'org-openroadm-interfaces:otnOdu',
1232                         'supporting-port': 'CP3-CFP0-P1'}
1233         input_dict_2 = {
1234             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1235             'rate': 'org-openroadm-otn-common-types:ODU0',
1236             'monitoring-mode': 'monitored'}
1237         input_dict_3 = {'trib-port-number': 1}
1238         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1239                              response['interface'][0])
1240         self.assertDictEqual(dict(input_dict_2,
1241                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1242                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1243         self.assertDictEqual(dict(input_dict_3,
1244                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1245                                       'parent-odu-allocation']),
1246                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1247             'parent-odu-allocation'])
1248         self.assertIn(1,
1249                       response['interface'][0][
1250                           'org-openroadm-otn-odu-interfaces:odu'][
1251                           'parent-odu-allocation']['trib-slots'])
1252
1253     def test_79_check_ODU0_connection_spdrc(self):
1254         response = test_utils.check_node_attribute_request(
1255             'SPDR-SC1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
1256         self.assertEqual(response['status_code'], requests.codes.ok)
1257         input_dict_1 = {
1258             'connection-name':
1259             'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
1260             'direction': 'bidirectional'
1261         }
1262         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1263                              response['odu-connection'][0])
1264         self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0:service1-1GE'},
1265                              response['odu-connection'][0]['destination'])
1266         self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0:service1-1GE'},
1267                              response['odu-connection'][0]['source'])
1268
1269     def test_80_check_otn_topo_links(self):
1270         response = test_utils.get_ietf_network_request('otn-topology', 'config')
1271         self.assertEqual(response['status_code'], requests.codes.ok)
1272         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
1273         for link in response['network'][0]['ietf-network-topology:link']:
1274             linkId = link['link-id']
1275             if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1276                            'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1277                 self.assertEqual(
1278                     link['org-openroadm-otn-network-topology:available-bandwidth'], 99000)
1279                 self.assertEqual(
1280                     link['org-openroadm-otn-network-topology:used-bandwidth'], 1000)
1281
1282     def test_81_check_otn_topo_tp(self):
1283         response = test_utils.get_ietf_network_request('otn-topology', 'config')
1284         self.assertEqual(response['status_code'], requests.codes.ok)
1285         for node in response['network'][0]['node']:
1286             if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1287                 tpList = node['ietf-network-topology:termination-point']
1288                 for tp in tpList:
1289                     if tp['tp-id'] == 'XPDR3-NETWORK1':
1290                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1291                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 79)
1292                         tsPoolList = list(range(1, 2))
1293                         self.assertNotIn(
1294                             tsPoolList, xpdrTpPortConAt['ts-pool'])
1295                         self.assertEqual(
1296                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
1297                         self.assertNotIn(
1298                             1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
1299
1300     def test_82_delete_1GE_service(self):
1301         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-1GE"
1302         response = test_utils.transportpce_api_rpc_request(
1303             'org-openroadm-service', 'service-delete',
1304             self.del_serv_input_data)
1305         self.assertEqual(response['status_code'], requests.codes.ok)
1306         time.sleep(self.WAITING)
1307
1308     def test_83_check_service_list(self):
1309         response = test_utils.get_ordm_serv_list_request()
1310         self.assertEqual(response['status_code'], requests.codes.ok)
1311         self.assertEqual(len(response['service-list']['services']), 2)
1312
1313     def test_84_check_no_ODU0_connection_spdra(self):
1314         response = test_utils.check_node_request("SPDR-SA1")
1315         self.assertEqual(response['status_code'], requests.codes.ok)
1316         self.assertNotIn(['odu-connection'][0], response['org-openroadm-device'])
1317
1318     def test_85_check_no_interface_ODU0_NETWORK_spdra(self):
1319         response = test_utils.check_node_attribute_request(
1320             'SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0:service1-1GE')
1321         self.assertEqual(response['status_code'], requests.codes.conflict)
1322
1323     def test_86_check_no_interface_ODU0_CLIENT_spdra(self):
1324         response = test_utils.check_node_attribute_request(
1325             'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0:service1-1GE')
1326         self.assertEqual(response['status_code'], requests.codes.conflict)
1327
1328     def test_87_check_no_interface_10GE_CLIENT_spdra(self):
1329         response = test_utils.check_node_attribute_request(
1330             'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1331         self.assertEqual(response['status_code'], requests.codes.conflict)
1332
1333     def test_88_check_otn_topo_links(self):
1334         response = test_utils.get_ietf_network_request('otn-topology', 'config')
1335         self.assertEqual(response['status_code'], requests.codes.ok)
1336         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
1337         for link in response['network'][0]['ietf-network-topology:link']:
1338             linkId = link['link-id']
1339             if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1340                            'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1341                 self.assertEqual(
1342                     link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
1343                 self.assertEqual(
1344                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
1345
1346     def test_89_check_otn_topo_tp(self):
1347         response = test_utils.get_ietf_network_request('otn-topology', 'config')
1348         self.assertEqual(response['status_code'], requests.codes.ok)
1349         for node in response['network'][0]['node']:
1350             if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1351                 tpList = node['ietf-network-topology:termination-point']
1352                 for tp in tpList:
1353                     if tp['tp-id'] == 'XPDR3-NETWORK1':
1354                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1355                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
1356                         self.assertEqual(
1357                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
1358
1359     def test_90_delete_ODU4_service(self):
1360         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service2-ODU4"
1361         response = test_utils.transportpce_api_rpc_request(
1362             'org-openroadm-service', 'service-delete',
1363             self.del_serv_input_data)
1364         self.assertEqual(response['status_code'], requests.codes.ok)
1365         time.sleep(self.WAITING)
1366
1367     def test_91_delete_OCH_OTU4_service(self):
1368         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service2-OCH-OTU4"
1369         response = test_utils.transportpce_api_rpc_request(
1370             'org-openroadm-service', 'service-delete',
1371             self.del_serv_input_data)
1372         self.assertEqual(response['status_code'], requests.codes.ok)
1373         time.sleep(self.WAITING)
1374
1375     def test_92_disconnect_xponders_from_roadm(self):
1376         response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
1377         self.assertEqual(response['status_code'], requests.codes.ok)
1378         links = response['network'][0]['ietf-network-topology:link']
1379         for link in links:
1380             if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
1381                 response = test_utils.del_ietf_network_link_request(
1382                     'openroadm-topology', link['link-id'], 'config')
1383                 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1384
1385     def test_93_check_openroadm_topology(self):
1386         response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
1387         self.assertEqual(response['status_code'], requests.codes.ok)
1388         self.assertEqual(18,
1389                          len(response['network'][0]['ietf-network-topology:link']),
1390                          'Topology should contain 18 links')
1391
1392     def test_94_disconnect_spdrA(self):
1393         response = test_utils.unmount_device("SPDR-SA1")
1394         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1395
1396     def test_95_disconnect_spdrC(self):
1397         response = test_utils.unmount_device("SPDR-SC1")
1398         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1399
1400     def test_96_disconnect_roadmA(self):
1401         response = test_utils.unmount_device("ROADM-A1")
1402         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1403
1404     def test_97_disconnect_roadmC(self):
1405         response = test_utils.unmount_device("ROADM-C1")
1406         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1407
1408
# When executed as a script, run this module's tests through unittest with
# verbosity level 2.
if __name__ == "__main__":
    unittest.main(verbosity=2)