class TransportOlmTesting(unittest.TestCase):
processes = None
- restconf_baseurl = "http://localhost:8181/restconf"
@classmethod
def setUpClass(cls):
time.sleep(1)
def test_01_xpdrA_device_connected(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/XPDRA01"
- .format(self.restconf_baseurl))
- data = {"node": [{
- "node-id": "XPDRA01",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(20)
+ response = test_utils.mount_device("XPDRA01", 'xpdra')
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_xpdrC_device_connected(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/XPDRC01"
- .format(self.restconf_baseurl))
- data = {"node": [{
- "node-id": "XPDRC01",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": test_utils.sims['xpdrc']['port'],
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(20)
+ response = test_utils.mount_device("XPDRC01", 'xpdrc')
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_03_rdmA_device_connected(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADMA01"
- .format(self.restconf_baseurl))
- data = {"node": [{
- "node-id": "ROADMA01",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": test_utils.sims['roadma-full']['port'],
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(20)
+ response = test_utils.mount_device("ROADMA01", 'roadma-full')
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_04_rdmC_device_connected(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADMC01"
- .format(self.restconf_baseurl))
- data = {"node": [{
- "node-id": "ROADMC01",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": test_utils.sims['roadmc-full']['port'],
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(20)
+ response = test_utils.mount_device("ROADMC01", 'roadmc-full')
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_to_roadmA(self):
- url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
+ url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(test_utils.RESTCONF_BASE_URL)
data = {
"networkutils:input": {
"networkutils:links-input": {
}
}
}
- headers = {'content-type': 'application/json'}
response = requests.request(
"POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
def test_06_connect_roadmA_to_xpdrA(self):
- url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
+ url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(test_utils.RESTCONF_BASE_URL)
data = {
"networkutils:input": {
"networkutils:links-input": {
}
}
}
- headers = {'content-type': 'application/json'}
response = requests.request(
"POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
def test_07_connect_xprdC_to_roadmC(self):
- url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
+ url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(test_utils.RESTCONF_BASE_URL)
data = {
"networkutils:input": {
"networkutils:links-input": {
}
}
}
- headers = {'content-type': 'application/json'}
response = requests.request(
"POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
def test_08_connect_roadmC_to_xpdrC(self):
- url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
+ url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(test_utils.RESTCONF_BASE_URL)
data = {
"networkutils:input": {
"networkutils:links-input": {
}
}
}
- headers = {'content-type': 'application/json'}
response = requests.request(
"POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
def test_09_create_OTS_ROADMA(self):
- url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(self.restconf_baseurl)
+ url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(test_utils.RESTCONF_BASE_URL)
data = {
"input": {
"node-id": "ROADMA01",
"logical-connection-point": "DEG1-TTP-TXRX"
}
}
- headers = {'content-type': 'application/json'}
response = requests.request(
"POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADMA01',
res["output"]["result"])
def test_10_create_OTS_ROADMC(self):
- url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(self.restconf_baseurl)
+ url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(test_utils.RESTCONF_BASE_URL)
data = {
"input": {
"node-id": "ROADMC01",
"logical-connection-point": "DEG2-TTP-TXRX"
}
}
- headers = {'content-type': 'application/json'}
response = requests.request(
"POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADMC01',
res["output"]["result"])
def test_11_get_PM_ROADMA(self):
- url = "{}/operations/transportpce-olm:get-pm".format(self.restconf_baseurl)
+ url = "{}/operations/transportpce-olm:get-pm".format(test_utils.RESTCONF_BASE_URL)
data = {
"input": {
"node-id": "ROADMA01",
}
}
}
- headers = {'content-type': 'application/json'}
response = requests.request(
"POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn({
}, res["output"]["measurements"])
def test_12_get_PM_ROADMC(self):
- url = "{}/operations/transportpce-olm:get-pm".format(self.restconf_baseurl)
+ url = "{}/operations/transportpce-olm:get-pm".format(test_utils.RESTCONF_BASE_URL)
data = {
"input": {
"node-id": "ROADMC01",
}
}
}
- headers = {'content-type': 'application/json'}
response = requests.request(
"POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn({
}, res["output"]["measurements"])
def test_13_calculate_span_loss_base_ROADMA_ROADMC(self):
- url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(self.restconf_baseurl)
+ url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(test_utils.RESTCONF_BASE_URL)
data = {
"input": {
"src-type": "link",
"link-id": "ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX"
}
}
- headers = {'content-type': 'application/json'}
response = requests.request(
"POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Success',
time.sleep(5)
def test_14_calculate_span_loss_base_all(self):
- url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(self.restconf_baseurl)
+ url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(test_utils.RESTCONF_BASE_URL)
data = {
"input": {
"src-type": "all"
}
}
- headers = {'content-type': 'application/json'}
response = requests.request(
"POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Success',
def test_15_get_OTS_DEG1_TTP_TXRX_ROADMA(self):
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
"node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG1-TTP-TXRX/"
- "org-openroadm-optical-transport-interfaces:ots".format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ "org-openroadm-optical-transport-interfaces:ots".format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(5.7, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-transmit'])
def test_16_get_OTS_DEG2_TTP_TXRX_ROADMC(self):
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
"node/ROADMC01/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG2-TTP-TXRX/"
- "org-openroadm-optical-transport-interfaces:ots".format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ "org-openroadm-optical-transport-interfaces:ots".format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(15.1, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-transmit'])
self.assertEqual(5.7, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-receive'])
def test_17_servicePath_create_AToZ(self):
- url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
+ url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
data = {
"input": {
"service-name": "test",
]
}
}
- headers = {'content-type': 'application/json'}
response = requests.request(
"POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
time.sleep(10)
def test_18_servicePath_create_ZToA(self):
- url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
+ url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
data = {
"input": {
"service-name": "test",
]
}
}
- headers = {'content-type': 'application/json'}
response = requests.request(
"POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
time.sleep(10)
def test_19_service_power_setup_XPDRA_XPDRC(self):
- url = "{}/operations/transportpce-olm:service-power-setup".format(self.restconf_baseurl)
+ url = "{}/operations/transportpce-olm:service-power-setup".format(test_utils.RESTCONF_BASE_URL)
data = {
"input": {
"service-name": "test",
]
}
}
- headers = {'content-type': 'application/json'}
response = requests.request(
"POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Success', res["output"]["result"])
def test_20_get_interface_XPDRA_XPDR1_NETWORK1(self):
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDRA01/yang-ext:mount/"
"org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
- "org-openroadm-optical-channel-interfaces:och".format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ "org-openroadm-optical-channel-interfaces:och".format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(0, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
def test_21_get_roadmconnection_ROADMA(self):
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMA01/yang-ext:mount/"
"org-openroadm-device:org-openroadm-device/roadm-connections/"
- "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1".format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1".format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual("gainLoss", res['roadm-connections'][0]['opticalControlMode'])
def test_22_get_roadmconnection_ROADMC(self):
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMC01/yang-ext:mount/"
"org-openroadm-device:org-openroadm-device/roadm-connections/"
- "DEG2-TTP-TXRX-SRG1-PP1-TXRX-1".format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ "DEG2-TTP-TXRX-SRG1-PP1-TXRX-1".format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual("power", res['roadm-connections'][0]['opticalControlMode'])
def test_23_service_power_setup_XPDRC_XPDRA(self):
- url = "{}/operations/transportpce-olm:service-power-setup".format(self.restconf_baseurl)
+ url = "{}/operations/transportpce-olm:service-power-setup".format(test_utils.RESTCONF_BASE_URL)
data = {
"input": {
"service-name": "test",
]
}
}
- headers = {'content-type': 'application/json'}
response = requests.request(
"POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Success', res["output"]["result"])
def test_24_get_interface_XPDRC_XPDR1_NETWORK1(self):
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDRC01/yang-ext:mount/"
"org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
- "org-openroadm-optical-channel-interfaces:och".format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ "org-openroadm-optical-channel-interfaces:och".format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(0, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
def test_25_get_roadmconnection_ROADMC(self):
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMC01/yang-ext:mount/"
"org-openroadm-device:org-openroadm-device/roadm-connections/"
- "SRG1-PP1-TXRX-DEG2-TTP-TXRX-1".format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ "SRG1-PP1-TXRX-DEG2-TTP-TXRX-1".format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual("gainLoss", res['roadm-connections'][0]['opticalControlMode'])
self.assertEqual(2, res['roadm-connections'][0]['target-output-power'])
def test_26_service_power_turndown_XPDRA_XPDRC(self):
- url = "{}/operations/transportpce-olm:service-power-turndown".format(self.restconf_baseurl)
+ url = "{}/operations/transportpce-olm:service-power-turndown".format(test_utils.RESTCONF_BASE_URL)
data = {
"input": {
"service-name": "test",
]
}
}
- headers = {'content-type': 'application/json'}
response = requests.request(
"POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Success', res["output"]["result"])
def test_27_get_roadmconnection_ROADMA(self):
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMA01/yang-ext:mount/"
"org-openroadm-device:org-openroadm-device/roadm-connections/"
- "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1".format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1".format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual("off", res['roadm-connections'][0]['opticalControlMode'])
def test_28_get_roadmconnection_ROADMC(self):
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMC01/yang-ext:mount/"
"org-openroadm-device:org-openroadm-device/roadm-connections/"
- "DEG2-TTP-TXRX-SRG1-PP1-TXRX-1".format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ "DEG2-TTP-TXRX-SRG1-PP1-TXRX-1".format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual("off", res['roadm-connections'][0]['opticalControlMode'])
def test_29_servicePath_delete_AToZ(self):
- url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
+ url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
data = {
"input": {
"service-name": "test",
]
}
}
- headers = {'content-type': 'application/json'}
response = requests.request(
"POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Request processed', res["output"]["result"])
time.sleep(10)
def test_30_servicePath_delete_ZToA(self):
- url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
+ url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
data = {
"input": {
"service-name": "test",
]
}
}
- headers = {'content-type': 'application/json'}
response = requests.request(
"POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Request processed', res["output"]["result"])
"""to test case where SRG where the xpdr is connected to has no optical range data"""
def test_31_connect_xprdA_to_roadmA(self):
- url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
+ url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(test_utils.RESTCONF_BASE_URL)
data = {
"networkutils:input": {
"networkutils:links-input": {
}
}
}
- headers = {'content-type': 'application/json'}
response = requests.request(
"POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
def test_32_connect_roadmA_to_xpdrA(self):
- url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
+ url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(test_utils.RESTCONF_BASE_URL)
data = {
"networkutils:input": {
"networkutils:links-input": {
}
}
}
- headers = {'content-type': 'application/json'}
response = requests.request(
"POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
def test_33_servicePath_create_AToZ(self):
- url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
+ url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
data = {
"input": {
"service-name": "test2",
]
}
}
- headers = {'content-type': 'application/json'}
response = requests.request(
"POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
def test_34_get_interface_XPDRA_XPDR1_NETWORK2(self):
url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDRA01/yang-ext:mount/"
"org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK2-2/"
- "org-openroadm-optical-channel-interfaces:och".format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
+ "org-openroadm-optical-channel-interfaces:och".format(test_utils.RESTCONF_BASE_URL))
response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(-5, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
self.assertEqual(2, res['org-openroadm-optical-channel-interfaces:och']['wavelength-number'])
def test_35_servicePath_delete_AToZ(self):
- url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
+ url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
data = {
"input": {
"service-name": "test",
]
}
}
- headers = {'content-type': 'application/json'}
response = requests.request(
"POST", url, data=json.dumps(data),
- headers=headers, auth=('admin', 'admin'))
+ headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Request processed', res["output"]["result"])
time.sleep(10)
def test_36_xpdrA_device_disconnected(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/XPDRA01"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "DELETE", url, headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.ok)
- time.sleep(10)
+ response = test_utils.unmount_device("XPDRA01")
+ self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_37_xpdrC_device_disconnected(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/XPDRC01"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "DELETE", url, headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.ok)
- time.sleep(10)
+ response = test_utils.unmount_device("XPDRC01")
+ self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_38_calculate_span_loss_current(self):
- url = "{}/operations/transportpce-olm:calculate-spanloss-current".format(self.restconf_baseurl)
- headers = {'content-type': 'application/json'}
+ url = "{}/operations/transportpce-olm:calculate-spanloss-current".format(test_utils.RESTCONF_BASE_URL)
response = requests.request(
- "POST", url, headers=headers, auth=('admin', 'admin'))
+ "POST", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Success',
time.sleep(5)
def test_39_rdmA_device_disconnected(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADMA01"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "DELETE", url, headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.ok)
- time.sleep(10)
+ response = test_utils.unmount_device("ROADMA01")
+ self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_40_rdmC_device_disconnected(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADMC01"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "DELETE", url, headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.ok)
- time.sleep(10)
+ response = test_utils.unmount_device("ROADMC01")
+ self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
if __name__ == "__main__":