Update OTN configuration files for honeynode
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_otn_end2end.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14
15 import base64
16 import unittest
17 import time
18 import requests
19 from common import test_utils
20 from common.test_utils import INDEX_1_USED_FREQ_MAP, INDEX_1_2_USED_FREQ_MAP, AVAILABLE_FREQ_MAP
21
22
23 class TransportPCEtesting(unittest.TestCase):
24
    # Process handles assigned in setUpClass and iterated in tearDownClass.
    processes = None
    # Seconds slept after each service-create request before the next test runs.
    WAITING = 20  # nominal value is 300

    # Service-create request payload shared by the service creation tests.
    # As written it describes an OTU4 "infrastructure" service named
    # service1-OCH-OTU4 between node SPDR-SA1 and node SPDR-SC1, using port
    # XPDR1-NETWORK1 on both ends.
    # test_23 and test_29 modify this same dictionary in place before reusing
    # it, so its content depends on which tests have already run.
    cr_serv_sample_data = {"input": {
        "sdnc-request-header": {
            "request-id": "request-1",
            "rpc-action": "service-create",
            "request-system-id": "appname"
        },
        "service-name": "service1-OCH-OTU4",
        "common-id": "commonId",
        "connection-type": "infrastructure",
        # A end of the service: node SPDR-SA1.
        "service-a-end": {
            "service-rate": "100",
            "node-id": "SPDR-SA1",
            "service-format": "OTU",
            "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
            "clli": "NodeSA",
            "subrate-eth-sla": {
                    "subrate-eth-sla": {
                        "committed-info-rate": "100000",
                        "committed-burst-size": "64"
                    }
            },
            "tx-direction": {
                "port": {
                    "port-device-name": "SPDR-SA1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "rx-direction": {
                "port": {
                    "port-device-name": "SPDR-SA1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "optic-type": "gray"
        },
        # Z end of the service: node SPDR-SC1, same layout as the A end.
        "service-z-end": {
            "service-rate": "100",
            "node-id": "SPDR-SC1",
            "service-format": "OTU",
            "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
            "clli": "NodeSC",
            "subrate-eth-sla": {
                    "subrate-eth-sla": {
                        "committed-info-rate": "100000",
                        "committed-burst-size": "64"
                    }
            },
            "tx-direction": {
                "port": {
                    "port-device-name": "SPDR-SC1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "rx-direction": {
                "port": {
                    "port-device-name": "SPDR-SC1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "optic-type": "gray"
        },
        "due-date": "2018-06-15T00:00:01Z",
        "operator-contact": "pw1234"
    }
    }
129
130     @classmethod
131     def setUpClass(cls):
132         cls.processes = test_utils.start_tpce()
133         cls.processes = test_utils.start_sims(['spdra', 'roadma', 'roadmc', 'spdrc'])
134
135     @classmethod
136     def tearDownClass(cls):
137         # pylint: disable=not-an-iterable
138         for process in cls.processes:
139             test_utils.shutdown_process(process)
140         print("all processes killed")
141
142     def setUp(self):
143         time.sleep(5)
144
145     def test_01_connect_spdrA(self):
146         response = test_utils.mount_device("SPDR-SA1", 'spdra')
147         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
148
149     def test_02_connect_spdrC(self):
150         response = test_utils.mount_device("SPDR-SC1", 'spdrc')
151         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
152
153     def test_03_connect_rdmA(self):
154         response = test_utils.mount_device("ROADM-A1", 'roadma')
155         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
156
157     def test_04_connect_rdmC(self):
158         response = test_utils.mount_device("ROADM-C1", 'roadmc')
159         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
160
161     def test_05_connect_sprdA_N1_to_roadmA_PP1(self):
162         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "1", "1",
163                                                           "ROADM-A1", "1", "SRG1-PP1-TXRX")
164         self.assertEqual(response.status_code, requests.codes.ok)
165         res = response.json()
166         self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
167         time.sleep(2)
168
169     def test_06_connect_roadmA_PP1_to_spdrA_N1(self):
170         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "1", "1",
171                                                           "ROADM-A1", "1", "SRG1-PP1-TXRX")
172         self.assertEqual(response.status_code, requests.codes.ok)
173         res = response.json()
174         self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
175         time.sleep(2)
176
177     def test_07_connect_sprdC_N1_to_roadmC_PP1(self):
178         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "1", "1",
179                                                           "ROADM-C1", "1", "SRG1-PP1-TXRX")
180         self.assertEqual(response.status_code, requests.codes.ok)
181         res = response.json()
182         self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
183         time.sleep(2)
184
185     def test_08_connect_roadmC_PP1_to_spdrC_N1(self):
186         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "1", "1",
187                                                           "ROADM-C1", "1", "SRG1-PP1-TXRX")
188         self.assertEqual(response.status_code, requests.codes.ok)
189         res = response.json()
190         self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
191         time.sleep(2)
192
193     def test_09_add_omsAttributes_ROADMA_ROADMC(self):
194         # Config ROADMA-ROADMC oms-attributes
195         data = {"span": {
196             "auto-spanloss": "true",
197             "spanloss-base": 11.4,
198             "spanloss-current": 12,
199             "engineered-spanloss": 12.2,
200             "link-concatenation": [{
201                 "SRLG-Id": 0,
202                 "fiber-type": "smf",
203                 "SRLG-length": 100000,
204                 "pmd": 0.5}]}}
205         response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
206         self.assertEqual(response.status_code, requests.codes.created)
207
208     def test_10_add_omsAttributes_ROADMC_ROADMA(self):
209         # Config ROADMC-ROADMA oms-attributes
210         data = {"span": {
211             "auto-spanloss": "true",
212             "spanloss-base": 11.4,
213             "spanloss-current": 12,
214             "engineered-spanloss": 12.2,
215             "link-concatenation": [{
216                 "SRLG-Id": 0,
217                 "fiber-type": "smf",
218                 "SRLG-length": 100000,
219                 "pmd": 0.5}]}}
220         response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
221         self.assertEqual(response.status_code, requests.codes.created)
222
223 # test service-create for OCH-OTU4 service from spdr to spdr
224     def test_11_check_otn_topology(self):
225         response = test_utils.get_otn_topo_request()
226         self.assertEqual(response.status_code, requests.codes.ok)
227         res = response.json()
228         nbNode = len(res['network'][0]['node'])
229         self.assertEqual(nbNode, 6, 'There should be 6 nodes')
230         self.assertNotIn('ietf-network-topology:link', res['network'][0])
231
232     def test_12_create_OCH_OTU4_service(self):
233         response = test_utils.service_create_request(self.cr_serv_sample_data)
234         self.assertEqual(response.status_code, requests.codes.ok)
235         res = response.json()
236         self.assertIn('PCE calculation in progress',
237                       res['output']['configuration-response-common']['response-message'])
238         time.sleep(self.WAITING)
239
240     def test_13_get_OCH_OTU4_service1(self):
241         response = test_utils.get_service_list_request("services/service1-OCH-OTU4")
242         self.assertEqual(response.status_code, requests.codes.ok)
243         res = response.json()
244         self.assertEqual(
245             res['services'][0]['administrative-state'], 'inService')
246         self.assertEqual(
247             res['services'][0]['service-name'], 'service1-OCH-OTU4')
248         self.assertEqual(
249             res['services'][0]['connection-type'], 'infrastructure')
250         self.assertEqual(
251             res['services'][0]['lifecycle-state'], 'planned')
252         time.sleep(2)
253
254     # Check correct configuration of devices
255     def test_14_check_interface_och_spdra(self):
256         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
257         self.assertEqual(response.status_code, requests.codes.ok)
258         res = response.json()
259         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-1',
260                                    'administrative-state': 'inService',
261                                    'supporting-circuit-pack-name': 'CP1-CFP0',
262                                    'type': 'org-openroadm-interfaces:opticalChannel',
263                                    'supporting-port': 'CP1-CFP0-P1'
264                                    }, **res['interface'][0]),
265                              res['interface'][0])
266
267         self.assertDictEqual(
268             {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
269              u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
270             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
271
272     def test_15_check_interface_OTU4_spdra(self):
273         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
274         self.assertEqual(response.status_code, requests.codes.ok)
275         res = response.json()
276         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
277                         'administrative-state': 'inService',
278                         'supporting-circuit-pack-name': 'CP1-CFP0',
279                         'supporting-interface': 'XPDR1-NETWORK1-1',
280                         'type': 'org-openroadm-interfaces:otnOtu',
281                         'supporting-port': 'CP1-CFP0-P1'
282                         }
283         input_dict_2 = {'tx-sapi': 'Swfw02qXGyI=',
284                         'expected-dapi': 'Swfw02qXGyI=',
285                         'rate': 'org-openroadm-otn-common-types:OTU4',
286                         'fec': 'scfec'
287                         }
288         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
289                              res['interface'][0])
290
291         self.assertDictEqual(input_dict_2,
292                              res['interface'][0]
293                              ['org-openroadm-otn-otu-interfaces:otu'])
294
295     def test_16_check_interface_och_spdrc(self):
296         response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-1")
297         self.assertEqual(response.status_code, requests.codes.ok)
298         res = response.json()
299         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-1',
300                                    'administrative-state': 'inService',
301                                    'supporting-circuit-pack-name': 'CP1-CFP0',
302                                    'type': 'org-openroadm-interfaces:opticalChannel',
303                                    'supporting-port': 'CP1-CFP0-P1'
304                                    }, **res['interface'][0]),
305                              res['interface'][0])
306
307         self.assertDictEqual(
308             {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
309              u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
310             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
311
312     def test_17_check_interface_OTU4_spdrc(self):
313         response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-OTU")
314         self.assertEqual(response.status_code, requests.codes.ok)
315         res = response.json()
316         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
317                         'administrative-state': 'inService',
318                         'supporting-circuit-pack-name': 'CP1-CFP0',
319                         'supporting-interface': 'XPDR1-NETWORK1-1',
320                         'type': 'org-openroadm-interfaces:otnOtu',
321                         'supporting-port': 'CP1-CFP0-P1'
322                         }
323         input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
324                         'expected-sapi': 'Swfw02qXGyI=',
325                         'tx-sapi': 'fuYZwEO660g=',
326                         'expected-dapi': 'fuYZwEO660g=',
327                         'rate': 'org-openroadm-otn-common-types:OTU4',
328                         'fec': 'scfec'
329                         }
330
331         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
332                              res['interface'][0])
333
334         self.assertDictEqual(input_dict_2,
335                              res['interface'][0]
336                              ['org-openroadm-otn-otu-interfaces:otu'])
337
338     def test_18_check_no_interface_ODU4_spdra(self):
339         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
340         self.assertEqual(response.status_code, requests.codes.conflict)
341         res = response.json()
342         self.assertIn(
343             {"error-type": "application", "error-tag": "data-missing",
344              "error-message": "Request could not be completed because the relevant data model content does not exist"},
345             res['errors']['error'])
346
347     def test_19_check_openroadm_topo_spdra(self):
348         response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
349         self.assertEqual(response.status_code, requests.codes.ok)
350         res = response.json()
351         ele = res['node'][0]['ietf-network-topology:termination-point'][0]
352         self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
353         self.assertEqual({u'frequency': 196.1,
354                           u'width': 40},
355                          ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
356         time.sleep(3)
357
358     def test_20_check_openroadm_topo_ROADMA_SRG(self):
359         response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
360         self.assertEqual(response.status_code, requests.codes.ok)
361         res = response.json()
362         freq_map = base64.b64decode(
363             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
364         freq_map_array = [int(x) for x in freq_map]
365         self.assertEqual(freq_map_array[0], 0, "Index 1 should not be available")
366         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
367         for ele in liste_tp:
368             if ele['tp-id'] == 'SRG1-PP1-TXRX':
369                 self.assertIn({u'index': 1, u'frequency': 196.1,
370                                u'width': 40},
371                               ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
372             if ele['tp-id'] == 'SRG1-PP2-TXRX':
373                 self.assertNotIn('used-wavelength', dict.keys(ele))
374         time.sleep(3)
375
376     def test_21_check_openroadm_topo_ROADMA_DEG(self):
377         response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
378         self.assertEqual(response.status_code, requests.codes.ok)
379         res = response.json()
380         freq_map = base64.b64decode(
381             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
382         freq_map_array = [int(x) for x in freq_map]
383         self.assertEqual(freq_map_array[0], 0, "Index 1 should not be available")
384         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
385         for ele in liste_tp:
386             if ele['tp-id'] == 'DEG2-CTP-TXRX':
387                 self.assertIn({u'map-name': 'cband', u'freq-map-granularity': 6.25, u'start-edge-freq': 191.325,
388                                u'effective-bits': 8, u'freq-map': INDEX_1_USED_FREQ_MAP},
389                               ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'])
390             if ele['tp-id'] == 'DEG2-TTP-TXRX':
391                 self.assertIn({u'index': 1, u'frequency': 196.1,
392                                u'width': 40},
393                               ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
394         time.sleep(3)
395
396     def test_22_check_otn_topo_otu4_links(self):
397         response = test_utils.get_otn_topo_request()
398         self.assertEqual(response.status_code, requests.codes.ok)
399         res = response.json()
400         nb_links = len(res['network'][0]['ietf-network-topology:link'])
401         self.assertEqual(nb_links, 2)
402         listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
403                       'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
404         for link in res['network'][0]['ietf-network-topology:link']:
405             self.assertIn(link['link-id'], listLinkId)
406             self.assertEqual(link['transportpce-topology:otn-link-type'], 'OTU4')
407             self.assertEqual(link['org-openroadm-common-network:link-type'], 'OTN-LINK')
408             self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
409             self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
410             self.assertIn(link['org-openroadm-common-network:opposite-link'], listLinkId)
411
412 # test service-create for ODU4 service from spdr to spdr
413     def test_23_create_ODU4_service(self):
414         self.cr_serv_sample_data["input"]["service-name"] = "service1-ODU4"
415         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
416         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
417         self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
418         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
419         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
420         self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
421
422         response = test_utils.service_create_request(self.cr_serv_sample_data)
423         self.assertEqual(response.status_code, requests.codes.ok)
424         res = response.json()
425         self.assertIn('PCE calculation in progress',
426                       res['output']['configuration-response-common']['response-message'])
427         time.sleep(self.WAITING)
428
429     def test_24_get_ODU4_service1(self):
430         response = test_utils.get_service_list_request("services/service1-ODU4")
431         self.assertEqual(response.status_code, requests.codes.ok)
432         res = response.json()
433         self.assertEqual(
434             res['services'][0]['administrative-state'], 'inService')
435         self.assertEqual(
436             res['services'][0]['service-name'], 'service1-ODU4')
437         self.assertEqual(
438             res['services'][0]['connection-type'], 'infrastructure')
439         self.assertEqual(
440             res['services'][0]['lifecycle-state'], 'planned')
441         time.sleep(2)
442
443     def test_25_check_interface_ODU4_spdra(self):
444         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
445         self.assertEqual(response.status_code, requests.codes.ok)
446         res = response.json()
447         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
448                         'administrative-state': 'inService',
449                         'supporting-circuit-pack-name': 'CP1-CFP0',
450                         'supporting-interface': 'XPDR1-NETWORK1-OTU',
451                         'type': 'org-openroadm-interfaces:otnOdu',
452                         'supporting-port': 'CP1-CFP0-P1'}
453         # SAPI/DAPI are added in the Otu4 renderer
454         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
455                         'rate': 'org-openroadm-otn-common-types:ODU4',
456                         'expected-dapi': 'Swfw02qXGyI=',
457                         'expected-sapi': 'fuYZwEO660g=',
458                         'tx-dapi': 'fuYZwEO660g=',
459                         'tx-sapi': 'Swfw02qXGyI='}
460
461         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
462                              res['interface'][0])
463         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
464                                   **input_dict_2),
465                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
466                              )
467         self.assertDictEqual(
468             {u'payload-type': u'21', u'exp-payload-type': u'21'},
469             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
470
471     def test_26_check_interface_ODU4_spdrc(self):
472         response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
473         self.assertEqual(response.status_code, requests.codes.ok)
474         res = response.json()
475         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
476                         'administrative-state': 'inService',
477                         'supporting-circuit-pack-name': 'CP1-CFP0',
478                         'supporting-interface': 'XPDR1-NETWORK1-OTU',
479                         'type': 'org-openroadm-interfaces:otnOdu',
480                         'supporting-port': 'CP1-CFP0-P1'}
481         # SAPI/DAPI are added in the Otu4 renderer
482         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
483                         'rate': 'org-openroadm-otn-common-types:ODU4',
484                         'tx-sapi': 'fuYZwEO660g=',
485                         'tx-dapi': 'Swfw02qXGyI=',
486                         'expected-sapi': 'Swfw02qXGyI=',
487                         'expected-dapi': 'fuYZwEO660g='
488                         }
489         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
490                              res['interface'][0])
491         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
492                                   **input_dict_2),
493                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
494                              )
495         self.assertDictEqual(
496             {u'payload-type': u'21', u'exp-payload-type': u'21'},
497             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
498
499     def test_27_check_otn_topo_links(self):
500         response = test_utils.get_otn_topo_request()
501         self.assertEqual(response.status_code, requests.codes.ok)
502         res = response.json()
503         nb_links = len(res['network'][0]['ietf-network-topology:link'])
504         self.assertEqual(nb_links, 4)
505         for link in res['network'][0]['ietf-network-topology:link']:
506             linkId = link['link-id']
507             if (linkId in ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
508                            'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
509                 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
510                 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
511             elif (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
512                              'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
513                 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
514                 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
515                 self.assertEqual(link['transportpce-topology:otn-link-type'], 'ODTU4')
516                 self.assertEqual(link['org-openroadm-common-network:link-type'], 'OTN-LINK')
517                 self.assertIn(link['org-openroadm-common-network:opposite-link'],
518                               ['ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
519                                'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
520             else:
521                 self.fail("this link should not exist")
522
523     def test_28_check_otn_topo_tp(self):
524         response = test_utils.get_otn_topo_request()
525         res = response.json()
526         for node in res['network'][0]['node']:
527             if node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1':
528                 tpList = node['ietf-network-topology:termination-point']
529                 for tp in tpList:
530                     if tp['tp-id'] == 'XPDR1-NETWORK1':
531                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
532                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
533                         self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
534                         self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
535                                          'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
536
537 # test service-create for 10GE service from spdr to spdr
538     def test_29_create_10GE_service(self):
539         self.cr_serv_sample_data["input"]["service-name"] = "service1-10GE"
540         self.cr_serv_sample_data["input"]["connection-type"] = "service"
541         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "10"
542         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
543         del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
544         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
545         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
546         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "10"
547         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
548         del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
549         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
550         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
551         response = test_utils.service_create_request(self.cr_serv_sample_data)
552         self.assertEqual(response.status_code, requests.codes.ok)
553         res = response.json()
554         self.assertIn('PCE calculation in progress',
555                       res['output']['configuration-response-common']['response-message'])
556         time.sleep(self.WAITING)
557
558     def test_30_get_10GE_service1(self):
559         response = test_utils.get_service_list_request("services/service1-10GE")
560         self.assertEqual(response.status_code, requests.codes.ok)
561         res = response.json()
562         self.assertEqual(
563             res['services'][0]['administrative-state'], 'inService')
564         self.assertEqual(
565             res['services'][0]['service-name'], 'service1-10GE')
566         self.assertEqual(
567             res['services'][0]['connection-type'], 'service')
568         self.assertEqual(
569             res['services'][0]['lifecycle-state'], 'planned')
570         time.sleep(2)
571
572     def test_31_check_interface_10GE_CLIENT_spdra(self):
573         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
574         self.assertEqual(response.status_code, requests.codes.ok)
575         res = response.json()
576         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
577                       'administrative-state': 'inService',
578                       'supporting-circuit-pack-name': 'CP1-SFP4',
579                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
580                       'supporting-port': 'CP1-SFP4-P1'
581                       }
582         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
583                              res['interface'][0])
584         self.assertDictEqual(
585             {u'speed': 10000},
586             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
587
588     def test_32_check_interface_ODU2E_CLIENT_spdra(self):
589         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
590         self.assertEqual(response.status_code, requests.codes.ok)
591         res = response.json()
592         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
593                         'administrative-state': 'inService',
594                         'supporting-circuit-pack-name': 'CP1-SFP4',
595                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
596                         'type': 'org-openroadm-interfaces:otnOdu',
597                         'supporting-port': 'CP1-SFP4-P1'}
598         input_dict_2 = {
599             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
600             'rate': 'org-openroadm-otn-common-types:ODU2e',
601             'monitoring-mode': 'terminated'}
602
603         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
604                              res['interface'][0])
605         self.assertDictEqual(dict(input_dict_2,
606                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
607                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
608         self.assertDictEqual(
609             {u'payload-type': u'03', u'exp-payload-type': u'03'},
610             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
611
612     def test_33_check_interface_ODU2E_NETWORK_spdra(self):
613         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
614         self.assertEqual(response.status_code, requests.codes.ok)
615         res = response.json()
616         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
617                         'administrative-state': 'inService',
618                         'supporting-circuit-pack-name': 'CP1-CFP0',
619                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
620                         'type': 'org-openroadm-interfaces:otnOdu',
621                         'supporting-port': 'CP1-CFP0-P1'}
622         input_dict_2 = {
623             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
624             'rate': 'org-openroadm-otn-common-types:ODU2e',
625             'monitoring-mode': 'monitored'}
626         input_dict_3 = {'trib-port-number': 1}
627
628         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
629                              res['interface'][0])
630         self.assertDictEqual(dict(input_dict_2,
631                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
632                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
633         self.assertDictEqual(dict(input_dict_3,
634                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
635                                       'parent-odu-allocation']),
636                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
637         self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
638                       ['trib-slots'])
639
640     def test_34_check_ODU2E_connection_spdra(self):
641         response = test_utils.check_netconf_node_request(
642             "SPDR-SA1",
643             "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
644         self.assertEqual(response.status_code, requests.codes.ok)
645         res = response.json()
646         input_dict_1 = {
647             'connection-name':
648             'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
649             'direction': 'bidirectional'
650         }
651
652         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
653                              res['odu-connection'][0])
654         self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1-10GE'},
655                              res['odu-connection'][0]['destination'])
656         self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1-10GE'},
657                              res['odu-connection'][0]['source'])
658
659     def test_35_check_interface_10GE_CLIENT_spdrc(self):
660         response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-CLIENT1-ETHERNET10G")
661         self.assertEqual(response.status_code, requests.codes.ok)
662         res = response.json()
663         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
664                       'administrative-state': 'inService',
665                       'supporting-circuit-pack-name': 'CP1-SFP4',
666                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
667                       'supporting-port': 'CP1-SFP4-P1'
668                       }
669         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
670                              res['interface'][0])
671         self.assertDictEqual(
672             {u'speed': 10000},
673             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
674
675     def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
676         response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
677         self.assertEqual(response.status_code, requests.codes.ok)
678         res = response.json()
679         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
680                         'administrative-state': 'inService',
681                         'supporting-circuit-pack-name': 'CP1-SFP4',
682                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
683                         'type': 'org-openroadm-interfaces:otnOdu',
684                         'supporting-port': 'CP1-SFP4-P1'}
685         input_dict_2 = {
686             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
687             'rate': 'org-openroadm-otn-common-types:ODU2e',
688             'monitoring-mode': 'terminated'}
689
690         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
691                              res['interface'][0])
692         self.assertDictEqual(dict(input_dict_2,
693                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
694                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
695         self.assertDictEqual(
696             {u'payload-type': u'03', u'exp-payload-type': u'03'},
697             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
698
699     def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
700         response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
701         self.assertEqual(response.status_code, requests.codes.ok)
702         res = response.json()
703         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
704                         'administrative-state': 'inService',
705                         'supporting-circuit-pack-name': 'CP1-CFP0',
706                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
707                         'type': 'org-openroadm-interfaces:otnOdu',
708                         'supporting-port': 'CP1-CFP0-P1'}
709         input_dict_2 = {
710             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
711             'rate': 'org-openroadm-otn-common-types:ODU2e',
712             'monitoring-mode': 'monitored'}
713
714         input_dict_3 = {'trib-port-number': 1}
715
716         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
717                              res['interface'][0])
718         self.assertDictEqual(dict(input_dict_2,
719                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
720                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
721         self.assertDictEqual(dict(input_dict_3,
722                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
723                                       'parent-odu-allocation']),
724                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
725             'parent-odu-allocation'])
726         self.assertIn(1,
727                       res['interface'][0][
728                           'org-openroadm-otn-odu-interfaces:odu'][
729                           'parent-odu-allocation']['trib-slots'])
730
731     def test_38_check_ODU2E_connection_spdrc(self):
732         response = test_utils.check_netconf_node_request(
733             "SPDR-SC1",
734             "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
735         self.assertEqual(response.status_code, requests.codes.ok)
736         res = response.json()
737         input_dict_1 = {
738             'connection-name':
739             'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
740             'direction': 'bidirectional'
741         }
742
743         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
744                              res['odu-connection'][0])
745         self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1-10GE'},
746                              res['odu-connection'][0]['destination'])
747         self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1-10GE'},
748                              res['odu-connection'][0]['source'])
749
750     def test_39_check_otn_topo_links(self):
751         response = test_utils.get_otn_topo_request()
752         self.assertEqual(response.status_code, requests.codes.ok)
753         res = response.json()
754         nb_links = len(res['network'][0]['ietf-network-topology:link'])
755         self.assertEqual(nb_links, 4)
756         for link in res['network'][0]['ietf-network-topology:link']:
757             linkId = link['link-id']
758             if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
759                            'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
760                 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
761                 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
762
763     def test_40_check_otn_topo_tp(self):
764         response = test_utils.get_otn_topo_request()
765         res = response.json()
766         for node in res['network'][0]['node']:
767             if node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1':
768                 tpList = node['ietf-network-topology:termination-point']
769                 for tp in tpList:
770                     if tp['tp-id'] == 'XPDR1-NETWORK1':
771                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
772                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
773                         tsPoolList = list(range(1, 9))
774                         self.assertNotIn(tsPoolList, xpdrTpPortConAt['ts-pool'])
775                         self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
776                         self.assertNotIn(1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
777
778     def test_41_delete_10GE_service(self):
779         response = test_utils.service_delete_request("service1-10GE")
780         self.assertEqual(response.status_code, requests.codes.ok)
781         res = response.json()
782         self.assertIn('Renderer service delete in progress',
783                       res['output']['configuration-response-common']['response-message'])
784         time.sleep(20)
785
786     def test_42_check_service_list(self):
787         response = test_utils.get_service_list_request("")
788         self.assertEqual(response.status_code, requests.codes.ok)
789         res = response.json()
790         self.assertEqual(len(res['service-list']['services']), 2)
791         time.sleep(2)
792
793     def test_43_check_no_ODU2e_connection_spdra(self):
794         response = test_utils.check_netconf_node_request("SPDR-SA1", "")
795         self.assertEqual(response.status_code, requests.codes.ok)
796         res = response.json()
797         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
798         time.sleep(1)
799
800     def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
801         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
802         self.assertEqual(response.status_code, requests.codes.conflict)
803
804     def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
805         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
806         self.assertEqual(response.status_code, requests.codes.conflict)
807
808     def test_46_check_no_interface_10GE_CLIENT_spdra(self):
809         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
810         self.assertEqual(response.status_code, requests.codes.conflict)
811
812     def test_47_check_otn_topo_links(self):
813         response = test_utils.get_otn_topo_request()
814         self.assertEqual(response.status_code, requests.codes.ok)
815         res = response.json()
816         nb_links = len(res['network'][0]['ietf-network-topology:link'])
817         self.assertEqual(nb_links, 4)
818         for link in res['network'][0]['ietf-network-topology:link']:
819             linkId = link['link-id']
820             if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
821                            'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
822                 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
823                 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
824
825     def test_48_check_otn_topo_tp(self):
826         response = test_utils.get_otn_topo_request()
827         res = response.json()
828         for node in res['network'][0]['node']:
829             if (node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1'):
830                 tpList = node['ietf-network-topology:termination-point']
831                 for tp in tpList:
832                     if tp['tp-id'] == 'XPDR1-NETWORK1':
833                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
834                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
835                         self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
836
837     def test_49_delete_ODU4_service(self):
838         response = test_utils.service_delete_request("service1-ODU4")
839         self.assertEqual(response.status_code, requests.codes.ok)
840         res = response.json()
841         self.assertIn('Renderer service delete in progress',
842                       res['output']['configuration-response-common']['response-message'])
843         time.sleep(20)
844
845     def test_50_check_service_list(self):
846         response = test_utils.get_service_list_request("")
847         self.assertEqual(response.status_code, requests.codes.ok)
848         res = response.json()
849         self.assertEqual(len(res['service-list']['services']), 1)
850         time.sleep(2)
851
852     def test_51_check_no_interface_ODU4_spdra(self):
853         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
854         self.assertEqual(response.status_code, requests.codes.conflict)
855
    def test_52_check_otn_topo_links(self):
        # Re-run the assertions of test_22_check_otn_topo_otu4_links, another test
        # method of this class, at this point of the sequence.
        self.test_22_check_otn_topo_otu4_links()
858
859     def test_53_check_otn_topo_tp(self):
860         response = test_utils.get_otn_topo_request()
861         res = response.json()
862         for node in res['network'][0]['node']:
863             if node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1':
864                 tpList = node['ietf-network-topology:termination-point']
865                 for tp in tpList:
866                     if tp['tp-id'] == 'XPDR1-NETWORK1':
867                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
868                         self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
869                         self.assertNotIn('odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
870
871     def test_54_delete_OCH_OTU4_service(self):
872         response = test_utils.service_delete_request("service1-OCH-OTU4")
873         self.assertEqual(response.status_code, requests.codes.ok)
874         res = response.json()
875         self.assertIn('Renderer service delete in progress',
876                       res['output']['configuration-response-common']['response-message'])
877         time.sleep(20)
878
879     def test_55_get_no_service(self):
880         response = test_utils.get_service_list_request("")
881         self.assertEqual(response.status_code, requests.codes.conflict)
882         res = response.json()
883         self.assertIn(
884             {"error-type": "application", "error-tag": "data-missing",
885              "error-message": "Request could not be completed because the relevant data model content does not exist"},
886             res['errors']['error'])
887         time.sleep(1)
888
889     def test_56_check_no_interface_OTU4_spdra(self):
890         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
891         self.assertEqual(response.status_code, requests.codes.conflict)
892
893     def test_57_check_no_interface_OCH_spdra(self):
894         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
895         self.assertEqual(response.status_code, requests.codes.conflict)
896
897     def test_58_getLinks_OtnTopology(self):
898         response = test_utils.get_otn_topo_request()
899         self.assertEqual(response.status_code, requests.codes.ok)
900         res = response.json()
901         self.assertNotIn('ietf-network-topology:link', res['network'][0])
902
903     def test_59_check_openroadm_topo_spdra(self):
904         response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
905         self.assertEqual(response.status_code, requests.codes.ok)
906         res = response.json()
907         tp = res['node'][0]['ietf-network-topology:termination-point'][0]
908         self.assertEqual('XPDR1-NETWORK1', tp['tp-id'])
909         self.assertNotIn('wavelength', dict.keys(
910             tp[u'org-openroadm-network-topology:xpdr-network-attributes']))
911         time.sleep(3)
912
913     def test_60_check_openroadm_topo_ROADMA_SRG(self):
914         response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
915         self.assertEqual(response.status_code, requests.codes.ok)
916         res = response.json()
917         freq_map = base64.b64decode(
918             res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
919         freq_map_array = [int(x) for x in freq_map]
920         self.assertEqual(freq_map_array[0], 255, "Index 1 should be available")
921         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
922         for ele in liste_tp:
923             if ele['tp-id'] == 'SRG1-PP1-TXRX':
924                 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
925         time.sleep(3)
926
927     def test_61_check_openroadm_topo_ROADMA_DEG(self):
928         response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
929         self.assertEqual(response.status_code, requests.codes.ok)
930         res = response.json()
931         print(res)
932         freq_map = base64.b64decode(
933             res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
934         freq_map_array = [int(x) for x in freq_map]
935         self.assertEqual(freq_map_array[0], 255, "Index 1 should be available")
936         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
937         for ele in liste_tp:
938             if ele['tp-id'] == 'DEG2-CTP-TXRX':
939                 self.assertEqual(ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'],
940                                  AVAILABLE_FREQ_MAP)
941             if ele['tp-id'] == 'DEG2-TTP-TXRX':
942                 self.assertNotIn('org-openroadm-network-topology:tx-ttp-attributes', dict.keys(ele))
943         time.sleep(3)
944
945     def test_62_disconnect_spdrA(self):
946         response = test_utils.unmount_device("SPDR-SA1")
947         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
948
949     def test_63_disconnect_spdrC(self):
950         response = test_utils.unmount_device("SPDR-SC1")
951         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
952
953     def test_64_disconnect_roadmA(self):
954         response = test_utils.unmount_device("ROADM-A1")
955         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
956
957     def test_65_disconnect_roadmC(self):
958         response = test_utils.unmount_device("ROADM-C1")
959         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
960
961
# Run the tests with verbose reporting when this file is executed as a script.
if __name__ == "__main__":
    unittest.main(verbosity=2)