3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
19 sys.path.append('transportpce_tests/common/')
20 import test_utils_rfc8040 # nopep8
class TransportPCEtesting(unittest.TestCase):
    """Cross-check the topology termination points against the port mapping.

    A ROADM simulator and then a transponder simulator are mounted in turn.
    After each mount, every termination point listed in the topology (except
    the CP/CTP ones) must be resolvable through the port-mapping API, and the
    number of such termination points must match the number of mapping
    entries.

    The ``test_NN_`` prefixes matter: unittest runs test methods in
    alphabetical order and each step relies on the state left by the
    previous one.
    """

    NODE_VERSION = '2.2.1'

    @classmethod
    def setUpClass(cls):
        """Start the processes shared by all the tests of this class."""
        cls.processes = test_utils_rfc8040.start_tpce()
        # NOTE(review): the second assignment replaces the first one; this
        # presumably relies on start_sims() returning the complete process
        # list (controller included) - confirm against test_utils_rfc8040.
        cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])

    @classmethod
    def tearDownClass(cls):
        """Shut down every process recorded by setUpClass."""
        # pylint: disable=not-an-iterable
        for process in cls.processes:
            test_utils_rfc8040.shutdown_process(process)
        print("all processes killed")

    def _compare_topology_with_portmapping(self):
        """Assert that topology termination points and port mapping agree.

        For every node of the first network returned by the topology request:
        the port-mapping node info must be retrievable, and each termination
        point whose id contains neither "CP" nor "CTP" must have a
        port-mapping entry. Finally the number of checked termination points
        must equal the number of entries in the port-mapping list.
        """
        response_topo = test_utils_rfc8040.get_request(test_utils_rfc8040.URL_CONFIG_ORDM_TOPO)
        nodes = response_topo.json()['ietf-network:network'][0]['node']

        # nb_map_cumul: termination points already accounted for by the
        #               previous topology nodes.
        # nb_map_current: termination points checked for the current node.
        # nb_mappings: mapping entries not yet matched when the current node
        #              is entered.
        nb_map_cumul = 0
        nb_map_current = 0
        nb_mappings = 0
        for node in nodes:
            node_id = node['node-id']
            print("nodeId={}".format(node_id))
            # The port-mapping key is made of the first two dash-separated
            # fields of the topology node id.
            id_fields = node_id.split("-")
            node_map_id = id_fields[0] + "-" + id_fields[1]
            print("nodeMapId={}".format(node_map_id))

            response = test_utils_rfc8040.get_portmapping_node_info(node_map_id)
            self.assertEqual(response['status_code'], requests.codes.ok)

            response_map_list = test_utils_rfc8040.get_portmapping(node_map_id)
            nb_mappings = len(response_map_list['nodes'][0]['mapping']) - nb_map_cumul

            nb_map_current = 0
            for termination_point in node['ietf-network-topology:termination-point']:
                tp_id = termination_point['tp-id']
                if "CP" not in tp_id and "CTP" not in tp_id:
                    response_map = test_utils_rfc8040.portmapping_request(node_map_id, tp_id)
                    self.assertEqual(response_map['status_code'], requests.codes.ok)
                    # Reaching this line means the lookup succeeded: the
                    # assertion above raises otherwise.
                    nb_map_current += 1
            nb_map_cumul += nb_map_current

        # The counts can only balance once the last node has been processed,
        # hence the check is done after the loop and not for each node.
        nb_mappings -= nb_map_current
        self.assertEqual(nb_mappings, 0)

    # Connect the ROADMA
    def test_01_connect_rdm(self):
        """Mount ROADM-A1 and expect an HTTP 201 (created) answer."""
        response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
        self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)

    # Verify the termination points of the ROADMA
    def test_02_compareOpenroadmTopologyPortMapping_rdm(self):
        """Check topology / port-mapping consistency while the ROADM is mounted."""
        self._compare_topology_with_portmapping()

    # Disconnect the ROADMA
    def test_03_disconnect_rdm(self):
        """Unmount ROADM-A1; both 200 and 204 are accepted as success."""
        response = test_utils_rfc8040.unmount_device("ROADM-A1")
        self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))

    # Connect the XPDRA
    def test_04_connect_xpdr(self):
        """Mount XPDR-A1 and expect an HTTP 201 (created) answer."""
        response = test_utils_rfc8040.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
        self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)

    # Verify the termination points related to XPDR
    def test_05_compareOpenroadmTopologyPortMapping_xpdr(self):
        """Check topology / port-mapping consistency while the XPDR is mounted."""
        self._compare_topology_with_portmapping()

    # Disconnect the XPDRA
    def test_06_disconnect_device(self):
        """Unmount XPDR-A1; both 200 and 204 are accepted as success."""
        response = test_utils_rfc8040.unmount_device("XPDR-A1")
        self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
if __name__ == "__main__":
    # Direct execution: run every test of this module with verbose output.
    # For troubleshooting, debug logging can be sent to a file first, e.g.
    #   logging.basicConfig(filename='./transportpce_tests/log/response.log',
    #                       filemode='w', level=logging.DEBUG)
    unittest.main(verbosity=2)