import subprocess
import time
import unittest
+import test_utils
class TransportPCEFulltesting(unittest.TestCase):
#START_IGNORE_XTESTING
@classmethod
- def __start_honeynode1(cls):
- executable = ("./honeynode/2.2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
- "/honeynode-distribution-1.18.01/honeycomb-tpce")
- if os.path.isfile(executable):
- with open('honeynode1.log', 'w') as outfile:
- cls.honeynode_process1 = subprocess.Popen(
- [executable, "17840", "sample_configs/openroadm/2.2.1/oper-XPDRA.xml"],
- stdout=outfile)
-
- @classmethod
- def __start_honeynode2(cls):
- executable = ("./honeynode/2.2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
- "/honeynode-distribution-1.18.01/honeycomb-tpce")
- if os.path.isfile(executable):
- with open('honeynode2.log', 'w') as outfile:
- cls.honeynode_process2 = subprocess.Popen(
- [executable, "17841", "sample_configs/openroadm/2.2.1/oper-ROADMA.xml"],
- stdout=outfile)
+ def setUpClass(cls):
+ print ("starting honeynode1...")
+ cls.honeynode_process1 = test_utils.start_xpdra_honeynode()
+ time.sleep(20)
- @classmethod
- def __start_honeynode3(cls):
- executable = ("./honeynode/2.2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
- "/honeynode-distribution-1.18.01/honeycomb-tpce")
- if os.path.isfile(executable):
- with open('honeynode3.log', 'w') as outfile:
- cls.honeynode_process3 = subprocess.Popen(
- [executable, "17843", "sample_configs/openroadm/2.2.1/oper-ROADMC.xml"],
- stdout=outfile)
+ print ("starting honeynode2...")
+ cls.honeynode_process2 = test_utils.start_roadma_honeynode()
+ time.sleep(20)
- @classmethod
- def __start_honeynode4(cls):
- executable = ("./honeynode/2.2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
- "/honeynode-distribution-1.18.01/honeycomb-tpce")
- if os.path.isfile(executable):
- with open('honeynode4.log', 'w') as outfile:
- cls.honeynode_process4 = subprocess.Popen(
- [executable, "17844", "sample_configs/openroadm/2.2.1/oper-XPDRC.xml"],
- stdout=outfile)
+ print ("starting honeynode3...")
+ cls.honeynode_process3 = test_utils.start_roadmc_honeynode()
+ time.sleep(20)
- @classmethod
- def __start_odl(cls):
- executable = "../karaf/target/assembly/bin/karaf"
- with open('odl.log', 'w') as outfile:
- cls.odl_process = subprocess.Popen(
- ["bash", executable, "server"], stdout=outfile,
- stdin=open(os.devnull))
+ print ("starting honeynode4...")
+ cls.honeynode_process4 = test_utils.start_xpdrc_honeynode()
+ time.sleep(20)
+ print ("all honeynodes started")
- @classmethod
- def setUpClass(cls):
- print ("starting honeynode1")
- cls.__start_honeynode1()
- time.sleep(40)
- print ("starting honeynode2")
- cls.__start_honeynode2()
- time.sleep(40)
- print ("starting honeynode3")
- cls.__start_honeynode3()
- time.sleep(40)
- print ("starting honeynode4")
- cls.__start_honeynode4()
- time.sleep(40)
- print ("starting opendaylight")
- cls.__start_odl()
+ print ("starting opendaylight...")
+ cls.odl_process = test_utils.start_tpce()
time.sleep(80)
+ print ("opendaylight started")
@classmethod
def tearDownClass(cls):
self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
time.sleep(2)
-
def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
data = {
self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
time.sleep(2)
-
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
data = {
auth=('admin', 'admin'))
self.assertEqual(response.status_code, requests.codes.created)
-
#test service-create for Eth service from xpdr to xpdr
def test_11_create_eth_service1(self):
url = ("{}/operations/org-openroadm-service:service-create"
u'width': 40},
ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
if ele['tp-id'] == 'XPDR1-CLIENT2' or ele['tp-id'] == 'XPDR1-CLIENT1':
- self.assertNotIn('wavelength', dict.keys(ele['org-openroadm-network-topology:xpdr-client-attributes']))
+ self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
if ele['tp-id'] == 'XPDR1-NETWORK2':
- self.assertNotIn('wavelength', dict.keys(ele['org-openroadm-network-topology:xpdr-network-attributes']))
+ self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
time.sleep(3)
def test_16_check_topo_ROADMA_SRG1(self):
u'width': 40},
ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT2':
- self.assertNotIn('wavelength', dict.keys(ele['org-openroadm-network-topology:xpdr-client-attributes']))
+ self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
time.sleep(10)
def test_26_check_topo_ROADMA_SRG1(self):
self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
time.sleep(10)
-
def test_27_check_topo_ROADMA_DEG2(self):
url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-DEG2"
.format(self.restconf_baseurl))
res = response.json()
liste_tp = res['node'][0]['ietf-network-topology:termination-point']
for ele in liste_tp:
- if ele[u'org-openroadm-network-topology:tp-type'] == 'XPONDER-CLIENT':
- self.assertNotIn('wavelength', dict.keys(ele['org-openroadm-network-topology:xpdr-client-attributes']))
- elif ele[u'org-openroadm-network-topology:tp-type'] == 'XPONDER-NETWORK':
- self.assertNotIn('wavelength', dict.keys(ele['org-openroadm-network-topology:xpdr-network-attributes']))
+ if ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-CLIENT':
+ self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
+ elif (ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-NETWORK'):
+ self.assertIn(u'tail-equipment-id', dict.keys(ele[u'org-openroadm-network-topology:xpdr-network-attributes']))
+ self.assertNotIn('wavelength', dict.keys(ele[u'org-openroadm-network-topology:xpdr-network-attributes']))
time.sleep(10)
def test_34_check_topo_ROADMA_SRG1(self):
self.assertNotIn('org-openroadm-network-topology:tx-ttp-attributes', dict.keys(ele))
time.sleep(10)
-
# test service-create for Optical Channel (OC) service from srg-pp to srg-pp
def test_36_create_oc_service1(self):
url = ("{}/operations/org-openroadm-service:service-create"
self.test_27_check_topo_ROADMA_DEG2()
time.sleep(3)
-
def test_44_delete_oc_service1(self):
url = ("{}/operations/org-openroadm-service:service-delete"
.format(self.restconf_baseurl))
self.test_34_check_topo_ROADMA_SRG1()
self.test_35_check_topo_ROADMA_DEG2()
- @unittest.expectedFailure
def test_49_loop_create_eth_service(self):
for i in range(1,6):
print ("trial number {}".format(i))
print ("eth service deletion\n")
self.test_30_delete_eth_service1()
- @unittest.expectedFailure
def test_50_loop_create_oc_service(self):
url = ("{}/operational/org-openroadm-service:service-list/services/service1"
.format(self.restconf_baseurl))
print ("oc service deletion\n")
self.test_44_delete_oc_service1()
-
def test_51_disconnect_XPDRA(self):
url = ("{}/config/network-topology:"
"network-topology/topology/topology-netconf/node/XPDR-A1"