Manage ODU4 services over multiple OTU4
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test11_otn_end2end.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
15
16 import base64
17 import unittest
18 import time
19 import requests
20 # pylint: disable=wrong-import-order
21 import sys
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils  # nopep8
26
27
28 class TransportPCEtesting(unittest.TestCase):
29
30     processes = None
31     WAITING = 20  # nominal value is 300
32     NODE_VERSION = '2.2.1'
33
34     cr_serv_sample_data = {"input": {
35         "sdnc-request-header": {
36             "request-id": "request-1",
37             "rpc-action": "service-create",
38             "request-system-id": "appname"
39         },
40         "service-name": "service1-OCH-OTU4",
41         "common-id": "commonId",
42         "connection-type": "infrastructure",
43         "service-a-end": {
44             "service-rate": "100",
45             "node-id": "SPDR-SA1",
46             "service-format": "OTU",
47             "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
48             "clli": "NodeSA",
49             "subrate-eth-sla": {
50                     "subrate-eth-sla": {
51                         "committed-info-rate": "100000",
52                         "committed-burst-size": "64"
53                     }
54             },
55             "tx-direction": {
56                 "port": {
57                     "port-device-name": "SPDR-SA1-XPDR1",
58                     "port-type": "fixed",
59                     "port-name": "XPDR1-NETWORK1",
60                     "port-rack": "000000.00",
61                     "port-shelf": "Chassis#1"
62                 },
63                 "lgx": {
64                     "lgx-device-name": "Some lgx-device-name",
65                     "lgx-port-name": "Some lgx-port-name",
66                     "lgx-port-rack": "000000.00",
67                     "lgx-port-shelf": "00"
68                 }
69             },
70             "rx-direction": {
71                 "port": {
72                     "port-device-name": "SPDR-SA1-XPDR1",
73                     "port-type": "fixed",
74                     "port-name": "XPDR1-NETWORK1",
75                     "port-rack": "000000.00",
76                     "port-shelf": "Chassis#1"
77                 },
78                 "lgx": {
79                     "lgx-device-name": "Some lgx-device-name",
80                     "lgx-port-name": "Some lgx-port-name",
81                     "lgx-port-rack": "000000.00",
82                     "lgx-port-shelf": "00"
83                 }
84             },
85             "optic-type": "gray"
86         },
87         "service-z-end": {
88             "service-rate": "100",
89             "node-id": "SPDR-SC1",
90             "service-format": "OTU",
91             "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
92             "clli": "NodeSC",
93             "subrate-eth-sla": {
94                     "subrate-eth-sla": {
95                         "committed-info-rate": "100000",
96                         "committed-burst-size": "64"
97                     }
98             },
99             "tx-direction": {
100                 "port": {
101                     "port-device-name": "SPDR-SC1-XPDR1",
102                     "port-type": "fixed",
103                     "port-name": "XPDR1-NETWORK1",
104                     "port-rack": "000000.00",
105                     "port-shelf": "Chassis#1"
106                 },
107                 "lgx": {
108                     "lgx-device-name": "Some lgx-device-name",
109                     "lgx-port-name": "Some lgx-port-name",
110                     "lgx-port-rack": "000000.00",
111                     "lgx-port-shelf": "00"
112                 }
113             },
114             "rx-direction": {
115                 "port": {
116                     "port-device-name": "SPDR-SC1-XPDR1",
117                     "port-type": "fixed",
118                     "port-name": "XPDR1-NETWORK1",
119                     "port-rack": "000000.00",
120                     "port-shelf": "Chassis#1"
121                 },
122                 "lgx": {
123                     "lgx-device-name": "Some lgx-device-name",
124                     "lgx-port-name": "Some lgx-port-name",
125                     "lgx-port-rack": "000000.00",
126                     "lgx-port-shelf": "00"
127                 }
128             },
129             "optic-type": "gray"
130         },
131         "due-date": "2018-06-15T00:00:01Z",
132         "operator-contact": "pw1234"
133     }
134     }
135
136     @classmethod
137     def setUpClass(cls):
138         cls.processes = test_utils.start_tpce()
139         cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
140                                                ('roadma', cls.NODE_VERSION),
141                                                ('roadmc', cls.NODE_VERSION),
142                                                ('spdrc', cls.NODE_VERSION)])
143
144     @classmethod
145     def tearDownClass(cls):
146         # pylint: disable=not-an-iterable
147         for process in cls.processes:
148             test_utils.shutdown_process(process)
149         print("all processes killed")
150
151     def setUp(self):
152         time.sleep(5)
153
154     def test_01_connect_spdrA(self):
155         response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
156         self.assertEqual(response.status_code,
157                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
158
159     def test_02_connect_spdrC(self):
160         response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
161         self.assertEqual(response.status_code,
162                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
163
164     def test_03_connect_rdmA(self):
165         response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
166         self.assertEqual(response.status_code,
167                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
168
169     def test_04_connect_rdmC(self):
170         response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
171         self.assertEqual(response.status_code,
172                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
173
174     def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
175         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "1", "1",
176                                                           "ROADM-A1", "1", "SRG1-PP1-TXRX")
177         self.assertEqual(response.status_code, requests.codes.ok)
178         res = response.json()
179         self.assertIn('Xponder Roadm Link created successfully',
180                       res["output"]["result"])
181         time.sleep(2)
182
183     def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
184         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "1", "1",
185                                                           "ROADM-A1", "1", "SRG1-PP1-TXRX")
186         self.assertEqual(response.status_code, requests.codes.ok)
187         res = response.json()
188         self.assertIn('Roadm Xponder links created successfully',
189                       res["output"]["result"])
190         time.sleep(2)
191
192     def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
193         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "1", "1",
194                                                           "ROADM-C1", "1", "SRG1-PP1-TXRX")
195         self.assertEqual(response.status_code, requests.codes.ok)
196         res = response.json()
197         self.assertIn('Xponder Roadm Link created successfully',
198                       res["output"]["result"])
199         time.sleep(2)
200
201     def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
202         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "1", "1",
203                                                           "ROADM-C1", "1", "SRG1-PP1-TXRX")
204         self.assertEqual(response.status_code, requests.codes.ok)
205         res = response.json()
206         self.assertIn('Roadm Xponder links created successfully',
207                       res["output"]["result"])
208         time.sleep(2)
209
210     def test_09_add_omsAttributes_ROADMA_ROADMC(self):
211         # Config ROADMA-ROADMC oms-attributes
212         data = {"span": {
213             "auto-spanloss": "true",
214             "spanloss-base": 11.4,
215             "spanloss-current": 12,
216             "engineered-spanloss": 12.2,
217             "link-concatenation": [{
218                 "SRLG-Id": 0,
219                 "fiber-type": "smf",
220                 "SRLG-length": 100000,
221                 "pmd": 0.5}]}}
222         response = test_utils.add_oms_attr_request(
223             "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
224         self.assertEqual(response.status_code, requests.codes.created)
225
226     def test_10_add_omsAttributes_ROADMC_ROADMA(self):
227         # Config ROADMC-ROADMA oms-attributes
228         data = {"span": {
229             "auto-spanloss": "true",
230             "spanloss-base": 11.4,
231             "spanloss-current": 12,
232             "engineered-spanloss": 12.2,
233             "link-concatenation": [{
234                 "SRLG-Id": 0,
235                 "fiber-type": "smf",
236                 "SRLG-length": 100000,
237                 "pmd": 0.5}]}}
238         response = test_utils.add_oms_attr_request(
239             "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
240         self.assertEqual(response.status_code, requests.codes.created)
241
242 # test service-create for OCH-OTU4 service from spdr to spdr
243     def test_11_check_otn_topology(self):
244         response = test_utils.get_otn_topo_request()
245         self.assertEqual(response.status_code, requests.codes.ok)
246         res = response.json()
247         nbNode = len(res['network'][0]['node'])
248         self.assertEqual(nbNode, 6, 'There should be 6 nodes')
249         self.assertNotIn('ietf-network-topology:link', res['network'][0])
250
251     def test_12_create_OCH_OTU4_service(self):
252         response = test_utils.service_create_request(self.cr_serv_sample_data)
253         self.assertEqual(response.status_code, requests.codes.ok)
254         res = response.json()
255         self.assertIn('PCE calculation in progress',
256                       res['output']['configuration-response-common']['response-message'])
257         time.sleep(self.WAITING)
258
259     def test_13_get_OCH_OTU4_service1(self):
260         response = test_utils.get_service_list_request(
261             "services/service1-OCH-OTU4")
262         self.assertEqual(response.status_code, requests.codes.ok)
263         res = response.json()
264         self.assertEqual(
265             res['services'][0]['administrative-state'], 'inService')
266         self.assertEqual(
267             res['services'][0]['service-name'], 'service1-OCH-OTU4')
268         self.assertEqual(
269             res['services'][0]['connection-type'], 'infrastructure')
270         self.assertEqual(
271             res['services'][0]['lifecycle-state'], 'planned')
272         time.sleep(2)
273
274     # Check correct configuration of devices
275     def test_14_check_interface_och_spdra(self):
276         response = test_utils.check_netconf_node_request(
277             "SPDR-SA1", "interface/XPDR1-NETWORK1-761:768")
278         self.assertEqual(response.status_code, requests.codes.ok)
279         res = response.json()
280         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
281                                    'administrative-state': 'inService',
282                                    'supporting-circuit-pack-name': 'CP1-CFP0',
283                                    'type': 'org-openroadm-interfaces:opticalChannel',
284                                    'supporting-port': 'CP1-CFP0-P1'
285                                    }, **res['interface'][0]),
286                              res['interface'][0])
287
288         self.assertDictEqual(
289             {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
290              'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
291             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
292
293     def test_15_check_interface_OTU4_spdra(self):
294         response = test_utils.check_netconf_node_request(
295             "SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
296         self.assertEqual(response.status_code, requests.codes.ok)
297         res = response.json()
298         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
299                         'administrative-state': 'inService',
300                         'supporting-circuit-pack-name': 'CP1-CFP0',
301                         'supporting-interface': 'XPDR1-NETWORK1-761:768',
302                         'type': 'org-openroadm-interfaces:otnOtu',
303                         'supporting-port': 'CP1-CFP0-P1'
304                         }
305         input_dict_2 = {'tx-sapi': 'H/OelLynehI=',
306                         'expected-dapi': 'H/OelLynehI=',
307                         'tx-dapi': 'AMf1n5hK6Xkk',
308                         'expected-sapi': 'AMf1n5hK6Xkk',
309                         'rate': 'org-openroadm-otn-common-types:OTU4',
310                         'fec': 'scfec'
311                         }
312         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
313                              res['interface'][0])
314
315         self.assertDictEqual(input_dict_2,
316                              res['interface'][0]
317                              ['org-openroadm-otn-otu-interfaces:otu'])
318
319     def test_16_check_interface_och_spdrc(self):
320         response = test_utils.check_netconf_node_request(
321             "SPDR-SC1", "interface/XPDR1-NETWORK1-761:768")
322         self.assertEqual(response.status_code, requests.codes.ok)
323         res = response.json()
324         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
325                                    'administrative-state': 'inService',
326                                    'supporting-circuit-pack-name': 'CP1-CFP0',
327                                    'type': 'org-openroadm-interfaces:opticalChannel',
328                                    'supporting-port': 'CP1-CFP0-P1'
329                                    }, **res['interface'][0]),
330                              res['interface'][0])
331
332         self.assertDictEqual(
333             {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
334              'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
335             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
336
337     def test_17_check_interface_OTU4_spdrc(self):
338         response = test_utils.check_netconf_node_request(
339             "SPDR-SC1", "interface/XPDR1-NETWORK1-OTU")
340         self.assertEqual(response.status_code, requests.codes.ok)
341         res = response.json()
342         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
343                         'administrative-state': 'inService',
344                         'supporting-circuit-pack-name': 'CP1-CFP0',
345                         'supporting-interface': 'XPDR1-NETWORK1-761:768',
346                         'type': 'org-openroadm-interfaces:otnOtu',
347                         'supporting-port': 'CP1-CFP0-P1'
348                         }
349         input_dict_2 = {'tx-dapi': 'H/OelLynehI=',
350                         'expected-sapi': 'H/OelLynehI=',
351                         'tx-sapi': 'AMf1n5hK6Xkk',
352                         'expected-dapi': 'AMf1n5hK6Xkk',
353                         'rate': 'org-openroadm-otn-common-types:OTU4',
354                         'fec': 'scfec'
355                         }
356
357         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
358                              res['interface'][0])
359
360         self.assertDictEqual(input_dict_2,
361                              res['interface'][0]
362                              ['org-openroadm-otn-otu-interfaces:otu'])
363
364     def test_18_check_no_interface_ODU4_spdra(self):
365         response = test_utils.check_netconf_node_request(
366             "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
367         self.assertEqual(response.status_code, requests.codes.conflict)
368         res = response.json()
369         self.assertIn(
370             {"error-type": "application", "error-tag": "data-missing",
371              "error-message": "Request could not be completed because the relevant data model content does not exist"},
372             res['errors']['error'])
373
374     def test_19_check_openroadm_topo_spdra(self):
375         response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
376         self.assertEqual(response.status_code, requests.codes.ok)
377         res = response.json()
378         ele = res['node'][0]['ietf-network-topology:termination-point'][0]
379         self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
380         self.assertEqual({'frequency': 196.1,
381                           'width': 40},
382                          ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
383         time.sleep(3)
384
385     def test_20_check_openroadm_topo_ROADMA_SRG(self):
386         response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
387         self.assertEqual(response.status_code, requests.codes.ok)
388         res = response.json()
389         freq_map = base64.b64decode(
390             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
391         freq_map_array = [int(x) for x in freq_map]
392         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
393         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
394         for ele in liste_tp:
395             if ele['tp-id'] == 'SRG1-PP1-TXRX':
396                 freq_map = base64.b64decode(
397                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
398                 freq_map_array = [int(x) for x in freq_map]
399                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
400             if ele['tp-id'] == 'SRG1-PP2-TXRX':
401                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
402         time.sleep(3)
403
404     def test_21_check_openroadm_topo_ROADMA_DEG(self):
405         response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
406         self.assertEqual(response.status_code, requests.codes.ok)
407         res = response.json()
408         freq_map = base64.b64decode(
409             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
410         freq_map_array = [int(x) for x in freq_map]
411         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
412         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
413         for ele in liste_tp:
414             if ele['tp-id'] == 'DEG2-CTP-TXRX':
415                 freq_map = base64.b64decode(
416                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
417                 freq_map_array = [int(x) for x in freq_map]
418                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
419             if ele['tp-id'] == 'DEG2-TTP-TXRX':
420                 freq_map = base64.b64decode(
421                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
422                 freq_map_array = [int(x) for x in freq_map]
423                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
424         time.sleep(3)
425
426     def test_22_check_otn_topo_otu4_links(self):
427         response = test_utils.get_otn_topo_request()
428         self.assertEqual(response.status_code, requests.codes.ok)
429         res = response.json()
430         nb_links = len(res['network'][0]['ietf-network-topology:link'])
431         self.assertEqual(nb_links, 2)
432         listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
433                       'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
434         for link in res['network'][0]['ietf-network-topology:link']:
435             self.assertIn(link['link-id'], listLinkId)
436             self.assertEqual(
437                 link['transportpce-topology:otn-link-type'], 'OTU4')
438             self.assertEqual(
439                 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
440             self.assertEqual(
441                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
442             self.assertEqual(
443                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
444             self.assertIn(
445                 link['org-openroadm-common-network:opposite-link'], listLinkId)
446
447 # test service-create for ODU4 service from spdr to spdr
448     def test_23_create_ODU4_service(self):
449         self.cr_serv_sample_data["input"]["service-name"] = "service1-ODU4"
450         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
451         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
452         self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
453         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
454         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
455         self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
456
457         response = test_utils.service_create_request(self.cr_serv_sample_data)
458         self.assertEqual(response.status_code, requests.codes.ok)
459         res = response.json()
460         self.assertIn('PCE calculation in progress',
461                       res['output']['configuration-response-common']['response-message'])
462         time.sleep(self.WAITING)
463
464     def test_24_get_ODU4_service1(self):
465         response = test_utils.get_service_list_request(
466             "services/service1-ODU4")
467         self.assertEqual(response.status_code, requests.codes.ok)
468         res = response.json()
469         self.assertEqual(
470             res['services'][0]['administrative-state'], 'inService')
471         self.assertEqual(
472             res['services'][0]['service-name'], 'service1-ODU4')
473         self.assertEqual(
474             res['services'][0]['connection-type'], 'infrastructure')
475         self.assertEqual(
476             res['services'][0]['lifecycle-state'], 'planned')
477         time.sleep(2)
478
479     def test_25_check_interface_ODU4_spdra(self):
480         response = test_utils.check_netconf_node_request(
481             "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
482         self.assertEqual(response.status_code, requests.codes.ok)
483         res = response.json()
484         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
485                         'administrative-state': 'inService',
486                         'supporting-circuit-pack-name': 'CP1-CFP0',
487                         'supporting-interface': 'XPDR1-NETWORK1-OTU',
488                         'type': 'org-openroadm-interfaces:otnOdu',
489                         'supporting-port': 'CP1-CFP0-P1',
490                         'circuit-id': 'TBD',
491                         'description': 'TBD'}
492         # SAPI/DAPI are added in the Otu4 renderer
493         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
494                         'rate': 'org-openroadm-otn-common-types:ODU4',
495                         'monitoring-mode': 'terminated',
496                         'expected-dapi': 'H/OelLynehI=',
497                         'expected-sapi': 'AMf1n5hK6Xkk',
498                         'tx-dapi': 'AMf1n5hK6Xkk',
499                         'tx-sapi': 'H/OelLynehI='}
500
501         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
502                              res['interface'][0])
503         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
504                                   **input_dict_2),
505                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
506                              )
507         self.assertDictEqual(
508             {'payload-type': '21', 'exp-payload-type': '21'},
509             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
510
511     def test_26_check_interface_ODU4_spdrc(self):
512         response = test_utils.check_netconf_node_request(
513             "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
514         self.assertEqual(response.status_code, requests.codes.ok)
515         res = response.json()
516         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
517                         'administrative-state': 'inService',
518                         'supporting-circuit-pack-name': 'CP1-CFP0',
519                         'supporting-interface': 'XPDR1-NETWORK1-OTU',
520                         'type': 'org-openroadm-interfaces:otnOdu',
521                         'supporting-port': 'CP1-CFP0-P1',
522                         'circuit-id': 'TBD',
523                         'description': 'TBD'}
524         # SAPI/DAPI are added in the Otu4 renderer
525         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
526                         'rate': 'org-openroadm-otn-common-types:ODU4',
527                         'monitoring-mode': 'terminated',
528                         'tx-sapi': 'AMf1n5hK6Xkk',
529                         'tx-dapi': 'H/OelLynehI=',
530                         'expected-sapi': 'H/OelLynehI=',
531                         'expected-dapi': 'AMf1n5hK6Xkk'
532                         }
533         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
534                              res['interface'][0])
535         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
536                                   **input_dict_2),
537                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
538                              )
539         self.assertDictEqual(
540             {'payload-type': '21', 'exp-payload-type': '21'},
541             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
542
543     def test_27_check_otn_topo_links(self):
544         response = test_utils.get_otn_topo_request()
545         self.assertEqual(response.status_code, requests.codes.ok)
546         res = response.json()
547         nb_links = len(res['network'][0]['ietf-network-topology:link'])
548         self.assertEqual(nb_links, 4)
549         for link in res['network'][0]['ietf-network-topology:link']:
550             linkId = link['link-id']
551             if (linkId in ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
552                            'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
553                 self.assertEqual(
554                     link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
555                 self.assertEqual(
556                     link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
557             elif (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
558                              'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
559                 self.assertEqual(
560                     link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
561                 self.assertEqual(
562                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
563                 self.assertEqual(
564                     link['transportpce-topology:otn-link-type'], 'ODTU4')
565                 self.assertEqual(
566                     link['org-openroadm-common-network:link-type'], 'OTN-LINK')
567                 self.assertIn(link['org-openroadm-common-network:opposite-link'],
568                               ['ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
569                                'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
570             else:
571                 self.fail("this link should not exist")
572
573     def test_28_check_otn_topo_tp(self):
574         response = test_utils.get_otn_topo_request()
575         res = response.json()
576         for node in res['network'][0]['node']:
577             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
578                 tpList = node['ietf-network-topology:termination-point']
579                 for tp in tpList:
580                     if tp['tp-id'] == 'XPDR1-NETWORK1':
581                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
582                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
583                         self.assertEqual(
584                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
585                         self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
586                                          'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
587
588 # test service-create for 10GE service from spdr to spdr
589     def test_29_create_10GE_service(self):
590         self.cr_serv_sample_data["input"]["service-name"] = "service1-10GE"
591         self.cr_serv_sample_data["input"]["connection-type"] = "service"
592         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "10"
593         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
594         del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
595         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
596         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
597         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "10"
598         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
599         del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
600         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
601         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
602         response = test_utils.service_create_request(self.cr_serv_sample_data)
603         self.assertEqual(response.status_code, requests.codes.ok)
604         res = response.json()
605         self.assertIn('PCE calculation in progress',
606                       res['output']['configuration-response-common']['response-message'])
607         time.sleep(self.WAITING)
608
609     def test_30_get_10GE_service1(self):
610         response = test_utils.get_service_list_request(
611             "services/service1-10GE")
612         self.assertEqual(response.status_code, requests.codes.ok)
613         res = response.json()
614         self.assertEqual(
615             res['services'][0]['administrative-state'], 'inService')
616         self.assertEqual(
617             res['services'][0]['service-name'], 'service1-10GE')
618         self.assertEqual(
619             res['services'][0]['connection-type'], 'service')
620         self.assertEqual(
621             res['services'][0]['lifecycle-state'], 'planned')
622         time.sleep(2)
623
624     def test_31_check_interface_10GE_CLIENT_spdra(self):
625         response = test_utils.check_netconf_node_request(
626             "SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
627         self.assertEqual(response.status_code, requests.codes.ok)
628         res = response.json()
629         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
630                       'administrative-state': 'inService',
631                       'supporting-circuit-pack-name': 'CP1-SFP4',
632                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
633                       'supporting-port': 'CP1-SFP4-P1'
634                       }
635         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
636                              res['interface'][0])
637         self.assertDictEqual(
638             {'speed': 10000},
639             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
640
641     def test_32_check_interface_ODU2E_CLIENT_spdra(self):
642         response = test_utils.check_netconf_node_request(
643             "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
644         self.assertEqual(response.status_code, requests.codes.ok)
645         res = response.json()
646         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
647                         'administrative-state': 'inService',
648                         'supporting-circuit-pack-name': 'CP1-SFP4',
649                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
650                         'type': 'org-openroadm-interfaces:otnOdu',
651                         'supporting-port': 'CP1-SFP4-P1'}
652         input_dict_2 = {
653             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
654             'rate': 'org-openroadm-otn-common-types:ODU2e',
655             'monitoring-mode': 'terminated'}
656
657         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
658                              res['interface'][0])
659         self.assertDictEqual(dict(input_dict_2,
660                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
661                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
662         self.assertDictEqual(
663             {'payload-type': '03', 'exp-payload-type': '03'},
664             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
665
666     def test_33_check_interface_ODU2E_NETWORK_spdra(self):
667         response = test_utils.check_netconf_node_request(
668             "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
669         self.assertEqual(response.status_code, requests.codes.ok)
670         res = response.json()
671         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
672                         'administrative-state': 'inService',
673                         'supporting-circuit-pack-name': 'CP1-CFP0',
674                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
675                         'type': 'org-openroadm-interfaces:otnOdu',
676                         'supporting-port': 'CP1-CFP0-P1'}
677         input_dict_2 = {
678             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
679             'rate': 'org-openroadm-otn-common-types:ODU2e',
680             'monitoring-mode': 'monitored'}
681         input_dict_3 = {'trib-port-number': 1}
682
683         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
684                              res['interface'][0])
685         self.assertDictEqual(dict(input_dict_2,
686                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
687                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
688         self.assertDictEqual(dict(input_dict_3,
689                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
690                                       'parent-odu-allocation']),
691                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
692         self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
693                       ['trib-slots'])
694
695     def test_34_check_ODU2E_connection_spdra(self):
696         response = test_utils.check_netconf_node_request(
697             "SPDR-SA1",
698             "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
699         self.assertEqual(response.status_code, requests.codes.ok)
700         res = response.json()
701         input_dict_1 = {
702             'connection-name':
703             'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
704             'direction': 'bidirectional'
705         }
706
707         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
708                              res['odu-connection'][0])
709         self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1-10GE'},
710                              res['odu-connection'][0]['destination'])
711         self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e-service1-10GE'},
712                              res['odu-connection'][0]['source'])
713
714     def test_35_check_interface_10GE_CLIENT_spdrc(self):
715         response = test_utils.check_netconf_node_request(
716             "SPDR-SC1", "interface/XPDR1-CLIENT1-ETHERNET10G")
717         self.assertEqual(response.status_code, requests.codes.ok)
718         res = response.json()
719         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
720                       'administrative-state': 'inService',
721                       'supporting-circuit-pack-name': 'CP1-SFP4',
722                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
723                       'supporting-port': 'CP1-SFP4-P1'
724                       }
725         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
726                              res['interface'][0])
727         self.assertDictEqual(
728             {'speed': 10000},
729             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
730
731     def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
732         response = test_utils.check_netconf_node_request(
733             "SPDR-SC1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
734         self.assertEqual(response.status_code, requests.codes.ok)
735         res = response.json()
736         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
737                         'administrative-state': 'inService',
738                         'supporting-circuit-pack-name': 'CP1-SFP4',
739                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
740                         'type': 'org-openroadm-interfaces:otnOdu',
741                         'supporting-port': 'CP1-SFP4-P1'}
742         input_dict_2 = {
743             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
744             'rate': 'org-openroadm-otn-common-types:ODU2e',
745             'monitoring-mode': 'terminated'}
746
747         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
748                              res['interface'][0])
749         self.assertDictEqual(dict(input_dict_2,
750                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
751                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
752         self.assertDictEqual(
753             {'payload-type': '03', 'exp-payload-type': '03'},
754             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
755
756     def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
757         response = test_utils.check_netconf_node_request(
758             "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
759         self.assertEqual(response.status_code, requests.codes.ok)
760         res = response.json()
761         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
762                         'administrative-state': 'inService',
763                         'supporting-circuit-pack-name': 'CP1-CFP0',
764                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
765                         'type': 'org-openroadm-interfaces:otnOdu',
766                         'supporting-port': 'CP1-CFP0-P1'}
767         input_dict_2 = {
768             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
769             'rate': 'org-openroadm-otn-common-types:ODU2e',
770             'monitoring-mode': 'monitored'}
771
772         input_dict_3 = {'trib-port-number': 1}
773
774         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
775                              res['interface'][0])
776         self.assertDictEqual(dict(input_dict_2,
777                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
778                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
779         self.assertDictEqual(dict(input_dict_3,
780                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
781                                       'parent-odu-allocation']),
782                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
783             'parent-odu-allocation'])
784         self.assertIn(1,
785                       res['interface'][0][
786                           'org-openroadm-otn-odu-interfaces:odu'][
787                           'parent-odu-allocation']['trib-slots'])
788
789     def test_38_check_ODU2E_connection_spdrc(self):
790         response = test_utils.check_netconf_node_request(
791             "SPDR-SC1",
792             "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
793         self.assertEqual(response.status_code, requests.codes.ok)
794         res = response.json()
795         input_dict_1 = {
796             'connection-name':
797             'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
798             'direction': 'bidirectional'
799         }
800
801         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
802                              res['odu-connection'][0])
803         self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1-10GE'},
804                              res['odu-connection'][0]['destination'])
805         self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e-service1-10GE'},
806                              res['odu-connection'][0]['source'])
807
808     def test_39_check_otn_topo_links(self):
809         response = test_utils.get_otn_topo_request()
810         self.assertEqual(response.status_code, requests.codes.ok)
811         res = response.json()
812         nb_links = len(res['network'][0]['ietf-network-topology:link'])
813         self.assertEqual(nb_links, 4)
814         for link in res['network'][0]['ietf-network-topology:link']:
815             linkId = link['link-id']
816             if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
817                            'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
818                 self.assertEqual(
819                     link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
820                 self.assertEqual(
821                     link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
822
823     def test_40_check_otn_topo_tp(self):
824         response = test_utils.get_otn_topo_request()
825         res = response.json()
826         for node in res['network'][0]['node']:
827             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
828                 tpList = node['ietf-network-topology:termination-point']
829                 for tp in tpList:
830                     if tp['tp-id'] == 'XPDR1-NETWORK1':
831                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
832                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
833                         tsPoolList = list(range(1, 9))
834                         self.assertNotIn(
835                             tsPoolList, xpdrTpPortConAt['ts-pool'])
836                         self.assertEqual(
837                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
838                         self.assertNotIn(
839                             1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
840
841     def test_41_delete_10GE_service(self):
842         response = test_utils.service_delete_request("service1-10GE")
843         self.assertEqual(response.status_code, requests.codes.ok)
844         res = response.json()
845         self.assertIn('Renderer service delete in progress',
846                       res['output']['configuration-response-common']['response-message'])
847         time.sleep(self.WAITING)
848
849     def test_42_check_service_list(self):
850         response = test_utils.get_service_list_request("")
851         self.assertEqual(response.status_code, requests.codes.ok)
852         res = response.json()
853         self.assertEqual(len(res['service-list']['services']), 2)
854         time.sleep(2)
855
856     def test_43_check_no_ODU2e_connection_spdra(self):
857         response = test_utils.check_netconf_node_request("SPDR-SA1", "")
858         self.assertEqual(response.status_code, requests.codes.ok)
859         res = response.json()
860         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
861         time.sleep(1)
862
863     def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
864         response = test_utils.check_netconf_node_request(
865             "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
866         self.assertEqual(response.status_code, requests.codes.conflict)
867
868     def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
869         response = test_utils.check_netconf_node_request(
870             "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
871         self.assertEqual(response.status_code, requests.codes.conflict)
872
873     def test_46_check_no_interface_10GE_CLIENT_spdra(self):
874         response = test_utils.check_netconf_node_request(
875             "SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
876         self.assertEqual(response.status_code, requests.codes.conflict)
877
878     def test_47_check_otn_topo_links(self):
879         response = test_utils.get_otn_topo_request()
880         self.assertEqual(response.status_code, requests.codes.ok)
881         res = response.json()
882         nb_links = len(res['network'][0]['ietf-network-topology:link'])
883         self.assertEqual(nb_links, 4)
884         for link in res['network'][0]['ietf-network-topology:link']:
885             linkId = link['link-id']
886             if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
887                            'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
888                 self.assertEqual(
889                     link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
890                 self.assertEqual(
891                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
892
893     def test_48_check_otn_topo_tp(self):
894         response = test_utils.get_otn_topo_request()
895         res = response.json()
896         for node in res['network'][0]['node']:
897             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
898                 tpList = node['ietf-network-topology:termination-point']
899                 for tp in tpList:
900                     if tp['tp-id'] == 'XPDR1-NETWORK1':
901                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
902                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
903                         self.assertEqual(
904                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
905
906     def test_49_delete_ODU4_service(self):
907         response = test_utils.service_delete_request("service1-ODU4")
908         self.assertEqual(response.status_code, requests.codes.ok)
909         res = response.json()
910         self.assertIn('Renderer service delete in progress',
911                       res['output']['configuration-response-common']['response-message'])
912         time.sleep(self.WAITING)
913
914     def test_50_check_service_list(self):
915         response = test_utils.get_service_list_request("")
916         self.assertEqual(response.status_code, requests.codes.ok)
917         res = response.json()
918         self.assertEqual(len(res['service-list']['services']), 1)
919         time.sleep(2)
920
921     def test_51_check_no_interface_ODU4_spdra(self):
922         response = test_utils.check_netconf_node_request(
923             "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
924         self.assertEqual(response.status_code, requests.codes.conflict)
925
926     def test_52_check_otn_topo_links(self):
927         self.test_22_check_otn_topo_otu4_links()
928
929     def test_53_check_otn_topo_tp(self):
930         response = test_utils.get_otn_topo_request()
931         res = response.json()
932         for node in res['network'][0]['node']:
933             if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
934                 tpList = node['ietf-network-topology:termination-point']
935                 for tp in tpList:
936                     if tp['tp-id'] == 'XPDR1-NETWORK1':
937                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
938                         self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
939                         self.assertNotIn(
940                             'odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
941
942     def test_54_delete_OCH_OTU4_service(self):
943         response = test_utils.service_delete_request("service1-OCH-OTU4")
944         self.assertEqual(response.status_code, requests.codes.ok)
945         res = response.json()
946         self.assertIn('Renderer service delete in progress',
947                       res['output']['configuration-response-common']['response-message'])
948         time.sleep(self.WAITING)
949
950     def test_55_get_no_service(self):
951         response = test_utils.get_service_list_request("")
952         self.assertEqual(response.status_code, requests.codes.conflict)
953         res = response.json()
954         self.assertIn(
955             {"error-type": "application", "error-tag": "data-missing",
956              "error-message": "Request could not be completed because the relevant data model content does not exist"},
957             res['errors']['error'])
958         time.sleep(1)
959
960     def test_56_check_no_interface_OTU4_spdra(self):
961         response = test_utils.check_netconf_node_request(
962             "SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
963         self.assertEqual(response.status_code, requests.codes.conflict)
964
965     def test_57_check_no_interface_OCH_spdra(self):
966         response = test_utils.check_netconf_node_request(
967             "SPDR-SA1", "interface/XPDR1-NETWORK1-1")
968         self.assertEqual(response.status_code, requests.codes.conflict)
969
970     def test_58_getLinks_OtnTopology(self):
971         response = test_utils.get_otn_topo_request()
972         self.assertEqual(response.status_code, requests.codes.ok)
973         res = response.json()
974         self.assertNotIn('ietf-network-topology:link', res['network'][0])
975
976     def test_59_check_openroadm_topo_spdra(self):
977         response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
978         self.assertEqual(response.status_code, requests.codes.ok)
979         res = response.json()
980         tp = res['node'][0]['ietf-network-topology:termination-point'][0]
981         self.assertEqual('XPDR1-NETWORK1', tp['tp-id'])
982         self.assertNotIn('wavelength', dict.keys(
983             tp['org-openroadm-network-topology:xpdr-network-attributes']))
984         time.sleep(3)
985
986     def test_60_check_openroadm_topo_ROADMA_SRG(self):
987         response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
988         self.assertEqual(response.status_code, requests.codes.ok)
989         res = response.json()
990         freq_map = base64.b64decode(
991             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
992         freq_map_array = [int(x) for x in freq_map]
993         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
994         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
995         for ele in liste_tp:
996             if ele['tp-id'] == 'SRG1-PP1-TXRX':
997                 freq_map = base64.b64decode(
998                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
999                 freq_map_array = [int(x) for x in freq_map]
1000                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1001         time.sleep(3)
1002
1003     def test_61_check_openroadm_topo_ROADMA_DEG(self):
1004         response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
1005         self.assertEqual(response.status_code, requests.codes.ok)
1006         res = response.json()
1007         freq_map = base64.b64decode(
1008             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1009         freq_map_array = [int(x) for x in freq_map]
1010         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1011         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1012         for ele in liste_tp:
1013             if ele['tp-id'] == 'DEG2-CTP-TXRX':
1014                 freq_map = base64.b64decode(
1015                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1016                 freq_map_array = [int(x) for x in freq_map]
1017                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1018             if ele['tp-id'] == 'DEG2-TTP-TXRX':
1019                 freq_map = base64.b64decode(
1020                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1021                 freq_map_array = [int(x) for x in freq_map]
1022                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1023         time.sleep(3)
1024
1025     def test_62_connect_sprdA_3_N1_to_roadmA_PP2(self):
1026         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "3", "1",
1027                                                           "ROADM-A1", "1", "SRG1-PP2-TXRX")
1028         self.assertEqual(response.status_code, requests.codes.ok)
1029         res = response.json()
1030         self.assertIn('Xponder Roadm Link created successfully',
1031                       res["output"]["result"])
1032         time.sleep(2)
1033
1034     def test_63_connect_roadmA_PP2_to_spdrA_3_N1(self):
1035         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "3", "1",
1036                                                           "ROADM-A1", "1", "SRG1-PP2-TXRX")
1037         self.assertEqual(response.status_code, requests.codes.ok)
1038         res = response.json()
1039         self.assertIn('Roadm Xponder links created successfully',
1040                       res["output"]["result"])
1041         time.sleep(2)
1042
1043     def test_64_connect_sprdC_3_N1_to_roadmC_PP2(self):
1044         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "3", "1",
1045                                                           "ROADM-C1", "1", "SRG1-PP2-TXRX")
1046         self.assertEqual(response.status_code, requests.codes.ok)
1047         res = response.json()
1048         self.assertIn('Xponder Roadm Link created successfully',
1049                       res["output"]["result"])
1050         time.sleep(2)
1051
1052     def test_65_connect_roadmC_PP2_to_spdrC_3_N1(self):
1053         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "3", "1",
1054                                                           "ROADM-C1", "1", "SRG1-PP2-TXRX")
1055         self.assertEqual(response.status_code, requests.codes.ok)
1056         res = response.json()
1057         self.assertIn('Roadm Xponder links created successfully',
1058                       res["output"]["result"])
1059         time.sleep(2)
1060
1061     def test_66_create_OCH_OTU4_service_2(self):
1062         # pylint: disable=line-too-long
1063         self.cr_serv_sample_data["input"]["service-name"] = "service2-OCH-OTU4"
1064         self.cr_serv_sample_data["input"]["connection-type"] = "infrastructure"
1065         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
1066         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OTU"
1067         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1068         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1069         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1070         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1071         self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1072         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
1073         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OTU"
1074         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1075         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1076         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1077         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1078         self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1079         response = test_utils.service_create_request(self.cr_serv_sample_data)
1080         self.assertEqual(response.status_code, requests.codes.ok)
1081         res = response.json()
1082         self.assertIn('PCE calculation in progress',
1083                       res['output']['configuration-response-common']['response-message'])
1084         time.sleep(self.WAITING)
1085
1086     def test_67_get_OCH_OTU4_service2(self):
1087         response = test_utils.get_service_list_request(
1088             "services/service2-OCH-OTU4")
1089         self.assertEqual(response.status_code, requests.codes.ok)
1090         res = response.json()
1091         self.assertEqual(
1092             res['services'][0]['administrative-state'], 'inService')
1093         self.assertEqual(
1094             res['services'][0]['service-name'], 'service2-OCH-OTU4')
1095         self.assertEqual(
1096             res['services'][0]['connection-type'], 'infrastructure')
1097         self.assertEqual(
1098             res['services'][0]['lifecycle-state'], 'planned')
1099         time.sleep(2)
1100
1101     def test_68_create_ODU4_service_2(self):
1102         # pylint: disable=line-too-long
1103         self.cr_serv_sample_data["input"]["service-name"] = "service2-ODU4"
1104         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
1105         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1106         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1107         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1108         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1109         self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1110         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
1111         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
1112         self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1113         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1114         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1115         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1116         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1117         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
1118
1119         response = test_utils.service_create_request(self.cr_serv_sample_data)
1120         self.assertEqual(response.status_code, requests.codes.ok)
1121         res = response.json()
1122         self.assertIn('PCE calculation in progress',
1123                       res['output']['configuration-response-common']['response-message'])
1124         time.sleep(self.WAITING)
1125
1126     def test_69_get_ODU4_service2(self):
1127         response = test_utils.get_service_list_request(
1128             "services/service2-ODU4")
1129         self.assertEqual(response.status_code, requests.codes.ok)
1130         res = response.json()
1131         self.assertEqual(
1132             res['services'][0]['administrative-state'], 'inService')
1133         self.assertEqual(
1134             res['services'][0]['service-name'], 'service2-ODU4')
1135         self.assertEqual(
1136             res['services'][0]['connection-type'], 'infrastructure')
1137         self.assertEqual(
1138             res['services'][0]['lifecycle-state'], 'planned')
1139         time.sleep(2)
1140
1141     def test_70_create_1GE_service(self):
1142         # pylint: disable=line-too-long
1143         self.cr_serv_sample_data["input"]["service-name"] = "service1-1GE"
1144         self.cr_serv_sample_data["input"]["connection-type"] = "service"
1145         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "1"
1146         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
1147         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "1"
1148         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
1149         del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
1150         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1151         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-CLIENT1"
1152         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1153         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-CLIENT1"
1154         del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
1155         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1156         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-CLIENT1"
1157         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1158         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-CLIENT1"
1159         response = test_utils.service_create_request(self.cr_serv_sample_data)
1160         self.assertEqual(response.status_code, requests.codes.ok)
1161         res = response.json()
1162         self.assertIn('PCE calculation in progress',
1163                       res['output']['configuration-response-common']['response-message'])
1164         time.sleep(self.WAITING)
1165
1166     def test_71_get_1GE_service1(self):
1167         response = test_utils.get_service_list_request("services/service1-1GE")
1168         self.assertEqual(response.status_code, requests.codes.ok)
1169         res = response.json()
1170         self.assertEqual(
1171             res['services'][0]['administrative-state'], 'inService')
1172         self.assertEqual(
1173             res['services'][0]['service-name'], 'service1-1GE')
1174         self.assertEqual(
1175             res['services'][0]['connection-type'], 'service')
1176         self.assertEqual(
1177             res['services'][0]['lifecycle-state'], 'planned')
1178         time.sleep(2)
1179
1180     def test_72_check_interface_1GE_CLIENT_spdra(self):
1181         response = test_utils.check_netconf_node_request(
1182             "SPDR-SA1", "interface/XPDR3-CLIENT1-ETHERNET1G")
1183         self.assertEqual(response.status_code, requests.codes.ok)
1184         res = response.json()
1185         input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1186                       'administrative-state': 'inService',
1187                       'supporting-circuit-pack-name': 'CP1-SFP4',
1188                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
1189                       'supporting-port': 'CP1-SFP4-P1'
1190                       }
1191         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
1192                              res['interface'][0])
1193         self.assertDictEqual(
1194             {'speed': 1000},
1195             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1196
1197     def test_73_check_interface_ODU0_CLIENT_spdra(self):
1198         response = test_utils.check_netconf_node_request(
1199             "SPDR-SA1", "interface/XPDR3-CLIENT1-ODU0-service1-1GE")
1200         self.assertEqual(response.status_code, requests.codes.ok)
1201         res = response.json()
1202         input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0-service1-1GE',
1203                         'administrative-state': 'inService',
1204                         'supporting-circuit-pack-name': 'CP3-SFP1',
1205                         'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1206                         'type': 'org-openroadm-interfaces:otnOdu',
1207                         'supporting-port': 'CP3-SFP1-P1'}
1208         input_dict_2 = {
1209             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1210             'rate': 'org-openroadm-otn-common-types:ODU0',
1211             'monitoring-mode': 'terminated'}
1212
1213         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1214                              res['interface'][0])
1215         self.assertDictEqual(dict(input_dict_2,
1216                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1217                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1218         self.assertDictEqual(
1219             {'payload-type': '07', 'exp-payload-type': '07'},
1220             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1221
1222     def test_74_check_interface_ODU0_NETWORK_spdra(self):
1223         response = test_utils.check_netconf_node_request(
1224             "SPDR-SA1", "interface/XPDR3-NETWORK1-ODU0-service1-1GE")
1225         self.assertEqual(response.status_code, requests.codes.ok)
1226         res = response.json()
1227         input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0-service1-1GE',
1228                         'administrative-state': 'inService',
1229                         'supporting-circuit-pack-name': 'CP3-CFP0',
1230                         'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1231                         'type': 'org-openroadm-interfaces:otnOdu',
1232                         'supporting-port': 'CP3-CFP0-P1'}
1233         input_dict_2 = {
1234             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1235             'rate': 'org-openroadm-otn-common-types:ODU0',
1236             'monitoring-mode': 'monitored'}
1237         input_dict_3 = {'trib-port-number': 1}
1238
1239         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1240                              res['interface'][0])
1241         self.assertDictEqual(dict(input_dict_2,
1242                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1243                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1244         self.assertDictEqual(dict(input_dict_3,
1245                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1246                                       'parent-odu-allocation']),
1247                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
1248         self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1249                       ['trib-slots'])
1250
1251     def test_75_check_ODU0_connection_spdra(self):
1252         response = test_utils.check_netconf_node_request(
1253             "SPDR-SA1",
1254             "odu-connection/XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE")
1255         self.assertEqual(response.status_code, requests.codes.ok)
1256         res = response.json()
1257         input_dict_1 = {
1258             'connection-name':
1259             'XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE',
1260             'direction': 'bidirectional'
1261         }
1262
1263         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1264                              res['odu-connection'][0])
1265         self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0-service1-1GE'},
1266                              res['odu-connection'][0]['destination'])
1267         self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0-service1-1GE'},
1268                              res['odu-connection'][0]['source'])
1269
1270     def test_76_check_interface_1GE_CLIENT_spdrc(self):
1271         response = test_utils.check_netconf_node_request(
1272             "SPDR-SC1", "interface/XPDR3-CLIENT1-ETHERNET1G")
1273         self.assertEqual(response.status_code, requests.codes.ok)
1274         res = response.json()
1275         input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1276                       'administrative-state': 'inService',
1277                       'supporting-circuit-pack-name': 'CP3-SFP1',
1278                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
1279                       'supporting-port': 'CP3-SFP1-P1'
1280                       }
1281         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
1282                              res['interface'][0])
1283         self.assertDictEqual(
1284             {'speed': 1000},
1285             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1286
1287     def test_77_check_interface_ODU0_CLIENT_spdrc(self):
1288         response = test_utils.check_netconf_node_request(
1289             "SPDR-SC1", "interface/XPDR3-CLIENT1-ODU0-service1-1GE")
1290         self.assertEqual(response.status_code, requests.codes.ok)
1291         res = response.json()
1292         input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0-service1-1GE',
1293                         'administrative-state': 'inService',
1294                         'supporting-circuit-pack-name': 'CP3-SFP1',
1295                         'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1296                         'type': 'org-openroadm-interfaces:otnOdu',
1297                         'supporting-port': 'CP3-SFP1-P1'}
1298         input_dict_2 = {
1299             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1300             'rate': 'org-openroadm-otn-common-types:ODU0',
1301             'monitoring-mode': 'terminated'}
1302
1303         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1304                              res['interface'][0])
1305         self.assertDictEqual(dict(input_dict_2,
1306                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1307                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1308         self.assertDictEqual(
1309             {'payload-type': '07', 'exp-payload-type': '07'},
1310             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1311
1312     def test_78_check_interface_ODU0_NETWORK_spdrc(self):
1313         response = test_utils.check_netconf_node_request(
1314             "SPDR-SC1", "interface/XPDR3-NETWORK1-ODU0-service1-1GE")
1315         self.assertEqual(response.status_code, requests.codes.ok)
1316         res = response.json()
1317         input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0-service1-1GE',
1318                         'administrative-state': 'inService',
1319                         'supporting-circuit-pack-name': 'CP3-CFP0',
1320                         'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1321                         'type': 'org-openroadm-interfaces:otnOdu',
1322                         'supporting-port': 'CP3-CFP0-P1'}
1323         input_dict_2 = {
1324             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1325             'rate': 'org-openroadm-otn-common-types:ODU0',
1326             'monitoring-mode': 'monitored'}
1327
1328         input_dict_3 = {'trib-port-number': 1}
1329
1330         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1331                              res['interface'][0])
1332         self.assertDictEqual(dict(input_dict_2,
1333                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1334                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1335         self.assertDictEqual(dict(input_dict_3,
1336                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1337                                       'parent-odu-allocation']),
1338                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1339             'parent-odu-allocation'])
1340         self.assertIn(1,
1341                       res['interface'][0][
1342                           'org-openroadm-otn-odu-interfaces:odu'][
1343                           'parent-odu-allocation']['trib-slots'])
1344
1345     def test_79_check_ODU0_connection_spdrc(self):
1346         response = test_utils.check_netconf_node_request(
1347             "SPDR-SC1",
1348             "odu-connection/XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE")
1349         self.assertEqual(response.status_code, requests.codes.ok)
1350         res = response.json()
1351         input_dict_1 = {
1352             'connection-name':
1353             'XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE',
1354             'direction': 'bidirectional'
1355         }
1356
1357         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1358                              res['odu-connection'][0])
1359         self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0-service1-1GE'},
1360                              res['odu-connection'][0]['destination'])
1361         self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0-service1-1GE'},
1362                              res['odu-connection'][0]['source'])
1363
1364     def test_80_check_otn_topo_links(self):
1365         response = test_utils.get_otn_topo_request()
1366         self.assertEqual(response.status_code, requests.codes.ok)
1367         res = response.json()
1368         nb_links = len(res['network'][0]['ietf-network-topology:link'])
1369         self.assertEqual(nb_links, 4)
1370         for link in res['network'][0]['ietf-network-topology:link']:
1371             linkId = link['link-id']
1372             if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1373                            'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1374                 self.assertEqual(
1375                     link['org-openroadm-otn-network-topology:available-bandwidth'], 99000)
1376                 self.assertEqual(
1377                     link['org-openroadm-otn-network-topology:used-bandwidth'], 1000)
1378
1379     def test_81_check_otn_topo_tp(self):
1380         response = test_utils.get_otn_topo_request()
1381         res = response.json()
1382         for node in res['network'][0]['node']:
1383             if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1384                 tpList = node['ietf-network-topology:termination-point']
1385                 for tp in tpList:
1386                     if tp['tp-id'] == 'XPDR3-NETWORK1':
1387                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1388                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 79)
1389                         tsPoolList = list(range(1, 2))
1390                         self.assertNotIn(
1391                             tsPoolList, xpdrTpPortConAt['ts-pool'])
1392                         self.assertEqual(
1393                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
1394                         self.assertNotIn(
1395                             1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
1396
1397     def test_82_delete_1GE_service(self):
1398         response = test_utils.service_delete_request("service1-1GE")
1399         self.assertEqual(response.status_code, requests.codes.ok)
1400         res = response.json()
1401         self.assertIn('Renderer service delete in progress',
1402                       res['output']['configuration-response-common']['response-message'])
1403         time.sleep(self.WAITING)
1404
1405     def test_83_check_service_list(self):
1406         response = test_utils.get_service_list_request("")
1407         self.assertEqual(response.status_code, requests.codes.ok)
1408         res = response.json()
1409         self.assertEqual(len(res['service-list']['services']), 2)
1410         time.sleep(2)
1411
1412     def test_84_check_no_ODU0_connection_spdra(self):
1413         response = test_utils.check_netconf_node_request("SPDR-SA1", "")
1414         self.assertEqual(response.status_code, requests.codes.ok)
1415         res = response.json()
1416         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
1417         time.sleep(1)
1418
1419     def test_85_check_no_interface_ODU0_NETWORK_spdra(self):
1420         response = test_utils.check_netconf_node_request(
1421             "SPDR-SA1", "interface/XPDR3-NETWORK1-ODU0-service1")
1422         self.assertEqual(response.status_code, requests.codes.conflict)
1423
1424     def test_86_check_no_interface_ODU0_CLIENT_spdra(self):
1425         response = test_utils.check_netconf_node_request(
1426             "SPDR-SA1", "interface/XPDR3-CLIENT1-ODU0-service1")
1427         self.assertEqual(response.status_code, requests.codes.conflict)
1428
1429     def test_87_check_no_interface_10GE_CLIENT_spdra(self):
1430         response = test_utils.check_netconf_node_request(
1431             "SPDR-SA1", "interface/XPDR3-CLIENT1-ETHERNET1G")
1432         self.assertEqual(response.status_code, requests.codes.conflict)
1433
1434     def test_88_check_otn_topo_links(self):
1435         response = test_utils.get_otn_topo_request()
1436         self.assertEqual(response.status_code, requests.codes.ok)
1437         res = response.json()
1438         nb_links = len(res['network'][0]['ietf-network-topology:link'])
1439         self.assertEqual(nb_links, 4)
1440         for link in res['network'][0]['ietf-network-topology:link']:
1441             linkId = link['link-id']
1442             if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1443                            'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1444                 self.assertEqual(
1445                     link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
1446                 self.assertEqual(
1447                     link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
1448
1449     def test_89_check_otn_topo_tp(self):
1450         response = test_utils.get_otn_topo_request()
1451         res = response.json()
1452         for node in res['network'][0]['node']:
1453             if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1454                 tpList = node['ietf-network-topology:termination-point']
1455                 for tp in tpList:
1456                     if tp['tp-id'] == 'XPDR3-NETWORK1':
1457                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1458                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
1459                         self.assertEqual(
1460                             len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
1461
1462     def test_90_delete_ODU4_service(self):
1463         response = test_utils.service_delete_request("service2-ODU4")
1464         self.assertEqual(response.status_code, requests.codes.ok)
1465         res = response.json()
1466         self.assertIn('Renderer service delete in progress',
1467                       res['output']['configuration-response-common']['response-message'])
1468         time.sleep(self.WAITING)
1469
1470     def test_91_delete_OCH_OTU4_service(self):
1471         response = test_utils.service_delete_request("service2-OCH-OTU4")
1472         self.assertEqual(response.status_code, requests.codes.ok)
1473         res = response.json()
1474         self.assertIn('Renderer service delete in progress',
1475                       res['output']['configuration-response-common']['response-message'])
1476         time.sleep(self.WAITING)
1477
1478     def test_92_disconnect_xponders_from_roadm(self):
1479         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1480         response = test_utils.get_ordm_topo_request("")
1481         self.assertEqual(response.status_code, requests.codes.ok)
1482         res = response.json()
1483         links = res['network'][0]['ietf-network-topology:link']
1484         for link in links:
1485             if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
1486                 link_name = link["link-id"]
1487                 response = test_utils.delete_request(url+link_name)
1488                 self.assertEqual(response.status_code, requests.codes.ok)
1489
1490     def test_93_check_openroadm_topology(self):
1491         response = test_utils.get_ordm_topo_request("")
1492         self.assertEqual(response.status_code, requests.codes.ok)
1493         res = response.json()
1494         links = res['network'][0]['ietf-network-topology:link']
1495         self.assertEqual(18, len(links), 'Topology should contain 18 links')
1496
1497     def test_94_disconnect_spdrA(self):
1498         response = test_utils.unmount_device("SPDR-SA1")
1499         self.assertEqual(response.status_code, requests.codes.ok,
1500                          test_utils.CODE_SHOULD_BE_200)
1501
1502     def test_95_disconnect_spdrC(self):
1503         response = test_utils.unmount_device("SPDR-SC1")
1504         self.assertEqual(response.status_code, requests.codes.ok,
1505                          test_utils.CODE_SHOULD_BE_200)
1506
1507     def test_96_disconnect_roadmA(self):
1508         response = test_utils.unmount_device("ROADM-A1")
1509         self.assertEqual(response.status_code, requests.codes.ok,
1510                          test_utils.CODE_SHOULD_BE_200)
1511
1512     def test_97_disconnect_roadmC(self):
1513         response = test_utils.unmount_device("ROADM-C1")
1514         self.assertEqual(response.status_code, requests.codes.ok,
1515                          test_utils.CODE_SHOULD_BE_200)
1516
1517
1518 if __name__ == "__main__":
1519     unittest.main(verbosity=2)