f448ba331bdc95fdf3229a2d497200c93b509896
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test11_otn_end2end.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
15
16 import base64
17 import unittest
18 import time
19 import requests
20 # pylint: disable=wrong-import-order
21 import sys
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils  # nopep8
26
27
28 class TransportPCEtesting(unittest.TestCase):
29
    # Assigned in setUpClass and iterated in tearDownClass to shut the
    # started processes down.
    processes = None
    # Number of seconds slept after each service-create request
    # (test_12, test_23, test_29) before the following checks run.
    WAITING = 20  # nominal value is 300
    # Version string paired with every simulator name handed to
    # test_utils.start_sims and test_utils.mount_device.
    NODE_VERSION = '2.2.1'

    # Payload handed to test_utils.service_create_request.
    # It starts out as the OCH-OTU4 "infrastructure" request sent by test_12
    # and is then modified IN PLACE by test_23 (ODU4) and test_29 (10GE).
    # Because this dict is a class attribute shared by all tests, those tests
    # only build the intended payload when they run in numeric order.
    cr_serv_sample_data = {"input": {
        "sdnc-request-header": {
            "request-id": "request-1",
            "rpc-action": "service-create",
            "request-system-id": "appname"
        },
        "service-name": "service1-OCH-OTU4",
        "common-id": "commonId",
        "connection-type": "infrastructure",
        # A end of the service: node SPDR-SA1, network port XPDR1-NETWORK1.
        "service-a-end": {
            "service-rate": "100",
            "node-id": "SPDR-SA1",
            "service-format": "OTU",
            "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
            "clli": "NodeSA",
            "subrate-eth-sla": {
                    "subrate-eth-sla": {
                        "committed-info-rate": "100000",
                        "committed-burst-size": "64"
                    }
            },
            "tx-direction": {
                "port": {
                    "port-device-name": "SPDR-SA1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "rx-direction": {
                "port": {
                    "port-device-name": "SPDR-SA1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "optic-type": "gray"
        },
        # Z end of the service: node SPDR-SC1, network port XPDR1-NETWORK1.
        "service-z-end": {
            "service-rate": "100",
            "node-id": "SPDR-SC1",
            "service-format": "OTU",
            "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
            "clli": "NodeSC",
            "subrate-eth-sla": {
                    "subrate-eth-sla": {
                        "committed-info-rate": "100000",
                        "committed-burst-size": "64"
                    }
            },
            "tx-direction": {
                "port": {
                    "port-device-name": "SPDR-SC1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "rx-direction": {
                "port": {
                    "port-device-name": "SPDR-SC1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "optic-type": "gray"
        },
        "due-date": "2018-06-15T00:00:01Z",
        "operator-contact": "pw1234"
    }
    }
135
136     @classmethod
137     def setUpClass(cls):
138         cls.processes = test_utils.start_tpce()
139         cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
140                                                ('roadma', cls.NODE_VERSION),
141                                                ('roadmc', cls.NODE_VERSION),
142                                                ('spdrc', cls.NODE_VERSION)])
143
144     @classmethod
145     def tearDownClass(cls):
146         # pylint: disable=not-an-iterable
147         for process in cls.processes:
148             test_utils.shutdown_process(process)
149         print("all processes killed")
150
151     def setUp(self):
152         time.sleep(5)
153
154     def test_01_connect_spdrA(self):
155         response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
156         self.assertEqual(response.status_code,
157                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
158
159     def test_02_connect_spdrC(self):
160         response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
161         self.assertEqual(response.status_code,
162                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
163
164     def test_03_connect_rdmA(self):
165         response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
166         self.assertEqual(response.status_code,
167                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
168
169     def test_04_connect_rdmC(self):
170         response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
171         self.assertEqual(response.status_code,
172                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
173
174     def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
175         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "1", "1",
176                                                           "ROADM-A1", "1", "SRG1-PP1-TXRX")
177         self.assertEqual(response.status_code, requests.codes.ok)
178         res = response.json()
179         self.assertIn('Xponder Roadm Link created successfully',
180                       res["output"]["result"])
181         time.sleep(2)
182
183     def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
184         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "1", "1",
185                                                           "ROADM-A1", "1", "SRG1-PP1-TXRX")
186         self.assertEqual(response.status_code, requests.codes.ok)
187         res = response.json()
188         self.assertIn('Roadm Xponder links created successfully',
189                       res["output"]["result"])
190         time.sleep(2)
191
192     def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
193         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "1", "1",
194                                                           "ROADM-C1", "1", "SRG1-PP1-TXRX")
195         self.assertEqual(response.status_code, requests.codes.ok)
196         res = response.json()
197         self.assertIn('Xponder Roadm Link created successfully',
198                       res["output"]["result"])
199         time.sleep(2)
200
201     def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
202         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "1", "1",
203                                                           "ROADM-C1", "1", "SRG1-PP1-TXRX")
204         self.assertEqual(response.status_code, requests.codes.ok)
205         res = response.json()
206         self.assertIn('Roadm Xponder links created successfully',
207                       res["output"]["result"])
208         time.sleep(2)
209
210     def test_09_add_omsAttributes_ROADMA_ROADMC(self):
211         # Config ROADMA-ROADMC oms-attributes
212         data = {"span": {
213             "auto-spanloss": "true",
214             "spanloss-base": 11.4,
215             "spanloss-current": 12,
216             "engineered-spanloss": 12.2,
217             "link-concatenation": [{
218                 "SRLG-Id": 0,
219                 "fiber-type": "smf",
220                 "SRLG-length": 100000,
221                 "pmd": 0.5}]}}
222         response = test_utils.add_oms_attr_request(
223             "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
224         self.assertEqual(response.status_code, requests.codes.created)
225
226     def test_10_add_omsAttributes_ROADMC_ROADMA(self):
227         # Config ROADMC-ROADMA oms-attributes
228         data = {"span": {
229             "auto-spanloss": "true",
230             "spanloss-base": 11.4,
231             "spanloss-current": 12,
232             "engineered-spanloss": 12.2,
233             "link-concatenation": [{
234                 "SRLG-Id": 0,
235                 "fiber-type": "smf",
236                 "SRLG-length": 100000,
237                 "pmd": 0.5}]}}
238         response = test_utils.add_oms_attr_request(
239             "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
240         self.assertEqual(response.status_code, requests.codes.created)
241
242 # test service-create for OCH-OTU4 service from spdr to spdr
243     def test_11_check_otn_topology(self):
244         response = test_utils.get_otn_topo_request()
245         self.assertEqual(response.status_code, requests.codes.ok)
246         res = response.json()
247         nbNode = len(res['network'][0]['node'])
248         self.assertEqual(nbNode, 6, 'There should be 6 nodes')
249         self.assertNotIn('ietf-network-topology:link', res['network'][0])
250
251     def test_12_create_OCH_OTU4_service(self):
252         response = test_utils.service_create_request(self.cr_serv_sample_data)
253         self.assertEqual(response.status_code, requests.codes.ok)
254         res = response.json()
255         self.assertIn('PCE calculation in progress',
256                       res['output']['configuration-response-common']['response-message'])
257         time.sleep(self.WAITING)
258
259     def test_13_get_OCH_OTU4_service1(self):
260         response = test_utils.get_service_list_request(
261             "services/service1-OCH-OTU4")
262         self.assertEqual(response.status_code, requests.codes.ok)
263         res = response.json()
264         self.assertEqual(
265             res['services'][0]['administrative-state'], 'inService')
266         self.assertEqual(
267             res['services'][0]['service-name'], 'service1-OCH-OTU4')
268         self.assertEqual(
269             res['services'][0]['connection-type'], 'infrastructure')
270         self.assertEqual(
271             res['services'][0]['lifecycle-state'], 'planned')
272         time.sleep(2)
273
274     # Check correct configuration of devices
275     def test_14_check_interface_och_spdra(self):
276         response = test_utils.check_netconf_node_request(
277             "SPDR-SA1", "interface/XPDR1-NETWORK1-761:768")
278         self.assertEqual(response.status_code, requests.codes.ok)
279         res = response.json()
280         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
281                                    'administrative-state': 'inService',
282                                    'supporting-circuit-pack-name': 'CP1-CFP0',
283                                    'type': 'org-openroadm-interfaces:opticalChannel',
284                                    'supporting-port': 'CP1-CFP0-P1'
285                                    }, **res['interface'][0]),
286                              res['interface'][0])
287
288         self.assertDictEqual(
289             {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
290              'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
291             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
292
293     def test_15_check_interface_OTU4_spdra(self):
294         response = test_utils.check_netconf_node_request(
295             "SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
296         self.assertEqual(response.status_code, requests.codes.ok)
297         res = response.json()
298         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
299                         'administrative-state': 'inService',
300                         'supporting-circuit-pack-name': 'CP1-CFP0',
301                         'supporting-interface': 'XPDR1-NETWORK1-761:768',
302                         'type': 'org-openroadm-interfaces:otnOtu',
303                         'supporting-port': 'CP1-CFP0-P1'
304                         }
305         input_dict_2 = {'tx-sapi': 'Swfw02qXGyI=',
306                         'expected-dapi': 'Swfw02qXGyI=',
307                         'rate': 'org-openroadm-otn-common-types:OTU4',
308                         'fec': 'scfec'
309                         }
310         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
311                              res['interface'][0])
312
313         self.assertDictEqual(input_dict_2,
314                              res['interface'][0]
315                              ['org-openroadm-otn-otu-interfaces:otu'])
316
317     def test_16_check_interface_och_spdrc(self):
318         response = test_utils.check_netconf_node_request(
319             "SPDR-SC1", "interface/XPDR1-NETWORK1-761:768")
320         self.assertEqual(response.status_code, requests.codes.ok)
321         res = response.json()
322         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
323                                    'administrative-state': 'inService',
324                                    'supporting-circuit-pack-name': 'CP1-CFP0',
325                                    'type': 'org-openroadm-interfaces:opticalChannel',
326                                    'supporting-port': 'CP1-CFP0-P1'
327                                    }, **res['interface'][0]),
328                              res['interface'][0])
329
330         self.assertDictEqual(
331             {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
332              'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
333             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
334
335     def test_17_check_interface_OTU4_spdrc(self):
336         response = test_utils.check_netconf_node_request(
337             "SPDR-SC1", "interface/XPDR1-NETWORK1-OTU")
338         self.assertEqual(response.status_code, requests.codes.ok)
339         res = response.json()
340         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
341                         'administrative-state': 'inService',
342                         'supporting-circuit-pack-name': 'CP1-CFP0',
343                         'supporting-interface': 'XPDR1-NETWORK1-761:768',
344                         'type': 'org-openroadm-interfaces:otnOtu',
345                         'supporting-port': 'CP1-CFP0-P1'
346                         }
347         input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
348                         'expected-sapi': 'Swfw02qXGyI=',
349                         'tx-sapi': 'fuYZwEO660g=',
350                         'expected-dapi': 'fuYZwEO660g=',
351                         'rate': 'org-openroadm-otn-common-types:OTU4',
352                         'fec': 'scfec'
353                         }
354
355         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
356                              res['interface'][0])
357
358         self.assertDictEqual(input_dict_2,
359                              res['interface'][0]
360                              ['org-openroadm-otn-otu-interfaces:otu'])
361
362     def test_18_check_no_interface_ODU4_spdra(self):
363         response = test_utils.check_netconf_node_request(
364             "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
365         self.assertEqual(response.status_code, requests.codes.conflict)
366         res = response.json()
367         self.assertIn(
368             {"error-type": "application", "error-tag": "data-missing",
369              "error-message": "Request could not be completed because the relevant data model content does not exist"},
370             res['errors']['error'])
371
372     def test_19_check_openroadm_topo_spdra(self):
373         response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
374         self.assertEqual(response.status_code, requests.codes.ok)
375         res = response.json()
376         ele = res['node'][0]['ietf-network-topology:termination-point'][0]
377         self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
378         self.assertEqual({'frequency': 196.1,
379                           'width': 40},
380                          ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
381         time.sleep(3)
382
383     def test_20_check_openroadm_topo_ROADMA_SRG(self):
384         response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
385         self.assertEqual(response.status_code, requests.codes.ok)
386         res = response.json()
387         freq_map = base64.b64decode(
388             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
389         freq_map_array = [int(x) for x in freq_map]
390         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
391         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
392         for ele in liste_tp:
393             if ele['tp-id'] == 'SRG1-PP1-TXRX':
394                 freq_map = base64.b64decode(
395                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
396                 freq_map_array = [int(x) for x in freq_map]
397                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
398             if ele['tp-id'] == 'SRG1-PP2-TXRX':
399                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
400         time.sleep(3)
401
402     def test_21_check_openroadm_topo_ROADMA_DEG(self):
403         response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
404         self.assertEqual(response.status_code, requests.codes.ok)
405         res = response.json()
406         freq_map = base64.b64decode(
407             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
408         freq_map_array = [int(x) for x in freq_map]
409         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
410         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
411         for ele in liste_tp:
412             if ele['tp-id'] == 'DEG2-CTP-TXRX':
413                 freq_map = base64.b64decode(
414                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
415                 freq_map_array = [int(x) for x in freq_map]
416                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
417             if ele['tp-id'] == 'DEG2-TTP-TXRX':
418                 freq_map = base64.b64decode(
419                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
420                 freq_map_array = [int(x) for x in freq_map]
421                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
422         time.sleep(3)
423
424     def test_22_check_otn_topo_otu4_links(self):
425         response = test_utils.get_otn_topo_request()
426         self.assertEqual(response.status_code, requests.codes.ok)
427         res = response.json()
428         nb_links = len(res['network'][0]['ietf-network-topology:link'])
429         self.assertEqual(nb_links, 2)
430         listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
431                       'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
432         for link in res['network'][0]['ietf-network-topology:link']:
433             self.assertIn(link['link-id'], listLinkId)
434             self.assertEqual(
435                 link['transportpce-topology:otn-link-type'], 'OTU4')
436             self.assertEqual(
437                 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
438             self.assertEqual(
439                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
440             self.assertEqual(
441                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
442             self.assertIn(
443                 link['org-openroadm-common-network:opposite-link'], listLinkId)
444
445 # test service-create for ODU4 service from spdr to spdr
446     def test_23_create_ODU4_service(self):
447         self.cr_serv_sample_data["input"]["service-name"] = "service1-ODU4"
448         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
449         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
450         self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
451         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
452         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
453         self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
454
455         response = test_utils.service_create_request(self.cr_serv_sample_data)
456         self.assertEqual(response.status_code, requests.codes.ok)
457         res = response.json()
458         self.assertIn('PCE calculation in progress',
459                       res['output']['configuration-response-common']['response-message'])
460         time.sleep(self.WAITING)
461
462     def test_24_get_ODU4_service1(self):
463         response = test_utils.get_service_list_request(
464             "services/service1-ODU4")
465         self.assertEqual(response.status_code, requests.codes.ok)
466         res = response.json()
467         self.assertEqual(
468             res['services'][0]['administrative-state'], 'inService')
469         self.assertEqual(
470             res['services'][0]['service-name'], 'service1-ODU4')
471         self.assertEqual(
472             res['services'][0]['connection-type'], 'infrastructure')
473         self.assertEqual(
474             res['services'][0]['lifecycle-state'], 'planned')
475         time.sleep(2)
476
477     def test_25_check_interface_ODU4_spdra(self):
478         response = test_utils.check_netconf_node_request(
479             "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
480         self.assertEqual(response.status_code, requests.codes.ok)
481         res = response.json()
482         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
483                         'administrative-state': 'inService',
484                         'supporting-circuit-pack-name': 'CP1-CFP0',
485                         'supporting-interface': 'XPDR1-NETWORK1-OTU',
486                         'type': 'org-openroadm-interfaces:otnOdu',
487                         'supporting-port': 'CP1-CFP0-P1'}
488         # SAPI/DAPI are added in the Otu4 renderer
489         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
490                         'rate': 'org-openroadm-otn-common-types:ODU4',
491                         'expected-dapi': 'Swfw02qXGyI=',
492                         'expected-sapi': 'fuYZwEO660g=',
493                         'tx-dapi': 'fuYZwEO660g=',
494                         'tx-sapi': 'Swfw02qXGyI='}
495
496         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
497                              res['interface'][0])
498         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
499                                   **input_dict_2),
500                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
501                              )
502         self.assertDictEqual(
503             {'payload-type': '21', 'exp-payload-type': '21'},
504             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
505
506     def test_26_check_interface_ODU4_spdrc(self):
507         response = test_utils.check_netconf_node_request(
508             "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
509         self.assertEqual(response.status_code, requests.codes.ok)
510         res = response.json()
511         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
512                         'administrative-state': 'inService',
513                         'supporting-circuit-pack-name': 'CP1-CFP0',
514                         'supporting-interface': 'XPDR1-NETWORK1-OTU',
515                         'type': 'org-openroadm-interfaces:otnOdu',
516                         'supporting-port': 'CP1-CFP0-P1'}
517         # SAPI/DAPI are added in the Otu4 renderer
518         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
519                         'rate': 'org-openroadm-otn-common-types:ODU4',
520                         'tx-sapi': 'fuYZwEO660g=',
521                         'tx-dapi': 'Swfw02qXGyI=',
522                         'expected-sapi': 'Swfw02qXGyI=',
523                         'expected-dapi': 'fuYZwEO660g='
524                         }
525         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
526                              res['interface'][0])
527         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
528                                   **input_dict_2),
529                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
530                              )
531         self.assertDictEqual(
532             {'payload-type': '21', 'exp-payload-type': '21'},
533             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
534
535     def test_27_check_otn_topo_links(self):
536         response = test_utils.get_otn_topo_request()
537         self.assertEqual(response.status_code, requests.codes.ok)
538         res = response.json()
539         nb_links = len(res['network'][0]['ietf-network-topology:link'])
540         self.assertEqual(nb_links, 4)
541         for link in res['network'][0]['ietf-network-topology:link']:
542             linkId = link['link-id']
543             if (linkId in ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
544                            'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
545                 self.assertEqual(
546                     link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
547                 self.assertEqual(
548                     link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
549             elif (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
550                              'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
551                 self.assertEqual(
552                     link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
553                 self.assertEqual(
554                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
555                 self.assertEqual(
556                     link['transportpce-topology:otn-link-type'], 'ODTU4')
557                 self.assertEqual(
558                     link['org-openroadm-common-network:link-type'], 'OTN-LINK')
559                 self.assertIn(link['org-openroadm-common-network:opposite-link'],
560                               ['ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
561                                'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
562             else:
563                 self.fail("this link should not exist")
564
565     def test_28_check_otn_topo_tp(self):
566         response = test_utils.get_otn_topo_request()
567         res = response.json()
568         for node in res['network'][0]['node']:
569             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
570                 tpList = node['ietf-network-topology:termination-point']
571                 for tp in tpList:
572                     if tp['tp-id'] == 'XPDR1-NETWORK1':
573                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
574                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
575                         self.assertEqual(
576                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
577                         self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
578                                          'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
579
580 # test service-create for 10GE service from spdr to spdr
581     def test_29_create_10GE_service(self):
582         self.cr_serv_sample_data["input"]["service-name"] = "service1-10GE"
583         self.cr_serv_sample_data["input"]["connection-type"] = "service"
584         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "10"
585         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
586         del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
587         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
588         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
589         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "10"
590         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
591         del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
592         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
593         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
594         response = test_utils.service_create_request(self.cr_serv_sample_data)
595         self.assertEqual(response.status_code, requests.codes.ok)
596         res = response.json()
597         self.assertIn('PCE calculation in progress',
598                       res['output']['configuration-response-common']['response-message'])
599         time.sleep(self.WAITING)
600
601     def test_30_get_10GE_service1(self):
602         response = test_utils.get_service_list_request(
603             "services/service1-10GE")
604         self.assertEqual(response.status_code, requests.codes.ok)
605         res = response.json()
606         self.assertEqual(
607             res['services'][0]['administrative-state'], 'inService')
608         self.assertEqual(
609             res['services'][0]['service-name'], 'service1-10GE')
610         self.assertEqual(
611             res['services'][0]['connection-type'], 'service')
612         self.assertEqual(
613             res['services'][0]['lifecycle-state'], 'planned')
614         time.sleep(2)
615
616     def test_31_check_interface_10GE_CLIENT_spdra(self):
617         response = test_utils.check_netconf_node_request(
618             "SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
619         self.assertEqual(response.status_code, requests.codes.ok)
620         res = response.json()
621         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
622                       'administrative-state': 'inService',
623                       'supporting-circuit-pack-name': 'CP1-SFP4',
624                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
625                       'supporting-port': 'CP1-SFP4-P1'
626                       }
627         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
628                              res['interface'][0])
629         self.assertDictEqual(
630             {'speed': 10000},
631             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
632
633     def test_32_check_interface_ODU2E_CLIENT_spdra(self):
634         response = test_utils.check_netconf_node_request(
635             "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
636         self.assertEqual(response.status_code, requests.codes.ok)
637         res = response.json()
638         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
639                         'administrative-state': 'inService',
640                         'supporting-circuit-pack-name': 'CP1-SFP4',
641                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
642                         'type': 'org-openroadm-interfaces:otnOdu',
643                         'supporting-port': 'CP1-SFP4-P1'}
644         input_dict_2 = {
645             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
646             'rate': 'org-openroadm-otn-common-types:ODU2e',
647             'monitoring-mode': 'terminated'}
648
649         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
650                              res['interface'][0])
651         self.assertDictEqual(dict(input_dict_2,
652                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
653                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
654         self.assertDictEqual(
655             {'payload-type': '03', 'exp-payload-type': '03'},
656             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
657
658     def test_33_check_interface_ODU2E_NETWORK_spdra(self):
659         response = test_utils.check_netconf_node_request(
660             "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
661         self.assertEqual(response.status_code, requests.codes.ok)
662         res = response.json()
663         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
664                         'administrative-state': 'inService',
665                         'supporting-circuit-pack-name': 'CP1-CFP0',
666                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
667                         'type': 'org-openroadm-interfaces:otnOdu',
668                         'supporting-port': 'CP1-CFP0-P1'}
669         input_dict_2 = {
670             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
671             'rate': 'org-openroadm-otn-common-types:ODU2e',
672             'monitoring-mode': 'monitored'}
673         input_dict_3 = {'trib-port-number': 1}
674
675         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
676                              res['interface'][0])
677         self.assertDictEqual(dict(input_dict_2,
678                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
679                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
680         self.assertDictEqual(dict(input_dict_3,
681                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
682                                       'parent-odu-allocation']),
683                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
684         self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
685                       ['trib-slots'])
686
687     def test_34_check_ODU2E_connection_spdra(self):
688         response = test_utils.check_netconf_node_request(
689             "SPDR-SA1",
690             "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
691         self.assertEqual(response.status_code, requests.codes.ok)
692         res = response.json()
693         input_dict_1 = {
694             'connection-name':
695             'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
696             'direction': 'bidirectional'
697         }
698
699         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
700                              res['odu-connection'][0])
701         self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1-10GE'},
702                              res['odu-connection'][0]['destination'])
703         self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e-service1-10GE'},
704                              res['odu-connection'][0]['source'])
705
706     def test_35_check_interface_10GE_CLIENT_spdrc(self):
707         response = test_utils.check_netconf_node_request(
708             "SPDR-SC1", "interface/XPDR1-CLIENT1-ETHERNET10G")
709         self.assertEqual(response.status_code, requests.codes.ok)
710         res = response.json()
711         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
712                       'administrative-state': 'inService',
713                       'supporting-circuit-pack-name': 'CP1-SFP4',
714                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
715                       'supporting-port': 'CP1-SFP4-P1'
716                       }
717         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
718                              res['interface'][0])
719         self.assertDictEqual(
720             {'speed': 10000},
721             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
722
723     def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
724         response = test_utils.check_netconf_node_request(
725             "SPDR-SC1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
726         self.assertEqual(response.status_code, requests.codes.ok)
727         res = response.json()
728         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
729                         'administrative-state': 'inService',
730                         'supporting-circuit-pack-name': 'CP1-SFP4',
731                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
732                         'type': 'org-openroadm-interfaces:otnOdu',
733                         'supporting-port': 'CP1-SFP4-P1'}
734         input_dict_2 = {
735             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
736             'rate': 'org-openroadm-otn-common-types:ODU2e',
737             'monitoring-mode': 'terminated'}
738
739         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
740                              res['interface'][0])
741         self.assertDictEqual(dict(input_dict_2,
742                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
743                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
744         self.assertDictEqual(
745             {'payload-type': '03', 'exp-payload-type': '03'},
746             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
747
748     def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
749         response = test_utils.check_netconf_node_request(
750             "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
751         self.assertEqual(response.status_code, requests.codes.ok)
752         res = response.json()
753         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
754                         'administrative-state': 'inService',
755                         'supporting-circuit-pack-name': 'CP1-CFP0',
756                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
757                         'type': 'org-openroadm-interfaces:otnOdu',
758                         'supporting-port': 'CP1-CFP0-P1'}
759         input_dict_2 = {
760             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
761             'rate': 'org-openroadm-otn-common-types:ODU2e',
762             'monitoring-mode': 'monitored'}
763
764         input_dict_3 = {'trib-port-number': 1}
765
766         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
767                              res['interface'][0])
768         self.assertDictEqual(dict(input_dict_2,
769                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
770                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
771         self.assertDictEqual(dict(input_dict_3,
772                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
773                                       'parent-odu-allocation']),
774                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
775             'parent-odu-allocation'])
776         self.assertIn(1,
777                       res['interface'][0][
778                           'org-openroadm-otn-odu-interfaces:odu'][
779                           'parent-odu-allocation']['trib-slots'])
780
781     def test_38_check_ODU2E_connection_spdrc(self):
782         response = test_utils.check_netconf_node_request(
783             "SPDR-SC1",
784             "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
785         self.assertEqual(response.status_code, requests.codes.ok)
786         res = response.json()
787         input_dict_1 = {
788             'connection-name':
789             'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
790             'direction': 'bidirectional'
791         }
792
793         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
794                              res['odu-connection'][0])
795         self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1-10GE'},
796                              res['odu-connection'][0]['destination'])
797         self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e-service1-10GE'},
798                              res['odu-connection'][0]['source'])
799
800     def test_39_check_otn_topo_links(self):
801         response = test_utils.get_otn_topo_request()
802         self.assertEqual(response.status_code, requests.codes.ok)
803         res = response.json()
804         nb_links = len(res['network'][0]['ietf-network-topology:link'])
805         self.assertEqual(nb_links, 4)
806         for link in res['network'][0]['ietf-network-topology:link']:
807             linkId = link['link-id']
808             if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
809                            'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
810                 self.assertEqual(
811                     link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
812                 self.assertEqual(
813                     link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
814
815     def test_40_check_otn_topo_tp(self):
816         response = test_utils.get_otn_topo_request()
817         res = response.json()
818         for node in res['network'][0]['node']:
819             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
820                 tpList = node['ietf-network-topology:termination-point']
821                 for tp in tpList:
822                     if tp['tp-id'] == 'XPDR1-NETWORK1':
823                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
824                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
825                         tsPoolList = list(range(1, 9))
826                         self.assertNotIn(
827                             tsPoolList, xpdrTpPortConAt['ts-pool'])
828                         self.assertEqual(
829                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
830                         self.assertNotIn(
831                             1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
832
833     def test_41_delete_10GE_service(self):
834         response = test_utils.service_delete_request("service1-10GE")
835         self.assertEqual(response.status_code, requests.codes.ok)
836         res = response.json()
837         self.assertIn('Renderer service delete in progress',
838                       res['output']['configuration-response-common']['response-message'])
839         time.sleep(self.WAITING)
840
841     def test_42_check_service_list(self):
842         response = test_utils.get_service_list_request("")
843         self.assertEqual(response.status_code, requests.codes.ok)
844         res = response.json()
845         self.assertEqual(len(res['service-list']['services']), 2)
846         time.sleep(2)
847
848     def test_43_check_no_ODU2e_connection_spdra(self):
849         response = test_utils.check_netconf_node_request("SPDR-SA1", "")
850         self.assertEqual(response.status_code, requests.codes.ok)
851         res = response.json()
852         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
853         time.sleep(1)
854
855     def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
856         response = test_utils.check_netconf_node_request(
857             "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
858         self.assertEqual(response.status_code, requests.codes.conflict)
859
860     def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
861         response = test_utils.check_netconf_node_request(
862             "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
863         self.assertEqual(response.status_code, requests.codes.conflict)
864
865     def test_46_check_no_interface_10GE_CLIENT_spdra(self):
866         response = test_utils.check_netconf_node_request(
867             "SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
868         self.assertEqual(response.status_code, requests.codes.conflict)
869
870     def test_47_check_otn_topo_links(self):
871         response = test_utils.get_otn_topo_request()
872         self.assertEqual(response.status_code, requests.codes.ok)
873         res = response.json()
874         nb_links = len(res['network'][0]['ietf-network-topology:link'])
875         self.assertEqual(nb_links, 4)
876         for link in res['network'][0]['ietf-network-topology:link']:
877             linkId = link['link-id']
878             if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
879                            'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
880                 self.assertEqual(
881                     link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
882                 self.assertEqual(
883                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
884
885     def test_48_check_otn_topo_tp(self):
886         response = test_utils.get_otn_topo_request()
887         res = response.json()
888         for node in res['network'][0]['node']:
889             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
890                 tpList = node['ietf-network-topology:termination-point']
891                 for tp in tpList:
892                     if tp['tp-id'] == 'XPDR1-NETWORK1':
893                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
894                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
895                         self.assertEqual(
896                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
897
898     def test_49_delete_ODU4_service(self):
899         response = test_utils.service_delete_request("service1-ODU4")
900         self.assertEqual(response.status_code, requests.codes.ok)
901         res = response.json()
902         self.assertIn('Renderer service delete in progress',
903                       res['output']['configuration-response-common']['response-message'])
904         time.sleep(self.WAITING)
905
906     def test_50_check_service_list(self):
907         response = test_utils.get_service_list_request("")
908         self.assertEqual(response.status_code, requests.codes.ok)
909         res = response.json()
910         self.assertEqual(len(res['service-list']['services']), 1)
911         time.sleep(2)
912
913     def test_51_check_no_interface_ODU4_spdra(self):
914         response = test_utils.check_netconf_node_request(
915             "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
916         self.assertEqual(response.status_code, requests.codes.conflict)
917
918     def test_52_check_otn_topo_links(self):
919         self.test_22_check_otn_topo_otu4_links()
920
921     def test_53_check_otn_topo_tp(self):
922         response = test_utils.get_otn_topo_request()
923         res = response.json()
924         for node in res['network'][0]['node']:
925             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
926                 tpList = node['ietf-network-topology:termination-point']
927                 for tp in tpList:
928                     if tp['tp-id'] == 'XPDR1-NETWORK1':
929                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
930                         self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
931                         self.assertNotIn(
932                             'odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
933
934     def test_54_delete_OCH_OTU4_service(self):
935         response = test_utils.service_delete_request("service1-OCH-OTU4")
936         self.assertEqual(response.status_code, requests.codes.ok)
937         res = response.json()
938         self.assertIn('Renderer service delete in progress',
939                       res['output']['configuration-response-common']['response-message'])
940         time.sleep(self.WAITING)
941
942     def test_55_get_no_service(self):
943         response = test_utils.get_service_list_request("")
944         self.assertEqual(response.status_code, requests.codes.conflict)
945         res = response.json()
946         self.assertIn(
947             {"error-type": "application", "error-tag": "data-missing",
948              "error-message": "Request could not be completed because the relevant data model content does not exist"},
949             res['errors']['error'])
950         time.sleep(1)
951
952     def test_56_check_no_interface_OTU4_spdra(self):
953         response = test_utils.check_netconf_node_request(
954             "SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
955         self.assertEqual(response.status_code, requests.codes.conflict)
956
957     def test_57_check_no_interface_OCH_spdra(self):
958         response = test_utils.check_netconf_node_request(
959             "SPDR-SA1", "interface/XPDR1-NETWORK1-1")
960         self.assertEqual(response.status_code, requests.codes.conflict)
961
962     def test_58_getLinks_OtnTopology(self):
963         response = test_utils.get_otn_topo_request()
964         self.assertEqual(response.status_code, requests.codes.ok)
965         res = response.json()
966         self.assertNotIn('ietf-network-topology:link', res['network'][0])
967
968     def test_59_check_openroadm_topo_spdra(self):
969         response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
970         self.assertEqual(response.status_code, requests.codes.ok)
971         res = response.json()
972         tp = res['node'][0]['ietf-network-topology:termination-point'][0]
973         self.assertEqual('XPDR1-NETWORK1', tp['tp-id'])
974         self.assertNotIn('wavelength', dict.keys(
975             tp['org-openroadm-network-topology:xpdr-network-attributes']))
976         time.sleep(3)
977
978     def test_60_check_openroadm_topo_ROADMA_SRG(self):
979         response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
980         self.assertEqual(response.status_code, requests.codes.ok)
981         res = response.json()
982         freq_map = base64.b64decode(
983             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
984         freq_map_array = [int(x) for x in freq_map]
985         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
986         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
987         for ele in liste_tp:
988             if ele['tp-id'] == 'SRG1-PP1-TXRX':
989                 freq_map = base64.b64decode(
990                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
991                 freq_map_array = [int(x) for x in freq_map]
992                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
993         time.sleep(3)
994
995     def test_61_check_openroadm_topo_ROADMA_DEG(self):
996         response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
997         self.assertEqual(response.status_code, requests.codes.ok)
998         res = response.json()
999         freq_map = base64.b64decode(
1000             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1001         freq_map_array = [int(x) for x in freq_map]
1002         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1003         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1004         for ele in liste_tp:
1005             if ele['tp-id'] == 'DEG2-CTP-TXRX':
1006                 freq_map = base64.b64decode(
1007                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1008                 freq_map_array = [int(x) for x in freq_map]
1009                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1010             if ele['tp-id'] == 'DEG2-TTP-TXRX':
1011                 freq_map = base64.b64decode(
1012                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1013                 freq_map_array = [int(x) for x in freq_map]
1014                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1015         time.sleep(3)
1016
1017     def test_62_connect_sprdA_3_N1_to_roadmA_PP2(self):
1018         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "3", "1",
1019                                                           "ROADM-A1", "1", "SRG1-PP2-TXRX")
1020         self.assertEqual(response.status_code, requests.codes.ok)
1021         res = response.json()
1022         self.assertIn('Xponder Roadm Link created successfully',
1023                       res["output"]["result"])
1024         time.sleep(2)
1025
1026     def test_63_connect_roadmA_PP2_to_spdrA_3_N1(self):
1027         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "3", "1",
1028                                                           "ROADM-A1", "1", "SRG1-PP2-TXRX")
1029         self.assertEqual(response.status_code, requests.codes.ok)
1030         res = response.json()
1031         self.assertIn('Roadm Xponder links created successfully',
1032                       res["output"]["result"])
1033         time.sleep(2)
1034
1035     def test_64_connect_sprdC_3_N1_to_roadmC_PP2(self):
1036         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "3", "1",
1037                                                           "ROADM-C1", "1", "SRG1-PP2-TXRX")
1038         self.assertEqual(response.status_code, requests.codes.ok)
1039         res = response.json()
1040         self.assertIn('Xponder Roadm Link created successfully',
1041                       res["output"]["result"])
1042         time.sleep(2)
1043
1044     def test_65_connect_roadmC_PP2_to_spdrC_3_N1(self):
1045         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "3", "1",
1046                                                           "ROADM-C1", "1", "SRG1-PP2-TXRX")
1047         self.assertEqual(response.status_code, requests.codes.ok)
1048         res = response.json()
1049         self.assertIn('Roadm Xponder links created successfully',
1050                       res["output"]["result"])
1051         time.sleep(2)
1052
1053     def test_66_create_OCH_OTU4_service_2(self):
1054         # pylint: disable=line-too-long
1055         self.cr_serv_sample_data["input"]["service-name"] = "service2-OCH-OTU4"
1056         self.cr_serv_sample_data["input"]["connection-type"] = "infrastructure"
1057         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
1058         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OTU"
1059         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1060         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1061         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1062         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1063         self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1064         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
1065         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OTU"
1066         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1067         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1068         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1069         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1070         self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1071         response = test_utils.service_create_request(self.cr_serv_sample_data)
1072         self.assertEqual(response.status_code, requests.codes.ok)
1073         res = response.json()
1074         self.assertIn('PCE calculation in progress',
1075                       res['output']['configuration-response-common']['response-message'])
1076         time.sleep(self.WAITING)
1077
1078     def test_67_get_OCH_OTU4_service2(self):
1079         response = test_utils.get_service_list_request(
1080             "services/service2-OCH-OTU4")
1081         self.assertEqual(response.status_code, requests.codes.ok)
1082         res = response.json()
1083         self.assertEqual(
1084             res['services'][0]['administrative-state'], 'inService')
1085         self.assertEqual(
1086             res['services'][0]['service-name'], 'service2-OCH-OTU4')
1087         self.assertEqual(
1088             res['services'][0]['connection-type'], 'infrastructure')
1089         self.assertEqual(
1090             res['services'][0]['lifecycle-state'], 'planned')
1091         time.sleep(2)
1092
1093     def test_68_create_ODU4_service_2(self):
1094         # pylint: disable=line-too-long
1095         self.cr_serv_sample_data["input"]["service-name"] = "service2-ODU4"
1096         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
1097         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1098         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1099         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1100         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1101         self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1102         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
1103         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
1104         self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1105         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1106         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1107         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1108         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1109         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
1110
1111         response = test_utils.service_create_request(self.cr_serv_sample_data)
1112         self.assertEqual(response.status_code, requests.codes.ok)
1113         res = response.json()
1114         self.assertIn('PCE calculation in progress',
1115                       res['output']['configuration-response-common']['response-message'])
1116         time.sleep(self.WAITING)
1117
1118     def test_69_get_ODU4_service2(self):
1119         response = test_utils.get_service_list_request(
1120             "services/service2-ODU4")
1121         self.assertEqual(response.status_code, requests.codes.ok)
1122         res = response.json()
1123         self.assertEqual(
1124             res['services'][0]['administrative-state'], 'inService')
1125         self.assertEqual(
1126             res['services'][0]['service-name'], 'service2-ODU4')
1127         self.assertEqual(
1128             res['services'][0]['connection-type'], 'infrastructure')
1129         self.assertEqual(
1130             res['services'][0]['lifecycle-state'], 'planned')
1131         time.sleep(2)
1132
1133     def test_70_create_1GE_service(self):
1134         # pylint: disable=line-too-long
1135         self.cr_serv_sample_data["input"]["service-name"] = "service1-1GE"
1136         self.cr_serv_sample_data["input"]["connection-type"] = "service"
1137         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "1"
1138         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
1139         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "1"
1140         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
1141         del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
1142         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1143         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-CLIENT1"
1144         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1145         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-CLIENT1"
1146         del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
1147         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1148         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-CLIENT1"
1149         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1150         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-CLIENT1"
1151         response = test_utils.service_create_request(self.cr_serv_sample_data)
1152         self.assertEqual(response.status_code, requests.codes.ok)
1153         res = response.json()
1154         self.assertIn('PCE calculation in progress',
1155                       res['output']['configuration-response-common']['response-message'])
1156         time.sleep(self.WAITING)
1157
1158     def test_71_get_1GE_service1(self):
1159         response = test_utils.get_service_list_request("services/service1-1GE")
1160         self.assertEqual(response.status_code, requests.codes.ok)
1161         res = response.json()
1162         self.assertEqual(
1163             res['services'][0]['administrative-state'], 'inService')
1164         self.assertEqual(
1165             res['services'][0]['service-name'], 'service1-1GE')
1166         self.assertEqual(
1167             res['services'][0]['connection-type'], 'service')
1168         self.assertEqual(
1169             res['services'][0]['lifecycle-state'], 'planned')
1170         time.sleep(2)
1171
1172     def test_72_check_interface_1GE_CLIENT_spdra(self):
1173         response = test_utils.check_netconf_node_request(
1174             "SPDR-SA1", "interface/XPDR3-CLIENT1-ETHERNET1G")
1175         self.assertEqual(response.status_code, requests.codes.ok)
1176         res = response.json()
1177         input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1178                       'administrative-state': 'inService',
1179                       'supporting-circuit-pack-name': 'CP1-SFP4',
1180                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
1181                       'supporting-port': 'CP1-SFP4-P1'
1182                       }
1183         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
1184                              res['interface'][0])
1185         self.assertDictEqual(
1186             {'speed': 1000},
1187             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1188
1189     def test_73_check_interface_ODU0_CLIENT_spdra(self):
1190         response = test_utils.check_netconf_node_request(
1191             "SPDR-SA1", "interface/XPDR3-CLIENT1-ODU0-service1-1GE")
1192         self.assertEqual(response.status_code, requests.codes.ok)
1193         res = response.json()
1194         input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0-service1-1GE',
1195                         'administrative-state': 'inService',
1196                         'supporting-circuit-pack-name': 'CP3-SFP1',
1197                         'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1198                         'type': 'org-openroadm-interfaces:otnOdu',
1199                         'supporting-port': 'CP3-SFP1-P1'}
1200         input_dict_2 = {
1201             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1202             'rate': 'org-openroadm-otn-common-types:ODU0',
1203             'monitoring-mode': 'terminated'}
1204
1205         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1206                              res['interface'][0])
1207         self.assertDictEqual(dict(input_dict_2,
1208                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1209                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1210         self.assertDictEqual(
1211             {'payload-type': '07', 'exp-payload-type': '07'},
1212             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1213
1214     def test_74_check_interface_ODU0_NETWORK_spdra(self):
1215         response = test_utils.check_netconf_node_request(
1216             "SPDR-SA1", "interface/XPDR3-NETWORK1-ODU0-service1-1GE")
1217         self.assertEqual(response.status_code, requests.codes.ok)
1218         res = response.json()
1219         input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0-service1-1GE',
1220                         'administrative-state': 'inService',
1221                         'supporting-circuit-pack-name': 'CP3-CFP0',
1222                         'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1223                         'type': 'org-openroadm-interfaces:otnOdu',
1224                         'supporting-port': 'CP3-CFP0-P1'}
1225         input_dict_2 = {
1226             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1227             'rate': 'org-openroadm-otn-common-types:ODU0',
1228             'monitoring-mode': 'monitored'}
1229         input_dict_3 = {'trib-port-number': 1}
1230
1231         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1232                              res['interface'][0])
1233         self.assertDictEqual(dict(input_dict_2,
1234                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1235                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1236         self.assertDictEqual(dict(input_dict_3,
1237                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1238                                       'parent-odu-allocation']),
1239                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
1240         self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1241                       ['trib-slots'])
1242
1243     def test_75_check_ODU0_connection_spdra(self):
1244         response = test_utils.check_netconf_node_request(
1245             "SPDR-SA1",
1246             "odu-connection/XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE")
1247         self.assertEqual(response.status_code, requests.codes.ok)
1248         res = response.json()
1249         input_dict_1 = {
1250             'connection-name':
1251             'XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE',
1252             'direction': 'bidirectional'
1253         }
1254
1255         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1256                              res['odu-connection'][0])
1257         self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0-service1-1GE'},
1258                              res['odu-connection'][0]['destination'])
1259         self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0-service1-1GE'},
1260                              res['odu-connection'][0]['source'])
1261
1262     def test_76_check_interface_1GE_CLIENT_spdrc(self):
1263         response = test_utils.check_netconf_node_request(
1264             "SPDR-SC1", "interface/XPDR3-CLIENT1-ETHERNET1G")
1265         self.assertEqual(response.status_code, requests.codes.ok)
1266         res = response.json()
1267         input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1268                       'administrative-state': 'inService',
1269                       'supporting-circuit-pack-name': 'CP3-SFP1',
1270                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
1271                       'supporting-port': 'CP3-SFP1-P1'
1272                       }
1273         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
1274                              res['interface'][0])
1275         self.assertDictEqual(
1276             {'speed': 1000},
1277             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1278
1279     def test_77_check_interface_ODU0_CLIENT_spdrc(self):
1280         response = test_utils.check_netconf_node_request(
1281             "SPDR-SC1", "interface/XPDR3-CLIENT1-ODU0-service1-1GE")
1282         self.assertEqual(response.status_code, requests.codes.ok)
1283         res = response.json()
1284         input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0-service1-1GE',
1285                         'administrative-state': 'inService',
1286                         'supporting-circuit-pack-name': 'CP3-SFP1',
1287                         'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1288                         'type': 'org-openroadm-interfaces:otnOdu',
1289                         'supporting-port': 'CP3-SFP1-P1'}
1290         input_dict_2 = {
1291             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1292             'rate': 'org-openroadm-otn-common-types:ODU0',
1293             'monitoring-mode': 'terminated'}
1294
1295         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1296                              res['interface'][0])
1297         self.assertDictEqual(dict(input_dict_2,
1298                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1299                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1300         self.assertDictEqual(
1301             {'payload-type': '07', 'exp-payload-type': '07'},
1302             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1303
1304     def test_78_check_interface_ODU0_NETWORK_spdrc(self):
1305         response = test_utils.check_netconf_node_request(
1306             "SPDR-SC1", "interface/XPDR3-NETWORK1-ODU0-service1-1GE")
1307         self.assertEqual(response.status_code, requests.codes.ok)
1308         res = response.json()
1309         input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0-service1-1GE',
1310                         'administrative-state': 'inService',
1311                         'supporting-circuit-pack-name': 'CP3-CFP0',
1312                         'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1313                         'type': 'org-openroadm-interfaces:otnOdu',
1314                         'supporting-port': 'CP3-CFP0-P1'}
1315         input_dict_2 = {
1316             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1317             'rate': 'org-openroadm-otn-common-types:ODU0',
1318             'monitoring-mode': 'monitored'}
1319
1320         input_dict_3 = {'trib-port-number': 1}
1321
1322         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1323                              res['interface'][0])
1324         self.assertDictEqual(dict(input_dict_2,
1325                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1326                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1327         self.assertDictEqual(dict(input_dict_3,
1328                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1329                                       'parent-odu-allocation']),
1330                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1331             'parent-odu-allocation'])
1332         self.assertIn(1,
1333                       res['interface'][0][
1334                           'org-openroadm-otn-odu-interfaces:odu'][
1335                           'parent-odu-allocation']['trib-slots'])
1336
1337     def test_79_check_ODU0_connection_spdrc(self):
1338         response = test_utils.check_netconf_node_request(
1339             "SPDR-SC1",
1340             "odu-connection/XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE")
1341         self.assertEqual(response.status_code, requests.codes.ok)
1342         res = response.json()
1343         input_dict_1 = {
1344             'connection-name':
1345             'XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE',
1346             'direction': 'bidirectional'
1347         }
1348
1349         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1350                              res['odu-connection'][0])
1351         self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0-service1-1GE'},
1352                              res['odu-connection'][0]['destination'])
1353         self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0-service1-1GE'},
1354                              res['odu-connection'][0]['source'])
1355
1356     def test_80_check_otn_topo_links(self):
1357         response = test_utils.get_otn_topo_request()
1358         self.assertEqual(response.status_code, requests.codes.ok)
1359         res = response.json()
1360         nb_links = len(res['network'][0]['ietf-network-topology:link'])
1361         self.assertEqual(nb_links, 4)
1362         for link in res['network'][0]['ietf-network-topology:link']:
1363             linkId = link['link-id']
1364             if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1365                            'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1366                 self.assertEqual(
1367                     link['org-openroadm-otn-network-topology:available-bandwidth'], 99000)
1368                 self.assertEqual(
1369                     link['org-openroadm-otn-network-topology:used-bandwidth'], 1000)
1370
1371     def test_81_check_otn_topo_tp(self):
1372         response = test_utils.get_otn_topo_request()
1373         res = response.json()
1374         for node in res['network'][0]['node']:
1375             if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1376                 tpList = node['ietf-network-topology:termination-point']
1377                 for tp in tpList:
1378                     if tp['tp-id'] == 'XPDR3-NETWORK1':
1379                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1380                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 79)
1381                         tsPoolList = list(range(1, 2))
1382                         self.assertNotIn(
1383                             tsPoolList, xpdrTpPortConAt['ts-pool'])
1384                         self.assertEqual(
1385                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
1386                         self.assertNotIn(
1387                             1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
1388
1389     def test_82_delete_1GE_service(self):
1390         response = test_utils.service_delete_request("service1-1GE")
1391         self.assertEqual(response.status_code, requests.codes.ok)
1392         res = response.json()
1393         self.assertIn('Renderer service delete in progress',
1394                       res['output']['configuration-response-common']['response-message'])
1395         time.sleep(self.WAITING)
1396
1397     def test_83_check_service_list(self):
1398         response = test_utils.get_service_list_request("")
1399         self.assertEqual(response.status_code, requests.codes.ok)
1400         res = response.json()
1401         self.assertEqual(len(res['service-list']['services']), 2)
1402         time.sleep(2)
1403
1404     def test_84_check_no_ODU0_connection_spdra(self):
1405         response = test_utils.check_netconf_node_request("SPDR-SA1", "")
1406         self.assertEqual(response.status_code, requests.codes.ok)
1407         res = response.json()
1408         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
1409         time.sleep(1)
1410
1411     def test_85_check_no_interface_ODU0_NETWORK_spdra(self):
1412         response = test_utils.check_netconf_node_request(
1413             "SPDR-SA1", "interface/XPDR3-NETWORK1-ODU0-service1")
1414         self.assertEqual(response.status_code, requests.codes.conflict)
1415
1416     def test_86_check_no_interface_ODU0_CLIENT_spdra(self):
1417         response = test_utils.check_netconf_node_request(
1418             "SPDR-SA1", "interface/XPDR3-CLIENT1-ODU0-service1")
1419         self.assertEqual(response.status_code, requests.codes.conflict)
1420
1421     def test_87_check_no_interface_10GE_CLIENT_spdra(self):
1422         response = test_utils.check_netconf_node_request(
1423             "SPDR-SA1", "interface/XPDR3-CLIENT1-ETHERNET1G")
1424         self.assertEqual(response.status_code, requests.codes.conflict)
1425
1426     def test_88_check_otn_topo_links(self):
1427         response = test_utils.get_otn_topo_request()
1428         self.assertEqual(response.status_code, requests.codes.ok)
1429         res = response.json()
1430         nb_links = len(res['network'][0]['ietf-network-topology:link'])
1431         self.assertEqual(nb_links, 4)
1432         for link in res['network'][0]['ietf-network-topology:link']:
1433             linkId = link['link-id']
1434             if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1435                            'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1436                 self.assertEqual(
1437                     link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
1438                 self.assertEqual(
1439                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
1440
1441     def test_89_check_otn_topo_tp(self):
1442         response = test_utils.get_otn_topo_request()
1443         res = response.json()
1444         for node in res['network'][0]['node']:
1445             if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1446                 tpList = node['ietf-network-topology:termination-point']
1447                 for tp in tpList:
1448                     if tp['tp-id'] == 'XPDR3-NETWORK1':
1449                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1450                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
1451                         self.assertEqual(
1452                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
1453
1454     def test_90_delete_ODU4_service(self):
1455         response = test_utils.service_delete_request("service2-ODU4")
1456         self.assertEqual(response.status_code, requests.codes.ok)
1457         res = response.json()
1458         self.assertIn('Renderer service delete in progress',
1459                       res['output']['configuration-response-common']['response-message'])
1460         time.sleep(self.WAITING)
1461
1462     def test_91_delete_OCH_OTU4_service(self):
1463         response = test_utils.service_delete_request("service2-OCH-OTU4")
1464         self.assertEqual(response.status_code, requests.codes.ok)
1465         res = response.json()
1466         self.assertIn('Renderer service delete in progress',
1467                       res['output']['configuration-response-common']['response-message'])
1468         time.sleep(self.WAITING)
1469
1470     def test_92_disconnect_xponders_from_roadm(self):
1471         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1472         response = test_utils.get_ordm_topo_request("")
1473         self.assertEqual(response.status_code, requests.codes.ok)
1474         res = response.json()
1475         links = res['network'][0]['ietf-network-topology:link']
1476         for link in links:
1477             if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
1478                 link_name = link["link-id"]
1479                 response = test_utils.delete_request(url+link_name)
1480                 self.assertEqual(response.status_code, requests.codes.ok)
1481
1482     def test_93_check_openroadm_topology(self):
1483         response = test_utils.get_ordm_topo_request("")
1484         self.assertEqual(response.status_code, requests.codes.ok)
1485         res = response.json()
1486         links = res['network'][0]['ietf-network-topology:link']
1487         self.assertEqual(18, len(links), 'Topology should contain 18 links')
1488
1489     def test_94_disconnect_spdrA(self):
1490         response = test_utils.unmount_device("SPDR-SA1")
1491         self.assertEqual(response.status_code, requests.codes.ok,
1492                          test_utils.CODE_SHOULD_BE_200)
1493
1494     def test_95_disconnect_spdrC(self):
1495         response = test_utils.unmount_device("SPDR-SC1")
1496         self.assertEqual(response.status_code, requests.codes.ok,
1497                          test_utils.CODE_SHOULD_BE_200)
1498
1499     def test_96_disconnect_roadmA(self):
1500         response = test_utils.unmount_device("ROADM-A1")
1501         self.assertEqual(response.status_code, requests.codes.ok,
1502                          test_utils.CODE_SHOULD_BE_200)
1503
1504     def test_97_disconnect_roadmC(self):
1505         response = test_utils.unmount_device("ROADM-C1")
1506         self.assertEqual(response.status_code, requests.codes.ok,
1507                          test_utils.CODE_SHOULD_BE_200)
1508
1509
# Run the test cases with verbose output when this file is executed as a
# script; nothing happens on import.
if __name__ == "__main__":
    unittest.main(
        verbosity=2,
    )