Migrate OTN E2E functional tests to RFC8040 step 1
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test11_otn_end2end.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
15
16 import base64
17 import unittest
18 import time
19 import requests
20 # pylint: disable=wrong-import-order
21 import sys
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils  # nopep8
26 import test_utils_rfc8040  # nopep8
27
28
29 class TransportPCEtesting(unittest.TestCase):
30
    # Handle on the launched processes: assigned in setUpClass and iterated by
    # tearDownClass to shut each one down.
    processes = None
    # Seconds slept after each service-create request (see test_12, test_23, test_29).
    WAITING = 20  # nominal value is 300
    # Device version handed to start_sims() and mount_device() for every simulator.
    NODE_VERSION = '2.2.1'

    # Body of the service-create request. test_12 sends it as is (OCH-OTU4
    # between SPDR-SA1 and SPDR-SC1); test_23 and test_29 then modify this very
    # dict in place before sending it again, so its content depends on which
    # tests have already run.
    cr_serv_sample_data = {"input": {
        "sdnc-request-header": {
            "request-id": "request-1",
            "rpc-action": "service-create",
            "request-system-id": "appname"
        },
        "service-name": "service1-OCH-OTU4",
        "common-id": "commonId",
        "connection-type": "infrastructure",
        # A end: SPDR-SA1, XPDR1-NETWORK1 in both directions.
        "service-a-end": {
            "service-rate": "100",
            "node-id": "SPDR-SA1",
            "service-format": "OTU",
            "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
            "clli": "NodeSA",
            "tx-direction": [{
                "port": {
                    "port-device-name": "SPDR-SA1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                },
                "index": 0
            }],
            "rx-direction": [{
                "port": {
                    "port-device-name": "SPDR-SA1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                },
                "index": 0
            }],
            "optic-type": "gray"
        },
        # Z end: SPDR-SC1, XPDR1-NETWORK1 in both directions.
        "service-z-end": {
            "service-rate": "100",
            "node-id": "SPDR-SC1",
            "service-format": "OTU",
            "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
            "clli": "NodeSC",
            "tx-direction": [{
                "port": {
                    "port-device-name": "SPDR-SC1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                },
                "index": 0
            }],
            "rx-direction": [{
                "port": {
                    "port-device-name": "SPDR-SC1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                },
                "index": 0
            }],
            "optic-type": "gray"
        },
        "due-date": "2018-06-15T00:00:01Z",
        "operator-contact": "pw1234"
    }
    }
128
129     @classmethod
130     def setUpClass(cls):
131         cls.processes = test_utils_rfc8040.start_tpce()
132         cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION),
133                                                        ('roadma', cls.NODE_VERSION),
134                                                        ('roadmc', cls.NODE_VERSION),
135                                                        ('spdrc', cls.NODE_VERSION)])
136
137     @classmethod
138     def tearDownClass(cls):
139         # pylint: disable=not-an-iterable
140         for process in cls.processes:
141             test_utils_rfc8040.shutdown_process(process)
142         print("all processes killed")
143
144     def setUp(self):
145         time.sleep(5)
146
147     def test_01_connect_spdrA(self):
148         response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
149         self.assertEqual(response.status_code,
150                          requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
151
152     def test_02_connect_spdrC(self):
153         response = test_utils_rfc8040.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
154         self.assertEqual(response.status_code,
155                          requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
156
157     def test_03_connect_rdmA(self):
158         response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
159         self.assertEqual(response.status_code,
160                          requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
161
162     def test_04_connect_rdmC(self):
163         response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
164         self.assertEqual(response.status_code,
165                          requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
166
167     def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
168         response = test_utils_rfc8040.transportpce_api_rpc_request(
169             'transportpce-networkutils', 'init-xpdr-rdm-links',
170             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
171                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
172         self.assertEqual(response['status_code'], requests.codes.ok)
173
174     def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
175         response = test_utils_rfc8040.transportpce_api_rpc_request(
176             'transportpce-networkutils', 'init-rdm-xpdr-links',
177             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
178                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
179         self.assertEqual(response['status_code'], requests.codes.ok)
180
181     def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
182         response = test_utils_rfc8040.transportpce_api_rpc_request(
183             'transportpce-networkutils', 'init-xpdr-rdm-links',
184             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
185                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
186         self.assertEqual(response['status_code'], requests.codes.ok)
187
188     def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
189         response = test_utils_rfc8040.transportpce_api_rpc_request(
190             'transportpce-networkutils', 'init-rdm-xpdr-links',
191             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
192                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
193         self.assertEqual(response['status_code'], requests.codes.ok)
194
195     def test_09_add_omsAttributes_ROADMA_ROADMC(self):
196         # Config ROADMA-ROADMC oms-attributes
197         data = {"span": {
198             "auto-spanloss": "true",
199             "spanloss-base": 11.4,
200             "spanloss-current": 12,
201             "engineered-spanloss": 12.2,
202             "link-concatenation": [{
203                 "SRLG-Id": 0,
204                 "fiber-type": "smf",
205                 "SRLG-length": 100000,
206                 "pmd": 0.5}]}}
207         response = test_utils_rfc8040.add_oms_attr_request(
208             "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
209         self.assertEqual(response.status_code, requests.codes.created)
210
211     def test_10_add_omsAttributes_ROADMC_ROADMA(self):
212         # Config ROADMC-ROADMA oms-attributes
213         data = {"span": {
214             "auto-spanloss": "true",
215             "spanloss-base": 11.4,
216             "spanloss-current": 12,
217             "engineered-spanloss": 12.2,
218             "link-concatenation": [{
219                 "SRLG-Id": 0,
220                 "fiber-type": "smf",
221                 "SRLG-length": 100000,
222                 "pmd": 0.5}]}}
223         response = test_utils_rfc8040.add_oms_attr_request(
224             "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
225         self.assertEqual(response.status_code, requests.codes.created)
226
227 # test service-create for OCH-OTU4 service from spdr to spdr
228     def test_11_check_otn_topology(self):
229         response = test_utils.get_otn_topo_request()
230         self.assertEqual(response.status_code, requests.codes.ok)
231         res = response.json()
232         nbNode = len(res['network'][0]['node'])
233         self.assertEqual(nbNode, 6, 'There should be 6 nodes')
234         self.assertNotIn('ietf-network-topology:link', res['network'][0])
235
236     def test_12_create_OCH_OTU4_service(self):
237         response = test_utils.service_create_request(self.cr_serv_sample_data)
238         self.assertEqual(response.status_code, requests.codes.ok)
239         res = response.json()
240         self.assertIn('PCE calculation in progress',
241                       res['output']['configuration-response-common']['response-message'])
242         time.sleep(self.WAITING)
243
244     def test_13_get_OCH_OTU4_service1(self):
245         response = test_utils.get_service_list_request(
246             "services/service1-OCH-OTU4")
247         self.assertEqual(response.status_code, requests.codes.ok)
248         res = response.json()
249         self.assertEqual(
250             res['services'][0]['administrative-state'], 'inService')
251         self.assertEqual(
252             res['services'][0]['service-name'], 'service1-OCH-OTU4')
253         self.assertEqual(
254             res['services'][0]['connection-type'], 'infrastructure')
255         self.assertEqual(
256             res['services'][0]['lifecycle-state'], 'planned')
257         time.sleep(2)
258
259     # Check correct configuration of devices
260     def test_14_check_interface_och_spdra(self):
261         response = test_utils_rfc8040.check_node_attribute_request(
262             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-761:768')
263         self.assertEqual(response['status_code'], requests.codes.ok)
264         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
265                                    'administrative-state': 'inService',
266                                    'supporting-circuit-pack-name': 'CP1-CFP0',
267                                    'type': 'org-openroadm-interfaces:opticalChannel',
268                                    'supporting-port': 'CP1-CFP0-P1'
269                                    }, **response['interface'][0]),
270                              response['interface'][0])
271         self.assertEqual('org-openroadm-common-types:R100G',
272                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
273         self.assertEqual('dp-qpsk',
274                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
275         self.assertEqual(196.1,
276                          float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
277         self.assertEqual(
278             -5,
279             float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
280
281     def test_15_check_interface_OTU4_spdra(self):
282         response = test_utils_rfc8040.check_node_attribute_request(
283             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
284         self.assertEqual(response['status_code'], requests.codes.ok)
285         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
286                         'administrative-state': 'inService',
287                         'supporting-circuit-pack-name': 'CP1-CFP0',
288                         'supporting-interface': 'XPDR1-NETWORK1-761:768',
289                         'type': 'org-openroadm-interfaces:otnOtu',
290                         'supporting-port': 'CP1-CFP0-P1'
291                         }
292         input_dict_2 = {'tx-sapi': 'H/OelLynehI=',
293                         'expected-dapi': 'H/OelLynehI=',
294                         'tx-dapi': 'AMf1n5hK6Xkk',
295                         'expected-sapi': 'AMf1n5hK6Xkk',
296                         'rate': 'org-openroadm-otn-common-types:OTU4',
297                         'fec': 'scfec'
298                         }
299         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
300                              response['interface'][0])
301         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
302                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
303
304     def test_16_check_interface_och_spdrc(self):
305         response = test_utils_rfc8040.check_node_attribute_request(
306             'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-761:768')
307         self.assertEqual(response['status_code'], requests.codes.ok)
308         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
309                                    'administrative-state': 'inService',
310                                    'supporting-circuit-pack-name': 'CP1-CFP0',
311                                    'type': 'org-openroadm-interfaces:opticalChannel',
312                                    'supporting-port': 'CP1-CFP0-P1'
313                                    }, **response['interface'][0]),
314                              response['interface'][0])
315         self.assertEqual('org-openroadm-common-types:R100G',
316                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
317         self.assertEqual('dp-qpsk',
318                          response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
319         self.assertEqual(196.1,
320                          float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
321         self.assertEqual(
322             -5,
323             float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
324
325     def test_17_check_interface_OTU4_spdrc(self):
326         response = test_utils_rfc8040.check_node_attribute_request(
327             'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-OTU')
328         self.assertEqual(response['status_code'], requests.codes.ok)
329         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
330                         'administrative-state': 'inService',
331                         'supporting-circuit-pack-name': 'CP1-CFP0',
332                         'supporting-interface': 'XPDR1-NETWORK1-761:768',
333                         'type': 'org-openroadm-interfaces:otnOtu',
334                         'supporting-port': 'CP1-CFP0-P1'
335                         }
336         input_dict_2 = {'tx-dapi': 'H/OelLynehI=',
337                         'expected-sapi': 'H/OelLynehI=',
338                         'tx-sapi': 'AMf1n5hK6Xkk',
339                         'expected-dapi': 'AMf1n5hK6Xkk',
340                         'rate': 'org-openroadm-otn-common-types:OTU4',
341                         'fec': 'scfec'
342                         }
343         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
344                              response['interface'][0])
345         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
346                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
347
348     def test_18_check_no_interface_ODU4_spdra(self):
349         response = test_utils_rfc8040.check_node_attribute_request(
350             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
351         self.assertEqual(response['status_code'], requests.codes.conflict)
352
353     def test_19_check_openroadm_topo_spdra(self):
354         response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
355         self.assertEqual(response.status_code, requests.codes.ok)
356         res = response.json()
357         ele = res['node'][0]['ietf-network-topology:termination-point'][0]
358         self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
359         self.assertEqual({'frequency': 196.1,
360                           'width': 40},
361                          ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
362         time.sleep(3)
363
364     def test_20_check_openroadm_topo_ROADMA_SRG(self):
365         response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
366         self.assertEqual(response.status_code, requests.codes.ok)
367         res = response.json()
368         freq_map = base64.b64decode(
369             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
370         freq_map_array = [int(x) for x in freq_map]
371         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
372         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
373         for ele in liste_tp:
374             if ele['tp-id'] == 'SRG1-PP1-TXRX':
375                 freq_map = base64.b64decode(
376                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
377                 freq_map_array = [int(x) for x in freq_map]
378                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
379             if ele['tp-id'] == 'SRG1-PP2-TXRX':
380                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
381         time.sleep(3)
382
383     def test_21_check_openroadm_topo_ROADMA_DEG(self):
384         response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
385         self.assertEqual(response.status_code, requests.codes.ok)
386         res = response.json()
387         freq_map = base64.b64decode(
388             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
389         freq_map_array = [int(x) for x in freq_map]
390         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
391         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
392         for ele in liste_tp:
393             if ele['tp-id'] == 'DEG2-CTP-TXRX':
394                 freq_map = base64.b64decode(
395                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
396                 freq_map_array = [int(x) for x in freq_map]
397                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
398             if ele['tp-id'] == 'DEG2-TTP-TXRX':
399                 freq_map = base64.b64decode(
400                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
401                 freq_map_array = [int(x) for x in freq_map]
402                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
403         time.sleep(3)
404
405     def test_22_check_otn_topo_otu4_links(self):
406         response = test_utils.get_otn_topo_request()
407         self.assertEqual(response.status_code, requests.codes.ok)
408         res = response.json()
409         nb_links = len(res['network'][0]['ietf-network-topology:link'])
410         self.assertEqual(nb_links, 2)
411         listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
412                       'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
413         for link in res['network'][0]['ietf-network-topology:link']:
414             self.assertIn(link['link-id'], listLinkId)
415             self.assertEqual(
416                 link['transportpce-topology:otn-link-type'], 'OTU4')
417             self.assertEqual(
418                 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
419             self.assertEqual(
420                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
421             self.assertEqual(
422                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
423             self.assertIn(
424                 link['org-openroadm-common-network:opposite-link'], listLinkId)
425
426 # test service-create for ODU4 service from spdr to spdr
427     def test_23_create_ODU4_service(self):
428         self.cr_serv_sample_data["input"]["service-name"] = "service1-ODU4"
429         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
430         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
431         self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
432         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
433         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
434         self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
435
436         response = test_utils.service_create_request(self.cr_serv_sample_data)
437         self.assertEqual(response.status_code, requests.codes.ok)
438         res = response.json()
439         self.assertIn('PCE calculation in progress',
440                       res['output']['configuration-response-common']['response-message'])
441         time.sleep(self.WAITING)
442
443     def test_24_get_ODU4_service1(self):
444         response = test_utils.get_service_list_request(
445             "services/service1-ODU4")
446         self.assertEqual(response.status_code, requests.codes.ok)
447         res = response.json()
448         self.assertEqual(
449             res['services'][0]['administrative-state'], 'inService')
450         self.assertEqual(
451             res['services'][0]['service-name'], 'service1-ODU4')
452         self.assertEqual(
453             res['services'][0]['connection-type'], 'infrastructure')
454         self.assertEqual(
455             res['services'][0]['lifecycle-state'], 'planned')
456         time.sleep(2)
457
458     def test_25_check_interface_ODU4_spdra(self):
459         response = test_utils_rfc8040.check_node_attribute_request(
460             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
461         self.assertEqual(response['status_code'], requests.codes.ok)
462         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
463                         'administrative-state': 'inService',
464                         'supporting-circuit-pack-name': 'CP1-CFP0',
465                         'supporting-interface': 'XPDR1-NETWORK1-OTU',
466                         'type': 'org-openroadm-interfaces:otnOdu',
467                         'supporting-port': 'CP1-CFP0-P1',
468                         'circuit-id': 'TBD',
469                         'description': 'TBD'}
470         # SAPI/DAPI are added in the Otu4 renderer
471         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
472                         'rate': 'org-openroadm-otn-common-types:ODU4',
473                         'monitoring-mode': 'terminated',
474                         'expected-dapi': 'H/OelLynehI=',
475                         'expected-sapi': 'AMf1n5hK6Xkk',
476                         'tx-dapi': 'AMf1n5hK6Xkk',
477                         'tx-sapi': 'H/OelLynehI='}
478
479         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
480                              response['interface'][0])
481         self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
482                                   **input_dict_2),
483                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
484                              )
485         self.assertDictEqual(
486             {'payload-type': '21', 'exp-payload-type': '21'},
487             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
488
489     def test_26_check_interface_ODU4_spdrc(self):
490         response = test_utils_rfc8040.check_node_attribute_request(
491             'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU4')
492         self.assertEqual(response['status_code'], requests.codes.ok)
493         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
494                         'administrative-state': 'inService',
495                         'supporting-circuit-pack-name': 'CP1-CFP0',
496                         'supporting-interface': 'XPDR1-NETWORK1-OTU',
497                         'type': 'org-openroadm-interfaces:otnOdu',
498                         'supporting-port': 'CP1-CFP0-P1',
499                         'circuit-id': 'TBD',
500                         'description': 'TBD'}
501         # SAPI/DAPI are added in the Otu4 renderer
502         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
503                         'rate': 'org-openroadm-otn-common-types:ODU4',
504                         'monitoring-mode': 'terminated',
505                         'tx-sapi': 'AMf1n5hK6Xkk',
506                         'tx-dapi': 'H/OelLynehI=',
507                         'expected-sapi': 'H/OelLynehI=',
508                         'expected-dapi': 'AMf1n5hK6Xkk'
509                         }
510         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
511                              response['interface'][0])
512         self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
513                                   **input_dict_2),
514                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
515                              )
516         self.assertDictEqual(
517             {'payload-type': '21', 'exp-payload-type': '21'},
518             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
519
520     def test_27_check_otn_topo_links(self):
521         response = test_utils.get_otn_topo_request()
522         self.assertEqual(response.status_code, requests.codes.ok)
523         res = response.json()
524         nb_links = len(res['network'][0]['ietf-network-topology:link'])
525         self.assertEqual(nb_links, 4)
526         for link in res['network'][0]['ietf-network-topology:link']:
527             linkId = link['link-id']
528             if (linkId in ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
529                            'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
530                 self.assertEqual(
531                     link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
532                 self.assertEqual(
533                     link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
534             elif (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
535                              'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
536                 self.assertEqual(
537                     link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
538                 self.assertEqual(
539                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
540                 self.assertEqual(
541                     link['transportpce-topology:otn-link-type'], 'ODTU4')
542                 self.assertEqual(
543                     link['org-openroadm-common-network:link-type'], 'OTN-LINK')
544                 self.assertIn(link['org-openroadm-common-network:opposite-link'],
545                               ['ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
546                                'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
547             else:
548                 self.fail("this link should not exist")
549
550     def test_28_check_otn_topo_tp(self):
551         response = test_utils.get_otn_topo_request()
552         res = response.json()
553         for node in res['network'][0]['node']:
554             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
555                 tpList = node['ietf-network-topology:termination-point']
556                 for tp in tpList:
557                     if tp['tp-id'] == 'XPDR1-NETWORK1':
558                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
559                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
560                         self.assertEqual(
561                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
562                         self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
563                                          'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
564
565 # test service-create for 10GE service from spdr to spdr
566     def test_29_create_10GE_service(self):
567         self.cr_serv_sample_data["input"]["service-name"] = "service1-10GE"
568         self.cr_serv_sample_data["input"]["connection-type"] = "service"
569         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "10"
570         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
571         del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
572         self.cr_serv_sample_data["input"]["service-a-end"]["ethernet-encoding"] = "10GBASE-R"
573         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
574         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
575         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "10"
576         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
577         del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
578         self.cr_serv_sample_data["input"]["service-z-end"]["ethernet-encoding"] = "10GBASE-R"
579         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
580         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
581         response = test_utils.service_create_request(self.cr_serv_sample_data)
582         self.assertEqual(response.status_code, requests.codes.ok)
583         res = response.json()
584         self.assertIn('PCE calculation in progress',
585                       res['output']['configuration-response-common']['response-message'])
586         time.sleep(self.WAITING)
587
588     def test_30_get_10GE_service1(self):
589         response = test_utils.get_service_list_request(
590             "services/service1-10GE")
591         self.assertEqual(response.status_code, requests.codes.ok)
592         res = response.json()
593         self.assertEqual(
594             res['services'][0]['administrative-state'], 'inService')
595         self.assertEqual(
596             res['services'][0]['service-name'], 'service1-10GE')
597         self.assertEqual(
598             res['services'][0]['connection-type'], 'service')
599         self.assertEqual(
600             res['services'][0]['lifecycle-state'], 'planned')
601         time.sleep(2)
602
603     def test_31_check_interface_10GE_CLIENT_spdra(self):
604         response = test_utils_rfc8040.check_node_attribute_request(
605             'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
606         self.assertEqual(response['status_code'], requests.codes.ok)
607         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
608                       'administrative-state': 'inService',
609                       'supporting-circuit-pack-name': 'CP1-SFP4',
610                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
611                       'supporting-port': 'CP1-SFP4-P1'
612                       }
613         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
614                              response['interface'][0])
615         self.assertEqual(
616             10000,
617             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
618
619     def test_32_check_interface_ODU2E_CLIENT_spdra(self):
620         response = test_utils_rfc8040.check_node_attribute_request(
621             'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e')
622         self.assertEqual(response['status_code'], requests.codes.ok)
623         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
624                         'administrative-state': 'inService',
625                         'supporting-circuit-pack-name': 'CP1-SFP4',
626                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
627                         'type': 'org-openroadm-interfaces:otnOdu',
628                         'supporting-port': 'CP1-SFP4-P1'}
629         input_dict_2 = {
630             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
631             'rate': 'org-openroadm-otn-common-types:ODU2e',
632             'monitoring-mode': 'terminated'}
633
634         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
635                              response['interface'][0])
636         self.assertDictEqual(dict(input_dict_2,
637                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
638                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
639         self.assertDictEqual(
640             {'payload-type': '03', 'exp-payload-type': '03'},
641             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
642
643     def test_33_check_interface_ODU2E_NETWORK_spdra(self):
644         response = test_utils_rfc8040.check_node_attribute_request(
645             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e')
646         self.assertEqual(response['status_code'], requests.codes.ok)
647         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
648                         'administrative-state': 'inService',
649                         'supporting-circuit-pack-name': 'CP1-CFP0',
650                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
651                         'type': 'org-openroadm-interfaces:otnOdu',
652                         'supporting-port': 'CP1-CFP0-P1'}
653         input_dict_2 = {
654             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
655             'rate': 'org-openroadm-otn-common-types:ODU2e',
656             'monitoring-mode': 'monitored'}
657         input_dict_3 = {'trib-port-number': 1}
658
659         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
660                              response['interface'][0])
661         self.assertDictEqual(dict(input_dict_2,
662                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
663                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
664         self.assertDictEqual(dict(input_dict_3,
665                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
666                                       'parent-odu-allocation']),
667                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
668         self.assertIn(1, response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
669                       ['trib-slots'])
670
671     def test_34_check_ODU2E_connection_spdra(self):
672         response = test_utils_rfc8040.check_node_attribute_request(
673             'SPDR-SA1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
674         self.assertEqual(response['status_code'], requests.codes.ok)
675         input_dict_1 = {
676             'connection-name':
677             'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
678             'direction': 'bidirectional'
679         }
680
681         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
682                              response['odu-connection'][0])
683         self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
684                              response['odu-connection'][0]['destination'])
685         self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
686                              response['odu-connection'][0]['source'])
687
688     def test_35_check_interface_10GE_CLIENT_spdrc(self):
689         response = test_utils_rfc8040.check_node_attribute_request(
690             'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
691         self.assertEqual(response['status_code'], requests.codes.ok)
692         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
693                       'administrative-state': 'inService',
694                       'supporting-circuit-pack-name': 'CP1-SFP4',
695                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
696                       'supporting-port': 'CP1-SFP4-P1'
697                       }
698         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
699                              response['interface'][0])
700         self.assertEqual(
701             10000,
702             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
703
704     def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
705         response = test_utils_rfc8040.check_node_attribute_request(
706             'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ODU2e')
707         self.assertEqual(response['status_code'], requests.codes.ok)
708         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
709                         'administrative-state': 'inService',
710                         'supporting-circuit-pack-name': 'CP1-SFP4',
711                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
712                         'type': 'org-openroadm-interfaces:otnOdu',
713                         'supporting-port': 'CP1-SFP4-P1'}
714         input_dict_2 = {
715             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
716             'rate': 'org-openroadm-otn-common-types:ODU2e',
717             'monitoring-mode': 'terminated'}
718
719         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
720                              response['interface'][0])
721         self.assertDictEqual(dict(input_dict_2,
722                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
723                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
724         self.assertDictEqual(
725             {'payload-type': '03', 'exp-payload-type': '03'},
726             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
727
728     def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
729         response = test_utils_rfc8040.check_node_attribute_request(
730             'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU2e')
731         self.assertEqual(response['status_code'], requests.codes.ok)
732         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
733                         'administrative-state': 'inService',
734                         'supporting-circuit-pack-name': 'CP1-CFP0',
735                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
736                         'type': 'org-openroadm-interfaces:otnOdu',
737                         'supporting-port': 'CP1-CFP0-P1'}
738         input_dict_2 = {
739             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
740             'rate': 'org-openroadm-otn-common-types:ODU2e',
741             'monitoring-mode': 'monitored'}
742
743         input_dict_3 = {'trib-port-number': 1}
744
745         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
746                              response['interface'][0])
747         self.assertDictEqual(dict(input_dict_2,
748                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
749                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
750         self.assertDictEqual(dict(input_dict_3,
751                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
752                                       'parent-odu-allocation']),
753                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
754             'parent-odu-allocation'])
755         self.assertIn(1,
756                       response['interface'][0][
757                           'org-openroadm-otn-odu-interfaces:odu'][
758                           'parent-odu-allocation']['trib-slots'])
759
760     def test_38_check_ODU2E_connection_spdrc(self):
761         response = test_utils_rfc8040.check_node_attribute_request(
762             'SPDR-SC1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
763         self.assertEqual(response['status_code'], requests.codes.ok)
764         input_dict_1 = {
765             'connection-name':
766             'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
767             'direction': 'bidirectional'
768         }
769
770         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
771                              response['odu-connection'][0])
772         self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
773                              response['odu-connection'][0]['destination'])
774         self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
775                              response['odu-connection'][0]['source'])
776
777     def test_39_check_otn_topo_links(self):
778         response = test_utils.get_otn_topo_request()
779         self.assertEqual(response.status_code, requests.codes.ok)
780         res = response.json()
781         nb_links = len(res['network'][0]['ietf-network-topology:link'])
782         self.assertEqual(nb_links, 4)
783         for link in res['network'][0]['ietf-network-topology:link']:
784             linkId = link['link-id']
785             if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
786                            'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
787                 self.assertEqual(
788                     link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
789                 self.assertEqual(
790                     link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
791
792     def test_40_check_otn_topo_tp(self):
793         response = test_utils.get_otn_topo_request()
794         res = response.json()
795         for node in res['network'][0]['node']:
796             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
797                 tpList = node['ietf-network-topology:termination-point']
798                 for tp in tpList:
799                     if tp['tp-id'] == 'XPDR1-NETWORK1':
800                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
801                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
802                         tsPoolList = list(range(1, 9))
803                         self.assertNotIn(
804                             tsPoolList, xpdrTpPortConAt['ts-pool'])
805                         self.assertEqual(
806                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
807                         self.assertNotIn(
808                             1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
809
810     def test_41_delete_10GE_service(self):
811         response = test_utils.service_delete_request("service1-10GE")
812         self.assertEqual(response.status_code, requests.codes.ok)
813         res = response.json()
814         self.assertIn('Renderer service delete in progress',
815                       res['output']['configuration-response-common']['response-message'])
816         time.sleep(self.WAITING)
817
818     def test_42_check_service_list(self):
819         response = test_utils.get_service_list_request("")
820         self.assertEqual(response.status_code, requests.codes.ok)
821         res = response.json()
822         self.assertEqual(len(res['service-list']['services']), 2)
823         time.sleep(2)
824
825     def test_43_check_no_ODU2e_connection_spdra(self):
826         response = test_utils.check_netconf_node_request("SPDR-SA1", "")
827         self.assertEqual(response.status_code, requests.codes.ok)
828         res = response.json()
829         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
830         time.sleep(1)
831
832     def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
833         response = test_utils_rfc8040.check_node_attribute_request(
834             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e-service1')
835         self.assertEqual(response['status_code'], requests.codes.conflict)
836
837     def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
838         response = test_utils_rfc8040.check_node_attribute_request(
839             'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e-service1')
840         self.assertEqual(response['status_code'], requests.codes.conflict)
841
842     def test_46_check_no_interface_10GE_CLIENT_spdra(self):
843         response = test_utils_rfc8040.check_node_attribute_request(
844             'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
845         self.assertEqual(response['status_code'], requests.codes.conflict)
846
847     def test_47_check_otn_topo_links(self):
848         response = test_utils.get_otn_topo_request()
849         self.assertEqual(response.status_code, requests.codes.ok)
850         res = response.json()
851         nb_links = len(res['network'][0]['ietf-network-topology:link'])
852         self.assertEqual(nb_links, 4)
853         for link in res['network'][0]['ietf-network-topology:link']:
854             linkId = link['link-id']
855             if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
856                            'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
857                 self.assertEqual(
858                     link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
859                 self.assertEqual(
860                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
861
862     def test_48_check_otn_topo_tp(self):
863         response = test_utils.get_otn_topo_request()
864         res = response.json()
865         for node in res['network'][0]['node']:
866             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
867                 tpList = node['ietf-network-topology:termination-point']
868                 for tp in tpList:
869                     if tp['tp-id'] == 'XPDR1-NETWORK1':
870                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
871                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
872                         self.assertEqual(
873                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
874
875     def test_49_delete_ODU4_service(self):
876         response = test_utils.service_delete_request("service1-ODU4")
877         self.assertEqual(response.status_code, requests.codes.ok)
878         res = response.json()
879         self.assertIn('Renderer service delete in progress',
880                       res['output']['configuration-response-common']['response-message'])
881         time.sleep(self.WAITING)
882
883     def test_50_check_service_list(self):
884         response = test_utils.get_service_list_request("")
885         self.assertEqual(response.status_code, requests.codes.ok)
886         res = response.json()
887         self.assertEqual(len(res['service-list']['services']), 1)
888         time.sleep(2)
889
890     def test_51_check_no_interface_ODU4_spdra(self):
891         response = test_utils_rfc8040.check_node_attribute_request(
892             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
893         self.assertEqual(response['status_code'], requests.codes.conflict)
894
    def test_52_check_otn_topo_links(self):
        # Re-run the assertions of test_22 (defined earlier in this class)
        # against the current OTN topology.
        self.test_22_check_otn_topo_otu4_links()
897
898     def test_53_check_otn_topo_tp(self):
899         response = test_utils.get_otn_topo_request()
900         res = response.json()
901         for node in res['network'][0]['node']:
902             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
903                 tpList = node['ietf-network-topology:termination-point']
904                 for tp in tpList:
905                     if tp['tp-id'] == 'XPDR1-NETWORK1':
906                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
907                         self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
908                         self.assertNotIn(
909                             'odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
910
911     def test_54_delete_OCH_OTU4_service(self):
912         response = test_utils.service_delete_request("service1-OCH-OTU4")
913         self.assertEqual(response.status_code, requests.codes.ok)
914         res = response.json()
915         self.assertIn('Renderer service delete in progress',
916                       res['output']['configuration-response-common']['response-message'])
917         time.sleep(self.WAITING)
918
919     def test_55_get_no_service(self):
920         response = test_utils.get_service_list_request("")
921         self.assertEqual(response.status_code, requests.codes.conflict)
922         res = response.json()
923         self.assertIn(
924             {"error-type": "application", "error-tag": "data-missing",
925              "error-message": "Request could not be completed because the relevant data model content does not exist"},
926             res['errors']['error'])
927         time.sleep(1)
928
929     def test_56_check_no_interface_OTU4_spdra(self):
930         response = test_utils_rfc8040.check_node_attribute_request(
931             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
932         self.assertEqual(response['status_code'], requests.codes.conflict)
933
934     def test_57_check_no_interface_OCH_spdra(self):
935         response = test_utils_rfc8040.check_node_attribute_request(
936             'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-1')
937         self.assertEqual(response['status_code'], requests.codes.conflict)
938
939     def test_58_getLinks_OtnTopology(self):
940         response = test_utils.get_otn_topo_request()
941         self.assertEqual(response.status_code, requests.codes.ok)
942         res = response.json()
943         self.assertNotIn('ietf-network-topology:link', res['network'][0])
944
945     def test_59_check_openroadm_topo_spdra(self):
946         response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
947         self.assertEqual(response.status_code, requests.codes.ok)
948         res = response.json()
949         tp = res['node'][0]['ietf-network-topology:termination-point'][0]
950         self.assertEqual('XPDR1-NETWORK1', tp['tp-id'])
951         self.assertNotIn('wavelength', dict.keys(
952             tp['org-openroadm-network-topology:xpdr-network-attributes']))
953         time.sleep(3)
954
955     def test_60_check_openroadm_topo_ROADMA_SRG(self):
956         response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
957         self.assertEqual(response.status_code, requests.codes.ok)
958         res = response.json()
959         freq_map = base64.b64decode(
960             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
961         freq_map_array = [int(x) for x in freq_map]
962         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
963         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
964         for ele in liste_tp:
965             if ele['tp-id'] == 'SRG1-PP1-TXRX':
966                 freq_map = base64.b64decode(
967                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
968                 freq_map_array = [int(x) for x in freq_map]
969                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
970         time.sleep(3)
971
972     def test_61_check_openroadm_topo_ROADMA_DEG(self):
973         response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
974         self.assertEqual(response.status_code, requests.codes.ok)
975         res = response.json()
976         freq_map = base64.b64decode(
977             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
978         freq_map_array = [int(x) for x in freq_map]
979         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
980         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
981         for ele in liste_tp:
982             if ele['tp-id'] == 'DEG2-CTP-TXRX':
983                 freq_map = base64.b64decode(
984                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
985                 freq_map_array = [int(x) for x in freq_map]
986                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
987             if ele['tp-id'] == 'DEG2-TTP-TXRX':
988                 freq_map = base64.b64decode(
989                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
990                 freq_map_array = [int(x) for x in freq_map]
991                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
992         time.sleep(3)
993
994     def test_62_connect_sprdA_3_N1_to_roadmA_PP2(self):
995         response = test_utils_rfc8040.transportpce_api_rpc_request(
996             'transportpce-networkutils', 'init-xpdr-rdm-links',
997             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '3', 'network-num': '1',
998                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
999         self.assertEqual(response['status_code'], requests.codes.ok)
1000
1001     def test_63_connect_roadmA_PP2_to_spdrA_3_N1(self):
1002         response = test_utils_rfc8040.transportpce_api_rpc_request(
1003             'transportpce-networkutils', 'init-rdm-xpdr-links',
1004             {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '3', 'network-num': '1',
1005                              'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
1006         self.assertEqual(response['status_code'], requests.codes.ok)
1007
1008     def test_64_connect_sprdC_3_N1_to_roadmC_PP2(self):
1009         response = test_utils_rfc8040.transportpce_api_rpc_request(
1010             'transportpce-networkutils', 'init-xpdr-rdm-links',
1011             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '3', 'network-num': '1',
1012                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
1013         self.assertEqual(response['status_code'], requests.codes.ok)
1014
1015     def test_65_connect_roadmC_PP2_to_spdrC_3_N1(self):
1016         response = test_utils_rfc8040.transportpce_api_rpc_request(
1017             'transportpce-networkutils', 'init-rdm-xpdr-links',
1018             {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '3', 'network-num': '1',
1019                              'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
1020         self.assertEqual(response['status_code'], requests.codes.ok)
1021
1022     def test_66_create_OCH_OTU4_service_2(self):
1023         # pylint: disable=line-too-long
1024         self.cr_serv_sample_data["input"]["service-name"] = "service2-OCH-OTU4"
1025         self.cr_serv_sample_data["input"]["connection-type"] = "infrastructure"
1026         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
1027         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OTU"
1028         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1029         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1030         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1031         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1032         self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1033         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
1034         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OTU"
1035         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1036         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1037         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1038         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1039         self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1040         response = test_utils.service_create_request(self.cr_serv_sample_data)
1041         self.assertEqual(response.status_code, requests.codes.ok)
1042         res = response.json()
1043         self.assertIn('PCE calculation in progress',
1044                       res['output']['configuration-response-common']['response-message'])
1045         time.sleep(self.WAITING)
1046
1047     def test_67_get_OCH_OTU4_service2(self):
1048         response = test_utils.get_service_list_request(
1049             "services/service2-OCH-OTU4")
1050         self.assertEqual(response.status_code, requests.codes.ok)
1051         res = response.json()
1052         self.assertEqual(
1053             res['services'][0]['administrative-state'], 'inService')
1054         self.assertEqual(
1055             res['services'][0]['service-name'], 'service2-OCH-OTU4')
1056         self.assertEqual(
1057             res['services'][0]['connection-type'], 'infrastructure')
1058         self.assertEqual(
1059             res['services'][0]['lifecycle-state'], 'planned')
1060         time.sleep(2)
1061
1062     def test_68_create_ODU4_service_2(self):
1063         # pylint: disable=line-too-long
1064         self.cr_serv_sample_data["input"]["service-name"] = "service2-ODU4"
1065         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
1066         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1067         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1068         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1069         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1070         self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1071         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
1072         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
1073         self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1074         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1075         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1076         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1077         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1078         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
1079
1080         response = test_utils.service_create_request(self.cr_serv_sample_data)
1081         self.assertEqual(response.status_code, requests.codes.ok)
1082         res = response.json()
1083         self.assertIn('PCE calculation in progress',
1084                       res['output']['configuration-response-common']['response-message'])
1085         time.sleep(self.WAITING)
1086
1087     def test_69_get_ODU4_service2(self):
1088         response = test_utils.get_service_list_request(
1089             "services/service2-ODU4")
1090         self.assertEqual(response.status_code, requests.codes.ok)
1091         res = response.json()
1092         self.assertEqual(
1093             res['services'][0]['administrative-state'], 'inService')
1094         self.assertEqual(
1095             res['services'][0]['service-name'], 'service2-ODU4')
1096         self.assertEqual(
1097             res['services'][0]['connection-type'], 'infrastructure')
1098         self.assertEqual(
1099             res['services'][0]['lifecycle-state'], 'planned')
1100         time.sleep(2)
1101
1102     def test_70_create_1GE_service(self):
1103         # pylint: disable=line-too-long
1104         self.cr_serv_sample_data["input"]["service-name"] = "service1-1GE"
1105         self.cr_serv_sample_data["input"]["connection-type"] = "service"
1106         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "1"
1107         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
1108         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "1"
1109         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
1110         del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
1111         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1112         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1113         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1114         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1115         del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
1116         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1117         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1118         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1119         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1120         response = test_utils.service_create_request(self.cr_serv_sample_data)
1121         self.assertEqual(response.status_code, requests.codes.ok)
1122         res = response.json()
1123         self.assertIn('PCE calculation in progress',
1124                       res['output']['configuration-response-common']['response-message'])
1125         time.sleep(self.WAITING)
1126
1127     def test_71_get_1GE_service1(self):
1128         response = test_utils.get_service_list_request("services/service1-1GE")
1129         self.assertEqual(response.status_code, requests.codes.ok)
1130         res = response.json()
1131         self.assertEqual(
1132             res['services'][0]['administrative-state'], 'inService')
1133         self.assertEqual(
1134             res['services'][0]['service-name'], 'service1-1GE')
1135         self.assertEqual(
1136             res['services'][0]['connection-type'], 'service')
1137         self.assertEqual(
1138             res['services'][0]['lifecycle-state'], 'planned')
1139         time.sleep(2)
1140
1141     def test_72_check_interface_1GE_CLIENT_spdra(self):
1142         response = test_utils_rfc8040.check_node_attribute_request(
1143             'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1144         self.assertEqual(response['status_code'], requests.codes.ok)
1145         input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1146                       'administrative-state': 'inService',
1147                       'supporting-circuit-pack-name': 'CP1-SFP4',
1148                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
1149                       'supporting-port': 'CP1-SFP4-P1'
1150                       }
1151         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
1152                              response['interface'][0])
1153         self.assertEqual(
1154             1000,
1155             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
1156
1157     def test_73_check_interface_ODU0_CLIENT_spdra(self):
1158         response = test_utils_rfc8040.check_node_attribute_request(
1159             'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0')
1160         self.assertEqual(response['status_code'], requests.codes.ok)
1161         input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
1162                         'administrative-state': 'inService',
1163                         'supporting-circuit-pack-name': 'CP3-SFP1',
1164                         'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1165                         'type': 'org-openroadm-interfaces:otnOdu',
1166                         'supporting-port': 'CP3-SFP1-P1'}
1167         input_dict_2 = {
1168             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1169             'rate': 'org-openroadm-otn-common-types:ODU0',
1170             'monitoring-mode': 'terminated'}
1171         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1172                              response['interface'][0])
1173         self.assertDictEqual(dict(input_dict_2,
1174                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1175                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1176         self.assertDictEqual(
1177             {'payload-type': '07', 'exp-payload-type': '07'},
1178             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1179
1180     def test_74_check_interface_ODU0_NETWORK_spdra(self):
1181         response = test_utils_rfc8040.check_node_attribute_request(
1182             'SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0')
1183         self.assertEqual(response['status_code'], requests.codes.ok)
1184         input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
1185                         'administrative-state': 'inService',
1186                         'supporting-circuit-pack-name': 'CP3-CFP0',
1187                         'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1188                         'type': 'org-openroadm-interfaces:otnOdu',
1189                         'supporting-port': 'CP3-CFP0-P1'}
1190         input_dict_2 = {
1191             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1192             'rate': 'org-openroadm-otn-common-types:ODU0',
1193             'monitoring-mode': 'monitored'}
1194         input_dict_3 = {'trib-port-number': 1}
1195         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1196                              response['interface'][0])
1197         self.assertDictEqual(dict(input_dict_2,
1198                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1199                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1200         self.assertDictEqual(dict(input_dict_3,
1201                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1202                                       'parent-odu-allocation']),
1203                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
1204         self.assertIn(1, response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1205                       ['trib-slots'])
1206
1207     def test_75_check_ODU0_connection_spdra(self):
1208         response = test_utils_rfc8040.check_node_attribute_request(
1209             'SPDR-SA1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
1210         self.assertEqual(response['status_code'], requests.codes.ok)
1211         input_dict_1 = {
1212             'connection-name':
1213             'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
1214             'direction': 'bidirectional'
1215         }
1216         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1217                              response['odu-connection'][0])
1218         self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0'},
1219                              response['odu-connection'][0]['destination'])
1220         self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0'},
1221                              response['odu-connection'][0]['source'])
1222
1223     def test_76_check_interface_1GE_CLIENT_spdrc(self):
1224         response = test_utils_rfc8040.check_node_attribute_request(
1225             'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1226         self.assertEqual(response['status_code'], requests.codes.ok)
1227         input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1228                       'administrative-state': 'inService',
1229                       'supporting-circuit-pack-name': 'CP3-SFP1',
1230                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
1231                       'supporting-port': 'CP3-SFP1-P1'
1232                       }
1233         self.assertDictEqual(dict(input_dict, **response['interface'][0]),
1234                              response['interface'][0])
1235         self.assertEqual(
1236             1000,
1237             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
1238
1239     def test_77_check_interface_ODU0_CLIENT_spdrc(self):
1240         response = test_utils_rfc8040.check_node_attribute_request(
1241             'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ODU0')
1242         self.assertEqual(response['status_code'], requests.codes.ok)
1243         input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
1244                         'administrative-state': 'inService',
1245                         'supporting-circuit-pack-name': 'CP3-SFP1',
1246                         'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1247                         'type': 'org-openroadm-interfaces:otnOdu',
1248                         'supporting-port': 'CP3-SFP1-P1'}
1249         input_dict_2 = {
1250             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1251             'rate': 'org-openroadm-otn-common-types:ODU0',
1252             'monitoring-mode': 'terminated'}
1253         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1254                              response['interface'][0])
1255         self.assertDictEqual(dict(input_dict_2,
1256                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1257                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1258         self.assertDictEqual(
1259             {'payload-type': '07', 'exp-payload-type': '07'},
1260             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1261
1262     def test_78_check_interface_ODU0_NETWORK_spdrc(self):
1263         response = test_utils_rfc8040.check_node_attribute_request(
1264             'SPDR-SC1', 'interface', 'XPDR3-NETWORK1-ODU0')
1265         self.assertEqual(response['status_code'], requests.codes.ok)
1266         input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
1267                         'administrative-state': 'inService',
1268                         'supporting-circuit-pack-name': 'CP3-CFP0',
1269                         'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1270                         'type': 'org-openroadm-interfaces:otnOdu',
1271                         'supporting-port': 'CP3-CFP0-P1'}
1272         input_dict_2 = {
1273             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1274             'rate': 'org-openroadm-otn-common-types:ODU0',
1275             'monitoring-mode': 'monitored'}
1276         input_dict_3 = {'trib-port-number': 1}
1277         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1278                              response['interface'][0])
1279         self.assertDictEqual(dict(input_dict_2,
1280                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1281                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1282         self.assertDictEqual(dict(input_dict_3,
1283                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1284                                       'parent-odu-allocation']),
1285                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1286             'parent-odu-allocation'])
1287         self.assertIn(1,
1288                       response['interface'][0][
1289                           'org-openroadm-otn-odu-interfaces:odu'][
1290                           'parent-odu-allocation']['trib-slots'])
1291
1292     def test_79_check_ODU0_connection_spdrc(self):
1293         response = test_utils_rfc8040.check_node_attribute_request(
1294             'SPDR-SC1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
1295         self.assertEqual(response['status_code'], requests.codes.ok)
1296         input_dict_1 = {
1297             'connection-name':
1298             'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
1299             'direction': 'bidirectional'
1300         }
1301         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1302                              response['odu-connection'][0])
1303         self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0'},
1304                              response['odu-connection'][0]['destination'])
1305         self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0'},
1306                              response['odu-connection'][0]['source'])
1307
1308     def test_80_check_otn_topo_links(self):
1309         response = test_utils.get_otn_topo_request()
1310         self.assertEqual(response.status_code, requests.codes.ok)
1311         res = response.json()
1312         nb_links = len(res['network'][0]['ietf-network-topology:link'])
1313         self.assertEqual(nb_links, 4)
1314         for link in res['network'][0]['ietf-network-topology:link']:
1315             linkId = link['link-id']
1316             if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1317                            'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1318                 self.assertEqual(
1319                     link['org-openroadm-otn-network-topology:available-bandwidth'], 99000)
1320                 self.assertEqual(
1321                     link['org-openroadm-otn-network-topology:used-bandwidth'], 1000)
1322
1323     def test_81_check_otn_topo_tp(self):
1324         response = test_utils.get_otn_topo_request()
1325         res = response.json()
1326         for node in res['network'][0]['node']:
1327             if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1328                 tpList = node['ietf-network-topology:termination-point']
1329                 for tp in tpList:
1330                     if tp['tp-id'] == 'XPDR3-NETWORK1':
1331                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1332                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 79)
1333                         tsPoolList = list(range(1, 2))
1334                         self.assertNotIn(
1335                             tsPoolList, xpdrTpPortConAt['ts-pool'])
1336                         self.assertEqual(
1337                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
1338                         self.assertNotIn(
1339                             1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
1340
1341     def test_82_delete_1GE_service(self):
1342         response = test_utils.service_delete_request("service1-1GE")
1343         self.assertEqual(response.status_code, requests.codes.ok)
1344         res = response.json()
1345         self.assertIn('Renderer service delete in progress',
1346                       res['output']['configuration-response-common']['response-message'])
1347         time.sleep(self.WAITING)
1348
1349     def test_83_check_service_list(self):
1350         response = test_utils.get_service_list_request("")
1351         self.assertEqual(response.status_code, requests.codes.ok)
1352         res = response.json()
1353         self.assertEqual(len(res['service-list']['services']), 2)
1354         time.sleep(2)
1355
1356     def test_84_check_no_ODU0_connection_spdra(self):
1357         response = test_utils.check_netconf_node_request("SPDR-SA1", "")
1358         self.assertEqual(response.status_code, requests.codes.ok)
1359         res = response.json()
1360         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
1361         time.sleep(1)
1362
1363     def test_85_check_no_interface_ODU0_NETWORK_spdra(self):
1364         response = test_utils_rfc8040.check_node_attribute_request(
1365             'SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0-service1')
1366         self.assertEqual(response['status_code'], requests.codes.conflict)
1367
1368     def test_86_check_no_interface_ODU0_CLIENT_spdra(self):
1369         response = test_utils_rfc8040.check_node_attribute_request(
1370             'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0-service1')
1371         self.assertEqual(response['status_code'], requests.codes.conflict)
1372
1373     def test_87_check_no_interface_10GE_CLIENT_spdra(self):
1374         response = test_utils_rfc8040.check_node_attribute_request(
1375             'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1376         self.assertEqual(response['status_code'], requests.codes.conflict)
1377
1378     def test_88_check_otn_topo_links(self):
1379         response = test_utils.get_otn_topo_request()
1380         self.assertEqual(response.status_code, requests.codes.ok)
1381         res = response.json()
1382         nb_links = len(res['network'][0]['ietf-network-topology:link'])
1383         self.assertEqual(nb_links, 4)
1384         for link in res['network'][0]['ietf-network-topology:link']:
1385             linkId = link['link-id']
1386             if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1387                            'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1388                 self.assertEqual(
1389                     link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
1390                 self.assertEqual(
1391                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
1392
1393     def test_89_check_otn_topo_tp(self):
1394         response = test_utils.get_otn_topo_request()
1395         res = response.json()
1396         for node in res['network'][0]['node']:
1397             if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1398                 tpList = node['ietf-network-topology:termination-point']
1399                 for tp in tpList:
1400                     if tp['tp-id'] == 'XPDR3-NETWORK1':
1401                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1402                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
1403                         self.assertEqual(
1404                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
1405
1406     def test_90_delete_ODU4_service(self):
1407         response = test_utils.service_delete_request("service2-ODU4")
1408         self.assertEqual(response.status_code, requests.codes.ok)
1409         res = response.json()
1410         self.assertIn('Renderer service delete in progress',
1411                       res['output']['configuration-response-common']['response-message'])
1412         time.sleep(self.WAITING)
1413
1414     def test_91_delete_OCH_OTU4_service(self):
1415         response = test_utils.service_delete_request("service2-OCH-OTU4")
1416         self.assertEqual(response.status_code, requests.codes.ok)
1417         res = response.json()
1418         self.assertIn('Renderer service delete in progress',
1419                       res['output']['configuration-response-common']['response-message'])
1420         time.sleep(self.WAITING)
1421
1422     def test_92_disconnect_xponders_from_roadm(self):
1423         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1424         response = test_utils.get_ordm_topo_request("")
1425         self.assertEqual(response.status_code, requests.codes.ok)
1426         res = response.json()
1427         links = res['network'][0]['ietf-network-topology:link']
1428         for link in links:
1429             if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
1430                 link_name = link["link-id"]
1431                 response = test_utils.delete_request(url+link_name)
1432                 self.assertEqual(response.status_code, requests.codes.ok)
1433
1434     def test_93_check_openroadm_topology(self):
1435         response = test_utils.get_ordm_topo_request("")
1436         self.assertEqual(response.status_code, requests.codes.ok)
1437         res = response.json()
1438         links = res['network'][0]['ietf-network-topology:link']
1439         self.assertEqual(18, len(links), 'Topology should contain 18 links')
1440
1441     def test_94_disconnect_spdrA(self):
1442         response = test_utils_rfc8040.unmount_device("SPDR-SA1")
1443         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1444
1445     def test_95_disconnect_spdrC(self):
1446         response = test_utils_rfc8040.unmount_device("SPDR-SC1")
1447         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1448
1449     def test_96_disconnect_roadmA(self):
1450         response = test_utils_rfc8040.unmount_device("ROADM-A1")
1451         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1452
1453     def test_97_disconnect_roadmC(self):
1454         response = test_utils_rfc8040.unmount_device("ROADM-C1")
1455         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1456
1457
# Allow running this module directly as a script; verbosity=2 asks unittest
# for its verbose per-test output.
if __name__ == "__main__":
    unittest.main(verbosity=2)