review sims/tpce start-up sequence in 2.2.1 tests
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_topoPortMapping.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import json
13 import os
14 import psutil
15 import requests
16 import signal
17 import shutil
18 import subprocess
19 import time
20 import unittest
21 import logging
22 import test_utils
23
24
class TransportPCEtesting(unittest.TestCase):
    """Compare openroadm-topology termination points with the port mapping.

    The class starts the processes returned by ``test_utils.start_tpce()``
    and ``test_utils.start_sim()`` once for the whole class, then runs the
    numbered tests in name order: connect ROADM-A1, compare, disconnect,
    connect XPDR-A1, compare, disconnect.  Every check goes through the
    RESTCONF API rooted at ``restconf_baseurl``.
    """

    # Handles of the processes started in setUpClass (None until started).
    sim_process1 = None
    sim_process2 = None
    odl_process = None
    restconf_baseurl = "http://localhost:8181/restconf"

    # NOTE(review): the START/END_IGNORE_XTESTING markers below are kept
    # verbatim and at column 0; presumably external tooling keys on them.
# START_IGNORE_XTESTING

    @classmethod
    def setUpClass(cls):
        """Start the controller process and both simulators."""
        try:
            cls.odl_process = test_utils.start_tpce()
            cls.sim_process1 = test_utils.start_sim('xpdra')
            cls.sim_process2 = test_utils.start_sim('roadma')
        except Exception:
            # unittest does not call tearDownClass when setUpClass raises,
            # so stop whatever was already started before re-raising.
            cls.tearDownClass()
            raise

    @staticmethod
    def _stop_process(process):
        """Send SIGINT to *process* and its direct children and wait for them.

        *process* is expected to expose ``pid``, ``send_signal()`` and
        ``wait()`` (as the handles returned by ``test_utils`` do in the
        original code).  ``None`` is accepted and ignored so that a
        partially completed start-up can be cleaned up.
        """
        if process is None:
            return
        try:
            children = psutil.Process(process.pid).children()
        except psutil.NoSuchProcess:
            # The process is already gone: no child left to signal.
            children = []
        for child in children:
            try:
                child.send_signal(signal.SIGINT)
                child.wait()
            except psutil.NoSuchProcess:
                # The child exited between the listing and the signal.
                continue
        process.send_signal(signal.SIGINT)
        process.wait()

    @classmethod
    def tearDownClass(cls):
        """Stop the controller first, then both simulators.

        Every process is attempted even if stopping an earlier one fails;
        the first failure is re-raised once all of them have been tried.
        """
        failures = []
        for process in (cls.odl_process, cls.sim_process1, cls.sim_process2):
            try:
                cls._stop_process(process)
            except Exception as exc:  # re-raised below, never swallowed
                failures.append(exc)
        if failures:
            raise failures[0]

    def setUp(self):
        # Fixed pause before every test, kept from the original sequence.
        # NOTE(review): presumably lets the controller settle - confirm.
        time.sleep(10)

# END_IGNORE_XTESTING

    def _restconf(self, method, url, payload=None):
        """Issue a JSON RESTCONF request with the admin credentials.

        *payload*, when given, is serialised with ``json.dumps`` and sent
        as the request body.  Returns the ``requests`` response object.
        """
        body = None if payload is None else json.dumps(payload)
        return requests.request(
            method, url, data=body,
            headers={'content-type': 'application/json'},
            auth=('admin', 'admin'))

    def _netconf_node_url(self, node_id):
        """Return the topology-netconf config URL of *node_id*."""
        return ("{}/config/network-topology:"
                "network-topology/topology/topology-netconf/node/{}"
                .format(self.restconf_baseurl, node_id))

    def _connect_device(self, node_id, sim):
        """PUT the netconf node *node_id* using the port of simulator *sim*.

        *sim* is a key of ``test_utils.sims``.  Asserts that the node is
        reported as created, then pauses for 20 seconds.
        """
        data = {"node": [{
            "node-id": node_id,
            "netconf-node-topology:username": "admin",
            "netconf-node-topology:password": "admin",
            "netconf-node-topology:host": "127.0.0.1",
            "netconf-node-topology:port": test_utils.sims[sim]['port'],
            "netconf-node-topology:tcp-only": "false",
            "netconf-node-topology:pass-through": {}}]}
        response = self._restconf("PUT", self._netconf_node_url(node_id), data)
        self.assertEqual(response.status_code, requests.codes.created)
        # Fixed pause kept from the original test.
        # NOTE(review): presumably gives the controller time to process the
        # newly connected device before the next test reads it - confirm.
        time.sleep(20)

    def _disconnect_device(self, node_id):
        """DELETE the netconf node *node_id* and assert the request is OK."""
        # An empty JSON object is sent as body, exactly as the original did.
        response = self._restconf("DELETE", self._netconf_node_url(node_id), {})
        self.assertEqual(response.status_code, requests.codes.ok)

    def _compare_topology_with_portmapping(self):
        """Check topology termination points against the port mapping.

        For every node of openroadm-topology, each termination point whose
        id contains neither "CP" nor "CTP" must have a mapping under the
        port-mapping node named by the first two dash-separated parts of
        the topology node id.  For each such port-mapping node, the number
        of mappings it declares must equal the number of termination
        points matched against it.
        """
        url_topo = ("{}/config/ietf-network:networks/network/openroadm-topology"
                    .format(self.restconf_baseurl))
        response_topo = self._restconf("GET", url_topo)
        self.assertEqual(response_topo.status_code, requests.codes.ok)
        nodes = response_topo.json()['network'][0].get('node', [])
        # An empty topology would make the comparison vacuous: fail clearly.
        self.assertTrue(nodes, "openroadm-topology contains no node")

        declared = {}  # port-mapping node id -> number of declared mappings
        matched = {}   # port-mapping node id -> number of matched TPs
        for node in nodes:
            node_id = node['node-id']
            print("nodeId={}".format(node_id))
            node_map_id = "-".join(node_id.split("-")[:2])
            print("nodeMapId={}".format(node_map_id))

            if node_map_id not in declared:
                url_map_list = (
                    "{}/config/transportpce-portmapping:network/nodes/{}"
                    .format(self.restconf_baseurl, node_map_id))
                response_map_list = self._restconf("GET", url_map_list)
                self.assertEqual(response_map_list.status_code,
                                 requests.codes.ok)
                mappings = response_map_list.json()['nodes'][0]['mapping']
                declared[node_map_id] = len(mappings)
                matched[node_map_id] = 0

            for tp in node['ietf-network-topology:termination-point']:
                tp_id = tp['tp-id']
                if "CP" in tp_id or "CTP" in tp_id:
                    continue
                url_map = (
                    "{}/config/transportpce-portmapping:network/nodes/{}"
                    "/mapping/{}"
                    .format(self.restconf_baseurl, node_map_id, tp_id))
                response_map = self._restconf("GET", url_map)
                self.assertEqual(response_map.status_code, requests.codes.ok)
                matched[node_map_id] += 1

        for node_map_id, nb_declared in declared.items():
            self.assertEqual(
                nb_declared, matched[node_map_id],
                "port mapping of {} declares {} mapping(s) but {} topology "
                "termination point(s) were matched".format(
                    node_map_id, nb_declared, matched[node_map_id]))

    # Connect the ROADMA
    def test_01_connect_rdm(self):
        self._connect_device("ROADM-A1", 'roadma')

    # Verify the termination points of the ROADMA
    def test_02_compareOpenroadmTopologyPortMapping_rdm(self):
        self._compare_topology_with_portmapping()

    # Disconnect the ROADMA
    def test_03_disconnect_rdm(self):
        self._disconnect_device("ROADM-A1")

    # Connect the XPDRA
    def test_04_connect_xpdr(self):
        self._connect_device("XPDR-A1", 'xpdra')

    # Verify the termination points related to XPDR
    def test_05_compareOpenroadmTopologyPortMapping_xpdr(self):
        self._compare_topology_with_portmapping()

    # Disconnect the XPDRA
    def test_06_disconnect_device(self):
        self._disconnect_device("XPDR-A1")
171
172
if __name__ == "__main__":
    # File logging is left disabled.  To capture debug output while
    # troubleshooting, configure it before running the suite, e.g.:
    #   logging.basicConfig(filename='./transportpce_tests/log/response.log',
    #                       filemode='w', level=logging.DEBUG)
    # verbosity=2 is the verbosity level handed to the unittest runner.
    unittest.main(verbosity=2)