f532c4e231f10e64b20a93a2400554a1070a4195
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_portmapping.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2019 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import json
13 import os
14 import psutil
15 import requests
16 import signal
17 import shutil
18 import subprocess
19 import time
20 import unittest
21 import test_utils
22
23
24 class TransportPCEPortMappingTesting(unittest.TestCase):
25
26     processes = None
27     restconf_baseurl = "http://localhost:8181/restconf"
28
29     @classmethod
30     def setUpClass(cls):
31         cls.processes = test_utils.start_tpce()
32         cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
33
34     @classmethod
35     def tearDownClass(cls):
36         for process in cls.processes:
37             test_utils.shutdown_process(process)
38         print("all processes killed")
39
40     def setUp(self):
41         print("execution of {}".format(self.id().split(".")[-1]))
42         time.sleep(10)
43
44     def test_01_rdm_device_connected(self):
45         url = ("{}/config/network-topology:"
46                "network-topology/topology/topology-netconf/node/ROADM-A1"
47                .format(self.restconf_baseurl))
48         data = {"node": [{
49             "node-id": "ROADM-A1",
50             "netconf-node-topology:username": "admin",
51             "netconf-node-topology:password": "admin",
52             "netconf-node-topology:host": "127.0.0.1",
53             "netconf-node-topology:port": test_utils.sims['roadma']['port'],
54             "netconf-node-topology:tcp-only": "false",
55             "netconf-node-topology:pass-through": {}}]}
56         headers = {'content-type': 'application/json'}
57         response = requests.request(
58             "PUT", url, data=json.dumps(data), headers=headers,
59             auth=('admin', 'admin'))
60         self.assertEqual(response.status_code, requests.codes.created)
61         time.sleep(20)
62
63     def test_02_rdm_device_connected(self):
64         url = ("{}/operational/network-topology:"
65                "network-topology/topology/topology-netconf/node/ROADM-A1"
66                .format(self.restconf_baseurl))
67         headers = {'content-type': 'application/json'}
68         response = requests.request(
69             "GET", url, headers=headers, auth=('admin', 'admin'))
70         self.assertEqual(response.status_code, requests.codes.ok)
71         res = response.json()
72         self.assertEqual(
73             res['node'][0]['netconf-node-topology:connection-status'],
74             'connected')
75         time.sleep(10)
76
77     def test_03_rdm_portmapping_info(self):
78         url = ("{}/config/transportpce-portmapping:network/"
79                "nodes/ROADM-A1/node-info"
80                .format(self.restconf_baseurl))
81         headers = {'content-type': 'application/json'}
82         response = requests.request(
83             "GET", url, headers=headers, auth=('admin', 'admin'))
84         self.assertEqual(response.status_code, requests.codes.ok)
85         res = response.json()
86         self.assertEqual(
87             {u'node-info': {u'node-type': u'rdm',
88                             u'node-ip-address': u'127.0.0.11',
89                             u'node-clli': u'NodeA',
90                             u'openroadm-version': u'2.2.1', u'node-vendor': u'vendorA',
91                             u'node-model': u'model2'}},
92             res)
93         time.sleep(3)
94
95     def test_04_rdm_portmapping_DEG1_TTP_TXRX(self):
96         url = ("{}/config/transportpce-portmapping:network/"
97                "nodes/ROADM-A1/mapping/DEG1-TTP-TXRX"
98                .format(self.restconf_baseurl))
99         headers = {'content-type': 'application/json'}
100         response = requests.request(
101             "GET", url, headers=headers, auth=('admin', 'admin'))
102         self.assertEqual(response.status_code, requests.codes.ok)
103         res = response.json()
104         self.assertIn(
105             {'supporting-port': 'L1', 'supporting-circuit-pack-name': '1/0',
106              'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional'},
107             res['mapping'])
108
109     def test_05_rdm_portmapping_DEG2_TTP_TXRX_with_ots_oms(self):
110         url = ("{}/config/transportpce-portmapping:network/"
111                "nodes/ROADM-A1/mapping/DEG2-TTP-TXRX"
112                .format(self.restconf_baseurl))
113         headers = {'content-type': 'application/json'}
114         response = requests.request(
115             "GET", url, headers=headers, auth=('admin', 'admin'))
116         self.assertEqual(response.status_code, requests.codes.ok)
117         res = response.json()
118         self.assertIn(
119             {'supporting-port': 'L1', 'supporting-circuit-pack-name': '2/0',
120              'logical-connection-point': 'DEG2-TTP-TXRX',
121              'supporting-oms': 'OMS-DEG2-TTP-TXRX', 'supporting-ots': 'OTS-DEG2-TTP-TXRX',
122              'port-direction': 'bidirectional'},
123             res['mapping'])
124
125     def test_06_rdm_portmapping_SRG1_PP3_TXRX(self):
126         url = ("{}/config/transportpce-portmapping:network/"
127                "nodes/ROADM-A1/mapping/SRG1-PP3-TXRX"
128                .format(self.restconf_baseurl))
129         headers = {'content-type': 'application/json'}
130         response = requests.request(
131             "GET", url, headers=headers, auth=('admin', 'admin'))
132         self.assertEqual(response.status_code, requests.codes.ok)
133         res = response.json()
134         self.assertIn(
135             {'supporting-port': 'C3', 'supporting-circuit-pack-name': '3/0',
136              'logical-connection-point': 'SRG1-PP3-TXRX', 'port-direction': 'bidirectional'},
137             res['mapping'])
138
139     def test_07_rdm_portmapping_SRG3_PP1_TXRX(self):
140         url = ("{}/config/transportpce-portmapping:network/"
141                "nodes/ROADM-A1/mapping/SRG3-PP1-TXRX"
142                .format(self.restconf_baseurl))
143         headers = {'content-type': 'application/json'}
144         response = requests.request(
145             "GET", url, headers=headers, auth=('admin', 'admin'))
146         self.assertEqual(response.status_code, requests.codes.ok)
147         res = response.json()
148         self.assertIn(
149             {'supporting-port': 'C1', 'supporting-circuit-pack-name': '5/0',
150              'logical-connection-point': 'SRG3-PP1-TXRX', 'port-direction': 'bidirectional'},
151             res['mapping'])
152
153     def test_08_xpdr_device_connected(self):
154         url = ("{}/config/network-topology:"
155                "network-topology/topology/topology-netconf/node/XPDR-A1"
156                .format(self.restconf_baseurl))
157         data = {"node": [{
158             "node-id": "XPDR-A1",
159             "netconf-node-topology:username": "admin",
160             "netconf-node-topology:password": "admin",
161             "netconf-node-topology:host": "127.0.0.1",
162             "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
163             "netconf-node-topology:tcp-only": "false",
164             "netconf-node-topology:pass-through": {}}]}
165         headers = {'content-type': 'application/json'}
166         response = requests.request(
167             "PUT", url, data=json.dumps(data), headers=headers,
168             auth=('admin', 'admin'))
169         self.assertEqual(response.status_code, requests.codes.created)
170         time.sleep(20)
171
172     def test_09_xpdr_device_connected(self):
173         url = ("{}/operational/network-topology:"
174                "network-topology/topology/topology-netconf/node/XPDR-A1"
175                .format(self.restconf_baseurl))
176         headers = {'content-type': 'application/json'}
177         response = requests.request(
178             "GET", url, headers=headers, auth=('admin', 'admin'))
179         self.assertEqual(response.status_code, requests.codes.ok)
180         res = response.json()
181         self.assertEqual(
182             res['node'][0]['netconf-node-topology:connection-status'],
183             'connected')
184         time.sleep(10)
185
186     def test_10_xpdr_portmapping_info(self):
187         url = ("{}/config/transportpce-portmapping:network/"
188                "nodes/XPDR-A1/node-info"
189                .format(self.restconf_baseurl))
190         headers = {'content-type': 'application/json'}
191         response = requests.request(
192             "GET", url, headers=headers, auth=('admin', 'admin'))
193         self.assertEqual(response.status_code, requests.codes.ok)
194         res = response.json()
195         self.assertEqual(
196             {u'node-info': {u'node-type': u'xpdr',
197                             u'node-ip-address': u'1.2.3.4',
198                             u'node-clli': u'NodeA',
199                             u'openroadm-version': u'2.2.1', u'node-vendor': u'vendorA',
200                             u'node-model': u'model2'}},
201             res)
202         time.sleep(3)
203
204     def test_11_xpdr_portmapping_NETWORK1(self):
205         url = ("{}/config/transportpce-portmapping:network/"
206                "nodes/XPDR-A1/mapping/XPDR1-NETWORK1"
207                .format(self.restconf_baseurl))
208         headers = {'content-type': 'application/json'}
209         response = requests.request(
210             "GET", url, headers=headers, auth=('admin', 'admin'))
211         self.assertEqual(response.status_code, requests.codes.ok)
212         res = response.json()
213         self.assertIn(
214             {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
215              'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
216              'logical-connection-point': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-network',
217              'port-direction': 'bidirectional', 'connection-map-lcp': 'XPDR1-CLIENT1',
218              'lcp-hash-val': '8e128ba57560403cfd4ffafae38cd941'},
219             res['mapping'])
220
221     def test_12_xpdr_portmapping_NETWORK2(self):
222         url = ("{}/config/transportpce-portmapping:network/"
223                "nodes/XPDR-A1/mapping/XPDR1-NETWORK2"
224                .format(self.restconf_baseurl))
225         headers = {'content-type': 'application/json'}
226         response = requests.request(
227             "GET", url, headers=headers, auth=('admin', 'admin'))
228         self.assertEqual(response.status_code, requests.codes.ok)
229         res = response.json()
230         self.assertIn(
231             {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
232              'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/2-PLUG-NET',
233              'logical-connection-point': 'XPDR1-NETWORK2', 'port-direction': 'bidirectional',
234              'connection-map-lcp': 'XPDR1-CLIENT2', 'port-qual': 'xpdr-network',
235              'lcp-hash-val': '8e128ba57560403cfd4ffafae38cd942'},
236             res['mapping'])
237
238     def test_13_xpdr_portmapping_CLIENT1(self):
239         url = ("{}/config/transportpce-portmapping:network/"
240                "nodes/XPDR-A1/mapping/XPDR1-CLIENT1"
241                .format(self.restconf_baseurl))
242         headers = {'content-type': 'application/json'}
243         response = requests.request(
244             "GET", url, headers=headers, auth=('admin', 'admin'))
245         self.assertEqual(response.status_code, requests.codes.ok)
246         res = response.json()
247         self.assertIn(
248             {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
249              'supporting-port': 'C1',
250              'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
251              'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
252              'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
253              'lcp-hash-val': '3ed8ed1336784ac7c2f66c22f2f03d8'},
254             res['mapping'])
255
256     def test_14_xpdr_portmapping_CLIENT2(self):
257         url = ("{}/config/transportpce-portmapping:network/"
258                "nodes/XPDR-A1/mapping/XPDR1-CLIENT2"
259                .format(self.restconf_baseurl))
260         headers = {'content-type': 'application/json'}
261         response = requests.request(
262             "GET", url, headers=headers, auth=('admin', 'admin'))
263         self.assertEqual(response.status_code, requests.codes.ok)
264         res = response.json()
265         self.assertIn(
266             {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
267              'supporting-port': 'C1',
268              'supporting-circuit-pack-name': '1/0/2-PLUG-CLIENT',
269              'logical-connection-point': 'XPDR1-CLIENT2', 'port-direction': 'bidirectional',
270              'connection-map-lcp': 'XPDR1-NETWORK2', 'port-qual': 'xpdr-client',
271              'lcp-hash-val': '3ed8ed1336784ac7c2f66c22f2f03db'},
272             res['mapping'])
273
274     def test_15_xpdr_device_disconnected(self):
275         url = ("{}/config/network-topology:"
276                "network-topology/topology/topology-netconf/node/XPDR-A1"
277                .format(self.restconf_baseurl))
278         headers = {'content-type': 'application/json'}
279         response = requests.request(
280             "DELETE", url, headers=headers,
281             auth=('admin', 'admin'))
282         self.assertEqual(response.status_code, requests.codes.ok)
283         time.sleep(20)
284
285     def test_16_xpdr_device_disconnected(self):
286         url = ("{}/operational/network-topology:network-topology/topology/"
287                "topology-netconf/node/XPDR-A1".format(self.restconf_baseurl))
288         headers = {'content-type': 'application/json'}
289         response = requests.request(
290             "GET", url, headers=headers, auth=('admin', 'admin'))
291         self.assertEqual(response.status_code, requests.codes.not_found)
292         res = response.json()
293         self.assertIn(
294             {"error-type": "application", "error-tag": "data-missing",
295              "error-message": "Request could not be completed because the relevant data model content does not exist"},
296             res['errors']['error'])
297
298     def test_17_xpdr_device_disconnected(self):
299         url = ("{}/config/transportpce-portmapping:network/nodes/XPDR-A1".format(self.restconf_baseurl))
300         headers = {'content-type': 'application/json'}
301         response = requests.request(
302             "GET", url, headers=headers, auth=('admin', 'admin'))
303         self.assertEqual(response.status_code, requests.codes.not_found)
304         res = response.json()
305         self.assertIn(
306             {"error-type": "application", "error-tag": "data-missing",
307              "error-message": "Request could not be completed because the relevant data model content does not exist"},
308             res['errors']['error'])
309
310     def test_18_rdm_device_disconnected(self):
311         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-A1"
312                .format(self.restconf_baseurl))
313         headers = {'content-type': 'application/json'}
314         response = requests.request(
315             "DELETE", url, headers=headers,
316             auth=('admin', 'admin'))
317         self.assertEqual(response.status_code, requests.codes.ok)
318         time.sleep(20)
319
320     def test_19_rdm_device_disconnected(self):
321         url = ("{}/operational/network-topology:network-topology/topology/topology-netconf/node/ROADM-A1"
322                .format(self.restconf_baseurl))
323         headers = {'content-type': 'application/json'}
324         response = requests.request(
325             "GET", url, headers=headers, auth=('admin', 'admin'))
326         self.assertEqual(response.status_code, requests.codes.not_found)
327         res = response.json()
328         self.assertIn(
329             {"error-type": "application", "error-tag": "data-missing",
330              "error-message": "Request could not be completed because the relevant data model content does not exist"},
331             res['errors']['error'])
332
333     def test_20_rdm_device_disconnected(self):
334         url = ("{}/config/transportpce-portmapping:network/nodes/ROADM-A1".format(self.restconf_baseurl))
335         headers = {'content-type': 'application/json'}
336         response = requests.request(
337             "GET", url, headers=headers, auth=('admin', 'admin'))
338         self.assertEqual(response.status_code, requests.codes.not_found)
339         res = response.json()
340         self.assertIn(
341             {"error-type": "application", "error-tag": "data-missing",
342              "error-message": "Request could not be completed because the relevant data model content does not exist"},
343             res['errors']['error'])
344
345
346 if __name__ == "__main__":
347     unittest.main(verbosity=2)