#!/usr/bin/env python ############################################################################## # Copyright (c) 2017 Orange, Inc. and others. All rights reserved. # # All rights reserved. This program and the accompanying materials # are made available under the terms of the Apache License, Version 2.0 # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## import json import os import psutil import requests import signal import shutil import subprocess import time import unittest import logging import test_utils class TransportPCEtesting(unittest.TestCase): processes = None restconf_baseurl = "http://localhost:8181/restconf" @classmethod def setUpClass(cls): cls.processes = test_utils.start_tpce() cls.processes = test_utils.start_sims(['xpdra', 'roadma']) @classmethod def tearDownClass(cls): for process in cls.processes: test_utils.shutdown_process(process) print("all processes killed") def setUp(self): time.sleep(10) # Connect the ROADMA def test_01_connect_rdm(self): # Config ROADMA url = ("{}/config/network-topology:" "network-topology/topology/topology-netconf/node/ROADM-A1" .format(self.restconf_baseurl)) data = {"node": [{ "node-id": "ROADM-A1", "netconf-node-topology:username": "admin", "netconf-node-topology:password": "admin", "netconf-node-topology:host": "127.0.0.1", "netconf-node-topology:port": test_utils.sims['roadma']['port'], "netconf-node-topology:tcp-only": "false", "netconf-node-topology:pass-through": {}}]} headers = {'content-type': 'application/json'} response = requests.request( "PUT", url, data=json.dumps(data), headers=headers, auth=('admin', 'admin')) self.assertEqual(response.status_code, requests.codes.created) time.sleep(20) # Verify the termination points of the ROADMA def test_02_compareOpenroadmTopologyPortMapping_rdm(self): urlTopo = ("{}/config/ietf-network:networks/network/openroadm-topology" .format(self.restconf_baseurl)) headers = {'content-type': 'application/json'} responseTopo = requests.request( "GET", urlTopo, headers=headers, auth=('admin', 'admin')) resTopo = responseTopo.json() nbNode = len(resTopo['network'][0]['node']) nbMapCumul = 0 nbMappings = 0 for i in range(0, nbNode): nodeId = resTopo['network'][0]['node'][i]['node-id'] print("nodeId={}".format(nodeId)) nodeMapId = nodeId.split("-")[0] + "-" + nodeId.split("-")[1] print("nodeMapId={}".format(nodeMapId)) urlMapList = "{}/config/transportpce-portmapping:network/nodes/" + nodeMapId urlMapListFull = urlMapList.format(self.restconf_baseurl) responseMapList = requests.request( "GET", urlMapListFull, headers=headers, auth=('admin', 'admin')) resMapList = responseMapList.json() nbMappings = len(resMapList['nodes'][0]['mapping']) - nbMapCumul nbTp = len(resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point']) nbMapCurrent = 0 for j in range(0, nbTp): tpId = resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id'] if((not "CP" in tpId) and (not "CTP" in tpId)): urlMap = "{}/config/transportpce-portmapping:network/nodes/" + nodeMapId + "/mapping/" + tpId urlMapFull = urlMap.format(self.restconf_baseurl) responseMap = requests.request( "GET", urlMapFull, headers=headers, auth=('admin', 'admin')) self.assertEqual(responseMap.status_code, requests.codes.ok) if(responseMap.status_code == requests.codes.ok): nbMapCurrent += 1 nbMapCumul += nbMapCurrent nbMappings -= nbMapCurrent self.assertEqual(nbMappings, 0) # Disconnect the ROADMA def test_03_disconnect_rdm(self): url = ("{}/config/network-topology:" "network-topology/topology/topology-netconf/node/ROADM-A1" .format(self.restconf_baseurl)) data = {} headers = {'content-type': 'application/json'} response = requests.request( "DELETE", url, data=json.dumps(data), headers=headers, auth=('admin', 'admin')) self.assertEqual(response.status_code, requests.codes.ok) # #Connect the XPDRA def test_04_connect_xpdr(self): # Config XPDRA url = ("{}/config/network-topology:" "network-topology/topology/topology-netconf/node/XPDR-A1" .format(self.restconf_baseurl)) data = {"node": [{ "node-id": "XPDR-A1", "netconf-node-topology:username": "admin", "netconf-node-topology:password": "admin", "netconf-node-topology:host": "127.0.0.1", "netconf-node-topology:port": test_utils.sims['xpdra']['port'], "netconf-node-topology:tcp-only": "false", "netconf-node-topology:pass-through": {}}]} headers = {'content-type': 'application/json'} response = requests.request( "PUT", url, data=json.dumps(data), headers=headers, auth=('admin', 'admin')) self.assertEqual(response.status_code, requests.codes.created) time.sleep(20) # #Verify the termination points related to XPDR def test_05_compareOpenroadmTopologyPortMapping_xpdr(self): self.test_02_compareOpenroadmTopologyPortMapping_rdm() # Disconnect the XPDRA def test_06_disconnect_device(self): url = ("{}/config/network-topology:" "network-topology/topology/topology-netconf/node/XPDR-A1" .format(self.restconf_baseurl)) data = {} headers = {'content-type': 'application/json'} response = requests.request( "DELETE", url, data=json.dumps(data), headers=headers, auth=('admin', 'admin')) self.assertEqual(response.status_code, requests.codes.ok) if __name__ == "__main__": # logging.basicConfig(filename='./transportpce_tests/log/response.log',filemode='w',level=logging.DEBUG) #logging.debug('I am there') unittest.main(verbosity=2)