fix some pylint issues
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_otn_end2end.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14
15 import unittest
16 import time
17 import requests
18 from common import test_utils
19
20
class TransportPCEtesting(unittest.TestCase):

    # Set by setUpClass and iterated over by tearDownClass.
    processes = None
    # Number of seconds slept after each service-create request.
    WAITING = 20  # nominal value is 300

    # Body of the service-create request.  test_23 and test_29 edit this
    # dictionary in place, so its content depends on which tests already ran.
    cr_serv_sample_data = {
        "input": {
            "sdnc-request-header": {
                "request-id": "request-1",
                "rpc-action": "service-create",
                "request-system-id": "appname",
            },
            "service-name": "service1-OCH-OTU4",
            "common-id": "commonId",
            "connection-type": "infrastructure",
            "service-a-end": {
                "service-rate": "100",
                "node-id": "SPDR-SA1",
                "service-format": "OTU",
                "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
                "clli": "NodeSA",
                "subrate-eth-sla": {
                    "subrate-eth-sla": {
                        "committed-info-rate": "100000",
                        "committed-burst-size": "64",
                    },
                },
                "tx-direction": {
                    "port": {
                        "port-device-name": "SPDR-SA1-XPDR1",
                        "port-type": "fixed",
                        "port-name": "XPDR1-NETWORK1",
                        "port-rack": "000000.00",
                        "port-shelf": "Chassis#1",
                    },
                    "lgx": {
                        "lgx-device-name": "Some lgx-device-name",
                        "lgx-port-name": "Some lgx-port-name",
                        "lgx-port-rack": "000000.00",
                        "lgx-port-shelf": "00",
                    },
                },
                "rx-direction": {
                    "port": {
                        "port-device-name": "SPDR-SA1-XPDR1",
                        "port-type": "fixed",
                        "port-name": "XPDR1-NETWORK1",
                        "port-rack": "000000.00",
                        "port-shelf": "Chassis#1",
                    },
                    "lgx": {
                        "lgx-device-name": "Some lgx-device-name",
                        "lgx-port-name": "Some lgx-port-name",
                        "lgx-port-rack": "000000.00",
                        "lgx-port-shelf": "00",
                    },
                },
                "optic-type": "gray",
            },
            "service-z-end": {
                "service-rate": "100",
                "node-id": "SPDR-SC1",
                "service-format": "OTU",
                "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
                "clli": "NodeSC",
                "subrate-eth-sla": {
                    "subrate-eth-sla": {
                        "committed-info-rate": "100000",
                        "committed-burst-size": "64",
                    },
                },
                "tx-direction": {
                    "port": {
                        "port-device-name": "SPDR-SC1-XPDR1",
                        "port-type": "fixed",
                        "port-name": "XPDR1-NETWORK1",
                        "port-rack": "000000.00",
                        "port-shelf": "Chassis#1",
                    },
                    "lgx": {
                        "lgx-device-name": "Some lgx-device-name",
                        "lgx-port-name": "Some lgx-port-name",
                        "lgx-port-rack": "000000.00",
                        "lgx-port-shelf": "00",
                    },
                },
                "rx-direction": {
                    "port": {
                        "port-device-name": "SPDR-SC1-XPDR1",
                        "port-type": "fixed",
                        "port-name": "XPDR1-NETWORK1",
                        "port-rack": "000000.00",
                        "port-shelf": "Chassis#1",
                    },
                    "lgx": {
                        "lgx-device-name": "Some lgx-device-name",
                        "lgx-port-name": "Some lgx-port-name",
                        "lgx-port-rack": "000000.00",
                        "lgx-port-shelf": "00",
                    },
                },
                "optic-type": "gray",
            },
            "due-date": "2018-06-15T00:00:01Z",
            "operator-contact": "pw1234",
        },
    }
127
128     @classmethod
129     def setUpClass(cls):
130         cls.processes = test_utils.start_tpce()
131         cls.processes = test_utils.start_sims(['spdra', 'roadma', 'roadmc', 'spdrc'])
132
133     @classmethod
134     def tearDownClass(cls):
135         # pylint: disable=not-an-iterable
136         for process in cls.processes:
137             test_utils.shutdown_process(process)
138         print("all processes killed")
139
140     def setUp(self):
141         time.sleep(5)
142
143     def test_01_connect_spdrA(self):
144         response = test_utils.mount_device("SPDR-SA1", 'spdra')
145         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
146
147     def test_02_connect_spdrC(self):
148         response = test_utils.mount_device("SPDR-SC1", 'spdrc')
149         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
150
151     def test_03_connect_rdmA(self):
152         response = test_utils.mount_device("ROADM-A1", 'roadma')
153         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
154
155     def test_04_connect_rdmC(self):
156         response = test_utils.mount_device("ROADM-C1", 'roadmc')
157         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
158
159     def test_05_connect_sprdA_N1_to_roadmA_PP1(self):
160         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "1", "1",
161                                                           "ROADM-A1", "1", "SRG1-PP1-TXRX")
162         self.assertEqual(response.status_code, requests.codes.ok)
163         res = response.json()
164         self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
165         time.sleep(2)
166
167     def test_06_connect_roadmA_PP1_to_spdrA_N1(self):
168         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "1", "1",
169                                                           "ROADM-A1", "1", "SRG1-PP1-TXRX")
170         self.assertEqual(response.status_code, requests.codes.ok)
171         res = response.json()
172         self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
173         time.sleep(2)
174
175     def test_07_connect_sprdC_N1_to_roadmC_PP1(self):
176         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "1", "1",
177                                                           "ROADM-C1", "1", "SRG1-PP1-TXRX")
178         self.assertEqual(response.status_code, requests.codes.ok)
179         res = response.json()
180         self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
181         time.sleep(2)
182
183     def test_08_connect_roadmC_PP1_to_spdrC_N1(self):
184         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "1", "1",
185                                                           "ROADM-C1", "1", "SRG1-PP1-TXRX")
186         self.assertEqual(response.status_code, requests.codes.ok)
187         res = response.json()
188         self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
189         time.sleep(2)
190
191     def test_09_add_omsAttributes_ROADMA_ROADMC(self):
192         # Config ROADMA-ROADMC oms-attributes
193         data = {"span": {
194             "auto-spanloss": "true",
195             "spanloss-base": 11.4,
196             "spanloss-current": 12,
197             "engineered-spanloss": 12.2,
198             "link-concatenation": [{
199                 "SRLG-Id": 0,
200                 "fiber-type": "smf",
201                 "SRLG-length": 100000,
202                 "pmd": 0.5}]}}
203         response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
204         self.assertEqual(response.status_code, requests.codes.created)
205
206     def test_10_add_omsAttributes_ROADMC_ROADMA(self):
207         # Config ROADMC-ROADMA oms-attributes
208         data = {"span": {
209             "auto-spanloss": "true",
210             "spanloss-base": 11.4,
211             "spanloss-current": 12,
212             "engineered-spanloss": 12.2,
213             "link-concatenation": [{
214                 "SRLG-Id": 0,
215                 "fiber-type": "smf",
216                 "SRLG-length": 100000,
217                 "pmd": 0.5}]}}
218         response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
219         self.assertEqual(response.status_code, requests.codes.created)
220
221 # test service-create for OCH-OTU4 service from spdr to spdr
222     def test_11_check_otn_topology(self):
223         response = test_utils.get_otn_topo_request()
224         self.assertEqual(response.status_code, requests.codes.ok)
225         res = response.json()
226         nbNode = len(res['network'][0]['node'])
227         self.assertEqual(nbNode, 4)
228         self.assertNotIn('ietf-network-topology:link', res['network'][0])
229
230     def test_12_create_OCH_OTU4_service(self):
231         response = test_utils.service_create_request(self.cr_serv_sample_data)
232         self.assertEqual(response.status_code, requests.codes.ok)
233         res = response.json()
234         self.assertIn('PCE calculation in progress',
235                       res['output']['configuration-response-common']['response-message'])
236         time.sleep(self.WAITING)
237
238     def test_13_get_OCH_OTU4_service1(self):
239         response = test_utils.get_service_list_request("services/service1-OCH-OTU4")
240         self.assertEqual(response.status_code, requests.codes.ok)
241         res = response.json()
242         self.assertEqual(
243             res['services'][0]['administrative-state'], 'inService')
244         self.assertEqual(
245             res['services'][0]['service-name'], 'service1-OCH-OTU4')
246         self.assertEqual(
247             res['services'][0]['connection-type'], 'infrastructure')
248         self.assertEqual(
249             res['services'][0]['lifecycle-state'], 'planned')
250         time.sleep(2)
251
252     # Check correct configuration of devices
253     def test_14_check_interface_och_spdra(self):
254         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
255         self.assertEqual(response.status_code, requests.codes.ok)
256         res = response.json()
257         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-1',
258                                    'administrative-state': 'inService',
259                                    'supporting-circuit-pack-name': 'CP1-CFP0',
260                                    'type': 'org-openroadm-interfaces:opticalChannel',
261                                    'supporting-port': 'CP1-CFP0-P1'
262                                    }, **res['interface'][0]),
263                              res['interface'][0])
264
265         self.assertDictEqual(
266             {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
267              u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
268             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
269
270     def test_15_check_interface_OTU4_spdra(self):
271         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
272         self.assertEqual(response.status_code, requests.codes.ok)
273         res = response.json()
274         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
275                         'administrative-state': 'inService',
276                         'supporting-circuit-pack-name': 'CP1-CFP0',
277                         'supporting-interface': 'XPDR1-NETWORK1-1',
278                         'type': 'org-openroadm-interfaces:otnOtu',
279                         'supporting-port': 'CP1-CFP0-P1'
280                         }
281         input_dict_2 = {'tx-sapi': 'Swfw02qXGyI=',
282                         'expected-dapi': 'Swfw02qXGyI=',
283                         'rate': 'org-openroadm-otn-common-types:OTU4',
284                         'fec': 'scfec'
285                         }
286         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
287                              res['interface'][0])
288
289         self.assertDictEqual(input_dict_2,
290                              res['interface'][0]
291                              ['org-openroadm-otn-otu-interfaces:otu'])
292
293     def test_16_check_interface_och_spdrc(self):
294         response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-1")
295         self.assertEqual(response.status_code, requests.codes.ok)
296         res = response.json()
297         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-1',
298                                    'administrative-state': 'inService',
299                                    'supporting-circuit-pack-name': 'CP1-CFP0',
300                                    'type': 'org-openroadm-interfaces:opticalChannel',
301                                    'supporting-port': 'CP1-CFP0-P1'
302                                    }, **res['interface'][0]),
303                              res['interface'][0])
304
305         self.assertDictEqual(
306             {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
307              u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
308             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
309
310     def test_17_check_interface_OTU4_spdrc(self):
311         response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-OTU")
312         self.assertEqual(response.status_code, requests.codes.ok)
313         res = response.json()
314         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
315                         'administrative-state': 'inService',
316                         'supporting-circuit-pack-name': 'CP1-CFP0',
317                         'supporting-interface': 'XPDR1-NETWORK1-1',
318                         'type': 'org-openroadm-interfaces:otnOtu',
319                         'supporting-port': 'CP1-CFP0-P1'
320                         }
321         input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
322                         'expected-sapi': 'Swfw02qXGyI=',
323                         'tx-sapi': 'fuYZwEO660g=',
324                         'expected-dapi': 'fuYZwEO660g=',
325                         'rate': 'org-openroadm-otn-common-types:OTU4',
326                         'fec': 'scfec'
327                         }
328
329         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
330                              res['interface'][0])
331
332         self.assertDictEqual(input_dict_2,
333                              res['interface'][0]
334                              ['org-openroadm-otn-otu-interfaces:otu'])
335
336     def test_18_check_no_interface_ODU4_spdra(self):
337         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
338         self.assertEqual(response.status_code, requests.codes.conflict)
339         res = response.json()
340         self.assertIn(
341             {"error-type": "application", "error-tag": "data-missing",
342              "error-message": "Request could not be completed because the relevant data model content does not exist"},
343             res['errors']['error'])
344
345     def test_19_check_openroadm_topo_spdra(self):
346         response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
347         self.assertEqual(response.status_code, requests.codes.ok)
348         res = response.json()
349         ele = res['node'][0]['ietf-network-topology:termination-point'][0]
350         self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
351         self.assertEqual({u'frequency': 196.1,
352                           u'width': 40},
353                          ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
354         time.sleep(3)
355
356     def test_20_check_openroadm_topo_ROADMA_SRG(self):
357         response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
358         self.assertEqual(response.status_code, requests.codes.ok)
359         res = response.json()
360         self.assertNotIn({u'index': 1},
361                          res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
362         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
363         for ele in liste_tp:
364             if ele['tp-id'] == 'SRG1-PP1-TXRX':
365                 self.assertIn({u'index': 1, u'frequency': 196.1,
366                                u'width': 40},
367                               ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
368             if ele['tp-id'] == 'SRG1-PP2-TXRX':
369                 self.assertNotIn('used-wavelength', dict.keys(ele))
370         time.sleep(3)
371
372     def test_21_check_openroadm_topo_ROADMA_DEG(self):
373         response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
374         self.assertEqual(response.status_code, requests.codes.ok)
375         res = response.json()
376         self.assertNotIn({u'index': 1},
377                          res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
378         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
379         for ele in liste_tp:
380             if ele['tp-id'] == 'DEG2-CTP-TXRX':
381                 self.assertIn({u'index': 1, u'frequency': 196.1,
382                                u'width': 40},
383                               ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
384             if ele['tp-id'] == 'DEG2-TTP-TXRX':
385                 self.assertIn({u'index': 1, u'frequency': 196.1,
386                                u'width': 40},
387                               ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
388         time.sleep(3)
389
390     def test_22_check_otn_topo_otu4_links(self):
391         response = test_utils.get_otn_topo_request()
392         self.assertEqual(response.status_code, requests.codes.ok)
393         res = response.json()
394         nb_links = len(res['network'][0]['ietf-network-topology:link'])
395         self.assertEqual(nb_links, 2)
396         listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
397                       'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
398         for link in res['network'][0]['ietf-network-topology:link']:
399             self.assertIn(link['link-id'], listLinkId)
400             self.assertEqual(link['transportpce-topology:otn-link-type'], 'OTU4')
401             self.assertEqual(link['org-openroadm-common-network:link-type'], 'OTN-LINK')
402             self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
403             self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
404             self.assertIn(link['org-openroadm-common-network:opposite-link'], listLinkId)
405
406 # test service-create for ODU4 service from spdr to spdr
407     def test_23_create_ODU4_service(self):
408         self.cr_serv_sample_data["input"]["service-name"] = "service1-ODU4"
409         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
410         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
411         self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
412         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
413         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
414         self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
415
416         response = test_utils.service_create_request(self.cr_serv_sample_data)
417         self.assertEqual(response.status_code, requests.codes.ok)
418         res = response.json()
419         self.assertIn('PCE calculation in progress',
420                       res['output']['configuration-response-common']['response-message'])
421         time.sleep(self.WAITING)
422
423     def test_24_get_ODU4_service1(self):
424         response = test_utils.get_service_list_request("services/service1-ODU4")
425         self.assertEqual(response.status_code, requests.codes.ok)
426         res = response.json()
427         self.assertEqual(
428             res['services'][0]['administrative-state'], 'inService')
429         self.assertEqual(
430             res['services'][0]['service-name'], 'service1-ODU4')
431         self.assertEqual(
432             res['services'][0]['connection-type'], 'infrastructure')
433         self.assertEqual(
434             res['services'][0]['lifecycle-state'], 'planned')
435         time.sleep(2)
436
437     def test_25_check_interface_ODU4_spdra(self):
438         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
439         self.assertEqual(response.status_code, requests.codes.ok)
440         res = response.json()
441         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
442                         'administrative-state': 'inService',
443                         'supporting-circuit-pack-name': 'CP1-CFP0',
444                         'supporting-interface': 'XPDR1-NETWORK1-OTU',
445                         'type': 'org-openroadm-interfaces:otnOdu',
446                         'supporting-port': 'CP1-CFP0-P1'}
447         # SAPI/DAPI are added in the Otu4 renderer
448         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
449                         'rate': 'org-openroadm-otn-common-types:ODU4',
450                         'expected-dapi': 'Swfw02qXGyI=',
451                         'expected-sapi': 'fuYZwEO660g=',
452                         'tx-dapi': 'fuYZwEO660g=',
453                         'tx-sapi': 'Swfw02qXGyI='}
454
455         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
456                              res['interface'][0])
457         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
458                                   **input_dict_2),
459                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
460                              )
461         self.assertDictEqual(
462             {u'payload-type': u'21', u'exp-payload-type': u'21'},
463             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
464
465     def test_26_check_interface_ODU4_spdrc(self):
466         response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
467         self.assertEqual(response.status_code, requests.codes.ok)
468         res = response.json()
469         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
470                         'administrative-state': 'inService',
471                         'supporting-circuit-pack-name': 'CP1-CFP0',
472                         'supporting-interface': 'XPDR1-NETWORK1-OTU',
473                         'type': 'org-openroadm-interfaces:otnOdu',
474                         'supporting-port': 'CP1-CFP0-P1'}
475         # SAPI/DAPI are added in the Otu4 renderer
476         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
477                         'rate': 'org-openroadm-otn-common-types:ODU4',
478                         'tx-sapi': 'fuYZwEO660g=',
479                         'tx-dapi': 'Swfw02qXGyI=',
480                         'expected-sapi': 'Swfw02qXGyI=',
481                         'expected-dapi': 'fuYZwEO660g='
482                         }
483         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
484                              res['interface'][0])
485         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
486                                   **input_dict_2),
487                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
488                              )
489         self.assertDictEqual(
490             {u'payload-type': u'21', u'exp-payload-type': u'21'},
491             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
492
493     def test_27_check_otn_topo_links(self):
494         response = test_utils.get_otn_topo_request()
495         self.assertEqual(response.status_code, requests.codes.ok)
496         res = response.json()
497         nb_links = len(res['network'][0]['ietf-network-topology:link'])
498         self.assertEqual(nb_links, 4)
499         for link in res['network'][0]['ietf-network-topology:link']:
500             linkId = link['link-id']
501             if (linkId in ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
502                            'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
503                 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
504                 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
505             elif (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
506                              'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
507                 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
508                 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
509                 self.assertEqual(link['transportpce-topology:otn-link-type'], 'ODTU4')
510                 self.assertEqual(link['org-openroadm-common-network:link-type'], 'OTN-LINK')
511                 self.assertIn(link['org-openroadm-common-network:opposite-link'],
512                               ['ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
513                                'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
514             else:
515                 self.fail("this link should not exist")
516
517     def test_28_check_otn_topo_tp(self):
518         response = test_utils.get_otn_topo_request()
519         res = response.json()
520         for node in res['network'][0]['node']:
521             if node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1':
522                 tpList = node['ietf-network-topology:termination-point']
523                 for tp in tpList:
524                     if tp['tp-id'] == 'XPDR1-NETWORK1':
525                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
526                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
527                         self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
528                         self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
529                                          'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
530
531 # test service-create for 10GE service from spdr to spdr
532     def test_29_create_10GE_service(self):
533         self.cr_serv_sample_data["input"]["service-name"] = "service1-10GE"
534         self.cr_serv_sample_data["input"]["connection-type"] = "service"
535         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "10"
536         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
537         del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
538         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
539         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
540         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "10"
541         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
542         del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
543         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
544         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
545         response = test_utils.service_create_request(self.cr_serv_sample_data)
546         self.assertEqual(response.status_code, requests.codes.ok)
547         res = response.json()
548         self.assertIn('PCE calculation in progress',
549                       res['output']['configuration-response-common']['response-message'])
550         time.sleep(self.WAITING)
551
552     def test_30_get_10GE_service1(self):
553         response = test_utils.get_service_list_request("services/service1-10GE")
554         self.assertEqual(response.status_code, requests.codes.ok)
555         res = response.json()
556         self.assertEqual(
557             res['services'][0]['administrative-state'], 'inService')
558         self.assertEqual(
559             res['services'][0]['service-name'], 'service1-10GE')
560         self.assertEqual(
561             res['services'][0]['connection-type'], 'service')
562         self.assertEqual(
563             res['services'][0]['lifecycle-state'], 'planned')
564         time.sleep(2)
565
566     def test_31_check_interface_10GE_CLIENT_spdra(self):
567         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
568         self.assertEqual(response.status_code, requests.codes.ok)
569         res = response.json()
570         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
571                       'administrative-state': 'inService',
572                       'supporting-circuit-pack-name': 'CP1-SFP4',
573                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
574                       'supporting-port': 'CP1-SFP4-P1'
575                       }
576         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
577                              res['interface'][0])
578         self.assertDictEqual(
579             {u'speed': 10000},
580             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
581
582     def test_32_check_interface_ODU2E_CLIENT_spdra(self):
583         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
584         self.assertEqual(response.status_code, requests.codes.ok)
585         res = response.json()
586         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
587                         'administrative-state': 'inService',
588                         'supporting-circuit-pack-name': 'CP1-SFP4',
589                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
590                         'type': 'org-openroadm-interfaces:otnOdu',
591                         'supporting-port': 'CP1-SFP4-P1'}
592         input_dict_2 = {
593             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
594             'rate': 'org-openroadm-otn-common-types:ODU2e',
595             'monitoring-mode': 'terminated'}
596
597         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
598                              res['interface'][0])
599         self.assertDictEqual(dict(input_dict_2,
600                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
601                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
602         self.assertDictEqual(
603             {u'payload-type': u'03', u'exp-payload-type': u'03'},
604             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
605
606     def test_33_check_interface_ODU2E_NETWORK_spdra(self):
607         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
608         self.assertEqual(response.status_code, requests.codes.ok)
609         res = response.json()
610         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
611                         'administrative-state': 'inService',
612                         'supporting-circuit-pack-name': 'CP1-CFP0',
613                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
614                         'type': 'org-openroadm-interfaces:otnOdu',
615                         'supporting-port': 'CP1-CFP0-P1'}
616         input_dict_2 = {
617             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
618             'rate': 'org-openroadm-otn-common-types:ODU2e',
619             'monitoring-mode': 'monitored'}
620         input_dict_3 = {'trib-port-number': 1}
621
622         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
623                              res['interface'][0])
624         self.assertDictEqual(dict(input_dict_2,
625                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
626                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
627         self.assertDictEqual(dict(input_dict_3,
628                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
629                                       'parent-odu-allocation']),
630                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
631         self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
632                       ['trib-slots'])
633
634     def test_34_check_ODU2E_connection_spdra(self):
635         response = test_utils.check_netconf_node_request(
636             "SPDR-SA1",
637             "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
638         self.assertEqual(response.status_code, requests.codes.ok)
639         res = response.json()
640         input_dict_1 = {
641             'connection-name':
642             'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
643             'direction': 'bidirectional'
644         }
645
646         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
647                              res['odu-connection'][0])
648         self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1-10GE'},
649                              res['odu-connection'][0]['destination'])
650         self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1-10GE'},
651                              res['odu-connection'][0]['source'])
652
653     def test_35_check_interface_10GE_CLIENT_spdrc(self):
654         response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-CLIENT1-ETHERNET10G")
655         self.assertEqual(response.status_code, requests.codes.ok)
656         res = response.json()
657         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
658                       'administrative-state': 'inService',
659                       'supporting-circuit-pack-name': 'CP1-SFP4',
660                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
661                       'supporting-port': 'CP1-SFP4-P1'
662                       }
663         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
664                              res['interface'][0])
665         self.assertDictEqual(
666             {u'speed': 10000},
667             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
668
669     def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
670         response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
671         self.assertEqual(response.status_code, requests.codes.ok)
672         res = response.json()
673         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
674                         'administrative-state': 'inService',
675                         'supporting-circuit-pack-name': 'CP1-SFP4',
676                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
677                         'type': 'org-openroadm-interfaces:otnOdu',
678                         'supporting-port': 'CP1-SFP4-P1'}
679         input_dict_2 = {
680             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
681             'rate': 'org-openroadm-otn-common-types:ODU2e',
682             'monitoring-mode': 'terminated'}
683
684         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
685                              res['interface'][0])
686         self.assertDictEqual(dict(input_dict_2,
687                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
688                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
689         self.assertDictEqual(
690             {u'payload-type': u'03', u'exp-payload-type': u'03'},
691             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
692
693     def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
694         response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
695         self.assertEqual(response.status_code, requests.codes.ok)
696         res = response.json()
697         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
698                         'administrative-state': 'inService',
699                         'supporting-circuit-pack-name': 'CP1-CFP0',
700                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
701                         'type': 'org-openroadm-interfaces:otnOdu',
702                         'supporting-port': 'CP1-CFP0-P1'}
703         input_dict_2 = {
704             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
705             'rate': 'org-openroadm-otn-common-types:ODU2e',
706             'monitoring-mode': 'monitored'}
707
708         input_dict_3 = {'trib-port-number': 1}
709
710         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
711                              res['interface'][0])
712         self.assertDictEqual(dict(input_dict_2,
713                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
714                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
715         self.assertDictEqual(dict(input_dict_3,
716                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
717                                       'parent-odu-allocation']),
718                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
719             'parent-odu-allocation'])
720         self.assertIn(1,
721                       res['interface'][0][
722                           'org-openroadm-otn-odu-interfaces:odu'][
723                           'parent-odu-allocation']['trib-slots'])
724
725     def test_38_check_ODU2E_connection_spdrc(self):
726         response = test_utils.check_netconf_node_request(
727             "SPDR-SC1",
728             "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
729         self.assertEqual(response.status_code, requests.codes.ok)
730         res = response.json()
731         input_dict_1 = {
732             'connection-name':
733             'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
734             'direction': 'bidirectional'
735         }
736
737         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
738                              res['odu-connection'][0])
739         self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1-10GE'},
740                              res['odu-connection'][0]['destination'])
741         self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1-10GE'},
742                              res['odu-connection'][0]['source'])
743
744     def test_39_check_otn_topo_links(self):
745         response = test_utils.get_otn_topo_request()
746         self.assertEqual(response.status_code, requests.codes.ok)
747         res = response.json()
748         nb_links = len(res['network'][0]['ietf-network-topology:link'])
749         self.assertEqual(nb_links, 4)
750         for link in res['network'][0]['ietf-network-topology:link']:
751             linkId = link['link-id']
752             if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
753                            'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
754                 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
755                 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
756
757     def test_40_check_otn_topo_tp(self):
758         response = test_utils.get_otn_topo_request()
759         res = response.json()
760         for node in res['network'][0]['node']:
761             if node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1':
762                 tpList = node['ietf-network-topology:termination-point']
763                 for tp in tpList:
764                     if tp['tp-id'] == 'XPDR1-NETWORK1':
765                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
766                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
767                         tsPoolList = list(range(1, 9))
768                         self.assertNotIn(tsPoolList, xpdrTpPortConAt['ts-pool'])
769                         self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
770                         self.assertNotIn(1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
771
772     def test_41_delete_10GE_service(self):
773         response = test_utils.service_delete_request("service1-10GE")
774         self.assertEqual(response.status_code, requests.codes.ok)
775         res = response.json()
776         self.assertIn('Renderer service delete in progress',
777                       res['output']['configuration-response-common']['response-message'])
778         time.sleep(20)
779
780     def test_42_check_service_list(self):
781         response = test_utils.get_service_list_request("")
782         self.assertEqual(response.status_code, requests.codes.ok)
783         res = response.json()
784         self.assertEqual(len(res['service-list']['services']), 2)
785         time.sleep(2)
786
787     def test_43_check_no_ODU2e_connection_spdra(self):
788         response = test_utils.check_netconf_node_request("SPDR-SA1", "")
789         self.assertEqual(response.status_code, requests.codes.ok)
790         res = response.json()
791         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
792         time.sleep(1)
793
794     def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
795         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
796         self.assertEqual(response.status_code, requests.codes.conflict)
797
798     def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
799         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
800         self.assertEqual(response.status_code, requests.codes.conflict)
801
802     def test_46_check_no_interface_10GE_CLIENT_spdra(self):
803         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
804         self.assertEqual(response.status_code, requests.codes.conflict)
805
806     def test_47_check_otn_topo_links(self):
807         response = test_utils.get_otn_topo_request()
808         self.assertEqual(response.status_code, requests.codes.ok)
809         res = response.json()
810         nb_links = len(res['network'][0]['ietf-network-topology:link'])
811         self.assertEqual(nb_links, 4)
812         for link in res['network'][0]['ietf-network-topology:link']:
813             linkId = link['link-id']
814             if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
815                            'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
816                 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
817                 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
818
819     def test_48_check_otn_topo_tp(self):
820         response = test_utils.get_otn_topo_request()
821         res = response.json()
822         for node in res['network'][0]['node']:
823             if (node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1'):
824                 tpList = node['ietf-network-topology:termination-point']
825                 for tp in tpList:
826                     if tp['tp-id'] == 'XPDR1-NETWORK1':
827                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
828                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
829                         self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
830
831     def test_49_delete_ODU4_service(self):
832         response = test_utils.service_delete_request("service1-ODU4")
833         self.assertEqual(response.status_code, requests.codes.ok)
834         res = response.json()
835         self.assertIn('Renderer service delete in progress',
836                       res['output']['configuration-response-common']['response-message'])
837         time.sleep(20)
838
839     def test_50_check_service_list(self):
840         response = test_utils.get_service_list_request("")
841         self.assertEqual(response.status_code, requests.codes.ok)
842         res = response.json()
843         self.assertEqual(len(res['service-list']['services']), 1)
844         time.sleep(2)
845
846     def test_51_check_no_interface_ODU4_spdra(self):
847         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
848         self.assertEqual(response.status_code, requests.codes.conflict)
849
850     def test_52_check_otn_topo_links(self):
851         self.test_22_check_otn_topo_otu4_links()
852
853     def test_53_check_otn_topo_tp(self):
854         response = test_utils.get_otn_topo_request()
855         res = response.json()
856         for node in res['network'][0]['node']:
857             if node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1':
858                 tpList = node['ietf-network-topology:termination-point']
859                 for tp in tpList:
860                     if tp['tp-id'] == 'XPDR1-NETWORK1':
861                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
862                         self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
863                         self.assertNotIn('odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
864
865     def test_54_delete_OCH_OTU4_service(self):
866         response = test_utils.service_delete_request("service1-OCH-OTU4")
867         self.assertEqual(response.status_code, requests.codes.ok)
868         res = response.json()
869         self.assertIn('Renderer service delete in progress',
870                       res['output']['configuration-response-common']['response-message'])
871         time.sleep(20)
872
873     def test_55_get_no_service(self):
874         response = test_utils.get_service_list_request("")
875         self.assertEqual(response.status_code, requests.codes.conflict)
876         res = response.json()
877         self.assertIn(
878             {"error-type": "application", "error-tag": "data-missing",
879              "error-message": "Request could not be completed because the relevant data model content does not exist"},
880             res['errors']['error'])
881         time.sleep(1)
882
883     def test_56_check_no_interface_OTU4_spdra(self):
884         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
885         self.assertEqual(response.status_code, requests.codes.conflict)
886
887     def test_57_check_no_interface_OCH_spdra(self):
888         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
889         self.assertEqual(response.status_code, requests.codes.conflict)
890
891     def test_58_getLinks_OtnTopology(self):
892         response = test_utils.get_otn_topo_request()
893         self.assertEqual(response.status_code, requests.codes.ok)
894         res = response.json()
895         self.assertNotIn('ietf-network-topology:link', res['network'][0])
896
897     def test_59_check_openroadm_topo_spdra(self):
898         response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
899         self.assertEqual(response.status_code, requests.codes.ok)
900         res = response.json()
901         tp = res['node'][0]['ietf-network-topology:termination-point'][0]
902         self.assertEqual('XPDR1-NETWORK1', tp['tp-id'])
903         self.assertNotIn('wavelength', dict.keys(
904             tp[u'org-openroadm-network-topology:xpdr-network-attributes']))
905         time.sleep(3)
906
907     def test_60_check_openroadm_topo_ROADMA_SRG(self):
908         response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
909         self.assertEqual(response.status_code, requests.codes.ok)
910         res = response.json()
911         self.assertIn({u'index': 1},
912                       res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
913         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
914         for ele in liste_tp:
915             if ele['tp-id'] == 'SRG1-PP1-TXRX':
916                 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
917         time.sleep(3)
918
919     def test_61_check_openroadm_topo_ROADMA_DEG(self):
920         response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
921         self.assertEqual(response.status_code, requests.codes.ok)
922         res = response.json()
923         self.assertIn({u'index': 1},
924                       res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
925         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
926         for ele in liste_tp:
927             if ele['tp-id'] == 'DEG2-CTP-TXRX':
928                 self.assertNotIn('org-openroadm-network-topology:ctp-attributes', dict.keys(ele))
929             if ele['tp-id'] == 'DEG2-TTP-TXRX':
930                 self.assertNotIn('org-openroadm-network-topology:tx-ttp-attributes', dict.keys(ele))
931         time.sleep(3)
932
933     def test_62_disconnect_spdrA(self):
934         response = test_utils.unmount_device("SPDR-SA1")
935         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
936
937     def test_63_disconnect_spdrC(self):
938         response = test_utils.unmount_device("SPDR-SC1")
939         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
940
941     def test_64_disconnect_roadmA(self):
942         response = test_utils.unmount_device("ROADM-A1")
943         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
944
945     def test_65_disconnect_roadmC(self):
946         response = test_utils.unmount_device("ROADM-C1")
947         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
948
949
# Script entry point: run this module's test cases through unittest with
# verbose (verbosity=2) reporting.
if __name__ == "__main__":
    unittest.main(verbosity=2)