3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
class TransportPCEtesting(unittest.TestCase):
    """Check that the OpenROADM topology agrees with the port mapping.

    The tests run in name order: a ROADM simulator (ROADM-A1) is mounted
    through RESTCONF, every relevant termination point of the
    ``openroadm-topology`` is looked up in the ``transportpce-portmapping``
    data, and the device is unmounted again.  The same sequence is then
    repeated for a transponder simulator (XPDR-A1).
    """

    # Handles of the processes started in setUpClass.
    odl_process = None
    sim_process1 = None
    sim_process2 = None
    restconf_baseurl = "http://localhost:8181/restconf"

    # Credentials and headers shared by every RESTCONF request of this class.
    _AUTH = ('admin', 'admin')
    _HEADERS = {'content-type': 'application/json'}

# START_IGNORE_XTESTING
# NOTE(review): no matching END_IGNORE_XTESTING marker is visible in this
# class; confirm where the ignored region is supposed to end.

    @classmethod
    def setUpClass(cls):
        """Start the process returned by ``test_utils.start_tpce`` and the
        'xpdra' and 'roadma' simulators once for the whole class."""
        cls.odl_process = test_utils.start_tpce()
        cls.sim_process1 = test_utils.start_sim('xpdra')
        cls.sim_process2 = test_utils.start_sim('roadma')

    @classmethod
    def tearDownClass(cls):
        """Stop every process started in setUpClass, in start order."""
        for process in (cls.odl_process, cls.sim_process1, cls.sim_process2):
            if process is None:
                # Never started; nothing to stop.
                continue
            cls._interrupt(process)

    @staticmethod
    def _interrupt(process):
        """Send SIGINT to the direct children of *process*, then to the
        process itself, and block until the process has exited."""
        for child in psutil.Process(process.pid).children():
            child.send_signal(signal.SIGINT)
        process.send_signal(signal.SIGINT)
        process.wait()

    def _netconf_node_url(self, node_id):
        """Return the RESTCONF config URL of netconf node *node_id*."""
        return ("{}/config/network-topology:"
                "network-topology/topology/topology-netconf/node/{}"
                .format(self.restconf_baseurl, node_id))

    def _get(self, url):
        """Issue an authenticated JSON GET on *url* and return the response."""
        return requests.request(
            "GET", url, headers=self._HEADERS, auth=self._AUTH)

    def _connect_device(self, node_id, sim_name):
        """Mount simulator *sim_name* as netconf node *node_id*.

        The port is read from ``test_utils.sims[sim_name]['port']``.  The
        test fails unless the PUT answers 201 (created).
        """
        data = {"node": [{
            "node-id": node_id,
            "netconf-node-topology:username": "admin",
            "netconf-node-topology:password": "admin",
            "netconf-node-topology:host": "127.0.0.1",
            "netconf-node-topology:port": test_utils.sims[sim_name]['port'],
            "netconf-node-topology:tcp-only": "false",
            "netconf-node-topology:pass-through": {}}]}
        response = requests.request(
            "PUT", self._netconf_node_url(node_id), data=json.dumps(data),
            headers=self._HEADERS, auth=self._AUTH)
        self.assertEqual(response.status_code, requests.codes.created)

    def _disconnect_device(self, node_id):
        """Unmount netconf node *node_id*; the test fails unless the DELETE
        answers 200 (ok)."""
        # A DELETE needs no payload; an empty JSON object is sent so the
        # request still carries a body matching its JSON content type.
        response = requests.request(
            "DELETE", self._netconf_node_url(node_id), data=json.dumps({}),
            headers=self._HEADERS, auth=self._AUTH)
        self.assertEqual(response.status_code, requests.codes.ok)

    def _compare_topology_with_port_mapping(self):
        """Verify topology termination points against the port mapping.

        For every node of the openroadm-topology, each termination point
        whose id contains neither "CP" nor "CTP" must be retrievable from
        the port mapping of the owning device.  Once all nodes are visited,
        the number of termination points verified for a device must equal
        the number of entries in that device's mapping list.
        """
        topo_url = ("{}/config/ietf-network:networks/network/openroadm-topology"
                    .format(self.restconf_baseurl))
        topology = self._get(topo_url).json()

        mapping_counts = {}   # device id -> entries in its mapping list
        verified_counts = {}  # device id -> termination points found so far
        for node in topology['network'][0]['node']:
            node_id = node['node-id']
            print("nodeId={}".format(node_id))
            # The port-mapping key is the first two dash-separated fields of
            # the topology node-id (e.g. "ROADM-A1-SRG1" -> "ROADM-A1").
            node_map_id = "-".join(node_id.split("-")[:2])
            print("nodeMapId={}".format(node_map_id))
            node_map_url = ("{}/config/transportpce-portmapping:network/nodes/{}"
                            .format(self.restconf_baseurl, node_map_id))
            mapping_list = self._get(node_map_url).json()
            mapping_counts[node_map_id] = len(
                mapping_list['nodes'][0]['mapping'])
            verified_counts.setdefault(node_map_id, 0)

            for tp in node['ietf-network-topology:termination-point']:
                tp_id = tp['tp-id']
                if "CP" in tp_id or "CTP" in tp_id:
                    # These termination points are not looked up in the
                    # port mapping.
                    continue
                response = self._get(
                    "{}/mapping/{}".format(node_map_url, tp_id))
                self.assertEqual(response.status_code, requests.codes.ok)
                verified_counts[node_map_id] += 1

        # Several topology nodes can share one device, so the totals are
        # only comparable after every node has been visited.
        for device_id, mapping_count in mapping_counts.items():
            self.assertEqual(
                mapping_count - verified_counts[device_id], 0,
                "port-mapping entries of {} do not match its topology "
                "termination points".format(device_id))

    # Connect the ROADMA
    def test_01_connect_rdm(self):
        self._connect_device("ROADM-A1", 'roadma')

    # Verify the termination points of the ROADMA
    def test_02_compareOpenroadmTopologyPortMapping_rdm(self):
        self._compare_topology_with_port_mapping()

    # Disconnect the ROADMA
    def test_03_disconnect_rdm(self):
        self._disconnect_device("ROADM-A1")

    # Connect the XPDRA
    def test_04_connect_xpdr(self):
        self._connect_device("XPDR-A1", 'xpdra')

    # Verify the termination points related to XPDR
    def test_05_compareOpenroadmTopologyPortMapping_xpdr(self):
        self._compare_topology_with_port_mapping()

    # Disconnect the XPDRA
    def test_06_disconnect_device(self):
        self._disconnect_device("XPDR-A1")
if __name__ == "__main__":
    # Debugging aid, disabled by default: uncomment to log to a file while
    # the suite runs.
    # logging.basicConfig(filename='./transportpce_tests/log/response.log',
    #                     filemode='w', level=logging.DEBUG)
    # logging.debug('I am there')

    # Run every test of this module, reporting each test by name.
    unittest.main(verbosity=2)