Migrate OTN E2E functional tests to RFC8040 step 5
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test11_otn_end2end.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
15
16 import base64
17 import unittest
18 import time
19 import requests
20 # pylint: disable=wrong-import-order
21 import sys
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils  # nopep8
26 import test_utils_rfc8040  # nopep8
27
28
29 class TransportPCEtesting(unittest.TestCase):
30
31     processes = None
32     WAITING = 20  # nominal value is 300
33     NODE_VERSION = '2.2.1'
34
35     cr_serv_input_data = {
36         "sdnc-request-header": {
37             "request-id": "request-1",
38             "rpc-action": "service-create",
39             "request-system-id": "appname"
40         },
41         "service-name": "service1-OCH-OTU4",
42         "common-id": "commonId",
43         "connection-type": "infrastructure",
44         "service-a-end": {
45             "service-rate": "100",
46             "node-id": "SPDR-SA1",
47             "service-format": "OTU",
48             "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
49             "clli": "NodeSA",
50             "tx-direction": [{
51                 "port": {
52                     "port-device-name": "SPDR-SA1-XPDR1",
53                     "port-type": "fixed",
54                     "port-name": "XPDR1-NETWORK1",
55                     "port-rack": "000000.00",
56                     "port-shelf": "Chassis#1"
57                 },
58                 "lgx": {
59                     "lgx-device-name": "Some lgx-device-name",
60                     "lgx-port-name": "Some lgx-port-name",
61                     "lgx-port-rack": "000000.00",
62                     "lgx-port-shelf": "00"
63                 },
64                 "index": 0
65             }],
66             "rx-direction": [{
67                 "port": {
68                     "port-device-name": "SPDR-SA1-XPDR1",
69                     "port-type": "fixed",
70                     "port-name": "XPDR1-NETWORK1",
71                     "port-rack": "000000.00",
72                     "port-shelf": "Chassis#1"
73                 },
74                 "lgx": {
75                     "lgx-device-name": "Some lgx-device-name",
76                     "lgx-port-name": "Some lgx-port-name",
77                     "lgx-port-rack": "000000.00",
78                     "lgx-port-shelf": "00"
79                 },
80                 "index": 0
81             }],
82             "optic-type": "gray"
83         },
84         "service-z-end": {
85             "service-rate": "100",
86             "node-id": "SPDR-SC1",
87             "service-format": "OTU",
88             "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
89             "clli": "NodeSC",
90             "tx-direction": [{
91                 "port": {
92                     "port-device-name": "SPDR-SC1-XPDR1",
93                     "port-type": "fixed",
94                     "port-name": "XPDR1-NETWORK1",
95                     "port-rack": "000000.00",
96                     "port-shelf": "Chassis#1"
97                 },
98                 "lgx": {
99                     "lgx-device-name": "Some lgx-device-name",
100                     "lgx-port-name": "Some lgx-port-name",
101                     "lgx-port-rack": "000000.00",
102                     "lgx-port-shelf": "00"
103                 },
104                 "index": 0
105             }],
106             "rx-direction": [{
107                 "port": {
108                     "port-device-name": "SPDR-SC1-XPDR1",
109                     "port-type": "fixed",
110                     "port-name": "XPDR1-NETWORK1",
111                     "port-rack": "000000.00",
112                     "port-shelf": "Chassis#1"
113                 },
114                 "lgx": {
115                     "lgx-device-name": "Some lgx-device-name",
116                     "lgx-port-name": "Some lgx-port-name",
117                     "lgx-port-rack": "000000.00",
118                     "lgx-port-shelf": "00"
119                 },
120                 "index": 0
121             }],
122             "optic-type": "gray"
123         },
124         "due-date": "2018-06-15T00:00:01Z",
125         "operator-contact": "pw1234"
126     }
127
128     del_serv_input_data = {
129         "sdnc-request-header": {
130             "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
131             "rpc-action": "service-delete",
132             "request-system-id": "appname",
133             "notification-url": "http://localhost:8585/NotificationServer/notify"},
134         "service-delete-req-info": {
135             "service-name": "TBD",
136             "tail-retention": "no"}
137     }
138
139     @classmethod
140     def setUpClass(cls):
141         cls.processes = test_utils_rfc8040.start_tpce()
142         cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION),
143                                                        ('roadma', cls.NODE_VERSION),
144                                                        ('roadmc', cls.NODE_VERSION),
145                                                        ('spdrc', cls.NODE_VERSION)])
146
147     @classmethod
148     def tearDownClass(cls):
149         # pylint: disable=not-an-iterable
150         for process in cls.processes:
151             test_utils_rfc8040.shutdown_process(process)
152         print("all processes killed")
153
154     def setUp(self):
155         time.sleep(5)
156
157     def test_01_connect_spdrA(self):
158         response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
159         self.assertEqual(response.status_code,
160                          requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
161
162     def test_02_connect_spdrC(self):
163         response = test_utils_rfc8040.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
164         self.assertEqual(response.status_code,
165                          requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
166
167     def test_03_connect_rdmA(self):
168         response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
169         self.assertEqual(response.status_code,
170                          requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
171
172     def test_04_connect_rdmC(self):
173         response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
174         self.assertEqual(response.status_code,
175                          requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
176
177     def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
178         response = test_utils_rfc8040.transportpce_api_rpc_request(
179             'transportpce-networkutils', 'init-xpdr-rdm-links',
180             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
181                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
182         self.assertEqual(response['status_code'], requests.codes.ok)
183
184     def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
185         response = test_utils_rfc8040.transportpce_api_rpc_request(
186             'transportpce-networkutils', 'init-rdm-xpdr-links',
187             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
188                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
189         self.assertEqual(response['status_code'], requests.codes.ok)
190
191     def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
192         response = test_utils_rfc8040.transportpce_api_rpc_request(
193             'transportpce-networkutils', 'init-xpdr-rdm-links',
194             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
195                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
196         self.assertEqual(response['status_code'], requests.codes.ok)
197
198     def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
199         response = test_utils_rfc8040.transportpce_api_rpc_request(
200             'transportpce-networkutils', 'init-rdm-xpdr-links',
201             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
202                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
203         self.assertEqual(response['status_code'], requests.codes.ok)
204
205     def test_09_add_omsAttributes_ROADMA_ROADMC(self):
206         # Config ROADMA-ROADMC oms-attributes
207         data = {"span": {
208             "auto-spanloss": "true",
209             "spanloss-base": 11.4,
210             "spanloss-current": 12,
211             "engineered-spanloss": 12.2,
212             "link-concatenation": [{
213                 "SRLG-Id": 0,
214                 "fiber-type": "smf",
215                 "SRLG-length": 100000,
216                 "pmd": 0.5}]}}
217         response = test_utils_rfc8040.add_oms_attr_request(
218             "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
219         self.assertEqual(response.status_code, requests.codes.created)
220
221     def test_10_add_omsAttributes_ROADMC_ROADMA(self):
222         # Config ROADMC-ROADMA oms-attributes
223         data = {"span": {
224             "auto-spanloss": "true",
225             "spanloss-base": 11.4,
226             "spanloss-current": 12,
227             "engineered-spanloss": 12.2,
228             "link-concatenation": [{
229                 "SRLG-Id": 0,
230                 "fiber-type": "smf",
231                 "SRLG-length": 100000,
232                 "pmd": 0.5}]}}
233         response = test_utils_rfc8040.add_oms_attr_request(
234             "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
235         self.assertEqual(response.status_code, requests.codes.created)
236
237 # test service-create for OCH-OTU4 service from spdr to spdr
238     def test_11_check_otn_topology(self):
239         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
240         self.assertEqual(response['status_code'], requests.codes.ok)
241         self.assertEqual(len(response['network'][0]['node']), 6)
242         self.assertNotIn('ietf-network-topology:link', response['network'][0])
243
244     def test_12_create_OCH_OTU4_service(self):
245         response = test_utils_rfc8040.transportpce_api_rpc_request(
246             'org-openroadm-service', 'service-create',
247             self.cr_serv_input_data)
248         self.assertEqual(response['status_code'], requests.codes.ok)
249         time.sleep(self.WAITING)
250
251     def test_13_get_OCH_OTU4_service1(self):
252         response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
253             "services", "service1-OCH-OTU4")
254         self.assertEqual(response['status_code'], requests.codes.ok)
255         self.assertEqual(
256             response['services'][0]['administrative-state'], 'inService')
257         self.assertEqual(
258             response['services'][0]['service-name'], 'service1-OCH-OTU4')
259         self.assertEqual(
260             response['services'][0]['connection-type'], 'infrastructure')
261         self.assertEqual(
262             response['services'][0]['lifecycle-state'], 'planned')
263
264     # Check correct configuration of devices
265     def test_14_check_interface_och_spdra(self):
266         response = test_utils_rfc8040.check_node_attribute_request(
267             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-761:768')
268         self.assertEqual(response['status_code'], requests.codes.ok)
269         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
270                                    'administrative-state': 'inService',
271                                    'supporting-circuit-pack-name': 'CP1-CFP0',
272                                    'type': 'org-openroadm-interfaces:opticalChannel',
273                                    'supporting-port': 'CP1-CFP0-P1'
274                                    }, **response['interface'][0]),
275                              response['interface'][0])
276         self.assertEqual('org-openroadm-common-types:R100G',
277                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
278         self.assertEqual('dp-qpsk',
279                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
280         self.assertEqual(196.1,
281                          float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
282         self.assertEqual(
283             -5,
284             float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
285
286     def test_15_check_interface_OTU4_spdra(self):
287         response = test_utils_rfc8040.check_node_attribute_request(
288             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
289         self.assertEqual(response['status_code'], requests.codes.ok)
290         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
291                         'administrative-state': 'inService',
292                         'supporting-circuit-pack-name': 'CP1-CFP0',
293                         'supporting-interface': 'XPDR1-NETWORK1-761:768',
294                         'type': 'org-openroadm-interfaces:otnOtu',
295                         'supporting-port': 'CP1-CFP0-P1'
296                         }
297         input_dict_2 = {'tx-sapi': 'H/OelLynehI=',
298                         'expected-dapi': 'H/OelLynehI=',
299                         'tx-dapi': 'AMf1n5hK6Xkk',
300                         'expected-sapi': 'AMf1n5hK6Xkk',
301                         'rate': 'org-openroadm-otn-common-types:OTU4',
302                         'fec': 'scfec'
303                         }
304         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
305                              response['interface'][0])
306         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
307                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
308
309     def test_16_check_interface_och_spdrc(self):
310         response = test_utils_rfc8040.check_node_attribute_request(
311             'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-761:768')
312         self.assertEqual(response['status_code'], requests.codes.ok)
313         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
314                                    'administrative-state': 'inService',
315                                    'supporting-circuit-pack-name': 'CP1-CFP0',
316                                    'type': 'org-openroadm-interfaces:opticalChannel',
317                                    'supporting-port': 'CP1-CFP0-P1'
318                                    }, **response['interface'][0]),
319                              response['interface'][0])
320         self.assertEqual('org-openroadm-common-types:R100G',
321                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
322         self.assertEqual('dp-qpsk',
323                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
324         self.assertEqual(196.1,
325                          float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
326         self.assertEqual(
327             -5,
328             float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
329
330     def test_17_check_interface_OTU4_spdrc(self):
331         response = test_utils_rfc8040.check_node_attribute_request(
332             'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-OTU')
333         self.assertEqual(response['status_code'], requests.codes.ok)
334         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
335                         'administrative-state': 'inService',
336                         'supporting-circuit-pack-name': 'CP1-CFP0',
337                         'supporting-interface': 'XPDR1-NETWORK1-761:768',
338                         'type': 'org-openroadm-interfaces:otnOtu',
339                         'supporting-port': 'CP1-CFP0-P1'
340                         }
341         input_dict_2 = {'tx-dapi': 'H/OelLynehI=',
342                         'expected-sapi': 'H/OelLynehI=',
343                         'tx-sapi': 'AMf1n5hK6Xkk',
344                         'expected-dapi': 'AMf1n5hK6Xkk',
345                         'rate': 'org-openroadm-otn-common-types:OTU4',
346                         'fec': 'scfec'
347                         }
348         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
349                              response['interface'][0])
350         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
351                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
352
353     def test_18_check_no_interface_ODU4_spdra(self):
354         response = test_utils_rfc8040.check_node_attribute_request(
355             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
356         self.assertEqual(response['status_code'], requests.codes.conflict)
357
358     def test_19_check_openroadm_topo_spdra(self):
359         response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR1', 'config')
360         self.assertEqual(response['status_code'], requests.codes.ok)
361         ele = response['node']['ietf-network-topology:termination-point'][0]
362         self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
363         self.assertEqual(
364             196.1,
365             float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
366         self.assertEqual(
367             40,
368             float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
369
370     def test_20_check_openroadm_topo_ROADMA_SRG(self):
371         response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
372         self.assertEqual(response['status_code'], requests.codes.ok)
373         freq_map = base64.b64decode(
374             response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
375         freq_map_array = [int(x) for x in freq_map]
376         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
377         liste_tp = response['node']['ietf-network-topology:termination-point']
378         for ele in liste_tp:
379             if ele['tp-id'] == 'SRG1-PP1-TXRX':
380                 freq_map = base64.b64decode(
381                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
382                 freq_map_array = [int(x) for x in freq_map]
383                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
384             if ele['tp-id'] == 'SRG1-PP2-TXRX':
385                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
386
387     def test_21_check_openroadm_topo_ROADMA_DEG(self):
388         response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
389         self.assertEqual(response['status_code'], requests.codes.ok)
390         freq_map = base64.b64decode(
391             response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
392         freq_map_array = [int(x) for x in freq_map]
393         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
394         liste_tp = response['node']['ietf-network-topology:termination-point']
395         for ele in liste_tp:
396             if ele['tp-id'] == 'DEG2-CTP-TXRX':
397                 freq_map = base64.b64decode(
398                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
399                 freq_map_array = [int(x) for x in freq_map]
400                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
401             if ele['tp-id'] == 'DEG2-TTP-TXRX':
402                 freq_map = base64.b64decode(
403                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
404                 freq_map_array = [int(x) for x in freq_map]
405                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
406
407     def test_22_check_otn_topo_otu4_links(self):
408         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
409         self.assertEqual(response['status_code'], requests.codes.ok)
410         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 2)
411         listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
412                       'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
413         for link in response['network'][0]['ietf-network-topology:link']:
414             self.assertIn(link['link-id'], listLinkId)
415             self.assertEqual(
416                 link['transportpce-topology:otn-link-type'], 'OTU4')
417             self.assertEqual(
418                 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
419             self.assertEqual(
420                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
421             self.assertEqual(
422                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
423             self.assertIn(
424                 link['org-openroadm-common-network:opposite-link'], listLinkId)
425
426 # test service-create for ODU4 service from spdr to spdr
427     def test_23_create_ODU4_service(self):
428         self.cr_serv_input_data["service-name"] = "service1-ODU4"
429         self.cr_serv_input_data["service-a-end"]["service-format"] = "ODU"
430         del self.cr_serv_input_data["service-a-end"]["otu-service-rate"]
431         self.cr_serv_input_data["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
432         self.cr_serv_input_data["service-z-end"]["service-format"] = "ODU"
433         del self.cr_serv_input_data["service-z-end"]["otu-service-rate"]
434         self.cr_serv_input_data["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
435
436         response = test_utils_rfc8040.transportpce_api_rpc_request(
437             'org-openroadm-service', 'service-create',
438             self.cr_serv_input_data)
439         self.assertEqual(response['status_code'], requests.codes.ok)
440         time.sleep(self.WAITING)
441
442     def test_24_get_ODU4_service1(self):
443         response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
444             "services", "service1-ODU4")
445         self.assertEqual(response['status_code'], requests.codes.ok)
446         self.assertEqual(
447             response['services'][0]['administrative-state'], 'inService')
448         self.assertEqual(
449             response['services'][0]['service-name'], 'service1-ODU4')
450         self.assertEqual(
451             response['services'][0]['connection-type'], 'infrastructure')
452         self.assertEqual(
453             response['services'][0]['lifecycle-state'], 'planned')
454
455     def test_25_check_interface_ODU4_spdra(self):
456         response = test_utils_rfc8040.check_node_attribute_request(
457             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
458         self.assertEqual(response['status_code'], requests.codes.ok)
459         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
460                         'administrative-state': 'inService',
461                         'supporting-circuit-pack-name': 'CP1-CFP0',
462                         'supporting-interface': 'XPDR1-NETWORK1-OTU',
463                         'type': 'org-openroadm-interfaces:otnOdu',
464                         'supporting-port': 'CP1-CFP0-P1',
465                         'circuit-id': 'TBD',
466                         'description': 'TBD'}
467         # SAPI/DAPI are added in the Otu4 renderer
468         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
469                         'rate': 'org-openroadm-otn-common-types:ODU4',
470                         'monitoring-mode': 'terminated',
471                         'expected-dapi': 'H/OelLynehI=',
472                         'expected-sapi': 'AMf1n5hK6Xkk',
473                         'tx-dapi': 'AMf1n5hK6Xkk',
474                         'tx-sapi': 'H/OelLynehI='}
475
476         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
477                              response['interface'][0])
478         self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
479                                   **input_dict_2),
480                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
481                              )
482         self.assertDictEqual(
483             {'payload-type': '21', 'exp-payload-type': '21'},
484             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
485
486     def test_26_check_interface_ODU4_spdrc(self):
487         response = test_utils_rfc8040.check_node_attribute_request(
488             'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU4')
489         self.assertEqual(response['status_code'], requests.codes.ok)
490         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
491                         'administrative-state': 'inService',
492                         'supporting-circuit-pack-name': 'CP1-CFP0',
493                         'supporting-interface': 'XPDR1-NETWORK1-OTU',
494                         'type': 'org-openroadm-interfaces:otnOdu',
495                         'supporting-port': 'CP1-CFP0-P1',
496                         'circuit-id': 'TBD',
497                         'description': 'TBD'}
498         # SAPI/DAPI are added in the Otu4 renderer
499         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
500                         'rate': 'org-openroadm-otn-common-types:ODU4',
501                         'monitoring-mode': 'terminated',
502                         'tx-sapi': 'AMf1n5hK6Xkk',
503                         'tx-dapi': 'H/OelLynehI=',
504                         'expected-sapi': 'H/OelLynehI=',
505                         'expected-dapi': 'AMf1n5hK6Xkk'
506                         }
507         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
508                              response['interface'][0])
509         self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
510                                   **input_dict_2),
511                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
512                              )
513         self.assertDictEqual(
514             {'payload-type': '21', 'exp-payload-type': '21'},
515             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
516
517     def test_27_check_otn_topo_links(self):
518         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
519         self.assertEqual(response['status_code'], requests.codes.ok)
520         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
521         for link in response['network'][0]['ietf-network-topology:link']:
522             linkId = link['link-id']
523             if (linkId in ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
524                            'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
525                 self.assertEqual(
526                     link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
527                 self.assertEqual(
528                     link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
529             elif (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
530                              'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
531                 self.assertEqual(
532                     link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
533                 self.assertEqual(
534                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
535                 self.assertEqual(
536                     link['transportpce-topology:otn-link-type'], 'ODTU4')
537                 self.assertEqual(
538                     link['org-openroadm-common-network:link-type'], 'OTN-LINK')
539                 self.assertIn(link['org-openroadm-common-network:opposite-link'],
540                               ['ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
541                                'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
542             else:
543                 self.fail("this link should not exist")
544
545     def test_28_check_otn_topo_tp(self):
546         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
547         self.assertEqual(response['status_code'], requests.codes.ok)
548         for node in response['network'][0]['node']:
549             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
550                 tpList = node['ietf-network-topology:termination-point']
551                 for tp in tpList:
552                     if tp['tp-id'] == 'XPDR1-NETWORK1':
553                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
554                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
555                         self.assertEqual(
556                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
557                         self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
558                                          'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
559
560 # test service-create for 10GE service from spdr to spdr
561     def test_29_create_10GE_service(self):
562         self.cr_serv_input_data["service-name"] = "service1-10GE"
563         self.cr_serv_input_data["connection-type"] = "service"
564         self.cr_serv_input_data["service-a-end"]["service-rate"] = "10"
565         self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
566         del self.cr_serv_input_data["service-a-end"]["odu-service-rate"]
567         self.cr_serv_input_data["service-a-end"]["ethernet-encoding"] = "10GBASE-R"
568         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
569         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
570         self.cr_serv_input_data["service-z-end"]["service-rate"] = "10"
571         self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
572         del self.cr_serv_input_data["service-z-end"]["odu-service-rate"]
573         self.cr_serv_input_data["service-z-end"]["ethernet-encoding"] = "10GBASE-R"
574         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
575         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
576         response = test_utils_rfc8040.transportpce_api_rpc_request(
577             'org-openroadm-service', 'service-create',
578             self.cr_serv_input_data)
579         self.assertEqual(response['status_code'], requests.codes.ok)
580         time.sleep(self.WAITING)
581
582     def test_30_get_10GE_service1(self):
583         response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
584             "services", "service1-10GE")
585         self.assertEqual(response['status_code'], requests.codes.ok)
586         self.assertEqual(
587             response['services'][0]['administrative-state'], 'inService')
588         self.assertEqual(
589             response['services'][0]['service-name'], 'service1-10GE')
590         self.assertEqual(
591             response['services'][0]['connection-type'], 'service')
592         self.assertEqual(
593             response['services'][0]['lifecycle-state'], 'planned')
594
595     def test_31_check_interface_10GE_CLIENT_spdra(self):
596         response = test_utils_rfc8040.check_node_attribute_request(
597             'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
598         self.assertEqual(response['status_code'], requests.codes.ok)
599         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
600                       'administrative-state': 'inService',
601                       'supporting-circuit-pack-name': 'CP1-SFP4',
602                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
603                       'supporting-port': 'CP1-SFP4-P1'
604                       }
605         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
606                              response['interface'][0])
607         self.assertEqual(
608             10000,
609             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
610
611     def test_32_check_interface_ODU2E_CLIENT_spdra(self):
612         response = test_utils_rfc8040.check_node_attribute_request(
613             'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e')
614         self.assertEqual(response['status_code'], requests.codes.ok)
615         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
616                         'administrative-state': 'inService',
617                         'supporting-circuit-pack-name': 'CP1-SFP4',
618                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
619                         'type': 'org-openroadm-interfaces:otnOdu',
620                         'supporting-port': 'CP1-SFP4-P1'}
621         input_dict_2 = {
622             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
623             'rate': 'org-openroadm-otn-common-types:ODU2e',
624             'monitoring-mode': 'terminated'}
625
626         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
627                              response['interface'][0])
628         self.assertDictEqual(dict(input_dict_2,
629                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
630                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
631         self.assertDictEqual(
632             {'payload-type': '03', 'exp-payload-type': '03'},
633             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
634
635     def test_33_check_interface_ODU2E_NETWORK_spdra(self):
636         response = test_utils_rfc8040.check_node_attribute_request(
637             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e')
638         self.assertEqual(response['status_code'], requests.codes.ok)
639         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
640                         'administrative-state': 'inService',
641                         'supporting-circuit-pack-name': 'CP1-CFP0',
642                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
643                         'type': 'org-openroadm-interfaces:otnOdu',
644                         'supporting-port': 'CP1-CFP0-P1'}
645         input_dict_2 = {
646             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
647             'rate': 'org-openroadm-otn-common-types:ODU2e',
648             'monitoring-mode': 'monitored'}
649         input_dict_3 = {'trib-port-number': 1}
650
651         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
652                              response['interface'][0])
653         self.assertDictEqual(dict(input_dict_2,
654                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
655                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
656         self.assertDictEqual(dict(input_dict_3,
657                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
658                                       'parent-odu-allocation']),
659                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
660         self.assertIn(1, response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
661                       ['trib-slots'])
662
663     def test_34_check_ODU2E_connection_spdra(self):
664         response = test_utils_rfc8040.check_node_attribute_request(
665             'SPDR-SA1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
666         self.assertEqual(response['status_code'], requests.codes.ok)
667         input_dict_1 = {
668             'connection-name':
669             'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
670             'direction': 'bidirectional'
671         }
672
673         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
674                              response['odu-connection'][0])
675         self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
676                              response['odu-connection'][0]['destination'])
677         self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
678                              response['odu-connection'][0]['source'])
679
680     def test_35_check_interface_10GE_CLIENT_spdrc(self):
681         response = test_utils_rfc8040.check_node_attribute_request(
682             'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
683         self.assertEqual(response['status_code'], requests.codes.ok)
684         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
685                       'administrative-state': 'inService',
686                       'supporting-circuit-pack-name': 'CP1-SFP4',
687                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
688                       'supporting-port': 'CP1-SFP4-P1'
689                       }
690         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
691                              response['interface'][0])
692         self.assertEqual(
693             10000,
694             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
695
696     def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
697         response = test_utils_rfc8040.check_node_attribute_request(
698             'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ODU2e')
699         self.assertEqual(response['status_code'], requests.codes.ok)
700         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
701                         'administrative-state': 'inService',
702                         'supporting-circuit-pack-name': 'CP1-SFP4',
703                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
704                         'type': 'org-openroadm-interfaces:otnOdu',
705                         'supporting-port': 'CP1-SFP4-P1'}
706         input_dict_2 = {
707             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
708             'rate': 'org-openroadm-otn-common-types:ODU2e',
709             'monitoring-mode': 'terminated'}
710
711         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
712                              response['interface'][0])
713         self.assertDictEqual(dict(input_dict_2,
714                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
715                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
716         self.assertDictEqual(
717             {'payload-type': '03', 'exp-payload-type': '03'},
718             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
719
720     def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
721         response = test_utils_rfc8040.check_node_attribute_request(
722             'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU2e')
723         self.assertEqual(response['status_code'], requests.codes.ok)
724         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
725                         'administrative-state': 'inService',
726                         'supporting-circuit-pack-name': 'CP1-CFP0',
727                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
728                         'type': 'org-openroadm-interfaces:otnOdu',
729                         'supporting-port': 'CP1-CFP0-P1'}
730         input_dict_2 = {
731             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
732             'rate': 'org-openroadm-otn-common-types:ODU2e',
733             'monitoring-mode': 'monitored'}
734
735         input_dict_3 = {'trib-port-number': 1}
736
737         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
738                              response['interface'][0])
739         self.assertDictEqual(dict(input_dict_2,
740                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
741                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
742         self.assertDictEqual(dict(input_dict_3,
743                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
744                                       'parent-odu-allocation']),
745                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
746             'parent-odu-allocation'])
747         self.assertIn(1,
748                       response['interface'][0][
749                           'org-openroadm-otn-odu-interfaces:odu'][
750                           'parent-odu-allocation']['trib-slots'])
751
752     def test_38_check_ODU2E_connection_spdrc(self):
753         response = test_utils_rfc8040.check_node_attribute_request(
754             'SPDR-SC1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
755         self.assertEqual(response['status_code'], requests.codes.ok)
756         input_dict_1 = {
757             'connection-name':
758             'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
759             'direction': 'bidirectional'
760         }
761
762         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
763                              response['odu-connection'][0])
764         self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
765                              response['odu-connection'][0]['destination'])
766         self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
767                              response['odu-connection'][0]['source'])
768
769     def test_39_check_otn_topo_links(self):
770         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
771         self.assertEqual(response['status_code'], requests.codes.ok)
772         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
773         for link in response['network'][0]['ietf-network-topology:link']:
774             linkId = link['link-id']
775             if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
776                            'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
777                 self.assertEqual(
778                     link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
779                 self.assertEqual(
780                     link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
781
782     def test_40_check_otn_topo_tp(self):
783         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
784         self.assertEqual(response['status_code'], requests.codes.ok)
785         for node in response['network'][0]['node']:
786             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
787                 tpList = node['ietf-network-topology:termination-point']
788                 for tp in tpList:
789                     if tp['tp-id'] == 'XPDR1-NETWORK1':
790                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
791                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
792                         tsPoolList = list(range(1, 9))
793                         self.assertNotIn(
794                             tsPoolList, xpdrTpPortConAt['ts-pool'])
795                         self.assertEqual(
796                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
797                         self.assertNotIn(
798                             1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
799
800     def test_41_delete_10GE_service(self):
801         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-10GE"
802         response = test_utils_rfc8040.transportpce_api_rpc_request(
803             'org-openroadm-service', 'service-delete',
804             self.del_serv_input_data)
805         self.assertEqual(response['status_code'], requests.codes.ok)
806         time.sleep(self.WAITING)
807
808     def test_42_check_service_list(self):
809         response = test_utils_rfc8040.get_ordm_serv_list_request()
810         self.assertEqual(response['status_code'], requests.codes.ok)
811         self.assertEqual(len(response['service-list']['services']), 2)
812
813     def test_43_check_no_ODU2e_connection_spdra(self):
814         response = test_utils.check_netconf_node_request("SPDR-SA1", "")
815         self.assertEqual(response.status_code, requests.codes.ok)
816         res = response.json()
817         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
818         time.sleep(1)
819
820     def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
821         response = test_utils_rfc8040.check_node_attribute_request(
822             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e-service1')
823         self.assertEqual(response['status_code'], requests.codes.conflict)
824
825     def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
826         response = test_utils_rfc8040.check_node_attribute_request(
827             'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e-service1')
828         self.assertEqual(response['status_code'], requests.codes.conflict)
829
830     def test_46_check_no_interface_10GE_CLIENT_spdra(self):
831         response = test_utils_rfc8040.check_node_attribute_request(
832             'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
833         self.assertEqual(response['status_code'], requests.codes.conflict)
834
835     def test_47_check_otn_topo_links(self):
836         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
837         self.assertEqual(response['status_code'], requests.codes.ok)
838         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
839         for link in response['network'][0]['ietf-network-topology:link']:
840             linkId = link['link-id']
841             if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
842                            'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
843                 self.assertEqual(
844                     link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
845                 self.assertEqual(
846                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
847
848     def test_48_check_otn_topo_tp(self):
849         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
850         self.assertEqual(response['status_code'], requests.codes.ok)
851         for node in response['network'][0]['node']:
852             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
853                 tpList = node['ietf-network-topology:termination-point']
854                 for tp in tpList:
855                     if tp['tp-id'] == 'XPDR1-NETWORK1':
856                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
857                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
858                         self.assertEqual(
859                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
860
861     def test_49_delete_ODU4_service(self):
862         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-ODU4"
863         response = test_utils_rfc8040.transportpce_api_rpc_request(
864             'org-openroadm-service', 'service-delete',
865             self.del_serv_input_data)
866         self.assertEqual(response['status_code'], requests.codes.ok)
867         time.sleep(self.WAITING)
868
869     def test_50_check_service_list(self):
870         response = test_utils_rfc8040.get_ordm_serv_list_request()
871         self.assertEqual(response['status_code'], requests.codes.ok)
872         self.assertEqual(len(response['service-list']['services']), 1)
873
874     def test_51_check_no_interface_ODU4_spdra(self):
875         response = test_utils_rfc8040.check_node_attribute_request(
876             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
877         self.assertEqual(response['status_code'], requests.codes.conflict)
878
879     def test_52_check_otn_topo_links(self):
880         self.test_22_check_otn_topo_otu4_links()
881
882     def test_53_check_otn_topo_tp(self):
883         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
884         self.assertEqual(response['status_code'], requests.codes.ok)
885         for node in response['network'][0]['node']:
886             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
887                 tpList = node['ietf-network-topology:termination-point']
888                 for tp in tpList:
889                     if tp['tp-id'] == 'XPDR1-NETWORK1':
890                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
891                         self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
892                         self.assertNotIn(
893                             'odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
894
895     def test_54_delete_OCH_OTU4_service(self):
896         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-OCH-OTU4"
897         response = test_utils_rfc8040.transportpce_api_rpc_request(
898             'org-openroadm-service', 'service-delete',
899             self.del_serv_input_data)
900         self.assertEqual(response['status_code'], requests.codes.ok)
901         time.sleep(self.WAITING)
902
903     def test_55_get_no_service(self):
904         response = test_utils_rfc8040.get_ordm_serv_list_request()
905         self.assertEqual(response['status_code'], requests.codes.conflict)
906
907     def test_56_check_no_interface_OTU4_spdra(self):
908         response = test_utils_rfc8040.check_node_attribute_request(
909             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
910         self.assertEqual(response['status_code'], requests.codes.conflict)
911
912     def test_57_check_no_interface_OCH_spdra(self):
913         response = test_utils_rfc8040.check_node_attribute_request(
914             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-1')
915         self.assertEqual(response['status_code'], requests.codes.conflict)
916
917     def test_58_getLinks_OtnTopology(self):
918         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
919         self.assertEqual(response['status_code'], requests.codes.ok)
920         self.assertNotIn('ietf-network-topology:link', response['network'][0])
921
922     def test_59_check_openroadm_topo_spdra(self):
923         response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR1', 'config')
924         self.assertEqual(response['status_code'], requests.codes.ok)
925         tp = response['node']['ietf-network-topology:termination-point'][0]
926         self.assertEqual('XPDR1-NETWORK1', tp['tp-id'])
927         self.assertNotIn('wavelength', dict.keys(
928             tp['org-openroadm-network-topology:xpdr-network-attributes']))
929
930     def test_60_check_openroadm_topo_ROADMA_SRG(self):
931         response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
932         self.assertEqual(response['status_code'], requests.codes.ok)
933         freq_map = base64.b64decode(
934             response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
935         freq_map_array = [int(x) for x in freq_map]
936         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
937         liste_tp = response['node']['ietf-network-topology:termination-point']
938         for ele in liste_tp:
939             if ele['tp-id'] == 'SRG1-PP1-TXRX':
940                 freq_map = base64.b64decode(
941                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
942                 freq_map_array = [int(x) for x in freq_map]
943                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
944         time.sleep(3)
945
946     def test_61_check_openroadm_topo_ROADMA_DEG(self):
947         response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
948         self.assertEqual(response['status_code'], requests.codes.ok)
949         freq_map = base64.b64decode(
950             response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
951         freq_map_array = [int(x) for x in freq_map]
952         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
953         liste_tp = response['node']['ietf-network-topology:termination-point']
954         for ele in liste_tp:
955             if ele['tp-id'] == 'DEG2-CTP-TXRX':
956                 freq_map = base64.b64decode(
957                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
958                 freq_map_array = [int(x) for x in freq_map]
959                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
960             if ele['tp-id'] == 'DEG2-TTP-TXRX':
961                 freq_map = base64.b64decode(
962                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
963                 freq_map_array = [int(x) for x in freq_map]
964                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
965         time.sleep(3)
966
967     def test_62_connect_sprdA_3_N1_to_roadmA_PP2(self):
968         response = test_utils_rfc8040.transportpce_api_rpc_request(
969             'transportpce-networkutils', 'init-xpdr-rdm-links',
970             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '3', 'network-num': '1',
971                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
972         self.assertEqual(response['status_code'], requests.codes.ok)
973
974     def test_63_connect_roadmA_PP2_to_spdrA_3_N1(self):
975         response = test_utils_rfc8040.transportpce_api_rpc_request(
976             'transportpce-networkutils', 'init-rdm-xpdr-links',
977             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '3', 'network-num': '1',
978                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
979         self.assertEqual(response['status_code'], requests.codes.ok)
980
981     def test_64_connect_sprdC_3_N1_to_roadmC_PP2(self):
982         response = test_utils_rfc8040.transportpce_api_rpc_request(
983             'transportpce-networkutils', 'init-xpdr-rdm-links',
984             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '3', 'network-num': '1',
985                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
986         self.assertEqual(response['status_code'], requests.codes.ok)
987
988     def test_65_connect_roadmC_PP2_to_spdrC_3_N1(self):
989         response = test_utils_rfc8040.transportpce_api_rpc_request(
990             'transportpce-networkutils', 'init-rdm-xpdr-links',
991             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '3', 'network-num': '1',
992                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
993         self.assertEqual(response['status_code'], requests.codes.ok)
994
995     def test_66_create_OCH_OTU4_service_2(self):
996         # pylint: disable=line-too-long
997         self.cr_serv_input_data["service-name"] = "service2-OCH-OTU4"
998         self.cr_serv_input_data["connection-type"] = "infrastructure"
999         self.cr_serv_input_data["service-a-end"]["service-rate"] = "100"
1000         self.cr_serv_input_data["service-a-end"]["service-format"] = "OTU"
1001         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1002         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1003         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1004         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1005         self.cr_serv_input_data["service-a-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1006         self.cr_serv_input_data["service-z-end"]["service-rate"] = "100"
1007         self.cr_serv_input_data["service-z-end"]["service-format"] = "OTU"
1008         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1009         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1010         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1011         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1012         self.cr_serv_input_data["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1013         response = test_utils_rfc8040.transportpce_api_rpc_request(
1014             'org-openroadm-service', 'service-create',
1015             self.cr_serv_input_data)
1016         self.assertEqual(response['status_code'], requests.codes.ok)
1017         time.sleep(self.WAITING)
1018
1019     def test_67_get_OCH_OTU4_service2(self):
1020         response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
1021             "services", "service2-OCH-OTU4")
1022         self.assertEqual(response['status_code'], requests.codes.ok)
1023         self.assertEqual(
1024             response['services'][0]['administrative-state'], 'inService')
1025         self.assertEqual(
1026             response['services'][0]['service-name'], 'service2-OCH-OTU4')
1027         self.assertEqual(
1028             response['services'][0]['connection-type'], 'infrastructure')
1029         self.assertEqual(
1030             response['services'][0]['lifecycle-state'], 'planned')
1031
1032     def test_68_create_ODU4_service_2(self):
1033         # pylint: disable=line-too-long
1034         self.cr_serv_input_data["service-name"] = "service2-ODU4"
1035         self.cr_serv_input_data["service-a-end"]["service-format"] = "ODU"
1036         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1037         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1038         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1039         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1040         self.cr_serv_input_data["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1041         del self.cr_serv_input_data["service-a-end"]["otu-service-rate"]
1042         self.cr_serv_input_data["service-z-end"]["service-format"] = "ODU"
1043         self.cr_serv_input_data["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1044         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1045         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1046         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1047         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1048         del self.cr_serv_input_data["service-z-end"]["otu-service-rate"]
1049         response = test_utils_rfc8040.transportpce_api_rpc_request(
1050             'org-openroadm-service', 'service-create',
1051             self.cr_serv_input_data)
1052         self.assertEqual(response['status_code'], requests.codes.ok)
1053         time.sleep(self.WAITING)
1054
1055     def test_69_get_ODU4_service2(self):
1056         response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
1057             "services", "service2-ODU4")
1058         self.assertEqual(response['status_code'], requests.codes.ok)
1059         self.assertEqual(
1060             response['services'][0]['administrative-state'], 'inService')
1061         self.assertEqual(
1062             response['services'][0]['service-name'], 'service2-ODU4')
1063         self.assertEqual(
1064             response['services'][0]['connection-type'], 'infrastructure')
1065         self.assertEqual(
1066             response['services'][0]['lifecycle-state'], 'planned')
1067
1068     def test_70_create_1GE_service(self):
1069         # pylint: disable=line-too-long
1070         self.cr_serv_input_data["service-name"] = "service1-1GE"
1071         self.cr_serv_input_data["connection-type"] = "service"
1072         self.cr_serv_input_data["service-a-end"]["service-rate"] = "1"
1073         self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
1074         self.cr_serv_input_data["service-z-end"]["service-rate"] = "1"
1075         self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
1076         del self.cr_serv_input_data["service-a-end"]["odu-service-rate"]
1077         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1078         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1079         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1080         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1081         del self.cr_serv_input_data["service-z-end"]["odu-service-rate"]
1082         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1083         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1084         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1085         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1086         response = test_utils_rfc8040.transportpce_api_rpc_request(
1087             'org-openroadm-service', 'service-create',
1088             self.cr_serv_input_data)
1089         self.assertEqual(response['status_code'], requests.codes.ok)
1090         time.sleep(self.WAITING)
1091
1092     def test_71_get_1GE_service1(self):
1093         response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
1094             "services", "service1-1GE")
1095         self.assertEqual(response['status_code'], requests.codes.ok)
1096         self.assertEqual(
1097             response['services'][0]['administrative-state'], 'inService')
1098         self.assertEqual(
1099             response['services'][0]['service-name'], 'service1-1GE')
1100         self.assertEqual(
1101             response['services'][0]['connection-type'], 'service')
1102         self.assertEqual(
1103             response['services'][0]['lifecycle-state'], 'planned')
1104
1105     def test_72_check_interface_1GE_CLIENT_spdra(self):
1106         response = test_utils_rfc8040.check_node_attribute_request(
1107             'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1108         self.assertEqual(response['status_code'], requests.codes.ok)
1109         input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1110                       'administrative-state': 'inService',
1111                       'supporting-circuit-pack-name': 'CP1-SFP4',
1112                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
1113                       'supporting-port': 'CP1-SFP4-P1'
1114                       }
1115         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
1116                              response['interface'][0])
1117         self.assertEqual(
1118             1000,
1119             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
1120
1121     def test_73_check_interface_ODU0_CLIENT_spdra(self):
1122         response = test_utils_rfc8040.check_node_attribute_request(
1123             'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0')
1124         self.assertEqual(response['status_code'], requests.codes.ok)
1125         input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
1126                         'administrative-state': 'inService',
1127                         'supporting-circuit-pack-name': 'CP3-SFP1',
1128                         'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1129                         'type': 'org-openroadm-interfaces:otnOdu',
1130                         'supporting-port': 'CP3-SFP1-P1'}
1131         input_dict_2 = {
1132             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1133             'rate': 'org-openroadm-otn-common-types:ODU0',
1134             'monitoring-mode': 'terminated'}
1135         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1136                              response['interface'][0])
1137         self.assertDictEqual(dict(input_dict_2,
1138                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1139                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1140         self.assertDictEqual(
1141             {'payload-type': '07', 'exp-payload-type': '07'},
1142             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1143
1144     def test_74_check_interface_ODU0_NETWORK_spdra(self):
1145         response = test_utils_rfc8040.check_node_attribute_request(
1146             'SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0')
1147         self.assertEqual(response['status_code'], requests.codes.ok)
1148         input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
1149                         'administrative-state': 'inService',
1150                         'supporting-circuit-pack-name': 'CP3-CFP0',
1151                         'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1152                         'type': 'org-openroadm-interfaces:otnOdu',
1153                         'supporting-port': 'CP3-CFP0-P1'}
1154         input_dict_2 = {
1155             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1156             'rate': 'org-openroadm-otn-common-types:ODU0',
1157             'monitoring-mode': 'monitored'}
1158         input_dict_3 = {'trib-port-number': 1}
1159         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1160                              response['interface'][0])
1161         self.assertDictEqual(dict(input_dict_2,
1162                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1163                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1164         self.assertDictEqual(dict(input_dict_3,
1165                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1166                                       'parent-odu-allocation']),
1167                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
1168         self.assertIn(1, response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1169                       ['trib-slots'])
1170
1171     def test_75_check_ODU0_connection_spdra(self):
1172         response = test_utils_rfc8040.check_node_attribute_request(
1173             'SPDR-SA1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
1174         self.assertEqual(response['status_code'], requests.codes.ok)
1175         input_dict_1 = {
1176             'connection-name':
1177             'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
1178             'direction': 'bidirectional'
1179         }
1180         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1181                              response['odu-connection'][0])
1182         self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0'},
1183                              response['odu-connection'][0]['destination'])
1184         self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0'},
1185                              response['odu-connection'][0]['source'])
1186
1187     def test_76_check_interface_1GE_CLIENT_spdrc(self):
1188         response = test_utils_rfc8040.check_node_attribute_request(
1189             'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1190         self.assertEqual(response['status_code'], requests.codes.ok)
1191         input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1192                       'administrative-state': 'inService',
1193                       'supporting-circuit-pack-name': 'CP3-SFP1',
1194                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
1195                       'supporting-port': 'CP3-SFP1-P1'
1196                       }
1197         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
1198                              response['interface'][0])
1199         self.assertEqual(
1200             1000,
1201             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
1202
1203     def test_77_check_interface_ODU0_CLIENT_spdrc(self):
1204         response = test_utils_rfc8040.check_node_attribute_request(
1205             'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ODU0')
1206         self.assertEqual(response['status_code'], requests.codes.ok)
1207         input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
1208                         'administrative-state': 'inService',
1209                         'supporting-circuit-pack-name': 'CP3-SFP1',
1210                         'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1211                         'type': 'org-openroadm-interfaces:otnOdu',
1212                         'supporting-port': 'CP3-SFP1-P1'}
1213         input_dict_2 = {
1214             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1215             'rate': 'org-openroadm-otn-common-types:ODU0',
1216             'monitoring-mode': 'terminated'}
1217         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1218                              response['interface'][0])
1219         self.assertDictEqual(dict(input_dict_2,
1220                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1221                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1222         self.assertDictEqual(
1223             {'payload-type': '07', 'exp-payload-type': '07'},
1224             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1225
1226     def test_78_check_interface_ODU0_NETWORK_spdrc(self):
1227         response = test_utils_rfc8040.check_node_attribute_request(
1228             'SPDR-SC1', 'interface', 'XPDR3-NETWORK1-ODU0')
1229         self.assertEqual(response['status_code'], requests.codes.ok)
1230         input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
1231                         'administrative-state': 'inService',
1232                         'supporting-circuit-pack-name': 'CP3-CFP0',
1233                         'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1234                         'type': 'org-openroadm-interfaces:otnOdu',
1235                         'supporting-port': 'CP3-CFP0-P1'}
1236         input_dict_2 = {
1237             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1238             'rate': 'org-openroadm-otn-common-types:ODU0',
1239             'monitoring-mode': 'monitored'}
1240         input_dict_3 = {'trib-port-number': 1}
1241         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1242                              response['interface'][0])
1243         self.assertDictEqual(dict(input_dict_2,
1244                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1245                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1246         self.assertDictEqual(dict(input_dict_3,
1247                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1248                                       'parent-odu-allocation']),
1249                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1250             'parent-odu-allocation'])
1251         self.assertIn(1,
1252                       response['interface'][0][
1253                           'org-openroadm-otn-odu-interfaces:odu'][
1254                           'parent-odu-allocation']['trib-slots'])
1255
1256     def test_79_check_ODU0_connection_spdrc(self):
1257         response = test_utils_rfc8040.check_node_attribute_request(
1258             'SPDR-SC1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
1259         self.assertEqual(response['status_code'], requests.codes.ok)
1260         input_dict_1 = {
1261             'connection-name':
1262             'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
1263             'direction': 'bidirectional'
1264         }
1265         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1266                              response['odu-connection'][0])
1267         self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0'},
1268                              response['odu-connection'][0]['destination'])
1269         self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0'},
1270                              response['odu-connection'][0]['source'])
1271
1272     def test_80_check_otn_topo_links(self):
1273         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
1274         self.assertEqual(response['status_code'], requests.codes.ok)
1275         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
1276         for link in response['network'][0]['ietf-network-topology:link']:
1277             linkId = link['link-id']
1278             if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1279                            'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1280                 self.assertEqual(
1281                     link['org-openroadm-otn-network-topology:available-bandwidth'], 99000)
1282                 self.assertEqual(
1283                     link['org-openroadm-otn-network-topology:used-bandwidth'], 1000)
1284
1285     def test_81_check_otn_topo_tp(self):
1286         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
1287         self.assertEqual(response['status_code'], requests.codes.ok)
1288         for node in response['network'][0]['node']:
1289             if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1290                 tpList = node['ietf-network-topology:termination-point']
1291                 for tp in tpList:
1292                     if tp['tp-id'] == 'XPDR3-NETWORK1':
1293                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1294                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 79)
1295                         tsPoolList = list(range(1, 2))
1296                         self.assertNotIn(
1297                             tsPoolList, xpdrTpPortConAt['ts-pool'])
1298                         self.assertEqual(
1299                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
1300                         self.assertNotIn(
1301                             1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
1302
1303     def test_82_delete_1GE_service(self):
1304         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-1GE"
1305         response = test_utils_rfc8040.transportpce_api_rpc_request(
1306             'org-openroadm-service', 'service-delete',
1307             self.del_serv_input_data)
1308         self.assertEqual(response['status_code'], requests.codes.ok)
1309         time.sleep(self.WAITING)
1310
1311     def test_83_check_service_list(self):
1312         response = test_utils_rfc8040.get_ordm_serv_list_request()
1313         self.assertEqual(response['status_code'], requests.codes.ok)
1314         self.assertEqual(len(response['service-list']['services']), 2)
1315
1316     def test_84_check_no_ODU0_connection_spdra(self):
1317         response = test_utils.check_netconf_node_request("SPDR-SA1", "")
1318         self.assertEqual(response.status_code, requests.codes.ok)
1319         res = response.json()
1320         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
1321         time.sleep(1)
1322
1323     def test_85_check_no_interface_ODU0_NETWORK_spdra(self):
1324         response = test_utils_rfc8040.check_node_attribute_request(
1325             'SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0-service1')
1326         self.assertEqual(response['status_code'], requests.codes.conflict)
1327
1328     def test_86_check_no_interface_ODU0_CLIENT_spdra(self):
1329         response = test_utils_rfc8040.check_node_attribute_request(
1330             'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0-service1')
1331         self.assertEqual(response['status_code'], requests.codes.conflict)
1332
1333     def test_87_check_no_interface_10GE_CLIENT_spdra(self):
1334         response = test_utils_rfc8040.check_node_attribute_request(
1335             'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1336         self.assertEqual(response['status_code'], requests.codes.conflict)
1337
1338     def test_88_check_otn_topo_links(self):
1339         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
1340         self.assertEqual(response['status_code'], requests.codes.ok)
1341         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
1342         for link in response['network'][0]['ietf-network-topology:link']:
1343             linkId = link['link-id']
1344             if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1345                            'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1346                 self.assertEqual(
1347                     link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
1348                 self.assertEqual(
1349                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
1350
1351     def test_89_check_otn_topo_tp(self):
1352         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
1353         self.assertEqual(response['status_code'], requests.codes.ok)
1354         for node in response['network'][0]['node']:
1355             if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1356                 tpList = node['ietf-network-topology:termination-point']
1357                 for tp in tpList:
1358                     if tp['tp-id'] == 'XPDR3-NETWORK1':
1359                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1360                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
1361                         self.assertEqual(
1362                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
1363
1364     def test_90_delete_ODU4_service(self):
1365         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service2-ODU4"
1366         response = test_utils_rfc8040.transportpce_api_rpc_request(
1367             'org-openroadm-service', 'service-delete',
1368             self.del_serv_input_data)
1369         self.assertEqual(response['status_code'], requests.codes.ok)
1370         time.sleep(self.WAITING)
1371
1372     def test_91_delete_OCH_OTU4_service(self):
1373         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service2-OCH-OTU4"
1374         response = test_utils_rfc8040.transportpce_api_rpc_request(
1375             'org-openroadm-service', 'service-delete',
1376             self.del_serv_input_data)
1377         self.assertEqual(response['status_code'], requests.codes.ok)
1378         time.sleep(self.WAITING)
1379
1380     def test_92_disconnect_xponders_from_roadm(self):
1381         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1382         response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
1383         self.assertEqual(response['status_code'], requests.codes.ok)
1384         links = response['network'][0]['ietf-network-topology:link']
1385         for link in links:
1386             if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
1387                 link_name = link["link-id"]
1388                 response = test_utils.delete_request(url+link_name)
1389                 self.assertEqual(response.status_code, requests.codes.ok)
1390
1391     def test_93_check_openroadm_topology(self):
1392         response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
1393         self.assertEqual(response['status_code'], requests.codes.ok)
1394         self.assertEqual(18,
1395                          len(response['network'][0]['ietf-network-topology:link']),
1396                          'Topology should contain 18 links')
1397
1398     def test_94_disconnect_spdrA(self):
1399         response = test_utils_rfc8040.unmount_device("SPDR-SA1")
1400         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1401
1402     def test_95_disconnect_spdrC(self):
1403         response = test_utils_rfc8040.unmount_device("SPDR-SC1")
1404         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1405
1406     def test_96_disconnect_roadmA(self):
1407         response = test_utils_rfc8040.unmount_device("ROADM-A1")
1408         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1409
1410     def test_97_disconnect_roadmC(self):
1411         response = test_utils_rfc8040.unmount_device("ROADM-C1")
1412         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1413
1414
1415 if __name__ == "__main__":
1416     unittest.main(verbosity=2)