Add PowerMock library in OLM module
[transportpce.git] / tests / transportpce_tests / 1.2.1 / test_topoPortMapping.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 #Copyright (c) 2017 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import json
13 import os
14 import psutil
15 import requests
16 import signal
17 import shutil
18 import subprocess
19 import time
20 import unittest
21 import logging
22
class TransportPCEtesting(unittest.TestCase):
    """Functional test comparing openroadm-topology with the port mapping.

    The class starts two honeynode simulators (one loaded with the XPDRA
    sample configuration, one with the ROADMA one) and a karaf instance,
    mounts each device in turn through the RESTCONF ``topology-netconf``
    API, and checks that every termination point published in
    ``openroadm-topology`` (except the ``CP``/``CTP`` ones) has an entry in
    ``transportpce-portmapping``.
    """

    honeynode_process1 = None
    honeynode_process2 = None
    odl_process = None
    restconf_baseurl = "http://localhost:8181/restconf"

#START_IGNORE_XTESTING

    @classmethod
    def __start_honeynode(cls, port, config_file, log_file):
        """Start one honeynode simulator and return its ``Popen`` handle.

        Returns ``None`` without starting anything when the honeynode
        executable is not present on disk.
        """
        executable = ("./honeynode/2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
                      "/honeynode-distribution-1.18.01/honeycomb-tpce")
        if not os.path.isfile(executable):
            return None
        with open(log_file, 'w') as outfile:
            return subprocess.Popen(
                [executable, port, config_file], stdout=outfile)

    @classmethod
    def __start_honeynode1(cls):
        """Start the simulator loaded with the XPDRA configuration (port 17830)."""
        cls.honeynode_process1 = cls.__start_honeynode(
            "17830", "sample_configs/openroadm/2.1/oper-XPDRA.xml",
            'honeynode1.log')

    @classmethod
    def __start_honeynode2(cls):
        """Start the simulator loaded with the ROADMA configuration (port 17831)."""
        cls.honeynode_process2 = cls.__start_honeynode(
            "17831", "sample_configs/openroadm/2.1/oper-ROADMA.xml",
            'honeynode2.log')

    @classmethod
    def __start_odl(cls):
        """Start karaf in server mode, logging its stdout to ``odl.log``."""
        executable = "../karaf/target/assembly/bin/karaf"
        # The child process inherits its own copies of both descriptors, so
        # the parent can close them as soon as Popen returns.
        with open('odl.log', 'w') as outfile, open(os.devnull) as devnull:
            cls.odl_process = subprocess.Popen(
                ["bash", executable, "server"], stdout=outfile,
                stdin=devnull)

    @staticmethod
    def __stop_process(process):
        """Send SIGINT to *process* and its direct children and wait for them.

        Does nothing when *process* is ``None`` (the process was never
        started) and tolerates processes that have already exited.
        """
        if process is None:
            return
        try:
            children = psutil.Process(process.pid).children()
        except psutil.NoSuchProcess:
            children = []
        for child in children:
            try:
                child.send_signal(signal.SIGINT)
                child.wait()
            except psutil.NoSuchProcess:
                # The child exited between the listing and the signal.
                continue
        process.send_signal(signal.SIGINT)
        process.wait()

    @classmethod
    def setUpClass(cls):
        """Start both simulators, then the controller.

        The fixed sleeps presumably give each process time to finish
        booting before the next step -- TODO confirm the required delays.
        """
        cls.__start_honeynode1()
        time.sleep(20)
        cls.__start_honeynode2()
        time.sleep(20)
        cls.__start_odl()
        time.sleep(60)

    @classmethod
    def tearDownClass(cls):
        """Stop the controller first, then both simulators."""
        for process in (cls.odl_process,
                        cls.honeynode_process1,
                        cls.honeynode_process2):
            cls.__stop_process(process)

    def setUp(self):
        """Pause for ten seconds before every test."""
        time.sleep(10)

#END_IGNORE_XTESTING

    def __restconf(self, method, path, payload=None):
        """Send a RESTCONF request on the config datastore.

        *path* is appended to ``<restconf_baseurl>/config/``. When *payload*
        is not ``None`` it is JSON-encoded and sent as the request body.
        Returns the ``requests`` response object.
        """
        url = "{}/config/{}".format(self.restconf_baseurl, path)
        body = None if payload is None else json.dumps(payload)
        return requests.request(
            method, url, data=body,
            headers={'content-type': 'application/json'},
            auth=('admin', 'admin'))

    def __mount_device(self, node_id, port):
        """PUT the netconf mount point of *node_id* reachable on local *port*."""
        data = {"node": [{
            "node-id": node_id,
            "netconf-node-topology:username": "admin",
            "netconf-node-topology:password": "admin",
            "netconf-node-topology:host": "127.0.0.1",
            "netconf-node-topology:port": port,
            "netconf-node-topology:tcp-only": "false",
            "netconf-node-topology:pass-through": {}}]}
        return self.__restconf(
            "PUT",
            "network-topology:"
            "network-topology/topology/topology-netconf/node/{}".format(node_id),
            data)

    def __unmount_device(self, node_id):
        """DELETE the netconf mount point of *node_id*."""
        return self.__restconf(
            "DELETE",
            "network-topology:"
            "network-topology/topology/topology-netconf/node/{}".format(node_id),
            {})

    def __compare_topology_with_portmapping(self):
        """Check that topology termination points match the port mapping.

        For every node of ``openroadm-topology`` the device name is the part
        of the node-id before the first ``-``. Each termination point whose
        id contains neither ``CP`` nor ``CTP`` must be retrievable from the
        device's port mapping, and per device the number of such termination
        points must equal the number of entries in its mapping list.
        """
        response = self.__restconf(
            "GET", "ietf-network:networks/network/openroadm-topology")
        self.assertEqual(response.status_code, requests.codes.ok)
        nodes = response.json()['network'][0].get('node', [])
        self.assertTrue(nodes, "openroadm-topology contains no node")

        # Several topology nodes belong to the same device, so the counts
        # are accumulated per device.
        mapped_per_device = {}
        for node in nodes:
            device_id = node['node-id'].split("-")[0]
            mapped = 0
            for tp in node['ietf-network-topology:termination-point']:
                tp_id = tp['tp-id']
                if "CP" in tp_id or "CTP" in tp_id:
                    continue
                response = self.__restconf(
                    "GET",
                    "transportpce-portmapping:network/nodes/{}/mapping/{}"
                    .format(device_id, tp_id))
                self.assertEqual(response.status_code, requests.codes.ok)
                mapped += 1
            mapped_per_device[device_id] = (
                mapped_per_device.get(device_id, 0) + mapped)

        for device_id, mapped in mapped_per_device.items():
            response = self.__restconf(
                "GET",
                "transportpce-portmapping:network/nodes/{}".format(device_id))
            listed = len(response.json()['nodes'][0]['mapping'])
            self.assertEqual(
                listed, mapped,
                "{}: {} mappings listed, {} termination points mapped"
                .format(device_id, listed, mapped))

    # Connect the ROADMA
    def test_01_connect_rdm(self):
        response = self.__mount_device("ROADMA", "17831")
        self.assertEqual(response.status_code, requests.codes.created)
        # Presumably leaves the controller time to process the new device
        # before the next test -- TODO confirm the required delay.
        time.sleep(20)

    # Verify the termination points of the ROADMA
    def test_02_compareOpenroadmTopologyPortMapping_rdm(self):
        self.__compare_topology_with_portmapping()

    # Disconnect the ROADMA
    def test_03_disconnect_rdm(self):
        response = self.__unmount_device("ROADMA")
        self.assertEqual(response.status_code, requests.codes.ok)

    # Connect the XPDRA
    def test_04_connect_xpdr(self):
        response = self.__mount_device("XPDRA", "17830")
        self.assertEqual(response.status_code, requests.codes.created)
        # Presumably leaves the controller time to process the new device
        # before the next test -- TODO confirm the required delay.
        time.sleep(20)

    # Verify the termination points related to XPDR
    def test_05_compareOpenroadmTopologyPortMapping_xpdr(self):
        self.__compare_topology_with_portmapping()

    # Disconnect the XPDRA
    def test_06_disconnect_device(self):
        response = self.__unmount_device("XPDRA")
        self.assertEqual(response.status_code, requests.codes.ok)
198
if __name__ == "__main__":
    # Run every test of this module with per-test result lines.
    # To also capture debug output in a file, configure logging first, e.g.:
    #   logging.basicConfig(filename='./transportpce_tests/log/response.log',
    #                       filemode='w', level=logging.DEBUG)
    unittest.main(verbosity=2)