fix some pylint issues
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_portmapping.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2019 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import unittest
13 import time
14 import requests
15 from common import test_utils
16
17
18 class TransportPCEPortMappingTesting(unittest.TestCase):
19
20     processes = None
21
22     @classmethod
23     def setUpClass(cls):
24         cls.processes = test_utils.start_tpce()
25         cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
26
27     @classmethod
28     def tearDownClass(cls):
29         for process in cls.processes:
30             test_utils.shutdown_process(process)
31         print("all processes killed")
32
33     def setUp(self):
34         print("execution of {}".format(self.id().split(".")[-1]))
35         time.sleep(10)
36
37     def test_01_rdm_device_connection(self):
38         response = test_utils.mount_device("ROADM-A1", 'roadma')
39         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
40
41     def test_02_rdm_device_connected(self):
42         url = ("{}/operational/network-topology:"
43                "network-topology/topology/topology-netconf/node/ROADM-A1"
44                .format(test_utils.RESTCONF_BASE_URL))
45         response = requests.request(
46             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
47         self.assertEqual(response.status_code, requests.codes.ok)
48         res = response.json()
49         self.assertEqual(
50             res['node'][0]['netconf-node-topology:connection-status'],
51             'connected')
52         time.sleep(10)
53
54     def test_03_rdm_portmapping_info(self):
55         url = ("{}/config/transportpce-portmapping:network/"
56                "nodes/ROADM-A1/node-info"
57                .format(test_utils.RESTCONF_BASE_URL))
58         response = requests.request(
59             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
60         self.assertEqual(response.status_code, requests.codes.ok)
61         res = response.json()
62         self.assertEqual(
63             {u'node-info': {u'node-type': u'rdm',
64                             u'node-ip-address': u'127.0.0.11',
65                             u'node-clli': u'NodeA',
66                             u'openroadm-version': u'2.2.1', u'node-vendor': u'vendorA',
67                             u'node-model': u'model2'}},
68             res)
69         time.sleep(3)
70
71     def test_04_rdm_portmapping_DEG1_TTP_TXRX(self):
72         url = ("{}/config/transportpce-portmapping:network/"
73                "nodes/ROADM-A1/mapping/DEG1-TTP-TXRX"
74                .format(test_utils.RESTCONF_BASE_URL))
75         response = requests.request(
76             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
77         self.assertEqual(response.status_code, requests.codes.ok)
78         res = response.json()
79         self.assertIn(
80             {'supporting-port': 'L1', 'supporting-circuit-pack-name': '1/0',
81              'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional'},
82             res['mapping'])
83
84     def test_05_rdm_portmapping_DEG2_TTP_TXRX_with_ots_oms(self):
85         url = ("{}/config/transportpce-portmapping:network/"
86                "nodes/ROADM-A1/mapping/DEG2-TTP-TXRX"
87                .format(test_utils.RESTCONF_BASE_URL))
88         response = requests.request(
89             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
90         self.assertEqual(response.status_code, requests.codes.ok)
91         res = response.json()
92         self.assertIn(
93             {'supporting-port': 'L1', 'supporting-circuit-pack-name': '2/0',
94              'logical-connection-point': 'DEG2-TTP-TXRX',
95              'supporting-oms': 'OMS-DEG2-TTP-TXRX', 'supporting-ots': 'OTS-DEG2-TTP-TXRX',
96              'port-direction': 'bidirectional'},
97             res['mapping'])
98
99     def test_06_rdm_portmapping_SRG1_PP3_TXRX(self):
100         url = ("{}/config/transportpce-portmapping:network/"
101                "nodes/ROADM-A1/mapping/SRG1-PP3-TXRX"
102                .format(test_utils.RESTCONF_BASE_URL))
103         response = requests.request(
104             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
105         self.assertEqual(response.status_code, requests.codes.ok)
106         res = response.json()
107         self.assertIn(
108             {'supporting-port': 'C3', 'supporting-circuit-pack-name': '3/0',
109              'logical-connection-point': 'SRG1-PP3-TXRX', 'port-direction': 'bidirectional'},
110             res['mapping'])
111
112     def test_07_rdm_portmapping_SRG3_PP1_TXRX(self):
113         url = ("{}/config/transportpce-portmapping:network/"
114                "nodes/ROADM-A1/mapping/SRG3-PP1-TXRX"
115                .format(test_utils.RESTCONF_BASE_URL))
116         response = requests.request(
117             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
118         self.assertEqual(response.status_code, requests.codes.ok)
119         res = response.json()
120         self.assertIn(
121             {'supporting-port': 'C1', 'supporting-circuit-pack-name': '5/0',
122              'logical-connection-point': 'SRG3-PP1-TXRX', 'port-direction': 'bidirectional'},
123             res['mapping'])
124
125     def test_08_xpdr_device_connection(self):
126         response = test_utils.mount_device("XPDR-A1", 'xpdra')
127         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
128
129     def test_09_xpdr_device_connected(self):
130         url = ("{}/operational/network-topology:"
131                "network-topology/topology/topology-netconf/node/XPDR-A1"
132                .format(test_utils.RESTCONF_BASE_URL))
133         response = requests.request(
134             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
135         self.assertEqual(response.status_code, requests.codes.ok)
136         res = response.json()
137         self.assertEqual(
138             res['node'][0]['netconf-node-topology:connection-status'],
139             'connected')
140         time.sleep(10)
141
142     def test_10_xpdr_portmapping_info(self):
143         url = ("{}/config/transportpce-portmapping:network/"
144                "nodes/XPDR-A1/node-info"
145                .format(test_utils.RESTCONF_BASE_URL))
146         response = requests.request(
147             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
148         self.assertEqual(response.status_code, requests.codes.ok)
149         res = response.json()
150         self.assertEqual(
151             {u'node-info': {u'node-type': u'xpdr',
152                             u'node-ip-address': u'1.2.3.4',
153                             u'node-clli': u'NodeA',
154                             u'openroadm-version': u'2.2.1', u'node-vendor': u'vendorA',
155                             u'node-model': u'model2'}},
156             res)
157         time.sleep(3)
158
159     def test_11_xpdr_portmapping_NETWORK1(self):
160         url = ("{}/config/transportpce-portmapping:network/"
161                "nodes/XPDR-A1/mapping/XPDR1-NETWORK1"
162                .format(test_utils.RESTCONF_BASE_URL))
163         response = requests.request(
164             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
165         self.assertEqual(response.status_code, requests.codes.ok)
166         res = response.json()
167         self.assertIn(
168             {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
169              'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
170              'logical-connection-point': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-network',
171              'port-direction': 'bidirectional', 'connection-map-lcp': 'XPDR1-CLIENT1',
172              'lcp-hash-val': '8e128ba57560403cfd4ffafae38cd941'},
173             res['mapping'])
174
175     def test_12_xpdr_portmapping_NETWORK2(self):
176         url = ("{}/config/transportpce-portmapping:network/"
177                "nodes/XPDR-A1/mapping/XPDR1-NETWORK2"
178                .format(test_utils.RESTCONF_BASE_URL))
179         response = requests.request(
180             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
181         self.assertEqual(response.status_code, requests.codes.ok)
182         res = response.json()
183         self.assertIn(
184             {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
185              'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/2-PLUG-NET',
186              'logical-connection-point': 'XPDR1-NETWORK2', 'port-direction': 'bidirectional',
187              'connection-map-lcp': 'XPDR1-CLIENT2', 'port-qual': 'xpdr-network',
188              'lcp-hash-val': '8e128ba57560403cfd4ffafae38cd942'},
189             res['mapping'])
190
191     def test_13_xpdr_portmapping_CLIENT1(self):
192         url = ("{}/config/transportpce-portmapping:network/"
193                "nodes/XPDR-A1/mapping/XPDR1-CLIENT1"
194                .format(test_utils.RESTCONF_BASE_URL))
195         response = requests.request(
196             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
197         self.assertEqual(response.status_code, requests.codes.ok)
198         res = response.json()
199         self.assertIn(
200             {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
201              'supporting-port': 'C1',
202              'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
203              'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
204              'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
205              'lcp-hash-val': '3ed8ed1336784ac7c2f66c22f2f03d8'},
206             res['mapping'])
207
208     def test_14_xpdr_portmapping_CLIENT2(self):
209         url = ("{}/config/transportpce-portmapping:network/"
210                "nodes/XPDR-A1/mapping/XPDR1-CLIENT2"
211                .format(test_utils.RESTCONF_BASE_URL))
212         response = requests.request(
213             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
214         self.assertEqual(response.status_code, requests.codes.ok)
215         res = response.json()
216         self.assertIn(
217             {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
218              'supporting-port': 'C1',
219              'supporting-circuit-pack-name': '1/0/2-PLUG-CLIENT',
220              'logical-connection-point': 'XPDR1-CLIENT2', 'port-direction': 'bidirectional',
221              'connection-map-lcp': 'XPDR1-NETWORK2', 'port-qual': 'xpdr-client',
222              'lcp-hash-val': '3ed8ed1336784ac7c2f66c22f2f03db'},
223             res['mapping'])
224
225     def test_15_xpdr_device_disconnection(self):
226         response = test_utils.unmount_device("XPDR-A1")
227         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
228
229     def test_16_xpdr_device_disconnected(self):
230         url = ("{}/operational/network-topology:network-topology/topology/"
231                "topology-netconf/node/XPDR-A1".format(test_utils.RESTCONF_BASE_URL))
232         response = requests.request(
233             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
234         self.assertEqual(response.status_code, requests.codes.not_found)
235         res = response.json()
236         self.assertIn(
237             {"error-type": "application", "error-tag": "data-missing",
238              "error-message": "Request could not be completed because the relevant data model content does not exist"},
239             res['errors']['error'])
240
241     def test_17_xpdr_device_not_connected(self):
242         url = ("{}/config/transportpce-portmapping:network/nodes/XPDR-A1".format(test_utils.RESTCONF_BASE_URL))
243         response = requests.request(
244             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
245         self.assertEqual(response.status_code, requests.codes.not_found)
246         res = response.json()
247         self.assertIn(
248             {"error-type": "application", "error-tag": "data-missing",
249              "error-message": "Request could not be completed because the relevant data model content does not exist"},
250             res['errors']['error'])
251
252     def test_18_rdm_device_disconnection(self):
253         response = test_utils.unmount_device("ROADM-A1")
254         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
255
256     def test_19_rdm_device_disconnected(self):
257         url = ("{}/operational/network-topology:network-topology/topology/topology-netconf/node/ROADM-A1"
258                .format(test_utils.RESTCONF_BASE_URL))
259         response = requests.request(
260             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
261         self.assertEqual(response.status_code, requests.codes.not_found)
262         res = response.json()
263         self.assertIn(
264             {"error-type": "application", "error-tag": "data-missing",
265              "error-message": "Request could not be completed because the relevant data model content does not exist"},
266             res['errors']['error'])
267
268     def test_20_rdm_device_not_connected(self):
269         url = ("{}/config/transportpce-portmapping:network/nodes/ROADM-A1".format(test_utils.RESTCONF_BASE_URL))
270         response = requests.request(
271             "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
272         self.assertEqual(response.status_code, requests.codes.not_found)
273         res = response.json()
274         self.assertIn(
275             {"error-type": "application", "error-tag": "data-missing",
276              "error-message": "Request could not be completed because the relevant data model content does not exist"},
277             res['errors']['error'])
278
279
280 if __name__ == "__main__":
281     unittest.main(verbosity=2)