rationalize functional tests sims starters
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_otn_renderer.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import json
13 import os
14 import psutil
15 import requests
16 import signal
17 import shutil
18 import subprocess
19 import time
20 import unittest
21 import logging
22 import test_utils
23
24
25 def extract_a_from_b(a, b):
26     return dict([(i, b[i]) for i in b.keys() if i in a.keys()])
27
28
29 class TransportPCEtesting(unittest.TestCase):
30
31     sim_process1 = None
32     odl_process = None
33     restconf_baseurl = "http://localhost:8181/restconf"
34
35     @classmethod
36     def setUpClass(cls):
37         cls.sim_process1 = test_utils.start_sim('spdrav2')
38
39         cls.odl_process = test_utils.start_tpce()
40         time.sleep(60)
41         print("opendaylight started")
42
43     @classmethod
44     def tearDownClass(cls):
45         for child in psutil.Process(cls.odl_process.pid).children():
46             child.send_signal(signal.SIGINT)
47             child.wait()
48         cls.odl_process.send_signal(signal.SIGINT)
49         cls.odl_process.wait()
50         for child in psutil.Process(cls.sim_process1.pid).children():
51             child.send_signal(signal.SIGINT)
52             child.wait()
53         cls.sim_process1.send_signal(signal.SIGINT)
54         cls.sim_process1.wait()
55
56     def setUp(self):
57         time.sleep(5)
58
59     def test_01_connect_SPDR_SA1(self):
60         url = ("{}/config/network-topology:"
61                "network-topology/topology/topology-netconf/node/SPDR-SA1"
62                .format(self.restconf_baseurl))
63         data = {"node": [{
64             "node-id": "SPDR-SA1",
65             "netconf-node-topology:username": "admin",
66             "netconf-node-topology:password": "admin",
67             "netconf-node-topology:host": "127.0.0.1",
68             "netconf-node-topology:port": test_utils.sims['spdrav2']['port'],
69             "netconf-node-topology:tcp-only": "false",
70             "netconf-node-topology:pass-through": {}}]}
71         headers = {'content-type': 'application/json'}
72         response = requests.request(
73             "PUT", url, data=json.dumps(data), headers=headers,
74             auth=('admin', 'admin'))
75         self.assertEqual(response.status_code, requests.codes.created)
76         time.sleep(10)
77         url = ("{}/operational/network-topology:"
78                "network-topology/topology/topology-netconf/node/SPDR-SA1"
79                .format(self.restconf_baseurl))
80         response = requests.request(
81             "GET", url, headers=headers, auth=('admin', 'admin'))
82         self.assertEqual(response.status_code, requests.codes.ok)
83         res = response.json()
84         self.assertEqual(
85             res['node'][0]['netconf-node-topology:connection-status'],
86             'connected')
87
88     def test_02_get_portmapping_CLIENT1(self):
89         url = ("{}/config/transportpce-portmapping:network/"
90                "nodes/SPDR-SA1/mapping/XPDR1-CLIENT1"
91                .format(self.restconf_baseurl))
92         headers = {'content-type': 'application/json'}
93         response = requests.request(
94             "GET", url, headers=headers, auth=('admin', 'admin'))
95         self.assertEqual(response.status_code, requests.codes.ok)
96         res = response.json()
97         self.assertIn(
98             {'supported-interface-capability': [
99                 'org-openroadm-port-types:if-10GE-ODU2e',
100                 'org-openroadm-port-types:if-10GE-ODU2',
101                 'org-openroadm-port-types:if-10GE'],
102              'supporting-port': 'CP1-SFP4-P1',
103              'supporting-circuit-pack-name': 'CP1-SFP4',
104              'logical-connection-point': 'XPDR1-CLIENT1',
105              'port-direction': 'bidirectional',
106              'port-qual': 'xpdr-client',
107              'lcp-hash-val': '8b3efff522736722500b5e68fb6e696e'},
108             res['mapping'])
109
110     def test_03_get_portmapping_NETWORK1(self):
111         url = ("{}/config/transportpce-portmapping:network/"
112                "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
113                .format(self.restconf_baseurl))
114         headers = {'content-type': 'application/json'}
115         response = requests.request(
116             "GET", url, headers=headers, auth=('admin', 'admin'))
117         self.assertEqual(response.status_code, requests.codes.ok)
118         res = response.json()
119         self.assertIn(
120             {"logical-connection-point": "XPDR1-NETWORK1",
121              "supporting-port": "CP1-CFP0-P1",
122              "supported-interface-capability": [
123                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
124              ],
125                 "port-direction": "bidirectional",
126                 "port-qual": "xpdr-network",
127                 "supporting-circuit-pack-name": "CP1-CFP0",
128                 "xponder-type": "mpdr",
129              'lcp-hash-val': '1021db8d2affe7386705c438c67ea21f'},
130             res['mapping'])
131
132     def test_04_service_path_create_OCH_OTU4(self):
133         url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
134         data = {"renderer:input": {
135             "service-name": "service_ODU4",
136             "wave-number": "1",
137             "modulation-format": "qpsk",
138             "operation": "create",
139             "nodes": [
140                 {"node-id": "SPDR-SA1",
141                  "dest-tp": "XPDR1-NETWORK1"}]}}
142         headers = {'content-type': 'application/json'}
143         response = requests.request(
144             "POST", url, data=json.dumps(data),
145             headers=headers, auth=('admin', 'admin'))
146         time.sleep(3)
147         self.assertEqual(response.status_code, requests.codes.ok)
148         res = response.json()
149         self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
150         self.assertTrue(res["output"]["success"])
151         self.assertIn(
152             {'node-id': 'SPDR-SA1',
153              'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
154              'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])
155
156     def test_05_get_portmapping_NETWORK1(self):
157         url = ("{}/config/transportpce-portmapping:network/"
158                "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
159                .format(self.restconf_baseurl))
160         headers = {'content-type': 'application/json'}
161         response = requests.request(
162             "GET", url, headers=headers, auth=('admin', 'admin'))
163         self.assertEqual(response.status_code, requests.codes.ok)
164         res = response.json()
165         self.assertIn(
166             {"logical-connection-point": "XPDR1-NETWORK1",
167              "supporting-port": "CP1-CFP0-P1",
168              "supported-interface-capability": [
169                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
170              ],
171                 "port-direction": "bidirectional",
172                 "port-qual": "xpdr-network",
173                 "supporting-circuit-pack-name": "CP1-CFP0",
174                 "xponder-type": "mpdr",
175                 "lcp-hash-val": "1021db8d2affe7386705c438c67ea21f"},
176             res['mapping'])
177
178     def test_06_check_interface_och(self):
179         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
180                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
181                "interface/XPDR1-NETWORK1-1"
182                .format(self.restconf_baseurl))
183         headers = {'content-type': 'application/json'}
184         response = requests.request(
185             "GET", url, headers=headers, auth=('admin', 'admin'))
186         self.assertEqual(response.status_code, requests.codes.ok)
187         res = response.json()
188
189         input_dict = {'name': 'XPDR1-NETWORK1-1',
190                       'administrative-state': 'inService',
191                       'supporting-circuit-pack-name': 'CP1-CFP0',
192                       'type': 'org-openroadm-interfaces:opticalChannel',
193                       'supporting-port': 'CP1-CFP0-P1'
194                       }
195         # assertDictContainsSubset is deprecated
196         '''
197         self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-1', 'administrative-state': 'inService',
198                                        'supporting-circuit-pack-name': 'CP1-CFP0',
199                                        'type': 'org-openroadm-interfaces:opticalChannel',
200                                        'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
201         '''
202         self.assertDictEqual(input_dict,
203                              extract_a_from_b(input_dict,
204                                               res['interface'][0])
205                              )
206         self.assertDictEqual(
207             {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
208              u'transmit-power': -5},
209             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
210
211     def test_07_check_interface_OTU(self):
212         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
213                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
214                "interface/XPDR1-NETWORK1-OTU"
215                .format(self.restconf_baseurl))
216         headers = {'content-type': 'application/json'}
217         response = requests.request(
218             "GET", url, headers=headers, auth=('admin', 'admin'))
219         self.assertEqual(response.status_code, requests.codes.ok)
220         res = response.json()
221         input_dict = {'name': 'XPDR1-NETWORK1-OTU',
222                       'administrative-state': 'inService',
223                       'supporting-circuit-pack-name': 'CP1-CFP0',
224                       'supporting-interface': 'XPDR1-NETWORK1-1',
225                       'type': 'org-openroadm-interfaces:otnOtu',
226                       'supporting-port': 'CP1-CFP0-P1'}
227
228         # assertDictContainsSubset is deprecated
229         '''
230         self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-OTU', 'administrative-state': 'inService',
231                                        'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-1',
232                                        'type': 'org-openroadm-interfaces:otnOtu',
233                                        'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
234         '''
235         self.assertDictEqual(input_dict, extract_a_from_b(input_dict,
236                                                           res['interface'][0])
237                              )
238
239         self.assertDictEqual(
240             {u'rate': u'org-openroadm-otn-common-types:OTU4',
241              u'fec': u'scfec'},
242             res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
243
244     def test_08_otn_service_path_create_ODU4(self):
245         url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
246         data = {"renderer:input": {
247             "service-name": "service_ODU4",
248             "operation": "create",
249             "service-rate": "100G",
250             "service-type": "ODU",
251             "nodes": [
252                 {"node-id": "SPDR-SA1",
253                  "network-tp": "XPDR1-NETWORK1"}]}}
254         headers = {'content-type': 'application/json'}
255         response = requests.request(
256             "POST", url, data=json.dumps(data),
257             headers=headers, auth=('admin', 'admin'))
258         time.sleep(3)
259         self.assertEqual(response.status_code, requests.codes.ok)
260         res = response.json()
261         self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
262         self.assertTrue(res["output"]["success"])
263         self.assertIn(
264             {'node-id': 'SPDR-SA1',
265              'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
266
267     def test_09_get_portmapping_NETWORK1(self):
268         url = ("{}/config/transportpce-portmapping:network/"
269                "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
270                .format(self.restconf_baseurl))
271         headers = {'content-type': 'application/json'}
272         response = requests.request(
273             "GET", url, headers=headers, auth=('admin', 'admin'))
274         self.assertEqual(response.status_code, requests.codes.ok)
275         res = response.json()
276         self.assertIn(
277             {"logical-connection-point": "XPDR1-NETWORK1",
278              "supporting-port": "CP1-CFP0-P1",
279              "supported-interface-capability": [
280                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
281              ],
282                 "port-direction": "bidirectional",
283                 "port-qual": "xpdr-network",
284                 "supporting-circuit-pack-name": "CP1-CFP0",
285                 "xponder-type": "mpdr",
286                 "supporting-odu4": "XPDR1-NETWORK1-ODU4",
287                 "lcp-hash-val": "1021db8d2affe7386705c438c67ea21f"
288              },
289             res['mapping'])
290
291     def test_10_check_interface_ODU4(self):
292         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
293                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
294                "interface/XPDR1-NETWORK1-ODU4"
295                .format(self.restconf_baseurl))
296         headers = {'content-type': 'application/json'}
297         response = requests.request(
298             "GET", url, headers=headers, auth=('admin', 'admin'))
299         self.assertEqual(response.status_code, requests.codes.ok)
300         res = response.json()
301         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
302                         'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
303                         'type': 'org-openroadm-interfaces:otnOdu',
304                         'supporting-port': 'CP1-CFP0-P1'}
305         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
306                         'rate': 'org-openroadm-otn-common-types:ODU4'}
307
308         self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
309                                                             res['interface'][0])
310                              )
311         self.assertDictEqual(input_dict_2,
312                              extract_a_from_b(input_dict_2,
313                                               res['interface'][0][
314                                                   'org-openroadm-otn-odu-interfaces:odu'])
315
316                              )
317         '''
318         self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
319                                        'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
320                                        'type': 'org-openroadm-interfaces:otnOdu',
321                                        'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
322         self.assertDictContainsSubset(
323             {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
324              'rate': 'org-openroadm-otn-common-types:ODU4'},
325             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
326         '''
327         self.assertDictEqual(
328             {u'payload-type': u'21', u'exp-payload-type': u'21'},
329             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
330
331     def test_11_otn_service_path_create_10GE(self):
332         url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
333         data = {"renderer:input": {
334             "service-name": "service1",
335             "operation": "create",
336             "service-rate": "10G",
337             "service-type": "Ethernet",
338             "ethernet-encoding": "eth encode",
339             "trib-slot": ["1"],
340             "trib-port-number": "1",
341             "nodes": [
342                 {"node-id": "SPDR-SA1",
343                  "client-tp": "XPDR1-CLIENT1",
344                  "network-tp": "XPDR1-NETWORK1"}]}}
345         headers = {'content-type': 'application/json'}
346         response = requests.request(
347             "POST", url, data=json.dumps(data),
348             headers=headers, auth=('admin', 'admin'))
349         time.sleep(3)
350         self.assertEqual(response.status_code, requests.codes.ok)
351         res = response.json()
352         self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
353         self.assertTrue(res["output"]["success"])
354         self.assertIn(
355             {'node-id': 'SPDR-SA1',
356              'connection-id': ['XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1'],
357              'odu-interface-id': ['XPDR1-NETWORK1-ODU2e-service1', 'XPDR1-CLIENT1-ODU2e-service1'],
358              'eth-interface-id': ['XPDR1-CLIENT1-ETHERNET10G']}, res["output"]['node-interface'])
359
360     def test_12_check_interface_10GE_CLIENT(self):
361         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
362                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
363                "interface/XPDR1-CLIENT1-ETHERNET10G"
364                .format(self.restconf_baseurl))
365         headers = {'content-type': 'application/json'}
366         response = requests.request(
367             "GET", url, headers=headers, auth=('admin', 'admin'))
368         self.assertEqual(response.status_code, requests.codes.ok)
369         res = response.json()
370         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
371                       'administrative-state': 'inService',
372                       'supporting-circuit-pack-name': 'CP1-SFP4',
373                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
374                       'supporting-port': 'CP1-SFP4-P1'
375                       }
376
377         '''
378         self.assertDictContainsSubset({'name': 'XPDR1-CLIENT1-ETHERNET10G', 'administrative-state': 'inService',
379                                        'supporting-circuit-pack-name': 'CP1-SFP4',
380                                        'type': 'org-openroadm-interfaces:ethernetCsmacd',
381                                        'supporting-port': 'CP1-SFP4-P1'}, res['interface'][0])
382         '''
383         self.assertDictEqual(input_dict, extract_a_from_b(input_dict,
384                                                           res['interface'][0])
385                              )
386         self.assertDictEqual(
387             {u'speed': 10000},
388             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
389
390     def test_13_check_interface_ODU2E_CLIENT(self):
391         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
392                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
393                "interface/XPDR1-CLIENT1-ODU2e-service1"
394                .format(self.restconf_baseurl))
395         headers = {'content-type': 'application/json'}
396         response = requests.request(
397             "GET", url, headers=headers, auth=('admin', 'admin'))
398         self.assertEqual(response.status_code, requests.codes.ok)
399         res = response.json()
400
401         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1',
402                         'administrative-state': 'inService',
403                         'supporting-circuit-pack-name': 'CP1-SFP4',
404                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
405                         'type': 'org-openroadm-interfaces:otnOdu',
406                         'supporting-port': 'CP1-SFP4-P1'}
407         input_dict_2 = {
408             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
409             'rate': 'org-openroadm-otn-common-types:ODU2e',
410             'monitoring-mode': 'terminated'}
411
412         self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
413                                                             res['interface'][0])
414                              )
415         self.assertDictEqual(input_dict_2,
416                              extract_a_from_b(input_dict_2, res['interface'][0][
417                                  'org-openroadm-otn-odu-interfaces:odu'])
418                              )
419
420         '''
421         self.assertDictContainsSubset({'name': 'XPDR1-CLIENT1-ODU2e-service1', 'administrative-state': 'inService',
422                                        'supporting-circuit-pack-name': 'CP1-SFP4',
423                                        'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
424                                        'type': 'org-openroadm-interfaces:otnOdu',
425                                        'supporting-port': 'CP1-SFP4-P1'}, res['interface'][0])
426         self.assertDictContainsSubset({
427             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
428             'rate': 'org-openroadm-otn-common-types:ODU2e',
429             'monitoring-mode': 'terminated'}, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
430         '''
431         self.assertDictEqual(
432             {u'payload-type': u'03', u'exp-payload-type': u'03'},
433             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
434
435     def test_14_check_interface_ODU2E_NETWORK(self):
436         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
437                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
438                "interface/XPDR1-NETWORK1-ODU2e-service1"
439                .format(self.restconf_baseurl))
440         headers = {'content-type': 'application/json'}
441         response = requests.request(
442             "GET", url, headers=headers, auth=('admin', 'admin'))
443         self.assertEqual(response.status_code, requests.codes.ok)
444         res = response.json()
445         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
446                         'supporting-circuit-pack-name': 'CP1-CFP0',
447                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
448                         'type': 'org-openroadm-interfaces:otnOdu',
449                         'supporting-port': 'CP1-CFP0-P1'}
450         input_dict_2 = {
451             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
452             'rate': 'org-openroadm-otn-common-types:ODU2e',
453             'monitoring-mode': 'monitored'}
454
455         input_dict_3 = {'trib-port-number': 1}
456
457         self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
458                                                             res['interface'][0])
459                              )
460
461         self.assertDictEqual(input_dict_2,
462                              extract_a_from_b(input_dict_2,
463                                               res['interface'][0][
464                                                   'org-openroadm-otn-odu-interfaces:odu']
465                                               ))
466
467         self.assertDictEqual(input_dict_3,
468                              extract_a_from_b(input_dict_3,
469                                               res['interface'][0][
470                                                   'org-openroadm-otn-odu-interfaces:odu'][
471                                                   'parent-odu-allocation']))
472
473         '''
474         self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
475                                        'supporting-circuit-pack-name': 'CP1-CFP0',
476                                        'supporting-interface': 'XPDR1-NETWORK1-ODU4',
477                                        'type': 'org-openroadm-interfaces:otnOdu',
478                                        'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
479         self.assertDictContainsSubset({
480             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
481             'rate': 'org-openroadm-otn-common-types:ODU2e',
482             'monitoring-mode': 'monitored'}, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
483         self.assertDictContainsSubset(
484             {'trib-port-number': 1},
485             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
486         '''
487         self.assertIn(1,
488                       res['interface'][0][
489                           'org-openroadm-otn-odu-interfaces:odu'][
490                           'parent-odu-allocation']['trib-slots'])
491
492     def test_15_check_ODU2E_connection(self):
493         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
494                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
495                "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"
496                .format(self.restconf_baseurl))
497         headers = {'content-type': 'application/json'}
498         response = requests.request(
499             "GET", url, headers=headers, auth=('admin', 'admin'))
500         self.assertEqual(response.status_code, requests.codes.ok)
501         res = response.json()
502         input_dict_1 = {
503             'connection-name':
504             'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
505             'direction': 'bidirectional'
506         }
507
508         self.assertDictEqual(input_dict_1,
509                              extract_a_from_b(input_dict_1,
510                                               res['odu-connection'][0]))
511         '''
512         self.assertDictContainsSubset({
513             'connection-name': 'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
514             'direction': 'bidirectional'},
515             res['odu-connection'][0])
516         '''
517         self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
518                              res['odu-connection'][0]['destination'])
519         self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1'},
520                              res['odu-connection'][0]['source'])
521
522     def test_16_otn_service_path_delete_10GE(self):
523         url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
524         data = {"renderer:input": {
525             "service-name": "service1",
526             "operation": "delete",
527             "service-rate": "10G",
528             "service-type": "Ethernet",
529             "ethernet-encoding": "eth encode",
530             "trib-slot": ["1"],
531             "trib-port-number": "1",
532             "nodes": [
533                 {"node-id": "SPDR-SA1",
534                  "client-tp": "XPDR1-CLIENT1",
535                  "network-tp": "XPDR1-NETWORK1"}]}}
536         headers = {'content-type': 'application/json'}
537         response = requests.request(
538             "POST", url, data=json.dumps(data),
539             headers=headers, auth=('admin', 'admin'))
540         time.sleep(3)
541         self.assertEqual(response.status_code, requests.codes.ok)
542         res = response.json()
543         self.assertIn('Request processed', res["output"]["result"])
544         self.assertTrue(res["output"]["success"])
545
546     def test_17_check_no_ODU2E_connection(self):
547         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
548                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
549                "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"
550                .format(self.restconf_baseurl))
551         headers = {'content-type': 'application/json'}
552         response = requests.request(
553             "GET", url, headers=headers, auth=('admin', 'admin'))
554         self.assertEqual(response.status_code, requests.codes.not_found)
555
556     def test_18_check_no_interface_ODU2E_NETWORK(self):
557         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
558                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
559                "interface/XPDR1-NETWORK1-ODU2e-service1"
560                .format(self.restconf_baseurl))
561         headers = {'content-type': 'application/json'}
562         response = requests.request(
563             "GET", url, headers=headers, auth=('admin', 'admin'))
564         self.assertEqual(response.status_code, requests.codes.not_found)
565
566     def test_19_check_no_interface_ODU2E_CLIENT(self):
567         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
568                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
569                "interface/XPDR1-CLIENT1-ODU2e-service1"
570                .format(self.restconf_baseurl))
571         headers = {'content-type': 'application/json'}
572         response = requests.request(
573             "GET", url, headers=headers, auth=('admin', 'admin'))
574         self.assertEqual(response.status_code, requests.codes.not_found)
575
576     def test_20_check_no_interface_10GE_CLIENT(self):
577         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
578                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
579                "interface/XPDR1-CLIENT1-ETHERNET10G"
580                .format(self.restconf_baseurl))
581         headers = {'content-type': 'application/json'}
582         response = requests.request(
583             "GET", url, headers=headers, auth=('admin', 'admin'))
584         self.assertEqual(response.status_code, requests.codes.not_found)
585
586     def test_21_otn_service_path_delete_ODU4(self):
587         url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
588         data = {"renderer:input": {
589             "service-name": "service_ODU4",
590             "operation": "delete",
591             "service-rate": "100G",
592             "service-type": "ODU",
593             "nodes": [
594                 {"node-id": "SPDR-SA1",
595                  "network-tp": "XPDR1-NETWORK1"}]}}
596         headers = {'content-type': 'application/json'}
597         response = requests.request(
598             "POST", url, data=json.dumps(data),
599             headers=headers, auth=('admin', 'admin'))
600         time.sleep(3)
601         self.assertEqual(response.status_code, requests.codes.ok)
602         res = response.json()
603         self.assertIn('Request processed', res["output"]["result"])
604         self.assertTrue(res["output"]["success"])
605
606     def test_22_check_no_interface_ODU4(self):
607         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
608                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
609                "interface/XPDR1-NETWORK1-ODU4"
610                .format(self.restconf_baseurl))
611         headers = {'content-type': 'application/json'}
612         response = requests.request(
613             "GET", url, headers=headers, auth=('admin', 'admin'))
614         self.assertEqual(response.status_code, requests.codes.not_found)
615
616     def test_23_service_path_delete_OCH_OTU4(self):
617         url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
618         data = {"renderer:input": {
619             "service-name": "service_OTU4",
620             "wave-number": "1",
621             "modulation-format": "qpsk",
622             "operation": "delete",
623             "nodes": [
624                 {"node-id": "SPDR-SA1",
625                  "dest-tp": "XPDR1-NETWORK1"}]}}
626         headers = {'content-type': 'application/json'}
627         response = requests.request(
628             "POST", url, data=json.dumps(data),
629             headers=headers, auth=('admin', 'admin'))
630         time.sleep(3)
631         self.assertEqual(response.status_code, requests.codes.ok)
632         res = response.json()
633         self.assertIn('Request processed', res["output"]["result"])
634         self.assertTrue(res["output"]["success"])
635
636     def test_24_check_no_interface_OTU4(self):
637         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
638                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
639                "interface/XPDR1-NETWORK1-OTU"
640                .format(self.restconf_baseurl))
641         headers = {'content-type': 'application/json'}
642         response = requests.request(
643             "GET", url, headers=headers, auth=('admin', 'admin'))
644         self.assertEqual(response.status_code, requests.codes.not_found)
645
646     def test_25_check_no_interface_OCH(self):
647         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
648                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
649                "interface/XPDR1-NETWORK1-1"
650                .format(self.restconf_baseurl))
651         headers = {'content-type': 'application/json'}
652         response = requests.request(
653             "GET", url, headers=headers, auth=('admin', 'admin'))
654         self.assertEqual(response.status_code, requests.codes.not_found)
655
656     def test_26_disconnect_SPDR_SA1(self):
657         url = ("{}/config/network-topology:"
658                "network-topology/topology/topology-netconf/node/SPDR-SA1"
659                .format(self.restconf_baseurl))
660         data = {}
661         headers = {'content-type': 'application/json'}
662         response = requests.request(
663             "DELETE", url, data=json.dumps(data), headers=headers,
664             auth=('admin', 'admin'))
665         self.assertEqual(response.status_code, requests.codes.ok)
666
667
668 if __name__ == "__main__":
669     unittest.main(verbosity=2)