# unittest.TestCase subclass that holds the class-level process handles and the
# REST-driven test methods listed below it.
15 class TransportPCEtesting(unittest.TestCase):
# Class-level slot for the testtool subprocess: starts as None, is assigned a
# Popen handle by __start_testtools and is read again in tearDownClass.
17 testtools_process = None
# Launches the netconf-testtool executable jar with `java -jar`, but only when
# the jar file exists; the Popen handle is stored in cls.testtools_process.
# 'testtools.log' is opened for writing around the launch.
# NOTE(review): the lines that close the Popen(...) call are not visible in
# this excerpt (the embedded numbering skips from 28 to 33), so how the log
# file is attached to the child process cannot be confirmed from here.
21 def __start_testtools(cls):
22 executable = ("./netconf/netconf/tools/netconf-testtool/target/"
23 "netconf-testtool-1.3.0-SNAPSHOT-executable.jar")
# Nothing is started, and cls.testtools_process keeps its previous value, when
# the jar is absent.
24 if os.path.isfile(executable):
25 with open('testtools.log', 'w') as outfile:
26 cls.testtools_process = subprocess.Popen(
# The testtool is pointed at the 'schemas' directory and at the ROADM sample
# configuration file.
27 ["java", "-jar", executable, "--schemas-dir", "schemas",
28 "--initial-config-xml", "sample-config-ROADM.xml"],
# NOTE(review): the `def` line that owns this body is not visible in this
# excerpt (the embedded numbering skips from 28 to 33); the use of `cls`
# suggests another class-level helper - confirm against the full file.
# When the karaf zip archive exists, the visible code removes the previously
# extracted directory tree, opens the archive, and starts bin/karaf under bash,
# storing the Popen handle in cls.odl_process.
33 zfile = "../karaf/target/transportpce-karaf-0.2.0-SNAPSHOT.zip"
34 executable = "transportpce-karaf-0.2.0-SNAPSHOT/bin/karaf"
35 if os.path.isfile(zfile):
37 shutil.rmtree("transportpce-karaf-0.2.0-SNAPSHOT")
# NOTE(review): the statement that extracts `zipobj` is not visible here
# (numbering skips 41).
40 zipobj = zipfile.ZipFile(zfile)
# karaf's stdout goes to 'odl.log'; its stdin is read from os.devnull.
42 with open('odl.log', 'w') as outfile:
43 cls.odl_process = subprocess.Popen(
44 ["bash", executable], stdout=outfile,
# NOTE(review): the file object returned by open(os.devnull) is not bound to a
# name, so nothing in the visible code closes it explicitly.
45 stdin=open(os.devnull))
# Invokes the testtool launcher defined earlier in the class.
# NOTE(review): the definition that contains this call is not visible in this
# excerpt (embedded numbering skips 46-48 and 50-53).
49 cls.__start_testtools()
# Stops the processes held on the class: SIGINT plus wait for the testtool
# process, SIGINT for every child psutil reports under the ODL process, then
# SIGINT plus wait for the ODL process itself.
# NOTE(review): testtools_process starts as None and is only assigned when the
# testtool jar exists, so send_signal would raise AttributeError on a machine
# where the jar was missing - confirm whether that is intended.
54 def tearDownClass(cls):
55 cls.testtools_process.send_signal(signal.SIGINT)
56 cls.testtools_process.wait()
# Children are signalled before the parent process.
57 for child in psutil.Process(cls.odl_process.pid).children():
58 child.send_signal(signal.SIGINT)
# NOTE(review): embedded line 59 is not visible; whether each child is waited
# on cannot be told from here.
60 cls.odl_process.send_signal(signal.SIGINT)
61 cls.odl_process.wait()
# PUTs a node entry for ROADMA under topology-netconf in the RESTCONF config
# tree and expects the controller to answer HTTP 201 (created).
66 def test_connect_device(self):
67 url = ("http://127.0.0.1:8181/restconf/config/network-topology:"
68 "network-topology/topology/topology-netconf/node/ROADMA")
# NOTE(review): the opening of the `data` payload is not visible in this
# excerpt (embedded numbering skips 69-70). The visible keys carry the
# credentials admin/admin, host 127.0.0.1, port 17830 and tcp-only "false".
71 "netconf-node-topology:username": "admin",
72 "netconf-node-topology:password": "admin",
73 "netconf-node-topology:host": "127.0.0.1",
74 "netconf-node-topology:port": "17830",
75 "netconf-node-topology:tcp-only": "false",
76 "netconf-node-topology:pass-through": {}}]}
77 headers = {'content-type': 'application/json'}
# The payload is serialized with json.dumps and sent with HTTP basic auth.
78 response = requests.request(
79 "PUT", url, data=json.dumps(data), headers=headers,
80 auth=('admin', 'admin'))
81 self.assertEqual(response.status_code, requests.codes.created)
# GETs node ROADMA from the RESTCONF operational tree of topology-netconf and
# expects HTTP 200.
84 def test_device_connected(self):
85 url = ("http://127.0.0.1:8181/restconf/operational/network-topology:"
86 "network-topology/topology/topology-netconf/node/ROADMA")
87 headers = {'content-type': 'application/json'}
88 response = requests.request(
89 "GET", url, headers=headers, auth=('admin', 'admin'))
90 self.assertEqual(response.status_code, requests.codes.ok)
# Reads the connection-status value of the first entry under 'node'.
# NOTE(review): where `res` is bound and which assertion consumes this value
# are not visible in this excerpt (embedded numbering skips 91-92 and 94-96).
93 res['node'][0]['netconf-node-topology:connection-status'],
# Fetches the SRG1-PP3-TXRX mapping of node ROADMA from the portmapping
# RESTCONF config URL and requires an HTTP 200 response.
97 def test_portmapping_SRG1_PP3_TXRX(self):
98 url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
99 "nodes/ROADMA/mapping/SRG1-PP3-TXRX")
100 headers = {'content-type': 'application/json'}
101 response = requests.request(
102 "GET", url, headers=headers, auth=('admin', 'admin'))
103 self.assertEqual(response.status_code, requests.codes.ok)
104 res = response.json()
# Expected mapping entry: port C3 on circuit pack 4/0.
# NOTE(review): the call that checks this literal against `res` is not visible
# in this excerpt (embedded numbering skips 105 and 108) - confirm.
106 {'supporting-port': 'C3', 'supporting-circuit-pack-name': '4/0',
107 'logical-connection-point': 'SRG1-PP3-TXRX'},
# Fetches the SRG1-PP6-TXRX mapping of node ROADMA from the portmapping
# RESTCONF config URL and requires an HTTP 200 response.
110 def test_portmapping_SRG1_PP6_TXRX(self):
111 url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
112 "nodes/ROADMA/mapping/SRG1-PP6-TXRX")
113 headers = {'content-type': 'application/json'}
114 response = requests.request(
115 "GET", url, headers=headers, auth=('admin', 'admin'))
116 self.assertEqual(response.status_code, requests.codes.ok)
117 res = response.json()
# Expected mapping entry: port C6 on circuit pack 4/0.
# NOTE(review): the call that checks this literal against `res` is not visible
# in this excerpt (embedded numbering skips 118 and 121) - confirm.
119 {'supporting-port': 'C6', 'supporting-circuit-pack-name': '4/0',
120 'logical-connection-point': 'SRG1-PP6-TXRX'},
# Fetches the DEG1-TTP-TXRX mapping of node ROADMA from the portmapping
# RESTCONF config URL and requires an HTTP 200 response.
123 def test_portmapping_DEG1_TTP_TXRX(self):
124 url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
125 "nodes/ROADMA/mapping/DEG1-TTP-TXRX")
126 headers = {'content-type': 'application/json'}
127 response = requests.request(
128 "GET", url, headers=headers, auth=('admin', 'admin'))
129 self.assertEqual(response.status_code, requests.codes.ok)
130 res = response.json()
# Expected mapping entry: port L1 on circuit pack 2/0.
# NOTE(review): the call that checks this literal against `res` is not visible
# in this excerpt (embedded numbering skips 131 and 134) - confirm.
132 {'supporting-port': 'L1', 'supporting-circuit-pack-name': '2/0',
133 'logical-connection-point': 'DEG1-TTP-TXRX'},
# Fetches the SRG1-PP9-TXRX mapping of node ROADMA from the portmapping
# RESTCONF config URL and requires an HTTP 200 response.
136 def test_portmapping_SRG1_PP9_TXRX(self):
137 url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
138 "nodes/ROADMA/mapping/SRG1-PP9-TXRX")
139 headers = {'content-type': 'application/json'}
140 response = requests.request(
141 "GET", url, headers=headers, auth=('admin', 'admin'))
142 self.assertEqual(response.status_code, requests.codes.ok)
143 res = response.json()
# Expected mapping entry: port C9 on circuit pack 4/0.
# NOTE(review): the call that checks this literal against `res` is not visible
# in this excerpt (embedded numbering skips 144 and 147) - confirm.
145 {'supporting-port': 'C9', 'supporting-circuit-pack-name': '4/0',
146 'logical-connection-point': 'SRG1-PP9-TXRX'},
# Fetches the SRG1-PP16-TXRX mapping of node ROADMA from the portmapping
# RESTCONF config URL and requires an HTTP 200 response.
149 def test_portmapping_SRG1_PP16_TXRX(self):
150 url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
151 "nodes/ROADMA/mapping/SRG1-PP16-TXRX")
152 headers = {'content-type': 'application/json'}
153 response = requests.request(
154 "GET", url, headers=headers, auth=('admin', 'admin'))
155 self.assertEqual(response.status_code, requests.codes.ok)
156 res = response.json()
# Expected mapping entry: port C16 on circuit pack 4/0.
# NOTE(review): the call that checks this literal against `res` is not visible
# in this excerpt (embedded numbering skips 157 and 160) - confirm.
158 {'supporting-port': 'C16', 'supporting-circuit-pack-name': '4/0',
159 'logical-connection-point': 'SRG1-PP16-TXRX'},
# Fetches the SRG1-PP4-TXRX mapping of node ROADMA from the portmapping
# RESTCONF config URL and requires an HTTP 200 response.
162 def test_portmapping_SRG1_PP4_TXRX(self):
163 url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
164 "nodes/ROADMA/mapping/SRG1-PP4-TXRX")
165 headers = {'content-type': 'application/json'}
166 response = requests.request(
167 "GET", url, headers=headers, auth=('admin', 'admin'))
168 self.assertEqual(response.status_code, requests.codes.ok)
169 res = response.json()
# Expected mapping entry: port C4 on circuit pack 4/0.
# NOTE(review): the call that checks this literal against `res` is not visible
# in this excerpt (embedded numbering skips 170 and 173) - confirm.
171 {'supporting-port': 'C4', 'supporting-circuit-pack-name': '4/0',
172 'logical-connection-point': 'SRG1-PP4-TXRX'},
# Fetches the SRG1-PP2-TXRX mapping of node ROADMA from the portmapping
# RESTCONF config URL and requires an HTTP 200 response.
175 def test_portmapping_SRG1_PP2_TXRX(self):
176 url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
177 "nodes/ROADMA/mapping/SRG1-PP2-TXRX")
178 headers = {'content-type': 'application/json'}
179 response = requests.request(
180 "GET", url, headers=headers, auth=('admin', 'admin'))
181 self.assertEqual(response.status_code, requests.codes.ok)
182 res = response.json()
# Expected mapping entry: port C2 on circuit pack 4/0.
# NOTE(review): the call that checks this literal against `res` is not visible
# in this excerpt (embedded numbering skips 183 and 186) - confirm.
184 {'supporting-port': 'C2', 'supporting-circuit-pack-name': '4/0',
185 'logical-connection-point': 'SRG1-PP2-TXRX'},
# Fetches the SRG1-PP14-TXRX mapping of node ROADMA from the portmapping
# RESTCONF config URL and requires an HTTP 200 response.
188 def test_portmapping_SRG1_PP14_TXRX(self):
189 url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
190 "nodes/ROADMA/mapping/SRG1-PP14-TXRX")
191 headers = {'content-type': 'application/json'}
192 response = requests.request(
193 "GET", url, headers=headers, auth=('admin', 'admin'))
194 self.assertEqual(response.status_code, requests.codes.ok)
195 res = response.json()
# Expected mapping entry: port C14 on circuit pack 4/0.
# NOTE(review): the call that checks this literal against `res` is not visible
# in this excerpt (embedded numbering skips 196 and 199) - confirm.
197 {'supporting-port': 'C14', 'supporting-circuit-pack-name': '4/0',
198 'logical-connection-point': 'SRG1-PP14-TXRX'},
# Fetches the SRG1-PP11-TXRX mapping of node ROADMA from the portmapping
# RESTCONF config URL and requires an HTTP 200 response.
201 def test_portmapping_SRG1_PP11_TXRX(self):
202 url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
203 "nodes/ROADMA/mapping/SRG1-PP11-TXRX")
204 headers = {'content-type': 'application/json'}
205 response = requests.request(
206 "GET", url, headers=headers, auth=('admin', 'admin'))
207 self.assertEqual(response.status_code, requests.codes.ok)
208 res = response.json()
# Expected mapping entry: port C11 on circuit pack 4/0.
# NOTE(review): the call that checks this literal against `res` is not visible
# in this excerpt (embedded numbering skips 209 and 212) - confirm.
210 {'supporting-port': 'C11', 'supporting-circuit-pack-name': '4/0',
211 'logical-connection-point': 'SRG1-PP11-TXRX'},
# Fetches the SRG1-PP7-TXRX mapping of node ROADMA from the portmapping
# RESTCONF config URL and requires an HTTP 200 response.
214 def test_portmapping_SRG1_PP7_TXRX(self):
215 url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
216 "nodes/ROADMA/mapping/SRG1-PP7-TXRX")
217 headers = {'content-type': 'application/json'}
218 response = requests.request(
219 "GET", url, headers=headers, auth=('admin', 'admin'))
220 self.assertEqual(response.status_code, requests.codes.ok)
221 res = response.json()
# Expected mapping entry: port C7 on circuit pack 4/0.
# NOTE(review): the call that checks this literal against `res` is not visible
# in this excerpt (embedded numbering skips 222 and 225) - confirm.
223 {'supporting-port': 'C7', 'supporting-circuit-pack-name': '4/0',
224 'logical-connection-point': 'SRG1-PP7-TXRX'},
227 def test_portmapping_DEG2_TTP_TXRX(self):
228 url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
229 "nodes/ROADMA/mapping/DEG2-TTP-TXRX")
230 headers = {'content-type': 'application/json'}
231 response = requests.request(
232 "GET", url, headers=headers, auth=('admin', 'admin'))
233 self.assertEqual(response.status_code, requests.codes.ok)
234 res = response.json()
236 {'supporting-port': 'L1', 'supporting-circuit-pack-name': '3/0',
237 'logical-connection-point': 'DEG2-TTP-TXRX'},
240 def test_portmapping_DEG2_TTP_TXRX(self):
241 url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
242 "nodes/ROADMA/mapping/DEG2-TTP-TXRX")
243 headers = {'content-type': 'application/json'}
244 response = requests.request(
245 "GET", url, headers=headers, auth=('admin', 'admin'))
246 self.assertEqual(response.status_code, requests.codes.ok)
247 res = response.json()
249 {'supporting-port': 'L1', 'supporting-circuit-pack-name': '3/0',
250 'logical-connection-point': 'DEG2-TTP-TXRX'},
# Fetches the SRG1-PP12-TXRX mapping of node ROADMA from the portmapping
# RESTCONF config URL and requires an HTTP 200 response.
253 def test_portmapping_SRG1_PP12_TXRX(self):
254 url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
255 "nodes/ROADMA/mapping/SRG1-PP12-TXRX")
256 headers = {'content-type': 'application/json'}
257 response = requests.request(
258 "GET", url, headers=headers, auth=('admin', 'admin'))
259 self.assertEqual(response.status_code, requests.codes.ok)
260 res = response.json()
# Expected mapping entry: port C12 on circuit pack 4/0.
# NOTE(review): the call that checks this literal against `res` is not visible
# in this excerpt (embedded numbering skips 261 and 264) - confirm.
262 {'supporting-port': 'C12', 'supporting-circuit-pack-name': '4/0',
263 'logical-connection-point': 'SRG1-PP12-TXRX'},
# Fetches the SRG1-PP8-TXRX mapping of node ROADMA from the portmapping
# RESTCONF config URL and requires an HTTP 200 response.
266 def test_portmapping_SRG1_PP8_TXRX(self):
267 url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
268 "nodes/ROADMA/mapping/SRG1-PP8-TXRX")
269 headers = {'content-type': 'application/json'}
270 response = requests.request(
271 "GET", url, headers=headers, auth=('admin', 'admin'))
272 self.assertEqual(response.status_code, requests.codes.ok)
273 res = response.json()
# Expected mapping entry: port C8 on circuit pack 4/0.
# NOTE(review): the call that checks this literal against `res` is not visible
# in this excerpt (embedded numbering skips 274 and 277) - confirm.
275 {'supporting-port': 'C8', 'supporting-circuit-pack-name': '4/0',
276 'logical-connection-point': 'SRG1-PP8-TXRX'},
# Fetches the SRG1-PP5-TXRX mapping of node ROADMA from the portmapping
# RESTCONF config URL and requires an HTTP 200 response.
279 def test_portmapping_SRG1_PP5_TXRX(self):
280 url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
281 "nodes/ROADMA/mapping/SRG1-PP5-TXRX")
282 headers = {'content-type': 'application/json'}
283 response = requests.request(
284 "GET", url, headers=headers, auth=('admin', 'admin'))
285 self.assertEqual(response.status_code, requests.codes.ok)
286 res = response.json()
# Expected mapping entry: port C5 on circuit pack 4/0.
# NOTE(review): the call that checks this literal against `res` is not visible
# in this excerpt (embedded numbering skips 287 and 290) - confirm.
288 {'supporting-port': 'C5', 'supporting-circuit-pack-name': '4/0',
289 'logical-connection-point': 'SRG1-PP5-TXRX'},
# Fetches the SRG1-PP13-TXRX mapping of node ROADMA from the portmapping
# RESTCONF config URL and requires an HTTP 200 response.
292 def test_portmapping_SRG1_PP13_TXRX(self):
293 url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
294 "nodes/ROADMA/mapping/SRG1-PP13-TXRX")
295 headers = {'content-type': 'application/json'}
296 response = requests.request(
297 "GET", url, headers=headers, auth=('admin', 'admin'))
298 self.assertEqual(response.status_code, requests.codes.ok)
299 res = response.json()
# Expected mapping entry: port C13 on circuit pack 4/0.
# NOTE(review): the call that checks this literal against `res` is not visible
# in this excerpt (embedded numbering skips 300 and 303) - confirm.
301 {'supporting-port': 'C13', 'supporting-circuit-pack-name': '4/0',
302 'logical-connection-point': 'SRG1-PP13-TXRX'},
# Fetches the SRG1-PP15-TXRX mapping of node ROADMA from the portmapping
# RESTCONF config URL and requires an HTTP 200 response.
305 def test_portmapping_SRG1_PP15_TXRX(self):
306 url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
307 "nodes/ROADMA/mapping/SRG1-PP15-TXRX")
308 headers = {'content-type': 'application/json'}
309 response = requests.request(
310 "GET", url, headers=headers, auth=('admin', 'admin'))
311 self.assertEqual(response.status_code, requests.codes.ok)
312 res = response.json()
# Expected mapping entry: port C15 on circuit pack 4/0.
# NOTE(review): the call that checks this literal against `res` is not visible
# in this excerpt (embedded numbering skips 313 and 316) - confirm.
314 {'supporting-port': 'C15', 'supporting-circuit-pack-name': '4/0',
315 'logical-connection-point': 'SRG1-PP15-TXRX'},
# Fetches the SRG1-PP10-TXRX mapping of node ROADMA from the portmapping
# RESTCONF config URL and requires an HTTP 200 response.
318 def test_portmapping_SRG1_PP10_TXRX(self):
319 url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
320 "nodes/ROADMA/mapping/SRG1-PP10-TXRX")
321 headers = {'content-type': 'application/json'}
322 response = requests.request(
323 "GET", url, headers=headers, auth=('admin', 'admin'))
324 self.assertEqual(response.status_code, requests.codes.ok)
325 res = response.json()
# Expected mapping entry: port C10 on circuit pack 4/0.
# NOTE(review): the call that checks this literal against `res` is not visible
# in this excerpt (embedded numbering skips 326 and 329) - confirm.
327 {'supporting-port': 'C10', 'supporting-circuit-pack-name': '4/0',
328 'logical-connection-point': 'SRG1-PP10-TXRX'},
# POSTs a "create" request to the renderer:service-path RPC for service_32 on
# wave number 32, from DEG1-TTP-TXRX to SRG1-PP3-TXRX on node ROADMA, and
# expects HTTP 200 plus a success message in the JSON reply.
331 def test_cross_connection_DEG1_TTP_TXRX_SRG1_PP3_TXRX(self):
332 url = "http://127.0.0.1:8181/restconf/operations/renderer:service-path"
333 data = {"renderer:input": {
334 "renderer:service-name": "service_32",
335 "renderer:wave-number": "32",
336 "renderer:operation": "create",
# NOTE(review): the key that introduces the node list below is not visible in
# this excerpt (embedded numbering skips 337).
338 {"renderer:node-id": "ROADMA",
339 "renderer:src-tp": "DEG1-TTP-TXRX",
340 "renderer:dest-tp": "SRG1-PP3-TXRX"}]}}
341 headers = {'content-type': 'application/json'}
342 response = requests.request(
343 "POST", url, data=json.dumps(data),
344 headers=headers, auth=('admin', 'admin'))
345 self.assertEqual(response.status_code, requests.codes.ok)
# The whole JSON reply is compared for equality with an expected dict.
# NOTE(review): the keys wrapping the message are not visible in this excerpt
# (embedded numbering skips 347-348).
346 self.assertEqual(response.json(), {
349 'Roadm-connection successfully created for nodes [ROADMA]'}})
# POSTs a "create" request to the renderer:service-path RPC for service_32 on
# wave number 32, from SRG1-PP3-TXRX to DEG1-TTP-TXRX on node ROADMA, and
# expects HTTP 200 plus a success message in the JSON reply.
351 def test_cross_connection_SRG1_PP3_TXRX_DEG1_TTP_TXRX(self):
352 url = "http://127.0.0.1:8181/restconf/operations/renderer:service-path"
353 data = {"renderer:input": {
354 "renderer:service-name": "service_32",
355 "renderer:wave-number": "32",
356 "renderer:operation": "create",
# NOTE(review): the key that introduces the node list below is not visible in
# this excerpt (embedded numbering skips 357).
358 {"renderer:node-id": "ROADMA",
359 "renderer:src-tp": "SRG1-PP3-TXRX",
360 "renderer:dest-tp": "DEG1-TTP-TXRX"}]}}
361 headers = {'content-type': 'application/json'}
362 response = requests.request(
363 "POST", url, data=json.dumps(data),
364 headers=headers, auth=('admin', 'admin'))
365 self.assertEqual(response.status_code, requests.codes.ok)
# The whole JSON reply is compared for equality with an expected dict.
# NOTE(review): the keys wrapping the message are not visible in this excerpt
# (embedded numbering skips 367-368).
366 self.assertEqual(response.json(), {
369 'Roadm-connection successfully created for nodes [ROADMA]'}})
# POSTs a "delete" request to the renderer:service-path RPC for service_32 on
# wave number 32, from DEG1-TTP-TXRX to SRG1-PP3-TXRX on node ROADMA, and
# expects HTTP 200 with the reply {'output': {'result': 'Request processed'}}.
371 def test_delete_DEG1_TTP_TXRX_SRG1_PP3_TXRX(self):
372 url = "http://127.0.0.1:8181/restconf/operations/renderer:service-path"
373 data = {"renderer:input": {
374 "renderer:service-name": "service_32",
375 "renderer:wave-number": "32",
376 "renderer:operation": "delete",
# NOTE(review): the key that introduces the node list below is not visible in
# this excerpt (embedded numbering skips 377).
378 {"renderer:node-id": "ROADMA",
379 "renderer:src-tp": "DEG1-TTP-TXRX",
380 "renderer:dest-tp": "SRG1-PP3-TXRX"}]}}
381 headers = {'content-type': 'application/json'}
382 response = requests.request(
383 "POST", url, data=json.dumps(data),
384 headers=headers, auth=('admin', 'admin'))
385 self.assertEqual(response.status_code, requests.codes.ok)
# The whole JSON reply must equal the expected dict exactly.
386 self.assertEqual(response.json(), {
387 'output': {'result': 'Request processed'}})
# POSTs a "delete" request to the renderer:service-path RPC for service_32 on
# wave number 32, from SRG1-PP3-TXRX to DEG1-TTP-TXRX on node ROADMA, and
# expects HTTP 200 with the reply {'output': {'result': 'Request processed'}}.
389 def test_delete_SRG1_PP3_TXRX_DEG1_TTP_TXRX(self):
390 url = "http://127.0.0.1:8181/restconf/operations/renderer:service-path"
391 data = {"renderer:input": {
392 "renderer:service-name": "service_32",
393 "renderer:wave-number": "32",
394 "renderer:operation": "delete",
# NOTE(review): the key that introduces the node list below is not visible in
# this excerpt (embedded numbering skips 395).
396 {"renderer:node-id": "ROADMA",
397 "renderer:src-tp": "SRG1-PP3-TXRX",
398 "renderer:dest-tp": "DEG1-TTP-TXRX"}]}}
399 headers = {'content-type': 'application/json'}
400 response = requests.request(
401 "POST", url, data=json.dumps(data),
402 headers=headers, auth=('admin', 'admin'))
403 self.assertEqual(response.status_code, requests.codes.ok)
# The whole JSON reply must equal the expected dict exactly.
404 self.assertEqual(response.json(), {
405 'output': {'result': 'Request processed'}})
409 suite = unittest.TestSuite()
410 suite.addTest(TransportPCEtesting('test_connect_device'))
411 suite.addTest(TransportPCEtesting('test_device_connected'))
412 suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP3_TXRX'))
413 suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP6_TXRX'))
414 suite.addTest(TransportPCEtesting('test_portmapping_DEG1_TTP_TXRX'))
415 suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP9_TXRX'))
416 suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP16_TXRX'))
417 suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP4_TXRX'))
418 suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP2_TXRX'))
419 suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP14_TXRX'))
420 suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP11_TXRX'))
421 suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP7_TXRX'))
422 suite.addTest(TransportPCEtesting('test_portmapping_DEG2_TTP_TXRX'))
423 suite.addTest(TransportPCEtesting('test_portmapping_DEG2_TTP_TXRX'))
424 suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP12_TXRX'))
425 suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP8_TXRX'))
426 suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP5_TXRX'))
427 suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP13_TXRX'))
428 suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP15_TXRX'))
429 suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP10_TXRX'))
430 suite.addTest(TransportPCEtesting(
431 'test_cross_connection_DEG1_TTP_TXRX_SRG1_PP3_TXRX'))
432 suite.addTest(TransportPCEtesting(
433 'test_cross_connection_SRG1_PP3_TXRX_DEG1_TTP_TXRX'))
434 suite.addTest(TransportPCEtesting(
435 'test_delete_DEG1_TTP_TXRX_SRG1_PP3_TXRX'))
436 suite.addTest(TransportPCEtesting(
437 'test_delete_SRG1_PP3_TXRX_DEG1_TTP_TXRX'))
# Script entry point: runs the ordered suite from test_suite() through a text
# runner at verbosity 2 instead of relying on unittest's default discovery.
440 if __name__ == "__main__":
441 RUNNER = unittest.TextTestRunner(verbosity=2)
442 RUNNER.run(test_suite())