Device+OTN renderer tests 7.1 w/ intermediate rate 92/99592/1
authorBalagangadhar Bathula <bb4341@att.com>
Tue, 18 Jan 2022 06:43:24 +0000 (01:43 -0500)
committerGilles Thouenon <gilles.thouenon@orange.com>
Fri, 4 Feb 2022 09:13:52 +0000 (10:13 +0100)
- add functional tests to perform device and OTN renderer service path.
- include ODUC2 (ODUC3, ODUC4), OTUC2 (OTUC3, OTUC4) and Ethernet
- even if not needed, update legacy test_utils.service_path_request
  method to support modulation formats used here different from dp-qpsk
- force old draft-bierman02 RESTCONF API use in tox.ini 7.1 tests:
  with RFC8040 URLs, some new tests return a HTTP code 503 (service
  unavailable) instead of expected code 409 (conflict).
  TODO: investigate this problem

JIRA: TRNSPRTPCE-528 TRNSPRTPCE-595
Change-Id: I56d33746f6283d5ca23199c0968ee04c15da8299
Co-authored-by: guillaume.lambert <guillaume.lambert@orange.com>
Signed-off-by: guillaume.lambert <guillaume.lambert@orange.com>
tests/transportpce_tests/7.1/test02_otn_renderer.py [new file with mode: 0644]
tox.ini

diff --git a/tests/transportpce_tests/7.1/test02_otn_renderer.py b/tests/transportpce_tests/7.1/test02_otn_renderer.py
new file mode 100644 (file)
index 0000000..7d77949
--- /dev/null
@@ -0,0 +1,851 @@
+#!/usr/bin/env python
+##############################################################################
+# Copyright (c) 2022 AT&T, Inc. and others.  All rights reserved.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+# pylint: disable=no-member
+# pylint: disable=too-many-public-methods
+
+import unittest
+import time
+import requests
+# pylint: disable=wrong-import-order
+import sys
+sys.path.append('transportpce_tests/common')
+# pylint: disable=wrong-import-position
+# pylint: disable=import-error
+import test_utils_rfc8040  # nopep8
+
+
+class TransportPCE400GPortMappingTesting(unittest.TestCase):
+
+    processes = None
+    NETWORK2_CHECK_DICT = {"logical-connection-point": "XPDR2-NETWORK1",
+                           "supporting-port": "L1",
+                           "supported-interface-capability": [
+                               "org-openroadm-port-types:if-otsi-otsigroup"
+                           ],
+                           "port-direction": "bidirectional",
+                           "port-qual": "switch-network",
+                           "supporting-circuit-pack-name": "1/2/2-PLUG-NET",
+                           "xponder-type": "mpdr",
+                           'lcp-hash-val': 'LY9PxYJqUbw=',
+                           'port-admin-state': 'InService',
+                           'port-oper-state': 'InService'}
+    NODE_VERSION = '7.1'
+
+    @classmethod
+    def setUpClass(cls):
+        cls.processes = test_utils_rfc8040.start_tpce()
+        cls.processes = test_utils_rfc8040.start_sims([('xpdra2', cls.NODE_VERSION)])
+
+    @classmethod
+    def tearDownClass(cls):
+        # pylint: disable=not-an-iterable
+        for process in cls.processes:
+            test_utils_rfc8040.shutdown_process(process)
+        print("all processes killed")
+
+    def setUp(self):
+        # pylint: disable=consider-using-f-string
+        print("execution of {}".format(self.id().split(".")[-1]))
+        time.sleep(10)
+
+    def test_01_xpdr_device_connection(self):
+        response = test_utils_rfc8040.mount_device("XPDR-A2",
+                                                   ('xpdra2', self.NODE_VERSION))
+        self.assertEqual(response.status_code, requests.codes.created,
+                         test_utils_rfc8040.CODE_SHOULD_BE_201)
+
+    # Check if the node appears in the ietf-network topology
+    # this test has been removed, since it already exists in port-mapping
+    # 1a) create a OTUC2 device renderer
+    def test_02_service_path_create_otuc2(self):
+        response = test_utils_rfc8040.device_renderer_service_path_request(
+            {'input': {
+                'service-name': 'service_OTUC2',
+                'wave-number': '0',
+                'modulation-format': 'dp-qpsk',
+                'operation': 'create',
+                'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
+                'center-freq': 196.1,
+                'nmc-width': 75,
+                'min-freq': 196.0375,
+                'max-freq': 196.125,
+                'lower-spectral-slot-number': 755,
+                'higher-spectral-slot-number': 768
+            }})
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
+        self.assertIn(
+            {'node-id': 'XPDR-A2',
+             'otu-interface-id': ['XPDR2-NETWORK1-OTUC2'],
+             'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-200G']}, response['output']['node-interface'])
+
+    def test_03_get_portmapping_network1(self):
+        response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC2"
+        self.assertIn(
+            self.NETWORK2_CHECK_DICT,
+            response['mapping'])
+
+    def test_04_check_interface_otsi(self):
+        # pylint: disable=line-too-long
+        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
+                        'administrative-state': 'inService',
+                        'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+                        'type': 'org-openroadm-interfaces:otsi',
+                        'supporting-port': 'L1'}
+        input_dict_2 = {
+            "frequency": 196.0812,
+            "otsi-rate": "org-openroadm-common-optical-channel-types:R200G-otsi",
+            "fec": "org-openroadm-common-types:ofec",
+            "transmit-power": -5,
+            "provision-mode": "explicit",
+            "modulation-format": "dp-qpsk"}
+
+        self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+                             response['interface'][0])
+        self.assertDictEqual(dict(input_dict_2,
+                                  **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
+                             response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
+        self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic2.4", "iid": [1, 2]},
+                             response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
+
+    def test_05_check_interface_otsig(self):
+        response = test_utils_rfc8040.check_node_attribute_request(
+            "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-200G',
+                        'administrative-state': 'inService',
+                        'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+                        ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
+                        'type': 'org-openroadm-interfaces:otsi-group',
+                        'supporting-port': 'L1'}
+        input_dict_2 = {"group-id": 1,
+                        "group-rate": "org-openroadm-common-optical-channel-types:R200G-otsi"}
+
+        self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+                             response['interface'][0])
+        self.assertDictEqual(dict(input_dict_2,
+                                  **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
+                             response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
+
+    def test_06_check_interface_otuc2(self):
+        response = test_utils_rfc8040.check_node_attribute_request(
+            "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC2',
+                        'administrative-state': 'inService',
+                        'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+                        ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-200G',
+                        'type': 'org-openroadm-interfaces:otnOtu',
+                        'supporting-port': 'L1'}
+        input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
+                        "degthr-percentage": 100,
+                        "tim-detect-mode": "Disabled",
+                        "otucn-n-rate": 2,
+                        "degm-intervals": 2}
+
+        self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+                             response['interface'][0])
+        self.assertDictEqual(dict(input_dict_2,
+                                  **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
+                             response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
+
+    # 1b) create a ODUC2 device renderer
+    def test_07_otn_service_path_create_oduc2(self):
+        response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+            {'input': {
+                'service-name': 'service_ODUC2',
+                'operation': 'create',
+                'service-rate': '200',
+                'service-format': 'ODU',
+                'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
+            }})
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
+        self.assertIn(
+            {'node-id': 'XPDR-A2',
+             'odu-interface-id': ['XPDR2-NETWORK1-ODUC2']}, response['output']['node-interface'])
+
+    def test_08_get_portmapping_network1(self):
+        response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC2"
+        self.assertIn(
+            self.NETWORK2_CHECK_DICT,
+            response['mapping'])
+
+    def test_09_check_interface_oduc2(self):
+        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC2")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+
+        input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC2',
+                        'administrative-state': 'inService',
+                        'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+                        'supporting-interface-list': 'XPDR2-NETWORK1-OTUC2',
+                        'type': 'org-openroadm-interfaces:otnOdu',
+                        'supporting-port': 'L1'}
+
+        input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
+                        'rate': 'org-openroadm-otn-common-types:ODUCn',
+                        'tx-sapi': 'LY9PxYJqUbw=',
+                        'tx-dapi': 'LY9PxYJqUbw=',
+                        'expected-sapi': 'LY9PxYJqUbw=',
+                        'expected-dapi': 'LY9PxYJqUbw=',
+                        "degm-intervals": 2,
+                        "degthr-percentage": 100,
+                        "monitoring-mode": "terminated",
+                        "oducn-n-rate": 2
+                        }
+        self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+                             response['interface'][0])
+        self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+                             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+        self.assertDictEqual(
+            {'payload-type': '22', 'exp-payload-type': '22'},
+            response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+
+    # 1c) create Ethernet device renderer
+    def test_10_otn_service_path_create_100ge(self):
+        response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+            {'input': {
+                'service-name': 'service_Ethernet',
+                'operation': 'create',
+                'service-rate': '100',
+                'service-format': 'Ethernet',
+                'nodes': [{'node-id': 'XPDR-A2', 'client-tp': 'XPDR2-CLIENT1', 'network-tp': 'XPDR2-NETWORK1'}],
+                'ethernet-encoding': 'eth encode',
+                'opucn-trib-slots': ['1.1', '1.20']
+            }})
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
+        self.assertEqual('XPDR-A2', response['output']['node-interface'][0]['node-id'])
+        self.assertIn('XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet',
+                      response['output']['node-interface'][0]['connection-id'])
+        self.assertIn('XPDR2-CLIENT1-ETHERNET-100G', response['output']['node-interface'][0]['eth-interface-id'])
+        self.assertIn('XPDR2-NETWORK1-ODU4-service_Ethernet',
+                      response['output']['node-interface'][0]['odu-interface-id'])
+        self.assertIn('XPDR2-CLIENT1-ODU4-service_Ethernet',
+                      response['output']['node-interface'][0]['odu-interface-id'])
+
+    def test_11_check_interface_100ge_client(self):
+        response = test_utils_rfc8040.check_node_attribute_request(
+            "XPDR-A2", "interface", "XPDR2-CLIENT1-ETHERNET-100G")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
+                        'administrative-state': 'inService',
+                        'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
+                        'type': 'org-openroadm-interfaces:ethernetCsmacd',
+                        'supporting-port': 'C1'
+                        }
+        input_dict_2 = {'speed': 100000}
+        self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+                             response['interface'][0])
+        self.assertDictEqual(dict(input_dict_2,
+                                  **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
+                             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
+
+    def test_12_check_interface_odu4_client(self):
+        response = test_utils_rfc8040.check_node_attribute_request(
+            "XPDR-A2", "interface", "XPDR2-CLIENT1-ODU4-service_Ethernet")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service_Ethernet',
+                        'administrative-state': 'inService',
+                        'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
+                        'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100G',
+                        'type': 'org-openroadm-interfaces:otnOdu',
+                        'supporting-port': 'C1'}
+        input_dict_2 = {
+            'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
+            'rate': 'org-openroadm-otn-common-types:ODU4',
+            'monitoring-mode': 'terminated'}
+
+        self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+                             response['interface'][0])
+        self.assertDictEqual(dict(input_dict_2,
+                                  **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+                             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+        self.assertDictEqual(
+            {'payload-type': '07', 'exp-payload-type': '07'},
+            response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+
+    def test_13_check_interface_odu4_network(self):
+        response = test_utils_rfc8040.check_node_attribute_request(
+            "XPDR-A2", "interface", "XPDR2-NETWORK1-ODU4-service_Ethernet")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service_Ethernet',
+                        'administrative-state': 'inService',
+                        'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+                        'supporting-interface-list': 'XPDR2-NETWORK1-ODUC2',
+                        'type': 'org-openroadm-interfaces:otnOdu',
+                        'supporting-port': 'L1'}
+        input_dict_2 = {
+            'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
+            'rate': 'org-openroadm-otn-common-types:ODU4',
+            'monitoring-mode': 'not-terminated'}
+        input_dict_3 = {'trib-port-number': 1}
+
+        self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+                             response['interface'][0])
+        self.assertDictEqual(dict(input_dict_2,
+                                  **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+                             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+        self.assertDictEqual(dict(input_dict_3,
+                                  **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+                                      'parent-odu-allocation']),
+                             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
+        self.assertIn('1.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
+                      ['opucn-trib-slots'])
+        self.assertIn('1.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
+                      ['opucn-trib-slots'])
+
+    def test_14_check_odu_connection_xpdra2(self):
+        response = test_utils_rfc8040.check_node_attribute_request(
+            "XPDR-A2",
+            "odu-connection", "XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        input_dict_1 = {
+            'connection-name':
+            'XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet',
+            'direction': 'bidirectional'
+        }
+
+        self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
+                             response['odu-connection'][0])
+        self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service_Ethernet'},
+                             response['odu-connection'][0]['destination'])
+        self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service_Ethernet'},
+                             response['odu-connection'][0]['source'])
+
+    # 1d) Delete Ethernet device interfaces
+    def test_15_otn_service_path_delete_100ge(self):
+        response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+            {'input': {
+                'service-name': 'service_Ethernet',
+                'operation': 'delete',
+                'service-rate': '100',
+                'service-format': 'Ethernet',
+                'nodes': [{'node-id': 'XPDR-A2', 'client-tp': 'XPDR2-CLIENT1', 'network-tp': 'XPDR2-NETWORK1'}],
+                'ethernet-encoding': 'eth encode',
+                'trib-slot': ['1'],
+                'trib-port-number': '1'
+            }})
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        self.assertIn('Request processed', response['output']['result'])
+
+    def test_16_check_no_odu_connection(self):
+        response = test_utils_rfc8040.check_node_attribute_request(
+            "XPDR-A2",
+            "odu-connection", "XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet")
+        self.assertEqual(response['status_code'], requests.codes.conflict)
+
+    def test_17_check_no_interface_odu_network(self):
+        response = test_utils_rfc8040.check_node_attribute_request(
+            "XPDR-A2", "interface", "XPDR2-NETWORK1-ODU4-service_Ethernet")
+        self.assertEqual(response['status_code'], requests.codes.conflict)
+
+    def test_18_check_no_interface_odu_client(self):
+        response = test_utils_rfc8040.check_node_attribute_request(
+            "XPDR-A2", "interface", "XPDR2-CLIENT1-ODU4-service_Ethernet")
+        self.assertEqual(response['status_code'], requests.codes.conflict)
+
+    def test_19_check_no_interface_100ge_client(self):
+        response = test_utils_rfc8040.check_node_attribute_request(
+            "XPDR-A2", "interface", "XPDR2-CLIENT1-ETHERNET-100G")
+        self.assertEqual(response['status_code'], requests.codes.conflict)
+
+    # 1e) Delete ODUC2 device interfaces
+    def test_20_otn_service_path_delete_oduc2(self):
+        response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+            {'input': {
+                'service-name': 'service_ODUC2',
+                'operation': 'delete',
+                'service-rate': '200',
+                'service-format': 'ODU',
+                'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
+            }})
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        self.assertIn('Request processed', response['output']['result'])
+
+    def test_21_check_no_interface_oduc2(self):
+        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC2")
+        self.assertEqual(response['status_code'], requests.codes.conflict)
+
+    # 1f) Delete OTUC2 device interfaces
+    def test_22_service_path_delete_otuc2(self):
+        response = test_utils_rfc8040.device_renderer_service_path_request(
+            {'input': {
+                'service-name': 'service_OTUC2',
+                'wave-number': '0',
+                'modulation-format': 'dp-qpsk',
+                'operation': 'delete',
+                'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
+                'center-freq': 196.1,
+                'nmc-width': 75,
+                'min-freq': 196.0375,
+                'max-freq': 196.125,
+                'lower-spectral-slot-number': 755,
+                'higher-spectral-slot-number': 768
+            }})
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        self.assertIn('Request processed', response['output']['result'])
+
+    def test_23_check_no_interface_otuc2(self):
+        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC2")
+        self.assertEqual(response['status_code'], requests.codes.conflict)
+
+    def test_24_check_no_interface_otsig(self):
+        response = test_utils_rfc8040.check_node_attribute_request(
+            "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
+        self.assertEqual(response['status_code'], requests.codes.conflict)
+
+    def test_25_check_no_interface_otsi(self):
+        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
+        self.assertEqual(response['status_code'], requests.codes.conflict)
+
+    # 2a) create a OTUC3 device renderer
+    def test_26_service_path_create_otuc3(self):
+        response = test_utils_rfc8040.device_renderer_service_path_request(
+            {'input': {
+                'service-name': 'service_OTUC3',
+                'wave-number': '0',
+                'modulation-format': 'dp-qam8',
+                'operation': 'create',
+                'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
+                'center-freq': 196.1,
+                'nmc-width': 75,
+                'min-freq': 196.0375,
+                'max-freq': 196.125,
+                'lower-spectral-slot-number': 755,
+                'higher-spectral-slot-number': 768
+            }})
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
+        self.assertIn(
+            {'node-id': 'XPDR-A2',
+             'otu-interface-id': ['XPDR2-NETWORK1-OTUC3'],
+             'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-300G']}, response['output']['node-interface'])
+
+    def test_27_get_portmapping_network1(self):
+        response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC3"
+        self.assertIn(
+            self.NETWORK2_CHECK_DICT,
+            response['mapping'])
+
+    def test_28_check_interface_otsi(self):
+        # pylint: disable=line-too-long
+        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+
+        input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
+                        'administrative-state': 'inService',
+                        'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+                        'type': 'org-openroadm-interfaces:otsi',
+                        'supporting-port': 'L1'}
+        input_dict_2 = {
+            "frequency": 196.0812,
+            "otsi-rate": "org-openroadm-common-optical-channel-types:R300G-otsi",
+            "fec": "org-openroadm-common-types:ofec",
+            "transmit-power": -5,
+            "provision-mode": "explicit",
+            "modulation-format": "dp-qam8"}
+
+        self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+                             response['interface'][0])
+        self.assertDictEqual(dict(input_dict_2,
+                                  **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
+                             response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
+        self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic3.6", "iid": [1, 2, 3]},
+                             response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
+
+    def test_29_check_interface_otsig(self):
+        response = test_utils_rfc8040.check_node_attribute_request(
+            "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-300G")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-300G',
+                        'administrative-state': 'inService',
+                        'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+                        ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
+                        'type': 'org-openroadm-interfaces:otsi-group',
+                        'supporting-port': 'L1'}
+        input_dict_2 = {"group-id": 1,
+                        "group-rate": "org-openroadm-common-optical-channel-types:R300G-otsi"}
+
+        self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+                             response['interface'][0])
+        self.assertDictEqual(dict(input_dict_2,
+                                  **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
+                             response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
+
+    def test_30_check_interface_otuc3(self):
+        response = test_utils_rfc8040.check_node_attribute_request(
+            "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC3")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC3',
+                        'administrative-state': 'inService',
+                        'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+                        ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-300G',
+                        'type': 'org-openroadm-interfaces:otnOtu',
+                        'supporting-port': 'L1'}
+        input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
+                        "degthr-percentage": 100,
+                        "tim-detect-mode": "Disabled",
+                        "otucn-n-rate": 3,
+                        "degm-intervals": 2}
+
+        self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+                             response['interface'][0])
+        self.assertDictEqual(dict(input_dict_2,
+                                  **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
+                             response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
+
+    # 2b) create a ODUC3 device renderer
+    def test_31_otn_service_path_create_oduc3(self):
+        response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+            {'input': {
+                'service-name': 'service_ODUC3',
+                'operation': 'create',
+                'service-rate': '300',
+                'service-format': 'ODU',
+                'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
+            }})
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
+        self.assertIn(
+            {'node-id': 'XPDR-A2',
+             'odu-interface-id': ['XPDR2-NETWORK1-ODUC3']}, response['output']['node-interface'])
+
+    def test_32_get_portmapping_network1(self):
+        response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC3"
+        self.assertIn(
+            self.NETWORK2_CHECK_DICT,
+            response['mapping'])
+
+    def test_33_check_interface_oduc3(self):
+        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC3")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+
+        input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC3',
+                        'administrative-state': 'inService',
+                        'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+                        'supporting-interface-list': 'XPDR2-NETWORK1-OTUC3',
+                        'type': 'org-openroadm-interfaces:otnOdu',
+                        'supporting-port': 'L1'}
+
+        input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
+                        'rate': 'org-openroadm-otn-common-types:ODUCn',
+                        'tx-sapi': 'LY9PxYJqUbw=',
+                        'tx-dapi': 'LY9PxYJqUbw=',
+                        'expected-sapi': 'LY9PxYJqUbw=',
+                        'expected-dapi': 'LY9PxYJqUbw=',
+                        "degm-intervals": 2,
+                        "degthr-percentage": 100,
+                        "monitoring-mode": "terminated",
+                        "oducn-n-rate": 3
+                        }
+        self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+                             response['interface'][0])
+        self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+                             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+        self.assertDictEqual(
+            {'payload-type': '22', 'exp-payload-type': '22'},
+            response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+
+    # 2c) create Ethernet device renderer
+    # No change in the ethernet device renderer so skipping those tests
+    # 2d) Delete Ethernet device interfaces
+    # No change in the ethernet device renderer so skipping those tests
+
+    # 2e) Delete ODUC3 device interfaces
+    def test_34_otn_service_path_delete_oduc3(self):
+        response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+            {'input': {
+                'service-name': 'service_ODUC3',
+                'operation': 'delete',
+                'service-rate': '300',
+                'service-format': 'ODU',
+                'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
+            }})
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        self.assertIn('Request processed', response['output']['result'])
+
+    def test_35_check_no_interface_oduc3(self):
+        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC3")
+        self.assertEqual(response['status_code'], requests.codes.conflict)
+
+    # 2f) Delete OTUC3 device interfaces
+    def test_36_service_path_delete_otuc3(self):
+        response = test_utils_rfc8040.device_renderer_service_path_request(
+            {'input': {
+                'service-name': 'service_OTUC3',
+                'wave-number': '0',
+                'modulation-format': 'dp-qam8',
+                'operation': 'delete',
+                'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
+                'center-freq': 196.1,
+                'nmc-width': 75,
+                'min-freq': 196.0375,
+                'max-freq': 196.125,
+                'lower-spectral-slot-number': 755,
+                'higher-spectral-slot-number': 768
+            }})
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        self.assertIn('Request processed', response['output']['result'])
+
+    def test_37_check_no_interface_otuc3(self):
+        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC3")
+        self.assertEqual(response['status_code'], requests.codes.conflict)
+
+    def test_38_check_no_interface_otsig(self):
+        response = test_utils_rfc8040.check_node_attribute_request(
+            "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-300G")
+        self.assertEqual(response['status_code'], requests.codes.conflict)
+
+    def test_39_check_no_interface_otsi(self):
+        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
+        self.assertEqual(response['status_code'], requests.codes.conflict)
+
+    # 3a) create a OTUC4 device renderer
+    def test_40_service_path_create_otuc3(self):
+        response = test_utils_rfc8040.device_renderer_service_path_request(
+            {'input': {
+                'service-name': 'service_OTUC4',
+                'wave-number': '0',
+                'modulation-format': 'dp-qam16',
+                'operation': 'create',
+                'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
+                'center-freq': 196.1,
+                'nmc-width': 75,
+                'min-freq': 196.0375,
+                'max-freq': 196.125,
+                'lower-spectral-slot-number': 755,
+                'higher-spectral-slot-number': 768
+            }})
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
+        self.assertIn(
+            {'node-id': 'XPDR-A2',
+             'otu-interface-id': ['XPDR2-NETWORK1-OTUC4'],
+             'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-400G']}, response['output']['node-interface'])
+
+    def test_41_get_portmapping_network1(self):
+        response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC4"
+        self.assertIn(
+            self.NETWORK2_CHECK_DICT,
+            response['mapping'])
+
+    def test_42_check_interface_otsi(self):
+        # pylint: disable=line-too-long
+        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+
+        input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
+                        'administrative-state': 'inService',
+                        'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+                        'type': 'org-openroadm-interfaces:otsi',
+                        'supporting-port': 'L1'}
+        input_dict_2 = {
+            "frequency": 196.0812,
+            "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
+            "fec": "org-openroadm-common-types:ofec",
+            "transmit-power": -5,
+            "provision-mode": "explicit",
+            "modulation-format": "dp-qam16"}
+
+        self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+                             response['interface'][0])
+        self.assertDictEqual(dict(input_dict_2,
+                                  **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
+                             response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
+        self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
+                             response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
+
+    def test_43_check_interface_otsig(self):
+        response = test_utils_rfc8040.check_node_attribute_request(
+            "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-400G")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G',
+                        'administrative-state': 'inService',
+                        'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+                        ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
+                        'type': 'org-openroadm-interfaces:otsi-group',
+                        'supporting-port': 'L1'}
+        input_dict_2 = {"group-id": 1,
+                        "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}
+
+        self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+                             response['interface'][0])
+        self.assertDictEqual(dict(input_dict_2,
+                                  **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
+                             response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
+
+    def test_44_check_interface_otuc4(self):
+        response = test_utils_rfc8040.check_node_attribute_request(
+            "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC4")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
+                        'administrative-state': 'inService',
+                        'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+                        ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-400G',
+                        'type': 'org-openroadm-interfaces:otnOtu',
+                        'supporting-port': 'L1'}
+        input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
+                        "degthr-percentage": 100,
+                        "tim-detect-mode": "Disabled",
+                        "otucn-n-rate": 4,
+                        "degm-intervals": 2}
+
+        self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+                             response['interface'][0])
+        self.assertDictEqual(dict(input_dict_2,
+                                  **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
+                             response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
+
+    # 3b) create a ODUC4 device renderer
+    def test_45_otn_service_path_create_oduc3(self):
+        response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+            {'input': {
+                'service-name': 'service_ODUC4',
+                'operation': 'create',
+                'service-rate': '400',
+                'service-format': 'ODU',
+                'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
+            }})
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
+        self.assertIn(
+            {'node-id': 'XPDR-A2',
+             'odu-interface-id': ['XPDR2-NETWORK1-ODUC4']}, response['output']['node-interface'])
+
+    def test_46_get_portmapping_network1(self):
+        response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC4"
+        self.assertIn(
+            self.NETWORK2_CHECK_DICT,
+            response['mapping'])
+
+    def test_47_check_interface_oduc4(self):
+        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4")
+        self.assertEqual(response['status_code'], requests.codes.ok)
+
+        input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
+                        'administrative-state': 'inService',
+                        'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+                        'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
+                        'type': 'org-openroadm-interfaces:otnOdu',
+                        'supporting-port': 'L1'}
+
+        input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
+                        'rate': 'org-openroadm-otn-common-types:ODUCn',
+                        'tx-sapi': 'LY9PxYJqUbw=',
+                        'tx-dapi': 'LY9PxYJqUbw=',
+                        'expected-sapi': 'LY9PxYJqUbw=',
+                        'expected-dapi': 'LY9PxYJqUbw=',
+                        "degm-intervals": 2,
+                        "degthr-percentage": 100,
+                        "monitoring-mode": "terminated",
+                        "oducn-n-rate": 4
+                        }
+        self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+                             response['interface'][0])
+        self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+                             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+        self.assertDictEqual(
+            {'payload-type': '22', 'exp-payload-type': '22'},
+            response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+
+    # 3c) create Ethernet device renderer
+    # No change in the ethernet device renderer so skipping those tests
+    # 3d) Delete Ethernet device interfaces
+    # No change in the ethernet device renderer so skipping those tests
+
+    # 3e) Delete ODUC4 device interfaces
+    def test_48_otn_service_path_delete_oduc4(self):
+        response = test_utils_rfc8040.device_renderer_otn_service_path_request(
+            {'input': {
+                'service-name': 'service_ODUC4',
+                'operation': 'delete',
+                'service-rate': '400',
+                'service-format': 'ODU',
+                'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
+            }})
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        self.assertIn('Request processed', response['output']['result'])
+
+    def test_49_check_no_interface_oduc4(self):
+        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4")
+        self.assertEqual(response['status_code'], requests.codes.conflict)
+
+    # 3f) Delete OTUC4 device interfaces
+    def test_50_service_path_delete_otuc4(self):
+        response = test_utils_rfc8040.device_renderer_service_path_request(
+            {'input': {
+                'service-name': 'service_OTUC4',
+                'wave-number': '0',
+                'modulation-format': 'dp-qam16',
+                'operation': 'delete',
+                'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
+                'center-freq': 196.1,
+                'nmc-width': 75,
+                'min-freq': 196.0375,
+                'max-freq': 196.125,
+                'lower-spectral-slot-number': 755,
+                'higher-spectral-slot-number': 768
+            }})
+        self.assertEqual(response['status_code'], requests.codes.ok)
+        self.assertIn('Request processed', response['output']['result'])
+
+    def test_51_check_no_interface_otuc4(self):
+        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC4")
+        self.assertEqual(response['status_code'], requests.codes.conflict)
+
+    def test_52_check_no_interface_otsig(self):
+        response = test_utils_rfc8040.check_node_attribute_request(
+            "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-400G")
+        self.assertEqual(response['status_code'], requests.codes.conflict)
+
+    def test_53_check_no_interface_otsi(self):
+        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
+        self.assertEqual(response['status_code'], requests.codes.conflict)
+
+    # Disconnect the XPDR
+    def test_54_xpdr_device_disconnection(self):
+        response = test_utils_rfc8040.unmount_device("XPDR-A2")
+        self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
+
+    def test_55_xpdr_device_disconnected(self):
+        response = test_utils_rfc8040.check_device_connection("XPDR-A2")
+        self.assertEqual(response['status_code'], requests.codes.conflict)
+        self.assertIn(response['connection-status']['error-type'], ('protocol', 'application'))
+        self.assertEqual(response['connection-status']['error-tag'], 'data-missing')
+        self.assertEqual(response['connection-status']['error-message'],
+                         'Request could not be completed because the relevant data model content does not exist')
+
+    def test_56_xpdr_device_not_connected(self):
+        response = test_utils_rfc8040.get_portmapping_node_info("XPDR-A2")
+        self.assertEqual(response['status_code'], requests.codes.conflict)
+        self.assertIn(response['node-info']['error-type'], ('protocol', 'application'))
+        self.assertEqual(response['node-info']['error-tag'], 'data-missing')
+        self.assertEqual(response['node-info']['error-message'],
+                         'Request could not be completed because the relevant data model content does not exist')
+
+
+if __name__ == '__main__':
+    unittest.main(verbosity=2)
diff --git a/tox.ini b/tox.ini
index 41bc8670a74b545e00cad6015523c1f534eda9c0..db710a348e18b210964f646975500be76cf99fc7 100644 (file)
--- a/tox.ini
+++ b/tox.ini
@@ -148,7 +148,7 @@ whitelist_externals = launch_tests.sh
 passenv = LAUNCHER USE_LIGHTY USE_ODL_RESTCONF_VERSION
 setenv =
 #    USE_LIGHTY=True
-#    USE_ODL_RESTCONF_VERSION=draft-bierman02
+    USE_ODL_RESTCONF_VERSION=draft-bierman02
     USE_ODL_ALT_KARAF_ENV=./karaf71.env
     USE_ODL_ALT_KARAF_INSTALL_DIR=karaf71
 commands =