3 ##############################################################################
4 #Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
23 class TransportPCEtesting(unittest.TestCase):
25 testtools_process = None
27 restconf_baseurl = "http://127.0.0.1:8181/restconf"
30 def __start_testtools(cls):
31 executable = ("./netconf/netconf/tools/netconf-testtool/target/"
32 "netconf-testtool-1.3.1-executable.jar")
33 if os.path.isfile(executable):
34 with open('testtools.log', 'w') as outfile:
35 cls.testtools_process = subprocess.Popen(
36 ["java", "-jar", executable, "--schemas-dir", "schemas",
37 "--initial-config-xml", "sample-config-ROADM.xml"],
42 executable = "../karaf/target/assembly/bin/karaf"
43 with open('odl.log', 'w') as outfile:
44 cls.odl_process = subprocess.Popen(
45 ["bash", executable], stdout=outfile,
46 stdin=open(os.devnull))
50 cls.__start_testtools()
55 def tearDownClass(cls):
56 cls.testtools_process.send_signal(signal.SIGINT)
57 cls.testtools_process.wait()
58 for child in psutil.Process(cls.odl_process.pid).children():
59 child.send_signal(signal.SIGINT)
61 cls.odl_process.send_signal(signal.SIGINT)
62 cls.odl_process.wait()
67 def test_01_connect_device(self):
68 url = ("{}/config/network-topology:"
69 "network-topology/topology/topology-netconf/node/ROADMA"
70 .format(self.restconf_baseurl))
73 "netconf-node-topology:username": "admin",
74 "netconf-node-topology:password": "admin",
75 "netconf-node-topology:host": "127.0.0.1",
76 "netconf-node-topology:port": "17830",
77 "netconf-node-topology:tcp-only": "false",
78 "netconf-node-topology:pass-through": {}}]}
79 headers = {'content-type': 'application/json'}
80 response = requests.request(
81 "PUT", url, data=json.dumps(data), headers=headers,
82 auth=('admin', 'admin'))
83 self.assertEqual(response.status_code, requests.codes.created)
86 def test_02_device_connected(self):
87 url = ("{}/operational/network-topology:"
88 "network-topology/topology/topology-netconf/node/ROADMA"
89 .format(self.restconf_baseurl))
90 headers = {'content-type': 'application/json'}
91 response = requests.request(
92 "GET", url, headers=headers, auth=('admin', 'admin'))
93 self.assertEqual(response.status_code, requests.codes.ok)
96 res['node'][0]['netconf-node-topology:connection-status'],
100 def test_03_portmapping_SRG1_PP3_TXRX(self):
101 url = ("{}/config/portmapping:network/"
102 "nodes/ROADMA/mapping/SRG1-PP3-TXRX"
103 .format(self.restconf_baseurl))
104 headers = {'content-type': 'application/json'}
105 response = requests.request(
106 "GET", url, headers=headers, auth=('admin', 'admin'))
107 self.assertEqual(response.status_code, requests.codes.ok)
108 res = response.json()
110 {'supporting-port': 'C3', 'supporting-circuit-pack-name': '4/0',
111 'logical-connection-point': 'SRG1-PP3-TXRX'},
114 def test_04_portmapping_SRG1_PP6_TXRX(self):
115 url = ("{}/config/portmapping:network/"
116 "nodes/ROADMA/mapping/SRG1-PP6-TXRX"
117 .format(self.restconf_baseurl))
118 headers = {'content-type': 'application/json'}
119 response = requests.request(
120 "GET", url, headers=headers, auth=('admin', 'admin'))
121 self.assertEqual(response.status_code, requests.codes.ok)
122 res = response.json()
124 {'supporting-port': 'C6', 'supporting-circuit-pack-name': '4/0',
125 'logical-connection-point': 'SRG1-PP6-TXRX'},
128 def test_05_portmapping_DEG1_TTP_TXRX(self):
129 url = ("{}/config/portmapping:network/"
130 "nodes/ROADMA/mapping/DEG1-TTP-TXRX"
131 .format(self.restconf_baseurl))
132 headers = {'content-type': 'application/json'}
133 response = requests.request(
134 "GET", url, headers=headers, auth=('admin', 'admin'))
135 self.assertEqual(response.status_code, requests.codes.ok)
136 res = response.json()
138 {'supporting-port': 'L1', 'supporting-circuit-pack-name': '2/0',
139 'logical-connection-point': 'DEG1-TTP-TXRX'},
142 def test_06_portmapping_SRG1_PP9_TXRX(self):
143 url = ("{}/config/portmapping:network/"
144 "nodes/ROADMA/mapping/SRG1-PP9-TXRX"
145 .format(self.restconf_baseurl))
146 headers = {'content-type': 'application/json'}
147 response = requests.request(
148 "GET", url, headers=headers, auth=('admin', 'admin'))
149 self.assertEqual(response.status_code, requests.codes.ok)
150 res = response.json()
152 {'supporting-port': 'C9', 'supporting-circuit-pack-name': '4/0',
153 'logical-connection-point': 'SRG1-PP9-TXRX'},
156 def test_07_portmapping_SRG1_PP16_TXRX(self):
157 url = ("{}/config/portmapping:network/"
158 "nodes/ROADMA/mapping/SRG1-PP16-TXRX"
159 .format(self.restconf_baseurl))
160 headers = {'content-type': 'application/json'}
161 response = requests.request(
162 "GET", url, headers=headers, auth=('admin', 'admin'))
163 self.assertEqual(response.status_code, requests.codes.ok)
164 res = response.json()
166 {'supporting-port': 'C16', 'supporting-circuit-pack-name': '4/0',
167 'logical-connection-point': 'SRG1-PP16-TXRX'},
170 def test_08_portmapping_SRG1_PP4_TXRX(self):
171 url = ("{}/config/portmapping:network/"
172 "nodes/ROADMA/mapping/SRG1-PP4-TXRX"
173 .format(self.restconf_baseurl))
174 headers = {'content-type': 'application/json'}
175 response = requests.request(
176 "GET", url, headers=headers, auth=('admin', 'admin'))
177 self.assertEqual(response.status_code, requests.codes.ok)
178 res = response.json()
180 {'supporting-port': 'C4', 'supporting-circuit-pack-name': '4/0',
181 'logical-connection-point': 'SRG1-PP4-TXRX'},
184 def test_09_portmapping_SRG1_PP2_TXRX(self):
185 url = ("{}/config/portmapping:network/"
186 "nodes/ROADMA/mapping/SRG1-PP2-TXRX"
187 .format(self.restconf_baseurl))
188 headers = {'content-type': 'application/json'}
189 response = requests.request(
190 "GET", url, headers=headers, auth=('admin', 'admin'))
191 self.assertEqual(response.status_code, requests.codes.ok)
192 res = response.json()
194 {'supporting-port': 'C2', 'supporting-circuit-pack-name': '4/0',
195 'logical-connection-point': 'SRG1-PP2-TXRX'},
198 def test_10_portmapping_SRG1_PP14_TXRX(self):
199 url = ("{}/config/portmapping:network/"
200 "nodes/ROADMA/mapping/SRG1-PP14-TXRX"
201 .format(self.restconf_baseurl))
202 headers = {'content-type': 'application/json'}
203 response = requests.request(
204 "GET", url, headers=headers, auth=('admin', 'admin'))
205 self.assertEqual(response.status_code, requests.codes.ok)
206 res = response.json()
208 {'supporting-port': 'C14', 'supporting-circuit-pack-name': '4/0',
209 'logical-connection-point': 'SRG1-PP14-TXRX'},
212 def test_11_portmapping_SRG1_PP11_TXRX(self):
213 url = ("{}/config/portmapping:network/"
214 "nodes/ROADMA/mapping/SRG1-PP11-TXRX"
215 .format(self.restconf_baseurl))
216 headers = {'content-type': 'application/json'}
217 response = requests.request(
218 "GET", url, headers=headers, auth=('admin', 'admin'))
219 self.assertEqual(response.status_code, requests.codes.ok)
220 res = response.json()
222 {'supporting-port': 'C11', 'supporting-circuit-pack-name': '4/0',
223 'logical-connection-point': 'SRG1-PP11-TXRX'},
226 def test_12_portmapping_SRG1_PP7_TXRX(self):
227 url = ("{}/config/portmapping:network/"
228 "nodes/ROADMA/mapping/SRG1-PP7-TXRX"
229 .format(self.restconf_baseurl))
230 headers = {'content-type': 'application/json'}
231 response = requests.request(
232 "GET", url, headers=headers, auth=('admin', 'admin'))
233 self.assertEqual(response.status_code, requests.codes.ok)
234 res = response.json()
236 {'supporting-port': 'C7', 'supporting-circuit-pack-name': '4/0',
237 'logical-connection-point': 'SRG1-PP7-TXRX'},
240 def test_13_portmapping_DEG2_TTP_TXRX(self):
241 url = ("{}/config/portmapping:network/"
242 "nodes/ROADMA/mapping/DEG2-TTP-TXRX"
243 .format(self.restconf_baseurl))
244 headers = {'content-type': 'application/json'}
245 response = requests.request(
246 "GET", url, headers=headers, auth=('admin', 'admin'))
247 self.assertEqual(response.status_code, requests.codes.ok)
248 res = response.json()
250 {'supporting-port': 'L1', 'supporting-circuit-pack-name': '3/0',
251 'logical-connection-point': 'DEG2-TTP-TXRX'},
254 def test_14_portmapping_DEG2_TTP_TXRX(self):
255 url = ("{}/config/portmapping:network/"
256 "nodes/ROADMA/mapping/DEG2-TTP-TXRX"
257 .format(self.restconf_baseurl))
258 headers = {'content-type': 'application/json'}
259 response = requests.request(
260 "GET", url, headers=headers, auth=('admin', 'admin'))
261 self.assertEqual(response.status_code, requests.codes.ok)
262 res = response.json()
264 {'supporting-port': 'L1', 'supporting-circuit-pack-name': '3/0',
265 'logical-connection-point': 'DEG2-TTP-TXRX'},
268 def test_15_portmapping_SRG1_PP12_TXRX(self):
269 url = ("{}/config/portmapping:network/"
270 "nodes/ROADMA/mapping/SRG1-PP12-TXRX"
271 .format(self.restconf_baseurl))
272 headers = {'content-type': 'application/json'}
273 response = requests.request(
274 "GET", url, headers=headers, auth=('admin', 'admin'))
275 self.assertEqual(response.status_code, requests.codes.ok)
276 res = response.json()
278 {'supporting-port': 'C12', 'supporting-circuit-pack-name': '4/0',
279 'logical-connection-point': 'SRG1-PP12-TXRX'},
282 def test_16_portmapping_SRG1_PP8_TXRX(self):
283 url = ("{}/config/portmapping:network/"
284 "nodes/ROADMA/mapping/SRG1-PP8-TXRX"
285 .format(self.restconf_baseurl))
286 headers = {'content-type': 'application/json'}
287 response = requests.request(
288 "GET", url, headers=headers, auth=('admin', 'admin'))
289 self.assertEqual(response.status_code, requests.codes.ok)
290 res = response.json()
292 {'supporting-port': 'C8', 'supporting-circuit-pack-name': '4/0',
293 'logical-connection-point': 'SRG1-PP8-TXRX'},
296 def test_17_portmapping_SRG1_PP5_TXRX(self):
297 url = ("{}/config/portmapping:network/"
298 "nodes/ROADMA/mapping/SRG1-PP5-TXRX"
299 .format(self.restconf_baseurl))
300 headers = {'content-type': 'application/json'}
301 response = requests.request(
302 "GET", url, headers=headers, auth=('admin', 'admin'))
303 self.assertEqual(response.status_code, requests.codes.ok)
304 res = response.json()
306 {'supporting-port': 'C5', 'supporting-circuit-pack-name': '4/0',
307 'logical-connection-point': 'SRG1-PP5-TXRX'},
310 def test_18_portmapping_SRG1_PP13_TXRX(self):
311 url = ("{}/config/portmapping:network/"
312 "nodes/ROADMA/mapping/SRG1-PP13-TXRX"
313 .format(self.restconf_baseurl))
314 headers = {'content-type': 'application/json'}
315 response = requests.request(
316 "GET", url, headers=headers, auth=('admin', 'admin'))
317 self.assertEqual(response.status_code, requests.codes.ok)
318 res = response.json()
320 {'supporting-port': 'C13', 'supporting-circuit-pack-name': '4/0',
321 'logical-connection-point': 'SRG1-PP13-TXRX'},
324 def test_19_portmapping_SRG1_PP15_TXRX(self):
325 url = ("{}/config/portmapping:network/"
326 "nodes/ROADMA/mapping/SRG1-PP15-TXRX"
327 .format(self.restconf_baseurl))
328 headers = {'content-type': 'application/json'}
329 response = requests.request(
330 "GET", url, headers=headers, auth=('admin', 'admin'))
331 self.assertEqual(response.status_code, requests.codes.ok)
332 res = response.json()
334 {'supporting-port': 'C15', 'supporting-circuit-pack-name': '4/0',
335 'logical-connection-point': 'SRG1-PP15-TXRX'},
338 def test_20_portmapping_SRG1_PP10_TXRX(self):
339 url = ("{}/config/portmapping:network/"
340 "nodes/ROADMA/mapping/SRG1-PP10-TXRX"
341 .format(self.restconf_baseurl))
342 headers = {'content-type': 'application/json'}
343 response = requests.request(
344 "GET", url, headers=headers, auth=('admin', 'admin'))
345 self.assertEqual(response.status_code, requests.codes.ok)
346 res = response.json()
348 {'supporting-port': 'C10', 'supporting-circuit-pack-name': '4/0',
349 'logical-connection-point': 'SRG1-PP10-TXRX'},
352 def test_21_cross_connection_DEG1_TTP_TXRX_SRG1_PP3_TXRX(self):
353 url = "{}/operations/renderer:service-path".format(self.restconf_baseurl)
354 data = {"renderer:input": {
355 "renderer:service-name": "service_32",
356 "renderer:wave-number": "32",
357 "renderer:operation": "create",
359 {"renderer:node-id": "ROADMA",
360 "renderer:src-tp": "DEG1-TTP-TXRX",
361 "renderer:dest-tp": "SRG1-PP3-TXRX"}]}}
362 headers = {'content-type': 'application/json'}
363 response = requests.request(
364 "POST", url, data=json.dumps(data),
365 headers=headers, auth=('admin', 'admin'))
366 self.assertEqual(response.status_code, requests.codes.ok)
367 self.assertEqual(response.json(), {
370 'Roadm-connection successfully created for nodes [ROADMA]'}})
372 def test_22_cross_connection_SRG1_PP3_TXRX_DEG1_TTP_TXRX(self):
373 url = "{}/operations/renderer:service-path".format(self.restconf_baseurl)
374 data = {"renderer:input": {
375 "renderer:service-name": "service_32",
376 "renderer:wave-number": "32",
377 "renderer:operation": "create",
379 {"renderer:node-id": "ROADMA",
380 "renderer:src-tp": "SRG1-PP3-TXRX",
381 "renderer:dest-tp": "DEG1-TTP-TXRX"}]}}
382 headers = {'content-type': 'application/json'}
383 response = requests.request(
384 "POST", url, data=json.dumps(data),
385 headers=headers, auth=('admin', 'admin'))
386 self.assertEqual(response.status_code, requests.codes.ok)
387 self.assertEqual(response.json(), {
390 'Roadm-connection successfully created for nodes [ROADMA]'}})
392 def test_23_delete_DEG1_TTP_TXRX_SRG1_PP3_TXRX(self):
393 url = "{}/operations/renderer:service-path".format(self.restconf_baseurl)
394 data = {"renderer:input": {
395 "renderer:service-name": "service_32",
396 "renderer:wave-number": "32",
397 "renderer:operation": "delete",
399 {"renderer:node-id": "ROADMA",
400 "renderer:src-tp": "DEG1-TTP-TXRX",
401 "renderer:dest-tp": "SRG1-PP3-TXRX"}]}}
402 headers = {'content-type': 'application/json'}
403 response = requests.request(
404 "POST", url, data=json.dumps(data),
405 headers=headers, auth=('admin', 'admin'))
406 self.assertEqual(response.status_code, requests.codes.ok)
407 self.assertEqual(response.json(), {
408 'output': {'result': 'Request processed'}})
410 def test_24_delete_SRG1_PP3_TXRX_DEG1_TTP_TXRX(self):
411 url = "{}/operations/renderer:service-path".format(self.restconf_baseurl)
412 data = {"renderer:input": {
413 "renderer:service-name": "service_32",
414 "renderer:wave-number": "32",
415 "renderer:operation": "delete",
417 {"renderer:node-id": "ROADMA",
418 "renderer:src-tp": "SRG1-PP3-TXRX",
419 "renderer:dest-tp": "DEG1-TTP-TXRX"}]}}
420 headers = {'content-type': 'application/json'}
421 response = requests.request(
422 "POST", url, data=json.dumps(data),
423 headers=headers, auth=('admin', 'admin'))
424 self.assertEqual(response.status_code, requests.codes.ok)
425 self.assertEqual(response.json(), {
426 'output': {'result': 'Request processed'}})
429 if __name__ == "__main__":
430 unittest.main(verbosity=2)