improve devices connection methods in tests
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_otn_renderer.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import json
13 import os
14 import psutil
15 import requests
16 import signal
17 import shutil
18 import subprocess
19 import time
20 import unittest
21 import logging
22 from common import test_utils
23
24
25 def extract_a_from_b(a, b):
26     return dict([(i, b[i]) for i in b.keys() if i in a.keys()])
27
28
29 class TransportPCEtesting(unittest.TestCase):
30
31     processes = None
32
33     @classmethod
34     def setUpClass(cls):
35         cls.processes = test_utils.start_tpce()
36         cls.processes = test_utils.start_sims(['spdrav2'])
37
38     @classmethod
39     def tearDownClass(cls):
40         for process in cls.processes:
41             test_utils.shutdown_process(process)
42         print("all processes killed")
43
44     def setUp(self):
45         time.sleep(5)
46
47     def test_01_connect_SPDR_SA1(self):
48         response = test_utils.mount_device("SPDR-SA1", 'spdrav2')
49         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
50         time.sleep(10)
51
52         url = ("{}/operational/network-topology:"
53                "network-topology/topology/topology-netconf/node/SPDR-SA1"
54                .format(test_utils.RESTCONF_BASE_URL))
55         response = requests.request(
56             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
57         self.assertEqual(response.status_code, requests.codes.ok)
58         res = response.json()
59         self.assertEqual(
60             res['node'][0]['netconf-node-topology:connection-status'],
61             'connected')
62
63     def test_02_get_portmapping_CLIENT1(self):
64         url = ("{}/config/transportpce-portmapping:network/"
65                "nodes/SPDR-SA1/mapping/XPDR1-CLIENT1"
66                .format(test_utils.RESTCONF_BASE_URL))
67         response = requests.request(
68             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
69         self.assertEqual(response.status_code, requests.codes.ok)
70         res = response.json()
71         self.assertIn(
72             {'supported-interface-capability': [
73                 'org-openroadm-port-types:if-10GE-ODU2e',
74                 'org-openroadm-port-types:if-10GE-ODU2',
75                 'org-openroadm-port-types:if-10GE'],
76              'supporting-port': 'CP1-SFP4-P1',
77              'supporting-circuit-pack-name': 'CP1-SFP4',
78              'logical-connection-point': 'XPDR1-CLIENT1',
79              'port-direction': 'bidirectional',
80              'port-qual': 'xpdr-client',
81              'lcp-hash-val': '8b3efff522736722500b5e68fb6e696e'},
82             res['mapping'])
83
84     def test_03_get_portmapping_NETWORK1(self):
85         url = ("{}/config/transportpce-portmapping:network/"
86                "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
87                .format(test_utils.RESTCONF_BASE_URL))
88         response = requests.request(
89             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
90         self.assertEqual(response.status_code, requests.codes.ok)
91         res = response.json()
92         self.assertIn(
93             {"logical-connection-point": "XPDR1-NETWORK1",
94              "supporting-port": "CP1-CFP0-P1",
95              "supported-interface-capability": [
96                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
97              ],
98                 "port-direction": "bidirectional",
99                 "port-qual": "xpdr-network",
100                 "supporting-circuit-pack-name": "CP1-CFP0",
101                 "xponder-type": "mpdr",
102              'lcp-hash-val': '1021db8d2affe7386705c438c67ea21f'},
103             res['mapping'])
104
105     def test_04_service_path_create_OCH_OTU4(self):
106         url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
107         data = {"renderer:input": {
108             "service-name": "service_ODU4",
109             "wave-number": "1",
110             "modulation-format": "qpsk",
111             "operation": "create",
112             "nodes": [
113                 {"node-id": "SPDR-SA1",
114                  "dest-tp": "XPDR1-NETWORK1"}]}}
115         response = requests.request(
116             "POST", url, data=json.dumps(data),
117             headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
118         time.sleep(3)
119         self.assertEqual(response.status_code, requests.codes.ok)
120         res = response.json()
121         self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
122         self.assertTrue(res["output"]["success"])
123         self.assertIn(
124             {'node-id': 'SPDR-SA1',
125              'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
126              'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])
127
128     def test_05_get_portmapping_NETWORK1(self):
129         url = ("{}/config/transportpce-portmapping:network/"
130                "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
131                .format(test_utils.RESTCONF_BASE_URL))
132         response = requests.request(
133             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
134         self.assertEqual(response.status_code, requests.codes.ok)
135         res = response.json()
136         self.assertIn(
137             {"logical-connection-point": "XPDR1-NETWORK1",
138              "supporting-port": "CP1-CFP0-P1",
139              "supported-interface-capability": [
140                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
141              ],
142                 "port-direction": "bidirectional",
143                 "port-qual": "xpdr-network",
144                 "supporting-circuit-pack-name": "CP1-CFP0",
145                 "xponder-type": "mpdr",
146                 "lcp-hash-val": "1021db8d2affe7386705c438c67ea21f"},
147             res['mapping'])
148
149     def test_06_check_interface_och(self):
150         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
151                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
152                "interface/XPDR1-NETWORK1-1"
153                .format(test_utils.RESTCONF_BASE_URL))
154         response = requests.request(
155             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
156         self.assertEqual(response.status_code, requests.codes.ok)
157         res = response.json()
158
159         input_dict = {'name': 'XPDR1-NETWORK1-1',
160                       'administrative-state': 'inService',
161                       'supporting-circuit-pack-name': 'CP1-CFP0',
162                       'type': 'org-openroadm-interfaces:opticalChannel',
163                       'supporting-port': 'CP1-CFP0-P1'
164                       }
165         # assertDictContainsSubset is deprecated
166         '''
167         self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-1', 'administrative-state': 'inService',
168                                        'supporting-circuit-pack-name': 'CP1-CFP0',
169                                        'type': 'org-openroadm-interfaces:opticalChannel',
170                                        'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
171         '''
172         self.assertDictEqual(input_dict,
173                              extract_a_from_b(input_dict,
174                                               res['interface'][0])
175                              )
176         self.assertDictEqual(
177             {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
178              u'transmit-power': -5},
179             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
180
181     def test_07_check_interface_OTU(self):
182         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
183                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
184                "interface/XPDR1-NETWORK1-OTU"
185                .format(test_utils.RESTCONF_BASE_URL))
186         response = requests.request(
187             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
188         self.assertEqual(response.status_code, requests.codes.ok)
189         res = response.json()
190         input_dict = {'name': 'XPDR1-NETWORK1-OTU',
191                       'administrative-state': 'inService',
192                       'supporting-circuit-pack-name': 'CP1-CFP0',
193                       'supporting-interface': 'XPDR1-NETWORK1-1',
194                       'type': 'org-openroadm-interfaces:otnOtu',
195                       'supporting-port': 'CP1-CFP0-P1'}
196
197         # assertDictContainsSubset is deprecated
198         '''
199         self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-OTU', 'administrative-state': 'inService',
200                                        'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-1',
201                                        'type': 'org-openroadm-interfaces:otnOtu',
202                                        'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
203         '''
204         self.assertDictEqual(input_dict, extract_a_from_b(input_dict,
205                                                           res['interface'][0])
206                              )
207
208         self.assertDictEqual(
209             {u'rate': u'org-openroadm-otn-common-types:OTU4',
210              u'fec': u'scfec'},
211             res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
212
213     def test_08_otn_service_path_create_ODU4(self):
214         url = "{}/operations/transportpce-device-renderer:otn-service-path".format(test_utils.RESTCONF_BASE_URL)
215         data = {"renderer:input": {
216             "service-name": "service_ODU4",
217             "operation": "create",
218             "service-rate": "100G",
219             "service-type": "ODU",
220             "nodes": [
221                 {"node-id": "SPDR-SA1",
222                  "network-tp": "XPDR1-NETWORK1"}]}}
223         response = requests.request(
224             "POST", url, data=json.dumps(data),
225             headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
226         time.sleep(3)
227         self.assertEqual(response.status_code, requests.codes.ok)
228         res = response.json()
229         self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
230         self.assertTrue(res["output"]["success"])
231         self.assertIn(
232             {'node-id': 'SPDR-SA1',
233              'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
234
235     def test_09_get_portmapping_NETWORK1(self):
236         url = ("{}/config/transportpce-portmapping:network/"
237                "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
238                .format(test_utils.RESTCONF_BASE_URL))
239         response = requests.request(
240             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
241         self.assertEqual(response.status_code, requests.codes.ok)
242         res = response.json()
243         self.assertIn(
244             {"logical-connection-point": "XPDR1-NETWORK1",
245              "supporting-port": "CP1-CFP0-P1",
246              "supported-interface-capability": [
247                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
248              ],
249                 "port-direction": "bidirectional",
250                 "port-qual": "xpdr-network",
251                 "supporting-circuit-pack-name": "CP1-CFP0",
252                 "xponder-type": "mpdr",
253                 "supporting-odu4": "XPDR1-NETWORK1-ODU4",
254                 "lcp-hash-val": "1021db8d2affe7386705c438c67ea21f"
255              },
256             res['mapping'])
257
258     def test_10_check_interface_ODU4(self):
259         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
260                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
261                "interface/XPDR1-NETWORK1-ODU4"
262                .format(test_utils.RESTCONF_BASE_URL))
263         response = requests.request(
264             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
265         self.assertEqual(response.status_code, requests.codes.ok)
266         res = response.json()
267         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
268                         'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
269                         'type': 'org-openroadm-interfaces:otnOdu',
270                         'supporting-port': 'CP1-CFP0-P1'}
271         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
272                         'rate': 'org-openroadm-otn-common-types:ODU4'}
273
274         self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
275                                                             res['interface'][0])
276                              )
277         self.assertDictEqual(input_dict_2,
278                              extract_a_from_b(input_dict_2,
279                                               res['interface'][0][
280                                                   'org-openroadm-otn-odu-interfaces:odu'])
281
282                              )
283         '''
284         self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
285                                        'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
286                                        'type': 'org-openroadm-interfaces:otnOdu',
287                                        'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
288         self.assertDictContainsSubset(
289             {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
290              'rate': 'org-openroadm-otn-common-types:ODU4'},
291             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
292         '''
293         self.assertDictEqual(
294             {u'payload-type': u'21', u'exp-payload-type': u'21'},
295             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
296
297     def test_11_otn_service_path_create_10GE(self):
298         url = "{}/operations/transportpce-device-renderer:otn-service-path".format(test_utils.RESTCONF_BASE_URL)
299         data = {"renderer:input": {
300             "service-name": "service1",
301             "operation": "create",
302             "service-rate": "10G",
303             "service-type": "Ethernet",
304             "ethernet-encoding": "eth encode",
305             "trib-slot": ["1"],
306             "trib-port-number": "1",
307             "nodes": [
308                 {"node-id": "SPDR-SA1",
309                  "client-tp": "XPDR1-CLIENT1",
310                  "network-tp": "XPDR1-NETWORK1"}]}}
311         response = requests.request(
312             "POST", url, data=json.dumps(data),
313             headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
314         time.sleep(3)
315         self.assertEqual(response.status_code, requests.codes.ok)
316         res = response.json()
317         self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
318         self.assertTrue(res["output"]["success"])
319         self.assertIn(
320             {'node-id': 'SPDR-SA1',
321              'connection-id': ['XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1'],
322              'odu-interface-id': ['XPDR1-NETWORK1-ODU2e-service1', 'XPDR1-CLIENT1-ODU2e-service1'],
323              'eth-interface-id': ['XPDR1-CLIENT1-ETHERNET10G']}, res["output"]['node-interface'])
324
325     def test_12_check_interface_10GE_CLIENT(self):
326         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
327                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
328                "interface/XPDR1-CLIENT1-ETHERNET10G"
329                .format(test_utils.RESTCONF_BASE_URL))
330         response = requests.request(
331             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
332         self.assertEqual(response.status_code, requests.codes.ok)
333         res = response.json()
334         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
335                       'administrative-state': 'inService',
336                       'supporting-circuit-pack-name': 'CP1-SFP4',
337                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
338                       'supporting-port': 'CP1-SFP4-P1'
339                       }
340
341         '''
342         self.assertDictContainsSubset({'name': 'XPDR1-CLIENT1-ETHERNET10G', 'administrative-state': 'inService',
343                                        'supporting-circuit-pack-name': 'CP1-SFP4',
344                                        'type': 'org-openroadm-interfaces:ethernetCsmacd',
345                                        'supporting-port': 'CP1-SFP4-P1'}, res['interface'][0])
346         '''
347         self.assertDictEqual(input_dict, extract_a_from_b(input_dict,
348                                                           res['interface'][0])
349                              )
350         self.assertDictEqual(
351             {u'speed': 10000},
352             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
353
354     def test_13_check_interface_ODU2E_CLIENT(self):
355         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
356                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
357                "interface/XPDR1-CLIENT1-ODU2e-service1"
358                .format(test_utils.RESTCONF_BASE_URL))
359         response = requests.request(
360             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
361         self.assertEqual(response.status_code, requests.codes.ok)
362         res = response.json()
363
364         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1',
365                         'administrative-state': 'inService',
366                         'supporting-circuit-pack-name': 'CP1-SFP4',
367                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
368                         'type': 'org-openroadm-interfaces:otnOdu',
369                         'supporting-port': 'CP1-SFP4-P1'}
370         input_dict_2 = {
371             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
372             'rate': 'org-openroadm-otn-common-types:ODU2e',
373             'monitoring-mode': 'terminated'}
374
375         self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
376                                                             res['interface'][0])
377                              )
378         self.assertDictEqual(input_dict_2,
379                              extract_a_from_b(input_dict_2, res['interface'][0][
380                                  'org-openroadm-otn-odu-interfaces:odu'])
381                              )
382
383         '''
384         self.assertDictContainsSubset({'name': 'XPDR1-CLIENT1-ODU2e-service1', 'administrative-state': 'inService',
385                                        'supporting-circuit-pack-name': 'CP1-SFP4',
386                                        'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
387                                        'type': 'org-openroadm-interfaces:otnOdu',
388                                        'supporting-port': 'CP1-SFP4-P1'}, res['interface'][0])
389         self.assertDictContainsSubset({
390             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
391             'rate': 'org-openroadm-otn-common-types:ODU2e',
392             'monitoring-mode': 'terminated'}, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
393         '''
394         self.assertDictEqual(
395             {u'payload-type': u'03', u'exp-payload-type': u'03'},
396             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
397
398     def test_14_check_interface_ODU2E_NETWORK(self):
399         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
400                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
401                "interface/XPDR1-NETWORK1-ODU2e-service1"
402                .format(test_utils.RESTCONF_BASE_URL))
403         response = requests.request(
404             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
405         self.assertEqual(response.status_code, requests.codes.ok)
406         res = response.json()
407         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
408                         'supporting-circuit-pack-name': 'CP1-CFP0',
409                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
410                         'type': 'org-openroadm-interfaces:otnOdu',
411                         'supporting-port': 'CP1-CFP0-P1'}
412         input_dict_2 = {
413             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
414             'rate': 'org-openroadm-otn-common-types:ODU2e',
415             'monitoring-mode': 'monitored'}
416
417         input_dict_3 = {'trib-port-number': 1}
418
419         self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
420                                                             res['interface'][0])
421                              )
422
423         self.assertDictEqual(input_dict_2,
424                              extract_a_from_b(input_dict_2,
425                                               res['interface'][0][
426                                                   'org-openroadm-otn-odu-interfaces:odu']
427                                               ))
428
429         self.assertDictEqual(input_dict_3,
430                              extract_a_from_b(input_dict_3,
431                                               res['interface'][0][
432                                                   'org-openroadm-otn-odu-interfaces:odu'][
433                                                   'parent-odu-allocation']))
434
435         '''
436         self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
437                                        'supporting-circuit-pack-name': 'CP1-CFP0',
438                                        'supporting-interface': 'XPDR1-NETWORK1-ODU4',
439                                        'type': 'org-openroadm-interfaces:otnOdu',
440                                        'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
441         self.assertDictContainsSubset({
442             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
443             'rate': 'org-openroadm-otn-common-types:ODU2e',
444             'monitoring-mode': 'monitored'}, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
445         self.assertDictContainsSubset(
446             {'trib-port-number': 1},
447             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
448         '''
449         self.assertIn(1,
450                       res['interface'][0][
451                           'org-openroadm-otn-odu-interfaces:odu'][
452                           'parent-odu-allocation']['trib-slots'])
453
454     def test_15_check_ODU2E_connection(self):
455         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
456                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
457                "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"
458                .format(test_utils.RESTCONF_BASE_URL))
459         response = requests.request(
460             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
461         self.assertEqual(response.status_code, requests.codes.ok)
462         res = response.json()
463         input_dict_1 = {
464             'connection-name':
465             'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
466             'direction': 'bidirectional'
467         }
468
469         self.assertDictEqual(input_dict_1,
470                              extract_a_from_b(input_dict_1,
471                                               res['odu-connection'][0]))
472         '''
473         self.assertDictContainsSubset({
474             'connection-name': 'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
475             'direction': 'bidirectional'},
476             res['odu-connection'][0])
477         '''
478         self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
479                              res['odu-connection'][0]['destination'])
480         self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1'},
481                              res['odu-connection'][0]['source'])
482
483     def test_16_otn_service_path_delete_10GE(self):
484         url = "{}/operations/transportpce-device-renderer:otn-service-path".format(test_utils.RESTCONF_BASE_URL)
485         data = {"renderer:input": {
486             "service-name": "service1",
487             "operation": "delete",
488             "service-rate": "10G",
489             "service-type": "Ethernet",
490             "ethernet-encoding": "eth encode",
491             "trib-slot": ["1"],
492             "trib-port-number": "1",
493             "nodes": [
494                 {"node-id": "SPDR-SA1",
495                  "client-tp": "XPDR1-CLIENT1",
496                  "network-tp": "XPDR1-NETWORK1"}]}}
497         response = requests.request(
498             "POST", url, data=json.dumps(data),
499             headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
500         time.sleep(3)
501         self.assertEqual(response.status_code, requests.codes.ok)
502         res = response.json()
503         self.assertIn('Request processed', res["output"]["result"])
504         self.assertTrue(res["output"]["success"])
505
506     def test_17_check_no_ODU2E_connection(self):
507         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
508                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
509                "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"
510                .format(test_utils.RESTCONF_BASE_URL))
511         response = requests.request(
512             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
513         self.assertEqual(response.status_code, requests.codes.not_found)
514
515     def test_18_check_no_interface_ODU2E_NETWORK(self):
516         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
517                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
518                "interface/XPDR1-NETWORK1-ODU2e-service1"
519                .format(test_utils.RESTCONF_BASE_URL))
520         response = requests.request(
521             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
522         self.assertEqual(response.status_code, requests.codes.not_found)
523
524     def test_19_check_no_interface_ODU2E_CLIENT(self):
525         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
526                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
527                "interface/XPDR1-CLIENT1-ODU2e-service1"
528                .format(test_utils.RESTCONF_BASE_URL))
529         response = requests.request(
530             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
531         self.assertEqual(response.status_code, requests.codes.not_found)
532
533     def test_20_check_no_interface_10GE_CLIENT(self):
534         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
535                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
536                "interface/XPDR1-CLIENT1-ETHERNET10G"
537                .format(test_utils.RESTCONF_BASE_URL))
538         response = requests.request(
539             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
540         self.assertEqual(response.status_code, requests.codes.not_found)
541
542     def test_21_otn_service_path_delete_ODU4(self):
543         url = "{}/operations/transportpce-device-renderer:otn-service-path".format(test_utils.RESTCONF_BASE_URL)
544         data = {"renderer:input": {
545             "service-name": "service_ODU4",
546             "operation": "delete",
547             "service-rate": "100G",
548             "service-type": "ODU",
549             "nodes": [
550                 {"node-id": "SPDR-SA1",
551                  "network-tp": "XPDR1-NETWORK1"}]}}
552         response = requests.request(
553             "POST", url, data=json.dumps(data),
554             headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
555         time.sleep(3)
556         self.assertEqual(response.status_code, requests.codes.ok)
557         res = response.json()
558         self.assertIn('Request processed', res["output"]["result"])
559         self.assertTrue(res["output"]["success"])
560
561     def test_22_check_no_interface_ODU4(self):
562         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
563                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
564                "interface/XPDR1-NETWORK1-ODU4"
565                .format(test_utils.RESTCONF_BASE_URL))
566         response = requests.request(
567             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
568         self.assertEqual(response.status_code, requests.codes.not_found)
569
570     def test_23_service_path_delete_OCH_OTU4(self):
571         url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
572         data = {"renderer:input": {
573             "service-name": "service_OTU4",
574             "wave-number": "1",
575             "modulation-format": "qpsk",
576             "operation": "delete",
577             "nodes": [
578                 {"node-id": "SPDR-SA1",
579                  "dest-tp": "XPDR1-NETWORK1"}]}}
580         response = requests.request(
581             "POST", url, data=json.dumps(data),
582             headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
583         time.sleep(3)
584         self.assertEqual(response.status_code, requests.codes.ok)
585         res = response.json()
586         self.assertIn('Request processed', res["output"]["result"])
587         self.assertTrue(res["output"]["success"])
588
589     def test_24_check_no_interface_OTU4(self):
590         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
591                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
592                "interface/XPDR1-NETWORK1-OTU"
593                .format(test_utils.RESTCONF_BASE_URL))
594         response = requests.request(
595             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
596         self.assertEqual(response.status_code, requests.codes.not_found)
597
598     def test_25_check_no_interface_OCH(self):
599         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
600                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
601                "interface/XPDR1-NETWORK1-1"
602                .format(test_utils.RESTCONF_BASE_URL))
603         response = requests.request(
604             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
605         self.assertEqual(response.status_code, requests.codes.not_found)
606
607     def test_26_disconnect_SPDR_SA1(self):
608         response = test_utils.unmount_device("SPDR-SA1")
609         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
610
611
612 if __name__ == "__main__":
613     unittest.main(verbosity=2)