-class TransportPCEFulltesting(unittest.TestCase):
-
- odl_process = None
- honeynode_process1 = None
- honeynode_process2 = None
- honeynode_process3 = None
- honeynode_process4 = None
- restconf_baseurl = "http://localhost:8181/restconf"
-
-#START_IGNORE_XTESTING
-
- @classmethod
- def __start_honeynode1(cls):
- executable = ("./honeynode/2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
- "/honeynode-distribution-1.18.01/honeycomb-tpce")
- if os.path.isfile(executable):
- with open('honeynode1.log', 'w') as outfile:
- cls.honeynode_process1 = subprocess.Popen(
- [executable, "17840", "sample_configs/openroadm/2.1/oper-ROADMA-full.xml"],
- stdout=outfile)
-
- @classmethod
- def __start_honeynode2(cls):
- executable = ("./honeynode/2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
- "/honeynode-distribution-1.18.01/honeycomb-tpce")
- if os.path.isfile(executable):
- with open('honeynode2.log', 'w') as outfile:
- cls.honeynode_process2 = subprocess.Popen(
- [executable, "17831", "sample_configs/openroadm/2.1/oper-XPDRA.xml"],
- stdout=outfile)
-
- @classmethod
- def __start_honeynode3(cls):
- executable = ("./honeynode/2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
- "/honeynode-distribution-1.18.01/honeycomb-tpce")
- if os.path.isfile(executable):
- with open('honeynode3.log', 'w') as outfile:
- cls.honeynode_process3 = subprocess.Popen(
- [executable, "17843", "sample_configs/openroadm/2.1/oper-ROADMC-full.xml"],
- stdout=outfile)
-
- @classmethod
- def __start_honeynode4(cls):
- executable = ("./honeynode/2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
- "/honeynode-distribution-1.18.01/honeycomb-tpce")
- if os.path.isfile(executable):
- with open('honeynode4.log', 'w') as outfile:
- cls.honeynode_process4 = subprocess.Popen(
- [executable, "17834", "sample_configs/openroadm/2.1/oper-XPDRC.xml"],
- stdout=outfile)
-
- @classmethod
- def __start_odl(cls):
- executable = "../karaf/target/assembly/bin/karaf"
- with open('odl.log', 'w') as outfile:
- cls.odl_process = subprocess.Popen(
- ["bash", executable, "server"], stdout=outfile,
- stdin=open(os.devnull))
-
- @classmethod
- def setUpClass(cls):
- print ("starting honeynode1")
- cls.__start_honeynode1()
- time.sleep(40)
- print ("starting honeynode2")
- cls.__start_honeynode2()
- time.sleep(40)
- print ("starting honeynode3")
- cls.__start_honeynode3()
- time.sleep(40)
- print ("starting honeynode4")
- cls.__start_honeynode4()
- time.sleep(40)
- print ("starting opendaylight")
- cls.__start_odl()
- time.sleep(80)
-
- @classmethod
- def tearDownClass(cls):
- for child in psutil.Process(cls.odl_process.pid).children():
- child.send_signal(signal.SIGINT)
- child.wait()
- cls.odl_process.send_signal(signal.SIGINT)
- cls.odl_process.wait()
- for child in psutil.Process(cls.honeynode_process1.pid).children():
- child.send_signal(signal.SIGINT)
- child.wait()
- cls.honeynode_process1.send_signal(signal.SIGINT)
- cls.honeynode_process1.wait()
- for child in psutil.Process(cls.honeynode_process2.pid).children():
- child.send_signal(signal.SIGINT)
- child.wait()
- cls.honeynode_process2.send_signal(signal.SIGINT)
- cls.honeynode_process2.wait()
- for child in psutil.Process(cls.honeynode_process3.pid).children():
- child.send_signal(signal.SIGINT)
- child.wait()
- cls.honeynode_process3.send_signal(signal.SIGINT)
- cls.honeynode_process3.wait()
- for child in psutil.Process(cls.honeynode_process4.pid).children():
- child.send_signal(signal.SIGINT)
- child.wait()
- cls.honeynode_process4.send_signal(signal.SIGINT)
- cls.honeynode_process4.wait()
- print ("all processes killed")
-
- def setUp(self): # instruction executed before each test method
- print ("execution of {}".format(self.id().split(".")[-1]))
-
-#END_IGNORE_XTESTING
-
-# connect netconf devices
- def test_01_connect_xpdrA(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/XPDRA"
- .format(self.restconf_baseurl))
- data = {"node": [{
- "node-id": "XPDRA",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": "17831",
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(20)
-
- def test_02_connect_xpdrC(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/XPDRC"
- .format(self.restconf_baseurl))
- data = {"node": [{
- "node-id": "XPDRC",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": "17834",
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(20)
-
- def test_03_connect_rdmA(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADMA"
- .format(self.restconf_baseurl))
- data = {"node": [{
- "node-id": "ROADMA",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": "17840",
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(20)
-
- def test_04_connect_rdmC(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADMC"
- .format(self.restconf_baseurl))
- data = {"node": [{
- "node-id": "ROADMC",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": "17843",
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(20)
-
- def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
- url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
- data = {
- "networkutils:input": {
- "networkutils:links-input": {
- "networkutils:xpdr-node": "XPDRA",
- "networkutils:xpdr-num": "1",
- "networkutils:network-num": "1",
- "networkutils:rdm-node": "ROADMA",
- "networkutils:srg-num": "1",
- "networkutils:termination-point-num": "SRG1-PP1-TXRX"
- }
- }
- }
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
- time.sleep(2)
-
- def test_06_connect_xprdA_N2_to_roadmA_PP2(self):
- url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
- data = {
- "networkutils:input": {
- "networkutils:links-input": {
- "networkutils:xpdr-node": "XPDRA",
- "networkutils:xpdr-num": "1",
- "networkutils:network-num": "2",
- "networkutils:rdm-node": "ROADMA",
- "networkutils:srg-num": "1",
- "networkutils:termination-point-num": "SRG1-PP2-TXRX"
- }
- }
- }
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
- time.sleep(2)
-
- def test_07_connect_roadmA_PP1_to_xpdrA_N1(self):
- url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
- data = {
- "networkutils:input": {
- "networkutils:links-input": {
- "networkutils:xpdr-node": "XPDRA",
- "networkutils:xpdr-num": "1",
- "networkutils:network-num": "1",
- "networkutils:rdm-node": "ROADMA",
- "networkutils:srg-num": "1",
- "networkutils:termination-point-num": "SRG1-PP1-TXRX"
- }
- }
- }
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
- time.sleep(2)
-
- def test_08_connect_roadmA_PP2_to_xpdrA_N2(self):
- url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
- data = {
- "networkutils:input": {
- "networkutils:links-input": {
- "networkutils:xpdr-node": "XPDRA",
- "networkutils:xpdr-num": "1",
- "networkutils:network-num": "2",
- "networkutils:rdm-node": "ROADMA",
- "networkutils:srg-num": "1",
- "networkutils:termination-point-num": "SRG1-PP2-TXRX"
- }
- }
- }
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
- time.sleep(2)
-
- def test_09_connect_xprdC_N1_to_roadmC_PP1(self):
- url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
- data = {
- "networkutils:input": {
- "networkutils:links-input": {
- "networkutils:xpdr-node": "XPDRC",
- "networkutils:xpdr-num": "1",
- "networkutils:network-num": "1",
- "networkutils:rdm-node": "ROADMC",
- "networkutils:srg-num": "1",
- "networkutils:termination-point-num": "SRG1-PP1-TXRX"
- }
- }
- }
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
- time.sleep(2)