Migrate OTN E2E functional tests to RFC8040 step 4
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test11_otn_end2end.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
15
16 import base64
17 import unittest
18 import time
19 import requests
20 # pylint: disable=wrong-import-order
21 import sys
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils  # nopep8
26 import test_utils_rfc8040  # nopep8
27
28
29 class TransportPCEtesting(unittest.TestCase):
30
31     processes = None
32     WAITING = 20  # nominal value is 300
33     NODE_VERSION = '2.2.1'
34
35     cr_serv_input_data = {
36         "sdnc-request-header": {
37             "request-id": "request-1",
38             "rpc-action": "service-create",
39             "request-system-id": "appname"
40         },
41         "service-name": "service1-OCH-OTU4",
42         "common-id": "commonId",
43         "connection-type": "infrastructure",
44         "service-a-end": {
45             "service-rate": "100",
46             "node-id": "SPDR-SA1",
47             "service-format": "OTU",
48             "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
49             "clli": "NodeSA",
50             "tx-direction": [{
51                 "port": {
52                     "port-device-name": "SPDR-SA1-XPDR1",
53                     "port-type": "fixed",
54                     "port-name": "XPDR1-NETWORK1",
55                     "port-rack": "000000.00",
56                     "port-shelf": "Chassis#1"
57                 },
58                 "lgx": {
59                     "lgx-device-name": "Some lgx-device-name",
60                     "lgx-port-name": "Some lgx-port-name",
61                     "lgx-port-rack": "000000.00",
62                     "lgx-port-shelf": "00"
63                 },
64                 "index": 0
65             }],
66             "rx-direction": [{
67                 "port": {
68                     "port-device-name": "SPDR-SA1-XPDR1",
69                     "port-type": "fixed",
70                     "port-name": "XPDR1-NETWORK1",
71                     "port-rack": "000000.00",
72                     "port-shelf": "Chassis#1"
73                 },
74                 "lgx": {
75                     "lgx-device-name": "Some lgx-device-name",
76                     "lgx-port-name": "Some lgx-port-name",
77                     "lgx-port-rack": "000000.00",
78                     "lgx-port-shelf": "00"
79                 },
80                 "index": 0
81             }],
82             "optic-type": "gray"
83         },
84         "service-z-end": {
85             "service-rate": "100",
86             "node-id": "SPDR-SC1",
87             "service-format": "OTU",
88             "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
89             "clli": "NodeSC",
90             "tx-direction": [{
91                 "port": {
92                     "port-device-name": "SPDR-SC1-XPDR1",
93                     "port-type": "fixed",
94                     "port-name": "XPDR1-NETWORK1",
95                     "port-rack": "000000.00",
96                     "port-shelf": "Chassis#1"
97                 },
98                 "lgx": {
99                     "lgx-device-name": "Some lgx-device-name",
100                     "lgx-port-name": "Some lgx-port-name",
101                     "lgx-port-rack": "000000.00",
102                     "lgx-port-shelf": "00"
103                 },
104                 "index": 0
105             }],
106             "rx-direction": [{
107                 "port": {
108                     "port-device-name": "SPDR-SC1-XPDR1",
109                     "port-type": "fixed",
110                     "port-name": "XPDR1-NETWORK1",
111                     "port-rack": "000000.00",
112                     "port-shelf": "Chassis#1"
113                 },
114                 "lgx": {
115                     "lgx-device-name": "Some lgx-device-name",
116                     "lgx-port-name": "Some lgx-port-name",
117                     "lgx-port-rack": "000000.00",
118                     "lgx-port-shelf": "00"
119                 },
120                 "index": 0
121             }],
122             "optic-type": "gray"
123         },
124         "due-date": "2018-06-15T00:00:01Z",
125         "operator-contact": "pw1234"
126     }
127
128     del_serv_input_data = {
129         "sdnc-request-header": {
130             "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
131             "rpc-action": "service-delete",
132             "request-system-id": "appname",
133             "notification-url": "http://localhost:8585/NotificationServer/notify"},
134         "service-delete-req-info": {
135             "service-name": "TBD",
136             "tail-retention": "no"}
137     }
138
139     @classmethod
140     def setUpClass(cls):
141         cls.processes = test_utils_rfc8040.start_tpce()
142         cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION),
143                                                        ('roadma', cls.NODE_VERSION),
144                                                        ('roadmc', cls.NODE_VERSION),
145                                                        ('spdrc', cls.NODE_VERSION)])
146
147     @classmethod
148     def tearDownClass(cls):
149         # pylint: disable=not-an-iterable
150         for process in cls.processes:
151             test_utils_rfc8040.shutdown_process(process)
152         print("all processes killed")
153
154     def setUp(self):
155         time.sleep(5)
156
157     def test_01_connect_spdrA(self):
158         response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
159         self.assertEqual(response.status_code,
160                          requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
161
162     def test_02_connect_spdrC(self):
163         response = test_utils_rfc8040.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
164         self.assertEqual(response.status_code,
165                          requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
166
167     def test_03_connect_rdmA(self):
168         response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
169         self.assertEqual(response.status_code,
170                          requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
171
172     def test_04_connect_rdmC(self):
173         response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
174         self.assertEqual(response.status_code,
175                          requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
176
177     def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
178         response = test_utils_rfc8040.transportpce_api_rpc_request(
179             'transportpce-networkutils', 'init-xpdr-rdm-links',
180             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
181                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
182         self.assertEqual(response['status_code'], requests.codes.ok)
183
184     def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
185         response = test_utils_rfc8040.transportpce_api_rpc_request(
186             'transportpce-networkutils', 'init-rdm-xpdr-links',
187             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
188                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
189         self.assertEqual(response['status_code'], requests.codes.ok)
190
191     def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
192         response = test_utils_rfc8040.transportpce_api_rpc_request(
193             'transportpce-networkutils', 'init-xpdr-rdm-links',
194             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
195                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
196         self.assertEqual(response['status_code'], requests.codes.ok)
197
198     def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
199         response = test_utils_rfc8040.transportpce_api_rpc_request(
200             'transportpce-networkutils', 'init-rdm-xpdr-links',
201             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
202                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
203         self.assertEqual(response['status_code'], requests.codes.ok)
204
205     def test_09_add_omsAttributes_ROADMA_ROADMC(self):
206         # Config ROADMA-ROADMC oms-attributes
207         data = {"span": {
208             "auto-spanloss": "true",
209             "spanloss-base": 11.4,
210             "spanloss-current": 12,
211             "engineered-spanloss": 12.2,
212             "link-concatenation": [{
213                 "SRLG-Id": 0,
214                 "fiber-type": "smf",
215                 "SRLG-length": 100000,
216                 "pmd": 0.5}]}}
217         response = test_utils_rfc8040.add_oms_attr_request(
218             "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
219         self.assertEqual(response.status_code, requests.codes.created)
220
221     def test_10_add_omsAttributes_ROADMC_ROADMA(self):
222         # Config ROADMC-ROADMA oms-attributes
223         data = {"span": {
224             "auto-spanloss": "true",
225             "spanloss-base": 11.4,
226             "spanloss-current": 12,
227             "engineered-spanloss": 12.2,
228             "link-concatenation": [{
229                 "SRLG-Id": 0,
230                 "fiber-type": "smf",
231                 "SRLG-length": 100000,
232                 "pmd": 0.5}]}}
233         response = test_utils_rfc8040.add_oms_attr_request(
234             "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
235         self.assertEqual(response.status_code, requests.codes.created)
236
237 # test service-create for OCH-OTU4 service from spdr to spdr
238     def test_11_check_otn_topology(self):
239         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
240         self.assertEqual(response['status_code'], requests.codes.ok)
241         self.assertEqual(len(response['network'][0]['node']), 6)
242         self.assertNotIn('ietf-network-topology:link', response['network'][0])
243
244     def test_12_create_OCH_OTU4_service(self):
245         response = test_utils_rfc8040.transportpce_api_rpc_request(
246             'org-openroadm-service', 'service-create',
247             self.cr_serv_input_data)
248         self.assertEqual(response['status_code'], requests.codes.ok)
249         time.sleep(self.WAITING)
250
251     def test_13_get_OCH_OTU4_service1(self):
252         response = test_utils.get_service_list_request(
253             "services/service1-OCH-OTU4")
254         self.assertEqual(response.status_code, requests.codes.ok)
255         res = response.json()
256         self.assertEqual(
257             res['services'][0]['administrative-state'], 'inService')
258         self.assertEqual(
259             res['services'][0]['service-name'], 'service1-OCH-OTU4')
260         self.assertEqual(
261             res['services'][0]['connection-type'], 'infrastructure')
262         self.assertEqual(
263             res['services'][0]['lifecycle-state'], 'planned')
264         time.sleep(2)
265
266     # Check correct configuration of devices
267     def test_14_check_interface_och_spdra(self):
268         response = test_utils_rfc8040.check_node_attribute_request(
269             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-761:768')
270         self.assertEqual(response['status_code'], requests.codes.ok)
271         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
272                                    'administrative-state': 'inService',
273                                    'supporting-circuit-pack-name': 'CP1-CFP0',
274                                    'type': 'org-openroadm-interfaces:opticalChannel',
275                                    'supporting-port': 'CP1-CFP0-P1'
276                                    }, **response['interface'][0]),
277                              response['interface'][0])
278         self.assertEqual('org-openroadm-common-types:R100G',
279                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
280         self.assertEqual('dp-qpsk',
281                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
282         self.assertEqual(196.1,
283                          float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
284         self.assertEqual(
285             -5,
286             float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
287
288     def test_15_check_interface_OTU4_spdra(self):
289         response = test_utils_rfc8040.check_node_attribute_request(
290             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
291         self.assertEqual(response['status_code'], requests.codes.ok)
292         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
293                         'administrative-state': 'inService',
294                         'supporting-circuit-pack-name': 'CP1-CFP0',
295                         'supporting-interface': 'XPDR1-NETWORK1-761:768',
296                         'type': 'org-openroadm-interfaces:otnOtu',
297                         'supporting-port': 'CP1-CFP0-P1'
298                         }
299         input_dict_2 = {'tx-sapi': 'H/OelLynehI=',
300                         'expected-dapi': 'H/OelLynehI=',
301                         'tx-dapi': 'AMf1n5hK6Xkk',
302                         'expected-sapi': 'AMf1n5hK6Xkk',
303                         'rate': 'org-openroadm-otn-common-types:OTU4',
304                         'fec': 'scfec'
305                         }
306         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
307                              response['interface'][0])
308         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
309                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
310
311     def test_16_check_interface_och_spdrc(self):
312         response = test_utils_rfc8040.check_node_attribute_request(
313             'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-761:768')
314         self.assertEqual(response['status_code'], requests.codes.ok)
315         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
316                                    'administrative-state': 'inService',
317                                    'supporting-circuit-pack-name': 'CP1-CFP0',
318                                    'type': 'org-openroadm-interfaces:opticalChannel',
319                                    'supporting-port': 'CP1-CFP0-P1'
320                                    }, **response['interface'][0]),
321                              response['interface'][0])
322         self.assertEqual('org-openroadm-common-types:R100G',
323                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
324         self.assertEqual('dp-qpsk',
325                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
326         self.assertEqual(196.1,
327                          float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
328         self.assertEqual(
329             -5,
330             float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
331
332     def test_17_check_interface_OTU4_spdrc(self):
333         response = test_utils_rfc8040.check_node_attribute_request(
334             'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-OTU')
335         self.assertEqual(response['status_code'], requests.codes.ok)
336         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
337                         'administrative-state': 'inService',
338                         'supporting-circuit-pack-name': 'CP1-CFP0',
339                         'supporting-interface': 'XPDR1-NETWORK1-761:768',
340                         'type': 'org-openroadm-interfaces:otnOtu',
341                         'supporting-port': 'CP1-CFP0-P1'
342                         }
343         input_dict_2 = {'tx-dapi': 'H/OelLynehI=',
344                         'expected-sapi': 'H/OelLynehI=',
345                         'tx-sapi': 'AMf1n5hK6Xkk',
346                         'expected-dapi': 'AMf1n5hK6Xkk',
347                         'rate': 'org-openroadm-otn-common-types:OTU4',
348                         'fec': 'scfec'
349                         }
350         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
351                              response['interface'][0])
352         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
353                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
354
355     def test_18_check_no_interface_ODU4_spdra(self):
356         response = test_utils_rfc8040.check_node_attribute_request(
357             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
358         self.assertEqual(response['status_code'], requests.codes.conflict)
359
360     def test_19_check_openroadm_topo_spdra(self):
361         response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR1', 'config')
362         self.assertEqual(response['status_code'], requests.codes.ok)
363         ele = response['node']['ietf-network-topology:termination-point'][0]
364         self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
365         self.assertEqual(
366             196.1,
367             float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
368         self.assertEqual(
369             40,
370             float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
371
372     def test_20_check_openroadm_topo_ROADMA_SRG(self):
373         response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
374         self.assertEqual(response['status_code'], requests.codes.ok)
375         freq_map = base64.b64decode(
376             response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
377         freq_map_array = [int(x) for x in freq_map]
378         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
379         liste_tp = response['node']['ietf-network-topology:termination-point']
380         for ele in liste_tp:
381             if ele['tp-id'] == 'SRG1-PP1-TXRX':
382                 freq_map = base64.b64decode(
383                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
384                 freq_map_array = [int(x) for x in freq_map]
385                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
386             if ele['tp-id'] == 'SRG1-PP2-TXRX':
387                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
388
389     def test_21_check_openroadm_topo_ROADMA_DEG(self):
390         response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
391         self.assertEqual(response['status_code'], requests.codes.ok)
392         freq_map = base64.b64decode(
393             response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
394         freq_map_array = [int(x) for x in freq_map]
395         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
396         liste_tp = response['node']['ietf-network-topology:termination-point']
397         for ele in liste_tp:
398             if ele['tp-id'] == 'DEG2-CTP-TXRX':
399                 freq_map = base64.b64decode(
400                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
401                 freq_map_array = [int(x) for x in freq_map]
402                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
403             if ele['tp-id'] == 'DEG2-TTP-TXRX':
404                 freq_map = base64.b64decode(
405                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
406                 freq_map_array = [int(x) for x in freq_map]
407                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
408
409     def test_22_check_otn_topo_otu4_links(self):
410         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
411         self.assertEqual(response['status_code'], requests.codes.ok)
412         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 2)
413         listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
414                       'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
415         for link in response['network'][0]['ietf-network-topology:link']:
416             self.assertIn(link['link-id'], listLinkId)
417             self.assertEqual(
418                 link['transportpce-topology:otn-link-type'], 'OTU4')
419             self.assertEqual(
420                 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
421             self.assertEqual(
422                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
423             self.assertEqual(
424                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
425             self.assertIn(
426                 link['org-openroadm-common-network:opposite-link'], listLinkId)
427
428 # test service-create for ODU4 service from spdr to spdr
429     def test_23_create_ODU4_service(self):
430         self.cr_serv_input_data["service-name"] = "service1-ODU4"
431         self.cr_serv_input_data["service-a-end"]["service-format"] = "ODU"
432         del self.cr_serv_input_data["service-a-end"]["otu-service-rate"]
433         self.cr_serv_input_data["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
434         self.cr_serv_input_data["service-z-end"]["service-format"] = "ODU"
435         del self.cr_serv_input_data["service-z-end"]["otu-service-rate"]
436         self.cr_serv_input_data["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
437
438         response = test_utils_rfc8040.transportpce_api_rpc_request(
439             'org-openroadm-service', 'service-create',
440             self.cr_serv_input_data)
441         self.assertEqual(response['status_code'], requests.codes.ok)
442         time.sleep(self.WAITING)
443
444     def test_24_get_ODU4_service1(self):
445         response = test_utils.get_service_list_request(
446             "services/service1-ODU4")
447         self.assertEqual(response.status_code, requests.codes.ok)
448         res = response.json()
449         self.assertEqual(
450             res['services'][0]['administrative-state'], 'inService')
451         self.assertEqual(
452             res['services'][0]['service-name'], 'service1-ODU4')
453         self.assertEqual(
454             res['services'][0]['connection-type'], 'infrastructure')
455         self.assertEqual(
456             res['services'][0]['lifecycle-state'], 'planned')
457         time.sleep(2)
458
459     def test_25_check_interface_ODU4_spdra(self):
460         response = test_utils_rfc8040.check_node_attribute_request(
461             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
462         self.assertEqual(response['status_code'], requests.codes.ok)
463         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
464                         'administrative-state': 'inService',
465                         'supporting-circuit-pack-name': 'CP1-CFP0',
466                         'supporting-interface': 'XPDR1-NETWORK1-OTU',
467                         'type': 'org-openroadm-interfaces:otnOdu',
468                         'supporting-port': 'CP1-CFP0-P1',
469                         'circuit-id': 'TBD',
470                         'description': 'TBD'}
471         # SAPI/DAPI are added in the Otu4 renderer
472         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
473                         'rate': 'org-openroadm-otn-common-types:ODU4',
474                         'monitoring-mode': 'terminated',
475                         'expected-dapi': 'H/OelLynehI=',
476                         'expected-sapi': 'AMf1n5hK6Xkk',
477                         'tx-dapi': 'AMf1n5hK6Xkk',
478                         'tx-sapi': 'H/OelLynehI='}
479
480         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
481                              response['interface'][0])
482         self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
483                                   **input_dict_2),
484                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
485                              )
486         self.assertDictEqual(
487             {'payload-type': '21', 'exp-payload-type': '21'},
488             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
489
490     def test_26_check_interface_ODU4_spdrc(self):
491         response = test_utils_rfc8040.check_node_attribute_request(
492             'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU4')
493         self.assertEqual(response['status_code'], requests.codes.ok)
494         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
495                         'administrative-state': 'inService',
496                         'supporting-circuit-pack-name': 'CP1-CFP0',
497                         'supporting-interface': 'XPDR1-NETWORK1-OTU',
498                         'type': 'org-openroadm-interfaces:otnOdu',
499                         'supporting-port': 'CP1-CFP0-P1',
500                         'circuit-id': 'TBD',
501                         'description': 'TBD'}
502         # SAPI/DAPI are added in the Otu4 renderer
503         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
504                         'rate': 'org-openroadm-otn-common-types:ODU4',
505                         'monitoring-mode': 'terminated',
506                         'tx-sapi': 'AMf1n5hK6Xkk',
507                         'tx-dapi': 'H/OelLynehI=',
508                         'expected-sapi': 'H/OelLynehI=',
509                         'expected-dapi': 'AMf1n5hK6Xkk'
510                         }
511         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
512                              response['interface'][0])
513         self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
514                                   **input_dict_2),
515                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
516                              )
517         self.assertDictEqual(
518             {'payload-type': '21', 'exp-payload-type': '21'},
519             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
520
521     def test_27_check_otn_topo_links(self):
522         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
523         self.assertEqual(response['status_code'], requests.codes.ok)
524         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
525         for link in response['network'][0]['ietf-network-topology:link']:
526             linkId = link['link-id']
527             if (linkId in ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
528                            'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
529                 self.assertEqual(
530                     link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
531                 self.assertEqual(
532                     link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
533             elif (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
534                              'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
535                 self.assertEqual(
536                     link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
537                 self.assertEqual(
538                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
539                 self.assertEqual(
540                     link['transportpce-topology:otn-link-type'], 'ODTU4')
541                 self.assertEqual(
542                     link['org-openroadm-common-network:link-type'], 'OTN-LINK')
543                 self.assertIn(link['org-openroadm-common-network:opposite-link'],
544                               ['ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
545                                'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
546             else:
547                 self.fail("this link should not exist")
548
549     def test_28_check_otn_topo_tp(self):
550         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
551         self.assertEqual(response['status_code'], requests.codes.ok)
552         for node in response['network'][0]['node']:
553             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
554                 tpList = node['ietf-network-topology:termination-point']
555                 for tp in tpList:
556                     if tp['tp-id'] == 'XPDR1-NETWORK1':
557                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
558                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
559                         self.assertEqual(
560                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
561                         self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
562                                          'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
563
564 # test service-create for 10GE service from spdr to spdr
565     def test_29_create_10GE_service(self):
566         self.cr_serv_input_data["service-name"] = "service1-10GE"
567         self.cr_serv_input_data["connection-type"] = "service"
568         self.cr_serv_input_data["service-a-end"]["service-rate"] = "10"
569         self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
570         del self.cr_serv_input_data["service-a-end"]["odu-service-rate"]
571         self.cr_serv_input_data["service-a-end"]["ethernet-encoding"] = "10GBASE-R"
572         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
573         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
574         self.cr_serv_input_data["service-z-end"]["service-rate"] = "10"
575         self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
576         del self.cr_serv_input_data["service-z-end"]["odu-service-rate"]
577         self.cr_serv_input_data["service-z-end"]["ethernet-encoding"] = "10GBASE-R"
578         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
579         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
580         response = test_utils_rfc8040.transportpce_api_rpc_request(
581             'org-openroadm-service', 'service-create',
582             self.cr_serv_input_data)
583         self.assertEqual(response['status_code'], requests.codes.ok)
584         time.sleep(self.WAITING)
585
586     def test_30_get_10GE_service1(self):
587         response = test_utils.get_service_list_request(
588             "services/service1-10GE")
589         self.assertEqual(response.status_code, requests.codes.ok)
590         res = response.json()
591         self.assertEqual(
592             res['services'][0]['administrative-state'], 'inService')
593         self.assertEqual(
594             res['services'][0]['service-name'], 'service1-10GE')
595         self.assertEqual(
596             res['services'][0]['connection-type'], 'service')
597         self.assertEqual(
598             res['services'][0]['lifecycle-state'], 'planned')
599         time.sleep(2)
600
601     def test_31_check_interface_10GE_CLIENT_spdra(self):
602         response = test_utils_rfc8040.check_node_attribute_request(
603             'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
604         self.assertEqual(response['status_code'], requests.codes.ok)
605         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
606                       'administrative-state': 'inService',
607                       'supporting-circuit-pack-name': 'CP1-SFP4',
608                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
609                       'supporting-port': 'CP1-SFP4-P1'
610                       }
611         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
612                              response['interface'][0])
613         self.assertEqual(
614             10000,
615             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
616
617     def test_32_check_interface_ODU2E_CLIENT_spdra(self):
618         response = test_utils_rfc8040.check_node_attribute_request(
619             'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e')
620         self.assertEqual(response['status_code'], requests.codes.ok)
621         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
622                         'administrative-state': 'inService',
623                         'supporting-circuit-pack-name': 'CP1-SFP4',
624                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
625                         'type': 'org-openroadm-interfaces:otnOdu',
626                         'supporting-port': 'CP1-SFP4-P1'}
627         input_dict_2 = {
628             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
629             'rate': 'org-openroadm-otn-common-types:ODU2e',
630             'monitoring-mode': 'terminated'}
631
632         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
633                              response['interface'][0])
634         self.assertDictEqual(dict(input_dict_2,
635                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
636                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
637         self.assertDictEqual(
638             {'payload-type': '03', 'exp-payload-type': '03'},
639             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
640
641     def test_33_check_interface_ODU2E_NETWORK_spdra(self):
642         response = test_utils_rfc8040.check_node_attribute_request(
643             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e')
644         self.assertEqual(response['status_code'], requests.codes.ok)
645         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
646                         'administrative-state': 'inService',
647                         'supporting-circuit-pack-name': 'CP1-CFP0',
648                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
649                         'type': 'org-openroadm-interfaces:otnOdu',
650                         'supporting-port': 'CP1-CFP0-P1'}
651         input_dict_2 = {
652             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
653             'rate': 'org-openroadm-otn-common-types:ODU2e',
654             'monitoring-mode': 'monitored'}
655         input_dict_3 = {'trib-port-number': 1}
656
657         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
658                              response['interface'][0])
659         self.assertDictEqual(dict(input_dict_2,
660                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
661                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
662         self.assertDictEqual(dict(input_dict_3,
663                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
664                                       'parent-odu-allocation']),
665                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
666         self.assertIn(1, response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
667                       ['trib-slots'])
668
669     def test_34_check_ODU2E_connection_spdra(self):
670         response = test_utils_rfc8040.check_node_attribute_request(
671             'SPDR-SA1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
672         self.assertEqual(response['status_code'], requests.codes.ok)
673         input_dict_1 = {
674             'connection-name':
675             'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
676             'direction': 'bidirectional'
677         }
678
679         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
680                              response['odu-connection'][0])
681         self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
682                              response['odu-connection'][0]['destination'])
683         self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
684                              response['odu-connection'][0]['source'])
685
686     def test_35_check_interface_10GE_CLIENT_spdrc(self):
687         response = test_utils_rfc8040.check_node_attribute_request(
688             'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
689         self.assertEqual(response['status_code'], requests.codes.ok)
690         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
691                       'administrative-state': 'inService',
692                       'supporting-circuit-pack-name': 'CP1-SFP4',
693                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
694                       'supporting-port': 'CP1-SFP4-P1'
695                       }
696         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
697                              response['interface'][0])
698         self.assertEqual(
699             10000,
700             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
701
702     def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
703         response = test_utils_rfc8040.check_node_attribute_request(
704             'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ODU2e')
705         self.assertEqual(response['status_code'], requests.codes.ok)
706         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
707                         'administrative-state': 'inService',
708                         'supporting-circuit-pack-name': 'CP1-SFP4',
709                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
710                         'type': 'org-openroadm-interfaces:otnOdu',
711                         'supporting-port': 'CP1-SFP4-P1'}
712         input_dict_2 = {
713             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
714             'rate': 'org-openroadm-otn-common-types:ODU2e',
715             'monitoring-mode': 'terminated'}
716
717         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
718                              response['interface'][0])
719         self.assertDictEqual(dict(input_dict_2,
720                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
721                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
722         self.assertDictEqual(
723             {'payload-type': '03', 'exp-payload-type': '03'},
724             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
725
726     def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
727         response = test_utils_rfc8040.check_node_attribute_request(
728             'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU2e')
729         self.assertEqual(response['status_code'], requests.codes.ok)
730         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
731                         'administrative-state': 'inService',
732                         'supporting-circuit-pack-name': 'CP1-CFP0',
733                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
734                         'type': 'org-openroadm-interfaces:otnOdu',
735                         'supporting-port': 'CP1-CFP0-P1'}
736         input_dict_2 = {
737             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
738             'rate': 'org-openroadm-otn-common-types:ODU2e',
739             'monitoring-mode': 'monitored'}
740
741         input_dict_3 = {'trib-port-number': 1}
742
743         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
744                              response['interface'][0])
745         self.assertDictEqual(dict(input_dict_2,
746                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
747                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
748         self.assertDictEqual(dict(input_dict_3,
749                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
750                                       'parent-odu-allocation']),
751                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
752             'parent-odu-allocation'])
753         self.assertIn(1,
754                       response['interface'][0][
755                           'org-openroadm-otn-odu-interfaces:odu'][
756                           'parent-odu-allocation']['trib-slots'])
757
758     def test_38_check_ODU2E_connection_spdrc(self):
759         response = test_utils_rfc8040.check_node_attribute_request(
760             'SPDR-SC1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
761         self.assertEqual(response['status_code'], requests.codes.ok)
762         input_dict_1 = {
763             'connection-name':
764             'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
765             'direction': 'bidirectional'
766         }
767
768         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
769                              response['odu-connection'][0])
770         self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
771                              response['odu-connection'][0]['destination'])
772         self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
773                              response['odu-connection'][0]['source'])
774
775     def test_39_check_otn_topo_links(self):
776         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
777         self.assertEqual(response['status_code'], requests.codes.ok)
778         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
779         for link in response['network'][0]['ietf-network-topology:link']:
780             linkId = link['link-id']
781             if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
782                            'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
783                 self.assertEqual(
784                     link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
785                 self.assertEqual(
786                     link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
787
788     def test_40_check_otn_topo_tp(self):
789         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
790         self.assertEqual(response['status_code'], requests.codes.ok)
791         for node in response['network'][0]['node']:
792             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
793                 tpList = node['ietf-network-topology:termination-point']
794                 for tp in tpList:
795                     if tp['tp-id'] == 'XPDR1-NETWORK1':
796                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
797                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
798                         tsPoolList = list(range(1, 9))
799                         self.assertNotIn(
800                             tsPoolList, xpdrTpPortConAt['ts-pool'])
801                         self.assertEqual(
802                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
803                         self.assertNotIn(
804                             1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
805
806     def test_41_delete_10GE_service(self):
807         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-10GE"
808         response = test_utils_rfc8040.transportpce_api_rpc_request(
809             'org-openroadm-service', 'service-delete',
810             self.del_serv_input_data)
811         self.assertEqual(response['status_code'], requests.codes.ok)
812         time.sleep(self.WAITING)
813
814     def test_42_check_service_list(self):
815         response = test_utils.get_service_list_request("")
816         self.assertEqual(response.status_code, requests.codes.ok)
817         res = response.json()
818         self.assertEqual(len(res['service-list']['services']), 2)
819         time.sleep(2)
820
821     def test_43_check_no_ODU2e_connection_spdra(self):
822         response = test_utils.check_netconf_node_request("SPDR-SA1", "")
823         self.assertEqual(response.status_code, requests.codes.ok)
824         res = response.json()
825         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
826         time.sleep(1)
827
828     def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
829         response = test_utils_rfc8040.check_node_attribute_request(
830             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e-service1')
831         self.assertEqual(response['status_code'], requests.codes.conflict)
832
833     def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
834         response = test_utils_rfc8040.check_node_attribute_request(
835             'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e-service1')
836         self.assertEqual(response['status_code'], requests.codes.conflict)
837
838     def test_46_check_no_interface_10GE_CLIENT_spdra(self):
839         response = test_utils_rfc8040.check_node_attribute_request(
840             'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
841         self.assertEqual(response['status_code'], requests.codes.conflict)
842
843     def test_47_check_otn_topo_links(self):
844         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
845         self.assertEqual(response['status_code'], requests.codes.ok)
846         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
847         for link in response['network'][0]['ietf-network-topology:link']:
848             linkId = link['link-id']
849             if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
850                            'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
851                 self.assertEqual(
852                     link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
853                 self.assertEqual(
854                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
855
856     def test_48_check_otn_topo_tp(self):
857         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
858         self.assertEqual(response['status_code'], requests.codes.ok)
859         for node in response['network'][0]['node']:
860             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
861                 tpList = node['ietf-network-topology:termination-point']
862                 for tp in tpList:
863                     if tp['tp-id'] == 'XPDR1-NETWORK1':
864                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
865                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
866                         self.assertEqual(
867                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
868
869     def test_49_delete_ODU4_service(self):
870         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-ODU4"
871         response = test_utils_rfc8040.transportpce_api_rpc_request(
872             'org-openroadm-service', 'service-delete',
873             self.del_serv_input_data)
874         self.assertEqual(response['status_code'], requests.codes.ok)
875         time.sleep(self.WAITING)
876
877     def test_50_check_service_list(self):
878         response = test_utils.get_service_list_request("")
879         self.assertEqual(response.status_code, requests.codes.ok)
880         res = response.json()
881         self.assertEqual(len(res['service-list']['services']), 1)
882         time.sleep(2)
883
884     def test_51_check_no_interface_ODU4_spdra(self):
885         response = test_utils_rfc8040.check_node_attribute_request(
886             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
887         self.assertEqual(response['status_code'], requests.codes.conflict)
888
889     def test_52_check_otn_topo_links(self):
890         self.test_22_check_otn_topo_otu4_links()
891
892     def test_53_check_otn_topo_tp(self):
893         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
894         self.assertEqual(response['status_code'], requests.codes.ok)
895         for node in response['network'][0]['node']:
896             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
897                 tpList = node['ietf-network-topology:termination-point']
898                 for tp in tpList:
899                     if tp['tp-id'] == 'XPDR1-NETWORK1':
900                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
901                         self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
902                         self.assertNotIn(
903                             'odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
904
905     def test_54_delete_OCH_OTU4_service(self):
906         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-OCH-OTU4"
907         response = test_utils_rfc8040.transportpce_api_rpc_request(
908             'org-openroadm-service', 'service-delete',
909             self.del_serv_input_data)
910         self.assertEqual(response['status_code'], requests.codes.ok)
911         time.sleep(self.WAITING)
912
913     def test_55_get_no_service(self):
914         response = test_utils.get_service_list_request("")
915         self.assertEqual(response.status_code, requests.codes.conflict)
916         res = response.json()
917         self.assertIn(
918             {"error-type": "application", "error-tag": "data-missing",
919              "error-message": "Request could not be completed because the relevant data model content does not exist"},
920             res['errors']['error'])
921         time.sleep(1)
922
923     def test_56_check_no_interface_OTU4_spdra(self):
924         response = test_utils_rfc8040.check_node_attribute_request(
925             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
926         self.assertEqual(response['status_code'], requests.codes.conflict)
927
928     def test_57_check_no_interface_OCH_spdra(self):
929         response = test_utils_rfc8040.check_node_attribute_request(
930             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-1')
931         self.assertEqual(response['status_code'], requests.codes.conflict)
932
933     def test_58_getLinks_OtnTopology(self):
934         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
935         self.assertEqual(response['status_code'], requests.codes.ok)
936         self.assertNotIn('ietf-network-topology:link', response['network'][0])
937
938     def test_59_check_openroadm_topo_spdra(self):
939         response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR1', 'config')
940         self.assertEqual(response['status_code'], requests.codes.ok)
941         tp = response['node']['ietf-network-topology:termination-point'][0]
942         self.assertEqual('XPDR1-NETWORK1', tp['tp-id'])
943         self.assertNotIn('wavelength', dict.keys(
944             tp['org-openroadm-network-topology:xpdr-network-attributes']))
945
946     def test_60_check_openroadm_topo_ROADMA_SRG(self):
947         response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
948         self.assertEqual(response['status_code'], requests.codes.ok)
949         freq_map = base64.b64decode(
950             response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
951         freq_map_array = [int(x) for x in freq_map]
952         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
953         liste_tp = response['node']['ietf-network-topology:termination-point']
954         for ele in liste_tp:
955             if ele['tp-id'] == 'SRG1-PP1-TXRX':
956                 freq_map = base64.b64decode(
957                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
958                 freq_map_array = [int(x) for x in freq_map]
959                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
960         time.sleep(3)
961
962     def test_61_check_openroadm_topo_ROADMA_DEG(self):
963         response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
964         self.assertEqual(response['status_code'], requests.codes.ok)
965         freq_map = base64.b64decode(
966             response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
967         freq_map_array = [int(x) for x in freq_map]
968         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
969         liste_tp = response['node']['ietf-network-topology:termination-point']
970         for ele in liste_tp:
971             if ele['tp-id'] == 'DEG2-CTP-TXRX':
972                 freq_map = base64.b64decode(
973                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
974                 freq_map_array = [int(x) for x in freq_map]
975                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
976             if ele['tp-id'] == 'DEG2-TTP-TXRX':
977                 freq_map = base64.b64decode(
978                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
979                 freq_map_array = [int(x) for x in freq_map]
980                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
981         time.sleep(3)
982
983     def test_62_connect_sprdA_3_N1_to_roadmA_PP2(self):
984         response = test_utils_rfc8040.transportpce_api_rpc_request(
985             'transportpce-networkutils', 'init-xpdr-rdm-links',
986             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '3', 'network-num': '1',
987                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
988         self.assertEqual(response['status_code'], requests.codes.ok)
989
990     def test_63_connect_roadmA_PP2_to_spdrA_3_N1(self):
991         response = test_utils_rfc8040.transportpce_api_rpc_request(
992             'transportpce-networkutils', 'init-rdm-xpdr-links',
993             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '3', 'network-num': '1',
994                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
995         self.assertEqual(response['status_code'], requests.codes.ok)
996
997     def test_64_connect_sprdC_3_N1_to_roadmC_PP2(self):
998         response = test_utils_rfc8040.transportpce_api_rpc_request(
999             'transportpce-networkutils', 'init-xpdr-rdm-links',
1000             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '3', 'network-num': '1',
1001                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
1002         self.assertEqual(response['status_code'], requests.codes.ok)
1003
1004     def test_65_connect_roadmC_PP2_to_spdrC_3_N1(self):
1005         response = test_utils_rfc8040.transportpce_api_rpc_request(
1006             'transportpce-networkutils', 'init-rdm-xpdr-links',
1007             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '3', 'network-num': '1',
1008                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
1009         self.assertEqual(response['status_code'], requests.codes.ok)
1010
1011     def test_66_create_OCH_OTU4_service_2(self):
1012         # pylint: disable=line-too-long
1013         self.cr_serv_input_data["service-name"] = "service2-OCH-OTU4"
1014         self.cr_serv_input_data["connection-type"] = "infrastructure"
1015         self.cr_serv_input_data["service-a-end"]["service-rate"] = "100"
1016         self.cr_serv_input_data["service-a-end"]["service-format"] = "OTU"
1017         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1018         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1019         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1020         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1021         self.cr_serv_input_data["service-a-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1022         self.cr_serv_input_data["service-z-end"]["service-rate"] = "100"
1023         self.cr_serv_input_data["service-z-end"]["service-format"] = "OTU"
1024         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1025         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1026         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1027         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1028         self.cr_serv_input_data["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1029         response = test_utils_rfc8040.transportpce_api_rpc_request(
1030             'org-openroadm-service', 'service-create',
1031             self.cr_serv_input_data)
1032         self.assertEqual(response['status_code'], requests.codes.ok)
1033         time.sleep(self.WAITING)
1034
1035     def test_67_get_OCH_OTU4_service2(self):
1036         response = test_utils.get_service_list_request(
1037             "services/service2-OCH-OTU4")
1038         self.assertEqual(response.status_code, requests.codes.ok)
1039         res = response.json()
1040         self.assertEqual(
1041             res['services'][0]['administrative-state'], 'inService')
1042         self.assertEqual(
1043             res['services'][0]['service-name'], 'service2-OCH-OTU4')
1044         self.assertEqual(
1045             res['services'][0]['connection-type'], 'infrastructure')
1046         self.assertEqual(
1047             res['services'][0]['lifecycle-state'], 'planned')
1048         time.sleep(2)
1049
1050     def test_68_create_ODU4_service_2(self):
1051         # pylint: disable=line-too-long
1052         self.cr_serv_input_data["service-name"] = "service2-ODU4"
1053         self.cr_serv_input_data["service-a-end"]["service-format"] = "ODU"
1054         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1055         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1056         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1057         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1058         self.cr_serv_input_data["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1059         del self.cr_serv_input_data["service-a-end"]["otu-service-rate"]
1060         self.cr_serv_input_data["service-z-end"]["service-format"] = "ODU"
1061         self.cr_serv_input_data["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1062         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1063         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1064         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1065         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1066         del self.cr_serv_input_data["service-z-end"]["otu-service-rate"]
1067         response = test_utils_rfc8040.transportpce_api_rpc_request(
1068             'org-openroadm-service', 'service-create',
1069             self.cr_serv_input_data)
1070         self.assertEqual(response['status_code'], requests.codes.ok)
1071         time.sleep(self.WAITING)
1072
1073     def test_69_get_ODU4_service2(self):
1074         response = test_utils.get_service_list_request(
1075             "services/service2-ODU4")
1076         self.assertEqual(response.status_code, requests.codes.ok)
1077         res = response.json()
1078         self.assertEqual(
1079             res['services'][0]['administrative-state'], 'inService')
1080         self.assertEqual(
1081             res['services'][0]['service-name'], 'service2-ODU4')
1082         self.assertEqual(
1083             res['services'][0]['connection-type'], 'infrastructure')
1084         self.assertEqual(
1085             res['services'][0]['lifecycle-state'], 'planned')
1086         time.sleep(2)
1087
1088     def test_70_create_1GE_service(self):
1089         # pylint: disable=line-too-long
1090         self.cr_serv_input_data["service-name"] = "service1-1GE"
1091         self.cr_serv_input_data["connection-type"] = "service"
1092         self.cr_serv_input_data["service-a-end"]["service-rate"] = "1"
1093         self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
1094         self.cr_serv_input_data["service-z-end"]["service-rate"] = "1"
1095         self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
1096         del self.cr_serv_input_data["service-a-end"]["odu-service-rate"]
1097         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1098         self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1099         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1100         self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1101         del self.cr_serv_input_data["service-z-end"]["odu-service-rate"]
1102         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1103         self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1104         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1105         self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1106         response = test_utils_rfc8040.transportpce_api_rpc_request(
1107             'org-openroadm-service', 'service-create',
1108             self.cr_serv_input_data)
1109         self.assertEqual(response['status_code'], requests.codes.ok)
1110         time.sleep(self.WAITING)
1111
1112     def test_71_get_1GE_service1(self):
1113         response = test_utils.get_service_list_request("services/service1-1GE")
1114         self.assertEqual(response.status_code, requests.codes.ok)
1115         res = response.json()
1116         self.assertEqual(
1117             res['services'][0]['administrative-state'], 'inService')
1118         self.assertEqual(
1119             res['services'][0]['service-name'], 'service1-1GE')
1120         self.assertEqual(
1121             res['services'][0]['connection-type'], 'service')
1122         self.assertEqual(
1123             res['services'][0]['lifecycle-state'], 'planned')
1124         time.sleep(2)
1125
1126     def test_72_check_interface_1GE_CLIENT_spdra(self):
1127         response = test_utils_rfc8040.check_node_attribute_request(
1128             'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1129         self.assertEqual(response['status_code'], requests.codes.ok)
1130         input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1131                       'administrative-state': 'inService',
1132                       'supporting-circuit-pack-name': 'CP1-SFP4',
1133                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
1134                       'supporting-port': 'CP1-SFP4-P1'
1135                       }
1136         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
1137                              response['interface'][0])
1138         self.assertEqual(
1139             1000,
1140             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
1141
1142     def test_73_check_interface_ODU0_CLIENT_spdra(self):
1143         response = test_utils_rfc8040.check_node_attribute_request(
1144             'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0')
1145         self.assertEqual(response['status_code'], requests.codes.ok)
1146         input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
1147                         'administrative-state': 'inService',
1148                         'supporting-circuit-pack-name': 'CP3-SFP1',
1149                         'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1150                         'type': 'org-openroadm-interfaces:otnOdu',
1151                         'supporting-port': 'CP3-SFP1-P1'}
1152         input_dict_2 = {
1153             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1154             'rate': 'org-openroadm-otn-common-types:ODU0',
1155             'monitoring-mode': 'terminated'}
1156         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1157                              response['interface'][0])
1158         self.assertDictEqual(dict(input_dict_2,
1159                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1160                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1161         self.assertDictEqual(
1162             {'payload-type': '07', 'exp-payload-type': '07'},
1163             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1164
1165     def test_74_check_interface_ODU0_NETWORK_spdra(self):
1166         response = test_utils_rfc8040.check_node_attribute_request(
1167             'SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0')
1168         self.assertEqual(response['status_code'], requests.codes.ok)
1169         input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
1170                         'administrative-state': 'inService',
1171                         'supporting-circuit-pack-name': 'CP3-CFP0',
1172                         'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1173                         'type': 'org-openroadm-interfaces:otnOdu',
1174                         'supporting-port': 'CP3-CFP0-P1'}
1175         input_dict_2 = {
1176             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1177             'rate': 'org-openroadm-otn-common-types:ODU0',
1178             'monitoring-mode': 'monitored'}
1179         input_dict_3 = {'trib-port-number': 1}
1180         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1181                              response['interface'][0])
1182         self.assertDictEqual(dict(input_dict_2,
1183                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1184                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1185         self.assertDictEqual(dict(input_dict_3,
1186                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1187                                       'parent-odu-allocation']),
1188                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
1189         self.assertIn(1, response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1190                       ['trib-slots'])
1191
1192     def test_75_check_ODU0_connection_spdra(self):
1193         response = test_utils_rfc8040.check_node_attribute_request(
1194             'SPDR-SA1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
1195         self.assertEqual(response['status_code'], requests.codes.ok)
1196         input_dict_1 = {
1197             'connection-name':
1198             'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
1199             'direction': 'bidirectional'
1200         }
1201         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1202                              response['odu-connection'][0])
1203         self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0'},
1204                              response['odu-connection'][0]['destination'])
1205         self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0'},
1206                              response['odu-connection'][0]['source'])
1207
1208     def test_76_check_interface_1GE_CLIENT_spdrc(self):
1209         response = test_utils_rfc8040.check_node_attribute_request(
1210             'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1211         self.assertEqual(response['status_code'], requests.codes.ok)
1212         input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1213                       'administrative-state': 'inService',
1214                       'supporting-circuit-pack-name': 'CP3-SFP1',
1215                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
1216                       'supporting-port': 'CP3-SFP1-P1'
1217                       }
1218         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
1219                              response['interface'][0])
1220         self.assertEqual(
1221             1000,
1222             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
1223
1224     def test_77_check_interface_ODU0_CLIENT_spdrc(self):
1225         response = test_utils_rfc8040.check_node_attribute_request(
1226             'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ODU0')
1227         self.assertEqual(response['status_code'], requests.codes.ok)
1228         input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
1229                         'administrative-state': 'inService',
1230                         'supporting-circuit-pack-name': 'CP3-SFP1',
1231                         'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1232                         'type': 'org-openroadm-interfaces:otnOdu',
1233                         'supporting-port': 'CP3-SFP1-P1'}
1234         input_dict_2 = {
1235             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1236             'rate': 'org-openroadm-otn-common-types:ODU0',
1237             'monitoring-mode': 'terminated'}
1238         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1239                              response['interface'][0])
1240         self.assertDictEqual(dict(input_dict_2,
1241                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1242                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1243         self.assertDictEqual(
1244             {'payload-type': '07', 'exp-payload-type': '07'},
1245             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1246
1247     def test_78_check_interface_ODU0_NETWORK_spdrc(self):
1248         response = test_utils_rfc8040.check_node_attribute_request(
1249             'SPDR-SC1', 'interface', 'XPDR3-NETWORK1-ODU0')
1250         self.assertEqual(response['status_code'], requests.codes.ok)
1251         input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
1252                         'administrative-state': 'inService',
1253                         'supporting-circuit-pack-name': 'CP3-CFP0',
1254                         'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1255                         'type': 'org-openroadm-interfaces:otnOdu',
1256                         'supporting-port': 'CP3-CFP0-P1'}
1257         input_dict_2 = {
1258             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1259             'rate': 'org-openroadm-otn-common-types:ODU0',
1260             'monitoring-mode': 'monitored'}
1261         input_dict_3 = {'trib-port-number': 1}
1262         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1263                              response['interface'][0])
1264         self.assertDictEqual(dict(input_dict_2,
1265                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1266                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1267         self.assertDictEqual(dict(input_dict_3,
1268                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1269                                       'parent-odu-allocation']),
1270                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1271             'parent-odu-allocation'])
1272         self.assertIn(1,
1273                       response['interface'][0][
1274                           'org-openroadm-otn-odu-interfaces:odu'][
1275                           'parent-odu-allocation']['trib-slots'])
1276
1277     def test_79_check_ODU0_connection_spdrc(self):
1278         response = test_utils_rfc8040.check_node_attribute_request(
1279             'SPDR-SC1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
1280         self.assertEqual(response['status_code'], requests.codes.ok)
1281         input_dict_1 = {
1282             'connection-name':
1283             'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
1284             'direction': 'bidirectional'
1285         }
1286         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1287                              response['odu-connection'][0])
1288         self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0'},
1289                              response['odu-connection'][0]['destination'])
1290         self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0'},
1291                              response['odu-connection'][0]['source'])
1292
1293     def test_80_check_otn_topo_links(self):
1294         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
1295         self.assertEqual(response['status_code'], requests.codes.ok)
1296         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
1297         for link in response['network'][0]['ietf-network-topology:link']:
1298             linkId = link['link-id']
1299             if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1300                            'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1301                 self.assertEqual(
1302                     link['org-openroadm-otn-network-topology:available-bandwidth'], 99000)
1303                 self.assertEqual(
1304                     link['org-openroadm-otn-network-topology:used-bandwidth'], 1000)
1305
1306     def test_81_check_otn_topo_tp(self):
1307         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
1308         self.assertEqual(response['status_code'], requests.codes.ok)
1309         for node in response['network'][0]['node']:
1310             if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1311                 tpList = node['ietf-network-topology:termination-point']
1312                 for tp in tpList:
1313                     if tp['tp-id'] == 'XPDR3-NETWORK1':
1314                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1315                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 79)
1316                         tsPoolList = list(range(1, 2))
1317                         self.assertNotIn(
1318                             tsPoolList, xpdrTpPortConAt['ts-pool'])
1319                         self.assertEqual(
1320                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
1321                         self.assertNotIn(
1322                             1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
1323
1324     def test_82_delete_1GE_service(self):
1325         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-1GE"
1326         response = test_utils_rfc8040.transportpce_api_rpc_request(
1327             'org-openroadm-service', 'service-delete',
1328             self.del_serv_input_data)
1329         self.assertEqual(response['status_code'], requests.codes.ok)
1330         time.sleep(self.WAITING)
1331
1332     def test_83_check_service_list(self):
1333         response = test_utils.get_service_list_request("")
1334         self.assertEqual(response.status_code, requests.codes.ok)
1335         res = response.json()
1336         self.assertEqual(len(res['service-list']['services']), 2)
1337         time.sleep(2)
1338
1339     def test_84_check_no_ODU0_connection_spdra(self):
1340         response = test_utils.check_netconf_node_request("SPDR-SA1", "")
1341         self.assertEqual(response.status_code, requests.codes.ok)
1342         res = response.json()
1343         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
1344         time.sleep(1)
1345
1346     def test_85_check_no_interface_ODU0_NETWORK_spdra(self):
1347         response = test_utils_rfc8040.check_node_attribute_request(
1348             'SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0-service1')
1349         self.assertEqual(response['status_code'], requests.codes.conflict)
1350
1351     def test_86_check_no_interface_ODU0_CLIENT_spdra(self):
1352         response = test_utils_rfc8040.check_node_attribute_request(
1353             'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0-service1')
1354         self.assertEqual(response['status_code'], requests.codes.conflict)
1355
1356     def test_87_check_no_interface_10GE_CLIENT_spdra(self):
1357         response = test_utils_rfc8040.check_node_attribute_request(
1358             'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1359         self.assertEqual(response['status_code'], requests.codes.conflict)
1360
1361     def test_88_check_otn_topo_links(self):
1362         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
1363         self.assertEqual(response['status_code'], requests.codes.ok)
1364         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
1365         for link in response['network'][0]['ietf-network-topology:link']:
1366             linkId = link['link-id']
1367             if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1368                            'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1369                 self.assertEqual(
1370                     link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
1371                 self.assertEqual(
1372                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
1373
1374     def test_89_check_otn_topo_tp(self):
1375         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
1376         self.assertEqual(response['status_code'], requests.codes.ok)
1377         for node in response['network'][0]['node']:
1378             if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1379                 tpList = node['ietf-network-topology:termination-point']
1380                 for tp in tpList:
1381                     if tp['tp-id'] == 'XPDR3-NETWORK1':
1382                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1383                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
1384                         self.assertEqual(
1385                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
1386
1387     def test_90_delete_ODU4_service(self):
1388         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service2-ODU4"
1389         response = test_utils_rfc8040.transportpce_api_rpc_request(
1390             'org-openroadm-service', 'service-delete',
1391             self.del_serv_input_data)
1392         self.assertEqual(response['status_code'], requests.codes.ok)
1393         time.sleep(self.WAITING)
1394
1395     def test_91_delete_OCH_OTU4_service(self):
1396         self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service2-OCH-OTU4"
1397         response = test_utils_rfc8040.transportpce_api_rpc_request(
1398             'org-openroadm-service', 'service-delete',
1399             self.del_serv_input_data)
1400         self.assertEqual(response['status_code'], requests.codes.ok)
1401         time.sleep(self.WAITING)
1402
1403     def test_92_disconnect_xponders_from_roadm(self):
1404         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1405         response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
1406         self.assertEqual(response['status_code'], requests.codes.ok)
1407         links = response['network'][0]['ietf-network-topology:link']
1408         for link in links:
1409             if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
1410                 link_name = link["link-id"]
1411                 response = test_utils.delete_request(url+link_name)
1412                 self.assertEqual(response.status_code, requests.codes.ok)
1413
1414     def test_93_check_openroadm_topology(self):
1415         response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
1416         self.assertEqual(response['status_code'], requests.codes.ok)
1417         self.assertEqual(18,
1418                          len(response['network'][0]['ietf-network-topology:link']),
1419                          'Topology should contain 18 links')
1420
1421     def test_94_disconnect_spdrA(self):
1422         response = test_utils_rfc8040.unmount_device("SPDR-SA1")
1423         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1424
1425     def test_95_disconnect_spdrC(self):
1426         response = test_utils_rfc8040.unmount_device("SPDR-SC1")
1427         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1428
1429     def test_96_disconnect_roadmA(self):
1430         response = test_utils_rfc8040.unmount_device("ROADM-A1")
1431         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1432
1433     def test_97_disconnect_roadmC(self):
1434         response = test_utils_rfc8040.unmount_device("ROADM-C1")
1435         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1436
1437
1438 if __name__ == "__main__":
1439     unittest.main(verbosity=2)