Fix remaining pylint warnings apart fixme & dup
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test14_otn_switch_end2end.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
15
16 import base64
17 import unittest
18 import time
19 import requests
20 # pylint: disable=wrong-import-order
21 import sys
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils  # nopep8
26
27
28 class TransportPCEtesting(unittest.TestCase):
29
30     processes = None
31     WAITING = 20  # nominal value is 300
32     NODE_VERSION = '2.2.1'
33
34     cr_serv_sample_data = {"input": {
35         "sdnc-request-header": {
36             "request-id": "request-1",
37             "rpc-action": "service-create",
38             "request-system-id": "appname"
39         },
40         "service-name": "service-OCH-OTU4-AB",
41         "common-id": "commonId",
42         "connection-type": "infrastructure",
43         "service-a-end": {
44             "service-rate": "100",
45             "node-id": "SPDR-SA1",
46             "service-format": "OTU",
47             "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
48             "clli": "NodeSA",
49             "subrate-eth-sla": {
50                     "subrate-eth-sla": {
51                         "committed-info-rate": "100000",
52                         "committed-burst-size": "64"
53                     }
54             },
55             "tx-direction": {
56                 "port": {
57                     "port-device-name": "SPDR-SA1-XPDR2",
58                     "port-type": "fixed",
59                     "port-name": "XPDR2-NETWORK1",
60                     "port-rack": "000000.00",
61                     "port-shelf": "Chassis#1"
62                 },
63                 "lgx": {
64                     "lgx-device-name": "Some lgx-device-name",
65                     "lgx-port-name": "Some lgx-port-name",
66                     "lgx-port-rack": "000000.00",
67                     "lgx-port-shelf": "00"
68                 }
69             },
70             "rx-direction": {
71                 "port": {
72                     "port-device-name": "SPDR-SA1-XPDR2",
73                     "port-type": "fixed",
74                     "port-name": "XPDR2-NETWORK1",
75                     "port-rack": "000000.00",
76                     "port-shelf": "Chassis#1"
77                 },
78                 "lgx": {
79                     "lgx-device-name": "Some lgx-device-name",
80                     "lgx-port-name": "Some lgx-port-name",
81                     "lgx-port-rack": "000000.00",
82                     "lgx-port-shelf": "00"
83                 }
84             },
85             "optic-type": "gray"
86         },
87         "service-z-end": {
88             "service-rate": "100",
89             "node-id": "SPDR-SB1",
90             "service-format": "OTU",
91             "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
92             "clli": "NodeSB",
93             "subrate-eth-sla": {
94                     "subrate-eth-sla": {
95                         "committed-info-rate": "100000",
96                         "committed-burst-size": "64"
97                     }
98             },
99             "tx-direction": {
100                 "port": {
101                     "port-device-name": "SPDR-SB1-XPDR2",
102                     "port-type": "fixed",
103                     "port-name": "XPDR2-NETWORK1",
104                     "port-rack": "000000.00",
105                     "port-shelf": "Chassis#1"
106                 },
107                 "lgx": {
108                     "lgx-device-name": "Some lgx-device-name",
109                     "lgx-port-name": "Some lgx-port-name",
110                     "lgx-port-rack": "000000.00",
111                     "lgx-port-shelf": "00"
112                 }
113             },
114             "rx-direction": {
115                 "port": {
116                     "port-device-name": "SPDR-SB1-XPDR2",
117                     "port-type": "fixed",
118                     "port-name": "XPDR2-NETWORK1",
119                     "port-rack": "000000.00",
120                     "port-shelf": "Chassis#1"
121                 },
122                 "lgx": {
123                     "lgx-device-name": "Some lgx-device-name",
124                     "lgx-port-name": "Some lgx-port-name",
125                     "lgx-port-rack": "000000.00",
126                     "lgx-port-shelf": "00"
127                 }
128             },
129             "optic-type": "gray"
130         },
131         "due-date": "2018-06-15T00:00:01Z",
132         "operator-contact": "pw1234"
133     }
134     }
135
136     @classmethod
137     def setUpClass(cls):
138         cls.processes = test_utils.start_tpce()
139         cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
140                                                ('spdrb', cls.NODE_VERSION),
141                                                ('spdrc', cls.NODE_VERSION),
142                                                ('roadma', cls.NODE_VERSION),
143                                                ('roadmb', cls.NODE_VERSION),
144                                                ('roadmc', cls.NODE_VERSION)])
145
146     @classmethod
147     def tearDownClass(cls):
148         # pylint: disable=not-an-iterable
149         for process in cls.processes:
150             test_utils.shutdown_process(process)
151         print("all processes killed")
152
153     def setUp(self):
154         time.sleep(5)
155
156     def test_01_connect_spdrA(self):
157         response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
158         self.assertEqual(response.status_code,
159                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
160
161     def test_02_connect_spdrB(self):
162         response = test_utils.mount_device("SPDR-SB1", ('spdrb', self.NODE_VERSION))
163         self.assertEqual(response.status_code,
164                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
165
166     def test_03_connect_spdrC(self):
167         response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
168         self.assertEqual(response.status_code,
169                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
170
171     def test_04_connect_rdmA(self):
172         response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
173         self.assertEqual(response.status_code,
174                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
175
176     def test_05_connect_rdmB(self):
177         response = test_utils.mount_device("ROADM-B1", ('roadmb', self.NODE_VERSION))
178         self.assertEqual(response.status_code,
179                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
180
181     def test_06_connect_rdmC(self):
182         response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
183         self.assertEqual(response.status_code,
184                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
185
186     def test_07_connect_sprdA_2_N1_to_roadmA_PP3(self):
187         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "2", "1",
188                                                           "ROADM-A1", "1", "SRG1-PP3-TXRX")
189         self.assertEqual(response.status_code, requests.codes.ok)
190         res = response.json()
191         self.assertIn('Xponder Roadm Link created successfully',
192                       res["output"]["result"])
193         time.sleep(2)
194
195     def test_08_connect_roadmA_PP3_to_spdrA_2_N1(self):
196         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "2", "1",
197                                                           "ROADM-A1", "1", "SRG1-PP3-TXRX")
198         self.assertEqual(response.status_code, requests.codes.ok)
199         res = response.json()
200         self.assertIn('Roadm Xponder links created successfully',
201                       res["output"]["result"])
202         time.sleep(2)
203
204     def test_09_connect_sprdC_2_N1_to_roadmC_PP3(self):
205         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "2", "1",
206                                                           "ROADM-C1", "1", "SRG1-PP3-TXRX")
207         self.assertEqual(response.status_code, requests.codes.ok)
208         res = response.json()
209         self.assertIn('Xponder Roadm Link created successfully',
210                       res["output"]["result"])
211         time.sleep(2)
212
213     def test_10_connect_roadmC_PP3_to_spdrC_2_N1(self):
214         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "2", "1",
215                                                           "ROADM-C1", "1", "SRG1-PP3-TXRX")
216         self.assertEqual(response.status_code, requests.codes.ok)
217         res = response.json()
218         self.assertIn('Roadm Xponder links created successfully',
219                       res["output"]["result"])
220         time.sleep(2)
221
222     def test_11_connect_sprdB_2_N1_to_roadmB_PP1(self):
223         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SB1", "2", "1",
224                                                           "ROADM-B1", "1", "SRG1-PP1-TXRX")
225         self.assertEqual(response.status_code, requests.codes.ok)
226         res = response.json()
227         self.assertIn('Xponder Roadm Link created successfully',
228                       res["output"]["result"])
229         time.sleep(2)
230
231     def test_12_connect_roadmB_PP1_to_spdrB_2_N1(self):
232         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SB1", "2", "1",
233                                                           "ROADM-B1", "1", "SRG1-PP1-TXRX")
234         self.assertEqual(response.status_code, requests.codes.ok)
235         res = response.json()
236         self.assertIn('Roadm Xponder links created successfully',
237                       res["output"]["result"])
238         time.sleep(2)
239
240     def test_13_connect_sprdB_2_N2_to_roadmB_PP2(self):
241         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SB1", "2", "2",
242                                                           "ROADM-B1", "1", "SRG1-PP2-TXRX")
243         self.assertEqual(response.status_code, requests.codes.ok)
244         res = response.json()
245         self.assertIn('Xponder Roadm Link created successfully',
246                       res["output"]["result"])
247         time.sleep(2)
248
249     def test_14_connect_roadmB_PP2_to_spdrB_2_N2(self):
250         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SB1", "2", "2",
251                                                           "ROADM-B1", "1", "SRG1-PP2-TXRX")
252         self.assertEqual(response.status_code, requests.codes.ok)
253         res = response.json()
254         self.assertIn('Roadm Xponder links created successfully',
255                       res["output"]["result"])
256         time.sleep(2)
257
258     def test_15_add_omsAttributes_ROADMA_ROADMB(self):
259         # Config ROADMA-ROADMB oms-attributes
260         data = {"span": {
261             "auto-spanloss": "true",
262             "spanloss-base": 11.4,
263             "spanloss-current": 12,
264             "engineered-spanloss": 12.2,
265             "link-concatenation": [{
266                 "SRLG-Id": 0,
267                 "fiber-type": "smf",
268                 "SRLG-length": 100000,
269                 "pmd": 0.5}]}}
270         response = test_utils.add_oms_attr_request(
271             "ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX", data)
272         self.assertEqual(response.status_code, requests.codes.created)
273
274     def test_16_add_omsAttributes_ROADMB_ROADMA(self):
275         # Config ROADMB-ROADMA oms-attributes
276         data = {"span": {
277             "auto-spanloss": "true",
278             "spanloss-base": 11.4,
279             "spanloss-current": 12,
280             "engineered-spanloss": 12.2,
281             "link-concatenation": [{
282                 "SRLG-Id": 0,
283                 "fiber-type": "smf",
284                 "SRLG-length": 100000,
285                 "pmd": 0.5}]}}
286         response = test_utils.add_oms_attr_request(
287             "ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX", data)
288         self.assertEqual(response.status_code, requests.codes.created)
289
290     def test_17_add_omsAttributes_ROADMB_ROADMC(self):
291         # Config ROADMB-ROADMC oms-attributes
292         data = {"span": {
293             "auto-spanloss": "true",
294             "spanloss-base": 11.4,
295             "spanloss-current": 12,
296             "engineered-spanloss": 12.2,
297             "link-concatenation": [{
298                 "SRLG-Id": 0,
299                 "fiber-type": "smf",
300                 "SRLG-length": 100000,
301                 "pmd": 0.5}]}}
302         response = test_utils.add_oms_attr_request(
303             "ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX", data)
304         self.assertEqual(response.status_code, requests.codes.created)
305
306     def test_18_add_omsAttributes_ROADMC_ROADMB(self):
307         # Config ROADMC-ROADMB oms-attributes
308         data = {"span": {
309             "auto-spanloss": "true",
310             "spanloss-base": 11.4,
311             "spanloss-current": 12,
312             "engineered-spanloss": 12.2,
313             "link-concatenation": [{
314                 "SRLG-Id": 0,
315                 "fiber-type": "smf",
316                 "SRLG-length": 100000,
317                 "pmd": 0.5}]}}
318         response = test_utils.add_oms_attr_request(
319             "ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX", data)
320         self.assertEqual(response.status_code, requests.codes.created)
321
322     def test_19_create_OTS_ROADMA_DEG1(self):
323         response = test_utils.create_ots_oms_request("ROADM-A1", "DEG1-TTP-TXRX")
324         time.sleep(10)
325         self.assertEqual(response.status_code, requests.codes.ok)
326         res = response.json()
327         self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-A1',
328                       res["output"]["result"])
329
330     def test_20_create_OTS_ROADMB_DEG1(self):
331         response = test_utils.create_ots_oms_request("ROADM-B1", "DEG1-TTP-TXRX")
332         time.sleep(10)
333         self.assertEqual(response.status_code, requests.codes.ok)
334         res = response.json()
335         self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-B1',
336                       res["output"]["result"])
337
338     def test_21_create_OTS_ROADMB_DEG2(self):
339         response = test_utils.create_ots_oms_request("ROADM-B1", "DEG2-TTP-TXRX")
340         time.sleep(10)
341         self.assertEqual(response.status_code, requests.codes.ok)
342         res = response.json()
343         self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-B1',
344                       res["output"]["result"])
345
346     def test_22_create_OTS_ROADMC_DEG2(self):
347         response = test_utils.create_ots_oms_request("ROADM-C1", "DEG2-TTP-TXRX")
348         time.sleep(10)
349         self.assertEqual(response.status_code, requests.codes.ok)
350         res = response.json()
351         self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-C1',
352                       res["output"]["result"])
353
354     def test_23_calculate_span_loss_base_all(self):
355         url = "{}/operations/transportpce-olm:calculate-spanloss-base"
356         data = {
357             "input": {
358                 "src-type": "all"
359             }
360         }
361         response = test_utils.post_request(url, data)
362         self.assertEqual(response.status_code, requests.codes.ok)
363         res = response.json()
364         self.assertIn('Success',
365                       res["output"]["result"])
366         self.assertIn({
367             "spanloss": "25.7",
368             "link-id": "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX"
369         }, res["output"]["spans"])
370         self.assertIn({
371             "spanloss": "17.6",
372             "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
373         }, res["output"]["spans"])
374         self.assertIn({
375             "spanloss": "23.6",
376             "link-id": "ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX"
377         }, res["output"]["spans"])
378         self.assertIn({
379             "spanloss": "23.6",
380             "link-id": "ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX"
381         }, res["output"]["spans"])
382         self.assertIn({
383             "spanloss": "25.7",
384             "link-id": "ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX"
385         }, res["output"]["spans"])
386         self.assertIn({
387             "spanloss": "17.6",
388             "link-id": "ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX"
389         }, res["output"]["spans"])
390         time.sleep(5)
391
392     def test_24_check_otn_topology(self):
393         response = test_utils.get_otn_topo_request()
394         self.assertEqual(response.status_code, requests.codes.ok)
395         res = response.json()
396         nbNode = len(res['network'][0]['node'])
397         self.assertEqual(nbNode, 9, 'There should be 9 nodes')
398         self.assertNotIn('ietf-network-topology:link', res['network'][0],
399                          'otn-topology should have no link')
400
401 # test service-create for OCH-OTU4 service from spdrA to spdrB
402     def test_25_create_OCH_OTU4_service_AB(self):
403         response = test_utils.service_create_request(self.cr_serv_sample_data)
404         self.assertEqual(response.status_code, requests.codes.ok)
405         res = response.json()
406         self.assertIn('PCE calculation in progress',
407                       res['output']['configuration-response-common']['response-message'])
408         time.sleep(self.WAITING)
409
410     def test_26_get_OCH_OTU4_service_AB(self):
411         response = test_utils.get_service_list_request(
412             "services/service-OCH-OTU4-AB")
413         self.assertEqual(response.status_code, requests.codes.ok)
414         res = response.json()
415         self.assertEqual(
416             res['services'][0]['administrative-state'], 'inService')
417         self.assertEqual(
418             res['services'][0]['service-name'], 'service-OCH-OTU4-AB')
419         self.assertEqual(
420             res['services'][0]['connection-type'], 'infrastructure')
421         self.assertEqual(
422             res['services'][0]['lifecycle-state'], 'planned')
423         time.sleep(2)
424
425 # Check correct configuration of devices
426     def test_27_check_interface_och_spdra(self):
427         response = test_utils.check_netconf_node_request(
428             "SPDR-SA1", "interface/XPDR2-NETWORK1-761:768")
429         self.assertEqual(response.status_code, requests.codes.ok)
430         res = response.json()
431         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
432                                    'administrative-state': 'inService',
433                                    'supporting-circuit-pack-name': 'CP5-CFP',
434                                    'type': 'org-openroadm-interfaces:opticalChannel',
435                                    'supporting-port': 'CP5-CFP-P1'
436                                    }, **res['interface'][0]),
437                              res['interface'][0])
438
439         self.assertDictEqual(
440             {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
441              'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
442             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
443
444     def test_28_check_interface_OTU4_spdra(self):
445         response = test_utils.check_netconf_node_request(
446             "SPDR-SA1", "interface/XPDR2-NETWORK1-OTU")
447         self.assertEqual(response.status_code, requests.codes.ok)
448         res = response.json()
449         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
450                         'administrative-state': 'inService',
451                         'supporting-circuit-pack-name': 'CP5-CFP',
452                         'supporting-interface': 'XPDR2-NETWORK1-761:768',
453                         'type': 'org-openroadm-interfaces:otnOtu',
454                         'supporting-port': 'CP5-CFP-P1'
455                         }
456         input_dict_2 = {'tx-sapi': 'exT821pFtOc=',
457                         'expected-dapi': 'exT821pFtOc=',
458                         'rate': 'org-openroadm-otn-common-types:OTU4',
459                         'fec': 'scfec'
460                         }
461         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
462                              res['interface'][0])
463
464         self.assertDictEqual(input_dict_2,
465                              res['interface'][0]
466                              ['org-openroadm-otn-otu-interfaces:otu'])
467
468     def test_29_check_interface_och_spdrB(self):
469         response = test_utils.check_netconf_node_request(
470             "SPDR-SB1", "interface/XPDR2-NETWORK1-761:768")
471         self.assertEqual(response.status_code, requests.codes.ok)
472         res = response.json()
473         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
474                                    'administrative-state': 'inService',
475                                    'supporting-circuit-pack-name': 'CP5-CFP',
476                                    'type': 'org-openroadm-interfaces:opticalChannel',
477                                    'supporting-port': 'CP5-CFP-P1'
478                                    }, **res['interface'][0]),
479                              res['interface'][0])
480
481         self.assertDictEqual(
482             {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
483              'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
484             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
485
486     def test_30_check_interface_OTU4_spdrB(self):
487         response = test_utils.check_netconf_node_request(
488             "SPDR-SB1", "interface/XPDR2-NETWORK1-OTU")
489         self.assertEqual(response.status_code, requests.codes.ok)
490         res = response.json()
491         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
492                         'administrative-state': 'inService',
493                         'supporting-circuit-pack-name': 'CP5-CFP',
494                         'supporting-interface': 'XPDR2-NETWORK1-761:768',
495                         'type': 'org-openroadm-interfaces:otnOtu',
496                         'supporting-port': 'CP5-CFP-P1'
497                         }
498         input_dict_2 = {'tx-dapi': 'exT821pFtOc=',
499                         'expected-sapi': 'exT821pFtOc=',
500                         'tx-sapi': 'HPQZi9Cb3Aw=',
501                         'expected-dapi': 'HPQZi9Cb3Aw=',
502                         'rate': 'org-openroadm-otn-common-types:OTU4',
503                         'fec': 'scfec'
504                         }
505
506         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
507                              res['interface'][0])
508
509         self.assertDictEqual(input_dict_2,
510                              res['interface'][0]
511                              ['org-openroadm-otn-otu-interfaces:otu'])
512
513     def test_31_check_no_interface_ODU4_spdra(self):
514         response = test_utils.check_netconf_node_request(
515             "SPDR-SA1", "interface/XPDR2-NETWORK1-ODU4")
516         self.assertEqual(response.status_code, requests.codes.conflict)
517         res = response.json()
518         self.assertIn(
519             {"error-type": "application", "error-tag": "data-missing",
520              "error-message": "Request could not be completed because the relevant data model content does not exist"},
521             res['errors']['error'])
522
523     def test_32_check_openroadm_topo_spdra(self):
524         response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR2")
525         self.assertEqual(response.status_code, requests.codes.ok)
526         res = response.json()
527         ele = res['node'][0]['ietf-network-topology:termination-point'][0]
528         self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
529         self.assertEqual({'frequency': 196.1,
530                           'width': 40},
531                          ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
532         self.assertEqual('ROADM-A1-SRG1--SRG1-PP3-TXRX',
533                          ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
534         time.sleep(3)
535
536     def test_33_check_openroadm_topo_ROADMA_SRG(self):
537         response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
538         self.assertEqual(response.status_code, requests.codes.ok)
539         res = response.json()
540         freq_map = base64.b64decode(
541             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
542         freq_map_array = [int(x) for x in freq_map]
543         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
544         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
545         for ele in liste_tp:
546             if ele['tp-id'] == 'SRG1-PP3-TXRX':
547                 freq_map = base64.b64decode(
548                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
549                 freq_map_array = [int(x) for x in freq_map]
550                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
551             if ele['tp-id'] == 'SRG1-PP2-TXRX':
552                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
553         time.sleep(3)
554
555     def test_33_check_openroadm_topo_ROADMA_DEG1(self):
556         response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG1")
557         self.assertEqual(response.status_code, requests.codes.ok)
558         res = response.json()
559         freq_map = base64.b64decode(
560             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
561         freq_map_array = [int(x) for x in freq_map]
562         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
563         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
564         for ele in liste_tp:
565             if ele['tp-id'] == 'DEG1-CTP-TXRX':
566                 freq_map = base64.b64decode(
567                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
568                 freq_map_array = [int(x) for x in freq_map]
569                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
570             if ele['tp-id'] == 'DEG1-TTP-TXRX':
571                 freq_map = base64.b64decode(
572                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
573                 freq_map_array = [int(x) for x in freq_map]
574                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
575         time.sleep(3)
576
577     def test_34_check_otn_topo_otu4_links(self):
578         response = test_utils.get_otn_topo_request()
579         self.assertEqual(response.status_code, requests.codes.ok)
580         res = response.json()
581         nb_links = len(res['network'][0]['ietf-network-topology:link'])
582         self.assertEqual(nb_links, 2)
583         listLinkId = ['OTU4-SPDR-SA1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
584                       'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR2-XPDR2-NETWORK1']
585         for link in res['network'][0]['ietf-network-topology:link']:
586             self.assertIn(link['link-id'], listLinkId)
587             self.assertEqual(
588                 link['transportpce-topology:otn-link-type'], 'OTU4')
589             self.assertEqual(
590                 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
591             self.assertEqual(
592                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
593             self.assertEqual(
594                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
595             self.assertIn(
596                 link['org-openroadm-common-network:opposite-link'], listLinkId)
597
598 # test service-create for OCH-OTU4 service from spdrB to spdrC
599     def test_35_create_OCH_OTU4_service_BC(self):
600         # pylint: disable=line-too-long
601         self.cr_serv_sample_data["input"]["service-name"] = "service-OCH-OTU4-BC"
602         self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "SPDR-SB1"
603         self.cr_serv_sample_data["input"]["service-a-end"]["clli"] = "NodeSB"
604         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SB1-XPDR2"
605         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK2"
606         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SB1-XPDR2"
607         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK2"
608         self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SC1"
609         self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSC"
610         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
611         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
612         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
613         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
614
615         response = test_utils.service_create_request(self.cr_serv_sample_data)
616         self.assertEqual(response.status_code, requests.codes.ok)
617         res = response.json()
618         self.assertIn('PCE calculation in progress',
619                       res['output']['configuration-response-common']['response-message'])
620         time.sleep(self.WAITING)
621
622     def test_36_get_OCH_OTU4_service_BC(self):
623         response = test_utils.get_service_list_request(
624             "services/service-OCH-OTU4-BC")
625         self.assertEqual(response.status_code, requests.codes.ok)
626         res = response.json()
627         self.assertEqual(
628             res['services'][0]['administrative-state'], 'inService')
629         self.assertEqual(
630             res['services'][0]['service-name'], 'service-OCH-OTU4-BC')
631         self.assertEqual(
632             res['services'][0]['connection-type'], 'infrastructure')
633         self.assertEqual(
634             res['services'][0]['lifecycle-state'], 'planned')
635         time.sleep(2)
636
637 # Check correct configuration of devices
638     def test_37_check_interface_och_spdrB(self):
639         response = test_utils.check_netconf_node_request(
640             "SPDR-SB1", "interface/XPDR2-NETWORK2-753:760")
641         self.assertEqual(response.status_code, requests.codes.ok)
642         res = response.json()
643         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-753:760',
644                                    'administrative-state': 'inService',
645                                    'supporting-circuit-pack-name': 'CP6-CFP',
646                                    'type': 'org-openroadm-interfaces:opticalChannel',
647                                    'supporting-port': 'CP1-CFP0-P1'
648                                    }, **res['interface'][0]),
649                              res['interface'][0])
650
651         self.assertDictEqual(
652             {'frequency': 196.05, 'rate': 'org-openroadm-common-types:R100G',
653              'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
654             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
655
656     def test_38_check_interface_OTU4_spdrB(self):
657         response = test_utils.check_netconf_node_request(
658             "SPDR-SB1", "interface/XPDR2-NETWORK2-OTU")
659         self.assertEqual(response.status_code, requests.codes.ok)
660         res = response.json()
661         input_dict_1 = {'name': 'XPDR2-NETWORK2-OTU',
662                         'administrative-state': 'inService',
663                         'supporting-circuit-pack-name': 'CP6-CFP',
664                         'supporting-interface': 'XPDR2-NETWORK1-753:760',
665                         'type': 'org-openroadm-interfaces:otnOtu',
666                         'supporting-port': 'CP6-CFP-P1'
667                         }
668         input_dict_2 = {'tx-sapi': 'HPQZi9Cb3A8=',
669                         'expected-dapi': 'HPQZi9Cb3A8=',
670                         'rate': 'org-openroadm-otn-common-types:OTU4',
671                         'fec': 'scfec'
672                         }
673         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
674                              res['interface'][0])
675
676         self.assertDictEqual(input_dict_2,
677                              res['interface'][0]
678                              ['org-openroadm-otn-otu-interfaces:otu'])
679
680     def test_39_check_interface_och_spdrC(self):
681         response = test_utils.check_netconf_node_request(
682             "SPDR-SC1", "interface/XPDR2-NETWORK1-753:760")
683         self.assertEqual(response.status_code, requests.codes.ok)
684         res = response.json()
685         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
686                                    'administrative-state': 'inService',
687                                    'supporting-circuit-pack-name': 'CP5-CFP',
688                                    'type': 'org-openroadm-interfaces:opticalChannel',
689                                    'supporting-port': 'CP5-CFP-P1'
690                                    }, **res['interface'][0]),
691                              res['interface'][0])
692
693         self.assertDictEqual(
694             {'frequency': 196.05, 'rate': 'org-openroadm-common-types:R100G',
695              'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
696             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
697
698     def test_40_check_interface_OTU4_spdrC(self):
699         response = test_utils.check_netconf_node_request(
700             "SPDR-SC1", "interface/XPDR2-NETWORK1-OTU")
701         self.assertEqual(response.status_code, requests.codes.ok)
702         res = response.json()
703         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
704                         'administrative-state': 'inService',
705                         'supporting-circuit-pack-name': 'CP5-CFP',
706                         'supporting-interface': 'XPDR2-NETWORK1-753:760',
707                         'type': 'org-openroadm-interfaces:otnOtu',
708                         'supporting-port': 'CP5-CFP-P1'
709                         }
710         input_dict_2 = {'tx-dapi': 'HPQZi9Cb3A8=',
711                         'expected-sapi': 'HPQZi9Cb3A8=',
712                         'tx-sapi': 'ALx70DYYfGTx',
713                         'expected-dapi': 'ALx70DYYfGTx',
714                         'rate': 'org-openroadm-otn-common-types:OTU4',
715                         'fec': 'scfec'
716                         }
717
718         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
719                              res['interface'][0])
720
721         self.assertDictEqual(input_dict_2,
722                              res['interface'][0]
723                              ['org-openroadm-otn-otu-interfaces:otu'])
724
725     def test_41_check_no_interface_ODU4_spdrB(self):
726         response = test_utils.check_netconf_node_request(
727             "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4")
728         self.assertEqual(response.status_code, requests.codes.conflict)
729         res = response.json()
730         self.assertIn(
731             {"error-type": "application", "error-tag": "data-missing",
732              "error-message": "Request could not be completed because the relevant data model content does not exist"},
733             res['errors']['error'])
734
735     def test_42_check_openroadm_topo_spdrB(self):
736         response = test_utils.get_ordm_topo_request("node/SPDR-SB1-XPDR2")
737         self.assertEqual(response.status_code, requests.codes.ok)
738         res = response.json()
739         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
740         # pylint: disable=consider-using-f-string
741         for ele in liste_tp:
742             if ele['tp-id'] == 'XPDR2-NETWORK1':
743                 self.assertEqual({'frequency': 196.1,
744                                   'width': 40},
745                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
746                 self.assertEqual('ROADM-B1-SRG1--SRG1-PP1-TXRX',
747                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
748             elif ele['tp-id'] == 'XPDR2-NETWORK2':
749                 self.assertEqual({'frequency': 196.05,
750                                   'width': 40},
751                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
752                 self.assertEqual('ROADM-B1-SRG1--SRG1-PP2-TXRX',
753                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
754             else:
755                 print("ele = {}".format(ele))
756                 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
757         time.sleep(3)
758
759     def test_43_check_openroadm_topo_ROADMB_SRG1(self):
760         response = test_utils.get_ordm_topo_request("node/ROADM-B1-SRG1")
761         self.assertEqual(response.status_code, requests.codes.ok)
762         res = response.json()
763         freq_map = base64.b64decode(
764             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
765         freq_map_array = [int(x) for x in freq_map]
766         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
767         self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
768         self.assertEqual(freq_map_array[93], 255, "Lambda 3 should be available")
769         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
770         for ele in liste_tp:
771             if ele['tp-id'] == 'SRG1-PP1-TXRX':
772                 freq_map = base64.b64decode(
773                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
774                 freq_map_array = [int(x) for x in freq_map]
775                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
776             if ele['tp-id'] == 'SRG1-PP2-TXRX':
777                 freq_map = base64.b64decode(
778                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
779                 freq_map_array = [int(x) for x in freq_map]
780                 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
781             if ele['tp-id'] == 'SRG1-PP3-TXRX':
782                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
783         time.sleep(3)
784
785     def test_44_check_openroadm_topo_ROADMB_DEG2(self):
786         response = test_utils.get_ordm_topo_request("node/ROADM-B1-DEG2")
787         self.assertEqual(response.status_code, requests.codes.ok)
788         res = response.json()
789         freq_map = base64.b64decode(
790             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
791         freq_map_array = [int(x) for x in freq_map]
792         self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
793         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
794         for ele in liste_tp:
795             if ele['tp-id'] == 'DEG2-CTP-TXRX':
796                 freq_map = base64.b64decode(
797                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
798                 freq_map_array = [int(x) for x in freq_map]
799                 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
800             if ele['tp-id'] == 'DEG2-TTP-TXRX':
801                 freq_map = base64.b64decode(
802                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
803                 freq_map_array = [int(x) for x in freq_map]
804                 self.assertEqual(freq_map_array[94], 0, "Lambda 1 should not be available")
805         time.sleep(3)
806
807     def test_45_check_otn_topo_otu4_links(self):
808         response = test_utils.get_otn_topo_request()
809         self.assertEqual(response.status_code, requests.codes.ok)
810         res = response.json()
811         nb_links = len(res['network'][0]['ietf-network-topology:link'])
812         self.assertEqual(nb_links, 4)
813         listLinkId = ['OTU4-SPDR-SA1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
814                       'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR2-XPDR2-NETWORK1',
815                       'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK2toSPDR-SC1-XPDR2-XPDR2-NETWORK1',
816                       'OTU4-SPDR-SC1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK2']
817         for link in res['network'][0]['ietf-network-topology:link']:
818             self.assertIn(link['link-id'], listLinkId)
819             self.assertEqual(
820                 link['transportpce-topology:otn-link-type'], 'OTU4')
821             self.assertEqual(
822                 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
823             self.assertEqual(
824                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
825             self.assertEqual(
826                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
827             self.assertIn(
828                 link['org-openroadm-common-network:opposite-link'], listLinkId)
829
830 # test service-create for 100GE service from spdrA to spdrC via spdrB
831     def test_46_create_100GE_service_ABC(self):
832         # pylint: disable=line-too-long
833         self.cr_serv_sample_data["input"]["service-name"] = "service-100GE-ABC"
834         self.cr_serv_sample_data["input"]["connection-type"] = "service"
835         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
836         self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "SPDR-SA1"
837         self.cr_serv_sample_data["input"]["service-a-end"]["clli"] = "NodeSA"
838         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
839         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
840         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
841         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
842         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
843         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
844         self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SC1"
845         self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSC"
846         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
847         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
848         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
849         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
850         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
851
852         response = test_utils.service_create_request(self.cr_serv_sample_data)
853         self.assertEqual(response.status_code, requests.codes.ok)
854         res = response.json()
855         self.assertIn('PCE calculation in progress',
856                       res['output']['configuration-response-common']['response-message'])
857         time.sleep(self.WAITING)
858
859     def test_47_get_100GE_service_ABC(self):
860         response = test_utils.get_service_list_request(
861             "services/service-100GE-ABC")
862         self.assertEqual(response.status_code, requests.codes.ok)
863         res = response.json()
864         self.assertEqual(
865             res['services'][0]['administrative-state'], 'inService')
866         self.assertEqual(
867             res['services'][0]['service-name'], 'service-100GE-ABC')
868         self.assertEqual(
869             res['services'][0]['connection-type'], 'service')
870         self.assertEqual(
871             res['services'][0]['lifecycle-state'], 'planned')
872         time.sleep(2)
873
874     def test_48_check_interface_100GE_CLIENT_spdra(self):
875         response = test_utils.check_netconf_node_request(
876             "SPDR-SA1", "interface/XPDR2-CLIENT1-ETHERNET")
877         self.assertEqual(response.status_code, requests.codes.ok)
878         res = response.json()
879         input_dict = {'name': 'XPDR2-CLIENT1-ETHERNET',
880                       'administrative-state': 'inService',
881                       'supporting-circuit-pack-name': 'CP2-QSFP1',
882                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
883                       'supporting-port': 'CP2-QSFP1-P1'
884                       }
885         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
886                              res['interface'][0])
887         self.assertDictEqual(
888             {'speed': 100000,
889              'fec': 'off'},
890             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
891
892     def test_49_check_interface_ODU4_CLIENT_spdra(self):
893         response = test_utils.check_netconf_node_request(
894             "SPDR-SA1", "interface/XPDR2-CLIENT1-ODU4")
895         self.assertEqual(response.status_code, requests.codes.ok)
896         res = response.json()
897         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
898                         'administrative-state': 'inService',
899                         'supporting-circuit-pack-name': 'CP2-QSFP1',
900                         'supporting-interface': 'XPDR2-CLIENT1-ETHERNET',
901                         'type': 'org-openroadm-interfaces:otnOdu',
902                         'supporting-port': 'CP2-QSFP1-P1'}
903         input_dict_2 = {
904             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
905             'rate': 'org-openroadm-otn-common-types:ODU4',
906             'monitoring-mode': 'terminated'}
907
908         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
909                              res['interface'][0])
910         self.assertDictEqual(dict(input_dict_2,
911                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
912                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
913         self.assertDictEqual(
914             {'payload-type': '07', 'exp-payload-type': '07'},
915             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
916
917     def test_50_check_interface_ODU4_NETWORK_spdra(self):
918         response = test_utils.check_netconf_node_request(
919             "SPDR-SA1", "interface/XPDR2-NETWORK1-ODU4")
920         self.assertEqual(response.status_code, requests.codes.ok)
921         res = response.json()
922         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
923                         'administrative-state': 'inService',
924                         'supporting-circuit-pack-name': 'CP5-CFP',
925                         'type': 'org-openroadm-interfaces:otnOdu',
926                         'supporting-port': 'CP5-CFP-P1'}
927         input_dict_2 = {
928             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
929             'rate': 'org-openroadm-otn-common-types:ODU4',
930             'monitoring-mode': 'monitored'}
931
932         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
933                              res['interface'][0])
934         self.assertDictEqual(dict(input_dict_2,
935                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
936                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
937         self.assertDictEqual(
938             {'payload-type': '07', 'exp-payload-type': '07'},
939             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
940
941     def test_51_check_ODU4_connection_spdra(self):
942         response = test_utils.check_netconf_node_request(
943             "SPDR-SA1",
944             "odu-connection/XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4")
945         self.assertEqual(response.status_code, requests.codes.ok)
946         res = response.json()
947         input_dict_1 = {
948             'connection-name':
949             'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
950             'direction': 'bidirectional'
951         }
952
953         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
954                              res['odu-connection'][0])
955         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4'},
956                              res['odu-connection'][0]['destination'])
957         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4'},
958                              res['odu-connection'][0]['source'])
959
960     def test_52_check_interface_100GE_CLIENT_spdrc(self):
961         response = test_utils.check_netconf_node_request(
962             "SPDR-SC1", "interface/XPDR2-CLIENT1-ETHERNET")
963         self.assertEqual(response.status_code, requests.codes.ok)
964         res = response.json()
965         input_dict = {'name': 'XPDR2-CLIENT1-ETHERNET',
966                       'administrative-state': 'inService',
967                       'supporting-circuit-pack-name': 'CP2-QSFP1',
968                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
969                       'supporting-port': 'CP2-QSFP1-P1'
970                       }
971         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
972                              res['interface'][0])
973         self.assertDictEqual(
974             {'speed': 100000,
975              'fec': 'off'},
976             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
977
978     def test_53_check_interface_ODU4_CLIENT_spdrc(self):
979         response = test_utils.check_netconf_node_request(
980             "SPDR-SC1", "interface/XPDR2-CLIENT1-ODU4")
981         self.assertEqual(response.status_code, requests.codes.ok)
982         res = response.json()
983         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
984                         'administrative-state': 'inService',
985                         'supporting-circuit-pack-name': 'CP2-QSFP1',
986                         'supporting-interface': 'XPDR2-CLIENT1-ETHERNET',
987                         'type': 'org-openroadm-interfaces:otnOdu',
988                         'supporting-port': 'CP2-QSFP1-P1'}
989         input_dict_2 = {
990             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
991             'rate': 'org-openroadm-otn-common-types:ODU4',
992             'monitoring-mode': 'terminated'}
993
994         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
995                              res['interface'][0])
996         self.assertDictEqual(dict(input_dict_2,
997                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
998                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
999         self.assertDictEqual(
1000             {'payload-type': '07', 'exp-payload-type': '07'},
1001             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1002
1003     def test_54_check_interface_ODU4_NETWORK_spdrc(self):
1004         response = test_utils.check_netconf_node_request(
1005             "SPDR-SC1", "interface/XPDR2-NETWORK1-ODU4")
1006         self.assertEqual(response.status_code, requests.codes.ok)
1007         res = response.json()
1008         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
1009                         'administrative-state': 'inService',
1010                         'supporting-circuit-pack-name': 'CP5-CFP',
1011                         'type': 'org-openroadm-interfaces:otnOdu',
1012                         'supporting-port': 'CP5-CFP-P1'}
1013         input_dict_2 = {
1014             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1015             'rate': 'org-openroadm-otn-common-types:ODU4',
1016             'monitoring-mode': 'monitored'}
1017
1018         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1019                              res['interface'][0])
1020         self.assertDictEqual(dict(input_dict_2,
1021                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1022                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1023         self.assertDictEqual(
1024             {'payload-type': '07', 'exp-payload-type': '07'},
1025             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1026
1027     def test_55_check_ODU4_connection_spdrc(self):
1028         response = test_utils.check_netconf_node_request(
1029             "SPDR-SC1",
1030             "odu-connection/XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4")
1031         self.assertEqual(response.status_code, requests.codes.ok)
1032         res = response.json()
1033         input_dict_1 = {
1034             'connection-name':
1035             'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
1036             'direction': 'bidirectional'
1037         }
1038
1039         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1040                              res['odu-connection'][0])
1041         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4'},
1042                              res['odu-connection'][0]['destination'])
1043         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4'},
1044                              res['odu-connection'][0]['source'])
1045
1046     def test_56_check_interface_ODU4_NETWORK1_spdrb(self):
1047         response = test_utils.check_netconf_node_request(
1048             "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4")
1049         self.assertEqual(response.status_code, requests.codes.ok)
1050         res = response.json()
1051         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
1052                         'administrative-state': 'inService',
1053                         'supporting-circuit-pack-name': 'CP5-CFP',
1054                         'type': 'org-openroadm-interfaces:otnOdu',
1055                         'supporting-port': 'CP5-CFP-P1'}
1056         input_dict_2 = {
1057             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1058             'rate': 'org-openroadm-otn-common-types:ODU4',
1059             'monitoring-mode': 'monitored'}
1060
1061         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1062                              res['interface'][0])
1063         self.assertDictEqual(dict(input_dict_2,
1064                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1065                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1066         self.assertDictEqual(
1067             {'payload-type': '07', 'exp-payload-type': '07'},
1068             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1069
1070     def test_57_check_interface_ODU4_NETWORK2_spdrb(self):
1071         response = test_utils.check_netconf_node_request(
1072             "SPDR-SB1", "interface/XPDR2-NETWORK2-ODU4")
1073         self.assertEqual(response.status_code, requests.codes.ok)
1074         res = response.json()
1075         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
1076                         'administrative-state': 'inService',
1077                         'supporting-circuit-pack-name': 'CP6-CFP',
1078                         'type': 'org-openroadm-interfaces:otnOdu',
1079                         'supporting-port': 'CP6-CFP-P1'}
1080         input_dict_2 = {
1081             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1082             'rate': 'org-openroadm-otn-common-types:ODU4',
1083             'monitoring-mode': 'monitored'}
1084
1085         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1086                              res['interface'][0])
1087         self.assertDictEqual(dict(input_dict_2,
1088                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1089                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1090         self.assertDictEqual(
1091             {'payload-type': '07', 'exp-payload-type': '07'},
1092             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1093
1094     def test_58_check_ODU4_connection_spdrb(self):
1095         response = test_utils.check_netconf_node_request(
1096             "SPDR-SB1",
1097             "odu-connection/XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4")
1098         self.assertEqual(response.status_code, requests.codes.ok)
1099         res = response.json()
1100         input_dict_1 = {
1101             'connection-name':
1102             'XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4',
1103             'direction': 'bidirectional'
1104         }
1105
1106         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1107                              res['odu-connection'][0])
1108         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK2-ODU4'},
1109                              res['odu-connection'][0]['destination'])
1110         self.assertDictEqual({'src-if': 'XPDR2-NETWORK1-ODU4'},
1111                              res['odu-connection'][0]['source'])
1112
1113     def test_59_check_otn_topo_links(self):
1114         response = test_utils.get_otn_topo_request()
1115         self.assertEqual(response.status_code, requests.codes.ok)
1116         res = response.json()
1117         nb_links = len(res['network'][0]['ietf-network-topology:link'])
1118         self.assertEqual(nb_links, 4)
1119         for link in res['network'][0]['ietf-network-topology:link']:
1120             self.assertEqual(
1121                 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
1122             self.assertEqual(
1123                 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
1124
1125     def test_60_delete_service_100GE_ABC(self):
1126         response = test_utils.service_delete_request("service-100GE-ABC")
1127         self.assertEqual(response.status_code, requests.codes.ok)
1128         res = response.json()
1129         self.assertIn('Renderer service delete in progress',
1130                       res['output']['configuration-response-common']['response-message'])
1131         time.sleep(self.WAITING)
1132
1133     def test_61_check_service_list(self):
1134         response = test_utils.get_service_list_request("")
1135         self.assertEqual(response.status_code, requests.codes.ok)
1136         res = response.json()
1137         self.assertEqual(len(res['service-list']['services']), 2)
1138         time.sleep(2)
1139
1140     def test_62_check_no_ODU4_connection_spdra(self):
1141         response = test_utils.check_netconf_node_request("SPDR-SA1", "")
1142         self.assertEqual(response.status_code, requests.codes.ok)
1143         res = response.json()
1144         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
1145         time.sleep(1)
1146
1147     def test_63_check_no_interface_ODU4_NETWORK_spdra(self):
1148         response = test_utils.check_netconf_node_request(
1149             "SPDR-SA1", "interface/XPDR2-NETWORK1-ODU4")
1150         self.assertEqual(response.status_code, requests.codes.conflict)
1151
1152     def test_64_check_no_interface_ODU4_CLIENT_spdra(self):
1153         response = test_utils.check_netconf_node_request(
1154             "SPDR-SA1", "interface/XPDR2-CLIENT1-ODU4")
1155         self.assertEqual(response.status_code, requests.codes.conflict)
1156
1157     def test_65_check_no_interface_100GE_CLIENT_spdra(self):
1158         response = test_utils.check_netconf_node_request(
1159             "SPDR-SA1", "interface/XPDR2-CLIENT1-ETHERNET")
1160         self.assertEqual(response.status_code, requests.codes.conflict)
1161
1162     def test_66_check_otn_topo_links(self):
1163         self.test_45_check_otn_topo_otu4_links()
1164
1165     def test_67_delete_OCH_OTU4_service_AB(self):
1166         response = test_utils.service_delete_request("service-OCH-OTU4-AB")
1167         self.assertEqual(response.status_code, requests.codes.ok)
1168         res = response.json()
1169         self.assertIn('Renderer service delete in progress',
1170                       res['output']['configuration-response-common']['response-message'])
1171         time.sleep(self.WAITING)
1172
1173     def test_68_delete_OCH_OTU4_service_BC(self):
1174         response = test_utils.service_delete_request("service-OCH-OTU4-BC")
1175         self.assertEqual(response.status_code, requests.codes.ok)
1176         res = response.json()
1177         self.assertIn('Renderer service delete in progress',
1178                       res['output']['configuration-response-common']['response-message'])
1179         time.sleep(self.WAITING)
1180
1181     def test_69_get_no_service(self):
1182         response = test_utils.get_service_list_request("")
1183         self.assertEqual(response.status_code, requests.codes.conflict)
1184         res = response.json()
1185         self.assertIn(
1186             {"error-type": "application", "error-tag": "data-missing",
1187              "error-message": "Request could not be completed because the relevant data model content does not exist"},
1188             res['errors']['error'])
1189         time.sleep(1)
1190
1191     def test_70_check_no_interface_OTU4_spdra(self):
1192         response = test_utils.check_netconf_node_request(
1193             "SPDR-SA1", "interface/XPDR2-NETWORK1-OTU")
1194         self.assertEqual(response.status_code, requests.codes.conflict)
1195
1196     def test_71_check_no_interface_OCH_spdra(self):
1197         response = test_utils.check_netconf_node_request(
1198             "SPDR-SA1", "interface/XPDR2-NETWORK1-761:768")
1199         self.assertEqual(response.status_code, requests.codes.conflict)
1200
1201     def test_72_getLinks_OtnTopology(self):
1202         response = test_utils.get_otn_topo_request()
1203         self.assertEqual(response.status_code, requests.codes.ok)
1204         res = response.json()
1205         self.assertNotIn('ietf-network-topology:link', res['network'][0])
1206
1207     def test_73_check_openroadm_topo_spdra(self):
1208         response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR2")
1209         self.assertEqual(response.status_code, requests.codes.ok)
1210         res = response.json()
1211         tp = res['node'][0]['ietf-network-topology:termination-point'][0]
1212         self.assertEqual('XPDR2-NETWORK1', tp['tp-id'])
1213         self.assertNotIn('wavelength', dict.keys(
1214             tp['org-openroadm-network-topology:xpdr-network-attributes']))
1215         time.sleep(3)
1216
1217     def test_74_check_openroadm_topo_ROADMB_SRG1(self):
1218         response = test_utils.get_ordm_topo_request("node/ROADM-B1-SRG1")
1219         self.assertEqual(response.status_code, requests.codes.ok)
1220         res = response.json()
1221         freq_map = base64.b64decode(
1222             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
1223         freq_map_array = [int(x) for x in freq_map]
1224         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1225         self.assertEqual(freq_map_array[94], 255, "Lambda 1 should be available")
1226         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1227         for ele in liste_tp:
1228             if ele['tp-id'] == 'SRG1-PP1-TXRX':
1229                 freq_map = base64.b64decode(
1230                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1231                 freq_map_array = [int(x) for x in freq_map]
1232                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1233             if ele['tp-id'] == 'SRG1-PP2-TXRX':
1234                 freq_map = base64.b64decode(
1235                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1236                 freq_map_array = [int(x) for x in freq_map]
1237                 self.assertEqual(freq_map_array[94], 255, "Lambda 1 should be available")
1238         time.sleep(3)
1239
1240     def test_75_check_openroadm_topo_ROADMB_DEG1(self):
1241         response = test_utils.get_ordm_topo_request("node/ROADM-B1-DEG1")
1242         self.assertEqual(response.status_code, requests.codes.ok)
1243         res = response.json()
1244         freq_map = base64.b64decode(
1245             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1246         freq_map_array = [int(x) for x in freq_map]
1247         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1248         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1249         for ele in liste_tp:
1250             if ele['tp-id'] == 'DEG1-CTP-TXRX':
1251                 freq_map = base64.b64decode(
1252                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1253                 freq_map_array = [int(x) for x in freq_map]
1254                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1255             if ele['tp-id'] == 'DEG1-TTP-TXRX':
1256                 freq_map = base64.b64decode(
1257                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1258                 freq_map_array = [int(x) for x in freq_map]
1259                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1260         time.sleep(3)
1261
1262     def test_76_check_openroadm_topo_ROADMB_DEG2(self):
1263         response = test_utils.get_ordm_topo_request("node/ROADM-B1-DEG2")
1264         self.assertEqual(response.status_code, requests.codes.ok)
1265         res = response.json()
1266         freq_map = base64.b64decode(
1267             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1268         freq_map_array = [int(x) for x in freq_map]
1269         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1270         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1271         for ele in liste_tp:
1272             if ele['tp-id'] == 'DEG2-CTP-TXRX':
1273                 freq_map = base64.b64decode(
1274                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1275                 freq_map_array = [int(x) for x in freq_map]
1276                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1277             if ele['tp-id'] == 'DEG2-TTP-TXRX':
1278                 freq_map = base64.b64decode(
1279                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1280                 freq_map_array = [int(x) for x in freq_map]
1281                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1282         time.sleep(3)
1283
1284     def test_77_disconnect_xponders_from_roadm(self):
1285         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1286         response = test_utils.get_ordm_topo_request("")
1287         self.assertEqual(response.status_code, requests.codes.ok)
1288         res = response.json()
1289         links = res['network'][0]['ietf-network-topology:link']
1290         for link in links:
1291             if (link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT')
1292                     and ('SPDR-SB1' in link['link-id'] or 'ROADM-B1' in link['link-id'])):
1293                 link_name = link["link-id"]
1294                 response = test_utils.delete_request(url+link_name)
1295                 self.assertEqual(response.status_code, requests.codes.ok)
1296
1297     def test_78_disconnect_spdrB(self):
1298         response = test_utils.unmount_device("SPDR-SB1")
1299         self.assertEqual(response.status_code, requests.codes.ok,
1300                          test_utils.CODE_SHOULD_BE_200)
1301
1302     def test_79_disconnect_roadmB(self):
1303         response = test_utils.unmount_device("ROADM-B1")
1304         self.assertEqual(response.status_code, requests.codes.ok,
1305                          test_utils.CODE_SHOULD_BE_200)
1306
1307     def test_80_remove_roadm_to_roadm_links(self):
1308         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1309         response = test_utils.get_ordm_topo_request("")
1310         self.assertEqual(response.status_code, requests.codes.ok)
1311         res = response.json()
1312         links = res['network'][0]['ietf-network-topology:link']
1313         for link in links:
1314             if (link["org-openroadm-common-network:link-type"] == "ROADM-TO-ROADM"
1315                     and 'ROADM-B1' in link['link-id']):
1316                 link_name = link["link-id"]
1317                 response = test_utils.delete_request(url+link_name)
1318                 self.assertEqual(response.status_code, requests.codes.ok)
1319
1320     def test_81_add_omsAttributes_ROADMA_ROADMC(self):
1321         # Config ROADMA-ROADMC oms-attributes
1322         data = {"span": {
1323             "auto-spanloss": "true",
1324             "spanloss-base": 11.4,
1325             "spanloss-current": 12,
1326             "engineered-spanloss": 12.2,
1327             "link-concatenation": [{
1328                 "SRLG-Id": 0,
1329                 "fiber-type": "smf",
1330                 "SRLG-length": 100000,
1331                 "pmd": 0.5}]}}
1332         response = test_utils.add_oms_attr_request(
1333             "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
1334         self.assertEqual(response.status_code, requests.codes.created)
1335
1336     def test_82_add_omsAttributes_ROADMC_ROADMA(self):
1337         # Config ROADMC-ROADMA oms-attributes
1338         data = {"span": {
1339             "auto-spanloss": "true",
1340             "spanloss-base": 11.4,
1341             "spanloss-current": 12,
1342             "engineered-spanloss": 12.2,
1343             "link-concatenation": [{
1344                 "SRLG-Id": 0,
1345                 "fiber-type": "smf",
1346                 "SRLG-length": 100000,
1347                 "pmd": 0.5}]}}
1348         response = test_utils.add_oms_attr_request(
1349             "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
1350         self.assertEqual(response.status_code, requests.codes.created)
1351
1352     def test_83_create_OCH_OTU4_service_AC(self):
1353         # pylint: disable=line-too-long
1354         self.cr_serv_sample_data["input"]["service-name"] = "service-OCH-OTU4-AC"
1355         self.cr_serv_sample_data["input"]["connection-type"] = "infrastructure"
1356         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
1357         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OTU"
1358         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1359         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
1360         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1361         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
1362         self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1363         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
1364         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OTU"
1365         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1366         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
1367         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1368         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
1369         self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1370         response = test_utils.service_create_request(self.cr_serv_sample_data)
1371         self.assertEqual(response.status_code, requests.codes.ok)
1372         res = response.json()
1373         self.assertIn('PCE calculation in progress',
1374                       res['output']['configuration-response-common']['response-message'])
1375         time.sleep(self.WAITING)
1376
1377     def test_84_get_OCH_OTU4_service_AC(self):
1378         response = test_utils.get_service_list_request(
1379             "services/service-OCH-OTU4-AC")
1380         self.assertEqual(response.status_code, requests.codes.ok)
1381         res = response.json()
1382         self.assertEqual(
1383             res['services'][0]['administrative-state'], 'inService')
1384         self.assertEqual(
1385             res['services'][0]['service-name'], 'service-OCH-OTU4-AC')
1386         self.assertEqual(
1387             res['services'][0]['connection-type'], 'infrastructure')
1388         self.assertEqual(
1389             res['services'][0]['lifecycle-state'], 'planned')
1390         time.sleep(2)
1391
1392 # test service-create for 100GE service from spdrA to spdrC via spdrB
1393     def test_85_create_100GE_service_AC(self):
1394         # pylint: disable=line-too-long
1395         self.cr_serv_sample_data["input"]["service-name"] = "service-100GE-AC"
1396         self.cr_serv_sample_data["input"]["connection-type"] = "service"
1397         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
1398         self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "SPDR-SA1"
1399         self.cr_serv_sample_data["input"]["service-a-end"]["clli"] = "NodeSA"
1400         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
1401         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1402         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
1403         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1404         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
1405         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
1406         self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SC1"
1407         self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSC"
1408         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
1409         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1410         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
1411         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1412         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
1413
1414         response = test_utils.service_create_request(self.cr_serv_sample_data)
1415         self.assertEqual(response.status_code, requests.codes.ok)
1416         res = response.json()
1417         self.assertIn('PCE calculation in progress',
1418                       res['output']['configuration-response-common']['response-message'])
1419         time.sleep(self.WAITING)
1420
1421     def test_86_get_100GE_service_AC(self):
1422         response = test_utils.get_service_list_request("services/service-100GE-AC")
1423         self.assertEqual(response.status_code, requests.codes.ok)
1424         res = response.json()
1425         self.assertEqual(
1426             res['services'][0]['administrative-state'], 'inService')
1427         self.assertEqual(
1428             res['services'][0]['service-name'], 'service-100GE-AC')
1429         self.assertEqual(
1430             res['services'][0]['connection-type'], 'service')
1431         self.assertEqual(
1432             res['services'][0]['lifecycle-state'], 'planned')
1433         time.sleep(2)
1434
1435     def test_87_check_configuration_spdra(self):
1436         self.test_48_check_interface_100GE_CLIENT_spdra()
1437         self.test_49_check_interface_ODU4_CLIENT_spdra()
1438         self.test_50_check_interface_ODU4_NETWORK_spdra()
1439         self.test_51_check_ODU4_connection_spdra()
1440
1441     def test_88_check_configuration_spdrc(self):
1442         self.test_52_check_interface_100GE_CLIENT_spdrc()
1443         self.test_53_check_interface_ODU4_CLIENT_spdrc()
1444         self.test_54_check_interface_ODU4_NETWORK_spdrc()
1445         self.test_55_check_ODU4_connection_spdrc()
1446
1447     def test_89_check_otn_topo_links(self):
1448         response = test_utils.get_otn_topo_request()
1449         self.assertEqual(response.status_code, requests.codes.ok)
1450         res = response.json()
1451         nb_links = len(res['network'][0]['ietf-network-topology:link'])
1452         self.assertEqual(nb_links, 2)
1453         for link in res['network'][0]['ietf-network-topology:link']:
1454             self.assertEqual(
1455                 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
1456             self.assertEqual(
1457                 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
1458
1459     def test_90_delete_100GE_service_AC(self):
1460         response = test_utils.service_delete_request("service-100GE-AC")
1461         self.assertEqual(response.status_code, requests.codes.ok)
1462         res = response.json()
1463         self.assertIn('Renderer service delete in progress',
1464                       res['output']['configuration-response-common']['response-message'])
1465         time.sleep(self.WAITING)
1466
1467     def test_91_check_service_list(self):
1468         response = test_utils.get_service_list_request("")
1469         self.assertEqual(response.status_code, requests.codes.ok)
1470         res = response.json()
1471         self.assertEqual(len(res['service-list']['services']), 1)
1472         time.sleep(2)
1473
1474     def test_92_check_configuration_spdra(self):
1475         self.test_62_check_no_ODU4_connection_spdra()
1476         self.test_63_check_no_interface_ODU4_NETWORK_spdra()
1477         self.test_64_check_no_interface_ODU4_CLIENT_spdra()
1478         self.test_65_check_no_interface_100GE_CLIENT_spdra()
1479
1480     def test_93_check_otn_topo_links(self):
1481         response = test_utils.get_otn_topo_request()
1482         self.assertEqual(response.status_code, requests.codes.ok)
1483         res = response.json()
1484         nb_links = len(res['network'][0]['ietf-network-topology:link'])
1485         self.assertEqual(nb_links, 2)
1486         for link in res['network'][0]['ietf-network-topology:link']:
1487             self.assertEqual(
1488                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
1489             self.assertEqual(
1490                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
1491
1492     def test_94_disconnect_xponders_from_roadm(self):
1493         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1494         response = test_utils.get_ordm_topo_request("")
1495         self.assertEqual(response.status_code, requests.codes.ok)
1496         res = response.json()
1497         links = res['network'][0]['ietf-network-topology:link']
1498         for link in links:
1499             if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
1500                 link_name = link["link-id"]
1501                 response = test_utils.delete_request(url+link_name)
1502                 self.assertEqual(response.status_code, requests.codes.ok)
1503
1504     def test_95_disconnect_spdrA(self):
1505         response = test_utils.unmount_device("SPDR-SA1")
1506         self.assertEqual(response.status_code, requests.codes.ok,
1507                          test_utils.CODE_SHOULD_BE_200)
1508
1509     def test_96_disconnect_spdrC(self):
1510         response = test_utils.unmount_device("SPDR-SC1")
1511         self.assertEqual(response.status_code, requests.codes.ok,
1512                          test_utils.CODE_SHOULD_BE_200)
1513
1514     def test_97_disconnect_roadmA(self):
1515         response = test_utils.unmount_device("ROADM-A1")
1516         self.assertEqual(response.status_code, requests.codes.ok,
1517                          test_utils.CODE_SHOULD_BE_200)
1518
1519     def test_98_disconnect_roadmC(self):
1520         response = test_utils.unmount_device("ROADM-C1")
1521         self.assertEqual(response.status_code, requests.codes.ok,
1522                          test_utils.CODE_SHOULD_BE_200)
1523
1524
1525 if __name__ == "__main__":
1526     unittest.main(verbosity=2)