3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
class TransportPCEtesting(unittest.TestCase):
    """Check that the OpenROADM topology agrees with the port mapping.

    The ROADM-A1 device is mounted through RESTCONF, every non-CP/CTP
    termination point published in the ``openroadm-topology`` network is
    looked up in the port mapping, and the device is unmounted again. The
    same sequence is then repeated for XPDR-A1.
    """

    restconf_baseurl = "http://localhost:8181/restconf"

    # Credentials and headers shared by every RESTCONF request below.
    _auth = ('admin', 'admin')
    _headers = {'content-type': 'application/json'}

# START_IGNORE_XTESTING

    @classmethod
    def setUpClass(cls):
        """Start the two device simulators, then OpenDaylight."""
        # NOTE(review): the decorator and ``def`` line of this method were
        # missing from the reviewed source and are reconstructed here; any
        # other statements that lived between the visible ones (e.g. waits)
        # could not be recovered -- confirm against the upstream file.
        cls.sim_process1 = test_utils.start_sim('xpdra')
        cls.sim_process2 = test_utils.start_sim('roadma')
        print("all sims started")
        cls.odl_process = test_utils.start_tpce()
        print("opendaylight started")

    @staticmethod
    def _stop_process(process):
        """Send SIGINT to *process* and its direct children, then wait for it."""
        for child in psutil.Process(process.pid).children():
            child.send_signal(signal.SIGINT)
        process.send_signal(signal.SIGINT)
        process.wait()

    @classmethod
    def tearDownClass(cls):
        """Stop OpenDaylight first, then both simulators."""
        for process in (cls.odl_process, cls.sim_process1, cls.sim_process2):
            cls._stop_process(process)

    def _netconf_node_url(self, node_id):
        """Return the topology-netconf config URL of *node_id*."""
        return ("{}/config/network-topology:"
                "network-topology/topology/topology-netconf/node/{}"
                .format(self.restconf_baseurl, node_id))

    def _mount_device(self, node_id, sim_name):
        """PUT the netconf node *node_id* and expect ``201 Created``.

        *sim_name* selects the entry of ``test_utils.sims`` whose port the
        controller should connect to.
        """
        # NOTE(review): the line opening this literal was missing from the
        # reviewed source; ``{"node": [{`` is reconstructed from the visible
        # closing ``}]}`` -- confirm the key name.
        data = {"node": [{
            "node-id": node_id,
            "netconf-node-topology:username": "admin",
            "netconf-node-topology:password": "admin",
            "netconf-node-topology:host": "127.0.0.1",
            "netconf-node-topology:port": test_utils.sims[sim_name]['port'],
            "netconf-node-topology:tcp-only": "false",
            "netconf-node-topology:pass-through": {}}]}
        response = requests.request(
            "PUT", self._netconf_node_url(node_id), data=json.dumps(data),
            headers=self._headers, auth=self._auth)
        self.assertEqual(response.status_code, requests.codes.created)

    def _unmount_device(self, node_id):
        """DELETE the netconf node *node_id* and expect ``200 OK``."""
        # NOTE(review): the assignment of ``data`` was not visible in the
        # reviewed source; an empty JSON object is assumed -- confirm.
        data = {}
        response = requests.request(
            "DELETE", self._netconf_node_url(node_id), data=json.dumps(data),
            headers=self._headers, auth=self._auth)
        self.assertEqual(response.status_code, requests.codes.ok)

    def _check_topology_against_port_mapping(self):
        """Assert that topology termination points and mappings line up.

        Every termination point whose id contains neither "CP" nor "CTP"
        must be retrievable from the port mapping of its device, and the
        number of such points must account for all mappings of the device.
        """
        topo_url = ("{}/config/ietf-network:networks/network/openroadm-topology"
                    .format(self.restconf_baseurl))
        topology = requests.request(
            "GET", topo_url, headers=self._headers, auth=self._auth).json()

        # NOTE(review): the initialisation of the counters was missing from
        # the reviewed source; starting from zero is assumed.
        mapped_so_far = 0
        unmatched = 0
        for node in topology['network'][0]['node']:
            node_id = node['node-id']
            print("nodeId={}".format(node_id))
            # The port-mapping node id is the first two dash-separated
            # fields of the topology node id.
            id_parts = node_id.split("-")
            node_map_id = id_parts[0] + "-" + id_parts[1]
            print("nodeMapId={}".format(node_map_id))

            # Ids are passed as format arguments (not concatenated into the
            # template) so that braces in an id cannot break the formatting.
            map_list_url = ("{}/config/transportpce-portmapping:network/nodes/{}"
                            .format(self.restconf_baseurl, node_map_id))
            port_map = requests.request(
                "GET", map_list_url, headers=self._headers,
                auth=self._auth).json()
            unmatched = len(port_map['nodes'][0]['mapping']) - mapped_so_far

            mapped_here = 0
            for tp in node['ietf-network-topology:termination-point']:
                tp_id = tp['tp-id']
                if "CP" in tp_id or "CTP" in tp_id:
                    continue
                mapping_url = (
                    "{}/config/transportpce-portmapping:network/nodes/{}/mapping/{}"
                    .format(self.restconf_baseurl, node_map_id, tp_id))
                mapping_response = requests.request(
                    "GET", mapping_url, headers=self._headers, auth=self._auth)
                # assertEqual raises on any other status, so reaching the
                # next line means the mapping exists.
                # NOTE(review): the statement that counted the hit was
                # missing from the reviewed source; a +1 is assumed.
                self.assertEqual(mapping_response.status_code, requests.codes.ok)
                mapped_here += 1

            mapped_so_far += mapped_here
            unmatched -= mapped_here

        # NOTE(review): indentation was lost in the reviewed source. The
        # check is placed after the loop because the running total can only
        # balance once the last topology node has been visited -- confirm.
        self.assertEqual(unmatched, 0)

    # Connect the ROADMA
    def test_01_connect_rdm(self):
        self._mount_device("ROADM-A1", 'roadma')

    # Verify the termination points of the ROADMA
    def test_02_compareOpenroadmTopologyPortMapping_rdm(self):
        self._check_topology_against_port_mapping()

    # Disconnect the ROADMA
    def test_03_disconnect_rdm(self):
        self._unmount_device("ROADM-A1")

    # Connect the XPDRA
    def test_04_connect_xpdr(self):
        self._mount_device("XPDR-A1", 'xpdra')

    # Verify the termination points related to XPDR
    def test_05_compareOpenroadmTopologyPortMapping_xpdr(self):
        self._check_topology_against_port_mapping()

    # Disconnect the XPDRA
    def test_06_disconnect_device(self):
        self._unmount_device("XPDR-A1")

# Script entry point: run every test of this module, reporting each test
# on its own line (verbosity level 2).
if __name__ == "__main__":
    unittest.main(verbosity=2)