Align 1.2.1 tests sims/tpce management to 2.2.1
[transportpce.git] / tests / transportpce_tests / 1.2.1 / test_topoPortMapping.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import json
13 import signal
14 import time
15 import unittest
16 import requests
17 import psutil
18 from common import test_utils
19
20
21 class TransportPCEtesting(unittest.TestCase):
22
23     processes = None
24     restconf_baseurl = "http://localhost:8181/restconf"
25
26     @classmethod
27     def setUpClass(cls):
28         cls.processes = test_utils.start_tpce()
29         cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
30
31     @classmethod
32     def tearDownClass(cls):
33         for process in cls.processes:
34             test_utils.shutdown_process(process)
35         print("all processes killed")
36
37     def setUp(self):
38         time.sleep(10)
39
40     # Connect the ROADMA
41     def test_01_connect_rdm(self):
42         # Config ROADMA
43         url = ("{}/config/network-topology:"
44                "network-topology/topology/topology-netconf/node/ROADMA01"
45                .format(self.restconf_baseurl))
46         data = {"node": [{
47             "node-id": "ROADMA01",
48             "netconf-node-topology:username": "admin",
49             "netconf-node-topology:password": "admin",
50             "netconf-node-topology:host": "127.0.0.1",
51             "netconf-node-topology:port": test_utils.sims['roadma']['port'],
52             "netconf-node-topology:tcp-only": "false",
53             "netconf-node-topology:pass-through": {}}]}
54         headers = {'content-type': 'application/json'}
55         response = requests.request(
56             "PUT", url, data=json.dumps(data), headers=headers,
57             auth=('admin', 'admin'))
58         self.assertEqual(response.status_code, requests.codes.created)
59         time.sleep(20)
60
61     # Verify the termination points of the ROADMA
62     def test_02_compareOpenroadmTopologyPortMapping_rdm(self):
63         urlTopo = ("{}/config/ietf-network:networks/network/openroadm-topology"
64                    .format(self.restconf_baseurl))
65         headers = {'content-type': 'application/json'}
66         responseTopo = requests.request("GET", urlTopo, headers=headers, auth=('admin', 'admin'))
67         resTopo = responseTopo.json()
68         nbNode = len(resTopo['network'][0]['node'])
69         nbMapCumul = 0
70         nbMappings = 0
71         for i in range(0, nbNode):
72             nodeId = resTopo['network'][0]['node'][i]['node-id']
73             nodeMapId = nodeId.split("-")[0]
74             urlMapList = "{}/config/transportpce-portmapping:network/nodes/" + nodeMapId
75             urlMapListFull = urlMapList.format(self.restconf_baseurl)
76             responseMapList = requests.request("GET", urlMapListFull, headers=headers, auth=('admin', 'admin'))
77             resMapList = responseMapList.json()
78
79             nbMappings = len(resMapList['nodes'][0]['mapping']) - nbMapCumul
80             nbTp = len(resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'])
81             nbMapCurrent = 0
82             for j in range(0, nbTp):
83                 tpId = resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
84                 if((not "CP" in tpId) and (not "CTP" in tpId)):
85                     urlMap = "{}/config/transportpce-portmapping:network/nodes/" + nodeMapId + "/mapping/" + tpId
86                     urlMapFull = urlMap.format(self.restconf_baseurl)
87                     responseMap = requests.request("GET", urlMapFull, headers=headers, auth=('admin', 'admin'))
88                     self.assertEqual(responseMap.status_code, requests.codes.ok)
89                     if responseMap.status_code == requests.codes.ok:
90                         nbMapCurrent += 1
91             nbMapCumul += nbMapCurrent
92         nbMappings -= nbMapCurrent
93         self.assertEqual(nbMappings, 0)
94
95     # Disconnect the ROADMA
96     def test_03_disconnect_rdm(self):
97         url = ("{}/config/network-topology:"
98                "network-topology/topology/topology-netconf/node/ROADMA01"
99                .format(self.restconf_baseurl))
100         data = {}
101         headers = {'content-type': 'application/json'}
102         response = requests.request(
103             "DELETE", url, data=json.dumps(data), headers=headers,
104             auth=('admin', 'admin'))
105         self.assertEqual(response.status_code, requests.codes.ok)
106
107 #     #Connect the XPDRA
108     def test_04_connect_xpdr(self):
109         # Config XPDRA
110         url = ("{}/config/network-topology:"
111                "network-topology/topology/topology-netconf/node/XPDRA01"
112                .format(self.restconf_baseurl))
113         data = {"node": [{
114             "node-id": "XPDRA01",
115             "netconf-node-topology:username": "admin",
116             "netconf-node-topology:password": "admin",
117             "netconf-node-topology:host": "127.0.0.1",
118             "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
119             "netconf-node-topology:tcp-only": "false",
120             "netconf-node-topology:pass-through": {}}]}
121         headers = {'content-type': 'application/json'}
122         response = requests.request(
123             "PUT", url, data=json.dumps(data), headers=headers,
124             auth=('admin', 'admin'))
125         self.assertEqual(response.status_code, requests.codes.created)
126         time.sleep(20)
127
128 #     #Verify the termination points related to XPDR
129     def test_05_compareOpenroadmTopologyPortMapping_xpdr(self):
130         self.test_02_compareOpenroadmTopologyPortMapping_rdm()
131
132     # Disconnect the XPDRA
133     def test_06_disconnect_device(self):
134         url = ("{}/config/network-topology:"
135                "network-topology/topology/topology-netconf/node/XPDRA01"
136                .format(self.restconf_baseurl))
137         data = {}
138         headers = {'content-type': 'application/json'}
139         response = requests.request(
140             "DELETE", url, data=json.dumps(data), headers=headers,
141             auth=('admin', 'admin'))
142         self.assertEqual(response.status_code, requests.codes.ok)
143
144
145 if __name__ == "__main__":
146     # logging.basicConfig(filename='./transportpce_tests/log/response.log',filemode='w',level=logging.DEBUG)
147     #logging.debug('I am there')
148     unittest.main(verbosity=2)