ee0e0d1092ee0f042bcde90763fcf9c7e6eb7d93
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_otn_end2end.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14
15 import unittest
16 import time
17 import requests
18 from common import test_utils
19
20
21 class TransportPCEtesting(unittest.TestCase):
22
    # Set by setUpClass from the test_utils start helpers and iterated by
    # tearDownClass to shut every process down.
    processes = None
    # Seconds slept after each service-create request (test_12, test_23,
    # test_29) before the following tests inspect the outcome.
    WAITING = 20  # nominal value is 300

    # service-create request payload, initially describing the
    # "service1-OCH-OTU4" infrastructure service between SPDR-SA1 and SPDR-SC1.
    # This is one class-level dict shared by all tests: test_23 and test_29
    # mutate it in place to derive the ODU4 and 10GE requests, so those tests
    # only work when run after one another in numeric order.
    cr_serv_sample_data = {"input": {
        "sdnc-request-header": {
            "request-id": "request-1",
            "rpc-action": "service-create",
            "request-system-id": "appname"
        },
        "service-name": "service1-OCH-OTU4",
        "common-id": "commonId",
        "connection-type": "infrastructure",
        # A end of the service: node SPDR-SA1
        "service-a-end": {
            "service-rate": "100",
            "node-id": "SPDR-SA1",
            "service-format": "OTU",
            "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
            "clli": "NodeSA",
            "subrate-eth-sla": {
                    "subrate-eth-sla": {
                        "committed-info-rate": "100000",
                        "committed-burst-size": "64"
                    }
            },
            "tx-direction": {
                "port": {
                    "port-device-name": "SPDR-SA1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "rx-direction": {
                "port": {
                    "port-device-name": "SPDR-SA1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "optic-type": "gray"
        },
        # Z end of the service: node SPDR-SC1
        "service-z-end": {
            "service-rate": "100",
            "node-id": "SPDR-SC1",
            "service-format": "OTU",
            "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
            "clli": "NodeSC",
            "subrate-eth-sla": {
                    "subrate-eth-sla": {
                        "committed-info-rate": "100000",
                        "committed-burst-size": "64"
                    }
            },
            "tx-direction": {
                "port": {
                    "port-device-name": "SPDR-SC1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "rx-direction": {
                "port": {
                    "port-device-name": "SPDR-SC1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "optic-type": "gray"
        },
        "due-date": "2018-06-15T00:00:01Z",
        "operator-contact": "pw1234"
    }
    }
127
128     @classmethod
129     def setUpClass(cls):
130         cls.processes = test_utils.start_tpce()
131         cls.processes = test_utils.start_sims(['spdra', 'roadma', 'roadmc', 'spdrc'])
132
133     @classmethod
134     def tearDownClass(cls):
135         for process in cls.processes:
136             test_utils.shutdown_process(process)
137         print("all processes killed")
138
139     def setUp(self):
140         time.sleep(5)
141
142     def test_01_connect_spdrA(self):
143         response = test_utils.mount_device("SPDR-SA1", 'spdra')
144         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
145
146     def test_02_connect_spdrC(self):
147         response = test_utils.mount_device("SPDR-SC1", 'spdrc')
148         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
149
150     def test_03_connect_rdmA(self):
151         response = test_utils.mount_device("ROADM-A1", 'roadma')
152         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
153
154     def test_04_connect_rdmC(self):
155         response = test_utils.mount_device("ROADM-C1", 'roadmc')
156         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
157
158     def test_05_connect_sprdA_N1_to_roadmA_PP1(self):
159         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "1", "1",
160                                                           "ROADM-A1", "1", "SRG1-PP1-TXRX")
161         self.assertEqual(response.status_code, requests.codes.ok)
162         res = response.json()
163         self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
164         time.sleep(2)
165
166     def test_06_connect_roadmA_PP1_to_spdrA_N1(self):
167         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "1", "1",
168                                                           "ROADM-A1", "1", "SRG1-PP1-TXRX")
169         self.assertEqual(response.status_code, requests.codes.ok)
170         res = response.json()
171         self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
172         time.sleep(2)
173
174     def test_07_connect_sprdC_N1_to_roadmC_PP1(self):
175         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "1", "1",
176                                                           "ROADM-C1", "1", "SRG1-PP1-TXRX")
177         self.assertEqual(response.status_code, requests.codes.ok)
178         res = response.json()
179         self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
180         time.sleep(2)
181
182     def test_08_connect_roadmC_PP1_to_spdrC_N1(self):
183         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "1", "1",
184                                                           "ROADM-C1", "1", "SRG1-PP1-TXRX")
185         self.assertEqual(response.status_code, requests.codes.ok)
186         res = response.json()
187         self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
188         time.sleep(2)
189
190     def test_09_add_omsAttributes_ROADMA_ROADMC(self):
191         # Config ROADMA-ROADMC oms-attributes
192         data = {"span": {
193             "auto-spanloss": "true",
194             "spanloss-base": 11.4,
195             "spanloss-current": 12,
196             "engineered-spanloss": 12.2,
197             "link-concatenation": [{
198                 "SRLG-Id": 0,
199                 "fiber-type": "smf",
200                 "SRLG-length": 100000,
201                 "pmd": 0.5}]}}
202         response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
203         self.assertEqual(response.status_code, requests.codes.created)
204
205     def test_10_add_omsAttributes_ROADMC_ROADMA(self):
206         # Config ROADMC-ROADMA oms-attributes
207         data = {"span": {
208             "auto-spanloss": "true",
209             "spanloss-base": 11.4,
210             "spanloss-current": 12,
211             "engineered-spanloss": 12.2,
212             "link-concatenation": [{
213                 "SRLG-Id": 0,
214                 "fiber-type": "smf",
215                 "SRLG-length": 100000,
216                 "pmd": 0.5}]}}
217         response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
218         self.assertEqual(response.status_code, requests.codes.created)
219
220 # test service-create for OCH-OTU4 service from spdr to spdr
221     def test_11_check_otn_topology(self):
222         response = test_utils.get_otn_topo_request()
223         self.assertEqual(response.status_code, requests.codes.ok)
224         res = response.json()
225         nbNode = len(res['network'][0]['node'])
226         self.assertEqual(nbNode, 4)
227         self.assertNotIn('ietf-network-topology:link', res['network'][0])
228
229     def test_12_create_OCH_OTU4_service(self):
230         response = test_utils.service_create_request(self.cr_serv_sample_data)
231         self.assertEqual(response.status_code, requests.codes.ok)
232         res = response.json()
233         self.assertIn('PCE calculation in progress',
234                       res['output']['configuration-response-common']['response-message'])
235         time.sleep(self.WAITING)
236
237     def test_13_get_OCH_OTU4_service1(self):
238         response = test_utils.get_service_list_request("services/service1-OCH-OTU4")
239         self.assertEqual(response.status_code, requests.codes.ok)
240         res = response.json()
241         self.assertEqual(
242             res['services'][0]['administrative-state'], 'inService')
243         self.assertEqual(
244             res['services'][0]['service-name'], 'service1-OCH-OTU4')
245         self.assertEqual(
246             res['services'][0]['connection-type'], 'infrastructure')
247         self.assertEqual(
248             res['services'][0]['lifecycle-state'], 'planned')
249         time.sleep(2)
250
251     # Check correct configuration of devices
252     def test_14_check_interface_och_spdra(self):
253         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
254         self.assertEqual(response.status_code, requests.codes.ok)
255         res = response.json()
256         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-1',
257                                    'administrative-state': 'inService',
258                                    'supporting-circuit-pack-name': 'CP1-CFP0',
259                                    'type': 'org-openroadm-interfaces:opticalChannel',
260                                    'supporting-port': 'CP1-CFP0-P1'
261                                    }, **res['interface'][0]),
262                              res['interface'][0])
263
264         self.assertDictEqual(
265             {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
266              u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
267             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
268
269     def test_15_check_interface_OTU4_spdra(self):
270         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
271         self.assertEqual(response.status_code, requests.codes.ok)
272         res = response.json()
273         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
274                         'administrative-state': 'inService',
275                         'supporting-circuit-pack-name': 'CP1-CFP0',
276                         'supporting-interface': 'XPDR1-NETWORK1-1',
277                         'type': 'org-openroadm-interfaces:otnOtu',
278                         'supporting-port': 'CP1-CFP0-P1'
279                         }
280         input_dict_2 = {'tx-sapi': 'Swfw02qXGyI=',
281                         'expected-dapi': 'Swfw02qXGyI=',
282                         'rate': 'org-openroadm-otn-common-types:OTU4',
283                         'fec': 'scfec'
284                         }
285         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
286                              res['interface'][0])
287
288         self.assertDictEqual(input_dict_2,
289                              res['interface'][0]
290                              ['org-openroadm-otn-otu-interfaces:otu'])
291
292     def test_16_check_interface_och_spdrc(self):
293         response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-1")
294         self.assertEqual(response.status_code, requests.codes.ok)
295         res = response.json()
296         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-1',
297                                    'administrative-state': 'inService',
298                                    'supporting-circuit-pack-name': 'CP1-CFP0',
299                                    'type': 'org-openroadm-interfaces:opticalChannel',
300                                    'supporting-port': 'CP1-CFP0-P1'
301                                    }, **res['interface'][0]),
302                              res['interface'][0])
303
304         self.assertDictEqual(
305             {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
306              u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
307             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
308
309     def test_17_check_interface_OTU4_spdrc(self):
310         response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-OTU")
311         self.assertEqual(response.status_code, requests.codes.ok)
312         res = response.json()
313         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
314                         'administrative-state': 'inService',
315                         'supporting-circuit-pack-name': 'CP1-CFP0',
316                         'supporting-interface': 'XPDR1-NETWORK1-1',
317                         'type': 'org-openroadm-interfaces:otnOtu',
318                         'supporting-port': 'CP1-CFP0-P1'
319                         }
320         input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
321                         'expected-sapi': 'Swfw02qXGyI=',
322                         'tx-sapi': 'fuYZwEO660g=',
323                         'expected-dapi': 'fuYZwEO660g=',
324                         'rate': 'org-openroadm-otn-common-types:OTU4',
325                         'fec': 'scfec'
326                         }
327
328         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
329                              res['interface'][0])
330
331         self.assertDictEqual(input_dict_2,
332                              res['interface'][0]
333                              ['org-openroadm-otn-otu-interfaces:otu'])
334
335     def test_18_check_no_interface_ODU4_spdra(self):
336         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
337         self.assertEqual(response.status_code, requests.codes.conflict)
338         res = response.json()
339         self.assertIn(
340             {"error-type": "application", "error-tag": "data-missing",
341              "error-message": "Request could not be completed because the relevant data model content does not exist"},
342             res['errors']['error'])
343
344     def test_19_check_openroadm_topo_spdra(self):
345         response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
346         self.assertEqual(response.status_code, requests.codes.ok)
347         res = response.json()
348         ele = res['node'][0]['ietf-network-topology:termination-point'][0]
349         self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
350         self.assertEqual({u'frequency': 196.1,
351                           u'width': 40},
352                          ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
353         time.sleep(3)
354
355     def test_20_check_openroadm_topo_ROADMA_SRG(self):
356         response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
357         self.assertEqual(response.status_code, requests.codes.ok)
358         res = response.json()
359         self.assertNotIn({u'index': 1},
360                          res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
361         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
362         for ele in liste_tp:
363             if ele['tp-id'] == 'SRG1-PP1-TXRX':
364                 self.assertIn({u'index': 1, u'frequency': 196.1,
365                                u'width': 40},
366                               ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
367             if ele['tp-id'] == 'SRG1-PP2-TXRX':
368                 self.assertNotIn('used-wavelength', dict.keys(ele))
369         time.sleep(3)
370
371     def test_21_check_openroadm_topo_ROADMA_DEG(self):
372         response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
373         self.assertEqual(response.status_code, requests.codes.ok)
374         res = response.json()
375         self.assertNotIn({u'index': 1},
376                          res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
377         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
378         for ele in liste_tp:
379             if ele['tp-id'] == 'DEG2-CTP-TXRX':
380                 self.assertIn({u'index': 1, u'frequency': 196.1,
381                                u'width': 40},
382                               ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
383             if ele['tp-id'] == 'DEG2-TTP-TXRX':
384                 self.assertIn({u'index': 1, u'frequency': 196.1,
385                                u'width': 40},
386                               ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
387         time.sleep(3)
388
389     def test_22_check_otn_topo_otu4_links(self):
390         response = test_utils.get_otn_topo_request()
391         self.assertEqual(response.status_code, requests.codes.ok)
392         res = response.json()
393         nb_links = len(res['network'][0]['ietf-network-topology:link'])
394         self.assertEqual(nb_links, 2)
395         listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
396                       'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
397         for link in res['network'][0]['ietf-network-topology:link']:
398             self.assertIn(link['link-id'], listLinkId)
399             self.assertEqual(link['transportpce-topology:otn-link-type'], 'OTU4')
400             self.assertEqual(link['org-openroadm-common-network:link-type'], 'OTN-LINK')
401             self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
402             self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
403             self.assertIn(link['org-openroadm-common-network:opposite-link'], listLinkId)
404
405 # test service-create for ODU4 service from spdr to spdr
406     def test_23_create_ODU4_service(self):
407         self.cr_serv_sample_data["input"]["service-name"] = "service1-ODU4"
408         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
409         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
410         self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
411         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
412         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
413         self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
414
415         response = test_utils.service_create_request(self.cr_serv_sample_data)
416         self.assertEqual(response.status_code, requests.codes.ok)
417         res = response.json()
418         self.assertIn('PCE calculation in progress',
419                       res['output']['configuration-response-common']['response-message'])
420         time.sleep(self.WAITING)
421
422     def test_24_get_ODU4_service1(self):
423         response = test_utils.get_service_list_request("services/service1-ODU4")
424         self.assertEqual(response.status_code, requests.codes.ok)
425         res = response.json()
426         self.assertEqual(
427             res['services'][0]['administrative-state'], 'inService')
428         self.assertEqual(
429             res['services'][0]['service-name'], 'service1-ODU4')
430         self.assertEqual(
431             res['services'][0]['connection-type'], 'infrastructure')
432         self.assertEqual(
433             res['services'][0]['lifecycle-state'], 'planned')
434         time.sleep(2)
435
436     def test_25_check_interface_ODU4_spdra(self):
437         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
438         self.assertEqual(response.status_code, requests.codes.ok)
439         res = response.json()
440         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
441                         'administrative-state': 'inService',
442                         'supporting-circuit-pack-name': 'CP1-CFP0',
443                         'supporting-interface': 'XPDR1-NETWORK1-OTU',
444                         'type': 'org-openroadm-interfaces:otnOdu',
445                         'supporting-port': 'CP1-CFP0-P1'}
446         # SAPI/DAPI are added in the Otu4 renderer
447         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
448                         'rate': 'org-openroadm-otn-common-types:ODU4',
449                         'expected-dapi': 'Swfw02qXGyI=',
450                         'expected-sapi': 'fuYZwEO660g=',
451                         'tx-dapi': 'fuYZwEO660g=',
452                         'tx-sapi': 'Swfw02qXGyI='}
453
454         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
455                              res['interface'][0])
456         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
457                                   **input_dict_2),
458                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
459                              )
460         self.assertDictEqual(
461             {u'payload-type': u'21', u'exp-payload-type': u'21'},
462             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
463
464     def test_26_check_interface_ODU4_spdrc(self):
465         response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
466         self.assertEqual(response.status_code, requests.codes.ok)
467         res = response.json()
468         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
469                         'administrative-state': 'inService',
470                         'supporting-circuit-pack-name': 'CP1-CFP0',
471                         'supporting-interface': 'XPDR1-NETWORK1-OTU',
472                         'type': 'org-openroadm-interfaces:otnOdu',
473                         'supporting-port': 'CP1-CFP0-P1'}
474         # SAPI/DAPI are added in the Otu4 renderer
475         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
476                         'rate': 'org-openroadm-otn-common-types:ODU4',
477                         'tx-sapi': 'fuYZwEO660g=',
478                         'tx-dapi': 'Swfw02qXGyI=',
479                         'expected-sapi': 'Swfw02qXGyI=',
480                         'expected-dapi': 'fuYZwEO660g='
481                         }
482         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
483                              res['interface'][0])
484         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
485                                   **input_dict_2),
486                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
487                              )
488         self.assertDictEqual(
489             {u'payload-type': u'21', u'exp-payload-type': u'21'},
490             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
491
492     def test_27_check_otn_topo_links(self):
493         response = test_utils.get_otn_topo_request()
494         self.assertEqual(response.status_code, requests.codes.ok)
495         res = response.json()
496         nb_links = len(res['network'][0]['ietf-network-topology:link'])
497         self.assertEqual(nb_links, 4)
498         for link in res['network'][0]['ietf-network-topology:link']:
499             linkId = link['link-id']
500             if (linkId == 'OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1' or
501                     linkId == 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'):
502                 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
503                 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
504             elif (linkId == 'ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1' or
505                   linkId == 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'):
506                 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
507                 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
508                 self.assertEqual(link['transportpce-topology:otn-link-type'], 'ODTU4')
509                 self.assertEqual(link['org-openroadm-common-network:link-type'], 'OTN-LINK')
510                 self.assertIn(link['org-openroadm-common-network:opposite-link'],
511                               ['ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
512                                'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
513             else:
514                 self.fail("this link should not exist")
515
516     def test_28_check_otn_topo_tp(self):
517         response = test_utils.get_otn_topo_request()
518         res = response.json()
519         for node in res['network'][0]['node']:
520             if node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1':
521                 tpList = node['ietf-network-topology:termination-point']
522                 for tp in tpList:
523                     if tp['tp-id'] == 'XPDR1-NETWORK1':
524                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
525                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
526                         self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
527                         self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
528                                          'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
529
530 # test service-create for 10GE service from spdr to spdr
531     def test_29_create_10GE_service(self):
532         self.cr_serv_sample_data["input"]["service-name"] = "service1-10GE"
533         self.cr_serv_sample_data["input"]["connection-type"] = "service"
534         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "10"
535         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
536         del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
537         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
538         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
539         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "10"
540         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
541         del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
542         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
543         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
544         response = test_utils.service_create_request(self.cr_serv_sample_data)
545         self.assertEqual(response.status_code, requests.codes.ok)
546         res = response.json()
547         self.assertIn('PCE calculation in progress',
548                       res['output']['configuration-response-common']['response-message'])
549         time.sleep(self.WAITING)
550
551     def test_30_get_10GE_service1(self):
552         response = test_utils.get_service_list_request("services/service1-10GE")
553         self.assertEqual(response.status_code, requests.codes.ok)
554         res = response.json()
555         self.assertEqual(
556             res['services'][0]['administrative-state'], 'inService')
557         self.assertEqual(
558             res['services'][0]['service-name'], 'service1-10GE')
559         self.assertEqual(
560             res['services'][0]['connection-type'], 'service')
561         self.assertEqual(
562             res['services'][0]['lifecycle-state'], 'planned')
563         time.sleep(2)
564
565     def test_31_check_interface_10GE_CLIENT_spdra(self):
566         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
567         self.assertEqual(response.status_code, requests.codes.ok)
568         res = response.json()
569         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
570                       'administrative-state': 'inService',
571                       'supporting-circuit-pack-name': 'CP1-SFP4',
572                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
573                       'supporting-port': 'CP1-SFP4-P1'
574                       }
575         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
576                              res['interface'][0])
577         self.assertDictEqual(
578             {u'speed': 10000},
579             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
580
581     def test_32_check_interface_ODU2E_CLIENT_spdra(self):
582         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
583         self.assertEqual(response.status_code, requests.codes.ok)
584         res = response.json()
585         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
586                         'administrative-state': 'inService',
587                         'supporting-circuit-pack-name': 'CP1-SFP4',
588                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
589                         'type': 'org-openroadm-interfaces:otnOdu',
590                         'supporting-port': 'CP1-SFP4-P1'}
591         input_dict_2 = {
592             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
593             'rate': 'org-openroadm-otn-common-types:ODU2e',
594             'monitoring-mode': 'terminated'}
595
596         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
597                              res['interface'][0])
598         self.assertDictEqual(dict(input_dict_2,
599                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
600                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
601         self.assertDictEqual(
602             {u'payload-type': u'03', u'exp-payload-type': u'03'},
603             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
604
605     def test_33_check_interface_ODU2E_NETWORK_spdra(self):
606         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
607         self.assertEqual(response.status_code, requests.codes.ok)
608         res = response.json()
609         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
610                         'administrative-state': 'inService',
611                         'supporting-circuit-pack-name': 'CP1-CFP0',
612                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
613                         'type': 'org-openroadm-interfaces:otnOdu',
614                         'supporting-port': 'CP1-CFP0-P1'}
615         input_dict_2 = {
616             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
617             'rate': 'org-openroadm-otn-common-types:ODU2e',
618             'monitoring-mode': 'monitored'}
619         input_dict_3 = {'trib-port-number': 1}
620
621         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
622                              res['interface'][0])
623         self.assertDictEqual(dict(input_dict_2,
624                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
625                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
626         self.assertDictEqual(dict(input_dict_3,
627                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
628                                       'parent-odu-allocation']),
629                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
630         self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
631                       ['trib-slots'])
632
633     def test_34_check_ODU2E_connection_spdra(self):
634         response = test_utils.check_netconf_node_request(
635             "SPDR-SA1",
636             "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
637         self.assertEqual(response.status_code, requests.codes.ok)
638         res = response.json()
639         input_dict_1 = {
640             'connection-name':
641             'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
642             'direction': 'bidirectional'
643         }
644
645         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
646                              res['odu-connection'][0])
647         self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1-10GE'},
648                              res['odu-connection'][0]['destination'])
649         self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1-10GE'},
650                              res['odu-connection'][0]['source'])
651
652     def test_35_check_interface_10GE_CLIENT_spdrc(self):
653         response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-CLIENT1-ETHERNET10G")
654         self.assertEqual(response.status_code, requests.codes.ok)
655         res = response.json()
656         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
657                       'administrative-state': 'inService',
658                       'supporting-circuit-pack-name': 'CP1-SFP4',
659                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
660                       'supporting-port': 'CP1-SFP4-P1'
661                       }
662         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
663                              res['interface'][0])
664         self.assertDictEqual(
665             {u'speed': 10000},
666             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
667
668     def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
669         response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
670         self.assertEqual(response.status_code, requests.codes.ok)
671         res = response.json()
672         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
673                         'administrative-state': 'inService',
674                         'supporting-circuit-pack-name': 'CP1-SFP4',
675                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
676                         'type': 'org-openroadm-interfaces:otnOdu',
677                         'supporting-port': 'CP1-SFP4-P1'}
678         input_dict_2 = {
679             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
680             'rate': 'org-openroadm-otn-common-types:ODU2e',
681             'monitoring-mode': 'terminated'}
682
683         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
684                              res['interface'][0])
685         self.assertDictEqual(dict(input_dict_2,
686                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
687                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
688         self.assertDictEqual(
689             {u'payload-type': u'03', u'exp-payload-type': u'03'},
690             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
691
692     def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
693         response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
694         self.assertEqual(response.status_code, requests.codes.ok)
695         res = response.json()
696         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
697                         'administrative-state': 'inService',
698                         'supporting-circuit-pack-name': 'CP1-CFP0',
699                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
700                         'type': 'org-openroadm-interfaces:otnOdu',
701                         'supporting-port': 'CP1-CFP0-P1'}
702         input_dict_2 = {
703             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
704             'rate': 'org-openroadm-otn-common-types:ODU2e',
705             'monitoring-mode': 'monitored'}
706
707         input_dict_3 = {'trib-port-number': 1}
708
709         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
710                              res['interface'][0])
711         self.assertDictEqual(dict(input_dict_2,
712                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
713                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
714         self.assertDictEqual(dict(input_dict_3,
715                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
716                                       'parent-odu-allocation']),
717                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
718             'parent-odu-allocation'])
719         self.assertIn(1,
720                       res['interface'][0][
721                           'org-openroadm-otn-odu-interfaces:odu'][
722                           'parent-odu-allocation']['trib-slots'])
723
724     def test_38_check_ODU2E_connection_spdrc(self):
725         response = test_utils.check_netconf_node_request(
726             "SPDR-SC1",
727             "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
728         self.assertEqual(response.status_code, requests.codes.ok)
729         res = response.json()
730         input_dict_1 = {
731             'connection-name':
732             'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
733             'direction': 'bidirectional'
734         }
735
736         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
737                              res['odu-connection'][0])
738         self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1-10GE'},
739                              res['odu-connection'][0]['destination'])
740         self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1-10GE'},
741                              res['odu-connection'][0]['source'])
742
743     def test_39_check_otn_topo_links(self):
744         response = test_utils.get_otn_topo_request()
745         self.assertEqual(response.status_code, requests.codes.ok)
746         res = response.json()
747         nb_links = len(res['network'][0]['ietf-network-topology:link'])
748         self.assertEqual(nb_links, 4)
749         for link in res['network'][0]['ietf-network-topology:link']:
750             linkId = link['link-id']
751             if (linkId == 'ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1' or
752                     linkId == 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'):
753                 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
754                 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
755
756     def test_40_check_otn_topo_tp(self):
757         response = test_utils.get_otn_topo_request()
758         res = response.json()
759         for node in res['network'][0]['node']:
760             if node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1':
761                 tpList = node['ietf-network-topology:termination-point']
762                 for tp in tpList:
763                     if tp['tp-id'] == 'XPDR1-NETWORK1':
764                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
765                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
766                         tsPoolList = [i for i in range(1, 9)]
767                         self.assertNotIn(tsPoolList, xpdrTpPortConAt['ts-pool'])
768                         self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
769                         self.assertNotIn(1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
770
771     def test_41_delete_10GE_service(self):
772         response = test_utils.service_delete_request("service1-10GE")
773         self.assertEqual(response.status_code, requests.codes.ok)
774         res = response.json()
775         self.assertIn('Renderer service delete in progress',
776                       res['output']['configuration-response-common']['response-message'])
777         time.sleep(20)
778
779     def test_42_check_service_list(self):
780         response = test_utils.get_service_list_request("")
781         self.assertEqual(response.status_code, requests.codes.ok)
782         res = response.json()
783         self.assertEqual(len(res['service-list']['services']), 2)
784         time.sleep(2)
785
786     def test_43_check_no_ODU2e_connection_spdra(self):
787         response = test_utils.check_netconf_node_request("SPDR-SA1", "")
788         self.assertEqual(response.status_code, requests.codes.ok)
789         res = response.json()
790         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
791         time.sleep(1)
792
793     def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
794         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
795         self.assertEqual(response.status_code, requests.codes.conflict)
796
797     def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
798         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
799         self.assertEqual(response.status_code, requests.codes.conflict)
800
801     def test_46_check_no_interface_10GE_CLIENT_spdra(self):
802         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
803         self.assertEqual(response.status_code, requests.codes.conflict)
804
805     def test_47_check_otn_topo_links(self):
806         response = test_utils.get_otn_topo_request()
807         self.assertEqual(response.status_code, requests.codes.ok)
808         res = response.json()
809         nb_links = len(res['network'][0]['ietf-network-topology:link'])
810         self.assertEqual(nb_links, 4)
811         for link in res['network'][0]['ietf-network-topology:link']:
812             linkId = link['link-id']
813             if (linkId == 'ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1' or
814                     linkId == 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'):
815                 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
816                 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
817
818     def test_48_check_otn_topo_tp(self):
819         response = test_utils.get_otn_topo_request()
820         res = response.json()
821         for node in res['network'][0]['node']:
822             if (node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1'):
823                 tpList = node['ietf-network-topology:termination-point']
824                 for tp in tpList:
825                     if tp['tp-id'] == 'XPDR1-NETWORK1':
826                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
827                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
828                         self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
829
830     def test_49_delete_ODU4_service(self):
831         response = test_utils.service_delete_request("service1-ODU4")
832         self.assertEqual(response.status_code, requests.codes.ok)
833         res = response.json()
834         self.assertIn('Renderer service delete in progress',
835                       res['output']['configuration-response-common']['response-message'])
836         time.sleep(20)
837
838     def test_50_check_service_list(self):
839         response = test_utils.get_service_list_request("")
840         self.assertEqual(response.status_code, requests.codes.ok)
841         res = response.json()
842         self.assertEqual(len(res['service-list']['services']), 1)
843         time.sleep(2)
844
845     def test_51_check_no_interface_ODU4_spdra(self):
846         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
847         self.assertEqual(response.status_code, requests.codes.conflict)
848
849     def test_52_check_otn_topo_links(self):
850         self.test_22_check_otn_topo_otu4_links()
851
852     def test_53_check_otn_topo_tp(self):
853         response = test_utils.get_otn_topo_request()
854         res = response.json()
855         for node in res['network'][0]['node']:
856             if node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1':
857                 tpList = node['ietf-network-topology:termination-point']
858                 for tp in tpList:
859                     if tp['tp-id'] == 'XPDR1-NETWORK1':
860                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
861                         self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
862                         self.assertNotIn('odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
863
864     def test_54_delete_OCH_OTU4_service(self):
865         response = test_utils.service_delete_request("service1-OCH-OTU4")
866         self.assertEqual(response.status_code, requests.codes.ok)
867         res = response.json()
868         self.assertIn('Renderer service delete in progress',
869                       res['output']['configuration-response-common']['response-message'])
870         time.sleep(20)
871
872     def test_55_get_no_service(self):
873         response = test_utils.get_service_list_request("")
874         self.assertEqual(response.status_code, requests.codes.conflict)
875         res = response.json()
876         self.assertIn(
877             {"error-type": "application", "error-tag": "data-missing",
878              "error-message": "Request could not be completed because the relevant data model content does not exist"},
879             res['errors']['error'])
880         time.sleep(1)
881
882     def test_56_check_no_interface_OTU4_spdra(self):
883         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
884         self.assertEqual(response.status_code, requests.codes.conflict)
885
886     def test_57_check_no_interface_OCH_spdra(self):
887         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
888         self.assertEqual(response.status_code, requests.codes.conflict)
889
890     def test_58_getLinks_OtnTopology(self):
891         response = test_utils.get_otn_topo_request()
892         self.assertEqual(response.status_code, requests.codes.ok)
893         res = response.json()
894         self.assertNotIn('ietf-network-topology:link', res['network'][0])
895
896     def test_59_check_openroadm_topo_spdra(self):
897         response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
898         self.assertEqual(response.status_code, requests.codes.ok)
899         res = response.json()
900         tp = res['node'][0]['ietf-network-topology:termination-point'][0]
901         self.assertEqual('XPDR1-NETWORK1', tp['tp-id'])
902         self.assertNotIn('wavelength', dict.keys(
903             tp[u'org-openroadm-network-topology:xpdr-network-attributes']))
904         time.sleep(3)
905
906     def test_60_check_openroadm_topo_ROADMA_SRG(self):
907         response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
908         self.assertEqual(response.status_code, requests.codes.ok)
909         res = response.json()
910         self.assertIn({u'index': 1},
911                       res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
912         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
913         for ele in liste_tp:
914             if ele['tp-id'] == 'SRG1-PP1-TXRX':
915                 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
916         time.sleep(3)
917
918     def test_61_check_openroadm_topo_ROADMA_DEG(self):
919         response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
920         self.assertEqual(response.status_code, requests.codes.ok)
921         res = response.json()
922         self.assertIn({u'index': 1},
923                       res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
924         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
925         for ele in liste_tp:
926             if ele['tp-id'] == 'DEG2-CTP-TXRX':
927                 self.assertNotIn('org-openroadm-network-topology:ctp-attributes', dict.keys(ele))
928             if ele['tp-id'] == 'DEG2-TTP-TXRX':
929                 self.assertNotIn('org-openroadm-network-topology:tx-ttp-attributes', dict.keys(ele))
930         time.sleep(3)
931
932     def test_62_disconnect_spdrA(self):
933         response = test_utils.unmount_device("SPDR-SA1")
934         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
935
936     def test_63_disconnect_spdrC(self):
937         response = test_utils.unmount_device("SPDR-SC1")
938         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
939
940     def test_64_disconnect_roadmA(self):
941         response = test_utils.unmount_device("ROADM-A1")
942         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
943
944     def test_65_disconnect_roadmC(self):
945         response = test_utils.unmount_device("ROADM-C1")
946         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
947
948
if __name__ == "__main__":
    # Executed as a script: hand over to the unittest runner with verbose
    # (one line per test) reporting.
    unittest.main(verbosity=2)