review sims/tpce start-up sequence in 2.2.1 tests
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_otn_renderer.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import json
13 import os
14 import psutil
15 import requests
16 import signal
17 import shutil
18 import subprocess
19 import time
20 import unittest
21 import logging
22 import test_utils
23
24
25 def extract_a_from_b(a, b):
26     return dict([(i, b[i]) for i in b.keys() if i in a.keys()])
27
28
29 class TransportPCEtesting(unittest.TestCase):
30
31     sim_process1 = None
32     odl_process = None
33     restconf_baseurl = "http://localhost:8181/restconf"
34
35     @classmethod
36     def setUpClass(cls):
37         cls.odl_process = test_utils.start_tpce()
38         cls.sim_process1 = test_utils.start_sim('spdrav2')
39
40     @classmethod
41     def tearDownClass(cls):
42         for child in psutil.Process(cls.odl_process.pid).children():
43             child.send_signal(signal.SIGINT)
44             child.wait()
45         cls.odl_process.send_signal(signal.SIGINT)
46         cls.odl_process.wait()
47         for child in psutil.Process(cls.sim_process1.pid).children():
48             child.send_signal(signal.SIGINT)
49             child.wait()
50         cls.sim_process1.send_signal(signal.SIGINT)
51         cls.sim_process1.wait()
52
53     def setUp(self):
54         time.sleep(5)
55
56     def test_01_connect_SPDR_SA1(self):
57         url = ("{}/config/network-topology:"
58                "network-topology/topology/topology-netconf/node/SPDR-SA1"
59                .format(self.restconf_baseurl))
60         data = {"node": [{
61             "node-id": "SPDR-SA1",
62             "netconf-node-topology:username": "admin",
63             "netconf-node-topology:password": "admin",
64             "netconf-node-topology:host": "127.0.0.1",
65             "netconf-node-topology:port": test_utils.sims['spdrav2']['port'],
66             "netconf-node-topology:tcp-only": "false",
67             "netconf-node-topology:pass-through": {}}]}
68         headers = {'content-type': 'application/json'}
69         response = requests.request(
70             "PUT", url, data=json.dumps(data), headers=headers,
71             auth=('admin', 'admin'))
72         self.assertEqual(response.status_code, requests.codes.created)
73         time.sleep(10)
74         url = ("{}/operational/network-topology:"
75                "network-topology/topology/topology-netconf/node/SPDR-SA1"
76                .format(self.restconf_baseurl))
77         response = requests.request(
78             "GET", url, headers=headers, auth=('admin', 'admin'))
79         self.assertEqual(response.status_code, requests.codes.ok)
80         res = response.json()
81         self.assertEqual(
82             res['node'][0]['netconf-node-topology:connection-status'],
83             'connected')
84
85     def test_02_get_portmapping_CLIENT1(self):
86         url = ("{}/config/transportpce-portmapping:network/"
87                "nodes/SPDR-SA1/mapping/XPDR1-CLIENT1"
88                .format(self.restconf_baseurl))
89         headers = {'content-type': 'application/json'}
90         response = requests.request(
91             "GET", url, headers=headers, auth=('admin', 'admin'))
92         self.assertEqual(response.status_code, requests.codes.ok)
93         res = response.json()
94         self.assertIn(
95             {'supported-interface-capability': [
96                 'org-openroadm-port-types:if-10GE-ODU2e',
97                 'org-openroadm-port-types:if-10GE-ODU2',
98                 'org-openroadm-port-types:if-10GE'],
99              'supporting-port': 'CP1-SFP4-P1',
100              'supporting-circuit-pack-name': 'CP1-SFP4',
101              'logical-connection-point': 'XPDR1-CLIENT1',
102              'port-direction': 'bidirectional',
103              'port-qual': 'xpdr-client',
104              'lcp-hash-val': '8b3efff522736722500b5e68fb6e696e'},
105             res['mapping'])
106
107     def test_03_get_portmapping_NETWORK1(self):
108         url = ("{}/config/transportpce-portmapping:network/"
109                "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
110                .format(self.restconf_baseurl))
111         headers = {'content-type': 'application/json'}
112         response = requests.request(
113             "GET", url, headers=headers, auth=('admin', 'admin'))
114         self.assertEqual(response.status_code, requests.codes.ok)
115         res = response.json()
116         self.assertIn(
117             {"logical-connection-point": "XPDR1-NETWORK1",
118              "supporting-port": "CP1-CFP0-P1",
119              "supported-interface-capability": [
120                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
121              ],
122                 "port-direction": "bidirectional",
123                 "port-qual": "xpdr-network",
124                 "supporting-circuit-pack-name": "CP1-CFP0",
125                 "xponder-type": "mpdr",
126              'lcp-hash-val': '1021db8d2affe7386705c438c67ea21f'},
127             res['mapping'])
128
129     def test_04_service_path_create_OCH_OTU4(self):
130         url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
131         data = {"renderer:input": {
132             "service-name": "service_ODU4",
133             "wave-number": "1",
134             "modulation-format": "qpsk",
135             "operation": "create",
136             "nodes": [
137                 {"node-id": "SPDR-SA1",
138                  "dest-tp": "XPDR1-NETWORK1"}]}}
139         headers = {'content-type': 'application/json'}
140         response = requests.request(
141             "POST", url, data=json.dumps(data),
142             headers=headers, auth=('admin', 'admin'))
143         time.sleep(3)
144         self.assertEqual(response.status_code, requests.codes.ok)
145         res = response.json()
146         self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
147         self.assertTrue(res["output"]["success"])
148         self.assertIn(
149             {'node-id': 'SPDR-SA1',
150              'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
151              'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])
152
153     def test_05_get_portmapping_NETWORK1(self):
154         url = ("{}/config/transportpce-portmapping:network/"
155                "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
156                .format(self.restconf_baseurl))
157         headers = {'content-type': 'application/json'}
158         response = requests.request(
159             "GET", url, headers=headers, auth=('admin', 'admin'))
160         self.assertEqual(response.status_code, requests.codes.ok)
161         res = response.json()
162         self.assertIn(
163             {"logical-connection-point": "XPDR1-NETWORK1",
164              "supporting-port": "CP1-CFP0-P1",
165              "supported-interface-capability": [
166                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
167              ],
168                 "port-direction": "bidirectional",
169                 "port-qual": "xpdr-network",
170                 "supporting-circuit-pack-name": "CP1-CFP0",
171                 "xponder-type": "mpdr",
172                 "lcp-hash-val": "1021db8d2affe7386705c438c67ea21f"},
173             res['mapping'])
174
175     def test_06_check_interface_och(self):
176         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
177                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
178                "interface/XPDR1-NETWORK1-1"
179                .format(self.restconf_baseurl))
180         headers = {'content-type': 'application/json'}
181         response = requests.request(
182             "GET", url, headers=headers, auth=('admin', 'admin'))
183         self.assertEqual(response.status_code, requests.codes.ok)
184         res = response.json()
185
186         input_dict = {'name': 'XPDR1-NETWORK1-1',
187                       'administrative-state': 'inService',
188                       'supporting-circuit-pack-name': 'CP1-CFP0',
189                       'type': 'org-openroadm-interfaces:opticalChannel',
190                       'supporting-port': 'CP1-CFP0-P1'
191                       }
192         # assertDictContainsSubset is deprecated
193         '''
194         self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-1', 'administrative-state': 'inService',
195                                        'supporting-circuit-pack-name': 'CP1-CFP0',
196                                        'type': 'org-openroadm-interfaces:opticalChannel',
197                                        'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
198         '''
199         self.assertDictEqual(input_dict,
200                              extract_a_from_b(input_dict,
201                                               res['interface'][0])
202                              )
203         self.assertDictEqual(
204             {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
205              u'transmit-power': -5},
206             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
207
208     def test_07_check_interface_OTU(self):
209         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
210                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
211                "interface/XPDR1-NETWORK1-OTU"
212                .format(self.restconf_baseurl))
213         headers = {'content-type': 'application/json'}
214         response = requests.request(
215             "GET", url, headers=headers, auth=('admin', 'admin'))
216         self.assertEqual(response.status_code, requests.codes.ok)
217         res = response.json()
218         input_dict = {'name': 'XPDR1-NETWORK1-OTU',
219                       'administrative-state': 'inService',
220                       'supporting-circuit-pack-name': 'CP1-CFP0',
221                       'supporting-interface': 'XPDR1-NETWORK1-1',
222                       'type': 'org-openroadm-interfaces:otnOtu',
223                       'supporting-port': 'CP1-CFP0-P1'}
224
225         # assertDictContainsSubset is deprecated
226         '''
227         self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-OTU', 'administrative-state': 'inService',
228                                        'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-1',
229                                        'type': 'org-openroadm-interfaces:otnOtu',
230                                        'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
231         '''
232         self.assertDictEqual(input_dict, extract_a_from_b(input_dict,
233                                                           res['interface'][0])
234                              )
235
236         self.assertDictEqual(
237             {u'rate': u'org-openroadm-otn-common-types:OTU4',
238              u'fec': u'scfec'},
239             res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
240
241     def test_08_otn_service_path_create_ODU4(self):
242         url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
243         data = {"renderer:input": {
244             "service-name": "service_ODU4",
245             "operation": "create",
246             "service-rate": "100G",
247             "service-type": "ODU",
248             "nodes": [
249                 {"node-id": "SPDR-SA1",
250                  "network-tp": "XPDR1-NETWORK1"}]}}
251         headers = {'content-type': 'application/json'}
252         response = requests.request(
253             "POST", url, data=json.dumps(data),
254             headers=headers, auth=('admin', 'admin'))
255         time.sleep(3)
256         self.assertEqual(response.status_code, requests.codes.ok)
257         res = response.json()
258         self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
259         self.assertTrue(res["output"]["success"])
260         self.assertIn(
261             {'node-id': 'SPDR-SA1',
262              'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
263
264     def test_09_get_portmapping_NETWORK1(self):
265         url = ("{}/config/transportpce-portmapping:network/"
266                "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
267                .format(self.restconf_baseurl))
268         headers = {'content-type': 'application/json'}
269         response = requests.request(
270             "GET", url, headers=headers, auth=('admin', 'admin'))
271         self.assertEqual(response.status_code, requests.codes.ok)
272         res = response.json()
273         self.assertIn(
274             {"logical-connection-point": "XPDR1-NETWORK1",
275              "supporting-port": "CP1-CFP0-P1",
276              "supported-interface-capability": [
277                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
278              ],
279                 "port-direction": "bidirectional",
280                 "port-qual": "xpdr-network",
281                 "supporting-circuit-pack-name": "CP1-CFP0",
282                 "xponder-type": "mpdr",
283                 "supporting-odu4": "XPDR1-NETWORK1-ODU4",
284                 "lcp-hash-val": "1021db8d2affe7386705c438c67ea21f"
285              },
286             res['mapping'])
287
288     def test_10_check_interface_ODU4(self):
289         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
290                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
291                "interface/XPDR1-NETWORK1-ODU4"
292                .format(self.restconf_baseurl))
293         headers = {'content-type': 'application/json'}
294         response = requests.request(
295             "GET", url, headers=headers, auth=('admin', 'admin'))
296         self.assertEqual(response.status_code, requests.codes.ok)
297         res = response.json()
298         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
299                         'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
300                         'type': 'org-openroadm-interfaces:otnOdu',
301                         'supporting-port': 'CP1-CFP0-P1'}
302         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
303                         'rate': 'org-openroadm-otn-common-types:ODU4'}
304
305         self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
306                                                             res['interface'][0])
307                              )
308         self.assertDictEqual(input_dict_2,
309                              extract_a_from_b(input_dict_2,
310                                               res['interface'][0][
311                                                   'org-openroadm-otn-odu-interfaces:odu'])
312
313                              )
314         '''
315         self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
316                                        'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
317                                        'type': 'org-openroadm-interfaces:otnOdu',
318                                        'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
319         self.assertDictContainsSubset(
320             {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
321              'rate': 'org-openroadm-otn-common-types:ODU4'},
322             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
323         '''
324         self.assertDictEqual(
325             {u'payload-type': u'21', u'exp-payload-type': u'21'},
326             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
327
328     def test_11_otn_service_path_create_10GE(self):
329         url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
330         data = {"renderer:input": {
331             "service-name": "service1",
332             "operation": "create",
333             "service-rate": "10G",
334             "service-type": "Ethernet",
335             "ethernet-encoding": "eth encode",
336             "trib-slot": ["1"],
337             "trib-port-number": "1",
338             "nodes": [
339                 {"node-id": "SPDR-SA1",
340                  "client-tp": "XPDR1-CLIENT1",
341                  "network-tp": "XPDR1-NETWORK1"}]}}
342         headers = {'content-type': 'application/json'}
343         response = requests.request(
344             "POST", url, data=json.dumps(data),
345             headers=headers, auth=('admin', 'admin'))
346         time.sleep(3)
347         self.assertEqual(response.status_code, requests.codes.ok)
348         res = response.json()
349         self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
350         self.assertTrue(res["output"]["success"])
351         self.assertIn(
352             {'node-id': 'SPDR-SA1',
353              'connection-id': ['XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1'],
354              'odu-interface-id': ['XPDR1-NETWORK1-ODU2e-service1', 'XPDR1-CLIENT1-ODU2e-service1'],
355              'eth-interface-id': ['XPDR1-CLIENT1-ETHERNET10G']}, res["output"]['node-interface'])
356
357     def test_12_check_interface_10GE_CLIENT(self):
358         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
359                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
360                "interface/XPDR1-CLIENT1-ETHERNET10G"
361                .format(self.restconf_baseurl))
362         headers = {'content-type': 'application/json'}
363         response = requests.request(
364             "GET", url, headers=headers, auth=('admin', 'admin'))
365         self.assertEqual(response.status_code, requests.codes.ok)
366         res = response.json()
367         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
368                       'administrative-state': 'inService',
369                       'supporting-circuit-pack-name': 'CP1-SFP4',
370                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
371                       'supporting-port': 'CP1-SFP4-P1'
372                       }
373
374         '''
375         self.assertDictContainsSubset({'name': 'XPDR1-CLIENT1-ETHERNET10G', 'administrative-state': 'inService',
376                                        'supporting-circuit-pack-name': 'CP1-SFP4',
377                                        'type': 'org-openroadm-interfaces:ethernetCsmacd',
378                                        'supporting-port': 'CP1-SFP4-P1'}, res['interface'][0])
379         '''
380         self.assertDictEqual(input_dict, extract_a_from_b(input_dict,
381                                                           res['interface'][0])
382                              )
383         self.assertDictEqual(
384             {u'speed': 10000},
385             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
386
387     def test_13_check_interface_ODU2E_CLIENT(self):
388         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
389                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
390                "interface/XPDR1-CLIENT1-ODU2e-service1"
391                .format(self.restconf_baseurl))
392         headers = {'content-type': 'application/json'}
393         response = requests.request(
394             "GET", url, headers=headers, auth=('admin', 'admin'))
395         self.assertEqual(response.status_code, requests.codes.ok)
396         res = response.json()
397
398         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1',
399                         'administrative-state': 'inService',
400                         'supporting-circuit-pack-name': 'CP1-SFP4',
401                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
402                         'type': 'org-openroadm-interfaces:otnOdu',
403                         'supporting-port': 'CP1-SFP4-P1'}
404         input_dict_2 = {
405             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
406             'rate': 'org-openroadm-otn-common-types:ODU2e',
407             'monitoring-mode': 'terminated'}
408
409         self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
410                                                             res['interface'][0])
411                              )
412         self.assertDictEqual(input_dict_2,
413                              extract_a_from_b(input_dict_2, res['interface'][0][
414                                  'org-openroadm-otn-odu-interfaces:odu'])
415                              )
416
417         '''
418         self.assertDictContainsSubset({'name': 'XPDR1-CLIENT1-ODU2e-service1', 'administrative-state': 'inService',
419                                        'supporting-circuit-pack-name': 'CP1-SFP4',
420                                        'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
421                                        'type': 'org-openroadm-interfaces:otnOdu',
422                                        'supporting-port': 'CP1-SFP4-P1'}, res['interface'][0])
423         self.assertDictContainsSubset({
424             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
425             'rate': 'org-openroadm-otn-common-types:ODU2e',
426             'monitoring-mode': 'terminated'}, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
427         '''
428         self.assertDictEqual(
429             {u'payload-type': u'03', u'exp-payload-type': u'03'},
430             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
431
432     def test_14_check_interface_ODU2E_NETWORK(self):
433         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
434                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
435                "interface/XPDR1-NETWORK1-ODU2e-service1"
436                .format(self.restconf_baseurl))
437         headers = {'content-type': 'application/json'}
438         response = requests.request(
439             "GET", url, headers=headers, auth=('admin', 'admin'))
440         self.assertEqual(response.status_code, requests.codes.ok)
441         res = response.json()
442         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
443                         'supporting-circuit-pack-name': 'CP1-CFP0',
444                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
445                         'type': 'org-openroadm-interfaces:otnOdu',
446                         'supporting-port': 'CP1-CFP0-P1'}
447         input_dict_2 = {
448             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
449             'rate': 'org-openroadm-otn-common-types:ODU2e',
450             'monitoring-mode': 'monitored'}
451
452         input_dict_3 = {'trib-port-number': 1}
453
454         self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
455                                                             res['interface'][0])
456                              )
457
458         self.assertDictEqual(input_dict_2,
459                              extract_a_from_b(input_dict_2,
460                                               res['interface'][0][
461                                                   'org-openroadm-otn-odu-interfaces:odu']
462                                               ))
463
464         self.assertDictEqual(input_dict_3,
465                              extract_a_from_b(input_dict_3,
466                                               res['interface'][0][
467                                                   'org-openroadm-otn-odu-interfaces:odu'][
468                                                   'parent-odu-allocation']))
469
470         '''
471         self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
472                                        'supporting-circuit-pack-name': 'CP1-CFP0',
473                                        'supporting-interface': 'XPDR1-NETWORK1-ODU4',
474                                        'type': 'org-openroadm-interfaces:otnOdu',
475                                        'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
476         self.assertDictContainsSubset({
477             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
478             'rate': 'org-openroadm-otn-common-types:ODU2e',
479             'monitoring-mode': 'monitored'}, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
480         self.assertDictContainsSubset(
481             {'trib-port-number': 1},
482             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
483         '''
484         self.assertIn(1,
485                       res['interface'][0][
486                           'org-openroadm-otn-odu-interfaces:odu'][
487                           'parent-odu-allocation']['trib-slots'])
488
489     def test_15_check_ODU2E_connection(self):
490         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
491                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
492                "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"
493                .format(self.restconf_baseurl))
494         headers = {'content-type': 'application/json'}
495         response = requests.request(
496             "GET", url, headers=headers, auth=('admin', 'admin'))
497         self.assertEqual(response.status_code, requests.codes.ok)
498         res = response.json()
499         input_dict_1 = {
500             'connection-name':
501             'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
502             'direction': 'bidirectional'
503         }
504
505         self.assertDictEqual(input_dict_1,
506                              extract_a_from_b(input_dict_1,
507                                               res['odu-connection'][0]))
508         '''
509         self.assertDictContainsSubset({
510             'connection-name': 'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
511             'direction': 'bidirectional'},
512             res['odu-connection'][0])
513         '''
514         self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
515                              res['odu-connection'][0]['destination'])
516         self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1'},
517                              res['odu-connection'][0]['source'])
518
519     def test_16_otn_service_path_delete_10GE(self):
520         url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
521         data = {"renderer:input": {
522             "service-name": "service1",
523             "operation": "delete",
524             "service-rate": "10G",
525             "service-type": "Ethernet",
526             "ethernet-encoding": "eth encode",
527             "trib-slot": ["1"],
528             "trib-port-number": "1",
529             "nodes": [
530                 {"node-id": "SPDR-SA1",
531                  "client-tp": "XPDR1-CLIENT1",
532                  "network-tp": "XPDR1-NETWORK1"}]}}
533         headers = {'content-type': 'application/json'}
534         response = requests.request(
535             "POST", url, data=json.dumps(data),
536             headers=headers, auth=('admin', 'admin'))
537         time.sleep(3)
538         self.assertEqual(response.status_code, requests.codes.ok)
539         res = response.json()
540         self.assertIn('Request processed', res["output"]["result"])
541         self.assertTrue(res["output"]["success"])
542
543     def test_17_check_no_ODU2E_connection(self):
544         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
545                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
546                "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"
547                .format(self.restconf_baseurl))
548         headers = {'content-type': 'application/json'}
549         response = requests.request(
550             "GET", url, headers=headers, auth=('admin', 'admin'))
551         self.assertEqual(response.status_code, requests.codes.not_found)
552
553     def test_18_check_no_interface_ODU2E_NETWORK(self):
554         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
555                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
556                "interface/XPDR1-NETWORK1-ODU2e-service1"
557                .format(self.restconf_baseurl))
558         headers = {'content-type': 'application/json'}
559         response = requests.request(
560             "GET", url, headers=headers, auth=('admin', 'admin'))
561         self.assertEqual(response.status_code, requests.codes.not_found)
562
563     def test_19_check_no_interface_ODU2E_CLIENT(self):
564         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
565                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
566                "interface/XPDR1-CLIENT1-ODU2e-service1"
567                .format(self.restconf_baseurl))
568         headers = {'content-type': 'application/json'}
569         response = requests.request(
570             "GET", url, headers=headers, auth=('admin', 'admin'))
571         self.assertEqual(response.status_code, requests.codes.not_found)
572
573     def test_20_check_no_interface_10GE_CLIENT(self):
574         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
575                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
576                "interface/XPDR1-CLIENT1-ETHERNET10G"
577                .format(self.restconf_baseurl))
578         headers = {'content-type': 'application/json'}
579         response = requests.request(
580             "GET", url, headers=headers, auth=('admin', 'admin'))
581         self.assertEqual(response.status_code, requests.codes.not_found)
582
583     def test_21_otn_service_path_delete_ODU4(self):
584         url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
585         data = {"renderer:input": {
586             "service-name": "service_ODU4",
587             "operation": "delete",
588             "service-rate": "100G",
589             "service-type": "ODU",
590             "nodes": [
591                 {"node-id": "SPDR-SA1",
592                  "network-tp": "XPDR1-NETWORK1"}]}}
593         headers = {'content-type': 'application/json'}
594         response = requests.request(
595             "POST", url, data=json.dumps(data),
596             headers=headers, auth=('admin', 'admin'))
597         time.sleep(3)
598         self.assertEqual(response.status_code, requests.codes.ok)
599         res = response.json()
600         self.assertIn('Request processed', res["output"]["result"])
601         self.assertTrue(res["output"]["success"])
602
603     def test_22_check_no_interface_ODU4(self):
604         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
605                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
606                "interface/XPDR1-NETWORK1-ODU4"
607                .format(self.restconf_baseurl))
608         headers = {'content-type': 'application/json'}
609         response = requests.request(
610             "GET", url, headers=headers, auth=('admin', 'admin'))
611         self.assertEqual(response.status_code, requests.codes.not_found)
612
613     def test_23_service_path_delete_OCH_OTU4(self):
614         url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
615         data = {"renderer:input": {
616             "service-name": "service_OTU4",
617             "wave-number": "1",
618             "modulation-format": "qpsk",
619             "operation": "delete",
620             "nodes": [
621                 {"node-id": "SPDR-SA1",
622                  "dest-tp": "XPDR1-NETWORK1"}]}}
623         headers = {'content-type': 'application/json'}
624         response = requests.request(
625             "POST", url, data=json.dumps(data),
626             headers=headers, auth=('admin', 'admin'))
627         time.sleep(3)
628         self.assertEqual(response.status_code, requests.codes.ok)
629         res = response.json()
630         self.assertIn('Request processed', res["output"]["result"])
631         self.assertTrue(res["output"]["success"])
632
633     def test_24_check_no_interface_OTU4(self):
634         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
635                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
636                "interface/XPDR1-NETWORK1-OTU"
637                .format(self.restconf_baseurl))
638         headers = {'content-type': 'application/json'}
639         response = requests.request(
640             "GET", url, headers=headers, auth=('admin', 'admin'))
641         self.assertEqual(response.status_code, requests.codes.not_found)
642
643     def test_25_check_no_interface_OCH(self):
644         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
645                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
646                "interface/XPDR1-NETWORK1-1"
647                .format(self.restconf_baseurl))
648         headers = {'content-type': 'application/json'}
649         response = requests.request(
650             "GET", url, headers=headers, auth=('admin', 'admin'))
651         self.assertEqual(response.status_code, requests.codes.not_found)
652
653     def test_26_disconnect_SPDR_SA1(self):
654         url = ("{}/config/network-topology:"
655                "network-topology/topology/topology-netconf/node/SPDR-SA1"
656                .format(self.restconf_baseurl))
657         data = {}
658         headers = {'content-type': 'application/json'}
659         response = requests.request(
660             "DELETE", url, data=json.dumps(data), headers=headers,
661             auth=('admin', 'admin'))
662         self.assertEqual(response.status_code, requests.codes.ok)
663
664
665 if __name__ == "__main__":
666     unittest.main(verbosity=2)