self.assertEqual(response.status_code, requests.codes.ok)
def test_06_service_path_create_rdm_check(self):
- response = test_utils_rfc8040.check_interface_request("ROADMA01", "DEG1-TTP-TXRX-713:720")
+ response = test_utils_rfc8040.check_node_attribute_request("ROADMA01", "interface", "DEG1-TTP-TXRX-713:720")
self.assertEqual(response['status_code'], requests.codes.ok)
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
self.assertDictEqual(
response['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
def test_07_service_path_create_rdm_check(self):
- response = test_utils_rfc8040.check_interface_request("ROADMA01", "SRG1-PP7-TXRX-713:720")
+ response = test_utils_rfc8040.check_node_attribute_request("ROADMA01", "interface", "SRG1-PP7-TXRX-713:720")
self.assertEqual(response['status_code'], requests.codes.ok)
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
self.assertDictEqual(
response['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
def test_08_service_path_create_rdm_check(self):
- response = test_utils_rfc8040.check_roadm_connections_request("ROADMA01", "SRG1-PP7-TXRX-DEG1-TTP-TXRX-713:720")
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "ROADMA01", "roadm-connections", "SRG1-PP7-TXRX-DEG1-TTP-TXRX-713:720")
self.assertEqual(response['status_code'], requests.codes.ok)
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
self.assertDictEqual(
self.assertDictEqual({'dst-if': 'DEG1-TTP-TXRX-713:720'}, response['roadm-connections'][0]['destination'])
def test_09_service_path_create_xpdr_check(self):
- response = test_utils_rfc8040.check_interface_request("XPDRA01", "XPDR1-NETWORK1-713:720")
+ response = test_utils_rfc8040.check_node_attribute_request("XPDRA01", "interface", "XPDR1-NETWORK1-713:720")
self.assertEqual(response['status_code'], requests.codes.ok)
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
self.assertDictEqual(
'modulation-format': 'dp-qpsk'}])
def test_10_service_path_create_xpdr_check(self):
- response = test_utils_rfc8040.check_interface_request("XPDRA01", "XPDR1-NETWORK1-OTU")
+ response = test_utils_rfc8040.check_node_attribute_request("XPDRA01", "interface", "XPDR1-NETWORK1-OTU")
self.assertEqual(response['status_code'], requests.codes.ok)
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
self.assertDictEqual(
response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
def test_11_service_path_create_xpdr_check(self):
- response = test_utils_rfc8040.check_interface_request("XPDRA01", "XPDR1-NETWORK1-ODU")
+ response = test_utils_rfc8040.check_node_attribute_request("XPDRA01", "interface", "XPDR1-NETWORK1-ODU")
self.assertEqual(response['status_code'], requests.codes.ok)
# the 2 following statements replace self.assertDictContainsSubset deprecated in python 3.2
self.assertDictEqual(
response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_12_service_path_create_xpdr_check(self):
- response = test_utils_rfc8040.check_interface_request("XPDRA01", "XPDR1-CLIENT1-ETHERNET")
+ response = test_utils_rfc8040.check_node_attribute_request("XPDRA01", "interface", "XPDR1-CLIENT1-ETHERNET")
self.assertEqual(response['status_code'], requests.codes.ok)
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
self.assertDictEqual(
response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
def test_13_service_path_create_xpdr_check(self):
- response = test_utils_rfc8040.check_circuit_packs_request("XPDRA01", "1%2F0%2F1-PLUG-NET")
+ response = test_utils_rfc8040.check_node_attribute_request("XPDRA01", "circuit-packs", "1%2F0%2F1-PLUG-NET")
# FIXME: https://jira.opendaylight.org/browse/TRNSPRTPCE-591
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('not-reserved-inuse', response['circuit-packs'][0]["equipment-state"])
{'transportpce-device-renderer:output': {'result': 'Request processed', 'success': True}}))
def test_15_service_path_delete_rdm_check(self):
- response = test_utils_rfc8040.check_interface_request("ROADMA01", "DEG1-TTP-TXRX-713:720")
+ response = test_utils_rfc8040.check_node_attribute_request("ROADMA01", "interface", "DEG1-TTP-TXRX-713:720")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_16_service_path_delete_rdm_check(self):
- response = test_utils_rfc8040.check_interface_request("ROADMA01", "SRG1-PP7-TXRX-713:720")
+ response = test_utils_rfc8040.check_node_attribute_request("ROADMA01", "interface", "SRG1-PP7-TXRX-713:720")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_17_service_path_delete_rdm_check(self):
- response = test_utils_rfc8040.check_roadm_connections_request("ROADMA01", "SRG1-PP7-TXRX-DEG1-TTP-TXRX-713:720")
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "ROADMA01", "roadm-connections", "SRG1-PP7-TXRX-DEG1-TTP-TXRX-713:720")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_18_service_path_delete_xpdr_check(self):
- response = test_utils_rfc8040.check_interface_request("XPDRA01", "XPDR1-NETWORK1-713:720")
+ response = test_utils_rfc8040.check_node_attribute_request("XPDRA01", "interface", "XPDR1-NETWORK1-713:720")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_19_service_path_delete_xpdr_check(self):
- response = test_utils_rfc8040.check_interface_request("XPDRA01", "XPDR1-NETWORK1-OTU")
+ response = test_utils_rfc8040.check_node_attribute_request("XPDRA01", "interface", "XPDR1-NETWORK1-OTU")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_20_service_path_delete_xpdr_check(self):
- response = test_utils_rfc8040.check_interface_request("XPDRA01", "XPDR1-NETWORK1-ODU")
+ response = test_utils_rfc8040.check_node_attribute_request("XPDRA01", "interface", "XPDR1-NETWORK1-ODU")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_21_service_path_delete_xpdr_check(self):
- response = test_utils_rfc8040.check_interface_request("XPDRA01", "XPDR1-CLIENT1-ETHERNET")
+ response = test_utils_rfc8040.check_node_attribute_request("XPDRA01", "interface", "XPDR1-CLIENT1-ETHERNET")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_22_service_path_delete_xpdr_check(self):
- response = test_utils_rfc8040.check_circuit_packs_request("XPDRA01", "1%2F0%2F1-PLUG-NET")
+ response = test_utils_rfc8040.check_node_attribute_request("XPDRA01", "circuit-packs", "1%2F0%2F1-PLUG-NET")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual('not-reserved-available', response["circuit-packs"][0]['equipment-state'])
# Get Degree MC interface and check
def test_08_degree_mc_interface(self):
- response = test_utils_rfc8040.check_interface_request("ROADM-D1", "DEG1-TTP-TXRX-mc-749:763")
+ response = test_utils_rfc8040.check_node_attribute_request("ROADM-D1", "interface", "DEG1-TTP-TXRX-mc-749:763")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertDictEqual(
dict({"name": "DEG1-TTP-TXRX-mc-749:763",
# get DEG-NMC interface and check
def test_09_degree_nmc_interface(self):
- response = test_utils_rfc8040.check_interface_request("ROADM-D1", "DEG1-TTP-TXRX-nmc-749:763")
+ response = test_utils_rfc8040.check_node_attribute_request("ROADM-D1", "interface", "DEG1-TTP-TXRX-nmc-749:763")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertDictEqual(
dict({"name": "DEG1-TTP-TXRX-nmc-749:763",
# get SRG-NMC interface
def test_10_srg_nmc_interface(self):
- response = test_utils_rfc8040.check_interface_request("ROADM-D1", "SRG1-PP1-TXRX-nmc-749:763")
+ response = test_utils_rfc8040.check_node_attribute_request("ROADM-D1", "interface", "SRG1-PP1-TXRX-nmc-749:763")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(
dict({"name": "SRG1-PP1-TXRX-nmc-749:763",
# Create ROADM-connection
def test_11_roadm_connection(self):
- response = test_utils_rfc8040.check_roadm_connections_request("ROADM-D1", "SRG1-PP1-TXRX-DEG1-TTP-TXRX-749:763")
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "ROADM-D1", "roadm-connections", "SRG1-PP1-TXRX-DEG1-TTP-TXRX-749:763")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual("SRG1-PP1-TXRX-DEG1-TTP-TXRX-749:763",
response['roadm-connections'][0]['connection-name'])
# delete ROADM connection
def test_12_delete_roadm_connection(self):
- response = test_utils_rfc8040.del_roadm_connections_request("ROADM-D1", "SRG1-PP1-TXRX-DEG1-TTP-TXRX-749:763")
+ response = test_utils_rfc8040.del_node_attribute_request(
+ "ROADM-D1", "roadm-connections", "SRG1-PP1-TXRX-DEG1-TTP-TXRX-749:763")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
time.sleep(3)
# Delete NMC SRG interface
def test_13_delete_srg_interface(self):
- response = test_utils_rfc8040.del_interface_request("ROADM-D1", "SRG1-PP1-TXRX-nmc-749:763")
+ response = test_utils_rfc8040.del_node_attribute_request("ROADM-D1", "interface", "SRG1-PP1-TXRX-nmc-749:763")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
time.sleep(3)
# Delete NMC Degree interface
def test_14_delete_degree_interface(self):
- response = test_utils_rfc8040.del_interface_request("ROADM-D1", "DEG1-TTP-TXRX-nmc-749:763")
+ response = test_utils_rfc8040.del_node_attribute_request("ROADM-D1", "interface", "DEG1-TTP-TXRX-nmc-749:763")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
time.sleep(3)
# Delete MC Degree interface
def test_15_delete_degree_interface(self):
- response = test_utils_rfc8040.del_interface_request("ROADM-D1", "DEG1-TTP-TXRX-mc-749:763")
+ response = test_utils_rfc8040.del_node_attribute_request("ROADM-D1", "interface", "DEG1-TTP-TXRX-mc-749:763")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
time.sleep(3)
self.assertEqual(response.status_code, requests.codes.ok)
def test_06_service_path_create_rdm_check(self):
- response = test_utils_rfc8040.check_interface_request("ROADM-A1", "DEG1-TTP-TXRX-nmc-713:720")
+ response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "DEG1-TTP-TXRX-nmc-713:720")
self.assertEqual(response['status_code'], requests.codes.ok)
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
self.assertDictEqual(
{'frequency': 195.8, 'width': 40}])
def test_07_service_path_create_rdm_check(self):
- response = test_utils_rfc8040.check_interface_request("ROADM-A1", "DEG1-TTP-TXRX-mc-713:720")
+ response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "DEG1-TTP-TXRX-mc-713:720")
self.assertEqual(response['status_code'], requests.codes.ok)
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
self.assertDictEqual(
{'min-freq': 195.775, 'max-freq': 195.825}])
def test_08_service_path_create_rdm_check(self):
- response = test_utils_rfc8040.check_interface_request("ROADM-A1", "SRG1-PP3-TXRX-nmc-713:720")
+ response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "SRG1-PP3-TXRX-nmc-713:720")
self.assertEqual(response['status_code'], requests.codes.ok)
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
self.assertDictEqual(
# -mc supporting interfaces must not be created for SRG, only degrees
def test_09_service_path_create_rdm_check(self):
- response = test_utils_rfc8040.check_interface_request("ROADM-A1", "SRG1-PP3-TXRX-mc-713:720")
+ response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "SRG1-PP3-TXRX-mc-713:720")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_10_service_path_create_rdm_check(self):
- response = test_utils_rfc8040.check_roadm_connections_request("ROADM-A1", "SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720")
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "ROADM-A1", "roadm-connections", "SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720")
self.assertEqual(response['status_code'], requests.codes.ok)
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
self.assertDictEqual(
self.assertDictEqual({'dst-if': 'DEG1-TTP-TXRX-nmc-713:720'}, response['roadm-connections'][0]['destination'])
def test_11_service_path_create_xpdr_check(self):
- response = test_utils_rfc8040.check_interface_request("XPDR-A1", "XPDR1-NETWORK1-713:720")
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR1-NETWORK1-713:720")
self.assertEqual(response['status_code'], requests.codes.ok)
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
self.assertDictEqual(
'modulation-format': 'dp-qpsk', 'frequency': 195.8}])
def test_12_service_path_create_xpdr_check(self):
- response = test_utils_rfc8040.check_interface_request("XPDR-A1", "XPDR1-NETWORK1-OTU")
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR1-NETWORK1-OTU")
self.assertEqual(response['status_code'], requests.codes.ok)
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
self.assertDictEqual(
response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
def test_13_service_path_create_xpdr_check(self):
- response = test_utils_rfc8040.check_interface_request("XPDR-A1", "XPDR1-NETWORK1-ODU4")
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR1-NETWORK1-ODU4")
self.assertEqual(response['status_code'], requests.codes.ok)
# the 2 following statements replace self.assertDictContainsSubset deprecated in python 3.2
self.assertDictEqual(
response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_14_service_path_create_xpdr_check(self):
- response = test_utils_rfc8040.check_interface_request("XPDR-A1", "XPDR1-CLIENT1-ETHERNET")
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR1-CLIENT1-ETHERNET")
self.assertEqual(response['status_code'], requests.codes.ok)
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
self.assertDictEqual(
response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
def test_15_service_path_create_xpdr_check(self):
- response = test_utils_rfc8040.check_circuit_packs_request("XPDR-A1", "1%2F0%2F1-PLUG-NET")
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "circuit-packs", "1%2F0%2F1-PLUG-NET")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('not-reserved-inuse', response['circuit-packs'][0]['equipment-state'])
# FIXME: https://jira.opendaylight.org/browse/TRNSPRTPCE-591
def test_16_service_path_create_xpdr_check(self):
- response = test_utils_rfc8040.check_circuit_packs_request("XPDR-A1", "1%2F0%2F1-PLUG-CLIENT")
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "circuit-packs", "1%2F0%2F1-PLUG-CLIENT")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('not-reserved-inuse', response['circuit-packs'][0]['equipment-state'])
# FIXME: https://jira.opendaylight.org/browse/TRNSPRTPCE-591
{'transportpce-device-renderer:output': {'result': 'Request processed', 'success': True}}))
def test_18_service_path_delete_rdm_check(self):
- response = test_utils_rfc8040.check_interface_request("ROADM-A1", "DEG1-TTP-TXRX-mc-713:720")
+ response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "DEG1-TTP-TXRX-mc-713:720")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_19_service_path_delete_rdm_check(self):
- response = test_utils_rfc8040.check_interface_request("ROADM-A1", "DEG1-TTP-TXRX-nmc-713:720")
+ response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "DEG1-TTP-TXRX-nmc-713:720")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_20_service_path_delete_rdm_check(self):
- response = test_utils_rfc8040.check_interface_request("ROADM-A1", "SRG1-PP3-TXRX-mc-713:720")
+ response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "SRG1-PP3-TXRX-mc-713:720")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_21_service_path_delete_rdm_check(self):
- response = test_utils_rfc8040.check_interface_request("ROADM-A1", "SRG1-PP3-TXRX-nmc-713:720")
+ response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "SRG1-PP3-TXRX-nmc-713:720")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_22_service_path_delete_rdm_check(self):
- response = test_utils_rfc8040.check_interface_request("ROADM-A1", "SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720")
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "ROADM-A1", "interface", "SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_23_service_path_delete_rdm_check(self):
- response = test_utils_rfc8040.check_roadm_connections_request("ROADM-A1", "SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720")
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "ROADM-A1", "roadm-connections", "SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_24_service_path_delete_xpdr_check(self):
- response = test_utils_rfc8040.check_interface_request("XPDR-A1", "XPDR1-NETWORK1-713:720")
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR1-NETWORK1-713:720")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_25_service_path_delete_xpdr_check(self):
- response = test_utils_rfc8040.check_interface_request("XPDR-A1", "XPDR1-NETWORK1-OTU")
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR1-NETWORK1-OTU")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_26_service_path_delete_xpdr_check(self):
- response = test_utils_rfc8040.check_interface_request("XPDR-A1", "XPDR1-NETWORK1-ODU")
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR1-NETWORK1-ODU")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_27_service_path_delete_xpdr_check(self):
- response = test_utils_rfc8040.check_interface_request("XPDR-A1", "XPDR1-CLIENT1-ETHERNET")
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR1-CLIENT1-ETHERNET")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_28_service_path_delete_xpdr_check(self):
- response = test_utils_rfc8040.check_circuit_packs_request("XPDR-A1", "1%2F0%2F1-PLUG-NET")
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "circuit-packs", "1%2F0%2F1-PLUG-NET")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual('not-reserved-available', response["circuit-packs"][0]['equipment-state'])
def test_29_service_path_delete_xpdr_check(self):
- response = test_utils_rfc8040.check_circuit_packs_request("XPDR-A1", "1%2F0%2F1-PLUG-CLIENT")
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "circuit-packs", "1%2F0%2F1-PLUG-CLIENT")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual('not-reserved-available', response["circuit-packs"][0]['equipment-state'])
'connection-status': connection_status}
-def check_interface_request(node: str, interface: str):
+def check_node_attribute_request(node: str, attribute: str, attribute_value: str):
# pylint: disable=line-too-long
- url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface={}', # nopep8
- 'draft-bierman02': '{}/operational/network-topology:network-topology/topology/topology-netconf/node/{}/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/{}'} # nopep8
- response = get_request(url[RESTCONF_VERSION].format('{}', node, interface))
+ url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}={}', # nopep8
+ 'draft-bierman02': '{}/config/network-topology:network-topology/topology/topology-netconf/node/{}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}/{}'} # nopep8
+ response = get_request(url[RESTCONF_VERSION].format('{}', node, attribute, attribute_value))
res = response.json()
- return_key = {'rfc8040': 'org-openroadm-device:interface',
- 'draft-bierman02': 'interface'}
+ return_key = {'rfc8040': 'org-openroadm-device:' + attribute,
+ 'draft-bierman02': attribute}
if return_key[RESTCONF_VERSION] in res.keys():
- interface = res[return_key[RESTCONF_VERSION]]
+ response_attribute = res[return_key[RESTCONF_VERSION]]
else:
- interface = res['errors']['error'][0]
+ response_attribute = res['errors']['error'][0]
return {'status_code': response.status_code,
- 'interface': interface}
+ attribute: response_attribute}
-def del_interface_request(node: str, interface: str):
+def del_node_attribute_request(node: str, attribute: str, attribute_value: str):
# pylint: disable=line-too-long
- url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface={}', # nopep8
- 'draft-bierman02': '{}/config/network-topology:network-topology/topology/topology-netconf/node/{}/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/{}'} # nopep8
- # draft-bierman02: note this is config here and not operational as previously in check_interface_request
- response = delete_request(url[RESTCONF_VERSION].format('{}', node, interface))
- return response
-
-
-def check_roadm_connections_request(node: str, connections: str):
- # pylint: disable=line-too-long
- url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device/roadm-connections={}', # nopep8
- 'draft-bierman02': '{}/operational/network-topology:network-topology/topology/topology-netconf/node/{}/yang-ext:mount/org-openroadm-device:org-openroadm-device/roadm-connections/{}'} # nopep8
- response = get_request(url[RESTCONF_VERSION].format('{}', node, connections))
- res = response.json()
- return_key = {'rfc8040': 'org-openroadm-device:roadm-connections',
- 'draft-bierman02': 'roadm-connections'}
- if return_key[RESTCONF_VERSION] in res.keys():
- roadm_connections = res[return_key[RESTCONF_VERSION]]
- else:
- roadm_connections = res['errors']['error'][0]
- return {'status_code': response.status_code,
- 'roadm-connections': roadm_connections}
-
-
-def del_roadm_connections_request(node: str, connections: str):
- # pylint: disable=line-too-long
- url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device/roadm-connections={}', # nopep8
- 'draft-bierman02': '{}/config/network-topology:network-topology/topology/topology-netconf/node/{}/yang-ext:mount/org-openroadm-device:org-openroadm-device/roadm-connections/{}'} # nopep8
- # draft-bierman02: note this is config here and not operational as previously in check_interface_request
- response = delete_request(url[RESTCONF_VERSION].format('{}', node, connections))
- return response
-
-
-def check_circuit_packs_request(node: str, circuit_packs: str):
- # pylint: disable=line-too-long
- url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device/circuit-packs={}', # nopep8
- 'draft-bierman02': '{}/config/network-topology:network-topology/topology/topology-netconf/node/{}/yang-ext:mount/org-openroadm-device:org-openroadm-device/circuit-packs/{}'} # nopep8
- # draft-bierman02: note this is config here and not operational as previously in check_interface_request
- # FIXME: https://jira.opendaylight.org/browse/TRNSPRTPCE-591
- response = get_request(url[RESTCONF_VERSION].format('{}', node, circuit_packs))
- res = response.json()
- return_key = {'rfc8040': 'org-openroadm-device:circuit-packs',
- 'draft-bierman02': 'circuit-packs'}
- if return_key[RESTCONF_VERSION] in res.keys():
- cp = res[return_key[RESTCONF_VERSION]]
- else:
- cp = res['errors']['error'][0]
- return {'status_code': response.status_code,
- 'circuit-packs': cp}
-
-
-def del_circuit_packs_request(node: str, circuit_packs: str):
- # pylint: disable=line-too-long
- url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device/circuit-packs={}', # nopep8
- 'draft-bierman02': '{}/config/network-topology:network-topology/topology/topology-netconf/node/{}/yang-ext:mount/org-openroadm-device:org-openroadm-device/circuit-packs/{}'} # nopep8
- response = delete_request(url[RESTCONF_VERSION].format('{}', node, circuit_packs))
+ url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}={}', # nopep8
+ 'draft-bierman02': '{}/config/network-topology:network-topology/topology/topology-netconf/node/{}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}/{}'} # nopep8
+ response = delete_request(url[RESTCONF_VERSION].format('{}', node, attribute, attribute_value))
return response
#