3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
18 from common import test_utils
# unittest.TestCase subclass that groups the port-mapping REST checks of this file.
21 class TransportPCEPortMappingTesting(unittest.TestCase):
# Base RESTCONF URL; the test methods substitute it into their request URLs via str.format.
24 restconf_baseurl = "http://localhost:8181/restconf"
# Launch the controller (start_tpce) and the 'xpdra' / 'roadma' simulators (start_sims)
# through test_utils and keep the returned value on the class.
# NOTE(review): the second assignment rebinds cls.processes, so whatever start_tpce()
# returned is only retained if start_sims() returns it as well - confirm against test_utils.
28 cls.processes = test_utils.start_tpce()
29 cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
def tearDownClass(cls):
    """Shut down every entry of ``cls.processes`` and print a confirmation.

    Each entry is handed to ``test_utils.shutdown_process`` in iteration
    order; the message is printed once after the loop has finished.
    """
    started = cls.processes
    for handle in started:
        test_utils.shutdown_process(handle)
    print("all processes killed")
# Print the short name of the running test: the last dot-separated component of self.id().
38 print("execution of {}".format(self.id().split(".")[-1]))
41 # def test_01_restconfAPI(self):
42 # url = ("{}/operational/network-topology:network-topology/topology/"
43 # "topology-netconf/node/controller-config".format(self.restconf_baseurl))
44 # headers = {'content-type': 'application/json'}
45 # response = requests.request("GET", url, headers=headers, auth=('admin', 'admin'))
46 # self.assertEqual(response.status_code, requests.codes.ok)
47 # res = response.json()
48 # self.assertEqual(res['node'] [0] ['netconf-node-topology:connection-status'],
51 # def test_02_restconfAPI(self):
52 # url = ("{}/config/transportpce-portmapping:network/nodes/controller-config".format(self.restconf_baseurl))
53 # headers = {'content-type': 'application/json'}
54 # response = requests.request(
55 # "GET", url, headers=headers, auth=('admin', 'admin'))
56 # self.assertEqual(response.status_code, requests.codes.not_found)
57 # res = response.json()
59 # {"error-type":"application", "error-tag":"data-missing",
60 # "error-message":"Request could not be completed because the relevant data model content does not exist "},
61 # res['errors']['error'])
# PUT a topology-netconf node entry for ROADMA01 under the /config/ RESTCONF path
# (JSON body, basic auth admin/admin) and expect requests.codes.created.
63 def test_01_rdm_device_connected(self):
64 url = ("{}/config/network-topology:"
65 "network-topology/topology/topology-netconf/node/ROADMA01"
66 .format(self.restconf_baseurl))
# NOTE(review): the statement that opens this payload literal is not visible here;
# the payload is what json.dumps(data) serialises below - confirm the binding to `data`.
68 "node-id": "ROADMA01",
69 "netconf-node-topology:username": "admin",
70 "netconf-node-topology:password": "admin",
71 "netconf-node-topology:host": "127.0.0.1",
# The port is read from the test_utils.sims entry of the 'roadma' simulator.
72 "netconf-node-topology:port": test_utils.sims['roadma']['port'],
73 "netconf-node-topology:tcp-only": "false",
74 "netconf-node-topology:pass-through": {}}]}
75 headers = {'content-type': 'application/json'}
76 response = requests.request(
77 "PUT", url, data=json.dumps(data), headers=headers,
78 auth=('admin', 'admin'))
79 self.assertEqual(response.status_code, requests.codes.created)
# GET the ROADMA01 topology-netconf node from the /operational/ RESTCONF path and
# expect requests.codes.ok.
82 def test_02_rdm_device_connected(self):
83 url = ("{}/operational/network-topology:"
84 "network-topology/topology/topology-netconf/node/ROADMA01"
85 .format(self.restconf_baseurl))
86 headers = {'content-type': 'application/json'}
87 response = requests.request(
88 "GET", url, headers=headers, auth=('admin', 'admin'))
89 self.assertEqual(response.status_code, requests.codes.ok)
# NOTE(review): the assignment of `res` and the call that consumes the connection-status
# value of the first returned node are not visible here - confirm the expected status.
92 res['node'][0]['netconf-node-topology:connection-status'],
# GET the port-mapping node-info of ROADMA01 under /config/ and expect requests.codes.ok;
# the JSON body is parsed into `res`.
96 def test_03_rdm_portmapping_info(self):
97 url = ("{}/config/transportpce-portmapping:network/"
98 "nodes/ROADMA01/node-info"
99 .format(self.restconf_baseurl))
100 headers = {'content-type': 'application/json'}
101 response = requests.request(
102 "GET", url, headers=headers, auth=('admin', 'admin'))
103 self.assertEqual(response.status_code, requests.codes.ok)
104 res = response.json()
# NOTE(review): presumably the expected node-info document; the assertion call that
# consumes this literal is not visible here - confirm how it is compared with `res`.
106 {u'node-info': {u'node-type': u'rdm',
107 u'node-ip-address': u'127.0.0.12',
108 u'node-clli': u'NodeA',
109 u'openroadm-version': u'1.2.1', u'node-vendor': u'vendorA',
110 u'node-model': u'2'}},
# GET the DEG1-TTP-TXRX mapping of ROADMA01 under /config/ and expect requests.codes.ok;
# the JSON body is parsed into `res`.
114 def test_04_rdm_portmapping_DEG1_TTP_TXRX(self):
115 url = ("{}/config/transportpce-portmapping:network/"
116 "nodes/ROADMA01/mapping/DEG1-TTP-TXRX"
117 .format(self.restconf_baseurl))
118 headers = {'content-type': 'application/json'}
119 response = requests.request(
120 "GET", url, headers=headers, auth=('admin', 'admin'))
121 self.assertEqual(response.status_code, requests.codes.ok)
122 res = response.json()
# NOTE(review): presumably the expected mapping entry; the assertion call that consumes
# this literal is not visible here - confirm what part of `res` it is checked against.
124 {'supporting-port': 'L1', 'supporting-circuit-pack-name': '2/0',
125 'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional'},
# GET the SRG1-PP7-TXRX mapping of ROADMA01 under /config/ and expect requests.codes.ok;
# the JSON body is parsed into `res`.
128 def test_05_rdm_portmapping_SRG1_PP7_TXRX(self):
129 url = ("{}/config/transportpce-portmapping:network/"
130 "nodes/ROADMA01/mapping/SRG1-PP7-TXRX"
131 .format(self.restconf_baseurl))
132 headers = {'content-type': 'application/json'}
133 response = requests.request(
134 "GET", url, headers=headers, auth=('admin', 'admin'))
135 self.assertEqual(response.status_code, requests.codes.ok)
136 res = response.json()
# NOTE(review): presumably the expected mapping entry; the assertion call that consumes
# this literal is not visible here - confirm what part of `res` it is checked against.
138 {'supporting-port': 'C7', 'supporting-circuit-pack-name': '4/0',
139 'logical-connection-point': 'SRG1-PP7-TXRX', 'port-direction': 'bidirectional'},
# GET the SRG3-PP1-TXRX mapping of ROADMA01 under /config/ and expect requests.codes.ok;
# the JSON body is parsed into `res`.
142 def test_06_rdm_portmapping_SRG3_PP1_TXRX(self):
143 url = ("{}/config/transportpce-portmapping:network/"
144 "nodes/ROADMA01/mapping/SRG3-PP1-TXRX"
145 .format(self.restconf_baseurl))
146 headers = {'content-type': 'application/json'}
147 response = requests.request(
148 "GET", url, headers=headers, auth=('admin', 'admin'))
149 self.assertEqual(response.status_code, requests.codes.ok)
150 res = response.json()
# NOTE(review): presumably the expected mapping entry; the assertion call that consumes
# this literal is not visible here - confirm what part of `res` it is checked against.
152 {'supporting-port': 'C1', 'supporting-circuit-pack-name': '5/0',
153 'logical-connection-point': 'SRG3-PP1-TXRX', 'port-direction': 'bidirectional'},
# PUT a topology-netconf node entry for XPDRA01 under the /config/ RESTCONF path
# (JSON body, basic auth admin/admin) and expect requests.codes.created.
156 def test_07_xpdr_device_connected(self):
157 url = ("{}/config/network-topology:"
158 "network-topology/topology/topology-netconf/node/XPDRA01"
159 .format(self.restconf_baseurl))
# NOTE(review): the statement that opens this payload literal is not visible here;
# the payload is what json.dumps(data) serialises below - confirm the binding to `data`.
161 "node-id": "XPDRA01",
162 "netconf-node-topology:username": "admin",
163 "netconf-node-topology:password": "admin",
164 "netconf-node-topology:host": "127.0.0.1",
# The port is read from the test_utils.sims entry of the 'xpdra' simulator.
165 "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
166 "netconf-node-topology:tcp-only": "false",
167 "netconf-node-topology:pass-through": {}}]}
168 headers = {'content-type': 'application/json'}
169 response = requests.request(
170 "PUT", url, data=json.dumps(data), headers=headers,
171 auth=('admin', 'admin'))
172 self.assertEqual(response.status_code, requests.codes.created)
# GET the XPDRA01 topology-netconf node from the /operational/ RESTCONF path, expect
# requests.codes.ok and parse the JSON body into `res`.
175 def test_08_xpdr_device_connected(self):
176 url = ("{}/operational/network-topology:"
177 "network-topology/topology/topology-netconf/node/XPDRA01"
178 .format(self.restconf_baseurl))
179 headers = {'content-type': 'application/json'}
180 response = requests.request(
181 "GET", url, headers=headers, auth=('admin', 'admin'))
182 self.assertEqual(response.status_code, requests.codes.ok)
183 res = response.json()
# NOTE(review): the call that consumes the connection-status value of the first returned
# node is not visible here - confirm the expected status.
185 res['node'][0]['netconf-node-topology:connection-status'],
# GET the port-mapping node-info of XPDRA01 under /config/ and expect requests.codes.ok;
# the JSON body is parsed into `res`.
189 def test_09_xpdr_portmapping_info(self):
190 url = ("{}/config/transportpce-portmapping:network/"
191 "nodes/XPDRA01/node-info"
192 .format(self.restconf_baseurl))
193 headers = {'content-type': 'application/json'}
194 response = requests.request(
195 "GET", url, headers=headers, auth=('admin', 'admin'))
196 self.assertEqual(response.status_code, requests.codes.ok)
197 res = response.json()
# NOTE(review): presumably the expected node-info document; the assertion call that
# consumes this literal is not visible here - confirm how it is compared with `res`.
199 {u'node-info': {u'node-type': u'xpdr',
200 u'node-ip-address': u'127.0.0.10',
201 u'node-clli': u'NodeA',
202 u'openroadm-version': u'1.2.1', u'node-vendor': u'vendorA',
203 u'node-model': u'1'}},
# GET the XPDR1-NETWORK1 mapping of XPDRA01 under /config/ and expect requests.codes.ok;
# the JSON body is parsed into `res`.
207 def test_10_xpdr_portmapping_NETWORK1(self):
208 url = ("{}/config/transportpce-portmapping:network/"
209 "nodes/XPDRA01/mapping/XPDR1-NETWORK1"
210 .format(self.restconf_baseurl))
211 headers = {'content-type': 'application/json'}
212 response = requests.request(
213 "GET", url, headers=headers, auth=('admin', 'admin'))
214 self.assertEqual(response.status_code, requests.codes.ok)
215 res = response.json()
# NOTE(review): presumably the expected mapping entry; the assertion call that consumes
# this literal is not visible here - confirm what part of `res` it is checked against.
217 {'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
218 'logical-connection-point': 'XPDR1-NETWORK1', 'port-direction': 'bidirectional',
219 'connection-map-lcp': 'XPDR1-CLIENT1', 'port-qual': 'xpdr-network',
220 'lcp-hash-val': '3b3ab304d2a6eb3c3623e52746dbb7aa'},
# GET the XPDR1-NETWORK2 mapping of XPDRA01 under /config/ and expect requests.codes.ok;
# the JSON body is parsed into `res`.
223 def test_11_xpdr_portmapping_NETWORK2(self):
224 url = ("{}/config/transportpce-portmapping:network/"
225 "nodes/XPDRA01/mapping/XPDR1-NETWORK2"
226 .format(self.restconf_baseurl))
227 headers = {'content-type': 'application/json'}
228 response = requests.request(
229 "GET", url, headers=headers, auth=('admin', 'admin'))
230 self.assertEqual(response.status_code, requests.codes.ok)
231 res = response.json()
# NOTE(review): presumably the expected mapping entry; the assertion call that consumes
# this literal is not visible here - confirm what part of `res` it is checked against.
233 {'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/2-PLUG-NET',
234 'logical-connection-point': 'XPDR1-NETWORK2', 'port-direction': 'bidirectional',
235 'connection-map-lcp': 'XPDR1-CLIENT3', 'port-qual': 'xpdr-network',
236 'lcp-hash-val': '3b3ab304d2a6eb3c3623e52746dbb7a9'},
# GET the XPDR1-CLIENT1 mapping of XPDRA01 under /config/ and expect requests.codes.ok;
# the JSON body is parsed into `res`.
239 def test_12_xpdr_portmapping_CLIENT1(self):
240 url = ("{}/config/transportpce-portmapping:network/"
241 "nodes/XPDRA01/mapping/XPDR1-CLIENT1"
242 .format(self.restconf_baseurl))
243 headers = {'content-type': 'application/json'}
244 response = requests.request(
245 "GET", url, headers=headers, auth=('admin', 'admin'))
246 self.assertEqual(response.status_code, requests.codes.ok)
247 res = response.json()
# NOTE(review): presumably the expected mapping entry; the assertion call that consumes
# this literal is not visible here - confirm what part of `res` it is checked against.
249 {'supporting-port': 'C1',
250 'supporting-circuit-pack-name': '1/0/C1-PLUG-CLIENT',
251 'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
252 'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
253 'lcp-hash-val': '64b8effe7ba72211420bf267d0ca1ae5'},
# GET the XPDR1-CLIENT2 mapping of XPDRA01 under /config/ and expect requests.codes.ok;
# the JSON body is parsed into `res`.
256 def test_13_xpdr_portmapping_CLIENT2(self):
257 url = ("{}/config/transportpce-portmapping:network/"
258 "nodes/XPDRA01/mapping/XPDR1-CLIENT2"
259 .format(self.restconf_baseurl))
260 headers = {'content-type': 'application/json'}
261 response = requests.request(
262 "GET", url, headers=headers, auth=('admin', 'admin'))
263 self.assertEqual(response.status_code, requests.codes.ok)
264 res = response.json()
# NOTE(review): presumably the expected mapping entry (it carries no 'connection-map-lcp'
# key); the assertion call that consumes this literal is not visible here - confirm.
266 {'supporting-port': 'C2',
267 'supporting-circuit-pack-name': '1/0/C2-PLUG-CLIENT',
268 'logical-connection-point': 'XPDR1-CLIENT2', 'port-direction': 'bidirectional',
269 'port-qual': 'xpdr-client',
270 'lcp-hash-val': '64b8effe7ba72211420bf267d0ca1ae6'},
# GET the XPDR1-CLIENT3 mapping of XPDRA01 under /config/ and expect requests.codes.ok;
# the JSON body is parsed into `res`.
273 def test_14_xpdr_portmapping_CLIENT3(self):
274 url = ("{}/config/transportpce-portmapping:network/"
275 "nodes/XPDRA01/mapping/XPDR1-CLIENT3"
276 .format(self.restconf_baseurl))
277 headers = {'content-type': 'application/json'}
278 response = requests.request(
279 "GET", url, headers=headers, auth=('admin', 'admin'))
280 self.assertEqual(response.status_code, requests.codes.ok)
281 res = response.json()
# NOTE(review): presumably the expected mapping entry; the assertion call that consumes
# this literal is not visible here - confirm what part of `res` it is checked against.
283 {'supporting-port': 'C3',
284 'supporting-circuit-pack-name': '1/0/C3-PLUG-CLIENT',
285 'logical-connection-point': 'XPDR1-CLIENT3',
286 'connection-map-lcp': 'XPDR1-NETWORK2', 'port-direction': 'bidirectional',
287 'port-qual': 'xpdr-client', 'lcp-hash-val': '64b8effe7ba72211420bf267d0ca1ae7'},
# GET the XPDR1-CLIENT4 mapping of XPDRA01 under /config/ and expect requests.codes.ok;
# the JSON body is parsed into `res`.
290 def test_15_xpdr_portmapping_CLIENT4(self):
291 url = ("{}/config/transportpce-portmapping:network/"
292 "nodes/XPDRA01/mapping/XPDR1-CLIENT4"
293 .format(self.restconf_baseurl))
294 headers = {'content-type': 'application/json'}
295 response = requests.request(
296 "GET", url, headers=headers, auth=('admin', 'admin'))
297 self.assertEqual(response.status_code, requests.codes.ok)
298 res = response.json()
# NOTE(review): presumably the expected mapping entry (it carries no 'connection-map-lcp'
# key); the assertion call that consumes this literal is not visible here - confirm.
300 {'supporting-port': 'C4',
301 'supporting-circuit-pack-name': '1/0/C4-PLUG-CLIENT',
302 'logical-connection-point': 'XPDR1-CLIENT4', 'port-direction': 'bidirectional',
303 'port-qual': 'xpdr-client', 'lcp-hash-val': '64b8effe7ba72211420bf267d0ca1ae0'},
def test_16_xpdr_device_disconnected(self):
    """Delete the XPDRA01 topology-netconf node under ``/config/``.

    Sends an HTTP DELETE with JSON content type and ``admin``/``admin``
    basic auth, then asserts the reply status is ``requests.codes.ok``.
    """
    node_url = (
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/XPDRA01"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "DELETE",
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
# GET the XPDRA01 topology-netconf node from the /operational/ RESTCONF path and expect
# requests.codes.not_found; the JSON error body is parsed into `res`.
317 def test_17_xpdr_device_disconnected(self):
318 url = ("{}/operational/network-topology:network-topology/topology/"
319 "topology-netconf/node/XPDRA01".format(self.restconf_baseurl))
320 headers = {'content-type': 'application/json'}
321 response = requests.request(
322 "GET", url, headers=headers, auth=('admin', 'admin'))
323 self.assertEqual(response.status_code, requests.codes.not_found)
324 res = response.json()
# The "data-missing" error entry below is checked against res['errors']['error'].
# NOTE(review): the opener of that assertion call is not visible here - confirm which
# assertion (membership vs. equality) is used.
326 {"error-type": "application", "error-tag": "data-missing",
327 "error-message": "Request could not be completed because the relevant data model content does not exist"},
328 res['errors']['error'])
# GET the XPDRA01 port-mapping node under /config/ and expect requests.codes.not_found;
# the JSON error body is parsed into `res`.
330 def test_18_xpdr_device_disconnected(self):
331 url = ("{}/config/transportpce-portmapping:network/nodes/XPDRA01".format(self.restconf_baseurl))
332 headers = {'content-type': 'application/json'}
333 response = requests.request(
334 "GET", url, headers=headers, auth=('admin', 'admin'))
335 self.assertEqual(response.status_code, requests.codes.not_found)
336 res = response.json()
# The "data-missing" error entry below is checked against res['errors']['error'].
# NOTE(review): the opener of that assertion call is not visible here - confirm which
# assertion (membership vs. equality) is used.
338 {"error-type": "application", "error-tag": "data-missing",
339 "error-message": "Request could not be completed because the relevant data model content does not exist"},
340 res['errors']['error'])
def test_19_rdm_device_disconnected(self):
    """Delete the ROADMA01 topology-netconf node under ``/config/``.

    Sends an HTTP DELETE with JSON content type and ``admin``/``admin``
    basic auth, then asserts the reply status is ``requests.codes.ok``.
    """
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMA01"
    )
    node_url = url_template.format(self.restconf_baseurl)
    reply = requests.request(
        "DELETE",
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
# GET the ROADMA01 topology-netconf node from the /operational/ RESTCONF path and expect
# requests.codes.not_found; the JSON error body is parsed into `res`.
352 def test_20_rdm_device_disconnected(self):
353 url = ("{}/operational/network-topology:network-topology/topology/topology-netconf/node/ROADMA01"
354 .format(self.restconf_baseurl))
355 headers = {'content-type': 'application/json'}
356 response = requests.request(
357 "GET", url, headers=headers, auth=('admin', 'admin'))
358 self.assertEqual(response.status_code, requests.codes.not_found)
359 res = response.json()
# The "data-missing" error entry below is checked against res['errors']['error'].
# NOTE(review): the opener of that assertion call is not visible here - confirm which
# assertion (membership vs. equality) is used.
361 {"error-type": "application", "error-tag": "data-missing",
362 "error-message": "Request could not be completed because the relevant data model content does not exist"},
363 res['errors']['error'])
# GET the ROADMA01 port-mapping node under /config/ and expect requests.codes.not_found;
# the JSON error body is parsed into `res`.
365 def test_21_rdm_device_disconnected(self):
366 url = ("{}/config/transportpce-portmapping:network/nodes/ROADMA01".format(self.restconf_baseurl))
367 headers = {'content-type': 'application/json'}
368 response = requests.request(
369 "GET", url, headers=headers, auth=('admin', 'admin'))
370 self.assertEqual(response.status_code, requests.codes.not_found)
371 res = response.json()
# The "data-missing" error entry below is checked against res['errors']['error'].
# NOTE(review): the opener of that assertion call is not visible here - confirm which
# assertion (membership vs. equality) is used.
373 {"error-type": "application", "error-tag": "data-missing",
374 "error-message": "Request could not be completed because the relevant data model content does not exist"},
375 res['errors']['error'])
if __name__ == "__main__":
    # Executed as a script: run this module's tests with verbosity level 2.
    unittest.main(verbosity=2)