SIMS = simulators.SIMS
HONEYNODE_OK_START_MSG = 'Netconf SSH endpoint started successfully at 0.0.0.0'
-KARAF_OK_START_MSG = re.escape(
- "Blueprint container for bundle org.opendaylight.netconf.restconf")+".* was successfully created"
+KARAF_OK_START_MSG = "Blueprint container for bundle org.opendaylight.netconf.restconf.* was successfully created"
LIGHTY_OK_START_MSG = re.escape("lighty.io and RESTCONF-NETCONF started")
ODL_LOGIN = 'admin'
#
+def post_portmapping(payload: str):
+ url = {'rfc8040': '{}/data/transportpce-portmapping:network',
+ 'draft-bierman02': '{}/config/transportpce-portmapping:network'}
+ json_payload = json.loads(payload)
+ response = post_request(url[RESTCONF_VERSION].format('{}'), json_payload)
+ return {'status_code': response.status_code}
+
+
+def del_portmapping():
+ url = {'rfc8040': '{}/data/transportpce-portmapping:network',
+ 'draft-bierman02': '{}/config/transportpce-portmapping:network'}
+ response = delete_request(url[RESTCONF_VERSION].format('{}'))
+ return {'status_code': response.status_code}
+
+
def get_portmapping_node_attr(node: str, attr: str, value: str):
# pylint: disable=consider-using-f-string
url = {'rfc8040': '{}/data/transportpce-portmapping:network/nodes={}',
else:
format_args = ('{}', 'operational', network)
response = get_request(url[RESTCONF_VERSION].format(*format_args))
- res = response.json()
- return_key = {'rfc8040': 'ietf-network:network',
- 'draft-bierman02': 'network'}
- networks = res[return_key[RESTCONF_VERSION]]
+ if bool(response):
+ res = response.json()
+ return_key = {'rfc8040': 'ietf-network:network',
+ 'draft-bierman02': 'network'}
+ networks = res[return_key[RESTCONF_VERSION]]
+ else:
+ networks = None
return {'status_code': response.status_code,
'network': networks}
+def put_ietf_network(network: str, payload: str):
+ url = {'rfc8040': '{}/data/ietf-network:networks/network={}',
+ 'draft-bierman02': '{}/config/ietf-network:networks/network/{}'}
+ json_payload = json.loads(payload)
+ response = put_request(url[RESTCONF_VERSION].format('{}', network), json_payload)
+ return {'status_code': response.status_code}
+
+
+def del_ietf_network(network: str):
+ url = {'rfc8040': '{}/data/ietf-network:networks/network={}',
+ 'draft-bierman02': '{}/config/ietf-network:networks/network/{}'}
+ response = delete_request(url[RESTCONF_VERSION].format('{}', network))
+ return {'status_code': response.status_code}
+
+
def get_ietf_network_link_request(network: str, link: str, content: str):
url = {'rfc8040': '{}/data/ietf-network:networks/network={}/ietf-network-topology:link={}?content={}',
'draft-bierman02': '{}/{}/ietf-network:networks/network/{}/ietf-network-topology:link/{}'}
else:
format_args = ('{}', 'operational', network, node)
response = get_request(url[RESTCONF_VERSION].format(*format_args))
- res = response.json()
- return_key = {'rfc8040': 'ietf-network:node',
- 'draft-bierman02': 'node'}
- node = res[return_key[RESTCONF_VERSION]][0]
+ if bool(response):
+ res = response.json()
+ return_key = {'rfc8040': 'ietf-network:node',
+ 'draft-bierman02': 'node'}
+ node = res[return_key[RESTCONF_VERSION]][0]
+ else:
+ node = None
return {'status_code': response.status_code,
'node': node}