+++ /dev/null
-../common/
\ No newline at end of file
+++ /dev/null
-#!/usr/bin/env python
-##############################################################################
-# Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import os
-
-SIMS = {
- 'xpdra': {'port': '17830', 'configfile': 'oper-XPDRA.xml', 'logfile': 'oper-XPDRA.log'},
- 'roadma': {'port': '17831', 'configfile': 'oper-ROADMA.xml', 'logfile': 'oper-ROADMA.log'},
- 'roadmb': {'port': '17832', 'configfile': 'oper-ROADMB.xml', 'logfile': 'oper-ROADMB.log'},
- 'roadmc': {'port': '17833', 'configfile': 'oper-ROADMC.xml', 'logfile': 'oper-ROADMC.log'},
- 'xpdrc': {'port': '17834', 'configfile': 'oper-XPDRC.xml', 'logfile': 'oper-XPDRC.log'},
- 'roadma-full': {'port': '17821', 'configfile': 'oper-ROADMA-full.xml', 'logfile': 'oper-ROADMA.log'},
- 'roadmc-full': {'port': '17823', 'configfile': 'oper-ROADMC-full.xml', 'logfile': 'oper-ROADMC.log'}
-}
-
-HONEYNODE_EXECUTABLE = os.path.join(
- os.path.dirname(os.path.realpath(__file__)),
- "..", "..", "honeynode", "1.2.1", "honeynode-simulator", "honeycomb-tpce")
-SAMPLES_DIRECTORY = os.path.join(
- os.path.dirname(os.path.realpath(__file__)),
- "..", "..", "sample_configs", "openroadm", "1.2.1")
import base64
import time
import unittest
-
import requests
-from common import test_utils
+import sys
+sys.path.append('transportpce_tests/common/')
+import test_utils
class TransportPCEFulltesting(unittest.TestCase):
cr_serv_sample_data = {"input": {
}
processes = None
WAITING = 20
+ NODE_VERSION = '1.2.1'
@classmethod
def setUpClass(cls):
cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims(['xpdra', 'roadma-full', 'roadmc-full', 'xpdrc'])
+ cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
+ ('roadma-full', cls.NODE_VERSION),
+ ('roadmc-full', cls.NODE_VERSION),
+ ('xpdrc', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
# connect netconf devices
def test_01_connect_xpdrA(self):
- response = test_utils.mount_device("XPDRA01", 'xpdra')
+ response = test_utils.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_connect_xpdrC(self):
- response = test_utils.mount_device("XPDRC01", 'xpdrc')
+ response = test_utils.mount_device("XPDRC01", ('xpdrc', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
- response = test_utils.mount_device("ROADMA01", 'roadma-full')
+ response = test_utils.mount_device("ROADMA01", ('roadma-full', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
- response = test_utils.mount_device("ROADMC01", 'roadmc-full')
+ response = test_utils.mount_device("ROADMC01", ('roadmc-full', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
import sys
import time
import requests
-from common import test_utils
+sys.path.append('transportpce_tests/common/')
+import test_utils
class TransportGNPYtesting(unittest.TestCase):
import unittest
import time
import requests
-from common import test_utils
+import sys
+sys.path.append('transportpce_tests/common/')
+import test_utils
class TransportOlmTesting(unittest.TestCase):
processes = None
+ NODE_VERSION = '1.2.1'
@classmethod
def setUpClass(cls):
cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims(['xpdra', 'roadma-full', 'roadmc-full', 'xpdrc'])
+ cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
+ ('roadma-full', cls.NODE_VERSION),
+ ('roadmc-full', cls.NODE_VERSION),
+ ('xpdrc', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
time.sleep(1)
def test_01_xpdrA_device_connected(self):
- response = test_utils.mount_device("XPDRA01", 'xpdra')
+ response = test_utils.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_xpdrC_device_connected(self):
- response = test_utils.mount_device("XPDRC01", 'xpdrc')
+ response = test_utils.mount_device("XPDRC01", ('xpdrc', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_03_rdmA_device_connected(self):
- response = test_utils.mount_device("ROADMA01", 'roadma-full')
+ response = test_utils.mount_device("ROADMA01", ('roadma-full', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_04_rdmC_device_connected(self):
- response = test_utils.mount_device("ROADMC01", 'roadmc-full')
+ response = test_utils.mount_device("ROADMC01", ('roadmc-full', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_to_roadmA(self):
import sys
import time
import requests
-from common import test_utils
+sys.path.append('transportpce_tests/common/')
+import test_utils
class TransportPCEtesting(unittest.TestCase):
import unittest
import time
import requests
-from common import test_utils
+import sys
+sys.path.append('transportpce_tests/common/')
+import test_utils
class TransportPCEPortMappingTesting(unittest.TestCase):
processes = None
+ NODE_VERSION = '1.2.1'
@classmethod
def setUpClass(cls):
cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
+ cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
print("execution of {}".format(self.id().split(".")[-1]))
time.sleep(10)
-# def test_01_restconfAPI(self):
-# response = test_utils.get_netconf_oper_request("controller-config")
-# self.assertEqual(response.status_code, requests.codes.ok)
-# res = response.json()
-# self.assertEqual(res['node'] [0] ['netconf-node-topology:connection-status'],
-# 'connected')
-
-# def test_02_restconfAPI(self):
-# response = test_utils.portmapping_request("controller-config")
-# self.assertEqual(response.status_code, requests.codes.not_found)
-# res = response.json()
-# self.assertIn(
-# {"error-type":"application", "error-tag":"data-missing",
-# "error-message":"Request could not be completed because the relevant data model content does not exist "},
-# res['errors']['error'])
-
def test_01_rdm_device_connection(self):
- response = test_utils.mount_device("ROADMA01", 'roadma')
+ response = test_utils.mount_device("ROADMA01", ('roadma', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_rdm_device_connected(self):
res['mapping'])
def test_07_xpdr_device_connection(self):
- response = test_utils.mount_device("XPDRA01", 'xpdra')
+ response = test_utils.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_08_xpdr_device_connected(self):
import unittest
import time
import requests
-from common import test_utils
-
+import sys
+sys.path.append('transportpce_tests/common/')
+import test_utils
class TransportPCERendererTesting(unittest.TestCase):
processes = None
+ NODE_VERSION = '1.2.1'
@classmethod
def setUpClass(cls):
cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
+ cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
time.sleep(10)
def test_01_rdm_device_connected(self):
- response = test_utils.mount_device("ROADMA01", 'roadma')
+ response = test_utils.mount_device("ROADMA01", ('roadma', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_xpdr_device_connected(self):
- response = test_utils.mount_device("XPDRA01", 'xpdra')
+ response = test_utils.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_03_rdm_portmapping(self):
import time
import unittest
import requests
-from common import test_utils
+import sys
+sys.path.append('transportpce_tests/common/')
+import test_utils
class TransportPCEtesting(unittest.TestCase):
processes = None
+ NODE_VERSION = '1.2.1'
@classmethod
def setUpClass(cls):
cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
+ cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
# Connect the ROADMA
def test_01_connect_rdm(self):
- response = test_utils.mount_device("ROADMA01", 'roadma')
+ response = test_utils.mount_device("ROADMA01", ('roadma', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
# Verify the termination points of the ROADMA
# #Connect the XPDRA
def test_04_connect_xpdr(self):
- response = test_utils.mount_device("XPDRA01", 'xpdra')
+ response = test_utils.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
# #Verify the termination points related to XPDR
import time
import unittest
import requests
-from common import test_utils
+import sys
+sys.path.append('transportpce_tests/common/')
+import test_utils
class TransportPCETopologyTesting(unittest.TestCase):
processes = None
+ NODE_VERSION = '1.2.1'
CHECK_DICT1 = {
'ROADMA01-SRG1': {
'node_type': 'SRG',
@classmethod
def setUpClass(cls):
cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmb', 'roadmc'])
+ cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION),
+ ('roadmb', cls.NODE_VERSION), ('roadmc', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
time.sleep(5)
def test_01_connect_ROADMA(self):
- response = test_utils.mount_device("ROADMA01", 'roadma')
+ response = test_utils.mount_device("ROADMA01", ('roadma', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_getClliNetwork(self):
self.assertEqual(len(listNode), 0)
def test_06_connect_XPDRA(self):
- response = test_utils.mount_device("XPDRA01", 'xpdra')
+ response = test_utils.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_07_getClliNetwork(self):
self.assertEqual(len(check_list[link_type]), 0)
def test_13_connect_ROADMC(self):
- response = test_utils.mount_device("ROADMC01", 'roadmc')
+ response = test_utils.mount_device("ROADMC01", ('roadmc', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_14_omsAttributes_ROADMA_ROADMC(self):
self.assertEqual(len(listNode), 0)
def test_21_connect_ROADMB(self):
- response = test_utils.mount_device("ROADMB01", 'roadmb')
+ response = test_utils.mount_device("ROADMB01", ('roadmb', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_22_omsAttributes_ROADMA_ROADMB(self):
+++ /dev/null
-../common/
\ No newline at end of file
+++ /dev/null
-#!/usr/bin/env python
-##############################################################################
-# Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-
-import os
-
-SIMS = {
- 'xpdra': {'port': '17840', 'configfile': 'oper-XPDRA.xml', 'logfile': 'oper-XPDRA.log'},
- 'roadma': {'port': '17841', 'configfile': 'oper-ROADMA.xml', 'logfile': 'oper-ROADMA.log'},
- 'roadmb': {'port': '17842', 'configfile': 'oper-ROADMB.xml', 'logfile': 'oper-ROADMB.log'},
- 'roadmc': {'port': '17843', 'configfile': 'oper-ROADMC.xml', 'logfile': 'oper-ROADMC.log'},
- 'roadmd': {'port': '17847', 'configfile': 'oper-ROADMD.xml', 'logfile': 'oper-ROADMD.log'},
- 'xpdrc': {'port': '17844', 'configfile': 'oper-XPDRC.xml', 'logfile': 'oper-XPDRC.log'},
- 'spdra': {'port': '17845', 'configfile': 'oper-SPDRA.xml', 'logfile': 'oper-SPDRA.log'},
- 'spdrc': {'port': '17846', 'configfile': 'oper-SPDRC.xml', 'logfile': 'oper-SPDRC.log'}
-}
-
-HONEYNODE_EXECUTABLE = os.path.join(
- os.path.dirname(os.path.realpath(__file__)),
- "..", "..", "honeynode", "2.2.1", "honeynode-simulator", "honeycomb-tpce")
-SAMPLES_DIRECTORY = os.path.join(
- os.path.dirname(os.path.realpath(__file__)),
- "..", "..", "sample_configs", "openroadm", "2.2.1")
import unittest
import time
import requests
-from common import test_utils
+import sys
+sys.path.append('transportpce_tests/common/')
+import test_utils
class TransportPCEFulltesting(unittest.TestCase):
}
WAITING = 20 # nominal value is 300
+ NODE_VERSION = '2.2.1'
@classmethod
def setUpClass(cls):
cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmc', 'xpdrc'])
+ cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
+ ('roadma', cls.NODE_VERSION),
+ ('roadmc', cls.NODE_VERSION),
+ ('xpdrc', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
print("execution of {}".format(self.id().split(".")[-1]))
def test_01_connect_xpdrA(self):
- response = test_utils.mount_device("XPDR-A1", 'xpdra')
+ response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_connect_xpdrC(self):
- response = test_utils.mount_device("XPDR-C1", 'xpdrc')
+ response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
- response = test_utils.mount_device("ROADM-A1", 'roadma')
+ response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
- response = test_utils.mount_device("ROADM-C1", 'roadmc')
+ response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
import unittest
import time
import requests
-from common import test_utils
+import sys
+sys.path.append('transportpce_tests/common/')
+import test_utils
class TransportPCEPortMappingTesting(unittest.TestCase):
processes = None
+ NODE_VERSION = '2.2.1'
@classmethod
def setUpClass(cls):
cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims(['roadmd'])
+ cls.processes = test_utils.start_sims([('roadmd', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
time.sleep(10)
def test_01_rdm_device_connection(self):
- response = test_utils.mount_device("ROADM-D1", 'roadmd')
+ response = test_utils.mount_device("ROADM-D1", ('roadmd', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created,
test_utils.CODE_SHOULD_BE_201)
import unittest
import time
import requests
-from common import test_utils
+sys.path.append('transportpce_tests/common/')
+import test_utils
class TransportNbiNotificationstesting(unittest.TestCase):
}
WAITING = 20 # nominal value is 300
+ NODE_VERSION = '2.2.1'
@classmethod
def setUpClass(cls):
print("NBI notification installation feature failed...")
test_utils.shutdown_process(cls.processes[0])
sys.exit(2)
- cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmc', 'xpdrc'])
+ cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
+ ('roadma', cls.NODE_VERSION),
+ ('roadmc', cls.NODE_VERSION),
+ ('xpdrc', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
print("execution of {}".format(self.id().split(".")[-1]))
def test_01_connect_xpdrA(self):
- response = test_utils.mount_device("XPDR-A1", 'xpdra')
+ response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_connect_xpdrC(self):
- response = test_utils.mount_device("XPDR-C1", 'xpdrc')
+ response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
- response = test_utils.mount_device("ROADM-A1", 'roadma')
+ response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
- response = test_utils.mount_device("ROADM-C1", 'roadmc')
+ response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
import unittest
import time
import requests
-from common import test_utils
+import sys
+sys.path.append('transportpce_tests/common/')
+import test_utils
class TransportOlmTesting(unittest.TestCase):
processes = None
+ NODE_VERSION = '2.2.1'
@classmethod
def setUpClass(cls):
cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmc', 'xpdrc'])
+ cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
+ ('roadma', cls.NODE_VERSION),
+ ('roadmc', cls.NODE_VERSION),
+ ('xpdrc', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
time.sleep(1)
def test_01_xpdrA_device_connected(self):
- response = test_utils.mount_device("XPDR-A1", 'xpdra')
+ response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_xpdrC_device_connected(self):
- response = test_utils.mount_device("XPDR-C1", 'xpdrc')
+ response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_03_rdmA_device_connected(self):
- response = test_utils.mount_device("ROADM-A1", 'roadma')
+ response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_04_rdmC_device_connected(self):
- response = test_utils.mount_device("ROADM-C1", 'roadmc')
+ response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_to_roadmA(self):
import unittest
import time
import requests
-from common import test_utils
+import sys
+sys.path.append('transportpce_tests/common/')
+import test_utils
class TransportPCEtesting(unittest.TestCase):
processes = None
WAITING = 20 # nominal value is 300
+ NODE_VERSION = '2.2.1'
cr_serv_sample_data = {"input": {
"sdnc-request-header": {
@classmethod
def setUpClass(cls):
cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims(
- ['spdra', 'roadma', 'roadmc', 'spdrc'])
+ cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
+ ('roadma', cls.NODE_VERSION),
+ ('roadmc', cls.NODE_VERSION),
+ ('spdrc', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
time.sleep(5)
def test_01_connect_spdrA(self):
- response = test_utils.mount_device("SPDR-SA1", 'spdra')
+ response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
self.assertEqual(response.status_code,
requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_connect_spdrC(self):
- response = test_utils.mount_device("SPDR-SC1", 'spdrc')
+ response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
self.assertEqual(response.status_code,
requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
- response = test_utils.mount_device("ROADM-A1", 'roadma')
+ response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
self.assertEqual(response.status_code,
requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
- response = test_utils.mount_device("ROADM-C1", 'roadmc')
+ response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
self.assertEqual(response.status_code,
requests.codes.created, test_utils.CODE_SHOULD_BE_201)
import unittest
import time
import requests
-from common import test_utils
+import sys
+sys.path.append('transportpce_tests/common/')
+import test_utils
class TransportPCEtesting(unittest.TestCase):
'lcp-hash-val': 'Swfw02qXGyI=',
'port-admin-state': 'InService',
'port-oper-state': 'InService'}
+ NODE_VERSION = '2.2.1'
@classmethod
def setUpClass(cls):
cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims(['spdra'])
+ cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
time.sleep(5)
def test_01_connect_SPDR_SA1(self):
- response = test_utils.mount_device("SPDR-SA1", 'spdra')
+ response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
time.sleep(10)
import unittest
import time
import requests
-from common import test_utils
+import sys
+sys.path.append('transportpce_tests/common/')
+import test_utils
class TransportPCEtesting(unittest.TestCase):
processes = None
+ NODE_VERSION = '2.2.1'
@classmethod
def setUpClass(cls):
cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims(['spdra', 'spdrc'])
+ cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
+ ('spdrc', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
time.sleep(5)
def test_01_connect_SPDR_SA1(self):
- response = test_utils.mount_device("SPDR-SA1", 'spdra')
+ response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created,
test_utils.CODE_SHOULD_BE_201)
time.sleep(10)
'connected')
def test_02_connect_SPDR_SC1(self):
- response = test_utils.mount_device("SPDR-SC1", 'spdrc')
+ response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created,
test_utils.CODE_SHOULD_BE_201)
time.sleep(10)
import time
import logging
import requests
-from common import test_utils
+import sys
+sys.path.append('transportpce_tests/common/')
+import test_utils
class TransportPCEtesting(unittest.TestCase):
processes = None
+ NODE_VERSION = '2.2.1'
@classmethod
def setUpClass(cls):
cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims(['spdra'])
+ cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
time.sleep(5)
def test_01_connect_SPDR_SA1(self):
- response = test_utils.mount_device("SPDR-SA1", 'spdra')
+ response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
time.sleep(10)
import unittest
import time
import requests
-from common import test_utils
+import sys
+sys.path.append('transportpce_tests/common/')
+import test_utils
class TransportPCEPortMappingTesting(unittest.TestCase):
processes = None
+ NODE_VERSION = '2.2.1'
@classmethod
def setUpClass(cls):
cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
+ cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
time.sleep(10)
def test_01_rdm_device_connection(self):
- response = test_utils.mount_device("ROADM-A1", 'roadma')
+ response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_rdm_device_connected(self):
res['mapping'])
def test_08_xpdr_device_connection(self):
- response = test_utils.mount_device("XPDR-A1", 'xpdra')
+ response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_09_xpdr_device_connected(self):
import unittest
#from unittest.result import failfast
import requests
-from common import test_utils
+import sys
+sys.path.append('transportpce_tests/common/')
+import test_utils
class TransportPCERendererTesting(unittest.TestCase):
processes = None
+ NODE_VERSION = '2.2.1'
@classmethod
def setUpClass(cls):
cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
+ cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
print("all processes killed")
def test_01_rdm_device_connected(self):
- response = test_utils.mount_device("ROADM-A1", 'roadma')
+ response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_xpdr_device_connected(self):
- response = test_utils.mount_device("XPDR-A1", 'xpdra')
+ response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_03_rdm_portmapping(self):
import sys
import time
import unittest
-
import requests
-
-from common import test_utils
+sys.path.append('transportpce_tests/common/')
+import test_utils
CREATED_SUCCESSFULLY = 'Result message should contain Xponder Roadm Link created successfully'
processes = None
WAITING = 20
+ NODE_VERSION = '2.2.1'
cr_serv_sample_data = {"input": {
"sdnc-request-header": {
"request-id": "request-1",
print("tapi installation feature failed...")
test_utils.shutdown_process(cls.processes[0])
sys.exit(2)
- cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmb', 'roadmc', 'xpdrc', 'spdra', 'spdrc'])
+ cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
+ ('roadma', cls.NODE_VERSION),
+ ('roadmb', cls.NODE_VERSION),
+ ('roadmc', cls.NODE_VERSION),
+ ('xpdrc', cls.NODE_VERSION),
+ ('spdra', cls.NODE_VERSION),
+ ('spdrc', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
self.assertNotIn("link", res["output"]["topology"], 'Topology should contain no link')
def test_03_connect_rdmb(self):
- response = test_utils.mount_device("ROADM-B1", 'roadmb')
+ response = test_utils.mount_device("ROADM-B1", ('roadmb', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
time.sleep(10)
time.sleep(5)
def test_06_connect_xpdra(self):
- response = test_utils.mount_device("XPDR-A1", 'xpdra')
+ response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
time.sleep(10)
self.test_04_check_tapi_topos()
def test_08_connect_rdma(self):
- response = test_utils.mount_device("ROADM-A1", 'roadma')
+ response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
time.sleep(10)
def test_09_connect_rdmc(self):
- response = test_utils.mount_device("ROADM-C1", 'roadmc')
+ response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
time.sleep(10)
'Topology should contain 1 oms link')
def test_15_connect_xpdrc(self):
- response = test_utils.mount_device("XPDR-C1", 'xpdrc')
+ response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
time.sleep(10)
'Topology should contain 2 oms links')
def test_20_connect_spdr_sa1(self):
- response = test_utils.mount_device("SPDR-SA1", 'spdra')
+ response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
time.sleep(10)
# TODO replace connect and disconnect timers with test_utils.wait_until_log_contains
def test_21_connect_spdr_sc1(self):
- response = test_utils.mount_device("SPDR-SC1", 'spdrc')
+ response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
time.sleep(10)
# TODO replace connect and disconnect timers with test_utils.wait_until_log_contains
import unittest
import time
import requests
-from common import test_utils
+import sys
+sys.path.append('transportpce_tests/common/')
+import test_utils
class TransportPCEtesting(unittest.TestCase):
processes = None
+ NODE_VERSION = '2.2.1'
@classmethod
def setUpClass(cls):
cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
+ cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
# Connect the ROADMA
def test_01_connect_rdm(self):
- response = test_utils.mount_device("ROADM-A1", 'roadma')
+ response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
# Verify the termination points of the ROADMA
# #Connect the XPDRA
def test_04_connect_xpdr(self):
- response = test_utils.mount_device("XPDR-A1", 'xpdra')
+ response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
# #Verify the termination points related to XPDR
import time
import logging
import requests
-from common import test_utils
+import sys
+sys.path.append('transportpce_tests/common/')
+import test_utils
class TransportPCEtesting(unittest.TestCase):
'org-openroadm-common-network:operational-state': 'inService'})]
}
}
+ NODE_VERSION = '2.2.1'
@classmethod
def setUpClass(cls):
cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmb', 'roadmc'])
+ cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION),
+ ('roadmb', cls.NODE_VERSION), ('roadmc', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
time.sleep(5)
def test_01_connect_ROADM_A1(self):
- response = test_utils.mount_device("ROADM-A1", 'roadma')
+ response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_getClliNetwork(self):
self.assertEqual(len(listNode), 0)
def test_06_connect_XPDRA(self):
- response = test_utils.mount_device("XPDR-A1", 'xpdra')
+ response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_07_getClliNetwork(self):
self.assertEqual(len(check_list[link_type]), 0)
def test_13_connect_ROADMC(self):
- response = test_utils.mount_device("ROADM-C1", 'roadmc')
+ response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_14_omsAttributes_ROADMA_ROADMC(self):
self.assertEqual(len(listNode), 0)
def test_21_connect_ROADMB(self):
- response = test_utils.mount_device("ROADM-B1", 'roadmb')
+ response = test_utils.mount_device("ROADM-B1", ('roadmb', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_22_omsAttributes_ROADMA_ROADMB(self):
+++ /dev/null
-../common/
\ No newline at end of file
+++ /dev/null
-#!/usr/bin/env python
-##############################################################################
-# Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-
-import os
-
-SIMS = {}
-
-HONEYNODE_EXECUTABLE = os.path.join(
- os.path.dirname(os.path.realpath(__file__)),
- "..", "..", "honeynode", "7.1", "honeynode-simulator", "honeycomb-tpce")
-SAMPLES_DIRECTORY = os.path.join(
- os.path.dirname(os.path.realpath(__file__)),
- "..", "..", "sample_configs", "openroadm", "7.1")
import sys
import time
import requests
-from common import test_utils
+sys.path.append('transportpce_tests/common/')
+import test_utils
class TransportPCE400Gtesting(unittest.TestCase):
+++ /dev/null
-../simulators.py
\ No newline at end of file
--- /dev/null
+#!/usr/bin/env python
+##############################################################################
+# Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import os
+
+SIMS = {
+ ('xpdra', '1.2.1'): {'port': '17830', 'configfile': 'oper-XPDRA.xml', 'logfile': 'xpdra-121.log'},
+ ('roadma', '1.2.1'): {'port': '17831', 'configfile': 'oper-ROADMA.xml', 'logfile': 'roadma-121.log'},
+ ('roadmb', '1.2.1'): {'port': '17832', 'configfile': 'oper-ROADMB.xml', 'logfile': 'roadmb-121.log'},
+ ('roadmc', '1.2.1'): {'port': '17833', 'configfile': 'oper-ROADMC.xml', 'logfile': 'roadmc-121.log'},
+ ('xpdrc', '1.2.1'): {'port': '17834', 'configfile': 'oper-XPDRC.xml', 'logfile': 'xpdrc-121.log'},
+ ('roadma-full', '1.2.1'): {'port': '17821', 'configfile': 'oper-ROADMA-full.xml', 'logfile': 'roadma-121.log'},
+ ('roadmc-full', '1.2.1'): {'port': '17823', 'configfile': 'oper-ROADMC-full.xml', 'logfile': 'roadmc-121.log'},
+ ('xpdra', '2.2.1'): {'port': '17840', 'configfile': 'oper-XPDRA.xml', 'logfile': 'xpdra-221.log'},
+ ('roadma', '2.2.1'): {'port': '17841', 'configfile': 'oper-ROADMA.xml', 'logfile': 'roadma-221.log'},
+ ('roadmb', '2.2.1'): {'port': '17842', 'configfile': 'oper-ROADMB.xml', 'logfile': 'roadmb-221.log'},
+ ('roadmc', '2.2.1'): {'port': '17843', 'configfile': 'oper-ROADMC.xml', 'logfile': 'roadmc-221.log'},
+ ('roadmd', '2.2.1'): {'port': '17847', 'configfile': 'oper-ROADMD.xml', 'logfile': 'roadmd-221.log'},
+ ('xpdrc', '2.2.1'): {'port': '17844', 'configfile': 'oper-XPDRC.xml', 'logfile': 'xpdrc-221.log'},
+ ('spdra', '2.2.1'): {'port': '17845', 'configfile': 'oper-SPDRA.xml', 'logfile': 'spdra-221.log'},
+ ('spdrc', '2.2.1'): {'port': '17846', 'configfile': 'oper-SPDRC.xml', 'logfile': 'spdrc-221.log'},
+ ('xpdra', '7.1'): {'port': '17850', 'configfile': 'oper-XPDRA.xml', 'logfile': 'xpdra-71.log'},
+ ('roadma', '7.1'): {'port': '17851', 'configfile': 'oper-ROADMA.xml', 'logfile': 'roadma-71.log'},
+ ('roadmb', '7.1'): {'port': '17852', 'configfile': 'oper-ROADMB.xml', 'logfile': 'roadmb-71.log'},
+ ('roadmc', '7.1'): {'port': '17853', 'configfile': 'oper-ROADMC.xml', 'logfile': 'roadmc-71.log'},
+ ('xpdrc', '7.1'): {'port': '17854', 'configfile': 'oper-XPDRC.xml', 'logfile': 'xpdrc-71.log'},
+ ('xpdra2', '7.1'): {'port': '17857', 'configfile': 'oper-XPDRA2.xml', 'logfile': 'xpdra2-71.log'},
+ ('xpdrc2', '7.1'): {'port': '17858', 'configfile': 'oper-XPDRC2.xml', 'logfile': 'xpdrc2-71.log'}
+}
\ No newline at end of file
import simulators
SIMS = simulators.SIMS
-HONEYNODE_EXECUTABLE = simulators.HONEYNODE_EXECUTABLE
-SAMPLES_DIRECTORY = simulators.SAMPLES_DIRECTORY
HONEYNODE_OK_START_MSG = "Netconf SSH endpoint started successfully at 0.0.0.0"
KARAF_OK_START_MSG = re.escape(
CODE_SHOULD_BE_200 = 'Http status code should be 200'
CODE_SHOULD_BE_201 = 'Http status code should be 201'
-LOG_DIRECTORY = os.path.dirname(os.path.realpath(__file__))
-
+SIM_LOG_DIRECTORY = os.path.join(os.path.dirname(os.path.realpath(__file__)), "log")
KARAF_LOG = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"..", "..", "..", "karaf", "target", "assembly", "data", "log", "karaf.log")
def start_sims(sims_list):
for sim in sims_list:
- print("starting simulator for " + sim + "...")
- log_file = os.path.join(LOG_DIRECTORY, SIMS[sim]['logfile'])
- process = start_honeynode(log_file, SIMS[sim]['port'], SIMS[sim]['configfile'])
- if wait_until_log_contains(log_file, HONEYNODE_OK_START_MSG, 200):
- print("simulator for " + sim + " started")
+ print("starting simulator " + sim[0] + " in OpenROADM device version " + sim[1] + "...")
+ log_file = os.path.join(SIM_LOG_DIRECTORY, SIMS[sim]['logfile'])
+ process = start_honeynode(log_file, sim)
+ if wait_until_log_contains(log_file, HONEYNODE_OK_START_MSG, 100):
+ print("simulator for " + sim[0] + " started")
else:
- print("simulator for " + sim + " failed to start")
+ print("simulator for " + sim[0] + " failed to start")
shutdown_process(process)
for pid in process_list:
shutdown_process(pid)
process.send_signal(signal.SIGINT)
-def start_honeynode(log_file: str, node_port: str, node_config_file_name: str):
- if os.path.isfile(HONEYNODE_EXECUTABLE):
+def start_honeynode(log_file: str, sim):
+ executable = os.path.join(os.path.dirname(os.path.realpath(__file__)),
+ "..", "..", "honeynode", sim[1], "honeynode-simulator", "honeycomb-tpce")
+ sample_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)),
+ "..", "..", "sample_configs", "openroadm", sim[1])
+ if os.path.isfile(executable):
with open(log_file, 'w') as outfile:
return subprocess.Popen(
- [HONEYNODE_EXECUTABLE, node_port, os.path.join(SAMPLES_DIRECTORY, node_config_file_name)],
+ [executable, SIMS[sim]['port'], os.path.join(sample_directory, SIMS[sim]['configfile'])],
stdout=outfile, stderr=outfile)
return None
def __exit__(self, type, value, traceback):
# pylint: disable=W0622
signal.alarm(0)
-