Migrate OTN E2E functional tests to RFC8040 step 3
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test11_otn_end2end.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
15
16 import base64
17 import unittest
18 import time
19 import requests
20 # pylint: disable=wrong-import-order
21 import sys
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils  # nopep8
26 import test_utils_rfc8040  # nopep8
27
28
29 class TransportPCEtesting(unittest.TestCase):
30
31     processes = None
32     WAITING = 20  # nominal value is 300
33     NODE_VERSION = '2.2.1'
34
35     cr_serv_sample_data = {"input": {
36         "sdnc-request-header": {
37             "request-id": "request-1",
38             "rpc-action": "service-create",
39             "request-system-id": "appname"
40         },
41         "service-name": "service1-OCH-OTU4",
42         "common-id": "commonId",
43         "connection-type": "infrastructure",
44         "service-a-end": {
45             "service-rate": "100",
46             "node-id": "SPDR-SA1",
47             "service-format": "OTU",
48             "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
49             "clli": "NodeSA",
50             "tx-direction": [{
51                 "port": {
52                     "port-device-name": "SPDR-SA1-XPDR1",
53                     "port-type": "fixed",
54                     "port-name": "XPDR1-NETWORK1",
55                     "port-rack": "000000.00",
56                     "port-shelf": "Chassis#1"
57                 },
58                 "lgx": {
59                     "lgx-device-name": "Some lgx-device-name",
60                     "lgx-port-name": "Some lgx-port-name",
61                     "lgx-port-rack": "000000.00",
62                     "lgx-port-shelf": "00"
63                 },
64                 "index": 0
65             }],
66             "rx-direction": [{
67                 "port": {
68                     "port-device-name": "SPDR-SA1-XPDR1",
69                     "port-type": "fixed",
70                     "port-name": "XPDR1-NETWORK1",
71                     "port-rack": "000000.00",
72                     "port-shelf": "Chassis#1"
73                 },
74                 "lgx": {
75                     "lgx-device-name": "Some lgx-device-name",
76                     "lgx-port-name": "Some lgx-port-name",
77                     "lgx-port-rack": "000000.00",
78                     "lgx-port-shelf": "00"
79                 },
80                 "index": 0
81             }],
82             "optic-type": "gray"
83         },
84         "service-z-end": {
85             "service-rate": "100",
86             "node-id": "SPDR-SC1",
87             "service-format": "OTU",
88             "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
89             "clli": "NodeSC",
90             "tx-direction": [{
91                 "port": {
92                     "port-device-name": "SPDR-SC1-XPDR1",
93                     "port-type": "fixed",
94                     "port-name": "XPDR1-NETWORK1",
95                     "port-rack": "000000.00",
96                     "port-shelf": "Chassis#1"
97                 },
98                 "lgx": {
99                     "lgx-device-name": "Some lgx-device-name",
100                     "lgx-port-name": "Some lgx-port-name",
101                     "lgx-port-rack": "000000.00",
102                     "lgx-port-shelf": "00"
103                 },
104                 "index": 0
105             }],
106             "rx-direction": [{
107                 "port": {
108                     "port-device-name": "SPDR-SC1-XPDR1",
109                     "port-type": "fixed",
110                     "port-name": "XPDR1-NETWORK1",
111                     "port-rack": "000000.00",
112                     "port-shelf": "Chassis#1"
113                 },
114                 "lgx": {
115                     "lgx-device-name": "Some lgx-device-name",
116                     "lgx-port-name": "Some lgx-port-name",
117                     "lgx-port-rack": "000000.00",
118                     "lgx-port-shelf": "00"
119                 },
120                 "index": 0
121             }],
122             "optic-type": "gray"
123         },
124         "due-date": "2018-06-15T00:00:01Z",
125         "operator-contact": "pw1234"
126     }
127     }
128
129     @classmethod
130     def setUpClass(cls):
131         cls.processes = test_utils_rfc8040.start_tpce()
132         cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION),
133                                                        ('roadma', cls.NODE_VERSION),
134                                                        ('roadmc', cls.NODE_VERSION),
135                                                        ('spdrc', cls.NODE_VERSION)])
136
137     @classmethod
138     def tearDownClass(cls):
139         # pylint: disable=not-an-iterable
140         for process in cls.processes:
141             test_utils_rfc8040.shutdown_process(process)
142         print("all processes killed")
143
144     def setUp(self):
145         time.sleep(5)
146
147     def test_01_connect_spdrA(self):
148         response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
149         self.assertEqual(response.status_code,
150                          requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
151
152     def test_02_connect_spdrC(self):
153         response = test_utils_rfc8040.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
154         self.assertEqual(response.status_code,
155                          requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
156
157     def test_03_connect_rdmA(self):
158         response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
159         self.assertEqual(response.status_code,
160                          requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
161
162     def test_04_connect_rdmC(self):
163         response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
164         self.assertEqual(response.status_code,
165                          requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
166
167     def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
168         response = test_utils_rfc8040.transportpce_api_rpc_request(
169             'transportpce-networkutils', 'init-xpdr-rdm-links',
170             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
171                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
172         self.assertEqual(response['status_code'], requests.codes.ok)
173
174     def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
175         response = test_utils_rfc8040.transportpce_api_rpc_request(
176             'transportpce-networkutils', 'init-rdm-xpdr-links',
177             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
178                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
179         self.assertEqual(response['status_code'], requests.codes.ok)
180
181     def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
182         response = test_utils_rfc8040.transportpce_api_rpc_request(
183             'transportpce-networkutils', 'init-xpdr-rdm-links',
184             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
185                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
186         self.assertEqual(response['status_code'], requests.codes.ok)
187
188     def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
189         response = test_utils_rfc8040.transportpce_api_rpc_request(
190             'transportpce-networkutils', 'init-rdm-xpdr-links',
191             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
192                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
193         self.assertEqual(response['status_code'], requests.codes.ok)
194
195     def test_09_add_omsAttributes_ROADMA_ROADMC(self):
196         # Config ROADMA-ROADMC oms-attributes
197         data = {"span": {
198             "auto-spanloss": "true",
199             "spanloss-base": 11.4,
200             "spanloss-current": 12,
201             "engineered-spanloss": 12.2,
202             "link-concatenation": [{
203                 "SRLG-Id": 0,
204                 "fiber-type": "smf",
205                 "SRLG-length": 100000,
206                 "pmd": 0.5}]}}
207         response = test_utils_rfc8040.add_oms_attr_request(
208             "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
209         self.assertEqual(response.status_code, requests.codes.created)
210
211     def test_10_add_omsAttributes_ROADMC_ROADMA(self):
212         # Config ROADMC-ROADMA oms-attributes
213         data = {"span": {
214             "auto-spanloss": "true",
215             "spanloss-base": 11.4,
216             "spanloss-current": 12,
217             "engineered-spanloss": 12.2,
218             "link-concatenation": [{
219                 "SRLG-Id": 0,
220                 "fiber-type": "smf",
221                 "SRLG-length": 100000,
222                 "pmd": 0.5}]}}
223         response = test_utils_rfc8040.add_oms_attr_request(
224             "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
225         self.assertEqual(response.status_code, requests.codes.created)
226
227 # test service-create for OCH-OTU4 service from spdr to spdr
228     def test_11_check_otn_topology(self):
229         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
230         self.assertEqual(response['status_code'], requests.codes.ok)
231         self.assertEqual(len(response['network'][0]['node']), 6)
232         self.assertNotIn('ietf-network-topology:link', response['network'][0])
233
234     def test_12_create_OCH_OTU4_service(self):
235         response = test_utils.service_create_request(self.cr_serv_sample_data)
236         self.assertEqual(response.status_code, requests.codes.ok)
237         res = response.json()
238         self.assertIn('PCE calculation in progress',
239                       res['output']['configuration-response-common']['response-message'])
240         time.sleep(self.WAITING)
241
242     def test_13_get_OCH_OTU4_service1(self):
243         response = test_utils.get_service_list_request(
244             "services/service1-OCH-OTU4")
245         self.assertEqual(response.status_code, requests.codes.ok)
246         res = response.json()
247         self.assertEqual(
248             res['services'][0]['administrative-state'], 'inService')
249         self.assertEqual(
250             res['services'][0]['service-name'], 'service1-OCH-OTU4')
251         self.assertEqual(
252             res['services'][0]['connection-type'], 'infrastructure')
253         self.assertEqual(
254             res['services'][0]['lifecycle-state'], 'planned')
255         time.sleep(2)
256
257     # Check correct configuration of devices
258     def test_14_check_interface_och_spdra(self):
259         response = test_utils_rfc8040.check_node_attribute_request(
260             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-761:768')
261         self.assertEqual(response['status_code'], requests.codes.ok)
262         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
263                                    'administrative-state': 'inService',
264                                    'supporting-circuit-pack-name': 'CP1-CFP0',
265                                    'type': 'org-openroadm-interfaces:opticalChannel',
266                                    'supporting-port': 'CP1-CFP0-P1'
267                                    }, **response['interface'][0]),
268                              response['interface'][0])
269         self.assertEqual('org-openroadm-common-types:R100G',
270                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
271         self.assertEqual('dp-qpsk',
272                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
273         self.assertEqual(196.1,
274                          float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
275         self.assertEqual(
276             -5,
277             float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
278
279     def test_15_check_interface_OTU4_spdra(self):
280         response = test_utils_rfc8040.check_node_attribute_request(
281             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
282         self.assertEqual(response['status_code'], requests.codes.ok)
283         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
284                         'administrative-state': 'inService',
285                         'supporting-circuit-pack-name': 'CP1-CFP0',
286                         'supporting-interface': 'XPDR1-NETWORK1-761:768',
287                         'type': 'org-openroadm-interfaces:otnOtu',
288                         'supporting-port': 'CP1-CFP0-P1'
289                         }
290         input_dict_2 = {'tx-sapi': 'H/OelLynehI=',
291                         'expected-dapi': 'H/OelLynehI=',
292                         'tx-dapi': 'AMf1n5hK6Xkk',
293                         'expected-sapi': 'AMf1n5hK6Xkk',
294                         'rate': 'org-openroadm-otn-common-types:OTU4',
295                         'fec': 'scfec'
296                         }
297         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
298                              response['interface'][0])
299         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
300                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
301
302     def test_16_check_interface_och_spdrc(self):
303         response = test_utils_rfc8040.check_node_attribute_request(
304             'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-761:768')
305         self.assertEqual(response['status_code'], requests.codes.ok)
306         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
307                                    'administrative-state': 'inService',
308                                    'supporting-circuit-pack-name': 'CP1-CFP0',
309                                    'type': 'org-openroadm-interfaces:opticalChannel',
310                                    'supporting-port': 'CP1-CFP0-P1'
311                                    }, **response['interface'][0]),
312                              response['interface'][0])
313         self.assertEqual('org-openroadm-common-types:R100G',
314                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
315         self.assertEqual('dp-qpsk',
316                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
317         self.assertEqual(196.1,
318                          float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
319         self.assertEqual(
320             -5,
321             float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
322
323     def test_17_check_interface_OTU4_spdrc(self):
324         response = test_utils_rfc8040.check_node_attribute_request(
325             'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-OTU')
326         self.assertEqual(response['status_code'], requests.codes.ok)
327         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
328                         'administrative-state': 'inService',
329                         'supporting-circuit-pack-name': 'CP1-CFP0',
330                         'supporting-interface': 'XPDR1-NETWORK1-761:768',
331                         'type': 'org-openroadm-interfaces:otnOtu',
332                         'supporting-port': 'CP1-CFP0-P1'
333                         }
334         input_dict_2 = {'tx-dapi': 'H/OelLynehI=',
335                         'expected-sapi': 'H/OelLynehI=',
336                         'tx-sapi': 'AMf1n5hK6Xkk',
337                         'expected-dapi': 'AMf1n5hK6Xkk',
338                         'rate': 'org-openroadm-otn-common-types:OTU4',
339                         'fec': 'scfec'
340                         }
341         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
342                              response['interface'][0])
343         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
344                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
345
346     def test_18_check_no_interface_ODU4_spdra(self):
347         response = test_utils_rfc8040.check_node_attribute_request(
348             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
349         self.assertEqual(response['status_code'], requests.codes.conflict)
350
351     def test_19_check_openroadm_topo_spdra(self):
352         response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR1', 'config')
353         self.assertEqual(response['status_code'], requests.codes.ok)
354         ele = response['node']['ietf-network-topology:termination-point'][0]
355         self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
356         self.assertEqual(
357             196.1,
358             float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
359         self.assertEqual(
360             40,
361             float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
362
363     def test_20_check_openroadm_topo_ROADMA_SRG(self):
364         response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
365         self.assertEqual(response['status_code'], requests.codes.ok)
366         freq_map = base64.b64decode(
367             response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
368         freq_map_array = [int(x) for x in freq_map]
369         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
370         liste_tp = response['node']['ietf-network-topology:termination-point']
371         for ele in liste_tp:
372             if ele['tp-id'] == 'SRG1-PP1-TXRX':
373                 freq_map = base64.b64decode(
374                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
375                 freq_map_array = [int(x) for x in freq_map]
376                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
377             if ele['tp-id'] == 'SRG1-PP2-TXRX':
378                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
379
380     def test_21_check_openroadm_topo_ROADMA_DEG(self):
381         response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
382         self.assertEqual(response['status_code'], requests.codes.ok)
383         freq_map = base64.b64decode(
384             response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
385         freq_map_array = [int(x) for x in freq_map]
386         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
387         liste_tp = response['node']['ietf-network-topology:termination-point']
388         for ele in liste_tp:
389             if ele['tp-id'] == 'DEG2-CTP-TXRX':
390                 freq_map = base64.b64decode(
391                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
392                 freq_map_array = [int(x) for x in freq_map]
393                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
394             if ele['tp-id'] == 'DEG2-TTP-TXRX':
395                 freq_map = base64.b64decode(
396                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
397                 freq_map_array = [int(x) for x in freq_map]
398                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
399
400     def test_22_check_otn_topo_otu4_links(self):
401         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
402         self.assertEqual(response['status_code'], requests.codes.ok)
403         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 2)
404         listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
405                       'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
406         for link in response['network'][0]['ietf-network-topology:link']:
407             self.assertIn(link['link-id'], listLinkId)
408             self.assertEqual(
409                 link['transportpce-topology:otn-link-type'], 'OTU4')
410             self.assertEqual(
411                 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
412             self.assertEqual(
413                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
414             self.assertEqual(
415                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
416             self.assertIn(
417                 link['org-openroadm-common-network:opposite-link'], listLinkId)
418
419 # test service-create for ODU4 service from spdr to spdr
420     def test_23_create_ODU4_service(self):
421         self.cr_serv_sample_data["input"]["service-name"] = "service1-ODU4"
422         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
423         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
424         self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
425         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
426         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
427         self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
428
429         response = test_utils.service_create_request(self.cr_serv_sample_data)
430         self.assertEqual(response.status_code, requests.codes.ok)
431         res = response.json()
432         self.assertIn('PCE calculation in progress',
433                       res['output']['configuration-response-common']['response-message'])
434         time.sleep(self.WAITING)
435
436     def test_24_get_ODU4_service1(self):
437         response = test_utils.get_service_list_request(
438             "services/service1-ODU4")
439         self.assertEqual(response.status_code, requests.codes.ok)
440         res = response.json()
441         self.assertEqual(
442             res['services'][0]['administrative-state'], 'inService')
443         self.assertEqual(
444             res['services'][0]['service-name'], 'service1-ODU4')
445         self.assertEqual(
446             res['services'][0]['connection-type'], 'infrastructure')
447         self.assertEqual(
448             res['services'][0]['lifecycle-state'], 'planned')
449         time.sleep(2)
450
451     def test_25_check_interface_ODU4_spdra(self):
452         response = test_utils_rfc8040.check_node_attribute_request(
453             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
454         self.assertEqual(response['status_code'], requests.codes.ok)
455         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
456                         'administrative-state': 'inService',
457                         'supporting-circuit-pack-name': 'CP1-CFP0',
458                         'supporting-interface': 'XPDR1-NETWORK1-OTU',
459                         'type': 'org-openroadm-interfaces:otnOdu',
460                         'supporting-port': 'CP1-CFP0-P1',
461                         'circuit-id': 'TBD',
462                         'description': 'TBD'}
463         # SAPI/DAPI are added in the Otu4 renderer
464         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
465                         'rate': 'org-openroadm-otn-common-types:ODU4',
466                         'monitoring-mode': 'terminated',
467                         'expected-dapi': 'H/OelLynehI=',
468                         'expected-sapi': 'AMf1n5hK6Xkk',
469                         'tx-dapi': 'AMf1n5hK6Xkk',
470                         'tx-sapi': 'H/OelLynehI='}
471
472         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
473                              response['interface'][0])
474         self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
475                                   **input_dict_2),
476                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
477                              )
478         self.assertDictEqual(
479             {'payload-type': '21', 'exp-payload-type': '21'},
480             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
481
482     def test_26_check_interface_ODU4_spdrc(self):
483         response = test_utils_rfc8040.check_node_attribute_request(
484             'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU4')
485         self.assertEqual(response['status_code'], requests.codes.ok)
486         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
487                         'administrative-state': 'inService',
488                         'supporting-circuit-pack-name': 'CP1-CFP0',
489                         'supporting-interface': 'XPDR1-NETWORK1-OTU',
490                         'type': 'org-openroadm-interfaces:otnOdu',
491                         'supporting-port': 'CP1-CFP0-P1',
492                         'circuit-id': 'TBD',
493                         'description': 'TBD'}
494         # SAPI/DAPI are added in the Otu4 renderer
495         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
496                         'rate': 'org-openroadm-otn-common-types:ODU4',
497                         'monitoring-mode': 'terminated',
498                         'tx-sapi': 'AMf1n5hK6Xkk',
499                         'tx-dapi': 'H/OelLynehI=',
500                         'expected-sapi': 'H/OelLynehI=',
501                         'expected-dapi': 'AMf1n5hK6Xkk'
502                         }
503         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
504                              response['interface'][0])
505         self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
506                                   **input_dict_2),
507                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
508                              )
509         self.assertDictEqual(
510             {'payload-type': '21', 'exp-payload-type': '21'},
511             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
512
513     def test_27_check_otn_topo_links(self):
514         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
515         self.assertEqual(response['status_code'], requests.codes.ok)
516         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
517         for link in response['network'][0]['ietf-network-topology:link']:
518             linkId = link['link-id']
519             if (linkId in ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
520                            'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
521                 self.assertEqual(
522                     link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
523                 self.assertEqual(
524                     link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
525             elif (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
526                              'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
527                 self.assertEqual(
528                     link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
529                 self.assertEqual(
530                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
531                 self.assertEqual(
532                     link['transportpce-topology:otn-link-type'], 'ODTU4')
533                 self.assertEqual(
534                     link['org-openroadm-common-network:link-type'], 'OTN-LINK')
535                 self.assertIn(link['org-openroadm-common-network:opposite-link'],
536                               ['ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
537                                'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
538             else:
539                 self.fail("this link should not exist")
540
541     def test_28_check_otn_topo_tp(self):
542         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
543         self.assertEqual(response['status_code'], requests.codes.ok)
544         for node in response['network'][0]['node']:
545             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
546                 tpList = node['ietf-network-topology:termination-point']
547                 for tp in tpList:
548                     if tp['tp-id'] == 'XPDR1-NETWORK1':
549                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
550                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
551                         self.assertEqual(
552                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
553                         self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
554                                          'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
555
556 # test service-create for 10GE service from spdr to spdr
557     def test_29_create_10GE_service(self):
558         self.cr_serv_sample_data["input"]["service-name"] = "service1-10GE"
559         self.cr_serv_sample_data["input"]["connection-type"] = "service"
560         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "10"
561         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
562         del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
563         self.cr_serv_sample_data["input"]["service-a-end"]["ethernet-encoding"] = "10GBASE-R"
564         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
565         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
566         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "10"
567         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
568         del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
569         self.cr_serv_sample_data["input"]["service-z-end"]["ethernet-encoding"] = "10GBASE-R"
570         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
571         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
572         response = test_utils.service_create_request(self.cr_serv_sample_data)
573         self.assertEqual(response.status_code, requests.codes.ok)
574         res = response.json()
575         self.assertIn('PCE calculation in progress',
576                       res['output']['configuration-response-common']['response-message'])
577         time.sleep(self.WAITING)
578
579     def test_30_get_10GE_service1(self):
580         response = test_utils.get_service_list_request(
581             "services/service1-10GE")
582         self.assertEqual(response.status_code, requests.codes.ok)
583         res = response.json()
584         self.assertEqual(
585             res['services'][0]['administrative-state'], 'inService')
586         self.assertEqual(
587             res['services'][0]['service-name'], 'service1-10GE')
588         self.assertEqual(
589             res['services'][0]['connection-type'], 'service')
590         self.assertEqual(
591             res['services'][0]['lifecycle-state'], 'planned')
592         time.sleep(2)
593
594     def test_31_check_interface_10GE_CLIENT_spdra(self):
595         response = test_utils_rfc8040.check_node_attribute_request(
596             'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
597         self.assertEqual(response['status_code'], requests.codes.ok)
598         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
599                       'administrative-state': 'inService',
600                       'supporting-circuit-pack-name': 'CP1-SFP4',
601                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
602                       'supporting-port': 'CP1-SFP4-P1'
603                       }
604         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
605                              response['interface'][0])
606         self.assertEqual(
607             10000,
608             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
609
610     def test_32_check_interface_ODU2E_CLIENT_spdra(self):
611         response = test_utils_rfc8040.check_node_attribute_request(
612             'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e')
613         self.assertEqual(response['status_code'], requests.codes.ok)
614         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
615                         'administrative-state': 'inService',
616                         'supporting-circuit-pack-name': 'CP1-SFP4',
617                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
618                         'type': 'org-openroadm-interfaces:otnOdu',
619                         'supporting-port': 'CP1-SFP4-P1'}
620         input_dict_2 = {
621             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
622             'rate': 'org-openroadm-otn-common-types:ODU2e',
623             'monitoring-mode': 'terminated'}
624
625         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
626                              response['interface'][0])
627         self.assertDictEqual(dict(input_dict_2,
628                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
629                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
630         self.assertDictEqual(
631             {'payload-type': '03', 'exp-payload-type': '03'},
632             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
633
634     def test_33_check_interface_ODU2E_NETWORK_spdra(self):
635         response = test_utils_rfc8040.check_node_attribute_request(
636             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e')
637         self.assertEqual(response['status_code'], requests.codes.ok)
638         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
639                         'administrative-state': 'inService',
640                         'supporting-circuit-pack-name': 'CP1-CFP0',
641                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
642                         'type': 'org-openroadm-interfaces:otnOdu',
643                         'supporting-port': 'CP1-CFP0-P1'}
644         input_dict_2 = {
645             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
646             'rate': 'org-openroadm-otn-common-types:ODU2e',
647             'monitoring-mode': 'monitored'}
648         input_dict_3 = {'trib-port-number': 1}
649
650         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
651                              response['interface'][0])
652         self.assertDictEqual(dict(input_dict_2,
653                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
654                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
655         self.assertDictEqual(dict(input_dict_3,
656                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
657                                       'parent-odu-allocation']),
658                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
659         self.assertIn(1, response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
660                       ['trib-slots'])
661
662     def test_34_check_ODU2E_connection_spdra(self):
663         response = test_utils_rfc8040.check_node_attribute_request(
664             'SPDR-SA1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
665         self.assertEqual(response['status_code'], requests.codes.ok)
666         input_dict_1 = {
667             'connection-name':
668             'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
669             'direction': 'bidirectional'
670         }
671
672         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
673                              response['odu-connection'][0])
674         self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
675                              response['odu-connection'][0]['destination'])
676         self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
677                              response['odu-connection'][0]['source'])
678
679     def test_35_check_interface_10GE_CLIENT_spdrc(self):
680         response = test_utils_rfc8040.check_node_attribute_request(
681             'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
682         self.assertEqual(response['status_code'], requests.codes.ok)
683         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
684                       'administrative-state': 'inService',
685                       'supporting-circuit-pack-name': 'CP1-SFP4',
686                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
687                       'supporting-port': 'CP1-SFP4-P1'
688                       }
689         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
690                              response['interface'][0])
691         self.assertEqual(
692             10000,
693             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
694
695     def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
696         response = test_utils_rfc8040.check_node_attribute_request(
697             'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ODU2e')
698         self.assertEqual(response['status_code'], requests.codes.ok)
699         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
700                         'administrative-state': 'inService',
701                         'supporting-circuit-pack-name': 'CP1-SFP4',
702                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
703                         'type': 'org-openroadm-interfaces:otnOdu',
704                         'supporting-port': 'CP1-SFP4-P1'}
705         input_dict_2 = {
706             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
707             'rate': 'org-openroadm-otn-common-types:ODU2e',
708             'monitoring-mode': 'terminated'}
709
710         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
711                              response['interface'][0])
712         self.assertDictEqual(dict(input_dict_2,
713                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
714                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
715         self.assertDictEqual(
716             {'payload-type': '03', 'exp-payload-type': '03'},
717             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
718
719     def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
720         response = test_utils_rfc8040.check_node_attribute_request(
721             'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU2e')
722         self.assertEqual(response['status_code'], requests.codes.ok)
723         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
724                         'administrative-state': 'inService',
725                         'supporting-circuit-pack-name': 'CP1-CFP0',
726                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
727                         'type': 'org-openroadm-interfaces:otnOdu',
728                         'supporting-port': 'CP1-CFP0-P1'}
729         input_dict_2 = {
730             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
731             'rate': 'org-openroadm-otn-common-types:ODU2e',
732             'monitoring-mode': 'monitored'}
733
734         input_dict_3 = {'trib-port-number': 1}
735
736         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
737                              response['interface'][0])
738         self.assertDictEqual(dict(input_dict_2,
739                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
740                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
741         self.assertDictEqual(dict(input_dict_3,
742                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
743                                       'parent-odu-allocation']),
744                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
745             'parent-odu-allocation'])
746         self.assertIn(1,
747                       response['interface'][0][
748                           'org-openroadm-otn-odu-interfaces:odu'][
749                           'parent-odu-allocation']['trib-slots'])
750
751     def test_38_check_ODU2E_connection_spdrc(self):
752         response = test_utils_rfc8040.check_node_attribute_request(
753             'SPDR-SC1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
754         self.assertEqual(response['status_code'], requests.codes.ok)
755         input_dict_1 = {
756             'connection-name':
757             'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
758             'direction': 'bidirectional'
759         }
760
761         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
762                              response['odu-connection'][0])
763         self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
764                              response['odu-connection'][0]['destination'])
765         self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
766                              response['odu-connection'][0]['source'])
767
768     def test_39_check_otn_topo_links(self):
769         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
770         self.assertEqual(response['status_code'], requests.codes.ok)
771         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
772         for link in response['network'][0]['ietf-network-topology:link']:
773             linkId = link['link-id']
774             if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
775                            'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
776                 self.assertEqual(
777                     link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
778                 self.assertEqual(
779                     link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
780
781     def test_40_check_otn_topo_tp(self):
782         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
783         self.assertEqual(response['status_code'], requests.codes.ok)
784         for node in response['network'][0]['node']:
785             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
786                 tpList = node['ietf-network-topology:termination-point']
787                 for tp in tpList:
788                     if tp['tp-id'] == 'XPDR1-NETWORK1':
789                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
790                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
791                         tsPoolList = list(range(1, 9))
792                         self.assertNotIn(
793                             tsPoolList, xpdrTpPortConAt['ts-pool'])
794                         self.assertEqual(
795                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
796                         self.assertNotIn(
797                             1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
798
799     def test_41_delete_10GE_service(self):
800         response = test_utils.service_delete_request("service1-10GE")
801         self.assertEqual(response.status_code, requests.codes.ok)
802         res = response.json()
803         self.assertIn('Renderer service delete in progress',
804                       res['output']['configuration-response-common']['response-message'])
805         time.sleep(self.WAITING)
806
807     def test_42_check_service_list(self):
808         response = test_utils.get_service_list_request("")
809         self.assertEqual(response.status_code, requests.codes.ok)
810         res = response.json()
811         self.assertEqual(len(res['service-list']['services']), 2)
812         time.sleep(2)
813
814     def test_43_check_no_ODU2e_connection_spdra(self):
815         response = test_utils.check_netconf_node_request("SPDR-SA1", "")
816         self.assertEqual(response.status_code, requests.codes.ok)
817         res = response.json()
818         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
819         time.sleep(1)
820
821     def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
822         response = test_utils_rfc8040.check_node_attribute_request(
823             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e-service1')
824         self.assertEqual(response['status_code'], requests.codes.conflict)
825
826     def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
827         response = test_utils_rfc8040.check_node_attribute_request(
828             'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e-service1')
829         self.assertEqual(response['status_code'], requests.codes.conflict)
830
831     def test_46_check_no_interface_10GE_CLIENT_spdra(self):
832         response = test_utils_rfc8040.check_node_attribute_request(
833             'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
834         self.assertEqual(response['status_code'], requests.codes.conflict)
835
836     def test_47_check_otn_topo_links(self):
837         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
838         self.assertEqual(response['status_code'], requests.codes.ok)
839         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
840         for link in response['network'][0]['ietf-network-topology:link']:
841             linkId = link['link-id']
842             if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
843                            'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
844                 self.assertEqual(
845                     link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
846                 self.assertEqual(
847                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
848
849     def test_48_check_otn_topo_tp(self):
850         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
851         self.assertEqual(response['status_code'], requests.codes.ok)
852         for node in response['network'][0]['node']:
853             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
854                 tpList = node['ietf-network-topology:termination-point']
855                 for tp in tpList:
856                     if tp['tp-id'] == 'XPDR1-NETWORK1':
857                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
858                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
859                         self.assertEqual(
860                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
861
862     def test_49_delete_ODU4_service(self):
863         response = test_utils.service_delete_request("service1-ODU4")
864         self.assertEqual(response.status_code, requests.codes.ok)
865         res = response.json()
866         self.assertIn('Renderer service delete in progress',
867                       res['output']['configuration-response-common']['response-message'])
868         time.sleep(self.WAITING)
869
870     def test_50_check_service_list(self):
871         response = test_utils.get_service_list_request("")
872         self.assertEqual(response.status_code, requests.codes.ok)
873         res = response.json()
874         self.assertEqual(len(res['service-list']['services']), 1)
875         time.sleep(2)
876
877     def test_51_check_no_interface_ODU4_spdra(self):
878         response = test_utils_rfc8040.check_node_attribute_request(
879             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
880         self.assertEqual(response['status_code'], requests.codes.conflict)
881
882     def test_52_check_otn_topo_links(self):
883         self.test_22_check_otn_topo_otu4_links()
884
885     def test_53_check_otn_topo_tp(self):
886         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
887         self.assertEqual(response['status_code'], requests.codes.ok)
888         for node in response['network'][0]['node']:
889             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
890                 tpList = node['ietf-network-topology:termination-point']
891                 for tp in tpList:
892                     if tp['tp-id'] == 'XPDR1-NETWORK1':
893                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
894                         self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
895                         self.assertNotIn(
896                             'odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
897
898     def test_54_delete_OCH_OTU4_service(self):
899         response = test_utils.service_delete_request("service1-OCH-OTU4")
900         self.assertEqual(response.status_code, requests.codes.ok)
901         res = response.json()
902         self.assertIn('Renderer service delete in progress',
903                       res['output']['configuration-response-common']['response-message'])
904         time.sleep(self.WAITING)
905
906     def test_55_get_no_service(self):
907         response = test_utils.get_service_list_request("")
908         self.assertEqual(response.status_code, requests.codes.conflict)
909         res = response.json()
910         self.assertIn(
911             {"error-type": "application", "error-tag": "data-missing",
912              "error-message": "Request could not be completed because the relevant data model content does not exist"},
913             res['errors']['error'])
914         time.sleep(1)
915
916     def test_56_check_no_interface_OTU4_spdra(self):
917         response = test_utils_rfc8040.check_node_attribute_request(
918             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
919         self.assertEqual(response['status_code'], requests.codes.conflict)
920
921     def test_57_check_no_interface_OCH_spdra(self):
922         response = test_utils_rfc8040.check_node_attribute_request(
923             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-1')
924         self.assertEqual(response['status_code'], requests.codes.conflict)
925
926     def test_58_getLinks_OtnTopology(self):
927         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
928         self.assertEqual(response['status_code'], requests.codes.ok)
929         self.assertNotIn('ietf-network-topology:link', response['network'][0])
930
931     def test_59_check_openroadm_topo_spdra(self):
932         response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR1', 'config')
933         self.assertEqual(response['status_code'], requests.codes.ok)
934         tp = response['node']['ietf-network-topology:termination-point'][0]
935         self.assertEqual('XPDR1-NETWORK1', tp['tp-id'])
936         self.assertNotIn('wavelength', dict.keys(
937             tp['org-openroadm-network-topology:xpdr-network-attributes']))
938
939     def test_60_check_openroadm_topo_ROADMA_SRG(self):
940         response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
941         self.assertEqual(response['status_code'], requests.codes.ok)
942         freq_map = base64.b64decode(
943             response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
944         freq_map_array = [int(x) for x in freq_map]
945         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
946         liste_tp = response['node']['ietf-network-topology:termination-point']
947         for ele in liste_tp:
948             if ele['tp-id'] == 'SRG1-PP1-TXRX':
949                 freq_map = base64.b64decode(
950                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
951                 freq_map_array = [int(x) for x in freq_map]
952                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
953         time.sleep(3)
954
955     def test_61_check_openroadm_topo_ROADMA_DEG(self):
956         response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
957         self.assertEqual(response['status_code'], requests.codes.ok)
958         freq_map = base64.b64decode(
959             response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
960         freq_map_array = [int(x) for x in freq_map]
961         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
962         liste_tp = response['node']['ietf-network-topology:termination-point']
963         for ele in liste_tp:
964             if ele['tp-id'] == 'DEG2-CTP-TXRX':
965                 freq_map = base64.b64decode(
966                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
967                 freq_map_array = [int(x) for x in freq_map]
968                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
969             if ele['tp-id'] == 'DEG2-TTP-TXRX':
970                 freq_map = base64.b64decode(
971                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
972                 freq_map_array = [int(x) for x in freq_map]
973                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
974         time.sleep(3)
975
976     def test_62_connect_sprdA_3_N1_to_roadmA_PP2(self):
977         response = test_utils_rfc8040.transportpce_api_rpc_request(
978             'transportpce-networkutils', 'init-xpdr-rdm-links',
979             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '3', 'network-num': '1',
980                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
981         self.assertEqual(response['status_code'], requests.codes.ok)
982
983     def test_63_connect_roadmA_PP2_to_spdrA_3_N1(self):
984         response = test_utils_rfc8040.transportpce_api_rpc_request(
985             'transportpce-networkutils', 'init-rdm-xpdr-links',
986             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '3', 'network-num': '1',
987                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
988         self.assertEqual(response['status_code'], requests.codes.ok)
989
990     def test_64_connect_sprdC_3_N1_to_roadmC_PP2(self):
991         response = test_utils_rfc8040.transportpce_api_rpc_request(
992             'transportpce-networkutils', 'init-xpdr-rdm-links',
993             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '3', 'network-num': '1',
994                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
995         self.assertEqual(response['status_code'], requests.codes.ok)
996
997     def test_65_connect_roadmC_PP2_to_spdrC_3_N1(self):
998         response = test_utils_rfc8040.transportpce_api_rpc_request(
999             'transportpce-networkutils', 'init-rdm-xpdr-links',
1000             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '3', 'network-num': '1',
1001                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
1002         self.assertEqual(response['status_code'], requests.codes.ok)
1003
1004     def test_66_create_OCH_OTU4_service_2(self):
1005         # pylint: disable=line-too-long
1006         self.cr_serv_sample_data["input"]["service-name"] = "service2-OCH-OTU4"
1007         self.cr_serv_sample_data["input"]["connection-type"] = "infrastructure"
1008         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
1009         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OTU"
1010         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1011         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1012         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1013         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1014         self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1015         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
1016         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OTU"
1017         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1018         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1019         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1020         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1021         self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1022         response = test_utils.service_create_request(self.cr_serv_sample_data)
1023         self.assertEqual(response.status_code, requests.codes.ok)
1024         res = response.json()
1025         self.assertIn('PCE calculation in progress',
1026                       res['output']['configuration-response-common']['response-message'])
1027         time.sleep(self.WAITING)
1028
1029     def test_67_get_OCH_OTU4_service2(self):
1030         response = test_utils.get_service_list_request(
1031             "services/service2-OCH-OTU4")
1032         self.assertEqual(response.status_code, requests.codes.ok)
1033         res = response.json()
1034         self.assertEqual(
1035             res['services'][0]['administrative-state'], 'inService')
1036         self.assertEqual(
1037             res['services'][0]['service-name'], 'service2-OCH-OTU4')
1038         self.assertEqual(
1039             res['services'][0]['connection-type'], 'infrastructure')
1040         self.assertEqual(
1041             res['services'][0]['lifecycle-state'], 'planned')
1042         time.sleep(2)
1043
1044     def test_68_create_ODU4_service_2(self):
1045         # pylint: disable=line-too-long
1046         self.cr_serv_sample_data["input"]["service-name"] = "service2-ODU4"
1047         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
1048         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1049         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1050         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1051         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1052         self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1053         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
1054         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
1055         self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1056         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1057         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1058         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1059         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1060         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
1061
1062         response = test_utils.service_create_request(self.cr_serv_sample_data)
1063         self.assertEqual(response.status_code, requests.codes.ok)
1064         res = response.json()
1065         self.assertIn('PCE calculation in progress',
1066                       res['output']['configuration-response-common']['response-message'])
1067         time.sleep(self.WAITING)
1068
1069     def test_69_get_ODU4_service2(self):
1070         response = test_utils.get_service_list_request(
1071             "services/service2-ODU4")
1072         self.assertEqual(response.status_code, requests.codes.ok)
1073         res = response.json()
1074         self.assertEqual(
1075             res['services'][0]['administrative-state'], 'inService')
1076         self.assertEqual(
1077             res['services'][0]['service-name'], 'service2-ODU4')
1078         self.assertEqual(
1079             res['services'][0]['connection-type'], 'infrastructure')
1080         self.assertEqual(
1081             res['services'][0]['lifecycle-state'], 'planned')
1082         time.sleep(2)
1083
1084     def test_70_create_1GE_service(self):
1085         # pylint: disable=line-too-long
1086         self.cr_serv_sample_data["input"]["service-name"] = "service1-1GE"
1087         self.cr_serv_sample_data["input"]["connection-type"] = "service"
1088         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "1"
1089         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
1090         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "1"
1091         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
1092         del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
1093         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1094         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1095         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1096         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1097         del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
1098         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1099         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1100         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1101         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1102         response = test_utils.service_create_request(self.cr_serv_sample_data)
1103         self.assertEqual(response.status_code, requests.codes.ok)
1104         res = response.json()
1105         self.assertIn('PCE calculation in progress',
1106                       res['output']['configuration-response-common']['response-message'])
1107         time.sleep(self.WAITING)
1108
1109     def test_71_get_1GE_service1(self):
1110         response = test_utils.get_service_list_request("services/service1-1GE")
1111         self.assertEqual(response.status_code, requests.codes.ok)
1112         res = response.json()
1113         self.assertEqual(
1114             res['services'][0]['administrative-state'], 'inService')
1115         self.assertEqual(
1116             res['services'][0]['service-name'], 'service1-1GE')
1117         self.assertEqual(
1118             res['services'][0]['connection-type'], 'service')
1119         self.assertEqual(
1120             res['services'][0]['lifecycle-state'], 'planned')
1121         time.sleep(2)
1122
1123     def test_72_check_interface_1GE_CLIENT_spdra(self):
1124         response = test_utils_rfc8040.check_node_attribute_request(
1125             'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1126         self.assertEqual(response['status_code'], requests.codes.ok)
1127         input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1128                       'administrative-state': 'inService',
1129                       'supporting-circuit-pack-name': 'CP1-SFP4',
1130                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
1131                       'supporting-port': 'CP1-SFP4-P1'
1132                       }
1133         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
1134                              response['interface'][0])
1135         self.assertEqual(
1136             1000,
1137             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
1138
1139     def test_73_check_interface_ODU0_CLIENT_spdra(self):
1140         response = test_utils_rfc8040.check_node_attribute_request(
1141             'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0')
1142         self.assertEqual(response['status_code'], requests.codes.ok)
1143         input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
1144                         'administrative-state': 'inService',
1145                         'supporting-circuit-pack-name': 'CP3-SFP1',
1146                         'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1147                         'type': 'org-openroadm-interfaces:otnOdu',
1148                         'supporting-port': 'CP3-SFP1-P1'}
1149         input_dict_2 = {
1150             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1151             'rate': 'org-openroadm-otn-common-types:ODU0',
1152             'monitoring-mode': 'terminated'}
1153         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1154                              response['interface'][0])
1155         self.assertDictEqual(dict(input_dict_2,
1156                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1157                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1158         self.assertDictEqual(
1159             {'payload-type': '07', 'exp-payload-type': '07'},
1160             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1161
1162     def test_74_check_interface_ODU0_NETWORK_spdra(self):
1163         response = test_utils_rfc8040.check_node_attribute_request(
1164             'SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0')
1165         self.assertEqual(response['status_code'], requests.codes.ok)
1166         input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
1167                         'administrative-state': 'inService',
1168                         'supporting-circuit-pack-name': 'CP3-CFP0',
1169                         'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1170                         'type': 'org-openroadm-interfaces:otnOdu',
1171                         'supporting-port': 'CP3-CFP0-P1'}
1172         input_dict_2 = {
1173             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1174             'rate': 'org-openroadm-otn-common-types:ODU0',
1175             'monitoring-mode': 'monitored'}
1176         input_dict_3 = {'trib-port-number': 1}
1177         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1178                              response['interface'][0])
1179         self.assertDictEqual(dict(input_dict_2,
1180                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1181                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1182         self.assertDictEqual(dict(input_dict_3,
1183                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1184                                       'parent-odu-allocation']),
1185                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
1186         self.assertIn(1, response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1187                       ['trib-slots'])
1188
1189     def test_75_check_ODU0_connection_spdra(self):
1190         response = test_utils_rfc8040.check_node_attribute_request(
1191             'SPDR-SA1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
1192         self.assertEqual(response['status_code'], requests.codes.ok)
1193         input_dict_1 = {
1194             'connection-name':
1195             'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
1196             'direction': 'bidirectional'
1197         }
1198         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1199                              response['odu-connection'][0])
1200         self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0'},
1201                              response['odu-connection'][0]['destination'])
1202         self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0'},
1203                              response['odu-connection'][0]['source'])
1204
1205     def test_76_check_interface_1GE_CLIENT_spdrc(self):
1206         response = test_utils_rfc8040.check_node_attribute_request(
1207             'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1208         self.assertEqual(response['status_code'], requests.codes.ok)
1209         input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1210                       'administrative-state': 'inService',
1211                       'supporting-circuit-pack-name': 'CP3-SFP1',
1212                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
1213                       'supporting-port': 'CP3-SFP1-P1'
1214                       }
1215         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
1216                              response['interface'][0])
1217         self.assertEqual(
1218             1000,
1219             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
1220
1221     def test_77_check_interface_ODU0_CLIENT_spdrc(self):
1222         response = test_utils_rfc8040.check_node_attribute_request(
1223             'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ODU0')
1224         self.assertEqual(response['status_code'], requests.codes.ok)
1225         input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
1226                         'administrative-state': 'inService',
1227                         'supporting-circuit-pack-name': 'CP3-SFP1',
1228                         'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1229                         'type': 'org-openroadm-interfaces:otnOdu',
1230                         'supporting-port': 'CP3-SFP1-P1'}
1231         input_dict_2 = {
1232             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1233             'rate': 'org-openroadm-otn-common-types:ODU0',
1234             'monitoring-mode': 'terminated'}
1235         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1236                              response['interface'][0])
1237         self.assertDictEqual(dict(input_dict_2,
1238                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1239                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1240         self.assertDictEqual(
1241             {'payload-type': '07', 'exp-payload-type': '07'},
1242             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1243
1244     def test_78_check_interface_ODU0_NETWORK_spdrc(self):
1245         response = test_utils_rfc8040.check_node_attribute_request(
1246             'SPDR-SC1', 'interface', 'XPDR3-NETWORK1-ODU0')
1247         self.assertEqual(response['status_code'], requests.codes.ok)
1248         input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
1249                         'administrative-state': 'inService',
1250                         'supporting-circuit-pack-name': 'CP3-CFP0',
1251                         'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1252                         'type': 'org-openroadm-interfaces:otnOdu',
1253                         'supporting-port': 'CP3-CFP0-P1'}
1254         input_dict_2 = {
1255             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1256             'rate': 'org-openroadm-otn-common-types:ODU0',
1257             'monitoring-mode': 'monitored'}
1258         input_dict_3 = {'trib-port-number': 1}
1259         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1260                              response['interface'][0])
1261         self.assertDictEqual(dict(input_dict_2,
1262                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1263                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1264         self.assertDictEqual(dict(input_dict_3,
1265                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1266                                       'parent-odu-allocation']),
1267                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1268             'parent-odu-allocation'])
1269         self.assertIn(1,
1270                       response['interface'][0][
1271                           'org-openroadm-otn-odu-interfaces:odu'][
1272                           'parent-odu-allocation']['trib-slots'])
1273
1274     def test_79_check_ODU0_connection_spdrc(self):
1275         response = test_utils_rfc8040.check_node_attribute_request(
1276             'SPDR-SC1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
1277         self.assertEqual(response['status_code'], requests.codes.ok)
1278         input_dict_1 = {
1279             'connection-name':
1280             'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
1281             'direction': 'bidirectional'
1282         }
1283         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1284                              response['odu-connection'][0])
1285         self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0'},
1286                              response['odu-connection'][0]['destination'])
1287         self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0'},
1288                              response['odu-connection'][0]['source'])
1289
1290     def test_80_check_otn_topo_links(self):
1291         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
1292         self.assertEqual(response['status_code'], requests.codes.ok)
1293         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
1294         for link in response['network'][0]['ietf-network-topology:link']:
1295             linkId = link['link-id']
1296             if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1297                            'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1298                 self.assertEqual(
1299                     link['org-openroadm-otn-network-topology:available-bandwidth'], 99000)
1300                 self.assertEqual(
1301                     link['org-openroadm-otn-network-topology:used-bandwidth'], 1000)
1302
1303     def test_81_check_otn_topo_tp(self):
1304         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
1305         self.assertEqual(response['status_code'], requests.codes.ok)
1306         for node in response['network'][0]['node']:
1307             if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1308                 tpList = node['ietf-network-topology:termination-point']
1309                 for tp in tpList:
1310                     if tp['tp-id'] == 'XPDR3-NETWORK1':
1311                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1312                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 79)
1313                         tsPoolList = list(range(1, 2))
1314                         self.assertNotIn(
1315                             tsPoolList, xpdrTpPortConAt['ts-pool'])
1316                         self.assertEqual(
1317                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
1318                         self.assertNotIn(
1319                             1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
1320
1321     def test_82_delete_1GE_service(self):
1322         response = test_utils.service_delete_request("service1-1GE")
1323         self.assertEqual(response.status_code, requests.codes.ok)
1324         res = response.json()
1325         self.assertIn('Renderer service delete in progress',
1326                       res['output']['configuration-response-common']['response-message'])
1327         time.sleep(self.WAITING)
1328
1329     def test_83_check_service_list(self):
1330         response = test_utils.get_service_list_request("")
1331         self.assertEqual(response.status_code, requests.codes.ok)
1332         res = response.json()
1333         self.assertEqual(len(res['service-list']['services']), 2)
1334         time.sleep(2)
1335
1336     def test_84_check_no_ODU0_connection_spdra(self):
1337         response = test_utils.check_netconf_node_request("SPDR-SA1", "")
1338         self.assertEqual(response.status_code, requests.codes.ok)
1339         res = response.json()
1340         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
1341         time.sleep(1)
1342
1343     def test_85_check_no_interface_ODU0_NETWORK_spdra(self):
1344         response = test_utils_rfc8040.check_node_attribute_request(
1345             'SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0-service1')
1346         self.assertEqual(response['status_code'], requests.codes.conflict)
1347
1348     def test_86_check_no_interface_ODU0_CLIENT_spdra(self):
1349         response = test_utils_rfc8040.check_node_attribute_request(
1350             'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0-service1')
1351         self.assertEqual(response['status_code'], requests.codes.conflict)
1352
1353     def test_87_check_no_interface_10GE_CLIENT_spdra(self):
1354         response = test_utils_rfc8040.check_node_attribute_request(
1355             'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1356         self.assertEqual(response['status_code'], requests.codes.conflict)
1357
1358     def test_88_check_otn_topo_links(self):
1359         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
1360         self.assertEqual(response['status_code'], requests.codes.ok)
1361         self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
1362         for link in response['network'][0]['ietf-network-topology:link']:
1363             linkId = link['link-id']
1364             if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1365                            'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1366                 self.assertEqual(
1367                     link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
1368                 self.assertEqual(
1369                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
1370
1371     def test_89_check_otn_topo_tp(self):
1372         response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
1373         self.assertEqual(response['status_code'], requests.codes.ok)
1374         for node in response['network'][0]['node']:
1375             if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1376                 tpList = node['ietf-network-topology:termination-point']
1377                 for tp in tpList:
1378                     if tp['tp-id'] == 'XPDR3-NETWORK1':
1379                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1380                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
1381                         self.assertEqual(
1382                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
1383
1384     def test_90_delete_ODU4_service(self):
1385         response = test_utils.service_delete_request("service2-ODU4")
1386         self.assertEqual(response.status_code, requests.codes.ok)
1387         res = response.json()
1388         self.assertIn('Renderer service delete in progress',
1389                       res['output']['configuration-response-common']['response-message'])
1390         time.sleep(self.WAITING)
1391
1392     def test_91_delete_OCH_OTU4_service(self):
1393         response = test_utils.service_delete_request("service2-OCH-OTU4")
1394         self.assertEqual(response.status_code, requests.codes.ok)
1395         res = response.json()
1396         self.assertIn('Renderer service delete in progress',
1397                       res['output']['configuration-response-common']['response-message'])
1398         time.sleep(self.WAITING)
1399
1400     def test_92_disconnect_xponders_from_roadm(self):
1401         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1402         response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
1403         self.assertEqual(response['status_code'], requests.codes.ok)
1404         links = response['network'][0]['ietf-network-topology:link']
1405         for link in links:
1406             if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
1407                 link_name = link["link-id"]
1408                 response = test_utils.delete_request(url+link_name)
1409                 self.assertEqual(response.status_code, requests.codes.ok)
1410
1411     def test_93_check_openroadm_topology(self):
1412         response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
1413         self.assertEqual(response['status_code'], requests.codes.ok)
1414         self.assertEqual(18,
1415                          len(response['network'][0]['ietf-network-topology:link']),
1416                          'Topology should contain 18 links')
1417
1418     def test_94_disconnect_spdrA(self):
1419         response = test_utils_rfc8040.unmount_device("SPDR-SA1")
1420         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1421
1422     def test_95_disconnect_spdrC(self):
1423         response = test_utils_rfc8040.unmount_device("SPDR-SC1")
1424         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1425
1426     def test_96_disconnect_roadmA(self):
1427         response = test_utils_rfc8040.unmount_device("ROADM-A1")
1428         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1429
1430     def test_97_disconnect_roadmC(self):
1431         response = test_utils_rfc8040.unmount_device("ROADM-C1")
1432         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1433
1434
1435 if __name__ == "__main__":
1436     unittest.main(verbosity=2)