Migrate OTN E2E functional tests to RFC8040 step 2
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test11_otn_end2end.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
15
16 import base64
17 import unittest
18 import time
19 import requests
20 # pylint: disable=wrong-import-order
21 import sys
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils  # nopep8
26 import test_utils_rfc8040  # nopep8
27
28
29 class TransportPCEtesting(unittest.TestCase):
30
    # Value returned by the start helpers in setUpClass; iterated in
    # tearDownClass to shut every entry down.
    processes = None
    # Seconds slept after each service-create request in this class.
    WAITING = 20  # nominal value is 300
    # Device model version handed to start_sims() and mount_device().
    NODE_VERSION = '2.2.1'

    # Service-create request body shared by the whole class. It starts out
    # describing the "service1-OCH-OTU4" infrastructure service between
    # SPDR-SA1 and SPDR-SC1, and is edited in place by later tests
    # (test_23 and test_29) before being sent again, so the tests depend on
    # their execution order.
    cr_serv_sample_data = {"input": {
        "sdnc-request-header": {
            "request-id": "request-1",
            "rpc-action": "service-create",
            "request-system-id": "appname"
        },
        "service-name": "service1-OCH-OTU4",
        "common-id": "commonId",
        "connection-type": "infrastructure",
        # A end: SPDR-SA1, network port XPDR1-NETWORK1.
        "service-a-end": {
            "service-rate": "100",
            "node-id": "SPDR-SA1",
            "service-format": "OTU",
            "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
            "clli": "NodeSA",
            "tx-direction": [{
                "port": {
                    "port-device-name": "SPDR-SA1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                },
                "index": 0
            }],
            "rx-direction": [{
                "port": {
                    "port-device-name": "SPDR-SA1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                },
                "index": 0
            }],
            "optic-type": "gray"
        },
        # Z end: SPDR-SC1, network port XPDR1-NETWORK1.
        "service-z-end": {
            "service-rate": "100",
            "node-id": "SPDR-SC1",
            "service-format": "OTU",
            "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
            "clli": "NodeSC",
            "tx-direction": [{
                "port": {
                    "port-device-name": "SPDR-SC1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                },
                "index": 0
            }],
            "rx-direction": [{
                "port": {
                    "port-device-name": "SPDR-SC1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                },
                "index": 0
            }],
            "optic-type": "gray"
        },
        "due-date": "2018-06-15T00:00:01Z",
        "operator-contact": "pw1234"
    }
    }
128
129     @classmethod
130     def setUpClass(cls):
131         cls.processes = test_utils_rfc8040.start_tpce()
132         cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION),
133                                                        ('roadma', cls.NODE_VERSION),
134                                                        ('roadmc', cls.NODE_VERSION),
135                                                        ('spdrc', cls.NODE_VERSION)])
136
137     @classmethod
138     def tearDownClass(cls):
139         # pylint: disable=not-an-iterable
140         for process in cls.processes:
141             test_utils_rfc8040.shutdown_process(process)
142         print("all processes killed")
143
144     def setUp(self):
145         time.sleep(5)
146
147     def test_01_connect_spdrA(self):
148         response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
149         self.assertEqual(response.status_code,
150                          requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
151
152     def test_02_connect_spdrC(self):
153         response = test_utils_rfc8040.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
154         self.assertEqual(response.status_code,
155                          requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
156
157     def test_03_connect_rdmA(self):
158         response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
159         self.assertEqual(response.status_code,
160                          requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
161
162     def test_04_connect_rdmC(self):
163         response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
164         self.assertEqual(response.status_code,
165                          requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
166
167     def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
168         response = test_utils_rfc8040.transportpce_api_rpc_request(
169             'transportpce-networkutils', 'init-xpdr-rdm-links',
170             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
171                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
172         self.assertEqual(response['status_code'], requests.codes.ok)
173
174     def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
175         response = test_utils_rfc8040.transportpce_api_rpc_request(
176             'transportpce-networkutils', 'init-rdm-xpdr-links',
177             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
178                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
179         self.assertEqual(response['status_code'], requests.codes.ok)
180
181     def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
182         response = test_utils_rfc8040.transportpce_api_rpc_request(
183             'transportpce-networkutils', 'init-xpdr-rdm-links',
184             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
185                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
186         self.assertEqual(response['status_code'], requests.codes.ok)
187
188     def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
189         response = test_utils_rfc8040.transportpce_api_rpc_request(
190             'transportpce-networkutils', 'init-rdm-xpdr-links',
191             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
192                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
193         self.assertEqual(response['status_code'], requests.codes.ok)
194
195     def test_09_add_omsAttributes_ROADMA_ROADMC(self):
196         # Config ROADMA-ROADMC oms-attributes
197         data = {"span": {
198             "auto-spanloss": "true",
199             "spanloss-base": 11.4,
200             "spanloss-current": 12,
201             "engineered-spanloss": 12.2,
202             "link-concatenation": [{
203                 "SRLG-Id": 0,
204                 "fiber-type": "smf",
205                 "SRLG-length": 100000,
206                 "pmd": 0.5}]}}
207         response = test_utils_rfc8040.add_oms_attr_request(
208             "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
209         self.assertEqual(response.status_code, requests.codes.created)
210
211     def test_10_add_omsAttributes_ROADMC_ROADMA(self):
212         # Config ROADMC-ROADMA oms-attributes
213         data = {"span": {
214             "auto-spanloss": "true",
215             "spanloss-base": 11.4,
216             "spanloss-current": 12,
217             "engineered-spanloss": 12.2,
218             "link-concatenation": [{
219                 "SRLG-Id": 0,
220                 "fiber-type": "smf",
221                 "SRLG-length": 100000,
222                 "pmd": 0.5}]}}
223         response = test_utils_rfc8040.add_oms_attr_request(
224             "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
225         self.assertEqual(response.status_code, requests.codes.created)
226
227 # test service-create for OCH-OTU4 service from spdr to spdr
228     def test_11_check_otn_topology(self):
229         response = test_utils.get_otn_topo_request()
230         self.assertEqual(response.status_code, requests.codes.ok)
231         res = response.json()
232         nbNode = len(res['network'][0]['node'])
233         self.assertEqual(nbNode, 6, 'There should be 6 nodes')
234         self.assertNotIn('ietf-network-topology:link', res['network'][0])
235
236     def test_12_create_OCH_OTU4_service(self):
237         response = test_utils.service_create_request(self.cr_serv_sample_data)
238         self.assertEqual(response.status_code, requests.codes.ok)
239         res = response.json()
240         self.assertIn('PCE calculation in progress',
241                       res['output']['configuration-response-common']['response-message'])
242         time.sleep(self.WAITING)
243
244     def test_13_get_OCH_OTU4_service1(self):
245         response = test_utils.get_service_list_request(
246             "services/service1-OCH-OTU4")
247         self.assertEqual(response.status_code, requests.codes.ok)
248         res = response.json()
249         self.assertEqual(
250             res['services'][0]['administrative-state'], 'inService')
251         self.assertEqual(
252             res['services'][0]['service-name'], 'service1-OCH-OTU4')
253         self.assertEqual(
254             res['services'][0]['connection-type'], 'infrastructure')
255         self.assertEqual(
256             res['services'][0]['lifecycle-state'], 'planned')
257         time.sleep(2)
258
259     # Check correct configuration of devices
260     def test_14_check_interface_och_spdra(self):
261         response = test_utils_rfc8040.check_node_attribute_request(
262             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-761:768')
263         self.assertEqual(response['status_code'], requests.codes.ok)
264         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
265                                    'administrative-state': 'inService',
266                                    'supporting-circuit-pack-name': 'CP1-CFP0',
267                                    'type': 'org-openroadm-interfaces:opticalChannel',
268                                    'supporting-port': 'CP1-CFP0-P1'
269                                    }, **response['interface'][0]),
270                              response['interface'][0])
271         self.assertEqual('org-openroadm-common-types:R100G',
272                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
273         self.assertEqual('dp-qpsk',
274                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
275         self.assertEqual(196.1,
276                          float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
277         self.assertEqual(
278             -5,
279             float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
280
281     def test_15_check_interface_OTU4_spdra(self):
282         response = test_utils_rfc8040.check_node_attribute_request(
283             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
284         self.assertEqual(response['status_code'], requests.codes.ok)
285         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
286                         'administrative-state': 'inService',
287                         'supporting-circuit-pack-name': 'CP1-CFP0',
288                         'supporting-interface': 'XPDR1-NETWORK1-761:768',
289                         'type': 'org-openroadm-interfaces:otnOtu',
290                         'supporting-port': 'CP1-CFP0-P1'
291                         }
292         input_dict_2 = {'tx-sapi': 'H/OelLynehI=',
293                         'expected-dapi': 'H/OelLynehI=',
294                         'tx-dapi': 'AMf1n5hK6Xkk',
295                         'expected-sapi': 'AMf1n5hK6Xkk',
296                         'rate': 'org-openroadm-otn-common-types:OTU4',
297                         'fec': 'scfec'
298                         }
299         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
300                              response['interface'][0])
301         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
302                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
303
304     def test_16_check_interface_och_spdrc(self):
305         response = test_utils_rfc8040.check_node_attribute_request(
306             'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-761:768')
307         self.assertEqual(response['status_code'], requests.codes.ok)
308         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
309                                    'administrative-state': 'inService',
310                                    'supporting-circuit-pack-name': 'CP1-CFP0',
311                                    'type': 'org-openroadm-interfaces:opticalChannel',
312                                    'supporting-port': 'CP1-CFP0-P1'
313                                    }, **response['interface'][0]),
314                              response['interface'][0])
315         self.assertEqual('org-openroadm-common-types:R100G',
316                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
317         self.assertEqual('dp-qpsk',
318                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
319         self.assertEqual(196.1,
320                          float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
321         self.assertEqual(
322             -5,
323             float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
324
325     def test_17_check_interface_OTU4_spdrc(self):
326         response = test_utils_rfc8040.check_node_attribute_request(
327             'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-OTU')
328         self.assertEqual(response['status_code'], requests.codes.ok)
329         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
330                         'administrative-state': 'inService',
331                         'supporting-circuit-pack-name': 'CP1-CFP0',
332                         'supporting-interface': 'XPDR1-NETWORK1-761:768',
333                         'type': 'org-openroadm-interfaces:otnOtu',
334                         'supporting-port': 'CP1-CFP0-P1'
335                         }
336         input_dict_2 = {'tx-dapi': 'H/OelLynehI=',
337                         'expected-sapi': 'H/OelLynehI=',
338                         'tx-sapi': 'AMf1n5hK6Xkk',
339                         'expected-dapi': 'AMf1n5hK6Xkk',
340                         'rate': 'org-openroadm-otn-common-types:OTU4',
341                         'fec': 'scfec'
342                         }
343         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
344                              response['interface'][0])
345         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
346                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
347
348     def test_18_check_no_interface_ODU4_spdra(self):
349         response = test_utils_rfc8040.check_node_attribute_request(
350             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
351         self.assertEqual(response['status_code'], requests.codes.conflict)
352
353     def test_19_check_openroadm_topo_spdra(self):
354         response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR1', 'config')
355         self.assertEqual(response['status_code'], requests.codes.ok)
356         ele = response['node']['ietf-network-topology:termination-point'][0]
357         self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
358         self.assertEqual(
359             196.1,
360             float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
361         self.assertEqual(
362             40,
363             float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
364
365     def test_20_check_openroadm_topo_ROADMA_SRG(self):
366         response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
367         self.assertEqual(response['status_code'], requests.codes.ok)
368         freq_map = base64.b64decode(
369             response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
370         freq_map_array = [int(x) for x in freq_map]
371         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
372         liste_tp = response['node']['ietf-network-topology:termination-point']
373         for ele in liste_tp:
374             if ele['tp-id'] == 'SRG1-PP1-TXRX':
375                 freq_map = base64.b64decode(
376                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
377                 freq_map_array = [int(x) for x in freq_map]
378                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
379             if ele['tp-id'] == 'SRG1-PP2-TXRX':
380                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
381
382     def test_21_check_openroadm_topo_ROADMA_DEG(self):
383         response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
384         self.assertEqual(response['status_code'], requests.codes.ok)
385         freq_map = base64.b64decode(
386             response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
387         freq_map_array = [int(x) for x in freq_map]
388         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
389         liste_tp = response['node']['ietf-network-topology:termination-point']
390         for ele in liste_tp:
391             if ele['tp-id'] == 'DEG2-CTP-TXRX':
392                 freq_map = base64.b64decode(
393                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
394                 freq_map_array = [int(x) for x in freq_map]
395                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
396             if ele['tp-id'] == 'DEG2-TTP-TXRX':
397                 freq_map = base64.b64decode(
398                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
399                 freq_map_array = [int(x) for x in freq_map]
400                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
401
402     def test_22_check_otn_topo_otu4_links(self):
403         response = test_utils.get_otn_topo_request()
404         self.assertEqual(response.status_code, requests.codes.ok)
405         res = response.json()
406         nb_links = len(res['network'][0]['ietf-network-topology:link'])
407         self.assertEqual(nb_links, 2)
408         listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
409                       'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
410         for link in res['network'][0]['ietf-network-topology:link']:
411             self.assertIn(link['link-id'], listLinkId)
412             self.assertEqual(
413                 link['transportpce-topology:otn-link-type'], 'OTU4')
414             self.assertEqual(
415                 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
416             self.assertEqual(
417                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
418             self.assertEqual(
419                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
420             self.assertIn(
421                 link['org-openroadm-common-network:opposite-link'], listLinkId)
422
423 # test service-create for ODU4 service from spdr to spdr
424     def test_23_create_ODU4_service(self):
425         self.cr_serv_sample_data["input"]["service-name"] = "service1-ODU4"
426         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
427         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
428         self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
429         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
430         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
431         self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
432
433         response = test_utils.service_create_request(self.cr_serv_sample_data)
434         self.assertEqual(response.status_code, requests.codes.ok)
435         res = response.json()
436         self.assertIn('PCE calculation in progress',
437                       res['output']['configuration-response-common']['response-message'])
438         time.sleep(self.WAITING)
439
440     def test_24_get_ODU4_service1(self):
441         response = test_utils.get_service_list_request(
442             "services/service1-ODU4")
443         self.assertEqual(response.status_code, requests.codes.ok)
444         res = response.json()
445         self.assertEqual(
446             res['services'][0]['administrative-state'], 'inService')
447         self.assertEqual(
448             res['services'][0]['service-name'], 'service1-ODU4')
449         self.assertEqual(
450             res['services'][0]['connection-type'], 'infrastructure')
451         self.assertEqual(
452             res['services'][0]['lifecycle-state'], 'planned')
453         time.sleep(2)
454
455     def test_25_check_interface_ODU4_spdra(self):
456         response = test_utils_rfc8040.check_node_attribute_request(
457             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
458         self.assertEqual(response['status_code'], requests.codes.ok)
459         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
460                         'administrative-state': 'inService',
461                         'supporting-circuit-pack-name': 'CP1-CFP0',
462                         'supporting-interface': 'XPDR1-NETWORK1-OTU',
463                         'type': 'org-openroadm-interfaces:otnOdu',
464                         'supporting-port': 'CP1-CFP0-P1',
465                         'circuit-id': 'TBD',
466                         'description': 'TBD'}
467         # SAPI/DAPI are added in the Otu4 renderer
468         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
469                         'rate': 'org-openroadm-otn-common-types:ODU4',
470                         'monitoring-mode': 'terminated',
471                         'expected-dapi': 'H/OelLynehI=',
472                         'expected-sapi': 'AMf1n5hK6Xkk',
473                         'tx-dapi': 'AMf1n5hK6Xkk',
474                         'tx-sapi': 'H/OelLynehI='}
475
476         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
477                              response['interface'][0])
478         self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
479                                   **input_dict_2),
480                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
481                              )
482         self.assertDictEqual(
483             {'payload-type': '21', 'exp-payload-type': '21'},
484             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
485
486     def test_26_check_interface_ODU4_spdrc(self):
487         response = test_utils_rfc8040.check_node_attribute_request(
488             'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU4')
489         self.assertEqual(response['status_code'], requests.codes.ok)
490         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
491                         'administrative-state': 'inService',
492                         'supporting-circuit-pack-name': 'CP1-CFP0',
493                         'supporting-interface': 'XPDR1-NETWORK1-OTU',
494                         'type': 'org-openroadm-interfaces:otnOdu',
495                         'supporting-port': 'CP1-CFP0-P1',
496                         'circuit-id': 'TBD',
497                         'description': 'TBD'}
498         # SAPI/DAPI are added in the Otu4 renderer
499         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
500                         'rate': 'org-openroadm-otn-common-types:ODU4',
501                         'monitoring-mode': 'terminated',
502                         'tx-sapi': 'AMf1n5hK6Xkk',
503                         'tx-dapi': 'H/OelLynehI=',
504                         'expected-sapi': 'H/OelLynehI=',
505                         'expected-dapi': 'AMf1n5hK6Xkk'
506                         }
507         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
508                              response['interface'][0])
509         self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
510                                   **input_dict_2),
511                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
512                              )
513         self.assertDictEqual(
514             {'payload-type': '21', 'exp-payload-type': '21'},
515             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
516
517     def test_27_check_otn_topo_links(self):
518         response = test_utils.get_otn_topo_request()
519         self.assertEqual(response.status_code, requests.codes.ok)
520         res = response.json()
521         nb_links = len(res['network'][0]['ietf-network-topology:link'])
522         self.assertEqual(nb_links, 4)
523         for link in res['network'][0]['ietf-network-topology:link']:
524             linkId = link['link-id']
525             if (linkId in ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
526                            'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
527                 self.assertEqual(
528                     link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
529                 self.assertEqual(
530                     link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
531             elif (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
532                              'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
533                 self.assertEqual(
534                     link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
535                 self.assertEqual(
536                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
537                 self.assertEqual(
538                     link['transportpce-topology:otn-link-type'], 'ODTU4')
539                 self.assertEqual(
540                     link['org-openroadm-common-network:link-type'], 'OTN-LINK')
541                 self.assertIn(link['org-openroadm-common-network:opposite-link'],
542                               ['ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
543                                'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
544             else:
545                 self.fail("this link should not exist")
546
547     def test_28_check_otn_topo_tp(self):
548         response = test_utils.get_otn_topo_request()
549         res = response.json()
550         for node in res['network'][0]['node']:
551             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
552                 tpList = node['ietf-network-topology:termination-point']
553                 for tp in tpList:
554                     if tp['tp-id'] == 'XPDR1-NETWORK1':
555                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
556                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
557                         self.assertEqual(
558                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
559                         self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
560                                          'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
561
562 # test service-create for 10GE service from spdr to spdr
563     def test_29_create_10GE_service(self):
564         self.cr_serv_sample_data["input"]["service-name"] = "service1-10GE"
565         self.cr_serv_sample_data["input"]["connection-type"] = "service"
566         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "10"
567         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
568         del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
569         self.cr_serv_sample_data["input"]["service-a-end"]["ethernet-encoding"] = "10GBASE-R"
570         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
571         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
572         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "10"
573         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
574         del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
575         self.cr_serv_sample_data["input"]["service-z-end"]["ethernet-encoding"] = "10GBASE-R"
576         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
577         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
578         response = test_utils.service_create_request(self.cr_serv_sample_data)
579         self.assertEqual(response.status_code, requests.codes.ok)
580         res = response.json()
581         self.assertIn('PCE calculation in progress',
582                       res['output']['configuration-response-common']['response-message'])
583         time.sleep(self.WAITING)
584
585     def test_30_get_10GE_service1(self):
586         response = test_utils.get_service_list_request(
587             "services/service1-10GE")
588         self.assertEqual(response.status_code, requests.codes.ok)
589         res = response.json()
590         self.assertEqual(
591             res['services'][0]['administrative-state'], 'inService')
592         self.assertEqual(
593             res['services'][0]['service-name'], 'service1-10GE')
594         self.assertEqual(
595             res['services'][0]['connection-type'], 'service')
596         self.assertEqual(
597             res['services'][0]['lifecycle-state'], 'planned')
598         time.sleep(2)
599
600     def test_31_check_interface_10GE_CLIENT_spdra(self):
601         response = test_utils_rfc8040.check_node_attribute_request(
602             'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
603         self.assertEqual(response['status_code'], requests.codes.ok)
604         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
605                       'administrative-state': 'inService',
606                       'supporting-circuit-pack-name': 'CP1-SFP4',
607                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
608                       'supporting-port': 'CP1-SFP4-P1'
609                       }
610         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
611                              response['interface'][0])
612         self.assertEqual(
613             10000,
614             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
615
616     def test_32_check_interface_ODU2E_CLIENT_spdra(self):
617         response = test_utils_rfc8040.check_node_attribute_request(
618             'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e')
619         self.assertEqual(response['status_code'], requests.codes.ok)
620         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
621                         'administrative-state': 'inService',
622                         'supporting-circuit-pack-name': 'CP1-SFP4',
623                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
624                         'type': 'org-openroadm-interfaces:otnOdu',
625                         'supporting-port': 'CP1-SFP4-P1'}
626         input_dict_2 = {
627             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
628             'rate': 'org-openroadm-otn-common-types:ODU2e',
629             'monitoring-mode': 'terminated'}
630
631         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
632                              response['interface'][0])
633         self.assertDictEqual(dict(input_dict_2,
634                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
635                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
636         self.assertDictEqual(
637             {'payload-type': '03', 'exp-payload-type': '03'},
638             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
639
640     def test_33_check_interface_ODU2E_NETWORK_spdra(self):
641         response = test_utils_rfc8040.check_node_attribute_request(
642             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e')
643         self.assertEqual(response['status_code'], requests.codes.ok)
644         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
645                         'administrative-state': 'inService',
646                         'supporting-circuit-pack-name': 'CP1-CFP0',
647                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
648                         'type': 'org-openroadm-interfaces:otnOdu',
649                         'supporting-port': 'CP1-CFP0-P1'}
650         input_dict_2 = {
651             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
652             'rate': 'org-openroadm-otn-common-types:ODU2e',
653             'monitoring-mode': 'monitored'}
654         input_dict_3 = {'trib-port-number': 1}
655
656         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
657                              response['interface'][0])
658         self.assertDictEqual(dict(input_dict_2,
659                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
660                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
661         self.assertDictEqual(dict(input_dict_3,
662                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
663                                       'parent-odu-allocation']),
664                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
665         self.assertIn(1, response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
666                       ['trib-slots'])
667
668     def test_34_check_ODU2E_connection_spdra(self):
669         response = test_utils_rfc8040.check_node_attribute_request(
670             'SPDR-SA1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
671         self.assertEqual(response['status_code'], requests.codes.ok)
672         input_dict_1 = {
673             'connection-name':
674             'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
675             'direction': 'bidirectional'
676         }
677
678         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
679                              response['odu-connection'][0])
680         self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
681                              response['odu-connection'][0]['destination'])
682         self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
683                              response['odu-connection'][0]['source'])
684
685     def test_35_check_interface_10GE_CLIENT_spdrc(self):
686         response = test_utils_rfc8040.check_node_attribute_request(
687             'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
688         self.assertEqual(response['status_code'], requests.codes.ok)
689         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
690                       'administrative-state': 'inService',
691                       'supporting-circuit-pack-name': 'CP1-SFP4',
692                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
693                       'supporting-port': 'CP1-SFP4-P1'
694                       }
695         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
696                              response['interface'][0])
697         self.assertEqual(
698             10000,
699             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
700
701     def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
702         response = test_utils_rfc8040.check_node_attribute_request(
703             'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ODU2e')
704         self.assertEqual(response['status_code'], requests.codes.ok)
705         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
706                         'administrative-state': 'inService',
707                         'supporting-circuit-pack-name': 'CP1-SFP4',
708                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
709                         'type': 'org-openroadm-interfaces:otnOdu',
710                         'supporting-port': 'CP1-SFP4-P1'}
711         input_dict_2 = {
712             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
713             'rate': 'org-openroadm-otn-common-types:ODU2e',
714             'monitoring-mode': 'terminated'}
715
716         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
717                              response['interface'][0])
718         self.assertDictEqual(dict(input_dict_2,
719                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
720                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
721         self.assertDictEqual(
722             {'payload-type': '03', 'exp-payload-type': '03'},
723             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
724
725     def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
726         response = test_utils_rfc8040.check_node_attribute_request(
727             'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU2e')
728         self.assertEqual(response['status_code'], requests.codes.ok)
729         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
730                         'administrative-state': 'inService',
731                         'supporting-circuit-pack-name': 'CP1-CFP0',
732                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
733                         'type': 'org-openroadm-interfaces:otnOdu',
734                         'supporting-port': 'CP1-CFP0-P1'}
735         input_dict_2 = {
736             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
737             'rate': 'org-openroadm-otn-common-types:ODU2e',
738             'monitoring-mode': 'monitored'}
739
740         input_dict_3 = {'trib-port-number': 1}
741
742         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
743                              response['interface'][0])
744         self.assertDictEqual(dict(input_dict_2,
745                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
746                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
747         self.assertDictEqual(dict(input_dict_3,
748                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
749                                       'parent-odu-allocation']),
750                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
751             'parent-odu-allocation'])
752         self.assertIn(1,
753                       response['interface'][0][
754                           'org-openroadm-otn-odu-interfaces:odu'][
755                           'parent-odu-allocation']['trib-slots'])
756
757     def test_38_check_ODU2E_connection_spdrc(self):
758         response = test_utils_rfc8040.check_node_attribute_request(
759             'SPDR-SC1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
760         self.assertEqual(response['status_code'], requests.codes.ok)
761         input_dict_1 = {
762             'connection-name':
763             'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
764             'direction': 'bidirectional'
765         }
766
767         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
768                              response['odu-connection'][0])
769         self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
770                              response['odu-connection'][0]['destination'])
771         self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
772                              response['odu-connection'][0]['source'])
773
774     def test_39_check_otn_topo_links(self):
775         response = test_utils.get_otn_topo_request()
776         self.assertEqual(response.status_code, requests.codes.ok)
777         res = response.json()
778         nb_links = len(res['network'][0]['ietf-network-topology:link'])
779         self.assertEqual(nb_links, 4)
780         for link in res['network'][0]['ietf-network-topology:link']:
781             linkId = link['link-id']
782             if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
783                            'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
784                 self.assertEqual(
785                     link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
786                 self.assertEqual(
787                     link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
788
789     def test_40_check_otn_topo_tp(self):
790         response = test_utils.get_otn_topo_request()
791         res = response.json()
792         for node in res['network'][0]['node']:
793             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
794                 tpList = node['ietf-network-topology:termination-point']
795                 for tp in tpList:
796                     if tp['tp-id'] == 'XPDR1-NETWORK1':
797                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
798                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
799                         tsPoolList = list(range(1, 9))
800                         self.assertNotIn(
801                             tsPoolList, xpdrTpPortConAt['ts-pool'])
802                         self.assertEqual(
803                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
804                         self.assertNotIn(
805                             1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
806
807     def test_41_delete_10GE_service(self):
808         response = test_utils.service_delete_request("service1-10GE")
809         self.assertEqual(response.status_code, requests.codes.ok)
810         res = response.json()
811         self.assertIn('Renderer service delete in progress',
812                       res['output']['configuration-response-common']['response-message'])
813         time.sleep(self.WAITING)
814
815     def test_42_check_service_list(self):
816         response = test_utils.get_service_list_request("")
817         self.assertEqual(response.status_code, requests.codes.ok)
818         res = response.json()
819         self.assertEqual(len(res['service-list']['services']), 2)
820         time.sleep(2)
821
822     def test_43_check_no_ODU2e_connection_spdra(self):
823         response = test_utils.check_netconf_node_request("SPDR-SA1", "")
824         self.assertEqual(response.status_code, requests.codes.ok)
825         res = response.json()
826         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
827         time.sleep(1)
828
829     def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
830         response = test_utils_rfc8040.check_node_attribute_request(
831             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e-service1')
832         self.assertEqual(response['status_code'], requests.codes.conflict)
833
834     def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
835         response = test_utils_rfc8040.check_node_attribute_request(
836             'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e-service1')
837         self.assertEqual(response['status_code'], requests.codes.conflict)
838
839     def test_46_check_no_interface_10GE_CLIENT_spdra(self):
840         response = test_utils_rfc8040.check_node_attribute_request(
841             'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
842         self.assertEqual(response['status_code'], requests.codes.conflict)
843
844     def test_47_check_otn_topo_links(self):
845         response = test_utils.get_otn_topo_request()
846         self.assertEqual(response.status_code, requests.codes.ok)
847         res = response.json()
848         nb_links = len(res['network'][0]['ietf-network-topology:link'])
849         self.assertEqual(nb_links, 4)
850         for link in res['network'][0]['ietf-network-topology:link']:
851             linkId = link['link-id']
852             if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
853                            'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
854                 self.assertEqual(
855                     link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
856                 self.assertEqual(
857                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
858
859     def test_48_check_otn_topo_tp(self):
860         response = test_utils.get_otn_topo_request()
861         res = response.json()
862         for node in res['network'][0]['node']:
863             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
864                 tpList = node['ietf-network-topology:termination-point']
865                 for tp in tpList:
866                     if tp['tp-id'] == 'XPDR1-NETWORK1':
867                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
868                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
869                         self.assertEqual(
870                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
871
872     def test_49_delete_ODU4_service(self):
873         response = test_utils.service_delete_request("service1-ODU4")
874         self.assertEqual(response.status_code, requests.codes.ok)
875         res = response.json()
876         self.assertIn('Renderer service delete in progress',
877                       res['output']['configuration-response-common']['response-message'])
878         time.sleep(self.WAITING)
879
880     def test_50_check_service_list(self):
881         response = test_utils.get_service_list_request("")
882         self.assertEqual(response.status_code, requests.codes.ok)
883         res = response.json()
884         self.assertEqual(len(res['service-list']['services']), 1)
885         time.sleep(2)
886
887     def test_51_check_no_interface_ODU4_spdra(self):
888         response = test_utils_rfc8040.check_node_attribute_request(
889             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
890         self.assertEqual(response['status_code'], requests.codes.conflict)
891
    def test_52_check_otn_topo_links(self):
        # Replays the assertions of test_22_check_otn_topo_otu4_links at this
        # point of the test sequence.
        self.test_22_check_otn_topo_otu4_links()
894
895     def test_53_check_otn_topo_tp(self):
896         response = test_utils.get_otn_topo_request()
897         res = response.json()
898         for node in res['network'][0]['node']:
899             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
900                 tpList = node['ietf-network-topology:termination-point']
901                 for tp in tpList:
902                     if tp['tp-id'] == 'XPDR1-NETWORK1':
903                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
904                         self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
905                         self.assertNotIn(
906                             'odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
907
908     def test_54_delete_OCH_OTU4_service(self):
909         response = test_utils.service_delete_request("service1-OCH-OTU4")
910         self.assertEqual(response.status_code, requests.codes.ok)
911         res = response.json()
912         self.assertIn('Renderer service delete in progress',
913                       res['output']['configuration-response-common']['response-message'])
914         time.sleep(self.WAITING)
915
916     def test_55_get_no_service(self):
917         response = test_utils.get_service_list_request("")
918         self.assertEqual(response.status_code, requests.codes.conflict)
919         res = response.json()
920         self.assertIn(
921             {"error-type": "application", "error-tag": "data-missing",
922              "error-message": "Request could not be completed because the relevant data model content does not exist"},
923             res['errors']['error'])
924         time.sleep(1)
925
926     def test_56_check_no_interface_OTU4_spdra(self):
927         response = test_utils_rfc8040.check_node_attribute_request(
928             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
929         self.assertEqual(response['status_code'], requests.codes.conflict)
930
931     def test_57_check_no_interface_OCH_spdra(self):
932         response = test_utils_rfc8040.check_node_attribute_request(
933             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-1')
934         self.assertEqual(response['status_code'], requests.codes.conflict)
935
936     def test_58_getLinks_OtnTopology(self):
937         response = test_utils.get_otn_topo_request()
938         self.assertEqual(response.status_code, requests.codes.ok)
939         res = response.json()
940         self.assertNotIn('ietf-network-topology:link', res['network'][0])
941
942     def test_59_check_openroadm_topo_spdra(self):
943         response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR1', 'config')
944         self.assertEqual(response['status_code'], requests.codes.ok)
945         tp = response['node']['ietf-network-topology:termination-point'][0]
946         self.assertEqual('XPDR1-NETWORK1', tp['tp-id'])
947         self.assertNotIn('wavelength', dict.keys(
948             tp['org-openroadm-network-topology:xpdr-network-attributes']))
949
950     def test_60_check_openroadm_topo_ROADMA_SRG(self):
951         response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
952         self.assertEqual(response['status_code'], requests.codes.ok)
953         freq_map = base64.b64decode(
954             response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
955         freq_map_array = [int(x) for x in freq_map]
956         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
957         liste_tp = response['node']['ietf-network-topology:termination-point']
958         for ele in liste_tp:
959             if ele['tp-id'] == 'SRG1-PP1-TXRX':
960                 freq_map = base64.b64decode(
961                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
962                 freq_map_array = [int(x) for x in freq_map]
963                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
964         time.sleep(3)
965
966     def test_61_check_openroadm_topo_ROADMA_DEG(self):
967         response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
968         self.assertEqual(response['status_code'], requests.codes.ok)
969         freq_map = base64.b64decode(
970             response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
971         freq_map_array = [int(x) for x in freq_map]
972         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
973         liste_tp = response['node']['ietf-network-topology:termination-point']
974         for ele in liste_tp:
975             if ele['tp-id'] == 'DEG2-CTP-TXRX':
976                 freq_map = base64.b64decode(
977                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
978                 freq_map_array = [int(x) for x in freq_map]
979                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
980             if ele['tp-id'] == 'DEG2-TTP-TXRX':
981                 freq_map = base64.b64decode(
982                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
983                 freq_map_array = [int(x) for x in freq_map]
984                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
985         time.sleep(3)
986
987     def test_62_connect_sprdA_3_N1_to_roadmA_PP2(self):
988         response = test_utils_rfc8040.transportpce_api_rpc_request(
989             'transportpce-networkutils', 'init-xpdr-rdm-links',
990             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '3', 'network-num': '1',
991                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
992         self.assertEqual(response['status_code'], requests.codes.ok)
993
994     def test_63_connect_roadmA_PP2_to_spdrA_3_N1(self):
995         response = test_utils_rfc8040.transportpce_api_rpc_request(
996             'transportpce-networkutils', 'init-rdm-xpdr-links',
997             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '3', 'network-num': '1',
998                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
999         self.assertEqual(response['status_code'], requests.codes.ok)
1000
1001     def test_64_connect_sprdC_3_N1_to_roadmC_PP2(self):
1002         response = test_utils_rfc8040.transportpce_api_rpc_request(
1003             'transportpce-networkutils', 'init-xpdr-rdm-links',
1004             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '3', 'network-num': '1',
1005                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
1006         self.assertEqual(response['status_code'], requests.codes.ok)
1007
1008     def test_65_connect_roadmC_PP2_to_spdrC_3_N1(self):
1009         response = test_utils_rfc8040.transportpce_api_rpc_request(
1010             'transportpce-networkutils', 'init-rdm-xpdr-links',
1011             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '3', 'network-num': '1',
1012                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
1013         self.assertEqual(response['status_code'], requests.codes.ok)
1014
1015     def test_66_create_OCH_OTU4_service_2(self):
1016         # pylint: disable=line-too-long
1017         self.cr_serv_sample_data["input"]["service-name"] = "service2-OCH-OTU4"
1018         self.cr_serv_sample_data["input"]["connection-type"] = "infrastructure"
1019         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
1020         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OTU"
1021         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1022         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1023         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1024         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1025         self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1026         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
1027         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OTU"
1028         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1029         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1030         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1031         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1032         self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1033         response = test_utils.service_create_request(self.cr_serv_sample_data)
1034         self.assertEqual(response.status_code, requests.codes.ok)
1035         res = response.json()
1036         self.assertIn('PCE calculation in progress',
1037                       res['output']['configuration-response-common']['response-message'])
1038         time.sleep(self.WAITING)
1039
1040     def test_67_get_OCH_OTU4_service2(self):
1041         response = test_utils.get_service_list_request(
1042             "services/service2-OCH-OTU4")
1043         self.assertEqual(response.status_code, requests.codes.ok)
1044         res = response.json()
1045         self.assertEqual(
1046             res['services'][0]['administrative-state'], 'inService')
1047         self.assertEqual(
1048             res['services'][0]['service-name'], 'service2-OCH-OTU4')
1049         self.assertEqual(
1050             res['services'][0]['connection-type'], 'infrastructure')
1051         self.assertEqual(
1052             res['services'][0]['lifecycle-state'], 'planned')
1053         time.sleep(2)
1054
1055     def test_68_create_ODU4_service_2(self):
1056         # pylint: disable=line-too-long
1057         self.cr_serv_sample_data["input"]["service-name"] = "service2-ODU4"
1058         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
1059         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1060         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1061         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1062         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1063         self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1064         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
1065         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
1066         self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1067         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1068         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1069         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1070         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1071         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
1072
1073         response = test_utils.service_create_request(self.cr_serv_sample_data)
1074         self.assertEqual(response.status_code, requests.codes.ok)
1075         res = response.json()
1076         self.assertIn('PCE calculation in progress',
1077                       res['output']['configuration-response-common']['response-message'])
1078         time.sleep(self.WAITING)
1079
1080     def test_69_get_ODU4_service2(self):
1081         response = test_utils.get_service_list_request(
1082             "services/service2-ODU4")
1083         self.assertEqual(response.status_code, requests.codes.ok)
1084         res = response.json()
1085         self.assertEqual(
1086             res['services'][0]['administrative-state'], 'inService')
1087         self.assertEqual(
1088             res['services'][0]['service-name'], 'service2-ODU4')
1089         self.assertEqual(
1090             res['services'][0]['connection-type'], 'infrastructure')
1091         self.assertEqual(
1092             res['services'][0]['lifecycle-state'], 'planned')
1093         time.sleep(2)
1094
1095     def test_70_create_1GE_service(self):
1096         # pylint: disable=line-too-long
1097         self.cr_serv_sample_data["input"]["service-name"] = "service1-1GE"
1098         self.cr_serv_sample_data["input"]["connection-type"] = "service"
1099         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "1"
1100         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
1101         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "1"
1102         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
1103         del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
1104         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1105         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1106         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1107         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1108         del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
1109         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1110         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1111         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1112         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1113         response = test_utils.service_create_request(self.cr_serv_sample_data)
1114         self.assertEqual(response.status_code, requests.codes.ok)
1115         res = response.json()
1116         self.assertIn('PCE calculation in progress',
1117                       res['output']['configuration-response-common']['response-message'])
1118         time.sleep(self.WAITING)
1119
1120     def test_71_get_1GE_service1(self):
1121         response = test_utils.get_service_list_request("services/service1-1GE")
1122         self.assertEqual(response.status_code, requests.codes.ok)
1123         res = response.json()
1124         self.assertEqual(
1125             res['services'][0]['administrative-state'], 'inService')
1126         self.assertEqual(
1127             res['services'][0]['service-name'], 'service1-1GE')
1128         self.assertEqual(
1129             res['services'][0]['connection-type'], 'service')
1130         self.assertEqual(
1131             res['services'][0]['lifecycle-state'], 'planned')
1132         time.sleep(2)
1133
1134     def test_72_check_interface_1GE_CLIENT_spdra(self):
1135         response = test_utils_rfc8040.check_node_attribute_request(
1136             'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1137         self.assertEqual(response['status_code'], requests.codes.ok)
1138         input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1139                       'administrative-state': 'inService',
1140                       'supporting-circuit-pack-name': 'CP1-SFP4',
1141                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
1142                       'supporting-port': 'CP1-SFP4-P1'
1143                       }
1144         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
1145                              response['interface'][0])
1146         self.assertEqual(
1147             1000,
1148             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
1149
1150     def test_73_check_interface_ODU0_CLIENT_spdra(self):
1151         response = test_utils_rfc8040.check_node_attribute_request(
1152             'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0')
1153         self.assertEqual(response['status_code'], requests.codes.ok)
1154         input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
1155                         'administrative-state': 'inService',
1156                         'supporting-circuit-pack-name': 'CP3-SFP1',
1157                         'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1158                         'type': 'org-openroadm-interfaces:otnOdu',
1159                         'supporting-port': 'CP3-SFP1-P1'}
1160         input_dict_2 = {
1161             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1162             'rate': 'org-openroadm-otn-common-types:ODU0',
1163             'monitoring-mode': 'terminated'}
1164         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1165                              response['interface'][0])
1166         self.assertDictEqual(dict(input_dict_2,
1167                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1168                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1169         self.assertDictEqual(
1170             {'payload-type': '07', 'exp-payload-type': '07'},
1171             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1172
1173     def test_74_check_interface_ODU0_NETWORK_spdra(self):
1174         response = test_utils_rfc8040.check_node_attribute_request(
1175             'SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0')
1176         self.assertEqual(response['status_code'], requests.codes.ok)
1177         input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
1178                         'administrative-state': 'inService',
1179                         'supporting-circuit-pack-name': 'CP3-CFP0',
1180                         'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1181                         'type': 'org-openroadm-interfaces:otnOdu',
1182                         'supporting-port': 'CP3-CFP0-P1'}
1183         input_dict_2 = {
1184             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1185             'rate': 'org-openroadm-otn-common-types:ODU0',
1186             'monitoring-mode': 'monitored'}
1187         input_dict_3 = {'trib-port-number': 1}
1188         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1189                              response['interface'][0])
1190         self.assertDictEqual(dict(input_dict_2,
1191                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1192                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1193         self.assertDictEqual(dict(input_dict_3,
1194                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1195                                       'parent-odu-allocation']),
1196                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
1197         self.assertIn(1, response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1198                       ['trib-slots'])
1199
1200     def test_75_check_ODU0_connection_spdra(self):
1201         response = test_utils_rfc8040.check_node_attribute_request(
1202             'SPDR-SA1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
1203         self.assertEqual(response['status_code'], requests.codes.ok)
1204         input_dict_1 = {
1205             'connection-name':
1206             'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
1207             'direction': 'bidirectional'
1208         }
1209         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1210                              response['odu-connection'][0])
1211         self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0'},
1212                              response['odu-connection'][0]['destination'])
1213         self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0'},
1214                              response['odu-connection'][0]['source'])
1215
1216     def test_76_check_interface_1GE_CLIENT_spdrc(self):
1217         response = test_utils_rfc8040.check_node_attribute_request(
1218             'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1219         self.assertEqual(response['status_code'], requests.codes.ok)
1220         input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1221                       'administrative-state': 'inService',
1222                       'supporting-circuit-pack-name': 'CP3-SFP1',
1223                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
1224                       'supporting-port': 'CP3-SFP1-P1'
1225                       }
1226         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
1227                              response['interface'][0])
1228         self.assertEqual(
1229             1000,
1230             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
1231
1232     def test_77_check_interface_ODU0_CLIENT_spdrc(self):
1233         response = test_utils_rfc8040.check_node_attribute_request(
1234             'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ODU0')
1235         self.assertEqual(response['status_code'], requests.codes.ok)
1236         input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
1237                         'administrative-state': 'inService',
1238                         'supporting-circuit-pack-name': 'CP3-SFP1',
1239                         'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1240                         'type': 'org-openroadm-interfaces:otnOdu',
1241                         'supporting-port': 'CP3-SFP1-P1'}
1242         input_dict_2 = {
1243             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1244             'rate': 'org-openroadm-otn-common-types:ODU0',
1245             'monitoring-mode': 'terminated'}
1246         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1247                              response['interface'][0])
1248         self.assertDictEqual(dict(input_dict_2,
1249                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1250                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1251         self.assertDictEqual(
1252             {'payload-type': '07', 'exp-payload-type': '07'},
1253             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1254
1255     def test_78_check_interface_ODU0_NETWORK_spdrc(self):
1256         response = test_utils_rfc8040.check_node_attribute_request(
1257             'SPDR-SC1', 'interface', 'XPDR3-NETWORK1-ODU0')
1258         self.assertEqual(response['status_code'], requests.codes.ok)
1259         input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
1260                         'administrative-state': 'inService',
1261                         'supporting-circuit-pack-name': 'CP3-CFP0',
1262                         'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1263                         'type': 'org-openroadm-interfaces:otnOdu',
1264                         'supporting-port': 'CP3-CFP0-P1'}
1265         input_dict_2 = {
1266             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1267             'rate': 'org-openroadm-otn-common-types:ODU0',
1268             'monitoring-mode': 'monitored'}
1269         input_dict_3 = {'trib-port-number': 1}
1270         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1271                              response['interface'][0])
1272         self.assertDictEqual(dict(input_dict_2,
1273                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1274                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1275         self.assertDictEqual(dict(input_dict_3,
1276                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1277                                       'parent-odu-allocation']),
1278                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1279             'parent-odu-allocation'])
1280         self.assertIn(1,
1281                       response['interface'][0][
1282                           'org-openroadm-otn-odu-interfaces:odu'][
1283                           'parent-odu-allocation']['trib-slots'])
1284
1285     def test_79_check_ODU0_connection_spdrc(self):
1286         response = test_utils_rfc8040.check_node_attribute_request(
1287             'SPDR-SC1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
1288         self.assertEqual(response['status_code'], requests.codes.ok)
1289         input_dict_1 = {
1290             'connection-name':
1291             'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
1292             'direction': 'bidirectional'
1293         }
1294         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1295                              response['odu-connection'][0])
1296         self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0'},
1297                              response['odu-connection'][0]['destination'])
1298         self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0'},
1299                              response['odu-connection'][0]['source'])
1300
1301     def test_80_check_otn_topo_links(self):
1302         response = test_utils.get_otn_topo_request()
1303         self.assertEqual(response.status_code, requests.codes.ok)
1304         res = response.json()
1305         nb_links = len(res['network'][0]['ietf-network-topology:link'])
1306         self.assertEqual(nb_links, 4)
1307         for link in res['network'][0]['ietf-network-topology:link']:
1308             linkId = link['link-id']
1309             if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1310                            'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1311                 self.assertEqual(
1312                     link['org-openroadm-otn-network-topology:available-bandwidth'], 99000)
1313                 self.assertEqual(
1314                     link['org-openroadm-otn-network-topology:used-bandwidth'], 1000)
1315
1316     def test_81_check_otn_topo_tp(self):
1317         response = test_utils.get_otn_topo_request()
1318         res = response.json()
1319         for node in res['network'][0]['node']:
1320             if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1321                 tpList = node['ietf-network-topology:termination-point']
1322                 for tp in tpList:
1323                     if tp['tp-id'] == 'XPDR3-NETWORK1':
1324                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1325                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 79)
1326                         tsPoolList = list(range(1, 2))
1327                         self.assertNotIn(
1328                             tsPoolList, xpdrTpPortConAt['ts-pool'])
1329                         self.assertEqual(
1330                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
1331                         self.assertNotIn(
1332                             1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
1333
1334     def test_82_delete_1GE_service(self):
1335         response = test_utils.service_delete_request("service1-1GE")
1336         self.assertEqual(response.status_code, requests.codes.ok)
1337         res = response.json()
1338         self.assertIn('Renderer service delete in progress',
1339                       res['output']['configuration-response-common']['response-message'])
1340         time.sleep(self.WAITING)
1341
1342     def test_83_check_service_list(self):
1343         response = test_utils.get_service_list_request("")
1344         self.assertEqual(response.status_code, requests.codes.ok)
1345         res = response.json()
1346         self.assertEqual(len(res['service-list']['services']), 2)
1347         time.sleep(2)
1348
1349     def test_84_check_no_ODU0_connection_spdra(self):
1350         response = test_utils.check_netconf_node_request("SPDR-SA1", "")
1351         self.assertEqual(response.status_code, requests.codes.ok)
1352         res = response.json()
1353         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
1354         time.sleep(1)
1355
1356     def test_85_check_no_interface_ODU0_NETWORK_spdra(self):
1357         response = test_utils_rfc8040.check_node_attribute_request(
1358             'SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0-service1')
1359         self.assertEqual(response['status_code'], requests.codes.conflict)
1360
1361     def test_86_check_no_interface_ODU0_CLIENT_spdra(self):
1362         response = test_utils_rfc8040.check_node_attribute_request(
1363             'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0-service1')
1364         self.assertEqual(response['status_code'], requests.codes.conflict)
1365
1366     def test_87_check_no_interface_10GE_CLIENT_spdra(self):
1367         response = test_utils_rfc8040.check_node_attribute_request(
1368             'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1369         self.assertEqual(response['status_code'], requests.codes.conflict)
1370
1371     def test_88_check_otn_topo_links(self):
1372         response = test_utils.get_otn_topo_request()
1373         self.assertEqual(response.status_code, requests.codes.ok)
1374         res = response.json()
1375         nb_links = len(res['network'][0]['ietf-network-topology:link'])
1376         self.assertEqual(nb_links, 4)
1377         for link in res['network'][0]['ietf-network-topology:link']:
1378             linkId = link['link-id']
1379             if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1380                            'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1381                 self.assertEqual(
1382                     link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
1383                 self.assertEqual(
1384                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
1385
1386     def test_89_check_otn_topo_tp(self):
1387         response = test_utils.get_otn_topo_request()
1388         res = response.json()
1389         for node in res['network'][0]['node']:
1390             if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1391                 tpList = node['ietf-network-topology:termination-point']
1392                 for tp in tpList:
1393                     if tp['tp-id'] == 'XPDR3-NETWORK1':
1394                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1395                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
1396                         self.assertEqual(
1397                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
1398
1399     def test_90_delete_ODU4_service(self):
1400         response = test_utils.service_delete_request("service2-ODU4")
1401         self.assertEqual(response.status_code, requests.codes.ok)
1402         res = response.json()
1403         self.assertIn('Renderer service delete in progress',
1404                       res['output']['configuration-response-common']['response-message'])
1405         time.sleep(self.WAITING)
1406
1407     def test_91_delete_OCH_OTU4_service(self):
1408         response = test_utils.service_delete_request("service2-OCH-OTU4")
1409         self.assertEqual(response.status_code, requests.codes.ok)
1410         res = response.json()
1411         self.assertIn('Renderer service delete in progress',
1412                       res['output']['configuration-response-common']['response-message'])
1413         time.sleep(self.WAITING)
1414
1415     def test_92_disconnect_xponders_from_roadm(self):
1416         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1417         response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
1418         self.assertEqual(response['status_code'], requests.codes.ok)
1419         links = response['network'][0]['ietf-network-topology:link']
1420         for link in links:
1421             if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
1422                 link_name = link["link-id"]
1423                 response = test_utils.delete_request(url+link_name)
1424                 self.assertEqual(response.status_code, requests.codes.ok)
1425
1426     def test_93_check_openroadm_topology(self):
1427         response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
1428         self.assertEqual(response['status_code'], requests.codes.ok)
1429         self.assertEqual(18,
1430                          len(response['network'][0]['ietf-network-topology:link']),
1431                          'Topology should contain 18 links')
1432
1433     def test_94_disconnect_spdrA(self):
1434         response = test_utils_rfc8040.unmount_device("SPDR-SA1")
1435         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1436
1437     def test_95_disconnect_spdrC(self):
1438         response = test_utils_rfc8040.unmount_device("SPDR-SC1")
1439         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1440
1441     def test_96_disconnect_roadmA(self):
1442         response = test_utils_rfc8040.unmount_device("ROADM-A1")
1443         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1444
1445     def test_97_disconnect_roadmC(self):
1446         response = test_utils_rfc8040.unmount_device("ROADM-C1")
1447         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1448
1449
# Run the test cases with verbose reporting when this file is executed
# directly as a script.
if __name__ == "__main__":
    unittest.main(verbosity=2)