#!/usr/bin/env python
##############################################################################
-#Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
+# Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
##############################################################################
import json
-import os
-import psutil
-import requests
import signal
-import shutil
-import subprocess
import time
import unittest
-import logging
+import requests
+import psutil
import test_utils
+
class TransportPCEtesting(unittest.TestCase):
honeynode_process1 = None
odl_process = None
restconf_baseurl = "http://localhost:8181/restconf"
-#START_IGNORE_XTESTING
+# START_IGNORE_XTESTING
@classmethod
def setUpClass(cls):
- print ("starting honeynode1...")
+ print("starting honeynode1...")
cls.honeynode_process1 = test_utils.start_xpdra_honeynode()
time.sleep(20)
- print ("starting honeynode2...")
+ print("starting honeynode2...")
cls.honeynode_process2 = test_utils.start_roadma_honeynode()
time.sleep(20)
- print ("starting opendaylight...")
+ print("starting opendaylight...")
cls.odl_process = test_utils.start_tpce()
time.sleep(60)
- print ("opendaylight started")
+ print("opendaylight started")
@classmethod
def tearDownClass(cls):
def setUp(self):
time.sleep(10)
-#END_IGNORE_XTESTING
+# END_IGNORE_XTESTING
- #Connect the ROADMA
+ # Connect the ROADMA
def test_01_connect_rdm(self):
- #Config ROADMA
+ # Config ROADMA
url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADMA"
+ "network-topology/topology/topology-netconf/node/ROADMA01"
.format(self.restconf_baseurl))
data = {"node": [{
- "node-id": "ROADMA",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": "17831",
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
+ "node-id": "ROADMA01",
+ "netconf-node-topology:username": "admin",
+ "netconf-node-topology:password": "admin",
+ "netconf-node-topology:host": "127.0.0.1",
+ "netconf-node-topology:port": "17831",
+ "netconf-node-topology:tcp-only": "false",
+ "netconf-node-topology:pass-through": {}}]}
headers = {'content-type': 'application/json'}
response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ "PUT", url, data=json.dumps(data), headers=headers,
+ auth=('admin', 'admin'))
self.assertEqual(response.status_code, requests.codes.created)
time.sleep(20)
- #Verify the termination points of the ROADMA
+ # Verify the termination points of the ROADMA
def test_02_compareOpenroadmTopologyPortMapping_rdm(self):
urlTopo = ("{}/config/ietf-network:networks/network/openroadm-topology"
- .format(self.restconf_baseurl))
+ .format(self.restconf_baseurl))
headers = {'content-type': 'application/json'}
- responseTopo = requests.request(
- "GET", urlTopo, headers=headers, auth=('admin', 'admin'))
+ responseTopo = requests.request("GET", urlTopo, headers=headers, auth=('admin', 'admin'))
resTopo = responseTopo.json()
nbNode = len(resTopo['network'][0]['node'])
nbMapCumul = 0
nodeMapId = nodeId.split("-")[0]
urlMapList = "{}/config/transportpce-portmapping:network/nodes/" + nodeMapId
urlMapListFull = urlMapList.format(self.restconf_baseurl)
- responseMapList = requests.request(
- "GET", urlMapListFull, headers=headers, auth=('admin', 'admin'))
+ responseMapList = requests.request("GET", urlMapListFull, headers=headers, auth=('admin', 'admin'))
resMapList = responseMapList.json()
nbMappings = len(resMapList['nodes'][0]['mapping']) - nbMapCumul
for j in range(0, nbTp):
tpId = resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
if((not "CP" in tpId) and (not "CTP" in tpId)):
- urlMap = "{}/config/transportpce-portmapping:network/nodes/" + nodeMapId + "/mapping/" + tpId
- urlMapFull = urlMap.format(self.restconf_baseurl)
- responseMap = requests.request(
- "GET", urlMapFull, headers=headers, auth=('admin', 'admin'))
- self.assertEqual(responseMap.status_code, requests.codes.ok)
- if(responseMap.status_code == requests.codes.ok):
+ urlMap = "{}/config/transportpce-portmapping:network/nodes/" + nodeMapId + "/mapping/" + tpId
+ urlMapFull = urlMap.format(self.restconf_baseurl)
+ responseMap = requests.request("GET", urlMapFull, headers=headers, auth=('admin', 'admin'))
+ self.assertEqual(responseMap.status_code, requests.codes.ok)
+ if responseMap.status_code == requests.codes.ok:
nbMapCurrent += 1
nbMapCumul += nbMapCurrent
nbMappings -= nbMapCurrent
self.assertEqual(nbMappings, 0)
- #Disconnect the ROADMA
+ # Disconnect the ROADMA
def test_03_disconnect_rdm(self):
url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADMA"
+ "network-topology/topology/topology-netconf/node/ROADMA01"
.format(self.restconf_baseurl))
data = {}
headers = {'content-type': 'application/json'}
response = requests.request(
- "DELETE", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ "DELETE", url, data=json.dumps(data), headers=headers,
+ auth=('admin', 'admin'))
self.assertEqual(response.status_code, requests.codes.ok)
# #Connect the XPDRA
def test_04_connect_xpdr(self):
- #Config XPDRA
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/XPDRA"
- .format(self.restconf_baseurl))
- data = {"node": [{
- "node-id": "XPDRA",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": "17830",
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(20)
+ # Config XPDRA
+ url = ("{}/config/network-topology:"
+ "network-topology/topology/topology-netconf/node/XPDRA01"
+ .format(self.restconf_baseurl))
+ data = {"node": [{
+ "node-id": "XPDRA01",
+ "netconf-node-topology:username": "admin",
+ "netconf-node-topology:password": "admin",
+ "netconf-node-topology:host": "127.0.0.1",
+ "netconf-node-topology:port": "17830",
+ "netconf-node-topology:tcp-only": "false",
+ "netconf-node-topology:pass-through": {}}]}
+ headers = {'content-type': 'application/json'}
+ response = requests.request(
+ "PUT", url, data=json.dumps(data), headers=headers,
+ auth=('admin', 'admin'))
+ self.assertEqual(response.status_code, requests.codes.created)
+ time.sleep(20)
# #Verify the termination points related to XPDR
def test_05_compareOpenroadmTopologyPortMapping_xpdr(self):
self.test_02_compareOpenroadmTopologyPortMapping_rdm()
- #Disconnect the XPDRA
+ # Disconnect the XPDRA
def test_06_disconnect_device(self):
url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/XPDRA"
- .format(self.restconf_baseurl))
+ "network-topology/topology/topology-netconf/node/XPDRA01"
+ .format(self.restconf_baseurl))
data = {}
headers = {'content-type': 'application/json'}
response = requests.request(
auth=('admin', 'admin'))
self.assertEqual(response.status_code, requests.codes.ok)
+
if __name__ == "__main__":
- #logging.basicConfig(filename='./transportpce_tests/log/response.log',filemode='w',level=logging.DEBUG)
+ # logging.basicConfig(filename='./transportpce_tests/log/response.log',filemode='w',level=logging.DEBUG)
#logging.debug('I am there')
unittest.main(verbosity=2)