#!/usr/bin/env python
+##############################################################################
+#Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
import json
import os
import psutil
import requests
+import signal
import shutil
import subprocess
import time
import unittest
-import zipfile
class TransportPCEtesting(unittest.TestCase):
testtools_process = None
odl_process = None
+ restconf_baseurl = "http://127.0.0.1:8181/restconf"
@classmethod
def __start_testtools(cls):
executable = ("./netconf/netconf/tools/netconf-testtool/target/"
- "netconf-testtool-1.3.0-SNAPSHOT-executable.jar")
+ "netconf-testtool-1.3.1-SNAPSHOT-executable.jar")
if os.path.isfile(executable):
with open('testtools.log', 'w') as outfile:
cls.testtools_process = subprocess.Popen(
@classmethod
def __start_odl(cls):
- zfile = "../karaf/target/transportpce-karaf-0.2.0-SNAPSHOT.zip"
- executable = "transportpce-karaf-0.2.0-SNAPSHOT/bin/karaf"
- if os.path.isfile(zfile):
- try:
- shutil.rmtree("transportpce-karaf-0.2.0-SNAPSHOT")
- except OSError:
- pass
- zipobj = zipfile.ZipFile(zfile)
- zipobj.extractall()
- with open('odl.log', 'w') as outfile:
- cls.odl_process = subprocess.Popen(
- ["bash", executable], stdout=outfile)
+ executable = "../karaf/target/assembly/bin/karaf"
+ with open('odl.log', 'w') as outfile:
+ cls.odl_process = subprocess.Popen(
+ ["bash", executable], stdout=outfile,
+ stdin=open(os.devnull))
@classmethod
def setUpClass(cls):
cls.__start_testtools()
cls.__start_odl()
- time.sleep(30)
+ time.sleep(60)
@classmethod
def tearDownClass(cls):
- cls.testtools_process.kill()
+ cls.testtools_process.send_signal(signal.SIGINT)
+ cls.testtools_process.wait()
for child in psutil.Process(cls.odl_process.pid).children():
- child.kill()
- cls.odl_process.kill()
+ child.send_signal(signal.SIGINT)
+ child.wait()
+ cls.odl_process.send_signal(signal.SIGINT)
+ cls.odl_process.wait()
def setUp(self):
time.sleep(1)
- def test_connect_device(self):
- url = ("http://127.0.0.1:8181/restconf/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADMA")
+ def test_01_connect_device(self):
+ url = ("{}/config/network-topology:"
+ "network-topology/topology/topology-netconf/node/ROADMA"
+ .format(self.restconf_baseurl))
data = {"node": [{
"node-id": "ROADMA",
"netconf-node-topology:username": "admin",
"PUT", url, data=json.dumps(data), headers=headers,
auth=('admin', 'admin'))
self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(10)
+ time.sleep(20)
- def test_device_connected(self):
- url = ("http://127.0.0.1:8181/restconf/operational/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADMA")
+ def test_02_device_connected(self):
+ url = ("{}/operational/network-topology:"
+ "network-topology/topology/topology-netconf/node/ROADMA"
+ .format(self.restconf_baseurl))
headers = {'content-type': 'application/json'}
response = requests.request(
"GET", url, headers=headers, auth=('admin', 'admin'))
self.assertEqual(
res['node'][0]['netconf-node-topology:connection-status'],
'connected')
- time.sleep(2)
+ time.sleep(10)
- def test_portmapping_SRG1_PP3_TXRX(self):
- url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
- "nodes/ROADMA/mapping/SRG1-PP3-TXRX")
+ def test_03_portmapping_SRG1_PP3_TXRX(self):
+ url = ("{}/config/portmapping:network/"
+ "nodes/ROADMA/mapping/SRG1-PP3-TXRX"
+ .format(self.restconf_baseurl))
headers = {'content-type': 'application/json'}
response = requests.request(
"GET", url, headers=headers, auth=('admin', 'admin'))
'logical-connection-point': 'SRG1-PP3-TXRX'},
res['mapping'])
- def test_portmapping_SRG1_PP6_TXRX(self):
- url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
- "nodes/ROADMA/mapping/SRG1-PP6-TXRX")
+ def test_04_portmapping_SRG1_PP6_TXRX(self):
+ url = ("{}/config/portmapping:network/"
+ "nodes/ROADMA/mapping/SRG1-PP6-TXRX"
+ .format(self.restconf_baseurl))
headers = {'content-type': 'application/json'}
response = requests.request(
"GET", url, headers=headers, auth=('admin', 'admin'))
'logical-connection-point': 'SRG1-PP6-TXRX'},
res['mapping'])
- def test_portmapping_DEG1_TTP_TXRX(self):
- url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
- "nodes/ROADMA/mapping/DEG1-TTP-TXRX")
+ def test_05_portmapping_DEG1_TTP_TXRX(self):
+ url = ("{}/config/portmapping:network/"
+ "nodes/ROADMA/mapping/DEG1-TTP-TXRX"
+ .format(self.restconf_baseurl))
headers = {'content-type': 'application/json'}
response = requests.request(
"GET", url, headers=headers, auth=('admin', 'admin'))
'logical-connection-point': 'DEG1-TTP-TXRX'},
res['mapping'])
- def test_portmapping_SRG1_PP9_TXRX(self):
- url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
- "nodes/ROADMA/mapping/SRG1-PP9-TXRX")
+ def test_06_portmapping_SRG1_PP9_TXRX(self):
+ url = ("{}/config/portmapping:network/"
+ "nodes/ROADMA/mapping/SRG1-PP9-TXRX"
+ .format(self.restconf_baseurl))
headers = {'content-type': 'application/json'}
response = requests.request(
"GET", url, headers=headers, auth=('admin', 'admin'))
'logical-connection-point': 'SRG1-PP9-TXRX'},
res['mapping'])
- def test_portmapping_SRG1_PP16_TXRX(self):
- url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
- "nodes/ROADMA/mapping/SRG1-PP16-TXRX")
+ def test_07_portmapping_SRG1_PP16_TXRX(self):
+ url = ("{}/config/portmapping:network/"
+ "nodes/ROADMA/mapping/SRG1-PP16-TXRX"
+ .format(self.restconf_baseurl))
headers = {'content-type': 'application/json'}
response = requests.request(
"GET", url, headers=headers, auth=('admin', 'admin'))
'logical-connection-point': 'SRG1-PP16-TXRX'},
res['mapping'])
- def test_portmapping_SRG1_PP4_TXRX(self):
- url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
- "nodes/ROADMA/mapping/SRG1-PP4-TXRX")
+ def test_08_portmapping_SRG1_PP4_TXRX(self):
+ url = ("{}/config/portmapping:network/"
+ "nodes/ROADMA/mapping/SRG1-PP4-TXRX"
+ .format(self.restconf_baseurl))
headers = {'content-type': 'application/json'}
response = requests.request(
"GET", url, headers=headers, auth=('admin', 'admin'))
'logical-connection-point': 'SRG1-PP4-TXRX'},
res['mapping'])
- def test_portmapping_SRG1_PP2_TXRX(self):
- url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
- "nodes/ROADMA/mapping/SRG1-PP2-TXRX")
+ def test_09_portmapping_SRG1_PP2_TXRX(self):
+ url = ("{}/config/portmapping:network/"
+ "nodes/ROADMA/mapping/SRG1-PP2-TXRX"
+ .format(self.restconf_baseurl))
headers = {'content-type': 'application/json'}
response = requests.request(
"GET", url, headers=headers, auth=('admin', 'admin'))
'logical-connection-point': 'SRG1-PP2-TXRX'},
res['mapping'])
- def test_portmapping_SRG1_PP14_TXRX(self):
- url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
- "nodes/ROADMA/mapping/SRG1-PP14-TXRX")
+ def test_10_portmapping_SRG1_PP14_TXRX(self):
+ url = ("{}/config/portmapping:network/"
+ "nodes/ROADMA/mapping/SRG1-PP14-TXRX"
+ .format(self.restconf_baseurl))
headers = {'content-type': 'application/json'}
response = requests.request(
"GET", url, headers=headers, auth=('admin', 'admin'))
'logical-connection-point': 'SRG1-PP14-TXRX'},
res['mapping'])
- def test_portmapping_SRG1_PP11_TXRX(self):
- url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
- "nodes/ROADMA/mapping/SRG1-PP11-TXRX")
+ def test_11_portmapping_SRG1_PP11_TXRX(self):
+ url = ("{}/config/portmapping:network/"
+ "nodes/ROADMA/mapping/SRG1-PP11-TXRX"
+ .format(self.restconf_baseurl))
headers = {'content-type': 'application/json'}
response = requests.request(
"GET", url, headers=headers, auth=('admin', 'admin'))
'logical-connection-point': 'SRG1-PP11-TXRX'},
res['mapping'])
- def test_portmapping_SRG1_PP7_TXRX(self):
- url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
- "nodes/ROADMA/mapping/SRG1-PP7-TXRX")
+ def test_12_portmapping_SRG1_PP7_TXRX(self):
+ url = ("{}/config/portmapping:network/"
+ "nodes/ROADMA/mapping/SRG1-PP7-TXRX"
+ .format(self.restconf_baseurl))
headers = {'content-type': 'application/json'}
response = requests.request(
"GET", url, headers=headers, auth=('admin', 'admin'))
'logical-connection-point': 'SRG1-PP7-TXRX'},
res['mapping'])
- def test_portmapping_DEG2_TTP_TXRX(self):
- url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
- "nodes/ROADMA/mapping/DEG2-TTP-TXRX")
+ def test_13_portmapping_DEG2_TTP_TXRX(self):
+ url = ("{}/config/portmapping:network/"
+ "nodes/ROADMA/mapping/DEG2-TTP-TXRX"
+ .format(self.restconf_baseurl))
headers = {'content-type': 'application/json'}
response = requests.request(
"GET", url, headers=headers, auth=('admin', 'admin'))
'logical-connection-point': 'DEG2-TTP-TXRX'},
res['mapping'])
- def test_portmapping_DEG2_TTP_TXRX(self):
- url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
- "nodes/ROADMA/mapping/DEG2-TTP-TXRX")
+ def test_14_portmapping_DEG2_TTP_TXRX(self):
+ url = ("{}/config/portmapping:network/"
+ "nodes/ROADMA/mapping/DEG2-TTP-TXRX"
+ .format(self.restconf_baseurl))
headers = {'content-type': 'application/json'}
response = requests.request(
"GET", url, headers=headers, auth=('admin', 'admin'))
'logical-connection-point': 'DEG2-TTP-TXRX'},
res['mapping'])
- def test_portmapping_SRG1_PP12_TXRX(self):
- url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
- "nodes/ROADMA/mapping/SRG1-PP12-TXRX")
+ def test_15_portmapping_SRG1_PP12_TXRX(self):
+ url = ("{}/config/portmapping:network/"
+ "nodes/ROADMA/mapping/SRG1-PP12-TXRX"
+ .format(self.restconf_baseurl))
headers = {'content-type': 'application/json'}
response = requests.request(
"GET", url, headers=headers, auth=('admin', 'admin'))
'logical-connection-point': 'SRG1-PP12-TXRX'},
res['mapping'])
- def test_portmapping_SRG1_PP8_TXRX(self):
- url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
- "nodes/ROADMA/mapping/SRG1-PP8-TXRX")
+ def test_16_portmapping_SRG1_PP8_TXRX(self):
+ url = ("{}/config/portmapping:network/"
+ "nodes/ROADMA/mapping/SRG1-PP8-TXRX"
+ .format(self.restconf_baseurl))
headers = {'content-type': 'application/json'}
response = requests.request(
"GET", url, headers=headers, auth=('admin', 'admin'))
'logical-connection-point': 'SRG1-PP8-TXRX'},
res['mapping'])
- def test_portmapping_SRG1_PP5_TXRX(self):
- url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
- "nodes/ROADMA/mapping/SRG1-PP5-TXRX")
+ def test_17_portmapping_SRG1_PP5_TXRX(self):
+ url = ("{}/config/portmapping:network/"
+ "nodes/ROADMA/mapping/SRG1-PP5-TXRX"
+ .format(self.restconf_baseurl))
headers = {'content-type': 'application/json'}
response = requests.request(
"GET", url, headers=headers, auth=('admin', 'admin'))
'logical-connection-point': 'SRG1-PP5-TXRX'},
res['mapping'])
- def test_portmapping_SRG1_PP13_TXRX(self):
- url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
- "nodes/ROADMA/mapping/SRG1-PP13-TXRX")
+ def test_18_portmapping_SRG1_PP13_TXRX(self):
+ url = ("{}/config/portmapping:network/"
+ "nodes/ROADMA/mapping/SRG1-PP13-TXRX"
+ .format(self.restconf_baseurl))
headers = {'content-type': 'application/json'}
response = requests.request(
"GET", url, headers=headers, auth=('admin', 'admin'))
'logical-connection-point': 'SRG1-PP13-TXRX'},
res['mapping'])
- def test_portmapping_SRG1_PP15_TXRX(self):
- url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
- "nodes/ROADMA/mapping/SRG1-PP15-TXRX")
+ def test_19_portmapping_SRG1_PP15_TXRX(self):
+ url = ("{}/config/portmapping:network/"
+ "nodes/ROADMA/mapping/SRG1-PP15-TXRX"
+ .format(self.restconf_baseurl))
headers = {'content-type': 'application/json'}
response = requests.request(
"GET", url, headers=headers, auth=('admin', 'admin'))
'logical-connection-point': 'SRG1-PP15-TXRX'},
res['mapping'])
- def test_portmapping_SRG1_PP10_TXRX(self):
- url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
- "nodes/ROADMA/mapping/SRG1-PP10-TXRX")
+ def test_20_portmapping_SRG1_PP10_TXRX(self):
+ url = ("{}/config/portmapping:network/"
+ "nodes/ROADMA/mapping/SRG1-PP10-TXRX"
+ .format(self.restconf_baseurl))
headers = {'content-type': 'application/json'}
response = requests.request(
"GET", url, headers=headers, auth=('admin', 'admin'))
'logical-connection-point': 'SRG1-PP10-TXRX'},
res['mapping'])
+ def test_21_cross_connection_DEG1_TTP_TXRX_SRG1_PP3_TXRX(self):
+ url = "{}/operations/renderer:service-path".format(self.restconf_baseurl)
+ data = {"renderer:input": {
+ "renderer:service-name": "service_32",
+ "renderer:wave-number": "32",
+ "renderer:operation": "create",
+ "renderer:nodes": [
+ {"renderer:node-id": "ROADMA",
+ "renderer:src-tp": "DEG1-TTP-TXRX",
+ "renderer:dest-tp": "SRG1-PP3-TXRX"}]}}
+ headers = {'content-type': 'application/json'}
+ response = requests.request(
+ "POST", url, data=json.dumps(data),
+ headers=headers, auth=('admin', 'admin'))
+ self.assertEqual(response.status_code, requests.codes.ok)
+ self.assertEqual(response.json(), {
+ 'output': {
+ 'result':
+ 'Roadm-connection successfully created for nodes [ROADMA]'}})
+
+ def test_22_cross_connection_SRG1_PP3_TXRX_DEG1_TTP_TXRX(self):
+ url = "{}/operations/renderer:service-path".format(self.restconf_baseurl)
+ data = {"renderer:input": {
+ "renderer:service-name": "service_32",
+ "renderer:wave-number": "32",
+ "renderer:operation": "create",
+ "renderer:nodes": [
+ {"renderer:node-id": "ROADMA",
+ "renderer:src-tp": "SRG1-PP3-TXRX",
+ "renderer:dest-tp": "DEG1-TTP-TXRX"}]}}
+ headers = {'content-type': 'application/json'}
+ response = requests.request(
+ "POST", url, data=json.dumps(data),
+ headers=headers, auth=('admin', 'admin'))
+ self.assertEqual(response.status_code, requests.codes.ok)
+ self.assertEqual(response.json(), {
+ 'output': {
+ 'result':
+ 'Roadm-connection successfully created for nodes [ROADMA]'}})
+
+ def test_23_delete_DEG1_TTP_TXRX_SRG1_PP3_TXRX(self):
+ url = "{}/operations/renderer:service-path".format(self.restconf_baseurl)
+ data = {"renderer:input": {
+ "renderer:service-name": "service_32",
+ "renderer:wave-number": "32",
+ "renderer:operation": "delete",
+ "renderer:nodes": [
+ {"renderer:node-id": "ROADMA",
+ "renderer:src-tp": "DEG1-TTP-TXRX",
+ "renderer:dest-tp": "SRG1-PP3-TXRX"}]}}
+ headers = {'content-type': 'application/json'}
+ response = requests.request(
+ "POST", url, data=json.dumps(data),
+ headers=headers, auth=('admin', 'admin'))
+ self.assertEqual(response.status_code, requests.codes.ok)
+ self.assertEqual(response.json(), {
+ 'output': {'result': 'Request processed'}})
+
+ def test_24_delete_SRG1_PP3_TXRX_DEG1_TTP_TXRX(self):
+ url = "{}/operations/renderer:service-path".format(self.restconf_baseurl)
+ data = {"renderer:input": {
+ "renderer:service-name": "service_32",
+ "renderer:wave-number": "32",
+ "renderer:operation": "delete",
+ "renderer:nodes": [
+ {"renderer:node-id": "ROADMA",
+ "renderer:src-tp": "SRG1-PP3-TXRX",
+ "renderer:dest-tp": "DEG1-TTP-TXRX"}]}}
+ headers = {'content-type': 'application/json'}
+ response = requests.request(
+ "POST", url, data=json.dumps(data),
+ headers=headers, auth=('admin', 'admin'))
+ self.assertEqual(response.status_code, requests.codes.ok)
+ self.assertEqual(response.json(), {
+ 'output': {'result': 'Request processed'}})
-def test_suite():
- suite = unittest.TestSuite()
- suite.addTest(TransportPCEtesting('test_connect_device'))
- suite.addTest(TransportPCEtesting('test_device_connected'))
- suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP3_TXRX'))
- suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP6_TXRX'))
- suite.addTest(TransportPCEtesting('test_portmapping_DEG1_TTP_TXRX'))
- suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP9_TXRX'))
- suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP16_TXRX'))
- suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP4_TXRX'))
- suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP2_TXRX'))
- suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP14_TXRX'))
- suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP11_TXRX'))
- suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP7_TXRX'))
- suite.addTest(TransportPCEtesting('test_portmapping_DEG2_TTP_TXRX'))
- suite.addTest(TransportPCEtesting('test_portmapping_DEG2_TTP_TXRX'))
- suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP12_TXRX'))
- suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP8_TXRX'))
- suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP5_TXRX'))
- suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP13_TXRX'))
- suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP15_TXRX'))
- suite.addTest(TransportPCEtesting('test_portmapping_SRG1_PP10_TXRX'))
- return suite
if __name__ == "__main__":
- RUNNER = unittest.TextTestRunner(verbosity=2)
- RUNNER.run(test_suite())
+ unittest.main(verbosity=2)