import json
-import signal
import time
import unittest
-import psutil
import requests
-import test_utils
+from common import test_utils
class TransportPCEFulltesting(unittest.TestCase):
- headers = {'content-type': 'application/json'}
- odl_process = None
- honeynode_process1 = None
- honeynode_process2 = None
- honeynode_process3 = None
- honeynode_process4 = None
- restconf_baseurl = "http://localhost:8181/restconf"
+
WAITING = 20 # nominal value is 300
- # START_IGNORE_XTESTING
+ processes = None
@classmethod
def setUpClass(cls):
- print("starting honeynode1...")
- cls.honeynode_process1 = test_utils.start_xpdra_honeynode()
- time.sleep(20)
-
- print("starting honeynode2...")
- cls.honeynode_process2 = test_utils.start_roadma_full_honeynode()
- time.sleep(20)
-
- print("starting honeynode3...")
- cls.honeynode_process3 = test_utils.start_roadmc_full_honeynode()
- time.sleep(20)
-
- print("starting honeynode4...")
- cls.honeynode_process4 = test_utils.start_xpdrc_honeynode()
- time.sleep(20)
- print("all honeynodes started")
-
- print("starting opendaylight...")
- cls.odl_process = test_utils.start_tpce()
- time.sleep(80)
- print("opendaylight started")
+ cls.processes = test_utils.start_tpce()
+ cls.processes = test_utils.start_sims(['xpdra', 'roadma-full', 'roadmc-full', 'xpdrc'])
@classmethod
def tearDownClass(cls):
- for child in psutil.Process(cls.odl_process.pid).children():
- child.send_signal(signal.SIGINT)
- child.wait()
- cls.odl_process.send_signal(signal.SIGINT)
- cls.odl_process.wait()
- for child in psutil.Process(cls.honeynode_process1.pid).children():
- child.send_signal(signal.SIGINT)
- child.wait()
- cls.honeynode_process1.send_signal(signal.SIGINT)
- cls.honeynode_process1.wait()
- for child in psutil.Process(cls.honeynode_process2.pid).children():
- child.send_signal(signal.SIGINT)
- child.wait()
- cls.honeynode_process2.send_signal(signal.SIGINT)
- cls.honeynode_process2.wait()
- for child in psutil.Process(cls.honeynode_process3.pid).children():
- child.send_signal(signal.SIGINT)
- child.wait()
- cls.honeynode_process3.send_signal(signal.SIGINT)
- cls.honeynode_process3.wait()
- for child in psutil.Process(cls.honeynode_process4.pid).children():
- child.send_signal(signal.SIGINT)
- child.wait()
- cls.honeynode_process4.send_signal(signal.SIGINT)
- cls.honeynode_process4.wait()
+ for process in cls.processes:
+ test_utils.shutdown_process(process)
print("all processes killed")
def setUp(self): # instruction executed before each test method
print("execution of {}".format(self.id().split(".")[-1]))
- # END_IGNORE_XTESTING
-
# connect netconf devices
def test_01_connect_xpdrA(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/XPDRA01"
- .format(self.restconf_baseurl))
- data = {"node": [{
- "node-id": "XPDRA01",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": "17830",
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(20)
+ response = test_utils.mount_device("XPDRA01", 'xpdra')
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_connect_xpdrC(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/XPDRC01"
- .format(self.restconf_baseurl))
- data = {"node": [{
- "node-id": "XPDRC01",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": "17834",
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(20)
+ response = test_utils.mount_device("XPDRC01", 'xpdrc')
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADMA01"
- .format(self.restconf_baseurl))
- data = {"node": [{
- "node-id": "ROADMA01",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": "17821",
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(20)
+ response = test_utils.mount_device("ROADMA01", 'roadma-full')
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADMC01"
- .format(self.restconf_baseurl))
- data = {"node": [{
- "node-id": "ROADMC01",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": "17823",
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(20)
+ response = test_utils.mount_device("ROADMC01", 'roadmc-full')
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
- url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".\
- format(self.restconf_baseurl)
- data = {
- "networkutils:input": {
- "networkutils:links-input": {
- "networkutils:xpdr-node": "XPDRA01",
- "networkutils:xpdr-num": "1",
- "networkutils:network-num": "1",
- "networkutils:rdm-node": "ROADMA01",
- "networkutils:srg-num": "1",
- "networkutils:termination-point-num": "SRG1-PP1-TXRX"
- }
- }
- }
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "1",
+ "ROADMA01", "1", "SRG1-PP1-TXRX")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Xponder Roadm Link created successfully',
time.sleep(2)
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
- url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".\
- format(self.restconf_baseurl)
+ url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links"
data = {
"networkutils:input": {
"networkutils:links-input": {
}
}
}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ response = test_utils.post_request(url, data)
+ response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "1",
+ "ROADMA01", "1", "SRG1-PP1-TXRX")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Roadm Xponder links created successfully',
time.sleep(2)
def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
- url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".\
- format(self.restconf_baseurl)
- data = {
- "networkutils:input": {
- "networkutils:links-input": {
- "networkutils:xpdr-node": "XPDRC01",
- "networkutils:xpdr-num": "1",
- "networkutils:network-num": "1",
- "networkutils:rdm-node": "ROADMC01",
- "networkutils:srg-num": "1",
- "networkutils:termination-point-num": "SRG1-PP1-TXRX"
- }
- }
- }
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ response = test_utils.connect_xpdr_to_rdm_request("XPDRC01", "1", "1",
+ "ROADMC01", "1", "SRG1-PP1-TXRX")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Xponder Roadm Link created successfully',
time.sleep(2)
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
- url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".\
- format(self.restconf_baseurl)
- data = {
- "networkutils:input": {
- "networkutils:links-input": {
- "networkutils:xpdr-node": "XPDRC01",
- "networkutils:xpdr-num": "1",
- "networkutils:network-num": "1",
- "networkutils:rdm-node": "ROADMC01",
- "networkutils:srg-num": "1",
- "networkutils:termination-point-num": "SRG1-PP1-TXRX"
- }
- }
- }
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ response = test_utils.connect_rdm_to_xpdr_request("XPDRC01", "1", "1",
+ "ROADMC01", "1", "SRG1-PP1-TXRX")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Roadm Xponder links created successfully',
"link/ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX/"
"org-openroadm-network-topology:"
"OMS-attributes/span"
- .format(self.restconf_baseurl))
+ )
data = {"span": {
"clfi": "fiber1",
"auto-spanloss": "true",
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ response = test_utils.put_request(url, data)
self.assertEqual(response.status_code, requests.codes.created)
def test_10_add_omsAttributes_ROADMC_ROADMA(self):
"link/ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX/"
"org-openroadm-network-topology:"
"OMS-attributes/span"
- .format(self.restconf_baseurl))
+ )
data = {"span": {
"clfi": "fiber1",
"auto-spanloss": "true",
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ response = test_utils.put_request(url, data)
self.assertEqual(response.status_code, requests.codes.created)
# test service-create for Eth service from xpdr to xpdr
def test_11_create_eth_service1(self):
- url = ("{}/operations/org-openroadm-service:service-create"
- .format(self.restconf_baseurl))
+ url = "{}/operations/org-openroadm-service:service-create"
data = {
"input": {
"sdnc-request-header": {
"operator-contact": "pw1234"
}
}
- headers = {'content-type': 'application/json',
- "Accept": "application/json"}
- response = requests.request(
- "POST", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ response = test_utils.post_request(url, data)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('PCE calculation in progress',
time.sleep(self.WAITING)
def test_12_get_eth_service1(self):
- url = ("{}/operational/org-openroadm-service:service-list/services/"
- "service1".format(self.restconf_baseurl))
- headers = {'content-type': 'application/json',
- "Accept": "application/json"}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ url = "{}/operational/org-openroadm-service:service-list/services/service1"
+ response = test_utils.get_request(url)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(
time.sleep(2)
def test_13_check_xc1_ROADMA(self):
- url = (
- "{}/config/network-topology:"
- "network-topology/topology/topology-netconf/"
- "node/ROADMA01/yang-ext:"
- "mount/org-openroadm-device:org-openroadm-device/"
- "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
time.sleep(5)
def test_14_check_xc1_ROADMC(self):
- url = (
- "{}/config/network-topology:"
- "network-topology/topology/topology-netconf/"
- "node/ROADMC01/yang-ext:mount/org-openroadm-device:"
- "org-openroadm-device/"
- "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.check_netconf_node_request("ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
time.sleep(5)
def test_15_check_topo_XPDRA(self):
- url1 = (
- "{}/config/ietf-network:"
- "networks/network/openroadm-topology/node/XPDRA01-XPDR1"
- .format(self.restconf_baseurl))
- response = requests.request(
- "GET", url1, auth=('admin', 'admin'))
+ url = "{}/config/ietf-network:networks/network/openroadm-topology/node/XPDRA01-XPDR1"
+ response = test_utils.get_request(url)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
liste_tp = res['node'][0]['ietf-network-topology:termination-point']
for ele in liste_tp:
if ele['tp-id'] == 'XPDR1-NETWORK1':
self.assertEqual({u'frequency': 196.1, u'width': 40},
- ele[
- 'org-openroadm-network-topology:'
- 'xpdr-network-attributes'][
- 'wavelength'])
- if ele['tp-id'] == 'XPDR1-CLIENT1' or \
- ele['tp-id'] == 'XPDR1-CLIENT3':
+ ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
+ if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT3':
self.assertNotIn(
'org-openroadm-network-topology:xpdr-client-attributes',
dict.keys(ele))
time.sleep(3)
def test_16_check_topo_ROADMA_SRG1(self):
- url1 = (
- "{}/config/ietf-network:"
- "networks/network/openroadm-topology/node/ROADMA01-SRG1"
- .format(self.restconf_baseurl))
- response = requests.request(
- "GET", url1, auth=('admin', 'admin'))
+ url = "{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA01-SRG1"
+ response = test_utils.get_request(url)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertNotIn({u'index': 1},
time.sleep(3)
def test_17_check_topo_ROADMA_DEG1(self):
- url1 = (
- "{}/config/ietf-network:"
- "networks/network/openroadm-topology/node/ROADMA01-DEG1"
- .format(self.restconf_baseurl))
- response = requests.request(
- "GET", url1, auth=('admin', 'admin'))
+ url = "{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA01-DEG1"
+ response = test_utils.get_request(url)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertNotIn({u'index': 1},
time.sleep(3)
def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
- url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".\
- format(self.restconf_baseurl)
- data = {
- "networkutils:input": {
- "networkutils:links-input": {
- "networkutils:xpdr-node": "XPDRA01",
- "networkutils:xpdr-num": "1",
- "networkutils:network-num": "2",
- "networkutils:rdm-node": "ROADMA01",
- "networkutils:srg-num": "1",
- "networkutils:termination-point-num": "SRG1-PP2-TXRX"
- }
- }
- }
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "2",
+ "ROADMA01", "1", "SRG1-PP2-TXRX")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Xponder Roadm Link created successfully',
time.sleep(2)
def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
- url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".\
- format(self.restconf_baseurl)
- data = {
- "networkutils:input": {
- "networkutils:links-input": {
- "networkutils:xpdr-node": "XPDRA01",
- "networkutils:xpdr-num": "1",
- "networkutils:network-num": "2",
- "networkutils:rdm-node": "ROADMA01",
- "networkutils:srg-num": "1",
- "networkutils:termination-point-num": "SRG1-PP2-TXRX"
- }
- }
- }
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "2",
+ "ROADMA01", "1", "SRG1-PP2-TXRX")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Roadm Xponder links created successfully',
time.sleep(2)
def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
- url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".\
- format(self.restconf_baseurl)
- data = {
- "networkutils:input": {
- "networkutils:links-input": {
- "networkutils:xpdr-node": "XPDRC01",
- "networkutils:xpdr-num": "1",
- "networkutils:network-num": "2",
- "networkutils:rdm-node": "ROADMC01",
- "networkutils:srg-num": "1",
- "networkutils:termination-point-num": "SRG1-PP2-TXRX"
- }
- }
- }
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ response = test_utils.connect_xpdr_to_rdm_request("XPDRC01", "1", "2",
+ "ROADMC01", "1", "SRG1-PP2-TXRX")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Xponder Roadm Link created successfully',
time.sleep(2)
def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
- url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".\
- format(self.restconf_baseurl)
- data = {
- "networkutils:input": {
- "networkutils:links-input": {
- "networkutils:xpdr-node": "XPDRC01",
- "networkutils:xpdr-num": "1",
- "networkutils:network-num": "2",
- "networkutils:rdm-node": "ROADMC01",
- "networkutils:srg-num": "1",
- "networkutils:termination-point-num": "SRG1-PP2-TXRX"
- }
- }
- }
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ response = test_utils.connect_rdm_to_xpdr_request("XPDRC01", "1", "2",
+ "ROADMC01", "1", "SRG1-PP2-TXRX")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Roadm Xponder links created successfully',
time.sleep(2)
def test_22_create_eth_service2(self):
- url = ("{}/operations/org-openroadm-service:service-create"
- .format(self.restconf_baseurl))
+ url = "{}/operations/org-openroadm-service:service-create"
data = {
"input": {
"sdnc-request-header": {
"operator-contact": "pw1234"
}
}
- headers = {'content-type': 'application/json',
- "Accept": "application/json"}
- response = requests.request(
- "POST", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ response = test_utils.post_request(url, data)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('PCE calculation in progress',
time.sleep(self.WAITING)
def test_23_get_eth_service2(self):
- url = ("{}/operational/org-openroadm-service:"
- "service-list/services/service2"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json',
- "Accept": "application/json"}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ url = "{}/operational/org-openroadm-service:service-list/services/service2"
+ response = test_utils.get_request(url)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(
time.sleep(1)
def test_24_check_xc2_ROADMA(self):
- url = (
- "{}/config/network-topology:"
- "network-topology/topology/topology-netconf/"
- "node/ROADMA01/yang-ext:"
- "mount/org-openroadm-device:org-openroadm-device/"
- "roadm-connections/DEG1-TTP-TXRX-SRG1-PP2-TXRX-2"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/DEG1-TTP-TXRX-SRG1-PP2-TXRX-2")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
res['roadm-connections'][0]['destination'])
def test_25_check_topo_XPDRA(self):
- url1 = (
- "{}/config/ietf-network:"
- "networks/network/openroadm-topology/node/XPDRA01-XPDR1"
- .format(self.restconf_baseurl))
- response = requests.request(
- "GET", url1, auth=('admin', 'admin'))
+ url = "{}/config/ietf-network:networks/network/openroadm-topology/node/XPDRA01-XPDR1"
+ response = test_utils.get_request(url)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
liste_tp = res['node'][0]['ietf-network-topology:termination-point']
time.sleep(10)
def test_26_check_topo_ROADMA_SRG1(self):
- url1 = (
- "{}/config/ietf-network:"
- "networks/network/openroadm-topology/node/ROADMA01-SRG1"
- .format(self.restconf_baseurl))
- response = requests.request(
- "GET", url1, auth=('admin', 'admin'))
+ url = "{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA01-SRG1"
+ response = test_utils.get_request(url)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertNotIn({u'index': 1}, res['node'][0][
time.sleep(10)
def test_27_check_topo_ROADMA_DEG1(self):
- url1 = (
- "{}/config/ietf-network:"
- "networks/network/openroadm-topology/node/ROADMA01-DEG1"
- .format(self.restconf_baseurl))
- response = requests.request(
- "GET", url1, auth=('admin', 'admin'))
+ url = "{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA01-DEG1"
+ response = test_utils.get_request(url)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertNotIn({u'index': 1}, res['node'][0][
# creation service test on a non-available resource
def test_28_create_eth_service3(self):
- url = ("{}/operations/org-openroadm-service:service-create"
- .format(self.restconf_baseurl))
+ url = "{}/operations/org-openroadm-service:service-create"
data = {
"input": {
"sdnc-request-header": {
"operator-contact": "pw1234"
}
}
- headers = {'content-type': 'application/json',
- "Accept": "application/json"}
- response = requests.request(
- "POST", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ response = test_utils.post_request(url, data)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('PCE calculation in progress',
# contains 2 elements
def test_29_delete_eth_service3(self):
- url = ("{}/operations/org-openroadm-service:service-delete"
- .format(self.restconf_baseurl))
+ url = "{}/operations/org-openroadm-service:service-delete"
data = {"input": {
"sdnc-request-header": {
"request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
}
}
}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "POST", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ response = test_utils.post_request(url, data)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Service \'service3\' does not exist in datastore',
time.sleep(20)
def test_30_delete_eth_service1(self):
- url = ("{}/operations/org-openroadm-service:service-delete"
- .format(self.restconf_baseurl))
+ url = "{}/operations/org-openroadm-service:service-delete"
data = {"input": {
"sdnc-request-header": {
"request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
}
}
}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "POST", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ response = test_utils.post_request(url, data)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Renderer service delete in progress',
time.sleep(20)
def test_31_delete_eth_service2(self):
- url = ("{}/operations/org-openroadm-service:service-delete"
- .format(self.restconf_baseurl))
+ url = "{}/operations/org-openroadm-service:service-delete"
data = {"input": {
"sdnc-request-header": {
"request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
}
}
}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "POST", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ response = test_utils.post_request(url, data)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Renderer service delete in progress',
time.sleep(20)
def test_32_check_no_xc_ROADMA(self):
- url = (
- "{}/config/network-topology:"
- "network-topology/topology/topology-netconf/"
- "node/ROADMA01/yang-ext:"
- "mount/org-openroadm-device:org-openroadm-device/"
- .format(self.restconf_baseurl))
- response = requests.request(
- "GET", url, auth=('admin', 'admin'))
+ response = test_utils.check_netconf_node_request("ROADMA01", "")
res = response.json()
self.assertEqual(response.status_code, requests.codes.ok)
self.assertNotIn('roadm-connections',
time.sleep(2)
def test_33_check_topo_XPDRA(self):
- url1 = (
- "{}/config/ietf-network:"
- "networks/network/openroadm-topology/node/XPDRA01-XPDR1"
- .format(self.restconf_baseurl))
- response = requests.request(
- "GET", url1, auth=('admin', 'admin'))
+ url = "{}/config/ietf-network:networks/network/openroadm-topology/node/XPDRA01-XPDR1"
+ response = test_utils.get_request(url)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
liste_tp = res['node'][0]['ietf-network-topology:termination-point']
time.sleep(10)
def test_34_check_topo_ROADMA_SRG1(self):
- url1 = (
- "{}/config/ietf-network:"
- "networks/network/openroadm-topology/node/ROADMA01-SRG1"
- .format(self.restconf_baseurl))
- response = requests.request(
- "GET", url1, auth=('admin', 'admin'))
+ url = "{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA01-SRG1"
+ response = test_utils.get_request(url)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn({u'index': 1}, res['node'][0][
time.sleep(10)
def test_35_check_topo_ROADMA_DEG1(self):
- url1 = (
- "{}/config/ietf-network:"
- "networks/network/openroadm-topology/node/ROADMA01-DEG1"
- .format(self.restconf_baseurl))
- response = requests.request(
- "GET", url1, auth=('admin', 'admin'))
+ url = "{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA01-DEG1"
+ response = test_utils.get_request(url)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn({u'index': 1}, res['node'][0][
# test service-create for Optical Channel (OC) service from srg-pp to srg-pp
def test_36_create_oc_service1(self):
- url = ("{}/operations/org-openroadm-service:service-create"
- .format(self.restconf_baseurl))
+ url = "{}/operations/org-openroadm-service:service-create"
data = {
"input": {
"sdnc-request-header": {
"operator-contact": "pw1234"
}
}
- headers = {'content-type': 'application/json',
- "Accept": "application/json"}
- response = requests.request(
- "POST", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ response = test_utils.post_request(url, data)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('PCE calculation in progress',
time.sleep(self.WAITING)
def test_37_get_oc_service1(self):
- url = ("{}/operational/org-openroadm-service:"
- "service-list/services/service1"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json',
- "Accept": "application/json"}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ url = "{}/operational/org-openroadm-service:service-list/services/service1"
+ response = test_utils.get_request(url)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(
time.sleep(1)
def test_38_check_xc1_ROADMA(self):
- url = (
- "{}/config/network-topology:"
- "network-topology/topology/topology-netconf/"
- "node/ROADMA01/yang-ext:"
- "mount/org-openroadm-device:org-openroadm-device/"
- "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
time.sleep(7)
def test_39_check_xc1_ROADMC(self):
- url = (
- "{}/config/network-topology:"
- "network-topology/topology/topology-netconf/"
- "node/ROADMC01/yang-ext:mount/org-openroadm-device:"
- "org-openroadm-device/"
- "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.check_netconf_node_request("ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
time.sleep(7)
def test_40_create_oc_service2(self):
- url = ("{}/operations/org-openroadm-service:service-create"
- .format(self.restconf_baseurl))
+ url = "{}/operations/org-openroadm-service:service-create"
data = {
"input": {
"sdnc-request-header": {
"operator-contact": "pw1234"
}
}
- headers = {'content-type': 'application/json',
- "Accept": "application/json"}
- response = requests.request(
- "POST", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ response = test_utils.post_request(url, data)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('PCE calculation in progress',
time.sleep(self.WAITING)
def test_41_get_oc_service2(self):
- url = ("{}/operational/org-openroadm-service:"
- "service-list/services/service2"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json',
- "Accept": "application/json"}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ url = "{}/operational/org-openroadm-service:service-list/services/service2"
+ response = test_utils.get_request(url)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(
time.sleep(2)
def test_42_check_xc2_ROADMA(self):
- url = (
- "{}/config/network-topology:"
- "network-topology/topology/topology-netconf/"
- "node/ROADMA01/yang-ext:mount/org-openroadm-device:"
- "org-openroadm-device/"
- "roadm-connections/SRG1-PP2-TXRX-DEG1-TTP-TXRX-2"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP2-TXRX-DEG1-TTP-TXRX-2")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
time.sleep(3)
def test_44_delete_oc_service1(self):
- url = ("{}/operations/org-openroadm-service:service-delete"
- .format(self.restconf_baseurl))
+ url = "{}/operations/org-openroadm-service:service-delete"
data = {"input": {
"sdnc-request-header": {
"request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
}
}
}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "POST", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ response = test_utils.post_request(url, data)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Renderer service delete in progress',
time.sleep(20)
def test_45_delete_oc_service2(self):
- url = ("{}/operations/org-openroadm-service:service-delete"
- .format(self.restconf_baseurl))
+ url = "{}/operations/org-openroadm-service:service-delete"
data = {"input": {
"sdnc-request-header": {
"request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
}
}
}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "POST", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ response = test_utils.post_request(url, data)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Renderer service delete in progress',
def test_46_get_no_oc_services(self):
print("start test")
- url = ("{}/operational/org-openroadm-service:service-list"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json',
- "Accept": "application/json"}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ url = "{}/operational/org-openroadm-service:service-list"
+ response = test_utils.get_request(url)
self.assertEqual(response.status_code, requests.codes.not_found)
res = response.json()
self.assertIn(
time.sleep(1)
def test_47_get_no_xc_ROADMA(self):
- url = (
- "{}/config/network-topology:"
- "network-topology/topology/topology-netconf"
- "/node/ROADMA01/yang-ext:mount/org-openroadm-device:"
- "org-openroadm-device/"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json',
- "Accept": "application/json"}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.check_netconf_node_request("ROADMA01", "")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertNotIn(['roadm-connections'][0], res['org-openroadm-device'])
def test_49_loop_create_eth_service(self):
for i in range(1, 6):
- print("trial number {}".format(i))
+ print("iteration number {}".format(i))
print("eth service creation")
self.test_11_create_eth_service1()
print("check xc in ROADMA01")
self.test_30_delete_eth_service1()
def test_50_loop_create_oc_service(self):
- url = ("{}/operational/org-openroadm-service:"
- "service-list/services/service1"
- .format(self.restconf_baseurl))
- response = requests.request("GET", url, auth=('admin', 'admin'))
+ url = "{}/operational/org-openroadm-service:service-list/services/service1"
+ response = test_utils.get_request(url)
if response.status_code != 404:
- url = ("{}/operations/org-openroadm-service:service-delete"
- .format(self.restconf_baseurl))
+ url = "{}/operations/org-openroadm-service:service-delete"
data = {"input": {
"sdnc-request-header": {
"request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
}
}
}
- headers = {'content-type': 'application/json'}
- requests.request("POST", url, data=json.dumps(data),
- headers=headers,
- auth=('admin', 'admin')
- )
+ test_utils.post_request(url, data)
time.sleep(5)
for i in range(1, 6):
- print("trial number {}".format(i))
+ print("iteration number {}".format(i))
print("oc service creation")
self.test_36_create_oc_service1()
print("check xc in ROADMA01")
self.test_44_delete_oc_service1()
def test_51_disconnect_XPDRA(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/XPDRA01"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "DELETE", url, headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.ok)
- time.sleep(10)
+ response = test_utils.unmount_device("XPDRA01")
+ self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_52_disconnect_XPDRC(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/XPDRC01"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "DELETE", url, headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.ok)
- time.sleep(10)
+ response = test_utils.unmount_device("XPDRC01")
+ self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_53_disconnect_ROADMA(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADMA01"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "DELETE", url, headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.ok)
- time.sleep(10)
+ response = test_utils.unmount_device("ROADMA01")
+ self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_54_disconnect_ROADMC(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADMC01"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "DELETE", url, headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.ok)
- time.sleep(10)
+ response = test_utils.unmount_device("ROADMC01")
+ self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
if __name__ == "__main__":