3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
21 class TransportPCEtesting(unittest.TestCase):
26 restconf_baseurl = "http://localhost:8181/restconf"
28 # START_IGNORE_XTESTING
32 cls.sim_process1 = test_utils.start_sim('xpdra')
35 cls.sim_process2 = test_utils.start_sim('roadma')
38 cls.odl_process = test_utils.start_tpce()
40 print("opendaylight started")
43 def tearDownClass(cls):
44 for child in psutil.Process(cls.odl_process.pid).children():
45 child.send_signal(signal.SIGINT)
47 cls.odl_process.send_signal(signal.SIGINT)
48 cls.odl_process.wait()
49 for child in psutil.Process(cls.sim_process1.pid).children():
50 child.send_signal(signal.SIGINT)
52 cls.sim_process1.send_signal(signal.SIGINT)
53 cls.sim_process1.wait()
54 for child in psutil.Process(cls.sim_process2.pid).children():
55 child.send_signal(signal.SIGINT)
57 cls.sim_process2.send_signal(signal.SIGINT)
58 cls.sim_process2.wait()
66 def test_01_connect_rdm(self):
68 url = ("{}/config/network-topology:"
69 "network-topology/topology/topology-netconf/node/ROADMA01"
70 .format(self.restconf_baseurl))
72 "node-id": "ROADMA01",
73 "netconf-node-topology:username": "admin",
74 "netconf-node-topology:password": "admin",
75 "netconf-node-topology:host": "127.0.0.1",
76 "netconf-node-topology:port": test_utils.sims['roadma']['port'],
77 "netconf-node-topology:tcp-only": "false",
78 "netconf-node-topology:pass-through": {}}]}
79 headers = {'content-type': 'application/json'}
80 response = requests.request(
81 "PUT", url, data=json.dumps(data), headers=headers,
82 auth=('admin', 'admin'))
83 self.assertEqual(response.status_code, requests.codes.created)
86 # Verify the termination points of the ROADMA
87 def test_02_compareOpenroadmTopologyPortMapping_rdm(self):
88 urlTopo = ("{}/config/ietf-network:networks/network/openroadm-topology"
89 .format(self.restconf_baseurl))
90 headers = {'content-type': 'application/json'}
91 responseTopo = requests.request("GET", urlTopo, headers=headers, auth=('admin', 'admin'))
92 resTopo = responseTopo.json()
93 nbNode = len(resTopo['network'][0]['node'])
96 for i in range(0, nbNode):
97 nodeId = resTopo['network'][0]['node'][i]['node-id']
98 nodeMapId = nodeId.split("-")[0]
99 urlMapList = "{}/config/transportpce-portmapping:network/nodes/" + nodeMapId
100 urlMapListFull = urlMapList.format(self.restconf_baseurl)
101 responseMapList = requests.request("GET", urlMapListFull, headers=headers, auth=('admin', 'admin'))
102 resMapList = responseMapList.json()
104 nbMappings = len(resMapList['nodes'][0]['mapping']) - nbMapCumul
105 nbTp = len(resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'])
107 for j in range(0, nbTp):
108 tpId = resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
109 if((not "CP" in tpId) and (not "CTP" in tpId)):
110 urlMap = "{}/config/transportpce-portmapping:network/nodes/" + nodeMapId + "/mapping/" + tpId
111 urlMapFull = urlMap.format(self.restconf_baseurl)
112 responseMap = requests.request("GET", urlMapFull, headers=headers, auth=('admin', 'admin'))
113 self.assertEqual(responseMap.status_code, requests.codes.ok)
114 if responseMap.status_code == requests.codes.ok:
116 nbMapCumul += nbMapCurrent
117 nbMappings -= nbMapCurrent
118 self.assertEqual(nbMappings, 0)
120 # Disconnect the ROADMA
121 def test_03_disconnect_rdm(self):
122 url = ("{}/config/network-topology:"
123 "network-topology/topology/topology-netconf/node/ROADMA01"
124 .format(self.restconf_baseurl))
126 headers = {'content-type': 'application/json'}
127 response = requests.request(
128 "DELETE", url, data=json.dumps(data), headers=headers,
129 auth=('admin', 'admin'))
130 self.assertEqual(response.status_code, requests.codes.ok)
133 def test_04_connect_xpdr(self):
135 url = ("{}/config/network-topology:"
136 "network-topology/topology/topology-netconf/node/XPDRA01"
137 .format(self.restconf_baseurl))
139 "node-id": "XPDRA01",
140 "netconf-node-topology:username": "admin",
141 "netconf-node-topology:password": "admin",
142 "netconf-node-topology:host": "127.0.0.1",
143 "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
144 "netconf-node-topology:tcp-only": "false",
145 "netconf-node-topology:pass-through": {}}]}
146 headers = {'content-type': 'application/json'}
147 response = requests.request(
148 "PUT", url, data=json.dumps(data), headers=headers,
149 auth=('admin', 'admin'))
150 self.assertEqual(response.status_code, requests.codes.created)
153 # #Verify the termination points related to XPDR
154 def test_05_compareOpenroadmTopologyPortMapping_xpdr(self):
155 self.test_02_compareOpenroadmTopologyPortMapping_rdm()
157 # Disconnect the XPDRA
158 def test_06_disconnect_device(self):
159 url = ("{}/config/network-topology:"
160 "network-topology/topology/topology-netconf/node/XPDRA01"
161 .format(self.restconf_baseurl))
163 headers = {'content-type': 'application/json'}
164 response = requests.request(
165 "DELETE", url, data=json.dumps(data), headers=headers,
166 auth=('admin', 'admin'))
167 self.assertEqual(response.status_code, requests.codes.ok)
170 if __name__ == "__main__":
171 # logging.basicConfig(filename='./transportpce_tests/log/response.log',filemode='w',level=logging.DEBUG)
172 #logging.debug('I am there')
173 unittest.main(verbosity=2)