c937ad73685bf3b140b6d5169377ce194ab7b2d4
[transportpce.git] / tests / transportpce_tests / test_portmapping.py
1 #!/usr/bin/env python
2
3 import json
4 import os
5 import psutil
6 import requests
7 import shutil
8 import subprocess
9 import time
10 import unittest
11 import zipfile
12
13
14 class TransportPCEtesting(unittest.TestCase):
15
16     testtools_process = None
17     odl_process = None
18
19     @classmethod
20     def __start_testtools(cls):
21         executable = ("./netconf/netconf/tools/netconf-testtool/target/"
22                       "netconf-testtool-1.3.0-SNAPSHOT-executable.jar")
23         if os.path.isfile(executable):
24             with open('testtools.log', 'w') as outfile:
25                 cls.testtools_process = subprocess.Popen(
26                     ["java", "-jar", executable, "--schemas-dir", "schemas",
27                      "--initial-config-xml", "sample-config-ROADM.xml"],
28                     stdout=outfile)
29
30     @classmethod
31     def __start_odl(cls):
32         zfile = "../karaf/target/transportpce-karaf-0.2.0-SNAPSHOT.zip"
33         executable = "transportpce-karaf-0.2.0-SNAPSHOT/bin/karaf"
34         if os.path.isfile(zfile):
35             try:
36                 shutil.rmtree("transportpce-karaf-0.2.0-SNAPSHOT")
37             except OSError:
38                 pass
39             zipobj = zipfile.ZipFile(zfile)
40             zipobj.extractall()
41             with open('odl.log', 'w') as outfile:
42                 cls.odl_process = subprocess.Popen(
43                     ["bash", executable], stdout=outfile)
44
45     @classmethod
46     def setUpClass(cls):
47         cls.__start_testtools()
48         cls.__start_odl()
49         time.sleep(30)
50
51     @classmethod
52     def tearDownClass(cls):
53         cls.testtools_process.kill()
54         for child in psutil.Process(cls.odl_process.pid).children():
55             child.kill()
56         cls.odl_process.kill()
57
58     def setUp(self):
59         time.sleep(1)
60
61     def test_connect_device(self):
62         url = ("http://127.0.0.1:8181/restconf/config/network-topology:"
63                "network-topology/topology/topology-netconf/node/ROADMA")
64         data = {"node": [{
65             "node-id": "ROADMA",
66             "netconf-node-topology:username": "admin",
67             "netconf-node-topology:password": "admin",
68             "netconf-node-topology:host": "127.0.0.1",
69             "netconf-node-topology:port": "17830",
70             "netconf-node-topology:tcp-only": "false",
71             "netconf-node-topology:pass-through": {}}]}
72         headers = {'content-type': 'application/json'}
73         response = requests.request(
74             "PUT", url, data=json.dumps(data), headers=headers,
75             auth=('admin', 'admin'))
76         self.assertEqual(response.status_code, requests.codes.created)
77         time.sleep(10)
78
79     def test_device_connected(self):
80         url = ("http://127.0.0.1:8181/restconf/operational/network-topology:"
81                "network-topology/topology/topology-netconf/node/ROADMA")
82         headers = {'content-type': 'application/json'}
83         response = requests.request(
84             "GET", url, headers=headers, auth=('admin', 'admin'))
85         self.assertEqual(response.status_code, requests.codes.ok)
86         res = response.json()
87         self.assertEqual(
88             res['node'][0]['netconf-node-topology:connection-status'],
89             'connected')
90         time.sleep(2)
91
92     def test_portmapping_SRG1_PP3_TXRX(self):
93         url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
94                "nodes/ROADMA/mapping/SRG1-PP3-TXRX")
95         headers = {'content-type': 'application/json'}
96         response = requests.request(
97             "GET", url, headers=headers, auth=('admin', 'admin'))
98         self.assertEqual(response.status_code, requests.codes.ok)
99         res = response.json()
100         self.assertIn(
101             {'supporting-port': 'C3', 'supporting-circuit-pack-name': '4/0',
102              'logical-connection-point': 'SRG1-PP3-TXRX'},
103             res['mapping'])
104
105     def test_portmapping_SRG1_PP6_TXRX(self):
106         url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
107                "nodes/ROADMA/mapping/SRG1-PP6-TXRX")
108         headers = {'content-type': 'application/json'}
109         response = requests.request(
110             "GET", url, headers=headers, auth=('admin', 'admin'))
111         self.assertEqual(response.status_code, requests.codes.ok)
112         res = response.json()
113         self.assertIn(
114             {'supporting-port': 'C6', 'supporting-circuit-pack-name': '4/0',
115              'logical-connection-point': 'SRG1-PP6-TXRX'},
116             res['mapping'])
117
118     def test_portmapping_DEG1_TTP_TXRX(self):
119         url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
120                "nodes/ROADMA/mapping/DEG1-TTP-TXRX")
121         headers = {'content-type': 'application/json'}
122         response = requests.request(
123             "GET", url, headers=headers, auth=('admin', 'admin'))
124         self.assertEqual(response.status_code, requests.codes.ok)
125         res = response.json()
126         self.assertIn(
127             {'supporting-port': 'L1', 'supporting-circuit-pack-name': '2/0',
128              'logical-connection-point': 'DEG1-TTP-TXRX'},
129             res['mapping'])
130
131     def test_portmapping_SRG1_PP9_TXRX(self):
132         url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
133                "nodes/ROADMA/mapping/SRG1-PP9-TXRX")
134         headers = {'content-type': 'application/json'}
135         response = requests.request(
136             "GET", url, headers=headers, auth=('admin', 'admin'))
137         self.assertEqual(response.status_code, requests.codes.ok)
138         res = response.json()
139         self.assertIn(
140             {'supporting-port': 'C9', 'supporting-circuit-pack-name': '4/0',
141              'logical-connection-point': 'SRG1-PP9-TXRX'},
142             res['mapping'])
143
144     def test_portmapping_SRG1_PP16_TXRX(self):
145         url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
146                "nodes/ROADMA/mapping/SRG1-PP16-TXRX")
147         headers = {'content-type': 'application/json'}
148         response = requests.request(
149             "GET", url, headers=headers, auth=('admin', 'admin'))
150         self.assertEqual(response.status_code, requests.codes.ok)
151         res = response.json()
152         self.assertIn(
153             {'supporting-port': 'C16', 'supporting-circuit-pack-name': '4/0',
154              'logical-connection-point': 'SRG1-PP16-TXRX'},
155             res['mapping'])
156
157     def test_portmapping_SRG1_PP4_TXRX(self):
158         url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
159                "nodes/ROADMA/mapping/SRG1-PP4-TXRX")
160         headers = {'content-type': 'application/json'}
161         response = requests.request(
162             "GET", url, headers=headers, auth=('admin', 'admin'))
163         self.assertEqual(response.status_code, requests.codes.ok)
164         res = response.json()
165         self.assertIn(
166             {'supporting-port': 'C4', 'supporting-circuit-pack-name': '4/0',
167              'logical-connection-point': 'SRG1-PP4-TXRX'},
168             res['mapping'])
169
170     def test_portmapping_SRG1_PP2_TXRX(self):
171         url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
172                "nodes/ROADMA/mapping/SRG1-PP2-TXRX")
173         headers = {'content-type': 'application/json'}
174         response = requests.request(
175             "GET", url, headers=headers, auth=('admin', 'admin'))
176         self.assertEqual(response.status_code, requests.codes.ok)
177         res = response.json()
178         self.assertIn(
179             {'supporting-port': 'C2', 'supporting-circuit-pack-name': '4/0',
180              'logical-connection-point': 'SRG1-PP2-TXRX'},
181             res['mapping'])
182
183     def test_portmapping_SRG1_PP14_TXRX(self):
184         url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
185                "nodes/ROADMA/mapping/SRG1-PP14-TXRX")
186         headers = {'content-type': 'application/json'}
187         response = requests.request(
188             "GET", url, headers=headers, auth=('admin', 'admin'))
189         self.assertEqual(response.status_code, requests.codes.ok)
190         res = response.json()
191         self.assertIn(
192             {'supporting-port': 'C14', 'supporting-circuit-pack-name': '4/0',
193              'logical-connection-point': 'SRG1-PP14-TXRX'},
194             res['mapping'])
195
196     def test_portmapping_SRG1_PP11_TXRX(self):
197         url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
198                "nodes/ROADMA/mapping/SRG1-PP11-TXRX")
199         headers = {'content-type': 'application/json'}
200         response = requests.request(
201             "GET", url, headers=headers, auth=('admin', 'admin'))
202         self.assertEqual(response.status_code, requests.codes.ok)
203         res = response.json()
204         self.assertIn(
205             {'supporting-port': 'C11', 'supporting-circuit-pack-name': '4/0',
206              'logical-connection-point': 'SRG1-PP11-TXRX'},
207             res['mapping'])
208
209     def test_portmapping_SRG1_PP7_TXRX(self):
210         url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
211                "nodes/ROADMA/mapping/SRG1-PP7-TXRX")
212         headers = {'content-type': 'application/json'}
213         response = requests.request(
214             "GET", url, headers=headers, auth=('admin', 'admin'))
215         self.assertEqual(response.status_code, requests.codes.ok)
216         res = response.json()
217         self.assertIn(
218             {'supporting-port': 'C7', 'supporting-circuit-pack-name': '4/0',
219              'logical-connection-point': 'SRG1-PP7-TXRX'},
220             res['mapping'])
221
222     def test_portmapping_DEG2_TTP_TXRX(self):
223         url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
224                "nodes/ROADMA/mapping/DEG2-TTP-TXRX")
225         headers = {'content-type': 'application/json'}
226         response = requests.request(
227             "GET", url, headers=headers, auth=('admin', 'admin'))
228         self.assertEqual(response.status_code, requests.codes.ok)
229         res = response.json()
230         self.assertIn(
231             {'supporting-port': 'L1', 'supporting-circuit-pack-name': '3/0',
232              'logical-connection-point': 'DEG2-TTP-TXRX'},
233             res['mapping'])
234
235     def test_portmapping_DEG2_TTP_TXRX(self):
236         url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
237                "nodes/ROADMA/mapping/DEG2-TTP-TXRX")
238         headers = {'content-type': 'application/json'}
239         response = requests.request(
240             "GET", url, headers=headers, auth=('admin', 'admin'))
241         self.assertEqual(response.status_code, requests.codes.ok)
242         res = response.json()
243         self.assertIn(
244             {'supporting-port': 'L1', 'supporting-circuit-pack-name': '3/0',
245              'logical-connection-point': 'DEG2-TTP-TXRX'},
246             res['mapping'])
247
248     def test_portmapping_SRG1_PP12_TXRX(self):
249         url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
250                "nodes/ROADMA/mapping/SRG1-PP12-TXRX")
251         headers = {'content-type': 'application/json'}
252         response = requests.request(
253             "GET", url, headers=headers, auth=('admin', 'admin'))
254         self.assertEqual(response.status_code, requests.codes.ok)
255         res = response.json()
256         self.assertIn(
257             {'supporting-port': 'C12', 'supporting-circuit-pack-name': '4/0',
258              'logical-connection-point': 'SRG1-PP12-TXRX'},
259             res['mapping'])
260
261     def test_portmapping_SRG1_PP8_TXRX(self):
262         url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
263                "nodes/ROADMA/mapping/SRG1-PP8-TXRX")
264         headers = {'content-type': 'application/json'}
265         response = requests.request(
266             "GET", url, headers=headers, auth=('admin', 'admin'))
267         self.assertEqual(response.status_code, requests.codes.ok)
268         res = response.json()
269         self.assertIn(
270             {'supporting-port': 'C8', 'supporting-circuit-pack-name': '4/0',
271              'logical-connection-point': 'SRG1-PP8-TXRX'},
272             res['mapping'])
273
274     def test_portmapping_SRG1_PP5_TXRX(self):
275         url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
276                "nodes/ROADMA/mapping/SRG1-PP5-TXRX")
277         headers = {'content-type': 'application/json'}
278         response = requests.request(
279             "GET", url, headers=headers, auth=('admin', 'admin'))
280         self.assertEqual(response.status_code, requests.codes.ok)
281         res = response.json()
282         self.assertIn(
283             {'supporting-port': 'C5', 'supporting-circuit-pack-name': '4/0',
284              'logical-connection-point': 'SRG1-PP5-TXRX'},
285             res['mapping'])
286
287     def test_portmapping_SRG1_PP13_TXRX(self):
288         url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
289                "nodes/ROADMA/mapping/SRG1-PP13-TXRX")
290         headers = {'content-type': 'application/json'}
291         response = requests.request(
292             "GET", url, headers=headers, auth=('admin', 'admin'))
293         self.assertEqual(response.status_code, requests.codes.ok)
294         res = response.json()
295         self.assertIn(
296             {'supporting-port': 'C13', 'supporting-circuit-pack-name': '4/0',
297              'logical-connection-point': 'SRG1-PP13-TXRX'},
298             res['mapping'])
299
300     def test_portmapping_SRG1_PP15_TXRX(self):
301         url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
302                "nodes/ROADMA/mapping/SRG1-PP15-TXRX")
303         headers = {'content-type': 'application/json'}
304         response = requests.request(
305             "GET", url, headers=headers, auth=('admin', 'admin'))
306         self.assertEqual(response.status_code, requests.codes.ok)
307         res = response.json()
308         self.assertIn(
309             {'supporting-port': 'C15', 'supporting-circuit-pack-name': '4/0',
310              'logical-connection-point': 'SRG1-PP15-TXRX'},
311             res['mapping'])
312
313     def test_portmapping_SRG1_PP10_TXRX(self):
314         url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
315                "nodes/ROADMA/mapping/SRG1-PP10-TXRX")
316         headers = {'content-type': 'application/json'}
317         response = requests.request(
318             "GET", url, headers=headers, auth=('admin', 'admin'))
319         self.assertEqual(response.status_code, requests.codes.ok)
320         res = response.json()
321         self.assertIn(
322             {'supporting-port': 'C10', 'supporting-circuit-pack-name': '4/0',
323              'logical-connection-point': 'SRG1-PP10-TXRX'},
324             res['mapping'])
325
326     def test_cross_connection_DEG1_TTP_TXRX_SRG1_PP3_TXRX(self):
327         url = "http://127.0.0.1:8181/restconf/operations/renderer:service-path"
328         data = {"renderer:input": {
329             "renderer:service-name": "service_32",
330             "renderer:wave-number": "32",
331             "renderer:operation": "create",
332             "renderer:nodes": [
333                 {"renderer:node-id": "ROADMA",
334                  "renderer:src-tp": "DEG1-TTP-TXRX",
335                  "renderer:dest-tp": "SRG1-PP3-TXRX"}]}}
336         headers = {'content-type': 'application/json'}
337         response = requests.request(
338             "POST", url, data=json.dumps(data),
339             headers=headers, auth=('admin', 'admin'))
340         self.assertEqual(response.status_code, requests.codes.ok)
341         self.assertEqual(response.json(), {
342             'output': {
343                 'result':
344                 'Roadm-connection successfully created for nodes [ROADMA]'}})
345
346     def test_cross_connection_SRG1_PP3_TXRX_DEG1_TTP_TXRX(self):
347         url = "http://127.0.0.1:8181/restconf/operations/renderer:service-path"
348         data = {"renderer:input": {
349             "renderer:service-name": "service_32",
350             "renderer:wave-number": "32",
351             "renderer:operation": "create",
352             "renderer:nodes": [
353                 {"renderer:node-id": "ROADMA",
354                  "renderer:src-tp": "SRG1-PP3-TXRX",
355                  "renderer:dest-tp": "DEG1-TTP-TXRX"}]}}
356         headers = {'content-type': 'application/json'}
357         response = requests.request(
358             "POST", url, data=json.dumps(data),
359             headers=headers, auth=('admin', 'admin'))
360         self.assertEqual(response.status_code, requests.codes.ok)
361         self.assertEqual(response.json(), {
362             'output': {
363                 'result':
364                 'Roadm-connection successfully created for nodes [ROADMA]'}})
365
366
def test_suite():
    """Return the tests of TransportPCEtesting in their required order.

    The device is connected first, its connection is verified next, then
    the port mappings are checked and finally the cross-connections are
    created.  Each test appears exactly once.
    """
    ordered_tests = (
        'test_connect_device',
        'test_device_connected',
        'test_portmapping_SRG1_PP3_TXRX',
        'test_portmapping_SRG1_PP6_TXRX',
        'test_portmapping_DEG1_TTP_TXRX',
        'test_portmapping_SRG1_PP9_TXRX',
        'test_portmapping_SRG1_PP16_TXRX',
        'test_portmapping_SRG1_PP4_TXRX',
        'test_portmapping_SRG1_PP2_TXRX',
        'test_portmapping_SRG1_PP14_TXRX',
        'test_portmapping_SRG1_PP11_TXRX',
        'test_portmapping_SRG1_PP7_TXRX',
        'test_portmapping_DEG2_TTP_TXRX',
        'test_portmapping_SRG1_PP12_TXRX',
        'test_portmapping_SRG1_PP8_TXRX',
        'test_portmapping_SRG1_PP5_TXRX',
        'test_portmapping_SRG1_PP13_TXRX',
        'test_portmapping_SRG1_PP15_TXRX',
        'test_portmapping_SRG1_PP10_TXRX',
        'test_cross_connection_DEG1_TTP_TXRX_SRG1_PP3_TXRX',
        'test_cross_connection_SRG1_PP3_TXRX_DEG1_TTP_TXRX',
    )
    suite = unittest.TestSuite()
    for test_name in ordered_tests:
        suite.addTest(TransportPCEtesting(test_name))
    return suite
394
if __name__ == "__main__":
    RUNNER = unittest.TextTestRunner(verbosity=2)
    RESULT = RUNNER.run(test_suite())
    # Report failures through the exit status so that shells and CI jobs
    # invoking this script directly can detect an unsuccessful run.
    raise SystemExit(0 if RESULT.wasSuccessful() else 1)