f6285bafb527b5a9cadd346924845d92e2a5db48
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_topoPortMapping.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import json
13 import os
14 import psutil
15 import requests
16 import signal
17 import shutil
18 import subprocess
19 import time
20 import unittest
21 import logging
22 from common import test_utils
23
24
25 class TransportPCEtesting(unittest.TestCase):
26
27     processes = None
28     restconf_baseurl = "http://localhost:8181/restconf"
29
30     @classmethod
31     def setUpClass(cls):
32         cls.processes = test_utils.start_tpce()
33         cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
34
35     @classmethod
36     def tearDownClass(cls):
37         for process in cls.processes:
38             test_utils.shutdown_process(process)
39         print("all processes killed")
40
41     def setUp(self):
42         time.sleep(10)
43
44     # Connect the ROADMA
45     def test_01_connect_rdm(self):
46         # Config ROADMA
47         url = ("{}/config/network-topology:"
48                "network-topology/topology/topology-netconf/node/ROADM-A1"
49                .format(self.restconf_baseurl))
50         data = {"node": [{
51             "node-id": "ROADM-A1",
52             "netconf-node-topology:username": "admin",
53             "netconf-node-topology:password": "admin",
54             "netconf-node-topology:host": "127.0.0.1",
55             "netconf-node-topology:port": test_utils.sims['roadma']['port'],
56             "netconf-node-topology:tcp-only": "false",
57             "netconf-node-topology:pass-through": {}}]}
58         headers = {'content-type': 'application/json'}
59         response = requests.request(
60             "PUT", url, data=json.dumps(data), headers=headers,
61             auth=('admin', 'admin'))
62         self.assertEqual(response.status_code, requests.codes.created)
63         time.sleep(20)
64
65     # Verify the termination points of the ROADMA
66     def test_02_compareOpenroadmTopologyPortMapping_rdm(self):
67         urlTopo = ("{}/config/ietf-network:networks/network/openroadm-topology"
68                    .format(self.restconf_baseurl))
69         headers = {'content-type': 'application/json'}
70         responseTopo = requests.request(
71             "GET", urlTopo, headers=headers, auth=('admin', 'admin'))
72         resTopo = responseTopo.json()
73         nbNode = len(resTopo['network'][0]['node'])
74         nbMapCumul = 0
75         nbMappings = 0
76         for i in range(0, nbNode):
77             nodeId = resTopo['network'][0]['node'][i]['node-id']
78             print("nodeId={}".format(nodeId))
79             nodeMapId = nodeId.split("-")[0] + "-" + nodeId.split("-")[1]
80             print("nodeMapId={}".format(nodeMapId))
81             urlMapList = "{}/config/transportpce-portmapping:network/nodes/" + nodeMapId
82             urlMapListFull = urlMapList.format(self.restconf_baseurl)
83             responseMapList = requests.request(
84                 "GET", urlMapListFull, headers=headers, auth=('admin', 'admin'))
85             resMapList = responseMapList.json()
86
87             nbMappings = len(resMapList['nodes'][0]['mapping']) - nbMapCumul
88             nbTp = len(resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'])
89             nbMapCurrent = 0
90             for j in range(0, nbTp):
91                 tpId = resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
92                 if((not "CP" in tpId) and (not "CTP" in tpId)):
93                     urlMap = "{}/config/transportpce-portmapping:network/nodes/" + nodeMapId + "/mapping/" + tpId
94                     urlMapFull = urlMap.format(self.restconf_baseurl)
95                     responseMap = requests.request(
96                         "GET", urlMapFull, headers=headers, auth=('admin', 'admin'))
97                     self.assertEqual(responseMap.status_code, requests.codes.ok)
98                     if(responseMap.status_code == requests.codes.ok):
99                         nbMapCurrent += 1
100             nbMapCumul += nbMapCurrent
101         nbMappings -= nbMapCurrent
102         self.assertEqual(nbMappings, 0)
103
104     # Disconnect the ROADMA
105     def test_03_disconnect_rdm(self):
106         url = ("{}/config/network-topology:"
107                "network-topology/topology/topology-netconf/node/ROADM-A1"
108                .format(self.restconf_baseurl))
109         data = {}
110         headers = {'content-type': 'application/json'}
111         response = requests.request(
112             "DELETE", url, data=json.dumps(data), headers=headers,
113             auth=('admin', 'admin'))
114         self.assertEqual(response.status_code, requests.codes.ok)
115
116 #     #Connect the XPDRA
117     def test_04_connect_xpdr(self):
118         # Config XPDRA
119         url = ("{}/config/network-topology:"
120                "network-topology/topology/topology-netconf/node/XPDR-A1"
121                .format(self.restconf_baseurl))
122         data = {"node": [{
123             "node-id": "XPDR-A1",
124             "netconf-node-topology:username": "admin",
125             "netconf-node-topology:password": "admin",
126             "netconf-node-topology:host": "127.0.0.1",
127             "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
128             "netconf-node-topology:tcp-only": "false",
129             "netconf-node-topology:pass-through": {}}]}
130         headers = {'content-type': 'application/json'}
131         response = requests.request(
132             "PUT", url, data=json.dumps(data), headers=headers,
133             auth=('admin', 'admin'))
134         self.assertEqual(response.status_code, requests.codes.created)
135         time.sleep(20)
136
137 #     #Verify the termination points related to XPDR
138     def test_05_compareOpenroadmTopologyPortMapping_xpdr(self):
139         self.test_02_compareOpenroadmTopologyPortMapping_rdm()
140
141     # Disconnect the XPDRA
142     def test_06_disconnect_device(self):
143         url = ("{}/config/network-topology:"
144                "network-topology/topology/topology-netconf/node/XPDR-A1"
145                .format(self.restconf_baseurl))
146         data = {}
147         headers = {'content-type': 'application/json'}
148         response = requests.request(
149             "DELETE", url, data=json.dumps(data), headers=headers,
150             auth=('admin', 'admin'))
151         self.assertEqual(response.status_code, requests.codes.ok)
152
153
154 if __name__ == "__main__":
155     # logging.basicConfig(filename='./transportpce_tests/log/response.log',filemode='w',level=logging.DEBUG)
156     #logging.debug('I am there')
157     unittest.main(verbosity=2)