Replace tpce-topology yang by existing ordmodels
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test14_otn_switch_end2end.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
15
16 import base64
17 import unittest
18 import time
19 import requests
20 # pylint: disable=wrong-import-order
21 import sys
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils  # nopep8
26
27
28 class TransportPCEtesting(unittest.TestCase):
29
30     processes = None
31     WAITING = 20  # nominal value is 300
32     NODE_VERSION = '2.2.1'
33
34     cr_serv_sample_data = {"input": {
35         "sdnc-request-header": {
36             "request-id": "request-1",
37             "rpc-action": "service-create",
38             "request-system-id": "appname"
39         },
40         "service-name": "service-OCH-OTU4-AB",
41         "common-id": "commonId",
42         "connection-type": "infrastructure",
43         "service-a-end": {
44             "service-rate": "100",
45             "node-id": "SPDR-SA1",
46             "service-format": "OTU",
47             "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
48             "clli": "NodeSA",
49             "tx-direction": [{
50                 "port": {
51                     "port-device-name": "SPDR-SA1-XPDR2",
52                     "port-type": "fixed",
53                     "port-name": "XPDR2-NETWORK1",
54                     "port-rack": "000000.00",
55                     "port-shelf": "Chassis#1"
56                 },
57                 "lgx": {
58                     "lgx-device-name": "Some lgx-device-name",
59                     "lgx-port-name": "Some lgx-port-name",
60                     "lgx-port-rack": "000000.00",
61                     "lgx-port-shelf": "00"
62                 },
63                 "index": 0
64             }],
65             "rx-direction": [{
66                 "port": {
67                     "port-device-name": "SPDR-SA1-XPDR2",
68                     "port-type": "fixed",
69                     "port-name": "XPDR2-NETWORK1",
70                     "port-rack": "000000.00",
71                     "port-shelf": "Chassis#1"
72                 },
73                 "lgx": {
74                     "lgx-device-name": "Some lgx-device-name",
75                     "lgx-port-name": "Some lgx-port-name",
76                     "lgx-port-rack": "000000.00",
77                     "lgx-port-shelf": "00"
78                 },
79                 "index": 0
80             }],
81             "optic-type": "gray"
82         },
83         "service-z-end": {
84             "service-rate": "100",
85             "node-id": "SPDR-SB1",
86             "service-format": "OTU",
87             "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
88             "clli": "NodeSB",
89             "tx-direction": [{
90                 "port": {
91                     "port-device-name": "SPDR-SB1-XPDR2",
92                     "port-type": "fixed",
93                     "port-name": "XPDR2-NETWORK1",
94                     "port-rack": "000000.00",
95                     "port-shelf": "Chassis#1"
96                 },
97                 "lgx": {
98                     "lgx-device-name": "Some lgx-device-name",
99                     "lgx-port-name": "Some lgx-port-name",
100                     "lgx-port-rack": "000000.00",
101                     "lgx-port-shelf": "00"
102                 },
103                 "index": 0
104             }],
105             "rx-direction": [{
106                 "port": {
107                     "port-device-name": "SPDR-SB1-XPDR2",
108                     "port-type": "fixed",
109                     "port-name": "XPDR2-NETWORK1",
110                     "port-rack": "000000.00",
111                     "port-shelf": "Chassis#1"
112                 },
113                 "lgx": {
114                     "lgx-device-name": "Some lgx-device-name",
115                     "lgx-port-name": "Some lgx-port-name",
116                     "lgx-port-rack": "000000.00",
117                     "lgx-port-shelf": "00"
118                 },
119                 "index": 0
120             }],
121             "optic-type": "gray"
122         },
123         "due-date": "2018-06-15T00:00:01Z",
124         "operator-contact": "pw1234"
125     }
126     }
127
128     @classmethod
129     def setUpClass(cls):
130         cls.processes = test_utils.start_tpce()
131         cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
132                                                ('spdrb', cls.NODE_VERSION),
133                                                ('spdrc', cls.NODE_VERSION),
134                                                ('roadma', cls.NODE_VERSION),
135                                                ('roadmb', cls.NODE_VERSION),
136                                                ('roadmc', cls.NODE_VERSION)])
137
138     @classmethod
139     def tearDownClass(cls):
140         # pylint: disable=not-an-iterable
141         for process in cls.processes:
142             test_utils.shutdown_process(process)
143         print("all processes killed")
144
145     def setUp(self):
146         time.sleep(2)
147
148     def test_001_connect_spdrA(self):
149         response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
150         self.assertEqual(response.status_code,
151                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
152
153     def test_002_connect_spdrB(self):
154         response = test_utils.mount_device("SPDR-SB1", ('spdrb', self.NODE_VERSION))
155         self.assertEqual(response.status_code,
156                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
157
158     def test_003_connect_spdrC(self):
159         response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
160         self.assertEqual(response.status_code,
161                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
162
163     def test_004_connect_rdmA(self):
164         response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
165         self.assertEqual(response.status_code,
166                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
167
168     def test_005_connect_rdmB(self):
169         response = test_utils.mount_device("ROADM-B1", ('roadmb', self.NODE_VERSION))
170         self.assertEqual(response.status_code,
171                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
172
173     def test_006_connect_rdmC(self):
174         response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
175         self.assertEqual(response.status_code,
176                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
177
178     def test_007_connect_sprdA_2_N1_to_roadmA_PP3(self):
179         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "2", "1",
180                                                           "ROADM-A1", "1", "SRG1-PP3-TXRX")
181         self.assertEqual(response.status_code, requests.codes.ok)
182         res = response.json()
183         self.assertIn('Xponder Roadm Link created successfully',
184                       res["output"]["result"])
185         time.sleep(2)
186
187     def test_008_connect_roadmA_PP3_to_spdrA_2_N1(self):
188         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "2", "1",
189                                                           "ROADM-A1", "1", "SRG1-PP3-TXRX")
190         self.assertEqual(response.status_code, requests.codes.ok)
191         res = response.json()
192         self.assertIn('Roadm Xponder links created successfully',
193                       res["output"]["result"])
194         time.sleep(2)
195
196     def test_009_connect_sprdC_2_N1_to_roadmC_PP3(self):
197         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "2", "1",
198                                                           "ROADM-C1", "1", "SRG1-PP3-TXRX")
199         self.assertEqual(response.status_code, requests.codes.ok)
200         res = response.json()
201         self.assertIn('Xponder Roadm Link created successfully',
202                       res["output"]["result"])
203         time.sleep(2)
204
205     def test_010_connect_roadmC_PP3_to_spdrC_2_N1(self):
206         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "2", "1",
207                                                           "ROADM-C1", "1", "SRG1-PP3-TXRX")
208         self.assertEqual(response.status_code, requests.codes.ok)
209         res = response.json()
210         self.assertIn('Roadm Xponder links created successfully',
211                       res["output"]["result"])
212         time.sleep(2)
213
214     def test_011_connect_sprdB_2_N1_to_roadmB_PP1(self):
215         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SB1", "2", "1",
216                                                           "ROADM-B1", "1", "SRG1-PP1-TXRX")
217         self.assertEqual(response.status_code, requests.codes.ok)
218         res = response.json()
219         self.assertIn('Xponder Roadm Link created successfully',
220                       res["output"]["result"])
221         time.sleep(2)
222
223     def test_012_connect_roadmB_PP1_to_spdrB_2_N1(self):
224         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SB1", "2", "1",
225                                                           "ROADM-B1", "1", "SRG1-PP1-TXRX")
226         self.assertEqual(response.status_code, requests.codes.ok)
227         res = response.json()
228         self.assertIn('Roadm Xponder links created successfully',
229                       res["output"]["result"])
230         time.sleep(2)
231
232     def test_013_connect_sprdB_2_N2_to_roadmB_PP2(self):
233         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SB1", "2", "2",
234                                                           "ROADM-B1", "1", "SRG1-PP2-TXRX")
235         self.assertEqual(response.status_code, requests.codes.ok)
236         res = response.json()
237         self.assertIn('Xponder Roadm Link created successfully',
238                       res["output"]["result"])
239         time.sleep(2)
240
241     def test_014_connect_roadmB_PP2_to_spdrB_2_N2(self):
242         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SB1", "2", "2",
243                                                           "ROADM-B1", "1", "SRG1-PP2-TXRX")
244         self.assertEqual(response.status_code, requests.codes.ok)
245         res = response.json()
246         self.assertIn('Roadm Xponder links created successfully',
247                       res["output"]["result"])
248         time.sleep(2)
249
250     def test_015_add_omsAttributes_ROADMA_ROADMB(self):
251         # Config ROADMA-ROADMB oms-attributes
252         data = {"span": {
253             "auto-spanloss": "true",
254             "spanloss-base": 11.4,
255             "spanloss-current": 12,
256             "engineered-spanloss": 12.2,
257             "link-concatenation": [{
258                 "SRLG-Id": 0,
259                 "fiber-type": "smf",
260                 "SRLG-length": 100000,
261                 "pmd": 0.5}]}}
262         response = test_utils.add_oms_attr_request(
263             "ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX", data)
264         self.assertEqual(response.status_code, requests.codes.created)
265
266     def test_016_add_omsAttributes_ROADMB_ROADMA(self):
267         # Config ROADMB-ROADMA oms-attributes
268         data = {"span": {
269             "auto-spanloss": "true",
270             "spanloss-base": 11.4,
271             "spanloss-current": 12,
272             "engineered-spanloss": 12.2,
273             "link-concatenation": [{
274                 "SRLG-Id": 0,
275                 "fiber-type": "smf",
276                 "SRLG-length": 100000,
277                 "pmd": 0.5}]}}
278         response = test_utils.add_oms_attr_request(
279             "ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX", data)
280         self.assertEqual(response.status_code, requests.codes.created)
281
282     def test_017_add_omsAttributes_ROADMB_ROADMC(self):
283         # Config ROADMB-ROADMC oms-attributes
284         data = {"span": {
285             "auto-spanloss": "true",
286             "spanloss-base": 11.4,
287             "spanloss-current": 12,
288             "engineered-spanloss": 12.2,
289             "link-concatenation": [{
290                 "SRLG-Id": 0,
291                 "fiber-type": "smf",
292                 "SRLG-length": 100000,
293                 "pmd": 0.5}]}}
294         response = test_utils.add_oms_attr_request(
295             "ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX", data)
296         self.assertEqual(response.status_code, requests.codes.created)
297
298     def test_018_add_omsAttributes_ROADMC_ROADMB(self):
299         # Config ROADMC-ROADMB oms-attributes
300         data = {"span": {
301             "auto-spanloss": "true",
302             "spanloss-base": 11.4,
303             "spanloss-current": 12,
304             "engineered-spanloss": 12.2,
305             "link-concatenation": [{
306                 "SRLG-Id": 0,
307                 "fiber-type": "smf",
308                 "SRLG-length": 100000,
309                 "pmd": 0.5}]}}
310         response = test_utils.add_oms_attr_request(
311             "ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX", data)
312         self.assertEqual(response.status_code, requests.codes.created)
313
314     def test_019_create_OTS_ROADMA_DEG1(self):
315         response = test_utils.create_ots_oms_request("ROADM-A1", "DEG1-TTP-TXRX")
316         time.sleep(10)
317         self.assertEqual(response.status_code, requests.codes.ok)
318         res = response.json()
319         self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-A1',
320                       res["output"]["result"])
321
322     def test_020_create_OTS_ROADMB_DEG1(self):
323         response = test_utils.create_ots_oms_request("ROADM-B1", "DEG1-TTP-TXRX")
324         time.sleep(10)
325         self.assertEqual(response.status_code, requests.codes.ok)
326         res = response.json()
327         self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-B1',
328                       res["output"]["result"])
329
330     def test_021_create_OTS_ROADMB_DEG2(self):
331         response = test_utils.create_ots_oms_request("ROADM-B1", "DEG2-TTP-TXRX")
332         time.sleep(10)
333         self.assertEqual(response.status_code, requests.codes.ok)
334         res = response.json()
335         self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-B1',
336                       res["output"]["result"])
337
338     def test_022_create_OTS_ROADMC_DEG2(self):
339         response = test_utils.create_ots_oms_request("ROADM-C1", "DEG2-TTP-TXRX")
340         time.sleep(10)
341         self.assertEqual(response.status_code, requests.codes.ok)
342         res = response.json()
343         self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-C1',
344                       res["output"]["result"])
345
346     def test_023_calculate_span_loss_base_all(self):
347         url = "{}/operations/transportpce-olm:calculate-spanloss-base"
348         data = {
349             "input": {
350                 "src-type": "all"
351             }
352         }
353         response = test_utils.post_request(url, data)
354         self.assertEqual(response.status_code, requests.codes.ok)
355         res = response.json()
356         self.assertIn('Success',
357                       res["output"]["result"])
358         self.assertIn({
359             "spanloss": "25.7",
360             "link-id": "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX"
361         }, res["output"]["spans"])
362         self.assertIn({
363             "spanloss": "17.6",
364             "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
365         }, res["output"]["spans"])
366         self.assertIn({
367             "spanloss": "23.6",
368             "link-id": "ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX"
369         }, res["output"]["spans"])
370         self.assertIn({
371             "spanloss": "23.6",
372             "link-id": "ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX"
373         }, res["output"]["spans"])
374         self.assertIn({
375             "spanloss": "25.7",
376             "link-id": "ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX"
377         }, res["output"]["spans"])
378         self.assertIn({
379             "spanloss": "17.6",
380             "link-id": "ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX"
381         }, res["output"]["spans"])
382         time.sleep(5)
383
384     def test_024_check_otn_topology(self):
385         response = test_utils.get_otn_topo_request()
386         self.assertEqual(response.status_code, requests.codes.ok)
387         res = response.json()
388         nbNode = len(res['network'][0]['node'])
389         self.assertEqual(nbNode, 9, 'There should be 9 nodes')
390         self.assertNotIn('ietf-network-topology:link', res['network'][0],
391                          'otn-topology should have no link')
392
393 # test service-create for OCH-OTU4 service from spdrA to spdrB
394     def test_025_create_OCH_OTU4_service_AB(self):
395         response = test_utils.service_create_request(self.cr_serv_sample_data)
396         self.assertEqual(response.status_code, requests.codes.ok)
397         res = response.json()
398         self.assertIn('PCE calculation in progress',
399                       res['output']['configuration-response-common']['response-message'])
400         time.sleep(self.WAITING)
401
402     def test_026_get_OCH_OTU4_service_AB(self):
403         response = test_utils.get_service_list_request(
404             "services/service-OCH-OTU4-AB")
405         self.assertEqual(response.status_code, requests.codes.ok)
406         res = response.json()
407         self.assertEqual(
408             res['services'][0]['administrative-state'], 'inService')
409         self.assertEqual(
410             res['services'][0]['service-name'], 'service-OCH-OTU4-AB')
411         self.assertEqual(
412             res['services'][0]['connection-type'], 'infrastructure')
413         self.assertEqual(
414             res['services'][0]['lifecycle-state'], 'planned')
415         time.sleep(2)
416
417 # Check correct configuration of devices
418     def test_027_check_interface_och_spdra(self):
419         response = test_utils.check_netconf_node_request(
420             "SPDR-SA1", "interface/XPDR2-NETWORK1-761:768")
421         self.assertEqual(response.status_code, requests.codes.ok)
422         res = response.json()
423         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
424                                    'administrative-state': 'inService',
425                                    'supporting-circuit-pack-name': 'CP5-CFP',
426                                    'type': 'org-openroadm-interfaces:opticalChannel',
427                                    'supporting-port': 'CP5-CFP-P1'
428                                    }, **res['interface'][0]),
429                              res['interface'][0])
430
431         self.assertDictEqual(
432             {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
433              'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
434             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
435
436     def test_028_check_interface_OTU4_spdra(self):
437         response = test_utils.check_netconf_node_request(
438             "SPDR-SA1", "interface/XPDR2-NETWORK1-OTU")
439         self.assertEqual(response.status_code, requests.codes.ok)
440         res = response.json()
441         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
442                         'administrative-state': 'inService',
443                         'supporting-circuit-pack-name': 'CP5-CFP',
444                         'supporting-interface': 'XPDR2-NETWORK1-761:768',
445                         'type': 'org-openroadm-interfaces:otnOtu',
446                         'supporting-port': 'CP5-CFP-P1'
447                         }
448         input_dict_2 = {'tx-sapi': 'AOQxIv+6nCD+',
449                         'expected-dapi': 'AOQxIv+6nCD+',
450                         'tx-dapi': 'X+8cRNi+HbE=',
451                         'expected-sapi': 'X+8cRNi+HbE=',
452                         'rate': 'org-openroadm-otn-common-types:OTU4',
453                         'fec': 'scfec'
454                         }
455         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
456                              res['interface'][0])
457
458         self.assertDictEqual(input_dict_2,
459                              res['interface'][0]
460                              ['org-openroadm-otn-otu-interfaces:otu'])
461         response2 = test_utils.check_netconf_node_request(
462             "SPDR-SB1", "interface/XPDR2-NETWORK1-OTU/otu")
463         self.assertEqual(response2.status_code, requests.codes.ok)
464         res2 = response2.json()['org-openroadm-otn-otu-interfaces:otu']
465         self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
466         self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
467         self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
468         self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
469
470     def test_029_check_interface_och_spdrB(self):
471         response = test_utils.check_netconf_node_request(
472             "SPDR-SB1", "interface/XPDR2-NETWORK1-761:768")
473         self.assertEqual(response.status_code, requests.codes.ok)
474         res = response.json()
475         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
476                                    'administrative-state': 'inService',
477                                    'supporting-circuit-pack-name': 'CP5-CFP',
478                                    'type': 'org-openroadm-interfaces:opticalChannel',
479                                    'supporting-port': 'CP5-CFP-P1'
480                                    }, **res['interface'][0]),
481                              res['interface'][0])
482
483         self.assertDictEqual(
484             {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
485              'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
486             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
487
488     def test_030_check_interface_OTU4_spdrB(self):
489         response = test_utils.check_netconf_node_request(
490             "SPDR-SB1", "interface/XPDR2-NETWORK1-OTU")
491         self.assertEqual(response.status_code, requests.codes.ok)
492         res = response.json()
493         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
494                         'administrative-state': 'inService',
495                         'supporting-circuit-pack-name': 'CP5-CFP',
496                         'supporting-interface': 'XPDR2-NETWORK1-761:768',
497                         'type': 'org-openroadm-interfaces:otnOtu',
498                         'supporting-port': 'CP5-CFP-P1'
499                         }
500         input_dict_2 = {'tx-dapi': 'AOQxIv+6nCD+',
501                         'expected-sapi': 'AOQxIv+6nCD+',
502                         'tx-sapi': 'X+8cRNi+HbE=',
503                         'expected-dapi': 'X+8cRNi+HbE=',
504                         'rate': 'org-openroadm-otn-common-types:OTU4',
505                         'fec': 'scfec'
506                         }
507
508         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
509                              res['interface'][0])
510
511         self.assertDictEqual(input_dict_2,
512                              res['interface'][0]
513                              ['org-openroadm-otn-otu-interfaces:otu'])
514         response2 = test_utils.check_netconf_node_request(
515             "SPDR-SA1", "interface/XPDR2-NETWORK1-OTU/otu")
516         self.assertEqual(response.status_code, requests.codes.ok)
517         res2 = response2.json()['org-openroadm-otn-otu-interfaces:otu']
518         self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
519         self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
520         self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
521         self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
522
523     def test_031_check_no_interface_ODU4_spdra(self):
524         response = test_utils.check_netconf_node_request(
525             "SPDR-SA1", "interface/XPDR2-NETWORK1-ODU4")
526         self.assertEqual(response.status_code, requests.codes.conflict)
527         res = response.json()
528         self.assertIn(
529             {"error-type": "application", "error-tag": "data-missing",
530              "error-message": "Request could not be completed because the relevant data model content does not exist"},
531             res['errors']['error'])
532
533     def test_032_check_openroadm_topo_spdra(self):
534         response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR2")
535         self.assertEqual(response.status_code, requests.codes.ok)
536         res = response.json()
537         ele = res['node'][0]['ietf-network-topology:termination-point'][0]
538         self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
539         self.assertEqual({'frequency': 196.1,
540                           'width': 40},
541                          ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
542         self.assertEqual('ROADM-A1-SRG1--SRG1-PP3-TXRX',
543                          ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
544         time.sleep(3)
545
546     def test_033_check_openroadm_topo_ROADMA_SRG(self):
547         response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
548         self.assertEqual(response.status_code, requests.codes.ok)
549         res = response.json()
550         freq_map = base64.b64decode(
551             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
552         freq_map_array = [int(x) for x in freq_map]
553         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
554         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
555         for ele in liste_tp:
556             if ele['tp-id'] == 'SRG1-PP3-TXRX':
557                 freq_map = base64.b64decode(
558                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
559                 freq_map_array = [int(x) for x in freq_map]
560                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
561             if ele['tp-id'] == 'SRG1-PP2-TXRX':
562                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
563         time.sleep(3)
564
565     def test_034_check_openroadm_topo_ROADMA_DEG1(self):
566         response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG1")
567         self.assertEqual(response.status_code, requests.codes.ok)
568         res = response.json()
569         freq_map = base64.b64decode(
570             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
571         freq_map_array = [int(x) for x in freq_map]
572         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
573         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
574         for ele in liste_tp:
575             if ele['tp-id'] == 'DEG1-CTP-TXRX':
576                 freq_map = base64.b64decode(
577                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
578                 freq_map_array = [int(x) for x in freq_map]
579                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
580             if ele['tp-id'] == 'DEG1-TTP-TXRX':
581                 freq_map = base64.b64decode(
582                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
583                 freq_map_array = [int(x) for x in freq_map]
584                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
585         time.sleep(3)
586
587     def test_035_check_otn_topo_otu4_links(self):
588         response = test_utils.get_otn_topo_request()
589         self.assertEqual(response.status_code, requests.codes.ok)
590         res = response.json()
591         nb_links = len(res['network'][0]['ietf-network-topology:link'])
592         self.assertEqual(nb_links, 2)
593         listLinkId = ['OTU4-SPDR-SA1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
594                       'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR2-XPDR2-NETWORK1']
595         for link in res['network'][0]['ietf-network-topology:link']:
596             self.assertIn(link['link-id'], listLinkId)
597             self.assertEqual(
598                 link['transportpce-networkutils:otn-link-type'], 'OTU4')
599             self.assertEqual(
600                 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
601             self.assertEqual(
602                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
603             self.assertEqual(
604                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
605             self.assertIn(
606                 link['org-openroadm-common-network:opposite-link'], listLinkId)
607
608
609 # test service-create for OCH-OTU4 service from spdrB to spdrC
610
611     def test_036_create_OCH_OTU4_service_BC(self):
612         # pylint: disable=line-too-long
613         self.cr_serv_sample_data["input"]["service-name"] = "service-OCH-OTU4-BC"
614         self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "SPDR-SB1"
615         self.cr_serv_sample_data["input"]["service-a-end"]["clli"] = "NodeSB"
616         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SB1-XPDR2"
617         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-NETWORK2"
618         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SB1-XPDR2"
619         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-NETWORK2"
620         self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SC1"
621         self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSC"
622         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
623         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-NETWORK1"
624         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
625         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-NETWORK1"
626
627         response = test_utils.service_create_request(self.cr_serv_sample_data)
628         self.assertEqual(response.status_code, requests.codes.ok)
629         res = response.json()
630         self.assertIn('PCE calculation in progress',
631                       res['output']['configuration-response-common']['response-message'])
632         time.sleep(self.WAITING)
633
634     def test_037_get_OCH_OTU4_service_BC(self):
635         response = test_utils.get_service_list_request(
636             "services/service-OCH-OTU4-BC")
637         self.assertEqual(response.status_code, requests.codes.ok)
638         res = response.json()
639         self.assertEqual(
640             res['services'][0]['administrative-state'], 'inService')
641         self.assertEqual(
642             res['services'][0]['service-name'], 'service-OCH-OTU4-BC')
643         self.assertEqual(
644             res['services'][0]['connection-type'], 'infrastructure')
645         self.assertEqual(
646             res['services'][0]['lifecycle-state'], 'planned')
647         time.sleep(2)
648
649 # Check correct configuration of devices
650     def test_038_check_interface_och_spdrB(self):
651         response = test_utils.check_netconf_node_request(
652             "SPDR-SB1", "interface/XPDR2-NETWORK2-753:760")
653         self.assertEqual(response.status_code, requests.codes.ok)
654         res = response.json()
655         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-753:760',
656                                    'administrative-state': 'inService',
657                                    'supporting-circuit-pack-name': 'CP6-CFP',
658                                    'type': 'org-openroadm-interfaces:opticalChannel',
659                                    'supporting-port': 'CP1-CFP0-P1'
660                                    }, **res['interface'][0]),
661                              res['interface'][0])
662
663         self.assertDictEqual(
664             {'frequency': 196.05, 'rate': 'org-openroadm-common-types:R100G',
665              'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
666             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
667
668     def test_039_check_interface_OTU4_spdrB(self):
669         response = test_utils.check_netconf_node_request(
670             "SPDR-SB1", "interface/XPDR2-NETWORK2-OTU")
671         self.assertEqual(response.status_code, requests.codes.ok)
672         res = response.json()
673         input_dict_1 = {'name': 'XPDR2-NETWORK2-OTU',
674                         'administrative-state': 'inService',
675                         'supporting-circuit-pack-name': 'CP6-CFP',
676                         'supporting-interface': 'XPDR2-NETWORK1-753:760',
677                         'type': 'org-openroadm-interfaces:otnOtu',
678                         'supporting-port': 'CP6-CFP-P1'
679                         }
680         input_dict_2 = {'tx-sapi': 'X+8cRNi+HbI=',
681                         'expected-dapi': 'X+8cRNi+HbI=',
682                         'tx-dapi': 'ALvne1QI5jo4',
683                         'expected-sapi': 'ALvne1QI5jo4',
684                         'rate': 'org-openroadm-otn-common-types:OTU4',
685                         'fec': 'scfec'
686                         }
687         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
688                              res['interface'][0])
689
690         self.assertDictEqual(input_dict_2,
691                              res['interface'][0]
692                              ['org-openroadm-otn-otu-interfaces:otu'])
693         response2 = test_utils.check_netconf_node_request(
694             "SPDR-SC1", "interface/XPDR2-NETWORK1-OTU/otu")
695         self.assertEqual(response.status_code, requests.codes.ok)
696         res2 = response2.json()['org-openroadm-otn-otu-interfaces:otu']
697         self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
698         self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
699         self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
700         self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
701
702     def test_040_check_interface_och_spdrC(self):
703         response = test_utils.check_netconf_node_request(
704             "SPDR-SC1", "interface/XPDR2-NETWORK1-753:760")
705         self.assertEqual(response.status_code, requests.codes.ok)
706         res = response.json()
707         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
708                                    'administrative-state': 'inService',
709                                    'supporting-circuit-pack-name': 'CP5-CFP',
710                                    'type': 'org-openroadm-interfaces:opticalChannel',
711                                    'supporting-port': 'CP5-CFP-P1'
712                                    }, **res['interface'][0]),
713                              res['interface'][0])
714
715         self.assertDictEqual(
716             {'frequency': 196.05, 'rate': 'org-openroadm-common-types:R100G',
717              'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
718             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
719
720     def test_041_check_interface_OTU4_spdrC(self):
721         response = test_utils.check_netconf_node_request(
722             "SPDR-SC1", "interface/XPDR2-NETWORK1-OTU")
723         self.assertEqual(response.status_code, requests.codes.ok)
724         res = response.json()
725         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
726                         'administrative-state': 'inService',
727                         'supporting-circuit-pack-name': 'CP5-CFP',
728                         'supporting-interface': 'XPDR2-NETWORK1-753:760',
729                         'type': 'org-openroadm-interfaces:otnOtu',
730                         'supporting-port': 'CP5-CFP-P1'
731                         }
732         input_dict_2 = {'tx-dapi': 'X+8cRNi+HbI=',
733                         'expected-sapi': 'X+8cRNi+HbI=',
734                         'tx-sapi': 'ALvne1QI5jo4',
735                         'expected-dapi': 'ALvne1QI5jo4',
736                         'rate': 'org-openroadm-otn-common-types:OTU4',
737                         'fec': 'scfec'
738                         }
739
740         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
741                              res['interface'][0])
742
743         self.assertDictEqual(input_dict_2,
744                              res['interface'][0]
745                              ['org-openroadm-otn-otu-interfaces:otu'])
746         response2 = test_utils.check_netconf_node_request(
747             "SPDR-SB1", "interface/XPDR2-NETWORK2-OTU/otu")
748         self.assertEqual(response.status_code, requests.codes.ok)
749         res2 = response2.json()['org-openroadm-otn-otu-interfaces:otu']
750         self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
751         self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
752         self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
753         self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
754
755     def test_042_check_no_interface_ODU4_spdrB(self):
756         response = test_utils.check_netconf_node_request(
757             "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4")
758         self.assertEqual(response.status_code, requests.codes.conflict)
759         res = response.json()
760         self.assertIn(
761             {"error-type": "application", "error-tag": "data-missing",
762              "error-message": "Request could not be completed because the relevant data model content does not exist"},
763             res['errors']['error'])
764
765     def test_043_check_openroadm_topo_spdrB(self):
766         response = test_utils.get_ordm_topo_request("node/SPDR-SB1-XPDR2")
767         self.assertEqual(response.status_code, requests.codes.ok)
768         res = response.json()
769         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
770         # pylint: disable=consider-using-f-string
771         for ele in liste_tp:
772             if ele['tp-id'] == 'XPDR2-NETWORK1':
773                 self.assertEqual({'frequency': 196.1,
774                                   'width': 40},
775                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
776                 self.assertEqual('ROADM-B1-SRG1--SRG1-PP1-TXRX',
777                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
778             elif ele['tp-id'] == 'XPDR2-NETWORK2':
779                 self.assertEqual({'frequency': 196.05,
780                                   'width': 40},
781                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
782                 self.assertEqual('ROADM-B1-SRG1--SRG1-PP2-TXRX',
783                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
784             else:
785                 print("ele = {}".format(ele))
786                 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
787         time.sleep(3)
788
789     def test_044_check_openroadm_topo_ROADMB_SRG1(self):
790         response = test_utils.get_ordm_topo_request("node/ROADM-B1-SRG1")
791         self.assertEqual(response.status_code, requests.codes.ok)
792         res = response.json()
793         freq_map = base64.b64decode(
794             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
795         freq_map_array = [int(x) for x in freq_map]
796         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
797         self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
798         self.assertEqual(freq_map_array[93], 255, "Lambda 3 should be available")
799         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
800         for ele in liste_tp:
801             if ele['tp-id'] == 'SRG1-PP1-TXRX':
802                 freq_map = base64.b64decode(
803                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
804                 freq_map_array = [int(x) for x in freq_map]
805                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
806             if ele['tp-id'] == 'SRG1-PP2-TXRX':
807                 freq_map = base64.b64decode(
808                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
809                 freq_map_array = [int(x) for x in freq_map]
810                 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
811             if ele['tp-id'] == 'SRG1-PP3-TXRX':
812                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
813         time.sleep(3)
814
815     def test_045_check_openroadm_topo_ROADMB_DEG2(self):
816         response = test_utils.get_ordm_topo_request("node/ROADM-B1-DEG2")
817         self.assertEqual(response.status_code, requests.codes.ok)
818         res = response.json()
819         freq_map = base64.b64decode(
820             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
821         freq_map_array = [int(x) for x in freq_map]
822         self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
823         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
824         for ele in liste_tp:
825             if ele['tp-id'] == 'DEG2-CTP-TXRX':
826                 freq_map = base64.b64decode(
827                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
828                 freq_map_array = [int(x) for x in freq_map]
829                 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
830             if ele['tp-id'] == 'DEG2-TTP-TXRX':
831                 freq_map = base64.b64decode(
832                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
833                 freq_map_array = [int(x) for x in freq_map]
834                 self.assertEqual(freq_map_array[94], 0, "Lambda 1 should not be available")
835         time.sleep(3)
836
837     def test_046_check_otn_topo_otu4_links(self):
838         response = test_utils.get_otn_topo_request()
839         self.assertEqual(response.status_code, requests.codes.ok)
840         res = response.json()
841         nb_links = len(res['network'][0]['ietf-network-topology:link'])
842         self.assertEqual(nb_links, 4)
843         listLinkId = ['OTU4-SPDR-SA1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
844                       'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR2-XPDR2-NETWORK1',
845                       'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK2toSPDR-SC1-XPDR2-XPDR2-NETWORK1',
846                       'OTU4-SPDR-SC1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK2']
847         for link in res['network'][0]['ietf-network-topology:link']:
848             self.assertIn(link['link-id'], listLinkId)
849             self.assertEqual(
850                 link['transportpce-networkutils:otn-link-type'], 'OTU4')
851             self.assertEqual(
852                 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
853             self.assertEqual(
854                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
855             self.assertEqual(
856                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
857             self.assertIn(
858                 link['org-openroadm-common-network:opposite-link'], listLinkId)
859
860 # test service-create for 100GE service from spdrA to spdrC via spdrB
861     def test_047_create_100GE_service_ABC(self):
862         # pylint: disable=line-too-long
863         self.cr_serv_sample_data["input"]["service-name"] = "service-100GE-ABC"
864         self.cr_serv_sample_data["input"]["connection-type"] = "service"
865         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
866         self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "SPDR-SA1"
867         self.cr_serv_sample_data["input"]["service-a-end"]["clli"] = "NodeSA"
868         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
869         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
870         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
871         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
872         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
873         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
874         self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SC1"
875         self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSC"
876         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
877         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
878         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
879         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
880         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
881
882         response = test_utils.service_create_request(self.cr_serv_sample_data)
883         self.assertEqual(response.status_code, requests.codes.ok)
884         res = response.json()
885         self.assertIn('PCE calculation in progress',
886                       res['output']['configuration-response-common']['response-message'])
887         time.sleep(self.WAITING)
888
889     def test_048_get_100GE_service_ABC(self):
890         response = test_utils.get_service_list_request(
891             "services/service-100GE-ABC")
892         self.assertEqual(response.status_code, requests.codes.ok)
893         res = response.json()
894         self.assertEqual(
895             res['services'][0]['administrative-state'], 'inService')
896         self.assertEqual(
897             res['services'][0]['service-name'], 'service-100GE-ABC')
898         self.assertEqual(
899             res['services'][0]['connection-type'], 'service')
900         self.assertEqual(
901             res['services'][0]['lifecycle-state'], 'planned')
902         time.sleep(2)
903
904     def test_049_check_interface_100GE_CLIENT_spdra(self):
905         response = test_utils.check_netconf_node_request(
906             "SPDR-SA1", "interface/XPDR2-CLIENT1-ETHERNET")
907         self.assertEqual(response.status_code, requests.codes.ok)
908         res = response.json()
909         input_dict = {'name': 'XPDR2-CLIENT1-ETHERNET',
910                       'administrative-state': 'inService',
911                       'supporting-circuit-pack-name': 'CP2-QSFP1',
912                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
913                       'supporting-port': 'CP2-QSFP1-P1'
914                       }
915         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
916                              res['interface'][0])
917         self.assertDictEqual(
918             {'speed': 100000,
919              'fec': 'off'},
920             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
921
922     def test_050_check_interface_ODU4_CLIENT_spdra(self):
923         response = test_utils.check_netconf_node_request(
924             "SPDR-SA1", "interface/XPDR2-CLIENT1-ODU4")
925         self.assertEqual(response.status_code, requests.codes.ok)
926         res = response.json()
927         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
928                         'administrative-state': 'inService',
929                         'supporting-circuit-pack-name': 'CP2-QSFP1',
930                         'supporting-interface': 'XPDR2-CLIENT1-ETHERNET',
931                         'type': 'org-openroadm-interfaces:otnOdu',
932                         'supporting-port': 'CP2-QSFP1-P1'}
933         input_dict_2 = {
934             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
935             'rate': 'org-openroadm-otn-common-types:ODU4',
936             'monitoring-mode': 'terminated',
937             'expected-dapi': 'AItaZ6nmyaKJ',
938             'expected-sapi': 'AKFnJJaijWiz',
939             'tx-dapi': 'AKFnJJaijWiz',
940             'tx-sapi': 'AItaZ6nmyaKJ'}
941
942         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
943                              res['interface'][0])
944         self.assertDictEqual(dict(input_dict_2,
945                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
946                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
947         self.assertDictEqual(
948             {'payload-type': '21', 'exp-payload-type': '21'},
949             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
950         response2 = test_utils.check_netconf_node_request(
951             "SPDR-SC1", "interface/XPDR2-CLIENT1-ODU4")
952         self.assertEqual(response.status_code, requests.codes.ok)
953         res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
954         self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
955         self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
956         self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
957         self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
958
959     def test_051_check_interface_ODU4_NETWORK_spdra(self):
960         response = test_utils.check_netconf_node_request(
961             "SPDR-SA1", "interface/XPDR2-NETWORK1-ODU4")
962         self.assertEqual(response.status_code, requests.codes.ok)
963         res = response.json()
964         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
965                         'administrative-state': 'inService',
966                         'supporting-circuit-pack-name': 'CP5-CFP',
967                         'type': 'org-openroadm-interfaces:otnOdu',
968                         'supporting-port': 'CP5-CFP-P1',
969                         'circuit-id': 'TBD',
970                         'description': 'TBD'}
971         input_dict_2 = {
972             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
973             'rate': 'org-openroadm-otn-common-types:ODU4',
974             'monitoring-mode': 'monitored'}
975
976         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
977                              res['interface'][0])
978         self.assertDictEqual(dict(input_dict_2,
979                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
980                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
981         self.assertNotIn('opu',
982                          dict.keys(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
983
984     def test_052_check_ODU4_connection_spdra(self):
985         response = test_utils.check_netconf_node_request(
986             "SPDR-SA1",
987             "odu-connection/XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4")
988         self.assertEqual(response.status_code, requests.codes.ok)
989         res = response.json()
990         input_dict_1 = {
991             'connection-name':
992             'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
993             'direction': 'bidirectional'
994         }
995
996         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
997                              res['odu-connection'][0])
998         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4'},
999                              res['odu-connection'][0]['destination'])
1000         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4'},
1001                              res['odu-connection'][0]['source'])
1002
1003     def test_053_check_interface_100GE_CLIENT_spdrc(self):
1004         response = test_utils.check_netconf_node_request(
1005             "SPDR-SC1", "interface/XPDR2-CLIENT1-ETHERNET")
1006         self.assertEqual(response.status_code, requests.codes.ok)
1007         res = response.json()
1008         input_dict = {'name': 'XPDR2-CLIENT1-ETHERNET',
1009                       'administrative-state': 'inService',
1010                       'supporting-circuit-pack-name': 'CP2-QSFP1',
1011                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
1012                       'supporting-port': 'CP2-QSFP1-P1'
1013                       }
1014         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
1015                              res['interface'][0])
1016         self.assertDictEqual(
1017             {'speed': 100000,
1018              'fec': 'off'},
1019             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1020
1021     def test_054_check_interface_ODU4_CLIENT_spdrc(self):
1022         response = test_utils.check_netconf_node_request(
1023             "SPDR-SC1", "interface/XPDR2-CLIENT1-ODU4")
1024         self.assertEqual(response.status_code, requests.codes.ok)
1025         res = response.json()
1026         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
1027                         'administrative-state': 'inService',
1028                         'supporting-circuit-pack-name': 'CP2-QSFP1',
1029                         'supporting-interface': 'XPDR2-CLIENT1-ETHERNET',
1030                         'type': 'org-openroadm-interfaces:otnOdu',
1031                         'supporting-port': 'CP2-QSFP1-P1',
1032                         'circuit-id': 'TBD',
1033                         'description': 'TBD'}
1034         input_dict_2 = {
1035             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
1036             'rate': 'org-openroadm-otn-common-types:ODU4',
1037             'monitoring-mode': 'terminated',
1038             'expected-dapi': 'AKFnJJaijWiz',
1039             'expected-sapi': 'AItaZ6nmyaKJ',
1040             'tx-dapi': 'AItaZ6nmyaKJ',
1041             'tx-sapi': 'AKFnJJaijWiz'}
1042
1043         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1044                              res['interface'][0])
1045         self.assertDictEqual(dict(input_dict_2,
1046                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1047                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1048         self.assertDictEqual(
1049             {'payload-type': '21', 'exp-payload-type': '21'},
1050             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1051         response2 = test_utils.check_netconf_node_request(
1052             "SPDR-SA1", "interface/XPDR2-CLIENT1-ODU4")
1053         self.assertEqual(response.status_code, requests.codes.ok)
1054         res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
1055         self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
1056         self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
1057         self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
1058         self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
1059
1060     def test_055_check_interface_ODU4_NETWORK_spdrc(self):
1061         response = test_utils.check_netconf_node_request(
1062             "SPDR-SC1", "interface/XPDR2-NETWORK1-ODU4")
1063         self.assertEqual(response.status_code, requests.codes.ok)
1064         res = response.json()
1065         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
1066                         'administrative-state': 'inService',
1067                         'supporting-circuit-pack-name': 'CP5-CFP',
1068                         'type': 'org-openroadm-interfaces:otnOdu',
1069                         'supporting-port': 'CP5-CFP-P1',
1070                         'circuit-id': 'TBD',
1071                         'description': 'TBD'}
1072         input_dict_2 = {
1073             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1074             'rate': 'org-openroadm-otn-common-types:ODU4',
1075             'monitoring-mode': 'monitored'}
1076
1077         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1078                              res['interface'][0])
1079         self.assertDictEqual(dict(input_dict_2,
1080                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1081                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1082         self.assertNotIn('opu',
1083                          dict.keys(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
1084
1085     def test_056_check_ODU4_connection_spdrc(self):
1086         response = test_utils.check_netconf_node_request(
1087             "SPDR-SC1",
1088             "odu-connection/XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4")
1089         self.assertEqual(response.status_code, requests.codes.ok)
1090         res = response.json()
1091         input_dict_1 = {
1092             'connection-name':
1093             'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
1094             'direction': 'bidirectional'
1095         }
1096
1097         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1098                              res['odu-connection'][0])
1099         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4'},
1100                              res['odu-connection'][0]['destination'])
1101         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4'},
1102                              res['odu-connection'][0]['source'])
1103
1104     def test_057_check_interface_ODU4_NETWORK1_spdrb(self):
1105         response = test_utils.check_netconf_node_request(
1106             "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4")
1107         self.assertEqual(response.status_code, requests.codes.ok)
1108         res = response.json()
1109         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
1110                         'administrative-state': 'inService',
1111                         'supporting-circuit-pack-name': 'CP5-CFP',
1112                         'type': 'org-openroadm-interfaces:otnOdu',
1113                         'supporting-port': 'CP5-CFP-P1'}
1114         input_dict_2 = {
1115             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1116             'rate': 'org-openroadm-otn-common-types:ODU4',
1117             'monitoring-mode': 'monitored'}
1118
1119         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1120                              res['interface'][0])
1121         self.assertDictEqual(dict(input_dict_2,
1122                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1123                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1124         self.assertNotIn('opu',
1125                          dict.keys(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
1126
1127     def test_058_check_interface_ODU4_NETWORK2_spdrb(self):
1128         response = test_utils.check_netconf_node_request(
1129             "SPDR-SB1", "interface/XPDR2-NETWORK2-ODU4")
1130         self.assertEqual(response.status_code, requests.codes.ok)
1131         res = response.json()
1132         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
1133                         'administrative-state': 'inService',
1134                         'supporting-circuit-pack-name': 'CP6-CFP',
1135                         'type': 'org-openroadm-interfaces:otnOdu',
1136                         'supporting-port': 'CP6-CFP-P1'}
1137         input_dict_2 = {
1138             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1139             'rate': 'org-openroadm-otn-common-types:ODU4',
1140             'monitoring-mode': 'monitored'}
1141
1142         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1143                              res['interface'][0])
1144         self.assertDictEqual(dict(input_dict_2,
1145                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1146                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1147         self.assertNotIn('opu',
1148                          dict.keys(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
1149
1150     def test_059_check_ODU4_connection_spdrb(self):
1151         response = test_utils.check_netconf_node_request(
1152             "SPDR-SB1",
1153             "odu-connection/XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4")
1154         self.assertEqual(response.status_code, requests.codes.ok)
1155         res = response.json()
1156         input_dict_1 = {
1157             'connection-name':
1158             'XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4',
1159             'direction': 'bidirectional'
1160         }
1161
1162         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1163                              res['odu-connection'][0])
1164         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK2-ODU4'},
1165                              res['odu-connection'][0]['destination'])
1166         self.assertDictEqual({'src-if': 'XPDR2-NETWORK1-ODU4'},
1167                              res['odu-connection'][0]['source'])
1168
1169     def test_060_check_otn_topo_links(self):
1170         response = test_utils.get_otn_topo_request()
1171         self.assertEqual(response.status_code, requests.codes.ok)
1172         res = response.json()
1173         nb_links = len(res['network'][0]['ietf-network-topology:link'])
1174         self.assertEqual(nb_links, 4)
1175         for link in res['network'][0]['ietf-network-topology:link']:
1176             self.assertEqual(
1177                 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
1178             self.assertEqual(
1179                 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
1180
1181     def test_061_delete_service_100GE_ABC(self):
1182         response = test_utils.service_delete_request("service-100GE-ABC")
1183         self.assertEqual(response.status_code, requests.codes.ok)
1184         res = response.json()
1185         self.assertIn('Renderer service delete in progress',
1186                       res['output']['configuration-response-common']['response-message'])
1187         time.sleep(self.WAITING)
1188
1189     def test_062_check_service_list(self):
1190         response = test_utils.get_service_list_request("")
1191         self.assertEqual(response.status_code, requests.codes.ok)
1192         res = response.json()
1193         self.assertEqual(len(res['service-list']['services']), 2)
1194         time.sleep(2)
1195
1196     def test_063_check_no_ODU4_connection_spdra(self):
1197         response = test_utils.check_netconf_node_request("SPDR-SA1", "")
1198         self.assertEqual(response.status_code, requests.codes.ok)
1199         res = response.json()
1200         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
1201         time.sleep(1)
1202
1203     def test_064_check_no_interface_ODU4_NETWORK_spdra(self):
1204         response = test_utils.check_netconf_node_request(
1205             "SPDR-SA1", "interface/XPDR2-NETWORK1-ODU4")
1206         self.assertEqual(response.status_code, requests.codes.conflict)
1207
1208     def test_065_check_no_interface_ODU4_CLIENT_spdra(self):
1209         response = test_utils.check_netconf_node_request(
1210             "SPDR-SA1", "interface/XPDR2-CLIENT1-ODU4")
1211         self.assertEqual(response.status_code, requests.codes.conflict)
1212
1213     def test_066_check_no_interface_100GE_CLIENT_spdra(self):
1214         response = test_utils.check_netconf_node_request(
1215             "SPDR-SA1", "interface/XPDR2-CLIENT1-ETHERNET")
1216         self.assertEqual(response.status_code, requests.codes.conflict)
1217
1218     def test_067_check_otn_topo_links(self):
1219         self.test_046_check_otn_topo_otu4_links()
1220
1221     def test_068_delete_OCH_OTU4_service_AB(self):
1222         response = test_utils.service_delete_request("service-OCH-OTU4-AB")
1223         self.assertEqual(response.status_code, requests.codes.ok)
1224         res = response.json()
1225         self.assertIn('Renderer service delete in progress',
1226                       res['output']['configuration-response-common']['response-message'])
1227         time.sleep(self.WAITING)
1228
1229     def test_069_delete_OCH_OTU4_service_BC(self):
1230         response = test_utils.service_delete_request("service-OCH-OTU4-BC")
1231         self.assertEqual(response.status_code, requests.codes.ok)
1232         res = response.json()
1233         self.assertIn('Renderer service delete in progress',
1234                       res['output']['configuration-response-common']['response-message'])
1235         time.sleep(self.WAITING)
1236
1237     def test_070_get_no_service(self):
1238         response = test_utils.get_service_list_request("")
1239         self.assertEqual(response.status_code, requests.codes.conflict)
1240         res = response.json()
1241         self.assertIn(
1242             {"error-type": "application", "error-tag": "data-missing",
1243              "error-message": "Request could not be completed because the relevant data model content does not exist"},
1244             res['errors']['error'])
1245         time.sleep(1)
1246
1247     def test_071_check_no_interface_OTU4_spdra(self):
1248         response = test_utils.check_netconf_node_request(
1249             "SPDR-SA1", "interface/XPDR2-NETWORK1-OTU")
1250         self.assertEqual(response.status_code, requests.codes.conflict)
1251
1252     def test_072_check_no_interface_OCH_spdra(self):
1253         response = test_utils.check_netconf_node_request(
1254             "SPDR-SA1", "interface/XPDR2-NETWORK1-761:768")
1255         self.assertEqual(response.status_code, requests.codes.conflict)
1256
1257     def test_073_getLinks_OtnTopology(self):
1258         response = test_utils.get_otn_topo_request()
1259         self.assertEqual(response.status_code, requests.codes.ok)
1260         res = response.json()
1261         self.assertNotIn('ietf-network-topology:link', res['network'][0])
1262
1263     def test_074_check_openroadm_topo_spdra(self):
1264         response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR2")
1265         self.assertEqual(response.status_code, requests.codes.ok)
1266         res = response.json()
1267         tp = res['node'][0]['ietf-network-topology:termination-point'][0]
1268         self.assertEqual('XPDR2-NETWORK1', tp['tp-id'])
1269         self.assertNotIn('wavelength', dict.keys(
1270             tp['org-openroadm-network-topology:xpdr-network-attributes']))
1271         time.sleep(3)
1272
1273     def test_075_check_openroadm_topo_ROADMB_SRG1(self):
1274         response = test_utils.get_ordm_topo_request("node/ROADM-B1-SRG1")
1275         self.assertEqual(response.status_code, requests.codes.ok)
1276         res = response.json()
1277         freq_map = base64.b64decode(
1278             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
1279         freq_map_array = [int(x) for x in freq_map]
1280         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1281         self.assertEqual(freq_map_array[94], 255, "Lambda 1 should be available")
1282         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1283         for ele in liste_tp:
1284             if ele['tp-id'] == 'SRG1-PP1-TXRX':
1285                 freq_map = base64.b64decode(
1286                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1287                 freq_map_array = [int(x) for x in freq_map]
1288                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1289             if ele['tp-id'] == 'SRG1-PP2-TXRX':
1290                 freq_map = base64.b64decode(
1291                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1292                 freq_map_array = [int(x) for x in freq_map]
1293                 self.assertEqual(freq_map_array[94], 255, "Lambda 1 should be available")
1294         time.sleep(3)
1295
1296     def test_076_check_openroadm_topo_ROADMB_DEG1(self):
1297         response = test_utils.get_ordm_topo_request("node/ROADM-B1-DEG1")
1298         self.assertEqual(response.status_code, requests.codes.ok)
1299         res = response.json()
1300         freq_map = base64.b64decode(
1301             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1302         freq_map_array = [int(x) for x in freq_map]
1303         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1304         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1305         for ele in liste_tp:
1306             if ele['tp-id'] == 'DEG1-CTP-TXRX':
1307                 freq_map = base64.b64decode(
1308                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1309                 freq_map_array = [int(x) for x in freq_map]
1310                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1311             if ele['tp-id'] == 'DEG1-TTP-TXRX':
1312                 freq_map = base64.b64decode(
1313                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1314                 freq_map_array = [int(x) for x in freq_map]
1315                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1316         time.sleep(3)
1317
1318     def test_077_check_openroadm_topo_ROADMB_DEG2(self):
1319         response = test_utils.get_ordm_topo_request("node/ROADM-B1-DEG2")
1320         self.assertEqual(response.status_code, requests.codes.ok)
1321         res = response.json()
1322         freq_map = base64.b64decode(
1323             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1324         freq_map_array = [int(x) for x in freq_map]
1325         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1326         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1327         for ele in liste_tp:
1328             if ele['tp-id'] == 'DEG2-CTP-TXRX':
1329                 freq_map = base64.b64decode(
1330                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1331                 freq_map_array = [int(x) for x in freq_map]
1332                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1333             if ele['tp-id'] == 'DEG2-TTP-TXRX':
1334                 freq_map = base64.b64decode(
1335                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1336                 freq_map_array = [int(x) for x in freq_map]
1337                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1338         time.sleep(3)
1339
1340     def test_078_disconnect_xponders_from_roadm(self):
1341         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1342         response = test_utils.get_ordm_topo_request("")
1343         self.assertEqual(response.status_code, requests.codes.ok)
1344         res = response.json()
1345         links = res['network'][0]['ietf-network-topology:link']
1346         for link in links:
1347             if (link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT')
1348                     and ('SPDR-SB1' in link['link-id'] or 'ROADM-B1' in link['link-id'])):
1349                 link_name = link["link-id"]
1350                 response = test_utils.delete_request(url+link_name)
1351                 self.assertEqual(response.status_code, requests.codes.ok)
1352
1353     def test_079_disconnect_spdrB(self):
1354         response = test_utils.unmount_device("SPDR-SB1")
1355         self.assertEqual(response.status_code, requests.codes.ok,
1356                          test_utils.CODE_SHOULD_BE_200)
1357
1358     def test_080_disconnect_roadmB(self):
1359         response = test_utils.unmount_device("ROADM-B1")
1360         self.assertEqual(response.status_code, requests.codes.ok,
1361                          test_utils.CODE_SHOULD_BE_200)
1362
1363     def test_081_remove_roadm_to_roadm_links(self):
1364         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1365         response = test_utils.get_ordm_topo_request("")
1366         self.assertEqual(response.status_code, requests.codes.ok)
1367         res = response.json()
1368         links = res['network'][0]['ietf-network-topology:link']
1369         for link in links:
1370             if (link["org-openroadm-common-network:link-type"] == "ROADM-TO-ROADM"
1371                     and 'ROADM-B1' in link['link-id']):
1372                 link_name = link["link-id"]
1373                 response = test_utils.delete_request(url+link_name)
1374                 self.assertEqual(response.status_code, requests.codes.ok)
1375
1376     def test_082_add_omsAttributes_ROADMA_ROADMC(self):
1377         # Config ROADMA-ROADMC oms-attributes
1378         data = {"span": {
1379             "auto-spanloss": "true",
1380             "spanloss-base": 11.4,
1381             "spanloss-current": 12,
1382             "engineered-spanloss": 12.2,
1383             "link-concatenation": [{
1384                 "SRLG-Id": 0,
1385                 "fiber-type": "smf",
1386                 "SRLG-length": 100000,
1387                 "pmd": 0.5}]}}
1388         response = test_utils.add_oms_attr_request(
1389             "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
1390         self.assertEqual(response.status_code, requests.codes.created)
1391
1392     def test_083_add_omsAttributes_ROADMC_ROADMA(self):
1393         # Config ROADMC-ROADMA oms-attributes
1394         data = {"span": {
1395             "auto-spanloss": "true",
1396             "spanloss-base": 11.4,
1397             "spanloss-current": 12,
1398             "engineered-spanloss": 12.2,
1399             "link-concatenation": [{
1400                 "SRLG-Id": 0,
1401                 "fiber-type": "smf",
1402                 "SRLG-length": 100000,
1403                 "pmd": 0.5}]}}
1404         response = test_utils.add_oms_attr_request(
1405             "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
1406         self.assertEqual(response.status_code, requests.codes.created)
1407
1408     def test_084_create_OCH_OTU4_service_AC(self):
1409         # pylint: disable=line-too-long
1410         self.cr_serv_sample_data["input"]["service-name"] = "service-OCH-OTU4-AC"
1411         self.cr_serv_sample_data["input"]["connection-type"] = "infrastructure"
1412         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
1413         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OTU"
1414         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1415         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-NETWORK1"
1416         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1417         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-NETWORK1"
1418         self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1419         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
1420         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OTU"
1421         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1422         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-NETWORK1"
1423         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1424         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-NETWORK1"
1425         self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1426         response = test_utils.service_create_request(self.cr_serv_sample_data)
1427         self.assertEqual(response.status_code, requests.codes.ok)
1428         res = response.json()
1429         self.assertIn('PCE calculation in progress',
1430                       res['output']['configuration-response-common']['response-message'])
1431         time.sleep(self.WAITING)
1432
1433     def test_085_get_OCH_OTU4_service_AC(self):
1434         response = test_utils.get_service_list_request(
1435             "services/service-OCH-OTU4-AC")
1436         self.assertEqual(response.status_code, requests.codes.ok)
1437         res = response.json()
1438         self.assertEqual(
1439             res['services'][0]['administrative-state'], 'inService')
1440         self.assertEqual(
1441             res['services'][0]['service-name'], 'service-OCH-OTU4-AC')
1442         self.assertEqual(
1443             res['services'][0]['connection-type'], 'infrastructure')
1444         self.assertEqual(
1445             res['services'][0]['lifecycle-state'], 'planned')
1446         time.sleep(2)
1447
# test service-create for 100GE service from spdrA to spdrC
1449     def test_086_create_100GE_service_AC(self):
1450         # pylint: disable=line-too-long
1451         self.cr_serv_sample_data["input"]["service-name"] = "service-100GE-AC"
1452         self.cr_serv_sample_data["input"]["connection-type"] = "service"
1453         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
1454         self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "SPDR-SA1"
1455         self.cr_serv_sample_data["input"]["service-a-end"]["clli"] = "NodeSA"
1456         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
1457         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1458         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
1459         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1460         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
1461         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
1462         self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SC1"
1463         self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSC"
1464         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
1465         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1466         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
1467         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1468         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
1469
1470         response = test_utils.service_create_request(self.cr_serv_sample_data)
1471         self.assertEqual(response.status_code, requests.codes.ok)
1472         res = response.json()
1473         self.assertIn('PCE calculation in progress',
1474                       res['output']['configuration-response-common']['response-message'])
1475         time.sleep(self.WAITING)
1476
1477     def test_087_get_100GE_service_AC(self):
1478         response = test_utils.get_service_list_request("services/service-100GE-AC")
1479         self.assertEqual(response.status_code, requests.codes.ok)
1480         res = response.json()
1481         self.assertEqual(
1482             res['services'][0]['administrative-state'], 'inService')
1483         self.assertEqual(
1484             res['services'][0]['service-name'], 'service-100GE-AC')
1485         self.assertEqual(
1486             res['services'][0]['connection-type'], 'service')
1487         self.assertEqual(
1488             res['services'][0]['lifecycle-state'], 'planned')
1489         time.sleep(2)
1490
1491     def test_088_check_interface_OTU4_spdra(self):
1492         response = test_utils.check_netconf_node_request(
1493             "SPDR-SA1", "interface/XPDR2-NETWORK1-OTU")
1494         self.assertEqual(response.status_code, requests.codes.ok)
1495         res = response.json()
1496         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
1497                         'administrative-state': 'inService',
1498                         'supporting-circuit-pack-name': 'CP5-CFP',
1499                         'supporting-interface': 'XPDR2-NETWORK1-761:768',
1500                         'type': 'org-openroadm-interfaces:otnOtu',
1501                         'supporting-port': 'CP5-CFP-P1'
1502                         }
1503         input_dict_2 = {'tx-sapi': 'AOQxIv+6nCD+',
1504                         'expected-dapi': 'AOQxIv+6nCD+',
1505                         'tx-dapi': 'ALvne1QI5jo4',
1506                         'expected-sapi': 'ALvne1QI5jo4',
1507                         'rate': 'org-openroadm-otn-common-types:OTU4',
1508                         'fec': 'scfec'
1509                         }
1510         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1511                              res['interface'][0])
1512
1513         self.assertDictEqual(input_dict_2,
1514                              res['interface'][0]
1515                              ['org-openroadm-otn-otu-interfaces:otu'])
1516         response2 = test_utils.check_netconf_node_request(
1517             "SPDR-SC1", "interface/XPDR2-NETWORK1-OTU/otu")
1518         self.assertEqual(response.status_code, requests.codes.ok)
1519         res2 = response2.json()['org-openroadm-otn-otu-interfaces:otu']
1520         self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
1521         self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
1522         self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
1523         self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
1524
1525     def test_089_check_interface_OTU4_spdrC(self):
1526         response = test_utils.check_netconf_node_request(
1527             "SPDR-SC1", "interface/XPDR2-NETWORK1-OTU")
1528         self.assertEqual(response.status_code, requests.codes.ok)
1529         res = response.json()
1530         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
1531                         'administrative-state': 'inService',
1532                         'supporting-circuit-pack-name': 'CP5-CFP',
1533                         'supporting-interface': 'XPDR2-NETWORK1-753:760',
1534                         'type': 'org-openroadm-interfaces:otnOtu',
1535                         'supporting-port': 'CP5-CFP-P1'
1536                         }
1537         input_dict_2 = {'tx-dapi': 'AOQxIv+6nCD+',
1538                         'expected-sapi': 'AOQxIv+6nCD+',
1539                         'tx-sapi': 'ALvne1QI5jo4',
1540                         'expected-dapi': 'ALvne1QI5jo4',
1541                         'rate': 'org-openroadm-otn-common-types:OTU4',
1542                         'fec': 'scfec'
1543                         }
1544
1545         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1546                              res['interface'][0])
1547         self.assertDictEqual(input_dict_2,
1548                              res['interface'][0]
1549                              ['org-openroadm-otn-otu-interfaces:otu'])
1550         response2 = test_utils.check_netconf_node_request(
1551             "SPDR-SA1", "interface/XPDR2-NETWORK1-OTU/otu")
1552         self.assertEqual(response.status_code, requests.codes.ok)
1553         res2 = response2.json()['org-openroadm-otn-otu-interfaces:otu']
1554         self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
1555         self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
1556         self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
1557         self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
1558
1559     def test_090_check_configuration_spdra(self):
1560         self.test_049_check_interface_100GE_CLIENT_spdra()
1561         self.test_050_check_interface_ODU4_CLIENT_spdra()
1562         self.test_051_check_interface_ODU4_NETWORK_spdra()
1563         self.test_052_check_ODU4_connection_spdra()
1564
1565     def test_091_check_configuration_spdrc(self):
1566         self.test_053_check_interface_100GE_CLIENT_spdrc()
1567         self.test_054_check_interface_ODU4_CLIENT_spdrc()
1568         self.test_055_check_interface_ODU4_NETWORK_spdrc()
1569         self.test_056_check_ODU4_connection_spdrc()
1570
1571     def test_092_check_otn_topo_links(self):
1572         response = test_utils.get_otn_topo_request()
1573         self.assertEqual(response.status_code, requests.codes.ok)
1574         res = response.json()
1575         nb_links = len(res['network'][0]['ietf-network-topology:link'])
1576         self.assertEqual(nb_links, 2)
1577         for link in res['network'][0]['ietf-network-topology:link']:
1578             self.assertEqual(
1579                 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
1580             self.assertEqual(
1581                 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
1582
1583     def test_093_delete_100GE_service_AC(self):
1584         response = test_utils.service_delete_request("service-100GE-AC")
1585         self.assertEqual(response.status_code, requests.codes.ok)
1586         res = response.json()
1587         self.assertIn('Renderer service delete in progress',
1588                       res['output']['configuration-response-common']['response-message'])
1589         time.sleep(self.WAITING)
1590
1591     def test_094_check_service_list(self):
1592         response = test_utils.get_service_list_request("")
1593         self.assertEqual(response.status_code, requests.codes.ok)
1594         res = response.json()
1595         self.assertEqual(len(res['service-list']['services']), 1)
1596         time.sleep(2)
1597
1598     def test_095_check_configuration_spdra(self):
1599         self.test_063_check_no_ODU4_connection_spdra()
1600         self.test_064_check_no_interface_ODU4_NETWORK_spdra()
1601         self.test_065_check_no_interface_ODU4_CLIENT_spdra()
1602         self.test_066_check_no_interface_100GE_CLIENT_spdra()
1603
1604     def test_096_check_otn_topo_links(self):
1605         response = test_utils.get_otn_topo_request()
1606         self.assertEqual(response.status_code, requests.codes.ok)
1607         res = response.json()
1608         nb_links = len(res['network'][0]['ietf-network-topology:link'])
1609         self.assertEqual(nb_links, 2)
1610         for link in res['network'][0]['ietf-network-topology:link']:
1611             self.assertEqual(
1612                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
1613             self.assertEqual(
1614                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
1615
1616     def test_097_disconnect_xponders_from_roadm(self):
1617         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1618         response = test_utils.get_ordm_topo_request("")
1619         self.assertEqual(response.status_code, requests.codes.ok)
1620         res = response.json()
1621         links = res['network'][0]['ietf-network-topology:link']
1622         for link in links:
1623             if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
1624                 link_name = link["link-id"]
1625                 response = test_utils.delete_request(url+link_name)
1626                 self.assertEqual(response.status_code, requests.codes.ok)
1627
1628     def test_098_disconnect_spdrA(self):
1629         response = test_utils.unmount_device("SPDR-SA1")
1630         self.assertEqual(response.status_code, requests.codes.ok,
1631                          test_utils.CODE_SHOULD_BE_200)
1632
1633     def test_099_disconnect_spdrC(self):
1634         response = test_utils.unmount_device("SPDR-SC1")
1635         self.assertEqual(response.status_code, requests.codes.ok,
1636                          test_utils.CODE_SHOULD_BE_200)
1637
1638     def test_100_disconnect_roadmA(self):
1639         response = test_utils.unmount_device("ROADM-A1")
1640         self.assertEqual(response.status_code, requests.codes.ok,
1641                          test_utils.CODE_SHOULD_BE_200)
1642
1643     def test_101_disconnect_roadmC(self):
1644         response = test_utils.unmount_device("ROADM-C1")
1645         self.assertEqual(response.status_code, requests.codes.ok,
1646                          test_utils.CODE_SHOULD_BE_200)
1647
1648
# Run the test suite with verbose (per-test) output when executed as a script.
if __name__ == "__main__":
    unittest.main(verbosity=2)