a2f88b5f6ad8b4d6547f6d4ec542d444fa64ba83
[transportpce.git] / tests / transportpce_tests / 1.2.1 / test02_topo_portmapping.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # a pylint false positive due to unittest
15 # pylint: disable=no-self-use
16
17 import time
18 import unittest
19 import requests
20 # pylint: disable=wrong-import-order
21 import sys
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils_rfc8040  # nopep8
26
27
class TransportPCEtesting(unittest.TestCase):
    """Functional test comparing the OpenROADM topology with the port mapping.

    The scenario uses two simulated devices of version ``NODE_VERSION``:
    a ROADM ("ROADMA01") and a transponder ("XPDRA01").  Each device is
    mounted, the topology is compared with the port mapping, and the device
    is unmounted again.  The tests depend on each other and rely on the
    default unittest ordering (alphabetical by method name).
    """

    # Handles of the processes launched in setUpClass, stopped in tearDownClass.
    processes = None
    # Device model version of the simulators used by this test class.
    NODE_VERSION = '1.2.1'

    @classmethod
    def setUpClass(cls):
        """Launch TransportPCE (start_tpce) and the xpdra/roadma simulators."""
        # NOTE(review): the second assignment replaces the first one;
        # presumably both helpers return the same shared process list, so
        # tearDownClass still stops everything -- confirm in
        # test_utils_rfc8040.
        cls.processes = test_utils_rfc8040.start_tpce()
        cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])

    @classmethod
    def tearDownClass(cls):
        """Shut down every process recorded in ``cls.processes``."""
        # pylint: disable=not-an-iterable
        for process in cls.processes:
            test_utils_rfc8040.shutdown_process(process)
        print("all processes killed")

    def setUp(self):
        """Wait 10 seconds before each test."""
        # NOTE(review): presumably gives the controller time to settle after
        # the previous step -- confirm before shortening.
        time.sleep(10)

    def _compare_openroadm_topology_with_portmapping(self):
        """Check that topology nodes and termination points are in the port mapping.

        For every node of the first network returned by the OpenROADM
        topology request, the port-mapping node id is the part of the
        ``node-id`` located before the first "-".  The port-mapping node info
        of that id must be retrievable (status OK), and so must the mapping
        of every termination point whose ``tp-id`` contains neither "CP" nor
        "CTP".
        """
        topology_response = test_utils_rfc8040.get_request(test_utils_rfc8040.URL_CONFIG_ORDM_TOPO)
        topology = topology_response.json()
        nodes = topology['ietf-network:network'][0]['node']
        for node in nodes:
            node_map_id = node['node-id'].split("-")[0]
            node_info = test_utils_rfc8040.get_portmapping_node_info(node_map_id)
            self.assertEqual(node_info['status_code'], requests.codes.ok)
            for termination_point in node['ietf-network-topology:termination-point']:
                tp_id = termination_point['tp-id']
                # Termination points whose id contains "CP" or "CTP" are not
                # looked up in the port mapping.
                if "CP" not in tp_id and "CTP" not in tp_id:
                    mapping = test_utils_rfc8040.portmapping_request(node_map_id, tp_id)
                    self.assertEqual(mapping['status_code'], requests.codes.ok)

    # Connect the ROADMA
    def test_01_connect_rdm(self):
        """Mount ROADMA01 and expect a 201 (created) status code."""
        response = test_utils_rfc8040.mount_device("ROADMA01", ('roadma', self.NODE_VERSION))
        self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)

    # Verify the termination points of the ROADMA
    def test_02_compare_Openroadm_topology_portmapping_rdm(self):
        """Compare topology and port mapping while ROADMA01 is mounted."""
        self._compare_openroadm_topology_with_portmapping()

    # Disconnect the ROADMA
    def test_03_disconnect_rdm(self):
        """Unmount ROADMA01 and expect a 200 (ok) or 204 (no content) status code."""
        response = test_utils_rfc8040.unmount_device("ROADMA01")
        self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))

    # Connect the XPDRA
    def test_04_connect_xpdr(self):
        """Mount XPDRA01 and expect a 201 (created) status code."""
        response = test_utils_rfc8040.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION))
        self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)

    # Verify the termination points related to XPDR
    def test_05_compare_Openroadm_topology_portmapping_xpdr(self):
        """Run the same topology/port-mapping comparison while XPDRA01 is mounted."""
        self._compare_openroadm_topology_with_portmapping()

    # Disconnect the XPDRA
    def test_06_disconnect_device(self):
        """Unmount XPDRA01 and expect a 200 (ok) or 204 (no content) status code."""
        response = test_utils_rfc8040.unmount_device("XPDRA01")
        self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
88
89
if __name__ == "__main__":
    # Run every test of this module with per-test (verbose) reporting.
    # Debugging aid: to capture a log file, add `import logging` to the
    # imports (this file does not import it) and call
    # logging.basicConfig(filename='./transportpce_tests/log/response.log',
    #                     filemode='w', level=logging.DEBUG)
    # before the tests start.
    unittest.main(verbosity=2)