110111e3e00b12b94bb73e0f8a33ab4814efbc59
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test14_otn_switch_end2end.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14
15 import base64
16 import unittest
17 import time
18 import requests
19 import sys
20 sys.path.append('transportpce_tests/common/')
21 import test_utils  # nopep8
22
23
24 class TransportPCEtesting(unittest.TestCase):
25
26     processes = None
27     WAITING = 20  # nominal value is 300
28     NODE_VERSION = '2.2.1'
29
30     cr_serv_sample_data = {"input": {
31         "sdnc-request-header": {
32             "request-id": "request-1",
33             "rpc-action": "service-create",
34             "request-system-id": "appname"
35         },
36         "service-name": "service-OCH-OTU4-AB",
37         "common-id": "commonId",
38         "connection-type": "infrastructure",
39         "service-a-end": {
40             "service-rate": "100",
41             "node-id": "SPDR-SA1",
42             "service-format": "OTU",
43             "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
44             "clli": "NodeSA",
45             "subrate-eth-sla": {
46                     "subrate-eth-sla": {
47                         "committed-info-rate": "100000",
48                         "committed-burst-size": "64"
49                     }
50             },
51             "tx-direction": {
52                 "port": {
53                     "port-device-name": "SPDR-SA1-XPDR2",
54                     "port-type": "fixed",
55                     "port-name": "XPDR2-NETWORK1",
56                     "port-rack": "000000.00",
57                     "port-shelf": "Chassis#1"
58                 },
59                 "lgx": {
60                     "lgx-device-name": "Some lgx-device-name",
61                     "lgx-port-name": "Some lgx-port-name",
62                     "lgx-port-rack": "000000.00",
63                     "lgx-port-shelf": "00"
64                 }
65             },
66             "rx-direction": {
67                 "port": {
68                     "port-device-name": "SPDR-SA1-XPDR2",
69                     "port-type": "fixed",
70                     "port-name": "XPDR2-NETWORK1",
71                     "port-rack": "000000.00",
72                     "port-shelf": "Chassis#1"
73                 },
74                 "lgx": {
75                     "lgx-device-name": "Some lgx-device-name",
76                     "lgx-port-name": "Some lgx-port-name",
77                     "lgx-port-rack": "000000.00",
78                     "lgx-port-shelf": "00"
79                 }
80             },
81             "optic-type": "gray"
82         },
83         "service-z-end": {
84             "service-rate": "100",
85             "node-id": "SPDR-SB1",
86             "service-format": "OTU",
87             "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
88             "clli": "NodeSB",
89             "subrate-eth-sla": {
90                     "subrate-eth-sla": {
91                         "committed-info-rate": "100000",
92                         "committed-burst-size": "64"
93                     }
94             },
95             "tx-direction": {
96                 "port": {
97                     "port-device-name": "SPDR-SB1-XPDR2",
98                     "port-type": "fixed",
99                     "port-name": "XPDR2-NETWORK1",
100                     "port-rack": "000000.00",
101                     "port-shelf": "Chassis#1"
102                 },
103                 "lgx": {
104                     "lgx-device-name": "Some lgx-device-name",
105                     "lgx-port-name": "Some lgx-port-name",
106                     "lgx-port-rack": "000000.00",
107                     "lgx-port-shelf": "00"
108                 }
109             },
110             "rx-direction": {
111                 "port": {
112                     "port-device-name": "SPDR-SB1-XPDR2",
113                     "port-type": "fixed",
114                     "port-name": "XPDR2-NETWORK1",
115                     "port-rack": "000000.00",
116                     "port-shelf": "Chassis#1"
117                 },
118                 "lgx": {
119                     "lgx-device-name": "Some lgx-device-name",
120                     "lgx-port-name": "Some lgx-port-name",
121                     "lgx-port-rack": "000000.00",
122                     "lgx-port-shelf": "00"
123                 }
124             },
125             "optic-type": "gray"
126         },
127         "due-date": "2018-06-15T00:00:01Z",
128         "operator-contact": "pw1234"
129     }
130     }
131
132     @classmethod
133     def setUpClass(cls):
134         cls.processes = test_utils.start_tpce()
135         cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
136                                                ('spdrb', cls.NODE_VERSION),
137                                                ('spdrc', cls.NODE_VERSION),
138                                                ('roadma', cls.NODE_VERSION),
139                                                ('roadmb', cls.NODE_VERSION),
140                                                ('roadmc', cls.NODE_VERSION)])
141
142     @classmethod
143     def tearDownClass(cls):
144         # pylint: disable=not-an-iterable
145         for process in cls.processes:
146             test_utils.shutdown_process(process)
147         print("all processes killed")
148
149     def setUp(self):
150         time.sleep(5)
151
152     def test_01_connect_spdrA(self):
153         response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
154         self.assertEqual(response.status_code,
155                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
156
157     def test_02_connect_spdrB(self):
158         response = test_utils.mount_device("SPDR-SB1", ('spdrb', self.NODE_VERSION))
159         self.assertEqual(response.status_code,
160                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
161
162     def test_03_connect_spdrC(self):
163         response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
164         self.assertEqual(response.status_code,
165                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
166
167     def test_04_connect_rdmA(self):
168         response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
169         self.assertEqual(response.status_code,
170                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
171
172     def test_05_connect_rdmB(self):
173         response = test_utils.mount_device("ROADM-B1", ('roadmb', self.NODE_VERSION))
174         self.assertEqual(response.status_code,
175                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
176
177     def test_06_connect_rdmC(self):
178         response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
179         self.assertEqual(response.status_code,
180                          requests.codes.created, test_utils.CODE_SHOULD_BE_201)
181
182     def test_07_connect_sprdA_2_N1_to_roadmA_PP3(self):
183         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "2", "1",
184                                                           "ROADM-A1", "1", "SRG1-PP3-TXRX")
185         self.assertEqual(response.status_code, requests.codes.ok)
186         res = response.json()
187         self.assertIn('Xponder Roadm Link created successfully',
188                       res["output"]["result"])
189         time.sleep(2)
190
191     def test_08_connect_roadmA_PP3_to_spdrA_2_N1(self):
192         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "2", "1",
193                                                           "ROADM-A1", "1", "SRG1-PP3-TXRX")
194         self.assertEqual(response.status_code, requests.codes.ok)
195         res = response.json()
196         self.assertIn('Roadm Xponder links created successfully',
197                       res["output"]["result"])
198         time.sleep(2)
199
200     def test_09_connect_sprdC_2_N1_to_roadmC_PP3(self):
201         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "2", "1",
202                                                           "ROADM-C1", "1", "SRG1-PP3-TXRX")
203         self.assertEqual(response.status_code, requests.codes.ok)
204         res = response.json()
205         self.assertIn('Xponder Roadm Link created successfully',
206                       res["output"]["result"])
207         time.sleep(2)
208
209     def test_10_connect_roadmC_PP3_to_spdrC_2_N1(self):
210         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "2", "1",
211                                                           "ROADM-C1", "1", "SRG1-PP3-TXRX")
212         self.assertEqual(response.status_code, requests.codes.ok)
213         res = response.json()
214         self.assertIn('Roadm Xponder links created successfully',
215                       res["output"]["result"])
216         time.sleep(2)
217
218     def test_11_connect_sprdB_2_N1_to_roadmB_PP1(self):
219         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SB1", "2", "1",
220                                                           "ROADM-B1", "1", "SRG1-PP1-TXRX")
221         self.assertEqual(response.status_code, requests.codes.ok)
222         res = response.json()
223         self.assertIn('Xponder Roadm Link created successfully',
224                       res["output"]["result"])
225         time.sleep(2)
226
227     def test_12_connect_roadmB_PP1_to_spdrB_2_N1(self):
228         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SB1", "2", "1",
229                                                           "ROADM-B1", "1", "SRG1-PP1-TXRX")
230         self.assertEqual(response.status_code, requests.codes.ok)
231         res = response.json()
232         self.assertIn('Roadm Xponder links created successfully',
233                       res["output"]["result"])
234         time.sleep(2)
235
236     def test_13_connect_sprdB_2_N2_to_roadmB_PP2(self):
237         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SB1", "2", "2",
238                                                           "ROADM-B1", "1", "SRG1-PP2-TXRX")
239         self.assertEqual(response.status_code, requests.codes.ok)
240         res = response.json()
241         self.assertIn('Xponder Roadm Link created successfully',
242                       res["output"]["result"])
243         time.sleep(2)
244
245     def test_14_connect_roadmB_PP2_to_spdrB_2_N2(self):
246         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SB1", "2", "2",
247                                                           "ROADM-B1", "1", "SRG1-PP2-TXRX")
248         self.assertEqual(response.status_code, requests.codes.ok)
249         res = response.json()
250         self.assertIn('Roadm Xponder links created successfully',
251                       res["output"]["result"])
252         time.sleep(2)
253
254     def test_15_add_omsAttributes_ROADMA_ROADMB(self):
255         # Config ROADMA-ROADMB oms-attributes
256         data = {"span": {
257             "auto-spanloss": "true",
258             "spanloss-base": 11.4,
259             "spanloss-current": 12,
260             "engineered-spanloss": 12.2,
261             "link-concatenation": [{
262                 "SRLG-Id": 0,
263                 "fiber-type": "smf",
264                 "SRLG-length": 100000,
265                 "pmd": 0.5}]}}
266         response = test_utils.add_oms_attr_request(
267             "ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX", data)
268         self.assertEqual(response.status_code, requests.codes.created)
269
270     def test_16_add_omsAttributes_ROADMB_ROADMA(self):
271         # Config ROADMB-ROADMA oms-attributes
272         data = {"span": {
273             "auto-spanloss": "true",
274             "spanloss-base": 11.4,
275             "spanloss-current": 12,
276             "engineered-spanloss": 12.2,
277             "link-concatenation": [{
278                 "SRLG-Id": 0,
279                 "fiber-type": "smf",
280                 "SRLG-length": 100000,
281                 "pmd": 0.5}]}}
282         response = test_utils.add_oms_attr_request(
283             "ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX", data)
284         self.assertEqual(response.status_code, requests.codes.created)
285
286     def test_17_add_omsAttributes_ROADMB_ROADMC(self):
287         # Config ROADMB-ROADMC oms-attributes
288         data = {"span": {
289             "auto-spanloss": "true",
290             "spanloss-base": 11.4,
291             "spanloss-current": 12,
292             "engineered-spanloss": 12.2,
293             "link-concatenation": [{
294                 "SRLG-Id": 0,
295                 "fiber-type": "smf",
296                 "SRLG-length": 100000,
297                 "pmd": 0.5}]}}
298         response = test_utils.add_oms_attr_request(
299             "ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX", data)
300         self.assertEqual(response.status_code, requests.codes.created)
301
302     def test_18_add_omsAttributes_ROADMC_ROADMB(self):
303         # Config ROADMC-ROADMB oms-attributes
304         data = {"span": {
305             "auto-spanloss": "true",
306             "spanloss-base": 11.4,
307             "spanloss-current": 12,
308             "engineered-spanloss": 12.2,
309             "link-concatenation": [{
310                 "SRLG-Id": 0,
311                 "fiber-type": "smf",
312                 "SRLG-length": 100000,
313                 "pmd": 0.5}]}}
314         response = test_utils.add_oms_attr_request(
315             "ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX", data)
316         self.assertEqual(response.status_code, requests.codes.created)
317
318     def test_19_create_OTS_ROADMA_DEG1(self):
319         response = test_utils.create_ots_oms_request("ROADM-A1", "DEG1-TTP-TXRX")
320         time.sleep(10)
321         self.assertEqual(response.status_code, requests.codes.ok)
322         res = response.json()
323         self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-A1',
324                       res["output"]["result"])
325
326     def test_20_create_OTS_ROADMB_DEG1(self):
327         response = test_utils.create_ots_oms_request("ROADM-B1", "DEG1-TTP-TXRX")
328         time.sleep(10)
329         self.assertEqual(response.status_code, requests.codes.ok)
330         res = response.json()
331         self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-B1',
332                       res["output"]["result"])
333
334     def test_21_create_OTS_ROADMB_DEG2(self):
335         response = test_utils.create_ots_oms_request("ROADM-B1", "DEG2-TTP-TXRX")
336         time.sleep(10)
337         self.assertEqual(response.status_code, requests.codes.ok)
338         res = response.json()
339         self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-B1',
340                       res["output"]["result"])
341
342     def test_22_create_OTS_ROADMC_DEG2(self):
343         response = test_utils.create_ots_oms_request("ROADM-C1", "DEG2-TTP-TXRX")
344         time.sleep(10)
345         self.assertEqual(response.status_code, requests.codes.ok)
346         res = response.json()
347         self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-C1',
348                       res["output"]["result"])
349
350     def test_23_calculate_span_loss_base_all(self):
351         url = "{}/operations/transportpce-olm:calculate-spanloss-base"
352         data = {
353             "input": {
354                 "src-type": "all"
355             }
356         }
357         response = test_utils.post_request(url, data)
358         self.assertEqual(response.status_code, requests.codes.ok)
359         res = response.json()
360         self.assertIn('Success',
361                       res["output"]["result"])
362         self.assertIn({
363             "spanloss": "25.7",
364             "link-id": "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX"
365         }, res["output"]["spans"])
366         self.assertIn({
367             "spanloss": "17.6",
368             "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
369         }, res["output"]["spans"])
370         self.assertIn({
371             "spanloss": "23.6",
372             "link-id": "ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX"
373         }, res["output"]["spans"])
374         self.assertIn({
375             "spanloss": "23.6",
376             "link-id": "ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX"
377         }, res["output"]["spans"])
378         self.assertIn({
379             "spanloss": "25.7",
380             "link-id": "ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX"
381         }, res["output"]["spans"])
382         self.assertIn({
383             "spanloss": "17.6",
384             "link-id": "ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX"
385         }, res["output"]["spans"])
386         time.sleep(5)
387
388     def test_24_check_otn_topology(self):
389         response = test_utils.get_otn_topo_request()
390         self.assertEqual(response.status_code, requests.codes.ok)
391         res = response.json()
392         nbNode = len(res['network'][0]['node'])
393         self.assertEqual(nbNode, 9, 'There should be 9 nodes')
394         self.assertNotIn('ietf-network-topology:link', res['network'][0],
395                          'otn-topology should have no link')
396
397 # test service-create for OCH-OTU4 service from spdrA to spdrB
398     def test_25_create_OCH_OTU4_service_AB(self):
399         response = test_utils.service_create_request(self.cr_serv_sample_data)
400         self.assertEqual(response.status_code, requests.codes.ok)
401         res = response.json()
402         self.assertIn('PCE calculation in progress',
403                       res['output']['configuration-response-common']['response-message'])
404         time.sleep(self.WAITING)
405
406     def test_26_get_OCH_OTU4_service_AB(self):
407         response = test_utils.get_service_list_request(
408             "services/service-OCH-OTU4-AB")
409         self.assertEqual(response.status_code, requests.codes.ok)
410         res = response.json()
411         self.assertEqual(
412             res['services'][0]['administrative-state'], 'inService')
413         self.assertEqual(
414             res['services'][0]['service-name'], 'service-OCH-OTU4-AB')
415         self.assertEqual(
416             res['services'][0]['connection-type'], 'infrastructure')
417         self.assertEqual(
418             res['services'][0]['lifecycle-state'], 'planned')
419         time.sleep(2)
420
421 # Check correct configuration of devices
422     def test_27_check_interface_och_spdra(self):
423         response = test_utils.check_netconf_node_request(
424             "SPDR-SA1", "interface/XPDR2-NETWORK1-761:768")
425         self.assertEqual(response.status_code, requests.codes.ok)
426         res = response.json()
427         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
428                                    'administrative-state': 'inService',
429                                    'supporting-circuit-pack-name': 'CP5-CFP',
430                                    'type': 'org-openroadm-interfaces:opticalChannel',
431                                    'supporting-port': 'CP5-CFP-P1'
432                                    }, **res['interface'][0]),
433                              res['interface'][0])
434
435         self.assertDictEqual(
436             {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
437              u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
438             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
439
440     def test_28_check_interface_OTU4_spdra(self):
441         response = test_utils.check_netconf_node_request(
442             "SPDR-SA1", "interface/XPDR2-NETWORK1-OTU")
443         self.assertEqual(response.status_code, requests.codes.ok)
444         res = response.json()
445         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
446                         'administrative-state': 'inService',
447                         'supporting-circuit-pack-name': 'CP5-CFP',
448                         'supporting-interface': 'XPDR2-NETWORK1-761:768',
449                         'type': 'org-openroadm-interfaces:otnOtu',
450                         'supporting-port': 'CP5-CFP-P1'
451                         }
452         input_dict_2 = {'tx-sapi': 'exT821pFtOc=',
453                         'expected-dapi': 'exT821pFtOc=',
454                         'rate': 'org-openroadm-otn-common-types:OTU4',
455                         'fec': 'scfec'
456                         }
457         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
458                              res['interface'][0])
459
460         self.assertDictEqual(input_dict_2,
461                              res['interface'][0]
462                              ['org-openroadm-otn-otu-interfaces:otu'])
463
464     def test_29_check_interface_och_spdrB(self):
465         response = test_utils.check_netconf_node_request(
466             "SPDR-SB1", "interface/XPDR2-NETWORK1-761:768")
467         self.assertEqual(response.status_code, requests.codes.ok)
468         res = response.json()
469         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
470                                    'administrative-state': 'inService',
471                                    'supporting-circuit-pack-name': 'CP5-CFP',
472                                    'type': 'org-openroadm-interfaces:opticalChannel',
473                                    'supporting-port': 'CP5-CFP-P1'
474                                    }, **res['interface'][0]),
475                              res['interface'][0])
476
477         self.assertDictEqual(
478             {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
479              u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
480             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
481
482     def test_30_check_interface_OTU4_spdrB(self):
483         response = test_utils.check_netconf_node_request(
484             "SPDR-SB1", "interface/XPDR2-NETWORK1-OTU")
485         self.assertEqual(response.status_code, requests.codes.ok)
486         res = response.json()
487         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
488                         'administrative-state': 'inService',
489                         'supporting-circuit-pack-name': 'CP5-CFP',
490                         'supporting-interface': 'XPDR2-NETWORK1-761:768',
491                         'type': 'org-openroadm-interfaces:otnOtu',
492                         'supporting-port': 'CP5-CFP-P1'
493                         }
494         input_dict_2 = {'tx-dapi': 'exT821pFtOc=',
495                         'expected-sapi': 'exT821pFtOc=',
496                         'tx-sapi': 'HPQZi9Cb3Aw=',
497                         'expected-dapi': 'HPQZi9Cb3Aw=',
498                         'rate': 'org-openroadm-otn-common-types:OTU4',
499                         'fec': 'scfec'
500                         }
501
502         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
503                              res['interface'][0])
504
505         self.assertDictEqual(input_dict_2,
506                              res['interface'][0]
507                              ['org-openroadm-otn-otu-interfaces:otu'])
508
509     def test_31_check_no_interface_ODU4_spdra(self):
510         response = test_utils.check_netconf_node_request(
511             "SPDR-SA1", "interface/XPDR2-NETWORK1-ODU4")
512         self.assertEqual(response.status_code, requests.codes.conflict)
513         res = response.json()
514         self.assertIn(
515             {"error-type": "application", "error-tag": "data-missing",
516              "error-message": "Request could not be completed because the relevant data model content does not exist"},
517             res['errors']['error'])
518
519     def test_32_check_openroadm_topo_spdra(self):
520         response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR2")
521         self.assertEqual(response.status_code, requests.codes.ok)
522         res = response.json()
523         ele = res['node'][0]['ietf-network-topology:termination-point'][0]
524         self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
525         self.assertEqual({u'frequency': 196.1,
526                           u'width': 40},
527                          ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
528         self.assertEqual('ROADM-A1-SRG1--SRG1-PP3-TXRX',
529                          ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
530         time.sleep(3)
531
532     def test_33_check_openroadm_topo_ROADMA_SRG(self):
533         response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
534         self.assertEqual(response.status_code, requests.codes.ok)
535         res = response.json()
536         freq_map = base64.b64decode(
537             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
538         freq_map_array = [int(x) for x in freq_map]
539         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
540         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
541         for ele in liste_tp:
542             if ele['tp-id'] == 'SRG1-PP3-TXRX':
543                 freq_map = base64.b64decode(
544                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
545                 freq_map_array = [int(x) for x in freq_map]
546                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
547             if ele['tp-id'] == 'SRG1-PP2-TXRX':
548                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
549         time.sleep(3)
550
551     def test_33_check_openroadm_topo_ROADMA_DEG1(self):
552         response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG1")
553         self.assertEqual(response.status_code, requests.codes.ok)
554         res = response.json()
555         freq_map = base64.b64decode(
556             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
557         freq_map_array = [int(x) for x in freq_map]
558         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
559         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
560         for ele in liste_tp:
561             if ele['tp-id'] == 'DEG1-CTP-TXRX':
562                 freq_map = base64.b64decode(
563                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
564                 freq_map_array = [int(x) for x in freq_map]
565                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
566             if ele['tp-id'] == 'DEG1-TTP-TXRX':
567                 freq_map = base64.b64decode(
568                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
569                 freq_map_array = [int(x) for x in freq_map]
570                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
571         time.sleep(3)
572
573     def test_34_check_otn_topo_otu4_links(self):
574         response = test_utils.get_otn_topo_request()
575         self.assertEqual(response.status_code, requests.codes.ok)
576         res = response.json()
577         nb_links = len(res['network'][0]['ietf-network-topology:link'])
578         self.assertEqual(nb_links, 2)
579         listLinkId = ['OTU4-SPDR-SA1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
580                       'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR2-XPDR2-NETWORK1']
581         for link in res['network'][0]['ietf-network-topology:link']:
582             self.assertIn(link['link-id'], listLinkId)
583             self.assertEqual(
584                 link['transportpce-topology:otn-link-type'], 'OTU4')
585             self.assertEqual(
586                 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
587             self.assertEqual(
588                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
589             self.assertEqual(
590                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
591             self.assertIn(
592                 link['org-openroadm-common-network:opposite-link'], listLinkId)
593
594 # test service-create for OCH-OTU4 service from spdrB to spdrC
595     def test_35_create_OCH_OTU4_service_BC(self):
596         self.cr_serv_sample_data["input"]["service-name"] = "service-OCH-OTU4-BC"
597         self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "SPDR-SB1"
598         self.cr_serv_sample_data["input"]["service-a-end"]["clli"] = "NodeSB"
599         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SB1-XPDR2"
600         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK2"
601         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SB1-XPDR2"
602         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK2"
603         self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SC1"
604         self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSC"
605         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
606         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
607         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
608         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
609
610         response = test_utils.service_create_request(self.cr_serv_sample_data)
611         self.assertEqual(response.status_code, requests.codes.ok)
612         res = response.json()
613         self.assertIn('PCE calculation in progress',
614                       res['output']['configuration-response-common']['response-message'])
615         time.sleep(self.WAITING)
616
617     def test_36_get_OCH_OTU4_service_BC(self):
618         response = test_utils.get_service_list_request(
619             "services/service-OCH-OTU4-BC")
620         self.assertEqual(response.status_code, requests.codes.ok)
621         res = response.json()
622         self.assertEqual(
623             res['services'][0]['administrative-state'], 'inService')
624         self.assertEqual(
625             res['services'][0]['service-name'], 'service-OCH-OTU4-BC')
626         self.assertEqual(
627             res['services'][0]['connection-type'], 'infrastructure')
628         self.assertEqual(
629             res['services'][0]['lifecycle-state'], 'planned')
630         time.sleep(2)
631
632 # Check correct configuration of devices
633     def test_37_check_interface_och_spdrB(self):
634         response = test_utils.check_netconf_node_request(
635             "SPDR-SB1", "interface/XPDR2-NETWORK2-753:760")
636         self.assertEqual(response.status_code, requests.codes.ok)
637         res = response.json()
638         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-753:760',
639                                    'administrative-state': 'inService',
640                                    'supporting-circuit-pack-name': 'CP6-CFP',
641                                    'type': 'org-openroadm-interfaces:opticalChannel',
642                                    'supporting-port': 'CP1-CFP0-P1'
643                                    }, **res['interface'][0]),
644                              res['interface'][0])
645
646         self.assertDictEqual(
647             {u'frequency': 196.05, u'rate': u'org-openroadm-common-types:R100G',
648              u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
649             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
650
651     def test_38_check_interface_OTU4_spdrB(self):
652         response = test_utils.check_netconf_node_request(
653             "SPDR-SB1", "interface/XPDR2-NETWORK2-OTU")
654         self.assertEqual(response.status_code, requests.codes.ok)
655         res = response.json()
656         input_dict_1 = {'name': 'XPDR2-NETWORK2-OTU',
657                         'administrative-state': 'inService',
658                         'supporting-circuit-pack-name': 'CP6-CFP',
659                         'supporting-interface': 'XPDR2-NETWORK1-753:760',
660                         'type': 'org-openroadm-interfaces:otnOtu',
661                         'supporting-port': 'CP6-CFP-P1'
662                         }
663         input_dict_2 = {'tx-sapi': 'HPQZi9Cb3A8=',
664                         'expected-dapi': 'HPQZi9Cb3A8=',
665                         'rate': 'org-openroadm-otn-common-types:OTU4',
666                         'fec': 'scfec'
667                         }
668         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
669                              res['interface'][0])
670
671         self.assertDictEqual(input_dict_2,
672                              res['interface'][0]
673                              ['org-openroadm-otn-otu-interfaces:otu'])
674
675     def test_39_check_interface_och_spdrC(self):
676         response = test_utils.check_netconf_node_request(
677             "SPDR-SC1", "interface/XPDR2-NETWORK1-753:760")
678         self.assertEqual(response.status_code, requests.codes.ok)
679         res = response.json()
680         self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
681                                    'administrative-state': 'inService',
682                                    'supporting-circuit-pack-name': 'CP5-CFP',
683                                    'type': 'org-openroadm-interfaces:opticalChannel',
684                                    'supporting-port': 'CP5-CFP-P1'
685                                    }, **res['interface'][0]),
686                              res['interface'][0])
687
688         self.assertDictEqual(
689             {u'frequency': 196.05, u'rate': u'org-openroadm-common-types:R100G',
690              u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
691             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
692
693     def test_40_check_interface_OTU4_spdrC(self):
694         response = test_utils.check_netconf_node_request(
695             "SPDR-SC1", "interface/XPDR2-NETWORK1-OTU")
696         self.assertEqual(response.status_code, requests.codes.ok)
697         res = response.json()
698         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
699                         'administrative-state': 'inService',
700                         'supporting-circuit-pack-name': 'CP5-CFP',
701                         'supporting-interface': 'XPDR2-NETWORK1-753:760',
702                         'type': 'org-openroadm-interfaces:otnOtu',
703                         'supporting-port': 'CP5-CFP-P1'
704                         }
705         input_dict_2 = {'tx-dapi': 'HPQZi9Cb3A8=',
706                         'expected-sapi': 'HPQZi9Cb3A8=',
707                         'tx-sapi': 'ALx70DYYfGTx',
708                         'expected-dapi': 'ALx70DYYfGTx',
709                         'rate': 'org-openroadm-otn-common-types:OTU4',
710                         'fec': 'scfec'
711                         }
712
713         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
714                              res['interface'][0])
715
716         self.assertDictEqual(input_dict_2,
717                              res['interface'][0]
718                              ['org-openroadm-otn-otu-interfaces:otu'])
719
720     def test_41_check_no_interface_ODU4_spdrB(self):
721         response = test_utils.check_netconf_node_request(
722             "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4")
723         self.assertEqual(response.status_code, requests.codes.conflict)
724         res = response.json()
725         self.assertIn(
726             {"error-type": "application", "error-tag": "data-missing",
727              "error-message": "Request could not be completed because the relevant data model content does not exist"},
728             res['errors']['error'])
729
730     def test_42_check_openroadm_topo_spdrB(self):
731         response = test_utils.get_ordm_topo_request("node/SPDR-SB1-XPDR2")
732         self.assertEqual(response.status_code, requests.codes.ok)
733         res = response.json()
734         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
735         for ele in liste_tp:
736             if ele['tp-id'] == 'XPDR2-NETWORK1':
737                 self.assertEqual({u'frequency': 196.1,
738                                   u'width': 40},
739                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
740                 self.assertEqual('ROADM-B1-SRG1--SRG1-PP1-TXRX',
741                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
742             elif ele['tp-id'] == 'XPDR2-NETWORK2':
743                 self.assertEqual({u'frequency': 196.05,
744                                   u'width': 40},
745                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
746                 self.assertEqual('ROADM-B1-SRG1--SRG1-PP2-TXRX',
747                                  ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
748             else:
749                 print("ele = {}".format(ele))
750                 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
751         time.sleep(3)
752
753     def test_43_check_openroadm_topo_ROADMB_SRG1(self):
754         response = test_utils.get_ordm_topo_request("node/ROADM-B1-SRG1")
755         self.assertEqual(response.status_code, requests.codes.ok)
756         res = response.json()
757         freq_map = base64.b64decode(
758             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
759         freq_map_array = [int(x) for x in freq_map]
760         self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
761         self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
762         self.assertEqual(freq_map_array[93], 255, "Lambda 3 should be available")
763         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
764         for ele in liste_tp:
765             if ele['tp-id'] == 'SRG1-PP1-TXRX':
766                 freq_map = base64.b64decode(
767                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
768                 freq_map_array = [int(x) for x in freq_map]
769                 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
770             if ele['tp-id'] == 'SRG1-PP2-TXRX':
771                 freq_map = base64.b64decode(
772                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
773                 freq_map_array = [int(x) for x in freq_map]
774                 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
775             if ele['tp-id'] == 'SRG1-PP3-TXRX':
776                 self.assertNotIn('avail-freq-maps', dict.keys(ele))
777         time.sleep(3)
778
779     def test_44_check_openroadm_topo_ROADMB_DEG2(self):
780         response = test_utils.get_ordm_topo_request("node/ROADM-B1-DEG2")
781         self.assertEqual(response.status_code, requests.codes.ok)
782         res = response.json()
783         freq_map = base64.b64decode(
784             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
785         freq_map_array = [int(x) for x in freq_map]
786         self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
787         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
788         for ele in liste_tp:
789             if ele['tp-id'] == 'DEG2-CTP-TXRX':
790                 freq_map = base64.b64decode(
791                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
792                 freq_map_array = [int(x) for x in freq_map]
793                 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
794             if ele['tp-id'] == 'DEG2-TTP-TXRX':
795                 freq_map = base64.b64decode(
796                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
797                 freq_map_array = [int(x) for x in freq_map]
798                 self.assertEqual(freq_map_array[94], 0, "Lambda 1 should not be available")
799         time.sleep(3)
800
801     def test_45_check_otn_topo_otu4_links(self):
802         response = test_utils.get_otn_topo_request()
803         self.assertEqual(response.status_code, requests.codes.ok)
804         res = response.json()
805         nb_links = len(res['network'][0]['ietf-network-topology:link'])
806         self.assertEqual(nb_links, 4)
807         listLinkId = ['OTU4-SPDR-SA1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
808                       'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR2-XPDR2-NETWORK1',
809                       'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK2toSPDR-SC1-XPDR2-XPDR2-NETWORK1',
810                       'OTU4-SPDR-SC1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK2']
811         for link in res['network'][0]['ietf-network-topology:link']:
812             self.assertIn(link['link-id'], listLinkId)
813             self.assertEqual(
814                 link['transportpce-topology:otn-link-type'], 'OTU4')
815             self.assertEqual(
816                 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
817             self.assertEqual(
818                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
819             self.assertEqual(
820                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
821             self.assertIn(
822                 link['org-openroadm-common-network:opposite-link'], listLinkId)
823
824 # test service-create for 100GE service from spdrA to spdrC via spdrB
825     def test_46_create_100GE_service_ABC(self):
826         self.cr_serv_sample_data["input"]["service-name"] = "service-100GE-ABC"
827         self.cr_serv_sample_data["input"]["connection-type"] = "service"
828         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
829         self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "SPDR-SA1"
830         self.cr_serv_sample_data["input"]["service-a-end"]["clli"] = "NodeSA"
831         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
832         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
833         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
834         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
835         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
836         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
837         self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SC1"
838         self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSC"
839         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
840         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
841         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
842         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
843         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
844
845         response = test_utils.service_create_request(self.cr_serv_sample_data)
846         self.assertEqual(response.status_code, requests.codes.ok)
847         res = response.json()
848         self.assertIn('PCE calculation in progress',
849                       res['output']['configuration-response-common']['response-message'])
850         time.sleep(self.WAITING)
851
852     def test_47_get_100GE_service_ABC(self):
853         response = test_utils.get_service_list_request(
854             "services/service-100GE-ABC")
855         self.assertEqual(response.status_code, requests.codes.ok)
856         res = response.json()
857         self.assertEqual(
858             res['services'][0]['administrative-state'], 'inService')
859         self.assertEqual(
860             res['services'][0]['service-name'], 'service-100GE-ABC')
861         self.assertEqual(
862             res['services'][0]['connection-type'], 'service')
863         self.assertEqual(
864             res['services'][0]['lifecycle-state'], 'planned')
865         time.sleep(2)
866
867     def test_48_check_interface_100GE_CLIENT_spdra(self):
868         response = test_utils.check_netconf_node_request(
869             "SPDR-SA1", "interface/XPDR2-CLIENT1-ETHERNET")
870         self.assertEqual(response.status_code, requests.codes.ok)
871         res = response.json()
872         input_dict = {'name': 'XPDR2-CLIENT1-ETHERNET',
873                       'administrative-state': 'inService',
874                       'supporting-circuit-pack-name': 'CP2-QSFP1',
875                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
876                       'supporting-port': 'CP2-QSFP1-P1'
877                       }
878         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
879                              res['interface'][0])
880         self.assertDictEqual(
881             {u'speed': 100000,
882              u'fec': 'off'},
883             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
884
885     def test_49_check_interface_ODU4_CLIENT_spdra(self):
886         response = test_utils.check_netconf_node_request(
887             "SPDR-SA1", "interface/XPDR2-CLIENT1-ODU4")
888         self.assertEqual(response.status_code, requests.codes.ok)
889         res = response.json()
890         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
891                         'administrative-state': 'inService',
892                         'supporting-circuit-pack-name': 'CP2-QSFP1',
893                         'supporting-interface': 'XPDR2-CLIENT1-ETHERNET',
894                         'type': 'org-openroadm-interfaces:otnOdu',
895                         'supporting-port': 'CP2-QSFP1-P1'}
896         input_dict_2 = {
897             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
898             'rate': 'org-openroadm-otn-common-types:ODU4',
899             'monitoring-mode': 'terminated'}
900
901         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
902                              res['interface'][0])
903         self.assertDictEqual(dict(input_dict_2,
904                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
905                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
906         self.assertDictEqual(
907             {u'payload-type': u'07', u'exp-payload-type': u'07'},
908             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
909
910     def test_50_check_interface_ODU4_NETWORK_spdra(self):
911         response = test_utils.check_netconf_node_request(
912             "SPDR-SA1", "interface/XPDR2-NETWORK1-ODU4")
913         self.assertEqual(response.status_code, requests.codes.ok)
914         res = response.json()
915         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
916                         'administrative-state': 'inService',
917                         'supporting-circuit-pack-name': 'CP5-CFP',
918                         'type': 'org-openroadm-interfaces:otnOdu',
919                         'supporting-port': 'CP5-CFP-P1'}
920         input_dict_2 = {
921             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
922             'rate': 'org-openroadm-otn-common-types:ODU4',
923             'monitoring-mode': 'monitored'}
924
925         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
926                              res['interface'][0])
927         self.assertDictEqual(dict(input_dict_2,
928                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
929                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
930         self.assertDictEqual(
931             {u'payload-type': u'07', u'exp-payload-type': u'07'},
932             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
933
934     def test_51_check_ODU4_connection_spdra(self):
935         response = test_utils.check_netconf_node_request(
936             "SPDR-SA1",
937             "odu-connection/XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4")
938         self.assertEqual(response.status_code, requests.codes.ok)
939         res = response.json()
940         input_dict_1 = {
941             'connection-name':
942             'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
943             'direction': 'bidirectional'
944         }
945
946         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
947                              res['odu-connection'][0])
948         self.assertDictEqual({u'dst-if': u'XPDR2-NETWORK1-ODU4'},
949                              res['odu-connection'][0]['destination'])
950         self.assertDictEqual({u'src-if': u'XPDR2-CLIENT1-ODU4'},
951                              res['odu-connection'][0]['source'])
952
953     def test_52_check_interface_100GE_CLIENT_spdrc(self):
954         response = test_utils.check_netconf_node_request(
955             "SPDR-SC1", "interface/XPDR2-CLIENT1-ETHERNET")
956         self.assertEqual(response.status_code, requests.codes.ok)
957         res = response.json()
958         input_dict = {'name': 'XPDR2-CLIENT1-ETHERNET',
959                       'administrative-state': 'inService',
960                       'supporting-circuit-pack-name': 'CP2-QSFP1',
961                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
962                       'supporting-port': 'CP2-QSFP1-P1'
963                       }
964         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
965                              res['interface'][0])
966         self.assertDictEqual(
967             {u'speed': 100000,
968              u'fec': 'off'},
969             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
970
971     def test_53_check_interface_ODU4_CLIENT_spdrc(self):
972         response = test_utils.check_netconf_node_request(
973             "SPDR-SC1", "interface/XPDR2-CLIENT1-ODU4")
974         self.assertEqual(response.status_code, requests.codes.ok)
975         res = response.json()
976         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
977                         'administrative-state': 'inService',
978                         'supporting-circuit-pack-name': 'CP2-QSFP1',
979                         'supporting-interface': 'XPDR2-CLIENT1-ETHERNET',
980                         'type': 'org-openroadm-interfaces:otnOdu',
981                         'supporting-port': 'CP2-QSFP1-P1'}
982         input_dict_2 = {
983             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
984             'rate': 'org-openroadm-otn-common-types:ODU4',
985             'monitoring-mode': 'terminated'}
986
987         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
988                              res['interface'][0])
989         self.assertDictEqual(dict(input_dict_2,
990                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
991                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
992         self.assertDictEqual(
993             {u'payload-type': u'07', u'exp-payload-type': u'07'},
994             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
995
996     def test_54_check_interface_ODU4_NETWORK_spdrc(self):
997         response = test_utils.check_netconf_node_request(
998             "SPDR-SC1", "interface/XPDR2-NETWORK1-ODU4")
999         self.assertEqual(response.status_code, requests.codes.ok)
1000         res = response.json()
1001         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
1002                         'administrative-state': 'inService',
1003                         'supporting-circuit-pack-name': 'CP5-CFP',
1004                         'type': 'org-openroadm-interfaces:otnOdu',
1005                         'supporting-port': 'CP5-CFP-P1'}
1006         input_dict_2 = {
1007             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1008             'rate': 'org-openroadm-otn-common-types:ODU4',
1009             'monitoring-mode': 'monitored'}
1010
1011         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1012                              res['interface'][0])
1013         self.assertDictEqual(dict(input_dict_2,
1014                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1015                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1016         self.assertDictEqual(
1017             {u'payload-type': u'07', u'exp-payload-type': u'07'},
1018             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1019
1020     def test_55_check_ODU4_connection_spdrc(self):
1021         response = test_utils.check_netconf_node_request(
1022             "SPDR-SC1",
1023             "odu-connection/XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4")
1024         self.assertEqual(response.status_code, requests.codes.ok)
1025         res = response.json()
1026         input_dict_1 = {
1027             'connection-name':
1028             'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
1029             'direction': 'bidirectional'
1030         }
1031
1032         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1033                              res['odu-connection'][0])
1034         self.assertDictEqual({u'dst-if': u'XPDR2-NETWORK1-ODU4'},
1035                              res['odu-connection'][0]['destination'])
1036         self.assertDictEqual({u'src-if': u'XPDR2-CLIENT1-ODU4'},
1037                              res['odu-connection'][0]['source'])
1038
1039     def test_56_check_interface_ODU4_NETWORK1_spdrb(self):
1040         response = test_utils.check_netconf_node_request(
1041             "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4")
1042         self.assertEqual(response.status_code, requests.codes.ok)
1043         res = response.json()
1044         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
1045                         'administrative-state': 'inService',
1046                         'supporting-circuit-pack-name': 'CP5-CFP',
1047                         'type': 'org-openroadm-interfaces:otnOdu',
1048                         'supporting-port': 'CP5-CFP-P1'}
1049         input_dict_2 = {
1050             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1051             'rate': 'org-openroadm-otn-common-types:ODU4',
1052             'monitoring-mode': 'monitored'}
1053
1054         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1055                              res['interface'][0])
1056         self.assertDictEqual(dict(input_dict_2,
1057                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1058                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1059         self.assertDictEqual(
1060             {u'payload-type': u'07', u'exp-payload-type': u'07'},
1061             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1062
1063     def test_57_check_interface_ODU4_NETWORK2_spdrb(self):
1064         response = test_utils.check_netconf_node_request(
1065             "SPDR-SB1", "interface/XPDR2-NETWORK2-ODU4")
1066         self.assertEqual(response.status_code, requests.codes.ok)
1067         res = response.json()
1068         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
1069                         'administrative-state': 'inService',
1070                         'supporting-circuit-pack-name': 'CP6-CFP',
1071                         'type': 'org-openroadm-interfaces:otnOdu',
1072                         'supporting-port': 'CP6-CFP-P1'}
1073         input_dict_2 = {
1074             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1075             'rate': 'org-openroadm-otn-common-types:ODU4',
1076             'monitoring-mode': 'monitored'}
1077
1078         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1079                              res['interface'][0])
1080         self.assertDictEqual(dict(input_dict_2,
1081                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1082                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1083         self.assertDictEqual(
1084             {u'payload-type': u'07', u'exp-payload-type': u'07'},
1085             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1086
1087     def test_58_check_ODU4_connection_spdrb(self):
1088         response = test_utils.check_netconf_node_request(
1089             "SPDR-SB1",
1090             "odu-connection/XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4")
1091         self.assertEqual(response.status_code, requests.codes.ok)
1092         res = response.json()
1093         input_dict_1 = {
1094             'connection-name':
1095             'XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4',
1096             'direction': 'bidirectional'
1097         }
1098
1099         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1100                              res['odu-connection'][0])
1101         self.assertDictEqual({u'dst-if': u'XPDR2-NETWORK2-ODU4'},
1102                              res['odu-connection'][0]['destination'])
1103         self.assertDictEqual({u'src-if': u'XPDR2-NETWORK1-ODU4'},
1104                              res['odu-connection'][0]['source'])
1105
1106     def test_59_check_otn_topo_links(self):
1107         response = test_utils.get_otn_topo_request()
1108         self.assertEqual(response.status_code, requests.codes.ok)
1109         res = response.json()
1110         nb_links = len(res['network'][0]['ietf-network-topology:link'])
1111         self.assertEqual(nb_links, 4)
1112         for link in res['network'][0]['ietf-network-topology:link']:
1113             self.assertEqual(
1114                     link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
1115             self.assertEqual(
1116                     link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
1117
1118     def test_60_delete_service_100GE_ABC(self):
1119         response = test_utils.service_delete_request("service-100GE-ABC")
1120         self.assertEqual(response.status_code, requests.codes.ok)
1121         res = response.json()
1122         self.assertIn('Renderer service delete in progress',
1123                       res['output']['configuration-response-common']['response-message'])
1124         time.sleep(self.WAITING)
1125
1126     def test_61_check_service_list(self):
1127         response = test_utils.get_service_list_request("")
1128         self.assertEqual(response.status_code, requests.codes.ok)
1129         res = response.json()
1130         self.assertEqual(len(res['service-list']['services']), 2)
1131         time.sleep(2)
1132
1133     def test_62_check_no_ODU4_connection_spdra(self):
1134         response = test_utils.check_netconf_node_request("SPDR-SA1", "")
1135         self.assertEqual(response.status_code, requests.codes.ok)
1136         res = response.json()
1137         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
1138         time.sleep(1)
1139
1140     def test_63_check_no_interface_ODU4_NETWORK_spdra(self):
1141         response = test_utils.check_netconf_node_request(
1142             "SPDR-SA1", "interface/XPDR2-NETWORK1-ODU4")
1143         self.assertEqual(response.status_code, requests.codes.conflict)
1144
1145     def test_64_check_no_interface_ODU4_CLIENT_spdra(self):
1146         response = test_utils.check_netconf_node_request(
1147             "SPDR-SA1", "interface/XPDR2-CLIENT1-ODU4")
1148         self.assertEqual(response.status_code, requests.codes.conflict)
1149
1150     def test_65_check_no_interface_100GE_CLIENT_spdra(self):
1151         response = test_utils.check_netconf_node_request(
1152             "SPDR-SA1", "interface/XPDR2-CLIENT1-ETHERNET")
1153         self.assertEqual(response.status_code, requests.codes.conflict)
1154
1155     def test_66_check_otn_topo_links(self):
1156         self.test_45_check_otn_topo_otu4_links()
1157
1158     def test_67_delete_OCH_OTU4_service_AB(self):
1159         response = test_utils.service_delete_request("service-OCH-OTU4-AB")
1160         self.assertEqual(response.status_code, requests.codes.ok)
1161         res = response.json()
1162         self.assertIn('Renderer service delete in progress',
1163                       res['output']['configuration-response-common']['response-message'])
1164         time.sleep(self.WAITING)
1165
1166     def test_68_delete_OCH_OTU4_service_BC(self):
1167         response = test_utils.service_delete_request("service-OCH-OTU4-BC")
1168         self.assertEqual(response.status_code, requests.codes.ok)
1169         res = response.json()
1170         self.assertIn('Renderer service delete in progress',
1171                       res['output']['configuration-response-common']['response-message'])
1172         time.sleep(self.WAITING)
1173
1174     def test_69_get_no_service(self):
1175         response = test_utils.get_service_list_request("")
1176         self.assertEqual(response.status_code, requests.codes.conflict)
1177         res = response.json()
1178         self.assertIn(
1179             {"error-type": "application", "error-tag": "data-missing",
1180              "error-message": "Request could not be completed because the relevant data model content does not exist"},
1181             res['errors']['error'])
1182         time.sleep(1)
1183
1184     def test_70_check_no_interface_OTU4_spdra(self):
1185         response = test_utils.check_netconf_node_request(
1186             "SPDR-SA1", "interface/XPDR2-NETWORK1-OTU")
1187         self.assertEqual(response.status_code, requests.codes.conflict)
1188
1189     def test_71_check_no_interface_OCH_spdra(self):
1190         response = test_utils.check_netconf_node_request(
1191             "SPDR-SA1", "interface/XPDR2-NETWORK1-761:768")
1192         self.assertEqual(response.status_code, requests.codes.conflict)
1193
1194     def test_72_getLinks_OtnTopology(self):
1195         response = test_utils.get_otn_topo_request()
1196         self.assertEqual(response.status_code, requests.codes.ok)
1197         res = response.json()
1198         self.assertNotIn('ietf-network-topology:link', res['network'][0])
1199
1200     def test_73_check_openroadm_topo_spdra(self):
1201         response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR2")
1202         self.assertEqual(response.status_code, requests.codes.ok)
1203         res = response.json()
1204         tp = res['node'][0]['ietf-network-topology:termination-point'][0]
1205         self.assertEqual('XPDR2-NETWORK1', tp['tp-id'])
1206         self.assertNotIn('wavelength', dict.keys(
1207             tp[u'org-openroadm-network-topology:xpdr-network-attributes']))
1208         time.sleep(3)
1209
1210     def test_74_check_openroadm_topo_ROADMB_SRG1(self):
1211         response = test_utils.get_ordm_topo_request("node/ROADM-B1-SRG1")
1212         self.assertEqual(response.status_code, requests.codes.ok)
1213         res = response.json()
1214         freq_map = base64.b64decode(
1215             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
1216         freq_map_array = [int(x) for x in freq_map]
1217         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1218         self.assertEqual(freq_map_array[94], 255, "Lambda 1 should be available")
1219         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1220         for ele in liste_tp:
1221             if ele['tp-id'] == 'SRG1-PP1-TXRX':
1222                 freq_map = base64.b64decode(
1223                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1224                 freq_map_array = [int(x) for x in freq_map]
1225                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1226             if ele['tp-id'] == 'SRG1-PP2-TXRX':
1227                 freq_map = base64.b64decode(
1228                     ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1229                 freq_map_array = [int(x) for x in freq_map]
1230                 self.assertEqual(freq_map_array[94], 255, "Lambda 1 should be available")
1231         time.sleep(3)
1232
1233     def test_75_check_openroadm_topo_ROADMB_DEG1(self):
1234         response = test_utils.get_ordm_topo_request("node/ROADM-B1-DEG1")
1235         self.assertEqual(response.status_code, requests.codes.ok)
1236         res = response.json()
1237         freq_map = base64.b64decode(
1238             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1239         freq_map_array = [int(x) for x in freq_map]
1240         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1241         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1242         for ele in liste_tp:
1243             if ele['tp-id'] == 'DEG1-CTP-TXRX':
1244                 freq_map = base64.b64decode(
1245                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1246                 freq_map_array = [int(x) for x in freq_map]
1247                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1248             if ele['tp-id'] == 'DEG1-TTP-TXRX':
1249                 freq_map = base64.b64decode(
1250                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1251                 freq_map_array = [int(x) for x in freq_map]
1252                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1253         time.sleep(3)
1254
1255     def test_76_check_openroadm_topo_ROADMB_DEG2(self):
1256         response = test_utils.get_ordm_topo_request("node/ROADM-B1-DEG2")
1257         self.assertEqual(response.status_code, requests.codes.ok)
1258         res = response.json()
1259         freq_map = base64.b64decode(
1260             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1261         freq_map_array = [int(x) for x in freq_map]
1262         self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1263         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1264         for ele in liste_tp:
1265             if ele['tp-id'] == 'DEG2-CTP-TXRX':
1266                 freq_map = base64.b64decode(
1267                     ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1268                 freq_map_array = [int(x) for x in freq_map]
1269                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1270             if ele['tp-id'] == 'DEG2-TTP-TXRX':
1271                 freq_map = base64.b64decode(
1272                     ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1273                 freq_map_array = [int(x) for x in freq_map]
1274                 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1275         time.sleep(3)
1276
1277     def test_77_disconnect_xponders_from_roadm(self):
1278         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1279         response = test_utils.get_ordm_topo_request("")
1280         self.assertEqual(response.status_code, requests.codes.ok)
1281         res = response.json()
1282         links = res['network'][0]['ietf-network-topology:link']
1283         for link in links:
1284             if ((link["org-openroadm-common-network:link-type"] == "XPONDER-OUTPUT" or
1285                     link["org-openroadm-common-network:link-type"] == "XPONDER-INPUT")
1286                 and ('SPDR-SB1' in link['link-id'] or 'ROADM-B1' in link['link-id'])):
1287                 link_name = link["link-id"]
1288                 response = test_utils.delete_request(url+link_name)
1289                 self.assertEqual(response.status_code, requests.codes.ok)
1290
1291     def test_78_disconnect_spdrB(self):
1292         response = test_utils.unmount_device("SPDR-SB1")
1293         self.assertEqual(response.status_code, requests.codes.ok,
1294                          test_utils.CODE_SHOULD_BE_200)
1295
1296     def test_79_disconnect_roadmB(self):
1297         response = test_utils.unmount_device("ROADM-B1")
1298         self.assertEqual(response.status_code, requests.codes.ok,
1299                          test_utils.CODE_SHOULD_BE_200)
1300
1301     def test_80_remove_roadm_to_roadm_links(self):
1302         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1303         response = test_utils.get_ordm_topo_request("")
1304         self.assertEqual(response.status_code, requests.codes.ok)
1305         res = response.json()
1306         links = res['network'][0]['ietf-network-topology:link']
1307         for link in links:
1308             if (link["org-openroadm-common-network:link-type"] == "ROADM-TO-ROADM"
1309                 and 'ROADM-B1' in link['link-id']):
1310                 link_name = link["link-id"]
1311                 response = test_utils.delete_request(url+link_name)
1312                 self.assertEqual(response.status_code, requests.codes.ok)
1313
1314     def test_81_add_omsAttributes_ROADMA_ROADMC(self):
1315         # Config ROADMA-ROADMC oms-attributes
1316         data = {"span": {
1317             "auto-spanloss": "true",
1318             "spanloss-base": 11.4,
1319             "spanloss-current": 12,
1320             "engineered-spanloss": 12.2,
1321             "link-concatenation": [{
1322                 "SRLG-Id": 0,
1323                 "fiber-type": "smf",
1324                 "SRLG-length": 100000,
1325                 "pmd": 0.5}]}}
1326         response = test_utils.add_oms_attr_request(
1327             "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
1328         self.assertEqual(response.status_code, requests.codes.created)
1329
1330     def test_82_add_omsAttributes_ROADMC_ROADMA(self):
1331         # Config ROADMC-ROADMA oms-attributes
1332         data = {"span": {
1333             "auto-spanloss": "true",
1334             "spanloss-base": 11.4,
1335             "spanloss-current": 12,
1336             "engineered-spanloss": 12.2,
1337             "link-concatenation": [{
1338                 "SRLG-Id": 0,
1339                 "fiber-type": "smf",
1340                 "SRLG-length": 100000,
1341                 "pmd": 0.5}]}}
1342         response = test_utils.add_oms_attr_request(
1343             "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
1344         self.assertEqual(response.status_code, requests.codes.created)
1345
1346     def test_83_create_OCH_OTU4_service_AC(self):
1347         self.cr_serv_sample_data["input"]["service-name"] = "service-OCH-OTU4-AC"
1348         self.cr_serv_sample_data["input"]["connection-type"] = "infrastructure"
1349         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
1350         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OTU"
1351         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1352         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
1353         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1354         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
1355         self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1356         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
1357         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OTU"
1358         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1359         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
1360         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1361         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
1362         self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1363         response = test_utils.service_create_request(self.cr_serv_sample_data)
1364         self.assertEqual(response.status_code, requests.codes.ok)
1365         res = response.json()
1366         self.assertIn('PCE calculation in progress',
1367                       res['output']['configuration-response-common']['response-message'])
1368         time.sleep(self.WAITING)
1369
1370     def test_84_get_OCH_OTU4_service_AC(self):
1371         response = test_utils.get_service_list_request(
1372             "services/service-OCH-OTU4-AC")
1373         self.assertEqual(response.status_code, requests.codes.ok)
1374         res = response.json()
1375         self.assertEqual(
1376             res['services'][0]['administrative-state'], 'inService')
1377         self.assertEqual(
1378             res['services'][0]['service-name'], 'service-OCH-OTU4-AC')
1379         self.assertEqual(
1380             res['services'][0]['connection-type'], 'infrastructure')
1381         self.assertEqual(
1382             res['services'][0]['lifecycle-state'], 'planned')
1383         time.sleep(2)
1384
1385 # test service-create for 100GE service from spdrA to spdrC via spdrB
1386     def test_85_create_100GE_service_AC(self):
1387         self.cr_serv_sample_data["input"]["service-name"] = "service-100GE-AC"
1388         self.cr_serv_sample_data["input"]["connection-type"] = "service"
1389         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
1390         self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "SPDR-SA1"
1391         self.cr_serv_sample_data["input"]["service-a-end"]["clli"] = "NodeSA"
1392         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
1393         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1394         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
1395         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1396         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
1397         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
1398         self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SC1"
1399         self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSC"
1400         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
1401         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1402         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
1403         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1404         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
1405
1406         response = test_utils.service_create_request(self.cr_serv_sample_data)
1407         self.assertEqual(response.status_code, requests.codes.ok)
1408         res = response.json()
1409         self.assertIn('PCE calculation in progress',
1410                       res['output']['configuration-response-common']['response-message'])
1411         time.sleep(self.WAITING)
1412
1413     def test_86_get_100GE_service_AC(self):
1414         response = test_utils.get_service_list_request("services/service-100GE-AC")
1415         self.assertEqual(response.status_code, requests.codes.ok)
1416         res = response.json()
1417         self.assertEqual(
1418             res['services'][0]['administrative-state'], 'inService')
1419         self.assertEqual(
1420             res['services'][0]['service-name'], 'service-100GE-AC')
1421         self.assertEqual(
1422             res['services'][0]['connection-type'], 'service')
1423         self.assertEqual(
1424             res['services'][0]['lifecycle-state'], 'planned')
1425         time.sleep(2)
1426
1427     def test_87_check_configuration_spdra(self):
1428         self.test_48_check_interface_100GE_CLIENT_spdra()
1429         self.test_49_check_interface_ODU4_CLIENT_spdra()
1430         self.test_50_check_interface_ODU4_NETWORK_spdra()
1431         self.test_51_check_ODU4_connection_spdra()
1432
1433
1434     def test_88_check_configuration_spdrc(self):
1435         self.test_52_check_interface_100GE_CLIENT_spdrc()
1436         self.test_53_check_interface_ODU4_CLIENT_spdrc()
1437         self.test_54_check_interface_ODU4_NETWORK_spdrc()
1438         self.test_55_check_ODU4_connection_spdrc()
1439
1440     def test_89_check_otn_topo_links(self):
1441         response = test_utils.get_otn_topo_request()
1442         self.assertEqual(response.status_code, requests.codes.ok)
1443         res = response.json()
1444         nb_links = len(res['network'][0]['ietf-network-topology:link'])
1445         self.assertEqual(nb_links, 2)
1446         for link in res['network'][0]['ietf-network-topology:link']:
1447             self.assertEqual(
1448                 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
1449             self.assertEqual(
1450                 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
1451
1452     def test_90_delete_100GE_service_AC(self):
1453         response = test_utils.service_delete_request("service-100GE-AC")
1454         self.assertEqual(response.status_code, requests.codes.ok)
1455         res = response.json()
1456         self.assertIn('Renderer service delete in progress',
1457                       res['output']['configuration-response-common']['response-message'])
1458         time.sleep(self.WAITING)
1459
1460     def test_91_check_service_list(self):
1461         response = test_utils.get_service_list_request("")
1462         self.assertEqual(response.status_code, requests.codes.ok)
1463         res = response.json()
1464         self.assertEqual(len(res['service-list']['services']), 1)
1465         time.sleep(2)
1466
1467     def test_92_check_configuration_spdra(self):
1468         self.test_62_check_no_ODU4_connection_spdra()
1469         self.test_63_check_no_interface_ODU4_NETWORK_spdra()
1470         self.test_64_check_no_interface_ODU4_CLIENT_spdra()
1471         self.test_65_check_no_interface_100GE_CLIENT_spdra()
1472
1473     def test_93_check_otn_topo_links(self):
1474         response = test_utils.get_otn_topo_request()
1475         self.assertEqual(response.status_code, requests.codes.ok)
1476         res = response.json()
1477         nb_links = len(res['network'][0]['ietf-network-topology:link'])
1478         self.assertEqual(nb_links, 2)
1479         for link in res['network'][0]['ietf-network-topology:link']:
1480             self.assertEqual(
1481                 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
1482             self.assertEqual(
1483                 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
1484
1485     def test_94_disconnect_xponders_from_roadm(self):
1486         url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1487         response = test_utils.get_ordm_topo_request("")
1488         self.assertEqual(response.status_code, requests.codes.ok)
1489         res = response.json()
1490         links = res['network'][0]['ietf-network-topology:link']
1491         for link in links:
1492             if (link["org-openroadm-common-network:link-type"] == "XPONDER-OUTPUT" or
1493                     link["org-openroadm-common-network:link-type"] == "XPONDER-INPUT"):
1494                 link_name = link["link-id"]
1495                 response = test_utils.delete_request(url+link_name)
1496                 self.assertEqual(response.status_code, requests.codes.ok)
1497
1498     def test_95_disconnect_spdrA(self):
1499         response = test_utils.unmount_device("SPDR-SA1")
1500         self.assertEqual(response.status_code, requests.codes.ok,
1501                          test_utils.CODE_SHOULD_BE_200)
1502
1503     def test_96_disconnect_spdrC(self):
1504         response = test_utils.unmount_device("SPDR-SC1")
1505         self.assertEqual(response.status_code, requests.codes.ok,
1506                          test_utils.CODE_SHOULD_BE_200)
1507
1508     def test_97_disconnect_roadmA(self):
1509         response = test_utils.unmount_device("ROADM-A1")
1510         self.assertEqual(response.status_code, requests.codes.ok,
1511                          test_utils.CODE_SHOULD_BE_200)
1512
1513     def test_98_disconnect_roadmC(self):
1514         response = test_utils.unmount_device("ROADM-C1")
1515         self.assertEqual(response.status_code, requests.codes.ok,
1516                          test_utils.CODE_SHOULD_BE_200)
1517
1518
1519 if __name__ == "__main__":
1520     unittest.main(verbosity=2)