Test "renderer:operation": "delete"
[transportpce.git] / tests / transportpce_tests / test_portmapping.py
1 #!/usr/bin/env python
2
3 import json
4 import os
5 import psutil
6 import requests
7 import signal
8 import shutil
9 import subprocess
10 import time
11 import unittest
12 import zipfile
13
14
class TransportPCEtesting(unittest.TestCase):
    """Integration tests for port mapping and cross-connections on ROADMA.

    ``setUpClass`` starts a netconf-testtool simulator and a transportpce
    karaf distribution when their build artifacts are present on disk. The
    test methods then exercise the RESTCONF API served on 127.0.0.1:8181.

    The tests share state on the controller (the device mounted by
    ``test_connect_device`` is queried by the later tests), so they are
    meant to be run in the order given by ``test_suite()``.
    """

    # Handles of the spawned processes; they stay None when the matching
    # artifact was not found and nothing was launched.
    testtools_process = None
    odl_process = None

    # Base URL of the controller's RESTCONF API.
    _RESTCONF = "http://127.0.0.1:8181/restconf"

    @classmethod
    def __start_testtools(cls):
        """Launch the netconf-testtool simulator if its jar exists.

        The simulator's stdout is redirected to ``testtools.log``.
        """
        executable = ("./netconf/netconf/tools/netconf-testtool/target/"
                      "netconf-testtool-1.3.0-SNAPSHOT-executable.jar")
        if os.path.isfile(executable):
            with open('testtools.log', 'w') as outfile:
                cls.testtools_process = subprocess.Popen(
                    ["java", "-jar", executable, "--schemas-dir", "schemas",
                     "--initial-config-xml", "sample-config-ROADM.xml"],
                    stdout=outfile)

    @classmethod
    def __start_odl(cls):
        """Unpack a fresh karaf distribution and launch it if the zip exists.

        Any previously extracted distribution is removed first. Karaf's
        stdout is redirected to ``odl.log`` and its stdin is connected to
        the null device.
        """
        zfile = "../karaf/target/transportpce-karaf-0.2.0-SNAPSHOT.zip"
        executable = "transportpce-karaf-0.2.0-SNAPSHOT/bin/karaf"
        if os.path.isfile(zfile):
            try:
                shutil.rmtree("transportpce-karaf-0.2.0-SNAPSHOT")
            except OSError:
                # Best effort: typically there is simply no previous
                # extraction to remove.
                pass
            # Context managers make sure the archive and the null-device
            # handle are closed in this process once they are not needed.
            with zipfile.ZipFile(zfile) as zipobj:
                zipobj.extractall()
            with open('odl.log', 'w') as outfile, \
                    open(os.devnull) as devnull:
                cls.odl_process = subprocess.Popen(
                    ["bash", executable], stdout=outfile, stdin=devnull)

    @classmethod
    def setUpClass(cls):
        """Start the simulator and the controller, then wait 30 seconds."""
        cls.__start_testtools()
        cls.__start_odl()
        time.sleep(30)

    @classmethod
    def tearDownClass(cls):
        """Send SIGINT to every process started by ``setUpClass``.

        Processes that were never started (handle still None) are skipped,
        so a missing simulator does not prevent the controller from being
        stopped.
        """
        if cls.testtools_process is not None:
            cls.testtools_process.send_signal(signal.SIGINT)
            cls.testtools_process.wait()
        if cls.odl_process is not None:
            # Karaf is launched through bash; interrupt its children before
            # the wrapper process itself.
            for child in psutil.Process(cls.odl_process.pid).children():
                child.send_signal(signal.SIGINT)
                child.wait()
            cls.odl_process.send_signal(signal.SIGINT)
            cls.odl_process.wait()

    def setUp(self):
        """Pause one second before each test."""
        time.sleep(1)

    def _restconf(self, method, path, data=None):
        """Send an authenticated JSON request to the RESTCONF API.

        :param method: HTTP verb, e.g. ``"GET"``.
        :param path: path appended to the RESTCONF base URL.
        :param data: optional payload, serialised with ``json.dumps``.
        :returns: the ``requests`` response object.
        """
        body = None if data is None else json.dumps(data)
        return requests.request(
            method, self._RESTCONF + path, data=body,
            headers={'content-type': 'application/json'},
            auth=('admin', 'admin'))

    def _assert_mapping(self, lcp, port, circuit_pack):
        """Check that ROADMA maps ``lcp`` to ``port`` on ``circuit_pack``.

        :param lcp: logical connection point name, e.g. ``SRG1-PP3-TXRX``.
        :param port: expected ``supporting-port`` value.
        :param circuit_pack: expected ``supporting-circuit-pack-name`` value.
        """
        response = self._restconf(
            "GET", "/config/portmapping:network/nodes/ROADMA/mapping/" + lcp)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn(
            {'supporting-port': port,
             'supporting-circuit-pack-name': circuit_pack,
             'logical-connection-point': lcp},
            res['mapping'])

    def _assert_service_path(self, operation, src_tp, dest_tp, result):
        """Invoke ``renderer:service-path`` on ROADMA and check its output.

        :param operation: value sent as ``renderer:operation``.
        :param src_tp: source termination point.
        :param dest_tp: destination termination point.
        :param result: exact ``result`` string expected in the RPC output.
        """
        data = {"renderer:input": {
            "renderer:service-name": "service_32",
            "renderer:wave-number": "32",
            "renderer:operation": operation,
            "renderer:nodes": [
                {"renderer:node-id": "ROADMA",
                 "renderer:src-tp": src_tp,
                 "renderer:dest-tp": dest_tp}]}}
        response = self._restconf(
            "POST", "/operations/renderer:service-path", data)
        self.assertEqual(response.status_code, requests.codes.ok)
        self.assertEqual(response.json(), {'output': {'result': result}})

    def test_connect_device(self):
        """Mount ROADMA in topology-netconf and expect HTTP 201."""
        data = {"node": [{
            "node-id": "ROADMA",
            "netconf-node-topology:username": "admin",
            "netconf-node-topology:password": "admin",
            "netconf-node-topology:host": "127.0.0.1",
            "netconf-node-topology:port": "17830",
            "netconf-node-topology:tcp-only": "false",
            "netconf-node-topology:pass-through": {}}]}
        response = self._restconf(
            "PUT",
            "/config/network-topology:network-topology/topology/"
            "topology-netconf/node/ROADMA",
            data)
        self.assertEqual(response.status_code, requests.codes.created)
        # Pause before the following tests query the mounted device.
        time.sleep(10)

    def test_device_connected(self):
        """Check that the operational datastore reports ROADMA connected."""
        response = self._restconf(
            "GET",
            "/operational/network-topology:network-topology/topology/"
            "topology-netconf/node/ROADMA")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertEqual(
            res['node'][0]['netconf-node-topology:connection-status'],
            'connected')
        time.sleep(2)

    def test_portmapping_SRG1_PP3_TXRX(self):
        self._assert_mapping('SRG1-PP3-TXRX', 'C3', '4/0')

    def test_portmapping_SRG1_PP6_TXRX(self):
        self._assert_mapping('SRG1-PP6-TXRX', 'C6', '4/0')

    def test_portmapping_DEG1_TTP_TXRX(self):
        self._assert_mapping('DEG1-TTP-TXRX', 'L1', '2/0')

    def test_portmapping_SRG1_PP9_TXRX(self):
        self._assert_mapping('SRG1-PP9-TXRX', 'C9', '4/0')

    def test_portmapping_SRG1_PP16_TXRX(self):
        self._assert_mapping('SRG1-PP16-TXRX', 'C16', '4/0')

    def test_portmapping_SRG1_PP4_TXRX(self):
        self._assert_mapping('SRG1-PP4-TXRX', 'C4', '4/0')

    def test_portmapping_SRG1_PP2_TXRX(self):
        self._assert_mapping('SRG1-PP2-TXRX', 'C2', '4/0')

    def test_portmapping_SRG1_PP14_TXRX(self):
        self._assert_mapping('SRG1-PP14-TXRX', 'C14', '4/0')

    def test_portmapping_SRG1_PP11_TXRX(self):
        self._assert_mapping('SRG1-PP11-TXRX', 'C11', '4/0')

    def test_portmapping_SRG1_PP7_TXRX(self):
        self._assert_mapping('SRG1-PP7-TXRX', 'C7', '4/0')

    def test_portmapping_DEG2_TTP_TXRX(self):
        self._assert_mapping('DEG2-TTP-TXRX', 'L1', '3/0')

    def test_portmapping_SRG1_PP12_TXRX(self):
        self._assert_mapping('SRG1-PP12-TXRX', 'C12', '4/0')

    def test_portmapping_SRG1_PP8_TXRX(self):
        self._assert_mapping('SRG1-PP8-TXRX', 'C8', '4/0')

    def test_portmapping_SRG1_PP5_TXRX(self):
        self._assert_mapping('SRG1-PP5-TXRX', 'C5', '4/0')

    def test_portmapping_SRG1_PP13_TXRX(self):
        self._assert_mapping('SRG1-PP13-TXRX', 'C13', '4/0')

    def test_portmapping_SRG1_PP15_TXRX(self):
        self._assert_mapping('SRG1-PP15-TXRX', 'C15', '4/0')

    def test_portmapping_SRG1_PP10_TXRX(self):
        self._assert_mapping('SRG1-PP10-TXRX', 'C10', '4/0')

    def test_cross_connection_DEG1_TTP_TXRX_SRG1_PP3_TXRX(self):
        """Create the DEG1-TTP-TXRX -> SRG1-PP3-TXRX connection."""
        self._assert_service_path(
            "create", "DEG1-TTP-TXRX", "SRG1-PP3-TXRX",
            'Roadm-connection successfully created for nodes [ROADMA]')

    def test_cross_connection_SRG1_PP3_TXRX_DEG1_TTP_TXRX(self):
        """Create the SRG1-PP3-TXRX -> DEG1-TTP-TXRX connection."""
        self._assert_service_path(
            "create", "SRG1-PP3-TXRX", "DEG1-TTP-TXRX",
            'Roadm-connection successfully created for nodes [ROADMA]')

    def test_delete_DEG1_TTP_TXRX_SRG1_PP3_TXRX(self):
        """Delete the DEG1-TTP-TXRX -> SRG1-PP3-TXRX connection."""
        self._assert_service_path(
            "delete", "DEG1-TTP-TXRX", "SRG1-PP3-TXRX",
            'Request processed')

    def test_delete_SRG1_PP3_TXRX_DEG1_TTP_TXRX(self):
        """Delete the SRG1-PP3-TXRX -> DEG1-TTP-TXRX connection."""
        self._assert_service_path(
            "delete", "SRG1-PP3-TXRX", "DEG1-TTP-TXRX",
            'Request processed')
406
407
def test_suite():
    """Build the suite of TransportPCEtesting cases in execution order.

    Tests are registered in the order they have to run: the device is
    mounted before it is queried, and cross-connections are created before
    they are deleted. Each test is registered exactly once.

    :returns: a ``unittest.TestSuite`` holding the ordered test cases.
    """
    ordered_names = (
        'test_connect_device',
        'test_device_connected',
        'test_portmapping_SRG1_PP3_TXRX',
        'test_portmapping_SRG1_PP6_TXRX',
        'test_portmapping_DEG1_TTP_TXRX',
        'test_portmapping_SRG1_PP9_TXRX',
        'test_portmapping_SRG1_PP16_TXRX',
        'test_portmapping_SRG1_PP4_TXRX',
        'test_portmapping_SRG1_PP2_TXRX',
        'test_portmapping_SRG1_PP14_TXRX',
        'test_portmapping_SRG1_PP11_TXRX',
        'test_portmapping_SRG1_PP7_TXRX',
        'test_portmapping_DEG2_TTP_TXRX',
        'test_portmapping_SRG1_PP12_TXRX',
        'test_portmapping_SRG1_PP8_TXRX',
        'test_portmapping_SRG1_PP5_TXRX',
        'test_portmapping_SRG1_PP13_TXRX',
        'test_portmapping_SRG1_PP15_TXRX',
        'test_portmapping_SRG1_PP10_TXRX',
        'test_cross_connection_DEG1_TTP_TXRX_SRG1_PP3_TXRX',
        'test_cross_connection_SRG1_PP3_TXRX_DEG1_TTP_TXRX',
        'test_delete_DEG1_TTP_TXRX_SRG1_PP3_TXRX',
        'test_delete_SRG1_PP3_TXRX_DEG1_TTP_TXRX',
    )
    suite = unittest.TestSuite()
    for name in ordered_names:
        suite.addTest(TransportPCEtesting(name))
    return suite
439
if __name__ == "__main__":
    import sys

    RUNNER = unittest.TextTestRunner(verbosity=2)
    RESULT = RUNNER.run(test_suite())
    # Propagate the outcome through the exit status so that shells and CI
    # jobs can detect a failing run.
    sys.exit(0 if RESULT.wasSuccessful() else 1)