Bump lighty.io core and add tests support for it
[transportpce.git] / tests / transportpce_tests / 1.2.1 / test_gnpy.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 #Copyright (c) 2017 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import json
13 import os
14 import psutil
15 import requests
16 import signal
17 import shutil
18 import subprocess
19 import time
20 import unittest
21 import logging
22 import test_utils
23
class TransportGNPYtesting(unittest.TestCase):
    """Functional tests checking the ``gnpy-response`` of path computation.

    All requests go to the RESTCONF server at ``restconf_baseurl`` using
    the ``admin``/``admin`` credentials.  The tests depend on each other
    and rely on unittest running them in name order:

    * 01-03 load three sample networks from ``sample_configs/gnpy``;
    * 04 posts a path-computation request and checks both directions of
      the returned ``gnpy-response``;
    * 05-07 delete the three networks again.
    """

    gnpy_process = None
    odl_process = None
    restconf_baseurl = "http://localhost:8181/restconf"

    @classmethod
    def __init_logfile(cls):
        """Delete ``transportpce_tests/gnpy.log`` if that file exists.

        The double leading underscore makes this name-mangled; nothing in
        this class calls it.
        """
        if os.path.isfile("./transportpce_tests/gnpy.log"):
            os.remove("transportpce_tests/gnpy.log")

    @classmethod
    def setUpClass(cls):
        """Start the process under test once for the whole class."""
        print("starting opendaylight...")
        # NOTE(review): start_tpce() is defined in test_utils; presumably it
        # launches the controller and returns its process handle -- confirm.
        cls.odl_process = test_utils.start_tpce()
        # Fixed grace period: nothing here polls the server for readiness.
        time.sleep(30)
        print("opendaylight started")

    @classmethod
    def tearDownClass(cls):
        """Interrupt the started process and its direct children."""
        # Children first, then the parent, each one waited for so that no
        # process is left behind when the class finishes.
        for child in psutil.Process(cls.odl_process.pid).children():
            child.send_signal(signal.SIGINT)
            child.wait()
        cls.odl_process.send_signal(signal.SIGINT)
        cls.odl_process.wait()

    def setUp(self):
        """Pause for two seconds before every test."""
        time.sleep(2)

    def _network_url(self, network_id):
        """Return the config-datastore URL of the network ``network_id``."""
        return ("{}/config/ietf-network:networks/network/{}"
                .format(self.restconf_baseurl, network_id))

    def _put_network(self, network_id, topo_file):
        """PUT the JSON stored in ``topo_file`` as network ``network_id``.

        Asserts that the server answers 200 OK, then sleeps three seconds.
        A missing or unreadable ``topo_file`` raises from ``open()``.
        """
        # Open unconditionally: guarding the read with os.path.isfile() left
        # ``body`` unbound when the file was absent, so the test failed with
        # an UnboundLocalError instead of an error naming the missing path.
        with open(topo_file, 'r') as topo:
            body = topo.read()
        headers = {'content-type': 'application/json'}
        response = requests.request(
            "PUT", self._network_url(network_id), data=body, headers=headers,
            auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
        time.sleep(3)

    def _delete_network(self, network_id):
        """DELETE network ``network_id`` and assert a 200 OK answer.

        Sleeps three seconds after the assertion.
        """
        headers = {'content-type': 'application/json'}
        response = requests.request(
            "DELETE", self._network_url(network_id), data=json.dumps({}),
            headers=headers, auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
        time.sleep(3)

    # Mount the different topologies
    def test_01_connect_clliNetwork(self):
        """Load the sample clli-network."""
        self._put_network("clli-network",
                          "sample_configs/gnpy/clliNetwork.json")

    def test_02_connect_openroadmNetwork(self):
        """Load the sample openroadm-network."""
        self._put_network("openroadm-network",
                          "sample_configs/gnpy/openroadmNetwork.json")

    def test_03_connect_openroadmTopology(self):
        """Load the sample openroadm-topology."""
        self._put_network("openroadm-topology",
                          "sample_configs/gnpy/openroadmTopology.json")

    # Test the gnpy
    def test_04_path_computation_xpdr_bi(self):
        """Request a path from XPDRA to XPDRB and check the gnpy-response.

        The response must list the A-to-Z direction first and the Z-to-A
        direction second, each with ``feasibility`` equal to ``True``.
        """
        url = ("{}/operations/transportpce-pce:path-computation-request"
               .format(self.restconf_baseurl))
        body = {
            "input": {
                "service-name": "service-1",
                "resource-reserve": "true",
                "pce-metric": "hop-count",
                "service-handler-header": {
                    "request-id": "request-1"
                },
                "service-a-end": {
                    "node-id": "XPDRA",
                    "service-rate": "100",
                    "clli": "nodeA"
                },
                "service-z-end": {
                    "node-id": "XPDRB",
                    "service-rate": "100",
                    "clli": "nodeB"
                }
            }
        }
        headers = {'content-type': 'application/json',
                   "Accept": "application/json"}
        response = requests.request(
            "POST", url, data=json.dumps(body), headers=headers,
            auth=('admin', 'admin'))
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        gnpy_responses = res['output']['gnpy-response']
        for index, direction in enumerate(('A-to-Z', 'Z-to-A')):
            self.assertEqual(gnpy_responses[index]['path-dir'], direction)
            self.assertEqual(gnpy_responses[index]['feasibility'], True)
        time.sleep(5)

    # Disconnect the different topologies
    def test_05_disconnect_openroadmTopology(self):
        """Delete the openroadm-topology."""
        self._delete_network("openroadm-topology")

    def test_06_disconnect_openroadmNetwork(self):
        """Delete the openroadm-network."""
        self._delete_network("openroadm-network")

    def test_07_disconnect_clliNetwork(self):
        """Delete the clli-network."""
        self._delete_network("clli-network")
166
if __name__ == "__main__":
    # Optional debug log of the run; left disabled by default:
    # logging.basicConfig(filename='./transportpce_tests/log/response.log',
    #                     filemode='w', level=logging.DEBUG)
    # verbosity=2 prints one result line per test method.
    unittest.main(verbosity=2)