14edf70964861d5295cf3ac85de415363e537c24
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_portmapping.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2019 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import json
13 import os
14 import psutil
15 import requests
16 import signal
17 import shutil
18 import subprocess
19 import time
20 import unittest
21 from common import test_utils
22
23
24 class TransportPCEPortMappingTesting(unittest.TestCase):
25
26     processes = None
27     restconf_baseurl = "http://localhost:8181/restconf"
28
29     @classmethod
30     def setUpClass(cls):
31         cls.processes = test_utils.start_tpce()
32         cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
33
34     @classmethod
35     def tearDownClass(cls):
36         for process in cls.processes:
37             test_utils.shutdown_process(process)
38         print("all processes killed")
39
40     def setUp(self):
41         print("execution of {}".format(self.id().split(".")[-1]))
42         time.sleep(10)
43
44     def test_01_rdm_device_connected(self):
45         url = ("{}/config/network-topology:"
46                "network-topology/topology/topology-netconf/node/ROADM-A1"
47                .format(self.restconf_baseurl))
48         data = {"node": [{
49             "node-id": "ROADM-A1",
50             "netconf-node-topology:username": "admin",
51             "netconf-node-topology:password": "admin",
52             "netconf-node-topology:host": "127.0.0.1",
53             "netconf-node-topology:port": test_utils.sims['roadma']['port'],
54             "netconf-node-topology:tcp-only": "false",
55             "netconf-node-topology:pass-through": {}}]}
56         headers = {'content-type': 'application/json'}
57         response = requests.request(
58             "PUT", url, data=json.dumps(data), headers=headers,
59             auth=('admin', 'admin'))
60         self.assertEqual(response.status_code, requests.codes.created)
61         time.sleep(20)
62
63     def test_02_rdm_device_connected(self):
64         url = ("{}/operational/network-topology:"
65                "network-topology/topology/topology-netconf/node/ROADM-A1"
66                .format(self.restconf_baseurl))
67         headers = {'content-type': 'application/json'}
68         response = requests.request(
69             "GET", url, headers=headers, auth=('admin', 'admin'))
70         self.assertEqual(response.status_code, requests.codes.ok)
71         res = response.json()
72         self.assertEqual(
73             res['node'][0]['netconf-node-topology:connection-status'],
74             'connected')
75         time.sleep(10)
76
77     def test_03_rdm_portmapping_info(self):
78         url = ("{}/config/transportpce-portmapping:network/"
79                "nodes/ROADM-A1/node-info"
80                .format(self.restconf_baseurl))
81         headers = {'content-type': 'application/json'}
82         response = requests.request(
83             "GET", url, headers=headers, auth=('admin', 'admin'))
84         self.assertEqual(response.status_code, requests.codes.ok)
85         res = response.json()
86         self.assertEqual(
87             {u'node-info': {u'node-type': u'rdm',
88                             u'node-ip-address': u'127.0.0.11',
89                             u'node-clli': u'NodeA',
90                             u'openroadm-version': u'2.2.1', u'node-vendor': u'vendorA',
91                             u'node-model': u'model2'}},
92             res)
93         time.sleep(3)
94
95     def test_04_rdm_portmapping_DEG1_TTP_TXRX(self):
96         url = ("{}/config/transportpce-portmapping:network/"
97                "nodes/ROADM-A1/mapping/DEG1-TTP-TXRX"
98                .format(self.restconf_baseurl))
99         headers = {'content-type': 'application/json'}
100         response = requests.request(
101             "GET", url, headers=headers, auth=('admin', 'admin'))
102         self.assertEqual(response.status_code, requests.codes.ok)
103         res = response.json()
104         self.assertIn(
105             {'supporting-port': 'L1', 'supporting-circuit-pack-name': '1/0',
106              'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional'},
107             res['mapping'])
108
109     def test_05_rdm_portmapping_DEG2_TTP_TXRX_with_ots_oms(self):
110         url = ("{}/config/transportpce-portmapping:network/"
111                "nodes/ROADM-A1/mapping/DEG2-TTP-TXRX"
112                .format(self.restconf_baseurl))
113         headers = {'content-type': 'application/json'}
114         response = requests.request(
115             "GET", url, headers=headers, auth=('admin', 'admin'))
116         self.assertEqual(response.status_code, requests.codes.ok)
117         res = response.json()
118         self.assertIn(
119             {'supporting-port': 'L1', 'supporting-circuit-pack-name': '2/0',
120              'logical-connection-point': 'DEG2-TTP-TXRX',
121              'supporting-oms': 'OMS-DEG2-TTP-TXRX', 'supporting-ots': 'OTS-DEG2-TTP-TXRX',
122              'port-direction': 'bidirectional'},
123             res['mapping'])
124
125     def test_06_rdm_portmapping_SRG1_PP3_TXRX(self):
126         url = ("{}/config/transportpce-portmapping:network/"
127                "nodes/ROADM-A1/mapping/SRG1-PP3-TXRX"
128                .format(self.restconf_baseurl))
129         headers = {'content-type': 'application/json'}
130         response = requests.request(
131             "GET", url, headers=headers, auth=('admin', 'admin'))
132         self.assertEqual(response.status_code, requests.codes.ok)
133         res = response.json()
134         self.assertIn(
135             {'supporting-port': 'C3', 'supporting-circuit-pack-name': '3/0',
136              'logical-connection-point': 'SRG1-PP3-TXRX', 'port-direction': 'bidirectional'},
137             res['mapping'])
138
139     def test_07_rdm_portmapping_SRG3_PP1_TXRX(self):
140         url = ("{}/config/transportpce-portmapping:network/"
141                "nodes/ROADM-A1/mapping/SRG3-PP1-TXRX"
142                .format(self.restconf_baseurl))
143         headers = {'content-type': 'application/json'}
144         response = requests.request(
145             "GET", url, headers=headers, auth=('admin', 'admin'))
146         self.assertEqual(response.status_code, requests.codes.ok)
147         res = response.json()
148         self.assertIn(
149             {'supporting-port': 'C1', 'supporting-circuit-pack-name': '5/0',
150              'logical-connection-point': 'SRG3-PP1-TXRX', 'port-direction': 'bidirectional'},
151             res['mapping'])
152
153     def test_08_xpdr_device_connected(self):
154         url = ("{}/config/network-topology:"
155                "network-topology/topology/topology-netconf/node/XPDR-A1"
156                .format(self.restconf_baseurl))
157         data = {"node": [{
158             "node-id": "XPDR-A1",
159             "netconf-node-topology:username": "admin",
160             "netconf-node-topology:password": "admin",
161             "netconf-node-topology:host": "127.0.0.1",
162             "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
163             "netconf-node-topology:tcp-only": "false",
164             "netconf-node-topology:pass-through": {}}]}
165         headers = {'content-type': 'application/json'}
166         response = requests.request(
167             "PUT", url, data=json.dumps(data), headers=headers,
168             auth=('admin', 'admin'))
169         self.assertEqual(response.status_code, requests.codes.created)
170         time.sleep(20)
171
172     def test_09_xpdr_device_connected(self):
173         url = ("{}/operational/network-topology:"
174                "network-topology/topology/topology-netconf/node/XPDR-A1"
175                .format(self.restconf_baseurl))
176         headers = {'content-type': 'application/json'}
177         response = requests.request(
178             "GET", url, headers=headers, auth=('admin', 'admin'))
179         self.assertEqual(response.status_code, requests.codes.ok)
180         res = response.json()
181         self.assertEqual(
182             res['node'][0]['netconf-node-topology:connection-status'],
183             'connected')
184         time.sleep(10)
185
186     def test_10_xpdr_portmapping_info(self):
187         url = ("{}/config/transportpce-portmapping:network/"
188                "nodes/XPDR-A1/node-info"
189                .format(self.restconf_baseurl))
190         headers = {'content-type': 'application/json'}
191         response = requests.request(
192             "GET", url, headers=headers, auth=('admin', 'admin'))
193         self.assertEqual(response.status_code, requests.codes.ok)
194         res = response.json()
195         self.assertEqual(
196             {u'node-info': {u'node-type': u'xpdr',
197                             u'node-ip-address': u'1.2.3.4',
198                             u'node-clli': u'NodeA',
199                             u'openroadm-version': u'2.2.1', u'node-vendor': u'vendorA',
200                             u'node-model': u'model2'}},
201             res)
202         time.sleep(3)
203
204     def test_11_xpdr_portmapping_NETWORK1(self):
205         url = ("{}/config/transportpce-portmapping:network/"
206                "nodes/XPDR-A1/mapping/XPDR1-NETWORK1"
207                .format(self.restconf_baseurl))
208         headers = {'content-type': 'application/json'}
209         response = requests.request(
210             "GET", url, headers=headers, auth=('admin', 'admin'))
211         self.assertEqual(response.status_code, requests.codes.ok)
212         res = response.json()
213         self.assertIn(
214             {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
215              'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
216              'logical-connection-point': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-network',
217              'port-direction': 'bidirectional', 'connection-map-lcp': 'XPDR1-CLIENT1',
218              'lcp-hash-val': '8e128ba57560403cfd4ffafae38cd941'},
219             res['mapping'])
220
221     def test_12_xpdr_portmapping_NETWORK2(self):
222         url = ("{}/config/transportpce-portmapping:network/"
223                "nodes/XPDR-A1/mapping/XPDR1-NETWORK2"
224                .format(self.restconf_baseurl))
225         headers = {'content-type': 'application/json'}
226         response = requests.request(
227             "GET", url, headers=headers, auth=('admin', 'admin'))
228         self.assertEqual(response.status_code, requests.codes.ok)
229         res = response.json()
230         self.assertIn(
231             {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
232              'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/2-PLUG-NET',
233              'logical-connection-point': 'XPDR1-NETWORK2', 'port-direction': 'bidirectional',
234              'connection-map-lcp': 'XPDR1-CLIENT2', 'port-qual': 'xpdr-network',
235              'lcp-hash-val': '8e128ba57560403cfd4ffafae38cd942'},
236             res['mapping'])
237
238     def test_13_xpdr_portmapping_CLIENT1(self):
239         url = ("{}/config/transportpce-portmapping:network/"
240                "nodes/XPDR-A1/mapping/XPDR1-CLIENT1"
241                .format(self.restconf_baseurl))
242         headers = {'content-type': 'application/json'}
243         response = requests.request(
244             "GET", url, headers=headers, auth=('admin', 'admin'))
245         self.assertEqual(response.status_code, requests.codes.ok)
246         res = response.json()
247         self.assertIn(
248             {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
249              'supporting-port': 'C1',
250              'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
251              'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
252              'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
253              'lcp-hash-val': '3ed8ed1336784ac7c2f66c22f2f03d8'},
254             res['mapping'])
255
256     def test_14_xpdr_portmapping_CLIENT2(self):
257         url = ("{}/config/transportpce-portmapping:network/"
258                "nodes/XPDR-A1/mapping/XPDR1-CLIENT2"
259                .format(self.restconf_baseurl))
260         headers = {'content-type': 'application/json'}
261         response = requests.request(
262             "GET", url, headers=headers, auth=('admin', 'admin'))
263         self.assertEqual(response.status_code, requests.codes.ok)
264         res = response.json()
265         self.assertIn(
266             {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
267              'supporting-port': 'C1',
268              'supporting-circuit-pack-name': '1/0/2-PLUG-CLIENT',
269              'logical-connection-point': 'XPDR1-CLIENT2', 'port-direction': 'bidirectional',
270              'connection-map-lcp': 'XPDR1-NETWORK2', 'port-qual': 'xpdr-client',
271              'lcp-hash-val': '3ed8ed1336784ac7c2f66c22f2f03db'},
272             res['mapping'])
273
274     def test_15_xpdr_device_disconnected(self):
275         url = ("{}/config/network-topology:"
276                "network-topology/topology/topology-netconf/node/XPDR-A1"
277                .format(self.restconf_baseurl))
278         headers = {'content-type': 'application/json'}
279         response = requests.request(
280             "DELETE", url, headers=headers,
281             auth=('admin', 'admin'))
282         self.assertEqual(response.status_code, requests.codes.ok)
283         time.sleep(20)
284
285     def test_16_xpdr_device_disconnected(self):
286         url = ("{}/operational/network-topology:network-topology/topology/"
287                "topology-netconf/node/XPDR-A1".format(self.restconf_baseurl))
288         headers = {'content-type': 'application/json'}
289         response = requests.request(
290             "GET", url, headers=headers, auth=('admin', 'admin'))
291         self.assertEqual(response.status_code, requests.codes.not_found)
292         res = response.json()
293         self.assertIn(
294             {"error-type": "application", "error-tag": "data-missing",
295              "error-message": "Request could not be completed because the relevant data model content does not exist"},
296             res['errors']['error'])
297
298     def test_17_xpdr_device_disconnected(self):
299         url = ("{}/config/transportpce-portmapping:network/nodes/XPDR-A1".format(self.restconf_baseurl))
300         headers = {'content-type': 'application/json'}
301         response = requests.request(
302             "GET", url, headers=headers, auth=('admin', 'admin'))
303         self.assertEqual(response.status_code, requests.codes.not_found)
304         res = response.json()
305         self.assertIn(
306             {"error-type": "application", "error-tag": "data-missing",
307              "error-message": "Request could not be completed because the relevant data model content does not exist"},
308             res['errors']['error'])
309
310     def test_18_rdm_device_disconnected(self):
311         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-A1"
312                .format(self.restconf_baseurl))
313         headers = {'content-type': 'application/json'}
314         response = requests.request(
315             "DELETE", url, headers=headers,
316             auth=('admin', 'admin'))
317         self.assertEqual(response.status_code, requests.codes.ok)
318         time.sleep(20)
319
320     def test_19_rdm_device_disconnected(self):
321         url = ("{}/operational/network-topology:network-topology/topology/topology-netconf/node/ROADM-A1"
322                .format(self.restconf_baseurl))
323         headers = {'content-type': 'application/json'}
324         response = requests.request(
325             "GET", url, headers=headers, auth=('admin', 'admin'))
326         self.assertEqual(response.status_code, requests.codes.not_found)
327         res = response.json()
328         self.assertIn(
329             {"error-type": "application", "error-tag": "data-missing",
330              "error-message": "Request could not be completed because the relevant data model content does not exist"},
331             res['errors']['error'])
332
333     def test_20_rdm_device_disconnected(self):
334         url = ("{}/config/transportpce-portmapping:network/nodes/ROADM-A1".format(self.restconf_baseurl))
335         headers = {'content-type': 'application/json'}
336         response = requests.request(
337             "GET", url, headers=headers, auth=('admin', 'admin'))
338         self.assertEqual(response.status_code, requests.codes.not_found)
339         res = response.json()
340         self.assertIn(
341             {"error-type": "application", "error-tag": "data-missing",
342              "error-message": "Request could not be completed because the relevant data model content does not exist"},
343             res['errors']['error'])
344
345
346 if __name__ == "__main__":
347     unittest.main(verbosity=2)