Merge "Improve OpenRoadmOtnTopology and add Junit tests"
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_otn_renderer.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import json
13 import os
14 import psutil
15 import requests
16 import signal
17 import shutil
18 import subprocess
19 import time
20 import unittest
21 import logging
22 import test_utils
23
24
def extract_a_from_b(a, b):
    """Return the entries of ``b`` whose keys are also present in ``a``.

    The values always come from ``b``; ``a`` is only used as a key filter.
    Keys of ``a`` that are missing from ``b`` are simply absent from the
    result, so comparing the result against ``a`` with ``assertDictEqual``
    checks that ``a`` is a subset of ``b``.

    :param a: mapping (or any container supporting ``in``) of wanted keys
    :param b: mapping to take the key/value pairs from
    :return: a new dict, ``b`` itself is left untouched
    """
    return {key: value for key, value in b.items() if key in a}
27
28
class TransportPCEtesting(unittest.TestCase):
    """Functional tests of the OTN renderer against the SPDR-SA1 node.

    The tests are numbered because they depend on each other: they mount the
    node, create OCH/OTU4, ODU4 and 10GE service paths, check the resulting
    device interfaces, then delete everything in reverse order and unmount.
    All REST calls go to ``restconf_baseurl`` with admin/admin credentials.
    """

    honeynode_process1 = None
    odl_process = None
    restconf_baseurl = "http://localhost:8181/restconf"

    # ------------------------------------------------------------------
    # fixtures
    # ------------------------------------------------------------------
    @classmethod
    def setUpClass(cls):
        """Start the honeynode and the controller, waiting after each start."""
        print("starting honeynode1...")
        cls.honeynode_process1 = test_utils.start_spdra_honeynode()
        time.sleep(30)

        print("starting opendaylight...")
        cls.odl_process = test_utils.start_tpce()
        time.sleep(60)
        print("opendaylight started")

    @classmethod
    def tearDownClass(cls):
        """Interrupt the controller first, then the honeynode."""
        cls._interrupt_process_tree(cls.odl_process)
        cls._interrupt_process_tree(cls.honeynode_process1)

    @staticmethod
    def _interrupt_process_tree(process):
        """Send SIGINT to the direct children of ``process``, then to it.

        Each signalled process is waited for before moving to the next one.
        """
        for child in psutil.Process(process.pid).children():
            child.send_signal(signal.SIGINT)
            child.wait()
        process.send_signal(signal.SIGINT)
        process.wait()

    def setUp(self):
        """Pause before every test."""
        time.sleep(5)

    # ------------------------------------------------------------------
    # URL builders
    # ------------------------------------------------------------------
    def _netconf_node_url(self, datastore):
        """URL of the SPDR-SA1 netconf node in ``config`` or ``operational``."""
        return ("{}/{}/network-topology:"
                "network-topology/topology/topology-netconf/node/SPDR-SA1"
                .format(self.restconf_baseurl, datastore))

    def _portmapping_url(self, logical_connection_point):
        """URL of one SPDR-SA1 port-mapping entry in the config datastore."""
        return ("{}/config/transportpce-portmapping:network/"
                "nodes/SPDR-SA1/mapping/{}"
                .format(self.restconf_baseurl, logical_connection_point))

    def _device_url(self, resource):
        """URL of ``resource`` below the mounted SPDR-SA1 device tree.

        ``resource`` is the trailing path, e.g. ``interface/<name>`` or
        ``odu-connection/<name>``.
        """
        return ("{}/config/network-topology:network-topology/topology/topology-netconf/"
                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
                "{}"
                .format(self.restconf_baseurl, resource))

    def _renderer_rpc_url(self, rpc_name):
        """URL of a ``transportpce-device-renderer`` RPC."""
        return ("{}/operations/transportpce-device-renderer:{}"
                .format(self.restconf_baseurl, rpc_name))

    # ------------------------------------------------------------------
    # request helpers
    # ------------------------------------------------------------------
    def _request(self, method, url, data=None):
        """Issue an authenticated JSON request and return the raw response.

        ``data`` is JSON-encoded and sent as the body unless it is ``None``
        (an empty dict is still sent, as ``{}``).
        """
        kwargs = {'headers': {'content-type': 'application/json'},
                  'auth': ('admin', 'admin')}
        if data is not None:
            kwargs['data'] = json.dumps(data)
        return requests.request(method, url, **kwargs)

    def _get_ok_json(self, url):
        """GET ``url``, assert a 200 status and return the decoded body."""
        response = self._request("GET", url)
        self.assertEqual(response.status_code, requests.codes.ok)
        return response.json()

    def _assert_not_found(self, url):
        """GET ``url`` and assert that the resource does not exist (404)."""
        response = self._request("GET", url)
        self.assertEqual(response.status_code, requests.codes.not_found)

    def _get_interface(self, interface_name):
        """Return the first ``interface`` entry read from the device."""
        res = self._get_ok_json(self._device_url("interface/" + interface_name))
        return res['interface'][0]

    def _post_renderer_rpc(self, rpc_name, rpc_input):
        """POST a renderer RPC and return the ``output`` part of the reply.

        The 3 second pause happens after the POST and before the status
        code is asserted.
        """
        response = self._request("POST", self._renderer_rpc_url(rpc_name),
                                 data={"renderer:input": rpc_input})
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        return response.json()["output"]

    # ------------------------------------------------------------------
    # assertion helpers
    # ------------------------------------------------------------------
    def _assert_subset(self, expected, actual):
        """Assert that every key/value pair of ``expected`` is in ``actual``.

        Replacement for the deprecated ``assertDictContainsSubset``.
        """
        self.assertDictEqual(expected, extract_a_from_b(expected, actual))

    def _assert_request_processed(self, output):
        """Check the generic success reply returned by delete operations."""
        self.assertIn('Request processed', output["result"])
        self.assertTrue(output["success"])

    def _assert_otn_path_set_up(self, output):
        """Check the success reply returned by otn-service-path creation."""
        self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1',
                      output["result"])
        self.assertTrue(output["success"])

    @staticmethod
    def _expected_network1_mapping(extra=None):
        """Expected port-mapping entry of XPDR1-NETWORK1.

        ``extra`` holds additional key/value pairs expected on top of the
        base entry.
        """
        mapping = {
            "logical-connection-point": "XPDR1-NETWORK1",
            "supporting-port": "CP1-CFP0-P1",
            "supported-interface-capability": [
                "org-openroadm-port-types:if-OCH-OTU4-ODU4"
            ],
            "port-direction": "bidirectional",
            "port-qual": "xpdr-network",
            "supporting-circuit-pack-name": "CP1-CFP0",
            "xponder-type": "mpdr",
            "lcp-hash-val": "1021db8d2affe7386705c438c67ea21f",
        }
        if extra:
            mapping.update(extra)
        return mapping

    # ------------------------------------------------------------------
    # tests
    # ------------------------------------------------------------------
    def test_01_connect_SPDR_SA1(self):
        """Mount SPDR-SA1 and check that it is reported as connected."""
        data = {"node": [{
            "node-id": "SPDR-SA1",
            "netconf-node-topology:username": "admin",
            "netconf-node-topology:password": "admin",
            "netconf-node-topology:host": "127.0.0.1",
            "netconf-node-topology:port": "17845",
            "netconf-node-topology:tcp-only": "false",
            "netconf-node-topology:pass-through": {}}]}
        response = self._request("PUT", self._netconf_node_url("config"),
                                 data=data)
        self.assertEqual(response.status_code, requests.codes.created)
        time.sleep(10)
        res = self._get_ok_json(self._netconf_node_url("operational"))
        self.assertEqual(
            res['node'][0]['netconf-node-topology:connection-status'],
            'connected')

    def test_02_get_portmapping_CLIENT1(self):
        """Check the port-mapping entry of XPDR1-CLIENT1."""
        res = self._get_ok_json(self._portmapping_url("XPDR1-CLIENT1"))
        self.assertIn(
            {'supported-interface-capability': [
                'org-openroadm-port-types:if-10GE-ODU2e',
                'org-openroadm-port-types:if-10GE-ODU2',
                'org-openroadm-port-types:if-10GE'],
             'supporting-port': 'CP1-SFP4-P1',
             'supporting-circuit-pack-name': 'CP1-SFP4',
             'logical-connection-point': 'XPDR1-CLIENT1',
             'port-direction': 'bidirectional',
             'port-qual': 'xpdr-client',
             'lcp-hash-val': '8b3efff522736722500b5e68fb6e696e'},
            res['mapping'])

    def test_03_get_portmapping_NETWORK1(self):
        """Check the port-mapping entry of XPDR1-NETWORK1 before any service."""
        res = self._get_ok_json(self._portmapping_url("XPDR1-NETWORK1"))
        self.assertIn(self._expected_network1_mapping(), res['mapping'])

    def test_04_service_path_create_OCH_OTU4(self):
        """Create the OCH/OTU4 service path on XPDR1-NETWORK1."""
        output = self._post_renderer_rpc("service-path", {
            "service-name": "service_ODU4",
            "wave-number": "1",
            "modulation-format": "qpsk",
            "operation": "create",
            "nodes": [
                {"node-id": "SPDR-SA1",
                 "dest-tp": "XPDR1-NETWORK1"}]})
        self.assertIn('Roadm-connection successfully created for nodes: ',
                      output["result"])
        self.assertTrue(output["success"])
        self.assertIn(
            {'node-id': 'SPDR-SA1',
             'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
             'och-interface-id': ['XPDR1-NETWORK1-1']},
            output['node-interface'])

    def test_05_get_portmapping_NETWORK1(self):
        """Check the XPDR1-NETWORK1 mapping after the OCH/OTU4 creation."""
        res = self._get_ok_json(self._portmapping_url("XPDR1-NETWORK1"))
        self.assertIn(self._expected_network1_mapping(), res['mapping'])

    def test_06_check_interface_och(self):
        """Check the OCH interface created on the device."""
        interface = self._get_interface("XPDR1-NETWORK1-1")
        self._assert_subset(
            {'name': 'XPDR1-NETWORK1-1',
             'administrative-state': 'inService',
             'supporting-circuit-pack-name': 'CP1-CFP0',
             'type': 'org-openroadm-interfaces:opticalChannel',
             'supporting-port': 'CP1-CFP0-P1'},
            interface)
        self.assertDictEqual(
            {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
             'transmit-power': -5},
            interface['org-openroadm-optical-channel-interfaces:och'])

    def test_07_check_interface_OTU(self):
        """Check the OTU interface created on the device."""
        interface = self._get_interface("XPDR1-NETWORK1-OTU")
        self._assert_subset(
            {'name': 'XPDR1-NETWORK1-OTU',
             'administrative-state': 'inService',
             'supporting-circuit-pack-name': 'CP1-CFP0',
             'supporting-interface': 'XPDR1-NETWORK1-1',
             'type': 'org-openroadm-interfaces:otnOtu',
             'supporting-port': 'CP1-CFP0-P1'},
            interface)
        self.assertDictEqual(
            {'rate': 'org-openroadm-otn-common-types:OTU4',
             'fec': 'scfec'},
            interface['org-openroadm-otn-otu-interfaces:otu'])

    def test_08_otn_service_path_create_ODU4(self):
        """Create the ODU4 OTN service path on XPDR1-NETWORK1."""
        output = self._post_renderer_rpc("otn-service-path", {
            "service-name": "service_ODU4",
            "operation": "create",
            "service-rate": "100G",
            "service-type": "ODU",
            "nodes": [
                {"node-id": "SPDR-SA1",
                 "network-tp": "XPDR1-NETWORK1"}]})
        self._assert_otn_path_set_up(output)
        self.assertIn(
            {'node-id': 'SPDR-SA1',
             'odu-interface-id': ['XPDR1-NETWORK1-ODU4']},
            output['node-interface'])

    def test_09_get_portmapping_NETWORK1(self):
        """Check that the mapping now references the supporting ODU4."""
        res = self._get_ok_json(self._portmapping_url("XPDR1-NETWORK1"))
        self.assertIn(
            self._expected_network1_mapping(
                {"supporting-odu4": "XPDR1-NETWORK1-ODU4"}),
            res['mapping'])

    def test_10_check_interface_ODU4(self):
        """Check the ODU4 interface created on the device."""
        interface = self._get_interface("XPDR1-NETWORK1-ODU4")
        odu = 'org-openroadm-otn-odu-interfaces:odu'
        self._assert_subset(
            {'name': 'XPDR1-NETWORK1-ODU4',
             'administrative-state': 'inService',
             'supporting-circuit-pack-name': 'CP1-CFP0',
             'supporting-interface': 'XPDR1-NETWORK1-OTU',
             'type': 'org-openroadm-interfaces:otnOdu',
             'supporting-port': 'CP1-CFP0-P1'},
            interface)
        self._assert_subset(
            {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
             'rate': 'org-openroadm-otn-common-types:ODU4'},
            interface[odu])
        self.assertDictEqual(
            {'payload-type': '21', 'exp-payload-type': '21'},
            interface[odu]['opu'])

    def test_11_otn_service_path_create_10GE(self):
        """Create the 10GE OTN service path between CLIENT1 and NETWORK1."""
        output = self._post_renderer_rpc("otn-service-path", {
            "service-name": "service1",
            "operation": "create",
            "service-rate": "10G",
            "service-type": "Ethernet",
            "ethernet-encoding": "eth encode",
            "trib-slot": ["1"],
            "trib-port-number": "1",
            "nodes": [
                {"node-id": "SPDR-SA1",
                 "client-tp": "XPDR1-CLIENT1",
                 "network-tp": "XPDR1-NETWORK1"}]})
        self._assert_otn_path_set_up(output)
        self.assertIn(
            {'node-id': 'SPDR-SA1',
             'connection-id': ['XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1'],
             'odu-interface-id': ['XPDR1-NETWORK1-ODU2e-service1', 'XPDR1-CLIENT1-ODU2e-service1'],
             'eth-interface-id': ['XPDR1-CLIENT1-ETHERNET10G']},
            output['node-interface'])

    def test_12_check_interface_10GE_CLIENT(self):
        """Check the 10G Ethernet client interface."""
        interface = self._get_interface("XPDR1-CLIENT1-ETHERNET10G")
        self._assert_subset(
            {'name': 'XPDR1-CLIENT1-ETHERNET10G',
             'administrative-state': 'inService',
             'supporting-circuit-pack-name': 'CP1-SFP4',
             'type': 'org-openroadm-interfaces:ethernetCsmacd',
             'supporting-port': 'CP1-SFP4-P1'},
            interface)
        self.assertDictEqual(
            {'speed': 10000},
            interface['org-openroadm-ethernet-interfaces:ethernet'])

    def test_13_check_interface_ODU2E_CLIENT(self):
        """Check the client-side ODU2e interface."""
        interface = self._get_interface("XPDR1-CLIENT1-ODU2e-service1")
        odu = 'org-openroadm-otn-odu-interfaces:odu'
        self._assert_subset(
            {'name': 'XPDR1-CLIENT1-ODU2e-service1',
             'administrative-state': 'inService',
             'supporting-circuit-pack-name': 'CP1-SFP4',
             'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
             'type': 'org-openroadm-interfaces:otnOdu',
             'supporting-port': 'CP1-SFP4-P1'},
            interface)
        self._assert_subset(
            {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
             'rate': 'org-openroadm-otn-common-types:ODU2e',
             'monitoring-mode': 'terminated'},
            interface[odu])
        self.assertDictEqual(
            {'payload-type': '03', 'exp-payload-type': '03'},
            interface[odu]['opu'])

    def test_14_check_interface_ODU2E_NETWORK(self):
        """Check the network-side ODU2e interface and its tributary data."""
        interface = self._get_interface("XPDR1-NETWORK1-ODU2e-service1")
        odu = 'org-openroadm-otn-odu-interfaces:odu'
        self._assert_subset(
            {'name': 'XPDR1-NETWORK1-ODU2e-service1',
             'administrative-state': 'inService',
             'supporting-circuit-pack-name': 'CP1-CFP0',
             'supporting-interface': 'XPDR1-NETWORK1-ODU4',
             'type': 'org-openroadm-interfaces:otnOdu',
             'supporting-port': 'CP1-CFP0-P1'},
            interface)
        self._assert_subset(
            {'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
             'rate': 'org-openroadm-otn-common-types:ODU2e',
             'monitoring-mode': 'monitored'},
            interface[odu])
        self._assert_subset(
            {'trib-port-number': 1},
            interface[odu]['parent-odu-allocation'])
        self.assertIn(1, interface[odu]['parent-odu-allocation']['trib-slots'])

    def test_15_check_ODU2E_connection(self):
        """Check the ODU2e cross-connection between client and network."""
        res = self._get_ok_json(self._device_url(
            "odu-connection/"
            "XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"))
        connection = res['odu-connection'][0]
        self._assert_subset(
            {'connection-name':
             'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
             'direction': 'bidirectional'},
            connection)
        self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1'},
                             connection['destination'])
        self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e-service1'},
                             connection['source'])

    def test_16_otn_service_path_delete_10GE(self):
        """Delete the 10GE OTN service path."""
        output = self._post_renderer_rpc("otn-service-path", {
            "service-name": "service1",
            "operation": "delete",
            "service-rate": "10G",
            "service-type": "Ethernet",
            "ethernet-encoding": "eth encode",
            "trib-slot": ["1"],
            "trib-port-number": "1",
            "nodes": [
                {"node-id": "SPDR-SA1",
                 "client-tp": "XPDR1-CLIENT1",
                 "network-tp": "XPDR1-NETWORK1"}]})
        self._assert_request_processed(output)

    def test_17_check_no_ODU2E_connection(self):
        """The ODU2e cross-connection must be gone."""
        self._assert_not_found(self._device_url(
            "odu-connection/"
            "XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"))

    def test_18_check_no_interface_ODU2E_NETWORK(self):
        """The network-side ODU2e interface must be gone."""
        self._assert_not_found(
            self._device_url("interface/XPDR1-NETWORK1-ODU2e-service1"))

    def test_19_check_no_interface_ODU2E_CLIENT(self):
        """The client-side ODU2e interface must be gone."""
        self._assert_not_found(
            self._device_url("interface/XPDR1-CLIENT1-ODU2e-service1"))

    def test_20_check_no_interface_10GE_CLIENT(self):
        """The 10G Ethernet client interface must be gone."""
        self._assert_not_found(
            self._device_url("interface/XPDR1-CLIENT1-ETHERNET10G"))

    def test_21_otn_service_path_delete_ODU4(self):
        """Delete the ODU4 OTN service path."""
        output = self._post_renderer_rpc("otn-service-path", {
            "service-name": "service_ODU4",
            "operation": "delete",
            "service-rate": "100G",
            "service-type": "ODU",
            "nodes": [
                {"node-id": "SPDR-SA1",
                 "network-tp": "XPDR1-NETWORK1"}]})
        self._assert_request_processed(output)

    def test_22_check_no_interface_ODU4(self):
        """The ODU4 interface must be gone."""
        self._assert_not_found(
            self._device_url("interface/XPDR1-NETWORK1-ODU4"))

    def test_23_service_path_delete_OCH_OTU4(self):
        """Delete the OCH/OTU4 service path."""
        # NOTE(review): test_04 created this path with service-name
        # "service_ODU4" while the delete uses "service_OTU4" - confirm
        # which name is intended; the strings are kept as they were.
        output = self._post_renderer_rpc("service-path", {
            "service-name": "service_OTU4",
            "wave-number": "1",
            "modulation-format": "qpsk",
            "operation": "delete",
            "nodes": [
                {"node-id": "SPDR-SA1",
                 "dest-tp": "XPDR1-NETWORK1"}]})
        self._assert_request_processed(output)

    def test_24_check_no_interface_OTU4(self):
        """The OTU interface must be gone."""
        self._assert_not_found(
            self._device_url("interface/XPDR1-NETWORK1-OTU"))

    def test_25_check_no_interface_OCH(self):
        """The OCH interface must be gone."""
        self._assert_not_found(
            self._device_url("interface/XPDR1-NETWORK1-1"))

    def test_26_disconnect_SPDR_SA1(self):
        """Unmount SPDR-SA1 from the controller."""
        response = self._request("DELETE", self._netconf_node_url("config"),
                                 data={})
        self.assertEqual(response.status_code, requests.codes.ok)
669
670
# Run the test cases of this module when it is executed as a script;
# verbosity=2 asks unittest for per-test output.
if __name__ == "__main__":
    unittest.main(verbosity=2)