Migrate portmapping functional tests to RFC8040
[transportpce.git] / tests / transportpce_tests / 1.2.1 / test01_portmapping.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14
15 import unittest
16 import time
17 import requests
18 import sys
19 sys.path.append('transportpce_tests/common/')
20 import test_utils_rfc8040  # nopep8
21
22
23 class TransportPCEPortMappingTesting(unittest.TestCase):
24
25     processes = None
26     NODE_VERSION = '1.2.1'
27
28     @classmethod
29     def setUpClass(cls):
30         cls.processes = test_utils_rfc8040.start_tpce()
31         cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])
32
33     @classmethod
34     def tearDownClass(cls):
35         # pylint: disable=not-an-iterable
36         for process in cls.processes:
37             test_utils_rfc8040.shutdown_process(process)
38         print("all processes killed")
39
40     def setUp(self):
41         print("execution of {}".format(self.id().split(".")[-1]))
42         time.sleep(10)
43
44     def test_01_rdm_device_connection(self):
45         response = test_utils_rfc8040.mount_device("ROADMA01", ('roadma', self.NODE_VERSION))
46         self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
47
48     def test_02_rdm_device_connected(self):
49         response = test_utils_rfc8040.check_device_connection("ROADMA01")
50         self.assertEqual(response['status_code'], requests.codes.ok)
51         self.assertEqual(response['connection-status'], 'connected')
52         time.sleep(10)
53
54     def test_03_rdm_portmapping_info(self):
55         response = test_utils_rfc8040.get_portmapping_node_info("ROADMA01")
56         self.assertEqual(response['status_code'], requests.codes.ok)
57         self.assertEqual(
58             {'node-type': 'rdm',
59              'node-ip-address': '127.0.0.12',
60              'node-clli': 'NodeA',
61              'openroadm-version': '1.2.1',
62              'node-vendor': 'vendorA',
63              'node-model': '2'},
64             response['node-info'])
65         time.sleep(3)
66
67     def test_04_rdm_portmapping_DEG1_TTP_TXRX(self):
68         response = test_utils_rfc8040.portmapping_request("ROADMA01", "DEG1-TTP-TXRX")
69         self.assertEqual(response['status_code'], requests.codes.ok)
70         self.assertIn(
71             {'supporting-port': 'L1', 'supporting-circuit-pack-name': '2/0',
72              'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional',
73              'port-admin-state': 'InService', 'port-oper-state': 'InService'},
74             response['mapping'])
75
76     def test_05_rdm_portmapping_SRG1_PP7_TXRX(self):
77         response = test_utils_rfc8040.portmapping_request("ROADMA01", "SRG1-PP7-TXRX")
78         self.assertEqual(response['status_code'], requests.codes.ok)
79         self.assertIn(
80             {'supporting-port': 'C7', 'supporting-circuit-pack-name': '4/0',
81              'logical-connection-point': 'SRG1-PP7-TXRX', 'port-direction': 'bidirectional',
82              'port-admin-state': 'InService', 'port-oper-state': 'InService'},
83             response['mapping'])
84
85     def test_06_rdm_portmapping_SRG3_PP1_TXRX(self):
86         response = test_utils_rfc8040.portmapping_request("ROADMA01", "SRG3-PP1-TXRX")
87         self.assertEqual(response['status_code'], requests.codes.ok)
88         self.assertIn(
89             {'supporting-port': 'C1', 'supporting-circuit-pack-name': '5/0',
90              'logical-connection-point': 'SRG3-PP1-TXRX', 'port-direction': 'bidirectional',
91              'port-admin-state': 'InService', 'port-oper-state': 'InService'},
92             response['mapping'])
93
94     def test_07_xpdr_device_connection(self):
95         response = test_utils_rfc8040.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION))
96         self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
97
98     def test_08_xpdr_device_connected(self):
99         response = test_utils_rfc8040.check_device_connection("XPDRA01")
100         self.assertEqual(response['status_code'], requests.codes.ok)
101         self.assertEqual(response['connection-status'], 'connected')
102         time.sleep(10)
103
104     def test_09_xpdr_portmapping_info(self):
105         response = test_utils_rfc8040.get_portmapping_node_info("XPDRA01")
106         self.assertEqual(response['status_code'], requests.codes.ok)
107         self.assertEqual(
108             {'node-type': 'xpdr',
109              'node-ip-address': '127.0.0.10',
110              'node-clli': 'NodeA',
111              'openroadm-version': '1.2.1',
112              'node-vendor': 'vendorA',
113              'node-model': '1'},
114             response['node-info'])
115         time.sleep(3)
116
117     def test_10_xpdr_portmapping_NETWORK1(self):
118         response = test_utils_rfc8040.portmapping_request("XPDRA01", "XPDR1-NETWORK1")
119         self.assertEqual(response['status_code'], requests.codes.ok)
120         self.assertIn(
121             {'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
122              'logical-connection-point': 'XPDR1-NETWORK1', 'port-direction': 'bidirectional',
123              'connection-map-lcp': 'XPDR1-CLIENT1', 'port-qual': 'xpdr-network',
124              'lcp-hash-val': 'OSvMgUyP+mE=',
125              'port-admin-state': 'InService', 'port-oper-state': 'InService'},
126             response['mapping'])
127
128     def test_11_xpdr_portmapping_NETWORK2(self):
129         response = test_utils_rfc8040.portmapping_request("XPDRA01", "XPDR1-NETWORK2")
130         self.assertEqual(response['status_code'], requests.codes.ok)
131         self.assertIn(
132             {'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/2-PLUG-NET',
133              'logical-connection-point': 'XPDR1-NETWORK2', 'port-direction': 'bidirectional',
134              'connection-map-lcp': 'XPDR1-CLIENT3', 'port-qual': 'xpdr-network',
135              'lcp-hash-val': 'OSvMgUyP+mI=',
136              'port-admin-state': 'InService', 'port-oper-state': 'InService'},
137             response['mapping'])
138
139     def test_12_xpdr_portmapping_CLIENT1(self):
140         response = test_utils_rfc8040.portmapping_request("XPDRA01", "XPDR1-CLIENT1")
141         self.assertEqual(response['status_code'], requests.codes.ok)
142         self.assertIn(
143             {'supporting-port': 'C1',
144              'supporting-circuit-pack-name': '1/0/C1-PLUG-CLIENT',
145              'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
146              'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
147              'lcp-hash-val': 'AO9UFkY/TLYw',
148              'port-admin-state': 'InService', 'port-oper-state': 'InService'},
149             response['mapping'])
150
151     def test_13_xpdr_portmapping_CLIENT2(self):
152         response = test_utils_rfc8040.portmapping_request("XPDRA01", "XPDR1-CLIENT2")
153         self.assertEqual(response['status_code'], requests.codes.ok)
154         self.assertIn(
155             {'supporting-port': 'C2',
156              'supporting-circuit-pack-name': '1/0/C2-PLUG-CLIENT',
157              'logical-connection-point': 'XPDR1-CLIENT2', 'port-direction': 'bidirectional',
158              'port-qual': 'xpdr-client',
159              'lcp-hash-val': 'AO9UFkY/TLYz',
160              'port-admin-state': 'InService', 'port-oper-state': 'InService'},
161             response['mapping'])
162
163     def test_14_xpdr_portmapping_CLIENT3(self):
164         response = test_utils_rfc8040.portmapping_request("XPDRA01", "XPDR1-CLIENT3")
165         self.assertEqual(response['status_code'], requests.codes.ok)
166         self.assertIn(
167             {'supporting-port': 'C3',
168              'supporting-circuit-pack-name': '1/0/C3-PLUG-CLIENT',
169              'logical-connection-point': 'XPDR1-CLIENT3',
170              'connection-map-lcp': 'XPDR1-NETWORK2', 'port-direction': 'bidirectional',
171              'port-qual': 'xpdr-client', 'lcp-hash-val': 'AO9UFkY/TLYy',
172              'port-admin-state': 'InService', 'port-oper-state': 'InService'},
173             response['mapping'])
174
175     def test_15_xpdr_portmapping_CLIENT4(self):
176         response = test_utils_rfc8040.portmapping_request("XPDRA01", "XPDR1-CLIENT4")
177         self.assertEqual(response['status_code'], requests.codes.ok)
178         self.assertIn(
179             {'supporting-port': 'C4',
180              'supporting-circuit-pack-name': '1/0/C4-PLUG-CLIENT',
181              'logical-connection-point': 'XPDR1-CLIENT4', 'port-direction': 'bidirectional',
182              'port-qual': 'xpdr-client', 'lcp-hash-val': 'AO9UFkY/TLY1',
183              'port-admin-state': 'InService', 'port-oper-state': 'InService'},
184             response['mapping'])
185
186     def test_16_xpdr_device_disconnection(self):
187         response = test_utils_rfc8040.unmount_device("XPDRA01")
188         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
189
190     def test_17_xpdr_device_disconnected(self):
191         response = test_utils_rfc8040.check_device_connection("XPDRA01")
192         self.assertEqual(response['status_code'], requests.codes.conflict)
193         self.assertIn(
194             {"error-tag": "data-missing",
195              "error-message": "Request could not be completed because the relevant data model content does not exist",
196              "error-type": "protocol"},
197             response['connection-status'])
198
199     def test_18_xpdr_device_not_connected(self):
200         response = test_utils_rfc8040.get_portmapping_node_info("XPDRA01")
201         self.assertEqual(response['status_code'], requests.codes.conflict)
202         self.assertIn(
203             {"error-tag": "data-missing",
204              "error-message": "Request could not be completed because the relevant data model content does not exist",
205              "error-type": "protocol"},
206             response['node-info'])
207
208     def test_19_rdm_device_disconnection(self):
209         response = test_utils_rfc8040.unmount_device("ROADMA01")
210         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
211
212     def test_20_rdm_device_disconnected(self):
213         response = test_utils_rfc8040.check_device_connection("ROADMA01")
214         self.assertEqual(response['status_code'], requests.codes.conflict)
215         self.assertIn(
216             {"error-tag": "data-missing",
217              "error-message": "Request could not be completed because the relevant data model content does not exist",
218              "error-type": "protocol"},
219             response['connection-status'])
220
221     def test_21_rdm_device_not_connected(self):
222         response = test_utils_rfc8040.get_portmapping_node_info("ROADMA01")
223         self.assertEqual(response['status_code'], requests.codes.conflict)
224         self.assertIn(
225             {"error-tag": "data-missing",
226              "error-message": "Request could not be completed because the relevant data model content does not exist",
227              "error-type": "protocol"},
228             response['node-info'])
229
230
231 if __name__ == "__main__":
232     unittest.main(verbosity=2)