3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
18 from common import test_utils
class TransportPCEtesting(unittest.TestCase):
    """Compare openroadm-topology termination points with the port mapping.

    Every test sends RESTCONF requests to ``restconf_baseurl`` using the
    ``admin``/``admin`` credentials. unittest runs test methods in name
    order, so the numeric prefixes fix the sequence: mount ROADMA01, check
    it, unmount it, then do the same for XPDRA01.

    NOTE(review): a few statements the test bodies depend on were not
    visible when this class was reviewed (the ``@classmethod`` decorators and
    the ``setUpClass`` header, the opening of the mount payload, the counter
    initialisations, the body sent with DELETE). They are reconstructed here
    from the way the visible code uses them; nothing else was added.
    Confirm against the upstream file.
    """

    restconf_baseurl = "http://localhost:8181/restconf"

    @classmethod
    def setUpClass(cls):
        """Launch the processes returned by ``test_utils`` for this class."""
        cls.processes = test_utils.start_tpce()
        # NOTE(review): this rebinds cls.processes, so tearDownClass only
        # shuts down what start_sims() returns. Presumably that list also
        # contains the process started by start_tpce() -- TODO confirm in
        # test_utils before changing this.
        cls.processes = test_utils.start_sims(['xpdra', 'roadma'])

    @classmethod
    def tearDownClass(cls):
        """Shut down every process recorded in ``cls.processes``."""
        for process in cls.processes:
            test_utils.shutdown_process(process)
        print("all processes killed")

    def _request(self, method, url, body=None):
        """Send one RESTCONF request and return the ``requests`` response.

        ``body`` is serialised with ``json.dumps`` when given; ``None`` sends
        no payload.
        """
        payload = None if body is None else json.dumps(body)
        return requests.request(
            method, url, data=payload,
            headers={'content-type': 'application/json'},
            auth=('admin', 'admin'))

    def _netconf_node_url(self, node_id):
        """Return the topology-netconf config URL of ``node_id``."""
        return ("{}/config/network-topology:"
                "network-topology/topology/topology-netconf/node/{}"
                .format(self.restconf_baseurl, node_id))

    def _portmapping_node_url(self, node_map_id):
        """Return the port-mapping config URL of ``node_map_id``."""
        return ("{}/config/transportpce-portmapping:network/nodes/{}"
                .format(self.restconf_baseurl, node_map_id))

    def _connect_device(self, node_id, sim_name):
        """PUT the netconf mount point of ``node_id`` and expect 201 Created.

        The port is read from ``test_utils.sims[sim_name]['port']``.
        """
        # The enclosing {"node": [ ... ]} wrapper is reconstructed from the
        # closing brackets that were visible -- TODO confirm.
        data = {"node": [{
            "node-id": node_id,
            "netconf-node-topology:username": "admin",
            "netconf-node-topology:password": "admin",
            "netconf-node-topology:host": "127.0.0.1",
            "netconf-node-topology:port": test_utils.sims[sim_name]['port'],
            "netconf-node-topology:tcp-only": "false",
            "netconf-node-topology:pass-through": {}}]}
        response = self._request("PUT", self._netconf_node_url(node_id), data)
        self.assertEqual(response.status_code, requests.codes.created)

    def _disconnect_device(self, node_id):
        """DELETE the netconf mount point of ``node_id`` and expect 200 OK."""
        # The visible code serialised a `data` object whose definition was
        # not visible; an empty JSON object is assumed -- TODO confirm.
        data = {}
        response = self._request(
            "DELETE", self._netconf_node_url(node_id), data)
        self.assertEqual(response.status_code, requests.codes.ok)

    # Connect the ROADMA
    def test_01_connect_rdm(self):
        self._connect_device("ROADMA01", 'roadma')

    # Verify the termination points of the ROADMA
    def test_02_compareOpenroadmTopologyPortMapping_rdm(self):
        topo_url = ("{}/config/ietf-network:networks/network/openroadm-topology"
                    .format(self.restconf_baseurl))
        topo_nodes = self._request("GET", topo_url).json()['network'][0]['node']

        # Mappings confirmed so far across all topology nodes. The port
        # mapping is looked up by the node-id prefix before the first "-",
        # so presumably several topology nodes share one mapping list.
        mapped_total = 0
        # Mappings of the last visited node that no termination point has
        # accounted for; stays 0 when the topology holds no node at all.
        unmatched = 0
        for node in topo_nodes:
            node_map_id = node['node-id'].split("-")[0]
            node_map_url = self._portmapping_node_url(node_map_id)
            mapping_list = self._request("GET", node_map_url).json()
            nb_mappings = len(mapping_list['nodes'][0]['mapping'])

            for tp in node['ietf-network-topology:termination-point']:
                tp_id = tp['tp-id']
                # Termination points whose id contains "CP" or "CTP" are
                # not looked up in the port mapping.
                if "CP" in tp_id or "CTP" in tp_id:
                    continue
                mapping_response = self._request(
                    "GET", node_map_url + "/mapping/" + tp_id)
                self.assertEqual(mapping_response.status_code,
                                 requests.codes.ok)
                # The assertion above aborts the test on any other status,
                # so reaching this line means the mapping exists.
                mapped_total += 1
            unmatched = nb_mappings - mapped_total
        self.assertEqual(unmatched, 0)

    # Disconnect the ROADMA
    def test_03_disconnect_rdm(self):
        self._disconnect_device("ROADMA01")

    # Connect the XPDRA
    def test_04_connect_xpdr(self):
        self._connect_device("XPDRA01", 'xpdra')

    # Verify the termination points related to XPDR
    def test_05_compareOpenroadmTopologyPortMapping_xpdr(self):
        # Same comparison as for the ROADM, run against the current topology.
        self.test_02_compareOpenroadmTopologyPortMapping_rdm()

    # Disconnect the XPDRA
    def test_06_disconnect_device(self):
        self._disconnect_device("XPDRA01")
if __name__ == "__main__":
    # Executed as a script: run every test of this module and print one
    # result line per test (verbosity level 2).
    unittest.main(verbosity=2)