Refactor tests launching procedure
[transportpce.git] / tests / transportpce_tests / 7.1 / test01_portmapping.py
1 #!/usr/bin/env python
2 ##############################################################################
3 # Copyright (c) 2021 AT&T, Inc. and others.  All rights reserved.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
10
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
13
14 import unittest
15 import time
16 import requests
17 import sys
18 sys.path.append('transportpce_tests/common')
19 import test_utils
20
21
class TransportPCE400GPortMappingTesting(unittest.TestCase):
    """Port-mapping checks for the "XPDR-A2" node in OpenROADM version 7.1.

    The numbered tests are written to run in name order: test_01 mounts the
    device, test_02 to test_08 query it, and test_09 to test_11 unmount it
    and verify that the data is no longer served.
    """

    processes = None
    NODE_VERSION = '7.1'

    @classmethod
    def setUpClass(cls):
        # NOTE(review): the value returned by start_tpce() is replaced by the
        # one returned by start_sims(); presumably both return the same
        # shared process list -- confirm in test_utils.
        cls.processes = test_utils.start_tpce()
        simulators = [('xpdra2', cls.NODE_VERSION)]
        cls.processes = test_utils.start_sims(simulators)

    @classmethod
    def tearDownClass(cls):
        # pylint: disable=not-an-iterable
        for proc in cls.processes:
            test_utils.shutdown_process(proc)
        print("all processes killed")

    def setUp(self):
        # Only the last dotted component of the test id (the method name)
        # is printed, then every test waits 10 seconds before running.
        test_name = self.id().split(".")[-1]
        print("execution of {}".format(test_name))
        time.sleep(10)

    def _get_portmapping_json(self, path):
        # Send a port-mapping request for `path`, require a 200 status code
        # and hand back the decoded JSON body.
        resp = test_utils.portmapping_request(path)
        self.assertEqual(resp.status_code, requests.codes.ok)
        return resp.json()

    def _assert_data_missing(self, resp):
        # Require a 409 status code and a "data-missing" application error
        # in the JSON error list of `resp`.
        self.assertEqual(resp.status_code, requests.codes.conflict)
        payload = resp.json()
        expected_error = {
            "error-type": "application",
            "error-tag": "data-missing",
            "error-message": "Request could not be completed because the "
                             "relevant data model content does not exist",
        }
        self.assertIn(expected_error, payload['errors']['error'])

    def test_01_xpdr_device_connection(self):
        simulator = ('xpdra2', self.NODE_VERSION)
        resp = test_utils.mount_device("XPDR-A2", simulator)
        self.assertEqual(resp.status_code, requests.codes.created,
                         test_utils.CODE_SHOULD_BE_201)

    # Check that the netconf operational data reports the node as connected
    def test_02_xpdr_device_connected(self):
        resp = test_utils.get_netconf_oper_request("XPDR-A2")
        self.assertEqual(resp.status_code, requests.codes.ok)
        payload = resp.json()
        status = payload['node'][0]['netconf-node-topology:connection-status']
        self.assertEqual(status, 'connected')
        time.sleep(10)

    # Check node info in the port-mappings
    def test_03_xpdr_portmapping_info(self):
        payload = self._get_portmapping_json("XPDR-A2/node-info")
        expected = {
            u'node-info': {
                u'node-type': u'xpdr',
                u'node-ip-address': u'1.2.3.4',
                u'node-clli': u'NodeA',
                u'openroadm-version': u'7.1',
                u'node-vendor': u'vendorA',
                u'node-model': u'model',
            },
        }
        self.assertEqual(expected, payload)
        time.sleep(3)

    # Check the if-capabilities and the other details for the network port
    def test_04_tpdr_portmapping_NETWORK1(self):
        payload = self._get_portmapping_json("XPDR-A2/mapping/XPDR1-NETWORK1")
        expected = {
            'supported-interface-capability': [
                'org-openroadm-port-types:if-otsi-otsigroup',
            ],
            'supporting-port': 'L1',
            'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
            'logical-connection-point': 'XPDR1-NETWORK1',
            'port-qual': 'xpdr-network',
            'port-direction': 'bidirectional',
            'connection-map-lcp': 'XPDR1-CLIENT1',
            'lcp-hash-val': 'AIGiVAQ4gDil',
            'port-admin-state': 'InService',
            'port-oper-state': 'InService',
            'xponder-type': 'tpdr',
        }
        self.assertIn(expected, payload['mapping'])

    # Same check for the client port paired with XPDR1-NETWORK1
    def test_05_tpdr_portmapping_CLIENT1(self):
        payload = self._get_portmapping_json("XPDR-A2/mapping/XPDR1-CLIENT1")
        expected = {
            'supported-interface-capability': [
                'org-openroadm-port-types:if-400GE',
            ],
            'supporting-port': 'C1',
            'supporting-circuit-pack-name': '1/1/1-PLUG-CLIENT',
            'logical-connection-point': 'XPDR1-CLIENT1',
            'port-direction': 'bidirectional',
            'connection-map-lcp': 'XPDR1-NETWORK1',
            'port-qual': 'xpdr-client',
            'lcp-hash-val': 'AODABTVSOHH0',
            'port-admin-state': 'InService',
            'port-oper-state': 'InService',
        }
        self.assertIn(expected, payload['mapping'])

    # Check the port-mapping for the switch-client and switch-network port-quals
    def test_06_mpdr_portmapping_NETWORK1(self):
        payload = self._get_portmapping_json("XPDR-A2/mapping/XPDR2-NETWORK1")
        expected = {
            'supported-interface-capability': [
                'org-openroadm-port-types:if-otsi-otsigroup',
            ],
            'supporting-port': 'L1',
            'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
            'logical-connection-point': 'XPDR2-NETWORK1',
            'port-qual': 'switch-network',
            'port-direction': 'bidirectional',
            'lcp-hash-val': 'LY9PxYJqUbw=',
            'port-admin-state': 'InService',
            'port-oper-state': 'InService',
            'xponder-type': 'mpdr',
        }
        self.assertIn(expected, payload['mapping'])

    def test_07_mpdr_portmapping_CLIENT1(self):
        payload = self._get_portmapping_json("XPDR-A2/mapping/XPDR2-CLIENT1")
        expected = {
            'supported-interface-capability': [
                'org-openroadm-port-types:if-100GE-ODU4',
                'org-openroadm-port-types:if-OCH-OTU4-ODU4',
            ],
            'supporting-port': 'C1',
            'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
            'logical-connection-point': 'XPDR2-CLIENT1',
            'port-direction': 'bidirectional',
            'port-qual': 'switch-client',
            'lcp-hash-val': 'AK+Cna4EclRH',
            'port-admin-state': 'InService',
            'port-oper-state': 'InService',
            "mpdr-restrictions": {
                "min-trib-slot": "1.1",
                "max-trib-slot": "1.20",
            },
        }
        self.assertIn(expected, payload['mapping'])

    # Added test to check mc-capability-profile for a transponder
    def test_08_check_mccapprofile(self):
        payload = self._get_portmapping_json(
            "XPDR-A2/mc-capabilities/XPDR-mcprofile")
        self.assertEqual(payload['mc-capabilities'][0]['mc-node-name'],
                         'XPDR-mcprofile')
        self.assertEqual(payload['mc-capabilities'][0]['center-freq-granularity'],
                         3.125)
        self.assertEqual(payload['mc-capabilities'][0]['slot-width-granularity'],
                         6.25)

    def test_09_xpdr_device_disconnection(self):
        resp = test_utils.unmount_device("XPDR-A2")
        self.assertEqual(resp.status_code, requests.codes.ok,
                         test_utils.CODE_SHOULD_BE_200)

    # After the unmount, the netconf operational data must be gone
    def test_10_xpdr_device_disconnected(self):
        resp = test_utils.get_netconf_oper_request("XPDR-A2")
        self._assert_data_missing(resp)

    # After the unmount, the port-mapping of the node must be gone as well
    def test_11_xpdr_device_not_connected(self):
        resp = test_utils.portmapping_request("XPDR-A2")
        self._assert_data_missing(resp)
183
184
if __name__ == "__main__":
    # Run the test cases of this module with per-test verbose reporting
    # when the file is executed directly.
    unittest.main(verbosity=2)