fix some pylint issues
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_topoPortMapping.py
index f6285bafb527b5a9cadd346924845d92e2a5db48..66c0c23d328fb0c271dd87877a6942e735e8472b 100644 (file)
@@ -9,23 +9,18 @@
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
 
-import json
-import os
-import psutil
-import requests
-import signal
-import shutil
-import subprocess
-import time
+# pylint: disable=no-member
+# pylint: disable=too-many-public-methods
+
 import unittest
-import logging
+import time
+import requests
 from common import test_utils
 
 
 class TransportPCEtesting(unittest.TestCase):
 
     processes = None
-    restconf_baseurl = "http://localhost:8181/restconf"
 
     @classmethod
     def setUpClass(cls):
@@ -34,6 +29,7 @@ class TransportPCEtesting(unittest.TestCase):
 
     @classmethod
     def tearDownClass(cls):
+        # pylint: disable=not-an-iterable
         for process in cls.processes:
             test_utils.shutdown_process(process)
         print("all processes killed")
@@ -43,32 +39,12 @@ class TransportPCEtesting(unittest.TestCase):
 
     # Connect the ROADMA
     def test_01_connect_rdm(self):
-        # Config ROADMA
-        url = ("{}/config/network-topology:"
-               "network-topology/topology/topology-netconf/node/ROADM-A1"
-               .format(self.restconf_baseurl))
-        data = {"node": [{
-            "node-id": "ROADM-A1",
-            "netconf-node-topology:username": "admin",
-            "netconf-node-topology:password": "admin",
-            "netconf-node-topology:host": "127.0.0.1",
-            "netconf-node-topology:port": test_utils.sims['roadma']['port'],
-            "netconf-node-topology:tcp-only": "false",
-            "netconf-node-topology:pass-through": {}}]}
-        headers = {'content-type': 'application/json'}
-        response = requests.request(
-            "PUT", url, data=json.dumps(data), headers=headers,
-            auth=('admin', 'admin'))
-        self.assertEqual(response.status_code, requests.codes.created)
-        time.sleep(20)
+        response = test_utils.mount_device("ROADM-A1", 'roadma')
+        self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
 
     # Verify the termination points of the ROADMA
     def test_02_compareOpenroadmTopologyPortMapping_rdm(self):
-        urlTopo = ("{}/config/ietf-network:networks/network/openroadm-topology"
-                   .format(self.restconf_baseurl))
-        headers = {'content-type': 'application/json'}
-        responseTopo = requests.request(
-            "GET", urlTopo, headers=headers, auth=('admin', 'admin'))
+        responseTopo = test_utils.get_ordm_topo_request("")
         resTopo = responseTopo.json()
         nbNode = len(resTopo['network'][0]['node'])
         nbMapCumul = 0
@@ -78,10 +54,7 @@ class TransportPCEtesting(unittest.TestCase):
             print("nodeId={}".format(nodeId))
             nodeMapId = nodeId.split("-")[0] + "-" + nodeId.split("-")[1]
             print("nodeMapId={}".format(nodeMapId))
-            urlMapList = "{}/config/transportpce-portmapping:network/nodes/" + nodeMapId
-            urlMapListFull = urlMapList.format(self.restconf_baseurl)
-            responseMapList = requests.request(
-                "GET", urlMapListFull, headers=headers, auth=('admin', 'admin'))
+            responseMapList = test_utils.portmapping_request(nodeMapId)
             resMapList = responseMapList.json()
 
             nbMappings = len(resMapList['nodes'][0]['mapping']) - nbMapCumul
@@ -89,13 +62,10 @@ class TransportPCEtesting(unittest.TestCase):
             nbMapCurrent = 0
             for j in range(0, nbTp):
                 tpId = resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
-                if((not "CP" in tpId) and (not "CTP" in tpId)):
-                    urlMap = "{}/config/transportpce-portmapping:network/nodes/" + nodeMapId + "/mapping/" + tpId
-                    urlMapFull = urlMap.format(self.restconf_baseurl)
-                    responseMap = requests.request(
-                        "GET", urlMapFull, headers=headers, auth=('admin', 'admin'))
+                if (not "CP" in tpId) and (not "CTP" in tpId):
+                    responseMap = test_utils.portmapping_request(nodeMapId+"/mapping/"+tpId)
                     self.assertEqual(responseMap.status_code, requests.codes.ok)
-                    if(responseMap.status_code == requests.codes.ok):
+                    if responseMap.status_code == requests.codes.ok:
                         nbMapCurrent += 1
             nbMapCumul += nbMapCurrent
         nbMappings -= nbMapCurrent
@@ -103,36 +73,13 @@ class TransportPCEtesting(unittest.TestCase):
 
     # Disconnect the ROADMA
     def test_03_disconnect_rdm(self):
-        url = ("{}/config/network-topology:"
-               "network-topology/topology/topology-netconf/node/ROADM-A1"
-               .format(self.restconf_baseurl))
-        data = {}
-        headers = {'content-type': 'application/json'}
-        response = requests.request(
-            "DELETE", url, data=json.dumps(data), headers=headers,
-            auth=('admin', 'admin'))
-        self.assertEqual(response.status_code, requests.codes.ok)
+        response = test_utils.unmount_device("ROADM-A1")
+        self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
 
 #     #Connect the XPDRA
     def test_04_connect_xpdr(self):
-        # Config XPDRA
-        url = ("{}/config/network-topology:"
-               "network-topology/topology/topology-netconf/node/XPDR-A1"
-               .format(self.restconf_baseurl))
-        data = {"node": [{
-            "node-id": "XPDR-A1",
-            "netconf-node-topology:username": "admin",
-            "netconf-node-topology:password": "admin",
-            "netconf-node-topology:host": "127.0.0.1",
-            "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
-            "netconf-node-topology:tcp-only": "false",
-            "netconf-node-topology:pass-through": {}}]}
-        headers = {'content-type': 'application/json'}
-        response = requests.request(
-            "PUT", url, data=json.dumps(data), headers=headers,
-            auth=('admin', 'admin'))
-        self.assertEqual(response.status_code, requests.codes.created)
-        time.sleep(20)
+        response = test_utils.mount_device("XPDR-A1", 'xpdra')
+        self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
 
 #     #Verify the termination points related to XPDR
     def test_05_compareOpenroadmTopologyPortMapping_xpdr(self):
@@ -140,15 +87,8 @@ class TransportPCEtesting(unittest.TestCase):
 
     # Disconnect the XPDRA
     def test_06_disconnect_device(self):
-        url = ("{}/config/network-topology:"
-               "network-topology/topology/topology-netconf/node/XPDR-A1"
-               .format(self.restconf_baseurl))
-        data = {}
-        headers = {'content-type': 'application/json'}
-        response = requests.request(
-            "DELETE", url, data=json.dumps(data), headers=headers,
-            auth=('admin', 'admin'))
-        self.assertEqual(response.status_code, requests.codes.ok)
+        response = test_utils.unmount_device("XPDR-A1")
+        self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
 
 
 if __name__ == "__main__":