use service deletion method in OTN E2E functest
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_otn_end2end.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import unittest
13 import time
14 import requests
15 from common import test_utils
16
17
18 class TransportPCEtesting(unittest.TestCase):
19
    # Handles returned by the test_utils start helpers; filled in by
    # setUpClass and iterated over by tearDownClass.
    processes = None
    # Seconds slept after each service-create request before the following
    # tests inspect the result.
    WAITING = 20  # nominal value is 300

    # Payload template passed to test_utils.service_create_request.
    # As written it describes the "service1-OCH-OTU4" infrastructure service
    # between SPDR-SA1 and SPDR-SC1.  Later tests (test_23, test_29) mutate
    # this same class-level dict in place before reusing it, so the tests
    # depend on running in numeric order.
    cr_serv_sample_data = {"input": {
        "sdnc-request-header": {
            "request-id": "request-1",
            "rpc-action": "service-create",
            "request-system-id": "appname"
        },
        "service-name": "service1-OCH-OTU4",
        "common-id": "commonId",
        "connection-type": "infrastructure",
        # A end of the service: node SPDR-SA1.
        "service-a-end": {
            "service-rate": "100",
            "node-id": "SPDR-SA1",
            "service-format": "OTU",
            "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
            "clli": "NodeSA",
            "subrate-eth-sla": {
                    "subrate-eth-sla": {
                        "committed-info-rate": "100000",
                        "committed-burst-size": "64"
                    }
            },
            "tx-direction": {
                "port": {
                    "port-device-name": "SPDR-SA1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "rx-direction": {
                "port": {
                    "port-device-name": "SPDR-SA1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "optic-type": "gray"
        },
        # Z end of the service: node SPDR-SC1, same layout as the A end.
        "service-z-end": {
            "service-rate": "100",
            "node-id": "SPDR-SC1",
            "service-format": "OTU",
            "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
            "clli": "NodeSC",
            "subrate-eth-sla": {
                    "subrate-eth-sla": {
                        "committed-info-rate": "100000",
                        "committed-burst-size": "64"
                    }
            },
            "tx-direction": {
                "port": {
                    "port-device-name": "SPDR-SC1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "rx-direction": {
                "port": {
                    "port-device-name": "SPDR-SC1-XPDR1",
                    "port-type": "fixed",
                    "port-name": "XPDR1-NETWORK1",
                    "port-rack": "000000.00",
                    "port-shelf": "Chassis#1"
                },
                "lgx": {
                    "lgx-device-name": "Some lgx-device-name",
                    "lgx-port-name": "Some lgx-port-name",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "optic-type": "gray"
        },
        "due-date": "2018-06-15T00:00:01Z",
        "operator-contact": "pw1234"
    }
    }
124
125     @classmethod
126     def setUpClass(cls):
127         cls.processes = test_utils.start_tpce()
128         cls.processes = test_utils.start_sims(['spdra', 'roadma', 'roadmc', 'spdrc'])
129
130     @classmethod
131     def tearDownClass(cls):
132         for process in cls.processes:
133             test_utils.shutdown_process(process)
134         print("all processes killed")
135
136     def setUp(self):
137         time.sleep(5)
138
139     def test_01_connect_spdrA(self):
140         response = test_utils.mount_device("SPDR-SA1", 'spdra')
141         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
142
143     def test_02_connect_spdrC(self):
144         response = test_utils.mount_device("SPDR-SC1", 'spdrc')
145         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
146
147     def test_03_connect_rdmA(self):
148         response = test_utils.mount_device("ROADM-A1", 'roadma')
149         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
150
151     def test_04_connect_rdmC(self):
152         response = test_utils.mount_device("ROADM-C1", 'roadmc')
153         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
154
155     def test_05_connect_sprdA_N1_to_roadmA_PP1(self):
156         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "1", "1",
157                                                           "ROADM-A1", "1", "SRG1-PP1-TXRX")
158         self.assertEqual(response.status_code, requests.codes.ok)
159         res = response.json()
160         self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
161         time.sleep(2)
162
163     def test_06_connect_roadmA_PP1_to_spdrA_N1(self):
164         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "1", "1",
165                                                           "ROADM-A1", "1", "SRG1-PP1-TXRX")
166         self.assertEqual(response.status_code, requests.codes.ok)
167         res = response.json()
168         self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
169         time.sleep(2)
170
171     def test_07_connect_sprdC_N1_to_roadmC_PP1(self):
172         response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "1", "1",
173                                                           "ROADM-C1", "1", "SRG1-PP1-TXRX")
174         self.assertEqual(response.status_code, requests.codes.ok)
175         res = response.json()
176         self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
177         time.sleep(2)
178
179     def test_08_connect_roadmC_PP1_to_spdrC_N1(self):
180         response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "1", "1",
181                                                           "ROADM-C1", "1", "SRG1-PP1-TXRX")
182         self.assertEqual(response.status_code, requests.codes.ok)
183         res = response.json()
184         self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
185         time.sleep(2)
186
187     def test_09_add_omsAttributes_ROADMA_ROADMC(self):
188         # Config ROADMA-ROADMC oms-attributes
189         data = {"span": {
190             "auto-spanloss": "true",
191             "spanloss-base": 11.4,
192             "spanloss-current": 12,
193             "engineered-spanloss": 12.2,
194             "link-concatenation": [{
195                 "SRLG-Id": 0,
196                 "fiber-type": "smf",
197                 "SRLG-length": 100000,
198                 "pmd": 0.5}]}}
199         response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
200         self.assertEqual(response.status_code, requests.codes.created)
201
202     def test_10_add_omsAttributes_ROADMC_ROADMA(self):
203         # Config ROADMC-ROADMA oms-attributes
204         data = {"span": {
205             "auto-spanloss": "true",
206             "spanloss-base": 11.4,
207             "spanloss-current": 12,
208             "engineered-spanloss": 12.2,
209             "link-concatenation": [{
210                 "SRLG-Id": 0,
211                 "fiber-type": "smf",
212                 "SRLG-length": 100000,
213                 "pmd": 0.5}]}}
214         response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
215         self.assertEqual(response.status_code, requests.codes.created)
216
217 # test service-create for OCH-OTU4 service from spdr to spdr
218     def test_11_check_otn_topology(self):
219         response = test_utils.get_otn_topo_request()
220         self.assertEqual(response.status_code, requests.codes.ok)
221         res = response.json()
222         nbNode = len(res['network'][0]['node'])
223         self.assertEqual(nbNode, 4)
224         self.assertNotIn('ietf-network-topology:link', res['network'][0])
225
226     def test_12_create_OCH_OTU4_service(self):
227         response = test_utils.service_create_request(self.cr_serv_sample_data)
228         self.assertEqual(response.status_code, requests.codes.ok)
229         res = response.json()
230         self.assertIn('PCE calculation in progress',
231                       res['output']['configuration-response-common']['response-message'])
232         time.sleep(self.WAITING)
233
234     def test_13_get_OCH_OTU4_service1(self):
235         response = test_utils.get_service_list_request("services/service1-OCH-OTU4")
236         self.assertEqual(response.status_code, requests.codes.ok)
237         res = response.json()
238         self.assertEqual(
239             res['services'][0]['administrative-state'], 'inService')
240         self.assertEqual(
241             res['services'][0]['service-name'], 'service1-OCH-OTU4')
242         self.assertEqual(
243             res['services'][0]['connection-type'], 'infrastructure')
244         self.assertEqual(
245             res['services'][0]['lifecycle-state'], 'planned')
246         time.sleep(2)
247
248     # Check correct configuration of devices
249     def test_14_check_interface_och_spdra(self):
250         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
251         self.assertEqual(response.status_code, requests.codes.ok)
252         res = response.json()
253         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-1',
254                                    'administrative-state': 'inService',
255                                    'supporting-circuit-pack-name': 'CP1-CFP0',
256                                    'type': 'org-openroadm-interfaces:opticalChannel',
257                                    'supporting-port': 'CP1-CFP0-P1'
258                                    }, **res['interface'][0]),
259                              res['interface'][0])
260
261         self.assertDictEqual(
262             {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
263              u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
264             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
265
266     def test_15_check_interface_OTU4_spdra(self):
267         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
268         self.assertEqual(response.status_code, requests.codes.ok)
269         res = response.json()
270         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
271                         'administrative-state': 'inService',
272                         'supporting-circuit-pack-name': 'CP1-CFP0',
273                         'supporting-interface': 'XPDR1-NETWORK1-1',
274                         'type': 'org-openroadm-interfaces:otnOtu',
275                         'supporting-port': 'CP1-CFP0-P1'
276                         }
277         input_dict_2 = {'tx-sapi': 'Swfw02qXGyI=',
278                         'expected-dapi': 'Swfw02qXGyI=',
279                         'rate': 'org-openroadm-otn-common-types:OTU4',
280                         'fec': 'scfec'
281                         }
282         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
283                              res['interface'][0])
284
285         self.assertDictEqual(input_dict_2,
286                              res['interface'][0]
287                              ['org-openroadm-otn-otu-interfaces:otu'])
288
289     def test_16_check_interface_och_spdrc(self):
290         response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-1")
291         self.assertEqual(response.status_code, requests.codes.ok)
292         res = response.json()
293         self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-1',
294                                    'administrative-state': 'inService',
295                                    'supporting-circuit-pack-name': 'CP1-CFP0',
296                                    'type': 'org-openroadm-interfaces:opticalChannel',
297                                    'supporting-port': 'CP1-CFP0-P1'
298                                    }, **res['interface'][0]),
299                              res['interface'][0])
300
301         self.assertDictEqual(
302             {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
303              u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
304             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
305
306     def test_17_check_interface_OTU4_spdrc(self):
307         response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-OTU")
308         self.assertEqual(response.status_code, requests.codes.ok)
309         res = response.json()
310         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
311                         'administrative-state': 'inService',
312                         'supporting-circuit-pack-name': 'CP1-CFP0',
313                         'supporting-interface': 'XPDR1-NETWORK1-1',
314                         'type': 'org-openroadm-interfaces:otnOtu',
315                         'supporting-port': 'CP1-CFP0-P1'
316                         }
317         input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
318                         'expected-sapi': 'Swfw02qXGyI=',
319                         'tx-sapi': 'fuYZwEO660g=',
320                         'expected-dapi': 'fuYZwEO660g=',
321                         'rate': 'org-openroadm-otn-common-types:OTU4',
322                         'fec': 'scfec'
323                         }
324
325         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
326                              res['interface'][0])
327
328         self.assertDictEqual(input_dict_2,
329                              res['interface'][0]
330                              ['org-openroadm-otn-otu-interfaces:otu'])
331
332     def test_18_check_no_interface_ODU4_spdra(self):
333         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
334         self.assertEqual(response.status_code, requests.codes.not_found)
335         res = response.json()
336         self.assertIn(
337             {"error-type": "application", "error-tag": "data-missing",
338              "error-message": "Request could not be completed because the relevant data model content does not exist"},
339             res['errors']['error'])
340
341     def test_19_check_openroadm_topo_spdra(self):
342         response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
343         self.assertEqual(response.status_code, requests.codes.ok)
344         res = response.json()
345         ele = res['node'][0]['ietf-network-topology:termination-point'][0]
346         self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
347         self.assertEqual({u'frequency': 196.1,
348                           u'width': 40},
349                          ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
350         time.sleep(3)
351
352     def test_20_check_openroadm_topo_ROADMA_SRG(self):
353         response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
354         self.assertEqual(response.status_code, requests.codes.ok)
355         res = response.json()
356         self.assertNotIn({u'index': 1},
357                          res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
358         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
359         for ele in liste_tp:
360             if ele['tp-id'] == 'SRG1-PP1-TXRX':
361                 self.assertIn({u'index': 1, u'frequency': 196.1,
362                                u'width': 40},
363                               ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
364             if ele['tp-id'] == 'SRG1-PP2-TXRX':
365                 self.assertNotIn('used-wavelength', dict.keys(ele))
366         time.sleep(3)
367
368     def test_21_check_openroadm_topo_ROADMA_DEG(self):
369         response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
370         self.assertEqual(response.status_code, requests.codes.ok)
371         res = response.json()
372         self.assertNotIn({u'index': 1},
373                          res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
374         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
375         for ele in liste_tp:
376             if ele['tp-id'] == 'DEG2-CTP-TXRX':
377                 self.assertIn({u'index': 1, u'frequency': 196.1,
378                                u'width': 40},
379                               ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
380             if ele['tp-id'] == 'DEG2-TTP-TXRX':
381                 self.assertIn({u'index': 1, u'frequency': 196.1,
382                                u'width': 40},
383                               ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
384         time.sleep(3)
385
386     def test_22_check_otn_topo_otu4_links(self):
387         response = test_utils.get_otn_topo_request()
388         self.assertEqual(response.status_code, requests.codes.ok)
389         res = response.json()
390         nb_links = len(res['network'][0]['ietf-network-topology:link'])
391         self.assertEqual(nb_links, 2)
392         listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
393                       'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
394         for link in res['network'][0]['ietf-network-topology:link']:
395             self.assertIn(link['link-id'], listLinkId)
396             self.assertEqual(link['transportpce-topology:otn-link-type'], 'OTU4')
397             self.assertEqual(link['org-openroadm-common-network:link-type'], 'OTN-LINK')
398             self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
399             self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
400             self.assertIn(link['org-openroadm-common-network:opposite-link'], listLinkId)
401
402 # test service-create for ODU4 service from spdr to spdr
403     def test_23_create_ODU4_service(self):
404         self.cr_serv_sample_data["input"]["service-name"] = "service1-ODU4"
405         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
406         del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
407         self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
408         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
409         del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
410         self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
411
412         response = test_utils.service_create_request(self.cr_serv_sample_data)
413         self.assertEqual(response.status_code, requests.codes.ok)
414         res = response.json()
415         self.assertIn('PCE calculation in progress',
416                       res['output']['configuration-response-common']['response-message'])
417         time.sleep(self.WAITING)
418
419     def test_24_get_ODU4_service1(self):
420         response = test_utils.get_service_list_request("services/service1-ODU4")
421         self.assertEqual(response.status_code, requests.codes.ok)
422         res = response.json()
423         self.assertEqual(
424             res['services'][0]['administrative-state'], 'inService')
425         self.assertEqual(
426             res['services'][0]['service-name'], 'service1-ODU4')
427         self.assertEqual(
428             res['services'][0]['connection-type'], 'infrastructure')
429         self.assertEqual(
430             res['services'][0]['lifecycle-state'], 'planned')
431         time.sleep(2)
432
433     def test_25_check_interface_ODU4_spdra(self):
434         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
435         self.assertEqual(response.status_code, requests.codes.ok)
436         res = response.json()
437         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
438                         'administrative-state': 'inService',
439                         'supporting-circuit-pack-name': 'CP1-CFP0',
440                         'supporting-interface': 'XPDR1-NETWORK1-OTU',
441                         'type': 'org-openroadm-interfaces:otnOdu',
442                         'supporting-port': 'CP1-CFP0-P1'}
443         # SAPI/DAPI are added in the Otu4 renderer
444         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
445                         'rate': 'org-openroadm-otn-common-types:ODU4',
446                         'expected-dapi': 'Swfw02qXGyI=',
447                         'expected-sapi': 'fuYZwEO660g=',
448                         'tx-dapi': 'fuYZwEO660g=',
449                         'tx-sapi': 'Swfw02qXGyI='}
450
451         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
452                              res['interface'][0])
453         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
454                                   **input_dict_2),
455                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
456                              )
457         self.assertDictEqual(
458             {u'payload-type': u'21', u'exp-payload-type': u'21'},
459             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
460
461     def test_26_check_interface_ODU4_spdrc(self):
462         response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
463         self.assertEqual(response.status_code, requests.codes.ok)
464         res = response.json()
465         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
466                         'administrative-state': 'inService',
467                         'supporting-circuit-pack-name': 'CP1-CFP0',
468                         'supporting-interface': 'XPDR1-NETWORK1-OTU',
469                         'type': 'org-openroadm-interfaces:otnOdu',
470                         'supporting-port': 'CP1-CFP0-P1'}
471         # SAPI/DAPI are added in the Otu4 renderer
472         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
473                         'rate': 'org-openroadm-otn-common-types:ODU4',
474                         'tx-sapi': 'fuYZwEO660g=',
475                         'tx-dapi': 'Swfw02qXGyI=',
476                         'expected-sapi': 'Swfw02qXGyI=',
477                         'expected-dapi': 'fuYZwEO660g='
478                         }
479         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
480                              res['interface'][0])
481         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
482                                   **input_dict_2),
483                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
484                              )
485         self.assertDictEqual(
486             {u'payload-type': u'21', u'exp-payload-type': u'21'},
487             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
488
489     def test_27_check_otn_topo_links(self):
490         response = test_utils.get_otn_topo_request()
491         self.assertEqual(response.status_code, requests.codes.ok)
492         res = response.json()
493         nb_links = len(res['network'][0]['ietf-network-topology:link'])
494         self.assertEqual(nb_links, 4)
495         for link in res['network'][0]['ietf-network-topology:link']:
496             linkId = link['link-id']
497             if (linkId == 'OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1' or
498                     linkId == 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'):
499                 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
500                 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
501             elif (linkId == 'ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1' or
502                   linkId == 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'):
503                 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
504                 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
505                 self.assertEqual(link['transportpce-topology:otn-link-type'], 'ODTU4')
506                 self.assertEqual(link['org-openroadm-common-network:link-type'], 'OTN-LINK')
507                 self.assertIn(link['org-openroadm-common-network:opposite-link'],
508                               ['ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
509                                'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
510             else:
511                 self.fail("this link should not exist")
512
513     def test_28_check_otn_topo_tp(self):
514         response = test_utils.get_otn_topo_request()
515         res = response.json()
516         for node in res['network'][0]['node']:
517             if (node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1'):
518                 tpList = node['ietf-network-topology:termination-point']
519                 for tp in tpList:
520                     if (tp['tp-id'] == 'XPDR1-NETWORK1'):
521                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
522                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
523                         self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
524                         self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
525                                          'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
526
527 # test service-create for 10GE service from spdr to spdr
528     def test_29_create_10GE_service(self):
529         self.cr_serv_sample_data["input"]["service-name"] = "service1-10GE"
530         self.cr_serv_sample_data["input"]["connection-type"] = "service"
531         self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "10"
532         self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
533         del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
534         self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
535         self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
536         self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "10"
537         self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
538         del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
539         self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
540         self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
541         response = test_utils.service_create_request(self.cr_serv_sample_data)
542         self.assertEqual(response.status_code, requests.codes.ok)
543         res = response.json()
544         self.assertIn('PCE calculation in progress',
545                       res['output']['configuration-response-common']['response-message'])
546         time.sleep(self.WAITING)
547
548     def test_30_get_10GE_service1(self):
549         response = test_utils.get_service_list_request("services/service1-10GE")
550         self.assertEqual(response.status_code, requests.codes.ok)
551         res = response.json()
552         self.assertEqual(
553             res['services'][0]['administrative-state'], 'inService')
554         self.assertEqual(
555             res['services'][0]['service-name'], 'service1-10GE')
556         self.assertEqual(
557             res['services'][0]['connection-type'], 'service')
558         self.assertEqual(
559             res['services'][0]['lifecycle-state'], 'planned')
560         time.sleep(2)
561
562     def test_31_check_interface_10GE_CLIENT_spdra(self):
563         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
564         self.assertEqual(response.status_code, requests.codes.ok)
565         res = response.json()
566         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
567                       'administrative-state': 'inService',
568                       'supporting-circuit-pack-name': 'CP1-SFP4',
569                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
570                       'supporting-port': 'CP1-SFP4-P1'
571                       }
572         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
573                              res['interface'][0])
574         self.assertDictEqual(
575             {u'speed': 10000},
576             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
577
578     def test_32_check_interface_ODU2E_CLIENT_spdra(self):
579         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
580         self.assertEqual(response.status_code, requests.codes.ok)
581         res = response.json()
582         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
583                         'administrative-state': 'inService',
584                         'supporting-circuit-pack-name': 'CP1-SFP4',
585                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
586                         'type': 'org-openroadm-interfaces:otnOdu',
587                         'supporting-port': 'CP1-SFP4-P1'}
588         input_dict_2 = {
589             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
590             'rate': 'org-openroadm-otn-common-types:ODU2e',
591             'monitoring-mode': 'terminated'}
592
593         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
594                              res['interface'][0])
595         self.assertDictEqual(dict(input_dict_2,
596                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
597                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
598         self.assertDictEqual(
599             {u'payload-type': u'03', u'exp-payload-type': u'03'},
600             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
601
602     def test_33_check_interface_ODU2E_NETWORK_spdra(self):
603         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
604         self.assertEqual(response.status_code, requests.codes.ok)
605         res = response.json()
606         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
607                         'administrative-state': 'inService',
608                         'supporting-circuit-pack-name': 'CP1-CFP0',
609                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
610                         'type': 'org-openroadm-interfaces:otnOdu',
611                         'supporting-port': 'CP1-CFP0-P1'}
612         input_dict_2 = {
613             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
614             'rate': 'org-openroadm-otn-common-types:ODU2e',
615             'monitoring-mode': 'monitored'}
616         input_dict_3 = {'trib-port-number': 1}
617
618         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
619                              res['interface'][0])
620         self.assertDictEqual(dict(input_dict_2,
621                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
622                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
623         self.assertDictEqual(dict(input_dict_3,
624                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
625                                       'parent-odu-allocation']),
626                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
627         self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
628                       ['trib-slots'])
629
630     def test_34_check_ODU2E_connection_spdra(self):
631         response = test_utils.check_netconf_node_request(
632             "SPDR-SA1",
633             "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
634         self.assertEqual(response.status_code, requests.codes.ok)
635         res = response.json()
636         input_dict_1 = {
637             'connection-name':
638             'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
639             'direction': 'bidirectional'
640         }
641
642         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
643                              res['odu-connection'][0])
644         self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1-10GE'},
645                              res['odu-connection'][0]['destination'])
646         self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1-10GE'},
647                              res['odu-connection'][0]['source'])
648
649     def test_35_check_interface_10GE_CLIENT_spdrc(self):
650         response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-CLIENT1-ETHERNET10G")
651         self.assertEqual(response.status_code, requests.codes.ok)
652         res = response.json()
653         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
654                       'administrative-state': 'inService',
655                       'supporting-circuit-pack-name': 'CP1-SFP4',
656                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
657                       'supporting-port': 'CP1-SFP4-P1'
658                       }
659         self.assertDictEqual(dict(input_dict, **res['interface'][0]),
660                              res['interface'][0])
661         self.assertDictEqual(
662             {u'speed': 10000},
663             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
664
665     def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
666         response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
667         self.assertEqual(response.status_code, requests.codes.ok)
668         res = response.json()
669         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
670                         'administrative-state': 'inService',
671                         'supporting-circuit-pack-name': 'CP1-SFP4',
672                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
673                         'type': 'org-openroadm-interfaces:otnOdu',
674                         'supporting-port': 'CP1-SFP4-P1'}
675         input_dict_2 = {
676             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
677             'rate': 'org-openroadm-otn-common-types:ODU2e',
678             'monitoring-mode': 'terminated'}
679
680         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
681                              res['interface'][0])
682         self.assertDictEqual(dict(input_dict_2,
683                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
684                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
685         self.assertDictEqual(
686             {u'payload-type': u'03', u'exp-payload-type': u'03'},
687             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
688
689     def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
690         response = test_utils.check_netconf_node_request("SPDR-SC1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
691         self.assertEqual(response.status_code, requests.codes.ok)
692         res = response.json()
693         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
694                         'administrative-state': 'inService',
695                         'supporting-circuit-pack-name': 'CP1-CFP0',
696                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
697                         'type': 'org-openroadm-interfaces:otnOdu',
698                         'supporting-port': 'CP1-CFP0-P1'}
699         input_dict_2 = {
700             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
701             'rate': 'org-openroadm-otn-common-types:ODU2e',
702             'monitoring-mode': 'monitored'}
703
704         input_dict_3 = {'trib-port-number': 1}
705
706         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
707                              res['interface'][0])
708         self.assertDictEqual(dict(input_dict_2,
709                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
710                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
711         self.assertDictEqual(dict(input_dict_3,
712                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
713                                       'parent-odu-allocation']),
714                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
715             'parent-odu-allocation'])
716         self.assertIn(1,
717                       res['interface'][0][
718                           'org-openroadm-otn-odu-interfaces:odu'][
719                           'parent-odu-allocation']['trib-slots'])
720
721     def test_38_check_ODU2E_connection_spdrc(self):
722         response = test_utils.check_netconf_node_request(
723             "SPDR-SC1",
724             "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
725         self.assertEqual(response.status_code, requests.codes.ok)
726         res = response.json()
727         input_dict_1 = {
728             'connection-name':
729             'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
730             'direction': 'bidirectional'
731         }
732
733         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
734                              res['odu-connection'][0])
735         self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1-10GE'},
736                              res['odu-connection'][0]['destination'])
737         self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1-10GE'},
738                              res['odu-connection'][0]['source'])
739
740     def test_39_check_otn_topo_links(self):
741         response = test_utils.get_otn_topo_request()
742         self.assertEqual(response.status_code, requests.codes.ok)
743         res = response.json()
744         nb_links = len(res['network'][0]['ietf-network-topology:link'])
745         self.assertEqual(nb_links, 4)
746         for link in res['network'][0]['ietf-network-topology:link']:
747             linkId = link['link-id']
748             if (linkId == 'ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1' or
749                     linkId == 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'):
750                 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
751                 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
752
753     def test_40_check_otn_topo_tp(self):
754         response = test_utils.get_otn_topo_request()
755         res = response.json()
756         for node in res['network'][0]['node']:
757             if (node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1'):
758                 tpList = node['ietf-network-topology:termination-point']
759                 for tp in tpList:
760                     if (tp['tp-id'] == 'XPDR1-NETWORK1'):
761                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
762                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
763                         tsPoolList = [i for i in range(1, 9)]
764                         self.assertNotIn(tsPoolList, xpdrTpPortConAt['ts-pool'])
765                         self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
766                         self.assertNotIn(1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
767
768     def test_41_delete_10GE_service(self):
769         response = test_utils.service_delete_request("service1-10GE")
770         self.assertEqual(response.status_code, requests.codes.ok)
771         res = response.json()
772         self.assertIn('Renderer service delete in progress',
773                       res['output']['configuration-response-common']['response-message'])
774         time.sleep(20)
775
776     def test_42_check_service_list(self):
777         response = test_utils.get_service_list_request("")
778         self.assertEqual(response.status_code, requests.codes.ok)
779         res = response.json()
780         self.assertEqual(len(res['service-list']['services']), 2)
781         time.sleep(2)
782
783     def test_43_check_no_ODU2e_connection_spdra(self):
784         response = test_utils.check_netconf_node_request("SPDR-SA1", "")
785         self.assertEqual(response.status_code, requests.codes.ok)
786         res = response.json()
787         self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
788         time.sleep(1)
789
790     def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
791         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
792         self.assertEqual(response.status_code, requests.codes.not_found)
793
794     def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
795         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
796         self.assertEqual(response.status_code, requests.codes.not_found)
797
798     def test_46_check_no_interface_10GE_CLIENT_spdra(self):
799         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
800         self.assertEqual(response.status_code, requests.codes.not_found)
801
802     def test_47_check_otn_topo_links(self):
803         response = test_utils.get_otn_topo_request()
804         self.assertEqual(response.status_code, requests.codes.ok)
805         res = response.json()
806         nb_links = len(res['network'][0]['ietf-network-topology:link'])
807         self.assertEqual(nb_links, 4)
808         for link in res['network'][0]['ietf-network-topology:link']:
809             linkId = link['link-id']
810             if (linkId == 'ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1' or
811                     linkId == 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'):
812                 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
813                 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
814
815     def test_48_check_otn_topo_tp(self):
816         response = test_utils.get_otn_topo_request()
817         res = response.json()
818         for node in res['network'][0]['node']:
819             if (node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1'):
820                 tpList = node['ietf-network-topology:termination-point']
821                 for tp in tpList:
822                     if (tp['tp-id'] == 'XPDR1-NETWORK1'):
823                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
824                         self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
825                         self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
826
827     def test_49_delete_ODU4_service(self):
828         response = test_utils.service_delete_request("service1-ODU4")
829         self.assertEqual(response.status_code, requests.codes.ok)
830         res = response.json()
831         self.assertIn('Renderer service delete in progress',
832                       res['output']['configuration-response-common']['response-message'])
833         time.sleep(20)
834
835     def test_50_check_service_list(self):
836         response = test_utils.get_service_list_request("")
837         self.assertEqual(response.status_code, requests.codes.ok)
838         res = response.json()
839         self.assertEqual(len(res['service-list']['services']), 1)
840         time.sleep(2)
841
842     def test_51_check_no_interface_ODU4_spdra(self):
843         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
844         self.assertEqual(response.status_code, requests.codes.not_found)
845
846     def test_52_check_otn_topo_links(self):
847         self.test_22_check_otn_topo_otu4_links()
848
849     def test_53_check_otn_topo_tp(self):
850         response = test_utils.get_otn_topo_request()
851         res = response.json()
852         for node in res['network'][0]['node']:
853             if (node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1'):
854                 tpList = node['ietf-network-topology:termination-point']
855                 for tp in tpList:
856                     if (tp['tp-id'] == 'XPDR1-NETWORK1'):
857                         xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
858                         self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
859                         self.assertNotIn('odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
860
861     def test_54_delete_OCH_OTU4_service(self):
862         response = test_utils.service_delete_request("service1-OCH-OTU4")
863         self.assertEqual(response.status_code, requests.codes.ok)
864         res = response.json()
865         self.assertIn('Renderer service delete in progress',
866                       res['output']['configuration-response-common']['response-message'])
867         time.sleep(20)
868
869     def test_55_get_no_service(self):
870         response = test_utils.get_service_list_request("")
871         self.assertEqual(response.status_code, requests.codes.not_found)
872         res = response.json()
873         self.assertIn(
874             {"error-type": "application", "error-tag": "data-missing",
875              "error-message": "Request could not be completed because the relevant data model content does not exist"},
876             res['errors']['error'])
877         time.sleep(1)
878
879     def test_56_check_no_interface_OTU4_spdra(self):
880         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
881         self.assertEqual(response.status_code, requests.codes.not_found)
882
883     def test_57_check_no_interface_OCH_spdra(self):
884         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
885         self.assertEqual(response.status_code, requests.codes.not_found)
886
887     def test_58_getLinks_OtnTopology(self):
888         response = test_utils.get_otn_topo_request()
889         self.assertEqual(response.status_code, requests.codes.ok)
890         res = response.json()
891         self.assertNotIn('ietf-network-topology:link', res['network'][0])
892
893     def test_59_check_openroadm_topo_spdra(self):
894         response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
895         self.assertEqual(response.status_code, requests.codes.ok)
896         res = response.json()
897         tp = res['node'][0]['ietf-network-topology:termination-point'][0]
898         self.assertEqual('XPDR1-NETWORK1', tp['tp-id'])
899         self.assertNotIn('wavelength', dict.keys(
900             tp[u'org-openroadm-network-topology:xpdr-network-attributes']))
901         time.sleep(3)
902
903     def test_60_check_openroadm_topo_ROADMA_SRG(self):
904         response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
905         self.assertEqual(response.status_code, requests.codes.ok)
906         res = response.json()
907         self.assertIn({u'index': 1},
908                       res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
909         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
910         for ele in liste_tp:
911             if ele['tp-id'] == 'SRG1-PP1-TXRX':
912                 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
913         time.sleep(3)
914
915     def test_61_check_openroadm_topo_ROADMA_DEG(self):
916         response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
917         self.assertEqual(response.status_code, requests.codes.ok)
918         res = response.json()
919         self.assertIn({u'index': 1},
920                       res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
921         liste_tp = res['node'][0]['ietf-network-topology:termination-point']
922         for ele in liste_tp:
923             if ele['tp-id'] == 'DEG2-CTP-TXRX':
924                 self.assertNotIn('org-openroadm-network-topology:ctp-attributes', dict.keys(ele))
925             if ele['tp-id'] == 'DEG2-TTP-TXRX':
926                 self.assertNotIn('org-openroadm-network-topology:tx-ttp-attributes', dict.keys(ele))
927         time.sleep(3)
928
929     def test_62_disconnect_spdrA(self):
930         response = test_utils.unmount_device("SPDR-SA1")
931         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
932
933     def test_63_disconnect_spdrC(self):
934         response = test_utils.unmount_device("SPDR-SC1")
935         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
936
937     def test_64_disconnect_roadmA(self):
938         response = test_utils.unmount_device("ROADM-A1")
939         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
940
941     def test_65_disconnect_roadmC(self):
942         response = test_utils.unmount_device("ROADM-C1")
943         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
944
945
if __name__ == '__main__':
    # Executed as a script: run the tests of this module with verbosity 2.
    unittest.main(verbosity=2)