Order unit tests
[transportpce.git] / tests / transportpce_tests / test_portmapping.py
1 #!/usr/bin/env python
2
3 import json
4 import os
5 import psutil
6 import requests
7 import signal
8 import shutil
9 import subprocess
10 import time
11 import unittest
12
13
class TransportPCEtesting(unittest.TestCase):
    """Ordered functional tests for port mapping and ROADM cross-connects.

    The suite launches a netconf-testtool simulator and a Karaf instance
    once per class, then talks to the controller over RESTCONF on
    127.0.0.1:8181.  Test methods carry a numeric prefix because unittest
    runs them in alphabetical order: the device has to be mounted
    (test_01) before its state and mappings are queried.

    Test methods are documented with ``#`` comments rather than docstrings
    so that the ``verbosity=2`` report keeps showing plain method names.
    """

    # Handles of the two helper processes; None until (and unless) started.
    testtools_process = None
    odl_process = None

    # Values shared by every RESTCONF call issued by this suite.
    _restconf_root = "http://127.0.0.1:8181/restconf"
    _credentials = ('admin', 'admin')
    _json_headers = {'content-type': 'application/json'}
    _netconf_node_path = ("network-topology:network-topology/topology/"
                          "topology-netconf/node/ROADMA")

    # ------------------------------------------------------------------
    # Process management
    # ------------------------------------------------------------------

    @classmethod
    def __start_testtools(cls):
        """Start the netconf-testtool simulator if its jar is present.

        The simulator's stdout is written to ``testtools.log``.  When the
        jar cannot be found nothing is started and ``testtools_process``
        stays ``None``; ``tearDownClass`` copes with that case.
        """
        executable = ("./netconf/netconf/tools/netconf-testtool/target/"
                      "netconf-testtool-1.3.0-SNAPSHOT-executable.jar")
        if os.path.isfile(executable):
            with open('testtools.log', 'w') as outfile:
                cls.testtools_process = subprocess.Popen(
                    ["java", "-jar", executable, "--schemas-dir", "schemas",
                     "--initial-config-xml", "sample-config-ROADM.xml"],
                    stdout=outfile)

    @classmethod
    def __start_odl(cls):
        """Start Karaf with stdout in ``odl.log`` and stdin on the null device.

        Both files are closed as soon as ``Popen`` returns: the child keeps
        its own copies of the descriptors, so the parent no longer leaks
        the null-device handle for the lifetime of the test run.
        """
        executable = "../karaf/target/assembly/bin/karaf"
        with open('odl.log', 'w') as outfile, open(os.devnull) as devnull:
            cls.odl_process = subprocess.Popen(
                ["bash", executable], stdout=outfile, stdin=devnull)

    @classmethod
    def setUpClass(cls):
        """Start the simulator and the controller, then wait 30 seconds."""
        cls.__start_testtools()
        cls.__start_odl()
        # Fixed grace period before the first request is sent.
        time.sleep(30)

    @classmethod
    def tearDownClass(cls):
        """Interrupt the simulator and the controller (with its children).

        Either process may never have been started (for instance when the
        simulator jar is missing), so each one is stopped only if it
        exists.  The ``finally`` clause guarantees the controller is shut
        down even when stopping the simulator fails.
        """
        try:
            if cls.testtools_process is not None:
                cls.testtools_process.send_signal(signal.SIGINT)
                cls.testtools_process.wait()
        finally:
            if cls.odl_process is not None:
                # Karaf is launched through bash; interrupt the processes
                # it spawned before the wrapper itself.
                for child in psutil.Process(cls.odl_process.pid).children():
                    child.send_signal(signal.SIGINT)
                    child.wait()
                cls.odl_process.send_signal(signal.SIGINT)
                cls.odl_process.wait()

    def setUp(self):
        """Pause one second before every test."""
        time.sleep(1)

    # ------------------------------------------------------------------
    # Request helpers
    # ------------------------------------------------------------------

    @classmethod
    def _mapping_url(cls, logical_connection_point):
        """Return the config URL of one ROADMA port-mapping entry."""
        return "{}/config/portmapping:network/nodes/ROADMA/mapping/{}".format(
            cls._restconf_root, logical_connection_point)

    @staticmethod
    def _service_path_input(operation, src_tp, dest_tp):
        """Build the renderer:service-path input for service_32 on ROADMA."""
        return {"renderer:input": {
            "renderer:service-name": "service_32",
            "renderer:wave-number": "32",
            "renderer:operation": operation,
            "renderer:nodes": [
                {"renderer:node-id": "ROADMA",
                 "renderer:src-tp": src_tp,
                 "renderer:dest-tp": dest_tp}]}}

    def _rest(self, method, url, payload=None):
        """Send an authenticated JSON request and return the response.

        ``payload`` is serialised with ``json.dumps`` when given; requests
        without a payload are sent with no body.
        """
        body = None if payload is None else json.dumps(payload)
        return requests.request(
            method, url, data=body, headers=dict(self._json_headers),
            auth=self._credentials)

    def _check_mapping(self, logical_connection_point, port, circuit_pack):
        """Assert that a mapping entry exposes the expected port and pack."""
        response = self._rest(
            "GET", self._mapping_url(logical_connection_point))
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn(
            {'supporting-port': port,
             'supporting-circuit-pack-name': circuit_pack,
             'logical-connection-point': logical_connection_point},
            res['mapping'])

    def _check_service_path(self, operation, src_tp, dest_tp, result):
        """Invoke renderer:service-path and assert the exact RPC output."""
        url = self._restconf_root + "/operations/renderer:service-path"
        response = self._rest(
            "POST", url,
            payload=self._service_path_input(operation, src_tp, dest_tp))
        self.assertEqual(response.status_code, requests.codes.ok)
        self.assertEqual(response.json(), {'output': {'result': result}})

    # ------------------------------------------------------------------
    # Device mount
    # ------------------------------------------------------------------

    # Mount ROADMA (simulator on 127.0.0.1:17830) in the netconf topology.
    def test_01_connect_device(self):
        url = "{}/config/{}".format(
            self._restconf_root, self._netconf_node_path)
        data = {"node": [{
            "node-id": "ROADMA",
            "netconf-node-topology:username": "admin",
            "netconf-node-topology:password": "admin",
            "netconf-node-topology:host": "127.0.0.1",
            "netconf-node-topology:port": "17830",
            "netconf-node-topology:tcp-only": "false",
            "netconf-node-topology:pass-through": {}}]}
        response = self._rest("PUT", url, payload=data)
        self.assertEqual(response.status_code, requests.codes.created)
        # Wait before the following tests query the mounted node.
        time.sleep(10)

    # The operational datastore must report the mount as connected.
    def test_02_device_connected(self):
        url = "{}/operational/{}".format(
            self._restconf_root, self._netconf_node_path)
        response = self._rest("GET", url)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertEqual(
            res['node'][0]['netconf-node-topology:connection-status'],
            'connected')
        time.sleep(2)

    # ------------------------------------------------------------------
    # Port mapping: one test per logical connection point
    # ------------------------------------------------------------------

    def test_03_portmapping_SRG1_PP3_TXRX(self):
        self._check_mapping('SRG1-PP3-TXRX', 'C3', '4/0')

    def test_04_portmapping_SRG1_PP6_TXRX(self):
        self._check_mapping('SRG1-PP6-TXRX', 'C6', '4/0')

    def test_05_portmapping_DEG1_TTP_TXRX(self):
        self._check_mapping('DEG1-TTP-TXRX', 'L1', '2/0')

    def test_06_portmapping_SRG1_PP9_TXRX(self):
        self._check_mapping('SRG1-PP9-TXRX', 'C9', '4/0')

    def test_07_portmapping_SRG1_PP16_TXRX(self):
        self._check_mapping('SRG1-PP16-TXRX', 'C16', '4/0')

    def test_08_portmapping_SRG1_PP4_TXRX(self):
        self._check_mapping('SRG1-PP4-TXRX', 'C4', '4/0')

    def test_09_portmapping_SRG1_PP2_TXRX(self):
        self._check_mapping('SRG1-PP2-TXRX', 'C2', '4/0')

    def test_10_portmapping_SRG1_PP14_TXRX(self):
        self._check_mapping('SRG1-PP14-TXRX', 'C14', '4/0')

    def test_11_portmapping_SRG1_PP11_TXRX(self):
        self._check_mapping('SRG1-PP11-TXRX', 'C11', '4/0')

    def test_12_portmapping_SRG1_PP7_TXRX(self):
        self._check_mapping('SRG1-PP7-TXRX', 'C7', '4/0')

    def test_13_portmapping_DEG2_TTP_TXRX(self):
        self._check_mapping('DEG2-TTP-TXRX', 'L1', '3/0')

    # NOTE(review): this repeats test_13 verbatim (same connection point
    # and expected values); kept unchanged so the test count and ordering
    # stay as they were -- confirm whether another mapping was intended.
    def test_14_portmapping_DEG2_TTP_TXRX(self):
        self._check_mapping('DEG2-TTP-TXRX', 'L1', '3/0')

    def test_15_portmapping_SRG1_PP12_TXRX(self):
        self._check_mapping('SRG1-PP12-TXRX', 'C12', '4/0')

    def test_16_portmapping_SRG1_PP8_TXRX(self):
        self._check_mapping('SRG1-PP8-TXRX', 'C8', '4/0')

    def test_17_portmapping_SRG1_PP5_TXRX(self):
        self._check_mapping('SRG1-PP5-TXRX', 'C5', '4/0')

    def test_18_portmapping_SRG1_PP13_TXRX(self):
        self._check_mapping('SRG1-PP13-TXRX', 'C13', '4/0')

    def test_19_portmapping_SRG1_PP15_TXRX(self):
        self._check_mapping('SRG1-PP15-TXRX', 'C15', '4/0')

    def test_20_portmapping_SRG1_PP10_TXRX(self):
        self._check_mapping('SRG1-PP10-TXRX', 'C10', '4/0')

    # ------------------------------------------------------------------
    # Cross-connects: create both directions, then delete both
    # ------------------------------------------------------------------

    def test_21_cross_connection_DEG1_TTP_TXRX_SRG1_PP3_TXRX(self):
        self._check_service_path(
            "create", "DEG1-TTP-TXRX", "SRG1-PP3-TXRX",
            'Roadm-connection successfully created for nodes [ROADMA]')

    def test_22_cross_connection_SRG1_PP3_TXRX_DEG1_TTP_TXRX(self):
        self._check_service_path(
            "create", "SRG1-PP3-TXRX", "DEG1-TTP-TXRX",
            'Roadm-connection successfully created for nodes [ROADMA]')

    def test_23_delete_DEG1_TTP_TXRX_SRG1_PP3_TXRX(self):
        self._check_service_path(
            "delete", "DEG1-TTP-TXRX", "SRG1-PP3-TXRX",
            'Request processed')

    def test_24_delete_SRG1_PP3_TXRX_DEG1_TTP_TXRX(self):
        self._check_service_path(
            "delete", "SRG1-PP3-TXRX", "DEG1-TTP-TXRX",
            'Request processed')
397
398
if __name__ == "__main__":
    # Executed as a script: run every test in this module and print one
    # result line per test (verbosity level 2).
    run_options = {"verbosity": 2}
    unittest.main(**run_options)