fix some pylint issues
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_otn_renderer.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import unittest
13 import json
14 import time
15 import requests
16 from common import test_utils
17
18
19 def extract_a_from_b(a, b):
20     return dict([(i, b[i]) for i in b.keys() if i in a.keys()])
21
22
23 class TransportPCEtesting(unittest.TestCase):
24
25     processes = None
26
27     @classmethod
28     def setUpClass(cls):
29         cls.processes = test_utils.start_tpce()
30         cls.processes = test_utils.start_sims(['spdrav2'])
31
32     @classmethod
33     def tearDownClass(cls):
34         for process in cls.processes:
35             test_utils.shutdown_process(process)
36         print("all processes killed")
37
38     def setUp(self):
39         time.sleep(5)
40
41     def test_01_connect_SPDR_SA1(self):
42         response = test_utils.mount_device("SPDR-SA1", 'spdrav2')
43         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
44         time.sleep(10)
45
46         url = ("{}/operational/network-topology:"
47                "network-topology/topology/topology-netconf/node/SPDR-SA1"
48                .format(test_utils.RESTCONF_BASE_URL))
49         response = requests.request(
50             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
51         self.assertEqual(response.status_code, requests.codes.ok)
52         res = response.json()
53         self.assertEqual(
54             res['node'][0]['netconf-node-topology:connection-status'],
55             'connected')
56
57     def test_02_get_portmapping_CLIENT1(self):
58         url = ("{}/config/transportpce-portmapping:network/"
59                "nodes/SPDR-SA1/mapping/XPDR1-CLIENT1"
60                .format(test_utils.RESTCONF_BASE_URL))
61         response = requests.request(
62             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
63         self.assertEqual(response.status_code, requests.codes.ok)
64         res = response.json()
65         self.assertIn(
66             {'supported-interface-capability': [
67                 'org-openroadm-port-types:if-10GE-ODU2e',
68                 'org-openroadm-port-types:if-10GE-ODU2',
69                 'org-openroadm-port-types:if-10GE'],
70              'supporting-port': 'CP1-SFP4-P1',
71              'supporting-circuit-pack-name': 'CP1-SFP4',
72              'logical-connection-point': 'XPDR1-CLIENT1',
73              'port-direction': 'bidirectional',
74              'port-qual': 'xpdr-client',
75              'lcp-hash-val': '8b3efff522736722500b5e68fb6e696e'},
76             res['mapping'])
77
78     def test_03_get_portmapping_NETWORK1(self):
79         url = ("{}/config/transportpce-portmapping:network/"
80                "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
81                .format(test_utils.RESTCONF_BASE_URL))
82         response = requests.request(
83             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
84         self.assertEqual(response.status_code, requests.codes.ok)
85         res = response.json()
86         self.assertIn(
87             {"logical-connection-point": "XPDR1-NETWORK1",
88              "supporting-port": "CP1-CFP0-P1",
89              "supported-interface-capability": [
90                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
91              ],
92                 "port-direction": "bidirectional",
93                 "port-qual": "xpdr-network",
94                 "supporting-circuit-pack-name": "CP1-CFP0",
95                 "xponder-type": "mpdr",
96              'lcp-hash-val': '1021db8d2affe7386705c438c67ea21f'},
97             res['mapping'])
98
99     def test_04_service_path_create_OCH_OTU4(self):
100         url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
101         data = {"renderer:input": {
102             "service-name": "service_ODU4",
103             "wave-number": "1",
104             "modulation-format": "qpsk",
105             "operation": "create",
106             "nodes": [
107                 {"node-id": "SPDR-SA1",
108                  "dest-tp": "XPDR1-NETWORK1"}]}}
109         response = requests.request(
110             "POST", url, data=json.dumps(data),
111             headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
112         time.sleep(3)
113         self.assertEqual(response.status_code, requests.codes.ok)
114         res = response.json()
115         self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
116         self.assertTrue(res["output"]["success"])
117         self.assertIn(
118             {'node-id': 'SPDR-SA1',
119              'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
120              'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])
121
122     def test_05_get_portmapping_NETWORK1(self):
123         url = ("{}/config/transportpce-portmapping:network/"
124                "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
125                .format(test_utils.RESTCONF_BASE_URL))
126         response = requests.request(
127             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
128         self.assertEqual(response.status_code, requests.codes.ok)
129         res = response.json()
130         self.assertIn(
131             {"logical-connection-point": "XPDR1-NETWORK1",
132              "supporting-port": "CP1-CFP0-P1",
133              "supported-interface-capability": [
134                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
135              ],
136                 "port-direction": "bidirectional",
137                 "port-qual": "xpdr-network",
138                 "supporting-circuit-pack-name": "CP1-CFP0",
139                 "xponder-type": "mpdr",
140                 "lcp-hash-val": "1021db8d2affe7386705c438c67ea21f"},
141             res['mapping'])
142
143     def test_06_check_interface_och(self):
144         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
145                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
146                "interface/XPDR1-NETWORK1-1"
147                .format(test_utils.RESTCONF_BASE_URL))
148         response = requests.request(
149             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
150         self.assertEqual(response.status_code, requests.codes.ok)
151         res = response.json()
152
153         input_dict = {'name': 'XPDR1-NETWORK1-1',
154                       'administrative-state': 'inService',
155                       'supporting-circuit-pack-name': 'CP1-CFP0',
156                       'type': 'org-openroadm-interfaces:opticalChannel',
157                       'supporting-port': 'CP1-CFP0-P1'
158                       }
159         # assertDictContainsSubset is deprecated
160         '''
161         self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-1', 'administrative-state': 'inService',
162                                        'supporting-circuit-pack-name': 'CP1-CFP0',
163                                        'type': 'org-openroadm-interfaces:opticalChannel',
164                                        'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
165         '''
166         self.assertDictEqual(input_dict,
167                              extract_a_from_b(input_dict,
168                                               res['interface'][0])
169                              )
170         self.assertDictEqual(
171             {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
172              u'transmit-power': -5},
173             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
174
175     def test_07_check_interface_OTU(self):
176         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
177                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
178                "interface/XPDR1-NETWORK1-OTU"
179                .format(test_utils.RESTCONF_BASE_URL))
180         response = requests.request(
181             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
182         self.assertEqual(response.status_code, requests.codes.ok)
183         res = response.json()
184         input_dict = {'name': 'XPDR1-NETWORK1-OTU',
185                       'administrative-state': 'inService',
186                       'supporting-circuit-pack-name': 'CP1-CFP0',
187                       'supporting-interface': 'XPDR1-NETWORK1-1',
188                       'type': 'org-openroadm-interfaces:otnOtu',
189                       'supporting-port': 'CP1-CFP0-P1'}
190
191         # assertDictContainsSubset is deprecated
192         '''
193         self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-OTU', 'administrative-state': 'inService',
194                                        'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-1',
195                                        'type': 'org-openroadm-interfaces:otnOtu',
196                                        'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
197         '''
198         self.assertDictEqual(input_dict, extract_a_from_b(input_dict,
199                                                           res['interface'][0])
200                              )
201
202         self.assertDictEqual(
203             {u'rate': u'org-openroadm-otn-common-types:OTU4',
204              u'fec': u'scfec'},
205             res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
206
207     def test_08_otn_service_path_create_ODU4(self):
208         url = "{}/operations/transportpce-device-renderer:otn-service-path".format(test_utils.RESTCONF_BASE_URL)
209         data = {"renderer:input": {
210             "service-name": "service_ODU4",
211             "operation": "create",
212             "service-rate": "100G",
213             "service-type": "ODU",
214             "nodes": [
215                 {"node-id": "SPDR-SA1",
216                  "network-tp": "XPDR1-NETWORK1"}]}}
217         response = requests.request(
218             "POST", url, data=json.dumps(data),
219             headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
220         time.sleep(3)
221         self.assertEqual(response.status_code, requests.codes.ok)
222         res = response.json()
223         self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
224         self.assertTrue(res["output"]["success"])
225         self.assertIn(
226             {'node-id': 'SPDR-SA1',
227              'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
228
229     def test_09_get_portmapping_NETWORK1(self):
230         url = ("{}/config/transportpce-portmapping:network/"
231                "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
232                .format(test_utils.RESTCONF_BASE_URL))
233         response = requests.request(
234             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
235         self.assertEqual(response.status_code, requests.codes.ok)
236         res = response.json()
237         self.assertIn(
238             {"logical-connection-point": "XPDR1-NETWORK1",
239              "supporting-port": "CP1-CFP0-P1",
240              "supported-interface-capability": [
241                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
242              ],
243                 "port-direction": "bidirectional",
244                 "port-qual": "xpdr-network",
245                 "supporting-circuit-pack-name": "CP1-CFP0",
246                 "xponder-type": "mpdr",
247                 "supporting-odu4": "XPDR1-NETWORK1-ODU4",
248                 "lcp-hash-val": "1021db8d2affe7386705c438c67ea21f"
249              },
250             res['mapping'])
251
252     def test_10_check_interface_ODU4(self):
253         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
254                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
255                "interface/XPDR1-NETWORK1-ODU4"
256                .format(test_utils.RESTCONF_BASE_URL))
257         response = requests.request(
258             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
259         self.assertEqual(response.status_code, requests.codes.ok)
260         res = response.json()
261         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
262                         'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
263                         'type': 'org-openroadm-interfaces:otnOdu',
264                         'supporting-port': 'CP1-CFP0-P1'}
265         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
266                         'rate': 'org-openroadm-otn-common-types:ODU4'}
267
268         self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
269                                                             res['interface'][0])
270                              )
271         self.assertDictEqual(input_dict_2,
272                              extract_a_from_b(input_dict_2,
273                                               res['interface'][0][
274                                                   'org-openroadm-otn-odu-interfaces:odu'])
275
276                              )
277         '''
278         self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
279                                        'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
280                                        'type': 'org-openroadm-interfaces:otnOdu',
281                                        'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
282         self.assertDictContainsSubset(
283             {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
284              'rate': 'org-openroadm-otn-common-types:ODU4'},
285             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
286         '''
287         self.assertDictEqual(
288             {u'payload-type': u'21', u'exp-payload-type': u'21'},
289             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
290
291     def test_11_otn_service_path_create_10GE(self):
292         url = "{}/operations/transportpce-device-renderer:otn-service-path".format(test_utils.RESTCONF_BASE_URL)
293         data = {"renderer:input": {
294             "service-name": "service1",
295             "operation": "create",
296             "service-rate": "10G",
297             "service-type": "Ethernet",
298             "ethernet-encoding": "eth encode",
299             "trib-slot": ["1"],
300             "trib-port-number": "1",
301             "nodes": [
302                 {"node-id": "SPDR-SA1",
303                  "client-tp": "XPDR1-CLIENT1",
304                  "network-tp": "XPDR1-NETWORK1"}]}}
305         response = requests.request(
306             "POST", url, data=json.dumps(data),
307             headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
308         time.sleep(3)
309         self.assertEqual(response.status_code, requests.codes.ok)
310         res = response.json()
311         self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
312         self.assertTrue(res["output"]["success"])
313         self.assertIn(
314             {'node-id': 'SPDR-SA1',
315              'connection-id': ['XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1'],
316              'odu-interface-id': ['XPDR1-NETWORK1-ODU2e-service1', 'XPDR1-CLIENT1-ODU2e-service1'],
317              'eth-interface-id': ['XPDR1-CLIENT1-ETHERNET10G']}, res["output"]['node-interface'])
318
319     def test_12_check_interface_10GE_CLIENT(self):
320         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
321                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
322                "interface/XPDR1-CLIENT1-ETHERNET10G"
323                .format(test_utils.RESTCONF_BASE_URL))
324         response = requests.request(
325             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
326         self.assertEqual(response.status_code, requests.codes.ok)
327         res = response.json()
328         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
329                       'administrative-state': 'inService',
330                       'supporting-circuit-pack-name': 'CP1-SFP4',
331                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
332                       'supporting-port': 'CP1-SFP4-P1'
333                       }
334
335         '''
336         self.assertDictContainsSubset({'name': 'XPDR1-CLIENT1-ETHERNET10G', 'administrative-state': 'inService',
337                                        'supporting-circuit-pack-name': 'CP1-SFP4',
338                                        'type': 'org-openroadm-interfaces:ethernetCsmacd',
339                                        'supporting-port': 'CP1-SFP4-P1'}, res['interface'][0])
340         '''
341         self.assertDictEqual(input_dict, extract_a_from_b(input_dict,
342                                                           res['interface'][0])
343                              )
344         self.assertDictEqual(
345             {u'speed': 10000},
346             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
347
348     def test_13_check_interface_ODU2E_CLIENT(self):
349         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
350                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
351                "interface/XPDR1-CLIENT1-ODU2e-service1"
352                .format(test_utils.RESTCONF_BASE_URL))
353         response = requests.request(
354             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
355         self.assertEqual(response.status_code, requests.codes.ok)
356         res = response.json()
357
358         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1',
359                         'administrative-state': 'inService',
360                         'supporting-circuit-pack-name': 'CP1-SFP4',
361                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
362                         'type': 'org-openroadm-interfaces:otnOdu',
363                         'supporting-port': 'CP1-SFP4-P1'}
364         input_dict_2 = {
365             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
366             'rate': 'org-openroadm-otn-common-types:ODU2e',
367             'monitoring-mode': 'terminated'}
368
369         self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
370                                                             res['interface'][0])
371                              )
372         self.assertDictEqual(input_dict_2,
373                              extract_a_from_b(input_dict_2, res['interface'][0][
374                                  'org-openroadm-otn-odu-interfaces:odu'])
375                              )
376
377         '''
378         self.assertDictContainsSubset({'name': 'XPDR1-CLIENT1-ODU2e-service1', 'administrative-state': 'inService',
379                                        'supporting-circuit-pack-name': 'CP1-SFP4',
380                                        'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
381                                        'type': 'org-openroadm-interfaces:otnOdu',
382                                        'supporting-port': 'CP1-SFP4-P1'}, res['interface'][0])
383         self.assertDictContainsSubset({
384             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
385             'rate': 'org-openroadm-otn-common-types:ODU2e',
386             'monitoring-mode': 'terminated'}, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
387         '''
388         self.assertDictEqual(
389             {u'payload-type': u'03', u'exp-payload-type': u'03'},
390             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
391
392     def test_14_check_interface_ODU2E_NETWORK(self):
393         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
394                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
395                "interface/XPDR1-NETWORK1-ODU2e-service1"
396                .format(test_utils.RESTCONF_BASE_URL))
397         response = requests.request(
398             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
399         self.assertEqual(response.status_code, requests.codes.ok)
400         res = response.json()
401         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
402                         'supporting-circuit-pack-name': 'CP1-CFP0',
403                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
404                         'type': 'org-openroadm-interfaces:otnOdu',
405                         'supporting-port': 'CP1-CFP0-P1'}
406         input_dict_2 = {
407             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
408             'rate': 'org-openroadm-otn-common-types:ODU2e',
409             'monitoring-mode': 'monitored'}
410
411         input_dict_3 = {'trib-port-number': 1}
412
413         self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
414                                                             res['interface'][0])
415                              )
416
417         self.assertDictEqual(input_dict_2,
418                              extract_a_from_b(input_dict_2,
419                                               res['interface'][0][
420                                                   'org-openroadm-otn-odu-interfaces:odu']
421                                               ))
422
423         self.assertDictEqual(input_dict_3,
424                              extract_a_from_b(input_dict_3,
425                                               res['interface'][0][
426                                                   'org-openroadm-otn-odu-interfaces:odu'][
427                                                   'parent-odu-allocation']))
428
429         '''
430         self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
431                                        'supporting-circuit-pack-name': 'CP1-CFP0',
432                                        'supporting-interface': 'XPDR1-NETWORK1-ODU4',
433                                        'type': 'org-openroadm-interfaces:otnOdu',
434                                        'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
435         self.assertDictContainsSubset({
436             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
437             'rate': 'org-openroadm-otn-common-types:ODU2e',
438             'monitoring-mode': 'monitored'}, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
439         self.assertDictContainsSubset(
440             {'trib-port-number': 1},
441             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
442         '''
443         self.assertIn(1,
444                       res['interface'][0][
445                           'org-openroadm-otn-odu-interfaces:odu'][
446                           'parent-odu-allocation']['trib-slots'])
447
448     def test_15_check_ODU2E_connection(self):
449         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
450                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
451                "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"
452                .format(test_utils.RESTCONF_BASE_URL))
453         response = requests.request(
454             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
455         self.assertEqual(response.status_code, requests.codes.ok)
456         res = response.json()
457         input_dict_1 = {
458             'connection-name':
459             'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
460             'direction': 'bidirectional'
461         }
462
463         self.assertDictEqual(input_dict_1,
464                              extract_a_from_b(input_dict_1,
465                                               res['odu-connection'][0]))
466         '''
467         self.assertDictContainsSubset({
468             'connection-name': 'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
469             'direction': 'bidirectional'},
470             res['odu-connection'][0])
471         '''
472         self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
473                              res['odu-connection'][0]['destination'])
474         self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1'},
475                              res['odu-connection'][0]['source'])
476
477     def test_16_otn_service_path_delete_10GE(self):
478         url = "{}/operations/transportpce-device-renderer:otn-service-path".format(test_utils.RESTCONF_BASE_URL)
479         data = {"renderer:input": {
480             "service-name": "service1",
481             "operation": "delete",
482             "service-rate": "10G",
483             "service-type": "Ethernet",
484             "ethernet-encoding": "eth encode",
485             "trib-slot": ["1"],
486             "trib-port-number": "1",
487             "nodes": [
488                 {"node-id": "SPDR-SA1",
489                  "client-tp": "XPDR1-CLIENT1",
490                  "network-tp": "XPDR1-NETWORK1"}]}}
491         response = requests.request(
492             "POST", url, data=json.dumps(data),
493             headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
494         time.sleep(3)
495         self.assertEqual(response.status_code, requests.codes.ok)
496         res = response.json()
497         self.assertIn('Request processed', res["output"]["result"])
498         self.assertTrue(res["output"]["success"])
499
500     def test_17_check_no_ODU2E_connection(self):
501         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
502                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
503                "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"
504                .format(test_utils.RESTCONF_BASE_URL))
505         response = requests.request(
506             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
507         self.assertEqual(response.status_code, requests.codes.not_found)
508
509     def test_18_check_no_interface_ODU2E_NETWORK(self):
510         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
511                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
512                "interface/XPDR1-NETWORK1-ODU2e-service1"
513                .format(test_utils.RESTCONF_BASE_URL))
514         response = requests.request(
515             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
516         self.assertEqual(response.status_code, requests.codes.not_found)
517
518     def test_19_check_no_interface_ODU2E_CLIENT(self):
519         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
520                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
521                "interface/XPDR1-CLIENT1-ODU2e-service1"
522                .format(test_utils.RESTCONF_BASE_URL))
523         response = requests.request(
524             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
525         self.assertEqual(response.status_code, requests.codes.not_found)
526
527     def test_20_check_no_interface_10GE_CLIENT(self):
528         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
529                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
530                "interface/XPDR1-CLIENT1-ETHERNET10G"
531                .format(test_utils.RESTCONF_BASE_URL))
532         response = requests.request(
533             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
534         self.assertEqual(response.status_code, requests.codes.not_found)
535
536     def test_21_otn_service_path_delete_ODU4(self):
537         url = "{}/operations/transportpce-device-renderer:otn-service-path".format(test_utils.RESTCONF_BASE_URL)
538         data = {"renderer:input": {
539             "service-name": "service_ODU4",
540             "operation": "delete",
541             "service-rate": "100G",
542             "service-type": "ODU",
543             "nodes": [
544                 {"node-id": "SPDR-SA1",
545                  "network-tp": "XPDR1-NETWORK1"}]}}
546         response = requests.request(
547             "POST", url, data=json.dumps(data),
548             headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
549         time.sleep(3)
550         self.assertEqual(response.status_code, requests.codes.ok)
551         res = response.json()
552         self.assertIn('Request processed', res["output"]["result"])
553         self.assertTrue(res["output"]["success"])
554
555     def test_22_check_no_interface_ODU4(self):
556         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
557                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
558                "interface/XPDR1-NETWORK1-ODU4"
559                .format(test_utils.RESTCONF_BASE_URL))
560         response = requests.request(
561             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
562         self.assertEqual(response.status_code, requests.codes.not_found)
563
564     def test_23_service_path_delete_OCH_OTU4(self):
565         url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
566         data = {"renderer:input": {
567             "service-name": "service_OTU4",
568             "wave-number": "1",
569             "modulation-format": "qpsk",
570             "operation": "delete",
571             "nodes": [
572                 {"node-id": "SPDR-SA1",
573                  "dest-tp": "XPDR1-NETWORK1"}]}}
574         response = requests.request(
575             "POST", url, data=json.dumps(data),
576             headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
577         time.sleep(3)
578         self.assertEqual(response.status_code, requests.codes.ok)
579         res = response.json()
580         self.assertIn('Request processed', res["output"]["result"])
581         self.assertTrue(res["output"]["success"])
582
583     def test_24_check_no_interface_OTU4(self):
584         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
585                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
586                "interface/XPDR1-NETWORK1-OTU"
587                .format(test_utils.RESTCONF_BASE_URL))
588         response = requests.request(
589             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
590         self.assertEqual(response.status_code, requests.codes.not_found)
591
592     def test_25_check_no_interface_OCH(self):
593         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
594                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
595                "interface/XPDR1-NETWORK1-1"
596                .format(test_utils.RESTCONF_BASE_URL))
597         response = requests.request(
598             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
599         self.assertEqual(response.status_code, requests.codes.not_found)
600
601     def test_26_disconnect_SPDR_SA1(self):
602         response = test_utils.unmount_device("SPDR-SA1")
603         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
604
605
606 if __name__ == "__main__":
607     unittest.main(verbosity=2)