6051f7e53cdea54d7a31285d3664659229cb6178
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test02_topo_portmapping.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14
15 import unittest
16 import time
17 import requests
18 # pylint: disable=wrong-import-order
19 import sys
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils_rfc8040  # nopep8
24
25
class TransportPCEtesting(unittest.TestCase):
    """Compare the OpenROADM topology with the port mapping of mounted devices.

    A ROADM simulator and then a transponder simulator are mounted one after
    the other.  For each of them, every termination point listed in the
    OpenROADM topology whose id contains neither "CP" nor "CTP" must be
    retrievable from the port mapping, and the port mapping of the device
    must not hold more entries than the termination points found that way.
    """

    # Handles iterated over by tearDownClass to stop what setUpClass started.
    processes = None
    # Version string handed to start_sims() and mount_device().
    NODE_VERSION = '2.2.1'

    @classmethod
    def setUpClass(cls):
        cls.processes = test_utils_rfc8040.start_tpce()
        # NOTE(review): this rebinds cls.processes; presumably start_sims()
        # returns a collection that also holds what start_tpce() started,
        # otherwise those handles are never shut down - confirm.
        cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])

    @classmethod
    def tearDownClass(cls):
        # pylint: disable=not-an-iterable
        for process in cls.processes:
            test_utils_rfc8040.shutdown_process(process)
        print("all processes killed")

    def setUp(self):
        # Pause before every test (presumably to let the previous operation
        # settle - the reason is not stated in this file).
        time.sleep(10)

    def _compare_topology_with_portmapping(self):
        """Assert that topology termination points and port mapping agree.

        Topology node ids are reduced to their first two dash-separated
        fields to obtain the port-mapping node id.  Matches are counted per
        port-mapping node, so the final comparison stays valid even when the
        topology holds nodes of several devices.
        """
        response_topo = test_utils_rfc8040.get_request(test_utils_rfc8040.URL_CONFIG_ORDM_TOPO)
        topo_nodes = response_topo.json()['ietf-network:network'][0]['node']
        # An empty node list used to end in an UnboundLocalError; report it
        # as an ordinary test failure instead.
        self.assertTrue(topo_nodes, "the OpenROADM topology does not contain any node")

        # Port-mapping node id -> number of topology termination points
        # that were found in its port mapping.
        mapped_tp_count = {}
        for topo_node in topo_nodes:
            node_id = topo_node['node-id']
            print("nodeId={}".format(node_id))
            node_map_id = "-".join(node_id.split("-")[:2])
            print("nodeMapId={}".format(node_map_id))
            response = test_utils_rfc8040.get_portmapping_node_info(node_map_id)
            self.assertEqual(response['status_code'], requests.codes.ok)
            mapped_tp_count.setdefault(node_map_id, 0)
            for termination_point in topo_node['ietf-network-topology:termination-point']:
                tp_id = termination_point['tp-id']
                # Termination points whose id contains "CP" or "CTP" are
                # not looked up in the port mapping.
                if "CP" in tp_id or "CTP" in tp_id:
                    continue
                response_map = test_utils_rfc8040.portmapping_request(node_map_id, tp_id)
                self.assertEqual(response_map['status_code'], requests.codes.ok)
                mapped_tp_count[node_map_id] += 1

        # Every port-mapping entry of a device must have been matched by
        # exactly one topology termination point.
        for node_map_id, tp_count in mapped_tp_count.items():
            response_map_list = test_utils_rfc8040.get_portmapping(node_map_id)
            self.assertEqual(len(response_map_list['nodes'][0]['mapping']), tp_count)

    # Connect the ROADMA
    def test_01_connect_rdm(self):
        response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
        self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)

    # Verify the termination points of the ROADMA
    def test_02_compareOpenroadmTopologyPortMapping_rdm(self):
        self._compare_topology_with_portmapping()

    # Disconnect the ROADMA
    def test_03_disconnect_rdm(self):
        response = test_utils_rfc8040.unmount_device("ROADM-A1")
        self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))

    # Connect the XPDRA
    def test_04_connect_xpdr(self):
        response = test_utils_rfc8040.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
        self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)

    # Verify the termination points related to XPDR
    def test_05_compareOpenroadmTopologyPortMapping_xpdr(self):
        self._compare_topology_with_portmapping()

    # Disconnect the XPDRA
    def test_06_disconnect_device(self):
        response = test_utils_rfc8040.unmount_device("XPDR-A1")
        self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
98
99
if __name__ == "__main__":
    # Verbosity level 2 makes the unittest runner report each test method
    # individually instead of printing a single progress character.
    unittest.main(verbosity=2)