Manage ODU4 services over multiple OTU4
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test14_otn_switch_end2end.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
15
16 import base64
17 import unittest
18 import time
19 import requests
20 # pylint: disable=wrong-import-order
21 import sys
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils  # nopep8
26
27
28 class TransportPCEtesting(unittest.TestCase):
29
    # Assigned in setUpClass; tearDownClass iterates over it to shut every
    # recorded process down.
    processes = None
    # Number of seconds the service-create tests sleep after posting a request.
    WAITING = 20  # nominal value is 300
    # Version string paired with each simulator name when calling
    # test_utils.start_sims and test_utils.mount_device.
    NODE_VERSION = '2.2.1'

    # Input of the service-create request sent by test_25 (service-OCH-OTU4-AB,
    # SPDR-SA1 <-> SPDR-SB1, both ends on XPDR2-NETWORK1).
    # This is a class-level dict: test_35 rewrites several of its entries in
    # place, so tests that run afterwards see the modified payload.
    cr_serv_sample_data = {"input": {
        "sdnc-request-header": {
            "request-id": "request-1",
            "rpc-action": "service-create",
            "request-system-id": "appname"
        },
        "service-name": "service-OCH-OTU4-AB",
        "common-id": "commonId",
        "connection-type": "infrastructure",
        "service-a-end": {
            "service-rate": "100",
            "node-id": "SPDR-SA1",
            "service-format": "OTU",
            "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
            "clli": "NodeSA",
            "subrate-eth-sla": {
                    "subrate-eth-sla": {
                        "committed-info-rate": "100000",
                        "committed-burst-size": "64"
                    }
            },
            "tx-direction": {
                "port": {
                    "port-device-name": "SPDR-SA1-XPDR2",
                    "port-type": "fixed",
                    "port-name": "XPDR2-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "rx-direction": {
                "port": {
                    "port-device-name": "SPDR-SA1-XPDR2",
                    "port-type": "fixed",
                    "port-name": "XPDR2-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "optic-type": "gray"
        },
        "service-z-end": {
            "service-rate": "100",
            "node-id": "SPDR-SB1",
            "service-format": "OTU",
            "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
            "clli": "NodeSB",
            "subrate-eth-sla": {
                    "subrate-eth-sla": {
                        "committed-info-rate": "100000",
                        "committed-burst-size": "64"
                    }
            },
            "tx-direction": {
                "port": {
                    "port-device-name": "SPDR-SB1-XPDR2",
                    "port-type": "fixed",
                    "port-name": "XPDR2-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "rx-direction": {
                "port": {
                    "port-device-name": "SPDR-SB1-XPDR2",
                    "port-type": "fixed",
                    "port-name": "XPDR2-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "optic-type": "gray"
        },
        "due-date": "2018-06-15T00:00:01Z",
        "operator-contact": "pw1234"
    }
    }
135
136     @classmethod
137     def setUpClass(cls):
138         cls.processes = test_utils.start_tpce()
139         cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
140                                                ('spdrb', cls.NODE_VERSION),
141                                                ('spdrc', cls.NODE_VERSION),
142                                                ('roadma', cls.NODE_VERSION),
143                                                ('roadmb', cls.NODE_VERSION),
144                                                ('roadmc', cls.NODE_VERSION)])
145
146     @classmethod
147     def tearDownClass(cls):
148         # pylint: disable=not-an-iterable
149         for process in cls.processes:
150             test_utils.shutdown_process(process)
151         print("all processes killed")
152
153     def setUp(self):
154         time.sleep(5)
155
156     def test_01_connect_spdrA(self):
157         response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
158         self.assertEqual(response.status_code,
159                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
160
161     def test_02_connect_spdrB(self):
162         response = test_utils.mount_device("SPDR-SB1", ('spdrb', self.NODE_VERSION))
163         self.assertEqual(response.status_code,
164                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
165
166     def test_03_connect_spdrC(self):
167         response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
168         self.assertEqual(response.status_code,
169                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
170
171     def test_04_connect_rdmA(self):
172         response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
173         self.assertEqual(response.status_code,
174                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
175
176     def test_05_connect_rdmB(self):
177         response = test_utils.mount_device("ROADM-B1", ('roadmb', self.NODE_VERSION))
178         self.assertEqual(response.status_code,
179                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
180
181     def test_06_connect_rdmC(self):
182         response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
183         self.assertEqual(response.status_code,
184                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
185
186     def test_07_connect_sprdA_2_N1_to_roadmA_PP3(self):
187         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "2", "1",
188                                                           "ROADM-A1", "1", "SRG1-PP3-TXRX")
189         self.assertEqual(response.status_code, requests.codes.ok)
190         res = response.json()
191         self.assertIn('Xponder Roadm Link created successfully',
192                       res["output"]["result"])
193         time.sleep(2)
194
195     def test_08_connect_roadmA_PP3_to_spdrA_2_N1(self):
196         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "2", "1",
197                                                           "ROADM-A1", "1", "SRG1-PP3-TXRX")
198         self.assertEqual(response.status_code, requests.codes.ok)
199         res = response.json()
200         self.assertIn('Roadm Xponder links created successfully',
201                       res["output"]["result"])
202         time.sleep(2)
203
204     def test_09_connect_sprdC_2_N1_to_roadmC_PP3(self):
205         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "2", "1",
206                                                           "ROADM-C1", "1", "SRG1-PP3-TXRX")
207         self.assertEqual(response.status_code, requests.codes.ok)
208         res = response.json()
209         self.assertIn('Xponder Roadm Link created successfully',
210                       res["output"]["result"])
211         time.sleep(2)
212
213     def test_10_connect_roadmC_PP3_to_spdrC_2_N1(self):
214         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "2", "1",
215                                                           "ROADM-C1", "1", "SRG1-PP3-TXRX")
216         self.assertEqual(response.status_code, requests.codes.ok)
217         res = response.json()
218         self.assertIn('Roadm Xponder links created successfully',
219                       res["output"]["result"])
220         time.sleep(2)
221
222     def test_11_connect_sprdB_2_N1_to_roadmB_PP1(self):
223         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SB1", "2", "1",
224                                                           "ROADM-B1", "1", "SRG1-PP1-TXRX")
225         self.assertEqual(response.status_code, requests.codes.ok)
226         res = response.json()
227         self.assertIn('Xponder Roadm Link created successfully',
228                       res["output"]["result"])
229         time.sleep(2)
230
231     def test_12_connect_roadmB_PP1_to_spdrB_2_N1(self):
232         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SB1", "2", "1",
233                                                           "ROADM-B1", "1", "SRG1-PP1-TXRX")
234         self.assertEqual(response.status_code, requests.codes.ok)
235         res = response.json()
236         self.assertIn('Roadm Xponder links created successfully',
237                       res["output"]["result"])
238         time.sleep(2)
239
240     def test_13_connect_sprdB_2_N2_to_roadmB_PP2(self):
241         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SB1", "2", "2",
242                                                           "ROADM-B1", "1", "SRG1-PP2-TXRX")
243         self.assertEqual(response.status_code, requests.codes.ok)
244         res = response.json()
245         self.assertIn('Xponder Roadm Link created successfully',
246                       res["output"]["result"])
247         time.sleep(2)
248
249     def test_14_connect_roadmB_PP2_to_spdrB_2_N2(self):
250         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SB1", "2", "2",
251                                                           "ROADM-B1", "1", "SRG1-PP2-TXRX")
252         self.assertEqual(response.status_code, requests.codes.ok)
253         res = response.json()
254         self.assertIn('Roadm Xponder links created successfully',
255                       res["output"]["result"])
256         time.sleep(2)
257
258     def test_15_add_omsAttributes_ROADMA_ROADMB(self):
259         # Config ROADMA-ROADMB oms-attributes
260         data = {"span": {
261             "auto-spanloss": "true",
262             "spanloss-base": 11.4,
263             "spanloss-current": 12,
264             "engineered-spanloss": 12.2,
265             "link-concatenation": [{
266                 "SRLG-Id": 0,
267                 "fiber-type": "smf",
268                 "SRLG-length": 100000,
269                 "pmd": 0.5}]}}
270         response = test_utils.add_oms_attr_request(
271             "ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX", data)
272         self.assertEqual(response.status_code, requests.codes.created)
273
274     def test_16_add_omsAttributes_ROADMB_ROADMA(self):
275         # Config ROADMB-ROADMA oms-attributes
276         data = {"span": {
277             "auto-spanloss": "true",
278             "spanloss-base": 11.4,
279             "spanloss-current": 12,
280             "engineered-spanloss": 12.2,
281             "link-concatenation": [{
282                 "SRLG-Id": 0,
283                 "fiber-type": "smf",
284                 "SRLG-length": 100000,
285                 "pmd": 0.5}]}}
286         response = test_utils.add_oms_attr_request(
287             "ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX", data)
288         self.assertEqual(response.status_code, requests.codes.created)
289
290     def test_17_add_omsAttributes_ROADMB_ROADMC(self):
291         # Config ROADMB-ROADMC oms-attributes
292         data = {"span": {
293             "auto-spanloss": "true",
294             "spanloss-base": 11.4,
295             "spanloss-current": 12,
296             "engineered-spanloss": 12.2,
297             "link-concatenation": [{
298                 "SRLG-Id": 0,
299                 "fiber-type": "smf",
300                 "SRLG-length": 100000,
301                 "pmd": 0.5}]}}
302         response = test_utils.add_oms_attr_request(
303             "ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX", data)
304         self.assertEqual(response.status_code, requests.codes.created)
305
306     def test_18_add_omsAttributes_ROADMC_ROADMB(self):
307         # Config ROADMC-ROADMB oms-attributes
308         data = {"span": {
309             "auto-spanloss": "true",
310             "spanloss-base": 11.4,
311             "spanloss-current": 12,
312             "engineered-spanloss": 12.2,
313             "link-concatenation": [{
314                 "SRLG-Id": 0,
315                 "fiber-type": "smf",
316                 "SRLG-length": 100000,
317                 "pmd": 0.5}]}}
318         response = test_utils.add_oms_attr_request(
319             "ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX", data)
320         self.assertEqual(response.status_code, requests.codes.created)
321
322     def test_19_create_OTS_ROADMA_DEG1(self):
323         response = test_utils.create_ots_oms_request("ROADM-A1", "DEG1-TTP-TXRX")
324         time.sleep(10)
325         self.assertEqual(response.status_code, requests.codes.ok)
326         res = response.json()
327         self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-A1',
328                       res["output"]["result"])
329
330     def test_20_create_OTS_ROADMB_DEG1(self):
331         response = test_utils.create_ots_oms_request("ROADM-B1", "DEG1-TTP-TXRX")
332         time.sleep(10)
333         self.assertEqual(response.status_code, requests.codes.ok)
334         res = response.json()
335         self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-B1',
336                       res["output"]["result"])
337
338     def test_21_create_OTS_ROADMB_DEG2(self):
339         response = test_utils.create_ots_oms_request("ROADM-B1", "DEG2-TTP-TXRX")
340         time.sleep(10)
341         self.assertEqual(response.status_code, requests.codes.ok)
342         res = response.json()
343         self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-B1',
344                       res["output"]["result"])
345
346     def test_22_create_OTS_ROADMC_DEG2(self):
347         response = test_utils.create_ots_oms_request("ROADM-C1", "DEG2-TTP-TXRX")
348         time.sleep(10)
349         self.assertEqual(response.status_code, requests.codes.ok)
350         res = response.json()
351         self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-C1',
352                       res["output"]["result"])
353
354     def test_23_calculate_span_loss_base_all(self):
355         url = "{}/operations/transportpce-olm:calculate-spanloss-base"
356         data = {
357             "input": {
358                 "src-type": "all"
359             }
360         }
361         response = test_utils.post_request(url, data)
362         self.assertEqual(response.status_code, requests.codes.ok)
363         res = response.json()
364         self.assertIn('Success',
365                       res["output"]["result"])
366         self.assertIn({
367             "spanloss": "25.7",
368             "link-id": "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX"
369         }, res["output"]["spans"])
370         self.assertIn({
371             "spanloss": "17.6",
372             "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
373         }, res["output"]["spans"])
374         self.assertIn({
375             "spanloss": "23.6",
376             "link-id": "ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX"
377         }, res["output"]["spans"])
378         self.assertIn({
379             "spanloss": "23.6",
380             "link-id": "ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX"
381         }, res["output"]["spans"])
382         self.assertIn({
383             "spanloss": "25.7",
384             "link-id": "ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX"
385         }, res["output"]["spans"])
386         self.assertIn({
387             "spanloss": "17.6",
388             "link-id": "ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX"
389         }, res["output"]["spans"])
390         time.sleep(5)
391
392     def test_24_check_otn_topology(self):
393         response = test_utils.get_otn_topo_request()
394         self.assertEqual(response.status_code, requests.codes.ok)
395         res = response.json()
396         nbNode = len(res['network'][0]['node'])
397         self.assertEqual(nbNode, 9, 'There should be 9 nodes')
398         self.assertNotIn('ietf-network-topology:link', res['network'][0],
399                          'otn-topology should have no link')
400
401 # test service-create for OCH-OTU4 service from spdrA to spdrB
402     def test_25_create_OCH_OTU4_service_AB(self):
403         response = test_utils.service_create_request(self.cr_serv_sample_data)
404         self.assertEqual(response.status_code, requests.codes.ok)
405         res = response.json()
406         self.assertIn('PCE calculation in progress',
407                       res['output']['configuration-response-common']['response-message'])
408         time.sleep(self.WAITING)
409
410     def test_26_get_OCH_OTU4_service_AB(self):
411         response = test_utils.get_service_list_request(
412             "services/service-OCH-OTU4-AB")
413         self.assertEqual(response.status_code, requests.codes.ok)
414         res = response.json()
415         self.assertEqual(
416             res['services'][0]['administrative-state'], 'inService')
417         self.assertEqual(
418             res['services'][0]['service-name'], 'service-OCH-OTU4-AB')
419         self.assertEqual(
420             res['services'][0]['connection-type'], 'infrastructure')
421         self.assertEqual(
422             res['services'][0]['lifecycle-state'], 'planned')
423         time.sleep(2)
424
425 # Check correct configuration of devices
426     def test_27_check_interface_och_spdra(self):
427         response = test_utils.check_netconf_node_request(
428             "SPDR-SA1", "interface/XPDR2-NETWORK1-761:768")
429         self.assertEqual(response.status_code, requests.codes.ok)
430         res = response.json()
431         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
432                                    'administrative-state': 'inService',
433                                    'supporting-circuit-pack-name': 'CP5-CFP',
434                                    'type': 'org-openroadm-interfaces:opticalChannel',
435                                    'supporting-port': 'CP5-CFP-P1'
436                                    }, **res['interface'][0]),
437                              res['interface'][0])
438
439         self.assertDictEqual(
440             {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
441              'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
442             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
443
444     def test_28_check_interface_OTU4_spdra(self):
445         response = test_utils.check_netconf_node_request(
446             "SPDR-SA1", "interface/XPDR2-NETWORK1-OTU")
447         self.assertEqual(response.status_code, requests.codes.ok)
448         res = response.json()
449         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
450                         'administrative-state': 'inService',
451                         'supporting-circuit-pack-name': 'CP5-CFP',
452                         'supporting-interface': 'XPDR2-NETWORK1-761:768',
453                         'type': 'org-openroadm-interfaces:otnOtu',
454                         'supporting-port': 'CP5-CFP-P1'
455                         }
456         input_dict_2 = {'tx-sapi': 'AOQxIv+6nCD+',
457                         'expected-dapi': 'AOQxIv+6nCD+',
458                         'tx-dapi': 'X+8cRNi+HbE=',
459                         'expected-sapi': 'X+8cRNi+HbE=',
460                         'rate': 'org-openroadm-otn-common-types:OTU4',
461                         'fec': 'scfec'
462                         }
463         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
464                              res['interface'][0])
465
466         self.assertDictEqual(input_dict_2,
467                              res['interface'][0]
468                              ['org-openroadm-otn-otu-interfaces:otu'])
469
470     def test_29_check_interface_och_spdrB(self):
471         response = test_utils.check_netconf_node_request(
472             "SPDR-SB1", "interface/XPDR2-NETWORK1-761:768")
473         self.assertEqual(response.status_code, requests.codes.ok)
474         res = response.json()
475         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
476                                    'administrative-state': 'inService',
477                                    'supporting-circuit-pack-name': 'CP5-CFP',
478                                    'type': 'org-openroadm-interfaces:opticalChannel',
479                                    'supporting-port': 'CP5-CFP-P1'
480                                    }, **res['interface'][0]),
481                              res['interface'][0])
482
483         self.assertDictEqual(
484             {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
485              'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
486             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
487
488     def test_30_check_interface_OTU4_spdrB(self):
489         response = test_utils.check_netconf_node_request(
490             "SPDR-SB1", "interface/XPDR2-NETWORK1-OTU")
491         self.assertEqual(response.status_code, requests.codes.ok)
492         res = response.json()
493         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
494                         'administrative-state': 'inService',
495                         'supporting-circuit-pack-name': 'CP5-CFP',
496                         'supporting-interface': 'XPDR2-NETWORK1-761:768',
497                         'type': 'org-openroadm-interfaces:otnOtu',
498                         'supporting-port': 'CP5-CFP-P1'
499                         }
500         input_dict_2 = {'tx-dapi': 'AOQxIv+6nCD+',
501                         'expected-sapi': 'AOQxIv+6nCD+',
502                         'tx-sapi': 'X+8cRNi+HbE=',
503                         'expected-dapi': 'X+8cRNi+HbE=',
504                         'rate': 'org-openroadm-otn-common-types:OTU4',
505                         'fec': 'scfec'
506                         }
507
508         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
509                              res['interface'][0])
510
511         self.assertDictEqual(input_dict_2,
512                              res['interface'][0]
513                              ['org-openroadm-otn-otu-interfaces:otu'])
514
515     def test_31_check_no_interface_ODU4_spdra(self):
516         response = test_utils.check_netconf_node_request(
517             "SPDR-SA1", "interface/XPDR2-NETWORK1-ODU4")
518         self.assertEqual(response.status_code, requests.codes.conflict)
519         res = response.json()
520         self.assertIn(
521             {"error-type": "application", "error-tag": "data-missing",
522              "error-message": "Request could not be completed because the relevant data model content does not exist"},
523             res['errors']['error'])
524
525     def test_32_check_openroadm_topo_spdra(self):
526         response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR2")
527         self.assertEqual(response.status_code, requests.codes.ok)
528         res = response.json()
529         ele = res['node'][0]['ietf-network-topology:termination-point'][0]
530         self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
531         self.assertEqual({'frequency': 196.1,
532                           'width': 40},
533                          ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
534         self.assertEqual('ROADM-A1-SRG1--SRG1-PP3-TXRX',
535                          ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
536         time.sleep(3)
537
538     def test_33_check_openroadm_topo_ROADMA_SRG(self):
539         response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
540         self.assertEqual(response.status_code, requests.codes.ok)
541         res = response.json()
542         freq_map = base64.b64decode(
543             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
544         freq_map_array = [int(x) for x in freq_map]
545         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
546         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
547         for ele in liste_tp:
548             if ele['tp-id'] == 'SRG1-PP3-TXRX':
549                 freq_map = base64.b64decode(
550                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
551                 freq_map_array = [int(x) for x in freq_map]
552                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
553             if ele['tp-id'] == 'SRG1-PP2-TXRX':
554                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
555         time.sleep(3)
556
557     def test_33_check_openroadm_topo_ROADMA_DEG1(self):
558         response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG1")
559         self.assertEqual(response.status_code, requests.codes.ok)
560         res = response.json()
561         freq_map = base64.b64decode(
562             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
563         freq_map_array = [int(x) for x in freq_map]
564         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
565         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
566         for ele in liste_tp:
567             if ele['tp-id'] == 'DEG1-CTP-TXRX':
568                 freq_map = base64.b64decode(
569                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
570                 freq_map_array = [int(x) for x in freq_map]
571                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
572             if ele['tp-id'] == 'DEG1-TTP-TXRX':
573                 freq_map = base64.b64decode(
574                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
575                 freq_map_array = [int(x) for x in freq_map]
576                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
577         time.sleep(3)
578
579     def test_34_check_otn_topo_otu4_links(self):
580         response = test_utils.get_otn_topo_request()
581         self.assertEqual(response.status_code, requests.codes.ok)
582         res = response.json()
583         nb_links = len(res['network'][0]['ietf-network-topology:link'])
584         self.assertEqual(nb_links, 2)
585         listLinkId = ['OTU4-SPDR-SA1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
586                       'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR2-XPDR2-NETWORK1']
587         for link in res['network'][0]['ietf-network-topology:link']:
588             self.assertIn(link['link-id'], listLinkId)
589             self.assertEqual(
590                 link['transportpce-topology:otn-link-type'], 'OTU4')
591             self.assertEqual(
592                 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
593             self.assertEqual(
594                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
595             self.assertEqual(
596                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
597             self.assertIn(
598                 link['org-openroadm-common-network:opposite-link'], listLinkId)
599
600 # test service-create for OCH-OTU4 service from spdrB to spdrC
601     def test_35_create_OCH_OTU4_service_BC(self):
602         # pylint: disable=line-too-long
603         self.cr_serv_sample_data["input"]["service-name"] = "service-OCH-OTU4-BC"
604         self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "SPDR-SB1"
605         self.cr_serv_sample_data["input"]["service-a-end"]["clli"] = "NodeSB"
606         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SB1-XPDR2"
607         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK2"
608         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SB1-XPDR2"
609         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK2"
610         self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SC1"
611         self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSC"
612         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
613         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
614         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
615         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
616
617         response = test_utils.service_create_request(self.cr_serv_sample_data)
618         self.assertEqual(response.status_code, requests.codes.ok)
619         res = response.json()
620         self.assertIn('PCE calculation in progress',
621                       res['output']['configuration-response-common']['response-message'])
622         time.sleep(self.WAITING)
623
624     def test_36_get_OCH_OTU4_service_BC(self):
625         response = test_utils.get_service_list_request(
626             "services/service-OCH-OTU4-BC")
627         self.assertEqual(response.status_code, requests.codes.ok)
628         res = response.json()
629         self.assertEqual(
630             res['services'][0]['administrative-state'], 'inService')
631         self.assertEqual(
632             res['services'][0]['service-name'], 'service-OCH-OTU4-BC')
633         self.assertEqual(
634             res['services'][0]['connection-type'], 'infrastructure')
635         self.assertEqual(
636             res['services'][0]['lifecycle-state'], 'planned')
637         time.sleep(2)
638
639 # Check correct configuration of devices
640     def test_37_check_interface_och_spdrB(self):
641         response = test_utils.check_netconf_node_request(
642             "SPDR-SB1", "interface/XPDR2-NETWORK2-753:760")
643         self.assertEqual(response.status_code, requests.codes.ok)
644         res = response.json()
645         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-753:760',
646                                    'administrative-state': 'inService',
647                                    'supporting-circuit-pack-name': 'CP6-CFP',
648                                    'type': 'org-openroadm-interfaces:opticalChannel',
649                                    'supporting-port': 'CP1-CFP0-P1'
650                                    }, **res['interface'][0]),
651                              res['interface'][0])
652
653         self.assertDictEqual(
654             {'frequency': 196.05, 'rate': 'org-openroadm-common-types:R100G',
655              'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
656             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
657
658     def test_38_check_interface_OTU4_spdrB(self):
659         response = test_utils.check_netconf_node_request(
660             "SPDR-SB1", "interface/XPDR2-NETWORK2-OTU")
661         self.assertEqual(response.status_code, requests.codes.ok)
662         res = response.json()
663         input_dict_1 = {'name': 'XPDR2-NETWORK2-OTU',
664                         'administrative-state': 'inService',
665                         'supporting-circuit-pack-name': 'CP6-CFP',
666                         'supporting-interface': 'XPDR2-NETWORK1-753:760',
667                         'type': 'org-openroadm-interfaces:otnOtu',
668                         'supporting-port': 'CP6-CFP-P1'
669                         }
670         input_dict_2 = {'tx-sapi': 'X+8cRNi+HbI=',
671                         'expected-dapi': 'X+8cRNi+HbI=',
672                         'tx-dapi': 'ALvne1QI5jo4',
673                         'expected-sapi': 'ALvne1QI5jo4',
674                         'rate': 'org-openroadm-otn-common-types:OTU4',
675                         'fec': 'scfec'
676                         }
677         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
678                              res['interface'][0])
679
680         self.assertDictEqual(input_dict_2,
681                              res['interface'][0]
682                              ['org-openroadm-otn-otu-interfaces:otu'])
683
684     def test_39_check_interface_och_spdrC(self):
685         response = test_utils.check_netconf_node_request(
686             "SPDR-SC1", "interface/XPDR2-NETWORK1-753:760")
687         self.assertEqual(response.status_code, requests.codes.ok)
688         res = response.json()
689         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
690                                    'administrative-state': 'inService',
691                                    'supporting-circuit-pack-name': 'CP5-CFP',
692                                    'type': 'org-openroadm-interfaces:opticalChannel',
693                                    'supporting-port': 'CP5-CFP-P1'
694                                    }, **res['interface'][0]),
695                              res['interface'][0])
696
697         self.assertDictEqual(
698             {'frequency': 196.05, 'rate': 'org-openroadm-common-types:R100G',
699              'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
700             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
701
702     def test_40_check_interface_OTU4_spdrC(self):
703         response = test_utils.check_netconf_node_request(
704             "SPDR-SC1", "interface/XPDR2-NETWORK1-OTU")
705         self.assertEqual(response.status_code, requests.codes.ok)
706         res = response.json()
707         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
708                         'administrative-state': 'inService',
709                         'supporting-circuit-pack-name': 'CP5-CFP',
710                         'supporting-interface': 'XPDR2-NETWORK1-753:760',
711                         'type': 'org-openroadm-interfaces:otnOtu',
712                         'supporting-port': 'CP5-CFP-P1'
713                         }
714         input_dict_2 = {'tx-dapi': 'X+8cRNi+HbI=',
715                         'expected-sapi': 'X+8cRNi+HbI=',
716                         'tx-sapi': 'ALvne1QI5jo4',
717                         'expected-dapi': 'ALvne1QI5jo4',
718                         'rate': 'org-openroadm-otn-common-types:OTU4',
719                         'fec': 'scfec'
720                         }
721
722         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
723                              res['interface'][0])
724
725         self.assertDictEqual(input_dict_2,
726                              res['interface'][0]
727                              ['org-openroadm-otn-otu-interfaces:otu'])
728
729     def test_41_check_no_interface_ODU4_spdrB(self):
730         response = test_utils.check_netconf_node_request(
731             "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4")
732         self.assertEqual(response.status_code, requests.codes.conflict)
733         res = response.json()
734         self.assertIn(
735             {"error-type": "application", "error-tag": "data-missing",
736              "error-message": "Request could not be completed because the relevant data model content does not exist"},
737             res['errors']['error'])
738
739     def test_42_check_openroadm_topo_spdrB(self):
740         response = test_utils.get_ordm_topo_request("node/SPDR-SB1-XPDR2")
741         self.assertEqual(response.status_code, requests.codes.ok)
742         res = response.json()
743         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
744         # pylint: disable=consider-using-f-string
745         for ele in liste_tp:
746             if ele['tp-id'] == 'XPDR2-NETWORK1':
747                 self.assertEqual({'frequency': 196.1,
748                                   'width': 40},
749                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
750                 self.assertEqual('ROADM-B1-SRG1--SRG1-PP1-TXRX',
751                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
752             elif ele['tp-id'] == 'XPDR2-NETWORK2':
753                 self.assertEqual({'frequency': 196.05,
754                                   'width': 40},
755                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
756                 self.assertEqual('ROADM-B1-SRG1--SRG1-PP2-TXRX',
757                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
758             else:
759                 print("ele = {}".format(ele))
760                 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
761         time.sleep(3)
762
763     def test_43_check_openroadm_topo_ROADMB_SRG1(self):
764         response = test_utils.get_ordm_topo_request("node/ROADM-B1-SRG1")
765         self.assertEqual(response.status_code, requests.codes.ok)
766         res = response.json()
767         freq_map = base64.b64decode(
768             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
769         freq_map_array = [int(x) for x in freq_map]
770         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
771         self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
772         self.assertEqual(freq_map_array[93], 255, "Lambda 3 should be available")
773         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
774         for ele in liste_tp:
775             if ele['tp-id'] == 'SRG1-PP1-TXRX':
776                 freq_map = base64.b64decode(
777                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
778                 freq_map_array = [int(x) for x in freq_map]
779                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
780             if ele['tp-id'] == 'SRG1-PP2-TXRX':
781                 freq_map = base64.b64decode(
782                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
783                 freq_map_array = [int(x) for x in freq_map]
784                 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
785             if ele['tp-id'] == 'SRG1-PP3-TXRX':
786                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
787         time.sleep(3)
788
789     def test_44_check_openroadm_topo_ROADMB_DEG2(self):
790         response = test_utils.get_ordm_topo_request("node/ROADM-B1-DEG2")
791         self.assertEqual(response.status_code, requests.codes.ok)
792         res = response.json()
793         freq_map = base64.b64decode(
794             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
795         freq_map_array = [int(x) for x in freq_map]
796         self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
797         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
798         for ele in liste_tp:
799             if ele['tp-id'] == 'DEG2-CTP-TXRX':
800                 freq_map = base64.b64decode(
801                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
802                 freq_map_array = [int(x) for x in freq_map]
803                 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
804             if ele['tp-id'] == 'DEG2-TTP-TXRX':
805                 freq_map = base64.b64decode(
806                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
807                 freq_map_array = [int(x) for x in freq_map]
808                 self.assertEqual(freq_map_array[94], 0, "Lambda 1 should not be available")
809         time.sleep(3)
810
811     def test_45_check_otn_topo_otu4_links(self):
812         response = test_utils.get_otn_topo_request()
813         self.assertEqual(response.status_code, requests.codes.ok)
814         res = response.json()
815         nb_links = len(res['network'][0]['ietf-network-topology:link'])
816         self.assertEqual(nb_links, 4)
817         listLinkId = ['OTU4-SPDR-SA1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
818                       'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR2-XPDR2-NETWORK1',
819                       'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK2toSPDR-SC1-XPDR2-XPDR2-NETWORK1',
820                       'OTU4-SPDR-SC1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK2']
821         for link in res['network'][0]['ietf-network-topology:link']:
822             self.assertIn(link['link-id'], listLinkId)
823             self.assertEqual(
824                 link['transportpce-topology:otn-link-type'], 'OTU4')
825             self.assertEqual(
826                 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
827             self.assertEqual(
828                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
829             self.assertEqual(
830                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
831             self.assertIn(
832                 link['org-openroadm-common-network:opposite-link'], listLinkId)
833
834 # test service-create for 100GE service from spdrA to spdrC via spdrB
835     def test_46_create_100GE_service_ABC(self):
836         # pylint: disable=line-too-long
837         self.cr_serv_sample_data["input"]["service-name"] = "service-100GE-ABC"
838         self.cr_serv_sample_data["input"]["connection-type"] = "service"
839         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
840         self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "SPDR-SA1"
841         self.cr_serv_sample_data["input"]["service-a-end"]["clli"] = "NodeSA"
842         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
843         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
844         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
845         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
846         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
847         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
848         self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SC1"
849         self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSC"
850         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
851         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
852         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
853         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
854         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
855
856         response = test_utils.service_create_request(self.cr_serv_sample_data)
857         self.assertEqual(response.status_code, requests.codes.ok)
858         res = response.json()
859         self.assertIn('PCE calculation in progress',
860                       res['output']['configuration-response-common']['response-message'])
861         time.sleep(self.WAITING)
862
863     def test_47_get_100GE_service_ABC(self):
864         response = test_utils.get_service_list_request(
865             "services/service-100GE-ABC")
866         self.assertEqual(response.status_code, requests.codes.ok)
867         res = response.json()
868         self.assertEqual(
869             res['services'][0]['administrative-state'], 'inService')
870         self.assertEqual(
871             res['services'][0]['service-name'], 'service-100GE-ABC')
872         self.assertEqual(
873             res['services'][0]['connection-type'], 'service')
874         self.assertEqual(
875             res['services'][0]['lifecycle-state'], 'planned')
876         time.sleep(2)
877
878     def test_48_check_interface_100GE_CLIENT_spdra(self):
879         response = test_utils.check_netconf_node_request(
880             "SPDR-SA1", "interface/XPDR2-CLIENT1-ETHERNET")
881         self.assertEqual(response.status_code, requests.codes.ok)
882         res = response.json()
883         input_dict = {'name': 'XPDR2-CLIENT1-ETHERNET',
884                       'administrative-state': 'inService',
885                       'supporting-circuit-pack-name': 'CP2-QSFP1',
886                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
887                       'supporting-port': 'CP2-QSFP1-P1'
888                       }
889         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
890                              res['interface'][0])
891         self.assertDictEqual(
892             {'speed': 100000,
893              'fec': 'off'},
894             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
895
896     def test_49_check_interface_ODU4_CLIENT_spdra(self):
897         response = test_utils.check_netconf_node_request(
898             "SPDR-SA1", "interface/XPDR2-CLIENT1-ODU4")
899         self.assertEqual(response.status_code, requests.codes.ok)
900         res = response.json()
901         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
902                         'administrative-state': 'inService',
903                         'supporting-circuit-pack-name': 'CP2-QSFP1',
904                         'supporting-interface': 'XPDR2-CLIENT1-ETHERNET',
905                         'type': 'org-openroadm-interfaces:otnOdu',
906                         'supporting-port': 'CP2-QSFP1-P1'}
907         input_dict_2 = {
908             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
909             'rate': 'org-openroadm-otn-common-types:ODU4',
910             'monitoring-mode': 'terminated',
911             'expected-dapi': 'AItaZ6nmyaKJ',
912             'expected-sapi': 'AKFnJJaijWiz',
913             'tx-dapi': 'AKFnJJaijWiz',
914             'tx-sapi': 'AItaZ6nmyaKJ'}
915
916         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
917                              res['interface'][0])
918         self.assertDictEqual(dict(input_dict_2,
919                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
920                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
921         self.assertDictEqual(
922             {'payload-type': '21', 'exp-payload-type': '21'},
923             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
924
925     def test_50_check_interface_ODU4_NETWORK_spdra(self):
926         response = test_utils.check_netconf_node_request(
927             "SPDR-SA1", "interface/XPDR2-NETWORK1-ODU4")
928         self.assertEqual(response.status_code, requests.codes.ok)
929         res = response.json()
930         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
931                         'administrative-state': 'inService',
932                         'supporting-circuit-pack-name': 'CP5-CFP',
933                         'type': 'org-openroadm-interfaces:otnOdu',
934                         'supporting-port': 'CP5-CFP-P1',
935                         'circuit-id': 'TBD',
936                         'description': 'TBD'}
937         input_dict_2 = {
938             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
939             'rate': 'org-openroadm-otn-common-types:ODU4',
940             'monitoring-mode': 'monitored'}
941
942         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
943                              res['interface'][0])
944         self.assertDictEqual(dict(input_dict_2,
945                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
946                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
947         self.assertNotIn('opu',
948                          dict.keys(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
949
950     def test_51_check_ODU4_connection_spdra(self):
951         response = test_utils.check_netconf_node_request(
952             "SPDR-SA1",
953             "odu-connection/XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4")
954         self.assertEqual(response.status_code, requests.codes.ok)
955         res = response.json()
956         input_dict_1 = {
957             'connection-name':
958             'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
959             'direction': 'bidirectional'
960         }
961
962         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
963                              res['odu-connection'][0])
964         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4'},
965                              res['odu-connection'][0]['destination'])
966         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4'},
967                              res['odu-connection'][0]['source'])
968
969     def test_52_check_interface_100GE_CLIENT_spdrc(self):
970         response = test_utils.check_netconf_node_request(
971             "SPDR-SC1", "interface/XPDR2-CLIENT1-ETHERNET")
972         self.assertEqual(response.status_code, requests.codes.ok)
973         res = response.json()
974         input_dict = {'name': 'XPDR2-CLIENT1-ETHERNET',
975                       'administrative-state': 'inService',
976                       'supporting-circuit-pack-name': 'CP2-QSFP1',
977                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
978                       'supporting-port': 'CP2-QSFP1-P1'
979                       }
980         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
981                              res['interface'][0])
982         self.assertDictEqual(
983             {'speed': 100000,
984              'fec': 'off'},
985             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
986
987     def test_53_check_interface_ODU4_CLIENT_spdrc(self):
988         response = test_utils.check_netconf_node_request(
989             "SPDR-SC1", "interface/XPDR2-CLIENT1-ODU4")
990         self.assertEqual(response.status_code, requests.codes.ok)
991         res = response.json()
992         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
993                         'administrative-state': 'inService',
994                         'supporting-circuit-pack-name': 'CP2-QSFP1',
995                         'supporting-interface': 'XPDR2-CLIENT1-ETHERNET',
996                         'type': 'org-openroadm-interfaces:otnOdu',
997                         'supporting-port': 'CP2-QSFP1-P1',
998                         'circuit-id': 'TBD',
999                         'description': 'TBD'}
1000         input_dict_2 = {
1001             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
1002             'rate': 'org-openroadm-otn-common-types:ODU4',
1003             'monitoring-mode': 'terminated',
1004             'expected-dapi': 'AKFnJJaijWiz',
1005             'expected-sapi': 'AItaZ6nmyaKJ',
1006             'tx-dapi': 'AItaZ6nmyaKJ',
1007             'tx-sapi': 'AKFnJJaijWiz'}
1008
1009         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1010                              res['interface'][0])
1011         self.assertDictEqual(dict(input_dict_2,
1012                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1013                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1014         self.assertDictEqual(
1015             {'payload-type': '21', 'exp-payload-type': '21'},
1016             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1017
1018     def test_54_check_interface_ODU4_NETWORK_spdrc(self):
1019         response = test_utils.check_netconf_node_request(
1020             "SPDR-SC1", "interface/XPDR2-NETWORK1-ODU4")
1021         self.assertEqual(response.status_code, requests.codes.ok)
1022         res = response.json()
1023         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
1024                         'administrative-state': 'inService',
1025                         'supporting-circuit-pack-name': 'CP5-CFP',
1026                         'type': 'org-openroadm-interfaces:otnOdu',
1027                         'supporting-port': 'CP5-CFP-P1',
1028                         'circuit-id': 'TBD',
1029                         'description': 'TBD'}
1030         input_dict_2 = {
1031             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1032             'rate': 'org-openroadm-otn-common-types:ODU4',
1033             'monitoring-mode': 'monitored'}
1034
1035         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1036                              res['interface'][0])
1037         self.assertDictEqual(dict(input_dict_2,
1038                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1039                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1040         self.assertNotIn('opu',
1041                          dict.keys(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
1042
1043     def test_55_check_ODU4_connection_spdrc(self):
1044         response = test_utils.check_netconf_node_request(
1045             "SPDR-SC1",
1046             "odu-connection/XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4")
1047         self.assertEqual(response.status_code, requests.codes.ok)
1048         res = response.json()
1049         input_dict_1 = {
1050             'connection-name':
1051             'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
1052             'direction': 'bidirectional'
1053         }
1054
1055         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1056                              res['odu-connection'][0])
1057         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4'},
1058                              res['odu-connection'][0]['destination'])
1059         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4'},
1060                              res['odu-connection'][0]['source'])
1061
1062     def test_56_check_interface_ODU4_NETWORK1_spdrb(self):
1063         response = test_utils.check_netconf_node_request(
1064             "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4")
1065         self.assertEqual(response.status_code, requests.codes.ok)
1066         res = response.json()
1067         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
1068                         'administrative-state': 'inService',
1069                         'supporting-circuit-pack-name': 'CP5-CFP',
1070                         'type': 'org-openroadm-interfaces:otnOdu',
1071                         'supporting-port': 'CP5-CFP-P1'}
1072         input_dict_2 = {
1073             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1074             'rate': 'org-openroadm-otn-common-types:ODU4',
1075             'monitoring-mode': 'monitored'}
1076
1077         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1078                              res['interface'][0])
1079         self.assertDictEqual(dict(input_dict_2,
1080                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1081                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1082         self.assertNotIn('opu',
1083                          dict.keys(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
1084
1085     def test_57_check_interface_ODU4_NETWORK2_spdrb(self):
1086         response = test_utils.check_netconf_node_request(
1087             "SPDR-SB1", "interface/XPDR2-NETWORK2-ODU4")
1088         self.assertEqual(response.status_code, requests.codes.ok)
1089         res = response.json()
1090         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
1091                         'administrative-state': 'inService',
1092                         'supporting-circuit-pack-name': 'CP6-CFP',
1093                         'type': 'org-openroadm-interfaces:otnOdu',
1094                         'supporting-port': 'CP6-CFP-P1'}
1095         input_dict_2 = {
1096             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1097             'rate': 'org-openroadm-otn-common-types:ODU4',
1098             'monitoring-mode': 'monitored'}
1099
1100         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1101                              res['interface'][0])
1102         self.assertDictEqual(dict(input_dict_2,
1103                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1104                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1105         self.assertNotIn('opu',
1106                          dict.keys(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
1107
1108     def test_58_check_ODU4_connection_spdrb(self):
1109         response = test_utils.check_netconf_node_request(
1110             "SPDR-SB1",
1111             "odu-connection/XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4")
1112         self.assertEqual(response.status_code, requests.codes.ok)
1113         res = response.json()
1114         input_dict_1 = {
1115             'connection-name':
1116             'XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4',
1117             'direction': 'bidirectional'
1118         }
1119
1120         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1121                              res['odu-connection'][0])
1122         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK2-ODU4'},
1123                              res['odu-connection'][0]['destination'])
1124         self.assertDictEqual({'src-if': 'XPDR2-NETWORK1-ODU4'},
1125                              res['odu-connection'][0]['source'])
1126
1127     def test_59_check_otn_topo_links(self):
1128         response = test_utils.get_otn_topo_request()
1129         self.assertEqual(response.status_code, requests.codes.ok)
1130         res = response.json()
1131         nb_links = len(res['network'][0]['ietf-network-topology:link'])
1132         self.assertEqual(nb_links, 4)
1133         for link in res['network'][0]['ietf-network-topology:link']:
1134             self.assertEqual(
1135                 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
1136             self.assertEqual(
1137                 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
1138
1139     def test_60_delete_service_100GE_ABC(self):
1140         response = test_utils.service_delete_request("service-100GE-ABC")
1141         self.assertEqual(response.status_code, requests.codes.ok)
1142         res = response.json()
1143         self.assertIn('Renderer service delete in progress',
1144                       res['output']['configuration-response-common']['response-message'])
1145         time.sleep(self.WAITING)
1146
1147     def test_61_check_service_list(self):
1148         response = test_utils.get_service_list_request("")
1149         self.assertEqual(response.status_code, requests.codes.ok)
1150         res = response.json()
1151         self.assertEqual(len(res['service-list']['services']), 2)
1152         time.sleep(2)
1153
1154     def test_62_check_no_ODU4_connection_spdra(self):
1155         response = test_utils.check_netconf_node_request("SPDR-SA1", "")
1156         self.assertEqual(response.status_code, requests.codes.ok)
1157         res = response.json()
1158         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
1159         time.sleep(1)
1160
1161     def test_63_check_no_interface_ODU4_NETWORK_spdra(self):
1162         response = test_utils.check_netconf_node_request(
1163             "SPDR-SA1", "interface/XPDR2-NETWORK1-ODU4")
1164         self.assertEqual(response.status_code, requests.codes.conflict)
1165
1166     def test_64_check_no_interface_ODU4_CLIENT_spdra(self):
1167         response = test_utils.check_netconf_node_request(
1168             "SPDR-SA1", "interface/XPDR2-CLIENT1-ODU4")
1169         self.assertEqual(response.status_code, requests.codes.conflict)
1170
1171     def test_65_check_no_interface_100GE_CLIENT_spdra(self):
1172         response = test_utils.check_netconf_node_request(
1173             "SPDR-SA1", "interface/XPDR2-CLIENT1-ETHERNET")
1174         self.assertEqual(response.status_code, requests.codes.conflict)
1175
1176     def test_66_check_otn_topo_links(self):
1177         self.test_45_check_otn_topo_otu4_links()
1178
1179     def test_67_delete_OCH_OTU4_service_AB(self):
1180         response = test_utils.service_delete_request("service-OCH-OTU4-AB")
1181         self.assertEqual(response.status_code, requests.codes.ok)
1182         res = response.json()
1183         self.assertIn('Renderer service delete in progress',
1184                       res['output']['configuration-response-common']['response-message'])
1185         time.sleep(self.WAITING)
1186
1187     def test_68_delete_OCH_OTU4_service_BC(self):
1188         response = test_utils.service_delete_request("service-OCH-OTU4-BC")
1189         self.assertEqual(response.status_code, requests.codes.ok)
1190         res = response.json()
1191         self.assertIn('Renderer service delete in progress',
1192                       res['output']['configuration-response-common']['response-message'])
1193         time.sleep(self.WAITING)
1194
1195     def test_69_get_no_service(self):
1196         response = test_utils.get_service_list_request("")
1197         self.assertEqual(response.status_code, requests.codes.conflict)
1198         res = response.json()
1199         self.assertIn(
1200             {"error-type": "application", "error-tag": "data-missing",
1201              "error-message": "Request could not be completed because the relevant data model content does not exist"},
1202             res['errors']['error'])
1203         time.sleep(1)
1204
1205     def test_70_check_no_interface_OTU4_spdra(self):
1206         response = test_utils.check_netconf_node_request(
1207             "SPDR-SA1", "interface/XPDR2-NETWORK1-OTU")
1208         self.assertEqual(response.status_code, requests.codes.conflict)
1209
1210     def test_71_check_no_interface_OCH_spdra(self):
1211         response = test_utils.check_netconf_node_request(
1212             "SPDR-SA1", "interface/XPDR2-NETWORK1-761:768")
1213         self.assertEqual(response.status_code, requests.codes.conflict)
1214
1215     def test_72_getLinks_OtnTopology(self):
1216         response = test_utils.get_otn_topo_request()
1217         self.assertEqual(response.status_code, requests.codes.ok)
1218         res = response.json()
1219         self.assertNotIn('ietf-network-topology:link', res['network'][0])
1220
1221     def test_73_check_openroadm_topo_spdra(self):
1222         response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR2")
1223         self.assertEqual(response.status_code, requests.codes.ok)
1224         res = response.json()
1225         tp = res['node'][0]['ietf-network-topology:termination-point'][0]
1226         self.assertEqual('XPDR2-NETWORK1', tp['tp-id'])
1227         self.assertNotIn('wavelength', dict.keys(
1228             tp['org-openroadm-network-topology:xpdr-network-attributes']))
1229         time.sleep(3)
1230
1231     def test_74_check_openroadm_topo_ROADMB_SRG1(self):
1232         response = test_utils.get_ordm_topo_request("node/ROADM-B1-SRG1")
1233         self.assertEqual(response.status_code, requests.codes.ok)
1234         res = response.json()
1235         freq_map = base64.b64decode(
1236             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
1237         freq_map_array = [int(x) for x in freq_map]
1238         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1239         self.assertEqual(freq_map_array[94], 255, "Lambda 1 should be available")
1240         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1241         for ele in liste_tp:
1242             if ele['tp-id'] == 'SRG1-PP1-TXRX':
1243                 freq_map = base64.b64decode(
1244                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1245                 freq_map_array = [int(x) for x in freq_map]
1246                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1247             if ele['tp-id'] == 'SRG1-PP2-TXRX':
1248                 freq_map = base64.b64decode(
1249                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1250                 freq_map_array = [int(x) for x in freq_map]
1251                 self.assertEqual(freq_map_array[94], 255, "Lambda 1 should be available")
1252         time.sleep(3)
1253
1254     def test_75_check_openroadm_topo_ROADMB_DEG1(self):
1255         response = test_utils.get_ordm_topo_request("node/ROADM-B1-DEG1")
1256         self.assertEqual(response.status_code, requests.codes.ok)
1257         res = response.json()
1258         freq_map = base64.b64decode(
1259             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1260         freq_map_array = [int(x) for x in freq_map]
1261         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1262         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1263         for ele in liste_tp:
1264             if ele['tp-id'] == 'DEG1-CTP-TXRX':
1265                 freq_map = base64.b64decode(
1266                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1267                 freq_map_array = [int(x) for x in freq_map]
1268                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1269             if ele['tp-id'] == 'DEG1-TTP-TXRX':
1270                 freq_map = base64.b64decode(
1271                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1272                 freq_map_array = [int(x) for x in freq_map]
1273                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1274         time.sleep(3)
1275
1276     def test_76_check_openroadm_topo_ROADMB_DEG2(self):
1277         response = test_utils.get_ordm_topo_request("node/ROADM-B1-DEG2")
1278         self.assertEqual(response.status_code, requests.codes.ok)
1279         res = response.json()
1280         freq_map = base64.b64decode(
1281             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1282         freq_map_array = [int(x) for x in freq_map]
1283         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1284         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1285         for ele in liste_tp:
1286             if ele['tp-id'] == 'DEG2-CTP-TXRX':
1287                 freq_map = base64.b64decode(
1288                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1289                 freq_map_array = [int(x) for x in freq_map]
1290                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1291             if ele['tp-id'] == 'DEG2-TTP-TXRX':
1292                 freq_map = base64.b64decode(
1293                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1294                 freq_map_array = [int(x) for x in freq_map]
1295                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1296         time.sleep(3)
1297
1298     def test_77_disconnect_xponders_from_roadm(self):
1299         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1300         response = test_utils.get_ordm_topo_request("")
1301         self.assertEqual(response.status_code, requests.codes.ok)
1302         res = response.json()
1303         links = res['network'][0]['ietf-network-topology:link']
1304         for link in links:
1305             if (link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT')
1306                     and ('SPDR-SB1' in link['link-id'] or 'ROADM-B1' in link['link-id'])):
1307                 link_name = link["link-id"]
1308                 response = test_utils.delete_request(url+link_name)
1309                 self.assertEqual(response.status_code, requests.codes.ok)
1310
1311     def test_78_disconnect_spdrB(self):
1312         response = test_utils.unmount_device("SPDR-SB1")
1313         self.assertEqual(response.status_code, requests.codes.ok,
1314                          test_utils.CODE_SHOULD_BE_200)
1315
1316     def test_79_disconnect_roadmB(self):
1317         response = test_utils.unmount_device("ROADM-B1")
1318         self.assertEqual(response.status_code, requests.codes.ok,
1319                          test_utils.CODE_SHOULD_BE_200)
1320
1321     def test_80_remove_roadm_to_roadm_links(self):
1322         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1323         response = test_utils.get_ordm_topo_request("")
1324         self.assertEqual(response.status_code, requests.codes.ok)
1325         res = response.json()
1326         links = res['network'][0]['ietf-network-topology:link']
1327         for link in links:
1328             if (link["org-openroadm-common-network:link-type"] == "ROADM-TO-ROADM"
1329                     and 'ROADM-B1' in link['link-id']):
1330                 link_name = link["link-id"]
1331                 response = test_utils.delete_request(url+link_name)
1332                 self.assertEqual(response.status_code, requests.codes.ok)
1333
1334     def test_81_add_omsAttributes_ROADMA_ROADMC(self):
1335         # Config ROADMA-ROADMC oms-attributes
1336         data = {"span": {
1337             "auto-spanloss": "true",
1338             "spanloss-base": 11.4,
1339             "spanloss-current": 12,
1340             "engineered-spanloss": 12.2,
1341             "link-concatenation": [{
1342                 "SRLG-Id": 0,
1343                 "fiber-type": "smf",
1344                 "SRLG-length": 100000,
1345                 "pmd": 0.5}]}}
1346         response = test_utils.add_oms_attr_request(
1347             "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
1348         self.assertEqual(response.status_code, requests.codes.created)
1349
1350     def test_82_add_omsAttributes_ROADMC_ROADMA(self):
1351         # Config ROADMC-ROADMA oms-attributes
1352         data = {"span": {
1353             "auto-spanloss": "true",
1354             "spanloss-base": 11.4,
1355             "spanloss-current": 12,
1356             "engineered-spanloss": 12.2,
1357             "link-concatenation": [{
1358                 "SRLG-Id": 0,
1359                 "fiber-type": "smf",
1360                 "SRLG-length": 100000,
1361                 "pmd": 0.5}]}}
1362         response = test_utils.add_oms_attr_request(
1363             "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
1364         self.assertEqual(response.status_code, requests.codes.created)
1365
1366     def test_83_create_OCH_OTU4_service_AC(self):
1367         # pylint: disable=line-too-long
1368         self.cr_serv_sample_data["input"]["service-name"] = "service-OCH-OTU4-AC"
1369         self.cr_serv_sample_data["input"]["connection-type"] = "infrastructure"
1370         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
1371         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OTU"
1372         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1373         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
1374         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1375         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
1376         self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1377         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
1378         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OTU"
1379         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1380         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
1381         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1382         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
1383         self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1384         response = test_utils.service_create_request(self.cr_serv_sample_data)
1385         self.assertEqual(response.status_code, requests.codes.ok)
1386         res = response.json()
1387         self.assertIn('PCE calculation in progress',
1388                       res['output']['configuration-response-common']['response-message'])
1389         time.sleep(self.WAITING)
1390
1391     def test_84_get_OCH_OTU4_service_AC(self):
1392         response = test_utils.get_service_list_request(
1393             "services/service-OCH-OTU4-AC")
1394         self.assertEqual(response.status_code, requests.codes.ok)
1395         res = response.json()
1396         self.assertEqual(
1397             res['services'][0]['administrative-state'], 'inService')
1398         self.assertEqual(
1399             res['services'][0]['service-name'], 'service-OCH-OTU4-AC')
1400         self.assertEqual(
1401             res['services'][0]['connection-type'], 'infrastructure')
1402         self.assertEqual(
1403             res['services'][0]['lifecycle-state'], 'planned')
1404         time.sleep(2)
1405
1406 # test service-create for 100GE service from spdrA to spdrC (direct A-C path, spdrB has been disconnected)
1407     def test_85_create_100GE_service_AC(self):
1408         # pylint: disable=line-too-long
1409         self.cr_serv_sample_data["input"]["service-name"] = "service-100GE-AC"
1410         self.cr_serv_sample_data["input"]["connection-type"] = "service"
1411         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
1412         self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "SPDR-SA1"
1413         self.cr_serv_sample_data["input"]["service-a-end"]["clli"] = "NodeSA"
1414         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
1415         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1416         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
1417         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1418         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
1419         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
1420         self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SC1"
1421         self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSC"
1422         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
1423         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1424         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
1425         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1426         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
1427
1428         response = test_utils.service_create_request(self.cr_serv_sample_data)
1429         self.assertEqual(response.status_code, requests.codes.ok)
1430         res = response.json()
1431         self.assertIn('PCE calculation in progress',
1432                       res['output']['configuration-response-common']['response-message'])
1433         time.sleep(self.WAITING)
1434
1435     def test_86_get_100GE_service_AC(self):
1436         response = test_utils.get_service_list_request("services/service-100GE-AC")
1437         self.assertEqual(response.status_code, requests.codes.ok)
1438         res = response.json()
1439         self.assertEqual(
1440             res['services'][0]['administrative-state'], 'inService')
1441         self.assertEqual(
1442             res['services'][0]['service-name'], 'service-100GE-AC')
1443         self.assertEqual(
1444             res['services'][0]['connection-type'], 'service')
1445         self.assertEqual(
1446             res['services'][0]['lifecycle-state'], 'planned')
1447         time.sleep(2)
1448
1449     def test_87_check_configuration_spdra(self):
1450         self.test_48_check_interface_100GE_CLIENT_spdra()
1451         self.test_49_check_interface_ODU4_CLIENT_spdra()
1452         self.test_50_check_interface_ODU4_NETWORK_spdra()
1453         self.test_51_check_ODU4_connection_spdra()
1454
1455     def test_88_check_configuration_spdrc(self):
1456         self.test_52_check_interface_100GE_CLIENT_spdrc()
1457         self.test_53_check_interface_ODU4_CLIENT_spdrc()
1458         self.test_54_check_interface_ODU4_NETWORK_spdrc()
1459         self.test_55_check_ODU4_connection_spdrc()
1460
1461     def test_89_check_otn_topo_links(self):
1462         response = test_utils.get_otn_topo_request()
1463         self.assertEqual(response.status_code, requests.codes.ok)
1464         res = response.json()
1465         nb_links = len(res['network'][0]['ietf-network-topology:link'])
1466         self.assertEqual(nb_links, 2)
1467         for link in res['network'][0]['ietf-network-topology:link']:
1468             self.assertEqual(
1469                 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
1470             self.assertEqual(
1471                 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
1472
1473     def test_90_delete_100GE_service_AC(self):
1474         response = test_utils.service_delete_request("service-100GE-AC")
1475         self.assertEqual(response.status_code, requests.codes.ok)
1476         res = response.json()
1477         self.assertIn('Renderer service delete in progress',
1478                       res['output']['configuration-response-common']['response-message'])
1479         time.sleep(self.WAITING)
1480
1481     def test_91_check_service_list(self):
1482         response = test_utils.get_service_list_request("")
1483         self.assertEqual(response.status_code, requests.codes.ok)
1484         res = response.json()
1485         self.assertEqual(len(res['service-list']['services']), 1)
1486         time.sleep(2)
1487
1488     def test_92_check_configuration_spdra(self):
1489         self.test_62_check_no_ODU4_connection_spdra()
1490         self.test_63_check_no_interface_ODU4_NETWORK_spdra()
1491         self.test_64_check_no_interface_ODU4_CLIENT_spdra()
1492         self.test_65_check_no_interface_100GE_CLIENT_spdra()
1493
1494     def test_93_check_otn_topo_links(self):
1495         response = test_utils.get_otn_topo_request()
1496         self.assertEqual(response.status_code, requests.codes.ok)
1497         res = response.json()
1498         nb_links = len(res['network'][0]['ietf-network-topology:link'])
1499         self.assertEqual(nb_links, 2)
1500         for link in res['network'][0]['ietf-network-topology:link']:
1501             self.assertEqual(
1502                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
1503             self.assertEqual(
1504                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
1505
1506     def test_94_disconnect_xponders_from_roadm(self):
1507         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1508         response = test_utils.get_ordm_topo_request("")
1509         self.assertEqual(response.status_code, requests.codes.ok)
1510         res = response.json()
1511         links = res['network'][0]['ietf-network-topology:link']
1512         for link in links:
1513             if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
1514                 link_name = link["link-id"]
1515                 response = test_utils.delete_request(url+link_name)
1516                 self.assertEqual(response.status_code, requests.codes.ok)
1517
1518     def test_95_disconnect_spdrA(self):
1519         response = test_utils.unmount_device("SPDR-SA1")
1520         self.assertEqual(response.status_code, requests.codes.ok,
1521                          test_utils.CODE_SHOULD_BE_200)
1522
1523     def test_96_disconnect_spdrC(self):
1524         response = test_utils.unmount_device("SPDR-SC1")
1525         self.assertEqual(response.status_code, requests.codes.ok,
1526                          test_utils.CODE_SHOULD_BE_200)
1527
1528     def test_97_disconnect_roadmA(self):
1529         response = test_utils.unmount_device("ROADM-A1")
1530         self.assertEqual(response.status_code, requests.codes.ok,
1531                          test_utils.CODE_SHOULD_BE_200)
1532
1533     def test_98_disconnect_roadmC(self):
1534         response = test_utils.unmount_device("ROADM-C1")
1535         self.assertEqual(response.status_code, requests.codes.ok,
1536                          test_utils.CODE_SHOULD_BE_200)
1537
1538
# Script entry point: run the whole test case when the file is executed directly.
if __name__ == "__main__":
    # verbosity=2 makes unittest report each test name with its result.
    unittest.main(verbosity=2)