class TransportPCERendererTesting(unittest.TestCase):
processes = None
- restconf_baseurl = "http://localhost:8181/restconf"
@classmethod
def setUpClass(cls):
time.sleep(10)
def test_01_rdm_device_connected(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADMA01"
- .format(self.restconf_baseurl))
- data = {"node": [{
- "node-id": "ROADMA01",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": test_utils.sims['roadma']['port'],
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(20)
+ response = test_utils.mount_device("ROADMA01", 'roadma')
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_xpdr_device_connected(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/XPDRA01"
- .format(self.restconf_baseurl))
- data = {"node": [{
- "node-id": "XPDRA01",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(20)
+ response = test_utils.mount_device("XPDRA01", 'xpdra')
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_03_rdm_portmapping(self):
url = ("{}/config/transportpce-portmapping:network/"
"nodes/ROADMA01"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ .format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn(
def test_04_xpdr_portmapping(self):
url = ("{}/config/transportpce-portmapping:network/"
"nodes/XPDRA01"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ .format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn(
res['nodes'][0]['mapping'])
def test_05_service_path_create(self):
- url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
+ url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
data = {"renderer:input": {
"renderer:service-name": "service_test",
"renderer:wave-number": "7",
{"renderer:node-id": "XPDRA01",
"renderer:src-tp": "XPDR1-CLIENT1",
"renderer:dest-tp": "XPDR1-NETWORK1"}]}}
- headers = {'content-type': 'application/json'}
response = requests.request(
"POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Roadm-connection successfully created for nodes: ROADMA01', res["output"]["result"])
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
"node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
"interface/DEG1-TTP-TXRX-7"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ .format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
"node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
"interface/SRG1-PP7-TXRX-7"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ .format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
"node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
"roadm-connections/SRG1-PP7-TXRX-DEG1-TTP-TXRX-7"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ .format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
"node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
"interface/XPDR1-NETWORK1-7"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ .format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
"node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
"interface/XPDR1-NETWORK1-OTU"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ .format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
"node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
"interface/XPDR1-NETWORK1-ODU"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ .format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# the 2 following statements replace self.assertDictContainsSubset deprecated in python 3.2
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
"node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
"interface/XPDR1-CLIENT1-ETHERNET"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ .format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
"node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
"circuit-packs/1%2F0%2F1-PLUG-NET"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ .format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('not-reserved-inuse', res['circuit-packs'][0]["equipment-state"])
def test_14_service_path_delete(self):
- url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
+ url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
data = {"renderer:input": {
"renderer:service-name": "service_test",
"renderer:wave-number": "7",
{"renderer:node-id": "XPDRA01",
"renderer:src-tp": "XPDR1-CLIENT1",
"renderer:dest-tp": "XPDR1-NETWORK1"}]}}
- headers = {'content-type': 'application/json'}
response = requests.request(
"POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
self.assertEqual(response.json(), {
'output': {'result': 'Request processed', 'success': True}})
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
"node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
"interface/DEG1-TTP-TXRX-7"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ .format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.not_found)
res = response.json()
self.assertIn(
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
"node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
"interface/SRG1-PP7-TXRX-7"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ .format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.not_found)
res = response.json()
self.assertIn(
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
"node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
"roadm-connections/SRG1-PP7-TXRX-DEG1-TTP-TXRX-7"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ .format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.not_found)
res = response.json()
self.assertIn(
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
"node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
"interface/XPDR1-NETWORK1-7"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ .format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.not_found)
res = response.json()
self.assertIn(
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
"node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
"interface/XPDR1-NETWORK1-OTU"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ .format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.not_found)
res = response.json()
self.assertIn(
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
"node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
"interface/XPDR1-NETWORK1-ODU"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ .format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.not_found)
res = response.json()
self.assertIn(
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
"node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
"interface/XPDR1-CLIENT1-ETHERNET"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ .format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.not_found)
res = response.json()
self.assertIn(
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
"node/XPDRA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
"circuit-packs/1%2F0%2F1-PLUG-NET"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ .format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual('not-reserved-available', res["circuit-packs"][0]['equipment-state'])
def test_23_rdm_device_disconnected(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADMA01"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "DELETE", url, headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.ok)
- time.sleep(20)
+ response = test_utils.unmount_device("ROADMA01")
+ self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_24_xpdr_device_disconnected(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/XPDRA01"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "DELETE", url, headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.ok)
- time.sleep(20)
+ response = test_utils.unmount_device("XPDRA01")
+ self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
if __name__ == "__main__":