fix some pylint issues
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_topoPortMapping.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import unittest
13 import json
14 import time
15 import requests
16 from common import test_utils
17
18
19 class TransportPCEtesting(unittest.TestCase):
20
21     processes = None
22
23     @classmethod
24     def setUpClass(cls):
25         cls.processes = test_utils.start_tpce()
26         cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
27
28     @classmethod
29     def tearDownClass(cls):
30         for process in cls.processes:
31             test_utils.shutdown_process(process)
32         print("all processes killed")
33
34     def setUp(self):
35         time.sleep(10)
36
37     # Connect the ROADMA
38     def test_01_connect_rdm(self):
39         response = test_utils.mount_device("ROADM-A1", 'roadma')
40         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
41
42     # Verify the termination points of the ROADMA
43     def test_02_compareOpenroadmTopologyPortMapping_rdm(self):
44         urlTopo = ("{}/config/ietf-network:networks/network/openroadm-topology"
45                    .format(test_utils.RESTCONF_BASE_URL))
46         responseTopo = requests.request(
47             "GET", urlTopo, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
48         resTopo = responseTopo.json()
49         nbNode = len(resTopo['network'][0]['node'])
50         nbMapCumul = 0
51         nbMappings = 0
52         for i in range(0, nbNode):
53             nodeId = resTopo['network'][0]['node'][i]['node-id']
54             print("nodeId={}".format(nodeId))
55             nodeMapId = nodeId.split("-")[0] + "-" + nodeId.split("-")[1]
56             print("nodeMapId={}".format(nodeMapId))
57             urlMapList = "{}/config/transportpce-portmapping:network/nodes/" + nodeMapId
58             urlMapListFull = urlMapList.format(test_utils.RESTCONF_BASE_URL)
59             responseMapList = requests.request(
60                 "GET", urlMapListFull, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
61             resMapList = responseMapList.json()
62
63             nbMappings = len(resMapList['nodes'][0]['mapping']) - nbMapCumul
64             nbTp = len(resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'])
65             nbMapCurrent = 0
66             for j in range(0, nbTp):
67                 tpId = resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
68                 if((not "CP" in tpId) and (not "CTP" in tpId)):
69                     urlMap = "{}/config/transportpce-portmapping:network/nodes/" + nodeMapId + "/mapping/" + tpId
70                     urlMapFull = urlMap.format(test_utils.RESTCONF_BASE_URL)
71                     responseMap = requests.request(
72                         "GET", urlMapFull, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
73                     self.assertEqual(responseMap.status_code, requests.codes.ok)
74                     if(responseMap.status_code == requests.codes.ok):
75                         nbMapCurrent += 1
76             nbMapCumul += nbMapCurrent
77         nbMappings -= nbMapCurrent
78         self.assertEqual(nbMappings, 0)
79
80     # Disconnect the ROADMA
81     def test_03_disconnect_rdm(self):
82         url = ("{}/config/network-topology:"
83                "network-topology/topology/topology-netconf/node/ROADM-A1"
84                .format(test_utils.RESTCONF_BASE_URL))
85         data = {}
86         response = requests.request(
87             "DELETE", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
88             auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
89         self.assertEqual(response.status_code, requests.codes.ok)
90
91 #     #Connect the XPDRA
92     def test_04_connect_xpdr(self):
93         response = test_utils.mount_device("XPDR-A1", 'xpdra')
94         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
95
96 #     #Verify the termination points related to XPDR
97     def test_05_compareOpenroadmTopologyPortMapping_xpdr(self):
98         self.test_02_compareOpenroadmTopologyPortMapping_rdm()
99
100     # Disconnect the XPDRA
101     def test_06_disconnect_device(self):
102         url = ("{}/config/network-topology:"
103                "network-topology/topology/topology-netconf/node/XPDR-A1"
104                .format(test_utils.RESTCONF_BASE_URL))
105         data = {}
106         response = requests.request(
107             "DELETE", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
108             auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
109         self.assertEqual(response.status_code, requests.codes.ok)
110
111
112 if __name__ == "__main__":
113     # logging.basicConfig(filename='./transportpce_tests/log/response.log',filemode='w',level=logging.DEBUG)
114     #logging.debug('I am there')
115     unittest.main(verbosity=2)