##############################################################################
# Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
class TransportPCEtesting(unittest.TestCase):
    """Functional test of ROADMA port mapping and renderer service paths.

    The class launches a netconf test tool and a Karaf instance, mounts the
    node ``ROADMA`` through the netconf topology and then checks, over
    RESTCONF, the port mappings and the renderer ``service-path`` RPC.

    The tests are numbered because each one relies on the state left by the
    previous ones; unittest runs test methods in name order.
    """

    testtools_process = None
    odl_process = None
    restconf_baseurl = "http://127.0.0.1:8181/restconf"

    # Upper bounds (seconds) for the readiness waits below.  The values are a
    # judgement call, not a measured requirement -- tune for the target host.
    startup_timeout = 180
    connect_timeout = 60

    # ------------------------------------------------------------------
    # Process management
    # ------------------------------------------------------------------

    @classmethod
    def __start_testtools(cls):
        """Launch the netconf test tool if its executable jar is present.

        The tool's standard output is written to ``testtools.log``.  When the
        jar does not exist nothing is started and ``testtools_process`` stays
        ``None``.
        """
        executable = ("./netconf/netconf/tools/netconf-testtool/target/"
                      "netconf-testtool-1.3.0-SNAPSHOT-executable.jar")
        if not os.path.isfile(executable):
            return
        with open('testtools.log', 'w') as outfile:
            cls.testtools_process = subprocess.Popen(
                ["java", "-jar", executable, "--schemas-dir", "schemas",
                 "--initial-config-xml", "sample-config-ROADM.xml"],
                stdout=outfile)

    # NOTE(review): the header of this method was not recoverable from the
    # damaged source; the name mirrors __start_testtools -- confirm.
    @classmethod
    def __start_odl(cls):
        """Launch the Karaf script through bash, logging to ``odl.log``.

        Standard input is redirected from the null device.  Both files are
        closed in this process as soon as the child has been spawned; the
        child keeps its own inherited descriptors.
        """
        executable = "../karaf/target/assembly/bin/karaf"
        with open('odl.log', 'w') as outfile, open(os.devnull) as devnull:
            cls.odl_process = subprocess.Popen(
                ["bash", executable], stdout=outfile, stdin=devnull)

    @staticmethod
    def _wait_until(probe, timeout, interval=2):
        """Poll *probe* until it returns a true value or *timeout* elapses.

        Args:
            probe: zero-argument callable; a ``requests`` exception raised by
                it is treated as "not ready yet".
            timeout: maximum number of seconds to keep polling.
            interval: pause in seconds between two attempts.

        Returns:
            ``True`` if the probe succeeded in time, ``False`` otherwise.
        """
        import time

        deadline = time.time() + timeout
        while True:
            try:
                if probe():
                    return True
            except requests.exceptions.RequestException:
                # Deliberate: a refused or timed-out connection only means
                # the server is not up yet, so try again.
                pass
            if time.time() >= deadline:
                return False
            time.sleep(interval)

    # NOTE(review): the header of this method was not recoverable from the
    # damaged source; setUpClass is assumed as the tearDownClass counterpart.
    @classmethod
    def setUpClass(cls):
        """Start both processes and wait until RESTCONF answers.

        ``Popen`` returns immediately, so without this wait the first test
        would race the controller start-up.

        Raises:
            RuntimeError: if RESTCONF does not answer within
                ``startup_timeout`` seconds; the processes are stopped first.
        """
        cls.__start_testtools()
        cls.__start_odl()

        def restconf_answers():
            # Any HTTP reply, whatever its status, shows the server is up.
            requests.request("GET", cls.restconf_baseurl,
                             auth=('admin', 'admin'), timeout=5)
            return True

        if not cls._wait_until(restconf_answers, cls.startup_timeout):
            # unittest skips tearDownClass when setUpClass fails, so clean up
            # here to avoid leaving the processes running.
            cls.tearDownClass()
            raise RuntimeError(
                "RESTCONF at {} did not answer within {} s".format(
                    cls.restconf_baseurl, cls.startup_timeout))

    @classmethod
    def tearDownClass(cls):
        """Stop the test tool and the controller with SIGINT.

        Either process may be ``None`` (the test tool is only started when
        its jar exists), in which case it is skipped.
        """
        if cls.testtools_process is not None:
            cls.testtools_process.send_signal(signal.SIGINT)
            cls.testtools_process.wait()

        if cls.odl_process is None:
            return
        try:
            children = psutil.Process(cls.odl_process.pid).children()
        except psutil.NoSuchProcess:
            children = []
        for child in children:
            try:
                child.send_signal(signal.SIGINT)
                # Wait for each child to exit before signalling the launcher.
                child.wait()
            except psutil.NoSuchProcess:
                # The child exited between the listing and the signal.
                pass
        cls.odl_process.send_signal(signal.SIGINT)
        cls.odl_process.wait()

    # ------------------------------------------------------------------
    # RESTCONF helpers
    # ------------------------------------------------------------------

    def _request(self, method, path, data=None):
        """Send a JSON RESTCONF request with the admin credentials.

        Args:
            method: HTTP verb, e.g. ``"GET"``.
            path: resource path relative to ``restconf_baseurl``.
            data: optional object serialised to JSON as the request body.

        Returns:
            The ``requests`` response object.
        """
        url = "{}/{}".format(self.restconf_baseurl, path)
        body = None if data is None else json.dumps(data)
        return requests.request(
            method, url, data=body,
            headers={'content-type': 'application/json'},
            auth=('admin', 'admin'))

    def _assert_mapping(self, logical_point, port, circuit_pack):
        """Check the ROADMA port mapping of one logical connection point.

        Args:
            logical_point: logical connection point name, also the last
                segment of the requested URL.
            port: expected ``supporting-port`` value.
            circuit_pack: expected ``supporting-circuit-pack-name`` value.
        """
        response = self._request(
            "GET",
            "config/portmapping:network/nodes/ROADMA/mapping/" + logical_point)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        # NOTE(review): assumes the reply wraps the entry in a 'mapping'
        # list -- confirm against the portmapping model.
        self.assertIn(
            {'supporting-port': port,
             'supporting-circuit-pack-name': circuit_pack,
             'logical-connection-point': logical_point},
            res['mapping'])

    def _service_path(self, operation, src_tp, dest_tp, expected_result):
        """Invoke the renderer ``service-path`` RPC on ROADMA.

        Args:
            operation: value sent as ``renderer:operation``.
            src_tp: source termination point.
            dest_tp: destination termination point.
            expected_result: expected ``output/result`` string of the reply.
        """
        # NOTE(review): the list key is assumed to be "renderer:nodes" --
        # confirm against the renderer model.
        data = {"renderer:input": {
            "renderer:service-name": "service_32",
            "renderer:wave-number": "32",
            "renderer:operation": operation,
            "renderer:nodes": [
                {"renderer:node-id": "ROADMA",
                 "renderer:src-tp": src_tp,
                 "renderer:dest-tp": dest_tp}]}}
        response = self._request(
            "POST", "operations/renderer:service-path", data)
        self.assertEqual(response.status_code, requests.codes.ok)
        self.assertEqual(response.json(),
                         {'output': {'result': expected_result}})

    # ------------------------------------------------------------------
    # Device connection
    # ------------------------------------------------------------------

    def test_01_connect_device(self):
        """Create the ROADMA node in the netconf topology (expects 201)."""
        # NOTE(review): the "node" wrapper matches the reply read in test_02;
        # the "node-id" entry is assumed -- confirm.
        data = {"node": [{
            "node-id": "ROADMA",
            "netconf-node-topology:username": "admin",
            "netconf-node-topology:password": "admin",
            "netconf-node-topology:host": "127.0.0.1",
            "netconf-node-topology:port": "17830",
            "netconf-node-topology:tcp-only": "false",
            "netconf-node-topology:pass-through": {}}]}
        response = self._request(
            "PUT",
            "config/network-topology:"
            "network-topology/topology/topology-netconf/node/ROADMA",
            data)
        self.assertEqual(response.status_code, requests.codes.created)

    def test_02_device_connected(self):
        """Check that ROADMA is reported in the operational datastore.

        The connection is established asynchronously after test_01, so the
        status is polled for up to ``connect_timeout`` seconds before the
        assertions run.
        """
        path = ("operational/network-topology:"
                "network-topology/topology/topology-netconf/node/ROADMA")
        status_key = 'netconf-node-topology:connection-status'

        def connected():
            reply = self._request("GET", path)
            if reply.status_code != requests.codes.ok:
                return False
            return reply.json()['node'][0].get(status_key) == 'connected'

        self._wait_until(connected, self.connect_timeout)

        response = self._request("GET", path)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        # NOTE(review): expected value 'connected' is assumed from the test
        # name -- confirm.
        self.assertEqual(res['node'][0][status_key], 'connected')

    # ------------------------------------------------------------------
    # Port mapping
    # ------------------------------------------------------------------

    def test_03_portmapping_SRG1_PP3_TXRX(self):
        self._assert_mapping('SRG1-PP3-TXRX', 'C3', '4/0')

    def test_04_portmapping_SRG1_PP6_TXRX(self):
        self._assert_mapping('SRG1-PP6-TXRX', 'C6', '4/0')

    def test_05_portmapping_DEG1_TTP_TXRX(self):
        self._assert_mapping('DEG1-TTP-TXRX', 'L1', '2/0')

    def test_06_portmapping_SRG1_PP9_TXRX(self):
        self._assert_mapping('SRG1-PP9-TXRX', 'C9', '4/0')

    def test_07_portmapping_SRG1_PP16_TXRX(self):
        self._assert_mapping('SRG1-PP16-TXRX', 'C16', '4/0')

    def test_08_portmapping_SRG1_PP4_TXRX(self):
        self._assert_mapping('SRG1-PP4-TXRX', 'C4', '4/0')

    def test_09_portmapping_SRG1_PP2_TXRX(self):
        self._assert_mapping('SRG1-PP2-TXRX', 'C2', '4/0')

    def test_10_portmapping_SRG1_PP14_TXRX(self):
        self._assert_mapping('SRG1-PP14-TXRX', 'C14', '4/0')

    def test_11_portmapping_SRG1_PP11_TXRX(self):
        self._assert_mapping('SRG1-PP11-TXRX', 'C11', '4/0')

    def test_12_portmapping_SRG1_PP7_TXRX(self):
        self._assert_mapping('SRG1-PP7-TXRX', 'C7', '4/0')

    def test_13_portmapping_DEG2_TTP_TXRX(self):
        self._assert_mapping('DEG2-TTP-TXRX', 'L1', '3/0')

    def test_14_portmapping_DEG2_TTP_TXRX(self):
        # NOTE(review): identical to test_13; presumably another mapping was
        # meant to be checked here -- confirm.
        self._assert_mapping('DEG2-TTP-TXRX', 'L1', '3/0')

    def test_15_portmapping_SRG1_PP12_TXRX(self):
        self._assert_mapping('SRG1-PP12-TXRX', 'C12', '4/0')

    def test_16_portmapping_SRG1_PP8_TXRX(self):
        self._assert_mapping('SRG1-PP8-TXRX', 'C8', '4/0')

    def test_17_portmapping_SRG1_PP5_TXRX(self):
        self._assert_mapping('SRG1-PP5-TXRX', 'C5', '4/0')

    def test_18_portmapping_SRG1_PP13_TXRX(self):
        self._assert_mapping('SRG1-PP13-TXRX', 'C13', '4/0')

    def test_19_portmapping_SRG1_PP15_TXRX(self):
        self._assert_mapping('SRG1-PP15-TXRX', 'C15', '4/0')

    def test_20_portmapping_SRG1_PP10_TXRX(self):
        self._assert_mapping('SRG1-PP10-TXRX', 'C10', '4/0')

    # ------------------------------------------------------------------
    # Renderer service paths
    # ------------------------------------------------------------------

    def test_21_cross_connection_DEG1_TTP_TXRX_SRG1_PP3_TXRX(self):
        self._service_path(
            "create", "DEG1-TTP-TXRX", "SRG1-PP3-TXRX",
            'Roadm-connection successfully created for nodes [ROADMA]')

    def test_22_cross_connection_SRG1_PP3_TXRX_DEG1_TTP_TXRX(self):
        self._service_path(
            "create", "SRG1-PP3-TXRX", "DEG1-TTP-TXRX",
            'Roadm-connection successfully created for nodes [ROADMA]')

    def test_23_delete_DEG1_TTP_TXRX_SRG1_PP3_TXRX(self):
        self._service_path(
            "delete", "DEG1-TTP-TXRX", "SRG1-PP3-TXRX",
            'Request processed')

    def test_24_delete_SRG1_PP3_TXRX_DEG1_TTP_TXRX(self):
        self._service_path(
            "delete", "SRG1-PP3-TXRX", "DEG1-TTP-TXRX",
            'Request processed')
# Run the suite with per-test output when the file is executed as a script.
if __name__ == "__main__":
    unittest.main(verbosity=2)