Align 1.2.1 tests sims/tpce management to 2.2.1
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_otn_renderer.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import json
13 import os
14 import psutil
15 import requests
16 import signal
17 import shutil
18 import subprocess
19 import time
20 import unittest
21 import logging
22 from common import test_utils
23
24
25 def extract_a_from_b(a, b):
26     return dict([(i, b[i]) for i in b.keys() if i in a.keys()])
27
28
29 class TransportPCEtesting(unittest.TestCase):
30
31     processes = None
32     restconf_baseurl = "http://localhost:8181/restconf"
33
34     @classmethod
35     def setUpClass(cls):
36         cls.processes = test_utils.start_tpce()
37         cls.processes = test_utils.start_sims(['spdrav2'])
38
39     @classmethod
40     def tearDownClass(cls):
41         for process in cls.processes:
42             test_utils.shutdown_process(process)
43         print("all processes killed")
44
45     def setUp(self):
46         time.sleep(5)
47
48     def test_01_connect_SPDR_SA1(self):
49         url = ("{}/config/network-topology:"
50                "network-topology/topology/topology-netconf/node/SPDR-SA1"
51                .format(self.restconf_baseurl))
52         data = {"node": [{
53             "node-id": "SPDR-SA1",
54             "netconf-node-topology:username": "admin",
55             "netconf-node-topology:password": "admin",
56             "netconf-node-topology:host": "127.0.0.1",
57             "netconf-node-topology:port": test_utils.sims['spdrav2']['port'],
58             "netconf-node-topology:tcp-only": "false",
59             "netconf-node-topology:pass-through": {}}]}
60         headers = {'content-type': 'application/json'}
61         response = requests.request(
62             "PUT", url, data=json.dumps(data), headers=headers,
63             auth=('admin', 'admin'))
64         self.assertEqual(response.status_code, requests.codes.created)
65         time.sleep(10)
66         url = ("{}/operational/network-topology:"
67                "network-topology/topology/topology-netconf/node/SPDR-SA1"
68                .format(self.restconf_baseurl))
69         response = requests.request(
70             "GET", url, headers=headers, auth=('admin', 'admin'))
71         self.assertEqual(response.status_code, requests.codes.ok)
72         res = response.json()
73         self.assertEqual(
74             res['node'][0]['netconf-node-topology:connection-status'],
75             'connected')
76
77     def test_02_get_portmapping_CLIENT1(self):
78         url = ("{}/config/transportpce-portmapping:network/"
79                "nodes/SPDR-SA1/mapping/XPDR1-CLIENT1"
80                .format(self.restconf_baseurl))
81         headers = {'content-type': 'application/json'}
82         response = requests.request(
83             "GET", url, headers=headers, auth=('admin', 'admin'))
84         self.assertEqual(response.status_code, requests.codes.ok)
85         res = response.json()
86         self.assertIn(
87             {'supported-interface-capability': [
88                 'org-openroadm-port-types:if-10GE-ODU2e',
89                 'org-openroadm-port-types:if-10GE-ODU2',
90                 'org-openroadm-port-types:if-10GE'],
91              'supporting-port': 'CP1-SFP4-P1',
92              'supporting-circuit-pack-name': 'CP1-SFP4',
93              'logical-connection-point': 'XPDR1-CLIENT1',
94              'port-direction': 'bidirectional',
95              'port-qual': 'xpdr-client',
96              'lcp-hash-val': '8b3efff522736722500b5e68fb6e696e'},
97             res['mapping'])
98
99     def test_03_get_portmapping_NETWORK1(self):
100         url = ("{}/config/transportpce-portmapping:network/"
101                "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
102                .format(self.restconf_baseurl))
103         headers = {'content-type': 'application/json'}
104         response = requests.request(
105             "GET", url, headers=headers, auth=('admin', 'admin'))
106         self.assertEqual(response.status_code, requests.codes.ok)
107         res = response.json()
108         self.assertIn(
109             {"logical-connection-point": "XPDR1-NETWORK1",
110              "supporting-port": "CP1-CFP0-P1",
111              "supported-interface-capability": [
112                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
113              ],
114                 "port-direction": "bidirectional",
115                 "port-qual": "xpdr-network",
116                 "supporting-circuit-pack-name": "CP1-CFP0",
117                 "xponder-type": "mpdr",
118              'lcp-hash-val': '1021db8d2affe7386705c438c67ea21f'},
119             res['mapping'])
120
121     def test_04_service_path_create_OCH_OTU4(self):
122         url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
123         data = {"renderer:input": {
124             "service-name": "service_ODU4",
125             "wave-number": "1",
126             "modulation-format": "qpsk",
127             "operation": "create",
128             "nodes": [
129                 {"node-id": "SPDR-SA1",
130                  "dest-tp": "XPDR1-NETWORK1"}]}}
131         headers = {'content-type': 'application/json'}
132         response = requests.request(
133             "POST", url, data=json.dumps(data),
134             headers=headers, auth=('admin', 'admin'))
135         time.sleep(3)
136         self.assertEqual(response.status_code, requests.codes.ok)
137         res = response.json()
138         self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
139         self.assertTrue(res["output"]["success"])
140         self.assertIn(
141             {'node-id': 'SPDR-SA1',
142              'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
143              'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])
144
145     def test_05_get_portmapping_NETWORK1(self):
146         url = ("{}/config/transportpce-portmapping:network/"
147                "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
148                .format(self.restconf_baseurl))
149         headers = {'content-type': 'application/json'}
150         response = requests.request(
151             "GET", url, headers=headers, auth=('admin', 'admin'))
152         self.assertEqual(response.status_code, requests.codes.ok)
153         res = response.json()
154         self.assertIn(
155             {"logical-connection-point": "XPDR1-NETWORK1",
156              "supporting-port": "CP1-CFP0-P1",
157              "supported-interface-capability": [
158                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
159              ],
160                 "port-direction": "bidirectional",
161                 "port-qual": "xpdr-network",
162                 "supporting-circuit-pack-name": "CP1-CFP0",
163                 "xponder-type": "mpdr",
164                 "lcp-hash-val": "1021db8d2affe7386705c438c67ea21f"},
165             res['mapping'])
166
167     def test_06_check_interface_och(self):
168         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
169                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
170                "interface/XPDR1-NETWORK1-1"
171                .format(self.restconf_baseurl))
172         headers = {'content-type': 'application/json'}
173         response = requests.request(
174             "GET", url, headers=headers, auth=('admin', 'admin'))
175         self.assertEqual(response.status_code, requests.codes.ok)
176         res = response.json()
177
178         input_dict = {'name': 'XPDR1-NETWORK1-1',
179                       'administrative-state': 'inService',
180                       'supporting-circuit-pack-name': 'CP1-CFP0',
181                       'type': 'org-openroadm-interfaces:opticalChannel',
182                       'supporting-port': 'CP1-CFP0-P1'
183                       }
184         # assertDictContainsSubset is deprecated
185         '''
186         self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-1', 'administrative-state': 'inService',
187                                        'supporting-circuit-pack-name': 'CP1-CFP0',
188                                        'type': 'org-openroadm-interfaces:opticalChannel',
189                                        'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
190         '''
191         self.assertDictEqual(input_dict,
192                              extract_a_from_b(input_dict,
193                                               res['interface'][0])
194                              )
195         self.assertDictEqual(
196             {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
197              u'transmit-power': -5},
198             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
199
200     def test_07_check_interface_OTU(self):
201         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
202                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
203                "interface/XPDR1-NETWORK1-OTU"
204                .format(self.restconf_baseurl))
205         headers = {'content-type': 'application/json'}
206         response = requests.request(
207             "GET", url, headers=headers, auth=('admin', 'admin'))
208         self.assertEqual(response.status_code, requests.codes.ok)
209         res = response.json()
210         input_dict = {'name': 'XPDR1-NETWORK1-OTU',
211                       'administrative-state': 'inService',
212                       'supporting-circuit-pack-name': 'CP1-CFP0',
213                       'supporting-interface': 'XPDR1-NETWORK1-1',
214                       'type': 'org-openroadm-interfaces:otnOtu',
215                       'supporting-port': 'CP1-CFP0-P1'}
216
217         # assertDictContainsSubset is deprecated
218         '''
219         self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-OTU', 'administrative-state': 'inService',
220                                        'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-1',
221                                        'type': 'org-openroadm-interfaces:otnOtu',
222                                        'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
223         '''
224         self.assertDictEqual(input_dict, extract_a_from_b(input_dict,
225                                                           res['interface'][0])
226                              )
227
228         self.assertDictEqual(
229             {u'rate': u'org-openroadm-otn-common-types:OTU4',
230              u'fec': u'scfec'},
231             res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
232
233     def test_08_otn_service_path_create_ODU4(self):
234         url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
235         data = {"renderer:input": {
236             "service-name": "service_ODU4",
237             "operation": "create",
238             "service-rate": "100G",
239             "service-type": "ODU",
240             "nodes": [
241                 {"node-id": "SPDR-SA1",
242                  "network-tp": "XPDR1-NETWORK1"}]}}
243         headers = {'content-type': 'application/json'}
244         response = requests.request(
245             "POST", url, data=json.dumps(data),
246             headers=headers, auth=('admin', 'admin'))
247         time.sleep(3)
248         self.assertEqual(response.status_code, requests.codes.ok)
249         res = response.json()
250         self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
251         self.assertTrue(res["output"]["success"])
252         self.assertIn(
253             {'node-id': 'SPDR-SA1',
254              'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
255
256     def test_09_get_portmapping_NETWORK1(self):
257         url = ("{}/config/transportpce-portmapping:network/"
258                "nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
259                .format(self.restconf_baseurl))
260         headers = {'content-type': 'application/json'}
261         response = requests.request(
262             "GET", url, headers=headers, auth=('admin', 'admin'))
263         self.assertEqual(response.status_code, requests.codes.ok)
264         res = response.json()
265         self.assertIn(
266             {"logical-connection-point": "XPDR1-NETWORK1",
267              "supporting-port": "CP1-CFP0-P1",
268              "supported-interface-capability": [
269                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
270              ],
271                 "port-direction": "bidirectional",
272                 "port-qual": "xpdr-network",
273                 "supporting-circuit-pack-name": "CP1-CFP0",
274                 "xponder-type": "mpdr",
275                 "supporting-odu4": "XPDR1-NETWORK1-ODU4",
276                 "lcp-hash-val": "1021db8d2affe7386705c438c67ea21f"
277              },
278             res['mapping'])
279
280     def test_10_check_interface_ODU4(self):
281         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
282                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
283                "interface/XPDR1-NETWORK1-ODU4"
284                .format(self.restconf_baseurl))
285         headers = {'content-type': 'application/json'}
286         response = requests.request(
287             "GET", url, headers=headers, auth=('admin', 'admin'))
288         self.assertEqual(response.status_code, requests.codes.ok)
289         res = response.json()
290         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
291                         'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
292                         'type': 'org-openroadm-interfaces:otnOdu',
293                         'supporting-port': 'CP1-CFP0-P1'}
294         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
295                         'rate': 'org-openroadm-otn-common-types:ODU4'}
296
297         self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
298                                                             res['interface'][0])
299                              )
300         self.assertDictEqual(input_dict_2,
301                              extract_a_from_b(input_dict_2,
302                                               res['interface'][0][
303                                                   'org-openroadm-otn-odu-interfaces:odu'])
304
305                              )
306         '''
307         self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
308                                        'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
309                                        'type': 'org-openroadm-interfaces:otnOdu',
310                                        'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
311         self.assertDictContainsSubset(
312             {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
313              'rate': 'org-openroadm-otn-common-types:ODU4'},
314             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
315         '''
316         self.assertDictEqual(
317             {u'payload-type': u'21', u'exp-payload-type': u'21'},
318             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
319
320     def test_11_otn_service_path_create_10GE(self):
321         url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
322         data = {"renderer:input": {
323             "service-name": "service1",
324             "operation": "create",
325             "service-rate": "10G",
326             "service-type": "Ethernet",
327             "ethernet-encoding": "eth encode",
328             "trib-slot": ["1"],
329             "trib-port-number": "1",
330             "nodes": [
331                 {"node-id": "SPDR-SA1",
332                  "client-tp": "XPDR1-CLIENT1",
333                  "network-tp": "XPDR1-NETWORK1"}]}}
334         headers = {'content-type': 'application/json'}
335         response = requests.request(
336             "POST", url, data=json.dumps(data),
337             headers=headers, auth=('admin', 'admin'))
338         time.sleep(3)
339         self.assertEqual(response.status_code, requests.codes.ok)
340         res = response.json()
341         self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
342         self.assertTrue(res["output"]["success"])
343         self.assertIn(
344             {'node-id': 'SPDR-SA1',
345              'connection-id': ['XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1'],
346              'odu-interface-id': ['XPDR1-NETWORK1-ODU2e-service1', 'XPDR1-CLIENT1-ODU2e-service1'],
347              'eth-interface-id': ['XPDR1-CLIENT1-ETHERNET10G']}, res["output"]['node-interface'])
348
349     def test_12_check_interface_10GE_CLIENT(self):
350         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
351                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
352                "interface/XPDR1-CLIENT1-ETHERNET10G"
353                .format(self.restconf_baseurl))
354         headers = {'content-type': 'application/json'}
355         response = requests.request(
356             "GET", url, headers=headers, auth=('admin', 'admin'))
357         self.assertEqual(response.status_code, requests.codes.ok)
358         res = response.json()
359         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
360                       'administrative-state': 'inService',
361                       'supporting-circuit-pack-name': 'CP1-SFP4',
362                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
363                       'supporting-port': 'CP1-SFP4-P1'
364                       }
365
366         '''
367         self.assertDictContainsSubset({'name': 'XPDR1-CLIENT1-ETHERNET10G', 'administrative-state': 'inService',
368                                        'supporting-circuit-pack-name': 'CP1-SFP4',
369                                        'type': 'org-openroadm-interfaces:ethernetCsmacd',
370                                        'supporting-port': 'CP1-SFP4-P1'}, res['interface'][0])
371         '''
372         self.assertDictEqual(input_dict, extract_a_from_b(input_dict,
373                                                           res['interface'][0])
374                              )
375         self.assertDictEqual(
376             {u'speed': 10000},
377             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
378
379     def test_13_check_interface_ODU2E_CLIENT(self):
380         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
381                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
382                "interface/XPDR1-CLIENT1-ODU2e-service1"
383                .format(self.restconf_baseurl))
384         headers = {'content-type': 'application/json'}
385         response = requests.request(
386             "GET", url, headers=headers, auth=('admin', 'admin'))
387         self.assertEqual(response.status_code, requests.codes.ok)
388         res = response.json()
389
390         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1',
391                         'administrative-state': 'inService',
392                         'supporting-circuit-pack-name': 'CP1-SFP4',
393                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
394                         'type': 'org-openroadm-interfaces:otnOdu',
395                         'supporting-port': 'CP1-SFP4-P1'}
396         input_dict_2 = {
397             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
398             'rate': 'org-openroadm-otn-common-types:ODU2e',
399             'monitoring-mode': 'terminated'}
400
401         self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
402                                                             res['interface'][0])
403                              )
404         self.assertDictEqual(input_dict_2,
405                              extract_a_from_b(input_dict_2, res['interface'][0][
406                                  'org-openroadm-otn-odu-interfaces:odu'])
407                              )
408
409         '''
410         self.assertDictContainsSubset({'name': 'XPDR1-CLIENT1-ODU2e-service1', 'administrative-state': 'inService',
411                                        'supporting-circuit-pack-name': 'CP1-SFP4',
412                                        'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
413                                        'type': 'org-openroadm-interfaces:otnOdu',
414                                        'supporting-port': 'CP1-SFP4-P1'}, res['interface'][0])
415         self.assertDictContainsSubset({
416             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
417             'rate': 'org-openroadm-otn-common-types:ODU2e',
418             'monitoring-mode': 'terminated'}, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
419         '''
420         self.assertDictEqual(
421             {u'payload-type': u'03', u'exp-payload-type': u'03'},
422             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
423
424     def test_14_check_interface_ODU2E_NETWORK(self):
425         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
426                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
427                "interface/XPDR1-NETWORK1-ODU2e-service1"
428                .format(self.restconf_baseurl))
429         headers = {'content-type': 'application/json'}
430         response = requests.request(
431             "GET", url, headers=headers, auth=('admin', 'admin'))
432         self.assertEqual(response.status_code, requests.codes.ok)
433         res = response.json()
434         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
435                         'supporting-circuit-pack-name': 'CP1-CFP0',
436                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
437                         'type': 'org-openroadm-interfaces:otnOdu',
438                         'supporting-port': 'CP1-CFP0-P1'}
439         input_dict_2 = {
440             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
441             'rate': 'org-openroadm-otn-common-types:ODU2e',
442             'monitoring-mode': 'monitored'}
443
444         input_dict_3 = {'trib-port-number': 1}
445
446         self.assertDictEqual(input_dict_1, extract_a_from_b(input_dict_1,
447                                                             res['interface'][0])
448                              )
449
450         self.assertDictEqual(input_dict_2,
451                              extract_a_from_b(input_dict_2,
452                                               res['interface'][0][
453                                                   'org-openroadm-otn-odu-interfaces:odu']
454                                               ))
455
456         self.assertDictEqual(input_dict_3,
457                              extract_a_from_b(input_dict_3,
458                                               res['interface'][0][
459                                                   'org-openroadm-otn-odu-interfaces:odu'][
460                                                   'parent-odu-allocation']))
461
462         '''
463         self.assertDictContainsSubset({'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
464                                        'supporting-circuit-pack-name': 'CP1-CFP0',
465                                        'supporting-interface': 'XPDR1-NETWORK1-ODU4',
466                                        'type': 'org-openroadm-interfaces:otnOdu',
467                                        'supporting-port': 'CP1-CFP0-P1'}, res['interface'][0])
468         self.assertDictContainsSubset({
469             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
470             'rate': 'org-openroadm-otn-common-types:ODU2e',
471             'monitoring-mode': 'monitored'}, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
472         self.assertDictContainsSubset(
473             {'trib-port-number': 1},
474             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
475         '''
476         self.assertIn(1,
477                       res['interface'][0][
478                           'org-openroadm-otn-odu-interfaces:odu'][
479                           'parent-odu-allocation']['trib-slots'])
480
481     def test_15_check_ODU2E_connection(self):
482         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
483                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
484                "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"
485                .format(self.restconf_baseurl))
486         headers = {'content-type': 'application/json'}
487         response = requests.request(
488             "GET", url, headers=headers, auth=('admin', 'admin'))
489         self.assertEqual(response.status_code, requests.codes.ok)
490         res = response.json()
491         input_dict_1 = {
492             'connection-name':
493             'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
494             'direction': 'bidirectional'
495         }
496
497         self.assertDictEqual(input_dict_1,
498                              extract_a_from_b(input_dict_1,
499                                               res['odu-connection'][0]))
500         '''
501         self.assertDictContainsSubset({
502             'connection-name': 'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
503             'direction': 'bidirectional'},
504             res['odu-connection'][0])
505         '''
506         self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
507                              res['odu-connection'][0]['destination'])
508         self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1'},
509                              res['odu-connection'][0]['source'])
510
511     def test_16_otn_service_path_delete_10GE(self):
512         url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
513         data = {"renderer:input": {
514             "service-name": "service1",
515             "operation": "delete",
516             "service-rate": "10G",
517             "service-type": "Ethernet",
518             "ethernet-encoding": "eth encode",
519             "trib-slot": ["1"],
520             "trib-port-number": "1",
521             "nodes": [
522                 {"node-id": "SPDR-SA1",
523                  "client-tp": "XPDR1-CLIENT1",
524                  "network-tp": "XPDR1-NETWORK1"}]}}
525         headers = {'content-type': 'application/json'}
526         response = requests.request(
527             "POST", url, data=json.dumps(data),
528             headers=headers, auth=('admin', 'admin'))
529         time.sleep(3)
530         self.assertEqual(response.status_code, requests.codes.ok)
531         res = response.json()
532         self.assertIn('Request processed', res["output"]["result"])
533         self.assertTrue(res["output"]["success"])
534
535     def test_17_check_no_ODU2E_connection(self):
536         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
537                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
538                "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"
539                .format(self.restconf_baseurl))
540         headers = {'content-type': 'application/json'}
541         response = requests.request(
542             "GET", url, headers=headers, auth=('admin', 'admin'))
543         self.assertEqual(response.status_code, requests.codes.not_found)
544
545     def test_18_check_no_interface_ODU2E_NETWORK(self):
546         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
547                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
548                "interface/XPDR1-NETWORK1-ODU2e-service1"
549                .format(self.restconf_baseurl))
550         headers = {'content-type': 'application/json'}
551         response = requests.request(
552             "GET", url, headers=headers, auth=('admin', 'admin'))
553         self.assertEqual(response.status_code, requests.codes.not_found)
554
555     def test_19_check_no_interface_ODU2E_CLIENT(self):
556         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
557                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
558                "interface/XPDR1-CLIENT1-ODU2e-service1"
559                .format(self.restconf_baseurl))
560         headers = {'content-type': 'application/json'}
561         response = requests.request(
562             "GET", url, headers=headers, auth=('admin', 'admin'))
563         self.assertEqual(response.status_code, requests.codes.not_found)
564
565     def test_20_check_no_interface_10GE_CLIENT(self):
566         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
567                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
568                "interface/XPDR1-CLIENT1-ETHERNET10G"
569                .format(self.restconf_baseurl))
570         headers = {'content-type': 'application/json'}
571         response = requests.request(
572             "GET", url, headers=headers, auth=('admin', 'admin'))
573         self.assertEqual(response.status_code, requests.codes.not_found)
574
575     def test_21_otn_service_path_delete_ODU4(self):
576         url = "{}/operations/transportpce-device-renderer:otn-service-path".format(self.restconf_baseurl)
577         data = {"renderer:input": {
578             "service-name": "service_ODU4",
579             "operation": "delete",
580             "service-rate": "100G",
581             "service-type": "ODU",
582             "nodes": [
583                 {"node-id": "SPDR-SA1",
584                  "network-tp": "XPDR1-NETWORK1"}]}}
585         headers = {'content-type': 'application/json'}
586         response = requests.request(
587             "POST", url, data=json.dumps(data),
588             headers=headers, auth=('admin', 'admin'))
589         time.sleep(3)
590         self.assertEqual(response.status_code, requests.codes.ok)
591         res = response.json()
592         self.assertIn('Request processed', res["output"]["result"])
593         self.assertTrue(res["output"]["success"])
594
595     def test_22_check_no_interface_ODU4(self):
596         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
597                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
598                "interface/XPDR1-NETWORK1-ODU4"
599                .format(self.restconf_baseurl))
600         headers = {'content-type': 'application/json'}
601         response = requests.request(
602             "GET", url, headers=headers, auth=('admin', 'admin'))
603         self.assertEqual(response.status_code, requests.codes.not_found)
604
605     def test_23_service_path_delete_OCH_OTU4(self):
606         url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
607         data = {"renderer:input": {
608             "service-name": "service_OTU4",
609             "wave-number": "1",
610             "modulation-format": "qpsk",
611             "operation": "delete",
612             "nodes": [
613                 {"node-id": "SPDR-SA1",
614                  "dest-tp": "XPDR1-NETWORK1"}]}}
615         headers = {'content-type': 'application/json'}
616         response = requests.request(
617             "POST", url, data=json.dumps(data),
618             headers=headers, auth=('admin', 'admin'))
619         time.sleep(3)
620         self.assertEqual(response.status_code, requests.codes.ok)
621         res = response.json()
622         self.assertIn('Request processed', res["output"]["result"])
623         self.assertTrue(res["output"]["success"])
624
625     def test_24_check_no_interface_OTU4(self):
626         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
627                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
628                "interface/XPDR1-NETWORK1-OTU"
629                .format(self.restconf_baseurl))
630         headers = {'content-type': 'application/json'}
631         response = requests.request(
632             "GET", url, headers=headers, auth=('admin', 'admin'))
633         self.assertEqual(response.status_code, requests.codes.not_found)
634
635     def test_25_check_no_interface_OCH(self):
636         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
637                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
638                "interface/XPDR1-NETWORK1-1"
639                .format(self.restconf_baseurl))
640         headers = {'content-type': 'application/json'}
641         response = requests.request(
642             "GET", url, headers=headers, auth=('admin', 'admin'))
643         self.assertEqual(response.status_code, requests.codes.not_found)
644
645     def test_26_disconnect_SPDR_SA1(self):
646         url = ("{}/config/network-topology:"
647                "network-topology/topology/topology-netconf/node/SPDR-SA1"
648                .format(self.restconf_baseurl))
649         data = {}
650         headers = {'content-type': 'application/json'}
651         response = requests.request(
652             "DELETE", url, data=json.dumps(data), headers=headers,
653             auth=('admin', 'admin'))
654         self.assertEqual(response.status_code, requests.codes.ok)
655
656
657 if __name__ == "__main__":
658     unittest.main(verbosity=2)