Fix some pylint warnings
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test14_otn_switch_end2end.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
15
16 import base64
17 import unittest
18 import time
19 import requests
20 # pylint: disable=wrong-import-order
21 import sys
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils  # nopep8
26
27
28 class TransportPCEtesting(unittest.TestCase):
29
30     processes = None
31     WAITING = 20  # nominal value is 300
32     NODE_VERSION = '2.2.1'
33
34     cr_serv_sample_data = {"input": {
35         "sdnc-request-header": {
36             "request-id": "request-1",
37             "rpc-action": "service-create",
38             "request-system-id": "appname"
39         },
40         "service-name": "service-OCH-OTU4-AB",
41         "common-id": "commonId",
42         "connection-type": "infrastructure",
43         "service-a-end": {
44             "service-rate": "100",
45             "node-id": "SPDR-SA1",
46             "service-format": "OTU",
47             "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
48             "clli": "NodeSA",
49             "subrate-eth-sla": {
50                     "subrate-eth-sla": {
51                         "committed-info-rate": "100000",
52                         "committed-burst-size": "64"
53                     }
54             },
55             "tx-direction": {
56                 "port": {
57                     "port-device-name": "SPDR-SA1-XPDR2",
58                     "port-type": "fixed",
59                     "port-name": "XPDR2-NETWORK1",
60                     "port-rack": "000000.00",
61                     "port-shelf": "Chassis#1"
62                 },
63                 "lgx": {
64                     "lgx-device-name": "Some lgx-device-name",
65                     "lgx-port-name": "Some lgx-port-name",
66                     "lgx-port-rack": "000000.00",
67                     "lgx-port-shelf": "00"
68                 }
69             },
70             "rx-direction": {
71                 "port": {
72                     "port-device-name": "SPDR-SA1-XPDR2",
73                     "port-type": "fixed",
74                     "port-name": "XPDR2-NETWORK1",
75                     "port-rack": "000000.00",
76                     "port-shelf": "Chassis#1"
77                 },
78                 "lgx": {
79                     "lgx-device-name": "Some lgx-device-name",
80                     "lgx-port-name": "Some lgx-port-name",
81                     "lgx-port-rack": "000000.00",
82                     "lgx-port-shelf": "00"
83                 }
84             },
85             "optic-type": "gray"
86         },
87         "service-z-end": {
88             "service-rate": "100",
89             "node-id": "SPDR-SB1",
90             "service-format": "OTU",
91             "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
92             "clli": "NodeSB",
93             "subrate-eth-sla": {
94                     "subrate-eth-sla": {
95                         "committed-info-rate": "100000",
96                         "committed-burst-size": "64"
97                     }
98             },
99             "tx-direction": {
100                 "port": {
101                     "port-device-name": "SPDR-SB1-XPDR2",
102                     "port-type": "fixed",
103                     "port-name": "XPDR2-NETWORK1",
104                     "port-rack": "000000.00",
105                     "port-shelf": "Chassis#1"
106                 },
107                 "lgx": {
108                     "lgx-device-name": "Some lgx-device-name",
109                     "lgx-port-name": "Some lgx-port-name",
110                     "lgx-port-rack": "000000.00",
111                     "lgx-port-shelf": "00"
112                 }
113             },
114             "rx-direction": {
115                 "port": {
116                     "port-device-name": "SPDR-SB1-XPDR2",
117                     "port-type": "fixed",
118                     "port-name": "XPDR2-NETWORK1",
119                     "port-rack": "000000.00",
120                     "port-shelf": "Chassis#1"
121                 },
122                 "lgx": {
123                     "lgx-device-name": "Some lgx-device-name",
124                     "lgx-port-name": "Some lgx-port-name",
125                     "lgx-port-rack": "000000.00",
126                     "lgx-port-shelf": "00"
127                 }
128             },
129             "optic-type": "gray"
130         },
131         "due-date": "2018-06-15T00:00:01Z",
132         "operator-contact": "pw1234"
133     }
134     }
135
136     @classmethod
137     def setUpClass(cls):
138         cls.processes = test_utils.start_tpce()
139         cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
140                                                ('spdrb', cls.NODE_VERSION),
141                                                ('spdrc', cls.NODE_VERSION),
142                                                ('roadma', cls.NODE_VERSION),
143                                                ('roadmb', cls.NODE_VERSION),
144                                                ('roadmc', cls.NODE_VERSION)])
145
146     @classmethod
147     def tearDownClass(cls):
148         # pylint: disable=not-an-iterable
149         for process in cls.processes:
150             test_utils.shutdown_process(process)
151         print("all processes killed")
152
153     def setUp(self):
154         time.sleep(5)
155
156     def test_01_connect_spdrA(self):
157         response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
158         self.assertEqual(response.status_code,
159                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
160
161     def test_02_connect_spdrB(self):
162         response = test_utils.mount_device("SPDR-SB1", ('spdrb', self.NODE_VERSION))
163         self.assertEqual(response.status_code,
164                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
165
166     def test_03_connect_spdrC(self):
167         response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
168         self.assertEqual(response.status_code,
169                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
170
171     def test_04_connect_rdmA(self):
172         response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
173         self.assertEqual(response.status_code,
174                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
175
176     def test_05_connect_rdmB(self):
177         response = test_utils.mount_device("ROADM-B1", ('roadmb', self.NODE_VERSION))
178         self.assertEqual(response.status_code,
179                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
180
181     def test_06_connect_rdmC(self):
182         response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
183         self.assertEqual(response.status_code,
184                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
185
186     def test_07_connect_sprdA_2_N1_to_roadmA_PP3(self):
187         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "2", "1",
188                                                           "ROADM-A1", "1", "SRG1-PP3-TXRX")
189         self.assertEqual(response.status_code, requests.codes.ok)
190         res = response.json()
191         self.assertIn('Xponder Roadm Link created successfully',
192                       res["output"]["result"])
193         time.sleep(2)
194
195     def test_08_connect_roadmA_PP3_to_spdrA_2_N1(self):
196         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "2", "1",
197                                                           "ROADM-A1", "1", "SRG1-PP3-TXRX")
198         self.assertEqual(response.status_code, requests.codes.ok)
199         res = response.json()
200         self.assertIn('Roadm Xponder links created successfully',
201                       res["output"]["result"])
202         time.sleep(2)
203
204     def test_09_connect_sprdC_2_N1_to_roadmC_PP3(self):
205         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "2", "1",
206                                                           "ROADM-C1", "1", "SRG1-PP3-TXRX")
207         self.assertEqual(response.status_code, requests.codes.ok)
208         res = response.json()
209         self.assertIn('Xponder Roadm Link created successfully',
210                       res["output"]["result"])
211         time.sleep(2)
212
213     def test_10_connect_roadmC_PP3_to_spdrC_2_N1(self):
214         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "2", "1",
215                                                           "ROADM-C1", "1", "SRG1-PP3-TXRX")
216         self.assertEqual(response.status_code, requests.codes.ok)
217         res = response.json()
218         self.assertIn('Roadm Xponder links created successfully',
219                       res["output"]["result"])
220         time.sleep(2)
221
222     def test_11_connect_sprdB_2_N1_to_roadmB_PP1(self):
223         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SB1", "2", "1",
224                                                           "ROADM-B1", "1", "SRG1-PP1-TXRX")
225         self.assertEqual(response.status_code, requests.codes.ok)
226         res = response.json()
227         self.assertIn('Xponder Roadm Link created successfully',
228                       res["output"]["result"])
229         time.sleep(2)
230
231     def test_12_connect_roadmB_PP1_to_spdrB_2_N1(self):
232         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SB1", "2", "1",
233                                                           "ROADM-B1", "1", "SRG1-PP1-TXRX")
234         self.assertEqual(response.status_code, requests.codes.ok)
235         res = response.json()
236         self.assertIn('Roadm Xponder links created successfully',
237                       res["output"]["result"])
238         time.sleep(2)
239
240     def test_13_connect_sprdB_2_N2_to_roadmB_PP2(self):
241         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SB1", "2", "2",
242                                                           "ROADM-B1", "1", "SRG1-PP2-TXRX")
243         self.assertEqual(response.status_code, requests.codes.ok)
244         res = response.json()
245         self.assertIn('Xponder Roadm Link created successfully',
246                       res["output"]["result"])
247         time.sleep(2)
248
249     def test_14_connect_roadmB_PP2_to_spdrB_2_N2(self):
250         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SB1", "2", "2",
251                                                           "ROADM-B1", "1", "SRG1-PP2-TXRX")
252         self.assertEqual(response.status_code, requests.codes.ok)
253         res = response.json()
254         self.assertIn('Roadm Xponder links created successfully',
255                       res["output"]["result"])
256         time.sleep(2)
257
258     def test_15_add_omsAttributes_ROADMA_ROADMB(self):
259         # Config ROADMA-ROADMB oms-attributes
260         data = {"span": {
261             "auto-spanloss": "true",
262             "spanloss-base": 11.4,
263             "spanloss-current": 12,
264             "engineered-spanloss": 12.2,
265             "link-concatenation": [{
266                 "SRLG-Id": 0,
267                 "fiber-type": "smf",
268                 "SRLG-length": 100000,
269                 "pmd": 0.5}]}}
270         response = test_utils.add_oms_attr_request(
271             "ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX", data)
272         self.assertEqual(response.status_code, requests.codes.created)
273
274     def test_16_add_omsAttributes_ROADMB_ROADMA(self):
275         # Config ROADMB-ROADMA oms-attributes
276         data = {"span": {
277             "auto-spanloss": "true",
278             "spanloss-base": 11.4,
279             "spanloss-current": 12,
280             "engineered-spanloss": 12.2,
281             "link-concatenation": [{
282                 "SRLG-Id": 0,
283                 "fiber-type": "smf",
284                 "SRLG-length": 100000,
285                 "pmd": 0.5}]}}
286         response = test_utils.add_oms_attr_request(
287             "ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX", data)
288         self.assertEqual(response.status_code, requests.codes.created)
289
290     def test_17_add_omsAttributes_ROADMB_ROADMC(self):
291         # Config ROADMB-ROADMC oms-attributes
292         data = {"span": {
293             "auto-spanloss": "true",
294             "spanloss-base": 11.4,
295             "spanloss-current": 12,
296             "engineered-spanloss": 12.2,
297             "link-concatenation": [{
298                 "SRLG-Id": 0,
299                 "fiber-type": "smf",
300                 "SRLG-length": 100000,
301                 "pmd": 0.5}]}}
302         response = test_utils.add_oms_attr_request(
303             "ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX", data)
304         self.assertEqual(response.status_code, requests.codes.created)
305
306     def test_18_add_omsAttributes_ROADMC_ROADMB(self):
307         # Config ROADMC-ROADMB oms-attributes
308         data = {"span": {
309             "auto-spanloss": "true",
310             "spanloss-base": 11.4,
311             "spanloss-current": 12,
312             "engineered-spanloss": 12.2,
313             "link-concatenation": [{
314                 "SRLG-Id": 0,
315                 "fiber-type": "smf",
316                 "SRLG-length": 100000,
317                 "pmd": 0.5}]}}
318         response = test_utils.add_oms_attr_request(
319             "ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX", data)
320         self.assertEqual(response.status_code, requests.codes.created)
321
322     def test_19_create_OTS_ROADMA_DEG1(self):
323         response = test_utils.create_ots_oms_request("ROADM-A1", "DEG1-TTP-TXRX")
324         time.sleep(10)
325         self.assertEqual(response.status_code, requests.codes.ok)
326         res = response.json()
327         self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-A1',
328                       res["output"]["result"])
329
330     def test_20_create_OTS_ROADMB_DEG1(self):
331         response = test_utils.create_ots_oms_request("ROADM-B1", "DEG1-TTP-TXRX")
332         time.sleep(10)
333         self.assertEqual(response.status_code, requests.codes.ok)
334         res = response.json()
335         self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-B1',
336                       res["output"]["result"])
337
338     def test_21_create_OTS_ROADMB_DEG2(self):
339         response = test_utils.create_ots_oms_request("ROADM-B1", "DEG2-TTP-TXRX")
340         time.sleep(10)
341         self.assertEqual(response.status_code, requests.codes.ok)
342         res = response.json()
343         self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-B1',
344                       res["output"]["result"])
345
346     def test_22_create_OTS_ROADMC_DEG2(self):
347         response = test_utils.create_ots_oms_request("ROADM-C1", "DEG2-TTP-TXRX")
348         time.sleep(10)
349         self.assertEqual(response.status_code, requests.codes.ok)
350         res = response.json()
351         self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-C1',
352                       res["output"]["result"])
353
354     def test_23_calculate_span_loss_base_all(self):
355         url = "{}/operations/transportpce-olm:calculate-spanloss-base"
356         data = {
357             "input": {
358                 "src-type": "all"
359             }
360         }
361         response = test_utils.post_request(url, data)
362         self.assertEqual(response.status_code, requests.codes.ok)
363         res = response.json()
364         self.assertIn('Success',
365                       res["output"]["result"])
366         self.assertIn({
367             "spanloss": "25.7",
368             "link-id": "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX"
369         }, res["output"]["spans"])
370         self.assertIn({
371             "spanloss": "17.6",
372             "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
373         }, res["output"]["spans"])
374         self.assertIn({
375             "spanloss": "23.6",
376             "link-id": "ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX"
377         }, res["output"]["spans"])
378         self.assertIn({
379             "spanloss": "23.6",
380             "link-id": "ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX"
381         }, res["output"]["spans"])
382         self.assertIn({
383             "spanloss": "25.7",
384             "link-id": "ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX"
385         }, res["output"]["spans"])
386         self.assertIn({
387             "spanloss": "17.6",
388             "link-id": "ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX"
389         }, res["output"]["spans"])
390         time.sleep(5)
391
392     def test_24_check_otn_topology(self):
393         response = test_utils.get_otn_topo_request()
394         self.assertEqual(response.status_code, requests.codes.ok)
395         res = response.json()
396         nbNode = len(res['network'][0]['node'])
397         self.assertEqual(nbNode, 9, 'There should be 9 nodes')
398         self.assertNotIn('ietf-network-topology:link', res['network'][0],
399                          'otn-topology should have no link')
400
401 # test service-create for OCH-OTU4 service from spdrA to spdrB
402     def test_25_create_OCH_OTU4_service_AB(self):
403         response = test_utils.service_create_request(self.cr_serv_sample_data)
404         self.assertEqual(response.status_code, requests.codes.ok)
405         res = response.json()
406         self.assertIn('PCE calculation in progress',
407                       res['output']['configuration-response-common']['response-message'])
408         time.sleep(self.WAITING)
409
410     def test_26_get_OCH_OTU4_service_AB(self):
411         response = test_utils.get_service_list_request(
412             "services/service-OCH-OTU4-AB")
413         self.assertEqual(response.status_code, requests.codes.ok)
414         res = response.json()
415         self.assertEqual(
416             res['services'][0]['administrative-state'], 'inService')
417         self.assertEqual(
418             res['services'][0]['service-name'], 'service-OCH-OTU4-AB')
419         self.assertEqual(
420             res['services'][0]['connection-type'], 'infrastructure')
421         self.assertEqual(
422             res['services'][0]['lifecycle-state'], 'planned')
423         time.sleep(2)
424
425 # Check correct configuration of devices
426     def test_27_check_interface_och_spdra(self):
427         response = test_utils.check_netconf_node_request(
428             "SPDR-SA1", "interface/XPDR2-NETWORK1-761:768")
429         self.assertEqual(response.status_code, requests.codes.ok)
430         res = response.json()
431         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
432                                    'administrative-state': 'inService',
433                                    'supporting-circuit-pack-name': 'CP5-CFP',
434                                    'type': 'org-openroadm-interfaces:opticalChannel',
435                                    'supporting-port': 'CP5-CFP-P1'
436                                    }, **res['interface'][0]),
437                              res['interface'][0])
438
439         self.assertDictEqual(
440             {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
441              'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
442             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
443
444     def test_28_check_interface_OTU4_spdra(self):
445         response = test_utils.check_netconf_node_request(
446             "SPDR-SA1", "interface/XPDR2-NETWORK1-OTU")
447         self.assertEqual(response.status_code, requests.codes.ok)
448         res = response.json()
449         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
450                         'administrative-state': 'inService',
451                         'supporting-circuit-pack-name': 'CP5-CFP',
452                         'supporting-interface': 'XPDR2-NETWORK1-761:768',
453                         'type': 'org-openroadm-interfaces:otnOtu',
454                         'supporting-port': 'CP5-CFP-P1'
455                         }
456         input_dict_2 = {'tx-sapi': 'exT821pFtOc=',
457                         'expected-dapi': 'exT821pFtOc=',
458                         'rate': 'org-openroadm-otn-common-types:OTU4',
459                         'fec': 'scfec'
460                         }
461         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
462                              res['interface'][0])
463
464         self.assertDictEqual(input_dict_2,
465                              res['interface'][0]
466                              ['org-openroadm-otn-otu-interfaces:otu'])
467
468     def test_29_check_interface_och_spdrB(self):
469         response = test_utils.check_netconf_node_request(
470             "SPDR-SB1", "interface/XPDR2-NETWORK1-761:768")
471         self.assertEqual(response.status_code, requests.codes.ok)
472         res = response.json()
473         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
474                                    'administrative-state': 'inService',
475                                    'supporting-circuit-pack-name': 'CP5-CFP',
476                                    'type': 'org-openroadm-interfaces:opticalChannel',
477                                    'supporting-port': 'CP5-CFP-P1'
478                                    }, **res['interface'][0]),
479                              res['interface'][0])
480
481         self.assertDictEqual(
482             {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
483              'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
484             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
485
486     def test_30_check_interface_OTU4_spdrB(self):
487         response = test_utils.check_netconf_node_request(
488             "SPDR-SB1", "interface/XPDR2-NETWORK1-OTU")
489         self.assertEqual(response.status_code, requests.codes.ok)
490         res = response.json()
491         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
492                         'administrative-state': 'inService',
493                         'supporting-circuit-pack-name': 'CP5-CFP',
494                         'supporting-interface': 'XPDR2-NETWORK1-761:768',
495                         'type': 'org-openroadm-interfaces:otnOtu',
496                         'supporting-port': 'CP5-CFP-P1'
497                         }
498         input_dict_2 = {'tx-dapi': 'exT821pFtOc=',
499                         'expected-sapi': 'exT821pFtOc=',
500                         'tx-sapi': 'HPQZi9Cb3Aw=',
501                         'expected-dapi': 'HPQZi9Cb3Aw=',
502                         'rate': 'org-openroadm-otn-common-types:OTU4',
503                         'fec': 'scfec'
504                         }
505
506         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
507                              res['interface'][0])
508
509         self.assertDictEqual(input_dict_2,
510                              res['interface'][0]
511                              ['org-openroadm-otn-otu-interfaces:otu'])
512
513     def test_31_check_no_interface_ODU4_spdra(self):
514         response = test_utils.check_netconf_node_request(
515             "SPDR-SA1", "interface/XPDR2-NETWORK1-ODU4")
516         self.assertEqual(response.status_code, requests.codes.conflict)
517         res = response.json()
518         self.assertIn(
519             {"error-type": "application", "error-tag": "data-missing",
520              "error-message": "Request could not be completed because the relevant data model content does not exist"},
521             res['errors']['error'])
522
523     def test_32_check_openroadm_topo_spdra(self):
524         response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR2")
525         self.assertEqual(response.status_code, requests.codes.ok)
526         res = response.json()
527         ele = res['node'][0]['ietf-network-topology:termination-point'][0]
528         self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
529         self.assertEqual({'frequency': 196.1,
530                           'width': 40},
531                          ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
532         self.assertEqual('ROADM-A1-SRG1--SRG1-PP3-TXRX',
533                          ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
534         time.sleep(3)
535
536     def test_33_check_openroadm_topo_ROADMA_SRG(self):
537         response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
538         self.assertEqual(response.status_code, requests.codes.ok)
539         res = response.json()
540         freq_map = base64.b64decode(
541             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
542         freq_map_array = [int(x) for x in freq_map]
543         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
544         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
545         for ele in liste_tp:
546             if ele['tp-id'] == 'SRG1-PP3-TXRX':
547                 freq_map = base64.b64decode(
548                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
549                 freq_map_array = [int(x) for x in freq_map]
550                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
551             if ele['tp-id'] == 'SRG1-PP2-TXRX':
552                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
553         time.sleep(3)
554
555     def test_33_check_openroadm_topo_ROADMA_DEG1(self):
556         response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG1")
557         self.assertEqual(response.status_code, requests.codes.ok)
558         res = response.json()
559         freq_map = base64.b64decode(
560             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
561         freq_map_array = [int(x) for x in freq_map]
562         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
563         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
564         for ele in liste_tp:
565             if ele['tp-id'] == 'DEG1-CTP-TXRX':
566                 freq_map = base64.b64decode(
567                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
568                 freq_map_array = [int(x) for x in freq_map]
569                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
570             if ele['tp-id'] == 'DEG1-TTP-TXRX':
571                 freq_map = base64.b64decode(
572                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
573                 freq_map_array = [int(x) for x in freq_map]
574                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
575         time.sleep(3)
576
577     def test_34_check_otn_topo_otu4_links(self):
578         response = test_utils.get_otn_topo_request()
579         self.assertEqual(response.status_code, requests.codes.ok)
580         res = response.json()
581         nb_links = len(res['network'][0]['ietf-network-topology:link'])
582         self.assertEqual(nb_links, 2)
583         listLinkId = ['OTU4-SPDR-SA1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
584                       'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR2-XPDR2-NETWORK1']
585         for link in res['network'][0]['ietf-network-topology:link']:
586             self.assertIn(link['link-id'], listLinkId)
587             self.assertEqual(
588                 link['transportpce-topology:otn-link-type'], 'OTU4')
589             self.assertEqual(
590                 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
591             self.assertEqual(
592                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
593             self.assertEqual(
594                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
595             self.assertIn(
596                 link['org-openroadm-common-network:opposite-link'], listLinkId)
597
598 # test service-create for OCH-OTU4 service from spdrB to spdrC
599     def test_35_create_OCH_OTU4_service_BC(self):
600         # pylint: disable=line-too-long
601         self.cr_serv_sample_data["input"]["service-name"] = "service-OCH-OTU4-BC"
602         self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "SPDR-SB1"
603         self.cr_serv_sample_data["input"]["service-a-end"]["clli"] = "NodeSB"
604         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SB1-XPDR2"
605         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK2"
606         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SB1-XPDR2"
607         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK2"
608         self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SC1"
609         self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSC"
610         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
611         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
612         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
613         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
614
615         response = test_utils.service_create_request(self.cr_serv_sample_data)
616         self.assertEqual(response.status_code, requests.codes.ok)
617         res = response.json()
618         self.assertIn('PCE calculation in progress',
619                       res['output']['configuration-response-common']['response-message'])
620         time.sleep(self.WAITING)
621
622     def test_36_get_OCH_OTU4_service_BC(self):
623         response = test_utils.get_service_list_request(
624             "services/service-OCH-OTU4-BC")
625         self.assertEqual(response.status_code, requests.codes.ok)
626         res = response.json()
627         self.assertEqual(
628             res['services'][0]['administrative-state'], 'inService')
629         self.assertEqual(
630             res['services'][0]['service-name'], 'service-OCH-OTU4-BC')
631         self.assertEqual(
632             res['services'][0]['connection-type'], 'infrastructure')
633         self.assertEqual(
634             res['services'][0]['lifecycle-state'], 'planned')
635         time.sleep(2)
636
637 # Check correct configuration of devices
638     def test_37_check_interface_och_spdrB(self):
639         response = test_utils.check_netconf_node_request(
640             "SPDR-SB1", "interface/XPDR2-NETWORK2-753:760")
641         self.assertEqual(response.status_code, requests.codes.ok)
642         res = response.json()
643         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-753:760',
644                                    'administrative-state': 'inService',
645                                    'supporting-circuit-pack-name': 'CP6-CFP',
646                                    'type': 'org-openroadm-interfaces:opticalChannel',
647                                    'supporting-port': 'CP1-CFP0-P1'
648                                    }, **res['interface'][0]),
649                              res['interface'][0])
650
651         self.assertDictEqual(
652             {'frequency': 196.05, 'rate': 'org-openroadm-common-types:R100G',
653              'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
654             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
655
656     def test_38_check_interface_OTU4_spdrB(self):
657         response = test_utils.check_netconf_node_request(
658             "SPDR-SB1", "interface/XPDR2-NETWORK2-OTU")
659         self.assertEqual(response.status_code, requests.codes.ok)
660         res = response.json()
661         input_dict_1 = {'name': 'XPDR2-NETWORK2-OTU',
662                         'administrative-state': 'inService',
663                         'supporting-circuit-pack-name': 'CP6-CFP',
664                         'supporting-interface': 'XPDR2-NETWORK1-753:760',
665                         'type': 'org-openroadm-interfaces:otnOtu',
666                         'supporting-port': 'CP6-CFP-P1'
667                         }
668         input_dict_2 = {'tx-sapi': 'HPQZi9Cb3A8=',
669                         'expected-dapi': 'HPQZi9Cb3A8=',
670                         'rate': 'org-openroadm-otn-common-types:OTU4',
671                         'fec': 'scfec'
672                         }
673         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
674                              res['interface'][0])
675
676         self.assertDictEqual(input_dict_2,
677                              res['interface'][0]
678                              ['org-openroadm-otn-otu-interfaces:otu'])
679
680     def test_39_check_interface_och_spdrC(self):
681         response = test_utils.check_netconf_node_request(
682             "SPDR-SC1", "interface/XPDR2-NETWORK1-753:760")
683         self.assertEqual(response.status_code, requests.codes.ok)
684         res = response.json()
685         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
686                                    'administrative-state': 'inService',
687                                    'supporting-circuit-pack-name': 'CP5-CFP',
688                                    'type': 'org-openroadm-interfaces:opticalChannel',
689                                    'supporting-port': 'CP5-CFP-P1'
690                                    }, **res['interface'][0]),
691                              res['interface'][0])
692
693         self.assertDictEqual(
694             {'frequency': 196.05, 'rate': 'org-openroadm-common-types:R100G',
695              'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
696             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
697
698     def test_40_check_interface_OTU4_spdrC(self):
699         response = test_utils.check_netconf_node_request(
700             "SPDR-SC1", "interface/XPDR2-NETWORK1-OTU")
701         self.assertEqual(response.status_code, requests.codes.ok)
702         res = response.json()
703         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
704                         'administrative-state': 'inService',
705                         'supporting-circuit-pack-name': 'CP5-CFP',
706                         'supporting-interface': 'XPDR2-NETWORK1-753:760',
707                         'type': 'org-openroadm-interfaces:otnOtu',
708                         'supporting-port': 'CP5-CFP-P1'
709                         }
710         input_dict_2 = {'tx-dapi': 'HPQZi9Cb3A8=',
711                         'expected-sapi': 'HPQZi9Cb3A8=',
712                         'tx-sapi': 'ALx70DYYfGTx',
713                         'expected-dapi': 'ALx70DYYfGTx',
714                         'rate': 'org-openroadm-otn-common-types:OTU4',
715                         'fec': 'scfec'
716                         }
717
718         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
719                              res['interface'][0])
720
721         self.assertDictEqual(input_dict_2,
722                              res['interface'][0]
723                              ['org-openroadm-otn-otu-interfaces:otu'])
724
725     def test_41_check_no_interface_ODU4_spdrB(self):
726         response = test_utils.check_netconf_node_request(
727             "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4")
728         self.assertEqual(response.status_code, requests.codes.conflict)
729         res = response.json()
730         self.assertIn(
731             {"error-type": "application", "error-tag": "data-missing",
732              "error-message": "Request could not be completed because the relevant data model content does not exist"},
733             res['errors']['error'])
734
735     def test_42_check_openroadm_topo_spdrB(self):
736         response = test_utils.get_ordm_topo_request("node/SPDR-SB1-XPDR2")
737         self.assertEqual(response.status_code, requests.codes.ok)
738         res = response.json()
739         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
740         for ele in liste_tp:
741             if ele['tp-id'] == 'XPDR2-NETWORK1':
742                 self.assertEqual({'frequency': 196.1,
743                                   'width': 40},
744                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
745                 self.assertEqual('ROADM-B1-SRG1--SRG1-PP1-TXRX',
746                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
747             elif ele['tp-id'] == 'XPDR2-NETWORK2':
748                 self.assertEqual({'frequency': 196.05,
749                                   'width': 40},
750                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
751                 self.assertEqual('ROADM-B1-SRG1--SRG1-PP2-TXRX',
752                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
753             else:
754                 print("ele = {}".format(ele))
755                 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
756         time.sleep(3)
757
758     def test_43_check_openroadm_topo_ROADMB_SRG1(self):
759         response = test_utils.get_ordm_topo_request("node/ROADM-B1-SRG1")
760         self.assertEqual(response.status_code, requests.codes.ok)
761         res = response.json()
762         freq_map = base64.b64decode(
763             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
764         freq_map_array = [int(x) for x in freq_map]
765         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
766         self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
767         self.assertEqual(freq_map_array[93], 255, "Lambda 3 should be available")
768         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
769         for ele in liste_tp:
770             if ele['tp-id'] == 'SRG1-PP1-TXRX':
771                 freq_map = base64.b64decode(
772                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
773                 freq_map_array = [int(x) for x in freq_map]
774                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
775             if ele['tp-id'] == 'SRG1-PP2-TXRX':
776                 freq_map = base64.b64decode(
777                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
778                 freq_map_array = [int(x) for x in freq_map]
779                 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
780             if ele['tp-id'] == 'SRG1-PP3-TXRX':
781                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
782         time.sleep(3)
783
784     def test_44_check_openroadm_topo_ROADMB_DEG2(self):
785         response = test_utils.get_ordm_topo_request("node/ROADM-B1-DEG2")
786         self.assertEqual(response.status_code, requests.codes.ok)
787         res = response.json()
788         freq_map = base64.b64decode(
789             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
790         freq_map_array = [int(x) for x in freq_map]
791         self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
792         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
793         for ele in liste_tp:
794             if ele['tp-id'] == 'DEG2-CTP-TXRX':
795                 freq_map = base64.b64decode(
796                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
797                 freq_map_array = [int(x) for x in freq_map]
798                 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
799             if ele['tp-id'] == 'DEG2-TTP-TXRX':
800                 freq_map = base64.b64decode(
801                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
802                 freq_map_array = [int(x) for x in freq_map]
803                 self.assertEqual(freq_map_array[94], 0, "Lambda 1 should not be available")
804         time.sleep(3)
805
806     def test_45_check_otn_topo_otu4_links(self):
807         response = test_utils.get_otn_topo_request()
808         self.assertEqual(response.status_code, requests.codes.ok)
809         res = response.json()
810         nb_links = len(res['network'][0]['ietf-network-topology:link'])
811         self.assertEqual(nb_links, 4)
812         listLinkId = ['OTU4-SPDR-SA1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
813                       'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR2-XPDR2-NETWORK1',
814                       'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK2toSPDR-SC1-XPDR2-XPDR2-NETWORK1',
815                       'OTU4-SPDR-SC1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK2']
816         for link in res['network'][0]['ietf-network-topology:link']:
817             self.assertIn(link['link-id'], listLinkId)
818             self.assertEqual(
819                 link['transportpce-topology:otn-link-type'], 'OTU4')
820             self.assertEqual(
821                 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
822             self.assertEqual(
823                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
824             self.assertEqual(
825                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
826             self.assertIn(
827                 link['org-openroadm-common-network:opposite-link'], listLinkId)
828
829 # test service-create for 100GE service from spdrA to spdrC via spdrB
830     def test_46_create_100GE_service_ABC(self):
831         # pylint: disable=line-too-long
832         self.cr_serv_sample_data["input"]["service-name"] = "service-100GE-ABC"
833         self.cr_serv_sample_data["input"]["connection-type"] = "service"
834         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
835         self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "SPDR-SA1"
836         self.cr_serv_sample_data["input"]["service-a-end"]["clli"] = "NodeSA"
837         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
838         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
839         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
840         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
841         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
842         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
843         self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SC1"
844         self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSC"
845         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
846         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
847         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
848         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
849         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
850
851         response = test_utils.service_create_request(self.cr_serv_sample_data)
852         self.assertEqual(response.status_code, requests.codes.ok)
853         res = response.json()
854         self.assertIn('PCE calculation in progress',
855                       res['output']['configuration-response-common']['response-message'])
856         time.sleep(self.WAITING)
857
858     def test_47_get_100GE_service_ABC(self):
859         response = test_utils.get_service_list_request(
860             "services/service-100GE-ABC")
861         self.assertEqual(response.status_code, requests.codes.ok)
862         res = response.json()
863         self.assertEqual(
864             res['services'][0]['administrative-state'], 'inService')
865         self.assertEqual(
866             res['services'][0]['service-name'], 'service-100GE-ABC')
867         self.assertEqual(
868             res['services'][0]['connection-type'], 'service')
869         self.assertEqual(
870             res['services'][0]['lifecycle-state'], 'planned')
871         time.sleep(2)
872
873     def test_48_check_interface_100GE_CLIENT_spdra(self):
874         response = test_utils.check_netconf_node_request(
875             "SPDR-SA1", "interface/XPDR2-CLIENT1-ETHERNET")
876         self.assertEqual(response.status_code, requests.codes.ok)
877         res = response.json()
878         input_dict = {'name': 'XPDR2-CLIENT1-ETHERNET',
879                       'administrative-state': 'inService',
880                       'supporting-circuit-pack-name': 'CP2-QSFP1',
881                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
882                       'supporting-port': 'CP2-QSFP1-P1'
883                       }
884         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
885                              res['interface'][0])
886         self.assertDictEqual(
887             {'speed': 100000,
888              'fec': 'off'},
889             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
890
891     def test_49_check_interface_ODU4_CLIENT_spdra(self):
892         response = test_utils.check_netconf_node_request(
893             "SPDR-SA1", "interface/XPDR2-CLIENT1-ODU4")
894         self.assertEqual(response.status_code, requests.codes.ok)
895         res = response.json()
896         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
897                         'administrative-state': 'inService',
898                         'supporting-circuit-pack-name': 'CP2-QSFP1',
899                         'supporting-interface': 'XPDR2-CLIENT1-ETHERNET',
900                         'type': 'org-openroadm-interfaces:otnOdu',
901                         'supporting-port': 'CP2-QSFP1-P1'}
902         input_dict_2 = {
903             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
904             'rate': 'org-openroadm-otn-common-types:ODU4',
905             'monitoring-mode': 'terminated'}
906
907         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
908                              res['interface'][0])
909         self.assertDictEqual(dict(input_dict_2,
910                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
911                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
912         self.assertDictEqual(
913             {'payload-type': '07', 'exp-payload-type': '07'},
914             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
915
916     def test_50_check_interface_ODU4_NETWORK_spdra(self):
917         response = test_utils.check_netconf_node_request(
918             "SPDR-SA1", "interface/XPDR2-NETWORK1-ODU4")
919         self.assertEqual(response.status_code, requests.codes.ok)
920         res = response.json()
921         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
922                         'administrative-state': 'inService',
923                         'supporting-circuit-pack-name': 'CP5-CFP',
924                         'type': 'org-openroadm-interfaces:otnOdu',
925                         'supporting-port': 'CP5-CFP-P1'}
926         input_dict_2 = {
927             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
928             'rate': 'org-openroadm-otn-common-types:ODU4',
929             'monitoring-mode': 'monitored'}
930
931         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
932                              res['interface'][0])
933         self.assertDictEqual(dict(input_dict_2,
934                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
935                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
936         self.assertDictEqual(
937             {'payload-type': '07', 'exp-payload-type': '07'},
938             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
939
940     def test_51_check_ODU4_connection_spdra(self):
941         response = test_utils.check_netconf_node_request(
942             "SPDR-SA1",
943             "odu-connection/XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4")
944         self.assertEqual(response.status_code, requests.codes.ok)
945         res = response.json()
946         input_dict_1 = {
947             'connection-name':
948             'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
949             'direction': 'bidirectional'
950         }
951
952         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
953                              res['odu-connection'][0])
954         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4'},
955                              res['odu-connection'][0]['destination'])
956         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4'},
957                              res['odu-connection'][0]['source'])
958
959     def test_52_check_interface_100GE_CLIENT_spdrc(self):
960         response = test_utils.check_netconf_node_request(
961             "SPDR-SC1", "interface/XPDR2-CLIENT1-ETHERNET")
962         self.assertEqual(response.status_code, requests.codes.ok)
963         res = response.json()
964         input_dict = {'name': 'XPDR2-CLIENT1-ETHERNET',
965                       'administrative-state': 'inService',
966                       'supporting-circuit-pack-name': 'CP2-QSFP1',
967                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
968                       'supporting-port': 'CP2-QSFP1-P1'
969                       }
970         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
971                              res['interface'][0])
972         self.assertDictEqual(
973             {'speed': 100000,
974              'fec': 'off'},
975             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
976
977     def test_53_check_interface_ODU4_CLIENT_spdrc(self):
978         response = test_utils.check_netconf_node_request(
979             "SPDR-SC1", "interface/XPDR2-CLIENT1-ODU4")
980         self.assertEqual(response.status_code, requests.codes.ok)
981         res = response.json()
982         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
983                         'administrative-state': 'inService',
984                         'supporting-circuit-pack-name': 'CP2-QSFP1',
985                         'supporting-interface': 'XPDR2-CLIENT1-ETHERNET',
986                         'type': 'org-openroadm-interfaces:otnOdu',
987                         'supporting-port': 'CP2-QSFP1-P1'}
988         input_dict_2 = {
989             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
990             'rate': 'org-openroadm-otn-common-types:ODU4',
991             'monitoring-mode': 'terminated'}
992
993         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
994                              res['interface'][0])
995         self.assertDictEqual(dict(input_dict_2,
996                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
997                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
998         self.assertDictEqual(
999             {'payload-type': '07', 'exp-payload-type': '07'},
1000             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1001
1002     def test_54_check_interface_ODU4_NETWORK_spdrc(self):
1003         response = test_utils.check_netconf_node_request(
1004             "SPDR-SC1", "interface/XPDR2-NETWORK1-ODU4")
1005         self.assertEqual(response.status_code, requests.codes.ok)
1006         res = response.json()
1007         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
1008                         'administrative-state': 'inService',
1009                         'supporting-circuit-pack-name': 'CP5-CFP',
1010                         'type': 'org-openroadm-interfaces:otnOdu',
1011                         'supporting-port': 'CP5-CFP-P1'}
1012         input_dict_2 = {
1013             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1014             'rate': 'org-openroadm-otn-common-types:ODU4',
1015             'monitoring-mode': 'monitored'}
1016
1017         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1018                              res['interface'][0])
1019         self.assertDictEqual(dict(input_dict_2,
1020                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1021                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1022         self.assertDictEqual(
1023             {'payload-type': '07', 'exp-payload-type': '07'},
1024             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1025
1026     def test_55_check_ODU4_connection_spdrc(self):
1027         response = test_utils.check_netconf_node_request(
1028             "SPDR-SC1",
1029             "odu-connection/XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4")
1030         self.assertEqual(response.status_code, requests.codes.ok)
1031         res = response.json()
1032         input_dict_1 = {
1033             'connection-name':
1034             'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
1035             'direction': 'bidirectional'
1036         }
1037
1038         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1039                              res['odu-connection'][0])
1040         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4'},
1041                              res['odu-connection'][0]['destination'])
1042         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4'},
1043                              res['odu-connection'][0]['source'])
1044
1045     def test_56_check_interface_ODU4_NETWORK1_spdrb(self):
1046         response = test_utils.check_netconf_node_request(
1047             "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4")
1048         self.assertEqual(response.status_code, requests.codes.ok)
1049         res = response.json()
1050         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
1051                         'administrative-state': 'inService',
1052                         'supporting-circuit-pack-name': 'CP5-CFP',
1053                         'type': 'org-openroadm-interfaces:otnOdu',
1054                         'supporting-port': 'CP5-CFP-P1'}
1055         input_dict_2 = {
1056             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1057             'rate': 'org-openroadm-otn-common-types:ODU4',
1058             'monitoring-mode': 'monitored'}
1059
1060         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1061                              res['interface'][0])
1062         self.assertDictEqual(dict(input_dict_2,
1063                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1064                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1065         self.assertDictEqual(
1066             {'payload-type': '07', 'exp-payload-type': '07'},
1067             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1068
1069     def test_57_check_interface_ODU4_NETWORK2_spdrb(self):
1070         response = test_utils.check_netconf_node_request(
1071             "SPDR-SB1", "interface/XPDR2-NETWORK2-ODU4")
1072         self.assertEqual(response.status_code, requests.codes.ok)
1073         res = response.json()
1074         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
1075                         'administrative-state': 'inService',
1076                         'supporting-circuit-pack-name': 'CP6-CFP',
1077                         'type': 'org-openroadm-interfaces:otnOdu',
1078                         'supporting-port': 'CP6-CFP-P1'}
1079         input_dict_2 = {
1080             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1081             'rate': 'org-openroadm-otn-common-types:ODU4',
1082             'monitoring-mode': 'monitored'}
1083
1084         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1085                              res['interface'][0])
1086         self.assertDictEqual(dict(input_dict_2,
1087                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1088                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1089         self.assertDictEqual(
1090             {'payload-type': '07', 'exp-payload-type': '07'},
1091             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1092
1093     def test_58_check_ODU4_connection_spdrb(self):
1094         response = test_utils.check_netconf_node_request(
1095             "SPDR-SB1",
1096             "odu-connection/XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4")
1097         self.assertEqual(response.status_code, requests.codes.ok)
1098         res = response.json()
1099         input_dict_1 = {
1100             'connection-name':
1101             'XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4',
1102             'direction': 'bidirectional'
1103         }
1104
1105         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1106                              res['odu-connection'][0])
1107         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK2-ODU4'},
1108                              res['odu-connection'][0]['destination'])
1109         self.assertDictEqual({'src-if': 'XPDR2-NETWORK1-ODU4'},
1110                              res['odu-connection'][0]['source'])
1111
1112     def test_59_check_otn_topo_links(self):
1113         response = test_utils.get_otn_topo_request()
1114         self.assertEqual(response.status_code, requests.codes.ok)
1115         res = response.json()
1116         nb_links = len(res['network'][0]['ietf-network-topology:link'])
1117         self.assertEqual(nb_links, 4)
1118         for link in res['network'][0]['ietf-network-topology:link']:
1119             self.assertEqual(
1120                 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
1121             self.assertEqual(
1122                 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
1123
1124     def test_60_delete_service_100GE_ABC(self):
1125         response = test_utils.service_delete_request("service-100GE-ABC")
1126         self.assertEqual(response.status_code, requests.codes.ok)
1127         res = response.json()
1128         self.assertIn('Renderer service delete in progress',
1129                       res['output']['configuration-response-common']['response-message'])
1130         time.sleep(self.WAITING)
1131
1132     def test_61_check_service_list(self):
1133         response = test_utils.get_service_list_request("")
1134         self.assertEqual(response.status_code, requests.codes.ok)
1135         res = response.json()
1136         self.assertEqual(len(res['service-list']['services']), 2)
1137         time.sleep(2)
1138
1139     def test_62_check_no_ODU4_connection_spdra(self):
1140         response = test_utils.check_netconf_node_request("SPDR-SA1", "")
1141         self.assertEqual(response.status_code, requests.codes.ok)
1142         res = response.json()
1143         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
1144         time.sleep(1)
1145
1146     def test_63_check_no_interface_ODU4_NETWORK_spdra(self):
1147         response = test_utils.check_netconf_node_request(
1148             "SPDR-SA1", "interface/XPDR2-NETWORK1-ODU4")
1149         self.assertEqual(response.status_code, requests.codes.conflict)
1150
1151     def test_64_check_no_interface_ODU4_CLIENT_spdra(self):
1152         response = test_utils.check_netconf_node_request(
1153             "SPDR-SA1", "interface/XPDR2-CLIENT1-ODU4")
1154         self.assertEqual(response.status_code, requests.codes.conflict)
1155
1156     def test_65_check_no_interface_100GE_CLIENT_spdra(self):
1157         response = test_utils.check_netconf_node_request(
1158             "SPDR-SA1", "interface/XPDR2-CLIENT1-ETHERNET")
1159         self.assertEqual(response.status_code, requests.codes.conflict)
1160
1161     def test_66_check_otn_topo_links(self):
1162         self.test_45_check_otn_topo_otu4_links()
1163
1164     def test_67_delete_OCH_OTU4_service_AB(self):
1165         response = test_utils.service_delete_request("service-OCH-OTU4-AB")
1166         self.assertEqual(response.status_code, requests.codes.ok)
1167         res = response.json()
1168         self.assertIn('Renderer service delete in progress',
1169                       res['output']['configuration-response-common']['response-message'])
1170         time.sleep(self.WAITING)
1171
1172     def test_68_delete_OCH_OTU4_service_BC(self):
1173         response = test_utils.service_delete_request("service-OCH-OTU4-BC")
1174         self.assertEqual(response.status_code, requests.codes.ok)
1175         res = response.json()
1176         self.assertIn('Renderer service delete in progress',
1177                       res['output']['configuration-response-common']['response-message'])
1178         time.sleep(self.WAITING)
1179
1180     def test_69_get_no_service(self):
1181         response = test_utils.get_service_list_request("")
1182         self.assertEqual(response.status_code, requests.codes.conflict)
1183         res = response.json()
1184         self.assertIn(
1185             {"error-type": "application", "error-tag": "data-missing",
1186              "error-message": "Request could not be completed because the relevant data model content does not exist"},
1187             res['errors']['error'])
1188         time.sleep(1)
1189
1190     def test_70_check_no_interface_OTU4_spdra(self):
1191         response = test_utils.check_netconf_node_request(
1192             "SPDR-SA1", "interface/XPDR2-NETWORK1-OTU")
1193         self.assertEqual(response.status_code, requests.codes.conflict)
1194
1195     def test_71_check_no_interface_OCH_spdra(self):
1196         response = test_utils.check_netconf_node_request(
1197             "SPDR-SA1", "interface/XPDR2-NETWORK1-761:768")
1198         self.assertEqual(response.status_code, requests.codes.conflict)
1199
1200     def test_72_getLinks_OtnTopology(self):
1201         response = test_utils.get_otn_topo_request()
1202         self.assertEqual(response.status_code, requests.codes.ok)
1203         res = response.json()
1204         self.assertNotIn('ietf-network-topology:link', res['network'][0])
1205
1206     def test_73_check_openroadm_topo_spdra(self):
1207         response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR2")
1208         self.assertEqual(response.status_code, requests.codes.ok)
1209         res = response.json()
1210         tp = res['node'][0]['ietf-network-topology:termination-point'][0]
1211         self.assertEqual('XPDR2-NETWORK1', tp['tp-id'])
1212         self.assertNotIn('wavelength', dict.keys(
1213             tp['org-openroadm-network-topology:xpdr-network-attributes']))
1214         time.sleep(3)
1215
1216     def test_74_check_openroadm_topo_ROADMB_SRG1(self):
1217         response = test_utils.get_ordm_topo_request("node/ROADM-B1-SRG1")
1218         self.assertEqual(response.status_code, requests.codes.ok)
1219         res = response.json()
1220         freq_map = base64.b64decode(
1221             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
1222         freq_map_array = [int(x) for x in freq_map]
1223         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1224         self.assertEqual(freq_map_array[94], 255, "Lambda 1 should be available")
1225         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1226         for ele in liste_tp:
1227             if ele['tp-id'] == 'SRG1-PP1-TXRX':
1228                 freq_map = base64.b64decode(
1229                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1230                 freq_map_array = [int(x) for x in freq_map]
1231                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1232             if ele['tp-id'] == 'SRG1-PP2-TXRX':
1233                 freq_map = base64.b64decode(
1234                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1235                 freq_map_array = [int(x) for x in freq_map]
1236                 self.assertEqual(freq_map_array[94], 255, "Lambda 1 should be available")
1237         time.sleep(3)
1238
1239     def test_75_check_openroadm_topo_ROADMB_DEG1(self):
1240         response = test_utils.get_ordm_topo_request("node/ROADM-B1-DEG1")
1241         self.assertEqual(response.status_code, requests.codes.ok)
1242         res = response.json()
1243         freq_map = base64.b64decode(
1244             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1245         freq_map_array = [int(x) for x in freq_map]
1246         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1247         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1248         for ele in liste_tp:
1249             if ele['tp-id'] == 'DEG1-CTP-TXRX':
1250                 freq_map = base64.b64decode(
1251                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1252                 freq_map_array = [int(x) for x in freq_map]
1253                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1254             if ele['tp-id'] == 'DEG1-TTP-TXRX':
1255                 freq_map = base64.b64decode(
1256                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1257                 freq_map_array = [int(x) for x in freq_map]
1258                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1259         time.sleep(3)
1260
1261     def test_76_check_openroadm_topo_ROADMB_DEG2(self):
1262         response = test_utils.get_ordm_topo_request("node/ROADM-B1-DEG2")
1263         self.assertEqual(response.status_code, requests.codes.ok)
1264         res = response.json()
1265         freq_map = base64.b64decode(
1266             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1267         freq_map_array = [int(x) for x in freq_map]
1268         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1269         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1270         for ele in liste_tp:
1271             if ele['tp-id'] == 'DEG2-CTP-TXRX':
1272                 freq_map = base64.b64decode(
1273                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1274                 freq_map_array = [int(x) for x in freq_map]
1275                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1276             if ele['tp-id'] == 'DEG2-TTP-TXRX':
1277                 freq_map = base64.b64decode(
1278                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1279                 freq_map_array = [int(x) for x in freq_map]
1280                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1281         time.sleep(3)
1282
1283     def test_77_disconnect_xponders_from_roadm(self):
1284         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1285         response = test_utils.get_ordm_topo_request("")
1286         self.assertEqual(response.status_code, requests.codes.ok)
1287         res = response.json()
1288         links = res['network'][0]['ietf-network-topology:link']
1289         for link in links:
1290             if ((link["org-openroadm-common-network:link-type"] == "XPONDER-OUTPUT" or
1291                     link["org-openroadm-common-network:link-type"] == "XPONDER-INPUT")
1292                     and ('SPDR-SB1' in link['link-id'] or 'ROADM-B1' in link['link-id'])):
1293                 link_name = link["link-id"]
1294                 response = test_utils.delete_request(url+link_name)
1295                 self.assertEqual(response.status_code, requests.codes.ok)
1296
1297     def test_78_disconnect_spdrB(self):
1298         response = test_utils.unmount_device("SPDR-SB1")
1299         self.assertEqual(response.status_code, requests.codes.ok,
1300                          test_utils.CODE_SHOULD_BE_200)
1301
1302     def test_79_disconnect_roadmB(self):
1303         response = test_utils.unmount_device("ROADM-B1")
1304         self.assertEqual(response.status_code, requests.codes.ok,
1305                          test_utils.CODE_SHOULD_BE_200)
1306
1307     def test_80_remove_roadm_to_roadm_links(self):
1308         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1309         response = test_utils.get_ordm_topo_request("")
1310         self.assertEqual(response.status_code, requests.codes.ok)
1311         res = response.json()
1312         links = res['network'][0]['ietf-network-topology:link']
1313         for link in links:
1314             if (link["org-openroadm-common-network:link-type"] == "ROADM-TO-ROADM"
1315                     and 'ROADM-B1' in link['link-id']):
1316                 link_name = link["link-id"]
1317                 response = test_utils.delete_request(url+link_name)
1318                 self.assertEqual(response.status_code, requests.codes.ok)
1319
1320     def test_81_add_omsAttributes_ROADMA_ROADMC(self):
1321         # Config ROADMA-ROADMC oms-attributes
1322         data = {"span": {
1323             "auto-spanloss": "true",
1324             "spanloss-base": 11.4,
1325             "spanloss-current": 12,
1326             "engineered-spanloss": 12.2,
1327             "link-concatenation": [{
1328                 "SRLG-Id": 0,
1329                 "fiber-type": "smf",
1330                 "SRLG-length": 100000,
1331                 "pmd": 0.5}]}}
1332         response = test_utils.add_oms_attr_request(
1333             "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
1334         self.assertEqual(response.status_code, requests.codes.created)
1335
1336     def test_82_add_omsAttributes_ROADMC_ROADMA(self):
1337         # Config ROADMC-ROADMA oms-attributes
1338         data = {"span": {
1339             "auto-spanloss": "true",
1340             "spanloss-base": 11.4,
1341             "spanloss-current": 12,
1342             "engineered-spanloss": 12.2,
1343             "link-concatenation": [{
1344                 "SRLG-Id": 0,
1345                 "fiber-type": "smf",
1346                 "SRLG-length": 100000,
1347                 "pmd": 0.5}]}}
1348         response = test_utils.add_oms_attr_request(
1349             "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
1350         self.assertEqual(response.status_code, requests.codes.created)
1351
1352     def test_83_create_OCH_OTU4_service_AC(self):
1353         # pylint: disable=line-too-long
1354         self.cr_serv_sample_data["input"]["service-name"] = "service-OCH-OTU4-AC"
1355         self.cr_serv_sample_data["input"]["connection-type"] = "infrastructure"
1356         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
1357         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OTU"
1358         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1359         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
1360         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1361         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
1362         self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1363         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
1364         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OTU"
1365         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1366         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
1367         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1368         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
1369         self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1370         response = test_utils.service_create_request(self.cr_serv_sample_data)
1371         self.assertEqual(response.status_code, requests.codes.ok)
1372         res = response.json()
1373         self.assertIn('PCE calculation in progress',
1374                       res['output']['configuration-response-common']['response-message'])
1375         time.sleep(self.WAITING)
1376
1377     def test_84_get_OCH_OTU4_service_AC(self):
1378         response = test_utils.get_service_list_request(
1379             "services/service-OCH-OTU4-AC")
1380         self.assertEqual(response.status_code, requests.codes.ok)
1381         res = response.json()
1382         self.assertEqual(
1383             res['services'][0]['administrative-state'], 'inService')
1384         self.assertEqual(
1385             res['services'][0]['service-name'], 'service-OCH-OTU4-AC')
1386         self.assertEqual(
1387             res['services'][0]['connection-type'], 'infrastructure')
1388         self.assertEqual(
1389             res['services'][0]['lifecycle-state'], 'planned')
1390         time.sleep(2)
1391
1392 # test service-create for 100GE service from spdrA to spdrC via spdrB
1393     def test_85_create_100GE_service_AC(self):
1394         # pylint: disable=line-too-long
1395         self.cr_serv_sample_data["input"]["service-name"] = "service-100GE-AC"
1396         self.cr_serv_sample_data["input"]["connection-type"] = "service"
1397         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
1398         self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "SPDR-SA1"
1399         self.cr_serv_sample_data["input"]["service-a-end"]["clli"] = "NodeSA"
1400         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
1401         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1402         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
1403         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1404         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
1405         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
1406         self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SC1"
1407         self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSC"
1408         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
1409         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1410         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
1411         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1412         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
1413
1414         response = test_utils.service_create_request(self.cr_serv_sample_data)
1415         self.assertEqual(response.status_code, requests.codes.ok)
1416         res = response.json()
1417         self.assertIn('PCE calculation in progress',
1418                       res['output']['configuration-response-common']['response-message'])
1419         time.sleep(self.WAITING)
1420
1421     def test_86_get_100GE_service_AC(self):
1422         response = test_utils.get_service_list_request("services/service-100GE-AC")
1423         self.assertEqual(response.status_code, requests.codes.ok)
1424         res = response.json()
1425         self.assertEqual(
1426             res['services'][0]['administrative-state'], 'inService')
1427         self.assertEqual(
1428             res['services'][0]['service-name'], 'service-100GE-AC')
1429         self.assertEqual(
1430             res['services'][0]['connection-type'], 'service')
1431         self.assertEqual(
1432             res['services'][0]['lifecycle-state'], 'planned')
1433         time.sleep(2)
1434
1435     def test_87_check_configuration_spdra(self):
1436         self.test_48_check_interface_100GE_CLIENT_spdra()
1437         self.test_49_check_interface_ODU4_CLIENT_spdra()
1438         self.test_50_check_interface_ODU4_NETWORK_spdra()
1439         self.test_51_check_ODU4_connection_spdra()
1440
1441     def test_88_check_configuration_spdrc(self):
1442         self.test_52_check_interface_100GE_CLIENT_spdrc()
1443         self.test_53_check_interface_ODU4_CLIENT_spdrc()
1444         self.test_54_check_interface_ODU4_NETWORK_spdrc()
1445         self.test_55_check_ODU4_connection_spdrc()
1446
1447     def test_89_check_otn_topo_links(self):
1448         response = test_utils.get_otn_topo_request()
1449         self.assertEqual(response.status_code, requests.codes.ok)
1450         res = response.json()
1451         nb_links = len(res['network'][0]['ietf-network-topology:link'])
1452         self.assertEqual(nb_links, 2)
1453         for link in res['network'][0]['ietf-network-topology:link']:
1454             self.assertEqual(
1455                 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
1456             self.assertEqual(
1457                 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
1458
1459     def test_90_delete_100GE_service_AC(self):
1460         response = test_utils.service_delete_request("service-100GE-AC")
1461         self.assertEqual(response.status_code, requests.codes.ok)
1462         res = response.json()
1463         self.assertIn('Renderer service delete in progress',
1464                       res['output']['configuration-response-common']['response-message'])
1465         time.sleep(self.WAITING)
1466
1467     def test_91_check_service_list(self):
1468         response = test_utils.get_service_list_request("")
1469         self.assertEqual(response.status_code, requests.codes.ok)
1470         res = response.json()
1471         self.assertEqual(len(res['service-list']['services']), 1)
1472         time.sleep(2)
1473
1474     def test_92_check_configuration_spdra(self):
1475         self.test_62_check_no_ODU4_connection_spdra()
1476         self.test_63_check_no_interface_ODU4_NETWORK_spdra()
1477         self.test_64_check_no_interface_ODU4_CLIENT_spdra()
1478         self.test_65_check_no_interface_100GE_CLIENT_spdra()
1479
1480     def test_93_check_otn_topo_links(self):
1481         response = test_utils.get_otn_topo_request()
1482         self.assertEqual(response.status_code, requests.codes.ok)
1483         res = response.json()
1484         nb_links = len(res['network'][0]['ietf-network-topology:link'])
1485         self.assertEqual(nb_links, 2)
1486         for link in res['network'][0]['ietf-network-topology:link']:
1487             self.assertEqual(
1488                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
1489             self.assertEqual(
1490                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
1491
1492     def test_94_disconnect_xponders_from_roadm(self):
1493         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1494         response = test_utils.get_ordm_topo_request("")
1495         self.assertEqual(response.status_code, requests.codes.ok)
1496         res = response.json()
1497         links = res['network'][0]['ietf-network-topology:link']
1498         for link in links:
1499             if (link["org-openroadm-common-network:link-type"] == "XPONDER-OUTPUT" or
1500                     link["org-openroadm-common-network:link-type"] == "XPONDER-INPUT"):
1501                 link_name = link["link-id"]
1502                 response = test_utils.delete_request(url+link_name)
1503                 self.assertEqual(response.status_code, requests.codes.ok)
1504
1505     def test_95_disconnect_spdrA(self):
1506         response = test_utils.unmount_device("SPDR-SA1")
1507         self.assertEqual(response.status_code, requests.codes.ok,
1508                          test_utils.CODE_SHOULD_BE_200)
1509
1510     def test_96_disconnect_spdrC(self):
1511         response = test_utils.unmount_device("SPDR-SC1")
1512         self.assertEqual(response.status_code, requests.codes.ok,
1513                          test_utils.CODE_SHOULD_BE_200)
1514
1515     def test_97_disconnect_roadmA(self):
1516         response = test_utils.unmount_device("ROADM-A1")
1517         self.assertEqual(response.status_code, requests.codes.ok,
1518                          test_utils.CODE_SHOULD_BE_200)
1519
1520     def test_98_disconnect_roadmC(self):
1521         response = test_utils.unmount_device("ROADM-C1")
1522         self.assertEqual(response.status_code, requests.codes.ok,
1523                          test_utils.CODE_SHOULD_BE_200)
1524
1525
1526 if __name__ == "__main__":
1527     unittest.main(verbosity=2)